alexandrebarachant/pyRiemann

View on GitHub
pyriemann/datasets/simulated.py

Summary

Maintainability
C
1 day
Test Coverage
import numpy as np
from sklearn.utils.validation import check_random_state

from ..utils.mean import mean_riemann
from ..utils.distance import distance_riemann
from ..utils.base import invsqrtm, powm, sqrtm, expm
from .sampling import generate_random_spd_matrix, sample_gaussian_spd
from ..transfer import encode_domains


def make_matrices(n_matrices, n_dim, kind, rs=None, return_params=False,
                  evals_low=0.5, evals_high=2.0, eigvecs_same=False):
    """Generate a set of matrices, with specific properties.

    Parameters
    ----------
    n_matrices : int
        Number of matrices to generate.
    n_dim : int
        Dimension of square matrices to generate.
    kind : {'real', 'comp', 'spd', 'spsd', 'hpd', 'hpsd'}
        Kind of matrices to generate:

        - 'real' for real-valued matrices;
        - 'comp' for complex-valued matrices;
        - 'spd' for symmetric positive-definite matrices;
        - 'spsd' for symmetric positive semi-definite matrices;
        - 'hpd' for Hermitian positive-definite matrices;
        - 'hpsd' for Hermitian positive semi-definite matrices.
    rs : RandomState instance, default=None
        Random state for reproducible output across multiple function calls.
    return_params : bool, default=False
        If True, then returns evals and evecs for 'spd', 'spsd', 'hpd' and
        'hpsd'.
    evals_low : float, default=0.5
        Lowest value of the uniform distribution to draw eigen values.
    evals_high : float, default=2.0
        Highest value of the uniform distribution to draw eigen values.
    eigvecs_same : bool, default False
        If True, then uses the same eigen vectors for all matrices.

    Returns
    -------
    mats : ndarray, shape (n_matrices, n_dim, n_dim)
        Generated matrices.
    evals : ndarray, shape (n_matrices, n_dim)
        Eigen values used for 'spd', 'spsd', 'hpd' and 'hpsd'.
        Only returned if ``return_params=True``.
    evecs : ndarray, shape (n_matrices, n_dim, n_dim) or (n_dim, n_dim)
        Eigen vectors used for 'spd', 'spsd', 'hpd' and 'hpsd'.
        Only returned if ``return_params=True``.

    Notes
    -----
    .. versionadded:: 0.5
    """
    if kind not in ("real", "comp", "spd", "spsd", "hpd", "hpsd"):
        raise ValueError(f"Unsupported matrix kind: {kind}")

    rs = check_random_state(rs)
    X = rs.randn(n_matrices, n_dim, n_dim)
    if kind == "real":
        return X

    if kind in ("comp", "hpd", "hpsd"):
        X = X + 1j * rs.randn(n_matrices, n_dim, n_dim)
        if kind == "comp":
            return X

    # eigen values
    if evals_low <= 0.0:
        raise ValueError(
            f"Lowest value must be strictly positive (Got {evals_low}).")
    if evals_high <= evals_low:
        raise ValueError(
            "Highest value must be superior to lowest value "
            f"(Got {evals_high} and {evals_low}).")
    evals = rs.uniform(evals_low, evals_high, size=(n_matrices, n_dim))
    if kind in ("spsd", "hpsd"):
        evals[..., -1] = 1e-10  # last eigen value set to almost zero

    # eigen vectors
    if eigvecs_same:
        X = X[0]
    if np.__version__ < '1.22.0' and X.ndim > 2:
        evecs = np.array([np.linalg.qr(x)[0] for x in X])
    else:
        evecs = np.linalg.qr(X)[0]

    # conjugation
    if eigvecs_same:
        mats = np.empty((n_matrices, n_dim, n_dim), dtype=X.dtype)
        for i in range(n_matrices):
            mats[i] = (evecs * evals[i]) @ evecs.conj().T
    else:
        mats = (evecs * evals[:, np.newaxis, :]) @ np.swapaxes(evecs.conj(),
                                                               -2, -1)

    if return_params:
        return mats, evals, evecs
    else:
        return mats


