melexis/sphinx-coverity-extension

View on GitHub
mlx/coverity_services.py

Summary

Maintainability
A
0 mins
Test Coverage
#!/usr/bin/python

'''Services and other utilities for Coverity scripting'''

# General
import csv
import logging
import re
from urllib.error import URLError

# For Coverity - SOAP
from suds.client import Client
from suds.wsse import Security, UsernameToken

# -- Default values -- and global settings

DEFAULT_WS_VERSION = 'v9'

# Coverity built in Impact statuses
IMPACT_LIST = {'High', 'Medium', 'Low'}

KIND_LIST = {'QUALITY', 'SECURITY', 'TEST'}

# Coverity built in Classifications
CLASSIFICATION_LIST = {'Unclassified', 'Pending', 'False Positive', 'Intentional', 'Bug', 'Untested', 'No Test Needed'}

# Coverity built in Actions
ACTION_LIST = {'Undecided', 'Fix Required', 'Fix Submitted', 'Modeling Required', 'Ignore', 'On hold',
               'For Interest Only'}

ISSUE_KIND_2_LABEL = {'QUALITY': 'Quality', 'SECURITY': 'Security', 'Various': 'Quality/Security', 'TEST': 'Testing'}


# names of Coverity Triage/Attribute fields
EXT_REFERENCE_ATTR_NAME = "Ext. Reference"
DEFECT_STATUS_ATTR_NAME = "DefectStatus"
CLASSIFICATION_ATTR_NAME = "Classification"
ACTION_ATTR_NAME = "Action"
COMMENT_ATTR_NAME = "Comment"


def parse_two_part_term(term, delim=','):
    '''Parse a term assuming [ [part1],[part2] ]'''
    valid = False
    part1 = ""
    part2 = ""
    if term.find(delim) != -1:
        valid = True
        field1 = term.split(delim, 1)[0]
        if bool(field1):
            part1 = field1
        field2 = term.rsplit(delim, 1)[-1]
        if bool(field2):
            part2 = field2
    return valid, part1, part2


def compare_strings(str_a, str_b):
    '''Compare strings for equivalence

    some leniency allowed such as spaces and casing
    '''
    if re.match(str_b, str_a, flags=re.IGNORECASE):
        return True
    # ignore embedded spaces and some odd punctuation characters ("todo" = "To-Do")
    str_a2 = re.sub(r'[.:\-_ ]', '', str_a)
    str_b2 = re.sub(r'[:\-_ ]', '', str_b)  # don't remove dot (part of regex?)
    if re.match(str_b2, str_a2, flags=re.IGNORECASE):
        return True
    return False


class Service:
    '''
    Basic endpoint Service
    '''

    def __init__(self, transport, hostname, port, ws_version=DEFAULT_WS_VERSION):
        self.set_transport(transport)
        self.set_hostname(hostname)
        self.set_port(port)
        self.set_ws_version(ws_version)
        self.client = None

    def set_transport(self, transport):
        '''Set transport protocol'''
        self.transport = transport

    def get_transport(self):
        '''Get transport protocol'''
        return self.transport

    def set_hostname(self, hostname):
        '''Set hostname for service'''
        self.hostname = hostname

    def get_hostname(self):
        '''Get hostname for service'''
        return self.hostname

    def set_port(self, port):
        '''Set port for service'''
        self.port = port

    def get_port(self):
        '''Get port for service'''
        return self.port

    def set_ws_version(self, ws_version):
        '''Set WS version for service'''
        self.ws_version = ws_version

    def get_ws_version(self):
        '''Get WS version for service'''
        return self.ws_version

    def get_service_url(self, path='', add_port=True):
        '''Get Service url with given path'''
        url = self.transport + '://' + self.hostname
        if self.port and add_port:
            url += ':' + self.port
        if path:
            url += path
        return url

    def get_ws_url(self, service):
        '''Get WS url with given service'''
        return self.get_service_url('/ws/' + self.ws_version + '/' + service + '?wsdl')

    def login(self, username, password):
        '''Login to Coverity using given username and password'''
        security = Security()
        token = UsernameToken(username, password)
        security.tokens.append(token)
        self.client.set_options(wsse=security)

    def validate_presence(self, url, service_name):
        '''Initializes the client attribute while validating the presence of the service'''
        try:
            self.client = Client(url)
            logging.info("Validated presence of %s [%s]", service_name, url)
        except URLError:
            self.client = None
            logging.critical("No such %s [%s]", service_name, url)
            raise


