KulikDM/pythresh

View on GitHub
pythresh/thresholds/ocsvm.py

Summary

Maintainability
A
50 mins
Test Coverage
import numpy as np
import scipy.stats as stats
from sklearn.kernel_approximation import AdditiveChi2Sampler
from sklearn.linear_model import RidgeCV, SGDOneClassSVM
from sklearn.metrics import mean_squared_error
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures
from sklearn.svm import OneClassSVM

from .base import BaseThresholder
from .thresh_utility import check_scores, gen_kde, normalize


class OCSVM(BaseThresholder):
    """OCSVM class for One-Class Support Vector Machine thresholder.

       Use a one-class svm to evaluate a non-parametric means
       to threshold scores generated by the decision_scores where outliers
       are determined by the one-class svm using a polynomial kernel
       with the polynomial degree either set or determined by regression
       internally. See :cite:`barbado2022ocsvm` for details.

       Parameters
       ----------

       model : {'poly', 'sgd'}, optional (default='sgd')
           OCSVM model to apply

           - 'poly':  Use a polynomial kernel with a regular OCSVM
           - 'sgd':   Used the Additive Chi2 kernel approximation with a SGDOneClassSVM

       degree : int, optional (default='auto')
           Polynomial degree to use for the one-class svm.
           Default 'auto' finds the optimal degree with linear regression

       gamma : float, optional (default='auto')
           Kernel coefficient for polynomial fit for the one-class svm.
           Default 'auto' uses 1 / n_features

       criterion : {'aic', 'bic'}, optional (default='bic')
           regression performance metric. AIC is the Akaike Information Criterion,
           and BIC is the Bayesian Information Criterion. This only applies
           when degree is set to 'auto'

       nu : float, optional (default='auto')
           An upper bound on the fraction of training errors and a lower bound
           of the fraction of support vectors. Default 'auto' sets nu as the ratio
           between the any point that is less than or equal to the median plus
           the absolute difference between the mean and geometric mean over the
           the number of points in the entire dataset

       tol : float, optional (default=1e-3)
           The stopping criterion for the one-class svm

       random_state : int, optional (default=1234)
            Random seed for the SVM's data sampling. Can also be set to None.

       Attributes
       ----------

       thresh_ : threshold value that separates inliers from outliers

       Examples
       --------
       The effects of randomness can affect the thresholder's output performance
       significantly. Therefore, to alleviate the effects of randomness on the
       thresholder a combined model can be used with different random_state values.
       E.g.

       .. code:: python

            # train the KNN detector
            from pyod.models.knn import KNN
            from pythresh.thresholds.comb import COMB
            from pythresh.thresholds.ocsvm import OCSVM

            clf = KNN()
            clf.fit(X_train)

            # get outlier scores
            decision_scores = clf.decision_scores_  # raw outlier scores

            # get outlier labels with combined model
            thres = COMB(thresholders = [OCSVM(random_state=1234),
            OCSVM(random_state=42), OCSVM(random_state=9685),
            OCSVM(random_state=111222)])
            labels = thres.eval(decision_scores)
    """

    def __init__(self, model='sgd', degree='auto', gamma='auto',
                 criterion='bic', nu='auto', tol=1e-3, random_state=1234):

        self.model = model
        self.degree = degree
        self.gamma = gamma
        self.crit = criterion
        self.nu = nu
        self.tol = tol
        self.random_state = random_state

    def eval(self, decision):
        """Outlier/inlier evaluation process for decision scores.

        Parameters
        ----------
        decision : np.array or list of shape (n_samples)
                   or np.array of shape (n_samples, n_detectors)
                   which are the decision scores from a
                   outlier detection.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """

        decision = check_scores(decision, random_state=self.random_state)

        decision = normalize(decision)

        self.dscores_ = decision

        # Get auto nu calculation
        if self.nu == 'auto':

            np.seterr(divide='ignore')
            gmean = stats.gmean(decision)
            mean = np.mean(decision)
            med = np.median(decision)

            self.nu = len(decision[decision <= med +
                          abs(mean-gmean)])/len(decision)

        self.nu = 0.5 if self.nu == 1.0 else self.nu

        # Get auto degree calculation
        if (self.degree == 'auto') & (self.model == 'poly'):

            self.degree = self._auto_crit(decision)

        decision = decision.reshape(-1, 1)

        # Create a one-class svm
        if self.model == 'poly':
            clf = OneClassSVM(gamma=self.gamma, kernel='poly',
                              degree=self.degree, nu=self.nu,
                              tol=self.tol).fit(decision)
        else:
            transform = AdditiveChi2Sampler()
            sgd = SGDOneClassSVM(nu=self.nu,
                                 random_state=self.random_state)
            clf = make_pipeline(transform, sgd)
            clf.fit(decision)

        # Predict inliers and outliers
        res = clf.predict(decision)

        res[res == -1] = 0

        # Remove outliers from the left tail (precaution step)
        decision = np.squeeze(decision)
        mask = np.where(decision <= np.mean(decision))
        res[mask] = 0

        self.thresh_ = None

        return res

    def _auto_crit(self, decision):
        """Decide polynomial degree using criterion."""

        # Generate kde
        kde, dat_range = gen_kde(decision, 0, 1, len(decision))

        # Set polynomial degrees to test
        polys = [2, 3, 4, 5, 6, 7, 8, 9, 10]
        n = len(decision)

        decision = decision.reshape(-1, 1)
        kde = kde.reshape(-1, 1)

        scores = []

        for poly in polys:

            # Calculate the polynomial features for the kde
            poly_features = PolynomialFeatures(degree=poly, include_bias=True)
            poly_fit = poly_features.fit_transform(kde)

            # Use regression to fit the polynomial
            poly_reg = RidgeCV(alphas=np.logspace(-1, 2, 100))
            poly_reg.fit(poly_fit, dat_range)
            poly_pred = poly_reg.predict(poly_fit)

            # Get the mse and apply the regression performance metric
            mse = mean_squared_error(dat_range, poly_pred)

            if self.crit == 'aic':
                scores.append(n*np.log(mse) + 2*(poly+1))
            else:
                scores.append(n*np.log(mse) + (poly+1)*np.log(n))

        # Set degree from smallest metric score

        return polys[np.argmin(scores)]