DefinetlyNotAI/Logicytics

View on GitHub
CODE/VulnScan/tools/_study_network.py

Summary

Maintainability
A
0 mins
Test Coverage
from __future__ import annotations

import os
import os.path
import random
from collections import OrderedDict
from configparser import ConfigParser
from os import mkdir
from typing import Any

import joblib
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import plotly.graph_objects as go
import seaborn as sns
import torch
import torch.nn as nn
from faker import Faker
from numpy import ndarray, dtype
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.manifold import TSNE
from torch import device
from torch.utils.data import DataLoader, TensorDataset
from torchviz import make_dot
from tqdm import tqdm


# Example of DataLoader for loss landscape (dummy dataset for visualization)
class DummyDataset(torch.utils.data.Dataset):
    """
    A dummy dataset for generating synthetic data for visualization purposes.

    Attributes:
        num_samples (int): Number of samples in the dataset.
        input_dim (int): Dimension of the input data.
        data (list): List of generated data samples.
        labels (list): List of labels corresponding to the data samples.
    """

    def __init__(self, num_samples: int = 100, input_dim: int = 10000):
        """
        Initializes the DummyDataset with the specified number of samples and input dimension.

        Args:
            num_samples (int): Number of samples to generate.
            input_dim (int): Dimension of the input data.
        """
        self.num_samples = num_samples
        self.input_dim = input_dim
        self.data: list[str] = []
        self.labels: list[int] = []
        faker = Faker()
        for _ in range(num_samples):
            if random.random() < 0.05:  # 5% chance to include sensitive data
                self.data.append(f"Name: {faker.name()}, SSN: {faker.ssn()}, Address: {faker.address()}")
                self.labels.append(1)  # Label as sensitive
            else:
                self.data.append(faker.text(max_nb_chars=100))  # Non-sensitive data
                self.labels.append(0)  # Label as non-sensitive

    def __len__(self) -> int:
        """
        Returns the number of samples in the dataset.

        Returns:
            int: Number of samples in the dataset.
        """
        return self.num_samples

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Retrieves the data and label at the specified index.

        Args:
            idx (int): Index of the data and label to retrieve.

        Returns:
            tuple: A tuple containing the data tensor and label tensor.
        """
        data = self.data[idx]
        label = self.labels[idx]
        # Convert data to tensor of ASCII values and pad to input_dim
        data_tensor = torch.tensor([ord(c) for c in data], dtype=torch.float32)
        if len(data_tensor) < self.input_dim:
            padding = torch.zeros(self.input_dim - len(data_tensor))
            data_tensor = torch.cat((data_tensor, padding))
        else:
            data_tensor = data_tensor[:self.input_dim]
        label_tensor = torch.tensor(label, dtype=torch.long)
        return data_tensor, label_tensor


def load_data(text_data: list[str], vectorizer_to_load: TfidfVectorizer | CountVectorizer) -> DataLoader:
    """
    Vectorizes the text data and creates a DataLoader for it.

    Args:
        text_data (list of str): The text data to be vectorized.
        vectorizer_to_load: The vectorizer to use for transforming the text data.

    Returns:
        DataLoader: A DataLoader containing the vectorized text data and dummy labels.
    """
    # Vectorize the text data
    X = vectorizer_to_load.transform(text_data)
    # Create a dummy label for visualization (replace with real labels if available)
    y = np.zeros(len(text_data))
    # Convert to torch tensors
    X_tensor = torch.tensor(X.toarray(), dtype=torch.float32)
    y_tensor = torch.tensor(y, dtype=torch.long)
    dataset = TensorDataset(X_tensor, y_tensor)
    return DataLoader(dataset, batch_size=32, shuffle=True)


def visualize_weight_distribution(model_to_load: torch.nn.Module):
    # Access weights of the first layer
    weights = model_to_load[0].weight.detach().cpu().numpy()  # Move tensor to CPU before conversion to numpy
    plt.hist(weights.flatten(), bins=50)
    plt.title("Weight Distribution - First Layer")
    plt.xlabel("Weight Value")
    plt.ylabel("Frequency")
    plt.savefig("NN features/Weight Distribution.png")
    plt.close()


def visualize_activations(model_to_load: torch.nn.Module, input_tensor: torch.Tensor):
    # Check the device of the model
    device_va = next(model_to_load.parameters()).device

    # Move the input tensor to the same device as the model
    input_tensor = input_tensor.to(device_va)

    activations = []

    # noinspection PyUnusedLocal
    def hook_fn(module, inputx, output):
        # Hook function to extract intermediate layer activations
        activations.append(output)

    model_to_load[0].register_forward_hook(hook_fn)  # Register hook on first layer

    # Perform a forward pass
    _ = model_to_load(input_tensor)
    activation = activations[0].detach().cpu().numpy()  # Move activations to CPU

    # Plot activations as a bar chart
    plt.figure(figsize=(10, 6))
    plt.bar(range(len(activation[0])), activation[0])
    plt.title("Activation Values - First Layer")
    plt.xlabel("Neuron Index")
    plt.ylabel("Activation Value")
    plt.savefig("NN features/Visualize Activation.png")
    plt.close()


def visualize_tsne(model_to_load: torch.nn.Module, dataloader: DataLoader):
    # Get the device of the model
    device_va = next(model_to_load.parameters()).device

    model_to_load.eval()  # Set the model to evaluation mode

    features = []
    labels = []

    with torch.no_grad():
        for data, target in dataloader:
            # Move data and target to the same device as the model
            data, target = data.to(device_va), target.to(device_va)

            # Extract features (output of the model)
            output = model_to_load(data)
            features.append(output.cpu().numpy())  # Move output to CPU for concatenation
            labels.append(target.cpu().numpy())  # Move target to CPU for concatenation

    # Stack all batches
    features = np.vstack(features)
    labels = np.hstack(labels)

    # Determine suitable perplexity
    num_samples = features.shape[0]
    perplexity = min(30, num_samples - 1)  # Ensure perplexity < num_samples

    # Apply t-SNE
    tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
    reduced_features = tsne.fit_transform(features)

    # Plot the t-SNE results
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(reduced_features[:, 0], reduced_features[:, 1], c=labels, cmap='viridis', alpha=0.7)
    plt.colorbar(scatter, label="Class")
    plt.title("t-SNE Visualization of Features")
    plt.xlabel("t-SNE Dimension 1")
    plt.ylabel("t-SNE Dimension 2")
    plt.savefig("NN features/Visualize t-SNE.png")
    plt.close()


# Main function to run all visualizations
def plot_many_graphs():
    print("Starting synthetic data generation...")
    # Load data
    faker = Faker()

    # Generate sensitive examples
    sensitive_data = [
        f"Name: {faker.name()}, SSN: {faker.ssn()}, Address: {faker.address()}",
        f"Credit Card: {faker.credit_card_number()}, Expiry: {faker.credit_card_expire()}, CVV: {faker.credit_card_security_code()}",
        f"Patient: {faker.name()}, Condition: {faker.text(max_nb_chars=20)}",
        f"Password: {faker.password()}",
        f"Email: {faker.email()}",
        f"Phone: {faker.phone_number()}",
        f"Medical Record: {faker.md5()}",
        f"Username: {faker.user_name()}",
        f"IP: {faker.ipv4()}",
    ]

    # Generate non-sensitive examples
    non_sensitive_data = [
        faker.text(max_nb_chars=50) for _ in range(50000)
    ]

    data_text = non_sensitive_data + (sensitive_data * 15)
    random.shuffle(data_text)
    print("Loaded data for visualization.")
    dataloader = load_data(data_text, vectorizer)

    # Visualizations
    print("Creating visualizations...")
    visualize_weight_distribution(model)

    # For activations, use a sample from the dataloader
    print("Creating activation visualizations...")
    sample_input = next(iter(dataloader))[0]
    visualize_activations(model, sample_input)

    print("Creating t-SNE visualization - May take a long time...")
    visualize_tsne(model, dataloader)

    print("Completed.")


# Visualize feature importance (dummy example for visualization) and save as SVG
def visualize_feature_importance(TOKENS: list[str], FEATURE_IMPORTANCE: float | ndarray[Any, dtype[np.floating]],
                                 FILENAME: str = "Plot.svg"):
    # Limit the number of tokens to visualize
    TOKENS = TOKENS[:1000]
    FEATURE_IMPORTANCE = FEATURE_IMPORTANCE[:1000]

    plt.figure(figsize=(len(TOKENS) * 0.5, 6))
    sns.barplot(x=TOKENS, y=FEATURE_IMPORTANCE, palette="coolwarm", hue=TOKENS, legend=False)
    plt.title("Feature Importance")
    plt.xlabel("Tokens")
    plt.ylabel("Importance")
    plt.xticks(rotation=45)
    plt.savefig(FILENAME, format="svg")
    plt.close()  # Close the plot to release memory


# Function to visualize the loss landscape as an interactive 3D object
def plot_loss_landscape_3d(MODEL: torch.nn.Module, DATA_LOADER: DataLoader, CRITERION: torch.nn.Module,
                           GRID_SIZE: int = 200, EPSILON: float = 0.01, FILENAME: str = "Plot.html"):
    MODEL.eval()  # Set model to evaluation mode
    param = next(MODEL.parameters())  # Use the first parameter for landscape perturbations
    param_flat = param.view(-1)

    # Define perturbation directions u and v
    u = torch.randn_like(param_flat).view(param.shape).to(param.device)
    v = torch.randn_like(param_flat).view(param.shape).to(param.device)

    # Normalize perturbations
    u = EPSILON * u / torch.norm(u)
    v = EPSILON * v / torch.norm(v)

    # Create grid
    x = np.linspace(-1, 1, GRID_SIZE)
    y = np.linspace(-1, 1, GRID_SIZE)
    loss_values = np.zeros((GRID_SIZE, GRID_SIZE))

    # Iterate through the grid to compute losses
    for i, dx in enumerate(x):
        print(f"Computing loss for row {i + 1}/{GRID_SIZE}...")
        for j, dy in enumerate(y):
            print(f"    Computing loss for column {j + 1}/{GRID_SIZE}...")
            param.data += dx * u + dy * v  # Apply perturbation
            loss = 0

            # Compute loss for all batches in data loader
            for batch in DATA_LOADER:
                inputs, targets = batch
                inputs = inputs.to(param.device)
                targets = targets.to(param.device)
                outputs = MODEL(inputs)
                loss += CRITERION(outputs, targets).item()

            loss_values[i, j] = loss  # Store the loss
            param.data -= dx * u + dy * v  # Revert perturbation

    # Create a meshgrid for plotting
    X, Y = np.meshgrid(x, y)

    # Plot the 3D surface using Plotly
    fig = go.Figure(data=[go.Surface(z=loss_values, x=X, y=Y, colorscale="Viridis")])
    fig.update_layout(
        title="Loss Landscape (Interactive 3D)",
        scene=dict(
            xaxis_title="Perturbation in u",
            yaxis_title="Perturbation in v",
            zaxis_title="Loss",
        ),
    )

    # Save as an interactive HTML file
    fig.write_html(FILENAME)
    print(f"3D loss landscape saved as {FILENAME}")


def main_plot():
    # Instantiate data loader
    print("Creating dummy data loader...")
    dummy_data_loader = DataLoader(DummyDataset(), batch_size=32)

    # Define loss criterion
    print("Defining loss criterion...")
    criterion = torch.nn.CrossEntropyLoss()

    # Visualizations
    print("Creating visualizations...")
    tokens = vectorizer.get_feature_names_out()

    # Feature importance
    # Max number of features to visualize is 3000 due to image constraints
    print(
        f"Visualizing feature importance - This may take a while for {len(tokens[:NUMBER_OF_FEATURES]) + 1} tokens...")
    feature_importance = np.random.rand(len(tokens[:NUMBER_OF_FEATURES]))  # Example random importance
    visualize_feature_importance(tokens[:NUMBER_OF_FEATURES], feature_importance,
                                 FILENAME="NN features/feature_importance.svg")

    # Loss landscape
    print("Visualizing loss landscape - This may take a while...")
    plot_loss_landscape_3d(model, dummy_data_loader, criterion, FILENAME="NN features/loss_landscape_3d.html")

    # Set model to evaluation mode, and plot many graphs
    print("Setting model to evaluation mode...")
    model.eval()  # Set the model to evaluation mode
    plot_many_graphs()


def save_data(model_to_use: torch.nn.Module, input_size: tuple[int, Any] | int, batch_size: int = -1,
              device_to_use: str = "cuda"):
    def register_hook(module: torch.nn.Module):

        def hook(modules: torch.nn.Module, inputs: (torch.nn.Module, tuple[torch.Tensor]), output: torch.Tensor):
            class_name = str(modules.__class__).split(".")[-1].split("'")[0]
            module_idx = len(summaries)

            m_key = "%s-%i" % (class_name, module_idx + 1)
            summaries[m_key] = OrderedDict()
            summaries[m_key]["input_shape"] = list(inputs[0].size())
            summaries[m_key]["input_shape"][0] = batch_size
            if isinstance(output, (list, tuple)):
                summaries[m_key]["output_shape"] = [
                    [-1] + list(o.size())[1:] for o in output
                ]
            else:
                summaries[m_key]["output_shape"] = list(output.size())
                summaries[m_key]["output_shape"][0] = batch_size

            params = 0
            if hasattr(modules, "weight") and hasattr(modules.weight, "size"):
                params += torch.prod(torch.LongTensor(list(modules.weight.size())))
                summaries[m_key]["trainable"] = modules.weight.requires_grad
            if hasattr(modules, "bias") and hasattr(modules.bias, "size"):
                params += torch.prod(torch.LongTensor(list(modules.bias.size())))
            summaries[m_key]["nb_params"] = params

        if (
                not isinstance(module, nn.Sequential)
                and not isinstance(module, nn.ModuleList)
                and not (module == model_to_use)
        ):
            hooks.append(module.register_forward_hook(hook))

    device_to_use = device_to_use.lower()
    assert device_to_use in [
        "cuda",
        "cpu",
    ], "Input device is not valid, please specify 'cuda' or 'cpu'"

    if device_to_use == "cuda" and torch.cuda.is_available():
        dtype_to_use = torch.cuda.FloatTensor
    else:
        dtype_to_use = torch.FloatTensor

    # multiple inputs to the network
    if isinstance(input_size, tuple):
        input_size = [input_size]

    # batch_size of 2 for batch norm
    x = [torch.rand(2, *in_size).type(dtype_to_use) for in_size in input_size]

    # create properties
    summaries = OrderedDict()
    hooks = []

    # register hook
    model_to_use.apply(register_hook)

    # make a forward pass
    model_to_use(*x)

    # remove these hooks
    for h in hooks:
        h.remove()

    # Save the summary
    mode = "a" if os.path.exists("NN features/Model Summary.txt") else "w"
    with open('NN features/Model Summary.txt', mode) as vf_ms:
        vf_ms.write("----------------------------------------------------------------\n")
        line_new = "{:>20}  {:>25} {:>15}".format("Layer (type)", "Output Shape", "Param #")
        vf_ms.write(f"{line_new}\n")
        vf_ms.write("================================================================\n")
        total_params = 0
        total_output = 0
        trainable_params = 0
        for layer in summaries:
            # input_shape, output_shape, trainable, nb_params
            line_new = "{:>20}  {:>25} {:>15}".format(
                layer,
                str(summaries[layer]["output_shape"]),
                "{0:,}".format(summaries[layer]["nb_params"]),
            )
            total_params += summaries[layer]["nb_params"]
            total_output += np.prod(summaries[layer]["output_shape"])
            if "trainable" in summaries[layer]:
                if summaries[layer]["trainable"]:
                    trainable_params += summaries[layer]["nb_params"]
            vf_ms.write(f"{line_new}\n")

        # assume 4 bytes/number (float on cuda).
        total_input_size = abs(np.prod(input_size) * batch_size * 4. / (1024 ** 2.))
        total_output_size = abs(2. * total_output * 4. / (1024 ** 2.))  # x2 for gradients
        total_params_size = abs(total_params.numpy() * 4. / (1024 ** 2.))
        total_size = total_params_size + total_output_size + total_input_size

        vf_ms.write("\n================================================================")
        vf_ms.write("\nTotal params: {0:,}".format(total_params))
        vf_ms.write("\nTrainable params: {0:,}".format(trainable_params))
        vf_ms.write("\nNon-trainable params: {0:,}".format(total_params - trainable_params))
        vf_ms.write("\n----------------------------------------------------------------")
        vf_ms.write("\nInput size (MB): %0.2f" % total_input_size)
        vf_ms.write("\nForward/backward pass size (MB): %0.2f" % total_output_size)
        vf_ms.write("\nParams size (MB): %0.2f" % total_params_size)
        vf_ms.write("\nEstimated Total Size (MB): %0.2f" % total_size)
        vf_ms.write("\n----------------------------------------------------------------\n")


def save_graph():
    # Create a directed graph
    G = nx.DiGraph()

    def add_edges_bulk(layer_names: str, weight_matrices: np.ndarray[np.float32]):
        """Efficiently add edges to the graph with progress tracking."""
        threshold = 0.1  # Adjust this threshold as needed
        significant_weights = np.abs(weight_matrices) > threshold
        rows, cols = np.where(significant_weights)
        weights = weight_matrices[rows, cols]

        # Use tqdm for progress tracking
        edge_count = len(rows)
        with tqdm(total=edge_count, desc=f"Processing {layer_names}", unit="edges") as pbar:
            for row, col, weight in zip(rows, cols, weights):
                in_node = f"{layer_names}_in_{col}"
                out_node = f"{layer_names}_out_{row}"
                G.add_edge(in_node, out_node, weight=weight)
                pbar.update(1)

    # Process parameters
    for name, param in model.named_parameters():
        if 'weight' in name:
            layer_name = name.split('.')[0]
            weight_matrix = param.data.cpu().numpy()

            # Add edges with progress bar
            add_edges_bulk(layer_name, weight_matrix)

    # Draw the graph
    print("Writing the graph to a file...")
    nx.write_gexf(G, "NN features/Neural Network Nodes Graph.gexf")


def setup_environment():
    print("Visualizing the model and vectorizer features...")
    print("This may take a while, please wait.")

    if not os.path.exists('NN features'):
        mkdir('NN features')


def load_vectorizer():
    vectorizer_load = joblib.load(vectorizer_path)
    feature_names = vectorizer_load.get_feature_names_out()
    with open('NN features/Vectorizer features.txt', 'w') as file:
        file.write(f"Number of features: {len(feature_names)}\n\n")
        file.write('\n'.join(feature_names))
    return vectorizer_load


def visualize_top_features(top_n: int = 90):
    feature_names = vectorizer.get_feature_names_out()
    sorted_indices = vectorizer.idf_.argsort()[:top_n]
    top_features = [feature_names[i] for i in sorted_indices]
    top_idf_scores = vectorizer.idf_[sorted_indices]

    plt.figure(figsize=(20, 12))  # Increase the figure size
    sns.barplot(x=top_idf_scores, y=top_features)
    plt.title('Top 90 Features by IDF Score')
    plt.xlabel('IDF Score')
    plt.ylabel('Feature')

    # Save the plot as a vector graphic
    plt.savefig('NN features/Top_90_Features.svg', format='svg')
    plt.close()


def load_model() -> tuple[Any, device]:
    device_load = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_load = torch.load(model_path, weights_only=False)
    model_load.to(device_load)
    return model_load, device_load


def save_model_state_dict():
    with open('NN features/Model state dictionary.txt', 'w') as file:
        file.write("Model's state dictionary:\n\n")
        for param_tensor in model.state_dict():
            file.write(f"\n{param_tensor}\t{model.state_dict()[param_tensor].size()}")


def generate_model_visualization():
    dummy_input = torch.randn(1, vectorizer.vocabulary_.__len__()).to(device)
    model_viz = make_dot(model(dummy_input), params=dict(model.named_parameters()), show_attrs=True, show_saved=True)
    model_viz.format = 'png'
    model_viz.render(filename='NN features/Model Visualization', format='png')


def cleanup_temp_files():
    if os.path.exists("NN features/Model Visualization"):
        os.remove("NN features/Model Visualization")


def model_summary():
    mode = "a" if os.path.exists("NN features/Model Summary.txt") else "w"
    with open("NN features/Model Summary.txt", mode) as file:
        file.write(str(model))


if __name__ == '__main__':
    # Print the welcome message
    print("===========================================================================================")
    print("= This script will visualize the features of the model and vectorizer.                    =")
    print("= Please ensure that the model and vectorizer files are present in the specified paths.   =")
    print("= The visualization will be saved in the 'NN features' directory.                         =")
    print("= This script will take a while to run, please be patient.                                =")
    print("===========================================================================================")

    # Read the config file
    print("\n\nReading config file and setting up...")
    config = ConfigParser()
    config.read('../../config.ini')

    setup_environment()

    # Load the paths from the config file
    vectorizer_path = config.get('VulnScan.study Settings', 'vectorizer_path')
    model_path = config.get('VulnScan.study Settings', 'model_path')
    NUMBER_OF_FEATURES = int(config.get('VulnScan.study Settings', 'number_of_features'))

    # Check if the paths exist
    if not os.path.exists(vectorizer_path):
        print(f"Vectorizer file not found. Please double check the path {vectorizer_path}.")
        exit(1)
    if not os.path.exists(model_path):
        print(f"Model file not found. Please double check the path {model_path}.")
        exit(1)

    # Load the vectorizer and model
    vectorizer = load_vectorizer()
    visualize_top_features()
    model, device = load_model()
    # Save the model summary, state dictionary, and visualization
    save_data(model, input_size=(1, vectorizer.vocabulary_.__len__()))
    save_model_state_dict()
    generate_model_visualization()
    cleanup_temp_files()
    save_graph()
    print("Model visualization and summary have been saved to the 'NN features' directory.")

    # Check if GPU is available
    if not os.path.exists('NN features'):
        os.mkdir('NN features')

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Load vectorizer (change the path to your vectorizer .pkl file)
    vectorizer_path = "../Vectorizer .3n3.pkl"
    model_path = "../Model SenseMini .3n3.pth"

    # Load vectorizer
    print(f"Reloading vectorizer from: {vectorizer_path}")
    with open(vectorizer_path, "rb") as f:
        vectorizer = joblib.load(f)

    # Load model and move to the appropriate device (GPU/CPU)
    print(f"Reloading model from: {model_path}")
    model = torch.load(model_path, weights_only=False)
    model.to(device)  # Move model to GPU or CPU

    model_summary()
    main_plot()
else:
    raise ImportError("This training script is meant to be run directly "
                      "and cannot be imported. Please execute it as a standalone script.")