tulir/mautrix-telegram

View on GitHub
mautrix_telegram/formatter/from_telegram.py

Summary

Maintainability
B
4 hrs
Test Coverage
# mautrix-telegram - A Matrix-Telegram puppeting bridge
# Copyright (C) 2021 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from html import escape
import logging
import re

from telethon.errors import RPCError
from telethon.helpers import add_surrogate, del_surrogate
from telethon.tl.custom import Message
from telethon.tl.types import (
    Channel,
    InputPeerChannelFromMessage,
    InputPeerUserFromMessage,
    MessageEntityBlockquote,
    MessageEntityBold,
    MessageEntityBotCommand,
    MessageEntityCashtag,
    MessageEntityCode,
    MessageEntityCustomEmoji,
    MessageEntityEmail,
    MessageEntityHashtag,
    MessageEntityItalic,
    MessageEntityMention,
    MessageEntityMentionName,
    MessageEntityPhone,
    MessageEntityPre,
    MessageEntitySpoiler,
    MessageEntityStrike,
    MessageEntityTextUrl,
    MessageEntityUnderline,
    MessageEntityUrl,
    MessageFwdHeader,
    PeerChannel,
    PeerChat,
    PeerUser,
    SponsoredMessage,
    TypeMessageEntity,
    User,
)

from mautrix.types import Format, MessageType, TextMessageEventContent

from .. import abstract_user as au, portal as po, puppet as pu, user as u
from ..db import Message as DBMessage, TelegramFile as DBTelegramFile
from ..tgclient import MautrixTelegramClient
from ..types import TelegramID
from ..util.file_transfer import UnicodeCustomEmoji, transfer_custom_emojis_to_matrix

# Module-level logger; used below to record Telegram formatting conversion failures.
log: logging.Logger = logging.getLogger("mau.fmt.tg")


async def _get_fwd_entity(client: MautrixTelegramClient, evt: Message) -> Channel | User | None:
    """
    Resolve the Telegram entity that a forwarded message originally came from.

    The forward source is first looked up directly with ``client.get_entity``. If that
    fails, a second lookup is attempted with a "from message" input peer that refers to
    the chat and message ID in which the forward was seen.

    Args:
        client: The Telegram client used for the lookups.
        evt: The forwarded message; ``evt.fwd_from.from_id`` is read as the source peer.

    Returns:
        The resolved entity, or ``None`` if both lookups fail or the source peer is
        neither a user nor a channel.
    """
    from_id = evt.fwd_from.from_id
    try:
        return await client.get_entity(from_id)
    except (ValueError, RPCError):
        # Direct lookup failed; fall through to the message-relative lookup.
        pass

    try:
        input_peer = await client.get_input_entity(evt.peer_id)
        if isinstance(from_id, PeerUser):
            return await client.get_entity(
                InputPeerUserFromMessage(peer=input_peer, msg_id=evt.id, user_id=from_id.user_id)
            )
        elif isinstance(from_id, PeerChannel):
            return await client.get_entity(
                InputPeerChannelFromMessage(
                    peer=input_peer, msg_id=evt.id, channel_id=from_id.channel_id
                )
            )
    except (ValueError, RPCError):
        # Best effort only: callers fall back to an "unknown" label when this returns None.
        log.debug(
            "Failed to resolve forward source %s of message %s", from_id, evt.id, exc_info=True
        )
    return None


