gbrltv/CDESF2
cdesf/utils/metrics.py
import json
from datetime import datetime
from os import makedirs
from typing import List, Optional, Tuple

import networkx as nx
import pandas as pd

from ..data_structures import Case, Cluster
from ..visualization import save_graphviz


class Metrics:
    """
    Controls the computation of metrics during the stream processing
    and writes the results into files periodically (check points)
    """

    case_columns: "list[str]"
    cluster_columns: "list[str]"
    additional_attributes: "list[str]"

    def __init__(
        self, file_name: str, additional_attributes: Optional["list[str]"] = None
    ):
        """
        Creates the paths for the outputs and initializes the metrics attributes

        Parameters
        --------------------------------------
        file_name: str
            Process name, used for the path creation
        additional_attributes: list[str], optional
            Extra case attributes; each one adds a distance column to the
            case metrics and a coordinate column to the cluster metrics
        """
        self.case_metrics = []
        self.cluster_metrics = []
        self.file_name = file_name
        # Avoid the mutable default argument pitfall: a shared `[]` default
        # would be reused across Metrics instances
        self.additional_attributes = (
            additional_attributes if additional_attributes is not None else []
        )

        self.path_to_pmg_metrics = f"output/metrics/{file_name}_process_model_graphs"
        self.path_to_pmg_vis = f"output/visualization/{file_name}_process_model_graphs"
        self.path_to_drifts = "output/visualization/drifts"
        self.path_to_case_metrics = "output/metrics/case_metrics"
        self.path_to_cluster_metrics = "output/metrics/cluster_metrics"
        try:
            makedirs(self.path_to_pmg_metrics, exist_ok=True)
            makedirs(self.path_to_pmg_vis, exist_ok=True)
            makedirs(self.path_to_drifts, exist_ok=True)
            makedirs(self.path_to_case_metrics, exist_ok=True)
            makedirs(self.path_to_cluster_metrics, exist_ok=True)

            self.case_columns, self.cluster_columns = self.generate_column_names()

            pd.DataFrame(columns=self.case_columns).to_csv(
                f"{self.path_to_case_metrics}/{file_name}.csv", index=False
            )
            pd.DataFrame(columns=self.cluster_columns).to_csv(
                f"{self.path_to_cluster_metrics}/{file_name}.csv", index=False
            )
        except Exception as e:
            print(e)

    def generate_column_names(self) -> Tuple["list[str]", "list[str]"]:
        case_columns = []
        cluster_columns = []

        case_columns += [
            "stream_index",
            "timestamp",
            "check point",
            "case",
            "graph distance",
            "time distance",
        ]
        case_columns += [
            f"{attribute_name} distance"
            for attribute_name in self.additional_attributes
        ]
        case_columns += ["label"]

        cluster_columns += [
            "stream_index",
            "timestamp",
            "check point",
            "cluster id",
            "graph coordinate",
            "time coordinate",
        ]
        cluster_columns += [
            f"{attribute_name} coordinate"
            for attribute_name in self.additional_attributes
        ]
        cluster_columns += ["radius", "weight", "cluster type"]

        return case_columns, cluster_columns
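
    # Sketch of the resulting layouts, assuming a hypothetical
    # additional_attributes=["cost"]:
    #   case_columns    -> ["stream_index", "timestamp", "check point", "case",
    #                       "graph distance", "time distance", "cost distance",
    #                       "label"]
    #   cluster_columns -> ["stream_index", "timestamp", "check point",
    #                       "cluster id", "graph coordinate", "time coordinate",
    #                       "cost coordinate", "radius", "weight", "cluster type"]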

    def compute_case_metrics(
        self,
        event_index: int,
        timestamp: datetime,
        cp_count: int,
        case: Case,
        label: bool,
    ) -> None:
        """
        Generates case metrics and saves them in the self.case_metrics attribute.

        Parameters
        --------------------------------------
        event_index: int
            Index of the current event
        timestamp: datetime
            Current timestamp of the stream
        cp_count: int
            Current check point
        case: Case
            Case to be saved
        label: bool
            True if the case is normal, False if it is anomalous
        """
        label_str = "normal" if label else "anomalous"

        data = [
            event_index,
            timestamp,
            cp_count,
            case.id,
            case.distances.get("graph"),
            case.distances.get("time"),
        ]

        for attribute_name in self.additional_attributes:
            data.append(case.distances.get(attribute_name))

        # `label` is always the last column
        data.append(label_str)

        self.case_metrics.append(data)
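
    # Example of one appended row (hypothetical values, again assuming
    # additional_attributes=["cost"]):
    #   [42, datetime(2015, 3, 1, 10, 30), 3, "case_17",
    #    0.08, 0.42, 0.15, "normal"]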

    def save_case_metrics_on_check_point(self) -> None:
        """
        Saves the case metrics into a file according to a set path and name.
        Also clears the case_metrics attribute
        """
        cm_path = f"{self.path_to_case_metrics}/{self.file_name}.csv"
        # DataFrame.append was removed in pandas 2.0; use pd.concat instead
        pd.concat(
            [
                pd.read_csv(cm_path),
                pd.DataFrame(self.case_metrics, columns=self.case_columns),
            ]
        ).to_csv(cm_path, index=False)
        self.case_metrics.clear()

    def compute_cluster_metrics_helper(
        self,
        event_index: int,
        timestamp: datetime,
        cp_count: int,
        cluster: Cluster,
        cluster_type: str,
    ) -> None:
        """
        Helper function to save metrics into cluster_metrics attribute.
        """
        data = [
            event_index,
            timestamp,
            cp_count,
            cluster.id,
        ]

        # one coordinate column per centroid dimension, in column order
        data.extend(cluster.centroid)

        data += [
            cluster.radius,
            cluster.weight,
            cluster_type,
        ]

        self.cluster_metrics.append(data)
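
    # Example of one appended row (hypothetical values, again assuming
    # additional_attributes=["cost"], so the centroid has three dimensions):
    #   [42, datetime(2015, 3, 1, 10, 30), 3, 7,
    #    0.31, 0.55, 0.12, 1.8, 4.0, "core micro-cluster"]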

    def compute_cluster_metrics(
        self,
        event_index: int,
        timestamp: datetime,
        cp_count: int,
        normal_clusters: Tuple[List[List[Cluster]], List[List[Cluster]]],
        o_clusters: List[Cluster],
    ) -> None:
        """
        Generates cluster metrics and saves them in the self.cluster_metrics attribute.

        Parameters
        --------------------------------------
        event_index: int
            Index of the current event
        timestamp: datetime
            Current timestamp of the stream
        cp_count: int
            Current check point
        normal_clusters: Tuple[List[List[Cluster]], List[List[Cluster]]]
            Groups of core and potential micro-clusters maintained by DenStream
        o_clusters: List[Cluster]
            Outlier micro-clusters maintained by DenStream
        """
        c_clusters, p_clusters = normal_clusters
        for group in c_clusters:
            for cluster in group:
                self.compute_cluster_metrics_helper(
                    event_index, timestamp, cp_count, cluster, "core micro-cluster"
                )

        for group in p_clusters:
            for cluster in group:
                self.compute_cluster_metrics_helper(
                    event_index, timestamp, cp_count, cluster, "potential micro-cluster"
                )

        for cluster in o_clusters:
            self.compute_cluster_metrics_helper(
                event_index, timestamp, cp_count, cluster, "outlier micro-cluster"
            )

    def save_cluster_metrics_on_check_point(self) -> None:
        """
        Saves the cluster metrics into a file according to a set path and name.
        Also clears the cluster_metrics attribute
        """
        cm_path = f"{self.path_to_cluster_metrics}/{self.file_name}.csv"
        # DataFrame.append was removed in pandas 2.0; use pd.concat instead
        pd.concat(
            [
                pd.read_csv(cm_path),
                pd.DataFrame(self.cluster_metrics, columns=self.cluster_columns),
            ]
        ).to_csv(cm_path, index=False)
        self.cluster_metrics.clear()

    def save_pmg_on_check_point(
        self, process_model_graph: nx.DiGraph, cp_count: int
    ) -> None:
        """
        Saves the Process Model Graph at each check point as a JSON file and as a plot

        Parameters
        --------------------------------------
        process_model_graph: nx.DiGraph
            Process model graph on the current check point
        cp_count: int
            Current check point
        """
        try:
            with open(
                f"{self.path_to_pmg_metrics}/process_model_graph_{cp_count}.json", "w"
            ) as file:
                json.dump(
                    nx.readwrite.json_graph.node_link_data(process_model_graph), file
                )

            save_graphviz(
                process_model_graph,
                f"{self.path_to_pmg_vis}/process_model_graph_{cp_count}",
            )
        except Exception as e:
            print(e)
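

# A minimal end-to-end usage sketch (hypothetical process name and values;
# `case` and `process_model_graph` are assumed to come from the CDESF stream
# processing, which is not shown here):
#
#   from datetime import datetime
#   from cdesf.utils.metrics import Metrics
#
#   metrics = Metrics("sepsis", additional_attributes=["cost"])
#   metrics.compute_case_metrics(0, datetime.now(), 0, case, label=True)
#   metrics.save_case_metrics_on_check_point()  # flushes buffered rows to CSV
#   metrics.save_pmg_on_check_point(process_model_graph, cp_count=0)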