idlesign/django-sitecats

View on GitHub
sitecats/utils.py

Summary

Maintainability
B
4 hrs
Test Coverage
from typing import Type, Any, List, Set, Optional, Union, Dict

from django.apps import apps
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.db.models import signals, Count, Model
from etc.toolbox import get_model_class_from_string

from .settings import MODEL_CATEGORY, MODEL_TIE

if False:  # pragma: nocover
    from .models import CategoryBase, TieBase, ModelWithCategory  # noqa


def get_category_model() -> Type['CategoryBase']:
    """Returns the Category model class named by the `MODEL_CATEGORY` setting."""
    category_model = get_model_class_from_string(MODEL_CATEGORY)
    return category_model


def get_tie_model() -> Type['TieBase']:
    """Returns the Tie model class named by the `MODEL_TIE` setting."""
    tie_model = get_model_class_from_string(MODEL_TIE)
    return tie_model


# Module-wide categories cache object; created on the first `get_cache()` call.
_SITECATS_CACHE = None


def get_cache() -> 'Cache':
    """Returns the shared categories cache object, creating it on first use.

    The object is obtained from the `sitecats` application config
    and then kept in a module-level variable for subsequent calls.

    """
    global _SITECATS_CACHE

    if _SITECATS_CACHE is not None:
        return _SITECATS_CACHE

    app_config = apps.get_app_config('sitecats')
    _SITECATS_CACHE = app_config.get_categories_cache()

    return _SITECATS_CACHE


