# Source code for kiel.clients.grouped

import collections
import itertools
import logging

import six
from tornado import gen

from kiel import constants, exc
from kiel.protocol import coordinator, offset_fetch, offset_commit, errors
from kiel.zookeeper.allocator import PartitionAllocator

from .consumer import BaseConsumer


log = logging.getLogger(__name__)


class GroupedConsumer(BaseConsumer):
    """
    Consumer class with coordinated resource allocation among like members.

    Uses an instance of a ``PartitionAllocator`` to determine which topics
    and partitions to consume.  Whenever the allocation is rebalanced, each
    consumed topic will have its partition offsets re-determined.

    Constructed similarly to the ``SingleConsumer`` class except for extra
    parameters ``group``, ``zk_hosts``, ``partition_allocator`` and
    ``autocommit``:

    * ``group`` - name of the consumer group this instance is a member of.
    * ``zk_hosts`` - hosts handed to the ``PartitionAllocator``.
    * ``partition_allocator`` - optional allocation function, defaults to
      ``naive_allocator``.
    * ``autocommit`` - when true, ``commit_offsets()`` is called after every
      successful ``consume()``.
    """
    def __init__(
        self,
        brokers,
        group,
        zk_hosts,
        deserializer=None,
        partition_allocator=None,
        autocommit=True,
        max_wait_time=1000,  # in milliseconds
        min_bytes=1,
        max_bytes=(1024 * 1024),
    ):
        super(GroupedConsumer, self).__init__(
            brokers, deserializer, max_wait_time, min_bytes, max_bytes
        )

        self.group_name = group
        # Broker ID of the group coordinator, set by determine_coordinator().
        self.coordinator_id = None

        self.allocator = PartitionAllocator(
            zk_hosts, self.group_name, self.name,
            allocator_fn=partition_allocator or naive_allocator,
            # A rebalance can change which partitions we own, so forget which
            # topics have synced offsets (presumably BaseConsumer re-fetches
            # offsets for unsynced topics -- confirm).
            on_rebalance=self.synced_offsets.clear
        )

        # Topics consumed since their offsets were last successfully committed.
        self.topics_to_commit = set()
        self.autocommit = autocommit

    @property
    def allocation(self):
        """
        Proxy property for the topics/partitions determined by the allocator.
        """
        return self.allocator.allocation

    @gen.coroutine
    def connect(self):
        """
        Overriding ``connect()`` that handles the allocator and coordinator.

        Simple augmentation of the base class method that starts the
        allocator and calls ``determine_coordinator()``.
        """
        yield super(GroupedConsumer, self).connect()
        yield self.allocator.start(self.cluster.topics)
        yield self.determine_coordinator()

    @gen.coroutine
    def consume(self, topic):
        """
        Overriding ``consume()`` that handles committing offsets.

        This is where the ``autocommit`` flag comes into play.  If the flag
        is set we call ``commit_offsets()`` here right off the bat.

        Returns an empty list when the topic's offsets are no longer marked
        as synced once the base ``consume()`` has finished.
        """
        result = yield super(GroupedConsumer, self).consume(topic)

        # NOTE(review): synced_offsets is cleared on rebalance (see
        # __init__); this presumably drops results fetched under a stale
        # allocation -- confirm against BaseConsumer.
        if topic not in self.synced_offsets:
            raise gen.Return([])

        self.topics_to_commit.add(topic)
        if self.autocommit:
            yield self.commit_offsets()

        raise gen.Return(result)

    @gen.coroutine
    def determine_coordinator(self):
        """
        Determines the ID of the broker that coordinates the group.

        Uses the "consumer metadata" api to do its thing.  All brokers
        contain coordinator metadata so each broker in the cluster is tried
        until one works.  The full pass over the cluster repeats until the
        response handler reports the coordinator as determined.

        Raises ``NoBrokersError`` if the cluster has no known brokers.
        """
        request = coordinator.GroupCoordinatorRequest(group=self.group_name)

        determined = False
        while not determined:
            # Re-read the broker list each pass; retriable errors flag the
            # cluster for healing, so membership may differ between passes.
            broker_ids = list(self.cluster)

            if not broker_ids:
                raise exc.NoBrokersError

            for broker_id in broker_ids:
                results = yield self.send({broker_id: request})
                determined = results[broker_id]
                if determined:
                    break

    def handle_group_coordinator_response(self, response):
        """
        Handler for consumer metadata api responses.

        These responses are relatively simple and successful ones merely list
        the ID, host and port of the coordinator.

        Returns ``True`` if the coordinator was determined, ``False`` if not.
        A non-retriable error also returns ``True`` (after logging it) so the
        caller stops retrying; ``coordinator_id`` is left unchanged then.
        """
        determined = False
        if response.error_code == errors.no_error:
            log.info("Found coordinator: broker %s", response.coordinator_id)
            self.coordinator_id = response.coordinator_id
            determined = True
        elif response.error_code in errors.retriable:
            self.heal_cluster = True
            determined = False
        else:
            # The original call had a "%s" placeholder but no argument.
            log.error(
                "Got error %s when determining coordinator",
                constants.ERROR_CODES[response.error_code]
            )
            determined = True

        return determined

    @gen.coroutine
    def determine_offsets(self, topic, start=None):
        """
        Fetches offsets for a given topic via the "offset fetch" api.

        Simple matter of sending an OffsetFetchRequest to the coordinator
        broker, repeated for as long as the response handler asks for a
        retry.

        .. note::

            The ``start`` argument is actually ignored, it exists so that the
            signature remains consistent with the other consumer classes.
        """
        log.info("Fetching offsets for consumer group '%s'", self.group_name)

        request = offset_fetch.OffsetFetchRequest(
            group_name=self.group_name,
            topics=[
                offset_fetch.TopicRequest(
                    name=topic,
                    partitions=list(self.allocation[topic])
                )
            ]
        )

        retry = True
        while retry:
            result = yield self.send({self.coordinator_id: request})
            retry = result[self.coordinator_id]

    def handle_offset_fetch_response(self, response):
        """
        Handler for offset fetch api responses.

        Sets the corresponding entry in the ``self.offsets`` structure for
        successful partition responses.

        Raises a ``NoOffsetsError`` exception if a fatal, non-retriable error
        is encountered.

        Returns ``True`` if the operation should be retried, ``False`` if not.
        """
        retry = False

        # Only a single topic is requested by determine_offsets().
        topic = response.topics[0].name
        for partition in response.topics[0].partitions:
            code = partition.error_code
            if code == errors.no_error:
                log.debug(
                    "Got offset %d for group %s topic %s partition %d",
                    partition.offset, self.group_name,
                    topic, partition.partition_id
                )
                self.offsets[topic][partition.partition_id] = partition.offset
            elif code == errors.offsets_load_in_progress:
                log.info(
                    "Offsets load in progress for topic %s partition %s" +
                    " retrying offset fetch.", topic, partition.partition_id
                )
                retry = True
            elif code in errors.retriable:
                self.heal_cluster = True
                retry = True
            else:
                log.error(
                    "Got error %s for topic %s partition %s",
                    constants.ERROR_CODES[code], topic, partition.partition_id
                )
                raise exc.NoOffsetsError

        return retry

    @gen.coroutine
    def commit_offsets(self, metadata=None):
        """
        Notifies Kafka that the consumer's messages have been processed.

        Uses the "v0" version of the offset commit request to maintain
        compatibility with clusters running 0.8.1.

        Only allocated topics present in ``topics_to_commit`` are included.
        The commit is retried (recursively) while the response handler asks
        for it; if the metadata string was rejected as too long, the retry
        uses an empty metadata string instead.
        """
        if metadata is None:
            metadata = "committed by %s" % self.name

        log.debug("Committing offsets for consumer group %s", self.group_name)

        request = offset_commit.OffsetCommitV0Request(
            group=self.group_name,
            topics=[
                offset_commit.TopicRequest(
                    name=topic,
                    partitions=[
                        offset_commit.PartitionRequest(
                            partition_id=partition_id,
                            offset=self.offsets[topic][partition_id],
                            metadata=metadata
                        )
                        for partition_id in partition_ids
                    ]
                )
                for topic, partition_ids in six.iteritems(self.allocation)
                if topic in self.topics_to_commit
            ]
        )

        results = yield self.send({self.coordinator_id: request})
        retry, adjust_metadata = results[self.coordinator_id]

        if adjust_metadata:
            # ``Logger.warn`` is a deprecated alias of ``Logger.warning``.
            log.warning("Offset commit metadata '%s' was too long.", metadata)
            metadata = ""

        if retry:
            yield self.commit_offsets(metadata=metadata)

    def handle_offset_commit_response(self, response):
        """
        Handles responses from the "offset commit" api.

        For successful responses the affected topics are dropped from the set
        of topics that need commits.

        In the special case of an ``offset_metadata_too_large`` error code
        the commit is retried with a blank metadata string.

        Non-retriable errors are only logged; the topic stays in
        ``topics_to_commit`` so a later commit will include it again.

        Returns a ``(retry, adjust_metadata)`` tuple of booleans.
        """
        retry = False
        adjust_metadata = False

        for topic in response.topics:
            for partition in topic.partitions:
                code = partition.error_code
                if code == errors.no_error:
                    self.topics_to_commit.discard(topic.name)
                elif code in errors.retriable:
                    retry = True
                    self.heal_cluster = True
                elif code == errors.offset_metadata_too_large:
                    retry = True
                    adjust_metadata = True
                else:
                    # Log the topic's name, not the response object's repr.
                    log.error(
                        "Got error %s for topic %s partition %s",
                        constants.ERROR_CODES[code],
                        topic.name, partition.partition_id
                    )

        return (retry, adjust_metadata)

    @gen.coroutine
    def wind_down(self):
        """
        Winding down calls ``stop()`` on the allocator.
        """
        yield self.allocator.stop()
def naive_allocator(members, partitions):
    """
    Default round-robin allocator.

    Walks the partitions in order, handing each one to the next member of
    the group and wrapping back to the first member as needed.  Every
    partition is a ``"<topic>:<partition_id>"`` string.  The assumption is
    that all members have roughly equal capacity, so the goal is an even
    spread of partition counts.  Partitions of the same topic may happen to
    cluster on one member; no attempt is made to prevent that.

    Returns a nested ``defaultdict`` keyed by member, then topic, holding a
    list of integer partition IDs.  An empty ``members`` yields an empty
    mapping.
    """
    def _topic_bucket():
        # Per-member mapping of topic name -> list of partition IDs.
        return collections.defaultdict(list)

    allocation = collections.defaultdict(_topic_bucket)

    # zip() stops as soon as either side is exhausted, so an empty member
    # list produces no assignments rather than an error.
    pairs = zip(itertools.cycle(members), partitions)
    for owner, spec in pairs:
        topic_name, raw_id = spec.split(":")
        allocation[owner][topic_name].append(int(raw_id))

    return allocation