failmap/admin

View on GitHub
websecmap/scanners/tests/test_plannedscan.py

Summary

Maintainability
A
1 hr
Test Coverage
import logging
from datetime import timedelta

from django.utils import timezone
from freezegun import freeze_time

from websecmap.organizations.models import Organization, Url
from websecmap.scanners.models import (
    Endpoint,
    EndpointGenericScan,
    PlannedScan,
    PlannedScanStatistic,
    Scanner,
    State,
    Activity,
)
from websecmap.scanners.plannedscan import (
    calculate_progress,
    finish_multiple,
    get_latest_progress,
    pickup,
    request,
    reset,
    store_progress,
    retrieve_endpoints_from_urls,
)
from websecmap.scanners.scanner.tls_qualys import plan_scan

log = logging.getLogger("websecmap")


def create_organization(name="Test"):
    o = Organization()
    o.name = name
    o.save()
    return o


def create_url(url: str = "example.com"):
    u = Url()
    u.url = url
    u.not_resolvable = False
    u.is_dead = False
    u.save()
    return u


def link_url_to_organization(url: Url, organization: Organization):
    url.organization.add(organization)
    url.save()
    return url


def create_endpoint(url, ip_version, protocol, port):
    e = Endpoint()
    e.url = url
    e.ip_version = ip_version
    e.protocol = protocol
    e.port = port
    e.is_dead = False
    e.save()
    return e


def create_endpoint_scan(endpoint, type, rating, at_when):
    egs = EndpointGenericScan()
    egs.endpoint = endpoint
    egs.is_the_latest_scan = True
    egs.rating = rating
    egs.last_scan_moment = at_when
    egs.rating_determined_on = at_when
    egs.type = type
    egs.save()

    # overwrite the auto_now_add behavior (todo: remove this implicit behavior)
    egs.last_scan_moment = at_when
    egs.save()


def test_plannedscan(db):
    o = create_organization("Test")
    u1 = create_url("example.com")
    link_url_to_organization(u1, o)
    u2 = create_url("example2.com")
    link_url_to_organization(u2, o)

    request(scanner="tls_qualys", activity="scan", urls=[u1, u2])
    assert PlannedScan.objects.all().count() == 2
    assert PlannedScan.objects.all().filter(state=State["requested"].value).count() == 2
    assert calculate_progress() == [
        {
            "scanner": Scanner["tls_qualys"].value,
            "activity": Activity["scan"].value,
            "state": State["requested"].value,
            "amount": 2,
        }
    ]

    urls = pickup(scanner="tls_qualys", activity="scan", amount=10)
    assert len(urls) == 2
    assert PlannedScan.objects.all().filter(state=State["picked_up"]).count() == 2
    assert calculate_progress() == [
        {
            "scanner": Scanner["tls_qualys"].value,
            "activity": Activity["scan"].value,
            "state": State["picked_up"].value,
            "amount": 2,
        }
    ]

    # Verify that you cannot pickup more than the usual 500, in this max 498:
    # todo: get this from caplog: Picking up maximum 498 of total 498 free slots.
    pickup(scanner="tls_qualys", activity="scan", amount=1000)

    finish_multiple(scanner="tls_qualys", activity="scan", urls=[url.pk for url in urls])
    assert PlannedScan.objects.all().filter(state=State["finished"]).count() == 2

    # Finished means that the previous steps will be added to the list if they are not present yet.
    assert calculate_progress() == [
        {
            "activity": Activity["scan"].value,
            "amount": 0,
            "scanner": Scanner["tls_qualys"].value,
            "state": State["requested"].value,
        },
        {
            "activity": Activity["scan"].value,
            "amount": 0,
            "scanner": Scanner["tls_qualys"].value,
            "state": State["picked_up"].value,
        },
        {
            "scanner": Scanner["tls_qualys"].value,
            "activity": Activity["scan"].value,
            "state": State["finished"].value,
            "amount": 2,
        },
    ]

    # test storage:
    store_progress()
    assert get_latest_progress() == calculate_progress()
    assert PlannedScanStatistic.objects.count() == 1
    old_progress = get_latest_progress()

    # test that a new call will store new data
    store_progress()
    assert PlannedScanStatistic.objects.count() == 2

    # make sure only the latest is retrieved:
    request(scanner="tls_qualys", activity="scan", urls=[u1, u2])
    store_progress()
    assert get_latest_progress() == calculate_progress()
    assert PlannedScanStatistic.objects.count() == 3
    assert old_progress != get_latest_progress()


def test_plan_scans(db):
    with freeze_time("2020-01-01"):
        o = create_organization("Test")
        u1 = create_url("example.com")
        e1 = create_endpoint(u1, 4, "https", 443)
        link_url_to_organization(u1, o)
        u2 = create_url("example2.com")
        e2 = create_endpoint(u2, 4, "https", 443)
        link_url_to_organization(u2, o)

        # plan scans on endpoints that have never been scanned.
        plan_scan()
        assert calculate_progress() == [
            {
                "scanner": Scanner["tls_qualys"].value,
                "activity": Activity["scan"].value,
                "state": State["requested"].value,
                "amount": 2,
            }
        ]

        # if there are already planned, no new scans will be created
        plan_scan()
        assert calculate_progress() == [
            {
                "scanner": Scanner["tls_qualys"].value,
                "activity": Activity["scan"].value,
                "state": State["requested"].value,
                "amount": 2,
            }
        ]

        # to test what happens on endpoints that already have scans:
        reset()

        # with very recent scans no new scans will be created:
        create_endpoint_scan(e1, "tls_qualys_encryption_quality", "F", timezone.now())
        create_endpoint_scan(e2, "tls_qualys_encryption_quality", "A", timezone.now())
        plan_scan()
        assert calculate_progress() == []

        # this does not delete endpoints...
        EndpointGenericScan.objects.all().delete()

        # test
        # with some older data, the scan for the bad endpoint will be requested
        create_endpoint_scan(e1, "tls_qualys_encryption_quality", "F", timezone.now() - timedelta(days=100))
        create_endpoint_scan(e2, "tls_qualys_encryption_quality", "A", timezone.now())

        # make sure the endpoint is created:
        first_egs = EndpointGenericScan.objects.last()
        assert first_egs.endpoint == e1
        assert EndpointGenericScan.objects.all().count() == 2
        egs = EndpointGenericScan.objects.all().filter(rating="F").first()
        assert egs.rating_determined_on < timezone.now() - timedelta(days=4)
        assert egs.last_scan_moment < timezone.now() - timedelta(days=4)
        assert egs.is_the_latest_scan is True

        plan_scan()
        assert PlannedScan.objects.all().filter(state=State["requested"].value).count() == 1
        assert calculate_progress() == [
            {
                "scanner": Scanner["tls_qualys"].value,
                "activity": Activity["scan"].value,
                "state": State["requested"].value,
                "amount": 1,
            }
        ]

        EndpointGenericScan.objects.all().delete()

        # test
        # with very old scans both scans will be requested
        create_endpoint_scan(e1, "tls_qualys_encryption_quality", "F", timezone.now() - timedelta(days=6))
        create_endpoint_scan(e2, "tls_qualys_encryption_quality", "A", timezone.now() - timedelta(days=100))
        plan_scan()
        assert PlannedScan.objects.all().filter(state=State["requested"].value).count() == 2
        assert calculate_progress() == [
            {
                "scanner": Scanner["tls_qualys"].value,
                "activity": Activity["scan"].value,
                "state": State["requested"].value,
                "amount": 2,
            }
        ]


def test_retrieve_endpoints_from_urls(db):
    u1 = Url.objects.create(url="example.com")
    u2 = Url.objects.create(url="example.nl")
    e1 = Endpoint.objects.create(url=u1, protocol="http", port="443")

    endpoints, urls_without_endpoints = retrieve_endpoints_from_urls([u1.id, u2.id], protocols=["http", "https"])

    assert endpoints == [e1]
    assert urls_without_endpoints == [u2.id]