python/whylogs/core/metrics/compound_metric.py
from abc import ABC
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type, TypeVar
from whylogs.core.configs import SummaryConfig
from whylogs.core.errors import DeserializationError, UnsupportedError
from whylogs.core.metrics import Metric
from whylogs.core.metrics.metrics import _METRIC_DESERIALIZER_REGISTRY, OperationResult
from whylogs.core.preprocessing import PreprocessedColumn
from whylogs.core.proto import MetricComponentMessage, MetricMessage
COMPOUND_METRIC = TypeVar("COMPOUND_METRIC", bound="CompoundMetric")
class CompoundMetric(Metric, ABC):
"""
CompoundMetric serves as a base class for custom metrics that consist
of one or more metrics. It is handy when you need to do some
processing of the logged values and track serveral metrics on
the results. The sub-metrics must either be a StandardMetric, or tagged
as a @custom_metric or registered via register_metric(). Note that
CompoundMetric is neither, so it cannot be nested.
Typically you will need to override namespace(); columnar_update(), calling
it on the submetrics as needed; and the zero() method to return an
appropriate "empty" instance of your metric. You will need to override from_protobuf()
and merge() if your subclass __init__() method takes arguments different than
CompoundMetrtic's. You can use the submetrics_from_protbuf() and merge_submetrics()
helper methods to implement them. The CompoundMetric class will handle the rest of
the Metric interface. Don't use / or : in the subclass' namespace.
See UnicodeRangeMetric for an example.
"""
submetrics: Dict[str, Metric]
def __init__(self, submetrics: Dict[str, Metric]):
"""
Parameters
----------
submetrics : Dict[str, Metric]
The collection of metrics that comprise the CompoundMetric.
The key servers as the name of the sub-metric. E.g., the
metric summary entries will have keys of the form
"<submetric name>/<component name>"
Submetric names should only contain alphanumeric characters,
hyphens, and underscores.
"""
if ":" in self.namespace or "/" in self.namespace:
raise ValueError(f"Invalid namespace {self.namespace}")
for sub_name, submetric in submetrics.items():
if ":" in sub_name or "/" in sub_name:
raise ValueError(f"Invalid submetric name {sub_name}")
self.submetrics = submetrics.copy()
def merge_submetrics(self: COMPOUND_METRIC, other: COMPOUND_METRIC) -> Dict[str, Metric]:
if self.namespace != other.namespace:
raise ValueError(f"Attempt to merge CompoundMetrics {self.namespace} and {other.namespace}")
submetric_names = set(self.submetrics.keys())
submetric_names.update(other.submetrics.keys())
submetrics: Dict[str, Metric] = dict()
for submetric_name in submetric_names:
if submetric_name in self.submetrics and submetric_name in other.submetrics:
if self.submetrics[submetric_name].namespace != other.submetrics[submetric_name].namespace:
raise ValueError("Attempt to merge CompoundMetrics with incompatible submetric types")
submetrics[submetric_name] = self.submetrics[submetric_name] + other.submetrics[submetric_name]
elif submetric_name in self.submetrics:
submetrics[submetric_name] = deepcopy(self.submetrics[submetric_name])
else:
submetrics[submetric_name] = deepcopy(other.submetrics[submetric_name])
return submetrics
def merge(self: COMPOUND_METRIC, other: COMPOUND_METRIC) -> COMPOUND_METRIC:
return self.__class__(self.merge_submetrics(other))
def to_protobuf(self) -> MetricMessage:
msg = {}
for sub_name, submetric in self.submetrics.items():
sub_msg = submetric.to_protobuf()
for comp_name, comp_msg in sub_msg.metric_components.items():
msg[sub_name + ":" + submetric.namespace + "/" + comp_name] = comp_msg
return MetricMessage(metric_components=msg)
def get_component_paths(self) -> List[str]:
res = []
for sub_name, submetric in self.submetrics.items():
for comp_name in submetric.get_component_paths():
res.append(sub_name + ":" + submetric.namespace + "/" + comp_name)
return res
def to_summary_dict(self, cfg: Optional[SummaryConfig] = None) -> Dict[str, Any]:
cfg = cfg or SummaryConfig()
summary = {}
for sub_name, submetric in self.submetrics.items():
sub_summary = submetric.to_summary_dict(cfg)
for comp_name, comp_msg in sub_summary.items():
summary["/".join([sub_name, comp_name])] = comp_msg
return summary
def columnar_update(self, view: PreprocessedColumn) -> OperationResult:
result = OperationResult()
for submetric in self.submetrics.values():
result = result + submetric.columnar_update(view)
return result
@classmethod
def submetrics_from_protobuf(cls: Type[COMPOUND_METRIC], msg: MetricMessage) -> Dict[str, Metric]:
submetrics: Dict[str, Metric] = {}
submetric_msgs: Dict[str, Dict[str, MetricComponentMessage]] = {}
for key, comp_msg in msg.metric_components.items():
submetric_name, comp_name = key.split("/")
if submetric_msgs.get(submetric_name) is None:
submetric_msgs[submetric_name] = {}
submetric_msgs[submetric_name][comp_name] = comp_msg
for m_name_and_type, metric_components in submetric_msgs.items():
m_name, m_type = m_name_and_type.split(":")
if m_type in _METRIC_DESERIALIZER_REGISTRY:
metric_class = _METRIC_DESERIALIZER_REGISTRY[m_type]
else:
raise UnsupportedError(f"Unsupported metric: {m_type}")
m_msg = MetricMessage(metric_components=metric_components)
try:
submetrics[m_name] = metric_class.from_protobuf(m_msg)
except: # noqa
raise DeserializationError(f"Failed to deserialize metric: {m_name}")
return submetrics
@classmethod
def from_protobuf(cls: Type[COMPOUND_METRIC], msg: MetricMessage) -> COMPOUND_METRIC:
return cls(cls.submetrics_from_protobuf(msg))