ActivityWatch/aw-core

View on GitHub
aw_datastore/benchmark.py

Summary

Maintainability
A
0 mins
Test Coverage
#!/usr/bin/env python3
import sys
from typing import Callable
from datetime import datetime, timedelta, timezone
from contextlib import contextmanager

from aw_core.models import Event

from takethetime import ttt

from aw_datastore import get_storage_methods, Datastore
from aw_datastore.storages import AbstractStorage

td1s = timedelta(seconds=1)


def create_test_events(n):
    now = datetime.now(timezone.utc) - timedelta(days=1000)

    events = []
    for i in range(n):
        events.append(
            Event(timestamp=now + i * td1s, duration=td1s, data={"label": "asd"})
        )

    return events


@contextmanager
def temporary_bucket(ds):
    bucket_id = "test_bucket"
    try:
        ds.delete_bucket(bucket_id)
    except Exception:
        pass
    bucket = ds.create_bucket(bucket_id, "testingtype", "test-client", "testing-box")
    yield bucket
    ds.delete_bucket(bucket_id)


def benchmark(storage: Callable[..., AbstractStorage]):
    if storage.__name__ == "PeeweeStorage":
        ds = Datastore(storage, testing=True, filepath="test.db")
    else:
        ds = Datastore(storage, testing=True)

    num_single_events = 50
    num_replace_events = 50
    num_bulk_events = 20_000
    num_events = num_single_events + num_replace_events + num_bulk_events + 1
    num_final_events = num_single_events + num_bulk_events + 1

    events = create_test_events(num_events)
    single_events = events[:num_single_events]
    replace_events = events[num_single_events : num_single_events + num_replace_events]
    bulk_events = events[num_single_events + num_replace_events : -1]

    print(storage.__name__)

    with temporary_bucket(ds) as bucket:
        with ttt(" sum"):
            with ttt(f" single insert {num_single_events} events"):
                for event in single_events:
                    bucket.insert(event)

            with ttt(f" bulk insert {num_bulk_events} events"):
                bucket.insert(bulk_events)

            with ttt(f" replace last {num_replace_events}"):
                for e in replace_events:
                    bucket.replace_last(e)

            with ttt(" insert 1 event"):
                bucket.insert(events[-1])

            with ttt(" get one"):
                events_tmp = bucket.get(limit=1)

            with ttt(" get all"):
                events_tmp = bucket.get(limit=-1)
                assert len(events_tmp) == num_final_events

            with ttt(" get range"):
                events_tmp = bucket.get(
                    limit=-1,
                    starttime=events[1].timestamp + 0.01 * td1s,
                    endtime=events[-1].timestamp + events[-1].duration,
                )
                assert len(events_tmp) == num_final_events - 1


if __name__ == "__main__":
    for storage in get_storage_methods().values():
        if len(sys.argv) <= 1 or storage.__name__ in sys.argv:
            benchmark(storage)