ComplianceAsCode/content

View on GitHub
tests/test_machine_only_rules.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/python3

import os
import argparse
import xml.etree.ElementTree as ET
import sys
import ssg.constants
import ssg.yaml
import io
import re


BASH_MACHINE_CONDITIONAL = re.compile(
    r'^.*\[ ! -f /.dockerenv \] && \[ ! -f /run/.containerenv \].*$', re.M)
ANSIBLE_MACHINE_CONDITIONAL = re.compile(
    r'ansible_virtualization_type not in \["docker",\s+"lxc",\s+"openvz",\s+"podman",\s+' +
    '"container"\]',
    re.M)
MACHINE_PLATFORM_ONE_LINE = re.compile(
    r'^\s*platform:\s+machine\s*$', re.M)
MACHINE_PLATFORM_MULTILINE = re.compile(
    r'^\s*platforms:\s*\n(\s+-\s+.*machine.*)+', re.M)


def main():
    args = parse_command_line_args()
    test_result = 0
    for product in ssg.constants.product_directories:
        ds_path = os.path.join(args.build_dir, "ssg-" + product + "-ds.xml")
        if not os.path.exists(ds_path):
            sys.stderr.write("The product data stream '%s' hasn't been build, "
                             "skipping the test.\n" % (ds_path))
            continue
        product_dir = os.path.join(args.source_dir, "products", product)
        product_yaml_path = os.path.join(product_dir, "product.yml")
        product_yaml = ssg.yaml.open_raw(product_yaml_path)
        guide_dir = os.path.abspath(
            os.path.join(product_dir, product_yaml['benchmark_root']))
        additional_content_directories = product_yaml.get("additional_content_directories", [])
        add_content_dirs = [os.path.abspath(
            os.path.join(product_dir, rd)) for rd in additional_content_directories]
        if not check_product(ds_path, [guide_dir] + add_content_dirs):
            test_result = 1
    if test_result:
        sys.exit(1)


def check_product(ds_path, rules_dirs):
    input_rules = scan_rules_groups(rules_dirs, False)
    return not machine_platform_missing_in_rules(ds_path, input_rules)


def shorten_id(full_id):
    id_prefix = ssg.constants.OSCAP_RULE
    return full_id.replace(id_prefix, "")


def get_only_elements_to_check_from_benchmark(benchmark, element_query, short_ids_to_check):
    elements = []
    for elem in benchmark.findall(element_query):
        elem_short_id = shorten_id(elem.get("id"))
        if elem_short_id in short_ids_to_check:
            elements.append(elem)
    return elements


def get_element_fix_text_by_system(element):
    all_fixes = element.findall("{%s}fix" % ssg.constants.XCCDF12_NS)
    fixes_by_system = dict()
    for f in all_fixes:
        system = f.get("system")
        fixes_by_system[system] = "".join(f.itertext())
    return fixes_by_system


def machine_platform_missing_in_rules(ds_path, short_ids_to_check):
    machine_platform_missing = False
    tree = ET.parse(ds_path)
    root = tree.getroot()
    only_rules_query = ".//{%s}Rule" % ssg.constants.XCCDF12_NS
    benchmark = root.find(".//{%s}Benchmark" % ssg.constants.XCCDF12_NS)
    elements_to_check = get_only_elements_to_check_from_benchmark(
            benchmark, only_rules_query, short_ids_to_check)

    for elem in elements_to_check:
        element_fixes = get_element_fix_text_by_system(elem)

        maybe_bash_fix_text = element_fixes.get(ssg.constants.bash_system)
        if maybe_bash_fix_text is None:
            bash_fix_present = False
            bash_fix_has_machine_conditional = False
        else:
            bash_fix_present = True
            bash_fix_has_machine_conditional = BASH_MACHINE_CONDITIONAL.search(
                    maybe_bash_fix_text)

        maybe_ansible_fix_text = element_fixes.get(ssg.constants.ansible_system)
        if maybe_ansible_fix_text is None:
            ansible_fix_present = False
            ansible_fix_has_machine_conditional = False
        else:
            ansible_fix_present = True
            ansible_fix_has_machine_conditional = ANSIBLE_MACHINE_CONDITIONAL.search(
                    maybe_ansible_fix_text)

        elem_short_id = shorten_id(elem.get("id"))
        if ansible_fix_present and not ansible_fix_has_machine_conditional:
            sys.stderr.write(
                "Rule %s in %s is missing a machine conditional in Ansible remediation\n" %
                (elem_short_id, ds_path))
            machine_platform_missing = True
        if bash_fix_present and not bash_fix_has_machine_conditional:
            sys.stderr.write(
                "Rule %s in %s is missing a machine conditional in Bash remediation\n" %
                (elem_short_id, ds_path))
            machine_platform_missing = True
    return machine_platform_missing


def parse_command_line_args():
    parser = argparse.ArgumentParser(
        description="Tests if 'machine' CPEs are "
                    "propagated to the built data stream")
    parser.add_argument("--source_dir", required=True,
                        help="Content source directory path")
    parser.add_argument("--build_dir", required=True,
                        help="Build directory containing built data streams")
    args = parser.parse_args()
    return args


def check_if_machine_only(dirpath, name, is_machine_only_group):
    if name in os.listdir(dirpath):
        if is_machine_only_group:
            return True
        yml_path = os.path.join(dirpath, name)
        with io.open(yml_path, "r", encoding="utf-8") as yml_file:
            yml_file_contents = yml_file.read()
            single_line_platform_found = MACHINE_PLATFORM_ONE_LINE.search(yml_file_contents)
            multiline_platform_found = MACHINE_PLATFORM_MULTILINE.search(yml_file_contents)
            if single_line_platform_found or multiline_platform_found:
                return True
    return False


def scan_rules_groups(dir_paths, parent_machine_only):
    groups = set()
    rules = set()
    for dir_path in dir_paths:
        _, rules = scan_rules_group(dir_path, parent_machine_only, groups, rules)
    return rules


def scan_rules_group(dir_path, parent_machine_only, groups, rules):
    name = os.path.basename(dir_path)
    is_machine_only = False
    if check_if_machine_only(dir_path, "group.yml", parent_machine_only):
        groups.add(name)
        is_machine_only = True
    if check_if_machine_only(dir_path, "rule.yml", parent_machine_only):
        rules.add(name)
    for dir_item in os.listdir(dir_path):
        subdir_path = os.path.join(dir_path, dir_item)
        if os.path.isdir(subdir_path):
            subdir_groups, subdir_rules = scan_rules_group(
                subdir_path, is_machine_only, groups, rules)
            groups |= subdir_groups
            rules |= subdir_rules
    return groups, rules


if __name__ == "__main__":
    main()