DefinetlyNotAI/Logicytics

View on GitHub
CODE/vulnscan.py

Summary

Maintainability
A
0 mins
Test Coverage
from __future__ import annotations

import mimetypes
import os
import threading
import warnings
from concurrent.futures import ThreadPoolExecutor

import joblib
import numpy as np
import torch
from safetensors import safe_open
from sklearn.feature_extraction.text import TfidfVectorizer

# Set up logging
from logicytics import Log, DEBUG

# The logger is only created when this file is executed as a script.
# NOTE(review): `log` is used unconditionally right below and inside the
# functions of this module, so importing the module instead of running it
# would raise NameError at the first log call - confirm that import use is
# never intended.
if __name__ == "__main__":
    log = Log({"log_level": DEBUG})

log.info("Locking threads - Model and Vectorizer")
# One lock per lazily loaded artefact; scan_path holds the matching lock while
# it checks and fills the corresponding cache below.
model_lock = threading.Lock()
vectorizer_lock = threading.Lock()

# Module-wide caches, left as None until scan_path loads them on first use.
model_to_use = None
vectorizer_to_use = None


def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module:
    """
    Load a machine learning model from the specified file path.

    The loader is selected from the file extension. The extension is compared
    case-insensitively, so ``MODEL.PTH`` is treated like ``model.pth``.

    Args:
        model_path_to_load (str): Path to the model file.

    Returns:
        model: Loaded model object - the result of ``joblib.load`` for
            ``.pkl``, a ``safe_open`` handle for ``.safetensors``, or the
            result of ``torch.load`` for ``.pth``.

    Raises:
        ValueError: If the model file format is unsupported.
    """
    # Only the suffix check uses the lowered name; the loaders receive the
    # path exactly as given.
    lowered_path = model_path_to_load.lower()

    if lowered_path.endswith('.pkl'):
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code - only load model files from a trusted source.
        return joblib.load(model_path_to_load)

    if lowered_path.endswith('.safetensors'):
        return safe_open(model_path_to_load, framework='torch')

    if lowered_path.endswith('.pth'):
        with warnings.catch_warnings():
            # Silence FutureWarning raised while loading the checkpoint.
            warnings.filterwarnings("ignore", category=FutureWarning)
            # SECURITY: weights_only=False allows full unpickling, which can
            # execute arbitrary code - only load checkpoints from a trusted
            # source.
            return torch.load(model_path_to_load, weights_only=False)

    raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth")


def scan_path(model_path: str, scan_paths: str, vectorizer_path: str):
    """
    Scan one file, loading the shared model and vectorizer on first use.

    Any exception raised while loading or scanning is logged and not
    re-raised, so a failure on one file does not propagate to the caller.

    Args:
        model_path (str): Path of the model file, handed to ``load_model``
            when no model has been cached yet.
        scan_paths (str): Path of the file to scan.
        vectorizer_path (str): Path of the vectorizer file, loaded with
            ``joblib.load`` when no vectorizer has been cached yet.
    """
    global model_to_use, vectorizer_to_use
    try:
        # Fill the model cache under its lock so concurrent callers do not
        # each load the model.
        with model_lock:
            model_missing = model_to_use is None
            if model_missing:
                log.info(f"Loading model from {model_path}")
                model_to_use = load_model(model_path)

        # Same pattern for the vectorizer, guarded by its own lock.
        with vectorizer_lock:
            vectorizer_missing = vectorizer_to_use is None
            if vectorizer_missing:
                log.info(f"Loading vectorizer from {vectorizer_path}")
                vectorizer_to_use = joblib.load(vectorizer_path)

        vulnscan(model_to_use, scan_paths, vectorizer_to_use)
    except Exception as e:
        log.error(f"Error scanning path {scan_paths}: {e}")


def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_content: str) -> tuple[bool, float, str]:
    """
    Determine if the file content is sensitive using the provided model and vectorizer.

    Args:
        model: Machine learning model. A ``torch.nn.Module`` is called
            directly on the feature tensor; any other object is used through
            its ``predict_proba`` and ``predict`` methods.
        vectorizer: Vectorizer to transform file content.
        file_content (str): Content of the file to be analyzed.

    Returns:
        tuple: ``(is_sensitive, probability, reason)`` where ``is_sensitive``
            is True when the predicted class is 1, ``probability`` is the
            highest class probability, and ``reason`` is a comma-separated
            list of up to five feature names with the largest non-zero
            weights, in ascending weight order (empty when the content
            matches no feature).
    """
    features = vectorizer.transform([file_content])
    # Densify once; both the explanation and the torch input use this array.
    dense_features = features.toarray()
    weights = dense_features[0]

    # Keep only features that actually occur in the content: a zero weight
    # means the term is absent, so it cannot be a reason for the verdict.
    top_features = [index for index in np.argsort(weights)[-5:] if weights[index] > 0]
    # Fetch the vocabulary once instead of once per reported feature.
    feature_names = vectorizer.get_feature_names_out()
    reason = ", ".join(feature_names[index] for index in top_features)

    if isinstance(model, torch.nn.Module):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        model.eval()
        with torch.no_grad():
            features_tensor = torch.tensor(dense_features, dtype=torch.float32).to(device)
            prediction = model(features_tensor)
            probability = torch.softmax(prediction, dim=1).max().item()
            return prediction.argmax(dim=1).item() == 1, probability, reason

    # Non-torch models are used through predict_proba / predict.
    prediction = model.predict_proba(features)
    probability = prediction.max()
    return model.predict(features)[0] == 1, probability, reason


def scan_file(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_path: str) -> tuple[bool, float, str]:
    """
    Read one file as text and classify its content.

    Args:
        model: Machine learning model.
        vectorizer: Vectorizer to transform file content.
        file_path (str): Path to the file to be scanned.

    Returns:
        tuple: The result of ``is_sensitive`` for the file content -
            (sensitive flag, prediction probability, reason).
    """
    guessed_type, _ = mimetypes.guess_type(file_path)
    looks_like_text = bool(guessed_type) and guessed_type.startswith('text')

    # Files guessed to be text are decoded as UTF-8; everything else is read
    # with the platform default encoding (encoding=None). Undecodable bytes
    # are dropped in both cases.
    chosen_encoding = 'utf-8' if looks_like_text else None
    with open(file_path, 'r', encoding=chosen_encoding, errors='ignore') as handle:
        text = handle.read()

    return is_sensitive(model, vectorizer, text)


# vulnscan is run from several threads at once, each opening its own handle
# on the report file; this lock keeps their appends from interleaving.
_sensitive_paths_lock = threading.Lock()


def vulnscan(model, SCAN_PATH, vectorizer):
    """
    Scan a single file and record its path when it is classified as sensitive.

    Sensitive paths are appended, one per line, to ``Sensitive_File_Paths.txt``
    in the current working directory. The file is created on first use and
    every path is written exactly once per detection.

    Args:
        model: Machine learning model passed through to ``scan_file``.
        SCAN_PATH: Path of the file to scan.
        vectorizer: Vectorizer passed through to ``scan_file``.
    """
    log.info(f"Scanning {SCAN_PATH}")
    result, probability, reason = scan_file(model, vectorizer, SCAN_PATH)
    if not result:
        return

    log.info(f"File {SCAN_PATH} is sensitive with probability {probability:.2f}. Reason: {reason}")
    # Append mode creates the file when it is missing, so no separate
    # existence check (which duplicated the first entry) is needed.
    with _sensitive_paths_lock:
        with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
            sensitive_file.write(f"{SCAN_PATH}\n")


# Collect the paths of all files beneath the base directories.
log.info("Getting paths to scan - This may take some time!!")

paths = []
base_paths = [
    "C:\\Users\\",
    "C:\\Windows\\Logs",
    "C:\\Program Files",
    "C:\\Program Files (x86)"
]

for base_path in base_paths:
    for root, dirs, files_main in os.walk(base_path):
        paths.extend(os.path.join(root, file_main) for file_main in files_main)

# Start scanning
log.warning("Starting scan - This may take hours and consume memory!!")

# Scan with a bounded worker pool. Starting one thread per file, as was done
# before, can exhaust OS thread limits and memory on large directory trees.
# Leaving the `with` block waits for every submitted scan to finish;
# scan_path logs its own errors, so the returned futures are not inspected.
with ThreadPoolExecutor() as executor:
    for path in paths:
        executor.submit(scan_path,
                        "VulnScan/Model SenseMini .3n3.pth", path, "VulnScan/Vectorizer .3n3.pkl")