async def _fwd_user_label(client: MautrixTelegramClient, evt: Message) -> tuple[str, str]:
    """
    Build the plain-text and HTML labels for a message forwarded from a user.

    Sources are tried in order: a bridge user with that Telegram ID, a puppet that
    already has a displayname, and finally the entity resolved through Telegram.

    Returns:
        A ``(text, html)`` pair naming the original sender.
    """
    peer = evt.fwd_from.from_id
    user = await u.User.get_by_tgid(TelegramID(peer.user_id))
    if user:
        name = user.displayname or user.mxid
        if name:
            return name, f"<a href='https://matrix.to/#/{user.mxid}'>{escape(name)}</a>"

    puppet = await pu.Puppet.get_by_peer(peer, create=False)
    # Puppets without a displayname are skipped so the name can be fetched from Telegram.
    if puppet and puppet.displayname:
        name = puppet.displayname
        return name, f"<a href='https://matrix.to/#/{puppet.mxid}'>{escape(name)}</a>"

    entity = await _get_fwd_entity(client, evt)
    if entity:
        name, _ = pu.Puppet.get_displayname(entity, False)
        return name, f"<b>{escape(name)}</b>"
    return "unknown user", "unknown user"


async def _fwd_chat_label(client: MautrixTelegramClient, evt: Message) -> tuple[str, str]:
    """
    Build the plain-text and HTML labels for a message forwarded from a chat or channel.

    A portal with a title is preferred (linked when it has an alias); otherwise the
    entity is resolved through Telegram.

    Returns:
        A ``(text, html)`` pair naming the original chat.
    """
    peer = evt.fwd_from.from_id
    chat_id = peer.chat_id if isinstance(peer, PeerChat) else peer.channel_id
    portal = await po.Portal.get_by_tgid(TelegramID(chat_id))
    if portal and portal.title:
        title = portal.title
        if portal.alias:
            return title, f"<a href='https://matrix.to/#/{portal.alias}'>{escape(title)}</a>"
        return title, f"channel <b>{escape(title)}</b>"

    channel = await _get_fwd_entity(client, evt)
    if channel:
        return f"channel {channel.title}", f"channel <b>{escape(channel.title)}</b>"
    return "unknown channel", "unknown channel"


async def _add_forward_header(
    client: MautrixTelegramClient, content: TextMessageEventContent, evt: Message
) -> None:
    """
    Prefix ``content`` with a "Forwarded from ..." header and quote the original text.

    The plain-text body is turned into ``> ``-quoted lines under a header line, and the
    HTML body is wrapped in ``<tg-forward><blockquote>``. ``content`` is modified in place.

    Args:
        client: The Telegram client used to resolve the forward source when it is not
            already known locally.
        content: The message content to modify.
        evt: The forwarded Telegram message; ``evt.fwd_from`` must be set.
    """
    fwd_from = evt.fwd_from
    if isinstance(fwd_from.from_id, PeerUser):
        fwd_from_text, fwd_from_html = await _fwd_user_label(client, evt)
    elif isinstance(fwd_from.from_id, (PeerChannel, PeerChat)):
        fwd_from_text, fwd_from_html = await _fwd_chat_label(client, evt)
    elif fwd_from.from_name:
        # No peer to look up, only the name carried in the forward header.
        fwd_from_text = fwd_from.from_name
        fwd_from_html = f"<b>{escape(fwd_from.from_name)}</b>"
    else:
        fwd_from_text = fwd_from_html = "unknown source"

    content.ensure_has_html()
    quoted_body = "\n".join(f"> {line}" for line in content.body.split("\n"))
    content.body = f"Forwarded from {fwd_from_text}:\n{quoted_body}"
    content.formatted_body = (
        f"Forwarded message from {fwd_from_html}<br/>"
        f"<tg-forward><blockquote>{content.formatted_body}</blockquote></tg-forward>"
    )


class ReuploadedCustomEmoji(MessageEntityCustomEmoji):
    """A custom emoji entity paired with the file record supplied for it."""

    # File record passed in by the caller when wrapping the original entity.
    file: DBTelegramFile

    def __init__(self, parent: MessageEntityCustomEmoji, file: DBTelegramFile) -> None:
        # Carry over the position and document reference of the entity being wrapped.
        super().__init__(
            offset=parent.offset,
            length=parent.length,
            document_id=parent.document_id,
        )
        self.file = file


