ComplianceAsCode/content

View on GitHub
tests/automatus.py

Summary

Maintainability
D
1 day
Test Coverage
#!/usr/bin/python3
from __future__ import print_function

import argparse
import contextlib
import logging
import os
import os.path
import re
import sys
import tempfile
import textwrap
import time
from glob import glob

ssg_dir = os.path.join(os.path.dirname(__file__), "..")
sys.path.append(ssg_dir)

from ssg_test_suite.log import LogHelper
import ssg_test_suite.oscap
import ssg_test_suite.test_env
import ssg_test_suite.profile
import ssg_test_suite.rule
import ssg_test_suite.combined
import ssg_test_suite.template
from ssg_test_suite import xml_operations
from ssg.constants import DERIVATIVES_PRODUCT_MAPPING


def parse_args():
    parser = argparse.ArgumentParser()

    common_parser = argparse.ArgumentParser(add_help=False)
    common_parser.set_defaults(test_env=None)

    backends = common_parser.add_mutually_exclusive_group(required=True)

    backends.add_argument(
        "--docker", dest="docker", metavar="BASE_IMAGE",
        help="Use Docker test environment with this base image.")
    backends.add_argument(
        "--container", dest="container", metavar="BASE_IMAGE",
        help="Use container test environment with this base image.")

    backends.add_argument(
        "--libvirt", dest="libvirt", metavar=("HYPERVISOR", "DOMAIN"), nargs=2,
        help="libvirt hypervisor and domain name. When the leading URI driver protocol "
        "is omitted from the hypervisor, qemu:/// protocol is assumed. "
        "Example of a hypervisor domain name tuple: system ssg-test-suite")

    common_parser.add_argument(
        "--datastream", dest="datastream", metavar="DATASTREAM",
        help="Path to the Source data stream on this machine which is going to be tested. "
        "If not supplied, autodetection is attempted by looking into the build directory.")

    common_parser.add_argument(
        "--product", dest="product", metavar="PRODUCT", default=None,
        help="Product to interpret tests as being run under; autodetected from data stream "
        "if it follows the ssg-<product>-ds*.xml naming convention.")

    benchmarks = common_parser.add_mutually_exclusive_group()
    benchmarks.add_argument(
        "--xccdf-id", dest="xccdf_id", metavar="REF-ID", default=None,
        help="Reference ID related to benchmark to be used.")

    benchmarks.add_argument(
        "--xccdf-id-number", dest="xccdf_id_number", metavar="REF-ID-SELECT", type=int, default=0,
        help="Selection number of reference ID related to benchmark to be used.")

    common_parser.add_argument(
            "--add-platform",
            metavar="<CPE REGEX>",
            default=None,
            help="DEPRECATED: Use --remove-platforms instead; "
            "Find all CPEs that are present in local OpenSCAP's CPE dictionary "
            "that match the provided regex, "
            "and add them as platforms to all data stream benchmarks. "
            "If the regex doesn't match anything, it will be treated "
            "as a literal CPE, and added as a platform. "
            "For example, use 'cpe:/o:fedoraproject:fedora:30' or 'enterprise_linux'.")
    common_parser.add_argument(
        "--remove-fips-certified",
        action="store_true",
        help="Remove dependencies on rule installed_OS_is_FIPS_certified from "
        "all OVAL definitions that depend on it.")
    common_parser.add_argument(
            "--remove-platforms",
            default=False,
            action="store_true",
            help="Remove any platforms from the Benchmark XML elements and Profile "
            "XML elements, essentially making the content applicable to any platform. "
            "Although more low level platforms such as packages or container/machine "
            "CPE are still applicable.")
    common_parser.add_argument(
            "--make-applicable-in-containers",
            default=False,
            action="store_true",
            help="Removes platform constraints from rules "
            "to enable testing these rules on container backends.")
    common_parser.add_argument(
            "--remove-ocp4-only",
            default=False,
            action="store_true",
            help="Removes ocp4-only platform constraint from rules "
            "to enable testing these rules on ocp4 backends.")
    common_parser.add_argument("--loglevel",
                               dest="loglevel",
                               metavar="LOGLEVEL",
                               default="INFO",
                               help="Default level of console output")
    common_parser.add_argument("--logdir",
                               dest="logdir",
                               metavar="LOGDIR",
                               default=None,
                               help="Directory to which all output is saved")

    common_parser.add_argument(
        "--mode",
        dest="scanning_mode",
        default="online",
        choices=("online", "offline"),
        help="What type of check to use - either "
        "Online check done by running oscap inside the concerned system, or "
        "offline check that examines the filesystem from the host "
        "(either may require extended privileges).")

    common_parser.add_argument(
        "--remediate-using",
        dest="remediate_using",
        default="oscap",
        choices=ssg_test_suite.oscap.REMEDIATION_RULE_RUNNERS.keys(),
        help="What type of remediations to use - openscap online one, "
        "or remediation done by using remediation roles "
        "that are saved to disk beforehand.")

    common_parser.add_argument(
        "--duplicate-templates",
        dest="duplicate_templates",
        default=False,
        action="store_true",
        help="Execute all tests even for tests using shared templates; "
        "otherwise, executes one test per template type")

    common_parser.add_argument(
        "--keep-snapshots",
        dest="keep_snapshots",
        action="store_true",
        help="Do not remove existing snapshots created during previous tests.")

    subparsers = parser.add_subparsers(dest="subparser_name",
                                       help="Subcommands: profile, rule, combined")
    subparsers.required = True

    parser_profile = subparsers.add_parser("profile",
                                           formatter_class=argparse.RawDescriptionHelpFormatter,
                                           epilog=textwrap.dedent("""\
                    In case that tested profile contains rules which might prevent root ssh access
                    to the testing VM consider unselecting these rules. To unselect certain rules
                    from a data stream use `ds_unselect_rules.sh` script. List of such rules already
                    exists, see `unselect_rules_list` file.
                    Example usage:
                        ./ds_unselect_rules.sh ../build/ssg-fedora-ds.xml unselect_rules_list
                                           """),
                                           help=("Testing profile-based "
                                                 "remediation applied on already "
                                                 "installed machine"),
                                           parents=[common_parser])
    parser_profile.set_defaults(func=ssg_test_suite.profile.perform_profile_check)
    parser_profile.add_argument("target",
                                nargs="+",
                                metavar="DSPROFILE",
                                help=("Profiles to be tested, 'ALL' means every "
                                      "profile of particular benchmark will be "
                                      "evaluated."))

    parser_rule = subparsers.add_parser("rule",
                                        help=("Testing remediations of particular "
                                              "rule for various situations"),
                                        parents=[common_parser])
    parser_rule.set_defaults(func=ssg_test_suite.rule.perform_rule_check)
    parser_rule.add_argument(
        "target",
        nargs="+",
        metavar="RULE",
        help=(
            "Rule or rules to be tested. Special value 'ALL' means every "
            "rule-testing scenario will be evaluated. The SSG rule ID prefix "
            "is appended automatically if not provided. Wildcards to match "
            "multiple rules are accepted."
            )
        )
    parser_rule.add_argument("--debug",
                             dest="manual_debug",
                             action="store_true",
                             help=("If an error is encountered, all execution "
                                   "on the VM / container will pause to allow "
                                   "debugging."))
    parser_rule.add_argument("--dontclean",
                             dest="dont_clean",
                             action="store_true",
                             help="Do not remove html reports of successful runs")
    parser_rule.add_argument("--no-reports",
                             dest="no_reports",
                             action="store_true",
                             help="Do not run oscap with --report and --results options")
    parser_rule.add_argument("--scenarios",
                             dest="scenarios_regex",
                             default=None,
                             help="Regular expression matching test scenarios to run")
    parser_rule.add_argument("--profile",
                             dest="scenarios_profile",
                             default=None,
                             help="Override the profile used for test scenarios."
                                  " Variable selections will be done according "
                                  "to this profile.")
    parser_rule.add_argument("--slice",
                             dest='_slices',
                             # real dest is postprocessed later:
                             # 'slice_current' and 'slice_total'
                             metavar=('X', 'Y'),
                             default=[1, 1],
                             nargs=2,
                             type=int,
                             help=("Allows to run only Xth slice of Y in total, to enable "
                                   "stable parallelization of the bigger test sets."))

    parser_combined = subparsers.add_parser("combined",
                                            help=("Tests all rules in a profile evaluating them "
                                                  "against their test scenarios."),
                                            parents=[common_parser])
    parser_combined.set_defaults(func=ssg_test_suite.combined.perform_combined_check)
    parser_combined.add_argument("--dontclean",
                                 dest="dont_clean",
                                 action="store_true",
                                 help="Do not remove html reports of successful runs")
    parser_combined.add_argument("--no-reports",
                                 dest="no_reports",
                                 action="store_true",
                                 help="Do not run oscap with --report and --results options")
    parser_combined.add_argument("--scenarios",
                                 dest="scenarios_regex",
                                 default=None,
                                 help="Regular expression matching test scenarios to run")
    parser_combined.add_argument("--slice",
                                 dest='_slices',
                                 # real dest is postprocessed later:
                                 # 'slice_current' and 'slice_total'
                                 metavar=('X', 'Y'),
                                 default=[1, 1],
                                 nargs=2,
                                 type=int,
                                 help=("Allows to run only Xth slice of Y in total, to enable "
                                       "stable parallelization of the bigger test sets."))
    parser_combined.add_argument("target",
                                 nargs="+",
                                 metavar="TARGET",
                                 help=("Profiles whose rules are to be tested. Each rule selected "
                                       "in a profile will be evaluated against all its test "
                                       "scenarios."))

    parser_template = subparsers.add_parser("template",
                                            help=("Tests all rules in a template evaluating them "
                                                  "against their test scenarios."),
                                            parents=[common_parser])
    parser_template.set_defaults(func=ssg_test_suite.template.perform_template_check)
    parser_template.add_argument("--dontclean",
                                 dest="dont_clean",
                                 action="store_true",
                                 help="Do not remove html reports of successful runs")
    parser_template.add_argument("--no-reports",
                                 dest="no_reports",
                                 action="store_true",
                                 help="Do not run oscap with --report and --results options")
    parser_template.add_argument("--scenarios",
                                 dest="scenarios_regex",
                                 default=None,
                                 help="Regular expression matching test scenarios to run")
    parser_template.add_argument("--profile",
                                 dest="scenarios_profile",
                                 default=None,
                                 help="Override the profile used for test scenarios."
                                      " Variable selections will be done according "
                                      "to this profile.")
    parser_template.add_argument("--slice",
                                 dest='_slices',
                                 # real dest is postprocessed later:
                                 # 'slice_current' and 'slice_total'
                                 metavar=('X', 'Y'),
                                 default=[1, 1],
                                 nargs=2,
                                 type=int,
                                 help=("Allows to run only Xth slice of Y in total, to enable "
                                       "stable parallelization of the bigger test sets."))
    parser_template.add_argument("target",
                                 nargs="+",
                                 metavar="TARGET",
                                 help=("Template whose rules are to be tested. Each rule using"
                                       "a template will be evaluated against all its test "
                                       "scenarios."))

    options = parser.parse_args()
    if options.subparser_name in ["rule", "combined", "template"]:
        options.slice_current, options.slice_total = options._slices
        if options.slice_current < 1:
            raise argparse.ArgumentTypeError('Current slice needs to be positive integer')
        if options.slice_total < 1:
            raise argparse.ArgumentTypeError('Number of slices needs to be positive integer')
        if options.slice_current > options.slice_total:
            raise argparse.ArgumentTypeError(
                'Current slice cannot be greater than number of slices')
    return options


