failmap/admin

View on GitHub
websecmap/scanners/scanmanager.py

Summary

Maintainability
A
1 hr
Test Coverage
import logging
from datetime import datetime

import pytz
from django.core.exceptions import ObjectDoesNotExist

from websecmap.scanners.models import Endpoint, EndpointGenericScan, Url, UrlGenericScan

log = logging.getLogger(__package__)


def store_endpoint_scan_result(scan_type: str, endpoint_id: int, rating: str, message: str, evidence: str = ""):

    # Check if the latest scan has the same rating or not:
    try:
        gs = EndpointGenericScan.objects.all().filter(type=scan_type, endpoint=endpoint_id).latest("last_scan_moment")
        exists = True
    except ObjectDoesNotExist:
        exists = False
        gs = EndpointGenericScan()

    # To deduplicate data, only store changes to scans. We'll update just the scan moment, and the rest stays the same.
    # The amount of data saved runs in the gigabytes. So it's worth the while doing it like this :)
    if gs.explanation == str(message) and gs.rating == str(rating):
        log.debug("Scan had the same rating and message, updating last_scan_moment only.")
        gs.last_scan_moment = datetime.now(pytz.utc)
        gs.save(update_fields=["last_scan_moment"])
        return

    # message and rating changed for this scan_type, so it's worth while to save the scan.
    if not exists:
        log.debug("No prior scan result found, creating a new one.")
    else:
        log.debug("Message or rating changed compared to previous scan. Saving the new scan result.")

    gs = EndpointGenericScan()
    # very long csp headers for example
    gs.explanation = message[0:255]
    gs.rating = rating
    gs.endpoint = Endpoint.objects.all().filter(id=endpoint_id).first()
    gs.type = scan_type
    # Very long CSP headers for example take a lot of space.
    gs.evidence = evidence[0:9000]
    gs.last_scan_moment = datetime.now(pytz.utc)
    gs.rating_determined_on = datetime.now(pytz.utc)
    gs.is_the_latest_scan = True
    gs.save()

    # Set all the previous endpoint scans of this endpoint + type to NOT be the latest scan.
    EndpointGenericScan.objects.all().filter(endpoint=gs.endpoint, type=gs.type).exclude(pk=gs.pk).update(
        is_the_latest_scan=False
    )


def store_url_scan_result(scan_type: str, url_id: int, rating: str, message: str, evidence: str = ""):

    # Check if the latest scan has the same rating or not:
    try:
        gs = (
            UrlGenericScan.objects.all()
            .filter(
                type=scan_type,
                url=url_id,
            )
            .latest("last_scan_moment")
        )
    except ObjectDoesNotExist:
        gs = UrlGenericScan()

    # here we figured out that you can still pass a bool while type hinting.
    # log.debug("Explanation new: '%s', old: '%s' eq: %s, Rating new: '%s', old: '%s', eq: %s" %
    #           (message, gs.explanation, message == gs.explanation, rating, gs.rating, str(rating) == gs.rating))

    # last scan had exactly the same result, so don't create a new scan and just update the last scan date.
    # while we have type hinting, it's still possible to pass in a boolean and then you compare a str to a bool...
    if gs.explanation == str(message) and gs.rating == str(rating):
        log.debug("Scan had the same rating and message, updating last_scan_moment only.")
        gs.last_scan_moment = datetime.now(pytz.utc)
        gs.save(update_fields=["last_scan_moment"])
    else:
        # message and rating changed for this scan_type, so it's worth while to save the scan.
        log.debug("Message or rating changed: making a new generic scan.")
        gs = UrlGenericScan()
        gs.explanation = message
        gs.rating = rating
        gs.url = Url.objects.all().filter(id=url_id).first()
        gs.evidence = evidence
        gs.type = scan_type
        gs.last_scan_moment = datetime.now(pytz.utc)
        gs.rating_determined_on = datetime.now(pytz.utc)
        gs.is_the_latest_scan = True
        gs.save()

        UrlGenericScan.objects.all().filter(url=gs.url, type=gs.type).exclude(pk=gs.pk).update(is_the_latest_scan=False)


def endpoint_has_scans(scan_type: str, endpoint_id: int):
    """
    Used for data deduplication. Don't save a scan that had zero points, but you can upgrade
    to zero (or another rating)
    :param scan_type:
    :param endpoint_id:
    :return:
    """

    try:
        gs = (
            EndpointGenericScan.objects.all()
            .filter(
                type=scan_type,
                endpoint=endpoint_id,
            )
            .latest("last_scan_moment")
        )
        if gs.rating:
            return True
    except ObjectDoesNotExist:
        return False