rasa/core/channels/botframework.py
# -*- coding: utf-8 -*-
import datetime
import json
import logging
import requests
from sanic import Blueprint, response
from sanic.request import Request
from typing import Text, Dict, Any
from rasa.core.channels.channel import UserMessage, OutputChannel, InputChannel
logger = logging.getLogger(__name__)
MICROSOFT_OAUTH2_URL = 'https://login.microsoftonline.com'
MICROSOFT_OAUTH2_PATH = 'botframework.com/oauth2/v2.0/token'
class BotFramework(OutputChannel):
"""A Microsoft Bot Framework communication channel."""
token_expiration_date = datetime.datetime.now()
headers = None
@classmethod
def name(cls):
return "botframework"
def __init__(self,
app_id: Text,
app_password: Text,
conversation: Dict[Text, Any],
bot_id: Text,
service_url: Text) -> None:
self.app_id = app_id
self.app_password = app_password
self.conversation = conversation
self.global_uri = "{}v3/".format(service_url)
self.bot_id = bot_id
async def _get_headers(self):
if BotFramework.token_expiration_date < datetime.datetime.now():
uri = "{}/{}".format(MICROSOFT_OAUTH2_URL, MICROSOFT_OAUTH2_PATH)
grant_type = 'client_credentials'
scope = 'https://api.botframework.com/.default'
payload = {'client_id': self.app_id,
'client_secret': self.app_password,
'grant_type': grant_type,
'scope': scope}
token_response = requests.post(uri, data=payload)
if token_response.ok:
token_data = token_response.json()
access_token = token_data['access_token']
token_expiration = token_data['expires_in']
BotFramework.token_expiration_date = \
datetime.datetime.now() + \
datetime.timedelta(seconds=int(token_expiration))
BotFramework.headers = {"content-type": "application/json",
"Authorization": "Bearer %s" %
access_token}
return BotFramework.headers
else:
logger.error('Could not get BotFramework token')
else:
return BotFramework.headers
async def send(self,
recipient_id: Text,
message_data: Dict[Text, Any]) -> None:
post_message_uri = ('{}conversations/{}/activities'
''.format(self.global_uri, self.conversation['id']))
data = {"type": "message",
"recipient": {
"id": recipient_id
},
"from": self.bot_id,
"channelData": {
"notification": {
"alert": "true"
}
},
"text": ""}
data.update(message_data)
headers = await self._get_headers()
send_response = requests.post(post_message_uri,
headers=headers,
data=json.dumps(data))
if not send_response.ok:
logger.error("Error trying to send botframework messge. "
"Response: %s", send_response.text)
async def send_text_message(self, recipient_id, message):
for message_part in message.split("\n\n"):
text_message = {"text": message_part}
await self.send(recipient_id, text_message)
async def send_image_url(self, recipient_id, image_url):
hero_content = {
'contentType': 'application/vnd.microsoft.card.hero',
'content': {
'images': [{'url': image_url}]
}
}
image_message = {"attachments": [hero_content]}
await self.send(recipient_id, image_message)
async def send_text_with_buttons(self, recipient_id, message, buttons,
**kwargs):
hero_content = {
'contentType': 'application/vnd.microsoft.card.hero',
'content': {
'subtitle': message,
'buttons': buttons
}
}
buttons_message = {"attachments": [hero_content]}
await self.send(recipient_id, buttons_message)
async def send_custom_message(self, recipient_id, elements):
await self.send(recipient_id, elements[0])
class BotFrameworkInput(InputChannel):
"""Bot Framework input channel implementation."""
@classmethod
def name(cls):
return "botframework"
@classmethod
def from_credentials(cls, credentials):
if not credentials:
cls.raise_missing_credentials_exception()
return cls(credentials.get("app_id"), credentials.get("app_password"))
def __init__(self, app_id: Text, app_password: Text) -> None:
"""Create a Bot Framework input channel.
Args:
app_id: Bot Framework's API id
app_password: Bot Framework application secret
"""
self.app_id = app_id
self.app_password = app_password
def blueprint(self, on_new_message):
botframework_webhook = Blueprint('botframework_webhook', __name__)
@botframework_webhook.route("/", methods=['GET'])
async def health(request):
return response.json({"status": "ok"})
@botframework_webhook.route("/webhook", methods=['POST'])
async def webhook(request: Request):
postdata = request.json
try:
if postdata["type"] == "message":
out_channel = BotFramework(self.app_id, self.app_password,
postdata["conversation"],
postdata["recipient"],
postdata["serviceUrl"])
user_msg = UserMessage(postdata["text"], out_channel,
postdata["from"]["id"],
input_channel=self.name())
await on_new_message(user_msg)
else:
logger.info("Not received message type")
except Exception as e:
logger.error("Exception when trying to handle "
"message.{0}".format(e))
logger.debug(e, exc_info=True)
pass
return response.text("success")
return botframework_webhook