
View on GitHub


1 day
Test Coverage
from functools import partial
import warnings
import numpy as np
from scipy.stats import multivariate_normal
from sklearn.utils import check_random_state
from joblib import Parallel, delayed

from ..utils.base import sqrtm, expm
from ..utils.test import is_sym_pos_semi_def as is_spsd

def _pdf_r(r, sigma):
    """Pdf for the log of eigenvalues of a SPD matrix.

    Probability density function for the logarithm of the eigenvalues of a SPD
    matrix samples from the Riemannian Gaussian distribution. See Said et al.
    "Riemannian Gaussian distributions on the space of symmetric positive
    definite matrices" (2017) for the mathematical details.

    r : ndarray, shape (n_dim,)
        Vector with the logarithm of the eigenvalues of a SPD matrix.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.

    p : float
        Probability density function applied to data point r.

    if (sigma <= 0):
        raise ValueError(f'sigma must be a positive number (Got {sigma})')

    n_dim = len(r)
    partial_1 = -np.sum(r**2) / (2 * sigma**2)
    partial_2 = 0
    for i in range(n_dim):
        for j in range(i + 1, n_dim):
            partial_2 = partial_2 + np.log(np.sinh(np.abs(r[i] - r[j]) / 2))

    return np.exp(partial_1 + partial_2)

def _rejection_sampling_2D_gfunction_plus(sigma, r_sample):
    """Auxiliary function for the 2D rejection sampling algorithm.

    It is used in the case where r is sampled with the function g+.

    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    r_samples : ndarray, shape (1, n_dim)
        Sample of the r parameters of the Riemannian Gaussian distribution.

    p : float
        Probability of acceptation.

    .. versionadded:: 0.4

    mu_a = np.array([-sigma**2 / 2, (sigma**2) / 2])
    cov_matrix = (sigma**2) * np.eye(2)
    m = np.pi * (sigma**2) * np.exp(sigma**2 / 4)
    if r_sample[0] >= r_sample[1]:
        num = _pdf_r(r_sample, sigma)
        den = multivariate_normal.pdf(r_sample, mean=mu_a, cov=cov_matrix)*m
        return num / den
    return 0

def _rejection_sampling_2D_gfunction_minus(sigma, r_sample):
    """Auxiliary function for the 2D rejection sampling algorithm.

    It is used in the case where r is sampled with the function g-.

    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    r_samples : ndarray, shape (1, n_dim)
        Sample of the r parameters of the Riemannian Gaussian distribution.

    p : float
        Probability of acceptation.

    .. versionadded:: 0.4

    mu_b = np.array([(sigma**2) / 2, -sigma**2 / 2])
    cov_matrix = (sigma**2) * np.eye(2)
    m = np.pi * (sigma**2) * np.exp(sigma**2 / 4)
    if r_sample[0] < r_sample[1]:
        num = _pdf_r(r_sample, sigma)
        den = multivariate_normal.pdf(r_sample, mean=mu_b, cov=cov_matrix) * m
        return num / den
    return 0

def _rejection_sampling_2D(n_samples, sigma, random_state=None,
    """Rejection sampling algorithm for the 2D case.

    Implementation of a rejection sampling algorithm. The implementation
    follows the description given in page 528 of Christopher Bishop's book
    "Pattern recognition and Machine Learning" (2006).

    n_samples : int
        Number of samples to get from the target distribution.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    return_acceptance_rate : boolean, default=False
        Whether to return the acceptance rate with the sample (number of
        samples obtained divided by the number of samples generated by
        the algorithm).

        .. versionadded:: 0.5

    r_samples : ndarray, shape (n_samples, n_dim)
        Samples of the r parameters of the Riemannian Gaussian distribution.
    acceptance_rate : float
        Acceptance rate empirically computed for the generation of the sample.
        Only returned if ``return_acceptance_rate=True``.

    .. versionadded:: 0.4

    mu_a = np.array([-sigma**2 / 2, (sigma**2) / 2])
    mu_b = np.array([(sigma**2) / 2, -sigma**2 / 2])
    cov_matrix = (sigma**2) * np.eye(2)
    r_samples = []
    cpt = 0
    acc = 0
    rs = check_random_state(random_state)
    while cpt != n_samples:
        acc += 1
        if (rs.binomial(1, 0.5, 1) == 1):
            r_sample = multivariate_normal.rvs(mu_a, cov_matrix, 1, rs)
            res = _rejection_sampling_2D_gfunction_plus(sigma, r_sample)
            if rs.rand(1) < res:
                cpt += 1
            r_sample = multivariate_normal.rvs(mu_b, cov_matrix, 1, rs)
            res = _rejection_sampling_2D_gfunction_minus(sigma, r_sample)
            if rs.rand(1) < res:
                cpt += 1

    if return_acceptance_rate:
        return np.array(r_samples), n_samples / acc
    return np.array(r_samples)

