FilippoAiraldi/vectorized-particle-swarm

View on GitHub
src/vpso/ask_and_tell.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
import numba as nb
import numpy as np

from vpso.math import pso_equation
from vpso.mutation import mutate
from vpso.reparation import repair_out_of_bounds
from vpso.typing import Array1d, Array2d, Array3d


@nb.njit(
    nb.types.UniTuple(nb.float64[:, :, :], 2)(
        nb.float64[:, :, :],  # x
        nb.float64[:, :, :],  # px
        nb.float64[:, :],  # pf
        nb.float64[:, :, :],  # sx
        nb.float64[:, :, :],  # v
        nb.float64[:, :, :],  # v_max
        nb.float64[:, :, :],  # lb
        nb.float64[:, :, :],  # ub
        nb.int32,  # nvec
        nb.int32,  # dim
        nb.float64[:, :, :],  # w
        nb.float64[:, :, :],  # c1
        nb.float64[:, :, :],  # c2
        nb.int32,  # iters
        nb.bool_,  # perturb_best
        nb.float64,  # mutation_prob
        nb.types.NumPyRandomGeneratorType("NumPyRandomGeneratorType"),
    ),
    cache=True,
    nogil=True,
)
def generate_offsprings(
    x: Array3d,
    px: Array3d,
    pf: Array2d,
    sx: Array3d,
    v: Array3d,
    v_max: Array3d,
    lb: Array3d,
    ub: Array3d,
    nvec: int,
    dim: int,
    w: Array3d,
    c1: Array3d,
    c2: Array3d,
    repair_iters: int,
    perturb_best: bool,
    mutation_prob: float,
    np_random: np.random.Generator,
) -> tuple[Array3d, Array3d]:
    """Given the current swarm, generates the next generation of the particle swarm.

    Parameters
    ----------
    x : 3d array
        Current positions of the particles. An array of shape `(N, M, d)`, where `N` is
        the number of vectorized problems to solve simultaneously, `M` is the number of
        particles in the swarm, and `d` is the dimension of the search space.
    px : 3d array
        Best positions of the particles so far. An array of shape `(N, M, d)`.
    pf : 2d array
        Best values of the particles so far. An array of shape `(N, M)`.
    sx : 3d array
        Social best, i.e., the best particle so far. An array of shape `(N, 1, d)`.
    v : 3d array
        Current velocities of the particles. An array of shape `(N, M, d)`.
    v_max : 3d array
        Maximum velocities of the particles. An array of shape `(N, 1, d)`.
    lb : 3d array
        Lower bound of the search space. An array of shape `(N, 1, d)`.
    ub : 3d array
        Upper bound of the search space. An array of shape `(N, 1, d)`.
    nvec : int
        Number of vectorized problems.
    dim : int
        Dimensionality of the problem.
    w : 3d array
        Inertia weight. An array of shape `(N, 1, 1)`, where each element is used for
        the corresponding problem.
    c1 : 3d array
        Cognitive weight. An array of shape `(N, 1, 1)`, where each element is used for
        the corresponding problem.
    c2 : 3d array
        Social weight. An array of shape `(N, 1, 1)`, where each element is used for
        the corresponding problem.
    repair_iters : int
        Number of iterations to try to repair the particles before random sampling.
    perturb_best : bool
        Whether to perturb the best particle or not.
    mutation_prob : float
        Probability of applying the mutation.
    np_random : np.random.Generator
        Random number generator.

    Returns
    -------
    tuple of 3d arrays
        The positions and velocities of the particles in the next generation.
    """
    x_new, v_new = pso_equation(x, px, sx, v, v_max, w, c1, c2, np_random)
    x_new, v_new = repair_out_of_bounds(
        x, x_new, v_new, px, sx, v, v_max, lb, ub, w, c1, c2, repair_iters, np_random
    )
    if perturb_best:
        mutate(x_new, px, pf, lb, ub, nvec, dim, mutation_prob, np_random)
    return x_new, v_new


@nb.njit(
    nb.types.Tuple((nb.float64[:, :, :], nb.float64[:, :]))(
        nb.float64[:, :, :],  # x
        nb.float64[:, :],  # f
        nb.float64[:, :, :],  # px
        nb.float64[:, :],  # pf
    ),
    cache=True,
    nogil=True,
)
def advance_population(
    x: Array3d, f: Array2d, px: Array3d, pf: Array2d
) -> tuple[Array3d, Array2d]:
    """Advances the population by replacing the particles with better positions.

    Parameters
    ----------
    x : 3d array
        Current positions of the particles. An array of shape `(N, M, d)`, where `N` is
        the number of vectorized problems to solve simultaneously, `M` is the number of
        particles in the swarm, and `d` is the dimension of the search space.
    f : 2d array
        Current values of the particles. An array of shape `(N, M)`.
    px : 3d array
        Best positions of the particles so far. An array of shape `(N, M, d)`.
    pf : 2d array
        Best values of the particles so far. An array of shape `(N, M)`.

    Returns
    -------
    tuple of 3d and 2d arrays
        Returns the new positions and values of the best particles.
    """
    improvement_mask = f < pf
    px = np.where(improvement_mask[:, :, np.newaxis], x, px)
    pf = np.where(improvement_mask, f, pf)
    return px, pf


def get_best(px: Array3d, pf: Array2d, nvec: int) -> tuple[Array3d, Array1d]:
    """Returns the best particle and its value for each problem.

    Parameters
    ----------
    px : 3d array
        Best positions of the particles so far. An array of shape `(N, M, d)`, where `N`
        is the number of vectorized problems to solve simultaneously, `M` is the number
        of particles in the swarm, and `d` is the dimension of the search space.
    pf : 2d array
        Best values of the particles so far. An array of shape `(N, M)`.
    nvec : int
        Number of vectorized problems.

    Returns
    -------
    tuple of 3d and 1d arrays
        The best particle and its value for each problem with shape `(N, 1, d)` and
        `(N,)`, respectively.
    """
    idx = np.arange(nvec)
    k = pf.argmin(1)
    sx = px[idx, np.newaxis, k]  # (social/global) best particle
    sf = pf[idx, k]  # (social/global) best value
    return sx, sf