distributed-system-analysis/run-perf

View on GitHub
runperf/__init__.py

Summary

Maintainability
A
1 hr
Test Coverage
B
88%
#!/bin/env python3
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2018
# Author: Lukas Doktor <ldoktor@redhat.com>

from argparse import ArgumentParser, Action
import collections
import glob
import hashlib
import json
import logging
import os
import re
import shutil
import sys
import threading
import time

import aexpect

from . import exceptions, tests, result, utils
from .machine import Controller
from .version import __version__
from .utils import CONTEXT

PROG = 'run-perf'
DESCRIPTION = ("A tool to execute the same tasks on pre-defined scenarios/"
               "profiles and store the results together with metadata in "
               "a suitable structure for compare-perf to compare them.")


def get_abs_path(path):
    """
    Return absolute path to a given location
    """
    return os.path.abspath(os.path.expanduser(path))


def parse_host(host):
    """
    Go through hosts and split them by ':' to get name:addr

    When name not supplied, uses first part of the provided addr
    """
    if ':' in host:
        return host.split(':', 1)
    return (host.split('.', 1)[0], host)


class DictAction(Action):
    """
    Split items by '=' and store them as a single dictionary
    """

    def __call__(self, parser, namespace, values, option_string=None):

        def split_metadata(item):
            """Split item to key=value pairs"""
            split = item.split('=', 1)
            if len(split) != 2:
                raise ValueError(f"Unable to parse key=value pair from {item}")
            return split

        dictionary = dict(split_metadata(_) for _ in values)
        if isinstance(getattr(namespace, self.dest, None), dict):
            getattr(namespace, self.dest).update(dictionary)
        else:
            setattr(namespace, self.dest, dictionary)


def item_with_params(item):
    """Deserialize item with optional params argument"""
    _item = item.split(':', 1)
    if len(_item) == 2:
        return _item[0], json.loads(_item[1])
    return item, {}


def _parse_args():
    """
    Define parser and return parsed args
    """
    base_dir = os.path.abspath(os.path.dirname(__file__))
    parser = ArgumentParser(prog=PROG,
                            description=DESCRIPTION)
    parser.add_argument("tests", help="Set of tests to be executed; one can "
                        "optionally specify extra params using json format "
                        "separated by `:` (eg. 'fio:{\"type\":\"read\"}'",
                        nargs='+', type=item_with_params)
    parser.add_argument("--profiles", help="Which profiles to use to execute "
                        "the tests (some might require reboot); one can "
                        "optionally specify extra params using json format "
                        "separated by `:` (eg. DefaultLibvirt:"
                        "{\"RUNPERF_TESTS\": \"fio\"}) to tweak the set of "
                        "tests executed under this profile or other cusom "
                        "profile attributes (like qemu path location, ...)",
                        nargs='+', default=[('default', {})],
                        type=item_with_params)
    parser.add_argument("--distro", help="Set the host distro name, eg. "
                        "RHEL-8.0-20180904.n.0", default="Unknown")
    parser.add_argument("--guest-distro", help="Guest distro (default is the "
                        "same as host)")
    parser.add_argument("--hosts", help="Host to be provisioned; optionally "
                        "you can follow hostname by ':' and add human-readable"
                        "name for logging purposes (by default localhost)",
                        default=[("localhost", "127.0.0.1")], nargs='+',
                        type=parse_host)
    parser.add_argument("--provisioner", help="Use plugin to provision the "
                        "hosts", type=item_with_params)
    parser.add_argument("--host-rpms", help="Url/path(s) to rpm packages or "
                        "page with links to rpms to be installed on host via "
                        "'dnf install -y '", nargs="+")
    parser.add_argument("--worker-rpms", help="Url/path(s) to rpm packages or "
                        "page with links to rpms to be installed on workers "
                        "via 'dnf install -y '", nargs="+")
    parser.add_argument("--keep-tmp-files", action="store_true", help="Keep "
                        "the temporary files (local/remote)")
    parser.add_argument("--output",
                        help="Force output directory (%(default)s)",
                        default=f"./result_{time.strftime('%Y%m%d_%H%M%S')}",
                        type=get_abs_path)
    parser.add_argument("--force-params", help="Override params related to "
                        "host/guest configuration which is usually defined "
                        "in YAML files (use json directly)",
                        type=json.loads)
    parser.add_argument("--paths", help="List of additional paths to be used "
                        "for looking for hosts/assets/...", nargs='+',
                        default=[base_dir])
    parser.add_argument("--default-passwords", help="List of default passwords"
                        " to try/use when asked for.", nargs='+')
    parser.add_argument("--retry-tests", help="How many times to try "
                        "re-execute tests on failure (%(default)s)", default=3,
                        type=int)
    parser.add_argument("--host-setup-script", help="Path to a file that will "
                        "be copied to all hosts and executed as part of "
                        "setup (just after provisioning and before "
                        "ssh key generation)")
    parser.add_argument("--host-setup-script-reboot", help="Reboot after "
                        "executing setup script?", action="store_true")
    parser.add_argument("--worker-setup-script", help="Path to a file that "
                        "will be copied to all workers and executed as part of"
                        " their setup")
    parser.add_argument("--metadata", nargs="+", action=DictAction,
                        help="Build metadata to be attached to test results "
                        "using key=value syntax", default={})
    logging_argparse(parser)
    args = parser.parse_args()
    # Add PATHs
    if base_dir not in args.paths:
        args.paths.append(base_dir)
    return args


def logging_argparse(parser):
    """
    Define logging argparse arguments
    """
    parser.add_argument("--log", help="Store debug log to a file")
    parser.add_argument("--verbose", "-v", action="count", default=0,
                        help="Increase the stderr verbosity level")


def logging_setup(args, fmt=None):
    """
    Setup logging according to args
    """
    def add_handler(log, handler, level, formatter):
        handler.setFormatter(formatter)
        handler.setLevel(level)
        log.addHandler(handler)

    if args.verbose >= 2:
        log_level = logging.DEBUG
    elif args.verbose >= 1:
        log_level = logging.INFO
    else:
        log_level = logging.WARN
    if fmt is None:
        fmt = '%(asctime)s.%(msecs)03d: %(name)-15s: %(message)s'

    root = logging.getLogger('')
    root.setLevel(logging.DEBUG)
    formatter = logging.Formatter(fmt, datefmt="%H:%M:%S")

    add_handler(root, logging.StreamHandler(sys.stderr), log_level, formatter)
    if args.log:
        add_handler(root, logging.FileHandler(args.log), logging.DEBUG,
                    formatter)


def create_metadata(output_dir, args):
    """
    Generate RUNPERF_METADATA in this directory
    """
    def mask_arguments(cmd, i):
        while i < len(cmd) and not cmd[i].startswith("-"):
            cmd[i] = "MASKED"
            i += 1
    with open(os.path.join(output_dir, "RUNPERF_METADATA"), "w",
              encoding="utf-8") as output:
        # First write all the custom metadata so they can be eventually
        # overridden by our hardcoded values
        if args.metadata:
            output.write("".join(f"{_[0]}:{_[1]}\n"
                                 for _ in args.metadata.items()))
        # Now store certain hardcoded values
        output.write(f"distro:{args.distro}\n")
        if args.guest_distro is None or args.guest_distro == args.distro:
            output.write("guest_distro:DISTRO\n")
        else:
            output.write(f"guest_distro:{args.guest_distro}\n")
        output.write(f"runperf_version:{__version__}\n")
        cmd = list(sys.argv)
        for i in range(len(cmd)):  # pylint: disable=C0200
            this = cmd[i]
            if this == "--distro":
                cmd[i + 1] = "DISTRO"
            elif this == "--guest-distro":
                cmd[i + 1] = "GUEST_DISTRO"
            elif this in ("--default-password", "--metadata"):
                mask_arguments(cmd, i + 1)
            elif this in ("--host-setup-script", "--worker-setup-script"):
                with open(cmd[i + 1], 'rb') as script:
                    cmd[i + 1] = "sha1:"
                    cmd[i + 1] += hashlib.sha1(script.read()).hexdigest()[:6]  # nosec
        output.write(f"runperf_cmd:{' '.join(cmd)}\n")
        output.write(f"machine:{','.join(_[1] for _ in args.hosts)}")
        if "machine_url_base" in args.metadata:
            url_base = args.metadata["machine_url_base"]
            urls = (url_base % {"machine": host[1]}
                    for host in args.hosts)
            output.write(f"\nmachine_url:{','.join(urls)}")
        else:
            output.write(f"\nmachine_url:{args.hosts[0][1]}")


