RasaHQ/rasa_core

View on GitHub
rasa/core/channels/telegram.py

Summary

Maintainability
B
4 hrs
Test Coverage
import asyncio
import logging
from sanic import Blueprint, response
from telegram import (
    Bot, InlineKeyboardButton, Update, InlineKeyboardMarkup,
    KeyboardButton, ReplyKeyboardMarkup)

from rasa.core import constants
from rasa.core.channels import InputChannel
from rasa.core.channels.channel import UserMessage, OutputChannel
from rasa.core.constants import INTENT_MESSAGE_PREFIX, USER_INTENT_RESTART

logger = logging.getLogger(__name__)


class TelegramOutput(Bot, OutputChannel):
    """Output channel for Telegram"""

    @classmethod
    def name(cls):
        return "telegram"

    def __init__(self, access_token):
        super(TelegramOutput, self).__init__(access_token)

    async def send_text_message(self, recipient_id, message):
        for message_part in message.split("\n\n"):
            self.send_message(recipient_id, message_part)

    async def send_image_url(self, recipient_id, image_url):
        self.send_photo(recipient_id, image_url)

    async def send_text_with_buttons(self, recipient_id, text,
                                     buttons, button_type="inline", **kwargs):
        """Sends a message with keyboard.

        For more information: https://core.telegram.org/bots#keyboards

        :button_type inline: horizontal inline keyboard

        :button_type vertical: vertical inline keyboard

        :button_type custom: custom keyboard
        """
        if button_type == "inline":
            button_list = [[InlineKeyboardButton(s["title"],
                                                 callback_data=s["payload"])
                            for s in buttons]]
            reply_markup = InlineKeyboardMarkup(button_list)

        elif button_type == "vertical":
            button_list = [[InlineKeyboardButton(s["title"],
                                                 callback_data=s["payload"])]
                           for s in buttons]
            reply_markup = InlineKeyboardMarkup(button_list)

        elif button_type == "custom":
            button_list = []
            for bttn in buttons:
                if isinstance(bttn, list):
                    button_list.append([KeyboardButton(s['title'])
                                        for s in bttn])
                else:
                    button_list.append([KeyboardButton(bttn["title"])])
            reply_markup = ReplyKeyboardMarkup(button_list,
                                               resize_keyboard=True,
                                               one_time_keyboard=True)
        else:
            logger.error('Trying to send text with buttons for unknown '
                         'button type {}'.format(button_type))
            return

        self.send_message(recipient_id, text, reply_markup=reply_markup)


class TelegramInput(InputChannel):
    """Telegram input channel"""

    @classmethod
    def name(cls):
        return "telegram"

    @classmethod
    def from_credentials(cls, credentials):
        if not credentials:
            cls.raise_missing_credentials_exception()

        return cls(credentials.get("access_token"),
                   credentials.get("verify"),
                   credentials.get("webhook_url"))

    def __init__(self, access_token, verify, webhook_url, debug_mode=True):
        self.access_token = access_token
        self.verify = verify
        self.webhook_url = webhook_url
        self.debug_mode = debug_mode

    @staticmethod
    def _is_location(message):
        return message.location

    @staticmethod
    def _is_user_message(message):
        return message.text

    @staticmethod
    def _is_button(update):
        return update.callback_query

    def blueprint(self, on_new_message):
        telegram_webhook = Blueprint('telegram_webhook', __name__)
        out_channel = TelegramOutput(self.access_token)

        @telegram_webhook.route("/", methods=['GET'])
        async def health(request):
            return response.json({"status": "ok"})

        @telegram_webhook.route("/set_webhook", methods=['GET', 'POST'])
        async def set_webhook(request):
            s = out_channel.setWebhook(self.webhook_url)
            if s:
                logger.info("Webhook Setup Successful")
                return response.text("Webhook setup successful")
            else:
                logger.warning("Webhook Setup Failed")
                return response.text("Invalid webhook")

        @telegram_webhook.route("/webhook", methods=['GET', 'POST'])
        async def message(request):
            if request.method == 'POST':

                if not out_channel.get_me()['username'] == self.verify:
                    logger.debug("Invalid access token, check it "
                                 "matches Telegram")
                    return response.text("failed")

                update = Update.de_json(request.json, out_channel)
                if self._is_button(update):
                    msg = update.callback_query.message
                    text = update.callback_query.data
                else:
                    msg = update.message
                    if self._is_user_message(msg):
                        text = msg.text.replace('/bot', '')
                    elif self._is_location(msg):
                        text = ('{{"lng":{0}, "lat":{1}}}'
                                ''.format(msg.location.longitude,
                                          msg.location.latitude))
                    else:
                        return response.text("success")
                sender_id = msg.chat.id
                try:
                    if text == (INTENT_MESSAGE_PREFIX + USER_INTENT_RESTART):
                        await on_new_message(UserMessage(
                            text, out_channel, sender_id,
                            input_channel=self.name()))
                        await on_new_message(UserMessage(
                            '/start', out_channel, sender_id,
                            input_channel=self.name()))
                    else:
                        await on_new_message(UserMessage(
                            text, out_channel, sender_id,
                            input_channel=self.name()))
                except Exception as e:
                    logger.error("Exception when trying to handle "
                                 "message.{0}".format(e))
                    logger.debug(e, exc_info=True)
                    if self.debug_mode:
                        raise
                    pass

                return response.text("success")

        out_channel.setWebhook(self.webhook_url)
        return telegram_webhook