cctools
|
Python Work Queue object. More...
Public Member Functions | |
def | __init__ (self, port=WORK_QUEUE_DEFAULT_PORT, name=None, shutdown=False, stats_log=None, transactions_log=None, debug_log=None, ssl=None, status_display_interval=None) |
Create a new work queue. More... | |
def | name (self) |
Get the project name of the queue. More... | |
def | port (self) |
Get the listening port of the queue. More... | |
def | using_ssl (self) |
Whether the manager is using ssl to talk to workers. More... | |
def | stats (self) |
Get queue statistics. More... | |
def | stats_hierarchy (self) |
Get worker hierarchy statistics. More... | |
def | stats_category (self, category) |
Get the task statistics for the given category. More... | |
def | status (self, request) |
Get queue information as list of dictionaries. More... | |
def | workers_summary (self) |
Get resource statistics of workers connected. More... | |
def | specify_category_mode (self, category, mode) |
Turn on or off first-allocation labeling for a given category. More... | |
def | specify_category_autolabel_resource (self, category, resource, autolabel) |
Turn on or off first-allocation labeling for a given category and resource. More... | |
def | task_state (self, taskid) |
Get current task state. More... | |
def | enable_monitoring (self, dirname=None, watchdog=True) |
Enables resource monitoring of tasks in the queue, and writes a summary per task to the directory given. More... | |
def | enable_monitoring_full (self, dirname=None, watchdog=True) |
As ndcctools.work_queue.WorkQueue.enable_monitoring, but it also generates a time series and a debug file. More... | |
def | activate_fast_abort (self, multiplier) |
Turn on or off fast abort functionality for a given queue for tasks in the "default" category, and for tasks whose category does not set an explicit multiplier. More... | |
def | activate_fast_abort_category (self, name, multiplier) |
Turn on or off fast abort functionality for a given queue. More... | |
def | specify_draining_by_hostname (self, hostname, drain_mode=True) |
Turn on or off draining mode for workers at hostname. More... | |
def | empty (self) |
Determine whether there are any known tasks queued, running, or waiting to be collected. More... | |
def | hungry (self) |
Determine whether the queue can support more tasks. More... | |
def | specify_algorithm (self, algorithm) |
Set the worker selection algorithm for queue. More... | |
def | specify_task_order (self, order) |
Set the order for dispatching submitted tasks in the queue. More... | |
def | specify_name (self, name) |
Change the project name for the given queue. More... | |
def | specify_manager_preferred_connection (self, mode) |
Set the preference for using hostname over IP address to connect. More... | |
def | specify_master_preferred_connection (self, mode) |
See ndcctools.work_queue.WorkQueue.specify_manager_preferred_connection. More... | |
def | specify_min_taskid (self, minid) |
Set the minimum taskid of future submitted tasks. More... | |
def | specify_priority (self, priority) |
Change the project priority for the given queue. More... | |
def | specify_num_tasks_left (self, ntasks) |
Specify the number of tasks not yet submitted to the queue. More... | |
def | specify_manager_mode (self, mode) |
Specify the manager mode for the given queue. More... | |
def | specify_master_mode (self, mode) |
def | specify_catalog_server (self, hostname, port) |
Specify the catalog server the manager should report to. More... | |
def | specify_log (self, logfile) |
Specify a log file that records the cumulative stats of connected workers and submitted tasks. More... | |
def | specify_transactions_log (self, logfile) |
Specify a log file that records the states of tasks. More... | |
def | specify_password (self, password) |
Add a mandatory password that each worker must present. More... | |
def | specify_password_file (self, file) |
Add a mandatory password file that each worker must present. More... | |
def | specify_max_resources (self, rmd) |
Specifies the maximum resources allowed for the default category. More... | |
def | specify_min_resources (self, rmd) |
Specifies the minimum resources allowed for the default category. More... | |
def | specify_category_max_resources (self, category, rmd) |
Specifies the maximum resources allowed for the given category. More... | |
def | specify_category_min_resources (self, category, rmd) |
Specifies the minimum resources allowed for the given category. More... | |
def | specify_category_first_allocation_guess (self, category, rmd) |
Specifies the first-allocation guess for the given category. More... | |
def | specify_category_max_concurrent (self, category, max_concurrent) |
Specifies the maximum number of tasks of the given category that may run concurrently. More... | |
def | initialize_categories (self, filename, rm) |
Initialize first value of categories. More... | |
def | cancel_by_taskid (self, id) |
Cancel task identified by its taskid and remove from the given queue. More... | |
def | cancel_by_tasktag (self, tag) |
Cancel task identified by its tag and remove from the given queue. More... | |
def | cancel_by_category (self, category) |
Cancel all tasks of the given category and remove them from the queue. More... | |
def | shutdown_workers (self, n) |
Shutdown workers connected to queue. More... | |
def | block_host (self, host) |
Block workers running on host from working for the manager. More... | |
def | blacklist (self, host) |
Replaced by ndcctools.work_queue.WorkQueue.block_host. More... | |
def | block_host_with_timeout (self, host, timeout) |
Block workers running on host for the duration of the given timeout. More... | |
def | blacklist_with_timeout (self, host, timeout) |
See ndcctools.work_queue.WorkQueue.block_host_with_timeout. More... | |
def | unblock_host (self, host=None) |
Unblock given host, or all hosts if host not given. More... | |
def | blacklist_clear (self, host=None) |
See ndcctools.work_queue.WorkQueue.unblock_host. More... | |
def | invalidate_cache_file (self, local_name) |
Delete file from workers' caches. More... | |
def | specify_keepalive_interval (self, interval) |
Change keepalive interval for a given queue. More... | |
def | specify_keepalive_timeout (self, timeout) |
Change keepalive timeout for a given queue. More... | |
def | estimate_capacity (self) |
Turn on manager capacity measurements. More... | |
def | tune (self, name, value) |
Tune advanced parameters for work queue. More... | |
def | submit (self, task) |
Submit a task to the queue. More... | |
def | wait (self, timeout=WORK_QUEUE_WAITFORTASK) |
Wait for tasks to complete. More... | |
def | wait_for_tag (self, tag, timeout=WORK_QUEUE_WAITFORTASK) |
Similar to ndcctools.work_queue.WorkQueue.wait, but guarantees that the returned task has the specified tag. More... | |
def | application_info (self) |
Should return a dictionary with information for the status display. More... | |
def | map (self, fn, seq, chunksize=1) |
Maps a function to elements in a sequence using work_queue. More... | |
def | pair (self, fn, seq1, seq2, chunksize=1, env=None) |
Returns the values for a function of each pair from 2 sequences. More... | |
def | tree_reduce (self, fn, seq, chunksize=2) |
Reduces a sequence until only one value is left, and then returns that value. More... | |
def | remote_map (self, fn, seq, coprocess, name, chunksize=1) |
Maps a function to elements in a sequence using work_queue remote task. More... | |
def | remote_pair (self, fn, seq1, seq2, coprocess, name, chunksize=1) |
Returns the values for a function of each pair from 2 sequences using remote task. More... | |
def | remote_tree_reduce (self, fn, seq, coprocess, name, chunksize=2) |
Reduces a sequence until only one value is left, and then returns that value. More... | |
Python Work Queue object.
def ndcctools.work_queue.WorkQueue.__init__ | ( | self, | |
port = WORK_QUEUE_DEFAULT_PORT , |
|||
name = None , |
|||
shutdown = False , |
|||
stats_log = None , |
|||
transactions_log = None , |
|||
debug_log = None , |
|||
ssl = None , |
|||
status_display_interval = None |
|||
) |
Create a new work queue.
self | Reference to the current work queue object. |
port | The port number to listen on. If zero, then a random port is chosen. A range of possible ports (low, high) can also be specified instead of a single integer. |
name | The project name to use. |
stats_log | The name of a file to write the queue's statistics log. |
transactions_log | The name of a file to write the queue's transactions log. |
debug_log | The name of a file to write the queue's debug log. |
shutdown | Automatically shutdown workers when queue is finished. Disabled by default. |
ssl | A tuple of filenames (ssl_key, ssl_cert) in PEM format, or True. If not given, then TLS is not activated. If True, a self-signed temporary key and cert are generated. |
status_display_interval | Number of seconds between updates to the Jupyter status display. None, or a value less than 1, disables it. |
def ndcctools.work_queue.WorkQueue.name | ( | self | ) |
Get the project name of the queue.
def ndcctools.work_queue.WorkQueue.port | ( | self | ) |
Get the listening port of the queue.
def ndcctools.work_queue.WorkQueue.using_ssl | ( | self | ) |
Whether the manager is using ssl to talk to workers.
def ndcctools.work_queue.WorkQueue.stats | ( | self | ) |
Get queue statistics.
The fields in ndcctools.work_queue.WorkQueue.stats can also be individually accessed through this call. For example:
def ndcctools.work_queue.WorkQueue.stats_hierarchy | ( | self | ) |
Get worker hierarchy statistics.
The fields in ndcctools.work_queue.WorkQueue.stats_hierarchy can also be individually accessed through this call. For example:
def ndcctools.work_queue.WorkQueue.stats_category | ( | self, | |
category | |||
) |
Get the task statistics for the given category.
self | Reference to the current work queue object. |
category | A category name. For example: s = q.stats_category("my_category")
>>> print(s)
>>> print(s.tasks_waiting)
|
def ndcctools.work_queue.WorkQueue.status | ( | self, | |
request | |||
) |
Get queue information as list of dictionaries.
self | Reference to the current work queue object |
request | One of: "queue", "tasks", "workers", or "categories" For example: import json
tasks_info = q.status("tasks")
|
def ndcctools.work_queue.WorkQueue.workers_summary | ( | self | ) |
Get resource statistics of workers connected.
self | Reference to the current work queue object. |
def ndcctools.work_queue.WorkQueue.specify_category_mode | ( | self, | |
category, | |||
mode | |||
) |
Turn on or off first-allocation labeling for a given category.
By default, only cores, memory, and disk are labeled, and GPUs are unlabeled. NOTE: autolabeling is only meaningful when task monitoring is enabled (ndcctools.work_queue.WorkQueue.enable_monitoring). When monitoring is enabled and a task exhausts resources in a worker, mode dictates how work queue handles the exhaustion:
self | Reference to the current work queue object. |
category | A category name. If None, sets the mode by default for newly created categories. |
mode | One of:
|
def ndcctools.work_queue.WorkQueue.specify_category_autolabel_resource | ( | self, | |
category, | |||
resource, | |||
autolabel | |||
) |
Turn on or off first-allocation labeling for a given category and resource.
This function should be used to fine-tune the defaults from ndcctools.work_queue.WorkQueue.specify_category_mode.
self | Reference to the current work queue object. |
category | A category name. |
resource | A resource name. |
autolabel | True/False for on/off. |
def ndcctools.work_queue.WorkQueue.task_state | ( | self, | |
taskid | |||
) |
Get current task state.
See work_queue_task_state_t for possible values.
def ndcctools.work_queue.WorkQueue.enable_monitoring | ( | self, | |
dirname = None , |
|||
watchdog = True |
|||
) |
Enables resource monitoring of tasks in the queue, and writes a summary per task to the directory given.
Additionally, all summaries are consolidated into the file all_summaries-PID.log.
Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).
self | Reference to the current work queue object. |
dirname | Directory name for the monitor output. |
watchdog | If True (default), kill tasks that exhaust their declared resources. |
def ndcctools.work_queue.WorkQueue.enable_monitoring_full | ( | self, | |
dirname = None , |
|||
watchdog = True |
|||
) |
As ndcctools.work_queue.WorkQueue.enable_monitoring, but it also generates a time series and a debug file.
WARNING: Such files may reach gigabyte sizes for long running tasks.
Returns 1 on success, 0 on failure (i.e., monitoring was not enabled).
self | Reference to the current work queue object. |
dirname | Directory name for the monitor output. |
watchdog | If True (default), kill tasks that exhaust their declared resources. |
def ndcctools.work_queue.WorkQueue.activate_fast_abort | ( | self, | |
multiplier | |||
) |
Turn on or off fast abort functionality for a given queue for tasks in the "default" category, and for tasks whose category does not set an explicit multiplier.
self | Reference to the current work queue object. |
multiplier | The multiplier of the average task time at which point to abort; if negative (the default) fast_abort is deactivated. |
def ndcctools.work_queue.WorkQueue.activate_fast_abort_category | ( | self, | |
name, | |||
multiplier | |||
) |
Turn on or off fast abort functionality for a given queue.
self | Reference to the current work queue object. |
name | Name of the category. |
multiplier | The multiplier of the average task time at which point to abort; if zero, deactivate for the category; if negative (the default), use the one for the "default" category (see ndcctools.work_queue.WorkQueue.activate_fast_abort) |
def ndcctools.work_queue.WorkQueue.specify_draining_by_hostname | ( | self, | |
hostname, | |||
drain_mode = True |
|||
) |
Turn on or off draining mode for workers at hostname.
self | Reference to the current work queue object. |
hostname | The hostname of the host running the workers. |
drain_mode | If True, no new tasks are dispatched to workers at hostname, and empty workers are shut down. Otherwise, workers work as usual. |
def ndcctools.work_queue.WorkQueue.empty | ( | self | ) |
Determine whether there are any known tasks queued, running, or waiting to be collected.
Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".
self | Reference to the current work queue object. |
def ndcctools.work_queue.WorkQueue.hungry | ( | self | ) |
Determine whether the queue can support more tasks.
Returns the number of additional tasks it can support if "hungry" and 0 if "sated".
self | Reference to the current work queue object. |
def ndcctools.work_queue.WorkQueue.specify_algorithm | ( | self, | |
algorithm | |||
) |
Set the worker selection algorithm for queue.
self | Reference to the current work queue object. |
algorithm | One of the following algorithms to use in assigning a task to a worker. See work_queue_schedule_t for possible values. |
def ndcctools.work_queue.WorkQueue.specify_task_order | ( | self, | |
order | |||
) |
Set the order for dispatching submitted tasks in the queue.
self | Reference to the current work queue object. |
order | One of the following algorithms to use in dispatching submitted tasks to workers: |
def ndcctools.work_queue.WorkQueue.specify_name | ( | self, | |
name | |||
) |
Change the project name for the given queue.
self | Reference to the current work queue object. |
name | The new project name. |
def ndcctools.work_queue.WorkQueue.specify_manager_preferred_connection | ( | self, | |
mode | |||
) |
Set the preference for using hostname over IP address to connect.
'by_ip' uses IP addresses from the network interfaces of the manager (standard behavior), 'by_hostname' to use the hostname at the manager, or 'by_apparent_ip' to use the address of the manager as seen by the catalog server.
self | Reference to the current work queue object. |
mode | A string to indicate using 'by_ip', 'by_hostname' or 'by_apparent_ip'. |
def ndcctools.work_queue.WorkQueue.specify_master_preferred_connection | ( | self, | |
mode | |||
) |
def ndcctools.work_queue.WorkQueue.specify_min_taskid | ( | self, | |
minid | |||
) |
Set the minimum taskid of future submitted tasks.
Further submitted tasks are guaranteed to have a taskid greater than or equal to minid. This function is useful to make taskids consistent in a workflow that consists of sequential managers. (Note: This function is rarely used). If the minimum id provided is smaller than the last taskid computed, the minimum id provided is ignored.
self | Reference to the current work queue object. |
minid | Minimum desired taskid |
def ndcctools.work_queue.WorkQueue.specify_priority | ( | self, | |
priority | |||
) |
Change the project priority for the given queue.
self | Reference to the current work queue object. |
priority | An integer that represents the priority of this work queue manager. The higher the value, the higher the priority. |
def ndcctools.work_queue.WorkQueue.specify_num_tasks_left | ( | self, | |
ntasks | |||
) |
Specify the number of tasks not yet submitted to the queue.
It is used by work_queue_factory to determine the number of workers to launch. If not specified, it defaults to 0. work_queue_factory considers the number of tasks as: num tasks left + num tasks running + num tasks ready.
self | Reference to the current work queue object. |
ntasks | Number of tasks yet to be submitted. |
def ndcctools.work_queue.WorkQueue.specify_manager_mode | ( | self, | |
mode | |||
) |
Specify the manager mode for the given queue.
(Kept for compatibility. It is no-op.)
self | Reference to the current work queue object. |
mode | This may be one of the following values: WORK_QUEUE_MASTER_MODE_STANDALONE or WORK_QUEUE_MASTER_MODE_CATALOG. |
def ndcctools.work_queue.WorkQueue.specify_master_mode | ( | self, | |
mode | |||
) |
def ndcctools.work_queue.WorkQueue.specify_catalog_server | ( | self, | |
hostname, | |||
port | |||
) |
Specify the catalog server the manager should report to.
self | Reference to the current work queue object. |
hostname | The hostname of the catalog server. |
port | The port the catalog server is listening on. |
def ndcctools.work_queue.WorkQueue.specify_log | ( | self, | |
logfile | |||
) |
Specify a log file that records the cumulative stats of connected workers and submitted tasks.
self | Reference to the current work queue object. |
logfile | Filename. |
def ndcctools.work_queue.WorkQueue.specify_transactions_log | ( | self, | |
logfile | |||
) |
Specify a log file that records the states of tasks.
self | Reference to the current work queue object. |
logfile | Filename. |
def ndcctools.work_queue.WorkQueue.specify_password | ( | self, | |
password | |||
) |
Add a mandatory password that each worker must present.
self | Reference to the current work queue object. |
password | The password. |
def ndcctools.work_queue.WorkQueue.specify_password_file | ( | self, | |
file | |||
) |
Add a mandatory password file that each worker must present.
self | Reference to the current work queue object. |
file | Name of the file containing the password. |
def ndcctools.work_queue.WorkQueue.specify_max_resources | ( | self, | |
rmd | |||
) |
Specifies the maximum resources allowed for the default category.
self | Reference to the current work queue object. |
rmd | Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example: >>> # A maximum of 4 cores is found on any worker:
>>> q.specify_max_resources({'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB disk are found on any worker:
>>> q.specify_max_resources({'cores': 8, 'memory': 1024, 'disk': 10240})
|
def ndcctools.work_queue.WorkQueue.specify_min_resources | ( | self, | |
rmd | |||
) |
Specifies the minimum resources allowed for the default category.
self | Reference to the current work queue object. |
rmd | Dictionary indicating minimum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example: >>> # A minimum of 2 cores is found on any worker:
>>> q.specify_min_resources({'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.specify_min_resources({'cores': 4, 'memory': 512, 'disk': 1024})
|
def ndcctools.work_queue.WorkQueue.specify_category_max_resources | ( | self, | |
category, | |||
rmd | |||
) |
Specifies the maximum resources allowed for the given category.
self | Reference to the current work queue object. |
category | Name of the category. |
rmd | Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example: >>> # A maximum of 4 cores may be used by a task in the category:
>>> q.specify_category_max_resources("my_category", {'cores': 4})
>>> # A maximum of 8 cores, 1GB of memory, and 10GB of disk may be used by a task:
>>> q.specify_category_max_resources("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})
|
def ndcctools.work_queue.WorkQueue.specify_category_min_resources | ( | self, | |
category, | |||
rmd | |||
) |
Specifies the minimum resources allowed for the given category.
self | Reference to the current work queue object. |
category | Name of the category. |
rmd | Dictionary indicating minimum values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example: >>> # A minimum of 2 cores is found on any worker:
>>> q.specify_category_min_resources("my_category", {'cores': 2})
>>> # A minimum of 4 cores, 512MB of memory, and 1GB disk are found on any worker:
>>> q.specify_category_min_resources("my_category", {'cores': 4, 'memory': 512, 'disk': 1024})
|
def ndcctools.work_queue.WorkQueue.specify_category_first_allocation_guess | ( | self, | |
category, | |||
rmd | |||
) |
Specifies the first-allocation guess for the given category.
self | Reference to the current work queue object. |
category | Name of the category. |
rmd | Dictionary indicating the first-allocation values. See ndcctools.work_queue.Task.resources_measured for possible fields. For example: >>> # Tasks are first tried with 4 cores:
>>> q.specify_category_first_allocation_guess("my_category", {'cores': 4})
>>> # Tasks are first tried with 8 cores, 1GB of memory, and 10GB of disk:
>>> q.specify_category_first_allocation_guess("my_category", {'cores': 8, 'memory': 1024, 'disk': 10240})
|
def ndcctools.work_queue.WorkQueue.specify_category_max_concurrent | ( | self, | |
category, | |||
max_concurrent | |||
) |
Specifies the maximum number of tasks of the given category that may run concurrently.
self | Reference to the current work queue object. |
category | Name of the category. |
max_concurrent | Maximum number of concurrent tasks. Less than 0 means unlimited (this is the default). For example: >>> # Do not run more than 5 tasks of "my_category" concurrently:
>>> q.specify_category_max_concurrent("my_category", 5)
|
def ndcctools.work_queue.WorkQueue.initialize_categories | ( | self, | |
filename, | |||
rm | |||
) |
Initialize first value of categories.
self | Reference to the current work queue object. |
rm | Dictionary indicating maximum values. See ndcctools.work_queue.Task.resources_measured for possible fields. |
filename | JSON file with resource summaries. |
def ndcctools.work_queue.WorkQueue.cancel_by_taskid | ( | self, | |
id | |||
) |
Cancel task identified by its taskid and remove from the given queue.
self | Reference to the current work queue object. |
id | The taskid returned from ndcctools.work_queue.WorkQueue.submit. |
def ndcctools.work_queue.WorkQueue.cancel_by_tasktag | ( | self, | |
tag | |||
) |
Cancel task identified by its tag and remove from the given queue.
self | Reference to the current work queue object. |
tag | The tag assigned to task using ndcctools.work_queue.Task.specify_tag. |
def ndcctools.work_queue.WorkQueue.cancel_by_category | ( | self, | |
category | |||
) |
Cancel all tasks of the given category and remove them from the queue.
self | Reference to the current work queue object. |
category | The name of the category to cancel. |
def ndcctools.work_queue.WorkQueue.shutdown_workers | ( | self, | |
n | |||
) |
Shutdown workers connected to queue.
Gives a best effort and then returns the number of workers given the shutdown order.
self | Reference to the current work queue object. |
n | The number of workers to shut down. To shut down all workers, specify "0". |
def ndcctools.work_queue.WorkQueue.block_host | ( | self, | |
host | |||
) |
Block workers running on host from working for the manager.
self | Reference to the current work queue object. |
host | The hostname of the host running the workers. |
def ndcctools.work_queue.WorkQueue.blacklist | ( | self, | |
host | |||
) |
Replaced by ndcctools.work_queue.WorkQueue.block_host.
def ndcctools.work_queue.WorkQueue.block_host_with_timeout | ( | self, | |
host, | |||
timeout | |||
) |
Block workers running on host for the duration of the given timeout.
self | Reference to the current work queue object. |
host | The hostname of the host running the workers. |
timeout | How long this block entry lasts (in seconds). If less than 1, block indefinitely. |
def ndcctools.work_queue.WorkQueue.blacklist_with_timeout | ( | self, | |
host, | |||
timeout | |||
) |
def ndcctools.work_queue.WorkQueue.unblock_host | ( | self, | |
host = None |
|||
) |
Unblock given host, or all hosts if host not given.
self | Reference to the current work queue object. |
host | The hostname of the host. |
def ndcctools.work_queue.WorkQueue.blacklist_clear | ( | self, | |
host = None |
|||
) |
def ndcctools.work_queue.WorkQueue.invalidate_cache_file | ( | self, | |
local_name | |||
) |
Delete file from workers' caches.
self | Reference to the current work queue object. |
local_name | Name of the file as seen by the manager. |
def ndcctools.work_queue.WorkQueue.specify_keepalive_interval | ( | self, | |
interval | |||
) |
Change keepalive interval for a given queue.
self | Reference to the current work queue object. |
interval | Minimum number of seconds to wait before sending new keepalive checks to workers. |
def ndcctools.work_queue.WorkQueue.specify_keepalive_timeout | ( | self, | |
timeout | |||
) |
Change keepalive timeout for a given queue.
self | Reference to the current work queue object. |
timeout | Minimum number of seconds to wait for a keepalive response from worker before marking it as dead. |
def ndcctools.work_queue.WorkQueue.estimate_capacity | ( | self | ) |
Turn on manager capacity measurements.
self | Reference to the current work queue object. |
def ndcctools.work_queue.WorkQueue.tune | ( | self, | |
name, | |||
value | |||
) |
Tune advanced parameters for work queue.
self | Reference to the current work queue object. |
name | The name of the parameter to tune. Can be one of the following:
|
value | The value to set the parameter to. |
def ndcctools.work_queue.WorkQueue.submit | ( | self, | |
task | |||
) |
Submit a task to the queue.
It is safe to re-submit a task returned by ndcctools.work_queue.WorkQueue.wait.
self | Reference to the current work queue object. |
task | A task description created from ndcctools.work_queue.Task. |
def ndcctools.work_queue.WorkQueue.wait | ( | self, | |
timeout = WORK_QUEUE_WAITFORTASK |
|||
) |
Wait for tasks to complete.
This call will block until a task has completed or the timeout has elapsed.
self | Reference to the current work queue object. |
timeout | The number of seconds to wait for a completed task before returning. Use an integer to set the timeout or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed. |
def ndcctools.work_queue.WorkQueue.wait_for_tag | ( | self, | |
tag, | |||
timeout = WORK_QUEUE_WAITFORTASK |
|||
) |
Similar to ndcctools.work_queue.WorkQueue.wait, but guarantees that the returned task has the specified tag.
This call will block until a task with the given tag has completed or the timeout has elapsed.
self | Reference to the current work queue object. |
tag | Desired tag. If None, then it is equivalent to self.wait(timeout) |
timeout | The number of seconds to wait for a completed task before returning. |
def ndcctools.work_queue.WorkQueue.application_info | ( | self | ) |
Should return a dictionary with information for the status display.
This method is meant to be overridden by custom applications.
The dictionary should be of the form:
{ "application_info" : {"values" : dict, "units" : dict} }
where "units" is an optional dictionary that indicates the units of the corresponding key in "values".
self | Reference to the current work queue object. |
For example:
def ndcctools.work_queue.WorkQueue.map | ( | self, | |
fn, | |||
seq, | |||
chunksize = 1 |
|||
) |
Maps a function to elements in a sequence using work_queue.
Similar to the regular map function in Python.
self | Reference to the current work queue object. |
fn | The function that will be called on each element |
seq | The sequence whose elements are passed to the function |
chunksize | The number of elements to process at once |
def ndcctools.work_queue.WorkQueue.pair | ( | self, | |
fn, | |||
seq1, | |||
seq2, | |||
chunksize = 1 , |
|||
env = None |
|||
) |
Returns the values for a function of each pair from 2 sequences.
The pairs that are passed into the function are generated by itertools
self | Reference to the current work queue object. |
fn | The function that will be called on each element |
seq1 | The first seq that will be used to generate pairs |
seq2 | The second seq that will be used to generate pairs |
chunksize | The number of pairs to process at once |
env | Poncho or conda environment tarball filename |
def ndcctools.work_queue.WorkQueue.tree_reduce | ( | self, | |
fn, | |||
seq, | |||
chunksize = 2 |
|||
) |
Reduces a sequence until only one value is left, and then returns that value.
The sequence is reduced by passing a pair of elements into a function and storing the result. It then makes a sequence from the results, and reduces again until one value is left.
If the sequence has an odd length, the last element gets reduced at the end.
self | Reference to the current work queue object. |
fn | The function that will be called on each element |
seq | The seq that will be reduced |
chunksize | The number of elements per Task (for tree reduce, must be greater than 1) |
def ndcctools.work_queue.WorkQueue.remote_map | ( | self, | |
fn, | |||
seq, | |||
coprocess, | |||
name, | |||
chunksize = 1 |
|||
) |
Maps a function to elements in a sequence using work_queue remote task.
Similar to the regular map function in Python, but creates a task to execute each function call on a worker running a coprocess.
self | Reference to the current work queue object. |
fn | The function that will be called on each element. This function exists in coprocess. |
seq | The sequence whose elements are passed to the function |
coprocess | The name of the coprocess that contains the function fn. |
name | This defines the key in the event json that wraps the data sent to the coprocess. |
chunksize | The number of elements to process at once |
def ndcctools.work_queue.WorkQueue.remote_pair | ( | self, | |
fn, | |||
seq1, | |||
seq2, | |||
coprocess, | |||
name, | |||
chunksize = 1 |
|||
) |
Returns the values for a function of each pair from 2 sequences using remote task.
The pairs that are passed into the function are generated by itertools
self | Reference to the current work queue object. |
fn | The function that will be called on each element. This function exists in coprocess. |
seq1 | The first seq that will be used to generate pairs |
seq2 | The second seq that will be used to generate pairs |
coprocess | The name of the coprocess that contains the function fn. |
name | This defines the key in the event json that wraps the data sent to the coprocess. |
chunksize | The number of elements to process at once |
def ndcctools.work_queue.WorkQueue.remote_tree_reduce | ( | self, | |
fn, | |||
seq, | |||
coprocess, | |||
name, | |||
chunksize = 2 |
|||
) |
Reduces a sequence until only one value is left, and then returns that value.
The sequence is reduced by passing a pair of elements into a function and storing the result. It then makes a sequence from the results, and reduces again until one value is left. Executes on the coprocess.
If the sequence has an odd length, the last element gets reduced at the end.
self | Reference to the current work queue object. |
fn | The function that will be called on each element. Exists on the coprocess |
seq | The seq that will be reduced |
coprocess | The name of the coprocess that contains the function fn. |
name | This defines the key in the event json that wraps the data sent to the coprocess. |
chunksize | The number of elements per Task (for tree reduce, must be greater than 1) |