treyhunner/django-simple-history

View on GitHub
simple_history/models.py

Summary

Maintainability
F
3 days
Test Coverage
import copy
import importlib
import uuid
import warnings
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Sequence, Type, Union

import django
from django.apps import apps
from django.conf import settings
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.core.exceptions import ImproperlyConfigured, ObjectDoesNotExist
from django.db import models
from django.db.models import ManyToManyField
from django.db.models.fields.proxy import OrderWrt
from django.db.models.fields.related import ForeignKey
from django.db.models.fields.related_descriptors import (
    ForwardManyToOneDescriptor,
    ReverseManyToOneDescriptor,
    create_reverse_many_to_one_manager,
)
from django.db.models.query import QuerySet
from django.db.models.signals import m2m_changed
from django.forms.models import model_to_dict
from django.urls import reverse
from django.utils import timezone
from django.utils.encoding import smart_str
from django.utils.functional import cached_property
from django.utils.text import format_lazy
from django.utils.translation import gettext_lazy as _

from . import exceptions, utils
from .manager import (
    SIMPLE_HISTORY_REVERSE_ATTR_NAME,
    HistoricalQuerySet,
    HistoryDescriptor,
    HistoryManager,
)
from .signals import (
    post_create_historical_m2m_records,
    post_create_historical_record,
    pre_create_historical_m2m_records,
    pre_create_historical_record,
)

try:
    from asgiref.local import Local as LocalContext
except ImportError:
    from threading import local as LocalContext

if TYPE_CHECKING:
    ModelTypeHint = models.Model
else:
    ModelTypeHint = object

registered_models = {}


def _default_get_user(request, **kwargs):
    try:
        return request.user
    except AttributeError:
        return None


def _history_user_getter(historical_instance):
    if historical_instance.history_user_id is None:
        return None
    User = get_user_model()
    try:
        return User.objects.get(pk=historical_instance.history_user_id)
    except User.DoesNotExist:
        return None


def _history_user_setter(historical_instance, user):
    if user is not None:
        historical_instance.history_user_id = user.pk


