Reference

Session

class bndl_cassandra.session.LocalNodeFirstPolicy(child_policy, local_hosts)[source]

Bases: cassandra.policies.TokenAwarePolicy

__init__(child_policy, local_hosts)[source]
is_node_local(host)[source]
make_query_plan(working_keyspace=None, query=None)[source]
__module__ = 'bndl_cassandra.session'
bndl_cassandra.session.prepare(self, query, custom_payload=None)[source]
bndl_cassandra.session.get_contact_points(ctx, contact_points)[source]
bndl_cassandra.session.cassandra_session(ctx, keyspace=None, contact_points=None, row_factory=None, client_protocol_handler=None)[source]

Partitioner

bndl_cassandra.partitioner.get_token_ranges(ring)[source]
bndl_cassandra.partitioner.replicas_for_token(keyspace, token_map, token)[source]
bndl_cassandra.partitioner.ranges_by_replicas(session, keyspace)[source]
class bndl_cassandra.partitioner.Bin[source]

Bases: object

__init__()[source]
__dict__ = mappingproxy({'__dict__': <attribute '__dict__' of 'Bin' objects>, '__init__': <function Bin.__init__>, '__module__': 'bndl_cassandra.partitioner', '__weakref__': <attribute '__weakref__' of 'Bin' objects>, '__doc__': None})
__module__ = 'bndl_cassandra.partitioner'
__weakref__

list of weak references to the object (if defined)

bndl_cassandra.partitioner.partition_ranges_(ranges, max_length, size_estimate)[source]
bndl_cassandra.partitioner.partition_ranges[source]
class bndl_cassandra.partitioner.SizeEstimate(size, partitions, fraction)[source]

Bases: object

__init__(size, partitions, fraction)[source]
__add__(other)[source]
__iadd__(other)[source]
__dict__ = mappingproxy({'__init__': <function SizeEstimate.__init__>, '__add__': <function SizeEstimate.__add__>, '__repr__': <function SizeEstimate.__repr__>, '__dict__': <attribute '__dict__' of 'SizeEstimate' objects>, '__module__': 'bndl_cassandra.partitioner', '__doc__': None, '__iadd__': <function SizeEstimate.__iadd__>, '__hash__': <function SizeEstimate.__hash__>, '__eq__': <function SizeEstimate.__eq__>, '__weakref__': <attribute '__weakref__' of 'SizeEstimate' objects>})
__hash__()[source]
__module__ = 'bndl_cassandra.partitioner'
__weakref__

list of weak references to the object (if defined)

__eq__(other)[source]
__repr__()[source]
bndl_cassandra.partitioner.estimate_size(session, keyspace, table)[source]

Dataset

bndl_cassandra.dataset.get_table_meta(session, keyspace, table)[source]
class bndl_cassandra.dataset.CassandraScanDataset(ctx, keyspace, table, contact_points=None)[source]

Bases: bndl_cassandra.dataset._CassandraDataset

__init__(ctx, keyspace, table, contact_points=None)[source]

Create a scan across keyspace.table.

Parameters:
  • ctx – The compute context.
  • keyspace – str Keyspace of the table to scan.
  • table – str Name of the table to scan.
  • contact_points – None or str or [str,str,str,...] None to use the default contact points or a list of contact points or a comma separated string of contact points.
where(clause, *values)[source]
count(push_down=None)[source]
as_dataframe()[source]

Create a bndl.compute.dataframe.DistributedDataFrame out of a Cassandra table scan.

When primary key fields are selected, they are used to compose a (multilevel) index.

Example:

>>> df = ctx.cassandra_table('ks', 'tbl').as_dataframe()
>>> df.collect()
                                     comments
id          timestamp
ZIJr6BDGCeo 2014-10-09 19:28:43.657         1
            2015-01-12 20:24:49.947         4
            2015-01-13 02:24:30.931         39
kxcT9VOI0oU 2015-01-12 14:24:16.378         1
            2015-01-12 20:24:49.947         5
            2015-01-13 02:24:30.931         8
            2015-02-04 10:29:58.118         4
