# -*- coding: utf-8 -*-
The `dict_metrics` module implements utilities to compare
frames and dictionaries.

This module implements several criteria and metrics to compare different sets
of atoms. This module is primarily focused on multivariate kernels and

# Authors: Sylvain Chevallier <>
# License: GPL v3

# TODO: add docstring to criteria fonction
#       verify Fubini-Study scale parameter
#       verify beta dist behavior, seems like 1-bd
#       change scale behavior, replace 1-d with d !

import cvxopt as co
import cvxopt.solvers as solv
import numpy as np
import scipy.linalg as sl
from numpy import (
from numpy.linalg import det, norm, svd

def _kernel_registration(this_kernel, dictionary, g):
    k_len = this_kernel.shape[0]
    n_kernels = len(dictionary)
    k_max_len = array([i.shape[0] for i in dictionary]).max()

    m_dist = ones((n_kernels, k_max_len - k_len + 1)) * infty
    m_corr = zeros((n_kernels, k_max_len - k_len + 1))
    for i, kernel in enumerate(dictionary):  # kernel loop
        ks = kernel.shape[0]
        # for t in range(k_max_len-k_len+1): # convolution loop
        for t in range(ks - k_len + 1):  # convolution loop
            # print ("t = ", t, "and l =", l)
            # print ("kernel = ", kernel.shape,
            #        "and kernel[t:t+l,:] = ", kernel[t:t+k_len,:].shape)
            m_dist[i, t] = g(this_kernel, kernel[t : t + k_len, :])
            m_corr[i, t] = trace([t : t + k_len, :])) / (
                norm(this_kernel, "fro") * norm(kernel[t : t + k_len, :], "fro")
    return m_dist, m_corr

def principal_angles(A, B):
    """Compute the principal angles between subspaces A and B.

    The algorithm for computing the principal angles is described in :
    A. V. Knyazev and M. E. Argentati,
    Principal Angles between Subspaces in an A-Based Scalar Product:
    Algorithms and Perturbation Estimates. SIAM Journal on Scientific Computing,
    23 (2002), no. 6, 2009-2041.
    # eps = np.finfo(np.float64).eps**.981
    # for i in range(A.shape[1]):
    #     normi = la.norm(A[:,i],np.inf)
    #     if normi > eps: A[:,i] = A[:,i]/normi
    # for i in range(B.shape[1]):
    #     normi = la.norm(B[:,i],np.inf)
    #     if normi > eps: B[:,i] = B[:,i]/normi
    QA = sl.orth(A)
    QB = sl.orth(B)
    _, s, Zs = svd(, full_matrices=False)
    s = np.minimum(s, ones_like(s))
    theta = np.maximum(np.arccos(s), np.zeros_like(s))
    V =
    idxSmall = s > np.sqrt(2.0) / 2.0
    if np.any(idxSmall):
        RB = V[:, idxSmall]
        _, x, _ = svd(RB -, full_matrices=False)
        thetaSmall = np.flipud(
            np.maximum(arcsin(np.minimum(x, ones_like(x))), zeros_like(x))
        theta[idxSmall] = thetaSmall
    return theta

def chordal_principal_angles(A, B):
    chordal_principal_angles(A, B) Compute the chordal distance based on
    principal angles.
    Compute the chordal distance based on principal angles between A and B
    as :math:`d=\sqrt{ \sum_i \sin^2 \theta_i}`
    return sqrt(np.sum(np.sin(principal_angles(A, B)) ** 2))

def chordal(A, B):
    chordal(A, B) Compute the chordal distance
    Compute the chordal distance between A and B
    as d=\sqrt{K - ||\bar{A}^T\bar{B}||_F^2}
    where K is the rank of A and B, || . ||_F is the Frobenius norm,
    \bar{A} is the orthogonal basis associated with A and the same goes for B.
    if A.shape != B.shape:
        raise ValueError(
            f"Atoms have not the same dimension ({A.shape} and {B.shape}). Error raised"
            f"in chordal(A, B)",

    if np.allclose(A, B):
        return 0.0
        d2 = A.shape[1] - norm(sl.orth(A), "fro") ** 2
        if d2 < 0.0:
            return sqrt(abs(d2))
            return sqrt(d2)

def fubini_study(A, B):
    fubini_study(A, B) Compute the Fubini-Study distance
    Compute the Fubini-Study distance based on principal angles between A and B
    as d=\acos{ \prod_i \theta_i}
    if A.shape != B.shape:
        raise ValueError(
            f"Atoms have different dim ({A.shape} and {B.shape}). Error raised in"
            f"fubini_study(A, B)",
    if np.allclose(A, B):
        return 0.0
    return arccos(det(sl.orth(A)

def binet_cauchy(A, B):
    """Compute the Binet-Cauchy distance
    Compute the Binet-Cauchy distance based on principal angles between A
    and B with d=\sqrt{ 1 - \prod_i \cos^2 \theta_i}
    theta = principal_angles(A, B)
    return sqrt(1.0 - ** 2))

