TaskVine Mosaic Example

This workflow downloads a common image, transforms it using the convert tool to produce a mosaic. Demonstrates use of remote data, unpacking, caching, starch, and temporary files.

#!/usr/bin/env python

# This example program produces a mosaic of images, each one transformed
# with a different amount of swirl.

# It demonstrates several features of TaskVine:
#
# - Each task consumes remote data accessed via url, cached and shared
# among all tasks on that machine.
#
# - Each task uses the "convert" program, which may or may not be installed
# on remote machines.  To make the tasks portable, the program "/usr/bin/convert"
# is packaged up into a self-contained archive "convert.sfx" which contains
# the executable and all of its dynamic dependencies.  This allows the
# use of arbitrary workers without regard to their software context.

import ndcctools.taskvine as vine
import argparse
import os
import sys

# construct a starch package that ensures convert and montage are available
# when the task executes at the worker.
def create_package(package_name, convert="convert", montage="montage"):
    import subprocess
    # add the executables convert, and montage to the package_name starch file.
    # these executables are assumed to be in the current $PATH if a full path was not specified.
    # by default, the starch file will execute the convert command.

    if os.path.exists(package_name):
        print(f"reusing existing {package_name} starch file...")
    else:
        try:
            print(f"creating {package_name} starch file...")
            subprocess.run(["starch", "-x", "convert", "-x", "montage", "-c", "montage", package_name], check=True)
        except subprocess.CalledProcessError:
            print("could not create package.")
            print("check that starch is in $PATH, and check that ImageMagick executables are in $PATH,")
            print("or provide their location at the command line.")


# helper function to report final status of a task
def process_result(t):
    if t:
        if t.successful():
            print(f"task {t.id} done: {t.command}")
        elif t.completed():
            print(f"task {t.id} completed with an execution error, exit code {t.exit_code}")
        else:
            print(f"task {t.id} failed with status {t.result}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
            prog="vine_example_mosaic.py",
            description="This example TaskVine program produces a mosaic of images, each one transformed with a different amount of swirl. Each task consumes remote data accessed via url, cached and shared among all tasks on that machine. Also each task is executed in a starch package that ensures that the required executables and libraries are available at the worker.")

    parser.add_argument('--convert', action='store', help='path to the convert ImageMagick executable.', default="convert")
    parser.add_argument('--montage', action='store', help='path to the montage ImageMagick executable.', default="montage")
    args = parser.parse_args()

    package_filename = "convert_montage.sfx"
    create_package(package_filename)

    m = vine.Manager()
    print(f"listening on port {m.port}")

    # declare the package just created as a starch file.
    # when this package is associated with a task, it will be expanded and wrap its
    # command.
    package = m.declare_starch(package_filename)

    # source image to which all the operations will be applied
    image_file = m.declare_url("https://upload.wikimedia.org/wikipedia/commons/7/74/A-Cat.jpg", cache=True)


    # the image_file will be rotated in steps of 10 degrees by the tasks. The
    # result of these rotations is kept in a vine temporary file. These
    # temporary files stay at the workers for task reuse, and are not
    # transfered back to the manager as outputs.

    # swirl angle -> vine temporary file
    convert_temporary_outputs = {}

    # from 0 to 360, by steps of 10 degrees
    for angle in range(0, 360, 10):
        # the command the task will execute. The convert executable will be
        # provided by the starch package.
        # note that all tasks will produce as a result a file named output.jpg
        # in their respective sandboxes. It is this file that will be declared
        # as temporary file.
        command = f"convert -swirl {angle} cat.jpg output.jpg"

        t = vine.Task(command)
        t.add_starch_package(package)

        # add the main source image
        t.add_input(image_file,"cat.jpg")

        # declare the temporary file, associate it with output.jpg, and record
        # it for future use in montage.
        f = m.declare_temp()
        convert_temporary_outputs[angle] = f
        t.add_output(f, "output.jpg")

        # specify that tasks won't use more than one core. This allows the
        # manager to dispatch as many tasks to a worker as its number of cores.
        t.set_cores(1)

        m.submit(t)
        print(f"submitted task {t.id}: {t.command}")

    print("waiting for convert tasks to complete...")
    print("please create a worker in another terminal. E.g., for a local worker:")
    print(f"vine_worker localhost {m.port}")

    while not m.empty():
        t = m.wait(5)
        if t:
            process_result(t)

        # local image name to use for the temporary file when the convert task
        # executes.
        outfile = f"{angle}.cat.jpg"

    print("Combining images into mosaic.jpg...")

    # create a tasks that combines the results of all the convert tasks.
    # each convert temporary output will be mapped to an {angle}.cat.jpg file.
    # the montage executable will be provided by the starch package
    t = vine.Task("montage `ls *.cat.jpg | sort -n` -tile 6x6 -geometry 128x128+0+0 output_of_montage.jpg")
    t.add_starch_package(package)

    for (angle, f) in convert_temporary_outputs.items():
        convert_output = f"{angle}.cat.jpg"
        t.add_input(f, convert_output)

    # declare the final image file
    final_montage = m.declare_file("mosaic.jpg")

    # add the final output, mapping mosaic.jpg to the file generated by montage: output_of_montage.jpg
    # note that output_of_montage.jpg could have been named mosaic.jpg, as
    # final_montage refers to the file in the manager's filesystem, not the
    # task sandbox.
    t.add_output(final_montage, "output_of_montage.jpg")
    m.submit(t)

    print("waiting for final task to complete...")
    while not m.empty():
        t = m.wait(5)
        if t:
            process_result(t)
    print("all tasks complete!")


    print("deleting temporary files at the worker.")
    # now that we are done using the temporary files, we can delete them from the workers.
    # in this small example this is not strictly necessary, as these files are
    # deleted from workers once the manager terminates.
    for f in convert_temporary_outputs.values():
        m.undeclare_file(f)

# vim: set sts=4 sw=4 ts=4 expandtab ft=python: