rasa/server.py
import asyncio
import concurrent.futures
import logging
import multiprocessing
import os
import traceback
from collections import defaultdict
from functools import reduce, wraps
from inspect import isawaitable
from pathlib import Path
from http import HTTPStatus
from typing import (
Any,
Callable,
DefaultDict,
List,
Optional,
Text,
Union,
Dict,
TYPE_CHECKING,
NoReturn,
Coroutine,
)
import aiohttp
import jsonschema
from sanic import Sanic, response
from sanic.request import Request
from sanic.response import HTTPResponse
from sanic_cors import CORS
from sanic_jwt import Initialize, exceptions
import rasa
import rasa.core.utils
from rasa.nlu.emulators.emulator import Emulator
import rasa.utils.common
import rasa.shared.utils.common
import rasa.shared.utils.io
import rasa.shared.utils.validation
import rasa.shared.nlu.training_data.schemas.data_schema
import rasa.utils.endpoints
import rasa.utils.io
from rasa.shared.core.training_data.story_writer.yaml_story_writer import (
YAMLStoryWriter,
)
from rasa.shared.importers.importer import TrainingDataImporter
from rasa.shared.nlu.training_data.formats import RasaYAMLReader
from rasa.core.constants import DEFAULT_RESPONSE_TIMEOUT
from rasa.constants import MINIMUM_COMPATIBLE_VERSION
from rasa.shared.constants import (
DOCS_URL_TRAINING_DATA,
DOCS_BASE_URL,
DEFAULT_SENDER_ID,
DEFAULT_MODELS_PATH,
TEST_STORIES_FILE_PREFIX,
)
from rasa.shared.core.domain import InvalidDomain, Domain
from rasa.core.agent import Agent
from rasa.core.channels.channel import (
CollectingOutputChannel,
OutputChannel,
UserMessage,
)
import rasa.shared.core.events
from rasa.shared.core.events import Event
from rasa.core.test import test
from rasa.utils.common import TempDirectoryPath, get_temp_dir_name
from rasa.shared.core.trackers import (
DialogueStateTracker,
EventVerbosity,
)
from rasa.core.utils import AvailableEndpoints
from rasa.nlu.emulators.no_emulator import NoEmulator
import rasa.nlu.test
from rasa.nlu.test import CVEvaluationResult
from rasa.shared.utils.schemas.events import EVENTS_SCHEMA
from rasa.utils.endpoints import EndpointConfig
if TYPE_CHECKING:
    # These names are only needed by static type checkers; they are referenced
    # in this module through string annotations (e.g. `"SanicView"`).
    from ssl import SSLContext
    from rasa.core.processor import MessageProcessor
    from mypy_extensions import Arg, VarArg, KwArg

    # A handler result: either a response or a coroutine resolving to one.
    SanicResponse = Union[
        response.HTTPResponse, Coroutine[Any, Any, response.HTTPResponse]
    ]
    # A view: takes the request first, then arbitrary positional/keyword arguments.
    SanicView = Callable[
        [Arg(Request, "request"), VarArg(), KwArg()],
        Coroutine[Any, Any, SanicResponse],
    ]

logger = logging.getLogger(__name__)

# Content type values compared against / used for request and response payloads.
JSON_CONTENT_TYPE = "application/json"
YAML_CONTENT_TYPE = "application/x-yaml"

# NOTE(review): presumably the query parameter selecting the output channel and
# its special "latest" value; their use is not visible in this part of the file.
OUTPUT_CHANNEL_QUERY_KEY = "output_channel"
USE_LATEST_INPUT_CHANNEL_AS_OUTPUT_CHANNEL = "latest"
# Boolean query parameter read by the "append events" endpoint.
EXECUTE_SIDE_EFFECTS_QUERY_KEY = "execute_side_effects"
class ErrorResponse(Exception):
    """Common exception to handle failing API requests.

    Instances carry `error_info` (the payload describing the failure) and
    `status` (the HTTP status code that was passed in).
    """

    def __init__(
        self,
        status: Union[int, HTTPStatus],
        reason: Text,
        message: Text,
        details: Any = None,
        help_url: Optional[Text] = None,
    ) -> None:
        """Creates error.

        The message is logged at error level as a side effect of construction.

        Args:
            status: The HTTP status code to return.
            reason: Short summary of the error.
            message: Detailed explanation of the error.
            details: Additional details which describe the error. Must be serializable.
            help_url: URL where users can get further help (e.g. docs).
        """
        self.error_info = {
            "version": rasa.__version__,
            "status": "failure",
            "message": message,
            "reason": reason,
            # Falsy `details` (e.g. `None`) are reported as an empty mapping.
            "details": details or {},
            "help": help_url,
            "code": status,
        }
        self.status = status
        logger.error(message)
        # Hand the message to `Exception` so that `str(error)` is meaningful,
        # e.g. when the error is interpolated into another message or logged.
        super().__init__(message)
def _docs(sub_url: Text) -> Text:
    """Return the docs base URL with `sub_url` appended to it."""
    full_url = DOCS_BASE_URL + sub_url
    return full_url
def ensure_loaded_agent(
    app: Sanic, require_core_is_ready: bool = False
) -> Callable[[Callable], Callable[..., Any]]:
    """Builds a decorator which only runs a handler if a usable agent is loaded.

    The wrapped handler is called when `app.ctx.agent` is set and reports
    `is_ready()`; otherwise a 409 `ErrorResponse` is raised.

    Args:
        app: The Sanic app whose `ctx.agent` is inspected on every call.
        require_core_is_ready: Accepted for callers, but not read by this
            function.

    Returns:
        The decorator to apply to a request handler.
    """

    def decorator(f: Callable) -> Callable:
        @wraps(f)
        def decorated(*args: Any, **kwargs: Any) -> Any:
            # noinspection PyUnresolvedReferences
            agent = app.ctx.agent
            if agent and agent.is_ready():
                return f(*args, **kwargs)

            raise ErrorResponse(
                HTTPStatus.CONFLICT,
                "Conflict",
                "No agent loaded. To continue processing, a "
                "model of a trained agent needs to be loaded.",
                help_url=_docs("/user-guide/configuring-http-api/"),
            )

        return decorated

    return decorator
def ensure_conversation_exists() -> Callable[["SanicView"], "SanicView"]:
    """Builds a decorator which rejects requests for unknown conversations.

    The conversation ID is read from the `conversation_id` keyword argument and
    looked up in the agent's tracker store; a 404 `ErrorResponse` is raised if
    the store does not know it.
    """

    def decorator(f: "SanicView") -> "SanicView":
        @wraps(f)
        async def decorated(
            request: Request, *args: Any, **kwargs: Any
        ) -> "SanicResponse":
            conversation_id = kwargs["conversation_id"]
            tracker_store = request.app.ctx.agent.tracker_store

            if not await tracker_store.exists(conversation_id):
                raise ErrorResponse(
                    HTTPStatus.NOT_FOUND, "Not found", "Conversation ID not found."
                )

            return await f(request, *args, **kwargs)

        return decorated

    return decorator
