fossasia/open-event-orga-server

View on GitHub
app/api/chat/rocket_chat.py

Summary

Maintainability
C
1 day
Test Coverage
import logging
import random
import string
from dataclasses import dataclass
from typing import Optional

import requests

from app.api.helpers.db import get_new_identifier, get_or_create
from app.models import db
from app.models.event import Event
from app.models.microlocation import Microlocation
from app.models.user import User
from app.models.video_stream import VideoStream
from app.settings import get_settings

logger = logging.getLogger(__name__)


class RocketChatException(ValueError):
    class CODES:
        DISABLED = 'integration_disabled'
        UNHANDLED = 'unhandled'

    code = None
    response = None

    def __init__(self, message, code=CODES.UNHANDLED, response=None) -> None:
        self.code = code
        self.response = response
        super().__init__(message)


@dataclass
class RocketChat:
    api_url: str

    @property
    def login_url(self) -> str:
        return self.api_url + '/api/v1/login'

    def login(self, user: User, event: Optional[Event] = None, method: str = 'login'):
        def save_token(token):
            user.rocket_chat_token = token
            db.session.add(user)
            db.session.commit()

        res = requests.post(
            self.login_url,
            json=dict(email=user.email, password=user.rocket_chat_password),
        )
        data = res.json()
        if res.status_code == 200:
            token = data['data']['authToken']
            save_token(token)
            if event:
                self.add_in_room(event, data['data']['userId'])
            return dict(method=method, token=token, res=data)
        else:
            # Unhandled Case
            logger.error('Error while rocket chat login: %s', data)
            raise RocketChatException('Error while logging in', response=res)

    def register(
        self,
        user: User,
        event: Optional[Event] = None,
        username_suffix='',
    ):
        settings = get_settings()
        register_url = self.api_url + '/api/v1/users.register'
        register_data = {
            'name': user.public_name or user.full_name,
            'email': user.email,
            'pass': user.rocket_chat_password,
            'username': user.rocket_chat_username + username_suffix,
        }
        if registration_secret := settings['rocket_chat_registration_secret']:
            register_data['secretURL'] = registration_secret

        res = requests.post(register_url, json=register_data)

        data = res.json()
        if res.status_code == 200:
            return self.login(user, event, 'registered')
        elif res.status_code == 400:
            if data.get('error') == 'Username is already in use':
                # Username conflict. Add random suffix and retry
                return self.register(user, event, '.' + get_new_identifier(length=5))
            logger.info('Bad Request during register: %s', data)
            # Probably already registered. Try logging in
            return self.login(user, event, 'login')
        else:
            logger.error(
                'Error while rocket chat registration: %d %s',
                res.status_code,
                data,
            )
            raise RocketChatException('Error while registration', response=res)

    def get_token(
        self,
        user: User,
        event: Optional[Event] = None,
        retried=False,
        microlocation: Optional[Microlocation] = None,
    ):
        if user.rocket_chat_token:
            res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token))

            data = res.json()
            if res.status_code == 200:
                if event:
                    self.add_in_room(event, data['data']['userId'], microlocation)
                return dict(method='resumed', token=user.rocket_chat_token, res=data)
            elif res.status_code == 401:
                # Token Expired. Login again

                try:
                    return self.login(user, event, 'login')
                except RocketChatException as rce:
                    if (
                        not retried
                        and rce.response is not None
                        and rce.response.status_code == 401
                    ):
                        # Invalid credentials stored. Reset credentials and retry
                        # If we have already retried, give up
                        user.rocket_chat_token = None
                        db.session.add(user)
                        db.session.commit()
                        return self.get_token(user, event, retried=True)
                    else:
                        raise rce
            else:
                # Unhandled Case
                logger.error('Error while rocket chat resume or login: %s', data)
                raise RocketChatException(
                    'Error while resume or logging in', response=res
                )
        else:
            # No token. Try creating profile, else login

            return self.register(user, event)

    def get_token_virtual_room(
        self,
        user: User,
        event: Optional[Event] = None,
        retried=False,
        videoStream: Optional[VideoStream] = None,
    ):
        if user.rocket_chat_token:
            res = requests.post(self.login_url, json=dict(resume=user.rocket_chat_token))

            data = res.json()
            if res.status_code == 200:
                if event:
                    self.add_in_room_virtual_room(
                        event, data['data']['userId'], videoStream
                    )
                return dict(method='resumed', token=user.rocket_chat_token, res=data)
            elif res.status_code == 401:
                # Token Expired. Login again

                try:
                    return self.login(user, event, 'login')
                except RocketChatException as rce:
                    if (
                        not retried
                        and rce.response is not None
                        and rce.response.status_code == 401
                    ):
                        # Invalid credentials stored. Reset credentials and retry
                        # If we have already retried, give up
                        user.rocket_chat_token = None
                        db.session.add(user)
                        db.session.commit()
                        return self.get_token_virtual_room(user, event, retried=True)
                    else:
                        raise rce
            else:
                # Unhandled Case
                logger.error('Error while rocket chat resume or login: %s', data)
                raise RocketChatException(
                    'Error while resume or logging in', response=res
                )
        else:
            # No token. Try creating profile, else login

            return self.register(user, event)

    def check_or_create_bot(self):
        bot_email = 'open-event-bot@open-event.invalid'
        bot_user, _ = get_or_create(
            User,
            _email=bot_email,
            defaults=dict(password=generate_pass(), first_name='open-event-bot'),
        )

        return bot_user

    def create_room(self, event: Event, microlocation: Optional[Microlocation], data):
        bot_token = data['token']
        bot_id = data['res']['data']['userId']
        if microlocation:
            chat_room_name = microlocation.chat_room_name
        else:
            chat_room_name = event.chat_room_name

        res = requests.post(
            self.api_url + '/api/v1/groups.create',
            json=dict(
                name=chat_room_name,
                members=[bot_id],
            ),
            headers={
                'X-Auth-Token': bot_token,
                'X-User-Id': bot_id,
            },
        )
        if not res.status_code == 200:
            logger.error('Error while creating room : %s', res.json())
            raise RocketChatException('Error while creating room', response=res)
        else:
            group_data = res.json()
            if microlocation:
                microlocation.chat_room_id = group_data['group']['_id']
                db.session.add(microlocation)
            else:
                event.chat_room_id = group_data['group']['_id']
                db.session.add(event)
            db.session.commit()

    def create_room_virtual_room(
        self, event: Event, videoStream: Optional[VideoStream], data
    ):
        bot_token = data['token']
        bot_id = data['res']['data']['userId']
        if videoStream:
            chat_room_name = videoStream.chat_room_name
        else:
            chat_room_name = event.chat_room_name

        res = requests.post(
            self.api_url + '/api/v1/groups.create',
            json=dict(
                name=chat_room_name,
                members=[bot_id],
            ),
            headers={
                'X-Auth-Token': bot_token,
                'X-User-Id': bot_id,
            },
        )
        if not res.status_code == 200:
            logger.error('Error while creating room : %s', res.json())
            raise RocketChatException('Error while creating room', response=res)
        else:
            group_data = res.json()
            if videoStream:
                videoStream.chat_room_id = group_data['group']['_id']
                db.session.add(videoStream)
            else:
                event.chat_room_id = group_data['group']['_id']
                db.session.add(event)
            db.session.commit()

    def add_in_room(
        self, event: Event, rocket_user_id, microlocation: Optional[Microlocation] = None
    ):
        bot = self.check_or_create_bot()
        data = self.get_token(bot)

        if (not event.chat_room_id) or (microlocation and not microlocation.chat_room_id):
            self.create_room(event=event, microlocation=microlocation, data=data)

        if microlocation is not None:
            chat_room_id = microlocation.chat_room_id
        else:
            chat_room_id = event.chat_room_id

        bot_token = data['token']
        bot_id = data['res']['data']['userId']
        room_info = {'roomId': chat_room_id, 'userId': rocket_user_id}

        res = requests.post(
            self.api_url + '/api/v1/groups.invite',
            json=room_info,
            headers={
                'X-Auth-Token': bot_token,
                'X-User-Id': bot_id,
            },
        )

        if res.status_code != 200:
            logger.error('Error while adding user : %s', res.json())
            raise RocketChatException('Error while adding user', response=res)

    def add_in_room_virtual_room(
        self, event: Event, rocket_user_id, videoStream: Optional[VideoStream] = None
    ):
        bot = self.check_or_create_bot()
        data = self.get_token_virtual_room(bot, videoStream=videoStream)
        if (not event.chat_room_id) or (videoStream and not videoStream.chat_room_id):
            self.create_room_virtual_room(event=event, videoStream=videoStream, data=data)

        if videoStream is not None:
            chat_room_id = videoStream.chat_room_id
        else:
            chat_room_id = event.chat_room_id

        bot_token = data['token']
        bot_id = data['res']['data']['userId']
        room_info = {'roomId': chat_room_id, 'userId': rocket_user_id}

        res = requests.post(
            self.api_url + '/api/v1/groups.invite',
            json=room_info,
            headers={
                'X-Auth-Token': bot_token,
                'X-User-Id': bot_id,
            },
        )

        if res.status_code != 200:
            logger.error('Error while adding user : %s', res.json())
            raise RocketChatException('Error while adding user', response=res)


