# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
"""Loda: Lightweight on-line detector of anomalies
Adapted from tilitools (https://github.com/nicococo/tilitools) by
"""
# Author: Yue Zhao <yzhao062@gmail.com>
# License: BSD 2 clause
import numbers
import numpy as np
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
from ..utils.utility import get_optimal_n_bins
class LODA(BaseDetector):
    """Loda: Lightweight on-line detector of anomalies. See
    :cite:`pevny2016loda` for more information.

    Two versions of LODA are supported:

    - Static number of bins: uses a static number of bins for all random cuts.
    - Automatic number of bins: every random cut uses a number of bins deemed
      to be optimal according to the Birge-Rozenblac method
      (:cite:`birge2006many`).

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set,
        i.e. the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_bins : int or string, optional (default = 10)
        The number of bins for the histogram. If set to "auto", the
        Birge-Rozenblac method will be used to automatically determine the
        optimal number of bins.

    n_random_cuts : int, optional (default = 100)
        The number of random cuts.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1, n_bins=10, n_random_cuts=100):
        super(LODA, self).__init__(contamination=contamination)
        self.n_bins = n_bins
        self.n_random_cuts = n_random_cuts
        # Every random cut contributes equally to the averaged score.
        self.weights = np.ones(n_random_cuts, dtype=float) / n_random_cuts

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)
        pred_scores = np.zeros([X.shape[0], 1])
        n_components = X.shape[1]
        # LODA sparsifies each projection: only ~sqrt(d) entries stay
        # non-zero, the remaining n_zero_components are zeroed per cut.
        n_nonzero_components = np.sqrt(n_components)
        n_zero_components = n_components - int(n_nonzero_components)

        self.projections_ = np.random.randn(self.n_random_cuts, n_components)

        # If set to auto: determine optimal n_bins using Birge Rozenblac method
        if isinstance(self.n_bins, str) and self.n_bins.lower() == "auto":
            # Bin counts can differ per cut, so store per-cut results in lists.
            self.histograms_ = []
            self.limits_ = []
            self.n_bins_ = []  # only used when n_bins is determined by method "auto"

            for i in range(self.n_random_cuts):
                # Zero out a random subset of the projection components.
                rands = np.random.permutation(n_components)[:n_zero_components]
                self.projections_[i, rands] = 0.
                projected_data = self.projections_[i, :].dot(X.T)

                n_bins = get_optimal_n_bins(projected_data)
                self.n_bins_.append(n_bins)

                histogram, limits = np.histogram(
                    projected_data, bins=n_bins, density=False)
                # Smooth with a tiny constant so np.log never sees a zero bin.
                histogram = histogram.astype(np.float64)
                histogram += 1e-12
                histogram /= np.sum(histogram)

                self.histograms_.append(histogram)
                self.limits_.append(limits)

                # calculate the scores for the training samples:
                # negative log-probability of the bin each sample lands in,
                # weighted by this cut's weight.
                inds = np.searchsorted(limits[:n_bins - 1],
                                       projected_data, side='left')
                pred_scores[:, 0] += -self.weights[i] * np.log(
                    histogram[inds])

        elif isinstance(self.n_bins, numbers.Integral):
            # Fixed bin count: histograms/limits fit in dense 2-D arrays.
            self.histograms_ = np.zeros((self.n_random_cuts, self.n_bins))
            self.limits_ = np.zeros((self.n_random_cuts, self.n_bins + 1))

            for i in range(self.n_random_cuts):
                rands = np.random.permutation(n_components)[:n_zero_components]
                self.projections_[i, rands] = 0.
                projected_data = self.projections_[i, :].dot(X.T)

                self.histograms_[i, :], self.limits_[i, :] = np.histogram(
                    projected_data, bins=self.n_bins, density=False)
                # Smooth with a tiny constant so np.log never sees a zero bin.
                self.histograms_[i, :] += 1e-12
                self.histograms_[i, :] /= np.sum(self.histograms_[i, :])

                # calculate the scores for the training samples
                inds = np.searchsorted(self.limits_[i, :self.n_bins - 1],
                                       projected_data, side='left')
                pred_scores[:, 0] += -self.weights[i] * np.log(
                    self.histograms_[i, inds])
        else:
            # BUG FIX: the original used "%f", which itself raises TypeError
            # for the non-numeric values (e.g. a misspelled string) that
            # reach this branch; "%r" reports any value safely.
            raise ValueError("n_bins must be an int or 'auto', "
                             "got: %r" % self.n_bins)

        self.decision_scores_ = (pred_scores / self.n_random_cuts).ravel()
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The training input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['projections_', 'decision_scores_',
                               'threshold_', 'labels_'])
        X = check_array(X)
        # Fail early with a clear message instead of a cryptic shape error
        # from the dot product below.
        if X.shape[1] != self.projections_.shape[1]:
            raise ValueError(
                "X has %d features, but LODA was fitted with %d features."
                % (X.shape[1], self.projections_.shape[1]))
        pred_scores = np.zeros([X.shape[0], 1])

        if isinstance(self.n_bins, str) and self.n_bins.lower() == "auto":
            for i in range(self.n_random_cuts):
                projected_data = self.projections_[i, :].dot(X.T)
                # Clamp each sample to a fitted bin and accumulate its
                # weighted negative log-probability.
                inds = np.searchsorted(self.limits_[i][:self.n_bins_[i] - 1],
                                       projected_data, side='left')
                pred_scores[:, 0] += -self.weights[i] * np.log(
                    self.histograms_[i][inds])

        elif isinstance(self.n_bins, numbers.Integral):
            for i in range(self.n_random_cuts):
                projected_data = self.projections_[i, :].dot(X.T)
                inds = np.searchsorted(self.limits_[i, :self.n_bins - 1],
                                       projected_data, side='left')
                pred_scores[:, 0] += -self.weights[i] * np.log(
                    self.histograms_[i, inds])
        else:
            # BUG FIX: "%r" instead of "%f" (see fit) so the intended error
            # is raised rather than a TypeError from the format spec.
            raise ValueError("n_bins must be an int or 'auto', "
                             "got: %r" % self.n_bins)

        pred_scores /= self.n_random_cuts
        return pred_scores.ravel()