pyod/models/loci.py
# -*- coding: utf-8 -*-
"""Local Correlation Integral (LOCI).
Parts of the code are adapted from https://github.com/Cloudy10/loci
"""
# Author: Winston Li <jk_zhengli@hotmail.com>
# License: BSD 2 clause
import numpy as np
from numba import njit
from scipy.spatial.distance import pdist, squareform
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from .base import BaseDetector
@njit
def _get_critical_values(dist_matrix, alpha, p_ix, r_max,
                         r_min=0):  # pragma: no cover
    """Collect the sorted candidate radii for one point.

    The distances from point ``p_ix`` that fall in ``(r_min, r_max]`` are
    selected; each selected distance ``d`` contributes both ``d`` and
    ``d / alpha`` to the result.

    Parameters
    ----------
    dist_matrix : numpy array, 2-D
        Distance matrix; row ``p_ix`` holds the distances from point
        ``p_ix`` to every sample.

    alpha : float
        The neighbourhood parameter; selected distances are also divided
        by it to produce the second half of the candidates.

    p_ix : int
        Row index of the point being examined.

    r_max : float
        Maximum neighbourhood radius (inclusive bound).

    r_min : float, default = 0
        Minimum neighbourhood radius (exclusive bound).

    Returns
    -------
    cv : numpy array of shape (n_critical_val,)
        The critical values in ascending order. Empty when no distance
        lies in ``(r_min, r_max]``.
    """
    row = dist_matrix[p_ix, :]
    in_range = row[(row > r_min) & (row <= r_max)]
    candidates = np.concatenate((in_range, in_range / alpha))
    return np.sort(candidates)
@njit
def _get_sampling_N(dist_matrix, p_ix, r):  # pragma: no cover
    """Find the samples lying within radius ``r`` of one point.

    Parameters
    ----------
    dist_matrix : numpy array, 2-D
        Distance matrix; row ``p_ix`` holds the distances from point
        ``p_ix`` to every sample.

    p_ix : int
        Row index of the point being examined.

    r : float
        Neighbourhood radius (inclusive).

    Returns
    -------
    sample : numpy array of shape (n_sample,)
        Column indices whose distance to point ``p_ix`` is at most ``r``.
    """
    within_radius = dist_matrix[p_ix, :] <= r
    return np.nonzero(within_radius)[0]
