ComplianceAsCode/content

View on GitHub
ssg/yaml.py

Summary

Maintainability
A
35 mins
Test Coverage
D
66%
from __future__ import absolute_import
from __future__ import print_function

import codecs
import re
import sys
import yaml

from collections import OrderedDict

from .jinja import load_macros, process_file

try:
    from yaml import CSafeLoader as yaml_SafeLoader
except ImportError:
    from yaml import SafeLoader as yaml_SafeLoader

try:
    from yaml import CLoader as yaml_Loader
except ImportError:
    from yaml import Loader as yaml_Loader

try:
    from yaml import CDumper as yaml_Dumper
except ImportError:
    from yaml import Dumper as yaml_Dumper

def _bool_constructor(self, node):
    return self.construct_scalar(node)


def _unicode_constructor(self, node):
    string_like = self.construct_scalar(node)
    return str(string_like)


# Don't follow python bool case
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:bool', _bool_constructor)
# Python2-relevant - become able to resolve "unicode strings"
yaml_SafeLoader.add_constructor(u'tag:yaml.org,2002:python/unicode', _unicode_constructor)


class DocumentationNotComplete(Exception):
    pass


def _save_rename(result, stem, prefix):
    result["{0}_{1}".format(prefix, stem)] = stem


def _get_yaml_contents_without_documentation_complete(parsed_yaml, substitutions_dict):
    """
    If the YAML is a mapping, then handle the documentation_complete accordingly,
    and take that key-value out.
    Otherwise, if YAML is empty, or it is a list, pass it on.
    """
    if isinstance(parsed_yaml, dict):
        documentation_incomplete_content_and_not_debug_build = (
            parsed_yaml.pop("documentation_complete", "true") == "false"
            and substitutions_dict.get("cmake_build_type") != "Debug")
        if documentation_incomplete_content_and_not_debug_build:
            raise DocumentationNotComplete("documentation not complete and not a debug build")
    return parsed_yaml


def _open_yaml(stream, original_file=None, substitutions_dict={}):
    """
    Open given file-like object and parse it as YAML.

    Optionally, pass the path to the original_file for better error handling
    when the file contents are passed.

    Raise an exception if it contains "documentation_complete" key set to "false".
    """
    try:
        yaml_contents = yaml.load(stream, Loader=yaml_SafeLoader)

        return _get_yaml_contents_without_documentation_complete(yaml_contents, substitutions_dict)
    except DocumentationNotComplete as e:
        raise e
    except Exception as e:
        count = 0
        _file = original_file
        if not _file:
            _file = stream
        with open(_file, "r") as e_file:
            lines = e_file.readlines()
            for line in lines:
                count = count + 1
                if re.match(r"^\s*\t+\s*", line):
                    print("Exception while handling file: %s" % _file, file=sys.stderr)
                    print("TabIndentationError: Line %s contains tabs instead of spaces:" % (count), file=sys.stderr)
                    print("%s\n\n" % repr(line.strip("\n")), file=sys.stderr)
                    sys.exit(1)

        print("Exception while handling file: %s" % _file, file=sys.stderr)
        raise e


def open_and_expand(yaml_file, substitutions_dict=None):
    """
    Process the file as a template, using substitutions_dict to perform
    expansion. Then, process the expansion result as a YAML content.

    See also: _open_yaml
    """
    if substitutions_dict is None:
        substitutions_dict = dict()

    expanded_template = process_file(yaml_file, substitutions_dict)
    try:
        yaml_contents = _open_yaml(expanded_template, yaml_file, substitutions_dict)
    except yaml.scanner.ScannerError as e:
        print("A Jinja template expansion can mess up the indentation.")
        print("Please, check if the contents below are correctly expanded:")
        print("Source yaml: {}".format(yaml_file))
        print("Expanded yaml:\n{}".format(expanded_template))
        sys.exit(1)

    return yaml_contents


def open_and_macro_expand(yaml_file, substitutions_dict=None):
    """
    Do the same as open_and_expand, but load definitions of macros
    so they can be expanded in the template.
    """
    substitutions_dict = load_macros(substitutions_dict)
    return open_and_expand(yaml_file, substitutions_dict)


def open_raw(yaml_file):
    """
    Open given file-like object and parse it as YAML
    without performing any kind of template processing

    See also: _open_yaml
    """
    with codecs.open(yaml_file, "r", "utf8") as stream:
        yaml_contents = _open_yaml(stream, original_file=yaml_file)
    return yaml_contents


def ordered_load(stream, Loader=yaml_Loader, object_pairs_hook=OrderedDict):
    """
    Drop-in replacement for yaml.load(), but preserves order of dictionaries
    """
    class OrderedLoader(Loader):
        pass

    def construct_mapping(loader, node):
        loader.flatten_mapping(node)
        return object_pairs_hook(loader.construct_pairs(node))
    OrderedLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
        construct_mapping)

    try:
        yaml_loaded = yaml.load(stream, OrderedLoader)
    except yaml.scanner.ScannerError as e:
        print("Error when trying to load the stream: {}".format(e))
        print("HINT: Ansible tasks names may cause expansion errors if not properly quoted.")
        print("HINT: Check for unquoted colons or other special symbols in the stream:")
        print("{}".format(stream))
        sys.exit(1)
    return yaml_loaded


def ordered_dump(data, stream=None, Dumper=yaml_Dumper, **kwds):
    """
    Drop-in replacement for yaml.dump(), but preserves order of dictionaries
    """
    class OrderedDumper(Dumper):
        # fix tag indentations
        def increase_indent(self, flow=False, indentless=False):
            return super(OrderedDumper, self).increase_indent(flow, False)

    def _dict_representer(dumper, data):
        return dumper.represent_mapping(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
            data.items())

    def _str_representer(dumper, data):
        if '\n' in data:
            return dumper.represent_scalar(u'tag:yaml.org,2002:str', data,
                                           style='|')
        else:
            return dumper.represent_str(data)

    OrderedDumper.add_representer(OrderedDict, _dict_representer)
    OrderedDumper.add_representer(str, _str_representer)

    # Fix formatting by adding a space in between tasks
    unformatted_yaml = yaml.dump(data, None, OrderedDumper, **kwds)
    formatted_yaml = re.sub(r"[\n]+([\s]*)- name", r"\n\n\1- name", unformatted_yaml)

    # Fix CDumper issue where it adds yaml document ending '...'
    # in some templated ansible remediations
    formatted_yaml = re.sub(r"\n\s*\.\.\.\s*", r"\n", formatted_yaml)

    if stream is not None:
        return stream.write(formatted_yaml)
    else:
        return formatted_yaml


def _strings_to_list(one_or_more_strings):
    """
    Output a list, that either contains one string, or a list of strings.
    In Python, strings can be cast to lists without error, but with unexpected result.
    """
    if isinstance(one_or_more_strings, str):
        return [one_or_more_strings]
    else:
        return list(one_or_more_strings)


def update_yaml_list_or_string(current_contents, new_contents, prepend=False):
    result = []
    if current_contents:
        result += _strings_to_list(current_contents)
    if new_contents:
        if prepend:
            result = _strings_to_list(new_contents) + result
        else:
            result += _strings_to_list(new_contents)
    if not result:
        result = ""
    if len(result) == 1:
        result = result[0]
    return result


def convert_string_to_bool(string):
    """
    Returns True if string is "true" (in any letter case)
    returns False if "false"
    raises ValueError
    """
    lower = string.lower()
    if lower == "true":
        return True
    elif lower == "false":
        return False
    else:
        raise ValueError(
                    "Invalid value %s while expecting boolean string" % string)