rasa/core/evaluation/marker_base.py

Summary

Maintainability
D
1 day
Test Coverage
A
93%
from __future__ import annotations
import os
from abc import ABC, abstractmethod
from typing import (
    Dict,
    Iterator,
    Optional,
    Set,
    Text,
    List,
    Tuple,
    Type,
    TypeVar,
    TYPE_CHECKING,
    Union,
    Any,
    AsyncIterator,
)

from pathlib import Path
from dataclasses import dataclass

import rasa.shared.core.constants
import rasa.shared.nlu.constants
import rasa.shared.utils.validation
import rasa.shared.utils.io
import rasa.shared.utils.common
from rasa.shared.data import is_likely_yaml_file
from rasa.shared.exceptions import InvalidConfigException, RasaException
from rasa.shared.core.events import ActionExecuted, UserUttered, Event
from rasa import telemetry
from rasa.shared.core.domain import Domain
from rasa.shared.core.trackers import DialogueStateTracker
from rasa.utils.io import WriteRow
from rasa.shared.constants import DOCS_URL_MARKERS

import logging
import csv
import os.path

if TYPE_CHECKING:
    from rasa.core.evaluation.marker import OrMarker

logger = logging.getLogger(__name__)


class MarkerRegistry:
    """Keeps track of tags that can be used to configure markers."""

    all_tags: Set[Text] = set()  # noqa: RUF012
    condition_tag_to_marker_class: Dict[
        Text, Type[ConditionMarker]
    ] = {}  # noqa: RUF012
    operator_tag_to_marker_class: Dict[Text, Type[OperatorMarker]] = {}  # noqa: RUF012
    marker_class_to_tag: Dict[Type[Marker], Text] = {}  # noqa: RUF012
    negated_tag_to_tag: Dict[Text, Text] = {}  # noqa: RUF012
    tag_to_negated_tag: Dict[Text, Text] = {}  # noqa: RUF012

    @classmethod
    def register_builtin_markers(cls) -> None:
        """Must import all modules containing markers."""
        import rasa.core.evaluation.marker  # noqa

    @classmethod
    def configurable_marker(cls, marker_class: Type[Marker]) -> Type[Marker]:
        """Decorator used to register a marker that can be used in config files.

        Args:
            marker_class: the marker class to be made available via config files
        Returns:
            the registered marker class
        """
        if not issubclass(marker_class, Marker):
            raise RuntimeError("Can only register marker classes as configurable.")

        tag = marker_class.positive_tag()
        negated_tag = marker_class.negated_tag()
        cls._register_tags(tags={tag, negated_tag})
        cls._register_tag_class(marker_class=marker_class, positive_tag=tag)
        cls._register_negation(tag=tag, negated_tag=negated_tag)
        return marker_class

    @classmethod
    def _register_tags(cls, tags: Set[Optional[Text]]) -> None:
        specified_tags = {tag for tag in tags if tag is not None}
        for tag_ in specified_tags:
            if tag_ in cls.all_tags:
                raise RuntimeError(
                    "Expected the tags of all configurable markers to be "
                    "identifiable by their tag."
                )
            cls.all_tags.add(tag_)

    @classmethod
    def _register_negation(cls, tag: Text, negated_tag: Optional[Text]) -> None:
        if negated_tag is not None:
            cls.negated_tag_to_tag[negated_tag] = tag
            cls.tag_to_negated_tag[tag] = negated_tag

    @classmethod
    def get_non_negated_tag(cls, tag_or_negated_tag: Text) -> Tuple[Text, bool]:
        """Returns the non-negated marker tag, given a (possible) negated marker tag.

        Args:
            tag_or_negated_tag: the tag for a possibly negated marker
        Returns:
            the tag itself if it was already positive, otherwise the positive version;
            and a boolean that represents whether the given tag was a negative one
        """
        # either the given `marker_name` is already "positive" (e.g. "slot was set")
        # or there is an entry mapping it to it's positive version:
        positive_version = cls.negated_tag_to_tag.get(
            tag_or_negated_tag, tag_or_negated_tag
        )
        is_negation = tag_or_negated_tag != positive_version
        return positive_version, is_negation

    @classmethod
    def _register_tag_class(
        cls, marker_class: Type[Marker], positive_tag: Text
    ) -> None:
        if issubclass(marker_class, ConditionMarker):
            cls.condition_tag_to_marker_class[positive_tag] = marker_class
        elif issubclass(marker_class, OperatorMarker):
            cls.operator_tag_to_marker_class[positive_tag] = marker_class
        else:
            raise RuntimeError(
                f"Can only register `{OperatorMarker.__name__} or "
                f" {ConditionMarker.__name__} subclasses."
            )
        cls.marker_class_to_tag[marker_class] = positive_tag


