Source code for kiel.zookeeper.party
import logging
from kazoo import exceptions
log = logging.getLogger(__name__)
[docs]class Party(object):
"""
Represents a "party" recipe in Zookeeper.
A party is a recipe where various clients "join" or "leave" (a loss of
connection constituting a "leave") and each member is notified when
membership changes.
This is used in the Zookeeper-based ``GroupedConsumer`` in order to
determine who and how many hosts to divvy up partitions to.
"""
def __init__(self, client, member_name, path, on_change):
self.client = client
self.member_name = member_name
self.path = path
self.on_change = on_change
[docs] def start(self):
"""
Simple method that sets up the membership change callback.
Expected to be called by potential members before the `join()` method.
"""
self.client.ensure_path(self.path)
@self.client.ChildrenWatch(self.path)
def member_change(members):
self.on_change(members)
[docs] def join(self):
"""
Establishes the client as a "member" of the party.
This is done by creating an ephemeral child node of the party's root
path unique to this member. If the path of the child node exists but
this client isn't the owner, the node is re-created in a transaction
to establish ownership.
.. note::
It is important that the child node is *ephemeral* so that lost
connections are indistinguishable from "leaving" the party.
"""
log.info("Joining %s party as %s", self.path, self.member_name)
path = "/".join([self.path, self.member_name])
znode = self.client.exists(path)
if not znode:
log.debug("ZNode at %s does not exist, creating new one.", path)
self.client.create(path, ephemeral=True, makepath=True)
elif znode.owner_session_id != self.client.client_id[0]:
log.debug("ZNode at %s not owned by us, recreating.", path)
txn = self.client.transaction()
txn.delete(path)
txn.create(path, ephemeral=True)
txn.commit()
[docs] def leave(self):
"""
Simple method that marks the client as having left the party.
A simple matter of deleting the ephemeral child node associated with
the client. If the node is already deleted this becomes a no-op.
"""
log.info("Leaving %s party as %s", self.path, self.member_name)
path = "/".join([self.path, self.member_name])
try:
self.client.delete(path)
except exceptions.NoNodeError:
pass