cctools
Public Member Functions
def get(self, dsk, keys, *, environment=None, extra_files=None, worker_transfers=True, env_vars=None, checkpoint_fn=None, resources=None, resources_mode=None, submit_per_cycle=None, max_pending=None, retries=5, verbose=False, lib_extra_functions=None, lib_resources=None, lib_command=None, lib_modules=None, task_mode='tasks', scheduling_mode='FIFO', env_per_task=False, progress_disable=False, progress_label="[green]tasks", wrapper=None, wrapper_proc=print, prune_files=False, hoisting_modules=None, import_modules=None, lazy_transfers=True)
Execute the task graph dsk and return the results for the keys in the graph.
Public Member Functions inherited from ndcctools.taskvine.manager.Manager
def __init__(self, port=cvine.VINE_DEFAULT_PORT, name=None, shutdown=False, run_info_path="vine-run-info", run_info_template=None, staging_path=None, ssl=None, init_fn=None, status_display_interval=None)
Create a new manager.
def name(self)
Get the project name of the manager.
def port(self)
Get the listening port of the manager.
def using_ssl(self)
Whether the manager is using ssl to talk to workers.
def logging_directory(self)
Get the logs directory of the manager.
def staging_directory(self)
Get the staging directory of the manager.
def library_logging_directory(self)
Get the library logs directory of the manager.
def cache_directory(self)
Get the caching directory of the manager.
def stats(self)
Get manager statistics.
def stats_category(self, category)
Get the task statistics for the given category.
def status(self, request)
Get manager information as list of dictionaries.
def summarize_workers(self)
Get resource statistics of workers connected.
def update_catalog(self)
Send update to catalog server.
def set_category_mode(self, category, mode)
Turn on or off first-allocation labeling for a given category.
def set_category_autolabel_resource(self, category, resource, autolabel)
Turn on or off first-allocation labeling for a given category and resource.
def task_state(self, task_id)
Get current task state.
def enable_monitoring(self, watchdog=True, time_series=False)
Enables resource monitoring for tasks.
def enable_peer_transfers(self)
Enable P2P worker transfer functionality.
def disable_peer_transfers(self)
Disable P2P worker transfer functionality.
def enable_disconnect_slow_workers(self, multiplier)
Enable disconnect-slow-workers functionality for the given manager.
def enable_disconnect_slow_workers_category(self, name, multiplier)
Enable disconnect-slow-workers functionality for the given category.
def set_draining_by_hostname(self, hostname, drain_mode=True)
Turn on or off draining mode for workers at hostname.
def empty(self)
Determine whether there are any known tasks queued, running, or waiting to be collected.
def hungry(self)
Determine whether the manager can support more tasks.
def set_scheduler(self, scheduler)
Set the worker selection scheduler for manager.
def set_name(self, name)
Change the project name for the given manager.
def set_manager_preferred_connection(self, mode)
Set the preference for using hostname over IP address to connect.
def set_min_task_id(self, minid)
Set the minimum task_id of future submitted tasks.
def set_priority(self, priority)
Change the project priority for the given manager.
def tasks_left_count(self, ntasks)
Specify the number of tasks not yet submitted to the manager.
def set_catalog_servers(self, catalogs)
Specify the catalog servers the manager should report to.
def set_property(self, name, value)
Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations.
def set_password(self, password)
Add a mandatory password that each worker must present.
def set_password_file(self, file)
Add a mandatory password file that each worker must present.
def set_resources_max(self, rmd)
Specifies the maximum resources allowed for the default category.
def set_resources_min(self, rmd)
Specifies the minimum resources allowed for the default category.
def set_category_resources_max(self, category, rmd)
Specifies the maximum resources allowed for the given category.
def set_category_resources_min(self, category, rmd)
Specifies the minimum resources allowed for the given category.
def set_category_first_allocation_guess(self, category, rmd)
Specifies the first-allocation guess for the given category.
def set_category_max_concurrent(self, category, max_concurrent)
Specifies the maximum number of tasks of the given category that may run concurrently.
def initialize_categories(self, filename, rm)
Initialize first value of categories.
def cancel_by_task_id(self, id)
Cancel task identified by its task_id.
def cancel_by_task_tag(self, tag)
Cancel task identified by its tag.
def cancel_by_category(self, category)
Cancel all tasks of the given category.
def cancel_all(self)
Cancel all tasks.
def workers_shutdown(self, n=0)
Shut down workers connected to manager.
def block_host(self, host)
Block workers running on host from working for the manager.
def blacklist(self, host)
Replaced by ndcctools.taskvine.manager.Manager.block_host.
def block_host_with_timeout(self, host, timeout)
Block workers running on host for the duration of the given timeout.
def blacklist_with_timeout(self, host, timeout)
See ndcctools.taskvine.manager.Manager.block_host_with_timeout.
def unblock_host(self, host=None)
Unblock given host, or all hosts if host not given.
def blacklist_clear(self, host=None)
See ndcctools.taskvine.manager.Manager.unblock_host.
def set_keepalive_interval(self, interval)
Change keepalive interval for a given manager.
def set_keepalive_timeout(self, timeout)
Change keepalive timeout for a given manager.
def tune(self, name, value)
Tune advanced parameters.
def submit(self, task)
Submit a task to the manager.
def install_library(self, task)
Submit a library to install on all connected workers.
def remove_library(self, name)
Remove a library from all connected workers.
def check_library_exists(self, library_name)
Check whether a library exists on the manager or not.
def create_library_from_functions(self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None, exec_mode='fork', library_context_info=None)
Turn a list of Python functions into a Library Task.
def create_library_from_serverized_files(self, library_name, library_path, env=None)
Turn Library code created with poncho_package_serverize into a Library Task.
def create_library_from_command(self, executable_path, name, env=None)
Create a Library task from arbitrary inputs.
def wait(self, timeout="wait_forever")
Wait for tasks to complete.
def wait_for_tag(self, tag, timeout="wait_forever")
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag.
def wait_for_task_id(self, task_id, timeout="wait_forever")
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id.
def application_info(self)
Should return a dictionary with information for the status display.
def map(self, fn, seq, chunksize=1)
Maps a function to elements in a sequence using TaskVine.
def pair(self, fn, seq1, seq2, chunksize=1, env=None)
Returns the values for a function of each pair from two sequences.
def tree_reduce(self, fn, seq, chunksize=2)
Reduces a sequence until only one value is left, and then returns that value.
def remote_map(self, fn, seq, library, name, chunksize=1)
Maps a function to elements in a sequence using TaskVine remote tasks.
def remote_pair(self, fn, seq1, seq2, library, name, chunksize=1)
Returns the values for a function of each pair from two sequences using remote tasks.
def remote_tree_reduce(self, fn, seq, library, name, chunksize=2)
Reduces a sequence until only one value is left, and then returns that value, using remote tasks.
def declare_file(self, path, cache=False, peer_transfer=True, unlink_when_done=False)
Declare a file obtained from the local filesystem.
def fetch_file(self, file)
Fetch file contents from the cluster or local disk.
def undeclare_file(self, file)
Un-declare a file that was created by declare_file or similar methods.
def undeclare_function(self, fn)
Remove the manager's local serialized copy of a function used with PythonTask.
def declare_temp(self)
Declare an anonymous file that has no initial content but is created as the output of a task and may be consumed by other tasks.
def declare_url(self, url, cache=False, peer_transfer=True)
Declare a file obtained from a remote URL.
def declare_buffer(self, buffer=None, cache=False, peer_transfer=True)
Declare a file created from a buffer in memory.
def declare_minitask(self, minitask, source, cache=False, peer_transfer=True)
Declare a file created by executing a mini-task.
def declare_untar(self, tarball, cache=False, peer_transfer=True)
Declare a file created by unpacking a tar file.
def declare_poncho(self, package, cache=False, peer_transfer=True)
Declare a file that sets up a poncho environment.
def declare_starch(self, starch, cache=False, peer_transfer=True)
Declare a file created by unpacking a starch package.
def declare_xrootd(self, source, proxy=None, env=None, cache=False, peer_transfer=True)
Declare a file accessible from an xrootd server.
def declare_chirp(self, server, source, ticket=None, env=None, cache=False, peer_transfer=True)
Declare a file accessible from a chirp server.
def log_txn_app(self, entry)
Adds a custom APPLICATION entry to the transactions log.
def log_debug_app(self, entry)
Adds a custom APPLICATION entry to the debug log.
def get_file_replica_count(self, file)
Gets the number of replicas of a file.
TaskVine Manager specialized to compute dask graphs.
Managers created via DaskVine can be used to execute dask graphs via the method ndcctools.taskvine.dask_executor.DaskVine.get as follows:
Parameters for execution can be set as arguments to the compute function. These arguments are applied to each task executed:
def ndcctools.taskvine.dask_executor.DaskVine.get(self,
    dsk,
    keys,
    *,
    environment=None,
    extra_files=None,
    worker_transfers=True,
    env_vars=None,
    checkpoint_fn=None,
    resources=None,
    resources_mode=None,
    submit_per_cycle=None,
    max_pending=None,
    retries=5,
    verbose=False,
    lib_extra_functions=None,
    lib_resources=None,
    lib_command=None,
    lib_modules=None,
    task_mode='tasks',
    scheduling_mode='FIFO',
    env_per_task=False,
    progress_disable=False,
    progress_label="[green]tasks",
    wrapper=None,
    wrapper_proc=print,
    prune_files=False,
    hoisting_modules=None,
    import_modules=None,
    lazy_transfers=True)
Execute the task graph dsk and return the results for the keys in the graph.
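To make the dsk and keys arguments concrete: in dask's classic graph format, dsk is a dict mapping keys to plain values or to task tuples of the form (callable, *args), where any argument naming another key is replaced by that key's result, and keys may be a nested list. Dask calls a scheduler as scheduler(dsk, keys, **kwargs), which is why options can be passed alongside. The hypothetical toy_get below is a purely local, sequential evaluator that illustrates this contract; it is not how DaskVine executes tasks, and newer dask versions may also use other task representations.

```python
from operator import add


def inc(x):
    return x + 1


def _is_task(expr):
    # Classic dask graph spec: a task is a tuple whose first element is callable.
    return isinstance(expr, tuple) and len(expr) > 0 and callable(expr[0])


def _evaluate(dsk, expr, cache):
    if _is_task(expr):
        fn, *args = expr
        return fn(*[_evaluate(dsk, arg, cache) for arg in args])
    if isinstance(expr, list):
        # Lists of arguments are evaluated element by element.
        return [_evaluate(dsk, item, cache) for item in expr]
    try:
        if expr in dsk:
            # An argument that names another key is replaced by that key's result.
            return _compute_key(dsk, expr, cache)
    except TypeError:
        pass  # unhashable literal (e.g. a dict): use it as-is
    return expr


def _compute_key(dsk, key, cache):
    # Memoize so that each key is computed at most once, even if it is shared.
    if key not in cache:
        cache[key] = _evaluate(dsk, dsk[key], cache)
    return cache[key]


def toy_get(dsk, keys, **kwargs):
    """Hypothetical sequential scheduler with the same call shape as a dask get.

    Extra keyword arguments are accepted and ignored in this sketch.
    """
    cache = {}

    def collect(k):
        # keys may be a (possibly nested) list; mirror its structure in the result.
        if isinstance(k, list):
            return [collect(item) for item in k]
        return _compute_key(dsk, k, cache)

    return collect(keys)


dsk = {"x": 1, "y": (inc, "x"), "z": (add, "x", "y")}
print(toy_get(dsk, ["z", ["y"]]))  # [3, [2]]
```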
dsk | The task graph to execute. |
keys | A possibly nested list of keys whose values are computed from dsk. |
environment | A TaskVine file representing an environment in which to run the tasks. |
extra_files | A dictionary of {taskvine.File: "remote_name"} to add to each task. |
worker_transfers | Whether to keep intermediate results only at workers (True, default) or to bring back each result to the manager (False). True is more IO efficient, but runs the risk of needing to recompute results if workers are lost. |
env_vars | A dictionary of VAR=VALUE environment variables to set per task. A value should be either a string, or a function that accepts as arguments the manager and task, and that returns a string. |
checkpoint_fn | When using worker_transfers, a predicate with arguments (dag, key) called before submitting a task. If it returns True, the result is brought back to the manager. |
resources | A dictionary with optional keys of cores, memory and disk (MB) to set maximum resource usage per task. |
lib_extra_functions | Additional functions to include in execution library (function-calls task_mode) |
lib_resources | A dictionary with optional keys of cores, memory and disk in MB (function-calls task_mode) |
lib_command | A command to be prefixed to the execution of a Library task (function-calls task_mode) |
lib_modules | Hoist these module imports for the execution library (function-calls task_mode) |
resources_mode | Automatically resize allocation per task. One of 'fixed' (use the value of 'resources' above), 'max throughput', 'max' (for maximum values seen), 'min_waste', 'greedy bucketing' or 'exhaustive bucketing'. This is done per function type in dsk. |
task_mode | Create tasks either as 'tasks' (using PythonTasks) or as 'function-calls' (using FunctionCalls). |
retries | Number of times to attempt a task. Default is 5. |
submit_per_cycle | Maximum number of tasks to submit to scheduler at once. If None, or less than 1, then all tasks are submitted as they are available. |
max_pending | Maximum number of submitted tasks that may still be waiting for a result; while this many are pending, no new tasks are submitted to the scheduler. If None, or less than 1, then no limit is set. |
verbose | If True, emit additional debugging information. |
env_per_task | If True, each task individually expands its own environment. In that case, the environment option must be given as a str. |
progress_disable | If True, disable the progress bar. |
progress_label | Label to use in progress bar |
wrapper | Function to wrap dask calls. It should take as arguments (key, fn, *args). It should execute fn(*args) at some point during its execution to produce the dask task result. Should return a tuple of (wrapper result, dask call result). Use for debugging. |
wrapper_proc | Function to process results from wrapper on completion. (default is print) |
prune_files | If True, remove files from the cluster after they are no longer needed. |
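The env_vars entry allows each value to be either a string or a callable that receives the manager and the task and returns a string. The hypothetical resolve_env_vars helper below only illustrates that documented contract, using stand-in objects instead of a real manager and task; the variable names and the task_label function are illustrative assumptions, and task_label assumes the task object exposes an id attribute (true of the stand-in here).

```python
from types import SimpleNamespace


def resolve_env_vars(env_vars, manager, task):
    """Hypothetical helper: turn an env_vars dict into plain VAR -> str pairs.

    Each value is either a str or a callable taking (manager, task) and
    returning a str, as described for the env_vars parameter.
    """
    resolved = {}
    for var, value in env_vars.items():
        if callable(value):
            value = value(manager, task)
        if not isinstance(value, str):
            raise TypeError(f"env var {var!r} must resolve to a str, got {type(value).__name__}")
        resolved[var] = value
    return resolved


def task_label(manager, task):
    # Assumption: the task exposes an `id` attribute, as the stand-in below does.
    return f"task-{task.id}"


env_vars = {"OMP_NUM_THREADS": "1", "TASK_LABEL": task_label}

# Stand-ins for a manager and a task, for demonstration only.
fake_manager = SimpleNamespace()
fake_task = SimpleNamespace(id=42)
print(resolve_env_vars(env_vars, fake_manager, fake_task))
# {'OMP_NUM_THREADS': '1', 'TASK_LABEL': 'task-42'}
```

With a real manager, a dict such as env_vars above would simply be passed to get as env_vars=env_vars.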
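checkpoint_fn is a predicate called as checkpoint_fn(dag, key) before a task is submitted; when it returns True, that result is brought back to the manager. The sketch below decides from the key alone and ignores dag, whose type is not described here. It assumes dask's usual key shapes (a string, or a tuple whose first element is a name string such as ("sum-abc123", 0)); the "sum-" prefix is a hypothetical choice.

```python
def checkpoint_fn(dag, key):
    """Checkpoint results of keys whose name starts with a chosen prefix.

    `dag` is accepted to match the documented (dag, key) signature but is unused here.
    """
    # Dask keys are commonly a str, or a tuple whose first element is the name.
    name = key[0] if isinstance(key, tuple) and key else key
    return isinstance(name, str) and name.startswith("sum-")


print(checkpoint_fn(None, ("sum-abc123", 0)))  # True
print(checkpoint_fn(None, "inc-xyz"))          # False
```

Since the predicate only matters when worker_transfers is enabled, it would be passed together with worker_transfers=True, for example as arguments to dask's compute call with a DaskVine manager's get as the scheduler.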
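One reading of submit_per_cycle and max_pending is as two caps on how many ready tasks are handed to the scheduler in one cycle, where None or a value below 1 means "no limit". The hypothetical tasks_to_submit function below encodes that reading; it is an interpretation of the parameter descriptions, not DaskVine's actual submission loop.

```python
def tasks_to_submit(ready, pending, submit_per_cycle=None, max_pending=None):
    """How many of `ready` tasks to submit this cycle (hypothetical helper).

    ready: tasks whose inputs are available but that have not been submitted.
    pending: tasks already submitted that have not yet produced a result.
    """
    count = ready
    if submit_per_cycle is not None and submit_per_cycle >= 1:
        count = min(count, submit_per_cycle)
    if max_pending is not None and max_pending >= 1:
        # Never let the number of outstanding tasks exceed max_pending.
        count = min(count, max(0, max_pending - pending))
    return count


print(tasks_to_submit(10, 0))                      # 10 (no limits)
print(tasks_to_submit(10, 0, submit_per_cycle=3))  # 3
print(tasks_to_submit(10, 5, max_pending=8))       # 3
print(tasks_to_submit(10, 8, max_pending=8))       # 0
```

Under this reading, submit_per_cycle bounds the size of each burst, while max_pending bounds how far submissions can run ahead of completed results.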
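The wrapper entry describes a function called as wrapper(key, fn, *args) that must run fn(*args) itself and return a (wrapper result, dask call result) tuple; wrapper_proc then processes the wrapper result on completion. A minimal timing wrapper following that contract might look like this. The timing_wrapper and report_timing names are hypothetical, and the final lines call them locally rather than through DaskVine.

```python
import time


def timing_wrapper(key, fn, *args):
    """Time one dask call; returns (wrapper result, dask call result)."""
    start = time.perf_counter()
    result = fn(*args)  # the wrapper must run the original call itself
    elapsed = time.perf_counter() - start
    return {"key": key, "seconds": elapsed}, result


def report_timing(info):
    # Candidate wrapper_proc: receives the first element of the wrapper's tuple.
    print(f"{info['key']}: {info['seconds']:.6f} s")


info, value = timing_wrapper(("sum-abc", 0), sum, [1, 2, 3])
report_timing(info)
print(value)  # 6
```

Passed as wrapper=timing_wrapper and wrapper_proc=report_timing, this would report per-key timings while leaving the dask results unchanged.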