def profile_test_defs(profile_args, default_set):
    """
    Process profile args and return suitable test set

    :param profile_args: profile arguments
    :param default_set: default set of test definitions
    :return: list of test definitions
    """
    if 'RUNPERF_TESTS' not in profile_args:
        return default_set
    testset = profile_args.get('RUNPERF_TESTS')
    defs = []
    for test in testset:
        if test == '$@':
            defs.extend(default_set)
        elif isinstance(test, list):
            defs.append(tests.get(test[0], test[1]))
        else:
            defs.append(tests.get(test, {}))
    return defs


def main():
    """
    A tool to execute the same tasks on pre-defined scenarios/
    profiles and store the results together with metadata in
    a suitable structure for compare-perf to compare them.
    """
    args = _parse_args()
    logging_setup(args)

    log = logging.getLogger("controller")
    # create results (or re-use if asked for)
    if os.path.exists(args.output):
        CONTEXT.set_root(args.output, "Removing previously existing results: "
                         f"{args.output}")
        shutil.rmtree(args.output)
    else:
        CONTEXT.set_root(args.output, f"Creating results: {args.output}")
    try:
        os.makedirs(args.output)
    except FileExistsError:
        pass
    create_metadata(args.output, args)

    hosts = None
    try:
        # Initialize all hosts
        hosts = Controller(args, log)
        _test_defs = list(tests.get(test, extra) for test, extra in args.tests)
        # provision, fetch assets, ...
        hosts.setup()
        try:
            CONTEXT.set(0, "__sysinfo_before__")
            hosts.fetch_logs(CONTEXT.get())
        except Exception as exc:    # pylint: disable=W0703
            utils.record_failure(CONTEXT.get(), exc)
        for profile, profile_args in args.profiles:
            CONTEXT.set_level(0)
            # Check whether this profile changes test set
            test_defs = profile_test_defs(profile_args, _test_defs)
            # Applies profile and set `hosts.workers` to contain list of IP
            # addrs to be used in tests. It might retry on failure
            for i in range(args.retry_tests):
                try:
                    workers = hosts.apply_profile(profile, profile_args)
                    break
                except exceptions.StepFailed:
                    try:
                        hosts.revert_profile()
                    except Exception:   # pylint: disable=W0703
                        pass
            else:
                log.error("ERROR applying profile %s, all tests will be "
                          "SKIPPED!", profile)
                continue

            # Run all tests under current profile
            profile_path = os.path.join(args.output, hosts.profile)
            for test, extra in test_defs:
                for i in range(args.retry_tests):
                    try:
                        hosts.run_test(test, workers, extra)
                        break
                    except (AssertionError, aexpect.ExpectError,
                            aexpect.ShellError, RuntimeError) as details:
                        msg = (f"test {test.test}@{hosts.profile} attempt {i} "
                               f"execution failure: {details}")
                        utils.record_failure(os.path.join(profile_path,
                                                          test.test, str(i)),
                                             details, details=msg)
                else:
                    log.error("ERROR running %s@%s, test will be SKIPPED!",
                              test.test, hosts.profile)
            # Fetch logs
            try:
                CONTEXT.set(1, "__sysinfo__")
                hosts.fetch_logs(CONTEXT.get())
            except Exception as exc:    # pylint: disable=W0703
                utils.record_failure(os.path.join(args.output, hosts.profile),
                                     exc)
            # Revert profile changes. In case manual reboot is required return
            # non-zero.
            CONTEXT.set_level(1, "Reverting profile")
            hosts.revert_profile()
        # Remove unnecessary files
        hosts.cleanup()
        aexpect.kill_tail_threads()
    except Exception as exc:
        CONTEXT.set_level(0, "Handling root exception")
        utils.record_failure(CONTEXT.get(), exc)
        if args.keep_tmp_files:
            log.error("Exception %s, asked not to cleanup by --keep-tmp-files",
                      exc)
        else:
            log.error("Exception %s, cleaning up resources", exc)
            if hosts:
                hosts.cleanup()
        if len(threading.enumerate()) > 1:
            threads = threading.enumerate()
            if any("pydevd.Reader" in str(_) for _ in threads):
                logging.warning("Background threads %s present but 'pydev' "
                                "thread detected, not killing anything",
                                threads)
            else:
                log.warning("Background threads present, killing: %s",
                            threading.enumerate())
                aexpect.kill_tail_threads()
                os.kill(0, 15)
        raise


