utils/create_srg_export.py
#!/usr/bin/python3
import argparse
import csv
import datetime
import json
import pathlib
import os
import re
import sys
import yaml
from typing import TextIO
import xml.etree.ElementTree as ET
from utils.srg_export import html, md, xlsx
from utils.srg_export.data import HEADERS, get_iacontrol_mapping
try:
import ssg.build_stig
import ssg.build_yaml
import ssg.constants
import ssg.controls
import ssg.environment
import ssg.rules
import ssg.yaml
except (ModuleNotFoundError, ImportError):
sys.stderr.write("Unable to load ssg python modules.\n")
sys.stderr.write("Hint: run source ./.pyenv.sh\n")
exit(3)
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
RULES_JSON = os.path.join(SSG_ROOT, "build", "rule_dirs.json")
BUILD_CONFIG = os.path.join(SSG_ROOT, "build", "build_config.yml")
OUTPUT = os.path.join(SSG_ROOT, 'build',
f'{datetime.datetime.now().strftime("%s")}_stig_export.csv')
SRG_PATH = os.path.join(SSG_ROOT, 'shared', 'references', 'disa-os-srg-v3r1.xml')
NS = {'scap': ssg.constants.datastream_namespace,
'xccdf-1.2': ssg.constants.XCCDF12_NS,
'xccdf-1.1': ssg.constants.XCCDF11_NS}
def get_iacontrol(srgs: list) -> str:
result = list()
for srg in srgs:
mapping = get_iacontrol_mapping(srg)
if mapping is None:
continue
if srg in mapping:
result.append(mapping[srg])
result_set = set(result)
return ','.join(str(srg) for srg in result_set)
class DisaStatus:
PENDING = "pending"
PLANNED = "planned"
NOT_APPLICABLE = "Not Applicable"
INHERENTLY_MET = "Applicable - Inherently Met"
DOCUMENTATION = "documentation"
PARTIAL = "Applicable - Configurable"
SUPPORTED = "supported"
AUTOMATED = "Applicable - Configurable"
DOES_NOT_MEET = "Applicable - Does Not Meet"
@staticmethod
def from_string(source: str) -> str:
data = {
ssg.controls.Status.INHERENTLY_MET: DisaStatus.INHERENTLY_MET,
ssg.controls.Status.DOES_NOT_MEET: DisaStatus.DOES_NOT_MEET,
ssg.controls.Status.NOT_APPLICABLE: DisaStatus.NOT_APPLICABLE,
ssg.controls.Status.AUTOMATED: DisaStatus.AUTOMATED,
ssg.controls.Status.MANUAL: DisaStatus.AUTOMATED
}
return data.get(source, source)
STATUSES = {AUTOMATED, INHERENTLY_MET, DOES_NOT_MEET, NOT_APPLICABLE}
def html_plain_text(source: str) -> str:
if source is None:
return ""
# Quick and dirty way to clean up HTML fields.
# Add line breaks
result = source.replace("<br />", "\n")
result = result.replace("<br/>", "\n")
result = result.replace("<tt>", '"')
result = result.replace("</tt>", '"')
# Remove all other tags
result = re.sub(r"(?s)<.*?>", " ", result)
# only replace this after replacing other tags as < and >
# would be caught by the generic substitution
result = result.replace(">", ">")
result = result.replace("<", "<")
return result
def get_variable_value(root_path: str, product: str, name: str, selector: str) -> str:
product_value_full_path = pathlib.Path(root_path).joinpath('build').joinpath(product)\
.joinpath('values').joinpath(f'{name}.yml').absolute()
if not product_value_full_path.exists():
sys.stderr.write(f'Undefined variable {name}\n')
exit(7)
with open(product_value_full_path, 'r') as f:
var_yaml = yaml.load(Loader=yaml.BaseLoader, stream=f)
if 'options' not in var_yaml:
sys.stderr.write(f'No options for {name}\n')
exit(8)
if not selector and 'default' not in var_yaml['options']:
sys.stderr.write(f'No default selector for {name}\n')
exit(10)
if not selector:
return str(var_yaml['options']['default'])
if selector not in var_yaml['options']:
sys.stderr.write(f'Option {selector} does not exist for {name}\n')
exit(9)
else:
return str(var_yaml['options'][selector])
def replace_variables(source: str, variables: dict, root_path: str, product: str) -> str:
result = source
if source:
sub_element_regex = r'<sub idref="([a-z0-9_]+)" \/>'
matches = re.finditer(sub_element_regex, source, re.MULTILINE)
for match in matches:
name = match.group(1)
value = get_variable_value(
root_path, product, name, variables.get(name))
result = result.replace(match.group(), value)
return result
def handle_variables(source: str, variables: dict, root_path: str, product: str) -> str:
result = replace_variables(source, variables, root_path, product)
return html_plain_text(result)
def get_description_root(srg: ET.Element) -> ET.Element:
# DISA adds escaped XML to the description field
# This method unescapes that XML and parses it
description_xml = "<root>"
description_xml += srg.find('xccdf-1.1:description', NS).text.replace('<', '<') \
.replace('>', '>').replace(' & ', '')
description_xml += "</root>"
description_root = ET.ElementTree(ET.fromstring(description_xml)).getroot()
return description_root
def get_srg_dict(xml_path: str) -> dict:
if not pathlib.Path(xml_path).exists():
sys.stderr.write("XML for SRG was not found\n")
sys.stderr.write(f"Could not open file {xml_path} \n")
exit(1)
root = ET.parse(xml_path).getroot()
srgs = dict()
for group in root.findall('xccdf-1.1:Group', NS):
for srg in group.findall('xccdf-1.1:Rule', NS):
srg_id = srg.find('xccdf-1.1:version', NS).text
srgs[srg_id] = dict()
srgs[srg_id]['severity'] = ssg.build_stig.get_severity(srg.get('severity'))
srgs[srg_id]['title'] = srg.find('xccdf-1.1:title', NS).text
description_root = get_description_root(srg)
srgs[srg_id]['vuln_discussion'] = \
html_plain_text(description_root.find('VulnDiscussion').text)
srgs[srg_id]['cci'] = \
srg.find("xccdf-1.1:ident[@system='http://cyber.mil/cci']", NS).text
srgs[srg_id]['fix'] = srg.find('xccdf-1.1:fix', NS).text
srgs[srg_id]['check'] = \
html_plain_text(srg.find('xccdf-1.1:check/xccdf-1.1:check-content', NS).text)
srgs[srg_id]['ia_controls'] = description_root.find('IAControls').text
return srgs
def handle_rule_yaml(product: str, rule_dir: str, env_yaml: dict) -> ssg.build_yaml.Rule:
rule_file = ssg.rules.get_rule_dir_yaml(rule_dir)
rule_yaml = ssg.build_yaml.Rule.from_yaml(rule_file, env_yaml=env_yaml)
rule_yaml.normalize(product)
return rule_yaml
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--control', type=str, action="store", required=True,
help="The control file to parse")
parser.add_argument('-o', '--output', type=str,
help=f"The path to the output. Defaults to {OUTPUT}",
default=OUTPUT)
parser.add_argument("-r", "--root", type=str, action="store", default=SSG_ROOT,
help=f"Path to SSG root directory (defaults to {SSG_ROOT})")
parser.add_argument("-j", "--json", type=str, action="store", default=RULES_JSON,
help=f"Path to the rules_dir.json (defaults to {RULES_JSON})")
parser.add_argument("-p", "--product", type=str, action="store", required=True,
help="What product to get STIGs for")
parser.add_argument("-b", "--build-config-yaml", default=BUILD_CONFIG,
help="YAML file with information about the build configuration.")
parser.add_argument("-m", "--manual", type=str, action="store",
help="Path to XML XCCDF manual file to use as the source of the SRGs",
default=SRG_PATH)
parser.add_argument("--prefer-controls", action="store_true",
help="When creating rows prefer checks and fixes from controls over rules",
default=False)
parser.add_argument("-f", "--out-format", type=str, choices=("csv", "xlsx", "html", "md"),
action="store", help="The format the output should take. Defaults to csv",
default="csv")
return parser.parse_args()
def get_policy_specific_content(key: str, rule_object: ssg.build_yaml.Rule) -> str:
psc = rule_object.policy_specific_content
if not psc:
return ""
stig = psc.get('stig')
if not stig:
return ""
return stig.get(key, "")
def use_rule_content(control: ssg.controls.Control, prefer_controls: bool) -> bool:
if control.fixtext is not None and control.check is not None and prefer_controls:
return False
return True
def handle_control(product: str, control: ssg.controls.Control, env_yaml: ssg.environment,
rule_json: dict, srgs: dict, used_rules: list, root_path: str,
prefer_controls: bool) -> list:
if len(control.selections) > 0 and use_rule_content(control, prefer_controls):
rows = list()
for selection in control.selections:
if selection not in used_rules and selection in control.selected:
rule_object = handle_rule_yaml(product, rule_json[selection]['dir'], env_yaml)
row = create_base_row(control, srgs, rule_object)
if control.levels is not None:
row['Severity'] = ssg.build_stig.get_severity(control.levels[0])
requirement = get_policy_specific_content('srg_requirement', rule_object)
row['Requirement'] = handle_variables(requirement,
control.variables, root_path,
product)
rationale = get_policy_specific_content('vuldiscussion', rule_object)
row['Vul Discussion'] = handle_variables(rationale, control.variables,
root_path, product)
checktext = get_policy_specific_content('checktext', rule_object)
row['Check'] = handle_variables(checktext, control.variables, root_path, product)
fixtext = get_policy_specific_content('fixtext', rule_object)
row['Fix'] = handle_variables(fixtext, control.variables, root_path,
product)
row['STIGID'] = rule_object.identifiers.get('cce', "")
if control.status is not None:
row['Status'] = DisaStatus.from_string(control.status)
else:
row['Status'] = DisaStatus.AUTOMATED
used_rules.append(selection)
rows.append(row)
return rows
else:
return [no_selections_row(control, srgs)]
def no_selections_row(control, srgs):
row = create_base_row(control, srgs, ssg.build_yaml.Rule('null'))
row['Requirement'] = control.title
row['Status'] = DisaStatus.from_string(control.status)
row['Fix'] = control.fixtext
row['Check'] = control.check
row['Vul Discussion'] = html_plain_text(control.rationale)
row["STIGID"] = ""
return row
def create_base_row(item: ssg.controls.Control, srgs: dict,
rule_object: ssg.build_yaml.Rule) -> dict:
row = dict()
srg_id = item.id
if srg_id not in srgs:
print(f"Unable to find SRG {srg_id}. Id in the control must be a valid SRGID.")
exit(4)
srg = srgs[srg_id]
if 'srg' in rule_object.references:
row['SRGID'] = ','.join(rule_object.references['srg'])
else:
row['SRGID'] = srg_id
if 'disa' in rule_object.references:
row['CCI'] = ','.join(rule_object.references['disa'])
else:
row['CCI'] = srg['cci']
row['SRG Requirement'] = srg['title']
row['SRG VulDiscussion'] = html_plain_text(srg['vuln_discussion'])
row['SRG Check'] = html_plain_text(srg['check'])
row['SRG Fix'] = srg['fix']
row['Severity'] = ssg.build_stig.get_severity(srg.get('severity'))
row['IA Control'] = get_iacontrol(row['SRGID'])
row['Mitigation'] = item.mitigation
row['Artifact Description'] = item.artifact_description
row['Status Justification'] = item.status_justification
return row
def setup_csv_writer(csv_file: TextIO) -> csv.DictWriter:
csv_writer = csv.DictWriter(csv_file, HEADERS)
csv_writer.writeheader()
return csv_writer
def get_rule_json(json_path: str) -> dict:
with open(json_path, 'r') as json_file:
rule_json = json.load(json_file)
return rule_json
def check_paths(control_path: str, rule_json_path: str) -> None:
control_full_path = pathlib.Path(control_path).absolute()
if not control_full_path.exists():
sys.stderr.write(f"Unable to find control file {control_full_path}\n")
exit(5)
rule_json_full_path = pathlib.Path(rule_json_path).absolute()
if not rule_json_full_path.exists():
sys.stderr.write(f"Unable to find rule_dirs.json file {rule_json_full_path}\n")
sys.stderr.write("Hint: run ./utils/rule_dir_json.py\n")
exit(2)
def check_product_value_path(root_path: str, product: str) -> None:
product_value_full_path = pathlib.Path(root_path).joinpath('build')\
.joinpath(product).joinpath('values').absolute()
if not pathlib.Path.exists(product_value_full_path):
sys.stderr.write(f"Unable to find values directory for"
f" {product} in {product_value_full_path}\n")
sys.stderr.write(f"Have you built {product}\n")
exit(6)
def get_policy(args, env_yaml) -> ssg.controls.Policy:
policy = ssg.controls.Policy(args.control, env_yaml=env_yaml)
policy.load()
return policy
def handle_csv_output(output: str, results: list) -> str:
with open(output, 'w') as csv_file:
csv_writer = setup_csv_writer(csv_file)
for row in results:
csv_writer.writerow(row)
return output
def handle_xlsx_output(output: str, product: str, results: list) -> str:
output = output.replace('.csv', '.xlsx')
for row in results:
row['IA Control'] = get_iacontrol(row['SRGID'])
xlsx.handle_dict(results, output, f'{product} SRG Mapping')
return output
def handle_html_output(output: str, product: str, results: list) -> str:
for row in results:
row['IA Control'] = get_iacontrol(row['SRGID'])
output = output.replace('.csv', '.html')
html.handle_dict(results, output, f'{product} SRG Mapping')
return output
def handle_md_output(output: str, product: str, results: list) -> str:
output = output.replace('.csv', '.md')
for row in results:
row['IA Control'] = get_iacontrol(row['SRGID'])
md.handle_dict(results, output, f'{product} SRG Mapping')
return output
def handle_output(output: str, results: list, format_type: str, product: str) -> None:
if format_type == 'csv':
output = handle_csv_output(output, results)
elif format_type == 'xlsx':
output = handle_xlsx_output(output, product, results)
elif format_type == 'md':
output = handle_md_output(output, product, results)
elif format_type == 'html':
output = handle_html_output(output, product, results)
print(f'Wrote output to {output}')
def get_env_yaml(root: str, product_path: str, build_config_yaml: str) -> dict:
product_dir = os.path.join(root, "products", product_path)
product_yaml_path = os.path.join(product_dir, "product.yml")
env_yaml = ssg.environment.open_environment(
build_config_yaml, product_yaml_path, os.path.join(root, "product_properties"))
return env_yaml
def rows_match(old: dict, new: dict) -> bool:
must_match = ['Requirement', 'Fix', 'Check']
for k in must_match:
if old[k] != new[k]:
return False
return True
def merge_rows(old: dict, new: dict) -> None:
old["SRGID"] = old["SRGID"] + "," + new["SRGID"]
def extend_results(results: list, rows: list, prefer_controls: bool) -> None:
# We always extend with rows that are generated based on rule selection
# We also only attempt to merge the new row if we prefer control-based
# policy data over rule-based
if len(rows) > 1 or not prefer_controls:
results.extend(rows)
return
if len(rows) == 0:
return
# If we only have one row, possibly with check and fix from the control
# file and not rules, let's find a row with the same requirement, fix
# and check and if they match, merge
new_row = rows[0]
for r in results:
if rows_match(r, new_row):
merge_rows(r, new_row)
return
# We didn't find any match, so we just add the row as new
results.extend(rows)
def main() -> None:
args = parse_args()
check_paths(args.control, args.json)
check_product_value_path(args.root, args.product)
srgs = ssg.build_stig.parse_srgs(args.manual)
product_dir = os.path.join(args.root, "products", args.product)
env_yaml = get_env_yaml(
args.root, args.product, args.build_config_yaml)
policy = get_policy(args, env_yaml)
rule_json = get_rule_json(args.json)
used_rules = list()
results = list()
for control in policy.controls:
rows = handle_control(args.product, control, env_yaml, rule_json, srgs, used_rules,
args.root, args.prefer_controls)
extend_results(results, rows, args.prefer_controls)
handle_output(args.output, results, args.out_format, args.product)
if __name__ == '__main__':
main()