uktrade/directory-sso

View on GitHub
sso/user/models.py

Summary

Maintainability
A
0 mins
Test Coverage
from allauth.account.forms import default_token_generator, user_pk_to_url_str
from allauth.account.models import EmailConfirmation
from dateutil.relativedelta import relativedelta
from directory_constants import choices
from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin
from django.core.mail import send_mail
from django.db import models
from django.db.models import JSONField, Q
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from sort_order_field import SortOrderField

from sso.api.model_utils import TimeStampedModel
from sso.constants import API_DATETIME_FORMAT


class UserManager(BaseUserManager):
    use_in_migrations = True

    def _create_user(self, email, password, **extra_fields):
        if not email:
            raise ValueError('Email must be set')

        email = self.normalize_email(email)
        user = self.model(email=email, **extra_fields)
        user.set_password(password)
        user.save(using=self._db)

        return user

    def create_user(self, email=None, password=None, **extra_fields):
        extra_fields.setdefault('is_staff', False)
        extra_fields.setdefault('is_superuser', False)

        return self._create_user(email, password, **extra_fields)

    def create_superuser(self, email, password, **extra_fields):
        extra_fields.setdefault('is_staff', True)
        extra_fields.setdefault('is_superuser', True)

        if extra_fields['is_staff'] is not True:
            raise ValueError('Superuser must have is_staff=True')

        if extra_fields['is_superuser'] is not True:
            raise ValueError('Superuser must have is_superuser=True')

        user = self._create_user(email, password, **extra_fields)
        user.emailaddress_set.create(email=email, verified=True)
        return user


class InactiveUserManager(models.Manager):
    def get_queryset(self):
        date = timezone.now() - relativedelta(years=settings.DATA_RETENTION_STORAGE_YEARS)
        return (
            super()
            .get_queryset()
            .filter(
                # Accounts with last login dated less than or equal year and month provided
                Q(
                    last_login__lte=date,
                )
                |
                # Accounts with no login activity, with creation dated less than or equal year and month provided
                Q(
                    last_login__isnull=True,
                    created__lte=date,
                )
            )
            # Exclude superusers and staff members
            .exclude(Q(is_superuser=True) | Q(is_staff=True))
        )


class User(AbstractBaseUser, PermissionsMixin, TimeStampedModel):

    email = models.EmailField(_('email'), unique=True)
    is_staff = models.BooleanField(
        _('staff status'), default=False, help_text=_('Designates whether the user can log into this admin site.')
    )
    is_active = models.BooleanField(
        _('active'),
        default=True,
        help_text=_(
            'Designates whether this user should be treated as active. Unselect this instead of deleting accounts.'
        ),
    )
    date_joined = models.DateTimeField(_('date joined'), default=timezone.now)
    utm = JSONField(blank=True, default=dict, help_text=_('Urchin Tracking Module query parameters passed in the URL'))
    hashed_uuid = models.CharField(max_length=200, help_text='a hash representation of the object\'s id', default='')

    first_name = models.CharField(max_length=30, blank=True)

    last_name = models.CharField(max_length=30, blank=True)

    failed_login_attempts = models.PositiveSmallIntegerField(default=0)

    inactivity_notification = models.PositiveSmallIntegerField(default=0)

    inactivity_notification_sent = models.DateTimeField(_('Notification sent'), null=True)

    objects = UserManager()
    inactive = InactiveUserManager()

    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = []

    def __str__(self):
        return self.email

    def get_full_name(self):
        # django method that must be implemented
        return self.email

    def get_short_name(self):
        # django method that must be implemented
        return self.email

    def check_password(self, raw_password):
        """Hook to update the failed login attempt counter."""
        is_correct = super().check_password(raw_password)
        if is_correct:
            self.failed_login_attempts = 0
            self.inactivity_notification = 0
            self.save()
        else:
            self.failed_login_attempts += 1
            self.save()
        self.notify_suspicious_login_activity()
        return is_correct

    def notify_suspicious_login_activity(self):
        notification_threshold = settings.SSO_SUSPICIOUS_LOGIN_MAX_ATTEMPTS
        if self.failed_login_attempts == notification_threshold and settings.SSO_SUSPICIOUS_ACTIVITY_NOTIFICATION_EMAIL:
            body_message = "{user} tried to login {attempts} times".format(
                user=self.email, attempts=self.failed_login_attempts
            )
            send_mail(
                subject='Suspicious activity on SSO',
                message=body_message,
                from_email=settings.DEFAULT_FROM_EMAIL,
                recipient_list=[settings.SSO_SUSPICIOUS_ACTIVITY_NOTIFICATION_EMAIL],
            )

    def get_password_reset_link(self):
        return reverse(
            "account_reset_password_from_key",
            kwargs={'uidb36': user_pk_to_url_str(self), 'key': default_token_generator.make_token(self)},
        )

    def get_email_verification_link(self):
        email_address = self.emailaddress_set.last()
        email_confirmation = EmailConfirmation.create(email_address)
        email_confirmation.sent = timezone.now()
        email_confirmation.save()

        return reverse("account_confirm_email", args=[email_confirmation.key])


