alexandrebarachant/pyRiemann

View on GitHub
pyriemann/datasets/sampling.py

Summary

Maintainability
D
1 day
Test Coverage
from functools import partial
import warnings
import numpy as np
from scipy.stats import multivariate_normal
from sklearn.utils import check_random_state
from joblib import Parallel, delayed

from ..utils.base import sqrtm, expm
from ..utils.test import is_sym_pos_semi_def as is_spsd


def _pdf_r(r, sigma):
    """Pdf for the log of eigenvalues of a SPD matrix.

    Probability density function for the logarithm of the eigenvalues of a SPD
    matrix samples from the Riemannian Gaussian distribution. See Said et al.
    "Riemannian Gaussian distributions on the space of symmetric positive
    definite matrices" (2017) for the mathematical details.

    Parameters
    ----------
    r : ndarray, shape (n_dim,)
        Vector with the logarithm of the eigenvalues of a SPD matrix.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.

    Returns
    -------
    p : float
        Probability density function applied to data point r.
    """

    if (sigma <= 0):
        raise ValueError(f'sigma must be a positive number (Got {sigma})')

    n_dim = len(r)
    partial_1 = -np.sum(r**2) / (2 * sigma**2)
    partial_2 = 0
    for i in range(n_dim):
        for j in range(i + 1, n_dim):
            partial_2 = partial_2 + np.log(np.sinh(np.abs(r[i] - r[j]) / 2))

    return np.exp(partial_1 + partial_2)


def _rejection_sampling_2D_gfunction_plus(sigma, r_sample):
    """Auxiliary function for the 2D rejection sampling algorithm.

    It is used in the case where r is sampled with the function g+.

    Parameters
    ----------
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    r_samples : ndarray, shape (1, n_dim)
        Sample of the r parameters of the Riemannian Gaussian distribution.

    Returns
    -------
    p : float
        Probability of acceptation.

    Notes
    -----
    .. versionadded:: 0.4

    """
    mu_a = np.array([-sigma**2 / 2, (sigma**2) / 2])
    cov_matrix = (sigma**2) * np.eye(2)
    m = np.pi * (sigma**2) * np.exp(sigma**2 / 4)
    if r_sample[0] >= r_sample[1]:
        num = _pdf_r(r_sample, sigma)
        den = multivariate_normal.pdf(r_sample, mean=mu_a, cov=cov_matrix)*m
        return num / den
    return 0


def _rejection_sampling_2D_gfunction_minus(sigma, r_sample):
    """Auxiliary function for the 2D rejection sampling algorithm.

    It is used in the case where r is sampled with the function g-.

    Parameters
    ----------
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    r_samples : ndarray, shape (1, n_dim)
        Sample of the r parameters of the Riemannian Gaussian distribution.

    Returns
    -------
    p : float
        Probability of acceptation.

    Notes
    -----
    .. versionadded:: 0.4

    """
    mu_b = np.array([(sigma**2) / 2, -sigma**2 / 2])
    cov_matrix = (sigma**2) * np.eye(2)
    m = np.pi * (sigma**2) * np.exp(sigma**2 / 4)
    if r_sample[0] < r_sample[1]:
        num = _pdf_r(r_sample, sigma)
        den = multivariate_normal.pdf(r_sample, mean=mu_b, cov=cov_matrix) * m
        return num / den
    return 0