def requires_auth(
    app: Sanic, token: Optional[Text] = None
) -> Callable[["SanicView"], "SanicView"]:
    """Wraps a request handler with token authentication.

    A request is let through when one of the following holds:

    * `token` is configured and the `token` query parameter matches it,
    * JWT is enabled (`USE_JWT` in the app config), the request is
      authenticated and the JWT payload grants sufficient scope,
    * neither a token nor JWT is configured (authentication disabled).

    Args:
        app: The Sanic app whose config and auth extension are consulted.
        token: The static token which grants access, if any.

    Returns:
        The decorator to apply to a request handler.

    Raises:
        ErrorResponse: 403 if the JWT user lacks permissions, 401 if the
            request is not authenticated at all (raised when handling a request).
    """
    # Function-scope import, like the other local imports in this module.
    import hmac

    def token_matches(provided: Optional[Text]) -> bool:
        """Compares the supplied token to the configured one in constant time."""
        if token is None or provided is None:
            return False
        # `compare_digest` does not short-circuit on the first differing
        # character, so response timing does not reveal how much of the token
        # was guessed. Bytes are compared because `compare_digest` rejects
        # non-ASCII `str` values; `surrogatepass` keeps encoding from raising.
        return hmac.compare_digest(
            provided.encode("utf-8", "surrogatepass"),
            token.encode("utf-8", "surrogatepass"),
        )

    def decorator(f: "SanicView") -> "SanicView":
        def conversation_id_from_args(args: Any, kwargs: Any) -> Optional[Text]:
            argnames = rasa.shared.utils.common.arguments_of(f)

            try:
                sender_id_arg_idx = argnames.index("conversation_id")
                if "conversation_id" in kwargs:  # try to fetch from kwargs first
                    return kwargs["conversation_id"]
                # NOTE(review): the index refers to `f`'s parameter list while
                # `args` holds only what followed `request` - verify that the
                # positional lookup lines up if handlers are called that way.
                if sender_id_arg_idx < len(args):
                    return args[sender_id_arg_idx]
                return None
            except ValueError:
                # `f` has no `conversation_id` parameter
                return None

        async def sufficient_scope(
            request: Request, *args: Any, **kwargs: Any
        ) -> Optional[bool]:
            # This is a coroutine since `sanic-jwt==1.6`
            jwt_data = await rasa.utils.common.call_potential_coroutine(
                request.app.ctx.auth.extract_payload(request)
            )

            user = jwt_data.get("user", {})

            username = user.get("username", None)
            role = user.get("role", None)

            if role == "admin":
                return True
            elif role == "user":
                # users may only access the conversation named after them
                conversation_id = conversation_id_from_args(args, kwargs)
                return conversation_id is not None and username == conversation_id
            else:
                return False

        async def call_handler(
            request: Request, *args: Any, **kwargs: Any
        ) -> response.HTTPResponse:
            """Calls the wrapped handler, awaiting its result if necessary."""
            result = f(request, *args, **kwargs)
            return await result if isawaitable(result) else result

        @wraps(f)
        async def decorated(
            request: Request, *args: Any, **kwargs: Any
        ) -> response.HTTPResponse:
            provided = request.args.get("token", None)

            # noinspection PyProtectedMember
            if token_matches(provided):
                return await call_handler(request, *args, **kwargs)
            elif app.config.get(
                "USE_JWT"
            ) and await rasa.utils.common.call_potential_coroutine(
                # This is a coroutine since `sanic-jwt==1.6`
                request.app.ctx.auth.is_authenticated(request)
            ):
                if await sufficient_scope(request, *args, **kwargs):
                    return await call_handler(request, *args, **kwargs)
                raise ErrorResponse(
                    HTTPStatus.FORBIDDEN,
                    "NotAuthorized",
                    "User has insufficient permissions.",
                    help_url=_docs(
                        "/user-guide/configuring-http-api/#security-considerations"
                    ),
                )
            elif token is None and app.config.get("USE_JWT") is None:
                # authentication is disabled
                return await call_handler(request, *args, **kwargs)

            raise ErrorResponse(
                HTTPStatus.UNAUTHORIZED,
                "NotAuthenticated",
                "User is not authenticated.",
                help_url=_docs(
                    "/user-guide/configuring-http-api/#security-considerations"
                ),
            )

        return decorated

    return decorator
def event_verbosity_parameter(
    request: Request, default_verbosity: EventVerbosity
) -> EventVerbosity:
    """Resolves the `include_events` query parameter to an `EventVerbosity`.

    Args:
        request: The incoming request whose query arguments are inspected.
        default_verbosity: Verbosity to use if the parameter is absent.

    Returns:
        The enum member whose name equals the upper-cased parameter value.

    Raises:
        ErrorResponse: 400 if the value does not name an `EventVerbosity` member.
    """
    requested_name = request.args.get("include_events", default_verbosity.name)
    requested_name = requested_name.upper()

    try:
        verbosity = EventVerbosity[requested_name]
    except KeyError:
        allowed_names = ", ".join([member.name for member in EventVerbosity])
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            "Invalid parameter value for 'include_events'. "
            "Should be one of {}".format(allowed_names),
            {"parameter": "include_events", "in": "query"},
        )
    return verbosity
async def get_test_stories(
    processor: "MessageProcessor",
    conversation_id: Text,
    until_time: Optional[float],
    fetch_all_sessions: bool = False,
) -> Text:
    """Renders the conversation sessions of `conversation_id` as test stories.

    Args:
        processor: An instance of `MessageProcessor`.
        conversation_id: Conversation ID to fetch stories for.
        until_time: Timestamp up to which to include events.
        fetch_all_sessions: Whether to fetch stories for all conversation sessions.
            If `False`, only the last conversation session is retrieved.

    Returns:
        The stories for `conversation_id` in test format.
    """
    if not fetch_all_sessions:
        trackers = [await processor.get_tracker(conversation_id)]
    else:
        trackers = await processor.get_trackers_for_all_conversation_sessions(
            conversation_id
        )

    if until_time is not None:
        trackers = [
            session_tracker.travel_back_in_time(until_time)
            for session_tracker in trackers
        ]

    # sessions without any event do not produce a story
    trackers = [
        session_tracker
        for session_tracker in trackers
        if len(session_tracker.events) > 0
    ]

    logger.debug(
        f"Fetched trackers for {len(trackers)} conversation sessions "
        f"for conversation ID {conversation_id}."
    )

    number_stories = len(trackers) > 1
    story_steps = []

    for position, session_tracker in enumerate(trackers, 1):
        # the sender ID ends up as the story name; number it if there are several
        if number_stories:
            session_tracker.sender_id = f"{conversation_id}, story {position}"
        else:
            session_tracker.sender_id = conversation_id

        story_steps.extend(session_tracker.as_story().story_steps)

    return YAMLStoryWriter().dumps(story_steps, is_test_story=True)
async def update_conversation_with_events(
    conversation_id: Text,
    processor: "MessageProcessor",
    domain: Domain,
    events: List[Event],
) -> DialogueStateTracker:
    """Applies `events` to the tracker of `conversation_id`.

    Args:
        conversation_id: The ID of the conversation to update the tracker for.
        processor: An instance of `MessageProcessor`.
        domain: The domain associated with the current `Agent`.
        events: The events to append to the tracker.

    Returns:
        The tracker for `conversation_id` with the updated events.
    """
    starts_new_session = rasa.shared.core.events.do_events_begin_with_session_start(
        events
    )

    if not starts_new_session:
        tracker = await processor.fetch_tracker_with_initial_session(conversation_id)
    else:
        # the events open the session themselves, so the tracker is fetched
        # through the plain `get_tracker` call
        tracker = await processor.get_tracker(conversation_id)

    for new_event in events:
        tracker.update(new_event, domain)

    return tracker
def validate_request_body(request: Request, error_message: Text) -> None:
    """Raises a 400 `ErrorResponse` with `error_message` if the body is empty."""
    if request.body:
        return

    raise ErrorResponse(HTTPStatus.BAD_REQUEST, "BadRequest", error_message)
def validate_events_in_request_body(request: Request) -> None:
    """Checks the JSON body of `request` against the events schema.

    A single (non-list) payload is wrapped in a list before validation.

    Raises:
        ErrorResponse: 400 if the payload does not match `EVENTS_SCHEMA`.
    """
    events = request.json if isinstance(request.json, list) else [request.json]

    try:
        jsonschema.validate(events, EVENTS_SCHEMA)
    except jsonschema.ValidationError as error:
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            f"Failed to validate the events format. "
            f"For more information about the format visit the docs. Error: {error}",
            help_url=_docs("/pages/http-api"),
        ) from error
async def authenticate(_: Request) -> NoReturn:
    """Authentication callback which always fails.

    Raises:
        exceptions.AuthenticationFailed: Always, on every call.
    """
    explanation = (
        "Direct JWT authentication not supported. You should already have "
        "a valid JWT from an authentication provider, Rasa will just make "
        "sure that the token is valid, but not issue new tokens."
    )
    raise exceptions.AuthenticationFailed(explanation)
def create_ssl_context(
    ssl_certificate: Optional[Text],
    ssl_keyfile: Optional[Text],
    ssl_ca_file: Optional[Text] = None,
    ssl_password: Optional[Text] = None,
) -> Optional["SSLContext"]:
    """Builds an SSL context when a certificate is given.

    Args:
        ssl_certificate: path to the SSL client certificate
        ssl_keyfile: path to the SSL key file
        ssl_ca_file: path to the SSL CA file for verification (optional)
        ssl_password: SSL private key password (optional)

    Returns:
        SSL context with the certificate chain loaded, or `None` if no
        certificate was passed.
    """
    if not ssl_certificate:
        return None

    import ssl

    context = ssl.create_default_context(
        purpose=ssl.Purpose.CLIENT_AUTH, cafile=ssl_ca_file
    )
    context.load_cert_chain(
        ssl_certificate, keyfile=ssl_keyfile, password=ssl_password
    )
    return context
def _create_emulator(mode: Optional[Text]) -> Emulator:
    """Instantiates the emulator matching `mode` (compared case-insensitively).

    Without a mode, `NoEmulator` is returned. The concrete emulator classes are
    imported only when they are requested.

    Raises:
        ErrorResponse: 400 if `mode` is not one of the supported values.
    """
    if mode is None:
        return NoEmulator()

    requested_mode = mode.lower()

    if requested_mode == "wit":
        from rasa.nlu.emulators.wit import WitEmulator

        return WitEmulator()

    if requested_mode == "luis":
        from rasa.nlu.emulators.luis import LUISEmulator

        return LUISEmulator()

    if requested_mode == "dialogflow":
        from rasa.nlu.emulators.dialogflow import DialogflowEmulator

        return DialogflowEmulator()

    raise ErrorResponse(
        HTTPStatus.BAD_REQUEST,
        "BadRequest",
        "Invalid parameter value for 'emulation_mode'. "
        "Should be one of 'WIT', 'LUIS', 'DIALOGFLOW'.",
        {"parameter": "emulation_mode", "in": "query"},
    )
async def _load_agent(
    model_path: Optional[Text] = None,
    model_server: Optional[EndpointConfig] = None,
    remote_storage: Optional[Text] = None,
    endpoints: Optional[AvailableEndpoints] = None,
) -> Agent:
    """Loads an agent via `rasa.core.agent.load_agent` and checks it is ready.

    All arguments are forwarded unchanged to `load_agent`.

    Returns:
        The loaded agent.

    Raises:
        ErrorResponse: 500 if loading raised an exception, 400 if the loaded
            agent does not report `is_ready()`.
    """
    try:
        agent = await rasa.core.agent.load_agent(
            model_path=model_path,
            model_server=model_server,
            remote_storage=remote_storage,
            endpoints=endpoints,
        )
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "LoadingError",
            f"An unexpected error occurred. Error: {e}",
        )

    if agent.is_ready():
        return agent

    raise ErrorResponse(
        HTTPStatus.BAD_REQUEST,
        "BadRequest",
        f"Agent with name '{model_path}' could not be loaded.",
        {"parameter": "model", "in": "query"},
    )
def configure_cors(
    app: Sanic, cors_origins: Union[Text, List[Text], None] = ""
) -> None:
    """Sets the CORS options on `app` and registers the allowed origins.

    Args:
        app: The Sanic app to configure.
        cors_origins: Allowed origin(s); a falsy value is passed on as `""`.
    """
    # Workaround so that socketio works with requests from other origins.
    # https://github.com/miguelgrinberg/python-socketio/issues/205#issuecomment-493769183
    app.config.CORS_AUTOMATIC_OPTIONS = True
    app.config.CORS_SUPPORTS_CREDENTIALS = True
    app.config.CORS_EXPOSE_HEADERS = "filename"

    allowed_origins = cors_origins or ""
    resources = {r"/*": {"origins": allowed_origins}}
    CORS(app, resources=resources, automatic_options=True)
def add_root_route(app: Sanic) -> None:
    """Registers a `GET /` route on `app` which greets with the Rasa version."""

    @app.get("/")
    async def hello(request: Request) -> HTTPResponse:
        """Check if the server is running and responds with the version."""
        greeting = "Hello from Rasa: " + rasa.__version__
        return response.text(greeting)
def async_if_callback_url(f: Callable[..., Coroutine]) -> Callable:
    """Decorator to enable async request handling.

    If the incoming HTTP request specified a `callback_url` query parameter, the
    handler is scheduled as a background task and the request is answered right
    away with an empty response. The handler's result - or, if it fails, the
    error payload - is then posted to the `callback_url`. Without the
    parameter the handler is simply awaited.

    Args:
        f: The request handler function which should be decorated.

    Returns:
        The decorated function.
    """

    @wraps(f)
    async def decorated_function(
        request: Request, *args: Any, **kwargs: Any
    ) -> HTTPResponse:
        callback_url = request.args.get("callback_url")

        if callback_url:

            async def wrapped() -> None:
                try:
                    result: HTTPResponse = await f(request, *args, **kwargs)
                    post_kwargs: Dict[Text, Any] = {
                        "data": result.body,
                        "headers": {"Content-Type": result.content_type},
                    }
                    logger.debug(
                        "Asynchronous processing of request was successful. "
                        "Sending result to callback URL."
                    )
                except Exception as error:
                    if isinstance(error, ErrorResponse):
                        failure = error
                    else:
                        # wrap anything unexpected so it can be reported uniformly
                        logger.error(error)
                        failure = ErrorResponse(
                            HTTPStatus.INTERNAL_SERVER_ERROR,
                            "UnexpectedError",
                            f"An unexpected error occurred. Error: {error}",
                        )
                    # the callback receives the error payload instead of a result
                    post_kwargs = {"json": failure.error_info}
                    logger.error(
                        "Error happened when processing request asynchronously. "
                        "Sending error to callback URL."
                    )

                async with aiohttp.ClientSession() as session:
                    await session.post(
                        callback_url, raise_for_status=True, **post_kwargs
                    )

            # Run the request in the background on the event loop
            request.app.add_task(wrapped())

            # The incoming request is answered immediately with an empty response
            return response.empty()

        # No `callback_url` given: handle the request as usual.
        return await f(request, *args, **kwargs)

    return decorated_function
