yzhao062/Pyod

View on GitHub
pyod/models/abod.py

Summary

Maintainability
A
3 hrs
Test Coverage
# -*- coding: utf-8 -*-
"""Angle-based Outlier Detector (ABOD)
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause

from __future__ import division
from __future__ import print_function

import warnings
from itertools import combinations

import numpy as np
from numba import njit
from sklearn.neighbors import KDTree
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted

from .base import BaseDetector
from ..utils.utility import check_parameter


@njit
def _wcos(curr_pt, a, b):  # pragma: no cover
    """Weighted cosine of the angle spanned at ``curr_pt`` by ``a`` and ``b``.

    Compiled with numba. The inner product of the two difference vectors
    is divided by the squared Euclidean norm of each of them, so points
    far away from ``curr_pt`` contribute less.

    Parameters
    ----------
    curr_pt : numpy array
        The point at which the angle is measured. Callers in this module
        pass a single row of the data matrix.

    a : numpy array
        First reference point, same layout as ``curr_pt``.

    b : numpy array
        Second reference point, same layout as ``curr_pt``.

    Returns
    -------
    wcos : float
        ``<a - curr_pt, b - curr_pt>`` divided by ``|a - curr_pt|^2`` and
        then by ``|b - curr_pt|^2``.

    Notes
    -----
    No guard against a zero-length difference vector is applied here.

    """
    vec_a = a - curr_pt
    vec_b = b - curr_pt

    sq_len_a = np.linalg.norm(vec_a, 2) ** 2
    sq_len_b = np.linalg.norm(vec_b, 2) ** 2

    # two successive divisions: first by |vec_a|^2, then by |vec_b|^2
    return np.dot(vec_a, vec_b) / sq_len_a / sq_len_b


def _calculate_wocs(curr_pt, X, X_ind):
    """Calculated the variance of weighted cosine of a point.
    wcos = (<a_curr, b_curr>/((|a_curr|*|b_curr|)^2)

    Parameters
    ----------
    curr_pt : numpy array, shape (1, n_features)
        The sample to be calculated.

    X : numpy array of shape (n_samples, n_features)
        The training dataset.

    X_ind : list
        The valid index of the training data.

    Returns
    -------
    cos_angle_var : float
        The variance of cosine angle

    """
    wcos_list = []
    curr_pair_inds = list(combinations(X_ind, 2))
    for j, (a_ind, b_ind) in enumerate(curr_pair_inds):
        a = X[a_ind, :]
        b = X[b_ind, :]

        # skip if no angle can be formed
        if np.array_equal(a, curr_pt) or np.array_equal(b, curr_pt):
            continue
        # add the weighted cosine to the list
        wcos_list.append(_wcos(curr_pt, a, b))
    return np.var(wcos_list)


# noinspection PyPep8Naming
class ABOD(BaseDetector):
    """ABOD class for Angle-based Outlier Detection.
    For an observation, the variance of its weighted cosine scores to all
    neighbors could be viewed as the outlying score.
    See :cite:`kriegel2008angle` for details.

    Two versions of ABOD are supported:

    - Fast ABOD: use k nearest neighbors to approximate.
    - Original ABOD: consider all training points with high time complexity at
      O(n^3).

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    n_neighbors : int, optional (default=5)
        Number of neighbors to use by default for k neighbors queries.
        Only used by the 'fast' method. If it is not smaller than the
        number of training samples it is reduced to ``n_samples - 1``
        during ``fit`` (a warning is issued).

    method: str, optional (default='fast')
        Valid values for method are:

        - 'fast': fast ABOD. Only consider n_neighbors of training points
        - 'default': original ABOD with all training points, which could be
          slow

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.

    X_train_ : numpy array of shape (n_samples, n_features)
        The validated training data kept for scoring new samples.

    n_train_ : int
        Number of training samples.

    tree_ : sklearn.neighbors.KDTree
        Tree built on the training data. Only set by the 'fast' method.
    """

    def __init__(self, contamination=0.1, n_neighbors=5, method='fast'):
        super(ABOD, self).__init__(contamination=contamination)
        self.method = method
        self.n_neighbors = n_neighbors

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.

        Raises
        ------
        ValueError
            If ``method`` is neither 'fast' nor 'default'.
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)

        self.X_train_ = X
        self.n_train_ = X.shape[0]
        # one column per sample; flattened once all scores are filled in
        self.decision_scores_ = np.zeros([self.n_train_, 1])

        if self.method == 'fast':
            self._fit_fast()
        elif self.method == 'default':
            self._fit_default()
        else:
            # a single formatted message; passing two positional arguments
            # would render the error text as a tuple
            raise ValueError(
                "{0} is not a valid method".format(self.method))

        # flip the scores so that larger values mean more abnormal
        self.decision_scores_ = self.decision_scores_.ravel() * -1
        self._process_decision_scores()
        return self

    def _fit_default(self):
        """Default ABOD method. Use all training points with high complexity
        O(n^3). For internal use only.

        Fills ``decision_scores_`` with the (not yet negated) variance of
        weighted cosines of every training point against all others.
        """
        for i in range(self.n_train_):
            curr_pt = self.X_train_[i, :]

            # every training index except the point itself
            X_ind = list(range(0, self.n_train_))
            X_ind.remove(i)

            self.decision_scores_[i, 0] = _calculate_wocs(curr_pt,
                                                          self.X_train_,
                                                          X_ind)
        return self

    def _fit_fast(self):
        """Fast ABOD method. Only use n_neighbors for angle calculation.
        Internal use only.

        Fills ``decision_scores_`` with the (not yet negated) variance of
        weighted cosines of every training point against its nearest
        neighbors, and stores a ``KDTree`` in ``tree_`` for later queries.
        """

        # make sure the n_neighbors is in the range
        if self.n_neighbors >= self.n_train_:
            # NOTE: the hyperparameter is overwritten in place;
            # _decision_function_fast reads the same attribute when it
            # queries the tree.
            self.n_neighbors = self.n_train_ - 1
            # report the value actually used, i.e. n_train_ - 1
            warnings.warn("n_neighbors is set to the number of "
                          "training points minus 1: {0}".format(
                              self.n_neighbors))

        # validate on every fit, not only when the value was just clipped
        check_parameter(self.n_neighbors, 1, self.n_train_,
                        include_left=True, include_right=True)

        # tree used by _decision_function_fast for unseen samples
        self.tree_ = KDTree(self.X_train_)

        # kneighbors() is called without a query matrix; per the
        # scikit-learn documentation each training point is then not
        # returned as its own neighbor
        neigh = NearestNeighbors(n_neighbors=self.n_neighbors)
        neigh.fit(self.X_train_)
        ind_arr = neigh.kneighbors(n_neighbors=self.n_neighbors,
                                   return_distance=False)

        for i in range(self.n_train_):
            curr_pt = self.X_train_[i, :]
            X_ind = ind_arr[i, :]
            self.decision_scores_[i, 0] = _calculate_wocs(curr_pt,
                                                          self.X_train_,
                                                          X_ind)
        return self

    # noinspection PyPep8Naming
    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """

        check_is_fitted(self, ['X_train_', 'n_train_', 'decision_scores_',
                               'threshold_', 'labels_'])
        X = check_array(X)

        # the internal helpers return variances; negate them so that
        # outliers have higher outlier scores
        if self.method == 'fast':  # fast ABOD
            return self._decision_function_fast(X) * -1
        else:  # default ABOD
            return self._decision_function_default(X) * -1

    def _decision_function_default(self, X):
        """Internal method for predicting outlier scores using default ABOD.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        pred_score : array, shape (n_samples,)
            The variance of weighted cosines of each input sample against
            all training points (not negated).

        """
        # initialize the output score
        pred_score = np.zeros([X.shape[0], 1])

        for i in range(X.shape[0]):
            curr_pt = X[i, :]
            # all training indices take part in the pairing
            X_ind = list(range(0, self.n_train_))
            pred_score[i, :] = _calculate_wocs(curr_pt, self.X_train_, X_ind)

        return pred_score.ravel()

    def _decision_function_fast(self, X):
        """Internal method for predicting outlier scores using Fast ABOD.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples to score.

        Returns
        -------
        pred_score : array, shape (n_samples,)
            The variance of weighted cosines of each input sample against
            its ``n_neighbors`` nearest training points (not negated).

        """

        check_is_fitted(self, ['tree_'])
        # initialize the output score
        pred_score = np.zeros([X.shape[0], 1])

        # get the indexes of the X's k nearest training points
        _, ind_arr = self.tree_.query(X, k=self.n_neighbors)

        for i in range(X.shape[0]):
            curr_pt = X[i, :]
            X_ind = ind_arr[i, :]
            pred_score[i, :] = _calculate_wocs(curr_pt, self.X_train_, X_ind)

        return pred_score.ravel()