skbonus/meta/_explainable_regressor.py
from __future__ import annotations
from typing import Any, Optional
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, RegressorMixin, clone
from sklearn.tree import DecisionTreeRegressor
from sklearn.utils.validation import (
check_X_y,
check_is_fitted,
check_array,
_check_sample_weight,
)
from . import utils
from interpret.glassbox.ebm.ebm import EBMExplanation
class ExplainableBoostingMetaRegressor(BaseEstimator, RegressorMixin):
"""
A meta regressor that outputs a transparent, explainable model given blackbox models.
It works exactly like the `ExplainableBoostingRegressor` by the interpretml team, but here you can choose any base regressor instead of
being restricted to trees. For example, you can use scikit-learn's `IsotonicRegression` to create a model that is
monotonically increasing or decreasing in some of the features, while still being explainable and well-performing.
See the notes below to find a nice explanation of how the algorithm works at a high level.
Parameters
----------
base_regressor : Any, default=DecisionTreeRegressor(max_depth=4)
A single scikit-learn compatible regressor or a list of those regressors of length `n_features`.
    max_rounds : int, default=5000
        Conduct the boosting for this many rounds.
    learning_rate : float, default=0.01
        The learning rate; it should be quite small.
    grid_points : int, default=1000
        The number of grid points per feature. More grid points make the explanations more
        detailed and the model better-performing, but also slow the algorithm down.
    feature_names : Optional[list], default=None
        The names of the features. If None, they are taken from the columns when a pandas
        DataFrame is passed to `fit`, and default to the feature indices otherwise.
Examples
--------
>>> import numpy as np
>>> from sklearn.isotonic import IsotonicRegression
>>> np.random.seed(0)
>>> X = np.random.randn(100, 2)
>>> y = 2 * X[:, 0] - 3 * X[:, 1] + np.random.randn(100)
>>> e = ExplainableBoostingMetaRegressor(
... base_regressor=[IsotonicRegression(), IsotonicRegression(increasing=False)],
... grid_points=20
... ).fit(X, y)
>>> e.score(X, y)
0.9377382292348461
>>> e.outputs_[0] # increasing in the first feature, as it should be
array([-4.47984456, -4.47984456, -4.47984456, -4.47984456, -3.00182713,
-2.96627696, -1.60843287, -1.06601264, -0.92013822, -0.7217753 ,
-0.66440783, 0.28132994, 1.33664486, 1.47592253, 1.96677286,
2.88969439, 2.96292906, 4.33642573, 4.38506967, 6.42967225])
>>> e.outputs_[1] # decreasing in the second feature, as it should be
array([ 6.35605214, 6.06407947, 6.05458114, 4.8488004 , 4.41880876,
3.45056373, 2.64560385, 1.6138303 , 0.89860987, 0.458301 ,
0.33455608, -0.43609495, -1.55600464, -2.05142528, -2.42791679,
-3.58961475, -4.80134218, -4.94421252, -5.94858712, -6.36828774])
    Notes
    -----
    At a high level, the algorithm starts from the mean of the targets and then cycles over the
    features: in each round it fits a fresh copy of the base regressor on the current residuals
    using a single feature, adds a small multiple (the learning rate) of its predictions to that
    feature's shape function, and subtracts the same amount from the residuals.
    Check out the original author's Github at https://github.com/interpretml/interpret and
    https://www.youtube.com/watch?v=MREiHgHgl0k for a great introduction to the inner workings
    of the algorithm.
"""
def __init__(
self,
base_regressor: Any = None,
max_rounds: int = 5000,
learning_rate: float = 0.01,
grid_points: int = 1000,
feature_names: Optional[list] = None
) -> None:
"""Initialize."""
self.base_regressor = base_regressor
self.max_rounds = max_rounds
self.learning_rate = learning_rate
self.grid_points = grid_points
self.feature_names = feature_names
    def fit(
        self, X: np.ndarray, y: np.ndarray, sample_weight: Optional[np.ndarray] = None
    ) -> ExplainableBoostingMetaRegressor:
"""
Fit the model.
Parameters
----------
X : np.ndarray of shape (n_samples, n_features)
The training data.
        y : np.ndarray, 1-dimensional
            The target values.
        sample_weight : Optional[np.ndarray], default=None
            Individual weights for each sample. If None, all samples are weighted equally.
Returns
-------
ExplainableBoostingMetaRegressor
Fitted regressor.
"""
if isinstance(X, pd.DataFrame):
self.feature_names_ = X.columns.tolist()
elif self.feature_names is not None:
self.feature_names_ = list(self.feature_names)
else:
self.feature_names_ = None
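        # Standard scikit-learn input validation; _check_sample_weight also
        # broadcasts sample_weight into one weight per sample.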
X, y = check_X_y(X, y)
sample_weight = _check_sample_weight(sample_weight, X)
self._check_n_features(X, reset=True)
        if isinstance(self.base_regressor, list):
            if len(self.base_regressor) != self.n_features_in_:
                raise ValueError(
                    "Number of regressors in base_regressor should be the same as the number of features."
                )
            self.base_regressors_ = self.base_regressor
        elif self.base_regressor is not None:
            self.base_regressors_ = self.n_features_in_ * [self.base_regressor]
        else:
            self.base_regressors_ = self.n_features_in_ * [
                DecisionTreeRegressor(max_depth=4)
            ]
if self.learning_rate <= 0:
raise ValueError("learning_rate has to be strictly positive!")
self.domains_ = [
np.linspace(feature_min, feature_max, self.grid_points)
for feature_min, feature_max in zip(X.min(axis=0), X.max(axis=0))
]
self.outputs_ = [np.zeros_like(domain) for domain in self.domains_]
        self.mean_ = y.mean()
        y_copy = y - self.mean_  # the subtraction already yields a fresh residual array
self._fit(X, sample_weight, y_copy)
self.feature_importances_ = self._feature_importances_(X)
self.selector_ = self._selector(X)
return self
def _fit(self, X, sample_weight, y_copy):
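        # Cyclic gradient boosting on residuals: round i only uses feature
        # j = i % n_features_in_. A fresh clone h of that feature's base
        # regressor is fitted on the current residuals, then
        #     f_j <- f_j + learning_rate * h  (stored on the grid domains_[j])
        #     residuals <- residuals - learning_rate * h(x_j)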
for i in range(self.max_rounds):
feature_number = i % self.n_features_in_
h = clone(self.base_regressors_[feature_number])
x = X[:, feature_number].reshape(-1, 1)
h.fit(x, y_copy, sample_weight=sample_weight)
self.outputs_[feature_number] += self.learning_rate * h.predict(
self.domains_[feature_number].reshape(-1, 1)
)
y_copy -= self.learning_rate * h.predict(x)
def predict(self, X: np.ndarray) -> np.ndarray:
"""
Get predictions.
Parameters
----------
X : np.ndarray, shape (n_samples, n_features)
Samples to get predictions of.
Returns
-------
y : np.ndarray, shape (n_samples,)
The predicted values.
"""
X = check_array(X)
check_is_fitted(self)
self._check_n_features(X, reset=False)
        res = np.zeros(len(X))
        for feature_number in range(self.n_features_in_):
            # Nearest-neighbor lookup via broadcasting: evaluate the feature's
            # shape function at the grid point closest to each sample.
            grid = self.domains_[feature_number]
            nearest = np.abs(grid.reshape(-1, 1) - X[:, feature_number]).argmin(axis=0)
            res += self.outputs_[feature_number][nearest]
        return res + self.mean_
    def explain_global(self, name: Optional[str] = None):
        """
        Provide a global explanation for the model, as in the interpretml library.

        Parameters
        ----------
        name : Optional[str], default=None
            User-defined explanation name.

        Returns
        -------
        EBMExplanation
            An explanation object, visualizing feature-value pairs as a horizontal bar chart.
        """
lower_bound = np.inf
upper_bound = -np.inf
for feature_group_index in range(self.n_features_in_):
errors = utils.fake_std(self)[feature_group_index]
scores = self.outputs_[feature_group_index]
lower_bound = min(lower_bound, np.min(scores - errors))
upper_bound = max(upper_bound, np.max(scores + errors))
bounds = (lower_bound, upper_bound)
# Add per feature graph
data_dicts = []
feature_list = []
density_list = []
for feature_group_index, feature_indexes in enumerate(
[[i] for i in range(self.n_features_in_)]
):
model_graph = self.outputs_[feature_group_index]
            # NOTE: the error bounds use the fake standard deviation from
            # utils.fake_std; consider issuing a warning about this.
            errors = utils.fake_std(self)[feature_group_index]
            if len(feature_indexes) == 1:
                # Hack inherited from interpret's EBM code, where index 0 held
                # the missing-value bin; it leaves `scores` one element shorter
                # than `bin_labels`.
                model_graph = model_graph[1:]
                errors = errors[1:]
                bin_labels = self.domains_[feature_indexes[0]]
scores = list(model_graph)
upper_bounds = list(model_graph + errors)
lower_bounds = list(model_graph - errors)
density_dict = {
"names": utils.get_fake_hist_edges(self, feature_indexes[0]),
"scores": utils.get_fake_hist_counts(self, feature_indexes[0]),
}
feature_dict = {
"type": "univariate",
"names": bin_labels,
"scores": scores,
"scores_range": bounds,
"upper_bounds": upper_bounds,
"lower_bounds": lower_bounds,
}
feature_list.append(feature_dict)
density_list.append(density_dict)
data_dict = {
"type": "univariate",
"names": bin_labels,
"scores": model_graph,
"scores_range": bounds,
"upper_bounds": model_graph + errors,
"lower_bounds": model_graph - errors,
"density": {
"names": utils.get_fake_hist_edges(self, feature_indexes[0]),
"scores": utils.get_fake_hist_counts(self, feature_indexes[0]),
},
}
data_dicts.append(data_dict)
            else:
                raise Exception("Only univariate feature groups are supported.")
overall_dict = {
"type": "univariate",
"names": self.feature_names_,
"scores": self.feature_importances_,
}
internal_obj = {
"overall": overall_dict,
"specific": data_dicts,
"mli": [
{
"explanation_type": "ebm_global",
"value": {"feature_list": feature_list},
},
{"explanation_type": "density", "value": {"density": density_list}},
],
}
        return EBMExplanation(
            "global",
            internal_obj,
            feature_names=self.feature_names_,
            feature_types=["continuous"] * self.n_features_in_,
            name=name if name is not None else "ExplainableBoostingMetaRegressor",
            selector=self.selector_,
        )
    def _feature_importances_(self, X):
        # A feature's importance is the mean absolute contribution of its
        # shape function over the training data.
        X_ = check_array(X)
        res = []
        for feature_number in range(self.n_features_in_):
            grid = self.domains_[feature_number]
            nearest = np.abs(grid.reshape(-1, 1) - X_[:, feature_number]).argmin(axis=0)
            res.append(self.outputs_[feature_number][nearest])
        return np.mean(np.abs(res), axis=1)
    def _selector(self, X):
        if self.feature_names_ is None:
            self.feature_names_ = list(range(X.shape[1]))
        # One row per feature, as expected by interpret's dashboard; the
        # statistics are computed per column, not over the whole matrix.
        return pd.DataFrame(
            data={
                "Name": self.feature_names_,
                "Type": ["continuous"] * self.n_features_in_,
                "# Unique": [len(np.unique(X[:, i])) for i in range(self.n_features_in_)],
                "% Non-zero": [(X[:, i] != 0).mean() for i in range(self.n_features_in_)],
            },
        )
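

if __name__ == "__main__":
    # Minimal usage sketch, not part of the library API: fit the meta regressor
    # on synthetic data and inspect the score and feature importances. The data
    # and hyperparameters below are illustrative assumptions only.
    from sklearn.isotonic import IsotonicRegression

    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(200, 2))
    y_demo = 2 * X_demo[:, 0] - 3 * X_demo[:, 1] + rng.normal(size=200)

    model = ExplainableBoostingMetaRegressor(
        base_regressor=[IsotonicRegression(), IsotonicRegression(increasing=False)],
        grid_points=50,
        max_rounds=1000,
    ).fit(X_demo, y_demo)

    print("R^2 on the training data:", model.score(X_demo, y_demo))
    print("Feature importances:", model.feature_importances_)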