avocado-framework/avocado

View on GitHub
optional_plugins/varianter_yaml_to_mux/avocado_varianter_yaml_to_mux/varianter_yaml_to_mux.py

Summary

Maintainability
D
2 days
Test Coverage
A
92%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2016-2017
# Author: Lukas Doktor <ldoktor@redhat.com>

"""Varianter plugin to parse yaml files to params"""

import ast
import collections
import os
import re
import sys

import yaml
from avocado_varianter_yaml_to_mux import mux  # pylint: disable=W0406

from avocado.core import exit_codes
from avocado.core.output import LOG_UI
from avocado.core.plugin_interfaces import CLI, Init, Varianter
from avocado.core.settings import settings
from avocado.utils import astring

try:
    from yaml import CSafeLoader as SafeLoader
except ImportError:
    from yaml import SafeLoader


# Mapping for yaml flags
YAML_INCLUDE = 100
YAML_USING = 101
YAML_REMOVE_NODE = mux.REMOVE_NODE
YAML_REMOVE_VALUE = mux.REMOVE_VALUE
YAML_MUX = 102
YAML_FILTER_ONLY = 103
YAML_FILTER_OUT = 104

__RE_FILE_SPLIT = re.compile(r"(?<!\\):")  # split by ':' but not '\\:'
__RE_FILE_SUBS = re.compile(r"(?<!\\)\\:")  # substitute '\\:' but not '\\\\:'


class ListOfNodeObjects(list):  # Few methods pylint: disable=R0903
    """
    Used to mark list as list of objects from whose node is going to be created
    """


def _normalize_path(path):
    """
    End the path with single '/'

    :param path: original path
    :type path: str
    :returns: path with trailing '/', or None when empty path
    :rtype: str or None
    """
    if not path:
        return
    if path[-1] != "/":
        path += "/"
    return path


def _handle_control_tag(path, node, value):
    """
    Handling of most YAML control tags (all but "!using")

    :param path: path on the YAML
    :type path: str
    :param node: the node in which to handle control tags
    :type node: instance of :class:`avocado.core.tree.TreeNode` or similar
    :param value: the value of the node
    """
    if value[0].code == YAML_INCLUDE:
        # Include file
        ypath = value[1]
        if not os.path.isabs(ypath):
            ypath = os.path.join(os.path.dirname(path), ypath)
        if not os.path.exists(ypath):
            raise ValueError(
                f"File '{ypath}' included from '{path}' " f"does not exist."
            )
        node.merge(_create_from_yaml("/:" + ypath))
    elif value[0].code in (YAML_REMOVE_NODE, YAML_REMOVE_VALUE):
        value[0].value = value[1]  # set the name
        node.ctrl.append(value[0])  # add "blue pill" of death
    elif value[0].code == YAML_MUX:
        node.multiplex = True
    elif value[0].code == YAML_FILTER_ONLY:
        new_value = _normalize_path(value[1])
        if new_value:
            node.filters[0].append(new_value)
    elif value[0].code == YAML_FILTER_OUT:
        new_value = _normalize_path(value[1])
        if new_value:
            node.filters[1].append(new_value)


def _handle_control_tag_using(path, name, using, value):
    """
    Handling of the "!using" YAML control tag

    :param path: path on the YAML
    :type path: str
    :param name: name to be applied in the "!using" tag
    :type name: str
    :param using: whether using is already being used
    :type using: bool
    :param value: the value of the node
    """
    if using:
        raise ValueError(f"!using can be used only once " f"per node! ({path}:{name})")
    using = value
    if using[0] == "/":
        using = using[1:]
    if using[-1] == "/":
        using = using[:-1]
    return using


def _apply_using(name, using, node):
    """
    Create the structure defined by "!using" and return the new root

    :param name: the tag name to have the "!using" applied to
    :type name: str
    :param using: the new location to put the tag into
    :type using: bool
    :param node: the node in which to handle control tags
    :type node: instance of :class:`avocado.core.tree.TreeNode` or similar
    """
    if name != "":
        for name in using.split("/")[::-1]:
            node = mux.MuxTreeNode(name, children=[node])
    else:
        using = using.split("/")[::-1]
        node.name = using.pop()
        while True:
            if not using:
                break
            name = using.pop()  # 'using' is list pylint: disable=E1101
            node = mux.MuxTreeNode(name, children=[node])
        node = mux.MuxTreeNode("", children=[node])
    return node


def _node_content_from_node(path, node, values, using):
    """Processes node values into the current node content"""
    for value in values:
        if isinstance(value, mux.MuxTreeNode):
            node.add_child(value)
        elif isinstance(value[0], mux.Control):
            if value[0].code == YAML_USING:
                using = _handle_control_tag_using(path, node.name, using, value[1])
            else:
                _handle_control_tag(path, node, value)
        elif isinstance(value[1], collections.OrderedDict):
            child = _tree_node_from_values(
                path, astring.to_text(value[0]), value[1], using
            )
            node.add_child(child)
        else:
            node.value[value[0]] = value[1]
    return using


def _node_content_from_dict(path, node, values, using):
    """Processes dict values into the current node content"""
    for key, value in values.items():
        if isinstance(key, mux.Control):
            if key.code == YAML_USING:
                using = _handle_control_tag_using(path, node.name, using, value)
            else:
                _handle_control_tag(path, node, [key, value])
        elif isinstance(value, collections.OrderedDict) or value is None:
            node.add_child(_tree_node_from_values(path, key, value, using))
        elif value == "null":
            node.value[key] = None
        else:
            node.value[key] = value
    return using


def _tree_node_from_values(path, name, values, using):
    """Create `name` node and add values"""
    # Initialize the node
    node = mux.MuxTreeNode(astring.to_text(name))
    if not values:
        return node
    using = ""

    # Fill the node content from parsed values
    if isinstance(values, dict):
        using = _node_content_from_dict(path, node, values, using)
    else:
        using = _node_content_from_node(path, node, values, using)

    # Prefix nodes if tag "!using" was used
    if using:
        node = _apply_using(name, using, node)
    return node


def _mapping_to_tree_loader(loader, node, looks_like_node=False):
    """Maps yaml mapping tag to TreeNode structure"""
    _value = []
    for key_node, value_node in node.value:
        # Allow only strings as dict keys
        if key_node.tag.startswith("!"):  # reflect tags everywhere
            key = loader.construct_object(key_node)
        else:
            key = loader.construct_scalar(key_node)
        # If we are to keep them, use following, but we lose the control
        # for both, nodes and dicts
        # key = loader.construct_object(key_node)
        if isinstance(key, mux.Control):
            looks_like_node = True
        value = loader.construct_object(value_node)
        if isinstance(value, ListOfNodeObjects):
            looks_like_node = True
        _value.append((key, value))

    if not looks_like_node:
        return collections.OrderedDict(_value)

    objects = ListOfNodeObjects()
    for name, values in _value:
        if isinstance(values, ListOfNodeObjects):  # New node from list
            objects.append(
                _tree_node_from_values(loader.path, name, values, loader.using)
            )
        elif values is None:  # Empty node
            objects.append(mux.MuxTreeNode(astring.to_text(name)))
        elif values == "null":
            objects.append((name, None))
        else:  # Values
            objects.append((name, values))
    return objects


def _mux_loader(loader, node):
    """
    Special !mux loader which allows to tag node as 'multiplex = True'.
    """
    if not isinstance(node, yaml.ScalarNode):
        objects = _mapping_to_tree_loader(loader, node, looks_like_node=True)
    else:  # This means it's empty node. Don't call mapping_to_tree_loader
        objects = ListOfNodeObjects()
    objects.append((mux.Control(YAML_MUX), None))
    return objects


