renalreg/cornflake

View on GitHub
cornflake/fields.py

Summary

Maintainability
F
4 days
Test Coverage
import collections
import copy
import uuid
from datetime import date, datetime

try:
    from collections.abc import Mapping
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Mapping

import six

from cornflake.utils import parse_datetime
from cornflake.exceptions import ValidationError, SkipField


class _empty(object):
    def __bool__(self):
        return False

    __nonzero__ = __bool__


# Module-level sentinel meaning "no value was supplied"; it is distinct from
# ``None`` so that a missing key and an explicit null can be told apart.
empty = _empty()

# Python 2/3 compatibility: ``basestring`` only exists on Python 2, so fall
# back to ``str`` when looking the name up raises NameError.
try:
    basestring
except NameError:
    basestring = str


class Field(object):
    """Base class for all fields.

    A field reads a value from an object (``get_attribute``), converts it to
    its outgoing form (``to_representation``), and validates incoming data
    (``run_validation``). Subclasses normally override ``to_internal_value``,
    ``to_representation`` and/or ``validate``.
    """

    # Class-wide counter; each instance records its own value so that the
    # declaration order of fields can be recovered later.
    _creation_counter = 0

    error_messages = {
        'required': 'This field is required.',
    }

    def __new__(cls, *args, **kwargs):
        """Create the instance and remember the constructor arguments.

        The arguments are kept so ``__deepcopy__`` can build an equivalent
        field by calling the constructor again.
        """
        instance = super(Field, cls).__new__(cls)
        instance._args = args
        instance._kwargs = kwargs
        return instance

    def __init__(
            self,
            source=None, field_name=None,
            read_only=False, write_only=False,
            required=None, default=None, default_empty=empty,
            validators=None, error_messages=None,
            initial=None
    ):
        """Configure the field.

        :param source: attribute name / mapping key read by
            ``get_attribute``; falls back to the bound name in ``bind``.
        :param field_name: key looked up in the input by ``get_value``;
            falls back to the bound name in ``bind``.
        :param read_only: stored flag; cannot be combined with
            ``write_only`` or ``required``.
        :param write_only: stored flag; cannot be combined with
            ``read_only``.
        :param required: whether a ``None`` value fails validation;
            defaults to ``not read_only``.
        :param default: value (or zero-argument callable) substituted when
            the input is ``None``.
        :param default_empty: value (or zero-argument callable) substituted
            when the input is missing; defaults to ``default``.
        :param validators: list of callables applied by ``run_validators``.
        :param error_messages: mapping merged over the class-level messages.
        :param initial: value (or zero-argument callable) returned by
            ``get_initial``.
        """
        # Keep track of field declaration order
        self._creation_counter = Field._creation_counter
        Field._creation_counter += 1

        if required is None:
            required = not read_only

        if default_empty is empty:
            default_empty = default

        if validators is None:
            validators = list()

        assert not (read_only and write_only), \
            'read_only and write_only are mutually exclusive'
        assert not (required and read_only), \
            'a read_only field cannot be required'

        self.source = source
        # An explicitly supplied name is kept as-is; ``bind`` only fills the
        # name in when it is still None.
        self.field_name = field_name
        self.required = required
        self.default = default
        self.default_empty = default_empty
        self.read_only = read_only
        self.write_only = write_only
        self.validators = validators
        self.initial = initial

        # Collect messages from the most generic class down to the most
        # specific one so that subclasses override their parents.
        messages = dict()

        for cls in reversed(self.__class__.__mro__):
            messages.update(getattr(cls, 'error_messages', dict()))

        if error_messages is not None:
            messages.update(error_messages)

        self.error_messages = messages

        self.parent = None

        self._context = {}

    def bind(self, parent, field_name=None):
        """Attach the field to ``parent`` under ``field_name``.

        The name is only used for ``field_name`` / ``source`` when those
        were not set explicitly.
        """
        self.parent = parent

        if self.field_name is None:
            self.field_name = field_name

        if self.source is None:
            self.source = field_name

    def fail(self, key):
        """Raise a ValidationError carrying the message registered for ``key``."""
        raise ValidationError(self.error_messages[key])

    @property
    def context(self):
        """Context dictionary stored on the root of the field tree."""
        return self.root._context

    def get_attribute(self, instance):
        """Read this field's source value from ``instance``.

        Mappings are indexed by ``source``; any other object has the
        attribute of that name read.

        :raises SkipField: the value is missing and the field is optional.
        :raises AttributeError, KeyError: the value is missing and the
            field is required.
        """
        try:
            if isinstance(instance, Mapping):
                return instance[self.source]

            return getattr(instance, self.source)
        except (AttributeError, KeyError):
            if not self.required:
                raise SkipField

            # Re-raise the lookup error with its original traceback
            raise

    def get_value(self, data):
        """Return the raw input for this field, or ``empty`` if absent."""
        return data.get(self.field_name, empty)

    def get_default(self, empty):
        """Return the default for a missing (truthy ``empty``) or null value.

        Callable defaults are invoked to produce the value.
        """
        if empty:
            default = self.default_empty
        else:
            default = self.default

        if callable(default):
            default = default()

        return default

    def get_initial(self):
        """Return the initial value, calling it first when it is callable."""
        initial = self.initial

        if callable(initial):
            initial = initial()

        return initial

    def to_representation(self, value):
        """Convert an internal value to its outgoing form (identity here)."""
        return value

    def to_internal_value(self, data):
        """Convert incoming data to its internal form (identity here)."""
        return data

    def validate_empty_values(self, data):
        """Substitute defaults for missing / null input.

        :raises ValidationError: the result is ``None`` and the field is
            required.
        """
        if data is empty:
            data = self.get_default(True)
        elif data is None:
            data = self.get_default(False)

        if data is None and self.required:
            self.fail('required')

        return data

    def run_validation(self, data):
        """Run the full validation pipeline and return the validated value.

        ``None`` (after defaults were applied) short-circuits the pipeline;
        otherwise the data is converted, passed through the validators and
        finally through ``validate``.
        """
        data = self.validate_empty_values(data)

        if data is None:
            return data

        value = self.to_internal_value(data)
        value = self.run_validators(value)
        value = self.validate(value)

        return value

    def run_validators(self, value):
        """Apply each validator in turn, feeding it the previous result.

        A validator raising SkipField stops the chain; the value produced
        so far is returned.
        """
        for validator in self.validators:
            if hasattr(validator, 'set_context'):
                validator.set_context(self)

            try:
                value = validator(value)
            except SkipField:
                break

        return value

    def validate(self, value):
        """Hook for subclass-specific validation (identity here)."""
        return value

    @property
    def root(self):
        """Top-most object reached by following ``parent`` links."""
        root = self

        while root.parent is not None:
            root = root.parent

        return root

    def __deepcopy__(self, memo):
        """Copy the field by re-invoking its constructor with copied arguments."""
        args = copy.deepcopy(self._args)
        kwargs = copy.deepcopy(self._kwargs)
        return self.__class__(*args, **kwargs)