def run_in_thread(f: Callable[..., Coroutine]) -> Callable:
    """Decorator which runs request on a separate thread.

    Some requests (e.g. training or cross-validation) are computationally intense.
    They would block the event loop and hence the processing of other requests.
    This decorator processes such a request on a thread of a freshly created
    `ThreadPoolExecutor`, where the handler coroutine is driven by `asyncio.run`.

    Args:
        f: The request handler function which should be decorated.

    Returns:
        The decorated function.
    """

    @wraps(f)
    async def decorated_function(
        request: Request, *args: Any, **kwargs: Any
    ) -> HTTPResponse:
        def run_handler_to_completion() -> HTTPResponse:
            # `run_in_executor` only accepts sync callables, hence this
            # sync wrapper around the `async` handler.
            handler_coroutine = f(request, *args, **kwargs)
            return asyncio.run(handler_coroutine)

        event_loop = request.app.loop
        with concurrent.futures.ThreadPoolExecutor() as executor:
            return await event_loop.run_in_executor(
                executor, run_handler_to_completion
            )

    return decorated_function
def inject_temp_dir(f: Callable[..., Coroutine]) -> Callable:
    """Decorator which hands a temporary directory to a request handler.

    The directory is managed by `TempDirectoryPath` for the duration of the
    handler call and passed on as the `temporary_directory` keyword argument.

    Args:
        f: The request handler function which should be decorated.

    Returns:
        The decorated function.
    """

    @wraps(f)
    async def decorated_function(*args: Any, **kwargs: Any) -> HTTPResponse:
        with TempDirectoryPath(get_temp_dir_name()) as directory:
            # Decorated request handlers need a `temporary_directory` parameter
            temporary_directory = Path(directory)
            return await f(
                *args, temporary_directory=temporary_directory, **kwargs
            )

    return decorated_function
def create_app(
agent: Optional["Agent"] = None,
cors_origins: Union[Text, List[Text], None] = "*",
auth_token: Optional[Text] = None,
response_timeout: int = DEFAULT_RESPONSE_TIMEOUT,
jwt_secret: Optional[Text] = None,
jwt_private_key: Optional[Text] = None,
jwt_method: Text = "HS256",
endpoints: Optional[AvailableEndpoints] = None,
) -> Sanic:
"""Class representing a Rasa HTTP server."""
app = Sanic("rasa_server")
app.config.RESPONSE_TIMEOUT = response_timeout
configure_cors(app, cors_origins)
# Set up the Sanic-JWT extension
if jwt_secret and jwt_method:
# `sanic-jwt` depends on having an available event loop when making the call to
# `Initialize`. If there is none, the server startup will fail with
# `There is no current event loop in thread 'MainThread'`.
try:
_ = asyncio.get_running_loop()
except RuntimeError:
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
# since we only want to check signatures, we don't actually care
# about the JWT method and set the passed secret as either symmetric
# or asymmetric key. jwt lib will choose the right one based on method
app.config["USE_JWT"] = True
Initialize(
app,
secret=jwt_secret,
private_key=jwt_private_key,
authenticate=authenticate,
algorithm=jwt_method,
user_id="username",
)
app.ctx.agent = agent
# Initialize shared object of type unsigned int for tracking
# the number of active training processes
app.ctx.active_training_processes = multiprocessing.Value("I", 0)
@app.exception(ErrorResponse)
async def handle_error_response(
    request: Request, exception: ErrorResponse
) -> HTTPResponse:
    """Answers with the error's payload as JSON, using the error's status."""
    payload, status_code = exception.error_info, exception.status
    return response.json(payload, status=status_code)
add_root_route(app)
@app.get("/version")
async def version(request: Request) -> HTTPResponse:
    """Respond with the installed Rasa version and the minimum compatible one."""
    version_info = {
        "version": rasa.__version__,
        "minimum_compatible_version": MINIMUM_COMPATIBLE_VERSION,
    }
    return response.json(version_info)
@app.get("/status")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def status(request: Request) -> HTTPResponse:
    """Respond with the model file, the model ID and the active training count."""
    agent = app.ctx.agent
    status_info = {
        "model_file": agent.processor.model_filename,
        "model_id": agent.model_id,
        "num_active_training_jobs": app.ctx.active_training_processes.value,
    }
    return response.json(status_info)
@app.get("/conversations/<conversation_id:path>/tracker")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def retrieve_tracker(request: Request, conversation_id: Text) -> HTTPResponse:
    """Get a dump of a conversation's tracker including its events.

    Honours the `include_events` and `until` query parameters.
    """
    verbosity = event_verbosity_parameter(request, EventVerbosity.AFTER_RESTART)
    until_time = rasa.utils.endpoints.float_arg(request, "until")

    processor = app.ctx.agent.processor
    tracker = await processor.fetch_full_tracker_with_initial_session(
        conversation_id,
        output_channel=CollectingOutputChannel(),
    )

    try:
        if until_time is not None:
            tracker = tracker.travel_back_in_time(until_time)

        return response.json(tracker.current_state(verbosity))
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )
@app.post("/conversations/<conversation_id:path>/tracker/events")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def append_events(request: Request, conversation_id: Text) -> HTTPResponse:
    """Append a list of events to the state of a conversation.

    The events are taken from the JSON body, applied to the conversation's
    tracker while holding the conversation lock, and the tracker is saved.
    If the `execute_side_effects` query parameter is true, the processor is
    asked to execute the side effects of the events before saving.

    Returns:
        The updated tracker state as JSON.

    Raises:
        ErrorResponse: 400 for an invalid body, 500 for unexpected failures.
    """
    validate_events_in_request_body(request)

    verbosity = event_verbosity_parameter(request, EventVerbosity.AFTER_RESTART)

    try:
        async with app.ctx.agent.lock_store.lock(conversation_id):
            processor = app.ctx.agent.processor
            events = _get_events_from_request_body(request)

            tracker = await update_conversation_with_events(
                conversation_id, processor, app.ctx.agent.domain, events
            )

            output_channel = _get_output_channel(request, tracker)

            if rasa.utils.endpoints.bool_arg(
                request, EXECUTE_SIDE_EFFECTS_QUERY_KEY, False
            ):
                await processor.execute_side_effects(
                    events, tracker, output_channel
                )
            await app.ctx.agent.tracker_store.save(tracker)
        return response.json(tracker.current_state(verbosity))
    except ErrorResponse:
        # Deliberate API errors (e.g. the 400 raised when no valid event could
        # be extracted) must keep their status instead of becoming a 500.
        raise
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )
def _get_events_from_request_body(request: Request) -> List[Event]:
    """Converts the JSON body of `request` into a non-empty list of events.

    A single (non-list) payload is treated as a list with one entry. Entries
    for which `Event.from_parameters` yields a falsy value are dropped.

    Raises:
        ErrorResponse: 400 if no event is left after the conversion.
    """
    raw_events = request.json
    if not isinstance(raw_events, list):
        raw_events = [raw_events]

    parsed_events = []
    for raw_event in raw_events:
        parsed_event = Event.from_parameters(raw_event)
        if parsed_event:
            parsed_events.append(parsed_event)

    if parsed_events:
        return parsed_events

    rasa.shared.utils.io.raise_warning(
        f"Append event called, but could not extract a valid event. "
        f"Request JSON: {request.json}"
    )
    raise ErrorResponse(
        HTTPStatus.BAD_REQUEST,
        "BadRequest",
        "Couldn't extract a proper event from the request body.",
        {"parameter": "", "in": "body"},
    )
