rasa/shared/core/trackers.py

Summary

Maintainability
D
2 days
Test Coverage
A
96%
import copy
import dataclasses
import itertools
import logging
import os
import time
from collections import deque
from enum import Enum
from typing import (
    Dict,
    Text,
    Any,
    Optional,
    Iterator,
    Generator,
    Type,
    TypeVar,
    List,
    Deque,
    Iterable,
    Union,
    FrozenSet,
    Tuple,
    TYPE_CHECKING,
    cast,
)

import rasa.shared.utils.io
from rasa.shared.constants import ASSISTANT_ID_KEY, DEFAULT_SENDER_ID
from rasa.shared.nlu.constants import (
    ENTITY_ATTRIBUTE_VALUE,
    ENTITY_ATTRIBUTE_TYPE,
    ENTITY_ATTRIBUTE_GROUP,
    ENTITY_ATTRIBUTE_ROLE,
    ACTION_TEXT,
    ACTION_NAME,
    ENTITIES,
    METADATA_MODEL_ID,
)
from rasa.shared.core import events
from rasa.shared.core.constants import (
    ACTION_LISTEN_NAME,
    LOOP_NAME,
    SHOULD_NOT_BE_SET,
    PREVIOUS_ACTION,
    ACTIVE_LOOP,
    ACTION_SESSION_START_NAME,
    FOLLOWUP_ACTION,
)
from rasa.shared.core.conversation import Dialogue
from rasa.shared.core.events import (
    UserUttered,
    ActionExecuted,
    Event,
    Restarted,
    ActionReverted,
    UserUtteranceReverted,
    BotUttered,
    ActiveLoop,
    SessionStarted,
    ActionExecutionRejected,
    DefinePrevUserUtteredFeaturization,
)
from rasa.shared.core.domain import Domain, State
from rasa.shared.core.slots import AnySlot, Slot

if TYPE_CHECKING:
    from rasa.shared.core.events import NLUPredictionData
    from rasa.shared.core.training_data.structures import Story
    from rasa.shared.core.training_data.story_writer.story_writer import StoryWriter

    EventTypeAlias = TypeVar("EventTypeAlias", bound=Event)


@dataclasses.dataclass
class TrackerActiveLoop:
    """Dataclass for `DialogueStateTracker.active_loop`."""

    name: Optional[Text]
    is_interrupted: bool
    rejected: bool
    trigger_message: Optional[Dict]


logger = logging.getLogger(__name__)

# same as State but with Dict[...] substituted with FrozenSet[Tuple[...]]
FrozenState = FrozenSet[Tuple[Text, FrozenSet[Tuple[Text, Tuple[Union[float, Text]]]]]]


class EventVerbosity(Enum):
    """Filter on which events to include in tracker dumps."""

    # no events will be included
    NONE = 1

    # all events, that contribute to the trackers state are included
    # these are all you need to reconstruct the tracker state
    APPLIED = 2

    # include even more events, in this case everything that comes
    # after the most recent restart event. this will also include
    # utterances that got reverted and actions that got undone.
    AFTER_RESTART = 3

    # include every logged event
    ALL = 4


class AnySlotDict(dict):
    """A slot dictionary that pretends every slot exists, by creating slots on demand.

    This only uses the generic slot type! This means certain functionality wont work,
    e.g. properly featurizing the slot.
    """

    def __missing__(self, key: Text) -> Slot:
        value = self[key] = AnySlot(key, mappings=[])
        return value

    def __contains__(self, key: Any) -> bool:
        return True


