Reference
Session
Partitioner
-
class
bndl_cassandra.partitioner.
Bin
[source]¶ Bases:
object
-
__dict__
= mappingproxy({'__dict__': <attribute '__dict__' of 'Bin' objects>, '__init__': <function Bin.__init__>, '__module__': 'bndl_cassandra.partitioner', '__weakref__': <attribute '__weakref__' of 'Bin' objects>, '__doc__': None})¶
-
class
bndl_cassandra.partitioner.
SizeEstimate
(size, partitions, fraction)[source]¶ Bases:
object
-
__dict__
= mappingproxy({'__init__': <function SizeEstimate.__init__>, '__add__': <function SizeEstimate.__add__>, '__repr__': <function SizeEstimate.__repr__>, '__dict__': <attribute '__dict__' of 'SizeEstimate' objects>, '__module__': 'bndl_cassandra.partitioner', '__doc__': None, '__iadd__': <function SizeEstimate.__iadd__>, '__hash__': <function SizeEstimate.__hash__>, '__eq__': <function SizeEstimate.__eq__>, '__weakref__': <attribute '__weakref__' of 'SizeEstimate' objects>})¶
Dataset
-
class
bndl_cassandra.dataset.
CassandraScanDataset
(ctx, keyspace, table, contact_points=None)[source]¶ Bases:
bndl_cassandra.dataset._CassandraDataset
-
__init__
(ctx, keyspace, table, contact_points=None)[source]¶ Create a scan across keyspace.table.
Parameters: - ctx – The compute context.
- keyspace – str Keyspace of the table to scan.
- table – str Name of the table to scan.
- contact_points – None or str or [str,str,str,...] None to use the default contact points; otherwise a list of contact points or a comma-separated string of contact points.
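The three accepted forms of contact_points can be reduced to one representation before use. The following is a minimal sketch of such a normalization; normalize_contact_points and its default argument are hypothetical names for illustration and are not part of bndl_cassandra.

```python
def normalize_contact_points(contact_points=None, default=("127.0.0.1",)):
    """Return contact points as a tuple of host strings.

    Hypothetical helper: it mirrors the accepted argument forms
    (None, comma-separated str, or list of str), not BNDL's own code.
    """
    if contact_points is None:
        # None selects the default contact points (an assumed default here).
        return tuple(default)
    if isinstance(contact_points, str):
        # Split a comma-separated string and drop surrounding whitespace.
        return tuple(p.strip() for p in contact_points.split(",") if p.strip())
    # Any other iterable of host names is taken as is.
    return tuple(contact_points)


print(normalize_contact_points("10.0.0.1, 10.0.0.2"))
```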
-
as_dataframe
()[source]¶ Create a bndl.compute.dataframe.DistributedDataFrame out of a Cassandra table scan.
When primary key fields are selected, they are used to compose a (multilevel) index.
Example:
>>> df = ctx.cassandra_table('ks', 'tbl').as_dataframe()
>>> df.collect()
                                     comments
id          timestamp
ZIJr6BDGCeo 2014-10-09 19:28:43.657         1
            2015-01-12 20:24:49.947         4
            2015-01-13 02:24:30.931        39
kxcT9VOI0oU 2015-01-12 14:24:16.378         1
            2015-01-12 20:24:49.947         5
            2015-01-13 02:24:30.931         8
            2015-02-04 10:29:58.118         4
A_egyclRPOw 2015-12-16 13:50:53.210         1
            2015-01-18 18:28:19.556         2
            2015-01-22 22:28:33.358         4
            2015-01-27 02:28:59.578         6
            2015-01-31 06:29:07.937         7
-
span_by
(*cols)[source]¶ Group ("span") the rows of a Cassandra table scan by a subset of the primary key.
This is useful for tables with clustering columns: rows in a Cassandra table scan are returned clustered by partition key and sorted by clustering columns. This ordering is exploited to group rows by a part of the primary key efficiently, i.e. without a shuffle.
Example:
>>> tbl = ctx.cassandra_table('ks', 'tbl')
>>> tbl.span_by().collect()
[('ZIJr6BDGCeo',
                                     comments
id          timestamp
ZIJr6BDGCeo 2014-10-09 19:28:43.657         1
            2015-01-12 20:24:49.947         4
            2015-01-13 02:24:30.931         9),
 ('kxcT9VOI0oU',
                                     comments
id          timestamp
kxcT9VOI0oU 2015-01-12 14:24:16.378         1
            2015-01-12 20:24:49.947         2
            2015-01-13 02:24:30.931         5
            2015-02-04 10:29:58.118         8),
 ('A_egyclRPOw',
                                     comments
id          timestamp
A_egyclRPOw 2015-12-16 13:50:53.210         1
            2015-01-18 18:28:19.556         2
            2015-01-22 22:28:33.358         4
            2015-01-27 02:28:59.578         6
            2015-01-31 06:29:07.937         7)]
A Cassandra table scan spanned by part of the primary key consists of (key, pandas.DataFrame) pairs, which allows for easy per-group analysis.
>>> for key, count in tbl.span_by().map_values(lambda e: e.count()).collect():
...     print(key, ':', count)
...
ZIJr6BDGCeo : comments    3
dtype: int64
kxcT9VOI0oU : comments    4
dtype: int64
A_egyclRPOw : comments    5
dtype: int64
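The reason no shuffle is needed can be shown without Cassandra: when rows already arrive clustered by key, consecutive rows with the same key prefix can be grouped in a single pass. The sketch below uses itertools.groupby on plain tuples; span_by_prefix and the sample rows are illustrative assumptions, not the implementation of span_by (which yields pandas.DataFrame objects).

```python
from itertools import groupby

# Illustrative rows, already clustered by the first column (the partition key)
# and sorted by the second, as a Cassandra scan would return them.
rows = [
    ("ZIJr6BDGCeo", "2014-10-09 19:28:43.657", 1),
    ("ZIJr6BDGCeo", "2015-01-12 20:24:49.947", 4),
    ("kxcT9VOI0oU", "2015-01-12 14:24:16.378", 1),
    ("kxcT9VOI0oU", "2015-01-12 20:24:49.947", 2),
    ("kxcT9VOI0oU", "2015-01-13 02:24:30.931", 5),
]


def span_by_prefix(rows, n_key_cols):
    """Yield (key, rows) for each run of consecutive rows sharing a key prefix.

    Hypothetical helper: it relies on the input being clustered by key,
    so one linear pass suffices and no repartitioning is required.
    """
    for key, group in groupby(rows, key=lambda row: row[:n_key_cols]):
        yield key, list(group)


for key, group in span_by_prefix(rows, 1):
    print(key, len(group))
```

Note that groupby only merges adjacent rows, so the same approach on unsorted input would split a key over several groups; the ordering guarantee of the scan is what makes it correct.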
CoScan
Join
-
class
bndl_cassandra.join.
CassandraJoinDataset
(src, keyspace, table, contact_points=None)[source]¶ Bases:
bndl_cassandra.dataset._CassandraDataset
-
on
(columns=None, key=None)[source]¶ Join on particular columns (from the primary key in the Cassandra table) and set a key to select the corresponding values from the dataset to join.
Parameters: - columns – sequence[str], optional The columns to join on. Must be a prefix of the primary key, i.e. primary_key[:len(columns)] == columns must hold. When the full primary key is selected, the rows yielded will be single elements (a dict or namedtuple representing the selected row). When only part of the primary key is selected (the partition key columns and perhaps some clustering columns), a list of selected rows is yielded.
- key – callable(element), list[object] or object, optional The key for getting the values to query Cassandra with. Must be one of:
  * a callable which returns a sequence for each element in this dataset with the values to use in the join
  * a list of objects to be used as index in each element in a toolz.getter(columns) fashion (i.e. using the __getitem__ protocol)
  * a plain value to be used with the __getitem__ mechanism
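The two constraints described for on() can be illustrated in isolation: the prefix condition on columns, and the three accepted forms of key. This is a minimal sketch under the assumption that elements support __getitem__; is_key_prefix and resolve_key are hypothetical helpers and do not reflect how CassandraJoinDataset is implemented.

```python
def is_key_prefix(primary_key, columns):
    """Check the documented condition primary_key[:len(columns)] == columns."""
    return list(primary_key[:len(columns)]) == list(columns)


def resolve_key(key):
    """Turn any accepted form of `key` into a function element -> tuple of values.

    Hypothetical helper covering the three documented cases.
    """
    if callable(key):
        # A callable returns the sequence of join values itself.
        return lambda element: tuple(key(element))
    if isinstance(key, (list, tuple)):
        # A list of indices/keys is looked up one by one via __getitem__.
        return lambda element: tuple(element[k] for k in key)
    # A plain value is a single index/key used with __getitem__.
    return lambda element: (element[key],)


primary_key = ["id", "timestamp"]
element = {"id": "ZIJr6BDGCeo", "timestamp": 5}

print(is_key_prefix(primary_key, ["id"]))          # partition key only
print(resolve_key(["id", "timestamp"])(element))   # full primary key
```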
Save
-
bndl_cassandra.save.
execute_save
(ctx, statement, iterable, keyspace=None, contact_points=None)[source]¶ Save elements from an iterable given the insert/update query. Use cassandra_save to save a dataset. This method is useful when saving to multiple tables from a single dataset with map_partitions.
Parameters: - ctx – bndl.compute.context.ComputeContext The BNDL compute context to use for configuration and accessing the cassandra_session.
- statement – str The Cassandra statement to use in saving the iterable.
- iterable – list, tuple, generator, iterable, ... The values to save.
- keyspace – str or None The keyspace to execute the save in (if the statement doesn’t name a keyspace explicitly). keyspace is supplied to ctx.cassandra_session, which falls back to the ‘bndl_cassandra.keyspace’ setting.
- contact_points – str, tuple, or list A string or tuple/list of strings denoting hostnames (contact points) of the Cassandra cluster to save to. Defaults to the IP addresses in the BNDL cluster.
Returns: A count of the records saved.
-
bndl_cassandra.save.
cassandra_execute
(dataset, statement, keyspace=None, contact_points=None)[source]¶ Execute a CQL statement per element in the dataset.
Parameters: - dataset – bndl.compute.context.ComputationContext As the cassandra_execute function is typically bound on ComputationContext this corresponds to self.
- statement – str The CQL statement to execute.
- keyspace – str The name of the Cassandra keyspace to save to.
- contact_points – str, tuple, or list A string or tuple/list of strings denoting hostnames (contact points) of the Cassandra cluster to save to. Defaults to the IP addresses in the BNDL cluster.
-
bndl_cassandra.save.
cassandra_save
(dataset, keyspace, table, columns=None, keyed_rows=True, ttl=None, timestamp=None, contact_points=None)[source]¶ Performs a Cassandra insert for each element of the dataset.
Parameters: - dataset – bndl.compute.context.ComputationContext As the cassandra_save function is typically bound on ComputationContext this corresponds to self.
- keyspace – str The name of the Cassandra keyspace to save to.
- table – str The name of the Cassandra table (column family) to save to.
- columns –
The names of the columns to save.
When the dataset contains tuples, this argument maps the tuple elements to the columns of the table saved to. If not provided, all columns are used.
When the dataset contains dictionaries, this parameter limits the columns saved to Cassandra.
- keyed_rows – bool Whether to expect a dataset of dicts (or at least objects that support the __getitem__ protocol with column names as keys) or positional objects (e.g. tuples or lists).
- ttl – int The time to live to use in saving the records.
- timestamp – int or datetime.date or datetime.datetime The timestamp to use in saving the records.
- contact_points – str, tuple, or list A string or tuple/list of strings denoting hostnames (contact points) of the Cassandra cluster to save to. Defaults to the IP addresses in the BNDL cluster.
Example
>>> (ctx.collection([{'key': 1, 'val': 'a'}, {'key': 2, 'val': 'b'}])
...     .cassandra_save('keyspace', 'table')
...     .execute())
>>> (ctx.range(100)
...     .map(lambda i: {'key': i, 'val': str(i)})
...     .cassandra_save('keyspace', 'table')
...     .sum())
100
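To make the interplay of columns, keyed_rows, ttl and timestamp more concrete, the sketch below builds the kind of CQL INSERT statement such a save could prepare. It is an assumption for illustration only: build_insert is a hypothetical helper, the statement text actually generated by cassandra_save may differ, and only integer timestamps are handled here.

```python
def build_insert(keyspace, table, columns, keyed_rows=True, ttl=None, timestamp=None):
    """Build a CQL INSERT with bind markers for the given columns.

    Hypothetical helper, not part of bndl_cassandra.
    """
    if keyed_rows:
        # Dict-like rows: named bind markers, bound by column name.
        markers = ", ".join(":" + column for column in columns)
    else:
        # Positional rows (tuples/lists): anonymous markers, bound in order.
        markers = ", ".join("?" for _ in columns)

    statement = "INSERT INTO {}.{} ({}) VALUES ({})".format(
        keyspace, table, ", ".join(columns), markers
    )

    # TTL and TIMESTAMP are optional write options in CQL's USING clause.
    options = []
    if ttl is not None:
        options.append("TTL {}".format(int(ttl)))
    if timestamp is not None:
        options.append("TIMESTAMP {}".format(int(timestamp)))
    if options:
        statement += " USING " + " AND ".join(options)
    return statement


print(build_insert("keyspace", "table", ["key", "val"]))
print(build_insert("keyspace", "table", ["key", "val"], keyed_rows=False, ttl=3600))
```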