def _rejection_sampling_2D(n_samples, sigma, random_state=None,
                           return_acceptance_rate=False):
    """Rejection sampling algorithm for the 2D case.

    Implementation of a rejection sampling algorithm. The implementation
    follows the description given in page 528 of Christopher Bishop's book
    "Pattern recognition and Machine Learning" (2006).

    Parameters
    ----------
    n_samples : int
        Number of samples to get from the target distribution.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    return_acceptance_rate : boolean, default=False
        Whether to return the acceptance rate with the sample (number of
        samples obtained divided by the number of samples generated by
        the algorithm).

        .. versionadded:: 0.5

    Returns
    -------
    r_samples : ndarray, shape (n_samples, n_dim)
        Samples of the r parameters of the Riemannian Gaussian distribution.
    acceptance_rate : float
        Acceptance rate empirically computed for the generation of the sample.
        Only returned if ``return_acceptance_rate=True``.

    Notes
    -----
    .. versionadded:: 0.4

    """
    mu_a = np.array([-sigma**2 / 2, (sigma**2) / 2])
    mu_b = np.array([(sigma**2) / 2, -sigma**2 / 2])
    cov_matrix = (sigma**2) * np.eye(2)
    r_samples = []
    cpt = 0
    acc = 0
    rs = check_random_state(random_state)
    while cpt != n_samples:
        acc += 1
        if (rs.binomial(1, 0.5, 1) == 1):
            r_sample = multivariate_normal.rvs(mu_a, cov_matrix, 1, rs)
            res = _rejection_sampling_2D_gfunction_plus(sigma, r_sample)
            if rs.rand(1) < res:
                r_samples.append(r_sample)
                cpt += 1
        else:
            r_sample = multivariate_normal.rvs(mu_b, cov_matrix, 1, rs)
            res = _rejection_sampling_2D_gfunction_minus(sigma, r_sample)
            if rs.rand(1) < res:
                r_samples.append(r_sample)
                cpt += 1

    if return_acceptance_rate:
        return np.array(r_samples), n_samples / acc
    return np.array(r_samples)


def _slice_one_sample(ptarget, x0, w, rs):
    """Slice sampling for one sample

    Parameters
    ----------
    ptarget : function with one input
        The target pdf to sample from or a multiple of it.
    x0 : array
        Initial state for the MCMC procedure. Note that the shape of this array
        defines the dimensionality n_dim of the data points to be sampled.
    w : float
        Initial bracket width.
    rs : int, RandomState instance or None
        Pass an int for reproducible output across multiple function calls.

    Returns
    -------
    sample : ndarray, shape (n_dim,)
        Sample from the target pdf.
    """
    xt = np.copy(x0)
    n_dim = len(x0)

    for i in range(n_dim):

        ei = np.zeros(n_dim)
        ei[i] = 1

        # step 1 : evaluate ptarget(xt)
        Px = ptarget(xt)

        # step 2 : draw vertical coordinate uprime ~ U(0, ptarget(xt))
        uprime_i = Px * rs.rand()

        # step 3 : create a horizontal interval (xl_i, xr_i) enclosing xt_i
        r = rs.rand()
        xl_i = xt[i] - r * w
        xr_i = xt[i] + (1-r) * w
        while ptarget(xt + (xl_i - xt[i]) * ei) > uprime_i:
            xl_i = xl_i - w
        while ptarget(xt + (xr_i - xt[i]) * ei) > uprime_i:
            xr_i = xr_i + w

        # step 4 : loop
        while True:
            xprime_i = xl_i + (xr_i - xl_i) * rs.rand()
            Px = ptarget(xt + (xprime_i - xt[i]) * ei)
            if Px > uprime_i:
                break
            else:
                if xprime_i > xt[i]:
                    xr_i = xprime_i
                else:
                    xl_i = xprime_i

        # store coordinate i of new sample
        xt = np.copy(xt)
        xt[i] = xprime_i

    return xt


def _slice_sampling(ptarget, n_samples, x0, n_burnin=20, thin=10,
                    random_state=None, n_jobs=1):
    """Slice sampling procedure.

    Implementation of a slice sampling algorithm for sampling from any target
    pdf or a multiple of it. The implementation follows the description given
    in page 375 of David McKay's book "Information Theory, Inference, and
    Learning Algorithms" (2003).

    Parameters
    ----------
    ptarget : function with one input
        The target pdf to sample from or a multiple of it.
    n_samples : int
        How many samples to get from the ptarget distribution.
    x0 : array
        Initial state for the MCMC procedure. Note that the shape of this array
        defines the dimensionality n_dim of the data points to be sampled.
    n_burnin : int, default=20
        How many samples to discard from the beginning of the chain generated
        by the slice sampling procedure. Usually the first samples are prone to
        non-stationary behavior and do not follow very well the target pdf.
    thin : int, default=10
        Thinning factor for the slice sampling procedure. MCMC samples are
        often correlated between them, so taking one sample every `thin`
        samples can help reducing this correlation. Note that this makes the
        algorithm actually sample `thin x n_samples` samples from the pdf, so
        expect the whole sampling procedure to take longer.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.

    Returns
    -------
    samples : ndarray, shape (n_samples, n_dim)
        Samples from the target pdf.
    """

    if (n_samples <= 0) or (not isinstance(n_samples, int)):
        raise ValueError(
            f'n_samples must be a positive integer (Got {n_samples})')
    if (n_burnin <= 0) or (not isinstance(n_burnin, int)):
        raise ValueError(
            f'n_samples must be a positive integer (Got {n_burnin})')
    if (thin <= 0) or (not isinstance(thin, int)):
        raise ValueError(f'thin must be a positive integer (Got {thin})')

    rs = check_random_state(random_state)
    w = 1.0  # initial bracket width

    n_samples_total = (n_samples + n_burnin) * thin

    samples = Parallel(n_jobs=n_jobs)(
        delayed(_slice_one_sample)(ptarget, x0, w, rs)
        for _ in range(n_samples_total))

    samples = np.array(samples)[(n_burnin * thin):][::thin]

    return samples


