sylvchev/mdla

View on GitHub
mdla/dict_metrics.py

Summary

Maintainability
B
5 hrs
Test Coverage
# -*- coding: utf-8 -*-
"""
The `dict_metrics` module implements utilities to compare
frames and dictionaries.

This module implements several criteria and metrics to compare different sets
of atoms. This module is primarily focused on multivariate kernels and
atoms.
"""

# Authors: Sylvain Chevallier <sylvain.chevallier@uvsq.fr>
# License: GPL v3

# TODO: add docstring to criteria fonction
#       verify Fubini-Study scale parameter
#       verify beta dist behavior, seems like 1-bd
#       change scale behavior, replace 1-d with d !

import cvxopt as co
import cvxopt.solvers as solv
import numpy as np
import scipy.linalg as sl
from numpy import (
    NaN,
    abs,
    all,
    arccos,
    arcsin,
    argmax,
    array,
    atleast_2d,
    concatenate,
    infty,
    max,
    min,
    ones,
    ones_like,
    sqrt,
    trace,
    unravel_index,
    zeros,
    zeros_like,
)
from numpy.linalg import det, norm, svd


def _kernel_registration(this_kernel, dictionary, g):
    k_len = this_kernel.shape[0]
    n_kernels = len(dictionary)
    k_max_len = array([i.shape[0] for i in dictionary]).max()

    m_dist = ones((n_kernels, k_max_len - k_len + 1)) * infty
    m_corr = zeros((n_kernels, k_max_len - k_len + 1))
    for i, kernel in enumerate(dictionary):  # kernel loop
        ks = kernel.shape[0]
        # for t in range(k_max_len-k_len+1): # convolution loop
        for t in range(ks - k_len + 1):  # convolution loop
            # print ("t = ", t, "and l =", l)
            # print ("kernel = ", kernel.shape,
            #        "and kernel[t:t+l,:] = ", kernel[t:t+k_len,:].shape)
            m_dist[i, t] = g(this_kernel, kernel[t : t + k_len, :])
            m_corr[i, t] = trace(this_kernel.T.dot(kernel[t : t + k_len, :])) / (
                norm(this_kernel, "fro") * norm(kernel[t : t + k_len, :], "fro")
            )
    return m_dist, m_corr


def principal_angles(A, B):
    """Compute the principal angles between subspaces A and B.

    The algorithm for computing the principal angles is described in :
    A. V. Knyazev and M. E. Argentati,
    Principal Angles between Subspaces in an A-Based Scalar Product:
    Algorithms and Perturbation Estimates. SIAM Journal on Scientific Computing,
    23 (2002), no. 6, 2009-2041.
    http://epubs.siam.org/sam-bin/dbq/article/37733
    """
    # eps = np.finfo(np.float64).eps**.981
    # for i in range(A.shape[1]):
    #     normi = la.norm(A[:,i],np.inf)
    #     if normi > eps: A[:,i] = A[:,i]/normi
    # for i in range(B.shape[1]):
    #     normi = la.norm(B[:,i],np.inf)
    #     if normi > eps: B[:,i] = B[:,i]/normi
    QA = sl.orth(A)
    QB = sl.orth(B)
    _, s, Zs = svd(QA.T.dot(QB), full_matrices=False)
    s = np.minimum(s, ones_like(s))
    theta = np.maximum(np.arccos(s), np.zeros_like(s))
    V = QB.dot(Zs)
    idxSmall = s > np.sqrt(2.0) / 2.0
    if np.any(idxSmall):
        RB = V[:, idxSmall]
        _, x, _ = svd(RB - QA.dot(QA.T.dot(RB)), full_matrices=False)
        thetaSmall = np.flipud(
            np.maximum(arcsin(np.minimum(x, ones_like(x))), zeros_like(x))
        )
        theta[idxSmall] = thetaSmall
    return theta


def chordal_principal_angles(A, B):
    """
    chordal_principal_angles(A, B) Compute the chordal distance based on
    principal angles.
    Compute the chordal distance based on principal angles between A and B
    as :math:`d=\sqrt{ \sum_i \sin^2 \theta_i}`
    """
    return sqrt(np.sum(np.sin(principal_angles(A, B)) ** 2))


