smok-serwis/smok-client

smok/extras/event_database/in_memory.py

import os
import pickle
import time
import typing as tp
import weakref

from satella.coding import Monitor, silence_excs, ListDeleter
from satella.time import parse_time_string

from .base import BaseEventDatabase, BaseEventSynchronization
from ...predicate.event import Event


class InMemoryEventSynchronization(BaseEventSynchronization):
    __slots__ = ('event_db', 'events',)

    def __init__(self, event_db: 'InMemoryEventDatabase', events: tp.List[Event]):
        # Hold the database via a weak proxy to avoid a reference cycle
        self.event_db = weakref.proxy(event_db)
        self.events = events

    def get_events(self) -> tp.List[Event]:
        return self.events

    def acknowledge(self, *uuids: str) -> None:
        # Pair the server-assigned UUIDs with the events that were sent;
        # iterate over a snapshot, since closed events get removed en route
        for event, uuid in zip(list(self.events), uuids):
            if event.uuid is None:
                event.uuid = uuid

            # A closed event that the server has acknowledged needs no
            # further synchronization
            if event.is_closed():
                with Monitor.acquire(self.event_db), silence_excs(ValueError):
                    self.events.remove(event)

    def negative_acknowledge(self) -> None:
        pass    # events stay queued and will be retried on the next sync


class InMemoryEventDatabase(BaseEventDatabase, Monitor):
    """
    An event database that keeps events in memory, persisting only the
    predicate caches to disk. A usage sketch follows this listing.

    :param path: path to a file with pickled predicate caches
    :param keep_in_memory_for: amount of time to keep closed events for,
        either in seconds or as a time string (eg. '30d')
    """

    @Monitor.synchronized
    def get_all_events(self) -> tp.Iterator[Event]:
        # Return an iterator over a snapshot, so callers need not hold the lock
        return iter(list(self.events))

    @Monitor.synchronized
    def get_open_events(self) -> tp.Iterator[Event]:
        # Materialize the result under the lock; a generator body would run
        # only after the synchronized wrapper had already released it
        return iter([event for event in self.events if not event.is_closed()])

    @Monitor.synchronized
    def close_event(self, event: Event) -> None:
        # Queue the event for re-synchronization, so its closure reaches
        # the server
        self.events_to_sync.append(event)

    @Monitor.synchronized
    def get_events_to_sync(self) -> tp.Optional[BaseEventSynchronization]:
        if not self.events_to_sync:
            return None
        # Hand over the live list, so acknowledge() can prune it in place
        return InMemoryEventSynchronization(self, self.events_to_sync)

    @Monitor.synchronized
    def add_event(self, event: Event) -> None:
        self.events.append(event)
        self.events_to_sync.append(event)

    def __init__(self, path: str, keep_in_memory_for: tp.Union[str, int] = '30d'):
        self.events = []
        self.path = path
        self.internal_data = {}     # predicate caches, persisted via sync()
        self.events_to_sync = []
        Monitor.__init__(self)
        # Restore previously persisted predicate caches, ignoring corrupt files
        if os.path.exists(self.path):
            with open(self.path, 'rb') as f_in:
                try:
                    self.internal_data = pickle.load(f_in)
                except pickle.PickleError:
                    pass
        # Normalize the retention period to seconds ('30d' -> 2592000.0)
        self.keep_in_memory_for = parse_time_string(keep_in_memory_for)

    @Monitor.synchronized
    def checkpoint(self) -> None:
        # Purge closed events whose retention period has elapsed
        with ListDeleter(self.events) as ld:
            for event in ld:
                if event.is_closed() and time.time() - event.started_on > self.keep_in_memory_for:
                    ld.delete()

    def set_cache(self, predicate_id: str, cache) -> None:
        self.internal_data[predicate_id] = cache
        self.sync()

    def get_cache(self, predicate_id: str) -> tp.Any:
        return self.internal_data[predicate_id]

    def sync(self):
        # Persist the predicate caches; events themselves are never written out
        with open(self.path, 'wb') as f_out:
            pickle.dump(self.internal_data, f_out, pickle.HIGHEST_PROTOCOL)

    @silence_excs(KeyError)
    def on_predicate_deleted(self, predicate_id: str) -> None:
        del self.internal_data[predicate_id]
        self.sync()

    @Monitor.synchronized
    def clear_closed_and_synced_events(self) -> None:
        """
        Clear all events that are both closed and already present on the server
        """
        # Open events are never removed from events_to_sync (acknowledge()
        # prunes only closed ones), so absence from that list means the event
        # is both closed and synchronized
        with ListDeleter(self.events) as ld:
            for event in ld:
                if event not in self.events_to_sync:
                    ld.delete()
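
For orientation, below is a minimal sketch of the loop that typically drives this class. Only the InMemoryEventDatabase and InMemoryEventSynchronization methods come from the listing above; upload_events() and make_event() are hypothetical stand-ins, since neither the server transport nor Event's constructor appears in this file.

# A minimal usage sketch, not part of the module above. upload_events()
# is a hypothetical server call returning one UUID per event sent;
# make_event() stands in for Event construction, whose signature is not
# shown in this file.
db = InMemoryEventDatabase('/tmp/events.db', keep_in_memory_for='7d')
db.add_event(make_event())

sync = db.get_events_to_sync()
if sync is not None:
    try:
        uuids = upload_events(sync.get_events())    # hypothetical call
        sync.acknowledge(*uuids)     # assign UUIDs, drop closed events
    except ConnectionError:
        sync.negative_acknowledge()  # a no-op here; retried next time

db.checkpoint()                          # drop closed events past retention
db.clear_closed_and_synced_events()      # drop closed events already uploaded

# Predicate caches are the only state this database persists to disk
db.set_cache('predicate-1', {'last_value': 42})
assert db.get_cache('predicate-1') == {'last_value': 42}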