Compute Context

class bndl.compute.context.ComputeContext(node, config=<Conf {}>)[source]

Bases: bndl.execute.context.ExecutionContext

ComputeContext is the main ‘handle’ into a cluster of BNDL workers from the ‘driver’ node.

It provides a means to create partitioned distributed data sets (which can be then transformed and combined), broadcast data, create accumulators to collect data into, etc.

dense

Create numpy based distributed, partitioned, dense arrays. See dense.arrays.

default_pcount

The default number of partitions for data sets. This either the bndl.compute.pcount configuration value or the number of workers (ctx.worker_count) times the bndl.execute.concurrency configuration value (which defaults to 1).

collection(collection, pcount=None, psize=None)[source]

Create a data set from a python iterable (e.g. a list, dict or iterator).

Note that iterators are consumed immediately since data sets need to be deterministic in their output.

By default pcount (or ctx.default_pcount) partitions are created. pcount must not be set when psize is set.

Parameters:
  • collection (iterable) – The iterable to partition and distribute.
  • pcount (int or None) – The number of partition to build.
  • psize (int or None) – The maximum number of records per partition.

Example:

>>> ctx.collection(list('abcd')).nlargest(2)
['d', 'c']
range(start, stop=None, step=1, pcount=None)[source]

Create a distributed partitioned data set of a range of numbers.

Parameters:
  • start (int) – The start or stop value (if no stop value is given).
  • stop (int) – The stop value or None if start is the stop value.
  • step (int) – The step between each value in the range.
  • pcount (int) – The number of partitions to partition the range into.

Examples:

>>> ctx.range(10).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> ctx.range(10).glom().collect()
[range(0, 2), range(2, 5), range(5, 7), range(7, 10)]
>>> ctx.range(5, 20).glom().map(len).collect()
[3, 4, 4, 4]
>> ctx.range(5, 10).stats()
<Stats count=5, mean=7.0, min=5.0, max=9.0, var=2.0, stdev=1.4142135623730951, skew=0.0, kurt=-1.3>
accumulator(initial)[source]

Create an Accumulator with an initial value.

Parameters:initial – The initial value of the accumulator.

Example:

>>> from bndl.compute.run import ctx
>>> accum = ctx.accumulator(0)
>>> def mapper(i):
...     global accum
...     accum += i
...     return i
...
>>> ctx.range(10).sum()
45
>>> accum.value
0
>>> ctx.range(10).map(mapper).sum()
45
>>> accum.value
45
>>> ctx.range(10).map(mapper).sum()
45
>>> accum.value
90
broadcast(ctx, value, serialization='auto', deserialization=None)

Broadcast data to workers.

Parameters:
  • value (object) – The value to broadcast.
  • serialization (str) – The format to serialize the broadcast value into. Must be one of auto, pickle, marshal, json, binary or text.
  • deserialization (None or function(bytes) –

Data can be ‘shipped’ along to workers in the closure of e.g. a mapper function, but in that case the data is sent once for every partition (task to be precise). For ‘larger’ values this may be wasteful. Use this for instance with lookup tables of e.g. a MB or more.

Note that the broadcast data is loaded on each worker (but only if the broadcast variable is used). The machine running the workers should thus have enough memory to spare.

If deserialization is set serialization must not be set and value must be of type bytes. Otherwise serialize is used to serialize value and its natural deserialization counterpart is used (e.g. bytes.decode followed by json.loads for the ‘json’ serialization format).

Example usage:

>>> tbl = ctx.broadcast(dict(zip(range(4), 'abcd')))
>>> ctx.range(4).map(lambda i: tbl.value[i]).collect()
['a', 'b', 'c', 'd']
files(ctx, root, recursive=True, dfilter=None, ffilter=None, psize_bytes=None, psize_files=None, split=False, location='driver')

Create a Dataset out of files.

Parameters:
  • ctx – The ComputeContext
  • root – str, list If str, root is considered to be a file or directory name or a glob pattern (see glob.glob). If list, root is considered a list of filenames.
  • recursive – bool Whether to recursively search a (root) directory for files, defaults to True.
  • dfilter – function(dir_name) A function to filter out directories by name, return a trueish or falsy value to indicate whether to use or the directory or not.
  • ffilter – function(file_name) A function to filter out files by name, return a trueish or falsy value to indicate whether to use or the file or not.
  • psize_bytes – int or None The maximum number of bytes in a partition.
  • psize_files – int or None The maximum number of files in a partition.
  • split – bool or bytes If False, files will not be split to achieve partitions of max. size psize_bytes. If True, files will be split to achieve partitions of size psize_bytes; files will be split to fill each partition. If bytes, files will be split just after an occurrence of the given string, e.g. a newline.
  • location – str Use ‘driver’ to locate the files on the driver machine or use ‘workers’ to locate them on the worker machines. In the later case one worker per ip address will be selected and it scans the local directory. This requires root to be a str.

Todo

The members of dense.sources still don’t show signature but they are methods. Their signatures are rendered correctly in ipython with e.g. ctx.dense.array?`and also `help(ctx.dense.array) does an okay job ...

class bndl.compute.dense.arrays(extended)[source]

Bases: bndl.util.objects.ExtensionGroup

Create numpy based distributed, partitioned, dense arrays.

random
array

Distribute a numpy array across pcount partitions or in psize partitions. Note that the array is partitioned along axis 0.

Parameters:
  • arr (np.ndarray) – The array to partition and distribute.
  • pcount (int or None) – The number of partitions.
  • psize (int or None) – The maximum size of the partitions.
range

A distributed and partitioned np.arange like data set.

Parameters:
  • start (int) – The start or stop value (if no stop value is given).
  • stop (int) – The stop value or None if start is the stop value.
  • step (int) – The step between each value in the range.
  • dtype – The type of the elements in the array.
  • pcount (int) – The number of partitions to partition the range into.
empty

A distributed and partitioned version of np.empty.

zeros

A distributed and partitioned version of np.zeros.

ones

A distributed and partitioned version of np.ones.