class HistoricalRecords:
    DEFAULT_MODEL_NAME_PREFIX = "Historical"

    thread = context = LocalContext()  # retain thread for backwards compatibility
    m2m_models = {}

    def __init__(
        self,
        verbose_name=None,
        verbose_name_plural=None,
        bases=(models.Model,),
        user_related_name="+",
        table_name=None,
        inherit=False,
        excluded_fields=None,
        history_id_field=None,
        history_change_reason_field=None,
        user_model=None,
        get_user=_default_get_user,
        cascade_delete_history=False,
        custom_model_name=None,
        app=None,
        history_user_id_field=None,
        history_user_getter=_history_user_getter,
        history_user_setter=_history_user_setter,
        related_name=None,
        use_base_model_db=False,
        user_db_constraint=True,
        no_db_index=list(),
        excluded_field_kwargs=None,
        history_manager=HistoryManager,
        historical_queryset=HistoricalQuerySet,
        m2m_fields=(),
        m2m_fields_model_field_name="_history_m2m_fields",
        m2m_bases=(models.Model,),
    ):
        self.user_set_verbose_name = verbose_name
        self.user_set_verbose_name_plural = verbose_name_plural
        self.user_related_name = user_related_name
        self.user_db_constraint = user_db_constraint
        self.table_name = table_name
        self.inherit = inherit
        self.history_id_field = history_id_field
        self.history_change_reason_field = history_change_reason_field
        self.user_model = user_model
        self.get_user = get_user
        self.cascade_delete_history = cascade_delete_history
        self.custom_model_name = custom_model_name
        self.app = app
        self.user_id_field = history_user_id_field
        self.user_getter = history_user_getter
        self.user_setter = history_user_setter
        self.related_name = related_name
        self.use_base_model_db = use_base_model_db
        self.history_manager = history_manager
        self.historical_queryset = historical_queryset
        self.m2m_fields = m2m_fields
        self.m2m_fields_model_field_name = m2m_fields_model_field_name

        if isinstance(no_db_index, str):
            no_db_index = [no_db_index]
        self.no_db_index = no_db_index

        if excluded_fields is None:
            excluded_fields = []
        self.excluded_fields = excluded_fields

        if excluded_field_kwargs is None:
            excluded_field_kwargs = {}
        self.excluded_field_kwargs = excluded_field_kwargs
        try:
            if isinstance(bases, str):
                raise TypeError
            self.bases = (HistoricalChanges,) + tuple(bases)
        except TypeError:
            raise TypeError("The `bases` option must be a list or a tuple.")
        try:
            if isinstance(m2m_bases, str):
                raise TypeError
            self.m2m_bases = (HistoricalChanges,) + tuple(m2m_bases)
        except TypeError:
            raise TypeError("The `m2m_bases` option must be a list or a tuple.")

    def contribute_to_class(self, cls, name):
        self.manager_name = name
        self.module = cls.__module__
        self.cls = cls
        models.signals.class_prepared.connect(self.finalize, weak=False)
        self.add_extra_methods(cls)

        if cls._meta.abstract and not self.inherit:
            msg = (
                "HistoricalRecords added to abstract model ({}) without "
                "inherit=True".format(self.cls.__name__)
            )
            warnings.warn(msg, UserWarning)

    def add_extra_methods(self, cls):
        def save_without_historical_record(self, *args, **kwargs):
            """
            Save the model instance without creating a historical record.

            Make sure you know what you're doing before using this method.
            """
            self.skip_history_when_saving = True
            try:
                ret = self.save(*args, **kwargs)
            finally:
                del self.skip_history_when_saving
            return ret

        setattr(cls, "save_without_historical_record", save_without_historical_record)

    def finalize(self, sender, **kwargs):
        inherited = False
        if self.cls is not sender:  # set in concrete
            inherited = self.inherit and issubclass(sender, self.cls)
            if not inherited:
                return  # set in abstract

        if hasattr(sender._meta, "simple_history_manager_attribute"):
            raise exceptions.MultipleRegistrationsError(
                "{}.{} registered multiple times for history tracking.".format(
                    sender._meta.app_label, sender._meta.object_name
                )
            )
        history_model = self.create_history_model(sender, inherited)

        if inherited:
            # Make sure history model is in same module as concrete model
            module = importlib.import_module(history_model.__module__)
        else:
            module = importlib.import_module(self.module)
        setattr(module, history_model.__name__, history_model)

        # The HistoricalRecords object will be discarded,
        # so the signal handlers can't use weak references.
        models.signals.post_save.connect(self.post_save, sender=sender, weak=False)
        models.signals.post_delete.connect(self.post_delete, sender=sender, weak=False)

        m2m_fields = self.get_m2m_fields_from_model(sender)

        for field in m2m_fields:
            m2m_changed.connect(
                partial(self.m2m_changed, attr=field.name),
                sender=field.remote_field.through,
                weak=False,
            )

        descriptor = HistoryDescriptor(
            history_model,
            manager=self.history_manager,
            queryset=self.historical_queryset,
        )
        setattr(sender, self.manager_name, descriptor)
        sender._meta.simple_history_manager_attribute = self.manager_name

        for field in m2m_fields:
            m2m_model = self.create_history_m2m_model(
                history_model, field.remote_field.through
            )
            self.m2m_models[field] = m2m_model

            setattr(module, m2m_model.__name__, m2m_model)

            m2m_descriptor = HistoryDescriptor(m2m_model)
            setattr(history_model, field.name, m2m_descriptor)

    def get_history_model_name(self, model):
        if not self.custom_model_name:
            return f"{self.DEFAULT_MODEL_NAME_PREFIX}{model._meta.object_name}"
        # Must be trying to use a custom history model name
        if callable(self.custom_model_name):
            name = self.custom_model_name(model._meta.object_name)
        else:
            #  simple string
            name = self.custom_model_name
        # Desired class name cannot be same as the model it is tracking
        if not (
            name.lower() == model._meta.object_name.lower()
            and model.__module__ == self.module
        ):
            return name
        raise ValueError(
            "The 'custom_model_name' option '{}' evaluates to a name that is the same "
            "as the model it is tracking. This is not permitted.".format(
                self.custom_model_name
            )
        )

    def create_history_m2m_model(self, model, through_model):
        attrs = {}

        fields = self.copy_fields(through_model)
        attrs.update(fields)
        attrs.update(self.get_extra_fields_m2m(model, through_model, fields))

        name = self.get_history_model_name(through_model)
        registered_models[through_model._meta.db_table] = through_model

        attrs.update(Meta=type("Meta", (), self.get_meta_options_m2m(through_model)))

        m2m_history_model = type(str(name), self.m2m_bases, attrs)

        return m2m_history_model

    def create_history_model(self, model, inherited):
        """
        Creates a historical model to associate with the model provided.
        """
        attrs = {
            "__module__": self.module,
            "_history_excluded_fields": self.excluded_fields,
            "_history_m2m_fields": self.get_m2m_fields_from_model(model),
            "tracked_fields": self.fields_included(model),
        }

        app_module = "%s.models" % model._meta.app_label

        if inherited:
            # inherited use models module
            attrs["__module__"] = model.__module__
        elif model.__module__ != self.module:
            # registered under different app
            attrs["__module__"] = self.module
        elif app_module != self.module:
            # Abuse an internal API because the app registry is loading.
            app = apps.app_configs[model._meta.app_label]
            models_module = app.name
            attrs["__module__"] = models_module

        fields = self.copy_fields(model)
        attrs.update(fields)
        attrs.update(self.get_extra_fields(model, fields))
        # type in python2 wants str as a first argument
        attrs.update(Meta=type("Meta", (), self.get_meta_options(model)))
        if not inherited and self.table_name is not None:
            attrs["Meta"].db_table = self.table_name

        # Set as the default then check for overrides
        name = self.get_history_model_name(model)

        registered_models[model._meta.db_table] = model
        history_model = type(str(name), self.bases, attrs)
        return history_model

    def fields_included(self, model):
        fields = []
        for field in model._meta.fields:
            if field.name not in self.excluded_fields:
                fields.append(field)
        return fields

    def field_excluded_kwargs(self, field):
        """
        Find the excluded kwargs for a given field.
        """
        return self.excluded_field_kwargs.get(field.name, set())

    def copy_fields(self, model):
        """
        Creates copies of the model's original fields, returning
        a dictionary mapping field name to copied field object.
        """
        fields = {}
        for field in self.fields_included(model):
            field = copy.copy(field)
            field.remote_field = copy.copy(field.remote_field)
            if isinstance(field, OrderWrt):
                # OrderWrt is a proxy field, switch to a plain IntegerField
                field.__class__ = models.IntegerField
            if isinstance(field, models.ForeignKey):
                old_field = field
                old_swappable = old_field.swappable
                old_field.swappable = False
                try:
                    _name, _path, args, field_args = old_field.deconstruct()
                finally:
                    old_field.swappable = old_swappable
                if getattr(old_field, "one_to_one", False) or isinstance(
                    old_field, models.OneToOneField
                ):
                    FieldType = models.ForeignKey
                else:
                    FieldType = type(old_field)

                # Remove any excluded kwargs for the field.
                # This is useful when a custom OneToOneField is being used that
                # has a different set of arguments than ForeignKey
                for exclude_arg in self.field_excluded_kwargs(old_field):
                    field_args.pop(exclude_arg, None)

                # If field_args['to'] is 'self' then we have a case where the object
                # has a foreign key to itself. If we pass the historical record's
                # field to = 'self', the foreign key will point to an historical
                # record rather than the base record. We can use old_field.model here.
                if field_args.get("to", None) == "self":
                    field_args["to"] = old_field.model

                # Override certain arguments passed when creating the field
                # so that they work for the historical field.
                field_args.update(
                    db_constraint=False,
                    related_name="+",
                    null=True,
                    blank=True,
                    primary_key=False,
                    db_index=True,
                    serialize=True,
                    unique=False,
                    on_delete=models.DO_NOTHING,
                )
                field = FieldType(*args, **field_args)
                field.name = old_field.name
            else:
                transform_field(field)

            # drop db index
            if field.name in self.no_db_index:
                field.db_index = False

            fields[field.name] = field
        return fields

    def _get_history_change_reason_field(self):
        if self.history_change_reason_field:
            # User specific field from init
            history_change_reason_field = self.history_change_reason_field
        elif getattr(
            settings, "SIMPLE_HISTORY_HISTORY_CHANGE_REASON_USE_TEXT_FIELD", False
        ):
            # Use text field with no max length, not enforced by DB anyways
            history_change_reason_field = models.TextField(null=True)
        else:
            # Current default, with max length
            history_change_reason_field = models.CharField(max_length=100, null=True)

        return history_change_reason_field

    def _get_history_id_field(self):
        if self.history_id_field:
            history_id_field = self.history_id_field.clone()
            history_id_field.primary_key = True
            history_id_field.editable = False
        elif getattr(settings, "SIMPLE_HISTORY_HISTORY_ID_USE_UUID", False):
            history_id_field = models.UUIDField(
                primary_key=True, default=uuid.uuid4, editable=False
            )
        else:
            history_id_field = models.AutoField(primary_key=True)

        return history_id_field

    def _get_history_user_fields(self):
        if self.user_id_field is not None:
            # Tracking user using explicit id rather than Django ForeignKey
            history_user_fields = {
                "history_user": property(self.user_getter, self.user_setter),
                "history_user_id": self.user_id_field,
            }
        else:
            user_model = self.user_model or getattr(
                settings, "AUTH_USER_MODEL", "auth.User"
            )

            history_user_fields = {
                "history_user": models.ForeignKey(
                    user_model,
                    null=True,
                    related_name=self.user_related_name,
                    on_delete=models.SET_NULL,
                    db_constraint=self.user_db_constraint,
                )
            }

        return history_user_fields

    def _get_history_related_field(self, model):
        if self.related_name:
            if self.manager_name == self.related_name:
                raise exceptions.RelatedNameConflictError(
                    "The related name must not be called like the history manager."
                )
            return {
                "history_relation": models.ForeignKey(
                    model,
                    on_delete=models.DO_NOTHING,
                    related_name=self.related_name,
                    db_constraint=False,
                )
            }
        else:
            return {}

    def get_extra_fields_m2m(self, model, through_model, fields):
        """Return dict of extra fields added to the m2m historical record model"""

        extra_fields = {
            "__module__": model.__module__,
            "__str__": lambda self: "{} as of {}".format(
                self._meta.verbose_name, self.history.history_date
            ),
            "history": models.ForeignKey(
                model,
                db_constraint=False,
                on_delete=models.DO_NOTHING,
            ),
            "instance_type": through_model,
            "m2m_history_id": self._get_history_id_field(),
        }

        return extra_fields

    def get_extra_fields(self, model, fields):
        """Return dict of extra fields added to the historical record model"""

        def revert_url(self):
            """URL for this change in the default admin site."""
            opts = model._meta
            app_label, model_name = opts.app_label, opts.model_name
            return reverse(
                f"{admin.site.name}:{app_label}_{model_name}_simple_history",
                args=[getattr(self, opts.pk.attname), self.history_id],
            )

        def get_instance(self):
            attrs = {
                field.attname: getattr(self, field.attname) for field in fields.values()
            }
            if self._history_excluded_fields:
                # We don't add ManyToManyFields to this list because they may cause
                # the subsequent `.get()` call to fail. See #706 for context.
                excluded_attnames = [
                    model._meta.get_field(field).attname
                    for field in self._history_excluded_fields
                    if not isinstance(model._meta.get_field(field), ManyToManyField)
                ]
                try:
                    values = (
                        model.objects.filter(pk=getattr(self, model._meta.pk.attname))
                        .values(*excluded_attnames)
                        .get()
                    )
                except ObjectDoesNotExist:
                    pass
                else:
                    attrs.update(values)
            result = model(**attrs)
            # this is the only way external code could know an instance is historical
            setattr(result, SIMPLE_HISTORY_REVERSE_ATTR_NAME, self)
            return result

        def get_next_record(self):
            """
            Get the next history record for the instance. `None` if last.
            """
            history = utils.get_history_manager_from_history(self)
            return (
                history.filter(history_date__gt=self.history_date)
                .order_by("history_date")
                .first()
            )

        def get_prev_record(self):
            """
            Get the previous history record for the instance. `None` if first.
            """
            history = utils.get_history_manager_from_history(self)
            return (
                history.filter(history_date__lt=self.history_date)
                .order_by("history_date")
                .last()
            )

        def get_default_history_user(instance):
            """
            Returns the user specified by `get_user` method for manually creating
            historical objects
            """
            return self.get_history_user(instance)

        extra_fields = {
            "history_id": self._get_history_id_field(),
            "history_date": models.DateTimeField(db_index=self._date_indexing is True),
            "history_change_reason": self._get_history_change_reason_field(),
            "history_type": models.CharField(
                max_length=1,
                choices=(("+", _("Created")), ("~", _("Changed")), ("-", _("Deleted"))),
            ),
            "history_object": HistoricalObjectDescriptor(
                model, self.fields_included(model)
            ),
            "instance": property(get_instance),
            "instance_type": model,
            "next_record": property(get_next_record),
            "prev_record": property(get_prev_record),
            "revert_url": revert_url,
            "__str__": lambda self: "{} as of {}".format(
                self.history_object, self.history_date
            ),
            "get_default_history_user": staticmethod(get_default_history_user),
        }

        extra_fields.update(self._get_history_related_field(model))
        extra_fields.update(self._get_history_user_fields())

        return extra_fields

    @property
    def _date_indexing(self):
        """False, True, or 'composite'; default is True"""
        result = getattr(settings, "SIMPLE_HISTORY_DATE_INDEX", True)
        valid = True
        if isinstance(result, str):
            result = result.lower()
            if result not in ("composite",):
                valid = False
        elif not isinstance(result, bool):
            valid = False
        if not valid:
            raise ImproperlyConfigured(
                "SIMPLE_HISTORY_DATE_INDEX must be one of (False, True, 'Composite')"
            )
        return result

    def get_meta_options_m2m(self, through_model):
        """
        Returns a dictionary of fields that will be added to
        the Meta inner class of the m2m historical record model.
        """
        name = self.get_history_model_name(through_model)

        meta_fields = {"verbose_name": name}

        if self.app:
            meta_fields["app_label"] = self.app

        return meta_fields

    def get_meta_options(self, model):
        """
        Returns a dictionary of fields that will be added to
        the Meta inner class of the historical record model.
        """
        meta_fields = {
            "ordering": ("-history_date", "-history_id"),
            "get_latest_by": ("history_date", "history_id"),
        }
        if self.user_set_verbose_name:
            name = self.user_set_verbose_name
        else:
            name = format_lazy("historical {}", smart_str(model._meta.verbose_name))
        if self.user_set_verbose_name_plural:
            plural_name = self.user_set_verbose_name_plural
        else:
            plural_name = format_lazy(
                "historical {}", smart_str(model._meta.verbose_name_plural)
            )
        meta_fields["verbose_name"] = name
        meta_fields["verbose_name_plural"] = plural_name
        if self.app:
            meta_fields["app_label"] = self.app
        if self._date_indexing == "composite":
            meta_fields["indexes"] = (
                models.Index(fields=("history_date", model._meta.pk.attname)),
            )
        return meta_fields

    def post_save(self, instance, created, using=None, **kwargs):
        if not getattr(settings, "SIMPLE_HISTORY_ENABLED", True):
            return
        if hasattr(instance, "skip_history_when_saving"):
            return

        if not kwargs.get("raw", False):
            self.create_historical_record(instance, created and "+" or "~", using=using)

    def post_delete(self, instance, using=None, **kwargs):
        if not getattr(settings, "SIMPLE_HISTORY_ENABLED", True):
            return
        if self.cascade_delete_history:
            manager = getattr(instance, self.manager_name)
            manager.using(using).all().delete()
        else:
            self.create_historical_record(instance, "-", using=using)

    def get_change_reason_for_object(self, instance, history_type, using):
        """
        Get change reason for object.
        Customize this method to automatically fill change reason from context.
        """
        return utils.get_change_reason_from_object(instance)

    def m2m_changed(self, instance, action, attr, pk_set, reverse, **_):
        if not getattr(settings, "SIMPLE_HISTORY_ENABLED", True):
            return
        if hasattr(instance, "skip_history_when_saving"):
            return

        if action in ("post_add", "post_remove", "post_clear"):
            # It should be safe to ~ this since the row must exist to modify m2m on it
            self.create_historical_record(instance, "~")

    def create_historical_record_m2ms(self, history_instance, instance):
        for field in history_instance._history_m2m_fields:
            m2m_history_model = self.m2m_models[field]
            original_instance = history_instance.instance
            through_model = getattr(original_instance, field.name).through
            through_model_field_names = [f.name for f in through_model._meta.fields]
            through_model_fk_field_names = [
                f.name for f in through_model._meta.fields if isinstance(f, ForeignKey)
            ]

            insert_rows = []

            through_field_name = utils.get_m2m_field_name(field)
            rows = through_model.objects.filter(**{through_field_name: instance})
            rows = rows.select_related(*through_model_fk_field_names)
            for row in rows:
                insert_row = {"history": history_instance}

                for field_name in through_model_field_names:
                    insert_row[field_name] = getattr(row, field_name)
                insert_rows.append(m2m_history_model(**insert_row))

            pre_create_historical_m2m_records.send(
                sender=m2m_history_model,
                rows=insert_rows,
                history_instance=history_instance,
                instance=instance,
                field=field,
            )
            created_rows = m2m_history_model.objects.bulk_create(insert_rows)
            post_create_historical_m2m_records.send(
                sender=m2m_history_model,
                created_rows=created_rows,
                history_instance=history_instance,
                instance=instance,
                field=field,
            )

    def create_historical_record(self, instance, history_type, using=None):
        using = using if self.use_base_model_db else None
        history_date = getattr(instance, "_history_date", timezone.now())
        history_user = self.get_history_user(instance)
        history_change_reason = self.get_change_reason_for_object(
            instance, history_type, using
        )
        manager = getattr(instance, self.manager_name)

        attrs = {}
        for field in self.fields_included(instance):
            attrs[field.attname] = getattr(instance, field.attname)

        relation_field = getattr(manager.model, "history_relation", None)
        if relation_field is not None:
            attrs["history_relation"] = instance

        history_instance = manager.model(
            history_date=history_date,
            history_type=history_type,
            history_user=history_user,
            history_change_reason=history_change_reason,
            **attrs,
        )

        pre_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            history_instance=history_instance,
            using=using,
        )

        history_instance.save(using=using)
        self.create_historical_record_m2ms(history_instance, instance)

        post_create_historical_record.send(
            sender=manager.model,
            instance=instance,
            history_instance=history_instance,
            history_date=history_date,
            history_user=history_user,
            history_change_reason=history_change_reason,
            using=using,
        )

    def get_history_user(self, instance):
        """Get the modifying user from instance or middleware."""
        try:
            return instance._history_user
        except AttributeError:
            request = None
            try:
                if self.context.request.user.is_authenticated:
                    request = self.context.request
            except AttributeError:
                pass

        return self.get_user(instance=instance, request=request)

    def get_m2m_fields_from_model(self, model):
        m2m_fields = set(self.m2m_fields)
        try:
            m2m_fields.update(getattr(model, self.m2m_fields_model_field_name))
        except AttributeError:
            pass
        field_names = [
            field if isinstance(field, str) else field.name for field in m2m_fields
        ]
        return [getattr(model, field_name).field for field_name in field_names]


