authlib/integrations/sqla_oauth2/client_mixin.py
import secrets
from sqlalchemy import Column, String, Text, Integer
from authlib.common.encoding import json_loads, json_dumps
from authlib.oauth2.rfc6749 import ClientMixin
from authlib.oauth2.rfc6749 import scope_to_list, list_to_scope
class OAuth2ClientMixin(ClientMixin):
client_id = Column(String(48), index=True)
client_secret = Column(String(120))
client_id_issued_at = Column(Integer, nullable=False, default=0)
client_secret_expires_at = Column(Integer, nullable=False, default=0)
_client_metadata = Column('client_metadata', Text)
@property
def client_info(self):
"""Implementation for Client Info in OAuth 2.0 Dynamic Client
Registration Protocol via `Section 3.2.1`_.
.. _`Section 3.2.1`: https://tools.ietf.org/html/rfc7591#section-3.2.1
"""
return dict(
client_id=self.client_id,
client_secret=self.client_secret,
client_id_issued_at=self.client_id_issued_at,
client_secret_expires_at=self.client_secret_expires_at,
)
@property
def client_metadata(self):
if 'client_metadata' in self.__dict__:
return self.__dict__['client_metadata']
if self._client_metadata:
data = json_loads(self._client_metadata)
self.__dict__['client_metadata'] = data
return data
return {}
def set_client_metadata(self, value):
self._client_metadata = json_dumps(value)
if 'client_metadata' in self.__dict__:
del self.__dict__['client_metadata']
@property
def redirect_uris(self):
return self.client_metadata.get('redirect_uris', [])
@property
def token_endpoint_auth_method(self):
return self.client_metadata.get(
'token_endpoint_auth_method',
'client_secret_basic'
)
@property
def grant_types(self):
return self.client_metadata.get('grant_types', [])
@property
def response_types(self):
return self.client_metadata.get('response_types', [])
@property
def client_name(self):
return self.client_metadata.get('client_name')
@property
def client_uri(self):
return self.client_metadata.get('client_uri')
@property
def logo_uri(self):
return self.client_metadata.get('logo_uri')
@property
def scope(self):
return self.client_metadata.get('scope', '')
@property
def contacts(self):
return self.client_metadata.get('contacts', [])
@property
def tos_uri(self):
return self.client_metadata.get('tos_uri')
@property
def policy_uri(self):
return self.client_metadata.get('policy_uri')
@property
def jwks_uri(self):
return self.client_metadata.get('jwks_uri')
@property
def jwks(self):
return self.client_metadata.get('jwks', [])
@property
def software_id(self):
return self.client_metadata.get('software_id')
@property
def software_version(self):
return self.client_metadata.get('software_version')
def get_client_id(self):
return self.client_id
def get_default_redirect_uri(self):
if self.redirect_uris:
return self.redirect_uris[0]
def get_allowed_scope(self, scope):
if not scope:
return ''
allowed = set(self.scope.split())
scopes = scope_to_list(scope)
return list_to_scope([s for s in scopes if s in allowed])
def check_redirect_uri(self, redirect_uri):
return redirect_uri in self.redirect_uris
def check_client_secret(self, client_secret):
return secrets.compare_digest(self.client_secret, client_secret)
def check_endpoint_auth_method(self, method, endpoint):
if endpoint == 'token':
return self.token_endpoint_auth_method == method
# TODO
return True
def check_response_type(self, response_type):
return response_type in self.response_types
def check_grant_type(self, grant_type):
return grant_type in self.grant_types