wglass/lighthouse

View on GitHub
lighthouse/configs/watcher.py

Summary

Maintainability
A
0 mins
Test Coverage
import logging
import threading

from concurrent import futures

from .monitor import ConfigFileMonitor
from lighthouse.events import wait_on_event


# Upper bound on the number of threads in each ConfigWatcher's work pool
# (passed as `max_workers` to the ThreadPoolExecutor that runs hooks).
MAX_WORKERS = 8


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class ConfigWatcher(object):
    """
    Base class for watchers that monitor and maintain `Configurable` instances.

    Subclasses define which `Configurable` subclasses they watch via the
    `watched_configurables` attribute and must implement the `wind_down()`
    method, which `stop()` calls during shutdown.  (Earlier documentation
    also mentioned a `run()` method; nothing in this base class defines or
    calls one.)

    Optionally, subclasses can also define "on_<configurable>_<action>" methods
    (e.g. "on_service_update") that will hook into the add/update/remove
    configurable callbacks.  Hooks are executed in the `work_pool` executor,
    not on the thread that fired the callback.

    .. warning::
       Care must be taken that these hooks are idempotent with regards
       to the Watcher subclass instance.  Configuration changes are liable to
       happen at any time and in any order.
    """

    # the list or tuple of Configurable subclasses to watch
    watched_configurables = ()

    def __init__(self, config_dir):
        """
        Sets up the bookkeeping for a watcher of the given `config_dir`.

        No monitoring begins until `start()` is called.
        """
        self.config_dir = config_dir

        # observer handles returned by each ConfigFileMonitor.start() call
        self.observers = []
        # one "registry" dict (name -> configurable) per watched class
        self.configurables = {
            config_class: {} for config_class in self.watched_configurables
        }

        # executor in which the on_<configurable>_<action> hooks run
        self.work_pool = futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
        # named threads started via launch_thread(), keyed by name
        self.thread_pool = {}

        # set by stop(); start() waits on it
        self.shutdown = threading.Event()

    def start(self):
        """
        Iterates over the `watched_configurables` attribute and starts a
        config file monitor for each.  The resulting observer threads are
        kept in an `observers` list attribute.

        Once all monitors are started this method waits on the `shutdown`
        event via `wait_on_event()`.
        """
        for config_class in self.watched_configurables:
            monitor = ConfigFileMonitor(config_class, self.config_dir)
            self.observers.append(
                monitor.start(
                    self.add_configurable,
                    self.update_configurable,
                    self.remove_configurable
                )
            )

        wait_on_event(self.shutdown)

    def wind_down(self):
        """
        This method is called in the `stop()` method once the config file
        observers are stopped but before any threads are joined.

        Subclasses are expected to implement this; the base implementation
        raises `NotImplementedError`.
        """
        raise NotImplementedError

    def launch_thread(self, name, fn, *args, **kwargs):
        """
        Adds a named thread to the "thread pool" dictionary of Thread objects.

        A daemon thread that executes the passed-in function `fn` with the
        given args and keyword args is started and tracked in the `thread_pool`
        attribute with the given `name` as the key.  An existing entry with
        the same `name` is replaced.
        """
        logger.debug(
            "Launching thread '%s': %s(%s, %s)", name,
            fn, args, kwargs
        )
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.daemon = True

        # register before starting so the thread is tracked while it runs
        self.thread_pool[name] = thread
        thread.start()

    def kill_thread(self, name):
        """
        Joins the thread in the `thread_pool` dict with the given `name` key
        and then drops it from the dict.

        Does nothing if no thread is tracked under `name`.
        """
        thread = self.thread_pool.get(name)
        if thread is None:
            return

        # the entry stays in the pool until the join has completed
        thread.join()
        self.thread_pool.pop(name, None)

    def add_configurable(self, configurable_class, name, configurable):
        """
        Callback fired when a configurable instance is added.

        Adds the configurable to the proper "registry" and calls a method
        named "on_<configurable classname>_add" in the work pool if the hook
        is defined.  The hook is passed the configurable instance.

        If a configurable with the same name is already present a warning is
        logged and the existing registry entry is replaced.
        """
        configurable_class_name = configurable_class.__name__.lower()

        logger.info("Adding %s: '%s'", configurable_class_name, name)

        registry = self.registry_for(configurable_class)

        if name in registry:
            logger.warning(
                "Adding already-existing %s: '%s'",
                configurable_class_name, name
            )

        registry[name] = configurable

        hook = self.hook_for(configurable_class, action="add")
        if not hook:
            return

        def done(f):
            # surface hook failures in the log instead of losing them
            try:
                f.result()
            except Exception:
                logger.exception("Error adding configurable '%s'", name)

        self.work_pool.submit(hook, configurable).add_done_callback(done)

    def update_configurable(self, configurable_class, name, config):
        """
        Callback fired when a configurable instance is updated.

        Looks up the existing configurable in the proper "registry" and
        `apply_config()` is called on it.

        If a method named "on_<configurable classname>_update" is defined it
        is called in the work pool and passed the configurable's name and the
        new config.

        If the updated configurable is not present, a new instance is built
        with the class's `from_config()` and `add_configurable()` is called
        instead.
        """
        configurable_class_name = configurable_class.__name__.lower()

        logger.info(
            "updating %s: '%s'", configurable_class_name, name
        )

        registry = self.registry_for(configurable_class)

        if name not in registry:
            logger.warning(
                "Tried to update unknown %s: '%s'",
                configurable_class_name, name
            )
            # add_configurable() takes (class, name, instance); the name
            # must be passed along with the freshly built instance
            self.add_configurable(
                configurable_class,
                name,
                configurable_class.from_config(name, config)
            )
            return

        registry[name].apply_config(config)

        hook = self.hook_for(configurable_class, action="update")
        if not hook:
            return

        def done(f):
            # surface hook failures in the log instead of losing them
            try:
                f.result()
            except Exception:
                logger.exception("Error updating configurable '%s'", name)

        self.work_pool.submit(hook, name, config).add_done_callback(done)

    def remove_configurable(self, configurable_class, name):
        """
        Callback fired when a configurable instance is removed.

        Looks up the existing configurable in the proper "registry" and
        removes it.

        If a method named "on_<configurable classname>_remove" is defined it
        is called via the work pool and passed the configurable's name.  In
        that case the registry entry is only dropped once the hook has
        finished without raising; if the hook fails the error is logged and
        the entry is kept.

        If the removed configurable is not present, a warning is given and no
        further action is taken.
        """
        configurable_class_name = configurable_class.__name__.lower()

        logger.info("Removing %s: '%s'", configurable_class_name, name)

        registry = self.registry_for(configurable_class)

        if name not in registry:
            logger.warning(
                "Tried to remove unknown active %s: '%s'",
                configurable_class_name, name
            )
            return

        hook = self.hook_for(configurable_class, action="remove")
        if not hook:
            registry.pop(name)
            return

        def done(f):
            try:
                f.result()
            except Exception:
                logger.exception("Error removing configurable '%s'", name)
            else:
                # the entry may already be gone if another removal of the
                # same name completed first; that is not a hook failure
                registry.pop(name, None)

        self.work_pool.submit(hook, name).add_done_callback(done)

    def registry_for(self, configurable_class):
        """
        Helper method for retrieving the "registry" dictionary of a given
        Configurable subclass.

        For example, the registry of Cluster instances for a config watcher
        would be `self.configurables[Cluster]`.

        Raises `KeyError` if the class is not one of the watched classes.
        """
        return self.configurables[configurable_class]

    def hook_for(self, configurable_class, action):
        """
        Helper method for determining if an on_<configurable class>_<action>
        method is present, to be used as a hook in the add/update/remove
        configurable methods.

        Returns the bound attribute if it exists, `None` otherwise.  The
        class name is lower-cased when building the attribute name.
        """
        configurable_class_name = configurable_class.__name__.lower()

        return getattr(
            self,
            "on_" + configurable_class_name + "_" + action,
            None
        )

    def stop(self):
        """
        Method for shutting down the watcher.

        Sets the `shutdown` event, stops all config file observers, calls
        `wind_down()`, then joins the observers and the threads started via
        `launch_thread()` before shutting down the worker thread pool.
        """
        self.shutdown.set()

        for monitor in self.observers:
            monitor.stop()

        self.wind_down()

        for monitor in self.observers:
            monitor.join()

        for thread in self.thread_pool.values():
            thread.join()

        self.work_pool.shutdown()