lliendo/Radar

View on GitHub
radar/plugin/__init__.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-

"""
This file is part of Radar.

Radar is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

Radar is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.

You should have received a copy of the Lesser GNU General Public License
along with Radar. If not, see <http://www.gnu.org/licenses/>.

Copyright 2015 Lucas Liendo.
"""


from queue import Empty as EmptyQueue
from abc import ABCMeta
from ctypes import cast, py_object
from functools import reduce
from os.path import dirname, join as join_path
from threading import Thread, Event
from ..logger import RadarLogger
from ..config import ConfigBuilder, ConfigError
from ..misc import Switchable
from ..protocol import Message


class ServerPluginError(Exception):
    pass


class ServerPlugin(ConfigBuilder, Switchable):

    __metaclass__ = ABCMeta

    PLUGIN_NAME = ''
    PLUGIN_VERSION = '0.0.1'
    PLUGIN_CONFIG_FILE = ''
    DEFAULT_CONFIG = {}

    def __init__(self):
        if not self.PLUGIN_NAME:
            raise ServerPluginError('Error - Plugin name not defined.')

        try:
            ConfigBuilder.__init__(self, self.PLUGIN_CONFIG_FILE)
            self.merge_config(self.DEFAULT_CONFIG)
        except ConfigError:
            self.config = self.DEFAULT_CONFIG

        Switchable.__init__(self, enabled=self.config.get('enabled', True))
        self._message_actions = {
            Message.TYPE['CHECK REPLY']: self.on_check_reply,
            Message.TYPE['TEST REPLY']: self.on_test_reply,
        }

    @staticmethod
    def get_path(source_filename, config_filename):
        return join_path(dirname(source_filename), config_filename)

    def run(self, address, port, message_type, checks, contacts):
        action = self._message_actions[message_type]
        action(address, port, checks, contacts)

    def log(self, message):
        RadarLogger.log('Plugin \'{:}\' v{:}. {:}'.format(self.PLUGIN_NAME, self.PLUGIN_VERSION, message))

    def configure(self, logger):
        RadarLogger.log('Loading plugin : \'{:}\' v{:}.'.format(self.PLUGIN_NAME, self.PLUGIN_VERSION))
        self.on_start()

    def on_start(self):
        """ Implement this method to initialize the plugin. """
        pass

    def on_check_reply(self, address, port, checks, contacts):
        """ Implement this method to process a check reply. """
        pass

    def on_test_reply(self, address, port, checks, contacts):
        """ Implement this method to process a test reply. """
        pass

    def on_shutdown(self):
        """ Implement this method to tear down the plugin. """
        pass

    def to_dict(self):
        return super(ServerPlugin, self).to_dict(['id', 'plugin_id', 'plugin_version', 'enabled'])

    def __eq__(self, other_plugin):
        return (self.PLUGIN_NAME == other_plugin.PLUGIN_NAME) and \
            (self.PLUGIN_VERSION == other_plugin.PLUGIN_VERSION)


class PluginManager(Thread):

    STOP_EVENT_TIMEOUT = 0.2

    def __init__(self, platform_setup, queue, stop_event=None):
        Thread.__init__(self)
        self._plugins = platform_setup.plugins
        self._queue = queue
        self.stop_event = stop_event or Event()

    # We dereference ids, to avoid re-instantiating objects. We can actually
    # do this because we're on the same process address space.
    def _dereference(self, ids):
        return [cast(object_id, py_object).value for object_id in ids]

    def _flatten(self, list_of_lists):
        return reduce(lambda l, m: l + m, list_of_lists)

    def _get_plugin_args(self, message):
        return (
            message['address'],
            message['port'],
            message['message_type'],
            self._flatten([c.as_list() for c in self._dereference(message['check_ids'])]),
            self._flatten([c.as_list() for c in self._dereference(message['contact_ids'])]),
        )

    def is_stopped(self):
        return self.stop_event.is_set()

    def _run_plugin(self, plugin, address, port, message_type, checks, contacts):
        try:
            plugin.run(address, port, message_type, checks, contacts)
        except Exception as e:
            RadarLogger.log('Error - Plugin \'{:}\' version \'{:}\' raised an error. Details : {:}.'.format(
                plugin.PLUGIN_NAME, plugin.PLUGIN_VERSION, e))

    def _run_plugins(self, queue_message):
        plugin_args = self._get_plugin_args(queue_message)
        [self._run_plugin(p, *plugin_args) for p in self._plugins if p.enabled]

    def run(self):
        while not self.is_stopped():
            try:
                self._run_plugins(self._queue.get_nowait())
            except EmptyQueue:
                self.stop_event.wait(self.STOP_EVENT_TIMEOUT)