vj4/handler/user.py

Summary

Maintainability
A
25 mins
Test Coverage
import asyncio
import datetime

from vj4 import app
from vj4 import constant
from vj4 import error
from vj4.model import builtin
from vj4.model import domain
from vj4.model import record
from vj4.model import system
from vj4.model import token
from vj4.model import user
from vj4.model.adaptor import discussion
from vj4.model.adaptor import problem
from vj4.model.adaptor import setting
from vj4.util import misc
from vj4.util import options
from vj4.util import validator
from vj4.handler import base


class UserSettingsMixin(object):
  """Mixin offering privacy-aware access to fields of a user document.

  The host class is expected to provide ``self.user`` and ``self.has_priv``.
  """

  def can_view(self, udoc, key):
    """Tell whether the current user may see field ``key`` of ``udoc``.

    The privacy level is read from ``udoc['show_<key>']``. When that entry is
    absent, the first element of the matching setting's ``range`` is used.
    """
    privacy_key = 'show_' + key
    fallback = next(iter(setting.SETTINGS_BY_KEY[privacy_key].range))
    privacy = udoc.get(privacy_key, fallback)
    # The owner of the document can always see it.
    if udoc['_id'] == self.user['_id']:
      return True
    if privacy == constant.setting.PRIVACY_PUBLIC:
      return True
    if privacy == constant.setting.PRIVACY_REGISTERED_ONLY:
      allowed = self.has_priv(builtin.PRIV_USER_PROFILE)
      if allowed:
        return allowed
    return (privacy == constant.setting.PRIVACY_SECRET
            and self.has_priv(builtin.PRIV_VIEW_USER_SECRET))

  def get_udoc_setting(self, udoc, key):
    """Return ``udoc``'s value for ``key`` if it may be viewed, else ``None``."""
    return udoc.get(key, None) if self.can_view(udoc, key) else None


@app.route('/register', 'user_register', global_route=True)
class UserRegisterHandler(base.Handler):
  """First sign-up step: takes a mail address and mails a registration link."""

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  async def get(self):
    """Show the registration form."""
    self.render('user_register.html')

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  @base.post_argument
  @base.sanitize
  @base.limit_rate('send_mail', 3600, 30)
  async def post(self, *, mail: str):
    """Create a registration token for ``mail`` and send the sign-up mail.

    Raises:
      error.UserAlreadyExistError: a user is already registered with ``mail``.
    """
    validator.check_mail(mail)
    existing_udoc = await user.get_by_mail(mail)
    if existing_udoc:
      raise error.UserAlreadyExistError(mail)
    token_id, _ = await token.add(token.TYPE_REGISTRATION,
                                  options.registration_token_expire_seconds,
                                  mail=mail)
    # The mailed link leads to the second step, keyed by the token id.
    confirm_url = self.reverse_url('user_register_with_code', code=token_id)
    await self.send_mail(mail, 'Sign Up', 'user_register_mail.html', url=confirm_url)
    self.render('user_register_mail_sent.html')


@app.route('/register/{code}', 'user_register_with_code', global_route=True)
class UserRegisterWithCodeHandler(base.Handler):
  """Second sign-up step: completes registration using a mailed token code."""

  TITLE = 'user_register'

  async def _require_registration_token(self, code):
    """Fetch the registration token document for ``code`` or raise.

    Raises:
      error.InvalidTokenError: no registration token matches ``code``.
    """
    tdoc = await token.get(code, token.TYPE_REGISTRATION)
    if not tdoc:
      raise error.InvalidTokenError(token.TYPE_REGISTRATION, code)
    return tdoc

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  @base.route_argument
  @base.sanitize
  async def get(self, *, code: str):
    """Show the account-creation form for the mail stored in the token."""
    tdoc = await self._require_registration_token(code)
    self.render('user_register_with_code.html', mail=tdoc['mail'])

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  @base.route_argument
  @base.post_argument
  @base.sanitize
  async def post(self, *, code: str, uname: str, password: str, verify_password: str):
    """Create the user, consume the token and start a session for the new uid.

    Raises:
      error.InvalidTokenError: no registration token matches ``code``.
      error.VerifyPasswordError: ``password`` and ``verify_password`` differ.
    """
    tdoc = await self._require_registration_token(code)
    if verify_password != password:
      raise error.VerifyPasswordError()
    new_uid = await system.inc_user_counter()
    await user.add(new_uid, uname, password, tdoc['mail'], self.remote_ip)
    # The token is deleted only after the user has been added.
    await token.delete(code, token.TYPE_REGISTRATION)
    await self.update_session(new_saved=False, uid=new_uid)
    self.json_or_redirect(self.reverse_url('domain_main'))


