src/triage/component/catwalk/evaluation.py

import functools
import itertools
import logging
import math
import statistics
import typing
from collections import defaultdict

import numpy as np
import ohio.ext.pandas  # noqa: F401  (registers pandas.DataFrame.pg_copy_from, used below)
import pandas as pd
import verboselogs
from sqlalchemy.orm import sessionmaker

logger = verboselogs.VerboseLogger(__name__)

from aequitas.bias import Bias
from aequitas.fairness import Fairness
from aequitas.group import Group
from aequitas.preprocessing import preprocess_input_df

from . import metrics
from .metrics import UnknownMetricError
from .utils import (
    db_retry,
    sort_predictions_and_labels,
    get_subset_table_name,
    filename_friendly_hash,
)
from triage.util.db import scoped_session
from triage.util.random import generate_python_random_seed
from triage.component.catwalk.storage import MatrixStore

RELATIVE_TOLERANCE = 0.01
SORT_TRIALS = 30
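# NB: RELATIVE_TOLERANCE is the threshold (via math.isclose) for deciding whether a
# metric's worst-case and best-case tiebreaking values are close enough to skip the
# SORT_TRIALS random tiebreaking sorts in ModelEvaluator.evaluate below.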


def subset_labels_and_predictions(
    subset_df,
    labels,
    predictions_proba,
    protected_df=None,
):
    """Reduce the labels and predictions to only those relevant to the current
       subset.

    Args:
        subset_df (pandas.DataFrame) A dataframe whose index is the entity_ids
            and as_of_dates in a subset
        labels (pandas.Series) A series of labels with entity_id and as_of_date
            as the index
        predictions_proba (np.array) An array of predictions for the same
            entity_date pairs as the labels and in the same order
        protected_df (pandas.DataFrame) A dataframe of protected group attributes

    Returns: (pandas.Series, np.array, pandas.DataFrame) The labels, predictions, and protected
        group attributes that refer to entity-date pairs in the subset
    """
    indexed_predictions = pd.Series(predictions_proba, index=labels.index)
    if protected_df is None:
        protected_df = pd.DataFrame()

    # The subset isn't specific to the cohort, so inner join to the labels/predictions
    labels_subset = labels.align(subset_df, join="inner")[0]
    predictions_subset = indexed_predictions.align(subset_df, join="inner")[0].values
    protected_df_subset = (
        protected_df
        if protected_df.empty
        else protected_df.align(subset_df, join="inner")[0]
    )
    logger.spam(
        f"{len(labels_subset)} entities in subset out of {len(labels)} in matrix.",
    )

    return (labels_subset, predictions_subset, protected_df_subset)
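
# Illustrative sketch (hypothetical data) of how the alignment above behaves: given
# labels indexed by (entity_id, as_of_date) and a subset_df covering only some of
# those pairs, the inner join keeps just the overlapping rows.
#
#   idx = pd.MultiIndex.from_tuples(
#       [(1, "2016-01-01"), (2, "2016-01-01"), (3, "2016-01-01")],
#       names=["entity_id", "as_of_date"],
#   )
#   labels = pd.Series([0, 1, 0], index=idx)
#   subset_df = pd.DataFrame({"active": [1]}, index=idx[:1])
#   subset_labels_and_predictions(subset_df, labels, np.array([0.9, 0.5, 0.1]))
#   # -> labels and predictions restricted to entity_id 1 only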


def query_subset_table(db_engine, as_of_dates, subset_table_name):
    """Queries the subset table to find the entities active at the given
       as_of_dates

    Args:
        db_engine (sqlalchemy.engine) a database engine
        as_of_dates (list) the as_of_dates to query
        subset_table_name (str) the name of the table to query

    Returns: (pandas.DataFrame) a dataframe indexed by the entity-date pairs
        active in the subset
    """
    as_of_dates_sql = "[{}]".format(
        ", ".join(
            "'{}'".format(date.strftime("%Y-%m-%d %H:%M:%S.%f")) for date in as_of_dates
        )
    )
    query_string = f"""
        with dates as (
            select unnest(array{as_of_dates_sql}::timestamp[]) as as_of_date
        )
        select entity_id, as_of_date, active
        from {subset_table_name}
        join dates using(as_of_date)
    """
    df = pd.DataFrame.pg_copy_from(
        query_string,
        connectable=db_engine,
        parse_dates=["as_of_date"],
        index_col=MatrixStore.indices,
    )
    return df
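
# For reference, with two example as_of_dates the query built above expands to roughly:
#
#   with dates as (
#       select unnest(array['2016-01-01 00:00:00.000000', '2016-04-01 00:00:00.000000']::timestamp[]) as as_of_date
#   )
#   select entity_id, as_of_date, active
#   from {subset_table_name}
#   join dates using(as_of_date)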


def generate_binary_at_x(test_predictions, x_value, unit="top_n"):
    """Assign predicted classes based based on top% or absolute rank of score

    Args:
        test_predictions (np.array) An array of predictions, sorted by risk score descending
        x_value (int) The percentile or absolute value desired
        unit (string, default 'top_n') The thresholding method desired,
            either percentile or top_n

    Returns: (np.array) The predicted classes
    """
    len_predictions = len(test_predictions)
    if len_predictions == 0:
        return np.array([])
    if unit == "percentile":
        cutoff_index = int(len_predictions * (x_value / 100.00))
    else:
        cutoff_index = int(x_value)
    num_ones = cutoff_index if cutoff_index <= len_predictions else len_predictions
    num_zeroes = (
        len_predictions - cutoff_index if cutoff_index <= len_predictions else 0
    )
    test_predictions_binary = np.concatenate(
        (np.ones(num_ones, np.int8), np.zeros(num_zeroes, np.int8))
    )
    return test_predictions_binary
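
# Illustrative examples (made-up scores, already sorted descending):
#
#   >>> generate_binary_at_x(np.array([0.9, 0.7, 0.4, 0.1]), 2, unit="top_n")
#   array([1, 1, 0, 0], dtype=int8)
#   >>> generate_binary_at_x(np.array([0.9, 0.7, 0.4, 0.1]), 50, unit="percentile")
#   array([1, 1, 0, 0], dtype=int8)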


class MetricDefinition(typing.NamedTuple):
    """A single metric, bound to a particular threshold and parameter combination"""

    metric: str
    threshold_unit: str
    threshold_value: int
    parameter_combination: dict
    parameter_string: str


class MetricEvaluationResult(typing.NamedTuple):
    """A metric and parameter combination alongside preliminary results.

    The 'value' could represent the worst, best, or a random version of tiebreaking.
    """

    metric: str
    parameter: str
    value: float
    num_labeled_examples: int
    num_labeled_above_threshold: int
    num_positive_labels: int


class ModelEvaluator:
    """An object that can score models based on its known metrics"""

    # Available metric calculation functions

    # Each value is expected to be a function that takes in the following params
    # (predictions_proba, predictions_binary, labels, parameters)
    # and returns a numeric score
    available_metrics = {
        "precision@": metrics.precision,
        "recall@": metrics.recall,
        "fbeta@": metrics.fbeta,
        "f1": metrics.f1,
        "accuracy": metrics.accuracy,
        "roc_auc": metrics.roc_auc,
        "average precision score": metrics.avg_precision,
        "true positives@": metrics.true_positives,
        "true negatives@": metrics.true_negatives,
        "false positives@": metrics.false_positives,
        "false negatives@": metrics.false_negatives,
        "fpr@": metrics.fpr,
    }

    def __init__(
        self,
        testing_metric_groups,
        training_metric_groups,
        db_engine,
        custom_metrics=None,
        bias_config=None,
    ):
        """
        Args:
            testing_metric_groups (list) A list of groups of metric/configurations
                to use for evaluating all given models

                Each entry is a dict, with a list of metrics, and potentially
                    thresholds and parameter lists. Each metric is expected to
                    be a key in self.available_metrics

                Examples:

                testing_metric_groups = [{
                    'metrics': ['precision@', 'recall@'],
                    'thresholds': {
                        'percentiles': [5.0, 10.0],
                        'top_n': [5, 10]
                    }
                }, {
                    'metrics': ['f1'],
                }, {
                    'metrics': ['fbeta@'],
                    'parameters': [{'beta': 0.75}, {'beta': 1.25}]
                }]
            training_metric_groups (list) metrics to be calculated on training set,
                in the same form as testing_metric_groups
            db_engine (sqlalchemy.engine)
            custom_metrics (dict) Functions to generate metrics
                not available by default
                Each function is expected to take in the following params:
                (predictions_proba, predictions_binary, labels, parameters)
                and return a numeric score
            bias_config (dict) Optional configuration for the Aequitas bias audit:
                score thresholds, reference group method, and (if predefined)
                the reference groups themselves
        """
        self.testing_metric_groups = testing_metric_groups
        self.training_metric_groups = training_metric_groups
        self.db_engine = db_engine
        self.bias_config = bias_config
        if custom_metrics:
            self._validate_metrics(custom_metrics)
            # copy before updating so custom metrics don't mutate the shared class-level dict
            self.available_metrics = dict(self.available_metrics, **custom_metrics)
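
    # Illustrative (hypothetical) bias_config, based on the keys read in
    # _write_audit_to_db below; attribute names and values depend on protected_df:
    #
    #   bias_config = {
    #       "thresholds": {"top_n": [100], "percentiles": [5.0]},
    #       "ref_groups_method": "predefined",  # or "majority"; anything else falls back to min_metric
    #       "ref_groups": {"race": "white", "sex": "male"},
    #   }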

    @property
    def sessionmaker(self):
        return sessionmaker(bind=self.db_engine)

    def _validate_metrics(self, custom_metrics):
        for name, met in custom_metrics.items():
            if not hasattr(met, "greater_is_better"):
                raise ValueError(
                    f"Custom metric {name} missing greater_is_better " f"attribute"
                )
            elif met.greater_is_better not in (True, False):
                raise ValueError(
                    f"For custom metric {name} greater_is_better must be "
                    "boolean True or False"
                )
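
    # A minimal sketch of a custom metric (hypothetical name), satisfying the contract
    # checked above: the (predictions_proba, predictions_binary, labels, parameters)
    # signature plus a boolean greater_is_better attribute.
    #
    #   def positive_rate(predictions_proba, predictions_binary, labels, parameters):
    #       return float(np.mean(predictions_binary))
    #   positive_rate.greater_is_better = True
    #
    #   ModelEvaluator(..., custom_metrics={"positive_rate@": positive_rate})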

    def _build_parameter_string(
        self,
        threshold_unit,
        threshold_value,
        parameter_combination,
        threshold_specified_by_user,
    ):
        """Encode the metric parameters and threshold into a short, human-parseable string

        Examples are: '100_abs', '5_pct'

        Args:
            threshold_unit (string) the type of threshold, either 'percentile' or 'top_n'
            threshold_value (int) the numeric threshold
            parameter_combination (dict) The non-threshold parameter keys and values used
                Usually this will be empty, but an example would be {'beta': 0.25}
            threshold_specified_by_user (bool) Whether the threshold came from user
                configuration; if False, no threshold component is added to the string

        Returns: (string) A short, human-parseable string
        """
        full_params = parameter_combination.copy()
        if threshold_specified_by_user:
            short_threshold_unit = "pct" if threshold_unit == "percentile" else "abs"
            full_params[short_threshold_unit] = threshold_value
        parameter_string = "/".join(
            ["{}_{}".format(val, key) for key, val in full_params.items()]
        )
        return parameter_string
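
    # Examples of the strings produced above (derived from the code):
    #   threshold_unit='top_n', threshold_value=100, {}            -> '100_abs'
    #   threshold_unit='percentile', threshold_value=5, {}         -> '5_pct'
    #   threshold_unit='top_n', threshold_value=10, {'beta': 0.75} -> '0.75_beta/10_abs'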

    def _filter_nan_labels(self, predicted_classes: np.array, labels: np.array):
        """Filter missing labels and their corresponding predictions

        Args:
            predicted_classes (np.array) Predicted binary classes, of same length as labels
            labels (np.array) Labels, possibly containing NaNs

        Returns: (tuple) Copies of both arrays, with rows whose label is NaN removed
        """
        nan_mask = np.isfinite(labels)
        return (predicted_classes[nan_mask], labels[nan_mask])
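
    # Illustrative example (made-up values):
    #   _filter_nan_labels(np.array([1, 0, 1]), np.array([1.0, np.nan, 0.0]))
    #   -> (array([1, 1]), array([1., 0.]))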

    def _flatten_metric_threshold(
        self,
        metrics,
        parameters,
        threshold_unit,
        threshold_value,
        threshold_specified_by_user=True,
    ):
        """Flatten lists of metrics and parameters for an individual threshold
        into individual metric definitions.

        Args:
            metrics (list) names of metrics to compute
            parameters (list) dicts holding parameters to pass to metrics
            threshold_unit (string) the type of threshold, either 'percentile' or 'top_n'
            threshold_value (int) the numeric threshold
            threshold_specified_by_user (bool) Whether or not there was any threshold
                specified by the user. Defaults to True

        Returns: (list) MetricDefinition objects
        Raises: UnknownMetricError if a given metric is not present in
            self.available_metrics
        """

        metric_definitions = []
        for metric in metrics:
            if metric not in self.available_metrics:
                # NB: the `metrics` argument shadows the metrics module here,
                # so the exception class is imported at module level
                raise UnknownMetricError(f"Unknown metric: {metric}")

            for parameter_combination in parameters:
                # convert the thresholds/parameters into something
                # more readable
                parameter_string = self._build_parameter_string(
                    threshold_unit=threshold_unit,
                    threshold_value=threshold_value,
                    parameter_combination=parameter_combination,
                    threshold_specified_by_user=threshold_specified_by_user,
                )

                result = MetricDefinition(
                    metric=metric,
                    parameter_string=parameter_string,
                    parameter_combination=parameter_combination,
                    threshold_unit=threshold_unit,
                    threshold_value=threshold_value,
                )
                metric_definitions.append(result)
        return metric_definitions

    def _flatten_metric_config_group(self, group):
        """Flatten lists of metrics, parameters, and thresholds into individual metric definitions

        Args:
            group (dict) A configuration dictionary for the group.
                Should contain the key 'metrics', and optionally 'parameters' or 'thresholds'
        Returns: (list) MetricDefinition objects
        """
        logger.debug(f"Creating evaluations for metric group {group}")
        parameters = group.get("parameters", [{}])
        generate_metrics = functools.partial(
            self._flatten_metric_threshold,
            metrics=group["metrics"],
            parameters=parameters,
        )
        metrics = []
        if "thresholds" not in group:
            logger.notice(
                "Not a thresholded group, generating evaluation based on all predictions"
            )
            metrics = metrics + generate_metrics(
                threshold_unit="percentile",
                threshold_value=100,
                threshold_specified_by_user=False,
            )

        for pct_thresh in group.get("thresholds", {}).get("percentiles", []):
            logger.debug(f"Processing percent threshold {pct_thresh}")
            metrics = metrics + generate_metrics(
                threshold_unit="percentile", threshold_value=pct_thresh
            )

        for abs_thresh in group.get("thresholds", {}).get("top_n", []):
            logger.debug(f"Processing absolute threshold {abs_thresh}")
            metrics = metrics + generate_metrics(
                threshold_unit="top_n", threshold_value=abs_thresh
            )
        return metrics
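
    # Illustrative expansion: a group such as
    #   {'metrics': ['precision@'], 'thresholds': {'percentiles': [5.0], 'top_n': [10]}}
    # yields two MetricDefinitions, with parameter strings '5.0_pct' and '10_abs'.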

    def _flatten_metric_config_groups(self, metric_config_groups):
        """Flatten lists of metrics, parameters, and thresholds into individual metric definitions

        Args:
            metric_config_groups (list) A list of metric group configuration dictionaries
                Each dict should contain the key 'metrics', and optionally 'parameters' or 'thresholds'
        Returns:
            (list) MetricDefinition objects
        """
        return [
            item
            for group in metric_config_groups
            for item in self._flatten_metric_config_group(group)
        ]

    def metric_definitions_from_matrix_type(self, matrix_type):
        """Retrieve the correct metric config groups for the matrix type and flatten them into metric definitions

        Args:
            matrix_type (catwalk.storage.MatrixType) A matrix type definition

        Returns:
            (list) MetricDefinition objects
        """
        if matrix_type.is_test:
            return self._flatten_metric_config_groups(self.testing_metric_groups)
        else:
            return self._flatten_metric_config_groups(self.training_metric_groups)

    def needs_evaluations(self, matrix_store, model_id, subset_hash=""):
        """Returns whether or not all the configured metrics are present in the
        database for the given matrix and model.
        Args:
            matrix_store (triage.component.catwalk.storage.MatrixStore)
            model_id (int) A model id
            subset_hash (str) An identifier for the subset to be evaluated

        Returns:
            (bool) whether or not this matrix and model are missing any evaluations in the db
        """

        # assemble a list of evaluation objects from the config
        # by running the evaluation code with an empty list of predictions and labels
        eval_obj = matrix_store.matrix_type.evaluation_obj
        matrix_type = matrix_store.matrix_type
        metric_definitions = self.metric_definitions_from_matrix_type(matrix_type)

        # assemble a list of evaluation objects from the database
        # by querying the unique metrics and parameters relevant to the passed-in matrix
        session = self.sessionmaker()

        evaluation_objects_in_db = (
            session.query(eval_obj)
            .filter_by(
                model_id=model_id,
                evaluation_start_time=matrix_store.as_of_dates[0],
                evaluation_end_time=matrix_store.as_of_dates[-1],
                as_of_date_frequency=matrix_store.metadata["as_of_date_frequency"],
                subset_hash=subset_hash,
            )
            .distinct(eval_obj.metric, eval_obj.parameter)
            .all()
        )

        # The list of needed metrics and parameters are all the unique metric/params from the config
        # not present in the unique metric/params from the db

        evals_needed = bool(
            {(met.metric, met.parameter_string) for met in metric_definitions}
            - {(obj.metric, obj.parameter) for obj in evaluation_objects_in_db}
        )
        session.close()
        if evals_needed:
            logger.notice(
                f"Needed evaluations for model {model_id} on matrix {matrix_store.uuid} are missing"
            )
            return True

        # now check bias config if there
        # if no bias config, no aequitas audits are needed, so just return False at this point
        if not self.bias_config:
            logger.notice("No aequitas audit configured, so no further evaluations needed")
            return False

        # if we do have bias config, return True. Aequitas doesn't currently expose
        # enough to cheaply check whether all the needed audit records are already present.
        return True

    def _compute_evaluations(self, predictions_proba, labels, metric_definitions):
        """Compute evaluations for a set of predictions and labels

        Args:
            predictions_proba (np.array) predictions, sorted by score descending
            labels (np.array) labels, sorted however the caller wishes to break ties
            metric_definitions (list of MetricDefinition objects) metrics to compute

        Returns: (list of MetricEvaluationResult objects) One result for each metric definition
        """
        evals = []
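        # itertools.groupby only merges *consecutive* definitions sharing a threshold,
        # which matches the order produced by _flatten_metric_threshold; a non-adjacent
        # repeat of a threshold would simply be binarized again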
        for (
            (threshold_unit, threshold_value),
            metrics_for_threshold,
        ) in itertools.groupby(
            metric_definitions, lambda m: (m.threshold_unit, m.threshold_value)
        ):
            predicted_classes = generate_binary_at_x(
                predictions_proba, threshold_value, unit=threshold_unit
            )
            # filter out null labels
            predicted_classes_with_labels, present_labels = self._filter_nan_labels(
                predicted_classes, labels
            )
            num_labeled_examples = len(present_labels)
            num_labeled_above_threshold = np.count_nonzero(
                predicted_classes_with_labels
            )
            num_positive_labels = np.count_nonzero(present_labels)
            for metric_def in metrics_for_threshold:
                # the binary predictions for this threshold were computed above;
                # with no predictions at all (empty subset), record a NULL value
                if len(predictions_proba) == 0:
                    logger.warning(
                        f"{metric_def.metric} not defined for parameter {metric_def.parameter_combination} because no entities "
                        "are in the subset for this matrix. Inserting NULL for value."
                    )
                    value = None
                else:
                    try:
                        value = self.available_metrics[metric_def.metric](
                            predictions_proba,
                            predicted_classes_with_labels,
                            present_labels,
                            metric_def.parameter_combination,
                        )

                    except ValueError:
                        logger.warning(
                            f"{metric_def.metric} not defined for parameter {metric_def.parameter_combination} because all labels "
                            "are the same. Inserting NULL for value."
                        )
                        value = None

                result = MetricEvaluationResult(
                    metric=metric_def.metric,
                    parameter=metric_def.parameter_string,
                    value=value,
                    num_labeled_examples=num_labeled_examples,
                    num_labeled_above_threshold=num_labeled_above_threshold,
                    num_positive_labels=num_positive_labels,
                )
                evals.append(result)
        return evals

    def evaluate(
        self, predictions_proba, matrix_store, model_id, protected_df=None, subset=None
    ):
        """Evaluate a model based on predictions, and save the results

        Args:
            predictions_proba (np.array) List of prediction probabilities
            matrix_store (catwalk.storage.MatrixStore) a wrapper for the
                prediction matrix and metadata
            model_id (int) The database identifier of the model
            protected_df (pandas.DataFrame) A dataframe with protected group attributes
            subset (dict) A dictionary containing a query and a
                name for the subset to evaluate on, if any
        """
        # If we are evaluating on a subset, we want to get just the labels and
        # predictions for the included entity-date pairs
        if subset:
            logger.verbose(
                f"Subsetting labels and predictions of model {model_id} on matrix {matrix_store.uuid}"
            )
            labels, predictions_proba, protected_df = subset_labels_and_predictions(
                subset_df=query_subset_table(
                    self.db_engine,
                    matrix_store.as_of_dates,
                    get_subset_table_name(subset),
                ),
                predictions_proba=predictions_proba,
                labels=matrix_store.labels,
                protected_df=protected_df,
            )
            subset_hash = filename_friendly_hash(subset)
        else:
            logger.debug(
                f"Using all the predictions of model {model_id} on matrix {matrix_store.uuid} for evaluation (i.e. no subset)"
            )
            labels = matrix_store.labels
            subset_hash = ""

        # confirm protected_df and labels have same set and count of values
        if (protected_df is not None) and (not protected_df.empty):
            if (protected_df.index.shape != labels.index.shape) or (
                not protected_df.index.symmetric_difference(labels.index).empty
            ):
                raise ValueError("Mismatch between protected_df and labels indices")

        df_index = labels.index
        labels = np.array(labels)

        matrix_type = matrix_store.matrix_type
        metric_defs = self.metric_definitions_from_matrix_type(matrix_type)

        logger.spam(f"Found {len(metric_defs)} metric definitions total")

        # 1. get worst sorting
        (
            predictions_proba_worst,
            labels_worst,
            df_index_worst,
        ) = sort_predictions_and_labels(
            predictions_proba=predictions_proba,
            labels=labels,
            df_index=df_index,
            tiebreaker="worst",
        )
        worst_lookup = {
            (evaluation.metric, evaluation.parameter): evaluation
            for evaluation in self._compute_evaluations(
                predictions_proba_worst, labels_worst, metric_defs
            )
        }
        logger.debug(
            f"Predictions from {model_id} sorted by worst case scenario, i.e. all negative and NULL labels first"
        )

        # 2. get best sorting
        (
            predictions_proba_best,
            labels_best,
            df_index_best,
        ) = sort_predictions_and_labels(
            predictions_proba=predictions_proba_worst,
            labels=labels_worst,
            df_index=df_index_worst,
            tiebreaker="best",
        )
        best_lookup = {
            (evaluation.metric, evaluation.parameter): evaluation
            for evaluation in self._compute_evaluations(
                predictions_proba_best, labels_best, metric_defs
            )
        }
        logger.debug(
            f"Predictions from {model_id} sorted by best case scenario, i.e. all positive labels first, NULL labels at the end"
        )

        evals_without_trials = dict()

        # 3. figure out which metrics have too far of a distance between best and worst
        # and need random trials
        metric_defs_to_trial = []
        for metric_def in metric_defs:
            worst_eval = worst_lookup[(metric_def.metric, metric_def.parameter_string)]
            best_eval = best_lookup[(metric_def.metric, metric_def.parameter_string)]
            if (
                worst_eval.value is None
                or best_eval.value is None
                or math.isclose(
                    worst_eval.value, best_eval.value, rel_tol=RELATIVE_TOLERANCE
                )
            ):
                evals_without_trials[
                    (worst_eval.metric, worst_eval.parameter)
                ] = worst_eval.value
            else:
                metric_defs_to_trial.append(metric_def)

        # 4. get average of n random trials
        logger.debug(
            f"For model {model_id}, {len(metric_defs_to_trial)} metric definitions need {SORT_TRIALS} random trials each as best/worst evals were different"
        )

        random_eval_accumulator = defaultdict(list)
        for _ in range(0, SORT_TRIALS):
            sort_seed = generate_python_random_seed()
            (
                predictions_proba_random,
                labels_random,
                df_index_random,
            ) = sort_predictions_and_labels(
                predictions_proba=predictions_proba_worst,
                labels=labels_worst,
                df_index=df_index_worst,
                tiebreaker="random",
                sort_seed=sort_seed,
            )
            for random_eval in self._compute_evaluations(
                predictions_proba_random, labels_random, metric_defs_to_trial
            ):
                random_eval_accumulator[
                    (random_eval.metric, random_eval.parameter)
                ].append(random_eval.value)

        # 5. flatten best, worst, stochastic results for each metric definition
        # into database records
        evaluation_start_time = matrix_store.as_of_dates[0]
        evaluation_end_time = matrix_store.as_of_dates[-1]
        as_of_date_frequency = matrix_store.metadata["as_of_date_frequency"]
        matrix_uuid = matrix_store.uuid
        evaluations = []
        for metric_def in metric_defs:
            metric_key = (metric_def.metric, metric_def.parameter_string)
            if metric_key in evals_without_trials:
                stochastic_value = evals_without_trials[metric_key]
                standard_deviation = 0
                num_sort_trials = 0
            else:
                trial_results = [
                    value
                    for value in random_eval_accumulator[metric_key]
                    if value is not None
                ]
                stochastic_value = statistics.mean(trial_results)
                try:
                    standard_deviation = statistics.stdev(trial_results)
                except ValueError:
                    logger.warning(
                        f"Standard deviation of {metric_def.metric} for parameter "
                        f"{metric_def.parameter_combination} not defined because fewer than "
                        "two non-NULL trial results were available. Inserting NULL for standard_deviation."
                    )
                    standard_deviation = None

                num_sort_trials = len(trial_results)

            evaluation = matrix_type.evaluation_obj(
                metric=metric_def.metric,
                parameter=metric_def.parameter_string,
                num_labeled_examples=worst_lookup[metric_key].num_labeled_examples,
                num_labeled_above_threshold=worst_lookup[
                    metric_key
                ].num_labeled_above_threshold,
                num_positive_labels=worst_lookup[metric_key].num_positive_labels,
                worst_value=worst_lookup[metric_key].value,
                best_value=best_lookup[metric_key].value,
                stochastic_value=stochastic_value,
                num_sort_trials=num_sort_trials,
                standard_deviation=standard_deviation,
            )
            evaluations.append(evaluation)

        self._write_to_db(
            model_id,
            subset_hash,
            evaluation_start_time,
            evaluation_end_time,
            as_of_date_frequency,
            matrix_store.uuid,
            evaluations,
            matrix_type.evaluation_obj,
        )
        if protected_df is not None:
            self._write_audit_to_db(
                model_id=model_id,
                protected_df=protected_df.reindex(df_index_worst),
                predictions_proba=predictions_proba_worst,
                labels=labels_worst,
                tie_breaker="worst",
                subset_hash=subset_hash,
                matrix_type=matrix_type,
                evaluation_start_time=evaluation_start_time,
                evaluation_end_time=evaluation_end_time,
                matrix_uuid=matrix_store.uuid,
            )
            self._write_audit_to_db(
                model_id=model_id,
                protected_df=protected_df.reindex(df_index_best),
                predictions_proba=predictions_proba_best,
                labels=labels_best,
                tie_breaker="best",
                subset_hash=subset_hash,
                matrix_type=matrix_type,
                evaluation_start_time=evaluation_start_time,
                evaluation_end_time=evaluation_end_time,
                matrix_uuid=matrix_store.uuid,
            )
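
    # Illustrative usage sketch (engine, matrix_store, fitted_model, and test_features
    # are hypothetical; in practice catwalk's model testing code drives this method):
    #
    #   evaluator = ModelEvaluator(
    #       testing_metric_groups=[{
    #           "metrics": ["precision@", "recall@"],
    #           "thresholds": {"top_n": [100]},
    #       }],
    #       training_metric_groups=[],
    #       db_engine=engine,
    #   )
    #   scores = fitted_model.predict_proba(test_features)[:, 1]
    #   evaluator.evaluate(scores, matrix_store, model_id=model_id)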

    def _write_audit_to_db(
        self,
        model_id,
        protected_df,
        predictions_proba,
        labels,
        tie_breaker,
        subset_hash,
        matrix_type,
        evaluation_start_time,
        evaluation_end_time,
        matrix_uuid,
    ):
        """
        Runs the bias audit and saves the result in the bias table.

        Args:
            model_id (int) primary key of the model
            protected_df (pandas.DataFrame) A dataframe with protected group attributes
            predictions_proba (np.array) List of prediction probabilities
            labels (np.array) List of labels
            tie_breaker (str) 'best' or 'worst' case tiebreaking rule that the predictions and labels were sorted by
            subset_hash (str) the hash of the subset, if any, that the
                evaluation is made on
            matrix_type (triage.component.catwalk.storage.MatrixType)
                The type of matrix used
            evaluation_start_time (pandas._libs.tslibs.timestamps.Timestamp)
                first as_of_date included in the evaluation period
            evaluation_end_time (pandas._libs.tslibs.timestamps.Timestamp) last
                as_of_date included in the evaluation period
            matrix_uuid (str) the uuid of the matrix
        """
        if protected_df.empty:
            return

        # aequitas preprocessing requires the following columns:
        # score, label_value, model_id, and the protected attributes.
        # protected_df only has the protected attributes at this point, so fill in the rest
        protected_df = protected_df.copy()
        protected_df["model_id"] = model_id
        protected_df["score"] = predictions_proba
        protected_df["label_value"] = labels
        aequitas_df, attr_cols_input = preprocess_input_df(protected_df)

        # create group crosstabs
        g = Group()
        score_thresholds = {}
        score_thresholds["rank_abs"] = self.bias_config["thresholds"].get("top_n", [])
        # convert 0-100 percentile to 0-1 that Aequitas expects
        score_thresholds["rank_pct"] = [
            value / 100.0
            for value in self.bias_config["thresholds"].get("percentiles", [])
        ]
        groups_model, attr_cols = g.get_crosstabs(
            aequitas_df, score_thresholds=score_thresholds, attr_cols=attr_cols_input
        )
        # analyze bias from reference groups
        bias = Bias()
        ref_groups_method = self.bias_config.get("ref_groups_method", None)
        if ref_groups_method == "predefined" and self.bias_config["ref_groups"]:
            bias_df = bias.get_disparity_predefined_groups(
                groups_model, aequitas_df, self.bias_config["ref_groups"]
            )
        elif ref_groups_method == "majority":
            bias_df = bias.get_disparity_major_group(groups_model, aequitas_df)
        else:
            bias_df = bias.get_disparity_min_metric(groups_model, aequitas_df)

        # analyze fairness for each group
        f = Fairness(tau=0.8)  # the default fairness threshold is 0.8
        group_value_df = f.get_group_value_fairness(bias_df)
        group_value_df["subset_hash"] = subset_hash
        group_value_df["tie_breaker"] = tie_breaker
        group_value_df["evaluation_start_time"] = evaluation_start_time
        group_value_df["evaluation_end_time"] = evaluation_end_time
        group_value_df["matrix_uuid"] = matrix_uuid
        group_value_df = group_value_df.rename(
            index=str, columns={"score_threshold": "parameter", "for": "for_"}
        )
        if group_value_df.empty:
            raise ValueError(
                f"""
            Bias audit failed: aequitas returned an empty dataframe for
            model_id = {model_id}, subset_hash = {subset_hash}, and matrix_type = {matrix_type}"""
            )
        with scoped_session(self.db_engine) as session:
            for index, row in group_value_df.iterrows():
                session.query(matrix_type.aequitas_obj).filter_by(
                    model_id=row["model_id"],
                    evaluation_start_time=row["evaluation_start_time"],
                    evaluation_end_time=row["evaluation_end_time"],
                    subset_hash=row["subset_hash"],
                    parameter=row["parameter"],
                    tie_breaker=row["tie_breaker"],
                    matrix_uuid=row["matrix_uuid"],
                    attribute_name=row["attribute_name"],
                    attribute_value=row["attribute_value"],
                ).delete()
            session.bulk_insert_mappings(
                matrix_type.aequitas_obj, group_value_df.to_dict(orient="records")
            )

    @db_retry
    def _write_to_db(
        self,
        model_id,
        subset_hash,
        evaluation_start_time,
        evaluation_end_time,
        as_of_date_frequency,
        matrix_uuid,
        evaluations,
        evaluation_table_obj,
    ):
        """Write evaluation objects to the database
        Binds the model_id as as_of_date to the given ORM objects
        and writes them to the database
        Args:
            model_id (int) primary key of the model
            subset_hash (str) the hash of the subset, if any, that the
                evaluation is made on
            evaluation_start_time (pandas._libs.tslibs.timestamps.Timestamp)
                first as_of_date included in the evaluation period
            evaluation_end_time (pandas._libs.tslibs.timestamps.Timestamp) last
                as_of_date included in the evaluation period
            as_of_date_frequency (str) the frequency with which as_of_dates
                occur between the evaluation_start_time and evaluation_end_time
            matrix_uuid (str) the uuid of the matrix the evaluations refer to
            evaluations (list) results_schema.TestEvaluation or TrainEvaluation
                objects
            evaluation_table_obj (schema.TestEvaluation or TrainEvaluation)
                specifies to which table to add the evaluations
        """
        with scoped_session(self.db_engine) as session:
            session.query(evaluation_table_obj).filter_by(
                model_id=model_id,
                evaluation_start_time=evaluation_start_time,
                evaluation_end_time=evaluation_end_time,
                as_of_date_frequency=as_of_date_frequency,
                subset_hash=subset_hash,
            ).delete()

            for evaluation in evaluations:
                evaluation.model_id = model_id
                evaluation.as_of_date_frequency = as_of_date_frequency
                evaluation.subset_hash = subset_hash
                evaluation.evaluation_start_time = evaluation_start_time
                evaluation.evaluation_end_time = evaluation_end_time
                evaluation.matrix_uuid = matrix_uuid
                session.add(evaluation)