class CoverityConfigurationService(Service):
    '''
    Coverity Configuration Service (WebServices)
    '''

    def __init__(self, transport, hostname, port, ws_version=DEFAULT_WS_VERSION):
        super(CoverityConfigurationService, self).__init__(transport, hostname, port, ws_version)
        self.checkers = None
        url = self.get_ws_url('configurationservice')
        logging.getLogger('suds.client').setLevel(logging.CRITICAL)
        self.validate_presence(url, 'Coverity Configuration Service')

    def login(self, username, password):
        '''Login to Coverity Configuration service using given username and password'''
        super(CoverityConfigurationService, self).login(username, password)
        version = self.get_version()
        if version is None:
            raise RuntimeError("Authentication to [%s] FAILED for [%s] account - check password"
                               % (self.get_service_url(), username))
        else:
            logging.info("Authentication to [%s] using [%s] account was OK - version [%s]",
                         self.get_service_url(), username, version.externalVersion)

    def get_version(self):
        '''Get the version of the service, can be used as a means to validate access permissions'''
        try:
            return self.client.service.getVersion()
        except URLError:
            return None

    @staticmethod
    def get_project_name(stream):
        '''Get the project name from the stream object'''
        return stream.primaryProjectId.name

    @staticmethod
    def get_triage_store(stream):
        '''Get the name of the triaging store from the stream object'''
        return stream.triageStoreId.name

    def get_stream(self, stream_name):
        '''Get the stream object from the stream name'''
        filter_spec = self.client.factory.create('streamFilterSpecDataObj')

        # use stream name as an initial glob pattern
        filter_spec.namePattern = stream_name

        # get all the streams that match
        streams = self.client.service.getStreams(filter_spec)

        # find the one with a matching name
        for stream in streams:
            if compare_strings(stream.id.name, stream_name):
                return stream
        return None

    # get a list of the snapshots in a named stream
    def get_snapshot_for_stream(self, stream_name):
        '''Get snapshot object for given stream name'''
        stream_id = self.client.factory.create('streamIdDataObj')
        stream_id.name = stream_name
        # optional filter specification
        filter_spec = self.client.factory.create('snapshotFilterSpecDataObj')
        # return a list of snapshotDataObj
        return self.client.service.getSnapshotsForStream(stream_id, filter_spec)

    @staticmethod
    def get_snapshot_id(snapshots, idx=1):
        '''Get the nth snapshot (base 1) - minus numbers to count from the end backwards (-1 = last)'''
        if bool(idx):
            num_snapshots = len(snapshots)
            if idx < 0:
                required = num_snapshots + idx + 1
            else:
                required = idx

            if abs(required) > 0 and abs(required) <= num_snapshots:
                # base zero
                return snapshots[required - 1].id
        return 0

    def get_snapshot_detail(self, snapshot_id):
        '''Get detailed information about a single snapshot'''
        snapshot = self.client.factory.create('snapshotIdDataObj')
        snapshot.id = snapshot_id
        # return a snapshotInfoDataObj
        return self.client.service.getSnapshotInformation(snapshot)

    def get_checkers(self):
        '''Get a list of checkers from the service'''
        if not self.checkers:
            self.checkers = self.client.service.getCheckerNames()
        return self.checkers

    @staticmethod
    def add_filter_rqt(name, req_csv, valid_list, filter_list, allow_regex=False):
        '''Lookup the list of given filter possibility, add to filter spec and return a validated list'''
        logging.info('Validate required %s [%s]', name, req_csv)
        validated = ""
        delim = ""
        for field in req_csv.split(','):
            if not valid_list or field in valid_list:
                logging.info('Classification [%s] is valid', field)
                filter_list.append(field)
                validated += delim + field
                delim = ","
            elif allow_regex:
                pattern = re.compile(field)
                for element in valid_list:
                    if pattern.search(element) and element not in filter_list:
                        filter_list.append(element)
                        validated += delim + element
                        delim = ","
            else:
                logging.error('Invalid %s filter: %s', name, field)
        return validated


