ionelmc/pytest-benchmark

View on GitHub
src/pytest_benchmark/storage/elasticsearch.py

Summary

Maintainability
A
45 mins
Test Coverage
import re
import uuid
from datetime import date
from datetime import datetime
from datetime import timezone
from decimal import Decimal
from functools import partial

from ..stats import normalize_stats

try:
    import elasticsearch
    from elasticsearch.serializer import JSONSerializer
except ImportError as exc:
    raise ImportError('Please install elasticsearch or pytest-benchmark[elasticsearch]') from exc


class BenchmarkJSONSerializer(JSONSerializer):
    """Elasticsearch JSON serializer that understands the value types found in benchmark data.

    ``default`` is presumably invoked by the base serializer for objects it cannot
    encode natively. Dates/datetimes become ISO-8601 strings, ``Decimal`` becomes
    ``float`` and ``UUID`` becomes ``str``. Anything else is not rejected: it is
    replaced by the placeholder string ``UNSERIALIZABLE[<repr>]``.
    """

    def default(self, data):
        # Checked in order; the first matching type decides how the value is encoded.
        encoders = (
            ((date, datetime), lambda value: value.isoformat()),
            (Decimal, float),
            (uuid.UUID, str),
        )
        for types, encode in encoders:
            if isinstance(data, types):
                return encode(data)
        return 'UNSERIALIZABLE[%r]' % data


def _mask_hosts(hosts):
    m = re.compile('^([^:]+)://[^@]+@')
    sub_fun = partial(m.sub, '\\1://***:***@')
    masked_hosts = list(map(sub_fun, hosts))
    return masked_hosts


