smok-serwis/smok-client

View on GitHub
smok/metadata.py

Summary

Maintainability
A
0 mins
Test Coverage
F
47%
import queue
import time
import typing as tp
from urllib.parse import urlencode, quote_plus

from satella.coding import for_argument, queue_get
from satella.coding.decorators import retry
from satella.coding.structures import CacheDict

from smok.exceptions import ResponseError
from smok.extras import BaseMetadataDatabase


class PlainDataUpdater:
    __slots__ = 'entries', 'device'

    def __init__(self, device: 'SMOKDevice'):
        self.entries = queue.Queue()
        self.device = device

    def add_update(self, key: str, value: str, timestamp: tp.Optional[float] = None):
        timestamp = timestamp or time.time()
        self.entries.put((key, value, timestamp))

    @queue_get('entries', 0)
    @retry(3, exc_classes=ResponseError)
    def try_update(self, msg) -> None:
        key, value, timestamp = msg

        if not value:
            self.device.api.delete('/v1/device/metadata/plain/%s' % (urlencode(key),))
        else:
            self.device.api.put('/v1/device/metadata/plain/%s' % (quote_plus(key),),
                                json={'value': value, 'unless_timestamp_higher': timestamp})


class StorageDict:
    def __init__(self, device: 'SMOKDevice'):
        self.db = device.meta_database  # type: BaseMetadataDatabase

    def __getitem__(self, item: str) -> str:
        return self.db.get_plain(item)

    @for_argument(None, None, str)
    def __setitem__(self, key: str, value: str) -> None:
        self.db.set_plain(key, value)

    def __delitem__(self, key) -> None:
        self.db.delete_plain(key)


CACHE_MAX_FOR = 60


class PlainMetadata(CacheDict):
    """
    An object representing device's plain metadata.

    Entries will be cached for 60 seconds after downloading them from the server.

    Note that this does not support iteration or getting length, but supports
    __setitem__, __delitem__ and __getitem__.
    """
    __slots__ = ('device', 'updater', 'db', 'cached_for')

    def try_update(self) -> None:
        if self.device.allow_sync:
            self.updater.try_update()

    def __init__(self, device: 'SMOKDevice'):
        self.updater = PlainDataUpdater(device)
        self.db = device.meta_database
        self.device = device
        self.cached_for = {}

    def __getitem__(self, item) -> str:
        try:
            cached_for = self.cached_for[item]
            if time.monotonic() - cached_for > self.device.cache_metadata_for:
                raise KeyError()
            return self.db.get_plain(item)
        except KeyError:
            if not self.device.allow_sync:
                raise
            try:
                resp = self.device.api.get('/v1/device/metadata/plain/%s' % (
                    quote_plus(item)
                ))
                value = resp['value']
                self.db.update_plain(item, value, resp['timestamp'])
                self.cached_for[item] = time.monotonic()
                return value
            except ResponseError as e:
                if e.status_code == 404:
                    raise KeyError('key not found') from e
                return self.db.get_plain(item)

    def get(self, key: str, default=None) -> str:
        """
        Get a value for given metadata, returning default if it KeyErrors
        """
        try:
            return self[key]
        except KeyError:
            return default

    @for_argument(None, None, str)
    def __setitem__(self, key, value) -> None:
        self.db.put_plain(key, value)
        if self.device.allow_sync:
            self.updater.add_update(key, value, time.time())

        self.cached_for[key] = time.monotonic()

    def __delitem__(self, key) -> None:
        self.db.delete_plain(key)
        if self.device.allow_sync:
            self.updater.add_update(key, None, time.time())
        try:
            del self.cached_for[key]
        except KeyError:
            pass