distributed-system-analysis/run-perf

View on GitHub
runperf/result.py

Summary

Maintainability
C
1 day
Test Coverage
A
92%
#!/bin/env python3
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2019
# Author: Lukas Doktor <ldoktor@redhat.com>
# Based on: https://github.com/avocado-framework/avocado/blob/master/avocado
#           /plugins/xunit.py
#           created by Ruda Moura <rmoura@redhat.com>

import collections
import datetime
import glob
import json
import logging
import math
import os
import re
import string
from xml.dom.minidom import Document  # nosec

import numpy

from . import utils


# Test statuses
PASS = 0
SKIP = 99
MINOR_GAIN = 1  # 1/2 tolerance gain
MINOR_LOSS = 2  # 1/2 tolerance loss
FAIL = FAIL_LOSS = -1
FAIL_GAIN = -3
ERROR = -2

STATUS_MAP = {PASS: 'PASS',
              MINOR_GAIN: 'PASS',
              MINOR_LOSS: 'PASS',
              FAIL: 'FAIL',
              ERROR: 'ERR ',
              SKIP: 'ERR ',
              FAIL_GAIN: 'ERR '}

PRINTABLE = string.ascii_letters + string.digits + string.punctuation + '\n\r '

_RE_FAILED_ITERATION_NAME = re.compile(r'.*-fail(\d+)$')

LOG = logging.getLogger(__name__)


def get_uncertainty(no_samples):
    """Return uncertainty coefficient based on the number of no_samples"""
    coefficients = [7, 2.3, 1.7, 1.4, 1.3, 1.3, 1.2, 1.2]
    if no_samples <= 0:
        raise ValueError(f"Number of samples must be > 0 ({no_samples})")
    if no_samples <= 8:
        return coefficients[no_samples - 1]
    return 1


class Model:

    """Model base-class"""

    mean_tolerance = None
    stddev_tolerance = None
    processing_dst_results = False

    def check_result(self, test_name, src, dst):
        """
        Apply model to a test_name

        :param test_name: Name of the current check
        :param src: Original source score
        :param dst: Original destination score
        :param primary: Whether the check is primary
        :return: [(check_name, difference, weight, source value), ...]
                 where source_value is an optional value correcting the source
                 value
        """
        raise NotImplementedError

    def identify(self, data):
        """
        Set/train the model based on provided data

        :param data: dict of {result: [value, value, value]}
        """
        raise NotImplementedError


class ModelLinearRegression(Model):

    """
    Simple linear regression model
    """

    # In case of too-similar-results the model would be stricter, than
    # the original criteria. Let's use the raw values divided by this
    # coefficient to still allow stricter criteria, but not too strict.
    TOO_STRICT_COEFFICIENT = 1.1

    def __init__(self, mean_tolerance, stddev_tolerance, model=None):
        self.mean_tolerance = mean_tolerance
        self.stddev_tolerance = stddev_tolerance
        if model:
            with open(model, encoding="utf-8") as fd_model:
                self.model = json.load(fd_model)
            if "__metadata__" not in self.model:
                # Old results, "upgrade" it
                for key in self.model:
                    self.model[key] = {"raw": None,
                                       "equation": self.model[key]}
        else:
            self.model = {}

    def check_result(self, test_name, src, dst):
        model = self.model.get(test_name)
        if model is None:
            return []
        equation = model["equation"]
        msrc = model["raw"]
        out = [("model", equation[0] * dst + equation[1], 1, msrc)]
        if msrc is not None:
            if test_name.endswith("mean"):
                if src == 0:
                    mrawdiff = 0
                else:
                    mrawdiff = (float(dst) - msrc) / abs(msrc) * 100
            else:
                mrawdiff = msrc - dst
            out.append(("mraw", mrawdiff, 0))
        return out

    def _identify(self, low, high):
        """
        Calculate the linear equation out of min-max values using
        the self.mean_tolerance.

        :param low: low value to be mapped to -self.mean_tolerance
        :param high: high value to be mapped to +self.mean_tolerance
        :return: list of linear equation coefficients (a[0]*x + a[1])
                 or None in case of singular matrix
        """
        equation1 = numpy.array([[high, 1],
                                 [low, 1]])
        equation2 = numpy.array([self.mean_tolerance, -self.mean_tolerance])
        try:
            return list(numpy.linalg.solve(equation1, equation2))
        except numpy.linalg.LinAlgError:
            # Singular matrix, skip this one and use conventional
            # evaluation instead
            return None

    def identify(self, data):
        """
        Identify model based on data

        :param data: dict of {result: [value, value, value]}
        :note: currently uses self.mean_tolerance for all tolerances
        """
        if "__metadata__" not in self.model:
            self.model["__metadata__"] = {"version": 2}
        self.model["__metadata__"]["tolerance"] = self.mean_tolerance
        too_strict_coefficient = (self.mean_tolerance / 100 /
                                  self.TOO_STRICT_COEFFICIENT)
        for test in sorted(data.keys()):
            try:
                values = [float(_) for _ in data.get(test, {}).values()]
            except ValueError:
                # Probably string (error, other kind of result)
                continue
            average = numpy.average(values)
            max_value = max(values)
            highest = average * (1 + too_strict_coefficient)
            if highest > max_value:
                LOG.debug("%s: Adjusting max_value from %.2f to %.2f", test,
                          max_value, highest)
                max_value = highest
            min_value = min(values)
            lowest = average * (1 - too_strict_coefficient)
            if lowest < min_value:
                LOG.debug("%s: Adjusting min_value from %.2f to %.2f", test,
                          min_value, lowest)
                min_value = lowest
            model = self._identify(min_value, max_value)
            if not model:
                # Singular matrix, not possible to map
                LOG.debug("%s: Singular matrix, skipping...", test)
                continue
            if test not in self.model:
                self.model[test] = {}
            self.model[test]["equation"] = model
            self.model[test]["raw"] = average
            self.model[test]["mmin"] = (min_value - average) / average * 100
            self.model[test]["mmax"] = (max_value - average) / average * 100
            LOG.debug("%s: MIN %s->%s MAX %s->%s", test,
                      -self.mean_tolerance,
                      self.model[test]["mmin"],
                      self.mean_tolerance,
                      self.model[test]["mmax"])
        return self.model

    def rebase(self, data):
        """
        Rebase the model to a new average raw values while keeping the
        acceptable deviation.

        :param data: dict of {result: [value, value, value]}
        """
        for test, values in sorted(data.items()):
            if test not in self.model:
                # Test wasn't present in the current model, add it via identify
                self.identify({test: data[test]})
                continue
            try:
                average = numpy.average([float(_) for _ in values.values()])
            except ValueError:
                # Probably string (error, other kind of result)
                continue
            a_x = - self.model[test]["equation"][0]
            self.model[test]["equation"][1] = average * a_x
        return self.model


class ModelStdev(ModelLinearRegression):

    """
    Simple linear regression model using 3*stddev as error
    """
    ERROR_COEFICIENT = 3
    TOO_STRICT_COEFFICIENT = 1.1

    def identify(self, data):
        """
        Identify model based on data

        :param data: dict of {result: [value, value, value]}
        :note: currently uses self.mean_tolerance for all tolerances
        """
        if "__metadata__" not in self.model:
            self.model["__metadata__"] = {"version": 1}
        self.model["__metadata__"]["tolerance"] = self.mean_tolerance
        too_strict_coefficient = (self.mean_tolerance / 100 /
                                  self.TOO_STRICT_COEFFICIENT)
        for test in sorted(data.keys()):
            try:
                values = [float(_) for _ in data.get(test, {}).values()]
            except ValueError:
                # Probably string (error, other kind of result)
                continue
            uncertainty = get_uncertainty(len(values))
            average = numpy.average(values)
            max_stddev = self.ERROR_COEFICIENT * numpy.std(values)
            max_value = average + (max_stddev * uncertainty)
            min_value = average - (max_stddev * uncertainty)
            highest = average * (1 + too_strict_coefficient)
            if highest > max_value:
                LOG.debug("%s: Adjusting max_value from %.2f to %.2f", test,
                          max_value, highest)
                max_value = highest
            lowest = average * (1 - too_strict_coefficient)
            if lowest < min_value:
                LOG.debug("%s: Adjusting min_value from %.2f to %.2f", test,
                          min_value, lowest)
                min_value = lowest
            model = self._identify(min_value, max_value)
            if not model:
                # Singular matrix, not possible to map
                LOG.debug("%s: Singular matrix, skipping...", test)
                continue
            if test not in self.model:
                self.model[test] = {}
            self.model[test]["equation"] = model
            self.model[test]["raw"] = average
            self.model[test]["mmin"] = (min_value - average) / average * 100
            self.model[test]["mmax"] = (max_value - average) / average * 100
            LOG.debug("%s: MIN %s->%s MAX %s->%s", test,
                      -self.mean_tolerance,
                      self.model[test]["mmin"],
                      self.mean_tolerance,
                      self.model[test]["mmax"])
        return self.model


