kjappelbaum/pyepal

View on GitHub
src/pyepal/pal/core.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
# pylint:disable=anomalous-backslash-in-string
# Copyright 2020 PyePAL authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Core functions for PAL"""
from typing import List, Sequence, Tuple, Union

import numpy as np
from numba import jit

from .utils import dominance_check_jitted_2, dominance_check_jitted_3, is_pareto_efficient

__all__: List[str] = []


def _get_uncertainty_region(  # pylint:disable=invalid-name
    mu: np.array, std: np.array, beta_sqrt: float
) -> Tuple[np.array, np.array]:
    """

    Args:
        mu (float): mean
        std (float): standard deviation
        beta_sqrt (float): scaling factor

    Returns:
        Tuple[float, float]: lower bound, upper bound
    """
    low_lim, high_lim = mu - beta_sqrt * std, mu + beta_sqrt * std
    return low_lim, high_lim


def _get_uncertainty_regions(
    mus: np.array, stds: np.array, beta_sqrt: float
) -> Union[np.array, np.array]:
    """
    Compute the lower and upper bound of the uncertainty region
         for each dimension (=target)

    Args:
        mus (np.array): means
        stds (np.array): standard deviations
        beta_sqrt (float): scaling factors

    Returns:
        Union[np.array, np.array]: lower bounds, upper bounds
    """
    low_lims, high_lims = [], []

    for i in range(0, mus.shape[1]):
        low_lim, high_lim = _get_uncertainty_region(mus[:, i], stds[:, i], beta_sqrt)
        low_lims.append(low_lim.reshape(-1, 1))
        high_lims.append(high_lim.reshape(-1, 1))

    return np.hstack(low_lims), np.hstack(high_lims)


def _union(
    lows: np.array, ups: np.array, new_lows: np.array, new_ups: np.array
) -> Union[np.array, np.array]:
    """Performing iterative intersection (eq. 6 in PAL paper) in all dimensions.

    Args:
        lows (np.array): lower bounds from previous iteration
        ups (np.array): upper bounds from previous iteration
        new_lows (np.array): lower bounds from current iteration
        new_ups (np.array): upper bounds from current iteration

    Returns:
        Union[np.array, np.array]: lower bounds, upper bounds
    """
    out_lows = []
    out_ups = []

    for i in range(0, lows.shape[1]):
        low, up = _union_one_dim(  # pylint:disable=invalid-name
            lows[:, i], ups[:, i], new_lows[:, i], new_ups[:, i]
        )
        out_lows.append(low.reshape(-1, 1))
        out_ups.append(up.reshape(-1, 1))

    out_lows_array, out_ups_array = np.hstack(out_lows), np.hstack(out_ups)

    return out_lows_array, out_ups_array


@jit(nopython=True)
def _union_one_dim(
    lows: Sequence, ups: Sequence, new_lows: Sequence, new_ups: Sequence
) -> Tuple[np.array, np.array]:
    """Used to intersect the confidence regions, for eq. 6 of the PAL paper.
    The iterative intersection ensures that all uncertainty regions
    are non-increasing with t.

    We do not check for the ordering in this function.
    We really assume that the lower limits are the lower limits
    and the upper limits are the upper limits.

    All arrays must have the same length.

    Args:
        lows (Sequence): lower bounds from previous iteration
        ups (Sequence): upper bounds from previous iteration
        new_lows (Sequence): lower bounds from current iteration
        new_ups (Sequence): upper bounds from current iteration

    Returns:
        Tuple[np.array, np.array]: array of lower limits, array of upper limits
    """
    out_lows = []
    out_ups = []

    for i, low in enumerate(lows):
        # In one dimension we can imagine the following cases where there
        # is zero intersection
        # 1) |--old range--|   |--new range--|,
        # i.e., lower new limit above old upper limit
        # 2) |--new range--|   |--old range--|,
        # i.e., upper new limit below lower old limit
        if (new_lows[i] >= ups[i]) or (new_ups[i] <= low):
            out_lows.append(new_lows[i])
            out_ups.append(new_ups[i])

        # In other cases, we want to intersect the ranges, i.e.
        # |---old range-|-|--new-range--| --> |-|
        # i.e. we take the max of the lower limits and the min of the upper limits
        else:
            out_lows.append(max(low, new_lows[i]))
            out_ups.append(min(ups[i], new_ups[i]))

    return np.array(out_lows), np.array(out_ups)


