dino/auth/redis.py
import logging
import traceback
from datetime import datetime
import redis
from typing import Union
from zope.interface import implementer
from dino.auth import IAuth
from dino.config import ConfigKeys
from dino.config import RedisKeys
from dino.config import SessionKeys
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@implementer(IAuth)
class AuthRedis(object):
    """Redis-backed implementation of the ``IAuth`` interface.

    A user's session is the Redis hash stored under
    ``RedisKeys.auth_key(user_id)``; it is read with ``HGETALL`` and updated
    field by field with ``HSET``.
    """

    def __init__(self, host: str, port: int = 6379, db: int = 0, env=None):
        """Set up either a real Redis connection pool or an in-memory fake.

        :param host: Redis host; the special value ``'mock'`` selects
            ``fakeredis`` instead of a real connection pool.
        :param port: Redis port.
        :param db: Redis database index.
        :param env: environment object exposing ``config`` and ``cache``;
            when ``None`` the global ``dino.environ.env`` is used.
        """
        # Resolve the environment first: the testing check below reads
        # env.config, so env must not be None by then.
        if env is None:
            from dino import environ
            env = environ.env
        self.env = env

        if self.env.config.get(ConfigKeys.TESTING, False) or host == 'mock':
            from fakeredis import FakeStrictRedis
            self.redis_pool = None
            self.redis_instance = FakeStrictRedis(host=host, port=port, db=db)
        else:
            self.redis_pool = redis.ConnectionPool(host=host, port=port, db=db)
            self.redis_instance = None

    @property
    def redis(self):
        """Return the Redis client to use.

        Without a connection pool (testing/mock mode) this is the fake
        instance created in ``__init__``; otherwise a client bound to the
        shared connection pool.
        """
        if self.redis_pool is None:
            return self.redis_instance
        return redis.Redis(connection_pool=self.redis_pool)

    @staticmethod
    def _decode_session(raw_session: dict) -> dict:
        """Return a copy of a Redis hash with bytes keys/values decoded as UTF-8."""
        def as_text(item):
            return str(item, 'utf-8') if isinstance(item, bytes) else item

        return {as_text(field): as_text(value) for field, value in raw_session.items()}

    @staticmethod
    def _tokens_match(stored_token, supplied_token) -> bool:
        """Compare two tokens in constant time; non-string tokens never match."""
        import hmac

        if not isinstance(stored_token, str) or not isinstance(supplied_token, str):
            # Covers a session hash that has no token field (stored_token is None).
            return False
        # Compare as bytes: compare_digest() rejects non-ASCII str arguments.
        # 'surrogatepass' keeps odd user input from raising during encoding.
        return hmac.compare_digest(
            stored_token.encode('utf-8', 'surrogatepass'),
            supplied_token.encode('utf-8', 'surrogatepass'))

    def get_user_info(self, user_id: str, skip_cache: bool = False) -> dict:
        """Return the stored session fields for a user, minus token and user id.

        :param user_id: id of the user whose session should be fetched.
        :param skip_cache: when true, always read from Redis instead of
            first consulting ``env.cache``.
        :return: dict of decoded session fields; empty if nothing is stored.
        """
        redis_key = RedisKeys.auth_key(user_id)

        if not skip_cache:
            cached = self.env.cache.get_user_info(user_id)
            if cached is not None and len(cached):
                return cached

        # The token and the user id are deliberately left out of the result.
        hidden_fields = (SessionKeys.token.value, SessionKeys.user_id.value)
        decoded = self._decode_session(self.redis.hgetall(redis_key))
        stored_session = {
            field: value
            for field, value in decoded.items()
            if field not in hidden_fields
        }

        self.env.cache.set_user_info(user_id, stored_session)
        return stored_session

    def update_session_for_key(
            self, user_id: str, session_key: str, session_value: Union[str, bool, datetime]) -> None:
        """Set a single field in the user's session hash (best effort).

        ``datetime`` values are stored as their POSIX timestamp and ``bool``
        values as ``0``/``1``. Failures are logged, never raised.

        :param user_id: id of the user whose session should be updated.
        :param session_key: name of the hash field to set.
        :param session_value: value to store.
        """
        redis_key = RedisKeys.auth_key(user_id)
        try:
            if isinstance(session_value, datetime):
                session_value = session_value.timestamp()
            elif isinstance(session_value, bool):
                session_value = int(session_value)
            self.redis.hset(redis_key, session_key, session_value)
        except Exception as e:
            # logger.exception() appends the active traceback itself; calling
            # traceback.format_exc(e) would pass the exception as `limit` and
            # raise TypeError from inside this handler.
            logger.exception(
                'could not update session for user %s; key "%s", value "%s": %s',
                user_id, session_key, session_value, str(e))

    def authenticate_and_populate_session(self, user_id: str, supplied_token: str) -> (bool, Union[None, str], Union[None, dict]):
        """Validate a user's token against Redis and build the session dict.

        :param user_id: id of the user trying to authenticate.
        :param supplied_token: token presented by the user.
        :return: ``(is_valid, error_message, session)``. On success this is
            ``(True, None, session)``, where ``session`` holds every
            ``SessionKeys`` entry that has a non-empty string value in the
            stored hash. On failure it is ``(False, reason, None)``.
        """
        if user_id is None or len(user_id) == 0:
            return False, 'no user_id supplied', None
        if supplied_token is None or len(supplied_token) == 0:
            return False, 'no token supplied', None

        redis_key = RedisKeys.auth_key(user_id)
        stored_session = self._decode_session(self.redis.hgetall(redis_key))
        if not stored_session:
            return False, 'no session found for this user id, not logged in yet', None

        stored_token = stored_session.get(SessionKeys.token.value)
        if not self._tokens_match(stored_token, supplied_token):
            # Tokens are credentials, so neither one is written to the log.
            logger.warning(
                'user "%s" supplied a token that does not match the stored token', user_id)
            return False, 'invalid token "%s" supplied for user id "%s"' % (supplied_token, user_id), None

        session = dict()
        for session_key in SessionKeys:
            field = session_key.value
            if not isinstance(field, str):
                continue
            session_value = stored_session.get(field)
            # Only non-empty string values are copied into the session.
            if isinstance(session_value, str) and len(session_value) > 0:
                session[field] = session_value
        return True, None, session