renalreg/radar

View on GitHub
radar/api/serializers/validators.py

Summary

Maintainability
D
2 days
Test Coverage
from datetime import datetime
import re

from cornflake.fields import ValidationError
from cornflake.utils import safe_strftime
from cornflake.validators import after, not_in_future
import pytz

from radar.models.groups import (
    GROUP_CODE_BAPN,
    GROUP_CODE_CHI,
    GROUP_CODE_HSC,
    GROUP_CODE_NHS,
    GROUP_CODE_RADAR,
    GROUP_CODE_UKRDC,
    GROUP_CODE_UKRR,
    GROUP_TYPE,
)
from radar.models.patients import Patient
from radar.utils import datetime_to_date, is_datetime


HUMAN_DATE_FORMAT = '%d/%m/%Y'

EMAIL_REGEX = re.compile(r'^\S+@[^\.@\s][^@]*\.[^\.@\s]+$')

USERNAME_REGEX = re.compile(r'^[a-z0-9](?:[a-z0-9]*(?:[\.][a-z0-9]+)?)*$')
USERNAME_MIN_LENGTH = 4
USERNAME_MAX_LENGTH = 32

TRAILING_COMMA_REGEX = re.compile(r'\s*,$')

DAY_ZERO = datetime(1900, 1, 1, 0, 0, 0, tzinfo=pytz.utc)

WHITESPACE_REGEX = re.compile(r'\s')
LEADING_ZERO_REGEX = re.compile(r'^0+')
DIGITS_REGEX = re.compile(r'^[0-9]+$')

MIN_UKRR_NO = 199600001
MAX_UKRR_NO = 999999999

BAPN_NO_REGEX = re.compile(r'^[ABCDFGHJKLMNPT][0-9]{,3}$')
BAPN_LEADING_ZEROS = re.compile(r'^[A-Z](0+)')

MIN_UKRDC_NO = 100000001
MAX_UKRDC_NO = 999999999

MIN_GMC_NO = 0
MAX_GMC_NO = 9999999

MIN_NHS_NO = 4000000000

MIN_CHI_NO = 101000000
MAX_CHI_NO = 3112999999

MIN_HSC_NO = 3200000010
MAX_HSC_NO = 3999999999


def after_day_zero(dt_format=HUMAN_DATE_FORMAT):
    after_f = after(min_dt=DAY_ZERO, dt_format=dt_format)

    def after_day_zero_f(value):
        return after_f(value)

    return after_day_zero_f


class after_date_of_birth(object):
    def __init__(self, field_name, patient='patient', parent=None):
        self.field_name = field_name
        self.patient = patient
        self.parent = parent

    def __call__(self, data):
        field = self.parent.fields[self.field_name]

        value = field.get_attribute(data)

        if value is None:
            return data

        if callable(self.patient):
            patient = self.patient()
        elif isinstance(self.patient, Patient):
            patient = self.patient
        else:
            patient_field = self.parent.fields[self.patient]
            patient = patient_field.get_attribute(data)

        if patient is None:
            return data

        if is_datetime(value):
            value_date = datetime_to_date(value)
        else:
            value_date = value

        earliest_date_of_birth = patient.earliest_date_of_birth

        if earliest_date_of_birth is not None and value_date < earliest_date_of_birth:
            message = "Value is before the patient's date of birth ({}).".format(
                safe_strftime(earliest_date_of_birth, HUMAN_DATE_FORMAT)
            )

            raise ValidationError({self.field_name: message})

        return data

    def set_context(self, parent):
        self.parent = parent


class valid_date_for_patient(object):
    def __init__(self, field_name, patient='patient', parent=None):
        self.field_name = field_name
        self.patient = patient
        self.parent = parent

    def __call__(self, data):
        field = self.parent.fields[self.field_name]
        value = field.get_attribute(data)

        # Exit early if value is none
        if value is None:
            return data

        try:
            # Not a long way in the past
            value = after_day_zero()(value)

            # Not in the future
            value = not_in_future()(value)
        except ValidationError as e:
            raise ValidationError({self.field_name: e.errors})

        data[field.source] = value

        # After the patient's date of birth
        data = after_date_of_birth(self.field_name, self.patient, self.parent)(data)

        return data

    def set_context(self, parent):
        self.parent = parent


def username():
    def username_f(value):
        value = value.lower()

        # Old usernames are email addresses
        if not EMAIL_REGEX.match(value):
            if not USERNAME_REGEX.match(value):
                raise ValidationError('Not a valid username.')
            elif len(value) < USERNAME_MIN_LENGTH:
                raise ValidationError('Username too short.')
            elif len(value) > USERNAME_MAX_LENGTH:
                raise ValidationError('Username too long.')

        return value

    return username_f


def remove_trailing_comma():
    def remove_trailing_comma_f(value):
        # Remove a trailing comma
        value = TRAILING_COMMA_REGEX.sub('', value)
        return value

    return remove_trailing_comma_f


def clean_int(value):
    if isinstance(value, str):
        # Remove non-digits
        value = re.sub('[^0-9]', '', value)

        # Remove leading zeros
        value = re.sub('^0+', '', value)

    return value


