TaskVine Example: BLAST Query

This workflow downloads the BLAST genome search tool and a protein database, and then performs multiple queries against that database. It demonstrates the use of remote data, unpacking, temporary files, and immediate buffer data.

#!/usr/bin/env python

# Copyright (C) 2022- The University of Notre Dame
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# This example shows some of the data handling features of taskvine.
# It performs a BLAST search of the "Landmark" model organism database.
# It works by constructing tasks that download the blast executable
# and landmark database from NCBI, and then performs a short query.

# Each task in the workflow queries the database with a batch of
# random query strings (16 by default) generated at the manager.
# Both downloads are automatically unpacked, cached, and shared
# among all tasks running on the same worker.

import ndcctools.taskvine as vine
import random
import argparse
import getpass

# Letters used to build the random query sequences.
# NOTE(review): despite the name, this set resembles the IUPAC nucleotide
# codes (including a lowercase "i") rather than the 20 standard amino-acid
# letters, while the queries are run with blastp; confirm this is intended.
amino_letters = "ACGTUiRYKMSWBDHVN"

# Number of characters in each query
query_length = 128


def make_query_text(query_count, length=query_length):
    """Return FASTA text holding {query_count} random query sequences.

    Each query gets its own header line (">query0", ">query1", ...) followed
    by a single line of {length} characters drawn from amino_letters.  A
    separate header per sequence is required: in FASTA, consecutive sequence
    lines under one header are concatenated into a single sequence, so one
    shared header would make BLAST see one long query instead of many.

    :param query_count: number of query sequences to generate; 0 or a
        negative value yields an empty string.
    :param length: number of characters in each sequence (defaults to
        query_length).
    :returns: the FASTA text; every line, including the last, ends in "\\n".
    """
    records = []
    for i in range(query_count):
        sequence = "".join(random.choices(amino_letters, k=length))
        records.append(f">query{i}\n{sequence}\n")
    return "".join(records)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="vine_example_blast.py",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="""This example shows some of the data handling features of taskvine.
It performs a BLAST search of the "Landmark" model organism database.
It works by constructing tasks that download the blast executable
and landmark database from NCBI, and then performs a short query.

Each task in the workflow performs a query of the database using
16 (random) query strings generated at the manager.
Both the downloads are automatically unpacked, cached, and shared
with all the same tasks on the worker.""",
    )

    parser.add_argument(
        "--task-count",
        nargs="?",
        type=int,
        help="the number of tasks to generate.",
        default=10,
    )
    parser.add_argument(
        "--query-count",
        nargs="?",
        type=int,
        help="the number of queries to generate per task.",
        default=16,
    )
    parser.add_argument(
        "--name",
        nargs="?",
        type=str,
        help="name to assign to the manager.",
        default=f"vine-blast-{getpass.getuser()}",
    )
    parser.add_argument(
        "--port",
        nargs="?",
        type=int,
        help="port for the manager to listen for connections. If 0, pick any available.",
        default=9123,
    )
    parser.add_argument(
        "--disable-peer-transfers",
        action="store_true",
        help="disable transfers among workers.",
        default=False,
    )
    parser.add_argument(
        "--max-concurrent-transfers",
        nargs="?",
        type=int,
        help="maximum number of concurrent peer transfers",
        default=3,
    )
    args = parser.parse_args()

    m = vine.Manager(port=args.port)
    m.set_name(args.name)

    if args.disable_peer_transfers:
        m.disable_peer_transfers()

    if args.max_concurrent_transfers:
        m.tune("worker-source-max-transfers", args.max_concurrent_transfers)

    print("Declaring files...")
    blast_url = m.declare_url(
        "https://ftp.ncbi.nlm.nih.gov/blast/executables/blast+/2.13.0/ncbi-blast-2.13.0+-x64-linux.tar.gz",
        cache="always",  # with "always", workers keep this file until they are terminated
    )
    blast = m.declare_untar(blast_url, cache="always")

    landmark_url = m.declare_url(
        "https://ftp.ncbi.nlm.nih.gov/blast/db/landmark.tar.gz",
        cache="always",
    )
    landmark = m.declare_untar(landmark_url)

    print("Declaring tasks...")
    for i in range(args.task_count):
        query = m.declare_buffer(make_query_text(args.query_count))
        t = vine.Task(
            command="blastdir/ncbi-blast-2.13.0+/bin/blastp -db landmark -query query.file",
            inputs={
                query: {"remote_name": "query.file"},
                blast: {"remote_name": "blastdir"},
                landmark: {"remote_name": "landmark"},
            },
            env={"BLASTDB": "landmark"},
            cores=1,
        )

        task_id = m.submit(t)
        print(f"submitted task {t.id}: {t.command}")

    print(f"TaskVine listening for workers on {m.port}")

    print("Waiting for tasks to complete...")
    while not m.empty():
        t = m.wait(5)
        if t:
            if t.successful():
                print(f"task {t.id} result: {t.std_output}")
            elif t.completed():
                print(
                    f"task {t.id} completed with an executin error, exit code {t.exit_code}"
                )
            else:
                print(f"task {t.id} failed with status {t.result}")

    print("all tasks complete!")