rasa/shared/nlu/training_data/message.py

Summary

Maintainability
C
1 day
Test Coverage
A
92%
from typing import Any, Optional, Tuple, Text, Dict, Set, List

import typing
import copy

import rasa.shared.utils.io
from rasa.shared.exceptions import RasaException
from rasa.shared.nlu.constants import (
    TEXT,
    INTENT,
    RESPONSE,
    INTENT_RESPONSE_KEY,
    METADATA,
    METADATA_INTENT,
    METADATA_EXAMPLE,
    ENTITIES,
    ENTITY_ATTRIBUTE_START,
    ENTITY_ATTRIBUTE_END,
    RESPONSE_IDENTIFIER_DELIMITER,
    FEATURE_TYPE_SENTENCE,
    FEATURE_TYPE_SEQUENCE,
    ACTION_TEXT,
    ACTION_NAME,
    TEXT_TOKENS,
)
from rasa.shared.constants import DIAGNOSTIC_DATA

if typing.TYPE_CHECKING:
    from rasa.shared.nlu.training_data.features import Features


class Message:
    """Container for data that can be used to describe a conversation turn.

    The turn is described by a set of attributes such as e.g. `TEXT`  and  `INTENT`
    when describing a user utterance or e.g. `ACTION_NAME` for describing a bot action.
    The container includes raw information (`self.data`) as well as features
    (`self.features`) for each such attribute.
    Moreover, the message has a timestamp and can keep track about information
    on a specific subset of attributes (`self.output_properties`).
    """

    def __init__(
        self,
        data: Optional[Dict[Text, Any]] = None,
        output_properties: Optional[Set] = None,
        time: Optional[int] = None,
        features: Optional[List["Features"]] = None,
        **kwargs: Any,
    ) -> None:
        """Creates an instance of Message."""
        self.time = time
        self.data = data.copy() if data else {}
        self.features = features if features else []

        self.data.update(**kwargs)
        self._cached_fingerprint: Optional[Text] = None

        if output_properties:
            self.output_properties = output_properties
        else:
            self.output_properties = set()
        self.output_properties.add(TEXT)

    def add_features(self, features: Optional["Features"]) -> None:
        """Add more vectorized features to the message."""
        if features is not None:
            self.features.append(features)
        self._cached_fingerprint = None

    def add_diagnostic_data(self, origin: Text, data: Dict[Text, Any]) -> None:
        """Adds diagnostic data from the `origin` component.

        Args:
            origin: Name of the component that created the data.
            data: The diagnostic data.
        """
        if origin in self.get(DIAGNOSTIC_DATA, {}):
            rasa.shared.utils.io.raise_warning(
                f"Please make sure every pipeline component has a distinct name. "
                f"The name '{origin}' appears at least twice and diagnostic "
                f"data will be overwritten."
            )
        self.data.setdefault(DIAGNOSTIC_DATA, {})
        self.data[DIAGNOSTIC_DATA][origin] = data
        self._cached_fingerprint = None

    def set(self, prop: Text, info: Any, add_to_output: bool = False) -> None:
        """Sets the message's property to the given value.

        Args:
            prop: Name of the property to be set.
            info: Value to be assigned to that property.
            add_to_output: Decides whether to add `prop` to the `output_properties`.
        """
        self.data[prop] = info
        if add_to_output:
            self.output_properties.add(prop)
        self._cached_fingerprint = None

    def get(self, prop: Text, default: Optional[Any] = None) -> Any:
        """Retrieve message property."""
        return self.data.get(prop, default)

    def as_dict_nlu(self) -> dict:
        """Get dict representation of message as it would appear in training data."""
        d = self.as_dict()
        if d.get(INTENT, None):
            d[INTENT] = self.get_full_intent()
        d.pop(RESPONSE, None)
        d.pop(INTENT_RESPONSE_KEY, None)
        return d

    def as_dict(self, only_output_properties: bool = False) -> Dict:
        """Gets dict representation of message."""
        if only_output_properties:
            d = {}
            for key, value in self.data.items():
                if key in self.output_properties:
                    if key == TEXT_TOKENS:
                        d[TEXT_TOKENS] = [(t.start, t.end) for t in value]
                    else:
                        d[key] = value
        else:
            d = self.data

        # Filter all keys with None value. These could have come while building the
        # Message object in markdown format
        return {key: value for key, value in d.items() if value is not None}

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, Message):
            return False
        else:
            return other.fingerprint() == self.fingerprint()

    def __hash__(self) -> int:
        """Calculate a hash for the message.

        Returns:
            Hash of the message.
        """
        return int(self.fingerprint(), 16)

    def fingerprint(self) -> Text:
        """Calculate a string fingerprint for the message.

        Returns:
            Fingerprint of the message.
        """
        if self._cached_fingerprint is None:
            self._cached_fingerprint = rasa.shared.utils.io.deep_container_fingerprint(
                [self.data, self.features]
            )
        return self._cached_fingerprint

    @classmethod
    def build(
        cls,
        text: Text,
        intent: Optional[Text] = None,
        entities: Optional[List[Dict[Text, Any]]] = None,
        intent_metadata: Optional[Any] = None,
        example_metadata: Optional[Any] = None,
        **kwargs: Any,
    ) -> "Message":
        """Builds a Message from `UserUttered` data.

        Args:
            text: text of a user's utterance
            intent: an intent of the user utterance
            entities: entities in the user's utterance
            intent_metadata: optional metadata for the intent
            example_metadata: optional metadata for the intent example

        Returns:
            Message
        """
        data: Dict[Text, Any] = {TEXT: text}
        if intent:
            split_intent, response_key = cls.separate_intent_response_key(intent)
            if split_intent:
                data[INTENT] = split_intent
            if response_key:
                # intent label can be of the form - {intent}/{response_key},
                # so store the full intent label in intent_response_key
                data[INTENT_RESPONSE_KEY] = intent
        if entities:
            data[ENTITIES] = entities
        if intent_metadata is not None:
            data[METADATA] = {METADATA_INTENT: intent_metadata}
        if example_metadata is not None:
            data.setdefault(METADATA, {})[METADATA_EXAMPLE] = example_metadata

        return cls(data, **kwargs)

    def get_full_intent(self) -> Text:
        """Get intent as it appears in training data."""
        return (
            self.get(INTENT_RESPONSE_KEY)
            if self.get(INTENT_RESPONSE_KEY)
            else self.get(INTENT)
        )

    @staticmethod
    def separate_intent_response_key(
        original_intent: Text,
    ) -> Tuple[Text, Optional[Text]]:
        """Splits intent into main intent name and optional sub-intent name.

        For example, `"FAQ/how_to_contribute"` would be split into
        `("FAQ", "how_to_contribute")`. The response delimiter can
        take different values (not just `"/"`) and depends on the
        constant - `RESPONSE_IDENTIFIER_DELIMITER`.
        If there is no response delimiter in the intent, the second tuple
        item is `None`, e.g. `"FAQ"` would be mapped to `("FAQ", None)`.
        """
        split_title = original_intent.split(RESPONSE_IDENTIFIER_DELIMITER)
        if len(split_title) == 2:
            return split_title[0], split_title[1]
        elif len(split_title) == 1:
            return split_title[0], None

        raise RasaException(
            f"Intent name '{original_intent}' is invalid, "
            f"it cannot contain more than one '{RESPONSE_IDENTIFIER_DELIMITER}'."
        )

    def get_sparse_features(
        self, attribute: Text, featurizers: Optional[List[Text]] = None
    ) -> Tuple[Optional["Features"], Optional["Features"]]:
        """Gets all sparse features for the attribute given the list of featurizers.

        If no featurizers are provided, all available features will be considered.

        Args:
            attribute: message attribute
            featurizers: names of featurizers to consider

        Returns:
            Sparse features.
        """
        if featurizers is None:
            featurizers = []

        sequence_features, sentence_features = self._filter_sparse_features(
            attribute, featurizers
        )

        combined_sequence_features = self._combine_features(
            sequence_features, featurizers
        )
        combined_sentence_features = self._combine_features(
            sentence_features, featurizers
        )

        return combined_sequence_features, combined_sentence_features

    def get_sparse_feature_sizes(
        self, attribute: Text, featurizers: Optional[List[Text]] = None
    ) -> Dict[Text, List[int]]:
        """Gets sparse feature sizes for the attribute given the list of featurizers.

        If no featurizers are provided, all available features will be considered.

        Args:
            attribute: message attribute
            featurizers: names of featurizers to consider

        Returns:
            Sparse feature sizes.
        """
        if featurizers is None:
            featurizers = []

        sequence_features, sentence_features = self._filter_sparse_features(
            attribute, featurizers
        )
        sequence_sizes = [f.features.shape[1] for f in sequence_features]
        sentence_sizes = [f.features.shape[1] for f in sentence_features]

        return {
            FEATURE_TYPE_SEQUENCE: sequence_sizes,
            FEATURE_TYPE_SENTENCE: sentence_sizes,
        }

    def get_dense_features(
        self, attribute: Text, featurizers: Optional[List[Text]] = None
    ) -> Tuple[Optional["Features"], Optional["Features"]]:
        """Gets all dense features for the attribute given the list of featurizers.

        If no featurizers are provided, all available features will be considered.

        Args:
            attribute: message attribute
            featurizers: names of featurizers to consider

        Returns:
            Dense features.
        """
        if featurizers is None:
            featurizers = []

        sequence_features, sentence_features = self._filter_dense_features(
            attribute, featurizers
        )

        combined_sequence_features = self._combine_features(
            sequence_features, featurizers
        )
        combined_sentence_features = self._combine_features(
            sentence_features, featurizers
        )

        return combined_sequence_features, combined_sentence_features

    def get_all_features(
        self, attribute: Text, featurizers: Optional[List[Text]] = None
    ) -> List["Features"]:
        """Gets all features for the attribute given the list of featurizers.

        If no featurizers are provided, all available features will be considered.

        Args:
            attribute: message attribute
            featurizers: names of featurizers to consider

        Returns:
            Features.
        """
        sparse_features = self.get_sparse_features(attribute, featurizers)
        dense_features = self.get_dense_features(attribute, featurizers)

        return [f for f in sparse_features + dense_features if f is not None]

    def features_present(
        self, attribute: Text, featurizers: Optional[List[Text]] = None
    ) -> bool:
        """Checks if there are any features present for the attribute and featurizers.

        If no featurizers are provided, all available features will be considered.

        Args:
            attribute: Message attribute.
            featurizers: Names of featurizers to consider.

        Returns:
            ``True``, if features are present, ``False`` otherwise.
        """
        if featurizers is None:
            featurizers = []

        (
            sequence_sparse_features,
            sentence_sparse_features,
        ) = self._filter_sparse_features(attribute, featurizers)
        sequence_dense_features, sentence_dense_features = self._filter_dense_features(
            attribute, featurizers
        )

        return (
            len(sequence_sparse_features) > 0
            or len(sentence_sparse_features) > 0
            or len(sequence_dense_features) > 0
            or len(sentence_dense_features) > 0
        )

    def _filter_dense_features(
        self, attribute: Text, featurizers: List[Text]
    ) -> Tuple[List["Features"], List["Features"]]:
        sentence_features = [
            f
            for f in self.features
            if f.attribute == attribute
            and f.is_dense()
            and f.type == FEATURE_TYPE_SENTENCE
            and (f.origin in featurizers or not featurizers)
        ]
        sequence_features = [
            f
            for f in self.features
            if f.attribute == attribute
            and f.is_dense()
            and f.type == FEATURE_TYPE_SEQUENCE
            and (f.origin in featurizers or not featurizers)
        ]
        return sequence_features, sentence_features

    def _filter_sparse_features(
        self, attribute: Text, featurizers: List[Text]
    ) -> Tuple[List["Features"], List["Features"]]:
        sentence_features = [
            f
            for f in self.features
            if f.attribute == attribute
            and f.is_sparse()
            and f.type == FEATURE_TYPE_SENTENCE
            and (f.origin in featurizers or not featurizers)
        ]
        sequence_features = [
            f
            for f in self.features
            if f.attribute == attribute
            and f.is_sparse()
            and f.type == FEATURE_TYPE_SEQUENCE
            and (f.origin in featurizers or not featurizers)
        ]

        return sequence_features, sentence_features

    @staticmethod
    def _combine_features(
        features: List["Features"], featurizers: List[Text]
    ) -> Optional["Features"]:
        combined_features = None

        for f in features:
            if combined_features is None:
                combined_features = copy.deepcopy(f)
                combined_features.origin = featurizers
            else:
                combined_features.combine_with_features(f)

        return combined_features

    def is_core_or_domain_message(self) -> bool:
        """Checks whether the message is a core message or from the domain.

        E.g. a core message is created from a story or a domain action,
        not from the NLU data.

        Returns:
            True, if message is a core or domain message, false otherwise.
        """
        return bool(
            self.data.get(ACTION_NAME)
            or self.data.get(ACTION_TEXT)
            or (
                (self.data.get(INTENT) or self.data.get(RESPONSE))
                and not self.data.get(TEXT)
            )
            or (
                self.data.get(TEXT)
                and not (self.data.get(INTENT) or self.data.get(RESPONSE))
            )
        )

    def is_e2e_message(self) -> bool:
        """Checks whether the message came from an e2e story.

        Returns:
            `True`, if message is a from an e2e story, `False` otherwise.
        """
        return bool(
            (self.get(ACTION_TEXT) and not self.get(ACTION_NAME))
            or (self.get(TEXT) and not self.get(INTENT))
        )

    def find_overlapping_entities(
        self,
    ) -> List[Tuple[Dict[Text, Any], Dict[Text, Any]]]:
        """Finds any overlapping entity annotations."""
        entities = self.get(ENTITIES, [])[:]
        entities_with_location = [
            e
            for e in entities
            if (ENTITY_ATTRIBUTE_START in e.keys() and ENTITY_ATTRIBUTE_END in e.keys())
        ]
        entities_with_location.sort(key=lambda e: e[ENTITY_ATTRIBUTE_START])
        overlapping_pairs: List[Tuple[Dict[Text, Any], Dict[Text, Any]]] = []
        for i, entity in enumerate(entities_with_location):
            for other_entity in entities_with_location[i + 1 :]:
                if other_entity[ENTITY_ATTRIBUTE_START] < entity[ENTITY_ATTRIBUTE_END]:
                    overlapping_pairs.append((entity, other_entity))
                else:
                    break
        return overlapping_pairs