class _BaseLoader(SafeLoader):
    """
    YAML loader with additional features related to mux
    """

    SafeLoader.add_constructor("!include", lambda *_: mux.Control(YAML_INCLUDE))
    SafeLoader.add_constructor("!using", lambda *_: mux.Control(YAML_USING))
    SafeLoader.add_constructor("!remove_node", lambda *_: mux.Control(YAML_REMOVE_NODE))
    SafeLoader.add_constructor(
        "!remove_value", lambda *_: mux.Control(YAML_REMOVE_VALUE)
    )
    SafeLoader.add_constructor("!filter-only", lambda *_: mux.Control(YAML_FILTER_ONLY))
    SafeLoader.add_constructor("!filter-out", lambda *_: mux.Control(YAML_FILTER_OUT))
    SafeLoader.add_constructor(
        "tag:yaml.org,2002:python/dict", SafeLoader.construct_yaml_map
    )
    SafeLoader.add_constructor("!mux", _mux_loader)
    SafeLoader.add_constructor(
        yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, _mapping_to_tree_loader
    )


def _create_from_yaml(path):
    """Create tree structure from yaml stream"""

    # Parse file name ([$using:]$path)
    path = __RE_FILE_SPLIT.split(path, 1)
    if len(path) == 1:
        path = __RE_FILE_SUBS.sub(":", path[0])
        using = ["run"]
    else:
        nodes = __RE_FILE_SUBS.sub(":", path[0]).strip("/").split("/")
        using = [node for node in nodes if node]
        if not path[0].startswith("/"):  # relative path, put into /run
            using.insert(0, "run")
        path = __RE_FILE_SUBS.sub(":", path[1])

    # For loader instance needs different "path" and "using" values
    class Loader(_BaseLoader):
        _BaseLoader.path = path
        _BaseLoader.using = using

    # Load the tree
    with open(path, encoding="utf-8") as stream:
        loaded_tree = yaml.load(stream, Loader)  # nosec
        if loaded_tree is None:
            return

        loaded_tree = _tree_node_from_values(path, "", loaded_tree, using)

    # Add prefix
    if using:
        loaded_tree.name = using.pop()
        while True:
            if not using:
                break
            loaded_tree = mux.MuxTreeNode(using.pop(), children=[loaded_tree])
        loaded_tree = mux.MuxTreeNode("", children=[loaded_tree])
    return loaded_tree


def create_from_yaml(paths):
    """Create tree structure from yaml-like file.

    :param paths: File object to be processed
    :raise SyntaxError: When yaml-file is corrupted
    :return: Root of the created tree structure
    """

    def _merge(data, path):
        """Normal run"""
        tmp = _create_from_yaml(path)
        if tmp:
            data.merge(tmp)

    data = mux.MuxTreeNode()
    path = None
    try:
        for path in paths:
            _merge(data, path)
    # Yaml can raise IndexError on some files
    except (yaml.YAMLError, IndexError) as details:
        if "mapping values are not allowed in this context" in astring.to_text(details):
            details = (
                f"{details}\nMake sure !tags and colons are separated "
                f"by a space (eg. !include :)"
            )
        msg = f"Invalid multiplex file '{path}': {details}"
        raise IOError(2, msg, path)
    return data


class YamlToMuxInit(Init):
    """
    YamlToMux initialization plugin
    """

    name = "yaml_to_mux"
    description = "YamlToMux initialization plugin"

    def initialize(self):
        help_msg = (
            "Location of one or more Avocado multiplex (.yaml) "
            "FILE(s) (order dependent)"
        )
        settings.register_option(
            section=self.name, key="files", default=[], key_type=list, help_msg=help_msg
        )

        help_msg = "Filter only path(s) from multiplexing"
        settings.register_option(
            section=self.name,
            key="filter_only",
            default=[],
            key_type=list,
            help_msg=help_msg,
        )

        help_msg = "Filter out path(s) from multiplexing"
        settings.register_option(
            section=self.name, key="filter_out", default=[], help_msg=help_msg
        )

        help_msg = (
            "List of default paths used to determine path priority "
            "when querying for parameters"
        )
        settings.register_option(
            section=self.name,
            key="parameter_paths",
            default=["/run/*"],
            key_type=list,
            help_msg=help_msg,
        )

        help_msg = "Inject [path:]key:node values into the final multiplex tree."
        settings.register_option(
            section=self.name,
            key="inject",
            default=[],
            help_msg=help_msg,
            key_type=list,
        )


