thenetcircle/dino

View on GitHub
dino/db/manager/users.py

Summary

Maintainability
D
1 day
Test Coverage
import logging
import traceback
from datetime import datetime
from typing import Optional
from uuid import uuid4 as uuid

from dino import utils
from dino.config import ConfigKeys
from dino.db.manager.base import BaseManager
from dino.environ import GNEnvironment
from dino.exceptions import NoSuchUserException
from dino.exceptions import UnknownBanTypeException
from dino.utils import ActivityBuilder

logger = logging.getLogger(__name__)


def created_activity(user_id: str, user_name: str, target_id: str, target_name: str, session_ids: list, namespace: str) -> dict:
    """
    Build the activity with verb ``created`` for a user and a target room.

    Both names are passed through ``utils.b64e`` before being placed in the
    activity, and the finished dict is handed to ``ActivityBuilder.enrich``.

    :param user_id: id used for both the actor and the object of the activity
    :param user_name: user name, encoded with ``utils.b64e``
    :param target_id: id of the target room
    :param target_name: name of the target room, encoded with ``utils.b64e``
    :param session_ids: session ids, joined with "," into the actor content
    :param namespace: value stored as the ``url`` of actor and object
    :return: whatever ``ActivityBuilder.enrich`` returns for the activity
    """
    encoded_user = utils.b64e(user_name)
    encoded_target = utils.b64e(target_name)

    actor = {
        "id": user_id,
        "content": ",".join(session_ids),
        "displayName": encoded_user,
        "objectType": "user",
        "url": namespace
    }
    subject = {
        "id": user_id,
        "displayName": encoded_user,
        "url": namespace
    }
    room = {
        "id": target_id,
        "displayName": encoded_target,
        "content": "out_of_scope",
        "objectType": "room"
    }

    return ActivityBuilder.enrich({
        "actor": actor,
        "object": subject,
        "verb": "created",
        "target": room
    })


def join_activity(user_id: str, user_name: str, target_id: str, session_ids: list, namespace: str) -> dict:
    """
    Build the activity with verb ``join`` for a user and a target room.

    :param user_id: id used for both the actor and the object of the activity
    :param user_name: user name, encoded with ``utils.b64e`` before use
    :param target_id: id of the target room
    :param session_ids: session ids, joined with "," into the actor content
    :param namespace: value stored as the ``url`` of actor and object
    :return: whatever ``ActivityBuilder.enrich`` returns for the activity
    """
    user_name = utils.b64e(user_name)

    return ActivityBuilder.enrich({
        "actor": {
            "id": user_id,
            "content": ",".join(session_ids),
            "displayName": user_name,
            "url": namespace
        },
        "object": {
            "id": user_id,
            "displayName": user_name,
            "objectType": "user",
            "url": namespace
        },
        "verb": "join",
        "target": {
            "id": target_id,
            "content": "out_of_scope",
            # this key was previously misspelled "objectTYpe", so the target
            # never exposed its type under the key used everywhere else
            "objectType": "room"
        }
    })


def leave_activity(user_id: str, user_name: str, target_id: str, session_ids: list, namespace: str) -> dict:
    """
    Build the activity with verb ``leave`` for a user and a target room.

    :param user_id: id used for both the actor and the object of the activity
    :param user_name: user name, encoded with ``utils.b64e`` before use
    :param target_id: id of the target room
    :param session_ids: session ids, joined with "," into the actor content
    :param namespace: value stored as the ``url`` of actor and object
    :return: whatever ``ActivityBuilder.enrich`` returns for the activity
    """
    user_name = utils.b64e(user_name)

    return ActivityBuilder.enrich({
        "actor": {
            "id": user_id,
            "content": ",".join(session_ids),
            "displayName": user_name,
            "url": namespace
        },
        "object": {
            "id": user_id,
            "displayName": user_name,
            "objectType": "user",
            "url": namespace
        },
        "verb": "leave",
        "target": {
            "id": target_id,
            "content": "out_of_scope",
            # this key was previously misspelled "objectTYpe", so the target
            # never exposed its type under the key used everywhere else
            "objectType": "room"
        }
    })


