biosustain/optlang

View on GitHub
src/optlang/inspyred_interface.py

Summary

Maintainability
B
4 hrs
Test Coverage
# Copyright 2013 Novo Nordisk Foundation Center for Biosustainability,
# Technical University of Denmark.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Interface to the inspyred heuristic optimization framework

Wraps the GLPK solver by subclassing and extending :class:`Model`,
:class:`Variable`, and :class:`Constraint` from :mod:`interface`.
"""

import logging
import random
import types

log = logging.getLogger(__name__)
import sympy
import interface


class Variable(interface.Variable):
    def __init__(self, *args, **kwargs):
        super(Variable, self).__init__(*args, **kwargs)


class Objective(interface.Objective):
    """docstring for Objective"""

    def __init__(self, expression, *args, **kwargs):
        super(Objective, self).__init__(expression, *args, **kwargs)

    @property
    def value(self):
        return self._value

    def __str__(self):
        if isinstance(self.expression, sympy.Basic):
            return super(Objective, self).__str__()
        else:
            return self.expression.__str__()
            # return ' '.join((self.direction, str(self.expression)))

    @property
    def expression(self):
        return self._expression

    @expression.setter
    def expression(self, value):
        self._expression = value


class VariableBounder(object):
    """This class defines a inspyred like Bounder.__init__.py

    TODO: Make this work also for integer and binary type variables?
    """

    def __init__(self, model):
        self.model = model

    def __call__(self, candidate, args):
        variables = self.model.variables
        bounded_candidate = list()
        for c, variable in zip(candidate, variables):
            if variable.type == 'continuous':
                bounded_candidate.append(max(min(c, variable.ub), variable.lb))
            elif variable.type == 'integer':
                bounded_candidate.append(min(range(variable.lb, variable.ub + 1), key=lambda x: abs(x - c)))
            elif variable.type == 'binary':
                # print min([0, 1], key=lambda x: abs(x-c))
                bounded_candidate.append(min([0, 1], key=lambda x: abs(x - c)))
        return bounded_candidate


class Configuration(interface.EvolutionaryOptimizationConfiguration):
    """docstring for Configuration"""

    class SubConfiguration(object):
        pass

    def __init__(self, *args, **kwargs):
        super(Configuration, self).__init__(*args, **kwargs)
        self._algorithm = inspyred.ec.GA
        self._algorithm.terminator = [inspyred.ec.terminators.time_termination,
                                      inspyred.ec.terminators.generation_termination,
                                      inspyred.ec.terminators.evaluation_termination,
                                      inspyred.ec.terminators.diversity_termination,
                                      inspyred.ec.terminators.average_fitness_termination]
        self.pop_size = 100
        self.seeds = []
        self.max_generations = 1
        self.max_evaluations = None
        self.max_time = None

        self.selector_config = self.SubConfiguration()
        self.selector_config.num_selected = None
        self.selector_config.tournament_size = 2
        self.selector_config.num_elites = 0

        self.variator_config = self.SubConfiguration()
        self.variator_config.mutation_rate = .1
        self.variator_config.crossover_rate = 1.
        self.variator_config.num_crossover_points = 1

        self.topology_config = self.SubConfiguration()
        self.topology_config.neighborhood_size = 5

        self.swarm_config = self.SubConfiguration()
        self.swarm_config.inertia = 0.5
        self.swarm_config.cognitive_rate = 2.1
        self.swarm_config.social_rate = 2.1

    @property
    def selector(self):
        return self._algorithm.selector

    @selector.setter
    def selector(self, value):
        self.algorithm.selector = value

    @property
    def variator(self):
        return self._algorithm.variator

    @variator.setter
    def variator(self, value):
        self._algorithm.variator = value

    @property
    def replacer(self):
        return self._algorithm.replacer

    @replacer.setter
    def replacer(self, value):
        self._algorithm.replacer = value

    @property
    def migrator(self):
        return self._algorithm.migrator

    @migrator.setter
    def migrator(self, value):
        self._algorithm.migrator = value

    @property
    def archiver(self):
        return self._algorithm.archiver

    @archiver.setter
    def archiver(self, value):
        self._algorithm.archiver = value

    @property
    def observer(self):
        return self._algorithm.observer

    @observer.setter
    def observer(self, value):
        self._algorithm.observer = value

    @property
    def terminator(self):
        return self._algorithm.terminator

    @terminator.setter
    def terminator(self, value):
        self._algorithm.terminator = value

    @property
    def topology(self):
        return self._algorithm.topology

    @topology.setter
    def topology(self, value):
        if value == 'Ring':
            self._algorithm.topology = inspyred.swarm.topologies.ring_topology
        elif value == 'Star':
            self._algorithm.topology = inspyred.swarm.topologies.star_topology
        elif isinstance(value, types.FunctionType):
            self._algorithm.topology = value
        else:
            raise ValueError("%s is not a supported topology. Try 'Star' or 'Ring' instead.")

    @property
    def algorithm(self):
        return self._algorithm

    @algorithm.setter
    def algorithm(self, value):
        init = False
        try:
            previous_selector = self._algorithm.selector
            previous_variator = self._algorithm.variator
            previous_replacer = self._algorithm.replacer
            # previous_migrator = self._algorithm.migrator
            # previous_archiver = self._algorithm.archiver
            # previous_observer = self._algorithm.observer
            # previous_terminator = self._algorithm.terminator
        except AttributeError:
            init = True

        if value == "EvolutionaryComputation":
            self._algorithm = inspyred.ec.EvolutionaryComputation
        elif value == "GeneticAlgorithm" or value == "GA":
            self._algorithm = inspyred.ec.GA(random)
        elif value == "ParticleSwarmOptimization" or value == "PSO":
            self._algorithm = inspyred.swarm.PSO(random)
        elif value == "AntColonySystem" or value == "ACS":
            self._algorithm = inspyred.swarm.ACS(random)
        elif value == "EvolutionaryStrategy" or value == "ES":
            self._algorithm = inspyred.ec.ES(random)
        elif value == "DifferentialEvolutionaryAlgorithm" or value == "DEA":
            self._algorithm = inspyred.ec.DEA(random)
        elif value == "SimulatedAnnealing" or value == "SA":
            self._algorithm = inspyred.ec.SA(random)
        elif value == "NSGA2":
            self._algorithm = inspyred.emo.NSGA2(random)
        elif value == "PAES":
            self._algorithm = inspyred.emo.PAES(random)
        elif value == "Pareto":
            self._algorithm = inspyred.emo.Pareto(random)
        else:
            raise ValueError(
                "%s is not a supported. Try one of the following instead:"
                "'GeneticAlgorithm', 'ParticleSwarmOptimization', 'EvolutionaryStrategy'."
                "TODO: be more specific here")
        # self._algorithm.terminator = self._default_terminator
        if init is False:
            self._algorithm.selector = previous_selector
            self._algorithm.variator = previous_variator
            self._algorithm.replacer = previous_replacer
            # previous_migrator = self._algorithm.migrator
            # previous_archiver = self._algorithm.archiver
            # previous_observer = self._algorithm.observer
            # previous_terminator = self._algorithm.terminator
            # TODO: setting a new algorithm should recycle old variators, selectors etc.

    def _evolve_kwargs(self):
        """Filter None keyword arguments. Intended to be passed on to algorithm.evolve(...)"""
        valid_evolve_kwargs = (
            'max_generations', 'max_evaluations', 'pop_size', 'neighborhood_size', 'tournament_size', 'mutation_rate')
        filtered_evolve_kwargs = dict()
        for key in valid_evolve_kwargs:
            attr_value = getattr(self, key)
            if attr_value is not None:
                filtered_evolve_kwargs[key] = attr_value
        # return filtered_evolve_kwargs
        return {}


class Model(interface.Model):
    """Interface"""

    def __init__(self, algorithm=None, *args, **kwargs):
        super(Model, self).__init__(*args, **kwargs)
        self.configuration = Configuration()
        if algorithm is None:
            self.configuration.algorithm = "GA"
        else:
            self.configuration.algorithm = algorithm
        self._bounder = VariableBounder(self)
        self._generator = self._generator

    def _generator(self, random, args):
        individual = list()
        for variable in self.variables:
            if variable.type == 'continuous':
                individual.append(random.uniform(variable.lb, variable.ub))
            else:
                individual.append(random.choice(range(variable.lb, variable.ub + 1)))
        return individual

    def _evaluator(self, candidates, args):
        fitness = list()
        for candidate in candidates:
            substitution_dict = dict(zip(self.variables, candidate))
            if isinstance(self.objective.expression, sympy.Basic):
                fitness.append(self.objective.expression.subs(substitution_dict))
            else:
                fitness.append(self.objective.expression(substitution_dict))
        return fitness

    # @inspyred.ec.evaluators.evaluator
    # def _evaluate(self, candidate, args):
    #     substitution_dict = dict(zip(self.variables, candidate))
    #     try:
    #         fitness = self.objective.expression.subs(substitution_dict)
    #     except AttributeError:
    #         fitness = self.objective.expression(substitution_dict)
    #     return fitness

    def optimize(self, *args, **kwargs):
        # import pdb; pdb.set_trace();
        final_population = self.configuration.algorithm.evolve(
            generator=self._generator,
            evaluator=self._evaluator,
            bounder=self._bounder,
            pop_size=self.configuration.pop_size,
            maximize={'max': True, 'min': False}[self.objective.direction],
            max_generations=self.configuration.max_generations,
            max_evaluations=self.configuration.max_evaluations,
            neighborhood_size=self.configuration.topology_config.neighborhood_size,
            mutation_rate=self.configuration.variator_config.mutation_rate,
            tournament_size=self.configuration.selector_config.tournament_size
        )
        return final_population


if __name__ == '__main__':
    # from optlang.interface import Objective, Variable
    import numpy
    import inspyred

    x = Variable('x', lb=0, ub=2)
    y = Variable('y', lb=0, ub=2)
    rosenbrock_obj = Objective((1 - x) ** 2 + 100 * (y - x ** 2) ** 2, name="Rosenbrock function", direction='min')
    print("The rosenbrock function:", rosenbrock_obj)
    print("The global minimum at (x,y) = (1,1) is", rosenbrock_obj.expression.subs({x: 1, y: 1}))

    problem = Model(name='rosenbrock', algorithm='PSO')
    # problem = Model(name='rosenbrock')

    problem.objective = rosenbrock_obj


    def my_observer(population, num_generations, num_evaluations, args):
        best = max(population)
        print(('{0:6} -- {1} : {2}'.format(num_generations,
                                           best.fitness,
                                           str(best.candidate))))


    problem.configuration.max_generations = 100
    problem.configuration.terminator = inspyred.ec.terminators.generation_termination
    problem.configuration.observer = my_observer
    problem.configuration.selector = inspyred.ec.selectors.tournament_selection
    final_pop = problem.optimize()
    fitnesses = [individual.fitness for individual in final_pop]
    print(fitnesses)
    print("mean", numpy.mean(fitnesses))
    print("max", numpy.max(fitnesses))
    print("min", numpy.min(fitnesses))
    # print numpy.std(fitnesses)