src/pytest_benchmark/stats.py
import operator
import statistics
from bisect import bisect_left
from bisect import bisect_right
from .utils import cached_property
from .utils import funcname
from .utils import get_cprofile_functions
class Stats:
    """
    Collect raw durations and derive summary statistics from them.

    Durations are appended one at a time via ``update``; every name listed in
    ``fields`` is an attribute that ``as_dict`` reads back with ``getattr``.

    NOTE(review): most statistics are wrapped in ``cached_property`` from
    ``.utils``; presumably a value is computed once and then reused, so the
    statistics should only be read after the last ``update`` call -- confirm
    against that helper's implementation.
    """

    # Attribute names exported by ``as_dict``.
    fields = (
        'min',
        'max',
        'mean',
        'stddev',
        'rounds',
        'median',
        'iqr',
        'q1',
        'q3',
        'iqr_outliers',
        'stddev_outliers',
        'outliers',
        'ld15iqr',
        'hd15iqr',
        'ops',
        'total',
    )

    def __init__(self):
        self.data = []

    def __bool__(self):
        """Truthy once at least one duration has been recorded."""
        return bool(self.data)

    def __nonzero__(self):
        """Python 2 spelling of ``__bool__``."""
        return bool(self.data)

    def as_dict(self):
        """Return a dict mapping every name in ``fields`` to its current value."""
        return {field: getattr(self, field) for field in self.fields}

    def update(self, duration):
        """Record one more duration."""
        self.data.append(duration)

    @cached_property
    def sorted_data(self):
        """The recorded durations in ascending order (a new list)."""
        return sorted(self.data)

    @cached_property
    def total(self):
        """Sum of all recorded durations."""
        return sum(self.data)

    @cached_property
    def min(self):
        """Smallest recorded duration."""
        return min(self.data)

    @cached_property
    def max(self):
        """Largest recorded duration."""
        return max(self.data)

    @cached_property
    def mean(self):
        """Arithmetic mean of the recorded durations."""
        return statistics.mean(self.data)

    @cached_property
    def stddev(self):
        """
        Sample standard deviation, or 0 when there are fewer than two durations.
        """
        # statistics.stdev needs at least two data points.
        if len(self.data) > 1:
            return statistics.stdev(self.data)
        return 0

    @property
    def stddev_outliers(self):
        """
        Count of StdDev outliers: what's beyond (Mean - StdDev, Mean + StdDev)
        """
        low = self.mean - self.stddev
        high = self.mean + self.stddev
        return sum(1 for val in self.data if val < low or val > high)

    @cached_property
    def rounds(self):
        """Number of recorded durations."""
        return len(self.data)

    @cached_property
    def median(self):
        """Median of the recorded durations."""
        return statistics.median(self.data)

    @cached_property
    def ld15iqr(self):
        """
        Tukey-style Lowest Datum within 1.5 IQR under Q1.
        """
        if len(self.data) == 1:
            return self.data[0]
        # bisect_left yields the index of the first value >= the lower fence.
        return self.sorted_data[bisect_left(self.sorted_data, self.q1 - 1.5 * self.iqr)]

    @cached_property
    def hd15iqr(self):
        """
        Tukey-style Highest Datum within 1.5 IQR over Q3.
        """
        if len(self.data) == 1:
            return self.data[0]
        # bisect_right yields the index of the first value > the upper fence,
        # so the highest value still inside the fence sits one slot earlier.
        pos = bisect_right(self.sorted_data, self.q3 + 1.5 * self.iqr)
        if pos == 0:
            # Defensive: never let ``pos - 1`` wrap around to the last element.
            return self.sorted_data[0]
        return self.sorted_data[pos - 1]

    @cached_property
    def q1(self):
        """First quartile of the recorded durations."""
        rounds = self.rounds
        data = self.sorted_data
        # See: https://en.wikipedia.org/wiki/Quartile#Computing_methods
        if rounds == 1:
            return data[0]
        elif rounds % 2:  # Method 3
            n, q = rounds // 4, rounds % 4
            if q == 1:
                return 0.25 * data[n - 1] + 0.75 * data[n]
            else:
                return 0.75 * data[n] + 0.25 * data[n + 1]
        else:  # Method 2
            return statistics.median(data[: rounds // 2])

    @cached_property
    def q3(self):
        """Third quartile of the recorded durations."""
        rounds = self.rounds
        data = self.sorted_data
        # See: https://en.wikipedia.org/wiki/Quartile#Computing_methods
        if rounds == 1:
            return data[0]
        elif rounds % 2:  # Method 3
            n, q = rounds // 4, rounds % 4
            if q == 1:
                return 0.75 * data[3 * n] + 0.25 * data[3 * n + 1]
            else:
                return 0.25 * data[3 * n + 1] + 0.75 * data[3 * n + 2]
        else:  # Method 2
            return statistics.median(data[rounds // 2 :])

    @cached_property
    def iqr(self):
        """Interquartile range: Q3 - Q1."""
        return self.q3 - self.q1

    @property
    def iqr_outliers(self):
        """
        Count of Tukey outliers: what's beyond (Q1 - 1.5IQR, Q3 + 1.5IQR)
        """
        low = self.q1 - 1.5 * self.iqr
        high = self.q3 + 1.5 * self.iqr
        return sum(1 for val in self.data if val < low or val > high)

    @cached_property
    def outliers(self):
        """Both outlier counts as the string ``'<stddev_outliers>;<iqr_outliers>'``."""
        return f'{self.stddev_outliers};{self.iqr_outliers}'

    @cached_property
    def ops(self):
        """Rounds per unit of total duration, or 0 when the total is zero."""
        if self.total:
            return self.rounds / self.total
        return 0
class Metadata:
    """
    Bundle a benchmark fixture's identifying attributes with a ``Stats`` object.

    Lookups through ``get`` and ``[]`` consult the ``Stats`` object first and
    fall back to attributes of this object.
    """

    def __init__(self, fixture, iterations, options):
        # Copy the identifying attributes straight off the fixture.
        for attr in ('name', 'fullname', 'group', 'param', 'params', 'extra_info', 'cprofile_stats'):
            setattr(self, attr, getattr(fixture, attr))
        self.iterations = iterations
        self.stats = Stats()
        self.options = options
        self.fixture = fixture

    def __bool__(self):
        """Truthiness mirrors that of the wrapped stats object."""
        return bool(self.stats)

    def __nonzero__(self):
        """Python 2 spelling of ``__bool__``."""
        return bool(self.stats)

    def get(self, key, default=None):
        """
        Return attribute ``key`` from the stats, else from this object, else ``default``.
        """
        try:
            value = getattr(self.stats, key)
        except AttributeError:
            value = getattr(self, key, default)
        return value

    def __getitem__(self, key):
        """
        Return attribute ``key`` from the stats, else from this object.

        Raises ``AttributeError`` when neither has it.
        """
        try:
            value = getattr(self.stats, key)
        except AttributeError:
            value = getattr(self, key)
        return value

    @property
    def has_error(self):
        """Forwarded from the fixture's ``has_error``."""
        return self.fixture.has_error

    def as_dict(self, include_data=True, flat=False, stats=True, cprofile=None):
        """
        Build a plain-dict description of this benchmark.

        :param include_data: also put the raw durations under ``'data'``.
        :param flat: merge the stats entries into the top-level dict instead of
            nesting them under ``'stats'``.
        :param stats: when falsy, leave the statistics out entirely.
        :param cprofile: name of the cProfile column to sort by first; must be
            one of the known columns, otherwise ``list.remove`` raises
            ``ValueError``.
        """
        # Callable option values are replaced by what ``funcname`` returns.
        options = {}
        for option, value in self.options.items():
            options[option] = funcname(value) if callable(value) else value

        result = {
            'group': self.group,
            'name': self.name,
            'fullname': self.fullname,
            'params': self.params,
            'param': self.param,
            'extra_info': self.extra_info,
            'options': options,
        }

        if self.cprofile_stats:
            selected = []
            result['cprofile'] = selected
            functions = get_cprofile_functions(self.cprofile_stats)
            columns = ['cumtime', 'tottime', 'ncalls', 'ncalls_recursion', 'tottime_per', 'cumtime_per', 'function_name']
            # move column first
            if cprofile is not None:
                columns.remove(cprofile)
                columns.insert(0, cprofile)
            for column in columns:
                functions.sort(key=operator.itemgetter(column), reverse=True)
                # Keep the top 25 entries for this column, skipping ones already kept.
                for entry in functions[:25]:
                    if entry not in selected:
                        selected.append(entry)
                # if we want only one column or we already have all available functions
                if cprofile is None or len(functions) == len(selected):
                    break

        if not stats:
            return result

        payload = self.stats.as_dict()
        if include_data:
            payload['data'] = self.stats.data
        payload['iterations'] = self.iterations
        if flat:
            result.update(payload)
        else:
            result['stats'] = payload
        return result

    def update(self, duration):
        """Record ``duration`` divided by the number of iterations."""
        per_iteration = duration / self.iterations
        self.stats.update(per_iteration)
def normalize_stats(stats):
    """
    Make sure a stats mapping has an ``'ops'`` entry and return the mapping.

    When ``'ops'`` is missing it is derived in place as ``1 / stats['mean']``.
    A zero mean yields ``0`` rather than raising ``ZeroDivisionError``, the
    same fallback ``Stats.ops`` uses when the total duration is zero.

    :param stats: mutable mapping holding at least ``'mean'`` when ``'ops'`` is
        absent (``KeyError`` otherwise).
    :return: the same ``stats`` object.
    """
    if 'ops' not in stats:
        # fill field added in 3.1.0
        mean = stats['mean']
        stats['ops'] = 1 / mean if mean else 0
    return stats