yzhao062/Pyod

View on GitHub
pyod/models/loda.py

Summary

Maintainability
C
7 hrs
Test Coverage
# -*- coding: utf-8 -*-
"""Loda: Lightweight on-line detector of anomalies
Adapted from tilitools (https://github.com/nicococo/tilitools).
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause


import numbers

import numpy as np
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ..utils.utility import get_optimal_n_bins


class LODA(BaseDetector):
    """Loda: Lightweight on-line detector of anomalies. See
    :cite:`pevny2016loda` for more information.

    The detector projects the data onto ``n_random_cuts`` sparse random
    directions, builds a one-dimensional histogram for every projection and
    scores a sample by the weighted sum of the negative log histogram
    densities of its projections.

    Two versions of LODA are supported:

    - Static number of bins: uses a static number of bins for all random cuts.
    - Automatic number of bins: every random cut uses a number of bins deemed
      to be optimal according to the Birge-Rozenholc method
      (:cite:`birge2006many`).

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set,
        i.e. the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_bins : int or string, optional (default = 10)
        The number of bins for the histogram. If set to "auto", the
        Birge-Rozenholc method will be used to automatically determine the
        optimal number of bins.

    n_random_cuts : int, optional (default = 100)
        The number of random cuts.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1, n_bins=10, n_random_cuts=100):
        super(LODA, self).__init__(contamination=contamination)
        self.n_bins = n_bins
        self.n_random_cuts = n_random_cuts
        # Uniform weight per random cut; ``fit`` rebuilds this vector if its
        # length no longer matches ``n_random_cuts``.
        self.weights = np.ones(n_random_cuts, dtype=float) / n_random_cuts

    def _uses_auto_bins(self):
        """Validate ``n_bins`` and tell which binning mode is active.

        Returns
        -------
        auto : bool
            True when ``n_bins`` is the string "auto" (case-insensitive),
            False when it is an integer.

        Raises
        ------
        ValueError
            If ``n_bins`` is neither an integer nor "auto".
        """
        if isinstance(self.n_bins, str) and self.n_bins.lower() == "auto":
            return True
        if isinstance(self.n_bins, numbers.Integral):
            return False
        # %r (not %f) so that an invalid string or other non-numeric value
        # yields the intended ValueError rather than a formatting TypeError.
        raise ValueError("n_bins must be an int or \'auto\', "
                         "got: %r" % (self.n_bins,))

    @staticmethod
    def _neg_log_density(histogram, limits, projected_data):
        """Negative log histogram density of every projected sample.

        Parameters
        ----------
        histogram : numpy array of shape (n_bins,)
            Normalised bin probabilities of one random cut.

        limits : numpy array of shape (n_bins + 1,)
            Bin edges as returned by ``np.histogram``.

        projected_data : numpy array of shape (n_samples,)
            The samples projected onto the random cut.

        Returns
        -------
        scores : numpy array of shape (n_samples,)
            ``-log`` of the probability of the bin each sample falls into.
        """
        # np.histogram bins are half-open [edge_k, edge_k+1) except for the
        # last one, which is closed. Searching the interior edges with
        # side='right' reproduces exactly that assignment, and values outside
        # the fitted range are clamped to the first / last bin.
        inds = np.searchsorted(limits[1:-1], projected_data, side='right')
        return -np.log(histogram[inds])

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If ``n_bins`` is neither an integer nor "auto".
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)
        auto_bins = self._uses_auto_bins()

        n_samples, n_components = X.shape
        # Every projection keeps int(sqrt(n_features)) non-zero entries; the
        # remaining randomly chosen entries are zeroed out below.
        n_nonzero_components = int(np.sqrt(n_components))
        n_zero_components = n_components - n_nonzero_components

        # ``n_random_cuts`` may have been changed after construction (e.g.
        # through ``set_params``); keep the weight vector in sync with it.
        if np.shape(self.weights) != (self.n_random_cuts,):
            self.weights = (np.ones(self.n_random_cuts, dtype=float)
                            / self.n_random_cuts)

        self.projections_ = np.random.randn(self.n_random_cuts, n_components)

        if auto_bins:
            # The bin count differs per cut, so histograms and limits are
            # ragged and stored as lists of arrays.
            self.histograms_ = []
            self.limits_ = []
            self.n_bins_ = []  # only populated when n_bins is "auto"
        else:
            self.histograms_ = np.zeros((self.n_random_cuts, self.n_bins))
            self.limits_ = np.zeros((self.n_random_cuts, self.n_bins + 1))

        pred_scores = np.zeros(n_samples)

        for i in range(self.n_random_cuts):
            # sparsify the i-th projection vector, then project the data
            rands = np.random.permutation(n_components)[:n_zero_components]
            self.projections_[i, rands] = 0.
            projected_data = self.projections_[i, :].dot(X.T)

            if auto_bins:
                # Birge-Rozenholc estimate of the optimal number of bins
                n_bins = get_optimal_n_bins(projected_data)
                self.n_bins_.append(n_bins)
            else:
                n_bins = self.n_bins

            histogram, limits = np.histogram(
                projected_data, bins=n_bins, density=False)
            histogram = histogram.astype(np.float64)
            histogram += 1e-12  # avoid log(0) for empty bins
            histogram /= np.sum(histogram)

            if auto_bins:
                self.histograms_.append(histogram)
                self.limits_.append(limits)
            else:
                self.histograms_[i, :] = histogram
                self.limits_[i, :] = limits

            # accumulate the scores for the training samples
            pred_scores += self.weights[i] * self._neg_log_density(
                histogram, limits, projected_data)

        # The additional division by n_random_cuts (on top of the weights)
        # is kept so the scale of the scores matches earlier releases.
        self.decision_scores_ = pred_scores / self.n_random_cuts
        self._process_decision_scores()

        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.

        Raises
        ------
        ValueError
            If ``n_bins`` is neither an integer nor "auto".
        """
        check_is_fitted(self, ['projections_', 'decision_scores_',
                               'threshold_', 'labels_'])

        X = check_array(X)
        self._uses_auto_bins()  # only for validation of ``n_bins``

        pred_scores = np.zeros(X.shape[0])

        # ``histograms_[i]`` / ``limits_[i]`` address the i-th cut both for
        # the 2-D arrays (static bins) and the lists of arrays ("auto").
        for i in range(self.n_random_cuts):
            projected_data = self.projections_[i, :].dot(X.T)
            pred_scores += self.weights[i] * self._neg_log_density(
                self.histograms_[i], self.limits_[i], projected_data)

        # same scaling as applied to the training scores in ``fit``
        return pred_scores / self.n_random_cuts