cctools
ndcctools.taskvine.manager.Manager Class Reference
Inheritance diagram for ndcctools.taskvine.manager.Manager:
ndcctools.taskvine.dask_executor.DaskVine

Public Member Functions

def __init__ (self, port=cvine.VINE_DEFAULT_PORT, name=None, shutdown=False, run_info_path="vine-run-info", staging_path=None, ssl=None, init_fn=None, status_display_interval=None)
 Create a new manager. More...
 
def name (self)
 Get the project name of the manager. More...
 
def port (self)
 Get the listening port of the manager. More...
 
def using_ssl (self)
 Whether the manager is using ssl to talk to workers. More...
 
def logging_directory (self)
 Get the logs directory of the manager. More...
 
def staging_directory (self)
 Get the staging directory of the manager. More...
 
def cache_directory (self)
 Get the caching directory of the manager. More...
 
def stats (self)
 Get manager statistics. More...
 
def stats_category (self, category)
 Get the task statistics for the given category. More...
 
def status (self, request)
 Get manager information as list of dictionaries. More...
 
def summarize_workers (self)
 Get resource statistics of workers connected. More...
 
def set_category_mode (self, category, mode)
 Turn on or off first-allocation labeling for a given category. More...
 
def set_category_autolabel_resource (self, category, resource, autolabel)
 Turn on or off first-allocation labeling for a given category and resource. More...
 
def task_state (self, task_id)
 Get current task state. More...
 
def enable_monitoring (self, watchdog=True, time_series=False)
 Enables resource monitoring for tasks. More...
 
def enable_peer_transfers (self)
 Enable P2P worker transfer functionality. More...
 
def disable_peer_transfers (self)
 Disable P2P worker transfer functionality. More...
 
def enable_disconnect_slow_workers (self, multiplier)
 Enable disconnect slow workers functionality for a given manager. More...
 
def enable_disconnect_slow_workers_category (self, name, multiplier)
 Enable disconnect slow workers functionality for a given manager. More...
 
def set_draining_by_hostname (self, hostname, drain_mode=True)
 Turn on or off draining mode for workers at hostname. More...
 
def empty (self)
 Determine whether there are any known tasks queued, running, or waiting to be collected. More...
 
def hungry (self)
 Determine whether the manager can support more tasks. More...
 
def set_scheduler (self, scheduler)
 Set the worker selection scheduler for manager. More...
 
def set_name (self, name)
 Change the project name for the given manager. More...
 
def set_manager_preferred_connection (self, mode)
 Set the preference for using hostname over IP address to connect. More...
 
def set_min_task_id (self, minid)
 Set the minimum task_id of future submitted tasks. More...
 
def set_priority (self, priority)
 Change the project priority for the given manager. More...
 
def tasks_left_count (self, ntasks)
 Specify the number of tasks not yet submitted to the manager. More...
 
def set_catalog_servers (self, catalogs)
 Specify the catalog servers the manager should report to. More...
 
def set_property (self, name, value)
 Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations. More...
 
def set_runtime_info_path (self, dirname)
 Specify a directory to write logs and staging files. More...
 
def set_password (self, password)
 Add a mandatory password that each worker must present. More...
 
def set_password_file (self, file)
 Add a mandatory password file that each worker must present. More...
 
def set_resources_max (self, rmd)
 Specifies the maximum resources allowed for the default category. More...
 
def set_resources_min (self, rmd)
 Specifies the minimum resources allowed for the default category. More...
 
def set_category_resources_max (self, category, rmd)
 Specifies the maximum resources allowed for the given category. More...
 
def set_category_resources_min (self, category, rmd)
 Specifies the minimum resources allowed for the given category. More...
 
def set_category_first_allocation_guess (self, category, rmd)
 Specifies the first-allocation guess for the given category. More...
 
def initialize_categories (self, filename, rm)
 Initialize first value of categories. More...
 
def cancel_by_task_id (self, id)
 Cancel task identified by its task_id. More...
 
def cancel_by_task_tag (self, tag)
 Cancel task identified by its tag. More...
 
def cancel_by_category (self, category)
 Cancel all tasks of the given category. More...
 
def cancel_all (self)
 Cancel all tasks. More...
 
def workers_shutdown (self, n=0)
 Shutdown workers connected to manager. More...
 
def block_host (self, host)
 Block workers running on host from working for the manager. More...
 
def blacklist (self, host)
 Replaced by ndcctools.taskvine.manager.Manager.block_host. More...
 
def block_host_with_timeout (self, host, timeout)
 Block workers running on host for the duration of the given timeout. More...
 
def blacklist_with_timeout (self, host, timeout)
 See ndcctools.taskvine.manager.Manager.block_host_with_timeout. More...
 
def unblock_host (self, host=None)
 Unblock given host, or all hosts if host not given. More...
 
def blacklist_clear (self, host=None)
 See ndcctools.taskvine.manager.Manager.unblock_host. More...
 
def set_keepalive_interval (self, interval)
 Change keepalive interval for a given manager. More...
 
def set_keepalive_timeout (self, timeout)
 Change keepalive timeout for a given manager. More...
 
def tune (self, name, value)
 Tune advanced parameters. More...
 
def submit (self, task)
 Submit a task to the manager. More...
 
def install_library (self, task)
 Submit a library to install on all connected workers. More...
 
def remove_library (self, name)
 Remove a library from all connected workers. More...
 
def create_library_from_functions (self, name, *function_list, poncho_env=None, init_command=None, add_env=True, import_modules=None)
 Turn a list of python functions into a Library Task. More...
 
def create_library_from_serverized_files (self, name, library_path, env=None)
 Turn Library code created with poncho_package_serverize into a Library Task. More...
 
def create_library_from_command (self, executable_path, name, env=None)
 Create a Library task from arbitrary inputs. More...
 
def wait (self, timeout="wait_forever")
 Wait for tasks to complete. More...
 
def wait_for_tag (self, tag, timeout="wait_forever")
 Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag. More...
 
def wait_for_task_id (self, task_id, timeout="wait_forever")
 Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id. More...
 
def application_info (self)
 Should return a dictionary with information for the status display. More...
 
def map (self, fn, seq, chunksize=1)
 Maps a function to elements in a sequence using taskvine. More...
 
def pair (self, fn, seq1, seq2, chunksize=1, env=None)
 Returns the values for a function of each pair from 2 sequences. More...
 
def tree_reduce (self, fn, seq, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value. More...
 
def remote_map (self, fn, seq, library, name, chunksize=1)
 Maps a function to elements in a sequence using taskvine remote task. More...
 
def remote_pair (self, fn, seq1, seq2, library, name, chunksize=1)
 Returns the values for a function of each pair from 2 sequences using remote task. More...
 
def remote_tree_reduce (self, fn, seq, library, name, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value. More...
 
def declare_file (self, path, cache=False, peer_transfer=True, unlink_when_done=False)
 Declare a file obtained from the local filesystem. More...
 
def fetch_file (self, file)
 Fetch file contents from the cluster or local disk. More...
 
def undeclare_file (self, file)
 Un-declare a file that was created by declare_file or similar methods. More...
 
def undeclare_function (self, fn)
 Remove the manager's local serialized copy of a function used with PythonTask. More...
 
def declare_temp (self)
 Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks. More...
 
def declare_url (self, url, cache=False, peer_transfer=True)
 Declare a file obtained from a remote URL. More...
 
def declare_buffer (self, buffer=None, cache=False, peer_transfer=True)
 Declare a file created from a buffer in memory. More...
 
def declare_minitask (self, minitask, source, cache=False, peer_transfer=True)
 Declare a file created by executing a mini-task. More...
 
def declare_untar (self, tarball, cache=False, peer_transfer=True)
 Declare a file created by unpacking a tar file. More...
 
def declare_poncho (self, package, cache=False, peer_transfer=True)
 Declare a file that sets up a poncho environment. More...
 
def declare_starch (self, starch, cache=False, peer_transfer=True)
 Declare a file created by unpacking a starch package. More...
 
def declare_xrootd (self, source, proxy=None, env=None, cache=False, peer_transfer=True)
 Declare a file accessible from an xrootd server. More...
 
def declare_chirp (self, server, source, ticket=None, env=None, cache=False, peer_transfer=True)
 Declare a file accessible from a chirp server. More...
 
def log_txn_app (self, entry)
 Adds a custom APPLICATION entry to the transactions log. More...
 
def log_debug_app (self, entry)
 Adds a custom APPLICATION entry to the debug log. More...
 

Constructor & Destructor Documentation

◆ __init__()

def ndcctools.taskvine.manager.Manager.__init__ (   self,
  port = cvine.VINE_DEFAULT_PORT,
  name = None,
  shutdown = False,
  run_info_path = "vine-run-info",
  staging_path = None,
  ssl = None,
  init_fn = None,
  status_display_interval = None 
)

Create a new manager.

Parameters
selfReference to the current manager object.
portThe port number to listen on. If zero, then a random port is chosen. A range of possible ports (low, high) can also be specified instead of a single integer. Default is 9123.
nameThe project name to use.
shutdownAutomatically shutdown workers when manager is finished. Disabled by default.
run_info_pathDirectory to write log (and staging if staging_path not given) files per run. If None, defaults to "vine-run-info"
staging_pathDirectory to write temporary files. Defaults to run_info_path if not given.
sslA tuple of filenames (ssl_key, ssl_cert) in pem format, or True. If not given, then TLS is not activated. If True, a self-signed temporary key and cert are generated.
init_fnFunction applied to the newly created manager at initialization.
status_display_intervalNumber of seconds between updates to the jupyter status display. None, or less than 1 disables it.
See also
vine_create - For more information about environment variables that affect the behavior of this method.
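As a hedged illustration of the parameters above, the following sketch collects constructor arguments for a manager that listens on a port range and optionally enables SSL. The helper names manager_options and create_manager, the port numbers, and the project name are illustrative assumptions, not part of the library; only the keyword names come from the signature above.

```python
def manager_options(low=9000, high=9100, name="example-project", use_ssl=False):
    """Build keyword arguments for Manager.__init__ (hypothetical helper)."""
    if low > high:
        raise ValueError("low port must not exceed high port")
    return {
        "port": (low, high),               # a (low, high) range instead of a single port
        "name": name,                      # project name
        "ssl": True if use_ssl else None,  # True requests a temporary self-signed key and cert
    }


def create_manager(**options):
    """Create the manager. The import is deferred so manager_options works without the library."""
    from ndcctools.taskvine.manager import Manager
    return Manager(**options)
```

With the library installed, a manager could then be created with q = create_manager(**manager_options(use_ssl=True)).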

Member Function Documentation

◆ name()

def ndcctools.taskvine.manager.Manager.name (   self)

Get the project name of the manager.

>>> print(q.name)

◆ port()

def ndcctools.taskvine.manager.Manager.port (   self)

Get the listening port of the manager.

>>> print(q.port)

◆ using_ssl()

def ndcctools.taskvine.manager.Manager.using_ssl (   self)

Whether the manager is using ssl to talk to workers.

>>> print(q.using_ssl)

◆ logging_directory()

def ndcctools.taskvine.manager.Manager.logging_directory (   self)

Get the logs directory of the manager.

◆ staging_directory()

def ndcctools.taskvine.manager.Manager.staging_directory (   self)

Get the staging directory of the manager.

◆ cache_directory()

def ndcctools.taskvine.manager.Manager.cache_directory (   self)

Get the caching directory of the manager.

◆ stats()

def ndcctools.taskvine.manager.Manager.stats (   self)

Get manager statistics.

>>> print(q.stats)

The fields in ndcctools.taskvine.manager.Manager.stats can also be individually accessed through this call. For example:

>>> print(q.stats.workers_busy)
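A minimal sketch of how individual statistics fields might be combined into a progress line. The helper name utilization_line is hypothetical, and it assumes the stats object exposes tasks_waiting in addition to the workers_busy field shown above; missing fields fall back to 0.

```python
def utilization_line(stats):
    """Format a one-line progress summary from a manager stats object."""
    busy = getattr(stats, "workers_busy", 0)      # field used in the example above
    waiting = getattr(stats, "tasks_waiting", 0)  # assumed to be available as well
    return "{} workers busy, {} tasks waiting".format(busy, waiting)
```

For example, print(utilization_line(q.stats)) could be called between calls to wait.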

◆ stats_category()

def ndcctools.taskvine.manager.Manager.stats_category (   self,
  category 
)

Get the task statistics for the given category.

Parameters
selfReference to the current manager object.
categoryA category name. For example:
>>> s = q.stats_category("my_category")
>>> print(s)
The fields in ndcctools.taskvine.manager.Manager.stats can also be individually accessed through this call. For example:
>>> print(s.tasks_waiting)

◆ status()

def ndcctools.taskvine.manager.Manager.status (   self,
  request 
)

Get manager information as list of dictionaries.

Parameters
selfReference to the current manager object
requestOne of: "manager", "tasks", "workers", or "categories". For example:
import json
tasks_info = q.status("tasks")
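The example above imports json without using it. One plausible use, sketched here, is to pretty-print the returned list of dictionaries. The helper name status_as_json is hypothetical, and the sketch assumes the values returned by status are JSON-serializable.

```python
import json

# Request names listed in the parameter description above.
VALID_REQUESTS = ("manager", "tasks", "workers", "categories")


def status_as_json(q, request):
    """Return the result of q.status(request) as an indented JSON string."""
    if request not in VALID_REQUESTS:
        raise ValueError("request must be one of {}".format(VALID_REQUESTS))
    return json.dumps(q.status(request), indent=2, sort_keys=True)
```

For example: print(status_as_json(q, "tasks")).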

◆ summarize_workers()

def ndcctools.taskvine.manager.Manager.summarize_workers (   self)

Get resource statistics of workers connected.

Parameters
selfReference to the current manager object.
Returns
A list of dictionaries, one per group of workers, each giving the number of workers connected (workers) with a certain number of cores, memory, and disk. For example:
>>> workers = q.summarize_workers()
>>> for w in workers:
>>>     print("{} workers with: {} cores, {} MB memory, {} MB disk".format(w["workers"], w["cores"], w["memory"], w["disk"]))
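The per-group summary can be aggregated into cluster-wide totals. The sketch below assumes each entry is a dictionary with the keys workers, cores, memory, and disk, and that the resource values are per worker; the helper name total_resources is hypothetical.

```python
def total_resources(summary):
    """Sum resources over all worker groups returned by summarize_workers()."""
    totals = {"workers": 0, "cores": 0, "memory": 0, "disk": 0}
    for group in summary:
        count = group["workers"]
        totals["workers"] += count
        # Each group describes `count` workers that share the same resource sizes.
        for key in ("cores", "memory", "disk"):
            totals[key] += count * group[key]
    return totals
```

For example: total_resources(q.summarize_workers())["cores"] would give the total number of cores currently connected, under the assumptions stated above.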

◆ set_category_mode()

def ndcctools.taskvine.manager.Manager.set_category_mode (   self,
  category,
  mode 
)

Turn on or off first-allocation labeling for a given category.

By default, only cores, memory, and disk are labeled, and gpus are unlabeled. NOTE: autolabeling is only meaningful when task monitoring is enabled (ndcctools.taskvine.manager.Manager.enable_monitoring). When monitoring is enabled and a task exhausts resources in a worker, mode dictates how taskvine handles the exhaustion:

Parameters
selfReference to the current manager object.
categoryA category name. If None, sets the mode by default for newly created categories.
modeOne of:
  • "fixed" Task fails (default).
  • "max" If maximum values are specified for cores, memory, disk, and gpus (e.g. via ndcctools.taskvine.manager.Manager.set_category_resources_max or ndcctools.taskvine.task.Task.set_memory), and one of those resources is exceeded, the task fails. Otherwise it is retried until a large enough worker connects to the manager, using the maximum values specified, and the maximum values so far seen for resources not specified. Use ndcctools.taskvine.task.Task.set_retries to set a limit on the number of times the manager attempts to complete the task.
  • "min waste" As above, but manager tries allocations to minimize resource waste.
  • "max throughput" As above, but manager tries allocations to maximize throughput.
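Since autolabeling only has an effect when monitoring is enabled, the calls are typically made together. The following is a sketch under that reading; the helper name configure_category is hypothetical, and it relies on enable_monitoring returning 0 on failure as documented on this page.

```python
# Mode names taken from the list above.
CATEGORY_MODES = ("fixed", "max", "min waste", "max throughput")


def configure_category(q, category, mode, max_resources=None):
    """Enable monitoring, then set resource limits and the allocation mode for a category."""
    if mode not in CATEGORY_MODES:
        raise ValueError("mode must be one of {}".format(CATEGORY_MODES))
    # Without monitoring, the mode has no measurements to act on.
    if not q.enable_monitoring():
        raise RuntimeError("resource monitoring could not be enabled")
    if max_resources:
        q.set_category_resources_max(category, max_resources)
    q.set_category_mode(category, mode)
```

For example: configure_category(q, "my_category", "max", {"memory": 2048}).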

◆ set_category_autolabel_resource()

def ndcctools.taskvine.manager.Manager.set_category_autolabel_resource (   self,
  category,
  resource,
  autolabel 
)

Turn on or off first-allocation labeling for a given category and resource.

This function should be used to fine-tune the defaults from ndcctools.taskvine.manager.Manager.set_category_mode.

Parameters
selfReference to the current manager object.
categoryA category name.
resourceA resource name.
autolabelTrue/False for on/off.
Returns
1 if resource is valid, 0 otherwise.

◆ task_state()

def ndcctools.taskvine.manager.Manager.task_state (   self,
  task_id 
)

Get current task state.

See vine_task_state_t for possible values.

>>> print(q.task_state(task_id))

◆ enable_monitoring()

def ndcctools.taskvine.manager.Manager.enable_monitoring (   self,
  watchdog = True,
  time_series = False 
)

Enables resource monitoring for tasks.

The resources measured are available in the resources_measured member of the respective vine_task.

Parameters
selfReference to the current manager object.
watchdogIf not 0, kill tasks that exhaust declared resources.
time_seriesIf not 0, generate a time series of resources per task in VINE_RUNTIME_INFO_DIR/vine-logs/time-series/ (WARNING: for long running tasks these files may reach gigabyte sizes. This function is mostly used for debugging.)

Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).

◆ enable_peer_transfers()

def ndcctools.taskvine.manager.Manager.enable_peer_transfers (   self)

Enable P2P worker transfer functionality.

On by default

Parameters
selfReference to the current manager object.

◆ disable_peer_transfers()

def ndcctools.taskvine.manager.Manager.disable_peer_transfers (   self)

Disable P2P worker transfer functionality.

Peer transfers are on by default.

Parameters
selfReference to the current manager object.

◆ enable_disconnect_slow_workers()

def ndcctools.taskvine.manager.Manager.enable_disconnect_slow_workers (   self,
  multiplier 
)

Enable disconnect slow workers functionality for a given manager.

Applies to tasks in the "default" category, and to tasks whose category does not set an explicit multiplier.

Parameters
selfReference to the current manager object.
multiplierThe multiplier of the average task time at which point to disconnect a worker; if less than 1, it is disabled (default).
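To make the multiplier concrete, the arithmetic can be sketched as follows. This is only an illustration of the description above; how the manager measures the average task time is not specified here, and the helper name slow_worker_threshold is hypothetical.

```python
def slow_worker_threshold(average_task_time, multiplier):
    """Return the task running time (in the units of average_task_time) beyond
    which a worker would be considered slow, or None if the feature is disabled."""
    if multiplier < 1:
        return None  # a multiplier below 1 disables the feature
    return average_task_time * multiplier
```

With an average task time of 60 seconds and a multiplier of 3, the threshold under this reading is 180 seconds.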

◆ enable_disconnect_slow_workers_category()

def ndcctools.taskvine.manager.Manager.enable_disconnect_slow_workers_category (   self,
  name,
  multiplier 
)

Enable disconnect slow workers functionality for a given manager.

Parameters
selfReference to the current manager object.
nameName of the category.
multiplierThe multiplier of the average task time at which point to disconnect a worker; disabled if less than one (see ndcctools.taskvine.manager.Manager.enable_disconnect_slow_workers)

◆ set_draining_by_hostname()

def ndcctools.taskvine.manager.Manager.set_draining_by_hostname (   self,
  hostname,
  drain_mode = True 
)

Turn on or off draining mode for workers at hostname.

Parameters
selfReference to the current manager object.
hostnameThe hostname of the host running the workers.
drain_modeIf True, no new tasks are dispatched to workers at hostname, and empty workers are shut down. Otherwise, workers work as usual.

◆ empty()

def ndcctools.taskvine.manager.Manager.empty (   self)

Determine whether there are any known tasks queued, running, or waiting to be collected.

Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".

Parameters
selfReference to the current manager object.

◆ hungry()

def ndcctools.taskvine.manager.Manager.hungry (   self)

Determine whether the manager can support more tasks.

Returns the number of additional tasks it can support if "hungry" and 0 if "sated".

Parameters
selfReference to the current manager object.
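empty and hungry are commonly used together to drive a submit-and-wait loop. The sketch below is one way to structure it; the helper name run_all is hypothetical, and it assumes that wait returns a completed task, or None when the timeout expires.

```python
def run_all(q, tasks, timeout=5):
    """Submit tasks while the manager is hungry and collect them as they complete."""
    pending = list(tasks)
    done = []
    while pending or not q.empty():
        # Top up the manager only while it reports room for more tasks.
        while pending and q.hungry():
            q.submit(pending.pop(0))
        task = q.wait(timeout)  # assumed to return None on timeout
        if task is not None:
            done.append(task)
    return done
```

Submitting only while the manager is hungry keeps the number of task objects held by the manager bounded when the full task list is very large.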

◆ set_scheduler()

def ndcctools.taskvine.manager.Manager.set_scheduler (   self,
  scheduler 
)

Set the worker selection scheduler for manager.

Parameters
selfReference to the current manager object.
schedulerOne of the following schedulers to use in assigning a task to a worker. See vine_schedule_t for possible values.

◆ set_name()

def ndcctools.taskvine.manager.Manager.set_name (   self,
  name 
)

Change the project name for the given manager.

Parameters
selfReference to the current manager object.
nameThe new project name.

◆ set_manager_preferred_connection()

def ndcctools.taskvine.manager.Manager.set_manager_preferred_connection (   self,
  mode 
)

Set the preference for using hostname over IP address to connect.

'by_ip' uses IP addresses from the network interfaces of the manager (standard behavior), 'by_hostname' to use the hostname at the manager, or 'by_apparent_ip' to use the address of the manager as seen by the catalog server.

Parameters
selfReference to the current manager object.
modeA string to indicate using 'by_ip', 'by_hostname' or 'by_apparent_ip'.

◆ set_min_task_id()

def ndcctools.taskvine.manager.Manager.set_min_task_id (   self,
  minid 
)

Set the minimum task_id of future submitted tasks.

Further submitted tasks are guaranteed to have a task_id greater than or equal to minid. This function is useful to make task_ids consistent in a workflow that consists of sequential managers. (Note: This function is rarely used.) If the minimum id provided is smaller than the last task_id computed, the minimum id provided is ignored.

Parameters
selfReference to the current manager object.
minidMinimum desired task_id
Returns
Returns the actual minimum task_id for future tasks.

◆ set_priority()

def ndcctools.taskvine.manager.Manager.set_priority (   self,
  priority 
)

Change the project priority for the given manager.

Parameters
selfReference to the current manager object.
priorityAn integer that represents the priority of this manager. The higher the value, the higher the priority.

◆ tasks_left_count()

def ndcctools.taskvine.manager.Manager.tasks_left_count (   self,
  ntasks 
)

Specify the number of tasks not yet submitted to the manager.

It is used by vine_factory to determine the number of workers to launch. If not specified, it defaults to 0. vine_factory considers the number of tasks as: num tasks left + num tasks running + num tasks read.

Parameters
selfReference to the current manager object.
ntasksNumber of tasks yet to be submitted.

◆ set_catalog_servers()

def ndcctools.taskvine.manager.Manager.set_catalog_servers (   self,
  catalogs 
)

Specify the catalog servers the manager should report to.

Parameters
selfReference to the current manager object.
catalogsThe catalog servers given as a comma delimited list of hostnames or hostname:port

◆ set_property()

def ndcctools.taskvine.manager.Manager.set_property (   self,
  name,
  value 
)

Add a global property to the manager which will be included in periodic reports to the catalog server and other telemetry destinations.

This is helpful for distinguishing higher level information about the entire run, such as the name of the framework being used, or the logical name of the dataset being processed.

Parameters
selfReference to the current manager object.
nameThe name of the property.
valueThe value of the property.

◆ set_runtime_info_path()

def ndcctools.taskvine.manager.Manager.set_runtime_info_path (   self,
  dirname 
)

Specify a directory to write logs and staging files.

Parameters
selfReference to the current manager object.
dirnameA directory name

◆ set_password()

def ndcctools.taskvine.manager.Manager.set_password (   self,
  password 
)

Add a mandatory password that each worker must present.

Parameters
selfReference to the current manager object.
passwordThe password.

◆ set_password_file()

def ndcctools.taskvine.manager.Manager.set_password_file (   self,
  file 
)

Add a mandatory password file that each worker must present.

Parameters
selfReference to the current manager object.
fileName of the file containing the password.

◆ set_resources_max()

def ndcctools.taskvine.manager.Manager.set_resources_max (   self,
  rmd 
)

Specifies the maximum resources allowed for the default category.

Parameters
selfReference to the current manager object.
rmdDictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example:
>>> # A maximum of 4 cores is found on any worker:
>>> q.set_resources_max({'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB disk are found on any worker:
>>> q.set_resources_max({'cores': 8, 'memory': 1024, 'disk': 10240})
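The examples above express memory and disk in megabytes (1GB is written as 1024). A small, hypothetical helper such as resources_in_mb can make that conversion explicit when building the dictionary:

```python
def resources_in_mb(cores=None, memory_gb=None, disk_gb=None):
    """Build a resource dictionary with memory and disk converted from GB to MB."""
    rmd = {}
    if cores is not None:
        rmd["cores"] = cores
    if memory_gb is not None:
        rmd["memory"] = int(memory_gb * 1024)
    if disk_gb is not None:
        rmd["disk"] = int(disk_gb * 1024)
    return rmd
```

For example, q.set_resources_max(resources_in_mb(cores=8, memory_gb=1, disk_gb=10)) passes the same dictionary as the second example above.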

◆ set_resources_min()

def ndcctools.taskvine.manager.Manager.set_resources_min (   self,
  rmd 
)

Specifies the minimum resources allowed for the default category.

Parameters
selfReference to the current manager object.
rmdDictionary indicating minimum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example:
>>> # A minimum of 2 cores is found on any worker:
>>> q.set_resources_min({'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.set_resources_min({'cores': 4, 'memory': 512, 'disk': 1024})

◆ set_category_resources_max()

def ndcctools.taskvine.manager.Manager.set_category_resources_max (   self,
  category,
  rmd 
)

Specifies the maximum resources allowed for the given category.

Parameters
selfReference to the current manager object.
categoryName of the category.
rmdDictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example:
>>> # A maximum of 4 cores may be used by a task in the category:
>>> q.set_category_resources_max("my_category", {'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB of disk may be used by a task:
>>> q.set_category_resources_max("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})

◆ set_category_resources_min()

def ndcctools.taskvine.manager.Manager.set_category_resources_min (   self,
  category,
  rmd 
)

Specifies the minimum resources allowed for the given category.

Parameters
selfReference to the current manager object.
categoryName of the category.
rmdDictionary indicating minimum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example:
>>> # A minimum of 2 cores is found on any worker:
>>> q.set_category_resources_min("my_category", {'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.set_category_resources_min("my_category", {'cores': 4, 'memory': 512, 'disk': 1024})

◆ set_category_first_allocation_guess()

def ndcctools.taskvine.manager.Manager.set_category_first_allocation_guess (   self,
  category,
  rmd 
)

Specifies the first-allocation guess for the given category.

Parameters
selfReference to the current manager object.
categoryName of the category.
rmdDictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields. For example:
>>> # Tasks are first tried with 4 cores:
>>> q.set_category_first_allocation_guess("my_category", {'cores': 4})
>>> # Tasks are first tried with 8 cores, 1GB of memory, and 10GB of disk:
>>> q.set_category_first_allocation_guess("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})

◆ initialize_categories()

def ndcctools.taskvine.manager.Manager.initialize_categories (   self,
  filename,
  rm 
)

Initialize first value of categories.

Parameters
selfReference to the current manager object.
filenameJSON file with resource summaries.
rmDictionary indicating maximum values. See ndcctools.taskvine.task.Task.resources_measured for possible fields.

◆ cancel_by_task_id()

def ndcctools.taskvine.manager.Manager.cancel_by_task_id (   self,
  id 
)

Cancel task identified by its task_id.

The cancelled task will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.

Parameters
selfReference to the current manager object.
idThe task_id returned from ndcctools.taskvine.manager.Manager.submit.
Returns
One if the task was found and cancelled, zero otherwise.

◆ cancel_by_task_tag()

def ndcctools.taskvine.manager.Manager.cancel_by_task_tag (   self,
  tag 
)

Cancel task identified by its tag.

The cancelled task will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.

Parameters
selfReference to the current manager object.
tagThe tag assigned to task using ndcctools.taskvine.task.Task.set_tag.
Returns
One if the task was found and cancelled, zero otherwise.

◆ cancel_by_category()

def ndcctools.taskvine.manager.Manager.cancel_by_category (   self,
  category 
)

Cancel all tasks of the given category.

The cancelled tasks will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.

Parameters
selfReference to the current manager object.
categoryThe name of the category to cancel.
Returns
The total number of tasks cancelled.
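Because cancelled tasks are still returned through wait, a caller usually collects them after cancelling. A minimal sketch, assuming wait returns None on timeout and that no unrelated tasks complete in between; the helper name cancel_category_and_collect is hypothetical.

```python
def cancel_category_and_collect(q, category, timeout=5):
    """Cancel every task in a category, then retrieve the cancelled tasks via wait."""
    cancelled = q.cancel_by_category(category)  # number of tasks cancelled
    collected = []
    for _ in range(cancelled):
        task = q.wait(timeout)
        if task is None:  # timed out before all cancelled tasks came back
            break
        collected.append(task)
    return cancelled, collected
```

In a real workflow, wait may also return tasks that finished normally, so the result of each returned task should be checked against VINE_RESULT_CANCELLED.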

◆ cancel_all()

def ndcctools.taskvine.manager.Manager.cancel_all (   self)

Cancel all tasks.

The cancelled tasks will be returned in the normal way via wait with a result of VINE_RESULT_CANCELLED.

Parameters
selfReference to the current manager object.
Returns
The total number of tasks cancelled.

◆ workers_shutdown()

def ndcctools.taskvine.manager.Manager.workers_shutdown (   self,
  n = 0 
)

Shutdown workers connected to manager.

Gives a best effort and then returns the number of workers given the shutdown order.

Parameters
selfReference to the current manager object.
nThe number of workers to shut down. 0 shuts down all workers.

◆ block_host()

def ndcctools.taskvine.manager.Manager.block_host (   self,
  host 
)

Block workers running on host from working for the manager.

Parameters
selfReference to the current manager object.
hostThe hostname of the host running the workers.

◆ blacklist()

def ndcctools.taskvine.manager.Manager.blacklist (   self,
  host 
)

Replaced by ndcctools.taskvine.manager.Manager.block_host.

◆ block_host_with_timeout()

def ndcctools.taskvine.manager.Manager.block_host_with_timeout (   self,
  host,
  timeout 
)

Block workers running on host for the duration of the given timeout.

Parameters
selfReference to the current manager object.
hostThe hostname of the host running the workers.
timeoutHow long this block entry lasts (in seconds). If less than 1, block indefinitely.
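The two blocking calls can be wrapped so that a list of hosts is blocked either temporarily or indefinitely. This is a sketch only; the helper name block_hosts is hypothetical.

```python
def block_hosts(q, hosts, timeout=0):
    """Block each host, for `timeout` seconds if it is at least 1, otherwise indefinitely."""
    for host in hosts:
        if timeout < 1:
            q.block_host(host)
        else:
            q.block_host_with_timeout(host, timeout)
```

For example, block_hosts(q, ["node1.example.org"], timeout=600) would block that (made-up) host for ten minutes.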

◆ blacklist_with_timeout()

def ndcctools.taskvine.manager.Manager.blacklist_with_timeout (   self,
  host,
  timeout 
)

See ndcctools.taskvine.manager.Manager.block_host_with_timeout.

◆ unblock_host()

def ndcctools.taskvine.manager.Manager.unblock_host (   self,
  host = None 
)

Unblock given host, or all hosts if host not given.

Parameters
selfReference to the current manager object.
hostThe hostname of the host.

◆ blacklist_clear()

def ndcctools.taskvine.manager.Manager.blacklist_clear (   self,
  host = None 
)

See ndcctools.taskvine.manager.Manager.unblock_host.

◆ set_keepalive_interval()

def ndcctools.taskvine.manager.Manager.set_keepalive_interval (   self,
  interval 
)

Change keepalive interval for a given manager.

Parameters
selfReference to the current manager object.
intervalMinimum number of seconds to wait before sending new keepalive checks to workers.

◆ set_keepalive_timeout()

def ndcctools.taskvine.manager.Manager.set_keepalive_timeout (   self,
  timeout 
)

Change keepalive timeout for a given manager.

Parameters
selfReference to the current manager object.
timeoutMinimum number of seconds to wait for a keepalive response from worker before marking it as dead.

◆ tune()

def ndcctools.taskvine.manager.Manager.tune (   self,
  name,
  value 
)

Tune advanced parameters.

Parameters
selfReference to the current manager object.
nameThe name of the parameter to tune. Can be one of the following:
  • "resource-submit-multiplier" Treat each worker as having ({cores,memory,gpus} * multiplier) when submitting tasks. This allows for tasks to wait at a worker rather than the manager. (default = 1.0)
  • "min-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a worker. (default=10)
  • "foreman-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a foreman. (default=3600)
  • "transfer-outlier-factor" Transfers that are this many times slower than the average will be terminated. (default=10x)
  • "default-transfer-rate" The assumed network bandwidth used until sufficient data has been collected. (1MB/s)
  • "disconnect-slow-workers-factor" Set the multiplier of the average task time at which point to disconnect a worker; disabled if less than 1. (default=0)
  • "keepalive-interval" Set the minimum number of seconds to wait before sending new keepalive checks to workers. (default=300)
  • "keepalive-timeout" Set the minimum number of seconds to wait for a keepalive response from worker before marking it as dead. (default=30)
  • "short-timeout" Set the minimum timeout when sending a brief message to a single worker. (default=5s)
  • "long-timeout" Set the minimum timeout when sending a brief message to a foreman. (default=1h)
  • "category-steady-n-tasks" Set the number of tasks considered when computing category buckets.
  • "hungry-minimum" Minimum number of tasks to consider manager not hungry. (default=10)
  • "monitor-interval" Maximum number of seconds between resource monitor measurements. If less than 1, use default (5s).
  • "wait-for-workers" Minimum number of workers to connect before starting dispatching tasks. (default=0)
  • "attempt-schedule-depth" The amount of tasks to attempt scheduling on each pass of send_one_task in the main loop. (default=100)
  • "wait_retrieve_many" Parameter to alter how vine_wait works. If set to 0, cvine.vine_wait breaks out of the while loop whenever a task changes to "task_done" (wait_retrieve_one mode). If set to 1, vine_wait does not break, but continues recieving and dispatching tasks. This occurs until no task is sent or recieved, at which case it breaks out of the while loop (wait_retrieve_many mode). (default=0)
  • "monitor-interval" Parameter to change how frequently the resource monitor records resource consumption of a task in a times series, if this feature is enabled. See enable_monitoring.
valueThe value to set the parameter to.
Returns
0 on success, -1 on failure.

◆ submit()

def ndcctools.taskvine.manager.Manager.submit (   self,
  task 
)

Submit a task to the manager.

It is safe to re-submit a task returned by ndcctools.taskvine.manager.Manager.wait.

Parameters
selfReference to the current manager object.
taskA task description created from ndcctools.taskvine.task.Task.

◆ install_library()

def ndcctools.taskvine.manager.Manager.install_library (   self,
  task 
)

Submit a library to install on all connected workers.

Parameters
selfReference to the current manager object.
taskA Library Task description created from create_library_from_functions or create_library_from_files

◆ remove_library()

def ndcctools.taskvine.manager.Manager.remove_library (   self,
  name 
)

Remove a library from all connected workers.

Parameters
selfReference to the current manager object.
nameName of the library to be removed.

◆ create_library_from_functions()

def ndcctools.taskvine.manager.Manager.create_library_from_functions (   self,
  name,
  function_list,
  poncho_env = None,
  init_command = None,
  add_env = True,
  import_modules = None 
)

Turn a list of python functions into a Library Task.

This Library Task will be run on a worker as a regular task. Note that functions are required to have source code available, so dynamically generated functions won't work (e.g., lambda, interactive).

Parameters
selfReference to the current manager object.
nameName of the Library to be created
function_listList of all functions to be included in the library
poncho_envName of an already prepared poncho environment
init_commandA string describing a shell command to execute before the library task is run
add_envWhether to automatically create and/or add environment to the library
import_modulesA list of modules to be imported at the preamble of library
Returns
A task to be used with ndcctools.taskvine.manager.Manager.install_library.

◆ create_library_from_serverized_files()

def ndcctools.taskvine.manager.Manager.create_library_from_serverized_files (   self,
  name,
  library_path,
  env = None 
)

Turn Library code created with poncho_package_serverize into a Library Task.

Parameters
selfReference to the current manager object.
nameName that identifies this library to the FunctionCalls
library_pathFilename of the library (i.e., the output of poncho_package_serverize)
envEnvironment to run the library. Either a vine file that expands to an environment (see ndcctools.taskvine.task.Task.add_environment), or a path to a poncho environment.
Returns
A task to be used with ndcctools.taskvine.manager.Manager.install_library.

◆ create_library_from_command()

def ndcctools.taskvine.manager.Manager.create_library_from_command (   self,
  executable_path,
  name,
  env = None 
)

Create a Library task from arbitrary inputs.

Parameters
selfReference to the current manager object
executable_pathFilename of the library executable
nameName of the library to be created
envEnvironment to run the library. Either a vine file that expands to an environment (see ndcctools.taskvine.task.Task.add_environment), or a path to a poncho environment.
Returns
A task to be used with ndcctools.taskvine.manager.Manager.install_library

◆ wait()

def ndcctools.taskvine.manager.Manager.wait (   self,
  timeout = "wait_forever" 
)

Wait for tasks to complete.

This call will block until a task completes or the timeout has elapsed.

Parameters
selfReference to the current manager object.
timeoutThe number of seconds to wait for a completed task before returning. Use an integer to set the timeout or the value "wait_forever" to block until a task has completed.

◆ wait_for_tag()

def ndcctools.taskvine.manager.Manager.wait_for_tag (   self,
  tag,
  timeout = "wait_forever" 
)

Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified tag.

This call will block until a task with the specified tag completes or the timeout has elapsed.

Parameters
selfReference to the current manager object.
tagDesired tag. If None, then it is equivalent to self.wait(timeout)
timeoutThe number of seconds to wait for a completed task before returning.

◆ wait_for_task_id()

def ndcctools.taskvine.manager.Manager.wait_for_task_id (   self,
  task_id,
  timeout = "wait_forever" 
)

Similar to ndcctools.taskvine.manager.Manager.wait, but guarantees that the returned task has the specified task_id.

This call will block until the task with the specified task_id completes or the timeout has elapsed.

Parameters
selfReference to the current manager object.
task_idDesired task_id. If -1, then it is equivalent to self.wait(timeout)
timeoutThe number of seconds to wait for a completed task before returning.

◆ application_info()

def ndcctools.taskvine.manager.Manager.application_info (   self)

Should return a dictionary with information for the status display.

This method is meant to be overridden by custom applications.

The dictionary should be of the form:

{ "application_info" : {"values" : dict, "units" : dict} }

where "units" is an optional dictionary that indicates the units of the corresponding key in "values".

Parameters
selfReference to the current manager object.

For example:

>>> myapp.application_info()
{'application_info': {'values': {'size_max_output': 0.361962, 'current_chunksize': 65536}, 'units': {'size_max_output': 'MB'}}}
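
A minimal sketch of an override that returns a dictionary of this shape is given below. The class MyApp and its two attributes are hypothetical; a real application would subclass the manager class rather than define a standalone class, which is done here only so that the example runs without a live manager.

```python
class MyApp:
    """Hypothetical application class; a real one would subclass Manager."""

    def __init__(self):
        self.size_max_output = 0.361962
        self.current_chunksize = 65536

    def application_info(self):
        return {
            "application_info": {
                "values": {
                    "size_max_output": self.size_max_output,
                    "current_chunksize": self.current_chunksize,
                },
                # "units" is optional and lists only the keys that have a unit.
                "units": {"size_max_output": "MB"},
            }
        }


myapp = MyApp()
info = myapp.application_info()
print(info)
```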

◆ map()

def ndcctools.taskvine.manager.Manager.map (   self,
  fn,
  seq,
  chunksize = 1 
)

Maps a function to elements in a sequence using taskvine.

Similar to regular map function in python

Parameters
selfReference to the current manager object.
fnThe function that will be called on each element
seqThe sequence whose elements are passed to the function
chunksizeThe number of elements to process at once
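
The effect of chunksize can be pictured with a purely local sketch. The function chunked_map is hypothetical and runs everything in the calling process; it assumes that each chunk corresponds to one unit of work, which is an interpretation of "elements to process at once" rather than a description of the TaskVine implementation.

```python
def chunked_map(fn, seq, chunksize=1):
    """Apply fn to every element, processing `chunksize` elements per batch."""
    items = list(seq)
    results = []
    for start in range(0, len(items), chunksize):
        chunk = items[start:start + chunksize]
        # Assumption: one chunk stands for one unit of work; here it runs locally.
        results.extend(fn(x) for x in chunk)
    return results


squares = chunked_map(lambda x: x * x, range(5), chunksize=2)
print(squares)
```

The results come back in the order of the input sequence regardless of the chunk size, as with the built-in map.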

◆ pair()

def ndcctools.taskvine.manager.Manager.pair (   self,
  fn,
  seq1,
  seq2,
  chunksize = 1,
  env = None 
)

Returns the values for a function of each pair from 2 sequences.

The pairs that are passed into the function are generated by itertools

Parameters
selfReference to the current manager object.
fnThe function that will be called on each element
seq1The first seq that will be used to generate pairs
seq2The second seq that will be used to generate pairs
chunksizeNumber of pairs to process at once (default is 1)
envFilename of a python environment tarball (conda or poncho)
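
The following local sketch shows one way to read the pairing. It assumes that the pairs are produced by itertools.product (every element of seq1 combined with every element of seq2) and that fn receives each pair as a single tuple; neither detail is stated above. The function local_pair is hypothetical.

```python
import itertools


def local_pair(fn, seq1, seq2):
    """Apply fn to every (a, b) pair drawn from seq1 and seq2."""
    # Assumption: pairs come from itertools.product and are passed as one tuple.
    return [fn(pair) for pair in itertools.product(seq1, seq2)]


products = local_pair(lambda p: p[0] * p[1], [1, 2], [10, 20])
print(products)
```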

◆ tree_reduce()

def ndcctools.taskvine.manager.Manager.tree_reduce (   self,
  fn,
  seq,
  chunksize = 2 
)

Reduces a sequence until only one value is left, and then returns that value.

The sequence is reduced by passing a pair of elements into a function and then stores the result. It then makes a sequence from the results, and reduces again until one value is left.

If the sequence has an odd length, the last element gets reduced at the end.

Parameters
selfReference to the current manager object.
fnThe function that will be called on each element
seqThe seq that will be reduced
chunksizeThe number of elements per Task (for tree reduce, must be greater than 1)
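
The reduction can be sketched locally as follows. The function local_tree_reduce is hypothetical, and it assumes that fn accepts a list of up to chunksize elements and returns a single value; with chunksize = 2 this matches the pairwise description above. A leftover shorter chunk is simply carried to the next level.

```python
def local_tree_reduce(fn, seq, chunksize=2):
    """Reduce seq level by level until a single value remains."""
    if chunksize < 2:
        raise ValueError("chunksize must be greater than 1")
    level = list(seq)
    if not level:
        raise ValueError("cannot reduce an empty sequence")
    while len(level) > 1:
        # Assumption: fn takes one chunk (a list) and returns one value.
        level = [fn(level[i:i + chunksize]) for i in range(0, len(level), chunksize)]
    return level[0]


total = local_tree_reduce(sum, [1, 2, 3, 4, 5])
print(total)
```

For the five-element input, the levels are [3, 7, 5], then [10, 5], then [15].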

◆ remote_map()

def ndcctools.taskvine.manager.Manager.remote_map (   self,
  fn,
  seq,
  library,
  name,
  chunksize = 1 
)

Maps a function to elements in a sequence using taskvine remote task.

Similar to regular map function in python, but creates a task to execute each function on a worker running a library

Parameters
selfReference to the current manager object.
fnThe function that will be called on each element. This function exists in library.
seqThe sequence whose elements are passed to the function
libraryThe name of the library that contains the function fn.
nameThis defines the key in the event json that wraps the data sent to the library.
chunksizeThe number of elements to process at once

◆ remote_pair()

def ndcctools.taskvine.manager.Manager.remote_pair (   self,
  fn,
  seq1,
  seq2,
  library,
  name,
  chunksize = 1 
)

Returns the values for a function of each pair from 2 sequences using remote task.

The pairs that are passed into the function are generated by itertools

Parameters
selfReference to the current manager object.
fnThe function that will be called on each element. This function exists in library.
seq1The first seq that will be used to generate pairs
seq2The second seq that will be used to generate pairs
libraryThe name of the library that contains the function fn.
nameThis defines the key in the event json that wraps the data sent to the library.
chunksizeThe number of elements to process at once

◆ remote_tree_reduce()

def ndcctools.taskvine.manager.Manager.remote_tree_reduce (   self,
  fn,
  seq,
  library,
  name,
  chunksize = 2 
)

Reduces a sequence until only one value is left, and then returns that value.

The sequence is reduced by passing a pair of elements into a function and then stores the result. It then makes a sequence from the results, and reduces again until one value is left. The function executes on the library.

If the sequence has an odd length, the last element gets reduced at the end.

Parameters
selfReference to the current manager object.
fnThe function that will be called on each element. This function exists in library.
seqThe seq that will be reduced
libraryThe name of the library that contains the function fn.
nameThis defines the key in the event json that wraps the data sent to the library.
chunksizeThe number of elements per Task (for tree reduce, must be greater than 1)

◆ declare_file()

def ndcctools.taskvine.manager.Manager.declare_file (   self,
  path,
  cache = False,
  peer_transfer = True,
  unlink_when_done = False 
)

Declare a file obtained from the local filesystem.

Parameters
selfThe manager to register this file
pathThe path to the local file
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
unlink_when_doneWhether to delete the file when its reference count is 0. (Warning: Only use on files produced by the application, and never on irreplaceable input files.)
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input or ndcctools.taskvine.task.Task.add_output
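
The accepted values of the cache parameter, which is shared by the declare_* methods, can be summarized in a small lookup. The helper cache_lifetime is hypothetical and is not part of the API; it only restates the documented meanings in executable form.

```python
def cache_lifetime(cache):
    """Describe the documented meaning of a `cache` argument value."""
    if cache is False:
        return "not cached"
    if cache is True or cache == "workflow":
        return "until the end of the workflow"
    if cache == "worker":
        return "until the end-of-life of the worker"
    if cache == "forever":
        return "beyond the end-of-life of the worker"
    raise ValueError(f"unrecognized cache value: {cache!r}")


for value in (False, True, "workflow", "worker", "forever"):
    print(repr(value), "->", cache_lifetime(value))
```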

◆ fetch_file()

def ndcctools.taskvine.manager.Manager.fetch_file (   self,
  file 
)

Fetch file contents from the cluster or local disk.

Parameters
selfThe manager to register this file
fileThe file object
Returns
The contents of the file as a string.

◆ undeclare_file()

def ndcctools.taskvine.manager.Manager.undeclare_file (   self,
  file 
)

Un-declare a file that was created by declare_file or similar methods.

The given file or directory object is deleted from all workers' caches, and is no longer available for use as an input file. Completed tasks waiting for retrieval are not affected. Note that all declared files are automatically undeclared by vine_delete; however, this function can be used for earlier cleanup of unneeded file objects.

Parameters
selfThe manager to register this file
fileThe file object

◆ undeclare_function()

def ndcctools.taskvine.manager.Manager.undeclare_function (   self,
  fn 
)

Remove the manager's local serialized copy of a function used with PythonTask.

Parameters
selfReference to the current manager object.
fnThe function that the manager should forget.

◆ declare_temp()

def ndcctools.taskvine.manager.Manager.declare_temp (   self)

Declare an anonymous file that has no initial content, but is created as the output of a task, and may be consumed by other tasks.

Parameters
selfThe manager to register this file
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input or ndcctools.taskvine.task.Task.add_output

◆ declare_url()

def ndcctools.taskvine.manager.Manager.declare_url (   self,
  url,
  cache = False,
  peer_transfer = True 
)

Declare a file obtained from a remote URL.

Parameters
selfThe manager to register this file
urlThe url of the file.
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

◆ declare_buffer()

def ndcctools.taskvine.manager.Manager.declare_buffer (   self,
  buffer = None,
  cache = False,
  peer_transfer = True 
)

Declare a file created from a buffer in memory.

Parameters
selfThe manager to register this file
bufferThe contents of the buffer, or None for an empty output buffer
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

For example:

>>> s = "hello pirate ♆"
>>> f = m.declare_buffer(bytes(s, "utf-8"))
>>> print(bytes.decode(f.contents(), "utf-8"))
hello pirate ♆

◆ declare_minitask()

def ndcctools.taskvine.manager.Manager.declare_minitask (   self,
  minitask,
  source,
  cache = False,
  peer_transfer = True 
)

Declare a file created by executing a mini-task.

Parameters
selfThe manager to register this file
minitaskThe task to execute in order to produce a file
sourceThe name of the file to extract from the task's sandbox.
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

◆ declare_untar()

def ndcctools.taskvine.manager.Manager.declare_untar (   self,
  tarball,
  cache = False,
  peer_transfer = True 
)

Declare a file created by unpacking a tar file.

Parameters
selfThe manager to register this file
tarballThe file object to un-tar
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

◆ declare_poncho()

def ndcctools.taskvine.manager.Manager.declare_poncho (   self,
  package,
  cache = False,
  peer_transfer = True 
)

Declare a file that sets up a poncho environment.

Parameters
selfThe manager to register this file
packageThe poncho environment tarball. Either a vine file or a string representing a local file.
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

◆ declare_starch()

def ndcctools.taskvine.manager.Manager.declare_starch (   self,
  starch,
  cache = False,
  peer_transfer = True 
)

Declare a file created by unpacking a starch package.

Parameters
selfThe manager to register this file
starchThe starch .sfx file. Either a vine file or a string representing a local file.
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

◆ declare_xrootd()

def ndcctools.taskvine.manager.Manager.declare_xrootd (   self,
  source,
  proxy = None,
  env = None,
  cache = False,
  peer_transfer = True 
)

Declare a file accessible from an xrootd server.

Parameters
selfThe manager to register this file.
sourceThe URL address of the root file in text form as: "root://XROOTSERVER[:port]//path/to/file"
proxyA ndcctools.taskvine.file.File of the X509 proxy to use. If None, the environment variable X509_USER_PROXY and the file "$TMPDIR/$UID" are considered in that order. If no proxy is present, the transfer is tried without authentication.
envIf not None, an environment file (e.g poncho or starch, see ndcctools.taskvine.task.Task.add_environment) that contains the xrootd executables. Otherwise assume xrootd is available at the worker.
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input
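
The source argument follows the form "root://XROOTSERVER[:port]//path/to/file", with a double slash between the server and the path. A small helper such as the one below can assemble it; xrootd_url is a hypothetical name and the host in the example is a placeholder, not a real server.

```python
def xrootd_url(server, path, port=None):
    """Build a URL of the form root://XROOTSERVER[:port]//path/to/file."""
    host = f"{server}:{port}" if port is not None else server
    # The documented form places a double slash between the host and the path.
    return f"root://{host}//{path.lstrip('/')}"


url = xrootd_url("xrootd.example.org", "/path/to/file", port=1094)
print(url)
```

The resulting string would then be passed as the source argument of declare_xrootd.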

◆ declare_chirp()

def ndcctools.taskvine.manager.Manager.declare_chirp (   self,
  server,
  source,
  ticket = None,
  env = None,
  cache = False,
  peer_transfer = True 
)

Declare a file accessible from a chirp server.

Parameters
selfThe manager to register this file.
serverThe chirp server address of the form "hostname[:port]"
sourceThe name of the file in the server
ticketIf not None, a file object that provides a chirp authentication ticket
envIf not None, an environment file (e.g poncho or starch) that contains the chirp executables. Otherwise assume chirp is available at the worker.
cacheIf True or 'workflow', cache the file at workers for reuse until the end of the workflow. If 'worker', the file is cached until the end-of-life of the worker. If 'forever', the file is cached beyond the end-of-life of the worker. Default is False (file is not cached).
peer_transferWhether the file can be transferred between workers when peer transfers are enabled (see ndcctools.taskvine.manager.Manager.enable_peer_transfers). Default is True.
Returns
A file object to use in ndcctools.taskvine.task.Task.add_input

◆ log_txn_app()

def ndcctools.taskvine.manager.Manager.log_txn_app (   self,
  entry 
)

Adds a custom APPLICATION entry to the transactions log.

Parameters
selfReference to the current manager object.
entryA custom transaction message

◆ log_debug_app()

def ndcctools.taskvine.manager.Manager.log_debug_app (   self,
  entry 
)

Adds a custom APPLICATION entry to the debug log.

Parameters
selfReference to the current manager object.
entryA custom debug message

The documentation for this class was generated from the following file: