RasaHQ/rasa_core

View on GitHub
rasa/core/channels/facebook.py

Summary

Maintainability
A
1 hr
Test Coverage
import hashlib
import hmac
import logging
from typing import Text, List, Dict, Any, Callable, Awaitable

from fbmessenger import (MessengerClient, attachments)
from fbmessenger.elements import Text as FBText
from sanic import Blueprint, response

from rasa.core.channels.channel import UserMessage, OutputChannel, InputChannel

logger = logging.getLogger(__name__)


class Messenger:
    """Implement a fbmessenger to parse incoming webhooks and send msgs."""

    @classmethod
    def name(cls):
        return "facebook"

    def __init__(self,
                 page_access_token: Text,
                 on_new_message: Callable[[UserMessage], Awaitable[None]]
                 ) -> None:

        self.on_new_message = on_new_message
        self.client = MessengerClient(page_access_token)
        self.last_message = {}

    def get_user_id(self):
        return self.last_message['sender']['id']

    @staticmethod
    def _is_audio_message(message: Dict[Text, Any]) -> bool:
        """Check if the users message is a recorced voice message."""
        return (message.get('message') and
                message['message'].get('attachments') and
                message['message']['attachments'][0]['type'] == 'audio')

    @staticmethod
    def _is_user_message(message: Dict[Text, Any]) -> bool:
        """Check if the message is a message from the user"""
        return (message.get('message') and
                message['message'].get('text') and
                not message['message'].get("is_echo"))

    async def handle(self, payload):
        for entry in payload['entry']:
            for message in entry['messaging']:
                self.last_message = message
                if message.get('message'):
                    return await self.message(message)
                elif message.get('postback'):
                    return await self.postback(message)

    async def message(self, message: Dict[Text, Any]) -> None:
        """Handle an incoming event from the fb webhook."""

        if self._is_user_message(message):
            text = message['message']['text']
        elif self._is_audio_message(message):
            attachment = message['message']['attachments'][0]
            text = attachment['payload']['url']
        else:
            logger.warning("Received a message from facebook that we can not "
                           "handle. Message: {}".format(message))
            return

        await self._handle_user_message(text, self.get_user_id())

    async def postback(self, message: Dict[Text, Any]) -> None:
        """Handle a postback (e.g. quick reply button)."""

        text = message['postback']['payload']
        await self._handle_user_message(text, self.get_user_id())

    async def _handle_user_message(self, text: Text, sender_id: Text) -> None:
        """Pass on the text to the dialogue engine for processing."""

        out_channel = MessengerBot(self.client)
        user_msg = UserMessage(text, out_channel, sender_id,
                               input_channel=self.name())

        # noinspection PyBroadException
        try:
            await self.on_new_message(user_msg)
        except Exception:
            logger.exception("Exception when trying to handle webhook "
                             "for facebook message.")
            pass


class MessengerBot(OutputChannel):
    """A bot that uses fb-messenger to communicate."""

    @classmethod
    def name(cls):
        return "facebook"

    def __init__(self, messenger_client: MessengerClient) -> None:

        self.messenger_client = messenger_client
        super(MessengerBot, self).__init__()

    def send(self, recipient_id: Text, element: Any) -> None:
        """Sends a message to the recipient using the messenger client."""

        # this is a bit hacky, but the client doesn't have a proper API to
        # send messages but instead expects the incoming sender to be present
        # which we don't have as it is stored in the input channel.
        self.messenger_client.send(element.to_dict(),
                                   {"sender": {"id": recipient_id}},
                                   'RESPONSE')

    async def send_text_message(self, recipient_id: Text,
                                message: Text) -> None:
        """Send a message through this channel."""

        logger.info("Sending message: " + message)

        for message_part in message.split("\n\n"):
            self.send(recipient_id, FBText(text=message_part))

    async def send_image_url(self, recipient_id: Text, image_url: Text) -> None:
        """Sends an image. Default will just post the url as a string."""

        self.send(recipient_id, attachments.Image(url=image_url))

    async def send_text_with_buttons(self, recipient_id: Text, text: Text,
                                     buttons: List[Dict[Text, Any]],
                                     **kwargs: Any) -> None:
        """Sends buttons to the output."""

        # buttons is a list of tuples: [(option_name,payload)]
        if len(buttons) > 3:
            logger.warning(
                "Facebook API currently allows only up to 3 buttons. "
                "If you add more, all will be ignored.")
            await self.send_text_message(recipient_id, text)
        else:
            self._add_postback_info(buttons)

            # Currently there is no predefined way to create a message with
            # buttons in the fbmessenger framework - so we need to create the
            # payload on our own
            payload = {
                "attachment": {
                    "type": "template",
                    "payload": {
                        "template_type": "button",
                        "text": text,
                        "buttons": buttons
                    }
                }
            }
            self.messenger_client.send(payload,
                                       {"sender": {"id": recipient_id}},
                                       'RESPONSE')

    async def send_quick_replies(self,
                                 recipient_id: Text,
                                 text: Text,
                                 quick_replies: List[Dict[Text, Any]],
                                 **kwargs: Any) -> None:
        """Sends quick replies to the output."""

        self._add_text_info(quick_replies)
        self.send(recipient_id, FBText(text=text, quick_replies=quick_replies))

    async def send_custom_message(self, recipient_id: Text,
                                  elements: List[Dict[Text, Any]]) -> None:
        """Sends elements to the output."""

        for element in elements:
            self._add_postback_info(element['buttons'])

        payload = {
            "attachment": {
                "type": "template",
                "payload": {
                    "template_type": "generic",
                    "elements": elements
                }
            }
        }
        self.messenger_client.send(payload,
                                   self._recipient_json(recipient_id),
                                   'RESPONSE')

    @staticmethod
    def _add_text_info(quick_replies: List[Dict[Text, Any]]) -> None:
        """Set quick reply type to text for all buttons without content type.

        Happens in place."""

        for quick_reply in quick_replies:
            if not quick_reply.get('type'):
                quick_reply['content_type'] = "text"

    @staticmethod
    def _add_postback_info(buttons: List[Dict[Text, Any]]) -> None:
        """Make sure every button has a type. Modifications happen in place."""
        for button in buttons:
            if 'type' not in button:
                button['type'] = "postback"

    @staticmethod
    def _recipient_json(recipient_id: Text) -> Dict[Text, Dict[Text, Text]]:
        """Generate the response json for the recipient expected by FB."""
        return {"sender": {"id": recipient_id}}


