Datasets and partitions

class bndl.compute.dataset.Dataset(ctx, src=None, dset_id=None)[source]

Bases: object

map(func, *args, **kwargs)[source]

Transform elements in this dataset one by one.

Parameters:func – callable(element) applied to each element of the dataset

Any extra *args or **kwargs are passed to func (args before element).

starmap(func, *args, **kwargs)[source]

Variadic form of map.

Parameters:func – callable(*element) applied to each element of the dataset, with the element unpacked into positional arguments

Any extra *args or **kwargs are passed to func (args before element).
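The relation between map and starmap mirrors that between the built-in map and itertools.starmap. The following is a plain-Python sketch of the calling convention described above (extra args before the element), run over a local list; the helper names local_map and local_starmap are hypothetical and are not part of bndl.

```python
def local_map(func, elements, *args, **kwargs):
    # Each element is passed as one positional argument, after any extra args.
    return [func(*args, element, **kwargs) for element in elements]


def local_starmap(func, elements, *args, **kwargs):
    # Each element is unpacked into positional arguments, after any extra args.
    return [func(*args, *element, **kwargs) for element in elements]


pairs = [(1, 2), (3, 4)]
mapped = local_map(sum, pairs)            # sum((1, 2)), sum((3, 4))
starmapped = local_starmap(pow, pairs)    # pow(1, 2), pow(3, 4)
scaled = local_map(lambda factor, x: factor * x, [1, 2, 3], 10)
```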

pluck(ind, default=None)[source]

Pluck indices from each of the elements in this dataset.

Parameters:
  • ind – obj or list The indices to pluck with.
  • default – obj A default value.

For example:

>>> ctx.collection(['abc']*10).pluck(1).collect()
['b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b']
>>> ctx.collection(['abc']*10).pluck([1,2]).collect()
[('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c'), ('b', 'c')]
flatmap(func=None, *args, **kwargs)[source]

Transform the elements in this dataset into iterables and chain them within each of the partitions.

Parameters:func – callable(element) The transformation to apply. Defaults to None, in which case the elements of this dataset are themselves treated as the iterables to chain.

Any extra *args or **kwargs are passed to func (args before element).

For example:

>>> ''.join(ctx.collection(['abc']*10).flatmap().collect())
'abcabcabcabcabcabcabcabcabcabc'

or:

>>> import string
>>> ''.join(ctx.range(5).flatmap(lambda i: string.ascii_lowercase[i-1]*i).collect())
'abbcccdddd'
map_partitions(func, *args, **kwargs)[source]

Transform the partitions of this dataset.

Parameters:func – callable(iterator) The transformation to apply.

Any extra *args or **kwargs are passed to func (args before the iterator).

map_partitions_with_index(func, *args, **kwargs)[source]

Transform the partitions - with their index - of this dataset.

Parameters:func – callable(*args, index, iterator, **kwargs) The transformation to apply on the partition index and the iterator over the partition’s elements.

Any extra *args or **kwargs are passed to func (args before index and iterator).
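As an illustration of the argument order, a function suitable for map_partitions_with_index could look like the sketch below. It is driven here by a local loop over plain lists standing in for partitions; the function name tag_with_partition and the prefix argument are made up for the example.

```python
def tag_with_partition(prefix, index, iterator):
    # prefix is an extra positional argument; it precedes index and iterator.
    for element in iterator:
        yield (prefix, index, element)


partitions = [[0, 1], [2, 3, 4]]  # stand-in for the partitions of a dataset
tagged = []
for index, partition in enumerate(partitions):
    tagged.extend(tag_with_partition('part', index, iter(partition)))
```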

map_partitions_with_part(func, *args, **kwargs)[source]

Transform the partitions - with the source partition object as argument - of this dataset.

Parameters:func – callable(*args, partition, iterator, **kwargs) The transformation to apply on the partition object and the iterator over the partition’s elements.

Any extra *args or **kwargs are passed to func (args before partition and iterator).

pipe(command, reader=<method 'readlines' of '_io._IOBase' objects>, writer=<method 'writelines' of '_io._IOBase' objects>, **opts)[source]

Transform partitions by writing the partition data to the stdin of an external program and reading its output back from stdout.

Parameters:
  • command (str) – Command to execute (is ‘parsed’ by shlex.split).
  • reader (callable -> iterable) – Called with subprocess.Popen.stdout to read and iterate over the program’s output.
  • writer (callable) – Called with subprocess.Popen.stdin and the partition iterable to write the partition’s contents to the program.
  • **opts – Options to provide to subprocess.Popen.
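What happens to a single partition can be sketched with the standard library alone. The helper below is an assumption about the mechanics based on the parameter descriptions above, not bndl's actual implementation, and the name pipe_partition is hypothetical. It writes all input before reading any output, which is fine for small partitions but could block on large ones.

```python
import io
import shlex
import subprocess
import sys


def pipe_partition(partition, command, reader=io.IOBase.readlines,
                   writer=io.IOBase.writelines, **opts):
    # The command string is split into an argument list with shlex.split.
    proc = subprocess.Popen(shlex.split(command), stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, **opts)
    writer(proc.stdin, partition)  # write the partition to the program
    proc.stdin.close()             # signal end of input
    output = list(reader(proc.stdout))
    proc.stdout.close()
    proc.wait()
    return output


# A small stand-in for an external program: upper-case stdin,
# executed with the Python interpreter that runs this sketch.
program = "import sys; sys.stdout.write(sys.stdin.read().upper())"
command = shlex.quote(sys.executable) + ' -c ' + shlex.quote(program)
piped = pipe_partition([b'abc\n', b'def\n'], command)
```

With the default reader and writer the partition must consist of bytes lines, since the pipes are opened in binary mode.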
glom()[source]

Transforms each partition into a partition with one element being the contents of the partition as a ‘stable iterable’ (e.g. a list).

See the bndl.util.collection.is_stable_iterable function for details on what constitutes a stable iterable.

Example:

>>> ctx.range(10, pcount=4).map_partitions(list).glom().collect()
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
concat(sep)[source]

Concatenate the elements in this dataset with sep.

Parameters:sep (str, bytes or bytearray) – the value to use as separator when concatenating.
filter(func=None, *args, **kwargs)[source]

Filter out elements from this dataset

Parameters:func (callable(element)) – The test function to filter this dataset with. An element is retained in the dataset if the test is positive.

Any extra *args or **kwargs are passed to func (args before element).

starfilter(func, *args, **kwargs)[source]

Variadic form of Dataset.filter.

Parameters:func – callable(*element) The test function to filter this dataset with. An element is retained in the dataset if the test is positive. The element is provided as star args.

Any extra *args or **kwargs are passed to func (args before element).

mask_partitions(mask)[source]

TODO

key_by(key)[source]

Prepend the elements in this dataset with a key.

The resulting dataset will consist of K,V tuples.

Parameters:key – callable(element) The transformation of the element which, when applied, provides the key value.

Example:

>>> import string
>>> ctx.range(5).key_by(lambda i: string.ascii_lowercase[i]).collect()
[('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4)]
with_value(val)[source]

Create a dataset of K,V tuples with the elements of this dataset as K and V from the given value.

Parameters:val – callable(element) or obj If val is a callable, it will be applied to the elements of this dataset and the return values will be the values. If val is a plain object, it will be used as a constant value for each element.

Example

>>> ctx.collection('abcdef').with_value(1).collect()
[('a', 1), ('b', 1), ('c', 1), ('d', 1), ('e', 1), ('f', 1)]
key_by_id()[source]

Key the elements of this data set with a unique integer id.

Example

>>> ctx.collection(['a', 'b', 'c', 'd', 'e'], pcount=2).key_by_id().collect()
[(0, 'a'), (2, 'b'), (4, 'c'), (1, 'd'), (3, 'e')]
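The ids in the example are consistent with numbering the elements of partition p as p, p + pcount, p + 2 * pcount and so on, which needs no coordination between partitions. The sketch below reproduces that scheme in plain Python; it is inferred from the example output rather than taken from bndl's source, and the split of the five elements over the two partitions is likewise an assumption.

```python
def key_by_id(partitions):
    pcount = len(partitions)
    keyed = []
    for pidx, partition in enumerate(partitions):
        for i, element in enumerate(partition):
            # Ids are unique because each partition uses its own residue mod pcount.
            keyed.append((pidx + i * pcount, element))
    return keyed


keyed = key_by_id([['a', 'b', 'c'], ['d', 'e']])
```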
key_by_idx()[source]

Key the elements of this data set with their index.

This operation starts a job when the data set contains more than 1 partition to calculate offsets for each of the partitions. Use key_by_id or cache the data set to speed up processing.

Example

>>> ctx.collection(['a', 'b', 'c', 'd', 'e']).key_by_idx().collect()
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]
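The extra job mentioned above exists because the index of an element depends on the sizes of all preceding partitions. A minimal plain-Python sketch of that two-pass idea, with lists standing in for partitions (not bndl's implementation):

```python
from itertools import accumulate


def key_by_idx(partitions):
    # First pass: count the elements in each partition.
    sizes = [len(partition) for partition in partitions]
    # The offset of a partition is the number of elements in all preceding partitions.
    offsets = [0] + list(accumulate(sizes))[:-1]
    # Second pass: number the elements within each partition, starting at its offset.
    return [(offset + i, element)
            for offset, partition in zip(offsets, partitions)
            for i, element in enumerate(partition)]


indexed = key_by_idx([['a', 'b', 'c'], ['d', 'e']])
```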
keys()[source]

Pluck the keys from this dataset.

Example

>>> ctx.collection([('a', 1), ('b', 2), ('c', 3)]).keys().collect()
['a', 'b', 'c']
values()[source]

Pluck the values from this dataset.

Example

>>> ctx.collection([('a', 1), ('b', 2), ('c', 3)]).values().collect()
[1, 2, 3]
map_keys(func, *args, **kwargs)[source]

Transform the keys of this dataset.

Parameters:func – callable(key) Transformation to apply to the keys
map_values(func, *args, **kwargs)[source]

Transform the values of this dataset.

Parameters:func – callable(value) Transformation to apply to the values
pluck_values(ind, default=None)[source]

Pluck indices from each of the values in this dataset of (key, value) pairs.

Parameters:
  • ind – obj or list The indices to pluck with.
  • default – obj A default value.
flatmap_values(func=None, *args, **kwargs)[source]
Parameters:func – callable(value) or None The callable which flattens the values of this dataset or None in order to use the values as iterables to flatten.
filter_bykey(func=None, *args, **kwargs)[source]

Filter the dataset by testing the keys.

Parameters:func – callable(key) The test to apply to the keys. When positive, the key, value tuple will be retained.
filter_byvalue(func=None, *args, **kwargs)[source]

Filter the dataset by testing the values.

Parameters:func – callable(value) The test to apply to the values. When positive, the key, value tuple will be retained.
nlargest(num, key=None)[source]

Take the num largest elements from this dataset.

Parameters:
  • num – int The number of elements to take.
  • key – callable(element) or None The (optional) key to apply when ordering elements.
nsmallest(num, key=None)[source]

Take the num smallest elements from this dataset.

Parameters:
  • num – int The number of elements to take.
  • key – callable(element) or None The (optional) key to apply when ordering elements.
nsmallest_by_key(num, key=None, **shuffle_opts)[source]

Shuffle and keep only the num smallest elements for each key.

nlargest_by_key(num, key=None, **shuffle_opts)[source]

Shuffle and keep only the num largest elements for each key.

histogram(bins=10)[source]

Compute the histogram of a data set.

Parameters:bins – int or sequence The bins to use in computing the histogram; either an int to indicate the number of bins between the minimum and maximum of this data set, or a sorted sequence of unique numbers to be used as edges of the bins.
Returns:A (np.array, np.array) tuple where the first array is the histogram and the second array the (edges of the) bins.

The function behaves similarly to numpy.histogram, but only supports counts per bin (no weights or density/normalization). The resulting histogram and bins should match numpy.histogram very closely.

Example

>>> ctx.collection([1, 2, 1]).histogram([0, 1, 2, 3])
(array([0, 2, 1]), array([0, 1, 2, 3]))
>>> ctx.range(4).histogram(np.arange(5))
(array([1, 1, 1, 1]), array([0, 1, 2, 3, 4]))
>>> ctx.range(4).histogram(5)
(array([1, 1, 0, 1, 1]), array([ 0. ,  0.6,  1.2,  1.8,  2.4,  3. ]))
>>> ctx.range(4).histogram()
(array([1, 0, 0, 1, 0, 0, 1, 0, 0, 1]),
 array([ 0. ,  0.3,  0.6,  0.9,  1.2,  1.5,  1.8,  2.1,  2.4,  2.7,  3. ]))
>>> dset = ctx.collection([1,2,1,3,2,4])
>>> hist, bins = dset.histogram()
>>> hist
array([2, 0, 0, 2, 0, 0, 1, 0, 0, 1])
>>> hist.sum() == dset.count()
True
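Counts per bin are additive, so for a fixed sequence of edges a histogram can be computed per partition and the results summed. The sketch below illustrates this in plain Python with lists instead of numpy arrays; it is not bndl's implementation, and the helper names are hypothetical. As in numpy.histogram, every bin is half-open except the last, which includes its right edge.

```python
from bisect import bisect_right


def local_histogram(partition, edges):
    counts = [0] * (len(edges) - 1)
    for x in partition:
        if x < edges[0] or x > edges[-1]:
            continue  # outside the range covered by the bins
        # Clamp so that a value equal to the last edge falls in the last bin.
        idx = min(bisect_right(edges, x) - 1, len(counts) - 1)
        counts[idx] += 1
    return counts


def merge_histograms(histograms):
    # Sum the counts bin by bin.
    return [sum(column) for column in zip(*histograms)]


edges = [0, 1, 2, 3]
partitions = [[1, 2], [1]]  # stand-in for a dataset with two partitions
hist = merge_histograms([local_histogram(p, edges) for p in partitions])
```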
aggregate(local, comb=None)[source]

Collect an aggregate of this dataset, where the aggregate is determined by a local aggregation and a global combination.

Parameters:
  • local – callable(partition) Function to apply on the partition iterable
  • comb – callable Function to combine the results from local. If None, the local callable will be applied.
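The two-step structure can be sketched locally as follows. Lists stand in for partitions, and it is assumed here that comb receives an iterable of the per-partition results; the free function aggregate is a hypothetical stand-in for the method.

```python
def aggregate(partitions, local, comb=None):
    if comb is None:
        comb = local  # fall back to the local aggregation
    # local runs once per partition; comb runs once over all local results.
    return comb([local(iter(partition)) for partition in partitions])


partitions = [[0, 1, 2], [3, 4], [5, 6, 7, 8, 9]]
total = aggregate(partitions, sum)
count = aggregate(partitions, lambda part: sum(1 for _ in part), sum)
```

Summing works with comb left at None because a sum of sums is again a sum; counting needs a separate comb because the local results must be added, not counted.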
combine(zero, merge_value, merge_combs)[source]

Aggregate the dataset by merging element-wise starting with a zero value and finally merge the intermediate results.

Parameters:
  • zero – obj The object to merge values into.
  • merge_value – The operation to merge an object into intermediate value (which initially is the zero value).
  • merge_combs – The operation to pairwise combine the intermediate values into one final value.

Example

>>> strings = ctx.range(1000*1000).map(lambda i: i%1000).map(str)
>>> sorted(strings.combine(set(), lambda s, e: s.add(e) or s, lambda a, b: a|b))
['0',
 '1',
 ...
 '998',
 '999']
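The roles of the three arguments can be made explicit with functools.reduce. This is a local sketch over lists standing in for partitions, not bndl's implementation; in particular, starting each partition from its own copy of zero is an assumption made here so that a mutable zero such as set() is not shared.

```python
import copy
from functools import reduce


def combine(partitions, zero, merge_value, merge_combs):
    # Per partition: fold the elements into a copy of the zero value.
    intermediates = [reduce(merge_value, partition, copy.copy(zero))
                     for partition in partitions]
    # Across partitions: merge the intermediate values pairwise.
    return reduce(merge_combs, intermediates)


partitions = [['1', '2', '1'], ['2', '3']]
distinct = combine(partitions, set(),
                   lambda s, e: s.add(e) or s,
                   lambda a, b: a | b)
```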
reduce(reduction)[source]

Reduce the dataset into a final element by applying a pairwise reduction as with functools.reduce(...)

Parameters:reduction – The reduction to apply.

Example

>>> ctx.range(100).reduce(lambda a,b: a+b)
4950
tree_aggregate(local, comb=None, depth=2, scale=None, **shuffle_opts)[source]

Tree-wise aggregation by first applying local on each partition and subsequently shuffling the data across workers in depth rounds and for each round aggregating the data by applying comb.

Parameters:
  • local – func(iterable) The aggregation function to apply to each partition.
  • comb – The function to apply in order to combine aggregated partitions.
  • depth – The number of iterations to apply the aggregation in.
  • scale – int or None (default) The factor by which to reduce the partition count in each round. If None, the step is chosen such that each reduction of intermediary results is roughly of the same size (the branching factor in the tree is the same across the entire tree).
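The round structure can be sketched without a cluster. The code below is an interpretation of the description above, not bndl's algorithm: it applies local per partition, then repeatedly combines groups of scale intermediate results for at most depth rounds. The choice of scale when it is None (roughly the depth-th root of the number of partitions) is an assumption meant to keep the branching factor constant.

```python
import math


def tree_aggregate(partitions, local, comb=None, depth=2, scale=None):
    comb = comb or local
    results = [local(iter(partition)) for partition in partitions]
    if scale is None:
        # Assumed heuristic: the same branching factor in every round.
        scale = max(2, math.ceil(len(results) ** (1 / depth)))
    for _ in range(depth):
        if len(results) <= 1:
            break
        # One round: combine each group of `scale` intermediate results.
        results = [comb(results[i:i + scale])
                   for i in range(0, len(results), scale)]
    # Combine whatever is left into the final value.
    return comb(results)


partitions = [list(range(i * 10, (i + 1) * 10)) for i in range(10)]
total = tree_aggregate(partitions, sum)
total_deep = tree_aggregate(partitions, sum, depth=3, scale=2)
count = tree_aggregate(partitions, lambda part: sum(1 for _ in part), sum)
```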
tree_combine(zero, merge_value, merge_combs, **kwargs)[source]

Tree-wise version of Dataset.combine. See Dataset.tree_aggregate for details.

tree_reduce(reduction, **kwargs)[source]

Tree-wise version of Dataset.reduce. See Dataset.tree_aggregate for details.

count()[source]

Count the elements in this dataset.

sum()[source]

Sum the elements in this dataset.

Example

>>> ctx.collection(['abc', 'def', 'ghi']).map(len).sum()
9
max(key=None)[source]

Take the largest element of this dataset.

Parameters:
  • key (callable(element) or object) – The (optional) key to apply when comparing elements. If key is an object, it is used to pluck the comparison key from each element.

Example:

>>> ctx.range(10).max()
9
>>> ctx.range(10).with_value(1).max(0)
(9, 1)
>>> ctx.range(10).map(lambda i: dict(key=i, val=-i)).max('val')
{'val': 0, 'key': 0}
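The treatment of a non-callable key can be approximated locally with operator.itemgetter. The helper as_key_func below is hypothetical and only illustrates the idea of turning an index or dict key into a key function; it is applied to the built-in max and min rather than to a dataset.

```python
from operator import itemgetter


def as_key_func(key):
    # Callables (and None) are used as-is; anything else is used to pluck.
    if key is None or callable(key):
        return key
    return itemgetter(key)


records = [dict(key=i, val=-i) for i in range(10)]
largest = max(records, key=as_key_func('val'))
pair = max([(i, 1) for i in range(10)], key=as_key_func(0))
smallest = min(records, key=as_key_func(lambda r: r['val']))
```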
min(key=None)[source]

Take the smallest element of this dataset.

Parameters:
  • key (callable(element) or object) – The (optional) key to apply when comparing elements. If key is an object, it is used to pluck the comparison key from each element.
mean()[source]

Calculate the mean of this dataset.

stats()[source]

Calculate count, mean, min, max, variance, stdev, skew and kurtosis of this dataset.

mvstats(width=None)[source]

Calculate count and the multivariate mean, min, max, variance, stdev, skew and kurtosis of this dataset.

Parameters:width (int) – The width of the vectors in the dataset to calculate the statistics on. If no width is given, the width is determined through peeking at the first record.
union(other, *others)[source]

Union this dataset with one or more other datasets.

Parameters:other – Dataset

Example:

>>> ctx.range(0, 5).union(ctx.range(5, 10)).collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
group_by(key, partitioner=None, pcount=None, **shuffle_opts)[source]

Group the dataset by a given key function.

Parameters:
  • key – callable(element) or obj The callable producing the key to group on or an index / indices for plucking the key from the elements.
  • partitioner – callable(element) A callable producing an integer which is used to determine to which partition the group is assigned.
  • pcount – The number of partitions to group into.

Example

>>> ctx.range(10).group_by(lambda i: i%2).collect()
[(0, [0, 6, 8, 4, 2]), (1, [1, 3, 7, 9, 5])]
group_by_key(partitioner=None, pcount=None, **shuffle_opts)[source]

Group a K, V dataset by K.

Parameters:
  • partitioner – callable The (optional) partitioner to apply.
  • pcount – The number of partitions to group into.
aggregate_by_key(local, comb=None, partitioner=None, pcount=None, **shuffle_opts)[source]

Aggregate the values in a K, V1 dataset into a dataset of K, V2.

Parameters:
  • local – callable(iterable[V1]) A callable which returns the aggregation V2 for the values of a key.
  • comb – callable(iterable[V2]): V2 (optional) A callable which performs a per key aggregation of the V2’s generated by local. Defaults to local.
  • partitioner – The (optional) partitioner to apply.
  • pcount – The number of partitions to combine into.
combine_by_key(create, merge_value, merge_combs, partitioner=None, pcount=None, **shuffle_opts)[source]

Combine the values in a K, V1 dataset into a dataset of K, V2.

Parameters:
  • create – callable(V1) A callable which returns the initial V2 for the value’s key.
  • merge_value – callable(V2, V1): V2 A callable which merges a V1 into a V2.
  • merge_combs – callable(V2, V2) A callable which merges two V2’s.
  • partitioner – The (optional) partitioner to apply.
  • pcount – The number of partitions to combine into.
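How create, merge_value and merge_combs interact can be shown on plain lists of (K, V1) pairs. This is a local sketch of the general pattern (combine within each partition first, then merge the per-key results across partitions), not bndl's shuffle-based implementation; the helper names are hypothetical.

```python
def combine_partition(pairs, create, merge_value):
    combs = {}
    for key, value in pairs:
        if key in combs:
            combs[key] = merge_value(combs[key], value)  # V2, V1 -> V2
        else:
            combs[key] = create(value)                   # first V1 -> V2
    return combs


def combine_by_key(partitions, create, merge_value, merge_combs):
    merged = {}
    for partition in partitions:
        local = combine_partition(partition, create, merge_value)
        for key, comb in local.items():
            # V2, V2 -> V2 for keys that occur in more than one partition.
            merged[key] = merge_combs(merged[key], comb) if key in merged else comb
    return merged


partitions = [[('a', 1), ('b', 2), ('a', 3)], [('a', 4), ('b', 5)]]
lists = combine_by_key(partitions,
                       lambda v: [v],
                       lambda acc, v: acc + [v],
                       lambda a, b: a + b)
```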
reduce_by_key(reduction, partitioner=None, pcount=None, **shuffle_opts)[source]

Reduce the values of a K, V dataset.

Parameters:
  • reduction – callable(v, v) The reduction to apply.
  • partitioner – The (optional) partitioner to apply.
  • pcount – The number of partitions to reduce into.

Example

>>> ctx.range(12).map(lambda i: (i%3, 1)).reduce_by_key(lambda a, b: a+b).collect()
[(0, 4), (1, 4), (2, 4)]
join(other, key=None, partitioner=None, pcount=None, **shuffle_opts)[source]

Join two datasets.

Parameters:
  • other – The dataset to join with.
  • key – callable(element) or object The callable which returns the join key or an object used as index to get the join key from the elements in the datasets to join.
  • partitioner – The (optional) partitioner to apply.
  • pcount – The number of partitions to join into.

Example:

>>> ctx.range(0, 5).key_by(lambda i: i%2).join(ctx.range(5, 10).key_by(lambda i: i%2)).collect()
[(0, [(0, 8), (0, 6), (2, 8), (2, 6), (4, 8), (4, 6)]),
 (1, [(1, 5), (1, 9), (1, 7), (3, 5), (3, 9), (3, 7)])]
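The shape of the result, one (key, list of value pairs) entry per key, can be reproduced locally with itertools.product. The sketch assumes K, V input and keeps only keys present in both inputs; the latter is an assumption, since the example above does not show what happens to unmatched keys. The order of the pairs may differ from the distributed result.

```python
from collections import defaultdict
from itertools import product


def join(left, right):
    left_groups, right_groups = defaultdict(list), defaultdict(list)
    for key, value in left:
        left_groups[key].append(value)
    for key, value in right:
        right_groups[key].append(value)
    # Per key: all combinations of a left value with a right value.
    return [(key, list(product(left_groups[key], right_groups[key])))
            for key in left_groups if key in right_groups]


left = [(i % 2, i) for i in range(0, 5)]
right = [(i % 2, i) for i in range(5, 10)]
joined = dict(join(left, right))
```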
distinct(pcount=None, key=None, **shuffle_opts)[source]

Select the distinct elements from this dataset.

Parameters:pcount – The number of partitions to shuffle into.

Example

>>> sorted(ctx.range(10).map(lambda i: i%2).distinct().collect())
[0, 1]
count_distinct(pcount=None, **shuffle_opts)[source]

Count the distinct elements in this Dataset.

count_distinct_approx(error_rate=0.05)[source]

Approximate the count of distinct elements in this Dataset through the hyperloglog++ algorithm based on https://github.com/svpcom/hyperloglog.

Parameters:error_rate – float The absolute error / cardinality
count_by_value(depth=1, **shuffle_opts)[source]

Count the occurrence of each distinct value in the data set.

sort(key=None, reverse=False, pcount=None, hd_distribution=False, **shuffle_opts)[source]

Sort the elements in this dataset.

Parameters:
  • key – callable or obj A callable which returns the sort key or an object which is the index in the elements for getting the sort key.
  • reverse – bool If True perform a sort in descending order, or False to sort in ascending order.
  • pcount – Optionally the number of partitions to sort into.

Example

>>> ''.join(ctx.collection('asdfzxcvqwer').sort().collect())
'acdefqrsvwxz'
>>> ctx.range(5).map(lambda i: dict(a=i-2, b=i)).sort(key='a').collect()
[{'b': 0, 'a': -2}, {'b': 1, 'a': -1}, {'b': 2, 'a': 0}, {'b': 3, 'a': 1}, {'b': 4, 'a': 2}]
>>> ctx.range(5).key_by(lambda i: i-2).sort(key=1).sort().collect()
[(-2, 0), (-1, 1), (0, 2), (1, 3), (2, 4)]
shuffle(pcount=None, partitioner=None, bucket=None, key=None, comb=None, sort=None, **opts)[source]

Todo

Document shuffle

zip(other)[source]

Zip the elements of another data set with the elements of this data set.

Parameters:other – bndl.compute.dataset.Dataset The other data set to zip with.

Example

>>> ctx.range(0,10).zip(ctx.range(10,20)).collect()
[(0, 10), (1, 11), (2, 12), (3, 13), (4, 14), (5, 15), (6, 16), (7, 17), (8, 18), (9, 19)]
zip_partitions(other, comb)[source]

Zip the partitions of another data set with the partitions of this data set.

Parameters:
  • other – bndl.compute.dataset.Dataset The other data set to zip the partitions of with the partitions of this data set.
  • comb – func(iterable, iterable) The function which combines the data of the partitions from this and the other data sets.

Example

>>> ctx.range(0,10).zip_partitions(ctx.range(10,20), lambda a, b: zip(a,b)).collect()
[(0, 10), (1, 11), (2, 12), (3, 13), (4, 14), (5, 15), (6, 16), (7, 17), (8, 18), (9, 19)]
sample(fraction, with_replacement=False, seed=None)[source]

Todo

Document sample

take_sample(num, with_replacement=False, seed=None, count=None)[source]

based on https://github.com/apache/spark/blob/master/python/pyspark/rdd.py#L425

collect(parts=False, ordered=True)[source]

Collect the dataset as list.

Parameters:
  • parts (bool) – Collect the individual partitions (True) into a list with an element per partition or a ‘flattened’ list of the elements in the dataset (False).
  • ordered (bool) – Collect partitions (and thus their elements) in order or not.
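The effect of the parts flag can be illustrated on plain lists standing in for partitions. The function below is a local sketch of the two result shapes, not the method itself.

```python
from itertools import chain


def collect(partitions, parts=False):
    if parts:
        # One list per partition.
        return [list(partition) for partition in partitions]
    # A single flattened list of all elements.
    return list(chain.from_iterable(partitions))


partitions = [[0, 1], [2, 3, 4]]
per_partition = collect(partitions, parts=True)
flattened = collect(partitions)
```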
collect_as_map(parts=False)[source]

Collect a dataset of key-value pairs as dict.

collect_as_set()[source]

Collect the elements of the dataset into a set.

collect_as_pickles(directory=None, compress=None)[source]

Collect each partition as a pickle file into directory

collect_as_json(directory=None, compress=None)[source]

Collect each partition as a line separated json file into directory.

collect_as_files(directory=None, ext='', mode='b', compress=None)[source]

Collect each element in this data set into a file into directory.

Parameters:
  • directory – str The directory to save this data set to.
  • ext – The extension of the files.
  • compress – None or ‘gzip’ Whether to compress.
icollect(parts=False, ordered=True)[source]

An iterable version of collect() which can be cheaper memory-wise and faster in terms of latency (especially if ordered=False).

first()[source]

Take the first element from this dataset.

take(num)[source]

Take the first num elements from this dataset.

itake(num)[source]

Take the first num elements from this dataset as iterator.

require_workers(workers_required)[source]

Create a clone of this dataset which is only computable on the workers which are returned by the workers_required argument.

Parameters:workers_required – function(iterable[PeerNode]): -> iterable[PeerNode] A function which is given an iterable of workers (PeerNodes) which is to return only those which are allowed to compute this dataset.
require_local_workers()[source]

Require that the dataset is computed on the same node as the driver.

allow_all_workers()[source]

Create a clone of this dataset which resets the worker filter set by require_workers if any.

cache(location='memory', serialization=None, compression=None, provider=None)[source]

Cache the dataset in the workers. Each partition is cached in the memory / on the disk of the worker which computed it.

Parameters:
  • location (str) – ‘memory’ or ‘disk’.
  • serialization (str) – The serialization format must be one of ‘json’, ‘marshal’, ‘pickle’, ‘msgpack’, ‘text’, ‘binary’ or None to cache the data unserialized.
  • compression (str) – ‘gzip’ or None
  • provider (CacheProvider) – Ignore location, serialization and compression and use this custom CacheProvider.
class bndl.compute.dataset.Partition(dset, idx, src=None)[source]

Bases: object

locality(workers)[source]

Determine locality of computing this partition at the given workers.

Returns:a generator of worker, locality pairs.
class bndl.compute.dataset.TransformingDataset(src, *funcs)[source]

Bases: bndl.compute.dataset.Dataset

class bndl.compute.dataset.UnionDataset(src)[source]

Bases: bndl.compute.dataset._MultiSourceDataset

class bndl.compute.dataset.CartesianProductDataset(src)[source]

Bases: bndl.compute.dataset._MultiSourceDataset