def chordal(A, B):
    """
    chordal(A, B) Compute the chordal distance
    Compute the chordal distance between A and B
    as d=\sqrt{K - ||\bar{A}^T\bar{B}||_F^2}
    where K is the rank of A and B, || . ||_F is the Frobenius norm,
    \bar{A} is the orthogonal basis associated with A and the same goes for B.
    """
    if A.shape != B.shape:
        raise ValueError(
            f"Atoms have not the same dimension ({A.shape} and {B.shape}). Error raised"
            f"in chordal(A, B)",
        )

    if np.allclose(A, B):
        return 0.0
    else:
        d2 = A.shape[1] - norm(sl.orth(A).T.dot(sl.orth(B)), "fro") ** 2
        if d2 < 0.0:
            return sqrt(abs(d2))
        else:
            return sqrt(d2)


def fubini_study(A, B):
    """
    fubini_study(A, B) Compute the Fubini-Study distance
    Compute the Fubini-Study distance based on principal angles between A and B
    as d=\acos{ \prod_i \theta_i}
    """
    if A.shape != B.shape:
        raise ValueError(
            f"Atoms have different dim ({A.shape} and {B.shape}). Error raised in"
            f"fubini_study(A, B)",
        )
    if np.allclose(A, B):
        return 0.0
    return arccos(det(sl.orth(A).T.dot(sl.orth(B))))


def binet_cauchy(A, B):
    """Compute the Binet-Cauchy distance
    Compute the Binet-Cauchy distance based on principal angles between A
    and B with d=\sqrt{ 1 - \prod_i \cos^2 \theta_i}
    """
    theta = principal_angles(A, B)
    return sqrt(1.0 - np.prod(np.cos(theta) ** 2))


def geodesic(A, B):
    """
    geodesic (A, B) Compute the arc length or geodesic distance
    Compute the arc length or geodesic distance based on principal angles between A
    and B with d=\sqrt{ \sum_i \theta_i^2}
    """
    theta = principal_angles(A, B)
    return norm(theta)


def frobenius(A, B):
    if A.shape != B.shape:
        raise ValueError(
            f"Atoms have different dim ({A.shape} and {B.shape}). Error raised in"
            f"frobenius(A, B)",
        )
    return norm(A - B, "fro")


def abs_euclidean(A, B):
    if (A.ndim != 1 and A.shape[1] != 1) or (B.ndim != 1 and B.shape[1] != 1):
        raise ValueError(
            f"Atoms are not univariate ({A.shape} and {B.shape}). Error raised"
            f"in abs_euclidean(A, B)",
        )
    if np.allclose(A, B):
        return 0.0
    else:
        return sqrt(2.0 * (1.0 - np.abs(A.T.dot(B))))


def euclidean(A, B):
    if (A.ndim != 1 and A.shape[1] != 1) or (B.ndim != 1 and B.shape[1] != 1):
        raise ValueError(
            f"Atoms are not univariate ({A.shape} and {B.shape}). Error raised in"
            f"euclidean(A, B)",
        )
    if np.allclose(A, B):
        return 0.0
    else:
        return sqrt(2.0 * (1.0 - A.T.dot(B)))


def _valid_atom_metric(gdist):
    """Verify that atom metric exist and return the correct function"""
    if gdist == "chordal":
        return chordal
    elif gdist == "chordal_principal_angles":
        return chordal_principal_angles
    elif gdist == "fubinistudy":
        return fubini_study
    elif gdist == "binetcauchy":
        return binet_cauchy
    elif gdist == "geodesic":
        return geodesic
    elif gdist == "frobenius":
        return frobenius
    elif gdist == "abs_euclidean":
        return abs_euclidean
    elif gdist == "euclidean":
        return euclidean
    else:
        return None


def _scale_metric(gdist, d, D1):
    if (
        gdist == "chordal"
        or gdist == "chordal_principal_angles"
        or gdist == "fubinistudy"
        or gdist == "binetcauchy"
        or gdist == "geodesic"
    ):
        # TODO: scale with max n_features
        return d / sqrt(D1[0].shape[0])
    elif gdist == "frobenius":
        return d / sqrt(2.0)
    else:
        return d


