melexis/warnings-plugin

View on GitHub
src/mlx/warnings/warnings.py

Summary

Maintainability
D
1 day
Test Coverage
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import errno
import glob
import json
import os
import subprocess
import sys
from importlib.metadata import distribution
from pathlib import Path
from string import Template

from ruamel.yaml import YAML

from .exceptions import WarningsConfigError
from .junit_checker import JUnitChecker
from .regex_checker import CoverityChecker, DoxyChecker, SphinxChecker, XMLRunnerChecker
from .robot_checker import RobotChecker

__version__ = distribution('mlx.warnings').version


def substitute_envvar(checker_config, keys):
    """Modifies configuration for checker in-place, resolving any environment variables for ``keys``

    Args:
        checker_config (dict): Configuration for a specific WarningsChecker
        keys (set): Set of keys to process the value of

    Raises:
        WarningsConfigError: Failed to find an environment variable
    """
    for key in keys:
        if key in checker_config and isinstance(checker_config[key], str):
            template_obj = Template(checker_config[key])
            try:
                checker_config[key] = template_obj.substitute(os.environ)
            except KeyError as err:
                raise WarningsConfigError(f"Failed to find environment variable {err} for configuration value {key!r}")\
                    from None


class WarningsPlugin:

    def __init__(self, verbose=False, config_file=None, cq_enabled=False):
        '''
        Function for initializing the parsers

        Args:
            verbose (bool): optional - enable verbose logging
            config_file (Path): optional - configuration file with setup
            cq_enabled (bool): optional - enable generation of Code Quality report
        '''
        self.activated_checkers = {}
        self.verbose = verbose
        self.cq_enabled = cq_enabled
        self.public_checkers = [SphinxChecker(self.verbose), DoxyChecker(self.verbose), JUnitChecker(self.verbose),
                                XMLRunnerChecker(self.verbose), CoverityChecker(self.verbose),
                                RobotChecker(self.verbose)]

        if config_file:
            with open(config_file, 'r', encoding='utf-8') as open_file:
                if config_file.suffix.lower().startswith('.y'):
                    config = YAML().load(open_file)
                else:
                    config = json.load(open_file)
            self.config_parser(config)

        self.warn_min = 0
        self.warn_max = 0
        self.count = 0
        self.printout = False

    def activate_checker(self, checker):
        '''
        Activate additional checkers after initialization

        Args:
            checker (WarningsChecker): checker object
        '''
        checker.cq_enabled = self.cq_enabled and checker.name in ('doxygen', 'sphinx', 'xmlrunner')
        self.activated_checkers[checker.name] = checker

    def activate_checker_name(self, name):
        '''
        Activates checker by name

        Args:
            name (str): checker name

        Returns:
            WarningsChecker: activated checker object, or None when no checker with the given name exists
        '''
        for checker in self.public_checkers:
            if checker.name == name:
                self.activate_checker(checker)
                return checker
        else:
            print("Checker %s does not exist" % name)

    def get_checker(self, name):
        ''' Get checker by name

        Args:
            name (str): checker name
        Return:
            checker object (WarningsChecker)
        '''
        return self.activated_checkers[name]

    def check(self, content):
        '''
        Function for counting the number of warnings in a specific text

        Args:
            content (str): The text to parse
        '''
        if self.printout:
            print(content)

        if not self.activated_checkers:
            print("No checkers activated. Please use activate_checker function")
        else:
            for checker in self.activated_checkers.values():
                checker.check(content)

    def set_maximum(self, maximum):
        ''' Setter function for the maximum amount of warnings

        Args:
            maximum (int): maximum amount of warnings allowed
        '''
        for checker in self.activated_checkers.values():
            checker.set_maximum(maximum)

    def set_minimum(self, minimum):
        ''' Setter function for the minimum amount of warnings

        Args:
            minimum (int): minimum amount of warnings allowed
        '''
        for checker in self.activated_checkers.values():
            checker.set_minimum(minimum)

    def return_count(self, name=None):
        ''' Getter function for the amount of found warnings

        If the name parameter is set, this function will return the amount of
        warnings found by that checker. If not, the function will return the sum
        of the warnings found by all registered checkers.

        Args:
            name (WarningsChecker): The checker for which to return the amount of warnings (if set)

        Returns:
            int: Amount of found warnings
        '''
        self.count = 0
        if name is None:
            for checker in self.activated_checkers.values():
                self.count += checker.return_count()
        else:
            self.count = self.activated_checkers[name].return_count()
        return self.count

    def return_check_limits(self, name=None):
        ''' Function for determining the return value of the script

        If the name parameter is set, this function will check (and return) the
        return value of that checker. If not, this function checks whether the
        warnings for each registered checker are within the configured limits.

        Args:
            name (WarningsChecker): The checker for which to check the return value

        Return:
            int: 0 if the amount of warnings is within limits, the count of warnings otherwise
                (or 1 in case of a count of 0 warnings)
        '''
        if name is None:
            for checker in self.activated_checkers.values():
                retval = checker.return_check_limits()
                if retval:
                    return retval
        else:
            return self.activated_checkers[name].return_check_limits()

        return 0

    def toggle_printout(self, printout):
        ''' Toggle printout of all the parsed content

        Useful for command input where we want to print content as well

        Args:
            printout (bool): True enables the printout, False provides more silent mode
        '''
        self.printout = printout

    def config_parser(self, config):
        ''' Parsing configuration dict extracted by previously opened JSON file

        Args:
            config (dict): Content of configuration file
        '''
        # activate checker
        for checker in self.public_checkers:
            try:
                checker_config = config[checker.name]
                if bool(checker_config['enabled']):
                    substitute_envvar(checker_config, {'min', 'max'})
                    self.activate_checker(checker)
                    checker.parse_config(checker_config)
                    print("Config parsing for {name} completed".format(name=checker.name))
            except KeyError as err:
                print("Incomplete config. Missing: {key}".format(key=err))

    def write_counted_warnings(self, out_file):
        ''' Writes counted warnings to the given file

        Args:
            out_file (str): Location for the output file
        '''
        with open(out_file, 'w', encoding='utf-8', newline='\n') as open_file:
            for checker in self.activated_checkers.values():
                open_file.write("\n".join(checker.counted_warnings) + "\n")

    def write_code_quality_report(self, out_file):
        ''' Generates the Code Quality report artifact as a JSON file that implements a subset of the Code Climate spec

        Args:
            out_file (str): Location for the output file
        '''
        results = []
        for checker in self.activated_checkers.values():
            results.extend(checker.cq_findings)
        content = json.dumps(results, indent=4, sort_keys=False)
        with open(out_file, 'w', encoding='utf-8', newline='\n') as open_file:
            open_file.write(f"{content}\n")


def warnings_wrapper(args):
    parser = argparse.ArgumentParser(prog='mlx-warnings')
    group1 = parser.add_argument_group('Configuration command line options')
    group1.add_argument('--coverity', dest='coverity', action='store_true')
    group1.add_argument('-d', '--doxygen', dest='doxygen', action='store_true')
    group1.add_argument('-j', '--junit', dest='junit', action='store_true')
    group1.add_argument('-r', '--robot', dest='robot', action='store_true')
    group1.add_argument('-s', '--sphinx', dest='sphinx', action='store_true')
    group1.add_argument('-x', '--xmlrunner', dest='xmlrunner', action='store_true')
    group1.add_argument('--name', default='',
                        help='Name of the Robot Framework test suite to check results of')
    group1.add_argument('-m', '--maxwarnings', '--max-warnings', type=int, default=0,
                        help='Maximum amount of warnings accepted')
    group1.add_argument('--minwarnings', '--min-warnings', type=int, default=0,
                        help='Minimum amount of warnings accepted')
    group1.add_argument('--exact-warnings', type=int, default=0,
                        help='Exact amount of warnings expected')
    group2 = parser.add_argument_group('Configuration file with options')
    group2.add_argument('--config', dest='configfile', action='store', required=False, type=Path,
                        help='Config file in JSON format provides toggle of checkers and their limits')
    group2.add_argument('--include-sphinx-deprecation', dest='include_sphinx_deprecation', action='store_true',
                        help="Sphinx checker will include warnings matching (RemovedInSphinx\\d+Warning) regex")
    parser.add_argument('-o', '--output',
                        help='Output file that contains all counted warnings')
    parser.add_argument('-C', '--code-quality',
                        help='Output Code Quality report artifact for GitLab CI')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true')
    parser.add_argument('--command', dest='command', action='store_true',
                        help='Treat program arguments as command to execute to obtain data')
    parser.add_argument('--ignore-retval', dest='ignore', action='store_true',
                        help='Ignore return value of the executed command')
    parser.add_argument('--version', action='version', version='%(prog)s {version}'.format(version=__version__))
    parser.add_argument('logfile', nargs='+', help='Logfile (or command) that might contain warnings')
    parser.add_argument('flags', nargs=argparse.REMAINDER,
                        help='Possible not-used flags from above are considered as command flags')

    args = parser.parse_args(args)
    code_quality_enabled = bool(args.code_quality)

    # Read config file
    if args.configfile is not None:
        checker_flags = args.sphinx or args.doxygen or args.junit or args.coverity or args.xmlrunner or args.robot
        warning_args = args.maxwarnings or args.minwarnings or args.exact_warnings
        if checker_flags or warning_args:
            print("Configfile cannot be provided with other arguments")
            sys.exit(2)
        warnings = WarningsPlugin(verbose=args.verbose, config_file=args.configfile, cq_enabled=code_quality_enabled)
    else:
        warnings = WarningsPlugin(verbose=args.verbose, cq_enabled=code_quality_enabled)
        if args.sphinx:
            warnings.activate_checker_name('sphinx')
        if args.doxygen:
            warnings.activate_checker_name('doxygen')
        if args.junit:
            warnings.activate_checker_name('junit')
        if args.xmlrunner:
            warnings.activate_checker_name('xmlrunner')
        if args.coverity:
            warnings.activate_checker_name('coverity')
        if args.robot:
            robot_checker = warnings.activate_checker_name('robot')
            robot_checker.parse_config({
                'suites': [{'name': args.name, 'min': 0, 'max': 0}],
                'check_suite_names': True,
            })
        if args.exact_warnings:
            if args.maxwarnings | args.minwarnings:
                print("expected-warnings cannot be provided with maxwarnings or minwarnings")
                sys.exit(2)
            warnings.set_maximum(args.exact_warnings)
            warnings.set_minimum(args.exact_warnings)
        else:
            warnings.set_maximum(args.maxwarnings)
            warnings.set_minimum(args.minwarnings)

    if args.include_sphinx_deprecation and 'sphinx' in warnings.activated_checkers.keys():
        warnings.get_checker('sphinx').include_sphinx_deprecation()

    if args.command:
        cmd = args.logfile
        if args.flags:
            cmd.extend(args.flags)
        warnings.toggle_printout(True)
        retval = warnings_command(warnings, cmd)

        if (not args.ignore) and (retval != 0):
            return retval
    else:
        if args.flags:
            print(f"WARNING: Some keyword arguments have been ignored because they followed positional arguments: "
                  f"{' '.join(args.flags)!r}")
        retval = warnings_logfile(warnings, args.logfile)
        if retval != 0:
            return retval

    warnings.return_count()
    if args.output:
        warnings.write_counted_warnings(args.output)
    if args.code_quality:
        warnings.write_code_quality_report(args.code_quality)
    return warnings.return_check_limits()


