Crystalnix/termius-cli

View on GitHub
termius/core/storage/__init__.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-
"""Module for Application storage."""
import logging
from collections import namedtuple
from ..signals import (
    pre_create_instance, post_create_instance,
    pre_update_instance, post_update_instance,
    pre_delete_instance, post_delete_instance,
)
from .idgenerators import UUIDGenerator
from .driver import PersistentDict
from ..exceptions import DoesNotExistException, TooManyEntriesException
from .strategies import SaveStrategy, GetStrategy, SoftDeleteStrategy
from .query import Query


# pylint: disable=too-few-public-methods
class InternalModelContructor(object):
    """Turn raw storage data into a bare model instance.

    For internal use only.
    """

    def __init__(self, strategy):
        """Keep the strategy; this class stores it but never calls it."""
        self.strategy = strategy

    def __call__(self, raw_data, model_class):
        """Return ``model_class`` instantiated with ``raw_data`` as is."""
        bare_model = model_class(raw_data)
        return bare_model


# pylint: disable=too-few-public-methods
class ModelContructor(InternalModelContructor):
    """Turn raw storage data into a model processed by the strategy."""

    def __call__(self, raw_data, model_class):
        """Wrap ``raw_data`` in ``model_class`` and pass it to the strategy.

        The bare model is built by the parent class first; the value
        returned is whatever ``self.strategy.get`` gives back for it.
        """
        build_bare = super(ModelContructor, self).__call__
        bare_model = build_bare(raw_data, model_class)
        return self.strategy.get(bare_model)


# Bundle of the three strategy objects, in positional order:
# getter, saver, deleter.
Strategies = namedtuple('Strategies', 'getter saver deleter')


