Nekmo/simple-monitor-alert

View on GitHub
simple_monitor_alert/monitor.py

Summary

Maintainability
B
4 hrs
Test Coverage
import datetime
import os
import subprocess
import warnings
import logging
import inspect
from collections import defaultdict
from time import sleep, time
from threading import Timer

import dateutil
import six
import sys

from dateutil.tz import tzlocal

from simple_monitor_alert.exceptions import InvalidScriptLineError, InvalidScriptLineLogging
from simple_monitor_alert.lines import Observable, RawLine, RawItemLine, get_observables_from_lines, RawHeaderLine

logger = logging.getLogger('sma')

TIMEOUT = 5


def get_verbose_condition(observable):
    value = observable.get_line_value('value')
    expected = observable.get_matcher()
    if hasattr(expected, 'pattern'):
        expected = expected.pattern
    elif expected:
        expected = expected.parse()
    if isinstance(expected, six.string_types) or isinstance(expected, int):
        expected = '== {}'.format(expected)
    return '{} {}'.format(value, expected)


def log_evaluate(observable, result=None, use_logger=True):
    from simple_monitor_alert.utils.system import get_hostname
    result = result or observable.evaluate()
    level = 'success' if result else observable.get_line_value('level') or 'warning'
    msg = '{} - - Trigger: [{}] ({}) {}. '.format(get_hostname(), level,
                                                  getattr(getattr(observable, 'monitor', None), 'name', '?'),
                                           observable.get_verbose_name_group())
    msg += ('Result: {}' if result else 'Assertion {} failed').format(get_verbose_condition(observable))
    if observable.param_used:
        msg += '. Param used: {}'.format(observable.param_used)
    extra_info = observable.get_line_value('extra_info')
    if extra_info:
        msg += '. Extra info: {}'.format(extra_info)
    if use_logger:
        getattr(logger, 'info' if result else 'warning')(msg)
    else:
        return msg


class Monitor(object):
    lines = None
    headers = None
    items = None
    timeout = None

    def __init__(self, script_path, sma=None):
        self.script_path = script_path
        self.sma = sma
        self.name = os.path.splitext(os.path.split(script_path)[1])[0]

    def _execute_process(self, env):
        lines = []
        popen = subprocess.Popen([self.script_path], stdout=subprocess.PIPE, env=env)

        l = self.get_timer(popen)
        started_at = time()
        # sleep(.1)

        # Realtime Read
        blocksize = 1
        line = b''
        missing_data = True
        while popen.poll() is None or missing_data:
            if self.timeout:
                # No need to wait for X-Timeout header
                blocksize = -1
            if popen.poll() is not None and missing_data:
                # I force the loop if is the last cycle
                blocksize = -1
                missing_data = False
            line += popen.stdout.read(blocksize)
            if b'\n' not in line:
                continue
            processed_lines = line.split(b'\n')
            for line in processed_lines:
                lines.append(line + b'\n')
                timeout = self.get_headers(self.parse_lines([line])).get('X-Timeout')
                timeout = int(timeout) if timeout is not None else None
                if timeout and self.timeout is None:
                    l.cancel()
                    l = self.get_timer(popen, timeout - (time() - started_at))
                self.timeout = timeout
        l.cancel()
        return lines

    def get_timer(self, popen, timeout=TIMEOUT):
        def terminate_popen():
            popen.terminate()
            popen.kill()
        timeout = timeout if timeout > 0 else 0
        l = Timer(timeout, terminate_popen)
        l.start()
        return l

    def execute(self, parameters=None):
        env = self.get_env(parameters)
        lines = self.parse_lines(self._execute_process(env))
        self.lines = list(lines)
        self.items = self.get_observables(self.lines, parameters)
        self.headers = self.get_headers(self.lines)
        # self.evaluate_items()
        return self.items.values()

    def get_env(self, parameters):
        env = os.environ
        if parameters:
            env = env.copy()
            env.update(parameters)
        return env

    def parse_lines(self, lines, on_error=InvalidScriptLineLogging):
        for i, line in enumerate(lines):
            try:
                yield RawLine.parse(line, self)
            except InvalidScriptLineError:
                if inspect.isclass(on_error) and issubclass(on_error, Warning):
                    warnings.warn_explicit(on_error(line, self.script_path), on_error, self.script_path, i + 1)
                elif inspect.isclass(on_error) and issubclass(on_error, Exception):
                    raise on_error(line, self.script_path)
                elif on_error is None:
                    pass
                else:
                    on_error(line, self.script_path)

    def save_headers(self):
        self.sma.monitors_info.set_headers(self, self.headers)
        self.sma.monitors_info.write()

    def get_header(self, header_key):
        return (self.sma.monitors_info.get_monitor(self, create=False) or {}).get('headers', {}).get(header_key)

    def save_last_execution(self):
        self.sma.monitors_info.set_last_execution(self)
        self.sma.monitors_info.write()

    def last_execution(self):
        data = self.sma.monitors_info.get_monitor(self, create=False) or {}
        last_execution = data.get('last_execution', None)
        if last_execution:
            return dateutil.parser.parse(last_execution).replace(tzinfo=tzlocal())

    def shoud_be_executed(self):
        last_execution = self.last_execution()
        run_every = self.get_header('X-Run-Every-Seconds')
        if not last_execution or not run_every:
            return True
        dt = datetime.datetime.now(dateutil.tz.tzlocal())
        return dt - last_execution >= datetime.timedelta(seconds=run_every)

    @staticmethod
    def get_observables(lines, params=None):
        return get_observables_from_lines(lines, params)

    @staticmethod
    def get_headers(lines):
        headers = {}
        for line in filter(lambda x: isinstance(x, RawHeaderLine), lines):
            headers[line.key] = line.value
        return headers


