collerek/ormar

View on GitHub
ormar/relations/relation.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Generic,
    List,
    Optional,
    Set,
    Type,
    TypeVar,
    Union,
    cast,
)

import ormar  # noqa I100
from ormar.exceptions import RelationshipInstanceError  # noqa I100
from ormar.relations.relation_proxy import RelationProxy

if TYPE_CHECKING:  # pragma no cover
    from ormar.models import Model, NewBaseModel, T
    from ormar.relations import RelationsManager
else:
    T = TypeVar("T", bound="Model")


class RelationType(Enum):
    """
    Different types of relations supported by ormar:

    *  ForeignKey = PRIMARY
    *  reverse ForeignKey = REVERSE
    *  ManyToMany = MULTIPLE
    """

    PRIMARY = 1
    REVERSE = 2
    MULTIPLE = 3
    THROUGH = 4


class Relation(Generic[T]):
    """
    Keeps related Models and handles adding/removing of the children.
    """

    def __init__(
        self,
        manager: "RelationsManager",
        type_: RelationType,
        field_name: str,
        to: Type["T"],
        through: Optional[Type["Model"]] = None,
    ) -> None:
        """
        Initialize the Relation and keep the related models either as instances of
        passed Model, or as a RelationProxy which is basically a list of models with
        some special behavior, as it exposes QuerySetProxy and allows querying the
        related models already pre filtered by parent model.

        :param manager: reference to relation manager
        :type manager: RelationsManager
        :param type_: type of the relation
        :type type_: RelationType
        :param field_name: name of the relation field
        :type field_name: str
        :param to: model to which relation leads to
        :type to: Type[Model]
        :param through: model through which relation goes for m2m relations
        :type through: Type[Model]
        """
        self.manager = manager
        self._owner: "Model" = manager.owner
        self._type: RelationType = type_
        self._to_remove: Set = set()
        self.to: Type["T"] = to
        self._through = through
        self.field_name: str = field_name
        self.related_models: Optional[Union[RelationProxy, "Model"]] = (
            RelationProxy(relation=self, type_=type_, to=to, field_name=field_name)
            if type_ in (RelationType.REVERSE, RelationType.MULTIPLE)
            else None
        )

    def clear(self) -> None:
        if self._type in (RelationType.PRIMARY, RelationType.THROUGH):
            self.related_models = None
            self._owner.__dict__[self.field_name] = None
        elif self.related_models is not None:
            related_models = cast("RelationProxy", self.related_models)
            related_models._clear()
            self._owner.__dict__[self.field_name] = None

    @property
    def through(self) -> Type["Model"]:
        if not self._through:  # pragma: no cover
            raise RelationshipInstanceError("Relation does not have through model!")
        return self._through

    def _clean_related(self) -> None:
        """
        Removes dead weakrefs from RelationProxy.
        """
        cleaned_data = [
            x
            for i, x in enumerate(self.related_models)  # type: ignore
            if i not in self._to_remove
        ]
        self.related_models = RelationProxy(
            relation=self,
            type_=self._type,
            to=self.to,
            field_name=self.field_name,
            data_=cleaned_data,
        )
        relation_name = self.field_name
        self._owner.__dict__[relation_name] = cleaned_data
        self._to_remove = set()

    def _find_existing(
        self, child: Union["NewBaseModel", Type["NewBaseModel"]]
    ) -> Optional[int]:
        """
        Find child model in RelationProxy if exists.

        :param child: child model to find
        :type child: Model
        :return: index of child in RelationProxy
        :rtype: Optional[ind]
        """
        if not isinstance(self.related_models, RelationProxy):  # pragma nocover
            raise ValueError("Cannot find existing models in parent relation type")

        if child not in self.related_models:
            return None
        else:
            # We need to clear the weakrefs that don't point to anything anymore
            # There's an assumption here that if some of the related models
            # went out of scope, then they all did, so we can just check the first one
            try:
                self.related_models[0].__repr__.__self__
                return self.related_models.index(child)
            except ReferenceError:
                missing = self.related_models._get_list_of_missing_weakrefs()
                self._to_remove.update(missing)
            return self.related_models.index(child)

    def add(self, child: "Model") -> None:
        """
        Adds child Model to relation, either sets child as related model or adds
        it to the list in RelationProxy depending on relation type.

        :param child: model to add to relation
        :type child: Model
        """
        relation_name = self.field_name
        if self._type in (RelationType.PRIMARY, RelationType.THROUGH):
            self.related_models = child
            self._owner.__dict__[relation_name] = child
        else:
            if self._find_existing(child) is None:
                self.related_models.append(child)  # type: ignore
                rel = self._owner.__dict__.get(relation_name, [])
                rel = rel or []
                if not isinstance(rel, list):
                    rel = [rel]
                self._populate_owner_side_dict(rel=rel, child=child)
                self._owner.__dict__[relation_name] = rel

    def _populate_owner_side_dict(self, rel: List["Model"], child: "Model") -> None:
        try:
            if child not in rel:
                rel.append(child)
        except ReferenceError:
            rel.clear()
            rel.append(child)

    def remove(self, child: Union["NewBaseModel", Type["NewBaseModel"]]) -> None:
        """
        Removes child Model from relation, either sets None as related model or removes
        it from the list in RelationProxy depending on relation type.

        :param child: model to remove from relation
        :type child: Model
        """
        relation_name = self.field_name
        if self._type == RelationType.PRIMARY:
            if self.related_models == child:
                self.related_models = None
                del self._owner.__dict__[relation_name]
        else:
            position = self._find_existing(child)
            if position is not None:
                self.related_models.pop(position)  # type: ignore
                del self._owner.__dict__[relation_name][position]

    def get(self) -> Optional[Union[List["Model"], "Model"]]:
        """
        Return the related model or models from RelationProxy.

        :return: related model/models if set
        :rtype: Optional[Union[List[Model], Model]]
        """
        if self._to_remove:
            self._clean_related()
        return self.related_models

    def __repr__(self) -> str:  # pragma no cover
        if self._to_remove:
            self._clean_related()
        return str(self.related_models)