ComplianceAsCode/content

View on GitHub
ssg/content_diff.py

Summary

Maintainability
C
1 day
Test Coverage
F
0%
#!/usr/bin/python3

import difflib
import re
import sys
import xml.etree.ElementTree as ET

import ssg.xml
from ssg.constants import FIX_TYPE_TO_SYSTEM, XCCDF12_NS
from ssg.utils import mkdir_p


class StandardContentDiffer(object):

    def __init__(self, old_content, new_content, rule_id, show_diffs, rule_diffs,
                 only_rules, output_dir):
        self.old_content = old_content
        self.new_content = new_content

        self.rule_id = rule_id
        self.show_diffs = show_diffs
        self.rule_diffs = rule_diffs
        self.only_rules = only_rules

        self.context_lines = 3

        self.check_system_map = {
            "OVAL": {"uri": ssg.constants.oval_namespace, "comp_func": self.compare_ovals},
            "OCIL": {"uri": ssg.constants.ocil_cs, "comp_func": self.compare_ocils}
        }

        self.output_dir = output_dir
        if self.rule_diffs:
            self._ensure_output_dir_exists()

    def _ensure_output_dir_exists(self):
        try:
            mkdir_p(self.output_dir)
        except OSError:
            print("Output path '%s' exists and it is not a directory." % self.output_dir)
            sys.exit(1)

    def output_diff(self, identifier, diff, mode="a"):
        if not diff:
            return

        if self.rule_diffs:
            with open("%s/%s" % (self.output_dir, identifier), mode) as f:
                f.write(diff)
        else:
            print(diff)

    def _get_rules_to_compare(self, benchmark):
        rule_to_find = self.rule_id
        if self.rule_id:
            if not self.rule_id.startswith(ssg.constants.OSCAP_RULE):
                rule_to_find = ssg.constants.OSCAP_RULE + self.rule_id
        rules = benchmark.find_rules(rule_to_find)
        return rules

    def compare_rules(self, old_benchmark, new_benchmark):
        try:
            rules_in_old_benchmark = self._get_rules_to_compare(old_benchmark)
        except ValueError as e:
            print(str(e))
            return

        for old_rule in rules_in_old_benchmark:
            rule_id = old_rule.get_attr("id")
            new_rule = new_benchmark.find_rule(rule_id)
            if new_rule is None:
                print("%s is missing in new data stream." % (rule_id))
                continue
            if self.only_rules:
                continue
            self.compare_rule(old_rule, new_rule, rule_id)
            self.compare_platforms(old_rule, new_rule,
                                   old_benchmark, new_benchmark, rule_id)

        if self.rule_diffs:
            print("Diff files saved at %s." % self.output_dir)

    def compare_rule(self, old_rule, new_rule, identifier):
        self.compare_rule_texts(old_rule, new_rule, identifier)
        self.compare_checks(old_rule, new_rule, "OVAL", identifier)
        self.compare_checks(old_rule, new_rule, "OCIL", identifier)
        for remediation_type in FIX_TYPE_TO_SYSTEM.keys():
            self.compare_remediations(old_rule, new_rule, remediation_type, identifier)

    def _get_list_of_platforms(self, cpe_platforms, idref):
        cpe_list = []
        if len(cpe_platforms) == 0:
            print("Platform {0} not defined in platform specification".format(idref))
            return cpe_list
        elif len(cpe_platforms) > 1:
            print("Platform {0} defined more than once".format(idref))
            for cpe_p in cpe_platforms:
                ET.dump(cpe_p)
        for cpe_platform in cpe_platforms:
            check_fact_refs = cpe_platform.find_all_check_fact_ref_elements()
            for check_fact_ref in check_fact_refs:
                cpe_list.append(check_fact_ref.get("id-ref"))
        return cpe_list

    def compare_platforms(self, old_rule, new_rule, old_benchmark, new_benchmark, identifier):
        entries = [{
                "benchmark": old_benchmark,
                "rule": old_rule,
                "cpe": []
            }, {
                "benchmark": new_benchmark,
                "rule": new_rule,
                "cpe": []
            }]

        for entry in entries:
            for platform in entry["rule"].get_all_platform_elements():
                idref = platform.get("idref")
                if idref.startswith("#"):
                    cpe_platforms = entry["benchmark"].find_all_cpe_platforms(idref)
                    entry["cpe"] += self._get_list_of_platforms(cpe_platforms, idref)
                else:
                    entry["cpe"].append(idref)

        if entries[0]["cpe"] != entries[1]["cpe"]:
            print("Platform has been changed for rule '{0}'".format(identifier))

            if self.show_diffs:
                diff = self.generate_diff_text("\n".join(entries[0]["cpe"])+"\n",
                                               "\n".join(entries[1]["cpe"])+"\n",
                                               fromfile=identifier, tofile=identifier)
                self.output_diff(identifier, diff)

    def compare_rule_texts(self, old_rule, new_rule, identifier):
        old_rule_text = old_rule.join_text_elements()
        new_rule_text = new_rule.join_text_elements()

        if old_rule_text == new_rule_text:
            return

        if old_rule_text != "":
            print(
                "New content has different text for rule '%s'." % (identifier))

        if self.show_diffs:
            diff = self.generate_diff_text(old_rule_text, new_rule_text,
                                           fromfile=identifier, tofile=identifier,
                                           n=self.context_lines)

            self.output_diff(identifier, diff, mode="w")

    def compare_check_ids(self, system, identifier, old_check_id, new_check_id):
        if old_check_id != new_check_id:
            print(
                "%s definition ID for rule '%s' has changed from "
                "'%s' to '%s'." % (
                    system, identifier, old_check_id, new_check_id)
            )

    def compare_check_file_names(self, system, identifier,
                                 old_check_file_name, new_check_file_name):
        if old_check_file_name != new_check_file_name:
            print(
                "%s definition file for rule '%s' has changed from "
                "'%s' to '%s'." % (
                    system, identifier, old_check_file_name, new_check_file_name)
            )

    def get_check_docs(self, system, identifier, old_check_file_name, new_check_file_name):
        try:
            old_check_doc = self.old_content.components.get(system)[old_check_file_name]
        except (KeyError, TypeError):
            print(
                "Rule '%s' points to '%s' which isn't a part of the "
                "old data stream" % (identifier, old_check_file_name))
            old_check_doc = None
        try:
            new_check_doc = self.new_content.components.get(system)[new_check_file_name]
        except (KeyError, TypeError):
            print(
                "Rule '%s' points to '%s' which isn't a part of the "
                "new data stream" % (identifier, new_check_file_name))
            new_check_doc = None
        return old_check_doc, new_check_doc

    def compare_checks(self, old_rule, new_rule, system, identifier):
        check_system_uri = self.check_system_map[system]["uri"]
        old_check = old_rule.get_check_element(check_system_uri)
        new_check = new_rule.get_check_element(check_system_uri)
        if (old_check is None and new_check is not None):
            print("New data stream adds %s for rule '%s'." % (system, identifier))
        elif (old_check is not None and new_check is None):
            print(
                "New data stream is missing %s for rule '%s'." % (system, identifier))
        elif (old_check is not None and new_check is not None):
            old_check_content_ref = old_rule.get_check_content_ref_element(old_check)
            new_check_content_ref = new_rule.get_check_content_ref_element(new_check)
            old_check_id = old_check_content_ref.get("name")
            new_check_id = new_check_content_ref.get("name")
            old_check_file_name = old_check_content_ref.get("href")
            new_check_file_name = new_check_content_ref.get("href")

            self.compare_check_ids(system, identifier, old_check_id, new_check_id)
            self.compare_check_file_names(system, identifier,
                                          old_check_file_name, new_check_file_name)

            if (self.show_diffs and
               identifier != "xccdf_org.ssgproject.content_rule_security_patches_up_to_date"):

                old_check_doc, new_check_doc = self.get_check_docs(system, identifier,
                                                                   old_check_file_name,
                                                                   new_check_file_name)
                if not old_check_doc or not new_check_doc:
                    return

                self.check_system_map[system]["comp_func"](old_check_doc, old_check_id,
                                                           new_check_doc, new_check_id, identifier)

    def compare_ovals(self, old_oval_def_doc, old_oval_def_id,
                      new_oval_def_doc, new_oval_def_id, identifier):
        old_def = old_oval_def_doc.find_oval_definition(old_oval_def_id)
        new_def = new_oval_def_doc.find_oval_definition(new_oval_def_id)
        old_els = old_def.get_elements()
        new_els = new_def.get_elements()

        old_els_text = self.serialize_elements(old_els)
        new_els_text = self.serialize_elements(new_els)
        diff = self.generate_diff_text(old_els_text, new_els_text,
                                       fromfile=old_oval_def_id, tofile=new_oval_def_id)
        if diff:
            print("OVAL for rule '%s' differs." % (identifier))
        self.output_diff(identifier, diff)

    def compare_ocils(self, old_ocil_doc, old_ocil_id, new_ocil_doc, new_ocil_id, identifier):
        try:
            old_question = old_ocil_doc.find_boolean_question(old_ocil_id)
            new_question = new_ocil_doc.find_boolean_question(new_ocil_id)
        except ValueError as e:
            print("Rule '%s' OCIL can't be found: %s" % (identifier, str(e)))
            return
        diff = self.generate_diff_text(old_question, new_question,
                                       fromfile=old_ocil_id, tofile=new_ocil_id)
        if diff:
            print("OCIL for rule '%s' differs." % identifier)
        self.output_diff(identifier, diff)

    def compare_remediations(self, old_rule, new_rule, remediation_type, identifier):
        system = FIX_TYPE_TO_SYSTEM[remediation_type]
        old_fix = old_rule.get_fix_element(system)
        new_fix = new_rule.get_fix_element(system)
        if (old_fix is None and new_fix is not None):
            print("New data stream adds %s remediation for rule '%s'." % (
                remediation_type, identifier))
        elif (old_fix is not None and new_fix is None):
            print("New data stream is missing %s remediation for rule '%s'." % (
                remediation_type, identifier))
        elif (old_fix is not None and new_fix is not None):
            self._compare_fix_elements(
                old_fix, new_fix, remediation_type, identifier)

    def _compare_fix_elements(self, old_fix, new_fix, remediation_type, identifier):
        old_fix_id = old_fix.get("id")
        new_fix_id = new_fix.get("id")
        if old_fix_id != new_fix_id:
            print(
                "%s remediation ID for rule '%s' has changed from "
                "'%s' to '%s'." % (
                    remediation_type, identifier, old_fix_id, new_fix_id)
            )
        if self.show_diffs:
            old_fix_text = "".join(old_fix.itertext())
            new_fix_text = "".join(new_fix.itertext())
            diff = self.generate_diff_text(old_fix_text, new_fix_text,
                                           fromfile=identifier, tofile=identifier)
            if diff:
                print("%s remediation for rule '%s' differs." % (remediation_type, identifier))
            self.output_diff(identifier, diff)

    def generate_diff_text(self, old_r, new_r,
                           fromfile="old data stream", tofile="new data stream", n=3):
        if old_r != new_r:
            diff = "".join(difflib.unified_diff(
                old_r.splitlines(keepends=True), new_r.splitlines(keepends=True),
                fromfile=fromfile, tofile=tofile, n=n))
            return diff
        return None

    def serialize_elements(self, elements):
        text = ""
        for thing, atrribute in elements:
            text += "%s %s\n" % (thing, atrribute)
        return text


class StigContentDiffer(StandardContentDiffer):

    def __init__(self, old_content, new_content, rule_id, show_diffs, rule_diffs,
                 only_rules, output_dir):
        super(StigContentDiffer, self).__init__(old_content, new_content, rule_id,
                                                show_diffs, rule_diffs, only_rules, output_dir)

        self.context_lines = 200

    def _get_stig_id(self, element):
        return element.get_version_element().text

    def get_stig_rule_SV(self, sv_rule_id):
        stig_rule_id = re.search(r'(SV-\d+)r\d+_rule', sv_rule_id)
        if not stig_rule_id:
            print("The rule '%s' doesn't have the usual STIG format: 'SV-XXXXXXrXXXXXX_rule.\n"
                  "Please make sure the input contents are DISA STIG Benchmarks." % sv_rule_id)
            sys.exit(1)
        return stig_rule_id.group(1)

    def compare_rules(self, old_benchmark, new_benchmark):
        try:
            rules_in_old_benchmark = self._get_rules_to_compare(old_benchmark)
            rules_in_new_benchmark = self._get_rules_to_compare(new_benchmark)
        except ValueError as e:
            print(str(e))
            return

        # The rules in STIG Benchmarks will have their IDS changed whenever there is an update.
        # However, only the release number changes, the SV number stays the same.
        # This creates map the SV numbers to their equivalent full IDs in the new Benchmark.
        new_rule_mapping = {self.get_stig_rule_SV(rule.get_attr("id")): rule.get_attr("id")
                            for rule in rules_in_new_benchmark}
        old_rule_mapping = {self.get_stig_rule_SV(rule.get_attr("id")): rule.get_attr("id")
                            for rule in rules_in_old_benchmark}

        self.compare_existing_rules(new_benchmark,
                                    old_benchmark, rules_in_old_benchmark, new_rule_mapping)

        self.check_for_new_rules(rules_in_new_benchmark, old_rule_mapping)

        if self.rule_diffs:
            print("Diff files saved at %s." % self.output_dir)

    def compare_existing_rules(self, new_benchmark, old_benchmark,
                               rules_in_old_benchmark, new_rule_mapping):
        for old_rule in rules_in_old_benchmark:
            old_sv_rule_id = self.get_stig_rule_SV(old_rule.get_attr("id"))
            old_stig_id = self._get_stig_id(old_rule)
            try:
                new_sv_rule_id = new_rule_mapping[old_sv_rule_id]
            except KeyError:
                print("%s is missing in new data stream." % old_stig_id)
                continue
            if self.only_rules:
                continue

            new_rule = new_benchmark.find_rule(new_sv_rule_id)
            new_stig_id = self._get_stig_id(new_rule)

            self.compare_rule(old_rule, new_rule, new_stig_id)
            self.compare_platforms(old_rule, new_rule,
                                   old_benchmark, new_benchmark, new_stig_id)

    def check_for_new_rules(self, rules_in_new_benchmark, old_rule_mapping):
        # Check for rules added in new content
        for new_rule in rules_in_new_benchmark:
            new_stig_id = self._get_stig_id(new_rule)
            new_sv_rule_id = self.get_stig_rule_SV(new_rule.get_attr("id"))
            try:
                old_sv_rule_id = old_rule_mapping[new_sv_rule_id]
            except KeyError:
                print("%s was added in new data stream." % (new_stig_id))

                # Compare against empty rule so that a diff is generated
                empty_rule = ssg.xml.XMLRule(ET.Element("{%s}Rule" % XCCDF12_NS))
                self.compare_rule(empty_rule, new_rule, new_stig_id)