def _slice_one_sample(ptarget, x0, w, rs):
    """Slice sampling for one sample

    ptarget : function with one input
        The target pdf to sample from or a multiple of it.
    x0 : array
        Initial state for the MCMC procedure. Note that the shape of this array
        defines the dimensionality n_dim of the data points to be sampled.
    w : float
        Initial bracket width.
    rs : int, RandomState instance or None
        Pass an int for reproducible output across multiple function calls.

    sample : ndarray, shape (n_dim,)
        Sample from the target pdf.
    xt = np.copy(x0)
    n_dim = len(x0)

    for i in range(n_dim):

        ei = np.zeros(n_dim)
        ei[i] = 1

        # step 1 : evaluate ptarget(xt)
        Px = ptarget(xt)

        # step 2 : draw vertical coordinate uprime ~ U(0, ptarget(xt))
        uprime_i = Px * rs.rand()

        # step 3 : create a horizontal interval (xl_i, xr_i) enclosing xt_i
        r = rs.rand()
        xl_i = xt[i] - r * w
        xr_i = xt[i] + (1-r) * w
        while ptarget(xt + (xl_i - xt[i]) * ei) > uprime_i:
            xl_i = xl_i - w
        while ptarget(xt + (xr_i - xt[i]) * ei) > uprime_i:
            xr_i = xr_i + w

        # step 4 : loop
        while True:
            xprime_i = xl_i + (xr_i - xl_i) * rs.rand()
            Px = ptarget(xt + (xprime_i - xt[i]) * ei)
            if Px > uprime_i:
                if xprime_i > xt[i]:
                    xr_i = xprime_i
                    xl_i = xprime_i

        # store coordinate i of new sample
        xt = np.copy(xt)
        xt[i] = xprime_i

    return xt

def _slice_sampling(ptarget, n_samples, x0, n_burnin=20, thin=10,
                    random_state=None, n_jobs=1):
    """Slice sampling procedure.

    Implementation of a slice sampling algorithm for sampling from any target
    pdf or a multiple of it. The implementation follows the description given
    in page 375 of David McKay's book "Information Theory, Inference, and
    Learning Algorithms" (2003).

    ptarget : function with one input
        The target pdf to sample from or a multiple of it.
    n_samples : int
        How many samples to get from the ptarget distribution.
    x0 : array
        Initial state for the MCMC procedure. Note that the shape of this array
        defines the dimensionality n_dim of the data points to be sampled.
    n_burnin : int, default=20
        How many samples to discard from the beginning of the chain generated
        by the slice sampling procedure. Usually the first samples are prone to
        non-stationary behavior and do not follow very well the target pdf.
    thin : int, default=10
        Thinning factor for the slice sampling procedure. MCMC samples are
        often correlated between them, so taking one sample every `thin`
        samples can help reducing this correlation. Note that this makes the
        algorithm actually sample `thin x n_samples` samples from the pdf, so
        expect the whole sampling procedure to take longer.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.

    samples : ndarray, shape (n_samples, n_dim)
        Samples from the target pdf.

    if (n_samples <= 0) or (not isinstance(n_samples, int)):
        raise ValueError(
            f'n_samples must be a positive integer (Got {n_samples})')
    if (n_burnin <= 0) or (not isinstance(n_burnin, int)):
        raise ValueError(
            f'n_samples must be a positive integer (Got {n_burnin})')
    if (thin <= 0) or (not isinstance(thin, int)):
        raise ValueError(f'thin must be a positive integer (Got {thin})')

    rs = check_random_state(random_state)
    w = 1.0  # initial bracket width

    n_samples_total = (n_samples + n_burnin) * thin

    samples = Parallel(n_jobs=n_jobs)(
        delayed(_slice_one_sample)(ptarget, x0, w, rs)
        for _ in range(n_samples_total))

    samples = np.array(samples)[(n_burnin * thin):][::thin]

    return samples

def _sample_parameter_r(n_samples, n_dim, sigma,
                        random_state=None, n_jobs=1, sampling_method='auto'):
    """Sample the r parameters of a Riemannian Gaussian distribution.

    Sample the logarithm of the eigenvalues of a SPD matrix following a
    Riemannian Gaussian distribution.

    See [1]_ for the mathematical details.

    n_samples : int
        How many samples to generate.
    n_dim : int
        Dimensionality of the SPD matrices to be sampled.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Name of the sampling method used to sample samples_r. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    r_samples : ndarray, shape (n_samples, n_dim)
        Samples of the r parameters of the Riemannian Gaussian distribution.

    .. [1] `Riemannian Gaussian distributions on the space of symmetric
        positive definite matrices
        S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton. IEEE Trans Inf
        Theory, vol. 63, pp. 2153–2170, 2017.
    if sampling_method not in ['slice', 'rejection', 'auto']:
        raise ValueError(f'Unknown sampling method {sampling_method},'
                         'try slice or rejection')
    if n_dim == 2 and sampling_method != "slice":
        return _rejection_sampling_2D(n_samples, sigma,
    if n_dim != 2 and sampling_method == "rejection":
        raise ValueError(
            f'n_dim={n_dim} is not yet supported with rejection sampling')
    rs = check_random_state(random_state)
    x0 = rs.randn(n_dim)
    ptarget = partial(_pdf_r, sigma=sigma)
    r_samples = _slice_sampling(

    return r_samples

def _sample_parameter_U(n_samples, n_dim, random_state=None):
    """Sample the U parameters of a Riemannian Gaussian distribution.

    Sample the eigenvectors of a SPD matrix following a Riemannian Gaussian

    See for the mathematical details.

    n_samples : int
        How many samples to generate.
    n_dim : int
        Dimensionality of the SPD matrices to be sampled.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.

    u_samples : ndarray, shape (n_samples, n_dim)
        Samples of the U parameters of the Riemannian Gaussian distribution.

    rs = check_random_state(random_state)
    u_samples = np.zeros((n_samples, n_dim, n_dim))
    for i in range(n_samples):
        A = rs.randn(n_dim, n_dim)
        Q, _ = np.linalg.qr(A)
        u_samples[i] = Q

    return u_samples

