cctools
ndcctools.taskvine.task.Task Class Reference
Inheritance diagram for ndcctools.taskvine.task.Task (image not shown). Classes in the diagram, derived from Task directly or indirectly: ndcctools.taskvine.task.LibraryTask, ndcctools.taskvine.task.PythonTask, ndcctools.taskvine.dask_executor.PythonTaskDask, ndcctools.taskvine.futures.FuturePythonTask, ndcctools.taskvine.task.FunctionCall, ndcctools.taskvine.dask_executor.FunctionCallDask, ndcctools.taskvine.futures.FutureFunctionCall.

Public Member Functions

def __init__ (self, command, **task_info)
 Create a new task specification. More...
 
def submit_finalize (self)
 Finalizes the task definition once the manager that will execute it is known. More...
 
def clone (self)
 Return a copy of this task. More...
 
def set_command (self, command)
 Set the command to be executed by the task. More...
 
def needs_library (self, library_name)
 Set the name of the library at the worker that should execute the task's command. More...
 
def provides_library (self, library_name)
 Set the library name provided by this task. More...
 
def set_function_slots (self, nslots)
 Set the number of concurrent functions a library can run. More...
 
def set_scheduler (self, scheduler)
 Set the worker selection scheduler for task. More...
 
def set_tag (self, tag)
 Attach a user defined logical name to the task. More...
 
def set_category (self, name)
 Label the task with the given category. More...
 
def add_feature (self, name)
 Label the task with the given user-defined feature. More...
 
def add_input (self, file, remote_name, strict_input=False, mount_symlink=False)
 Add any input object to a task. More...
 
def add_output (self, file, remote_name, watch=False, failure_only=None, success_only=None)
 Add any output object to a task. More...
 
def set_snapshot_file (self, filename)
 When monitoring, indicates a json-encoded file that instructs the monitor to take a snapshot of the task resources. More...
 
def add_starch_package (self, file)
 Add a Starch package as an execution context. More...
 
def add_poncho_package (self, file)
 Add a Poncho package as an execution context. More...
 
def add_execution_context (self, f)
 Adds an execution context to the task. More...
 
def set_retries (self, max_retries)
 Indicate the number of times the task should be retried. More...
 
def set_max_forsaken (self, max_forsaken)
 Indicate the number of times the task can be returned to the manager without being executed. More...
 
def set_cores (self, cores)
 Indicate the number of cores required by this task. More...
 
def set_memory (self, memory)
 Indicate the memory (in MB) required by this task. More...
 
def set_disk (self, disk)
 Indicate the disk space (in MB) required by this task. More...
 
def set_gpus (self, gpus)
 Indicate the number of GPUs required by this task. More...
 
def set_priority (self, priority)
 Indicate the priority of this task (larger means better priority, default is 0). More...
 
def set_env_var (self, name, value=None)
 Set this environment variable before running the task. More...
 
def set_monitor_output (self, directory)
 Set a name for the resource summary output directory from the monitor. More...
 
def tag (self)
 Get the user-defined logical name for the task. More...
 
def category (self)
 Get the category name for the task. More...
 
def command (self)
 Get the shell command executed by the task. More...
 
def std_output (self)
 Get the standard output of the task. More...
 
def output (self)
 Get the standard output of the task. More...
 
def id (self)
 Get the task id number. More...
 
def exit_code (self)
 Get the exit code of the command executed by the task. More...
 
def result (self)
 Return a string that explains the result of a task. More...
 
def completed (self)
 Return True if task executed and its command terminated normally. More...
 
def successful (self)
 Return True if the task executed successfully (its command terminated normally with exit code 0 and produced all its declared output files). More...
 
def get_metric (self, name)
 Return various integer performance metrics about a completed task. More...
 
def addrport (self)
 Get the address and port of the host on which the task ran. More...
 
def hostname (self)
 Get the name of the host on which the task ran. More...
 
def resources_measured (self)
 Get the resources measured for the task execution if resource monitoring is enabled. More...
 
def limits_exceeded (self)
 Get the resources the task exceeded. More...
 
def resources_requested (self)
 Get the resources the task requested to run. More...
 
def resources_allocated (self)
 Get the resources allocated to the task in its latest attempt. More...
 
def add_nopen (self, manager)
 Adds inputs for nopen library and rules file and sets LD_PRELOAD. More...
 

Detailed Description

TaskVine Task object

This class is used to create a task specification to be submitted to a ndcctools.taskvine.manager.Manager.

Constructor & Destructor Documentation

◆ __init__()

def ndcctools.taskvine.task.Task.__init__ (   self,
  command,
**  task_info 
)

Create a new task specification.

Parameters
self: Reference to the current task object.
command: The shell command line to be executed by the task.
task_info: Optional dictionary containing specified task parameters.

Member Function Documentation

◆ submit_finalize()

def ndcctools.taskvine.task.Task.submit_finalize (   self)

Finalizes the task definition once the manager that will execute it is known.

This function is run by the manager before registering the task for execution.

Parameters
self: Reference to the current python task object.

Reimplemented in ndcctools.taskvine.task.FunctionCall, ndcctools.taskvine.task.PythonTask, ndcctools.taskvine.futures.FuturePythonTask, and ndcctools.taskvine.futures.FutureFunctionCall.

◆ clone()

def ndcctools.taskvine.task.Task.clone (   self)

Return a copy of this task.

Return a (deep) copy of this task that can also be submitted to the ndcctools.taskvine.

◆ set_command()

def ndcctools.taskvine.task.Task.set_command (   self,
  command 
)

Set the command to be executed by the task.

Parameters
self: Reference to the current task object.
command: The command to be executed.

◆ needs_library()

def ndcctools.taskvine.task.Task.needs_library (   self,
  library_name 
)

Set the name of the library at the worker that should execute the task's command.

This is not needed for regular tasks.

Parameters
self: Reference to the current task object.
library_name: The name of the library.

◆ provides_library()

def ndcctools.taskvine.task.Task.provides_library (   self,
  library_name 
)

Set the library name provided by this task.

This is not needed for regular tasks.

Parameters
self: Reference to the current task object.
library_name: The name of the library.

◆ set_function_slots()

def ndcctools.taskvine.task.Task.set_function_slots (   self,
  nslots 
)

Set the number of concurrent functions a library can run.

This is not needed for regular tasks.

Parameters
self: Reference to the current task object.
nslots: The maximum number of concurrent functions this library can run.

◆ set_scheduler()

def ndcctools.taskvine.task.Task.set_scheduler (   self,
  scheduler 
)

Set the worker selection scheduler for task.

Parameters
self: Reference to the current task object.
scheduler: The scheduler to use in assigning the task to a worker. See vine_schedule_t for possible values.

◆ set_tag()

def ndcctools.taskvine.task.Task.set_tag (   self,
  tag 
)

Attach a user defined logical name to the task.

Parameters
self: Reference to the current task object.
tag: The tag to attach to the task.

◆ set_category()

def ndcctools.taskvine.task.Task.set_category (   self,
  name 
)

Label the task with the given category.

It is expected that tasks with the same category have similar resource requirements (this is used, e.g., to disconnect slow workers).

Parameters
self: Reference to the current task object.
name: The name of the category.

◆ add_feature()

def ndcctools.taskvine.task.Task.add_feature (   self,
  name 
)

Label the task with the given user-defined feature.

Tasks with the feature will only run on workers that provide it (see the worker's --feature option).

Parameters
self: Reference to the current task object.
name: The name of the feature.

◆ add_input()

def ndcctools.taskvine.task.Task.add_input (   self,
  file,
  remote_name,
  strict_input = False,
  mount_symlink = False 
)

Add any input object to a task.

Parameters
self: Reference to the current task object.
file: A file object of class ndcctools.taskvine.file.File, such as from ndcctools.taskvine.manager.Manager.declare_file, ndcctools.taskvine.manager.Manager.declare_buffer, ndcctools.taskvine.manager.Manager.declare_url, etc.
remote_name: The name of the file at the execution site.
strict_input: Whether the file should be transferred to the worker for execution. If no worker already has cached all the input files marked as strict inputs for the task, the task fails.

For example:

>>> url = m.declare_url("http://somewhere.edu/data.tgz")
>>> f = m.declare_untar(url)
>>> task.add_input(f,"data")

◆ add_output()

def ndcctools.taskvine.task.Task.add_output (   self,
  file,
  remote_name,
  watch = False,
  failure_only = None,
  success_only = None 
)

Add any output object to a task.

Parameters
self: Reference to the current task object.
file: A file object of class ndcctools.taskvine.file.File, such as from ndcctools.taskvine.manager.Manager.declare_file or ndcctools.taskvine.manager.Manager.declare_buffer. See also ndcctools.taskvine.task.Task.add_input.
remote_name: The name of the file at the execution site.
watch: Watch the output file and send back changes as the task runs.
success_only: Whether the file should be retrieved only when the task succeeds. Default is False.
failure_only: Whether the file should be retrieved only when the task fails (e.g., debug logs). Default is False.

For example:

>>> file = m.declare_file("output.txt")
>>> task.add_output(file,"out")

◆ set_snapshot_file()

def ndcctools.taskvine.task.Task.set_snapshot_file (   self,
  filename 
)

When monitoring, indicates a json-encoded file that instructs the monitor to take a snapshot of the task resources.

Snapshots appear in the JSON summary file of the task, under the key "snapshots". Snapshots are taken on events on files described in the monitor_snapshot_file. The monitor_snapshot_file is a json encoded file with the following format:

{
    "FILENAME": {
        "from-start":boolean,
        "from-start-if-truncated":boolean,
        "delete-if-found":boolean,
        "events": [
            {
                "label":"EVENT_NAME",
                "on-create":boolean,
                "on-truncate":boolean,
                "on-pattern":"REGEXP",
                "count":integer
            },
            {
                "label":"EVENT_NAME",
                ...
            }
        ]
    },
    "FILENAME": {
        ...
    }
}

All keys but "label" are optional:

  • from-start: If FILENAME exists when the task starts running, process from line 1. Default: false, as the task may be appending to an already existing file.
  • from-start-if-truncated: If FILENAME is truncated, process from line 1. Default: true, to account for log rotations.
  • delete-if-found: Delete FILENAME when found. Default: false

Keys of each entry in events:

  • label: Name that identifies the snapshot. Only alphanumeric, -, and _ characters are allowed.
  • on-create: Take a snapshot every time the file is created. Default: false
  • on-truncate: Take a snapshot when the file is truncated. Default: false
  • on-pattern: Take a snapshot when a line matches the regexp pattern. Default: none
  • count: Maximum number of snapshots for this label. Default: -1 (no limit)

Exactly one of on-create, on-truncate, or on-pattern should be specified.

Once a task has finished, the snapshots are available as:

for s in t.resources_measured.snapshots:
    print(s.memory)

For more information, consult the manual of the resource_monitor.

Parameters
self: Reference to the current task object.
filename: The name of the snapshot events specification.
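
As a hedged illustration of the format described above, the following sketch builds a snapshot events specification with Python's json module and checks the rule that each event names exactly one trigger. The file names app.log and snapshots.json, the labels, and the regular expression are assumptions made for the example; only the key names come from the key descriptions above.

```python
import json
import os
import tempfile

TRIGGERS = ("on-create", "on-truncate", "on-pattern")

# Hypothetical specification: watch a log file named app.log.
spec = {
    "app.log": {
        "from-start": True,
        "events": [
            # Snapshot when a line matches the regexp, at most 10 times.
            {"label": "checkpoint", "on-pattern": "^checkpoint written", "count": 10},
            # Snapshot every time the file is created.
            {"label": "log-created", "on-create": True},
        ],
    }
}


def count_triggers(event):
    """Number of trigger keys that are actually set in one event entry."""
    return sum(1 for key in TRIGGERS if event.get(key))


def write_snapshot_spec(spec, path):
    """Check every event, then write the specification as JSON."""
    for filename, rules in spec.items():
        for event in rules.get("events", []):
            if "label" not in event:
                raise ValueError(f"{filename}: event without a label")
            if count_triggers(event) != 1:
                raise ValueError(f"{filename}: event {event['label']} needs exactly one trigger")
    with open(path, "w") as f:
        json.dump(spec, f, indent=2)
    return path


spec_path = write_snapshot_spec(spec, os.path.join(tempfile.mkdtemp(), "snapshots.json"))
# spec_path now holds the snapshot events specification that
# set_snapshot_file refers to.
```

Validating before writing is a design choice of this sketch, not something the monitor requires of the caller.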

◆ add_starch_package()

def ndcctools.taskvine.task.Task.add_starch_package (   self,
  file 
)

Add a Starch package as an execution context.

The file given must refer to an (unpacked) package containing libraries captured by the starch command. The task will execute using this package as its environment.

Parameters
self: Reference to the current task object.
file: A file containing an unpacked Starch package.

◆ add_poncho_package()

def ndcctools.taskvine.task.Task.add_poncho_package (   self,
  file 
)

Add a Poncho package as an execution context.

The file given must refer to an (unpacked) Poncho package, containing a set of Python modules needed by the task. The task will execute using this package as its Python environment.

Parameters
self: Reference to the current task object.
file: A file containing an unpacked Poncho package.

◆ add_execution_context()

def ndcctools.taskvine.task.Task.add_execution_context (   self,
  f 
)

Adds an execution context to the task.

The context file given must expand to a directory containing (at a minimum) a file named bin/run_in_env that will perform any desired setup (e.g. setting PATH, LD_LIBRARY_PATH, PYTHONPATH), execute the given command, and then perform any desired cleanup. The context directory may also include any support files or libraries needed by the task. If specified multiple times, execution contexts are nested in the order given (i.e. first added is the first applied).

See also
add_poncho_package
add_starch_package
Parameters
self: Reference to the current task object.
f: The execution context file.
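
The directory layout described above can be sketched as follows. This is only an illustration under stated assumptions: the script body, the idea that the task's command arrives as the script's arguments, and the helper name make_context_dir are all hypothetical. The only detail taken from the description is that the context must contain an executable bin/run_in_env that performs setup, runs the command, and cleans up.

```python
import os
import stat
import tempfile

# Hypothetical wrapper script. Assumption: the command to run is passed
# to run_in_env as its arguments.
RUN_IN_ENV = """#!/bin/sh
# Setup: put the context's own bin directory first on PATH.
ctx_dir=$(cd "$(dirname "$0")/.." && pwd)
PATH="$ctx_dir/bin:$PATH"
export PATH
# Execute the given command and remember its exit status.
"$@"
status=$?
# Cleanup steps would go here.
exit $status
"""


def make_context_dir(root):
    """Create root/bin/run_in_env and mark it executable."""
    bin_dir = os.path.join(root, "bin")
    os.makedirs(bin_dir, exist_ok=True)
    script = os.path.join(bin_dir, "run_in_env")
    with open(script, "w") as f:
        f.write(RUN_IN_ENV)
    mode = os.stat(script).st_mode
    os.chmod(script, mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    return script


context_root = tempfile.mkdtemp(prefix="context-")
script_path = make_context_dir(context_root)
```

The resulting directory would still need to be packaged and declared as a file object before being given to add_execution_context; that step is not shown here.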

◆ set_retries()

def ndcctools.taskvine.task.Task.set_retries (   self,
  max_retries 
)

Indicate the number of times the task should be retried.

If less than 1 (the default), the task is tried indefinitely. A task that did not succeed after the given number of retries is returned with result "max retries".

◆ set_max_forsaken()

def ndcctools.taskvine.task.Task.set_max_forsaken (   self,
  max_forsaken 
)

Indicate the number of times the task can be returned to the manager without being executed.

If less than 0 (the default), the task is tried indefinitely. A task that did not succeed after the given number of retries is returned with result "forsaken".

◆ set_cores()

def ndcctools.taskvine.task.Task.set_cores (   self,
  cores 
)

Indicate the number of cores required by this task.

◆ set_memory()

def ndcctools.taskvine.task.Task.set_memory (   self,
  memory 
)

Indicate the memory (in MB) required by this task.

◆ set_disk()

def ndcctools.taskvine.task.Task.set_disk (   self,
  disk 
)

Indicate the disk space (in MB) required by this task.

◆ set_gpus()

def ndcctools.taskvine.task.Task.set_gpus (   self,
  gpus 
)

Indicate the number of GPUs required by this task.
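
The four resource setters above can be applied together. The helper below is a minimal sketch, not part of the library: request_resources and RecordingTask are hypothetical names, and RecordingTask is a stand-in that only records calls so the example runs without a manager. Memory and disk are given in MB, as stated above.

```python
def request_resources(task, cores=None, memory_mb=None, disk_mb=None, gpus=None):
    """Call only the setters whose value was given; return what was applied."""
    requested = {"cores": cores, "memory": memory_mb, "disk": disk_mb, "gpus": gpus}
    applied = {}
    for name, value in requested.items():
        if value is None:
            continue  # leave this resource unspecified
        getattr(task, f"set_{name}")(value)
        applied[name] = value
    return applied


class RecordingTask:
    """Stand-in for illustration only: records setter calls."""

    def __init__(self):
        self.calls = []

    def set_cores(self, cores):
        self.calls.append(("set_cores", cores))

    def set_memory(self, memory):
        self.calls.append(("set_memory", memory))

    def set_disk(self, disk):
        self.calls.append(("set_disk", disk))

    def set_gpus(self, gpus):
        self.calls.append(("set_gpus", gpus))


demo = RecordingTask()
applied = request_resources(demo, cores=2, memory_mb=4096, disk_mb=10000)
```

With a real task object in place of RecordingTask, the same helper would forward the values to set_cores, set_memory, set_disk, and set_gpus.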

◆ set_priority()

def ndcctools.taskvine.task.Task.set_priority (   self,
  priority 
)

Indicate the priority of this task (larger means better priority, default is 0).

◆ set_env_var()

def ndcctools.taskvine.task.Task.set_env_var (   self,
  name,
  value = None 
)

Set this environment variable before running the task.

If value is None, then the variable is unset.

◆ set_monitor_output()

def ndcctools.taskvine.task.Task.set_monitor_output (   self,
  directory 
)

Set a name for the resource summary output directory from the monitor.

◆ tag()

def ndcctools.taskvine.task.Task.tag (   self)

Get the user-defined logical name for the task.

>>> print(t.tag)

◆ category()

def ndcctools.taskvine.task.Task.category (   self)

Get the category name for the task.

>>> print(t.category)

◆ command()

def ndcctools.taskvine.task.Task.command (   self)

Get the shell command executed by the task.

>>> print(t.command)

◆ std_output()

def ndcctools.taskvine.task.Task.std_output (   self)

Get the standard output of the task.

Must be called only after the task completes execution.

>>> print(t.std_output)

◆ output()

def ndcctools.taskvine.task.Task.output (   self)

Get the standard output of the task.

For regular TaskVine tasks this is the same as t.std_output. Must be called only after the task completes execution. If this task is a FunctionCall task, some transformations are applied, because a FunctionCall returns a specifically formatted result.

>>> print(t.output)

Reimplemented in ndcctools.taskvine.task.FunctionCall, and ndcctools.taskvine.task.PythonTask.

◆ id()

def ndcctools.taskvine.task.Task.id (   self)

Get the task id number.

Must be called only after the task was submitted.

>>> print(t.id)

◆ exit_code()

def ndcctools.taskvine.task.Task.exit_code (   self)

Get the exit code of the command executed by the task.

Must be called only after the task completes execution.

>>> print(t.exit_code)

◆ result()

def ndcctools.taskvine.task.Task.result (   self)

Return a string that explains the result of a task.

Must be called only after the task completes execution.

Possible results are:

  • "success"
  • "input missing"
  • "output missing"
  • "stdout missing"
  • "signal"
  • "resource exhaustion"
  • "max end time"
  • "unknown"
  • "forsaken"
  • "max retries"
  • "max wall time"
  • "monitor error"
  • "output transfer error"
  • "fixed location missing"

>>> print(t.result)
'success'

◆ completed()

def ndcctools.taskvine.task.Task.completed (   self)

Return True if task executed and its command terminated normally.

If True, the exit code of the command can be retrieved with exit_code. If False, the error condition can be retrieved with result. It must be called only after the task completes execution.

>>> # completed tasks with a failed command execution:
>>> print(t.completed())
True
>>> print(t.exit_code)
1
>>> # task with an error condition:
>>> print(t.completed())
False
>>> print(t.result)
max retries

◆ successful()

def ndcctools.taskvine.task.Task.successful (   self)

Return True if the task executed successfully (i.e., its command terminated normally with exit code 0 and produced all its declared output files).

Differs from ndcctools.taskvine.task.Task.completed in that the exit code of the command must be zero. It must be called only after the task completes execution.

>>> # completed tasks with a failed command execution:
>>> print(t.completed())
True
>>> print(t.successful())
False
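
The distinction between completed(), successful(), exit_code, and result can be folded into one helper. The sketch below is illustrative only: describe_outcome and StubTask are hypothetical names, and StubTask simply returns preset values so the example runs without a manager.

```python
def describe_outcome(task):
    """Summarize a finished task from the accessors documented above."""
    if task.successful():
        return "succeeded"
    if task.completed():
        # The command terminated normally, so its exit code is available.
        return f"command finished with exit code {task.exit_code}"
    # The command did not terminate normally; result names the condition.
    return f"task error: {task.result}"


class StubTask:
    """Stand-in for illustration only: returns preset values."""

    def __init__(self, completed, successful, exit_code=None, result=None):
        self._completed = completed
        self._successful = successful
        self.exit_code = exit_code
        self.result = result

    def completed(self):
        return self._completed

    def successful(self):
        return self._successful


outcomes = [
    describe_outcome(StubTask(True, True, exit_code=0, result="success")),
    describe_outcome(StubTask(True, False, exit_code=1)),
    describe_outcome(StubTask(False, False, result="max retries")),
]
```

Checking successful() first keeps the common case short and leaves the two failure modes to be told apart by completed().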

◆ get_metric()

def ndcctools.taskvine.task.Task.get_metric (   self,
  name 
)

Return various integer performance metrics about a completed task.

Must be called only after the task completes execution.

Valid metric names:

  • time_when_submitted
  • time_when_done
  • time_when_commit_start
  • time_when_commit_end
  • time_when_retrieval
  • time_workers_execute_last
  • time_workers_execute_all
  • time_workers_execute_exhaustion
  • time_workers_execute_failure
  • bytes_received
  • bytes_sent
  • bytes_transferred
>>> print(t.get_metric("time_when_submitted"))
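
Differences between the timestamp metrics give durations. The following sketch assumes the paired metrics share one unit (the unit is not stated here) and uses hypothetical names: turnaround, commit_duration, and a StubTask stand-in that serves preset values.

```python
def turnaround(task):
    """Time from submission to completion, in the unit the metrics report."""
    return task.get_metric("time_when_done") - task.get_metric("time_when_submitted")


def commit_duration(task):
    """Time spent between commit start and commit end."""
    return task.get_metric("time_when_commit_end") - task.get_metric("time_when_commit_start")


class StubTask:
    """Stand-in for illustration only: serves metrics from a dictionary."""

    def __init__(self, metrics):
        self._metrics = metrics

    def get_metric(self, name):
        return self._metrics[name]


demo = StubTask({
    "time_when_submitted": 1_000_000,
    "time_when_commit_start": 1_200_000,
    "time_when_commit_end": 1_250_000,
    "time_when_done": 4_500_000,
})
elapsed = turnaround(demo)
commit = commit_duration(demo)
```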

◆ addrport()

def ndcctools.taskvine.task.Task.addrport (   self)

Get the address and port of the host on which the task ran.

Must be called only after the task completes execution.

>>> print(t.addrport)

◆ hostname()

def ndcctools.taskvine.task.Task.hostname (   self)

Get the name of the host on which the task ran.

Must be called only after the task completes execution.

>>> print(t.hostname)

◆ resources_measured()

def ndcctools.taskvine.task.Task.resources_measured (   self)

Get the resources measured for the task execution if resource monitoring is enabled.

Must be called only after the task completes execution. Valid fields:

start: microseconds at the start of execution

end: microseconds at the end of execution

wall_time: microseconds spent during execution

cpu_time: user + system time of the execution

cores: peak number of cores used

cores_avg: number of cores computed as cpu_time/wall_time

gpus: peak number of gpus used

max_concurrent_processes: the maximum number of processes running concurrently

total_processes: count of all of the processes created

virtual_memory: maximum virtual memory across all processes

memory: maximum resident size across all processes

swap_memory: maximum swap usage across all processes

bytes_read: number of bytes read from disk

bytes_written: number of bytes written to disk

bytes_received: number of bytes read from the network

bytes_sent: number of bytes written to the network

bandwidth: maximum network bits/s (average over one minute)

total_files: total maximum number of files and directories of all the working directories in the tree

disk: size in MB of all working directories in the tree

>>> print(t.resources_measured.memory)
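
The cores_avg field is described above as cpu_time/wall_time. As a small worked example of that ratio, the sketch below recomputes it from the two fields. It assumes both are expressed in the same unit; average_cores is a hypothetical helper, and the SimpleNamespace object stands in for a real resources_measured value.

```python
from types import SimpleNamespace


def average_cores(measured):
    """cpu_time divided by wall_time, guarding against a zero wall time."""
    if measured.wall_time <= 0:
        return 0.0
    return measured.cpu_time / measured.wall_time


# Stand-in values for illustration: 6 s of CPU time over 4 s of wall time.
measured = SimpleNamespace(cpu_time=6_000_000, wall_time=4_000_000)
avg = average_cores(measured)
```

A ratio above 1 indicates that, on average, more than one core was busy during the execution.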

◆ limits_exceeded()

def ndcctools.taskvine.task.Task.limits_exceeded (   self)

Get the resources the task exceeded.

For valid fields see ndcctools.taskvine.task.Task.resources_measured.

◆ resources_requested()

def ndcctools.taskvine.task.Task.resources_requested (   self)

Get the resources the task requested to run.

For valid fields see ndcctools.taskvine.task.Task.resources_measured.

◆ resources_allocated()

def ndcctools.taskvine.task.Task.resources_allocated (   self)

Get the resources allocated to the task in its latest attempt.

For valid fields see ndcctools.taskvine.task.Task.resources_measured.

◆ add_nopen()

def ndcctools.taskvine.task.Task.add_nopen (   self,
  manager 
)

Adds inputs for nopen library and rules file and sets LD_PRELOAD.

