Datasets
The key data structure in BNDL Compute is the Dataset. It is a distributed, partitioned collection of values. The partitions yield an iterable of values, which may be arbitrary Python objects (although in some cases they need to be picklable / serializable). Datasets can be specialized with optimized partitions, e.g. numpy.ndarray instances instead of lists. Datasets can be transformed, reduced (also key-wise), combined, etc. See Transformations for more.
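For example (a sketch: map() and collect() are shown further on in this section, while filter() is assumed by analogy):
>>> ctx.range(10).map(lambda i: i * i).filter(lambda i: i % 2 == 0).collect()
[0, 4, 16, 36, 64]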
BNDL root data
A pipeline (or graph) of transformations on datasets needs to start with one or more ‘root’ datasets. That is, a dataset which isn’t the product of another dataset, but e.g. created from an external system.
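For example, ctx.range() below creates a root dataset and map() derives a new dataset from it (a sketch; both operators appear further on in this section):
>>> root = ctx.range(5)      # root dataset: not derived from another dataset
>>> derived = root.map(str)  # derived dataset: the product of root
>>> derived.collect()
['0', '1', '2', '3', '4']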
Collections
Python iterables can be partitioned and distributed with ctx.collection(). ctx.range() provides a way to get a distributed and partitioned range of integers. For example:
>>> ctx.collection('abcd').max()
'd'
>>> ctx.collection({'x':1, 'y':2, 'z': 3}).collect()
[('y', 2), ('x', 1), ('z', 3)]
>>> ctx.range(10).mean()
4.5
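The partitioning of a root dataset can typically be influenced on creation; note that the pcount argument below is an assumption about the exact parameter name:
>>> ctx.range(100, pcount=4).mean()
49.5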
Files
Files on the filesystem can be read as a BNDL Dataset with ctx.files(). This method can be given the path of a single file, a list of file paths or the path of a directory (which is scanned recursively by default). For example:
>>> files = ctx.files('.')
>>> files.filecount
4461
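A sketch of transforming such a dataset, under the assumption that its elements are (filename, contents) pairs; the element layout and the sum() operator are assumptions, while values() and map() appear elsewhere in this section:
>>> total_bytes = files.values().map(len).sum()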
Key-value pair datasets
Python has little to no support for typing variables or the return types of functions, nor for something like generics. PEP 484 may change this, but Python is still far from a statically typed language.
Quite a few operators expect key-value pairs, e.g. reduce_by_key() reduces a dataset key-wise. I.e. the reduction function is applied to all values with the same key in the dataset. E.g. in:
>>> ctx.range(10).map(lambda i: (i//3, i)).aggregate_by_key(sum).collect()
[(0, 3), (1, 12), (2, 21), (3, 9)]
the mapper function (lambda i: (i//3, i)) creates key-value tuples out of a range of integers (0 through 9), but it isn’t annotated and it can’t be inspected that it does so. So BNDL has no good means to discern between datasets that contain key-value pairs and those that don’t. Because of this, operators like reduce_by_key are available on any dataset, regardless of applicability.
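As a sketch of reduce_by_key(), assuming it takes a binary reduction function which is applied pairwise to the values of each key (analogous to aggregate_by_key above):
>>> import operator
>>> ctx.range(10).map(lambda i: (i % 2, i)).reduce_by_key(operator.add).collect()
[(0, 20), (1, 25)]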
When the use of such an operator fails with an exception like:
>>> ctx.range(10).aggregate_by_key(sum).values().max()
2016-12-02 23:42:32,917 - bndl.execute.scheduler - WARNING - <ComputePartitionTask n9h4eu0f.0 failed> failed on 'localdomain.localhost.worker.27110.0.3' after 1 attempts ... aborting
Traceback (most recent call last):
...
File "/home/frens-jan/Workspaces/tgho/bndl/bndl/bndl/compute/shuffle.py", line 389, in part_
return partitioner(key(element))
File "bndl/util/funcs.pyx", line 44, in bndl.util.funcs._getter (bndl/util/funcs.c:2191)
return obj[key]
TypeError: 'int' object is not subscriptable
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/frens-jan/Workspaces/tgho/bndl/bndl/bndl/compute/dataset.py", line 767, in max
return self.aggregate(partial(max, key=key) if key else max)
...
bndl.rmi.exceptions.InvocationException: An exception was raised on localdomain.localhost.worker.27110.0.3: TypeError
consider whether the dataset actually contains key-value pairs.
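In the failing example above, ctx.range(10) yields plain integers rather than key-value pairs; mapping the integers to tuples first makes the same expression work (reusing only operators shown above):
>>> ctx.range(10).map(lambda i: (i % 3, i)).aggregate_by_key(sum).values().max()
18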
Caching
Datasets can be cached by calling Dataset.cache. They can be cached in memory or on disk using various serializations (‘json’, ‘marshal’, ‘pickle’, ‘msgpack’, ‘text’ and ‘binary’) as well as unserialized in memory.
See also: Dataset.cache, Dataset.cached and Dataset.uncache.
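A minimal sketch of caching; it assumes that cache() can be called without arguments and returns the dataset (the parameters for choosing between memory and disk and for selecting a serialization are not shown here):
>>> squares = ctx.range(1000).map(lambda i: i * i).cache()
>>> squares.mean()  # subsequent actions reuse the cached partitions
332833.5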