from functools import lru_cache
from operator import itemgetter
import copy
import math
import random

from bndl_cassandra.session import cassandra_session
from cytoolz import itertoolz


T_COUNT = 2 ** 64
T_MIN = -(2 ** 63)
T_MAX = (2 ** 63) - 1


def get_token_ranges(ring):
    '''Convert a token ring into a list of consecutive (start, end) ranges
    covering the entire token space.'''
    ring = [t.value for t in ring]
    if ring[0] != T_MIN:
        ring = [T_MIN] + ring
    if ring[-1] != T_MAX:
        ring = ring + [T_MAX]
    return list(zip(ring[:-1], ring[1:]))
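
# Illustrative sketch (not part of the original module): given a hypothetical
# three-token ring, the ring is padded with T_MIN / T_MAX and consecutive
# tokens are paired into ranges:
#
#     >>> class FakeToken:                 # hypothetical stand-in for the driver's token type
#     ...     def __init__(self, value):
#     ...         self.value = value
#     >>> get_token_ranges([FakeToken(-100), FakeToken(0), FakeToken(100)])
#     [(-9223372036854775808, -100), (-100, 0), (0, 100), (100, 9223372036854775807)]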


def replicas_for_token(keyspace, token_map, token):
    '''Resolve the addresses of the replicas for a token in the given keyspace.'''
    replicas = token_map.get_replicas(keyspace, token_map.token_class(token))
    return tuple(replica.address for replica in replicas)


def ranges_by_replicas(session, keyspace):
    # get raw ranges from the token ring
    token_map = session.cluster.metadata.token_map
    raw_ranges = get_token_ranges(token_map.ring)
    # group by replica set (the replicas of the range's start token)
    return itertoolz.groupby(0, (
        (replicas_for_token(keyspace, token_map, start), start, end)
        for start, end in raw_ranges
    ))
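
# The result maps each replica set to the (replicas, start, end) tuples it
# owns, e.g. (hypothetical addresses and token values):
#
#     {('10.0.0.1', '10.0.0.2'): [(('10.0.0.1', '10.0.0.2'), -100, 0), ...],
#      ('10.0.0.2', '10.0.0.3'): [(('10.0.0.2', '10.0.0.3'), 0, 100), ...]}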


class Bin(object):
    '''A bin of token ranges and their combined length (for bin packing).'''
    def __init__(self):
        self.ranges = []
        self.size = 0


def partition_ranges_(ranges, max_length, size_estimate):
    # Split ranges such that they are small enough.
    # Ranges which need to be split are added to the result immediately;
    # they'd fill a bin anyway. Ranges no longer than max_length are put into
    # a list for binning, sorted larger > smaller (First Fit Decreasing strategy).
    partitioned = []
    sorted_ranges = []

    for start, end in ranges:
        length = end - start
        if length > max_length:
            parts = int((length - 1) / max_length)
            step = math.ceil(length / (parts + 1))
            for _ in range(parts):
                partitioned.append((((start, start + step),), step))
                start = start + step
            partitioned.append((((start, end),), end - start))
        else:
            sorted_ranges.append((start, end, length))

    # sort biggest first
    sorted_ranges.sort(key=itemgetter(2), reverse=True)

    # container for the bins for this replica set
    bins = [Bin()]

    # pack the ranges for this replica set into bins of at most max_length
    for start, end, length in sorted_ranges:
        bin = bins[0]
        # check whether it fits in the first (least loaded) bin
        if bin.size + length > max_length:
            # it doesn't fit, create a new bin
            bin = Bin()
            bins.append(bin)
        # add the token range to the bin
        bin.ranges.append((start, end))
        bin.size += length
        # sort the bins so that the least loaded bin is the first candidate
        bins.sort(key=lambda bin: bin.size)

    # add the non-empty bins to the result as partitions
    for bin in bins:
        if bin.size:
            partitioned.append((bin.ranges, bin.size))

    return partitioned
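
# Worked example (hypothetical numbers, not from the original source): with
# max_length=10, a range of length 25 is split eagerly into (0, 9), (9, 18)
# and (18, 25), while the smaller ranges are bin packed First Fit Decreasing
# into bins of at most 10 tokens (size_estimate is unused by the packing
# itself, so None suffices here):
#
#     >>> partition_ranges_([(0, 25), (30, 34), (40, 47), (50, 53)], 10, None)
#     [(((0, 9),), 9), (((9, 18),), 9), (((18, 25),), 7),
#      ([(30, 34), (50, 53)], 7), ([(40, 47)], 7)]
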
@lru_cache()
def partition_ranges(ctx, contact_points, keyspace, table=None, size_estimates=None):
    assert table or size_estimates, 'provide either a table name or size estimates'

    with cassandra_session(ctx, contact_points=contact_points) as session:
        # estimate the size of the table
        size_estimate = size_estimates or estimate_size(session, keyspace, table)

        # calculate the maximum length (in tokens) of a partition;
        # while the ratio of size in megabytes and keys may vary across the tokens,
        # we use the average sizes per token for both to simplify the problem
        # to 1-d bin packing (instead of vector bin packing)
        max_size_mb = ctx.conf.get('bndl_cassandra.part_size_mb')
        max_size_keys = ctx.conf.get('bndl_cassandra.part_size_keys')

        max_length = T_COUNT / ctx.default_pcount
        if size_estimate.token_size_mb:
            max_length = min(max_length, max_size_mb / size_estimate.token_size_mb)
        if size_estimate.token_size_keys:
            max_length = min(max_length, max_size_keys / size_estimate.token_size_keys)
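        # Worked example (hypothetical numbers): with default_pcount=1000 the
        # count cap is 2**64 / 1000 ≈ 1.8e16 tokens; with part_size_mb=64 and
        # token_size_mb=5e-15 MB/token the size cap is 64 / 5e-15 = 1.28e16
        # tokens, so max_length becomes the smaller of the caps.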

        # get token ranges, grouped by replica set
        by_replicas = ranges_by_replicas(session, keyspace)

        # container for the partitions: (replicas, ranges, size_mb, size_keys)
        partitions = []

        # divide the token ranges into partitions, joining ranges for the same
        # replica set but limited in size (in megabytes and Cassandra partition
        # keys); a greedy bin packing algorithm is used
        for replicas, ranges in sorted(by_replicas.items(), key=itemgetter(0)):
            ranges = itertoolz.pluck([1, 2], ranges)
            partitioned = partition_ranges_(tuple(ranges), max_length, size_estimate)
            for ranges, size in partitioned:
                partitions.append((
                    replicas, ranges,
                    size * size_estimate.token_size_mb,
                    size * size_estimate.token_size_keys
                ))

        # shuffle the partitions so that load is spread across the replica sets;
        # use a fixed seed so that the shuffle is deterministic
        random.Random(42).shuffle(partitions)
        return partitions
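
# Minimal usage sketch (hypothetical context and addresses; `ctx` is assumed
# to be a bndl compute context). Note that contact_points must be hashable,
# e.g. a tuple, because partition_ranges is memoized with lru_cache:
#
#     >>> parts = partition_ranges(ctx, ('10.0.0.1',), 'my_keyspace', 'my_table')
#     >>> replicas, ranges, size_mb, size_keys = parts[0]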


class SizeEstimate(object):
    def __init__(self, size, partitions, fraction):
        # the estimates cover only `fraction` of the token ring,
        # so extrapolate to the full table size
        if fraction:
            self.table_size_mb = int(size / fraction)
            self.table_size_pk = int(partitions / fraction)
            self.token_size_mb = float(self.table_size_mb) / T_COUNT
            self.token_size_keys = float(self.table_size_pk) / T_COUNT
        else:
            self.table_size_mb = 0
            self.table_size_pk = 0
            self.token_size_mb = 0
            self.token_size_keys = 0

    def __add__(self, other):
        est = copy.copy(self)
        est += other
        return est

    def __iadd__(self, other):
        # sum the table sizes, average the per-token sizes
        self.table_size_mb += other.table_size_mb
        self.table_size_pk += other.table_size_pk
        self.token_size_keys = (self.token_size_keys + other.token_size_keys) / 2
        self.token_size_mb = (self.token_size_mb + other.token_size_mb) / 2
        return self

    def __hash__(self):
        return (hash(self.table_size_mb) ^
                hash(self.table_size_pk) ^
                hash(self.token_size_mb) ^
                hash(self.token_size_keys))

    def __eq__(self, other):
        return (self.table_size_mb == other.table_size_mb and
                self.table_size_pk == other.table_size_pk and
                self.token_size_mb == other.token_size_mb and
                self.token_size_keys == other.token_size_keys)

    def __repr__(self):
        return '<SizeEstimate: size_mb=%s, partitions=%s, keys / token=%s, mb / token=%s>' % (
            self.table_size_mb,
            self.table_size_pk,
            self.token_size_keys,
            self.token_size_mb
        )
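
# Worked example (hypothetical numbers): estimates covering half the ring are
# extrapolated to the full table; adding two estimates sums the table sizes
# and averages the per-token sizes:
#
#     >>> est = SizeEstimate(100, 1000, 0.5)
#     >>> est.table_size_mb, est.table_size_pk
#     (200, 2000)
#     >>> (est + est).table_size_mb
#     400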


def estimate_size(session, keyspace, table):
    size_estimate_query = '''
        select range_start, range_end, partitions_count, mean_partition_size
        from system.size_estimates
        where keyspace_name = %s and table_name = %s
    '''
    size_estimates = list(session.execute(size_estimate_query, (keyspace, table)))

    size_b = 0
    size_pk = 0
    tokens = 0

    if len(size_estimates) == 1:
        # assume a single estimate covers the entire ring
        range_estimate = size_estimates[0]
        size_pk = range_estimate.partitions_count
        size_b = range_estimate.mean_partition_size * range_estimate.partitions_count
        tokens = T_COUNT
    else:
        for range_estimate in size_estimates:
            start, end = int(range_estimate.range_start), int(range_estimate.range_end)
            # don't bother unwrapping the token range which wraps around the ring
            if start > end:
                continue
            # count partitions, bytes and the length of the token range
            size_pk += range_estimate.partitions_count
            size_b += range_estimate.mean_partition_size * range_estimate.partitions_count
            tokens += end - start

    # the fraction of the ring covered by the estimates
    fraction = tokens / T_COUNT
    return SizeEstimate(size_b / 1024 / 1024, size_pk, fraction)
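
# Sketch of the fraction arithmetic (hypothetical numbers): two local rows
# covering (0, 2**62) and (2**62, 2**63) account for 2**63 of the 2**64
# tokens, so fraction = 0.5 and SizeEstimate doubles the observed megabytes
# and partition counts to approximate the full table.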