marcus67/little_brother
little_brother/app_control.py

# -*- coding: utf-8 -*-

# Copyright (C) 2019-2021  Marcus Rickert
#
# See https://github.com/marcus67/little_brother
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import datetime
import socket
import sys
import time

import distro
import prometheus_client

from little_brother import admin_event
from little_brother import client_stats
from little_brother import constants
from little_brother import dependency_injection
from little_brother import process_statistics
from little_brother import rule_override
from little_brother import settings
from little_brother.admin_data_handler import AdminDataHandler
from little_brother.api.master_connector import MasterConnector
from little_brother.app_control_config_model import AppControlConfigModel
from little_brother.client_info import ClientInfo
from little_brother.event_handler import EventHandler
from little_brother.language import Language
from little_brother.persistence.persistent_dependency_injection_mix_in import PersistenceDependencyInjectionMixIn
from little_brother.persistence.persistent_user import User
from little_brother.persistence.session_context import SessionContext
from little_brother.process_handler_manager import ProcessHandlerManager
from little_brother.prometheus import PrometheusClient
from little_brother.rule_handler import RuleHandler
from little_brother.user_locale_handler import UserLocaleHandler
from little_brother.user_manager import UserManager
from python_base_app import log_handling
from python_base_app import tools
from python_base_app.base_user_handler import BaseUserHandler

ALEMBIC_VERSION_INIT_GUI_CONFIGURATION = ""

SECTION_NAME = "AppControl"

LAST_VERSION_WITHOUT_CLIENT_STAT_SUPPORT = "0.3.8"
MINIMUM_VERSION_WITH_CLIENT_STAT_SUPPORT = "0.3.9"

CSS_CLASS_MAXIMUM_PING_EXCEEDED = "node_inactive"
CSS_CLASS_CLIENT_VERSION_OUTDATED = "node_outdated"

# Dummy function to trigger extraction by pybabel...
_ = lambda x, y=None: x


class AppControl(PersistenceDependencyInjectionMixIn):

    def __init__(self, p_config,
                 p_debug_mode,
                 p_process_handlers=None,
                 p_device_handler=None,
                 p_notification_handlers=None,
                 p_login_mapping=None,
                 p_locale_helper=None):

        super().__init__()

        self._logger = log_handling.get_logger(self.__class__.__name__)

        self._config: AppControlConfigModel = p_config
        self._debug_mode = p_debug_mode
        self._process_handlers = p_process_handlers
        self._device_handler = p_device_handler

        self._rule_handler = None
        self._notification_handlers = p_notification_handlers
        self._master_connector = None
        self._prometheus_client = None
        self._user_handler = None
        self._locale_helper = p_locale_helper
        self._time_last_successful_send_events = tools.get_current_time()
        self._user_locale_handler = UserLocaleHandler()
        self._admin_data_handler = None

        if self._config.hostname is None:
            self._host_name = socket.getfqdn()

        else:
            self._host_name = self._config.hostname

        self._event_handler = EventHandler(p_host_name=self._host_name, p_is_master=self.is_master())
        dependency_injection.container[EventHandler] = self._event_handler

        self._event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_START_CLIENT, p_handler=self.handle_event_start_client)
        self._event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_START_MASTER, p_handler=self.handle_event_start_master)
        self._event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_UPDATE_CONFIG, p_handler=self.handle_event_update_config)

        self._language = Language()

        self._process_handler_manager = ProcessHandlerManager(
            p_config=self._config, p_process_handlers=self._process_handlers, p_is_master=self.is_master(),
            p_login_mapping=p_login_mapping, p_language=self._language)

        dependency_injection.container[ProcessHandlerManager] = self._process_handler_manager

        self._user_manager = UserManager(p_config=self._config,
                                         p_login_mapping=p_login_mapping,
                                         p_server_group=self._config.server_group,
                                         p_is_master=self.is_master())
        dependency_injection.container[UserManager] = self._user_manager

        with SessionContext(p_persistence=self.persistence) as session_context:
            self._user_manager.reset_users(p_session_context=session_context)

        self._could_not_send = False
        self._client_infos = {}
        self._start_time = time.time()

    @property
    def rule_handler(self) -> RuleHandler:

        if self._rule_handler is None:
            self._rule_handler = dependency_injection.container[RuleHandler]

        return self._rule_handler

    @property
    def admin_data_handler(self) -> AdminDataHandler:

        if self._admin_data_handler is None:
            self._admin_data_handler = dependency_injection.container[AdminDataHandler]

        return self._admin_data_handler

    @property
    def master_connector(self) -> MasterConnector:

        if self._master_connector is None:
            self._master_connector = dependency_injection.container[MasterConnector]

        return self._master_connector

    @property
    def user_handler(self) -> BaseUserHandler:

        if self._user_handler is None:
            self._user_handler = dependency_injection.container[BaseUserHandler]

        return self._user_handler

    @property
    def prometheus_client(self) -> PrometheusClient:

        if self._prometheus_client is None:
            self._prometheus_client = dependency_injection.container[PrometheusClient]

        return self._prometheus_client

    @property
    def check_interval(self):
        return self._config.check_interval

    def set_user_configs(self, p_user_configs):
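        # Replace all locally stored users with the user configuration received from the
        # master and rebuild the process name patterns afterwards.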

        with SessionContext(p_persistence=self.persistence) as session_context:

            session = session_context.get_session()

            # Delete all locally known users since the message from the master will not contain users
            # that have been deleted on the master. These might otherwise remain active on the client.
            for user in self.user_entity_manager.users(p_session_context=session_context):
                session.delete(user)

            session.commit()

            for username, user_config in p_user_configs.items():
                user = self.user_entity_manager.get_by_username(p_session_context=session_context, p_username=username)

                if user is None:
                    user = User()
                    user.username = username
                    session.add(user)

                user.process_name_pattern = user_config[constants.JSON_PROCESS_NAME_PATTERN]
                user.prohibited_process_name_pattern = user_config.get(constants.JSON_PROHIBITED_PROCESS_NAME_PATTERN)
                user.active = user_config[constants.JSON_ACTIVE]

                fmt = "Set config for {user}"
                self._logger.info(fmt.format(user=str(user)))

            session.commit()
            self._user_manager.reset_users(p_session_context=session_context)
            self._process_handler_manager.reset_process_patterns()

    def reset_process_patterns(self):
        self._process_handler_manager.reset_process_patterns()

    def is_master(self):
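        # The instance runs as master if no master connector is available (test setups)
        # or if no master host URL has been configured.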

        if self.master_connector is None:
            # This is for test cases which do not instantiate a master connector
            return True

        else:
            return self.master_connector._config.host_url is None

    def set_prometheus_http_requests_summary(self, p_hostname, p_service, p_duration):

        if self.prometheus_client is not None:
            # try to resolve ip addresses
            p_hostname = tools.get_dns_name_by_ip_address(p_ip_address=p_hostname)
            self.prometheus_client.set_http_requests_summary(p_hostname=p_hostname,
                                                              p_service=p_service,
                                                              p_duration=p_duration)

    def set_metrics(self):
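        # Publish the current monitoring state (uptime, user and device counts, host and
        # device activity, client statistics) to Prometheus, if a Prometheus client is configured.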

        with SessionContext(p_persistence=self.persistence) as session_context:
            if self.prometheus_client is not None:
                self.prometheus_client.set_uptime(p_hostname="master", p_uptime=time.time() - self._start_time)

                self.prometheus_client.set_number_of_monitored_users(
                    self._user_manager.get_number_of_monitored_users())
                self.prometheus_client.set_number_of_configured_users(
                    len(self.user_entity_manager.user_map(session_context)))

                if self._config.scan_active:
                    self.prometheus_client.set_monitored_host(self._host_name, True)

                latest_ping_time = tools.get_current_time() + \
                                   datetime.timedelta(seconds=-self._config.maximum_client_ping_interval)

                for hostname, client_info in self._client_infos.items():
                    active = client_info.last_message > latest_ping_time
                    self.prometheus_client.set_monitored_host(hostname, active)

                if self._device_handler is not None:
                    self.prometheus_client.set_number_of_monitored_devices(
                        self._device_handler.get_number_of_monitored_devices())

                    for device_info in self._device_handler.device_infos.values():
                        self.prometheus_client.set_device_active(
                            device_info.device_name, 1 if device_info.is_up else 0)
                        self.prometheus_client.set_device_response_time(
                            device_info.device_name, device_info.response_time)
                        self.prometheus_client.set_device_moving_average_response_time(
                            device_info.device_name, device_info.moving_average_response_time)

                else:
                    self.prometheus_client.set_number_of_monitored_devices(0)

                for client_info in self._client_infos.values():
                    if client_info.client_stats is not None:
                        self.prometheus_client.set_client_stats(p_hostname=client_info.host_name,
                                                                 p_client_stats=client_info.client_stats)

    def check(self):
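        # Periodic main loop: process the event queue; on the master additionally evaluate
        # the rule sets for all users and update the metrics, on a client send the queued
        # events to the master and check the network connection.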

        with SessionContext(p_persistence=self.persistence) as session_context:
            self._user_manager.retrieve_user_mappings(p_session_context=session_context)

        reference_time = datetime.datetime.now()

        self._event_handler.process_queue()

        if self.is_master():
            self.process_rule_sets_for_all_users(p_reference_time=reference_time)
            self._event_handler.process_queue()
            self.set_metrics()

        else:
            self.send_events()
            self.check_network()
            self._event_handler.process_queue()


    # def load_offline_users(self):
    #
    #     self._logger.info("Loading offline users to ensure termination of active processes while network is down "
    #                       "or server is not available.")
    #     self._logger.info(f"Currently configured users: {self._}")
    #
    #     with SessionContext(p_persistence=self.persistence) as session_context:
    #         self._user_manager.reset_users(p_session_context=session_context)

    def check_network(self):
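        # Monitor the time since events were last sent to the master successfully: log a
        # warning once the warning threshold is reached and queue artificial kill events
        # after the maximum time without successful sends has been exceeded.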

        time_since_last_send = int((tools.get_current_time() - self._time_last_successful_send_events).total_seconds())

        if (self._config.warning_time_without_send_events <= time_since_last_send <
                self._config.maximum_time_without_send_events):
            self._logger.warning(f"No successful send events for {time_since_last_send} seconds")
            #self.load_offline_users()

        elif time_since_last_send >= self._config.maximum_time_without_send_events:
            self._process_handler_manager.queue_artificial_kill_events()

    def start(self):

        if self.is_master():
            fmt = "Starting application in MASTER mode"
            self._logger.info(fmt)

            self._process_handler_manager.load_historic_process_infos()
            self._process_handler_manager.send_historic_process_infos()
            self.admin_data_handler.load_rule_overrides()
        #             self.queue_broadcast_event_start_master()

        else:
            fmt = "Starting application in CLIENT mode communicating with master at URL {master_url}"
            self._logger.info(fmt.format(master_url=self.master_connector._get_api_url()))

            self.queue_event_start_client()

        fmt = "Using fully qualified domain name '%s' for process infos" % self._host_name
        self._logger.info(fmt)

        self._process_handler_manager.register_events()
        self._user_manager.register_events()

    def stop(self):
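        # Queue artificial termination events for all processes still registered as active,
        # flush the event queue and, when running as a client, send the events to the master.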

        for handler in self._process_handlers.values():
            events = handler.get_artificial_termination_events()

            fmt = "Artificially terminating {process_count} active processes on {handler_id}"
            self._logger.info(fmt.format(process_count=len(events), handler_id=handler.id))

            self._event_handler.queue_events(p_events=events, p_to_master=True)

        self._event_handler.process_queue()

        if not self.is_master():
            self.send_events()

    def clean_history(self):

        history_length_in_days = self._config.history_length_in_days

        msg = "Deleting historic entries older than {days} days..."
        self._logger.info(msg.format(days=history_length_in_days))

        with SessionContext(p_persistence=self.persistence) as session_context:
            self.rule_override_entity_manager.delete_historic_entries(p_session_context=session_context,
                                                                      p_history_length_in_days=history_length_in_days)
            self.process_info_entity_manager.delete_historic_entries(p_session_context=session_context,
                                                                     p_history_length_in_days=history_length_in_days)
            self.admin_event_entity_manager.delete_historic_entries(p_session_context=session_context,
                                                                    p_history_length_in_days=history_length_in_days)

    def handle_event_update_config(self, p_event):
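        # Client-side handler for a configuration update from the master: extract the user
        # configuration (new format for version >= 0.3.13 or compatibility format) and apply it.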

        if constants.JSON_USER_CONFIG in p_event.payload:
            # New mode (version >= 0.3.13)
            user_config_payload = p_event.payload[constants.JSON_USER_CONFIG]
            self._config.maximum_time_without_send_events = p_event.payload.get(
                constants.JSON_MAXIMUM_TIME_WITHOUT_SEND)

        else:
            # Compatibility mode (version < 0.3.13)
            user_config_payload = p_event.payload

        user_configs = {}

        for username, user_config in user_config_payload.items():
            user_configs[username] = user_config

        self.set_user_configs(p_user_configs=user_configs)

    def update_client_info(self, p_hostname, p_client_stats=None, p_suppress_send_state_update=False):
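        # Register a client when it contacts the master for the first time (sending it the
        # user configuration and login mapping) and record the time and statistics of its
        # most recent message.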

        client_info = self._client_infos.get(p_hostname)

        if client_info is None:
            client_info = ClientInfo(p_host_name=p_hostname, p_client_stats=p_client_stats, p_is_master=False,
                                     p_maximum_client_ping_interval=self._config.maximum_client_ping_interval,
                                     p_master_version=self.get_client_version(),
                                     )
            self._client_infos[p_hostname] = client_info
            self.send_config_to_client(p_hostname)

            with SessionContext(p_persistence=self.persistence) as session_context:
                self._user_manager.send_login_mapping_to_client(p_session_context=session_context,
                                                                p_hostname=p_hostname)

        client_info.last_message = tools.get_current_time()
        client_info.client_stats = p_client_stats

    def handle_event_start_client(self, p_event):
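        # Master-side handler for a client start event: register the client, send it the
        # configuration and login mapping, and send out the historic process infos.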

        self.update_client_info(p_event.hostname, p_suppress_send_state_update=True)
        self.send_config_to_client(p_event.hostname)

        with SessionContext(p_persistence=self.persistence) as session_context:
            self._user_manager.send_login_mapping_to_client(p_session_context=session_context,
                                                            p_hostname=p_event.hostname)

        self._process_handler_manager.send_historic_process_infos()

    def handle_event_start_master(self, p_event):

        self._process_handler_manager.queue_artificial_activation_events()

    def send_config_to_client(self, p_hostname):
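        # Send the per-user process name patterns and activity flags together with the
        # maximum time without send events to the given client as an UPDATE_CONFIG event.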

        config = {}

        with SessionContext(p_persistence=self.persistence) as session_context:
            user_config = {
                user.username: {constants.JSON_PROCESS_NAME_PATTERN: user.process_name_pattern,
                                constants.JSON_PROHIBITED_PROCESS_NAME_PATTERN: user.prohibited_process_name_pattern,
                                constants.JSON_ACTIVE: user.active}
                for user in self.user_entity_manager.users(p_session_context=session_context)
            }

        config[constants.JSON_USER_CONFIG] = user_config
        config[constants.JSON_MAXIMUM_TIME_WITHOUT_SEND] = self._config.maximum_time_without_send_events

        self.queue_event_update_config(p_hostname=p_hostname, p_config=config)

    def send_config_to_all_clients(self):

        for client in self._client_infos.values():
            self.send_config_to_client(p_hostname=client.host_name)

    def process_rule_sets_for_all_users(self, p_reference_time):
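        # Evaluate the active rule set of every active, monitored user against the current
        # process statistics, taking rule overrides and time extensions into account, and
        # hand the result over to the process handler manager.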

        fmt = "Processing rules for all users START..."
        self._logger.debug(fmt)

        with SessionContext(p_persistence=self.persistence) as session_context:
            users_stat_infos = process_statistics.get_process_statistics(
                p_process_infos=self._process_handler_manager.get_process_infos(),
                p_reference_time=p_reference_time,
                p_max_lookback_in_days=self._config.process_lookback_in_days,
                p_user_map=self.user_entity_manager.user_map(session_context),
                p_min_activity_duration=self._config.min_activity_duration)

            active_time_extensions = self.time_extension_entity_manager.get_active_time_extensions(
                p_session_context=session_context, p_reference_datetime=tools.get_current_time())

            for user in self.user_entity_manager.users(session_context):
                if user.active and user.username in self._user_manager.usernames:

                    user_active = False

                    user_locale = self._user_locale_handler.get_user_locale(
                        p_username=user.username, p_session_context=session_context)

                    rule_set = self.rule_handler.get_active_ruleset(
                        p_rule_sets=user.rulesets, p_reference_date=p_reference_time.date())

                    stat_infos = users_stat_infos.get(user.username)

                    if stat_infos is not None:
                        stat_info = stat_infos.get(rule_set.context)

                        if stat_info is not None:
                            self._logger.debug(str(stat_info))

                            key_rule_override = rule_override.get_key(p_username=user.username,
                                                                      p_reference_date=p_reference_time.date())
                            override = self.admin_data_handler.rule_overrides.get(key_rule_override)

                            if override is not None:
                                self._logger.debug(str(override))

                            rule_result_info = self.rule_handler.process_rule_sets_for_user(
                                p_rule_sets=user.rulesets,
                                p_stat_info=stat_info,
                                p_active_time_extension=active_time_extensions.get(user.username),
                                p_reference_time=p_reference_time,
                                p_rule_override=override,
                                p_locale=user_locale)

                            self.update_current_user_status(rule_result_info, session_context, stat_info, user)
                            self._process_handler_manager.handle_rule_result_info(rule_result_info, stat_info, user)
                            user_active = stat_info.current_activity is not None

                    if self.prometheus_client is not None:
                        self.prometheus_client.set_user_active(p_username=user.username, p_is_active=user_active)

        fmt = "Processing rules for all users END..."
        self._logger.debug(fmt)

    def update_current_user_status(self, p_rule_result_info, p_session_context, p_stat_info, p_user):
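        # Update the user's session status (minutes left in session, activity allowed,
        # logged in) from the rule evaluation result and the current statistics.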
        current_user_status = self._user_manager.get_current_user_status(
            p_session_context=p_session_context, p_username=p_user.username)

        if p_stat_info.current_activity_start_time is not None and \
                p_rule_result_info.limited_session_time():
            current_user_status.minutes_left_in_session = \
                p_rule_result_info.get_minutes_left_in_session()

        else:
            current_user_status.minutes_left_in_session = None

        current_user_status.activity_allowed = p_rule_result_info.activity_allowed()
        current_user_status.logged_in = p_stat_info.current_activity is not None

    ###################################################################################################################
    ###################################################################################################################
    ###################################################################################################################

    def queue_event_start_client(self):

        event = admin_event.AdminEvent(
            p_event_type=admin_event.EVENT_TYPE_START_CLIENT,
            p_hostname=self._host_name)
        self._event_handler.queue_event(p_event=event, p_to_master=True)

    def queue_broadcast_event_start_master(self):
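        # Notify every known client once that the master has been started.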

        for hostname, client_info in self._client_infos.items():
            if not client_info.start_event_sent:
                event = admin_event.AdminEvent(
                    p_event_type=admin_event.EVENT_TYPE_START_MASTER,
                    p_hostname=hostname)
                self._event_handler.queue_event(p_event=event, p_is_action=True)
                client_info.start_event_sent = True

    def queue_event_update_config(self, p_hostname, p_config):

        event = admin_event.AdminEvent(
            p_event_type=admin_event.EVENT_TYPE_UPDATE_CONFIG,
            p_hostname=p_hostname,
            p_payload=p_config)
        self._event_handler.queue_event(p_event=event, p_is_action=True)

    def queue_event_historic_process_start(self, p_pinfo):

        event = admin_event.AdminEvent(
            p_event_type=admin_event.EVENT_TYPE_PROCESS_START,
            p_hostname=p_pinfo.hostname,
            p_pid=p_pinfo.pid,
            p_username=p_pinfo.username,
            p_process_start_time=p_pinfo.start_time)
        self._event_handler.queue_event(p_event=event, p_is_action=True)

    def get_unmonitored_users(self, p_session_context):

        return [username for username in self.user_handler.list_users() if
                username not in self.user_entity_manager.user_map(p_session_context)]

    def get_unmonitored_devices(self, p_user, p_session_context):

        monitored_devices = [user2device.device.device_name for user2device in p_user.devices]
        return [device for device in self.device_entity_manager.devices(p_session_context)
                if device.device_name not in monitored_devices]

    def get_topology_infos(self, p_session_context):

        topology_infos = list(self._client_infos.values())

        master_stats = self.get_client_stats()
        master_info = ClientInfo(p_is_master=True, p_host_name=self._host_name, p_client_stats=master_stats)

        topology_infos.append(master_info)

        sorted_infos = sorted(topology_infos, key=lambda info: (0 if info.is_master else 1, info.host_name))
        return sorted_infos

    def get_client_version(self):
        return settings.settings['version']

    def get_client_stats(self):
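        # Collect version, revision, Python and distribution information for this node;
        # on a client, memory and CPU usage are added from the Prometheus ProcessCollector.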

        a_client_stats = client_stats.ClientStats(
            p_version=self.get_client_version(),
            p_revision=settings.extended_settings['debian_package_revision'],
            p_python_version="{major}.{minor}.{micro}".format(major=sys.version_info.major,
                                                              minor=sys.version_info.minor,
                                                              micro=sys.version_info.micro),
            p_running_in_docker=tools.running_in_docker(),
            p_running_in_snap=tools.running_in_snap(),
            p_linux_distribution=distro.name(pretty=True)
        )

        if not self.is_master():
            # Use the Prometheus Client ProcessCollector to retrieve run time stats for CPU and memory usage
            # to be consistent with the stats collected on the master.

            collector = prometheus_client.process_collector.PROCESS_COLLECTOR
            stats = collector.collect()

            for stat in stats:
                if stat.name == 'process_resident_memory_bytes':
                    a_client_stats.resident_memory_bytes = stat.samples[0].value

                elif stat.name == 'process_start_time_seconds':
                    a_client_stats.start_time_seconds = stat.samples[0].value

                elif stat.name == 'process_cpu_seconds':
                    a_client_stats.cpu_seconds_total = stat.samples[0].value

        return a_client_stats

    def send_events(self):
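        # Send all queued outgoing events to the master. On success, update the timestamp of
        # the last successful send and queue artificial activation events if the previous
        # attempt had failed; on failure, re-queue the events.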

        outgoing_events = self._event_handler.get_outgoing_events()

        try:
            fmt = "Sending {number} event(s) to master"
            self._logger.debug(fmt.format(number=len(outgoing_events)))
            result = self.master_connector.send_events(p_hostname=self._host_name,
                                                       p_events=outgoing_events,
                                                       p_client_stats=self.get_client_stats())
            self._time_last_successful_send_events = tools.get_current_time()

            if self._could_not_send:
                self._process_handler_manager.queue_artificial_activation_events()
                self._could_not_send = False

            if result is not None:
                self._event_handler.receive_events(p_json_data=result)

        except Exception as e:

            fmt = "Exception '{estr}' while sending events to master. Requeueing events..."
            self._logger.error(fmt.format(estr=str(e)))
            self._event_handler.queue_outgoing_events(p_events=outgoing_events)

            self._could_not_send = True

            if self._debug_mode:
                fmt = "Propagating exception due to debug_mode=True"
                self._logger.warning(fmt)
                raise e

    def receive_client_stats(self, p_json_data):

        return tools.objectify_dict(p_dict=p_json_data, p_class=client_stats.ClientStats)

    def handle_downtime(self, p_downtime):
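        # Queue the downtime-corrected admin events of all process handlers for
        # transmission to the master.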

        for process_handler in self._process_handlers.values():
            events = process_handler.get_downtime_corrected_admin_events(p_downtime=p_downtime)

            self._event_handler.queue_events(p_events=events, p_to_master=True)

    def add_new_user(self, p_session_context, p_username, p_locale=None):
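        # Create the user in the database, register it for monitoring, and push the
        # updated configuration to all clients.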

        self.user_entity_manager.add_new_user(p_session_context=p_session_context, p_username=p_username,
                                              p_locale=p_locale)
        self._user_manager.add_monitored_user(p_username=p_username)
        self._user_manager.reset_users(p_session_context=p_session_context)
        self.send_config_to_all_clients()