class Monitors(object):
    monitors = None
    _monitors_paths = None

    def __init__(self, monitors_dir=None, config=None, sma=None):
        # TODO: remove config parameter: get from sma
        config = config or sma.config if sma else None
        self.monitors_dir, self.config = monitors_dir, config
        self.sma = sma

    def get_monitors(self, monitors_dir=None):
        if self.monitors:
            return self.monitors
        monitors_dir = monitors_dir or self.monitors_dir
        self.monitors = [self.get_monitor(x) for x in self._get_monitors_paths(monitors_dir)]
        return self.monitors

    def _get_monitors_paths(self, monitors_dir):
        if self._monitors_paths is None:
            self._monitors_paths = [os.path.join(monitors_dir, file) for file in os.listdir(monitors_dir)]
        return self._monitors_paths

    def get_monitors_names(self, monitors_dir=None):
        monitors_dir = monitors_dir or self.monitors_dir
        return map(lambda x: os.path.splitext(x.split('/')[-1])[0], self._get_monitors_paths(monitors_dir))

    def is_monitor_enabled(self, name):
        return name in self.get_monitors_names()

    def get_monitor(self, script_path):
        return Monitor(script_path, self.sma)

    def get_monitor_params(self, monitor):
        observables = self.config.get_monitor_observables(monitor.name)
        if isinstance(observables, dict):
            observables = observables.values()
        return dict(filter(lambda x: x[1] is not None, [(observable.get_verbose_name_group(), observable.get_param())
                                                        for observable in observables]))

    @staticmethod
    def get_parameters_cycles(parameters):
        if not parameters:
            return [{}]
        names_parameters = defaultdict(list)
        for parameter, value in parameters.items():
            parameter = parameter.split('(')[0]
            names_parameters[parameter].append(value)
        cycles_num = len(sorted(names_parameters.items(), key=lambda x: len(x[1]), reverse=True)[0][1])
        cycles = []
        for i in range(cycles_num):
            cycle = {}
            cycles.append(cycle)
            for key, values in names_parameters.items():
                cycle[key] = values[i % len(values)]
        return cycles

    def execute(self, monitor):
        parameters = self.get_monitor_params(monitor)
        observables = []
        try:
            for params in self.get_parameters_cycles(parameters):
                observables.extend(monitor.execute(params))
            monitor.save_headers()
            monitor.save_last_execution()
        except PermissionError:
            warnings.warn_explicit('No permissions for monitor. Check execution perms and read perms.',
                                   UserWarning, monitor.script_path, 1)
            return []
        new_observables = []
        for observable in observables:
            if observable not in new_observables:
                new_observables.append(observable)
        return new_observables

    def execute_all(self, use_config=True):
        for monitor in self.get_monitors():
            if not monitor.shoud_be_executed():
                continue
            observables = self.execute(monitor)
            if use_config:
                self.update_observables(monitor, observables)
            for observable in observables:
                observable.set_monitor(monitor)
                yield observable

    def update_observables(self, monitor, observables):
        for observable in observables:
            config_observable = self.config.get_observable(monitor.name, observable.name, observable.group)
            observable.update_usign_observable(config_observable)