Execute

The bndl.execute module provides a means to execute Jobs on a cluster of workers.

Jobs consist of Tasks which are interconnected in a directed acyclic graph (traversable in both directions). bndl.execute provides a Scheduler which executes jobs and ensures that tasks are executed in dependency order, that tasks are executed with locality (if any) to a worker, and that tasks are restarted (if configured), including when a (possibly indirect) dependent task marks a dependency as failed.
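To illustrate what "dependency order" means for a directed acyclic graph of tasks, here is a minimal sketch using the standard library graphlib module. It is not the bndl scheduler; the task ids and the dependencies mapping are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical task ids mapped to the ids of the tasks they depend on.
dependencies = {
    "load_a": set(),
    "load_b": set(),
    "join": {"load_a", "load_b"},
    "report": {"join"},
}


def dependency_order(deps):
    """Return task ids so that every task comes after its dependencies."""
    return list(TopologicalSorter(deps).static_order())


order = dependency_order(dependencies)
print(order)
```

Any order in which "join" follows both loads and "report" follows "join" is valid; a real scheduler would additionally run independent tasks concurrently.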

Execute context

class bndl.execute.context.ExecutionContext(node, config=<Conf {}>)[source]

Bases: bndl.util.lifecycle.Lifecycle

Entry point for executing Jobs across workers.

execute(job, workers=None, order_results=True, concurrency=None, attempts=None)[source]

Execute a Job on workers and get the results of each Task as it is executed.

Parameters:
  • job (bndl.execute.job.Job) – The job to execute.
  • workers (sequence) – A sequence of RMIPeerNode peer nodes to execute the job on.
  • order_results (bool) – Whether the results of the task are to be yielded in order or not (defaults to True).
  • concurrency (int >= 1) – The number of tasks to execute concurrently on each worker. Defaults to the bndl.execute.concurrency configuration parameter.
  • attempts (int >= 1) – The maximum number of attempts per task (not counting worker failure as induced from a task failing with NotConnected or a task marked as failed through bndl.execute.exceptions.DependenciesFailed). Defaults to the bndl.execute.attempts configuration parameter.
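The difference between ordered and unordered results can be sketched with the standard library concurrent.futures module. This is an analogy only, not bndl code: the run function, slow_square and the inputs are hypothetical, and a thread pool stands in for the cluster of workers.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x, delay):
    """Stand-in for a task: sleep for a while, then return a result."""
    time.sleep(delay)
    return x * x


def run(inputs, order_results=True, concurrency=2):
    """Yield results in submission order, or as they complete."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(slow_square, x, d) for x, d in inputs]
        pending = futures if order_results else as_completed(futures)
        for future in pending:
            yield future.result()


inputs = [(1, 0.2), (2, 0.0), (3, 0.05)]
ordered = list(run(inputs, order_results=True))
unordered = list(run(inputs, order_results=False))
```

With order_results=True the slow first task holds back the results of the faster ones; with order_results=False each result is yielded as soon as it is available.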
await_workers(worker_count=None, connect_timeout=5, stable_timeout=60)[source]

Waits for workers to be available. If no worker is found within connect_timeout seconds, a RuntimeError is raised. Once a worker is found, at most stable_timeout seconds will be waited for the cluster to settle, that is, until no new workers are discovered and the worker count is stable.

Parameters:
  • worker_count (float or None) – The expected worker count. When connected to exactly worker_count workers, this method will return faster.
  • connect_timeout (float) – Maximum time in seconds waited until the first worker is discovered.
  • stable_timeout (float) – Maximum time in seconds waited until no more workers are discovered.
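The waiting behaviour described above could look roughly like the following sketch. It is not the bndl implementation: count_workers is a hypothetical callable that returns the current worker count, and the poll and settle parameters are assumptions introduced here to define "stable".

```python
import time


def await_workers(count_workers, worker_count=None, connect_timeout=5,
                  stable_timeout=60, poll=0.01, settle=0.05):
    """Wait for a first worker, then for the worker count to stop changing."""
    start = time.monotonic()
    while count_workers() == 0:
        if time.monotonic() - start > connect_timeout:
            raise RuntimeError("no workers available")
        time.sleep(poll)

    deadline = time.monotonic() + stable_timeout
    last_count, last_change = count_workers(), time.monotonic()
    while time.monotonic() < deadline:
        current = count_workers()
        if current == worker_count:
            return current  # expected count reached: return early
        if current != last_count:
            last_count, last_change = current, time.monotonic()
        elif time.monotonic() - last_change >= settle:
            return current  # count unchanged for `settle` seconds
        time.sleep(poll)
    return count_workers()
```

Passing worker_count lets the function return as soon as the expected number of workers is seen, which matches the "will return faster" note in the parameter description.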
worker_count

The number of workers the local node is connected with.

workers

All peers of the local node with node_type worker.

cpu_profiling

Perform CPU profiling across the cluster with yappi.

memory_profiling

Perform memory profiling on the cluster with tracemalloc.

Jobs and tasks

class bndl.execute.job.Job(ctx, tasks, name=None, desc=None)[source]

Bases: bndl.util.lifecycle.Lifecycle

A set of Tasks which can be executed on a cluster of workers.

class bndl.execute.job.Task(ctx, task_id, *, priority=None, name=None, desc=None, group=None)[source]

Bases: bndl.util.lifecycle.Lifecycle

Execution of a Task on a worker is the basic unit of scheduling in bndl.execute.

execute(scheduler, worker)[source]

Execute the task on a worker. The scheduler is provided as ‘context’ for the task.

cancel()[source]

Cancel execution (if not already done) of this task.

locality(workers)[source]

Indicate locality for executing this task on workers.

Parameters:workers (sequence[worker]) – The workers to determine the locality for.
Returns: A sequence of (worker, locality) tuples. 0 is indifferent and can be skipped, -1 is forbidden, 1+ increasing locality.
Return type: Sequence[(worker, locality)]
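As an illustration of how the (worker, locality) tuples might be consumed, the sketch below picks the allowed worker with the highest locality. The pick_worker helper and the worker names are hypothetical; the actual scheduler also weighs concurrency, which is left out here. Workers missing from the sequence are treated as locality 0, and any negative locality is treated as forbidden.

```python
def pick_worker(workers, localities):
    """Pick the allowed worker with the highest locality.

    localities is a sequence of (worker, locality) tuples; workers that are
    missing from it are treated as locality 0 (indifferent).
    """
    scores = dict(localities)
    allowed = [w for w in workers if scores.get(w, 0) >= 0]
    if not allowed:
        return None  # every worker is forbidden for this task
    return max(allowed, key=lambda w: scores.get(w, 0))


workers = ["w1", "w2", "w3"]
choice = pick_worker(workers, [("w1", -1), ("w3", 2)])
print(choice)
```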
started

Whether the task has started

done

Whether the task has completed execution

set_executing(worker)[source]

Utility for sub-classes to register the task as executing on a worker.

mark_done(result=None)[source]

‘Externally’ mark the task as done, e.g. because its ‘side effect’ (result) is already available.

mark_failed(exc)[source]

‘Externally’ mark the task as failed, e.g. because the worker which holds the task’s result has failed or can’t be reached.

result()[source]

Get the result of the task (blocks). Raises an exception if the task failed with one.

exception()[source]

Get the exception of the task (blocks).
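The result() and exception() semantics resemble those of the standard library concurrent.futures.Future, which can serve as a mental model. The sketch below uses Future directly and makes no claim about how Task is implemented; set_result and set_exception play roughly the role of mark_done and mark_failed here.

```python
from concurrent.futures import Future

# A future that completed with a result.
done = Future()
done.set_result(42)

# A future that was marked as failed with an exception.
failed = Future()
failed.set_exception(ValueError("worker lost"))


def outcome(future):
    """Return ("ok", result), or ("failed", exception type name)."""
    try:
        return ("ok", future.result())
    except Exception as exc:
        return ("failed", type(exc).__name__)


print(outcome(done), outcome(failed))
```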

executed_on_last()[source]

The name of the worker this task executed on last (if any).

release()[source]

Release most resources of the task. Invoked after a job’s execution is complete.

class bndl.execute.job.RmiTask(ctx, task_id, method, args=(), kwargs=None, *, priority=None, name=None, desc=None, group=None)[source]

Bases: bndl.execute.job.Task

A task which performs a Remote Method Invocation to execute a method with positional and keyword arguments.

Scheduler

exception bndl.execute.scheduler.FailedDependency(worker_failed=None)[source]

Bases: Exception

Exception to be raised by task (i.e. returned from task.exception for tasks which have failed) to indicate that the task has been marked as failed post-hoc by the execution of another task.

The worker_failed argument indicates whether the worker which executed the task should be considered lost or not.

class bndl.execute.scheduler.Scheduler(tasks, done, workers, concurrency=1, attempts=1)[source]

Bases: object

This scheduler executes Tasks taking into account their dependencies and worker locality.

Worker assignment takes into account:
  • concurrency (how many tasks a worker must execute concurrently)
  • worker locality (0 is indifferent, -1 is forbidden, 1+ increasing locality). As locality 0 is likely to be common, it is assumed throughout the scheduler to reduce the memory cost of scheduling.

The most important component in the computational complexity of the scheduler is the number of dependencies to track. Many-to-many dependencies should be kept to the thousands or tens of thousands (e.g. 100 * 100 tasks). Larger dependency counts can be avoided by introducing a ‘barrier task’, as is done in bndl.compute (this reduces the number of dependencies from n*m to n+m).
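The effect of a barrier task on the number of dependencies can be checked with a small sketch. The task names and helper functions are hypothetical; only the n*m versus n+m arithmetic is taken from the text.

```python
def direct_edges(upstream, downstream):
    """Every downstream task depends on every upstream task: n*m edges."""
    return [(u, d) for u in upstream for d in downstream]


def barrier_edges(upstream, downstream, barrier="barrier"):
    """Route all dependencies through a single barrier task: n+m edges."""
    return ([(u, barrier) for u in upstream]
            + [(barrier, d) for d in downstream])


upstream = [f"map-{i}" for i in range(100)]
downstream = [f"reduce-{i}" for i in range(100)]

n_direct = len(direct_edges(upstream, downstream))
n_barrier = len(barrier_edges(upstream, downstream))
print(n_direct, n_barrier)
```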

task_done(task)[source]

When a task completes, delete it from pending, add it to done and set dependent tasks as executable if this task was the last dependency. Reschedule failed tasks or abort scheduling if a task failed too often.
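The bookkeeping described for task_done can be sketched as follows. MiniScheduler is a hypothetical, single-threaded stand-in and not the bndl Scheduler; it ignores workers, locality and concurrency, and it assumes every task id appears as a key in the dependencies mapping.

```python
class MiniScheduler:
    def __init__(self, dependencies, attempts=1):
        # dependencies: task id -> set of task ids it depends on
        self.blocked = {t: set(d) for t, d in dependencies.items()}
        self.dependents = {t: set() for t in dependencies}
        for task, deps in dependencies.items():
            for dep in deps:
                self.dependents[dep].add(task)
        self.pending = set(dependencies)
        self.done = set()
        self.executable = [t for t in dependencies if not dependencies[t]]
        self.attempts_left = {t: attempts for t in dependencies}
        self.aborted = False

    def task_done(self, task, failed=False):
        if failed:
            self.attempts_left[task] -= 1
            if self.attempts_left[task] > 0:
                self.executable.append(task)  # reschedule the task
            else:
                self.aborted = True           # failed too often
            return
        self.pending.discard(task)
        self.done.add(task)
        for dependent in self.dependents[task]:
            self.blocked[dependent].discard(task)
            if not self.blocked[dependent]:
                # this was the last open dependency of the dependent
                self.executable.append(dependent)
```

A caller would take tasks from executable, run them, and report back through task_done until pending is empty or aborted is set.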

Exceptions

exception bndl.execute.exceptions.TaskCancelled[source]

Bases: Exception

Exception raised in a worker when a task is to be cancelled (preempted) by the driver.

This exception is raised through use of PyThreadState_SetAsyncExc.
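For readers unfamiliar with PyThreadState_SetAsyncExc, the sketch below shows how the CPython C API function can be called through ctypes to raise an exception in another thread. It is CPython specific. The TaskCancelled class defined here is a local stand-in, and cancel_thread and work are hypothetical helpers; how bndl wires this into its workers is not shown.

```python
import ctypes
import threading
import time


class TaskCancelled(Exception):
    """Local stand-in for bndl.execute.exceptions.TaskCancelled."""


def cancel_thread(thread, exc_type=TaskCancelled):
    """Ask CPython to raise exc_type asynchronously in the given thread."""
    affected = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exc_type))
    return affected == 1  # number of thread states that were modified


outcome = []


def work():
    try:
        while True:
            time.sleep(0.01)  # the exception arrives once Python code resumes
    except TaskCancelled:
        outcome.append("cancelled")


worker = threading.Thread(target=work)
worker.start()
time.sleep(0.05)
cancelled = cancel_thread(worker)
worker.join(timeout=5)
```

The exception is only delivered when the target thread executes Python bytecode, so a thread blocked in a long-running C call is not interrupted until that call returns.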

exception bndl.execute.exceptions.DependenciesFailed(failures)[source]

Bases: Exception

Indicate that a task failed due to dependencies not being ‘available’. This will cause the dependencies to be re-executed and the task which raises DependenciesFailed will be scheduled to execute once the dependencies complete.

The failures attribute is a mapping from worker names (strings) to a sequence of task_ids which have failed.
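The shape of the failures mapping can be illustrated with a small sketch. The DependenciesFailed class below is a local stand-in that mirrors only the documented constructor and attribute; the worker names, the integer task ids and the tasks_to_rerun helper are hypothetical.

```python
class DependenciesFailed(Exception):
    """Local stand-in mirroring the documented failures attribute."""

    def __init__(self, failures):
        super().__init__(failures)
        self.failures = failures


def tasks_to_rerun(exc):
    """Flatten the worker -> task ids mapping into a sorted list of ids."""
    return sorted(tid for ids in exc.failures.values() for tid in ids)


try:
    raise DependenciesFailed({"worker-1": [3, 4], "worker-2": [7]})
except DependenciesFailed as exc:
    rerun = tasks_to_rerun(exc)

print(rerun)
```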

Worker

Profiling

class bndl.execute.profile.CpuProfiling(ctx)[source]

Bases: object

Perform CPU profiling across the cluster with yappi.

start()[source]

Start CPU profiling.

stop()[source]

Stop CPU profiling.

print_stats(max_rows=100, sort_by=None, sort_dir=None, columns=(('name', 84), ('ncall', 12), ('tsub', 8), ('ttot', 8), ('tavg', 8)), per_worker=False, strip_dirs=True, include=(), exclude=(), file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Print the top CPU consumers.

Parameters:
  • max_rows (int) – The maximum number of rows to print. Set to None to print all.
  • sort_by (str or None) – The field to sort by, e.g. ‘subtime’ or ‘tottime’
  • sort_dir ('asc', 'desc' or None) – Sort direction (ascending or descending).
  • columns (sequence of (name: str, width: int) tuples) – The names and widths (in characters) of the columns to print.
  • per_worker (bool) – Whether to print per worker individually or to print the totals. Defaults to printing totals.
  • strip_dirs (bool) – Whether to strip directories (only show packages / modules and line numbers). Defaults to True.
  • include (str sequence) – Limit stats to these (root) modules. E.g. ‘bndl’ will yield any modules under bndl, e.g. bndl.util. Requires that strip_dirs is True.
  • exclude (str sequence) – Filter out these (root) modules. E.g. ‘bndl’ will yield any modules except those under bndl, e.g. bndl.util. Requires that strip_dirs is True.
  • file (fileobj) – Where to print to. Defaults to sys.stdout.
class bndl.execute.profile.MemoryProfiling(ctx)[source]

Bases: object

Perform memory profiling on the cluster with tracemalloc.

start()[source]

Start memory profiling.

stop()[source]

Stop memory profiling.

print_top(group_by='lineno', limit=30, compare_to=None, per_worker=False, strip_dirs=True, include=(), exclude=(), file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Take a snapshot across the cluster and print the top memory allocations.

Parameters:
  • group_by (str) – Defaults to ‘lineno’.
  • limit (int) – The number of lines to print. Defaults to 30.
  • compare_to (snapshot) – A snapshot to compare the memory usage to.
  • per_worker (bool) – Whether to print the top per worker or not. Defaults to False.
  • strip_dirs (bool) – Whether to strip directories. Defaults to True.
  • include (sequence[str]) – The modules to include.
  • exclude (sequence[str]) – The modules to exclude.
  • file (fileobj) – The file to print to. Defaults to sys.stdout.
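The group_by, limit and compare_to parameters map naturally onto the standard library tracemalloc API. The sketch below shows that API in a single local process as a point of reference; it does not show how bndl collects snapshots across workers, and the allocation used as a workload is hypothetical.

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Hypothetical workload: allocate roughly 1 MB on a single source line.
data = [bytes(1000) for _ in range(1000)]

after = tracemalloc.take_snapshot()

# Top allocations grouped by source line, limited to 30 entries.
top = after.statistics("lineno")[:30]

# Differences relative to the earlier snapshot, largest changes first.
diff = after.compare_to(before, "lineno")[:30]
tracemalloc.stop()

for stat in diff[:3]:
    print(stat)
```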