librenms/librenms

View on GitHub
LibreNMS/service.py

Summary

Maintainability
F
5 days
Test Coverage
import logging
import os
import sys
import threading
import time

import pymysql  # pylint: disable=import-error

import LibreNMS
from LibreNMS.config import DBConfig

try:
    import psutil
except ImportError:
    pass

from datetime import timedelta
from datetime import datetime
from platform import python_version
from time import sleep
from socket import gethostname
from signal import signal, SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGCHLD
from uuid import uuid1

try:
    from systemd.daemon import notify
except ImportError:
    pass

try:
    from redis.exceptions import ConnectionError as RedisConnectionError
except ImportError:

    class RedisConnectionError(Exception):
        pass


logger = logging.getLogger(__name__)


class ServiceConfig(DBConfig):
    def __init__(self):
        """
        Stores all of the configuration variables for the LibreNMS service in a common object
        Starts with defaults, but can be populated with variables from config.php by calling populate()
        """
        self._uuid = str(uuid1())
        self.set_name(gethostname())

    def set_name(self, name):
        if name:
            self.name = name.strip()
            self.unique_name = "{}-{}".format(self.name, self._uuid)

    class PollerConfig:
        def __init__(self, workers, frequency, calculate=None):
            self.enabled = True
            self.workers = workers
            self.frequency = frequency
            self.calculate = calculate

    # config variables with defaults
    BASE_DIR = os.path.abspath(
        os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)
    )

    node_id = None
    name = None
    unique_name = None
    single_instance = True
    distributed = False
    group = 0

    debug = False
    log_level = 20
    max_db_failures = 5

    alerting = PollerConfig(1, 60)
    poller = PollerConfig(24, 300)
    services = PollerConfig(8, 300)
    discovery = PollerConfig(16, 21600)
    billing = PollerConfig(2, 300, 60)
    ping = PollerConfig(1, 60)
    down_retry = 60
    update_enabled = True
    update_frequency = 86400

    master_resolution = 1
    master_timeout = 10

    redis_host = "localhost"
    redis_port = 6379
    redis_db = 0
    redis_user = None
    redis_pass = None
    redis_socket = None
    redis_sentinel = None
    redis_sentinel_user = None
    redis_sentinel_pass = None
    redis_sentinel_service = None
    redis_timeout = 60

    log_output = False
    logdir = "logs"

    watchdog_enabled = False
    watchdog_logfile = "logs/librenms.log"

    def populate(self):
        config = LibreNMS.get_config_data(self.BASE_DIR)

        # populate config variables
        self.node_id = os.getenv("NODE_ID")
        self.set_name(config.get("distributed_poller_name", None))
        self.distributed = config.get("distributed_poller", ServiceConfig.distributed)
        self.group = ServiceConfig.parse_group(
            config.get("distributed_poller_group", ServiceConfig.group)
        )

        # backward compatible options
        self.master_timeout = config.get(
            "service_master_timeout", ServiceConfig.master_timeout
        )
        self.poller.workers = config.get(
            "poller_service_workers", ServiceConfig.poller.workers
        )
        self.poller.frequency = config.get(
            "poller_service_poll_frequency", ServiceConfig.poller.frequency
        )
        self.discovery.frequency = config.get(
            "poller_service_discover_frequency", ServiceConfig.discovery.frequency
        )
        self.down_retry = config.get(
            "poller_service_down_retry", ServiceConfig.down_retry
        )
        self.log_level = config.get("poller_service_loglevel", ServiceConfig.log_level)

        # new options
        self.poller.enabled = config.get("service_poller_enabled", True)  # unused
        self.poller.workers = config.get(
            "service_poller_workers", ServiceConfig.poller.workers
        )
        self.poller.frequency = config.get(
            "service_poller_frequency", ServiceConfig.poller.frequency
        )
        self.discovery.enabled = config.get("service_discovery_enabled", True)  # unused
        self.discovery.workers = config.get(
            "service_discovery_workers", ServiceConfig.discovery.workers
        )
        self.discovery.frequency = config.get(
            "service_discovery_frequency", ServiceConfig.discovery.frequency
        )
        self.services.enabled = config.get("service_services_enabled", True)
        self.services.workers = config.get(
            "service_services_workers", ServiceConfig.services.workers
        )
        self.services.frequency = config.get(
            "service_services_frequency", ServiceConfig.services.frequency
        )
        self.billing.enabled = config.get("service_billing_enabled", True)
        self.billing.frequency = config.get(
            "service_billing_frequency", ServiceConfig.billing.frequency
        )
        self.billing.calculate = config.get(
            "service_billing_calculate_frequency", ServiceConfig.billing.calculate
        )
        self.alerting.enabled = config.get("service_alerting_enabled", True)
        self.alerting.frequency = config.get(
            "service_alerting_frequency", ServiceConfig.alerting.frequency
        )
        self.ping.enabled = config.get("service_ping_enabled", False)
        self.ping.frequency = config.get("ping_rrd_step", ServiceConfig.ping.frequency)
        self.down_retry = config.get(
            "service_poller_down_retry", ServiceConfig.down_retry
        )
        self.log_level = config.get("service_loglevel", ServiceConfig.log_level)
        self.update_enabled = config.get(
            "service_update_enabled", ServiceConfig.update_enabled
        )
        self.update_frequency = config.get(
            "service_update_frequency", ServiceConfig.update_frequency
        )

        self.redis_host = os.getenv(
            "REDIS_HOST", config.get("redis_host", ServiceConfig.redis_host)
        )
        self.redis_db = os.getenv(
            "REDIS_DB", config.get("redis_db", ServiceConfig.redis_db)
        )
        self.redis_user = os.getenv(
            "REDIS_USERNAME", config.get("redis_user", ServiceConfig.redis_user)
        )
        self.redis_pass = os.getenv(
            "REDIS_PASSWORD", config.get("redis_pass", ServiceConfig.redis_pass)
        )
        self.redis_port = int(
            os.getenv("REDIS_PORT", config.get("redis_port", ServiceConfig.redis_port))
        )
        self.redis_socket = os.getenv(
            "REDIS_SOCKET", config.get("redis_socket", ServiceConfig.redis_socket)
        )
        self.redis_sentinel = os.getenv(
            "REDIS_SENTINEL", config.get("redis_sentinel", ServiceConfig.redis_sentinel)
        )
        self.redis_sentinel_user = os.getenv(
            "REDIS_SENTINEL_USERNAME",
            config.get("redis_sentinel_user", ServiceConfig.redis_sentinel_user),
        )
        self.redis_sentinel_pass = os.getenv(
            "REDIS_SENTINEL_PASSWORD",
            config.get("redis_sentinel_pass", ServiceConfig.redis_sentinel_pass),
        )
        self.redis_sentinel_service = os.getenv(
            "REDIS_SENTINEL_SERVICE",
            config.get("redis_sentinel_service", ServiceConfig.redis_sentinel_service),
        )
        self.redis_timeout = int(
            os.getenv(
                "REDIS_TIMEOUT",
                self.alerting.frequency
                if self.alerting.frequency != 0
                else self.redis_timeout,
            )
        )

        self.db_host = os.getenv(
            "DB_HOST", config.get("db_host", ServiceConfig.db_host)
        )
        self.db_name = os.getenv(
            "DB_DATABASE", config.get("db_name", ServiceConfig.db_name)
        )
        self.db_pass = os.getenv(
            "DB_PASSWORD", config.get("db_pass", ServiceConfig.db_pass)
        )
        self.db_port = int(
            os.getenv("DB_PORT", config.get("db_port", ServiceConfig.db_port))
        )
        self.db_socket = os.getenv(
            "DB_SOCKET", config.get("db_socket", ServiceConfig.db_socket)
        )
        self.db_user = os.getenv(
            "DB_USERNAME", config.get("db_user", ServiceConfig.db_user)
        )
        self.db_sslmode = os.getenv(
            "DB_SSLMODE", config.get("db_sslmode", ServiceConfig.db_sslmode)
        )
        self.db_ssl_ca = os.getenv(
            "MYSQL_ATTR_SSL_CA", config.get("db_ssl_ca", ServiceConfig.db_ssl_ca)
        )

        self.watchdog_enabled = config.get(
            "service_watchdog_enabled", ServiceConfig.watchdog_enabled
        )
        self.logdir = config.get("log_dir", ServiceConfig.BASE_DIR + "/logs")
        self.watchdog_logfile = config.get("log_file", self.logdir + "/librenms.log")

        # set convenient debug variable
        self.debug = logging.getLogger().isEnabledFor(logging.DEBUG)

        if not self.debug and self.log_level:
            try:
                logging.getLogger().setLevel(self.log_level)
            except ValueError:
                logger.error(
                    "Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(
                        self.log_level
                    )
                )
                logging.getLogger().setLevel(logging.INFO)

    def load_poller_config(self, db):
        try:
            settings = {}
            cursor = db.query(
                "SELECT * FROM `poller_cluster` WHERE `node_id`=%s", self.node_id
            )
            if cursor.rowcount == 0:
                return

            for index, setting in enumerate(cursor.fetchone()):
                name = cursor.description[index][0]
                settings[name] = setting

            if settings["poller_name"] is not None:
                self.set_name(settings["poller_name"])
            if settings["poller_groups"] is not None:
                self.group = ServiceConfig.parse_group(settings["poller_groups"])
            if settings["poller_enabled"] is not None:
                self.poller.enabled = settings["poller_enabled"]
            if settings["poller_frequency"] is not None:
                self.poller.frequency = settings["poller_frequency"]
            if settings["poller_workers"] is not None:
                self.poller.workers = settings["poller_workers"]
            if settings["poller_down_retry"] is not None:
                self.down_retry = settings["poller_down_retry"]
            if settings["discovery_enabled"] is not None:
                self.discovery.enabled = settings["discovery_enabled"]
            if settings["discovery_frequency"] is not None:
                self.discovery.frequency = settings["discovery_frequency"]
            if settings["discovery_workers"] is not None:
                self.discovery.workers = settings["discovery_workers"]
            if settings["services_enabled"] is not None:
                self.services.enabled = settings["services_enabled"]
            if settings["services_frequency"] is not None:
                self.services.frequency = settings["services_frequency"]
            if settings["services_workers"] is not None:
                self.services.workers = settings["services_workers"]
            if settings["billing_enabled"] is not None:
                self.billing.enabled = settings["billing_enabled"]
            if settings["billing_frequency"] is not None:
                self.billing.frequency = settings["billing_frequency"]
            if settings["billing_calculate_frequency"] is not None:
                self.billing.calculate = settings["billing_calculate_frequency"]
            if settings["alerting_enabled"] is not None:
                self.alerting.enabled = settings["alerting_enabled"]
            if settings["alerting_frequency"] is not None:
                self.alerting.frequency = settings["alerting_frequency"]
            if settings["ping_enabled"] is not None:
                self.ping.enabled = settings["ping_enabled"]
            if settings["ping_frequency"] is not None:
                self.ping.frequency = settings["ping_frequency"]
            if settings["update_enabled"] is not None:
                self.update_enabled = settings["update_enabled"]
            if settings["update_frequency"] is not None:
                self.update_frequency = settings["update_frequency"]
            if settings["loglevel"] is not None:
                self.log_level = settings["loglevel"]
            if settings["watchdog_enabled"] is not None:
                self.watchdog_enabled = settings["watchdog_enabled"]
            if settings["watchdog_log"] is not None:
                self.watchdog_logfile = settings["watchdog_log"]
        except pymysql.err.Error:
            logger.warning("Unable to load poller (%s) config", self.node_id)

    @staticmethod
    def parse_group(g):
        if g is None:
            return [0]
        elif type(g) is int:
            return [g]
        elif type(g) is str:
            try:
                return [int(x) for x in set(g.split(","))]
            except ValueError:
                pass

        logger.error("Could not parse group string, defaulting to 0")
        return [0]