class ApplicationStorage(object):
    """Keep user data as per-model lists inside a driver mapping."""

    path = '{application_directory}/storage'
    defaultstorage = list
    logger = logging.getLogger(__name__)

    def __init__(self, command, save_strategy=None,
                 get_strategy=None, delete_strategy=None, **kwargs):
        """Set up the driver, id generator, strategies and constructors.

        ``kwargs`` are extra values for the ``path`` template. Each
        strategy argument is called with this storage; a falsy value
        selects the default strategy class.
        """
        self._path = self.path.format(
            application_directory=command.app.directory_path, **kwargs
        )
        self.driver = PersistentDict(self._path)
        self.id_generator = UUIDGenerator(self)
        self.command = command

        # Strategies are built in getter, saver, deleter order.
        getter = self.make_strategy(get_strategy, GetStrategy)
        saver = self.make_strategy(save_strategy, SaveStrategy)
        deleter = self.make_strategy(delete_strategy, SoftDeleteStrategy)
        self.strategies = Strategies(
            getter=getter, saver=saver, deleter=deleter
        )

        # Both constructors share the getter strategy.
        shared_getter = self.strategies.getter
        self.internal_model_constructor = InternalModelContructor(
            shared_getter
        )
        self.model_constructor = ModelContructor(shared_getter)

    def __enter__(self):
        """Enter the transaction and hand back the storage itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the transaction by syncing the driver."""
        self.driver.sync()

    def save(self, original_model):
        """Store the model, creating or updating it as needed.

        The saver strategy processes the model first. A model that
        already has an id is updated; otherwise it is created and the
        new id is copied back onto ``original_model``.
        """
        prepared = self.strategies.saver.save(original_model)
        if getattr(prepared, prepared.id_name):
            return self.update(prepared)

        created = self.create(prepared)
        original_model.id = prepared.id
        return created

    def create(self, model):
        """Give the model a fresh id and append it to its list."""
        assert not getattr(model, model.id_name)
        self._send_signal(pre_create_instance, model)
        model.id = self.generate_id(model)
        stored_model = self._internal_update(model)
        self._send_signal(post_create_instance, model)
        return stored_model

    def update(self, model):
        """Replace the stored entry that has the same id as the model."""
        assert getattr(model, model.id_name)

        self._send_signal(pre_update_instance, model)
        # Drop the old entry, let the saver mark the model, re-append it.
        self._internal_delete(model)
        self.strategies.saver.mark_model(model)
        stored_model = self._internal_update(model)
        self._send_signal(post_update_instance, model)
        return stored_model

    def delete(self, model):
        """Remove the model from its list and notify the deleter."""
        self._send_signal(pre_delete_instance, model)
        self._internal_delete(model)
        self.strategies.deleter.delete(model)
        self._send_signal(post_delete_instance, model)

    def confirm_delete(self, deleted_sets):
        """Ask the deleter to remove its intersection with deleted_sets."""
        deleter = self.strategies.deleter
        deleter.remove_intersection(deleted_sets)

    def get(self, model_class, query_union=None, **kwargs):
        """Return the only model matching the passed lookups.

        Usage:
            model = storage.get(Model, any, **{'field.ge': 1, 'field.le': 5})
        """
        candidates = self.filter(model_class, query_union, **kwargs)
        return self._validate_the_single_model(candidates)

    def get_single_by_id(self, model_class, identificator):
        """Return the only entry of model_class with the given id."""
        assert identificator
        by_id = Query(all, id=identificator)
        # Match against bare entries; only the result is fully constructed.
        candidates = [
            entry for entry in self._internal_get_all(model_class)
            if by_id(entry)
        ]
        the_entry = self._validate_the_single_model(candidates)
        return self.model_constructor(the_entry, model_class)

    def filter(self, model_class, query_union=None, **kwargs):
        """Return the models that match the passed lookups.

        Usage:
            models = storage.filter(Model, any, **{'field.ge': 1, 'field.le': 5})
        """
        query, models = self._query_and_models(
            model_class, query_union, kwargs
        )
        return [entry for entry in models if query(entry)]

    def exclude(self, model_class, query_union=None, **kwargs):
        """Return the models that do not match the passed lookups.

        Usage:
            models = storage.exclude(Model, any, **{'field.ge': 1, 'field.le': 5})
        """
        query, models = self._query_and_models(
            model_class, query_union, kwargs
        )
        return [entry for entry in models if not query(entry)]

    def _query_and_models(self, model_class, query_union, lookups):
        # Shared front half of filter/exclude: validate the arguments,
        # build the query, then load every model of the class.
        assert isinstance(model_class, type)
        assert lookups
        query = Query(query_union, **lookups)
        return query, self.get_all(model_class)

    def get_all(self, model_class):
        """Return every stored model built by ``model_constructor``."""
        return self._get_all_base(model_class, self.model_constructor)

    def _internal_get_all(self, model_class):
        # Same as get_all, but built by the internal constructor.
        return self._get_all_base(model_class, self.internal_model_constructor)

    def _get_all_base(self, model_class, model_contructor):
        assert isinstance(model_class, type)
        storage_key = model_class.set_name
        # setdefault leaves an empty container in the driver for a new key.
        raw_entries = self.driver.setdefault(
            storage_key, self.defaultstorage()
        )
        return self.defaultstorage(
            model_contructor(raw, model_class) for raw in raw_entries
        )

    def _internal_update(self, model):
        # Append the model to its list and write the list to the driver.
        stored = self._internal_get_all(type(model))
        stored.append(model)
        self.driver[model.set_name] = stored
        return model

    def _internal_delete(self, model):
        target_id = getattr(model, model.id_name)
        assert target_id

        stored = self._internal_get_all(type(model))
        # Only the first entry with a matching id is removed.
        position = next(
            (
                index for index, entry in enumerate(stored)
                if entry.id == target_id
            ),
            None,
        )
        if position is not None:
            stored.pop(position)
        # The list is written back whether or not anything was removed.
        self.driver[model.set_name] = stored

    def low_get(self, key):
        """Read the value stored under key straight from the driver."""
        return self.driver[key]

    def low_set(self, key, value):
        """Write value under key straight into the driver."""
        self.driver[key] = value

    def generate_id(self, model):
        """Return a new local id from the id generator."""
        return self.id_generator(model)

    def make_strategy(self, strategy_class, default):
        """Instantiate strategy_class, or default when it is falsy."""
        chosen = strategy_class if strategy_class else default
        return chosen(self)

    def _send_signal(self, signal, model):
        # Send the signal with the model's class as sender.
        signal.send(model.__class__, command=self.command, instance=model)

    # pylint: disable=no-self-use
    def _validate_the_single_model(self, founded_models):
        # Exactly one entry is expected: none and several are both errors.
        if not founded_models:
            raise DoesNotExistException
        if len(founded_models) == 1:
            return founded_models[0]
        raise TooManyEntriesException