ionelmc/pytest-benchmark

View on GitHub
src/pytest_benchmark/stats.py

Summary

Maintainability
B
6 hrs
Test Coverage
import operator
import statistics
from bisect import bisect_left
from bisect import bisect_right

from .utils import cached_property
from .utils import funcname
from .utils import get_cprofile_functions


class Stats:
    fields = (
        'min',
        'max',
        'mean',
        'stddev',
        'rounds',
        'median',
        'iqr',
        'q1',
        'q3',
        'iqr_outliers',
        'stddev_outliers',
        'outliers',
        'ld15iqr',
        'hd15iqr',
        'ops',
        'total',
    )

    def __init__(self):
        self.data = []

    def __bool__(self):
        return bool(self.data)

    def __nonzero__(self):
        return bool(self.data)

    def as_dict(self):
        return {field: getattr(self, field) for field in self.fields}

    def update(self, duration):
        self.data.append(duration)

    @cached_property
    def sorted_data(self):
        return sorted(self.data)

    @cached_property
    def total(self):
        return sum(self.data)

    @cached_property
    def min(self):
        return min(self.data)

    @cached_property
    def max(self):
        return max(self.data)

    @cached_property
    def mean(self):
        return statistics.mean(self.data)

    @cached_property
    def stddev(self):
        if len(self.data) > 1:
            return statistics.stdev(self.data)
        else:
            return 0

    @property
    def stddev_outliers(self):
        """
        Count of StdDev outliers: what's beyond (Mean - StdDev, Mean - StdDev)
        """
        count = 0
        q0 = self.mean - self.stddev
        q4 = self.mean + self.stddev
        for val in self.data:
            if val < q0 or val > q4:
                count += 1
        return count

    @cached_property
    def rounds(self):
        return len(self.data)

    @cached_property
    def median(self):
        return statistics.median(self.data)

    @cached_property
    def ld15iqr(self):
        """
        Tukey-style Lowest Datum within 1.5 IQR under Q1.
        """
        if len(self.data) == 1:
            return self.data[0]
        else:
            return self.sorted_data[bisect_left(self.sorted_data, self.q1 - 1.5 * self.iqr)]

    @cached_property
    def hd15iqr(self):
        """
        Tukey-style Highest Datum within 1.5 IQR over Q3.
        """
        if len(self.data) == 1:
            return self.data[0]
        else:
            pos = bisect_right(self.sorted_data, self.q3 + 1.5 * self.iqr)
            if pos == len(self.data):
                return self.sorted_data[-1]
            else:
                return self.sorted_data[pos]

    @cached_property
    def q1(self):
        rounds = self.rounds
        data = self.sorted_data

        # See: https://en.wikipedia.org/wiki/Quartile#Computing_methods
        if rounds == 1:
            return data[0]
        elif rounds % 2:  # Method 3
            n, q = rounds // 4, rounds % 4
            if q == 1:
                return 0.25 * data[n - 1] + 0.75 * data[n]
            else:
                return 0.75 * data[n] + 0.25 * data[n + 1]
        else:  # Method 2
            return statistics.median(data[: rounds // 2])

    @cached_property
    def q3(self):
        rounds = self.rounds
        data = self.sorted_data

        # See: https://en.wikipedia.org/wiki/Quartile#Computing_methods
        if rounds == 1:
            return data[0]
        elif rounds % 2:  # Method 3
            n, q = rounds // 4, rounds % 4
            if q == 1:
                return 0.75 * data[3 * n] + 0.25 * data[3 * n + 1]
            else:
                return 0.25 * data[3 * n + 1] + 0.75 * data[3 * n + 2]
        else:  # Method 2
            return statistics.median(data[rounds // 2 :])

    @cached_property
    def iqr(self):
        return self.q3 - self.q1

    @property
    def iqr_outliers(self):
        """
        Count of Tukey outliers: what's beyond (Q1 - 1.5IQR, Q3 + 1.5IQR)
        """
        count = 0
        q0 = self.q1 - 1.5 * self.iqr
        q4 = self.q3 + 1.5 * self.iqr
        for val in self.data:
            if val < q0 or val > q4:
                count += 1
        return count

    @cached_property
    def outliers(self):
        return f'{self.stddev_outliers};{self.iqr_outliers}'

    @cached_property
    def ops(self):
        if self.total:
            return self.rounds / self.total
        return 0


class Metadata:
    def __init__(self, fixture, iterations, options):
        self.name = fixture.name
        self.fullname = fixture.fullname
        self.group = fixture.group
        self.param = fixture.param
        self.params = fixture.params
        self.extra_info = fixture.extra_info
        self.cprofile_stats = fixture.cprofile_stats

        self.iterations = iterations
        self.stats = Stats()
        self.options = options
        self.fixture = fixture

    def __bool__(self):
        return bool(self.stats)

    def __nonzero__(self):
        return bool(self.stats)

    def get(self, key, default=None):
        try:
            return getattr(self.stats, key)
        except AttributeError:
            return getattr(self, key, default)

    def __getitem__(self, key):
        try:
            return getattr(self.stats, key)
        except AttributeError:
            return getattr(self, key)

    @property
    def has_error(self):
        return self.fixture.has_error

    def as_dict(self, include_data=True, flat=False, stats=True, cprofile=None):
        result = {
            'group': self.group,
            'name': self.name,
            'fullname': self.fullname,
            'params': self.params,
            'param': self.param,
            'extra_info': self.extra_info,
            'options': {k: funcname(v) if callable(v) else v for k, v in self.options.items()},
        }
        if self.cprofile_stats:
            cprofile_list = result['cprofile'] = []
            cprofile_functions = get_cprofile_functions(self.cprofile_stats)
            stats_columns = ['cumtime', 'tottime', 'ncalls', 'ncalls_recursion', 'tottime_per', 'cumtime_per', 'function_name']
            # move column first
            if cprofile is not None:
                stats_columns.remove(cprofile)
                stats_columns.insert(0, cprofile)
            for column in stats_columns:
                cprofile_functions.sort(key=operator.itemgetter(column), reverse=True)
                for cprofile_function in cprofile_functions[:25]:
                    if cprofile_function not in cprofile_list:
                        cprofile_list.append(cprofile_function)
                # if we want only one column or we already have all available functions
                if cprofile is None or len(cprofile_functions) == len(cprofile_list):
                    break
        if stats:
            stats = self.stats.as_dict()
            if include_data:
                stats['data'] = self.stats.data
            stats['iterations'] = self.iterations
            if flat:
                result.update(stats)
            else:
                result['stats'] = stats
        return result

    def update(self, duration):
        self.stats.update(duration / self.iterations)


def normalize_stats(stats):
    if 'ops' not in stats:
        # fill field added in 3.1.0
        stats['ops'] = 1 / stats['mean']
    return stats