class Service:
    config = ServiceConfig()
    _fp = False
    _started = False
    start_time = 0
    queue_managers = {}
    poller_manager = None
    discovery_manager = None
    last_poll = {}
    reap_flag = False
    terminate_flag = False
    reload_flag = False
    db_failures = 0

    def __init__(self):
        self.start_time = time.time()
        self.config.populate()
        self._db = LibreNMS.DB(self.config)
        self.config.load_poller_config(self._db)

        threading.current_thread().name = self.config.name  # rename main thread
        self.attach_signals()

        self._lm = self.create_lock_manager()
        self.daily_timer = LibreNMS.RecurringTimer(
            self.config.update_frequency, self.run_maintenance, "maintenance"
        )
        self.stats_timer = LibreNMS.RecurringTimer(
            self.config.poller.frequency, self.log_performance_stats, "performance"
        )
        if self.config.watchdog_enabled:
            logger.info(
                "Starting watchdog timer for log file: {}".format(
                    self.config.watchdog_logfile
                )
            )
            self.watchdog_timer = LibreNMS.RecurringTimer(
                self.config.poller.frequency, self.logfile_watchdog, "watchdog"
            )
        else:
            logger.info("Watchdog is disabled.")
        self.systemd_watchdog_timer = LibreNMS.RecurringTimer(
            10, self.systemd_watchdog, "systemd-watchdog"
        )
        self.is_master = False

    def service_age(self):
        return time.time() - self.start_time

    def attach_signals(self):
        logger.debug(
            "Attaching signal handlers on thread %s", threading.current_thread().name
        )
        signal(SIGTERM, self.terminate)  # capture sigterm and exit gracefully
        signal(SIGQUIT, self.terminate)  # capture sigquit and exit gracefully
        signal(SIGINT, self.terminate)  # capture sigint and exit gracefully
        signal(SIGHUP, self.reload)  # capture sighup and restart gracefully

        if "psutil" not in sys.modules:
            logger.warning("psutil is not available, polling gap possible")
        else:
            signal(SIGCHLD, self.reap)  # capture sigchld and reap the process

    def reap_psutil(self):
        """
        A process from a previous invocation is trying to report its status
        """
        # Speed things up by only looking at direct zombie children
        for p in psutil.Process().children(recursive=False):
            try:
                cmd = (
                    p.cmdline()
                )  # cmdline is uncached, so needs to go here to avoid NoSuchProcess
                status = p.status()

                if status == psutil.STATUS_ZOMBIE:
                    pid = p.pid
                    r = os.waitpid(p.pid, os.WNOHANG)
                    logger.warning(
                        'Reaped long running job "%s" in state %s with PID %d - job returned %d',
                        cmd,
                        status,
                        r[0],
                        r[1],
                    )
            except (OSError, psutil.NoSuchProcess):
                # process was already reaped
                continue

    def start(self):
        logger.debug("Performing startup checks...")

        if self.config.single_instance:
            self.check_single_instance()  # don't allow more than one service at a time

        if self._started:
            raise RuntimeWarning("Not allowed to start Poller twice")
        self._started = True

        logger.debug("Starting up queue managers...")

        # initialize and start the worker pools
        self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
        self.queue_managers["poller"] = self.poller_manager
        self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm)
        self.queue_managers["discovery"] = self.discovery_manager
        self.queue_managers["alerting"] = LibreNMS.AlertQueueManager(
            self.config, self._lm
        )
        self.queue_managers["services"] = LibreNMS.ServicesQueueManager(
            self.config, self._lm
        )
        self.queue_managers["billing"] = LibreNMS.BillingQueueManager(
            self.config, self._lm
        )
        self.queue_managers["ping"] = LibreNMS.PingQueueManager(self.config, self._lm)

        if self.config.update_enabled:
            self.daily_timer.start()
        self.stats_timer.start()
        self.systemd_watchdog_timer.start()
        if self.config.watchdog_enabled:
            self.watchdog_timer.start()

        logger.info("LibreNMS Service: {} started!".format(self.config.unique_name))
        logger.info(
            "Poller group {}. Using Python {} and {} locks and queues".format(
                "0 (default)" if self.config.group == [0] else self.config.group,
                python_version(),
                "redis" if isinstance(self._lm, LibreNMS.RedisLock) else "internal",
            )
        )
        logger.info(
            "Queue Workers: Discovery={} Poller={} Services={} Alerting={} Billing={} Ping={}".format(
                self.config.discovery.workers
                if self.config.discovery.enabled
                else "disabled",
                self.config.poller.workers
                if self.config.poller.enabled
                else "disabled",
                self.config.services.workers
                if self.config.services.enabled
                else "disabled",
                "enabled" if self.config.alerting.enabled else "disabled",
                "enabled" if self.config.billing.enabled else "disabled",
                "enabled" if self.config.ping.enabled else "disabled",
            )
        )

        if self.config.update_enabled:
            logger.info(
                "Maintenance tasks will be run every {}".format(
                    timedelta(seconds=self.config.update_frequency)
                )
            )
        else:
            logger.warning("Maintenance tasks are disabled.")

        # Main dispatcher loop
        try:
            while not self.terminate_flag:
                if self.reload_flag:
                    logger.info("Picked up reload flag, calling the reload process")
                    self.restart()

                if self.reap_flag:
                    self.reap_flag = False
                    self.reap_psutil()

                master_lock = self._acquire_master()
                if master_lock:
                    if not self.is_master:
                        logger.info(
                            "{} is now the master dispatcher".format(self.config.name)
                        )
                        self.is_master = True
                        self.start_dispatch_timers()

                    devices = self.fetch_immediate_device_list()
                    for device in devices:
                        device_id = device[0]
                        group = device[1]

                        if device[2]:  # polling
                            self.dispatch_immediate_polling(device_id, group)

                        if device[3]:  # discovery
                            self.dispatch_immediate_discovery(device_id, group)
                else:
                    if self.is_master:
                        logger.info(
                            "{} is no longer the master dispatcher".format(
                                self.config.name
                            )
                        )
                        self.stop_dispatch_timers()
                        self.is_master = False  # no longer master
                sleep(self.config.master_resolution)
        except KeyboardInterrupt:
            pass

        logger.info("Dispatch loop terminated")
        self.shutdown()

    def _acquire_master(self):
        return self._lm.lock(
            "dispatch.master", self.config.unique_name, self.config.master_timeout, True
        )

    def _release_master(self):
        self._lm.unlock("dispatch.master", self.config.unique_name)

    # ------------ Discovery ------------
    def dispatch_immediate_discovery(self, device_id, group):
        if not self.discovery_manager.is_locked(device_id):
            self.discovery_manager.post_work(device_id, group)

    # ------------ Polling ------------
    def dispatch_immediate_polling(self, device_id, group):
        if not self.poller_manager.is_locked(device_id):
            self.poller_manager.post_work(device_id, group)

            if self.config.debug:
                cur_time = time.time()
                elapsed = cur_time - self.last_poll.get(device_id, cur_time)
                self.last_poll[device_id] = cur_time
                # arbitrary limit to reduce spam
                if elapsed > (
                    self.config.poller.frequency - self.config.master_resolution
                ):
                    logger.debug(
                        "Dispatching polling for device {}, time since last poll {:.2f}s".format(
                            device_id, elapsed
                        )
                    )

    def fetch_immediate_device_list(self):
        try:
            poller_find_time = self.config.poller.frequency - 1
            discovery_find_time = self.config.discovery.frequency - 1

            result = self._db.query(
                """SELECT `device_id`,
                  `poller_group`,
                  COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_polled_timetaken`, 0) SECOND), 1) AS `poll`,
                  IF(status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_discovered_timetaken`, 0) SECOND), 1))) AS `discover`
                FROM `devices`
                WHERE `disabled` = 0 AND (
                    `last_polled` IS NULL OR
                    `last_discovered` IS NULL OR
                    `last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_polled_timetaken`, 0) SECOND) OR
                    `last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL COALESCE(`last_discovered_timetaken`, 0) SECOND)
                )
                ORDER BY `last_polled_timetaken` DESC""",
                (
                    poller_find_time,
                    self.service_age(),
                    discovery_find_time,
                    poller_find_time,
                    discovery_find_time,
                ),
            )
            self.db_failures = 0
            return result
        except pymysql.err.Error:
            self.db_failures += 1
            if self.db_failures > self.config.max_db_failures:
                logger.warning(
                    "Too many DB failures ({}), attempting to release master".format(
                        self.db_failures
                    )
                )
                self._release_master()
                sleep(
                    self.config.master_resolution
                )  # sleep to give another node a chance to acquire
            return []

    def run_maintenance(self):
        """
        Runs update and cleanup tasks by calling daily.sh.  Reloads the python script after the update.
        Sets a schema-update lock so no distributed pollers will update until the schema has been updated.
        """
        attempt = 0
        wait = 5
        max_runtime = 86100
        max_tries = int(max_runtime / wait)
        logger.info("Waiting for schema lock")
        while not self._lm.lock("schema-update", self.config.unique_name, max_runtime):
            attempt += 1
            if attempt >= max_tries:  # don't get stuck indefinitely
                logger.warning(
                    "Reached max wait for other pollers to update, updating now"
                )
                break
            sleep(wait)

        logger.info("Running maintenance tasks")
        exit_code, output = LibreNMS.call_script("daily.sh")
        if exit_code == 0:
            logger.info("Maintenance tasks complete\n{}".format(output))
        else:
            logger.error("Error {} in daily.sh:\n{}".format(exit_code, output))

        self._lm.unlock("schema-update", self.config.unique_name)

        self.restart()

    def create_lock_manager(self):
        """
        Create a new LockManager.  Tries to create a Redis LockManager, but falls
        back to python's internal threading lock implementation.
        Exits if distributing poller is enabled and a Redis LockManager cannot be created.
        :return: Instance of LockManager
        """
        try:
            return LibreNMS.RedisLock(
                sentinel_kwargs={
                    "username": self.config.redis_sentinel_user,
                    "password": self.config.redis_sentinel_pass,
                    "socket_timeout": self.config.redis_timeout,
                    "unix_socket_path": self.config.redis_socket,
                },
                namespace="librenms.lock",
                host=self.config.redis_host,
                port=self.config.redis_port,
                db=self.config.redis_db,
                username=self.config.redis_user,
                password=self.config.redis_pass,
                unix_socket_path=self.config.redis_socket,
                sentinel=self.config.redis_sentinel,
                sentinel_service=self.config.redis_sentinel_service,
                socket_timeout=self.config.redis_timeout,
            )
        except ImportError:
            if self.config.distributed:
                logger.critical(
                    "ERROR: Redis connection required for distributed polling"
                )
                logger.critical(
                    "Please install redis-py, either through your os software repository or from PyPI"
                )
                self.exit(2)
        except Exception as e:
            if self.config.distributed:
                logger.critical(
                    "ERROR: Redis connection required for distributed polling"
                )
                logger.critical(
                    "Lock manager could not connect to Redis. {}: {}".format(
                        type(e).__name__, e
                    )
                )
                self.exit(2)

        return LibreNMS.ThreadingLock()

    def restart(self):
        """
        Stop then recreate this entire process by re-calling the original script.
        Has the effect of reloading the python files from disk.
        """
        if sys.version_info < (3, 4, 0):
            logger.warning(
                "Skipping restart as running under an incompatible interpreter"
            )
            logger.warning("Please restart manually")
            return

        logger.info("Restarting service... ")

        if "psutil" not in sys.modules:
            logger.warning("psutil is not available, polling gap possible")
            self._stop_managers_and_wait()
        else:
            self._stop_managers()
        self._release_master()

        python = sys.executable
        sys.stdout.flush()
        os.execl(python, python, *sys.argv)

    def reap(self, signalnum=None, flag=None):
        """
        Handle a set the reload flag to begin a clean restart
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        self.reap_flag = True

    def reload(self, signalnum=None, flag=None):
        """
        Handle a set the reload flag to begin a clean restart
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        logger.info(
            "Received signal on thread %s, handling", threading.current_thread().name
        )
        self.reload_flag = True

    def terminate(self, signalnum=None, flag=None):
        """
        Handle a set the terminate flag to begin a clean shutdown
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        logger.info(
            "Received signal on thread %s, handling", threading.current_thread().name
        )
        self.terminate_flag = True

    def shutdown(self, signalnum=None, flag=None):
        """
        Stop and exit, waiting for all child processes to exit.
        :param signalnum: UNIX signal number
        :param flag: Flags accompanying signal
        """
        logger.info("Shutting down, waiting for running jobs to complete...")

        self.stop_dispatch_timers()
        self._release_master()

        self.daily_timer.stop()
        self.stats_timer.stop()
        self.systemd_watchdog_timer.stop()
        if self.config.watchdog_enabled:
            self.watchdog_timer.stop()

        self._stop_managers_and_wait()

        # try to release master lock
        logger.info(
            "Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name
        )
        self.exit(0)

    def start_dispatch_timers(self):
        """
        Start all dispatch timers and begin pushing events into queues.
        This should only be started when we are the master dispatcher.
        """
        for manager in self.queue_managers.values():
            try:
                manager.start_dispatch()
            except AttributeError:
                pass

    def stop_dispatch_timers(self):
        """
        Stop all dispatch timers, this should be called when we are no longer the master dispatcher.
        """
        for manager in self.queue_managers.values():
            try:
                manager.stop_dispatch()
            except AttributeError:
                pass

    def _stop_managers(self):
        for manager in self.queue_managers.values():
            manager.stop()

    def _stop_managers_and_wait(self):
        """
        Stop all QueueManagers, and wait for their processing threads to complete.
        We send the stop signal to all QueueManagers first, then wait for them to finish.
        """
        self._stop_managers()

        for manager in self.queue_managers.values():
            manager.stop_and_wait()

    def check_single_instance(self):
        """
        Check that there is only one instance of the service running on this computer.
        We do this be creating a file in the base directory (.lock.service) if it doesn't exist and
        obtaining an exclusive lock on that file.
        """
        lock_file = "{}/{}".format(self.config.BASE_DIR, ".lock.service")

        import fcntl

        self._fp = open(
            lock_file, "w"
        )  # keep a reference so the file handle isn't garbage collected
        self._fp.flush()
        try:
            fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            logger.warning("Another instance is already running, quitting.")
            self.exit(2)

    def log_performance_stats(self):
        logger.info("Counting up time spent polling")

        try:
            # Report on the poller instance as a whole
            self._db.query(
                "INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) "
                'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) '
                'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; '.format(
                    self.config.node_id,
                    self.config.name,
                    "librenms-service",
                    ",".join(str(i) for i in self.config.group),
                    1 if self.is_master else 0,
                )
            )

            # Find our ID
            self._db.query(
                'SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(
                    self.config.node_id
                )
            )

            for worker_type, manager in self.queue_managers.items():
                worker_seconds, devices = manager.performance.reset()

                # Record the queue state
                self._db.query(
                    "INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) "
                    'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) '
                    "ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ".format(
                        worker_type,
                        sum(
                            [
                                manager.get_queue(group).qsize()
                                for group in self.config.group
                            ]
                        ),
                        devices,
                        worker_seconds,
                        getattr(self.config, worker_type).workers,
                        getattr(self.config, worker_type).frequency,
                    )
                )
        except (pymysql.err.Error, ConnectionResetError, RedisConnectionError):
            logger.critical(
                "Unable to log performance statistics - is the database still online?",
                exc_info=True,
            )

    def systemd_watchdog(self):
        if "systemd.daemon" in sys.modules:
            notify("WATCHDOG=1")

    def logfile_watchdog(self):

        try:
            # check that lofgile has been written to within last poll period
            logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(
                self.config.watchdog_logfile
            )
        except FileNotFoundError as e:
            logger.error("Log file not found! {}".format(e))
            return

        if logfile_mdiff > self.config.poller.frequency:
            logger.critical(
                "BARK! Log file older than {}s, restarting service!".format(
                    self.config.poller.frequency
                ),
                exc_info=True,
            )
            self.restart()
        else:
            logger.info("Log file updated {}s ago".format(int(logfile_mdiff)))

    def exit(self, code=0):
        sys.stdout.flush()
        sys.exit(code)