src/triage/component/audition/selection_rules.py
import inspect
import logging

import verboselogs
from numpy import exp, log, average

from .metric_directionality import greater_is_better, best_in_series, idxbest

logger = verboselogs.VerboseLogger(__name__)


def random_model_group(df, train_end_time, n=1):
"""Pick a random model group (as a baseline)
Arguments:
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
            dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) up to n randomly selected model group ids
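
    Example (an illustrative sketch; the dataframe and its values are made up):

    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "model_group_id": [1, 1, 2, 2, 3, 3],
    ...     "train_end_time": pd.to_datetime(["2016-01-01", "2017-01-01"] * 3),
    ...     "metric": ["precision@"] * 6,
    ...     "parameter": ["300_abs"] * 6,
    ...     "raw_value": [0.4, 0.5, 0.5, 0.6, 0.3, 0.7],
    ...     "dist_from_best_case": [0.1, 0.2, 0.0, 0.1, 0.2, 0.0],
    ... })
    >>> random_model_group(df, pd.Timestamp("2017-01-01"), n=2)  # doctest: +SKIP
    [3, 1]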
"""
return df["model_group_id"].drop_duplicates().sample(frac=1).tolist()[:n]
def _mg_best_avg_by(df, value_col, metric, n=1):
"""Best model group in dataframe by average of some column
Args:
df (pandas.DataFrame)
value_col (str): The column which contains the value to be averaged
        metric (str): model evaluation metric, used only to determine
            whether higher or lower values are better
        n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the best average value
    """
if n == 1:
return [
getattr(
df.groupby(["model_group_id"])[value_col].mean().sample(frac=1),
idxbest(metric),
)()
]
else:
if greater_is_better(metric):
return (
df.groupby(["model_group_id"])[value_col]
.mean()
.nlargest(n)
.index.tolist()
)
else:
return (
df.groupby(["model_group_id"])[value_col]
.mean()
.nsmallest(n)
.index.tolist()
)
def best_current_value(df, train_end_time, metric, parameter, n=1):
"""Pick the model group with the best current metric value
Arguments:
metric (string): model evaluation metric, such as 'precision@'
parameter (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns:
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the best current raw
        metric value
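
    Example (illustrative; ``df`` is the toy dataframe from
    ``random_model_group``'s example):

    >>> best_current_value(df, pd.Timestamp("2017-01-01"), "precision@", "300_abs", n=2)  # doctest: +SKIP
    [3, 2]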
"""
curr_df = df.loc[
(df["train_end_time"] == train_end_time)
& (df["metric"] == metric)
& (df["parameter"] == parameter)
]
    best_raw_value = getattr(curr_df["raw_value"], best_in_series(metric))()
    if n == 1:
        # sample(frac=1) shuffles the rows so that ties for the best value
        # are broken at random rather than by row order
        return (
            curr_df.loc[curr_df["raw_value"] == best_raw_value, "model_group_id"]
            .sample(frac=1)
            .tolist()[:n]
        )
    elif greater_is_better(metric):
        return curr_df.nlargest(n, "raw_value")["model_group_id"].tolist()
    else:
        return curr_df.nsmallest(n, "raw_value")["model_group_id"].tolist()
def best_average_value(df, train_end_time, metric, parameter, n=1):
"""Pick the model with the highest average metric value so far
Arguments:
metric (string): model evaluation metric, such as 'precision@'
parameter (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the best mean raw
        metric value
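
    Example (illustrative; ``df`` is the toy dataframe from
    ``random_model_group``'s example, with group means 0.45, 0.55, and 0.5):

    >>> best_average_value(df, pd.Timestamp("2017-01-01"), "precision@", "300_abs")  # doctest: +SKIP
    [2]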
"""
met_df = df.loc[(df["metric"] == metric) & (df["parameter"] == parameter)]
return _mg_best_avg_by(met_df, "raw_value", metric, n)
def lowest_metric_variance(df, train_end_time, metric, parameter, n=1):
"""Pick the model with the lowest metric variance so far
Arguments:
metric (string): model evaluation metric, such as 'precision@'
parameter (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
            dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the lowest metric
        standard deviation
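
    Example (illustrative; ``df`` is the toy dataframe from
    ``random_model_group``'s example, where groups 1 and 2 tie for the most
    stable values):

    >>> lowest_metric_variance(df, pd.Timestamp("2017-01-01"), "precision@", "300_abs", n=2)  # doctest: +SKIP
    [1, 2]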
"""
met_df = (
df.loc[(df["metric"] == metric) & (df["parameter"] == parameter)]
.groupby(["model_group_id"])["raw_value"]
.std()
)
if met_df.isnull().sum() == met_df.shape[0]:
        # the standard deviation is undefined in the first time window, since
        # there is only one observation per model group
logger.debug(
"Null metric variances for %s %s at %s; picking at random",
metric,
parameter,
train_end_time,
)
return df["model_group_id"].drop_duplicates().sample(n=n).tolist()
elif met_df.isnull().sum() > 0:
        # the standard deviations should be all null or all non-null; a mix
        # shouldn't be possible since every model group has the same number
        # of observations
raise ValueError(
"Mix of null and non-null metric variances for or {} {} at {}".format(
metric, parameter, train_end_time
)
)
if n == 1:
# sample(frac=1) to shuffle rows so we don't accidentally introduce bias in breaking ties
return [met_df.sample(frac=1).idxmin()]
else:
return met_df.nsmallest(n).index.tolist()
def most_frequent_best_dist(
df, train_end_time, metric, parameter, dist_from_best_case, n=1
):
"""Pick the model that is most frequently within `dist_from_best_case` from the
best-performing model group across test sets so far
Arguments:
        dist_from_best_case (float): maximum allowed distance from the
            best-performing model group
metric (string): model evaluation metric, such as 'precision@'
parameter (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
            dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids most frequently within
        dist_from_best_case of the best model group
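
    Example (illustrative; ``df`` is the toy dataframe from
    ``random_model_group``'s example, where group 2 is within 0.1 of the
    best group in both time periods):

    >>> most_frequent_best_dist(df, pd.Timestamp("2017-01-01"), "precision@", "300_abs", 0.1)  # doctest: +SKIP
    [2]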
"""
met_df = df.loc[(df["metric"] == metric) & (df["parameter"] == parameter)]
met_df["within_dist"] = (df["dist_from_best_case"] <= dist_from_best_case).astype(
"int"
)
if n == 1:
# sample(frac=1) to shuffle rows so we don't accidentally introduce bias in breaking ties
return [
met_df.groupby(["model_group_id"])["within_dist"]
.mean()
.sample(frac=1)
.idxmax()
]
else:
return (
met_df.groupby(["model_group_id"])["within_dist"]
.mean()
.nlargest(n)
.index.tolist()
)
def best_average_two_metrics(
df,
train_end_time,
metric1,
parameter1,
metric2,
parameter2,
metric1_weight=0.5,
n=1,
):
"""Pick the model with the highest average combined value to date
of two metrics weighted together using `metric1_weight`
Arguments:
metric1_weight (float): relative weight of metric1, between 0 and 1
metric1 (string): model evaluation metric, such as 'precision@'
parameter1 (string): model evaluation metric parameter,
such as '300_abs'
metric2 (string): model evaluation metric, such as 'precision@'
parameter2 (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
            dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the best weighted mean
        raw metric value
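
    Example (illustrative; assumes ``df`` contains rows for both metrics,
    e.g. the toy dataframe from ``random_model_group``'s example extended
    with recall@ 300_abs rows):

    >>> best_average_two_metrics(
    ...     df, pd.Timestamp("2017-01-01"),
    ...     "precision@", "300_abs", "recall@", "300_abs",
    ...     metric1_weight=0.7,
    ... )  # doctest: +SKIP
    [2]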
"""
if metric1_weight < 0 or metric1_weight > 1:
raise ValueError("Metric weight must be between 0 and 1")
metric1_dir = greater_is_better(metric1)
metric2_dir = greater_is_better(metric2)
if metric1_dir != metric2_dir:
raise ValueError("Metric directionalities must be the same")
    met_df = df.loc[
        ((df["metric"] == metric1) & (df["parameter"] == parameter1))
        | ((df["metric"] == metric2) & (df["parameter"] == parameter2))
    ].copy()
    # weight each metric's raw values; the weights sum to 1, so the
    # group-level sums below are weighted averages of the two metrics
    m1_mask = (met_df["metric"] == metric1) & (met_df["parameter"] == parameter1)
    m2_mask = (met_df["metric"] == metric2) & (met_df["parameter"] == parameter2)
    met_df.loc[m1_mask, "weighted_raw"] = met_df.loc[m1_mask, "raw_value"] * metric1_weight
    met_df.loc[m2_mask, "weighted_raw"] = met_df.loc[m2_mask, "raw_value"] * (
        1.0 - metric1_weight
    )
    met_df_wt = met_df.groupby(
        ["model_group_id", "train_end_time"], as_index=False
    )["weighted_raw"].sum()
    return _mg_best_avg_by(met_df_wt, "weighted_raw", metric1, n)
def best_avg_var_penalized(df, train_end_time, metric, parameter, stdev_penalty, n=1):
"""Pick the model with the highest
average metric value so far, placing less weight in older
results. You need to specify two parameters: the shape of how the
weight affects points (decay_type, linear or exponential) and the relative
weight of the most recent point (curr_weight).
Arguments:
stdev_penalty (float): penalty for instability
metric (string): model evaluation metric, such as 'precision@'
parameter (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
            dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the best penalized mean
        raw metric value
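
    Example (illustrative; ``df`` is the toy dataframe from
    ``random_model_group``'s example, where group 3's volatile values are
    penalized):

    >>> best_avg_var_penalized(df, pd.Timestamp("2017-01-01"), "precision@", "300_abs", stdev_penalty=0.5)  # doctest: +SKIP
    [2]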
"""
# for metrics where smaller values are better, the penalty for instability should
# add to the mean, so introduce a factor of -1
stdev_penalty = stdev_penalty if greater_is_better(metric) else -1.0 * stdev_penalty
met_df = df.loc[(df["metric"] == metric) & (df["parameter"] == parameter)]
met_df_grp = met_df.groupby(["model_group_id"]).aggregate(
{"raw_value": ["mean", "std"]}
)
met_df_grp.columns = met_df_grp.columns.droplevel(0)
met_df_grp.columns = ["raw_avg", "raw_stdev"]
if met_df_grp["raw_stdev"].isnull().sum() == met_df_grp.shape[0]:
        # the standard deviation is undefined in the first time window, since
        # there is only one observation per model group
logger.debug(
"Null metric variances for %s %s at %s; just using mean",
metric,
parameter,
train_end_time,
)
return [getattr(met_df_grp["raw_avg"].sample(frac=1), idxbest(metric))()]
elif met_df_grp["raw_stdev"].isnull().sum() > 0:
        # the standard deviations should be all null or all non-null; a mix
        # shouldn't be possible since every model group has the same number
        # of observations
raise ValueError(
"Mix of null and non-null metric variances for or {} {} at {}".format(
metric, parameter, train_end_time
)
)
min_stdev = met_df_grp["raw_stdev"].min()
met_df_grp["penalized_avg"] = met_df_grp["raw_avg"] - stdev_penalty * (
met_df_grp["raw_stdev"] - min_stdev
)
if n == 1:
# sample(frac=1) to shuffle rows so we don't accidentally introduce bias in breaking ties
return [getattr(met_df_grp["penalized_avg"].sample(frac=1), idxbest(metric))()]
else:
if greater_is_better(metric):
return met_df_grp["penalized_avg"].nlargest(n).index.tolist()
else:
return met_df_grp["penalized_avg"].nsmallest(n).index.tolist()
def best_avg_recency_weight(
df, train_end_time, metric, parameter, curr_weight, decay_type, n=1
):
"""Pick the model with the highest average metric value so far, penalized
for relative variance as:
avg_value - (stdev_penalty) * (stdev - min_stdev)
where min_stdev is the minimum standard deviation of the metric across all
model groups
Arguments:
decay_type (string): either 'linear' or 'exponential'; the shape of
how the weights fall off between the current and first point
curr_weight (float): amount of weight to put on the most recent point,
relative to the first point (e.g., a value of 5.0 would mean the
current data is weighted 5 times as much as the first one)
metric (string): model evaluation metric, such as 'precision@'
parameter (string): model evaluation metric parameter,
such as '300_abs'
train_end_time (Timestamp): current train end time
df (pandas.DataFrame): dataframe containing the columns
model_group_id,
train_end_time,
metric,
parameter,
raw_value,
            dist_from_best_case
n (int): the number of model group ids to return
    Returns: (list) the up-to-n model group ids with the best recency-weighted
        mean raw metric value
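
    Example (illustrative; ``df`` is the toy dataframe from
    ``random_model_group``'s example, where group 3's strong recent value
    dominates once recent results are up-weighted):

    >>> best_avg_recency_weight(df, pd.Timestamp("2017-01-01"), "precision@", "300_abs", curr_weight=5.0, decay_type="linear")  # doctest: +SKIP
    [3]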
"""
    df = df.copy()  # work on a copy so we don't mutate the caller's dataframe
    first_date = df["train_end_time"].min()
    df["days_out"] = (df["train_end_time"] - first_date).apply(lambda x: float(x.days))
tmax = df["days_out"].max()
if tmax == 0:
# only one date (must be on first time point), so everything gets a weight of 1
df["weight"] = 1.0
elif decay_type == "linear":
# weight = (curr_weight - 1.0) * (t/tmax) + 1.0
df["weight"] = (curr_weight - 1.0) * (df["days_out"] / tmax) + 1.0
elif decay_type == "exponential":
# weight = exp(ln(curr_weight)*t/tmax)
df["weight"] = exp(log(curr_weight) * df["days_out"] / tmax)
else:
raise ValueError("Must specify linear or exponential decay type")
def wm(x):
return average(x, weights=df.loc[x.index, "weight"])
met_df = df.loc[(df["metric"] == metric) & (df["parameter"] == parameter)]
if n == 1:
# sample(frac=1) to shuffle rows so we don't accidentally introduce bias in breaking ties
result = getattr(
met_df.groupby(["model_group_id"])
.aggregate({"raw_value": wm})
.sample(frac=1),
idxbest(metric),
)()
return result.tolist()
else:
met_df_grp = met_df.groupby(["model_group_id"]).aggregate({"raw_value": wm})
if greater_is_better(metric):
return met_df_grp["raw_value"].nlargest(n).index.tolist()
else:
return met_df_grp["raw_value"].nsmallest(n).index.tolist()
SELECTION_RULES = {
"random_model_group": random_model_group,
"best_current_value": best_current_value,
"best_average_value": best_average_value,
"lowest_metric_variance": lowest_metric_variance,
"most_frequent_best_dist": most_frequent_best_dist,
"best_average_two_metrics": best_average_two_metrics,
"best_avg_var_penalized": best_avg_var_penalized,
"best_avg_recency_weight": best_avg_recency_weight,
}
class BoundSelectionRule:
"""A selection rule bound with a set of arguments
Args:
        args (dict): keyword arguments sufficient to call the function once
            a dataframe and train_end_time are supplied
        function_name (string, optional): the name of a function in SELECTION_RULES
        descriptive_name (string, optional): a descriptive name, used in charts;
            if none is given it will be constructed automatically from the
            function name and arguments
        function (callable, optional): the selection rule function itself,
            as an alternative to function_name
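
    Example (illustrative; ``df`` as in the selection rule docstrings above):

    >>> rule = BoundSelectionRule(
    ...     args={"metric": "precision@", "parameter": "300_abs"},
    ...     function_name="best_current_value",
    ... )
    >>> rule.descriptive_name
    'best_current_value_precision@_300_abs'
    >>> rule.pick(df, pd.Timestamp("2017-01-01"))  # doctest: +SKIP
    [3]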
"""
def __init__(self, args, function_name=None, descriptive_name=None, function=None):
if not function_name and not function:
raise ValueError("Need either function_name or function")
if not descriptive_name and not function_name:
raise ValueError("Need either descriptive_name or function_name")
self.args = args
self.function_name = function_name
self._function = function
self._descriptive_name = descriptive_name
@property
def function(self):
if not self._function:
self._function = SELECTION_RULES[self.function_name]
return self._function
@property
def descriptive_name(self):
if not self._descriptive_name:
self._descriptive_name = self._build_descriptive_name()
return self._descriptive_name
def __str__(self):
return self.descriptive_name
def _build_descriptive_name(self):
"""Build a descriptive name for the bound selection rule
Constructed using the function name and arguments.
"""
argspec = inspect.getfullargspec(self.function)
args = [arg for arg in argspec.args if arg not in ["df", "train_end_time", "n"]]
return "_".join([self.function_name] + [str(self.args[key]) for key in args])
def pick(self, dataframe, train_end_time):
"""Run the selection rule for a given time on a dataframe
Args:
dataframe (pandas.DataFrame)
train_end_time (timestamp) Current train end time
        Returns: (list) the selected model group ids
"""
return self.function(dataframe, train_end_time, **(self.args))