A_egyclRPOw 2015-12-16 13:50:53.210         1
            2015-01-18 18:28:19.556         2
            2015-01-22 22:28:33.358         4
            2015-01-27 02:28:59.578         6
            2015-01-31 06:29:07.937         7
span_by(*cols)[source]

Span by groups rows in a Cassandra table scan by a subset of the primary key.

This is useful for tables with clustering columns: rows in a cassandra table scan are returned clustered by partition key and sorted by clustering columns. This is exploited to efficiently (without shuffle) group rows by a part of the primary key.

Example:

>>> tbl = ctx.cassandra_table('ks', 'tbl')
>>> tbl.span_by().collect()
[('ZIJr6BDGCeo',                       comments
  id          timestamp
  ZIJr6BDGCeo 2014-10-09 19:28:43.657         1
              2015-01-12 20:24:49.947         4
              2015-01-13 02:24:30.931         9),
 ('kxcT9VOI0oU',                       comments
  id          timestamp
  kxcT9VOI0oU 2015-01-12 14:24:16.378         1
              2015-01-12 20:24:49.947         2
              2015-01-13 02:24:30.931         5
              2015-02-04 10:29:58.118         8),
 ('A_egyclRPOw',                       comments
  id          timestamp
  A_egyclRPOw 2015-12-16 13:50:53.210         1
              2015-01-18 18:28:19.556         2
              2015-01-22 22:28:33.358         4
              2015-01-27 02:28:59.578         6
              2015-01-31 06:29:07.937         7)]

A Cassandra table scan spanned by part of the primary key consists of pandas.DataFrame objects, and thus allows for easy per group analysis.

>>> for key, count in tbl.span_by().map_values(lambda e: e.count()).collect():
...     print(key, ':', count)
...
ZIJr6BDGCeo : comments    3
dtype: int64
kxcT9VOI0oU : comments    4
dtype: int64
A_egyclRPOw : comments    5
dtype: int64
itake(num)[source]
coscan(*scans, keys=None)[source]
parts()[source]
__module__ = 'bndl_cassandra.dataset'
class bndl_cassandra.dataset.CassandraScanPartition(dset, part_idx, replicas, token_ranges, size_estimate_mb, size_estimate_keys)[source]

Bases: bndl.compute.dataset.Partition

__init__(dset, part_idx, replicas, token_ranges, size_estimate_mb, size_estimate_keys)[source]
__module__ = 'bndl_cassandra.dataset'

CoScan

bndl_cassandra.coscan.get_or_none(index, container)[source]
class bndl_cassandra.coscan.CassandraCoScanDataset(*scans, keys=None, dset_id=None)[source]

Bases: bndl.compute.dataset.Dataset

__init__(*scans, keys=None, dset_id=None)[source]
coscan(*others, keys=None)[source]
parts[source]
__module__ = 'bndl_cassandra.coscan'
class bndl_cassandra.coscan.CassandraCoScanPartition(dset, idx, scans)[source]

Bases: bndl.compute.dataset.Partition

__init__(dset, idx, scans)[source]
__module__ = 'bndl_cassandra.coscan'

Join

class bndl_cassandra.join.CassandraJoinDataset(src, keyspace, table, contact_points=None)[source]

Bases: bndl_cassandra.dataset._CassandraDataset

__init__(src, keyspace, table, contact_points=None)[source]
on(columns=None, key=None)[source]

Join on particular columns (from the primary key in the Cassandra table) and set a key to select the corresponding values from the dataset to join.

Parameters:
  • columns – sequence[str], optional The columns to join on. Must be a left subset of the primary key. The following must hold: primary_key[:len(columns)] == columns. When the full primary key is selected, the rows yielded will be single elements (a dict or namedtuple representing the selected row). When only part of the primary key is selected (the partition key columns and perhaps some clustering columns), a list of selected rows is yielded.
  • key

    callable(element), list[object] or object, optional The key for getting the values to query Cassandra with. Must be a * callable which returns a sequence for each element in this

    dataset with the values to use in the join.
    • or a list of objects to be used as index in each element in a toolz.getter(columns) fashion (i.e. using the __getitem__ protocol)
    • or a plain value to be used with the __getitem__ mechanism