class FacebookInput(InputChannel):
    """Facebook input channel implementation. Based on the HTTPInputChannel."""

    @classmethod
    def name(cls):
        return "facebook"

    @classmethod
    def from_credentials(cls, credentials):
        if not credentials:
            cls.raise_missing_credentials_exception()

        return cls(credentials.get("verify"),
                   credentials.get("secret"),
                   credentials.get("page-access-token"))

    def __init__(self, fb_verify: Text, fb_secret: Text,
                 fb_access_token: Text) -> None:
        """Create a facebook input channel.

        Needs a couple of settings to properly authenticate and validate
        messages. Details to setup:

        https://github.com/rehabstudio/fbmessenger#facebook-app-setup

        Args:
            fb_verify: FB Verification string
                (can be chosen by yourself on webhook creation)
            fb_secret: facebook application secret
            fb_access_token: access token to post in the name of the FB page
        """
        self.fb_verify = fb_verify
        self.fb_secret = fb_secret
        self.fb_access_token = fb_access_token

    def blueprint(self, on_new_message):

        fb_webhook = Blueprint('fb_webhook', __name__)

        @fb_webhook.route("/", methods=['GET'])
        async def health(request):
            return response.json({"status": "ok"})

        @fb_webhook.route("/webhook", methods=['GET'])
        async def token_verification(request):
            if request.raw_args.get("hub.verify_token") == self.fb_verify:
                return request.raw_args.get("hub.challenge")
            else:
                logger.warning(
                    "Invalid fb verify token! Make sure this matches "
                    "your webhook settings on the facebook app.")
                return response.text("failure, invalid token")

        @fb_webhook.route("/webhook", methods=['POST'])
        async def webhook(request):
            signature = request.headers.get("X-Hub-Signature") or ''
            if not self.validate_hub_signature(self.fb_secret, request.data,
                                               signature):
                logger.warning("Wrong fb secret! Make sure this matches the "
                               "secret in your facebook app settings")
                return response.text("not validated")

            messenger = Messenger(self.fb_access_token, on_new_message)

            await messenger.handle(request.json)
            return response.text("success")

        return fb_webhook

    @staticmethod
    def validate_hub_signature(app_secret, request_payload,
                               hub_signature_header):
        """Make sure the incoming webhook requests are properly signed.

        Args:
            app_secret: Secret Key for application
            request_payload: request body
            hub_signature_header: X-Hub-Signature header sent with request

        Returns:
            bool: indicated that hub signature is validated
        """

        # noinspection PyBroadException
        try:
            hash_method, hub_signature = hub_signature_header.split('=')
        except Exception:
            pass
        else:
            digest_module = getattr(hashlib, hash_method)
            hmac_object = hmac.new(
                bytearray(app_secret, 'utf8'),
                request_payload, digest_module)
            generated_hash = hmac_object.hexdigest()
            if hub_signature == generated_hash:
                return True
        return False