Source code for kiel.protocol.primitives
import struct
import six
[docs]class Primitive(object):
"""
The most basic structure of the protocol. Subclassed, never used directly.
Used as a building block for the various actually-used primitives outlined
in the Kafka wire protcol docs:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
"""
fmt = None
def __init__(self, value):
self.value = value
[docs] def render(self):
"""
Returns a two-element tuple with the ``struct`` format and list value.
The value is wrapped in a list, as there are some primitives that deal
with multiple values. Any caller of `render()` should expect a list.
"""
return self.fmt, [self.value]
@classmethod
[docs] def parse(cls, buff, offset):
"""
Given a buffer and offset, returns the parsed value and new offset.
Uses the ``fmt`` class attribute to unpack the data from the buffer
and determine the used up number of bytes.
"""
primitive_struct = struct.Struct("!" + cls.fmt)
value = primitive_struct.unpack_from(buff, offset)[0]
offset += primitive_struct.size
return value, offset
def __eq__(self, other):
"""
Basic equality method that tests equality of the ``value`` attributes.
"""
return self.value == other.value
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.value)
[docs]class VariablePrimitive(Primitive):
"""
Base primitive for variable-length scalar primitives (strings and bytes).
"""
size_primitive = None
[docs] def render(self):
"""
Returns the ``struct`` format and list of the size and value.
The format is derived from the size primitive and the length of the
resulting encoded value (e.g. the format for a string of 'foo' ends
up as 'h3s'.
.. note ::
The value is expected to be string-able (wrapped in ``str()``) and is
then encoded as UTF-8.
"""
size_format = self.size_primitive.fmt
if self.value is None:
return size_format, [-1]
value = self.value
if not isinstance(value, six.binary_type):
if not isinstance(value, six.string_types):
value = str(value)
value = value.encode("utf-8")
size = len(value)
fmt = "%s%ds" % (size_format, size)
return fmt, [size, value]
@classmethod
[docs] def parse(cls, buff, offset):
"""
Given a buffer and offset, returns the parsed value and new offset.
Parses the ``size_primitive`` first to determine how many more bytes to
consume to extract the value.
"""
size, offset = cls.size_primitive.parse(buff, offset)
if size == -1:
return None, offset
var_struct = struct.Struct("!%ds" % size)
value = var_struct.unpack_from(buff, offset)[0]
try:
value = value.decode("utf-8")
except UnicodeDecodeError:
pass
offset += var_struct.size
return value, offset
[docs]class Int8(Primitive):
"""
Represents an 8-bit signed integer.
"""
fmt = "b"
[docs]class Int16(Primitive):
"""
Represents an 16-bit signed integer.
"""
fmt = "h"
[docs]class Int32(Primitive):
"""
Represents an 32-bit signed integer.
"""
fmt = "i"
[docs]class Int64(Primitive):
"""
Represents an 64-bit signed integer.
"""
fmt = "q"
[docs]class String(VariablePrimitive):
"""
Represents a string value, length denoted by a 16-bit signed integer.
"""
size_primitive = Int16
def __repr__(self):
return repr(self.value)
[docs]class Bytes(VariablePrimitive):
"""
Represents a bytestring value, length denoted by a 32-bit signed integer.
"""
size_primitive = Int32
[docs]class Array(Primitive):
"""
Represents an array of any arbitrary `Primitive` or ``Part``.
Not used directly but rather by its ``of()`` classmethod to denote an
``Array.of(<something>)``.
"""
item_class = None
@classmethod
[docs] def of(cls, part_class):
"""
Creates a new class with the ``item_class`` attribute properly set.
"""
copy = type(
"ArrayOf%s" % part_class.__name__,
cls.__bases__, dict(cls.__dict__)
)
copy.item_class = part_class
return copy
[docs] def render(self):
"""
Creates a composite ``struct`` format and the data to render with it.
The format and data are prefixed with a 32-bit integer denoting the
number of elements, after which each of the items in the array value
are ``render()``-ed and added to the format and data as well.
"""
value = self.value
if value is None:
value = []
fmt = [Int32.fmt]
data = [len(value)]
for item_value in value:
if issubclass(self.item_class, Primitive):
item = self.item_class(item_value)
else:
item = item_value
item_format, item_data = item.render()
fmt.extend(item_format)
data.extend(item_data)
return "".join(fmt), data
@classmethod
[docs] def parse(cls, buff, offset):
"""
Parses a raw buffer at offset and returns the resulting array value.
Starts off by `parse()`-ing the 32-bit element count, followed by
parsing items out of the buffer "count" times.
"""
count, offset = Int32.parse(buff, offset)
values = []
for _ in range(count):
value, new_offset = cls.item_class.parse(buff, offset)
values.append(value)
offset = new_offset
return values, offset
def __repr__(self):
return "[%s]" % ", ".join(map(repr, self.value))