marcus67/little_brother

View on GitHub
little_brother/process_handler_manager.py

Summary

Maintainability
D
2 days
Test Coverage
# -*- coding: utf-8 -*-

# Copyright (C) 2019-2021  Marcus Rickert
#
# See https://github.com/marcus67/little_brother
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import datetime
import os.path
import socket

from little_brother import admin_event
from little_brother import dependency_injection
from little_brother import process_info
from little_brother.admin_data_handler import AdminDataHandler
from little_brother.admin_event import AdminEvent
from little_brother.api.master_connector import MasterConnector
from little_brother.app_control_config_model import AppControlConfigModel
from little_brother.event_handler import EventHandler
from little_brother.language import Language
from little_brother.persistence.persistent_dependency_injection_mix_in import PersistenceDependencyInjectionMixIn
from little_brother.persistence.session_context import SessionContext
from little_brother.process_handler import ProcessHandler
from little_brother.prometheus import PrometheusClient
from little_brother.rule_handler import RuleResultInfo
from little_brother.user_locale_handler import UserLocaleHandler
from little_brother.user_manager import UserManager
from python_base_app import log_handling

DEFAULT_SCAN_ACTIVE = True
DEFAULT_ADMIN_LOOKAHEAD_IN_DAYS = 7  # days
DEFAULT_PROCESS_LOOKUP_IN_DAYS = 7  # days
DEFAULT_HISTORY_LENGTH_IN_DAYS = 180  # days
DEFAULT_MIN_ACTIVITY_DURATION = 60  # seconds
DEFAULT_CHECK_INTERVAL = 5  # seconds
DEFAULT_INDEX_REFRESH_INTERVAL = 60  # seconds
DEFAULT_TOPOLOGY_REFRESH_INTERVAL = 60  # seconds
DEFAULT_MAXIMUM_CLIENT_PING_INTERVAL = 60  # seconds
DEFAULT_WARNING_TIME_WITHOUT_SEND_EVENTS = 3 * DEFAULT_CHECK_INTERVAL  # seconds
DEFAULT_MAXIMUM_TIME_WITHOUT_SEND_EVENTS = 10 * DEFAULT_CHECK_INTERVAL  # minutes
DEFAULT_KILL_PROCESS_DELAY = 10  # seconds
DEFAULT_TIME_EXTENSION_PERIODS = "-30,-15,-5,5,10,15,30,45,60"

ALEMBIC_VERSION_INIT_GUI_CONFIGURATION = ""

SECTION_NAME = "AppControl"

LAST_VERSION_WITHOUT_CLIENT_STAT_SUPPORT = "0.3.8"
MINIMUM_VERSION_WITH_CLIENT_STAT_SUPPORT = "0.3.9"

CSS_CLASS_MAXIMUM_PING_EXCEEDED = "node_inactive"
CSS_CLASS_CLIENT_VERSION_OUTDATED = "node_outdated"

# Dummy function to trigger extraction by pybabel...
_ = lambda x, y=None: x


