dino/cache/redis.py
import json
import logging
import traceback
import random
import sys
import socket
from typing import Set
import pytz
from zope.interface import implementer
from typing import Union, List, Optional
from typing import Dict
from typing import Tuple
from dino.config import RedisKeys
from dino.config import ConfigKeys
from dino.config import UserKeys
from dino.config import RoleKeys
from dino.cache import ICache
from datetime import datetime
from datetime import timedelta
import redis
__author__ = 'Oscar Eriksson <oscar.eriks@gmail.com>'
from dino.utils import activity_for_user_joined, activity_for_status_change, split_into_chunks
EIGHT_HOURS_IN_SECONDS = 8*60*60
TEN_MINUTES = 10*60
FIVE_MINUTES = 5*60
ONE_MINUTE = 60
THIRTY_SECONDS = 30
ONE_HOUR = 60*60
TEN_SECONDS = 10
SEVEN_DAYS = 7 * 24 * ONE_HOUR
LONG_AGO = 789000000 # january 1995
logger = logging.getLogger(__name__)
class MemoryCache(object):
def __init__(self):
self.vals = dict()
def set(self, key, value, ttl=30):
try:
expires_at = (datetime.utcnow() + timedelta(seconds=ttl)).timestamp()
self.vals[key] = (expires_at, value)
except:
pass
def get(self, key):
try:
if key not in self.vals:
return None
expires_at, value = self.vals[key]
now = datetime.utcnow().timestamp()
if now > expires_at:
del self.vals[key]
return None
return value
except:
return None
def cleanup(self):
"""
avoid in-memory cache buildup, since keys are only deleted if
the expiry time has passed when accessing them
"""
# skip cleaning up keys that expire <30 seconds from now, usually
# they will be cleaned up normally through being accessed, or else
# they'll be cleaned up next time this method is called
now = (datetime.utcnow() + timedelta(seconds=31)).timestamp()
# in case it gets modified while we iterate
values = self.vals.copy()
n_keys_before = len(values)
for key, (expires_at, _) in values.items():
if now > expires_at:
try:
del self.vals[key]
except KeyError:
# already deleted since we copied the dict
pass
n_keys_after = len(self.vals)
logger.info(f"cleaned up {n_keys_before}-{n_keys_after}={n_keys_before-n_keys_after} in-memory keys")
def delete(self, key):
if key in self.vals:
del self.vals[key]
def flushall(self):
self.vals = dict()
@implementer(ICache)
class CacheRedis(object):
def __init__(self, env, host: str, port: int = 6379, db: int = 0):
if env.config.get(ConfigKeys.TESTING, False) or host == 'mock':
from fakeredis import FakeStrictRedis
self.redis_pool = None
self.redis_instance = FakeStrictRedis(host=host, port=port, db=db)
else:
self.redis_pool = redis.ConnectionPool(host=host, port=port, db=db)
self.redis_instance = None
self.cache = MemoryCache()
self.env = env
self.status_topic = None
# only need status changes tracked if this is a wio node
if 'wio' in env.config.get(ConfigKeys.ENVIRONMENT, 'default'):
self.status_topic = self.env.config.get(ConfigKeys.STATUS_QUEUE, domain=ConfigKeys.EXTERNAL_QUEUE)
logger.info("status topic is: {}".format(self.status_topic))
args = sys.argv
for a in ['--bind', '-b']:
bind_arg_pos = [i for i, x in enumerate(args) if x == a]
if len(bind_arg_pos) > 0:
bind_arg_pos = bind_arg_pos[0]
break
self.listen_port = 'standalone'
if bind_arg_pos is not None and not isinstance(bind_arg_pos, list):
self.listen_port = args[bind_arg_pos + 1].split(':')[1]
self.listen_host = socket.gethostname().split('.')[0]
@property
def redis(self):
if self.redis_pool is None:
return self.redis_instance
return redis.Redis(connection_pool=self.redis_pool)
def _flushall(self) -> None:
self.redis.flushdb()
self.cache.flushall()
def _set(self, key, val, ttl=None) -> None:
if ttl is None:
self.cache.set(key, val)
else:
self.cache.set(key, val, ttl=ttl)
def _get(self, key):
return self.cache.get(key)
def _del(self, key) -> None:
self.cache.delete(key)
def get_all_permanent_rooms(self):
key = RedisKeys.all_permanent_rooms()
rooms = self.redis.get(key)
if rooms is None or len(rooms) == 0:
return None
return str(rooms, 'utf-8').split(',')
def set_all_permanent_rooms(self, rooms):
rooms_str = ','.join(rooms)
key = RedisKeys.all_permanent_rooms()
self.redis.set(key, rooms_str)
self.redis.expire(key, FIVE_MINUTES)
def reset_room_acls_for_action(self, action: str) -> None:
key = RedisKeys.rooms_with_action(action)
self.redis.delete(key)
def get_room_acls_for_action(self, action: str) -> Union[None, Dict[str, Dict[str, str]]]:
key = RedisKeys.rooms_with_action(action)
room_ids_bytes = self.redis.get(key)
if room_ids_bytes is None:
return None
room_ids_str = str(room_ids_bytes, 'utf-8')
room_ids = room_ids_str.split(',')
if len(room_ids) == 0:
return None
room_acls = dict()
for room_id in room_ids:
key = RedisKeys.room_acls_for_action(room_id, action)
acls = self.redis.hgetall(key)
str_acls = dict()
for acl_type, acl_value in acls.items():
str_acls[str(acl_type, 'utf-8')] = str(acl_value, 'utf-8')
room_acls[room_id] = str_acls
return room_acls
def set_room_acls_for_action(self, action: str, acls: Dict[str, Dict[str, str]]) -> None:
for room_id, values in acls.items():
key = RedisKeys.room_acls_for_action(room_id, action)
self.redis.hmset(key, values)
self.redis.expire(key, TEN_MINUTES)
room_ids = list(acls.keys())
key = RedisKeys.rooms_with_action(action)
# avoid race condition that could happen if we used lists instead; after clearing a
# list and before filling it with this updated list of room ids, another client
# might be querying redis and getting 0 results
room_ids_str = ','.join(room_ids)
self.redis.set(key, room_ids_str)
self.redis.expire(key, TEN_MINUTES)
def add_heartbeat(self, user_id: str) -> None:
redis_key = RedisKeys.heartbeat_user(user_id)
self.redis.set(redis_key, user_id)
self.redis.expire(redis_key, ONE_MINUTE)
def check_heartbeat(self, user_id: str) -> bool:
exists = self.has_heartbeat(user_id)
if exists:
self.add_heartbeat(user_id) # will reset the ttl
return exists
def has_heartbeat(self, user_id: str) -> bool:
redis_key = RedisKeys.heartbeat_user(user_id)
return self.redis.exists(redis_key) == 1
def get_rooms_for_user(self, user_id: str):
clean_rooms = dict()
rooms = self.redis.hgetall(RedisKeys.rooms_for_user(user_id))
if rooms is None or len(rooms) == 0:
return clean_rooms
for room_id, room_name in rooms.items():
room_id, room_name = str(room_id, 'utf-8'), str(room_name, 'utf-8')
clean_rooms[room_id] = room_name
return clean_rooms
def set_rooms_for_user(self, user_id: str, rooms: dict):
"""
set the room uuids the user is in
:param user_id: the uuid of the user
:param rooms: a dict of rooms the user is in now {room_uuid: room_name}
:return: nothing
"""
redis_key = RedisKeys.rooms_for_user(user_id)
if rooms is None or len(rooms) == 0:
self.redis.delete(redis_key)
else:
self.redis.hmset(redis_key, rooms)
self.redis.expire(redis_key, int(2*TEN_SECONDS + random.random()*TEN_SECONDS))
def remove_rooms_for_user(self, user_id: str) -> None:
redis_key = RedisKeys.rooms_for_user(user_id)
self.redis.delete(redis_key)
def leave_room_for_user(self, user_id: str, room_id: str) -> None:
redis_key = RedisKeys.rooms_for_user(user_id)
self.redis.hdel(redis_key, room_id)
def is_user_in_room(self, user_id: str, room_id: str):
return self.redis.hexists(RedisKeys.rooms_for_user(user_id), room_id)
def set_user_in_room(self, user_id: str, room_id: str, room_name: str):
return self.redis.hset(RedisKeys.rooms_for_user(user_id), room_id, room_name)
def set_type_of_rooms_in_channel(self, channel_id: str, object_type: str) -> None:
cache_key = RedisKeys.room_types_in_channel(channel_id)
self.cache.set(cache_key, object_type, ttl=int(ONE_MINUTE + random.random()*ONE_MINUTE))
def get_type_of_rooms_in_channel(self, channel_id: str) -> str:
cache_key = RedisKeys.room_types_in_channel(channel_id)
return self.cache.get(cache_key)
def set_is_room_ephemeral(self, room_id: str, is_ephemeral: bool) -> None:
redis_key = RedisKeys.non_ephemeral_rooms()
cache_key = '%s-%s' % (redis_key, room_id)
self.cache.set(cache_key, is_ephemeral)
def is_room_ephemeral(self, room_id: str) -> bool:
redis_key = RedisKeys.non_ephemeral_rooms()
cache_key = '%s-%s' % (redis_key, room_id)
return self.cache.get(cache_key)
def set_default_rooms(self, rooms: list) -> None:
cache_key = RedisKeys.default_rooms()
self.cache.set(cache_key, rooms, ttl=FIVE_MINUTES)
def clear_default_rooms(self) -> None:
redis_key = RedisKeys.default_rooms()
self.cache.delete(redis_key)
def get_default_rooms(self) -> list:
redis_key = RedisKeys.default_rooms()
value = self.cache.get(redis_key)
if value is not None:
return value
rooms = self.redis.smembers(redis_key)
if rooms is not None and len(rooms) > 0:
rooms = [str(room, 'utf-8') for room in rooms]
self.cache.set(redis_key, rooms, ttl=FIVE_MINUTES)
return rooms
return None
def get_default_channel_id(self) -> Optional[str]:
key = RedisKeys.default_channel_id()
default_channel_id = self.cache.get(key)
if default_channel_id is not None:
return default_channel_id
default_channel_id = self.redis.get(key)
if default_channel_id is None:
return None
default_channel_id = str(default_channel_id, 'utf-8')
self.cache.set(key, default_channel_id)
return default_channel_id
def set_default_channel_id(self, channel_id: str) -> None:
key = RedisKeys.default_channel_id()
self.cache.set(key, channel_id)
self.redis.set(key, channel_id)
def get_black_list(self) -> set:
cache_key = RedisKeys.black_list()
value = self.cache.get(cache_key)
if value is not None:
return value
values = self.redis.smembers(cache_key)
if value is not None:
decoded = {str(v, 'utf-8') for v in values}
self.cache.set(cache_key, decoded, ttl=TEN_MINUTES)
return decoded
return None
def reset_black_list(self) -> None:
cache_key = RedisKeys.black_list()
self.cache.delete(cache_key)
self.redis.delete(cache_key)
def set_black_list(self, the_list: set) -> None:
cache_key = RedisKeys.black_list()
self.cache.set(cache_key, the_list, ttl=TEN_MINUTES)
self.redis.delete(cache_key)
self.redis.sadd(cache_key, *the_list)
def remove_from_black_list(self, word: str) -> None:
cache_key = RedisKeys.black_list()
the_cached_list = self.cache.get(cache_key)
the_cached_list.remove(word)
self.cache.set(cache_key, the_cached_list, ttl=TEN_MINUTES)
self.redis.srem(cache_key, word)
def add_to_black_list(self, word: str) -> None:
cache_key = RedisKeys.black_list()
the_cached_list = self.cache.get(cache_key)
the_cached_list.add(word)
self.cache.set(cache_key, the_cached_list, ttl=TEN_MINUTES)
self.redis.sadd(cache_key, word)
def _set_memory_cache_and_hset(self, key: str, user_id: str, timestamp: str) -> None:
cache_key = '%s-%s' % (key, user_id)
self.cache.set(cache_key, timestamp)
self.redis.hset(key, user_id, timestamp)
def set_global_ban_timestamp(self, user_id: str, duration: str, timestamp: str, username: str) -> None:
key = RedisKeys.banned_users()
self._set_memory_cache_and_hset(key, user_id, '%s|%s|%s' % (duration, timestamp, username))
def set_channel_ban_timestamp(self, channel_id: str, user_id: str, duration: str, timestamp: str, username: str) -> None:
key = RedisKeys.banned_users_channel(channel_id)
self._set_memory_cache_and_hset(key, user_id, '%s|%s|%s' % (duration, timestamp, username))
def set_room_ban_timestamp(self, room_id: str, user_id: str, duration: str, timestamp: str, username: str) -> None:
key = RedisKeys.banned_users(room_id)
self._set_memory_cache_and_hset(key, user_id, '%s|%s|%s' % (duration, timestamp, username))
def set_room_mute_timestamp(self, room_id: str, user_id: str, duration: str, timestamp: str) -> None:
key = RedisKeys.muted_users(room_id)
self._set_memory_cache_and_hset(key, user_id, '%s|%s' % (duration, timestamp))
def get_user_roles(self, user_id: str) -> None:
key = RedisKeys.user_roles()
redis_key = '%s-%s' % (key, user_id)
cache_key = '%s-%s' % (key, user_id)
value = self.cache.get(cache_key)
if value is not None:
return value
value = self.redis.get(redis_key)
if value is not None:
value = json.loads(str(value, 'utf-8'))
self.cache.set(cache_key, value, ttl=int(FIVE_MINUTES + random.random()*FIVE_MINUTES))
return value
def set_user_roles(self, user_id: str, roles: dict) -> None:
key = RedisKeys.user_roles()
redis_key = '%s-%s' % (key, user_id)
self.redis.set(redis_key, json.dumps(roles))
self.redis.expire(redis_key, TEN_MINUTES)
self.cache.set(redis_key, roles, ttl=int(FIVE_MINUTES + random.random()*FIVE_MINUTES))
def reset_user_roles(self, user_id: str) -> None:
key = RedisKeys.user_roles()
redis_key = '%s-%s' % (key, user_id)
self.redis.delete(redis_key)
self.cache.delete(redis_key)
def get_admin_room(self) -> Union[str, None]:
key = RedisKeys.admin_room()
value = self.cache.get(key)
if value is not None:
return value
room_id = self.redis.get(key)
if room_id is None or len(str(room_id, 'utf-8').strip()) == 0:
return None
room_id = str(room_id, 'utf-8')
self.cache.set(key, room_id, ttl=TEN_MINUTES)
return room_id
def set_admin_room(self, room_id: str) -> None:
key = RedisKeys.admin_room()
self.redis.set(key, room_id)
self.cache.set(key, room_id, ttl=TEN_MINUTES)
def remove_admin_room(self) -> None:
key = RedisKeys.admin_room()
self.redis.delete(key)
self.cache.delete(key)
def _get_mute_timestamp(self, key: str, user_id: str) -> (str, str):
cache_key = '%s-%s' % (key, user_id)
value = self.cache.get(cache_key)
if value is not None:
return value.split('|', 1)
ban_info = self.redis.hget(key, user_id)
if ban_info is None:
return None, None
ban_info = str(ban_info, 'utf-8')
return ban_info.split('|', 1)
def _get_ban_timestamp(self, key: str, user_id: str) -> (str, str, str):
cache_key = '%s-%s' % (key, user_id)
value = self.cache.get(cache_key)
if value is not None:
return value.split('|', 2)
ban_info = self.redis.hget(key, user_id)
if ban_info is None:
return None, None, None
ban_info = str(ban_info, 'utf-8')
return ban_info.split('|', 2)
def get_global_ban_timestamp(self, user_id: str) -> str:
key = RedisKeys.banned_users()
return self._get_ban_timestamp(key, user_id)
def reset_rooms_for_channel(self, channel_id: str) -> None:
key_with_info = RedisKeys.rooms_for_channel_with_info(channel_id)
key_without_info = RedisKeys.rooms_for_channel_without_info(channel_id)
self.cache.delete(key_with_info)
self.cache.delete(key_without_info)
self.redis.delete(key_with_info)
self.redis.delete(key_without_info)
def get_rooms_for_channel(self, channel_id: str, with_info: bool = True) -> dict:
"""
rooms_with_n_users[room_id] = {
'name': all_rooms[room_id]['name'],
'sort_order': all_rooms[room_id]['sort_order'],
'ephemeral': all_rooms[room_id]['ephemeral'],
'admin': all_rooms[room_id]['admin'],
'users': len(visible_users)
}
"""
if with_info:
return self._get_rooms_for_channel_with_info(channel_id)
else:
return self._get_rooms_for_channel_without_info(channel_id)
def _get_rooms_for_channel_without_info(self, channel_id: str) -> dict:
"""
room.uuid: {
'ephemeral': room.ephemeral,
'name': room.name
}
"""
key = RedisKeys.rooms_for_channel_without_info(channel_id)
rooms = self.cache.get(key)
if rooms is not None:
return rooms
raw_rooms = self.redis.hgetall(key)
if raw_rooms is None or len(raw_rooms) == 0:
return None
clean_rooms = dict()
for room_id, room_info in raw_rooms.items():
room_id = str(room_id, 'utf8')
room_info = str(room_info, 'utf8')
room_ephemeral, room_name = room_info.split('|', maxsplit=1)
if room_ephemeral.lower() in {'', 'true'}:
room_ephemeral = True
else:
room_ephemeral = False
clean_rooms[room_id] = {
'name': room_name,
'ephemeral': room_ephemeral
}
return clean_rooms
def _get_rooms_for_channel_with_info(self, channel_id: str) -> dict:
"""
rooms_with_n_users[room_id] = {
'name': all_rooms[room_id]['name'],
'sort_order': all_rooms[room_id]['sort_order'],
'ephemeral': all_rooms[room_id]['ephemeral'],
'admin': all_rooms[room_id]['admin'],
'users': len(visible_users)
}
"""
key = RedisKeys.rooms_for_channel_with_info(channel_id)
rooms = self.cache.get(key)
if rooms is not None:
return rooms
raw_rooms = self.redis.hgetall(key)
if raw_rooms is None or len(raw_rooms) == 0:
return None
clean_rooms = dict()
for room_id, room_info in raw_rooms.items():
room_id = str(room_id, 'utf8')
room_info = str(room_info, 'utf8')
room_sort, room_ephemeral, room_admin, room_users, room_name = room_info.split('|', maxsplit=4)
if room_sort == '':
room_sort = '999'
room_sort = int(room_sort)
if room_admin.lower() in {'', 'false'}:
room_admin = False
else:
room_admin = True
if room_ephemeral.lower() in {'', 'true'}:
room_ephemeral = True
else:
room_ephemeral = False
if room_users == '':
room_users = '0'
room_users = int(room_users)
clean_rooms[room_id] = {
'name': room_name,
'sort_order': room_sort,
'ephemeral': room_ephemeral,
'admin': room_admin,
'users': room_users
}
return clean_rooms
def set_rooms_for_channel(self, channel_id: str, rooms_infos: dict, with_info: bool = True) -> None:
if with_info:
self._set_rooms_for_channel_with_info(channel_id, rooms_infos)
else:
self._set_rooms_for_channel_without_info(channel_id, rooms_infos)
def _set_rooms_for_channel_with_info(self, channel_id: str, rooms_infos: dict) -> None:
"""
rooms_with_n_users[room_id] = {
'name': all_rooms[room_id]['name'],
'sort_order': all_rooms[room_id]['sort_order'],
'ephemeral': all_rooms[room_id]['ephemeral'],
'admin': all_rooms[room_id]['admin'],
'users': len(visible_users)
}
room_sort, room_ephemeral, room_admin, room_users, room_name = room_info.split('|', maxsplit=4)
"""
key = RedisKeys.rooms_for_channel_with_info(channel_id)
self.cache.set(key, rooms_infos, ttl=TEN_SECONDS)
redis_rooms = dict()
for room_id, room_info in rooms_infos.items():
r_value = '{}|{}|{}|{}|{}'.format(
str(room_info['sort_order']),
str(room_info.get('ephemeral', True)).lower(),
str(room_info.get('admin', False)).lower(),
str(room_info.get('users', 0)),
room_info['name']
)
redis_rooms[room_id] = r_value
if len(redis_rooms) > 0:
self.redis.hmset(key, redis_rooms)
self.redis.expire(key, ONE_MINUTE)
def _set_rooms_for_channel_without_info(self, channel_id: str, rooms_infos: dict) -> None:
"""
room.uuid: {
'ephemeral': room.ephemeral,
'name': room.name
}
"""
key = RedisKeys.rooms_for_channel_without_info(channel_id)
self.cache.set(key, rooms_infos, ttl=TEN_SECONDS)
redis_rooms = dict()
for room_id, room_info in rooms_infos.items():
r_value = '{}|{}'.format(
str(room_info['ephemeral'] or True).lower(),
room_info['name']
)
redis_rooms[room_id] = r_value
if len(redis_rooms) > 0:
self.redis.hmset(key, redis_rooms)
self.redis.expire(key, ONE_MINUTE)
def get_acls_in_room_for_action(self, room_id: str, action: str) -> dict:
key = RedisKeys.acls_in_room_for_action(room_id, action)
return self.cache.get(key)
def set_acls_in_room_for_action(self, room_id: str, action: str, acls: dict) -> None:
key = RedisKeys.acls_in_room_for_action(room_id, action)
self.cache.set(key, acls, ttl=int(TEN_MINUTES + random.random()*TEN_MINUTES))
def get_acls_in_channel_for_action(self, channel_id: str, action: str) -> dict:
key = RedisKeys.acls_in_channel_for_action(channel_id, action)
return self.cache.get(key)
def get_users_in_room_for_role(self, room_id: str, role: str) -> dict:
key = RedisKeys.users_in_room_for_role(room_id, role)
return self.cache.get(key)
def set_avatar_for(self, user_id: str, avatar_url: str, app_avatar_url: str, app_avatar_safe_url: str) -> None:
key = RedisKeys.avatars()
cache_key = '{}-{}'.format(key, user_id)
urls = '|'.join([avatar_url, app_avatar_url, app_avatar_safe_url])
self.cache.set(cache_key, urls, ttl=THIRTY_SECONDS)
self.redis.hset(key, user_id, urls)
def get_avatar_for(self, user_id: str) -> Union[Tuple[str, str, str], None]:
key = RedisKeys.avatars()
cache_key = '{}-{}'.format(key, user_id)
value = self.cache.get(cache_key)
if value is not None:
return value.split('|', maxsplit=2)
value = self.redis.hget(key, user_id)
if value is None:
return None
value = str(value, 'utf-8')
avatar_url, app_avatar_url, app_avatar_safe_url = value.split('|', maxsplit=2)
self.cache.set(cache_key, value, ttl=THIRTY_SECONDS)
return avatar_url, app_avatar_url, app_avatar_safe_url
def reset_sids_for_user(self, user_id: str) -> None:
key = RedisKeys.sid_for_user_id()
self.redis.hdel(key, user_id)
def remove_sid_for_user(self, user_id: str, sid: str) -> None:
def _try_to_remove_sid(sid_to_remove):
sid_key = RedisKeys.user_id_for_sid()
self.redis.hdel(sid_key, sid_to_remove)
if user_id is None:
return
all_sids = self.get_sids_for_user(user_id)
if all_sids is None:
all_sids = set()
if sid_to_remove not in all_sids:
return
all_sids.remove(sid_to_remove)
all_sids = ','.join(list(set(all_sids)))
key = RedisKeys.sid_for_user_id()
self.redis.hset(key, user_id, all_sids)
try:
_try_to_remove_sid(sid)
except RuntimeError:
try:
_try_to_remove_sid(sid)
except RuntimeError as e:
logger.error('could not remove sid {} for user {}, tried 2 times: {}'.format(sid, user_id, str(e)))
logger.exception(traceback.format_exc())
def set_sids_for_user(self, user_id: str, all_sids: list) -> None:
key = RedisKeys.sid_for_user_id()
all_sids = set(all_sids.copy())
sid_key = RedisKeys.user_id_for_sid()
for sid in all_sids:
self.redis.hset(sid_key, sid, user_id)
all_sids = ','.join(list(set(all_sids)))
self.redis.hset(key, user_id, all_sids)
def add_sid_for_user(self, user_id: str, sid: str) -> None:
all_sids = self.get_sids_for_user(user_id)
if all_sids is None:
all_sids = set()
else:
all_sids = set(all_sids)
all_sids.add(sid)
key = RedisKeys.sid_for_user_id()
sid_key = RedisKeys.user_id_for_sid()
for sid in all_sids:
self.redis.hset(sid_key, sid, user_id)
all_sids = ','.join(list(set(all_sids)))
self.redis.hset(key, user_id, all_sids)
def get_user_for_sid(self, sid: str):
sid_key = RedisKeys.user_id_for_sid()
user_id = self.redis.hget(sid_key, sid)
if user_id is not None:
user_id = str(user_id, 'utf-8')
return user_id
def get_sids_for_user(self, user_id: str) -> Union[None, list]:
key = RedisKeys.sid_for_user_id()
all_sids = self.redis.hget(key, user_id)
if all_sids is None:
return None
all_sids = list(set(str(all_sids, 'utf-8').split(',')))
return all_sids.copy()
def get_users_in_room_by_name(self, room_name: str, is_super_user: bool) -> dict:
if is_super_user:
key = RedisKeys.users_in_room_incl_invisible_by_name(room_name)
else:
key = RedisKeys.users_in_room_only_visible_by_name(room_name)
return self.cache.get(key)
def set_users_in_room_by_name(self, room_name: str, users: dict, is_super_user: bool) -> None:
if is_super_user:
key = RedisKeys.users_in_room_incl_invisible_by_name(room_name)
else:
key = RedisKeys.users_in_room_only_visible_by_name(room_name)
self.cache.set(key, users, ttl=int(TEN_SECONDS + random.random()*TEN_SECONDS))
def get_users_in_room(self, room_id: str, is_super_user: bool) -> dict:
if is_super_user:
key = RedisKeys.users_in_room_incl_invisible(room_id)
else:
key = RedisKeys.users_in_room_only_visible(room_id)
return self.cache.get(key)
def set_users_in_room(self, room_id: str, users: dict, is_super_user: bool) -> None:
if is_super_user:
key = RedisKeys.users_in_room_incl_invisible(room_id)
else:
key = RedisKeys.users_in_room_only_visible(room_id)
self.cache.set(key, users, ttl=int(TEN_SECONDS + random.random()*TEN_SECONDS))
def set_users_in_room_for_role(self, room_id: str, role: str, users: dict) -> None:
key = RedisKeys.users_in_room_for_role(room_id, role)
self.cache.set(key, users, ttl=int(FIVE_MINUTES + random.random()*FIVE_MINUTES))
def reset_users_in_room_for_role(self, room_id: str, role: str) -> None:
key = RedisKeys.users_in_room_for_role(room_id, role)
self.cache.delete(key)
def get_users_in_channel_for_role(self, channel_id: str, role: str) -> dict:
key = RedisKeys.users_in_channel_for_role(channel_id, role)
return self.cache.get(key)
def set_users_in_channel_for_role(self, channel_id: str, role: str, users: dict) -> None:
key = RedisKeys.users_in_channel_for_role(channel_id, role)
self.cache.set(key, users, ttl=int(FIVE_MINUTES + random.random()*FIVE_MINUTES))
def reset_users_in_channel_for_role(self, channel_id: str, role: str) -> None:
key = RedisKeys.users_in_channel_for_role(channel_id, role)
self.cache.delete(key)
def set_acls_in_channel_for_action(self, channel_id: str, action: str, acls: dict) -> None:
key = RedisKeys.acls_in_channel_for_action(channel_id, action)
self.cache.set(key, acls, ttl=FIVE_MINUTES)
def reset_acls_in_channel_for_action(self, channel_id: str, action: str) -> None:
key = RedisKeys.acls_in_channel_for_action(channel_id, action)
self.cache.delete(key)
def reset_acls_in_room_for_action(self, room_id: str, action: str) -> None:
key = RedisKeys.acls_in_room_for_action(room_id, action)
self.cache.delete(key)
def reset_acls_in_channel(self, channel_id: str) -> None:
key = RedisKeys.acls_in_channel(channel_id)
self.cache.delete(key)
def reset_acls_in_room(self, room_id: str) -> None:
key = RedisKeys.acls_in_room(room_id)
self.cache.delete(key)
def get_join_count_by_name(self, room_name: str) -> Optional[int]:
key = RedisKeys.join_counts_by_name(room_name)
n_joins = self.redis.get(key)
if n_joins is None:
return None
return int(float(str(n_joins, "utf-8")))
def get_join_count(self, room_id: str) -> Optional[int]:
key = RedisKeys.join_counts(room_id)
n_joins = self.redis.get(key)
if n_joins is None:
return None
return int(float(str(n_joins, "utf-8")))
def set_join_count_by_name(self, room_name: str, n_joins: int) -> None:
key = RedisKeys.join_counts_by_name(room_name)
self.redis.set(key, n_joins)
self.redis.expire(key, SEVEN_DAYS)
def set_join_count(self, room_id: str, n_joins: int) -> None:
key = RedisKeys.join_counts(room_id)
self.redis.set(key, n_joins)
self.redis.expire(key, SEVEN_DAYS)
def set_all_acls_for_channel(self, channel_id: str, acls: dict) -> None:
key = RedisKeys.acls_in_channel(channel_id)
self.cache.set(key, acls, ttl=int(FIVE_MINUTES + random.random() * FIVE_MINUTES))
def set_all_acls_for_room(self, room_id: str, acls: dict) -> None:
key = RedisKeys.acls_in_room(room_id)
self.cache.set(key, acls, ttl=int(FIVE_MINUTES + random.random()*FIVE_MINUTES))
def get_all_acls_for_channel(self, channel_id: str) -> dict:
key = RedisKeys.acls_in_channel(channel_id)
return self.cache.get(key)
def get_all_acls_for_room(self, room_id: str) -> dict:
key = RedisKeys.acls_in_room(room_id)
return self.cache.get(key)
def reset_channels_with_sort(self):
key = RedisKeys.channels_with_sort()
self.cache.delete(key)
self.redis.delete(key)
def set_all_rooms(self, all_rooms):
key = RedisKeys.all_rooms()
self.cache.delete(key)
self.cache.set(key, all_rooms, ttl=ONE_MINUTE)
def get_all_rooms(self) -> Union[List, None]:
key = RedisKeys.all_rooms()
return self.cache.get(key)
def get_can_whisper_to_user(self, sender_id: str, target_user_name: str):
key = RedisKeys.can_whisper_to(sender_id)
cache_key = '%s-%s' % (key, target_user_name)
can_whisper_and_reason = self.cache.get(cache_key)
if can_whisper_and_reason is not None:
can_whisper, reason_code = can_whisper_and_reason
return can_whisper, reason_code
can_whisper = self.redis.hget(key, target_user_name)
if can_whisper is None:
return None, None
can_whisper, reason_code = str(can_whisper, 'utf-8').split('|')
return can_whisper == '1', reason_code
def set_can_whisper_to_user(self, sender_id: str, target_user_name: str, allowed: bool, reason_code: int) -> None:
# if not allowed, we need to check remote system, maybe they will soon be allowed
if not allowed:
return
key = RedisKeys.can_whisper_to(sender_id)
cache_key = '%s-%s' % (key, target_user_name)
can_whisper_and_reason = '|'.join([
'1' if allowed else '0',
str(reason_code)
])
self.cache.set(cache_key, (allowed, reason_code), ttl=TEN_MINUTES)
self.redis.hset(key, target_user_name, can_whisper_and_reason)
self.redis.expire(key, EIGHT_HOURS_IN_SECONDS)
def get_channels_with_sort(self):
key = RedisKeys.channels_with_sort()
channels = self.cache.get(key)
if channels is not None:
return channels
raw_channels = self.redis.hgetall(key)
clean_channels = dict()
if raw_channels is None or len(raw_channels) == 0:
return None
for channel_id, channel_sort_channel_name in raw_channels.items():
try:
channel_sort_channel_name = str(channel_sort_channel_name, 'utf8')
channel_sort, channel_tags, channel_name = channel_sort_channel_name.split('|', maxsplit=2)
channel_sort = int(channel_sort)
channel_id = str(channel_id, 'utf8')
if channel_tags == 'None':
channel_tags = ''
clean_channels[channel_id] = (channel_name, channel_sort, channel_tags)
except Exception as e:
logger.error('invalid channel name in redis with key {}, value was "{}": {}'.format(
key, channel_sort_channel_name, str(e)))
self.cache.set(key, clean_channels, ttl=ONE_MINUTE)
return clean_channels
def set_channels_with_sort(self, channels):
if len(channels) == 0:
return
key = RedisKeys.channels_with_sort()
self.cache.set(key, channels, ttl=ONE_MINUTE)
redis_channels = dict()
for channel_id, (channel_name, channel_sort, tags) in channels.items():
redis_channels[channel_id] = '{}|{}|{}'.format(str(channel_sort), tags, channel_name)
self.redis.hmset(key, redis_channels)
self.redis.expire(key, ONE_MINUTE)
def get_channel_ban_timestamp(self, channel_id: str, user_id: str) -> str:
key = RedisKeys.banned_users_channel(channel_id)
return self._get_ban_timestamp(key, user_id)
def get_room_mute_timestamp(self, room_id: str, user_id: str) -> (str, str):
key = RedisKeys.muted_users(room_id)
return self._get_mute_timestamp(key, user_id)
def get_room_ban_timestamp(self, room_id: str, user_id: str) -> str:
key = RedisKeys.banned_users(room_id)
return self._get_ban_timestamp(key, user_id)
def remove_room_id_for_name(self, channel_id: str, room_name: str) -> None:
key = RedisKeys.room_id_for_name(channel_id)
cache_key = '%s-%s' % (key, room_name)
self.cache.delete(cache_key)
self.redis.hdel(key, room_name)
def get_room_id_for_name(self, channel_id: str, room_name: str) -> str:
key = RedisKeys.room_id_for_name(channel_id)
cache_key = '%s-%s' % (key, room_name)
value = self.cache.get(cache_key)
if value is not None:
return value
value = self.redis.hget(key, room_name)
if value is None:
return None
value = str(value, 'utf-8')
self.cache.set(cache_key, value)
return value
def set_room_id_for_name(self, channel_id, room_name, room_id):
key = RedisKeys.room_id_for_name(channel_id)
cache_key = '%s-%s' % (key, room_name)
self.cache.set(cache_key, room_id)
self.redis.hset(key, room_name, room_id)
def get_user_id(self, user_name: str) -> str:
key = RedisKeys.user_ids()
cache_key = '%s-%s' % (key, user_name)
value = self.cache.get(cache_key)
if value is not None:
return value
user_id = self.redis.hget(key, user_name)
if user_id is not None:
user_id = str(user_id, 'utf-8')
self.cache.set(cache_key, user_id)
return user_id
return user_id
def set_user_id(self, user_id: str, user_name: str):
key = RedisKeys.user_ids()
cache_key = '%s-%s' % (key, user_name)
self.redis.hset(key, user_name, user_id)
self.cache.set(cache_key, user_id)
def get_user_name(self, user_id: str) -> str:
key = RedisKeys.user_names()
cache_key = '%s-%s' % (key, user_id)
value = self.cache.get(cache_key)
if value is not None:
return value
user_name = self.redis.hget(key, user_id)
if user_name is not None:
user_name = str(user_name, 'utf-8')
self.cache.set(cache_key, user_name)
return user_name
return user_name
def set_user_name(self, user_id: str, user_name: str):
key = RedisKeys.user_names()
cache_key = '%s-%s' % (key, user_id)
self.redis.hset(key, user_id, user_name)
self.cache.set(cache_key, user_name)
def get_room_exists(self, channel_id, room_id):
key = RedisKeys.rooms(channel_id)
cache_key = '%s-%s' % (key, room_id)
value = self.cache.get(cache_key)
if value is not None:
return True
exists = self.redis.hexists(key, room_id)
if exists == 1:
self.cache.set(cache_key, True)
return True
return None
def remove_channel_exists(self, channel_id: str) -> None:
key = RedisKeys.channel_exists()
cache_key = '%s-%s' % (key, channel_id)
self.redis.hdel(key, channel_id)
self.cache.delete(cache_key)
key = RedisKeys.channels()
cache_key = '%s-name-%s' % (key, channel_id)
self.cache.delete(cache_key)
self.redis.hdel(key, channel_id)
def remove_room_exists(self, channel_id, room_id):
key = RedisKeys.rooms(channel_id)
cache_key = '%s-%s' % (key, room_id)
self.cache.set(cache_key, None)
self.redis.hdel(key, room_id)
key = RedisKeys.channel_for_rooms()
cache_key = '%s-%s' % (key, room_id)
self.cache.delete(cache_key)
self.redis.hdel(key, room_id)
key = RedisKeys.room_roles(room_id)
self.redis.delete(key)
key = RedisKeys.room_name_for_id()
cache_key = '%s-%s-name' % (key, room_id)
room_name = self.redis.hget(key, room_id)
if room_name is not None:
try:
room_name = str(room_name, 'utf-8')
except Exception:
pass
self.cache.delete(cache_key)
self.redis.hdel(key, room_id)
key = RedisKeys.room_id_for_name(channel_id)
self.redis.hdel(key, room_id)
if room_name is not None:
cache_key = '%s-%s' % (key, room_name)
self.cache.delete(cache_key)
for role in RoleKeys.all_roles:
key = RedisKeys.users_in_room_for_role(room_id, role)
self.redis.delete(key)
def set_room_exists(self, channel_id, room_id, room_name):
key = RedisKeys.rooms(channel_id)
cache_key = '%s-%s' % (key, room_id)
self.cache.set(cache_key, room_name)
self.redis.hset(key, room_id, room_name)
self.redis.expire(key, ONE_MINUTE)
def set_channel_exists(self, channel_id: str) -> None:
key = RedisKeys.channel_exists()
cache_key = '%s-%s' % (key, channel_id)
self.redis.hset(key, channel_id, int(True))
self.redis.expire(key, ONE_MINUTE)
self.cache.set(cache_key, True)
def set_channel_for_room(self, channel_id: str, room_id: str) -> None:
key = RedisKeys.channel_for_rooms()
cache_key = '%s-%s' % (key, room_id)
self.redis.hset(key, room_id, channel_id)
self.redis.expire(key, ONE_HOUR)
self.cache.set(cache_key, channel_id, ttl=TEN_MINUTES)
def get_channel_exists(self, channel_id):
key = RedisKeys.channel_exists()
cache_key = '%s-%s' % (key, channel_id)
value = self.cache.get(cache_key)
if value is not None:
return True
value = self.redis.hget(key, channel_id)
if value is None:
return None
self.cache.set(cache_key, True)
return True
def set_channel_name(self, channel_id: str, channel_name: str) -> None:
key = RedisKeys.channels()
cache_key = '%s-name-%s' % (key, channel_id)
self.cache.set(cache_key, channel_name)
self.redis.hset(key, channel_id, channel_name)
self.redis.expire(key, TEN_MINUTES)
def get_user_name_exists(self, user_name: str) -> bool:
key = RedisKeys.user_names_set()
cache_key = '{}-{}'.format(key, user_name)
if self.cache.get(cache_key) is not None:
return True
return self.redis.sismember(key, user_name)
def set_user_name_exists(self, user_name: str):
key = RedisKeys.user_names_set()
cache_key = '{}-{}'.format(key, user_name)
self.cache.set(cache_key, True, ttl=TEN_MINUTES)
self.redis.sadd(key, user_name)
def get_channel_name(self, channel_id: str) -> str:
key = RedisKeys.channels()
cache_key = '%s-name-%s' % (key, channel_id)
value = self.cache.get(cache_key)
if value is not None:
return value
value = self.redis.hget(key, channel_id)
if value is None:
return None
value = str(value, 'utf-8')
self.cache.set(cache_key, value)
return value
def get_room_name(self, room_id: str) -> str:
key = RedisKeys.room_name_for_id()
cache_key = '%s-%s-name' % (key, room_id)
value = self.cache.get(cache_key)
if value is not None:
return value
value = self.redis.hget(key, room_id)
if value is None:
return None
value = str(value, 'utf-8')
self.cache.set(cache_key, value)
return value
def set_room_name(self, room_id: str, room_name: str) -> None:
key = RedisKeys.room_name_for_id()
cache_key = '%s-%s-name' % (key, room_id)
self.cache.set(cache_key, room_name, ttl=int(FIVE_MINUTES + random.random()*ONE_MINUTE))
self.redis.hset(key, room_id, room_name)
self.redis.expire(key, TEN_MINUTES)
def get_channel_for_room(self, room_id):
key = RedisKeys.channel_for_rooms()
cache_key = '%s-%s' % (key, room_id)
value = self.cache.get(cache_key)
if value is not None:
return value
channel_id = self.redis.hget(key, room_id)
if channel_id is None:
return None
channel_id = str(channel_id, 'utf-8')
self.cache.set(cache_key, channel_id)
return channel_id
def get_user_status(self, user_id: str):
key = RedisKeys.user_status(user_id)
status = self.cache.get(key)
if status is not None:
return status
status = self.redis.get(key)
if status is None or status == '':
return None
status = str(status, 'utf-8')
self.cache.set(key, status, ttl=TEN_SECONDS)
return status
def get_room_owners(self, room_id: str) -> Optional[Set]:
key = RedisKeys.room_owners(room_id)
value = self.cache.get(key)
if value is not None:
return value
value = self.redis.get(key)
if value is None:
return None
value = str(value, 'utf-8')
value = set(value.split(","))
self.cache.set(key, value, ttl=int(TEN_MINUTES + random.random() * TEN_MINUTES))
return value
def set_room_owners(self, room_id, owners: Set[str]) -> None:
key = RedisKeys.room_owners(room_id)
owners_str = ",".join(owners)
self.cache.set(key, owners, ttl=int(TEN_MINUTES + random.random() * TEN_MINUTES))
self.redis.set(key, owners_str)
self.redis.expire(key, int(EIGHT_HOURS_IN_SECONDS + random.random() * EIGHT_HOURS_IN_SECONDS))
def set_user_status(self, user_id: str, status: str) -> None:
key = RedisKeys.user_status(user_id)
self.cache.set(key, status, ttl=THIRTY_SECONDS)
self.redis.set(key, status)
def get_user_info(self, user_id: str) -> dict:
key = RedisKeys.auth_key(user_id)
return self.cache.get(key)
def set_user_info(self, user_id: str, info: dict) -> None:
key = RedisKeys.auth_key(user_id)
self.cache.set(key, info, ttl=int(FIVE_MINUTES + random.random() * ONE_MINUTE))
def reset_user_info(self, user_id: str) -> None:
key = RedisKeys.auth_key(user_id)
self.cache.delete(key)
def user_check_status(self, user_id, other_status):
return self.get_user_status(user_id) == other_status
def user_is_offline(self, user_id):
return self.user_check_status(user_id, UserKeys.STATUS_UNAVAILABLE)
def user_is_online(self, user_id):
return self.user_check_status(user_id, UserKeys.STATUS_AVAILABLE)
def user_is_invisible(self, user_id):
return self.user_check_status(user_id, UserKeys.STATUS_INVISIBLE)
def user_is_in_multicast(self, user_id):
return self.redis.sismember(RedisKeys.users_multi_cast(), str(user_id))
def remove_from_multicast_on_disconnect(self, user_id: str) -> None:
try:
user_id_str = str(user_id).strip()
self.redis.srem(RedisKeys.users_multi_cast(), user_id_str)
except Exception as e:
logger.error('could remove user from multicast: %s' % str(e))
logger.exception(traceback.format_exc())
raise e # force catch from caller
def get_last_online(self, user_id: str) -> Union[str, None]:
user_id_str = str(user_id).strip()
key = RedisKeys.user_last_online(user_id_str)
last_online = self.cache.get(key)
if last_online is not None:
return last_online
last_online = self.redis.get(key)
if last_online is None:
return None
last_online = str(last_online, 'utf-8')
self.cache.set(key, last_online)
return last_online
def set_last_online(self, last_online_times: list):
"""
only called while warming up the cache
"""
for chunk in split_into_chunks(last_online_times, 500):
pipe = self.redis.pipeline()
for user_id, at in chunk:
key = RedisKeys.user_last_online(user_id)
pipe.set(key, str(at))
pipe.execute()
def set_user_offline(self, user_id: str) -> None:
try:
user_id_str = str(user_id).strip()
user_id_int = int(float(user_id))
now = int(datetime.utcnow().timestamp())
self._set_last_online(user_id_str)
self.cache.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_UNAVAILABLE, ttl=THIRTY_SECONDS)
p = self.redis.pipeline()
p.setbit(RedisKeys.online_bitmap(), user_id_int, 0)
p.srem(RedisKeys.online_set(), user_id_str)
p.srem(RedisKeys.users_multi_cast(), user_id_str)
p.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_UNAVAILABLE)
p.zadd(RedisKeys.user_status_changed_at(), {user_id_str: now})
p.execute()
if self.status_topic is not None:
self.env.publish(
activity_for_status_change(user_id, "offline"),
external=True,
topic=self.status_topic
)
self.trim_user_changed_at()
except Exception as e:
logger.error('could not set_user_offline(): %s' % str(e))
logger.exception(traceback.format_exc())
raise e # force catch from caller
def set_user_away(self, user_id: str) -> None:
key = RedisKeys.user_status(user_id)
self.cache.set(key, UserKeys.STATUS_AWAY, ttl=THIRTY_SECONDS)
self.redis.set(key, UserKeys.STATUS_AWAY)
if self.status_topic is not None:
self.env.publish(
activity_for_status_change(user_id, "away"),
external=True,
topic=self.status_topic
)
def set_user_online(self, user_id: str) -> None:
try:
user_id_str = str(user_id).strip()
user_id_int = int(float(user_id))
now = int(datetime.utcnow().timestamp())
self.cache.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_AVAILABLE, ttl=THIRTY_SECONDS)
p = self.redis.pipeline()
p.setbit(RedisKeys.online_bitmap(), user_id_int, 1)
p.sadd(RedisKeys.online_set(), user_id_str)
p.sadd(RedisKeys.users_multi_cast(), user_id_str)
p.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_AVAILABLE)
p.zadd(RedisKeys.user_status_changed_at(), {user_id_str: now})
p.execute()
if self.status_topic is not None:
self.env.publish(
activity_for_status_change(user_id, "online"),
external=True,
topic=self.status_topic
)
self.trim_user_changed_at()
except Exception as e:
logger.error('could not set_user_online(): %s' % str(e))
logger.exception(traceback.format_exc())
def set_user_status_invisible(self, user_id: str) -> None:
"""
if status is changed to "invisible" using the rest api when the user is
not online, e.g. during "invisible login"
"""
try:
user_id_str = str(user_id).strip()
now = int(datetime.utcnow().timestamp())
self.cache.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_INVISIBLE, ttl=THIRTY_SECONDS)
p = self.redis.pipeline()
p.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_INVISIBLE)
p.zadd(RedisKeys.user_status_changed_at(), {user_id_str: now})
p.execute()
if self.status_topic is not None:
self.env.publish(
activity_for_status_change(user_id, "invisible"),
external=True,
topic=self.status_topic
)
self.trim_user_changed_at()
except Exception as e:
logger.error('could not set_user_status_invisible(): %s' % str(e))
logger.exception(traceback.format_exc())
def _set_last_online(self, user_id: str):
u = datetime.utcnow()
u = u.replace(tzinfo=pytz.utc)
unix_time = str(int(u.timestamp()))
logger.info('setting last online for {} to {}'.format(user_id, unix_time))
last_online_key = RedisKeys.user_last_online(user_id)
self.cache.set(last_online_key, unix_time, ttl=ONE_HOUR)
p = self.redis.pipeline()
p.set(last_online_key, unix_time)
p.expire(last_online_key, SEVEN_DAYS)
p.execute()
if self.status_topic is not None:
self.env.publish(
activity_for_status_change(user_id, "lastonline"),
external=True,
topic=self.status_topic
)
def set_user_invisible(self, user_id: str, update_last_online: bool = True) -> None:
try:
user_id_str = str(user_id).strip()
user_id_int = int(float(user_id))
now = int(datetime.utcnow().timestamp())
self.cache.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_INVISIBLE, ttl=THIRTY_SECONDS)
p = self.redis.pipeline()
p.setbit(RedisKeys.online_bitmap(), user_id_int, 0)
p.srem(RedisKeys.online_set(), user_id_str)
p.sadd(RedisKeys.users_multi_cast(), user_id_str)
p.set(RedisKeys.user_status(user_id_str), UserKeys.STATUS_INVISIBLE)
p.zadd(RedisKeys.user_status_changed_at(), {user_id_str: now})
p.execute()
if self.status_topic is not None:
self.env.publish(
activity_for_status_change(user_id, "invisible"),
external=True,
topic=self.status_topic
)
self.trim_user_changed_at()
if update_last_online:
self._set_last_online(user_id_str)
except Exception as e:
logger.error('could not set_user_invisible(): %s' % str(e))
logger.exception(traceback.format_exc())
def set_session_count(self, session_count: int) -> None:
node_key = '{}-{}'.format(self.listen_host, self.listen_port)
self.redis.hset(RedisKeys.session_count(), node_key, session_count)
def trim_user_changed_at(self):
# just trim sometimes, O(log(n)+m) to remove
if int(random.random() * 100000) == 1:
minus_24h = (datetime.utcnow() - timedelta(hours=24)).timestamp()
self.redis.zremrangebyscore(RedisKeys.user_status_changed_at(), LONG_AGO, minus_24h)