inner()[source]

Yield only rows which have a corresponding row in the Cassandra table.

left()[source]

Yield rows regardless of whether a row was selected from Cassandra.

parts()[source]
__module__ = 'bndl_cassandra.join'
class bndl_cassandra.join.CassandraJoinPartition(dset, src)[source]

Bases: bndl.compute.dataset.Partition

__init__(dset, src)[source]
__module__ = 'bndl_cassandra.join'

Save

bndl_cassandra.save.batch_statements(session, key, batch_size, buffer_size, statements)[source]
bndl_cassandra.save.execute_save(ctx, statement, iterable, keyspace=None, contact_points=None)[source]

Save elements from an iterable given the insert/update query. Use cassandra_save to save a dataset. This method is useful when saving to multiple tables from a single dataset with map_partitions.

Parameters:
  • ctx – bndl.compute.context.ComputeContext The BNDL compute context to use for configuration and accessing the cassandra_session.
  • statement – str The Cassandra statement to use in saving the iterable.
  • iterable – list, tuple, generator, iterable, ... The values to save.
  • keyspace – str or None The keyspace to execute the save in (if the statements don’t have an explicit keyspace). keyspace is supplied to ctx.cassandra_session which defaults to using the ‘bndl_cassandra.keyspace’ setting as default value.
  • contact_points – str, tuple, or list A string or tuple/list of strings denoting host names (contact points) of the Cassandra cluster to save to. Defaults to using the ip addresses in the BNDL cluster.
Returns:

A count of the records saved.

bndl_cassandra.save.cassandra_execute(dataset, statement, keyspace=None, contact_points=None)[source]

Execute a CQL statement per element in the dataset.

Parameters:
  • dataset – bndl.compute.context.ComputationContext As the cassandra_save function is typically bound on ComputationContext this corresponds to self.
  • statement – str The CQL statement to execute.
  • keyspace – str The name of the Cassandra keyspace to save to.
  • contact_points – str, tuple, or list A string or tuple/list of strings denoting hostnames (contact points) of the Cassandra cluster to save to. Defaults to using the ip addresses in the BNDL cluster.
bndl_cassandra.save.cassandra_save(dataset, keyspace, table, columns=None, keyed_rows=True, ttl=None, timestamp=None, contact_points=None)[source]

Performs a Cassandra insert for each element of the dataset.

Parameters:
  • dataset – bndl.compute.context.ComputationContext As the cassandra_save function is typically bound on ComputationContext this corresponds to self.
  • keyspace – str The name of the Cassandra keyspace to save to.
  • table – The name of the Cassandra table (column family) to save to.
  • columns

    The names of the columns to save.

    When the dataset contains tuples, this arguments is the mapping of the tuple elements to the columns of the table saved to. If not provided, all columns are used.

    When the dataset contains dictionaries, this parameter limits the columns save to Cassandra.

  • keyed_rows – bool Whether to expect a dataset of dicts (or at least supports the __getitem__ protocol with columns names as key) or positional objects (e.g. tuples or lists).
  • ttl – int The time to live to use in saving the records.
  • timestamp – int or datetime.date or datetime.datetime The timestamp to use in saving the records.
  • contact_points – str, tuple, or list A string or tuple/list of strings denoting hostnames (contact points) of the Cassandra cluster to save to. Defaults to using the ip addresses in the BNDL cluster.

Example

>>> ctx.collection([{'key': 1, 'val': 'a' }, {'key': 2, 'val': 'b' },])
       .cassandra_save('keyspace', 'table')
       .execute()
>>> ctx.range(100)
       .map(lambda i: {'key': i, 'val': str(i) }
       .cassandra_save('keyspace', 'table')
       .sum()
100

Metrics

bndl_cassandra.metrics.get_cassandra_metrics()[source]
bndl_cassandra.metrics.combine_metrics(metrics1, metrics2)[source]
bndl_cassandra.metrics.metrics_by_cluster(metrics)[source]