avocado-framework/avocado

View on GitHub
avocado/core/varianter.py

Summary

Maintainability
C
7 hrs
Test Coverage
C
72%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2014
#
# Authors: Ruda Moura <rmoura@redhat.com>
#          Ademar Reis <areis@redhat.com>
#          Lucas Meneghel Rodrigues <lmr@redhat.com>

"""
Base classes for implementing the varianter interface
"""

import hashlib
import json
import os

from avocado.core import dispatcher, output, tree
from avocado.utils import astring

VARIANTS_FILENAME = "variants.json"


def is_empty_variant(variant):
    """
    Tell whether the variant carries no data at all

    :param variant: Avocado test variant (list of TreeNode-like objects)
    :return: True for a falsy variant, or for a variant equal to a list of
             the same length made only of default-constructed
             ``tree.TreeNode`` objects
    """
    if not variant:
        return True
    # Reference value: as many "blank" nodes as the variant has items
    blank_nodes = [tree.TreeNode()] * len(variant)
    return variant == blank_nodes


def generate_variant_id(variant):
    """
    Basic function to generate variant-id from a variant

    The nodes are sorted by their ``path`` before anything else is computed,
    so the order in which they are passed in does not influence the result.

    :param variant: Avocado test variant (list of TreeNode-like objects)
    :return: String compounded of ordered node names and the first four
             hexadecimal digits of the SHA-1 of all node fingerprints.
    """

    def get_variant_name(nodes):
        """
        To get the variant full name string

        :param nodes: Avocado test variant (list of TreeNode-like objects)
        :return: Complete variant name string (names joined by "-")
        """
        full_name = []
        for node in nodes:
            # Walk from the node up through its ancestors collecting names;
            # objects without a "parent" attribute end the walk.
            lineage = []
            while node:
                lineage.append(node.name)
                node = getattr(node, "parent", None)
            # Put the names in top-down order and skip the top-most one
            # (presumably the root, whose name is empty).  Names already
            # collected from previous nodes are not repeated.  Slicing never
            # raises IndexError, so no exception handling is required here.
            full_name.extend(
                [name for name in lineage[::-1][1:] if name not in full_name]
            )
        return "-".join(full_name)

    ordered = sorted(variant, key=lambda node: node.path)
    fingerprint = "\n".join(node.fingerprint() for node in ordered)
    digest = hashlib.sha1(fingerprint.encode(astring.ENCODING)).hexdigest()
    return get_variant_name(ordered) + "-" + digest[:4]


def variant_to_str(variant, verbosity, out_args=None, debug=False):
    """
    Render a variant as a list of human readable lines

    :param variant: Valid variant (dict whose "variant" item is the list of
                    TreeNode-like objects and "variant_id" its identifier)
    :param verbosity: Output verbosity where 0 means brief
    :param out_args: Extra output arguments (currently unused)
    :param debug: Whether the variant contains and should report debug info
    :return: List of lines: the "Variant <id>:    <paths>" header and, when
             verbosity is non-zero, one aligned "origin:key => value" line
             per distinct environment entry
    """
    del out_args
    if debug:
        # Debug output shows node names decorated with their "yaml" origin
        lowlight = output.TERM_SUPPORT.LOWLIGHT
        endc = output.TERM_SUPPORT.ENDC
        labels = [
            f"{node.name}{lowlight}@{getattr(node, 'yaml', 'Unknown')}{endc}"
            for node in variant["variant"]
        ]
    else:
        labels = [node.path for node in variant["variant"]]
    paths = ", ".join(labels)

    # Verbose output separates each variant by a leading empty line
    prefix = "\n" if verbosity else ""
    lines = [f"{prefix}Variant {variant['variant_id']!s}:    {paths}"]
    if not verbosity:
        return lines

    # A set is used so identical records reported by several nodes are
    # listed only once.
    records = {
        (f"{node.environment.origin[key].path}:{key}", astring.to_text(value))
        for node in variant["variant"]
        for key, value in node.environment.items()
    }
    if records:
        # Pad every label to the longest one so the arrows line up
        width = max(len(label) for label, _ in records)
        for label, text in sorted(records):
            lines.append(f"    {label.ljust(width)} => {text!s}")
    return lines


