altugkarakurt/ModeTonicEstimation

View on GitHub
morty/classifiers/knn.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
from __future__ import division
from scipy.spatial import distance as spdistance
import collections
import numpy as np
import copy


class KNN(object):
    @classmethod
    def generate_distance_matrix(cls, distrib, peak_idx, training_distribs,
                                 distance_method='bhat'):
        """--------------------------------------------------------------------
        Iteratively calculates the distance of the input distribution from each
        (mode candidate, tonic candidate) pair. This is a generic function,
        that is independent of distribution type or any other parameter value.
        -----------------------------------------------------------------------
        distribs            : Input distribution that is to be estimated
        peak_idx            : List of indices of distribution peaks
        training_distribs   : List of training distributions
        method              : The distance method to be used. The available
                              distances are listed in _distance() method.
        --------------------------------------------------------------------"""
        result = np.zeros((len(peak_idx), len(training_distribs)))
        trial = copy.deepcopy(distrib)

        # convert the shifts from absolute wrt the reference distribution to
        # relative wrt the previous shift
        shift_idx = np.diff(np.insert(peak_idx, 0, 0))

        # Iterates over the peaks, i.e. the tonic candidates
        for i, cur_peak_idx in enumerate(shift_idx):
            trial.shift(cur_peak_idx)

            # Iterates over mode candidates
            for j, td in enumerate(training_distribs):
                assert trial.bin_unit == td.bin_unit, \
                    'The bin units of the compared distributions should match.'
                assert trial.distrib_type() == td.distrib_type(), \
                    'The features should be of the same type'
                assert trial.step_size == td.step_size, \
                    'The step_sizes should be the same'

                if trial.is_pcd():
                    trial_vals = trial.vals
                    td_vals = td.vals
                else:  # PD, compare in the overlapping region
                    min_td_bin = np.min(td.bins)
                    max_td_bin = np.max(td.bins)

                    min_trial_bin = np.min(trial.bins)
                    max_trial_bin = np.max(trial.bins)

                    overlap = [max([min_td_bin, min_trial_bin]),
                               min([max_td_bin, max_trial_bin])]

                    trial_bool = (overlap[0] <= trial.bins) * \
                                 (trial.bins <= overlap[1])
                    trial_vals = trial.vals[trial_bool]

                    td_bool = (overlap[0] <= td.bins) * \
                              (td.bins <= overlap[1])
                    td_vals = td.vals[td_bool]

                if any([td_vals.size == 0, trial_vals.size == 0]):
                    # no overlapping, only in pd
                    assert not trial.is_pcd(), \
                        'The distributions should have been pitch ' \
                        'distribution to become non-overlapping'
                    result[i][j] = 0
                else:
                    # Calls the distance function for each entry of the matrix
                    result[i][j] = cls._compute_measure(trial_vals, td_vals,
                                                        method=distance_method)
        return np.array(result)

    @staticmethod
    def _compute_measure(vals_1, vals_2, method='bhat'):
        """--------------------------------------------------------------------
         Computes the distance or dissimilairty between two 1-D lists of
         values. This function is called with pitch distribution values,
         while generating matrices. The function is symmetric, the two input
         lists are interchangable.
         ----------------------------------------------------------------------
         vals_1, vals_2 : The input value lists.
         method         : The choice of distance method
         ----------------------------------------------------------------------
         manhattan    : Minkowski distance of 1st degree
         euclidean    : Minkowski distance of 2nd degree
         l3           : Minkowski distance of 3rd degree
         bhat         : Bhattacharyya distance
         intersection : Intersection
         corr         : Correlation
         -------------------------------------------------------------------"""
        if method in ['manhattan', 'l1']:
            return spdistance.minkowski(vals_1, vals_2, 1)
        elif method in ['euclidean', 'l2']:
            return spdistance.euclidean(vals_1, vals_2)
        elif method == 'l3':
            return spdistance.minkowski(vals_1, vals_2, 3)
        elif method == 'bhat':  # bhattacharrya distance
            return -np.log(np.sum(np.sqrt(vals_1 * vals_2)))
        elif method == 'jeffrey':  # Jeffrey's divergence
            return (np.sum(vals_1 * np.log(vals_1 / vals_2)) +
                    np.sum(vals_2 * np.log(vals_2 / vals_1)))
        elif method == 'js':  # Jensen–Shannon distance
            return np.sqrt(
                np.sum(vals_1 * np.log(2 * vals_1 / (vals_1 + vals_2))) * 0.5 +
                np.sum(vals_2 * np.log(2 * vals_2 / (vals_1 + vals_2))) * 0.5)
        # Since correlation and intersection are actually similarity measures,
        # we convert them to dissimilarities, by taking 1 - similarity
        elif method == 'dis_intersect':
            return 1.0 - np.sum(np.minimum(vals_1, vals_2)) / np.size(vals_1)
        elif method == 'dis_corr':
            return 1.0 - np.correlate(vals_1, vals_2)
        else:
            return 0

    @staticmethod
    def get_nearest_neighbors(sorted_pair, k_neighbor):
        # parse mode/tonic pairs
        pairs = [pair for pair, dist in sorted_pair[:k_neighbor]]

        # find the most common pairs
        counter = collections.Counter(pairs)
        most_commons = counter.most_common(k_neighbor)
        max_cnt = most_commons[0][1]
        cand_pairs = [c[0] for c in most_commons if c[1] == max_cnt]

        return cand_pairs

    @staticmethod
    def classify(cand_pairs, sorted_pair):
        # in case there are multiple candidates get the pair sorted earlier
        for p in sorted_pair:
            if p[0] in cand_pairs:
                estimated_pair = p

                # pop the estimated pair from the sorted_pair list for ranking
                sorted_pair = [pp for pp in sorted_pair if pp[0] != p[0]]
                return estimated_pair, sorted_pair

        assert False, 'No pair selected, this should be impossible!'