def transform_field(field):
    """Customize field appropriately for use in historical model"""
    field.name = field.attname
    if isinstance(field, models.BigAutoField):
        field.__class__ = models.BigIntegerField
    elif isinstance(field, models.AutoField):
        field.__class__ = models.IntegerField

    elif isinstance(field, models.FileField):
        # Don't copy file, just path.
        if getattr(settings, "SIMPLE_HISTORY_FILEFIELD_TO_CHARFIELD", False):
            field.__class__ = models.CharField
        else:
            field.__class__ = models.TextField

    # Historical instance shouldn't change create/update timestamps
    field.auto_now = False
    field.auto_now_add = False
    # Just setting db_collation explicitly since we're not using
    # field.deconstruct() here
    field.db_collation = None

    if field.primary_key or field.unique:
        # Unique fields can no longer be guaranteed unique,
        # but they should still be indexed for faster lookups.
        field.primary_key = False
        # DEV: Remove this check (but keep the contents) when the minimum required
        #      Django version is 5.1
        if django.VERSION >= (5, 1):
            field.unique = False
        # (Django < 5.1) Can't set `unique` as it's a property, so set the backing field
        # (Django >= 5.1) Set the backing field in addition to the cached property
        #                 above, to cover all bases
        field._unique = False
        field.db_index = True
        field.serialize = True


class HistoricForwardManyToOneDescriptor(ForwardManyToOneDescriptor):
    """
    Overrides get_queryset to provide historic query support, should the
    instance be historic (and therefore was generated by a timepoint query)
    and the other side of the relation also uses a history manager.
    """

    def get_queryset(self, **hints) -> QuerySet:
        instance = hints.get("instance")
        if instance:
            history = getattr(instance, SIMPLE_HISTORY_REVERSE_ATTR_NAME, None)
            histmgr = getattr(
                self.field.remote_field.model,
                getattr(
                    self.field.remote_field.model._meta,
                    "simple_history_manager_attribute",
                    "_notthere",
                ),
                None,
            )
            if history and histmgr:
                return histmgr.as_of(getattr(history, "_as_of", history.history_date))
        return super().get_queryset(**hints)


class HistoricReverseManyToOneDescriptor(ReverseManyToOneDescriptor):
    """
    Overrides get_queryset to provide historic query support, should the
    instance be historic (and therefore was generated by a timepoint query)
    and the other side of the relation also uses a history manager.
    """

    @cached_property
    def related_manager_cls(self):
        related_model = self.rel.related_model

        class HistoricRelationModelManager(related_model._default_manager.__class__):
            def get_queryset(self):
                try:
                    return self.instance._prefetched_objects_cache[
                        self.field.remote_field.get_cache_name()
                    ]
                except (AttributeError, KeyError):
                    history = getattr(
                        self.instance, SIMPLE_HISTORY_REVERSE_ATTR_NAME, None
                    )
                    histmgr = getattr(
                        self.model,
                        getattr(
                            self.model._meta,
                            "simple_history_manager_attribute",
                            "_notthere",
                        ),
                        None,
                    )
                    if history and histmgr:
                        queryset = histmgr.as_of(
                            getattr(history, "_as_of", history.history_date)
                        )
                    else:
                        queryset = super().get_queryset()
                    return self._apply_rel_filters(queryset)

        return create_reverse_many_to_one_manager(
            HistoricRelationModelManager, self.rel
        )


