cogniteev/docido-python-sdk

View on GitHub
docido_sdk/index/test.py

Summary

Maintainability
D
1 day
Test Coverage

import copy
from contextlib import contextmanager
import json
import os.path as osp
import shutil
import tempfile

import six

from docido_sdk.core import (
    Component,
    implements,
)
from docido_sdk.toolbox.threading_ext import RWLock
from .api import (
    IndexAPIProcessor,
    IndexAPIProvider,
)
from .errors import IndexAPIError
from docido_sdk.toolbox.decorators import reraise
from docido_sdk.toolbox.http_ext import delayed_request


# Decorator applied to the `LocalKVProcessor` methods below; presumably it
# re-raises errors as `IndexAPIError` -- see
# `docido_sdk.toolbox.decorators.reraise` to confirm.
reraise = reraise(IndexAPIError)
# Value types accepted by `LocalKVProcessor.set_kv`.
# `six.integer_types` is `(int, long)` on Python 2 and `(int,)` on Python 3;
# referencing the bare `long` name made this module fail to import on
# Python 3.
ALLOWED_CHECKPOINT_VALUE_TYPES = (
    six.string_types + six.integer_types + (float,)
)


class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that never fails on objects it cannot serialize.

    Any object the standard encoder does not support (for instance a
    `delayed_request`) is written as its ``repr()`` string instead.
    """

    def default(self, obj):
        # Deliberately lossy: the resulting string cannot be decoded back
        # into the original object. Every unsupported type, delayed
        # requests included, takes this same path.
        return repr(obj)


class LocalKVProcessor(IndexAPIProcessor):
    """Local thread-safe, `IndexAPIProcessor` persistent storage
    implementation backed by a json file on the local filesystem.

    Each instance owns its own in-memory store, loaded from and written
    back to its own file. One read/write lock is shared by all instances,
    so updates made through different processors never interleave.
    """

    # Shared by every instance (class attribute).
    __lock = RWLock()

    def __init__(self, **config):
        """
        Recognized configuration, all optional:

        - ``local_storage['kv']['path']``: directory holding the store.
          A fresh temporary directory is created when missing.

        The store is the file ``kv.yaml`` inside that directory. Despite
        its extension, it is JSON-encoded.
        """
        super(LocalKVProcessor, self).__init__(**config)
        local_storage = config.get('local_storage', {})
        kv_storage = local_storage.get('kv', {})
        path = kv_storage.get('path')
        if path is None:
            path = tempfile.mkdtemp(prefix='docido-local-storage-kv')
        # File name kept as-is for compatibility with existing stores.
        path = osp.join(path, 'kv.yaml')
        self.__path = path
        with self.__lock.write():
            if osp.exists(path):
                with open(path) as istr:
                    self.__store = json.load(istr)
            else:
                # Always a fresh dict: a class-level default would be
                # shared, and persisted, by every processor created
                # without an existing file, whatever its path.
                self.__store = dict()

    @reraise
    def get_kv(self, key):
        """Return the value stored under `key`, or None when absent."""
        # NOTE: assert-based validation is skipped under `python -O`.
        assert isinstance(key, six.string_types)
        with self.__lock.read():
            return self.__store.get(key)

    @reraise
    def get_kvs(self):
        """Return a shallow copy of the whole store."""
        with self.__lock.read():
            return copy.copy(self.__store)

    @reraise
    def set_kv(self, key, value):
        """Store `value` under `key`, then persist the store to disk."""
        assert isinstance(key, six.string_types)
        assert isinstance(value, ALLOWED_CHECKPOINT_VALUE_TYPES)
        with self.__lock.write():
            self.__store[key] = value
            self.__persist()

    @reraise
    def delete_kv(self, key):
        """Remove `key` (ignored when absent), then persist the store."""
        assert isinstance(key, six.string_types)
        with self.__lock.write():
            self.__store.pop(key, None)
            self.__persist()

    @reraise
    def delete_kvs(self):
        """Remove every key, then persist the empty store."""
        with self.__lock.write():
            self.__store.clear()
            self.__persist()

    def __persist(self):
        """Write the store as JSON to a side file, then move it over the
        target path. Callers hold the write lock."""
        with open(self.__path + '.new', 'w') as ostr:
            json.dump(self.__store, ostr, indent=2, cls=CustomJSONEncoder)
        shutil.move(self.__path + '.new', self.__path)


class LocalKV(Component):
    """Component exposing `LocalKVProcessor` as an `IndexAPIProvider`."""
    implements(IndexAPIProvider)

    def get_index_api(self, **config):
        """Build a new `LocalKVProcessor` from the given configuration."""
        processor = LocalKVProcessor(**config)
        return processor


class LocalDumbIndexProcessor(IndexAPIProcessor):
    """Dumb, but yet reentrant, index implementation, persisting indices
    in local-filesystem.

    Cards and thumbnails are kept in memory, keyed by id, and rewritten as
    JSON to ``cards.yml`` / ``thumbnails.yml`` after every update.

    Some methods do not provide all functionalities the real Docido index
    provides. More information available in documentation of the following
    member methods: `delete_cards`, `search_cards`, and `delete_thumbnails`.
    """

    # Shared by every instance (class attribute), so updates made through
    # different processors never interleave.
    __lock = RWLock()

    def __init__(self, **config):
        """
        Recognized configuration, all optional:

        - ``local_storage['documents']['path']``: directory holding the
          index files. A fresh temporary directory is created when missing.
        - ``local_storage['documents']['failure_probability']``: stored on
          the instance (default 0), but not read by any method of this
          class.
        """
        super(LocalDumbIndexProcessor, self).__init__(**config)
        local_storage = config.get('local_storage', {})
        index_storage = local_storage.get('documents', {})
        path = index_storage.get('path')
        if path is None:
            # The first positional argument of `mkdtemp` is the *suffix*;
            # pass the name as a prefix, as `LocalKVProcessor` does.
            path = tempfile.mkdtemp(prefix='docido-local-storage-documents')
        # Despite the `.yml` extension, both files are JSON-encoded; names
        # are kept for compatibility with indices already on disk.
        self.__cards_path = osp.join(path, 'cards.yml')
        self.__thumbnails_path = osp.join(path, 'thumbnails.yml')
        self.__cards = LocalDumbIndexProcessor.load_index(self.__cards_path)
        self.__thumbnails = LocalDumbIndexProcessor.load_index(
            self.__thumbnails_path
        )
        self.__failure_probability = index_storage.get(
            'failure_probability', 0
        )

    @contextmanager
    def __update(self, cards=False, thumbnails=False):
        """Hold the write lock while the caller mutates the indices.

        When the ``with`` body completes without raising, the selected
        indices are persisted before the lock is released. If the body
        raises, nothing is persisted (in-memory changes made so far remain),
        and the lock is still released.
        """
        self.__lock.writer_acquire()
        try:
            yield
            if cards:
                LocalDumbIndexProcessor.persist_index(
                    self.__cards, self.__cards_path
                )
            if thumbnails:
                LocalDumbIndexProcessor.persist_index(
                    self.__thumbnails,
                    self.__thumbnails_path
                )
        finally:
            self.__lock.writer_release()

    @staticmethod
    def __ensure_match_all(query):
        """Raise `IndexAPIError` unless `query` is a bare match_all query."""
        if query != {'query': {'match_all': {}}}:
            raise IndexAPIError(
                'only match_all query is currently supported, you should use' +
                ' the ElasticSearch processor along with es-settings.yml ' +
                'config file instead'
            )

    def push_cards(self, cards):
        """Index `cards`, replacing any card with the same ``'id'``."""
        with self.__update(cards=True):
            for card in cards:
                self.__cards[card['id']] = card

    def delete_cards(self, query=None):
        """Remove every card.

        Only ``{'query': {'match_all': {}}}`` is supported. Any other query,
        including the default None, raises `IndexAPIError`.
        """
        self.__ensure_match_all(query)
        with self.__update(cards=True):
            self.__cards.clear()

    def delete_cards_by_id(self, ids):
        """Remove the cards whose id is in `ids`.

        :return: one ``{'status': 404, 'id': ...}`` entry per unknown id.
        """
        errors = []
        with self.__update(cards=True):
            for _id in ids:
                if _id not in self.__cards:
                    errors.append({'status': 404, 'id': _id})
                    continue
                del self.__cards[_id]
        return errors

    def search_cards(self, query=None):
        """Return the indexed cards as a plain list.

        Only the optional ``fields`` entry of `query` is honoured: when
        given, each returned card is reduced to those fields, and fields a
        card does not have are omitted. Every other part of the query is
        ignored. The result is a plain list, not an ElasticSearch
        ``hits`` response envelope.
        """
        fetch_fields = (query or {}).get('fields')
        with self.__lock.read():
            cards = list(self.__cards.values())
        if fetch_fields is None:
            return cards
        return [
            {k: card[k] for k in fetch_fields if k in card}
            for card in cards
        ]

    def push_thumbnails(self, thumbnails):
        """Store ``(id, payload, mime)`` triples, keyed by id.

        Entries are kept as ``(payload, mime)`` tuples in memory. After a
        JSON round trip (reload from disk), they come back as lists.
        """
        with self.__update(thumbnails=True):
            for id_, payload, mime in thumbnails:
                self.__thumbnails[id_] = (payload, mime)

    def delete_thumbnails(self, query):
        """Remove every thumbnail.

        Only ``{'query': {'match_all': {}}}`` is supported. Any other query
        raises `IndexAPIError`.
        """
        self.__ensure_match_all(query)
        with self.__update(thumbnails=True):
            self.__thumbnails.clear()

    def delete_thumbnails_by_id(self, ids):
        """Remove the thumbnails whose id is in `ids`.

        :return: one ``{'status': 404, 'id': ...}`` entry per unknown id.
        """
        errors = []
        with self.__update(thumbnails=True):
            for _id in ids:
                if _id not in self.__thumbnails:
                    errors.append({'status': 404, 'id': _id})
                    continue
                del self.__thumbnails[_id]
        return errors

    @classmethod
    def load_index(cls, path):
        """Return the JSON content of `path`, or an empty dict if absent."""
        if not osp.exists(path):
            return dict()
        with open(path) as istr:
            return json.load(istr)

    @classmethod
    def persist_index(cls, index, path):
        """Write `index` as JSON to a side file, then move it onto `path`."""
        with open(path + '.new', 'w') as ostr:
            json.dump(index, ostr, indent=2, cls=CustomJSONEncoder)
        shutil.move(path + '.new', path)


class LocalDumbIndex(Component):
    """Component exposing `LocalDumbIndexProcessor` as an
    `IndexAPIProvider`."""
    implements(IndexAPIProvider)

    def get_index_api(self, **config):
        """Build a new `LocalDumbIndexProcessor` from `config`."""
        processor = LocalDumbIndexProcessor(**config)
        return processor