class StringField(Field):
    """Field that coerces values to text.

    Pass ``trim_whitespace=False`` to keep leading/trailing whitespace on
    incoming data (it is stripped by default).
    """

    error_messages = {
        'invalid': 'A valid string is required.'
    }

    def __init__(self, **kwargs):
        # Remove our own option before the base class sees the kwargs.
        trim = kwargs.pop('trim_whitespace', True)
        self.trim_whitespace = trim
        super(StringField, self).__init__(**kwargs)

    def to_internal_value(self, data):
        """Return ``data`` as text; dicts, lists and booleans are rejected."""
        rejected_types = (dict, list, bool)

        if isinstance(data, rejected_types):
            self.fail('invalid')

        text = six.text_type(data)
        return text.strip() if self.trim_whitespace else text

    def to_representation(self, value):
        """Return ``value`` as text."""
        text = six.text_type(value)
        return text


class BooleanField(Field):
    """Field that accepts a fixed set of truthy / falsy spellings."""

    error_messages = {
        'invalid': 'A valid boolean is required.'
    }

    TRUE_VALUES = {'t', 'true', 'y', 'yes', '1', 1, True}
    FALSE_VALUES = {'f', 'false', 'n', 'no', '0', 0, False}

    def to_internal_value(self, data):
        """Map ``data`` to True/False, comparing strings case-insensitively."""
        if hasattr(data, 'lower'):
            data = data.lower()

        outcomes = (
            (True, self.TRUE_VALUES),
            (False, self.FALSE_VALUES),
        )

        try:
            for result, accepted in outcomes:
                if data in accepted:
                    return result
        except TypeError:
            # Unhashable input (list, dict) cannot be looked up in a set;
            # it is reported as invalid below.
            pass

        self.fail('invalid')

    def to_representation(self, value):
        """Return the truthiness of ``value``."""
        as_bool = bool(value)
        return as_bool