def warnings_command(warnings, cmd):
    ''' Execute command to obtain input for parsing for warnings

    Usually log files are output of the commands. To avoid this additional step
    this function runs a command instead and parses the stderr and stdout of the
    command for warnings.

    Args:
        warnings (WarningsPlugin): Object for warnings where errors should be logged
        cmd (list): List of commands (str), which should be executed to obtain input for parsing

    Return:
        int: Return value of executed command(s)

    Raises:
        OSError: When program is not installed.
    '''
    try:
        print("Executing: ", end='')
        print(cmd)
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE, bufsize=1, universal_newlines=True)
        out, err = proc.communicate()
        # Check stdout
        if out:
            try:
                warnings.check(out.decode(encoding="utf-8"))
            except AttributeError:
                warnings.check(out)
        # Check stderr
        if err:
            try:
                warnings.check(err.decode(encoding="utf-8"))
            except AttributeError:
                warnings.check(err)
        return proc.returncode
    except OSError as err:
        if err.errno == errno.ENOENT:
            print("It seems like program " + str(cmd) + " is not installed.")
        raise


def warnings_logfile(warnings, log):
    ''' Parse logfile for warnings

    Args:
        warnings (WarningsPlugin): Object for warnings where errors should be logged
        log: Logfile for parsing

    Return:
        0: Log files existed and are parsed successfully
        1: Log files did not exist
    '''
    # args.logfile doesn't necessarily contain wildcards, but just to be safe, we
    # assume it does, and try to expand them.
    # This mechanism is put in place to allow wildcards to be passed on even when
    # executing the script on windows (in that case there is no shell expansion of wildcards)
    # so that the script can be used in the exact same way even when moving from one
    # OS to another.
    for file_wildcard in log:
        if glob.glob(file_wildcard):
            for logfile in glob.glob(file_wildcard):
                with open(logfile, 'r') as loghandle:
                    warnings.check(loghandle.read())
        else:
            print("FILE: %s does not exist" % file_wildcard)
            return 1

    return 0


def main():
    sys.exit(warnings_wrapper(sys.argv[1:]))


if __name__ == '__main__':
    main()