georgeyk/loafer

View on GitHub
loafer/managers.py

Summary

Maintainability
A
0 mins
Test Coverage
import asyncio
import logging
import os

from cached_property import cached_property

from .dispatchers import LoaferDispatcher
from .exceptions import ConfigurationError
from .routes import Route
from .runners import LoaferRunner

logger = logging.getLogger(__name__)


class LoaferManager:
    """Tie a collection of routes to a dispatcher and drive it through a runner.

    The dispatcher is created lazily from ``routes`` (see ``dispatcher``);
    ``run`` schedules the dispatcher's provider coroutine on the runner's
    event loop and then starts the runner.
    """

    def __init__(self, routes, runner=None, _concurrency_limit=None, _max_threads=None):
        """Store the configuration and pick the runner to use.

        Args:
            routes: Routes handed to the dispatcher. They are not validated
                here, only when ``dispatcher`` is first accessed.
            runner: Runner to use. When ``None``, a ``LoaferRunner`` is
                created with ``on_loop__stop`` as its stop callback.
            _concurrency_limit: Forwarded to ``LoaferDispatcher`` as
                ``max_jobs``.
            _max_threads: Forwarded to ``LoaferRunner`` as ``max_workers``;
                unused when ``runner`` is supplied.
        """
        self._concurrency_limit = _concurrency_limit
        # Only a runner built here gets ``on_loop__stop`` wired in; a
        # caller-supplied runner is stored as-is.
        self.runner = (
            runner
            if runner is not None
            else LoaferRunner(on_stop_callback=self.on_loop__stop, max_workers=_max_threads)
        )
        self.routes = routes

    @cached_property
    def dispatcher(self):
        """Return the ``LoaferDispatcher`` for ``self.routes``.

        Wrapped in ``cached_property``, so the validation and construction
        below are meant to run on first access only.

        Raises:
            ConfigurationError: If ``routes`` is falsy (e.g. empty or
                ``None``) or holds anything that is not a ``Route``.
        """
        candidates = self.routes
        if not candidates or any(not isinstance(item, Route) for item in candidates):
            raise ConfigurationError('invalid routes to dispatch, routes={}'.format(candidates))

        return LoaferDispatcher(candidates, max_jobs=self._concurrency_limit)

    def run(self, forever=True, debug=False):
        """Schedule provider dispatching on the runner's loop and start the runner.

        Args:
            forever: Forwarded to ``dispatcher.dispatch_providers``. When
                falsy, ``runner.prepare_stop`` is additionally registered as
                a done callback of the dispatch future.
            debug: Forwarded to ``runner.start``.
        """
        # The loop is fetched before the dispatcher is touched, so an invalid
        # route configuration surfaces only after the runner's loop is read.
        event_loop = self.runner.loop
        providers = self.dispatcher.dispatch_providers(forever=forever)
        task = asyncio.ensure_future(providers, loop=event_loop)
        self._future = task

        # Error handling is registered first; done callbacks run in the
        # order they were added.
        task.add_done_callback(self.on_future__errors)
        if not forever:
            task.add_done_callback(self.runner.prepare_stop)

        logger.info('starting loafer, pid={}, forever={}'.format(os.getpid(), forever))
        self.runner.start(debug=debug)

    #
    # Callbacks
    #

    def on_future__errors(self, future):
        """Done callback for the dispatch future.

        Calls ``runner.prepare_stop()`` when the future was cancelled
        (returning its result) or finished with an exception (after logging
        it at critical level). A future that completed without an exception
        is left alone.
        """
        if not future.cancelled():
            failure = future.exception()
            # An unhandled error would otherwise crash the event loop
            # execution, so report it and call ``prepare_stop``.
            if isinstance(failure, BaseException):
                logger.critical('fatal error caught: {!r}'.format(failure))
                self.runner.prepare_stop()
            return None

        return self.runner.prepare_stop()

    def on_loop__stop(self, *args, **kwargs):
        """Stop callback: cancel the dispatch future and stop the dispatcher.

        Any positional or keyword arguments are accepted and ignored.
        """
        logger.info('cancel dispatcher operations ...')

        # ``_future`` only exists once ``run`` has been called.
        try:
            pending = self._future
        except AttributeError:
            pass
        else:
            pending.cancel()

        self.dispatcher.stop()