class YamlToMuxCLI(CLI):
    """
    Defines arguments for YamlToMux plugin
    """

    name = "yaml_to_mux"
    description = "YamlToMux options for the 'run' subcommand"

    def configure(self, parser):
        """
        Configures "run" and "variants" subparsers
        """
        for name in ("run", "multiplex", "variants"):
            subparser = parser.subcommands.choices.get(name, None)
            if subparser is None:
                continue
            agroup = subparser.add_argument_group("yaml to mux options")
            settings.add_argparser_to_option(
                namespace=f"{self.name}.{'files'}",
                parser=agroup,
                long_arg="--mux-yaml",
                short_arg="-m",
                metavar="FILE",
                nargs="+",
                allow_multiple=True,
            )

            settings.add_argparser_to_option(
                namespace=f"{self.name}.{'filter_only'}",
                parser=agroup,
                long_arg="--mux-filter-only",
                nargs="+",
                allow_multiple=True,
                metavar="FILTER",
            )

            settings.add_argparser_to_option(
                namespace=f"{self.name}.{'filter_out'}",
                parser=agroup,
                long_arg="--mux-filter-out",
                nargs="+",
                allow_multiple=True,
                metavar="FILTER",
            )

            settings.add_argparser_to_option(
                namespace=f"{self.name}.{'parameter_paths'}",
                parser=agroup,
                long_arg="--mux-path",
                nargs="+",
                allow_multiple=True,
                metavar="PATH",
            )

            settings.add_argparser_to_option(
                namespace=f"{self.name}.{'inject'}",
                parser=agroup,
                long_arg="--mux-inject",
                nargs="+",
                allow_multiple=True,
                metavar="PATH_KEY_NODE",
            )

    def run(self, config):
        """
        The YamlToMux varianter plugin handles these
        """


class YamlToMux(mux.MuxPlugin, Varianter):
    """
    Processes the mux options into varianter plugin
    """

    name = "yaml_to_mux"
    description = "Multiplexer plugin to parse yaml files to params"

    def initialize(self, config):
        subcommand = config.get("subcommand")
        data = None

        # Merge the multiplex
        multiplex_files = config.get("yaml_to_mux.files")
        if multiplex_files:
            data = mux.MuxTreeNode()
            try:
                data.merge(create_from_yaml(multiplex_files))
            except IOError as details:
                error_msg = f"{details.strerror} : {details.filename}"
                LOG_UI.error(error_msg)
                if subcommand == "run":
                    sys.exit(exit_codes.AVOCADO_JOB_FAIL)
                else:
                    sys.exit(exit_codes.AVOCADO_FAIL)

        # Extend default multiplex tree of --mux-inject values
        for inject in config.get("yaml_to_mux.inject"):
            entry = inject.split(":", 2)
            if len(entry) < 2:
                raise ValueError(f"key:entry pairs required, " f"found only {entry}")
            elif len(entry) == 2:  # key, entry
                entry.insert(0, "")  # add path='' (root)
            # We try to maintain the data type of the provided value
            try:
                entry[2] = ast.literal_eval(entry[2])
            except (ValueError, SyntaxError, NameError, RecursionError):
                pass
            if data is None:
                data = mux.MuxTreeNode()
            data.get_node(entry[0], True).value[entry[1]] = entry[2]

        if data is not None:
            mux_filter_only = config.get("yaml_to_mux.filter_only")
            mux_filter_out = config.get("yaml_to_mux.filter_out")
            data = mux.apply_filters(data, mux_filter_only, mux_filter_out)
            paths = config.get("yaml_to_mux.parameter_paths")
            self.initialize_mux(data, paths)