KulikDM/pythresh

View on GitHub
pythresh/thresholds/comb.py

Summary

Maintainability
A
1 hr
Test Coverage
import numpy as np
import scipy.stats as stats
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.ensemble import BaggingClassifier, StackingClassifier
from sklearn.linear_model import RidgeClassifier
from sklearn.utils import check_array

from .base import BaseThresholder
from .thresh_utility import check_scores, cut, normalize


class COMB(BaseThresholder):
    """COMB class for Combined thresholder.

       Use multiple thresholders as a non-parametric means
       to threshold scores generated by the decision_scores where outliers
       are set to any value beyond the (mean, median, or mode) of the
       contamination from the selected combination of thresholders.

       Parameters
       ----------

       thresholders : list, optional (default='default')
            List of instantiated thresholders, e.g. [DSN(), FILTER()].
            Default is [DSN(random_state=self.random_state), FILTER(),
            OCSVM(random_state=self.random_state)]

       max_contam : float, optional (default=0.5)
            Maximum contamination allowed for each threshold output. Thresholded scores
            above the maximum contamination will not be included in the final combined
            threshold

       method : {'mean', 'median', 'mode', 'bagged', 'stacked}, optional (default='stacked')
           evaluation method to apply to contamination levels

           - 'mean':    calculate the mean combined threshold
           - 'median':  calculate the median combined threshold
           - 'mode':    calculate the majority vote or mode of the thresholded labels
           - 'bagged':  use a bagged LaplaceGaussianNB to solve the combined threshold
           - 'stacked': use a stacked Ridge, and LaplaceGaussianNB classifier combined method

       random_state : int, optional (default=1234)
            Random seed for the random number generators of the thresholders. Can also
            be set to None.

       Attributes
       ----------

       thresh_ : threshold value that separates inliers from outliers

       confidence_interval_ : lower and upper confidence interval of the contamination level

       dscores_ : 1D array of decomposed decision scores
    """

    def __init__(self, thresholders='default', max_contam=0.5, method='stacked', random_state=1234):

        self.thresholders = thresholders
        self.max_contam = max_contam
        func = {'mean': np.mean, 'median': np.median,
                'mode': stats.mode, 'bagged': BaggingClassifier,
                'stacked': StackingClassifier}
        self.method = method
        self.method_func = func[method]
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """

        scores = check_array(decision, ensure_2d=False)

        decision = check_scores(decision, random_state=self.random_state)

        decision = normalize(decision)

        self.dscores_ = decision

        # Initialize thresholders
        if self.thresholders == 'default':
            from .dsn import DSN
            from .filter import FILTER
            from .ocsvm import OCSVM

            self.thresholders = [DSN(random_state=self.random_state), FILTER(),
                                 OCSVM(random_state=self.random_state)]

        # Apply each thresholder
        contam = []
        ratio = []
        counts = len(decision)

        for thresholder in self.thresholders:

            labels = thresholder.eval(scores)
            outlier_ratio = np.sum(labels)/counts

            if outlier_ratio < self.max_contam:

                contam.append(labels)
                ratio.append(outlier_ratio)

        contam = np.array(contam)
        ratio = np.array(ratio)

        # Get lower and upper confidence interval
        low, high = stats.bootstrap(ratio.reshape(1, -1),
                                    np.mean, paired=True,
                                    random_state=self.random_state).confidence_interval
        self.confidence_interval_ = [low, high]

        # Get [mean, median, mode, bagged, or stacked] of inliers
        if (self.method == 'bagged') or (self.method == 'stacked'):

            X = np.tile(decision, len(contam))
            y = np.hstack(contam)

            if (self.method == 'bagged'):
                model = self.method_func(LaplaceGaussianNB(),
                                         n_estimators=12,
                                         random_state=self.random_state)
            else:
                model = self.method_func([('Ridge', RidgeClassifier()),
                                          ('GNB', LaplaceGaussianNB())])

            model.fit(X.reshape(-1, 1), y)
            lbls = model.predict(decision.reshape(-1, 1))

            self.thresh_ = None

            return lbls

        elif self.method == 'mode':

            self.thresh_ = None
            lbls = self.method_func(contam, axis=0)

            return np.squeeze(lbls[0])

        else:

            contam = np.sum(contam, axis=1)/contam.shape[1]
            inlier_ratio = 1-self.method_func(contam)

            idx = int(counts*inlier_ratio)
            ordered = np.sort(decision)
            limit = ordered[idx] if idx < counts else 1.0
            self.thresh_ = limit

            return cut(decision, limit)


class LaplaceGaussianNB(BaseEstimator, ClassifierMixin):

    def __init__(self):

        pass

    def fit(self, X, y):

        X = X.squeeze()

        self.models = []
        self.priors = []
        self.classes_ = [0, 1]
        dist = [stats.laplace, stats.norm]

        for c in self.classes_:

            subset_x = X[y == c]

            self.models.append(dist[c](subset_x.mean(),
                                       subset_x.std()))

            self.priors.append(len(subset_x)/len(X))

        return self

    def predict(self, X):

        likelihoods = self.predict_proba(X)

        return likelihoods.argmax(axis=1)

    def predict_proba(self, X):

        X = X.squeeze()

        likelihoods = []

        for c in self.classes_:

            probs = self.priors[c] * self.models[c].pdf(X)
            likelihoods.append(probs)

        return np.vstack(likelihoods).T