import asyncio
import logging
import os
import textwrap
import uuid
from functools import partial
from multiprocessing import Process
from typing import (
    Any,
    Callable,
    Deque,
    Dict,
    List,
    Optional,
    Text,
    Tuple,
    Union,
    Set,
    cast,
)

from sanic import Sanic, response
from sanic.exceptions import NotFound
from sanic.request import Request
from sanic.response import HTTPResponse
from terminaltables import AsciiTable, SingleTable
import terminaltables.width_and_alignment
import numpy as np
from aiohttp import ClientError
from colorclass import Color
import questionary
from questionary import Choice, Form, Question

from rasa import telemetry
import rasa.shared.utils.cli
import rasa.shared.utils.io
import rasa.cli.utils
import rasa.shared.data
from rasa.shared.nlu.constants import TEXT, INTENT_NAME_KEY
from rasa.shared.nlu.training_data.loading import RASA, RASA_YAML
from rasa.shared.core.constants import (
    USER_INTENT_RESTART,
    ACTION_LISTEN_NAME,
    LOOP_NAME,
    ACTIVE_LOOP,
    LOOP_REJECTED,
    REQUESTED_SLOT,
    LOOP_INTERRUPTED,
    ACTION_UNLIKELY_INTENT_NAME,
)
from rasa.core import run, utils
import rasa.core.train
from rasa.core.constants import DEFAULT_SERVER_FORMAT, DEFAULT_SERVER_PORT
from rasa.shared.core.domain import (
    Domain,
    KEY_INTENTS,
    KEY_ENTITIES,
    KEY_RESPONSES,
    KEY_ACTIONS,
    KEY_RESPONSES_TEXT,
)
import rasa.shared.core.events
from rasa.shared.core.events import (
    ActionExecuted,
    ActionReverted,
    BotUttered,
    Event,
    Restarted,
    UserUttered,
    UserUtteranceReverted,
)
from rasa.shared.constants import (
    INTENT_MESSAGE_PREFIX,
    DEFAULT_SENDER_ID,
    UTTER_PREFIX,
    DOCS_URL_POLICIES,
)
from rasa.shared.core.trackers import EventVerbosity, DialogueStateTracker
from rasa.shared.core.training_data import visualization
from rasa.shared.core.training_data.visualization import (
    VISUALIZATION_TEMPLATE_PATH,
    visualize_neighborhood,
)
from rasa.core.utils import AvailableEndpoints
from rasa.shared.importers.rasa import TrainingDataImporter
from rasa.utils.common import update_sanic_log_level
from rasa.utils.endpoints import EndpointConfig
from rasa.shared.exceptions import InvalidConfigException

# noinspection PyProtectedMember
from rasa.shared.nlu.training_data import loading
from rasa.shared.nlu.training_data.message import Message

# WARNING: This command line UI is using an external library
# communicating with the shell - these functions are hard to test
# automatically. If you change anything in here, please make sure to
# run the interactive learning and check if your part of the "ui"
# still works.
import rasa.utils.io as io_utils

from rasa.shared.core.generator import TrackerWithCachedStates

logger = logging.getLogger(__name__)

PATHS = {
    "stories": "data/stories.yml",
    "nlu": "data/nlu.yml",
    "backup": "data/nlu_interactive.yml",
    "domain": "domain.yml",
}
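
# Note (added for clarity): the PATHS above only serve as defaults for the
# export prompts; `run_interactive_learning` replaces them with the
# corresponding entries from `server_args` when those are provided.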

SAVE_IN_E2E = False

# sentinel values for the "other" / "create new" menu choices; random hex
# values make sure these don't clash with existing intent or action names
OTHER_INTENT = uuid.uuid4().hex
OTHER_ACTION = uuid.uuid4().hex
NEW_ACTION = uuid.uuid4().hex

NEW_RESPONSES: Dict[Text, List[Dict[Text, Any]]] = {}

MAX_NUMBER_OF_TRAINING_STORIES_FOR_VISUALIZATION = 200

DEFAULT_STORY_GRAPH_FILE = "story_graph.dot"


class RestartConversation(Exception):
    """Exception used to break out the flow and restart the conversation."""

    pass


class ForkTracker(Exception):
    """Exception used to break out the flow and fork at a previous step.

    The tracker will be reset to the selected point in the past and the
    conversation will continue from there.
    """

    pass


class UndoLastStep(Exception):
    """Exception used to break out the flow and undo the last step.

    The last step is either the most recent user message or the most
    recent action run by the bot.
    """

    pass


class Abort(Exception):
    """Exception used to abort the interactive learning and exit."""

    pass


async def send_message(
    endpoint: EndpointConfig,
    conversation_id: Text,
    message: Text,
    parse_data: Optional[Dict[Text, Any]] = None,
) -> Optional[Any]:
    """Send a user message to a conversation."""
    payload = {
        "sender": UserUttered.type_name,
        "text": message,
        "parse_data": parse_data,
    }

    return await endpoint.request(
        json=payload,
        method="post",
        subpath=f"/conversations/{conversation_id}/messages",
    )


async def request_prediction(
    endpoint: EndpointConfig, conversation_id: Text
) -> Optional[Any]:
    """Request the next action prediction from core."""
    return await endpoint.request(
        method="post", subpath=f"/conversations/{conversation_id}/predict"
    )


async def retrieve_domain(endpoint: EndpointConfig) -> Optional[Any]:
    """Retrieve the domain from core."""
    return await endpoint.request(
        method="get", subpath="/domain", headers={"Accept": "application/json"}
    )


async def retrieve_status(endpoint: EndpointConfig) -> Optional[Any]:
    """Retrieve the status from core."""
    return await endpoint.request(method="get", subpath="/status")


async def retrieve_tracker(
    endpoint: EndpointConfig,
    conversation_id: Text,
    verbosity: EventVerbosity = EventVerbosity.ALL,
) -> Dict[Text, Any]:
    """Retrieve a tracker from core."""
    path = f"/conversations/{conversation_id}/tracker?include_events={verbosity.name}"
    result = await endpoint.request(
        method="get", subpath=path, headers={"Accept": "application/json"}
    )

    # If the request wasn't successful the previous call had already raised. Hence,
    # we can be sure we have the tracker in the right format.
    return cast(Dict[Text, Any], result)


async def send_action(
    endpoint: EndpointConfig,
    conversation_id: Text,
    action_name: Text,
    policy: Optional[Text] = None,
    confidence: Optional[float] = None,
    is_new_action: bool = False,
) -> Optional[Any]:
    """Log an action to a conversation."""
    payload = ActionExecuted(action_name, policy, confidence).as_dict()

    subpath = f"/conversations/{conversation_id}/execute"

    try:
        return await endpoint.request(json=payload, method="post", subpath=subpath)
    except ClientError:
        if is_new_action:
            if action_name in NEW_RESPONSES:
                warning_questions = questionary.confirm(
                    f"WARNING: You have created a new action: '{action_name}', "
                    f"with matching response: "
                    f"'{NEW_RESPONSES[action_name][0][KEY_RESPONSES_TEXT]}'. "
                    f"This action will not return its message in this session, "
                    f"but the new response will be saved to your domain file "
                    f"when you exit and save this session. "
                    f"You do not need to do anything further."
                )
                await _ask_questions(warning_questions, conversation_id, endpoint)
            else:
                warning_questions = questionary.confirm(
                    f"WARNING: You have created a new action: '{action_name}', "
                    f"which was not successfully executed. "
                    f"If this action does not return any events, "
                    f"you do not need to do anything. "
                    f"If this is a custom action which returns events, "
                    f"you are recommended to implement this action "
                    f"in your action server and try again."
                )
                await _ask_questions(warning_questions, conversation_id, endpoint)

            payload = ActionExecuted(action_name).as_dict()
            return await send_event(endpoint, conversation_id, payload)
        else:
            logger.error("failed to execute action!")
            raise


async def send_event(
    endpoint: EndpointConfig,
    conversation_id: Text,
    evt: Union[List[Dict[Text, Any]], Dict[Text, Any]],
) -> Optional[Any]:
    """Log an event to a conversation."""
    subpath = f"/conversations/{conversation_id}/tracker/events"

    return await endpoint.request(json=evt, method="post", subpath=subpath)


def format_bot_output(message: BotUttered) -> Text:
    """Format a bot response to be displayed in the history table."""
    # First, add text to output
    output = message.text or ""

    # Then, append all additional items
    data = message.data or {}
    if not data:
        return output

    if "image" in data and data["image"] is not None:
        output += "\nImage: " + data["image"]

    if "attachment" in data and data["attachment"] is not None:
        output += "\nAttachment: " + data["attachment"]

    if "buttons" in data and data["buttons"] is not None:
        output += "\nButtons:"
        choices = rasa.cli.utils.button_choices_from_message_data(
            data, allow_free_text_input=True
        )
        for choice in choices:
            output += "\n" + choice

    if "elements" in data and data["elements"] is not None:
        output += "\nElements:"
        for idx, element in enumerate(data["elements"]):
            element_str = rasa.cli.utils.element_to_string(element, idx)
            output += "\n" + element_str

    if "quick_replies" in data and data["quick_replies"] is not None:
        output += "\nQuick replies:"
        for idx, element in enumerate(data["quick_replies"]):
            element_str = rasa.cli.utils.element_to_string(element, idx)
            output += "\n" + element_str
    return output


def latest_user_message(events: List[Dict[Text, Any]]) -> Optional[Dict[Text, Any]]:
    """Return most recent user message."""
    for e in reversed(events):
        if e.get("event") == UserUttered.type_name:
            return e
    return None


async def _ask_questions(
    questions: Union[Form, Question],
    conversation_id: Text,
    endpoint: EndpointConfig,
    is_abort: Callable[[Dict[Text, Any]], bool] = lambda x: False,
) -> Any:
    """Ask the user a question, if Ctrl-C is pressed provide user with menu."""
    should_retry = True
    answers: Any = {}

    while should_retry:
        answers = await questions.ask_async()
        if answers is None or is_abort(answers):
            should_retry = await _ask_if_quit(conversation_id, endpoint)
        else:
            should_retry = False
    return answers


def _selection_choices_from_intent_prediction(
    predictions: List[Dict[Text, Any]]
) -> List[Dict[Text, Any]]:
    """Given a list of ML predictions create a UI choice list."""
    sorted_intents = sorted(
        predictions, key=lambda k: (-k["confidence"], k[INTENT_NAME_KEY])
    )

    choices = []
    for p in sorted_intents:
        name_with_confidence = (
            f'{p.get("confidence"):03.2f} {p.get(INTENT_NAME_KEY):40}'
        )
        choice = {
            INTENT_NAME_KEY: name_with_confidence,
            "value": p.get(INTENT_NAME_KEY),
        }
        choices.append(choice)

    return choices
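
# Illustrative sketch (added; assumes INTENT_NAME_KEY resolves to "name"):
# hypothetical predictions such as
#   [{"name": "goodbye", "confidence": 0.91}, {"name": "greet", "confidence": 0.07}]
# become UI choices sorted by descending confidence, roughly
#   [{"name": "0.91 goodbye ...", "value": "goodbye"},
#    {"name": "0.07 greet   ...", "value": "greet"}]
# where "name" holds the padded display label and "value" the raw intent name.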


async def _request_free_text_intent(
    conversation_id: Text, endpoint: EndpointConfig
) -> Text:
    question = questionary.text(
        message="Please type the intent name:",
        validate=io_utils.not_empty_validator("Please enter an intent name"),
    )
    return await _ask_questions(question, conversation_id, endpoint)


async def _request_free_text_action(
    conversation_id: Text, endpoint: EndpointConfig
) -> Text:
    question = questionary.text(
        message="Please type the action name:",
        validate=io_utils.not_empty_validator("Please enter an action name"),
    )
    return await _ask_questions(question, conversation_id, endpoint)


async def _request_free_text_utterance(
    conversation_id: Text, endpoint: EndpointConfig, action: Text
) -> Text:
    question = questionary.text(
        message=(f"Please type the message for your new bot response '{action}':"),
        validate=io_utils.not_empty_validator("Please enter a response"),
    )
    return await _ask_questions(question, conversation_id, endpoint)


async def _request_selection_from_intents(
    intents: List[Dict[Text, Text]], conversation_id: Text, endpoint: EndpointConfig
) -> Text:
    question = questionary.select("What intent is it?", choices=intents)
    return await _ask_questions(question, conversation_id, endpoint)


async def _request_fork_point_from_list(
    forks: List[Dict[Text, Text]], conversation_id: Text, endpoint: EndpointConfig
) -> Text:
    question = questionary.select(
        "Before which user message do you want to fork?", choices=forks
    )
    return await _ask_questions(question, conversation_id, endpoint)


async def _request_fork_from_user(
    conversation_id: Text, endpoint: EndpointConfig
) -> Optional[List[Dict[Text, Any]]]:
    """Take in a conversation and ask at which point to fork the conversation.

    Returns the list of events that should be kept. Forking means, the
    conversation will be reset and continued from this previous point.
    """
    tracker = await retrieve_tracker(
        endpoint, conversation_id, EventVerbosity.AFTER_RESTART
    )

    choices = []
    for i, e in enumerate(tracker.get("events", [])):
        if e.get("event") == UserUttered.type_name:
            choices.append({"name": e.get("text"), "value": i})

    fork_idx = await _request_fork_point_from_list(
        list(reversed(choices)), conversation_id, endpoint
    )

    if fork_idx is not None:
        return tracker.get("events", [])[: int(fork_idx)]
    else:
        return None


async def _request_intent_from_user(
    latest_message: Dict[Text, Any],
    intents: List[Text],
    conversation_id: Text,
    endpoint: EndpointConfig,
) -> Dict[Text, Any]:
    """Take in latest message and ask which intent it should have been.

    Returns the intent dict that has been selected by the user.
    """
    predictions = latest_message.get("parse_data", {}).get("intent_ranking", [])

    predicted_intents = {p[INTENT_NAME_KEY] for p in predictions}

    for i in intents:
        if i not in predicted_intents:
            predictions.append({INTENT_NAME_KEY: i, "confidence": 0.0})

    # convert intents to ui list and add <other> as a free text alternative
    choices = [
        {INTENT_NAME_KEY: "<create_new_intent>", "value": OTHER_INTENT}
    ] + _selection_choices_from_intent_prediction(predictions)

    intent_name = await _request_selection_from_intents(
        choices, conversation_id, endpoint
    )

    if intent_name == OTHER_INTENT:
        intent_name = await _request_free_text_intent(conversation_id, endpoint)
        selected_intent = {INTENT_NAME_KEY: intent_name, "confidence": 1.0}
    else:
        # returns the selected intent with the original probability value
        selected_intent = next(
            (x for x in predictions if x[INTENT_NAME_KEY] == intent_name),
            {INTENT_NAME_KEY: None},
        )

    return selected_intent


async def _print_history(conversation_id: Text, endpoint: EndpointConfig) -> None:
    """Print information about the conversation for the user."""
    tracker_dump = await retrieve_tracker(
        endpoint, conversation_id, EventVerbosity.AFTER_RESTART
    )
    events = tracker_dump.get("events", [])

    table = _chat_history_table(events)
    slot_strings = _slot_history(tracker_dump)

    print("------")
    print("Chat History\n")
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, print, table)

    if slot_strings:
        print("\n")
        slots_info = f"Current slots: \n\t{', '.join(slot_strings)}\n"
        await loop.run_in_executor(None, print, slots_info)

    await loop.run_in_executor(None, print, "------")


def _chat_history_table(events: List[Dict[Text, Any]]) -> Text:
    """Create a table containing bot and user messages.

    Also includes additional information, like any events and
    prediction probabilities.
    """

    def wrap(txt: Text, max_width: int) -> Text:
        true_wrapping_width = calc_true_wrapping_width(txt, max_width)
        return "\n".join(
            textwrap.wrap(txt, true_wrapping_width, replace_whitespace=False)
        )

    def colored(txt: Text, color: Text) -> Text:
        return "{" + color + "}" + txt + "{/" + color + "}"

    def format_user_msg(user_event: UserUttered, max_width: int) -> Text:
        intent = user_event.intent or {}
        intent_name = intent.get(INTENT_NAME_KEY, "")
        _confidence = intent.get("confidence", 1.0)
        _md = _as_md_message(user_event.parse_data)

        _lines = [
            colored(wrap(_md, max_width), "hired"),
            f"intent: {intent_name} {_confidence:03.2f}",
        ]
        return "\n".join(_lines)

    def bot_width(_table: AsciiTable) -> int:
        return _table.column_max_width(1)

    def user_width(_table: AsciiTable) -> int:
        return _table.column_max_width(3)

    def add_bot_cell(data: List[List[Union[Text, Color]]], cell: Text) -> None:
        data.append([len(data), Color(cell), "", ""])

    def add_user_cell(data: List[List[Union[Text, Color]]], cell: Text) -> None:
        data.append([len(data), "", "", Color(cell)])

    # prints the historical interactions between the bot and the user,
    # to help with correctly identifying the action
    table_data = [
        [
            "#  ",
            Color(colored("Bot      ", "autoblue")),
            "  ",
            Color(colored("You       ", "hired")),
        ]
    ]

    table = SingleTable(table_data, "Chat History")

    bot_column = []

    tracker = DialogueStateTracker.from_dict("any", events)
    applied_events = tracker.applied_events()

    for idx, event in enumerate(applied_events):
        if isinstance(event, ActionExecuted):
            if (
                event.action_name == ACTION_UNLIKELY_INTENT_NAME
                and event.confidence == 0
            ):
                continue
            bot_column.append(colored(str(event), "autocyan"))
            if event.confidence is not None:
                bot_column[-1] += colored(f" {event.confidence:03.2f}", "autowhite")

        elif isinstance(event, UserUttered):
            if bot_column:
                text = "\n".join(bot_column)
                add_bot_cell(table_data, text)
                bot_column = []

            msg = format_user_msg(event, user_width(table))
            add_user_cell(table_data, msg)

        elif isinstance(event, BotUttered):
            wrapped = wrap(format_bot_output(event), bot_width(table))
            bot_column.append(colored(wrapped, "autoblue"))

        else:
            if event.as_story_string():
                bot_column.append(wrap(event.as_story_string(), bot_width(table)))

    if bot_column:
        text = "\n".join(bot_column)
        add_bot_cell(table_data, text)

    table.inner_heading_row_border = False
    table.inner_row_border = True
    table.inner_column_border = False
    table.outer_border = False
    table.justify_columns = {0: "left", 1: "left", 2: "center", 3: "right"}

    return table.table


def _slot_history(tracker_dump: Dict[Text, Any]) -> List[Text]:
    """Create an array of slot representations to be displayed."""
    slot_strings = []
    for k, s in tracker_dump.get("slots", {}).items():
        colored_value = rasa.shared.utils.io.wrap_with_color(
            str(s), color=rasa.shared.utils.io.bcolors.WARNING
        )
        slot_strings.append(f"{k}: {colored_value}")
    return slot_strings


async def _retry_on_error(
    func: Callable, export_path: Text, *args: Any, **kwargs: Any
) -> None:
    while True:
        try:
            return func(export_path, *args, **kwargs)
        except OSError as e:
            answer = await questionary.confirm(
                f"Failed to export '{export_path}': {e}. Please make sure 'rasa' "
                f"has read and write access to this file. Would you like to retry?"
            ).ask_async()
            if not answer:
                raise e


async def _write_data_to_file(conversation_id: Text, endpoint: EndpointConfig) -> None:
    """Write stories and nlu data to file."""
    story_path, nlu_path, domain_path = await _request_export_info()

    tracker = await retrieve_tracker(endpoint, conversation_id)
    events = tracker.get("events", [])

    serialised_domain = await retrieve_domain(endpoint)
    domain = Domain.from_dict(serialised_domain)

    await _retry_on_error(_write_stories_to_file, story_path, events, domain)
    await _retry_on_error(_write_nlu_to_file, nlu_path, events)
    await _retry_on_error(_write_domain_to_file, domain_path, events, domain)

    logger.info("Successfully wrote stories and NLU data")


async def _ask_if_quit(conversation_id: Text, endpoint: EndpointConfig) -> bool:
    """Display the exit menu.

    Return `True` if the previous question should be retried.
    """
    answer = await questionary.select(
        message="Do you want to stop?",
        choices=[
            Choice("Continue", "continue"),
            Choice("Undo Last", "undo"),
            Choice("Fork", "fork"),
            Choice("Start Fresh", "restart"),
            Choice("Export & Quit", "quit"),
        ],
    ).ask_async()

    if not answer or answer == "quit":
        # this is also the default answer if the user presses Ctrl-C
        await _write_data_to_file(conversation_id, endpoint)
        raise Abort()
    elif answer == "undo":
        raise UndoLastStep()
    elif answer == "fork":
        raise ForkTracker()
    elif answer == "restart":
        raise RestartConversation()
    else:  # `continue` or no answer
        # in this case we will just return, and the original
        # question will get asked again
        return True


async def _request_action_from_user(
    predictions: List[Dict[Text, Any]], conversation_id: Text, endpoint: EndpointConfig
) -> Tuple[Text, bool]:
    """Ask the user to correct an action prediction."""
    await _print_history(conversation_id, endpoint)

    choices = [
        {"name": f'{a["score"]:03.2f} {a["action"]:40}', "value": a["action"]}
        for a in predictions
    ]

    tracker = await retrieve_tracker(endpoint, conversation_id)
    events = tracker.get("events", [])

    session_actions_all = [a["name"] for a in _collect_actions(events)]
    session_actions_unique = list(set(session_actions_all))
    old_actions = [action["value"] for action in choices]
    new_actions = [
        {"name": action, "value": OTHER_ACTION + action}
        for action in session_actions_unique
        if action not in old_actions
    ]
    choices = (
        [{"name": "<create new action>", "value": NEW_ACTION}] + new_actions + choices
    )
    question = questionary.select("What is the next action of the bot?", choices)

    action_name = await _ask_questions(question, conversation_id, endpoint)
    is_new_action = action_name == NEW_ACTION

    if is_new_action:
        # create new action
        action_name = await _request_free_text_action(conversation_id, endpoint)
        if action_name.startswith(UTTER_PREFIX):
            utter_message = await _request_free_text_utterance(
                conversation_id, endpoint, action_name
            )
            NEW_RESPONSES[action_name] = [{KEY_RESPONSES_TEXT: utter_message}]

    elif action_name.startswith(OTHER_ACTION):
        # the action name is prefixed with the `OTHER_ACTION` sentinel: it was
        # newly created earlier in this session, but not in this turn
        is_new_action = True
        action_name = action_name[len(OTHER_ACTION) :]

    print(f"Thanks! The bot will now run {action_name}.\n")
    return action_name, is_new_action


async def _request_export_info() -> Tuple[Text, Text, Text]:
    """Request the export file paths for the stories, NLU data and domain."""
    import rasa.shared.data

    # export training data and quit
    questions = questionary.form(
        export_stories=questionary.text(
            message="Export stories to (if file exists, this "
            "will append the stories)",
            default=PATHS["stories"],
            validate=io_utils.file_type_validator(
                rasa.shared.data.YAML_FILE_EXTENSIONS,
                "Please provide a valid export path for the stories, "
                "e.g. 'stories.yml'.",
            ),
        ),
        export_nlu=questionary.text(
            message="Export NLU data to (if file exists, this will "
            "merge learned data with previous training examples)",
            default=PATHS["nlu"],
            validate=io_utils.file_type_validator(
                list(rasa.shared.data.TRAINING_DATA_EXTENSIONS),
                "Please provide a valid export path for the NLU data, "
                "e.g. 'nlu.yml'.",
            ),
        ),
        export_domain=questionary.text(
            message="Export domain file to (if file exists, this "
            "will be overwritten)",
            default=PATHS["domain"],
            validate=io_utils.file_type_validator(
                rasa.shared.data.YAML_FILE_EXTENSIONS,
                "Please provide a valid export path for the domain file, "
                "e.g. 'domain.yml'.",
            ),
        ),
    )

    answers = await questions.ask_async()
    if not answers:
        raise Abort()

    return answers["export_stories"], answers["export_nlu"], answers["export_domain"]


def _split_conversation_at_restarts(
    events: List[Dict[Text, Any]]
) -> List[List[Dict[Text, Any]]]:
    """Split a conversation at restart events.

    Returns an array of event lists, without the restart events.
    """
    deserialized_events = [Event.from_parameters(event) for event in events]
    split_events = rasa.shared.core.events.split_events(
        deserialized_events, Restarted, include_splitting_event=False
    )

    return [[event.as_dict() for event in events] for events in split_events]
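
# Illustrative sketch (added; simplified event dicts): a serialised conversation
# along the lines of
#   [{"event": "user", ...}, {"event": "restart"}, {"event": "user", ...}]
# is split into two sub-conversations and the restart event itself is dropped:
#   [[{"event": "user", ...}], [{"event": "user", ...}]]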


def _collect_messages(events: List[Dict[Text, Any]]) -> List[Message]:
    """Collect the message text and parsed data from the UserMessage events
    into a list.
    """
    import rasa.shared.nlu.training_data.util as rasa_nlu_training_data_utils

    messages = []

    for event in events:
        if event.get("event") == UserUttered.type_name:
            data = event.get("parse_data", {})
            rasa_nlu_training_data_utils.remove_untrainable_entities_from(data)
            msg = Message.build(
                data["text"], data["intent"][INTENT_NAME_KEY], data["entities"]
            )
            messages.append(msg)
        elif event.get("event") == UserUtteranceReverted.type_name and messages:
            messages.pop()  # user corrected the nlu, remove incorrect example

    return messages


def _collect_actions(events: List[Dict[Text, Any]]) -> List[Dict[Text, Any]]:
    """Collect all the `ActionExecuted` events into a list."""
    return [evt for evt in events if evt.get("event") == ActionExecuted.type_name]


def _write_stories_to_file(
    export_story_path: Text, events: List[Dict[Text, Any]], domain: Domain
) -> None:
    """Write the conversation of the conversation_id to the file paths."""
    from rasa.shared.core.training_data.story_writer.yaml_story_writer import (
        YAMLStoryWriter,
    )

    sub_conversations = _split_conversation_at_restarts(events)
    io_utils.create_path(export_story_path)

    # YAML is the only supported export format for stories
    writer = YAMLStoryWriter()

    should_append_stories = False
    if os.path.exists(export_story_path):
        append_write = "a"  # append if already exists
        should_append_stories = True
    else:
        append_write = "w"  # make a new file if not

    with open(
        export_story_path, append_write, encoding=rasa.shared.utils.io.DEFAULT_ENCODING
    ) as f:
        interactive_story_counter = 1
        for conversation in sub_conversations:
            parsed_events = rasa.shared.core.events.deserialise_events(conversation)
            tracker = DialogueStateTracker.from_events(
                f"interactive_story_{interactive_story_counter}",
                evts=parsed_events,
                slots=domain.slots,
            )

            if any(
                isinstance(event, UserUttered) for event in tracker.applied_events()
            ):
                interactive_story_counter += 1
                f.write(
                    "\n"
                    + tracker.export_stories(
                        writer=writer,
                        should_append_stories=should_append_stories,
                        e2e=SAVE_IN_E2E,
                    )
                )


def _filter_messages(msgs: List[Message]) -> List[Message]:
    """Filter messages removing those that start with INTENT_MESSAGE_PREFIX."""
    filtered_messages = []
    for msg in msgs:
        if not msg.get(TEXT).startswith(INTENT_MESSAGE_PREFIX):
            filtered_messages.append(msg)
    return filtered_messages


def _write_nlu_to_file(export_nlu_path: Text, events: List[Dict[Text, Any]]) -> None:
    """Write the nlu data of the conversation_id to the file paths."""
    from rasa.shared.nlu.training_data.training_data import TrainingData

    msgs = _collect_messages(events)
    msgs = _filter_messages(msgs)

    # noinspection PyBroadException
    try:
        previous_examples = loading.load_data(export_nlu_path)
    except Exception as e:
        logger.debug(f"An exception occurred while trying to load the NLU data. {e!s}")
        # No previous file exists, use empty training data as replacement.
        previous_examples = TrainingData()

    nlu_data = previous_examples.merge(TrainingData(msgs))

    # guess the target format of the export file before writing, so the
    # training data is serialised as YAML or JSON to match the export path
    nlu_format = _get_nlu_target_format(export_nlu_path)
    if nlu_format == RASA_YAML:
        stringified_training_data = nlu_data.nlu_as_yaml()
    else:
        stringified_training_data = nlu_data.nlu_as_json()

    rasa.shared.utils.io.write_text_file(stringified_training_data, export_nlu_path)


def _get_nlu_target_format(export_path: Text) -> Text:
    guessed_format = loading.guess_format(export_path)

    if guessed_format not in {RASA, RASA_YAML}:
        if rasa.shared.data.is_likely_json_file(export_path):
            guessed_format = RASA
        elif rasa.shared.data.is_likely_yaml_file(export_path):
            guessed_format = RASA_YAML

    return guessed_format
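
# Note (added for clarity): if `loading.guess_format` cannot identify the file
# (e.g. when the export path does not exist yet), the format falls back to the
# file extension, so an "nlu.yml" target is written as YAML and an "nlu.json"
# target as Rasa JSON by `_write_nlu_to_file`.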


def _entities_from_messages(messages: List[Message]) -> List[Text]:
    """Return all entities that occur in at least one of the messages."""
    return list({e["entity"] for m in messages for e in m.data.get("entities", [])})


def _intents_from_messages(messages: List[Message]) -> Set[Text]:
    """Return all intents that occur in at least one of the messages."""
    # set of distinct intents
    distinct_intents = {m.data["intent"] for m in messages if "intent" in m.data}

    return distinct_intents


def _write_domain_to_file(
    domain_path: Text, events: List[Dict[Text, Any]], old_domain: Domain
) -> None:
    """Write an updated domain file to the file path."""
    io_utils.create_path(domain_path)

    messages = _collect_messages(events)
    actions = _collect_actions(events)
    responses = NEW_RESPONSES

    # TODO for now there is no way to distinguish between action and form
    collected_actions = list(
        {
            e["name"]
            for e in actions
            if e["name"] not in rasa.shared.core.constants.DEFAULT_ACTION_NAMES
            and e["name"] not in old_domain.form_names
        }
    )

    new_domain = Domain.from_dict(
        {
            KEY_INTENTS: list(_intents_from_messages(messages)),
            KEY_ENTITIES: _entities_from_messages(messages),
            KEY_RESPONSES: responses,
            KEY_ACTIONS: collected_actions,
        }
    )

    old_domain.merge(new_domain).persist(domain_path)


async def _predict_till_next_listen(
    endpoint: EndpointConfig,
    conversation_id: Text,
    conversation_ids: List[Text],
    plot_file: Optional[Text],
) -> None:
    """Predict and validate actions until we need to wait for a user message."""
    listen = False
    while not listen:
        result = await request_prediction(endpoint, conversation_id)
        if result is None:
            result = {}

        predictions = result.get("scores", [])
        if not predictions:
            raise InvalidConfigException(
                "Cannot continue as no action was predicted by the dialogue manager. "
                "This can happen if you trained the assistant with no policy included "
                "in the configuration. If so, please re-train the assistant with at "
                f"least one policy ({DOCS_URL_POLICIES}) included in the configuration."
            )

        probabilities = [prediction["score"] for prediction in predictions]
        pred_out = int(np.argmax(probabilities))
        action_name = predictions[pred_out].get("action")
        policy = result.get("policy")
        confidence = result.get("confidence")

        await _print_history(conversation_id, endpoint)
        await _plot_trackers(
            conversation_ids,
            plot_file,
            endpoint,
            unconfirmed=[ActionExecuted(action_name)],
        )

        listen = await _validate_action(
            action_name, policy, confidence, predictions, endpoint, conversation_id
        )

        await _plot_trackers(conversation_ids, plot_file, endpoint)

    tracker_dump = await retrieve_tracker(
        endpoint, conversation_id, EventVerbosity.AFTER_RESTART
    )
    events = tracker_dump.get("events", [])

    if len(events) >= 2:
        last_event = events[-2]  # last event before action_listen

        # if bot message includes buttons the user will get a list choice to reply
        # the list choice is displayed in place of action listen
        if last_event.get("event") == BotUttered.type_name and last_event["data"].get(
            "buttons", None
        ):
            user_selection = await _get_button_choice(last_event)
            if user_selection != rasa.cli.utils.FREE_TEXT_INPUT_PROMPT:
                await send_message(endpoint, conversation_id, user_selection)


async def _get_button_choice(last_event: Dict[Text, Any]) -> Text:
    data = last_event["data"]
    message = last_event.get("text", "")

    choices = rasa.cli.utils.button_choices_from_message_data(
        data, allow_free_text_input=True
    )
    question = questionary.select(message, choices)
    return await rasa.cli.utils.payload_from_button_question(question)


async def _correct_wrong_nlu(
    corrected_nlu: Dict[Text, Any],
    events: List[Dict[Text, Any]],
    endpoint: EndpointConfig,
    conversation_id: Text,
) -> None:
    """A wrong NLU prediction got corrected, update core's tracker."""
    revert_latest_user_utterance = UserUtteranceReverted().as_dict()
    # `UserUtteranceReverted` also removes the `ACTION_LISTEN` event before, hence we
    # have to replay it.
    listen_for_next_message = ActionExecuted(ACTION_LISTEN_NAME).as_dict()
    corrected_message = latest_user_message(events)

    if corrected_message is None:
        raise Exception("Failed to correct NLU data. User message not found.")

    corrected_message["parse_data"] = corrected_nlu
    await send_event(
        endpoint,
        conversation_id,
        [revert_latest_user_utterance, listen_for_next_message, corrected_message],
    )


async def _correct_wrong_action(
    corrected_action: Text,
    endpoint: EndpointConfig,
    conversation_id: Text,
    is_new_action: bool = False,
) -> None:
    """A wrong action prediction got corrected, update core's tracker."""
    await send_action(
        endpoint, conversation_id, corrected_action, is_new_action=is_new_action
    )


def _form_is_rejected(action_name: Text, tracker: Dict[Text, Any]) -> bool:
    """Check if the form got rejected with the most recent action name."""
    return (
        tracker.get(ACTIVE_LOOP, {}).get(LOOP_NAME)
        and action_name != tracker[ACTIVE_LOOP][LOOP_NAME]
        and action_name != ACTION_LISTEN_NAME
    )


def _form_is_restored(action_name: Text, tracker: Dict[Text, Any]) -> bool:
    """Check whether the form is called again after it was rejected."""
    return (
        tracker.get(ACTIVE_LOOP, {}).get(LOOP_REJECTED)
        and tracker.get("latest_action_name") == ACTION_LISTEN_NAME
        and action_name == tracker.get(ACTIVE_LOOP, {}).get(LOOP_NAME)
    )
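
# Illustrative sketch (added; hypothetical tracker state): while a form such as
# "restaurant_form" is active, predicting any action other than the form itself
# or ACTION_LISTEN_NAME makes `_form_is_rejected` truthy; once the loop is
# marked as rejected and the last action was ACTION_LISTEN_NAME, predicting the
# form again makes `_form_is_restored` truthy.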


async def _confirm_form_validation(
    action_name: Text,
    tracker: Dict[Text, Any],
    endpoint: EndpointConfig,
    conversation_id: Text,
) -> None:
    """Ask a user whether an input for a form should be validated.

    Previous to this call, the active form was chosen after it was rejected.
    """
    requested_slot = tracker.get("slots", {}).get(REQUESTED_SLOT)

    validation_questions = questionary.confirm(
        f"Should '{action_name}' validate user input to fill "
        f"the slot '{requested_slot}'?"
    )
    validate_input = await _ask_questions(
        validation_questions, conversation_id, endpoint
    )

    if not validate_input:
        # notify form action to skip validation
        await send_event(
            endpoint,
            conversation_id,
            {
                "event": rasa.shared.core.events.LoopInterrupted.type_name,
                LOOP_INTERRUPTED: True,
            },
        )

    elif tracker.get(ACTIVE_LOOP, {}).get(LOOP_INTERRUPTED):
        # handle contradiction with learned behaviour
        warning_question = questionary.confirm(
            "ERROR: FormPolicy predicted no form validation "
            "based on previous training stories. "
            "Make sure to remove contradictory stories "
            "from training data. "
            "Otherwise predicting no form validation "
            "will not work as expected."
        )

        await _ask_questions(warning_question, conversation_id, endpoint)
        # notify form action to validate an input
        await send_event(
            endpoint,
            conversation_id,
            {
                "event": rasa.shared.core.events.LoopInterrupted.type_name,
                LOOP_INTERRUPTED: False,
            },
        )


async def _validate_action(
    action_name: Text,
    policy: Optional[Text],
    confidence: Optional[float],
    predictions: List[Dict[Text, Any]],
    endpoint: EndpointConfig,
    conversation_id: Text,
) -> bool:
    """Query the user to validate if an action prediction is correct.

    Returns `True` if the prediction is correct, `False` otherwise.
    """
    if action_name == ACTION_UNLIKELY_INTENT_NAME:
        question = questionary.confirm(
            f"The bot wants to run '{action_name}' "
            f"to indicate that the last user message was unexpected "
            f"at this point in the conversation. "
            f"Check out UnexpecTEDIntentPolicy "
            f"({DOCS_URL_POLICIES}#unexpected-intent-policy) "
            f"to learn more. Is that correct?"
        )
    else:
        question = questionary.confirm(
            f"The bot wants to run '{action_name}', correct?"
        )

    is_correct = await _ask_questions(question, conversation_id, endpoint)

    if not is_correct and action_name != ACTION_UNLIKELY_INTENT_NAME:
        action_name, is_new_action = await _request_action_from_user(
            predictions, conversation_id, endpoint
        )
    else:
        is_new_action = False

    tracker = await retrieve_tracker(
        endpoint, conversation_id, EventVerbosity.AFTER_RESTART
    )

    if _form_is_rejected(action_name, tracker):
        # notify the tracker that form was rejected
        await send_event(
            endpoint,
            conversation_id,
            {
                "event": "action_execution_rejected",
                LOOP_NAME: tracker[ACTIVE_LOOP][LOOP_NAME],
            },
        )

    elif _form_is_restored(action_name, tracker):
        await _confirm_form_validation(action_name, tracker, endpoint, conversation_id)

    if not is_correct:
        await _correct_wrong_action(
            action_name, endpoint, conversation_id, is_new_action=is_new_action
        )
    else:
        await send_action(endpoint, conversation_id, action_name, policy, confidence)

    return action_name == ACTION_LISTEN_NAME


def _as_md_message(parse_data: Dict[Text, Any]) -> Text:
    """Display the parse data of a message in markdown format."""
    from rasa.shared.nlu.training_data.formats.readerwriter import TrainingDataWriter

    if parse_data.get("text", "").startswith(INTENT_MESSAGE_PREFIX):
        return parse_data["text"]

    if not parse_data.get("entities"):
        parse_data["entities"] = []

    return TrainingDataWriter.generate_message(parse_data)


def _validate_user_regex(latest_message: Dict[Text, Any], intents: List[Text]) -> bool:
    """Validate if a users message input is correct.

    This assumes the user entered an intent directly, e.g. using
    `/greet`. Return `True` if the intent is a known one.
    """
    parse_data = latest_message.get("parse_data", {})
    intent = parse_data.get("intent", {}).get(INTENT_NAME_KEY)

    if intent in intents:
        return True
    else:
        return False


async def _validate_user_text(
    latest_message: Dict[Text, Any], endpoint: EndpointConfig, conversation_id: Text
) -> bool:
    """Validate a user message input as free text.

    This assumes the user message is a text message (so NOT `/greet`).
    """
    parse_data = latest_message.get("parse_data", {})
    text = _as_md_message(parse_data)
    intent = parse_data.get("intent", {}).get(INTENT_NAME_KEY)
    entities = parse_data.get("entities", [])
    if entities:
        message = (
            f"Is the intent '{intent}' correct for '{text}' and are "
            f"all entities labeled correctly?"
        )
    else:
        message = (
            f"Your NLU model classified '{text}' with intent '{intent}'"
            f" and there are no entities, is this correct?"
        )

    if intent is None:
        print(f"The NLU classification for '{text}' returned '{intent}'")
        return False
    else:
        question = questionary.confirm(message)

        return await _ask_questions(question, conversation_id, endpoint)


async def _validate_nlu(
    intents: List[Text], endpoint: EndpointConfig, conversation_id: Text
) -> None:
    """Validate if a user message, either text or intent is correct.

    If the prediction of the latest user message is incorrect,
    the tracker will be corrected with the correct intent / entities.
    """
    tracker = await retrieve_tracker(
        endpoint, conversation_id, EventVerbosity.AFTER_RESTART
    )

    latest_message = latest_user_message(tracker.get("events", [])) or {}

    if latest_message.get("text", "").startswith(INTENT_MESSAGE_PREFIX):
        valid = _validate_user_regex(latest_message, intents)
    else:
        valid = await _validate_user_text(latest_message, endpoint, conversation_id)

    if not valid:
        corrected_intent = await _request_intent_from_user(
            latest_message, intents, conversation_id, endpoint
        )
        # corrected intents have confidence 1.0
        corrected_intent["confidence"] = 1.0

        events = tracker.get("events", [])

        entities = await _correct_entities(latest_message, endpoint, conversation_id)
        corrected_nlu = {
            "intent": corrected_intent,
            "entities": entities,
            "text": latest_message.get("text"),
        }

        await _correct_wrong_nlu(corrected_nlu, events, endpoint, conversation_id)


async def _correct_entities(
    latest_message: Dict[Text, Any], endpoint: EndpointConfig, conversation_id: Text
) -> List[Dict[Text, Any]]:
    """Validate the entities of a user message.

    Returns the corrected entities.
    """
    from rasa.shared.nlu.training_data import entities_parser

    parse_original = latest_message.get("parse_data", {})
    entity_str = _as_md_message(parse_original)
    question = questionary.text(
        "Please mark the entities using [value](type) notation", default=entity_str
    )

    annotation = await _ask_questions(question, conversation_id, endpoint)
    parse_annotated = entities_parser.parse_training_example(annotation)

    corrected_entities = _merge_annotated_and_original_entities(
        parse_annotated, parse_original
    )

    return corrected_entities


def _merge_annotated_and_original_entities(
    parse_annotated: Message, parse_original: Dict[Text, Any]
) -> List[Dict[Text, Any]]:
    # overwrite entities which have already been
    # annotated in the original annotation to preserve
    # additional entity parser information
    entities = parse_annotated.get("entities", [])[:]
    for i, entity in enumerate(entities):
        for original_entity in parse_original.get("entities", []):
            if _is_same_entity_annotation(entity, original_entity):
                entities[i] = original_entity
                break
    return entities


def _is_same_entity_annotation(entity: Dict[Text, Any], other: Dict[Text, Any]) -> bool:
    return (
        entity["value"] == other["value"]
        and entity["entity"] == other["entity"]
        and entity.get("group") == other.get("group")
        and entity.get("role") == other.get("group")
    )
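
# Example (added; hypothetical values): an annotated entity like
#   {"value": "Berlin", "entity": "city", "group": None, "role": "destination"}
# matches an original entity with the same value, type, group and role, in
# which case `_merge_annotated_and_original_entities` keeps the original
# (extractor-enriched) entity dict.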


async def _enter_user_message(conversation_id: Text, endpoint: EndpointConfig) -> None:
    """Request a new message from the user."""
    question = questionary.text("Your input ->")

    message = await _ask_questions(question, conversation_id, endpoint, lambda a: not a)

    if message == (INTENT_MESSAGE_PREFIX + USER_INTENT_RESTART):
        raise RestartConversation()

    await send_message(endpoint, conversation_id, message)


async def is_listening_for_message(
    conversation_id: Text, endpoint: EndpointConfig
) -> bool:
    """Check if the conversation is in need for a user message."""
    tracker = await retrieve_tracker(endpoint, conversation_id, EventVerbosity.APPLIED)

    for e in reversed(tracker.get("events", [])):
        if e.get("event") == UserUttered.type_name:
            return False
        elif e.get("event") == ActionExecuted.type_name:
            return e.get("name") == ACTION_LISTEN_NAME
    return False


async def _undo_latest(conversation_id: Text, endpoint: EndpointConfig) -> None:
    """Undo either the latest bot action or user message, whatever is last."""
    tracker = await retrieve_tracker(endpoint, conversation_id, EventVerbosity.ALL)

    # Get latest `UserUtterance` or `ActionExecuted` event.
    last_event_type = None
    for e in reversed(tracker.get("events", [])):
        last_event_type = e.get("event")
        if last_event_type in {ActionExecuted.type_name, UserUttered.type_name}:
            break
        elif last_event_type == Restarted.type_name:
            break

    if last_event_type == ActionExecuted.type_name:
        undo_action = ActionReverted().as_dict()
        await send_event(endpoint, conversation_id, undo_action)
    elif last_event_type == UserUttered.type_name:
        undo_user_message = UserUtteranceReverted().as_dict()
        listen_for_next_message = ActionExecuted(ACTION_LISTEN_NAME).as_dict()

        await send_event(
            endpoint, conversation_id, [undo_user_message, listen_for_next_message]
        )


async def _fetch_events(
    conversation_ids: List[Union[Text, List[Event]]], endpoint: EndpointConfig
) -> List[List[Event]]:
    """Retrieve all event trackers from the endpoint for all conversation ids."""
    event_sequences = []
    for conversation_id in conversation_ids:
        if isinstance(conversation_id, str):
            tracker = await retrieve_tracker(endpoint, conversation_id)
            events = tracker.get("events", [])

            for conversation in _split_conversation_at_restarts(events):
                parsed_events = rasa.shared.core.events.deserialise_events(conversation)
                event_sequences.append(parsed_events)
        else:
            event_sequences.append(conversation_id)
    return event_sequences


async def _plot_trackers(
    conversation_ids: List[Union[Text, List[Event]]],
    output_file: Optional[Text],
    endpoint: EndpointConfig,
    unconfirmed: Optional[List[Event]] = None,
) -> None:
    """Create a plot of the trackers of the passed conversation ids.

    This assumes that the last conversation id is the conversation we are currently
    working on. If there are events that are not part of this active tracker
    yet, they can be passed as part of `unconfirmed`. They will be appended
    to the currently active conversation.
    """
    if not output_file or not conversation_ids:
        # if there is no output file provided, we are going to skip plotting
        # same happens if there are no conversation ids
        return

    event_sequences = await _fetch_events(conversation_ids, endpoint)

    if unconfirmed:
        event_sequences[-1].extend(unconfirmed)

    graph = visualize_neighborhood(
        event_sequences[-1], event_sequences, output_file=None, max_history=2
    )

    from networkx.drawing.nx_pydot import write_dot

    with open(output_file, "w", encoding="utf-8") as f:
        write_dot(graph, f)


def _print_help(skip_visualization: bool) -> None:
    """Print some initial help message for the user."""
    if not skip_visualization:
        visualization_url = DEFAULT_SERVER_FORMAT.format(
            "http", DEFAULT_SERVER_PORT + 1
        )
        visualization_help = (
            f"Visualisation at {visualization_url}/visualization.html."
        )
    else:
        visualization_help = ""

    rasa.shared.utils.cli.print_success(
        f"Bot loaded. {visualization_help}\n"
        f"Type a message and press enter "
        f"(press 'Ctrl-c' to exit)."
    )


def intent_names_from_domain(domain: Any) -> List[Text]:
    """Get a list of the possible intents names from the domain specification.

    This is its own function as intents are non-trivial to unpack and this
    warrants testing.
    """
    domain_intents = domain.get("intents", []) if domain is not None else []

    # intents with properties such as `use_entities` or `ignore_entities`
    # are a dictionary which needs unpacking. Other intents are strings
    # and can be used as-is.
    return [next(iter(i)) if isinstance(i, dict) else i for i in domain_intents]
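
# Illustrative sketch (added; hypothetical domain snippet): for a domain like
#   {"intents": ["greet", {"inform": {"use_entities": ["city"]}}]}
# the helper above returns ["greet", "inform"], i.e. dictionary-style intents
# are reduced to their name.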


async def record_messages(
    endpoint: EndpointConfig,
    file_importer: TrainingDataImporter,
    conversation_id: Text = DEFAULT_SENDER_ID,
    max_message_limit: Optional[int] = None,
    skip_visualization: bool = False,
) -> None:
    """Read messages from the command line and print bot responses."""
    try:
        try:
            domain = await retrieve_domain(endpoint)
        except ClientError:
            logger.exception(
                f"Failed to connect to Rasa Core server at '{endpoint.url}'. "
                f"Is the server running?"
            )
            return

        intents = intent_names_from_domain(domain)

        num_messages = 0

        if not skip_visualization:
            events_including_current_user_id = _get_tracker_events_to_plot(
                domain, file_importer, conversation_id
            )

            plot_file = DEFAULT_STORY_GRAPH_FILE
            await _plot_trackers(events_including_current_user_id, plot_file, endpoint)
        else:
            # `None` means that future `_plot_trackers` calls will also skip the
            # visualization.
            plot_file = None
            events_including_current_user_id = []

        _print_help(skip_visualization)

        while not utils.is_limit_reached(num_messages, max_message_limit):
            try:
                if await is_listening_for_message(conversation_id, endpoint):
                    await _enter_user_message(conversation_id, endpoint)
                    await _validate_nlu(intents, endpoint, conversation_id)

                await _predict_till_next_listen(
                    endpoint,
                    conversation_id,
                    events_including_current_user_id,
                    plot_file,
                )

                num_messages += 1
            except RestartConversation:
                await send_event(endpoint, conversation_id, Restarted().as_dict())

                await send_event(
                    endpoint,
                    conversation_id,
                    ActionExecuted(ACTION_LISTEN_NAME).as_dict(),
                )

                logger.info("Restarted conversation, starting a new one.")
            except UndoLastStep:
                await _undo_latest(conversation_id, endpoint)
                await _print_history(conversation_id, endpoint)
            except ForkTracker:
                await _print_history(conversation_id, endpoint)

                events_fork = await _request_fork_from_user(conversation_id, endpoint)

                await send_event(endpoint, conversation_id, Restarted().as_dict())

                if events_fork:
                    for evt in events_fork:
                        await send_event(endpoint, conversation_id, evt)
                logger.info("Restarted conversation at fork.")

                await _print_history(conversation_id, endpoint)
                await _plot_trackers(
                    events_including_current_user_id, plot_file, endpoint
                )

    except Abort:
        return
    except Exception:
        logger.exception("An exception occurred while recording messages.")
        raise


def _get_tracker_events_to_plot(
    domain: Dict[Text, Any], file_importer: TrainingDataImporter, conversation_id: Text
) -> List[Union[Text, Deque[Event]]]:
    training_trackers = _get_training_trackers(file_importer, domain)
    number_of_trackers = len(training_trackers)
    if number_of_trackers > MAX_NUMBER_OF_TRAINING_STORIES_FOR_VISUALIZATION:
        rasa.shared.utils.cli.print_warning(
            f"You have {number_of_trackers} different story paths in "
            f"your training data. Visualizing them is very resource "
            f"consuming. Hence, the visualization will only show the stories "
            f"which you created during interactive learning, but not your "
            f"training stories."
        )
        training_trackers = []

    training_data_events: List[Union[Text, Deque[Event]]] = [
        t.events for t in training_trackers
    ]
    return training_data_events + [conversation_id]
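
# Note (added for clarity): the returned list mixes event deques from the
# training trackers with the current conversation id as its last element;
# `_fetch_events` later resolves that id into events via the HTTP endpoint,
# and `_plot_trackers` treats the last entry as the active conversation.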


def _get_training_trackers(
    file_importer: TrainingDataImporter, domain: Dict[str, Any]
) -> List[TrackerWithCachedStates]:
    from rasa.core import training

    return training.load_data(
        file_importer,
        Domain.from_dict(domain),
        augmentation_factor=0,
        use_story_concatenation=False,
    )


def _serve_application(
    app: Sanic,
    file_importer: TrainingDataImporter,
    skip_visualization: bool,
    conversation_id: Text,
    port: int,
) -> Sanic:
    """Start a core server and attach the interactive learning IO."""
    endpoint = EndpointConfig(url=DEFAULT_SERVER_FORMAT.format("http", port))

    async def run_interactive_io(running_app: Sanic) -> None:
        """Small wrapper to shut down the server once cmd io is done."""
        await record_messages(
            endpoint=endpoint,
            file_importer=file_importer,
            skip_visualization=skip_visualization,
            conversation_id=conversation_id,
        )

        logger.info("Killing Sanic server now.")

        running_app.stop()  # kill the sanic server

    app.add_task(run_interactive_io)

    update_sanic_log_level()

    app.run(host="0.0.0.0", port=port)

    return app


def start_visualization(image_path: Text, port: int) -> None:
    """Add routes to serve the conversation visualization files."""
    app = Sanic("rasa_interactive")

    # noinspection PyUnusedLocal
    @app.exception(NotFound)
    async def ignore_404s(request: Request, exception: Exception) -> HTTPResponse:
        return response.text("Not found", status=404)

    # noinspection PyUnusedLocal
    @app.route(VISUALIZATION_TEMPLATE_PATH, methods=["GET"])
    async def visualisation_html(request: Request) -> HTTPResponse:
        return await response.file(visualization.visualization_html_path())

    # noinspection PyUnusedLocal
    @app.route("/visualization.dot", methods=["GET"])
    async def visualisation_png(request: Request) -> HTTPResponse:
        try:
            headers = {"Cache-Control": "no-cache"}
            return await response.file(os.path.abspath(image_path), headers=headers)
        except FileNotFoundError:
            return response.text("", 404)

    update_sanic_log_level()

    app.run(host="0.0.0.0", port=port, access_log=False)


def run_interactive_learning(
    file_importer: TrainingDataImporter,
    skip_visualization: bool = False,
    conversation_id: Text = uuid.uuid4().hex,
    server_args: Optional[Dict[Text, Any]] = None,
) -> None:
    """Start the interactive learning with the model of the agent."""
    global SAVE_IN_E2E
    server_args = server_args or {}

    if server_args.get("nlu_data"):
        PATHS["nlu"] = server_args["nlu_data"]

    if server_args.get("stories"):
        PATHS["stories"] = server_args["stories"]

    if server_args.get("domain"):
        PATHS["domain"] = server_args["domain"]

    port = server_args.get("port", DEFAULT_SERVER_PORT)

    SAVE_IN_E2E = server_args["e2e"]

    if not skip_visualization:
        visualisation_port = port + 1
        p = Process(
            target=start_visualization,
            args=(DEFAULT_STORY_GRAPH_FILE, visualisation_port),
            daemon=True,
        )
        p.start()
    else:
        p = None

    app = run.configure_app(port=port, conversation_id="default", enable_api=True)
    endpoints = AvailableEndpoints.read_endpoints(server_args.get("endpoints"))

    # before_server_start handlers make sure the agent is loaded before the
    # interactive learning IO starts
    app.register_listener(
        partial(run.load_agent_on_start, server_args.get("model"), endpoints, None),
        "before_server_start",
    )

    telemetry.track_interactive_learning_start(skip_visualization, SAVE_IN_E2E)

    _serve_application(app, file_importer, skip_visualization, conversation_id, port)

    if not skip_visualization and p is not None:
        p.terminate()
        p.join()


def calc_true_wrapping_width(text: Text, monospace_wrapping_width: int) -> int:
    """Calculates a wrapping width that also works for CJK characters.

    Chinese, Japanese and Korean characters are often broader than ascii
    characters:
    abcdefgh (8 chars)
    我要去北京 (5 chars, roughly same visible width)

    We need to account for that, otherwise the wrapping doesn't work
    appropriately for long strings, the table overflows and errors are raised.

    params:
        text: text sequence that should be wrapped into multiple lines
        monospace_wrapping_width: the maximum width per line in number of
            standard ascii characters
    returns:
        The maximum line width for the given string that takes its visible
        width into account, so that it won't lead to table overflow.
    """
    true_wrapping_width = 0

    # testing potential width from longest to shortest
    for potential_width in range(monospace_wrapping_width, -1, -1):
        lines = textwrap.wrap(text, potential_width)
        # test whether all lines' visible width fits the available width
        if all(
            terminaltables.width_and_alignment.visible_width(line)
            <= monospace_wrapping_width
            for line in lines
        ):
            true_wrapping_width = potential_width
            break

    return true_wrapping_width
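
# Illustrative usage (added; not executed at import time): wrapping the CJK
# string "我要去北京我要去北京" with a monospace width of 8 yields a smaller
# wrapping width (roughly 4), because every character spans two terminal
# columns according to `terminaltables.width_and_alignment.visible_width`.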