coolamqp/attaches/consumer.py
# coding=UTF-8
from __future__ import absolute_import, division, print_function
import io
import logging
import typing as tp
import uuid
from concurrent.futures import Future
from coolamqp.attaches.channeler import Channeler, ST_ONLINE, ST_OFFLINE
from coolamqp.exceptions import AMQPError
from coolamqp.framing.definitions import ChannelOpenOk, BasicConsume, \
BasicConsumeOk, QueueDeclare, QueueDeclareOk, ExchangeDeclare, \
ExchangeDeclareOk, \
QueueBind, QueueBindOk, ChannelClose, BasicDeliver, BasicCancel, \
BasicAck, BasicReject, RESOURCE_LOCKED, BasicCancelOk, BasicQos, BasicQosOk
from coolamqp.framing.frames import AMQPBodyFrame, AMQPHeaderFrame
from coolamqp.objects import Callable
from coolamqp.uplink import HeaderOrBodyWatch, MethodWatch
logger = logging.getLogger(__name__)
EMPTY_MEMORYVIEW = memoryview(b'') # for empty messages
class BodyReceiveMode(object):
# ZC - zero copy
# C - copy (copies every byte once)
BYTES = 0 # message.body will be a single bytes object
# this will gather frames as memoryviews, and b''.join() them upon
# receiving last frame
# this is C
MEMORYVIEW = 1 # message.body will be returned as a memoryview object
# this is ZC for small messages, and C for multi-frame ones
# think less than 800B, since 2048 is the buffer for socket recv, and an
# AMQP frame (or many frames!) have to fit there
LIST_OF_MEMORYVIEW = 2 # message.body will be returned as list of
# memoryview objects these constitute received pieces. this is always ZC
class Consumer(Channeler):
"""
This object represents a consumer in the system.
Consumer may reside on any AMQP broker, this is to be decided by CoolAMQP.
Consumer, when created, has the state of ST_SYNCING. CoolAMQP will
try to declare the consumer where it makes most sense for it to be.
If it succeeds, the consumer will enter state ST_ONLINE, and callables
on_start will be called. This means that broker has confirmed that this
consumer is operational and receiving messages.
Note that does not attempt to cancel consumers, or any of such nonsense.
Having a channel per consumer gives you the unique possibility of simply
closing the channel. Since this implies cancelling the consumer, here you
go.
WARNING: READ DEFAULT VALUES IN CONSTRUCTOR! TAKE CARE WHAT YOUR CONSUMERS
DO!
You can subscribe to be informed when the consumer is cancelled (for any
reason, server or client side) with:
>>> con, fut = Cluster.consume(...)
>>> def im_called_on_cancel_for_any_reason(): # must have arity of 0
>>> ..
>>> con.on_cancel.add(im_called_on_cancel_for_any_reason)
>>> con.cancel()
Or, if RabbitMQ is in use, you can be informed upon a Consumer Cancel
Notification:
>>> con.on_broker_cancel.add(im_cancelled_by_broker)
:param queue: Queue object, being consumed from right now.
Note that name of anonymous queue might change at any time!
:type queue: coolamqp.objects.Queue
:param on_message: callable that will process incoming messages
:type on_message: callable(ReceivedMessage instance)
:param span: optional span, if opentracing is installed
:param no_ack: Will this consumer require acknowledges from messages?
:type no_ack: bool
:param qos: a tuple of (prefetch size, prefetch window) for this
consumer, or an int (prefetch window only).
If an int is passed, prefetch size will be set to 0 (which means
undefined), and this int will be used for prefetch window
:type qos: tuple(int, int) or tuple(None, int) or int
:param cancel_on_failure: Consumer will cancel itself when link goes
down
:type cancel_on_failure: bool
:param future_to_notify: Future to succeed when this consumer goes
online for the first time.
This future can also raise with AMQPError if
it fails to.
:type future_to_notify: concurrent.futures.Future
:param fail_on_first_time_resource_locked: When consumer is declared
for the first time, and RESOURCE_LOCKED is encountered, it will
fail the future with ResourceLocked, and consumer will cancel
itself.
By default it will retry until success is made.
If the consumer doesn't get the chance to be declared - because
of a connection fail - next reconnect will consider this to be
SECOND declaration, ie. it will retry ad infinitum
:type fail_on_first_time_resource_locked: bool
:param body_receive_mode: how should message.body be received. This
has a performance impact
:type body_receive_mode: a property of BodyReceiveMode
"""
__slots__ = ('queue', 'no_ack', 'on_message', 'cancelled', 'receiver',
'attache_group', 'channel_close_sent', 'qos', 'qos_update_sent',
'future_to_notify', 'future_to_notify_on_dead',
'fail_on_first_time_resource_locked', 'cancel_on_failure',
'body_receive_mode', 'consumer_tag', 'on_cancel', 'on_broker_cancel',
'hb_watch', 'deliver_watch', 'span')
def __init__(self, queue, on_message, span=None,
no_ack=True, qos=None,
cancel_on_failure=False,
future_to_notify=None,
fail_on_first_time_resource_locked=False,
body_receive_mode=BodyReceiveMode.BYTES
):
"""
Note that if you specify QoS, it is applied before basic.consume is
sent. This will prevent the broker from hammering you into oblivion
with a mountain of messages.
"""
super(Consumer, self).__init__()
self.span = span
self.queue = queue
self.no_ack = no_ack
self.on_message = on_message
# consumer?
self.receiver = None # MessageReceiver instance
self.attache_group = None # attache group this belongs to.
self.channel_close_sent = False # for avoiding situations where ChannelClose is sent twice
# if this is not None, then it has an attribute
# on_cancel_customer(Consumer instance)
self.qos = _qosify(qos)
self.qos_update_sent = False # QoS was not sent to server
self.future_to_notify = future_to_notify
self.future_to_notify_on_dead = None # .cancel
self.fail_on_first_time_resource_locked = fail_on_first_time_resource_locked
self.cancel_on_failure = cancel_on_failure
self.body_receive_mode = body_receive_mode
self.consumer_tag = None
self.on_cancel = Callable(
oneshots=True) #: public, called on cancel for any reason
self.on_broker_cancel = Callable(
oneshots=True) #: public, called on Customer Cancel Notification
# (RabbitMQ)
def set_qos(self, prefetch_size, prefetch_count): # type: (int, int) -> None
"""
Set new QoS for this consumer.
:param prefetch_size: prefetch in octets
:param prefetch_count: prefetch in whole messages
"""
if self.state == ST_ONLINE:
self.method(BasicQos(prefetch_size or 0, prefetch_count, False))
self.qos = prefetch_size or 0, prefetch_count
def cancel(self): # type: () -> Future
"""
Cancel the customer.
.ack() or .nack() for messages from this customer will have no effect.
:return: a Future to tell when it's done. The future will always
succeed - sooner, or later.
NOTE: Future is OK'd when entire channel is destroyed
"""
if self.future_to_notify_on_dead is not None:
# we cancelled it earlier
return self.future_to_notify_on_dead
else:
self.future_to_notify_on_dead = Future()
self.future_to_notify_on_dead.set_running_or_notify_cancel()
self.cancelled = True
self.on_cancel()
# you'll blow up big next time you try to use this consumer if you
# can't cancel, but just close
if self.consumer_tag is not None:
if not self.channel_close_sent and self.state == ST_ONLINE:
self.method_and_watch(BasicCancel(self.consumer_tag, False),
[BasicCancelOk],
self.on_close)
else:
if not self.channel_close_sent and self.state == ST_ONLINE:
self.method(ChannelClose(0, b'cancelling', 0, 0))
self.channel_close_sent = True
if self.attache_group is not None:
self.attache_group.on_cancel_customer(self)
return self.future_to_notify_on_dead
def on_operational(self, operational): # type: (bool) -> None
super(Consumer, self).on_operational(operational)
if operational:
self.channel_close_sent = False
self.receiver = MessageReceiver(self)
# notify the future
if self.future_to_notify is not None:
self.future_to_notify.set_result(None)
self.future_to_notify = None
else:
self.hb_watch.cancel()
self.deliver_watch.cancel()
self.receiver.on_gone()
self.receiver = None
def on_close(self, payload=None):
# type: (tp.Optional[coolamqp.framing.base.AMQPMethodPayload]) -> None
"""
Handle closing the channel. It sounds like an exception...
This is done in two steps:
1. self.state <- ST_OFFLINE, on_event(EV_OFFLINE) upon detecting
that no more messages will
be there
2. self.channel_id <- None, channel is returned to Connection - c
hannel has been physically torn down
Note, this can be called multiple times, and eventually with None.
"""
if self.cancel_on_failure and (not self.cancelled):
logger.debug(
'Consumer is cancel_on_failure and failure seen, True->cancelled')
self.cancelled = True
self.on_cancel()
if self.state == ST_ONLINE:
# The channel has just lost operationality!
self.on_operational(False)
self.state = ST_OFFLINE
# deliver and head/body watch will clean up after themselves
should_retry = False
if isinstance(payload, BasicCancel):
# Consumer Cancel Notification - by RabbitMQ
# send them back those memoryviews :D
# on_close is a one_shot watch. We need to re-register it now.
self.register_on_close_watch()
if not self.channel_close_sent:
self.methods([BasicCancelOk(payload.consumer_tag),
ChannelClose(0, b'Received basic.cancel', 0, 0)])
self.channel_close_sent = True
self.cancelled = True # wasn't I?
self.on_cancel()
self.on_broker_cancel()
return
if isinstance(payload, BasicCancelOk):
# OK, our cancelling went just fine - proceed with teardown
self.register_on_close_watch()
if not self.channel_close_sent:
self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0))
self.channel_close_sent = True
return
if isinstance(payload, ChannelClose):
rc = payload.reply_code
if rc == RESOURCE_LOCKED:
# special handling
# This is because we might be reconnecting, and the broker
# doesn't know yet that we are dead.
# it won't release our exclusive channels, and that's why
# we'll get RESOURCE_LOCKED.
if self.fail_on_first_time_resource_locked:
# still, a RESOURCE_LOCKED on a first declaration ever
# suggests something is very wrong
self.cancelled = True
self.on_cancel()
else:
# Do not notify the user, and retry at will.
# Do not zero the future - we will need to later confirm
# it, so it doesn't leak.
should_retry = True
if self.future_to_notify:
err = AMQPError(payload)
if self.span is not None:
from opentracing import logs, tags
self.span.set_tag(tags.ERROR, True)
self.span.log_kv({logs.EVENT: tags.ERROR,
logs.ERROR_OBJECT: err,
logs.ERROR_KIND: type(err)})
self.span = None
self.future_to_notify.set_exception(err)
self.future_to_notify = None
logger.debug('Notifying connection closed with %s', payload)
# We might not want to throw the connection away.
should_retry = should_retry and (not self.cancelled)
old_con = self.connection
super(Consumer, self).on_close(
payload) # this None's self.connection and returns port
self.fail_on_first_time_resource_locked = False
if self.future_to_notify_on_dead: # notify it was cancelled
logger.info('Consumer successfully cancelled')
self.future_to_notify_on_dead.set_result(None)
self.future_to_notify_on_dead = None
if should_retry:
if self.span is not None:
from opentracing import logs
self.span.log_kv({logs.EVENT: 'Retrying'})
if old_con.state == ST_ONLINE:
logger.info('Retrying with %s', self.queue.name)
self.attach(old_con)
def on_delivery(self, sth):
"""
Callback for delivery-related shit
:param sth: AMQPMethodFrame WITH basic-deliver, AMQPHeaderFrame or
AMQPBodyFrame
"""
if self.receiver is None:
# dead, cancelled, whatever
return
if isinstance(sth, BasicDeliver):
self.receiver.on_basic_deliver(sth)
elif isinstance(sth, AMQPBodyFrame):
self.receiver.on_body(sth.data)
elif isinstance(sth, AMQPHeaderFrame):
self.receiver.on_head(sth)
# No point in listening for more stuff, that's all the watches
# even listen for
def on_setup(self, payload): # type: (coolamqp.framing.base.AMQPMethodPayload) -> None
"""Called with different kinds of frames - during setup"""
if isinstance(payload, ChannelOpenOk):
# Do we need to declare the exchange?
if self.queue.exchange is not None:
self.connection.method_and_watch(
self.channel_id,
ExchangeDeclare(self.queue.exchange.name.encode('utf8'),
self.queue.exchange.type,
False,
self.queue.exchange.durable,
self.queue.exchange.auto_delete,
False,
False,
[]),
ExchangeDeclareOk,
self.on_setup
)
else:
self.on_setup(ExchangeDeclareOk())
elif isinstance(payload, ExchangeDeclareOk):
# Declare the queue
name = b'' if self.queue.anonymous else self.queue.name
self.connection.method_and_watch(
self.channel_id,
QueueDeclare(
name,
False,
self.queue.durable,
self.queue.exclusive,
self.queue.auto_delete,
False,
[]
),
QueueDeclareOk,
self.on_setup
)
elif isinstance(payload, QueueDeclareOk):
# did we need an anonymous name?
if not self.queue.name:
self.queue.name = payload.queue.tobytes()
queue_declared = False
# We need any form of binding.
if self.queue.exchange is not None:
if self.queue.exchange.type != b'topic':
queue_declared = True
self.method_and_watch(
QueueBind(
self.queue.name,
self.queue.exchange.name.encode('utf8'),
b'', False, []),
QueueBindOk,
self.on_setup
)
if not queue_declared:
# default exchange, pretend it was bind ok
self.on_setup(QueueBindOk())
elif isinstance(payload, QueueBindOk):
if self.qos is not None:
self.method_and_watch(
BasicQos(self.qos[0], self.qos[1], False),
BasicQosOk,
self.on_setup
)
else:
self.on_setup(BasicQosOk()) # pretend QoS went ok
elif isinstance(payload, BasicQosOk):
self.consumer_tag = uuid.uuid4().hex.encode(
'utf8') # str in py2, unicode in py3
self.method_and_watch(
BasicConsume(self.queue.name, self.consumer_tag,
False, self.no_ack, self.queue.exclusive, False,
[]),
BasicConsumeOk,
self.on_setup
)
elif isinstance(payload, BasicConsumeOk):
# AWWW RIGHT~!!! We're good.
consumer_tag = self.consumer_tag
if self.span is not None:
self.span.set_tag('consumer.tag', consumer_tag)
self.on_operational(True)
# Register watches for receiving shit
# this is multi-shot by default
self.hb_watch = HeaderOrBodyWatch(self.channel_id,
self.on_delivery)
self.connection.watch(self.hb_watch)
# multi-shot watches need manual cleanup!
self.deliver_watch = MethodWatch(self.channel_id, BasicDeliver,
self.on_delivery)
self.deliver_watch.oneshot = False
self.connection.watch(self.deliver_watch)
self.state = ST_ONLINE
if self.cancelled:
self.method(ChannelClose(0, b'Received basic.cancel-ok', 0, 0))
self.channel_close_sent = True
self.state = ST_OFFLINE
return
# resend QoS, in case of sth
if self.qos is not None:
self.set_qos(self.qos[0], self.qos[1])
def _qosify(qos):
if qos is not None:
if isinstance(qos, int):
qos = 0, qos
elif qos[0] is None:
qos = 0, qos[1] # prefetch_size=0=undefined
return qos
class MessageReceiver(object):
"""This is an object that is used to received messages.
It maintains all the state, and is used to ack/nack messages as well.
This object is TORN DOWN when a consumer goes offline,
and is recreated when it goes online.
This is called by consumer upon receiving different parts of the message,
and may opt to kill the connection on bad framing with
self.consumer.connection.send(None)
"""
__slots__ = ('consumer', 'state', 'bdeliver', 'header', 'body', 'data_to_go',
'message_size', 'offset', 'acks_pending', 'recv_mode')
def __init__(self, consumer): # type: (Consumer) -> None
self.consumer = consumer
self.state = 0 # 0 - waiting for Basic-Deliver
# 1 - waiting for Header
# 2 - waiting for Body [all]
# 3 - gone!
self.bdeliver = None # payload of Basic-Deliver
self.header = None # AMQPHeaderFrame
if consumer.body_receive_mode == BodyReceiveMode.MEMORYVIEW:
self.body = None # None is an important sign - first piece of
# message
else:
self.body = [] # list of payloads
self.data_to_go = None # set on receiving header, how much bytes we
# need yet
self.message_size = None # in bytes, of currently received message
self.offset = 0 # used only in MEMORYVIEW mode - pointer to self.body
# (which would be a buffer)
self.acks_pending = set() # list of things to ack/reject
self.recv_mode = consumer.body_receive_mode
# if BYTES, pieces (as mvs) are received into .body and b''.join()ed
# at the end
# if MEMORYVIEW:
# upon first piece, if it's a single-frame message,
# it's returned at once
# if multiframe, self.body is made into a buffer
# and further are received into it
# if LIST_OF_MEMORYVIEW, pieces (as mvs) are stored into .body, and
# that's returned
def on_gone(self):
"""Called by Consumer to inform upon discarding this receiver"""
self.state = 3
def confirm(self, delivery_tag, success): # type: (int, tp.Callable[[], None]) -> None
"""
This crafts a constructor for confirming messages.
This should return a callable/0, whose calling will ACK or REJECT the
message.
Calling it multiple times should have no ill effect.
If this receiver is long gone,
:param delivery_tag: delivery_tag to ack
:param success: True if ACK, False if REJECT
:return: callable/0
"""
def clbl():
if self.state == 3:
return # Gone!
if self.consumer.cancelled:
return # cancelled!
if delivery_tag not in self.acks_pending:
return # already confirmed/rejected
if success:
self.consumer.method(BasicAck(delivery_tag, False))
else:
self.consumer.method(BasicReject(delivery_tag, True))
return clbl
def on_head(self, frame):
assert self.state == 1
self.header = frame
self.message_size = self.data_to_go = frame.body_size
self.state = 2
if not self.header.body_size:
# An empty message is no common guest. It won't have a BODY field
# though...
self.on_body(EMPTY_MEMORYVIEW) # trigger it manually
def on_basic_deliver(self, payload):
assert not self.state
self.bdeliver = payload
self.state = 1
def on_body(self, payload):
""":type payload: buffer"""
assert self.state == 2
self.data_to_go -= len(payload)
if self.recv_mode == BodyReceiveMode.MEMORYVIEW:
if self.body is not None:
# continuing a multipart
self.body[self.offset:self.offset + len(payload)] = payload
self.offset += len(payload)
else:
# new one
if not self.data_to_go: # special case - single frame message
self.body = payload
else:
self.body = memoryview(bytearray(self.message_size))
self.body[0:len(payload)] = payload
self.offset = len(payload)
else: # BYTES and LIST_OF_MEMORYVIEW
self.body.append(payload)
assert self.data_to_go >= 0
if not self.data_to_go:
ack_expected = not self.consumer.no_ack
# Message A-OK!
if ack_expected:
self.acks_pending.add(self.bdeliver.delivery_tag)
from coolamqp.objects import ReceivedMessage
# Does body need preprocessing?
body = self.body
if self.recv_mode == BodyReceiveMode.BYTES:
if len(self.body) == 1:
# common case :)
body = self.body[0].tobytes()
else:
# since b''.join() with list comprehension and .tobytes()
# would create
# an extra copy of string
bio = io.BytesIO()
for mv in body:
bio.write(mv)
body = bio.getvalue()
# if MEMORYVIEW, then it's already ok
rm = ReceivedMessage(
body,
self.bdeliver.exchange,
self.bdeliver.routing_key,
self.header.properties,
self.bdeliver.delivery_tag,
None if self.consumer.no_ack else self.confirm(
self.bdeliver.delivery_tag, True),
None if self.consumer.no_ack else self.confirm(
self.bdeliver.delivery_tag, False),
)
self.consumer.on_message(rm)
self.state = 0
# at this point it's safe to clear the body
if self.recv_mode == BodyReceiveMode.MEMORYVIEW:
self.body = None
else:
self.body = []