class UserProfile(TimeStampedModel):
    # TODO: move these over to directory-constants
    CORE_SEGMENTS = [
        ('SUSTAIN', 'Sustain'),
        ('REASSURE', 'Reassure'),
        ('PROMOTE', 'Promote'),
        ('CHALLENGE', 'Challenge'),
    ]

    class Meta:
        ordering = ['-created']

    user = models.OneToOneField(User, related_name='user_profile', on_delete=models.CASCADE)
    first_name = models.CharField(max_length=128)
    last_name = models.CharField(max_length=128)
    job_title = models.CharField(max_length=128, blank=True, null=True)
    mobile_phone_number = models.CharField(max_length=128, blank=True, null=True)
    segment = models.CharField(max_length=15, choices=CORE_SEGMENTS, blank=True, null=True)

    def __str__(self):
        return str(self.user)


class DataRetentionStatistics(TimeStampedModel):
    sso_user = models.IntegerField(blank=True, null=True)
    company_user = models.IntegerField(blank=True, null=True)
    company = models.IntegerField(blank=True, null=True)

    class Meta:
        verbose_name = _('Data Retention Statistics')
        verbose_name_plural = _('Data Retention Statistics')

    def __str__(self):
        return str(self.sso_user)


class Service(TimeStampedModel):
    # a service name e.g. great-cms
    name = models.CharField(max_length=128)


class ServicePage(TimeStampedModel):
    service = models.ForeignKey(Service, on_delete=models.CASCADE, related_name='service_pages')
    page_name = models.CharField(max_length=128)

    class Meta:
        indexes = [models.Index(fields=['service'])]
        unique_together = [['service', 'page_name']]


class UserPageView(TimeStampedModel):
    # Records an instance of a user reading a page ONCE. Subsequent reads will add extra records
    service_page = models.ForeignKey(ServicePage, on_delete=models.CASCADE, related_name='page_views')
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='page_views')

    class Meta:
        unique_together = [['user', 'service_page']]

    def to_dict(self):
        return {
            'service': self.service_page.service.name,
            'page': self.service_page.page_name,
            'modified': self.modified.strftime(API_DATETIME_FORMAT),
            'created': self.created.strftime(API_DATETIME_FORMAT),
        }


class LessonCompleted(TimeStampedModel):
    service = models.ForeignKey(Service, on_delete=models.CASCADE)
    lesson_page = models.CharField(max_length=255)
    lesson = models.IntegerField()
    module = models.IntegerField()  # Saving PK so it can be queried
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    class Meta:
        unique_together = [['user', 'lesson']]

    def to_dict(self):
        return {
            'service': self.service.name,
            'lesson_page': self.lesson_page,
            'lesson': self.lesson,
            'module': self.module,
            'user': self.user.id,
            'modified': self.modified.strftime(API_DATETIME_FORMAT),
            'created': self.created.strftime(API_DATETIME_FORMAT),
        }


QUESTION_TYPES = [
    ('RADIO', 'Radio'),
    ('SELECT', 'Select'),
    ('MULTI_SELECT', 'Multi select'),
    ('TEXT', 'Text'),
    ('COMPANY_LOOKUP', 'Company lookup'),
]


PREDEFINED_CHOICES = [
    ('EXPERTISE_REGION_CHOICES', 'EXPERTISE_REGION_CHOICES'),
    ('TURNOVER_CHOICES', 'TURNOVER_CHOICES'),
    ('SECTORS', 'SECTORS'),
]


class Question(TimeStampedModel):
    service = models.ForeignKey(Service, on_delete=models.CASCADE)
    name = models.CharField(max_length=128)
    title = models.CharField(max_length=256)
    question_type = models.CharField(max_length=20, choices=QUESTION_TYPES)
    question_choices = JSONField(blank=True, default=dict, help_text=_('Question parameters'))
    predefined_choices = models.CharField(blank=True, null=True, max_length=128, choices=PREDEFINED_CHOICES)
    is_active = models.BooleanField(default=True)
    sort_order = SortOrderField(_("Sort"))

    class Meta:
        ordering = ('sort_order',)

    def __str__(self):
        inactive = '(inactive) ' if not self.is_active else ''
        return f'{inactive}{str(self.name)}'

    def to_dict(self):
        question_options = (
            [{'label': label, 'value': value} for value, label in choices.__dict__.get(self.predefined_choices, [])]
            if self.predefined_choices
            else []
        )
        question_choices = self.question_choices or {'options': []}
        if question_options:
            question_choices['options'] = question_choices.get('options', []) + question_options
        return {
            'id': self.id,
            'name': self.name,
            'title': self.title,
            'type': self.question_type,
            'choices': question_choices,
            'order': self.sort_order,
        }


class UserAnswer(TimeStampedModel):
    question = models.ForeignKey(Question, on_delete=models.CASCADE)
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    answer = JSONField(blank=True, null=True, default=dict)

    class Meta:
        ordering = ('user', 'question__sort_order')
        unique_together = [['user', 'question']]

    def to_dict(self):
        return {'question_id': self.question.id, 'answer': self.answer}

    def __str__(self):
        return str(f'{self.user} : {self.question.name}')


class UserData(TimeStampedModel):
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    name = models.CharField(max_length=128)
    data = JSONField(blank=True, default=dict)

    def __str__(self):
        return str(f'{self.user} : {self.name}')

    class Meta:
        unique_together = [['user', 'name']]
        ordering = ['user']
        verbose_name_plural = 'User data'