cctools
Public Member Functions
def get (self, dsk, keys, *, environment=None, extra_files=None, lazy_transfers=False, env_vars=None, low_memory_mode=False, checkpoint_fn=None, resources=None, resources_mode=None, submit_per_cycle=None, retries=5, verbose=False, lib_resources=None, lib_command=None, import_modules=None, task_mode='tasks', env_per_task=False)
Execute the task graph dsk and return the results for keys in graph.
Public Member Functions inherited from ndcctools.taskvine.manager.Manager
def __init__ (self, port=cvine.VINE_DEFAULT_PORT, name=None, shutdown=False, run_info_path="vine-run-info", staging_path=None, ssl=None, init_fn=None, status_display_interval=None)
Create a new manager.
def name (self)
Get the project name of the manager.
def port (self)
Get the listening port of the manager.
def using_ssl (self)
Whether the manager is using SSL to talk to workers.
def logging_directory (self)
Get the logs directory of the manager.
def staging_directory (self)
Get the staging directory of the manager.
def cache_directory (self)
Get the caching directory of the manager.
def stats (self)
Get manager statistics.
def stats_category (self, category)
Get the task statistics for the given category.
def status (self, request)
Get manager information as a list of dictionaries.
def summarize_workers (self)
Get resource statistics of workers connected.
def set_category_mode (self, category, mode)
Turn on or off first-allocation labeling for a given category.
def set_category_autolabel_resource (self, category, resource, autolabel)
Turn on or off first-allocation labeling for a given category and resource.
def task_state (self, task_id)
Get current task state.
def enable_monitoring (self, watchdog=True, time_series=False)
Enables resource monitoring for tasks.
def enable_peer_transfers (self)
Enable P2P worker transfer functionality.
def disable_peer_transfers (self)
Disable P2P worker transfer functionality.
def enable_disconnect_slow_workers (self, multiplier)
Enable disconnect slow workers functionality for a given manager.
def enable_disconnect_slow_workers_category (self, name, multiplier)
Enable disconnect slow workers functionality for a given category.
def set_draining_by_hostname (self, hostname, drain_mode=True)
Turn on or off draining mode for workers at hostname.
def empty (self)
Determine whether there are any known tasks queued, running, or waiting to be collected.
def hungry (self)
Determine whether the manager can support more tasks.
def set_scheduler (self, scheduler)
Set the worker selection scheduler for manager.
def set_name (self, name)
Change the project name for the given manager.
def set_manager_preferred_connection (self, mode)
Set the preference for using hostname over IP address to connect.
def set_min_task_id (self, minid)
Set the minimum task_id of future submitted tasks.
def set_priority (self, priority)
Change the project priority for the given manager.
def tasks_left_count (self, ntasks)
Specify the number of tasks not yet submitted to the manager.
def set_catalog_servers (self, catalogs)
Specify the catalog servers the manager should report to.
def set_property (self, name, value)
Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations.
def set_runtime_info_path (self, dirname)
Specify a directory to write logs and staging files.
def set_password (self, password)
Add a mandatory password that each worker must present.
def set_password_file (self, file)
Add a mandatory password file that each worker must present.
def set_resources_max (self, rmd)
Specifies the maximum resources allowed for the default category.
def set_resources_min (self, rmd)
Specifies the minimum resources allowed for the default category.
def set_category_resources_max (self, category, rmd)
Specifies the maximum resources allowed for the given category.
def set_category_resources_min (self, category, rmd)
Specifies the minimum resources allowed for the given category.
def set_category_first_allocation_guess (self, category, rmd)
Specifies the first-allocation guess for the given category.
def initialize_categories (self, filename, rm)
Initialize first value of categories.
def cancel_by_task_id (self, id)
Cancel task identified by its task_id.
def cancel_by_task_tag (self, tag)
Cancel task identified by its tag.
def cancel_by_category (self, category)
Cancel all tasks of the given category.
def cancel_all (self)
Cancel all tasks.
def workers_shutdown (self, n=0)
Shutdown workers connected to manager.
def block_host (self, host)
Block workers running on host from working for the manager.
def blacklist (self, host)
Replaced by ndcctools.taskvine.manager.Manager.block_host.
def block_host_with_timeout (self, host, timeout)
Block workers running on host for the duration of the given timeout.
def blacklist_with_timeout (self, host, timeout)
See ndcctools.taskvine.manager.Manager.block_host_with_timeout.
def unblock_host (self, host=None)
Unblock the given host, or all hosts if host is not given.
def blacklist_clear (self, host=None)
See ndcctools.taskvine.manager.Manager.unblock_host.
def set_keepalive_interval (self, interval)
Change keepalive interval for a given manager.
def set_keepalive_timeout (self, timeout)
Change keepalive timeout for a given manager.
def tune (self, name, value)
Tune advanced parameters.
def submit (self, task)
Submit a task to the manager.
def install_library (self, task)
Submit a library to install on all connected workers.
def remove_library (self, name)
Remove a library from all connected workers.
def create_library_from_functions (self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None)
Turn a list of python functions into a Library Task.
def create_library_from_serverized_files (self, name, library_path, env=None)
Turn Library code created with poncho_package_serverize into a Library Task.
def create_library_from_command (self, executable_path, name, env=None)
Create a Library task from arbitrary inputs.
def wait (self, timeout="wait_forever")
Wait for tasks to complete.
def wait_for_tag (self, tag, timeout="wait_forever")
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag.
def wait_for_task_id (self, task_id, timeout="wait_forever")
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id.
def application_info (self)
Should return a dictionary with information for the status display.
def map (self, fn, seq, chunksize=1)
Maps a function to elements in a sequence using taskvine.
def pair (self, fn, seq1, seq2, chunksize=1, env=None)
Returns the values for a function of each pair from 2 sequences.
def tree_reduce (self, fn, seq, chunksize=2)
Reduces a sequence until only one value is left, and then returns that value.
def remote_map (self, fn, seq, library, name, chunksize=1)
Maps a function to elements in a sequence using taskvine remote task.
def remote_pair (self, fn, seq1, seq2, library, name, chunksize=1)
Returns the values for a function of each pair from 2 sequences using remote task.
def remote_tree_reduce (self, fn, seq, library, name, chunksize=2)
Reduces a sequence until only one value is left, and then returns that value.
def declare_file (self, path, cache=False, peer_transfer=True, unlink_when_done=False)
Declare a file obtained from the local filesystem.
def fetch_file (self, file)
Fetch file contents from the cluster or local disk.
def undeclare_file (self, file)
Un-declare a file that was created by declare_file or similar methods.
def undeclare_function (self, fn)
Remove the manager's local serialized copy of a function used with PythonTask.
def declare_temp (self)
Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks.
def declare_url (self, url, cache=False, peer_transfer=True)
Declare a file obtained from a remote URL.
def declare_buffer (self, buffer=None, cache=False, peer_transfer=True)
Declare a file created from a buffer in memory.
def declare_minitask (self, minitask, source, cache=False, peer_transfer=True)
Declare a file created by executing a mini-task.
def declare_untar (self, tarball, cache=False, peer_transfer=True)
Declare a file created by unpacking a tar file.
def declare_poncho (self, package, cache=False, peer_transfer=True)
Declare a file that sets up a poncho environment.
def declare_starch (self, starch, cache=False, peer_transfer=True)
Declare a file created by unpacking a starch package.
def declare_xrootd (self, source, proxy=None, env=None, cache=False, peer_transfer=True)
Declare a file accessible from an xrootd server.
def declare_chirp (self, server, source, ticket=None, env=None, cache=False, peer_transfer=True)
Declare a file accessible from a chirp server.
def log_txn_app (self, entry)
Adds a custom APPLICATION entry to the transactions log.
def log_debug_app (self, entry)
Adds a custom APPLICATION entry to the debug log.
TaskVine Manager specialized to compute dask graphs.
Managers created via DaskVine can be used to execute dask graphs via the method ndcctools.taskvine.dask_executor.DaskVine.get as follows:
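A minimal sketch of this pattern, assuming `collection` is any dask collection (for example a `dask.delayed` value) and `manager` is a DaskVine instance. The helper name `compute_with_vine` is hypothetical and not part of the library:

```python
def compute_with_vine(collection, manager):
    """Compute a dask collection using manager.get as the dask scheduler.

    Hypothetical helper: dask accepts a callable of the form
    get(dsk, keys, **kwargs) as its `scheduler` argument.
    """
    return collection.compute(scheduler=manager.get)


# Typical use (requires dask, ndcctools, and at least one connected worker):
#
#   import dask
#   from ndcctools.taskvine.dask_executor import DaskVine
#
#   m = DaskVine(port=9123, name="my-dask-run")  # illustrative port and name
#   total = dask.delayed(sum)([1, 2, 3])
#   print(compute_with_vine(total, m))
```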
Parameters for execution can be set as arguments to the compute function. These arguments are applied to each task executed:
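A sketch of forwarding per-task options, assuming dask passes the extra keyword arguments of `compute` through to the scheduler callable. The option names come from the `get` signature; the helper name `compute_with_options` and the values are illustrative only:

```python
def compute_with_options(collection, manager, **task_options):
    """Forward keyword arguments such as resources or retries to DaskVine.get."""
    return collection.compute(scheduler=manager.get, **task_options)


# Illustrative values: each task may use at most 1 core and 2000 MB of memory,
# is attempted up to 3 times, and keeps intermediate results at the workers.
example_options = {
    "resources": {"cores": 1, "memory": 2000},
    "retries": 3,
    "lazy_transfers": True,
}

# With a DaskVine manager m and a dask collection v:
#   result = compute_with_options(v, m, **example_options)
```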
def ndcctools.taskvine.dask_executor.DaskVine.get (self, dsk, keys, *, environment=None, extra_files=None, lazy_transfers=False, env_vars=None, low_memory_mode=False, checkpoint_fn=None, resources=None, resources_mode=None, submit_per_cycle=None, retries=5, verbose=False, lib_resources=None, lib_command=None, import_modules=None, task_mode='tasks', env_per_task=False)
Execute the task graph dsk and return the results for keys in graph.
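For readers unfamiliar with the inputs: in dask's graph format, `dsk` is a dictionary mapping keys to either literal values or task tuples whose first element is a callable, and `keys` names the results wanted. The sketch below evaluates such a graph locally in plain Python to show the expected input and output shapes. It is not how DaskVine runs tasks (DaskVine dispatches them to workers), and `simple_get` is a hypothetical name.

```python
from operator import add, mul


def _evaluate(dsk, arg, cache):
    """Resolve one argument: a list, a task tuple, a key in dsk, or a literal."""
    if isinstance(arg, list):
        return [_evaluate(dsk, a, cache) for a in arg]
    if isinstance(arg, tuple) and arg and callable(arg[0]):
        fn, *args = arg
        return fn(*(_evaluate(dsk, a, cache) for a in args))
    try:
        is_key = arg in dsk
    except TypeError:  # unhashable literal, so it cannot be a key
        is_key = False
    if is_key:
        if arg not in cache:
            cache[arg] = _evaluate(dsk, dsk[arg], cache)
        return cache[arg]
    return arg


def simple_get(dsk, keys):
    """Return the values for keys, which may be one key or a nested list of keys."""
    return _evaluate(dsk, keys, {})


dsk = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),
    "scaled": (mul, "sum", 10),
}

print(simple_get(dsk, "scaled"))                  # 30
print(simple_get(dsk, ["sum", ["scaled", "x"]]))  # [3, [30, 1]]
```

The result mirrors the nesting of `keys`, which is the same shape `get` is expected to return.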
dsk | The task graph to execute.
keys | A possibly nested list of keys whose values are computed from dsk.
environment | A taskvine file representing the environment in which to run the tasks.
extra_files | A dictionary of {taskvine.File: "remote_name"} to add to each task.
lazy_transfers | Whether to keep intermediate results only at workers (True) or to bring each result back to the manager (False, default). True is more IO efficient, but runs the risk of needing to recompute results if workers are lost.
env_vars | A dictionary of VAR=VALUE environment variables to set per task. Each value should be either a string, or a function that accepts the manager and the task as arguments and returns a string.
low_memory_mode | Split graph vertices to reduce the memory needed per function call. This removes some of the dask graph optimizations, so proceed with care.
checkpoint_fn | When using lazy_transfers, a predicate with arguments (dag, key) called before submitting a task. If it returns True, the result is brought back to the manager.
resources | A dictionary with optional keys cores, memory and disk (MB) to set the maximum resource usage per task.
lib_resources | A dictionary with optional keys cores, memory and disk (MB) to set the maximum resource usage of the Library task.
lib_command | A command to be prefixed to the execution of a Library task.
import_modules | Hoist these module imports for the DaskVine Library.
resources_mode | Automatically resize the allocation per task. One of 'fixed' (use the value of 'resources' above), 'max throughput', 'max' (for maximum values seen), 'min_waste', 'greedy bucketing' or 'exhaustive bucketing'. This is done per function type in dsk.
task_mode | Create tasks as either 'tasks' (using PythonTasks) or 'function-calls' (using FunctionCalls).
retries | Number of times to attempt a task. Default is 5.
submit_per_cycle | Maximum number of tasks to serialize per wait call. If None, or less than 1, then all tasks are serialized as they become available.
verbose | If true, emit additional debugging information.
env_per_task | If true, each task individually expands its own environment. Requires the environment option to be given as a str.
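Two of these parameters take callables. The sketch below illustrates the documented contracts only: an `env_vars` value that is a function of (manager, task) returning a string, and a `checkpoint_fn` predicate of (dag, key). The helper `resolve_env_vars`, the "final-" key naming convention, and the variable values are assumptions made for this example, not library behavior.

```python
def resolve_env_vars(env_vars, manager, task):
    """Illustration of the env_vars contract: strings pass through unchanged,
    callables are called with (manager, task) and must return a string."""
    resolved = {}
    for name, value in env_vars.items():
        resolved[name] = value(manager, task) if callable(value) else value
    return resolved


def checkpoint_final_results(dag, key):
    """Example checkpoint_fn: bring back only results whose key starts with
    'final-'. Assumes string keys named by that (hypothetical) convention."""
    return str(key).startswith("final-")


env_vars = {
    "OMP_NUM_THREADS": "1",
    # Callable value; this one ignores its arguments.
    "RUN_LABEL": lambda manager, task: "nightly",
}

print(resolve_env_vars(env_vars, manager=None, task=None))
print(checkpoint_final_results(None, "final-sum"))
```

Such values would be passed as `env_vars=env_vars, lazy_transfers=True, checkpoint_fn=checkpoint_final_results`.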