Execution

Computing Datasets (when an action is invoked on it) is performed by bndl.execute. A directed acyclic graph of data sets (one or more source data sets and the lineage of transformations and combinations) and their partitions is transformed into a directed acyclic graph of tasks.

These tasks are instances of ComputePartitionTask. Each partition in a graph of data sets is computed in one task. Partitions in data sets which are narrow transformations (e.g. map and filter) of their source data set are coalesced into a single task.

Datasets which are shuffled (e.g. sort, reduce_by_key, join) have all-to-all communication and thus can’t be coalesced into other tasks across the shuffle. For example

>>> ctx.range(10, pcount=4).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

will be computed in four tasks, as would:

>>> ctx.range(10, pcount=4).map(str).collect()
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

When introducing a shuffle, the tasks are split into two groups (stages). I.e.

>>> kv = ctx.range(10, pcount=4).map(lambda i: (i//3, 1))
>>> kv.aggregate_by_key(sum, pcount=2).map(str).collect()
['(0, 3)', '(2, 3)', '(1, 3)', '(3, 1)']

will result into six tasks: four ‘mapper’ tasks which would transform the four subranges of 0 through 9 into tuples of (i//3, 1) and ‘shuffle’ them into two buckets (and sort them by key), and two reducer tasks which would read from these buckets and take the sum over the values per key.

bndl.compute.dataset._compute_part(part, dependency_locations)[source]

Compute a partition; this method calls compute for a partition (which calls comppute on its source partition(s), reads data from some external source, from cache, from other workers, etc.). This method is the method to execute by bndl.compute.dataset.ComputePartitionTask (which delegates the RMI part to bndl.execute.jobs.RmiTask.

Parameters:
  • part – The partition to compute.
  • dependency_locations (mapping[str,object]) – A mapping of worker name to a sequence of partition ids executed by this worker.
class bndl.compute.dataset.ComputePartitionTask(part, **kwargs)[source]

Bases: bndl.execute.job.RmiTask

A RMI task to compute a partition (and it’s ‘narrow’ sources). It adds some dependency location tracking and communications as well as memorizing where a partition was computed and cached.

class bndl.compute.dataset.BarrierTask(*args, **kwargs)[source]

Bases: bndl.execute.job.Task

An ‘artificial’ task which disconnects ComputePartitionTasks in order to reduce computational complexity in bndl.execute.scheduler. As this scheduler tracks individual dependencies the scheduling overhead goes through the roof when even a moderate amount of tasks which depend on another set of tasks (which is the case for a shuffle). E.g. consider

ctx.range(1000, pcount=1000).map(lambda i: (i//10, i)).aggregate_by_key(sum).count()

This would result in 1000 mapper and 1000 reducer tasks and thus in 1.000.000 dependencies to be tracked. After introducing the BarrierTask, there are 1000 + 1000 + 1 tasks and 1000 + 1000 dependencies to track.