TaskVine Gradient Descent Example

This workflow performs a distributed gradient descent to optimize a function. It demonstrates the use of serverless computing and distributed Python environments.

#!/usr/bin/env python3

# Copyright (C) 2023- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# This shows an example of using Library Tasks and FunctionCall Tasks.
# Gradient descent is an algorithm used to optimize the weights of machine
# learning models and regressions.  Running multiple instances of gradient
# descent can be used to overcome local minima and approach the global minimum
# for the best model possible.

# The manager first creates a Library Task that includes the code for the
# gradient descent algorithm, and installs it on connected workers.  The
# manager then creates FunctionCall Tasks that each run an individual instance
# of gradient descent for a randomized model.  The manager runs many of these
# tasks, and reports the set of weights that had the lowest overall error.

import ndcctools.taskvine as vine
import json
import numpy as np
import random
import argparse
import getpass


def batch_gradient_descent(train_data, test_data, number_of_params, max_iterations, min_error, learning_rate):
    """ Fit a polynomial regression by batch gradient descent from a random start.

        This is the function we will pack into a Library Task.

        arguments:
        train_data:       List of [x, t] rows used to train a simple linear regression
                        with polynomial basis functions x**0 ... x**(number_of_params - 1)
        test_data:        List of data to test the fit of the model. It is accepted so
                        the call signature stays stable, but the computation does not
                        use it: the returned error is measured on train_data.
        number_of_params: the number of parameters/basis functions in the model
        max_iterations:   maximum number of iterations of weight updates to perform
        min_error:        continue training until the difference in error between
                        iterations is less than this value - or max_iterations is reached.
        learning_rate:    how much to update the weights each iteration

        It returns [weights, error]. weights is a nested list holding one
        single-element row per parameter; error is half the sum of squared
        residuals of those weights on the training data (not an RMS value).
        With max_iterations <= 0 the random starting weights are returned. """

    # Note that we import the python modules again inside the function. This is
    # because this function will be executed remotely independent from this
    # current python program.
    import random
    import time
    import numpy as np

    # seed the random number generator to ensure this model has a random set of weights
    random.seed(time.time())

    # convert the training data back into an np array
    train_data = np.array(train_data)

    # compute the phi matrix. each row is a different x_i, and each column is
    # (x_i)^n where n is the column number. Plain 2-D arrays are used instead
    # of the deprecated np.matrix; the @ operator behaves the same on them.
    inputs = train_data[:, 0:1]
    phi = inputs ** np.arange(number_of_params)

    # ground truth observed value in the training data, as a column vector
    observed_values = train_data[:, 1:2]

    # these two products do not depend on the weights, so compute them once
    # instead of once per iteration
    gram = phi.T @ phi
    projected_targets = phi.T @ observed_values

    # initial randomized w column vector
    weights = np.array([random.random() for _ in range(number_of_params)]).reshape(number_of_params, 1)

    # calculate half the sum of squared residuals for the given weights and
    # return it as a plain float
    def calculate_error(w_i):
        residual = phi @ w_i - observed_values
        return (0.5 * (residual.T @ residual)).item()

    # the error of the current weights is carried between iterations so each
    # set of weights is only evaluated once
    current_error = calculate_error(weights)

    # compute the batch gradient descent algorithm
    for _ in range(max_iterations):
        # propose the next set of weights
        candidate = weights - learning_rate * (gram @ weights - projected_targets)
        candidate_error = calculate_error(candidate)

        # if the improvement is below our threshold, we are finished!
        converged = current_error - candidate_error < min_error

        # adopt the new weights, except when the final step made the fit
        # worse (e.g. the learning rate is too large): then keep the better,
        # previous weights
        if not converged or candidate_error <= current_error:
            weights, current_error = candidate, candidate_error

        if converged:
            break

    # return the final set of weights, and the error
    return [weights.tolist(), current_error]