class LOCI(BaseDetector):
    """Local Correlation Integral.

    LOCI is highly effective for detecting outliers and groups of
    outliers (a.k.a. micro-clusters), which offers the following advantages
    and novelties: (a) It provides an automatic, data-dictated cut-off to
    determine whether a point is an outlier—in contrast, previous methods
    force users to pick cut-offs, without any hints as to what cut-off value
    is best for a given dataset. (b) It can provide a LOCI plot for each
    point; this plot summarizes a wealth of information about the data in
    the vicinity of the point, determining clusters, micro-clusters, their
    diameters and their inter-cluster distances. None of the existing
    outlier-detection methods can match this feature, because they output
    only a single number for each point: its outlierness score. (c) It can
    be computed as quickly as the best previous methods.

    Read more in the :cite:`papadimitriou2003loci`.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. It is forwarded to
        ``BaseDetector``; the labels produced by :meth:`fit` are
        thresholded by ``k``, not by ``contamination``.

    alpha : float, default = 0.5
        The neighbourhood parameter measures how large of a neighbourhood
        should be considered "local".

    k : int, default = 3
        An outlier cutoff threshold for determining whether or not a point
        should be considered an outlier.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : int or float
        The cut-off applied to ``decision_scores_``. It is set to ``k``
        when the detector is constructed.

    labels_ : numpy array of shape (n_samples,)
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.

    Examples
    --------
    >>> from pyod.models.loci import LOCI
    >>> from pyod.utils.data import generate_data
    >>> n_train = 50
    >>> n_test = 50
    >>> contamination = 0.1
    >>> X_train, y_train, X_test, y_test = generate_data(
    ...     n_train=n_train, n_test=n_test,
    ...     contamination=contamination, random_state=42)
    >>> clf = LOCI()
    >>> clf.fit(X_train)
    LOCI(alpha=0.5, contamination=0.1, k=3)
    """

    def __init__(self, contamination=0.1, alpha=0.5, k=3):
        super(LOCI, self).__init__(contamination=contamination)
        self.alpha = alpha
        # Keep the constructor argument under its own name so parameter
        # introspection reports the real value instead of a missing one.
        self.k = k
        # ``threshold_`` is what the scoring loop and ``fit`` read; it is
        # still assigned here so existing callers keep working.
        self.threshold_ = k

    def _get_alpha_n(self, dist_matrix, indices, r):
        """Count the points inside the alpha-neighbourhood.

        Parameters
        ----------
        dist_matrix : numpy array of shape (n_samples, n_samples)
            The pairwise distance matrix of the samples.

        indices : int or numpy array of int
            Row index, or array of row indices, of the points to count for.

        r : float
            Neighbourhood radius; a point is counted when its distance is
            strictly less than ``alpha * r``.

        Returns
        -------
        alpha_n : int or numpy array of shape (n_indices,)
            A single count for a scalar index, or one count per row for
            an array of indices.
        """
        within_alpha_r = dist_matrix[indices, :] < (r * self.alpha)
        # A scalar index (Python int or NumPy integer) selects one 1-D row;
        # only a multi-row selection has an axis 1 to reduce over.
        if within_alpha_r.ndim == 1:
            return np.count_nonzero(within_alpha_r)
        return np.count_nonzero(within_alpha_r, axis=1)

    def _calculate_decision_score(self, X):
        """Compute the outlier score of every sample in X.

        Scores are derived from the pairwise Euclidean distances within
        ``X`` itself.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input data points.

        Returns
        -------
        outlier_scores : numpy array of shape (n_samples,)
            The outlier score of each sample. A sample keeps a score of 0
            if no radius reaches the minimum neighbourhood size.
        """
        n_samples = X.shape[0]
        # Float buffer so the result dtype does not depend on which
        # samples happen to receive a score.
        outlier_scores = np.zeros(n_samples)
        dist_matrix = squareform(pdist(X, metric="euclidean"))
        r_max = dist_matrix.max() / self.alpha

        for p_ix in range(n_samples):
            critical_values = _get_critical_values(dist_matrix, self.alpha,
                                                   p_ix, r_max)
            for r in critical_values:
                sample = _get_sampling_N(dist_matrix, p_ix, r)
                n_values = self._get_alpha_n(dist_matrix, sample, r)
                cur_alpha_n = self._get_alpha_n(dist_matrix, p_ix, r)

                n_hat = np.mean(n_values)
                mdef = 1 - (cur_alpha_n / n_hat)
                sigma_mdef = np.std(n_values) / n_hat

                # The score is only updated once the average count over
                # the sampling neighbourhood reaches 20.
                if n_hat >= 20:
                    # The point is part of its own sampling neighbourhood,
                    # so a zero spread means mdef is zero too; report 0
                    # instead of the NaN that 0 / 0 would produce.
                    if sigma_mdef > 0:
                        outlier_scores[p_ix] = mdef / sigma_mdef
                    else:
                        outlier_scores[p_ix] = 0.0
                    # Stop at the first radius that flags the point.
                    if mdef > (self.threshold_ * sigma_mdef):
                        break

        return outlier_scores

    def fit(self, X, y=None):
        """Fit the model using X as training data.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            Training data.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            The fitted detector.
        """
        X = check_array(X)
        self._set_n_classes(y)
        self.decision_scores_ = self._calculate_decision_score(X)
        self.labels_ = (self.decision_scores_ > self.threshold_).astype(
            'int').ravel()

        # calculate for predict_proba()
        self._mu = np.mean(self.decision_scores_)
        self._sigma = np.std(self.decision_scores_)

        return self

    def decision_function(self, X):
        """Compute the outlier scores of X.

        The detector must be fitted first. The scores are computed from
        the pairwise distances within ``X`` itself; the training data is
        not used.

        Parameters
        ----------
        X : array, shape (n_samples, n_features)
            The samples to score.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The outlier score of each input sample. The higher, the more
            abnormal.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._calculate_decision_score(X)