async def _convert_custom_emoji(
    source: au.AbstractUser,
    entities: list[TypeMessageEntity],
    client: MautrixTelegramClient | None = None,
) -> None:
    """
    Replace custom emoji entities in ``entities`` with ``ReuploadedCustomEmoji``, in place.

    The document IDs of all custom emoji entities are passed to
    ``transfer_custom_emojis_to_matrix``; each entity whose ID is present in the result
    is swapped for a ``ReuploadedCustomEmoji`` carrying the returned file.

    Args:
        source: The user on whose behalf the emojis are transferred.
        entities: The entity list to update. The list itself is mutated.
        client: Optional Telegram client forwarded to the transfer helper.
    """
    emoji_ids = [
        entity.document_id for entity in entities if isinstance(entity, MessageEntityCustomEmoji)
    ]
    custom_emojis = await transfer_custom_emojis_to_matrix(source, emoji_ids, client=client)
    if not custom_emojis:
        return
    for i, entity in enumerate(entities):
        if not isinstance(entity, MessageEntityCustomEmoji):
            continue
        try:
            reuploaded = custom_emojis[entity.document_id]
        except KeyError:
            # Nothing was returned for this emoji: leave the original entity in place
            # instead of failing the whole message.
            continue
        entities[i] = ReuploadedCustomEmoji(entity, reuploaded)


async def telegram_text_to_matrix_html(
    source: au.AbstractUser,
    text: str,
    entities: list[TypeMessageEntity],
    client: MautrixTelegramClient | None = None,
) -> str:
    """
    Render Telegram message text and its entities as a Matrix HTML string.

    With no entities the text is only HTML-escaped and its newlines become ``<br/>``.
    Otherwise custom emojis are converted first (which updates ``entities`` in place)
    and the text is run through the entity converter, wrapped in Telethon's
    ``add_surrogate``/``del_surrogate`` helpers.
    """
    if entities:
        await _convert_custom_emoji(source, entities, client=client)
        surrogated = add_surrogate(text)
        rendered = await _telegram_entities_to_matrix_catch(surrogated, entities)
        return del_surrogate(rendered)
    # Nothing to format: escaping and line breaks are all that is needed.
    return escape(text).replace("\n", "<br/>")


async def telegram_to_matrix(
    evt: Message | SponsoredMessage,
    source: au.AbstractUser,
    client: MautrixTelegramClient | None = None,
    override_text: str | None = None,
    override_entities: list[TypeMessageEntity] | None = None,
    require_html: bool = False,
) -> TextMessageEventContent:
    """
    Convert a Telegram message into Matrix text message content.

    Args:
        evt: The Telegram message (or sponsored message) to convert.
        source: The user the message was received through; also provides the default
            client.
        client: Telegram client to use. Falls back to ``source.client`` when not given.
        override_text: Text to use instead of ``evt.message`` (ignored when empty).
        override_entities: Entities to use instead of ``evt.entities`` (ignored when
            empty).
        require_html: When true, the returned content always has an HTML body.

    Returns:
        The content with a plain body and, when entities, a forward header or a post
        signature are present (or ``require_html`` is set), an HTML body.
    """
    if not client:
        client = source.client
    content = TextMessageEventContent(
        msgtype=MessageType.TEXT,
        body=override_text or evt.message,
    )
    entities = override_entities or evt.entities
    if entities:
        content.format = Format.HTML
        content.formatted_body = await telegram_text_to_matrix_html(
            source, content.body, entities, client=client
        )

    if require_html:
        content.ensure_has_html()

    # SponsoredMessage may lack fwd_from, hence getattr.
    if getattr(evt, "fwd_from", None):
        await _add_forward_header(client, content, evt)

    if isinstance(evt, Message) and evt.post and evt.post_author:
        content.ensure_has_html()
        content.body += f"\n- {evt.post_author}"
        # The signature is text supplied by Telegram, so it must be escaped before it
        # is embedded in the HTML body.
        content.formatted_body += f"<br/><i>- <u>{escape(evt.post_author)}</u></i>"

    return content


