TaskVine Gutenberg Example

This workflow downloads a number of text files from Project Gutenberg and performs an all-to-all comparison of each pair using a Unix shell script. It demonstrates the use of external data, caching, and shared data.

#!/usr/bin/env python

# This example shows some of the remote data handling features of TaskVine.
# It performs an all-to-all comparison of twenty (relatively small) documents
# downloaded from the Gutenberg public archive.

# A small shell script (given inline below) is used to perform
# a simple text comparison of each pair of files.

import ndcctools.taskvine as vine
import argparse
import sys

urls_sources = [

# Perform a simple comparison of the words counts of each document
# which are given as the first ($1) and second ($2) command lines.
cat $1 | tr " " "\n" | sort | uniq -c | sort -rn | head -10l > a.tmp
cat $2 | tr " " "\n" | sort | uniq -c | sort -rn | head -10l > b.tmp
diff a.tmp b.tmp
exit 0

if __name__ == "__main__":
    # Command-line interface: a single switch that turns off
    # worker-to-worker transfers.
    parser = argparse.ArgumentParser(
        description="All-to-all comparison of Project Gutenberg documents with TaskVine.",
    )
    parser.add_argument(
        "--disable-peer-transfers",
        action="store_true",
        help="disable transfers among workers.",
    )

    m = vine.Manager()
    print("listening on port", m.port)

    args = parser.parse_args()

    if args.disable_peer_transfers:
        m.disable_peer_transfers()

    # Declare all urls in the manager. Build a list rather than a lazy
    # map(): the nested loops below walk the collection repeatedly, and a
    # one-shot iterator would be drained by the first inner loop, leaving
    # most pairs uncompared.
    urls = [m.declare_url(u, cache=True) for u in urls_sources]

    # Script used to process the files; declared once and attached to
    # every task as an input.
    my_script = m.declare_buffer(compare_script, cache=True)

    # One task per ordered pair of distinct documents.
    for url_a in urls:
        for url_b in urls:
            # Comparing a document with itself is pointless; skip it.
            if url_a == url_b:
                continue

            t = vine.Task("./my_script file_a.txt file_b.txt")
            t.add_input(my_script, "my_script")
            t.add_input(url_a, "file_a.txt")
            t.add_input(url_b, "file_b.txt")

            # Hand the task to the manager before reporting it; without
            # this the queue stays empty and the wait loop below exits
            # immediately.
            m.submit(t)

            print(f"submitted task {t.id}: {t.command}")

    print("waiting for tasks to complete...")
    while not m.empty():
        # wait() returns a falsy value when no task finished within the
        # timeout; just poll again in that case.
        t = m.wait(5)
        if not t:
            continue

        if t.successful():
            print(f"task {t.id} result: {t.std_output}")
        elif t.completed():
            # The command ran, but returned a non-zero exit code.
            print(f"task {t.id} completed with an executin error, exit code {t.exit_code}")
        else:
            # The task did not run to completion at all.
            print(f"task {t.id} failed with status {t.result}")

    print("all tasks complete!")