Skip to content
Snippets Groups Projects
Commit 0a398936 authored by Dean Jukes's avatar Dean Jukes
Browse files

venv

parents
No related branches found
No related tags found
No related merge requests found
Showing
with 260 additions and 0 deletions
import logging
from collections import defaultdict, namedtuple
from copy import deepcopy
from kafka.vendor import six
log = logging.getLogger(__name__)
# Immutable record of a single partition move: from src consumer to dst consumer.
ConsumerPair = namedtuple("ConsumerPair", ["src_member_id", "dst_member_id"])
"""
Represents a pair of Kafka consumer ids involved in a partition reassignment.
Each ConsumerPair corresponds to a particular partition or topic, indicates that the particular partition or some
partition of the particular topic was moved from the source consumer to the destination consumer
during the rebalance. This class helps in determining whether a partition reassignment results in cycles among
the generated graph of consumer pairs.
"""
def is_sublist(source, target):
    """Checks if one sequence occurs contiguously within another.

    Arguments:
        source: the sequence in which to search for the occurrence of target
        target: the sequence (list or tuple) to search for as a sublist of source

    Returns:
        True if target occurs contiguously in source; False otherwise.
        An empty target is trivially a sublist of any source.
    """
    # Normalize to a tuple so the slice comparison below works whether the
    # caller passed a list or a tuple (tuple == list is always False).
    target = tuple(target)
    if not target:
        # Previously target[0] raised IndexError on an empty target.
        return True
    # Only attempt a full match at positions where the first element lines up.
    for index in (i for i, e in enumerate(source) if e == target[0]):
        if tuple(source[index: index + len(target)]) == target:
            return True
    return False