class IntegerField(Field):
    """Field that accepts integers and integer-valued numbers/strings."""

    error_messages = {
        'invalid': 'A valid integer is required.'
    }

    def to_internal_value(self, data):
        """Convert ``data`` to an int.

        Strings are stripped first. Numbers with a fractional part
        (e.g. ``1.5``) are rejected rather than truncated.

        :raises ValidationError: ``data`` is not an integer value.
        """
        if isinstance(data, basestring):
            data = data.strip()

        try:
            value = int(data)

            # Integers, and strings that int() accepted, are integral by
            # construction. Round-tripping them through float would wrongly
            # reject exact values beyond 2**53 and overflow for huge ones.
            already_integral = (
                isinstance(data, basestring) or
                isinstance(data, six.integer_types)
            )

            # No floats
            if not already_integral and value != float(data):
                self.fail('invalid')
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int(float('inf')) or float() of an oversized number
            self.fail('invalid')

        return value

    def to_representation(self, value):
        """Return ``value`` converted with ``int()``."""
        return int(value)


class FloatField(Field):
    """Field that accepts anything ``float()`` can convert."""

    error_messages = {
        'invalid': 'A valid number is required.'
    }

    def to_internal_value(self, data):
        """Convert ``data`` to a float; strings are stripped first.

        :raises ValidationError: ``data`` cannot be converted.
        """
        if isinstance(data, basestring):
            data = data.strip()

        try:
            value = float(data)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: an integer too large to be represented as a float
            self.fail('invalid')

        return value

    def to_representation(self, value):
        """Return ``value`` converted with ``float()``."""
        return float(value)


class DateField(Field):
    """Field for calendar dates, accepting ``date`` objects or strings."""

    error_messages = {
        'invalid': 'Invalid date format.',
        'datetime': 'Expected a date but got a datetime.',
    }

    def parse(self, data):
        """Parse a string with ``parse_datetime`` and keep the date part."""
        parsed = parse_datetime(data)
        return parsed.date()

    def format(self, value):
        """Return the ISO 8601 text for ``value``."""
        return value.isoformat()

    def to_internal_value(self, data):
        """Return ``data`` as a date.

        The datetime check comes first because ``datetime`` is a subclass
        of ``date``.
        """
        if isinstance(data, datetime):
            self.fail('datetime')

        if isinstance(data, date):
            # Already a date
            return data

        if not isinstance(data, six.string_types):
            # Not a string
            self.fail('invalid')

        try:
            return self.parse(data)
        except ValueError:
            self.fail('invalid')

    def to_representation(self, value):
        """Return the formatted date."""
        return self.format(value)


class DateTimeField(Field):
    """Field for timestamps, accepting ``datetime`` objects or strings."""

    error_messages = {
        'invalid': 'Invalid date format.',
        'date': 'Expected a datetime but got a date.',
    }

    def parse(self, data):
        """Parse a string with ``parse_datetime``."""
        parsed = parse_datetime(data)
        return parsed

    def format(self, value):
        """Return the ISO 8601 text for ``value``."""
        return value.isoformat()

    def to_internal_value(self, data):
        """Return ``data`` as a datetime.

        The datetime check comes first because ``datetime`` is a subclass
        of ``date``; a plain date is rejected.
        """
        if isinstance(data, datetime):
            # Already a datetime
            return data

        if isinstance(data, date):
            self.fail('date')

        if not isinstance(data, six.string_types):
            # Not a string
            self.fail('invalid')

        try:
            return self.parse(data)
        except ValueError:
            self.fail('invalid')

    def to_representation(self, value):
        """Return the formatted datetime."""
        return self.format(value)


class ListField(Field):
    """Field holding a list whose items are handled by a ``child`` field.

    The child is given either as the ``child`` keyword argument or as a
    class attribute on a subclass.
    """

    child = None

    error_messages = {
        'not_a_list': 'Expected a list.'
    }

    def __init__(self, *args, **kwargs):
        # The class-level child is always deep-copied, even when an explicit
        # child is passed in.
        class_child_copy = copy.deepcopy(self.child)
        self.child = kwargs.pop('child', class_child_copy)
        assert self.child is not None

        if 'default' not in kwargs:
            kwargs['default'] = list

        super(ListField, self).__init__(*args, **kwargs)

        self.child.bind(self)

    def to_internal_value(self, data):
        """Validate every item with the child field.

        :raises ValidationError: ``data`` is not a list, or at least one
            item failed; item errors are keyed by list index.
        """
        if not isinstance(data, list):
            self.fail('not_a_list')

        validated = []
        failures = {}

        for index, item in enumerate(data):
            try:
                validated.append(self.child.run_validation(item))
            except ValidationError as exc:
                failures[index] = exc.errors

        if failures:
            raise ValidationError(failures)

        return validated

    def to_representation(self, values):
        """Represent each item via the child; ``None`` items pass through."""
        return [
            None if item is None else self.child.to_representation(item)
            for item in values
        ]


class CommaSeparatedField(Field):
    """Field for a comma-separated string of items handled by ``child``.

    The child is given either as the ``child`` keyword argument or as a
    class attribute on a subclass.
    """

    child = None

    error_messages = {
        'invalid': 'A valid string is required.'
    }

    def __init__(self, **kwargs):
        self.child = kwargs.pop('child', copy.deepcopy(self.child))
        assert self.child is not None

        kwargs.setdefault('default', list)

        super(CommaSeparatedField, self).__init__(**kwargs)

        self.child.bind(self)

    def to_internal_value(self, data):
        """Split ``data`` on commas and validate each part with the child.

        A list is used as-is; an empty string yields an empty list.

        :raises ValidationError: ``data`` is a dict or a bool, or the child
            rejects a part.
        """
        if isinstance(data, (dict, bool)):
            self.fail('invalid')

        if isinstance(data, list):
            parts = data
        else:
            data = six.text_type(data)
            # ''.split(',') would give [''], so handle empty input explicitly
            parts = data.split(',') if data else []

        return [self.child.run_validation(part) for part in parts]

    def to_representation(self, values):
        """Join the child representations of ``values`` with commas.

        ``None`` items become empty segments, because ``str.join`` raises
        TypeError when handed a ``None``.
        """
        segments = []

        for value in values:
            if value is None:
                segments.append('')
            else:
                segments.append(
                    six.text_type(self.child.to_representation(value))
                )

        return ','.join(segments)


class UUIDField(Field):
    """Field that validates UUIDs and stores them as text."""

    error_messages = {
        'invalid': 'A valid UUID is required.'
    }

    def to_internal_value(self, data):
        """Return the text form of the UUID parsed from ``data``.

        Dicts, lists and booleans are rejected before parsing.
        """
        if isinstance(data, (dict, list, bool)):
            self.fail('invalid')

        text = six.text_type(data)

        try:
            parsed = uuid.UUID(text)
        except ValueError:
            self.fail('invalid')
        else:
            return six.text_type(parsed)

    def to_representation(self, value):
        """Return ``value`` as text."""
        text = six.text_type(value)
        return text


