coolamqp/clustering/cluster.py
# coding=UTF-8
from __future__ import print_function, absolute_import, division
import logging
import time
import typing as tp
import warnings
from concurrent.futures import Future
import six
from coolamqp.attaches import Publisher, AttacheGroup, Consumer, Declarer
from coolamqp.attaches.utils import close_future
from coolamqp.clustering.events import ConnectionLost, MessageReceived, \
NothingMuch, Event
from coolamqp.clustering.single import SingleNodeReconnector
from coolamqp.exceptions import ConnectionDead
from coolamqp.objects import Exchange, Message, Queue, QueueBind
from coolamqp.uplink import ListenerThread
from coolamqp.utils import monotonic
logger = logging.getLogger(__name__)
nothing_much = NothingMuch()
# If any spans are spawn here, it's Cluster's job to finish them, except for publish()
class Cluster(object):
"""
Frontend for your AMQP needs.
This has ListenerThread.
Call .start() to connect to AMQP.
It is not safe to fork() after .start() is called, but it's OK before.
:param nodes: list of nodes, or a single node. For now, only one is supported.
:param on_fail: callable/0 to call when connection fails in an
unclean way. This is a one-shot
:param extra_properties: refer to documentation in [/coolamqp/connection/connection.py]
Connection.__init__
:param log_frames: an object that supports logging each and every frame CoolAMQP sends and
receives from the broker
:type log_frames: tp.Optional[:class:`coolamqp.tracing.BaseFrameTracer`]
:param name: name to appear in log items and prctl() for the listener thread
:param on_blocked: callable to call when ConnectionBlocked/ConnectionUnblocked is received. It will be
called with a value of True if connection becomes blocked, and False upon an unblock
:param tracer: tracer, if opentracing is installed
"""
# Events you can be informed about
ST_LINK_LOST = 0 # Link has been lost
ST_LINK_REGAINED = 1 # Link has been regained
def __init__(self, nodes, # type: tp.Union[NodeDefinition, tp.List[NodeDefinition]]
on_fail=None, # type: tp.Optional[tp.Callable[[], None]]
extra_properties=None,
# type: tp.Optional[tp.List[tp.Tuple[bytes, tp.Tuple[bytes, str]]]]
log_frames=None,
name=None, # type: tp.Optional[str]
on_blocked=None, # type: tp.Callable[[bool], None],
tracer=None # type: opentracing.Traccer
):
from coolamqp.objects import NodeDefinition
if isinstance(nodes, NodeDefinition):
nodes = [nodes]
if len(nodes) > 1:
raise NotImplementedError(u'Multiple nodes not supported yet')
if tracer is not None:
try:
import opentracing
except ImportError:
raise RuntimeError('tracer given, but opentracing is not installed!')
self.started = False # type: bool
self.tracer = tracer
self.name = name or 'CoolAMQP' # type: str
self.node, = nodes # type: NodeDefinition
self.extra_properties = extra_properties
self.log_frames = log_frames
self.on_blocked = on_blocked # type: tp.Optional[tp.Callable[[bool], None]]
self.connected = False # type: bool
self.listener = None # type: BaseListener
self.attache_group = None # type: AttacheGroup
self.events = None # type: six.moves.queue.Queue
self.snr = None # type: SingleNodeReconnector
self.pub_tr = None # type: Publisher
self.pub_na = None # type: Publisher
self.decl = None # type: Declarer
if on_fail is not None:
def decorated():
if not self.listener.terminating and self.connected:
on_fail()
self.on_fail = decorated
else:
self.on_fail = None
def bind(self, queue, exchange, routing_key, persistent=False, span=None,
dont_trace=False):
"""
Bind a queue to an exchange
"""
if span is not None and not dont_trace:
child_span = self._make_span('bind', span)
else:
child_span = None
fut = self.decl.declare(QueueBind(queue, exchange, routing_key),
persistent=persistent,
span=child_span)
return close_future(fut, child_span)
def declare(self, obj, # type: tp.Union[Queue, Exchange]
persistent=False, # type: bool
span=None, # type: tp.Optional[opentracing.Span]
dont_trace=False # type: bool
): # type: (...) -> concurrent.futures.Future
"""
Declare a Queue/Exchange
:param obj: Queue/Exchange object
:param persistent: should it be redefined upon reconnect?
:param span: optional parent span, if opentracing is installed
:param dont_trace: if True, a span won't be output
:return: Future
"""
if span is not None and not dont_trace:
child_span = self._make_span('declare', span)
else:
child_span = None
fut = self.decl.declare(obj, persistent=persistent, span=child_span)
return close_future(fut, child_span)
def drain(self, timeout, span=None, dont_trace=False): # type: (float) -> Event
"""
Return an Event.
:param timeout: time to wait for an event. 0 means return immediately. None means block forever
:param span: optional parent span, if opentracing is installed
:param dont_trace: if True, this span won't be traced
:return: an Event instance. NothingMuch is returned when there's nothing within a given timoeout
"""
def fetch():
try:
if not timeout:
return self.events.get_nowait()
else:
return self.events.get(True, timeout)
except six.moves.queue.Empty:
return nothing_much
if span is not None and not dont_trace:
from opentracing import tags
parent_span = self.tracer.start_active_span('AMQP call',
child_of=span,
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.DATABASE_TYPE: 'amqp',
tags.DATABASE_STATEMENT: 'drain'
})
with parent_span:
return fetch()
else:
return fetch()
def consume(self, queue, on_message=None, span=None,
dont_trace=False, # type: bool
*args, **kwargs):
# type: (Queue, tp.Callable[[MessageReceived], None]) -> tp.Tuple[Consumer, Future]
"""
Start consuming from a queue.
args and kwargs will be passed to Consumer constructor (coolamqp.attaches.consumer.Consumer).
Don't use future_to_notify - it's done here!
Take care not to lose the Consumer object - it's the only way to cancel a consumer!
:param queue: Queue object, being consumed from right now.
Note that name of anonymous queue might change at any time!
:param on_message: callable that will process incoming messages
if you leave it at None, messages will be .put into self.events
:param span: optional span, if opentracing is installed
:param dont_trace: if True, this won't output a span
:return: a tuple (Consumer instance, and a Future), that tells, when consumer is ready
"""
if span is not None and not dont_trace:
child_span = self._make_span('consume', span)
else:
child_span = None
fut = Future()
fut.set_running_or_notify_cancel() # it's running right now
on_message = on_message or (
lambda msg: self.events.put_nowait(MessageReceived(msg)))
con = Consumer(queue, on_message, future_to_notify=fut, span=span, *args,
**kwargs)
self.attache_group.add(con)
return con, close_future(fut, child_span)
def delete_queue(self, queue): # type: (coolamqp.objects.Queue) -> Future
"""
Delete a queue.
:param queue: Queue instance that represents what to delete
:return: a Future (will succeed with None or fail with AMQPError)
"""
return self.decl.delete_queue(queue)
def _make_span(self, call, span):
try:
from opentracing import tags
except ImportError:
pass
else:
return self.tracer.start_span('AMQP call',
child_of=span,
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.DATABASE_TYPE: 'amqp',
tags.DATABASE_STATEMENT: call
})
def publish(self, message, # type: Message
exchange=None, # type: tp.Union[Exchange, str, bytes]
routing_key=u'', # type: tp.Union[str, bytes]
tx=None, # type: tp.Optional[bool]
confirm=None, # type: tp.Optional[bool]
span=None, # type: tp.Optional[opentracing.Span]
dont_trace=False # type: bool
): # type: (...) -> tp.Optional[Future]
"""
Publish a message.
:param message: Message to publish
:param exchange: exchange to use. Default is the "direct" empty-name exchange.
:param routing_key: routing key to use
:param confirm: Whether to publish it using confirms/transactions.
If you choose so, you will receive a Future that can be used
to check it broker took responsibility for this message.
Note that if tx if False, and message cannot be delivered to broker at once,
it will be discarded
:param tx: deprecated, alias for confirm
:param span: optionally, current span, if opentracing is installed
:param dont_trace: if set to True, a span won't be generated
:return: Future to be finished on completion or None, is confirm/tx was not chosen
"""
if self.tracer is not None and not dont_trace:
span = self._make_span('publish', span)
if isinstance(exchange, Exchange):
exchange = exchange.name.encode('utf8')
elif exchange is None:
exchange = b''
elif isinstance(exchange, six.text_type):
exchange = exchange.encode('utf8')
if isinstance(routing_key, six.text_type):
routing_key = routing_key.encode('utf8')
if tx is not None: # confirm is a drop-in replacement. tx is unfortunately named
warnings.warn(u'Use confirm kwarg instead', DeprecationWarning)
if confirm is not None:
raise RuntimeError(
u'Using both tx= and confirm= at once does not make sense')
elif confirm is not None:
tx = confirm
else:
tx = False
try:
if tx:
clb = self.pub_tr
else:
clb = self.pub_na
return clb.publish(message, exchange, routing_key, span)
except Publisher.UnusablePublisher:
raise NotImplementedError(
u'Sorry, this functionality is not yet implemented!')
def start(self, wait=True, timeout=10.0): # type: (bool, float, bool) -> None
"""
Connect to broker. Initialize Cluster.
Only after this call is Cluster usable.
It is not safe to fork after this.
:param wait: block until connection is ready
:param timeout: timeout to wait until the connection is ready. If it is not, a
ConnectionDead error will be raised
:raise RuntimeError: called more than once
:raise ConnectionDead: failed to connect within timeout
"""
if self.started:
raise RuntimeError(u'[%s] This was already called!' % (self.name,))
self.started = True
self.listener = ListenerThread(name=self.name)
self.attache_group = AttacheGroup()
self.events = six.moves.queue.Queue() # for coolamqp.clustering.events.*
self.snr = SingleNodeReconnector(self.node, self.attache_group,
self.listener, self.extra_properties,
self.log_frames, self.name)
self.snr.on_fail.add(lambda: self.events.put_nowait(ConnectionLost()))
if self.on_fail is not None:
self.snr.on_fail.add(self.on_fail)
if self.on_blocked is not None:
self.snr.on_blocked.add(self.on_blocked)
# Spawn a transactional publisher and a noack publisher
self.pub_tr = Publisher(Publisher.MODE_CNPUB, self)
self.pub_na = Publisher(Publisher.MODE_NOACK, self)
self.decl = Declarer(self)
self.attache_group.add(self.pub_tr)
self.attache_group.add(self.pub_na)
self.attache_group.add(self.decl)
self.listener.init()
self.listener.start()
self.snr.connect(timeout=timeout)
if wait:
# this is only going to take a short amount of time, so we're fine with polling
start_at = monotonic()
while not self.connected and monotonic() - start_at < timeout:
time.sleep(0.1)
if not self.connected:
raise ConnectionDead(
'[%s] Could not connect within %s seconds' % (self.name, timeout,))
def shutdown(self, wait=True): # type: (bool) -> None
"""
Terminate all connections, release resources - finish the job.
:param wait: block until this is done
:raise RuntimeError: if called without start() being called first
"""
self.connected = False
if not self.started:
raise RuntimeError(u'shutdown without start')
logger.info('[%s] Commencing shutdown', self.name)
self.listener.terminate()
if wait:
self.listener.join()