class PartitionMovements:
    """
    This class maintains some data structures to simplify lookup of partition movements among consumers.
    At each point of time during a partition rebalance it keeps track of partition movements
    corresponding to each topic, and also possible movement (in form a ConsumerPair object) for each partition.
    """
    def __init__(self):
        # topic -> {ConsumerPair -> set of partitions moved between that pair}
        self.partition_movements_by_topic = defaultdict(
            lambda: defaultdict(set)
        )
        # partition -> ConsumerPair describing its current (net) movement
        self.partition_movements = {}
    def move_partition(self, partition, old_consumer, new_consumer):
        """Record that `partition` moved from `old_consumer` to `new_consumer`.

        Successive moves of the same partition are collapsed into a single net
        movement (original source -> latest destination); a move back to the
        original source cancels the record entirely.
        """
        pair = ConsumerPair(src_member_id=old_consumer, dst_member_id=new_consumer)
        if partition in self.partition_movements:
            # this partition has previously moved
            existing_pair = self._remove_movement_record_of_partition(partition)
            # the previous move must have ended at the consumer we are now moving from
            assert existing_pair.dst_member_id == old_consumer
            if existing_pair.src_member_id != new_consumer:
                # the partition is not moving back to its previous consumer
                self._add_partition_movement_record(
                    partition, ConsumerPair(src_member_id=existing_pair.src_member_id, dst_member_id=new_consumer)
                )
        else:
            self._add_partition_movement_record(partition, pair)
    def get_partition_to_be_moved(self, partition, old_consumer, new_consumer):
        """Return a partition of the same topic whose reverse movement
        (new_consumer -> old_consumer) is already recorded, if any;
        otherwise return `partition` itself unchanged.
        """
        if partition.topic not in self.partition_movements_by_topic:
            # no movements recorded for this topic yet
            return partition
        if partition in self.partition_movements:
            # this partition has previously moved; chase back to its original source
            assert old_consumer == self.partition_movements[partition].dst_member_id
            old_consumer = self.partition_movements[partition].src_member_id
        reverse_pair = ConsumerPair(src_member_id=new_consumer, dst_member_id=old_consumer)
        if reverse_pair not in self.partition_movements_by_topic[partition.topic]:
            return partition
        # any partition that already moved along the reverse pair will do
        return next(iter(self.partition_movements_by_topic[partition.topic][reverse_pair]))
    def are_sticky(self):
        """Return True if no topic's recorded movements form a cycle
        (a cycle indicates a partition of a topic was shuffled between
        consumers rather than moved stickily). Logs an error per violation.
        """
        for topic, movements in six.iteritems(self.partition_movements_by_topic):
            movement_pairs = set(movements.keys())
            if self._has_cycles(movement_pairs):
                log.error(
                    "Stickiness is violated for topic {}\n"
                    "Partition movements for this topic occurred among the following consumer pairs:\n"
                    "{}".format(topic, movement_pairs)
                )
                return False
        return True
    def _remove_movement_record_of_partition(self, partition):
        """Remove all bookkeeping for `partition` and return its ConsumerPair."""
        pair = self.partition_movements[partition]
        del self.partition_movements[partition]
        self.partition_movements_by_topic[partition.topic][pair].remove(partition)
        # prune empty inner/outer dict entries so membership checks stay accurate
        if not self.partition_movements_by_topic[partition.topic][pair]:
            del self.partition_movements_by_topic[partition.topic][pair]
        if not self.partition_movements_by_topic[partition.topic]:
            del self.partition_movements_by_topic[partition.topic]
        return pair
    def _add_partition_movement_record(self, partition, pair):
        """Index the movement both by partition and by (topic, pair)."""
        self.partition_movements[partition] = pair
        self.partition_movements_by_topic[partition.topic][pair].add(partition)
    def _has_cycles(self, consumer_pairs):
        """Return True if the directed graph of consumer pairs contains a
        cycle of length 2 (a pair of consumers that exchanged partitions).
        """
        cycles = set()
        for pair in consumer_pairs:
            reduced_pairs = deepcopy(consumer_pairs)
            reduced_pairs.remove(pair)
            # path accumulates the consumer ids along a discovered cycle
            path = [pair.src_member_id]
            if self._is_linked(pair.dst_member_id, pair.src_member_id, reduced_pairs, path) and not self._is_subcycle(
                path, cycles
            ):
                cycles.add(tuple(path))
                log.error("A cycle of length {} was found: {}".format(len(path) - 1, path))
        # for now we want to make sure there is no partition movements of the same topic between a pair of consumers.
        # the odds of finding a cycle among more than two consumers seem to be very low (according to various randomized
        # tests with the given sticky algorithm) that it should not worth the added complexity of handling those cases.
        for cycle in cycles:
            if len(cycle) == 3:  # indicates a cycle of length 2 (path stores start node twice: a -> b -> a)
                return True
        return False
    @staticmethod
    def _is_subcycle(cycle, cycles):
        """Return True if `cycle` is a rotation of any already-found cycle.

        Doubling the cycle (minus the repeated end node) makes every rotation
        of it appear as a contiguous sublist.
        """
        super_cycle = deepcopy(cycle)
        super_cycle = super_cycle[:-1]
        super_cycle.extend(cycle)
        for found_cycle in cycles:
            if len(found_cycle) == len(cycle) and is_sublist(super_cycle, found_cycle):
                return True
        return False
    def _is_linked(self, src, dst, pairs, current_path):
        """Depth-first search for a path from consumer `src` to `dst` through
        `pairs`, appending visited consumer ids to `current_path`.

        Returns True if a path exists; `current_path` is then the cycle nodes.
        """
        if src == dst:
            return False
        if not pairs:
            return False
        # positional args: ConsumerPair(src_member_id=src, dst_member_id=dst)
        if ConsumerPair(src, dst) in pairs:
            current_path.append(src)
            current_path.append(dst)
            return True
        for pair in pairs:
            if pair.src_member_id == src:
                # explore only the first matching edge; remaining edges are
                # not backtracked (NOTE(review): this finds *a* path, not all)
                reduced_set = deepcopy(pairs)
                reduced_set.remove(pair)
                current_path.append(pair.src_member_id)
                return self._is_linked(pair.dst_member_id, dst, reduced_set, current_path)
        return False