class HistoricForeignKey(ForeignKey):
    """
    Allows foreign keys to work properly from a historic instance.

    If you use as_of queries to extract historical instances from
    a model, and you have other models that are related by foreign
    key and also historic, changing them to a HistoricForeignKey
    field type will allow you to naturally cross the relationship
    boundary at the same point in time as the origin instance.

    A historic instance maintains an attribute ("_historic") when
    it is historic, holding the historic record instance and the
    timepoint used to query it ("_as_of").  HistoricForeignKey
    looks for this and uses an as_of query against the related
    object so the relationship is assessed at the same timepoint.
    """

    forward_related_accessor_class = HistoricForwardManyToOneDescriptor
    related_accessor_class = HistoricReverseManyToOneDescriptor


def is_historic(instance):
    """
    Returns True if the instance was acquired with an as_of timepoint.
    """
    return to_historic(instance) is not None


def to_historic(instance):
    """
    Returns a historic model instance if the instance was acquired with
    an as_of timepoint, or None.
    """
    return getattr(instance, SIMPLE_HISTORY_REVERSE_ATTR_NAME, None)


class HistoricalObjectDescriptor:
    def __init__(self, model, fields_included):
        self.model = model
        self.fields_included = fields_included

    def __get__(self, instance, owner):
        if instance is None:
            return self
        values = {f.attname: getattr(instance, f.attname) for f in self.fields_included}
        return self.model(**values)