def _sample_gaussian_spd_centered(n_matrices, n_dim, sigma, random_state=None,
                                  n_jobs=1, sampling_method='auto'):
    """Sample a Riemannian Gaussian distribution centered at the Identity.

    Sample SPD matrices from a Riemannian Gaussian distribution centered at the
    Identity, which has the role of the origin in the SPD manifold, and
    dispersion parametrized by sigma. See [1]_ for the mathematical details.

    n_matrices : int
        How many matrices to generate.
    n_dim : int
        Dimensionality of the SPD matrices to be sampled.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Name of the sampling method used to sample samples_r. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    samples : ndarray, shape (n_matrices, n_dim, n_dim)
        Samples of the Riemannian Gaussian distribution.

    .. versionadded:: 0.3

    .. [1] `Riemannian Gaussian distributions on the space of symmetric
        positive definite matrices
        S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton. IEEE Trans Inf
        Theory, vol. 63, pp. 2153–2170, 2017.

    samples_r = _sample_parameter_r(n_samples=n_matrices,
    samples_U = _sample_parameter_U(n_samples=n_matrices,

    samples = np.zeros((n_matrices, n_dim, n_dim))
    for i in range(n_matrices):
        Ui = samples_U[i]
        ri = samples_r[i]
        samples[i] = Ui.T @ np.diag(np.exp(ri)) @ Ui
        samples[i] = 0.5 * (samples[i] + samples[i].T)

    return samples

def sample_gaussian_spd(n_matrices, mean, sigma, random_state=None,
                        n_jobs=1, sampling_method='auto'):
    """Sample a Riemannian Gaussian distribution.

    Sample SPD matrices from a Riemannian Gaussian distribution centered at
    mean and with dispersion parametrized by sigma. This distribution has been
    defined in [1]_ and generalizes the notion of a Gaussian distribution to
    the space of SPD matrices. The sampling is based on a spectral
    factorization of SPD matrices in terms of their eigenvectors (U-parameters)
    and the log of the eigenvalues (r-parameters).

    n_matrices : int
        How many matrices to generate.
    mean : ndarray, shape (n_dim, n_dim)
        Center of the Riemannian Gaussian distribution.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Sampling method to sample eigenvalues. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    samples : ndarray, shape (n_matrices, n_dim, n_dim)
        Samples of the Riemannian Gaussian distribution.

    .. versionadded:: 0.3

    .. [1] `Riemannian Gaussian distributions on the space of symmetric
        positive definite matrices
        S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton. IEEE Trans Inf
        Theory, vol. 63, pp. 2153–2170, 2017.

    n_dim = mean.shape[0]
    # dispersion is corrected w.r.t. dimension
    samples_centered = _sample_gaussian_spd_centered(
        sigma=sigma / np.sqrt(n_dim),

    # apply the parallel transport to mean on each of the samples
    mean_sqrt = sqrtm(mean)
    samples = mean_sqrt @ samples_centered @ mean_sqrt

    if not is_spsd(samples):
        msg = "Some of the sampled matrices are very badly conditioned and \
               may not behave numerically as a SPD matrix. Try sampling again \
               or reducing the dimensionality of the matrix."

    return samples

def generate_random_spd_matrix(n_dim, random_state=None, *, mat_mean=.0,
    """Generate a random SPD matrix.

    n_dim : int
        Dimensionality of the matrix to sample.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    mat_mean : float, default=0.0
        Mean of random values to generate matrix.
    mat_std : float, default=1.0
        Standard deviation of random values to generate matrix.

    C : ndarray, shape (n_dim, n_dim)
        Random SPD matrix.

    .. versionadded:: 0.3

    if (n_dim <= 0) or (not isinstance(n_dim, int)):
        raise ValueError(
            f'n_samples must be a positive integer (Got {n_dim})')

    rs = check_random_state(random_state)
    A = mat_mean + mat_std * rs.randn(n_dim, n_dim)
    A = 0.5 * (A + A.T)
    C = expm(A)

    if not is_spsd(C):
        msg = "The sampled matrix is very badly conditioned and may not \
               behave numerically as a SPD matrix. Try sampling again or \
               reducing the dimensionality of the matrix."

    return C