class EnumField(Field):
    """Field that converts raw values by calling ``enum`` on them."""

    error_messages = {
        'invalid': 'Not a valid value.'
    }

    def __init__(self, enum, **kwargs):
        super(EnumField, self).__init__(**kwargs)
        # Callable applied to incoming data; it signals an unknown value by
        # raising ValueError.
        self.enum = enum

    def to_internal_value(self, data):
        """Return ``self.enum(data)``, failing with 'invalid' on ValueError."""
        try:
            member = self.enum(data)
        except ValueError:
            self.fail('invalid')
        else:
            return member

    def to_representation(self, value):
        """Return ``value.value`` when present, otherwise ``value`` itself."""
        return getattr(value, 'value', value)


class LookupField(Field):
    """Field whose value must be one of the keys of ``items``.

    Incoming data is either the bare key or a dict holding the key under
    ``key_name``. The outgoing form is a dict with the key and the value
    it maps to in ``items``.
    """

    error_messages = {
        'invalid': 'Not a valid value.'
    }

    def __init__(self, items, key_field=None, key_name='key', value_name='value', **kwargs):
        """Configure the lookup.

        :param items: mapping of allowed keys to their display values.
        :param key_field: field used to convert keys; a plain ``Field``
            when omitted.
        :param key_name: dict key holding the lookup key.
        :param value_name: dict key holding the display value in the
            representation.
        """
        super(LookupField, self).__init__(**kwargs)

        if key_field is None:
            key_field = Field()

        self.key_field = key_field
        self.key_field.bind(self)

        self.value_field = StringField()
        self.value_field.bind(self)

        self.items = items

        self.key_name = key_name
        self.value_name = value_name

    def to_internal_value(self, data):
        """Extract the key from ``data`` and check that it is in ``items``.

        :raises ValidationError: 'required' when a dict lacks the key (or
            holds ``None`` and ``None`` is not an allowed key); 'invalid'
            when the key is not in ``items``.
        """
        if isinstance(data, dict):
            data = data.get(self.key_name, empty)

            if data is empty:
                self.fail('required')
            elif data is None:
                # A null key is only acceptable when None is itself a choice
                if None in self.items:
                    value = data
                else:
                    self.fail('required')
            else:
                value = self.key_field.to_internal_value(data)
        else:
            value = self.key_field.to_internal_value(data)

        try:
            known = value in self.items
        except TypeError:
            # Unhashable values (e.g. a list) can never be a valid key
            known = False

        if not known:
            self.fail('invalid')

        return value

    def to_representation(self, key):
        """Return ``{key_name: key, value_name: items[key]}`` via the sub-fields."""
        value = self.items[key]

        return {
            self.key_name: self.key_field.to_representation(key),
            self.value_name: self.value_field.to_representation(value)
        }


class StringLookupField(LookupField):
    """LookupField whose keys are converted by a StringField."""

    def __init__(self, items, **kwargs):
        # Any caller-supplied key_field is replaced.
        kwargs.update(key_field=StringField())
        super(StringLookupField, self).__init__(items, **kwargs)


class IntegerLookupField(LookupField):
    """LookupField whose keys are converted by an IntegerField."""

    def __init__(self, items, **kwargs):
        # Any caller-supplied key_field is replaced.
        kwargs.update(key_field=IntegerField())
        super(IntegerLookupField, self).__init__(items, **kwargs)


class EnumLookupField(LookupField):
    """LookupField whose keys are converted by an EnumField for ``enum``."""

    def __init__(self, enum, items, **kwargs):
        # Any caller-supplied key_field is replaced.
        kwargs.update(key_field=EnumField(enum))
        super(EnumLookupField, self).__init__(items, **kwargs)