class Result:
    """XUnitResult object"""

    __slots__ = ("_score", "primary", "_status", "_details", "classname",
                 "testname", "srcs", "dst", "params", "good", "small", "big",
                 "error", "agg_diffs", "agg_weights", "tolerance")
    _re_name = re.compile(r'([^/]+)/([^/]+)/([^:]+):'
                          r'./([^/]+)/([^/]+)/([^\.]+)\.(.+)')

    def __init__(self, test, dst, tolerance, primary=False, params=None):
        self._status = None
        self._score = None
        self._details = None
        name = test.rsplit('/', 1)
        if len(name) == 2:
            self.classname, self.testname = name
        elif name:
            self.classname = "<undefined>"
            self.testname = name[0]
        else:
            raise ValueError(f"No test specified {test}")
        self.tolerance = tolerance
        self.primary = primary
        if params is None:
            self.params = {}
        else:
            self.params = params
        self.srcs = []
        self.dst = dst
        self.good = []
        self.small = []
        self.big = []
        self.error = []
        self.agg_diffs = 0
        self.agg_weights = 0

    def _recalculate(self):
        """Recalculate this result status"""
        if not self.agg_weights:
            if not self.error:
                self._score = -100
                self._status = ERROR
                self._details = "No results yet"
                return
            score = -100
        else:
            score = self.agg_diffs / self.agg_weights
        if abs(score) <= self.tolerance:
            report = ["good", "big", "small"]
            minor_tolerance = self.tolerance / 2
            if score > minor_tolerance:
                status = MINOR_GAIN
            elif score < minor_tolerance:
                status = MINOR_LOSS
            else:
                status = PASS
        else:
            if score > 0:
                report = ["big", "good", "small"]
                status = FAIL_GAIN
            else:
                report = ["small", "good", "big"]
                status = FAIL_LOSS
        if self.error:
            report = ["error"] + report
            status = ERROR
        out = []
        for section in report:
            values = getattr(self, section)
            if values:
                out.append(f"{section.upper()} {', '.join(values)}")
        srcs = "/".join(("%.2f" if isinstance(_, float) else "%s") % _
                        for _ in self.srcs)
        out.append(f"({srcs}; {self.dst})")
        out.append(f"+-{self.tolerance}% tolerance")
        self._score = score
        self._status = status
        self._details = " ".join(out)

    @property
    def status(self):
        """Result status"""
        if self._status is None:
            self._recalculate()
        return self._status

    @property
    def score(self):
        """Result score"""
        if self._status is None:
            self._recalculate()
        return self._score

    @property
    def details(self):
        """Description of the result status"""
        if self._status is None:
            self._recalculate()
        return self._details

    def is_stddev(self):
        """Whether this result is "stddev" result (or mean)"""
        return self.testname.endswith("stddev")

    def is_error(self):
        """Whether this result is a runtime error"""
        return self.testname.endswith("error")

    @property
    def name(self):
        """Full test name"""
        return f"{self.classname}/{self.testname}"

    def __str__(self):
        if self.details:
            return (f"{STATUS_MAP[self.status]}: {self.name} {self.score:.2f} "
                    f"({self.details})")
        return f"{STATUS_MAP[self.status]}: {self.name}"

    def get_merged_name(self, merge):
        """
        Report full test name but replace parts specified in "merge" wiht '*'
        """
        if not merge:
            return self.name
        split_name = self._re_name.match(self.name)
        out = []
        out.append('*' if "profile" in merge else split_name[1])
        out.append('*' if "test" in merge else split_name[2])
        out.append('*' if "serial" in merge else split_name[3])
        iteration = split_name[4].split('-', 1)
        if len(iteration) == 2:
            iteration_name, iteration_name_extra = iteration
        else:
            iteration_name = iteration[0]
            iteration_name_extra = '*'
        out.append('*' if "iteration_name" in merge else iteration_name)
        out.append('*' if "iteration_name_extra" in merge
                   else iteration_name_extra)
        out.append('*' if "workflow" in merge else split_name[5])
        out.append('*' if "workflow_type" in merge else split_name[6])
        out.append('*' if "check_type" in merge else split_name[7])
        return (f"{out[0]}/{out[1]}/{out[2]}:./{out[3]}-{out[4]}/{out[5]}/"
                f"{out[6]}.{out[7]}")

    def _add(self, difference, weight, src):
        self.agg_diffs += difference * weight
        self.agg_weights += weight
        if src is not None:
            self.srcs.append(src)

    def add(self, suffix, name, difference, weight, src=None):
        """Add individual result"""
        # Reset status to re-calculate on next query
        self._status = None
        self._add(difference, weight, src)
        msg = f"{name}{suffix} {difference:.2F}%"
        if abs(difference) > self.tolerance:
            if difference > 0:
                self.big.append(msg)
            else:
                self.small.append(msg)
        else:
            self.good.append(msg)

    def add_bad(self, suffix, name, details, difference, weight, src=None):
        """Add a bad result"""
        self._add(difference, weight, src)
        self.error.append(f"{name}{suffix} {details}")


def iter_results_jsons(path, skip_incorrect=False):
    """
    Process runperf results and yield the result.json files
    """
    if skip_incorrect:
        result_name_glob = '[0-9]*'
    else:
        result_name_glob = '*'
    for src_path in glob.glob(os.path.join(path, '*', '*', result_name_glob,
                                           'result.json')):
        yield src_path


def iter_results_errors(path):
    """
    Process runperf results and yield the dirs with runperf errors
    """
    for level in range(4):
        level_path = (path,) + ('*',) * level + ('__error*__',)
        for src_path in glob.glob(os.path.join(*level_path)):
            yield level, src_path


