OpServ-Monitoring/opserv-backend

View on GitHub
app/server/apis/auth/endpoint_authenticate.py

Summary

Maintainability
A
0 mins
Test Coverage
import base64
import time

from server.apis.auth.token_generator import TokenGenerator
from server.apis.endpoint import Endpoint


class EndpointAuthenticate(Endpoint):
    def set_default_headers(self):
        super().set_default_headers()

        self.add_header("Allow", "POST")

    def post(self):
        authorization_header = self.request.headers.get("Authorization")

        if authorization_header is None or not authorization_header.startswith('Basic '):
            self.send_error(400)  # todo details
            return

        decoded_user_name_and_password = base64.b64decode(authorization_header[6:]).decode("utf-8")
        user_name, user_password = decoded_user_name_and_password.split(':', 2)

        # TOOD Exchange with call to data gate
        from database.unified_database_interface import UnifiedDatabaseInterface
        users_writer_reader = UnifiedDatabaseInterface.get_users_writer_reader()

        if users_writer_reader.is_password_valid(user_name, user_password):
            user = users_writer_reader.get_user(user_name)
            user_id = user[0]

            self.respond({
                "access_token": self.generate_jwt_token(str(user_id))
            })
        else:
            self.send_error(403)  # todo details invalid password or username

    @classmethod
    def generate_jwt_token(cls, uid) -> str:
        now = int(time.time())
        valid_seconds = 60 * 60 * 6  # 6 hours validity

        payload = {
            "uid": uid,
            "iat": now,
            "exp": now + valid_seconds
        }

        return TokenGenerator.encode_token(payload).decode("utf-8")