def geodesic(A, B):
    geodesic (A, B) Compute the arc length or geodesic distance
    Compute the arc length or geodesic distance based on principal angles between A
    and B with d=\sqrt{ \sum_i \theta_i^2}
    theta = principal_angles(A, B)
    return norm(theta)

def frobenius(A, B):
    if A.shape != B.shape:
        raise ValueError(
            f"Atoms have different dim ({A.shape} and {B.shape}). Error raised in"
            f"frobenius(A, B)",
    return norm(A - B, "fro")

def abs_euclidean(A, B):
    if (A.ndim != 1 and A.shape[1] != 1) or (B.ndim != 1 and B.shape[1] != 1):
        raise ValueError(
            f"Atoms are not univariate ({A.shape} and {B.shape}). Error raised"
            f"in abs_euclidean(A, B)",
    if np.allclose(A, B):
        return 0.0
        return sqrt(2.0 * (1.0 - np.abs(

def euclidean(A, B):
    if (A.ndim != 1 and A.shape[1] != 1) or (B.ndim != 1 and B.shape[1] != 1):
        raise ValueError(
            f"Atoms are not univariate ({A.shape} and {B.shape}). Error raised in"
            f"euclidean(A, B)",
    if np.allclose(A, B):
        return 0.0
        return sqrt(2.0 * (1.0 -

def _valid_atom_metric(gdist):
    """Verify that atom metric exist and return the correct function"""
    if gdist == "chordal":
        return chordal
    elif gdist == "chordal_principal_angles":
        return chordal_principal_angles
    elif gdist == "fubinistudy":
        return fubini_study
    elif gdist == "binetcauchy":
        return binet_cauchy
    elif gdist == "geodesic":
        return geodesic
    elif gdist == "frobenius":
        return frobenius
    elif gdist == "abs_euclidean":
        return abs_euclidean
    elif gdist == "euclidean":
        return euclidean
        return None

def _scale_metric(gdist, d, D1):
    if (
        gdist == "chordal"
        or gdist == "chordal_principal_angles"
        or gdist == "fubinistudy"
        or gdist == "binetcauchy"
        or gdist == "geodesic"
        # TODO: scale with max n_features
        return d / sqrt(D1[0].shape[0])
    elif gdist == "frobenius":
        return d / sqrt(2.0)
        return d

def _compute_gdm(D1, D2, g):
    """Compute ground distance matrix from dictionaries D1 and D2

    Distance g acts as ground distance.
    A kernel registration is applied if dictionary atoms do not have
    the same size.
    # Do we need a registration? If kernel do not have the same shape, yes
    if not all(array([i.shape[0] for i in D1 + D2]) == D1[0].shape[0]):
        # compute correlation and distance matrices
        k_dim = D1[0].shape[1]
        # minl = np.array([i.shape[1] for i in D1+D2]).min()
        max_l1 = array([i.shape[0] for i in D1]).max()
        max_l2 = array([i.shape[0] for i in D2]).max()
        if max_l2 > max_l1:
            Da = D1
            Db = D2
            max_l = max_l2
            Da = D2
            Db = D1
            max_l = max_l1
        # Set all Db atom to largest value
        Dbe = []
        for i in range(len(Db)):
            k_l = Db[i].shape[0]
            Dbe.append(concatenate((zeros((max_l - k_l, k_dim)), Db[i]), axis=0))
        gdm = zeros((len(Da), len(Db)))
        for i in range(len(Da)):
            m_dist, m_corr = _kernel_registration(Da[i], Dbe, g)
            k_l = Da[i].shape[0]
            # m_dist, m_corr = _kernel_registration(np.concatenate((zeros((, k_dim)), Da[i], zeros((, k_dim))), axis=0), Dbe, g)
            for j in range(len(Dbe)):
                gdm[i, j] = m_dist[
                    j, unravel_index(abs(m_corr[j, :]).argmax(), m_corr[j, :].shape)
        # all atoms have the same length, no registration
        gdm = zeros((len(D1), len(D2)))
        for i in range(len(D1)):
            for j in range(len(D2)):
                gdm[i, j] = g(D1[i], D2[j])
    return gdm

def hausdorff(D1, D2, gdist, scale=False):
    Compute the Hausdorff distance between two sets of elements, here
    dictionary atoms, using a ground distance.
    Possible choice are "chordal", "fubinistudy", "binetcauchy", "geodesic",
    "frobenius", "abs_euclidean" or "euclidean".
    The scale parameter changes the return value to be between 0 and 1.
    g = _valid_atom_metric(gdist)
    if g is None:
        print("Unknown ground distance, exiting.")
        return NaN
    gdm = _compute_gdm(D1, D2, g)
    d = max([max(min(gdm, axis=0)), max(min(gdm, axis=1))])
    if not scale:
        return d
        return _scale_metric(gdist, d, D1)

def emd(D1, D2, gdist, scale=False):
    Compute the Earth Mover's Distance (EMD) between two sets of elements,
    here dictionary atoms, using a ground distance.
    Possible choice are "chordal", "fubinistudy", "binetcauchy", "geodesic",
    "frobenius", "abs_euclidean" or "euclidean".
    The scale parameter changes the return value to be between 0 and 1.
    g = _valid_atom_metric(gdist)
    if g is None:
        print("Unknown ground distance, exiting.")
        return NaN
    # if gdist == "chordal":
    #     g = chordal
    # elif gdist == "chordal_principal_angles":
    #     g = chordal_principal_angles
    # elif gdist == "fubinistudy":
    #     g = fubini_study
    # elif gdist == "binetcauchy":
    #     g = binet_cauchy
    # elif gdist == "geodesic":
    #     g = geodesic
    # elif gdist == "frobenius":
    #     g = frobenius
    # elif gdist == "abs_euclidean":
    #     g = abs_euclidean
    # elif gdist == "euclidean":
    #     g = euclidean
    # else:
    #     print 'Unknown ground distance, exiting.'
    #     return NaN

    # # Do we need a registration? If kernel do not have the same shape, yes
    # if not np.all(np.array([i.shape[0] for i in D1+D2]) == D1[0].shape[0]):
    #     # compute correlation and distance matrices
    #     k_dim = D1[0].shape[1]
    #     # minl = np.array([i.shape[1] for i in D1+D2]).min()
    #     max_l1 = np.array([i.shape[0] for i in D1]).max()
    #     max_l2 = np.array([i.shape[0] for i in D2]).max()
    #     if max_l2 > max_l1:
    #         Da = D1
    #         Db = D2
    #         max_l = max_l2
    #     else:
    #         Da = D2
    #         Db = D1
    #         max_l = max_l1
    #     Dbe = []
    #     for i in range(len(Db)):
    #         k_l = Db[i].shape[0]
    #         Dbe.append(np.concatenate((zeros((max_l-k_l, k_dim)), Db[i]), axis=0))
    #     gdm = zeros((len(Da), len(Db)))
    #     for i in range(len(Da)):
    #         k_l = Da[i].shape[0]
    #         m_dist, m_corr = _kernel_registration(np.concatenate((zeros((, k_dim)), Da[i], zeros((, k_dim))), axis=0), Dbe, g)
    #         for j in range(len(Dbe)):
    #             gdm[i,j] = m_dist[j, np.unravel_index(np.abs(m_corr[j,:]).argmax(), m_corr[j,:].shape)]
    # else:
    #     # all atoms have the same length, no registration
    #     gdm = np.zeros((len(D1), len(D2)))
    #     for i in range(len(D1)):
    #         for j in range(len(D2)):
    #             gdm[i,j] = g(D1[i], D2[j])
    gdm = _compute_gdm(D1, D2, g)

    c = co.matrix(gdm.flatten(order="F"))
    G1 = co.spmatrix([], [], [], (len(D1), len(D1) * len(D2)))
    G2 = co.spmatrix([], [], [], (len(D2), len(D1) * len(D2)))
    G3 = co.spmatrix(-1.0, range(len(D1) * len(D2)), range(len(D1) * len(D2)))
    for i in range(len(D1)):
        for j in range(len(D2)):
            k = j + (i * len(D2))
            G1[i, k] = 1.0
            G2[j, k] = 1.0
    G = co.sparse([G1, G2, G3])
    h1 = co.matrix(1.0 / len(D1), (len(D1), 1))
    h2 = co.matrix(1.0 / len(D2), (len(D2), 1))
    h3 = co.spmatrix([], [], [], (len(D1) * len(D2), 1))
    h = co.matrix([h1, h2, h3])
    A = co.matrix(1.0, (1, len(D1) * len(D2)))
    b = co.matrix([1.0])

    co.solvers.options["show_progress"] = False
    sol = solv.lp(c, G, h, A, b)
    d = sol["primal objective"]

    if not scale:
        return d
        return _scale_metric(gdist, d, D1)
    # if (gdist == "chordal" or gdist == "chordal_principal_angles" or
    #     gdist == "fubinistudy" or gdist == "binetcauchy" or
    #     gdist == "geodesic"):
    #     return d/sqrt(D1[0].shape[0])
    # elif gdist == "frobenius":
    #     return d/sqrt(2.)
    # else:
    #     return d

def _multivariate_correlation(s, D):
    """Compute correlation between multivariate atoms

    Compute the correlation between a multivariate atome s and dictionary D
    as the sum of the correlation in each n_dims dimensions.
    n_features = s.shape[0]
    n_dims = s.shape[1]
    n_kernels = len(D)
    corr = np.zeros((n_kernels, n_features))
    for k in range(n_kernels):  # for all atoms
        corrTmp = 0
        for j in range(n_dims):  # for all dimensions
            corrTmp += np.correlate(s[:, j], D[k][:, j])
        corr[k, : len(corrTmp)] = corrTmp
    return corr

def detection_rate(ref, recov, threshold):
    """Compute the detection rate between reference and recovered dictionaries

    The reference ref and the recovered recov are univariate or multivariate
    dictionaries. An atom a of the ref dictionary is considered as recovered if
    $c < threshold$ with $c = argmax_{r \in R} |<a, r>|$, that is the absolute
    value of the maximum correlation between a and any atom r of the recovered
    dictionary R is above a given threshold.
    The process is iterative and an atom r could be matched only once with an
    atom a of the reference dictionary. In other word, each atom a is matched
    with a different atom r.
    n_kernels_ref, n_kernels_recov = len(ref), len(recov)
    n_features = ref[0].shape[0]
    if ref[0].ndim == 1:
        n_dims = 1
        for k in range(n_kernels_ref):
            ref[k] = atleast_2d(ref[k]).T
        n_dims = ref[0].shape[1]
    if recov[0].ndim == 1:
        for k in range(n_kernels_recov):
            recov[k] = atleast_2d(recov[k]).T
    dr = 0
    corr = zeros((n_kernels_ref, n_kernels_recov))
    for k in range(n_kernels_ref):
        c_tmp = _multivariate_correlation(
                (zeros((n_features, n_dims)), ref[k], zeros((n_features, n_dims))), axis=0
        for j in range(n_kernels_recov):
            idx_max = argmax(abs(c_tmp[j, :]))
            corr[k, j] = c_tmp[j, idx_max]
    c_local = np.abs(corr.copy())
    for _ in range(n_kernels_ref):
        max_corr = c_local.max()
        if max_corr >= threshold:
            dr += 1
        idx_max = np.unravel_index(c_local.argmax(), c_local.shape)
        c_local[:, idx_max[1]] = zeros(n_kernels_ref)
        c_local[idx_max[0], :] = zeros(n_kernels_recov)
    return float(dr) / n_kernels_recov * 100.0

def _convert_array(ref, recov):
    if ref[0].ndim == 1:
        for k in range(len(ref)):
            ref[k] = atleast_2d(ref[k]).T
    if recov[0].ndim == 1:
        for k in range(len(recov)):
            recov[k] = atleast_2d(recov[k]).T
    D1 = np.array(ref)
    D2 = np.array(recov)
    M = D1.shape[0]
    N = D1.shape[1]
    D1 = D1.reshape((M, N))
    D2 = D2.reshape((M, N))
    return D1, D2, M

def precision_recall(ref, recov, threshold):
    """Compute precision and recall for recovery experiment"""
    D1, D2, M = _convert_array(ref, recov)
    corr =
    precision = float((np.max(corr, axis=0) > threshold).sum()) / float(M)
    recall = float((np.max(corr, axis=1) > threshold).sum()) / float(M)
    return precision * 100.0, recall * 100.0

def precision_recall_points(ref, recov):
    """Compute the precision and recall for each atom in a recovery experiment"""
    # if ref[0].ndim == 1:
    #     for k in range(len(ref)):
    #         ref[k] = atleast_2d(ref[k]).T
    # if recov[0].ndim == 1:
    #     for k in range(len(recov)):
    #         recov[k] = atleast_2d(recov[k]).T
    # D1 = np.array(ref)
    # D2 = np.array(recov)
    # M = D1.shape[0]
    # N = D1.shape[1]
    # D1 = D1.reshape((M, N))
    # D2 = D2.reshape((M, N))
    D1, D2, _ = _convert_array(ref, recov)
    corr =
    precision = np.max(corr, axis=0)
    recall = np.max(corr, axis=1)
    return precision, recall

def beta_dist(D1, D2):
    """Compute the Beta-distance proposed by Skretting and Engan

    The beta-distance is:
    $\beta(D1, D2)=1/(M1+M2)(\sum_j \beta(D1, d^2_j)+\sum_j \beta(D2, d^1_j))$
    with $\beta(D, x) = arccos(\max_i |d^T_i x|/||x||)$
    as proposed in:
    Karl Skretting and Kjersti Engan,
    Learned dictionaries for sparse image representation: properties and results,
    SPIE, 2011.
    if D1[0].shape != D2[0].shape:
        raise ValueError(
            f"Dictionaries have different dim : {D1[0].shape} and {D2[0].shape}."
    D1 = np.array(D1)
    M1 = D1.shape[0]
    N = D1.shape[1]
    D1 = D1.reshape((M1, N))
    D2 = np.array(D2)
    M2 = D2.shape[0]
    D2 = D2.reshape((M2, N))
    corr =
    if np.allclose(np.max(corr, axis=0), ones(M2)) and np.allclose(
        np.max(corr, axis=1), ones(M1)
        return 0.0
    return (
        np.sum(np.arccos(np.max(corr, axis=0))) + np.sum(np.arccos(np.max(corr, axis=1)))
    ) / (M1 + M2)