def get_logging_dir(options):
    body = 'custom'
    if 'ALL' in options.target:
        body = 'ALL'

    generic_logdir_stem = "{0}-{1}".format(options.subparser_name, body)

    if options.logdir is None:

        date_string = time.strftime('%Y-%m-%d-%H%M', time.localtime())
        logging_dir = os.path.join(
            os.getcwd(), 'logs', '{0}-{1}'.format(
                generic_logdir_stem, date_string))
        logging_dir = LogHelper.find_name(logging_dir)
    else:
        logging_dir = LogHelper.find_name(options.logdir)

    return logging_dir


def _print_available_benchmarks(xccdf_ids, n_xccdf_ids):
    logging.info("The data stream contains {0} Benchmarks".format(n_xccdf_ids))
    for i in range(0, n_xccdf_ids):
        logging.info("{0} - {1}".format(i, xccdf_ids[i]))


def auto_select_xccdf_id(datastream, bench_number):
    xccdf_ids = xml_operations.get_all_xccdf_ids_in_datastream(datastream)
    n_xccdf_ids = len(xccdf_ids)

    if n_xccdf_ids == 0:
        msg = ("The provided data stream doesn't contain any Benchmark")
        raise RuntimeError(msg)

    if bench_number < 0 or bench_number >= n_xccdf_ids:
        _print_available_benchmarks(xccdf_ids, n_xccdf_ids)
        logging.info("Selected Benchmark is {0}".format(bench_number))

        msg = ("Please select a valid Benchmark number")
        raise RuntimeError(msg)

    if n_xccdf_ids > 1:
        _print_available_benchmarks(xccdf_ids, n_xccdf_ids)
        logging.info("Selected Benchmark is {0}".format(bench_number))

        logging.info("To select a different Benchmark, "
                     "use --xccdf-id-number option.")

    return xccdf_ids[bench_number]


def get_datastreams():
    ds_glob = "ssg-*-ds.xml"
    build_dir_path = [os.path.dirname(__file__) or ".", "..", "build"]
    glob_pattern = os.path.sep.join(build_dir_path + [ds_glob])
    datastreams = [os.path.normpath(p) for p in glob(glob_pattern)]
    return datastreams


def get_unique_datastream():
    datastreams = get_datastreams()
    if len(datastreams) == 1:
        return datastreams[0]
    msg = ("Autodetection of the data stream file is possible only when there is "
           "a single one in the build dir, but")
    if not datastreams:
        raise RuntimeError(msg + " there is none.")
    raise RuntimeError(
        msg + " there are {0} of them. Use the --datastream option to select "
        "e.g. {1}".format(len(datastreams), datastreams))


def get_product_id(ds_filename):
    product_regex = re.compile(r'^.*ssg-([a-zA-Z0-9]*)-(ds|ds-1\.2)\.xml$')
    match = product_regex.match(ds_filename)
    if not match:
        msg = "Unable to detect product without explicit --product: "
        msg += "data stream {0} lacks product name".format(ds_filename)
        raise RuntimeError(msg)
    product = match.group(1)
    if product in DERIVATIVES_PRODUCT_MAPPING:
        product = DERIVATIVES_PRODUCT_MAPPING[product]
    return product


@contextlib.contextmanager
def datastream_in_stash(current_location):
    tfile = tempfile.NamedTemporaryFile(prefix="ssgts-ds-")

    tfile.write(open(current_location, "rb").read())
    tfile.flush()
    yield tfile.name


def normalize_passed_arguments(options):
    targets = []
    for target in options.target:
        if ',' in target:
            targets.extend(target.split(","))
        else:
            targets.append(target)
    options.target = targets

    if 'ALL' in options.target:
        options.target = ['ALL']

    if not options.datastream:
        options.datastream = get_unique_datastream()

    if not options.product and options.datastream:
        options.product = get_product_id(options.datastream)

    if options.xccdf_id is None:
        options.xccdf_id = auto_select_xccdf_id(options.datastream,
                                                options.xccdf_id_number)
    try:
        bench_id = xml_operations.infer_benchmark_id_from_component_ref_id(
            options.datastream, options.xccdf_id)
        options.benchmark_id = bench_id
    except RuntimeError as exc:
        msg = "Error inferring benchmark ID from component refId: {}".format(str(exc))
        raise RuntimeError(msg)

    if options.docker:
        options.test_env = ssg_test_suite.test_env.DockerTestEnv(
            options.scanning_mode, options.docker)
        logging.info(
            "The base image option has been specified, "
            "choosing Docker-based test environment.")
    elif options.container:
        options.test_env = ssg_test_suite.test_env.PodmanTestEnv(
            options.scanning_mode, options.container)
        logging.info(
            "The base image option has been specified, "
            "choosing Podman-based test environment.")
    else:
        hypervisor, domain_name = options.libvirt
        # Possible hypervisor spec we have to catch: qemu+unix:///session
        if not re.match(r"[\w\+]+:///", hypervisor):
            hypervisor = "qemu:///" + hypervisor
        options.test_env = ssg_test_suite.test_env.VMTestEnv(
            options.scanning_mode, hypervisor, domain_name, options.keep_snapshots)
        logging.info(
            "The base image option has not been specified, "
            "choosing libvirt-based test environment.")

    # Add in product to the test environment. This is independent of actual
    # test environment type so we do it after creation.
    options.test_env.product = options.product
    options.test_env.duplicate_templates = options.duplicate_templates

    try:
        benchmark_cpes = xml_operations.benchmark_get_applicable_platforms(
            options.datastream, options.benchmark_id
        )
        options.benchmark_cpes = benchmark_cpes
    except RuntimeError as exc:
        msg = "Error inferring platform from benchmark: {}".format(str(exc))
        raise RuntimeError(msg)


def main():
    options = parse_args()

    log = logging.getLogger()
    # this is general logger level - needs to be
    # debug otherwise it cuts silently everything
    log.setLevel(logging.DEBUG)

    LogHelper.add_console_logger(log, options.loglevel)

    try:
        normalize_passed_arguments(options)
    except RuntimeError as exc:
        msg = "Error occurred during options normalization: {}".format(str(exc))
        logging.error(msg)
        sys.exit(1)
    # logging dir needs to be created based on other options
    # thus we have to postprocess it

    logging_dir = get_logging_dir(options)

    LogHelper.add_logging_dir(log, logging_dir)

    with datastream_in_stash(options.datastream) as stashed_datastream:
        options.datastream = stashed_datastream

        with xml_operations.datastream_root(stashed_datastream, stashed_datastream) as root:
            if options.remove_platforms:
                xml_operations.remove_platforms(root)
            if options.make_applicable_in_containers:
                xml_operations.make_applicable_in_containers(root)
            if options.remove_ocp4_only:
                xml_operations.remove_ocp4_platforms(root)
            if options.add_platform:
                xml_operations.add_platform_to_benchmark(root, options.add_platform)
            if options.remove_fips_certified:
                xml_operations.remove_fips_certified(root)

        options.func(options)


if __name__ == "__main__":
    main()