failmap/admin

View on GitHub
websecmap/scanners/scanner/onboard.py

Summary

Maintainability
A
55 mins
Test Coverage
import logging
from datetime import datetime, timedelta
from typing import List

import pytz
from celery import group
from django.utils import timezone

from websecmap.celery import Task, app
from websecmap.map.report import update_report_tasks
from websecmap.organizations.models import Url
from websecmap.scanners.scanner.__init__ import q_configurations_to_scan, url_filters
from websecmap.scanners.scanner.utils import in_chunks
from websecmap.scanners.tasks import crawl_tasks, explore_tasks, scan_tasks

log = logging.getLogger(__package__)


def compose_task(
    organizations_filter: dict = dict(), urls_filter: dict = dict(), endpoints_filter: dict = dict(), **kwargs
) -> Task:
    """Multi-stage onboarding."""

    """
    This onboarder has multiple stages. The main reason for this is that the original plan failed: endpoints where
    discovered but the next task hanged. This is extensively documented here:
    https://github.com/celery/celery/issues/4681

    Therefore this onboarding task creates different sets of tasks per stage per url.

    Stage:
    V ""
    V endpoint_discovery    endpoints are discovered on the url
    V endpoint_finished     done, ready for next stage
    V scans_running         running a series of scans on the endpoints
    V scans_finished        done, ready for next stage
    V crawling              trying to find more endpoints (via DNS)
    V crawl_finished        IMPLICIT! Last step will not be saved.
    - onboarded             onboarding completed

    Todo: date the last step was set. So we can find processes that failed and retry.
    Todo: run this every minute.
    """

    # Resetting the outdated onboarding has a risk: if the queue takes longer than the onboarding tasks to finish the
    # tasks will be performed multiple time. This can grow fast and large. Therefore a very large time has been taken
    # to reset onboarding of tasks. Normally onboarding should be one within 5 minutes. We'll reset after 7 days.
    reset_expired_onboards()

    default_filter = {"onboarded": "False"}
    urls_filter = {**urls_filter, **default_filter}

    urls = Url.objects.all().filter(q_configurations_to_scan(level="url"), **urls_filter)
    urls = url_filters(urls, organizations_filter, urls_filter, endpoints_filter)

    log.info("Found %s urls to create tasks for." % len(urls))

    # it's impossible to set the first stage "endpoint_discovery" in a task, as the task might take an hour
    # to complete (depending of how much work is in the queue. Could be days. So therefore to trigger the first
    # stage and not to re-submit the url for onboarding (again and again and again) the first stage is set in
    # this routine for each of the steps.

    # some urls can be returned a number of times if they are shared amongs organizations, this reduces that:
    # note that mysql and sqlite dont support DISTINCT ON.
    urls = list(set(urls))

    endpoint_discovery_urls = []
    scans_running_urls: List[int] = []
    crawling_urls = []

    tasks = []
    for url in urls:
        log.info("Url %s is at onboarding_stage: %s", url, url.onboarding_stage)

        # you will see this happen per worker-size (so for example per 20 things)
        if not url.onboarding_stage:  # While developing: or url.onboarding_stage == "endpoint_discovery":
            log.info("Exploring on: %s", url)
            update_stage([url], "endpoint_discovery")
            endpoint_discovery_urls.append(url)

        elif url.onboarding_stage in ["endpoint_finished"]:  # dev: , "scans_running"
            log.info("Scanning on: %s", url)
            update_stage([url], "scans_running")
            scans_running_urls.append(url.pk)

        elif url.onboarding_stage == "scans_finished":
            log.info("Crawling on: %s", url)
            update_stage([url], "crawling")
            crawling_urls.append(url)

        else:
            # Do nothing when wheels are set in motion or an unknown state is encountered.
            pass

    for url in endpoint_discovery_urls:
        tasks.append(explore_tasks(url) | update_stage.si([url], "endpoint_finished"))

    chunks = in_chunks(scans_running_urls, 25)
    for chunk in chunks:
        tasks.append(scan_tasks(chunk) | update_report_tasks(chunk) | update_stage.si(chunk, "scans_finished"))

    for url in crawling_urls:
        tasks.append(crawl_tasks(url) | finish_onboarding.si(url))

    log.info("Created %s tasks to be performed." % len(tasks))
    task = group(tasks)

    # log.info("Task:")
    # log.info(task)

    return task


def reset_expired_onboards():
    # If the queues don't finish in 7 days, you have a problem somewhere. This will add to that problem by adding
    # EVEN MORE tasks to the queue. So an unmanaged system will run out of space somewhere sometime :)
    expired = list(Url.objects.all().filter(onboarding_stage_set_on__lte=datetime.now(pytz.utc) - timedelta(days=7)))

    # some older tasks might never have an onboarding stage due to a bug. add those too:
    expired += list(Url.objects.all().filter(onboarding_stage_set_on__isnull=True))

    for url in expired:
        reset_onboarding_status(url)


def reset_onboarding_status(url):
    # set the task a step back.
    # retry endpoint discovery if that didn't finish.
    if url.onboarding_stage == "endpoint_discovery":
        url.onboarding_stage = ""

    # retry scanning after discovery of endpoints
    if url.onboarding_stage == "scans_running":
        url.onboarding_stage = "endpoint_finished"

    # retry crawling after scans are finished
    if url.onboarding_stage == "crawling":
        url.onboarding_stage = "scans_finished"

    url.save()


def forward_onboarding_status(url):
    # set the task a step back.
    # retry endpoint discovery if that didn't finish.
    if url.onboarding_stage == "endpoint_discovery":
        url.onboarding_stage = "endpoint_finished"

    # retry scanning after discovery of endpoints
    if url.onboarding_stage == "scans_running":
        url.onboarding_stage = "scans_finished"

    # retry crawling after scans are finished
    if url.onboarding_stage == "crawling":
        url.onboarded = True
        url.onboarded_on = timezone.now()
        url.onboarding_stage = "onboarded"

    url.save()


@app.task(queue="storage")
def finish_onboarding(url):
    log.info("Finishing onboarding of %s", url)
    url.onboarded = True
    url.onboarded_on = timezone.now()
    url.onboarding_stage = "onboarded"
    url.onboarding_stage_set_on = datetime.now(pytz.utc)
    url.save(update_fields=["onboarded_on", "onboarded", "onboarding_stage", "onboarding_stage_set_on"])
    return True


@app.task(queue="storage")
def update_stage(urls: List[Url], stage=""):

    for url in urls:
        log.info("Updating onboarding_stage of %s from %s to %s", url, url.onboarding_stage, stage)
        url.onboarding_stage = stage
        url.onboarding_stage_set_on = datetime.now(pytz.utc)
        url.save(update_fields=["onboarding_stage", "onboarding_stage_set_on"])

    return True