django_cassandra_engine/models/django_model_methods.py
from django.core.exceptions import NON_FIELD_ERRORS, FieldDoesNotExist, ValidationError
__ALL__ = (
"_get_pk_val",
"serializable_value",
"full_clean",
"_get_unique_checks",
"_perform_unique_checks",
"_perform_date_checks",
"validate_unique",
"clean",
"clean_fields",
)
def _get_pk_val(self, meta=None):
if not meta:
meta = self._meta
return getattr(self, meta.pk.attname)
def serializable_value(self, field_name):
try:
field = self._meta.get_field(field_name)
except FieldDoesNotExist:
return getattr(self, field_name)
return getattr(self, field.attname)
def full_clean(self, exclude=None, validate_unique=True):
# Taken from django.db.models.base
errors = {}
if exclude is None:
exclude = []
else:
exclude = list(exclude)
try:
self.clean_fields(exclude=exclude)
except ValidationError as e:
errors = e.update_error_dict(errors)
# Form.clean() is run even if other validation fails, so do the
# same with Model.clean() for consistency.
try:
self.clean()
except ValidationError as e:
errors = e.update_error_dict(errors)
# Run unique checks, but only for fields that passed validation.
if validate_unique:
for name in errors.keys():
if name != NON_FIELD_ERRORS and name not in exclude:
exclude.append(name)
try:
self.validate_unique(exclude=exclude)
except ValidationError as e:
errors = e.update_error_dict(errors)
if errors:
raise ValidationError(errors)
def _get_unique_checks(self, exclude=None):
# Taken from django.db.models.base
if exclude is None:
exclude = []
unique_checks = []
unique_togethers = [(self.__class__, self._meta.unique_together)]
for parent_class in self._meta.get_parent_list():
if parent_class._meta.unique_together:
unique_togethers.append((parent_class, parent_class._meta.unique_together))
for model_class, unique_together in unique_togethers:
for check in unique_together:
for name in check:
# If this is an excluded field, don't add this check.
if name in exclude:
break
else:
unique_checks.append((model_class, tuple(check)))
# These are checks for the unique_for_<date/year/month>.
date_checks = []
# Gather a list of checks for fields declared as unique and add them to
# the list of checks.
fields_with_class = [(self.__class__, self._meta.local_fields)]
for parent_class in self._meta.get_parent_list():
fields_with_class.append((parent_class, parent_class._meta.local_fields))
for model_class, fields in fields_with_class:
for f in fields:
name = f.name
if name in exclude:
continue
if f.unique:
unique_checks.append((model_class, (name,)))
if f.unique_for_date and f.unique_for_date not in exclude:
date_checks.append((model_class, "date", name, f.unique_for_date))
if f.unique_for_year and f.unique_for_year not in exclude:
date_checks.append((model_class, "year", name, f.unique_for_year))
if f.unique_for_month and f.unique_for_month not in exclude:
date_checks.append((model_class, "month", name, f.unique_for_month))
return unique_checks, date_checks
def _perform_unique_checks(self, unique_checks):
# Taken from django.db.models.base
errors = {}
for model_class, unique_check in unique_checks:
# Try to look up an existing object with the same values as this
# object's values for all the unique field.
lookup_kwargs = {}
for field_name in unique_check:
f = self._meta.get_field(field_name)
lookup_value = getattr(self, f.attname)
if lookup_value is None:
# no value, skip the lookup
continue
if f.primary_key and not self._state.adding:
# no need to check for unique primary key when editing
continue
lookup_kwargs[str(field_name)] = lookup_value
# some fields were skipped, no reason to do the check
if len(unique_check) != len(lookup_kwargs):
continue
qs = model_class._default_manager.filter(**lookup_kwargs)
# Exclude the current object from the query if we are editing an
# instance (as opposed to creating a new one)
# Note that we need to use the pk as defined by model_class, not
# self.pk. These can be different fields because model inheritance
# allows single model to have effectively multiple primary keys.
# Refs #17615.
model_class_pk = self._get_pk_val(model_class._meta)
if not self._state.adding and model_class_pk is not None:
qs = qs.exclude(pk=model_class_pk)
if qs.exists():
if len(unique_check) == 1:
key = unique_check[0]
else:
key = NON_FIELD_ERRORS
errors.setdefault(key, []).append(
self.unique_error_message(model_class, unique_check)
)
return errors
def _perform_date_checks(self, date_checks):
# Taken from django.db.models.base
errors = {}
for model_class, lookup_type, field, unique_for in date_checks:
lookup_kwargs = {}
# there's a ticket to add a date lookup, we can remove this special
# case if that makes it's way in
date = getattr(self, unique_for)
if date is None:
continue
if lookup_type == "date":
lookup_kwargs["%s__day" % unique_for] = date.day
lookup_kwargs["%s__month" % unique_for] = date.month
lookup_kwargs["%s__year" % unique_for] = date.year
else:
lookup_kwargs["%s__%s" % (unique_for, lookup_type)] = getattr(
date, lookup_type
)
lookup_kwargs[field] = getattr(self, field)
qs = model_class._default_manager.filter(**lookup_kwargs)
# Exclude the current object from the query if we are editing an
# instance (as opposed to creating a new one)
if not self._state.adding and self.pk is not None:
qs = qs.exclude(pk=self.pk)
if qs.exists():
errors.setdefault(field, []).append(
self.date_error_message(lookup_type, field, unique_for)
)
return errors
def validate_unique(self, exclude=None):
# Taken from django.db.models.base
unique_checks, date_checks = self._get_unique_checks(exclude=exclude)
errors = self._perform_unique_checks(unique_checks)
date_errors = self._perform_date_checks(date_checks)
for k, v in date_errors.items():
errors.setdefault(k, []).extend(v)
if errors:
raise ValidationError(errors)
def clean(self):
# Taken from django.db.models.base
pass
def clean_fields(self, exclude=None):
# Taken from django.db.models.base
if exclude is None:
exclude = []
errors = {}
for f in self._meta.fields:
if f.name in exclude:
continue
# Skip validation for empty fields with blank=True. The developer
# is responsible for making sure they have a valid value.
raw_value = getattr(self, f.attname)
if f.blank and raw_value in f.empty_values:
continue
try:
setattr(self, f.attname, f.clean(raw_value, self))
except ValidationError as e:
errors[f.name] = e.error_list
if errors:
raise ValidationError(errors)