class SortedSet:
    """A set that tracks its smallest and largest elements under a key function.

    Backed by a plain ``set``; ``first()``/``last()`` are O(n) scans whose
    results are cached until invalidated by a mutation. Iteration yields
    elements in sorted order.
    """

    def __init__(self, iterable=None, key=None):
        # Identity key by default.
        self._key = key if key is not None else lambda x: x
        self._set = set(iterable) if iterable is not None else set()
        # None doubles as the "cache empty" sentinel; caches are recomputed
        # lazily by first()/last().
        self._cached_last = None
        self._cached_first = None

    def first(self):
        """Return the element with the smallest key, or None if the set is empty."""
        if self._cached_first is not None:
            return self._cached_first
        first = None
        for element in self._set:
            if first is None or self._key(first) > self._key(element):
                first = element
        self._cached_first = first
        return first

    def last(self):
        """Return the element with the largest key, or None if the set is empty."""
        if self._cached_last is not None:
            return self._cached_last
        last = None
        for element in self._set:
            if last is None or self._key(last) < self._key(element):
                last = element
        self._cached_last = last
        return last

    def pop_last(self):
        """Remove and return the element with the largest key.

        Raises KeyError (from set.remove) if the set is empty.
        """
        value = self.last()
        # BUGFIX: delegate to remove() so BOTH caches are invalidated.
        # Previously only _cached_last was cleared, so when the popped element
        # was also the cached first (e.g. a single-element set), first() kept
        # returning the stale, already-removed element.
        self.remove(value)
        return value

    def add(self, value):
        """Add `value`, opportunistically updating caches when it becomes the new extreme."""
        if self._cached_last is not None and self._key(value) > self._key(self._cached_last):
            self._cached_last = value
        if self._cached_first is not None and self._key(value) < self._key(self._cached_first):
            self._cached_first = value
        return self._set.add(value)

    def remove(self, value):
        """Remove `value`, invalidating any cache that pointed at it."""
        if self._cached_last is not None and self._cached_last == value:
            self._cached_last = None
        if self._cached_first is not None and self._cached_first == value:
            self._cached_first = None
        return self._set.remove(value)

    def __contains__(self, value):
        return value in self._set

    def __iter__(self):
        # Sorted view; does not mutate the underlying set.
        return iter(sorted(self._set, key=self._key))

    def _bool(self):
        return len(self._set) != 0

    # Python 2 / Python 3 truthiness hooks.
    __nonzero__ = _bool
    __bool__ = _bool
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from __future__ import absolute_import
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
from kafka.structs import TopicPartition
class ConsumerProtocolMemberMetadata(Struct):
    """Wire schema for the metadata a consumer sends when joining a group:
    protocol version, subscribed topic names, and opaque user data.
    """
    SCHEMA = Schema(
        ('version', Int16),
        ('subscription', Array(String('utf-8'))),
        ('user_data', Bytes))
class ConsumerProtocolMemberAssignment(Struct):
    """Wire schema for the assignment handed back to each group member:
    protocol version, per-topic partition lists, and opaque user data.
    """
    SCHEMA = Schema(
        ('version', Int16),
        ('assignment', Array(
            ('topic', String('utf-8')),
            ('partitions', Array(Int32)))),
        ('user_data', Bytes))

    def partitions(self):
        """Flatten the per-topic assignment into a list of TopicPartition tuples."""
        flattened = []
        for topic, topic_partitions in self.assignment:  # pylint: disable-msg=no-member
            for partition in topic_partitions:
                flattened.append(TopicPartition(topic, partition))
        return flattened
class ConsumerProtocol(object):
    """Namespace grouping the constants and schema classes of the
    Kafka consumer group membership protocol.
    """
    # protocol_type field used in group membership requests
    PROTOCOL_TYPE = 'consumer'
    # names of the built-in partition assignment strategies
    ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
    # schema classes for the join (metadata) and sync (assignment) phases
    METADATA = ConsumerProtocolMemberMetadata
    ASSIGNMENT = ConsumerProtocolMemberAssignment
This diff is collapsed.
This diff is collapsed.
from __future__ import absolute_import
from kafka.metrics.compound_stat import NamedMeasurable
from kafka.metrics.dict_reporter import DictReporter
from kafka.metrics.kafka_metric import KafkaMetric
from kafka.metrics.measurable import AnonMeasurable
from kafka.metrics.metric_config import MetricConfig
from kafka.metrics.metric_name import MetricName
from kafka.metrics.metrics import Metrics
from kafka.metrics.quota import Quota
__all__ = [
'AnonMeasurable', 'DictReporter', 'KafkaMetric', 'MetricConfig',
'MetricName', 'Metrics', 'NamedMeasurable', 'Quota'
]
File added
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment