pyod/models/lscp.py
"""Locally Selective Combination of Parallel Outlier Ensembles (LSCP).
Adapted from the original implementation.
"""
# Author: Zain Nasrullah <zain.nasrullah.zn@gmail.com>
# License: BSD 2 clause
import collections
import warnings
import numpy as np
from sklearn.neighbors import KDTree
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.validation import check_random_state
# PyOD imports
from .base import BaseDetector
from ..utils.stat_models import pearsonr
from ..utils.utility import argmaxn
from ..utils.utility import check_detector
from ..utils.utility import generate_bagging_indices
from ..utils.utility import standardizer
# TODO: find random state that is causing runtime warning in pearson
class LSCP(BaseDetector):
""" Locally Selection Combination in Parallel Outlier Ensembles
LSCP is an unsupervised parallel outlier detection ensemble which selects
competent detectors in the local region of a test instance. This
implementation uses an Average of Maximum strategy. First, a heterogeneous
list of base detectors is fit to the training data and then generates a
pseudo ground truth for each train instance is generated by
taking the maximum outlier score.
For each test instance:
1) The local region is defined to be the set of nearest training points in
randomly sampled feature subspaces which occur more frequently than
a defined threshold over multiple iterations.
    2) Using the local region, a local pseudo ground truth is defined and the
       Pearson correlation is calculated between each base detector's training
       outlier scores and the pseudo ground truth.
    3) A histogram is built out of Pearson correlation scores; detectors in
       the largest bin are selected as competent base detectors for the given
       test instance.
4) The average outlier score of the selected competent detectors is taken
to be the final score.
See :cite:`zhao2019lscp` for details.
Parameters
----------
detector_list : List, length must be greater than 1
Base unsupervised outlier detectors from PyOD. (Note: requires fit and
decision_function methods)
local_region_size : int, optional (default=30)
Number of training points to consider in each iteration of the local
region generation process (30 by default).
local_max_features : float in (0.5, 1.), optional (default=1.0)
Maximum proportion of number of features to consider when defining the
local region (1.0 by default).
    n_bins : int, optional (default=10)
        Number of bins to use when building the histogram of Pearson
        correlation scores for selecting competent detectors.
random_state : RandomState, optional (default=None)
A random number generator instance to define the state of the random
permutations generator.
contamination : float in (0., 0.5), optional (default=0.1)
The amount of contamination of the data set, i.e.
the proportion of outliers in the data set. Used when fitting to
define the threshold on the decision function (0.1 by default).
Attributes
----------
decision_scores_ : numpy array of shape (n_samples,)
The outlier scores of the training data.
The higher, the more abnormal. Outliers tend to have higher
scores. This value is available once the detector is fitted.
threshold_ : float
The threshold is based on ``contamination``. It is the
``n_samples * contamination`` most abnormal samples in
``decision_scores_``. The threshold is calculated for generating
binary outlier labels.
labels_ : int, either 0 or 1
The binary labels of the training data. 0 stands for inliers
and 1 for outliers/anomalies. It is generated by applying
``threshold_`` on ``decision_scores_``.
Examples
--------
>>> from pyod.utils.data import generate_data
>>> from pyod.utils.utility import standardizer
>>> from pyod.models.lscp import LSCP
>>> from pyod.models.lof import LOF
>>> X_train, y_train, X_test, y_test = generate_data(
... n_train=50, n_test=50,
... contamination=0.1, random_state=42)
>>> X_train, X_test = standardizer(X_train, X_test)
>>> detector_list = [LOF(), LOF()]
>>> clf = LSCP(detector_list)
>>> clf.fit(X_train)
LSCP(...)
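    >>> # standard PyOD usage on unseen data (scores and labels vary from
    >>> # run to run, so they are assigned rather than printed here)
    >>> y_test_scores = clf.decision_function(X_test)
    >>> y_test_pred = clf.predict(X_test)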
"""
def __init__(self, detector_list, local_region_size=30,
local_max_features=1.0, n_bins=10,
random_state=None, contamination=0.1):
super(LSCP, self).__init__(contamination=contamination)
self.detector_list = detector_list
self.n_clf = len(self.detector_list)
self.local_region_size = local_region_size
        # recommended bounds on the local region size
        self.local_region_min = 30
        self.local_region_max = 200
        self.local_max_features = local_max_features
        # lower bound on the proportion of features sampled per subspace
        self.local_min_features = 0.5
        # number of feature-bagging iterations for local region generation
        self.local_region_iterations = 20
        # a training point must appear in more than half of the iterations
        # to remain in the final local region
        self.local_region_threshold = int(self.local_region_iterations / 2)
        self.n_bins = n_bins
        # number of histogram bins from which competent detectors are drawn
        self.n_selected = 1
self.random_state = random_state
def fit(self, X, y=None):
"""Fit detector. y is ignored in unsupervised methods.
Parameters
----------
X : numpy array of shape (n_samples, n_features)
The input samples.
y : Ignored
Not used, present for API consistency by convention.
Returns
-------
self : object
Fitted estimator.
"""
# check detector_list
if len(self.detector_list) < 2:
raise ValueError("The detector list has less than 2 detectors.")
for detector in self.detector_list:
check_detector(detector)
# check random state and input
self.random_state = check_random_state(self.random_state)
X = check_array(X)
self._set_n_classes(y)
self.n_features_ = X.shape[1]
        # store training data; inputs are expected to be standardized
        # beforehand (see the Examples section)
        self.X_train_norm_ = X
train_scores = np.zeros([self.X_train_norm_.shape[0], self.n_clf])
# fit each base detector and calculate standardized train scores
for k, detector in enumerate(self.detector_list):
detector.fit(self.X_train_norm_)
train_scores[:, k] = detector.decision_scores_
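        # raw training scores are kept unstandardized here; they are
        # standardized jointly with the test scores in _get_decision_scores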
self.train_scores_ = train_scores
# set decision scores and threshold
self.decision_scores_ = self._get_decision_scores(X)
self._process_decision_scores()
return self
def decision_function(self, X):
"""Predict raw anomaly score of X using the fitted detector.
The anomaly score of an input sample is computed based on different
detector algorithms. For consistency, outliers are assigned with
larger anomaly scores.
Parameters
----------
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.
Returns
-------
anomaly_scores : numpy array of shape (n_samples,)
The anomaly score of the input samples.
"""
# check whether model has been fit
check_is_fitted(self, ['training_pseudo_label_', 'train_scores_',
'X_train_norm_', 'n_features_'])
# check input array
X = check_array(X)
if self.n_features_ != X.shape[1]:
raise ValueError("Number of features of the model must "
"match the input. Model n_features is {0} and "
"input n_features is {1}."
"".format(self.n_features_, X.shape[1]))
# get decision scores and return
decision_scores = self._get_decision_scores(X)
return decision_scores
def _get_decision_scores(self, X):
""" Helper function for getting outlier scores on test data X (note:
model must already be fit)
Parameters
----------
X : numpy array, shape (n_samples, n_features)
Test data
Returns
-------
pred_scores_ens : numpy array, shape (n_samples,)
Outlier scores for test samples
"""
# raise warning if local region size is outside acceptable limits
if (self.local_region_size < self.local_region_min) or (
self.local_region_size > self.local_region_max):
warnings.warn("Local region size of {} is outside "
"recommended range [{}, {}]".format(
self.local_region_size, self.local_region_min,
self.local_region_max))
        # test data is expected to be standardized already; get the local
        # region of each test instance
        X_test_norm = X
test_local_regions = self._get_local_region(X_test_norm)
# calculate test scores
test_scores = np.zeros([X_test_norm.shape[0], self.n_clf])
for k, detector in enumerate(self.detector_list):
test_scores[:, k] = detector.decision_function(X_test_norm)
# generate standardized scores
train_scores_norm, test_scores_norm = standardizer(self.train_scores_,
test_scores)
# generate pseudo target for training --> for calculating weights
self.training_pseudo_label_ = np.max(train_scores_norm,
axis=1).reshape(-1, 1)
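        # e.g., if three detectors give standardized training scores
        # [0.2, 1.5, -0.3] for one training point, its pseudo ground truth
        # is max(0.2, 1.5, -0.3) = 1.5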
# placeholder for ensemble predictions
pred_scores_ens = np.zeros([X_test_norm.shape[0], ])
# iterate through test instances (test_local_regions
# indices correspond to x_test)
for i, test_local_region in enumerate(test_local_regions):
# get pseudo target and training scores in local region of
# test instance
            local_pseudo_ground_truth = self.training_pseudo_label_[
                test_local_region].ravel()
local_train_scores = train_scores_norm[test_local_region, :]
# calculate pearson correlation between local pseudo ground truth
# and local train scores
            pearson_corr_scores = np.zeros([self.n_clf, ])
            for d in range(self.n_clf):
                pearson_corr_scores[d] = pearsonr(
                    local_pseudo_ground_truth, local_train_scores[:, d])[0]
            # average the test scores of the selected competent detectors
            # to obtain the final ensemble score for this test instance
            pred_scores_ens[i] = np.mean(
                test_scores_norm[
                    i, self._get_competent_detectors(pearson_corr_scores)])
return pred_scores_ens
def _get_local_region(self, X_test_norm):
""" Get local region for each test instance
Parameters
----------
X_test_norm : numpy array, shape (n_samples, n_features)
Normalized test data
Returns
-------
final_local_region_list : List of lists, shape of [n_samples, [local_region]]
Indices of training samples in the local region of each test sample
"""
        # initialize one independent candidate-index list per test instance
        local_region_list = [[] for _ in range(X_test_norm.shape[0])]
if self.local_max_features > 1.0:
warnings.warn(
"Local max features greater than 1.0, reducing to 1.0")
self.local_max_features = 1.0
        if self.X_train_norm_.shape[1] * self.local_min_features < 1:
            warnings.warn(
                "Local min features would select fewer than one feature, "
                "increasing the proportion to 1.0")
            self.local_min_features = 1.0
# perform multiple iterations
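        # (each iteration samples a random feature subspace, queries the
        # local_region_size nearest training points in that subspace, and
        # accumulates their indices; indices recurring in more than
        # local_region_threshold iterations form the final local region)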
for _ in range(self.local_region_iterations):
# if min and max are the same, then use all features
            if self.local_max_features == self.local_min_features:
                features = range(0, self.X_train_norm_.shape[1])
                warnings.warn("Local min features equals local max features; "
                              "using all features instead.")
else:
# randomly generate feature subspaces
features = generate_bagging_indices(
self.random_state,
bootstrap_features=False,
n_features=self.X_train_norm_.shape[1],
min_features=int(
self.X_train_norm_.shape[1] * self.local_min_features),
max_features=int(
self.X_train_norm_.shape[1] * self.local_max_features))
# build KDTree out of training subspace
tree = KDTree(self.X_train_norm_[:, features])
# Find neighbors of each test instance
_, ind_arr = tree.query(X_test_norm[:, features],
k=self.local_region_size)
# add neighbors to local region list
for j in range(X_test_norm.shape[0]):
local_region_list[j] = local_region_list[j] + \
ind_arr[j, :].tolist()
        # keep points that occur more than local_region_threshold times
        final_local_region_list = [[] for _ in range(X_test_norm.shape[0])]
for j in range(X_test_norm.shape[0]):
tmp = [item for item, count in collections.Counter(
local_region_list[j]).items() if
count > self.local_region_threshold]
            # if fewer than 2 points survive the threshold, progressively
            # relax it until the local region holds at least 2 points
            decrease_value = 0
            while len(tmp) < 2:
                decrease_value = decrease_value + 1
                assert decrease_value < self.local_region_threshold
                tmp = [item for item, count in
                       collections.Counter(local_region_list[j]).items() if
                       count > (self.local_region_threshold - decrease_value)]
final_local_region_list[j] = tmp
return final_local_region_list
def _get_competent_detectors(self, scores):
""" Identifies competent base detectors based on correlation scores
Parameters
----------
        scores : numpy array, shape (n_clf,)
            Correlation scores for each base detector (for a specific
            test instance)
Returns
-------
candidates : List
Indices for competent detectors (for given test instance)
"""
# create histogram of correlation scores
scores = scores.reshape(-1, 1)
# TODO: handle when Pearson score is 0
# if scores contain nan, change it to 0
if np.isnan(scores).any():
scores = np.nan_to_num(scores)
if self.n_bins > self.n_clf:
warnings.warn(
"The number of histogram bins is greater than the number of "
"classifiers, reducing n_bins to n_clf.")
self.n_bins = self.n_clf
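        # e.g., with correlation scores [0.1, 0.15, 0.8, 0.82, 0.85] and
        # n_bins=5, the largest bin covers [0.7, 0.85] and contains the
        # last three detectors, which are then selected as competent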
hist, bin_edges = np.histogram(scores, bins=self.n_bins)
# find n_selected largest bins
max_bins = argmaxn(hist, n=self.n_selected)
candidates = []
# iterate through bins
for max_bin in max_bins:
# determine which detectors are inside this bin
selected = np.where((scores >= bin_edges[max_bin])
& (scores <= bin_edges[max_bin + 1]))
# add to list of candidates
candidates = candidates + selected[0].tolist()
return candidates
def __len__(self):
return len(self.detector_list)
def __getitem__(self, index):
return self.detector_list[index]
def __iter__(self):
return iter(self.detector_list)