src/spicy/core/profile/services.py
# -*- coding: utf-8 -*-
import datetime
import re
from django.conf import settings
from django.core.cache import cache
from django.core.urlresolvers import reverse
from django.http import HttpResponse
from django.utils.html import escape
from django.utils.translation import ugettext_lazy as _
from django.contrib.auth import login as auth_login
from django.contrib.auth import REDIRECT_FIELD_NAME
from spicy.core.profile.views import generate_random_password
from spicy.core.service import api
from spicy.utils import load_module
from spicy.utils.models import get_custom_model_class
from spicy.core.siteskin.decorators import ajax_request, render_to
from . import defaults, models
from .decorators import is_staff
from .forms import LoginForm
Profile = get_custom_model_class(defaults.CUSTOM_USER_MODEL)
CUSTOM_USER_SIGNUP_FORM = getattr(
settings, 'CUSTOM_USER_SIGNUP_FORM', 'spicy.core.profile.forms.SignupForm')
class ProfileProvider(api.Provider):
model = defaults.CUSTOM_PERMISSION_PROVIDER_MODEL
@ajax_request('/$', is_public=True, use_siteskin=True, use_cache=False)
def login(self, request):
result = self.service.login(request)
status = result['status']
message = result['message']
return dict(status=status, message=message)
@ajax_request('/$', is_public=True, use_siteskin=True, use_cache=False)
def register(self, request):
result = self.service.register(request)
status = result['status']
message = result['message']
return dict(status=status, message=message)
@render_to('profile/social_associations.html', is_public=True,
url_pattern='/(?P<profile_id>[\d]+)/$', use_siteskin=True)
def social_associations(self, request, profile_id):
backends = (
('twitter', 'Twitter'),
('facebook', 'Facebook'),
('linkedin', 'Linkedin'),
('google-oauth', 'Google'),
#('mailru-oauth2', 'Mail.ru'),
('odnoklassniki', 'Одноклассники'),
('vkontakte-oauth2', 'ВКонтакте'),
('vkontakte', 'ВКонтакте'),
('yandex', 'Yandex'),
('livejournal', 'Livejournal'),
('openid', 'OpenID'),
)
associated = request.user.social_auth.all().values_list(
'provider', flat=True)
return {'backends': backends, 'associated': associated}
@is_staff()
@ajax_request('/(?P<consumer_type>\w+)/(?P<consumer_id>\d+)/'
'(?P<tab_name>\w*)/?')
def current_editors(self, request, consumer_type, consumer_id, tab_name):
cache_key_doc = u':'.join((
defaults.ACCESS_CACHE_PREFIX, consumer_type, consumer_id))
if tab_name:
cache_key_tab = u':'.join((cache_key_doc, tab_name))
cache_keys = [(cache_key_doc, 'all_tabs')]
if tab_name:
cache_keys.append((cache_key_tab, 'current_tab'))
now = datetime.datetime.now()
min_time = now - datetime.timedelta(0, defaults.ACCESS_CACHE_PERIOD)
results = {}
users_dict = {}
for cache_key, result_name in cache_keys:
access_dict = cache.get(cache_key, {})
new_dict = {}
current_user_ids = set()
current_users = []
for user_id, access_time in access_dict.iteritems():
if access_time < min_time:
continue
else:
current_user_ids.add(user_id)
new_dict[user_id] = access_time
current_user_ids.add(request.user.pk)
for user_id in current_user_ids:
if not user_id in users_dict.has_key:
profile = Profile.objects.get(pk=user_id)
users_dict[user_id] = u'%s <%s>' % (
escape(profile), profile.email)
current_users.append([user_id, users_dict[user_id]])
current_users.sort()
new_dict[request.user.pk] = now
cache.set(cache_key, new_dict, defaults.ACCESS_CACHE_PERIOD)
results[result_name] = current_users
if not tab_name:
results['current_tab'] = []
return results
@render_to('', url_pattern='/', is_public=True)
def get_captcha(self, request):
from captcha.conf.settings import get_challenge
from captcha.models import CaptchaStore
challenge, response = get_challenge()()
store = CaptchaStore.objects.create(
challenge=challenge, response=response)
key = store.hashkey
return HttpResponse(key)
class ProfileService(api.Interface):
name = 'profile'
label = _('Profile provider service')
statistic_types = (
('default', _('New users')),
('actived', _('Only activated users')),
('commented', _('New users that commented')),
('orders', _('New users that did orders')),
('orders_and_commented', _('New users that commented and did orders')),
('pins_activated', _('Users with pins activated')),
('pins_not_activated', _('Users with pins not activated')),
)
PROVIDER_TEMPLATES_DIR = 'profile/providers/'
schema = dict(GENERIC_CONSUMER=ProfileProvider)
def _get_redirect(self, request):
redirect = request.GET.get(
REDIRECT_FIELD_NAME,
request.session.get(
REDIRECT_FIELD_NAME,
request.session.get('newuser-next', '')))
try:
del request.session[REDIRECT_FIELD_NAME]
except KeyError:
pass
if callable(defaults.DEFAULT_PROFILE_URL):
user_redirect_uri = defaults.DEFAULT_PROFILE_URL(request.user)
elif isinstance(defaults.DEFAULT_PROFILE_URL, basestring):
user_redirect_uri = load_module(defaults.DEFAULT_PROFILE_URL)(
request.user)
else:
user_redirect_uri = defaults.DEFAULT_PROFILE_URL
redirect = redirect or user_redirect_uri
return redirect
def login(self, request):
status = 'error'
message = ''
redirect = None
if request.user.is_authenticated():
redirect = self._get_redirect(request)
return dict(
status='ok',
message=unicode(_('User is signed in already')),
redirect=redirect)
can_login = True
#captcha_required = False
real_ip = request.META.get('REMOTE_ADDR')
is_blacklisted = False
# BBB
# is_blacklisted = real_ip and models.BlacklistedIP.objects.filter(
# ip=real_ip).exists()
if request.method == 'POST':
form = LoginForm(data=request.POST)
# Brute force check.
username = request.POST.get('username')
login_check = self.check_login(
username, real_ip)
if login_check == defaults.AUTH_DISALLOW:
can_login = False
elif login_check == defaults.AUTH_WARN:
form.make_captcha_visible()
#captcha_required = True
if can_login and not is_blacklisted and form.is_valid():
if not redirect or ' ' in redirect or (
'//' in redirect and re.match(r'[^\?]*//', redirect)):
redirect = settings.LOGIN_REDIRECT_URL
if form.cleaned_data['is_remember']:
request.session.set_expiry(1209600)
session_copy = dict(
item for item in request.session.iteritems()
if not item[0].startswith('_'))
auth_login(request, form.get_user())
redirect = self._get_redirect(request)
for key, value in session_copy.iteritems():
request.session[key] = value
request.session[
defaults.PASSWORD_HASH_KEY] = request.user.password
if request.session.test_cookie_worked():
request.session.delete_test_cookie()
status = 'ok'
else:
if not form.is_valid():
message = form.errors.as_text()
elif not can_login or is_blacklisted:
message = _('Login is disabled or your account is banned.')
else:
if redirect:
request.session[REDIRECT_FIELD_NAME] = redirect
request.session['newuser-next'] = redirect
form = LoginForm(request)
return dict(
status=status, message=unicode(message), redirect=redirect,
form=form, REGISTRATION_ENABLED=defaults.REGISTRATION_ENABLED)
def register(self, request):
status = 'error'
message = ''
real_ip = request.META.get('REMOTE_ADDR')
is_blacklisted = real_ip and models.BlacklistedIP.objects.filter(
ip=real_ip).exists()
if request.user.is_authenticated():
redirect = request.REQUEST.get(
REDIRECT_FIELD_NAME, request.session.get(REDIRECT_FIELD_NAME))
try:
del request.session[REDIRECT_FIELD_NAME]
except KeyError:
pass
if callable(defaults.DEFAULT_PROFILE_URL):
user_redirect_uri = defaults.DEFAULT_PROFILE_URL(request.user)
else:
user_redirect_uri = defaults.DEFAULT_PROFILE_URL
redirect = redirect or user_redirect_uri
return dict(status='ok',
message=unicode(message),
redirect=redirect,
REGISTRATION_ENABLED=defaults.REGISTRATION_ENABLED)
elif request.method == "POST":
form = load_module(CUSTOM_USER_SIGNUP_FORM)(request.POST)
redirect = reverse('profile:public:success-signup')
if not is_blacklisted and form.is_valid():
status = 'ok'
new_profile = form.save(
request=request, realhost=request.get_host(),
next_url=request.session.get(REDIRECT_FIELD_NAME, '/'))
request.session['profile_id'] = new_profile.pk
request.session['profile_email'] = new_profile.email
else:
form = load_module(CUSTOM_USER_SIGNUP_FORM)()
request.session.set_test_cookie()
# Display the login form and handle the login action.
redirect = request.REQUEST.get(REDIRECT_FIELD_NAME, '/')
request.session[REDIRECT_FIELD_NAME] = redirect
return dict(
status=status, message=unicode(message), redirect=redirect,
form=form, REGISTRATION_ENABLED=defaults.REGISTRATION_ENABLED, errors=str(form.errors))
def login_or_register(self, request):
status = 'error'
message = ''
redirect = None
can_login = True
action = None
real_ip = request.META.get('REMOTE_ADDR')
is_blacklisted = real_ip and models.BlacklistedIP.objects.filter(
ip=real_ip).exists()
if request.user.is_authenticated():
redirect = self._get_redirect(request)
return dict(
status='ok',
action='authenticated',
message=unicode(_('User is signed in already')),
redirect=redirect)
elif 'register' in request.POST:
action = 'register'
register_form = load_module(CUSTOM_USER_SIGNUP_FORM)(request.POST)
login_form = LoginForm(request)
if not is_blacklisted and register_form.is_valid():
status = 'ok'
new_profile = register_form.save(
request=request, realhost=request.get_host(),
next_url=request.session.get(REDIRECT_FIELD_NAME, '/'))
request.session['profile_id'] = new_profile.pk
request.session['profile_email'] = new_profile.email
elif 'login' in request.POST:
action = 'login'
register_form = load_module(CUSTOM_USER_SIGNUP_FORM)()
login_form = LoginForm(data=request.POST)
# Brute force check.
username = request.POST.get('username')
login_check = self.check_login(
username, real_ip)
if login_check == defaults.AUTH_DISALLOW:
can_login = False
elif login_check == defaults.AUTH_WARN:
login_form.make_captcha_visible()
#captcha_required = True
if can_login and not is_blacklisted and login_form.is_valid():
if not redirect or ' ' in redirect or (
'//' in redirect and re.match(r'[^\?]*//', redirect)):
redirect = settings.LOGIN_REDIRECT_URL
if login_form.cleaned_data['is_remember']:
request.session.set_expiry(1209600)
session_copy = dict(
item for item in request.session.iteritems()
if not item[0].startswith('_'))
auth_login(request, login_form.get_user())
redirect = self._get_redirect(request)
for key, value in session_copy.iteritems():
request.session[key] = value
request.session[
defaults.PASSWORD_HASH_KEY] = request.user.password
if request.session.test_cookie_worked():
request.session.delete_test_cookie()
status = 'ok'
else:
if not login_form.is_valid():
message = login_form.errors.as_text()
elif not can_login or is_blacklisted:
message = _('Login is disabled or your account is banned.')
else:
login_form = LoginForm(request)
register_form = load_module(CUSTOM_USER_SIGNUP_FORM)()
request.session.set_test_cookie()
action = None
if not redirect:
redirect = request.REQUEST.get(REDIRECT_FIELD_NAME, '/')
request.session[REDIRECT_FIELD_NAME] = redirect
return dict(
status=status, message=unicode(message), redirect=redirect,
login_form=login_form, register_form=register_form,
REGISTRATION_ENABLED=defaults.REGISTRATION_ENABLED, action=action)
def restore(self, request):
message = ''
RestorePasswordForm = load_module(defaults.RESTORE_PASSWORD_FORM)
if request.method == 'POST':
form = RestorePasswordForm(request.POST)
if form.is_valid():
profile = Profile.objects.get(
email__iexact=form.cleaned_data['email'])
if 'send_pass' in request.POST:
newpass = generate_random_password()
profile.set_password(newpass)
profile.save()
profile.email_forgotten_passwd(newpass)
return dict(
form=form,
message=_(
'New password has been sent to you email address'))
elif 'resend_activation' in request.POST:
if profile.check_activation():
return dict(
form=form,
message=_('Profile has been already activated'))
else:
try:
profile.generate_activation_key(
realhost=request.get_host(),
next_url=request.path)
message = _(
'New activation key has been sent to your '
'email address.')
return dict(form=form, message=message)
except Exception:
return dict(
form=form,
message=_(
'Unable to send activation key, please '
'try again later'))
else:
message = form.errors.as_text()
else:
if request.user.is_authenticated():
form = RestorePasswordForm(
initial={'email': request.user.email})
else:
form = RestorePasswordForm()
return dict(form=form, message=message)
def get_profiles(self, **kwargs):
return Profile.objects.filter(**kwargs)
def get_profile(self, **kwargs):
return Profile.objects.get(**kwargs)
def get_statistic(self, from_date=None, to_date=None, type='default',
date_trunc='day', where=None):
from statistic.utils import make_stats
if type == 'default':
return make_stats(
'date_joined', date_trunc, 'auth_user', from_date, to_date,
where)
elif type == 'actived':
where.append('is_active = true')
return make_stats(
'date_joined', date_trunc, 'auth_user', from_date, to_date,
where)
elif type == 'commented':
where.append(
'exists (select 1 from cm_comment '
'where cm_comment.author_id = auth_user.id)')
return make_stats(
'date_joined', date_trunc, 'auth_user', from_date, to_date,
where)
elif type == 'orders':
where.append(
'exists (select 1 from sh_order '
'where sh_order.profile_id = auth_user.id)')
return make_stats(
'date_joined', date_trunc, 'auth_user', from_date, to_date,
where)
elif type == 'orders_and_commented':
where.extend([
'exists (select 1 from cm_comment '
'where cm_comment.author_id = auth_user.id)',
'exists (select 1 from sh_order '
'where sh_order.profile_id = auth_user.id)'])
return make_stats(
'date_joined', date_trunc, 'auth_user', from_date, to_date,
where)
elif type == 'pins_activated':
where.append(
'sh_prepaid_content_provider.is_activated = true')
return make_stats(
'sh_prepaid_content_provider.activated_from', date_trunc,
'auth_user', from_date, to_date, where,
join=('sh_prepaid_content_provider on '
'auth_user.id = sh_prepaid_content_provider.profile_id')
)
elif type == 'pins_not_activated':
where.append('sh_prepaid_content_provider.is_activated = false')
return make_stats(
'sh_prepaid_content_provider.activated_from', date_trunc,
'auth_user', from_date, to_date, where,
join=('sh_prepaid_content_provider on '
'auth_user.id = sh_prepaid_content_provider.profile_id'))
def check_login(self, username, add_ip=None):
if not defaults.BRUTEFORCE_CHECK:
return defaults.AUTH_ALLOW
logins = cache.get('login-attempts:%s' % username)
if logins is None:
logins = []
result = defaults.AUTH_DISALLOW
now = datetime.datetime.now()
# Exclude expired attempts made beford hard limit period.
hard_time = datetime.datetime.now() - datetime.timedelta(
defaults.BRUTEFORCE_HARD_PERIOD)
logins = [login for login in logins if login[0] >= hard_time]
if add_ip:
logins.append((now, add_ip))
if len(logins) < defaults.BRUTEFORCE_HARD_LIMIT:
# Number of attempts is lower than hard limit.
soft_time = now - datetime.timedelta(
defaults.BRUTEFORCE_SOFT_PERIOD)
soft_cnt = 0
for time, ip in logins:
if time > soft_time:
soft_cnt += 1
if soft_cnt >= defaults.BRUTEFORCE_SOFT_LIMIT:
# Numer of attempts is higher than soft limit,
# but lower than hard limit.
result = defaults.AUTH_WARN
break
else:
result = defaults.AUTH_ALLOW
cache.set('login-attempts:%s' % username, logins)
return result