class UserManager(BaseManager):
    def __init__(self, env: GNEnvironment):
        self.env = env

    def auth_user(self, user_id, _):
        self.env.cache.add_heartbeat(user_id)

    def get_users_for_room(self, room_id: str) -> list:
        users = self.env.db.users_in_room(room_id, skip_cache=True)
        output = list()

        for user_id, user_name in users.items():
            output.append({
                'uuid': user_id,
                'name': user_name
            })
        return output

    def del_super_user(self, user_uuid: str) -> None:
        self.env.db.remove_super_user(user_uuid)

    def set_super_user(self, user_uuid: str) -> None:
        self.env.db.set_super_user(user_uuid)

    def search_for(self, query: str) -> list:
        users = self.env.db.search_for_users(query)
        output = list()
        for user in users:
            output.append({
                'uuid': user['uuid'],
                'name': user['name']
            })
        return output

    def join_room(self, user_id, user_name, room_id, session_ids, namespace) -> None:
        """
        Build a ``join`` activity with ``join_activity`` and publish it.

        :param user_id: id of the user
        :param user_name: name of the user
        :param room_id: id of the room, used as the activity target
        :param session_ids: session ids of the user
        :param namespace: namespace placed in the activity
        """
        self.env.publish(join_activity(user_id, user_name, room_id, session_ids, namespace))

    def room_created(self, user_id, user_name, room_id, room_name, session_ids, namespace) -> None:
        """
        Build a ``created`` activity with ``created_activity`` and publish it.

        :param user_id: id of the user
        :param user_name: name of the user
        :param room_id: id of the room, used as the activity target
        :param room_name: name of the room
        :param session_ids: session ids of the user
        :param namespace: namespace placed in the activity
        """
        self.env.publish(created_activity(user_id, user_name, room_id, room_name, session_ids, namespace))

    def leave_room(self, user_id, user_name, room_id, session_ids, namespace) -> None:
        """
        Build a ``leave`` activity with ``leave_activity`` and publish it.

        :param user_id: id of the user
        :param user_name: name of the user
        :param room_id: id of the room, used as the activity target
        :param session_ids: session ids of the user
        :param namespace: namespace placed in the activity
        """
        self.env.publish(leave_activity(user_id, user_name, room_id, session_ids, namespace))

    def kick_user(
            self, room_id: Optional[str], user_id: str, reason: str = None, admin_id: str = None, room_name: str = None
    ) -> None:
        """
        Publish a ``kick`` activity for a user in a room.

        When ``room_name`` is None it is looked up from ``room_id`` through the
        db; otherwise, when ``room_id`` is None, it is resolved from the name
        with ``utils.get_room_id`` (using the default channel).

        :param room_id: id of the room; may be None if ``room_name`` is given
        :param user_id: id of the user to kick
        :param reason: optional reason; only attached as the object content
            when ``utils.is_base64`` accepts it, otherwise a warning is logged
        :param admin_id: optional actor id; the actor id is ``'0'`` when omitted
        :param room_name: optional name of the room
        :raises Exception: anything raised by the room name or user name
            lookup is logged and re-raised
        """
        # NOTE(review): this method used to carry "TODO: support get room_id
        # from room name"; the elif branch below appears to cover that already
        # -- confirm before dropping the note.
        if room_name is None:
            try:
                room_name = self.env.db.get_room_name(room_id)
            except Exception as e:
                logger.error('could not get room name for room uuid %s: %s', room_id, str(e))
                raise
        elif room_id is None:
            room_id = utils.get_room_id(room_name, use_default_channel=True)

        try:
            kicked_name = self.env.db.get_user_name(user_id)
        except Exception as e:
            logger.error('could not get user name for user id %s: %s', user_id, str(e))
            raise

        actor = {
            'id': '0' if admin_id is None else admin_id,
            'displayName': utils.b64e('admin')
        }
        kicked = {
            'id': user_id,
            'displayName': utils.b64e(kicked_name)
        }
        room = {
            'id': room_id,
            'displayName': utils.b64e(room_name),
            'objectType': 'room',
            'url': '/ws'
        }
        activity = {
            'actor': actor,
            'verb': 'kick',
            'object': kicked,
            'target': room,
            'published': datetime.utcnow().strftime(ConfigKeys.DEFAULT_DATE_FORMAT),
            'id': str(uuid())
        }

        if reason is not None:
            if not utils.is_base64(reason):
                logger.warning('reason is not base64, ignoring')
            else:
                kicked['content'] = reason

        self.env.publish(activity)

    def remove_mute(self, user_id: str, room_id: str, room_name: str) -> None:
        """
        Remove a room mute and, if the db reports success, emit ``gn_unmute``.

        The activity is built before the db call, so a failure while building
        it leaves the mute untouched.

        :param user_id: id of the muted user; also used as the ``room``
            argument of the emit call
        :param room_id: id of the room the mute applies to
        :param room_name: name of the room, encoded with ``utils.b64e``
        """
        actor = {'id': '0', 'objectType': 'user'}
        unmuted = {'id': user_id, 'objectType': 'user'}
        room = {
            'objectType': 'room',
            'id': room_id,
            'displayName': utils.b64e(room_name)
        }
        activity = {
            'actor': actor,
            'verb': 'unmute',
            'object': unmuted,
            'target': room,
            'id': str(uuid()),
            'published': datetime.utcnow().strftime(ConfigKeys.DEFAULT_DATE_FORMAT)
        }

        # nothing to announce unless the db reports that the mute was removed
        if not self.env.db.remove_room_mute(room_id, user_id):
            return

        self.env.out_of_scope_emit(
            'gn_unmute', activity, json=True, room=user_id,
            broadcast=True, include_self=True, namespace='/ws'
        )

    def mute_user(
            self, user_id: str, room_id: str, duration: str,
            reason: str = None, muter_id: str = None, room_name: str = None
    ) -> None:
        """
        Store a room mute in the db and emit a ``gn_mute`` event for it.

        :param user_id: id of the user to mute; also used as the ``room``
            argument of the emit call
        :param room_id: id of the room the mute applies to
        :param duration: duration string, converted to an end time with
            ``utils.ban_duration_to_datetime``
        :param reason: optional reason, attached as the object content as-is
        :param muter_id: optional actor id; the actor id is ``'0'`` when omitted
        :param room_name: optional room name; when given it is added to the
            target, encoded with ``utils.b64e``
        """
        mute_dt = utils.ban_duration_to_datetime(duration)
        end_time = mute_dt.strftime(ConfigKeys.DEFAULT_DATE_FORMAT)

        logger.info(f"muting {user_id} in room {room_id} ({room_name}) until {end_time}")
        self.env.db.mute_user(
            room_id=room_id,
            user_id=user_id,
            mute_duration=duration,
            mute_timestamp=mute_dt.timestamp(),
            room_name=room_name,
            muter_id=muter_id,
            reason=reason
        )

        actor = {'id': '0', 'objectType': 'user'}
        muted = {
            'id': user_id,
            'summary': duration,
            'updated': end_time,
            'objectType': 'user'
        }
        room = {'id': room_id, 'objectType': 'room'}
        activity = {
            'actor': actor,
            'verb': 'mute',
            'object': muted,
            'target': room,
            'published': datetime.utcnow().strftime(ConfigKeys.DEFAULT_DATE_FORMAT),
            'id': str(uuid())
        }

        # optional fields are only present in the activity when supplied
        if reason is not None:
            muted['content'] = reason
        if muter_id is not None:
            actor['id'] = muter_id
        if room_name is not None:
            room['displayName'] = utils.b64e(room_name)

        self.env.out_of_scope_emit(
            'gn_mute', activity, json=True, room=user_id,
            broadcast=True, include_self=True, namespace='/ws'
        )

    def ban_user(
            self, user_id: str, target_id: str, duration: str, target_type: str,
            reason: str = None, banner_id: str = None, user_name: str = None, target_name: str = None
    ) -> None:
        """
        Publish a ``ban`` activity for a user, creating the user if unknown.

        :param user_id: id of the user to ban
        :param target_id: id of the channel/room the ban applies to
        :param duration: duration string, converted to an end time with
            ``utils.ban_duration_to_datetime``
        :param target_type: one of ``'global'``, ``'channel'`` or ``'room'``
        :param reason: optional reason, attached as the object content as-is
        :param banner_id: optional actor id; the actor id is ``'0'`` when omitted
        :param user_name: name used when the user has to be created; the user
            id is used if this is empty
        :param target_name: optional name of the target; looked up through the
            db for channels and rooms when None
        :raises UnknownBanTypeException: if ``target_type`` is not supported
        """
        if target_type not in {'global', 'channel', 'room'}:
            raise UnknownBanTypeException(target_type)

        if target_name is None:
            if target_type == 'channel':
                target_name = self.env.db.get_channel_name(target_id)

            elif target_type == 'room':
                target_name = self.env.db.get_room_name(target_id)

        try:
            display_name = utils.b64e(self.env.db.get_user_name(user_id))
        except NoSuchUserException:
            logger.info('when processing ban request: user %s does not exist, will create' % user_id)
            plain_name = user_name or user_id
            self.env.db.create_user(user_id, plain_name)
            # encode here as well: previously this path put the raw name into
            # the activity while the path above always encoded it.
            # NOTE(review): assumes callers pass an un-encoded user_name, as
            # the name is stored the same way get_user_name returns it -- confirm
            display_name = utils.b64e(plain_name)

        ban_activity = {
            'actor': {
                'id': '0',
                'displayName': utils.b64e('admin')
            },
            'verb': 'ban',
            'object': {
                'id': user_id,
                'displayName': display_name,
                'summary': duration,
                'updated': utils.ban_duration_to_datetime(duration).strftime(ConfigKeys.DEFAULT_DATE_FORMAT)
            },
            'target': {
                'url': '/ws',
                'objectType': target_type
            },
            'published': datetime.utcnow().strftime(ConfigKeys.DEFAULT_DATE_FORMAT),
            'id': str(uuid())
        }

        if reason is not None:
            ban_activity['object']['content'] = reason

        if banner_id is not None:
            ban_activity['actor']['id'] = banner_id

        # the target id/name are only included when a name is known, which is
        # never the case for a global ban unless the caller supplied one
        if target_name is not None:
            ban_activity['target']['id'] = target_id
            ban_activity['target']['displayName'] = utils.b64e(target_name)

        self.env.publish(ban_activity)

    def remove_ban(self, user_id: str, target_id: str, target_type: str) -> None:
        """
        Remove a ban from the db and publish an ``unban`` activity.

        The activity (including the user name lookup) is built before the
        target type is checked, so an unknown type is only rejected after it.

        :param user_id: id of the banned user
        :param target_id: id of the channel/room; not used for global bans
        :param target_type: one of ``'global'``, ``'channel'`` or ``'room'``
        :raises UnknownBanTypeException: if ``target_type`` is not supported
        """
        actor = {
            'id': '0',
            'displayName': utils.b64e('admin')
        }
        unbanned = {
            'id': user_id,
            'displayName': utils.b64e(utils.get_user_name_for(user_id))
        }
        target = {'objectType': target_type}
        activity = {
            'actor': actor,
            'verb': 'unban',
            'object': unbanned,
            'target': target,
            'id': str(uuid()),
            'published': datetime.utcnow().strftime(ConfigKeys.DEFAULT_DATE_FORMAT)
        }

        if target_type == 'global':
            self.env.db.remove_global_ban(user_id)
        elif target_type == 'channel':
            self.env.db.remove_channel_ban(target_id, user_id)
            target['id'] = target_id
            target['displayName'] = utils.b64e(utils.get_channel_name(target_id))
        elif target_type == 'room':
            self.env.db.remove_room_ban(target_id, user_id)
            target['id'] = target_id
            target['displayName'] = utils.b64e(utils.get_room_name(target_id))
        else:
            raise UnknownBanTypeException(target_type)

        self.env.publish(activity)

    def get_banned_users(self) -> dict:
        return self.env.db.get_banned_users()

    def add_channel_admin(self, channel_id: str, user_id: str) -> None:
        self.env.db.set_admin(channel_id, user_id)

    def add_channel_owner(self, channel_id: str, user_id: str) -> None:
        self.env.db.set_owner_channel(channel_id, user_id)

    def add_room_moderator(self, room_id: str, user_id: str) -> None:
        self.env.db.set_moderator(room_id, user_id)

    def add_room_owner(self, room_id: str, user_id: str) -> None:
        self.env.db.set_owner(room_id, user_id)

    def remove_channel_admin(self, channel_id: str, user_id: str) -> None:
        self.env.db.remove_admin(channel_id, user_id)

    def remove_channel_owner(self, channel_id: str, user_id: str) -> None:
        self.env.db.remove_owner_channel(channel_id, user_id)

    def remove_room_moderator(self, room_id: str, user_id: str) -> None:
        self.env.db.remove_moderator(room_id, user_id)

    def remove_room_owner(self, room_id: str, user_id: str) -> None:
        self.env.db.remove_owner(room_id, user_id)

    def create_super_user(self, user_name: str, user_id: str) -> None:
        """
        Create a user in the db and flag it as super user, best effort.

        Failures are logged and swallowed; nothing is raised to the caller.
        Note the argument order: the name comes first here, while
        ``env.db.create_user`` takes the id first.

        :param user_name: name of the user to create
        :param user_id: id of the user to create
        """
        try:
            self.env.db.create_user(user_id, user_name)
            self.env.db.set_super_user(user_id)
        except Exception as e:
            # logger.exception already appends the active traceback, so one
            # call gives the message and the stack trace in a single record
            # (previously the trace was logged twice, plus a separate error)
            logger.exception('could not create super user: %s', str(e))

    def get_user(self, user_id: str) -> dict:
        """
        Look up a user's name in the db and return it with the id.

        :param user_id: id of the user to look up
        :return: dict with ``uuid`` (the given id) and ``name`` (the db name
            passed through ``utils.b64e``)
        """
        stored_name = self.env.db.get_user_name(user_id)
        encoded_name = utils.b64e(stored_name)
        return dict(uuid=user_id, name=encoded_name)

    def _get_user(self, user_id: str, user_name: str) -> dict:
        return {
            'uuid': user_id,
            'name': user_name
        }

    def get_super_users(self) -> list:
        users = self.env.db.get_super_users()
        output = list()
        for user_id, user_name in users.items():
            output.append(self._get_user(user_id, user_name))
        return output