cctools
ndcctools.taskvine.dask_executor.DaskVine Class Reference
Inheritance diagram for ndcctools.taskvine.dask_executor.DaskVine:
ndcctools.taskvine.manager.Manager

Public Member Functions

def get (self, dsk, keys, *, environment=None, extra_files=None, lazy_transfers=False, env_vars=None, low_memory_mode=False, checkpoint_fn=None, resources=None, resources_mode=None, submit_per_cycle=None, retries=5, verbose=False, lib_resources=None, lib_command=None, import_modules=None, task_mode='tasks', env_per_task=False)
 Execute the task graph dsk and return the results for keys in graph. More...
 
- Public Member Functions inherited from ndcctools.taskvine.manager.Manager
def __init__ (self, port=cvine.VINE_DEFAULT_PORT, name=None, shutdown=False, run_info_path="vine-run-info", staging_path=None, ssl=None, init_fn=None, status_display_interval=None)
 Create a new manager. More...
 
def name (self)
 Get the project name of the manager. More...
 
def port (self)
 Get the listening port of the manager. More...
 
def using_ssl (self)
 Whether the manager is using ssl to talk to workers. More...
 
def logging_directory (self)
 Get the logs directory of the manager. More...
 
def staging_directory (self)
 Get the staging directory of the manager. More...
 
def cache_directory (self)
 Get the caching directory of the manager. More...
 
def stats (self)
 Get manager statistics. More...
 
def stats_category (self, category)
 Get the task statistics for the given category. More...
 
def status (self, request)
 Get manager information as list of dictionaries. More...
 
def summarize_workers (self)
 Get resource statistics of workers connected. More...
 
def set_category_mode (self, category, mode)
 Turn on or off first-allocation labeling for a given category. More...
 
def set_category_autolabel_resource (self, category, resource, autolabel)
 Turn on or off first-allocation labeling for a given category and resource. More...
 
def task_state (self, task_id)
 Get current task state. More...
 
def enable_monitoring (self, watchdog=True, time_series=False)
 Enables resource monitoring for tasks. More...
 
def enable_peer_transfers (self)
 Enable P2P worker transfer functionality. More...
 
def disable_peer_transfers (self)
 Disable P2P worker transfer functionality. More...
 
def enable_disconnect_slow_workers (self, multiplier)
 Enable disconnect slow workers functionality for a given manager. More...
 
def enable_disconnect_slow_workers_category (self, name, multiplier)
 Enable disconnect slow workers functionality for a given category. More...
 
def set_draining_by_hostname (self, hostname, drain_mode=True)
 Turn on or off draining mode for workers at hostname. More...
 
def empty (self)
 Determine whether there are any known tasks queued, running, or waiting to be collected. More...
 
def hungry (self)
 Determine whether the manager can support more tasks. More...
 
def set_scheduler (self, scheduler)
 Set the worker selection scheduler for manager. More...
 
def set_name (self, name)
 Change the project name for the given manager. More...
 
def set_manager_preferred_connection (self, mode)
 Set the preference for using hostname over IP address to connect. More...
 
def set_min_task_id (self, minid)
 Set the minimum task_id of future submitted tasks. More...
 
def set_priority (self, priority)
 Change the project priority for the given manager. More...
 
def tasks_left_count (self, ntasks)
 Specify the number of tasks not yet submitted to the manager. More...
 
def set_catalog_servers (self, catalogs)
 Specify the catalog servers the manager should report to. More...
 
def set_property (self, name, value)
 Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations. More...
 
def set_runtime_info_path (self, dirname)
 Specify a directory to write logs and staging files. More...
 
def set_password (self, password)
 Add a mandatory password that each worker must present. More...
 
def set_password_file (self, file)
 Add a mandatory password file that each worker must present. More...
 
def set_resources_max (self, rmd)
 Specifies the maximum resources allowed for the default category. More...
 
def set_resources_min (self, rmd)
 Specifies the minimum resources allowed for the default category. More...
 
def set_category_resources_max (self, category, rmd)
 Specifies the maximum resources allowed for the given category. More...
 
def set_category_resources_min (self, category, rmd)
 Specifies the minimum resources allowed for the given category. More...
 
def set_category_first_allocation_guess (self, category, rmd)
 Specifies the first-allocation guess for the given category. More...
 
def initialize_categories (self, filename, rm)
 Initialize first value of categories. More...
 
def cancel_by_task_id (self, id)
 Cancel task identified by its task_id. More...
 
def cancel_by_task_tag (self, tag)
 Cancel task identified by its tag. More...
 
def cancel_by_category (self, category)
 Cancel all tasks of the given category. More...
 
def cancel_all (self)
 Cancel all tasks. More...
 
def workers_shutdown (self, n=0)
 Shutdown workers connected to manager. More...
 
def block_host (self, host)
 Block workers running on host from working for the manager. More...
 
def blacklist (self, host)
 Replaced by ndcctools.taskvine.manager.Manager.block_host. More...
 
def block_host_with_timeout (self, host, timeout)
 Block workers running on host for the duration of the given timeout. More...
 
def blacklist_with_timeout (self, host, timeout)
 See ndcctools.taskvine.manager.Manager.block_host_with_timeout. More...
 
def unblock_host (self, host=None)
 Unblock the given host, or all hosts if no host is given. More...
 
def blacklist_clear (self, host=None)
 See ndcctools.taskvine.manager.Manager.unblock_host. More...
 
def set_keepalive_interval (self, interval)
 Change keepalive interval for a given manager. More...
 
def set_keepalive_timeout (self, timeout)
 Change keepalive timeout for a given manager. More...
 
def tune (self, name, value)
 Tune advanced parameters. More...
 
def submit (self, task)
 Submit a task to the manager. More...
 
def install_library (self, task)
 Submit a library to install on all connected workers. More...
 
def remove_library (self, name)
 Remove a library from all connected workers. More...
 
def create_library_from_functions (self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None)
 Turn a list of python functions into a Library Task. More...
 
def create_library_from_serverized_files (self, name, library_path, env=None)
 Turn Library code created with poncho_package_serverize into a Library Task. More...
 
def create_library_from_command (self, executable_path, name, env=None)
 Create a Library task from arbitrary inputs. More...
 
def wait (self, timeout="wait_forever")
 Wait for tasks to complete. More...
 
def wait_for_tag (self, tag, timeout="wait_forever")
 Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag. More...
 
def wait_for_task_id (self, task_id, timeout="wait_forever")
 Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id. More...
 
def application_info (self)
 Should return a dictionary with information for the status display. More...
 
def map (self, fn, seq, chunksize=1)
 Maps a function to elements in a sequence using taskvine. More...
 
def pair (self, fn, seq1, seq2, chunksize=1, env=None)
 Returns the values for a function of each pair from 2 sequences. More...
 
def tree_reduce (self, fn, seq, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value. More...
 
def remote_map (self, fn, seq, library, name, chunksize=1)
 Maps a function to elements in a sequence using taskvine remote task. More...
 
def remote_pair (self, fn, seq1, seq2, library, name, chunksize=1)
 Returns the values for a function of each pair from 2 sequences using remote task. More...
 
def remote_tree_reduce (self, fn, seq, library, name, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value. More...
 
def declare_file (self, path, cache=False, peer_transfer=True, unlink_when_done=False)
 Declare a file obtained from the local filesystem. More...
 
def fetch_file (self, file)
 Fetch file contents from the cluster or local disk. More...
 
def undeclare_file (self, file)
 Un-declare a file that was created by declare_file or similar methods. More...
 
def undeclare_function (self, fn)
 Remove the manager's local serialized copy of a function used with PythonTask. More...
 
def declare_temp (self)
 Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks. More...
 
def declare_url (self, url, cache=False, peer_transfer=True)
 Declare a file obtained from a remote URL. More...
 
def declare_buffer (self, buffer=None, cache=False, peer_transfer=True)
 Declare a file created from a buffer in memory. More...
 
def declare_minitask (self, minitask, source, cache=False, peer_transfer=True)
 Declare a file created by executing a mini-task. More...
 
def declare_untar (self, tarball, cache=False, peer_transfer=True)
 Declare a file created by unpacking a tar file. More...
 
def declare_poncho (self, package, cache=False, peer_transfer=True)
 Declare a file that sets up a poncho environment. More...
 
def declare_starch (self, starch, cache=False, peer_transfer=True)
 Declare a file created by unpacking a starch package. More...
 
def declare_xrootd (self, source, proxy=None, env=None, cache=False, peer_transfer=True)
 Declare a file accessible from an xrootd server. More...
 
def declare_chirp (self, server, source, ticket=None, env=None, cache=False, peer_transfer=True)
 Declare a file accessible from a chirp server. More...
 
def log_txn_app (self, entry)
 Adds a custom APPLICATION entry to the transactions log. More...
 
def log_debug_app (self, entry)
 Adds a custom APPLICATION entry to the debug log. More...
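
The chunked reduction described for tree_reduce above can be pictured with a purely local sketch. It runs no TaskVine tasks; local_tree_reduce is a hypothetical name, and it is an assumption here that fn receives one chunk (a list) and returns a single value:

```python
def local_tree_reduce(fn, seq, chunksize=2):
    """Reduce seq level by level until one value is left (local illustration only)."""
    if chunksize < 2:
        raise ValueError("chunksize must be at least 2 so each level shrinks")
    values = list(seq)
    if not values:
        raise ValueError("cannot reduce an empty sequence")
    while len(values) > 1:
        # Each level applies fn to consecutive chunks of the previous level.
        values = [fn(values[i:i + chunksize]) for i in range(0, len(values), chunksize)]
    return values[0]


total = local_tree_reduce(sum, range(10), chunksize=3)
largest = local_tree_reduce(max, [4, 9, 2, 7, 5])
```

In the manager, each per-chunk call would presumably be a task executed by a worker rather than a local function call.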
 

Detailed Description

TaskVine Manager specialized to compute dask graphs.

Managers created via DaskVine can be used to execute dask graphs via the method ndcctools.taskvine.dask_executor.DaskVine.get as follows:

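import dask
from ndcctools.taskvine.dask_executor import DaskVine

m = DaskVine(...)
# Initialize as any other manager. See ndcctools.taskvine.manager.Manager
# v is any dask collection or delayed value.
result = v.compute(scheduler=m.get)
# or set it temporarily as the default scheduler for dask:
with dask.config.set(scheduler=m.get):
    result = v.compute()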

Parameters for execution can be set as arguments to the compute function. These arguments are applied to each task executed:

my_env = m.declare_poncho("my_env.tar.gz")
with dask.config.set(scheduler=m.get):
    # Each task uses at most 1 core, runs in the my_env environment, and
    # has its allocation set to the maximum values seen.
    # If resources_mode is not None, the resource monitor is activated.
    result = v.compute(resources={"cores": 1}, resources_mode="max", environment=my_env)
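
The options above reach get because dask's compute forwards extra keyword arguments to the scheduler function. A minimal sketch of that forwarding, using stand-ins only: toy_compute and recording_get are hypothetical names, and neither dask nor TaskVine is involved:

```python
def toy_compute(dsk, keys, scheduler, **kwargs):
    """Stand-in for compute(): extra keyword arguments are passed to the scheduler."""
    return scheduler(dsk, keys, **kwargs)


def recording_get(dsk, keys, *, resources=None, resources_mode=None, environment=None):
    """Hypothetical scheduler accepting a few of the keyword options listed for get()."""
    # A real scheduler would execute the graph; this one only reports what it received.
    return {
        "keys": keys,
        "resources": resources,
        "resources_mode": resources_mode,
        "environment": environment,
    }


seen = toy_compute(
    {"x": 1}, ["x"], recording_get,
    resources={"cores": 1}, resources_mode="max",
)
```

Options that are not passed keep the defaults declared by the scheduler, which is why environment is None in seen.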

Member Function Documentation

◆ get()

def ndcctools.taskvine.dask_executor.DaskVine.get (   self,
  dsk,
  keys,
environment = None,
  extra_files = None,
  lazy_transfers = False,
  env_vars = None,
  low_memory_mode = False,
  checkpoint_fn = None,
  resources = None,
  resources_mode = None,
  submit_per_cycle = None,
  retries = 5,
  verbose = False,
  lib_resources = None,
  lib_command = None,
  import_modules = None,
  task_mode = 'tasks',
  env_per_task = False 
)

Execute the task graph dsk and return the results for keys in graph.
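
To make the dsk and keys arguments concrete, here is a small sequential evaluator for a Dask-style graph: a dictionary whose values are either literals or tuples of the form (function, arg, ...), where an argument may itself be a key. This is an illustration of the calling convention only, not how DaskVine schedules work; toy_get is a hypothetical name:

```python
from operator import add, mul


def toy_get(dsk, keys):
    """Evaluate a Dask-style graph sequentially and return values shaped like keys."""
    cache = {}

    def evaluate(arg):
        # Lists (including nested key lists) are evaluated element by element.
        if isinstance(arg, list):
            return [evaluate(a) for a in arg]
        # A tuple whose first element is callable is a task: (fn, arg1, arg2, ...).
        if isinstance(arg, tuple) and arg and callable(arg[0]):
            fn, *args = arg
            return fn(*(evaluate(a) for a in args))
        # A hashable value that names a graph entry is a key; compute it once.
        try:
            is_key = arg in dsk
        except TypeError:
            is_key = False
        if is_key:
            if arg not in cache:
                cache[arg] = evaluate(dsk[arg])
            return cache[arg]
        # Anything else is a literal value.
        return arg

    return evaluate(keys)


dsk = {"x": 1, "y": 2, "sum": (add, "x", "y"), "prod": (mul, "sum", 10)}
result = toy_get(dsk, ["prod", ["x", "sum"]])
```

The nested key list ["prod", ["x", "sum"]] yields results with the same nesting, which is what "a possibly nested list of keys" refers to.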

Parameters
dsk               The task graph to execute.
keys              A possibly nested list of keys whose values are computed from dsk.
environment       A taskvine file representing an environment in which to run the tasks.
extra_files       A dictionary of {taskvine.File: "remote_name"} to add to each task.
lazy_transfers    Whether to keep intermediate results only at workers (True) or to bring back each result to the manager (False, default). True is more IO efficient, but runs the risk of needing to recompute results if workers are lost.
env_vars          A dictionary of VAR=VALUE environment variables to set per task. A value should be either a string, or a function that accepts the manager and the task as arguments and returns a string.
low_memory_mode   Split graph vertices to reduce the memory needed per function call. This removes some of the dask graph optimizations, so proceed with care.
checkpoint_fn     When using lazy_transfers, a predicate with arguments (dag, key) called before submitting a task. If it returns True, the result is brought back to the manager.
resources         A dictionary with optional keys of cores, memory and disk (MB) to set maximum resource usage per task.
lib_resources     A dictionary with optional keys of cores, memory and disk (MB) for the Library task.
lib_command       A command to be prefixed to the execution of a Library task.
import_modules    Hoist these module imports for the DaskVine Library.
resources_mode    Automatically resize allocation per task. One of 'fixed' (use the value of 'resources' above), 'max throughput', 'max' (for maximum values seen), 'min_waste', 'greedy bucketing' or 'exhaustive bucketing'. This is done per function type in dsk.
task_mode         Create tasks as either 'tasks' (using PythonTasks) or 'function-calls' (using FunctionCalls).
retries           Number of times to attempt a task. Default is 5.
submit_per_cycle  Maximum number of tasks to serialize per wait call. If None, or less than 1, then all tasks are serialized as they become available.
verbose           If true, emit additional debugging information.
env_per_task      If true, each task individually expands its own environment. The environment option must be given as a str.
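
As a sketch of the two callback-style options above: env_vars values may be strings or functions of (manager, task), and checkpoint_fn is a predicate over (dag, key). The helper resolve_env_vars, the predicate checkpoint_sums, and the FakeTask stand-in are hypothetical names used for illustration; they are not part of the TaskVine API, and the key naming scheme is an assumption:

```python
def resolve_env_vars(env_vars, manager, task):
    """Turn an env_vars dictionary into plain strings for a single task."""
    resolved = {}
    for name, value in (env_vars or {}).items():
        # A callable value is invoked with the manager and the task; strings pass through.
        resolved[name] = value(manager, task) if callable(value) else value
    return resolved


def checkpoint_sums(dag, key):
    """Example predicate: bring back only results whose key starts with 'sum'."""
    # Assumption: keys are strings or convert to strings meaningfully.
    return str(key).startswith("sum")


class FakeTask:
    """Stand-in for a task object; only the attribute used below is provided."""
    id = 7


env = resolve_env_vars(
    {"MODE": "batch", "TASK_LABEL": lambda manager, task: f"task-{task.id}"},
    manager=None,
    task=FakeTask(),
)
```

A dictionary like the one passed to resolve_env_vars would be given as env_vars, and a predicate like checkpoint_sums as checkpoint_fn together with lazy_transfers=True.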

The documentation for this class was generated from the following file: