ComplianceAsCode/content

View on GitHub
utils/ansible_playbook_to_role.py

Summary

Maintainability
C
1 day
Test Coverage
#!/usr/bin/python3

from __future__ import print_function

from tempfile import mkdtemp
import io
import os
import os.path
import sys
import shutil
import re
import argparse
import getpass
import yaml
import collections


SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
PLAYBOOK_ROOT = os.path.join(SSG_ROOT, "build", "ansible")

try:
    from github import Github, InputGitAuthor, UnknownObjectException
except ImportError:
    print("Please install PyGithub, on Fedora it's in the python-PyGithub package.",
          file=sys.stderr)
    raise SystemExit(1)


try:
    import ssg.ansible
    import ssg.yaml
    from ssg.utils import mkdir_p
except ImportError:
    print("Unable to find the ssg module. Please run 'source .pyenv'", file=sys.stderr)
    raise SystemExit(1)


def memoize(f):
    memo = {}

    def helper(x):
        if x not in memo:
            memo[x] = f(x)
        return memo[x]
    return helper

# The following code preserves ansible yaml order
# code from arcaduf's gist
# https://gist.github.com/arcaduf/8edbe5900372f0dd30aa037272dfe826
_mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG


def dict_representer(dumper, data):
    return dumper.represent_mapping(_mapping_tag, data.items())


def dict_constructor(loader, node):
    return collections.OrderedDict(loader.construct_pairs(node))


yaml.add_representer(collections.OrderedDict, dict_representer)
yaml.add_constructor(_mapping_tag, dict_constructor)
# End arcaduf gist

PRODUCT_ALLOWLIST = set([
    "rhel8",
    "rhel9",
])

PROFILE_ALLOWLIST = set([
    "anssi_nt28_enhanced",
    "anssi_nt28_high",
    "anssi_nt28_intermediary",
    "anssi_nt28_minimal",
    "anssi_bp28_enhanced",
    "anssi_bp28_high",
    "anssi_bp28_intermediary",
    "anssi_bp28_minimal",
    "C2S",
    "cis",
    "cjis",
    "hipaa",
    "cui",
    "ospp",
    "pci-dss",
    "rht-ccp",
    "stig",
    "rhvh-stig",
    "rhvh-vpp",
    "e8",
    "ism",
])


ORGANIZATION_NAME = "RedHatOfficial"
GIT_COMMIT_AUTHOR_NAME = "ComplianceAsCode development team"
GIT_COMMIT_AUTHOR_EMAIL = "scap-security-guide@lists.fedorahosted.org"
META_TEMPLATE_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "ansible_galaxy_meta_template.yml"
)
README_TEMPLATE_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "ansible_galaxy_readme_template.md"
)


def create_empty_repositories(github_new_repos, github_org):
    for github_new_repo in github_new_repos:
        print("Creating new Github repository: %s" % github_new_repo)
        github_org.create_repo(
            github_new_repo,
            description="Role generated from ComplianceAsCode Project",
            homepage="https://github.com/ComplianceAsCode/content/",
            private=False,
            has_issues=False,
            has_wiki=False,
            has_downloads=False)


def clone_and_init_repository(parent_dir, organization, repo):
    os.system(
        "git clone git@github.com:%s/%s" % (organization, repo))
    os.system("ansible-galaxy init " + repo + " --force")
    os.chdir(repo)
    try:
        os.system('git add .')
        os.system('git commit -a -m "Initial commit" --author "%s <%s>"'
                  % (GIT_COMMIT_AUTHOR_NAME, GIT_COMMIT_AUTHOR_EMAIL))
        os.system('git push origin main')
    finally:
        os.chdir("..")


def update_repo_release(github, repo):
    repo_tags = [tag for tag in repo.get_tags()]
    try:
        (majv, minv, rel) = repo_tags[0].name.split(".")
        rel = int(rel) + 1
    except IndexError:
        cac = github.get_repo("ComplianceAsCode/content")
        cac_tags = [tag for tag in cac.get_tags() if tag.name != "v0.5.0-InitialDraft"]
        (majv, minv, rel) = cac_tags[0].name.strip("v").split(".")

    new_tag = ("%s.%s.%s" % (majv, minv, rel))
    commits = repo.get_commits()
    print("Tagging new release '%s' for repo '%s'" % (new_tag, repo.name))
    repo.create_git_tag_and_release(new_tag, '', '', '', commits[0].sha, 'commit')


class PlaybookToRoleConverter():
    PRODUCED_FILES = ('defaults/main.yml', 'meta/main.yml', 'tasks/main.yml', 'vars/main.yml',
                      'README.md')

    def __init__(self, local_playbook_filename):
        self._local_playbook_filename = local_playbook_filename

        # ansible language doesn't allow pre_tasks for roles, if the only pre task
        # is the ansible version check we can ignore it because the minimal version
        # is in role metadata
        if "pre_tasks" in self._playbook[0]:
            pre_tasks_data = self._playbook[0]["pre_tasks"]
            if len(pre_tasks_data) == 1 and \
                    pre_tasks_data[0]["name"] == \
                    ssg.ansible.ansible_version_requirement_pre_task_name:
                pass
            else:
                sys.stderr.write(
                    "%s contains pre_tasks other than the version check. "
                    "pre_tasks are not supported for ansible roles and "
                    "will be skipped!.\n")

    @property
    @memoize
    def name(self):
        root, _ = os.path.splitext(os.path.basename(self._local_playbook_filename))
        product, _, profile = root.split("-", 2)
        return "%s_%s" % (product, profile.replace("-", "_").lower())

    @property
    @memoize
    def product(self):
        # Returns the first part [product] of name.
        # ex: rhel7_stig
        # returns: rhel7
        return self.name.split("_")[0]

    @property
    @memoize
    def profile(self):
        # Returns the second part [profile] of name.
        # ex: rhe7_anssi_nt28_enhanced
        # returns: anssi_nt28_enhanced
        return self.name.split("_", 1)[1]

    @property
    @memoize
    def tasks_data(self):
        return self._playbook[0]["tasks"] if "tasks" in self._playbook[0] else []

    @property
    @memoize
    def tasks_local_content(self):
        return yaml.dump(self.tasks_data, width=120, default_flow_style=False) \
            .replace('\n- ', '\n\n- ')

    @property
    @memoize
    def default_vars_data(self):
        return self._playbook[0]["vars"] if "vars" in self._playbook[0] else []

    @property
    @memoize
    def added_variables(self):
        variables = set()
        for task in self.tasks_data:
            if "tags" not in task:
                next
            if "when" not in task:
                task["when"] = []
            elif isinstance(task["when"], str):
                task["when"] = [task["when"]]

            variables_to_add = {self._sanitize_tag(tag)
                                for tag in task["tags"] if self._tag_is_valid_variable(tag)}
            task["when"] = ["{varname} | bool".format(
                varname=v) for v in sorted(variables_to_add)] + task["when"]
            variables.update(variables_to_add)

            if not task["when"]:
                del task["when"]
        return variables

    @property
    def vars_data(self):
        return []

    @property
    @memoize
    def title(self):
        try:
            title = re.search(r'Profile Title:\s+(.+)$', self._description, re.MULTILINE).group(1)
            return '"' + title + '"'
        except AttributeError:
            return re.search(r'Ansible Playbook for\s+(.+)$', self._description, re.MULTILINE) \
                     .group(1)

    @property
    @memoize
    def description_md(self):
        # This is for a role and not a playbook
        description = re.sub(r'Playbook', "Role", self._description)

        # Fix the description format for markdown so that it looks pretty
        return description.replace('\n', '  \n')

    @property
    @memoize
    def _playbook(self):
        return ssg.yaml.ordered_load(self._raw_playbook)

    @property
    @memoize
    def _raw_playbook(self):
        with io.open(self._local_playbook_filename, 'r', encoding="utf-8") as f:
            return f.read()

    @property
    @memoize
    def platform_version(self):
        platform = self.product
        # Check to see if this is RHEL product
        if platform in PRODUCT_ALLOWLIST:
            # For RHEL, we can get what version
            if 'rhel' in platform:
                return platform[-1]
            return "7\n    - 8"
        return "TBD"

    @property
    @memoize
    def _description(self):
        separator = "#" * 79
        offset_from_separator = 3
        first_separator_pos = self._raw_playbook.find(separator)
        second_separator_pos = self._raw_playbook.find(separator,
                                                       first_separator_pos + len(separator))
        description_start = first_separator_pos + len(separator) + offset_from_separator
        description_stop = second_separator_pos - offset_from_separator
        description = self._raw_playbook[description_start:description_stop]
        description = description.replace('# ', '')
        description = description.replace('#', '')

        desc = ""
        # Remove SCAP and Playbook examples from description as they don't belong in roles.
        for line in description.split("\n"):
            if line.startswith("Profile ID:"):
                break
            else:
                desc += (line + "\n")
        return desc.strip("\n\n")

    @property
    def _update_galaxy_tags(self):
        galaxy_tags = {}
        # These are the default tags that all roles share
        tags = [
            "system",
            "hardening",
            "openscap",
            "ssg",
            "scap",
            "security",
            "compliance",
            "complianceascode",
            "redhatofficial",
            "redhat",
        ]
        prod = self.product
        prof = self.profile

        tags.append(prod)
        tags.append(prof.replace("_", ""))

        if prof == 'stig':
            tags.append("disa")

        if 'anssi' in prof:
            tags.append("anssi")

        galaxy_tags['galaxy_tags'] = tags
        return galaxy_tags

    def _tag_is_valid_variable(self, tag):
        if "DISA-STIG" in tag:
            return True

        # rules of kind package_* and service_* can have hyphen in their rule IDs
        pattern = re.compile('(package_.*_(installed|removed))|(service_.*_(enabled|disabled))')
        if pattern.match(tag):
            return True

        return '-' not in tag and tag != 'always'

    def _sanitize_tag(self, tag):
        return tag.replace("-", "_")

    def file(self, filepath):
        if filepath == 'tasks/main.yml':
            return self.tasks_local_content
        elif filepath == 'vars/main.yml':
            if len(self.vars_data) < 1:
                return "---\n# defaults file for {role_name}\n".format(role_name=self.name)
            else:
                return yaml.dump(self.vars_data, width=120, indent=4, default_flow_style=False)
        elif filepath == 'README.md':
            return self._generate_readme_content()
        elif filepath == 'defaults/main.yml':
            return self._generate_defaults_content()
        elif filepath == 'meta/main.yml':
            return self._generate_meta_content()

    def _generate_readme_content(self):
        with io.open(README_TEMPLATE_PATH, 'r',  encoding="utf-8") as f:
            readme_template = f.read()

        local_readme_content = readme_template.replace(
            "@DESCRIPTION@", self.description_md)
        local_readme_content = local_readme_content.replace(
            "@TITLE@", self.title)
        local_readme_content = local_readme_content.replace(
            "@MIN_ANSIBLE_VERSION@", ssg.ansible.min_ansible_version)
        local_readme_content = local_readme_content.replace(
            "@ROLE_NAME@", self.name)
        return local_readme_content

    def _generate_meta_content(self):
        with open(META_TEMPLATE_PATH, 'r') as f:
            meta_template = f.read()
        local_meta_content = meta_template.replace(
            "@ROLE_NAME@", self.name)
        local_meta_content = local_meta_content.replace(
            "@DESCRIPTION@", self.title)
        local_meta_content = local_meta_content.replace(
            "@PLATFORM_VERSION@", self.platform_version)
        local_meta_content = local_meta_content.replace(
            "@GALAXY_TAGS@", yaml.dump(self._update_galaxy_tags).replace("- ", "  - "))
        return local_meta_content.replace(
            "@MIN_ANSIBLE_VERSION@", ssg.ansible.min_ansible_version)

    def _generate_defaults_content(self):
        default_vars_to_add = sorted(self.added_variables)
        default_vars_local_content = yaml.dump(self.default_vars_data, width=120, indent=4,
                                               default_flow_style=False)
        header = [
            "---", "# defaults file for {role_name}\n".format(role_name=self.name),
        ]
        lines = ["{var_name}: true".format(var_name=var_name) for var_name in default_vars_to_add]
        lines.append("")

        return ("%s%s%s" % ("\n".join(header), default_vars_local_content, "\n".join(lines)))

    def save_to_disk(self, directory):
        print("Converting Ansible Playbook {} to Ansible Role {}".format(self._local_playbook_filename, os.path.join(directory, self.name)))
        for filename in self.PRODUCED_FILES:
            abs_path = os.path.join(directory, self.name, filename)
            mkdir_p(os.path.dirname(abs_path))
            open(abs_path, 'wb').write(self.file(filename).encode("utf-8"))


class RoleGithubUpdater(object):
    def __init__(self, repo, local_playbook_filename):
        self.remote_repo = repo
        self.role = PlaybookToRoleConverter(local_playbook_filename)

    def _local_content(self, filepath):
        new_content = self.role.file(filepath)
        return new_content

    def _get_blob_content(self, branch, path_name):
        """
        see:
        https://github.com/PyGithub/PyGithub/issues/661
        """
        ref = self.remote_repo.get_git_ref(f'heads/{branch}')
        tree = self.remote_repo.get_git_tree(ref.object.sha, recursive='/' in path_name).tree
        sha = [x.sha for x in tree if x.path == path_name]
        if not sha:
            return None
        blob = self.remote_repo.get_git_blob(sha[0])
        import base64
        b64 = base64.b64decode(blob.content)
        return (b64.decode("utf8"), sha[0])

    def _get_contents(self, path_name, branch='main'):
        """
        First try to use traditional's github API to get package contents,
        since this API can't fetch file size more than 1MB, use another API when failed.
        """
        content = self.remote_repo.get_contents(path_name, ref=branch)
        if content.content:
            return (content.decoded_content.decode("utf-8"), content.sha)

        blob = self._get_blob_content(branch, path_name)
        if blob is None:
            raise UnknownObjectException(
                'unable to locate file: ' + path_name + ' in branch: ' + branch)
        return blob

    def _remote_content(self, filepath):
        # We want the raw string to compare against _local_content

        # New repos use main instead of master
        branch = 'master'
        if "rhel9" in self.remote_repo.full_name:
            branch = 'main'

        content, sha = self._get_contents(filepath, branch)
        return content, sha

    def _update_content_if_needed(self, filepath):
        remote_content, sha = self._remote_content(filepath)

        if self._local_content(filepath) != remote_content:
            self.remote_repo.update_file(
                filepath,
                "Updated " + filepath,
                self._local_content(filepath),
                sha,
                author=InputGitAuthor(
                    GIT_COMMIT_AUTHOR_NAME, GIT_COMMIT_AUTHOR_EMAIL)
            )
            print("Updating %s in %s" % (filepath, self.remote_repo.name))

    def update_repository(self):
        print("Processing %s..." % self.remote_repo.name)

        for path in PlaybookToRoleConverter.PRODUCED_FILES:
            self._update_content_if_needed(path)

        repo_description = (
            "{title} - Ansible role generated from ComplianceAsCode Project"
            .format(title=self.role.title))
        self.remote_repo.edit(
            self.remote_repo.name,
            description=repo_description,
            homepage="https://github.com/complianceascode/content",
        )


def parse_args():
    parser = argparse.ArgumentParser(
        description='Generates Ansible Roles and pushes them to Github')
    parser.add_argument(
        "--build-playbooks-dir",
        help="Path to directory containing the generated Ansible Playbooks. "
        "Most likely this is going to be ./build/ansible. Defaults to {}".format(PLAYBOOK_ROOT),
        dest="build_playbooks_dir", default=PLAYBOOK_ROOT)
    parser.add_argument(
        "--dry-run", "-d", dest="dry_run",
        help="Do not push Ansible Roles to Github, store them only to the given local directory."
    )
    parser.add_argument(
        "--organization", "-o", default=ORGANIZATION_NAME,
        help="Name of the Github organization to publish roles to. "
             "Defaults to {}.".format(ORGANIZATION_NAME))
    parser.add_argument(
        "--profile", "-p", default=[], action="append",
        metavar="PROFILE", choices=PROFILE_ALLOWLIST,
        help="What profiles to upload, if not specified, upload all that are applicable.")
    parser.add_argument(
        "--product", "-r", default=[], action="append",
        metavar="PRODUCT", choices=PRODUCT_ALLOWLIST,
        help="What products to upload, if not specified, upload all that are applicable.")
    parser.add_argument(
        "--tag-release", "-n", default=False, action="store_true",
        help="Tag a new release in GitHub. Defaults to False.")
    parser.add_argument(
        "--token", "-t", dest="token",
        help="GitHub token used for organization authorization")
    return parser.parse_args()