def dump_variant(variant):
    """Turn a variant into a json-serializable representation

    :param variant: Valid variant (dict with "paths", "variant_id" and
                    "variant", the latter being a list of TreeNode-like
                    objects)
    :return: dict with the same three keys where paths are converted to text
             and each node is replaced by ``(path, env_representation)``
    """

    def serialize_node(node):
        """
        Turns TreeNode-like object into tuple(path, env_representation)

        The environment representation is a list of
        ``(origin_path, key, value)`` tuples; values are kept as they are.
        """
        node_path = astring.to_text(node.path)
        entries = []
        for key, value in node.environment.items():
            origin_path = astring.to_text(node.environment.origin[key].path)
            entries.append((origin_path, astring.to_text(key), value))
        return (node_path, entries)

    return {
        "paths": [astring.to_text(path) for path in variant.get("paths")],
        "variant_id": variant.get("variant_id"),
        "variant": [serialize_node(node) for node in variant.get("variant", [])],
    }


def dump_ivariants(ivariants):
    """
    Dumps every variant produced by a variants factory

    :param ivariants: Callable returning an iterable of variants
    :return: List with the json-serializable form of each variant (as
             produced by `dump_variant`), in iteration order
    """
    return [dump_variant(variant) for variant in ivariants()]


class FakeVariantDispatcher:
    """
    This object can act instead of VarianterDispatcher to report loaded
    variants.
    """

    def __init__(self, state):
        """
        :param state: Iterable of dumped variants; each one is a dict whose
                      "variant" item is a list of ``(path, env)`` pairs
        """
        variants = []
        for dumped in state:
            # Work on a shallow copy: the caller's state is left untouched,
            # so the very same state can be loaded more than once.
            variant = dict(dumped)
            variant["variant"] = [
                tree.TreeNodeEnvOnly(path, env) for path, env in dumped["variant"]
            ]
            variants.append(variant)
        self.variants = variants

    def map_method_with_return(self, method, *args, **kwargs):
        """
        Reports list containing one result of map_method on self

        :param method: Name of the method to call on this object
        :return: Single-item list with the method's result, or an empty list
                 when this object has no attribute of that name
        """
        if hasattr(self, method):
            return [getattr(self, method)(*args, **kwargs)]
        return []

    def to_str(self, summary=0, variants=0, **kwargs):  # pylint: disable=W0613
        """
        Return human readable representation of the loaded variants

        All arguments are accepted only for interface compatibility and are
        ignored; every variant is always listed together with its
        environment.

        :rtype: str
        """
        if not self.variants:
            return ""
        out = []
        for variant in self.variants:
            paths = ", ".join([x.path for x in variant["variant"]])
            out.append(f"\nVariant {variant['variant_id']}:    {paths}")
            # A set is used so identical records are listed only once
            env = set()
            for node in variant["variant"]:
                for key, value in node.environment.items():
                    origin = node.environment.origin[key].path
                    env.add((f"{origin}:{key}", astring.to_text(value)))
            if not env:
                continue
            # Left-align the "origin:key" column to the longest entry
            fmt = "    %%-%ds => %%s" % max(  # pylint: disable=C0209
                len(record[0]) for record in env
            )
            for record in sorted(env):
                out.append(fmt % record)
        return "\n".join(out)

    def __iter__(self):
        return iter(self.variants)

    def __len__(self):
        return len(self.variants)


