failmap/admin

View on GitHub
websecmap/celery/worker.py

Summary

Maintainability
D
1 day
Test Coverage
"""
A worker gets a role from its configuration. This role determines which queues are processed by the worker.
The role also specifies what network connectivity is required (ipv4, ipv6 or both).

A normal websecmap server should have both IPv4 and IPv6 access.

One tricky issue:
The ROLE scanner requires both access to IPv4 and IPv6 networks.
The QUEUE scanner contains tasks that do not care about network family and can be either 4 or 6.
"""

import logging
import os
import socket

from constance import config
from kombu import Queue
from retry import retry

log = logging.getLogger(__name__)

# Roles for which a working connection over at least one address family (IPv4 or IPv6) is enough.
# The "internet" queue accepts 4and6, 4 or 6 - so ANY network will do.
# NOTE(review): "internet" is a queue name but not a key of QUEUES_MATCHING_ROLES; the role with
# the matching description there is "any_internet" - confirm which name is intended.
ROLES_REQUIRING_ANY_NETWORK = ["internet"]

# Roles that must be able to reach the internet over IPv4 as well as IPv6.
ROLES_REQUIRING_IPV4_AND_IPV6 = [
    "default",
    "all_internet",
]

# Roles that must be able to reach the internet over IPv6.
ROLES_REQUIRING_IPV6 = ["v6_internet", "default_ipv6"]

# Roles that must be able to reach the internet over IPv4.
ROLES_REQUIRING_IPV4 = [
    "v4_internet",
    "default_ipv4",
    # only supports ipv4(!)
    "qualys",
]

# Roles that are accepted without any connectivity check.
ROLES_REQUIRING_NO_NETWORK = ["storage", "calculator", "claim_proxy", "reporting"]

# NOTE(review): this list is not consulted by worker_verify_role_capabilities in this file.
ROLES_REQUIRING_GUI_AND_NETWORK = [
    "desktop",
]

# Maps every worker role name to the list of queues a worker with that role consumes.
# worker_configuration() looks the role up here and returns the list as "task_queues".
QUEUES_MATCHING_ROLES = {
    # Select between roles.
    # universal worker that has access to database and internet on both v4 and v6
    # will work in one-worker configuration - and slowly -  it's untested and it likely will not be a great experience
    "default": [
        # doesn't care about network family, any network is fine
        Queue("internet"),
        # only ipv4 tasks
        Queue("ipv4"),
        # only ipv6 tasks
        Queue("ipv6"),
        # needs both network families to be present
        Queue("4and6"),
        # for tasks that require a database connection
        Queue("storage"),
        # tasks that require no network, no database. Such as calculations, parsing of datasets etc.
        Queue("isolated"),
        # run qualys scans
        Queue("qualys"),
        Queue("claim_proxy"),
        Queue("reporting"),
        Queue("discover_subdomains"),
        Queue("known_subdomains"),
        Queue("screenshot"),
    ],
    # queuemonitor shows the currently running queues on the dashboard. It will not do anything else.
    # This does not have a worker (and doesn't need one).
    # NOTE(review): the list below does not contain every queue used elsewhere in this mapping ("reporting",
    # "discover_subdomains", "known_subdomains", "screenshot" and "desktop" are absent) - confirm whether
    # the monitor is meant to subscribe to those as well.
    "queuemonitor": [
        Queue("storage"),
        Queue("4and6"),
        Queue("ipv4"),
        Queue("ipv6"),
        Queue("internet"),
        Queue("qualys"),
        Queue("isolated"),
        Queue("claim_proxy"),
    ],
    # database access plus IPv4-only internet access
    "default_ipv4": [
        Queue("internet"),
        Queue("ipv4"),
        Queue("storage"),
        Queue("isolated"),
    ],
    # database access plus IPv6-only internet access
    "default_ipv6": [
        Queue("internet"),
        Queue("ipv6"),
        Queue("storage"),
        Queue("isolated"),
    ],
    # IPv4-only scanner: no storage queue
    "v4_internet": [
        Queue("internet"),
        Queue("ipv4"),
        Queue("isolated"),
    ],
    # IPv6-only scanner: no storage queue
    "v6_internet": [
        Queue("internet"),
        Queue("ipv6"),
        Queue("isolated"),
    ],
    "storage": [
        Queue("storage"),
        # Queue('isolated'),  # Do NOT perform isolated (slow) tasks, which might block the worker.
        # Given there is only one storage worker, blocking it doesn't help its work.
    ],
    "reporting": [
        Queue("reporting"),
    ],
    "claim_proxy": [
        Queue("claim_proxy"),
        # Queue('isolated'),  # Do NOT perform isolated (slow) tasks, which might block the worker.
        # NOTE(review): presumably there is also only a single claim_proxy worker, so blocking it would
        # stall proxy claiming - confirm; the original remark here was copied from the storage role.
    ],
    # tasks that need neither network nor database
    "calculator": [Queue("isolated")],
    "desktop": [Queue("desktop")],
    # universal scanner worker that has internet access over IPv4, IPv6 or both (you don't know which)
    "any_internet": [
        Queue("internet"),  # tasks that requires ANY network
        Queue("isolated"),  # no network, no database
    ],
    # all internet access, with ipv4 and 6 configured
    "all_internet": [
        Queue("internet"),
        Queue("ipv4"),
        Queue("ipv6"),
        Queue("4and6"),
        Queue("isolated"),
    ],
    # special scanner worker for qualys rate limited tasks to not block queue for other tasks
    # and it needs a dedicated IP address, which is coded in hyper workers.
    "qualys": [
        Queue("qualys"),
    ],
    "discover_subdomains": [
        Queue("discover_subdomains"),
    ],
    "known_subdomains": [
        Queue("known_subdomains"),
    ],
    "screenshot": [
        Queue("screenshot"),
    ],
}


def worker_configuration():
    """Return the worker configuration that matches this worker's role.

    The role is read from the ``WORKER_ROLE`` environment variable and falls back to ``"default"``
    when the variable is not set.

    Returns:
        dict: ``{"task_queues": [...]}`` holding the queues registered for the role in
        ``QUEUES_MATCHING_ROLES``.

    Raises:
        KeyError: the role is not a key of ``QUEUES_MATCHING_ROLES``. The offending role and the
            valid role names are logged before the error is re-raised.
    """

    role = os.environ.get("WORKER_ROLE", "default")

    log.info("Configuring worker for role: %s", role)

    # configure which queues should be consumed depending on assigned role for this worker
    try:
        queues = QUEUES_MATCHING_ROLES[role]
    except KeyError:
        # A bare KeyError('typo') is hard to relate to the environment variable, so say what went wrong
        # and what would have been accepted, then let the original exception continue.
        log.error(
            "Unknown WORKER_ROLE %r, valid roles are: %s",
            role,
            ", ".join(sorted(QUEUES_MATCHING_ROLES)),
        )
        raise

    return {"task_queues": queues}


def _connection_error(family, host, port=443):
    """Attempt a single TCP connection and report why it failed, if it did.

    Args:
        family: socket address family to connect with (``socket.AF_INET`` or ``socket.AF_INET6``).
        host: hostname to connect to.
        port: TCP port to connect to, 443 by default.

    Returns:
        ``None`` when the connection was established, otherwise the exception raised by ``connect``.

    Raises:
        socket.gaierror: name resolution failed. This is propagated rather than returned so that the
            ``@retry`` decorator on the caller gets the chance to try again.
    """
    # The socket is created outside the ``try`` so a failure to create it still propagates to the caller.
    # The ``with`` block closes the descriptor on every path, so repeated attempts do not leak sockets.
    with socket.socket(family, socket.SOCK_STREAM, 0) as sock:
        try:
            sock.connect((host, port))
        except socket.gaierror:
            # docker container DNS might not be ready, retry
            raise
        except Exception as error:
            # Deliberately not BaseException: KeyboardInterrupt / SystemExit must not be reported
            # as "no connectivity".
            return error
    return None


@retry(tries=3, delay=5)
def worker_verify_role_capabilities(role):
    """Determine if chosen role can be performed on this host (eg: ipv6 connectivity.)

    Connectivity is probed by opening a TCP connection on port 443 to the configured test domains:
    ``config.CONNECTIVITY_TEST_DOMAIN`` for IPv4 and ``config.IPV6_TEST_DOMAIN`` for IPv6.

    Args:
        role: name of the worker role to verify.

    Returns:
        bool: ``True`` when every connectivity requirement of the role is met, and also for roles that
        appear in none of the ``ROLES_REQUIRING_*`` lists checked here. ``False`` when a required
        connection could not be made.

    Raises:
        socket.gaierror: name resolution still fails after the retries configured on the decorator.
    """

    if role in ROLES_REQUIRING_NO_NETWORK:
        return True

    needs_both_families = role in ROLES_REQUIRING_IPV4_AND_IPV6
    failed = False

    if role in ROLES_REQUIRING_IPV6 or needs_both_families:
        # allow IPv6 support to be faked during development
        if not os.environ.get("NETWORK_SUPPORTS_IPV6", False):
            # verify if a https connection to a IPv6 website can be made
            error = _connection_error(socket.AF_INET6, config.IPV6_TEST_DOMAIN)
            if error is not None:
                log.warning(
                    "Failed to connect to ipv6 test domain %s via IPv6", config.IPV6_TEST_DOMAIN, exc_info=error
                )
                failed = True

    if role in ROLES_REQUIRING_IPV4 or needs_both_families:
        # verify if a https connection to a website can be made
        # we assume non-ipv4 internet doesn't exist
        error = _connection_error(socket.AF_INET, config.CONNECTIVITY_TEST_DOMAIN)
        if error is not None:
            log.warning(
                "Failed to connect to test domain %s via IPv4", config.CONNECTIVITY_TEST_DOMAIN, exc_info=error
            )
            failed = True

    if role in ROLES_REQUIRING_ANY_NETWORK:
        # One family may fail: try IPv4 first and only fall back to IPv6 when that did not work.
        # The IPv4 attempt uses the general connectivity test domain, matching the IPv4 check above.
        if _connection_error(socket.AF_INET, config.CONNECTIVITY_TEST_DOMAIN) is not None:
            error = _connection_error(socket.AF_INET6, config.IPV6_TEST_DOMAIN)
            if error is not None:
                log.warning(
                    "Failed to connect to test domains %s (IPv4) and %s (IPv6)",
                    config.CONNECTIVITY_TEST_DOMAIN,
                    config.IPV6_TEST_DOMAIN,
                    exc_info=error,
                )
                failed = True

    return not failed