cctools
ndcctools.work_queue.WorkQueue Class Reference

Python Work Queue object. More...

Public Member Functions

def __init__ (self, port=WORK_QUEUE_DEFAULT_PORT, name=None, shutdown=False, stats_log=None, transactions_log=None, debug_log=None, ssl=None, status_display_interval=None)
 Create a new work queue. More...
 
def name (self)
 Get the project name of the queue. More...
 
def port (self)
 Get the listening port of the queue. More...
 
def using_ssl (self)
 Whether the manager is using ssl to talk to workers. More...
 
def stats (self)
 Get queue statistics. More...
 
def stats_hierarchy (self)
 Get worker hierarchy statistics. More...
 
def stats_category (self, category)
 Get the task statistics for the given category. More...
 
def status (self, request)
 Get queue information as list of dictionaries. More...
 
def workers_summary (self)
 Get resource statistics of workers connected. More...
 
def specify_category_mode (self, category, mode)
 Turn on or off first-allocation labeling for a given category. More...
 
def specify_category_autolabel_resource (self, category, resource, autolabel)
 Turn on or off first-allocation labeling for a given category and resource. More...
 
def task_state (self, taskid)
 Get current task state. More...
 
def enable_monitoring (self, dirname=None, watchdog=True)
 Enables resource monitoring of tasks in the queue, and writes a summary per task to the directory given. More...
 
def enable_monitoring_full (self, dirname=None, watchdog=True)
 As ndcctools.work_queue.WorkQueue.enable_monitoring, but it also generates a time series and a debug file. More...
 
def activate_fast_abort (self, multiplier)
 Turn on or off fast abort functionality for a given queue for tasks in the "default" category, and for tasks whose category does not set an explicit multiplier. More...
 
def activate_fast_abort_category (self, name, multiplier)
 Turn on or off fast abort functionality for a given queue. More...
 
def specify_draining_by_hostname (self, hostname, drain_mode=True)
 Turn on or off draining mode for workers at hostname. More...
 
def empty (self)
 Determine whether there are any known tasks queued, running, or waiting to be collected. More...
 
def hungry (self)
 Determine whether the queue can support more tasks. More...
 
def specify_algorithm (self, algorithm)
 Set the worker selection algorithm for queue. More...
 
def specify_task_order (self, order)
 Set the order for dispatching submitted tasks in the queue. More...
 
def specify_name (self, name)
 Change the project name for the given queue. More...
 
def specify_manager_preferred_connection (self, mode)
 Set the preference for using hostname over IP address to connect. More...
 
def specify_master_preferred_connection (self, mode)
 See ndcctools.work_queue.WorkQueue.specify_manager_preferred_connection. More...
 
def specify_min_taskid (self, minid)
 Set the minimum taskid of future submitted tasks. More...
 
def specify_priority (self, priority)
 Change the project priority for the given queue. More...
 
def specify_num_tasks_left (self, ntasks)
 Specify the number of tasks not yet submitted to the queue. More...
 
def specify_manager_mode (self, mode)
 Specify the manager mode for the given queue. More...
 
def specify_master_mode (self, mode)
 
def specify_catalog_server (self, hostname, port)
 Specify the catalog server the manager should report to. More...
 
def specify_log (self, logfile)
 Specify a log file that records the cumulative stats of connected workers and submitted tasks. More...
 
def specify_transactions_log (self, logfile)
 Specify a log file that records the states of tasks. More...
 
def specify_password (self, password)
 Add a mandatory password that each worker must present. More...
 
def specify_password_file (self, file)
 Add a mandatory password file that each worker must present. More...
 
def specify_max_resources (self, rmd)
 Specifies the maximum resources allowed for the default category. More...
 
def specify_min_resources (self, rmd)
 Specifies the minimum resources allowed for the default category. More...
 
def specify_category_max_resources (self, category, rmd)
 Specifies the maximum resources allowed for the given category. More...
 
def specify_category_min_resources (self, category, rmd)
 Specifies the minimum resources allowed for the given category. More...
 
def specify_category_first_allocation_guess (self, category, rmd)
 Specifies the first-allocation guess for the given category. More...
 
def specify_category_max_concurrent (self, category, max_concurrent)
 Specifies the maximum number of concurrent tasks allowed for the given category. More...
 
def initialize_categories (self, filename, rm)
 Initialize first value of categories. More...
 
def cancel_by_taskid (self, id)
 Cancel task identified by its taskid and remove from the given queue. More...
 
def cancel_by_tasktag (self, tag)
 Cancel task identified by its tag and remove from the given queue. More...
 
def cancel_by_category (self, category)
 Cancel all tasks of the given category and remove them from the queue. More...
 
def shutdown_workers (self, n)
 Shutdown workers connected to queue. More...
 
def block_host (self, host)
 Block workers running on host from working for the manager. More...
 
def blacklist (self, host)
 Replaced by ndcctools.work_queue.WorkQueue.block_host. More...
 
def block_host_with_timeout (self, host, timeout)
 Block workers running on host for the duration of the given timeout. More...
 
def blacklist_with_timeout (self, host, timeout)
 See ndcctools.work_queue.WorkQueue.block_host_with_timeout. More...
 
def unblock_host (self, host=None)
 Unblock the given host, or all hosts if no host is given. More...
 
def blacklist_clear (self, host=None)
 See ndcctools.work_queue.WorkQueue.unblock_host. More...
 
def invalidate_cache_file (self, local_name)
 Delete file from workers' caches. More...
 
def specify_keepalive_interval (self, interval)
 Change keepalive interval for a given queue. More...
 
def specify_keepalive_timeout (self, timeout)
 Change keepalive timeout for a given queue. More...
 
def estimate_capacity (self)
 Turn on manager capacity measurements. More...
 
def tune (self, name, value)
 Tune advanced parameters for work queue. More...
 
def submit (self, task)
 Submit a task to the queue. More...
 
def wait (self, timeout=WORK_QUEUE_WAITFORTASK)
 Wait for tasks to complete. More...
 
def wait_for_tag (self, tag, timeout=WORK_QUEUE_WAITFORTASK)
 Similar to ndcctools.work_queue.WorkQueue.wait, but guarantees that the returned task has the specified tag. More...
 
def application_info (self)
 Should return a dictionary with information for the status display. More...
 
def map (self, fn, seq, chunksize=1)
 Maps a function to elements in a sequence using work_queue. More...
 
def pair (self, fn, seq1, seq2, chunksize=1, env=None)
 Returns the values for a function of each pair from 2 sequences. More...
 
def tree_reduce (self, fn, seq, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value. More...
 
def remote_map (self, fn, seq, coprocess, name, chunksize=1)
 Maps a function to elements in a sequence using work_queue remote task. More...
 
def remote_pair (self, fn, seq1, seq2, coprocess, name, chunksize=1)
 Returns the values for a function of each pair from 2 sequences using remote task. More...
 
def remote_tree_reduce (self, fn, seq, coprocess, name, chunksize=2)
 Reduces a sequence until only one value is left, and then returns that value. More...
 

Detailed Description

Python Work Queue object.

Constructor & Destructor Documentation

◆ __init__()

def ndcctools.work_queue.WorkQueue.__init__ (   self,
  port = WORK_QUEUE_DEFAULT_PORT,
  name = None,
  shutdown = False,
  stats_log = None,
  transactions_log = None,
  debug_log = None,
  ssl = None,
  status_display_interval = None 
)

Create a new work queue.

Parameters
self: Reference to the current work queue object.
port: The port number to listen on. If zero, then a random port is chosen. A range of possible ports (low, high) can also be specified instead of a single integer.
name: The project name to use.
stats_log: The name of a file to write the queue's statistics log.
transactions_log: The name of a file to write the queue's transactions log.
debug_log: The name of a file to write the queue's debug log.
shutdown: Automatically shut down workers when the queue is finished. Disabled by default.
ssl: A tuple of filenames (ssl_key, ssl_cert) in PEM format, or True. If not given, then TLS is not activated. If True, a self-signed temporary key and cert are generated.
status_display_interval: Number of seconds between updates to the Jupyter status display. None, or a value less than 1, disables it.
See also
work_queue_create - For more information about environment variables that affect the behavior of this method.
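
As a rough illustration of the constructor arguments described above, the sketch below collects them in a dictionary and only builds a queue if the ndcctools package can be imported. The port range, project name, and log file names are placeholder values, and create_queue is a hypothetical helper, not part of the library.

```python
# Placeholder settings for illustration only; adjust to your own project.
queue_options = {
    "port": (9000, 9100),                 # a (low, high) range; 0 would pick a random port
    "name": "my_project",                 # project name
    "stats_log": "stats.log",             # statistics log file
    "transactions_log": "transactions.log",
    "ssl": True,                          # True: self-signed temporary key and cert
}


def create_queue(options):
    """Hypothetical helper: return a WorkQueue, or None if ndcctools is missing."""
    try:
        from ndcctools.work_queue import WorkQueue
    except ImportError:
        return None
    return WorkQueue(**options)
```

Calling create_queue(queue_options) would start listening on a port in the given range, so it is left out of the sketch itself.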

Member Function Documentation

◆ name()

def ndcctools.work_queue.WorkQueue.name (   self)

Get the project name of the queue.

>>> print(q.name)

◆ port()

def ndcctools.work_queue.WorkQueue.port (   self)

Get the listening port of the queue.

>>> print(q.port)

◆ using_ssl()

def ndcctools.work_queue.WorkQueue.using_ssl (   self)

Whether the manager is using ssl to talk to workers.

>>> print(q.using_ssl)

◆ stats()

def ndcctools.work_queue.WorkQueue.stats (   self)

Get queue statistics.

>>> print(q.stats)

The fields in ndcctools.work_queue.WorkQueue.stats can also be individually accessed through this call. For example:

>>> print(q.stats.workers_busy)

◆ stats_hierarchy()

def ndcctools.work_queue.WorkQueue.stats_hierarchy (   self)

Get worker hierarchy statistics.

>>> print(q.stats_hierarchy)

The fields in ndcctools.work_queue.WorkQueue.stats_hierarchy can also be individually accessed through this call. For example:

>>> print(q.stats_hierarchy.workers_busy)

◆ stats_category()

def ndcctools.work_queue.WorkQueue.stats_category (   self,
  category 
)

Get the task statistics for the given category.

Parameters
self: Reference to the current work queue object.
category: A category name. For example:
>>> s = q.stats_category("my_category")
>>> print(s)
The fields in work_queue_stats can also be individually accessed through this call. For example:
>>> print(s.tasks_waiting)

◆ status()

def ndcctools.work_queue.WorkQueue.status (   self,
  request 
)

Get queue information as list of dictionaries.

Parameters
self: Reference to the current work queue object.
request: One of: "queue", "tasks", "workers", or "categories". For example:
>>> import json
>>> tasks_info = q.status("tasks")
>>> print(json.dumps(tasks_info, indent=4))

◆ workers_summary()

def ndcctools.work_queue.WorkQueue.workers_summary (   self)

Get resource statistics of workers connected.

Parameters
self: Reference to the current work queue object.
Returns
A list of dictionaries that indicate how many workers (key "workers") are connected with a certain amount of "cores", "memory", and "disk". For example:
>>> workers = q.workers_summary()
>>> for w in workers:
...     print("{} workers with: {} cores, {} MB memory, {} MB disk".format(w["workers"], w["cores"], w["memory"], w["disk"]))

◆ specify_category_mode()

def ndcctools.work_queue.WorkQueue.specify_category_mode (   self,
  category,
  mode 
)

Turn on or off first-allocation labeling for a given category.

By default, only cores, memory, and disk are labeled, and gpus are unlabeled. NOTE: autolabeling is only meaningful when task monitoring is enabled (ndcctools.work_queue.WorkQueue.enable_monitoring). When monitoring is enabled and a task exhausts resources in a worker, mode dictates how work queue handles the exhaustion:

Parameters
self: Reference to the current work queue object.
category: A category name. If None, sets the mode by default for newly created categories.
mode: One of:
  • WORK_QUEUE_ALLOCATION_MODE_FIXED Task fails (default).
  • WORK_QUEUE_ALLOCATION_MODE_MAX If maximum values are specified for cores, memory, disk, and gpus (e.g. via ndcctools.work_queue.WorkQueue.specify_category_max_resources or ndcctools.work_queue.Task.specify_memory), and one of those resources is exceeded, the task fails. Otherwise it is retried until a large enough worker connects to the manager, using the maximum values specified, and the maximum values so far seen for resources not specified. Use ndcctools.work_queue.Task.specify_max_retries to set a limit on the number of times work queue attempts to complete the task.
  • WORK_QUEUE_ALLOCATION_MODE_MIN_WASTE As above, but work queue tries allocations to minimize resource waste.
  • WORK_QUEUE_ALLOCATION_MODE_MAX_THROUGHPUT As above, but work queue tries allocations to maximize throughput.

◆ specify_category_autolabel_resource()

def ndcctools.work_queue.WorkQueue.specify_category_autolabel_resource (   self,
  category,
  resource,
  autolabel 
)

Turn on or off first-allocation labeling for a given category and resource.

This function should be used to fine-tune the defaults from ndcctools.work_queue.WorkQueue.specify_category_mode.

Parameters
self: Reference to the current work queue object.
category: A category name.
resource: A resource name.
autolabel: True/False for on/off.
Returns
1 if resource is valid, 0 otherwise.

◆ task_state()

def ndcctools.work_queue.WorkQueue.task_state (   self,
  taskid 
)

Get current task state.

See work_queue_task_state_t for possible values.

>>> print(q.task_state(taskid))

◆ enable_monitoring()

def ndcctools.work_queue.WorkQueue.enable_monitoring (   self,
  dirname = None,
  watchdog = True 
)

Enables resource monitoring of tasks in the queue, and writes a summary per task to the directory given.

Additionally, all summaries are consolidated into the file all_summaries-PID.log.

Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).

Parameters
self: Reference to the current work queue object.
dirname: Directory name for the monitor output.
watchdog: If True (default), kill tasks that exhaust their declared resources.

◆ enable_monitoring_full()

def ndcctools.work_queue.WorkQueue.enable_monitoring_full (   self,
  dirname = None,
  watchdog = True 
)

As ndcctools.work_queue.WorkQueue.enable_monitoring, but it also generates a time series and a debug file.

WARNING: Such files may reach gigabyte sizes for long running tasks.

Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).

Parameters
self: Reference to the current work queue object.
dirname: Directory name for the monitor output.
watchdog: If True (default), kill tasks that exhaust their declared resources.

◆ activate_fast_abort()

def ndcctools.work_queue.WorkQueue.activate_fast_abort (   self,
  multiplier 
)

Turn on or off fast abort functionality for a given queue for tasks in the "default" category, and for tasks whose category does not set an explicit multiplier.

Parameters
self: Reference to the current work queue object.
multiplier: The multiplier of the average task time at which point to abort; if negative (the default) fast_abort is deactivated.

◆ activate_fast_abort_category()

def ndcctools.work_queue.WorkQueue.activate_fast_abort_category (   self,
  name,
  multiplier 
)

Turn on or off fast abort functionality for a given queue.

Parameters
self: Reference to the current work queue object.
name: Name of the category.
multiplier: The multiplier of the average task time at which point to abort. If zero, fast abort is deactivated for the category. If negative (the default), the multiplier of the "default" category is used (see ndcctools.work_queue.WorkQueue.activate_fast_abort).
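
The multiplier rules for the two fast abort calls can be modeled in a few lines. This is only a sketch of the documented behavior, not the library's implementation: effective_multiplier and should_abort are hypothetical names, and treating a zero default multiplier as "deactivated" is an assumption taken from the "fast-abort-multiplier" description under tune.

```python
def effective_multiplier(category_multiplier, default_multiplier):
    """Return the multiplier in effect for a category, or None if fast abort is off."""
    if category_multiplier == 0:
        # Zero deactivates fast abort for this category.
        return None
    if category_multiplier < 0:
        # Negative falls back to the "default" category setting.
        # Assumption: a non-positive default means fast abort is deactivated.
        return default_multiplier if default_multiplier > 0 else None
    return category_multiplier


def should_abort(elapsed, average_task_time, multiplier):
    """A task is a fast abort candidate once it runs longer than multiplier * average."""
    if multiplier is None:
        return False
    return elapsed > multiplier * average_task_time


m = effective_multiplier(-1, 3)   # category defers to the default multiplier
print(m, should_abort(100, 20, m), should_abort(50, 20, m))
```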

◆ specify_draining_by_hostname()

def ndcctools.work_queue.WorkQueue.specify_draining_by_hostname (   self,
  hostname,
  drain_mode = True 
)

Turn on or off draining mode for workers at hostname.

Parameters
self: Reference to the current work queue object.
hostname: The hostname of the host running the workers.
drain_mode: If True, no new tasks are dispatched to workers at hostname, and empty workers are shut down. Otherwise, workers work as usual.

◆ empty()

def ndcctools.work_queue.WorkQueue.empty (   self)

Determine whether there are any known tasks queued, running, or waiting to be collected.

Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".

Parameters
self: Reference to the current work queue object.

◆ hungry()

def ndcctools.work_queue.WorkQueue.hungry (   self)

Determine whether the queue can support more tasks.

Returns the number of additional tasks it can support if "hungry" and 0 if "sated".

Parameters
self: Reference to the current work queue object.
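
The empty and hungry calls are typically combined with submit and wait in a driver loop. The sketch below shows one possible shape of such a loop. It is written against any object that offers these four methods, and FakeQueue is a hypothetical in-memory stand-in (not part of ndcctools) so the example can run without workers.

```python
def run_all(q, tasks, timeout=5):
    """Submit tasks while the queue is hungry and collect results until it is empty."""
    pending = list(tasks)
    done = []
    while pending or not q.empty():
        # Feed the queue only while it reports it can support more tasks.
        while pending and q.hungry():
            q.submit(pending.pop(0))
        t = q.wait(timeout)       # None if no task completed within the timeout
        if t is not None:
            done.append(t)
    return done


class FakeQueue:
    """Hypothetical stand-in: holds at most `capacity` tasks, completes them in FIFO order."""

    def __init__(self, capacity=2):
        self._capacity = capacity
        self._tasks = []

    def submit(self, task):
        self._tasks.append(task)

    def empty(self):
        return len(self._tasks) == 0

    def hungry(self):
        return max(self._capacity - len(self._tasks), 0)

    def wait(self, timeout):
        return self._tasks.pop(0) if self._tasks else None


results = run_all(FakeQueue(), ["a", "b", "c"])
print(results)
```

With a real queue, the elements of tasks would be ndcctools.work_queue.Task objects and wait would return completed tasks in whatever order they finish.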

◆ specify_algorithm()

def ndcctools.work_queue.WorkQueue.specify_algorithm (   self,
  algorithm 
)

Set the worker selection algorithm for queue.

Parameters
self: Reference to the current work queue object.
algorithm: One of the following algorithms to use in assigning a task to a worker. See work_queue_schedule_t for possible values.

◆ specify_task_order()

def ndcctools.work_queue.WorkQueue.specify_task_order (   self,
  order 
)

Set the order for dispatching submitted tasks in the queue.

Parameters
self: Reference to the current work queue object.
order: One of the following algorithms to use in dispatching submitted tasks to workers:

◆ specify_name()

def ndcctools.work_queue.WorkQueue.specify_name (   self,
  name 
)

Change the project name for the given queue.

Parameters
self: Reference to the current work queue object.
name: The new project name.

◆ specify_manager_preferred_connection()

def ndcctools.work_queue.WorkQueue.specify_manager_preferred_connection (   self,
  mode 
)

Set the preference for using hostname over IP address to connect.

'by_ip' uses IP addresses from the network interfaces of the manager (standard behavior), 'by_hostname' uses the hostname of the manager, and 'by_apparent_ip' uses the address of the manager as seen by the catalog server.

Parameters
self: Reference to the current work queue object.
mode: A string to indicate using 'by_ip', 'by_hostname' or 'by_apparent_ip'.

◆ specify_master_preferred_connection()

def ndcctools.work_queue.WorkQueue.specify_master_preferred_connection (   self,
  mode 
)

◆ specify_min_taskid()

def ndcctools.work_queue.WorkQueue.specify_min_taskid (   self,
  minid 
)

Set the minimum taskid of future submitted tasks.

Further submitted tasks are guaranteed to have a taskid greater than or equal to minid. This function is useful to make taskids consistent in a workflow that consists of sequential managers. (Note: This function is rarely used.) If the minimum id provided is smaller than the last taskid computed, the minimum id provided is ignored.

Parameters
self: Reference to the current work queue object.
minid: Minimum desired taskid.
Returns
Returns the actual minimum taskid for future tasks.

◆ specify_priority()

def ndcctools.work_queue.WorkQueue.specify_priority (   self,
  priority 
)

Change the project priority for the given queue.

Parameters
self: Reference to the current work queue object.
priority: An integer that represents the priority of this work queue manager. The higher the value, the higher the priority.

◆ specify_num_tasks_left()

def ndcctools.work_queue.WorkQueue.specify_num_tasks_left (   self,
  ntasks 
)

Specify the number of tasks not yet submitted to the queue.

It is used by work_queue_factory to determine the number of workers to launch. If not specified, it defaults to 0. work_queue_factory considers the number of tasks as: num tasks left + num tasks running + num tasks read.

Parameters
self: Reference to the current work queue object.
ntasks: Number of tasks yet to be submitted.

◆ specify_manager_mode()

def ndcctools.work_queue.WorkQueue.specify_manager_mode (   self,
  mode 
)

Specify the manager mode for the given queue.

(Kept for compatibility. It is a no-op.)

Parameters
self: Reference to the current work queue object.
mode: This may be one of the following values: WORK_QUEUE_MASTER_MODE_STANDALONE or WORK_QUEUE_MASTER_MODE_CATALOG.

◆ specify_master_mode()

def ndcctools.work_queue.WorkQueue.specify_master_mode (   self,
  mode 
)

◆ specify_catalog_server()

def ndcctools.work_queue.WorkQueue.specify_catalog_server (   self,
  hostname,
  port 
)

Specify the catalog server the manager should report to.

Parameters
self: Reference to the current work queue object.
hostname: The hostname of the catalog server.
port: The port the catalog server is listening on.

◆ specify_log()

def ndcctools.work_queue.WorkQueue.specify_log (   self,
  logfile 
)

Specify a log file that records the cumulative stats of connected workers and submitted tasks.

Parameters
self: Reference to the current work queue object.
logfile: Filename.

◆ specify_transactions_log()

def ndcctools.work_queue.WorkQueue.specify_transactions_log (   self,
  logfile 
)

Specify a log file that records the states of tasks.

Parameters
self: Reference to the current work queue object.
logfile: Filename.

◆ specify_password()

def ndcctools.work_queue.WorkQueue.specify_password (   self,
  password 
)

Add a mandatory password that each worker must present.

Parameters
self: Reference to the current work queue object.
password: The password.

◆ specify_password_file()

def ndcctools.work_queue.WorkQueue.specify_password_file (   self,
  file 
)

Add a mandatory password file that each worker must present.

Parameters
self: Reference to the current work queue object.
file: Name of the file containing the password.

◆ specify_max_resources()

def ndcctools.work_queue.WorkQueue.specify_max_resources (   self,
  rmd 
)

Specifies the maximum resources allowed for the default category.

Parameters
self: Reference to the current work queue object.
rmd: Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example:
>>> # A maximum of 4 cores is found on any worker:
>>> q.specify_max_resources({'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB disk are found on any worker:
>>> q.specify_max_resources({'cores': 8, 'memory': 1024, 'disk': 10240})
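
The examples above pair "1GB of memory" with 1024 and "10GB disk" with 10240, which suggests that memory and disk are expressed in MB. Under that reading, a small helper can build the dictionary from GB values. resources_mb is a hypothetical convenience function, not part of ndcctools.

```python
def resources_mb(cores=None, memory_gb=None, disk_gb=None):
    """Build a resource dictionary, converting memory and disk from GB to MB."""
    rmd = {}
    if cores is not None:
        rmd["cores"] = cores
    if memory_gb is not None:
        rmd["memory"] = int(memory_gb * 1024)   # assumed unit: MB
    if disk_gb is not None:
        rmd["disk"] = int(disk_gb * 1024)       # assumed unit: MB
    return rmd


rmd = resources_mb(cores=8, memory_gb=1, disk_gb=10)
print(rmd)
# The result could then be passed on, e.g. q.specify_max_resources(rmd)
```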

◆ specify_min_resources()

def ndcctools.work_queue.WorkQueue.specify_min_resources (   self,
  rmd 
)

Specifies the minimum resources allowed for the default category.

Parameters
self: Reference to the current work queue object.
rmd: Dictionary indicating minimum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example:
>>> # A minimum of 2 cores is found on any worker:
>>> q.specify_min_resources({'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.specify_min_resources({'cores': 4, 'memory': 512, 'disk': 1024})

◆ specify_category_max_resources()

def ndcctools.work_queue.WorkQueue.specify_category_max_resources (   self,
  category,
  rmd 
)

Specifies the maximum resources allowed for the given category.

Parameters
self: Reference to the current work queue object.
category: Name of the category.
rmd: Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example:
>>> # A maximum of 4 cores may be used by a task in the category:
>>> q.specify_category_max_resources("my_category", {'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB of disk may be used by a task:
>>> q.specify_category_max_resources("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})

◆ specify_category_min_resources()

def ndcctools.work_queue.WorkQueue.specify_category_min_resources (   self,
  category,
  rmd 
)

Specifies the minimum resources allowed for the given category.

Parameters
self: Reference to the current work queue object.
category: Name of the category.
rmd: Dictionary indicating minimum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example:
>>> # A minimum of 2 cores is found on any worker:
>>> q.specify_category_min_resources("my_category", {'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.specify_category_min_resources("my_category", {'cores': 4, 'memory': 512, 'disk': 1024})

◆ specify_category_first_allocation_guess()

def ndcctools.work_queue.WorkQueue.specify_category_first_allocation_guess (   self,
  category,
  rmd 
)

Specifies the first-allocation guess for the given category.

Parameters
self: Reference to the current work queue object.
category: Name of the category.
rmd: Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example:
>>> # Tasks are first tried with 4 cores:
>>> q.specify_category_first_allocation_guess("my_category", {'cores': 4})
>>> # Tasks are first tried with 8 cores, 1GB of memory, and 10GB of disk:
>>> q.specify_category_first_allocation_guess("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})

◆ specify_category_max_concurrent()

def ndcctools.work_queue.WorkQueue.specify_category_max_concurrent (   self,
  category,
  max_concurrent 
)

Specifies the maximum number of concurrent tasks allowed for the given category.

Parameters
self: Reference to the current work queue object.
category: Name of the category.
max_concurrent: Maximum number of concurrent tasks. Less than 0 means unlimited (this is the default). For example:
>>> # Do not run more than 5 tasks of "my_category" concurrently:
>>> q.specify_category_max_concurrent("my_category", 5)

◆ initialize_categories()

def ndcctools.work_queue.WorkQueue.initialize_categories (   self,
  filename,
  rm 
)

Initialize first value of categories.

Parameters
self: Reference to the current work queue object.
rm: Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields.
filename: JSON file with resource summaries.

◆ cancel_by_taskid()

def ndcctools.work_queue.WorkQueue.cancel_by_taskid (   self,
  id 
)

Cancel task identified by its taskid and remove from the given queue.

Parameters
self: Reference to the current work queue object.
id: The taskid returned from ndcctools.work_queue.WorkQueue.submit.

◆ cancel_by_tasktag()

def ndcctools.work_queue.WorkQueue.cancel_by_tasktag (   self,
  tag 
)

Cancel task identified by its tag and remove from the given queue.

Parameters
self: Reference to the current work queue object.
tag: The tag assigned to task using ndcctools.work_queue.Task.specify_tag.

◆ cancel_by_category()

def ndcctools.work_queue.WorkQueue.cancel_by_category (   self,
  category 
)

Cancel all tasks of the given category and remove them from the queue.

Parameters
self: Reference to the current work queue object.
category: The name of the category to cancel.

◆ shutdown_workers()

def ndcctools.work_queue.WorkQueue.shutdown_workers (   self,
  n 
)

Shutdown workers connected to queue.

Gives a best effort and then returns the number of workers given the shutdown order.

Parameters
self: Reference to the current work queue object.
n: The number of workers to shut down. To shut down all workers, specify "0".

◆ block_host()

def ndcctools.work_queue.WorkQueue.block_host (   self,
  host 
)

Block workers running on host from working for the manager.

Parameters
self: Reference to the current work queue object.
host: The hostname of the host running the workers.

◆ blacklist()

def ndcctools.work_queue.WorkQueue.blacklist (   self,
  host 
)

◆ block_host_with_timeout()

def ndcctools.work_queue.WorkQueue.block_host_with_timeout (   self,
  host,
  timeout 
)

Block workers running on host for the duration of the given timeout.

Parameters
self: Reference to the current work queue object.
host: The hostname of the host running the workers.
timeout: How long this block entry lasts (in seconds). If less than 1, block indefinitely.

◆ blacklist_with_timeout()

def ndcctools.work_queue.WorkQueue.blacklist_with_timeout (   self,
  host,
  timeout 
)

◆ unblock_host()

def ndcctools.work_queue.WorkQueue.unblock_host (   self,
  host = None 
)

Unblock the given host, or all hosts if no host is given.

Parameters
self: Reference to the current work queue object.
host: The hostname of the host to unblock.
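
To make the blocking rules concrete, here is a small stand-alone model of a block list with optional expiry. It mirrors only what is documented for block_host_with_timeout and unblock_host (a timeout below 1 blocks indefinitely, and unblocking without a host clears everything). BlockList is a hypothetical class, and how the manager actually stores or expires entries is not described in this reference.

```python
import time


class BlockList:
    """Hypothetical model of host blocking with optional expiry."""

    def __init__(self):
        self._until = {}   # host -> expiry timestamp, or None for "indefinitely"

    def block(self, host, timeout, now=None):
        now = time.time() if now is None else now
        # A timeout of less than 1 second means the block never expires.
        self._until[host] = None if timeout < 1 else now + timeout

    def unblock(self, host=None):
        if host is None:
            self._until.clear()          # no host given: unblock all hosts
        else:
            self._until.pop(host, None)

    def is_blocked(self, host, now=None):
        now = time.time() if now is None else now
        if host not in self._until:
            return False
        expiry = self._until[host]
        return expiry is None or now < expiry


bl = BlockList()
bl.block("node1.example.org", 60, now=1000.0)   # expires at t=1060
bl.block("node2.example.org", 0, now=1000.0)    # blocked indefinitely
```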

◆ blacklist_clear()

def ndcctools.work_queue.WorkQueue.blacklist_clear (   self,
  host = None 
)

◆ invalidate_cache_file()

def ndcctools.work_queue.WorkQueue.invalidate_cache_file (   self,
  local_name 
)

Delete file from workers' caches.

Parameters
self: Reference to the current work queue object.
local_name: Name of the file as seen by the manager.

◆ specify_keepalive_interval()

def ndcctools.work_queue.WorkQueue.specify_keepalive_interval (   self,
  interval 
)

Change keepalive interval for a given queue.

Parameters
self: Reference to the current work queue object.
interval: Minimum number of seconds to wait before sending new keepalive checks to workers.

◆ specify_keepalive_timeout()

def ndcctools.work_queue.WorkQueue.specify_keepalive_timeout (   self,
  timeout 
)

Change keepalive timeout for a given queue.

Parameters
self: Reference to the current work queue object.
timeout: Minimum number of seconds to wait for a keepalive response from worker before marking it as dead.

◆ estimate_capacity()

def ndcctools.work_queue.WorkQueue.estimate_capacity (   self)

Turn on manager capacity measurements.

Parameters
self: Reference to the current work queue object.

◆ tune()

def ndcctools.work_queue.WorkQueue.tune (   self,
  name,
  value 
)

Tune advanced parameters for work queue.

Parameters
self: Reference to the current work queue object.
name: The name of the parameter to tune. Can be one of the following:
  • "resource-submit-multiplier" Treat each worker as having ({cores,memory,gpus} * multiplier) when submitting tasks. This allows for tasks to wait at a worker rather than the manager. (default = 1.0)
  • "min-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a worker. (default=10)
  • "foreman-transfer-timeout" Set the minimum number of seconds to wait for files to be transferred to or from a foreman. (default=3600)
  • "transfer-outlier-factor" Transfers that are this many times slower than the average will be aborted. (default=10x)
  • "default-transfer-rate" The assumed network bandwidth used until sufficient data has been collected. (1MB/s)
  • "fast-abort-multiplier" Set the multiplier of the average task time at which point to abort; if negative or zero fast_abort is deactivated. (default=0)
  • "keepalive-interval" Set the minimum number of seconds to wait before sending new keepalive checks to workers. (default=300)
  • "keepalive-timeout" Set the minimum number of seconds to wait for a keepalive response from worker before marking it as dead. (default=30)
  • "short-timeout" Set the minimum timeout when sending a brief message to a single worker. (default=5s)
  • "long-timeout" Set the minimum timeout when sending a brief message to a foreman. (default=1h)
  • "category-steady-n-tasks" Set the number of tasks considered when computing category buckets.
  • "hungry-minimum" Minimum number of tasks to consider queue not hungry. (default=10)
  • "wait-for-workers" Minimum number of workers to connect before starting to dispatch tasks. (default=0)
  • "attempt-schedule-depth" The amount of tasks to attempt scheduling on each pass of send_one_task in the main loop. (default=100)
  • "wait_retrieve_many" Parameter to alter how work_queue_wait works. If set to 0, work_queue_wait breaks out of the while loop whenever a task changes to WORK_QUEUE_TASK_DONE (wait_retrieve_one mode). If set to 1, work_queue_wait does not break, but continues receiving and dispatching tasks. This occurs until no task is sent or received, at which point it breaks out of the while loop (wait_retrieve_many mode). (default=0)
  • "monitor-interval" Parameter to change how frequently the resource monitor records resource consumption of a task in a time series, if this feature is enabled. See enable_monitoring_full.
value: The value to set the parameter to.
Returns
0 on success, -1 on failure.
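
Because tune reports failure through its return value rather than an exception, it can be worth checking each call. The sketch below applies a dictionary of settings to any object with a tune(name, value) method and collects the names that were rejected. apply_tuning and FakeQueue are hypothetical; with a real queue, q would be a WorkQueue instance.

```python
def apply_tuning(q, settings):
    """Call q.tune(name, value) for each setting and return the names that failed."""
    failed = []
    for name, value in settings.items():
        # tune() is documented to return 0 on success and -1 on failure.
        if q.tune(name, value) != 0:
            failed.append(name)
    return failed


class FakeQueue:
    """Hypothetical stand-in that accepts only a few of the documented names."""

    KNOWN = {"keepalive-interval", "keepalive-timeout", "hungry-minimum"}

    def tune(self, name, value):
        return 0 if name in self.KNOWN else -1


failed = apply_tuning(FakeQueue(), {
    "keepalive-interval": 120,
    "hungry-minimum": 20,
    "not-a-parameter": 1,
})
print(failed)
```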

◆ submit()

def ndcctools.work_queue.WorkQueue.submit (   self,
  task 
)

Submit a task to the queue.

It is safe to re-submit a task returned by ndcctools.work_queue.WorkQueue.wait.

Parameters
self: Reference to the current work queue object.
task: A task description created from ndcctools.work_queue.Task.

◆ wait()

def ndcctools.work_queue.WorkQueue.wait (   self,
  timeout = WORK_QUEUE_WAITFORTASK 
)

Wait for tasks to complete.

This call blocks until a task has completed or the timeout has elapsed.

Parameters
self     Reference to the current work queue object.
timeout  The number of seconds to wait for a completed task before returning. Use an integer to set the timeout or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed.
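
The usual submit-then-wait pattern can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's implementation: StubQueue and drain are hypothetical names, the stub returns immediately instead of blocking, and the sketch assumes that the queue offers an empty() method and that wait returns None when the timeout elapses without a completed task.

```python
from collections import deque


class StubQueue:
    """Hypothetical stand-in for a WorkQueue that completes tasks in order."""

    def __init__(self):
        self._pending = deque()

    def submit(self, task):
        self._pending.append(task)

    def empty(self):
        return not self._pending

    def wait(self, timeout=5):
        # A real queue would block for up to `timeout` seconds;
        # this stub hands back the next task at once.
        return self._pending.popleft() if self._pending else None


def drain(queue, timeout=5):
    """Collect completed tasks until the queue reports that it is empty."""
    done = []
    while not queue.empty():
        task = queue.wait(timeout)
        if task is not None:  # None is assumed to mean the timeout elapsed
            done.append(task)
    return done


queue = StubQueue()
for name in ["task-a", "task-b", "task-c"]:
    queue.submit(name)

completed = drain(queue)
print(completed)
```

A short timeout in the loop lets the application do other work, such as submitting more tasks, between calls to wait.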

◆ wait_for_tag()

def ndcctools.work_queue.WorkQueue.wait_for_tag (   self,
  tag,
  timeout = WORK_QUEUE_WAITFORTASK 
)

Similar to ndcctools.work_queue.WorkQueue.wait, but guarantees that the returned task has the specified tag.

This call blocks until a task with the specified tag has completed or the timeout has elapsed.

Parameters
self     Reference to the current work queue object.
tag      Desired tag. If None, then it is equivalent to self.wait(timeout).
timeout  The number of seconds to wait for a completed task before returning.

◆ application_info()

def ndcctools.work_queue.WorkQueue.application_info (   self)

Should return a dictionary with information for the status display.

This method is meant to be overridden by custom applications.

The dictionary should be of the form:

{ "application_info" : {"values" : dict, "units" : dict} }

where "units" is an optional dictionary that indicates the units of the corresponding key in "values".

Parameters
self  Reference to the current work queue object.

For example:

>>> myapp.application_info()
{'application_info': {'values': {'size_max_output': 0.361962, 'current_chunksize': 65536}, 'units': {'size_max_output': 'MB'}}}
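
A minimal sketch of such an override is shown below. MyApp and its two attributes are hypothetical, chosen to reproduce the values in the example above; in practice the class would presumably derive from WorkQueue, which is left out here so the sketch runs on its own.

```python
class MyApp:
    """Hypothetical application class; in practice it would subclass WorkQueue."""

    def __init__(self):
        self.size_max_output = 0.361962   # largest output seen so far, in MB
        self.current_chunksize = 65536

    def application_info(self):
        # "values" holds the quantities to display; "units" is optional and
        # only needs entries for keys that have a unit.
        return {
            "application_info": {
                "values": {
                    "size_max_output": self.size_max_output,
                    "current_chunksize": self.current_chunksize,
                },
                "units": {"size_max_output": "MB"},
            }
        }


myapp = MyApp()
info = myapp.application_info()
print(info)
```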

◆ map()

def ndcctools.work_queue.WorkQueue.map (   self,
  fn,
  seq,
  chunksize = 1 
)

Maps a function to elements in a sequence using work_queue.

Similar to the regular map function in Python.

Parameters
self       Reference to the current work queue object.
fn         The function that will be called on each element.
seq        The sequence whose elements are passed to the function.
chunksize  The number of elements to process at once.
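
The effect of chunksize can be illustrated with a purely local sketch. chunked_map is a hypothetical helper, not part of the library: it groups the sequence into chunks, where each chunk stands for the work that one task would presumably carry, and returns the results in the original order.

```python
def chunked_map(fn, seq, chunksize=1):
    """Apply fn to every element of seq, processing chunksize elements at a time."""
    items = list(seq)
    results = []
    for start in range(0, len(items), chunksize):
        chunk = items[start:start + chunksize]
        # In a distributed setting this chunk would be shipped to a worker;
        # here it is evaluated locally.
        results.extend(fn(x) for x in chunk)
    return results


squares = chunked_map(lambda x: x * x, range(7), chunksize=3)
print(squares)  # same result as list(map(lambda x: x * x, range(7)))
```

Larger chunks mean fewer tasks with more work each, which tends to help when the function is cheap relative to the cost of dispatching a task.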

◆ pair()

def ndcctools.work_queue.WorkQueue.pair (   self,
  fn,
  seq1,
  seq2,
  chunksize = 1,
  env = None 
)

Returns the values of a function applied to each pair drawn from two sequences.

The pairs that are passed into the function are generated by itertools.

Parameters
self       Reference to the current work queue object.
fn         The function that will be called on each pair.
seq1       The first sequence that will be used to generate pairs.
seq2       The second sequence that will be used to generate pairs.
chunksize  The number of pairs to process at once.
env        Poncho or conda environment tarball filename.
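
A local sketch of the pairing step follows. It assumes that the pairs are the Cartesian product produced by itertools.product, which this page does not state explicitly, and that fn receives the two members of a pair as separate arguments; the real method may pass them as a single tuple instead. chunked_pair is a hypothetical helper.

```python
import itertools


def chunked_pair(fn, seq1, seq2, chunksize=1):
    """Apply fn to every (a, b) pair from seq1 x seq2, chunksize pairs at a time."""
    pairs = list(itertools.product(seq1, seq2))
    results = []
    for start in range(0, len(pairs), chunksize):
        # Each slice stands for the pairs that one task would process.
        for a, b in pairs[start:start + chunksize]:
            results.append(fn(a, b))
    return results


products = chunked_pair(lambda a, b: a * b, [1, 2], [10, 20, 30], chunksize=4)
print(products)
```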

◆ tree_reduce()

def ndcctools.work_queue.WorkQueue.tree_reduce (   self,
  fn,
  seq,
  chunksize = 2 
)

Reduces a sequence until only one value is left, and then returns that value.

The sequence is reduced by passing a pair of elements into a function and storing the result. A new sequence is then made from the results and reduced again, until one value is left.

If the sequence has an odd length, the last element gets reduced at the end.

Parameters
self       Reference to the current work queue object.
fn         The function that will be called on each element.
seq        The sequence that will be reduced.
chunksize  The number of elements per Task (for tree reduce, must be greater than 1).
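
The reduction rounds described above can be sketched locally as follows. tree_reduce_local is a hypothetical helper, and it assumes that fn accepts a list of up to chunksize elements and returns a single value; the calling convention of the real method may differ.

```python
def tree_reduce_local(fn, seq, chunksize=2):
    """Repeatedly reduce chunks of the sequence until one value remains."""
    if chunksize < 2:
        raise ValueError("chunksize must be greater than 1")
    values = list(seq)
    if not values:
        raise ValueError("cannot reduce an empty sequence")
    while len(values) > 1:
        # One round: every chunk becomes a single value. A short trailing
        # chunk (for example the odd element out) is reduced on its own.
        values = [fn(values[i:i + chunksize])
                  for i in range(0, len(values), chunksize)]
    return values[0]


total = tree_reduce_local(sum, [1, 2, 3, 4, 5])
largest = tree_reduce_local(max, [4, 9, 1, 7, 3, 8, 2], chunksize=3)
print(total, largest)
```

With chunksize 2, five elements are reduced in three rounds (5, then 3, then 2 values, then 1), which shows why a chunksize of 1 would never make progress.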

◆ remote_map()

def ndcctools.work_queue.WorkQueue.remote_map (   self,
  fn,
  seq,
  coprocess,
  name,
  chunksize = 1 
)

Maps a function to elements in a sequence using a work_queue remote task.

Similar to the regular map function in Python, but creates a task to execute each function on a worker running a coprocess.

Parameters
self       Reference to the current work queue object.
fn         The function that will be called on each element. This function exists in coprocess.
seq        The sequence whose elements are passed to the function.
coprocess  The name of the coprocess that contains the function fn.
name       This defines the key in the event json that wraps the data sent to the coprocess.
chunksize  The number of elements to process at once.
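
One plausible reading of the name parameter is that each chunk of the sequence is serialized as a JSON object with name as its only key. The sketch below builds such events locally; build_events is a hypothetical helper and the exact payload layout is an assumption that this page does not confirm.

```python
import json


def build_events(seq, name, chunksize=1):
    """Wrap each chunk of seq in a JSON object keyed by name."""
    items = list(seq)
    return [json.dumps({name: items[i:i + chunksize]})
            for i in range(0, len(items), chunksize)]


events = build_events([1, 2, 3, 4, 5], "values", chunksize=2)
for event in events:
    print(event)

# The function in the coprocess would then read its input back
# from the same key:
decoded = [json.loads(event)["values"] for event in events]
```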

◆ remote_pair()

def ndcctools.work_queue.WorkQueue.remote_pair (   self,
  fn,
  seq1,
  seq2,
  coprocess,
  name,
  chunksize = 1 
)

Returns the values of a function applied to each pair drawn from two sequences, using a remote task.

The pairs that are passed into the function are generated by itertools.

Parameters
self       Reference to the current work queue object.
fn         The function that will be called on each pair. This function exists in coprocess.
seq1       The first sequence that will be used to generate pairs.
seq2       The second sequence that will be used to generate pairs.
coprocess  The name of the coprocess that contains the function fn.
name       This defines the key in the event json that wraps the data sent to the coprocess.
chunksize  The number of pairs to process at once.

◆ remote_tree_reduce()

def ndcctools.work_queue.WorkQueue.remote_tree_reduce (   self,
  fn,
  seq,
  coprocess,
  name,
  chunksize = 2 
)

Reduces a sequence until only one value is left, and then returns that value.

The sequence is reduced by passing a pair of elements into a function and storing the result. A new sequence is then made from the results and reduced again, until one value is left. Executes on a coprocess.

If the sequence has an odd length, the last element gets reduced at the end.

Parameters
self       Reference to the current work queue object.
fn         The function that will be called on each element. This function exists in coprocess.
seq        The sequence that will be reduced.
coprocess  The name of the coprocess that contains the function fn.
name       This defines the key in the event json that wraps the data sent to the coprocess.
chunksize  The number of elements per Task (for tree reduce, must be greater than 1).

The documentation for this class was generated from the following file: