KissPeter/APIFuzzer

View on GitHub
apifuzzer/server_fuzzer.py

Summary

Maintainability
A
1 hr
Test Coverage
B
83%
from kitty.data.report import Report
from kitty.fuzzers import ServerFuzzer
from kitty.model import Container, KittyException

from apifuzzer.utils import get_logger, transform_data_to_bytes


def _flatten_dict_entry(orig_key, v):
    """
    This function is called recursively to list the params in template
    :param orig_key: original key
    :param v: list of params
    :rtype: list
    """
    entries = []
    if isinstance(v, list):
        count = 0
        for elem in v:
            entries.extend(_flatten_dict_entry("%s[%s]" % (orig_key, count), elem))
            count += 1
    elif isinstance(v, dict):
        for k in v:
            entries.extend(_flatten_dict_entry("%s/%s" % (orig_key, k), v[k]))
    else:
        entries.append((orig_key, v))
    return entries


class OpenApiServerFuzzer(ServerFuzzer):
    """
    ServerFuzzer subclass that renders a template's fields into a keyword
    payload for the target and adds that payload to the stored report.
    """

    def not_implemented(self, func_name):
        """
        Accept a function name and do nothing with it
        :param func_name: name of the function, ignored
        """
        del func_name  # intentionally unused

    def __init__(self):
        """
        Create the logger first so it can be handed to the parent constructor
        """
        logger_name = self.__class__.__name__
        self.logger = get_logger(logger_name)
        self.logger.info("Logger initialized")
        super().__init__(logger=self.logger)

    def _transmit(self, node):
        """
        Build the request payload from the template and send it to the target.

        "url" and "method" are always rendered; "params", "headers", "data"
        and "path_variables" are added only when the template has such a
        field. The payload is remembered in self._last_payload before sending.
        :param node: Kitty template
        :type node: object
        :return: result of self.target.transmit
        :raises Exception: any error from the target, re-raised after logging
        """
        request = {"content_type": self.model.content_type}
        for mandatory in ("url", "method"):
            rendered = node.get_field_by_name(mandatory).render()
            request[mandatory] = transform_data_to_bytes(rendered)

        for section in ("params", "headers", "data", "path_variables"):
            try:
                if section not in node._fields_dict:
                    continue
                section_field = node.get_field_by_name(section)
                request[section] = self._recurse_params(section_field)
            except KittyException as exc:
                # A failing section is logged and skipped, the rest is still sent
                self.logger.warning(f"Exception occurred while processing {section}: {exc}")

        # Kept for _store_report, which adds it to the report
        self._last_payload = request
        try:
            response = self.target.transmit(**request)
        except Exception as exc:
            self.logger.error(f"Error in transmit: {exc}")
            raise exc
        return response

    @staticmethod
    def _recurse_params(param):
        """
        Walk a parameter recursively and collect its rendered values.

        A Container becomes a dict keyed by the names of its fields, anything
        else with a render method becomes its rendered value decoded to str
        (undecodable bytes are dropped), and everything else an empty dict.
        :param param: param to process
        :type param: object
        :return: dict for containers and unknown objects, str for leaves
        """
        if isinstance(param, Container):
            collected = {}
            for child in param._fields:
                # Recurse first, then look up the name, as the assignment did
                child_value = OpenApiServerFuzzer._recurse_params(child)
                collected[child.get_name()] = child_value
            return collected
        if hasattr(param, "render"):
            raw = transform_data_to_bytes(param.render())
            return raw.decode(errors="ignore")
        return {}

    def _store_report(self, report):
        """
        Enrich the fuzz report and hand it to the data manager.

        Adds the test number, the fuzz path, a flattened "Data Model"
        sub-report built from the model's test info, and the last payload.
        :param report: report to extend
        """
        self.logger.debug("<in>")
        report.add("test_number", self.model.current_index())
        report.add("fuzz_path", self.model.get_sequence_str())

        test_info = self.model.get_test_info()
        model_section = Report(name="Data Model")
        for top_key, value in test_info.items():
            for flat_key, flat_value in _flatten_dict_entry(top_key, value):
                model_section.add(flat_key, flat_value)
        report.add(model_section.get_name(), model_section)

        last_sent = self._last_payload
        if last_sent is None:
            report.add("payload", None)
        else:
            payload_section = Report("payload")
            payload_section.add("raw", last_sent)
            # len() of the payload object itself, as stored by _transmit
            payload_section.add("length", len(last_sent))
            report.add("payload", payload_section)

        self.dataman.store_report(report, self.model.current_index())
        # TODO investigate:
        #  self.dataman.get_report_by_id(self.model.current_index())

    def _test_environment(self):
        """
        Run the model's sequence once and log when the run reports a failure
        or raises - marked as not used by the original author
        """
        failure_message = "Environment test failed"
        sequence = self.model.get_sequence()
        try:
            sequence_failed = self._run_sequence(sequence)
            if sequence_failed:
                self.logger.info(failure_message)
        except Exception:
            self.logger.info(failure_message)