fuzeman/trakt.py

View on GitHub
trakt/core/emitter.py

Summary

Maintainability
A
1 hr
Test Coverage


import logging

# concurrent.futures is optional
try:
    from concurrent.futures import ThreadPoolExecutor
except ImportError:
    ThreadPoolExecutor = None


# Module-level logger; Emitter writes its debug traces and callback-failure
# warnings through it.
log = logging.getLogger(__name__)


class Emitter(object):
    """Event emitter with optional thread-pool dispatch.

    Callbacks are kept in a per-instance mapping of ``event -> [callback]``.
    That state is created lazily on first use, so subclasses do not have to
    call an initializer. Most public methods return the emitter itself so
    calls can be chained, and fall back to decorator mode when ``func`` is
    omitted.
    """

    # When true, callbacks are submitted to a ThreadPoolExecutor instead of
    # being invoked inline by the emitting thread.
    threading = False
    # ``max_workers`` handed to the ThreadPoolExecutor when ``threading`` is on.
    threading_workers = 2

    # Lazily-initialised per-instance state (see ``__ensure_constructed``).
    __constructed = False
    __name = None

    __callbacks = None
    __threading_pool = None

    def __ensure_constructed(self):
        """Create the callback registry (and thread pool) on first use.

        Raises:
            Exception: if ``threading`` is enabled but ``concurrent.futures``
                could not be imported.
        """
        if self.__constructed:
            return

        if self.threading:
            if ThreadPoolExecutor is None:
                raise Exception('concurrent.futures is required for threading')

            self.__threading_pool = ThreadPoolExecutor(max_workers=self.threading_workers)

        self.__callbacks = {}

        # Flag the instance as constructed only once everything above has
        # succeeded, so a failed pool setup is reported again on the next
        # call instead of leaving a half-initialised emitter behind.
        self.__constructed = True

    def __log(self, message, *args, **kwargs):
        """Write a debug record prefixed with ``module.ClassName`` of this emitter."""
        if self.__name is None:
            # Computed once and cached on the instance
            self.__name = '%s.%s' % (
                self.__module__,
                self.__class__.__name__
            )

        log.debug(
            ('[%s]:' % self.__name.ljust(34)) + str(message),
            *args, **kwargs
        )

    def __wrap(self, callback, *args, **kwargs):
        """Build a decorator that calls ``callback(*args, func=<decorated>, **kwargs)``.

        The decorated function is returned unchanged.
        """
        def wrap(func):
            callback(*args, func=func, **kwargs)
            return func

        return wrap

    def on(self, events, func=None, on_bound=None):
        """Bind ``func`` to one or more events.

        Args:
            events: a single event key, or a list/tuple of event keys.
            func: callback to bind; when omitted a decorator is returned.
            on_bound: optional callable invoked as ``on_bound(func=func)``
                after the callback has been bound.

        Returns:
            The emitter, or a decorator when ``func`` is not given.
        """
        if not func:
            # assume decorator, wrap
            return self.__wrap(self.on, events, on_bound=on_bound)

        if not isinstance(events, (list, tuple)):
            events = [events]

        self.__log('on(events: %s, func: %s)', repr(events), repr(func))

        self.__ensure_constructed()

        for event in events:
            # Bind callback to event
            self.__callbacks.setdefault(event, []).append(func)

        # Call 'on_bound' callback
        if on_bound:
            self.__call(on_bound, kwargs={
                'func': func
            })

        return self

    def once(self, event, func=None):
        """Bind ``func`` to ``event`` so that it is unbound right before it runs.

        The registered callback is an internal wrapper around ``func``.

        Returns:
            The emitter, or a decorator when ``func`` is not given.
        """
        if not func:
            # assume decorator, wrap
            return self.__wrap(self.once, event)

        self.__log('once(event: %s, func: %s)', repr(event), repr(func))

        def once_callback(*args, **kwargs):
            # Unbind first so a re-entrant emit cannot trigger it again
            self.off(event, once_callback)
            func(*args, **kwargs)

        self.on(event, once_callback)

        return self

    def off(self, event=None, func=None):
        """Unbind callbacks.

        * ``off()`` removes every callback of every event.
        * ``off(event)`` removes every callback of ``event``.
        * ``off(event, func)`` removes a single binding of ``func``.

        Unknown events or callbacks are ignored.

        Raises:
            ValueError: if ``func`` is given without an ``event``.

        Returns:
            The emitter.
        """
        self.__log('off(event: %s, func: %s)', repr(event), repr(func))

        self.__ensure_constructed()

        # Validate before touching the registry; looking up a missing event
        # key first would surface as a KeyError instead of this error.
        if func and not event:
            raise ValueError('"event" is required if "func" is specified')

        if not event:
            self.__callbacks = {}
            return self

        if event not in self.__callbacks:
            return self

        if not func:
            self.__callbacks[event] = []
            return self

        if func in self.__callbacks[event]:
            self.__callbacks[event].remove(func)

        return self

    def emit(self, event, *args, **kwargs):
        """Invoke every callback bound to ``event`` with ``*args, **kwargs``.

        The special keyword ``__suppress`` is removed from ``kwargs`` before
        dispatch; when true, the debug trace for this emit is skipped.

        Returns:
            The emitter (also when no callback is bound to ``event``).
        """
        suppress = kwargs.pop('__suppress', False)

        # Building the trimmed reprs can be expensive for large payloads, so
        # only do it when the debug record would actually be emitted.
        if not suppress and log.isEnabledFor(logging.DEBUG):
            self.__log('emit(event: %s, args: %s, kwargs: %s)', repr(event), repr_trim(args), repr_trim(kwargs))

        self.__ensure_constructed()

        # Iterate over a copy: callbacks (e.g. ones bound via `once`) may
        # unbind themselves while the event is being dispatched.
        for callback in list(self.__callbacks.get(event, ())):
            self.__call(callback, args, kwargs, event)

        return self

    def emit_on(self, event, *args, **kwargs):
        """Bind ``func`` to ``event`` and immediately emit ``event``.

        ``func`` is taken from the ``func`` keyword argument; the remaining
        arguments are forwarded to :meth:`emit`, which dispatches to every
        callback bound to ``event``, not only ``func``.

        Returns:
            The emitter, or a decorator when ``func`` is not given.
        """
        func = kwargs.pop('func', None)

        if not func:
            # assume decorator, wrap
            return self.__wrap(self.emit_on, event, *args, **kwargs)

        self.__log('emit_on(event: %s, func: %s, args: %s, kwargs: %s)',
                   repr(event), repr(func), repr(args), repr(kwargs))

        # Bind func from wrapper
        self.on(event, func)

        # Emit event (calling 'func')
        self.emit(event, *args, **kwargs)

        return self

    def pipe(self, events, other):
        """Forward ``events`` emitted here to ``other.emit`` under the same name.

        Args:
            events: a single event key, or a list/tuple of event keys.
            other: object whose ``emit(event, *args, **kwargs)`` receives the
                forwarded events.

        Returns:
            The emitter.
        """
        # Same normalisation as `on`, so a tuple is piped event by event
        # rather than being forwarded under the tuple itself.
        if not isinstance(events, (list, tuple)):
            events = [events]

        self.__log('pipe(events: %s, other: %s)', repr(events), repr(other))

        self.__ensure_constructed()

        for event in events:
            self.on(event, PipeHandler(event, other.emit))

        return self

    def __call(self, callback, args=None, kwargs=None, event=None):
        """Dispatch ``callback`` inline or through the thread pool, per ``threading``."""
        args = args or ()
        kwargs = kwargs or {}

        if self.threading:
            return self.__call_async(callback, args, kwargs, event)

        return self.__call_sync(callback, args, kwargs, event)

    @classmethod
    def __call_sync(cls, callback, args=None, kwargs=None, event=None):
        """Run ``callback`` and log (rather than propagate) any ``Exception``.

        Returns:
            True if the callback completed, False if it raised.
        """
        try:
            callback(*args, **kwargs)
            return True
        except Exception as ex:
            log.warning(
                '[%s] Exception raised in: %s - %s',
                event, cls.__function_name(callback), ex,
                exc_info=True
            )
            return False

    def __call_async(self, callback, args=None, kwargs=None, event=None):
        """Submit ``callback`` to the thread pool; the future is not returned."""
        self.__threading_pool.submit(self.__call_sync, callback, args, kwargs, event)

    @staticmethod
    def __function_name(func):
        """Return a printable name for ``func``, used in failure log records."""
        fragments = []

        # Try append class name (`im_class` is the Python 2 method attribute;
        # when it is absent only the function name is reported)
        cls = getattr(func, 'im_class', None)

        if cls and hasattr(cls, '__name__'):
            fragments.append(cls.__name__)

        # Append function name. Callable objects and partials have no
        # `__name__`; fall back to their type name so that building the log
        # record can never raise from inside the exception handler.
        name = getattr(func, '__name__', None)

        if name is None:
            name = type(func).__name__

        fragments.append(str(name))

        return '.'.join(fragments)


class PipeHandler(object):
    """Callable that relays its arguments to ``callback``, prefixed by ``event``.

    Used by ``Emitter.pipe`` to forward an event to another emitter's ``emit``.
    """

    def __init__(self, event, callback):
        # Event name to forward under, and the target to forward to
        self.event, self.callback = event, callback

    def __call__(self, *args, **kwargs):
        """Invoke ``callback(event, *args, **kwargs)``; the result is discarded."""
        forward = self.callback
        forward(self.event, *args, **kwargs)


def on(emitter, event, func=None):
    """Bind ``func`` to ``event`` on ``emitter`` and return an unbind handle.

    The return value of ``emitter.on`` is discarded.

    Returns:
        A dict whose ``'destroy'`` entry is a zero-argument callable that
        calls ``emitter.off(event, func)`` and returns its result.
    """
    emitter.on(event, func)

    def destroy():
        return emitter.off(event, func)

    return {'destroy': destroy}


def once(emitter, event, func=None):
    """Functional form of ``emitter.once(event, func)``; returns its result."""
    result = emitter.once(event, func)
    return result


def off(emitter, event, func=None):
    """Functional form of ``emitter.off(event, func)``; returns its result."""
    result = emitter.off(event, func)
    return result


def emit(emitter, event, *args, **kwargs):
    """Functional form of ``emitter.emit(event, *args, **kwargs)``; returns its result."""
    result = emitter.emit(event, *args, **kwargs)
    return result


def repr_trim(value, length=1000):
    """Return ``repr(value)``, or a short placeholder when it is too long.

    Args:
        value: object to represent.
        length: the repr is returned as-is only when it is strictly shorter
            than this many characters.

    Returns:
        ``repr(value)`` or ``'<TypeName - N characters>'``, where ``TypeName``
        is the type of ``value`` and ``N`` the length of its repr.
    """
    text = repr(value)

    if len(text) < length:
        return text

    # Report the type of the original object; the repr is kept in a separate
    # name so the placeholder does not always say `str`.
    return '<%s - %s characters>' % (type(value).__name__, len(text))