def _sample_parameter_r(n_samples, n_dim, sigma,
                        random_state=None, n_jobs=1, sampling_method='auto'):
    """Sample the r parameters of a Riemannian Gaussian distribution.

    Sample the logarithm of the eigenvalues of a SPD matrix following a
    Riemannian Gaussian distribution.

    See [1]_ for the mathematical details.

    Parameters
    ----------
    n_samples : int
        How many samples to generate.
    n_dim : int
        Dimensionality of the SPD matrices to be sampled.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Name of the sampling method used to sample samples_r. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    Returns
    -------
    r_samples : ndarray, shape (n_samples, n_dim)
        Samples of the r parameters of the Riemannian Gaussian distribution.

    References
    ----------
    .. [1] `Riemannian Gaussian distributions on the space of symmetric
        positive definite matrices
        <https://hal.archives-ouvertes.fr/hal-01710191>`_
        S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton. IEEE Trans Inf
        Theory, vol. 63, pp. 2153–2170, 2017.
    """
    if sampling_method not in ['slice', 'rejection', 'auto']:
        raise ValueError(f'Unknown sampling method {sampling_method},'
                         'try slice or rejection')
    if n_dim == 2 and sampling_method != "slice":
        return _rejection_sampling_2D(n_samples, sigma,
                                      random_state=random_state)
    if n_dim != 2 and sampling_method == "rejection":
        raise ValueError(
            f'n_dim={n_dim} is not yet supported with rejection sampling')
    rs = check_random_state(random_state)
    x0 = rs.randn(n_dim)
    ptarget = partial(_pdf_r, sigma=sigma)
    r_samples = _slice_sampling(
        ptarget,
        n_samples=n_samples,
        x0=x0,
        random_state=random_state,
        n_jobs=n_jobs,
        )

    return r_samples


def _sample_parameter_U(n_samples, n_dim, random_state=None):
    """Sample the U parameters of a Riemannian Gaussian distribution.

    Sample the eigenvectors of a SPD matrix following a Riemannian Gaussian
    distribution.

    See https://arxiv.org/pdf/1507.01760.pdf for the mathematical details.

    Parameters
    ----------
    n_samples : int
        How many samples to generate.
    n_dim : int
        Dimensionality of the SPD matrices to be sampled.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.

    Returns
    -------
    u_samples : ndarray, shape (n_samples, n_dim)
        Samples of the U parameters of the Riemannian Gaussian distribution.
    """

    rs = check_random_state(random_state)
    u_samples = np.zeros((n_samples, n_dim, n_dim))
    for i in range(n_samples):
        A = rs.randn(n_dim, n_dim)
        Q, _ = np.linalg.qr(A)
        u_samples[i] = Q

    return u_samples