@app.put("/conversations/<conversation_id:path>/tracker/events")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def replace_events(request: Request, conversation_id: Text) -> HTTPResponse:
    """Use a list of events to set a conversations tracker to a state.

    A new tracker is built from the JSON body and the domain's slots and saved
    under `conversation_id` while holding the conversation lock.

    Returns:
        The state of the newly built tracker as JSON.

    Raises:
        ErrorResponse: 400 if the body fails event validation, 500 if building
            or saving the tracker raises.
    """
    # Schema validation and verbosity parsing happen outside the `try`, so
    # their `ErrorResponse`s are not rewrapped as 500s.
    validate_events_in_request_body(request)

    verbosity = event_verbosity_parameter(request, EventVerbosity.AFTER_RESTART)

    try:
        async with app.ctx.agent.lock_store.lock(conversation_id):
            tracker = DialogueStateTracker.from_dict(
                conversation_id, request.json, app.ctx.agent.domain.slots
            )

            # will override an existing tracker with the same id!
            await app.ctx.agent.tracker_store.save(tracker)

        return response.json(tracker.current_state(verbosity))
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )
@app.get("/conversations/<conversation_id:path>/story")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
@ensure_conversation_exists()
async def retrieve_story(request: Request, conversation_id: Text) -> HTTPResponse:
    """Get an end-to-end story corresponding to this conversation.

    Honours the `until` and `all_sessions` query parameters.
    """
    until_time = rasa.utils.endpoints.float_arg(request, "until")
    include_all_sessions = rasa.utils.endpoints.bool_arg(
        request, "all_sessions", default=False
    )

    try:
        story_text = await get_test_stories(
            app.ctx.agent.processor,
            conversation_id,
            until_time,
            fetch_all_sessions=include_all_sessions,
        )
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )
    else:
        try:
            return response.text(story_text)
        except Exception as e:
            logger.debug(traceback.format_exc())
            raise ErrorResponse(
                HTTPStatus.INTERNAL_SERVER_ERROR,
                "ConversationError",
                f"An unexpected error occurred. Error: {e}",
            )
@app.post("/conversations/<conversation_id:path>/execute")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
@ensure_conversation_exists()
async def execute_action(request: Request, conversation_id: Text) -> HTTPResponse:
    """Execute the action named in the request body (deprecated endpoint).

    The body is read as JSON; `name` is required, `policy` and `confidence`
    are optional and passed on to the agent.

    Returns:
        JSON with the tracker state under `tracker` and, if the output channel
        is a `CollectingOutputChannel`, its messages under `messages`.

    Raises:
        ErrorResponse: 400 if `name` is missing, 500 if execution raises.
    """
    # Every call emits a deprecation warning.
    rasa.shared.utils.io.raise_warning(
        'The "POST /conversations/<conversation_id>/execute"'
        " endpoint is deprecated. Inserting actions to the tracker externally"
        " should be avoided. Actions should be predicted by the policies only.",
        category=FutureWarning,
    )
    # NOTE(review): `request.json` is used without a body check here - confirm
    # how a request without a JSON object body should be answered.
    request_params = request.json

    action_to_execute = request_params.get("name", None)

    if not action_to_execute:
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            "Name of the action not provided in request body.",
            {"parameter": "name", "in": "body"},
        )

    policy = request_params.get("policy", None)
    confidence = request_params.get("confidence", None)
    verbosity = event_verbosity_parameter(request, EventVerbosity.AFTER_RESTART)

    try:
        # Fetching the tracker and executing the action both happen while the
        # conversation lock is held.
        async with app.ctx.agent.lock_store.lock(conversation_id):
            tracker = await (
                app.ctx.agent.processor.fetch_tracker_and_update_session(
                    conversation_id
                )
            )

            output_channel = _get_output_channel(request, tracker)
            await app.ctx.agent.execute_action(
                conversation_id,
                action_to_execute,
                output_channel,
                policy,
                confidence,
            )

    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )

    # The state is taken from the tracker object fetched above.
    state = tracker.current_state(verbosity)

    response_body: Dict[Text, Any] = {"tracker": state}

    if isinstance(output_channel, CollectingOutputChannel):
        response_body["messages"] = output_channel.messages

    return response.json(response_body)
@app.post("/conversations/<conversation_id:path>/trigger_intent")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def trigger_intent(request: Request, conversation_id: Text) -> HTTPResponse:
    """Trigger the intent named in the request body for a conversation.

    The body is read as JSON; `name` is required and `entities` defaults to an
    empty list.

    Returns:
        JSON with the tracker state under `tracker` and, if the output channel
        is a `CollectingOutputChannel`, its messages under `messages`.

    Raises:
        ErrorResponse: 400 if `name` is missing, 404 if the intent is not part
            of the domain, 500 for unexpected failures.
    """
    request_params = request.json

    intent_to_trigger = request_params.get("name")
    entities = request_params.get("entities", [])

    if not intent_to_trigger:
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            "Name of the intent not provided in request body.",
            {"parameter": "name", "in": "body"},
        )

    verbosity = event_verbosity_parameter(request, EventVerbosity.AFTER_RESTART)

    try:
        async with app.ctx.agent.lock_store.lock(conversation_id):
            tracker = await (
                app.ctx.agent.processor.fetch_tracker_and_update_session(
                    conversation_id
                )
            )
            output_channel = _get_output_channel(request, tracker)
            if intent_to_trigger not in app.ctx.agent.domain.intents:
                # Report the requested intent name; interpolating
                # `trigger_intent` would print this handler function instead.
                raise ErrorResponse(
                    HTTPStatus.NOT_FOUND,
                    "NotFound",
                    f"The intent {intent_to_trigger} does not exist in the domain.",
                )
            await app.ctx.agent.trigger_intent(
                intent_name=intent_to_trigger,
                entities=entities,
                output_channel=output_channel,
                tracker=tracker,
            )
    except ErrorResponse:
        # keep deliberate API errors (e.g. the 404 above) as they are
        raise
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )

    state = tracker.current_state(verbosity)

    response_body: Dict[Text, Any] = {"tracker": state}

    if isinstance(output_channel, CollectingOutputChannel):
        response_body["messages"] = output_channel.messages

    return response.json(response_body)
@app.post("/conversations/<conversation_id:path>/predict")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
@ensure_conversation_exists()
async def predict(request: Request, conversation_id: Text) -> HTTPResponse:
    """Respond with the agent's next-action prediction for the conversation.

    The `scores` entry is sorted by descending score, ties by action name.
    """

    def ranking_key(entry: Dict[Text, Any]) -> Any:
        # highest score first, action name as tie-breaker
        return -entry["score"], entry["action"]

    try:
        # Fetches the appropriate bot response in a json format
        prediction = await app.ctx.agent.predict_next_for_sender_id(conversation_id)
        prediction["scores"] = sorted(prediction["scores"], key=ranking_key)
        return response.json(prediction)
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )
@app.post("/conversations/<conversation_id:path>/messages")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def add_message(request: Request, conversation_id: Text) -> HTTPResponse:
    """Log a user message on the conversation's tracker.

    The JSON body provides `text`, `sender` and optionally `parse_data`.
    Only messages whose `sender` is `"user"` are accepted.

    Returns:
        The tracker state after logging the message, as JSON.

    Raises:
        ErrorResponse: 400 for an empty body or an unsupported sender, 500 if
            processing the message raises.
    """
    validate_request_body(
        request,
        "No message defined in request body. Add a message to the request body in "
        "order to add it to the tracker.",
    )

    request_params = request.json

    message = request_params.get("text")
    sender = request_params.get("sender")
    parse_data = request_params.get("parse_data")

    verbosity = event_verbosity_parameter(request, EventVerbosity.AFTER_RESTART)

    # TODO: implement for agent / bot
    if sender != "user":
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            "Currently, only user messages can be passed to this endpoint. "
            "Messages of sender '{}' cannot be handled.".format(sender),
            {"parameter": "sender", "in": "body"},
        )

    user_message = UserMessage(message, None, conversation_id, parse_data)

    try:
        async with app.ctx.agent.lock_store.lock(conversation_id):
            # cf. processor.handle_message (ignoring prediction loop run)
            # The tracker is saved once, after `run_action_extract_slots`,
            # which is why logging is told not to save it.
            tracker = await app.ctx.agent.processor.log_message(
                user_message, should_save_tracker=False
            )
            tracker = await app.ctx.agent.processor.run_action_extract_slots(
                user_message.output_channel, tracker
            )
            await app.ctx.agent.processor.save_tracker(tracker)

        return response.json(tracker.current_state(verbosity))
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ConversationError",
            f"An unexpected error occurred. Error: {e}",
        )