def locally_clone_and_init_repositories(organization, repo_list):
    temp_dir = mkdtemp()
    current_dir = os.getcwd()
    os.chdir(temp_dir)
    try:
        for repo in repo_list:
            clone_and_init_repository(temp_dir, organization, repo)
    finally:
        os.chdir(current_dir)
        shutil.rmtree(temp_dir)


def select_roles_to_upload(product_allowlist, profile_allowlist,
                           build_playbooks_dir):
    selected_roles = dict()
    for filename in sorted(os.listdir(build_playbooks_dir)):
        root, ext = os.path.splitext(filename)
        if ext == ".yml":
            # the format is product-playbook-profile.yml
            product, _, profile = root.split("-", 2)
            if product in product_allowlist and profile in profile_allowlist:
                role_name = "ansible-role-%s-%s" % (product, profile)
                selected_roles[role_name] = (product, profile)
    return selected_roles


def main():
    args = parse_args()

    product_allowlist = set(PRODUCT_ALLOWLIST)
    profile_allowlist = set(PROFILE_ALLOWLIST)

    potential_roles = {
        ("ansible-role-%s-%s" % (product, profile))
        for product in product_allowlist for profile in profile_allowlist
    }

    if args.product:
        product_allowlist &= set(args.product)
    if args.profile:
        profile_allowlist &= set(args.profile)

    selected_roles = select_roles_to_upload(
        product_allowlist, profile_allowlist, args.build_playbooks_dir
    )

    if args.dry_run:
        for product_profile in selected_roles.values():
            playbook_filename = "%s-playbook-%s.yml" % product_profile
            playbook_full_path = os.path.join(
                args.build_playbooks_dir, playbook_filename)
            PlaybookToRoleConverter(playbook_full_path).save_to_disk(args.dry_run)
    else:
        if not args.token:
            print("Input your GitHub credentials:")
            username = input("username or token: ")
            password = getpass.getpass("password (or empty for token): ")
        else:
            username = args.token
            password = ""

        github = Github(username, password)
        github_org = github.get_organization(args.organization)
        github_repositories = [repo.name for repo in github_org.get_repos()]

        # Create empty repositories
        github_new_repos = sorted(list(set(map(str.lower, selected_roles.keys())) - set(map(str.lower, github_repositories))))
        if github_new_repos:
            create_empty_repositories(github_new_repos, github_org)

            locally_clone_and_init_repositories(args.organization, github_new_repos)

        # Update repositories
        for repo in sorted(github_org.get_repos(), key=lambda repo: repo.name):
            if repo.name in selected_roles:
                playbook_filename = "%s-playbook-%s.yml" % selected_roles[repo.name]
                playbook_full_path = os.path.join(
                    args.build_playbooks_dir, playbook_filename)
                RoleGithubUpdater(repo, playbook_full_path).update_repository()
                if args.tag_release:
                    update_repo_release(github, repo)
            elif repo.name not in potential_roles:
                print("Repo '%s' is not managed by this script. "
                      "It may need to be deleted, please verify and do that "
                      "manually!" % repo.name, file=sys.stderr)


if __name__ == "__main__":
    main()