Broadcast and Accumulate

class bndl.compute.accumulate.Accumulator(ctx, host, initial, accumulator_id=None)

Bases: object

A value on which commutative and associative operations can be performed from remote workers.
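Why the operations need to be commutative and associative can be illustrated without bndl at all. The sketch below is plain Python, not the Accumulator implementation; the `apply_updates` helper and the list of updates are hypothetical. It shows that when updates arrive from workers in an arbitrary order, only an operation such as addition gives an order-independent result.

```python
import operator
import random
from functools import reduce


def apply_updates(initial, updates, op):
    """Fold a sequence of updates into an initial value with a binary operation."""
    return reduce(op, updates, initial)


# Hypothetical increments reported by tasks running on different workers.
updates = [3, 1, 4, 1, 5, 9, 2, 6]

in_order = apply_updates(0, updates, operator.add)

# Simulate updates arriving in a different order.
shuffled = updates[:]
random.Random(42).shuffle(shuffled)
out_of_order = apply_updates(0, shuffled, operator.add)

# Addition is commutative and associative, so arrival order is irrelevant.
assert in_order == out_of_order == sum(updates)

# Subtraction is neither, so regrouping the same operands changes the result.
left_first = (10 - 3) - 2
right_first = 10 - (3 - 2)
assert left_first != right_first
```

Operations such as addition, multiplication, set union, `min` and `max` satisfy both properties; subtraction, division and list append do not.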

bndl.compute.broadcast.broadcast(ctx, value, serialization='auto', deserialization=None)

Broadcast data to workers.

Parameters:
  • value (object) – The value to broadcast.
  • serialization (str) – The format to serialize the broadcast value into. Must be one of auto, pickle, marshal, json, binary or text.
  • deserialization (None or function(bytes)) – The function to deserialize the broadcast bytes with. If set, value must be of type bytes.

Data can be ‘shipped’ along to workers in the closure of, for example, a mapper function, but in that case the data is sent once for every partition (once per task, to be precise). For larger values this may be wasteful. Use broadcast instead for values such as lookup tables of a megabyte or more.

Note that the broadcast data is loaded on each worker (but only if the broadcast variable is used). The machine running the workers should thus have enough memory to spare.
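The trade-off can be estimated with a rough calculation. The sketch below is an illustration only: the lookup table, the number of tasks and the number of workers are hypothetical, pickle is assumed as the serialization format, and the model (once per task for closures, at most once per worker for broadcasts) ignores any protocol overhead.

```python
import pickle

# Hypothetical lookup table and cluster shape, chosen only for illustration.
lookup = {i: str(i) * 10 for i in range(100_000)}
n_tasks = 64    # assumed number of partitions (one task each)
n_workers = 4   # assumed number of workers

payload_size = len(pickle.dumps(lookup))

# Shipped in a closure: the value is serialized and sent once per task.
closure_bytes = payload_size * n_tasks

# Broadcast: the value is loaded at most once per worker that uses it.
broadcast_bytes = payload_size * n_workers

print(f"payload:   {payload_size / 1e6:.1f} MB")
print(f"closure:   {closure_bytes / 1e6:.1f} MB transferred")
print(f"broadcast: {broadcast_bytes / 1e6:.1f} MB transferred")
```

Under these assumptions the closure approach moves n_tasks / n_workers times as much data, while the memory needed on each worker is one copy of the payload in both cases.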

If deserialization is set, serialization must not be set and value must be of type bytes. Otherwise serialization is used to serialize value and its natural deserialization counterpart is used (e.g. bytes.decode followed by json.loads for the ‘json’ serialization format).
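The pairing described for the ‘json’ format can be sketched with the standard library alone. The helper names `serialize_json` and `deserialize_json` are hypothetical and are not bndl's internal functions; the sketch only shows what a serializer and its natural counterpart look like, and one caveat of the round trip.

```python
import json


def serialize_json(value):
    """Encode a value as UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize_json(data):
    """Natural counterpart: bytes.decode followed by json.loads."""
    return json.loads(data.decode("utf-8"))


table = {"a": 1, "b": 2}
payload = serialize_json(table)

assert isinstance(payload, bytes)
restored = deserialize_json(payload)
assert restored == table

# Caveat: JSON object keys are always strings, so integer keys do not
# survive the round trip unchanged.
int_keyed = deserialize_json(serialize_json({0: "a"}))
assert int_keyed == {"0": "a"}
```

A function shaped like `deserialize_json` (bytes in, value out) is the kind of callable the deserialization parameter describes, in which case the value passed to broadcast would be the already serialized bytes.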

Example usage:

>>> tbl = ctx.broadcast(dict(zip(range(4), 'abcd')))
>>> ctx.range(4).map(lambda i: tbl.value[i]).collect()
['a', 'b', 'c', 'd']