djudman/evernote-telegram-bot

View on GitHub
evernotebot/bot/mixins/message.py

Summary

Maintainability
A
1 hr
Test Coverage
import math
from os.path import basename, join
from urllib.parse import urlparse

from evernotebot.bot.errors import EvernoteBotException
from evernotebot.bot.mixins import EvernoteMixin
from evernotebot.util.http import make_request


def get_message_text(message: dict, start: int = 0, end: int = None) -> str:
    text = message.get('text', '')
    text = text.encode('utf-16')
    text = text[2:]  # skip BOM
    start = start * 2 if start is not None else None  # 2 bytes per symbol
    end = end * 2 if end is not None else None
    text = text[start:end].decode('utf-16')
    text = text.replace('&', '&')
    text = text.replace('>', '>')
    text = text.replace('<', '&lt;')
    text = text.replace('\n', '<br />')
    return text


def format_html(message: dict) -> str:
    entities = message.get('entities', [])
    if not entities:
        return get_message_text(message)
    pointer = 0
    strings = []
    for entity in entities:
        offset = entity.get('offset')
        text = get_message_text(message, pointer, offset)
        strings.append(text)
        start, end = offset, offset + entity['length']
        if start < pointer:
            continue
        string = get_message_text(message, start, end)
        type_label = entity['type']
        if type_label == 'text_link':
            url = entity['url']
            html = f'<a href="{url}">{string}</a>'
        elif type_label == 'pre':
            html = f'<pre>{string}</pre>'
        elif type_label == 'bold':
            html = f'<b>{string}</b>'
        elif type_label == 'italic':
            html = f'<i>{string}</i>'
        elif type_label == 'underline':
            html = f'<u>{string}</u>'
        elif type_label == 'strikethrough':
            html = f'<s>{string}</s>'
        else:
            html = string
        strings.append(html)
        pointer = end
    strings.append(get_message_text(message, pointer))
    text = ''.join(strings)
    text = '<br />'.join(text.split('\n'))
    return text


def get_message_caption(message: dict) -> str:
    if user := message.get('forward_from'):
        parts = filter(lambda x: x, (user['first_name'], user['last_name']))
        name = ' '.join(parts)
        if username := user.get('username'):
            name += f' {username}'
        return f'Forwarded from {name}'
    if chat := message.get('forward_from_chat'):
        name = chat.get('title', '')
        return f'Forwarded from {chat["type"]} {name}'
    if sender_name := message.get('forward_sender_name'):
        return f'Forwarded from {sender_name}'
    return message.get('caption')


def get_telegram_link(message: dict) -> str:
    if chat := message.get('forward_from_chat'):
        username = chat.get('username')
        # TODO: get link to message
        # https://t.me/c/1229931078/892
        # https://t.me/c/<forward_from_chat.id>/<forward_from_message_id>
        if not username:
            return 'https://t.me'
        message_id = message['forward_from_message_id']
        return f'https://t.me/{username}/{message_id}'


class MessageHandlerMixin(EvernoteMixin):
    async def on_receive_text(self, message: dict):
        html = format_html(message)
        telegram_link = get_telegram_link(message)
        if telegram_link:
            html = f'<div><p><a href="{telegram_link}">{telegram_link}</a></p>{html}</div>'
        title = get_message_caption(message) or '[Telegram bot]'
        await self.save_note('', title=title, html=html)

    async def on_receive_photo(self, message: dict):
        max_size = 20 * 1024 * 1024  # telegram restriction. We can't download any file that has size more than 20Mb
        file_id = None
        file_size = math.inf
        for photo in message['photo']:  # pick the biggest file
            if photo['file_size'] <= max_size and \
                    (file_size == math.inf or file_size < photo['file_size']):
                file_size = photo['file_size']
                file_id = photo['file_id']
        await self.save_file(file_id, file_size, message)

    async def on_receive_video(self, message: dict):
        file_size = message['video']['file_size']
        file_id = message['video']['file_id']
        await self.save_file(file_id, file_size, message)

    async def on_receive_document(self, message: dict):
        file_size = message['document']['file_size']
        file_id = message['document']['file_id']
        await self.save_file(file_id, file_size, message)

    async def on_receive_voice(self, message: dict):
        file_id = message['voice']['file_id']
        file_size = message['voice']['file_size']
        await self.save_file(file_id, file_size, message)

    async def on_receive_location(self, message: dict):
        latitude = message['location']['latitude']
        longitude = message['location']['longitude']
        maps_url = f'https://maps.google.com/maps?q={latitude},{longitude}'
        title = 'Location'
        html = f'<a href="{maps_url}">{maps_url}</a>'
        if venue := message.get('venue'):
            title = venue.get(title) or title
            address = venue['address']
            html = f'{title}<br />{address}<br /><a href="{maps_url}">{maps_url}</a>'
            if foursquare_id := venue.get('foursquare_id'):
                url = f'https://foursquare.com/v/{foursquare_id}'
                html += f'<br /><a href="{url}">{url}</a>'
        title = get_message_caption(message) or title
        await self.save_note(title=title, html=html)

    async def save_file(self, file_id: str, file_size: int, message: dict):
        download_dir = self.config['tmp_root']
        filename, short_name = await self.download_telegram_file(file_id, file_size, download_dir)
        await self.evernote_check_quota(file_size)
        message_text = message.get('text')
        title = get_message_caption(message) or (message_text and message_text[:20]) or 'File'
        files = ({'path': filename, 'name': short_name},)
        text = ''
        telegram_link = get_telegram_link(message)
        if telegram_link:
            caption = message.get('caption') or 'File'
            text = f'<div><p><a href="{telegram_link}">{telegram_link}</a></p><pre>{caption}</pre></div>'
        await self.save_note('', title=title, files=files, html=text)

    async def download_telegram_file(self, file_id: str, file_size: int, dirpath: str):
        max_size = 20 * 1024 * 1024
        if file_size > max_size:
            raise EvernoteBotException('File too big. Telegram does not allow to the bot to download files over 20Mb.')
        download_url = await self.api.getFile(file_id)
        data = make_request(download_url)
        short_name = basename(urlparse(download_url).path)
        filepath = join(dirpath, f'{file_id}_{short_name}')
        with open(filepath, 'wb') as f:
            f.write(data)
        return filepath, short_name