@app.post("/model/train")
@requires_auth(app, auth_token)
@async_if_callback_url
@run_in_thread
@inject_temp_dir
async def train(request: Request, temporary_directory: Path) -> HTTPResponse:
    """Train a model from the request body and respond with the model file.

    Args:
        request: The incoming request carrying the training data.
        temporary_directory: Directory injected by `inject_temp_dir`.

    Returns:
        A file response for the trained model; its base name is also sent in
        the `filename` header.

    Raises:
        ErrorResponse: 400 for an empty body or an invalid domain, 500 if
            training fails or finishes without a model.
    """
    validate_request_body(
        request,
        "You must provide training data in the request body in order to "
        "train your model.",
    )

    training_payload = _training_payload_from_yaml(request, temporary_directory)

    try:
        # Count this training as active; the `finally` below undoes it.
        with app.ctx.active_training_processes.get_lock():
            app.ctx.active_training_processes.value += 1

        # Inside this function the imported name shadows the handler's own name.
        from rasa.model_training import train

        # `train` is called synchronously with the payload as keyword arguments
        training_result = train(**training_payload)

        if training_result.model:
            filename = os.path.basename(training_result.model)

            return await response.file(
                training_result.model,
                filename=filename,
                headers={"filename": filename},
            )
        else:
            raise ErrorResponse(
                HTTPStatus.INTERNAL_SERVER_ERROR,
                "TrainingError",
                "Ran training, but it finished without a trained model.",
            )
    except ErrorResponse as e:
        # already in API error format, pass through unchanged
        raise e
    except InvalidDomain as e:
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "InvalidDomainError",
            f"Provided domain file is invalid. Error: {e}",
        )
    except Exception as e:
        logger.error(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "TrainingError",
            f"An unexpected error occurred during training. Error: {e}",
        )
    finally:
        # Runs for success and failure alike.
        with app.ctx.active_training_processes.get_lock():
            app.ctx.active_training_processes.value -= 1
@app.post("/model/test/stories")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app, require_core_is_ready=True)
@inject_temp_dir
async def evaluate_stories(
    request: Request, temporary_directory: Path
) -> HTTPResponse:
    """Evaluate stories against the currently loaded model.

    Honours the `e2e` query parameter; plotting is always disabled.
    """
    validate_request_body(
        request,
        "You must provide some stories in the request body in order to "
        "evaluate your model.",
    )

    stories_file = _test_data_file_from_payload(request, temporary_directory)

    use_e2e = rasa.utils.endpoints.bool_arg(request, "e2e", default=False)

    try:
        evaluation_result = await test(
            stories_file, app.ctx.agent, e2e=use_e2e, disable_plotting=True
        )
        return response.json(evaluation_result)
    except Exception as e:
        logger.error(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "TestingError",
            f"An unexpected error occurred during evaluation. Error: {e}",
        )
@app.post("/model/test/intents")
@requires_auth(app, auth_token)
@async_if_callback_url
@run_in_thread
@inject_temp_dir
async def evaluate_intents(
    request: Request, temporary_directory: Path
) -> HTTPResponse:
    """Evaluate intents against a Rasa model.

    With the `cross_validation_folds` query parameter a cross-validation is
    run (YAML payloads only); otherwise the data is evaluated as a test set
    against the model given by the `model` query parameter.

    Raises:
        ErrorResponse: 400 for an empty body or cross-validation on non-YAML
            data, 500 if the evaluation raises.
    """
    validate_request_body(
        request,
        "You must provide some nlu data in the request body in order to "
        "evaluate your model.",
    )

    cross_validation_folds = request.args.get("cross_validation_folds")
    is_yaml_payload = request.headers.get("Content-type") == YAML_CONTENT_TYPE
    test_coroutine = None

    if is_yaml_payload:
        payload = _training_payload_from_yaml(request, temporary_directory)
        config_file = payload.get("config")
        test_data = payload.get("training_files")

        if cross_validation_folds:
            test_coroutine = _cross_validate(
                test_data, config_file, int(cross_validation_folds)
            )
    else:
        # any other content type is handled by the JSON payload helper
        payload = _nlu_training_payload_from_json(request, temporary_directory)
        test_data = payload.get("training_files")

        if cross_validation_folds:
            raise ErrorResponse(
                HTTPStatus.BAD_REQUEST,
                "TestingError",
                "Cross-validation is only supported for YAML data.",
            )

    if not cross_validation_folds:
        test_coroutine = _evaluate_model_using_test_set(
            request.args.get("model"), test_data
        )

    try:
        # Every path reaching this point has assigned `test_coroutine` above.
        if test_coroutine is not None:
            evaluation = await test_coroutine
        return response.json(evaluation)
    except Exception as e:
        logger.error(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "TestingError",
            f"An unexpected error occurred during evaluation. Error: {e}",
        )
async def _evaluate_model_using_test_set(
    model_path: Text, test_data_file: Text
) -> Dict:
    """Evaluate NLU test data with the given model or the loaded agent.

    Args:
        model_path: Model to load for this evaluation. When falsy, the agent
            currently stored on `app.ctx` is used instead.
        test_data_file: Path to the test data; made absolute before use.

    Returns:
        The result of `rasa.nlu.test.run_evaluation` requested as a dict.

    Raises:
        ErrorResponse: With status 409 if the agent used for the evaluation
            is not ready.
    """
    logger.info("Starting model evaluation using test set.")

    if model_path:
        server_config = app.ctx.agent.model_server
        if server_config is not None:
            # Work on the result of `copy()` and point it at the requested model.
            server_config = server_config.copy()
            server_config.url = model_path
            # Set wait time between pulls to `0` so that the agent does not schedule
            # a job to pull the model from the server
            server_config.kwargs["wait_time_between_pulls"] = 0
        agent_under_test = await _load_agent(
            model_path=model_path,
            model_server=server_config,
            remote_storage=app.ctx.agent.remote_storage,
        )
    else:
        agent_under_test = app.ctx.agent

    absolute_data_path = os.path.abspath(test_data_file)

    if not agent_under_test.is_ready():
        raise ErrorResponse(
            HTTPStatus.CONFLICT, "Conflict", "Loaded model file not found."
        )

    return await rasa.nlu.test.run_evaluation(
        absolute_data_path,
        agent_under_test.processor,
        disable_plotting=True,
        report_as_dict=True,
    )
async def _cross_validate(data_file: Text, config_file: Text, folds: int) -> Dict:
    """Cross-validate the NLU data found at `data_file`.

    Args:
        data_file: Path passed to the importer as the only training data path.
        config_file: Path passed to the importer as the config path.
        folds: Number of cross-validation folds.

    Returns:
        The per-evaluation summary built by `_get_evaluation_results`.
    """
    logger.info(f"Starting cross-validation with {folds} folds.")

    data_importer = TrainingDataImporter.load_from_dict(
        config=None, config_path=config_file, training_data_paths=[data_file]
    )
    nlu_config = data_importer.get_config()
    training_data = data_importer.get_nlu_data()

    fold_results = await rasa.nlu.test.cross_validate(
        data=training_data,
        n_folds=folds,
        nlu_config=nlu_config,
        disable_plotting=True,
        errors=True,
        report_as_dict=True,
    )

    # `fold_results` is unpacked into the intent, entity and response
    # selector reports, in that order.
    return _get_evaluation_results(*fold_results)
