build-scripts/compose_ds.py
#!/usr/bin/python3
import argparse
import os
import sys
import time
import glob
import xml.etree.ElementTree as ET
from ssg.build_sce import collect_sce_checks
from ssg.constants import (
cat_namespace, datastream_namespace, oval_namespace, sce_namespace,
XCCDF12_NS, xlink_namespace)
import ssg.xml
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
ID_NS = "org.open-scap"
component_ref_prefix = "#scap_org.open-scap_cref_"
# Inspired by openscap ds_sds_mangle_filepath() function
def mangle_path(path):
path = path.replace('/', '-')
path = path.replace('@', '-')
path = path.replace('~', '-')
return path
# From the list generated by collect_sce_checks, extract the path to the check content,
# and embed the script into the data stream
def embed_sce_checks_in_datastream(datastreamtree, checklists, checks, build_dir):
sce_files = collect_sce_checks(datastreamtree)
for file in sce_files:
path = os.path.join(build_dir, file)
mangled_path = mangle_path(file)
with open(path, 'rt', encoding='utf8') as fd:
sce_script_content = fd.read()
component_id = "scap_{}_ecomp_{}".format(ID_NS, mangled_path)
component = ET.SubElement(
datastreamtree, '{%s}extended-component' % datastream_namespace,
attrib={
'id': component_id,
'timestamp': get_timestamp(path)
})
# Append the file content
script_data = ET.SubElement(component, '{%s}script' % sce_namespace)
script_data.text = sce_script_content
# Create a component reference to map the checklist to the extended component
component_ref_id = "scap_{}_cref_{}".format(ID_NS, mangled_path)
component_ref = ET.SubElement(
checks, '{%s}component-ref' % datastream_namespace,
attrib={
'id': component_ref_id,
('{%s}href' % xlink_namespace): '#' + component_id
})
# Add the component reference to the catalog of XCCDF checklists
checklists_component_ref = checklists.find(
"{%s}component-ref" % datastream_namespace)
catalog = checklists_component_ref.find('{%s}catalog' % cat_namespace)
uri = ET.SubElement(
catalog, '{%s}uri' % cat_namespace,
attrib={
'name': file,
'uri': '#' + component_ref_id
})
def move_patches_up_to_date_to_source_data_stream_component(datastreamtree):
ds_checklists = datastreamtree.find(
".//{%s}checklists" % datastream_namespace)
checklists_component_ref = ds_checklists.find(
"{%s}component-ref" % datastream_namespace)
checklists_component_ref_id = checklists_component_ref.get('id')
# The component ID is the component-ref href without leading '#'
checklists_component_id = checklists_component_ref.get(
'{%s}href' % xlink_namespace)[1:]
# Locate the <xccdf:check> element of an <xccdf:Rule> with id
# security_patches_up_to_date
checklist_component = None
oval_check = None
ds_components = datastreamtree.findall(
".//{%s}component" % datastream_namespace)
for ds_component in ds_components:
if ds_component.get('id') == checklists_component_id:
checklist_component = ds_component
if checklist_component is None:
# Something strange happened
sys.stderr.write(
"Couldn't find <component> %s referenced by <component-ref> %s" % (
checklists_component_id, checklists_component_ref_id))
sys.exit(1)
rules = checklist_component.findall(".//{%s}Rule" % XCCDF12_NS)
for rule in rules:
if rule.get('id').endswith('rule_security_patches_up_to_date'):
rule_checks = rule.findall("{%s}check" % XCCDF12_NS)
for check in rule_checks:
if check.get('system') == oval_namespace:
oval_check = check
break
if oval_check is None:
# The component doesn't have a security patches up to date rule
# with an OVAL check
return
# SCAP 1.3 demands multi-check true if the Rules
# security_patches_up_to_date is evaluated by multiple OVAL patch class
# definitinos. See 3.2.4.3, SCAP 1.3 standard (NIST.SP.800-126r3)
oval_check.set('multi-check', 'true')
check_content_ref = oval_check.find('{%s}check-content-ref' % XCCDF12_NS)
href_url = check_content_ref.get('href')
# Use URL's path to define the component name and URI
# Path attribute returned from urlparse contains a leading '/', when
# mangling it it will get replaced by '-'. Let's strip the '/' to avoid
# a sequence of "_-" in the component-ref ID.
component_ref_name = mangle_path(urlparse(href_url).path[1:])
component_ref_uri = component_ref_prefix + component_ref_name
# update @href to refer the datastream component name
check_content_ref.set('href', component_ref_name)
# Add a uri refering the component in Rule's Benchmark component-ref
# catalog
uri_exists = False
catalog = checklists_component_ref.find('{%s}catalog' % cat_namespace)
uris = catalog.findall("{%s}uri" % cat_namespace)
for uri in uris:
if uri.get('name') == component_ref_name:
uri_exists = True
return
if not uri_exists:
uri = ET.Element('{%s}uri' % cat_namespace)
uri.set('name', component_ref_name)
uri.set('uri', component_ref_uri)
catalog.append(uri)
# The component-ref ID is the catalog uri without leading '#'
component_ref_feed_id = component_ref_uri[1:]
# Add the component-ref to list of datastreams' checks
check_component_ref_exists = False
ds_checks = datastreamtree.find(".//{%s}checks" % datastream_namespace)
check_component_refs = ds_checks.findall(
"{%s}component-ref" % datastream_namespace)
for check_component_ref in check_component_refs:
if check_component_ref.get('id') == component_ref_feed_id:
check_component_ref_exists = True
return
if not check_component_ref_exists:
component_ref_feed = ET.Element(
'{%s}component-ref' % datastream_namespace)
component_ref_feed.set('id', component_ref_feed_id)
component_ref_feed.set('{%s}href' % xlink_namespace, href_url)
ds_checks.append(component_ref_feed)
def parse_args():
parser = argparse.ArgumentParser(description="Compose an SCAP source data \
stream from individual SCAP components")
parser.add_argument("--build-dir", help="the '/build' directory")
parser.add_argument("--xccdf", help="XCCDF 1.2 checklist file name")
parser.add_argument("--oval", help="OVAL file name")
parser.add_argument("--ocil", help="OCIL file name")
parser.add_argument("--cpe-dict", help="CPE dictionary file name")
parser.add_argument("--cpe-oval", help="CPE OVAL file name")
parser.add_argument("--enable-sce", action='store_true', help="Enable building sce data")
parser.add_argument(
"--output", required=True,
help="Output SCAP 1.3 source data stream file name")
parser.add_argument(
"--multiple-ds",
help="Directory where XCCDF, OVAL, OCIL files with lower case prefixes "
"xccdf, oval, ocil are stored to build multiple data streams. "
"Multiple streams are generated in the thin_ds subdirectory. (off: to disable) "
"e.g.: ~/scap-security-guide/build/rhel9/thin_ds_component/",
)
return parser.parse_args()
def get_timestamp(file_name):
source_date_epoch = os.getenv("SOURCE_DATE_EPOCH")
if source_date_epoch:
time_sec = float(source_date_epoch)
else:
time_sec = os.path.getmtime(file_name)
timestamp = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time_sec))
return timestamp
def add_component(
ds_collection, component_ref_parent, component_file_name,
dependencies=None):
if not os.path.exists(component_file_name):
return
component_id = "scap_%s_comp_%s" % (
ID_NS, os.path.basename(component_file_name))
component = ET.SubElement(
ds_collection, "{%s}component" % datastream_namespace)
component.set("id", component_id)
component.set("timestamp", get_timestamp(component_file_name))
component_ref = ET.SubElement(
component_ref_parent, "{%s}component-ref" % datastream_namespace)
component_ref_id = "scap_%s_cref_%s" % (
ID_NS, os.path.basename(component_file_name))
component_ref.set("id", component_ref_id)
component_ref.set("{%s}href" % xlink_namespace, "#" + component_id)
component_root = ET.parse(component_file_name).getroot()
component.append(component_root)
if dependencies:
create_catalog(component_ref, dependencies)
def create_catalog(component_ref, dependencies):
dependencies = [dep for dep in dependencies if os.path.exists(dep)]
if len(dependencies) == 0:
return
catalog = ET.SubElement(component_ref, "{%s}catalog" % cat_namespace)
for dep in dependencies:
uri = ET.SubElement(catalog, "{%s}uri" % cat_namespace)
dep_base_name = os.path.basename(dep)
uri.set("name", dep_base_name)
uri.set("uri", "#scap_%s_cref_%s" % (ID_NS, dep_base_name))
def compose_ds(
build_dir, xccdf_file_name, oval_file_name, ocil_file_name,
cpe_dict_file_name, cpe_oval_file_name, sce_enabled):
ds_collection = ET.Element(
"{%s}data-stream-collection" % datastream_namespace)
name = "from_xccdf_" + os.path.basename(xccdf_file_name)
ds_collection.set("id", "scap_%s_collection_%s" % (ID_NS, name))
ds_collection.set("schematron-version", "1.3")
ds = ET.SubElement(ds_collection, "{%s}data-stream" % datastream_namespace)
ds.set("id", "scap_%s_datastream_%s" % (ID_NS, name))
ds.set("scap-version", "1.3")
ds.set("use-case", "OTHER")
dictionaries = ET.SubElement(ds, "{%s}dictionaries" % datastream_namespace)
checklists = ET.SubElement(ds, "{%s}checklists" % datastream_namespace)
checks = ET.SubElement(ds, "{%s}checks" % datastream_namespace)
cpe_dict_dependencies = [cpe_oval_file_name]
add_component(
ds_collection, dictionaries, cpe_dict_file_name, cpe_dict_dependencies)
xccdf_dependencies = [oval_file_name, ocil_file_name, cpe_oval_file_name]
add_component(
ds_collection, checklists, xccdf_file_name, xccdf_dependencies)
add_component(ds_collection, checks, oval_file_name)
add_component(ds_collection, checks, ocil_file_name)
add_component(ds_collection, checks, cpe_oval_file_name)
if sce_enabled:
embed_sce_checks_in_datastream(ds_collection, checklists, checks, build_dir)
if hasattr(ET, "indent"):
ET.indent(ds_collection, space=" ", level=0)
ds = ET.ElementTree(ds_collection)
# Move reference to remote OVAL content to a source data stream component
move_patches_up_to_date_to_source_data_stream_component(ds)
return ds
def _store_ds(ds, output):
ds.write(output, xml_declaration=True, encoding="utf-8")
def append_id_to_file_name(path, id_):
return "{0}_{2}{1}".format(*os.path.splitext(path) + (id_,))
def add_dir(path, dir):
return os.path.join(os.path.dirname(path), dir, os.path.basename(path))
def _get_thin_ds_output_path(output, file_name):
return add_dir(
append_id_to_file_name(
output,
os.path.splitext(os.path.basename(file_name))[0]
),
"thin_ds"
)
def _compose_multiple_ds(args):
for xccdf in glob.glob("{}/xccdf*.xml".format(args.multiple_ds)):
oval = xccdf.replace("xccdf", "oval")
ocil = xccdf.replace("xccdf", "ocil")
cpe_dict = xccdf.replace("xccdf", "cpe_dict")
cpe_oval = xccdf.replace("xccdf", "cpe_oval")
ds = compose_ds(
args.build_dir, xccdf, oval, ocil, cpe_dict, cpe_oval, args.enable_sce
)
output = _get_thin_ds_output_path(args.output, xccdf.replace("xccdf_", ""))
if not os.path.exists(os.path.dirname(output)):
os.makedirs(os.path.dirname(output))
_store_ds(ds, output)
if __name__ == "__main__":
args = parse_args()
ssg.xml.register_namespaces()
if args.multiple_ds != "off":
_compose_multiple_ds(args)
ds = compose_ds(
args.build_dir, args.xccdf, args.oval, args.ocil, args.cpe_dict,
args.cpe_oval, args.enable_sce
)
_store_ds(ds, args.output)