TaskVine Gutenberg Example

This workflow downloads a number of text files from Project Gutenberg and performs an all-to-all comparison of each pair using a Unix script. It demonstrates the use of external data, caching, and shared data.

#!/usr/bin/env python

# This example shows some of the remote data handling features of TaskVine.
# It performs an all-to-all comparison of twenty-six (relatively small)
# documents downloaded from the Gutenberg public archive.

# A small shell script (given inline below) is used to perform
# a simple text comparison of each pair of files.

import ndcctools.taskvine as vine
import argparse
import sys

# Source documents: Project Gutenberg texts numbered 1960 through 1987,
# with numbers 1964 and 1984 left out of the set. Each text lives at
# http://www.gutenberg.org/files/<id>/<id>.txt.
urls_sources = [
    f"http://www.gutenberg.org/files/{doc_id}/{doc_id}.txt"
    for doc_id in range(1960, 1988)
    if doc_id not in (1964, 1984)
]

# Shell script shipped to the workers and run once per pair of documents.
# It writes the ten most frequent space-separated words of each input to
# a.tmp and b.tmp and prints their diff. It always exits 0, so a non-empty
# diff is not reported as a task failure.
#
# The literal is a raw string so that "\n" reaches `tr` as backslash-n rather
# than being turned into a real newline by Python, and it starts directly with
# the shebang so that "#!/bin/sh" is the first line of the file.
compare_script = r"""#!/bin/sh
# Perform a simple comparison of the word counts of each document,
# which are given as the first ($1) and second ($2) command-line arguments.
cat "$1" | tr " " "\n" | sort | uniq -c | sort -rn | head -n 10 > a.tmp
cat "$2" | tr " " "\n" | sort | uniq -c | sort -rn | head -n 10 > b.tmp
diff a.tmp b.tmp
exit 0
"""

if __name__ == "__main__":
    # Script entry point: declare the remote documents and the comparison
    # script, submit one task per ordered pair of distinct documents, then
    # report each task's outcome as it comes back.
    parser = argparse.ArgumentParser(
        prog="vine_example_gutenberg.py",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    parser.add_argument(
        "--disable-peer-transfers",
        action="store_true",
        help="disable transfers among workers.",
        default=False,
    )

    # Parse the command line before creating the manager so that --help and
    # argument errors exit without first opening a listening port.
    args = parser.parse_args()

    m = vine.Manager()
    print("listening on port", m.port)

    if args.disable_peer_transfers:
        m.disable_peer_transfers()

    # Declare all urls in the manager. This must be a real list: the nested
    # loops below iterate over it repeatedly, and a lazy map() iterator would
    # be drained by the first pass of the inner loop, leaving only a small
    # fraction of the pairs submitted.
    urls = [m.declare_url(u, cache="forever") for u in urls_sources]

    # Script to process the files, declared once and attached to every task.
    my_script = m.declare_buffer(compare_script, cache=True)

    for i, url_a in enumerate(urls):
        for j, url_b in enumerate(urls):
            # Skip comparing a document against itself.
            if i == j:
                continue

            t = vine.Task("./my_script file_a.txt file_b.txt")
            t.add_input(my_script, "my_script")
            t.add_input(url_a, "file_a.txt")
            t.add_input(url_b, "file_b.txt")

            t.set_cores(1)

            m.submit(t)
            print(f"submitted task {t.id}: {t.command}")

    print("waiting for tasks to complete...")
    while not m.empty():
        # wait() may come back without a task (presumably when the timeout
        # passed to it expires first), so check before inspecting the result.
        t = m.wait(5)
        if not t:
            continue

        if t.successful():
            print(f"task {t.id} result: {t.std_output}")
        elif t.completed():
            print(f"task {t.id} completed with an execution error, exit code {t.exit_code}")
        else:
            print(f"task {t.id} failed with status {t.result}")

    print("all tasks complete!")
# vim: set sts=4 sw=4 ts=4 expandtab ft=python: