thenaterhood/heartbeat

View on GitHub
src/heartbeat/routing/__init__.py

Summary

Maintainability
C
7 hrs
Test Coverage
import datetime
from heartbeat.platform import Topics
from heartbeat.multiprocessing import Cache
import logging
import traceback
from enum import Enum
from queue import Queue
from time import sleep


class RateLimitHandler(object):

    """
    Handles rate limiting events
    """

    def __init__(self, topic_strategies=None, event_cache=None, time_cache=None):
        """
        Constructor

        Params:
            dict() strategies: mapping of event types to limit strategies
            Cache event_cache
            Cache time_cache
        """

        if topic_strategies is None:
            topic_strategies = {
                Topics.WARNING: self.event_different_from_previous,
                Topics.INFO: self.event_different_from_previous,
                Topics.DEBUG: self.event_different_from_previous,
                Topics.VIRT: self.event_different_from_previous,
                Topics.HEARTBEAT: self.always_allow,
                Topics.STARTUP: self.always_allow,
                Topics.ACK: self.always_allow
            }

        self.topic_strategies = topic_strategies

        if event_cache is None:
            event_cache = Cache('EventServerevent-previous-cache')

        self.event_cache = event_cache

        if time_cache is None:
            time_cache = Cache('EventServerevent-time-cache')

        self.time_cache = time_cache

    def always_allow(self, event):
        return True

    def event_different_from_previous(self, event):
        """
        Checks if an event is the same as the previous received
        from the same notifier"
        """
        duplicate = False

        if (self.event_cache.exists(event.source)):
            previous = self.event_cache.read(event.source)
            duplicate = (previous == event.__hash__())

        return not duplicate

    def event_delay_passed(self, event):
        """
        Checks if a specific event happened repeatedly within two hours
        """
        delay_passed = True

        if (self.time_cache.exists(event.__hash__())):
            lastSeen = datetime.datetime.fromtimestamp(
                self.time_cache.read(event.__hash__())
                )
            two_hours_ago = datetime.timedelta(hours=2)
            delay_passed = (datetime.datetime.now() - lastSeen) > two_hours_ago

        return delay_passed

    def allow_event(self, event):
        """
        Whether an event should be allowed to be pushed

        Params:
            Event event
        Returns:
            bool
        """
        allow = False
        if not event.type in self.topic_strategies:
            allow = self.event_different_from_previous(event)

        else:
            allow = self.topic_strategies[event.type](event)

        if allow:
            self.log_event(event)

        return allow

    def log_event(self, event):
        """
        Stores the event time and logs the event as the latest
        from the particular monitor (no duplicate events in a row)
        """
        self.time_cache.write(event.__hash__(), event.when)
        self.event_cache.write(event.source, event.__hash__())
        self.time_cache.writeToDisk()
        self.event_cache.writeToDisk()


class EventRouter(object):

    """
    Handles dispatching events to subscribers based on the topic
    of the event.
    """

    def __init__(self, threadpool, limiter=None, logger=None):
        """
        Constructor

        Params:
            list notifiers: the configured notifiers to push to
            Func limit_strategy: the function to call when checking if an event
                should be thrown or not. None defaults to monitor-based
                (doesn't throw the same event twice in a row from a monitor)
        """
        if (logger is None):
            self.logger = logging.getLogger(
                __name__ + "." + self.__class__.__name__
            )
        else:
            self.logger = logger

        self.topics = {}
        for t in Topics:
            self.topics[t] = []

        if (limiter is None):
            limiter = RateLimitHandler()

        self.limiter = limiter
        self.queue = Queue()
        self.worker_running = False

        self.threadpool = threadpool

    def attach(self, topic, callback):
        """
        Allows other systems to subscribe to events
        of different topics.

        Params:
            Topic topic: topic to subscribe to
            Callable callback: Method to call when a new event of the topic
                is received
        """
        self.logger.debug("%s has subscribed to %s", str(callback), str(topic))
        self.topics[topic].append(callback)

    def put_event(self, event):
        """
        Starts the thread to push notifications

        Params:
            Event event: The event to notify of
        """
        self.logger.info("Event Generated: %s", event.__str__())
        if (self.limiter.allow_event(event)):
            self.logger.debug("Dispatching Event")
            self.queue.put(event)
            self._start_worker()
        else:
            self.logger.debug(
                "Skipping dispatch per limit strategy")

    def _forward_event(self, event):
        """
        Forwards an event to all the handlers subscribed to
        the topic the event is categorized as
        """
        for t in self.topics[event.type]:
            f = self.threadpool.submit(t, event)
            f.add_done_callback(self._check_call_status)

    def _event_queue_worker(self):
        """
        Worker method to be run in a thread to process the event queue
        """
        while not self.queue.empty():
            item = self.queue.get()
            self._forward_event(item)
            self.queue.task_done()
            sleep(3)
        self.logger.debug("Event queue is empty, router worker shutting down")
        self.worker_running = False

    def _start_worker(self):
        if self.worker_running is not True:
            self.worker_running = True
            self.logger.debug("Starting event router worker")
            self.threadpool.submit(self._event_queue_worker)

    def _check_call_status(self, f):
        """
        Checks the status of a completed (or crashed)
        submission to the handler threadpool. This
        method is intended to be submitted to the Future
        via add_done_callback, rather than being
        called directly.

        Params:
            Future f
        """
        error = f.exception(5)
        if error is None:
            return
        else:
            try:
                framesummary = traceback.extract_tb(error.__traceback__)[-1]
                location = "{:s}:{:d}".format(framesummary.filename, framesummary.lineno)
            except (AttributeError, IndexError):
                location = " -- "
            self.logger.error("Handler: %s at %s", str(error), location)


if __name__ == "__main__":
    pass