yzhao062/Pyod

View on GitHub
pyod/models/qmcd.py

Summary

Maintainability
A
25 mins
Test Coverage
# -*- coding: utf-8 -*-
"""Quasi-Monte Carlo Discrepancy outlier detection (QMCD)
"""

# Author: D Kulik
# License: BSD 2 clause


import numpy as np
import scipy.stats as stats
from numba import njit, prange
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector


@njit(fastmath=True, parallel=True)
def _wrap_around_discrepancy(data, check):
    """Wrap-around Quasi-Monte Carlo discrepancy method"""

    n = data.shape[0]
    d = data.shape[1]
    p = check.shape[0]

    disc = np.zeros(p)

    for i in prange(p):
        dc = 0.0
        for j in prange(n):
            prod = 1.0
            for k in prange(d):
                x_kikj = abs(check[i, k] - data[j, k])
                prod *= 3.0 / 2.0 - x_kikj + x_kikj ** 2

            dc += prod
        disc[i] = dc

    return - (4.0 / 3.0) ** d + 1.0 / (n ** 2) * disc


class QMCD(BaseDetector):
    """The Wrap-around Quasi-Monte Carlo discrepancy is a uniformity criterion 
       which is used to assess the space filling of a number of samples in a hypercube. 
       It quantifies the distance between the continuous uniform distribution on a 
       hypercube and the discrete uniform distribution on distinct sample points. 
       Therefore, lower discrepancy values for a sample point indicates that it provides 
       better coverage of the parameter space with regard to the rest of the samples. This
       method is kernel based and a higher discrepancy score is relative to the
       rest of the samples, the higher the likelihood of it being an outlier. 
       Read more in the :cite:`fang2001wrap`.
    
    Parameters
    ----------          

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
       The modified z-score to use as a threshold. Observations with
       a modified z-score (based on the median absolute deviation) greater
       than this value will be classified as outliers.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
        """

    def __init__(self, contamination=0.1):

        super(QMCD, self).__init__(contamination=contamination)

    def fit(self, X, y=None):
        """Fit detector

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.
        """

        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)

        # Normalize data between 0 and 1
        self._scaler = MinMaxScaler()
        X_norm = self._scaler.fit_transform(X)

        self._fitted_data = X_norm.copy()

        # Calculate WD QMCD scores
        scores = _wrap_around_discrepancy(X_norm, X_norm)

        # Get criterion for inverting scores
        self._is_flipped = False
        skew = stats.skew(scores)
        kurt = stats.kurtosis(scores)

        # Invert score order based on criterion
        if (skew < 0) or ((skew >= 0) & (kurt < 0)):
            scores = scores.max() + scores.min() - scores
            self._is_flipped = True

        self.decision_scores_ = scores

        self._process_decision_scores()

        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The independent and dependent/target samples with the target 
            samples being the last column of the numpy array such that
            eg: X = np.append(x, y.reshape(-1,1), axis=1). Sparse matrices are 
            accepted only if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """

        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])

        X = check_array(X)

        # Scale data to fitted data
        X_norm = self._scaler.transform(X)

        # Calculate WD QMCD scores
        scores = _wrap_around_discrepancy(self._fitted_data, X_norm)

        # Invert score order based on criterion
        if self._is_flipped:
            scores = self.decision_scores_.max() + self.decision_scores_.min() - scores

        return scores