SimonBlanke/Gradient-Free-Optimizers

View on GitHub
gradient_free_optimizers/optimizers/smb_opt/smbo.py

Summary

Maintainability
A
1 hr
Test Coverage
# Author: Simon Blanke
# Email: simon.blanke@yahoo.com
# License: MIT License


from ..local_opt import HillClimbingOptimizer
from .sampling import InitialSampler

import numpy as np

np.seterr(divide="ignore", invalid="ignore")


class SMBO(HillClimbingOptimizer):
    def __init__(
        self,
        *args,
        warm_start_smbo=None,
        max_sample_size=10000000,
        sampling={"random": 1000000},
        **kwargs
    ):
        super().__init__(*args, **kwargs)

        self.warm_start_smbo = warm_start_smbo
        self.max_sample_size = max_sample_size
        self.sampling = sampling

        self.sampler = InitialSampler(self.conv, max_sample_size)

        self.init_warm_start_smbo(warm_start_smbo)

    def init_warm_start_smbo(self, search_data):
        if search_data is not None:
            # filter out nan and inf
            warm_start_smbo = search_data[
                ~search_data.isin([np.nan, np.inf, -np.inf]).any(axis=1)
            ]

            # filter out elements that are not in search space
            int_idx_list = []
            for para_name in self.conv.para_names:
                search_data_dim = warm_start_smbo[para_name].values
                search_space_dim = self.conv.search_space[para_name]

                int_idx = np.nonzero(np.in1d(search_data_dim, search_space_dim))[0]
                int_idx_list.append(int_idx)

            intersec = int_idx_list[0]
            for int_idx in int_idx_list[1:]:
                intersec = np.intersect1d(intersec, int_idx)
            warm_start_smbo_f = warm_start_smbo.iloc[intersec]

            X_sample_values = warm_start_smbo_f[self.conv.para_names].values
            Y_sample = warm_start_smbo_f["score"].values

            self.X_sample = self.conv.values2positions(X_sample_values)
            self.Y_sample = list(Y_sample)

        else:
            self.X_sample = []
            self.Y_sample = []

    def track_X_sample(iterate):
        def wrapper(self, *args, **kwargs):
            pos = iterate(self, *args, **kwargs)
            self.X_sample.append(pos)
            return pos

        return wrapper

    def track_y_sample(evaluate):
        def wrapper(self, score):
            evaluate(self, score)

            if np.isnan(score) or np.isinf(score):
                del self.X_sample[-1]
            else:
                self.Y_sample.append(score)

        return wrapper

    def _sampling(self, all_pos_comb):
        if self.sampling is False:
            return all_pos_comb
        elif "random" in self.sampling:
            return self.random_sampling(all_pos_comb)

    def random_sampling(self, pos_comb):
        n_samples = self.sampling["random"]
        n_pos_comb = pos_comb.shape[0]

        if n_pos_comb <= n_samples:
            return pos_comb
        else:
            _idx_sample = np.random.choice(n_pos_comb, n_samples, replace=False)
            pos_comb_sampled = pos_comb[_idx_sample, :]
            return pos_comb_sampled

    def _all_possible_pos(self):
        pos_space = self.sampler.get_pos_space()
        n_dim = len(pos_space)
        all_pos_comb = np.array(np.meshgrid(*pos_space)).T.reshape(-1, n_dim)

        all_pos_comb_constr = []
        for pos in all_pos_comb:
            if self.conv.not_in_constraint(pos):
                all_pos_comb_constr.append(pos)

        all_pos_comb_constr = np.array(all_pos_comb_constr)
        return all_pos_comb_constr

    def memory_warning(self, max_sample_size):
        if (
            self.conv.search_space_size > self.warnings
            and max_sample_size > self.warnings
        ):
            warning_message0 = "\n Warning:"
            warning_message1 = (
                "\n search space size of "
                + str(self.conv.search_space_size)
                + " exceeding recommended limit."
            )
            warning_message3 = "\n Reduce search space size for better performance."
            print(warning_message0 + warning_message1 + warning_message3)

    @track_X_sample
    def init_pos(self):
        return super().init_pos()

    @HillClimbingOptimizer.track_new_pos
    @track_X_sample
    def iterate(self):
        return self._propose_location()

    @HillClimbingOptimizer.track_new_score
    @track_y_sample
    def evaluate(self, score_new):
        self._evaluate_new2current(score_new)
        self._evaluate_current2best()

    @HillClimbingOptimizer.track_new_score
    @track_y_sample
    def evaluate_init(self, score_new):
        self._evaluate_new2current(score_new)
        self._evaluate_current2best()

    def _propose_location(self):
        try:
            self._training()
        except ValueError:
            print(
                "Warning: training sequential model failed. Performing random iteration instead."
            )
            return self.move_random()

        exp_imp = self._expected_improvement()

        index_best = list(exp_imp.argsort()[::-1])
        all_pos_comb_sorted = self.pos_comb[index_best]
        pos_best = all_pos_comb_sorted[0]

        return pos_best