cctools
ndcctools.resource_monitor Namespace Reference

Data Structures

class  Categories
 Class to encapsulate all the categories in a workflow.
 
class  Category
 
class  ResourceExhaustion
 Exception raised when a function exceeds its resource limits.
 
class  ResourceInternalError
 

Functions

def monitored (limits=None, callback=None, interval=1, return_resources=True)
 Create a monitored version of a function.
 

Detailed Description

Resource monitoring tool for complex applications - Python interface.

The resource_monitor provides an unprivileged way for systems to monitor the consumption of key resources (cores, memory, disk) by applications ranging from simple Python functions to complex multi-process trees. It provides measurement, logging, enforcement, and triggers on various conditions. The objects and methods provided by this package correspond to the native C API in category.h, rmonitor_poll.h, and rmsummary.h.

The SWIG-based Python bindings provide a higher-level interface that revolves around the following function/decorator and objects:

Function Documentation

◆ monitored()

def ndcctools.resource_monitor.monitored (   limits = None,
  callback = None,
  interval = 1,
  return_resources = True 
)

Create a monitored version of a function.

It can be used as a decorator, or called by itself.

Parameters
limits: Dictionary of resource limits to set. Available limits are:
  • wall_time: time spent during execution (seconds)
  • cpu_time: user + system time of the execution (seconds)
  • cores: peak number of cores used
  • cores_avg: number of cores computed as cpu_time/wall_time
  • max_concurrent_processes: the maximum number of processes running concurrently
  • total_processes: count of all of the processes created
  • virtual_memory: maximum virtual memory across all processes (megabytes)
  • memory: maximum resident size across all processes (megabytes)
  • swap_memory: maximum swap usage across all processes (megabytes)
  • bytes_read: number of bytes read from disk
  • bytes_written: number of bytes written to disk
  • bytes_received: number of bytes read from the network
  • bytes_sent: number of bytes written to the network
  • bandwidth: maximum network bits/s (average over one minute)
  • total_files: maximum total number of files and directories across all the working directories in the tree
  • disk: size of all working directories in the tree (megabytes)
callback: Function to call every time a measurement is taken. It is called with the following arguments:
  • id: Unique identifier for the function and its arguments.
  • name: Name of the original function.
  • step: Measurement step. It is -1 for the last measurement taken.
  • resources: Current resources measured.
interval: Maximum time in seconds between measurements.
return_resources: Whether to change the return value of the function into a tuple of the original result and a dictionary with the final measurements.
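The limit names and the cores_avg definition listed above can be sketched in plain Python. This is only an illustration and is not part of ndcctools: check_limits and cores_avg are hypothetical helpers, and whether monitored() itself rejects unknown keys is not stated in this reference. The sketch shows one way to catch a misspelled limit name before building the decorator, and works through the cpu_time/wall_time ratio.

```python
# Limit names copied from the parameter documentation above.
DOCUMENTED_LIMITS = frozenset({
    "wall_time", "cpu_time", "cores", "cores_avg",
    "max_concurrent_processes", "total_processes",
    "virtual_memory", "memory", "swap_memory",
    "bytes_read", "bytes_written", "bytes_received", "bytes_sent",
    "bandwidth", "total_files", "disk",
})


def check_limits(limits):
    """Hypothetical helper: reject keys that are not documented limit names."""
    unknown = sorted(set(limits) - DOCUMENTED_LIMITS)
    if unknown:
        raise ValueError("unknown limit name(s): " + ", ".join(unknown))
    return dict(limits)


def cores_avg(cpu_time, wall_time):
    """cores_avg as documented: cpu_time / wall_time, both in seconds."""
    if wall_time <= 0:
        raise ValueError("wall_time must be positive")
    return cpu_time / wall_time


# memory is in megabytes and wall_time in seconds, per the list above.
limits = check_limits({"memory": 1024, "wall_time": 60})
print(limits)

# 30 s of CPU time over 20 s of wall time averages 1.5 cores.
print(cores_avg(cpu_time=30.0, wall_time=20.0))
```

A dictionary checked this way could then be passed as the limits argument of monitored().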
from ndcctools.resource_monitor import monitored, ResourceExhaustion

# Decorating a function:
@monitored()
def my_sum_a(lst):
    return sum(lst)

@monitored(return_resources = False, callback = lambda id, name, step, res: print('memory used', res['memory']))
def my_sum_b(lst):
    return sum(lst)

>>> (result_a, resources) = my_sum_a([1,2,3])
>>> print(result_a, resources['memory'])
6 66
>>> result_b = my_sum_b([1,2,3])
memory used 66
>>> assert(result_a == result_b)

# Wrapping the already defined function 'sum', adding limits:
my_sum_monitored = monitored(limits = {'memory': 1024})(sum)
try:
    # original_result = sum(...)
    (original_result, resources_used) = my_sum_monitored(...)
except ResourceExhaustion as e:
    print(e)
    ...

# Defining a function with a callback and a decorator.
# In this example, we record the time series of resources used:
import multiprocessing
import queue  # multiprocessing.Queue.get raises queue.Empty

results_series = multiprocessing.Queue()

def my_callback(id, name, step, resources):
    results_series.put((step, resources))

@monitored(callback = my_callback, return_resources = False)
def my_function(...):
    ...

result = my_function(...)

# print the time series
while not results_series.empty():
    try:
        step, resources = results_series.get(False)
        print(step, resources)
    except queue.Empty:
        pass
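
Once the (step, resources) pairs have been drained from the queue, they can be summarized with ordinary Python. The sketch below is not part of ndcctools: peak_usage and final_measurement are hypothetical helpers, the sample data is made up, and resources is assumed to behave like a dictionary keyed by the limit names, as the res['memory'] lookup in the example above suggests.

```python
def peak_usage(series, keys=("memory", "cores")):
    """Return the largest value seen for each key across all measurements."""
    peaks = {}
    for _step, resources in series:
        for key in keys:
            if key in resources:
                value = resources[key]
                peaks[key] = max(peaks.get(key, value), value)
    return peaks


def final_measurement(series):
    """Return the resources recorded with step == -1, or None if absent."""
    for step, resources in series:
        if step == -1:
            return resources
    return None


# Made-up sample standing in for what a callback might have recorded.
sample = [
    (0, {"memory": 40, "cores": 1}),
    (1, {"memory": 66, "cores": 2}),
    (-1, {"memory": 66, "cores": 2}),
]

print(peak_usage(sample))
print(final_measurement(sample))
```

Looking up step == -1 relies on the documented convention that -1 marks the last measurement taken.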