def main(name, port, number_of_params, max_iterations, min_error, learning_rate, num_tasks):
    """ Run many gradient descent fits as FunctionCall Tasks and print the best one.
        arguments:
        name:             name given to the TaskVine manager
        port:             port given to the TaskVine manager
        number_of_params: passed through to batch_gradient_descent
        max_iterations:   passed through to batch_gradient_descent
        min_error:        passed through to batch_gradient_descent
        learning_rate:    passed through to batch_gradient_descent
        num_tasks:        number of FunctionCall Tasks to submit

        Nothing is returned; the best weights and their error are printed. """
    m = vine.Manager(name=name, port=port)

    # enable peer transfers to speed up Library environment delivery
    m.enable_peer_transfers()

    # create Library Task from batch_gradient_descent function, and call it gradient_descent_library
    library = m.create_library_from_functions("gradient_descent_library", batch_gradient_descent)

    # specify resources used by Library Task
    library.set_cores(1)
    library.set_disk(2000)
    library.set_memory(2000)

    # install the Library Task on all workers that will connect to the manager
    m.install_library(library)

    # create our data by sampling 100 points off a sin curve and adding noise
    # can change the function to perform a regression of different functions,
    # or even input other kinds of data
    x_data = np.linspace(0, 1, 100)
    t_data = np.sin(x_data * 2 * np.pi) + np.random.normal(loc=0, scale=0.1, size=x_data.shape)

    # split data into training and test data
    data = np.column_stack((x_data, t_data))
    train_data = []
    test_data = []
    # tolist() turns every row into plain python floats, so the task
    # arguments carry no numpy scalar types
    for row in data.tolist():
        # place 30% of the data into testing data, and the rest into training data
        if random.randint(1, 10) <= 3:
            test_data.append(row)
        else:
            train_data.append(row)

    # Create FunctionCall Tasks to run the gradient descent operations
    for _ in range(num_tasks):
        # the name of the function to be called is batch_gradient_descent, and
        # the name of the Library that it lives in is gradient_descent_library
        # arguments are train_data, test_data, and iterations control
        call = vine.FunctionCall(
            "batch_gradient_descent",
            "gradient_descent_library",
            train_data, test_data,
            number_of_params, max_iterations, min_error, learning_rate)

        # specify resources used by FunctionCall
        call.set_cores(1)
        call.set_disk(1500)
        call.set_memory(1500)
        m.submit(call)

    # keep track of the best set of weights and the lowest error
    best_weights = []
    best_error = float('inf')

    print(f"TaskVine listening for workers on port {m.port}")

    print("waiting for tasks to complete...")
    while not m.empty():
        t = m.wait(5)
        if t:
            # a task whose output cannot be decoded is reported and skipped,
            # so one bad result does not stop the remaining tasks
            try:
                weights, error = json.loads(t.output)["Result"]
                if error < best_error:
                    best_weights = weights
                    best_error = error
            except Exception as e:
                print(f"Error reading result of task {t.task_id}: {e}")
    print(f"The best weights are: {best_weights}")
    # NOTE: the figure printed here is the error value returned by
    # batch_gradient_descent, whose calculate_error helper computes half the
    # sum of squared training residuals; the label text is kept as-is.
    print(f"With an RMS error of {best_error}")


if __name__ == "__main__":
    # Command-line entry point: parse the options and hand them to main().
    parser = argparse.ArgumentParser(prog="vine_example_gradient_descent.py",
                                     description="""This shows an example of using Library Tasks and FunctionCall Tasks.
Gradient descent is an algorithm used to optimize the weights of machine learning models and regressions.""")
    parser.add_argument('--name', nargs='?', type=str, help='name to assign to the manager.', default=f'vine-bgd-{getpass.getuser()}')
    parser.add_argument('--port', nargs='?', type=int, help='port for the manager to listen for connections. If 0, pick any available.', default=9123)
    # The tuning options below require a value: with nargs='?' a bare flag
    # would silently yield None, which later fails inside range() or the
    # arithmetic of the gradient descent itself.
    parser.add_argument('--params', type=int, help='the number of parameters/basis functions in the model', default=100)
    parser.add_argument('--iterations', type=int, help='maximum number of iterations of weight updates to perform', default=100000000)
    # training stops on the change in error between iterations, not on the
    # absolute error, so the help text says so
    parser.add_argument('--error', type=float, help='stop when an iteration improves the fit error by less than this value', default=1e-02)
    parser.add_argument('--rate', type=float, help='how much to update the weights each iteration', default=0.0005)
    parser.add_argument('--tasks', type=int, help='number of tasks to run', default=20)
    args = parser.parse_args()

    main(args.name, args.port, args.params, args.iterations, args.error, args.rate, args.tasks)
# vim: set sts=4 sw=4 ts=4 expandtab ft=python: