secureCodeBox/secureCodeBox

scanners/zap-advanced/scanner/zapclient/spider/zap_spider_http.py

#!/usr/bin/env python

# SPDX-FileCopyrightText: the secureCodeBox authors
#
# SPDX-License-Identifier: Apache-2.0

# -*- coding: utf-8 -*-

import time
import collections
import logging

from zapv2 import ZAPv2, spider

from ..configuration import ZapConfiguration
from . import ZapConfigureSpider

# set up logging for this module
from ..configuration.helpers import ZapConfigurationContextUsers

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)-12s %(levelname)-8s: %(message)s",
    datefmt="%Y-%m-%d %H:%M",
)

logging = logging.getLogger("ZapConfigureSpiderHttp")


class ZapConfigureSpiderHttp(ZapConfigureSpider):
    """This class configures a ZAP HTTP Spider in a running ZAP instance, based on a ZAP Configuration.

    Based on this opensource ZAP Python example:
    - https://github.com/zaproxy/zap-api-python/blob/9bab9bf1862df389a32aab15ea4a910551ba5bfc/src/examples/zap_example_api_script.py
    """

    def __init__(self, zap: ZAPv2, config: ZapConfiguration):
        """Initial constructor used for this class

        Parameters
        ----------
        zap : ZAPv2
            The running ZAP instance to configure.
        config : ZapConfiguration
            The configuration object containing all ZAP configs (based on the class ZapConfiguration).
        """
        self.__spider_id = -1

        super().__init__(zap, config)

    @property
    def get_zap_spider(self) -> spider:
        """Returns the spider of the currently running ZAP instance."""
        return self.get_zap.spider

    @property
    def get_spider_id(self) -> int:
        """Returns the spider id of the currently running ZAP instance."""
        return self.__spider_id

    def has_spider_id(self) -> bool:
        """Returns a spider is currently running in the ZAP instance."""
        return self.__spider_id > 0

    def start_spider(self, url: str, spider_config: collections.OrderedDict):
        """Starts a ZAP Spider with the given spiders configuration, based on the internal referenced ZAP instance.

        Parameters
        ----------
        url : str
            The target URL to start spidering at.
        spider_config : collections.OrderedDict
            The spider configuration based on ZapConfiguration.
        """
        user_id = None
        context_id = None
        context_name = None
        target = ""

        # Clear all existing/previous spider data
        logging.debug("Removing all pre existing spider scans.")
        self.get_zap.spider.remove_all_scans()

        # Open the first URL before the spider starts to crawl
        self.get_zap.core.access_url(url)

        if spider_config is not None:

            if "url" in spider_config:
                target = str(spider_config["url"])
            else:
                logging.warning(
                    "The spider configuration section has no specific 'url' target defined, trying to use scanType target instead with url: '%s'",
                    url,
                )
                target = url

            # Configure Spider Options if there are any
            self.configure_spider(spider_config)

            # "Context" is an optional config for spider
            if self._is_not_empty("context", spider_config):

                context_name = str(spider_config["context"])
                spider_context_config = self.get_config.get_active_context_config
                context_id = int(spider_context_config["id"])

                # "User" is an optional config for spider in addition to the context
                if self._is_not_empty("user", spider_config):
                    user_name = str(spider_config["user"])
                    # search for the configured user by its user name in the active context
                    user_id = ZapConfigurationContextUsers.get_context_user_by_name(
                        spider_context_config, user_name
                    )["id"]
            else:
                logging.warning(
                    "No context 'context: XYZ' referenced within the spider config. This is ok but maybe not intended."
                )

            logging.warning("context_id is currently: %s", context_id)
            logging.warning("user_id is currently: %s", user_id)
            if (
                (context_id is not None)
                and int(context_id) >= 0
                and (user_id is not None)
                and int(user_id) >= 0
            ):
                logging.info(
                    "Starting 'traditional' Spider(target=%s) with Context(%s) and User(%s)",
                    target,
                    context_id,
                    user_id,
                )
                result = self.get_zap_spider.scan_as_user(
                    url=target, contextid=context_id, userid=user_id
                )
            else:
                logging.info(
                    "Starting 'traditional' Spider(target=%s) with Context(%s)",
                    target,
                    context_name,
                )
                result = self.get_zap_spider.scan(url=target, contextname=context_name)
        else:
            logging.info(
                "Starting 'traditional' Spider(target=%s) without any additinal configuration!",
                url,
            )
            result = self.get_zap_spider.scan(url=url, contextname=None)

        # Check if spider is running successfully
        if (not str(result).isdigit()) or int(result) < 0:
            logging.error("Spider couldn't be started due to errors: %s", result)
            raise RuntimeError(
                "Spider couldn't be started due to errors: %s" % result
            )
        else:
            logging.info("HTTP Spider successfully started with id: %s", result)
            self.__spider_id = int(result)
            # Give the scanner a chance to start
            time.sleep(5)

            self.wait_until_spider_finished()

    def configure_spider(self, spider_config: collections.OrderedDict):
        """Configures a ZAP HTTP Spider with the given spider configuration, based on the running ZAP instance.

        Parameters
        ----------
        spider_config: collections.OrderedDict
            The spider configuration based on ZapConfiguration.
        """

        logging.debug("Trying to configure the Spider")
        self.configure_scripts(config=spider_config)

        # Configure HTTP Spider options

        if self._is_not_empty_integer("maxDuration", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_max_duration(
                    integer=str(spider_config["maxDuration"])
                ),
                method_name="set_option_max_duration",
            )
        if self._is_not_empty_integer("maxDepth", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_max_depth(
                    integer=str(spider_config["maxDepth"])
                ),
                method_name="set_option_max_depth",
            )
        if self._is_not_empty_integer("maxChildren", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_max_children(
                    integer=str(spider_config["maxChildren"])
                ),
                method_name="set_option_max_children",
            )
        if self._is_not_empty_integer("maxParseSizeBytes", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_max_parse_size_bytes(
                    integer=str(spider_config["maxParseSizeBytes"])
                ),
                method_name="set_option_max_parse_size_bytes",
            )
        if self._is_not_empty_bool("acceptCookies", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_accept_cookies(
                    boolean=str(spider_config["acceptCookies"])
                ),
                method_name="set_option_accept_cookies",
            )
        if self._is_not_empty_bool("handleODataParametersVisited", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_handle_o_data_parameters_visited(
                    boolean=str(spider_config["handleODataParametersVisited"])
                ),
                method_name="set_option_handle_o_data_parameters_visited",
            )
        if self._is_not_empty_bool("handleParameters", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_handle_parameters(
                    string=str(spider_config["handleParameters"])
                ),
                method_name="set_option_handle_parameters",
            )

        if self._is_not_empty_bool("parseComments", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_parse_comments(
                    boolean=str(spider_config["parseComments"])
                ),
                method_name="set_option_parse_comments",
            )
        if self._is_not_empty_bool("parseGit", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_parse_git(
                    boolean=str(spider_config["parseGit"])
                ),
                method_name="set_option_parse_git",
            )
        if self._is_not_empty_bool("parseRobotsTxt", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_parse_robots_txt(
                    boolean=str(spider_config["parseRobotsTxt"])
                ),
                method_name="set_option_parse_robots_txt",
            )
        if self._is_not_empty_bool("parseSitemapXml", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_parse_sitemap_xml(
                    boolean=str(spider_config["parseSitemapXml"])
                ),
                method_name="set_option_parse_sitemap_xml",
            )
        if self._is_not_empty_bool("parseSVNEntries", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_parse_svn_entries(
                    boolean=str(spider_config["parseSVNEntries"])
                ),
                method_name="set_option_parse_svn_entries",
            )
        if self._is_not_empty_bool("postForm", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_post_form(
                    boolean=str(spider_config["postForm"])
                ),
                method_name="set_option_post_form",
            )
        if self._is_not_empty_bool("processForm", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_process_form(
                    boolean=str(spider_config["processForm"])
                ),
                method_name="set_option_process_form",
            )

        if self._is_not_empty_integer("requestWaitTime", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_request_wait_time(
                    integer=str(spider_config["requestWaitTime"])
                ),
                method_name="set_option_request_wait_time",
            )
        if self._is_not_empty_bool("sendRefererHeader", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_send_referer_header(
                    boolean=str(spider_config["sendRefererHeader"])
                ),
                method_name="set_option_send_referer_header",
            )
        if self._is_not_empty_integer("threadCount", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_thread_count(
                    integer=str(spider_config["threadCount"])
                ),
                method_name="set_option_thread_count",
            )
        if self._is_not_empty_string("userAgent", spider_config):
            self.check_zap_result(
                result=self.get_zap_spider.set_option_user_agent(
                    string=str(spider_config["userAgent"])
                ),
                method_name="set_option_user_agent",
            )

        if self._is_not_empty_integer("failIfFoundUrlsLessThan", spider_config):
            self.failIfFoundUrlsLessThan = spider_config["failIfFoundUrlsLessThan"]
        else:
            self.failIfFoundUrlsLessThan = 0  # Default value
        
        if self._is_not_empty_integer("warnIfFoundUrlsLessThan", spider_config):   
            self.warnIfFoundUrlsLessThan = spider_config["warnIfFoundUrlsLessThan"]
        else:
            self.warnIfFoundUrlsLessThan = 0  # Default value   

    def check_if_spider_completed(self):
        """Returns True once the HTTP spider progress has reached 100%."""
        progress = int(self.get_zap_spider.status(self.get_spider_id))
        logging.info("HTTP Spider(%d) progress: %d", self.get_spider_id, progress)
        return progress >= 100

    def print_spider_summary(self):
        """Method to print out a summary of the spider results"""
        logging.info("HTTP Spider(%s) completed", str(self.get_spider_id))

        num_urls = len(self.get_zap.core.urls())
        if num_urls == 0:
            logging.error(
                "No URLs found - is the target URL accessible? Local services may not be accessible from the Docker container."
            )
            raise RuntimeError(
                "No URLs found by ZAP Spider :-( - is the target URL accessible? Local services may not be accessible from the Docker container."
            )
        
        elif num_urls < self.failIfFoundUrlsLessThan:
            logging.error(
                "Found fewer URLs than the configured minimum of %d, failing the process.",
                self.failIfFoundUrlsLessThan,
            )
            raise RuntimeError(
                "ZAP Spider found fewer URLs than the configured minimum of %d, failing the process."
                % self.failIfFoundUrlsLessThan
            )
        
        elif num_urls < self.warnIfFoundUrlsLessThan:
            logging.warning(
                "Found fewer URLs than the configured minimum of %d, continuing the process.",
                self.warnIfFoundUrlsLessThan,
            )
        else:
            for url in self.get_zap_spider.results(scanid=self.get_spider_id):
                logging.info("Spidered URL: %s", url)
            logging.info(
                "Spider(%s) found total: %s URLs",
                str(self.get_spider_id),
                str(num_urls),
            )

    def stop_spider(self):
        """Stops the currently running ZAP HTTP spider."""
        self.get_zap_spider.stop()