class DialogueStateTracker:
    """Maintains the state of a conversation.

    The field max_event_history will only give you these last events,
    it can be set in the tracker_store.
    """

    @classmethod
    def from_dict(
        cls,
        sender_id: Text,
        events_as_dict: List[Dict[Text, Any]],
        slots: Optional[Iterable[Slot]] = None,
        max_event_history: Optional[int] = None,
    ) -> "DialogueStateTracker":
        """Create a tracker from dump.

        The dump should be an array of dumped events. When restoring
        the tracker, these events will be replayed to recreate the state.
        """
        evts = events.deserialise_events(events_as_dict)

        return cls.from_events(sender_id, evts, slots, max_event_history)

    @classmethod
    def from_events(
        cls,
        sender_id: Text,
        evts: List[Event],
        slots: Optional[Iterable[Slot]] = None,
        max_event_history: Optional[int] = None,
        sender_source: Optional[Text] = None,
        domain: Optional[Domain] = None,
    ) -> "DialogueStateTracker":
        """Creates tracker from existing events.

        Args:
            sender_id: The ID of the conversation.
            evts: Existing events which should be applied to the new tracker.
            slots: Slots which can be set.
            max_event_history: Maximum number of events which should be stored.
            sender_source: File source of the messages.
            domain: The current model domain.

        Returns:
            Instantiated tracker with its state updated according to the given
            events.
        """
        tracker = cls(sender_id, slots, max_event_history, sender_source)

        for e in evts:
            tracker.update(e, domain)

        return tracker

    def __init__(
        self,
        sender_id: Text,
        slots: Optional[Iterable[Slot]],
        max_event_history: Optional[int] = None,
        sender_source: Optional[Text] = None,
        is_rule_tracker: bool = False,
    ) -> None:
        """Initialize the tracker.

        A set of events can be stored externally, and we will run through all
        of them to get the current state. The tracker will represent all the
        information we captured while processing messages of the dialogue.
        """
        # maximum number of events to store
        self._max_event_history = max_event_history
        # list of previously seen events
        self.events = self._create_events([])
        # id of the source of the messages
        self.sender_id = sender_id
        # slots that can be filled in this domain
        if slots is not None:
            self.slots = {slot.name: copy.copy(slot) for slot in slots}
        else:
            self.slots = AnySlotDict()
        # file source of the messages
        self.sender_source = sender_source
        # whether the tracker belongs to a rule-based data
        self.is_rule_tracker = is_rule_tracker

        ###
        # current state of the tracker - MUST be re-creatable by processing
        # all the events. This only defines the attributes, values are set in
        # `reset()`
        ###
        # if tracker is paused, no actions should be taken
        self._paused = False
        # A deterministically scheduled action to be executed next
        self.followup_action: Optional[Text] = ACTION_LISTEN_NAME
        self.latest_action: Optional[Dict[Text, Text]] = None
        # Stores the most recent message sent by the user
        self.latest_message: Optional[UserUttered] = None
        self.latest_bot_utterance: Optional[BotUttered] = None
        self._reset()
        self.active_loop: Optional[TrackerActiveLoop] = None

        # Optional model_id to add to all events.
        self.model_id: Optional[Text] = None
        self.assistant_id: Optional[Text] = None

    ###
    # Public tracker interface
    ###
    def current_state(
        self, event_verbosity: EventVerbosity = EventVerbosity.NONE
    ) -> Dict[Text, Any]:
        """Returns the current tracker state as an object."""
        events = self._events_for_verbosity(event_verbosity)
        events_as_dict = [e.as_dict() for e in events] if events is not None else None
        latest_event_time = None
        if len(self.events) > 0:
            latest_event_time = self.events[-1].timestamp

        return {
            "sender_id": self.sender_id,
            "slots": self.current_slot_values(),
            "latest_message": self._latest_message_data(),
            "latest_event_time": latest_event_time,
            FOLLOWUP_ACTION: self.followup_action,
            "paused": self.is_paused(),
            "events": events_as_dict,
            "latest_input_channel": self.get_latest_input_channel(),
            ACTIVE_LOOP: (
                dataclasses.asdict(self.active_loop) if self.active_loop else {}
            ),
            "latest_action": self.latest_action,
            "latest_action_name": self.latest_action_name,
        }

    def _events_for_verbosity(
        self, event_verbosity: EventVerbosity
    ) -> Optional[List[Event]]:
        if event_verbosity == EventVerbosity.ALL:
            return list(self.events)
        if event_verbosity == EventVerbosity.AFTER_RESTART:
            return self.events_after_latest_restart()
        if event_verbosity == EventVerbosity.APPLIED:
            return self.applied_events()

        return None

    def _latest_message_data(self) -> Optional["NLUPredictionData"]:
        if not self.latest_message:
            return None

        parse_data_with_nlu_state = self.latest_message.parse_data.copy()
        # Combine entities predicted by NLU with entities predicted by policies so that
        # users can access them together via `latest_message` (e.g. in custom actions)
        parse_data_with_nlu_state[ENTITIES] = self.latest_message.entities  # type: ignore[literal-required]  # noqa: E501

        return parse_data_with_nlu_state

    @staticmethod
    def freeze_current_state(state: State) -> FrozenState:
        """Convert State dict into a hashable format FrozenState.

        Args:
            state: The state which should be converted

        Return:
            hashable form of the state of type `FrozenState`
        """
        return frozenset(
            {
                key: frozenset(values.items())
                if isinstance(values, Dict)
                else frozenset(values)
                for key, values in state.items()
            }.items()
        )

    def past_states(
        self,
        domain: Domain,
        omit_unset_slots: bool = False,
        ignore_rule_only_turns: bool = False,
        rule_only_data: Optional[Dict[Text, Any]] = None,
    ) -> List[State]:
        """Generates the past states of this tracker based on the history.

        Args:
            domain: The Domain.
            omit_unset_slots: If `True` do not include the initial values of slots.
            ignore_rule_only_turns: If True ignore dialogue turns that are present
                only in rules.
            rule_only_data: Slots and loops,
                which only occur in rules but not in stories.

        Returns:
            A list of states
        """
        return domain.states_for_tracker_history(
            self,
            omit_unset_slots=omit_unset_slots,
            ignore_rule_only_turns=ignore_rule_only_turns,
            rule_only_data=rule_only_data,
        )

    def change_loop_to(self, loop_name: Optional[Text]) -> None:
        """Set the currently active loop.

        Args:
            loop_name: The name of loop which should be marked as active.
        """
        if loop_name is not None:
            self.active_loop = TrackerActiveLoop(
                loop_name,
                False,
                False,
                self.latest_message.parse_data if self.latest_message else None,
            )
        else:
            self.active_loop = None

    def interrupt_loop(self, is_interrupted: bool) -> None:
        """Interrupt loop and mark that we entered an unhappy path in the conversation.

        Args:
            is_interrupted: `True` if the loop was run after an unhappy path.
        """
        if self.active_loop is not None:
            self.active_loop.is_interrupted = is_interrupted

    def reject_action(self, action_name: Text) -> None:
        """Notify active loop that it was rejected."""
        if self.active_loop is not None and action_name == self.active_loop_name:
            self.active_loop.rejected = True

    def set_latest_action(self, action: Dict[Text, Text]) -> None:
        """Sets latest action name or text.

        Resets loop validation and rejection parameters.

        Args:
            action: Serialized action event.
        """
        self.latest_action = action
        if self.active_loop is not None and self.active_loop_name:
            # reset form validation if some loop is active
            self.active_loop.is_interrupted = False

        if (
            self.active_loop is not None
            and action.get(ACTION_NAME) == self.active_loop_name
        ):
            # reset loop rejection if it was predicted again
            self.active_loop.rejected = False

    def current_slot_values(self) -> Dict[Text, Any]:
        """Return the currently set values of the slots."""
        return {key: slot.value for key, slot in self.slots.items()}

    def get_slot(self, key: Text) -> Optional[Any]:
        """Retrieves the value of a slot."""
        if key in self.slots:
            return self.slots[key].value
        else:
            logger.info(f"Tried to access non existent slot '{key}'")
            return None

    def has_bot_message_after_latest_user_message(self) -> bool:
        """Checks if there is a bot message after the most recent user message.

        Returns:
            `True` if there is an action after the most recent user message.
        """
        for event in reversed(self.applied_events()):
            if isinstance(event, BotUttered):
                return True
            elif isinstance(event, UserUttered):
                return False
        return False

    def has_action_after_latest_user_message(self) -> bool:
        """Check if there is an action after the most recent user message.

        Returns:
            `True` if there is an action after the most recent user message.
        """
        for event in reversed(self.applied_events()):
            if isinstance(event, ActionExecuted):
                return True
            elif isinstance(event, UserUttered):
                return False
        return False

    def get_latest_entity_values(
        self,
        entity_type: Text,
        entity_role: Optional[Text] = None,
        entity_group: Optional[Text] = None,
    ) -> Iterator[Text]:
        """Get entity values found for the passed entity type and optional role and
        group in latest message.

        If you are only interested in the first entity of a given type use
        `next(tracker.get_latest_entity_values(`"`my_entity_name`"`), None)`.
        If no entity is found `None` is the default result.

        Args:
            entity_type: the entity type of interest
            entity_role: optional entity role of interest
            entity_group: optional entity group of interest

        Returns:
            Entity values.
        """
        if self.latest_message is None:
            return iter([])

        return (
            cast(Text, x[ENTITY_ATTRIBUTE_VALUE])
            for x in self.latest_message.entities
            if x.get(ENTITY_ATTRIBUTE_TYPE) == entity_type
            and x.get(ENTITY_ATTRIBUTE_GROUP) == entity_group
            and x.get(ENTITY_ATTRIBUTE_ROLE) == entity_role
        )

    def get_latest_input_channel(self) -> Optional[Text]:
        """Get the name of the input_channel of the latest UserUttered event."""
        for e in reversed(self.events):
            if isinstance(e, UserUttered):
                return e.input_channel
        return None

    def is_paused(self) -> bool:
        """State whether the tracker is currently paused."""
        return self._paused

    def idx_after_latest_restart(self) -> int:
        """Return the idx of the most recent restart in the list of events.

        If the conversation has not been restarted, ``0`` is returned.
        """
        for i, event in enumerate(reversed(self.events)):
            if isinstance(event, Restarted):
                return len(self.events) - i

        return 0

    def events_after_latest_restart(self) -> List[Event]:
        """Return a list of events after the most recent restart."""
        return list(self.events)[self.idx_after_latest_restart() :]

    def init_copy(self) -> "DialogueStateTracker":
        """Creates a new state tracker with the same initial values."""
        return DialogueStateTracker(
            self.sender_id or DEFAULT_SENDER_ID,
            self.slots.values(),
            self._max_event_history,
            is_rule_tracker=self.is_rule_tracker,
        )

    def generate_all_prior_trackers(
        self,
    ) -> Generator[Tuple["DialogueStateTracker", bool], None, None]:
        """Returns a generator of the previous trackers of this tracker.

        Returns:
            The tuple with the tracker before each action,
            and the boolean flag representing whether this action should be hidden
            in the dialogue history created for ML-based policies.
        """
        tracker = self.init_copy()

        for event in self.applied_events():

            if isinstance(event, ActionExecuted):
                yield tracker, event.hide_rule_turn

            tracker.update(event)

        yield tracker, False

    def applied_events(self) -> List[Event]:
        """Returns all actions that should be applied - w/o reverted events.

        Returns:
            The events applied to the tracker.
        """
        loop_names = [
            event.name
            for event in self.events
            if isinstance(event, ActiveLoop) and event.name
        ]

        applied_events: List[Event] = []

        for event in self.events:
            if isinstance(event, (Restarted, SessionStarted)):
                applied_events = []
            elif isinstance(event, ActionReverted):
                self._undo_till_previous(ActionExecuted, applied_events)
            elif isinstance(event, UserUtteranceReverted):
                # Seeing a user uttered event automatically implies there was
                # a listen event right before it, so we'll first rewind the
                # user utterance, then get the action right before it (also removes
                # the `action_listen` action right before it).
                self._undo_till_previous(UserUttered, applied_events)
                self._undo_till_previous(ActionExecuted, applied_events)
            elif (
                isinstance(event, ActionExecuted)
                and event.action_name in loop_names
                and not self._first_loop_execution_or_unhappy_path(
                    event.action_name, applied_events
                )
            ):
                self._undo_till_previous_loop_execution(
                    event.action_name, applied_events
                )
            else:
                applied_events.append(event)

        return applied_events

    @staticmethod
    def _undo_till_previous(event_type: Type[Event], done_events: List[Event]) -> None:
        """Removes events from `done_events`.

        Removes events from `done_events` until the first occurrence `event_type`
        is found which is also removed.
        """
        # list gets modified - hence we need to copy events!
        for e in reversed(done_events[:]):
            del done_events[-1]
            if isinstance(e, event_type):
                break

    def _first_loop_execution_or_unhappy_path(
        self, loop_action_name: Text, applied_events: List[Event]
    ) -> bool:
        next_action: Optional[Text] = None

        for event in reversed(applied_events):
            # Stop looking for a previous loop execution if there is a loop deactivation
            # event because it means that the current loop is running for the first
            # time and previous loop events belong to different loops.
            if isinstance(event, ActiveLoop) and event.name is None:
                return True

            if self._is_within_unhappy_path(loop_action_name, event, next_action):
                return True

            if isinstance(event, ActionExecuted):
                # We found a previous execution of the loop and we are not within an
                # unhappy path.
                if event.action_name == loop_action_name:
                    return False

                # Remember the action as we need that to check whether we might be
                # within an unhappy path.
                next_action = event.action_name

        return True

    @staticmethod
    def _is_within_unhappy_path(
        loop_action_name: Text, event: Event, next_action_in_the_future: Optional[Text]
    ) -> bool:
        # When actual users are talking to the action has to return an
        # `ActionExecutionRejected` in order to enter an unhappy path.
        loop_was_rejected_previously = (
            isinstance(event, ActionExecutionRejected)
            and event.action_name == loop_action_name
        )
        # During the policy training there are no `ActionExecutionRejected` events
        # which let us see whether we are within an unhappy path. Hence, we check if a
        # different action was executed instead of the loop after last user utterance.
        other_action_after_latest_user_utterance = (
            isinstance(event, UserUttered)
            and next_action_in_the_future is not None
            and next_action_in_the_future != loop_action_name
        )

        return loop_was_rejected_previously or other_action_after_latest_user_utterance

    @staticmethod
    def _undo_till_previous_loop_execution(
        loop_action_name: Text, done_events: List[Event]
    ) -> None:
        offset = 0
        for e in reversed(done_events[:]):
            if isinstance(e, ActionExecuted) and e.action_name == loop_action_name:
                break

            if isinstance(
                e, (ActionExecuted, UserUttered, DefinePrevUserUtteredFeaturization)
            ):
                del done_events[-1 - offset]
            else:
                # Remember events which aren't unfeaturized to get the index right
                offset += 1

    def replay_events(self) -> None:
        """Update the tracker based on a list of events."""
        applied_events = self.applied_events()
        for event in applied_events:
            event.apply_to(self)

    def recreate_from_dialogue(self, dialogue: Dialogue) -> None:
        """Use a serialised `Dialogue` to update the trackers state.

        This uses the state as is persisted in a ``TrackerStore``. If the
        tracker is blank before calling this method, the final state will be
        identical to the tracker from which the dialogue was created.
        """
        if not isinstance(dialogue, Dialogue):
            raise ValueError(
                f"story {dialogue} is not of type Dialogue. "
                f"Have you deserialized it?"
            )

        self._reset()
        self.events.extend(dialogue.events)
        self.replay_events()

    def copy(self) -> "DialogueStateTracker":
        """Creates a duplicate of this tracker."""
        return self.travel_back_in_time(float("inf"))

    def travel_back_in_time(self, target_time: float) -> "DialogueStateTracker":
        """Creates a new tracker with a state at a specific timestamp.

        A new tracker will be created and all events previous to the
        passed time stamp will be replayed. Events that occur exactly
        at the target time will be included.
        """
        tracker = self.init_copy()

        for event in self.events:
            if event.timestamp <= target_time:
                tracker.update(event)
            else:
                break

        return tracker  # yields the final state

    def as_dialogue(self) -> Dialogue:
        """Return a ``Dialogue`` object containing all of the turns.

        This can be serialised and later used to recover the state
        of this tracker exactly.
        """
        return Dialogue(self.sender_id, list(self.events))

    def update(self, event: Event, domain: Optional[Domain] = None) -> None:
        """Modify the state of the tracker according to an ``Event``."""
        if not isinstance(event, Event):  # pragma: no cover
            raise ValueError("event to log must be an instance of a subclass of Event.")

        if self.model_id and METADATA_MODEL_ID not in event.metadata:
            event.metadata = {**event.metadata, METADATA_MODEL_ID: self.model_id}

        if self.assistant_id and ASSISTANT_ID_KEY not in event.metadata:
            event.metadata = {**event.metadata, ASSISTANT_ID_KEY: self.assistant_id}

        self.events.append(event)
        event.apply_to(self)

    def update_with_events(
        self,
        new_events: List[Event],
        domain: Optional[Domain],
        override_timestamp: bool = True,
    ) -> None:
        """Adds multiple events to the tracker.

        Args:
            new_events: Events to apply.
            domain: The current model's domain.
            override_timestamp: If `True` refresh all timestamps of the events. As the
                events are usually created at some earlier point, this makes sure that
                all new events come after any current tracker events.
        """
        for e in new_events:
            if override_timestamp:
                e.timestamp = time.time()
            self.update(e, domain)

    def as_story(self, include_source: bool = False) -> "Story":
        """Dump the tracker as a story in the Rasa Core story format.

        Returns the dumped tracker as a string.
        """
        from rasa.shared.core.training_data.structures import Story

        story_name = (
            f"{self.sender_id} ({self.sender_source})"
            if include_source
            else self.sender_id
        )
        return Story.from_events(list(self.events), story_name)

    def export_stories(
        self,
        writer: "StoryWriter",
        e2e: bool = False,
        include_source: bool = False,
        should_append_stories: bool = False,
    ) -> Text:
        """Dump the tracker as a story in the Rasa Core story format.

        Returns:
            The dumped tracker as a string.
        """
        story = self.as_story(include_source)
        return writer.dumps(
            story.story_steps, is_appendable=should_append_stories, is_test_story=e2e
        )

    def export_stories_to_file(self, export_path: Text = "debug_stories.yml") -> None:
        """Dump the tracker as a story to a file."""
        from rasa.shared.core.training_data.story_writer.yaml_story_writer import (
            YAMLStoryWriter,
        )

        append = os.path.exists(export_path)

        rasa.shared.utils.io.write_text_file(
            self.export_stories(YAMLStoryWriter(), should_append_stories=append) + "\n",
            export_path,
            append=append,
        )

    def get_last_event_for(
        self,
        event_type: Union[Type["EventTypeAlias"], Tuple[Type["EventTypeAlias"], ...]],
        action_names_to_exclude: Optional[List[Text]] = None,
        skip: int = 0,
        event_verbosity: EventVerbosity = EventVerbosity.APPLIED,
    ) -> Optional["EventTypeAlias"]:
        """Gets the last event of a given type which was actually applied.

        Args:
            event_type: The type of event you want to find.
            action_names_to_exclude: Events of type `ActionExecuted` which
                should be excluded from the results. Can be used to skip
                `action_listen` events.
            skip: Skips n possible results before return an event.
            event_verbosity: Which `EventVerbosity` should be used to search for events.

        Returns:
            event which matched the query or `None` if no event matched.
        """
        to_exclude = action_names_to_exclude or []

        def filter_function(e: Event) -> bool:
            has_instance = isinstance(e, event_type)
            excluded = isinstance(e, ActionExecuted) and e.action_name in to_exclude
            return has_instance and not excluded

        filtered = filter(
            filter_function, reversed(self._events_for_verbosity(event_verbosity) or [])
        )

        for i in range(skip):
            next(filtered, None)

        return next(filtered, None)

    def last_executed_action_has(self, name: Text, skip: int = 0) -> bool:
        """Returns whether last `ActionExecuted` event had a specific name.

        Args:
            name: Name of the event which should be matched.
            skip: Skips n possible results in between.

        Returns:
            `True` if last executed action had name `name`, otherwise `False`.
        """
        last: Optional[ActionExecuted] = self.get_last_event_for(
            ActionExecuted, action_names_to_exclude=[ACTION_LISTEN_NAME], skip=skip
        )
        return last is not None and last.action_name == name

    ###
    # Internal methods for the modification of the trackers state. Should
    # only be called by events, not directly. Rather update the tracker
    # with an event that in its ``apply_to`` method modifies the tracker.
    ###
    def _reset(self) -> None:
        """Reset tracker to initial state - doesn't delete events though!."""
        self._reset_slots()
        self._paused = False
        self.latest_action = {}
        self.latest_message = UserUttered.empty()
        self.latest_bot_utterance = BotUttered.empty()
        self.followup_action = ACTION_LISTEN_NAME
        self.active_loop = None

    def _reset_slots(self) -> None:
        """Set all the slots to their initial value."""
        for slot in self.slots.values():
            slot.reset()

    def _set_slot(self, key: Text, value: Any) -> None:
        """Sets the value of a slot if that slot exists."""
        if key in self.slots:
            slot = self.slots[key]
            slot.value = value
        else:
            logger.error(
                f"Tried to set non existent slot '{key}'. Make sure you "
                f"added all your slots to your domain file."
            )

    def _create_events(self, evts: List[Event]) -> Deque[Event]:
        if evts and not isinstance(evts[0], Event):  # pragma: no cover
            raise ValueError("events, if given, must be a list of events")
        return deque(evts, self._max_event_history)

    def __eq__(self, other: Any) -> bool:
        if isinstance(self, type(other)):
            return other.events == self.events and self.sender_id == other.sender_id
        else:
            return False

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)

    def trigger_followup_action(self, action: Text) -> None:
        """Triggers another action following the execution of the current."""
        self.followup_action = action

    def clear_followup_action(self) -> None:
        """Clears follow up action when it was executed."""
        self.followup_action = None

    @property
    def active_loop_name(self) -> Optional[Text]:
        """Get the name of the currently active loop.

        Returns: `None` if no active loop or the name of the currently active loop.
        """
        if not self.active_loop or self.active_loop.name == SHOULD_NOT_BE_SET:
            return None

        return self.active_loop.name

    @property
    def latest_action_name(self) -> Optional[Text]:
        """Get the name of the previously executed action or text of e2e action.

        Returns: name of the previously executed action or text of e2e action
        """
        if self.latest_action is None:
            return None

        return self.latest_action.get(ACTION_NAME) or self.latest_action.get(
            ACTION_TEXT
        )

    @property
    def is_active_loop_rejected(self) -> bool:
        """Return True if there is an active loop and it's rejected."""
        return self.active_loop is not None and self.active_loop.rejected

    @property
    def is_active_loop_interrupted(self) -> bool:
        """Return True if there is an active loop and it's interrupted."""
        return self.active_loop is not None and self.active_loop.is_interrupted

    def fingerprint(self) -> Text:
        """Returns a unique hash for the tracker which is stable across python runs.

        Returns:
            fingerprint of the tracker
        """
        data: Dict[Text, Any] = {"sender_id": self.sender_id}

        if self.slots:
            data.update(self.slots)

        if self.events:
            data["events"] = list(self.events)

        return rasa.shared.utils.io.get_dictionary_fingerprint(data)


