Execute
The bndl.execute module provides a means to execute Jobs on a cluster of workers.
Jobs consist of Tasks, which are interconnected in a directed acyclic graph that can be traversed in both directions.
bndl.execute provides a Scheduler which executes jobs and ensures that
tasks are executed in dependency order, that tasks are executed with locality (if any) to a worker,
and that tasks are restarted (if configured), also when a (possibly indirect) dependent task marks a
dependency as failed.
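To illustrate what "dependency order" means for a graph of tasks, here is a minimal sketch using the standard library's graphlib module. It is not the bndl Scheduler; the task names and the dependency_order helper are made up for the example.

```python
from graphlib import TopologicalSorter


def dependency_order(dependencies):
    """Return task ids such that every task comes after its dependencies.

    dependencies maps a task id to the set of task ids it depends on.
    """
    return list(TopologicalSorter(dependencies).static_order())


# Hypothetical job: 'load' feeds two map tasks, which both feed 'reduce'.
deps = {
    'map_a': {'load'},
    'map_b': {'load'},
    'reduce': {'map_a', 'map_b'},
}
order = dependency_order(deps)
print(order)
```

A real scheduler would additionally run independent tasks (here map_a and map_b) concurrently and take locality and retries into account.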
Execute context
class bndl.execute.context.ExecutionContext(node, config=<Conf {}>)
Bases: bndl.util.lifecycle.Lifecycle
Entry point for executing Jobs across workers.

execute(job, workers=None, order_results=True, concurrency=None, attempts=None)
Execute a Job on workers and get the results of each Task as it is executed.
Parameters:
 - job (bndl.execute.job.Job) – The job to execute.
 - workers (sequence) – A sequence of RMIPeerNodes peer nodes to execute the job on.
 - order_results (bool) – Whether the results of the tasks are to be yielded in order or not (defaults to True).
 - concurrency (int >= 1) – The number of tasks to execute concurrently on each worker. Defaults to the bndl.execute.concurrency configuration parameter.
 - attempts (int >= 1) – The maximum number of attempts per task (not counting worker failure as induced from a task failing with NotConnected or a task marked as failed through bndl.execute.exceptions.DependenciesFailed). Defaults to the bndl.execute.attempts configuration parameter.

await_workers(worker_count=None, connect_timeout=5, stable_timeout=60)
Waits for workers to be available. If no worker is found within connect_timeout seconds, a RuntimeError is raised. Once a worker is found, at most stable_timeout seconds are waited for the cluster to settle, that is, until no new workers are discovered and the worker count is stable.
Parameters:
 - worker_count (float or None) – The expected worker count. When connected to exactly worker_count workers, this method will return faster.
 - connect_timeout (float) – Maximum time in seconds waited until the first worker is discovered.
 - stable_timeout (float) – Maximum time in seconds waited until no more workers are discovered.
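The two waiting phases described for await_workers can be sketched as a polling loop. This is only an illustration under assumptions, not the bndl implementation: get_worker_count is a hypothetical callable, poll_interval is an invented parameter, and "stable" is taken to mean that two consecutive polls return the same count.

```python
import time


def await_workers(get_worker_count, worker_count=None,
                  connect_timeout=5, stable_timeout=60, poll_interval=0.1):
    """Block until workers are available and the worker count has settled.

    get_worker_count is a hypothetical zero-argument callable returning the
    number of currently connected workers.
    """
    # Phase 1: wait at most connect_timeout seconds for the first worker.
    deadline = time.monotonic() + connect_timeout
    while get_worker_count() == 0:
        if time.monotonic() >= deadline:
            raise RuntimeError('no workers available')
        time.sleep(poll_interval)

    # Phase 2: wait at most stable_timeout seconds for the count to settle.
    deadline = time.monotonic() + stable_timeout
    previous = get_worker_count()
    while time.monotonic() < deadline:
        if worker_count is not None and previous == worker_count:
            break  # the expected number of workers is connected: return early
        time.sleep(poll_interval)
        current = get_worker_count()
        if current == previous:
            break  # assumption: an unchanged count between polls means stable
        previous = current
    return previous
```

Passing the expected worker_count lets the loop stop as soon as that many workers are connected, which matches the "will return faster" note in the parameter description.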
 

worker_count
The number of workers connected with

workers
All peers of the local node with node_type worker.

cpu_profiling
Perform CPU profiling across the cluster with yappi.

memory_profiling
Perform memory profiling on the cluster with tracemalloc.

Jobs and tasks
class bndl.execute.job.Job(ctx, tasks, name=None, desc=None)
Bases: bndl.util.lifecycle.Lifecycle
A set of Tasks which can be executed on a cluster of workers.

class bndl.execute.job.Task(ctx, task_id, *, priority=None, name=None, desc=None, group=None)
Bases: bndl.util.lifecycle.Lifecycle
Execution of a Task on a worker is the basic unit of scheduling in bndl.execute.

execute(scheduler, worker)
Execute the task on a worker. The scheduler is provided as ‘context’ for the task.

locality(workers)
Indicate locality for executing this task on workers.
Parameters: workers (sequence[worker]) – The workers to determine the locality for.
Returns: A sequence of (worker, locality) tuples. 0 is indifferent and can be skipped, -1 is forbidden, 1+ increasing locality.
Return type: Sequence[(worker, locality)]
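As an illustration of how the locality values might be consumed, the sketch below picks a worker from a sequence of (worker, locality) tuples. The pick_worker helper is hypothetical and is not part of bndl; it only follows the convention stated above (0 indifferent and possibly omitted, -1 forbidden, 1+ increasingly preferred).

```python
def pick_worker(localities, workers):
    """Pick the worker with the highest locality, skipping forbidden ones.

    localities is a sequence of (worker, locality) tuples; workers missing
    from it are treated as locality 0 (indifferent).
    """
    scores = dict(localities)
    candidates = [w for w in workers if scores.get(w, 0) >= 0]
    if not candidates:
        return None  # every worker is forbidden for this task
    # max() returns the first maximal element, so ties follow the order of workers
    return max(candidates, key=lambda w: scores.get(w, 0))


print(pick_worker([('w1', -1), ('w3', 2)], ['w1', 'w2', 'w3']))
```

Treating absent workers as 0 keeps the common indifferent case free of bookkeeping, in line with the remark that locality 0 can be skipped.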

started
Whether the task has started.

done
Whether the task has completed execution.

set_executing(worker)
Utility for sub-classes to register the task as executing on a worker.

mark_done(result=None)
‘Externally’ mark the task as done, e.g. because its ‘side effect’ (result) is already available.

mark_failed(exc)
‘Externally’ mark the task as failed, e.g. because the worker which holds the task’s result has failed or can’t be reached.

class bndl.execute.job.RmiTask(ctx, task_id, method, args=(), kwargs=None, *, priority=None, name=None, desc=None, group=None)
Bases: bndl.execute.job.Task
A task which performs a Remote Method Invocation to execute a method with positional and keyword arguments.
Scheduler
exception bndl.execute.scheduler.FailedDependency(worker_failed=None)
Bases: Exception
Exception to be raised by a task (i.e. returned from task.exception for tasks which have failed) to indicate that the task has been marked as failed post-hoc by the execution of another task.
The worker_failed argument indicates whether the worker which executed the task should be considered lost or not.

class bndl.execute.scheduler.Scheduler(tasks, done, workers, concurrency=1, attempts=1)
Bases: object
This scheduler executes Tasks taking into account their dependencies and worker locality.
Worker assignment takes into account:
 - concurrency (how many tasks a worker is to execute concurrently), and
 - worker locality (0 is indifferent, -1 is forbidden, 1+ increasing locality). As locality 0 is likely to be common, it is assumed throughout the scheduler to reduce the memory cost of scheduling.
The most important component in the computational complexity of the scheduler is the number of dependencies to track. Many-to-many dependencies should be kept to the thousands or tens of thousands (e.g. 100 * 100 tasks). Larger numbers can be avoided by introducing a ‘barrier task’, as is done in bndl.compute (this reduces the number of dependencies from n*m to n+m).
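The effect of a barrier task on the number of dependencies can be checked with a few lines. This is a counting sketch only; the task names and both helper functions are invented for the example and do not reflect how bndl.compute represents its barrier.

```python
def direct_dependencies(upstream, downstream):
    """Every downstream task depends on every upstream task: n * m edges."""
    return [(u, d) for d in downstream for u in upstream]


def barrier_dependencies(upstream, downstream, barrier='barrier'):
    """Route the same dependencies through one barrier task: n + m edges."""
    to_barrier = [(u, barrier) for u in upstream]
    from_barrier = [(barrier, d) for d in downstream]
    return to_barrier + from_barrier


upstream = ['map-%d' % i for i in range(100)]
downstream = ['reduce-%d' % i for i in range(100)]
direct = direct_dependencies(upstream, downstream)
via_barrier = barrier_dependencies(upstream, downstream)
print(len(direct), len(via_barrier))
```

With 100 upstream and 100 downstream tasks the scheduler would track 10,000 dependencies directly, or 200 when they go through the barrier.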
Exceptions
exception bndl.execute.exceptions.TaskCancelled
Bases: Exception
Exception raised in a worker when a task is to be cancelled (preempted) by the driver.
This exception is raised through use of PyThreadState_SetAsyncExc.
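For readers unfamiliar with PyThreadState_SetAsyncExc, the sketch below shows the general CPython technique of raising an exception in another thread via ctypes. It is not the bndl worker code: the TaskCancelled class here is a local stand-in, and cancel_thread and task are hypothetical names.

```python
import ctypes
import threading
import time


class TaskCancelled(Exception):
    """Local stand-in for bndl.execute.exceptions.TaskCancelled."""


def cancel_thread(thread, exc_type=TaskCancelled):
    """Ask the interpreter to raise exc_type in the given thread."""
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exc_type))
    if affected == 0:
        raise ValueError('no running thread with this id')
    return affected


outcome = []


def task():
    try:
        while True:           # stand-in for long-running task code
            time.sleep(0.01)
    except TaskCancelled:
        outcome.append('cancelled')


worker = threading.Thread(target=task, daemon=True)
worker.start()
time.sleep(0.05)              # give the task time to enter its loop
cancel_thread(worker)
worker.join(timeout=5)
print(outcome)
```

The exception is only delivered once the target thread executes Python bytecode again, so a thread blocked inside a long C call is not interrupted until that call returns.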

exception bndl.execute.exceptions.DependenciesFailed(failures)
Bases: Exception
Indicate that a task failed due to dependencies not being ‘available’. This will cause the dependencies to be re-executed and the task which raises DependenciesFailed will be scheduled to execute once the dependencies complete.
The failures attribute is a mapping from worker names (strings) to a sequence of task_ids which have failed.
Worker
Profiling
class bndl.execute.profile.CpuProfiling(ctx)
Bases: object
Perform CPU profiling across the cluster with yappi.

print_stats(max_rows=100, sort_by=None, sort_dir=None, columns=(('name', 84), ('ncall', 12), ('tsub', 8), ('ttot', 8), ('tavg', 8)), per_worker=False, strip_dirs=True, include=(), exclude=(), file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)
Print the top CPU consumers.
Parameters:
 - max_rows (int) – The maximum number of rows to print. Set to None to print all.
 - sort_by (str or None) – The field to sort by, e.g. ‘subtime’ or ‘tottime’.
 - sort_dir ('asc', 'desc' or None) – Sort direction (ascending or descending).
 - columns (sequence of (name: str, width: int) tuples) – The names and widths (in characters) of the columns to print.
 - per_worker (bool) – Whether to print per worker individually or to print the totals. Defaults to printing totals.
 - strip_dirs (bool) – Whether to strip directories (only show packages / modules and line numbers). Defaults to True.
 - include (str sequence) – Limit stats to these (root) modules. E.g. ‘bndl’ will yield any modules under bndl, e.g. bndl.util. Requires that strip_dirs is True.
 - exclude (str sequence) – Filter out these (root) modules. E.g. ‘bndl’ will yield any modules except those under bndl, e.g. bndl.util. Requires that strip_dirs is True.
 - file (fileobj) – Where to print to. Defaults to sys.stdout.

class bndl.execute.profile.MemoryProfiling(ctx)
Bases: object
Perform memory profiling on the cluster with tracemalloc.

print_top(group_by='lineno', limit=30, compare_to=None, per_worker=False, strip_dirs=True, include=(), exclude=(), file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)
Take a snapshot across the cluster and print the top memory allocations.
Parameters:
 - group_by (str) – Defaults to ‘lineno’.
 - limit (int) – The number of lines to print. Defaults to 30.
 - compare_to (snapshot) – A snapshot to compare the memory usage to.
 - per_worker (bool) – Whether to print the top per worker or not. Defaults to False.
 - strip_dirs (bool) – Whether to strip directories. Defaults to True.
 - include (sequence[str]) – The modules to include.
 - exclude (sequence[str]) – The modules to exclude.
 - file (fileobj) – The file to print to. Defaults to sys.stdout.
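For orientation, this is roughly what taking a snapshot and listing the top allocations looks like with tracemalloc in a single process. It is a local sketch, not the cluster-wide MemoryProfiling implementation; top_allocations is a hypothetical helper whose group_by and limit arguments merely mirror the parameters documented above.

```python
import tracemalloc


def top_allocations(group_by='lineno', limit=30):
    """Return formatted lines for the largest allocations in this process."""
    snapshot = tracemalloc.take_snapshot()
    stats = snapshot.statistics(group_by)   # sorted from largest to smallest
    return [str(stat) for stat in stats[:limit]]


tracemalloc.start()
data = [bytes(1000) for _ in range(1000)]   # allocate roughly 1 MB to report on
lines = top_allocations(limit=5)
tracemalloc.stop()

for line in lines:
    print(line)
```

tracemalloc snapshots also offer compare_to, which reports differences against an earlier snapshot; the compare_to parameter above presumably serves a similar purpose across workers.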