def iter_results(path, skip_incorrect=False):
    """
    Process runperf results and yield individual results

    :param path: base path to runperf results
    :param skip_incorrect: don't yield incorrect results
    :yield result: tuple(test_name, score, is_primary)
    """
    def _find_all_result(test, results):
        for res in results:
            if res['client_hostname'] == 'all':
                return res
        logging.error("Unable to find "
                      "client_hostname==all"
                      " for %s", test)
        return None

    def _handle_iteration(data):
        primary_metrics = []
        test_params = {}
        for i, benchmark in enumerate(data['parameters'].get('benchmark',
                                                             [])):
            primary_metric = benchmark.get('primary_metric')
            if primary_metric:
                primary_metrics.append(primary_metric)
            test_params[i] = "\n".join(f"{item[0]}:{item[1]}"
                                       for item in benchmark.items())
        for i, benchmark in enumerate(data['parameters'].get('user',
                                                             [])):
            if "profile" in benchmark:
                test_params[f"user{i}"] = (f"profile: {benchmark['profile']}")
        for workflow in ('throughput', 'latency'):
            workflow_items = data.get(workflow, {}).items()
            for workflow_type, results in workflow_items:
                test = (f"{result_id}:./{iteration_name}/{workflow}/"
                        f"{workflow_type}.mean")
                res = _find_all_result(test, results)
                if not res:
                    continue
                primary = bool(workflow_type in primary_metrics)
                yield (f"{result_id}:./{iteration_name}/{workflow}/"
                       f"{workflow_type}.mean",
                       res['mean'],  # pylint: disable=W0631
                       primary,
                       test_params)
                yield (f"{result_id}:./{iteration_name}/{workflow}/"
                       f"{workflow_type}.stddev",
                       res['stddevpct'],  # pylint: disable=W0631
                       primary,
                       test_params)

    LOG.debug("Processing %s", path)
    # Process results
    for src_path in iter_results_jsons(path, skip_incorrect):
        with open(src_path, 'r', encoding="utf-8") as src_fd:
            src = json.load(src_fd)
        split_path = src_path.split(os.sep)
        result_id = "/".join(split_path[-4:-1])
        for src_result in src:
            iteration_name = src_result['iteration_name']
            if (skip_incorrect and
                    _RE_FAILED_ITERATION_NAME.match(iteration_name)):
                # Skip failed iterations
                continue
            yield from _handle_iteration(src_result['iteration_data'])
    # Process errors
    for level, src_path in iter_results_errors(path):
        split_path = src_path.split(os.sep)[-(level + 1): -1]
        split_path = split_path + ['*'] * (3 - level)
        result_id = "/".join(split_path)
        exc_path = os.path.join(src_path, 'exception')
        if os.path.exists(exc_path):
            with open(exc_path, encoding="utf-8") as exc_fd:
                exc = exc_fd.read()
        else:
            exc = '<Unknown exception>'
        yield(f"{result_id}:./ERROR/ERROR/ERROR.error", exc, True,
              utils.list_dir_hashes(src_path))


class Modifier:
    """
    Base class for post-analysis modification of results.

    For every reference build the `add_result` is called and the final result
    is then checked using the `check_result` method and appended as a new
    metrics to the test result.
    """
    # Weight of this modifier
    weight = 0

    def add_result(self, result):
        """
        Add reference result

        :param result: `result.Result` result to be processed
        """
        raise NotImplementedError

    def check_result(self, result):
        """
        Add this result and perform additional checks, reporting the findings

        :param result: `result.Result` result to be processed
        :return: [(check_name, difference, weight, source value), ...]
                 where source_value is an optional value correcting the source
                 value
        """
        raise NotImplementedError

    def _adjust_weight(self, count):
        """
        Adjust the weight based on the number of items
        """
        if count < 8:
            return self.weight / get_uncertainty(count)
        return self.weight


class AveragesModifier(Modifier):

    """
    Model that calculates averages of all builds
    """
    # Coefficient to catch multi-builds small regressions
    COEFFICIENT = 2

    def __init__(self, weight):
        self.averages = collections.defaultdict(lambda: [0, 0])
        self.weight = weight
        self.last = False

    def add_result(self, result):
        self.averages[result.name][0] += result.score
        self.averages[result.name][1] += 1
        return []

    def check_result(self, result):
        self.add_result(result)
        if result.name in self.averages:
            entry = self.averages[result.name]
            score = entry[0] / entry[1] * self.COEFFICIENT
            return [("avg", score, self._adjust_weight(entry[1]))]
        return []


class NOutOfResultsModifier(Modifier):

    """
    A model that allows N out of reference builds to fail

    It uses the `allowed_failures` to scale the current result's `tolerance`
    values to indicate in how many builds the current result failed.
    """

    def __init__(self, weight, allowed_failures):
        self.failures = collections.defaultdict(lambda: [0, 0])
        self.weight = weight
        self.allowed_failures = allowed_failures + 1
        if self.allowed_failures <= 0:
            self.allowed_failures = 1

    def add_result(self, result):
        entry = self.failures[result.name]
        entry[1] += 1
        if result.status < 0:
            entry[0] += 1
        return []

    def check_result(self, result):
        self.add_result(result)
        entry = self.failures[result.name]
        score = (entry[0] / self.allowed_failures *
                 result.tolerance * 1.1)
        return [("n_of", score if result.score > 0 else -score,
                 self._adjust_weight(entry[1]))]