def _get_evaluation_results(
    intent_report: CVEvaluationResult,
    entity_report: CVEvaluationResult,
    response_selector_report: CVEvaluationResult,
) -> Dict[Text, Any]:
    """Summarise cross-validation results per evaluation type.

    Args:
        intent_report: Result whose `evaluation` mapping holds the intent report.
        entity_report: Result whose `evaluation` mapping holds the entity report.
        response_selector_report: Result whose `evaluation` mapping holds the
            response selection report.

    Returns:
        Mapping with the keys `intent_evaluation`, `entity_evaluation` and
        `response_selection_evaluation`. Each value contains the full
        `report`, the weighted-average `precision` and `f1_score` (`None` when
        the report has no such entry) and the list of `errors`.
    """
    eval_name_mapping = {
        "intent_evaluation": intent_report,
        "entity_evaluation": entity_report,
        "response_selection_evaluation": response_selector_report,
    }

    result: DefaultDict[Text, Any] = defaultdict(dict)
    for evaluation_name, evaluation in eval_name_mapping.items():
        report = evaluation.evaluation.get("report", {})
        averages = report.get("weighted avg", {})
        summary = result[evaluation_name]
        summary["report"] = report
        summary["precision"] = averages.get("precision")
        # The classification report stores the F1 value under "f1-score";
        # looking up "1-score" always yielded `None`.
        summary["f1_score"] = averages.get("f1-score")
        summary["errors"] = evaluation.evaluation.get("errors", [])

    return result
@app.post("/model/predict")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app, require_core_is_ready=True)
async def tracker_predict(request: Request) -> HTTPResponse:
    """Predict the next action for the list of events in the request body.

    Args:
        request: Incoming request whose JSON body holds the events.

    Returns:
        JSON response containing the prediction result.

    Raises:
        ErrorResponse: With status 400 if no tracker can be built from the
            supplied events, or status 500 if the prediction itself fails.
    """
    validate_events_in_request_body(request)

    event_verbosity = event_verbosity_parameter(
        request, EventVerbosity.AFTER_RESTART
    )
    events_payload = request.json

    try:
        prediction_tracker = DialogueStateTracker.from_dict(
            DEFAULT_SENDER_ID, events_payload, app.ctx.agent.domain.slots
        )
    except Exception as error:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            f"Supplied events are not valid. {error}",
            {"parameter": "", "in": "body"},
        )

    try:
        prediction = app.ctx.agent.predict_next_with_tracker(
            prediction_tracker, event_verbosity
        )
        return response.json(prediction)
    except Exception as error:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "PredictionError",
            f"An unexpected error occurred. Error: {error}",
        )
@app.post("/model/parse")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def parse(request: Request) -> HTTPResponse:
    """Parse the text message in the request body with the loaded agent.

    The request JSON and the parse result are both passed through the
    emulator selected by the `emulation_mode` query parameter.

    Args:
        request: Incoming request whose JSON body contains the text message.

    Returns:
        JSON response containing the (emulator-normalised) parse result.

    Raises:
        ErrorResponse: With status 400 if parsing the message fails, or
            status 500 if normalising the request or the response fails.
    """
    validate_request_body(
        request,
        "No text message defined in request_body. Add text message to request body "
        "in order to obtain the intent and extracted entities.",
    )
    emulation_mode = request.args.get("emulation_mode")
    emulator = _create_emulator(emulation_mode)

    try:
        data = emulator.normalise_request_json(request.json)
        try:
            parsed_data = await app.ctx.agent.parse_message(data.get("text"))
        except Exception as e:
            logger.debug(traceback.format_exc())
            raise ErrorResponse(
                HTTPStatus.BAD_REQUEST,
                "ParsingError",
                f"An unexpected error occurred. Error: {e}",
            )
        response_data = emulator.normalise_response_json(parsed_data)

        return response.json(response_data)
    except ErrorResponse:
        # Let the specific response raised above through unchanged; without
        # this clause the generic handler below re-wrapped it as a 500.
        raise
    except Exception as e:
        logger.debug(traceback.format_exc())
        raise ErrorResponse(
            HTTPStatus.INTERNAL_SERVER_ERROR,
            "ParsingError",
            f"An unexpected error occurred. Error: {e}",
        )
@app.put("/model")
@requires_auth(app, auth_token)
async def load_model(request: Request) -> HTTPResponse:
    """Replace the served agent with one loaded from the requested model.

    The JSON body may contain `model_file`, `model_server` and
    `remote_storage`. The new agent takes over the lock store of the agent it
    replaces.

    Args:
        request: Incoming request describing the model to load.

    Returns:
        Empty response with status 204.

    Raises:
        ErrorResponse: With status 400 if `model_server` cannot be turned
            into an endpoint configuration.
    """
    validate_request_body(request, "No path to model file defined in request_body.")

    body = request.json
    model_path = body.get("model_file", None)
    model_server = body.get("model_server", None)
    remote_storage = body.get("remote_storage", None)

    if model_server:
        try:
            model_server = EndpointConfig.from_dict(model_server)
        except TypeError as error:
            logger.debug(traceback.format_exc())
            raise ErrorResponse(
                HTTPStatus.BAD_REQUEST,
                "BadRequest",
                f"Supplied 'model_server' is not valid. Error: {error}",
                {"parameter": "model_server", "in": "body"},
            )

    replacement_agent = await _load_agent(
        model_path=model_path,
        model_server=model_server,
        remote_storage=remote_storage,
        endpoints=endpoints,
    )
    # Carry the existing lock store over to the replacement agent.
    replacement_agent.lock_store = app.ctx.agent.lock_store
    app.ctx.agent = replacement_agent

    logger.debug(f"Successfully loaded model '{model_path}'.")
    return response.json(None, status=HTTPStatus.NO_CONTENT)
@app.delete("/model")
@requires_auth(app, auth_token)
async def unload_model(request: Request) -> HTTPResponse:
    """Swap the served agent for a fresh `Agent` that keeps the lock store.

    Args:
        request: Incoming request (not inspected).

    Returns:
        Empty response with status 204.
    """
    previous_agent = app.ctx.agent
    unloaded_model_name = previous_agent.model_name

    app.ctx.agent = Agent(lock_store=previous_agent.lock_store)

    logger.debug(f"Successfully unloaded model '{unloaded_model_name}'.")
    return response.json(None, status=HTTPStatus.NO_CONTENT)