def generate_pass(size=10, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


def get_rocket_chat_token(
    user: User,
    event: Optional[Event] = None,
    microlocation: Optional[Microlocation] = None,
):
    settings = get_settings()
    if not (api_url := settings['rocket_chat_url']):
        raise RocketChatException(
            'Rocket Chat Integration is not enabled', RocketChatException.CODES.DISABLED
        )

    rocket_chat = RocketChat(api_url)
    return rocket_chat.get_token(user, event, microlocation=microlocation)


def get_rocket_chat_token_virtual_room(
    user: User,
    event: Optional[Event] = None,
    videoStream: Optional[VideoStream] = None,
):
    settings = get_settings()
    if not (api_url := settings['rocket_chat_url']):
        raise RocketChatException(
            'Rocket Chat Integration is not enabled', RocketChatException.CODES.DISABLED
        )

    rocket_chat = RocketChat(api_url)
    return rocket_chat.get_token_virtual_room(user, event, videoStream=videoStream)


def rename_rocketchat_room(event: Event):
    settings = get_settings()
    if not event.chat_room_id or not (api_url := settings['rocket_chat_url']):
        return

    rocket_chat = RocketChat(api_url)
    bot = rocket_chat.check_or_create_bot()
    data = rocket_chat.get_token(bot)

    bot_token = data['token']
    bot_id = data['res']['data']['userId']

    res = requests.post(
        rocket_chat.api_url + '/api/v1/groups.rename',
        json=dict(
            name=event.chat_room_name,
            roomId=event.chat_room_id,
        ),
        headers={
            'X-Auth-Token': bot_token,
            'X-User-Id': bot_id,
        },
    )

    if not res.status_code == 200:
        logger.error('Error while changing room name : %s', res.json())
        raise RocketChatException('Error while changing room name', response=res)