class ResultsContainer:

    """
    Container to store multiple RelativeResults and provide various stats
    """

    def __init__(self, log, tolerance, stddev_tolerance, models,
                 src_name, src_path, modifiers):
        self.log = log
        self.tolerance = tolerance
        self.stddev_tolerance = stddev_tolerance
        self.models = models
        self.results = collections.OrderedDict()
        self.src_name = src_name
        self.src_results = {test: (score, primary, params)
                            for test, score, primary, params in iter_results(src_path, True)}
        for model in self.models:
            for test, params in model.model.items():
                if "mmin" in params and test in self.src_results:
                    self.src_results[test] = self.src_results[test] + ([params["mmax"], params["mmin"]],)
        self.src_metadata = self._parse_metadata(src_name, src_path)
        self.modifiers = modifiers

    def __iter__(self):
        return iter(self.results.values())

    def __len__(self):
        return len(self.results)

    def __reversed__(self):
        return reversed(self.results.values())

    @staticmethod
    def _parse_metadata(name, path):
        metadata_path = os.path.join(path, "RUNPERF_METADATA")
        metadata = collections.defaultdict(lambda: "Unknown")
        if os.path.exists(metadata_path):
            with open(metadata_path, encoding="utf-8") as src_metadata_fd:
                for line in src_metadata_fd:
                    if not line or line.startswith('#'):
                        continue
                    split_line = line.split(':', 1)
                    if len(split_line) != 2:
                        LOG.warning("Unable to parse metadata of %s: %s",
                                    name, line)
                        continue
                    metadata[split_line[0]] = split_line[1]
        return metadata

    def add_result_by_path(self, name, path, last=False, skip_incorrect=True):
        """
        Insert test result according to path hierarchy
        """
        metadata = self._parse_metadata(name, path)
        res = RelativeResults(self.log, self.tolerance, self.stddev_tolerance,
                              self.models, self.modifiers, metadata)
        src_tests = list(self.src_results.keys())
        for test, score, primary, params in iter_results(path, skip_incorrect):
            if test in src_tests:
                res.record_result(test, self.src_results[test][0],
                                  score, primary, params=params, last=last)
                src_tests.remove(test)
            else:
                res.record_broken(test, "Not present in source results "
                                  f"({score}).", primary, params)
        for missing_test in src_tests:
            res.record_broken(missing_test, "Not present in target results "
                              "(-100)", self.src_results[missing_test][1])
        self.results[name] = res
        return res


