TaskVine Gutenberg Example
This workflow downloads a number of text files from Project Gutenberg, performs an all-to-all comparison of each pair using a Unix script. Demonstrates use of external data, caching, and shared data.
#!/usr/bin/env python
# This example shows some of the remote data handling features of taskvine.
# It performs an all-to-all comparison of twenty (relatively small) documents
# downloaded from the Gutenberg public archive.
# A small shell script (given inline below) is used to perform
# a simple text comparison of each pair of files.
import ndcctools.taskvine as vine
import argparse
import sys
urls_sources = [
# Perform a simple comparison of the words counts of each document
# which are given as the first ($1) and second ($2) command lines.
cat $1 | tr " " "\n" | sort | uniq -c | sort -rn | head -10l > a.tmp
cat $2 | tr " " "\n" | sort | uniq -c | sort -rn | head -10l > b.tmp
diff a.tmp b.tmp
exit 0
if __name__ == "__main__":
parser = argparse.ArgumentParser(
help="disable transfers among workers.",
m = vine.Manager()
print("listening on port", m.port)
args = parser.parse_args()
if args.disable_peer_transfers:
# declare all urls in the manager:
urls = map(lambda u: m.declare_url(u, cache="forever"), urls_sources)
# script to process the files
my_script = m.declare_buffer(compare_script, cache=True)
for (i, url_a) in enumerate(urls):
for (j, url_b) in enumerate(urls):
if url_a == url_b:
t = vine.Task("./my_script file_a.txt file_b.txt")
t.add_input(my_script, "my_script")
t.add_input(url_a, "file_a.txt")
t.add_input(url_b, "file_b.txt")
print(f"submitted task {t.id}: {t.command}")
print("waiting for tasks to complete...")
while not m.empty():
t = m.wait(5)
if t:
if t.successful():
print(f"task {t.id} result: {t.std_output}")
elif t.completed():
print(f"task {t.id} completed with an executin error, exit code {t.exit_code}")
print(f"task {t.id} failed with status {t.result}")
print("all tasks complete!")
# vim: set sts=4 sw=4 ts=4 expandtab ft=python: