BlueBrain/BluePyOpt
bluepyopt/deapext/optimisationsCMA.py

"""CMA Optimisation class"""

"""
Copyright (c) 2016-2022, EPFL/Blue Brain Project

 This file is part of BluePyOpt <https://github.com/BlueBrain/BluePyOpt>

 This library is free software; you can redistribute it and/or modify it under
 the terms of the GNU Lesser General Public License version 3.0 as published
 by the Free Software Foundation.

 This library is distributed in the hope that it will be useful, but WITHOUT
 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 details.

 You should have received a copy of the GNU Lesser General Public License
 along with this library; if not, write to the Free Software Foundation, Inc.,
 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""

import logging
import numpy
import pickle
import random
import functools
import shutil
import os
import time

import deap.tools

from .CMA_SO import CMA_SO
from .CMA_MO import CMA_MO
from . import utils

import bluepyopt.optimisations

logger = logging.getLogger("__main__")


def _ind_convert_space(ind, convert_fcn):
    """util function to pass the individual from normalized to real space and
    inversely"""
    return [f(x) for f, x in zip(convert_fcn, ind)]


class DEAPOptimisationCMA(bluepyopt.optimisations.Optimisation):

    """Optimisation class for CMA-based evolution strategies"""

    def __init__(
        self,
        evaluator=None,
        use_scoop=False,
        seed=1,
        offspring_size=None,
        centroids=None,
        sigma=0.4,
        map_function=None,
        hof=None,
        selector_name="single_objective",
        weight_hv=0.5,
        fitness_reduce=numpy.sum,
        use_stagnation_criterion=True,
    ):
        """Constructor

        Args:
            evaluator (Evaluator): Evaluator object
            use_scoop (bool): use scoop map for parallel computation
            seed (int): Random number generator seed
            offspring_size (int): Number of offspring individuals in each
                generation
            centroids (list): list of initial guesses used as the starting
                points of the CMA-ES
            sigma (float): initial standard deviation of the distribution
            map_function (function): Function used to map (parallelize) the
                evaluation function calls
            hof (hof): Hall of Fame object
            selector_name (str): The selector used in the evolutionary
                algorithm, possible values are 'single_objective' or
                'multi_objective'
            weight_hv (float): between 0 and 1. Weight given to the
                hyper-volume contribution when computing the score of an
                individual in MO-CMA. The weight of the fitness contribution
                is computed as 1 - weight_hv.
            fitness_reduce (fcn): function used to reduce the objective values
                to a single fitness score
            use_stagnation_criterion (bool): whether to use the stagnation
                stopping criterion on top of the maximum generation criterion
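
        Example (illustrative; evaluator is assumed to be an Evaluator object
        exposing params and objectives, as described above):

            optimisation = DEAPOptimisationCMA(
                evaluator=evaluator,
                offspring_size=20,
                selector_name="single_objective",
                seed=42,
            )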
        """

        super(DEAPOptimisationCMA, self).__init__(evaluator=evaluator)

        self.use_scoop = use_scoop
        self.seed = seed
        self.map_function = map_function

        self.hof = hof
        if self.hof is None:
            self.hof = deap.tools.HallOfFame(10)

        self.offspring_size = offspring_size

        self.fitness_reduce = fitness_reduce
        self.centroids = centroids
        self.sigma = sigma

        if weight_hv > 1.0 or weight_hv < 0.0:
            raise Exception("weight_hv has to be between 0 and 1.")
        self.weight_hv = weight_hv

        self.selector_name = selector_name
        if self.selector_name == "single_objective":
            self.cma_creator = CMA_SO
        elif self.selector_name == "multi_objective":
            self.cma_creator = CMA_MO
        else:
            raise Exception(
                "The selector_name has to be 'single_objective' "
                "or 'multi_objective'. Not "
                "{}".format(self.selector_name)
            )

        self.use_stagnation_criterion = use_stagnation_criterion

        # Number of parameters
        self.problem_size = len(self.evaluator.params)

        # Number of objective values
        self.ind_size = len(self.evaluator.objectives)

        # Create a DEAP toolbox
        self.toolbox = deap.base.Toolbox()

        # Bounds for the parameters
        self.lbounds = [p.lower_bound for p in self.evaluator.params]
        self.ubounds = [p.upper_bound for p in self.evaluator.params]

        # Instantiate functions converting individuals from the original
        # parameter space to (and from) a normalized space bounded by [-1, 1]
        self.ubounds = numpy.asarray(self.ubounds)
        self.lbounds = numpy.asarray(self.lbounds)
        bounds_radius = (self.ubounds - self.lbounds) / 2.0
        bounds_mean = (self.ubounds + self.lbounds) / 2.0

        self.to_norm = []
        self.to_space = []
        for r, m in zip(bounds_radius, bounds_mean):
            self.to_norm.append(
                functools.partial(
                    lambda param, bm, br: (param - bm) / br,
                    bm=m,
                    br=r)
            )
            self.to_space.append(
                functools.partial(
                    lambda param, bm, br: (param * br) + bm,
                    bm=m,
                    br=r
                )
            )
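        # For example (illustrative values): with lower bound 0. and upper
        # bound 10., bounds_mean is 5. and bounds_radius is 5., so a parameter
        # value of 7.5 maps to (7.5 - 5.) / 5. = 0.5 in normalized space, and
        # 0.5 maps back to 0.5 * 5. + 5. = 7.5 in the original space.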

        # Overwrite the bounds with -1. and 1.
        self.lbounds = numpy.full(self.problem_size, -1.0)
        self.ubounds = numpy.full(self.problem_size, 1.0)

        self.setup_deap()

        # In case initial guesses were provided, rescale them to the norm space
        if self.centroids is not None:
            self.centroids = [
                self.toolbox.Individual(_ind_convert_space(ind, self.to_norm))
                for ind in centroids
            ]

    def setup_deap(self):
        """Set up optimisation"""

        # Set random seed
        random.seed(self.seed)
        numpy.random.seed(self.seed)

        # Register the 'uniform' function
        self.toolbox.register(
            "uniformparams",
            utils.uniform,
            self.lbounds,
            self.ubounds,
            self.ind_size
        )

        # Register the individual format
        self.toolbox.register(
            "Individual",
            functools.partial(
                utils.WSListIndividual,
                obj_size=self.ind_size,
                reduce_fcn=self.fitness_reduce,
            ),
        )

        # A random individual is created with the 'Individual' format and its
        # parameters are initially drawn by 'uniformparams'
        self.toolbox.register(
            "RandomInd",
            deap.tools.initIterate,
            self.toolbox.Individual,
            self.toolbox.uniformparams,
        )

        # Register the population format. It is a list of individuals
        self.toolbox.register(
            "population", deap.tools.initRepeat, list, self.toolbox.RandomInd
        )

        # Register the evaluation function for the individuals
        self.toolbox.register(
            "evaluate",
            self.evaluator.init_simulator_and_evaluate_with_lists
        )

        import copyreg
        import types

        copyreg.pickle(types.MethodType, utils.reduce_method)

        if self.use_scoop:
            if self.map_function:
                raise Exception(
                    "Impossible to use scoop is providing self defined map "
                    "function: %s" % self.map_function
                )

            from scoop import futures

            self.toolbox.register("map", futures.map)

        elif self.map_function:
            self.toolbox.register("map", self.map_function)

    def run(
        self,
        max_ngen=0,
        cp_frequency=1,
        cp_period=None,
        continue_cp=False,
        cp_filename=None,
        terminator=None,
    ):
        """ Run the optimizer until a stopping criteria is met.

        Args:
            max_ngen(int): Total number of generation to run
            cp_frequency(int): generations between checkpoints
            cp_period(float): minimum time (in s) between checkpoint.
                None to save checkpoint independently of the time between them
            continue_cp(bool): whether to continue
            cp_filename(string): path to checkpoint filename
            terminator (multiprocessing.Event): exit loop when is set.
                Not taken into account if None.
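
        Example (illustrative; optimisation is an instance of this class and
        the checkpoint path is a placeholder):

            pop, hof, logbook, history = optimisation.run(
                max_ngen=100,
                cp_filename="./cma_checkpoint.pkl",
                cp_frequency=10,
            )

            # Resume from the checkpoint written above
            optimisation.run(
                max_ngen=100,
                continue_cp=True,
                cp_filename="./cma_checkpoint.pkl",
            )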
        """
        if cp_filename:
            cp_filename_tmp = cp_filename + '.tmp'

        stats = self.get_stats()

        if continue_cp:

            # A checkpoint file name has been given, so load the data from it
            with open(cp_filename, "rb") as checkpoint_file:
                cp = pickle.load(checkpoint_file)
            gen = cp["generation"]
            self.hof = cp["halloffame"]
            logbook = cp["logbook"]
            history = cp["history"]
            random.setstate(cp["rndstate"])
            numpy.random.set_state(cp["np_rndstate"])
            CMA_es = cp["CMA_es"]
            CMA_es.map_function = self.map_function

        else:

            history = deap.tools.History()
            logbook = deap.tools.Logbook()
            logbook.header = ["gen", "nevals"] + stats.fields

            # Instantiate the CMA strategy centered on the centroids
            CMA_es = self.cma_creator(
                centroids=self.centroids,
                offspring_size=self.offspring_size,
                sigma=self.sigma,
                max_ngen=max_ngen,
                IndCreator=self.toolbox.Individual,
                RandIndCreator=self.toolbox.RandomInd,
                map_function=self.map_function,
                use_scoop=self.use_scoop,
                use_stagnation_criterion=self.use_stagnation_criterion,
            )

            if self.selector_name == "multi_objective":
                CMA_es.weight_hv = self.weight_hv
                to_evaluate = CMA_es.get_parents(self.to_space)
                fitness = self.toolbox.map(self.toolbox.evaluate, to_evaluate)
                fitness = list(map(list, fitness))
                CMA_es.set_fitness_parents(fitness)

            gen = 1

        pop = CMA_es.get_population(self.to_space)

        param_names = []
        if hasattr(self.evaluator, "param_names"):
            param_names = self.evaluator.param_names

        time_last_save = time.time()
        # Run until a termination criterion is met
        while utils.run_next_gen(CMA_es.active, terminator):
            logger.info("Generation {}".format(gen))

            # Generate the new populations
            n_out = CMA_es.generate_new_pop(
                lbounds=self.lbounds, ubounds=self.ubounds
            )
            logger.debug(
                "Number of individuals outside of bounds: {} ({:.2f}%)".format(
                    n_out,
                    100.0 * n_out / len(CMA_es.population)
                )
            )

            # Get all the individuals in the original space for evaluation
            to_evaluate = CMA_es.get_population(self.to_space)

            # Compute the fitness
            fitness = self.toolbox.map(self.toolbox.evaluate, to_evaluate)
            fitness = list(map(list, fitness))
            nevals = len(to_evaluate)
            CMA_es.set_fitness(fitness)

            # Update the hall of fame, history and logbook
            pop = CMA_es.get_population(self.to_space)
            utils.update_history_and_hof(self.hof, history, pop)
            record = utils.record_stats(stats, logbook, gen, pop, nevals)
            logger.info(logbook.stream)

            # Update the CMA strategy using the new fitness and check if
            # termination conditions were reached
            CMA_es.update_strategy()
            CMA_es.check_termination(gen)
            if (
                cp_filename and
                cp_frequency and
                gen % cp_frequency == 0 and
                (cp_period is None or time.time() - time_last_save > cp_period)
            ):

                # Map function shouldn't be pickled
                temp_mf = CMA_es.map_function
                CMA_es.map_function = None

                cp = dict(
                    population=pop,
                    generation=gen,
                    halloffame=self.hof,
                    history=history,
                    logbook=logbook,
                    rndstate=random.getstate(),
                    np_rndstate=numpy.random.get_state(),
                    CMA_es=CMA_es,
                    param_names=param_names,
                )
                with open(cp_filename_tmp, "wb") as checkpoint_file:
                    pickle.dump(cp, checkpoint_file)
                if os.path.isfile(cp_filename_tmp):
                    shutil.copy(cp_filename_tmp, cp_filename)
                    logger.debug("Wrote checkpoint to %s", cp_filename)

                CMA_es.map_function = temp_mf

                time_last_save = time.time()
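                # Example (illustrative): a saved checkpoint can later be
                # inspected offline, e.g. to retrieve the best individual:
                #     with open(cp_filename, "rb") as f:
                #         cp = pickle.load(f)
                #     best_individual = cp["halloffame"][0]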

            gen += 1

        return pop, self.hof, logbook, history

    def get_stats(self):
        """Get the stats that will be saved during optimisation"""
        stats = deap.tools.Statistics(key=lambda ind: ind.fitness.reduce)
        stats.register("avg", numpy.mean)
        stats.register("std", numpy.std)
        stats.register("min", numpy.min)
        stats.register("max", numpy.max)
        return stats