lepture/authlib

View on GitHub
authlib/oauth1/rfc5849/client_auth.py

Summary

Maintainability
A
0 mins
Test Coverage
import time
import base64
import hashlib
from authlib.common.security import generate_token
from authlib.common.urls import extract_params
from authlib.common.encoding import to_native
from .wrapper import OAuth1Request
from .signature import (
    SIGNATURE_HMAC_SHA1,
    SIGNATURE_PLAINTEXT,
    SIGNATURE_RSA_SHA1,
    SIGNATURE_TYPE_HEADER,
    SIGNATURE_TYPE_BODY,
    SIGNATURE_TYPE_QUERY,
)
from .signature import (
    sign_hmac_sha1,
    sign_rsa_sha1,
    sign_plaintext
)
from .parameters import (
    prepare_form_encoded_body,
    prepare_headers,
    prepare_request_uri_query,
)


# Media type of URL-encoded form bodies. The request's Content-Type is
# compared against this value to decide how the body takes part in signing.
CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded'
# Media type of multipart form bodies. Not referenced by the code visible in
# this module.
CONTENT_TYPE_MULTI_PART = 'multipart/form-data'


class ClientAuth:
    """Add OAuth 1.0 (RFC 5849) protocol parameters and a signature to a request.

    An instance holds the client credentials and signing options.
    :meth:`prepare` is the high-level entry point; :meth:`sign` performs the
    actual parameter generation, signing and rendering.
    """

    # Maps an ``oauth_signature_method`` name to the callable producing the
    # signature. The dict lives on the class, so entries added through
    # ``register_signature_method`` are shared by every class that does not
    # define its own ``SIGNATURE_METHODS``.
    SIGNATURE_METHODS = {
        SIGNATURE_HMAC_SHA1: sign_hmac_sha1,
        SIGNATURE_RSA_SHA1: sign_rsa_sha1,
        SIGNATURE_PLAINTEXT: sign_plaintext,
    }

    @classmethod
    def register_signature_method(cls, name, sign):
        """Extend client signature methods.

        :param name: A string to represent signature method.
        :param sign: A function to generate signature.

        The ``sign`` function accepts 2 parameters, the ``ClientAuth``
        instance and the ``OAuth1Request`` being signed::

            def custom_sign_method(client, request):
                # client is the ClientAuth instance.
                return 'your-signed-string'

            ClientAuth.register_signature_method('custom-name', custom_sign_method)
        """
        cls.SIGNATURE_METHODS[name] = sign

    def __init__(self, client_id, client_secret=None,
                 token=None, token_secret=None,
                 redirect_uri=None, rsa_key=None, verifier=None,
                 signature_method=SIGNATURE_HMAC_SHA1,
                 signature_type=SIGNATURE_TYPE_HEADER,
                 realm=None, force_include_body=False):
        """Store the credentials and signing options.

        :param client_id: Sent as ``oauth_consumer_key``.
        :param client_secret: Stored on the instance; not read by this class
            (presumably consumed by the signing functions, which receive
            the instance).
        :param token: Sent as ``oauth_token`` when truthy.
        :param token_secret: Stored on the instance; not read by this class.
        :param redirect_uri: Sent as ``oauth_callback`` when truthy.
        :param rsa_key: Stored on the instance; not read by this class.
        :param verifier: Sent as ``oauth_verifier`` when truthy.
        :param signature_method: Key into ``SIGNATURE_METHODS``; also sent as
            ``oauth_signature_method``.
        :param signature_type: Where the protocol parameters are placed:
            header, body or query.
        :param realm: Passed to ``prepare_headers`` for header signatures.
        :param force_include_body: When true, :meth:`prepare` signs requests
            with their body even if it is not form encoded.
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = token
        self.token_secret = token_secret
        self.redirect_uri = redirect_uri
        self.signature_method = signature_method
        self.signature_type = signature_type
        self.rsa_key = rsa_key
        self.verifier = verifier
        self.realm = realm
        self.force_include_body = force_include_body

    def get_oauth_signature(self, method, uri, headers, body):
        """Get an OAuth signature to be used in signing a request

        To satisfy `section 3.4.1.2`_ item 2, if the request argument's
        headers dict attribute contains a Host item, its value will
        replace any netloc part of the request argument's uri attribute
        value.

        :param method: HTTP method of the request.
        :param uri: URI of the HTTP request.
        :param headers: Headers of the HTTP request.
        :param body: Body payload of the HTTP request.
        :return: Whatever the registered signing callable returns.
        :raises ValueError: If ``self.signature_method`` has no registered
            signing callable.

        .. _`section 3.4.1.2`: https://tools.ietf.org/html/rfc5849#section-3.4.1.2
        """
        sign = self.SIGNATURE_METHODS.get(self.signature_method)
        if not sign:
            raise ValueError('Invalid signature method.')

        request = OAuth1Request(method, uri, body=body, headers=headers)
        return sign(self, request)

    def get_oauth_params(self, nonce, timestamp):
        """Build the list of ``(name, value)`` OAuth protocol parameters.

        The five base parameters are always present; ``oauth_token``,
        ``oauth_callback`` and ``oauth_verifier`` are appended only when the
        matching instance attribute is truthy. ``oauth_signature`` is not
        included here.

        :param nonce: Value for ``oauth_nonce``.
        :param timestamp: Value for ``oauth_timestamp``.
        :return: A new list of 2-tuples.
        """
        oauth_params = [
            ('oauth_nonce', nonce),
            ('oauth_timestamp', timestamp),
            ('oauth_version', '1.0'),
            ('oauth_signature_method', self.signature_method),
            ('oauth_consumer_key', self.client_id),
        ]
        if self.token:
            oauth_params.append(('oauth_token', self.token))
        if self.redirect_uri:
            oauth_params.append(('oauth_callback', self.redirect_uri))
        if self.verifier:
            oauth_params.append(('oauth_verifier', self.verifier))
        return oauth_params

    def _render(self, uri, headers, body, oauth_params):
        """Place ``oauth_params`` into the request according to the signature type.

        :param uri: URI of the HTTP request.
        :param headers: Headers mapping; for the body signature type its
            ``Content-Type`` entry is overwritten in place.
        :param body: Body payload of the HTTP request.
        :param oauth_params: List of ``(name, value)`` protocol parameters.
        :return: ``(uri, headers, body)`` with the parameters applied.
        :raises ValueError: If ``self.signature_type`` is not one of the
            header, body or query types.
        """
        if self.signature_type == SIGNATURE_TYPE_HEADER:
            headers = prepare_headers(oauth_params, headers, realm=self.realm)
        elif self.signature_type == SIGNATURE_TYPE_BODY:
            # Only form-encoded bodies can carry the parameters; for any other
            # Content-Type the request is returned unchanged.
            if CONTENT_TYPE_FORM_URLENCODED in headers.get('Content-Type', ''):
                decoded_body = extract_params(body) or []
                body = prepare_form_encoded_body(oauth_params, decoded_body)
                headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
        elif self.signature_type == SIGNATURE_TYPE_QUERY:
            uri = prepare_request_uri_query(oauth_params, uri)
        else:
            raise ValueError('Unknown signature type specified.')
        return uri, headers, body

    def sign(self, method, uri, headers, body):
        """Sign the HTTP request, add OAuth parameters and signature.

        :param method: HTTP method of the request.
        :param uri:  URI of the HTTP request.
        :param headers: Headers of the HTTP request; ``None`` is treated as
            an empty dict.
        :param body: Body payload of the HTTP request; ``None`` is treated as
            an empty byte string.
        :return: uri, headers, body
        :raises ValueError: If the signature method or signature type is
            unknown.
        """
        nonce = generate_nonce()
        # ``str()`` keeps the value textual even if ``generate_timestamp`` is
        # replaced by an implementation returning an int.
        timestamp = str(generate_timestamp())
        if body is None:
            body = b''
        if headers is None:
            headers = {}

        oauth_params = self.get_oauth_params(nonce, timestamp)

        # https://datatracker.ietf.org/doc/html/draft-eaton-oauth-bodyhash-00.html
        # include oauth_body_hash for non form-encoded bodies. A containment
        # test (as in ``_render``/``prepare``) is used so that a header such
        # as "application/x-www-form-urlencoded; charset=utf-8" still counts
        # as form encoded.
        content_type = to_native(headers.get('Content-Type') or '')
        if body and CONTENT_TYPE_FORM_URLENCODED not in content_type:
            # hashlib only accepts bytes-like input; encode text payloads.
            raw_body = body.encode('utf-8') if isinstance(body, str) else body
            digest = hashlib.sha1(raw_body).digest()
            oauth_body_hash = base64.b64encode(digest).decode('utf-8')
            oauth_params.append(('oauth_body_hash', oauth_body_hash))

        # Render once without the signature to obtain the request that gets
        # signed.
        unsigned_uri, unsigned_headers, unsigned_body = self._render(
            uri, headers, body, oauth_params)
        sig = self.get_oauth_signature(
            method, unsigned_uri, unsigned_headers, unsigned_body)
        oauth_params.append(('oauth_signature', sig))

        # Render the final request from the ORIGINAL uri/body, not from the
        # unsigned rendering, so a helper that appends parameters cannot add
        # the protocol parameters a second time.
        return self._render(uri, headers, body, oauth_params)

    def prepare(self, method, uri, headers, body):
        """Add OAuth parameters to the request.

        Parameters may be included from the body if the content-type is
        urlencoded, if no content type is set, a guess is made.

        :param method: HTTP method of the request.
        :param uri: URI of the HTTP request.
        :param headers: Headers of the HTTP request; ``None`` is treated as
            an empty dict. ``Content-Type`` may be overwritten in place.
        :param body: Body payload of the HTTP request.
        :return: ``(uri, headers, body)``. When the body is neither form
            encoded nor force-included, the request is signed without it and
            the returned body is ``b''``.
        """
        if headers is None:
            # ``sign`` accepts missing headers; accept them here as well
            # instead of failing on ``None.get``.
            headers = {}

        content_type = to_native(headers.get('Content-Type', ''))
        if self.signature_type == SIGNATURE_TYPE_BODY:
            # Body signatures require a form-encoded body.
            content_type = CONTENT_TYPE_FORM_URLENCODED
        elif not content_type and extract_params(body):
            # No declared type, but the body parses as parameters.
            content_type = CONTENT_TYPE_FORM_URLENCODED

        if CONTENT_TYPE_FORM_URLENCODED in content_type:
            headers['Content-Type'] = CONTENT_TYPE_FORM_URLENCODED
            uri, headers, body = self.sign(method, uri, headers, body)
        elif self.force_include_body:
            # To allow custom clients to work on non form encoded bodies.
            uri, headers, body = self.sign(method, uri, headers, body)
        else:
            # Omit body data in the signing of non form-encoded requests.
            # NOTE(review): the empty body is also what gets returned, so
            # callers presumably keep their original payload themselves --
            # confirm against the integrations before changing this.
            uri, headers, _ = self.sign(method, uri, headers, b'')
            body = b''
        return uri, headers, body


def generate_nonce():
    """Return a new token from ``generate_token`` (default arguments).

    ``ClientAuth.sign`` uses the result as the ``oauth_nonce`` value.
    """
    nonce = generate_token()
    return nonce


def generate_timestamp():
    """Return the current Unix time, truncated to whole seconds, as a string."""
    now = time.time()
    whole_seconds = int(now)
    return str(whole_seconds)