# -*- coding: utf-8 -*-
"""Rotation-based Outlier Detector (ROD)
"""
# Author: Yahya Almardeny <almardeny@gmail.com>
# License: BSD 2 clause


import multiprocessing
from itertools import combinations as com
from multiprocessing import Pool

import numba
import numpy as np
from sklearn.preprocessing import MinMaxScaler, RobustScaler
from sklearn.utils import check_array

from .base import BaseDetector


@numba.njit
def mad(costs, median=None):
    """Apply the robust median absolute deviation (MAD)
    to measure the inconsistency/variability of the
    rotation costs.

    Parameters
    ----------
    costs : list of rotation costs
    median: float (default=None), MAD median

    Returns
    -------
    z : float
        the modified z scores
    """
    costs_ = np.reshape(costs, (-1, 1))
    median = np.nanmedian(costs_) if median is None else median
    diff = np.abs(costs_ - median)
    return np.ravel(0.6745 * diff / np.median(diff)), median
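

# A minimal illustrative sketch (not part of the library; `_demo_mad` and
# its data are hypothetical): the planted extreme value receives a
# modified z-score far above the conventional 3.5 cutoff.
def _demo_mad():
    scores, median = mad(np.array([1., 2., 3., 100.]))
    print(scores)  # the last entry dwarfs the others (~65.8 vs ~1.0)
    print(median)  # 2.5, the median of the input costs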


def angle(v1, v2):
    """find the angle between two 3D vectors.

    Parameters
    ----------
    v1 : list, first vector
    v2 : list, second vector

    Returns
    -------
    angle : float, the angle
    """
    return np.arccos(np.dot(v1, v2) /
                     (np.linalg.norm(v1) * np.linalg.norm(v2)))


def geometric_median(x, eps=1e-5):
    """
    Find the multivariate geometric L1-median by applying
    the Vardi-Zhang algorithm.

    Parameters
    ----------
    x : array-like, the data points
    eps : float (default=1e-5), convergence threshold indicating when to stop

    Returns
    -------
    gm : array, the geometric L1-median
    """
    points = np.unique(x, axis=0)
    gm_ = np.mean(points, 0)  # initialize geometric median
    while True:
        D = euclidean(points, gm_, c=True)
        non_zeros = (D != 0)[:, 0]
        Dinv = 1 / D[non_zeros]
        Dinvs = np.sum(Dinv)
        W = Dinv / Dinvs
        T = np.sum(W * points[non_zeros], 0)
        num_zeros = len(points) - np.sum(non_zeros)
        if num_zeros == 0:
            gm1 = T
        elif num_zeros == len(points):
            return gm_
        else:
            R = (T - gm_) * Dinvs
            r = np.linalg.norm(R)
            r_inv = 0 if r == 0 else num_zeros / r
            gm1 = max(0, 1 - r_inv) * T + min(1, r_inv) * gm_

        if euclidean(gm_, gm1) < eps:
            return gm1

        gm_ = gm1
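

# A minimal illustrative sketch (not part of the library;
# `_demo_geometric_median` and its data are hypothetical): unlike the
# arithmetic mean, the geometric median barely moves when a single
# far-away point is added.
def _demo_geometric_median():
    pts = np.array([[0., 0., 0.], [1., 0., 0.],
                    [0., 1., 0.], [0., 0., 1.]])
    print(geometric_median(pts))  # near the centroid of the cluster
    with_outlier = np.vstack([pts, [[100., 100., 100.]]])
    print(geometric_median(with_outlier))  # stays close to the cluster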


def scale_angles(gammas, scaler1=None, scaler2=None):
    """
    Scale the angles in two groups: angles <= 90 degrees
    are scaled within [0, 54.7] degrees, and angles > 90 degrees
    are scaled within [90, 125.3] degrees.

    Parameters
    ----------
    gammas : array-like, angles in radians
    scaler1 : obj (default=None), MinMaxScaler of Angles group 1
    scaler2 : obj (default=None), MinMaxScaler of Angles group 2

    Returns
    -------
    scaled angles (in the original input order), scaler1, scaler2
    """
    first, second = [], []
    first_ind, second_ind = [], []
    q1 = np.pi / 2.
    for i, g in enumerate(gammas):
        if g <= q1:
            first.append(g)
            first_ind.append(i)
        else:
            second.append(g)
            second_ind.append(i)
    if scaler1 is None:  # a None scaler indicates the `fit()` stage
        min_f, max_f = 0.001, 0.955
        scaler1 = MinMaxScaler(feature_range=(min_f, max_f))
        # min_f and max_f are included in the fit so the scaler covers the
        # full range, keeping train and test transforms consistent
        scaler1.fit(np.array(first + [min_f, max_f]).reshape(-1, 1))
    first = scaler1.transform(np.array(first).reshape(-1, 1)).reshape(
        -1) if first else []
    if scaler2 is None:  # a None scaler indicates the `fit()` stage
        min_s, max_s = q1 + 0.001, 2.186
        scaler2 = MinMaxScaler(feature_range=(min_s, max_s))
        # min_s and max_s are included in the fit so the scaler covers the
        # full range, keeping train and test transforms consistent
        scaler2.fit(np.array(second + [min_s, max_s]).reshape(-1, 1))
    second = scaler2.transform(np.array(second).reshape(-1, 1)).reshape(
        -1) if second else []
    # restore original order
    return np.concatenate([first, second])[
        np.argsort(first_ind + second_ind)], scaler1, scaler2
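

# A minimal illustrative sketch (not part of the library;
# `_demo_scale_angles` and its angles are hypothetical): angles at or
# below pi/2 fall into group 1, larger angles into group 2, and the
# output preserves the input order. The fitted scalers are reused at
# prediction time.
def _demo_scale_angles():
    scaled, s1, s2 = scale_angles(np.array([0.1, 3.0, 1.0]))  # fit stage
    print(scaled)  # three values, in the same order as the input
    scaled2, _, _ = scale_angles(np.array([0.2, 2.5]), s1, s2)  # predict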


def euclidean(v1, v2, c=False):
    """
    Find the Euclidean distance between two vectors,
    or between a vector and a collection of vectors.

    Parameters
    ----------
    v1 : list, first 3D vector or collection of vectors
    v2 : list, second 3D vector
    c : bool (default=False), True if v1 is a collection of vectors

    Returns
    -------
    numpy array of Euclidean distances if c is True;
    otherwise float, the Euclidean distance
    """
    if c:
        res = []
        for _v in v1:
            res.append([np.sqrt((_v[0] - v2[0]) ** 2 +
                                (_v[1] - v2[1]) ** 2 +
                                (_v[2] - v2[2]) ** 2)])
        return np.asarray(res)
    return np.sqrt((v1[0] - v2[0]) ** 2 +
                   (v1[1] - v2[1]) ** 2 +
                   (v1[2] - v2[2]) ** 2)


def rod_3D(x, gm=None, median=None, scaler1=None, scaler2=None):
    """
    Find ROD scores for 3D data.
    Note that gm, median, scaler1 and scaler2 are returned unchanged
    if the model has already been fit (i.e. they are not None).

    Parameters
    ----------
    x : array-like, 3D data points
    gm : list (default=None), the geometric median
    median : float (default=None), MAD median
    scaler1 : obj (default=None), MinMaxScaler of Angles group 1
    scaler2 : obj (default=None), MinMaxScaler of Angles group 2

    Returns
    -------
    decision_scores, gm, median, scaler1, scaler2
    """
    # find the geometric median if it is not already fit
    gm = geometric_median(x) if gm is None else gm
    # find its norm and center data around it
    norm_ = np.linalg.norm(gm)
    _x = x - gm
    # calculate the scaled angles between the geometric median and each data point vector
    v_norm = np.linalg.norm(_x, axis=1)
    gammas, scaler1, scaler2 = scale_angles(
        np.arccos(np.clip(np.dot(_x, gm) / (v_norm * norm_), -1, 1)),
        scaler1=scaler1, scaler2=scaler2)
    # apply the ROD main equation to find the rotation costs
    costs = np.power(v_norm, 3) * np.cos(gammas) * np.square(np.sin(gammas))
    # apply MAD to calculate the decision scores
    decision_scores, median = mad(costs, median=median)
    return decision_scores, list(gm), median, scaler1, scaler2
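

# A minimal illustrative sketch (not part of the library; `_demo_rod_3d`
# and its data are hypothetical): score a small 3D cloud with one planted
# far-away point, which is expected to receive the largest decision score.
def _demo_rod_3d():
    rng = np.random.RandomState(0)
    x = np.vstack([rng.randn(50, 3), [[10., 10., 10.]]])
    scores, gm, median, s1, s2 = rod_3D(x)
    print(int(np.argmax(scores)))  # expected: 50, the planted point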


@numba.njit
def sigmoid(x):
    """
    Implementation of the sigmoid function.

    Parameters
    ----------
    x : array-like, decision scores

    Returns
    -------
    array-like, x after applying sigmoid
    """
    return 1 / (1 + np.exp(-x))


def process_sub(subspace, gm, median, scaler1, scaler2):
    """
    Apply ROD on a 3D subspace, then pass the scores through a sigmoid
    so that scores from different subspaces are directly comparable.

    Parameters
    ----------
    subspace : array-like, 3D subspace of the data
    gm : list, the geometric median
    median : float, MAD median
    scaler1 : obj, MinMaxScaler of Angles group 1
    scaler2 : obj, MinMaxScaler of Angles group 2

    Returns
    -------
    ROD decision scores with sigmoid applied, gm, median, scaler1, scaler2
    """
    mad_subspace, gm, median, scaler1, scaler2 = rod_3D(subspace, gm=gm,
                                                        median=median,
                                                        scaler1=scaler1,
                                                        scaler2=scaler2)
    return sigmoid(
        np.nan_to_num(np.array(mad_subspace))), gm, median, scaler1, scaler2


def rod_nD(X, parallel, gm=None, median=None, data_scaler=None,
           angles_scalers1=None, angles_scalers2=None):
    """
    Find overall ROD scores when the data has more than 3 dimensions:
      # scale the dataset using a Robust Scaler,
      # decompose the full space into all combinations of 3D subspaces,
      # apply ROD on each combination,
      # squash the scores per subspace with a sigmoid so they are comparable,
      # average the ROD scores of all subspaces per observation.
    Note that if gm, median, data_scaler, angles_scalers1 and angles_scalers2
    are None, this is a `fit()` call: they will be computed and returned to
    the class to be saved for future predictions. Otherwise, if they are not
    None, it is a prediction call.

    Parameters
    ----------
    X : array-like, data points
    parallel: bool, True runs the algorithm in parallel
    gm: list (default=None), the geometric median
    median: list (default=None), MAD medians
    data_scaler: obj (default=None), RobustScaler of data
    angles_scalers1: list (default=None), MinMaxScalers of Angles group 1
    angles_scalers2: list (default=None), MinMaxScalers of Angles group 2

    Returns
    -------
    ROD decision scores, gm, median, data_scaler, angles_scalers1, angles_scalers2
    """
    if data_scaler is None:  # for fitting
        data_scaler = RobustScaler()
        X = data_scaler.fit_transform(X)
    else:  # for prediction
        X = data_scaler.transform(X)
    dim = X.shape[1]
    all_subspaces = [X[:, _com] for _com in com(range(dim), 3)]
    all_gms = [None] * len(all_subspaces) if gm is None else gm
    all_meds = [None] * len(all_subspaces) if median is None else median
    all_angles_scalers1 = [None] * len(
        all_subspaces) if angles_scalers1 is None else angles_scalers1
    all_angles_scalers2 = [None] * len(
        all_subspaces) if angles_scalers2 is None else angles_scalers2
    if parallel:
        p = Pool(multiprocessing.cpu_count())
        args = [[a, b, c, d, e] for a, b, c, d, e in
                zip(all_subspaces, all_gms, all_meds,
                    all_angles_scalers1, all_angles_scalers2)]
        results = p.starmap(process_sub, args)
        subspaces_scores, gm, median, angles_scalers1, angles_scalers2 = [], [], [], [], []
        for res in results:
            subspaces_scores.append(list(res[0]))
            gm.append(res[1])
            median.append(res[2])
            angles_scalers1.append(res[3])
            angles_scalers2.append(res[4])
        scores = np.average(np.array(subspaces_scores).T, axis=1).reshape(-1)
        p.close()
        p.join()
        return scores, gm, median, data_scaler, angles_scalers1, angles_scalers2
    subspaces_scores, gm, median, angles_scalers1, angles_scalers2 = [], [], [], [], []
    for subspace, _gm, med, ang_s1, ang_s2 in zip(all_subspaces, all_gms,
                                                  all_meds,
                                                  all_angles_scalers1,
                                                  all_angles_scalers2):
        scores_, gm_, med_, ang_s1_, ang_s2_ = process_sub(subspace=subspace,
                                                           gm=_gm, median=med,
                                                           scaler1=ang_s1,
                                                           scaler2=ang_s2)
        subspaces_scores.append(scores_)
        gm.append(gm_)
        median.append(med_)
        angles_scalers1.append(ang_s1_)
        angles_scalers2.append(ang_s2_)
    scores = np.average(np.array(subspaces_scores).T, axis=1).reshape(-1)
    return scores, gm, median, data_scaler, angles_scalers1, angles_scalers2
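

# A minimal illustrative sketch (not part of the library; `_demo_rod_nd`
# and its data are hypothetical): with d = 5 features there are
# C(5, 3) = 10 subspaces; each is scored independently and the sigmoid
# scores are averaged into one score per observation.
def _demo_rod_nd():
    rng = np.random.RandomState(0)
    X = rng.randn(40, 5)
    scores, gm, median, ds, s1, s2 = rod_nD(X, parallel=False)
    print(scores.shape)  # (40,)
    print(len(gm))  # 10: one geometric median per 3D subspace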


class ROD(BaseDetector):
    """Rotation-based Outlier Detection (ROD), is a robust and parameter-free
    algorithm that requires no statistical distribution assumptions and
    works intuitively in three-dimensional space, where the 3D-vectors,
    representing the data points, are rotated about the geometric median
    two times counterclockwise using Rodrigues rotation formula.
    The results of the rotation are parallelepipeds where their volumes are
    mathematically analyzed as cost functions and used to calculate the
    Median Absolute Deviations to obtain the outlying score.
    For high dimensions > 3, the overall score is calculated by taking the
    average of the overall 3D-subspaces scores, that were resulted from
    decomposing the original data space.
    See :cite:`almardeny2020novel` for details.

    Parameters
    ----------
    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    parallel_execution : bool, optional (default=False)
        If set to True, the algorithm will run in parallel
        for better execution time. It is recommended to set
        this parameter to True ONLY for high-dimensional data (> 10
        features), and only if proper hardware is available.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is
        fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : numpy array of shape (n_samples,), binary (0 or 1)
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, contamination=0.1, parallel_execution=False):
        super(ROD, self).__init__(contamination=contamination)
        if not isinstance(parallel_execution, bool):
            raise TypeError("parallel_execution should be bool. "
                            "Got {}".format(type(parallel_execution)))
        self.parallel_execution = parallel_execution

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        X = check_array(X)
        self._set_n_classes(y)
        # reset learning parameters after each fit
        self.gm_ = None  # geometric median(s)
        self.median_ = None  # MAD median(s)
        self.data_scaler_ = None  # data scaler (in case of d>3)
        self.angles_scaler1_ = None  # scaler(s) of Angles Group 1
        self.angles_scaler2_ = None  # scaler(s) of Angles Group 2
        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()

        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        X = check_array(X)
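        # pad lower-dimensional data with zero columns up to 3 features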
        if X.shape[1] < 3:
            X = np.hstack((X, np.zeros(shape=(X.shape[0], 3 - X.shape[1]))))

        if X.shape[1] == 3:
            scores, self.gm_, self.median_, self.angles_scaler1_, \
                self.angles_scaler2_ = rod_3D(x=X, gm=self.gm_,
                                              median=self.median_,
                                              scaler1=self.angles_scaler1_,
                                              scaler2=self.angles_scaler2_)
            return scores

        scores, self.gm_, self.median_, self.data_scaler_, \
            self.angles_scaler1_, self.angles_scaler2_ = rod_nD(X=X,
                                                                parallel=self.parallel_execution,
                                                                gm=self.gm_,
                                                                median=self.median_,
                                                                data_scaler=self.data_scaler_,
                                                                angles_scalers1=self.angles_scaler1_,
                                                                angles_scalers2=self.angles_scaler2_)
        return scores
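

# A minimal usage sketch (not part of the library; `_demo_rod_detector`
# and its data are hypothetical): fit the detector and inspect the
# attributes documented on the class.
def _demo_rod_detector():
    rng = np.random.RandomState(42)
    # 100 inliers around the origin plus one planted far-away point
    X_train = np.vstack([rng.randn(100, 4), [[8., 8., 8., 8.]]])
    clf = ROD(contamination=0.1)
    clf.fit(X_train)
    print(clf.decision_scores_[-1])  # the planted point should score high
    print(clf.labels_[-1])  # and should be labeled 1 (outlier)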