async def _telegram_entities_to_matrix_catch(text: str, entities: list[TypeMessageEntity]) -> str:
    """
    Run the entity converter, substituting a placeholder string if it raises.

    Any exception is logged together with the input text and entities, and a fixed
    marker is returned in place of the HTML.
    """
    try:
        converted = await _telegram_entities_to_matrix(text, entities)
    except Exception:
        log.exception(
            "Failed to convert Telegram format:\nmessage=%s\nentities=%s", text, entities
        )
        return "[failed conversion in _telegram_entities_to_matrix]"
    return converted


def within_surrogate(text: str, index: int) -> bool:
    """
    `True` if ``index`` is within a surrogate (before and after it, not at!).

    In other words, ``index`` points at the low half of a surrogate pair whose high
    half is at ``index - 1``, so cutting the string at ``index`` would split the pair.

    Args:
        text: A string that may contain UTF-16 surrogate code units.
        index: The position to test.

    Returns:
        Whether slicing ``text`` at ``index`` would separate a surrogate pair.
    """
    return (
        # In bounds: a split needs one code unit on each side, so index 1 is the first
        # position that can qualify (a pair occupying indices 0 and 1).
        0 < index < len(text)
        and "\ud800" <= text[index - 1] <= "\udbff"  # previous is high surrogate
        and "\udc00" <= text[index] <= "\udfff"  # current is low surrogate
    )


