yzhao062/Pyod

View on GitHub
pyod/models/devnet.py

Summary

Maintainability
B
4 hrs
Test Coverage
# -*- coding: utf-8 -*-

"""Deep anomaly detection with deviation networks
Part of the codes are adapted from
https://github.com/GuansongPang/deviation-network
"""
# Author: Sihan Chen <schen976@usc.edu>
# License: BSD 2 clause


# Import necessary libraries
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.utils import check_array
from torch.utils.data import Dataset, DataLoader

from .base import BaseDetector
from ..utils.torch_utility import TorchDataset

MAX_INT = np.iinfo(np.int32).max
data_format = 0

# Set random seed for reproducibility
np.random.seed(42)
torch.manual_seed(42)


# Define the network architectures
class DevNetD(nn.Module):
    def __init__(self, input_shape):
        super(DevNetD, self).__init__()
        self.fc1 = nn.Linear(input_shape, 1000)
        self.fc2 = nn.Linear(1000, 250)
        self.fc3 = nn.Linear(250, 20)
        self.score = nn.Linear(20, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = self.score(x)
        return x


class DevNetS(nn.Module):
    def __init__(self, input_shape):
        super(DevNetS, self).__init__()
        self.fc1 = nn.Linear(input_shape, 1000)
        self.score = nn.Linear(1000, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.score(x)
        return x


class DevNetLinear(nn.Module):
    def __init__(self, input_shape):
        super(DevNetLinear, self).__init__()
        self.score = nn.Linear(input_shape, 1)

    def forward(self, x):
        x = self.score(x)
        return x


def deviation_loss(y_true, y_pred):
    '''
    Z-score-based deviation loss translated to PyTorch.
    '''
    confidence_margin = 5.0
    # size=5000 is the setting of l in algorithm 1 in the paper
    ref = torch.randn(5000, device=y_pred.device,
                      dtype=torch.float32)  # Generate normal distributed ref values
    dev = (y_pred - ref.mean()) / ref.std()
    inlier_loss = torch.abs(dev)
    outlier_loss = torch.abs(torch.clamp(confidence_margin - dev, min=0))

    # Compute the mean of the weighted sum of inlier and outlier losses
    return torch.mean((1 - y_true) * inlier_loss + y_true * outlier_loss)


# Define the training and testing process
def train_and_test(model, train_loader, test_loader, epochs, device):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    model.train()

    for epoch in range(epochs):
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

    model.eval()
    with torch.no_grad():
        total_loss = 0
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            total_loss += criterion(outputs, labels).item()
        print('Test Loss:', total_loss / len(test_loader))


# Main function to run the model
def deviation_network(input_shape, network_depth):
    '''
    Construct the deviation network-based detection model in PyTorch style
    '''
    # Select the model based on the network depth
    if network_depth == 4:
        model = DevNetD(input_shape)
    elif network_depth == 2:
        model = DevNetS(input_shape)
    elif network_depth == 1:
        model = DevNetLinear(input_shape)
    else:
        raise ValueError(
            "The network depth is not set properly")  # Use exception instead of sys.exit

    # Initialize the optimizer
    optimizer = optim.RMSprop(model.parameters(), lr=0.001,
                              weight_decay=1e-6)  # Set clipnorm equivalent in PyTorch
    return model, optimizer


class SupDataset(Dataset):
    def __init__(self, x, outlier_indices, inlier_indices, rng):
        self.x = x
        self.outlier_indices = outlier_indices
        self.inlier_indices = inlier_indices
        self.rng = np.random.RandomState(
            42)  # Ensure rng is seeded outside or fixed

    def __len__(self):
        return len(self.outlier_indices) + len(
            self.inlier_indices)  # or any other appropriate length

    def __getitem__(self, idx):
        if idx < len(self.inlier_indices):
            # Assuming inliers are processed first
            label = 0  # Assuming inlier label
            index = self.inlier_indices[idx]
        else:
            # Processing outliers
            label = 1  # Assuming outlier label
            index = self.outlier_indices[idx - len(self.inlier_indices)]

        return self.x[index], label


def input_batch_generation_sup_sparse(x_train, outlier_indices, inlier_indices,
                                      batch_size, rng):
    '''
    Batch generation for samples, alternating between positive and negative.
    Adjusted for use with PyTorch, handling data in tensors.
    '''
    training_data = []
    training_labels = []
    n_inliers = len(inlier_indices)
    n_outliers = len(outlier_indices)

    for i in range(batch_size):
        if i % 2 == 0:
            sid = rng.choice(n_inliers, 1)
            training_data.append(x_train[inlier_indices[sid.item()]])
            training_labels.append(0)
        else:
            sid = rng.choice(n_outliers, 1)
            training_data.append(x_train[outlier_indices[sid.item()]])
            training_labels.append(1)

    # Convert lists to tensors
    training_data = torch.stack(training_data)
    training_labels = torch.tensor(training_labels, dtype=torch.long)

    return training_data, training_labels


def load_model_weight_predict(model, x_test):
    # Ensure x_test is a PyTorch tensor and also ensure it's on the same device as the model
    x_test = torch.tensor(x_test, dtype=torch.float32)

    # Assuming data_format variable should be defined somewhere in the context or as a parameter
    data_format = 0  # Assuming it's set correctly according to your use-case

    if data_format == 0:
        scores = model(x_test)
    else:
        data_size = x_test.shape[0]
        scores = torch.zeros([data_size, ])
        batch_size = 512
        for i in range(0, data_size, batch_size):
            end = min(i + batch_size, data_size)
            subset = x_test[i:end]
            scores[i:end] = model(subset)

    # Make sure the output is flattened before returning
    scores = scores.flatten()  # Flatten the tensor to ensure it's one-dimensional

    return scores.detach().cpu().numpy()  # Convert to numpy array if needed


class DevNet(BaseDetector):
    def __init__(self,
                 network_depth=2,
                 batch_size=512,
                 epochs=50,
                 nb_batch=20,
                 known_outliers=30,
                 cont_rate=0.02,
                 data_format=0,  # Assuming '0' for CSV
                 random_seed=42,
                 device=None,
                 contamination=0.1):
        super(DevNet, self).__init__(contamination=contamination)
        self._classes = 2
        self.network_depth = network_depth
        self.batch_size = batch_size
        self.epochs = epochs
        self.nb_batch = nb_batch
        self.known_outliers = known_outliers
        self.cont_rate = cont_rate
        self.data_format = data_format
        self.random_seed = random_seed
        self.device = device
        if self.device is None:
            self.device = torch.device(
                "cuda:0" if torch.cuda.is_available() else "cpu")

    def fit(self, X, y):
        outlier_indices = np.where(y == 1)[0]
        inlier_indices = np.where(y == 0)[0]
        n_outliers = len(outlier_indices)
        print("Original training size: %d, No. outliers: %d" % (
            X.shape[0], n_outliers))
        n_noise = len(np.where(y == 0)[0]) * self.contamination / (
                1. - self.contamination)
        n_noise = int(n_noise)
        outlier_indices = np.where(y == 1)[0]
        inlier_indices = np.where(y == 0)[0]
        print(y.shape[0], outlier_indices.shape[0], inlier_indices.shape[0],
              n_noise)
        # Data manipulation part can be adjusted as needed.
        self.model, optimizer = deviation_network(X.shape[1],
                                                  self.network_depth)
        rng = np.random.RandomState(42)
        train_dataset = SupDataset(X, outlier_indices, inlier_indices, rng)
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size,
                                  shuffle=True)

        def train_model(model, data_loader, epochs):
            model.train()
            for epoch in range(epochs):
                for data, labels in data_loader:
                    data, labels = data.to(torch.float32), labels.to(
                        torch.float32)  # Ensure data types
                    optimizer.zero_grad()
                    outputs = model(data)
                    loss = deviation_loss(outputs, labels)
                    loss.backward()
                    optimizer.step()
                print(f'Epoch {epoch + 1}, Loss: {loss.item()}')

        # Training the model
        train_model(self.model, train_loader, epochs=self.epochs)
        self.decision_scores_ = self.decision_function(X)
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        X = check_array(X)

        dataset = TorchDataset(X=X, return_idx=True)

        dataloader = torch.utils.data.DataLoader(dataset,
                                                 batch_size=self.batch_size,
                                                 shuffle=False)
        # enable the evaluation mode
        self.model.eval()

        # construct the vector for holding the reconstruction error
        outlier_scores = np.zeros([X.shape[0], ])
        with torch.no_grad():
            for data, data_idx in dataloader:
                data_cuda = data.to(self.device).float()
                # this is the outlier score
                outlier_scores[data_idx] = load_model_weight_predict(
                    self.model, data)
        return outlier_scores

    def fit_predict_score(self, X, y, scoring='roc_auc_score'):
        """
        Fit the detector with labels, predict on samples, and evaluate the model by predefined metrics.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.
        y : numpy array of shape (n_samples,)
            The labels or target values corresponding to X.
        scoring : str, optional (default='roc_auc_score')
            Evaluation metric:
            - 'roc_auc_score': ROC score
            - 'prc_n_score': Precision @ rank n score

        Returns
        -------
        score : float
        """

        # Fit the model with both X and y
        self.fit(X, y)

        # Prediction and scoring
        if scoring == 'roc_auc_score':
            from sklearn.metrics import roc_auc_score
            score = roc_auc_score(y, self.decision_scores_)
        elif scoring == 'prc_n_score':
            from sklearn.metrics import precision_recall_curve
            precision, _, _ = precision_recall_curve(y, self.decision_scores_)
            score = precision[
                1]  # Assuming this is how you'd compute Precision @ rank n
        else:
            raise NotImplementedError('PyOD built-in scoring only supports '
                                      'ROC and Precision @ rank n')

        print("{metric}: {score}".format(metric=scoring, score=score))

        return score