yzhao062/Pyod

View on GitHub
pyod/models/dif.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
"""Deep Isolation Forest for Anomaly Detection (DIF)
"""
# Author: Hongzuo Xu <hongzuoxu@126.edu>
# License: BSD 2 clause


import numpy as np

try:
    import torch
except ImportError:
    print('please install torch first')

import torch
from torch.utils.data import DataLoader

from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, MinMaxScaler

from .base import BaseDetector
from ..utils.torch_utility import get_activation_by_name


class DIF(BaseDetector):
    """Deep Isolation Forest (DIF) is an extension of iForest. It uses deep
    representation ensemble to achieve non-linear isolation on original data
    space. See :cite:`xu2023dif` for details.

    Parameters
    ----------
    batch_size : int, optional (default=1000)
        Number of samples per gradient update.

    representation_dim, int, optional (default=20)
        Dimensionality of the representation space.

    hidden_neurons, list, optional (default=[64, 32])
        The number of neurons per hidden layers. So the network has the
        structure as [n_features, hidden_neurons[0], hidden_neurons[1], ..., representation_dim]

    hidden_activation, str, optional (default='tanh')
        Activation function to use for hidden layers.
        All hidden layers are forced to use the same type of activation.
        See https://pytorch.org/docs/stable/nn.html for details.
        Currently only
        'relu': nn.ReLU()
        'sigmoid': nn.Sigmoid()
        'tanh': nn.Tanh()
        are supported. See pyod/utils/torch_utility.py for details.

    skip_connection, boolean, optional (default=False)
        If True, apply skip-connection in the neural network structure.

    n_ensemble, int, optional (default=50)
        The number of deep representation ensemble members.

    n_estimators, int, optional (default=6)
        The number of isolation forest of each representation.

    max_samples, int, optional (default=256)
        The number of samples to draw from X to train each base isolation tree.

    contamination : float in (0., 0.5), optional (default=0.1)
        The amount of contamination of the data set,
        i.e. the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function.

    random_state : int or None, optional (default=None)
        If int, random_state is the seed used by the random
        number generator;
        If None, the random number generator is the
        RandomState instance used by `np.random`.

    device, 'cuda', 'cpu', or None, optional (default=None)
        if 'cuda', use GPU acceleration in torch
        if 'cpu', use cpu in torch
        if None, automatically determine whether GPU is available


    Attributes
    ----------
    net_lst : list of torch.Module
        The list of representation neural networks.

    iForest_lst : list of iForest
        The list of instantiated iForest model.

    x_reduced_lst: list of numpy array
        The list of training data representations

    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self,
                 batch_size=1000,
                 representation_dim=20,
                 hidden_neurons=None,
                 hidden_activation='tanh',
                 skip_connection=False,
                 n_ensemble=50,
                 n_estimators=6,
                 max_samples=256,
                 contamination=0.1,
                 random_state=None,
                 device=None):
        super(DIF, self).__init__(contamination=contamination)
        self.batch_size = batch_size
        self.representation_dim = representation_dim
        self.hidden_activation = hidden_activation
        self.skip_connection = skip_connection
        self.hidden_neurons = hidden_neurons

        self.n_ensemble = n_ensemble
        self.n_estimators = n_estimators
        self.max_samples = max_samples

        self.random_state = random_state
        self.device = device

        self.minmax_scaler = None

        # create default calculation device (support GPU if available)
        if self.device is None:
            self.device = torch.device(
                "cuda:0" if torch.cuda.is_available() else "cpu")

        # set random seed
        if self.random_state is not None:
            torch.manual_seed(self.random_state)
            torch.cuda.manual_seed(self.random_state)
            torch.cuda.manual_seed_all(self.random_state)
            np.random.seed(self.random_state)

        # default values for the amount of hidden neurons
        if self.hidden_neurons is None:
            self.hidden_neurons = [500, 100]

    def fit(self, X, y=None):
        """Fit detector. y is ignored in unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : Ignored
            Not used, present for API consistency by convention.

        Returns
        -------
        self : object
            Fitted estimator.
        """
        # validate inputs X and y (optional)
        X = check_array(X)
        self._set_n_classes(y)

        n_samples, n_features = X.shape[0], X.shape[1]

        # conduct min-max normalization before feeding into neural networks
        self.minmax_scaler = MinMaxScaler()
        self.minmax_scaler.fit(X)
        X = self.minmax_scaler.transform(X)

        # prepare neural network parameters
        network_params = {
            'n_features': n_features,
            'n_hidden': self.hidden_neurons,
            'n_output': self.representation_dim,
            'activation': self.hidden_activation,
            'skip_connection': self.skip_connection
        }

        # iteration
        self.net_lst = []
        self.iForest_lst = []
        self.x_reduced_lst = []
        ensemble_seeds = np.random.randint(0, 100000, self.n_ensemble)
        for i in range(self.n_ensemble):
            # instantiate network class and seed random seed
            net = MLPnet(**network_params).to(self.device)
            torch.manual_seed(ensemble_seeds[i])

            # initialize network parameters
            for name, param in net.named_parameters():
                if name.endswith('weight'):
                    torch.nn.init.normal_(param, mean=0., std=1.)

            x_reduced = self._deep_representation(net, X)

            # save network and representations
            self.x_reduced_lst.append(x_reduced)
            self.net_lst.append(net)

            # perform iForest upon representations
            self.iForest_lst.append(
                IsolationForest(n_estimators=self.n_estimators,
                                max_samples=self.max_samples,
                                random_state=ensemble_seeds[i])
            )
            self.iForest_lst[i].fit(x_reduced)

        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly score of X using the fitted detector.

        The anomaly score of an input sample is computed based on different
        detector algorithms. For consistency, outliers are assigned with
        larger anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The training input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['net_lst', 'iForest_lst', 'x_reduced_lst'])
        X = check_array(X)

        # conduct min-max normalization before feeding into neural networks
        X = self.minmax_scaler.transform(X)

        testing_n_samples = X.shape[0]
        score_lst = np.zeros([self.n_ensemble, testing_n_samples])

        # iteration
        for i in range(self.n_ensemble):
            # transform testing data to representation
            x_reduced = self._deep_representation(self.net_lst[i], X)

            # calculate outlier scores
            scores = _cal_score(x_reduced, self.iForest_lst[i])
            score_lst[i] = scores

        final_scores = np.average(score_lst, axis=0)
        return final_scores

    def _deep_representation(self, net, X):
        x_reduced = []

        with torch.no_grad():
            loader = DataLoader(X, batch_size=self.batch_size,
                                drop_last=False, pin_memory=True,
                                shuffle=False)
            for batch_x in loader:
                batch_x = batch_x.float().to(self.device)
                batch_x_reduced = net(batch_x)
                x_reduced.append(batch_x_reduced)

        x_reduced = torch.cat(x_reduced).data.cpu().numpy()
        x_reduced = StandardScaler().fit_transform(x_reduced)
        x_reduced = np.tanh(x_reduced)
        return x_reduced


class MLPnet(torch.nn.Module):
    def __init__(self, n_features, n_hidden=[500, 100], n_output=20,
                 activation='ReLU', bias=False, batch_norm=False,
                 skip_connection=False):
        super(MLPnet, self).__init__()
        self.skip_connection = skip_connection
        self.n_output = n_output

        num_layers = len(n_hidden)

        if type(activation) == str:
            activation = [activation] * num_layers
            activation.append(None)

        assert len(activation) == len(
            n_hidden) + 1, 'activation and n_hidden are not matched'

        self.layers = []
        for i in range(num_layers + 1):
            in_channels, out_channels = \
                self.get_in_out_channels(i, num_layers, n_features,
                                         n_hidden, n_output, skip_connection)
            self.layers += [
                LinearBlock(in_channels, out_channels,
                            bias=bias, batch_norm=batch_norm,
                            activation=activation[i],
                            skip_connection=skip_connection if i != num_layers else False)
            ]
        self.network = torch.nn.Sequential(*self.layers)

    def forward(self, x):
        x = self.network(x)
        return x

    @staticmethod
    def get_in_out_channels(i, num_layers, n_features, n_hidden, n_output,
                            skip_connection):
        if skip_connection is False:
            in_channels = n_features if i == 0 else n_hidden[i - 1]
            out_channels = n_output if i == num_layers else n_hidden[i]
        else:
            in_channels = n_features if i == 0 else np.sum(
                n_hidden[:i]) + n_features
            out_channels = n_output if i == num_layers else n_hidden[i]
        return in_channels, out_channels


class LinearBlock(torch.nn.Module):
    def __init__(self, in_channels, out_channels,
                 activation='Tanh', bias=False, batch_norm=False,
                 skip_connection=False):
        super(LinearBlock, self).__init__()

        self.skip_connection = skip_connection

        self.linear = torch.nn.Linear(in_channels, out_channels, bias=bias)

        if activation is not None:
            # self.act_layer = _instantiate_class("torch.nn.modules.activation", activation)
            self.act_layer = get_activation_by_name(activation)
        else:
            self.act_layer = torch.nn.Identity()

        self.batch_norm = batch_norm
        if batch_norm is True:
            dim = out_channels
            self.bn_layer = torch.nn.BatchNorm1d(dim, affine=bias)

    def forward(self, x):
        x1 = self.linear(x)
        x1 = self.act_layer(x1)

        if self.batch_norm is True:
            x1 = self.bn_layer(x1)

        if self.skip_connection:
            x1 = torch.cat([x, x1], axis=1)

        return x1


def _cal_score(xx, clf):
    depths = np.zeros((xx.shape[0], len(clf.estimators_)))
    depth_sum = np.zeros(xx.shape[0])
    deviations = np.zeros((xx.shape[0], len(clf.estimators_)))
    leaf_samples = np.zeros((xx.shape[0], len(clf.estimators_)))

    for ii, estimator_tree in enumerate(clf.estimators_):
        tree = estimator_tree.tree_
        n_node = tree.node_count

        if n_node == 1:
            continue

        # get feature and threshold of each node in the iTree
        # in feature_lst, -2 indicates the leaf node
        feature_lst, threshold_lst = tree.feature.copy(), tree.threshold.copy()

        # compute depth and score
        leaves_index = estimator_tree.apply(xx)
        node_indicator = estimator_tree.decision_path(xx)

        # The number of training samples in each test sample leaf
        n_node_samples = estimator_tree.tree_.n_node_samples

        # node_indicator is a sparse matrix with shape (n_samples, n_nodes),
        # indicating the path of input data samples
        # each layer would result in a non-zero element in this matrix,
        # and then the row-wise summation is the depth of data sample
        n_samples_leaf = estimator_tree.tree_.n_node_samples[leaves_index]
        d = (np.ravel(node_indicator.sum(axis=1)) + _average_path_length(
            n_samples_leaf) - 1.0)
        depths[:, ii] = d
        depth_sum += d

        # decision path of data matrix XX
        node_indicator = np.array(node_indicator.todense())

        # set a matrix with shape [n_sample, n_node],
        # representing the feature value of each sample on each node
        # set the leaf node as -2
        value_mat = np.array([xx[i][feature_lst] for i in range(xx.shape[0])])
        value_mat[:, np.where(feature_lst == -2)[0]] = -2
        th_mat = np.array([threshold_lst for _ in range(xx.shape[0])])

        mat = np.abs(value_mat - th_mat) * node_indicator

        exist = (mat != 0)
        dev = mat.sum(axis=1) / (exist.sum(axis=1) + 1e-6)
        deviations[:, ii] = dev

    scores = 2 ** (-depth_sum / (len(clf.estimators_) * _average_path_length(
        [clf.max_samples_])))
    deviation = np.mean(deviations, axis=1)
    leaf_sample = (clf.max_samples_ - np.mean(leaf_samples,
                                              axis=1)) / clf.max_samples_

    scores = scores * deviation
    # scores = scores * deviation * leaf_sample
    return scores


def _average_path_length(n_samples_leaf):
    """
    The average path length in a n_samples iTree, which is equal to
    the average path length of an unsuccessful BST search since the
    latter has the same structure as an isolation tree.
    Parameters
    ----------
    n_samples_leaf : array-like of shape (n_samples,)
        The number of training samples in each test sample leaf, for
        each estimators.

    Returns
    -------
    average_path_length : ndarray of shape (n_samples,)
    """

    n_samples_leaf = check_array(n_samples_leaf, ensure_2d=False)

    n_samples_leaf_shape = n_samples_leaf.shape
    n_samples_leaf = n_samples_leaf.reshape((1, -1))
    average_path_length = np.zeros(n_samples_leaf.shape)

    mask_1 = n_samples_leaf <= 1
    mask_2 = n_samples_leaf == 2
    not_mask = ~np.logical_or(mask_1, mask_2)

    average_path_length[mask_1] = 0.
    average_path_length[mask_2] = 1.
    average_path_length[not_mask] = (
            2.0 * (np.log(n_samples_leaf[not_mask] - 1.0) + np.euler_gamma)
            - 2.0 * (n_samples_leaf[not_mask] - 1.0) / n_samples_leaf[not_mask]
    )

    return average_path_length.reshape(n_samples_leaf_shape)