viatoriche/microservices

View on GitHub
microservices/queues/service.py

Summary

Maintainability
D
2 days
Test Coverage
import socket
from multiprocessing.pool import ThreadPool
from time import sleep

import six

from kombu import Connection, Consumer, Queue
from kombu.exceptions import MessageStateError
from kombu.utils import nested
from microservices.helpers.logs import InstanceLogger
from microservices.utils import get_logger

_logger = get_logger(__name__)


class HandlerError(Exception):
    pass


class DeferredMessage(object):
    _methods_for_callbacks = {
        'ack', 'reject', 'requeue', 'reject_log_error',
        'ack_log_error',
    }

    def __init__(self, message, deferred_callbacks):
        self.message = message
        self.deferred_callbacks = deferred_callbacks

    @property
    def with_deferred_callbacks(self):
        return self.deferred_callbacks is not None

    def __getattr__(self, item):
        entity = getattr(self.message, item)
        if self.with_deferred_callbacks:
            if item in self._methods_for_callbacks:
                return lambda *args, **kwargs: self.deferred_callbacks.append(
                    lambda: entity(*args, **kwargs)
                )
            else:
                return entity
        else:
            return entity


@six.python_2_unicode_compatible
class Rule(object):
    """Rule"""

    def __init__(self, name, handler, logger, autoack=True,
                 deferred_callbacks=None, pool=None,
                 **options):
        """Initialization

        :param name: name of queue
        :param handler: handle for queue
        :param autoack: if true, call message.ack()
        """
        self.handler = handler
        self.name = name
        self.options = options
        self.autoack = autoack
        self.logger = InstanceLogger(self, logger)
        self._name = '<queue: {}>'.format(self.name)
        self.deferred_callbacks = deferred_callbacks
        self.pool = pool

    def __str__(self):
        return self._name

    @property
    def with_deferred_callbacks(self):
        return self.deferred_callbacks is not None

    def add_to_pool(self, handler):
        self.pool.apply_async(handler)

    def callback(self, body, message):
        message = DeferredMessage(message, self.deferred_callbacks)
        self.logger.debug('Data (len: %s) received', len(body))

        def autoack():
            try:
                self.logger.debug('Ack message via autoack')
                message.ack()
            except ConnectionError as e:  # pragma: no cover
                self.logger.error('Connection error: %s when try message.ack',
                                  e.strerror)
            except MessageStateError:
                self.logger.warning(
                    'ACK() was called in handler?')

        def handler():
            try:
                self.logger.debug('Call handler...')
                self.handler(body, HandlerContext(message, self))
            except Exception:
                self.logger.exception('Something happened in user handler')
                raise HandlerError('Something happened in user handler')
            if self.autoack:
                autoack()

        if self.with_deferred_callbacks:
            self.logger.debug('Add handler to pool')
            self.add_to_pool(handler)
        else:
            handler()


class HandlerContext(object):
    """Context for handler function"""

    def __init__(self, message, rule):
        """Initialization

        :param message: original message from kombu
        :type message: kombu.Message
        :param rule: rule object
        :type rule: Rule
        """
        self.message = message
        self.rule = rule


