KulikDM/pythresh

View on GitHub
pythresh/thresholds/fgd.py

Summary

Maintainability
A
35 mins
Test Coverage
import numpy as np

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, gen_kde, normalize


class FGD(BaseThresholder):
    """Fixed Gradient Descent (FGD) thresholder.

    A non-parametric thresholder. A kernel density estimate (KDE) of the
    normalized decision scores is differentiated along the score axis,
    and the threshold is placed halfway between the first two grid
    points, scanning from the low end of the range, at which that
    derivative goes from positive to non-positive. The method
    description refers to these as the first and second inflection
    points. See :cite:`qi2021fgd` for details.

    Parameters
    ----------
    random_state : int, optional (default=1234)
        Random seed for the random number generators of the
        thresholders. Can also be set to None.

    Attributes
    ----------
    thresh_ : threshold value that separates inliers from outliers

    dscores_ : decision scores after ``check_scores`` and ``normalize``

    Notes
    -----
    The probability density of the decision scores is estimated with a
    KDE and its first derivative is computed with ``np.gradient``. If
    fewer than two positive-to-non-positive sign changes are found in
    that derivative, the threshold falls back to the constant 1.1.
    """

    def __init__(self, random_state=1234):
        self.random_state = random_state

    def eval(self, decision):
        """Label each decision score as inlier (0) or outlier (1).

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   holding the decision scores produced by an
                   outlier detector.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not it should be
            considered an outlier according to the fitted model.
            0 stands for inliers and 1 for outliers.
        """

        scores = normalize(
            check_scores(decision, random_state=self.random_state)
        )
        self.dscores_ = scores

        # Density estimate and the grid it was evaluated on.
        # NOTE(review): 0 and 1 are presumably the grid bounds and the
        # last argument the number of grid points -- confirm in gen_kde.
        density, grid = gen_kde(scores, 0, 1, 3 * len(scores))

        # First derivative of the density, using the grid spacing as step
        step = grid[1] - grid[0]
        slope = np.gradient(density, step)

        # Indices i with slope[i] > 0 and slope[i + 1] <= 0; only the
        # first two (lowest) matches are needed
        sign_flip = (slope[:-1] > 0) & (slope[1:] <= 0)
        turning = np.flatnonzero(sign_flip)[:2]

        if len(turning) > 1:
            # Midpoint between the two grid positions found above
            limit = (grid[turning[0]] + grid[turning[1]]) / 2
        else:
            # Fallback when fewer than two points were found.
            # NOTE(review): 1.1 presumably lies above the normalized
            # score range so nothing gets flagged -- confirm against
            # normalize and cut.
            limit = 1.1

        self.thresh_ = limit

        return cut(scores, limit)