failmap/admin

View on GitHub
websecmap/scanners/scanner/dnssec.py

Summary

Maintainability
B
4 hrs
Test Coverage
"""Scans DNSSEC using the dotSE DNSCHECK tool.

This is also a reference implementation of a standardized scanner. This scanner works on Url level, not on endpoint
level.

It's a nightmare to get the tool running on your system so use the one in docker:
docker-build
docker-failmap-with-db scan dnssec

If you must run the DNSSEC scanner yourself, we wish you good luck. To get you started:
brew install perl

CPAN install TAP::Harness::Env
CPAN install File::ShareDir::Install

and then follow the installation instructions here:
https://github.com/dotse/dnscheck/tree/master/engine

We strongly recomend using the docker approach.
"""

import logging
import subprocess
from typing import List

from celery import Task, group
from django.conf import settings
from django.db.models import Q

from websecmap.celery import ParentFailed, app
from websecmap.organizations.models import Organization, Url
from websecmap.scanners import plannedscan
from websecmap.scanners.models import Endpoint
from websecmap.scanners.scanmanager import store_url_scan_result
from websecmap.scanners.scanner.__init__ import allowed_to_scan, q_configurations_to_scan, unique_and_random

log = logging.getLogger(__name__)

# how often a task should be retried when encountering an expectable exception
MAX_RETRIES = 3
RETRY_DELAY = 10

# after which time (seconds) a pending task should no longer be accepted by a worker
# can also be a datetime.
EXPIRES = 3600  # one hour is more then enough


def filter_scan(
    organizations_filter: dict = dict(), urls_filter: dict = dict(), endpoints_filter: dict = dict(), **kwargs
):

    # DNSSEC only works on top level urls
    urls_filter = dict(urls_filter, **{"computed_subdomain": ""})

    # gather urls from organizations
    if organizations_filter:
        organizations = Organization.objects.filter(**organizations_filter).only("id")
        urls = Url.objects.filter(
            q_configurations_to_scan(),
            Q(computed_subdomain__isnull=True) | Q(computed_subdomain=""),
            organization__in=organizations,
            **urls_filter,
        ).only("id", "url")
    elif endpoints_filter:
        # and now retrieve urls from endpoints
        endpoints = Endpoint.objects.filter(**endpoints_filter).only("id")
        urls = Url.objects.filter(
            q_configurations_to_scan(),
            Q(computed_subdomain__isnull=True) | Q(computed_subdomain=""),
            endpoint__in=endpoints,
            **urls_filter,
        ).only("id", "url")
    else:
        # now urls directly
        urls = Url.objects.filter(
            q_configurations_to_scan(), Q(computed_subdomain__isnull=True) | Q(computed_subdomain=""), **urls_filter
        ).only("id", "url")

    # Optimize: only required values, unique and randomized
    urls = unique_and_random(urls)
    return urls


@app.task(queue="storage")
def plan_scan(
    organizations_filter: dict = dict(), urls_filter: dict = dict(), endpoints_filter: dict = dict(), **kwargs
):
    urls = filter_scan(organizations_filter, urls_filter, endpoints_filter, **kwargs)
    log.info(f"Creating DNSSEC scan task for {len(urls)} urls.")
    plannedscan.request(activity="scan", scanner="dnssec", urls=urls)


@app.task(queue="storage")
def compose_planned_scan_task(**kwargs):
    urls = plannedscan.pickup(activity="scan", scanner="dnssec", amount=kwargs.get("amount", 25))
    return compose_scan_task(urls)


def compose_manual_scan_task(
    organizations_filter: dict = dict(), urls_filter: dict = dict(), endpoints_filter: dict = dict(), **kwargs
):

    if not allowed_to_scan("dnssec"):
        return group()

    urls = filter_scan(organizations_filter, urls_filter, endpoints_filter, **kwargs)
    return compose_scan_task(urls)


def compose_scan_task(urls) -> Task:
    """Compose taskset to scan toplevel domains.

    DNSSEC is implemented on a (top level) url. It's useless to scan per-endpoint.
    This is the first scanner that uses the UrlGenericScan table, which looks nearly the same as the
    endpoint variant.
    """
    # The number of top level urls is negligible, so randomization is not needed.

    # create tasks for scanning all selected endpoints as a single managable group
    # Sending entire objects is possible. How signatures (.s and .si) work is documented:
    # http://docs.celeryproject.org/en/latest/reference/celery.html#celery.signature
    task = group(
        scan_dnssec.si(url.url) | store_dnssec.s(url.pk) | plannedscan.finish.si("scan", "dnssec", url.pk)
        for url in urls
    )

    return task


@app.task(queue="storage")
def store_dnssec(result: List[str], url_id: int):
    """
    :param result:
    """

    # if scan task failed, ignore the result (exception) and report failed status
    if isinstance(result, Exception):
        return ParentFailed("skipping result parsing because scan failed.", cause=result)

    # relevant helps to store the minimum amount of information.
    level, relevant = analyze_result(result)

    # Messages are translated for display. Add the exact messages in: /failmap/map/static/js/script.js
    # Run "failmap translate" to have the messages added to:
    # /failmap/map/locale/*/djangojs.po
    # /failmap/map/locale/*/django.po
    # translate them and then run "failmap translate" again.
    messages = {
        "ERROR": "DNSSEC is incorrectly or not configured (errors found).",
        "WARNING": "DNSSEC is incorrectly configured (warnings found).",
        "INFO": "DNSSEC seems to be implemented sufficiently.",
    }

    log.debug("Storing result: %s, for url: %s.", result, url_id)
    # You can save any (string) value and any (string) message.
    # The EndpointScanManager deduplicates the data for you automatically.
    if result:
        store_url_scan_result("DNSSEC", url_id, level, messages[level], evidence=",\n".join(result))

    # return something informative
    return {"status": "success", "result": level}


# amsterdam.nl hangs on october 12 2018
# in some cases this hangs, therefore have a time limit on the task.
# "The worker processing the task will be killed and replaced with a new one when this is exceeded."
@app.task(
    queue="internet",
    bind=True,
    default_retry_delay=RETRY_DELAY,
    retry_kwargs={"max_retries": MAX_RETRIES},
    expires=EXPIRES,
    task_time_limit=120,
)
def scan_dnssec(self, url: str):
    """
    Uses the dnssec scanner of dotse, which works pretty well.

    :param url:

    Possible problems as seen on: https://github.com/stjernstedt/Interlan/blob/master/script/functions
    Timeout of 240 seconds. Nothing more (oh wow).

    """
    try:
        log.info("Start scanning %s", url)

        output = subprocess.check_output([settings.TOOLS["dnscheck"]["executable"], url]).decode("UTF-8")
        content = output.splitlines()

        log.info("Done scanning: %s, result: %s", url, content)
        return content

    # subprocess.CalledProcessError: non zero exit status
    # OSError: Incorrect permission, file doesn't exist, etc
    except (subprocess.CalledProcessError, OSError) as e:
        # If an expected error is encountered put this task back on the queue to be retried.
        # This will keep the chained logic in place (saving result after successful scan).
        # Retry delay and total number of attempts is configured in the task decorator.
        try:
            # Since this action raises an exception itself, any code after this won't be executed.
            raise self.retry(exc=e)
        except BaseException:
            # If this task still fails after maximum retries the last
            # error will be passed as result to the next task.
            log.exception("Retried %s times and it still failed", MAX_RETRIES)
            return e


def analyze_result(result: List[str]):
    """
    All possible outcomes:
    https://github.com/dotse/dnscheck/blob/5b0fce771259d9dfc03c6c69abba44f2be142c30/engine/t/config/policy.yaml

    dnssec.pl runs with the following settings
    my $check = new DNSCheck({ interactive => 1, extras => { debug => 0 }, localefile => 'locale/en.yaml' });

    this results in output like this:
    3.347: INFO DNSSEC signature RRSIG(faalkaart.nl/IN/SOA/40979) matches records.
    3.347: INFO DNSSEC signature valid: RRSIG(faalkaart.nl/IN/SOA/40979)
    3.347: INFO Enough valid signatures over SOA RRset found for faalkaart.nl.

    optional todo: for debugging also have the url echoed in the output.

    :return:
    """

    errors = []
    warnings = []
    infos = []

    for line in result:
        # remove the cringy timestamp
        line = line.strip()
        line = line[line.find(" ") : len(line)].strip()

        # log.debug(line)

        if line.startswith("INFO"):
            infos.append(line)
        if line.startswith("NOTICE"):
            infos.append(line)
        if line.startswith("WARNING"):
            # The MISSING_DS is never a problem it seems.
            """
            This warning means that there INDEED is an OK DNSSEC implementation as long as you check the parent.

            NL:
            descr: "De child gebruikt zo te zien DNSSEC, maar de parent heeft geen veilige delegation op basis
            van DNSSEC.  Hierdoor is de 'chain of trust' tussen de parent en de child verbroken en 'validating
            resolvers', die op DNSSEC-juistheid controleren, zullen niet in staat zijn om de antwoorden van de
            child te valideren."
            format: 'De Chain of trust voor %s is niet in orde - Er is een DNSKEY aangetroffen bij de child,
            maar DS record bij de parent.'

            EN:
            descr: 'The child seems to use DNSSEC, but the parent has no secure delegation.  The chain of trust
            between the parent and the child is broken and validating resolvers will not be able to validate
            answers from the child.'
            format: 'Broken chain of trust for %s - DNSKEY found at child, but no DS was found at parent.'

            Search for MISSING_DS here: https://github.com/dotse/dnscheck
            """

            if line.startswith("WARNING [DNSSEC:MISSING_DS]"):
                infos.append("WARNING [DNSSEC:MISSING_DS]")
            else:
                warnings.append(line)

        if line.startswith("ERROR"):
            errors.append(line)

        # a beautiful feature of DNSCHECK is that if there is no DNSSEC, an INFO message is given.
        # We'll upgrade the severity here:
        """
        35.268: INFO Begin testing DNSSEC for gratiz.nl.
        35.301: INFO Did not find DS record for gratiz.nl at parent.
        35.374: INFO Servers for gratiz.nl have consistent extra processing status.
        35.402: INFO Authenticated denial records not found for gratiz.nl.
        35.420: INFO Did not find DNSKEY record for gratiz.nl at child.
        35.422: INFO No DNSKEY(s) found at child, other tests skipped.
        35.422: INFO Done testing DNSSEC for gratiz.nl.
        """

        # first line if language files are not installed
        if line.startswith("%s" % "INFO Did not find DNSKEY"):
            log.info(line)
            errors.append(line)
        if line.startswith("%s" % "INFO [DNSSEC:DNSKEY_NOT_FOUND]"):
            log.info(line)
            errors.append(line)

        # Why are the following upgraded? There is no explanation for this.
        # translations for the english language files
        # When NO DS is found, a warning will already be present in the output.
        # WARNING [DNSSEC:MISSING_DS]
        # SIDN ‐ If the parent has a DS record, the child must support DNSSEC (DNSSEC:NO_DS_FOUND).
        # https://gtldresult.icann.org/applicationstatus/applicationdetails:downloadattachment/12382?t:ac=915
        # All municipalities in NL that currently have imperfect DNS have the NO_DS_FOUND error.
        # The warning will be suppressed, as the parent can be checked for a correct DS.
        # you can also see this behavior in DNSVIZ, everything has a DS, except the child. And that is fine.

        # It's not clear if this really is a problematic warning.
        # if line.startswith("%s" % "INFO Did not find DS record"):
        #     log.info(line)
        #     errors.append(line)
        # if line.startswith("%s" % "INFO [DNSSEC:NO_DS_FOUND]"):
        #     log.info(line)
        #     errors.append(line)

        if line.startswith("%s" % "INFO Authenticated denial records not found"):
            log.info(line)
            errors.append(line)

        if line.startswith("%s" % "INFO No DNSKEY(s) found at child"):
            log.info(line)
            errors.append(line)

        # NSEC_NOT_FOUND can still mean NSEC3PARAM_FOUND and NSEC3_ITERATIONS_OK
        # so we don't need to check on this NSEC parameter
        # if line.startswith("%s" % "INFO [DNSSEC:NSEC_NOT_FOUND]"):
        #     log.info(line)
        #     errors.append(line)

        # if line.startswith("%s" % "INFO [DNSSEC:SKIPPED_NO_KEYS]"):
        #     log.info(line)
        #     errors.append(line)

    highest_level = "ERROR" if errors else "WARNING" if warnings else "INFO" if infos else "NONE"

    if highest_level == "NONE":
        raise ValueError("Did not correctly parse DNSSCAN result string. %s " % result)

    relevant_strings = []

    # upgrade relevant to the highest level by overwriting previous levels.
    if infos:
        relevant_strings = infos
    if warnings:
        relevant_strings = warnings
    if errors:
        relevant_strings = errors

    # log.debug("Relevant:")
    # log.debug(relevant_strings)

    return highest_level, relevant_strings