def _pareto_classify(  # pylint:disable=too-many-arguments, too-many-locals, too-many-branches
    pareto_optimal_0: np.array,
    not_pareto_optimal_0: np.array,
    unclassified_0: np.array,
    rectangle_lows: np.array,
    rectangle_ups: np.array,
    epsilon: np.array,
    is_fixed_epsilon: bool = False,
) -> Tuple[np.array, np.array, np.array]:
    """Performs the classification of the algorithm
    (p. 4 of the PAL paper, see algorithm 1/2 of the epsilon-PAL paper)

    One core concept is that once a point is classified,
    it does no longer change the class.

    When we do the comparison with +/- epsilon we always use the absolute values!
    Otherwise, we get inconcistent results depending on the sign!

    Args:
        pareto_optimal_0 (np.array): boolean mask of points classified as Pareto optimal
        not_pareto_optimal_0 (np.array): boolean mask of points
            classified as non-Pareto optimal
        unclassified_0 (np.array): boolean mask of unclassified points
        rectangle_lows (np.array): lower uncertainty boundaries
        rectangle_ups (np.array): upper uncertainty boundaries
        epsilon (np.array): granularity parameter (one per dimension)
        is_fixed_epsilon (bool): If true it assumes that epsilon contains *absolute*
            tolerance values for every objective. These would typically be calculated as
            :math:`\epsilon_i = \varepsilon_i \cdot r_i`, where :math:`r_i` is the range
            of objective :math:`i`. By default this is False. This is, we will use
            :math:`\epsilon_i = \varepsilon_i \cdot y_i` to compute the objectives,
            which hence avoids the need of knowing the range of a objective before
            using the algorithm.

    Returns:
        Tuple[list, list, list]: binary encoded list of Pareto optimal,
            non-Pareto optimal and unclassified points
    """
    pareto_optimal_t = pareto_optimal_0.copy()
    not_pareto_optimal_t = not_pareto_optimal_0.copy()
    unclassified_t = unclassified_0.copy()

    # This part is only relevant when we have points in set P
    # Then we can use those points to discard points from p_pess (P \cup U)
    if sum(pareto_optimal_0) > 0:
        pareto_indices = np.where(pareto_optimal_0)[0]
        pareto_pessimistic_lows = rectangle_lows[pareto_indices]  # p_pess(P)

        if is_fixed_epsilon:
            tolerances_0 = np.tile(epsilon, (len(pareto_pessimistic_lows), 1))
        else:
            tolerances_0 = np.abs(epsilon * pareto_pessimistic_lows)

        for i in range(0, len(unclassified_0)):
            if unclassified_t[i] == 1:
                # discard if any lower-bound epsilon dominates the upper bound
                if dominance_check_jitted_2(
                    pareto_pessimistic_lows + tolerances_0,
                    rectangle_ups[i],
                ):
                    not_pareto_optimal_t[i] = True
                    unclassified_t[i] = False

    pareto_unclassified_indices = np.where(pareto_optimal_0 | unclassified_t)[0]
    pareto_unclassified_lows = rectangle_lows[pareto_unclassified_indices]

    # assuming maximization
    pareto_unclassified_pessimistic_mask = is_pareto_efficient(-pareto_unclassified_lows)
    original_indices = pareto_unclassified_indices[pareto_unclassified_pessimistic_mask]
    pareto_unclassified_pessimistic_points = pareto_unclassified_lows[
        pareto_unclassified_pessimistic_mask
    ]

    if is_fixed_epsilon:
        tolerances_1 = np.tile(epsilon, (len(pareto_unclassified_pessimistic_points), 1))
    else:
        tolerances_1 = epsilon * np.abs(pareto_unclassified_pessimistic_points)

    for i in range(0, len(unclassified_t)):  # pylint:disable=consider-using-enumerate
        # We can only discard points that are unclassified so far
        # We cannot discard points that are part of p_pess(P \cup U)
        if unclassified_t[i] and (i not in original_indices):
            # discard if any lower-bound epsilon dominates the upper bound
            if dominance_check_jitted_2(
                tolerances_1 + pareto_unclassified_pessimistic_points,
                rectangle_ups[i],
            ):
                not_pareto_optimal_t[i] = True
                unclassified_t[i] = False

    # now, update the pareto set
    # if there is no other point x' such that max(Rt(x')) >= min(Rt(x))
    # move x to Pareto
    unclassified_indices = np.where(unclassified_t | pareto_optimal_t)[0]
    unclassified_ups = rectangle_ups[unclassified_indices]

    index_map = {index: i for i, index in enumerate(unclassified_indices)}

    if is_fixed_epsilon:
        tolerances_2 = np.tile(epsilon, (len(rectangle_lows), 1))
    else:
        tolerances_2 = epsilon * np.abs(rectangle_lows)

    # The index map helps us to mask the current point from the unclassified_ups list
    for i in range(0, len(unclassified_t)):  # pylint:disable=consider-using-enumerate
        # again, we only care about unclassified points
        if unclassified_t[i]:
            # If there is no other point which up is epsilon dominating
            # the low of the current point,
            # the current point is epsilon-accurate Pareto optimal
            if not dominance_check_jitted_3(
                unclassified_ups,
                rectangle_lows[i] + tolerances_2[i],
                index_map[i],
            ):
                pareto_optimal_t[i] = True
                unclassified_t[i] = False

    return pareto_optimal_t, not_pareto_optimal_t, unclassified_t


@jit(nopython=True)
def _get_max_wt(  # pylint:disable=too-many-arguments
    rectangle_lows: np.array,
    rectangle_ups: np.array,
    means: np.array,
    pareto_optimal_t: np.array,
    unclassified_t: np.array,
    sampled: np.array,
    pooling_method: str = "fro",
    use_coef_var: bool = True,
) -> int:
    """Returns the index in design space with the maximum size of the hyperrectangle
    (scaled by the mean predictions, i.e., effectively,
    we use the coefficient of variation).
    Samples only from unclassified or Pareto-optimal points.

    Args:
        rectangle_lows (np.array): Lower, pessimistic, bounds of the hyperrectangles
        rectangle_ups (np.array): Upper, optimistic, bounds of the hyperrectangles
        means (np.array): Mean predictions
        pareto_optimal_t (np.array): Mask array that is True
            for the Pareto optimal points
        unclassified_t (np.array): Mask array that is True for the unclassified points
        sampled (np.array): Mask array that is True for the sampled points
        pooling_method (str): Swtich to select the pooling method with which
            the uncertainty in different objectives is aggregated into one number.
            Available options are "fro" (Frobenius/Euclidean norm), "mean", "median"
        use_coef_var (bool): If True, uses the coefficient of variation instead of
            the unscaled rectangle sizes

    Returns:
        int: index with maximum size of hyperrectangle
    """
    max_uncertainty = -np.inf
    maxid = 0

    pooling_method = pooling_method.lower()

    for i in range(0, len(unclassified_t)):  # pylint:disable=consider-using-enumerate
        # Among the points x ∈ Pt ∪ Ut, the one with the largest wt(x)
        # is chosen as the next sample xt to be evaluated.
        # Intuitively, this rule biases the sampling towards exploring,
        # and thus improving the model for, the points most likely to be Pareto-optimal.
        if ((unclassified_t[i] == 1) or (pareto_optimal_t[i] == 1)) and not sampled[i] == 1:
            # weight is the length of the diagonal of the uncertainty region
            if use_coef_var:
                uncer = np.divide(rectangle_ups[i, :] - rectangle_lows[i, :], means[i, :])
            else:
                uncer = rectangle_ups[i, :] - rectangle_lows[i, :]

            uncertainty = _pool(uncer, pooling_method)
            if uncertainty > max_uncertainty:
                max_uncertainty = uncertainty
                maxid = i

    return maxid