class InvalidMarkerConfig(RasaException):
    """Exception that can be raised when the config for a marker is not valid."""


@dataclass
class EventMetaData:
    """Describes meta data per event in some session."""

    idx: int
    preceding_user_turns: int


# We evaluate markers separately against every session and extract, for every marker
# that we want to evaluate, the meta data of the respective relevant events where the
# marker applies.
SessionEvaluation = Dict[Text, List[EventMetaData]]

T = TypeVar("T")


class Marker(ABC):
    """A marker is a way of describing points in conversations you're interested in.

    Here, markers are stateful objects because they track the events of a conversation.
    At each point in the conversation, one can observe whether a marker applies or
    does not apply to the conversation so far.
    """

    # Identifier for an artificial marker that is created when loading markers
    # from a dictionary of configs. For more details, see `from_config_dict`.
    ANY_MARKER = "<any_marker>"

    def __init__(
        self,
        name: Optional[Text] = None,
        negated: bool = False,
        description: Optional[Text] = None,
    ) -> None:
        """Instantiates a marker.

        Args:
            name: a custom name that can be used to replace the default string
                conversion of this marker
            negated: whether this marker should be negated (i.e. a negated marker
                applies if and only if the non-negated marker does not apply)
            description: an optional description of the marker. It is not used
                internally but can be used to document the marker.

        Raises:
            `InvalidMarkerConfig` if the chosen *name* of the marker is the tag of
            a predefined marker.
        """
        if name == Marker.ANY_MARKER or name in MarkerRegistry.all_tags:
            raise InvalidMarkerConfig(
                f"You must not use the special marker name {Marker.ANY_MARKER}. "
                f"This is to avoid confusion when you generate a marker from a "
                f"dictionary of marker configurations, which will lead to all "
                f"markers being combined under one common `ORMarker` named "
                f"{Marker.ANY_MARKER}."
            )
        self.name = name
        self.history: List[bool] = []
        # Note: we allow negation here even though there might not be a negated tag
        # for 2 reasons: testing and the fact that the `MarkerRegistry`+`from_config`
        # won't allow to create a negated marker if there is no negated tag.
        self.negated: bool = negated
        self.description = description

    def __str__(self) -> Text:
        return self.name or repr(self)

    def __repr__(self) -> Text:
        return self._to_str_with(self.get_tag())

    def get_tag(self) -> Text:
        """Returns the tag describing this marker."""
        if self.negated:
            tag = self.negated_tag()
            if tag is None:
                # We allow the instantiation of a negated marker even if there
                # is no negated tag (ie. direct creation of the negated marker from
                # a config is not allowed). To be able to print a tag nonetheless:
                tag = f"not({self.positive_tag()})"
            return tag
        else:
            return self.positive_tag()

    @staticmethod
    @abstractmethod
    def positive_tag() -> Text:
        """Returns the tag to be used in a config file."""
        ...

    @staticmethod
    def negated_tag() -> Optional[Text]:
        """Returns the tag to be used in a config file for the negated version.

        If this maps to `None`, then this indicates that we do not allow a short-cut
        for negating this marker. Hence, there is not a single tag to instantiate
        a negated version of this marker. One must use a "not" in the configuration
        file then.
        """
        return None

    @abstractmethod
    def _to_str_with(self, tag: Text) -> Text:
        """Returns a string representation using the given tag."""
        ...

    def track(self, event: Event) -> None:
        """Updates the marker according to the given event.

        Args:
            event: the next event of the conversation
        """
        result = self._non_negated_version_applies_at(event)
        if self.negated:
            result = not result
        self.history.append(result)

    @abstractmethod
    def _non_negated_version_applies_at(self, event: Event) -> bool:
        """Checks whether the non-negated version applies at the next given event.

        This method must *not* update the marker.

        Args:
            event: the next event of the conversation
        """
        ...

    def reset(self) -> None:
        """Clears the history of the marker."""
        self.history = []

    @abstractmethod
    def flatten(self) -> Iterator[Marker]:
        """Returns an iterator over all conditions and operators used in this marker.

        Returns:
            an iterator over all conditions and operators that are part of this marker
        """
        ...

    @abstractmethod
    def validate_against_domain(self, domain: Domain) -> bool:
        """Checks that this marker (and its children) refer to entries in the domain.

        Args:
            domain: The domain to check against
        """
        ...

    @abstractmethod
    def max_depth(self) -> int:
        """Gets the maximum depth from this point in the marker tree."""
        ...

    def evaluate_events(self, events: List[Event]) -> List[SessionEvaluation]:
        """Resets the marker, tracks all events, and collects some information.

        The collected information includes:
        - the timestamp of each event where the marker applied and
        - the number of user turns that preceded that timestamp

        If this marker is the special `ANY_MARKER` (identified by its name), then
        results will be collected for all (immediate) sub-markers.

        Args:
            events: a list of events describing a conversation
        Returns:
            a list that contains, for each session contained in the tracker, a
            dictionary mapping that maps marker names to meta data of relevant
            events
        """
        # determine which marker to extract results from
        if isinstance(self, OperatorMarker) and self.name == Marker.ANY_MARKER:
            markers_to_be_evaluated = self.sub_markers
        else:
            markers_to_be_evaluated = [self]

        # split the events into sessions and evaluate them separately
        sessions_and_start_indices = self._split_sessions(events=events)

        extracted_markers: List[Dict[Text, List[EventMetaData]]] = []
        for session, start_idx in sessions_and_start_indices:
            # track all events and collect meta data per time step
            meta_data = self._track_all_and_collect_meta_data(
                events=session, event_idx_offset=start_idx
            )
            # for each marker, keep only certain meta data
            extracted: Dict[Text, List[EventMetaData]] = {
                str(marker): [meta_data[idx] for idx in marker.relevant_events()]
                for marker in markers_to_be_evaluated
            }
            extracted_markers.append(extracted)
        return extracted_markers

    @staticmethod
    def _split_sessions(events: List[Event]) -> List[Tuple[List[Event], int]]:
        """Identifies single sessions in a the given sequence of events.

        Args:
            events: a sequence of events, e.g. extracted from a tracker store
        Returns:
            a list of sub-sequences of the given events that describe single
            conversations and the respective index that describes where the
            subsequence starts in the original sequence
        """
        session_start_indices = [
            idx
            for idx, event in enumerate(events)
            if isinstance(event, ActionExecuted)
            and event.action_name
            == rasa.shared.core.constants.ACTION_SESSION_START_NAME
        ]
        if len(session_start_indices) == 0:
            return [(events, 0)]
        sessions_and_start_indices: List[Tuple[List[Event], int]] = []
        for session_idx in range(len(session_start_indices) - 1):
            start_idx = session_start_indices[session_idx]
            end_idx = session_start_indices[session_idx + 1]
            session = [events[idx] for idx in range(start_idx, end_idx)]
            sessions_and_start_indices.append((session, start_idx))
        last_session = [
            events[idx] for idx in range(session_start_indices[-1], len(events))
        ]
        sessions_and_start_indices.append((last_session, session_start_indices[-1]))
        return sessions_and_start_indices

    def _track_all_and_collect_meta_data(
        self, events: List[Event], event_idx_offset: int = 0
    ) -> List[EventMetaData]:
        """Resets the marker, tracks all events, and collects metadata.

        Args:
            events: all events of a *single* session that should be tracked and
                evaluated
            event_idx_offset: offset that will be used to modify the collected event
                meta data, i.e. all event indices will be shifted by this offset
        Returns:
            metadata for each tracked event with all event indices shifted by the
            given `event_idx_offset`
        """
        self.reset()
        session_meta_data: List[EventMetaData] = []
        num_preceding_user_turns = 0
        for idx, event in enumerate(events):
            is_user_turn = isinstance(event, UserUttered)
            session_meta_data.append(
                EventMetaData(
                    idx=idx + event_idx_offset,
                    preceding_user_turns=num_preceding_user_turns,
                )
            )
            self.track(event=event)
            num_preceding_user_turns += int(is_user_turn)
        return session_meta_data

    def relevant_events(self) -> List[int]:
        """Returns the indices of those tracked events that are relevant for evaluation.

        Note: Overwrite this method if you create a new marker class that should *not*
        contain meta data about each event where the marker applied in the final
        evaluation (see `evaluate_events`).

        Returns:
            indices of tracked events
        """
        return [idx for (idx, applies) in enumerate(self.history) if applies]

    @classmethod
    def from_path(cls, path: Union[Path, Text]) -> "OrMarker":
        """Loads markers from one config file or all config files in a directory tree.

        Each config file should contain a dictionary mapping marker names to the
        respective marker configuration.
        To avoid confusion, the marker names must not coincide with the tag of
        any pre-defined markers. Moreover, marker names must be unique. This means,
        if you you load the markers from multiple files, then you have to make sure
        that the names of the markers defined in these files are unique across all
        loaded files.

        Note that all loaded markers will be combined into one marker via one
        artificial OR-operator. When evaluating the resulting marker, then the
        artificial OR-operator will be ignored and results will be reported for
        every sub-marker.

        For more details how a single marker configuration looks like, have a look
        at `Marker.from_config`.

        Args:
            path: either the path to a single config file or the root of the directory
                tree that contains marker config files
        Returns:
            all configured markers, combined into one marker object
        """
        MarkerRegistry.register_builtin_markers()
        from rasa.core.evaluation.marker import OrMarker

        # collect all the files from which we want to load configs (i.e. either just
        # the given path if points to a yaml or all yaml-files in the directory tree)
        yaml_files = cls._collect_yaml_files_from_path(path)

        # collect all the configs from those yaml files (assure it's dictionaries
        # mapping marker names to something) -- keep track of which config came
        # from which file to be able to raise meaningful errors later
        loaded_configs: Dict[Text, Dict] = cls._collect_configs_from_yaml_files(
            yaml_files
        )

        # create markers from all loaded configurations
        loaded_markers: List[Marker] = []
        for yaml_file, config in loaded_configs.items():
            for marker_name, marker_config in config.items():
                try:
                    marker = Marker.from_config(marker_config, name=marker_name)
                except InvalidMarkerConfig as e:
                    # we don't re-raise here because the stack trace would only be
                    # printed when we run rasa evaluate with --debug flag
                    raise InvalidMarkerConfig(
                        f"Could not load marker {marker_name} from {yaml_file}. "
                        f"Reason: {e!s}. "
                    )
                loaded_markers.append(marker)

        # Reminder: We could also just create a dictionary of markers from this.
        # However, if we want to allow re-using top-level markers (e.g.
        # "custom_marker1 or custom_marker2" and/or optimize the marker evaluation such
        # that e.g. the same condition is not instantiated (and evaluated) twice, then
        # the current approach might be better (e.g. with a dictionary of markers one
        # might expect the markers to be independent objects).

        # combine the markers
        marker = OrMarker(markers=loaded_markers)
        marker.name = Marker.ANY_MARKER  # cannot be set via name parameter
        return marker

    @staticmethod
    def _collect_yaml_files_from_path(path: Union[Text, Path]) -> List[Text]:
        path = os.path.abspath(path)
        if os.path.isfile(path):
            yaml_files = [
                yaml_file for yaml_file in [path] if is_likely_yaml_file(yaml_file)
            ]
            if not yaml_files:
                raise InvalidMarkerConfig(f"Could not find a yaml file at '{path}'.")
        elif os.path.isdir(path):
            yaml_files = [
                os.path.join(root, file)
                for root, _, files in os.walk(path, followlinks=True)
                for file in files
                if is_likely_yaml_file(file)
            ]
            if not yaml_files:
                raise InvalidMarkerConfig(
                    f"Could not find any yaml in the directory tree rooted at '{path}'."
                )
        else:
            raise InvalidMarkerConfig(
                f"The given path ({path}) is neither pointing to a directory "
                f"nor a file. Please specify the location of a yaml file or a "
                f"root directory (all yaml configs found in the directories "
                f"under that root directory will be loaded). "
                f"Refer to the docs for more information: {DOCS_URL_MARKERS} "
            )
        return yaml_files

    @staticmethod
    def _collect_configs_from_yaml_files(yaml_files: List[Text]) -> Dict[Text, Dict]:
        marker_names: Set[Text] = set()
        loaded_configs: Dict[Text, Dict] = {}
        for yaml_file in yaml_files:
            loaded_config = rasa.shared.utils.io.read_yaml_file(yaml_file)
            if not isinstance(loaded_config, dict):
                raise InvalidMarkerConfig(
                    f"Expected the loaded configurations to be a dictionary "
                    f"of marker configurations but found a "
                    f"{type(loaded_config)} in {yaml_file}. "
                    f"Refer to the docs for more information: {DOCS_URL_MARKERS} "
                )
            if set(loaded_config.keys()).intersection(marker_names):
                raise InvalidMarkerConfig(
                    f"The names of markers defined in {yaml_file} "
                    f"({sorted(loaded_config.keys())}) "
                    f"overlap with the names of markers loaded so far "
                    f"({sorted(marker_names)}). "
                    f"Please adapt your configurations such that your custom "
                    f"marker names are unique. "
                    f"Refer to the docs for more information: {DOCS_URL_MARKERS} "
                )
            if set(loaded_config.keys()).intersection(MarkerRegistry.all_tags):
                raise InvalidMarkerConfig(
                    f"The top level of your marker configuration should consist "
                    f"of names for your custom markers. "
                    f"If you use a condition or operator name at the top level, "
                    f"then that will not be recognised as an actual condition "
                    f"or operator and won't be evaluated against any sessions. "
                    f"Please remove any of the pre-defined marker tags "
                    f"(i.e. {MarkerRegistry.all_tags}) "
                    f"from {yaml_file}. "
                    f"Refer to the docs for more information: {DOCS_URL_MARKERS} "
                )
            marker_names.update(loaded_config.keys())
            loaded_configs[yaml_file] = loaded_config
        return loaded_configs

    @staticmethod
    def from_config(config: Any, name: Optional[Text] = None) -> Marker:
        """Creates a marker from the given config.

        A marker configuration is a dictionary mapping a marker tag (either a
        `positive_tag` or a `negated_tag`) to a sub-configuration.
        How that sub-configuration looks like, depends on whether the tag describes
        an operator (see `OperatorMarker.from_tag_and_sub_config`) or a
        condition (see `ConditionMarker.from_tag_and_sub_config`).

        Args:
            config: a marker configuration
            name: a custom name that will be used for the top-level marker (if and
                only if there is only one top-level marker)

        Returns:
            the configured marker
        """
        # Triggers the import of all modules containing marker classes in order to
        # register all configurable markers.
        MarkerRegistry.register_builtin_markers()

        if not isinstance(config, dict) or len(config) not in [1, 2]:
            raise InvalidMarkerConfig(
                "To configure a marker, please define a dictionary that maps a "
                "single operator tag or a single condition tag to the "
                "corresponding parameter configuration or a list of marker "
                "configurations, respectively. "
                f"Refer to the docs for more information: {DOCS_URL_MARKERS} "
            )

        description = config.pop("description", None)
        tag = next(iter(config))
        sub_marker_config = config[tag]

        tag, _ = MarkerRegistry.get_non_negated_tag(tag_or_negated_tag=tag)
        if tag in MarkerRegistry.operator_tag_to_marker_class:
            return OperatorMarker.from_tag_and_sub_config(
                tag=tag,
                sub_config=sub_marker_config,
                name=name,
                description=description,
            )
        elif tag in MarkerRegistry.condition_tag_to_marker_class:
            return ConditionMarker.from_tag_and_sub_config(
                tag=tag,
                sub_config=sub_marker_config,
                name=name,
                description=description,
            )

        raise InvalidMarkerConfig(
            f"Expected a marker configuration with a key that specifies"
            f" an operator or a condition but found {tag}. "
            f"Available conditions and operators are: "
            f"{sorted(MarkerRegistry.all_tags)}. "
            f"Refer to the docs for more information: {DOCS_URL_MARKERS} "
        )

    async def evaluate_trackers(
        self,
        trackers: AsyncIterator[Optional[DialogueStateTracker]],
        output_file: Path,
        session_stats_file: Optional[Path] = None,
        overall_stats_file: Optional[Path] = None,
    ) -> None:
        """Collect markers for each dialogue in each tracker loaded.

        Args:
            trackers: An iterator over the trackers from which we want to extract
                markers.
            output_file: Path to write out the extracted markers.
            session_stats_file: (Optional) Path to write out statistics about the
                extracted markers for each session separately.
            overall_stats_file: (Optional) Path to write out statistics about the
                markers extracted from all session data.

        Raises:
            `FileExistsError` if any of the specified files already exists
            `NotADirectoryError` if any of the specified files is supposed to be
                contained in a directory that does not exist
        """
        # Check files and folders before doing the costly swipe over the trackers:
        for path in [session_stats_file, overall_stats_file, output_file]:
            if path is not None and path.is_file():
                raise FileExistsError(f"Expected that no file {path} already exists.")
            if path is not None and not path.parent.is_dir():
                raise NotADirectoryError(f"Expected directory {path.parent} to exist.")

        # Apply marker to each session stored in each tracker and save the results.
        processed_trackers: Dict[Text, List[SessionEvaluation]] = {}
        async for tracker in trackers:
            if tracker:
                tracker_result = self.evaluate_events(tracker.events)
                processed_trackers[tracker.sender_id] = tracker_result

        processed_trackers_count = len(processed_trackers)
        telemetry.track_markers_extracted(processed_trackers_count)
        Marker._save_results(output_file, processed_trackers)

        # Compute and write statistics if requested.
        if session_stats_file or overall_stats_file:
            from rasa.core.evaluation.marker_stats import MarkerStatistics

            stats = MarkerStatistics()
            for sender_id, tracker_result in processed_trackers.items():
                for session_idx, session_result in enumerate(tracker_result):
                    stats.process(
                        sender_id=sender_id,
                        session_idx=session_idx,
                        meta_data_on_relevant_events_per_marker=session_result,
                    )

            telemetry.track_markers_stats_computed(processed_trackers_count)
            if overall_stats_file:
                stats.overall_statistic_to_csv(path=overall_stats_file)
            if session_stats_file:
                stats.per_session_statistics_to_csv(path=session_stats_file)

    @staticmethod
    def _save_results(path: Path, results: Dict[Text, List[SessionEvaluation]]) -> None:
        """Save extracted marker results as CSV to specified path.

        Args:
            path: Path to write out the extracted markers.
            results: Extracted markers from a selection of trackers.
        """
        with path.open(mode="w") as f:
            table_writer = csv.writer(f)
            table_writer.writerow(
                [
                    "sender_id",
                    "session_idx",
                    "marker",
                    "event_idx",
                    "num_preceding_user_turns",
                ]
            )
            for sender_id, results_per_session in results.items():
                for session_idx, session_result in enumerate(results_per_session):
                    Marker._write_relevant_events(
                        table_writer, sender_id, session_idx, session_result
                    )

    @staticmethod
    def _write_relevant_events(
        writer: WriteRow, sender_id: Text, session_idx: int, session: SessionEvaluation
    ) -> None:
        for marker_name, meta_data_per_relevant_event in session.items():
            for event_meta_data in meta_data_per_relevant_event:
                writer.writerow(
                    [
                        sender_id,
                        str(session_idx),
                        marker_name,
                        str(event_meta_data.idx),
                        str(event_meta_data.preceding_user_turns),
                    ]
                )


