yzhao062/Pyod

View on GitHub
pyod/models/lmdd.py

Summary

Maintainability
B
4 hrs
Test Coverage
# -*- coding: utf-8 -*-
"""Linear Model Deviation-base outlier detection (LMDD).
"""
# Author: Yahya Almardeny <almardeny@gmail.com>
# License: BSD 2 clause


import numpy as np
from numba import njit
from scipy import stats
from sklearn.utils import check_array, check_random_state

from pyod.utils import check_parameter
from .base import BaseDetector


@njit
def _aad(X):
    """Internal Function to Calculate Average Absolute Deviation
    (a.k.a Mean Absolute Deviation)
    """
    return np.mean(np.absolute(X - np.mean(X)))


def _check_params(n_iter, dis_measure, random_state):
    """Internal function to check for and validate class parameters.
    Also, to return random state instance and the appropriate dissimilarity
    measure if valid.
    """
    if isinstance(n_iter, int):
        check_parameter(n_iter, low=1, param_name='n_iter')
    else:
        raise TypeError("n_iter should be int, got %s" % n_iter)

    if isinstance(dis_measure, str):
        if dis_measure not in ('aad', 'var', 'iqr'):
            raise ValueError("Unknown dissimilarity measure type, "
                             "dis_measure should be in "
                             "(\'aad\', \'var\', \'iqr\'), "
                             "got %s" % dis_measure)
        # TO-DO: 'mad': Median Absolute Deviation to be added
        # once Scipy stats version 1.3.0 is released
    else:
        raise TypeError("dis_measure should be str, got %s" % dis_measure)

    return check_random_state(random_state), _aad if dis_measure == 'aad' \
        else (np.var if dis_measure == 'var'
              else (stats.iqr if dis_measure == 'iqr' else None))


class LMDD(BaseDetector):
    """Linear Method for Deviation-based Outlier Detection.

    LMDD employs the concept of the smoothing factor which
    indicates how much the dissimilarity can be reduced by
    removing a subset of elements from the data-set.
    Read more in the :cite:`arning1996linear`.

    Note: this implementation has minor modification to make it output scores
    instead of labels.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_iter : int, optional (default=50)
        Number of iterations where in each iteration,
        the process is repeated after randomizing the order of the input.
        Note that n_iter is a very important factor that affects the accuracy.
        The higher the better the accuracy and the longer the execution.

    dis_measure: str, optional (default='aad')
        Dissimilarity measure to be used in calculating the smoothing factor
        for points, options available:

        - 'aad': Average Absolute Deviation
        - 'var': Variance
        - 'iqr': Interquartile Range

    random_state : int, RandomState instance or None, optional (default=None)
        If int, random_state is the seed used by the random number generator;
        If RandomState instance, random_state is the random number generator;
        If None, the random number generator is the RandomState instance used
        by `np.random`.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1, n_iter=50, dis_measure='aad',
                 random_state=None):
        super(LMDD, self).__init__(contamination=contamination)
        self.n_iter, self.n_iter_ = n_iter, n_iter
        self.dis_measure, self.dis_measure_ = dis_measure, dis_measure

        # add this assignment to prevent clone error; not being used.
        self.random_state = random_state
        self.random_state_, self.dis_measure_ = _check_params(n_iter,
                                                              dis_measure,
                                                              random_state)

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        X = check_array(X)
        self._set_n_classes(y)
        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The training input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        return self.__sf(X)

    def __dis(self, X):
        """
        Internal function to calculate for
        dissimilarity in a sequence of sets.
        """
        res_ = np.zeros(shape=(X.shape[0],))
        var_max, j = -np.inf, 0
        # this can be vectorized but just for comforting memory
        for i in range(1, X.shape[0]):
            _var = self.dis_measure_(X[:i + 1]) - self.dis_measure_(X[:i])
            if _var > var_max:
                var_max = _var
                j = i

        if var_max > res_[j]:
            res_[j] = var_max

            for k in range(j + 1, X.shape[0]):
                dk_diff = self.dis_measure_(np.vstack((X[:j], X[k]))) \
                          - self.dis_measure_(np.vstack((X[:j + 1], X[k])))
                if dk_diff >= 0:
                    res_[k] = dk_diff + var_max

        return res_

    def __sf(self, X):
        """Internal function to calculate for Smoothing Factors of data points
        Repeated n_iter_ of times in randomized mode.
        """
        dis_ = np.zeros(shape=(X.shape[0],))
        card_ = np.zeros(shape=(X.shape[0],))
        # perform one process with the original input order
        itr_res = self.__dis(X)
        np.put(card_, X.shape[0] - sum([i > 0. for i in itr_res]),
               np.where(itr_res > 0.))

        # create a copy of random state to preserve original state for
        # future fits (if any)
        random_state = np.random.RandomState(
            seed=self.random_state_.get_state()[1][0])
        indices = np.arange(X.shape[0])
        for _ in range(self.n_iter_):
            ind_ = indices
            random_state.shuffle(ind_)
            _x = X[indices]
            # get dissimilarity of this iteration and restore original order
            itr_res = self.__dis(_x)[np.argsort(ind_)]
            current_card = X.shape[0] - sum([i > 0. for i in itr_res])
            # compare with previous iteration to get the maximal dissimilarity
            for i, j in enumerate(itr_res):
                if j > dis_[i]:
                    dis_[i] = j
                    card_[i] = current_card
            # Increase random state seed by one to reorder input next iteration
            random_state.seed(random_state.get_state()[1][0] + 1)

        return np.multiply(dis_, card_)