@jit(nopython=True)
def _get_max_wt_all(  # pylint:disable=too-many-arguments
    rectangle_lows: np.array,
    rectangle_ups: np.array,
    means: np.array,
    sampled: np.array,
    pooling_method: str = "fro",
    use_coef_var: bool = True,
) -> int:
    """Returns the index in design space with the maximum size of the hyperrectangle
    (scaled by the mean predictions, i.e., effectively,
    we use the coefficient of variation).
    Samples from *all* unsampled points. This is different from the `_get_max_wt`
    function that only samples from the Pareto-optimal or unclassified points that
    have not been sampled yet.

    Args:
        rectangle_lows (np.array): Lower, pessimistic, bounds of the hyperrectangles
        rectangle_ups (np.array): Upper, optimistic, bounds of the hyperrectangles
        means (np.array): Mean predictions
        sampled (np.array): Mask array that is True for the sampled points
        pooling_method (str): Swtich to select the pooling method with which the
            uncertainty in different objectives is aggregated into one number.
            Available options are: "fro" (Frobenius/Euclidean norm), "mean", "median"
        use_coef_var (bool): If True, uses the coefficient of variation instead of
            the unscaled rectangle sizes

    Returns:
        int: index with maximum size of hyperrectangle
    """
    max_uncertainty = -np.inf
    maxid = 0

    pooling_method = pooling_method.lower()

    for i in range(0, len(sampled)):  # pylint:disable=consider-using-enumerate
        # Among the points x ∈ Pt ∪ Ut, the one with the largest wt(x)
        # is chosen as the next sample xt to be evaluated.
        # Intuitively, this rule biases the sampling towards exploring,
        # and thus improving the model for, the points most likely to be Pareto-optimal.
        if not sampled[i] == 1:
            # weight is the length of the diagonal of the uncertainty region
            if use_coef_var:
                uncer = np.divide(rectangle_ups[i, :] - rectangle_lows[i, :], means[i, :])
            else:
                uncer = rectangle_ups[i, :] - rectangle_lows[i, :]
            uncertainty = _pool(uncer, pooling_method)
            if uncertainty > max_uncertainty:
                max_uncertainty = uncertainty
                maxid = i

    return maxid


@jit(nopython=True)
def _pool(array: np.ndarray, method: str) -> float:
    """Summarizes an array into one number

    Args:
        array (np.ndarray): array (,1) of numbers
        method (str): Swtich to select the pooling method with which the uncertainty
            in different objectives is aggregated into one number.
            Available options are: "fro" (Frobenius/Euclidean norm), "mean", "median".
            If the string cannot be matched, it defaults to the Frobenius norm.

    Returns:
        float: result of aggregation
    """
    if method == "fro":
        return np.linalg.norm(array)
    if method == "mean":
        return np.mean(array)
    if method == "median":
        return np.median(array)
    return np.linalg.norm(array)


def _uncertainty(rectangle_ups, rectangle_lows, means):
    """Hyperrectangle sizes"""
    if (rectangle_lows is None) or (rectangle_ups is None):
        raise ValueError(
            "You must have trained a model and\
             run the prediction to calculate hyperrectangle"
        )
    coeff_var = np.nan_to_num(np.divide(rectangle_ups - rectangle_lows, means))
    uncertainty = np.linalg.norm(coeff_var, axis=1)
    return uncertainty