async def _telegram_entities_to_matrix(
    text: str,
    entities: list[TypeMessageEntity | ReuploadedCustomEmoji],
    offset: int = 0,
    length: int = None,
    in_codeblock: bool = False,
) -> str:
    """
    Convert ``text`` and the Telegram entities that apply to it into Matrix HTML.

    The function walks ``entities`` in list order, emits the plain text between
    entities escaped, and renders each entity by recursing on the entity's own slice
    of ``text`` with the remaining entities, so nested entities end up inside their
    parent's tag.

    Args:
        text: The string being rendered. In recursive calls this is only the slice
            covered by the parent entity.
        entities: Entities to apply. NOTE(review): the early ``break`` below appears
            to assume the list is sorted by offset; confirm with callers. The
            ``length`` attribute of an entity may be increased by this function.
        offset: Position of ``text[0]`` in the coordinate system used by
            ``entity.offset``; subtracted from each entity offset to index ``text``.
        length: Span covered by this call, in the same units as ``offset``.
            Defaults to ``len(text)``.
        in_codeblock: Whether ``text`` sits inside a code entity, in which case
            newlines are kept rather than turned into ``<br/>``.

    Returns:
        The HTML string for ``text``.
    """

    def text_to_html(
        val: str, _in_codeblock: bool = in_codeblock, escape_html: bool = True
    ) -> str:
        # Escape the text (unless it is already HTML) and, outside code blocks,
        # turn newlines into <br/>.
        if escape_html:
            val = escape(val)
        if not _in_codeblock:
            val = val.replace("\n", "<br/>")
        return val

    if not entities:
        return text_to_html(text)
    if length is None:
        length = len(text)
    html = []
    # Index into ``text`` up to which output has already been produced.
    last_offset = 0
    for i, entity in enumerate(entities):
        # Entities starting at or past the end of this span are not handled by this call.
        if entity.offset >= offset + length:
            break
        relative_offset = entity.offset - offset
        if relative_offset > last_offset:
            # Plain text between the previous entity and this one.
            html.append(text_to_html(text[last_offset:relative_offset]))
        elif relative_offset < last_offset:
            # Starts inside text that was already emitted (e.g. handled by the
            # recursive call for an earlier entity), so skip it here.
            continue

        # Avoid cutting a surrogate pair in half at either end of the entity.
        # Note that the second loop modifies the entity object itself.
        while within_surrogate(text, relative_offset):
            relative_offset += 1
        while within_surrogate(text, relative_offset + entity.length):
            entity.length += 1

        # Set when the entity cannot be rendered; its text is then left to be
        # emitted as plain text.
        skip_entity = False
        is_code_entity = isinstance(entity, (MessageEntityCode, MessageEntityPre))
        # Render the entity's contents, including any later entities nested inside it.
        entity_text = await _telegram_entities_to_matrix(
            text=text[relative_offset : relative_offset + entity.length],
            entities=entities[i + 1 :],
            offset=entity.offset,
            length=entity.length,
            in_codeblock=is_code_entity,
        )
        # entity_text is already escaped HTML at this point, hence escape_html=False.
        entity_text = text_to_html(entity_text, is_code_entity, escape_html=False)
        # Exact type comparison is used below; ReuploadedCustomEmoji subclasses
        # MessageEntityCustomEmoji and the two have separate branches.
        entity_type = type(entity)

        if entity_type == MessageEntityBold:
            html.append(f"<strong>{entity_text}</strong>")
        elif entity_type == MessageEntityItalic:
            html.append(f"<em>{entity_text}</em>")
        elif entity_type == MessageEntityUnderline:
            html.append(f"<u>{entity_text}</u>")
        elif entity_type == MessageEntityStrike:
            html.append(f"<del>{entity_text}</del>")
        elif entity_type == MessageEntityBlockquote:
            html.append(f"<blockquote>{entity_text}</blockquote>")
        elif entity_type == MessageEntityCode:
            # Multi-line inline code is promoted to a <pre> block.
            html.append(
                f"<pre><code>{entity_text}</code></pre>"
                if "\n" in entity_text
                else f"<code>{entity_text}</code>"
            )
        elif entity_type == MessageEntityPre:
            skip_entity = _parse_pre(html, entity_text, entity.language)
        elif entity_type == MessageEntityMention:
            skip_entity = await _parse_mention(html, entity_text)
        elif entity_type == MessageEntityMentionName:
            skip_entity = await _parse_name_mention(html, entity_text, TelegramID(entity.user_id))
        elif entity_type == MessageEntityEmail:
            html.append(f"<a href='mailto:{entity_text}'>{entity_text}</a>")
        elif entity_type in (MessageEntityTextUrl, MessageEntityUrl):
            # Only text URLs carry a separate target; plain URLs use their own text.
            await _parse_url(
                html, entity_text, entity.url if entity_type == MessageEntityTextUrl else None
            )
        elif entity_type == MessageEntityCustomEmoji:
            # Custom emoji that was not replaced with a reuploaded version: keep the text.
            html.append(entity_text)
        elif entity_type == ReuploadedCustomEmoji:
            if isinstance(entity.file, UnicodeCustomEmoji):
                html.append(entity.file.emoji)
            else:
                html.append(
                    f"<img data-mx-emoticon data-mau-animated-emoji"
                    f' src="{escape(entity.file.mxc)}" height="32" width="32"'
                    f' alt="{entity_text}" title="{entity_text}"/>'
                )
        elif entity_type in (
            MessageEntityBotCommand,
            MessageEntityHashtag,
            MessageEntityCashtag,
            MessageEntityPhone,
        ):
            html.append(f"<font color='#3771bb'>{entity_text}</font>")
        elif entity_type == MessageEntitySpoiler:
            html.append(f"<span data-mx-spoiler>{entity_text}</span>")
        else:
            # Unknown entity type: emit its text without any markup.
            skip_entity = True
        # For skipped entities the position is not advanced past the entity, so its
        # text is picked up by the next plain-text append.
        last_offset = relative_offset + (0 if skip_entity else entity.length)
    # Trailing plain text after the last handled entity.
    html.append(text_to_html(text[last_offset:]))

    html_string = "".join(html)
    # Remove redundant <br>'s after block tags
    html_string = html_string.replace("</blockquote><br/>", "</blockquote>")
    html_string = html_string.replace("</pre><br/>", "</pre>")
    return html_string


def _parse_pre(html: list[str], entity_text: str, language: str) -> bool:
    if language:
        html.append(f"<pre><code class='language-{language}'>{entity_text}</code></pre>")
    else:
        html.append(f"<pre><code>{entity_text}</code></pre>")
    return False


