failmap/admin

View on GitHub
websecmap/scanners/management/commands/set_latest_scan.py

Summary

Maintainability
C
1 day
Test Coverage
import logging
from datetime import datetime

import pytz
from django.core.management.base import BaseCommand

from websecmap.scanners import ENDPOINT_SCAN_TYPES, URL_SCAN_TYPES
from websecmap.scanners.models import EndpointGenericScan, UrlGenericScan

log = logging.getLogger(__name__)


class Command(BaseCommand):
    """Determines which scans are the latest and sets the latest scan flag. Normally this would happen automatically
    using the scan manager. But the flags are empty in older systems.

    The queries used allow sliding through time, for example setting the latest on a certain date. Which does not
    make sense at all, but works."""

    def handle(self, *args, **options):
        for scan_type in URL_SCAN_TYPES:
            reflag_urlgenericscan(type=scan_type)

        for scan_type in ENDPOINT_SCAN_TYPES:
            reflag_endpointgenericscan(type=scan_type)


def reflag_urlgenericscan(type):
    log.debug("Setting flags on UrlGenericScan type: %s" % type)
    UrlGenericScan.objects.all().filter(type=type).update(is_the_latest_scan=False)

    # get the latest scans
    sql = """
        SELECT
            id,
            last_scan_moment,
            is_the_latest_scan
        FROM scanners_urlgenericscan
        INNER JOIN
            (SELECT MAX(id) as id2 FROM scanners_urlgenericscan egs2
             WHERE last_scan_moment <= '%(when)s' and type = '%(type)s' GROUP BY url_id
             ) as x
        ON x.id2 = scanners_urlgenericscan.id
    """ % {
        "when": datetime.now(pytz.utc),
        "type": type,
    }

    updatescans(UrlGenericScan.objects.raw(sql))


def reflag_endpointgenericscan(type):
    log.debug("Setting flags on EndpointGenericScan type: %s" % type)
    EndpointGenericScan.objects.all().filter(type=type).update(is_the_latest_scan=False)

    # get the latest endpointgenericscans
    sql = """
        SELECT
            id,
            last_scan_moment,
            is_the_latest_scan
        FROM scanners_endpointgenericscan
        INNER JOIN
            (SELECT MAX(id) as id2 FROM scanners_endpointgenericscan egs2
             WHERE last_scan_moment <= '%(when)s' and type = '%(type)s' GROUP BY endpoint_id
             ) as x
        ON x.id2 = scanners_endpointgenericscan.id
    """ % {
        "when": datetime.now(pytz.utc),
        "type": type,
    }

    updatescans(EndpointGenericScan.objects.raw(sql))


def updatescans(scans):
    log.debug("Updating %s scans" % len(list(scans)))
    for scan in scans:
        scan.is_the_latest_scan = True
        scan.save()