cctools
Data Structures
class Categories
    Class to encapsulate all the categories in a workflow.
class Category
class ResourceExhaustion
    Exception raised when a function goes over the resource limits.
class ResourceInternalError
Functions
def monitored(limits=None, callback=None, interval=1, return_resources=True)
    Create a monitored version of a function.
Python resource_monitor bindings.
The objects and methods provided by this package correspond to the native C API in category.h, rmonitor_poll.h, and rmsummary.h.
The SWIG-based Python bindings provide a higher-level interface that revolves around the following function/decorator and objects:
def resource_monitor.monitored(limits=None, callback=None, interval=1, return_resources=True)
Create a monitored version of a function.
It can be used as a decorator, or called by itself.
limits | Dictionary of resource limits to set, keyed by resource name (the examples below use 'memory'). |
callback | Function to call every time a measurement is done. It is called with the arguments (id, name, step, resources), as in the examples below. |
interval | Maximum time in seconds between measurements. |
return_resources | Whether to return a tuple of the original result and a dictionary with the final measurements, instead of the original result alone. |

# Decorating a function:
@monitored()
def my_sum_a(lst):
    return sum(lst)

@monitored(return_resources = False, callback = lambda id, name, step, res: print('memory used', res['memory']))
def my_sum_b(lst):
    return sum(lst)

>>> (result_a, resources) = my_sum_a([1,2,3])
>>> print(result_a, resources['memory'])
6 66
>>> result_b = my_sum_b([1,2,3])
memory used 66
>>> assert(result_a == result_b)

# Wrapping the already defined function 'sum', adding limits:
my_sum_monitored = monitored(limits = {'memory': 1024})(sum)
try:
    # original_result = sum(...)
    (original_result, resources_used) = my_sum_monitored(...)
except ResourceExhaustion as e:
    print(e)
    ...
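
The decorator form and the direct-call form are interchangeable because monitored(...) returns a decorator that is then applied to the function. The sketch below illustrates that pattern, and the effect of return_resources, with a stand-in written only with the standard library. The name monitored_sketch and the keys 'wall_time' and 'peak_python_bytes' are assumptions made for this illustration; the stand-in measures in the calling process and is not how resource_monitor is implemented.

```python
import functools
import time
import tracemalloc


def monitored_sketch(return_resources=True):
    """Hypothetical stand-in for monitored(): a decorator factory.

    It records only wall time and peak Python-level allocations,
    which is far less than the real resource monitor reports.
    """
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            tracemalloc.start()
            start = time.perf_counter()
            try:
                result = function(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - start
                _, peak = tracemalloc.get_traced_memory()
                tracemalloc.stop()
            # Key names are made up for this sketch.
            resources = {'wall_time': elapsed, 'peak_python_bytes': peak}
            return (result, resources) if return_resources else result
        return wrapper
    return decorator


# Decorator form: the call returns (result, resources).
@monitored_sketch()
def my_sum_a(lst):
    return sum(lst)


# Direct-call form, wrapping the builtin 'sum'; only the result is returned.
my_sum_b = monitored_sketch(return_resources=False)(sum)

result_a, resources = my_sum_a([1, 2, 3])
result_b = my_sum_b([1, 2, 3])
```

With return_resources left at True the caller must unpack a tuple, so switching the flag on an existing function changes its return type for every caller.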

# Defining a function with a callback and a decorator.
# In this example, we record the time series of resources used:
import multiprocessing
import queue

results_series = multiprocessing.Queue()

def my_callback(id, name, step, resources):
    results_series.put((step, resources))

@monitored(callback = my_callback, return_resources = False)
def my_function(...):
    ...

result = my_function(...)

# print the time series
while not results_series.empty():
    try:
        step, resources = results_series.get(False)
        print(step, resources)
    except queue.Empty:
        # get(False) raises queue.Empty when no item is available
        pass
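
One caveat about reading the time series: for a multiprocessing.Queue, empty() is not reliable, because items pass through a background feeder thread and may not be visible immediately after put(). A possible alternative, sketched below, is to read with a timeout and stop on queue.Empty. The helper drain, its timeout value, and the hand-made callback invocations are assumptions for illustration; in real use the monitor, not the caller, invokes the callback.

```python
import multiprocessing
import queue


def drain(series, timeout=0.5):
    """Collect queued (step, resources) pairs until the queue stays
    empty for 'timeout' seconds. Hypothetical helper for this sketch."""
    items = []
    while True:
        try:
            items.append(series.get(timeout=timeout))
        except queue.Empty:
            return items


results_series = multiprocessing.Queue()


def my_callback(id, name, step, resources):
    results_series.put((step, resources))


# Stand-in for the calls the monitor would make; the values are made up.
for step, memory in enumerate([10, 42, 66]):
    my_callback(1, 'my_function', step, {'memory': memory})

time_series = drain(results_series)
for step, resources in time_series:
    print(step, resources)
```

The timeout trades a short wait at the end of the loop for not missing measurements that are still in transit.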