avocado-framework/avocado

View on GitHub
avocado/plugins/diff.py

Summary

Maintainability
D
1 day
Test Coverage
F
58%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2016
# Author: Amador Pahim <apahim@redhat.com>

"""
Job Diff
"""

import argparse
import json
import os
import subprocess
import sys
import tempfile
from difflib import HtmlDiff, unified_diff

from avocado.core import data_dir, exit_codes, jobdata, output
from avocado.core.output import LOG_UI
from avocado.core.plugin_interfaces import CLICmd
from avocado.core.settings import settings
from avocado.core.varianter import Varianter


class Diff(CLICmd):
    """
    Implements the avocado 'diff' subcommand
    """

    name = "diff"
    description = "Shows the difference between 2 jobs."

    def __init__(self):  # pylint: disable=W0231
        self.term = output.TERM_SUPPORT
        self.std_diff_output = True

    def configure(self, parser):
        """
        Add the subparser for the diff action.

        :param parser: The Avocado command line application parser
        :type parser: :class:`avocado.core.parser.ArgumentParser`
        """
        parser = super().configure(parser)

        parser.epilog = (
            "By default, a textual diff report is generated in the standard output."
        )

        help_msg = (
            "A job reference, identified by a (partial) unique ID "
            "(SHA1) or test results directory."
        )
        settings.register_option(
            section="diff",
            key="jobids",
            default=[],
            key_type=list,
            nargs=2,
            help_msg=help_msg,
            parser=parser,
            metavar="JOB",
            positional_arg="jobids",
        )

        help_msg = "Enable HTML output to the FILE where the result should be written."
        settings.register_option(
            section="diff",
            key="html",
            default=None,
            metavar="FILE",
            help_msg=help_msg,
            parser=parser,
            long_arg="--html",
        )

        help_msg = (
            "Generate and open a HTML report in your preferred "
            "browser. If no --html file is provided, create a "
            "temporary file."
        )
        settings.register_option(
            section="diff",
            key="open_browser",
            default=False,
            key_type=bool,
            help_msg=help_msg,
            parser=parser,
            long_arg="--open-browser",
        )

        help_msg = (
            "Comma separated filter of diff sections: "
            "(no)cmdline,(no)time,(no)variants,(no)results, "
            "(no)config,(no)sysinfo (defaults to all enabled)."
        )
        settings.register_option(
            section="diff",
            key="filter",
            metavar="DIFF_FILTER",
            key_type=self._validate_filters,
            help_msg=help_msg,
            default=["cmdline", "time", "variants", "results", "config", "sysinfo"],
            parser=parser,
            long_arg="--diff-filter",
        )

        help_msg = 'Strip the "id" from "id-name;variant" when comparing test results.'
        settings.register_option(
            section="diff",
            key="strip_id",
            default=False,
            key_type=bool,
            help_msg=help_msg,
            parser=parser,
            long_arg="--diff-strip-id",
        )

        help_msg = (
            "Create temporary files with job reports to be used by other diff tools"
        )
        settings.register_option(
            section="diff",
            key="create_reports",
            default=False,
            key_type=bool,
            help_msg=help_msg,
            parser=parser,
            long_arg="--create-reports",
        )

    def run(self, config):
        def _get_name(test):
            return str(test["id"])

        def _get_name_no_id(test):
            return str(test["id"]).split("-", 1)[1]

        job1_dir, job1_id = self._setup_job(config.get("diff.jobids")[0])
        job2_dir, job2_id = self._setup_job(config.get("diff.jobids")[1])

        job1_data = self._get_job_data(job1_dir)
        job2_data = self._get_job_data(job2_dir)

        report_header = "Avocado Job Report\n"
        job1_results = [report_header]
        job2_results = [report_header]

        diff_filter = config.get("diff.filter")
        if "cmdline" in diff_filter:
            cmdline1 = self._get_command_line(job1_dir)
            cmdline2 = self._get_command_line(job2_dir)

            if str(cmdline1) != str(cmdline2):
                command_line_header = ["\n", "# COMMAND LINE\n"]
                job1_results.extend(command_line_header)
                job1_results.append(cmdline1)
                job2_results.extend(command_line_header)
                job2_results.append(cmdline2)

        if "time" in diff_filter:
            time1 = f"{job1_data['time']:.2f} s\n"
            time2 = f"{job2_data['time']:.2f} s\n"

            if str(time1) != str(time2):
                total_time_header = ["\n", "# TOTAL TIME\n"]
                job1_results.extend(total_time_header)
                job1_results.append(time1)
                job2_results.extend(total_time_header)
                job2_results.append(time2)

        if "variants" in diff_filter:
            variants1 = self._get_variants(job1_dir)
            variants2 = self._get_variants(job2_dir)

            if str(variants1) != str(variants2):
                variants_header = ["\n", "# VARIANTS\n"]
                job1_results.extend(variants_header)
                job1_results.extend(variants1)
                job2_results.extend(variants_header)
                job2_results.extend(variants2)

        if "results" in diff_filter:
            results1 = []
            if config.get("diff.strip_id"):
                get_name = _get_name_no_id
            else:
                get_name = _get_name
            for test in job1_data["tests"]:
                test_result = f"{get_name(test)}: {str(test['status'])}\n"
                results1.append(test_result)
            results2 = []
            for test in job2_data["tests"]:
                test_result = f"{get_name(test)}: {str(test['status'])}\n"
                results2.append(test_result)

            if str(results1) != str(results2):
                test_results_header = ["\n", "# TEST RESULTS\n"]
                job1_results.extend(test_results_header)
                job1_results.extend(results1)
                job2_results.extend(test_results_header)
                job2_results.extend(results2)

        if "config" in diff_filter:
            config1 = self._get_config(job1_dir)
            config2 = self._get_config(job2_dir)

            if str(config1) != str(config2):
                config_header = ["\n", "# SETTINGS\n"]
                job1_results.extend(config_header)
                job1_results.extend(config1)
                job2_results.extend(config_header)
                job2_results.extend(config2)

        if "sysinfo" in diff_filter:
            sysinfo_pre1 = self._get_sysinfo(job1_dir, "pre")
            sysinfo_pre2 = self._get_sysinfo(job2_dir, "pre")

            if str(sysinfo_pre1) != str(sysinfo_pre2):
                sysinfo_header_pre = ["\n", "# SYSINFO PRE\n"]
                job1_results.extend(sysinfo_header_pre)
                job1_results.extend(sysinfo_pre1)
                job2_results.extend(sysinfo_header_pre)
                job2_results.extend(sysinfo_pre2)

            sysinfo_post1 = self._get_sysinfo(job1_dir, "post")
            sysinfo_post2 = self._get_sysinfo(job2_dir, "post")

            if str(sysinfo_post1) != str(sysinfo_post2):
                sysinfo_header_post = ["\n", "# SYSINFO POST\n"]
                job1_results.extend(sysinfo_header_post)
                job1_results.extend(sysinfo_post1)
                job2_results.extend(sysinfo_header_post)
                job2_results.extend(sysinfo_post2)

        if config.get("diff.create_reports"):
            self.std_diff_output = False
            prefix = f"avocado_diff_{job1_id[:7]}_"
            tmp_file1 = tempfile.NamedTemporaryFile(
                mode="w", prefix=prefix, suffix=".txt", delete=False
            )
            tmp_file1.writelines(job1_results)
            tmp_file1.close()

            prefix = f"avocado_diff_{job2_id[:7]}_"
            tmp_file2 = tempfile.NamedTemporaryFile(
                mode="w", prefix=prefix, suffix=".txt", delete=False
            )
            tmp_file2.writelines(job2_results)
            tmp_file2.close()

            LOG_UI.info("%s %s", tmp_file1.name, tmp_file2.name)

        html_file = config.get("diff.html")
        open_browser = config.get("diff.open_browser")
        if open_browser and html_file is None:
            prefix = f"avocado_diff_{job1_id[:7]}_{job2_id[:7]}_"
            tmp_file = tempfile.NamedTemporaryFile(
                mode="w", prefix=prefix, suffix=".html", delete=False
            )

            html_file = tmp_file.name

        if html_file is not None:
            self.std_diff_output = False
            try:
                html_diff = HtmlDiff()
                # pylint: disable=W0212
                html_diff._legend = """
                    <table class="diff" summary="Legends">
                    <tr> <td> <table border="" summary="Colors">
                    <tr><th> Colors </th> </tr>
                    <tr><td class="diff_add">&nbsp;Added&nbsp;</td></tr>
                    <tr><td class="diff_chg">Changed</td> </tr>
                    <tr><td class="diff_sub">Deleted</td> </tr>
                    </table></td>
                    <td> <table border="" summary="Links">
                    <tr><th colspan="2"> Links </th> </tr>
                    <tr><td>(f)irst change</td> </tr>
                    <tr><td>(n)ext change</td> </tr>
                    <tr><td>(t)op</td> </tr>
                    </table></td> </tr>
                    </table>"""

                job_diff_html = html_diff.make_file(
                    (_ for _ in job1_results),
                    (_ for _ in job2_results),
                    fromdesc=job1_id,
                    todesc=job2_id,
                )

                with open(html_file, "w", encoding="utf-8") as fp:
                    fp.writelines(job_diff_html)
                LOG_UI.info(html_file)

            except IOError as exception:
                LOG_UI.error(exception)
                sys.exit(exit_codes.AVOCADO_FAIL)

        if open_browser:
            setsid = getattr(os, "setsid", None)
            if not setsid:
                setsid = getattr(os, "setpgrp", None)
            with open(os.devnull, "r+", encoding="utf-8") as in_out:
                cmd = ["xdg-open", html_file]
                subprocess.Popen(  # pylint: disable=W1509
                    cmd,
                    close_fds=True,
                    stdin=in_out,
                    stdout=in_out,
                    stderr=in_out,
                    preexec_fn=setsid,
                )

        if self.std_diff_output:
            if self.term.enabled:
                for line in self._cdiff(
                    unified_diff(
                        job1_results, job2_results, fromfile=job1_id, tofile=job2_id
                    )
                ):
                    LOG_UI.debug(line.strip())
            else:
                for line in unified_diff(
                    job1_results, job2_results, fromfile=job1_id, tofile=job2_id
                ):
                    LOG_UI.debug(line.strip())

    @staticmethod
    def _validate_filters(string):
        input_filter = set(string.split(","))
        include_options = [
            "cmdline",
            "time",
            "variants",
            "results",
            "config",
            "sysinfo",
        ]
        exclude_options = [
            "nocmdline",
            "notime",
            "novariants",
            "noresults",
            "noconfig",
            "nosysinfo",
        ]
        invalid = input_filter.difference(include_options + exclude_options + ["all"])
        if invalid:
            msg = f"Invalid option(s) '{','.join(invalid)}'"
            raise argparse.ArgumentTypeError(msg)
        if input_filter.intersection(exclude_options):
            output_filter = [
                _ for _ in include_options if ("no" + _) not in input_filter
            ]
        elif "all" in input_filter:
            output_filter = include_options
        else:
            output_filter = input_filter

        return output_filter

    @staticmethod
    def _get_job_data(jobdir):
        results_json = os.path.join(jobdir, "results.json")
        with open(results_json, "r", encoding="utf-8") as json_file:
            data = json.load(json_file)

        return data

    @staticmethod
    def _setup_job(job_id):
        resultsdir = data_dir.get_job_results_dir(job_id)
        if resultsdir is None:
            LOG_UI.error("Can't find job results directory for '%s'", job_id)
            sys.exit(exit_codes.AVOCADO_FAIL)

        with open(os.path.join(resultsdir, "id"), "r", encoding="utf-8") as id_file:
            sourcejob = id_file.read().strip()

        return resultsdir, sourcejob

    @staticmethod
    def _get_command_line(resultsdir):
        command_line = jobdata.retrieve_cmdline(resultsdir)
        if command_line is not None:
            return f"{' '.join(command_line)}\n"

        return "Not found\n"

    @staticmethod
    def _get_variants(resultsdir):
        results = []
        variants = Varianter.from_resultsdir(resultsdir)
        if variants is not None:
            for variant in variants:
                results.extend(variant.to_str(variants=2).splitlines())
        else:
            results.append("Not found\n")

        return results

    @staticmethod
    def _get_config(resultsdir):
        config_file = os.path.join(resultsdir, "replay", "config")
        try:
            with open(config_file, "r", encoding="utf-8") as conf:
                return conf.readlines()
        except IOError:
            return ["Not found\n"]

    @staticmethod
    def _get_sysinfo(resultsdir, pre_post):
        sysinfo_dir = os.path.join(resultsdir, "sysinfo", pre_post)
        sysinfo = []
        for path, _, files in os.walk(sysinfo_dir):
            for name in sorted(files):
                name_header = ["\n", f"** {name} **\n"]
                sysinfo.extend(name_header)
                with open(
                    os.path.join(path, name), "r", encoding="utf-8"
                ) as sysinfo_file:
                    try:
                        sysinfo.extend(sysinfo_file.readlines())
                    except UnicodeDecodeError:
                        msg = f"Ignoring file {name} as it cannot be decoded."
                        LOG_UI.debug(msg)
                        continue

        if sysinfo:
            del sysinfo[0]

        return sysinfo

    def _cdiff(self, diff):
        for line in diff:
            if line.startswith("+"):
                yield self.term.COLOR_GREEN + line
            elif line.startswith("-"):
                yield self.term.COLOR_RED + line
            elif line.startswith("@"):
                yield self.term.COLOR_BLUE + line
            else:
                yield line