georgeyk/loafer

View on GitHub
loafer/dispatchers.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
import logging
import sys

from .exceptions import DeleteMessage

logger = logging.getLogger(__name__)


class LoaferDispatcher:

    def __init__(self, routes, max_jobs=None):
        self.routes = routes
        jobs = max_jobs or len(routes) * 10
        self._semaphore = asyncio.Semaphore(jobs)

    async def dispatch_message(self, message, route):
        logger.debug('dispatching message to route={}'.format(route))
        confirm_message = False
        if not message:
            logger.warning('message will be ignored:\n{!r}\n'.format(message))
            return confirm_message

        with await self._semaphore:
            try:
                confirm_message = await route.deliver(message)
            except DeleteMessage:
                logger.info('explicit message deletion\n{}\n'.format(message))
                confirm_message = True
            except asyncio.CancelledError:
                msg = '"{!r}" was cancelled, the message will not be acknowledged:\n{}\n'
                logger.warning(msg.format(route.handler, message))
            except Exception as exc:
                logger.exception('{!r}'.format(exc))
                exc_info = sys.exc_info()
                confirm_message = await route.error_handler(exc_info, message)

        return confirm_message

    async def _process_message(self, message, route):
        confirmation = await self.dispatch_message(message, route)
        if confirmation:
            provider = route.provider
            await provider.confirm_message(message)
        return confirmation

    async def _get_route_messages(self, route):
        messages = await route.provider.fetch_messages()
        return messages, route

    async def _dispatch_tasks(self):
        provider_messages_tasks = [
            self._get_route_messages(route) for route in self.routes
        ]

        process_messages_tasks = []
        for provider_task in asyncio.as_completed(provider_messages_tasks):
            messages, route = await provider_task

            process_messages_tasks += [
                self._process_message(message, route) for message in messages
            ]

        if not process_messages_tasks:
            return

        await asyncio.gather(*process_messages_tasks)

    async def dispatch_providers(self, forever=True):
        while True:
            await self._dispatch_tasks()

            if not forever:
                break

    def stop(self):
        for route in self.routes:
            route.stop()