docs/clients/single.rst

==========================
The Single Consumer Client
==========================

The ``SingleConsumer`` client class is used when you want to consume messages
but don't need to coordinate consumer instances amongst themselves.  It's
importable via the ``kiel.clients`` module and provides a ``consume()``
method capable of starting at the beginning or end of a topic *or* at a given
``datetime`` or ``timedelta``.

.. code-block:: python

  from kiel import clients
  from tornado import gen

  consumer = clients.SingleConsumer(
      ["kafka01", "kafka02"],
      deserializer=None,
      max_wait_time=1000,  # in milliseconds
      min_bytes=1,
      max_bytes=(1024 * 1024),
  )

  @gen.coroutine
  def run():
      yield consumer.connect()
      msgs = yield consumer.consume("example.topic")
      for msg in msgs:
          print(msg)


The only *required* constructor parameter is the list of bootstrap broker
hosts.
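
For instance, a bare-bones consumer that relies on the defaults for everything
else needs nothing more than the broker list:

.. code-block:: python

  from kiel import clients

  # only the bootstrap broker host list is required, every other
  # constructor argument falls back to its default
  consumer = clients.SingleConsumer(["kafka01", "kafka02"])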


Where to Start
--------------

Other than the topic to consume, the ``consume()`` method also takes an
optional ``start`` parameter specifying where in the topic's history to begin.

.. note::

   The ``start`` parameter is honored in only two cases:

   * when consuming from a topic for the first time
   * when an "offset out of range" error is encountered

There are four different possible kinds of values:

* ``SingleConsumer.END`` **(default)**

  This denotes the tail end of the topic; the ``consume()`` call will return
  messages once some are available.

* ``SingleConsumer.BEGINNING``

  The very beginning of a topic (often 0).  Useful for re-processing topics.

* ``datetime``

  Starts consuming a topic at roughly the point it was at a given time (in
  UTC).

* ``timedelta``

  Starts consuming a topic at roughly the point it was at a *relative* time
  (a short sketch follows the warning below).


.. warning::

   The time-based options rely on epoch seconds and are vulnerable to clock
   skew between brokers and client servers.
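
As a sketch of the time-based options (the topic name here is just a
placeholder), starting from roughly an hour ago could look like this:

.. code-block:: python

  import datetime

  from kiel import clients
  from tornado import gen

  consumer = clients.SingleConsumer(["kafka01"])

  @gen.coroutine
  def replay_recent():
      yield consumer.connect()

      # a timedelta is interpreted relative to "now", a datetime as an
      # absolute (UTC) point in time
      msgs = yield consumer.consume(
          "example.topic", start=datetime.timedelta(hours=1)
      )
      for msg in msgs:
          print(msg)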


The Deserializer
----------------

The JSON Default
~~~~~~~~~~~~~~~~

By default ``json.loads`` is used as the deserializer.  This works in conjunction
with the default ``json.dumps`` serializer on the ``Producer`` class:

.. code-block:: python

  import random

  from kiel import clients
  from tornado import gen

  producer = clients.Producer(["kafka01"])
  consumer = clients.SingleConsumer(["kafka01"])

  @gen.coroutine
  def produce():
      yield producer.connect()
      while True:
          yield producer.produce(
              "example.colors", {"color": random.choice(["blue", "red"])}
          )

  @gen.coroutine
  def consume():
      yield consumer.connect()
      while True:
          msgs = yield consumer.consume("example.colors")
          for msg in msgs:
              print(msg["color"])

Customizing
~~~~~~~~~~~

Deserialization can be customized via the ``deserializer`` constructor parameter.
The given callable will be passed a message's value as its single argument.

A trivial example where messages are rot-13 encoded:

.. code-block:: python

  import codecs

  from kiel import clients
  from tornado import gen


  def deserialize(value):
      return codecs.decode(value, "rot_13")

  consumer = clients.SingleConsumer(["kafka01"], deserializer=deserialize)

  @gen.coroutine
  def consume():
      yield consumer.connect()
      while True:
          msgs = yield consumer.consume("example.colors")
          for msg in msgs:
              # each message is now the rot-13 decoded string
              print(msg)


Limiting Responses
------------------

Max and Min Bytes
~~~~~~~~~~~~~~~~~

The size window of responses can be controlled via the ``min_bytes`` and
``max_bytes`` constructor arguments.  These direct the Kafka brokers to not
respond until *at least* ``min_bytes`` of data is available and to construct
responses *no greater than* ``max_bytes``.

.. note::

   The ``max_bytes`` directive isn't *exact*, as it only limits the data in
   the partition clauses of responses; there will still be other overhead.
   The Kafka protocol does not recognize an overall "max bytes" setting but
   has a *per-partition* maximum, which the consumer calculates as
   ``max_bytes`` divided by the number of partitions.

This can be helpful for consumers that start from the beginning of a large
topic and must throttle the otherwise-massive initial responses.

.. code-block:: python

  from kiel import clients
  from tornado import gen

  consumer = clients.SingleConsumer(
      ["kafka01"],
      min_bytes=1024,
      max_bytes=(10 * 1024 * 1024)
  )

  @gen.coroutine
  def start_from_beginning():
      yield consumer.connect()

      msgs = yield consumer.consume("example.topic", start=consumer.BEGINNING)
      while msgs:
          # process msgs, etc.
          msgs = yield consumer.consume("example.topic", start=consumer.BEGINNING)

Response Wait Time
~~~~~~~~~~~~~~~~~~

The ``max_wait_time`` constructor argument can be used to tell brokers how long
the consumer is willing to wait for data.  If ``max_wait_time`` is reached
before data is available, the broker will respond with a retriable "timeout"
error code and the ``consume()`` call will return an empty list.
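
A sketch of a polling loop built around that behavior (the wait time and topic
name here are arbitrary):

.. code-block:: python

  from kiel import clients
  from tornado import gen

  # ask the brokers to wait at most half a second for data
  consumer = clients.SingleConsumer(["kafka01"], max_wait_time=500)

  @gen.coroutine
  def poll():
      yield consumer.connect()
      while True:
          msgs = yield consumer.consume("example.topic")
          if not msgs:
              # the wait time elapsed with no data available, poll again
              continue
          for msg in msgs:
              print(msg)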


Compression
-----------

Kafka bakes compression into the wire protocol itself, so the consumer classes
take care of decompression for you.

.. warning::

   Naturally, if you're using compression schemes with external dependencies
   (i.e. non-gzip schemes) when producing messages, your consumers must *also*
   have those dependencies installed!
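
By way of illustration, a minimal sketch: the consumer below needs no extra
configuration to read a compressed topic.  Note that the ``compression``
argument on the producer side is an assumption here; check the producer docs
for the exact parameter name and accepted values.

.. code-block:: python

  from kiel import clients
  from tornado import gen

  # hypothetical: the exact producer-side compression argument may differ
  producer = clients.Producer(["kafka01"], compression="gzip")

  # nothing compression-related is needed on the consuming side,
  # decompression happens automatically
  consumer = clients.SingleConsumer(["kafka01"])

  @gen.coroutine
  def consume():
      yield consumer.connect()
      msgs = yield consumer.consume("example.topic")
      for msg in msgs:
          print(msg)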