tulir/mautrix-telegram

View on GitHub
mautrix_telegram/portal.py

Summary

Maintainability
F
1 wk
Test Coverage
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2023 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
    Awaitable,
    Callable,
    List,
    Literal,
    NamedTuple,
    Union,
    cast,
)
from collections import defaultdict
from datetime import datetime
from html import escape as escape_html
from sqlite3 import IntegrityError
from string import Template
import asyncio
import base64
import itertools
import random
import time

from asyncpg import UniqueViolationError
from telethon.errors import (
    ChatAdminRequiredError,
    ChatNotModifiedError,
    ChatRestrictedError,
    ChatWriteForbiddenError,
    EntitiesTooLongError,
    EntityBoundsInvalidError,
    EntityMentionUserInvalidError,
    InputUserDeactivatedError,
    MessageEmptyError,
    MessageIdInvalidError,
    MessageNotModifiedError,
    MessageTooLongError,
    PhotoExtInvalidError,
    PhotoInvalidDimensionsError,
    PhotoSaveFileInvalidError,
    ReactionInvalidError,
    RPCError,
    SlowModeWaitError,
    UserBannedInChannelError,
    UserIsBlockedError,
    YouBlockedUserError,
)
from telethon.tl.custom import Dialog
from telethon.tl.functions.channels import (
    CreateChannelRequest,
    EditPhotoRequest,
    EditTitleRequest,
    InviteToChannelRequest,
    JoinChannelRequest,
    UpdateUsernameRequest,
    ViewSponsoredMessageRequest,
)
from telethon.tl.functions.messages import (
    AddChatUserRequest,
    CreateChatRequest,
    EditChatAboutRequest,
    EditChatPhotoRequest,
    EditChatTitleRequest,
    ExportChatInviteRequest,
    GetMessageReactionsListRequest,
    GetMessagesReactionsRequest,
    GetPeerDialogsRequest,
    MigrateChatRequest,
    SendReactionRequest,
    SetTypingRequest,
    UnpinAllMessagesRequest,
    UpdatePinnedMessageRequest,
)
from telethon.tl.patched import Message, MessageService
from telethon.tl.types import (
    Channel,
    ChannelFull,
    Chat,
    ChatBannedRights,
    ChatEmpty,
    ChatFull,
    ChatPhoto,
    ChatPhotoEmpty,
    DocumentAttributeAudio,
    DocumentAttributeFilename,
    DocumentAttributeImageSize,
    DocumentAttributeSticker,
    DocumentAttributeVideo,
    GeoPoint,
    InputChannel,
    InputChatUploadedPhoto,
    InputDialogPeer,
    InputMediaUploadedDocument,
    InputMediaUploadedPhoto,
    InputPeerChannel,
    InputPeerChat,
    InputPeerPhotoFileLocation,
    InputPeerUser,
    InputStickerSetEmpty,
    InputUser,
    MessageActionBoostApply,
    MessageActionChannelCreate,
    MessageActionChatAddUser,
    MessageActionChatCreate,
    MessageActionChatDeletePhoto,
    MessageActionChatDeleteUser,
    MessageActionChatEditPhoto,
    MessageActionChatEditTitle,
    MessageActionChatJoinedByLink,
    MessageActionChatJoinedByRequest,
    MessageActionChatMigrateTo,
    MessageActionContactSignUp,
    MessageActionGameScore,
    MessageActionGiftPremium,
    MessageActionGroupCall,
    MessageActionPhoneCall,
    MessageMediaGame,
    MessageMediaGeo,
    MessagePeerReaction,
    MessageReactions,
    PeerChannel,
    PeerChat,
    PeerUser,
    PhoneCallDiscardReasonBusy,
    PhoneCallDiscardReasonDisconnect,
    PhoneCallDiscardReasonMissed,
    PhoneCallRequested,
    Photo,
    PhotoEmpty,
    ReactionCount,
    ReactionCustomEmoji,
    ReactionEmoji,
    SendMessageCancelAction,
    SendMessageTypingAction,
    SponsoredMessage,
    TypeChannelParticipant,
    TypeChat,
    TypeChatParticipant,
    TypeInputChannel,
    TypeInputPeer,
    TypeMessage,
    TypeMessageAction,
    TypePeer,
    TypeReaction,
    TypeUser,
    TypeUserFull,
    TypeUserProfilePhoto,
    UpdateBotMessageReaction,
    UpdateChannelUserTyping,
    UpdateChatUserTyping,
    UpdateMessageReactions,
    UpdateNewMessage,
    UpdatePhoneCall,
    UpdateUserTyping,
    User,
    UserEmpty,
    UserFull,
    UserProfilePhoto,
    UserProfilePhotoEmpty,
)
from telethon.tl.types.messages import PeerDialogs
from telethon.utils import encode_waveform, get_peer_id
import attr

from mautrix.appservice import DOUBLE_PUPPET_SOURCE_KEY, IntentAPI
from mautrix.bridge import BasePortal, NotificationDisabler, RejectMatrixInvite, async_getter_lock
from mautrix.errors import IntentError, MatrixRequestError, MForbidden
from mautrix.types import (
    BatchID,
    BatchSendEvent,
    BatchSendStateEvent,
    BeeperMessageStatusEventContent,
    ContentURI,
    EventID,
    EventType,
    Format,
    ImageInfo,
    JoinRule,
    LocationMessageEventContent,
    MediaMessageEventContent,
    Membership,
    MemberStateEventContent,
    MessageEventContent,
    MessageStatus,
    MessageStatusReason,
    MessageType,
    PowerLevelStateEventContent,
    RelatesTo,
    RelationType,
    RoomAlias,
    RoomAvatarStateEventContent,
    RoomCreatePreset,
    RoomID,
    RoomNameStateEventContent,
    RoomTopicStateEventContent,
    StateEventContent,
    TextMessageEventContent,
    UserID,
    VideoInfo,
)
from mautrix.util import background_task, magic, markdown, variation_selector
from mautrix.util.format_duration import format_duration
from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
from mautrix.util.simple_lock import SimpleLock
from mautrix.util.simple_template import SimpleTemplate

from . import (
    abstract_user as au,
    formatter,
    matrix as m,
    portal_util as putil,
    puppet as p,
    user as u,
    util,
)
from .config import Config
from .db import (
    Backfill,
    BackfillType,
    DisappearingMessage,
    Message as DBMessage,
    Portal as DBPortal,
    Reaction as DBReaction,
    TelegramFile as DBTelegramFile,
)
from .tgclient import MautrixTelegramClient
from .types import TelegramID
from .util import sane_mimetypes

try:
    from mautrix.crypto.attachments import decrypt_attachment
except ImportError:
    decrypt_attachment = None

if TYPE_CHECKING:
    from .__main__ import TelegramBridge
    from .bot import Bot

StateBridge = EventType.find("m.bridge", EventType.Class.STATE)
StateHalfShotBridge = EventType.find("uk.half-shot.bridge", EventType.Class.STATE)
DummyPortalCreated = EventType.find("fi.mau.dummy.portal_created", EventType.Class.MESSAGE)

InviteList = Union[UserID, List[UserID]]
UpdateTyping = Union[UpdateUserTyping, UpdateChatUserTyping, UpdateChannelUserTyping]
TypeChatPhoto = Union[ChatPhoto, ChatPhotoEmpty, Photo, PhotoEmpty]
MediaHandler = Callable[["au.AbstractUser", IntentAPI, Message, RelatesTo], Awaitable[EventID]]

REACTION_POLL_MIN_INTERVAL = 20


class BridgingError(Exception):
    pass


class IgnoredMessageError(Exception):
    pass


class WrappedReaction(NamedTuple):
    reaction: ReactionEmoji | ReactionCustomEmoji
    date: datetime | None