async def _parse_mention(html: list[str], entity_text: str) -> bool:
    """
    Append a matrix.to link for an ``@username`` mention to ``html``.

    Returns ``True`` when no Matrix target could be found (nothing is appended and the
    caller should treat the entity as skipped), ``False`` otherwise.
    """
    # Drop the leading character of the mention text to get the username.
    handle = entity_text[1:]

    target = None
    room = None
    # Public channels have both Puppet and Portal instances, so the lookup is layered.
    # The intended result is:
    #   user/bot that is a bridge user       -> mention of the real user
    #   user/bot that is a plain ghost       -> mention of the ghost
    #   public channel that has a portal     -> mention of the room
    #   public channel that has no portal    -> mention of the ghost
    #   any other chat                       -> mention of the room
    account = await u.User.find_by_username(handle) or await pu.Puppet.find_by_username(handle)
    if account:
        if isinstance(account, pu.Puppet) and account.is_channel:
            room = await po.Portal.get_by_tgid(account.tgid)
        target = account.mxid
    else:
        room = await po.Portal.find_by_username(handle)
    if room and (room.mxid or not account):
        target = room.alias or room.mxid

    if not target:
        return True
    html.append(f"<a href='https://matrix.to/#/{target}'>{entity_text}</a>")
    return False


async def _parse_name_mention(html: list[str], entity_text: str, user_id: TelegramID) -> bool:
    """
    Append a matrix.to link for a mention that refers to a user by Telegram ID.

    A bridge user with that ID is preferred; otherwise an existing puppet is used.
    Returns ``True`` when neither yields a Matrix ID (nothing is appended), ``False``
    once the link has been added to ``html``.
    """
    user = await u.User.get_by_tgid(user_id)
    if user:
        target = user.mxid
    else:
        ghost = await pu.Puppet.get_by_tgid(user_id, create=False)
        target = ghost.mxid if ghost else None

    if not target:
        return True
    html.append(f"<a href='https://matrix.to/#/{target}'>{entity_text}</a>")
    return False


# Matches links to individual Telegram messages on t.me / telegram.me / t.dog /
# telegram.dog. Group 1 is the chat reference (a username, or "c/" followed by a
# numeric ID) and group 2 is the message ID.
message_link_regex = re.compile(
    r"https?://t(?:elegram)?\.(?:me|dog)"
    # /username or /c/id
    r"/([A-Za-z][A-Za-z0-9_]{3,31}[A-Za-z0-9]|[Cc]/[0-9]{1,20})"
    # /messageid
    r"/([0-9]{1,20})"
)


async def _parse_url(html: list[str], entity_text: str, url: str) -> None:
    """
    Append an ``<a>`` tag for a URL entity to ``html``.

    ``url`` is the explicit link target, if any; without it the entity text itself is
    used as the target. Targets lacking a recognised scheme get ``http://`` prepended.
    Links to a Telegram message that is known to the bridge are rewritten to point at
    the corresponding Matrix event.
    """
    href = escape(url) if url else entity_text
    if not href.startswith(("https://", "http://", "ftp://", "magnet://")):
        href = "http://" + href

    link_match = message_link_regex.match(href)
    if link_match:
        chat_ref, raw_msg_id = link_match.groups()
        tg_msg_id = int(raw_msg_id)

        # "c/<id>" links refer to the chat by numeric ID, anything else by username.
        if chat_ref.lower().startswith("c/"):
            portal = await po.Portal.get_by_tgid(TelegramID(int(chat_ref[2:])))
        else:
            portal = await po.Portal.find_by_username(chat_ref)
        if portal:
            bridged = await DBMessage.get_one_by_tgid(TelegramID(tg_msg_id), portal.tgid)
            if bridged:
                href = f"https://matrix.to/#/{portal.mxid}/{bridged.mxid}"

    html.append(f"<a href='{href}'>{entity_text}</a>")