def make_masks(n_masks, n_dim0, n_dim1_min, rs=None):
    """Generate a set of masks, defined as semi-orthogonal matrices.

    Parameters
    ----------
    n_masks : int
        Number of masks to generate.
    n_dim0 : int
        First dimension of masks.
    n_dim1_min : int
        Minimal value for second dimension of masks.
    rs : RandomState instance, default=None
        Random state for reproducible output across multiple function calls.

    Returns
    -------
    masks : list of n_masks ndarray of shape (n_dim0, n_dim1_i), \
            with different n_dim1_i, such that n_dim1_min <= n_dim1_i <= n_dim0
        Masks.

    Notes
    -----
    .. versionadded:: 0.3
    """
    rs = check_random_state(rs)

    masks = []
    for _ in range(n_masks):
        n_dim1 = rs.randint(n_dim1_min, n_dim0, size=1)[0]
        mask, _ = np.linalg.qr(rs.randn(n_dim0, n_dim1))
        masks.append(mask)
    return masks


def make_gaussian_blobs(n_matrices=100, n_dim=2, class_sep=1.0, class_disp=1.0,
                        return_centers=False, center_dataset=False,
                        random_state=None, centers=None, *, n_jobs=1,
                        sampling_method='auto'):
    """Generate SPD dataset with two classes sampled from Riemannian Gaussian.

    Generate a dataset with SPD matrices drawn from two Riemannian Gaussian
    distributions. The distributions have the same class dispersions and the
    distance between their centers of mass is an input parameter. Useful for
    testing classification or clustering methods.

    Parameters
    ----------
    n_matrices : int, default=100
        How many matrices to generate for each class.
    n_dim : int, default=2
        Dimensionality of the SPD matrices generated by the distributions.
    class_sep : float, default=1.0
        Parameter controlling the separability of the classes.
    class_disp : float, default=1.0
        Intra dispersion of the points sampled from each class.
    centers : ndarray, shape (2, n_dim, n_dim), default=None
        List with the centers of mass for each class. If None, the centers are
        sampled randomly based on class_sep.
    return_centers : bool, default=False
        If True, then return the centers of each cluster
    center_dataset : bool, default=False
        If True, re-center the simulated dataset to the Identity. If False,
        the dataset is centered around a random SPD matrix.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.
    n_jobs : int, default=1
        The number of jobs to use for the computation. This works by computing
        each of the class centroid in parallel. If -1 all CPUs are used.
    sampling_method : str, default='auto'
        Name of the sampling method used to sample samples_r. It can be
        'auto', 'slice' or 'rejection'. If it is 'auto', the sampling_method
        will be equal to 'slice' for n_dim != 2 and equal to
        'rejection' for n_dim = 2.

        .. versionadded:: 0.4

    Returns
    -------
    X : ndarray, shape (2*n_matrices, n_dim, n_dim)
        Set of SPD matrices.
    y : ndarray, shape (2*n_matrices,)
        Labels corresponding to each matrix.
    centers : ndarray, shape (2, n_dim, n_dim)
        The centers of each class. Only returned if ``return_centers=True``.

    Notes
    -----
    .. versionadded:: 0.3

    """
    if not isinstance(class_sep, float):
        raise ValueError(f"class_sep must be a float (Got {class_sep})")

    rs = check_random_state(random_state)
    seeds = rs.randint(100, size=2)

    if centers is None:
        C0_in = np.eye(n_dim)  # first class mean at Identity at first
        Pv = rs.randn(n_dim, n_dim)  # create random tangent vector
        Pv = (Pv + Pv.T)/2   # symmetrize
        Pv = Pv / np.linalg.norm(Pv)  # normalize
        P = expm(Pv)  # take it back to the SPD manifold
        C1_in = powm(P, alpha=class_sep)  # control distance to Identity

    else:
        C0_in, C1_in = centers

    # sample data points from class 0
    X0 = sample_gaussian_spd(
        n_matrices=n_matrices,
        mean=C0_in,
        sigma=class_disp,
        random_state=seeds[0],
        n_jobs=n_jobs,
        sampling_method=sampling_method
    )
    y0 = np.zeros(n_matrices)

    # sample data points from class 1
    X1 = sample_gaussian_spd(
        n_matrices=n_matrices,
        mean=C1_in,
        sigma=class_disp,
        random_state=seeds[1],
        n_jobs=n_jobs,
        sampling_method=sampling_method
    )

    y1 = np.ones(n_matrices)

    # concatenate the samples
    X = np.concatenate([X0, X1])

    # re-center the dataset to the Identity
    M = mean_riemann(X)
    M_invsqrt = invsqrtm(M)
    X = M_invsqrt @ X @ M_invsqrt

    if not center_dataset:
        # center the dataset to a random SPD matrix
        M = generate_random_spd_matrix(n_dim=n_dim, random_state=rs)
        M_sqrt = sqrtm(M)
        X = M_sqrt @ X @ M_sqrt

    # concatenate the labels for each class
    y = np.concatenate([y0, y1]).astype(int)

    # randomly permute the samples of the dataset
    idx = rs.permutation(len(X))
    X, y = X[idx], y[idx]

    if return_centers:
        if centers is None:
            C0_out = mean_riemann(X[y == 0])
            C1_out = mean_riemann(X[y == 1])
        else:
            C0_out = C0_in
            C1_out = C1_in
        centers = np.stack([C0_out, C1_out])
        return X, y, centers
    else:
        return X, y