class OperatorMarker(Marker, ABC):
    """Combines several markers into one."""

    def __init__(
        self,
        markers: List[Marker],
        negated: bool = False,
        name: Optional[Text] = None,
        description: Optional[Text] = None,
    ) -> None:
        """Instantiates a marker.

        Args:
            markers: the list of markers to combine
            negated: whether this marker should be negated (i.e. a negated marker
                applies if and only if the non-negated marker does not apply)
            name: a custom name that can be used to replace the default string
                conversion of this marker
            description: an optional description of the marker. It is not used
                internally but can be used to document the marker.

        Raises:
            `InvalidMarkerConfig` if the given number of sub-markers does not match
            the expected number of sub-markers
        """
        super().__init__(name=name, negated=negated, description=description)
        self.sub_markers: List[Marker] = markers
        expected_num = self.expected_number_of_sub_markers()
        if expected_num is not None and len(markers) != expected_num:
            raise InvalidMarkerConfig(
                f"Expected {expected_num} sub-marker(s) to be specified for marker "
                f"'{self}' ({self.get_tag()}) but found {len(markers)}. "
                f"Please adapt your configuration so that there are exactly "
                f"{expected_num} sub-markers. "
            )
        elif len(markers) == 0:
            raise InvalidMarkerConfig(
                f"Expected some sub-markers to be specified for {self} but "
                f"found none. Please adapt your configuration so that there is "
                f"at least one sub-marker. "
            )

    @staticmethod
    def expected_number_of_sub_markers() -> Optional[int]:
        """Returns the expected number of sub-markers (if there is any)."""
        return None

    def _to_str_with(self, tag: Text) -> Text:
        marker_str = ", ".join(str(marker) for marker in self.sub_markers)
        return f"{tag}({marker_str})"

    def track(self, event: Event) -> None:
        """Updates the marker according to the given event.

        All sub-markers will be updated before the compound marker itself is updated.

        Args:
            event: the next event of the conversation
        """
        for marker in self.sub_markers:
            marker.track(event)
        super().track(event)

    def flatten(self) -> Iterator[Marker]:
        """Returns an iterator over all included markers, plus this marker itself.

        Returns:
            an iterator over all markers that are part of this marker
        """
        for marker in self.sub_markers:
            for sub_marker in marker.flatten():
                yield sub_marker
        yield self

    def reset(self) -> None:
        """Resets the history of this marker and all its sub-markers."""
        for marker in self.sub_markers:
            marker.reset()
        super().reset()

    def validate_against_domain(self, domain: Domain) -> bool:
        """Checks that this marker (and its children) refer to entries in the domain.

        Args:
            domain: The domain to check against
        """
        return all(
            marker.validate_against_domain(domain) for marker in self.sub_markers
        )

    def max_depth(self) -> int:
        """Gets the maximum depth from this point in the marker tree."""
        return 1 + max(child.max_depth() for child in self.sub_markers)

    @staticmethod
    def from_tag_and_sub_config(
        tag: Text,
        sub_config: Any,
        name: Optional[Text] = None,
        description: Optional[Text] = None,
    ) -> OperatorMarker:
        """Creates an operator marker from the given config.

        The configuration must consist of a list of marker configurations.
        See `Marker.from_config` for more details.

        Args:
            tag: the tag identifying an operator
            sub_config: a list of marker configs
            name: an optional custom name to be attached to the resulting marker
            description: an optional description of the marker
        Returns:
           the configured operator marker
        Raises:
            `InvalidMarkerConfig` if the given config or the tag are not well-defined
        """
        positive_tag, is_negation = MarkerRegistry.get_non_negated_tag(tag)
        operator_class = MarkerRegistry.operator_tag_to_marker_class.get(positive_tag)
        if operator_class is None:
            raise InvalidConfigException(f"Unknown operator '{tag}'.")

        if not isinstance(sub_config, list):
            raise InvalidMarkerConfig(
                f"Expected a list of sub-marker configurations under {tag}."
            )
        collected_sub_markers: List[Marker] = []
        for sub_marker_config in sub_config:
            try:
                sub_marker = Marker.from_config(sub_marker_config)
            except InvalidMarkerConfig as e:
                # we don't re-raise here because the stack trace would only be
                # printed when we run rasa evaluate with --debug flag
                raise InvalidMarkerConfig(
                    f"Could not create sub-marker for operator '{tag}' from "
                    f"{sub_marker_config}. Reason: {e!s}"
                )
            collected_sub_markers.append(sub_marker)
        try:
            marker = operator_class(markers=collected_sub_markers, negated=is_negation)
        except InvalidMarkerConfig as e:
            # we don't re-raise here because the stack trace would only be
            # printed when we run rasa evaluate with --debug flag
            raise InvalidMarkerConfig(
                f"Could not create operator '{tag}' with sub-markers "
                f"{collected_sub_markers}. Reason: {e!s}"
            )
        marker.name = name
        marker.description = description
        return marker


