errbotio/errbot

View on GitHub
errbot/backends/telegram_messenger.py

Summary

Maintainability
C
1 day
Test Coverage
import logging
import sys
from typing import Any, BinaryIO, List, Optional, Union

from errbot.backends.base import (
    ONLINE,
    Identifier,
    Message,
    Person,
    Room,
    RoomError,
    RoomOccupant,
    Stream,
)
from errbot.core import ErrBot
from errbot.rendering import text
from errbot.rendering.ansiext import TEXT_CHRS, enable_format

log = logging.getLogger(__name__)

UPDATES_OFFSET_KEY = "_telegram_updates_offset"

try:
    import telegram
except ImportError:
    log.exception("Could not start the Telegram back-end")
    log.fatal(
        "You need to install the telegram support in order "
        "to use the Telegram backend.\n"
        "You should be able to install this package using:\n"
        "pip install errbot[telegram]"
    )
    sys.exit(1)


class RoomsNotSupportedError(RoomError):
    def __init__(self, message: Optional[str] = None):
        if message is None:
            message = (
                "Room operations are not supported on Telegram. "
                "While Telegram itself has groupchat functionality, it does not "
                "expose any APIs to bots to get group membership or otherwise "
                "interact with groupchats."
            )
        super().__init__(message)


class TelegramBotFilter:
    """
    This is a filter for the logging library that filters the
    "No new updates found." log message generated by telegram.bot.

    This is an INFO-level log message that gets logged for every
    getUpdates() call where there are no new messages, so is way
    too verbose.
    """

    @staticmethod
    def filter(record):
        if record.getMessage() == "No new updates found.":
            return 0


class TelegramIdentifier(Identifier):
    def __init__(self, id):
        self._id = str(id)

    @property
    def id(self) -> str:
        return self._id

    def __unicode__(self):
        return str(self._id)

    def __eq__(self, other):
        return self._id == other.id

    __str__ = __unicode__

    aclattr = id


class TelegramPerson(TelegramIdentifier, Person):
    def __init__(self, id, first_name=None, last_name=None, username=None):
        super().__init__(id)
        self._first_name = first_name
        self._last_name = last_name
        self._username = username

    @property
    def id(self) -> str:
        return self._id

    @property
    def first_name(self) -> str:
        return self._first_name

    @property
    def last_name(self) -> str:
        return self._last_name

    @property
    def fullname(self) -> str:
        fullname = self.first_name
        if self.last_name is not None:
            fullname += " " + self.last_name
        return fullname

    @property
    def username(self) -> str:
        return self._username

    @property
    def client(self) -> None:
        return None

    person = id
    nick = username


class TelegramRoom(TelegramIdentifier, Room):
    def __init__(self, id, title=None):
        super().__init__(id)
        self._title = title

    @property
    def id(self) -> str:
        return self._id

    @property
    def title(self):
        """Return the groupchat title (only applies to groupchats)"""
        return self._title

    def join(self, username: str = None, password: str = None) -> None:
        raise RoomsNotSupportedError()

    def create(self) -> None:
        raise RoomsNotSupportedError()

    def leave(self, reason: str = None) -> None:
        raise RoomsNotSupportedError()

    def destroy(self) -> None:
        raise RoomsNotSupportedError()

    @property
    def joined(self) -> None:
        raise RoomsNotSupportedError()

    @property
    def exists(self) -> None:
        raise RoomsNotSupportedError()

    @property
    def topic(self) -> None:
        raise RoomsNotSupportedError()

    @property
    def occupants(self) -> None:
        raise RoomsNotSupportedError()

    def invite(self, *args) -> None:
        raise RoomsNotSupportedError()


class TelegramMUCOccupant(TelegramPerson, RoomOccupant):
    """
    This class represents a person inside a MUC.
    """

    def __init__(
        self, id, room: TelegramRoom, first_name=None, last_name=None, username=None
    ):
        super().__init__(
            id=id, first_name=first_name, last_name=last_name, username=username
        )
        self._room = room

    @property
    def room(self) -> TelegramRoom:
        return self._room

    @property
    def username(self) -> str:
        return self._username