class ComparePerf:

    """
    Compares run-perf results. With multiple ones it adjusts the limits
    according to their spread.
    """

    _RE_FAILED_ITERATION_NAME = re.compile(r'.*-fail(\d+)$')

    def __init__(self):
        self.log = logging.getLogger("compare")

    @staticmethod
    def _get_name_and_path(arg):
        """
        Parse [name:]path definition
        """
        split_arg = arg.split(':', 1)
        if len(split_arg) == 2:
            if os.path.exists(split_arg[1]):
                return split_arg[0], get_abs_path(split_arg[1])
        if os.path.exists(arg):
            return arg, arg
        if len(split_arg) == 2:
            raise ValueError("None of possible paths exists:\n"
                             f"{split_arg[1]}\n{arg}")
        raise ValueError(f"Path {arg} does not exists")

    def __call__(self):
        """
        Runs the comparison
        """
        parser = ArgumentParser(prog="compare-perf",
                                description="Tool to compare run-perf results")
        parser.add_argument("results", help="Path to run-perf results; when "
                            "multiple results are specified the first one "
                            "is used as the source result, the last one as"
                            "destination result and the middle ones are "
                            "only used as a reference.",
                            nargs="+", type=self._get_name_and_path)
        parser.add_argument("--include-incorrect-results", action="store_true",
                            help="Include incorrect/partial results (by "
                            "default we only include [0-9]* iterations)")
        parser.add_argument("--tolerance", "-t", help="Acceptable tolerance "
                            "(+-%(default)s%%)", default=5, type=float)
        parser.add_argument("--stddev-tolerance", "-s", help="Acceptable "
                            "standard deviation tolerance (+-%(default)s%%)",
                            default=5, type=float)
        parser.add_argument("--model-builds-average", help="Calculate "
                            "average value of all reference builds and "
                            "compare it to the source value. Specify the "
                            "weight of this model. Note the weight might be "
                            "adjusted based on the number of builds "
                            "(when no builds < 8) [%(default)s]", type=float,
                            default=0)
        parser.add_argument("--model-linear-regression", "-l", help="Use "
                            "linear regression model for matching results",
                            nargs='+', default=[])
        parser.add_argument("--n-out-of-results", help="Weight of the "
                            "check that looks at how many times each test "
                            "failed within reference builds.",
                            type=float, default=0)
        parser.add_argument("--n-out-of-results-n", help="How many builds "
                            "can fail to report PASS", type=int, default=2)
        parser.add_argument("--html", help="Create a single-file HTML report "
                            "in the provided path.")
        parser.add_argument("--html-with-charts", action="store_true",
                            help="Generate charts in the html results")
        parser.add_argument("--html-small-file", help="Do not include the "
                            "full environments and such to minimize the report"
                            "size.", action="store_true")
        parser.add_argument("--xunit", help="Write XUnit/JUnit results to "
                            "specified file.")
        logging_argparse(parser)
        args = parser.parse_args()
        logging_setup(args, "%(levelname)-5s| %(message)s")
        models = []
        modifiers = []
        for path in args.model_linear_regression:
            model = result.ModelLinearRegression(args.tolerance,
                                                 args.stddev_tolerance,
                                                 path)
            models.append(model)
        if args.model_builds_average:
            modifiers.append(result.AveragesModifier(
                args.model_builds_average))
        if args.n_out_of_results:
            modifiers.append(result.NOutOfResultsModifier(
                args.n_out_of_results,
                args.n_out_of_results_n))
        results = result.ResultsContainer(self.log, args.tolerance,
                                          args.stddev_tolerance,
                                          models,
                                          args.results[0][0],
                                          args.results[0][1],
                                          modifiers)
        skip_incorrect = not args.include_incorrect_results
        for name, path in args.results[1:-1]:
            res = results.add_result_by_path(name, path,
                                             skip_incorrect=skip_incorrect)
            res.expand_grouped_results()
        res = results.add_result_by_path(args.results[-1][0],
                                         args.results[-1][1], last=True,
                                         skip_incorrect=skip_incorrect)
        if args.xunit:
            with open(args.xunit, 'wb') as xunit_fd:
                xunit_fd.write(res.get_xunit())
            self.log.info("XUnit results written to %s", args.xunit)
        res.evaluate()
        if args.html:
            # Import this only when needed to prevent optional deps
            from . import html_report  # pylint: disable=C0415
            self.log.debug("Generating HTML report: %s", args.html)
            html_report.generate_report(args.html, results,
                                        args.html_with_charts,
                                        args.html_small_file)
        return res.finish()


