marcus67/little_brother

View on GitHub
little_brother/user_manager.py

Summary

Maintainability
B
4 hrs
Test Coverage
# -*- coding: utf-8 -*-

# Copyright (C) 2019-2021  Marcus Rickert
#
# See https://github.com/marcus67/little_brother_taskbar
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from little_brother import admin_event
from little_brother import dependency_injection
from little_brother.event_handler import EventHandler
from little_brother.login_mapping import LoginMapping
from little_brother.persistence.persistent_dependency_injection_mix_in import PersistenceDependencyInjectionMixIn
from little_brother.persistence.persistent_user import User
from little_brother.persistence.session_context import SessionContext
from little_brother.user_locale_handler import UserLocaleHandler
from little_brother.user_status import UserStatus
from python_base_app import log_handling
from python_base_app.base_user_handler import BaseUserHandler


class UserManager(PersistenceDependencyInjectionMixIn):

    def __init__(self, p_config, p_login_mapping, p_server_group, p_is_master):

        super().__init__()
        self._logger = log_handling.get_logger(self.__class__.__name__)

        self._user_handler = None
        self._event_handler = None

        self._server_group = p_server_group
        self._is_master = p_is_master
        self._config = p_config
        self._usernames_not_found = []
        self._usernames = []

        if p_login_mapping is None:
            p_login_mapping = LoginMapping(p_default_server_group=p_server_group)

        self._login_mapping = p_login_mapping

        self._user_status = {}

        self._login_mapping_received = self._is_master
        self._user_locale_handler = UserLocaleHandler()

    @property
    def event_handler(self) -> EventHandler:

        if self._event_handler is None:
            self._event_handler = dependency_injection.container[EventHandler]

        return self._event_handler

    @property
    def user_handler(self) -> BaseUserHandler:

        if self._user_handler is None:
            self._user_handler = dependency_injection.container[BaseUserHandler]

        return self._user_handler

    @property
    def usernames(self):
        return self._usernames

    def register_events(self):
        self.event_handler.register_event_handler(
            p_event_type=admin_event.EVENT_TYPE_UPDATE_LOGIN_MAPPING, p_handler=self.handle_event_update_login_mapping)

    def reset_users(self, p_session_context: SessionContext):
        self._usernames = []

        active_users = [ key for key, user in self.user_entity_manager.user_map(p_session_context=p_session_context).items() if user.active ]
        self._usernames_not_found.extend(active_users)

        fmt = "Watching usernames: %s" % ",".join(self. _usernames_not_found)
        self._logger.info(fmt)

    def handle_event_update_login_mapping(self, p_event):

        with SessionContext(p_persistence=self.persistence) as session_context:
            self._login_mapping.from_json(p_session_context=session_context, p_json=p_event.payload)
            server_group_names = ', '.join(self._login_mapping.get_server_group_names(
                p_session_context=session_context))
            fmt = f"Received login mapping for server group(s) {server_group_names}"
            self._logger.info(fmt)
            self._login_mapping_received = True

    def retrieve_user_mappings(self, p_session_context: SessionContext):

        if len(self._usernames_not_found) > 0:
            usernames_found = []

            for username in self._usernames_not_found:
                uid = self._login_mapping.get_uid_by_login(p_session_context=p_session_context,
                                                           p_server_group=self._server_group,
                                                           p_login=username)

#                if uid is None and self._login_mapping_received:
                if uid is None:
                    uid = self.user_handler.get_uid(p_username=username)

                    if uid is not None:
                        if not self._login_mapping_received:
                            self._logger.warn(f"Setting local mapping entry for user {username}")
                        self._login_mapping.add_entry(p_session_context=p_session_context,
                                                      p_server_group=self._server_group,
                                                      p_uid=uid, p_username=username)

                if uid is not None:
                    usernames_found.append(username)
                    if username not in self._usernames:
                        self._usernames.append(username)

                    fmt = "Found user information for user '{user}': UID={uid}"
                    self._logger.info(fmt.format(user=username, uid=uid))

                else:
                    fmt = "Cannot find user information for user '{user}', will retry later..."
                    self._logger.warning(fmt.format(user=username))

            for username in usernames_found:
                self._usernames_not_found.remove(username)

            if len(self._usernames_not_found) == 0:
                fmt = "Retrieved user information for all {user_count} users"
                self._logger.info(fmt.format(user_count=len(self._usernames)))

    def get_number_of_monitored_users(self):

        count = 0

        with SessionContext(p_persistence=self.persistence) as session_context:
            for user in self.user_entity_manager.users(session_context):
                if user.active and user.username in self._usernames:
                    count += 1

        return count

    def queue_event_update_login_mapping(self, p_hostname, p_login_mapping):

        event = admin_event.AdminEvent(
            p_event_type=admin_event.EVENT_TYPE_UPDATE_LOGIN_MAPPING,
            p_hostname=p_hostname,
            p_payload=p_login_mapping)
        self.event_handler.queue_event(p_event=event, p_is_action=True)

    def send_login_mapping_to_client(self, p_session_context: SessionContext, p_hostname:str):

        self.queue_event_update_login_mapping(
            p_hostname=p_hostname,
            p_login_mapping=self._login_mapping.to_json(p_session_context=p_session_context))

    def get_current_user_status(self, p_session_context: SessionContext, p_username: str):

        current_user_status = self._user_status.get(p_username)

        if current_user_status is None:
            current_user_status = UserStatus(p_username=p_username)
            self._user_status[p_username] = current_user_status

        current_user_status.warning_time_without_send_events = self._config.warning_time_without_send_events
        current_user_status.maximum_time_without_send_events = self._config.maximum_time_without_send_events

        current_user_status.locale = self._user_locale_handler.get_user_locale(
            p_session_context=p_session_context, p_username=p_username)

        user: User = self.user_entity_manager.user_map(p_session_context=p_session_context).get(p_username)

        current_user_status.monitoring_active = user.active if user is not None else False

        return current_user_status

    def add_monitored_user(self, p_username):

        if p_username not in self._usernames:
            self._usernames_not_found.append(p_username)
            fmt = "Adding new monitored user '{username}'"
            self._logger.info(fmt.format(username=p_username))

    def remove_monitored_user(self, p_username):

        if p_username in self._usernames:
            fmt = "Remove monitored user '{username}'"
            self._usernames.remove(p_username)
            self._logger.info(fmt.format(username=p_username))


    def issue_notification(self, p_session_context, p_username, p_message):

        current_user_status = self.get_current_user_status(p_session_context=p_session_context, p_username=p_username)
        current_user_status.notification = p_message