class HistoricalChanges(ModelTypeHint):
    def diff_against(
        self,
        old_history: "HistoricalChanges",
        excluded_fields: Iterable[str] = None,
        included_fields: Iterable[str] = None,
        *,
        foreign_keys_are_objs=False,
    ) -> "ModelDelta":
        """
        :param old_history:
        :param excluded_fields: The names of fields to exclude from diffing.
               This takes precedence over ``included_fields``.
        :param included_fields: The names of the only fields to include when diffing.
               If not provided, all history-tracked fields will be included.
        :param foreign_keys_are_objs: If ``False``, the returned diff will only contain
               the raw PKs of any ``ForeignKey`` fields.
               If ``True``, the diff will contain the actual related model objects
               instead of just the PKs; deleted related objects will be instances of
               ``DeletedObject``.
               Note that passing ``True`` will necessarily query the database if the
               related objects have not been prefetched (using e.g.
               ``select_related()``).
        """
        if not isinstance(old_history, type(self)):
            raise TypeError(
                "unsupported type(s) for diffing:"
                f" '{type(self)}' and '{type(old_history)}'"
            )
        if excluded_fields is None:
            excluded_fields = set()

        included_m2m_fields = {field.name for field in old_history._history_m2m_fields}
        if included_fields is None:
            included_fields = {f.name for f in old_history.tracked_fields if f.editable}
        else:
            included_m2m_fields = included_m2m_fields.intersection(included_fields)

        fields = (
            set(included_fields)
            .difference(included_m2m_fields)
            .difference(excluded_fields)
        )
        m2m_fields = set(included_m2m_fields).difference(excluded_fields)

        changes = [
            *self._get_field_changes_for_diff(
                old_history, fields, foreign_keys_are_objs
            ),
            *self._get_m2m_field_changes_for_diff(
                old_history, m2m_fields, foreign_keys_are_objs
            ),
        ]
        # Sort by field (attribute) name, to ensure a consistent order
        changes.sort(key=lambda change: change.field)
        changed_fields = [change.field for change in changes]
        return ModelDelta(changes, changed_fields, old_history, self)

    def _get_field_changes_for_diff(
        self,
        old_history: "HistoricalChanges",
        fields: Iterable[str],
        foreign_keys_are_objs: bool,
    ) -> List["ModelChange"]:
        """Helper method for ``diff_against()``."""
        changes = []

        old_values = model_to_dict(old_history, fields=fields)
        new_values = model_to_dict(self, fields=fields)

        for field in fields:
            old_value = old_values[field]
            new_value = new_values[field]

            if old_value != new_value:
                field_meta = self._meta.get_field(field)
                if foreign_keys_are_objs and isinstance(field_meta, ForeignKey):
                    # Set the fields to their related model objects instead of
                    # the raw PKs from `model_to_dict()`
                    def get_value(record, foreign_key):
                        try:
                            value = getattr(record, field)
                        # `value` seems to be None (without raising this exception)
                        # if the object has not been refreshed from the database
                        except ObjectDoesNotExist:
                            value = None

                        if value is None:
                            value = DeletedObject(field_meta.related_model, foreign_key)
                        return value

                    old_value = get_value(old_history, old_value)
                    new_value = get_value(self, new_value)

                change = ModelChange(field, old_value, new_value)
                changes.append(change)

        return changes

    def _get_m2m_field_changes_for_diff(
        self,
        old_history: "HistoricalChanges",
        m2m_fields: Iterable[str],
        foreign_keys_are_objs: bool,
    ) -> List["ModelChange"]:
        """Helper method for ``diff_against()``."""
        changes = []

        for field in m2m_fields:
            original_field_meta = self.instance_type._meta.get_field(field)
            reverse_field_name = utils.get_m2m_reverse_field_name(original_field_meta)
            # Sort the M2M rows by the related object, to ensure a consistent order
            old_m2m_qs = getattr(old_history, field).order_by(reverse_field_name)
            new_m2m_qs = getattr(self, field).order_by(reverse_field_name)
            m2m_through_model_opts = new_m2m_qs.model._meta

            # Create a list of field names to compare against.
            # The list is generated without the PK of the intermediate (through)
            # table, the foreign key to the history record, and the actual `history`
            # field, to avoid false positives while diffing.
            through_model_fields = [
                f.name
                for f in m2m_through_model_opts.fields
                if f.editable and f.name not in ["id", "m2m_history_id", "history"]
            ]
            old_rows = list(old_m2m_qs.values(*through_model_fields))
            new_rows = list(new_m2m_qs.values(*through_model_fields))

            if old_rows != new_rows:
                if foreign_keys_are_objs:
                    fk_fields = [
                        f
                        for f in through_model_fields
                        if isinstance(m2m_through_model_opts.get_field(f), ForeignKey)
                    ]

                    # Set the through fields to their related model objects instead of
                    # the raw PKs from `values()`
                    def rows_with_foreign_key_objs(m2m_qs):
                        def get_value(obj, through_field):
                            try:
                                value = getattr(obj, through_field)
                            # If the related object has been deleted, `value` seems to
                            # usually already be None instead of raising this exception
                            except ObjectDoesNotExist:
                                value = None

                            if value is None:
                                meta = m2m_through_model_opts.get_field(through_field)
                                foreign_key = getattr(obj, meta.attname)
                                value = DeletedObject(meta.related_model, foreign_key)
                            return value

                        # Replicate the format of the return value of QuerySet.values()
                        return [
                            {
                                through_field: get_value(through_obj, through_field)
                                for through_field in through_model_fields
                            }
                            for through_obj in m2m_qs.select_related(*fk_fields)
                        ]

                    old_rows = rows_with_foreign_key_objs(old_m2m_qs)
                    new_rows = rows_with_foreign_key_objs(new_m2m_qs)

                change = ModelChange(field, old_rows, new_rows)
                changes.append(change)

        return changes


