avocado-framework/avocado

View on GitHub
avocado/core/output.py

Summary

Maintainability
D
2 days
Test Coverage
B
81%
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2013-2014
# Author: Lucas Meneghel Rodrigues <lmr@redhat.com>

"""
Manages output and logging in avocado applications.
"""
import errno
import logging
import os
import re
import sys
import traceback

from avocado.core import exit_codes
from avocado.core.settings import settings
from avocado.core.streams import BUILTIN_STREAMS
from avocado.utils import path as utils_path

#: Handle cases of logging exceptions which will lead to recursion error
logging.raiseExceptions = False

#: Pre-defined Avocado root logger
LOG_ROOT = logging.getLogger("avocado")
#: Pre-defined Avocado human UI logger
LOG_UI = logging.getLogger("avocado.app")
#: Pre-defined Avocado job/test logger
LOG_JOB = logging.getLogger("avocado.job")


class TermSupport:

    COLOR_BLUE = "\033[94m"
    COLOR_GREEN = "\033[92m"
    COLOR_YELLOW = "\033[93m"
    COLOR_RED = "\033[91m"
    COLOR_DARKGREY = "\033[90m"

    CONTROL_END = "\033[0m"

    MOVE_BACK = "\033[1D"
    MOVE_FORWARD = "\033[1C"

    ESCAPE_CODES = [
        COLOR_BLUE,
        COLOR_GREEN,
        COLOR_YELLOW,
        COLOR_RED,
        COLOR_DARKGREY,
        CONTROL_END,
        MOVE_BACK,
        MOVE_FORWARD,
    ]

    """
    Class to help applications to colorize their outputs for terminals.

    This will probe the current terminal and colorize output only if the
    stdout is in a tty or the terminal type is recognized.
    """

    def __init__(self):
        self.HEADER = self.COLOR_BLUE
        self.PASS = self.COLOR_GREEN
        self.SKIP = self.COLOR_YELLOW
        self.FAIL = self.COLOR_RED
        self.INTERRUPT = self.COLOR_RED
        self.ERROR = self.COLOR_RED
        self.WARN = self.COLOR_YELLOW
        self.CANCEL = self.COLOR_YELLOW
        self.PARTIAL = self.COLOR_YELLOW
        self.ENDC = self.CONTROL_END
        self.LOWLIGHT = self.COLOR_DARKGREY
        self.enabled = True
        allowed_terms = [
            "linux",
            "rxvt-unicode",
            "rxvt-unicode-256color",
            "screen",
            "screen-256color",
            "screen.xterm-256color",
            "vt100",
            "xterm",
            "xterm-256color",
            "xterm-kitty",
        ]
        term = os.environ.get("TERM")
        config = settings.as_dict()
        colored = config.get("runner.output.colored")
        force_color = config.get("runner.output.color")
        if force_color == "never":
            self.disable()
        elif force_color == "auto":
            if not colored or not os.isatty(1) or term not in allowed_terms:
                self.disable()
        elif force_color != "always":
            raise ValueError(
                "The value for runner.output.color must be one of "
                "'always', 'never', 'auto' and not " + force_color
            )

    def disable(self):
        """
        Disable colors from the strings output by this class.
        """
        self.enabled = False
        self.HEADER = ""
        self.PASS = ""
        self.SKIP = ""
        self.FAIL = ""
        self.INTERRUPT = ""
        self.ERROR = ""
        self.WARN = ""
        self.CANCEL = ""
        self.PARTIAL = ""
        self.ENDC = ""
        self.LOWLIGHT = ""
        self.MOVE_BACK = ""
        self.MOVE_FORWARD = ""

    def header_str(self, msg):
        """
        Print a header string (blue colored).

        If the output does not support colors, just return the original string.
        """
        return self.HEADER + msg + self.ENDC

    def fail_header_str(self, msg):
        """
        Print a fail header string (red colored).

        If the output does not support colors, just return the original string.
        """
        return self.FAIL + msg + self.ENDC

    def warn_header_str(self, msg):
        """
        Print a warning header string (yellow colored).

        If the output does not support colors, just return the original string.
        """
        return self.WARN + msg + self.ENDC

    def healthy_str(self, msg):
        """
        Print a healthy string (green colored).

        If the output does not support colors, just return the original string.
        """
        return self.PASS + msg + self.ENDC

    def partial_str(self, msg):
        """
        Print a string that denotes partial progress (yellow colored).

        If the output does not support colors, just return the original string.
        """
        return self.PARTIAL + msg + self.ENDC

    def pass_str(self, msg="PASS", move=MOVE_BACK):
        """
        Print a pass string (green colored).

        If the output does not support colors, just return the original string.
        """
        return move + self.PASS + msg + self.ENDC

    def skip_str(self, msg="SKIP", move=MOVE_BACK):
        """
        Print a skip string (yellow colored).

        If the output does not support colors, just return the original string.
        """
        return move + self.SKIP + msg + self.ENDC

    def fail_str(self, msg="FAIL", move=MOVE_BACK):
        """
        Print a fail string (red colored).

        If the output does not support colors, just return the original string.
        """
        return move + self.FAIL + msg + self.ENDC

    def error_str(self, msg="ERROR", move=MOVE_BACK):
        """
        Print a error string (red colored).

        If the output does not support colors, just return the original string.
        """
        return move + self.ERROR + msg + self.ENDC

    def interrupt_str(self, msg="INTERRUPT", move=MOVE_BACK):
        """
        Print an interrupt string (red colored).

        If the output does not support colors, just return the original string.
        """
        return move + self.INTERRUPT + msg + self.ENDC

    def warn_str(self, msg="WARN", move=MOVE_BACK):
        """
        Print an warning string (yellow colored).

        If the output does not support colors, just return the original string.
        """
        return move + self.WARN + msg + self.ENDC


#: Transparently handles colored terminal, when one is used
TERM_SUPPORT = TermSupport()


#: A collection of mapping from test statuses to colors to be used
#: consistently across the various plugins
TEST_STATUS_MAPPING = {
    "PASS": TERM_SUPPORT.PASS,
    "ERROR": TERM_SUPPORT.ERROR,
    "FAIL": TERM_SUPPORT.FAIL,
    "SKIP": TERM_SUPPORT.SKIP,
    "WARN": TERM_SUPPORT.WARN,
    "INTERRUPTED": TERM_SUPPORT.INTERRUPT,
    "CANCEL": TERM_SUPPORT.CANCEL,
}


#: A collection of mapping from test status to formatting functions
#: to be used consistently across the various plugins
TEST_STATUS_DECORATOR_MAPPING = {
    "PASS": TERM_SUPPORT.pass_str,
    "ERROR": TERM_SUPPORT.error_str,
    "FAIL": TERM_SUPPORT.fail_str,
    "SKIP": TERM_SUPPORT.skip_str,
    "WARN": TERM_SUPPORT.warn_str,
    "INTERRUPTED": TERM_SUPPORT.interrupt_str,
    "CANCEL": TERM_SUPPORT.skip_str,
}


class _StdOutputFile:
    """
    File-like object which stores (_is_stdout, content) into the provided list
    """

    def __init__(self, is_stdout, records):
        """
        :param is_stdout: Is this output stdout or stderr
        :param records: list to store (is_stdout, written_message) records
        """
        self._is_stdout = is_stdout
        self._records = records

    def write(self, msg):
        """
        Record the message
        """
        self._records.append((self._is_stdout, msg))

    def writelines(self, iterable):
        """
        Record all messages
        """
        for line in iterable:
            self.write(line)

    def close(self):
        """File-object methods"""

    def flush(self):
        """File-object methods"""

    @staticmethod
    def isatty():
        """File-object methods"""
        return False

    def seek(self):
        """File-object methods"""

    def tell(self):
        """File-object methods"""

    def getvalue(self):
        """
        Get all messages written to this "file"
        """
        return "\n".join((_[1] for _ in self._records if _[0] == self._is_stdout))


class StdOutput:
    """
    Class to modify sys.stdout/sys.stderr
    """

    #: List of records of stored output when stdout/stderr is disabled
    records = []

    def __init__(self):
        self.stdout = self._stdout = sys.stdout
        self.stderr = self._stderr = sys.stderr
        self.__configured = False

    @staticmethod
    def _paginator_in_use():
        """
        :return: True when we output into paginator
        """
        return bool(isinstance(sys.stdout, Paginator))

    @property
    def configured(self):
        """
        Determines if a configuration of any sort has been performed
        """
        return self.__configured

    def print_records(self):
        """
        Prints all stored messages as they occurred into streams they were
        produced for.
        """
        try:
            for stream, msg in self.records:
                if stream:
                    sys.stdout.write(msg)
                else:
                    sys.stderr.write(msg)
            del self.records[:]
        # IOError due to EPIPE is ignored.  That is to avoid having to
        # handle all IOErrors at the main application loop
        # indiscriminately.  By handling them here, we can be sure
        # that the failure was due to stdout or stderr not being
        # connected to an open PIPE.
        except IOError as detail:
            if detail.errno != errno.EPIPE:
                raise

    def fake_outputs(self):
        """
        Replace sys.stdout/sys.stderr with in-memory-objects
        """
        sys.stdout = _StdOutputFile(True, self.records)
        sys.stderr = _StdOutputFile(False, self.records)
        self.__configured = True

    def enable_outputs(self):
        """
        Enable sys.stdout/sys.stderr (either with 2 streams or with paginator)
        """
        sys.stdout = self.stdout
        sys.stderr = self.stderr
        self.__configured = True

    def enable_paginator(self):
        """
        Enable paginator
        """
        try:
            paginator = Paginator()
        except RuntimeError as details:
            # Paginator not available
            LOG_UI.error("Failed to enable paginator: %s", details)
            return
        self.stdout = self.stderr = paginator
        self.__configured = True

    def enable_stderr(self):
        """
        Enable sys.stderr and disable sys.stdout
        """
        sys.stdout = open(os.devnull, "w", encoding="utf-8")
        sys.stderr = self.stderr
        self.__configured = True

    def close(self):
        """
        Enable original sys.stdout/sys.stderr and cleanup
        """
        paginator = None
        if self._paginator_in_use():
            paginator = sys.stdout
        self.enable_outputs()
        if paginator:
            paginator.close()


#: Allows modifying the sys.stdout/sys.stderr
STD_OUTPUT = StdOutput()


def early_start():
    """
    Replace all outputs with in-memory handlers
    """
    if os.environ.get("AVOCADO_LOG_EARLY"):
        add_log_handler(LOG_ROOT, logging.StreamHandler, sys.stdout)
        add_log_handler(LOG_JOB, logging.StreamHandler, sys.stdout)
    else:
        STD_OUTPUT.fake_outputs()
        add_log_handler(LOG_ROOT, MemStreamHandler, None)
    LOG_ROOT.level = logging.DEBUG


CONFIG = []


def del_last_configuration():
    if len(CONFIG) == 1:
        return
    configuration = CONFIG.pop()
    for logger_name in configuration:
        disable_log_handler(logger_name)
    configuration = CONFIG[-1]
    for logger_name, handlers in configuration.items():
        logger = logging.getLogger(logger_name)
        for handler in handlers:
            logger.addHandler(handler)


def split_loggers_and_levels(loggers):
    """Separates logger names and legger levels.

    :param loggers: Logger names with or without levels
    :type loggers: List of strings in format STREAM[:LEVEL][,STREAM[:LEVEL][,...]]
    :yields: Logger name and level
    :rtype: tuple(logger_name, logger_level)
    """
    for stream_name in loggers:
        stream_level = re.split(r"(?<!\\):", stream_name, maxsplit=1)
        name = stream_level[0]
        if len(stream_level) == 1:
            yield name, None
        else:
            level = (
                int(stream_level[1])
                if stream_level[1].isdigit()
                else logging.getLevelName(stream_level[1].upper())
            )
            yield name, level


def reconfigure(args):
    """
    Adjust logging handlers accordingly to app args and re-log messages.
    """

    # Remove default handlers from root
    try:
        stderr_root = logging.getLogger().handlers[0]
        if (
            stderr_root.stream is sys.stderr
            and stderr_root.formatter._fmt == "{message}"
            and stderr_root.level == logging.WARNING
        ):
            logging.getLogger().removeHandler(stderr_root)

        stdout_root = logging.getLogger().handlers[0]
        if (
            stdout_root.stream is sys.stdout
            and stdout_root.formatter._fmt == "{message}"
            and stdout_root.level == logging.NOTSET
        ):
            logging.getLogger().removeHandler(stdout_root)
    except (IndexError, AttributeError):
        pass
    # Delete last configuration
    if len(CONFIG) != 0:
        last_configuration = CONFIG[-1]
        for logger_name in last_configuration:
            disable_log_handler(logger_name)

    CONFIG.append({})

    LOG_ROOT.setLevel(logging.DEBUG)

    # Reconfigure stream loggers
    enabled = args.get("core.show")
    if isinstance(enabled, list):
        enabled = set(enabled)
    if not isinstance(enabled, set):
        enabled = set(["app"])
        args["core.show"] = enabled
    if "none" in enabled:
        enabled = set([])
    elif "all" in enabled:
        enabled.update(BUILTIN_STREAMS)
    if os.environ.get("AVOCADO_LOG_EARLY"):
        enabled.add("early")
    # TODO: Avocado relies on stdout/stderr on some places, re-log them here
    # for now. This should be removed once we replace them with logging.
    if enabled:
        if args.get("core.paginator") is True and TERM_SUPPORT.enabled:
            STD_OUTPUT.enable_paginator()
        STD_OUTPUT.enable_outputs()
    else:
        STD_OUTPUT.enable_stderr()
    STD_OUTPUT.print_records()

    for logger_name, logger in BUILTIN_STREAMS.items():
        if logger_name in enabled:
            kwargs = {}
            if logger_name == "app":
                kwargs = {
                    "fmt": logging.Formatter("%(message)s"),
                    "handler_filter": FilterInfoAndLess(),
                }
            add_log_handler(
                logger,
                ProgressStreamHandler,
                STD_OUTPUT.stdout,
                **kwargs,
            )

    # Error handler
    add_log_handler(
        LOG_UI,
        ProgressStreamHandler,
        STD_OUTPUT.stderr,
        logging.DEBUG,
        "%(message)s",
        FilterWarnAndMore(),
    )

    # Add custom loggers
    for name, level in split_loggers_and_levels(
        [_ for _ in enabled if _ not in BUILTIN_STREAMS]
    ):
        try:
            if not level:
                level = logging.DEBUG
            add_log_handler(name, logging.StreamHandler, STD_OUTPUT.stdout, level)
        except ValueError as details:
            LOG_UI.error(
                "Failed to set logger for --show %s:%s: %s.", name, level, details
            )
            sys.exit(exit_codes.AVOCADO_FAIL)

    # Log early_messages
    for handler in LOG_ROOT.handlers:
        if isinstance(handler, MemStreamHandler):
            LOG_ROOT.removeHandler(handler)
    for record in MemStreamHandler.log:
        logging.getLogger(record.name).handle(record)


class FilterWarnAndMore(logging.Filter):
    def filter(self, record):
        return record.levelno >= logging.WARN


class FilterInfoAndLess(logging.Filter):
    def filter(self, record):
        return record.levelno <= logging.INFO


class FilterTestMessage(logging.Filter):
    def filter(self, record):
        return record.module != "messages" and record.funcName != "handle"


class FilterTestMessageOnly(logging.Filter):
    def filter(self, record):
        return record.module == "messages" and record.funcName == "handle"


class ProgressStreamHandler(logging.StreamHandler):
    """
    Handler class that allows users to skip new lines on each emission.
    """

    def emit(self, record):
        try:
            msg = self.format(record)
            if record.levelno < logging.INFO:  # Most messages are INFO
                pass
            elif record.levelno < logging.WARNING:
                msg = TERM_SUPPORT.header_str(msg)
            elif record.levelno < logging.ERROR:
                msg = TERM_SUPPORT.warn_header_str(msg)
            else:
                msg = TERM_SUPPORT.fail_header_str(msg)
            stream = self.stream
            skip_newline = False
            if hasattr(record, "skip_newline"):
                skip_newline = record.skip_newline
            stream.write(msg)
            if not skip_newline:
                stream.write("\n")
            self.flush()
        except (KeyboardInterrupt, SystemExit):  # pylint: disable=W0706
            raise
        except Exception:  # pylint: disable=W0703
            self.handleError(record)


class MemStreamHandler(logging.StreamHandler):
    """
    Handler that stores all records in self.log (shared in all instances)
    """

    log = []

    def emit(self, record):
        self.log.append(record)

    def flush(self):
        """
        This is in-mem object, it does not require flushing
        """


class Paginator:
    """
    Paginator that uses less to display contents on the terminal.

    Contains cleanup handling for when user presses 'q' (to quit less).
    """

    def __init__(self):
        self.pipe = None
        paginator = os.environ.get("PAGER")
        if not paginator:
            try:
                paginator = f"{utils_path.find_command('less')} -FRX"
            except utils_path.CmdNotFoundError as details:
                raise RuntimeError(f"Unable to enable pagination: {details}")

        self.pipe = os.popen(paginator, "w")

    def __del__(self):
        self.close()

    def close(self):
        if self.pipe:
            try:
                self.pipe.close()
            except OSError:
                pass

    def write(self, msg):
        if self.pipe:
            try:
                self.pipe.write(msg)
            except (OSError, ValueError):
                pass

    def flush(self):
        if self.pipe and not self.pipe.closed:
            self.pipe.flush()


def add_log_handler(
    logger,
    klass=logging.StreamHandler,
    stream=sys.stdout,
    level=logging.DEBUG,
    fmt="%(name)s: %(message)s",
    handler_filter=None,
):
    """
    Add handler to a logger.

    :param logger_name: the name of a :class:`logging.Logger` instance, that
                        is, the parameter to :func:`logging.getLogger`
    :param klass: Handler class (defaults to :class:`logging.StreamHandler`)
    :param stream: Logging stream, to be passed as an argument to ``klass``
                   (defaults to ``sys.stdout``)
    :param level: Log level (defaults to `INFO``)
    :param fmt: Logging format (defaults to ``%(name)s: %(message)s``)
    :param handler_filter: Logging filter class based on logging.Filter
    """

    def save_handler(logger_name, handler):
        if not CONFIG:
            CONFIG.append({})
        configuration = CONFIG[-1]
        if logger_name not in configuration:
            configuration[logger_name] = []
        configuration[logger_name].append(handler)

    if isinstance(logger, str):
        logger = logging.getLogger(logger)
    handler = klass(stream)
    handler.setLevel(level)
    if isinstance(fmt, str):
        fmt = logging.Formatter(fmt=fmt)
    if handler_filter:
        handler.addFilter(handler_filter)
    handler.setFormatter(fmt)
    if handler_filter:
        handler.addFilter(handler_filter)

    logger.addHandler(handler)
    save_handler(logger.name, handler)
    return handler


def disable_log_handler(logger):
    if not logger:
        return
    if isinstance(logger, str):
        logger = logging.getLogger(logger)
    # Handlers might be reused elsewhere, can't delete them
    while logger.handlers:
        logger.handlers.pop()
    logger.handlers.append(logging.NullHandler())


class Throbber:
    """
    Produces a spinner used to notify progress in the application UI.
    """

    STEPS = ["-", "\\", "|", "/"]
    # Only print a throbber when we're on a terminal
    if TERM_SUPPORT.enabled:
        MOVES = [
            TERM_SUPPORT.MOVE_BACK + STEPS[0],
            TERM_SUPPORT.MOVE_BACK + STEPS[1],
            TERM_SUPPORT.MOVE_BACK + STEPS[2],
            TERM_SUPPORT.MOVE_BACK + STEPS[3],
        ]
    else:
        MOVES = ["", "", "", ""]

    def __init__(self):
        self.position = 0

    def _update_position(self):
        if self.position == (len(self.MOVES) - 1):
            self.position = 0
        else:
            self.position += 1

    def render(self):
        result = self.MOVES[self.position]
        self._update_position()
        return result


def log_plugin_failures(failures):
    """
    Log in the application UI failures to load a set of plugins

    :param failures: a list of load failures, usually coming from a
                     :class:`avocado.core.dispatcher.Dispatcher`
                     attribute `load_failures`
    """
    msg_fmt = 'Failed to load plugin from module "%s": %s :\n%s'
    config = settings.as_dict()
    silenced = config.get("plugins.skip_broken_plugin_notification")
    for failure in failures:
        if failure[0].module_name in silenced:
            continue
        if hasattr(failure[1], "__traceback__"):
            str_tb = "".join(traceback.format_tb(failure[1].__traceback__))
        else:
            str_tb = "Traceback not available"
        LOG_UI.error(msg_fmt, failure[0].module_name, repr(failure[1]), str_tb)