cloudsmith-io/cloudsmith-cli

View on GitHub
cloudsmith_cli/cli/webserver.py

Summary

Maintainability
A
35 mins
Test Coverage
F
30%
from functools import cached_property
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qsl, urlparse

import click

from ..core.api.exceptions import ApiException
from ..core.keyring import store_sso_tokens
from .saml import exchange_2fa_token


class AuthenticationWebServer(HTTPServer):
    def __init__(
        self, server_address, RequestHandlerClass, bind_and_activate=True, **kwargs
    ):
        self.api_host = kwargs.get("api_host")
        self.owner = kwargs.get("owner")
        self.debug = kwargs.get("debug", False)
        self.exception = None

        super().__init__(
            server_address, RequestHandlerClass, bind_and_activate=bind_and_activate
        )

    def finish_request(self, request, client_address):
        self.RequestHandlerClass(
            request,
            client_address,
            self,
            api_host=self.api_host,
            owner=self.owner,
            debug=self.debug,
        )

    def _handle_request_noblock(self):
        # override to allow exceptions to bubble up to the CLI
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except (  # pylint: disable=broad-exception-caught
                Exception,
                ApiException,
            ) as exc:
                self.handle_error(request, client_address)
                self.exception = exc
                self.shutdown_request(request)
            except:  # noqa: E722
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

    def handle_error(self, request, client_address):
        if self.debug:
            super().handle_error(request, client_address)

    def shutdown_request(self, request):
        super().shutdown_request(request)
        if self.exception:
            raise self.exception


class AuthenticationWebRequestHandler(BaseHTTPRequestHandler):
    def __init__(self, request, client_address, server, **kwargs):
        self.api_host = kwargs.get("api_host")
        self.owner = kwargs.get("owner")
        self.debug = kwargs.get("debug", False)

        super().__init__(request, client_address, server)

    def _return_response(self, status=200, message=None):
        self.send_response(status)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.end_headers()

        self.wfile.write(message.encode("utf-8"))

    def _return_success_response(self):
        self._return_response(
            message="Authentication complete. You may close this window."
        )

    def _return_error_response(self):
        self._return_response(
            status=500,
            message="Authentication failed. Please check output from the CLI for more details.",
        )

    def log_request(self, code="-", size="-"):
        if self.debug:
            return super().log_request(code=code, size=size)

        return

    def log_error(self, format, *args):  # pylint: disable=redefined-builtin
        if self.debug:
            return super().log_error(format, *args)

        return

    @cached_property
    def url(self):
        return urlparse(self.path)

    @cached_property
    def query_data(self):
        return dict(parse_qsl(self.url.query))

    def do_GET(self):
        access_token = self.query_data.get("access_token")
        refresh_token = self.query_data.get("refresh_token")
        two_factor_token = self.query_data.get("two_factor_token")

        try:
            if access_token:
                store_sso_tokens(self.api_host, access_token, refresh_token)

                self._return_success_response()
                return

            if two_factor_token:
                totp_token = click.prompt(
                    "Please enter your 2FA token", hide_input=True, type=str
                )

                access_token, refresh_token = exchange_2fa_token(
                    self.api_host, two_factor_token, totp_token
                )
                store_sso_tokens(self.api_host, access_token, refresh_token)

                self._return_success_response()
                return
        except Exception as exc:
            self._return_error_response()
            raise exc

        self._return_error_response()
        return