def make_outliers(n_matrices, mean, sigma, outlier_coeff=10,
                  random_state=None):
    """Generate a set of outlier points.

    Simulate data points that are outliers for a given Riemannian Gaussian
    distribution with fixed mean and dispersion.

    Parameters
    ----------
    n_matrices : int
        How many matrices to generate.
    mean : ndarray, shape (n_dim, n_dim)
        Center of the Riemannian Gaussian distribution.
    sigma : float
        Dispersion of the Riemannian Gaussian distribution.
    outlier_coeff: float, default=10
        Coefficient determining how to define an outlier data point, i.e. how
        many times the sigma parameter its distance to the mean should be.
    random_state : int, RandomState instance or None, default=None
        Pass an int for reproducible output across multiple function calls.

    Returns
    -------
    outliers : ndarray, shape (n_matrices, n_dim, n_dim)
        Set of simulated outlier matrix.

    Notes
    -----
    .. versionadded:: 0.3
    """

    n_dim = mean.shape[1]
    mean_sqrt = sqrtm(mean)

    outliers = np.zeros((n_matrices, n_dim, n_dim))
    for i in range(n_matrices):
        Oi = generate_random_spd_matrix(n_dim=n_dim, random_state=random_state)
        epsilon_num = outlier_coeff * sigma * n_dim
        epsilon_den = distance_riemann(Oi, np.eye(n_dim), squared=True)
        epsilon = np.sqrt(epsilon_num / epsilon_den)
        outliers[i] = mean_sqrt @ powm(Oi, epsilon) @ mean_sqrt

    return outliers


