cctools
ndcctools.work_queue.Task Class Reference
Inherited by ndcctools.work_queue.PythonTask and ndcctools.work_queue.RemoteTask.

Public Member Functions

def __init__ (self, command)
 Create a new task specification. More...
 
def clone (self)
 Return a copy of this task. More...
 
def specify_command (self, command)
 Set the command to be executed by the task. More...
 
def specify_coprocess (self, coprocess)
 Set the coprocess at the worker that should execute the task's command. More...
 
def specify_algorithm (self, algorithm)
 Set the worker selection algorithm for the task. More...
 
def specify_tag (self, tag)
 Attach a user defined logical name to the task. More...
 
def specify_category (self, name)
 Label the task with the given category. More...
 
def specify_feature (self, name)
 Label the task with the given user-defined feature. More...
 
def specify_file (self, local_name, remote_name=None, type=None, flags=None, cache=None, failure_only=None)
 Add a file to the task. More...
 
def specify_url (self, url, remote_name, type=None, flags=None, cache=None, failure_only=None)
 Add a url to the task which will be provided as an input file. More...
 
def specify_file_command (self, cmd, remote_name, type=None, flags=None, cache=None, failure_only=None)
 Add an input file produced by a Unix shell command. More...
 
def specify_file_piece (self, local_name, remote_name=None, start_byte=0, end_byte=0, type=None, flags=None, cache=None, failure_only=None)
 Add a file piece to the task. More...
 
def specify_input_file (self, local_name, remote_name=None, flags=None, cache=None)
 Add an input file to the task. More...
 
def specify_output_file (self, local_name, remote_name=None, flags=None, cache=None, failure_only=None)
 Add an output file to the task. More...
 
def specify_directory (self, local_name, remote_name=None, type=None, flags=None, recursive=False, cache=None, failure_only=None)
 Add a directory to the task. More...
 
def specify_buffer (self, buffer, remote_name, flags=None, cache=None)
 Add an input buffer to the task. More...
 
def specify_snapshot_file (self, filename)
 When monitoring, indicates a JSON-encoded file that instructs the monitor to take a snapshot of the task resources. More...
 
def specify_max_retries (self, max_retries)
 Indicate the number of times the task should be retried. More...
 
def specify_cores (self, cores)
 Indicate the number of cores required by this task. More...
 
def specify_memory (self, memory)
 Indicate the memory (in MB) required by this task. More...
 
def specify_disk (self, disk)
 Indicate the disk space (in MB) required by this task. More...
 
def specify_gpus (self, gpus)
 Indicate the number of GPUs required by this task. More...
 
def specify_priority (self, priority)
 Indicate the priority of this task (larger means better priority, default is 0). More...
 
def specify_environment_variable (self, name, value=None)
 Set this environment variable before running the task. More...
 
def specify_monitor_output (self, directory)
 Set a name for the resource summary output directory from the monitor. More...
 
def tag (self)
 Get the user-defined logical name for the task. More...
 
def category (self)
 Get the category name for the task. More...
 
def command (self)
 Get the shell command executed by the task. More...
 
def priority (self)
 Get the priority of the task. More...
 
def algorithm (self)
 Get the algorithm for choosing the worker that runs the task. More...
 
def std_output (self)
 Get the standard output of the task. More...
 
def output (self)
 Get the standard output of the task. More...
 
def id (self)
 Get the task id number. More...
 
def return_status (self)
 Get the exit code of the command executed by the task. More...
 
def result (self)
 Get the result of the task as an integer code, such as successful, missing file, etc. More...
 
def result_str (self)
 Return a string that explains the result of a task. More...
 
def total_submissions (self)
 Get the number of times the task has been resubmitted internally. More...
 
def exhausted_attempts (self)
 Get the number of times the task has failed due to resource exhaustion. More...
 
def host (self)
 Get the address and port of the host on which the task ran. More...
 
def hostname (self)
 Get the name of the host on which the task ran. More...
 
def submit_time (self)
 Get the time at which this task was submitted. More...
 
def finish_time (self)
 Get the time at which this task was finished. More...
 
def total_cmd_exhausted_execute_time (self)
 Get the total time the task spent executing in attempts that failed due to resource exhaustion. More...
 
def app_delay (self)
 Get the time spent in upper-level application (outside of work_queue_wait). More...
 
def send_input_start (self)
 Get the time at which the task started to transfer input files. More...
 
def send_input_finish (self)
 Get the time at which the task finished transferring input files. More...
 
def execute_cmd_start (self)
 The time at which the task began. More...
 
def execute_cmd_finish (self)
 Get the time at which the task finished (discovered by the manager). More...
 
def receive_output_start (self)
 Get the time at which the task started to transfer output files. More...
 
def receive_output_finish (self)
 Get the time at which the task finished transferring output files. More...
 
def total_bytes_received (self)
 Get the number of bytes received since task started receiving input data. More...
 
def total_bytes_sent (self)
 Get the number of bytes sent since task started sending input data. More...
 
def total_bytes_transferred (self)
 Get the number of bytes transferred since task started transferring input data. More...
 
def total_transfer_time (self)
 Get the time consumed in microseconds for transferring total_bytes_transferred. More...
 
def cmd_execution_time (self)
 Time spent in microseconds for executing the command until completion on a single worker. More...
 
def total_cmd_execution_time (self)
 Accumulated time spent in microseconds for executing the command on any worker, regardless of whether the task finished (i.e., this includes time running on workers that disconnected). More...
 
def resources_measured (self)
 Get the resources measured for the task execution if resource monitoring is enabled. More...
 
def limits_exceeded (self)
 Get the resources the task exceeded. More...
 
def resources_requested (self)
 Get the resources the task requested to run. More...
 
def resources_allocated (self)
 Get the resources allocated to the task in its latest attempt. More...
 

Detailed Description

Python Task object

This class is used to create a task specification.

Constructor & Destructor Documentation

◆ __init__()

def ndcctools.work_queue.Task.__init__ (   self,
  command 
)

Create a new task specification.

Parameters
self: Reference to the current task object.
command: The shell command line to be executed by the task.

Member Function Documentation

◆ clone()

def ndcctools.work_queue.Task.clone (   self)

Return a copy of this task.

Return a (deep) copy of this task that can also be submitted to the WorkQueue.

◆ specify_command()

def ndcctools.work_queue.Task.specify_command (   self,
  command 
)

Set the command to be executed by the task.

Parameters
self: Reference to the current task object.
command: The command to be executed.

◆ specify_coprocess()

def ndcctools.work_queue.Task.specify_coprocess (   self,
  coprocess 
)

Set the coprocess at the worker that should execute the task's command.

This is not needed for regular tasks.

Parameters
self: Reference to the current task object.
coprocess: The name of the coprocess.

◆ specify_algorithm()

def ndcctools.work_queue.Task.specify_algorithm (   self,
  algorithm 
)

Set the worker selection algorithm for the task.

Parameters
self: Reference to the current task object.
algorithm: The algorithm to use in assigning the task to a worker. See work_queue_schedule_t for possible values.

◆ specify_tag()

def ndcctools.work_queue.Task.specify_tag (   self,
  tag 
)

Attach a user-defined logical name to the task.

Parameters
self: Reference to the current task object.
tag: The tag to attach to the task.

◆ specify_category()

def ndcctools.work_queue.Task.specify_category (   self,
  name 
)

Label the task with the given category.

Tasks with the same category are expected to have similar resource requirements (e.g. for fast abort).

Parameters
self: Reference to the current task object.
name: The name of the category.

◆ specify_feature()

def ndcctools.work_queue.Task.specify_feature (   self,
  name 
)

Label the task with the given user-defined feature.

Tasks with the feature will only run on workers that provide it (see the worker's --feature option).

Parameters
self: Reference to the current task object.
name: The name of the feature.

◆ specify_file()

def ndcctools.work_queue.Task.specify_file (   self,
  local_name,
  remote_name = None,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add a file to the task.

Parameters
self: Reference to the current task object.
local_name: The name of the file on local disk or shared filesystem.
remote_name: The name of the file at the execution site.
type: Must be WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together (for example, WORK_QUEUE_CACHE).
cache: Whether the file should be cached at workers (True/False).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).

For example:

# The following are equivalent
>>> task.specify_file("/etc/hosts", type=WORK_QUEUE_INPUT, cache = True)
>>> task.specify_file("/etc/hosts", "hosts", type=WORK_QUEUE_INPUT, cache = True)

◆ specify_url()

def ndcctools.work_queue.Task.specify_url (   self,
  url,
  remote_name,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add a url to the task which will be provided as an input file.

Parameters
self: Reference to the current task object.
url: The url of the file to provide.
remote_name: The name of the file as seen by the task.
type: Must be WORK_QUEUE_INPUT. (Output is not currently supported.)
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together (for example, WORK_QUEUE_CACHE).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).
cache: Whether the file should be cached at workers (True/False).

For example:

>>> task.specify_url("http://www.google.com/", "google.txt", type=WORK_QUEUE_INPUT, flags=WORK_QUEUE_CACHE)

◆ specify_file_command()

def ndcctools.work_queue.Task.specify_file_command (   self,
  cmd,
  remote_name,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add an input file produced by a Unix shell command.

The command will be executed at the worker and produce a cacheable file that can be shared among multiple tasks.

Parameters
self: Reference to the current task object.
cmd: The shell command which will produce the file. The command must contain the string %%, which will be replaced with the cached location of the file.
remote_name: The name of the file as seen by the task.
type: Must be WORK_QUEUE_INPUT. (Output is not currently supported.)
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together (for example, WORK_QUEUE_CACHE).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).
cache: Whether the file should be cached at workers (True/False).

For example:

>>> task.specify_file_command("curl http://www.example.com/mydata.gz | gunzip > %%", "infile", type=WORK_QUEUE_INPUT, flags=WORK_QUEUE_CACHE)
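
To make the role of the %% placeholder concrete, the following minimal sketch mimics the substitution in plain Python. The helper expand_file_command and the cache path are hypothetical illustrations and are not part of the Work Queue API; the real replacement is performed at the worker.

```python
def expand_file_command(cmd, cached_path):
    """Return cmd with every %% replaced by cached_path (illustration only)."""
    if "%%" not in cmd:
        # The documentation requires the placeholder to be present.
        raise ValueError("command must contain the placeholder %%")
    return cmd.replace("%%", cached_path)


template = "curl http://www.example.com/mydata.gz | gunzip > %%"
# "cache/file-0001" is a made-up cache location chosen for this example.
expanded = expand_file_command(template, "cache/file-0001")
print(expanded)
```

Checking for the placeholder before submitting a task is a cheap way to catch a command that could never produce the cached file.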

◆ specify_file_piece()

def ndcctools.work_queue.Task.specify_file_piece (   self,
  local_name,
  remote_name = None,
  start_byte = 0,
  end_byte = 0,
  type = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add a file piece to the task.

Parameters
self: Reference to the current task object.
local_name: The name of the file on local disk or shared filesystem.
remote_name: The name of the file at the execution site.
start_byte: The starting byte offset of the file piece to be transferred.
end_byte: The ending byte offset of the file piece to be transferred.
type: Must be WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together (for example, WORK_QUEUE_CACHE).
cache: Whether the file should be cached at workers (True/False).
failure_only: For output files, whether the file should be retrieved only when the task fails (e.g., debug logs).
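
When one large file is split across several tasks, the start_byte and end_byte pairs have to be computed first. The sketch below does this in plain Python. The helper piece_ranges is hypothetical, and it assumes that end_byte is inclusive; this page does not state that, so verify it against the library before relying on it.

```python
def piece_ranges(file_size, pieces):
    """Split a file of file_size bytes into contiguous (start_byte, end_byte) pairs.

    Assumption (not confirmed by this reference): end_byte is inclusive.
    """
    if file_size <= 0 or pieces <= 0:
        raise ValueError("file_size and pieces must be positive")
    pieces = min(pieces, file_size)      # never create empty pieces
    base, extra = divmod(file_size, pieces)
    ranges = []
    start = 0
    for i in range(pieces):
        # The first `extra` pieces get one additional byte each.
        length = base + (1 if i < extra else 0)
        ranges.append((start, start + length - 1))
        start += length
    return ranges


ranges = piece_ranges(10, 3)
print(ranges)
```

Each pair could then be passed as start_byte and end_byte to specify_file_piece on a separate task.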

◆ specify_input_file()

def ndcctools.work_queue.Task.specify_input_file (   self,
  local_name,
  remote_name = None,
  flags = None,
  cache = None 
)

Add an input file to the task.

This is just a wrapper for ndcctools.work_queue.Task.specify_file with type set to WORK_QUEUE_INPUT.

◆ specify_output_file()

def ndcctools.work_queue.Task.specify_output_file (   self,
  local_name,
  remote_name = None,
  flags = None,
  cache = None,
  failure_only = None 
)

Add an output file to the task.

This is just a wrapper for ndcctools.work_queue.Task.specify_file with type set to WORK_QUEUE_OUTPUT.
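
A minimal sketch of how the two wrappers might be used together to declare a task's files. So that it runs without cctools installed, it uses RecordingTask, a hypothetical stand-in that only records calls; with the real library, an ndcctools.work_queue.Task would be passed instead. The helper stage_files and all file names are illustrative assumptions.

```python
import os


class RecordingTask:
    """Hypothetical stand-in that only records calls; NOT part of ndcctools."""

    def __init__(self, command):
        self.command = command
        self.files = []

    def specify_input_file(self, local_name, remote_name=None, flags=None, cache=None):
        # Assumed default: the remote name is the base name of the local file.
        remote = remote_name or os.path.basename(local_name)
        self.files.append(("input", local_name, remote, cache))

    def specify_output_file(self, local_name, remote_name=None, flags=None,
                            cache=None, failure_only=None):
        remote = remote_name or os.path.basename(local_name)
        self.files.append(("output", local_name, remote, cache))


def stage_files(task, inputs, outputs):
    """Declare (local_name, remote_name) pairs as inputs and outputs of task."""
    for local_name, remote_name in inputs:
        task.specify_input_file(local_name, remote_name, cache=True)
    for local_name, remote_name in outputs:
        task.specify_output_file(local_name, remote_name, cache=False)
    return task


task = stage_files(
    RecordingTask("./sim < hosts > out.txt"),
    inputs=[("/etc/hosts", None)],
    outputs=[("results/out.txt", "out.txt")],
)
for entry in task.files:
    print(entry)
```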

◆ specify_directory()

def ndcctools.work_queue.Task.specify_directory (   self,
  local_name,
  remote_name = None,
  type = None,
  flags = None,
  recursive = False,
  cache = None,
  failure_only = None 
)

Add a directory to the task.

Parameters
self: Reference to the current task object.
local_name: The name of the directory on local disk or shared filesystem. Optional if the directory is empty.
remote_name: The name of the directory at the remote execution site.
type: Must be WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags: May be zero to indicate no special handling, or any of the work_queue_file_flags_t values or'd together (for example, WORK_QUEUE_CACHE).
recursive: Indicates whether just the directory (False) or the directory and all of its contents (True) should be included.
cache: Whether the directory should be cached at workers (True/False).
failure_only: For output directories, whether the directory should be retrieved only when the task fails (e.g., debug logs).
Returns
1 if the task directory is successfully specified; 0 if either local_name or remote_name is null, or if remote_name is an absolute path.

◆ specify_buffer()

def ndcctools.work_queue.Task.specify_buffer (   self,
  buffer,
  remote_name,
  flags = None,
  cache = None 
)

Add an input buffer to the task.

Parameters
self: Reference to the current task object.
buffer: The contents of the buffer to pass as input.
remote_name: The name of the remote file to create.
flags: May take the same values as ndcctools.work_queue.Task.specify_file.
cache: Whether the file should be cached at workers (True/False).

◆ specify_snapshot_file()

def ndcctools.work_queue.Task.specify_snapshot_file (   self,
  filename 
)

When monitoring, indicates a JSON-encoded file that instructs the monitor to take a snapshot of the task resources.

Snapshots appear in the JSON summary file of the task, under the key "snapshots". Snapshots are taken on events on the files described in the monitor_snapshot_file. The monitor_snapshot_file is a JSON-encoded file with the following format:

{
    "FILENAME": {
        "from-start": boolean,
        "from-start-if-truncated": boolean,
        "delete-if-found": boolean,
        "events": [
            {
                "label": "EVENT_NAME",
                "on-create": boolean,
                "on-truncate": boolean,
                "pattern": "REGEXP",
                "count": integer
            },
            {
                "label": "EVENT_NAME",
                ...
            }
        ]
    },
    "FILENAME": {
        ...
    }
}

All keys but "label" are optional:

from-start: If FILENAME exists when the task starts running, process from line 1. Default: false, as the task may be appending to an already existing file.
from-start-if-truncated: If FILENAME is truncated, process from line 1. Default: true, to account for log rotations.
delete-if-found: Delete FILENAME when found. Default: false.

events:
label: Name that identifies the snapshot. Only alphanumeric, -, and _ characters are allowed.
on-create: Take a snapshot every time the file is created. Default: false.
on-truncate: Take a snapshot when the file is truncated. Default: false.
on-pattern: Take a snapshot when a line matches the regexp pattern. Default: none.
count: Maximum number of snapshots for this label. Default: -1 (no limit).

Exactly one of on-create, on-truncate, or on-pattern should be specified.

Once a task has finished, the snapshots are available as:

for s in t.resources_measured.snapshots:
    print(s.memory)

For more information, consult the manual of the resource_monitor.

Parameters
self: Reference to the current task object.
filename: The name of the snapshot events specification.
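
As an illustration of the format described above, this sketch builds a small snapshot specification with the standard json module, checks the two rules stated in this section (allowed label characters, exactly one trigger per event), and writes it to a file. The file name app.log, the event labels, and the helper check_spec are hypothetical; only the on-create and on-truncate triggers are used.

```python
import json
import os
import re
import tempfile

TRIGGERS = ("on-create", "on-truncate", "on-pattern")
LABEL_RE = re.compile(r"^[A-Za-z0-9_-]+$")


def check_spec(spec):
    """Raise ValueError if an event breaks the rules stated in this section."""
    for filename, rules in spec.items():
        for event in rules.get("events", []):
            label = event.get("label", "")
            if not LABEL_RE.match(label):
                raise ValueError("bad label %r for %s" % (label, filename))
            # Presence of a trigger key is what is counted here.
            if sum(1 for key in TRIGGERS if key in event) != 1:
                raise ValueError("event %r needs exactly one trigger" % label)
    return spec


snapshot_spec = {
    "app.log": {  # hypothetical file written by the task
        "from-start": True,
        "events": [
            {"label": "log-created", "on-create": True},
            {"label": "log-rotated", "on-truncate": True, "count": 5},
        ],
    }
}

# Write the specification to a temporary location.
path = os.path.join(tempfile.mkdtemp(), "snapshots.json")
with open(path, "w") as f:
    json.dump(check_spec(snapshot_spec), f, indent=2)
print(path)
```

The resulting path is what would be passed as filename to specify_snapshot_file.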

◆ specify_max_retries()

def ndcctools.work_queue.Task.specify_max_retries (   self,
  max_retries 
)

Indicate the number of times the task should be retried.

If 0 (the default), the task is tried indefinitely. A task that did not succeed after the given number of retries is returned with result WORK_QUEUE_RESULT_MAX_RETRIES.

◆ specify_cores()

def ndcctools.work_queue.Task.specify_cores (   self,
  cores 
)

Indicate the number of cores required by this task.

◆ specify_memory()

def ndcctools.work_queue.Task.specify_memory (   self,
  memory 
)

Indicate the memory (in MB) required by this task.

◆ specify_disk()

def ndcctools.work_queue.Task.specify_disk (   self,
  disk 
)

Indicate the disk space (in MB) required by this task.

◆ specify_gpus()

def ndcctools.work_queue.Task.specify_gpus (   self,
  gpus 
)

Indicate the number of GPUs required by this task.

◆ specify_priority()

def ndcctools.work_queue.Task.specify_priority (   self,
  priority 
)

Indicate the priority of this task (larger means better priority, default is 0).

◆ specify_environment_variable()

def ndcctools.work_queue.Task.specify_environment_variable (   self,
  name,
  value = None 
)

Set this environment variable before running the task.

If value is None, the variable is unset.

◆ specify_monitor_output()

def ndcctools.work_queue.Task.specify_monitor_output (   self,
  directory 
)

Set a name for the resource summary output directory from the monitor.

◆ tag()

def ndcctools.work_queue.Task.tag (   self)

Get the user-defined logical name for the task.

>>> print(t.tag)

◆ category()

def ndcctools.work_queue.Task.category (   self)

Get the category name for the task.

>>> print(t.category)

◆ command()

def ndcctools.work_queue.Task.command (   self)

Get the shell command executed by the task.

>>> print(t.command)

◆ priority()

def ndcctools.work_queue.Task.priority (   self)

Get the priority of the task.

>>> print(t.priority)

◆ algorithm()

def ndcctools.work_queue.Task.algorithm (   self)

Get the algorithm for choosing the worker that runs the task.

>>> print(t.algorithm)

◆ std_output()

def ndcctools.work_queue.Task.std_output (   self)

Get the standard output of the task.

Must be called only after the task completes execution.

>>> print(t.std_output)

◆ output()

def ndcctools.work_queue.Task.output (   self)

Get the standard output of the task.

Same as t.std_output for regular work queue tasks. Must be called only after the task completes execution.

>>> print(t.output)

Reimplemented in ndcctools.work_queue.PythonTask.

◆ id()

def ndcctools.work_queue.Task.id (   self)

Get the task id number.

Must be called only after the task was submitted.

>>> print(t.id)

◆ return_status()

def ndcctools.work_queue.Task.return_status (   self)

Get the exit code of the command executed by the task.

Must be called only after the task completes execution.

>>> print(t.return_status)

◆ result()

def ndcctools.work_queue.Task.result (   self)

Get the result of the task as an integer code, such as successful, missing file, etc.

See work_queue_result_t for possible values. Must be called only after the task completes execution.

>>> print(t.result)
0

◆ result_str()

def ndcctools.work_queue.Task.result_str (   self)

Return a string that explains the result of a task.

Must be called only after the task completes execution.

>>> print(t.result_str)
'SUCCESS'
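
The result code and the command's exit status answer different questions: result says whether Work Queue could run the task to completion, while return_status is the exit code of the command itself. The sketch below checks both. It uses types.SimpleNamespace objects as hypothetical stand-ins for finished tasks, and it assumes that a result of 0 corresponds to 'SUCCESS', as the examples above suggest; the 'MAX_RETRIES' string is made up for the demonstration.

```python
from types import SimpleNamespace


def summarize(t):
    """Return a one-line summary for a finished task-like object."""
    if t.result != 0:
        # Work Queue itself reported a problem (missing file, retries, ...).
        return "task %d did not complete: %s" % (t.id, t.result_str)
    if t.return_status != 0:
        # The task ran, but its command reported failure.
        return "task %d: command exited with status %d" % (t.id, t.return_status)
    return "task %d succeeded" % t.id


# Hypothetical stand-ins carrying only the attributes used above.
ok = SimpleNamespace(id=1, result=0, result_str="SUCCESS", return_status=0)
bad_exit = SimpleNamespace(id=2, result=0, result_str="SUCCESS", return_status=3)
not_run = SimpleNamespace(id=3, result=1, result_str="MAX_RETRIES", return_status=0)

for t in (ok, bad_exit, not_run):
    print(summarize(t))
```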

◆ total_submissions()

def ndcctools.work_queue.Task.total_submissions (   self)

Get the number of times the task has been resubmitted internally.

Must be called only after the task completes execution.

>>> print(t.total_submissions)

◆ exhausted_attempts()

def ndcctools.work_queue.Task.exhausted_attempts (   self)

Get the number of times the task has failed due to resource exhaustion.

>>> print(t.exhausted_attempts)

◆ host()

def ndcctools.work_queue.Task.host (   self)

Get the address and port of the host on which the task ran.

Must be called only after the task completes execution.

>>> print(t.host)

◆ hostname()

def ndcctools.work_queue.Task.hostname (   self)

Get the name of the host on which the task ran.

Must be called only after the task completes execution.

>>> print(t.hostname)

◆ submit_time()

def ndcctools.work_queue.Task.submit_time (   self)

Get the time at which this task was submitted.

Must be called only after the task completes execution.

>>> print(t.submit_time)

◆ finish_time()

def ndcctools.work_queue.Task.finish_time (   self)

Get the time at which this task was finished.

Must be called only after the task completes execution.

>>> print(t.finish_time)

◆ total_cmd_exhausted_execute_time()

def ndcctools.work_queue.Task.total_cmd_exhausted_execute_time (   self)

Get the total time the task spent executing in attempts that failed due to resource exhaustion.

>>> print(t.total_cmd_exhausted_execute_time)

◆ app_delay()

def ndcctools.work_queue.Task.app_delay (   self)

Get the time spent in upper-level application (outside of work_queue_wait).

Must be called only after the task completes execution.

>>> print(t.app_delay)

◆ send_input_start()

def ndcctools.work_queue.Task.send_input_start (   self)

Get the time at which the task started to transfer input files.

Must be called only after the task completes execution.

>>> print(t.send_input_start)

◆ send_input_finish()

def ndcctools.work_queue.Task.send_input_finish (   self)

Get the time at which the task finished transferring input files.

Must be called only after the task completes execution.

>>> print(t.send_input_finish)

◆ execute_cmd_start()

def ndcctools.work_queue.Task.execute_cmd_start (   self)

The time at which the task began.

Must be called only after the task completes execution.

>>> print(t.execute_cmd_start)

◆ execute_cmd_finish()

def ndcctools.work_queue.Task.execute_cmd_finish (   self)

Get the time at which the task finished (discovered by the manager).

Must be called only after the task completes execution.

>>> print(t.execute_cmd_finish)

◆ receive_output_start()

def ndcctools.work_queue.Task.receive_output_start (   self)

Get the time at which the task started to transfer output files.

Must be called only after the task completes execution.

>>> print(t.receive_output_start)

◆ receive_output_finish()

def ndcctools.work_queue.Task.receive_output_finish (   self)

Get the time at which the task finished transferring output files.

Must be called only after the task completes execution.

>>> print(t.receive_output_finish)
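
The start/finish pairs above can be combined into per-phase durations. The following sketch does so for any object that exposes these attributes. The helper phase_durations and the sample timestamps are hypothetical, and the durations are in whatever unit the timestamps use, which this section does not state.

```python
from types import SimpleNamespace

# Each phase maps to its (start attribute, finish attribute) pair.
PHASES = {
    "send_input": ("send_input_start", "send_input_finish"),
    "execute_cmd": ("execute_cmd_start", "execute_cmd_finish"),
    "receive_output": ("receive_output_start", "receive_output_finish"),
}


def phase_durations(t):
    """Return {phase: finish - start} for a finished task-like object."""
    return {
        name: getattr(t, finish) - getattr(t, start)
        for name, (start, finish) in PHASES.items()
    }


# Made-up timestamps standing in for a finished task.
t = SimpleNamespace(
    send_input_start=100, send_input_finish=250,
    execute_cmd_start=250, execute_cmd_finish=1250,
    receive_output_start=1250, receive_output_finish=1300,
)
durations = phase_durations(t)
print(durations)
```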

◆ total_bytes_received()

def ndcctools.work_queue.Task.total_bytes_received (   self)

Get the number of bytes received since task started receiving input data.

Must be called only after the task completes execution.

>>> print(t.total_bytes_received)

◆ total_bytes_sent()

def ndcctools.work_queue.Task.total_bytes_sent (   self)

Get the number of bytes sent since task started sending input data.

Must be called only after the task completes execution.

>>> print(t.total_bytes_sent)

◆ total_bytes_transferred()

def ndcctools.work_queue.Task.total_bytes_transferred (   self)

Get the number of bytes transferred since task started transferring input data.

Must be called only after the task completes execution.

>>> print(t.total_bytes_transferred)

◆ total_transfer_time()

def ndcctools.work_queue.Task.total_transfer_time (   self)

Get the time consumed in microseconds for transferring total_bytes_transferred.

Must be called only after the task completes execution.

>>> print(t.total_transfer_time)
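
Because total_transfer_time is reported in microseconds, an average transfer rate follows from dividing total_bytes_transferred by it. A minimal sketch, with a hypothetical helper transfer_rate_mb_s, made-up sample values, and MB taken as 10^6 bytes:

```python
from types import SimpleNamespace


def transfer_rate_mb_s(t):
    """Average transfer rate in MB/s (1 MB = 10**6 bytes), or 0.0 if no time elapsed."""
    if t.total_transfer_time <= 0:
        return 0.0
    seconds = t.total_transfer_time / 1e6   # microseconds -> seconds
    return t.total_bytes_transferred / seconds / 1e6


# Stand-in for a finished task: 5,000,000 bytes moved in 2,000,000 microseconds.
t = SimpleNamespace(total_bytes_transferred=5_000_000, total_transfer_time=2_000_000)
rate = transfer_rate_mb_s(t)
print(rate)
```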

◆ cmd_execution_time()

def ndcctools.work_queue.Task.cmd_execution_time (   self)

Time spent in microseconds for executing the command until completion on a single worker.

Must be called only after the task completes execution.

>>> print(t.cmd_execution_time)

◆ total_cmd_execution_time()

def ndcctools.work_queue.Task.total_cmd_execution_time (   self)

Accumulated time spent in microseconds for executing the command on any worker, regardless of whether the task finished (i.e., this includes time running on workers that disconnected).

Must be called only after the task completes execution.

>>> print(t.total_cmd_execution_time)

◆ resources_measured()

def ndcctools.work_queue.Task.resources_measured (   self)

Get the resources measured for the task execution if resource monitoring is enabled.

Must be called only after the task completes execution. Valid fields:

start: microseconds at the start of execution

end: microseconds at the end of execution

wall_time: microseconds spent during execution

cpu_time: user + system time of the execution

cores: peak number of cores used

cores_avg: number of cores computed as cpu_time/wall_time

gpus: peak number of gpus used

max_concurrent_processes: the maximum number of processes running concurrently

total_processes: count of all of the processes created

virtual_memory: maximum virtual memory across all processes

memory: maximum resident size across all processes

swap_memory: maximum swap usage across all processes

bytes_read: number of bytes read from disk

bytes_written: number of bytes written to disk

bytes_received: number of bytes read from the network

bytes_sent: number of bytes written to the network

bandwidth: maximum network bits/s (average over one minute)

total_files: total maximum number of files and directories of all the working directories in the tree

disk: size in MB of all working directories in the tree

>>> print(t.resources_measured.memory)
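
As a worked example of the cores_avg definition above (cpu_time/wall_time), the sketch below recomputes it from the other two fields. The helper average_cores and the sample values are hypothetical, and it assumes cpu_time is expressed in the same unit as wall_time.

```python
from types import SimpleNamespace


def average_cores(measured):
    """Recompute cores_avg as cpu_time / wall_time (0.0 if wall_time is zero)."""
    if measured.wall_time <= 0:
        return 0.0
    return measured.cpu_time / measured.wall_time


# Made-up measurement: 6 s of CPU time over 2 s of wall time.
measured = SimpleNamespace(cpu_time=6_000_000, wall_time=2_000_000, cores=4)
avg = average_cores(measured)
print(avg)
```

An average well below the peak value in cores suggests the task only briefly used all of its cores.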

◆ limits_exceeded()

def ndcctools.work_queue.Task.limits_exceeded (   self)

Get the resources the task exceeded.

For valid fields see ndcctools.work_queue.Task.resources_measured.

◆ resources_requested()

def ndcctools.work_queue.Task.resources_requested (   self)

Get the resources the task requested to run.

For valid fields see ndcctools.work_queue.Task.resources_measured.

◆ resources_allocated()

def ndcctools.work_queue.Task.resources_allocated (   self)

Get the resources allocated to the task in its latest attempt.

For valid fields see ndcctools.work_queue.Task.resources_measured.
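
Since resources_measured and resources_allocated share the same fields, they can be compared to see how much of an allocation a task actually used. A minimal sketch, using hypothetical stand-in objects and a hypothetical helper utilization; the choice of fields and the sample values are assumptions for illustration.

```python
from types import SimpleNamespace


def utilization(measured, allocated, fields=("cores", "memory", "disk")):
    """Return {field: measured / allocated}, skipping fields with no allocation."""
    report = {}
    for field in fields:
        limit = getattr(allocated, field)
        if limit and limit > 0:
            report[field] = getattr(measured, field) / limit
    return report


# Made-up values standing in for t.resources_measured and t.resources_allocated.
measured = SimpleNamespace(cores=2, memory=1500, disk=100)
allocated = SimpleNamespace(cores=4, memory=2000, disk=1000)
report = utilization(measured, allocated)
print(report)
```

Consistently low ratios within a category hint that smaller values could be passed to specify_cores, specify_memory, or specify_disk.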

