NikolaiGulatz/oauth-token-cache

View on GitHub
oauth_token_cache/token_client.py

Summary

Maintainability
A
45 mins
Test Coverage
"""TokenClient"""
import requests

from .token import Token


class TokenClient:
    """Retrieve OAuth 2.0 tokens from the token endpoint and from the redis cache.

    Args:
        client_id (str)
        client_secret (str)
        token_url (str)
        redis_client (redis.Redis)
        audience (str)
        timeout (:obj:`int`, optional)
    """

    def __init__(
        self,
        client_id=None,
        client_secret=None,
        token_url=None,
        redis_client=None,
        audience=None,
        **kwargs
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.redis_client = redis_client
        self.audience = audience
        self.timeout = kwargs.get("timeout", 5)

    def mocked_token(self, token):
        """Return a hardcoded token instead of requesting a token or reading
        it from cache.

        Args:
            token (str): The token to be returned

        Returns:
            Token: A Token instance
        """
        return Token.from_auth_provider(
            access_token=token,
            expires_in=86400,
            token_type="Bearer",
            audience=self.audience,
        )

    def fresh_token(self):
        """Obtain a fresh OAuth 2.0 token from the token endpoint.

        Returns:
            Token: A Token instance
        """
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "audience": self.audience,
        }

        response = requests.post(
            self.token_url, json=payload, allow_redirects=False, timeout=self.timeout
        )
        response.raise_for_status()
        response = response.json()

        token = Token.from_auth_provider(
            access_token=response["access_token"],
            expires_in=response["expires_in"],
            token_type=response["token_type"],
            audience=self.audience,
        )

        return self.cache_token(token)

    def cached_token(self):
        """Try to retrieve a cached token from Redis.

        Returns:
            None: Returns `None` in case of a cache miss
            Token: Returns a Token instance
        """
        values = self.redis_client.hgetall(self.cache_key)

        if not values:
            return None

        return Token.from_cache(values)

    def cache_token(self, token):
        """Persist a Token instance in redis.

        Args:
            token (Token): The token to persist.

        Returns:
            Token: The persisted token
        """
        pipeline = self.redis_client.pipeline()

        pipeline.hmset(self.cache_key, token.asdict())
        pipeline.expire(self.cache_key, token.expires_in)
        pipeline.execute()

        return token

    @property
    def cache_key(self):
        """Return the cache key for the configured client_id and audience.

        Returns:
            str: A cache key in the format `oauth_token_cache__<client_id>_<audience>`
        """
        return "oauth_token_cache__{client_id}_{audience}".format(
            client_id=self.client_id, audience=self.audience
        )