@app.get("/domain")
@requires_auth(app, auth_token)
@ensure_loaded_agent(app)
async def get_domain(request: Request) -> HTTPResponse:
    """Get current domain in yaml or json format.

    The format is chosen from the suffix of the `Accept` header, which falls
    back to `JSON_CONTENT_TYPE` when the header is missing.

    Args:
        request: Incoming request whose `Accept` header selects the format.

    Returns:
        The domain as a JSON response or as a YAML text response.

    Raises:
        ErrorResponse: With status 406 if the `Accept` header ends with
            neither `json`, `yml` nor `yaml`.
    """
    # FIXME: this is a false positive mypy error after upgrading to 0.931
    accepts = request.headers.get("Accept", default=JSON_CONTENT_TYPE)

    if accepts.endswith("json"):
        return response.json(app.ctx.agent.domain.as_dict())

    if accepts.endswith(("yml", "yaml")):
        return response.text(
            app.ctx.agent.domain.as_yaml(),
            status=HTTPStatus.OK,
            content_type=YAML_CONTENT_TYPE,
        )

    # The space after "or" was missing before, which rendered as "oryml".
    raise ErrorResponse(
        HTTPStatus.NOT_ACCEPTABLE,
        "NotAcceptable",
        "Invalid Accept header. Domain can be provided as "
        f'json ("Accept: {JSON_CONTENT_TYPE}") or '
        f'yml ("Accept: {YAML_CONTENT_TYPE}"). '
        "Make sure you've set the appropriate Accept header.",
    )
return app
def _get_output_channel(
    request: Request, tracker: Optional[DialogueStateTracker]
) -> OutputChannel:
    """Pick the `OutputChannel` that should carry the bot's responses.

    Args:
        request: HTTP request whose query parameters can name the desired
            output channel.
        tracker: Tracker for the conversation. Consulted for the latest input
            channel when the request asks to reuse it.

    Returns:
        The output channel offered by a registered input channel with the
        requested name, or a `CollectingOutputChannel` if none offers one.
    """
    wanted_channel_name = request.args.get(OUTPUT_CHANNEL_QUERY_KEY)

    reuse_latest_input_channel = (
        wanted_channel_name == USE_LATEST_INPUT_CHANNEL_AS_OUTPUT_CHANNEL
    )
    if reuse_latest_input_channel and tracker:
        wanted_channel_name = tracker.get_latest_input_channel()

    # Interactive training does not set `input_channels`, hence we have to be cautious
    known_input_channels = getattr(request.app.ctx, "input_channels", None) or []
    candidates = [
        input_channel
        for input_channel in known_input_channels
        if input_channel.name() == wanted_channel_name
    ]

    # Start from `CollectingOutputChannel` and let every candidate that can
    # provide an output channel replace the current choice; the last such
    # candidate wins.
    chosen_channel = CollectingOutputChannel()
    for input_channel in candidates:
        chosen_channel = input_channel.get_output_channel() or chosen_channel

    return chosen_channel
def _test_data_file_from_payload(request: Request, temporary_directory: Path) -> Text:
    """Store the YAML test stories from the request and return their location.

    Args:
        request: Incoming request carrying the stories in its body.
        temporary_directory: Directory the stories file is written to.

    Returns:
        The `training_files` entry of the extracted payload as a string.
    """
    # Test story files have to carry the `TEST_STORIES_FILE_PREFIX` prefix.
    stories_file_name = f"{TEST_STORIES_FILE_PREFIX}data.yml"

    payload = _training_payload_from_yaml(
        request, temporary_directory, file_name=stories_file_name
    )
    return str(payload["training_files"])
def _training_payload_from_yaml(
    request: Request, temp_dir: Path, file_name: Text = "data.yml"
) -> Dict[Text, Any]:
    """Write the YAML request body to disk and build the training arguments.

    The single written file is used as both `domain` and `config`, while
    `training_files` points at the directory containing it.

    Args:
        request: Incoming request with the YAML data in its body.
        temp_dir: Directory the YAML file is written to.
        file_name: Name of the file created inside `temp_dir`.

    Returns:
        Dictionary of training arguments.

    Raises:
        ErrorResponse: Propagated from `_validate_yaml_training_payload` when
            the body fails validation.
    """
    logger.debug("Extracting YAML training data from request body.")

    yaml_text = request.body.decode(rasa.shared.utils.io.DEFAULT_ENCODING)
    _validate_yaml_training_payload(yaml_text)

    data_file = temp_dir / file_name
    rasa.shared.utils.io.write_text_file(yaml_text, data_file)

    # Models go to the default model directory unless the request opts out.
    output_directory = (
        DEFAULT_MODELS_PATH
        if rasa.utils.endpoints.bool_arg(
            request, "save_to_default_model_directory", True
        )
        else str(temp_dir)
    )

    return {
        "domain": str(data_file),
        "config": str(data_file),
        "training_files": str(temp_dir),
        "output": output_directory,
        "force_training": rasa.utils.endpoints.bool_arg(
            request, "force_training", False
        ),
        "core_additional_arguments": _extract_core_additional_arguments(request),
        "nlu_additional_arguments": _extract_nlu_additional_arguments(request),
    }
def _nlu_training_payload_from_json(
    request: Request, temp_dir: Path, file_name: Text = "data.json"
) -> Dict[Text, Any]:
    """Write the JSON NLU data of the request to disk and build the arguments.

    The request JSON is validated against the Rasa NLU data schema before it
    is dumped. The single written file is used as both `domain` and `config`,
    while `training_files` points at the directory containing it.

    Args:
        request: Incoming request with the NLU data as JSON body.
        temp_dir: Directory the JSON file is written to.
        file_name: Name of the file created inside `temp_dir`.

    Returns:
        Dictionary of training arguments.
    """
    logger.debug("Extracting JSON training data from request body.")

    nlu_schema = (
        rasa.shared.nlu.training_data.schemas.data_schema.rasa_nlu_data_schema()
    )
    rasa.shared.utils.validation.validate_training_data(request.json, nlu_schema)

    data_file = temp_dir / file_name
    rasa.shared.utils.io.dump_obj_as_json_to_file(data_file, request.json)

    # Models go to the default model directory unless the request opts out.
    output_directory = (
        DEFAULT_MODELS_PATH
        if rasa.utils.endpoints.bool_arg(
            request, "save_to_default_model_directory", True
        )
        else str(temp_dir)
    )

    return {
        "domain": str(data_file),
        "config": str(data_file),
        "training_files": str(temp_dir),
        "output": output_directory,
        "force_training": rasa.utils.endpoints.bool_arg(
            request, "force_training", False
        ),
        "core_additional_arguments": _extract_core_additional_arguments(request),
        "nlu_additional_arguments": _extract_nlu_additional_arguments(request),
    }
def _validate_yaml_training_payload(yaml_text: Text) -> None:
    """Check the YAML text with `RasaYAMLReader` and reject invalid input.

    Args:
        yaml_text: Decoded request body to validate.

    Raises:
        ErrorResponse: With status 400 and a link to the training data docs
            if validation raises any exception.
    """
    reader = RasaYAMLReader()
    try:
        reader.validate(yaml_text)
    except Exception as error:
        raise ErrorResponse(
            HTTPStatus.BAD_REQUEST,
            "BadRequest",
            f"The request body does not contain valid YAML. Error: {error}",
            help_url=DOCS_URL_TRAINING_DATA,
        )
def _extract_core_additional_arguments(request: Request) -> Dict[Text, Any]:
    """Collect the extra Core training arguments from the query string.

    Args:
        request: Incoming request whose `augmentation` query parameter is read
            as an integer (`50` is passed as the default).

    Returns:
        Dictionary with the single key `augmentation_factor`.
    """
    augmentation_factor = rasa.utils.endpoints.int_arg(request, "augmentation", 50)
    return {"augmentation_factor": augmentation_factor}
def _extract_nlu_additional_arguments(request: Request) -> Dict[Text, Any]:
    """Collect the extra NLU training arguments from the query string.

    Args:
        request: Incoming request whose `num_threads` query parameter is read
            as an integer (`1` is passed as the default).

    Returns:
        Dictionary with the single key `num_threads`.
    """
    thread_count = rasa.utils.endpoints.int_arg(request, "num_threads", 1)
    return {"num_threads": thread_count}