@six.python_2_unicode_compatible
class Microservice(object):
    """Microservice for queues"""

    connection = 'amqp:///'

    def __init__(self, connection='amqp:///', logger=None, timeout=1, name=None,
                 workers=None, pool_factory=ThreadPool, reconnect_timeout=1,
                 reconnect_enable=True, workers_override_prefetch=True,
                 immediate_connect=True):
        """Initialization

        :type pool_factory: callable object, pool should has property size
        :param pool_factory: for pool will by configurated as pool_factory(workers)
        :type workers: int
        :param workers: count of workers in pool
        :param connection: connection for queues broker
        :type connection: str, None, dict or Connection
        :param logger: logging instance
        :type logger: Logger
        :param timeout: sleeping for loop, default = 0.1
        :type timeout: None, int or float
        """
        if logger is None:
            logger = _logger

        self.logger = InstanceLogger(self, logger)
        self.connection = self._get_connection(connection)
        self.timeout = timeout
        self.consumers = []
        self.reconnect_timeout = reconnect_timeout
        self.reconnect_enable = reconnect_enable
        self.workers_override_prefetch = workers_override_prefetch

        if name is None:
            try:
                name = '<microservice: {}>'.format(self.connection.as_uri())
            except:  # pragma no cover
                name = '<microservice: {}>'.format(
                    self.connection.transport_cls)  # pragma: no cover

        self.name = name
        self._stop = False
        self._stopped = False
        self.pool = None
        self.workers = workers
        self.deferred_callbacks = None
        if workers:
            self.deferred_callbacks = []
            self.pool = pool_factory(workers)
        if immediate_connect:
            self.connect()

    def __str__(self):
        return self.name

    @property
    def with_pool(self):
        return self.pool is not None

    def _get_connection(self, connection):
        """Create connection strategy

        :param connection: connection for broker
        :type connection: str, None, kombu.connections.Connection, dict
        :return: instance of kombu.connections.Connection
        :rtype: Connection
        """
        if not connection:
            connection = self.connection  # pragma: no cover

        if isinstance(connection, str):
            connection = {'hostname': connection}

        if isinstance(connection, dict):
            connection = Connection(**connection)

        return connection

    def add_queue_rule(self, handler, name, autoack=True, prefetch_size=0,
                       prefetch_count=0, **kwargs):
        """Add queue rule to Microservice

        :param prefetch_count: count of messages for getting from mq
        :param prefetch_size: size in bytes for getting data from mq
        :param handler: function for handling messages
        :param autoack: if True message.ack() after callback
        :type handler: callable object
        :param name: name of queue
        :type name: str
        """

        if self.with_pool:
            if self.workers_override_prefetch:
                prefetch_count = self.workers
            rule = Rule(name, handler, self.logger, autoack=autoack,
                        deferred_callbacks=self.deferred_callbacks,
                        pool=self.pool, **kwargs)
        else:
            rule = Rule(name, handler, self.logger, autoack=autoack, **kwargs)
        self.connect()
        consumer = Consumer(self.connection, queues=[Queue(rule.name)],
                            callbacks=[rule.callback], auto_declare=True)
        consumer.qos(prefetch_count=prefetch_count, prefetch_size=prefetch_size)
        self.consumers.append(consumer)
        self.logger.debug('Rule "%s" added!', rule.name)

    def _start(self):
        self._stopped = False
        self._stop = False
        self.connect()

    def stop(self):
        self._stop = True
        self.logger.info('Try to stop microservice draining events')

    def queue(self, name, autoack=True, prefetch_size=0, prefetch_count=0,
              **kwargs):
        """Decorator for handler function

        >>>app = Microservice()
        >>>
        >>>@app.queue('queue')
        >>>def function(payload, context):
        >>>    pass

        :param prefetch_count: count of messages for getting from mq
        :param prefetch_size: size in bytes for getting data from mq
        :param autoack: if True message.ack() after callback
        :param name: name of queue
        :type name: str
        """

        def decorator(f):
            self.add_queue_rule(f, name, autoack=autoack,
                                prefetch_size=prefetch_size,
                                prefetch_count=prefetch_count,
                                **kwargs)
            return f

        return decorator

    def connect(self):  # pragma no cover
        """Try connect to mq"""
        while not self._stop and not self.connection.connected:
            try:
                self.connection.connect()
                self.logger.info('Connected to mq broker')
                break
            except ConnectionError as e:  # pragma: no cover
                if self.reconnect_enable:
                    self.logger.error(
                        'Connection error, cause: %s. Reconnecting...',
                        e.strerror
                    )
                else:
                    self.stop()
                    break
            except Exception:  # pragma: no cover
                self.logger.exception(
                    'Error when try to connect')  # pragma: no cover
            sleep(self.reconnect_timeout)

    def revive(self):  # pragma no cover
        def _revive():
            for i, consumer in enumerate(self.consumers):
                self.logger.debug('Try revive consumer: %s', i)
                consumer.channel = self.connection
                consumer.revive(consumer.channel)
                self.logger.debug('Consumer: %s was revived', i)

        while not self._stop:
            try:
                _revive()
                break
            except ConnectionError:  # pragma: no cover
                if self.reconnect_enable:
                    self.connect()
                else:
                    self.stop()
                    break
            except Exception:  # pragma: no cover
                self.logger.exception(
                    'Error when try to revive')  # pragma: no cover
                sleep(self.reconnect_timeout)
        self.logger.debug('All consumers %s was revived...', len(self.consumers))

    @property
    def stopped(self):
        return self._stopped

    def drain_results(self):
        while self.deferred_callbacks:
            callback = self.deferred_callbacks.pop()
            try:
                callback()
                self.logger.debug('Called callback. All: %s',
                                  len(self.deferred_callbacks))
            except ConnectionError as e:  # pragma: no cover
                self.logger.error(
                    'Connection error when try callback: %s. Cause: %s. '
                    'Message will be handled on next iteration',
                    callback, e.strerror
                )
            except Exception:  # pragma no cover
                self.logger.exception(
                    'Unknown exception when try callback: %s', callback
                )

    def drain_events(self, infinity=True):

        with nested(*self.consumers):
            while not self._stop:
                try:
                    self.connection.drain_events(timeout=self.timeout)
                except socket.timeout:
                    if not infinity:
                        break
                except ConnectionError as e:  # pragma no cover
                    self.logger.error(
                        'Connection to mq has broken off because: %s. Try to reconnect, %s',
                        e)
                    self.connect()
                    self.revive()
                    break
                except HandlerError:
                    self.logger.exception('Handler error')
                except Exception as e:  # pragma no cover
                    if not self._stop:
                        self.logger.exception(
                            'Something wrong! Try to restart the loop')
                        self.revive()
                        break
                    else:  # pragma: no cover
                        self.logger.exception(
                            'Something wrong! And stopping...')
                        break
                if self.with_pool:
                    try:
                        self.drain_results()
                    except Exception:  # pragma no cover
                        self.logger.exception('Unknown error when '
                                              'draining results')
        if self._stop:
            if self.with_pool:
                try:
                    self.pool.join()
                    self.drain_results()  # pragma: no cover
                except AssertionError:
                    pass
                except Exception: # pragma: no cover
                    self.logger.exception(
                        'Unknown error when '
                        'draining results'
                    )
            self._stopped = True
            self.logger.info('Stopped draining events.')

    def run(self, debug=False):
        """Run microservice in loop, where handle connections

        :param debug: enable/disable debug mode
        :type debug: bool
        """
        if debug:
            from microservices.utils import set_logging

            set_logging('DEBUG')

        def _run():
            self._start()
            self.drain_events(infinity=True)

        while not self._stopped:
            _run()

    def read(self, count=1):
        for x in range(count):
            self.drain_events(infinity=False)