class Varianter:
    """
    This object takes care of producing test variants
    """

    def __init__(self, debug=False, state=None):
        """
        :param debug: Store whether this instance should debug varianter
        :param state: Force-varianter state
        :note: it's necessary to check whether variants debug is enabled
               in order to provide the right results.
        """
        if state is None:
            self.debug = debug
            self.node_class = tree.TreeNodeDebug if debug else tree.TreeNode
            self._variant_plugins = dispatcher.VarianterDispatcher()
            # None means "not parsed yet", see is_parsed()
            self._no_variants = None
        else:
            self.load(state)

    def parse(self, config):
        """
        Apply options defined on the cmdline and initialize the plugins.

        :param config: Configuration received from configuration files, command
                       line parser, etc.
        :type config: dict
        """
        self._variant_plugins.map_method_with_return("initialize", config)
        self._no_variants = sum(self._variant_plugins.map_method_with_return("__len__"))

    def is_parsed(self):
        """
        Reports whether the varianter was already parsed
        """
        return self._no_variants is not None

    def to_str(self, summary=0, variants=0, **kwargs):
        """
        Return human readable representation

        The summary/variants accepts verbosity where 0 means do not display
        at all and maximum is up to the plugin.

        :param summary: How verbose summary to output (int)
        :param variants: How verbose list of variants to output (int)
        :param kwargs: Other free-form arguments
        :rtype: str
        """
        if self._no_variants == 0:  # No variants
            return ""

        # Plugins with nothing to report (empty result) are left out
        out = [
            item
            for item in self._variant_plugins.map_method_with_return(
                "to_str", summary, variants, **kwargs
            )
            if item
        ]

        return "\n\n".join(out)

    def get_number_of_tests(self, test_suite):
        """
        :return: overall number of tests * number of variants
        """
        # Currently number of tests is symmetrical
        if self._no_variants:
            return len(test_suite) * self._no_variants
        return len(test_suite)

    def dump(self):
        """
        Dump the variants in loadable-state

        This is lossy representation which takes all yielded variants and
        replaces the list of nodes with TreeNodeEnvOnly representations::

            [{'paths': paths,
              'variant_id': variant_id,
              'variant': dump_tree_nodes(original_variant)},
             {'paths': [str, str, ...],
              'variant_id': str,
              'variant': [(str, [(str, str, object), ...])]},
             {'paths': ['/run/*'],
              'variant_id': 'cat-26c0',
              'variant': [('/pig/cat',
                           [('/pig', 'ant', 'fox'),
                            ('/pig/cat', 'dog', 'bee')])]},
             ...]

        where `dump_tree_nodes` looks like::

            [(node.path, environment_representation),
             (node.path, [(path1, key1, value1), (path2, key2, value2), ...]),
             ('/pig/cat', [('/pig', 'ant', 'fox')])]

        :return: loadable Varianter representation
        :raise NotImplementedError: when called before the varianter is parsed
        """
        if not self.is_parsed():
            raise NotImplementedError(
                "Dumping Varianter state before multiplexation is not supported."
            )
        return dump_ivariants(self.itertests)

    def load(self, state):
        """
        Load the variants state

        Current implementation supports loading from a list of loadable
        variants. It replaces the VariantDispatcher with fake implementation
        which reports the loaded (and initialized) variants.

        :param state: loadable Varianter representation
        """
        self.debug = False
        self.node_class = tree.TreeNode
        self._variant_plugins = FakeVariantDispatcher(state)
        self._no_variants = sum(self._variant_plugins.map_method_with_return("__len__"))

    def itertests(self):
        """
        Yields all variants of all plugins

        The variant is defined as dictionary with at least:
         * variant_id - name of the current variant
         * variant - AvocadoParams-compatible variant (usually a list of
                     TreeNodes but dict or simply None are also possible
                     values)
         * paths - default path(s)

        :yield variant
        """
        if self._no_variants:  # Report the variants of each plugin in turn
            for plugin_variants in self._variant_plugins.map_method_with_return(
                "__iter__"
            ):
                yield from plugin_variants
        else:  # No real variants, but currently *something* needs to be returned
            yield {
                "variant": self.node_class("").get_leaves(),
                "variant_id": None,
                "paths": ["/run/*"],
            }

    @classmethod
    def from_resultsdir(cls, resultsdir):
        """
        Retrieves the job variants objects from the results directory.

        This will return a list of variants since a Job can have multiple
        suites and the variants is per suite.

        :param resultsdir: Path to the job results directory
        :return: List of Varianter objects (one per stored state) or None
                 when the results directory has no variants file
        """
        path = os.path.join(resultsdir, "jobdata", VARIANTS_FILENAME)
        if not os.path.exists(path):
            return None

        with open(path, "r", encoding="utf-8") as variants_file:
            return [cls(state=variant) for variant in json.load(variants_file)]

    def __len__(self):
        # len() must return an integer; an unparsed varianter (None) is
        # reported as having no variants instead of raising TypeError.
        return self._no_variants or 0