class TelegramBackend(ErrBot):
    def __init__(self, config):
        super().__init__(config)
        logging.getLogger("telegram.bot").addFilter(TelegramBotFilter())

        identity = config.BOT_IDENTITY
        self.token = identity.get("token", None)
        if not self.token:
            log.fatal(
                "You need to supply a token for me to use. You can obtain "
                "a token by registering your bot with the Bot Father (@BotFather)"
            )
            sys.exit(1)
        self.telegram = None  # Will be initialized in serve_once
        self.bot_instance = None  # Will be set in serve_once

        compact = config.COMPACT_OUTPUT if hasattr(config, "COMPACT_OUTPUT") else False
        enable_format("text", TEXT_CHRS, borders=not compact)
        self.md_converter = text()

    def set_message_size_limit(self, limit: int = 1024, hard_limit: int = 1024) -> None:
        """
        Telegram message size limit
        """
        super().set_message_size_limit(limit, hard_limit)

    def serve_once(self) -> None:
        log.info("Initializing connection")
        try:
            self.telegram = telegram.Bot(token=self.token)
            me = self.telegram.getMe()
        except telegram.TelegramError as e:
            log.error("Connection failure: %s", e.message)
            return False

        self.bot_identifier = TelegramPerson(
            id=me.id,
            first_name=me.first_name,
            last_name=me.last_name,
            username=me.username,
        )

        log.info("Connected")
        self.reset_reconnection_count()
        self.connect_callback()

        try:
            offset = self[UPDATES_OFFSET_KEY]
        except KeyError:
            offset = 0

        try:
            while True:
                log.debug("Getting updates with offset %s", offset)
                for update in self.telegram.getUpdates(offset=offset, timeout=60):
                    offset = update.update_id + 1
                    self[UPDATES_OFFSET_KEY] = offset
                    log.debug("Processing update: %s", update)
                    if not hasattr(update, "message"):
                        log.warning("Unknown update type (no message present)")
                        continue
                    try:
                        self._handle_message(update.message)
                    except Exception:
                        log.exception("An exception occurred while processing update")
                log.debug("All updates processed, new offset is %s", offset)
        except KeyboardInterrupt:
            log.info("Interrupt received, shutting down..")
            return True
        except Exception:
            log.exception("Error reading from Telegram updates stream:")
        finally:
            log.debug("Triggering disconnect callback")
            self.disconnect_callback()

    def _handle_message(self, message: Message) -> None:
        """
        Handle a received message.

        :param message:
            A message with a structure as defined at
            https://core.telegram.org/bots/api#message
        """
        if message.text is None:
            log.warning("Unhandled message type (not a text message) ignored")
            return

        message_instance = self.build_message(message.text)
        if message.chat["type"] == "private":
            message_instance.frm = TelegramPerson(
                id=message.from_user.id,
                first_name=message.from_user.first_name,
                last_name=message.from_user.last_name,
                username=message.from_user.username,
            )
            message_instance.to = self.bot_identifier
        else:
            room = TelegramRoom(id=message.chat.id, title=message.chat.title)
            message_instance.frm = TelegramMUCOccupant(
                id=message.from_user.id,
                room=room,
                first_name=message.from_user.first_name,
                last_name=message.from_user.last_name,
                username=message.from_user.username,
            )
            message_instance.to = room
        message_instance.extras["message_id"] = message.message_id
        self.callback_message(message_instance)

    def send_message(self, msg: Message) -> None:
        super().send_message(msg)
        body = self.md_converter.convert(msg.body)
        try:
            self.telegram.sendMessage(msg.to.id, body)
        except Exception:
            log.exception(
                f"An exception occurred while trying to send the following message to {msg.to.id}: {msg.body}"
            )
            raise

    def change_presence(self, status: str = ONLINE, message: str = "") -> None:
        # It looks like telegram doesn't supports online presence for privacy reason.
        pass

    def build_identifier(self, txtrep: str) -> Union[TelegramPerson, TelegramRoom]:
        """
        Convert a textual representation into a :class:`~TelegramPerson` or :class:`~TelegramRoom`.
        """
        log.debug("building an identifier from %s.", txtrep)
        if not self._is_numeric(txtrep):
            raise ValueError("Telegram identifiers must be numeric.")
        id_ = int(txtrep)
        if id_ > 0:
            return TelegramPerson(id=id_)
        else:
            return TelegramRoom(id=id_)

    def build_reply(
        self,
        msg: Message,
        text: Optional[str] = None,
        private: bool = False,
        threaded: bool = False,
    ) -> Message:
        response = self.build_message(text)
        response.frm = self.bot_identifier
        if private:
            response.to = msg.frm
        else:
            response.to = msg.frm if msg.is_direct else msg.to
        return response

    @property
    def mode(self) -> text:
        return "telegram"

    def query_room(self, room: TelegramRoom) -> None:
        """
        Not supported on Telegram.

        :raises: :class:`~RoomsNotSupportedError`
        """
        raise RoomsNotSupportedError()

    def rooms(self) -> None:
        """
        Not supported on Telegram.

        :raises: :class:`~RoomsNotSupportedError`
        """
        raise RoomsNotSupportedError()

    def prefix_groupchat_reply(self, message: Message, identifier: Identifier):
        super().prefix_groupchat_reply(message, identifier)
        message.body = f"@{identifier.nick}: {message.body}"

    def _telegram_special_message(
        self, chat_id: Any, content: Any, msg_type: str, **kwargs
    ) -> telegram.Message:
        """Send special message."""
        if msg_type == "document":
            msg = self.telegram.sendDocument(
                chat_id=chat_id, document=content, **kwargs
            )
        elif msg_type == "photo":
            msg = self.telegram.sendPhoto(chat_id=chat_id, photo=content, **kwargs)

        elif msg_type == "audio":
            msg = self.telegram.sendAudio(chat_id=chat_id, audio=content, **kwargs)

        elif msg_type == "video":
            msg = self.telegram.sendVideo(chat_id=chat_id, video=content, **kwargs)
        elif msg_type == "sticker":
            msg = self.telegram.sendSticker(chat_id=chat_id, sticker=content, **kwargs)
        elif msg_type == "location":
            msg = self.telegram.sendLocation(
                chat_id=chat_id,
                latitude=kwargs.pop("latitude", ""),
                longitude=kwargs.pop("longitude", ""),
                **kwargs,
            )
        else:
            raise ValueError(
                f"Expected a valid choice for `msg_type`, got: {msg_type}."
            )
        return msg

    def _telegram_upload_stream(self, stream: Stream, **kwargs) -> None:
        """Perform upload defined in a stream."""
        msg = None
        try:
            stream.accept()
            msg = self._telegram_special_message(
                chat_id=stream.identifier.id,
                content=stream.raw,
                msg_type=stream.stream_type,
                **kwargs,
            )
        except Exception:
            log.exception(f"Upload of {stream.name} to {stream.identifier} failed.")
        else:
            if msg is None:
                stream.error()
            else:
                stream.success()

    def send_stream_request(
        self,
        identifier: Union[TelegramPerson, TelegramMUCOccupant],
        fsource: Union[str, dict, BinaryIO],
        name: Optional[str] = "file",
        size: Optional[int] = None,
        stream_type: Optional[str] = None,
    ) -> Union[str, Stream]:
        """Starts a file transfer.

        :param identifier: TelegramPerson or TelegramMUCOccupant
            Identifier of the Person or Room to send the stream to.

        :param fsource: str, dict or binary data
            File URL or binary content from a local file.
            Optionally a dict with binary content plus metadata can be given.
            See `stream_type` for more details.

        :param name: str, optional
            Name of the file. Not sure if this works always.

        :param size: str, optional
            Size of the file obtained with os.path.getsize.
            This is only used for debug logging purposes.

        :param stream_type: str, optional
            Type of the stream. Choices: 'document', 'photo', 'audio', 'video', 'sticker', 'location'.

            If 'video', a dict is optional as {'content': fsource, 'duration': str}.
            If 'voice', a dict is optional as {'content': fsource, 'duration': str}.
            If 'audio', a dict is optional as {'content': fsource, 'duration': str, 'performer': str, 'title': str}.

            For 'location' a dict is mandatory as {'latitude': str, 'longitude': str}.
            For 'venue': TODO # see: https://core.telegram.org/bots/api#sendvenue

        :return stream: str or Stream
            If `fsource` is str will return str, else return Stream.
        """

        def _telegram_metadata(fsource):
            if isinstance(fsource, dict):
                return fsource.pop("content"), fsource
            else:
                return fsource, None

        def _is_valid_url(url) -> bool:
            try:
                from urlparse import urlparse
            except Exception:
                from urllib.parse import urlparse

            return bool(urlparse(url).scheme)

        content, meta = _telegram_metadata(fsource)
        if isinstance(content, str):
            if not _is_valid_url(content):
                raise ValueError(f"Not valid URL: {content}")

            self._telegram_special_message(
                chat_id=identifier.id, content=content, msg_type=stream_type, **meta
            )
            log.debug(
                "Requesting upload of %s to %s (size hint: %d, stream type: %s).",
                name,
                identifier.username,
                size,
                stream_type,
            )

            stream = content
        else:
            stream = Stream(identifier, content, name, size, stream_type)
            log.debug(
                "Requesting upload of %s to %s (size hint: %d, stream type: %s)",
                name,
                identifier,
                size,
                stream_type,
            )
            self.thread_pool.apply_async(self._telegram_upload_stream, (stream,))

        return stream

    @staticmethod
    def _is_numeric(input_) -> bool:
        """Return true if input is a number"""
        try:
            int(input_)
            return True
        except ValueError:
            return False