class DiffPerf:

    """
    Compares multipl run-perf and reports the index of the closest one.
    """

    _RE_FAILED_ITERATION_NAME = re.compile(r'.*-fail(\d+)$')

    def __init__(self):
        self.log = logging.getLogger("compare")

    @staticmethod
    def _abs_path(arg):
        """
        Parse [name:]path definition
        """
        if os.path.exists(arg):
            return arg
        raise ValueError(f"Path {arg} does not exists")

    def __call__(self):
        """
        Runs the comparison
        """
        parser = ArgumentParser(prog="diff-perf",
                                description="Compares multiple results and "
                                "reports the closest results (or group) to "
                                "the src one. "
                                "The exit number corresponds to the index "
                                "of the result (or group) for indexes up to "
                                "253. The "
                                "return number 254 means any higher index "
                                "and 255 other failure.")
        parser.add_argument("results", help="Path to run-perf results; first "
                            "one is the src result we are comparing the other "
                            "results to", nargs="+", type=self._abs_path)
        parser.add_argument("--group", "-g", nargs="+", action="append",
                            help="Add groups of results by '-g result1 result2"
                            "-g result3 -g result4 result5'; diff-perf will "
                            "average the result of the same group; "
                            "groups are appended after the results "
                            "specified by the positional arguments so the "
                            "return code will match individual results "
                            "or group.")
        parser.add_argument("--flatten-coefficient", type=float,
                            help="Coefficient used to flatten the probability "
                            "curve based on the standard deviation. "
                            "(%(default)s)", default=1)
        logging_argparse(parser)
        args = parser.parse_args()
        logging_setup(args, "%(levelname)-5s| %(message)s")
        src = args.results[0]
        groups = [[_] for _ in args.results[1:]]
        if args.group:
            groups.extend(args.group)
        if (len(groups) < 2):
            raise RuntimeError("Please specify at least one src and two dst ("
                               "or group of dst) results")
        return result.closest_result(src, groups, args.flatten_coefficient)


class AnalyzePerf:
    """
    Class to allow result analysis/model creation
    """

    _RE_FAILED_ITERATION_NAME = re.compile(r'.*-fail(\d+)$')

    def __init__(self):
        self.result = None
        self.log = logging.getLogger("compare")

    def __call__(self):
        """
        Runs the comparison
        """

        def csv_safe_str(text):
            """Turn text into csv string removing special characters"""
            safe_text = str(text).replace(',', '_').replace('"', '_')
            return f'"{safe_text}"'

        parser = ArgumentParser(prog="to-csv",
                                description="Tool to export run-perf results "
                                "to csv")
        parser.add_argument("results", help="Path to run-perf results",
                            nargs='+', type=get_abs_path)
        parser.add_argument("-c", "--csv", help="Dump primary results to "
                            "given csv file.")
        parser.add_argument("-l", "--linear-regression", help="Generate "
                            "per-test linear regression model using min-max "
                            "range defined from -t|--tolerance. Recommended "
                            "tolerance is 8 for 2 results and 4 for 5+ "
                            "results.")
        parser.add_argument("-s", "--stddev-linear-regression", help="Generate"
                            " per-test linear regression model mapping "
                            "avg (+/-)3x stddev as (min/max). Recomended "
                            "tolerance values are -4; +4.")
        parser.add_argument("--rebase-model", help="Provide path to a source "
                            "model which will be used to set the acceptable "
                            "deviation but update the raw value based on the "
                            "values from the provided results. Tests not "
                            "present in the old model will be trained based "
                            "on the newly provided results, tests missing in "
                            "the new results will be kept unmodified. This "
                            "is useful when we don't have enough results "
                            "after a perf change that changed only the median "
                            "value but kept the deviation (eg. 10%% "
                            "improvement with a similar jitter)")
        parser.add_argument("-t", "--tolerance", help="Tolerance (-x,+x) used "
                            "by models, by default (%(default)s)",
                            default=4, type=float)
        logging_argparse(parser)
        args = parser.parse_args()
        logging_setup(args, "%(levelname)-5s| %(message)s")

        primary = set()
        storage = {}
        result_names = set()
        for path in args.results:
            results_name = os.path.basename(path)
            result_names.add(results_name)
            for test, score, prim, _ in result.iter_results(path, True):
                if prim:
                    primary.add(test)
                if test not in storage:
                    storage[test] = {}
                storage[test][results_name] = score
        csv = None
        models = []
        try:
            if args.csv:
                csv = open(args.csv, 'w', encoding="utf-8")  # pylint: disable=R1732
            if args.linear_regression:
                models.append((open(args.linear_regression, 'w',  # pylint: disable=R1732
                                    encoding="utf-8"),
                               result.ModelLinearRegression))
            if args.stddev_linear_regression:
                models.append((open(args.stddev_linear_regression, 'w',  # pylint: disable=R1732
                                    encoding="utf-8"),
                               result.ModelStdev))
            result_names = sorted(result_names)
            if csv:
                csv.write("test," + ",".join(csv_safe_str(_)
                                             for _ in result_names))
                for test in sorted(storage.keys()):
                    if test not in primary:
                        continue
                    test_results = storage.get(test, {})
                    csv.write(f"\n{test},")
                    for result_name in result_names:
                        csv.write(str(test_results.get(result_name, -100)) +
                                  ',')
            for fd_model, klass in models:
                model = klass(args.tolerance, args.tolerance,
                              args.rebase_model)
                if args.rebase_model:
                    trained_model = model.rebase(storage)
                else:
                    trained_model = model.identify(storage)
                json.dump(trained_model, fd_model, indent=4)
        finally:
            if csv:
                csv.close()
            for fd_model, _ in models:
                fd_model.close()


