cctools
|
Public Member Functions | |
def | __init__ (self, port=cvine.VINE_DEFAULT_PORT, name=None, shutdown=False, run_info_path="vine-run-info", staging_path=None, ssl=None, init_fn=None, status_display_interval=None) |
Create a new manager. More... | |
def | name (self) |
Get the project name of the manager. More... | |
def | port (self) |
Get the listening port of the manager. More... | |
def | using_ssl (self) |
Whether the manager is using ssl to talk to workers. More... | |
def | logging_directory (self) |
Get the logs directory of the manager. More... | |
def | staging_directory (self) |
Get the staging directory of the manager. More... | |
def | library_logging_directory (self) |
Get the library logs directory of the manager. More... | |
def | cache_directory (self) |
Get the caching directory of the manager. More... | |
def | stats (self) |
Get manager statistics. More... | |
def | stats_category (self, category) |
Get the task statistics for the given category. More... | |
def | status (self, request) |
Get manager information as list of dictionaries. More... | |
def | summarize_workers (self) |
Get resource statistics of workers connected. More... | |
def | update_catalog (self) |
Send update to catalog server. More... | |
def | set_category_mode (self, category, mode) |
Turn on or off first-allocation labeling for a given category. More... | |
def | set_category_autolabel_resource (self, category, resource, autolabel) |
Turn on or off first-allocation labeling for a given category and resource. More... | |
def | task_state (self, task_id) |
Get current task state. More... | |
def | enable_monitoring (self, watchdog=True, time_series=False) |
Enables resource monitoring for tasks. More... | |
def | enable_peer_transfers (self) |
Enable P2P worker transfer functionality. More... | |
def | disable_peer_transfers (self) |
Disable P2P worker transfer functionality. More... | |
def | enable_disconnect_slow_workers (self, multiplier) |
Enable disconnect slow workers functionality for a given manager. More... | |
def | enable_disconnect_slow_workers_category (self, name, multiplier) |
Enable disconnect slow workers functionality for a given category. More... | |
def | set_draining_by_hostname (self, hostname, drain_mode=True) |
Turn on or off draining mode for workers at hostname. More... | |
def | empty (self) |
Determine whether there are any known tasks queued, running, or waiting to be collected. More... | |
def | hungry (self) |
Determine whether the manager can support more tasks. More... | |
def | set_scheduler (self, scheduler) |
Set the worker selection scheduler for manager. More... | |
def | set_name (self, name) |
Change the project name for the given manager. More... | |
def | set_manager_preferred_connection (self, mode) |
Set the preference for using hostname over IP address to connect. More... | |
def | set_min_task_id (self, minid) |
Set the minimum task_id of future submitted tasks. More... | |
def | set_priority (self, priority) |
Change the project priority for the given manager. More... | |
def | tasks_left_count (self, ntasks) |
Specify the number of tasks not yet submitted to the manager. More... | |
def | set_catalog_servers (self, catalogs) |
Specify the catalog servers the manager should report to. More... | |
def | set_property (self, name, value) |
Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations. More... | |
def | set_runtime_info_path (self, dirname) |
Specify a directory to write logs and staging files. More... | |
def | set_password (self, password) |
Add a mandatory password that each worker must present. More... | |
def | set_password_file (self, file) |
Add a mandatory password file that each worker must present. More... | |
def | set_resources_max (self, rmd) |
Specifies the maximum resources allowed for the default category. More... | |
def | set_resources_min (self, rmd) |
Specifies the minimum resources allowed for the default category. More... | |
def | set_category_resources_max (self, category, rmd) |
Specifies the maximum resources allowed for the given category. More... | |
def | set_category_resources_min (self, category, rmd) |
Specifies the minimum resources allowed for the given category. More... | |
def | set_category_first_allocation_guess (self, category, rmd) |
Specifies the first-allocation guess for the given category. More... | |
def | set_category_max_concurrent (self, category, max_concurrent) |
Specifies the maximum number of tasks of the given category that may run concurrently. More... | |
def | initialize_categories (self, filename, rm) |
Initialize first value of categories. More... | |
def | cancel_by_task_id (self, id) |
Cancel task identified by its task_id. More... | |
def | cancel_by_task_tag (self, tag) |
Cancel task identified by its tag. More... | |
def | cancel_by_category (self, category) |
Cancel all tasks of the given category. More... | |
def | cancel_all (self) |
Cancel all tasks. More... | |
def | workers_shutdown (self, n=0) |
Shutdown workers connected to manager. More... | |
def | block_host (self, host) |
Block workers running on host from working for the manager. More... | |
def | blacklist (self, host) |
Replaced by ndcctools.taskvine.manager.Manager.block_host. More... | |
def | block_host_with_timeout (self, host, timeout) |
Block workers running on host for the duration of the given timeout. More... | |
def | blacklist_with_timeout (self, host, timeout) |
See ndcctools.taskvine.manager.Manager.block_host_with_timeout. More... | |
def | unblock_host (self, host=None) |
Unblock the given host, or all hosts if host is not given. More... | |
def | blacklist_clear (self, host=None) |
See ndcctools.taskvine.manager.Manager.unblock_host. More... | |
def | set_keepalive_interval (self, interval) |
Change keepalive interval for a given manager. More... | |
def | set_keepalive_timeout (self, timeout) |
Change keepalive timeout for a given manager. More... | |
def | tune (self, name, value) |
Tune advanced parameters. More... | |
def | submit (self, task) |
Submit a task to the manager. More... | |
def | install_library (self, task) |
Submit a library to install on all connected workers. More... | |
def | remove_library (self, name) |
Remove a library from all connected workers. More... | |
def | check_library_exists (self, library_name) |
Check whether a library exists on the manager or not. More... | |
def | create_library_from_functions (self, library_name, *function_list, poncho_env=None, init_command=None, add_env=True, hoisting_modules=None) |
Turn a list of python functions into a Library Task. More... | |
def | create_library_from_serverized_files (self, library_name, library_path, env=None) |
Turn Library code created with poncho_package_serverize into a Library Task. More... | |
def | create_library_from_command (self, executable_path, name, env=None) |
Create a Library task from arbitrary inputs. More... | |
def | wait (self, timeout="wait_forever") |
Wait for tasks to complete. More... | |
def | wait_for_tag (self, tag, timeout="wait_forever") |
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag. More... | |
def | wait_for_task_id (self, task_id, timeout="wait_forever") |
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id. More... | |
def | application_info (self) |
Should return a dictionary with information for the status display. More... | |
def | map (self, fn, seq, chunksize=1) |
Maps a function to elements in a sequence using taskvine. More... | |
def | pair (self, fn, seq1, seq2, chunksize=1, env=None) |
Returns the values for a function of each pair from 2 sequences. More... | |
def | tree_reduce (self, fn, seq, chunksize=2) |
Reduces a sequence until only one value is left, and then returns that value. More... | |
def | remote_map (self, fn, seq, library, name, chunksize=1) |
Maps a function to elements in a sequence using taskvine remote task. More... | |
def | remote_pair (self, fn, seq1, seq2, library, name, chunksize=1) |
Returns the values for a function of each pair from 2 sequences using remote task. More... | |
def | remote_tree_reduce (self, fn, seq, library, name, chunksize=2) |
Reduces a sequence until only one value is left, and then returns that value. More... | |
def | declare_file (self, path, cache=False, peer_transfer=True, unlink_when_done=False) |
Declare a file obtained from the local filesystem. More... | |
def | fetch_file (self, file) |
Fetch file contents from the cluster or local disk. More... | |
def | undeclare_file (self, file) |
Un-declare a file that was created by declare_file or similar methods. More... | |
def | undeclare_function (self, fn) |
Remove the manager's local serialized copy of a function used with PythonTask. More... | |
def | declare_temp (self) |
Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks. More... | |
def | declare_url (self, url, cache=False, peer_transfer=True) |
Declare a file obtained from a remote URL. More... | |
def | declare_buffer (self, buffer=None, cache=False, peer_transfer=True) |
Declare a file created from a buffer in memory. More... | |
def | declare_minitask (self, minitask, source, cache=False, peer_transfer=True) |
Declare a file created by executing a mini-task. More... | |
def | declare_untar (self, tarball, cache=False, peer_transfer=True) |
Declare a file created by unpacking a tar file. More... | |
def | declare_poncho (self, package, cache=False, peer_transfer=True) |
Declare a file that sets up a poncho environment. More... | |
def | declare_starch (self, starch, cache=False, peer_transfer=True) |
Declare a file created by unpacking a starch package. More... | |
def | declare_xrootd (self, source, proxy=None, env=None, cache=False, peer_transfer=True) |
Declare a file accessible from an xrootd server. More... | |
def | declare_chirp (self, server, source, ticket=None, env=None, cache=False, peer_transfer=True) |
Declare a file accessible from a chirp server. More... | |
def | log_txn_app (self, entry) |
Adds a custom APPLICATION entry to the transactions log. More... | |
def | log_debug_app (self, entry) |
Adds a custom APPLICATION entry to the debug log. More... | |
def ndcctools.taskvine.manager.Manager.__init__ | ( | self, | |
port = cvine.VINE_DEFAULT_PORT , |
|||
name = None , |
|||
shutdown = False , |
|||
run_info_path = "vine-run-info" , |
|||
staging_path = None , |
|||
ssl = None , |
|||
init_fn = None , |
|||
status_display_interval = None |
|||
) |
Create a new manager.
self | Reference to the current manager object. |
port | The port number to listen on. If zero, then a random port is chosen. A range of possible ports (low, high) can also be specified instead of a single integer. Default is 9123 |
name | The project name to use. |
shutdown | Automatically shutdown workers when manager is finished. Disabled by default. |
run_info_path | Directory to write log (and staging if staging_path not given) files per run. If None, defaults to "vine-run-info" |
staging_path | Directory to write temporary files. Defaults to run_info_path if not given. |
ssl | A tuple of filenames (ssl_key, ssl_cert) in pem format, or True. If not given, then TLS is not activated. If True, a self-signed temporary key and cert are generated. |
init_fn | Function applied to the newly created manager at initialization. |
status_display_interval | Number of seconds between updates to the jupyter status display. None, or a value less than 1, disables it. |
def ndcctools.taskvine.manager.Manager.name | ( | self | ) |
Get the project name of the manager.
def ndcctools.taskvine.manager.Manager.port | ( | self | ) |
Get the listening port of the manager.
def ndcctools.taskvine.manager.Manager.using_ssl | ( | self | ) |
Whether the manager is using ssl to talk to workers.
def ndcctools.taskvine.manager.Manager.logging_directory | ( | self | ) |
Get the logs directory of the manager.
def ndcctools.taskvine.manager.Manager.staging_directory | ( | self | ) |
Get the staging directory of the manager.
def ndcctools.taskvine.manager.Manager.library_logging_directory | ( | self | ) |
Get the library logs directory of the manager.
def ndcctools.taskvine.manager.Manager.cache_directory | ( | self | ) |
Get the caching directory of the manager.
def ndcctools.taskvine.manager.Manager.stats | ( | self | ) |
Get manager statistics.
The fields in ndcctools.taskvine.manager.Manager.stats can also be individually accessed through this call. For example:
def ndcctools.taskvine.manager.Manager.stats_category | ( | self, | |
category | |||
) |
Get the task statistics for the given category.
self | Reference to the current manager object. |
category | A category name. For example: >>> s = q.stats_category("my_category")
>>> print(s)
>>> print(s.tasks_waiting)
|
def ndcctools.taskvine.manager.Manager.status | ( | self, | |
request | |||
) |
Get manager information as list of dictionaries.
self | Reference to the current manager object |
request | One of: "manager", "tasks", "workers", or "categories" For example: import json
tasks_info = q.status("tasks")
|
def ndcctools.taskvine.manager.Manager.summarize_workers | ( | self | ) |
Get resource statistics of workers connected.
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.update_catalog | ( | self | ) |
Send update to catalog server.
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.set_category_mode | ( | self, | |
category, | |||
mode | |||
) |
Turn on or off first-allocation labeling for a given category.
By default, only cores, memory, and disk are labeled, and gpus are unlabeled. NOTE: autolabeling is only meaningful when task monitoring is enabled (ndcctools.taskvine.manager.Manager.enable_monitoring). When monitoring is enabled and a task exhausts resources in a worker, mode dictates how taskvine handles the exhaustion:
self | Reference to the current manager object. |
category | A category name. If None, sets the mode by default for newly created categories. |
mode | One of:
|
def ndcctools.taskvine.manager.Manager.set_category_autolabel_resource | ( | self, | |
category, | |||
resource, | |||
autolabel | |||
) |
Turn on or off first-allocation labeling for a given category and resource.
This function should be used to fine-tune the defaults from ndcctools.taskvine.manager.Manager.set_category_mode.
self | Reference to the current manager object. |
category | A category name. |
resource | A resource name. |
autolabel | True/False for on/off. |
def ndcctools.taskvine.manager.Manager.task_state | ( | self, | |
task_id | |||
) |
Get current task state.
See vine_task_state_t for possible values.
def ndcctools.taskvine.manager.Manager.enable_monitoring | ( | self, | |
watchdog = True , |
|||
time_series = False |
|||
) |
Enables resource monitoring for tasks.
The resources measured are available in the resources_measured member of the respective vine_task.
self | Reference to the current manager object. |
watchdog | If not 0, kill tasks that exhaust declared resources. |
time_series | If not 0, generate a time series of resources per task in VINE_RUNTIME_INFO_DIR/vine-logs/time-series/ (WARNING: for long running tasks these files may reach gigabyte sizes. This function is mostly used for debugging.) |
Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).
def ndcctools.taskvine.manager.Manager.enable_peer_transfers | ( | self | ) |
Enable P2P worker transfer functionality.
On by default
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.disable_peer_transfers | ( | self | ) |
Disable P2P worker transfer functionality.
On by default
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.enable_disconnect_slow_workers | ( | self, | |
multiplier | |||
) |
Enable disconnect slow workers functionality for a given manager.
Applies to tasks in the "default" category, and to tasks whose category does not set an explicit multiplier.
self | Reference to the current manager object. |
multiplier | The multiplier of the average task time at which point to disconnect a worker; if less than 1, it is disabled (default). |
def ndcctools.taskvine.manager.Manager.enable_disconnect_slow_workers_category | ( | self, | |
name, | |||
multiplier | |||
) |
Enable disconnect slow workers functionality for a given category.
self | Reference to the current manager object. |
name | Name of the category. |
multiplier | The multiplier of the average task time at which point to disconnect a worker; disabled if less than one (see ndcctools.taskvine.manager.Manager.enable_disconnect_slow_workers) |
def ndcctools.taskvine.manager.Manager.set_draining_by_hostname | ( | self, | |
hostname, | |||
drain_mode = True |
|||
) |
Turn on or off draining mode for workers at hostname.
self | Reference to the current manager object. |
hostname | The hostname of the host running the workers. |
drain_mode | If True, no new tasks are dispatched to workers at hostname, and empty workers are shut down. Otherwise, workers work as usual. |
def ndcctools.taskvine.manager.Manager.empty | ( | self | ) |
Determine whether there are any known tasks queued, running, or waiting to be collected.
Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.hungry | ( | self | ) |
Determine whether the manager can support more tasks.
Returns the number of additional tasks it can support if "hungry" and 0 if "sated".
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.set_scheduler | ( | self, | |
scheduler | |||
) |
Set the worker selection scheduler for manager.
self | Reference to the current manager object. |
scheduler | One of the following schedulers to use in assigning a task to a worker. See vine_schedule_t for possible values. |
def ndcctools.taskvine.manager.Manager.set_name | ( | self, | |
name | |||
) |
Change the project name for the given manager.
self | Reference to the current manager object. |
name | The new project name. |
def ndcctools.taskvine.manager.Manager.set_manager_preferred_connection | ( | self, | |
mode | |||
) |
Set the preference for using hostname over IP address to connect.
'by_ip' uses IP addresses from the network interfaces of the manager (standard behavior), 'by_hostname' to use the hostname at the manager, or 'by_apparent_ip' to use the address of the manager as seen by the catalog server.
self | Reference to the current manager object. |
mode | A string to indicate using 'by_ip', 'by_hostname' or 'by_apparent_ip'. |
def ndcctools.taskvine.manager.Manager.set_min_task_id | ( | self, | |
minid | |||
) |
Set the minimum task_id of future submitted tasks.
Further submitted tasks are guaranteed to have a task_id larger or equal to minid. This function is useful to make task_ids consistent in a workflow that consists of sequential managers. (Note: This function is rarely used). If the minimum id provided is smaller than the last task_id computed, the minimum id provided is ignored.
self | Reference to the current manager object. |
minid | Minimum desired task_id |
def ndcctools.taskvine.manager.Manager.set_priority | ( | self, | |
priority | |||
) |
Change the project priority for the given manager.
self | Reference to the current manager object. |
priority | An integer that represents the priority of this manager. The higher the value, the higher the priority. |
def ndcctools.taskvine.manager.Manager.tasks_left_count | ( | self, | |
ntasks | |||
) |
Specify the number of tasks not yet submitted to the manager.
It is used by vine_factory to determine the number of workers to launch. If not specified, it defaults to 0. vine_factory considers the number of tasks as: num tasks left + num tasks running + num tasks read.
self | Reference to the current manager object. |
ntasks | Number of tasks yet to be submitted. |
def ndcctools.taskvine.manager.Manager.set_catalog_servers | ( | self, | |
catalogs | |||
) |
Specify the catalog servers the manager should report to.
self | Reference to the current manager object. |
catalogs | The catalog servers given as a comma delimited list of hostnames or hostname:port |
def ndcctools.taskvine.manager.Manager.set_property | ( | self, | |
name, | |||
value | |||
) |
Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations.
This is helpful for distinguishing higher level information about the entire run, such as the name of the framework being used, or the logical name of the dataset being processed.
self | Reference to the current manager object. |
name | The name of the property. |
value | The value of the property. |
def ndcctools.taskvine.manager.Manager.set_runtime_info_path | ( | self, | |
dirname | |||
) |
Specify a directory to write logs and staging files.
self | Reference to the current manager object. |
dirname | A directory name |
def ndcctools.taskvine.manager.Manager.set_password | ( | self, | |
password | |||
) |
Add a mandatory password that each worker must present.
self | Reference to the current manager object. |
password | The password. |
def ndcctools.taskvine.manager.Manager.set_password_file | ( | self, | |
file | |||
) |
Add a mandatory password file that each worker must present.
self | Reference to the current manager object. |
file | Name of the file containing the password. |
def ndcctools.taskvine.manager.Manager.set_resources_max | ( | self, | |
rmd | |||
) |
Specifies the maximum resources allowed for the default category.
self | Reference to the current manager object. |
rmd | Dictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example: >>> # A maximum of 4 cores is found on any worker:
>>> q.set_resources_max({'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB disk are found on any worker:
>>> q.set_resources_max({'cores': 8, 'memory': 1024, 'disk': 10240})
|
def ndcctools.taskvine.manager.Manager.set_resources_min | ( | self, | |
rmd | |||
) |
Specifies the minimum resources allowed for the default category.
self | Reference to the current manager object. |
rmd | Dictionary indicating minimum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example: >>> # A minimum of 2 cores is found on any worker:
>>> q.set_resources_min({'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.set_resources_min({'cores': 4, 'memory': 512, 'disk': 1024})
|
def ndcctools.taskvine.manager.Manager.set_category_resources_max | ( | self, | |
category, | |||
rmd | |||
) |
Specifies the maximum resources allowed for the given category.
self | Reference to the current manager object. |
category | Name of the category. |
rmd | Dictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example: >>> # A maximum of 4 cores may be used by a task in the category:
>>> q.set_category_resources_max("my_category", {'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB of disk may be used by a task:
>>> q.set_category_resources_max("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})
|
def ndcctools.taskvine.manager.Manager.set_category_resources_min | ( | self, | |
category, | |||
rmd | |||
) |
Specifies the minimum resources allowed for the given category.
self | Reference to the current manager object. |
category | Name of the category. |
rmd | Dictionary indicating minimum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example: >>> # A minimum of 2 cores is found on any worker:
>>> q.set_category_resources_min("my_category", {'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.set_category_resources_min("my_category", {'cores': 4, 'memory': 512, 'disk': 1024})
|
def ndcctools.taskvine.manager.Manager.set_category_first_allocation_guess | ( | self, | |
category, | |||
rmd | |||
) |
Specifies the first-allocation guess for the given category.
self | Reference to the current manager object. |
category | Name of the category. |
rmd | Dictionary indicating the first-allocation values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example: >>> # Tasks are first tried with 4 cores:
>>> q.set_category_first_allocation_guess("my_category", {'cores': 4})
>>> # Tasks are first tried with 8 cores, 1GB of memory, and 10GB of disk:
>>> q.set_category_first_allocation_guess("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})
|
def ndcctools.taskvine.manager.Manager.set_category_max_concurrent | ( | self, | |
category, | |||
max_concurrent | |||
) |
Specifies the maximum number of tasks of the given category that may run concurrently.
self | Reference to the current manager object. |
category | Name of the category. |
max_concurrent | Maximum number of concurrent tasks. Less than 0 means unlimited (this is the default). For example: >>> # Do not run more than 5 tasks of "my_category" concurrently:
>>> q.set_category_max_concurrent("my_category", 5)
|
def ndcctools.taskvine.manager.Manager.initialize_categories | ( | self, | |
filename, | |||
rm | |||
) |
Initialize first value of categories.
self | Reference to the current manager object. |
rm | Dictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. |
filename | JSON file with resource summaries. |
def ndcctools.taskvine.manager.Manager.cancel_by_task_id | ( | self, | |
id | |||
) |
Cancel task identified by its task_id.
The cancelled task will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.
self | Reference to the current manager object. |
id | The task_id returned from ndcctools.taskvine.manager.Manager.submit. |
def ndcctools.taskvine.manager.Manager.cancel_by_task_tag | ( | self, | |
tag | |||
) |
Cancel task identified by its tag.
The cancelled task will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.
self | Reference to the current manager object. |
tag | The tag assigned to task using ndcctools.taskvine.task.Task.set_tag. |
def ndcctools.taskvine.manager.Manager.cancel_by_category | ( | self, | |
category | |||
) |
Cancel all tasks of the given category.
The cancelled tasks will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.
self | Reference to the current manager object. |
category | The name of the category to cancel. |
def ndcctools.taskvine.manager.Manager.cancel_all | ( | self | ) |
Cancel all tasks.
The cancelled tasks will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.
self | Reference to the current manager object. |
def ndcctools.taskvine.manager.Manager.workers_shutdown | ( | self, | |
n = 0 |
|||
) |
Shutdown workers connected to manager.
Gives a best effort and then returns the number of workers given the shutdown order.
self | Reference to the current manager object. |
n | The number of workers to shut down. If 0, all workers are shut down. |
def ndcctools.taskvine.manager.Manager.block_host | ( | self, | |
host | |||
) |
Block workers running on host from working for the manager.
self | Reference to the current manager object. |
host | The hostname of the host running the workers. |
def ndcctools.taskvine.manager.Manager.blacklist | ( | self, | |
host | |||
) |
Replaced by ndcctools.taskvine.manager.Manager.block_host.
def ndcctools.taskvine.manager.Manager.block_host_with_timeout | ( | self, | |
host, | |||
timeout | |||
) |
Block workers running on host for the duration of the given timeout.
self | Reference to the current manager object. |
host | The hostname of the host running the workers. |
timeout | How long this block entry lasts (in seconds). If less than 1, block indefinitely. |
def ndcctools.taskvine.manager.Manager.blacklist_with_timeout | ( | self, | |
host, | |||
timeout | |||
) |
See ndcctools.taskvine.manager.Manager.block_host_with_timeout.
def ndcctools.taskvine.manager.Manager.unblock_host | ( | self, | |
host = None |
|||
) |
Unblock the given host, or all hosts if host is not given.
self | Reference to the current manager object. |
host | The hostname of the host to unblock. |
def ndcctools.taskvine.manager.Manager.blacklist_clear | ( | self, | |
host = None |
|||
) |
See ndcctools.taskvine.manager.Manager.unblock_host.
def ndcctools.taskvine.manager.Manager.set_keepalive_interval | ( | self, | |
interval | |||
) |
Change keepalive interval for a given manager.
self | Reference to the current manager object. |
interval | Minimum number of seconds to wait before sending new keepalive checks to workers. |
def ndcctools.taskvine.manager.Manager.set_keepalive_timeout | ( | self, | |
timeout | |||
) |
Change keepalive timeout for a given manager.
self | Reference to the current manager object. |
timeout | Minimum number of seconds to wait for a keepalive response from worker before marking it as dead. |
def ndcctools.taskvine.manager.Manager.tune | ( | self, | |
name, | |||
value | |||
) |
Tune advanced parameters.
self | Reference to the current manager object. |
name | The name of the parameter to tune. Can be one of the following:
|
value | The value to set the parameter to. |
def ndcctools.taskvine.manager.Manager.submit | ( | self, | |
task | |||
) |
Submit a task to the manager.
It is safe to re-submit a task returned by ndcctools.taskvine.manager.Manager.wait.
self | Reference to the current manager object. |
task | A task description created from ndcctools.taskvine.task.Task. |
def ndcctools.taskvine.manager.Manager.install_library | ( | self, | |
task | |||
) |
Submit a library to install on all connected workers.
self | Reference to the current manager object. |
task | A Library Task description created from create_library_from_functions or create_library_from_serverized_files |
def ndcctools.taskvine.manager.Manager.remove_library | ( | self, | |
name | |||
) |
Remove a library from all connected workers.
self | Reference to the current manager object. |
name | Name of the library to be removed. |
def ndcctools.taskvine.manager.Manager.check_library_exists | ( | self, | |
library_name | |||
) |
Check whether a library exists on the manager or not.
self | Reference to the current manager object. |
library_name | Name of the library to be checked |
def ndcctools.taskvine.manager.Manager.create_library_from_functions | ( | self, | |
library_name, | |||
* | function_list, | ||
poncho_env = None , |
|||
init_command = None , |
|||
add_env = True , |
|||
hoisting_modules = None |
|||
) |
Turn a list of python functions into a Library Task.
This Library Task will be run on a worker as a regular task. Note that functions are required to have source code available, so dynamically generated functions won't work (e.g., lambda, interactive).
self | Reference to the current manager object. |
library_name | Name of the Library to be created |
function_list | List of all functions to be included in the library |
poncho_env | Name of an already prepared poncho environment |
init_command | A string describing a shell command to execute before the library task is run |
add_env | Whether to automatically create and/or add environment to the library |
hoisting_modules | A list of modules imported at the preamble of library, including packages, functions and classes. |
def ndcctools.taskvine.manager.Manager.create_library_from_serverized_files | ( | self, | |
library_name, | |||
library_path, | |||
env = None |
|||
) |
Turn Library code created with poncho_package_serverize into a Library Task.
self | Reference to the current manager object. |
library_name | Name that identifies this library to the FunctionCalls |
library_path | Filename of the library (i.e., the output of poncho_package_serverize) |
env | Environment to run the library. Either a vine file that expands to an environment (see ndcctools.taskvine.task.Task.add_environment), or a path to a poncho environment. |
def ndcctools.taskvine.manager.Manager.create_library_from_command | ( | self, | |
executable_path, | |||
name, | |||
env = None |
|||
) |
Create a Library task from arbitrary inputs.
self | Reference to the current manager object |
executable_path | Filename of the library executable |
name | Name of the library to be created |
env | Environment to run the library. Either a vine file that expands to an environment (see ndcctools.taskvine.task.Task.add_environment), or a path to a poncho environment. |
def ndcctools.taskvine.manager.Manager.wait | ( | self, | |
timeout = "wait_forever" |
|||
) |
Wait for tasks to complete.
This call will block until a task has completed or the timeout has elapsed.
self | Reference to the current manager object. |
timeout | The number of seconds to wait for a completed task before returning. Use an integer to set the timeout or the value "wait_forever" to block until a task has completed. If 0, return immediately with a completed task if one is available, or None otherwise. |
def ndcctools.taskvine.manager.Manager.wait_for_tag | ( | self, | |
tag, | |||
timeout = "wait_forever" |
|||
) |
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag.
This call will block until a task with the specified tag has completed or the timeout has elapsed.
self | Reference to the current manager object. |
tag | Desired tag. If None, then it is equivalent to self.wait(timeout) |
timeout | The number of seconds to wait for a completed task before returning. If 0, return immediately with a completed task if one is available, or None otherwise. |
def ndcctools.taskvine.manager.Manager.wait_for_task_id | ( | self, | |
task_id, | |||
timeout = "wait_forever" |
|||
) |
Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id.
This call will block until the task with the specified task_id has completed or the timeout has elapsed.
self | Reference to the current manager object. |
task_id | Desired task_id. If -1, then it is equivalent to self.wait(timeout) |
timeout | The number of seconds to wait for a completed task before returning. If 0, return immediately with a completed task if one is available, or None otherwise. |
def ndcctools.taskvine.manager.Manager.application_info | ( | self | ) |
Should return a dictionary with information for the status display.
This method is meant to be overridden by custom applications.
The dictionary should be of the form:
{ "application_info" : {"values" : dict, "units" : dict} }
where "units" is an optional dictionary that indicates the units of the corresponding key in "values".
self | Reference to the current manager object. |
For example:
def ndcctools.taskvine.manager.Manager.map | ( | self, | |
fn, | |||
seq, | |||
chunksize = 1 |
|||
) |
Maps a function to elements in a sequence using taskvine.
Similar to regular map function in python
self | Reference to the current manager object. |
fn | The function that will be called on each element |
seq | The sequence whose elements are passed to the function |
chunksize | The number of elements to process at once |
def ndcctools.taskvine.manager.Manager.pair | ( | self, | |
fn, | |||
seq1, | |||
seq2, | |||
chunksize = 1 , |
|||
env = None |
|||
) |
Returns the values for a function of each pair from 2 sequences.
The pairs that are passed into the function are generated by itertools
self | Reference to the current manager object. |
fn | The function that will be called on each element |
seq1 | The first seq that will be used to generate pairs |
seq2 | The second seq that will be used to generate pairs |
chunksize | Number of pairs to process at once (default is 1) |
env | Filename of a python environment tarball (conda or poncho) |
def ndcctools.taskvine.manager.Manager.tree_reduce | ( | self, | |
fn, | |||
seq, | |||
chunksize = 2 |
|||
) |
Reduces a sequence until only one value is left, and then returns that value.
The sequence is reduced by passing a pair of elements into a function and storing the result. It then makes a sequence from the results, and reduces again until one value is left.
If the sequence has an odd length, the last element gets reduced at the end.
self | Reference to the current manager object. |
fn | The function that will be called on each element |
seq | The seq that will be reduced |
chunksize | The number of elements per Task (for tree reduce, must be greater than 1) |
def ndcctools.taskvine.manager.Manager.remote_map | ( | self, | |
fn, | |||
seq, | |||
library, | |||
name, | |||
chunksize = 1 |
|||
) |
Maps a function to elements in a sequence using taskvine remote task.
Similar to regular map function in python, but creates a task to execute each function on a worker running a library
self | Reference to the current manager object. |
fn | The function that will be called on each element. This function exists in library. |
seq | The sequence that will call the function |
library | The name of the library that contains the function fn. |
name | This defines the key in the event json that wraps the data sent to the library. |
chunksize | The number of elements to process at once |
def ndcctools.taskvine.manager.Manager.remote_pair | ( | self, | |
fn, | |||
seq1, | |||
seq2, | |||
library, | |||
name, | |||
chunksize = 1 |
|||
) |
Returns the values for a function of each pair from 2 sequences using remote task.
The pairs that are passed into the function are generated by itertools
self | Reference to the current manager object. |
fn | The function that will be called on each element. This function exists in library. |
seq1 | The first seq that will be used to generate pairs |
seq2 | The second seq that will be used to generate pairs |
library | The name of the library that contains the function fn. |
name | This defines the key in the event json that wraps the data sent to the library. |
chunksize | The number of elements to process at once |
def ndcctools.taskvine.manager.Manager.remote_tree_reduce | ( | self, | |
fn, | |||
seq, | |||
library, | |||
name, | |||
chunksize = 2 |
|||
) |
Reduces a sequence until only one value is left, and then returns that value.
The sequence is reduced by passing a pair of elements into a function and storing the result. It then makes a sequence from the results, and reduces again until one value is left. Executes on a library.
If the sequence has an odd length, the last element gets reduced at the end.
self | Reference to the current manager object. |
fn | The function that will be called on each element. Exists on the library |
seq | The seq that will be reduced |
library | The name of the library that contains the function fn. |
name | This defines the key in the event json that wraps the data sent to the library. |
chunksize | The number of elements per Task (for tree reduce, must be greater than 1) |
def ndcctools.taskvine.manager.Manager.declare_file | ( | self, | |
path, | |||
cache = False , |
|||
peer_transfer = True , |
|||
unlink_when_done = False |
|||
) |
Declare a file obtained from the local filesystem.
self | The manager to register this file |
path | The path to the local file |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
unlink_when_done | Whether to delete the file when its reference count is 0. (Warning: Only use on files produced by the application, and never on irreplaceable input files.) |
def ndcctools.taskvine.manager.Manager.fetch_file | ( | self, | |
file | |||
) |
Fetch file contents from the cluster or local disk.
self | Reference to the current manager object. |
file | The file object |
def ndcctools.taskvine.manager.Manager.undeclare_file | ( | self, | |
file | |||
) |
Un-declare a file that was created by declare_file or similar methods.
The given file or directory object is deleted from all workers' caches, and is no longer available for use as an input file. Completed tasks waiting for retrieval are not affected. Note that all declared files are automatically undeclared by vine_delete; however, this function can be used for earlier cleanup of unneeded file objects.
self | Reference to the current manager object. |
file | The file object |
def ndcctools.taskvine.manager.Manager.undeclare_function | ( | self, | |
fn | |||
) |
Remove the manager's local serialized copy of a function used with PythonTask.
self | Reference to the current manager object. |
fn | The function that the manager should forget. |
def ndcctools.taskvine.manager.Manager.declare_temp | ( | self | ) |
Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks.
self | The manager to register this file |
def ndcctools.taskvine.manager.Manager.declare_url | ( | self, | |
url, | |||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file obtained from a remote URL.
self | The manager to register this file |
url | The url of the file. |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.declare_buffer | ( | self, | |
buffer = None , |
|||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file created from a buffer in memory.
self | The manager to register this file |
buffer | The contents of the buffer, or None for an empty output buffer |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
For example:
def ndcctools.taskvine.manager.Manager.declare_minitask | ( | self, | |
minitask, | |||
source, | |||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file created by executing a mini-task.
self | The manager to register this file |
minitask | The task to execute in order to produce a file |
source | The name of the file to extract from the task's sandbox. |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.declare_untar | ( | self, | |
tarball, | |||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file created by unpacking a tar file.
self | The manager to register this file |
tarball | The file object to un-tar |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.declare_poncho | ( | self, | |
package, | |||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file that sets up a poncho environment.
self | The manager to register this file |
package | The poncho environment tarball. Either a vine file or a string representing a local file. |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.declare_starch | ( | self, | |
starch, | |||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file created by unpacking a starch package.
self | The manager to register this file |
starch | The starch .sfx file. Either a vine file or a string representing a local file. |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.declare_xrootd | ( | self, | |
source, | |||
proxy = None , |
|||
env = None , |
|||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file accessible from an xrootd server.
self | The manager to register this file. |
source | The URL address of the root file in text form as: "root://XROOTSERVER[:port]//path/to/file" |
proxy | A ndcctools.taskvine.file.File of the X509 proxy to use. If None, the environment variable X509_USER_PROXY and the file "$TMPDIR/$UID" are considered in that order. If no proxy is present, the transfer is tried without authentication. |
env | If not None, an environment file (e.g., poncho or starch, see ndcctools.taskvine.task.Task.add_environment) that contains the xrootd executables. Otherwise assume xrootd is available at the worker. |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.declare_chirp | ( | self, | |
server, | |||
source, | |||
ticket = None , |
|||
env = None , |
|||
cache = False , |
|||
peer_transfer = True |
|||
) |
Declare a file accessible from a chirp server.
self | The manager to register this file. |
server | The chirp server address of the form "hostname[:port]" |
source | The name of the file in the server |
ticket | If not None, a file object that provides a chirp authentication ticket |
env | If not None, an environment file (e.g., poncho or starch) that contains the chirp executables. Otherwise assume chirp is available at the worker. |
cache | If True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached). |
peer_transfer | Whether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True. |
def ndcctools.taskvine.manager.Manager.log_txn_app | ( | self, | |
entry | |||
) |
Adds a custom APPLICATION entry to the transactions log.
self | Reference to the current manager object. |
entry | A custom transaction message |
def ndcctools.taskvine.manager.Manager.log_debug_app | ( | self, | |
entry | |||
) |
Adds a custom APPLICATION entry to the debug log.
self | Reference to the current manager object. |
entry | A custom debug message |