python/whylogs/experimental/api/logger/__init__.py
import logging
import math
from typing import List, Optional, Tuple, Union
from whylogs.api.logger import log
from whylogs.api.logger.result_set import ViewResultSet
from whylogs.core import DatasetSchema
from whylogs.core.stubs import np, pd
diagnostic_logger = logging.getLogger(__name__)
def _convert_to_int_if_bool(data: pd.core.frame.DataFrame, *columns: str) -> pd.core.frame.DataFrame:
for col in columns:
if all(isinstance(x, bool) for x in data[col]):
data[col] = data[col].apply(lambda x: 1 if x else 0)
return data
class RowWiseMetrics:
def __init__(
self,
target_column: str,
prediction_column: str,
convert_non_numeric: bool = False,
):
self.target_column = target_column
self.prediction_column = prediction_column
self.convert_non_numeric = convert_non_numeric
def relevant_counter(self, row: pd.core.series.Series, k: int) -> int:
if self.convert_non_numeric:
return sum(
[1 if pred_val in row[self.target_column] else 0 for pred_val in row[self.prediction_column][:k]]
)
else:
paired_sorted = sorted(zip(row[self.prediction_column], row[self.target_column]))
sorted_predictions, sorted_targets = zip(*paired_sorted)
sorted_predictions, sorted_targets = list(sorted_predictions), list(sorted_targets)
return sum([1 if target_val else 0 for target_val in sorted_targets[:k]])
def sum_gains(self, row: pd.core.series.Series, k: int) -> int:
if self.convert_non_numeric:
return sum(
[1 if pred_val in row[self.target_column] else 0 for pred_val in row[self.prediction_column][:k]]
)
else:
paired_sorted = sorted(zip(row[self.prediction_column], row[self.target_column]))
sorted_predictions, sorted_targets = zip(*paired_sorted)
sorted_predictions, sorted_targets = list(sorted_predictions), list(sorted_targets)
return sum([target_val if target_val else 0 for target_val in sorted_targets[:k]])
def is_k_item_relevant(self, row: pd.core.series.Series, k: int) -> int:
if self.convert_non_numeric:
return 1 if row[self.prediction_column][k - 1] in row[self.target_column] else 0
else:
index_ki = row[self.prediction_column].index(k)
return 1 if row[self.target_column][index_ki] else 0
def get_top_rank(self, row: pd.core.series.Series, k: int) -> Optional[int]:
for ki in range(1, k + 1):
if self.is_k_item_relevant(row, ki):
return ki
return None
def calc_non_numeric_relevance(self, row_dict: pd.core.series.Series) -> Tuple[List[int], List[int]]:
prediction_relevance = []
ideal_relevance = []
for target_val in row_dict[self.prediction_column]:
ideal_relevance.append(1 if target_val in row_dict[self.target_column] else 0)
prediction_relevance.append(1 if target_val in row_dict[self.target_column] else 0)
for target_val in row_dict[self.target_column]:
if target_val not in row_dict[self.prediction_column]:
ideal_relevance.append(1)
return (prediction_relevance, sorted(ideal_relevance, reverse=True))
def calculate_row_ndcg(self, row_dict: pd.core.series.Series, k: int) -> float:
if not self.convert_non_numeric:
dcg_vals = [
rel / math.log2(pos + 1)
for rel, pos in zip(row_dict[self.target_column], row_dict[self.prediction_column])
if pos <= k
]
idcg_vals = [
rel / math.log2(pos + 2)
for pos, rel in enumerate(sorted(row_dict[self.target_column], reverse=True)[:k])
]
else:
predicted_relevances, ideal_relevances = self.calc_non_numeric_relevance(row_dict)
dcg_vals = [(rel / math.log(i + 2, 2)) for i, rel in enumerate(predicted_relevances[:k])]
idcg_vals = [(rel / math.log(i + 2, 2)) for i, rel in enumerate(ideal_relevances[:k])]
if sum(idcg_vals) == 0:
return 1 # if there is no relevant data, not much the recommender can do
return sum(dcg_vals) / sum(idcg_vals)
def _calculate_average_precisions(
formatted_data: pd.core.frame.DataFrame,
target_column: str,
prediction_column: str,
convert_non_numeric: bool,
k: int,
) -> np.ndarray:
ki_dict: pd.DataFrame = None
last_item_relevant_dict: pd.DataFrame = None
row_metrics_functions = RowWiseMetrics(target_column, prediction_column, convert_non_numeric)
for ki in range(1, k + 1):
ki_result = (
formatted_data.apply(
row_metrics_functions.relevant_counter,
args=(ki,),
axis=1,
)
/ ki
)
last_item_result = formatted_data.apply(row_metrics_functions.is_k_item_relevant, args=(ki,), axis=1)
if ki == 1:
ki_dict = ki_result.to_frame()
ki_dict.columns = ["p@" + str(ki)]
last_item_relevant_dict = last_item_result.to_frame()
last_item_relevant_dict.columns = ["last_item_relevant@" + str(ki)]
else:
ki_dict["p@" + str(ki)] = ki_result
last_item_relevant_dict["last_item_relevant@" + str(ki)] = last_item_result
aps = np.multiply(ki_dict.values, last_item_relevant_dict.values)
nonzero_counts = np.count_nonzero(aps, axis=1)
nonzero_counts[nonzero_counts == 0] = 1
row_sums = aps.sum(axis=1)
averages = row_sums / nonzero_counts
return averages
def log_batch_ranking_metrics(
data: pd.core.frame.DataFrame,
prediction_column: Optional[str] = None,
target_column: Optional[str] = None,
score_column: Optional[str] = None,
k: Optional[int] = None,
convert_non_numeric=False,
schema: Union[DatasetSchema, None] = None,
log_full_data: bool = False,
) -> ViewResultSet:
"""Log ranking metrics for a batch of data.
Parameters
----------
data : pd.core.frame.DataFrame
Dataframe with the data to log.
prediction_column : Optional[str], optional
Column name for the predicted values. If not provided, the score_column and target_column must be provided, by default None
target_column : Optional[str], optional
Column name for the relevance scores. If not provided, relevance must be encoded within prediction column, by default None
score_column : Optional[str], optional
Column name for the scores. Can either be probabilities, confidence values, or other continuous measures.
If not passed, prediction_column must be passed,by default None
k : Optional[int], optional
Consider the top k ranks for metrics calculation.
If `None`, use all outputs, by default None
convert_non_numeric : bool, optional
Indicates whether prediction/target columns are non-numeric.
If True, prediction/target should be strings, by default False
schema : Union[DatasetSchema, None], optional
Defines the schema for tracking metrics in whylogs, by default None
log_full_data : bool, optional
Whether to log the complete dataframe or not.
If True, the complete DF will be logged in addition to the ranking metrics.
If False, only the calculated ranking metrics will be logged.
In a typical production use case, the ground truth might not be available
at the time the remaining data is generated. In order to prevent double profiling the
input features, consider leaving this as False. by default False
Returns
-------
ViewResultSet
Examples
--------
::
import pandas as pd
from whylogs.experimental.api.logger import log_batch_ranking_metrics
# 1st and 2nd recommended items are relevant - 3rd is not
df = pd.DataFrame({"targets": [[1, 0, 1]], "predictions": [[2,3,1]]})
results = log_batch_ranking_metrics(
data=df,
prediction_column="predictions",
target_column="targets",
k=3,
)
::
non_numerical_df = pd.DataFrame(
{
"raw_predictions": [
["cat", "pig", "elephant"],
["horse", "donkey", "robin"],
],
"raw_targets": [
["cat", "elephant"],
["dog"],
],
}
)
# 1st query:
# Recommended items: [cat, pig, elephant]
# Relevant items: [cat, elephant]
# 2nd query:
# Recommended items: [horse, donkey, robin]
# Relevant items: [dog]
results = log_batch_ranking_metrics(
k=2,
data=non_numerical_df,
prediction_column="raw_predictions",
target_column="raw_targets",
convert_non_numeric=True
)
::
binary_single_df = pd.DataFrame(
{
"raw_predictions": [
[True, False, True], # First recommended item: Relevant, Second: Not relevant, Third: Relevant
[False, False, False], # None of the recommended items are relevant
[True, True, False], # First and second recommended items are relevant
]
}
)
result = log_batch_ranking_metrics(data=binary_single_df, prediction_column="raw_predictions", k=3)
"""
formatted_data = data.copy(deep=True) # TODO: does this have to be deep?
if prediction_column is None:
if score_column is not None and target_column is not None:
prediction_column = "__predictions"
# Ties are not being handled here
formatted_data[prediction_column] = formatted_data[score_column].apply(
lambda row: list(np.argsort(np.argsort(-np.array(row))) + 1)
)
else:
raise ValueError("Either prediction_column or score+target columns must be specified")
relevant_cols = [prediction_column]
if target_column is None:
formatted_data = _convert_to_int_if_bool(formatted_data, prediction_column)
target_column = "__targets"
# the relevances in predicitons are moved to targets, and predicitons contains the indices to the target list
formatted_data[target_column] = formatted_data[prediction_column]
formatted_data[prediction_column] = formatted_data[target_column].apply(
lambda row: list(range(1, len(row) + 1))
)
relevant_cols.append(target_column)
if score_column is not None:
relevant_cols.append(score_column)
for col in relevant_cols:
if not formatted_data[col].apply(lambda x: type(x) == list).all():
# wrapping in lists because at least one isn't a list
# TODO: more error checking
formatted_data[col] = formatted_data[col].apply(lambda x: [x])
_max_k = formatted_data[prediction_column].apply(len).max()
if not k:
k = _max_k
if k > _max_k:
diagnostic_logger.warning(
f"Max value of k in the dataset is {_max_k}, but k was set to {k}. Setting k to {_max_k}"
)
k = _max_k
if k and k < 1:
raise ValueError("k must be a positive integer")
row_wise_functions = RowWiseMetrics(target_column, prediction_column, convert_non_numeric)
formatted_data["count_at_k"] = formatted_data.apply(row_wise_functions.relevant_counter, args=(k,), axis=1)
formatted_data["count_all"] = formatted_data.apply(row_wise_functions.relevant_counter, args=(_max_k,), axis=1)
formatted_data["top_rank"] = formatted_data[relevant_cols].apply(
row_wise_functions.get_top_rank, args=(_max_k,), axis=1
)
output_data = pd.DataFrame()
output_data[f"recall_k_{k}"] = formatted_data["count_at_k"] / formatted_data["count_all"]
output_data[f"precision_k_{k}"] = formatted_data["count_at_k"] / (k if k else 1)
output_data["top_rank"] = formatted_data["top_rank"]
output_data["average_precision" + ("_k_" + str(k) if k else "")] = _calculate_average_precisions(
formatted_data, target_column, prediction_column, convert_non_numeric=convert_non_numeric, k=k # type: ignore
)
output_data["norm_dis_cumul_gain" + ("_k_" + str(k) if k else "")] = formatted_data.apply(
row_wise_functions.calculate_row_ndcg, args=(k,), axis=1
)
output_data[f"sum_gain_k_{k}"] = formatted_data.apply(row_wise_functions.sum_gains, args=(k,), axis=1)
hit_ratio = formatted_data["count_at_k"].apply(lambda x: bool(x)).sum() / len(formatted_data)
mrr = (1 / formatted_data["top_rank"]).replace([np.inf, np.nan], 0)
output_data["reciprocal_rank"] = mrr
result = log(pandas=output_data, schema=schema)
result = result.merge(
log(
row={
"accuracy" + ("_k_" + str(k) if k else ""): hit_ratio,
},
schema=schema,
)
)
if log_full_data:
result = result.merge(log(pandas=data, schema=schema))
return result