def _compute_gdm(D1, D2, g):
    """Compute ground distance matrix from dictionaries D1 and D2

    Distance g acts as ground distance.
    A kernel registration is applied if dictionary atoms do not have
    the same size.
    """
    # Do we need a registration? If kernel do not have the same shape, yes
    if not all(array([i.shape[0] for i in D1 + D2]) == D1[0].shape[0]):
        # compute correlation and distance matrices
        k_dim = D1[0].shape[1]
        # minl = np.array([i.shape[1] for i in D1+D2]).min()
        max_l1 = array([i.shape[0] for i in D1]).max()
        max_l2 = array([i.shape[0] for i in D2]).max()
        if max_l2 > max_l1:
            Da = D1
            Db = D2
            max_l = max_l2
        else:
            Da = D2
            Db = D1
            max_l = max_l1
        # Set all Db atom to largest value
        Dbe = []
        for i in range(len(Db)):
            k_l = Db[i].shape[0]
            Dbe.append(concatenate((zeros((max_l - k_l, k_dim)), Db[i]), axis=0))
        gdm = zeros((len(Da), len(Db)))
        for i in range(len(Da)):
            m_dist, m_corr = _kernel_registration(Da[i], Dbe, g)
            k_l = Da[i].shape[0]
            # m_dist, m_corr = _kernel_registration(np.concatenate((zeros((np.int(np.floor((max_l-k_l)/2.)), k_dim)), Da[i], zeros((np.int(np.ceil((max_l-k_l)/2.)), k_dim))), axis=0), Dbe, g)
            for j in range(len(Dbe)):
                gdm[i, j] = m_dist[
                    j, unravel_index(abs(m_corr[j, :]).argmax(), m_corr[j, :].shape)
                ]
    else:
        # all atoms have the same length, no registration
        gdm = zeros((len(D1), len(D2)))
        for i in range(len(D1)):
            for j in range(len(D2)):
                gdm[i, j] = g(D1[i], D2[j])
    return gdm


def hausdorff(D1, D2, gdist, scale=False):
    """
    Compute the Hausdorff distance between two sets of elements, here
    dictionary atoms, using a ground distance.
    Possible choice are "chordal", "fubinistudy", "binetcauchy", "geodesic",
    "frobenius", "abs_euclidean" or "euclidean".
    The scale parameter changes the return value to be between 0 and 1.
    """
    g = _valid_atom_metric(gdist)
    if g is None:
        print("Unknown ground distance, exiting.")
        return NaN
    gdm = _compute_gdm(D1, D2, g)
    d = max([max(min(gdm, axis=0)), max(min(gdm, axis=1))])
    if not scale:
        return d
    else:
        return _scale_metric(gdist, d, D1)


