r4fek/django-cassandra-engine

View on GitHub
django_cassandra_engine/models/django_model_methods.py

Summary

Maintainability
D
1 day
Test Coverage
from django.core.exceptions import NON_FIELD_ERRORS, FieldDoesNotExist, ValidationError

__ALL__ = (
    "_get_pk_val",
    "serializable_value",
    "full_clean",
    "_get_unique_checks",
    "_perform_unique_checks",
    "_perform_date_checks",
    "validate_unique",
    "clean",
    "clean_fields",
)


def _get_pk_val(self, meta=None):
    if not meta:
        meta = self._meta
    return getattr(self, meta.pk.attname)


def serializable_value(self, field_name):
    try:
        field = self._meta.get_field(field_name)
    except FieldDoesNotExist:
        return getattr(self, field_name)
    return getattr(self, field.attname)


def full_clean(self, exclude=None, validate_unique=True):
    # Taken from django.db.models.base
    errors = {}
    if exclude is None:
        exclude = []
    else:
        exclude = list(exclude)

    try:
        self.clean_fields(exclude=exclude)
    except ValidationError as e:
        errors = e.update_error_dict(errors)

    # Form.clean() is run even if other validation fails, so do the
    # same with Model.clean() for consistency.
    try:
        self.clean()
    except ValidationError as e:
        errors = e.update_error_dict(errors)

    # Run unique checks, but only for fields that passed validation.
    if validate_unique:
        for name in errors.keys():
            if name != NON_FIELD_ERRORS and name not in exclude:
                exclude.append(name)
        try:
            self.validate_unique(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)

    if errors:
        raise ValidationError(errors)


def _get_unique_checks(self, exclude=None):
    # Taken from django.db.models.base
    if exclude is None:
        exclude = []
    unique_checks = []

    unique_togethers = [(self.__class__, self._meta.unique_together)]
    for parent_class in self._meta.get_parent_list():
        if parent_class._meta.unique_together:
            unique_togethers.append((parent_class, parent_class._meta.unique_together))

    for model_class, unique_together in unique_togethers:
        for check in unique_together:
            for name in check:
                # If this is an excluded field, don't add this check.
                if name in exclude:
                    break
            else:
                unique_checks.append((model_class, tuple(check)))

    # These are checks for the unique_for_<date/year/month>.
    date_checks = []

    # Gather a list of checks for fields declared as unique and add them to
    # the list of checks.

    fields_with_class = [(self.__class__, self._meta.local_fields)]
    for parent_class in self._meta.get_parent_list():
        fields_with_class.append((parent_class, parent_class._meta.local_fields))

    for model_class, fields in fields_with_class:
        for f in fields:
            name = f.name
            if name in exclude:
                continue
            if f.unique:
                unique_checks.append((model_class, (name,)))
            if f.unique_for_date and f.unique_for_date not in exclude:
                date_checks.append((model_class, "date", name, f.unique_for_date))
            if f.unique_for_year and f.unique_for_year not in exclude:
                date_checks.append((model_class, "year", name, f.unique_for_year))
            if f.unique_for_month and f.unique_for_month not in exclude:
                date_checks.append((model_class, "month", name, f.unique_for_month))
    return unique_checks, date_checks


def _perform_unique_checks(self, unique_checks):
    # Taken from django.db.models.base
    errors = {}

    for model_class, unique_check in unique_checks:
        # Try to look up an existing object with the same values as this
        # object's values for all the unique field.

        lookup_kwargs = {}
        for field_name in unique_check:
            f = self._meta.get_field(field_name)
            lookup_value = getattr(self, f.attname)
            if lookup_value is None:
                # no value, skip the lookup
                continue
            if f.primary_key and not self._state.adding:
                # no need to check for unique primary key when editing
                continue
            lookup_kwargs[str(field_name)] = lookup_value

        # some fields were skipped, no reason to do the check
        if len(unique_check) != len(lookup_kwargs):
            continue

        qs = model_class._default_manager.filter(**lookup_kwargs)

        # Exclude the current object from the query if we are editing an
        # instance (as opposed to creating a new one)
        # Note that we need to use the pk as defined by model_class, not
        # self.pk. These can be different fields because model inheritance
        # allows single model to have effectively multiple primary keys.
        # Refs #17615.
        model_class_pk = self._get_pk_val(model_class._meta)
        if not self._state.adding and model_class_pk is not None:
            qs = qs.exclude(pk=model_class_pk)
        if qs.exists():
            if len(unique_check) == 1:
                key = unique_check[0]
            else:
                key = NON_FIELD_ERRORS
            errors.setdefault(key, []).append(
                self.unique_error_message(model_class, unique_check)
            )

    return errors


def _perform_date_checks(self, date_checks):
    # Taken from django.db.models.base
    errors = {}
    for model_class, lookup_type, field, unique_for in date_checks:
        lookup_kwargs = {}
        # there's a ticket to add a date lookup, we can remove this special
        # case if that makes it's way in
        date = getattr(self, unique_for)
        if date is None:
            continue
        if lookup_type == "date":
            lookup_kwargs["%s__day" % unique_for] = date.day
            lookup_kwargs["%s__month" % unique_for] = date.month
            lookup_kwargs["%s__year" % unique_for] = date.year
        else:
            lookup_kwargs["%s__%s" % (unique_for, lookup_type)] = getattr(
                date, lookup_type
            )
        lookup_kwargs[field] = getattr(self, field)

        qs = model_class._default_manager.filter(**lookup_kwargs)
        # Exclude the current object from the query if we are editing an
        # instance (as opposed to creating a new one)
        if not self._state.adding and self.pk is not None:
            qs = qs.exclude(pk=self.pk)

        if qs.exists():
            errors.setdefault(field, []).append(
                self.date_error_message(lookup_type, field, unique_for)
            )
    return errors


def validate_unique(self, exclude=None):
    # Taken from django.db.models.base
    unique_checks, date_checks = self._get_unique_checks(exclude=exclude)

    errors = self._perform_unique_checks(unique_checks)
    date_errors = self._perform_date_checks(date_checks)
    for k, v in date_errors.items():
        errors.setdefault(k, []).extend(v)

    if errors:
        raise ValidationError(errors)


def clean(self):
    # Taken from django.db.models.base
    pass


def clean_fields(self, exclude=None):
    # Taken from django.db.models.base
    if exclude is None:
        exclude = []

    errors = {}
    for f in self._meta.fields:
        if f.name in exclude:
            continue
        # Skip validation for empty fields with blank=True. The developer
        # is responsible for making sure they have a valid value.
        raw_value = getattr(self, f.attname)
        if f.blank and raw_value in f.empty_values:
            continue
        try:
            setattr(self, f.attname, f.clean(raw_value, self))
        except ValidationError as e:
            errors[f.name] = e.error_list

    if errors:
        raise ValidationError(errors)