class Cache:
    """Categories lookup structure backed by Django cache.

    All categories are stored under a single Django cache entry
    (`CACHE_ENTRY_NAME`) as a dict of three maps:

    * `CACHE_NAME_IDS` - category ID -> category object;
    * `CACHE_NAME_ALIASES` - category alias -> category object (only categories having an alias);
    * `CACHE_NAME_PARENTS` - parent alias -> list of child category IDs
      in `sort_order` order (`None` key is used when no parent is found).

    The entry is dropped whenever a Category model object is saved or deleted.

    """

    # Sitecats objects are stored in Django cache for a year (60 * 60 * 24 * 365 = 31536000 sec).
    # Cache is only invalidated on sitecats Category model save/delete.
    CACHE_TIMEOUT: int = 31536000
    CACHE_ENTRY_NAME: str = 'sitecats'

    CACHE_NAME_IDS: str = 'ids'
    CACHE_NAME_ALIASES: str = 'aliases'
    CACHE_NAME_PARENTS: str = 'parents'

    # Marker for `_cache_get_entry()` meaning "return the whole map, not a single item".
    ENTIRE_ENTRY_KEY = tuple()

    def __init__(self):
        self._cache = None
        # Listen for signals from the models.
        category_model = get_category_model()
        signals.post_save.connect(self._cache_empty, sender=category_model)
        signals.post_delete.connect(self._cache_empty, sender=category_model)

    def _cache_init(self):
        """Loads categories data from Django cache into the local cache,
        rebuilding it from the database if Django cache has no entry.

        Note: Django cache is consulted on every call.

        """
        cache_ = cache.get(self.CACHE_ENTRY_NAME)

        if cache_ is None:
            categories = get_category_model().objects.order_by('sort_order')

            ids = {category.id: category for category in categories}
            aliases = {category.alias: category for category in categories if category.alias}

            parent_to_children = {}

            for category in categories:
                parent_category = ids.get(category.parent_id)
                # Children are grouped by parent alias; `None` stands for "no parent found".
                parent_alias = parent_category.alias if parent_category else None
                parent_to_children.setdefault(parent_alias, []).append(category.id)

            cache_ = {
                self.CACHE_NAME_IDS: ids,
                self.CACHE_NAME_PARENTS: parent_to_children,
                self.CACHE_NAME_ALIASES: aliases
            }

            cache.set(self.CACHE_ENTRY_NAME, cache_, self.CACHE_TIMEOUT)

        self._cache = cache_

    def _cache_empty(self, **kwargs):
        """Empties cached sitecats data (both local and Django cache).

        Connected as a receiver to Category model save/delete signals.

        :param kwargs: Signal arguments (unused).

        """
        self._cache = None
        cache.delete(self.CACHE_ENTRY_NAME)

    def _cache_get_entry(
            self,
            entry_name: str,
            key: Union[str, int, None] = ENTIRE_ENTRY_KEY,
            default: Any = False

    ) -> Any:
        """Returns cache entry parameter value by its name.

        Expects the local cache to be already initialized with `_cache_init()`.

        :param entry_name: One of `CACHE_NAME_*` map names.
        :param key: Key to look up in the map. If omitted the whole map is returned.
        :param default: Value to return if the key is not in the map.

        """
        entry = self._cache[entry_name]
        if key is self.ENTIRE_ENTRY_KEY:
            return entry
        return entry.get(key, default)

    def sort_aliases(self, aliases: List[str]) -> List[str]:
        """Sorts the given aliases list, returns a sorted list.

        The result follows the order of parent aliases in the cache
        and contains only those of the given aliases which are known
        as parents there.

        :param aliases:

        """
        if not aliases:
            return aliases

        self._cache_init()
        wanted = set(aliases)
        parent_aliases = self._cache_get_entry(self.CACHE_NAME_PARENTS)
        return [parent_alias for parent_alias in parent_aliases if parent_alias in wanted]

    def get_parents_for(self, child_ids: List[int]) -> Set[Optional[str]]:
        """Returns parent aliases for a list of child IDs.

        :param child_ids:

        """
        self._cache_init()
        wanted = set(child_ids)
        parents = self._cache_get_entry(self.CACHE_NAME_PARENTS)
        return {
            parent for parent, children in parents.items()
            if not wanted.isdisjoint(children)
        }

    def get_children_for(
            self,
            parent_alias: Optional[str] = None,
            only_with_aliases: bool = False
    ) -> List['CategoryBase']:
        """Returns a list with categories under the given parent.

        :param parent_alias: Parent category alias or None for categories under root
        :param only_with_aliases: Flag to return only children with aliases

        """
        # Initialize once and read local maps directly to spare a Django cache hit per child.
        self._cache_init()
        categories_by_id = self._cache_get_entry(self.CACHE_NAME_IDS)
        child_ids = self._cache_get_entry(self.CACHE_NAME_PARENTS, parent_alias, [])

        children = [categories_by_id.get(cid) for cid in child_ids]

        if only_with_aliases:
            return [category for category in children if category.alias]

        return children

    def get_child_ids(self, parent_alias: Optional[str]) -> List[int]:
        """Returns child IDs of the given parent category.

        An empty list is returned for an unknown parent alias.

        :param parent_alias: Parent category alias

        """
        self._cache_init()
        return self._cache_get_entry(self.CACHE_NAME_PARENTS, parent_alias, [])

    def get_category_by_alias(self, alias: str) -> Optional['CategoryBase']:
        """Returns Category object by its alias, or None if not found.

        :param alias:

        """
        self._cache_init()
        return self._cache_get_entry(self.CACHE_NAME_ALIASES, alias, None)

    def get_category_by_id(self, cid: int) -> Optional['CategoryBase']:
        """Returns Category object by its id, or None if not found.

        :param cid:

        """
        self._cache_init()
        return self._cache_get_entry(self.CACHE_NAME_IDS, cid, None)

    def find_category(self, parent_alias: Optional[str], title: str) -> Optional['CategoryBase']:
        """Searches parent category children for the given title (case independent).

        Returns the first matching category or None.

        :param parent_alias:
        :param title:

        """
        self._cache_init()
        categories_by_id = self._cache_get_entry(self.CACHE_NAME_IDS)

        for cid in self._cache_get_entry(self.CACHE_NAME_PARENTS, parent_alias, []):
            category = categories_by_id.get(cid)
            if category and category.title.lower() == title.lower():
                return category

        return None

    def get_ties_stats(
            self,
            categories: List[int],
            target_model: Optional[Union['Model', Type['Model']]] = None
    ) -> Dict[int, int]:
        """Returns a dict with categories popularity stats:
        category ID -> number of ties.

        Categories without ties are not included.

        :param categories: Category IDs to gather stats for.
        :param target_model: Model class to count only ties with objects of that model,
            or a model instance to count only ties with that very object.

        """
        if not categories:
            # Nothing can match an empty ID list, so skip the queries altogether.
            return {}

        filter_kwargs = {
            'category_id__in': categories
        }

        if target_model is not None:
            is_cls = hasattr(target_model, '__name__')

            if not is_cls:
                filter_kwargs['object_id'] = target_model.id

            # A class is resolved with `for_concrete_model=False`, an instance with `True`.
            filter_kwargs['content_type'] = ContentType.objects.get_for_model(
                target_model, for_concrete_model=not is_cls
            )

        stats = get_tie_model().objects.filter(
            **filter_kwargs).values('category_id').annotate(ties_num=Count('category'))

        return {item['category_id']: item['ties_num'] for item in stats}

    def get_categories(
            self,
            parent_aliases: Optional[Union[str, List[str]]] = None,
            target_object: 'ModelWithCategory' = None,
            tied_only: bool = True
    ) -> Union[List['CategoryBase'], Dict[Optional[str], List['CategoryBase']]]:
        """Returns subcategories (or ties if `target_object` is set)
        for the given parent category.

        If a single alias (or None) is given a list of categories is returned
        (empty if there is nothing to return). If a list of aliases is given,
        a dict is returned: parent alias -> list of categories; parents having
        no categories to return are omitted from it.

        Categories order follows the order of child IDs in the cache.

        :param parent_aliases: Parent alias, list of aliases, or None for root.
        :param target_object: Passed to `get_ties_stats()` to narrow ties considered.
        :param tied_only: Flag to get only categories with ties. Ties stats are stored in `ties_num` attrs.

        """
        single_mode = not isinstance(parent_aliases, list)
        if single_mode:
            parent_aliases = [parent_aliases]

        # Initialize once and read local maps directly to spare a Django cache hit per category.
        self._cache_init()
        categories_by_id = self._cache_get_entry(self.CACHE_NAME_IDS)

        parents_to_children = {
            parent_alias: self._cache_get_entry(self.CACHE_NAME_PARENTS, parent_alias, [])
            for parent_alias in parent_aliases
        }

        ties = {}
        if tied_only:
            all_children = [
                cid for child_ids in parents_to_children.values() for cid in child_ids
            ]
            ties = self.get_ties_stats(all_children, target_object)

        categories = {}

        for parent_alias, child_ids in parents_to_children.items():

            if tied_only:
                # Filter the ordered list (instead of a set intersection) to keep sort order.
                child_ids = [cid for cid in child_ids if cid in ties]

            if not child_ids:
                continue

            children = []

            for cid in child_ids:
                category = categories_by_id.get(cid)

                if tied_only:
                    category.ties_num = ties[cid]

                children.append(category)

            categories[parent_alias] = children

        if single_mode:
            # A parent with nothing to return yields an empty list rather than KeyError.
            return categories.get(parent_aliases[0], [])

        return categories