# Source code for kiel.clients.consumer
import collections
import logging
import json
import socket
import six
from tornado import gen
from kiel.exc import NoOffsetsError
from kiel.protocol import fetch, errors
from kiel.constants import CONSUMER_REPLICA_ID, ERROR_CODES
from .client import Client
# Module-level logger used by the consumer classes below for debug/error output.
log = logging.getLogger(__name__)
class BaseConsumer(Client):
    """
    Base class for consumers, provides `consume()` but no partition allocation.

    Allows for customizing the ``deserializer`` used.  Default is a JSON
    deserializer (``json.loads``).

    Subclasses are expected to implement the ``allocation`` property and the
    ``determine_offsets()`` coroutine.
    """

    def __init__(
            self,
            brokers,
            deserializer=None,
            max_wait_time=1000,  # in milliseconds
            min_bytes=1,
            max_bytes=(1024 * 1024),
    ):
        super(BaseConsumer, self).__init__(brokers)

        # Built from the hostname and this object's id(), e.g. "host:1234".
        self.name = ":".join([socket.gethostname(), str(id(self))])

        # Callable applied to each raw message value; falls back to JSON.
        self.deserializer = deserializer or json.loads

        # Passed straight into every FetchRequest; ``max_bytes`` is split
        # evenly across the partitions sent to a single leader.
        self.max_wait_time = max_wait_time
        self.min_bytes = min_bytes
        self.max_bytes = max_bytes

        # topic -> partition id -> next offset to fetch (defaults to 0).
        self.offsets = collections.defaultdict(
            lambda: collections.defaultdict(int)
        )
        # Topics whose offsets have been determined; a topic is discarded
        # from here when its offsets need to be determined again.
        self.synced_offsets = set()

    @property
    def allocation(self):
        """
        Property meant to denote which topics and partitions this consumer
        should be aware of.

        ``consume()`` uses it as a mapping of topic name to an iterable of
        partition ids.

        This is left to subclasses to implement, as it is one of the main
        behavioral differences between a single consumer and a grouped
        consumer.
        """
        raise NotImplementedError

    @gen.coroutine
    def determine_offsets(self, topic, start=None):
        """
        Subclass coroutine function for setting values in ``self.offsets``.

        Kafka offers a simple "offset" api as well as a more involved set
        of offset fetch and commit apis.  Determining which ones to use and
        how is left to the subclasses.

        ``consume()`` treats a raised ``NoOffsetsError`` as "no offsets
        available" and returns an empty list.
        """
        raise NotImplementedError

    @gen.coroutine
    def consume(self, topic, start=None):
        """
        Fetches from a given topic and returns a list of deserialized values.

        If the given topic is not known to have synced offsets, a call to
        `determine_offsets()` is made first (``start`` is passed through).
        If offsets cannot be determined an empty list is returned.

        If a topic is unknown entirely the cluster's ``heal()`` method is
        called and the check retried; if the topic is still unknown an empty
        list is returned.

        Since error codes and deserialization are taken care of by
        `handle_fetch_response` this method merely yields to wait on the
        deserialized results and returns a flattened list.  An empty list is
        also returned when the consumer is closing.
        """
        if self.closing:
            # Match the other early exits so callers always receive a list.
            raise gen.Return([])

        if topic not in self.synced_offsets:
            try:
                yield self.determine_offsets(topic, start)
            except NoOffsetsError:
                log.error("Unable to determine offsets for topic %s", topic)
                raise gen.Return([])
            self.synced_offsets.add(topic)

        if topic not in self.allocation or not self.allocation[topic]:
            log.debug("Consuming unknown topic %s, reloading metadata", topic)
            yield self.cluster.heal()

        if topic not in self.allocation or not self.allocation[topic]:
            log.error("Consuming unknown topic %s and not auto-created", topic)
            raise gen.Return([])

        results = yield self.send(self._build_fetch_requests(topic))

        # Skip falsy per-broker results (presumably None when a broker's
        # fetch did not produce messages -- TODO confirm against Client.send).
        # The guard must sit *before* the inner ``for``: placed after it, a
        # None entry would be iterated (TypeError) before the guard ran.
        raise gen.Return([
            msg
            for messageset in results.values() if messageset
            for msg in messageset
        ])

    def _build_fetch_requests(self, topic):
        """
        Builds one ``FetchRequest`` per partition leader for ``topic``.

        Partitions from ``self.allocation[topic]`` are grouped by leader; each
        leader's request covers all of its partitions, starting from the
        offsets held in ``self.offsets``.  ``self.max_bytes`` is divided
        evenly among the partitions sent to the same leader.

        Returns a dict mapping each leader to its ``FetchRequest``.
        """
        partitions_by_leader = collections.defaultdict(list)
        for partition_id in self.allocation[topic]:
            leader = self.cluster.get_leader(topic, partition_id)
            partitions_by_leader[leader].append(partition_id)

        requests = {}
        for leader, partitions in six.iteritems(partitions_by_leader):
            max_partition_bytes = int(self.max_bytes / len(partitions))
            requests[leader] = fetch.FetchRequest(
                replica_id=CONSUMER_REPLICA_ID,
                max_wait_time=self.max_wait_time,
                min_bytes=self.min_bytes,
                topics=[
                    fetch.TopicRequest(name=topic, partitions=[
                        fetch.PartitionRequest(
                            partition_id=partition_id,
                            offset=self.offsets[topic][partition_id],
                            max_bytes=max_partition_bytes,
                        )
                        for partition_id in partitions
                    ])
                ]
            )
        return requests

    def handle_fetch_response(self, response):
        """
        Handler for responses from the message "fetch" api.

        Messages returned with the "no error" code are deserialized and
        collected, the full resulting list is returned.

        A retriable error code will cause the cluster "heal" flag to be set.

        An error indicating that the offset used for the partition was out
        of range will cause the offending topic's offsets to be redetermined
        on the next call to `consume()`.

        A response carrying no topics yields an empty list.

        .. note::

          This class and its subclasses assume that fetch requests are made
          on one topic at a time, so this handler only deals with the first
          topic returned.
        """
        messages = []
        if not response.topics:
            return messages

        # we only fetch one topic so we can assume only one comes back
        topic_response = response.topics[0]
        topic = topic_response.name
        for partition in topic_response.partitions:
            code = partition.error_code
            if code == errors.no_error:
                messages.extend(self.deserialize_messages(topic, partition))
            elif code in errors.retriable:
                self.heal_cluster = True
            elif code == errors.offset_out_of_range:
                log.warning("Offset out of range for topic %s", topic)
                # Forces determine_offsets() on the next consume() call.
                self.synced_offsets.discard(topic)
            else:
                log.error(
                    "Got error %s for topic %s partition %s",
                    ERROR_CODES[code], topic, partition.partition_id
                )
        return messages

    def deserialize_messages(self, topic_name, partition):
        """
        Calls the ``deserializer`` on each ``Message`` value and gives the
        list of results.

        If an error is encountered when deserializing it is logged and the
        offending message is skipped.

        The ``self.offsets`` entry for the topic/partition pair is moved past
        every message handled -- including ones that failed to deserialize --
        so a skipped message is not fetched again by the next `consume()`.
        """
        messages = []
        for offset, msg in partition.message_set.messages:
            # Advance before deserializing: a bad message that ends a batch
            # would otherwise be re-fetched (and re-logged) on every poll.
            self.offsets[topic_name][partition.partition_id] = offset + 1
            try:
                value = self.deserializer(msg.value)
            except Exception:
                # The deserializer is caller-supplied, so any error is
                # possible; log it and move on to the next message.
                log.exception(
                    "Error deserializing message: '%r'",
                    getattr(msg, "value", "No value on msg!")
                )
                continue
            messages.append(value)
        return messages