class ConditionMarker(Marker, ABC):
    """A marker that does not contain any sub-markers."""

    def __init__(
        self,
        text: Text,
        negated: bool = False,
        name: Optional[Text] = None,
        description: Optional[Text] = None,
    ) -> None:
        """Instantiates an atomic marker.

        Args:
            text: some text used to decide whether the marker applies
            negated: whether this marker should be negated (i.e. a negated marker
                applies if and only if the non-negated marker does not apply)
            name: a custom name that can be used to replace the default string
                conversion of this marker
            description: an optional description of the marker. It is not used
                internally but can be used to document the marker.
        """
        super().__init__(name=name, negated=negated)
        self.text = text
        self.description = description

    def _to_str_with(self, tag: Text) -> Text:
        return f"({tag}: {self.text})"

    def flatten(self) -> Iterator[ConditionMarker]:
        """Returns an iterator that just returns this `AtomicMarker`.

        Returns:
            an iterator over all markers that are part of this marker, i.e. this marker
        """
        yield self

    def max_depth(self) -> int:
        """Gets the maximum depth from this point in the marker tree."""
        return 1

    @staticmethod
    def from_tag_and_sub_config(
        tag: Text,
        sub_config: Any,
        name: Optional[Text] = None,
        description: Optional[Text] = None,
    ) -> ConditionMarker:
        """Creates an atomic marker from the given config.

        Args:
            tag: the tag identifying a condition
            sub_config: a single text parameter expected by all condition markers;
               e.g. if the tag is for an `intent_detected` marker then the `config`
               should contain an intent name
            name: a custom name for this marker
        Returns:
            the configured `ConditionMarker`
        Raises:
            `InvalidMarkerConfig` if the given config or the tag are not well-defined
        """
        positive_tag, is_negation = MarkerRegistry.get_non_negated_tag(tag)
        marker_class = MarkerRegistry.condition_tag_to_marker_class.get(positive_tag)
        if marker_class is None:
            raise InvalidConfigException(f"Unknown condition '{tag}'.")

        if not isinstance(sub_config, str):
            raise InvalidMarkerConfig(
                f"Expected a text parameter to be specified for marker '{tag}'."
            )
        marker = marker_class(sub_config, negated=is_negation)
        marker.name = name
        marker.description = description
        return marker