class StripPerf:
    """
    Class to cherry-pick only the data used by run-perf tools useful for
    later analysis.
    """

    def __init__(self):
        self.result = None
        self.log = logging.getLogger("strip")

    def __call__(self):
        """
        Perform the stripping
        """
        parser = ArgumentParser(prog="strip-run-perf",
                                description="Tool to cherry-pick only the data"
                                "used by run-perf tools for later analysis")
        parser.add_argument("src", help="Path to run-perf results",
                            type=get_abs_path)
        parser.add_argument("dst", help="Path to put stripped results to.")
        parser.add_argument("-i", "--include-incorrect", help="Include "
                            "the incorrect results (usually failed ones)",
                            action="store_true")
        parser.add_argument("-s", "--attach-sysinfo", help="Copy the assets "
                            "of failed the results", action="store_true")
        logging_argparse(parser)
        args = parser.parse_args()
        logging_setup(args, "%(levelname)-5s| %(message)s")
        os.makedirs(args.dst, exist_ok=True)
        # Main metadata
        metadata_path = os.path.join(args.src, "RUNPERF_METADATA")
        if os.path.exists(metadata_path):
            shutil.copy(metadata_path,
                        os.path.join(args.dst, "RUNPERF_METADATA"))
        # Results
        for src_json in result.iter_results_jsons(args.src,
                                                  not args.include_incorrect):
            dst_path = self.process_result_json(src_json, args.dst)
            self.process_result_metadata(os.path.dirname(src_json), dst_path)
        # Exceptions
        for level, src_path in result.iter_results_errors(args.src):
            split_path = src_path.split(os.sep)[-(level + 1):]
            result_id = "/".join(split_path)
            shutil.copytree(src_path, os.path.join(args.dst, result_id),
                            dirs_exist_ok=True)
        # Sysinfo
        if args.attach_sysinfo:
            self.process_sysinfo(args.src, args.dst)

    @staticmethod
    def process_result_json(src_path, dst_base):
        """Gather result.json data"""
        def get_workflow_type_data(src_workflow):
            out = []
            # Avoid including per-host results on single worker
            only_all = bool(len(src_workflow) == 2 and
                            any(_.get("client_hostname") == "all"
                                for _ in src_workflow))
            for src in src_workflow:
                if only_all and src.get("client_hostname") != "all":
                    continue
                this = {}
                for include in ("client_hostname", "mean", "stddevpct"):
                    if include in src:
                        this[include] = src[include]
                out.append(this)
            return out

        def get_iteration_data(src_iteration):
            params = src_iteration["iteration_data"]["parameters"]
            iteration = {"iteration_name": src_iteration["iteration_name"],
                         "iteration_data": {"parameters": params}}
            iteration_data = iteration["iteration_data"]
            src_iteration_data = src_iteration["iteration_data"]
            for workflow in ('throughput', 'latency'):
                if workflow not in src_iteration_data:
                    continue
                iteration_data[workflow] = {}
                workflow_items = src_iteration_data[workflow].items()
                for workflow_type, results in workflow_items:
                    workflow_data = get_workflow_type_data(results)
                    iteration_data[workflow][workflow_type] = workflow_data
            return iteration

        with open(src_path, 'r', encoding="utf-8") as src_fd:
            src = json.load(src_fd)
        result_id = os.sep.join(src_path.split(os.sep)[-4:])
        res = []
        for src_iteration in src:
            if "iteration_name" not in src_iteration:
                continue
            if "iteration_data" not in src_iteration:
                continue
            if "parameters" not in src_iteration["iteration_data"]:
                continue
            iteration = get_iteration_data(src_iteration)
            res.append(iteration)
        dst_json = os.path.join(dst_base, result_id)
        dst_path = os.path.dirname(dst_json)
        os.makedirs(dst_path, exist_ok=True)
        with open(dst_json, 'w', encoding="utf-8") as dst:
            json.dump(res, dst)
        return dst_path

    @staticmethod
    def process_result_metadata(src_path, dst_path):
        """Gather RUNPERF_METADATA.json"""
        rp_path = os.path.join(src_path, "RUNPERF_METADATA.json")
        if os.path.exists(rp_path):
            shutil.copy(rp_path,
                        os.path.join(dst_path, "RUNPERF_METADATA.json"))

    @staticmethod
    def process_sysinfo(src_path, dst_path):
        """Gather __sysinfo*__ files (global and profile)"""
        # Global level
        for src in glob.glob(os.path.join(src_path, '__sysinfo*__')):
            sysinfo_dir = os.path.basename(src)
            shutil.copytree(src, os.path.join(dst_path, sysinfo_dir),
                            dirs_exist_ok=True)
        # Profile level
        for src in glob.glob(os.path.join(src_path, '*', '__sysinfo*__')):
            profile_dir, sysinfo_dir = src.rsplit(os.sep, 2)[-2:]
            shutil.copytree(src,
                            os.path.join(dst_path, profile_dir, sysinfo_dir),
                            dirs_exist_ok=True)