melexis/sphinx-coverity-extension

View on GitHub
mlx/coverity.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-

'''
Coverity plugin

Sphinx extension for restructured text that adds Coverity reporting to documentation.
See README.rst for more details.
'''
from getpass import getpass
from urllib.error import URLError, HTTPError

import pkg_resources

from mlx.coverity_logging import report_info, report_warning
from mlx.coverity_services import CoverityConfigurationService, CoverityDefectService
from mlx.coverity_directives.coverity_defect_list import CoverityDefect, CoverityDefectListDirective


class SphinxCoverityConnector():
    """
    Class containing functions and variables for Sphinx to access in specific stages of the documentation build.
    """
    project_name = ''
    coverity_service = None

    def __init__(self):
        """
        Initialize the object by setting error variable to false
        """
        self.coverity_login_error = False
        self.coverity_login_error_msg = ''
        self.stream = ''

    def initialize_environment(self, app):
        """
        Perform initializations needed before the build process starts.
        """
        # LaTeX-support: since we generate empty tags, we need to relax the verbosity of that error
        if 'preamble' not in app.config.latex_elements:
            app.config.latex_elements['preamble'] = ''
        app.config.latex_elements['preamble'] += '''\
    \\makeatletter
    \\let\@noitemerr\\relax
    \\makeatother'''

        self.stream = app.config.coverity_credentials['stream']

        # Login to Coverity and obtain stream information
        try:
            self.input_credentials(app.config.coverity_credentials)
            report_info('Login to Coverity server... ', True)
            coverity_conf_service = CoverityConfigurationService(app.config.coverity_credentials['transport'],
                                                                 app.config.coverity_credentials['hostname'],
                                                                 app.config.coverity_credentials['port'])
            coverity_conf_service.login(app.config.coverity_credentials['username'],
                                        app.config.coverity_credentials['password'])
            report_info('done')

            report_info('obtaining stream information... ', True)
            stream = coverity_conf_service.get_stream(self.stream)
            if stream is None:
                raise ValueError('No such Coverity stream [%s] found on [%s]' %
                                 (self.stream, coverity_conf_service.get_service_url()))
            report_info('done')

            # Get Stream's project name
            report_info('obtaining project name from stream... ', True)
            self.project_name = coverity_conf_service.get_project_name(stream)
            report_info('done')
            self.coverity_service = CoverityDefectService(coverity_conf_service)
            self.coverity_service.login(app.config.coverity_credentials['username'],
                                        app.config.coverity_credentials['password'])
        except (URLError, HTTPError, Exception, ValueError) as error_info:  # pylint: disable=broad-except
            if isinstance(error_info, EOFError):
                self.coverity_login_error_msg = "Coverity credentials are not configured."
            else:
                self.coverity_login_error_msg = str(error_info)
            report_info('failed with: %s' % error_info)
            self.coverity_login_error = True

    # -----------------------------------------------------------------------------
    # Event handlers
    def process_coverity_nodes(self, app, doctree, fromdocname):
        """
        This function should be triggered upon ``doctree-resolved event``

        Obtain information from Coverity server and generate a table.
        """
        if self.coverity_login_error:
            # Create failed topnode
            for node in doctree.traverse(CoverityDefect):
                top_node = node.create_top_node("Failed to connect to Coverity Server")
                node.replace_self(top_node)
            report_warning('Connection failed: %s' % self.coverity_login_error_msg, fromdocname)
            return

        # Item matrix:
        # Create table with related items, printing their target references.
        # Only source and target items matching respective regexp shall be included
        for node in doctree.traverse(CoverityDefect):
            # Get items from server
            try:
                defects = self.get_filtered_defects(node)
            except (URLError, AttributeError, Exception) as err:  # pylint: disable=broad-except
                report_warning('failed with %s' % err, fromdocname)
                continue
            node.perform_replacement(defects, self, app, fromdocname)

    # -----------------------------------------------------------------------------
    # Helper functions of event handlers
    @staticmethod
    def input_credentials(config_credentials):
        """ Ask user to input username and/or password if they haven't been configured yet.

        Args:
            config_credentials (dict): Dictionary to store the user's credentials.
        """
        if not config_credentials['username']:
            config_credentials['username'] = input("Coverity username: ")
        if not config_credentials['password']:
            config_credentials['password'] = getpass("Coverity password: ")

    def get_filtered_defects(self, node):
        """ Fetch defects from suds using filters stored in the given CoverityDefect object.

        Args:
            node (CoverityDefect): CoverityDefect object with zero or more filters stored.

        Returns:
            (suds.sudsobject.mergedDefectsPageDataObj) Suds mergedDefectsPageDataObj object containing filtered defects.
        """
        report_info('obtaining defects... ', True)
        defects = self.coverity_service.get_defects(self.project_name, self.stream, node['filters'])
        report_info("%d received" % (defects['totalNumberOfRecords']))
        report_info("building defects table and/or chart... ", True)
        return defects


# Extension setup
def setup(app):
    '''Extension setup'''
    # Create default configuration. Can be customized in conf.py
    app.add_config_value('coverity_credentials',
                         {
                             'hostname': 'scan.coverity.com',
                             'port': '8080',
                             'transport': 'http',
                             'username': 'reporter',
                             'password': 'coverity',
                             'stream': 'some_coverty_stream',
                         },
                         'env')

    app.add_config_value('TRACEABILITY_ITEM_ID_REGEX', r"([A-Z_]+-[A-Z0-9_]+)", 'env')
    app.add_config_value('TRACEABILITY_ITEM_RELINK', {}, 'env')

    app.add_node(CoverityDefect)

    sphinx_coverity_connector = SphinxCoverityConnector()

    app.add_directive('coverity-list', CoverityDefectListDirective)

    app.connect('doctree-resolved', sphinx_coverity_connector.process_coverity_nodes)

    app.connect('builder-inited', sphinx_coverity_connector.initialize_environment)

    try:
        version = pkg_resources.require('mlx.coverity')[0].version
    except LookupError:
        version = 'dev'
    return {
        'version': version,
        'parallel_read_safe': True,
        'parallel_write_safe': True,
    }