@dataclass(frozen=True)
class DeletedObject:
    model: Type[models.Model]
    pk: Any

    def __str__(self):
        deleted_model_str = _("Deleted %(type_name)s") % {
            "type_name": self.model._meta.verbose_name,
        }
        return f"{deleted_model_str} (pk={self.pk})"


# Either:
# - The value of a foreign key field:
#   - If ``foreign_keys_are_objs=True`` is passed to ``diff_against()``:
#     Either the related object or ``DeletedObject``.
#   - Otherwise:
#     The PK of the related object.
#
# - The value of a many-to-many field:
#   A list of dicts from the through model field names to either:
#   - If ``foreign_keys_are_objs=True`` is passed to ``diff_against()``:
#     Either the through model's related objects or ``DeletedObject``.
#   - Otherwise:
#     The PK of the through model's related objects.
#
# - Any of the other possible values of a model field.
ModelChangeValue = Union[Any, DeletedObject, List[Dict[str, Union[Any, DeletedObject]]]]


@dataclass(frozen=True)
class ModelChange:
    field: str
    old: ModelChangeValue
    new: ModelChangeValue


@dataclass(frozen=True)
class ModelDelta:
    changes: Sequence[ModelChange]
    changed_fields: Sequence[str]
    old_record: HistoricalChanges
    new_record: HistoricalChanges