rasa/nlu/test.py

import copy
import itertools
import os
import logging
import structlog
from pathlib import Path

import numpy as np
from collections import defaultdict, namedtuple
from tqdm import tqdm
from typing import (
    Iterable,
    Iterator,
    Tuple,
    List,
    Set,
    Optional,
    Text,
    Union,
    Dict,
    Any,
    NamedTuple,
    TYPE_CHECKING,
)

from rasa import telemetry
from rasa.core.agent import Agent
from rasa.core.channels import UserMessage
from rasa.core.processor import MessageProcessor
from rasa.plugin import plugin_manager
from rasa.shared.nlu.training_data.training_data import TrainingData
from rasa.utils.common import TempDirectoryPath, get_temp_dir_name
import rasa.shared.utils.io
import rasa.utils.plotting as plot_utils
import rasa.utils.io as io_utils

from rasa.constants import TEST_DATA_FILE, TRAIN_DATA_FILE, NLG_DATA_FILE
import rasa.nlu.classifiers.fallback_classifier
from rasa.nlu.constants import (
    RESPONSE_SELECTOR_DEFAULT_INTENT,
    RESPONSE_SELECTOR_PROPERTY_NAME,
    RESPONSE_SELECTOR_PREDICTION_KEY,
    TOKENS_NAMES,
    ENTITY_ATTRIBUTE_CONFIDENCE_TYPE,
    ENTITY_ATTRIBUTE_CONFIDENCE_ROLE,
    ENTITY_ATTRIBUTE_CONFIDENCE_GROUP,
    RESPONSE_SELECTOR_RETRIEVAL_INTENTS,
)
from rasa.shared.nlu.constants import (
    TEXT,
    INTENT,
    INTENT_RESPONSE_KEY,
    ENTITIES,
    EXTRACTOR,
    PRETRAINED_EXTRACTORS,
    ENTITY_ATTRIBUTE_TYPE,
    ENTITY_ATTRIBUTE_GROUP,
    ENTITY_ATTRIBUTE_ROLE,
    NO_ENTITY_TAG,
    INTENT_NAME_KEY,
    PREDICTED_CONFIDENCE_KEY,
)
from rasa.nlu.classifiers import fallback_classifier
from rasa.nlu.tokenizers.tokenizer import Token
from rasa.shared.importers.importer import TrainingDataImporter
from rasa.shared.nlu.training_data.formats.rasa_yaml import RasaYAMLWriter

if TYPE_CHECKING:
    from typing_extensions import TypedDict

    EntityPrediction = TypedDict(
        "EntityPrediction",
        {
            "text": Text,
            "entities": List[Dict[Text, Any]],
            "predicted_entities": List[Dict[Text, Any]],
        },
    )
logger = logging.getLogger(__name__)
structlogger = structlog.get_logger()

# Exclude 'EntitySynonymMapper' and 'ResponseSelector' because their superclass
# performs entity extraction while these two components themselves do not
ENTITY_PROCESSORS = {"EntitySynonymMapper", "ResponseSelector"}

EXTRACTORS_WITH_CONFIDENCES = {"CRFEntityExtractor", "DIETClassifier"}


class CVEvaluationResult(NamedTuple):
    """Stores NLU cross-validation results."""

    train: Dict
    test: Dict
    evaluation: Dict


NO_ENTITY = "no_entity"

IntentEvaluationResult = namedtuple(
    "IntentEvaluationResult", "intent_target intent_prediction message confidence"
)

ResponseSelectionEvaluationResult = namedtuple(
    "ResponseSelectionEvaluationResult",
    "intent_response_key_target intent_response_key_prediction message confidence",
)

EntityEvaluationResult = namedtuple(
    "EntityEvaluationResult", "entity_targets entity_predictions tokens message"
)

IntentMetrics = Dict[Text, List[float]]
EntityMetrics = Dict[Text, Dict[Text, List[float]]]
ResponseSelectionMetrics = Dict[Text, List[float]]


def log_evaluation_table(
    report: Text, precision: float, f1: float, accuracy: float
) -> None:  # pragma: no cover
    """Log the sklearn evaluation metrics."""
    logger.info(f"F1-Score:  {f1}")
    logger.info(f"Precision: {precision}")
    logger.info(f"Accuracy:  {accuracy}")
    logger.info(f"Classification report: \n{report}")


def remove_empty_intent_examples(
    intent_results: List[IntentEvaluationResult],
) -> List[IntentEvaluationResult]:
    """Remove those examples without an intent.

    Args:
        intent_results: intent evaluation results

    Returns: intent evaluation results
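
    Example (illustrative; an example without a target intent is dropped and a
    `None` prediction is replaced by an empty string)::

        >>> kept = IntentEvaluationResult("greet", None, "hi", 0.9)
        >>> dropped = IntentEvaluationResult("", "greet", "hi", 0.9)
        >>> remove_empty_intent_examples([kept, dropped])
        [IntentEvaluationResult(intent_target='greet', intent_prediction='', message='hi', confidence=0.9)]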
    """
    filtered = []
    for r in intent_results:
        # substitute None values with empty string
        # to enable sklearn evaluation
        if r.intent_prediction is None:
            r = r._replace(intent_prediction="")

        if r.intent_target != "" and r.intent_target is not None:
            filtered.append(r)

    return filtered


def remove_empty_response_examples(
    response_results: List[ResponseSelectionEvaluationResult],
) -> List[ResponseSelectionEvaluationResult]:
    """Remove those examples without a response.

    Args:
        response_results: response selection evaluation results

    Returns:
        Response selection evaluation results
    """
    filtered = []
    for r in response_results:
        # substitute None values with empty string
        # to enable sklearn evaluation
        if r.intent_response_key_prediction is None:
            r = r._replace(intent_response_key_prediction="")

        if r.confidence is None:
            # This might happen if response selector training data is present but
            # no response selector is part of the model
            r = r._replace(confidence=0.0)

        if r.intent_response_key_target:
            filtered.append(r)

    return filtered


def drop_intents_below_freq(
    training_data: TrainingData, cutoff: int = 5
) -> TrainingData:
    """Remove intent groups with less than cutoff instances.

    Args:
        training_data: training data
        cutoff: threshold

    Returns: updated training data
    """
    logger.debug(
        "Raw data intent examples: {}".format(len(training_data.intent_examples))
    )

    examples_per_intent = training_data.number_of_examples_per_intent
    return training_data.filter_training_examples(
        lambda ex: examples_per_intent.get(ex.get(INTENT), 0) >= cutoff
    )


def write_intent_successes(
    intent_results: List[IntentEvaluationResult], successes_filename: Text
) -> None:
    """Write successful intent predictions to a file.

    Args:
        intent_results: intent evaluation result
        successes_filename: filename of file to save successful predictions to
    """
    successes = [
        {
            "text": r.message,
            "intent": r.intent_target,
            "intent_prediction": {
                INTENT_NAME_KEY: r.intent_prediction,
                "confidence": r.confidence,
            },
        }
        for r in intent_results
        if r.intent_target == r.intent_prediction
    ]

    if successes:
        rasa.shared.utils.io.dump_obj_as_json_to_file(successes_filename, successes)
        logger.info(f"Successful intent predictions saved to {successes_filename}.")
        logger.debug(f"\n\nSuccessfully predicted the following intents: \n{successes}")
    else:
        logger.info("No successful intent predictions found.")


def _write_errors(errors: List[Dict], errors_filename: Text, error_type: Text) -> None:
    """Write incorrect intent predictions to a file.

    Args:
        errors: Serializable prediction errors.
        errors_filename: filename of file to save incorrect predictions to
        error_type: NLU entity which was evaluated (e.g. `intent` or `entity`).
    """
    if errors:
        rasa.shared.utils.io.dump_obj_as_json_to_file(errors_filename, errors)
        logger.info(f"Incorrect {error_type} predictions saved to {errors_filename}.")
        logger.debug(
            f"\n\nThese {error_type} examples could not be classified "
            f"correctly: \n{errors}"
        )
    else:
        logger.info(f"Every {error_type} was predicted correctly by the model.")


def _get_intent_errors(intent_results: List[IntentEvaluationResult]) -> List[Dict]:
    return [
        {
            "text": r.message,
            "intent": r.intent_target,
            "intent_prediction": {
                INTENT_NAME_KEY: r.intent_prediction,
                "confidence": r.confidence,
            },
        }
        for r in intent_results
        if r.intent_target != r.intent_prediction
    ]


def write_response_successes(
    response_results: List[ResponseSelectionEvaluationResult], successes_filename: Text
) -> None:
    """Write successful response selection predictions to a file.

    Args:
        response_results: response selection evaluation result
        successes_filename: filename of file to save successful predictions to
    """
    successes = [
        {
            "text": r.message,
            "intent_response_key_target": r.intent_response_key_target,
            "intent_response_key_prediction": {
                "name": r.intent_response_key_prediction,
                "confidence": r.confidence,
            },
        }
        for r in response_results
        if r.intent_response_key_prediction == r.intent_response_key_target
    ]

    if successes:
        rasa.shared.utils.io.dump_obj_as_json_to_file(successes_filename, successes)
        logger.info(f"Successful response predictions saved to {successes_filename}.")
        structlogger.debug("test.write.response", successes=copy.deepcopy(successes))
    else:
        logger.info("No successful response predictions found.")


def _response_errors(
    response_results: List[ResponseSelectionEvaluationResult],
) -> List[Dict]:
    """Write incorrect response selection predictions to a file.

    Args:
        response_results: response selection evaluation result

    Returns:
        Serializable prediction errors.
    """
    return [
        {
            "text": r.message,
            "intent_response_key_target": r.intent_response_key_target,
            "intent_response_key_prediction": {
                "name": r.intent_response_key_prediction,
                "confidence": r.confidence,
            },
        }
        for r in response_results
        if r.intent_response_key_prediction != r.intent_response_key_target
    ]


def plot_attribute_confidences(
    results: Union[
        List[IntentEvaluationResult], List[ResponseSelectionEvaluationResult]
    ],
    hist_filename: Optional[Text],
    target_key: Text,
    prediction_key: Text,
    title: Text,
) -> None:
    """Create histogram of confidence distribution.

    Args:
        results: evaluation results
        hist_filename: filename to save plot to
        target_key: key of target in results
        prediction_key: key of predictions in results
        title: title of plot
    """
    pos_hist = [
        r.confidence
        for r in results
        if getattr(r, target_key) == getattr(r, prediction_key)
    ]

    neg_hist = [
        r.confidence
        for r in results
        if getattr(r, target_key) != getattr(r, prediction_key)
    ]

    plot_utils.plot_paired_histogram([pos_hist, neg_hist], title, hist_filename)


def plot_entity_confidences(
    merged_targets: List[Text],
    merged_predictions: List[Text],
    merged_confidences: List[float],
    hist_filename: Text,
    title: Text,
) -> None:
    """Creates histogram of confidence distribution.

    Args:
        merged_targets: Entity labels.
        merged_predictions: Predicted entities.
        merged_confidences: Confidence scores of predictions.
        hist_filename: filename to save plot to
        title: title of plot
    """
    pos_hist = [
        confidence
        for target, prediction, confidence in zip(
            merged_targets, merged_predictions, merged_confidences
        )
        if target != NO_ENTITY and target == prediction
    ]

    neg_hist = [
        confidence
        for target, prediction, confidence in zip(
            merged_targets, merged_predictions, merged_confidences
        )
        if prediction not in (NO_ENTITY, target)
    ]

    plot_utils.plot_paired_histogram([pos_hist, neg_hist], title, hist_filename)


def evaluate_response_selections(
    response_selection_results: List[ResponseSelectionEvaluationResult],
    output_directory: Optional[Text],
    successes: bool,
    errors: bool,
    disable_plotting: bool,
    report_as_dict: Optional[bool] = None,
) -> Dict:  # pragma: no cover
    """Creates summary statistics for response selection.

    Only considers those examples with a set response.
    Others are filtered out. Returns a dictionary containing the
    evaluation results.

    Args:
        response_selection_results: response selection evaluation results
        output_directory: directory to store files to
        successes: if True success are written down to disk
        errors: if True errors are written down to disk
        disable_plotting: if True no plots are created
        report_as_dict: `True` if the evaluation report should be returned as `dict`.
            If `False` the report is returned in a human-readable text format. If `None`
            `report_as_dict` is considered as `True` in case an `output_directory` is
            given.

    Returns: dictionary with evaluation results
    """
    # remove empty response targets
    num_examples = len(response_selection_results)
    response_selection_results = remove_empty_response_examples(
        response_selection_results
    )

    logger.info(
        f"Response Selection Evaluation: Only considering those "
        f"{len(response_selection_results)} examples that have a defined response out "
        f"of {num_examples} examples."
    )

    (
        target_intent_response_keys,
        predicted_intent_response_keys,
    ) = _targets_predictions_from(
        response_selection_results,
        "intent_response_key_target",
        "intent_response_key_prediction",
    )

    report, precision, f1, accuracy, confusion_matrix, labels = _calculate_report(
        output_directory,
        target_intent_response_keys,
        predicted_intent_response_keys,
        report_as_dict,
    )
    if output_directory:
        _dump_report(output_directory, "response_selection_report.json", report)

    if successes:
        successes_filename = "response_selection_successes.json"
        if output_directory:
            successes_filename = os.path.join(output_directory, successes_filename)
        # save classified samples to file for debugging
        write_response_successes(response_selection_results, successes_filename)

    response_errors = _response_errors(response_selection_results)

    if errors and output_directory:
        errors_filename = "response_selection_errors.json"
        errors_filename = os.path.join(output_directory, errors_filename)
        _write_errors(response_errors, errors_filename, error_type="response")

    if not disable_plotting:
        confusion_matrix_filename = "response_selection_confusion_matrix.png"
        if output_directory:
            confusion_matrix_filename = os.path.join(
                output_directory, confusion_matrix_filename
            )

        plot_utils.plot_confusion_matrix(
            confusion_matrix,
            classes=labels,
            title="Response Selection Confusion Matrix",
            output_file=confusion_matrix_filename,
        )

        histogram_filename = "response_selection_histogram.png"
        if output_directory:
            histogram_filename = os.path.join(output_directory, histogram_filename)
        plot_attribute_confidences(
            response_selection_results,
            histogram_filename,
            "intent_response_key_target",
            "intent_response_key_prediction",
            title="Response Selection Prediction Confidence Distribution",
        )

    predictions = [
        {
            "text": res.message,
            "intent_response_key_target": res.intent_response_key_target,
            "intent_response_key_prediction": res.intent_response_key_prediction,
            "confidence": res.confidence,
        }
        for res in response_selection_results
    ]

    return {
        "predictions": predictions,
        "report": report,
        "precision": precision,
        "f1_score": f1,
        "accuracy": accuracy,
        "errors": response_errors,
    }


def _add_confused_labels_to_report(
    report: Dict[Text, Dict[Text, Any]],
    confusion_matrix: np.ndarray,
    labels: List[Text],
    exclude_labels: Optional[List[Text]] = None,
) -> Dict[Text, Dict[Text, Union[Dict, Any]]]:
    """Adds a field "confused_with" to the evaluation report.

    The value is a dict of `{"false_positive_label": false_positive_count}`
    pairs. If there are no false positives in the confusion matrix,
    the dict will be empty. Typically we include the two most common
    false-positive labels, three in the rare case that the diagonal
    element of the confusion matrix is not among the three highest
    values in its row.

    Args:
        report: the evaluation report
        confusion_matrix: confusion matrix
        labels: list of labels
        exclude_labels: labels to exclude from the "confused_with" entries

    Returns: updated evaluation report
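
    Example (illustrative; the labels and counts are hypothetical)::

        >>> report = {"greet": {"precision": 1.0}, "bye": {"precision": 0.75}}
        >>> confusion = np.array([[2, 1], [0, 3]])
        >>> _add_confused_labels_to_report(report, confusion, ["greet", "bye"])
        {'greet': {'precision': 1.0, 'confused_with': {'bye': 1}}, 'bye': {'precision': 0.75, 'confused_with': {}}}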
    """
    if exclude_labels is None:
        exclude_labels = []

    # sort confusion matrix by false positives
    indices = np.argsort(confusion_matrix, axis=1)
    n_candidates = min(3, len(labels))

    for label in labels:
        if label in exclude_labels:
            continue
        # it is possible to predict intent 'None'
        if report.get(label):
            report[label]["confused_with"] = {}

    for i, label in enumerate(labels):
        if label in exclude_labels:
            continue
        for j in range(n_candidates):
            label_idx = indices[i, -(1 + j)]
            false_pos_label = labels[label_idx]
            false_positives = int(confusion_matrix[i, label_idx])
            if (
                false_pos_label != label
                and false_pos_label not in exclude_labels
                and false_positives > 0
            ):
                report[label]["confused_with"][false_pos_label] = false_positives

    return report


def evaluate_intents(
    intent_results: List[IntentEvaluationResult],
    output_directory: Optional[Text],
    successes: bool,
    errors: bool,
    disable_plotting: bool,
    report_as_dict: Optional[bool] = None,
) -> Dict:  # pragma: no cover
    """Creates summary statistics for intents.

    Only considers those examples with a set intent. Others are filtered out.
    Returns a dictionary containing the evaluation results.

    Args:
        intent_results: intent evaluation results
        output_directory: directory to store files to
        successes: if True correct predictions are written to disk
        errors: if True incorrect predictions are written to disk
        disable_plotting: if True no plots are created
        report_as_dict: `True` if the evaluation report should be returned as `dict`.
            If `False` the report is returned in a human-readable text format. If `None`
            `report_as_dict` is considered as `True` in case an `output_directory` is
            given.

    Returns: dictionary with evaluation results
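        The dictionary contains the keys `predictions`, `report`, `precision`,
        `f1_score`, `accuracy` and `errors`.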
    """
    # remove empty intent targets
    num_examples = len(intent_results)
    intent_results = remove_empty_intent_examples(intent_results)

    logger.info(
        f"Intent Evaluation: Only considering those {len(intent_results)} examples "
        f"that have a defined intent out of {num_examples} examples."
    )

    target_intents, predicted_intents = _targets_predictions_from(
        intent_results, "intent_target", "intent_prediction"
    )

    report, precision, f1, accuracy, confusion_matrix, labels = _calculate_report(
        output_directory, target_intents, predicted_intents, report_as_dict
    )
    if output_directory:
        _dump_report(output_directory, "intent_report.json", report)

    if successes and output_directory:
        successes_filename = os.path.join(output_directory, "intent_successes.json")
        # save classified samples to file for debugging
        write_intent_successes(intent_results, successes_filename)

    intent_errors = _get_intent_errors(intent_results)
    if errors and output_directory:
        errors_filename = os.path.join(output_directory, "intent_errors.json")
        _write_errors(intent_errors, errors_filename, "intent")

    if not disable_plotting:
        confusion_matrix_filename = "intent_confusion_matrix.png"
        if output_directory:
            confusion_matrix_filename = os.path.join(
                output_directory, confusion_matrix_filename
            )
        plot_utils.plot_confusion_matrix(
            confusion_matrix,
            classes=labels,
            title="Intent Confusion matrix",
            output_file=confusion_matrix_filename,
        )

        histogram_filename = "intent_histogram.png"
        if output_directory:
            histogram_filename = os.path.join(output_directory, histogram_filename)
        plot_attribute_confidences(
            intent_results,
            histogram_filename,
            "intent_target",
            "intent_prediction",
            title="Intent Prediction Confidence Distribution",
        )

    predictions = [
        {
            "text": res.message,
            "intent": res.intent_target,
            "predicted": res.intent_prediction,
            "confidence": res.confidence,
        }
        for res in intent_results
    ]

    return {
        "predictions": predictions,
        "report": report,
        "precision": precision,
        "f1_score": f1,
        "accuracy": accuracy,
        "errors": intent_errors,
    }


def _calculate_report(
    output_directory: Optional[Text],
    targets: Iterable[Any],
    predictions: Iterable[Any],
    report_as_dict: Optional[bool] = None,
    exclude_label: Optional[Text] = None,
) -> Tuple[Union[Text, Dict], float, float, float, np.ndarray, List[Text]]:
    from rasa.model_testing import get_evaluation_metrics
    import sklearn.metrics
    import sklearn.utils.multiclass

    confusion_matrix = sklearn.metrics.confusion_matrix(targets, predictions)
    labels = sklearn.utils.multiclass.unique_labels(targets, predictions)

    if report_as_dict is None:
        report_as_dict = bool(output_directory)

    report, precision, f1, accuracy = get_evaluation_metrics(
        targets, predictions, output_dict=report_as_dict, exclude_label=exclude_label
    )

    if report_as_dict:
        report = _add_confused_labels_to_report(  # type: ignore[assignment]
            report,
            confusion_matrix,
            labels,
            exclude_labels=[exclude_label] if exclude_label else [],
        )
    elif not output_directory:
        log_evaluation_table(report, precision, f1, accuracy)

    return report, precision, f1, accuracy, confusion_matrix, labels


def _dump_report(output_directory: Text, filename: Text, report: Dict) -> None:
    report_filename = os.path.join(output_directory, filename)
    rasa.shared.utils.io.dump_obj_as_json_to_file(report_filename, report)
    logger.info(f"Classification report saved to {report_filename}.")


def merge_labels(
    aligned_predictions: List[Dict], extractor: Optional[Text] = None
) -> List[Text]:
    """Concatenates all labels of the aligned predictions.

    Takes the aligned prediction labels which are grouped for each message
    and concatenates them.

    Args:
        aligned_predictions: aligned predictions
        extractor: entity extractor name

    Returns:
        Concatenated predictions
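
    Example (illustrative)::

        >>> merge_labels([
        ...     {"target_labels": ["O", "city"]},
        ...     {"target_labels": ["time"]},
        ... ])
        ['O', 'city', 'time']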
    """
    if extractor:
        label_lists = [ap["extractor_labels"][extractor] for ap in aligned_predictions]
    else:
        label_lists = [ap["target_labels"] for ap in aligned_predictions]

    return list(itertools.chain(*label_lists))


def merge_confidences(
    aligned_predictions: List[Dict], extractor: Optional[Text] = None
) -> List[float]:
    """Concatenates all confidences of the aligned predictions.

    Takes the aligned prediction confidences which are grouped for each message
    and concatenates them.

    Args:
        aligned_predictions: aligned predictions
        extractor: entity extractor name

    Returns:
        Concatenated confidences
    """
    label_lists = [ap["confidences"][extractor] for ap in aligned_predictions]
    return list(itertools.chain(*label_lists))


def substitute_labels(labels: List[Text], old: Text, new: Text) -> List[Text]:
    """Replaces label names in a list of labels.

    Args:
        labels: list of labels
        old: old label name that should be replaced
        new: new label name

    Returns: updated labels
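
    Example (illustrative)::

        >>> substitute_labels(["O", "city", "O"], "O", "no_entity")
        ['no_entity', 'city', 'no_entity']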
    """
    return [new if label == old else label for label in labels]


def collect_incorrect_entity_predictions(
    entity_results: List[EntityEvaluationResult],
    merged_predictions: List[Text],
    merged_targets: List[Text],
) -> List["EntityPrediction"]:
    """Get incorrect entity predictions.

    Args:
        entity_results: entity evaluation results
        merged_predictions: list of predicted entity labels
        merged_targets: list of true entity labels

    Returns: list of incorrect predictions
    """
    errors = []
    offset = 0
    for entity_result in entity_results:
        for i in range(offset, offset + len(entity_result.tokens)):
            if merged_targets[i] != merged_predictions[i]:
                prediction: EntityPrediction = {
                    "text": entity_result.message,
                    "entities": entity_result.entity_targets,
                    "predicted_entities": entity_result.entity_predictions,
                }
                errors.append(prediction)
                break
        offset += len(entity_result.tokens)
    return errors


def write_successful_entity_predictions(
    entity_results: List[EntityEvaluationResult],
    merged_targets: List[Text],
    merged_predictions: List[Text],
    successes_filename: Text,
) -> None:
    """Write correct entity predictions to a file.

    Args:
        entity_results: entity evaluation results
        merged_predictions: list of predicted entity labels
        merged_targets: list of true entity labels
        successes_filename: filename of file to save correct predictions to
    """
    successes = collect_successful_entity_predictions(
        entity_results, merged_predictions, merged_targets
    )

    if successes:
        rasa.shared.utils.io.dump_obj_as_json_to_file(successes_filename, successes)
        logger.info(f"Successful entity predictions saved to {successes_filename}.")
        structlogger.debug("test.write.entities", successes=copy.deepcopy(successes))
    else:
        logger.info("No successful entity prediction found.")


def collect_successful_entity_predictions(
    entity_results: List[EntityEvaluationResult],
    merged_predictions: List[Text],
    merged_targets: List[Text],
) -> List["EntityPrediction"]:
    """Get correct entity predictions.

    Args:
        entity_results: entity evaluation results
        merged_predictions: list of predicted entity labels
        merged_targets: list of true entity labels

    Returns: list of correct predictions
    """
    successes = []
    offset = 0
    for entity_result in entity_results:
        for i in range(offset, offset + len(entity_result.tokens)):
            if (
                merged_targets[i] == merged_predictions[i]
                and merged_targets[i] != NO_ENTITY
            ):
                prediction: EntityPrediction = {
                    "text": entity_result.message,
                    "entities": entity_result.entity_targets,
                    "predicted_entities": entity_result.entity_predictions,
                }
                successes.append(prediction)
                break
        offset += len(entity_result.tokens)
    return successes


def evaluate_entities(
    entity_results: List[EntityEvaluationResult],
    extractors: Set[Text],
    output_directory: Optional[Text],
    successes: bool,
    errors: bool,
    disable_plotting: bool,
    report_as_dict: Optional[bool] = None,
) -> Dict:  # pragma: no cover
    """Creates summary statistics for each entity extractor.

    Logs precision, recall, and F1 per entity type for each extractor.

    Args:
        entity_results: entity evaluation results
        extractors: entity extractors to consider
        output_directory: directory to store files to
        successes: if True correct predictions are written to disk
        errors: if True incorrect predictions are written to disk
        disable_plotting: if True no plots are created
        report_as_dict: `True` if the evaluation report should be returned as `dict`.
            If `False` the report is returned in a human-readable text format. If `None`
            `report_as_dict` is considered as `True` in case an `output_directory` is
            given.

    Returns: dictionary with evaluation results
    """
    aligned_predictions = align_all_entity_predictions(entity_results, extractors)
    merged_targets = merge_labels(aligned_predictions)
    merged_targets = substitute_labels(merged_targets, NO_ENTITY_TAG, NO_ENTITY)

    result = {}

    for extractor in extractors:
        merged_predictions = merge_labels(aligned_predictions, extractor)
        merged_predictions = substitute_labels(
            merged_predictions, NO_ENTITY_TAG, NO_ENTITY
        )

        cleaned_targets = plugin_manager().hook.clean_entity_targets_for_evaluation(
            merged_targets=merged_targets, extractor=extractor
        )
        if len(cleaned_targets) > 0:
            cleaned_targets = cleaned_targets[0]
        else:
            cleaned_targets = merged_targets

        logger.info(f"Evaluation for entity extractor: {extractor} ")

        report, precision, f1, accuracy, confusion_matrix, labels = _calculate_report(
            output_directory,
            cleaned_targets,
            merged_predictions,
            report_as_dict,
            exclude_label=NO_ENTITY,
        )
        if output_directory:
            _dump_report(output_directory, f"{extractor}_report.json", report)

        if successes:
            successes_filename = f"{extractor}_successes.json"
            if output_directory:
                successes_filename = os.path.join(output_directory, successes_filename)
            # save classified samples to file for debugging
            write_successful_entity_predictions(
                entity_results, cleaned_targets, merged_predictions, successes_filename
            )

        entity_errors = collect_incorrect_entity_predictions(
            entity_results, merged_predictions, cleaned_targets
        )
        if errors and output_directory:
            errors_filename = os.path.join(output_directory, f"{extractor}_errors.json")
            _write_errors(entity_errors, errors_filename, "entity")

        if not disable_plotting:
            confusion_matrix_filename = f"{extractor}_confusion_matrix.png"
            if output_directory:
                confusion_matrix_filename = os.path.join(
                    output_directory, confusion_matrix_filename
                )
            plot_utils.plot_confusion_matrix(
                confusion_matrix,
                classes=labels,
                title="Entity Confusion matrix",
                output_file=confusion_matrix_filename,
            )

            if extractor in EXTRACTORS_WITH_CONFIDENCES:
                merged_confidences = merge_confidences(aligned_predictions, extractor)
                histogram_filename = f"{extractor}_histogram.png"
                if output_directory:
                    histogram_filename = os.path.join(
                        output_directory, histogram_filename
                    )
                plot_entity_confidences(
                    cleaned_targets,
                    merged_predictions,
                    merged_confidences,
                    title="Entity Prediction Confidence Distribution",
                    hist_filename=histogram_filename,
                )

        result[extractor] = {
            "report": report,
            "precision": precision,
            "f1_score": f1,
            "accuracy": accuracy,
            "errors": entity_errors,
        }

    return result


def is_token_within_entity(token: Token, entity: Dict) -> bool:
    """Checks if a token is within the boundaries of an entity."""
    return determine_intersection(token, entity) == len(token.text)


def does_token_cross_borders(token: Token, entity: Dict) -> bool:
    """Checks if a token crosses the boundaries of an entity."""
    num_intersect = determine_intersection(token, entity)
    return 0 < num_intersect < len(token.text)


def determine_intersection(token: Token, entity: Dict) -> int:
    """Calculates how many characters a given token and entity share."""
    pos_token = set(range(token.start, token.end))
    pos_entity = set(range(entity["start"], entity["end"]))
    return len(pos_token.intersection(pos_entity))


def do_entities_overlap(entities: List[Dict]) -> bool:
    """Checks if entities overlap.

    I.e. cross each others start and end boundaries.

    Args:
        entities: list of entities

    Returns: true if entities overlap, false otherwise.
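
    Example (illustrative): `{"start": 0, "end": 5, "entity": "city"}` and
    `{"start": 3, "end": 9, "entity": "time"}` overlap, whereas two
    overlapping spans with the same entity type are not reported as an overlap.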
    """
    sorted_entities = sorted(entities, key=lambda e: e["start"])
    for i in range(len(sorted_entities) - 1):
        curr_ent = sorted_entities[i]
        next_ent = sorted_entities[i + 1]
        if (
            next_ent["start"] < curr_ent["end"]
            and next_ent["entity"] != curr_ent["entity"]
        ):
            structlogger.warning(
                "test.overlaping.entities",
                current_entity=copy.deepcopy(curr_ent),
                next_entity=copy.deepcopy(next_ent),
            )
            return True

    return False


def find_intersecting_entities(token: Token, entities: List[Dict]) -> List[Dict]:
    """Finds the entities that intersect with a token.

    Args:
        token: a single token
        entities: entities found by a single extractor

    Returns: list of entities
    """
    candidates = []
    for e in entities:
        if is_token_within_entity(token, e):
            candidates.append(e)
        elif does_token_cross_borders(token, e):
            candidates.append(e)
            structlogger.debug(
                "test.intersecting.entities",
                token_text=copy.deepcopy(token.text),
                token_start=token.start,
                token_end=token.end,
                entity=copy.deepcopy(e),
            )
    return candidates


def pick_best_entity_fit(
    token: Token, candidates: List[Dict[Text, Any]]
) -> Optional[Dict[Text, Any]]:
    """Determines the best fitting entity given intersecting entities.

    Args:
        token: a single token
        candidates: entities found by a single extractor

    Returns:
        the best fitting entity, or `None` if no candidates are given
    """
    if len(candidates) == 0:
        return None
    elif len(candidates) == 1:
        return candidates[0]
    else:
        best_fit = np.argmax([determine_intersection(token, c) for c in candidates])
        return candidates[int(best_fit)]


def determine_token_labels(
    token: Token,
    entities: List[Dict],
    extractors: Optional[Set[Text]] = None,
    attribute_key: Text = ENTITY_ATTRIBUTE_TYPE,
) -> Text:
    """Determines the token label for the provided attribute key given entities that do
    not overlap.

    Args:
        token: a single token
        entities: entities found by a single extractor
        extractors: list of extractors
        attribute_key: the attribute key for which the entity type should be returned
    Returns:
        entity type
    """
    entity = determine_entity_for_token(token, entities, extractors)

    if entity is None:
        return NO_ENTITY_TAG

    label = entity.get(attribute_key)

    if not label:
        return NO_ENTITY_TAG

    return label


def determine_entity_for_token(
    token: Token,
    entities: List[Dict[Text, Any]],
    extractors: Optional[Set[Text]] = None,
) -> Optional[Dict[Text, Any]]:
    """Determines the best fitting entity for the given token, given entities that do
    not overlap.

    Args:
        token: a single token
        entities: entities found by a single extractor
        extractors: list of extractors

    Returns:
        entity type
    """
    if entities is None or len(entities) == 0:
        return None
    if do_any_extractors_not_support_overlap(extractors) and do_entities_overlap(
        entities
    ):
        raise ValueError("The possible entities should not overlap.")

    candidates = find_intersecting_entities(token, entities)
    return pick_best_entity_fit(token, candidates)


def do_any_extractors_not_support_overlap(extractors: Optional[Set[Text]]) -> bool:
    """Checks if any extractor does not support overlapping entities.

    Args:
        extractors: names of the entity extractors

    Returns:
        `True` if and only if CRFEntityExtractor or DIETClassifier is in `extractors`
    """
    if extractors is None:
        return False

    from rasa.nlu.extractors.crf_entity_extractor import CRFEntityExtractor
    from rasa.nlu.classifiers.diet_classifier import DIETClassifier

    return not extractors.isdisjoint(
        {CRFEntityExtractor.__name__, DIETClassifier.__name__}
    )


def align_entity_predictions(
    result: EntityEvaluationResult, extractors: Set[Text]
) -> Dict:
    """Aligns entity predictions to the message tokens.

    Determines for every token the true label based on the
    prediction targets and the label assigned by each
    single extractor.

    Args:
        result: entity evaluation result
        extractors: the entity extractors that should be considered

    Returns: dictionary containing the true token labels and token labels
             from the extractors
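
    Example of the returned structure (illustrative; the extractor name and
    labels are hypothetical, and tokens without an entity carry
    `NO_ENTITY_TAG`)::

        {
            "target_labels": ["city", NO_ENTITY_TAG],
            "extractor_labels": {"DIETClassifier": ["city", NO_ENTITY_TAG]},
            "confidences": {"DIETClassifier": [0.87, 0.0]},
        }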
    """
    true_token_labels = []
    entities_by_extractors: Dict[Text, List] = {
        extractor: [] for extractor in extractors
    }
    for p in result.entity_predictions:
        entities_by_extractors[p[EXTRACTOR]].append(p)
    extractor_labels: Dict[Text, List] = {extractor: [] for extractor in extractors}
    extractor_confidences: Dict[Text, List] = {
        extractor: [] for extractor in extractors
    }
    for t in result.tokens:
        true_token_labels.append(_concat_entity_labels(t, result.entity_targets))
        for extractor, entities in entities_by_extractors.items():
            extracted_labels = _concat_entity_labels(t, entities, {extractor})
            extracted_confidences = _get_entity_confidences(t, entities, {extractor})
            extractor_labels[extractor].append(extracted_labels)
            extractor_confidences[extractor].append(extracted_confidences)

    return {
        "target_labels": true_token_labels,
        "extractor_labels": extractor_labels,
        "confidences": extractor_confidences,
    }


def _concat_entity_labels(
    token: Token, entities: List[Dict], extractors: Optional[Set[Text]] = None
) -> Text:
    """Concatenate labels for entity type, role, and group for evaluation.

    In order to calculate metrics also for entity type, role, and group we need to
    concatenate their labels. For example, 'location.destination'. This allows
    us to report metrics for every combination of entity type, role, and group.

    Args:
        token: the token we are looking at
        entities: the available entities
        extractors: the extractor of interest

    Returns:
        the entity label of the provided token
    """
    entity_label = determine_token_labels(
        token, entities, extractors, ENTITY_ATTRIBUTE_TYPE
    )
    group_label = determine_token_labels(
        token, entities, extractors, ENTITY_ATTRIBUTE_GROUP
    )
    role_label = determine_token_labels(
        token, entities, extractors, ENTITY_ATTRIBUTE_ROLE
    )

    if entity_label == role_label == group_label == NO_ENTITY_TAG:
        return NO_ENTITY_TAG

    labels = [entity_label, group_label, role_label]
    labels = [label for label in labels if label != NO_ENTITY_TAG]

    return ".".join(labels)


def _get_entity_confidences(
    token: Token, entities: List[Dict], extractors: Optional[Set[Text]] = None
) -> float:
    """Get the confidence value of the best fitting entity.

    If multiple confidence values are present, e.g. for type, role, group, we
    pick the lowest confidence value.

    Args:
        token: the token we are looking at
        entities: the available entities
        extractors: the extractor of interest

    Returns:
        the confidence value
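
        A missing type, role, or group confidence is treated as `1.0` so that
        it does not lower the minimum.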
    """
    entity = determine_entity_for_token(token, entities, extractors)

    if entity is None:
        return 0.0

    if entity.get("extractor") not in EXTRACTORS_WITH_CONFIDENCES:
        return 0.0

    conf_type = entity.get(ENTITY_ATTRIBUTE_CONFIDENCE_TYPE) or 1.0
    conf_role = entity.get(ENTITY_ATTRIBUTE_CONFIDENCE_ROLE) or 1.0
    conf_group = entity.get(ENTITY_ATTRIBUTE_CONFIDENCE_GROUP) or 1.0

    return min(conf_type, conf_role, conf_group)


def align_all_entity_predictions(
    entity_results: List[EntityEvaluationResult], extractors: Set[Text]
) -> List[Dict]:
    """Aligns entity predictions to the message tokens for the whole dataset
    using align_entity_predictions.

    Args:
        entity_results: list of entity prediction results
        extractors: the entity extractors that should be considered

    Returns: list of dictionaries containing the true token labels and token
    labels from the extractors
    """
    aligned_predictions = []
    for result in entity_results:
        aligned_predictions.append(align_entity_predictions(result, extractors))

    return aligned_predictions


async def get_eval_data(
    processor: MessageProcessor, test_data: TrainingData
) -> Tuple[
    List[IntentEvaluationResult],
    List[ResponseSelectionEvaluationResult],
    List[EntityEvaluationResult],
]:
    """Runs the model for the test set and extracts targets and predictions.

    Returns intent results (intent targets and predictions, the original
    messages and the confidences of the predictions), response results (
    response targets and predictions) as well as entity results
    (entity_targets, entity_predictions, and tokens).

    Args:
        processor: the processor
        test_data: test data

    Returns: intent, response, and entity evaluation results
    """
    logger.info("Running model for predictions:")

    intent_results, entity_results, response_selection_results = [], [], []

    response_labels = {
        e.get(INTENT_RESPONSE_KEY)
        for e in test_data.intent_examples
        if e.get(INTENT_RESPONSE_KEY) is not None
    }
    intent_labels = {e.get(INTENT) for e in test_data.intent_examples}
    should_eval_intents = len(intent_labels) >= 2
    should_eval_response_selection = len(response_labels) >= 2
    should_eval_entities = len(test_data.entity_examples) > 0

    for example in tqdm(test_data.nlu_examples):
        tracker = plugin_manager().hook.mock_tracker_for_evaluation(
            example=example, model_metadata=processor.model_metadata
        )
        # if the user overrides the default implementation, take the last tracker
        if isinstance(tracker, list):
            if len(tracker) > 0:
                tracker = tracker[-1]
            else:
                tracker = None
        result = await processor.parse_message(
            UserMessage(text=example.get(TEXT)),
            tracker=tracker,
            only_output_properties=False,
        )
        _remove_entities_of_extractors(result, PRETRAINED_EXTRACTORS)
        if should_eval_intents:
            if fallback_classifier.is_fallback_classifier_prediction(result):
                # Revert fallback prediction to not shadow
                # the wrongly predicted intent
                # during the test phase.
                result = fallback_classifier.undo_fallback_prediction(result)
            intent_prediction = result.get(INTENT, {})
            intent_results.append(
                IntentEvaluationResult(
                    example.get(INTENT, ""),
                    intent_prediction.get(INTENT_NAME_KEY),
                    result.get(TEXT),
                    intent_prediction.get("confidence"),
                )
            )

        if should_eval_response_selection:
            # including all examples here. Empty response examples are filtered at the
            # time of metric calculation
            intent_target = example.get(INTENT, "")
            selector_properties = result.get(RESPONSE_SELECTOR_PROPERTY_NAME, {})
            response_selector_retrieval_intents = selector_properties.get(
                RESPONSE_SELECTOR_RETRIEVAL_INTENTS, set()
            )
            if (
                intent_target in response_selector_retrieval_intents
                and intent_target in selector_properties
            ):
                response_prediction_key = intent_target
            else:
                response_prediction_key = RESPONSE_SELECTOR_DEFAULT_INTENT

            response_prediction = selector_properties.get(
                response_prediction_key, {}
            ).get(RESPONSE_SELECTOR_PREDICTION_KEY, {})

            intent_response_key_target = example.get(INTENT_RESPONSE_KEY, "")

            response_selection_results.append(
                ResponseSelectionEvaluationResult(
                    intent_response_key_target,
                    response_prediction.get(INTENT_RESPONSE_KEY),
                    result.get(TEXT),
                    response_prediction.get(PREDICTED_CONFIDENCE_KEY),
                )
            )

        if should_eval_entities:
            entity_results.append(
                EntityEvaluationResult(
                    example.get(ENTITIES, []),
                    result.get(ENTITIES, []),
                    result.get(TOKENS_NAMES[TEXT], []),
                    result.get(TEXT),
                )
            )

    return intent_results, response_selection_results, entity_results


def _get_active_entity_extractors(
    entity_results: List[EntityEvaluationResult],
) -> Set[Text]:
    """Finds the names of entity extractors from the EntityEvaluationResults."""
    extractors: Set[Text] = set()
    for result in entity_results:
        for prediction in result.entity_predictions:
            if EXTRACTOR in prediction:
                extractors.add(prediction[EXTRACTOR])
    return extractors


def _remove_entities_of_extractors(
    nlu_parse_result: Dict[Text, Any], extractor_names: Set[Text]
) -> None:
    """Removes the entities annotated by the given extractor names."""
    entities = nlu_parse_result.get(ENTITIES)
    if not entities:
        return
    filtered_entities = [e for e in entities if e.get(EXTRACTOR) not in extractor_names]
    nlu_parse_result[ENTITIES] = filtered_entities


async def run_evaluation(
    data_path: Text,
    processor: MessageProcessor,
    output_directory: Optional[Text] = None,
    successes: bool = False,
    errors: bool = False,
    disable_plotting: bool = False,
    report_as_dict: Optional[bool] = None,
    domain_path: Optional[Text] = None,
) -> Dict:  # pragma: no cover
    """Evaluate intent classification, response selection and entity extraction.

    Args:
        data_path: path to the test data
        processor: the processor used to process and predict
        output_directory: path to folder where all output will be stored
        successes: if true successful predictions are written to a file
        errors: if true incorrect predictions are written to a file
        disable_plotting: if true confusion matrix and histogram will not be rendered
        report_as_dict: `True` if the evaluation report should be returned as `dict`.
            If `False` the report is returned in a human-readable text format. If `None`
            `report_as_dict` is considered as `True` in case an `output_directory` is
            given.
        domain_path: Path to the domain file(s).

    Returns: dictionary containing evaluation results
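
    Example (illustrative; the paths are hypothetical and the processor would
    typically come from a loaded model, e.g. `Agent.load(model_path).processor`)::

        results = await run_evaluation(
            "data/nlu_test.yml",
            processor,
            output_directory="results",
            errors=True,
        )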
    """
    import rasa.shared.nlu.training_data.loading
    from rasa.shared.constants import DEFAULT_DOMAIN_PATH

    test_data_importer = TrainingDataImporter.load_from_dict(
        training_data_paths=[data_path],
        domain_path=domain_path if domain_path else DEFAULT_DOMAIN_PATH,
    )
    test_data = test_data_importer.get_nlu_data()

    result: Dict[Text, Optional[Dict]] = {
        "intent_evaluation": None,
        "entity_evaluation": None,
        "response_selection_evaluation": None,
    }

    if output_directory:
        rasa.shared.utils.io.create_directory(output_directory)

    (intent_results, response_selection_results, entity_results) = await get_eval_data(
        processor, test_data
    )

    if intent_results:
        logger.info("Intent evaluation results:")
        result["intent_evaluation"] = evaluate_intents(
            intent_results,
            output_directory,
            successes,
            errors,
            disable_plotting,
            report_as_dict=report_as_dict,
        )

    if response_selection_results:
        logger.info("Response selection evaluation results:")
        result["response_selection_evaluation"] = evaluate_response_selections(
            response_selection_results,
            output_directory,
            successes,
            errors,
            disable_plotting,
            report_as_dict=report_as_dict,
        )

    if any(entity_results):
        logger.info("Entity evaluation results:")
        extractors = _get_active_entity_extractors(entity_results)
        result["entity_evaluation"] = evaluate_entities(
            entity_results,
            extractors,
            output_directory,
            successes,
            errors,
            disable_plotting,
            report_as_dict=report_as_dict,
        )

    telemetry.track_nlu_model_test(test_data)

    return result


def generate_folds(
    n: int, training_data: TrainingData
) -> Iterator[Tuple[TrainingData, TrainingData]]:
    """Generates n cross validation folds for given training data."""
    from sklearn.model_selection import StratifiedKFold

    skf = StratifiedKFold(n_splits=n, shuffle=True)
    x = training_data.intent_examples

    # Get labels as they appear in the training data because we want a
    # stratified split on all intents(including retrieval intents if they exist)
    y = [example.get_full_intent() for example in x]
    for i_fold, (train_index, test_index) in enumerate(skf.split(x, y)):
        logger.debug(f"Fold: {i_fold}")
        train = [x[i] for i in train_index]
        test = [x[i] for i in test_index]
        yield (
            TrainingData(
                training_examples=train,
                entity_synonyms=training_data.entity_synonyms,
                regex_features=training_data.regex_features,
                lookup_tables=training_data.lookup_tables,
                responses=training_data.responses,
            ),
            TrainingData(
                training_examples=test,
                entity_synonyms=training_data.entity_synonyms,
                regex_features=training_data.regex_features,
                lookup_tables=training_data.lookup_tables,
                responses=training_data.responses,
            ),
        )


async def combine_result(
    intent_metrics: IntentMetrics,
    entity_metrics: EntityMetrics,
    response_selection_metrics: ResponseSelectionMetrics,
    processor: MessageProcessor,
    data: TrainingData,
    intent_results: Optional[List[IntentEvaluationResult]] = None,
    entity_results: Optional[List[EntityEvaluationResult]] = None,
    response_selection_results: Optional[
        List[ResponseSelectionEvaluationResult]
    ] = None,
) -> Tuple[IntentMetrics, EntityMetrics, ResponseSelectionMetrics]:
    """Collects intent, response selection and entity metrics for cross validation
    folds.

    If `intent_results`, `response_selection_results` or `entity_results` is provided
    as a list, prediction results are also collected.

    Args:
        intent_metrics: intent metrics
        entity_metrics: entity metrics
        response_selection_metrics: response selection metrics
        processor: the processor
        data: training data
        intent_results: intent evaluation results
        entity_results: entity evaluation results
        response_selection_results: response selection evaluation results

    Returns: intent, entity, and response selection metrics
    """
    (
        intent_current_metrics,
        entity_current_metrics,
        response_selection_current_metrics,
        current_intent_results,
        current_entity_results,
        current_response_selection_results,
    ) = await compute_metrics(processor, data)

    if intent_results is not None:
        intent_results += current_intent_results

    if entity_results is not None:
        entity_results += current_entity_results

    if response_selection_results is not None:
        response_selection_results += current_response_selection_results

    for k, v in intent_current_metrics.items():
        intent_metrics[k] = v + intent_metrics[k]

    for k, v in response_selection_current_metrics.items():
        response_selection_metrics[k] = v + response_selection_metrics[k]

    for extractor, extractor_metric in entity_current_metrics.items():
        entity_metrics[extractor] = {
            k: v + entity_metrics[extractor][k] for k, v in extractor_metric.items()
        }

    return intent_metrics, entity_metrics, response_selection_metrics


def _contains_entity_labels(entity_results: List[EntityEvaluationResult]) -> bool:
    """Checks whether any entity result contains entity targets or predictions."""
    for result in entity_results:
        if result.entity_targets or result.entity_predictions:
            return True
    return False


async def cross_validate(
    data: TrainingData,
    n_folds: int,
    nlu_config: Union[Text, Dict],
    output: Optional[Text] = None,
    successes: bool = False,
    errors: bool = False,
    disable_plotting: bool = False,
    report_as_dict: Optional[bool] = None,
) -> Tuple[CVEvaluationResult, CVEvaluationResult, CVEvaluationResult]:
    """Stratified cross validation on data.

    Args:
        data: Training Data
        n_folds: integer, number of cv folds
        nlu_config: path to the NLU config file, or the config as a dict
        output: path to folder where reports are stored
        successes: if true successful predictions are written to a file
        errors: if true incorrect predictions are written to a file
        disable_plotting: if true no confusion matrix and histogram plots are created
        report_as_dict: `True` if the evaluation report should be returned as `dict`.
            If `False` the report is returned in a human-readable text format. If `None`
            `report_as_dict` is considered as `True` in case an `output_directory` is
            given.

    Returns:
        a `CVEvaluationResult` each for intents, entities, and response
        selection, holding train metrics, test metrics, and the accumulated
        test-fold evaluation; each metric maps to a list with one value per fold
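
    Example (illustrative; the config path and output folder are hypothetical)::

        intent_cv, entity_cv, response_cv = await cross_validate(
            training_data, n_folds=5, nlu_config="config.yml", output="results"
        )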
    """
    import rasa.model_training

    with TempDirectoryPath(get_temp_dir_name()) as temp_dir:
        tmp_path = Path(temp_dir)

        if isinstance(nlu_config, Dict):
            config_path = tmp_path / "config.yml"
            rasa.shared.utils.io.write_yaml(nlu_config, config_path)
            nlu_config = str(config_path)

        if output:
            rasa.shared.utils.io.create_directory(output)

        intent_train_metrics: IntentMetrics = defaultdict(list)
        intent_test_metrics: IntentMetrics = defaultdict(list)
        entity_train_metrics: EntityMetrics = defaultdict(lambda: defaultdict(list))
        entity_test_metrics: EntityMetrics = defaultdict(lambda: defaultdict(list))
        response_selection_train_metrics: ResponseSelectionMetrics = defaultdict(list)
        response_selection_test_metrics: ResponseSelectionMetrics = defaultdict(list)

        intent_test_results: List[IntentEvaluationResult] = []
        entity_test_results: List[EntityEvaluationResult] = []
        response_selection_test_results: List[ResponseSelectionEvaluationResult] = []

        for train, test in generate_folds(n_folds, data):
            training_data_file = tmp_path / "training_data.yml"
            RasaYAMLWriter().dump(training_data_file, train)

            model_file = rasa.model_training.train_nlu(
                nlu_config, str(training_data_file), str(tmp_path)
            )

            processor = Agent.load(model_file).processor

            # calculate train accuracy
            await combine_result(
                intent_train_metrics,
                entity_train_metrics,
                response_selection_train_metrics,
                processor,
                train,
            )
            # calculate test accuracy
            await combine_result(
                intent_test_metrics,
                entity_test_metrics,
                response_selection_test_metrics,
                processor,
                test,
                intent_test_results,
                entity_test_results,
                response_selection_test_results,
            )

        intent_evaluation = {}
        if intent_test_results:
            logger.info("Accumulated test folds intent evaluation results:")
            intent_evaluation = evaluate_intents(
                intent_test_results,
                output,
                successes,
                errors,
                disable_plotting,
                report_as_dict=report_as_dict,
            )

        entity_evaluation = {}
        if entity_test_results:
            logger.info("Accumulated test folds entity evaluation results:")
            extractors = _get_active_entity_extractors(entity_test_results)
            entity_evaluation = evaluate_entities(
                entity_test_results,
                extractors,
                output,
                successes,
                errors,
                disable_plotting,
                report_as_dict=report_as_dict,
            )

        responses_evaluation = {}
        if response_selection_test_results:
            logger.info("Accumulated test folds response selection evaluation results:")
            responses_evaluation = evaluate_response_selections(
                response_selection_test_results,
                output,
                successes,
                errors,
                disable_plotting,
                report_as_dict=report_as_dict,
            )

        return (
            CVEvaluationResult(
                dict(intent_train_metrics), dict(intent_test_metrics), intent_evaluation
            ),
            CVEvaluationResult(
                dict(entity_train_metrics), dict(entity_test_metrics), entity_evaluation
            ),
            CVEvaluationResult(
                dict(response_selection_train_metrics),
                dict(response_selection_test_metrics),
                responses_evaluation,
            ),
        )
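

# Hedged usage sketch (illustrative only; the config and output paths below are
# hypothetical): run a 3-fold cross-validation on already-loaded training data
# and log the aggregated test metrics for intents and entities.
async def _example_cross_validate(data: TrainingData) -> None:  # pragma: no cover
    intent_cv, entity_cv, _ = await cross_validate(
        data, n_folds=3, nlu_config="config.yml", output="cv_results"
    )
    log_results(intent_cv.test, "test")
    log_entity_results(entity_cv.test, "test")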


def _targets_predictions_from(
    results: Union[
        List[IntentEvaluationResult], List[ResponseSelectionEvaluationResult]
    ],
    target_key: Text,
    prediction_key: Text,
) -> Iterator[Iterable[Optional[Text]]]:
    return zip(*[(getattr(r, target_key), getattr(r, prediction_key)) for r in results])
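

# Illustration (hypothetical values): `_targets_predictions_from` unzips the
# (target, prediction) pairs of its results into two parallel sequences that
# can be passed straight to sklearn-style metric functions, e.g.
#
#     results = [
#         IntentEvaluationResult("greet", "greet", "hi there", 0.95),
#         IntentEvaluationResult("bye", "affirm", "see you", 0.40),
#     ]
#     targets, predictions = _targets_predictions_from(
#         results, "intent_target", "intent_prediction"
#     )
#     # targets == ("greet", "bye"); predictions == ("greet", "affirm")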


async def compute_metrics(
    processor: MessageProcessor, training_data: TrainingData
) -> Tuple[
    IntentMetrics,
    EntityMetrics,
    ResponseSelectionMetrics,
    List[IntentEvaluationResult],
    List[EntityEvaluationResult],
    List[ResponseSelectionEvaluationResult],
]:
    """Computes metrics for intent classification, response selection and entity
    extraction.

    Args:
        processor: the processor
        training_data: training data

    Returns: intent, entity, and response selection metrics, and prediction results.
    """
    intent_results, response_selection_results, entity_results = await get_eval_data(
        processor, training_data
    )

    intent_results = remove_empty_intent_examples(intent_results)

    response_selection_results = remove_empty_response_examples(
        response_selection_results
    )

    intent_metrics: IntentMetrics = {}
    if intent_results:
        intent_metrics = _compute_metrics(
            intent_results, "intent_target", "intent_prediction"
        )

    entity_metrics: EntityMetrics = {}
    if entity_results:
        entity_metrics = _compute_entity_metrics(entity_results)

    response_selection_metrics: ResponseSelectionMetrics = {}
    if response_selection_results:
        response_selection_metrics = _compute_metrics(
            response_selection_results,
            "intent_response_key_target",
            "intent_response_key_prediction",
        )

    return (
        intent_metrics,
        entity_metrics,
        response_selection_metrics,
        intent_results,
        entity_results,
        response_selection_results,
    )
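

# Hedged usage sketch (the model path is hypothetical): load a trained model's
# processor and compute metrics plus raw prediction results on a held-out
# dataset that was already loaded as `TrainingData`.
async def _example_compute_metrics(data: TrainingData) -> None:  # pragma: no cover
    processor = Agent.load("models/nlu-model.tar.gz").processor  # hypothetical
    intent_metrics, entity_metrics, *_ = await compute_metrics(processor, data)
    log_results(intent_metrics, "test")
    log_entity_results(entity_metrics, "test")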


async def compare_nlu(
    configs: List[Text],
    data: TrainingData,
    exclusion_percentages: List[int],
    f_score_results: Dict[Text, List[List[float]]],
    model_names: List[Text],
    output: Text,
    runs: int,
) -> List[int]:
    """Trains and compares multiple NLU models.
    For each run and exclusion percentage a model per config file is trained.
    Thereby, the model is trained only on the current percentage of training data.
    Afterwards, the model is tested on the complete test data of that run.
    All results are stored in the provided output directory.

    Args:
        configs: config files needed for training
        data: training data
        exclusion_percentages: percentages of training data to exclude during comparison
        f_score_results: dictionary of model name to f-score results per run
        model_names: names of the models to train
        output: the output directory
        runs: number of comparison runs

    Returns: training examples per run
    """
    import rasa.model_training

    training_examples_per_run = []

    for run in range(runs):
        logger.info(f"Beginning comparison run {run + 1}/{runs}")

        run_path = os.path.join(output, f"run_{run + 1}")
        io_utils.create_path(run_path)

        test_path = os.path.join(run_path, TEST_DATA_FILE)
        io_utils.create_path(test_path)

        train, test = data.train_test_split()
        rasa.shared.utils.io.write_text_file(test.nlu_as_yaml(), test_path)

        for percentage in exclusion_percentages:
            percent_string = f"{percentage}%_exclusion"

            _, train_included = train.train_test_split(percentage / 100)
            # only count for the first run and ignore the others
            if run == 0:
                training_examples_per_run.append(len(train_included.nlu_examples))

            model_output_path = os.path.join(run_path, percent_string)
            train_split_path = os.path.join(model_output_path, "train")
            train_nlu_split_path = os.path.join(train_split_path, TRAIN_DATA_FILE)
            train_nlg_split_path = os.path.join(train_split_path, NLG_DATA_FILE)
            io_utils.create_path(train_nlu_split_path)
            rasa.shared.utils.io.write_text_file(
                train_included.nlu_as_yaml(), train_nlu_split_path
            )
            rasa.shared.utils.io.write_text_file(
                train_included.nlg_as_yaml(), train_nlg_split_path
            )

            for nlu_config, model_name in zip(configs, model_names):
                logger.info(
                    f"Evaluating configuration '{model_name}' "
                    f"with {percent_string} training data."
                )

                try:
                    model_path = rasa.model_training.train_nlu(
                        nlu_config,
                        train_split_path,
                        model_output_path,
                        fixed_model_name=model_name,
                    )
                except Exception as e:  # skipcq: PYL-W0703
                    # general exception catching needed to continue evaluating other
                    # model configurations
                    logger.warning(f"Training model '{model_name}' failed. Error: {e}")
                    f_score_results[model_name][run].append(0.0)
                    continue

                output_path = os.path.join(model_output_path, f"{model_name}_report")
                processor = Agent.load(model_path=model_path).processor
                result = await run_evaluation(
                    test_path, processor, output_directory=output_path, errors=True
                )

                f1 = result["intent_evaluation"]["f1_score"]
                f_score_results[model_name][run].append(f1)

    return training_examples_per_run
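

# Hedged usage sketch (hypothetical config files and output directory): compare
# two configurations over two runs, excluding 25%, 50% and 75% of the training
# data per run. `f_score_results` must be pre-initialised with one empty list
# per model and run; the assumed initialisation below mirrors that expectation.
async def _example_compare_nlu(data: TrainingData) -> None:  # pragma: no cover
    configs = ["config_light.yml", "config_heavy.yml"]  # hypothetical configs
    model_names = ["light", "heavy"]
    runs = 2
    f_score_results: Dict[Text, List[List[float]]] = {
        name: [[] for _ in range(runs)] for name in model_names
    }

    examples_per_run = await compare_nlu(
        configs,
        data,
        exclusion_percentages=[25, 50, 75],
        f_score_results=f_score_results,
        model_names=model_names,
        output="nlu_comparison_results",
        runs=runs,
    )
    logger.info(f"Training examples per run: {examples_per_run}")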


def _compute_metrics(
    results: Union[
        List[IntentEvaluationResult], List[ResponseSelectionEvaluationResult]
    ],
    target_key: Text,
    prediction_key: Text,
) -> Union[IntentMetrics, ResponseSelectionMetrics]:
    """Computes evaluation metrics for a given corpus and returns the results.

    Args:
        results: evaluation results
        target_key: target key name
        prediction_key: prediction key name

    Returns: metrics
    """
    from rasa.model_testing import get_evaluation_metrics

    # compute fold metrics
    targets, predictions = _targets_predictions_from(
        results, target_key, prediction_key
    )
    _, precision, f1, accuracy = get_evaluation_metrics(targets, predictions)
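    # Each metric is wrapped in a single-element list so that per-fold values
    # can be concatenated when results are accumulated across cross-validation
    # folds.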

    return {"Accuracy": [accuracy], "F1-score": [f1], "Precision": [precision]}


def _compute_entity_metrics(
    entity_results: List[EntityEvaluationResult],
) -> EntityMetrics:
    """Computes entity evaluation metrics and returns the results.

    Args:
        entity_results: entity evaluation results
    Returns: entity metrics
    """
    from rasa.model_testing import get_evaluation_metrics

    entity_metric_results: EntityMetrics = defaultdict(lambda: defaultdict(list))
    extractors = _get_active_entity_extractors(entity_results)

    if not extractors:
        return entity_metric_results

    aligned_predictions = align_all_entity_predictions(entity_results, extractors)

    merged_targets = merge_labels(aligned_predictions)
    merged_targets = substitute_labels(merged_targets, NO_ENTITY_TAG, NO_ENTITY)

    for extractor in extractors:
        merged_predictions = merge_labels(aligned_predictions, extractor)
        merged_predictions = substitute_labels(
            merged_predictions, NO_ENTITY_TAG, NO_ENTITY
        )
        _, precision, f1, accuracy = get_evaluation_metrics(
            merged_targets, merged_predictions, exclude_label=NO_ENTITY
        )
        entity_metric_results[extractor]["Accuracy"].append(accuracy)
        entity_metric_results[extractor]["F1-score"].append(f1)
        entity_metric_results[extractor]["Precision"].append(precision)

    return entity_metric_results


def log_results(results: IntentMetrics, dataset_name: Text) -> None:
    """Logs results of cross validation.

    Args:
        results: dictionary of results returned from cross validation
        dataset_name: string of which dataset the results are from, e.g. test/train
    """
    for k, v in results.items():
        logger.info(f"{dataset_name} {k}: {np.mean(v):.3f} ({np.std(v):.3f})")


def log_entity_results(results: EntityMetrics, dataset_name: Text) -> None:
    """Logs entity results of cross validation.

    Args:
        results: dictionary of dictionaries of results returned from cross validation
        dataset_name: string of which dataset the results are from, e.g. test/train
    """
    for extractor, result in results.items():
        logger.info(f"Entity extractor: {extractor}")
        log_results(result, dataset_name)