optimiser/population/crossentropy.py
"""
Implement Cross-Entropy Method (CEM) for optimization.
References:
1) de Boer, Kroese, Mannor, and Rubinstein. A tutorial on the
cross-entropy method. Annals of Operations Research, 2005.
Author: Nikolay Lysenko
"""
import os
from typing import Any, Dict, Callable, List, Optional
import numpy as np
from optimiser.utils import map_in_parallel
def estimate_at_random_point(
target_fn: Callable[[np.ndarray], float],
mean: np.ndarray,
std: float,
n_trials: int,
aggregation_fn: Callable[[List[float]], float],
target_fn_kwargs: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]: # pragma: no cover # Actually, this function is covered in child process.
"""
Generate point from given distribution and estimate function at it.
:param target_fn:
function to be estimated at a random point (it can be stochastic;
if so, its aggregated value is estimated)
:param mean:
mean of multivariate Gaussian distribution from which random point
is drawn
:param std:
standard deviation of multivariate Gaussian distribution
from which random point is drawn
:param n_trials:
number of runs for function estimation (it makes sense to set it
to more than 1 only if `target_fn` is stochastic)
:param aggregation_fn:
function that aggregates results from multiple trials into a single value
:param target_fn_kwargs:
additional keyword arguments for `target_fn`
:return:
record with sampled point and aggregated value of `target_fn` at it
"""
# Reseed `np` in order to surely have independent results among processes.
np.random.seed(int.from_bytes(os.urandom(4), byteorder='little'))
epsilons = np.random.randn(*mean.shape)
params = mean + std * epsilons
target_fn_kwargs = target_fn_kwargs or {}
results = [target_fn(params, **target_fn_kwargs) for _ in range(n_trials)]
score = aggregation_fn(results)
entry = {'params': params, 'score': score}
return entry
def report_current_results(
population: int,
top_entries: List[Dict[str, Any]],
best_entry: Dict[str, Any]
) -> None: # pragma: no cover
"""
Print current results in a format of table row.
:param population:
number of a current population
:param top_entries:
top entries for a current population
:param best_entry:
the best entry amongst all populations
:return:
None
"""
top_scores = [x['score'] for x in top_entries]
avg_top_score = np.mean(top_scores)
res = f"{population:>10} | {avg_top_score:>26} | {best_entry['score']:>19}"
print(res)
def optimize_with_crossentropy_method(
target_fn: Callable[[np.ndarray], float],
n_populations: int,
population_size: int,
elite_fraction: float,
smoothing_coef: float,
initial_mean: np.ndarray,
std: float,
n_trials: int,
aggregation_fn: str = 'mean',
target_fn_kwargs: Optional[Dict[str, Any]] = None,
parallelling_params: Optional[Dict[str, Any]] = None,
verbose: bool = False
) -> np.ndarray:
"""
Optimize with Cross-Entropy Method (CEM).
:param target_fn:
function to be maximized (it can be stochastic; if so, its
aggregated value is maximized)
:param n_populations:
number of training steps at each of which a pool of candidates is
evaluated, best candidates are selected, and sampling distribution
is updated based on them
:param population_size:
number of candidates to draw and evaluate at each training step
:param elite_fraction:
share of best candidates that are used for training update
:param smoothing_coef:
coefficient of smoothing for updates of candidates mean
:param initial_mean:
mean of multivariate Gaussian distribution from which candidates
are drawn initially
:param std:
standard deviation of all multivariate Gaussian distributions
from which candidates are drawn (for each training step, there is its
own distribution with its own mean)
:param n_trials:
number of runs for each candidate evaluation (it makes sense to set it
to more than 1 only if `target_fn` is stochastic)
:param aggregation_fn:
name of function that aggregates results from multiple trials into
a single score of candidate ('min', 'mean', 'median', and 'max'
are supported)
:param target_fn_kwargs:
additional keyword arguments for `target_fn`
:param parallelling_params:
settings of parallel evaluation of candidates;
by default, number of processes is set to number of cores
and each worker is not replaced with a newer one after some number of
tasks are finished
:param verbose:
if it is set to `True`, current progress is printed
after every population
:return:
found optimal parameters
"""
name_to_fn = {
'min': min,
'mean': np.mean,
'median': np.median,
'max': max
}
agg_fn = name_to_fn[aggregation_fn]
n_top_candidates = round(elite_fraction * population_size)
current_mean = initial_mean
best_entry = {'params': initial_mean, 'score': -np.inf}
extra_axis = len(initial_mean.shape)
if verbose:
print("population | avg_score_over_current_top | global_best_score")
for i_population in range(n_populations):
args = [
(target_fn, current_mean, std, n_trials, agg_fn, target_fn_kwargs)
for _ in range(population_size)
]
pool_kwargs = parallelling_params or {}
entries = map_in_parallel(estimate_at_random_point, args, pool_kwargs)
sorted_entries = sorted(entries, key=lambda x: x['score'])
top_entries = sorted_entries[-n_top_candidates:]
top_params = [
np.expand_dims(x['params'], axis=extra_axis)
for x in top_entries
]
top_params = np.concatenate(top_params, axis=extra_axis)
new_mean = np.mean(top_params, axis=extra_axis)
current_mean = (
smoothing_coef * current_mean + (1 - smoothing_coef) * new_mean
)
new_best_entry = top_entries[-1]
if new_best_entry['score'] > best_entry['score']:
best_entry = new_best_entry
if verbose:
report_current_results(i_population, top_entries, best_entry)
found_params = best_entry['params']
return found_params