class CoverityDefectService(Service):
    '''
    Coverity Defect Service (WebServices)
    '''

    def __init__(self, config_service):
        '''Create a Defect Service, bound to the given Configuration Service'''
        super(CoverityDefectService, self).__init__(config_service.get_transport(),
                                                    config_service.get_hostname(),
                                                    config_service.get_port(),
                                                    config_service.get_ws_version())
        self.config_service = config_service
        self.filters = ""
        # logging.getLogger('suds.client').setLevel(logging.DEBUG)
        url = self.get_ws_url('defectservice')
        self.validate_presence(url, 'Coverity Defect Service')

    def get_defects(self, project, stream, filters, custom=None):
        """ Gets a list of defects for given stream, with some query criteria.

        Args:
            project (str): Name of the project to query
            stream (str): Name of the stream to query
            filters (dict): Dictionary with attribute names as keys and CSV lists of attribute values to query as values
            custom (str): A custom query

        Returns:
            (suds.sudsobject.mergedDefectsPageDataObj) Suds mergedDefectsPageDataObj object containing filtered defects
        """
        logging.info('Querying Coverity for defects in project [%s] stream [%s] ...', project, stream)

        # define the project
        project_id = self.client.factory.create('projectIdDataObj')
        project_id.name = project

        # and the stream
        stream_id = self.client.factory.create('streamIdDataObj')
        stream_id.name = stream

        # create filter spec
        filter_spec = self.client.factory.create('snapshotScopeDefectFilterSpecDataObj')

        # only for this stream
        filter_spec.streamIncludeNameList.append(stream_id)

        # apply any filter on checker names
        if filters['checker']:
            self.config_service.get_checkers()
            self.handle_attribute_filter(filters['checker'],
                                         'Checker',
                                         self.config_service.checkers,
                                         filter_spec.checkerList,
                                         allow_regex=True)

        # apply any filter on impact status
        if filters['impact']:
            self.handle_attribute_filter(filters['impact'], 'Impact', IMPACT_LIST, filter_spec.impactNameList)

        # apply any filter on issue kind
        if filters['kind']:
            self.handle_attribute_filter(filters['kind'], 'Kind', KIND_LIST, filter_spec.issueKindList)

        # apply any filter on classification
        if filters['classification']:
            self.handle_attribute_filter(filters['classification'],
                                         'Classification',
                                         CLASSIFICATION_LIST,
                                         filter_spec.classificationNameList)

        # apply any filter on action
        if filters['action']:
            self.handle_attribute_filter(filters['action'], 'Action', ACTION_LIST, filter_spec.actionNameList)

        # apply any filter on Components
        if filters['component']:
            self.handle_component_filter(filters['component'], filter_spec)

        # apply any filter on CWE values
        if filters['cwe']:
            self.handle_attribute_filter(filters['cwe'], 'CWE', None, filter_spec.cweList)

        # apply any filter on CID values
        if filters['cid']:
            self.handle_attribute_filter(filters['cid'], 'CID', None, filter_spec.cidList)

        # if a special custom attribute value requirement
        if custom:
            self.handle_custom_filter_attribute(custom, filter_spec)

        # create page spec
        page_spec = self.client.factory.create('pageSpecDataObj')
        page_spec.pageSize = 9999
        page_spec.sortAscending = True
        page_spec.startIndex = 0

        # create snapshot scope
        snapshot_scope = self.client.factory.create('snapshotScopeSpecDataObj')

        snapshot_scope.showOutdatedStreams = False
        snapshot_scope.compareOutdatedStreams = False

        snapshot_scope.showSelector = 'last()'
        snapshot_scope.compareSelector = 'last()'

        logging.info('Running Coverity query...')
        return self.client.service.getMergedDefectsForSnapshotScope(project_id, filter_spec,
                                                                    page_spec, snapshot_scope)

    def handle_attribute_filter(self, attribute_values, name, *args, **kwargs):
        """ Applies any filter on an attribute's values.

        Args:
            attribute_values (str): A CSV list of attribute values to query.
            name (str): String representation of the attribute.
        """
        logging.info('Using %s filter [%s]', name, attribute_values)
        validated = self.config_service.add_filter_rqt(name, attribute_values, *args, **kwargs)
        logging.info('Resolves to [%s]', validated)
        if validated:
            self.filters += ("<%s(%s)> " % (name, validated))

    def handle_component_filter(self, attribute_values, filter_spec):
        """ Applies any filter on the component attribute's values.

        Args:
            attribute_values (str): A CSV list of attribute values to query.
            filter_spec (sudsobject.Factory): Object to store filter attributes.
        """
        logging.info('Using Component filter [%s]', attribute_values)
        parser = csv.reader([attribute_values])

        for fields in parser:
            for _, field in enumerate(fields):
                field = field.strip()
                component_id = self.client.factory.create('componentIdDataObj')
                component_id.name = field
                filter_spec.componentIdList.append(component_id)
        self.filters += ("<Components(%s)> " % (attribute_values))

    def handle_custom_filter_attribute(self, custom, filter_spec):
        """ Handles a custom attribute definition, and adds it to the filter spec if it's valid.

        Args:
            custom (str): A custom query.
            filter_spec (sudsobject.Factory): Object to store filter attributes.

        Raises:
            ValueError: Invalid custom attribute definition.
        """
        logging.info('Using attribute filter [%s]', custom)
        # split the name:value[;name:value1[,value2]]
        for fields in csv.reader([custom], delimiter=';'):
            for i, name_value_pair in enumerate(fields):
                name_value_pair = name_value_pair.strip()
                valid, name, values = parse_two_part_term(name_value_pair, ':')
                if valid:
                    logging.info("attr (%d) [%s] = any of ...", i + 1, name)

                    attribute_definition_id = self.client.factory.create('attributeDefinitionIdDataObj')
                    attribute_definition_id.name = name

                    filter_map = self.client.factory.create('attributeDefinitionValueFilterMapDataObj')
                    filter_map.attributeDefinitionId = attribute_definition_id

                    self._append_multiple_values(values, filter_map)

                    filter_spec.attributeDefinitionValueFilterMap.append(filter_map)
                else:
                    raise ValueError('Invalid custom attribute definition [%s]' % name_value_pair)
        self.filters += ("<Attrs(%s)> " % custom)

    def _append_multiple_values(self, values, filter_map):
        '''Append multiple values if there are multiple values delimited with comma'''
        for value_fields in csv.reader([values], delimiter=','):
            for value in value_fields:
                logging.info("             [%s]", value)

                attribute_value_id = self.client.factory.create('attributeValueIdDataObj')
                attribute_value_id.name = value

                filter_map.attributeValueIds.append(attribute_value_id)

    def get_defect(self, cid, stream):
        '''Get the details pertaining a specific CID - it may not have defect instance details if newly eliminated
        (fixed)'''
        logging.info('Fetching data for CID [%s] in stream [%s] ...', cid, stream)

        merged_defect_id = self.client.factory.create('mergedDefectIdDataObj')
        merged_defect_id.cid = cid

        filter_spec = self.client.factory.create('streamDefectFilterSpecDataObj')
        filter_spec.includeDefectInstances = True
        filter_spec.includeHistory = True

        stream_id = self.client.factory.create('streamIdDataObj')
        stream_id.name = stream
        filter_spec.streamIdList.append(stream_id)

        return self.client.service.getStreamDefects(merged_defect_id, filter_spec)

    def add_attribute_name_and_value(self, defect_state_spec, attr_name, attr_value):
        '''Add attribute name and value to given defect state specification'''

        # name value pair to update
        attribute_definition_id = self.client.factory.create('attributeDefinitionIdDataObj')
        attribute_definition_id.name = attr_name

        attribute_value_id = self.client.factory.create('attributeValueIdDataObj')
        attribute_value_id.name = attr_value

        # wrap the name/value pair
        defect_state_attr_value = self.client.factory.create('defectStateAttributeValueDataObj')
        defect_state_attr_value.attributeDefinitionId = attribute_definition_id
        defect_state_attr_value.attributeValueId = attribute_value_id

        # add to our list
        defect_state_spec.defectStateAttributeValues.append(defect_state_attr_value)

    # update the external reference id to a third party
    def update_ext_reference_attribute(self, cid, triage_store, ext_ref_id, ccomment=None):
        '''Update external reference attribute for given CID'''
        logging.info('Updating Coverity: CID [%s] in TS [%s] with Ext Ref [%s]', cid, triage_store, ext_ref_id)

        # triage store identifier
        triage_store_id = self.client.factory.create('triageStoreIdDataObj')
        triage_store_id.name = triage_store

        # CID to update
        merged_defect_id = self.client.factory.create('mergedDefectIdDataObj')
        merged_defect_id.cid = cid

        # if an ext ref id value supplied
        if bool(ext_ref_id):
            attr_value = ext_ref_id
            comment_value = 'Automatically recorded reference to new JIRA ticket.'
        else:
            # set to a space - which works as a blank without the WS complaining :-)
            attr_value = " "
            comment_value = 'Automatically cleared former JIRA ticket reference.'

        # if a Coverity comment to tag on the end
        if bool(ccomment):
            comment_value += " " + ccomment
        logging.info('Comment = [%s]', comment_value)

        defect_state_spec = self.client.factory.create('defectStateSpecDataObj')

        # name value pairs to add to this update
        self.add_attribute_name_and_value(defect_state_spec, EXT_REFERENCE_ATTR_NAME, attr_value)
        self.add_attribute_name_and_value(defect_state_spec, COMMENT_ATTR_NAME, comment_value)

        # apply the update
        return self.client.service.updateTriageForCIDsInTriageStore(triage_store_id, merged_defect_id,
                                                                    defect_state_spec)

    @staticmethod
    def get_instance_impact(stream_defect, instance_number=1):
        '''Get the current impact of the 'nth' incident of this issue (High/Medium/Low)'''
        counter = instance_number
        for instance in stream_defect.defectInstances:
            counter -= 1
            if counter == 0:
                return instance.impact.name
        return ""

    @staticmethod
    def get_value_for_named_attribute(stream_defect, attr_name):
        '''Lookup the value of a named attribute'''
        logging.info('Get value for cov attribute [%s]', attr_name)
        for attr_value in stream_defect.defectStateAttributeValues:
            if compare_strings(attr_value.attributeDefinitionId.name, attr_name):
                logging.info('Resolves to [%s]', attr_value.attributeValueId.name)
                return str(attr_value.attributeValueId.name)
        logging.warning('Value for attribute [%s] not found', attr_name)
        return ""

    @staticmethod
    def get_event_attribute_value(defect_state, name, value=None):
        '''Get specified attribute was set to given matching value'''
        if bool(value):
            logging.info('Searching for attribute [%s] with value [%s]', name, value)
        else:
            logging.info('Searching for attribute [%s]', name)

        for attr_value in defect_state.defectStateAttributeValues:
            # check if we have the named attribute
            if compare_strings(attr_value.attributeDefinitionId.name, name):
                # if any value supplied or it matches requirement
                if bool(attr_value.attributeValueId.name) and\
                   (not value or compare_strings(attr_value.attributeValueId.name, value)):
                    logging.info('Found [%s] = [%s]',
                                 attr_value.attributeDefinitionId.name, attr_value.attributeValueId.name)
                    return True, attr_value.attributeValueId.name
                # break attribute name search - either no value or it doesn't match
                break
        logging.warning('Event for attribute [%s] not found', name)
        return False, None

    def seek_nth_match(self, event_history, nth_event, attr_name, attr_value):
        '''Seek for a given attribute name-value pair in the triaging history'''
        num_match = 0
        for defect_state in event_history:
            # look for the attribute name-value pair in this triage event
            req_event_found, req_attr_value = self.get_event_attribute_value(defect_state, attr_name, attr_value)
            if req_event_found:
                num_match += 1
                # correct one?
                if num_match == nth_event:
                    return True, defect_state, req_attr_value
        return False, None, None

    def get_event_for_attribute_change(self, stream_defect, nth_term, attr_name, attr_value=None):
        '''Get event when specified attribute was set to given matching value'''
        logging.info('Searching for triage event n=[%d] where attribute [%s] is set to [%s]',
                     nth_term, attr_name, attr_value)

        if nth_term > 0:
            found, defect_state, value = self.seek_nth_match(stream_defect.history, nth_term, attr_name, attr_value)
        else:
            found, defect_state, value = self.seek_nth_match(reversed(stream_defect.history), abs(int(nth_term)),
                                                             attr_name, attr_value)

        return found, defect_state, value

    def get_ext_reference_id(self, stream_defect):
        '''Get external reference ID attribute value for given defect'''
        return self.get_value_for_named_attribute(stream_defect, EXT_REFERENCE_ATTR_NAME)

    def get_defect_status(self, stream_defect):
        '''Get defect status attribute value for given defect'''
        return self.get_value_for_named_attribute(stream_defect, DEFECT_STATUS_ATTR_NAME)

    def get_classification(self, stream_defect):
        '''Get classification attribute value for given defect'''
        return self.get_value_for_named_attribute(stream_defect, CLASSIFICATION_ATTR_NAME)

    def get_action(self, stream_defect):
        '''Get action attribute value for given defect'''
        return self.get_value_for_named_attribute(stream_defect, ACTION_ATTR_NAME)

    def get_defect_url(self, stream, cid):
        '''Get URL for given defect CID
        http://machine1.eng.company.com/query/defects.htm?stream=StreamA&cid=1234
        '''
        return self.get_service_url('/query/defects.htm?stream=%s&cid=%s' % (stream, str(cid)), add_port=False)


if __name__ == '__main__':
    print("Sorry, no main here")