def _sample_gaussian_spd_centered(n_matrices, n_dim, sigma, random_state=None,
                                  n_jobs=1, sampling_method='auto'):
    """Sample a Riemannian Gaussian distribution centered at the Identity.

    Sample SPD matrices from a Riemannian Gaussian distribution centered at the
    Identity, which has the role of the origin in the SPD manifold, and
    dispersion parametrized by sigma. See [1]_ for the mathematical details.

    Parameters
    ----------
    n_matrices : int
        How many matrices to generate.
    n_dim : int
        Dimensionality of the SPD matrices to be sampled.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Name of the sampling method used to sample samples_r. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    Returns
    -------
    samples : ndarray, shape (n_matrices, n_dim, n_dim)
        Samples of the Riemannian Gaussian distribution.

    Notes
    -----
    .. versionadded:: 0.3

    References
    ----------
    .. [1] `Riemannian Gaussian distributions on the space of symmetric
        positive definite matrices
        <https://hal.archives-ouvertes.fr/hal-01710191>`_
        S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton. IEEE Trans Inf
        Theory, vol. 63, pp. 2153–2170, 2017.
    """

    samples_r = _sample_parameter_r(n_samples=n_matrices,
                                    n_dim=n_dim,
                                    sigma=sigma,
                                    random_state=random_state,
                                    n_jobs=n_jobs,
                                    sampling_method=sampling_method)
    samples_U = _sample_parameter_U(n_samples=n_matrices,
                                    n_dim=n_dim,
                                    random_state=random_state)

    samples = np.zeros((n_matrices, n_dim, n_dim))
    for i in range(n_matrices):
        Ui = samples_U[i]
        ri = samples_r[i]
        samples[i] = Ui.T @ np.diag(np.exp(ri)) @ Ui
        samples[i] = 0.5 * (samples[i] + samples[i].T)

    return samples


def sample_gaussian_spd(n_matrices, mean, sigma, random_state=None,
                        n_jobs=1, sampling_method='auto'):
    """Sample a Riemannian Gaussian distribution.

    Sample SPD matrices from a Riemannian Gaussian distribution centered at
    mean and with dispersion parametrized by sigma. This distribution has been
    defined in [1]_ and generalizes the notion of a Gaussian distribution to
    the space of SPD matrices. The sampling is based on a spectral
    factorization of SPD matrices in terms of their eigenvectors (U-parameters)
    and the log of the eigenvalues (r-parameters).

    Parameters
    ----------
    n_matrices : int
        How many matrices to generate.
    mean : ndarray, shape (n_dim, n_dim)
        Center of the Riemannian Gaussian distribution.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Sampling method to sample eigenvalues. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    Returns
    -------
    samples : ndarray, shape (n_matrices, n_dim, n_dim)
        Samples of the Riemannian Gaussian distribution.

    Notes
    -----
    .. versionadded:: 0.3

    References
    ----------
    .. [1] `Riemannian Gaussian distributions on the space of symmetric
        positive definite matrices
        <https://hal.archives-ouvertes.fr/hal-01710191>`_
        S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton. IEEE Trans Inf
        Theory, vol. 63, pp. 2153–2170, 2017.
    """

    n_dim = mean.shape[0]
    # dispersion is corrected w.r.t. dimension
    samples_centered = _sample_gaussian_spd_centered(
        n_matrices=n_matrices,
        n_dim=n_dim,
        sigma=sigma / np.sqrt(n_dim),
        random_state=random_state,
        n_jobs=n_jobs,
        sampling_method=sampling_method
    )

    # apply the parallel transport to mean on each of the samples
    mean_sqrt = sqrtm(mean)
    samples = mean_sqrt @ samples_centered @ mean_sqrt

    if not is_spsd(samples):
        msg = "Some of the sampled matrices are very badly conditioned and \
               may not behave numerically as a SPD matrix. Try sampling again \
               or reducing the dimensionality of the matrix."
        warnings.warn(msg)

    return samples


def generate_random_spd_matrix(n_dim, random_state=None, *, mat_mean=.0,
                               mat_std=1.):
    """Generate a random SPD matrix.

    Parameters
    ----------
    n_dim : int
        Dimensionality of the matrix to sample.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    mat_mean : float, default=0.0
        Mean of random values to generate matrix.
    mat_std : float, default=1.0
        Standard deviation of random values to generate matrix.

    Returns
    -------
    C : ndarray, shape (n_dim, n_dim)
        Random SPD matrix.

    Notes
    -----
    .. versionadded:: 0.3
    """

    if (n_dim <= 0) or (not isinstance(n_dim, int)):
        raise ValueError(
            f'n_samples must be a positive integer (Got {n_dim})')

    rs = check_random_state(random_state)
    A = mat_mean + mat_std * rs.randn(n_dim, n_dim)
    A = 0.5 * (A + A.T)
    C = expm(A)

    if not is_spsd(C):
        msg = "The sampled matrix is very badly conditioned and may not \
               behave numerically as a SPD matrix. Try sampling again or \
               reducing the dimensionality of the matrix."
        warnings.warn(msg)

    return C