class TrackerEventDiffEngine:
    """Computes event difference of two trackers."""

    @staticmethod
    def event_difference(
        original: DialogueStateTracker, tracker: DialogueStateTracker
    ) -> List[Event]:
        """Returns all events from the new tracker which are not present
        in the original tracker.

        Args:
            tracker: Tracker containing events from the current conversation session.
        """
        offset = len(original.events) if original else 0
        events = tracker.events
        return list(itertools.islice(events, offset, len(events)))


def get_active_loop_name(
    state: State,
) -> Optional[Text]:
    """Get the name of current active loop.

    Args:
        state: The state from which the name of active loop should be extracted

    Return:
        the name of active loop or None
    """
    if (
        not state.get(ACTIVE_LOOP)
        or state[ACTIVE_LOOP].get(LOOP_NAME) == SHOULD_NOT_BE_SET
    ):
        return None

    # FIXME: better type annotation for `State` would require
    # a larger refactoring (e.g. switch to dataclass)
    return cast(Optional[Text], state[ACTIVE_LOOP].get(LOOP_NAME))


def is_prev_action_listen_in_state(state: State) -> bool:
    """Check if action_listen is the previous executed action.

    Args:
        state: The state for which the check should be performed

    Return:
        boolean value indicating whether action_listen is previous action
    """
    prev_action_name = state.get(PREVIOUS_ACTION, {}).get(ACTION_NAME)
    return prev_action_name == ACTION_LISTEN_NAME


def get_trackers_for_conversation_sessions(
    tracker: DialogueStateTracker,
) -> List[DialogueStateTracker]:
    """Generate trackers for `tracker` that are split by conversation sessions.

    Args:
        tracker: Instance of `DialogueStateTracker` to split.

    Returns:
        The trackers split by conversation sessions.
    """
    split_conversations = events.split_events(
        tracker.events,
        ActionExecuted,
        {"action_name": ACTION_SESSION_START_NAME},
        include_splitting_event=True,
    )

    return [
        DialogueStateTracker.from_events(
            tracker.sender_id,
            evts,
            tracker.slots.values(),
            sender_source=tracker.sender_source,
            max_event_history=tracker._max_event_history,
        )
        for evts in split_conversations
    ]