def emd(D1, D2, gdist, scale=False):
    """
    Compute the Earth Mover's Distance (EMD) between two sets of elements,
    here dictionary atoms, using a ground distance.
    Possible choice are "chordal", "fubinistudy", "binetcauchy", "geodesic",
    "frobenius", "abs_euclidean" or "euclidean".
    The scale parameter changes the return value to be between 0 and 1.
    """
    g = _valid_atom_metric(gdist)
    if g is None:
        print("Unknown ground distance, exiting.")
        return NaN
    # if gdist == "chordal":
    #     g = chordal
    # elif gdist == "chordal_principal_angles":
    #     g = chordal_principal_angles
    # elif gdist == "fubinistudy":
    #     g = fubini_study
    # elif gdist == "binetcauchy":
    #     g = binet_cauchy
    # elif gdist == "geodesic":
    #     g = geodesic
    # elif gdist == "frobenius":
    #     g = frobenius
    # elif gdist == "abs_euclidean":
    #     g = abs_euclidean
    # elif gdist == "euclidean":
    #     g = euclidean
    # else:
    #     print 'Unknown ground distance, exiting.'
    #     return NaN

    # # Do we need a registration? If kernel do not have the same shape, yes
    # if not np.all(np.array([i.shape[0] for i in D1+D2]) == D1[0].shape[0]):
    #     # compute correlation and distance matrices
    #     k_dim = D1[0].shape[1]
    #     # minl = np.array([i.shape[1] for i in D1+D2]).min()
    #     max_l1 = np.array([i.shape[0] for i in D1]).max()
    #     max_l2 = np.array([i.shape[0] for i in D2]).max()
    #     if max_l2 > max_l1:
    #         Da = D1
    #         Db = D2
    #         max_l = max_l2
    #     else:
    #         Da = D2
    #         Db = D1
    #         max_l = max_l1
    #     Dbe = []
    #     for i in range(len(Db)):
    #         k_l = Db[i].shape[0]
    #         Dbe.append(np.concatenate((zeros((max_l-k_l, k_dim)), Db[i]), axis=0))
    #     gdm = zeros((len(Da), len(Db)))
    #     for i in range(len(Da)):
    #         k_l = Da[i].shape[0]
    #         m_dist, m_corr = _kernel_registration(np.concatenate((zeros(( np.int(np.floor((max_l-k_l)/2.)), k_dim)), Da[i], zeros((np.int(np.ceil((max_l-k_l)/2.)), k_dim))), axis=0), Dbe, g)
    #         for j in range(len(Dbe)):
    #             gdm[i,j] = m_dist[j, np.unravel_index(np.abs(m_corr[j,:]).argmax(), m_corr[j,:].shape)]
    # else:
    #     # all atoms have the same length, no registration
    #     gdm = np.zeros((len(D1), len(D2)))
    #     for i in range(len(D1)):
    #         for j in range(len(D2)):
    #             gdm[i,j] = g(D1[i], D2[j])
    gdm = _compute_gdm(D1, D2, g)

    c = co.matrix(gdm.flatten(order="F"))
    G1 = co.spmatrix([], [], [], (len(D1), len(D1) * len(D2)))
    G2 = co.spmatrix([], [], [], (len(D2), len(D1) * len(D2)))
    G3 = co.spmatrix(-1.0, range(len(D1) * len(D2)), range(len(D1) * len(D2)))
    for i in range(len(D1)):
        for j in range(len(D2)):
            k = j + (i * len(D2))
            G1[i, k] = 1.0
            G2[j, k] = 1.0
    G = co.sparse([G1, G2, G3])
    h1 = co.matrix(1.0 / len(D1), (len(D1), 1))
    h2 = co.matrix(1.0 / len(D2), (len(D2), 1))
    h3 = co.spmatrix([], [], [], (len(D1) * len(D2), 1))
    h = co.matrix([h1, h2, h3])
    A = co.matrix(1.0, (1, len(D1) * len(D2)))
    b = co.matrix([1.0])

    co.solvers.options["show_progress"] = False
    sol = solv.lp(c, G, h, A, b)
    d = sol["primal objective"]

    if not scale:
        return d
    else:
        return _scale_metric(gdist, d, D1)
    # if (gdist == "chordal" or gdist == "chordal_principal_angles" or
    #     gdist == "fubinistudy" or gdist == "binetcauchy" or
    #     gdist == "geodesic"):
    #     return d/sqrt(D1[0].shape[0])
    # elif gdist == "frobenius":
    #     return d/sqrt(2.)
    # else:
    #     return d


def _multivariate_correlation(s, D):
    """Compute correlation between multivariate atoms

    Compute the correlation between a multivariate atome s and dictionary D
    as the sum of the correlation in each n_dims dimensions.
    """
    n_features = s.shape[0]
    n_dims = s.shape[1]
    n_kernels = len(D)
    corr = np.zeros((n_kernels, n_features))
    for k in range(n_kernels):  # for all atoms
        corrTmp = 0
        for j in range(n_dims):  # for all dimensions
            corrTmp += np.correlate(s[:, j], D[k][:, j])
        corr[k, : len(corrTmp)] = corrTmp
    return corr


