tulir/mautrix-telegram

View on GitHub
mautrix_telegram/matrix.py

Summary

Maintainability
B
4 hrs
Test Coverage
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2022 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import TYPE_CHECKING
import sys

from mautrix.bridge import BaseMatrixHandler
from mautrix.types import (
    Event,
    EventID,
    EventType,
    MemberStateEventContent,
    PresenceEvent,
    PresenceState,
    ReactionEvent,
    ReceiptEvent,
    RedactionEvent,
    RoomAvatarStateEventContent as AvatarContent,
    RoomID,
    RoomNameStateEventContent as NameContent,
    RoomTopicStateEventContent as TopicContent,
    SingleReceiptEventContent,
    StateEvent,
    TypingEvent,
    UserID,
)

from . import commands as com, portal as po, puppet as pu, user as u
from .commands.portal.util import get_initial_state, user_has_power_level, warn_missing_power
from .types import TelegramID

if TYPE_CHECKING:
    from .__main__ import TelegramBridge


class MatrixHandler(BaseMatrixHandler):
    commands: com.CommandProcessor
    _previously_typing: dict[RoomID, set[UserID]]

    def __init__(self, bridge: "TelegramBridge") -> None:
        """Set up the handler for the given bridge instance.

        The ``bridge.username_template`` config value is rendered with ``:`` as
        the user ID and split on that character, giving the text that precedes
        and follows the ID. Those pieces are stored as ``user_id_prefix``
        (with a leading ``@``) and ``user_id_suffix`` (followed by ``:`` and
        the ``homeserver.domain`` config value) before the base class is
        initialised with a fresh command processor.
        """
        template = bridge.config["bridge.username_template"]
        prefix, suffix = template.format(userid=":").split(":")
        homeserver = bridge.config["homeserver.domain"]

        # Assigned before super().__init__(), matching the original ordering.
        self.user_id_prefix = f"@{prefix}"
        self.user_id_suffix = f"{suffix}:{homeserver}"

        super().__init__(command_processor=com.CommandProcessor(bridge), bridge=bridge)

        # Per-room set of users that were typing in the last typing event.
        self._previously_typing = {}

    async def handle_puppet_group_invite(
        self,
        room_id: RoomID,
        puppet: pu.Puppet,
        invited_by: u.User,
        evt: StateEvent,
        members: list[UserID],
    ) -> None:
        """Handle a ghost being invited to a multi-user Matrix room.

        Depending on the situation the ghost either stays idle, leaves with a
        reason, or a new Telegram chat is created for the room:

        * Bridge bot already in ``members``: the ghost posts a notice and
          nothing else happens.
        * Inviter has no double puppet, or ``bridge.create_group_on_invite``
          is disabled: the ghost leaves.
        * Inviter lacks the "bridge" power level: the ghost leaves.
        * Room has no title: the ghost leaves (the bridge bot has already been
          invited at that point).
        * Otherwise a portal is built for the room, the bot's power level is
          raised to the inviter's level (capped at 100) if it was lower, and a
          Telegram supergroup is created. A ``ValueError`` from the creation
          deletes the portal and reports the first error argument in the room.
        """
        bot_mxid = self.az.bot_mxid
        double_puppet = await pu.Puppet.get_by_custom_mxid(invited_by.mxid)

        if bot_mxid in members:
            await puppet.default_mxid_intent.send_notice(
                room_id,
                "This ghost will remain inactive "
                "until a Telegram chat is created for this room.",
            )
            return

        if not double_puppet or not self.config["bridge.create_group_on_invite"]:
            await puppet.default_mxid_intent.leave_room(
                room_id,
                reason="This ghost does not join multi-user rooms without the bridge bot.",
            )
            return

        may_bridge = await user_has_power_level(
            evt.room_id, double_puppet.intent, invited_by, "bridge"
        )
        if not may_bridge:
            await puppet.default_mxid_intent.leave_room(
                room_id, reason="You do not have the permissions to bridge this room."
            )
            return

        await double_puppet.intent.invite_user(room_id, bot_mxid)

        # The power levels returned here are not used; they are re-fetched
        # through the bridge bot once it has joined.
        title, about, _, encrypted = await get_initial_state(double_puppet.intent, room_id)
        if not title:
            await puppet.default_mxid_intent.leave_room(
                room_id, reason="Please set a title before inviting Telegram ghosts."
            )
            return

        portal = po.Portal(
            tgid=TelegramID(0),
            tg_receiver=TelegramID(0),
            peer_type="channel",
            mxid=evt.room_id,
            title=title,
            about=about,
            encrypted=encrypted,
        )
        await portal.az.intent.ensure_joined(room_id)

        power_levels = await portal.az.intent.get_power_levels(room_id)
        inviter_level = power_levels.get_user_level(invited_by.mxid)
        if inviter_level > power_levels.get_user_level(bot_mxid):
            power_levels.users[bot_mxid] = min(inviter_level, 100)
            await double_puppet.intent.set_power_levels(room_id, power_levels)

        try:
            await portal.create_telegram_chat(invited_by, supergroup=True)
        except ValueError as err:
            await portal.delete()
            await portal.az.intent.send_notice(room_id, err.args[0])

    async def handle_invite(
        self, room_id: RoomID, user_id: UserID, inviter: u.User, event_id: EventID
    ) -> None:
        """Forward a Matrix invite to the portal for ``room_id``.

        Nothing happens unless the invited user is already known (the lookup
        uses ``create=False``), the room has a portal, the invited user has
        full access (bots allowed) and the portal allows bridging.
        """
        invitee = await u.User.get_by_mxid(user_id, create=False)
        if not invitee:
            return
        await invitee.ensure_started()

        portal = await po.Portal.get_by_mxid(room_id)
        if not portal:
            return
        if not await invitee.has_full_access(allow_bot=True):
            return
        if not portal.allow_bridging:
            return

        await portal.handle_matrix_invite(inviter, invitee)

    async def handle_join(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
        """Handle a Matrix user joining a room.

        Rooms without a portal, or whose portal does not allow bridging, are
        ignored. Users that are not relaybot-whitelisted are kicked, as are
        users that are not logged in when the portal has no bot. Everyone
        else is passed on to ``portal.join_matrix``.
        """
        user = await u.User.get_and_start_by_mxid(user_id)
        portal = await po.Portal.get_by_mxid(room_id)
        if not (portal and portal.allow_bridging):
            return

        kick_reason = None
        if not user.relaybot_whitelisted:
            kick_reason = "You are not whitelisted on this Telegram bridge."
        elif not await user.is_logged_in() and not portal.has_bot:
            kick_reason = (
                "This chat does not have a bot on the Telegram side for relaying messages sent by"
                " unauthenticated Matrix users."
            )

        if kick_reason is not None:
            await portal.main_intent.kick_user(room_id, user.mxid, kick_reason)
            return

        self.log.debug(f"{user.mxid} joined {room_id}")
        # Login state is queried again here, exactly as the original did.
        if await user.is_logged_in() or portal.has_bot:
            await portal.join_matrix(user, event_id)

    async def handle_leave(self, room_id: RoomID, user_id: UserID, event_id: EventID) -> None:
        """Handle a Matrix user leaving a room.

        The leave is logged, then passed to ``portal.leave_matrix`` when the
        room has a portal that allows bridging and the user is already known
        (the lookup uses ``create=False``).
        """
        self.log.debug(f"{user_id} left {room_id}")

        portal = await po.Portal.get_by_mxid(room_id)
        if portal and portal.allow_bridging:
            leaver = await u.User.get_by_mxid(user_id, create=False)
            if leaver:
                await leaver.ensure_started()
                await portal.leave_matrix(leaver, event_id)

    async def handle_kick_ban(
        self,
        ban: bool,
        room_id: RoomID,
        user_id: UserID,
        sender: UserID,
        reason: str,
        event_id: EventID,
    ) -> None:
        """Shared implementation for kicks and bans coming from Matrix.

        ``ban`` selects between ``portal.ban_matrix`` and
        ``portal.kick_matrix``. When the removed user is the bridge bot, the
        portal is unbridged instead (except for ``"user"`` portals). Otherwise
        the target is resolved first as a puppet and then as an existing
        bridge user; unknown senders and unknown targets are ignored.
        """
        action = "banned" if ban else "kicked"
        self.log.debug(f"{user_id} was {action} from {room_id} by {sender} for {reason}")

        portal = await po.Portal.get_by_mxid(room_id)
        if not (portal and portal.allow_bridging):
            return

        if user_id == self.az.bot_mxid:
            # Direct chat portal unbridging is handled in portal.kick_matrix
            if portal.peer_type != "user":
                await portal.unbridge()
            return

        acting_user = await u.User.get_by_mxid(sender, create=False)
        if not acting_user:
            return
        await acting_user.ensure_started()

        # Prefer a puppet as the target; fall back to an already-known user.
        target = await pu.Puppet.get_by_mxid(user_id)
        if not target:
            target = await u.User.get_by_mxid(user_id, create=False)
            if not target:
                return
            await target.ensure_started()

        remove_from_portal = portal.ban_matrix if ban else portal.kick_matrix
        await remove_from_portal(target, acting_user)

    async def handle_kick(
        self, room_id: RoomID, user_id: UserID, kicked_by: UserID, reason: str, event_id: EventID
    ) -> None:
        """Handle a Matrix kick by delegating to ``handle_kick_ban`` with ``ban=False``."""
        await self.handle_kick_ban(
            ban=False,
            room_id=room_id,
            user_id=user_id,
            sender=kicked_by,
            reason=reason,
            event_id=event_id,
        )

    async def handle_unban(
        self, room_id: RoomID, user_id: UserID, unbanned_by: UserID, reason: str, event_id: EventID
    ) -> None:
        """Handle a Matrix unban; currently processed the same way as a kick."""
        # TODO handle unbans properly instead of handling it as a kick
        await self.handle_kick_ban(
            ban=False,
            room_id=room_id,
            user_id=user_id,
            sender=unbanned_by,
            reason=reason,
            event_id=event_id,
        )

    async def handle_ban(
        self, room_id: RoomID, user_id: UserID, banned_by: UserID, reason: str, event_id: EventID
    ) -> None:
        """Handle a Matrix ban by delegating to ``handle_kick_ban`` with ``ban=True``."""
        await self.handle_kick_ban(
            ban=True,
            room_id=room_id,
            user_id=user_id,
            sender=banned_by,
            reason=reason,
            event_id=event_id,
        )

    async def allow_message(self, user: u.User) -> bool:
        """Return the user's ``relaybot_whitelisted`` flag."""
        may_send = user.relaybot_whitelisted
        return may_send

    async def allow_command(self, user: u.User) -> bool:
        """Return the user's ``whitelisted`` flag."""
        may_run_commands = user.whitelisted
        return may_run_commands

    @staticmethod
    async def allow_bridging_message(user: u.User, portal: po.Portal) -> bool:
        """Tell whether a message from ``user`` may be bridged through ``portal``.

        Truthy when the user is logged in; otherwise the portal's ``has_bot``
        value is returned.
        """
        logged_in = await user.is_logged_in()
        if logged_in:
            return logged_in
        return portal.has_bot

    @staticmethod
    async def handle_redaction(evt: RedactionEvent) -> None:
        """Pass a Matrix redaction to the portal of the room it happened in.

        Ignored when the sender is not relaybot-whitelisted, or when the room
        has no portal or its portal does not allow bridging.
        """
        redactor = await u.User.get_and_start_by_mxid(evt.sender)
        if redactor.relaybot_whitelisted:
            portal = await po.Portal.get_by_mxid(evt.room_id)
            if portal and portal.allow_bridging:
                await portal.handle_matrix_deletion(redactor, evt.redacts, evt.event_id)

    @staticmethod
    async def handle_reaction(evt: ReactionEvent) -> None:
        """Pass a Matrix reaction to the portal of the room it happened in.

        Ignored when the sender lacks full access, or when the room has no
        portal or its portal does not allow bridging. The portal receives the
        target event ID and reaction key taken from the event's relation.
        """
        reactor = await u.User.get_and_start_by_mxid(evt.sender)
        if await reactor.has_full_access():
            portal = await po.Portal.get_by_mxid(evt.room_id)
            if portal and portal.allow_bridging:
                relation = evt.content.relates_to
                await portal.handle_matrix_reaction(
                    reactor, relation.event_id, relation.key, evt.event_id
                )

    @staticmethod
    async def handle_power_levels(evt: StateEvent) -> None:
        """Pass a Matrix power level change to the portal of the room.

        Ignored unless the sender has full access (bots allowed) and the room
        has a portal that allows bridging. The portal receives the new and the
        previous per-user level mappings.

        The previous mapping is read defensively: if the event carries no
        ``unsigned`` data or no ``prev_content``, an empty mapping is passed
        instead of raising ``AttributeError``. This mirrors how the pinned
        events branch of ``handle_state_event`` treats a missing
        ``prev_content``.
        """
        portal = await po.Portal.get_by_mxid(evt.room_id)
        sender = await u.User.get_and_start_by_mxid(evt.sender)
        if not await sender.has_full_access(allow_bot=True):
            return
        if not portal or not portal.allow_bridging:
            return

        # Either attribute may be None/absent when there is no earlier state.
        prev_content = getattr(evt.unsigned, "prev_content", None)
        old_users = getattr(prev_content, "users", None) or {}

        await portal.handle_matrix_power_levels(
            sender, evt.content.users, old_users, evt.event_id
        )

    @staticmethod
    async def handle_room_meta(
        evt_type: EventType,
        room_id: RoomID,
        sender_mxid: UserID,
        content: NameContent | AvatarContent | TopicContent,
        event_id: EventID,
    ) -> None:
        """Pass a room name, topic or avatar change to the room's portal.

        Ignored unless the sender has full access (bots allowed) and the room
        has a portal that allows bridging. ``evt_type`` selects the portal
        handler, the expected content class and the content field to forward;
        content of an unexpected class is silently dropped. An ``evt_type``
        other than the three supported ones raises ``KeyError``.
        """
        portal = await po.Portal.get_by_mxid(room_id)
        sender = await u.User.get_and_start_by_mxid(sender_mxid)
        if not await sender.has_full_access(allow_bot=True):
            return
        if not portal or not portal.allow_bridging:
            return

        # event type -> (portal handler, expected content class, content field)
        dispatch = {
            EventType.ROOM_NAME: (portal.handle_matrix_title, NameContent, "name"),
            EventType.ROOM_TOPIC: (portal.handle_matrix_about, TopicContent, "topic"),
            EventType.ROOM_AVATAR: (portal.handle_matrix_avatar, AvatarContent, "url"),
        }
        handler, expected_type, field = dispatch[evt_type]
        if isinstance(content, expected_type):
            await handler(sender, content[field], event_id)

    @staticmethod
    async def handle_room_pin(
        room_id: RoomID,
        sender_mxid: UserID,
        new_events: set[str],
        old_events: set[str],
        event_id: EventID,
    ) -> None:
        """Pass a change of the room's pinned events to the room's portal.

        Ignored unless the sender has full access (bots allowed) and the room
        has a portal that allows bridging. An empty ``new_events`` is treated
        as "unpin everything". Otherwise the portal receives a mapping from
        each event ID that appears in exactly one of the two sets to ``True``
        (now pinned) or ``False`` (no longer pinned).
        """
        portal = await po.Portal.get_by_mxid(room_id)
        sender = await u.User.get_and_start_by_mxid(sender_mxid)
        if not await sender.has_full_access(allow_bot=True):
            return
        if not portal or not portal.allow_bridging:
            return

        if not new_events:
            await portal.handle_matrix_unpin_all(sender, event_id)
            return

        # Distinct loop variable name so the `event_id` parameter is not shadowed.
        changes = {pinned_id: pinned_id in new_events for pinned_id in new_events ^ old_events}
        await portal.handle_matrix_pin(sender, changes, event_id)

    @staticmethod
    async def handle_room_upgrade(
        room_id: RoomID, sender: UserID, new_room_id: RoomID, event_id: EventID
    ) -> None:
        """Pass a room upgrade to the portal of the old room.

        Ignored when the room has no portal or its portal does not allow
        bridging.
        """
        portal = await po.Portal.get_by_mxid(room_id)
        if not portal or not portal.allow_bridging:
            return
        await portal.handle_matrix_upgrade(sender, new_room_id, event_id)

    async def handle_member_info_change(
        self,
        room_id: RoomID,
        user_id: UserID,
        profile: MemberStateEventContent,
        prev_profile: MemberStateEventContent,
        event_id: EventID,
    ) -> None:
        """Pass a display name change to the room's portal.

        Only display name changes are considered. The change is forwarded
        when the room has a portal with a bot that allows bridging and
        ``user.needs_relaybot(portal)`` is true for the user.
        """
        new_name = profile.displayname
        old_name = prev_profile.displayname
        if new_name == old_name:
            return

        portal = await po.Portal.get_by_mxid(room_id)
        if not (portal and portal.has_bot and portal.allow_bridging):
            return

        user = await u.User.get_and_start_by_mxid(user_id)
        if not await user.needs_relaybot(portal):
            return
        await portal.name_change_matrix(user, new_name, old_name, event_id)

    async def handle_read_receipt(
        self, user: u.User, portal: po.Portal, event_id: EventID, data: SingleReceiptEventContent
    ) -> None:
        """Mark ``event_id`` as read in the portal when the portal allows bridging.

        The receipt's ``ts`` value is forwarded, defaulting to ``0`` when absent.
        """
        if portal.allow_bridging:
            timestamp = data.get("ts", 0)
            await portal.mark_read(user, event_id, timestamp)

    @staticmethod
    async def handle_presence(user_id: UserID, presence: PresenceState) -> None:
        """Pass a Matrix presence update to a logged-in bridge user.

        The user is looked up with ``check_db=False`` and ``create=False``;
        nothing happens when no user is returned or the user is not logged in.
        """
        user = await u.User.get_by_mxid(user_id, check_db=False, create=False)
        if not user:
            return
        if not await user.is_logged_in():
            return
        is_online = presence == PresenceState.ONLINE
        await user.set_presence(is_online)

    async def handle_typing(self, room_id: RoomID, now_typing: set[UserID]) -> None:
        """Pass typing state changes in a room to the room's portal.

        ``now_typing`` is compared with the set remembered from the previous
        call for the same room. For every user whose state changed (started or
        stopped typing) and who resolves to a logged-in bridge user,
        ``portal.set_typing`` is called with the new state. Rooms without a
        portal, or whose portal does not allow bridging, are ignored and the
        remembered state is left untouched.
        """
        portal = await po.Portal.get_by_mxid(room_id)
        if not portal or not portal.allow_bridging:
            return

        previously_typing = self._previously_typing.get(room_id, set())

        # Users in exactly one of the two sets are the ones whose state
        # flipped; users in both are still typing and need no update.
        for user_id in previously_typing ^ now_typing:
            user = await u.User.get_by_mxid(user_id, check_db=False, create=False)
            if user and await user.is_logged_in():
                await portal.set_typing(user, user_id in now_typing)

        if now_typing:
            self._previously_typing[room_id] = now_typing
        else:
            # Nobody is typing any more: drop the entry instead of keeping an
            # empty set per room around indefinitely.
            self._previously_typing.pop(room_id, None)

    async def handle_ephemeral_event(
        self, evt: ReceiptEvent | PresenceEvent | TypingEvent
    ) -> None:
        """Route an ephemeral event to the receipt, presence or typing handler.

        Events of any other type are ignored. ``handle_receipt`` is not
        defined in this class; presumably it comes from the base class.
        """
        kind = evt.type
        if kind == EventType.RECEIPT:
            await self.handle_receipt(evt)
            return
        if kind == EventType.PRESENCE:
            await self.handle_presence(evt.sender, evt.content.presence)
            return
        if kind == EventType.TYPING:
            await self.handle_typing(evt.room_id, set(evt.content.user_ids))

    async def handle_event(self, evt: Event) -> None:
        """Route redaction and reaction events to their handlers; ignore the rest."""
        kind = evt.type
        if kind == EventType.ROOM_REDACTION:
            await self.handle_redaction(evt)
            return
        if kind == EventType.REACTION:
            await self.handle_reaction(evt)

    async def handle_state_event(self, evt: StateEvent) -> None:
        """Route a room state event to the matching handler.

        Handles power levels, room name/avatar/topic, pinned events and
        tombstones; every other state event type is ignored. For pinned
        events the previous pin list is read from ``unsigned.prev_content``
        and treated as empty when it cannot be read.
        """
        kind = evt.type

        if kind == EventType.ROOM_POWER_LEVELS:
            await self.handle_power_levels(evt)
            return

        if kind in (EventType.ROOM_NAME, EventType.ROOM_AVATAR, EventType.ROOM_TOPIC):
            await self.handle_room_meta(
                evt.type, evt.room_id, evt.sender, evt.content, evt.event_id
            )
            return

        if kind == EventType.ROOM_PINNED_EVENTS:
            now_pinned = set(evt.content.pinned)
            try:
                before_pinned = set(evt.unsigned.prev_content.pinned)
            except (KeyError, ValueError, TypeError, AttributeError):
                # No usable previous content: assume nothing was pinned before.
                before_pinned = set()
            await self.handle_room_pin(
                evt.room_id, evt.sender, now_pinned, before_pinned, evt.event_id
            )
            return

        if kind == EventType.ROOM_TOMBSTONE:
            await self.handle_room_upgrade(
                evt.room_id, evt.sender, evt.content.replacement_room, evt.event_id
            )