class ElasticsearchStorage:
    """Benchmark storage backend that keeps one Elasticsearch document per benchmark.

    ``save`` indexes every benchmark as its own document, carrying a copy of the run-level
    info (machine_info, commit_info, datetime, version). ``load`` regroups those documents
    into runs keyed by ``<commit id>_<datetime>``.
    """

    # Upper bound on the number of hits / aggregation buckets requested from Elasticsearch.
    _MAX_RESULTS = 1000

    def __init__(self, hosts, index, doctype, project_name, logger, default_machine_id=None):
        self._es_hosts = hosts
        self._es_index = index
        self._es_doctype = doctype
        self._es = elasticsearch.Elasticsearch(self._es_hosts, serializer=BenchmarkJSONSerializer())
        self._project_name = project_name
        self.default_machine_id = default_machine_id
        self.logger = logger
        self._cache = {}
        self._create_index()

    # NOTE(review): __str__ and location expose any credentials embedded in the host URLs;
    # save() masks them with _mask_hosts before logging - consider doing the same here.
    def __str__(self):
        return str(self._es_hosts)

    @property
    def location(self):
        return str(self._es_hosts)

    @staticmethod
    def _project_query(project):
        """Return a fresh bool query that keeps only documents whose commit_info.project is *project*."""
        return {'bool': {'filter': {'term': {'commit_info.project': project}}}}

    @staticmethod
    def _parse_datetime(value):
        """Parse a stored ``datetime`` string into a timezone-aware datetime for ordering.

        Accepts ``datetime.isoformat()`` output (with or without microseconds, with or
        without a UTC offset). Falls back to the ``%Y-%m-%dT%H:%M:%S.%f`` format for
        inputs ``fromisoformat`` rejects on this Python version. Naive values are treated
        as UTC so that naive and aware timestamps can be compared with each other.
        """
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            parsed = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S.%f')  # noqa: DTZ007
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def query(self):
        """
        Returns sorted records names (ids) that corresponds with project.
        """
        body = {
            'size': 0,
            'query': self._project_query(self._project_name),
            # Without an explicit size a terms aggregation returns only the top 10 buckets.
            'aggs': {'benchmark_ids': {'terms': {'field': 'benchmark_id', 'size': self._MAX_RESULTS}}},
        }
        result = self._es.search(index=self._es_index, doc_type=self._es_doctype, body=body)
        buckets = result['aggregations']['benchmark_ids']['buckets']
        return sorted(bucket['key'] for bucket in buckets)

    def load(self, id_prefix=None):
        """
        Yield ``(key, run)`` pairs for the project's runs, ordered by run datetime (oldest first).

        Each run dict holds the run-level info plus a ``benchmarks`` list; every benchmark's
        ``stats`` is passed through ``normalize_stats`` before the run is yielded.
        """
        response = self._search(self._project_name, id_prefix)
        grouped = self._group_by_commit_and_time(response['hits']['hits'])
        runs = sorted(grouped.items(), key=lambda item: self._parse_datetime(item[1]['datetime']))
        for key, data in runs:
            for bench in data['benchmarks']:
                normalize_stats(bench['stats'])
            yield key, data

    def _search(self, project, id_prefix=None):
        """Return up to ``_MAX_RESULTS`` documents of *project*, newest first, optionally
        restricted to document ids starting with *id_prefix*."""
        body = {
            'size': self._MAX_RESULTS,
            'sort': [{'datetime': {'order': 'desc'}}],
            'query': self._project_query(project),
        }
        if id_prefix:
            body['query']['bool']['must'] = {'prefix': {'_id': id_prefix}}

        return self._es.search(index=self._es_index, doc_type=self._es_doctype, body=body)

    @staticmethod
    def _benchmark_from_es_record(source_es_record):
        """Extract the per-benchmark fields from a document ``_source``."""
        return {
            benchmark_key: source_es_record[benchmark_key]
            for benchmark_key in ('group', 'stats', 'options', 'param', 'name', 'params', 'fullname', 'benchmark_id')
        }

    @staticmethod
    def _run_info_from_es_record(source_es_record):
        """Extract the run-level fields (shared by all benchmarks of a run) from a document ``_source``."""
        return {run_key: source_es_record[run_key] for run_key in ('machine_info', 'commit_info', 'datetime', 'version')}

    def _group_by_commit_and_time(self, hits):
        """Group search hits into runs keyed by ``<commit id>_<datetime>``, preserving hit order."""
        result = {}
        for hit in hits:
            source_hit = hit['_source']
            key = f"{source_hit['commit_info']['id']}_{source_hit['datetime']}"
            benchmark = self._benchmark_from_es_record(source_hit)
            if key in result:
                result[key]['benchmarks'].append(benchmark)
            else:
                run_info = self._run_info_from_es_record(source_hit)
                run_info['benchmarks'] = [benchmark]
                result[key] = run_info
        return result

    def load_benchmarks(self, *args):
        """
        Yield benchmarks that corresponds with project. Put path and
        source (uncommon part of path) to benchmark dict.

        The ``stats`` sub-dict is flattened into the benchmark dict and ``source`` is set
        to the benchmark's ``benchmark_id``. An optional first positional argument is used
        as an id prefix filter.
        """
        id_prefix = args[0] if args else None
        r = self._search(self._project_name, id_prefix)
        for hit in r['hits']['hits']:
            bench = self._benchmark_from_es_record(hit['_source'])
            bench.update(bench.pop('stats'))
            bench['source'] = bench['benchmark_id']
            yield bench

    def save(self, output_json, save):
        """Index each benchmark of *output_json* as a separate document.

        Each document is the benchmark dict merged with the run-level keys of *output_json*
        (everything except ``benchmarks``), plus ``benchmark_id``: *save*, prefixed with
        ``default_machine_id + '_'`` when that is set. The document id is
        ``<benchmark_id>_<fullname>``. Neither *output_json* nor its benchmark dicts are
        modified.

        Raises:
            KeyError: if *output_json* has no ``benchmarks`` key.
        """
        # Build copies instead of popping/updating in place so the caller's data is untouched.
        run_info = {key: value for key, value in output_json.items() if key != 'benchmarks'}
        benchmarks = output_json['benchmarks']
        benchmark_id = save
        if self.default_machine_id:
            benchmark_id = self.default_machine_id + '_' + benchmark_id
        for bench in benchmarks:
            # add top level info from output_json dict to each record
            record = {**bench, **run_info}
            record['benchmark_id'] = benchmark_id
            self._es.index(
                index=self._es_index,
                doc_type=self._es_doctype,
                body=record,
                id=benchmark_id + '_' + record['fullname'],
            )
        # hide user's credentials before logging
        masked_hosts = _mask_hosts(self._es_hosts)
        self.logger.info(f'Saved benchmark data to {masked_hosts} to index {self._es_index} as doctype {self._es_doctype}')

    def _create_index(self):
        """Create the index with the benchmark mapping.

        ``ignore=400`` presumably suppresses the error raised when the index already
        exists - TODO confirm against the elasticsearch client in use.
        """
        # NOTE(review): 'string' / 'not_analyzed' are pre-5.x mapping types (later versions
        # use 'keyword'); confirm the targeted Elasticsearch version before changing them.
        mapping = {
            'mappings': {
                'benchmark': {
                    'properties': {
                        'commit_info': {
                            'properties': {
                                'dirty': {'type': 'boolean'},
                                'id': {'type': 'string', 'index': 'not_analyzed'},
                                'project': {'type': 'string', 'index': 'not_analyzed'},
                            }
                        },
                        'datetime': {'type': 'date', 'format': 'strict_date_optional_time||epoch_millis'},
                        'name': {'type': 'string', 'index': 'not_analyzed'},
                        'fullname': {'type': 'string', 'index': 'not_analyzed'},
                        'version': {'type': 'string', 'index': 'not_analyzed'},
                        'benchmark_id': {
                            'type': 'string',
                            'index': 'not_analyzed',
                        },
                        'machine_info': {
                            'properties': {
                                'machine': {'type': 'string', 'index': 'not_analyzed'},
                                'node': {'type': 'string', 'index': 'not_analyzed'},
                                'processor': {'type': 'string', 'index': 'not_analyzed'},
                                'python_build': {'type': 'string', 'index': 'not_analyzed'},
                                'python_compiler': {'type': 'string', 'index': 'not_analyzed'},
                                'python_implementation': {'type': 'string', 'index': 'not_analyzed'},
                                'python_implementation_version': {'type': 'string', 'index': 'not_analyzed'},
                                'python_version': {'type': 'string', 'index': 'not_analyzed'},
                                'release': {'type': 'string', 'index': 'not_analyzed'},
                                'system': {'type': 'string', 'index': 'not_analyzed'},
                            }
                        },
                        'options': {
                            'properties': {
                                'disable_gc': {'type': 'boolean'},
                                'max_time': {'type': 'double'},
                                'min_rounds': {'type': 'long'},
                                'min_time': {'type': 'double'},
                                'timer': {'type': 'string'},
                                'warmup': {'type': 'boolean'},
                            }
                        },
                        'stats': {
                            'properties': {
                                'hd15iqr': {'type': 'double'},
                                'iqr': {'type': 'double'},
                                'iqr_outliers': {'type': 'long'},
                                'iterations': {'type': 'long'},
                                'ld15iqr': {'type': 'double'},
                                'max': {'type': 'double'},
                                'mean': {'type': 'double'},
                                'median': {'type': 'double'},
                                'min': {'type': 'double'},
                                'outliers': {'type': 'string'},
                                'q1': {'type': 'double'},
                                'q3': {'type': 'double'},
                                'rounds': {'type': 'long'},
                                'stddev': {'type': 'double'},
                                'stddev_outliers': {'type': 'long'},
                                'ops': {'type': 'double'},
                            }
                        },
                    }
                }
            }
        }
        self._es.indices.create(index=self._es_index, ignore=400, body=mapping)