Execute
The bndl.execute module provides a means to execute Jobs on a cluster of workers. Jobs consist of Tasks which are interconnected in a directed acyclic graph (traversable in both directions). bndl.execute provides a Scheduler which executes jobs and ensures that:
- tasks are executed in dependency order;
- tasks are executed on a worker with locality (if any);
- tasks are restarted (if configured), including when a (possibly indirect) dependent task marks a dependency as failed.
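To make the scheduling guarantees above concrete, here is a minimal, single-process sketch of executing a DAG of tasks in dependency order with a bounded number of attempts. It is not bndl's Scheduler: the function name `run_in_dependency_order` and the dict-based representation of tasks and dependencies are hypothetical, and workers and locality are ignored.

```python
from collections import deque
from typing import Callable, Dict, Hashable, Iterable, List


def run_in_dependency_order(
    tasks: Dict[Hashable, Callable[[], object]],
    deps: Dict[Hashable, Iterable[Hashable]],
    attempts: int = 1,
) -> Dict[Hashable, object]:
    """Hypothetical sketch: run each task only after all its dependencies succeeded.

    tasks maps a task id to a zero-argument callable; deps maps a task id to
    the ids it depends on. Each task is tried at most `attempts` times.
    """
    # Build the reverse edges (dependency -> dependents) so the graph can be
    # traversed in both directions, and count unfinished dependencies per task.
    dependents: Dict[Hashable, List[Hashable]] = {t: [] for t in tasks}
    remaining = {t: 0 for t in tasks}
    for task_id, task_deps in deps.items():
        for dep in task_deps:
            dependents[dep].append(task_id)
            remaining[task_id] += 1

    ready = deque(t for t, n in remaining.items() if n == 0)
    results: Dict[Hashable, object] = {}
    while ready:
        task_id = ready.popleft()
        for attempt in range(1, attempts + 1):
            try:
                results[task_id] = tasks[task_id]()
                break
            except Exception:
                if attempt == attempts:
                    raise  # out of attempts: fail the whole job
        # A finished task may make its dependents ready.
        for dependent in dependents[task_id]:
            remaining[dependent] -= 1
            if remaining[dependent] == 0:
                ready.append(dependent)

    if len(results) != len(tasks):
        raise ValueError("dependency graph contains a cycle")
    return results
```

Counting remaining dependencies (Kahn's algorithm) keeps the bookkeeping proportional to the number of dependency edges, which is also why the number of dependencies dominates the cost of scheduling.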
Execute context
class bndl.execute.context.ExecutionContext(node, config=<Conf {}>)
Bases: bndl.util.lifecycle.Lifecycle
Entry point for executing Jobs across workers.
execute(job, workers=None, order_results=True, concurrency=None, attempts=None)
Execute a Job on workers and get the results of each Task as it is executed.
Parameters:
- job (bndl.execute.job.Job) – The job to execute.
- workers (sequence) – A sequence of peer nodes (RMIPeerNodes) to execute the job on.
- order_results (bool) – Whether the results of the tasks are yielded in order. Defaults to True.
- concurrency (int >= 1) – The number of tasks to execute concurrently on each worker. Defaults to the bndl.execute.concurrency configuration parameter.
- attempts (int >= 1) – The maximum number of attempts per task, not counting attempts lost to worker failure (as induced from a task failing with NotConnected) or to a task being marked as failed through bndl.execute.exceptions.DependenciesFailed. Defaults to the bndl.execute.attempts configuration parameter.
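The interplay of order_results and concurrency can be illustrated with a single-machine sketch built on the standard library's concurrent.futures. The name `execute_sketch` is hypothetical, threads stand in for a worker's concurrent task slots, and nothing here calls bndl.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, Iterator


def execute_sketch(
    tasks: Iterable[Callable[[], object]],
    order_results: bool = True,
    concurrency: int = 1,
) -> Iterator[object]:
    """Hypothetical sketch of order_results/concurrency semantics on one machine.

    Runs at most `concurrency` tasks at a time and yields their results either
    in task order (order_results=True) or as soon as each one finishes.
    """
    if concurrency < 1:
        raise ValueError("concurrency must be >= 1")
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(task) for task in tasks]
        if order_results:
            # Block on each future in submission order.
            for future in futures:
                yield future.result()
        else:
            # Yield results in completion order.
            for future in as_completed(futures):
                yield future.result()
```

With order_results=True a slow first task holds back the results of faster later tasks; with order_results=False results stream out as they complete.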
await_workers(worker_count=None, connect_timeout=5, stable_timeout=60)
Waits for workers to become available. If no worker is discovered within connect_timeout seconds, a RuntimeError is raised. Once a worker is found, at most stable_timeout seconds are waited for the cluster to settle, i.e. until no new workers are discovered and the worker count is stable.
Parameters:
- worker_count (float or None) – The expected worker count. When connected to exactly worker_count workers, this method returns sooner.
- connect_timeout (float) – Maximum time in seconds to wait until the first worker is discovered.
- stable_timeout (float) – Maximum time in seconds to wait until no more workers are discovered.
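The two-phase wait described above can be sketched as a polling loop. Everything here is an assumption for illustration: `await_workers_sketch`, the `get_worker_count` callback, `poll_interval` and the injectable `clock`/`sleep` are hypothetical, and "stable" is taken to mean that the count did not change between two consecutive polls, which may differ from bndl's actual criterion.

```python
import time
from typing import Callable, Optional


def await_workers_sketch(
    get_worker_count: Callable[[], int],
    worker_count: Optional[int] = None,
    connect_timeout: float = 5,
    stable_timeout: float = 60,
    poll_interval: float = 0.1,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Hypothetical sketch of the waiting behaviour of await_workers."""
    # Phase 1: wait for the first worker, at most connect_timeout seconds.
    deadline = clock() + connect_timeout
    count = get_worker_count()
    while count == 0:
        if clock() >= deadline:
            raise RuntimeError("no workers discovered within connect_timeout")
        sleep(poll_interval)
        count = get_worker_count()

    # Phase 2: wait for the cluster to settle, at most stable_timeout seconds.
    deadline = clock() + stable_timeout
    while True:
        if worker_count is not None and count == worker_count:
            return count  # exactly the expected number of workers: return early
        if clock() >= deadline:
            return count  # stable_timeout exceeded: settle for what we have
        sleep(poll_interval)
        new_count = get_worker_count()
        if new_count == count:
            return count  # assumption: unchanged between two polls means stable
        count = new_count
```

Injecting the clock and sleep functions keeps the logic testable without real waiting; passing worker_count skips the final stability poll once the expected number of workers is connected.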
worker_count
The number of workers connected with
workers
All peers of the local node with node_type worker.
cpu_profiling
Perform CPU profiling across the cluster with yappi.
memory_profiling
Perform memory profiling on the cluster with tracemalloc.
Jobs and tasks
class bndl.execute.job.Job(ctx, tasks, name=None, desc=None)
Bases: bndl.util.lifecycle.Lifecycle
A set of Tasks which can be executed on a cluster of workers.
class bndl.execute.job.Task(ctx, task_id, *, priority=None, name=None, desc=None, group=None)
Bases: bndl.util.lifecycle.Lifecycle
Execution of a Task on a worker is the basic unit of scheduling in bndl.execute.
execute(scheduler, worker)
Execute the task on a worker. The scheduler is provided as ‘context’ for the task.
locality(workers)
Indicate locality for executing this task on workers.
Parameters: workers (sequence[worker]) – The workers to determine the locality for.
Returns: A sequence of (worker, locality) tuples. Locality 0 is indifferent and can be skipped, -1 is forbidden, and 1+ indicates increasing locality.
Return type: Sequence[(worker, locality)]
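Because locality 0 can be skipped, a locality implementation only needs to report the non-zero tuples. The following sketch shows how a consumer might interpret such tuples when picking a worker; `pick_worker` is a hypothetical helper, not bndl's scheduler, and breaking ties by input order is an assumption.

```python
from typing import Hashable, Iterable, Optional, Sequence, Tuple


def pick_worker(
    workers: Sequence[Hashable],
    locality: Iterable[Tuple[Hashable, int]],
) -> Optional[Hashable]:
    """Hypothetical helper: choose a worker from (worker, locality) tuples.

    Workers not mentioned have locality 0 (indifferent), -1 forbids a worker,
    and higher positive values are preferred.
    """
    scores = {worker: 0 for worker in workers}  # default: indifferent
    for worker, loc in locality:
        if worker in scores:
            scores[worker] = loc
    allowed = [w for w in workers if scores[w] >= 0]
    if not allowed:
        return None  # every worker is forbidden
    # max() returns the first of equally scored workers, preserving input order.
    return max(allowed, key=lambda w: scores[w])
```

For example, `pick_worker(["w1", "w2", "w3"], [("w2", 2), ("w3", 1)])` returns `"w2"`, while `pick_worker(["w1", "w2"], [("w1", -1)])` returns `"w2"` because `"w1"` is forbidden.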
started
Whether the task has started.
done
Whether the task has completed execution.
set_executing(worker)
Utility for subclasses to register the task as executing on a worker.
mark_done(result=None)
‘Externally’ mark the task as done, e.g. because its ‘side effect’ (result) is already available.
mark_failed(exc)
‘Externally’ mark the task as failed, e.g. because the worker which holds the task’s result has failed or can’t be reached.
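The lifecycle members above (started, done, set_executing, mark_done, mark_failed) can be modelled as a small state object. This is a hypothetical stand-in, not bndl's Task: the class name `SketchTask`, the attributes `executed_on`, `result` and `failed`, and the choice that set_executing clears the outcome of a previous attempt are assumptions; only the `exception` attribute is suggested by the FailedDependency description.

```python
from typing import Any, Optional


class SketchTask:
    """Hypothetical stand-in for the Task lifecycle described above (not bndl code)."""

    def __init__(self, task_id: Any):
        self.task_id = task_id
        self.executed_on: Any = None
        self.result: Any = None
        self.exception: Optional[BaseException] = None
        self._started = False
        self._done = False

    @property
    def started(self) -> bool:
        return self._started

    @property
    def done(self) -> bool:
        return self._done

    @property
    def failed(self) -> bool:
        return self._done and self.exception is not None

    def set_executing(self, worker: Any) -> None:
        # Register the worker and clear the outcome of any previous attempt.
        self.executed_on = worker
        self.result = None
        self.exception = None
        self._started = True
        self._done = False

    def mark_done(self, result: Any = None) -> None:
        # 'Externally' complete the task, e.g. because its result already exists.
        self.result = result
        self.exception = None
        self._started = True
        self._done = True

    def mark_failed(self, exc: BaseException) -> None:
        # 'Externally' fail the task, e.g. because the worker holding its result is lost.
        self.result = None
        self.exception = exc
        self._started = True
        self._done = True
```

A task marked as failed this way can be executed again: calling set_executing resets it to started but not done.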
class bndl.execute.job.RmiTask(ctx, task_id, method, args=(), kwargs=None, *, priority=None, name=None, desc=None, group=None)
Bases: bndl.execute.job.Task
A task which performs a Remote Method Invocation to execute a method with positional and keyword arguments.
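The essence of an RMI task, invoking a named method with stored positional and keyword arguments, can be shown locally. In this sketch the "worker" is an ordinary Python object and the method is looked up with getattr(); a real RMI call goes over the network. `LocalRmiTaskSketch` is a hypothetical name and its execute signature only mirrors Task.execute(scheduler, worker).

```python
from typing import Any, Dict, Optional, Tuple


class LocalRmiTaskSketch:
    """Hypothetical sketch: invoke `method` on a worker with args and kwargs."""

    def __init__(
        self,
        task_id: Any,
        method: str,
        args: Tuple = (),
        kwargs: Optional[Dict[str, Any]] = None,
    ):
        self.task_id = task_id
        self.method = method
        self.args = args
        self.kwargs = kwargs or {}  # mirrors the kwargs=None default

    def execute(self, scheduler: Any, worker: Any) -> Any:
        # scheduler is accepted for parity with Task.execute but unused here.
        remote_method = getattr(worker, self.method)
        return remote_method(*self.args, **self.kwargs)
```

Storing the method by name rather than as a bound function is what makes such a task serializable and executable on whichever worker the scheduler picks.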
Scheduler
exception bndl.execute.scheduler.FailedDependency(worker_failed=None)
Bases: Exception
Exception to be raised by a task (i.e. returned from task.exception for tasks which have failed) to indicate that the task has been marked as failed post hoc by the execution of another task.
The worker_failed argument indicates whether the worker which executed the task should be considered lost.
class bndl.execute.scheduler.Scheduler(tasks, done, workers, concurrency=1, attempts=1)
Bases: object
This scheduler executes Tasks taking into account their dependencies and worker locality.
Worker assignment takes into account:
- concurrency (how many tasks a worker executes concurrently);
- worker locality (0 is indifferent, -1 is forbidden, 1+ is increasing locality). As locality 0 is likely to be common, it is assumed throughout the scheduler to reduce the memory cost of scheduling.
The most important factor in the computational complexity of the scheduler is the number of dependencies to track. Many-to-many dependencies should be kept in the thousands or tens of thousands (e.g. 100 * 100 tasks). Such issues can be resolved by introducing a ‘barrier task’, as is done in bndl.compute; this reduces the number of dependencies from n*m to n+m.
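The arithmetic behind the barrier task can be checked directly: with n producers and m consumers, an all-to-all layout needs n*m dependency edges, while routing through one barrier needs n edges into the barrier plus m edges out of it. The task ids and the dict layout below are hypothetical and do not reproduce bndl.compute's barrier task.

```python
from typing import Dict, List

Deps = Dict[str, List[str]]  # task id -> ids of the tasks it depends on


def all_to_all(n: int, m: int) -> Deps:
    """Every one of m consumers depends directly on every one of n producers."""
    producers = [f"p{i}" for i in range(n)]
    return {f"c{j}": list(producers) for j in range(m)}


def with_barrier(n: int, m: int) -> Deps:
    """Hypothetical barrier layout: producers -> one barrier task -> consumers."""
    deps: Deps = {"barrier": [f"p{i}" for i in range(n)]}
    for j in range(m):
        deps[f"c{j}"] = ["barrier"]
    return deps


def edge_count(deps: Deps) -> int:
    """Total number of dependency edges the scheduler would have to track."""
    return sum(len(d) for d in deps.values())
```

For 100 * 100 tasks, `edge_count(all_to_all(100, 100))` is 10,000 while `edge_count(with_barrier(100, 100))` is 200. The trade-off is one extra task that does no work but delays every consumer until all producers are done.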
Exceptions
exception bndl.execute.exceptions.TaskCancelled
Bases: Exception
Exception raised in a worker when a task is to be cancelled (preempted) by the driver.
This exception is raised through use of PyThreadState_SetAsyncExc.
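PyThreadState_SetAsyncExc is a CPython C-API function, reachable from Python through ctypes, that schedules an exception to be raised in another thread. The sketch below shows the general technique only; `TaskCancelledSketch`, `raise_in_thread` and `long_running_task` are hypothetical names, and bndl's own implementation is not shown in this section. The exception is delivered when the target thread next executes Python bytecode, so a thread blocked inside a long C call is not interrupted until that call returns.

```python
import ctypes
import threading
import time


class TaskCancelledSketch(Exception):
    """Hypothetical stand-in for bndl.execute.exceptions.TaskCancelled."""


def raise_in_thread(thread: threading.Thread, exc_type: type) -> None:
    """Asynchronously raise exc_type in `thread` (CPython only)."""
    modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread.ident), ctypes.py_object(exc_type)
    )
    if modified == 0:
        raise ValueError("thread not found")
    if modified > 1:
        # Revert: passing NULL clears the pending exception again.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread.ident), None)
        raise SystemError("PyThreadState_SetAsyncExc affected multiple threads")


def long_running_task(outcome: dict) -> None:
    try:
        while True:
            time.sleep(0.01)  # the pending exception is raised once sleep() returns
    except TaskCancelledSketch:
        outcome["cancelled"] = True
```

Catching the exception in the task lets it clean up before exiting, which is what makes this usable for preemption rather than a hard kill.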
exception bndl.execute.exceptions.DependenciesFailed(failures)
Bases: Exception
Indicate that a task failed due to dependencies not being ‘available’. This will cause the dependencies to be re-executed and the task which raises DependenciesFailed will be scheduled to execute once the dependencies complete.
The failures attribute is a mapping from worker names (strings) to a sequence of task_ids which have failed.
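The failures mapping tells the scheduler which dependencies to re-execute before the raising task runs again. The sketch below uses a hypothetical stand-in exception with the same failures shape and a hypothetical `reschedule` helper; it illustrates the described behaviour, not bndl's scheduler code.

```python
from typing import Dict, Hashable, Iterable, List, Set


class DependenciesFailedSketch(Exception):
    """Hypothetical stand-in carrying a failures mapping like DependenciesFailed."""

    def __init__(self, failures: Dict[str, Iterable[Hashable]]):
        super().__init__(failures)
        self.failures = failures  # worker name -> ids of failed tasks


def tasks_to_reexecute(exc: DependenciesFailedSketch) -> Set[Hashable]:
    """Collect the ids of every failed dependency, across all workers."""
    return {task_id for task_ids in exc.failures.values() for task_id in task_ids}


def reschedule(task_id: Hashable, exc: DependenciesFailedSketch) -> List[Hashable]:
    """Hypothetical plan: re-run the failed dependencies, then the raising task."""
    plan: List[Hashable] = list(tasks_to_reexecute(exc))
    plan.append(task_id)  # runs again only after its dependencies complete
    return plan
```

Collecting the ids into a set means a dependency reported by several workers is re-executed only once.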
Worker
Profiling
class bndl.execute.profile.CpuProfiling(ctx)
Bases: object
Perform CPU profiling across the cluster with yappi.
print_stats(max_rows=100, sort_by=None, sort_dir=None, columns=(('name', 84), ('ncall', 12), ('tsub', 8), ('ttot', 8), ('tavg', 8)), per_worker=False, strip_dirs=True, include=(), exclude=(), file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)
Print the top CPU consumers.
Parameters:
- max_rows (int) – The maximum number of rows to print. Set to None to print all.
- sort_by (str or None) – The field to sort by, e.g. ‘subtime’ or ‘tottime’.
- sort_dir ('asc', 'desc' or None) – Sort direction (ascending or descending).
- columns (sequence of (name: str, width: int) tuples) – The names and widths (in characters) of the columns to print.
- per_worker (bool) – Whether to print stats for each worker individually or to print the totals. Defaults to printing totals.
- strip_dirs (bool) – Whether to strip directories (only show packages / modules and line numbers). Defaults to True.
- include (str sequence) – Limit stats to these (root) modules. E.g. ‘bndl’ includes any module under bndl, such as bndl.util. Requires that strip_dirs is True.
- exclude (str sequence) – Filter out these (root) modules. E.g. ‘bndl’ excludes any module under bndl, such as bndl.util. Requires that strip_dirs is True.
- file (fileobj) – Where to print to. Defaults to sys.stdout.
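The include/exclude semantics (matching on the root module) and the per_worker=False totals can be sketched with plain data. This assumes stats are keyed by dotted module names and that totals are simple sums over workers; `filter_modules` and `total_across_workers` are hypothetical helpers, not part of bndl or yappi.

```python
from collections import Counter
from typing import Dict, Iterable, List, Mapping


def root_module(name: str) -> str:
    """'bndl.util.funcs' -> 'bndl' (assumes directories were already stripped)."""
    return name.split(".", 1)[0]


def filter_modules(
    names: Iterable[str], include: Iterable[str] = (), exclude: Iterable[str] = ()
) -> List[str]:
    """Keep names whose root module is included (if any) and not excluded."""
    include_set, exclude_set = set(include), set(exclude)
    kept = []
    for name in names:
        root = root_module(name)
        if include_set and root not in include_set:
            continue
        if root in exclude_set:
            continue
        kept.append(name)
    return kept


def total_across_workers(per_worker: Mapping[str, Mapping[str, float]]) -> Dict[str, float]:
    """Sum per-function times over all workers (the per_worker=False view)."""
    totals: Counter = Counter()
    for stats in per_worker.values():
        totals.update(stats)  # Counter.update adds values for existing keys
    return dict(totals)
```

Matching on the root module rather than on a string prefix means including ‘bndl’ does not accidentally match an unrelated package such as ‘bndlx’.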
class bndl.execute.profile.MemoryProfiling(ctx)
Bases: object
Perform memory profiling on the cluster with tracemalloc.
print_top(group_by='lineno', limit=30, compare_to=None, per_worker=False, strip_dirs=True, include=(), exclude=(), file=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)
Take a snapshot across the cluster and print the top memory allocations.
Parameters:
- group_by (str) – Defaults to ‘lineno’.
- limit (int) – The number of lines to print. Defaults to 30.
- compare_to (snapshot) – A snapshot to compare the memory usage to.
- per_worker (bool) – Whether to print the top per worker or not. Defaults to False.
- strip_dirs (bool) – Whether to strip directories. Defaults to True.
- include (sequence[str]) – The modules to include.
- exclude (sequence[str]) – The modules to exclude.
- file (fileobj) – The file to print to. Defaults to sys.stdout.
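On a single process, the snapshot-and-compare workflow behind print_top maps onto the standard tracemalloc API: take_snapshot(), Snapshot.statistics(key_type) and Snapshot.compare_to(old_snapshot, key_type). The helper `top_allocations` is hypothetical and does not perform the cluster-wide collection and merging that MemoryProfiling does.

```python
import tracemalloc
from typing import List, Optional, Tuple


def top_allocations(
    limit: int = 30,
    group_by: str = "lineno",
    compare_to: Optional[tracemalloc.Snapshot] = None,
) -> Tuple[tracemalloc.Snapshot, List]:
    """Single-process analogue of print_top: return a snapshot and its top stats.

    Requires tracemalloc.start() to have been called beforehand.
    """
    snapshot = tracemalloc.take_snapshot()
    if compare_to is not None:
        # StatisticDiff entries, sorted by the largest change in size.
        stats = snapshot.compare_to(compare_to, group_by)
    else:
        # Statistic entries, sorted by total allocated size.
        stats = snapshot.statistics(group_by)
    return snapshot, stats[:limit]
```

Returning the snapshot lets the caller pass it as compare_to on a later call to see what grew in between; printing is then simply `for stat in top: print(stat)`.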