def check_range(value, min_value=None, max_value=None):
    if isinstance(value, str):
        value = int(value)

    # min_value <= x <= max_value
    return (
        (min_value is None or value >= min_value) and
        (max_value is None or value <= max_value)
    )


def _nhs_no(value, min_value=None, max_value=None):
    if not isinstance(value, str):
        value = str(value)

    # Remove non-digits
    value = re.sub('[^0-9]', '', value)

    if len(value) > 10:
        # Remove extra leading zeros
        if re.match('^0+$', value[0:-10]):
            value = value[-10:]
    elif len(value) < 10:
        # Add leading zeros
        value = value.zfill(10)

    if len(value) != 10:
        raise ValueError('Not 10 digits.')

    check_digit = 0

    for i in range(0, 9):
        check_digit += int(value[i]) * (10 - i)

    check_digit = 11 - (check_digit % 11)

    if check_digit == 11:
        check_digit = 0

    if check_digit != int(value[9]):
        raise ValueError('Bas check digit.')

    if not check_range(value, min_value, max_value):
        raise ValueError('Not in range.')

    return value


def nhs_no():
    def nhs_no_f(value):
        try:
            new_value = _nhs_no(value, MIN_NHS_NO)
        except ValueError:
            raise ValidationError('Not a valid NHS number.')

        if isinstance(value, str):
            value = new_value

        return value

    return nhs_no_f


def chi_no():
    def chi_no_f(value):
        try:
            new_value = _nhs_no(value, MIN_CHI_NO, MAX_CHI_NO)
        except ValueError:
            raise ValidationError('Not a valid CHI number.')

        if isinstance(value, str):
            value = new_value

        return value

    return chi_no_f


def hsc_no():
    def hsc_no_f(value):
        try:
            new_value = _nhs_no(value, MIN_HSC_NO, MAX_HSC_NO)
        except ValueError:
            raise ValidationError('Not a valid H&C number.')

        if isinstance(value, str):
            value = new_value

        return value

    return hsc_no_f


def ukrr_no():
    def ukrr_no_f(value):
        value = clean_int(value)

        try:
            x = int(value)
        except ValueError:
            raise ValidationError('Not a valid UK Renal Registry number.')

        if x < MIN_UKRR_NO or x > MAX_UKRR_NO:
            raise ValidationError('Not a valid UK Renal Registry number.')

        return value

    return ukrr_no_f


def nhsbt_no():
    def nhsbt_no_f(value):
        if isinstance(value, str):
            # Remove leading zeros and whitespace
            value = LEADING_ZERO_REGEX.sub('', value)
            value = WHITESPACE_REGEX.sub('', value)

            if not DIGITS_REGEX.match(value):
                raise ValidationError('Not a valid NHS Blood and Tracing number.')

        return value

    return nhsbt_no_f


def bapn_no():
    def bapn_no_f(value):
        value = value.upper()
        value = re.sub('[^A-Z0-9]', '', value)

        if not BAPN_NO_REGEX.match(value):
            raise ValidationError('Not a valid BAPN number.')

        # Remove leading zeros
        value = BAPN_LEADING_ZEROS.sub('', value)

        return value

    return bapn_no_f


def ukrdc_no():
    def ukrdc_no_f(value):
        value = clean_int(value)

        try:
            x = int(value)
        except ValueError:
            raise ValidationError('Not a valid UKRDC number.')

        if x < MIN_UKRDC_NO or x > MAX_UKRDC_NO:
            raise ValidationError('Not a valid UKRDC number.')

        return value

    return ukrdc_no_f


def gmc_number():
    def gmc_number_f(value):
        value = clean_int(value)

        try:
            x = int(value)
        except ValueError:
            raise ValidationError('Not a valid GMC number.')

        if x < MIN_GMC_NO or x > MAX_GMC_NO:
            raise ValidationError('Not a valid GMC number.')

        return value

    return gmc_number_f


def radar_no():
    def radar_no_f(value):
        value = clean_int(value)

        try:
            int(value)
        except ValueError:
            raise ValidationError('Not a valid RaDaR number.')

        return value

    return radar_no_f


NUMBER_VALIDATORS = {
    (GROUP_TYPE.OTHER, GROUP_CODE_NHS): [nhs_no()],
    (GROUP_TYPE.OTHER, GROUP_CODE_CHI): [chi_no()],
    (GROUP_TYPE.OTHER, GROUP_CODE_HSC): [hsc_no()],
    (GROUP_TYPE.OTHER, GROUP_CODE_UKRR): [ukrr_no()],
    (GROUP_TYPE.OTHER, GROUP_CODE_UKRDC): [ukrdc_no()],
    (GROUP_TYPE.OTHER, GROUP_CODE_BAPN): [bapn_no()],
    (GROUP_TYPE.SYSTEM, GROUP_CODE_RADAR): [radar_no()],
}


def get_number_validators(group):
    return NUMBER_VALIDATORS.get((group.type, group.code), [])