uktrade/directory-sso

View on GitHub
sso/adapters.py

Summary

Maintainability
A
1 hr
Test Coverage
import urllib.parse

from allauth.account.adapter import DefaultAccountAdapter
from allauth.account.models import EmailAddress
from allauth.account.utils import get_request_param
from allauth.exceptions import ImmediateHttpResponse
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
from django.conf import settings
from django.shortcuts import redirect
from django.urls import reverse
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from notifications_python_client import NotificationsAPIClient

from sso.constants import RESEND_VERIFICATION_URL
from sso.user.models import UserProfile
from sso.user.utils import get_url_with_redirect, is_valid_redirect
from sso.verification import helpers
from sso.verification.models import VerificationCode

EMAIL_TEMPLATES = {
    'account/email/email_confirmation_signup': settings.GOV_NOTIFY_SIGNUP_CONFIRMATION_TEMPLATE_ID,
    'account/email/email_confirmation': settings.GOV_NOTIFY_SIGNUP_CONFIRMATION_TEMPLATE_ID,
    'account/email/password_reset_key': settings.GOV_NOTIFY_PASSWORD_RESET_TEMPLATE_ID,
}


class AccountAdapter(DefaultAccountAdapter):
    def get_email_confirmation_url(self, request, emailconfirmation):
        """
        Constructs the email confirmation (activation) url.
        """
        redirect_url = settings.DEFAULT_REDIRECT_URL

        redirect_param_value = get_request_param(request, settings.REDIRECT_FIELD_NAME)
        if redirect_param_value:
            redirect_url = redirect_param_value

        email_confirmation_url = super().get_email_confirmation_url(request, emailconfirmation)

        if redirect_url:
            if is_valid_redirect(urllib.parse.unquote(redirect_url)):
                # This is to handle the case when user registered on one device
                # (e.g. desktop) and clicked 'confirm email' on another (e.g.
                # mobile) - if user is automatically logged in on confirm
                # (which will happen when clicking confirm on same machine)
                # login view will redirect to 'next', if not (when switching
                # browsers), user will have to log in and then will be
                # redirected
                login_url_with_next = get_url_with_redirect(url=reverse('account_login'), redirect_url=redirect_url)
                email_confirmation_url = get_url_with_redirect(
                    url=email_confirmation_url, redirect_url=login_url_with_next
                )
        return email_confirmation_url

    def validate_unique_email(self, email):
        # Although email has to be unique, as it is user login, do not validate
        # it, so that e-mail enumeration is not possible - security requirement
        return email

    def save_user(self, request, user, form, commit=True):
        """
        Saves a new `User` instance using information provided in the
        signup form.
        """
        user = super().save_user(request, user, form, commit=False)

        user.utm = request.COOKIES.get('ed_utm', {})

        if commit:
            user.save()

        return user

    @staticmethod
    def build_password_reset_url(context):
        """Add next param if valid redirect."""
        reset_url = context['password_reset_url']
        next_url = context['request'].POST.get('next', '')
        if next_url and is_valid_redirect(next_url):
            reset_url += '?next={next_url}'.format(next_url=next_url)
        return reset_url

    @staticmethod
    def is_social_account(context):
        return True if context['user'].socialaccount_set.first() else False

    @staticmethod
    def is_verified_account(context):
        email_address = EmailAddress.objects.get(email=context['user'].email)
        return email_address.verified

    @staticmethod
    def regenerate_verification_code(context):
        code = helpers.generate_verification_code()
        verification_code = context['user'].verification_code
        verification_code.code = code
        verification_code.save(update_fields=['code'])
        return code

    @staticmethod
    def generate_verification_link(context):
        uidb64 = urlsafe_base64_encode(force_bytes(context['user'].pk))
        token = helpers.verification_token.make_token(context['user'])
        verification_params = f'?uidb64={uidb64}&token={token}'

        return settings.MAGNA_URL + '/signup/' + verification_params

    def send_mail(self, template_prefix, email, context):
        # Don't send an email if the account doesn't exist
        if template_prefix == 'account/email/unknown_account':
            return None

        template_id = EMAIL_TEMPLATES[template_prefix]

        if not self.is_social_account(context):
            #  build personalisation dict from context
            if template_id == settings.GOV_NOTIFY_PASSWORD_RESET_TEMPLATE_ID:
                if self.is_verified_account(context):
                    personalisation = {'password_reset': self.build_password_reset_url(context)}
                else:
                    template_id = settings.GOV_NOTIFY_PASSWORD_RESET_UNVERIFIED_TEMPLATE_ID
                    code = self.regenerate_verification_code(context)
                    personalisation = {
                        'verification_link': self.generate_verification_link(context),
                        'resend_verification_link': RESEND_VERIFICATION_URL,
                        'code': code,
                    }
            else:
                personalisation = {'confirmation_link': context['activate_url']}
        else:
            # This  is a social account send social account reset email
            # Ideally this should be in SocialAccountAdapter but unfortunately there's no hook for send email
            template_id = settings.GOV_NOTIFY_SOCIAL_PASSWORD_RESET_TEMPLATE_ID
            personalisation = {'login_link': settings.MAGNA_URL + '/login/'}

        notifications_client = NotificationsAPIClient(settings.GOV_NOTIFY_API_KEY)
        notifications_client.send_email_notification(
            email_address=email,
            template_id=template_id,
            personalisation=personalisation,
        )

    def send_confirmation_mail(self, request, emailconfirmation, signup):
        # Send the email only if the user signed up via the "old flow". The "new flow" involved a verification code
        try:
            emailconfirmation.email_address.user.verification_code
        except VerificationCode.DoesNotExist:
            return super().send_confirmation_mail(request=request, emailconfirmation=emailconfirmation, signup=signup)
        else:
            pass

    def respond_email_verification_sent(self, request, user):
        # Checking if using "old flow" or "new flow".
        try:
            user.verification_code
        except VerificationCode.DoesNotExist:
            return super().respond_email_verification_sent(request, user)
        else:
            return redirect(RESEND_VERIFICATION_URL)

    def is_safe_url(self, url):
        if url:
            return is_valid_redirect(urllib.parse.unquote(url))


class SocialAccountAdapter(DefaultSocialAccountAdapter):
    def save_user(self, *args, **kwargs):
        user = super().save_user(*args, **kwargs)
        UserProfile.objects.create(
            user=user,
            first_name=user.first_name,
            last_name=user.last_name,
        )
        self.send_welcome_email(user.email)
        return user

    @staticmethod
    def send_welcome_email(email):
        client = NotificationsAPIClient(settings.GOV_NOTIFY_API_KEY)
        client.send_email_notification(
            email_address=email,
            template_id=settings.GOV_NOTIFY_WELCOME_TEMPLATE_ID,
        )

    def pre_social_login(self, request, sociallogin):
        """
        This hook is invoked just after a user successfully authenticates via a
        social provider, but before the login is actually processed
        (and before the pre_social_login signal is emitted).

        TODO: This code is unfortunately tied to a specific client for the edge
        case of a duplication of social email. In this event, the service will
        respond with a 302 to the client's login page until REST endpoints for
        authentication are enabled.
        """

        # Ignore existing social accounts
        if sociallogin.is_existing:
            return

        # Check if given email address already exists
        try:
            social_user = sociallogin.user
            social_email = social_user.email
            EmailAddress.objects.get(email=social_email)

        # Email does not exist, allauth will handle a new social account
        except EmailAddress.DoesNotExist:
            return

        # Email exists, redirect to login page
        client_url = settings.MAGNA_URL + '/login?email=' + social_email

        raise ImmediateHttpResponse(redirect(client_url))