gbrltv/CDESF2

View on GitHub
cdesf/utils/distances.py

Summary

Maintainability
B
4 hrs
Test Coverage
import networkx as nx
import collections
from numpy import average, count_nonzero
from math import log10
from ..data_structures import Case


def calculate_case_distances(
    graph: nx.DiGraph, case: Case, *, additional_attributes: "list[str]" = []
) -> "dict[str, float]":
    """
    Extracts the distances between the current graph and a given case across trace,
    time, and the additional attributes provided.

    Arguments:
        graph: nx.DiGraph

        case: Case

        additional_attributes: list[str]:
            A list of additional attributes to calculate the distances for.

    Returns:
        A dictionary that associates each attribute with the distance between the
        graph and the case.
    """
    trace = case.get_trace()
    time_differences = calculate_case_time_distances(case)

    if not graph.edges or len(trace) < 2:
        # Return a map with all-zero distances.
        default_distances = {"graph": 0.0, "time": 0.0}

        for attribute_name in additional_attributes:
            default_distances[attribute_name] = 0.0

        return default_distances

    distances: "dict[str, float]" = {}

    # Node distances
    for attribute_name in additional_attributes:
        is_numerical = True
        current_total = 0
        difference_total = 0

        for index, activity in enumerate(trace):
            case_value = case.get_attribute(attribute_name)[index]

            activity_node: dict = graph.nodes.get(activity)

            if not activity_node:
                # This is the first time we see this activity, so the distance is maximum.
                # This might be the first ever event for this attribute, so we also need to check to see if it is
                # numerical or categorical
                if isinstance(case_value, str):
                    current_total += 1
                else:
                    current_total += 0  # The average for this attribute so far should remain the same
                    difference_total += case_value  # The distance should be maximum

                continue

            activity_attribute_values = activity_node.get(attribute_name)

            if isinstance(case_value, str):
                # Handle this attribute like a categorical attribute
                is_numerical = False
                activity_attribute_counter = collections.Counter(
                    activity_attribute_values
                )
                activity_attribute_frequency = activity_attribute_counter.get(
                    case_value, 0
                )
                activity_attribute_distance = 1 - (
                    activity_attribute_frequency / len(activity_attribute_values)
                )

                current_total += activity_attribute_distance
            else:
                # Handle this attribute like a numerical attribute
                is_numerical = True
                activity_attribute_average = average(activity_attribute_values)
                current_total += activity_attribute_average

                difference_total += abs(activity_attribute_average - case_value)

        if is_numerical:
            distances[attribute_name] = difference_total / current_total
        else:
            distances[attribute_name] = current_total / len(trace)

    # Edge distances
    normalized_times: "list[float]" = []
    normalized_weight = 0

    for trace_index in range(len(trace) - 1):
        first_node = trace[trace_index]
        second_node = trace[trace_index + 1]

        if graph.has_edge(first_node, second_node):
            edge = graph[first_node][second_node]
            normalized_times.append(edge["time_normalized"])
            normalized_weight += 1 - edge["weight_normalized"]
        else:
            normalized_times.append(0.0)
            normalized_weight += 1

    distances["graph"] = normalized_weight / (len(trace) - 1)

    time_current_total = sum(normalized_times)
    time_difference_total = 0

    for time_index, normalized_time in enumerate(normalized_times):
        time_difference_total += abs(normalized_time - time_differences[time_index])

    try:
        if time_difference_total == 0:
            distances["time"] = 0.0
        elif time_current_total == 0:
            distances["time"] = log10(time_difference_total)
        else:
            distances["time"] = log10(time_difference_total / time_current_total)
    except ValueError:
        distances["time"] = 0.0
    return distances


def calculate_case_time_distances(case: Case) -> "list[float]":
    timestamps = case.get_timestamps()

    if len(timestamps) < 2:
        return [0]

    distances: "list[float]" = []

    for lhs, rhs in list(zip(timestamps, timestamps[1:])):
        distances.append((rhs - lhs).total_seconds())

    return distances