class RelativeResults:

    """
    Object to calculate and evaluate entries between two results.
    """

    def __init__(self, log, mean_tolerance, stddev_tolerance, models,
                 modifiers, metadata):
        self.log = log
        self.mean_tolerance = mean_tolerance
        self.stddev_tolerance = stddev_tolerance
        self.records = []
        self.grouped_records = []
        self.models = models
        self.modifiers = modifiers
        self.metadata = metadata

    def record(self, result, grouped=False):
        """Insert result into database"""
        if result.status >= 0:
            self.log.info(str(result))
        else:
            self.log.error(str(result))
        if grouped:
            self.grouped_records.append(result)
        else:
            self.records.append(result)
        return result

    def record_broken(self, test_name, details=None, primary=True, params=None):
        """Insert broken/corrupted result"""
        result = Result(test_name, -100, 1, primary, params)
        result.add_bad("", "", details, -100, 0, 0)
        self.record(result)

    def _calculate_test_difference(self, test_name, src, dst):
        """
        Calculate test difference and tolerance based on the test name

        :param test_name: full test name (str)
        :param src: reference (source) value
        :param dst: current (destination) value
        """
        if test_name.endswith("mean"):
            if src == 0:
                return 0, self.mean_tolerance
            return (float(dst) - src) / abs(src) * 100, self.mean_tolerance
        if test_name.endswith("stddev"):
            return src - dst, self.stddev_tolerance
        return 0 if src == dst else 1, 0

    def record_result(self, test_name, src, dst, primary=False, grouped=False,
                      difference=None, tolerance=None, params=None,
                      last=False):
        """
        Process result and insert it into database
        """
        if difference is None:
            difference, tolerance = self._calculate_test_difference(test_name,
                                                                    src, dst)

        result = Result(test_name, dst, tolerance, primary, params)
        raw_weight = 1
        for i, model in enumerate(self.models):
            for mresult in model.check_result(test_name, src, dst):
                raw_weight = 0
                result.add(i, *mresult)
        result.add("", "raw", difference, raw_weight, src)
        if last:
            for i, modifier in enumerate(self.modifiers):
                for mresult in modifier.check_result(result):
                    result.add("", *mresult)
        else:
            for i, modifier in enumerate(self.modifiers):
                for mresult in modifier.add_result(result):
                    result.add("", *mresult)
        return self.record(result, grouped=grouped)

    def get_xunit(self):
        """
        Log the header (execute last when dynamic number of tests)

        :param total_tests: Amount of executed tests (None=get from recrods)
        """

        def _str(text):
            return ''.join(_ if _ in PRINTABLE else f"\\x{ord(_):02x}"
                           for _ in str(text))

        document = Document()
        testsuite = document.createElement('testsuite')
        testsuite.setAttribute('name', 'runperf')
        testsuite.setAttribute('timestamp',
                               _str(datetime.datetime.now().isoformat()))
        document.appendChild(testsuite)
        errors = 0
        failures = 0
        skipped = 0
        for test in self.records + self.grouped_records:
            # Record only primary results
            if not test.primary:
                continue
            test_name = test.name.rsplit('/', 1)
            testcase = document.createElement('testcase')
            testcase.setAttribute('classname', _str(test_name[0]))
            testcase.setAttribute('name', _str(test_name[1]))
            testcase.setAttribute('time', "0.000")
            status = test.status
            if status < PASS:
                # Use SKIP for gain to better distinguish these in Jenkins
                if status == FAIL_GAIN:
                    skipped += 1
                    element_type = 'skipped'
                elif status in (FAIL, FAIL_LOSS):
                    failures += 1
                    element_type = 'failure'
                else:
                    errors += 1
                    element_type = 'error'
                element = document.createElement(element_type)
                element.setAttribute('type', _str(element_type))
                element.setAttribute('message', _str(test.details))
                testcase.appendChild(element)
            testsuite.appendChild(testcase)

        testsuite.setAttribute('tests', _str(len(self.records)))
        testsuite.setAttribute('errors', _str(errors))
        testsuite.setAttribute('failures', _str(failures))
        testsuite.setAttribute('skipped', _str(skipped))
        testsuite.setAttribute('time', "0.000")
        return document.toprettyxml(encoding='UTF-8')

    def per_type_stats(self, merge=None, primary_only=True):
        """
        Generate stats using merged results (eg. merge all fio-read tests)
        """

        all_means = collections.defaultdict(list)
        all_stddevs = collections.defaultdict(list)
        for record in self.records:
            if primary_only and record.primary is not True:
                continue
            result_id, result_type = (record.get_merged_name(merge)
                                      .rsplit('.', 1))
            if result_type == 'mean':
                all_means[result_id].append(record.score)
            elif result_type == 'stddev':
                all_stddevs[result_id].append(record.score)
            else:  # generic failure
                all_means[result_id].append(record.score)
                all_stddevs[result_id].append(record.score)
        return self.compute_statistics(all_means, all_stddevs)

    def compute_statistics(self, all_means, all_stddevs):
        """
        Calculate statistics for given means/stddevs
        """

        def _str(number):
            return f'{number:.1f}'

        # a+ => average aggregated mean gain
        # astd- => average aggregated stddev loss
        header = ("result_id", "|", "min", "1st", "med", "3rd",
                  "max", "a-", "a+", '|', "stdmin", "std1st",
                  "stdmed", "std3rd", "stdmax", "astd-", "astd+")

        results = []
        for key in set(tuple(all_means.keys()) + tuple(all_stddevs.keys())):
            means = all_means.get(key, [-100])
            stddevs = all_stddevs.get(key, [-100])
            avg_agg_loss = sum(_ for _ in means if _ < 0) / len(means)
            avg_agg_gain = sum(_ for _ in means if _ > 0) / len(means)
            avg_agg_std_loss = sum(_ for _ in stddevs if _ < 0) / len(stddevs)
            avg_agg_std_gain = sum(_ for _ in stddevs if _ > 0) / len(stddevs)
            results.append((key, '|',
                            _str(numpy.min(means)),
                            _str(numpy.percentile(means, 25)),
                            _str(numpy.median(means)),
                            _str(numpy.percentile(means, 75)),
                            _str(numpy.max(means)),
                            _str(avg_agg_loss),
                            _str(avg_agg_gain),
                            '|',
                            _str(numpy.min(stddevs)),
                            _str(numpy.percentile(stddevs, 25)),
                            _str(numpy.median(stddevs)),
                            _str(numpy.percentile(stddevs, 75)),
                            _str(numpy.max(stddevs)),
                            _str(avg_agg_std_loss),
                            _str(avg_agg_std_gain)))
        self.log.info("\n\nPer-result-id averages:\n%s\n\n",
                      utils.tabular_output(results, header))

    def sum_stats(self, primary_only=True):
        """
        Generate summary stats (min/median/max/average...)
        """

        def line_stats(values):
            if not values:  # [] is not supported for numpy.min...
                return [0] * 6
            return [len(values), f"{numpy.median(values):.1f}",
                    f"{numpy.median(values):.1f}", f"{numpy.min(values):.1f}",
                    f"{numpy.max(values):.1f}", f"{numpy.sum(values):.1f}",
                    f"{numpy.average(values):.1f}"]

        gains = []
        m_gains = []
        losses = []
        m_losses = []
        equals = []
        errors = 0
        for record in self.records:
            if primary_only and not record.primary:
                continue
            status = record.status
            if status == PASS:
                equals.append(record.score)
            elif status == MINOR_GAIN:
                m_gains.append(record.score)
            elif status == MINOR_LOSS:
                m_losses.append(record.score)
            elif status == FAIL_GAIN:
                gains.append(record.score)
            elif status == FAIL_LOSS:
                losses.append(record.score)
            else:
                errors += 1

        header = ["", "count", "med", "min", "max", "sum", "avg"]
        matrix = [["Total"] + line_stats(gains + m_gains + losses + m_losses +
                                         equals)]
        matrix.append(["Gains"] + line_stats(gains))
        matrix.append(["Minor gains"] + line_stats(m_gains))
        matrix.append(["Equals"] + line_stats(equals))
        matrix.append(["Minor losses"] + line_stats(m_losses))
        matrix.append(["Losses"] + line_stats(losses))
        matrix.append(["Errors", errors] + ([''] * (len(header) - 2)))
        self.log.info("\n\n%s\n\n", utils.tabular_output(matrix, header))

    def _expand_grouped_result(self, records, merge, last):
        """
        Calculate result entries as averages per group of results

        :param merge: What option should be merged into the same group
        """
        values = collections.defaultdict(list)
        for record in records:
            record_id = record.get_merged_name(merge)
            values[record_id].append(record.score)
        for test_name, values in values.items():
            value = numpy.average(values)
            # Use half of mean_tolerance * uncertainty
            tolerance = self.mean_tolerance * get_uncertainty(len(values)) / 2
            self.record_result(test_name, value, value, True, True,
                               value, tolerance, last=last)

    def expand_grouped_results(self, last=False):
        """
        Calculate pre-defined grouped results
        """
        records = [record for record in self.records
                   if record.primary and record.status != ERROR and
                   not record.is_stddev()]
        # iteration_name_extra only
        self._expand_grouped_result(records, ["iteration_name_extra"], last)
        # iteration_name_extra and profile
        self._expand_grouped_result(records, ["iteration_name_extra",
                                              "profile"], last)
        # everything but profile
        self._expand_grouped_result(records,
                                    ["test", "serial", "iteration_name",
                                     "iteration_name_extra", "workflow",
                                     "workflow_type"], last)

    def evaluate(self):
        """
        Process a default set of statistic on the results
        """
        self.expand_grouped_results(True)
        self.per_type_stats(["iteration_name_extra"])
        self.per_type_stats(["serial", "iteration_name",
                             "iteration_name_extra", "workflow"])
        self.per_type_stats(["test", "serial", "iteration_name",
                             "iteration_name_extra", "workflow",
                             "workflow_type"])

        self.sum_stats()

    def finish(self):
        """
        Evaluate processed results and report the status

        :return: 0 when everything is alright
                 2 when there are any failures (or group failures)
                 3 when no comparisons were performed (eg. all tests were
                 skipped)
        """
        failures = 0
        non_primary_failures = 0
        grouped_failures = 0
        for record in self.records:
            if record.status < 0:
                if record.primary:
                    failures += 1
                else:
                    non_primary_failures += 1
        for record in self.grouped_records:
            if record.status < 0:
                grouped_failures += 1
        if failures or grouped_failures:
            self.log.error("%s/%s/%s/%s primary/grouped/non-primary/all checks"
                           " failed, see logs for details", failures,
                           grouped_failures, non_primary_failures,
                           len(self.records) + len(self.grouped_records))
            return 2
        if not self.records:
            self.log.error("No comparisons performed")
            return 3
        if non_primary_failures:
            self.log.warning("%s/%s non-primary results failed.",
                             non_primary_failures, len(self.records))
        else:
            self.log.info("All %s checks were in limits", len(self.records))
        return 0