class ProcessHandlerManager(PersistenceDependencyInjectionMixIn):

    def __init__(self,
                 p_config,
                 p_is_master,
                 p_login_mapping,
                 p_language:Language,
                 p_process_handlers=None):

        super().__init__()

        self._logger = log_handling.get_logger(self.__class__.__name__)

        self._config: AppControlConfigModel = p_config
        self._is_master = p_is_master
        self._process_handlers = p_process_handlers
        self._login_mapping = p_login_mapping
        self._language = p_language

        self._event_handler = None
        self._rule_handler = None
        self._master_connector = None
        self._prometheus_client = None
        self._user_manager = None

        self._user_locale_handler = UserLocaleHandler()
        self._admin_data_handler = None

        if self._config.hostname is None:
            self._host_name = socket.getfqdn()

        else:
            self._host_name = self._config.hostname

        self._process_regex_map = None
        self._prohibited_process_regex_map = None

        self._locale_dir = os.path.join(os.path.dirname(__file__), "translations")

        self._logout_warnings = {}

    @property
    def admin_data_handler(self) -> AdminDataHandler:

        if self._admin_data_handler is None:
            self._admin_data_handler = dependency_injection.container[AdminDataHandler]

        return self._admin_data_handler

    @property
    def prometheus_client(self) -> PrometheusClient:

        if self._prometheus_client is None:
            self._prometheus_client = dependency_injection.container[PrometheusClient]

        return self._prometheus_client

    @property
    def master_connector(self) -> MasterConnector:

        if self._master_connector is None:
            self._master_connector = dependency_injection.container[MasterConnector]

        return self._master_connector

    @property
    def event_handler(self) -> EventHandler:

        if self._event_handler is None:
            self._event_handler = dependency_injection.container[EventHandler]

        return self._event_handler

    @property
    def user_manager(self) -> UserManager:

        if self._user_manager is None:
            self._user_manager = dependency_injection.container[UserManager]

        return self._user_manager

    @property
    def process_regex_map(self):

        with SessionContext(p_persistence=self.persistence) as session_context:
            if self._process_regex_map is None:
                self._process_regex_map = {}

                for user in self.user_entity_manager.users(session_context):
                    if user.active:
                        self._process_regex_map[user.username] = user.regex_process_name_pattern

        return self._process_regex_map

    @property
    def prohibited_process_regex_map(self):

        with SessionContext(p_persistence=self.persistence) as session_context:
            if self._prohibited_process_regex_map is None:
                self._prohibited_process_regex_map = {}

                for user in self.user_entity_manager.users(session_context):
                    self._prohibited_process_regex_map[user.username] = user.regex_prohibited_process_name_pattern

        return self._prohibited_process_regex_map

    def reset_process_patterns(self):
        self._process_regex_map = None
        self._prohibited_process_regex_map = None

    def register_events(self):
        self.event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_PROHIBITED_PROCESS, p_handler=self.handle_event_prohibited_process)
        self.event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_PROCESS_START, p_handler=self.handle_event_process_start)
        self.event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_PROCESS_DOWNTIME, p_handler=self.handle_event_process_downtime)
        self.event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_PROCESS_END, p_handler=self.handle_event_process_end)
        self.event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_KILL_PROCESS, p_handler=self.handle_event_kill_process)

    def get_process_handler(self, p_id:str, p_raise_exception=False):

        handler = self._process_handlers.get(p_id)

        if handler is None and p_raise_exception:
            fmt = "Unknown process handler '{handler_id}'"
            raise RuntimeError(fmt.format(handler_id=p_id))

        return handler

    def handle_event_process_downtime(self, p_event):

        pinfo, updated = self.get_process_handler(p_id=p_event.processhandler).handle_event_process_downtime(p_event)

        if self.persistence is not None and updated:
            with SessionContext(p_persistence=self.persistence) as session_context:
                self.process_info_entity_manager.update_process_info(
                    p_session_context=session_context, p_process_info=pinfo)

    def handle_event_process_start(self, p_event):

        process_handler = self.get_process_handler(p_id=p_event.processhandler)

        if process_handler is None:
            msg = "Received event for process handler of type id '{id}' which is not registered -> discarding event"
            self._logger.warning(msg.format(id=p_event.processhandler))
            return

        pinfo, updated = process_handler.handle_event_process_start(p_event)

        if updated:
            if self.persistence is not None:
                with SessionContext(p_persistence=self.persistence) as session_context:
                    self.process_info_entity_manager.update_process_info(
                        p_session_context=session_context, p_process_info=pinfo)

        else:
            if self.persistence is not None:
                with SessionContext(p_persistence=self.persistence) as session_context:
                    self.process_info_entity_manager.write_process_info(
                        p_session_context=session_context, p_process_info=pinfo)

        if self._is_master:
            rule_result_info = self.admin_data_handler.get_current_rule_result_info(
                p_reference_time=datetime.datetime.now(), p_process_infos=self.get_process_infos(),
                p_username=p_event.username)

            if rule_result_info is not None and rule_result_info.activity_allowed():
                with SessionContext(p_persistence=self.persistence) as session_context:

                    if rule_result_info.limited_session_time():
                        self.user_manager.issue_notification(
                            p_session_context=session_context,
                            p_username=p_event.username,
                            p_message=self._language.get_text_limited_session_start(p_locale=rule_result_info.locale,
                                                                                    p_variables=rule_result_info.args))
                    else:
                        self.user_manager.issue_notification(
                            p_session_context=session_context,
                            p_username=p_event.username,
                            p_message=self._language.get_text_unlimited_session_start(p_locale=rule_result_info.locale,
                                                                                      p_variables=rule_result_info.args))

    def handle_event_prohibited_process(self, p_event: AdminEvent):

        process_handler = self.get_process_handler(p_id=p_event.processhandler)

        if process_handler is None:
            msg = "Received event for process handler of type id '{id}' which is not registered -> discarding event"
            self._logger.warning(msg.format(id=p_event.processhandler))
            return

        if self._is_master:
            rule_result_info = self.admin_data_handler.get_current_rule_result_info(
                p_reference_time=datetime.datetime.now(), p_process_infos=self.get_process_infos(),
                p_username=p_event.username)

            if rule_result_info is not None and rule_result_info.activity_allowed():
                with SessionContext(p_persistence=self.persistence) as session_context:

                    variables = rule_result_info.args
                    variables['process_name'] = p_event.processname

                    self.user_manager.issue_notification(
                        p_session_context=session_context,
                        p_username=p_event.username,
                        p_message=self._language.get_text_prohibited_process(p_locale=rule_result_info.locale,
                                                                             p_variables=variables))

                    self.queue_event_kill_process(
                        p_hostname=p_event.hostname,
                        p_username=p_event.username,
                        p_processhandler=process_handler.id,
                        p_pid=p_event.pid,
                        p_process_start_time=p_event.process_start_time,
                        p_delay=0)

    def handle_event_process_end(self, p_event):

        pinfo = self.get_process_handler(p_id=p_event.processhandler).handle_event_process_end(p_event)

        if pinfo is None:
            return

        if self.persistence is not None:
            with SessionContext(p_persistence=self.persistence) as session_context:
                self.process_info_entity_manager.update_process_info(
                    p_session_context=session_context, p_process_info=pinfo)

    def handle_event_kill_process(self, p_event):
        with SessionContext(p_persistence=self.persistence) as session_context:
            return self.get_process_handler(p_id=p_event.processhandler).handle_event_kill_process(
                p_session_context=session_context, p_event=p_event,
                p_server_group=self._config.server_group, p_login_mapping=self._login_mapping)

    def scan_processes(self, p_process_handler: ProcessHandler, p_reference_time=None, p_queue_events=True):  # @DontTrace

        if p_reference_time is None:
            p_reference_time = datetime.datetime.now()

        with SessionContext(p_persistence=self.persistence) as session_context:
            events = p_process_handler.scan_processes(
                p_session_context=session_context,
                p_server_group=self._config.server_group, p_login_mapping=self._login_mapping,
                p_host_name=self._host_name,
                p_process_regex_map=self.process_regex_map,
                p_prohibited_process_regex_map=self.prohibited_process_regex_map,
                p_reference_time=p_reference_time)

            if p_queue_events:
                self.event_handler.queue_events(p_events=events, p_to_master=True)

                if not self._is_master:
                    self.event_handler.queue_events_locally(p_events=events)

    def get_process_infos(self):

        process_infos = {}

        for handler in self._process_handlers.values():
            process_infos.update(handler.process_infos)

        return process_infos

    def load_historic_process_infos(self):

        with SessionContext(p_persistence=self.persistence) as session_context:
            pinfos = self.process_info_entity_manager.load_process_infos(
                p_session_context=session_context, p_lookback_in_days=self._config.process_lookback_in_days + 1)

        counter_open_end_time = 0

        for pinfo in pinfos:

            if pinfo.end_time is None:
                end_time = None

            else:
                end_time = pinfo.end_time

            new_pinfo = process_info.ProcessInfo(
                p_hostname=pinfo.hostname,
                p_hostlabel=pinfo.hostlabel,
                p_pid=pinfo.pid,
                p_processhandler=pinfo.processhandler,
                p_username=pinfo.username,
                p_processname=pinfo.processname,
                p_start_time=pinfo.start_time,
                p_downtime=pinfo.downtime,
                p_percent=pinfo.percent,
                p_end_time=end_time)

            if not new_pinfo.is_valid():
                fmt = "Loaded {process_info} is invalid -> ignore"
                self._logger.warning(fmt.format(process_info=str(new_pinfo)))
                continue

            if new_pinfo.end_time is None:
                counter_open_end_time = counter_open_end_time + 1

            process_handler = self.get_process_handler(p_id=new_pinfo.processhandler, p_raise_exception=False)

            if process_handler is not None:
                process_handler.add_historic_process(p_process_info=new_pinfo)

        fmt = "Loaded %d historic process infos from database looking back %s days (%d of which had no end time)" % (
            len(pinfos), self._config.process_lookback_in_days, counter_open_end_time)
        self._logger.info(fmt)

    def send_historic_process_infos(self):

        counter = 0

        for pinfo in self.get_process_infos().values():
            if pinfo.end_time is None and pinfo.hostname != self._host_name:
                self.queue_event_historic_process_start(p_pinfo=pinfo)
                counter = counter + 1

        fmt = "Sent %d historic process infos to clients" % counter
        self._logger.info(fmt)

    def queue_event_historic_process_start(self, p_pinfo):

        event = admin_event.AdminEvent(
            p_event_type=admin_event.EVENT_TYPE_PROCESS_START,
            p_hostname=p_pinfo.hostname,
            p_pid=p_pinfo.pid,
            p_username=p_pinfo.username,
            p_process_start_time=p_pinfo.start_time)
        self.event_handler.queue_event(p_event=event, p_is_action=True)

    def queue_artificial_activation_events(self):

        for handler in self._process_handlers.values():
            events = handler.get_artificial_activation_events()

            fmt = "Artificially activating {process_count} active processes on handler {handler_id}"
            self._logger.info(fmt.format(process_count=len(events), handler_id=handler.id))

            self.event_handler.queue_events(p_events=events, p_to_master=True)

    def queue_artificial_kill_events(self):

        for handler in self._process_handlers.values():
            events = handler.get_artificial_kill_events()

            fmt = "Artificially killing {process_count} active processes on handler {handler_id}"
            self._logger.info(fmt.format(process_count=len(events), handler_id=handler.id))

            self.event_handler.queue_events(p_events=events, p_to_master=False)

    def queue_event_kill_process(self, p_hostname, p_username, p_processhandler, p_pid, p_process_start_time,
                                 p_delay=None):

        event = admin_event.AdminEvent(
            p_event_type=admin_event.EVENT_TYPE_KILL_PROCESS,
            p_hostname=p_hostname,
            p_username=p_username,
            p_processhandler=p_processhandler,
            p_pid=p_pid,
            p_process_start_time=p_process_start_time,
            p_delay=p_delay if p_delay is not None else self._config.kill_process_delay)
        self.event_handler.queue_event(p_event=event, p_is_action=True)

    def handle_downtime(self, p_downtime):

        for process_handler in self._process_handlers.values():
            events = process_handler.get_downtime_corrected_admin_events(p_downtime=p_downtime)

            self.event_handler.queue_events(p_events=events, p_to_master=True)

    def check_issue_logout_warning(self, p_username, p_rule_result_info: RuleResultInfo):

        issue_warning = False
        notification = None

        current_warning = self._logout_warnings.get(p_username)

        if p_rule_result_info.approaching_logout_rules > 0:
            if current_warning is None or p_rule_result_info.get_minutes_left_in_session() != current_warning:
                issue_warning = True
                self._logout_warnings[p_username] = p_rule_result_info.get_minutes_left_in_session()
                notification = self._language.pick_text_for_approaching_logout(p_rule_result_info=p_rule_result_info)

        elif current_warning is not None:
            self._logout_warnings[p_username] = None
            issue_warning = True

        if issue_warning:
            with SessionContext(p_persistence=self.persistence) as session_context:
                self.user_manager.issue_notification(p_session_context=session_context,
                                                     p_username=p_username, p_message=notification)

    def reset_logout_warning(self, p_username):

        if p_username in self._logout_warnings:
            del self._logout_warnings[p_username]

    def handle_rule_result_info(self, p_rule_result_info, p_stat_info, p_user):

        if p_rule_result_info.activity_allowed():
            self.check_issue_logout_warning(p_username=p_user.username, p_rule_result_info=p_rule_result_info)

        else:
            fmt = "Process %s" % str(p_rule_result_info.effective_rule_set)
            self._logger.debug(fmt)

            for (hostname, processes) in p_stat_info.currently_active_host_processes.items():

                fmt = "User %s has %d active process(es) on host %s" % (
                    p_user.username, len(processes), hostname)
                self._logger.debug(fmt)

                process_killed = False

                for processhandler_id, pid, process_start_time in processes:
                    processhandler = self._process_handlers.get(processhandler_id)

                    if processhandler is not None and processhandler.can_kill_processes():
                        if self.prometheus_client is not None:
                            self.prometheus_client.count_forced_logouts(p_username=p_user.username)

                        self.queue_event_kill_process(
                            p_hostname=hostname,
                            p_username=p_user.username,
                            p_processhandler=processhandler_id,
                            p_pid=pid,
                            p_process_start_time=process_start_time)
                        process_killed = True

                if process_killed:
                    with SessionContext(p_persistence=self.persistence) as session_context:
                        self.user_manager.issue_notification(
                            p_session_context=session_context,
                            p_username=p_user.username,
                            p_message=self._language.pick_text_for_ruleset(p_rule_result_info=p_rule_result_info))

                    self.reset_logout_warning(p_username=p_user.username)