class Portal(DBPortal, BasePortal):
    bot: "Bot"
    config: Config
    matrix: m.MatrixHandler
    disappearing_msg_class = DisappearingMessage

    # Instance cache
    by_mxid: dict[RoomID, Portal] = {}
    by_tgid: dict[tuple[TelegramID, TelegramID], Portal] = {}

    # Config cache
    filter_mode: str
    filter_list: list[int]
    filter_users: bool | None

    max_initial_member_sync: int
    sync_channel_members: bool
    sync_matrix_state: bool
    public_portals: bool
    private_chat_portal_meta: Literal["default", "always", "never"]

    alias_template: SimpleTemplate[str]
    hs_domain: str

    # Instance variables
    deleted: bool

    backfill_lock: SimpleLock
    backfill_method_lock: asyncio.Lock
    backfill_enable: bool

    alias: RoomAlias | None

    dedup: putil.PortalDedup
    send_lock: putil.PortalSendLock
    reaction_lock: putil.PortalReactionLock
    _pin_lock: asyncio.Lock

    _main_intent: IntentAPI | None
    _room_create_lock: asyncio.Lock

    _sponsored_msg: SponsoredMessage | None
    _sponsored_entity: User | Channel | None
    _sponsored_msg_ts: float
    _sponsored_msg_lock: asyncio.Lock
    _sponsored_evt_id: EventID | None
    _sponsored_seen: dict[UserID, bool]
    _new_messages_after_sponsored: bool

    _prev_reaction_poll: dict[UserID, float]

    _msg_conv: putil.TelegramMessageConverter

    def __init__(
        self,
        tgid: TelegramID,
        tg_receiver: TelegramID,
        peer_type: str,
        megagroup: bool = False,
        mxid: RoomID | None = None,
        avatar_url: ContentURI | None = None,
        encrypted: bool = False,
        first_event_id: EventID | None = None,
        next_batch_id: BatchID | None = None,
        base_insertion_id: EventID | None = None,
        sponsored_event_id: EventID | None = None,
        sponsored_event_ts: int | None = None,
        sponsored_msg_random_id: bytes | None = None,
        username: str | None = None,
        title: str | None = None,
        about: str | None = None,
        photo_id: str | None = None,
        name_set: bool = False,
        avatar_set: bool = False,
        local_config: dict[str, Any] | None = None,
    ) -> None:
        super().__init__(
            tgid=tgid,
            tg_receiver=tg_receiver,
            peer_type=peer_type,
            megagroup=megagroup,
            mxid=mxid,
            avatar_url=avatar_url,
            encrypted=encrypted,
            first_event_id=first_event_id,
            next_batch_id=next_batch_id,
            base_insertion_id=base_insertion_id,
            sponsored_event_id=sponsored_event_id,
            sponsored_event_ts=sponsored_event_ts,
            sponsored_msg_random_id=sponsored_msg_random_id,
            username=username,
            title=title,
            about=about,
            photo_id=photo_id,
            name_set=name_set,
            avatar_set=avatar_set,
            local_config=local_config or {},
        )
        BasePortal.__init__(self)
        self.log = self.log.getChild(self.tgid_log if self.tgid else self.mxid)
        self._main_intent = None
        self.deleted = False

        self.backfill_lock = SimpleLock(
            "Waiting for backfilling to finish before handling %s", log=self.log
        )
        self.backfill_method_lock = asyncio.Lock()

        self.dedup = putil.PortalDedup(self)
        self.send_lock = putil.PortalSendLock()
        self.reaction_lock = putil.PortalReactionLock()
        self._pin_lock = asyncio.Lock()
        self._room_create_lock = asyncio.Lock()

        self._sponsored_msg = None
        self._sponsored_msg_ts = 0
        self._sponsored_msg_lock = asyncio.Lock()
        self._sponsored_seen = {}
        self._new_messages_after_sponsored = True
        self._bridging_blocked_at_runtime = False

        self._prev_reaction_poll = defaultdict(lambda: 0.0)

        self._msg_conv = putil.TelegramMessageConverter(self)

    # region Properties

    @property
    def tgid_full(self) -> tuple[TelegramID, TelegramID]:
        return self.tgid, self.tg_receiver

    @property
    def tgid_log(self) -> str:
        if self.tgid == self.tg_receiver:
            return str(self.tgid)
        return f"{self.tg_receiver}<->{self.tgid}"

    @property
    def name(self) -> str:
        return self.title

    @property
    def alias(self) -> RoomAlias | None:
        if not self.username:
            return None
        return RoomAlias(f"#{self.alias_localpart}:{self.hs_domain}")

    @property
    def alias_localpart(self) -> str | None:
        if not self.username:
            return None
        return self.alias_template.format(self.username)

    @property
    def peer(self) -> TypePeer | TypeInputPeer:
        if self.peer_type == "user":
            return PeerUser(user_id=self.tgid)
        elif self.peer_type == "chat":
            return PeerChat(chat_id=self.tgid)
        elif self.peer_type == "channel":
            return PeerChannel(channel_id=self.tgid)

    @property
    def is_direct(self) -> bool:
        return self.peer_type == "user"

    @property
    def is_channel(self) -> bool:
        return self.peer_type == "channel"

    @property
    def has_bot(self) -> bool:
        return bool(self.bot) and (
            self.bot.is_in_chat(self.tgid)
            or (self.peer_type == "user" and self.tg_receiver == self.bot.tgid)
        )

    @property
    def main_intent(self) -> IntentAPI:
        if self._main_intent is None:
            raise RuntimeError("Portal must be postinit()ed before main_intent can be used")
        return self._main_intent

    @property
    def allow_bridging(self) -> bool:
        if self._bridging_blocked_at_runtime:
            return False
        elif self.peer_type == "user" and self.filter_users is not None:
            return self.filter_users
        elif self.filter_mode == "whitelist":
            return self.tgid in self.filter_list
        elif self.filter_mode == "blacklist":
            return self.tgid not in self.filter_list
        return True

    @property
    def set_dm_room_metadata(self) -> bool:
        return (
            not self.is_direct
            or self.private_chat_portal_meta == "always"
            or (self.encrypted and self.private_chat_portal_meta != "never")
        )

    @classmethod
    def init_cls(cls, bridge: "TelegramBridge") -> None:
        BasePortal.bridge = bridge
        cls.az = bridge.az
        cls.config = bridge.config
        cls.loop = bridge.loop
        cls.matrix = bridge.matrix
        cls.bot = bridge.bot

        cls.max_initial_member_sync = cls.config["bridge.max_initial_member_sync"]
        cls.sync_channel_members = cls.config["bridge.sync_channel_members"]
        cls.sync_matrix_state = cls.config["bridge.sync_matrix_state"]
        cls.public_portals = cls.config["bridge.public_portals"]
        cls.private_chat_portal_meta = cls.config["bridge.private_chat_portal_meta"]
        cls.filter_mode = cls.config["bridge.filter.mode"]
        cls.filter_list = cls.config["bridge.filter.list"]
        cls.filter_users = cls.config["bridge.filter.users"]
        cls.hs_domain = cls.config["homeserver.domain"]
        cls.backfill_enable = cls.config["bridge.backfill.enable"]
        cls.alias_template = SimpleTemplate(
            cls.config["bridge.alias_template"],
            "groupname",
            prefix="#",
            suffix=f":{cls.hs_domain}",
        )
        NotificationDisabler.puppet_cls = p.Puppet
        NotificationDisabler.config_enabled = cls.config["bridge.backfill.disable_notifications"]

    # endregion
    # region Matrix -> Telegram metadata

    async def save(self) -> None:
        if self.deleted:
            await super().insert()
            await self.postinit()
            self.deleted = False
        else:
            await super().save()

    async def get_telegram_users_in_matrix_room(
        self, source: u.User, pre_create: bool = False
    ) -> tuple[list[InputUser], list[UserID], list[u.User]]:
        user_tgids = {}
        users = []
        intent = self.az.intent if pre_create else self.main_intent
        user_mxids = await intent.get_room_members(self.mxid, (Membership.JOIN, Membership.INVITE))
        for mxid in user_mxids:
            if mxid == self.az.bot_mxid:
                continue
            mx_user = await u.User.get_by_mxid(mxid, create=False)
            if mx_user and mx_user.tgid:
                users.append(mx_user)
                user_tgids[mx_user.tgid] = mxid
            puppet_id = p.Puppet.get_id_from_mxid(mxid)
            if puppet_id:
                user_tgids[puppet_id] = mxid
        input_users = []
        errors = []
        for tgid, mxid in user_tgids.items():
            try:
                input_users.append(await source.client.get_input_entity(tgid))
            except ValueError as e:
                source.log.debug(
                    f"Failed to find the input entity for {tgid} ({mxid}) for "
                    f"creating a group: {e}"
                )
                errors.append(mxid)
        return input_users, errors, users

    async def upgrade_telegram_chat(self, source: u.User) -> None:
        if self.peer_type != "chat":
            raise ValueError("Only normal group chats are upgradable to supergroups.")

        response = await source.client(MigrateChatRequest(chat_id=self.tgid))
        entity = None
        for chat in response.chats:
            if isinstance(chat, Channel):
                entity = chat
                break
        if not entity:
            raise ValueError("Upgrade may have failed: output channel not found.")
        await self._migrate_and_save_telegram(TelegramID(entity.id))
        await self.update_info(source, entity)

    async def _migrate_and_save_telegram(self, new_id: TelegramID) -> None:
        async with self._async_get_locks[(new_id,)]:
            await self._migrate_and_save_telegram_locked(new_id)

    async def _migrate_and_save_telegram_locked(self, new_id: TelegramID) -> None:
        self.log.info(f"Starting migration to {new_id}")
        try:
            del self.by_tgid[self.tgid_full]
        except KeyError:
            pass
        try:
            existing = self.by_tgid[(new_id, new_id)]
        except KeyError:
            existing = None
        self.by_tgid[(new_id, new_id)] = self
        if existing:
            if existing.mxid:
                self.log.warning(f"Deleting existing portal room {existing.mxid} for {new_id}")
                await existing.cleanup_and_delete()
            else:
                self.log.debug(f"Deleting old database entry for {new_id}")
                await existing.delete()
        old_id = self.tgid
        await self.update_id(new_id, "channel")
        self.log = self.__class__.log.getChild(self.tgid_log)
        self.log.info(f"Telegram chat upgraded from {old_id}")

    async def set_telegram_username(self, source: u.User, username: str) -> None:
        if self.peer_type != "channel":
            raise ValueError("Only channels and supergroups have usernames.")
        await source.client(UpdateUsernameRequest(await self.get_input_entity(source), username))
        if await self._update_username(username):
            await self.save()

    async def create_telegram_chat(self, source: u.User, supergroup: bool = False) -> None:
        if not self.mxid:
            raise ValueError("Can't create Telegram chat for portal without Matrix room.")
        invites, errors, users = await self.get_telegram_users_in_matrix_room(
            source, pre_create=True
        )
        if len(errors) > 0:
            error_list = "\n".join(f"* [{mxid}](https://matrix.to/#/{mxid})" for mxid in errors)
            command_prefix = self.config["bridge.command_prefix"]
            message = (
                f"Failed to add the following users to the chat:\n\n{error_list}\n\n"
                f"You can try `{command_prefix} search -r <username>` to help the bridge find "
                "those users."
            )
            await self.az.intent.send_notice(
                self.mxid, text=message, html=markdown.render(message)
            )
        elif self.tgid:
            raise ValueError("Can't create Telegram chat for portal with existing Telegram chat.")

        if len(invites) < 2:
            if self.bot is not None:
                info, mxid = await self.bot.get_me()
                raise ValueError(
                    "Not enough Telegram users to create a chat. "
                    "Invite more Telegram ghost users to the room, such as the "
                    f"relaybot ([{info.first_name}](https://matrix.to/#/{mxid}))."
                )
            raise ValueError(
                "Not enough Telegram users to create a chat. "
                "Invite more Telegram ghost users to the room."
            )
        if self.peer_type == "chat":
            response = await source.client(CreateChatRequest(title=self.title, users=invites))
            entity = response.chats[0]
        elif self.peer_type == "channel":
            response = await source.client(
                CreateChannelRequest(
                    title=self.title, about=self.about or "", megagroup=supergroup
                )
            )
            entity = response.chats[0]
            await source.client(
                InviteToChannelRequest(
                    channel=await source.client.get_input_entity(entity), users=invites
                )
            )
        else:
            raise ValueError("Invalid peer type for Telegram chat creation")

        self.tgid = entity.id
        self.tg_receiver = self.tgid
        await self.postinit()
        await self.insert()
        await self.update_info(source, entity)
        self.log = self.__class__.log.getChild(self.tgid_log)

        if self.bot and self.bot.tgid in invites:
            await self.bot.add_chat(self.tgid, self.peer_type)

        levels = await self.main_intent.get_power_levels(self.mxid)
        if levels.get_user_level(self.main_intent.mxid) == 100:
            levels = putil.get_base_power_levels(self, levels, entity)
            await self.main_intent.set_power_levels(self.mxid, levels)
        await self.handle_matrix_power_levels(source, levels.users, {}, None)
        await self.update_bridge_info()
        for user in users:
            await user.register_portal(self)
        await self.main_intent.send_notice(self.mxid, f"Telegram chat created. ID: {self.tgid}")

    async def handle_matrix_invite(
        self, invited_by: u.User, puppet: p.Puppet | au.AbstractUser
    ) -> None:
        if isinstance(puppet, p.Puppet) and puppet.is_channel:
            raise ValueError("Can't invite channels to chats")
        try:
            if self.peer_type == "chat":
                await invited_by.client(
                    AddChatUserRequest(chat_id=self.tgid, user_id=puppet.tgid, fwd_limit=0)
                )
            elif self.peer_type == "channel":
                await invited_by.client(
                    InviteToChannelRequest(channel=self.peer, users=[puppet.tgid])
                )
            # We don't care if there are invites for private chat portals with the relaybot.
            elif not self.bot or self.tg_receiver != self.bot.tgid:
                raise RejectMatrixInvite("You can't invite additional users to private chats.")
        except RPCError as e:
            raise RejectMatrixInvite(e.message) from e

    # endregion
    # region Telegram -> Matrix metadata

    def _get_invite_content(self, double_puppet: p.Puppet | None) -> dict[str, Any]:
        invite_content = {}
        if double_puppet:
            invite_content["fi.mau.will_auto_accept"] = True
        if self.is_direct:
            invite_content["is_direct"] = True
        return invite_content

    async def invite_to_matrix(self, users: InviteList) -> None:
        if isinstance(users, list):
            for user in users:
                await self.invite_to_matrix(user)
        else:
            puppet = await p.Puppet.get_by_custom_mxid(users)
            await self.main_intent.invite_user(
                self.mxid, users, check_cache=True, extra_content=self._get_invite_content(puppet)
            )
            if puppet:
                try:
                    await puppet.intent.ensure_joined(self.mxid)
                except Exception:
                    self.log.exception("Failed to ensure %s is joined to portal", users)

    async def update_matrix_room(
        self,
        user: au.AbstractUser,
        entity: TypeChat | User,
        puppet: p.Puppet = None,
        levels: PowerLevelStateEventContent = None,
        users: list[User] = None,
        client: MautrixTelegramClient | None = None,
    ) -> None:
        try:
            await self._update_matrix_room(user, entity, puppet, levels, users, client)
        except Exception:
            self.log.exception("Fatal error updating Matrix room")

    async def _update_matrix_room(
        self,
        user: au.AbstractUser,
        entity: TypeChat | User,
        puppet: p.Puppet = None,
        levels: PowerLevelStateEventContent = None,
        users: list[User] = None,
        client: MautrixTelegramClient | None = None,
    ) -> None:
        if not client:
            client = user.client
        if not self.is_direct:
            await self.update_info(user, entity, client=client)
            if not users:
                users = await self._get_users(client, entity)
            await self._sync_telegram_users(user, users, client=client)
            await self.update_power_levels(users, levels)
        else:
            if not puppet:
                puppet = await self.get_dm_puppet()
            await puppet.update_info(user, entity)
            await puppet.intent_for(self).join_room(self.mxid)
            await self.update_info_from_puppet(puppet, user, entity.photo)

            puppet = await p.Puppet.get_by_custom_mxid(user.mxid)
            if puppet:
                try:
                    did_join = await puppet.intent.ensure_joined(self.mxid)
                    if isinstance(user, u.User) and did_join and self.peer_type == "user":
                        await user.update_direct_chats({self.main_intent.mxid: [self.mxid]})
                except Exception:
                    self.log.exception("Failed to ensure %s is joined to portal", user.mxid)

        if self.sync_matrix_state:
            await self.main_intent.get_joined_members(self.mxid)

    async def update_info_from_puppet(
        self,
        puppet: p.Puppet | None = None,
        source: au.AbstractUser | None = None,
        photo: UserProfilePhoto | None = None,
    ) -> None:
        if puppet is None:
            puppet = await self.get_dm_puppet()
        changed = await self._update_avatar_from_puppet(puppet, source, photo)
        changed = await self._update_title(puppet.displayname) or changed
        if changed:
            await self.save()
            await self.update_bridge_info()

    async def create_matrix_room(
        self,
        user: au.AbstractUser,
        entity: TypeChat | User = None,
        invites: InviteList = None,
        update_if_exists: bool = True,
        from_dialog_sync: bool = False,
        client: MautrixTelegramClient | None = None,
    ) -> RoomID | None:
        if self.mxid:
            if update_if_exists:
                if not entity:
                    try:
                        entity = await self.get_entity(user, client)
                    except Exception:
                        self.log.exception(f"Failed to get entity through {user.tgid} for update")
                        return self.mxid
                update = self.update_matrix_room(user, entity)
                background_task.create(update)
                await self.invite_to_matrix(invites or [])
            return self.mxid
        elif user.is_relaybot and self.config["bridge.relaybot.ignore_unbridged_group_chat"]:
            raise Exception("create_matrix_room called as relaybot")
        async with self._room_create_lock:
            try:
                return await self._create_matrix_room(
                    user, entity, invites, client=client, from_dialog_sync=from_dialog_sync
                )
            except Exception:
                self.log.exception("Fatal error creating Matrix room")

    @property
    def bridge_info_state_key(self) -> str:
        return f"net.maunium.telegram://telegram/{self.tgid}"

    @property
    def bridge_info(self) -> dict[str, Any]:
        info = {
            "bridgebot": self.az.bot_mxid,
            "creator": self.main_intent.mxid,
            "protocol": {
                "id": "telegram",
                "displayname": "Telegram",
                "avatar_url": self.config["appservice.bot_avatar"],
                "external_url": "https://telegram.org",
            },
            "channel": {
                "id": str(self.tgid),
                "displayname": self.title,
                "avatar_url": self.avatar_url,
            },
        }
        if self.username:
            info["channel"]["external_url"] = f"https://t.me/{self.username}"
        elif self.peer_type == "user":
            # TODO this doesn't feel very reliable
            puppet = p.Puppet.by_tgid.get(self.tgid, None)
            if puppet and puppet.username:
                info["channel"]["external_url"] = f"https://t.me/{puppet.username}"
        return info

    async def update_bridge_info(self) -> None:
        if not self.mxid:
            self.log.debug("Not updating bridge info: no Matrix room created")
            return
        try:
            self.log.debug("Updating bridge info...")
            await self.main_intent.send_state_event(
                self.mxid, StateBridge, self.bridge_info, self.bridge_info_state_key
            )
            # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
            await self.main_intent.send_state_event(
                self.mxid, StateHalfShotBridge, self.bridge_info, self.bridge_info_state_key
            )
        except Exception:
            self.log.warning("Failed to update bridge info", exc_info=True)

    async def _create_matrix_room(
        self,
        user: au.AbstractUser,
        entity: TypeChat | User,
        invites: InviteList,
        from_dialog_sync: bool,
        client: MautrixTelegramClient | None = None,
    ) -> RoomID | None:
        if self.mxid:
            return self.mxid
        elif not self.allow_bridging:
            return None
        if not client:
            client = user.client

        invites = invites or []

        dialog = None
        if not from_dialog_sync and not user.is_bot:
            self.log.debug("Fetching dialog info for new portal")
            try:
                dialogs: PeerDialogs | None = await user.client(
                    GetPeerDialogsRequest(
                        peers=[InputDialogPeer(await self.get_input_entity(user))]
                    )
                )
            except Exception:
                self.log.warning("Failed to fetch dialog info", exc_info=True)
                dialogs = None
            if dialogs and dialogs.chats and dialogs.chats[0].id == self.tgid:
                entity = dialogs.chats[0]
                self.log.debug("Got entity info from get dialogs request")
            elif dialogs and self.is_direct and dialogs.users:
                for dialog_user in dialogs.users:
                    if dialog_user.id == self.tgid:
                        entity = dialog_user
                        self.log.debug("Got user entity info from get dialogs request")
                        break
            if dialogs and dialogs.dialogs:
                entities = {
                    get_peer_id(x): x
                    for x in itertools.chain(dialogs.users, dialogs.chats)
                    if not isinstance(x, (UserEmpty, ChatEmpty))
                }
                msg = dialogs.messages[0] if len(dialogs.messages) == 1 else None
                dialog = Dialog(user.client, dialogs.dialogs[0], entities, msg)
                self.log.debug("Got dialog info for new portal: %s", dialog)

        if not entity:
            entity = await self.get_entity(user, client)
            self.log.trace("Fetched data: %s", entity)

        participants_count = 2
        if isinstance(entity, Chat):
            participants_count = entity.participants_count
            if entity.deactivated or entity.migrated_to:
                self.log.error(
                    "Throwing error for attempted portal creation "
                    f"({entity.deactivated=}, {entity.migrated_to=})"
                )
                raise RuntimeError("Tried to create portal for deactivated chat")
        elif isinstance(entity, Channel) and not entity.broadcast:
            participants_count = entity.participants_count
        if participants_count is None and self.config["bridge.max_member_count"] > 0:
            self.log.warning(f"Participant count not found in entity, fetching manually")
            participants_count = (await client.get_participants(entity, limit=0)).total
        if participants_count and 0 < self.config["bridge.max_member_count"] < participants_count:
            self.log.warning(f"Not bridging chat, too many participants (%d)", participants_count)
            self._bridging_blocked_at_runtime = True
            return None

        self.log.debug("Preparing to create room")

        if self.is_direct:
            puppet = await self.get_dm_puppet()
            await puppet.update_info(user, entity, client_override=client)
            self._main_intent = puppet.intent_for(self)
            if self.tgid == user.tgid:
                self.title = "Telegram Saved Messages"
                self.about = "Your Telegram cloud storage chat"
        else:
            puppet = None
            self._main_intent = self.az.intent
            await self.update_info(user, entity, client=client)

        preset = RoomCreatePreset.PRIVATE
        if self.peer_type == "channel" and entity.username:
            if self.public_portals:
                preset = RoomCreatePreset.PUBLIC
            self.username = entity.username
            alias = self.alias_localpart
        else:
            # TODO invite link alias?
            alias = None

        if alias:
            # TODO? properly handle existing room aliases
            await self.main_intent.remove_room_alias(alias)

        power_levels = putil.get_base_power_levels(self, entity=entity)
        users = None
        if not self.is_direct:
            users = await self._get_users(client, entity)
            if self.has_bot:
                extra_invites = self.config["bridge.relaybot.group_chat_invite"]
                invites += extra_invites
                for invite in extra_invites:
                    power_levels.users.setdefault(invite, 100)
            await putil.participants_to_power_levels(self, users, power_levels)
        elif self.bot and self.tg_receiver == self.bot.tgid:
            assert puppet is not None
            invites += self.config["bridge.relaybot.private_chat.invite"]
            for invite in invites:
                power_levels.users.setdefault(invite, 100)
            self.title = puppet.displayname

        initial_state = [
            {
                "type": EventType.ROOM_POWER_LEVELS.serialize(),
                "content": power_levels.serialize(),
            },
            {
                "type": str(StateBridge),
                "state_key": self.bridge_info_state_key,
                "content": self.bridge_info,
            },
            # TODO remove this once https://github.com/matrix-org/matrix-doc/pull/2346 is in spec
            {
                "type": str(StateHalfShotBridge),
                "state_key": self.bridge_info_state_key,
                "content": self.bridge_info,
            },
        ]
        autojoin_invites = self.bridge.homeserver_software.is_hungry
        create_invites = set()
        if autojoin_invites:
            create_invites |= set(invites)
            invites = []
            if not self.is_direct:
                create_invites |= await self._sync_telegram_users(user, users, client=client)
        if self.config["bridge.encryption.default"] and self.matrix.e2ee:
            self.encrypted = True
            initial_state.append(
                {
                    "type": str(EventType.ROOM_ENCRYPTION),
                    "content": self.get_encryption_state_event_json(),
                }
            )
            if self.is_direct:
                create_invites.add(self.az.bot_mxid)
        if self.is_direct:
            assert puppet is not None
            self.title = puppet.displayname
            self.avatar_url = puppet.avatar_url
            self.photo_id = puppet.photo_id
        creation_content = {}
        if not self.config["bridge.federate_rooms"]:
            creation_content["m.federate"] = False
        if self.avatar_url and self.set_dm_room_metadata:
            initial_state.append(
                {
                    "type": str(EventType.ROOM_AVATAR),
                    "content": {"url": self.avatar_url},
                }
            )

        with self.backfill_lock:
            self.log.debug(
                f"Creating room with parameters invite={create_invites}, {autojoin_invites=}, "
                f"{preset=}, {alias=!r}, name={self.title!r}, topic={self.about!r}, "
                f"{creation_content=}, is_direct={self.is_direct}, {self.set_dm_room_metadata=}"
            )
            room_id = await self.main_intent.create_room(
                alias_localpart=alias,
                preset=preset,
                is_direct=self.is_direct,
                invitees=list(create_invites),
                name=self.title if self.set_dm_room_metadata else None,
                topic=self.about,
                initial_state=initial_state,
                creation_content=creation_content,
                beeper_auto_join_invites=autojoin_invites,
            )
            if not room_id:
                raise Exception(f"Failed to create room")
            self.name_set = bool(self.title) and self.set_dm_room_metadata
            self.avatar_set = bool(self.avatar_url) and self.set_dm_room_metadata

            if not autojoin_invites and self.encrypted and self.matrix.e2ee and self.is_direct:
                try:
                    await self.az.intent.ensure_joined(room_id)
                except Exception:
                    self.log.warning(f"Failed to add bridge bot to new private chat {room_id}")

            self.mxid = room_id
            self.by_mxid[self.mxid] = self
            await self.save()
            self.log.debug(f"Matrix room created: {self.mxid}")
            await self.az.state_store.set_power_levels(self.mxid, power_levels)
            await user.register_portal(self)
            if dialog and isinstance(user, u.User):
                await user.post_sync_dialog(
                    self, puppet=None, was_created=True, **user.dialog_to_sync_args(dialog)
                )

            if not autojoin_invites or not self.is_direct:
                await self.invite_to_matrix(invites)
                await self.update_matrix_room(
                    user, entity, puppet, levels=power_levels, users=users, client=client
                )
            else:
                # When using autojoining, all metadata is already set, so just update state caches
                await self.main_intent.get_joined_members(self.mxid)

            self.first_event_id = await self.main_intent.send_message_event(
                self.mxid, DummyPortalCreated, {}
            )
            await self.save()

            if self.backfill_enable:
                try:
                    await self.forward_backfill(user, initial=True, client=client)
                except Exception:
                    self.log.exception("Error in initial backfill")
                if self._enable_batch_sending:
                    await self.enqueue_backfill(user, priority=50)

        return self.mxid

    async def _get_users(
        self,
        client: MautrixTelegramClient,
        entity: TypeInputPeer | InputUser | TypeChat | TypeUser | InputChannel,
    ) -> list[TypeUser]:
        if self.peer_type == "channel" and not self.megagroup and not self.sync_channel_members:
            return []
        limit = self.max_initial_member_sync
        if limit == 0:
            return []
        return await putil.get_users(client, self.tgid, entity, limit, self.peer_type)

    async def update_power_levels(
        self,
        users: list[TypeUser | TypeChatParticipant | TypeChannelParticipant],
        levels: PowerLevelStateEventContent = None,
    ) -> None:
        if not levels:
            levels = await self.main_intent.get_power_levels(self.mxid)
        if await putil.participants_to_power_levels(self, users, levels):
            await self.main_intent.set_power_levels(self.mxid, levels)

    async def update_default_banned_rights(self, dbr: ChatBannedRights) -> None:
        self.log.debug("Default rights in chat changed: %s", dbr)
        levels = await self.main_intent.get_power_levels(self.mxid)
        levels = putil.get_base_power_levels(self, levels, dbr=dbr)
        await self.main_intent.set_power_levels(self.mxid, levels)

    async def _add_bot_chat(self, bot: User) -> None:
        if self.bot and bot.id == self.bot.tgid:
            await self.bot.add_chat(self.tgid, self.peer_type)
            return

        user = await u.User.get_by_tgid(TelegramID(bot.id))
        if user and user.is_bot:
            await user.register_portal(self)

    async def _sync_telegram_users(
        self,
        source: au.AbstractUser,
        users: list[User],
        client: MautrixTelegramClient | None = None,
    ) -> set[UserID] | None:
        allowed_tgids = set()
        join_mxids = set()
        skip_deleted = self.config["bridge.skip_deleted_members"]
        for entity in users:
            puppet = await p.Puppet.get_by_tgid(TelegramID(entity.id))
            if entity.bot:
                await self._add_bot_chat(entity)
            allowed_tgids.add(entity.id)

            await puppet.update_info(source, entity, client_override=client)
            if skip_deleted and entity.deleted:
                continue

            if self.mxid:
                await puppet.intent_for(self).ensure_joined(self.mxid)
            else:
                join_mxids.add(puppet.intent_for(self).mxid)

            user = await u.User.get_by_tgid(TelegramID(entity.id))
            if user:
                if self.mxid:
                    await self.invite_to_matrix(user.mxid)
                else:
                    join_mxids.add(user.mxid)

        if not self.mxid:
            return join_mxids

        # We can't trust the member list if any of the following cases is true:
        #  * There are close to 10 000 users, because Telegram might not be sending all members.
        #  * The member sync count is limited, because then we might ignore some members.
        #  * It's a channel, because non-admins don't have access to the member list
        #    and even admins can only see 200 members.
        #  * The source user is not in the chat, because that likely means it's a group
        #    with the member list hidden (so only admins are visible).
        trust_member_list = (
            (
                len(allowed_tgids) < 9900
                if self.max_initial_member_sync < 0
                else len(allowed_tgids) < self.max_initial_member_sync - 10
            )
            and (self.megagroup or self.peer_type != "channel")
            and source.tgid in allowed_tgids
        )
        if not trust_member_list:
            return None

        for user_mxid in await self.main_intent.get_room_members(self.mxid):
            if user_mxid == self.az.bot_mxid:
                continue

            puppet = await p.Puppet.get_by_mxid(user_mxid)
            if puppet:
                # TODO figure out when/how to clean up channels from the member list
                if puppet.id in allowed_tgids or puppet.is_channel:
                    continue
                if self.bot and puppet.id == self.bot.tgid:
                    await self.bot.remove_chat(self.tgid)
                try:
                    await self.main_intent.kick_user(
                        self.mxid, user_mxid, "User had left this Telegram chat."
                    )
                except MForbidden:
                    pass
                continue

            mx_user = await u.User.get_by_mxid(user_mxid, create=False)
            if mx_user:
                if mx_user.tgid in allowed_tgids:
                    continue
                if mx_user.is_bot:
                    await mx_user.unregister_portal(*self.tgid_full)
                if not self.has_bot:
                    try:
                        await self.main_intent.kick_user(
                            self.mxid, mx_user.mxid, "You had left this Telegram chat."
                        )
                    except MForbidden:
                        pass

        return None

    async def _add_telegram_user(
        self, user_id: TelegramID, source: au.AbstractUser | None = None
    ) -> None:
        puppet = await p.Puppet.get_by_tgid(user_id)
        if source:
            try:
                entity: User = await source.client.get_entity(PeerUser(user_id))
            except ValueError:
                self.log.warning(
                    f"Couldn't get info of {user_id} through {source.tgid} to add them to the room"
                )
                return
            await puppet.update_info(source, entity)
            await puppet.intent_for(self).ensure_joined(self.mxid)

        user = await u.User.get_by_tgid(user_id)
        if user:
            await user.register_portal(self)
            await self.invite_to_matrix(user.mxid)

    async def delete_telegram_user(self, user_id: TelegramID, sender: p.Puppet | None) -> None:
        puppet = await p.Puppet.get_by_tgid(user_id)
        if sender is None:
            sender = puppet
        user = await u.User.get_by_tgid(user_id)
        kick_message = (
            f"Kicked by {sender.displayname}"
            if sender and sender.tgid != puppet.tgid
            else "Left Telegram chat"
        )
        puppet_extra_content = None
        if sender.is_real_user:
            puppet_extra_content = {DOUBLE_PUPPET_SOURCE_KEY: self.bridge.name}
        if sender.tgid != puppet.tgid:
            try:
                await sender.intent_for(self).kick_user(
                    self.mxid, puppet.mxid, extra_content=puppet_extra_content
                )
            except MForbidden:
                try:
                    await self.main_intent.kick_user(self.mxid, puppet.mxid, kick_message)
                except MForbidden as e:
                    self.log.warning(f"Failed to kick {puppet.mxid}: {e}")
        elif await self.az.state_store.is_joined(self.mxid, puppet.intent_for(self).mxid):
            await puppet.intent_for(self).leave_room(self.mxid, extra_content=puppet_extra_content)
        if user:
            await user.unregister_portal(*self.tgid_full)
            if sender.tgid != puppet.tgid:
                try:
                    await sender.intent_for(self).kick_user(
                        self.mxid, user.mxid, extra_content=puppet_extra_content
                    )
                    return
                except MForbidden:
                    pass
            try:
                await self.main_intent.kick_user(self.mxid, user.mxid, kick_message)
            except MForbidden as e:
                self.log.warning(f"Failed to kick {user.mxid}: {e}")

    async def update_info(
        self,
        user: au.AbstractUser,
        entity: TypeChat = None,
        client: MautrixTelegramClient | None = None,
    ) -> None:
        if self.peer_type == "user":
            self.log.warning("Called update_info() for direct chat portal")
            return

        changed = False
        self.log.debug("Updating info")
        try:
            if not entity:
                entity = await self.get_entity(user, client)
                self.log.trace("Fetched data: %s", entity)

            if self.peer_type == "channel":
                changed = self.megagroup != entity.megagroup or changed
                self.megagroup = entity.megagroup
                changed = await self._update_username(entity.username) or changed

            if hasattr(entity, "about"):
                changed = self._update_about(entity.about) or changed

            changed = await self._update_title(entity.title) or changed

            if isinstance(entity.photo, ChatPhoto):
                changed = await self._update_avatar(user, entity.photo, client=client) or changed
        except Exception:
            self.log.exception(f"Failed to update info from source {user.tgid}")

        if changed:
            await self.save()
            await self.update_bridge_info()

    async def _update_username(self, username: str, save: bool = False) -> bool:
        if self.username == username:
            return False

        if self.username:
            await self.main_intent.remove_room_alias(self.alias_localpart)
        self.username = username or None
        if self.mxid:
            if self.username:
                await self.main_intent.add_room_alias(
                    self.mxid, self.alias_localpart, override=True
                )
                if self.public_portals:
                    await self.main_intent.set_join_rule(self.mxid, JoinRule.PUBLIC)
            else:
                await self.main_intent.set_join_rule(self.mxid, JoinRule.INVITE)

        if save:
            await self.save()
        return True

    async def _try_set_state(
        self, sender: p.Puppet | None, evt_type: EventType, content: StateEventContent
    ) -> None:
        if sender:
            try:
                intent = sender.intent_for(self)
                if sender.is_real_user:
                    content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
                await intent.send_state_event(self.mxid, evt_type, content)
            except MForbidden:
                await self.main_intent.send_state_event(self.mxid, evt_type, content)
        else:
            await self.main_intent.send_state_event(self.mxid, evt_type, content)

    async def _update_about(
        self, about: str, sender: p.Puppet | None = None, save: bool = False
    ) -> bool:
        if self.about == about:
            return False

        self.about = about
        if self.mxid:
            await self._try_set_state(
                sender, EventType.ROOM_TOPIC, RoomTopicStateEventContent(topic=self.about)
            )
        if save:
            await self.save()
        return True

    async def _update_title(
        self, title: str, sender: p.Puppet | None = None, save: bool = False
    ) -> bool:
        if self.title == title and (self.name_set or not self.set_dm_room_metadata):
            return False

        self.title = title
        self.name_set = False
        if self.mxid and self.set_dm_room_metadata:
            try:
                await self._try_set_state(
                    sender, EventType.ROOM_NAME, RoomNameStateEventContent(name=self.title)
                )
                self.name_set = True
            except Exception as e:
                self.log.warning(f"Failed to set room name: {e}")
        if save:
            await self.save()
        return True

    async def _update_avatar_from_puppet(
        self, puppet: p.Puppet, user: au.AbstractUser | None, photo: UserProfilePhoto | None
    ) -> bool:
        if self.photo_id == puppet.photo_id and (self.avatar_set or not self.set_dm_room_metadata):
            return False
        if puppet.avatar_url:
            self.photo_id = puppet.photo_id
            self.avatar_url = puppet.avatar_url
            self.avatar_set = False
            if self.mxid and self.set_dm_room_metadata:
                try:
                    await self._try_set_state(
                        None,
                        EventType.ROOM_AVATAR,
                        RoomAvatarStateEventContent(url=self.avatar_url),
                    )
                    self.avatar_set = True
                except Exception as e:
                    self.log.warning(f"Failed to set room avatar: {e}")
            return True
        elif photo is not None and user is not None and self.set_dm_room_metadata:
            return await self._update_avatar(user, photo=photo)
        else:
            return False

    async def _update_avatar(
        self,
        user: au.AbstractUser,
        photo: TypeChatPhoto | TypeUserProfilePhoto,
        sender: p.Puppet | None = None,
        save: bool = False,
        client: MautrixTelegramClient | None = None,
    ) -> bool:
        if isinstance(photo, (ChatPhoto, UserProfilePhoto)):
            loc = InputPeerPhotoFileLocation(
                peer=await self.get_input_entity(user), photo_id=photo.photo_id, big=True
            )
            photo_id = str(photo.photo_id)
        elif isinstance(photo, Photo):
            loc, _ = self._msg_conv.get_largest_photo_size(photo)
            photo_id = str(loc.id)
        elif isinstance(photo, (UserProfilePhotoEmpty, ChatPhotoEmpty, PhotoEmpty, type(None))):
            photo_id = ""
            loc = None
        else:
            raise ValueError(f"Unknown photo type {type(photo)}")
        if (
            self.peer_type == "user"
            and not photo_id
            and not self.config["bridge.allow_avatar_remove"]
        ):
            return False
        if self.photo_id != photo_id or not self.avatar_set:
            if not photo_id:
                self.photo_id = ""
                self.avatar_url = None
            elif self.photo_id != photo_id or not self.avatar_url:
                file = await util.transfer_file_to_matrix(
                    client or user.client,
                    self.main_intent,
                    loc,
                    async_upload=self.config["homeserver.async_media"],
                )
                if not file:
                    return False
                self.photo_id = photo_id
                self.avatar_url = file.mxc
            if self.mxid:
                try:
                    await self._try_set_state(
                        sender,
                        EventType.ROOM_AVATAR,
                        RoomAvatarStateEventContent(url=self.avatar_url),
                    )
                    self.avatar_set = True
                except Exception as e:
                    self.log.warning(f"Failed to set room avatar: {e}")
                    self.avatar_set = False
            if save:
                await self.save()
            return True
        return False

    # endregion
    # region Matrix -> Telegram bridging

    async def _send_delivery_receipt(
        self, event_id: EventID, room_id: RoomID | None = None
    ) -> None:
        if event_id and self.config["bridge.delivery_receipts"]:
            try:
                await self.az.intent.mark_read(room_id or self.mxid, event_id)
            except Exception:
                self.log.exception("Failed to send delivery receipt for %s", event_id)

    async def _get_state_change_message(
        self, event: str, user: u.User, **kwargs: Any
    ) -> str | None:
        tpl = self.get_config(f"state_event_formats.{event}")
        if len(tpl) == 0:
            # Empty format means they don't want the message
            return None
        displayname = await self.get_displayname(user)

        tpl_args = {
            "mxid": user.mxid,
            "username": user.mxid_localpart,
            "displayname": escape_html(displayname),
            "distinguisher": self._get_distinguisher(user.mxid),
            **kwargs,
        }
        return Template(tpl).safe_substitute(tpl_args)

    async def _send_state_change_message(
        self, event: str, user: u.User, event_id: EventID, **kwargs: Any
    ) -> None:
        if not self.has_bot:
            return
        elif (
            self.peer_type == "user"
            and not self.config["bridge.relaybot.private_chat.state_changes"]
        ):
            return
        async with self.send_lock(self.bot.tgid):
            message = await self._get_state_change_message(event, user, **kwargs)
            if not message:
                return
            message, entities = await formatter.matrix_to_telegram(self.bot.client, html=message)
            response = await self.bot.client.send_message(
                self.peer, message, formatting_entities=entities
            )
            space = self.tgid if self.peer_type == "channel" else self.bot.tgid
            self.dedup.check(response, (event_id, space))

    async def name_change_matrix(
        self, user: u.User, displayname: str, prev_displayname: str, event_id: EventID
    ) -> None:
        await self._send_state_change_message(
            "name_change",
            user,
            event_id,
            displayname=displayname,
            prev_displayname=prev_displayname,
        )

    async def get_displayname(self, user: u.User) -> str:
        return await self.main_intent.get_room_displayname(self.mxid, user.mxid) or user.mxid

    def set_typing(
        self, user: u.User, typing: bool = True, action: type = SendMessageTypingAction
    ) -> Awaitable[bool]:
        return user.client(
            SetTypingRequest(self.peer, action() if typing else SendMessageCancelAction())
        )

    async def _get_sponsored_message(
        self, user: u.User
    ) -> tuple[SponsoredMessage | None, Channel | User | None]:
        if user.is_bot:
            return None, None
        elif self._sponsored_msg_ts + 5 * 60 > time.monotonic():
            return self._sponsored_msg, self._sponsored_entity

        self.log.trace(f"Fetching a new sponsored message through {user.mxid}")
        self._sponsored_msg, t_id, self._sponsored_entity = await putil.get_sponsored_message(
            user, await self.get_input_entity(user)
        )
        self._sponsored_msg_ts = time.monotonic()
        if self._sponsored_msg is not None and self._sponsored_entity is None:
            self.log.warning(f"GetSponsoredMessages didn't return entity for {t_id}")
        return self._sponsored_msg, self._sponsored_entity

    async def _send_sponsored_msg(self, user: u.User) -> None:
        msg, entity = await self._get_sponsored_message(user)
        if msg is None:
            self.log.trace("Didn't get a sponsored message")
            return
        if self.sponsored_event_id is not None:
            self.log.debug(
                f"Redacting old sponsored {self.sponsored_event_id}"
                " in preparation for sending new one"
            )
            await self.main_intent.redact(self.mxid, self.sponsored_event_id)
        content = await putil.make_sponsored_message_content(user, msg, entity)
        self.log.trace("Sending sponsored message")
        self.sponsored_event_id = await self._send_message(self.main_intent, content)
        self.sponsored_event_ts = int(time.time())
        self.sponsored_msg_random_id = msg.random_id
        self._new_messages_after_sponsored = False
        self._sponsored_seen = {}
        await self.save()
        self.log.debug(
            f"Sent sponsored message {base64.b64encode(self.sponsored_msg_random_id)} "
            f"to Matrix {self.sponsored_event_id} / {self.sponsored_event_ts}"
        )

    @property
    def _sponsored_is_expired(self) -> bool:
        return (
            self.sponsored_event_id is None
            or self.sponsored_event_ts + 24 * 60 * 60 < int(time.time())
        ) and self._new_messages_after_sponsored

    async def _try_handle_read_for_sponsored_msg(
        self, user: u.User, event_id: EventID, timestamp: int
    ) -> None:
        try:
            await self._handle_read_for_sponsored_msg(user, event_id, timestamp)
        except Exception:
            self.log.warning(
                "Error handling read receipt for sponsored message processing", exc_info=True
            )

    async def _handle_read_for_sponsored_msg(
        self, user: u.User, event_id: EventID, timestamp: int
    ) -> None:
        if user.is_bot or not self.username:
            return
        if self._sponsored_is_expired:
            self.log.trace("Sponsored message is expired, sending new one")
            async with self._sponsored_msg_lock:
                if self._sponsored_is_expired:
                    await self._send_sponsored_msg(user)
                    return

        if (
            self.sponsored_event_id == event_id or self.sponsored_event_ts <= timestamp
        ) and not self._sponsored_seen.get(user.mxid, False):
            self._sponsored_seen[user.mxid] = True
            self.log.debug(
                f"Marking sponsored message {self.sponsored_event_id} as seen by {user.mxid}"
            )
            await user.client(
                ViewSponsoredMessageRequest(
                    channel=await self.get_input_entity(user),
                    random_id=self.sponsored_msg_random_id,
                )
            )

    async def mark_read(self, user: u.User, event_id: EventID, timestamp: int) -> None:
        if user.is_bot:
            return
        space = self.tgid if self.peer_type == "channel" else user.tgid
        message = await DBMessage.get_by_mxid(event_id, self.mxid, space)
        if not message:
            message = await DBMessage.find_last(self.mxid, space)
            if not message:
                self.log.debug(
                    f"Dropping Matrix read receipt from {user.mxid}: "
                    f"target message {event_id} not known and last message in chat not found"
                )
                return
            else:
                self.log.debug(
                    f"Matrix read receipt target {event_id} not known, marking "
                    f"messages up to most recent ({message.mxid}/{message.tgid}) "
                    f"as read by {user.mxid}/{user.tgid}"
                )
        else:
            self.log.debug(
                "Handling Matrix read receipt: marking messages up to "
                f"{message.mxid}/{message.tgid} as read by {user.mxid}/{user.tgid}"
            )
        await user.client.send_read_acknowledge(
            self.peer, max_id=message.tgid, clear_mentions=True, clear_reactions=True
        )
        if self.peer_type == "channel":
            if not self.megagroup:
                background_task.create(
                    self._try_handle_read_for_sponsored_msg(user, event_id, timestamp)
                )
            else:
                background_task.create(self._poll_telegram_reactions(user))

    async def _preproc_kick_ban(
        self, user: u.User | p.Puppet, source: u.User
    ) -> au.AbstractUser | None:
        if user.tgid == source.tgid:
            return None
        if self.peer_type == "user" and user.tgid == self.tgid:
            await self.delete()
            return None
        if isinstance(user, u.User) and await user.needs_relaybot(self):
            if not self.bot:
                return None
            # TODO kick message
            return None
        if await source.needs_relaybot(self):
            if not self.has_bot:
                return None
            return self.bot
        return source

    async def kick_matrix(self, user: u.User | p.Puppet, source: u.User) -> None:
        source = await self._preproc_kick_ban(user, source)
        if source is not None:
            await source.client.kick_participant(self.peer, user.peer)

    async def ban_matrix(self, user: u.User | p.Puppet, source: u.User):
        source = await self._preproc_kick_ban(user, source)
        if source is not None:
            await source.client.edit_permissions(self.peer, user.peer, view_messages=False)

    async def leave_matrix(self, user: u.User, event_id: EventID) -> None:
        if await user.needs_relaybot(self):
            await self._send_state_change_message("leave", user, event_id)
            return

        if self.peer_type == "user":
            await self.main_intent.leave_room(self.mxid)
            await self.delete()
            try:
                del self.by_tgid[self.tgid_full]
                del self.by_mxid[self.mxid]
            except KeyError:
                pass
        elif self.config["bridge.bridge_matrix_leave"]:
            await user.client.delete_dialog(self.peer)

    async def join_matrix(self, user: u.User, event_id: EventID) -> None:
        if await user.needs_relaybot(self):
            await self._send_state_change_message("join", user, event_id)
            return

        if self.peer_type == "channel" and not user.is_bot:
            await user.client(JoinChannelRequest(channel=await self.get_input_entity(user)))
        else:
            # We'll just assume the user is already in the chat.
            pass

    @staticmethod
    def hash_user_id(val: UserID) -> int:
        """
        A simple Matrix user ID hashing algorithm that matches what Element does.

        Args:
            val: the Matrix user ID.

        Returns:
            A 32-bit hash of the user ID.
        """
        out = 0
        for char in val:
            out = (out << 5) - out + ord(char)
            # Emulate JS's 32-bit signed bitwise OR `hash |= 0`
            out = (out & 2**31 - 1) - (out & 2**31)
        return abs(out)

    def _get_distinguisher(self, user_id: UserID) -> str:
        ruds = self.get_config("relay_user_distinguishers") or []
        return ruds[self.hash_user_id(user_id) % len(ruds)] if ruds else ""

    async def _apply_msg_format(self, sender: u.User, content: MessageEventContent) -> None:
        if isinstance(content, TextMessageEventContent):
            content.ensure_has_html()
        else:
            content.format = Format.HTML
            content.formatted_body = escape_html(content.body).replace("\n", "<br/>")

        tpl = (
            self.get_config(f"message_formats.[{content.msgtype.value}]")
            or "<b>$sender_displayname</b>: $message"
        )
        displayname = await self.get_displayname(sender)
        tpl_args = dict(
            sender_mxid=sender.mxid,
            sender_username=sender.mxid_localpart,
            sender_displayname=escape_html(displayname),
            message=content.formatted_body,
            body=content.body,
            formatted_body=content.formatted_body,
            distinguisher=self._get_distinguisher(sender.mxid),
        )
        content.formatted_body = Template(tpl).safe_substitute(tpl_args)

    async def _apply_emote_format(self, sender: u.User, content: TextMessageEventContent) -> None:
        content.ensure_has_html()

        tpl = self.get_config("emote_format")
        puppet = await p.Puppet.get_by_tgid(sender.tgid)
        content.formatted_body = Template(tpl).safe_substitute(
            dict(
                sender_mxid=sender.mxid,
                sender_username=sender.mxid_localpart,
                sender_displayname=escape_html(await self.get_displayname(sender)),
                mention=f"<a href='https://matrix.to/#/{puppet.mxid}'>{puppet.displayname}</a>",
                username=sender.tg_username,
                displayname=puppet.displayname,
                body=content.body,
                formatted_body=content.formatted_body,
            )
        )
        content.msgtype = MessageType.TEXT

    async def _pre_process_matrix_message(
        self, sender: u.User, use_relaybot: bool, content: MessageEventContent
    ) -> None:
        if use_relaybot:
            await self._apply_msg_format(sender, content)
        elif content.msgtype == MessageType.EMOTE:
            await self._apply_emote_format(sender, content)

    async def _handle_matrix_text(
        self,
        sender: u.User,
        logged_in: bool,
        event_id: EventID,
        space: TelegramID,
        client: MautrixTelegramClient,
        content: TextMessageEventContent,
        reply_to: TelegramID | None,
    ) -> None:
        message, entities = await formatter.matrix_to_telegram(
            client, text=content.body, html=content.formatted(Format.HTML)
        )
        sender_id = sender.tgid if logged_in else self.bot.tgid
        async with self.send_lock(sender_id):
            lp = self.get_config("telegram_link_preview")
            if content.get_edit():
                orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
                if orig_msg:
                    resp = await client.edit_message(
                        self.peer,
                        orig_msg.tgid,
                        message,
                        formatting_entities=entities,
                        link_preview=lp,
                    )
                    await self._mark_matrix_handled(
                        sender=sender,
                        sender_tgid=sender_id,
                        event_type=EventType.ROOM_MESSAGE,
                        event_id=event_id,
                        space=space,
                        edit_index=-1,
                        response=resp,
                        msgtype=content.msgtype,
                    )
                    return
            response = await client.send_message(
                self.peer,
                message,
                reply_to=reply_to,
                formatting_entities=entities,
                link_preview=lp,
            )
            await self._mark_matrix_handled(
                sender=sender,
                sender_tgid=sender_id,
                event_type=EventType.ROOM_MESSAGE,
                event_id=event_id,
                space=space,
                edit_index=0,
                response=response,
                msgtype=content.msgtype,
            )

    async def _handle_matrix_file(
        self,
        sender: u.User,
        logged_in: bool,
        event_id: EventID,
        space: TelegramID,
        client: MautrixTelegramClient,
        content: MediaMessageEventContent,
        reply_to: TelegramID,
        file_name: str,
        caption: TextMessageEventContent = None,
    ) -> None:
        sender_id = sender.tgid if logged_in else self.bot.tgid
        mime = content.info.mimetype
        if isinstance(content.info, (ImageInfo, VideoInfo)):
            w, h = content.info.width, content.info.height
        else:
            w = h = None
        max_image_size = self.config["bridge.image_as_file_size"] * 1000**2
        max_image_pixels = self.config["bridge.image_as_file_pixels"]

        attributes = []
        if self.config["bridge.parallel_file_transfer"] and content.url:
            file_handle, file_size = await util.parallel_transfer_to_telegram(
                client, self.main_intent, content.url, sender_id
            )
        else:
            if content.file:
                if not decrypt_attachment:
                    raise BridgingError(
                        f"Can't bridge encrypted media event {event_id}: "
                        "encryption dependencies not installed"
                    )
                file = await self.main_intent.download_media(content.file.url)
                file = decrypt_attachment(
                    file, content.file.key.key, content.file.hashes.get("sha256"), content.file.iv
                )
            else:
                file = await self.main_intent.download_media(content.url)

            if content.msgtype == MessageType.STICKER:
                if mime == "image/gif":
                    # Remove sticker description
                    file_name = "sticker.gif"
                else:
                    if mime not in ("video/webm", "application/x-tgsticker"):
                        mime, file, w, h = util.convert_image(
                            file, source_mime=mime, target_type="webp"
                        )
                    attributes.append(
                        DocumentAttributeSticker(
                            alt=content.body, stickerset=InputStickerSetEmpty()
                        )
                    )

            file_handle = await client.upload_file(file)
            file_size = len(file)

        file_handle.name = file_name
        force_document = file_size >= max_image_size
        attributes.append(DocumentAttributeFilename(file_name=file_name))

        if content.msgtype == MessageType.VIDEO:
            attributes.append(
                DocumentAttributeVideo(
                    duration=int(content.info.duration // 1000 if content.info.duration else 0),
                    w=w or 0,
                    h=h or 0,
                )
            )
        elif content.msgtype == MessageType.AUDIO:
            waveform = content.get("org.matrix.msc1767.audio", {}).get("waveform", [])
            if waveform:
                waveform_max = max(waveform)
                waveform = [round(part / max(waveform_max / 32, 1)) for part in waveform]
            attributes.append(
                DocumentAttributeAudio(
                    duration=int(content.info.duration // 1000 if content.info.duration else 0),
                    voice="org.matrix.msc3245.voice" in content,
                    waveform=encode_waveform(waveform) if waveform else None,
                )
            )
        elif w and h:
            attributes.append(DocumentAttributeImageSize(w, h))
            force_document = force_document or w * h >= max_image_pixels

        if "fi.mau.telegram.force_document" in content:
            force_document = bool(content["fi.mau.telegram.force_document"])

        if (mime == "image/png" or mime == "image/jpeg") and not force_document:
            media = InputMediaUploadedPhoto(file_handle)
        else:
            media = InputMediaUploadedDocument(
                file=file_handle,
                attributes=attributes,
                mime_type=mime or "application/octet-stream",
            )

        capt, entities = (
            await formatter.matrix_to_telegram(
                client, text=caption.body, html=caption.formatted(Format.HTML)
            )
            if caption
            else (None, None)
        )

        async with self.send_lock(sender_id):
            if await self._matrix_document_edit(
                sender, sender_id, client, content, space, capt, entities, media, event_id
            ):
                return
            try:
                try:
                    response = await client.send_media(
                        self.peer, media, reply_to=reply_to, caption=capt, entities=entities
                    )
                except (
                    PhotoInvalidDimensionsError,
                    PhotoSaveFileInvalidError,
                    PhotoExtInvalidError,
                ):
                    media = InputMediaUploadedDocument(
                        file=media.file, mime_type=mime, attributes=attributes
                    )
                    response = await client.send_media(
                        self.peer, media, reply_to=reply_to, caption=capt, entities=entities
                    )
            except Exception:
                raise
            else:
                await self._mark_matrix_handled(
                    sender=sender,
                    sender_tgid=sender_id,
                    event_type=EventType.ROOM_MESSAGE,
                    event_id=event_id,
                    space=space,
                    edit_index=0,
                    response=response,
                    msgtype=content.msgtype,
                )

    async def _matrix_document_edit(
        self,
        sender: u.User,
        sender_tgid: TelegramID,
        client: MautrixTelegramClient,
        content: MessageEventContent,
        space: TelegramID,
        caption: str,
        caption_entities,
        media: Any,
        event_id: EventID,
    ) -> bool:
        if content.get_edit():
            orig_msg = await DBMessage.get_by_mxid(content.get_edit(), self.mxid, space)
            if orig_msg:
                response = await client.edit_message(
                    self.peer,
                    orig_msg.tgid,
                    caption,
                    formatting_entities=caption_entities,
                    file=media,
                )
                await self._mark_matrix_handled(
                    sender=sender,
                    sender_tgid=sender_tgid,
                    event_type=EventType.ROOM_MESSAGE,
                    event_id=event_id,
                    space=space,
                    edit_index=-1,
                    response=response,
                    msgtype=content.msgtype,
                )
                return True
        return False

    async def _handle_matrix_location(
        self,
        sender: u.User,
        logged_in: bool,
        event_id: EventID,
        space: TelegramID,
        client: MautrixTelegramClient,
        content: LocationMessageEventContent,
        reply_to: TelegramID,
    ) -> None:
        sender_id = sender.tgid if logged_in else self.bot.tgid
        try:
            lat, long = content.geo_uri[len("geo:") :].split(";")[0].split(",")
            lat, long = float(lat), float(long)
        except (KeyError, ValueError):
            self.log.exception("Failed to parse location")
            return None
        try:
            caption = content["org.matrix.msc3488.location"]["description"]
            entities = []
        except KeyError:
            caption, entities = await formatter.matrix_to_telegram(client, text=content.body)
        media = MessageMediaGeo(geo=GeoPoint(lat=lat, long=long, access_hash=0))

        async with self.send_lock(sender_id):
            if await self._matrix_document_edit(
                sender, sender_id, client, content, space, caption, entities, media, event_id
            ):
                return
            try:
                response = await client.send_media(
                    self.peer, media, reply_to=reply_to, caption=caption, entities=entities
                )
            except Exception:
                raise
            else:
                await self._mark_matrix_handled(
                    sender=sender,
                    sender_tgid=sender_id,
                    event_type=EventType.ROOM_MESSAGE,
                    event_id=event_id,
                    space=space,
                    edit_index=0,
                    response=response,
                    msgtype=content.msgtype,
                )

    async def _mark_matrix_handled(
        self,
        sender: u.User,
        sender_tgid: TelegramID,
        event_type: EventType,
        event_id: EventID,
        space: TelegramID,
        edit_index: int,
        response: TypeMessage,
        msgtype: MessageType | None = None,
    ) -> None:
        self.log.trace("Raw event handling response for %s: %s", event_id, response)
        event_hash, _ = self.dedup.check(response, (event_id, space), force_hash=edit_index != 0)
        if edit_index < 0:
            prev_edit = await DBMessage.get_one_by_tgid(TelegramID(response.id), space, -1)
            edit_index = prev_edit.edit_index + 1
        await DBMessage(
            tgid=TelegramID(response.id),
            tg_space=space,
            mx_room=self.mxid,
            mxid=event_id,
            edit_index=edit_index,
            content_hash=event_hash,
            sender_mxid=sender.mxid,
            sender=sender_tgid,
        ).insert()
        sender.send_remote_checkpoint(
            MessageSendCheckpointStatus.SUCCESS,
            event_id,
            self.mxid,
            event_type,
            message_type=msgtype,
        )
        await self._send_delivery_receipt(event_id)
        background_task.create(self._send_message_status(event_id, err=None))
        if response.ttl_period:
            await self._mark_disappearing(
                event_id=event_id,
                seconds=response.ttl_period,
                expires_at=int(response.date.timestamp()) + response.ttl_period,
            )
        self.log.debug(
            f"Handled Matrix message {event_id} -> {response.id} (edit index {edit_index})"
        )

    @staticmethod
    def _error_to_human_message(err: Exception) -> str | None:
        if isinstance(err, YouBlockedUserError):
            return "You blocked this user"
        elif isinstance(err, UserIsBlockedError):
            return "You were blocked by this user"
        elif isinstance(err, UserBannedInChannelError):
            return "You're banned from sending messages in supergroups/channels"
        elif isinstance(err, InputUserDeactivatedError):
            return "This user was deleted"
        elif isinstance(err, ChatAdminRequiredError):
            return "Only admins can do that"
        elif isinstance(err, (ChatRestrictedError, ChatWriteForbiddenError)):
            return "You can't send messages in this chat"
        elif isinstance(err, SlowModeWaitError):
            return f"Slow mode enabled, wait {format_duration(err.seconds)} before sending"
        elif isinstance(err, MessageEmptyError):
            return "Message is empty"
        elif isinstance(err, MessageTooLongError):
            return "Message is too long"
        elif isinstance(err, EntitiesTooLongError):
            return "Message has too many formatting entities"
        elif isinstance(err, EntityBoundsInvalidError):
            return "Message formatting entities are malformed"
        elif isinstance(err, EntityMentionUserInvalidError):
            return "You mentioned an invalid user"
        return None

    async def _send_message_status(self, event_id: EventID, err: Exception | None) -> None:
        if not self.config["bridge.message_status_events"]:
            return
        intent = self.az.intent if self.encrypted else self.main_intent
        status = BeeperMessageStatusEventContent(
            network=self.bridge_info_state_key,
            relates_to=RelatesTo(
                rel_type=RelationType.REFERENCE,
                event_id=event_id,
            ),
        )
        if isinstance(err, IgnoredMessageError):
            status.reason = MessageStatusReason.UNSUPPORTED
            status.error = str(err)
            status.status = MessageStatus.FAIL
        elif err:
            status.reason = MessageStatusReason.GENERIC_ERROR
            status.error = f"{type(err).__name__}: {err}"
            status.status = MessageStatus.RETRIABLE
            status.message = self._error_to_human_message(err)
        else:
            status.status = MessageStatus.SUCCESS

        await intent.send_message_event(
            room_id=self.mxid,
            event_type=EventType.BEEPER_MESSAGE_STATUS,
            content=status,
        )

    async def _send_bridge_error(
        self,
        sender: u.User,
        err: Exception,
        event_id: EventID,
        event_type: EventType,
        message_type: MessageType | None = None,
        msg: str | None = None,
    ) -> None:
        sender.send_remote_checkpoint(
            MessageSendCheckpointStatus.PERM_FAILURE,
            event_id,
            self.mxid,
            event_type,
            message_type=message_type,
            error=err,
        )

        if msg and self.config["bridge.delivery_error_reports"]:
            if not isinstance(err, MessageNotModifiedError):
                await self._send_message(
                    self.main_intent, TextMessageEventContent(msgtype=MessageType.NOTICE, body=msg)
                )
        await self._send_message_status(event_id, err)

    async def handle_matrix_message(
        self, sender: u.User, content: MessageEventContent, event_id: EventID
    ) -> None:
        try:
            await self._handle_matrix_message(sender, content, event_id)
        except RPCError as e:
            self.log.exception(f"RPCError while bridging {event_id}: {e}")
            await self._send_bridge_error(
                sender,
                e,
                event_id,
                EventType.ROOM_MESSAGE,
                message_type=content.msgtype,
                msg=f"\u26a0 Your message may not have been bridged: {e}",
            )
        except Exception as e:
            if isinstance(e, IgnoredMessageError):
                self.log.debug(f"Ignored {event_id}: {e}")
            else:
                self.log.exception(f"Failed to bridge {event_id}")
            await self._send_bridge_error(
                sender,
                e,
                event_id,
                EventType.ROOM_MESSAGE,
                message_type=content.msgtype,
            )

    async def _find_source_msg(
        self, sender: u.User, content: MessageEventContent
    ) -> DBMessage | None:
        try:
            source = content["fi.mau.telegram.source"]
        except KeyError:
            return None
        if not isinstance(source, dict):
            return None
        try:
            msg_id = source["id"]
            space = source["space"]
            chat_id = source["chat_id"]
            peer_type = source["peer_type"]
        except KeyError:
            return None
        if (
            not isinstance(msg_id, int)
            or not isinstance(chat_id, int)
            or not isinstance(space, int)
            or not isinstance(peer_type, str)
        ):
            return None
        elif await sender.needs_relaybot(self):
            return None
        if peer_type == "user" and space != sender.tgid:
            return
        dbm = await DBMessage.get_one_by_tgid(TelegramID(msg_id), TelegramID(space))
        if dbm and peer_type == "chat" and space != sender.tgid:
            dbm = DBMessage.get_by_mxid(dbm.mxid, dbm.mx_room, sender.tgid)
        return dbm

    async def _handle_matrix_forward(
        self,
        sender: u.User,
        msg: DBMessage,
        event_id: EventID,
        space: TelegramID,
        msgtype: MessageType,
    ) -> bool:
        source_portal = await Portal.get_by_mxid(msg.mx_room)
        if not source_portal:
            return False
        async with self.send_lock(sender.tgid):
            try:
                response = await sender.client.forward_messages(
                    self.peer,
                    messages=[msg.tgid],
                    from_peer=source_portal.peer,
                )
            except Exception as e:
                self.log.warning(
                    f"Failed to send {event_id} from {sender.mxid} as forward of {msg.tgid} "
                    f"from {source_portal.tgid}: {e}, falling back to normal message handling"
                )
                return False
            else:
                await self._mark_matrix_handled(
                    sender=sender,
                    sender_tgid=sender.tgid,
                    event_type=EventType.ROOM_MESSAGE,
                    event_id=event_id,
                    space=space,
                    edit_index=0,
                    response=response[0],
                    msgtype=msgtype,
                )
                return True

    async def _handle_matrix_message(
        self, sender: u.User, content: MessageEventContent, event_id: EventID
    ) -> None:
        if not content.msgtype:
            raise IgnoredMessageError("Message doesn't have a msgtype")
        elif not content.body:
            raise IgnoredMessageError("Message doesn't have a body")

        logged_in = not await sender.needs_relaybot(self)
        client = sender.client if logged_in else self.bot.client
        space = (
            self.tgid
            if self.peer_type == "channel"  # Channels have their own ID space
            else (sender.tgid if logged_in else self.bot.tgid)
        )
        source_msg = await self._find_source_msg(sender, content)
        if source_msg and await self._handle_matrix_forward(
            sender, source_msg, event_id, space, content.msgtype
        ):
            return
        reply_to = await formatter.matrix_reply_to_telegram(content, space, room_id=self.mxid)

        media = (
            MessageType.STICKER,
            MessageType.IMAGE,
            MessageType.FILE,
            MessageType.AUDIO,
            MessageType.VIDEO,
        )

        if content.msgtype == MessageType.NOTICE:
            bridge_notices = self.get_config("bridge_notices.default")
            excepted = sender.mxid in self.get_config("bridge_notices.exceptions")
            if not bridge_notices and not excepted:
                raise IgnoredMessageError("Notices are not configured to be bridged.")

        if content.msgtype in (MessageType.TEXT, MessageType.EMOTE, MessageType.NOTICE):
            await self._pre_process_matrix_message(sender, not logged_in, content)
            await self._handle_matrix_text(
                sender, logged_in, event_id, space, client, content, reply_to
            )
        elif content.msgtype == MessageType.LOCATION:
            await self._pre_process_matrix_message(sender, not logged_in, content)
            await self._handle_matrix_location(
                sender, logged_in, event_id, space, client, content, reply_to
            )
        elif content.msgtype in media:
            file_name = content.body
            try:
                caption_content: TextMessageEventContent | None = sender.command_status["caption"]
                reply_to = reply_to or await formatter.matrix_reply_to_telegram(
                    caption_content, space, room_id=self.mxid
                )
                sender.command_status = None
            except (KeyError, TypeError):
                if not logged_in or (
                    "filename" in content and content["filename"] != content.body
                ):
                    if "filename" in content:
                        file_name = content["filename"]
                    caption_content = TextMessageEventContent(
                        msgtype=MessageType.TEXT,
                        body=content.body,
                    )
                    if (
                        "formatted_body" in content
                        and str(content.get("format")) == Format.HTML.value
                    ):
                        caption_content["formatted_body"] = content["formatted_body"]
                        caption_content["format"] = Format.HTML
                else:
                    caption_content = None
            if caption_content:
                caption_content.msgtype = content.msgtype
                await self._pre_process_matrix_message(sender, not logged_in, caption_content)
            await self._handle_matrix_file(
                sender,
                logged_in,
                event_id,
                space,
                client,
                content,
                reply_to,
                file_name,
                caption_content,
            )
        else:
            self.log.debug(
                f"Didn't handle Matrix event {event_id} due to unknown msgtype {content.msgtype}"
            )
            self.log.trace("Unhandled Matrix event content: %s", content)
            raise IgnoredMessageError(f"Unhandled msgtype {content.msgtype}")

    async def handle_matrix_unpin_all(self, sender: u.User, pin_event_id: EventID) -> None:
        await sender.client(UnpinAllMessagesRequest(peer=self.peer))
        await self._send_delivery_receipt(pin_event_id)

    async def handle_matrix_pin(
        self, sender: u.User, changes: dict[EventID, bool], pin_event_id: EventID
    ) -> None:
        tg_space = self.tgid if self.peer_type == "channel" else sender.tgid
        ids = {
            msg.mxid: msg.tgid
            for msg in await DBMessage.get_by_mxids(
                list(changes.keys()), mx_room=self.mxid, tg_space=tg_space
            )
        }
        for event_id, pinned in changes.items():
            try:
                await sender.client(
                    UpdatePinnedMessageRequest(peer=self.peer, id=ids[event_id], unpin=not pinned)
                )
            except (ChatNotModifiedError, MessageIdInvalidError, KeyError):
                pass
        await self._send_delivery_receipt(pin_event_id)

    async def handle_matrix_deletion(
        self, deleter: u.User, event_id: EventID, redaction_event_id: EventID
    ) -> None:
        try:
            await self._handle_matrix_deletion(deleter, event_id)
        except IgnoredMessageError as e:
            self.log.debug(str(e))
            await self._send_bridge_error(deleter, e, redaction_event_id, EventType.ROOM_REDACTION)
        except Exception as e:
            self.log.exception(f"Failed to bridge redaction by {deleter.mxid}")
            await self._send_bridge_error(deleter, e, redaction_event_id, EventType.ROOM_REDACTION)
        else:
            deleter.send_remote_checkpoint(
                MessageSendCheckpointStatus.SUCCESS,
                redaction_event_id,
                self.mxid,
                EventType.ROOM_REDACTION,
            )
            await self._send_delivery_receipt(redaction_event_id)
            background_task.create(self._send_message_status(redaction_event_id, err=None))

    async def _handle_matrix_reaction_deletion(
        self, deleter: u.User, event_id: EventID, tg_space: TelegramID
    ) -> None:
        reaction = await DBReaction.get_by_mxid(event_id, self.mxid)
        if not reaction:
            raise IgnoredMessageError(f"Ignoring Matrix redaction of unknown event {event_id}")
        elif reaction.tg_sender != deleter.tgid:
            raise IgnoredMessageError(f"Ignoring Matrix redaction of reaction by another user")
        msg = await DBMessage.get_by_mxid(reaction.msg_mxid, reaction.mx_room, tg_space)
        if not msg or msg.redacted:
            raise IgnoredMessageError(
                f"Ignoring Matrix redaction of reaction to unknown event {reaction.msg_mxid}"
            )
        async with self.reaction_lock(msg.mxid):
            await reaction.delete()
            new_reactions = None
            if await deleter.get_max_reactions() > 1:
                new_reactions = [
                    react.telegram
                    for react in await DBReaction.get_by_sender(
                        msg.mxid, msg.mx_room, deleter.tgid
                    )
                ] or None
            await deleter.client(
                SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=new_reactions)
            )
            self.log.debug(
                f"Handled Matrix deletion of reaction {event_id} to {msg.tgid} "
                f"(new reaction count: {len(new_reactions) if new_reactions else 0})"
            )

    async def _handle_matrix_deletion(self, deleter: u.User, event_id: EventID) -> None:
        real_deleter = deleter if not await deleter.needs_relaybot(self) else self.bot
        tg_space = self.tgid if self.peer_type == "channel" else real_deleter.tgid
        message = await DBMessage.get_by_mxid(event_id, self.mxid, tg_space)
        if not message:
            await self._handle_matrix_reaction_deletion(real_deleter, event_id, tg_space)
        elif message.redacted:
            raise IgnoredMessageError(
                "Ignoring Matrix redaction of already redacted event "
                f"{message.mxid} in {message.mx_room}"
            )
        elif message.edit_index != 0:
            await message.mark_redacted()
            raise IgnoredMessageError(
                f"Ignoring Matrix redaction of edit event {message.mxid} in {message.mx_room}"
            )
        else:
            await message.mark_redacted()
            await real_deleter.client.delete_messages(self.peer, [message.tgid])
            self.log.debug(f"Handled Matrix redaction of {event_id} / {message.tgid}")

    async def handle_matrix_reaction(
        self, user: u.User, target_event_id: EventID, emoji: str, reaction_event_id: EventID
    ) -> None:
        emoji_id = emoji
        reaction = ReactionEmoji(emoticon=variation_selector.remove(emoji))
        if emoji.startswith("mxc://"):
            db_reaction = await DBTelegramFile.find_by_mxc(ContentURI(emoji))
            if not db_reaction or not db_reaction.id.isdecimal():
                self.log.debug(f"Dropping unknown reaction {emoji} by {user.mxid}")
                if not self.has_bot:
                    await self.main_intent.redact(
                        self.mxid, reaction_event_id, reason="Unrecognized custom emoji"
                    )
                await self._send_bridge_error(
                    user,
                    Exception("Unrecognized custom emoji"),
                    reaction_event_id,
                    EventType.REACTION,
                )
                return
            reaction = ReactionCustomEmoji(document_id=int(db_reaction.id))
            emoji_id = db_reaction.id
        elif (
            self.config["bridge.always_custom_emoji_reaction"]
            or reaction.emoticon not in await user.get_available_reactions()
        ):
            try:
                doc_id = util.unicode_custom_emoji_map[reaction.emoticon]
            except KeyError:
                pass
            else:
                self.log.trace(
                    f"Using custom reaction {doc_id} instead of unicode {reaction.emoticon} "
                    f"for {user.mxid}'s reaction"
                )
                reaction = ReactionCustomEmoji(document_id=doc_id)
                emoji_id = str(doc_id)
        try:
            async with self.reaction_lock(target_event_id):
                await self._handle_matrix_reaction(
                    user, target_event_id, emoji_id, reaction, reaction_event_id
                )
        except IgnoredMessageError as e:
            self.log.debug(str(e))
            await self._send_bridge_error(user, e, reaction_event_id, EventType.REACTION)
        except ReactionInvalidError as e:
            # Don't redact reactions in relaybot chats, there are usually other Matrix users too.
            if not self.has_bot:
                await self.main_intent.redact(
                    self.mxid, reaction_event_id, reason="Emoji not allowed"
                )
            self.log.debug(f"Failed to bridge reaction by {user.mxid}: emoji not allowed")
            await self._send_bridge_error(user, e, reaction_event_id, EventType.REACTION)
        except Exception as e:
            self.log.exception(f"Failed to bridge reaction by {user.mxid}")
            await self._send_bridge_error(user, e, reaction_event_id, EventType.REACTION)
        else:
            user.send_remote_checkpoint(
                MessageSendCheckpointStatus.SUCCESS,
                reaction_event_id,
                self.mxid,
                EventType.REACTION,
            )
            await self._send_delivery_receipt(reaction_event_id)
            background_task.create(self._send_message_status(reaction_event_id, err=None))

    async def _handle_matrix_reaction(
        self,
        user: u.User,
        target_event_id: EventID,
        emoji_id: str,
        reaction: TypeReaction,
        reaction_event_id: EventID,
    ) -> None:
        tg_space = self.tgid if self.peer_type == "channel" else user.tgid
        msg = await DBMessage.get_by_mxid(target_event_id, self.mxid, tg_space)
        if not msg:
            raise IgnoredMessageError(
                f"Ignoring Matrix reaction to unknown event {target_event_id}"
            )
        elif msg.redacted:
            raise IgnoredMessageError(
                f"Ignoring Matrix reaction to redacted event {target_event_id}"
            )
        elif msg.edit_index != 0:
            raise IgnoredMessageError(f"Ignoring Matrix reaction to edit event {target_event_id}")

        existing_reacts = await DBReaction.get_by_sender(msg.mxid, msg.mx_room, user.tgid)
        new_tg_reactions: list[TypeReaction] = []
        reactions_to_remove: list[DBReaction] = []
        max_reactions = await user.get_max_reactions()
        max_reactions -= 1  # Leave one reaction of space for the new reaction
        for db_reaction in existing_reacts:
            if db_reaction.reaction == emoji_id:
                raise IgnoredMessageError("Ignoring duplicate Matrix reaction")
            if len(new_tg_reactions) < max_reactions:
                new_tg_reactions.append(db_reaction.telegram)
            else:
                reactions_to_remove.append(db_reaction)
        new_tg_reactions.append(reaction)

        await user.client(
            SendReactionRequest(peer=self.peer, msg_id=msg.tgid, reaction=new_tg_reactions)
        )
        puppet = await user.get_puppet()
        removed = 0
        for db_reaction in reactions_to_remove:
            removed += 1
            await db_reaction.delete()
            await puppet.intent_for(self).redact(db_reaction.mx_room, db_reaction.mxid)
        self.log.debug(
            f"Handled Matrix reaction {reaction_event_id} to {msg.tgid} "
            f"(new reaction count: {len(new_tg_reactions)}, removed {removed} old reactions)"
        )
        await DBReaction(
            mxid=reaction_event_id,
            mx_room=self.mxid,
            msg_mxid=msg.mxid,
            tg_sender=user.tgid,
            reaction=emoji_id,
        ).save()

    async def _update_telegram_power_level(
        self, sender: u.User, user_id: TelegramID, level: int
    ) -> None:
        moderator = level >= 50
        admin = level >= 75
        await sender.client.edit_admin(
            self.peer,
            user_id,
            change_info=moderator,
            post_messages=moderator,
            edit_messages=moderator,
            delete_messages=moderator,
            ban_users=moderator,
            invite_users=moderator,
            pin_messages=moderator,
            add_admins=admin,
        )

    async def handle_matrix_power_levels(
        self,
        sender: u.User,
        new_users: dict[UserID, int],
        old_users: dict[UserID, int],
        event_id: EventID | None,
    ) -> None:
        # TODO handle all power level changes and bridge exact admin rights to supergroups/channels
        for user, level in new_users.items():
            if not user or user == self.main_intent.mxid or user == sender.mxid:
                continue
            user_id = p.Puppet.get_id_from_mxid(user)
            if not user_id:
                mx_user = await u.User.get_by_mxid(user, create=False)
                if not mx_user or not mx_user.tgid:
                    continue
                user_id = mx_user.tgid
            if not user_id or user_id == sender.tgid:
                continue
            if user not in old_users or level != old_users[user]:
                await self._update_telegram_power_level(sender, user_id, level)

    async def handle_matrix_about(self, sender: u.User, about: str, event_id: EventID) -> None:
        if self.peer_type not in ("chat", "channel"):
            return
        peer = await self.get_input_entity(sender)
        await sender.client(EditChatAboutRequest(peer=peer, about=about))
        self.about = about
        await self.save()
        await self._send_delivery_receipt(event_id)

    async def handle_matrix_title(self, sender: u.User, title: str, event_id: EventID) -> None:
        if self.peer_type not in ("chat", "channel"):
            return

        if self.peer_type == "chat":
            response = await sender.client(EditChatTitleRequest(chat_id=self.tgid, title=title))
        else:
            channel = await self.get_input_entity(sender)
            response = await sender.client(EditTitleRequest(channel=channel, title=title))
        self.dedup.register_outgoing_actions(response)
        self.title = title
        await self.save()
        await self._send_delivery_receipt(event_id)
        await self.update_bridge_info()

    async def handle_matrix_avatar(
        self, sender: u.User, url: ContentURI, event_id: EventID
    ) -> None:
        if self.peer_type not in ("chat", "channel"):
            # Invalid peer type
            return
        elif self.avatar_url == url:
            return

        self.avatar_url = url
        file = await self.main_intent.download_media(url)
        mime = magic.mimetype(file)
        ext = sane_mimetypes.guess_extension(mime)
        uploaded = await sender.client.upload_file(file, file_name=f"avatar{ext}")
        photo = InputChatUploadedPhoto(file=uploaded)

        if self.peer_type == "chat":
            response = await sender.client(EditChatPhotoRequest(chat_id=self.tgid, photo=photo))
        else:
            channel = await self.get_input_entity(sender)
            response = await sender.client(EditPhotoRequest(channel=channel, photo=photo))
        self.dedup.register_outgoing_actions(response)
        for update in response.updates:
            is_photo_update = (
                isinstance(update, UpdateNewMessage)
                and isinstance(update.message, MessageService)
                and isinstance(update.message.action, MessageActionChatEditPhoto)
            )
            if is_photo_update:
                loc, size = self._msg_conv.get_largest_photo_size(update.message.action.photo)
                self.photo_id = str(loc.id)
                await self.save()
                break
        await self._send_delivery_receipt(event_id)
        await self.update_bridge_info()

    async def handle_matrix_upgrade(
        self, sender: UserID, new_room: RoomID, event_id: EventID
    ) -> None:
        _, server = self.main_intent.parse_user_id(sender)
        old_room = self.mxid
        await self.migrate_and_save_matrix(new_room)
        await self.main_intent.join_room(new_room, servers=[server])
        entity: TypeChat | User | None = None
        user: au.AbstractUser | None = None
        if self.bot and self.has_bot:
            user = self.bot
            entity = await self.get_entity(self.bot)
        if not entity:
            user_mxids = await self.main_intent.get_room_members(self.mxid)
            for user_str in user_mxids:
                user_id = UserID(user_str)
                if user_id == self.az.bot_mxid:
                    continue
                user = await u.User.get_by_mxid(user_id, create=False)
                if user and user.tgid:
                    entity = await self.get_entity(user)
                    if entity:
                        break
        if not entity:
            self.log.error(
                "Failed to fully migrate to upgraded Matrix room: no Telegram user found."
            )
            return
        await self.update_matrix_room(user, entity)
        self.log.info(f"{sender} upgraded room from {old_room} to {self.mxid}")
        await self._send_delivery_receipt(event_id, room_id=old_room)

    async def migrate_and_save_matrix(self, new_id: RoomID) -> None:
        try:
            del self.by_mxid[self.mxid]
        except KeyError:
            pass
        self.mxid = new_id
        self.next_batch_id = None
        self.first_event_id = None
        self.by_mxid[self.mxid] = self
        await self.save()

    # endregion
    # region Telegram -> Matrix bridging

    async def handle_telegram_typing(self, user: p.Puppet, update: UpdateTyping) -> None:
        if user.is_real_user:
            # Ignore typing notifications from double puppeted users to avoid echoing
            return
        is_typing = isinstance(update.action, SendMessageTypingAction)
        await user.default_mxid_intent.set_typing(self.mxid, timeout=5000 if is_typing else 0)

    async def handle_telegram_edit(
        self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
    ) -> None:
        if not self.mxid:
            self.log.debug("Ignoring edit to %d as chat has no Matrix room", evt.id)
            return
        elif hasattr(evt, "media") and isinstance(evt.media, MessageMediaGame):
            self.log.debug("Ignoring game message edit event")
            return

        if self.peer_type != "channel" and isinstance(evt, Message) and evt.reactions is not None:
            background_task.create(
                self.try_handle_telegram_reactions(source, TelegramID(evt.id), evt.reactions)
            )
        sender_id = sender.tgid if sender else self.tgid

        async with self.send_lock(sender_id, required=False):
            tg_space = self.tgid if self.peer_type == "channel" else source.tgid

            temporary_identifier = EventID(
                f"${random.randint(1000000000000, 9999999999999)}TGBRIDGEDITEMP"
            )
            event_hash, duplicate_found = self.dedup.check(
                evt, (temporary_identifier, tg_space), force_hash=True
            )
            if duplicate_found:
                mxid, other_tg_space = duplicate_found
                if tg_space != other_tg_space:
                    prev_edit_msg = await DBMessage.get_one_by_tgid(
                        TelegramID(evt.id), tg_space, edit_index=-1
                    )
                    if (
                        not prev_edit_msg
                        or prev_edit_msg.mxid == mxid
                        or prev_edit_msg.content_hash == event_hash
                    ):
                        return
                    await DBMessage(
                        mxid=mxid,
                        mx_room=self.mxid,
                        tg_space=tg_space,
                        tgid=TelegramID(evt.id),
                        edit_index=prev_edit_msg.edit_index + 1,
                        content_hash=event_hash,
                        sender=sender_id,
                    ).insert()
                return

        editing_msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
        if not editing_msg:
            self.log.info(
                f"Didn't find edited message {evt.id}@{tg_space} (src {source.tgid}) "
                "in database."
            )
            return
        prev_edit_msg = (
            await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space, -1) or editing_msg
        )
        if prev_edit_msg.content_hash == event_hash:
            self.log.debug(
                f"Ignoring edit of message {evt.id}@{tg_space} (src {source.tgid}):"
                " content hash didn't change"
            )
            await DBMessage.delete_temp_mxid(temporary_identifier, self.mxid)
            return

        intent = sender.intent_for(self) if sender else self.main_intent
        is_bot = sender.is_bot if sender else False
        converted = await self._msg_conv.convert(
            source, intent, is_bot, self.is_channel, evt, no_reply_fallback=True
        )
        converted.content.set_edit(editing_msg.mxid)
        await intent.set_typing(self.mxid, timeout=0)
        timestamp = evt.edit_date if evt.edit_date != evt.date else None
        event_id = await self._send_message(
            intent, converted.content, timestamp=timestamp, event_type=converted.type
        )

        await DBMessage(
            mxid=event_id,
            mx_room=self.mxid,
            tg_space=tg_space,
            tgid=TelegramID(evt.id),
            edit_index=prev_edit_msg.edit_index + 1,
            content_hash=event_hash,
            sender=sender_id,
        ).insert()
        await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)

    @property
    def _backfill_config_type(self) -> str:
        if self.peer_type == "user":
            return "user"
        elif self.peer_type == "chat":
            return "normal_group"
        elif self.megagroup:
            return "supergroup"
        else:
            return "channel"

    @property
    def _default_max_batches(self) -> int:
        return self.config[f"bridge.backfill.incremental.max_batches.{self._backfill_config_type}"]

    @property
    def _enable_batch_sending(self) -> bool:
        return self.bridge.matrix.versions.supports("com.beeper.batch_sending")

    async def enqueue_backfill(
        self,
        source: u.User,
        priority: int,
        max_batches: int | None = None,
        messages_per_batch: int | None = None,
        anchor_msg_id: int | None = None,
        extra_data: dict[str, Any] | None = None,
        type: BackfillType = BackfillType.HISTORICAL,
    ) -> None:
        new_backfill = Backfill.new(
            user_mxid=source.mxid,
            priority=priority,
            type=type,
            portal_tgid=self.tgid,
            portal_tg_receiver=self.tg_receiver,
            anchor_msg_id=anchor_msg_id,
            extra_data=extra_data,
            messages_per_batch=(
                messages_per_batch or self.config["bridge.backfill.incremental.messages_per_batch"]
            ),
            post_batch_delay=self.config["bridge.backfill.incremental.post_batch_delay"],
            max_batches=max_batches or self._default_max_batches,
        )
        deleted_entries = await new_backfill.insert()
        if deleted_entries:
            self.log.debug(
                "Deleted backfill queue entries while inserting new item: %s", deleted_entries
            )
        source.wakeup_backfill_task.set()

    async def forward_backfill(
        self,
        source: u.User,
        initial: bool,
        last_tgid: int | None = None,
        override_limit: int | None = None,
        client: MautrixTelegramClient | None = None,
    ) -> str:
        if not client:
            client = source.client
        type = "initial" if initial else "sync"
        limit = (
            override_limit
            or self.config[f"bridge.backfill.forward_limits.{type}.{self._backfill_config_type}"]
        )
        if limit == 0:
            return "Limit is zero, not backfilling"
        timeout = self.config["bridge.backfill.forward_timeout"]
        with self.backfill_lock:
            task = self.backfill(
                source, client, forward=True, forward_limit=limit, last_tgid=last_tgid
            )
            if timeout > 0:
                output = await asyncio.wait_for(task, timeout=timeout)
            else:
                output = await task
            self.log.debug(f"Forward backfill complete, status: {output}")
            return output

    async def backfill(
        self,
        source: u.User,
        client: MautrixTelegramClient,
        req: Backfill | None = None,
        forward: bool = False,
        forward_limit: int | None = None,
        last_tgid: int | None = None,
    ) -> str:
        if not self.backfill_enable:
            return "Backfilling is disabled in the bridge config"
        async with self.backfill_method_lock:
            return await self._locked_backfill(
                source, client, req, forward, forward_limit, last_tgid
            )

    async def _locked_backfill(
        self,
        source: u.User,
        client: MautrixTelegramClient,
        req: Backfill | None = None,
        forward: bool = False,
        forward_limit: int | None = None,
        last_tgid: int | None = None,
    ) -> str:
        assert forward != bool(req)
        if not self.config["bridge.backfill.normal_groups"] and self.peer_type == "chat":
            return "Backfilling normal groups is disabled in the bridge config"
        tg_space = source.tgid if self.peer_type != "channel" else self.tgid
        if forward:
            last_in_room = await DBMessage.find_last(self.mxid, tg_space)
            min_id = last_in_room.tgid if last_in_room else 0
            if last_tgid is None:
                messages = await source.client.get_messages(self.peer, limit=1)
                if not messages:
                    return "Chat is empty, nothing to backfill"
                last_tgid = messages[0].id
            if last_tgid <= min_id or (last_tgid == 1 and self.peer_type == "channel"):
                return (
                    f"Last bridged message {min_id} is equal to or greater than last message "
                    f"in Telegram chat {last_tgid}, nothing to backfill"
                )
            limit = last_tgid - min_id
            if (forward_limit or 0) > 0:
                limit = min(limit, forward_limit)
            self.log.debug(
                f"Backfilling up to {limit} messages after ID {min_id} through {source.mxid} "
                f"(last message: {last_tgid})"
            )
            anchor_id = min_id
        else:
            limit = req.messages_per_batch
            first_in_room = await DBMessage.find_first(self.mxid, tg_space)
            anchor_id = first_in_room.tgid if first_in_room else None
            anchor_source = "lowest in chat"
            if req.anchor_msg_id and req.anchor_msg_id < anchor_id:
                anchor_source = "backfill queue anchor"
                anchor_id = req.anchor_msg_id
            self.log.debug(
                f"Backfilling up to {req.messages_per_batch} historical messages "
                f"before {anchor_id} ({anchor_source}) through {source.mxid}"
            )
        event_count, message_count, lowest_id = await self._backfill_messages(
            source, client, forward, anchor_id, limit
        )
        await self.save()
        if forward:
            self.log.debug(f"Forward backfill finished with {event_count}/{message_count} events")
        elif message_count > 0 and lowest_id and lowest_id > 1:
            if req.max_batches in (0, 1):
                self.log.debug(f"Backfilled enough through {source.mxid}, not enqueuing more")
                return "Already backfilled enough batches, not enqueuing more"
            self.log.debug(f"Enqueuing more backfill through {source.mxid}")
            await self.enqueue_backfill(
                source,
                priority=max(100, req.priority + 1),
                messages_per_batch=req.messages_per_batch,
                max_batches=-1 if req.max_batches < 0 else (req.max_batches - 1),
                anchor_msg_id=lowest_id,
            )
        else:
            self.log.debug("No more messages to backfill")
        return f"Backfilled {event_count} messages"

    async def _convert_batch_msg(
        self,
        source: u.User,
        client: MautrixTelegramClient,
        msg: Message,
    ) -> tuple[putil.ConvertedMessage, IntentAPI]:
        if msg.from_id and isinstance(msg.from_id, (PeerUser, PeerChannel)):
            sender = await p.Puppet.get_by_peer(msg.from_id)
        elif isinstance(msg.peer_id, PeerUser):
            if msg.out:
                sender = await p.Puppet.get_by_tgid(source.tgid)
            else:
                sender = await p.Puppet.get_by_peer(msg.peer_id)
        else:
            sender = None
        if sender:
            intent = sender.intent_for(self)
            if not sender.displayname:
                entity = await client.get_entity(sender.peer)
                await sender.update_info(source, entity, client_override=client)
        else:
            intent = self.main_intent
        if (
            intent.api.is_real_user
            and not intent.api.is_real_user_as_token
            and not self._enable_batch_sending
        ):
            intent = sender.default_mxid_intent
        is_bot = sender.is_bot if sender else False
        converted = await self._msg_conv.convert(
            source,
            intent,
            is_bot,
            self.is_channel,
            msg,
            client=client,
            deterministic_reply_id=self.bridge.homeserver_software.is_hungry,
        )
        return converted, intent

    async def _wrap_batch_msg(
        self,
        intent: IntentAPI,
        msg: Message,
        converted: putil.ConvertedMessage,
        caption: bool = False,
        event_id: EventID | None = None,
    ) -> BatchSendEvent:
        if caption:
            content = converted.caption
            event_type = EventType.ROOM_MESSAGE
        else:
            content = converted.content
            event_type = converted.type
        if self.encrypted and self.matrix.e2ee:
            event_type, content = await self.matrix.e2ee.encrypt(self.mxid, event_type, content)
        if intent.api.is_real_user:
            content[DOUBLE_PUPPET_SOURCE_KEY] = self.bridge.name
        return BatchSendEvent(
            sender=intent.mxid,
            timestamp=int(msg.date.timestamp() * 1000),
            content=content,
            type=event_type,
            event_id=event_id,
        )

    async def _backfill_messages(
        self,
        source: u.User,
        client: MautrixTelegramClient,
        forward: bool,
        anchor_id: int,
        limit: int,
    ) -> tuple[int, int, TelegramID]:
        entity = await self.get_input_entity(source)
        events = []
        intents = []
        metas = []
        tg_space = self.tgid if self.peer_type == "channel" else source.tgid

        lowest_id = 0
        first_id_found = False
        first_id = anchor_id
        message_count = 0
        minmax = {"min_id": anchor_id} if forward else {"max_id": anchor_id}
        if not forward and not anchor_id:
            anchor_id = 2**31 - 1
            minmax = {}
        self.log.debug(f"Iterating messages through {source.tgid} with {limit=}, {minmax}")
        delay_warn_handle = self.loop.call_later(
            5 * 60, lambda: self.log.warning("Iterating messages is taking long")
        )
        # Iterate messages newest to oldest and collect the results
        async for msg in client.iter_messages(entity, limit=limit, **minmax):
            message_count += 1
            if message_count == 1:
                self.log.debug(f"Backfill iter: got first message {msg.id}")
            elif message_count % 50 == 0:
                self.log.debug(f"Backfill iter: got {message_count} messages so far (at {msg.id})")
            if (forward and msg.id <= anchor_id) or (not forward and msg.id >= anchor_id):
                continue
            elif isinstance(msg, MessageService):
                # TODO some service messages can be backfilled
                continue
            if not lowest_id or msg.id < lowest_id:
                lowest_id = msg.id
            if not first_id_found:
                first_id = msg.id
                first_id_found = True

            converted, intent = await self._convert_batch_msg(source, client, msg)
            if converted is None:
                continue
            d_event_id = None
            if self.bridge.homeserver_software.is_hungry:
                d_event_id = self._msg_conv.deterministic_event_id(tg_space, msg.id)
            events.append(await self._wrap_batch_msg(intent, msg, converted, event_id=d_event_id))
            intents.append(intent)
            metas.append(msg)
            if converted.caption:
                events.append(await self._wrap_batch_msg(intent, msg, converted, caption=True))
                intents.append(intent)
                metas.append(None)
        delay_warn_handle.cancel()
        if len(events) == 0:
            self.log.debug(
                f"Didn't get any events to send out of {message_count} messages fetched "
                f"(first received ID: {first_id}, lowest: {lowest_id})"
            )
            return 0, message_count, lowest_id
        self.log.debug(
            f"Got {len(events)} events to send out of {message_count} messages fetched "
            f"(first received ID: {first_id}, lowest: {lowest_id})"
        )
        if self._enable_batch_sending:
            resp = await self.main_intent.beeper_batch_send(
                self.mxid,
                # We iterated the events in reverse chronological order,
                # so reverse them before sending
                events=list(reversed(events)),
                forward=forward,
            )
            event_ids = resp.event_ids
        else:
            event_ids = [
                await intent.send_message_event(
                    self.mxid, evt.type, evt.content, timestamp=evt.timestamp
                )
                for evt, intent in zip(reversed(events), reversed(intents))
            ]
        tg_space = source.tgid if self.peer_type != "channel" else self.tgid
        await DBMessage.bulk_insert(
            [
                DBMessage(
                    mxid=event_id,
                    mx_room=self.mxid,
                    tgid=msg.id,
                    tg_space=tg_space,
                    edit_index=0,
                    content_hash=self.dedup.hash_event(msg),
                    # TODO sender
                )
                # Original arrays are in reverse chronological order, but event IDs are
                # chronological (because we reversed the original messages list before sending)
                for event_id, msg in zip(event_ids, reversed(metas))
                if msg is not None
            ]
        )
        return len(events), message_count, lowest_id

    def _split_dm_reaction_counts(self, counts: list[ReactionCount]) -> list[MessagePeerReaction]:
        reactions = []
        for item in counts:
            if item.count == 2:
                reactions += [
                    MessagePeerReaction(
                        reaction=item.reaction, peer_id=PeerUser(self.tgid), date=None
                    ),
                    MessagePeerReaction(
                        reaction=item.reaction,
                        peer_id=PeerUser(self.tg_receiver),
                        date=None,
                    ),
                ]
            elif item.count == 1:
                reactions.append(
                    MessagePeerReaction(
                        reaction=item.reaction,
                        peer_id=PeerUser(self.tg_receiver if item.chosen_order else self.tgid),
                        date=None,
                    )
                )
        return reactions

    async def _poll_telegram_reactions(self, source: au.AbstractUser) -> None:
        now = time.monotonic()
        if self._prev_reaction_poll[source.mxid] + REACTION_POLL_MIN_INTERVAL > now:
            self.log.trace(
                f"Not polling reactions through {source.mxid}, "
                f"last poll was less than {REACTION_POLL_MIN_INTERVAL} seconds ago"
            )
            return
        self._prev_reaction_poll[source.mxid] = now
        self.log.debug(f"Polling reactions for recent messages through {source.mxid}")
        messages = await DBMessage.find_recent(self.mxid, source.tgid)
        message_ids = [message.tgid for message in messages]
        updates = await source.client(GetMessagesReactionsRequest(peer=self.peer, id=message_ids))
        for user in updates.users:
            user: User
            puppet = await p.Puppet.get_by_tgid(TelegramID(user.id))
            await puppet.update_info(source, user)
        for upd in updates.updates:
            if isinstance(upd, UpdateMessageReactions):
                await self.handle_telegram_reactions(source, TelegramID(upd.msg_id), upd.reactions)
            else:
                self.log.warning(f"Unexpected update type {type(upd)} in get reactions response")

    async def try_handle_telegram_reactions(
        self,
        source: au.AbstractUser,
        msg_id: TelegramID,
        data: MessageReactions,
        dbm: DBMessage | None = None,
        timestamp: datetime | None = None,
    ) -> None:
        try:
            await self.handle_telegram_reactions(source, msg_id, data, dbm, timestamp)
        except Exception:
            self.log.exception(f"Error handling reactions in message {msg_id}")

    async def handle_telegram_reactions(
        self,
        source: au.AbstractUser,
        msg_id: TelegramID,
        data: MessageReactions,
        dbm: DBMessage | None = None,
        timestamp: datetime | None = None,
    ) -> None:
        total_count = sum(item.count for item in data.results)
        recent_reactions = data.recent_reactions or []
        if total_count > 0 and not recent_reactions and not data.can_see_list:
            # We don't know who reacted in a channel, so we can't bridge it properly either
            return
        if self.peer_type == "channel" and not self.megagroup:
            # This should never happen with the previous if
            self.log.warning(f"Can see reaction list in channel ({data!s})")
            # return

        tg_space = self.tgid if self.peer_type == "channel" else source.tgid
        if dbm is None:
            dbm = await DBMessage.get_one_by_tgid(msg_id, tg_space)
            if dbm is None:
                return

        if not recent_reactions or len(recent_reactions) < total_count:
            if self.peer_type == "user":
                recent_reactions = self._split_dm_reaction_counts(data.results)
            elif source.is_bot:
                # Can't fetch exact reaction senders as a bot
                return
            else:
                # TODO should calls to this be limited?
                resp = await source.client(
                    GetMessageReactionsListRequest(peer=self.peer, id=dbm.tgid, limit=100)
                )
                recent_reactions = resp.reactions

        async with self.reaction_lock(dbm.mxid):
            await self._handle_telegram_user_reactions_locked(
                source, dbm, recent_reactions, total_count, timestamp=timestamp
            )

    async def handle_telegram_bot_reactions(
        self, source: au.AbstractUser, update: UpdateBotMessageReaction
    ) -> None:
        tg_space = self.tgid if self.peer_type == "channel" else source.tgid
        dbm = await DBMessage.get_one_by_tgid(TelegramID(update.msg_id), tg_space)
        if dbm is None:
            return
        reactions: dict[TelegramID, list[WrappedReaction]] = {}
        custom_emoji_ids: list[int] = []
        if isinstance(update.actor, PeerUser):
            user_id = TelegramID(update.actor.user_id)
        elif isinstance(update.actor, PeerChannel):
            user_id = TelegramID(update.actor.channel_id)
        else:
            return
        for reaction in update.new_reactions:
            reactions.setdefault(user_id, []).append(WrappedReaction(reaction=reaction, date=None))
        async with self.reaction_lock(dbm.mxid):
            await self._handle_telegram_parsed_reactions_locked(
                source,
                dbm,
                reactions,
                custom_emoji_ids,
                is_full=True,
                only_user_id=user_id,
                timestamp=update.date,
            )

    @staticmethod
    def _reactions_filter(lst: list[WrappedReaction], existing: DBReaction) -> bool:
        if not lst:
            return False
        for wrapped_reaction in lst:
            reaction = wrapped_reaction.reaction
            if isinstance(reaction, ReactionCustomEmoji) and existing.reaction == str(
                reaction.document_id
            ):
                lst.remove(wrapped_reaction)
                return True
            elif isinstance(reaction, ReactionEmoji) and existing.reaction == reaction.emoticon:
                lst.remove(wrapped_reaction)
                return True
        return False

    @staticmethod
    async def _get_reaction_limit(source: au.AbstractUser, sender: TelegramID) -> int:
        puppet = await p.Puppet.get_by_tgid(sender, create=False)
        is_premium = puppet and puppet.is_premium
        if isinstance(source, u.User) and not source.is_bot:
            return await source.get_max_reactions(is_premium)
        return 3 if is_premium else 1

    async def _handle_telegram_user_reactions_locked(
        self,
        source: au.AbstractUser,
        msg: DBMessage,
        reaction_list: list[MessagePeerReaction],
        total_count: int,
        timestamp: datetime | None = None,
    ) -> None:
        reactions: dict[TelegramID, list[WrappedReaction]] = {}
        custom_emoji_ids: list[int] = []
        for reaction in reaction_list:
            if isinstance(reaction.peer_id, (PeerUser, PeerChannel)) and isinstance(
                reaction.reaction, (ReactionEmoji, ReactionCustomEmoji)
            ):
                sender_user_id = p.Puppet.get_id_from_peer(reaction.peer_id)
                reactions.setdefault(sender_user_id, []).append(
                    WrappedReaction(reaction.reaction, reaction.date)
                )
                if isinstance(reaction.reaction, ReactionCustomEmoji):
                    custom_emoji_ids.append(reaction.reaction.document_id)
        is_full = len(reaction_list) == total_count
        await self._handle_telegram_parsed_reactions_locked(
            source,
            msg,
            reactions,
            custom_emoji_ids,
            is_full=is_full,
            timestamp=timestamp,
        )

    async def _handle_telegram_parsed_reactions_locked(
        self,
        source: au.AbstractUser,
        msg: DBMessage,
        reactions: dict[TelegramID, list[WrappedReaction]],
        custom_emoji_ids: list[int],
        is_full: bool,
        only_user_id: TelegramID | None = None,
        timestamp: datetime | None = None,
    ) -> None:
        custom_emojis = await util.transfer_custom_emojis_to_matrix(source, custom_emoji_ids)

        existing_reactions = await DBReaction.get_all_by_message(msg.mxid, msg.mx_room)

        removed: list[DBReaction] = []
        for existing_reaction in existing_reactions:
            sender_id = existing_reaction.tg_sender
            if only_user_id is not None and sender_id != only_user_id:
                continue
            new_reactions = reactions.get(sender_id)
            if self._reactions_filter(new_reactions, existing_reaction):
                if new_reactions is not None and len(new_reactions) == 0:
                    reactions.pop(sender_id)
            else:
                if is_full or (
                    new_reactions is not None
                    and len(new_reactions) == await self._get_reaction_limit(source, sender_id)
                ):
                    removed.append(existing_reaction)
                # else: assume the reaction is still there, too much effort to fetch it

        new_reaction: TypeReaction
        for sender, new_reactions in reactions.items():
            for new_wrapped_reaction in new_reactions:
                new_reaction = new_wrapped_reaction.reaction
                if isinstance(new_reaction, ReactionEmoji):
                    emoji_id = new_reaction.emoticon
                    matrix_reaction = variation_selector.add(new_reaction.emoticon)
                elif isinstance(new_reaction, ReactionCustomEmoji):
                    emoji_id = str(new_reaction.document_id)
                    custom_emoji = custom_emojis[new_reaction.document_id]
                    if isinstance(custom_emoji, util.UnicodeCustomEmoji):
                        matrix_reaction = custom_emoji.emoji
                    else:
                        matrix_reaction = custom_emoji.mxc
                else:
                    self.log.warning("Unknown reaction type %s", type(new_reaction))
                    continue
                self.log.debug(f"Bridging reaction {emoji_id} by {sender} to {msg.tgid}")
                puppet: p.Puppet = await p.Puppet.get_by_tgid(sender)
                mxid = await puppet.intent_for(self).react(
                    msg.mx_room,
                    msg.mxid,
                    matrix_reaction,
                    timestamp=new_wrapped_reaction.date or timestamp,
                )
                await DBReaction(
                    mxid=mxid,
                    mx_room=msg.mx_room,
                    msg_mxid=msg.mxid,
                    tg_sender=sender,
                    reaction=emoji_id,
                ).save()
        for removed_reaction in removed:
            self.log.debug(
                f"Removing reaction {removed_reaction.reaction} by {removed_reaction.tg_sender} "
                f"to {msg.tgid}"
            )
            puppet = await p.Puppet.get_by_tgid(removed_reaction.tg_sender)
            await puppet.intent_for(self).redact(removed_reaction.mx_room, removed_reaction.mxid)
            await removed_reaction.delete()

    async def handle_telegram_message(
        self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
    ) -> None:
        try:
            await self._handle_telegram_message(source, sender, evt)
        except Exception:
            sender_id = sender.tgid if sender else None
            self.log.exception(
                f"Failed to handle Telegram message {evt.id} from {sender_id} via {source.tgid}"
            )
            if self.config["bridge.incoming_bridge_error_reports"]:
                intent = sender.intent_for(self) if sender else self.main_intent
                await self._send_message(
                    intent,
                    TextMessageEventContent(
                        msgtype=MessageType.NOTICE,
                        body="Error processing message from Telegram",
                    ),
                )

    async def _handle_telegram_message(
        self, source: au.AbstractUser, sender: p.Puppet | None, evt: Message
    ) -> None:
        if not self.mxid:
            if source.is_relaybot and self.config["bridge.relaybot.ignore_unbridged_group_chat"]:
                return
            self.log.debug("Got telegram message %d, but no room exists, creating...", evt.id)
            await self.create_matrix_room(source, invites=[source.mxid], update_if_exists=False)
            if not self.mxid:
                self.log.warning("Room doesn't exist even after creating, dropping %d", evt.id)
                return

        if (
            self.peer_type == "user"
            and sender
            and sender.tgid == self.tg_receiver
            and not sender.is_real_user
            and not await self.az.state_store.is_joined(self.mxid, sender.mxid)
        ):
            self.log.debug(
                f"Ignoring private chat message {evt.id}@{source.tgid} as receiver does"
                " not have matrix puppeting and their default puppet isn't in the room"
            )
            return

        sender_id = sender.tgid if sender else self.tgid
        async with self.send_lock(sender_id, required=False):
            tg_space = self.tgid if self.peer_type == "channel" else source.tgid

            temporary_identifier = EventID(
                f"${random.randint(1000000000000, 9999999999999)}TGBRIDGETEMP"
            )
            event_hash, duplicate_found = self.dedup.check(evt, (temporary_identifier, tg_space))
            if duplicate_found:
                mxid, other_tg_space = duplicate_found
                self.log.debug(
                    f"Ignoring message {evt.id}@{tg_space} (src {source.tgid}) "
                    f"as it was already handled (in space {other_tg_space})"
                )
                if tg_space != other_tg_space:
                    await DBMessage(
                        tgid=TelegramID(evt.id),
                        mx_room=self.mxid,
                        mxid=mxid,
                        tg_space=tg_space,
                        edit_index=0,
                        content_hash=event_hash,
                        sender=sender_id,
                    ).insert()
                return

        msg = await DBMessage.get_one_by_tgid(TelegramID(evt.id), tg_space)
        if msg:
            self.log.debug(
                f"Ignoring message {evt.id} (src {source.tgid}) as it was already "
                f"handled into {msg.mxid}."
            )
            return

        self.log.debug(
            "Handling Telegram message %d@%d from %s (ts: %s)",
            evt.id,
            tg_space,
            sender_id,
            evt.date,
        )
        self.log.trace("Message content: %s", evt)

        if sender and not sender.displayname:
            self.log.debug(
                f"Telegram user {sender.tgid} sent a message, but doesn't have a displayname,"
                " updating info..."
            )
            try:
                entity = await source.client.get_entity(sender.peer)
                await sender.update_info(source, entity)
                if not sender.displayname:
                    self.log.debug(
                        f"Telegram user {sender.tgid} doesn't have a displayname even after"
                        f" updating with data {entity!s}"
                    )
            except ValueError as e:
                self.log.warning(
                    f"Couldn't find entity to update profile of {sender.tgid}", exc_info=True
                )

        if sender:
            # TODO don't use double puppet when backfilling
            intent = sender.intent_for(self)
        else:
            intent = self.main_intent
        is_bot = sender.is_bot if sender else False
        converted = await self._msg_conv.convert(source, intent, is_bot, self.is_channel, evt)
        if not converted:
            return
        await intent.set_typing(self.mxid, timeout=0)
        event_id = await self._send_message(
            intent, converted.content, timestamp=evt.date, event_type=converted.type
        )
        caption_id = None
        if converted.caption:
            caption_id = await self._send_message(intent, converted.caption, timestamp=evt.date)

        self._new_messages_after_sponsored = True

        another_event_hash, prev_id = self.dedup.update(
            evt, (event_id, tg_space), (temporary_identifier, tg_space)
        )
        assert another_event_hash == event_hash
        if prev_id:
            self.log.debug(
                f"Sent message {evt.id}@{tg_space} to Matrix as {event_id}. "
                f"Temporary dedup identifier was {temporary_identifier}, "
                f"but dedup map contained {prev_id[1]} instead! -- "
                "This was probably a race condition caused by Telegram sending updates"
                "to other clients before responding to the sender. I'll just redact "
                "the likely duplicate message now."
            )
            await intent.redact(self.mxid, event_id)
            return

        self.log.debug("Handled Telegram message %d@%d -> %s", evt.id, tg_space, event_id)
        try:
            dbm = DBMessage(
                tgid=TelegramID(evt.id),
                mx_room=self.mxid,
                mxid=event_id,
                tg_space=tg_space,
                edit_index=0,
                content_hash=event_hash,
                sender=sender_id,
            )
            await dbm.insert()
            await DBMessage.replace_temp_mxid(temporary_identifier, self.mxid, event_id)
        except (IntegrityError, UniqueViolationError) as e:
            self.log.error(
                f"{type(e).__name__} while saving message mapping {evt.id}@{tg_space} "
                f"-> {event_id}: {e}"
            )
            await intent.redact(self.mxid, event_id)
            return
        if isinstance(evt, Message) and evt.reactions:
            background_task.create(
                self.try_handle_telegram_reactions(
                    source, dbm.tgid, evt.reactions, dbm=dbm, timestamp=evt.date
                )
            )
        await self._send_delivery_receipt(event_id)
        if converted.disappear_seconds:
            if converted.disappear_start_immediately:
                expires_at = int(evt.date.timestamp()) + converted.disappear_seconds
            else:
                expires_at = None
            await self._mark_disappearing(event_id, converted.disappear_seconds, expires_at)
            if caption_id:
                await self._mark_disappearing(caption_id, converted.disappear_seconds, expires_at)

    async def _mark_disappearing(
        self, event_id: EventID, seconds: int, expires_at: int | None
    ) -> None:
        dm = DisappearingMessage(
            self.mxid, event_id, seconds, expiration_ts=expires_at * 1000 if expires_at else None
        )
        await dm.insert()
        if expires_at:
            background_task.create(self._disappear_event(dm))

    async def _create_room_on_action(
        self, source: au.AbstractUser, action: TypeMessageAction
    ) -> bool:
        if source.is_relaybot and self.config["bridge.relaybot.ignore_unbridged_group_chat"]:
            return False
        create_and_exit = (MessageActionChatCreate, MessageActionChannelCreate)
        create_and_continue = (
            MessageActionChatAddUser,
            MessageActionChatJoinedByLink,
            MessageActionChatJoinedByRequest,
        )
        if isinstance(action, create_and_exit) or isinstance(action, create_and_continue):
            self.log.debug(
                f"Got telegram action of type {type(action).__name__},"
                " but no room exists, creating..."
            )
            await self.create_matrix_room(
                source, invites=[source.mxid], update_if_exists=isinstance(action, create_and_exit)
            )
        if not isinstance(action, create_and_continue):
            return False
        return True

    async def handle_telegram_direct_call(
        self, source: au.AbstractUser, sender: p.Puppet, update: UpdatePhoneCall
    ) -> None:
        if isinstance(update.phone_call, PhoneCallRequested):
            call_type = "video call" if update.phone_call.video else "call"
            await self._send_message(
                sender.intent_for(self),
                TextMessageEventContent(msgtype=MessageType.EMOTE, body=f"started a {call_type}"),
            )

    async def handle_telegram_action(
        self, source: au.AbstractUser, sender: p.Puppet | None, update: MessageService
    ) -> None:
        action = update.action
        should_ignore = (
            not self.mxid and not await self._create_room_on_action(source, action)
        ) or self.dedup.check_action(update)
        if should_ignore or not self.mxid:
            return
        if isinstance(action, MessageActionChatEditTitle):
            await self._update_title(action.title, sender=sender, save=True)
            await self.update_bridge_info()
        elif isinstance(action, MessageActionChatEditPhoto):
            await self._update_avatar(source, action.photo, sender=sender, save=True)
            await self.update_bridge_info()
        elif isinstance(action, MessageActionChatDeletePhoto):
            await self._update_avatar(source, ChatPhotoEmpty(), sender=sender, save=True)
            await self.update_bridge_info()
        elif isinstance(action, MessageActionChatAddUser):
            for user_id in action.users:
                await self._add_telegram_user(TelegramID(user_id), source)
        elif isinstance(action, (MessageActionChatJoinedByLink, MessageActionChatJoinedByRequest)):
            await self._add_telegram_user(sender.id, source)
        elif isinstance(action, MessageActionChatDeleteUser):
            await self.delete_telegram_user(TelegramID(action.user_id), sender)
        elif isinstance(action, MessageActionChatMigrateTo):
            await self._migrate_and_save_telegram(TelegramID(action.channel_id))
            await self._send_message(
                sender.intent_for(self),
                TextMessageEventContent(
                    msgtype=MessageType.EMOTE,
                    body="upgraded this group to a supergroup",
                ),
            )
            await self.update_bridge_info()
        elif isinstance(action, MessageActionPhoneCall):
            call_type = "Video call" if action.video else "Call"
            end_reason = "ended"
            if isinstance(action.reason, PhoneCallDiscardReasonMissed):
                end_reason = "cancelled" if sender.tgid == source.tgid else "missed"
            elif isinstance(action.reason, PhoneCallDiscardReasonBusy):
                end_reason = "rejected"
            elif isinstance(action.reason, PhoneCallDiscardReasonDisconnect):
                end_reason = "disconnected"
            body = f"{call_type} {end_reason}"
            if action.duration:
                body += f" ({format_duration(action.duration)})"
            await self._send_message(
                sender.intent_for(self),
                TextMessageEventContent(msgtype=MessageType.NOTICE, body=body),
            )
        elif isinstance(action, MessageActionGroupCall):
            await self._send_message(
                sender.intent_for(self),
                TextMessageEventContent(
                    msgtype=MessageType.EMOTE,
                    body=(
                        "started a video chat"
                        if action.duration is None
                        else f"ended the video chat ({format_duration(action.duration)})"
                    ),
                ),
            )
        elif isinstance(action, MessageActionGiftPremium):
            await self._send_message(
                sender.intent_for(self),
                TextMessageEventContent(
                    msgtype=MessageType.EMOTE,
                    body=(
                        f"gifted Telegram Premium for {action.months} months "
                        f"({action.amount / 100} {action.currency})"
                    ),
                ),
            )
        elif isinstance(action, MessageActionBoostApply):
            await self._send_message(
                sender.intent_for(self),
                TextMessageEventContent(
                    msgtype=MessageType.EMOTE,
                    body=(
                        "boosted the group"
                        if action.boosts == 1
                        else f"boosted the group {action.boosts} times"
                    ),
                ),
            )
        elif isinstance(action, MessageActionGameScore):
            # TODO handle game score
            pass
        elif isinstance(action, MessageActionContactSignUp):
            await self.handle_telegram_joined(source, sender, update)
        else:
            self.log.trace("Unhandled Telegram action in %s: %s", self.title, action)

    async def handle_telegram_joined(
        self,
        source: au.AbstractUser,
        sender: p.Puppet,
        update: MessageService,
        backfill: bool = False,
    ) -> None:
        assert isinstance(update.action, MessageActionContactSignUp)

        msg = await DBMessage.get_one_by_tgid(TelegramID(update.id), source.tgid)
        if msg:
            self.log.debug(
                f"Ignoring new user message {update.id} (src {source.tgid}) as it was already "
                f"handled into {msg.mxid}."
            )
            return

        content = TextMessageEventContent(msgtype=MessageType.EMOTE, body="joined Telegram")
        event_id = await self._send_message(
            sender.intent_for(self), content, timestamp=update.date
        )
        await DBMessage(
            tgid=TelegramID(update.id),
            mx_room=self.mxid,
            mxid=event_id,
            tg_space=source.tgid,
            edit_index=0,
            sender=sender.id,
        ).insert()
        if self.config["bridge.always_read_joined_telegram_notice"]:
            double_puppet = await p.Puppet.get_by_tgid(source.tgid)
            if double_puppet and double_puppet.is_real_user:
                await double_puppet.intent.mark_read(self.mxid, event_id)

    async def set_telegram_admin(self, user_id: TelegramID) -> None:
        puppet = await p.Puppet.get_by_tgid(user_id)
        user = await u.User.get_by_tgid(user_id)

        levels = await self.main_intent.get_power_levels(self.mxid)
        if user:
            levels.users[user.mxid] = 50
        if puppet:
            levels.users[puppet.mxid] = 50
        await self.main_intent.set_power_levels(self.mxid, levels)

    async def receive_telegram_pin_ids(
        self, msg_ids: list[TelegramID], receiver: TelegramID, remove: bool
    ) -> None:
        async with self._pin_lock:
            tg_space = receiver if self.peer_type != "channel" else self.tgid
            previously_pinned = await self.main_intent.get_pinned_messages(self.mxid)
            currently_pinned_dict = {event_id: True for event_id in previously_pinned}
            for message in await DBMessage.get_first_by_tgids(msg_ids, tg_space):
                if remove:
                    currently_pinned_dict.pop(message.mxid, None)
                else:
                    currently_pinned_dict[message.mxid] = True
            currently_pinned = list(currently_pinned_dict.keys())
            if currently_pinned != previously_pinned:
                await self.main_intent.set_pinned_messages(self.mxid, currently_pinned)

    async def set_telegram_admins_enabled(self, enabled: bool) -> None:
        level = 50 if enabled else 10
        levels = await self.main_intent.get_power_levels(self.mxid)
        levels.invite = level
        levels.events[EventType.ROOM_NAME] = level
        levels.events[EventType.ROOM_AVATAR] = level
        await self.main_intent.set_power_levels(self.mxid, levels)

    # endregion
    # region Miscellaneous getters

    def get_config(self, key: str) -> Any:
        local = util.recursive_get(self.local_config, key)
        if local is not None:
            return local
        return self.config[f"bridge.{key}"]

    async def can_user_perform(self, user: u.User, event: str) -> bool:
        if user.is_admin:
            return True
        if not self.mxid:
            # No room for anybody to perform actions in
            return False
        try:
            await self.main_intent.get_power_levels(self.mxid)
        except MatrixRequestError:
            return False
        evt_type = EventType.find(f"fi.mau.telegram.{event}", t_class=EventType.Class.STATE)
        return await self.main_intent.state_store.has_power_level(self.mxid, user.mxid, evt_type)

    def get_input_entity(
        self, user: au.AbstractUser
    ) -> Awaitable[TypeInputPeer | TypeInputChannel]:
        return user.client.get_input_entity(self.peer)

    async def get_entity(
        self, user: au.AbstractUser, client: MautrixTelegramClient | None = None
    ) -> TypeChat:
        if not client:
            client = user.client
        try:
            return await client.get_entity(self.peer)
        except ValueError:
            if user.is_bot:
                self.log.warning(f"Could not find entity with bot {user.tgid}. Failing...")
                raise
            self.log.warning(
                f"Could not find entity with user {user.tgid}. falling back to get_dialogs."
            )
            async for dialog in client.iter_dialogs():
                if dialog.entity.id == self.tgid:
                    return dialog.entity
            raise

    async def get_invite_link(
        self,
        user: u.User,
        uses: int | None = None,
        expire: datetime | None = None,
        request_needed: bool = False,
        title: str | None = None,
    ) -> str:
        if self.peer_type == "user":
            raise ValueError("You can't invite users to private chats.")
        if self.username:
            return f"https://t.me/{self.username}"
        link = await user.client(
            ExportChatInviteRequest(
                peer=await self.get_input_entity(user),
                expire_date=expire,
                usage_limit=uses,
                request_needed=request_needed,
                title=title,
            )
        )
        return link.link

    # endregion
    # region Matrix room cleanup

    async def get_authenticated_matrix_users(self) -> list[UserID]:
        try:
            members = await self.main_intent.get_room_members(self.mxid)
        except MatrixRequestError:
            return []
        authenticated: list[UserID] = []
        has_bot = self.has_bot
        for member in members:
            if p.Puppet.get_id_from_mxid(member) or member == self.az.bot_mxid:
                continue
            user = await u.User.get_and_start_by_mxid(member)
            authenticated_through_bot = has_bot and user.relaybot_whitelisted
            if authenticated_through_bot or await user.has_full_access(allow_bot=True):
                authenticated.append(user.mxid)
        return authenticated

    async def cleanup_portal(
        self, message: str, puppets_only: bool = False, delete: bool = True
    ) -> None:
        if self.username:
            try:
                await self.main_intent.remove_room_alias(self.alias_localpart)
            except (MatrixRequestError, IntentError):
                self.log.warning("Failed to remove alias when cleaning up room", exc_info=True)
        await self.cleanup_room(self.main_intent, self.mxid, message, puppets_only)
        if delete:
            await self.delete()

    async def delete(self) -> None:
        try:
            del self.by_tgid[self.tgid_full]
        except KeyError:
            pass
        try:
            del self.by_mxid[self.mxid]
        except KeyError:
            pass
        self.name_set = False
        self.avatar_set = False
        self.about = None
        self.next_batch_id = None
        self.first_event_id = None
        self.sponsored_event_id = None
        self.sponsored_event_ts = None
        self.sponsored_msg_random_id = None
        await super().delete()
        await DBMessage.delete_all(self.mxid)
        await DBReaction.delete_all(self.mxid)
        self.deleted = True

    # endregion
    # region Class instance lookup

    async def get_dm_puppet(self) -> p.Puppet | None:
        if not self.is_direct:
            return None
        return await p.Puppet.get_by_tgid(self.tgid)

    async def postinit(self) -> None:
        puppet = await self.get_dm_puppet()
        self._main_intent = puppet.intent_for(self) if self.is_direct else self.az.intent

        if self.tgid:
            self.by_tgid[self.tgid_full] = self
        if self.mxid:
            self.by_mxid[self.mxid] = self

    @classmethod
    async def _yield_portals(
        cls, query: Awaitable[list[DBPortal]]
    ) -> AsyncGenerator[Portal, None]:
        portals = await query
        portal: cls
        for portal in portals:
            try:
                yield cls.by_tgid[portal.tgid_full]
            except KeyError:
                await portal.postinit()
                yield portal

    @classmethod
    def all(cls) -> AsyncGenerator[Portal, None]:
        return cls._yield_portals(super().all())

    @classmethod
    def find_private_chats_of(cls, tg_receiver: TelegramID) -> AsyncGenerator[Portal, None]:
        return cls._yield_portals(super().find_private_chats_of(tg_receiver))

    @classmethod
    def find_private_chats_with(cls, tgid: TelegramID) -> AsyncGenerator[Portal, None]:
        return cls._yield_portals(super().find_private_chats_with(tgid))

    @classmethod
    @async_getter_lock
    async def get_by_mxid(cls, mxid: RoomID, /) -> Portal | None:
        try:
            return cls.by_mxid[mxid]
        except KeyError:
            pass

        portal = cast(cls, await super().get_by_mxid(mxid))
        if portal:
            await portal.postinit()
            return portal

        return None

    @classmethod
    def get_username_from_mx_alias(cls, alias: str) -> str | None:
        return cls.alias_template.parse(alias)

    @classmethod
    async def find_by_username(cls, username: str) -> Portal | None:
        if not username:
            return None

        username = username.lower()

        for _, portal in cls.by_tgid.items():
            if portal.username and portal.username.lower() == username:
                return portal

        portal = cast(cls, await super().find_by_username(username))
        if portal:
            try:
                return cls.by_tgid[portal.tgid_full]
            except KeyError:
                await portal.postinit()
                return portal

        return None

    @classmethod
    @async_getter_lock
    async def get_by_tgid(
        cls, tgid: TelegramID, /, *, tg_receiver: TelegramID | None = None, peer_type: str = None
    ) -> Portal | None:
        if peer_type == "user" and tg_receiver is None:
            raise ValueError('tg_receiver is required when peer_type is "user"')
        tg_receiver = tg_receiver or tgid
        tgid_full = (tgid, tg_receiver)
        try:
            return cls.by_tgid[tgid_full]
        except KeyError:
            pass

        portal = cast(cls, await super().get_by_tgid(tgid, tg_receiver))
        if portal:
            await portal.postinit()
            return portal

        if peer_type:
            cls.log.info(f"Creating portal object for {peer_type} {tgid} (receiver {tg_receiver})")
            # TODO enable this for non-release builds
            #      (or add better wrong peer type error handling)
            # if peer_type == "chat":
            #     import traceback
            #     cls.log.info("Chat portal stack trace:\n" + "".join(traceback.format_stack()))
            portal = cls(tgid, peer_type=peer_type, tg_receiver=tg_receiver)
            await portal.postinit()
            await portal.insert()
            return portal

        return None

    @classmethod
    async def get_by_entity(
        cls,
        entity: TypeChat | TypePeer | TypeUser | TypeUserFull | TypeInputPeer,
        tg_receiver: TelegramID | None = None,
        create: bool = True,
    ) -> Portal | None:
        entity_type = type(entity)
        if entity_type in (Chat, ChatFull):
            type_name = "chat"
            entity_id = entity.id
        elif entity_type in (PeerChat, InputPeerChat):
            type_name = "chat"
            entity_id = entity.chat_id
        elif entity_type in (Channel, ChannelFull):
            type_name = "channel"
            entity_id = entity.id
        elif entity_type in (PeerChannel, InputPeerChannel, InputChannel):
            type_name = "channel"
            entity_id = entity.channel_id
        elif entity_type in (User, UserFull):
            type_name = "user"
            entity_id = entity.id
        elif entity_type in (PeerUser, InputPeerUser, InputUser):
            type_name = "user"
            entity_id = entity.user_id
        else:
            raise ValueError(f"Unknown entity type {entity_type.__name__}")
        return await cls.get_by_tgid(
            TelegramID(entity_id),
            tg_receiver=tg_receiver if type_name == "user" else entity_id,
            peer_type=type_name if create else None,
        )

    # endregion