def detection_rate(ref, recov, threshold):
    """Compute the detection rate between reference and recovered dictionaries

    The reference ref and the recovered recov are univariate or multivariate
    dictionaries. An atom a of the ref dictionary is considered as recovered if
    $c < threshold$ with $c = argmax_{r \in R} |<a, r>|$, that is the absolute
    value of the maximum correlation between a and any atom r of the recovered
    dictionary R is above a given threshold.
    The process is iterative and an atom r could be matched only once with an
    atom a of the reference dictionary. In other word, each atom a is matched
    with a different atom r.
    """
    n_kernels_ref, n_kernels_recov = len(ref), len(recov)
    n_features = ref[0].shape[0]
    if ref[0].ndim == 1:
        n_dims = 1
        for k in range(n_kernels_ref):
            ref[k] = atleast_2d(ref[k]).T
    else:
        n_dims = ref[0].shape[1]
    if recov[0].ndim == 1:
        for k in range(n_kernels_recov):
            recov[k] = atleast_2d(recov[k]).T
    dr = 0
    corr = zeros((n_kernels_ref, n_kernels_recov))
    for k in range(n_kernels_ref):
        c_tmp = _multivariate_correlation(
            concatenate(
                (zeros((n_features, n_dims)), ref[k], zeros((n_features, n_dims))), axis=0
            ),
            recov,
        )
        for j in range(n_kernels_recov):
            idx_max = argmax(abs(c_tmp[j, :]))
            corr[k, j] = c_tmp[j, idx_max]
    c_local = np.abs(corr.copy())
    for _ in range(n_kernels_ref):
        max_corr = c_local.max()
        if max_corr >= threshold:
            dr += 1
        idx_max = np.unravel_index(c_local.argmax(), c_local.shape)
        c_local[:, idx_max[1]] = zeros(n_kernels_ref)
        c_local[idx_max[0], :] = zeros(n_kernels_recov)
    return float(dr) / n_kernels_recov * 100.0


def _convert_array(ref, recov):
    if ref[0].ndim == 1:
        for k in range(len(ref)):
            ref[k] = atleast_2d(ref[k]).T
    if recov[0].ndim == 1:
        for k in range(len(recov)):
            recov[k] = atleast_2d(recov[k]).T
    D1 = np.array(ref)
    D2 = np.array(recov)
    M = D1.shape[0]
    N = D1.shape[1]
    D1 = D1.reshape((M, N))
    D2 = D2.reshape((M, N))
    return D1, D2, M


def precision_recall(ref, recov, threshold):
    """Compute precision and recall for recovery experiment"""
    D1, D2, M = _convert_array(ref, recov)
    corr = D1.dot(D2.T)
    precision = float((np.max(corr, axis=0) > threshold).sum()) / float(M)
    recall = float((np.max(corr, axis=1) > threshold).sum()) / float(M)
    return precision * 100.0, recall * 100.0


def precision_recall_points(ref, recov):
    """Compute the precision and recall for each atom in a recovery experiment"""
    # if ref[0].ndim == 1:
    #     for k in range(len(ref)):
    #         ref[k] = atleast_2d(ref[k]).T
    # if recov[0].ndim == 1:
    #     for k in range(len(recov)):
    #         recov[k] = atleast_2d(recov[k]).T
    # D1 = np.array(ref)
    # D2 = np.array(recov)
    # M = D1.shape[0]
    # N = D1.shape[1]
    # D1 = D1.reshape((M, N))
    # D2 = D2.reshape((M, N))
    D1, D2, _ = _convert_array(ref, recov)
    corr = D1.dot(D2.T)
    precision = np.max(corr, axis=0)
    recall = np.max(corr, axis=1)
    return precision, recall


def beta_dist(D1, D2):
    """Compute the Beta-distance proposed by Skretting and Engan

    The beta-distance is:
    $\beta(D1, D2)=1/(M1+M2)(\sum_j \beta(D1, d^2_j)+\sum_j \beta(D2, d^1_j))$
    with $\beta(D, x) = arccos(\max_i |d^T_i x|/||x||)$
    as proposed in:
    Karl Skretting and Kjersti Engan,
    Learned dictionaries for sparse image representation: properties and results,
    SPIE, 2011.
    """
    if D1[0].shape != D2[0].shape:
        raise ValueError(
            f"Dictionaries have different dim : {D1[0].shape} and {D2[0].shape}."
        )
    D1 = np.array(D1)
    M1 = D1.shape[0]
    N = D1.shape[1]
    D1 = D1.reshape((M1, N))
    D2 = np.array(D2)
    M2 = D2.shape[0]
    D2 = D2.reshape((M2, N))
    corr = D1.dot(D2.T)
    if np.allclose(np.max(corr, axis=0), ones(M2)) and np.allclose(
        np.max(corr, axis=1), ones(M1)
    ):
        return 0.0
    return (
        np.sum(np.arccos(np.max(corr, axis=0))) + np.sum(np.arccos(np.max(corr, axis=1)))
    ) / (M1 + M2)