ComplianceAsCode/content

View on GitHub
utils/create_scap_delta_tailoring.py

Summary

Maintainability
B
4 hrs
Test Coverage
#!/usr/bin/python3

import argparse
import datetime
import json
import os
import re
import sys
import xml.etree.ElementTree as ET

import ssg.build_yaml
import ssg.constants
import ssg.rules
import ssg.yaml
import ssg.environment

# Repository root: the parent of the directory that contains this script.
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Default build output directory, located directly under the repository root.
SSG_BUILD_ROOT = os.path.join(SSG_ROOT, "build")
# Default path of the rule directory index (JSON) inside the build directory.
RULES_JSON = os.path.join(SSG_BUILD_ROOT, "rule_dirs.json")
# Default path of the build configuration YAML inside the build directory.
BUILD_CONFIG = os.path.join(SSG_BUILD_ROOT, "build_config.yml")
# Prefix -> namespace URI map passed to ElementTree find()/findall() calls.
NS = {'scap': ssg.constants.datastream_namespace,
      'xccdf-1.2': ssg.constants.XCCDF12_NS}
# Name of the STIG profile; not referenced by the code visible in this file.
PROFILE = 'stig'


def get_profile(product, profile_name):
    """
    Find a profile element in the built SCAP source data stream of a product.

    The data stream is read from ``build/ssg-<product>-ds.xml`` below SSG_ROOT.

    Args:
        product: Product identifier used to build the data stream file name.
        profile_name: Short profile name, i.e. without the
            ``xccdf_org.ssgproject.content_profile_`` prefix.

    Returns:
        The XCCDF ``Profile`` element whose ``id`` matches the full profile id.

    Raises:
        ValueError: If the data stream contains no profile with that id.
    """
    datastream_path = os.path.join(
        SSG_ROOT, 'build', 'ssg-{product}-ds.xml'.format(product=product))
    datastream_root = ET.parse(datastream_path).getroot()

    # Fully qualified ({namespace}tag) path to every profile of every benchmark.
    profile_path = './/{{{scap}}}component/{{{xccdf}}}Benchmark/{{{xccdf}}}Profile'.format(
        scap=NS["scap"], xccdf=NS["xccdf-1.2"])
    wanted_id = "xccdf_org.ssgproject.content_profile_{profile_name}".format(
        profile_name=profile_name)

    for candidate in datastream_root.findall(profile_path):
        if candidate.attrib['id'] == wanted_id:
            return candidate
    raise ValueError("Profile %s was not found." % wanted_id)


get_profile.__annotations__ = {'product': str, 'profile_name': str, 'return': ET.Element}


def filter_out_implemented_rules(known_rules, ns, root):
    """
    Return the known rules that the benchmark does not already provide a check for.

    Every ``Rule`` found directly inside a ``Group`` of the benchmark is
    inspected. When its ``check`` element has at least one child element, the
    entry keyed by the rule's ``version`` text is dropped from the result.

    Args:
        known_rules: Mapping of reference id to the data kept for it. It is
            not modified; a shallow copy is filtered instead.
        ns: Prefix -> namespace URI map providing the ``scap`` and
            ``xccdf-1.2`` prefixes used in the search paths.
        root: Root element of the parsed benchmark document.

    Returns:
        A new dict holding the entries of ``known_rules`` that were not
        filtered out.
    """
    needed_rules = known_rules.copy()
    benchmark_rules = root.findall(
        './/scap:component/xccdf-1.2:Benchmark/xccdf-1.2:Group/xccdf-1.2:Rule', ns)
    for stig in benchmark_rules:
        version = stig.find('xccdf-1.2:version', ns)
        check = stig.find('xccdf-1.2:check', ns)
        # A rule without a version cannot be matched, and a rule without a
        # check element has nothing implemented; neither filters anything.
        if version is None or check is None:
            continue
        # len() of an element is its number of children: an empty <check/>
        # carries no check content and is therefore not counted.
        if len(check) > 0:
            # pop() with a default tolerates ids that are unknown or that
            # appear in several benchmark rules (already removed earlier).
            needed_rules.pop(version.text, None)
    return needed_rules


filter_out_implemented_rules.__annotations__ = {'known_rules': dict, 'ns': dict,
                                                'root': ET.Element, 'return': dict}


def handle_rule_yaml(product, rule_id, rule_dir, guide_dir, env_yaml):
    """
    Load a rule from its directory and describe it as a plain dict.

    The rule YAML file is located with ``ssg.rules.get_rule_dir_yaml``, loaded
    through ``ssg.build_yaml.Rule.from_yaml`` and then ``normalize(product)``
    is called on the loaded rule before its references are read.

    Args:
        product: Product identifier passed to the rule's ``normalize``.
        rule_id: Identifier stored under the ``id`` key of the result.
        rule_dir: Rule directory; stored under ``dir`` and used to find the YAML.
        guide_dir: Guide directory stored under the ``guide`` key.
        env_yaml: Environment mapping forwarded to ``Rule.from_yaml``.

    Returns:
        A dict with the keys ``id``, ``dir``, ``guide`` and ``references``.
    """
    yaml_path = ssg.rules.get_rule_dir_yaml(rule_dir)
    loaded_rule = ssg.build_yaml.Rule.from_yaml(yaml_path, env_yaml=env_yaml)
    loaded_rule.normalize(product)
    return {
        'id': rule_id,
        'dir': rule_dir,
        'guide': guide_dir,
        'references': loaded_rule.references,
    }


handle_rule_yaml.__annotations__ = {'product': str, 'rule_id': str, 'rule_dir': str,
                                    'guide_dir': str, 'env_yaml': dict, 'return': dict}


def get_platform_rules(product, json_path, resolved_rules_dir, build_root):
    """
    Collect the rules that apply to a product.

    Args:
        product: Product identifier.
        json_path: Path to the rule directory JSON; only read when
            ``resolved_rules_dir`` is false.
        resolved_rules_dir: When true, read the ``.yml`` files found in
            ``<build_root>/<product>/rules`` instead of the JSON file.
        build_root: Build directory; only used when ``resolved_rules_dir``
            is true.

    Returns:
        A list of rule dicts. In resolved mode each dict has the keys ``id``
        (file name without the ``.yml`` suffix) and ``references``. Otherwise
        the dicts are the JSON entries whose ``products`` contain ``product``.
    """
    if resolved_rules_dir:
        suffix = '.yml'
        rules_path = os.path.join(build_root, product, 'rules')
        platform_rules = []
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # result (and everything derived from it) reproducible between runs.
        for file_name in sorted(os.listdir(rules_path)):
            if not file_name.endswith(suffix):
                continue
            rule_yaml = ssg.yaml.open_raw(os.path.join(rules_path, file_name))
            platform_rules.append({
                # Strip only the trailing suffix, not every '.yml' occurrence.
                'id': file_name[:-len(suffix)],
                'references': rule_yaml['references'],
            })
        return platform_rules

    # The context manager closes the file even when the JSON is malformed.
    with open(json_path, 'r') as rules_json_file:
        rules_json = json.load(rules_json_file)
    return [rule for rule in rules_json.values() if product in rule['products']]


get_platform_rules.__annotations__ = {'product': str, 'json_path': str, 'resolved_rules_dir': bool,
                                      'build_root': str, 'return': list}


def _open_rule_obj(resolved_rules_dir, rule, product, env_yaml):
    if resolved_rules_dir:
        return rule
    try:
        rule_obj = handle_rule_yaml(
            product, rule['id'], rule['dir'], rule['guide'], env_yaml)
    except ssg.yaml.DocumentationNotComplete:
        msg = 'Rule %s throw DocumentationNotComplete' % rule['id']
        sys.stderr.write(msg)
        # Happens on non-debug build when a rule is "documentation-incomplete"
        return None
    return rule_obj


def get_implemented_stigs(product, root_path, build_config_yaml_path,
                          reference_str, json_path, resolved_rules_dir,
                          build_root):
    """
    Map every reference id of the chosen reference system to its rule ids.

    Args:
        product: Product identifier.
        root_path: Repository root containing ``products/<product>/product.yml``
            and the ``product_properties`` directory.
        build_config_yaml_path: Path to the build configuration YAML.
        reference_str: Key looked up in each rule's ``references``.
        json_path: Path to the rule directory JSON, forwarded to
            ``get_platform_rules``.
        resolved_rules_dir: Forwarded to ``get_platform_rules`` and
            ``_open_rule_obj`` to select where rule data is read from.
        build_root: Build directory forwarded to ``get_platform_rules``.

    Returns:
        A dict mapping each reference id to the list of ids of the rules that
        carry it.
    """
    platform_rules = get_platform_rules(product, json_path, resolved_rules_dir, build_root)

    product_yaml_path = os.path.join(root_path, "products", product, "product.yml")
    properties_dir = os.path.join(root_path, "product_properties")
    env_yaml = ssg.environment.open_environment(
        build_config_yaml_path, product_yaml_path, properties_dir)

    known_rules = {}
    for rule in platform_rules:
        rule_obj = _open_rule_obj(resolved_rules_dir, rule, product, env_yaml)
        # Skip rules that could not be loaded or lack the requested reference.
        if rule_obj and reference_str in rule_obj['references'].keys():
            for ref in rule_obj['references'][reference_str]:
                known_rules.setdefault(ref, []).append(rule['id'])
    return known_rules


get_implemented_stigs.__annotations__ = {'product': str, 'root_path': str,
                                         'build_config_yaml_path': str, 'reference_str': str,
                                         'json_path': str, 'resolved_rules_dir': bool,
                                         'build_root': str, 'return': dict}


def setup_tailoring_profile(profile_id, profile_root):
    """
    Create the tailoring profile element that extends an existing profile.

    Args:
        profile_id: Short id for the new profile; it is prefixed with
            ``ssg.constants.OSCAP_PROFILE``. When empty or ``None`` the id of
            ``profile_root`` with ``_delta_tailoring`` appended is used.
        profile_root: The profile element being extended. Its ``title`` and
            ``description`` children are appended to the new profile.

    Returns:
        The new ``xccdf-1.2:Profile`` element with ``id`` and ``extends`` set.
    """
    if profile_id:
        new_id = '{oscap}{profile_id}'.format(
            oscap=ssg.constants.OSCAP_PROFILE, profile_id=profile_id)
    else:
        new_id = '{id}_delta_tailoring'.format(id=profile_root.attrib["id"])

    tailoring_profile = ET.Element('xccdf-1.2:Profile')
    tailoring_profile.set('id', new_id)
    for child_tag in ('xccdf-1.2:title', 'xccdf-1.2:description'):
        tailoring_profile.append(profile_root.find(child_tag, NS))
    tailoring_profile.set('extends', profile_root.get('id'))
    return tailoring_profile


setup_tailoring_profile.__annotations__ = {'profile_id': str, 'profile_root': ET.Element}


def _get_datetime():
    try:
        return datetime.datetime.now(datetime.UTC).isoformat()
    except AttributeError:
        return datetime.datetime.utcnow().isoformat()


def create_tailoring(args):
    """
    Build the XCCDF tailoring element for the requested product and profile.

    Only the rule selections whose ``selected`` state differs from the state
    wanted by the delta are copied into the tailoring profile; every other
    selection is inherited through the profile's ``extends`` attribute.

    Args:
        args: Parsed command line arguments. The attributes ``manual``,
            ``product``, ``root``, ``build_config_yaml``, ``reference``,
            ``json``, ``resolved_rules_dir``, ``build_root``, ``profile``,
            ``profile_id``, ``tailoring_id`` and ``quiet`` are read.

    Returns:
        The ``xccdf-1.2:Tailoring`` root element containing a version element
        and the tailoring profile.
    """
    benchmark_root = ET.parse(args.manual).getroot()
    known_rules = get_implemented_stigs(args.product, args.root, args.build_config_yaml,
                                        args.reference, args.json, args.resolved_rules_dir,
                                        args.build_root)
    needed_rules = filter_out_implemented_rules(known_rules, NS, benchmark_root)
    profile_root = get_profile(args.product, args.profile)
    tailoring_profile = setup_tailoring_profile(args.profile_id, profile_root)

    rule_prefix = ssg.constants.OSCAP_RULE
    # Loop invariant, so it is built once instead of once per selection.
    # NOTE(review): a rule is only wanted when it is the sole rule of a
    # reference id; ids mapped to several rules never match. This keeps the
    # existing behaviour -- confirm whether that is intended.
    needed_rule_lists = list(needed_rules.values())

    for selection in profile_root.findall('xccdf-1.2:select', NS):
        idref = selection.attrib['idref']
        if not idref.startswith(rule_prefix):
            continue
        cac_rule_id = idref[len(rule_prefix):]
        desired_value = str([cac_rule_id] in needed_rule_lists).lower()
        # Compare the attribute text itself: bool() of any non-empty string is
        # True and never equals 'true'/'false', which made every selection
        # look changed.
        if selection.get('selected') == desired_value:
            continue
        selection.set('selected', desired_value)
        tailoring_profile.append(selection)
        if not args.quiet:
            print('Set rule "{cac_rule_id}" selection state to {desired_value}'
                  .format(cac_rule_id=cac_rule_id, desired_value=desired_value))

    tailoring_root = ET.Element('xccdf-1.2:Tailoring')
    version = ET.SubElement(tailoring_root, 'xccdf-1.2:version',
                            attrib={'time': _get_datetime()})
    version.text = '1'
    tailoring_root.set('id', args.tailoring_id)
    tailoring_root.append(tailoring_profile)
    return tailoring_root


create_tailoring.__annotations__ = {'args': argparse.Namespace, 'return': ET.Element}


def parse_args():
    """
    Define the command line interface and parse the process arguments.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
                        help="Path to SSG root directory (defaults to {ssg_root})"
                        .format(ssg_root=SSG_ROOT))
    parser.add_argument("-p", "--product", type=str, action="store", required=True,
                        help="What product to produce the tailoring file for")
    parser.add_argument("-m", "--manual", type=str, action="store", required=True,
                        help="Path to XML XCCDF manual file to use as the source")
    # The default is RULES_JSON, i.e. rule_dirs.json in the build directory.
    parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
                        help="Path to the rule_dirs.json (defaults to build/rule_dirs.json)")
    parser.add_argument("-c", "--build-config-yaml", default=BUILD_CONFIG,
                        help="YAML file with information about the build configuration. ")
    parser.add_argument("-b", "--profile", type=str, default="stig",
                        help="What profile to use. Defaults to stig")
    parser.add_argument("-ref", "--reference", type=str, default="stigid",
                        help="Reference system to check for, defaults to stigid")
    # Mirrors the default file name assembled in main().
    parser.add_argument("-o", "--output", type=str,
                        help="Defaults to "
                             "build/PRODUCT_PROFILE_MANUALVERSION_delta_tailoring.xml")
    # Mirrors the fallback id built by setup_tailoring_profile().
    parser.add_argument("--profile-id", type=str,
                        help="Id of the created profile. Defaults to the id of the "
                             "extended profile with _delta_tailoring appended")
    parser.add_argument("--tailoring-id", type=str,
                        default='xccdf_content-disa-delta_tailoring_default',
                        help="Id of the created tailoring file. "
                             "Defaults to xccdf_content-disa-delta_tailoring_default")
    parser.add_argument("-q", "--quiet", action='store_true',
                        help="Do not print progress messages to standard output.")
    parser.add_argument("--resolved-rules-dir", action='store_true',
                        help="The script will not use rule_dirs.json, "
                             "but instead uses the data from the build.")
    parser.add_argument('-B', '--build-root', type=str, default=SSG_BUILD_ROOT,
                        help="The root of the CMake working directory, "
                             "defaults to {ssg_build_root}".format(ssg_build_root=SSG_BUILD_ROOT))
    parser.add_argument("-d", "--dry-run", action="store_true",
                        help="If set, the tailoring is built but no file is written.")
    return parser.parse_args()


parse_args.__annotations__ = {'return': argparse.Namespace}


def main():
    """
    Command line entry point: build the delta tailoring and write it to disk.

    The path given with ``--manual`` must contain a version of the form
    ``v<NUM>r<NUM>``; otherwise an error is written to stderr and the process
    exits with status 1. With ``--dry-run`` the tailoring is built but nothing
    is written, and 0 is returned.
    """
    args = parse_args()

    # Validate the input name first so a bad invocation fails before the
    # (comparatively expensive) tailoring is built.
    manual_version = re.search(r'(v[0-9]+r[0-9]+)', args.manual)
    if manual_version is None:
        sys.stderr.write("Unable to find version from file name.\n")
        sys.stderr.write("The string v[NUM]r[NUM] must be in the filename.\n")
        # sys.exit() rather than the exit() helper, which the site module only
        # provides for interactive use.
        sys.exit(1)

    ET.register_namespace('xccdf-1.2', ssg.constants.XCCDF12_NS)
    tailoring_root = create_tailoring(args)

    if args.dry_run:
        return 0

    if args.output:
        out = args.output
    else:
        out = os.path.join(SSG_ROOT, 'build',
                           '{product}_{profile}_{manual_version}_delta_tailoring.xml'
                           .format(product=args.product, profile=args.profile,
                                   manual_version=manual_version.group(0)))
    ET.ElementTree(tailoring_root).write(out)
    if not args.quiet:
        print("Wrote tailoring file to {out}.".format(out=out))


if __name__ == '__main__':
    main()