def closest_result(src_path, dst_path_groups, flatten_coefficient=1):
    """
    Compare results and find the one that has more results closer to the src
    one

    :param src_path: Path to the src result
    :param dst_paths: List of paths to results we are comparing to
    """
    def norm_normpdf(x, mean, sd):  # Using math symbols pylint: disable=C0103
        """
        Normalized normal probability density function

        This calculates the normal pdf and then multiplies it by standard
        deviation to always scale it as it the std was 1 (useful to compare
        results with different stds together as if they were alike)

        As a last step scale the probability from the highest ~0.4 to ~1 (there
        is still some rounding, but slightly above 1)
        """
        if x is None:
            return 0
        var = float(sd)**2
        denom = (2*math.pi*var)**.5
        num = math.exp(-(float(x)-float(mean))**2/(2*var))
        return (num/denom * sd) * 2.51

    def process_score(storage, selection):
        """
        Find the highest number in a $storage looking only on items specified
        in the $selection variable.
        """
        values = [storage[i] for i in selection]
        score = max(values)
        count = storage.count(score)
        LOG.info("Score: %s %s", score, values)
        if count == 1:
            for i in selection:
                if storage[i] == score:
                    return i
        return [i for i, value in enumerate(storage)
                if i in selection and value == score]

    def _process_results(dst_paths):
        storage = collections.defaultdict(
            lambda: [[None, None] for _ in range(len(dst_paths))])
        for idx, path in enumerate(dst_paths):
            for test, score, _, _ in iter_results(path, True):
                if test.endswith("stddev"):
                    # Skip stddev = 0 as that is basically no stddev
                    if score == 0:
                        continue
                    name = test[:-7]
                    storage[name][idx][1] = score
                else:
                    name = test.rsplit('.', 1)[0]
                    storage[name][idx][0] = score
        return storage

    def _calculate_stats(src, storage, groups):
        def _distance(i, score):
            """Calculate absolute distance while sanitizing odd values"""
            this_score = this[i][0]
            if this_score is None:
                return None
            try:
                return abs(this_score - score)
            except TypeError:
                return 0 if this_score == score else 1

        def _stddev_norm_scores(this, score, stddev):
            """
            We know the stddev of all samples of this test. To deal with
            uncertainty calculate the maximum standard deviation (out
            of stddev_pct) and use it to calculate the probability of
            the compare-with values.
            """
            max_stddev = max((_[1] * _[0] for _ in this
                              if _[1] is not None))
            if stddev:
                max_stddev = max(max_stddev, stddev * score)
            # We calculate max_stddev from stddevpct * score, divide by
            # 100 to move from pct to absolute value
            max_stddev = max_stddev / 100 * flatten_coefficient
            return [norm_normpdf(_[0], score, max_stddev) for _ in this]

        def _max_distance_norm_scores(this, score):
            """
            Normalize distance between min and max values of all samples
            (including src).
            """
            distances = [_distance(x, score) for x in range(len(this))]
            # Treat missing results by using 2x max distance
            min_distance = min(_ for _ in distances if _ is not None)
            max_distance = max(_ for _ in distances if _ is not None)
            if None in distances:
                if min_distance == max_distance:
                    _bad_distance = min_distance * 2
                else:
                    _bad_distance = max_distance * 2
                if min_distance == 0:
                    _bad_distance = 1
                distances = [_bad_distance if _ is None else _
                             for _ in distances]
            elif min_distance == max_distance:
                # Skip results where all distances are 0 (100% match for all)
                return None
            one_third_of_max_distance = max(distances) / 3
            # Normalize distance so they are within 0-3. That way we'd be able
            # to calculate normal distribution via e^(-1/2*x^2)
            norm_distances = [_ / one_third_of_max_distance for _ in distances]
            # Calculate the norm distance per each element using simplified
            # norm because we already normalized the distances to the range
            # of 0-3
            # Divide each element by 2 to decrease the significance of this
            # method to the stddev based one
            return [math.exp(-1/2 * distance ** 2) / 2
                    for distance in norm_distances]

        def _average_group_scores(norm_score, groups):
            grp_norm_score = []
            idx = 0
            for group_len in groups:
                if group_len == 1:
                    # Single result, just paste the value
                    grp_norm_score.append(norm_score[idx])
                    idx += 1
                    continue
                # Multiple results, calculate the average
                value = sum(norm_score[_]
                            for _ in range(idx, idx + group_len))
                grp_norm_score.append(value / group_len)
                idx = idx + group_len
            return grp_norm_score

        def _format_grp_list(values, groups):
            """Report values per groups"""
            out = []
            idx = 0
            for group_len in groups:
                end = idx + group_len
                out.append(values[idx:end])
                idx = end
            return out

        # stats is a list of per-cathegory similarities
        # [0] => distances of primary scores
        # [1] => distances of secondary scores
        stats = [[0] * no_results for _ in range(2)]
        # Iterate only through the src items as the missing tests from other
        # results should not affect the closenest of the current result.
        for test, value in src.items():
            score, primary, stddev = value
            if test not in storage:
                LOG.debug("%s: SKIP - not in any dst result", test)
                continue
            if not primary:
                this_cathegory = stats[1]
            else:
                this_cathegory = stats[0]
            this = storage[test]
            # Distances are in absolute values
            if stddev or any(True for _ in this if _[1] is not None):
                norm_score = _stddev_norm_scores(this, score, stddev)
            else:
                norm_score = _max_distance_norm_scores(this, score)
            if norm_score is None:
                LOG.debug("%s: SKIP - same distances", test)
                continue
            grp_norm_score = _average_group_scores(norm_score, groups)
            # Add current scores to the cathegory results. No need to average
            # or normalize again as we do normalize individual results
            for idx, result_score in enumerate(grp_norm_score):
                this_cathegory[idx] += result_score
            if primary:
                LOG.info("P %s: %s %s", test, grp_norm_score,
                         _format_grp_list(norm_score, groups))
            else:
                LOG.debug("S %s: %s %s", test, grp_norm_score,
                          _format_grp_list(norm_score, groups))
        return stats

    def _process_src(src_path):
        src = {}
        for test, score, primary, _ in iter_results(src_path, True):
            if test.endswith("stddev"):
                name = test[:-7]
                if name not in src:
                    src[name] = [None, primary, score]
                else:
                    src[name][2] = score
            else:
                name = test.rsplit('.', 1)[0]
                if name not in src:
                    src[name] = [score, primary, None]
                else:
                    src[name][0] = score
                    src[name][1] |= primary
        return src

    src = _process_src(src_path)
    dst_paths = [item for sublist in dst_path_groups for item in sublist]
    groups = [len(_) for _ in dst_path_groups]
    storage = _process_results(dst_paths)
    no_results = len(groups)
    stats = _calculate_stats(src, storage, groups)
    selection = range(no_results)
    for i, values in enumerate(stats):
        ret = process_score(values, selection)
        if isinstance(ret, int):
            return ret
        LOG.warning("Unable to resolve on level %s, consider tweaking the "
                    "flatten coefficient.", i)
        selection = ret
    return selection[0]