src/pyepal/pal/core.py
# -*- coding: utf-8 -*-
# pylint:disable=anomalous-backslash-in-string
# Copyright 2020 PyePAL authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Core functions for PAL"""
from typing import List, Sequence, Tuple, Union
import numpy as np
from numba import jit
from .utils import dominance_check_jitted_2, dominance_check_jitted_3, is_pareto_efficient
__all__: List[str] = []
def _get_uncertainty_region( # pylint:disable=invalid-name
mu: np.array, std: np.array, beta_sqrt: float
) -> Tuple[np.array, np.array]:
"""
Args:
mu (float): mean
std (float): standard deviation
beta_sqrt (float): scaling factor
Returns:
Tuple[float, float]: lower bound, upper bound
"""
low_lim, high_lim = mu - beta_sqrt * std, mu + beta_sqrt * std
return low_lim, high_lim
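
# A minimal doctest-style sketch of _get_uncertainty_region with
# hypothetical values: the bounds are mu -/+ beta_sqrt * std, e.g.,
#
#   >>> _get_uncertainty_region(np.array([0.0, 1.0]), np.array([0.1, 0.2]), 3.0)
#   (array([-0.3,  0.4]), array([0.3, 1.6]))
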
def _get_uncertainty_regions(
    mus: np.array, stds: np.array, beta_sqrt: float
) -> Tuple[np.array, np.array]:
    """Compute the lower and upper bound of the uncertainty region
    for each dimension (=target).

    Args:
        mus (np.array): means
        stds (np.array): standard deviations
        beta_sqrt (float): scaling factor

    Returns:
        Tuple[np.array, np.array]: lower bounds, upper bounds
    """
low_lims, high_lims = [], []
for i in range(0, mus.shape[1]):
low_lim, high_lim = _get_uncertainty_region(mus[:, i], stds[:, i], beta_sqrt)
low_lims.append(low_lim.reshape(-1, 1))
high_lims.append(high_lim.reshape(-1, 1))
return np.hstack(low_lims), np.hstack(high_lims)
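
# A shape sketch for the vectorized variant, again with hypothetical values:
# mus and stds of shape (n_points, n_objectives) yield a pair of arrays of
# the same shape, e.g., with beta_sqrt = 2.0:
#
#   >>> mus = np.array([[0.0, 1.0], [2.0, 3.0]])
#   >>> stds = np.array([[0.1, 0.1], [0.2, 0.2]])
#   >>> lows, ups = _get_uncertainty_regions(mus, stds, 2.0)
#   >>> lows
#   array([[-0.2,  0.8],
#          [ 1.6,  2.6]])
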
def _union(
    lows: np.array, ups: np.array, new_lows: np.array, new_ups: np.array
) -> Tuple[np.array, np.array]:
    """Performs the iterative intersection (eq. 6 in the PAL paper)
    in all dimensions.

    Args:
        lows (np.array): lower bounds from previous iteration
        ups (np.array): upper bounds from previous iteration
        new_lows (np.array): lower bounds from current iteration
        new_ups (np.array): upper bounds from current iteration

    Returns:
        Tuple[np.array, np.array]: lower bounds, upper bounds
    """
out_lows = []
out_ups = []
for i in range(0, lows.shape[1]):
low, up = _union_one_dim( # pylint:disable=invalid-name
lows[:, i], ups[:, i], new_lows[:, i], new_ups[:, i]
)
out_lows.append(low.reshape(-1, 1))
out_ups.append(up.reshape(-1, 1))
out_lows_array, out_ups_array = np.hstack(out_lows), np.hstack(out_ups)
return out_lows_array, out_ups_array
@jit(nopython=True)
def _union_one_dim(
lows: Sequence, ups: Sequence, new_lows: Sequence, new_ups: Sequence
) -> Tuple[np.array, np.array]:
"""Used to intersect the confidence regions, for eq. 6 of the PAL paper.
The iterative intersection ensures that all uncertainty regions
are non-increasing with t.
We do not check for the ordering in this function.
We really assume that the lower limits are the lower limits
and the upper limits are the upper limits.
All arrays must have the same length.
Args:
lows (Sequence): lower bounds from previous iteration
ups (Sequence): upper bounds from previous iteration
new_lows (Sequence): lower bounds from current iteration
new_ups (Sequence): upper bounds from current iteration
Returns:
Tuple[np.array, np.array]: array of lower limits, array of upper limits
"""
out_lows = []
out_ups = []
for i, low in enumerate(lows):
# In one dimension we can imagine the following cases where there
# is zero intersection
# 1) |--old range--| |--new range--|,
# i.e., lower new limit above old upper limit
# 2) |--new range--| |--old range--|,
# i.e., upper new limit below lower old limit
if (new_lows[i] >= ups[i]) or (new_ups[i] <= low):
out_lows.append(new_lows[i])
out_ups.append(new_ups[i])
# In other cases, we want to intersect the ranges, i.e.
# |---old range-|-|--new-range--| --> |-|
# i.e. we take the max of the lower limits and the min of the upper limits
else:
out_lows.append(max(low, new_lows[i]))
out_ups.append(min(ups[i], new_ups[i]))
return np.array(out_lows), np.array(out_ups)
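
# A small sketch of the two branches above (hypothetical values): the old
# interval [0, 2] overlaps the new interval [1, 3] and is intersected to
# [1, 2]; the old interval [0, 1] does not overlap the new interval [2, 3],
# so the new interval is kept as-is:
#
#   >>> _union_one_dim(
#   ...     np.array([0.0, 0.0]), np.array([2.0, 1.0]),
#   ...     np.array([1.0, 2.0]), np.array([3.0, 3.0])
#   ... )
#   (array([1., 2.]), array([2., 3.]))
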
def _pareto_classify( # pylint:disable=too-many-arguments, too-many-locals, too-many-branches
pareto_optimal_0: np.array,
not_pareto_optimal_0: np.array,
unclassified_0: np.array,
rectangle_lows: np.array,
rectangle_ups: np.array,
epsilon: np.array,
is_fixed_epsilon: bool = False,
) -> Tuple[np.array, np.array, np.array]:
"""Performs the classification of the algorithm
(p. 4 of the PAL paper, see algorithm 1/2 of the epsilon-PAL paper)
One core concept is that once a point is classified,
it does no longer change the class.
When we do the comparison with +/- epsilon we always use the absolute values!
Otherwise, we get inconcistent results depending on the sign!
Args:
pareto_optimal_0 (np.array): boolean mask of points classified as Pareto optimal
not_pareto_optimal_0 (np.array): boolean mask of points
classified as non-Pareto optimal
unclassified_0 (np.array): boolean mask of unclassified points
rectangle_lows (np.array): lower uncertainty boundaries
rectangle_ups (np.array): upper uncertainty boundaries
epsilon (np.array): granularity parameter (one per dimension)
is_fixed_epsilon (bool): If true it assumes that epsilon contains *absolute*
tolerance values for every objective. These would typically be calculated as
:math:`\epsilon_i = \varepsilon_i \cdot r_i`, where :math:`r_i` is the range
of objective :math:`i`. By default this is False. This is, we will use
:math:`\epsilon_i = \varepsilon_i \cdot y_i` to compute the objectives,
which hence avoids the need of knowing the range of a objective before
using the algorithm.
Returns:
Tuple[list, list, list]: binary encoded list of Pareto optimal,
non-Pareto optimal and unclassified points
"""
pareto_optimal_t = pareto_optimal_0.copy()
not_pareto_optimal_t = not_pareto_optimal_0.copy()
unclassified_t = unclassified_0.copy()
    # This part is only relevant when we already have points in set P.
    # Then we can use p_pess(P) to discard unclassified points
if sum(pareto_optimal_0) > 0:
pareto_indices = np.where(pareto_optimal_0)[0]
pareto_pessimistic_lows = rectangle_lows[pareto_indices] # p_pess(P)
if is_fixed_epsilon:
tolerances_0 = np.tile(epsilon, (len(pareto_pessimistic_lows), 1))
else:
tolerances_0 = np.abs(epsilon * pareto_pessimistic_lows)
for i in range(0, len(unclassified_0)):
if unclassified_t[i] == 1:
# discard if any lower-bound epsilon dominates the upper bound
if dominance_check_jitted_2(
pareto_pessimistic_lows + tolerances_0,
rectangle_ups[i],
):
not_pareto_optimal_t[i] = True
unclassified_t[i] = False
pareto_unclassified_indices = np.where(pareto_optimal_0 | unclassified_t)[0]
pareto_unclassified_lows = rectangle_lows[pareto_unclassified_indices]
# assuming maximization
pareto_unclassified_pessimistic_mask = is_pareto_efficient(-pareto_unclassified_lows)
original_indices = pareto_unclassified_indices[pareto_unclassified_pessimistic_mask]
pareto_unclassified_pessimistic_points = pareto_unclassified_lows[
pareto_unclassified_pessimistic_mask
]
if is_fixed_epsilon:
tolerances_1 = np.tile(epsilon, (len(pareto_unclassified_pessimistic_points), 1))
else:
tolerances_1 = epsilon * np.abs(pareto_unclassified_pessimistic_points)
for i in range(0, len(unclassified_t)): # pylint:disable=consider-using-enumerate
# We can only discard points that are unclassified so far
# We cannot discard points that are part of p_pess(P \cup U)
if unclassified_t[i] and (i not in original_indices):
# discard if any lower-bound epsilon dominates the upper bound
if dominance_check_jitted_2(
tolerances_1 + pareto_unclassified_pessimistic_points,
rectangle_ups[i],
):
not_pareto_optimal_t[i] = True
unclassified_t[i] = False
# now, update the pareto set
# if there is no other point x' such that max(Rt(x')) >= min(Rt(x))
# move x to Pareto
unclassified_indices = np.where(unclassified_t | pareto_optimal_t)[0]
unclassified_ups = rectangle_ups[unclassified_indices]
index_map = {index: i for i, index in enumerate(unclassified_indices)}
if is_fixed_epsilon:
tolerances_2 = np.tile(epsilon, (len(rectangle_lows), 1))
else:
tolerances_2 = epsilon * np.abs(rectangle_lows)
# The index map helps us to mask the current point from the unclassified_ups list
for i in range(0, len(unclassified_t)): # pylint:disable=consider-using-enumerate
# again, we only care about unclassified points
if unclassified_t[i]:
# If there is no other point which up is epsilon dominating
# the low of the current point,
# the current point is epsilon-accurate Pareto optimal
if not dominance_check_jitted_3(
unclassified_ups,
rectangle_lows[i] + tolerances_2[i],
index_map[i],
):
pareto_optimal_t[i] = True
unclassified_t[i] = False
return pareto_optimal_t, not_pareto_optimal_t, unclassified_t
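
# A hand-computed sketch of one classification round (hypothetical values):
# three points with a single objective and uncertainty rectangles
# A = [1.0, 1.05], B = [0.0, 0.1], C = [0.5, 0.6], epsilon = [0.05],
# all initially unclassified. The pessimistic Pareto front of the lower
# bounds is A's low (1.0); with the relative tolerance 0.05 * 1.0, the
# threshold 1.05 epsilon-dominates the upper bounds of B (0.1) and C (0.6),
# so both are discarded. No remaining upper bound dominates A's low plus
# tolerance (1.05), so A is classified as Pareto optimal.
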
@jit(nopython=True)
def _get_max_wt( # pylint:disable=too-many-arguments
rectangle_lows: np.array,
rectangle_ups: np.array,
means: np.array,
pareto_optimal_t: np.array,
unclassified_t: np.array,
sampled: np.array,
pooling_method: str = "fro",
use_coef_var: bool = True,
) -> int:
"""Returns the index in design space with the maximum size of the hyperrectangle
(scaled by the mean predictions, i.e., effectively,
we use the coefficient of variation).
Samples only from unclassified or Pareto-optimal points.
Args:
rectangle_lows (np.array): Lower, pessimistic, bounds of the hyperrectangles
rectangle_ups (np.array): Upper, optimistic, bounds of the hyperrectangles
means (np.array): Mean predictions
pareto_optimal_t (np.array): Mask array that is True
for the Pareto optimal points
unclassified_t (np.array): Mask array that is True for the unclassified points
sampled (np.array): Mask array that is True for the sampled points
        pooling_method (str): Switch to select the pooling method with which
the uncertainty in different objectives is aggregated into one number.
Available options are "fro" (Frobenius/Euclidean norm), "mean", "median"
use_coef_var (bool): If True, uses the coefficient of variation instead of
the unscaled rectangle sizes
Returns:
int: index with maximum size of hyperrectangle
"""
max_uncertainty = -np.inf
maxid = 0
pooling_method = pooling_method.lower()
for i in range(0, len(unclassified_t)): # pylint:disable=consider-using-enumerate
# Among the points x ∈ Pt ∪ Ut, the one with the largest wt(x)
# is chosen as the next sample xt to be evaluated.
# Intuitively, this rule biases the sampling towards exploring,
# and thus improving the model for, the points most likely to be Pareto-optimal.
if ((unclassified_t[i] == 1) or (pareto_optimal_t[i] == 1)) and not sampled[i] == 1:
# weight is the length of the diagonal of the uncertainty region
if use_coef_var:
uncer = np.divide(rectangle_ups[i, :] - rectangle_lows[i, :], means[i, :])
else:
uncer = rectangle_ups[i, :] - rectangle_lows[i, :]
uncertainty = _pool(uncer, pooling_method)
if uncertainty > max_uncertainty:
max_uncertainty = uncertainty
maxid = i
return maxid
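
# A sketch of the sampling rule above (hypothetical values): point 0 has the
# widest rectangle but is already sampled, point 1 has been discarded, so the
# unclassified point 2 is selected:
#
#   >>> _get_max_wt(
#   ...     np.array([[0.0], [0.0], [0.2]]),  # rectangle_lows
#   ...     np.array([[2.0], [1.5], [1.0]]),  # rectangle_ups
#   ...     np.array([[1.0], [1.0], [1.0]]),  # means
#   ...     np.array([True, False, False]),   # pareto_optimal_t
#   ...     np.array([False, False, True]),   # unclassified_t
#   ...     np.array([True, False, False]),   # sampled
#   ... )
#   2
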
@jit(nopython=True)
def _get_max_wt_all( # pylint:disable=too-many-arguments
rectangle_lows: np.array,
rectangle_ups: np.array,
means: np.array,
sampled: np.array,
pooling_method: str = "fro",
use_coef_var: bool = True,
) -> int:
"""Returns the index in design space with the maximum size of the hyperrectangle
(scaled by the mean predictions, i.e., effectively,
we use the coefficient of variation).
Samples from *all* unsampled points. This is different from the `_get_max_wt`
function that only samples from the Pareto-optimal or unclassified points that
have not been sampled yet.
Args:
rectangle_lows (np.array): Lower, pessimistic, bounds of the hyperrectangles
rectangle_ups (np.array): Upper, optimistic, bounds of the hyperrectangles
means (np.array): Mean predictions
sampled (np.array): Mask array that is True for the sampled points
        pooling_method (str): Switch to select the pooling method with which the
uncertainty in different objectives is aggregated into one number.
Available options are: "fro" (Frobenius/Euclidean norm), "mean", "median"
use_coef_var (bool): If True, uses the coefficient of variation instead of
the unscaled rectangle sizes
Returns:
int: index with maximum size of hyperrectangle
"""
max_uncertainty = -np.inf
maxid = 0
pooling_method = pooling_method.lower()
for i in range(0, len(sampled)): # pylint:disable=consider-using-enumerate
        # In contrast to _get_max_wt, we consider *all* unsampled points here,
        # not only those in Pt ∪ Ut. The point with the largest wt(x) is chosen
        # as the next sample xt to be evaluated. Intuitively, this rule biases
        # the sampling towards exploring the most uncertain points.
if not sampled[i] == 1:
# weight is the length of the diagonal of the uncertainty region
if use_coef_var:
uncer = np.divide(rectangle_ups[i, :] - rectangle_lows[i, :], means[i, :])
else:
uncer = rectangle_ups[i, :] - rectangle_lows[i, :]
uncertainty = _pool(uncer, pooling_method)
if uncertainty > max_uncertainty:
max_uncertainty = uncertainty
maxid = i
return maxid
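
# With the same hypothetical inputs as in the _get_max_wt sketch above, the
# discarded point 1 is also eligible here; its scaled width
# (1.5 - 0.0) / 1.0 = 1.5 exceeds that of point 2, (1.0 - 0.2) / 1.0 = 0.8,
# so the call would return 1 instead of 2:
#
#   >>> _get_max_wt_all(
#   ...     np.array([[0.0], [0.0], [0.2]]),  # rectangle_lows
#   ...     np.array([[2.0], [1.5], [1.0]]),  # rectangle_ups
#   ...     np.array([[1.0], [1.0], [1.0]]),  # means
#   ...     np.array([True, False, False]),   # sampled
#   ... )
#   1
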
@jit(nopython=True)
def _pool(array: np.ndarray, method: str) -> float:
"""Summarizes an array into one number
Args:
array (np.ndarray): array (,1) of numbers
method (str): Swtich to select the pooling method with which the uncertainty
in different objectives is aggregated into one number.
Available options are: "fro" (Frobenius/Euclidean norm), "mean", "median".
If the string cannot be matched, it defaults to the Frobenius norm.
Returns:
float: result of aggregation
"""
if method == "fro":
return np.linalg.norm(array)
if method == "mean":
return np.mean(array)
if method == "median":
return np.median(array)
return np.linalg.norm(array)
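
# A sketch of the pooling options with hypothetical values: for
# array = np.array([3.0, 4.0]), "fro" returns the Euclidean norm 5.0,
# "mean" and "median" both return 3.5, and an unrecognized method falls
# back to the Frobenius norm, 5.0.
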
def _uncertainty(rectangle_ups, rectangle_lows, means):
    """Hyperrectangle sizes"""
    if (rectangle_lows is None) or (rectangle_ups is None):
        raise ValueError(
            "You must have trained a model and "
            "run the prediction to calculate the hyperrectangles"
        )
coeff_var = np.nan_to_num(np.divide(rectangle_ups - rectangle_lows, means))
uncertainty = np.linalg.norm(coeff_var, axis=1)
return uncertainty
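
# A short sketch of _uncertainty with hypothetical values: for
# rectangle_ups = [[1.0, 2.0]], rectangle_lows = [[0.0, 1.0]] and
# means = [[0.5, 0.5]], the coefficients of variation are [2.0, 2.0]
# and the returned per-point norm is sqrt(8), roughly 2.83.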