@app.route('/lostpass', 'user_lostpass', global_route=True)
class UserLostpassHandler(base.Handler):
  """First password-recovery step: mails a reset link to a known address."""

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  async def get(self):
    """Show the lost-password form."""
    self.render('user_lostpass.html')

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  @base.post_argument
  @base.sanitize
  @base.limit_rate('send_mail', 3600, 30)
  async def post(self, *, mail: str):
    """Create a lost-password token for the owner of ``mail`` and mail the link.

    Raises:
      error.UserNotFoundError: no user is registered with ``mail``.
    """
    validator.check_mail(mail)
    owner = await user.get_by_mail(mail)
    if not owner:
      raise error.UserNotFoundError(mail)
    token_id, _ = await token.add(token.TYPE_LOSTPASS,
                                  options.lostpass_token_expire_seconds,
                                  uid=owner['_id'])
    # The mailed link leads to the reset form, keyed by the token id.
    reset_url = self.reverse_url('user_lostpass_with_code', code=token_id)
    await self.send_mail(mail, 'Lost Password', 'user_lostpass_mail.html',
                         url=reset_url, uname=owner['uname'])
    self.render('user_lostpass_mail_sent.html')


@app.route('/lostpass/{code}', 'user_lostpass_with_code', global_route=True)
class UserLostpassWithCodeHandler(base.Handler):
  """Second password-recovery step: sets a new password using a mailed token code."""

  TITLE = 'user_lostpass'

  async def _require_lostpass_token(self, code):
    """Fetch the lost-password token document for ``code`` or raise.

    Raises:
      error.InvalidTokenError: no lost-password token matches ``code``.
    """
    found = await token.get(code, token.TYPE_LOSTPASS)
    if not found:
      raise error.InvalidTokenError(token.TYPE_LOSTPASS, code)
    return found

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  @base.route_argument
  @base.sanitize
  async def get(self, *, code: str):
    """Show the reset form for the user referenced by the token."""
    tdoc = await self._require_lostpass_token(code)
    owner = await user.get_by_uid(tdoc['uid'])
    self.render('user_lostpass_with_code.html', uname=owner['uname'])

  @base.require_priv(builtin.PRIV_REGISTER_USER)
  @base.route_argument
  @base.post_argument
  @base.sanitize
  async def post(self, *, code: str, password: str, verify_password: str):
    """Store the new password for the token's user, then consume the token.

    Raises:
      error.InvalidTokenError: no lost-password token matches ``code``.
      error.VerifyPasswordError: ``password`` and ``verify_password`` differ.
    """
    tdoc = await self._require_lostpass_token(code)
    if verify_password != password:
      raise error.VerifyPasswordError()
    await user.set_password(tdoc['uid'], password)
    # The token is deleted only after the password has been set.
    await token.delete(code, token.TYPE_LOSTPASS)
    self.json_or_redirect(self.reverse_url('domain_main'))


@app.route('/login', 'user_login', global_route=True)
class UserLoginHandler(base.Handler):
  """Login form and credential check."""

  async def get(self):
    """Render the login form, or redirect to ``domain_main`` when the
    current user already holds ``PRIV_USER_PROFILE``."""
    if not self.has_priv(builtin.PRIV_USER_PROFILE):
      self.render('user_login.html')
      return
    self.redirect(self.reverse_url('domain_main'))

  @base.post_argument
  @base.sanitize
  async def post(self, *, uname: str, password: str, rememberme: bool=False):
    """Check the credentials, record the login and switch the session.

    Raises:
      error.LoginError: the user name / password check did not yield a user.
    """
    udoc = await user.check_password_by_uname(uname, password, auto_upgrade=True)
    if not udoc:
      raise error.LoginError(uname)
    uid = udoc['_id']
    # Both updates are awaited together through asyncio.gather.
    record_login = user.set_by_uid(uid,
                                   loginat=datetime.datetime.utcnow(),
                                   loginip=self.remote_ip)
    switch_session = self.update_session(new_saved=rememberme, uid=uid)
    await asyncio.gather(record_login, switch_session)
    self.json_or_redirect(self.referer_or_main)


@app.route('/logout', 'user_logout', global_route=True)
class UserLogoutHandler(base.Handler):
  """Logout confirmation page and session removal."""

  @base.require_priv(builtin.PRIV_USER_PROFILE)
  async def get(self):
    """Show the logout confirmation page."""
    self.render('user_logout.html')

  @base.require_priv(builtin.PRIV_USER_PROFILE)
  @base.post_argument
  @base.require_csrf_token
  async def post(self):
    """Delete the current session, then answer with JSON or a redirect."""
    await self.delete_session()
    # The target is read only after the session has been deleted.
    target = self.referer_or_main
    self.json_or_redirect(target)