def make_classification_transfer(n_matrices, class_sep=3.0, class_disp=1.0,
                                 domain_sep=5.0, theta=0.0, stretch=1.0,
                                 random_state=None, class_names=[1, 2]):
    """Generate source and target toy datasets for transfer learning examples.

    Generate a dataset with 2x2 SPD matrices drawn from two Riemannian Gaussian
    distributions. The distributions have the same class dispersions and the
    distance between their centers of mass is an input parameter. We can
    stretch the target dataset and control a rotation matrix that maps the
    source to the target domains. This function is useful for testing
    classification or clustering methods on transfer learning applications.

    Parameters
    ----------
    n_matrices : int, default=100
        How many 2x2 matrices to generate for each class on each domain.
    class_sep : float, default=3.0
        Distance between the centers of the two classes.
    class_disp : float, default=1.0
        Dispersion of the data points to be sampled on each class.
    domain_sep : float, default=5.0
        Distance between the global means of each source and target datasets.
    theta : float, default=0.0
        Angle of the 2x2 rotation matrix from source to target dataset.
    stretch : float, default=1.0
        Factor to stretch the data points in target dataset. Note that when it
        is != 1.0 the class dispersions in target domain will be different than
        those in source domain (fixed at class_disp).
    random_state : None | int | RandomState instance, default=None
        Pass an int for reproducible output across multiple function calls.
    class_names : list, default=[1, 2]
        Names of classes.

    Returns
    -------
    X_enc : ndarray, shape (4*n_matrices, 2, 2)
        Set of SPD matrices.
    y_enc : ndarray, shape (4*n_matrices,)
        Extended labels for each data point.

    Notes
    -----
    .. versionadded:: 0.4
    """

    rs = check_random_state(random_state)
    seeds = rs.randint(100, size=4)

    # the examples considered here are always for 2x2 matrices
    n_dim = 2
    if len(class_names) != n_dim:
        raise ValueError("class_names must contain 2 elements")

    # create a source dataset with two classes and global mean at identity
    M1_source = np.eye(n_dim)  # first class mean at Identity at first
    X1_source = sample_gaussian_spd(
        n_matrices=n_matrices,
        mean=M1_source,
        sigma=class_disp,
        random_state=seeds[0])
    y1_source = [class_names[0]] * n_matrices
    Pv = rs.randn(n_dim, n_dim)  # create random tangent vector
    Pv = (Pv + Pv.T)/2  # symmetrize
    Pv /= np.linalg.norm(Pv)  # normalize
    P = expm(Pv)  # take it back to the SPD manifold
    M2_source = powm(P, alpha=class_sep)  # control distance to identity
    X2_source = sample_gaussian_spd(
        n_matrices=n_matrices,
        mean=M2_source,
        sigma=class_disp,
        random_state=seeds[1])
    y2_source = [class_names[1]] * n_matrices
    X_source = np.concatenate([X1_source, X2_source])
    M_source = mean_riemann(X_source)
    M_source_invsqrt = invsqrtm(M_source)
    # center the dataset to Identity
    X_source = M_source_invsqrt @ X_source @ M_source_invsqrt
    y_source = np.concatenate([y1_source, y2_source])

    # create target dataset based on the source dataset
    X1_target = sample_gaussian_spd(
        n_matrices=n_matrices,
        mean=M1_source,
        sigma=class_disp,
        random_state=seeds[2])
    X2_target = sample_gaussian_spd(
        n_matrices=n_matrices,
        mean=M2_source,
        sigma=class_disp,
        random_state=seeds[3])
    X_target = np.concatenate([X1_target, X2_target])
    M_target = mean_riemann(X_target)
    M_target_invsqrt = invsqrtm(M_target)
    # center the dataset to Identity
    X_target = M_target_invsqrt @ X_target @ M_target_invsqrt
    y_target = np.copy(y_source)

    # stretch the data points in target domain if needed
    if stretch != 1.0:
        X_target = powm(X_target, alpha=stretch)

    # move the points in X_target with a random matrix A = P * Q

    # create SPD matrix for the translation between domains
    Pv = rs.randn(n_dim, n_dim)  # create random tangent vector
    Pv = (Pv + Pv.T)/2  # symmetrize
    Pv /= np.linalg.norm(Pv)  # normalize
    P = expm(Pv)  # take it to the manifold
    P = powm(P, alpha=domain_sep)  # control distance to identity
    P = sqrtm(P)  # transport matrix

    # create orthogonal matrix for the rotation part
    Q = np.array([[np.cos(theta), -np.sin(theta)],
                  [np.sin(theta), np.cos(theta)]])

    # transform the data points from the target domain
    A = P @ Q
    X_target = A @ X_target @ A.T

    # create array specifying the domain for each epoch
    domains = np.array(
        len(X_source)*['source_domain'] + len(X_target)*['target_domain']
    )

    # encode the labels and domains together
    X = np.concatenate([X_source, X_target])
    y = np.concatenate([y_source, y_target])
    X_enc, y_enc = encode_domains(X, y, domains)

    return X_enc, y_enc