# api/auth.py
from flask import g, current_app, request
from cryptography.fernet import Fernet, InvalidToken
from api.models import User, db
from datetime import datetime
from functools import wraps
import json
from api.time_management import get_current_datetime
from api.error import error_response
class UserNotFoundError(Exception):
    """Raised when no user matches the identifier that was looked up."""
class UserDisabledError(Exception):
    """Raised when the user exists but its id has been disabled."""
def generate_token(obj):
    """Encrypt ``obj`` together with its issue time and return the token.

    The payload is a JSON object with two keys: ``issued_at`` (the current
    POSIX timestamp) and ``data`` (``obj`` itself). It is encrypted with
    Fernet using the application's ``SECRET_KEY``.

    Args:
        obj (dict): the data to encrypt; it is passed through
            ``json.dumps`` so it has to be JSON-serializable
    Return:
        token (bytes): encrypted token
    """
    cipher = Fernet(current_app.config['SECRET_KEY'])
    issued_at = datetime.now().timestamp()
    payload = json.dumps({'issued_at': issued_at, 'data': obj})
    return cipher.encrypt(payload.encode())
def decrypt_token(token):
    """Decrypt a token and return the JSON payload it carries.

    Args:
        token (str): encrypted token.
    Return:
        decrypted data (dict): the decoded JSON contents, or ``None`` when
            Fernet rejects the token with ``InvalidToken``
    """
    cipher = Fernet(current_app.config['SECRET_KEY'])
    try:
        plaintext = cipher.decrypt(token.encode())
    except InvalidToken:
        # Rejected by Fernet: signal "no data" to the caller.
        return None
    else:
        return json.loads(plaintext.decode())
def login_required(*required_authority):
    """
    a decorator to require login

    The wrapped view only runs when the request carries an
    ``Authorization: Bearer <token>`` header whose token decrypts
    successfully and maps to a usable user. On success the token's ``data``
    payload is stored in ``g.token_data`` before the view is called.
    Otherwise the result of ``error_response(code)`` is returned, with a
    ``WWW-Authenticate`` header attached.

    Args:
        *required_authority (str): required authorities
            if this is blank, no requirement of authority
    """
    def login_required_impl(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            def auth_error(code, headm=None):
                # Build the error response and, when given, advertise the
                # reason through the WWW-Authenticate header.
                resp, http_code = error_response(code)
                if headm:
                    resp.headers['WWW-Authenticate'] = 'Bearer ' + headm
                return resp, http_code

            # Reject every request once the configured end time is reached.
            now = get_current_datetime()
            end = current_app.config['END_DATETIME']
            if end <= now:
                return auth_error(18, 'realm="not acceptable"')

            # check form of request header
            if 'Authorization' not in request.headers:
                return auth_error(0, 'realm="token_required"')
            auth = request.headers['Authorization'].split()
            # The header must be exactly "<scheme> <token>" with a bearer
            # scheme. Checking the length first also covers an empty or
            # whitespace-only header, where split() returns [] and indexing
            # auth[0] would raise IndexError.
            if len(auth) != 2 or auth[0].lower() != 'bearer':
                return auth_error(4, 'error="invalid_request"')

            token = auth[1]
            data = decrypt_token(token)
            if not data:
                return auth_error(0, 'error="invalid_token"')

            try:
                user = todays_user(user_id=data['data']['user_id'])
            except UserNotFoundError:
                return auth_error(0, 'realm="invalid_token"')
            except UserDisabledError:
                return auth_error(0, 'realm="id_disabled"')

            # An empty required_authority tuple means any user is allowed.
            if required_authority and \
                    (user.authority not in required_authority):
                return auth_error(15, 'error="insufficient_scope"')

            g.token_data = data['data']
            return f(*args, **kwargs)
        return decorated_function
    return login_required_impl
def todays_user(secret_id='', user_id=''):
    """confirm the user id isn't used in other day
        and return `User` object

    The user is looked up by `secret_id` when it is given, otherwise by
    `user_id`. Users whose `kind` is not listed in the `ONE_DAY_KIND`
    config entry are returned without any date check. For the other users
    the date of the first access is recorded (and committed) on first use,
    and the user is only accepted on that same date afterwards.

    Args:
        secret_id (str): secret id of target user
        user_id: primary key of target user, used only when
            `secret_id` is empty
    Return:
        User (api.models.User): the user object that was looked up
    Exceptions:
        UserNotFoundError : when user was not found in DB, or when
            neither `secret_id` nor `user_id` was given
        UserDisabledError : when user was disabled
    References are here:
        https://github.com/Sakuten/backend/issues/78#issuecomment-416609508
    """
    # Start from "not found" so that a call with no usable identifier
    # raises UserNotFoundError instead of UnboundLocalError.
    user = None
    if secret_id:
        user = User.query.filter_by(secret_id=secret_id).first()
    elif user_id:
        user = User.query.get(user_id)
    if not user:
        raise UserNotFoundError()

    if user.kind not in current_app.config['ONE_DAY_KIND']:
        return user

    today = get_current_datetime().date()
    if user.first_access is None:
        # First use of this id: remember the day it was first accessed.
        user.first_access = today
        db.session.add(user)
        db.session.commit()
        return user
    if user.first_access == today:
        return user
    raise UserDisabledError()