# Raw string: '\d' is an invalid escape sequence in a normal string literal.
@app.route(r'/user/{uid:-?\d+}', 'user_detail')
class UserDetailHandler(base.Handler, UserSettingsMixin):
  """Profile page of a single user, viewed within the current domain."""

  @base.route_argument
  @base.sanitize
  async def get(self, *, uid: int):
    """Render ``user_detail.html`` for the user identified by ``uid``.

    Gathers the domain-user document, the most recent session, and up to ten
    entries each of the user's records, problems, solutions (in default order
    and ordered by vote) and, when the viewer has ``PERM_VIEW_DISCUSSION``,
    discussions. Total counts are computed for problems, solutions and
    discussions.

    Raises:
      error.UserNotFoundError: no user exists for ``uid``.
    """
    is_self_profile = self.has_priv(builtin.PRIV_USER_PROFILE) and self.user['_id'] == uid
    udoc = await user.get_by_uid(uid)
    if not udoc:
      raise error.UserNotFoundError(uid)
    dudoc, sdoc = await asyncio.gather(domain.get_user(self.domain_id, udoc['_id']),
                                       token.get_most_recent_session_by_uid(udoc['_id']))

    # Records, newest `_id` first; hidden ones only with the matching privilege.
    rdocs = record.get_multi(get_hidden=self.has_priv(builtin.PRIV_VIEW_HIDDEN_RECORD),
                             uid=uid).sort([('_id', -1)])
    rdocs = await rdocs.limit(10).to_list()
    pdict = await problem.get_dict_multi_domain((rdoc['domain_id'], rdoc['pid']) for rdoc in rdocs)

    # Hidden problems are filtered out unless the viewer may see them.
    if not self.has_perm(builtin.PERM_VIEW_PROBLEM_HIDDEN):
      hidden_filter = {'hidden': False}
    else:
      hidden_filter = {}
    pdocs = problem.get_multi(domain_id=self.domain_id, owner_uid=uid,
                              **hidden_filter).sort([('_id', -1)])
    pcount = await pdocs.count()
    pdocs = await pdocs.limit(10).to_list()

    # Two independent cursors over the same solutions: one in default order,
    # one ordered by vote (then doc_id), both descending.
    psdocs = problem.get_multi_solution_by_uid(self.domain_id, uid)
    psdocs_hot = problem.get_multi_solution_by_uid(self.domain_id, uid)
    pscount = await psdocs.count()
    psdocs = await psdocs.limit(10).to_list()
    psdocs_hot = await psdocs_hot.sort([('vote', -1), ('doc_id', -1)]).limit(10).to_list()

    if self.has_perm(builtin.PERM_VIEW_DISCUSSION):
      ddocs = discussion.get_multi(self.domain_id, owner_uid=uid)
      dcount = await ddocs.count()
      ddocs = await ddocs.limit(10).to_list()
      vndict = await discussion.get_dict_vnodes(self.domain_id, map(discussion.node_id, ddocs))
    else:
      # Without the permission the template receives empty discussion data.
      ddocs = []
      vndict = {}
      dcount = 0

    self.render('user_detail.html', is_self_profile=is_self_profile,
                udoc=udoc, dudoc=dudoc, sdoc=sdoc,
                rdocs=rdocs, pdict=pdict, pdocs=pdocs, pcount=pcount,
                psdocs=psdocs, pscount=pscount, psdocs_hot=psdocs_hot,
                ddocs=ddocs, dcount=dcount, vndict=vndict)


@app.route('/user/search', 'user_search')
class UserSearchHandler(base.Handler):
  """JSON user search by user-name prefix and/or numeric uid."""

  def modify_udoc(self, udict, key):
    """Replace ``udict[key]`` with a copy carrying ``gravatar_url``.

    In the copy, ``gravatar_url`` is derived from the ``gravatar`` value and
    ``gravatar`` itself is blanked. Entries without a truthy ``gravatar`` are
    left untouched.
    """
    udoc = udict[key]
    gravatar = udoc.get('gravatar')
    if not gravatar:
      return
    # The URL is computed only when it is actually stored.
    udict[key] = {**udoc,
                  'gravatar_url': misc.gravatar_url(gravatar),
                  'gravatar': ''}

  @base.require_priv(builtin.PRIV_USER_PROFILE)
  @base.get_argument
  @base.route_argument
  @base.sanitize
  async def get(self, *, q: str, exact_match: bool=False):
    """Respond with a JSON list of public user documents matching ``q``.

    Unless ``exact_match`` is set, up to 20 prefix matches are fetched. If
    ``q`` parses as an integer, the user with that uid (when found) is put
    first in the list.
    """
    if exact_match:
      udocs = []
    else:
      udocs = await user.get_prefix_list(q, user.PROJECTION_PUBLIC, 20)
    # Only the integer parse is guarded, so errors raised by the lookup itself
    # are not silently discarded.
    try:
      uid = int(q)
    except ValueError:
      uid = None
    if uid is not None:
      udoc = await user.get_by_uid(uid, user.PROJECTION_PUBLIC)
      if udoc:
        udocs.insert(0, udoc)
    # modify_udoc replaces entries by index, so iterate over positions.
    for index in range(len(udocs)):
      self.modify_udoc(udocs, index)
    self.json(udocs)