xyb/drf-passwordless-jwt

View on GitHub
drf_passwordless_jwt/views.py

Summary

Maintainability
A
0 mins
Test Coverage
A
100%
from datetime import timedelta

from django.conf import settings
from django.utils import timezone
from drfpasswordless.models import CallbackToken
from drfpasswordless.views import ObtainAuthTokenFromCallbackToken
from drfpasswordless.views import ObtainEmailCallbackToken
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView

from .consts import LONG_LIVE_TIME
from .serializers import EmailAuthWhiteListSerializer
from .serializers import JWTSerializer
from .testaccount import exists_test_account
from .testaccount import get_test_account_token
from .utils import generate_jwt


class ObtainEmailTokenView(ObtainEmailCallbackToken):
    serializer_class = EmailAuthWhiteListSerializer

    def post(self, request, *args, **kwargs):
        email = request.data["email"]
        if exists_test_account(email):
            return Response(
                {
                    "detail": f"test account email {email!r} available",
                },
            )

        return super().post(request, *args, **kwargs)


class ObtainJWTView(ObtainAuthTokenFromCallbackToken):
    def post(self, request, *args, **kwargs):
        email = request.data["email"]
        if exists_test_account(email):
            if request.data["token"] == get_test_account_token(email):
                return Response(
                    {
                        "email": email,
                        "token": generate_jwt(email),
                    },
                )

        resp = super().post(request, *args, **kwargs)
        token = generate_jwt(email)
        resp.data["email"] = email
        resp.data["token"] = token

        current_time = timezone.now()
        remove_time = current_time - timedelta(
            seconds=settings.OTP_TOKEN_CLEAN_SECONDS,
        )
        tokens = CallbackToken.objects.filter(created_at__lt=remove_time)
        tokens.delete()

        return resp


class VerifyJWTView(APIView):
    permission_classes = [AllowAny]
    serializer_class = JWTSerializer

    def post(self, request, *args, **kwargs):
        email = request.data.get("email")
        if email and exists_test_account(email):
            return Response(
                {
                    "email": email,
                    "exp": LONG_LIVE_TIME,
                },
            )

        serializer = self.serializer_class(
            data=request.data,
            context={"request": request},
        )
        if serializer.is_valid(raise_exception=False):
            return Response(
                serializer.validated_data["token"],
                status=status.HTTP_200_OK,
            )

        return Response(
            status=status.HTTP_401_UNAUTHORIZED,
            headers={"Access-Control-Allow-Origin": "*"},
        )


class VerifyJWTHeaderView(APIView):
    permission_classes = [AllowAny]
    serializer_class = JWTSerializer

    def get(self, request, *args, **kwargs):
        request_method = request.headers.get("X-Forwarded-Method", "")
        if request_method.upper() == "OPTIONS":
            return Response(status=status.HTTP_200_OK)

        email = request.headers.get("x-email")
        if email and exists_test_account(email):
            return Response(
                {
                    "email": email,
                    "exp": LONG_LIVE_TIME,
                },
            )

        auth_header = request.headers.get(settings.AUTH_HEADER_NAME)
        request.headers.get("Cookie")
        auth_cookie = request.COOKIES.get(settings.AUTH_COOKIE_NAME, "")
        if auth_header:
            try:
                _, token = auth_header.split()
            except ValueError:
                return Response(
                    status=status.HTTP_401_UNAUTHORIZED,
                    data={
                        "error": "Invalid request,"
                        f" header {settings.AUTH_HEADER_NAME!r}"
                        f" must be provided",
                    },
                    headers={"Access-Control-Allow-Origin": "*"},
                )
        elif auth_cookie:
            token = auth_cookie
        else:
            return Response(
                status=status.HTTP_401_UNAUTHORIZED,
                data={
                    "error": f"header {settings.AUTH_HEADER_NAME!r} or"
                    f" cookie {settings.AUTH_COOKIE_NAME!r}"
                    "must be provided",
                },
                headers={"Access-Control-Allow-Origin": "*"},
            )

        serializer = self.serializer_class(
            data={"token": token},
            context={"request": request},
        )
        if serializer.is_valid(raise_exception=False):
            return Response(
                serializer.validated_data["token"],
                status=status.HTTP_200_OK,
            )

        return Response(
            status=status.HTTP_401_UNAUTHORIZED,
            headers={"Access-Control-Allow-Origin": "*"},
        )