# serviceform/serviceform/utils.py
# -*- coding: utf-8 -*-
# (c) 2017 Tuomas Airaksinen
#
# This file is part of Serviceform.
#
# Serviceform is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Serviceform is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Serviceform. If not, see <http://www.gnu.org/licenses/>.
import re
import signal
import uuid
import random
import string
import logging
from itertools import chain
from typing import Match, Optional, TYPE_CHECKING, Iterable, Union
if TYPE_CHECKING:
from .models import ServiceForm, Participant, ResponsibilityPerson
from .models.serviceform import AbstractServiceFormItem
from colorful.forms import RGB_REGEX
from django.contrib import messages
from django.core.cache import caches
from django.http import HttpRequest, HttpResponse
from django.shortcuts import redirect
from django.utils.safestring import mark_safe
from collections import defaultdict
from colour import Color
from django.core.exceptions import PermissionDenied
from django.utils.translation import ugettext_lazy as _
from django.utils import translation
from django.conf import settings
from django.db import transaction
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class DelayedKeyboardInterrupt(object):
    """
    Context manager that postpones Ctrl-C (SIGINT) until the guarded block is done.

    The SIGINT handler is replaced when the object is constructed. A SIGINT that
    arrives afterwards is only recorded; ``KeyboardInterrupt`` is raised when the
    ``with`` block exits, after the handler that was installed before construction
    has been put back.

    Must be constructed in the main thread, because ``signal.signal`` only works
    there.
    """

    def __init__(self):
        self.will_interrupt = False
        # Remember the handler being replaced so that __exit__ can restore it.
        self._previous_handler = signal.signal(signal.SIGINT, self.handler)

    def __enter__(self):
        return self

    def handler(self, signum, frame):
        """Signal handler: record that an interrupt was requested, do not raise yet."""
        self.will_interrupt = True

    def __exit__(self, type, value, traceback):
        previous = self._previous_handler
        if previous is None:
            # None means the old handler was not installed from Python and cannot
            # be re-installed; fall back to the OS default.
            previous = signal.SIG_DFL
        # Restore the earlier handler rather than forcing SIG_DFL: SIG_DFL would make
        # a later Ctrl-C terminate the process without raising KeyboardInterrupt and
        # would discard any custom handler the application had installed.
        signal.signal(signal.SIGINT, previous)
        if self.will_interrupt:
            raise KeyboardInterrupt()
def _get_ident(request: HttpRequest) -> str:
    """
    Build the identifier used to key the cached report settings of the requester.

    Logged-in users are identified by their user pk together with the pk of
    ``request.service_form``. Otherwise the responsible stored in the session is
    looked up: the identifier then uses the responsible's pk and the pk of the
    responsible's form, or 'anonymous' and an empty form part if there is none.

    :param request: current request
    :return: identifier string of the form ``<who>_serviceform_<form pk>``
    """
    form = getattr(request, 'service_form', '')
    if not form:
        logger.error('No serviceform in _get_ident!?')

    user_pk = request.user.pk
    if user_pk:
        ident = 'user_%s' % user_pk
    else:
        responsible = get_responsible(request)
        if responsible:
            ident = 'responsible_%s' % responsible.pk
            # For responsibles the form is taken from the person, not the request.
            form = responsible.form
        else:
            ident = 'anonymous'
            form = None

    form_pk = getattr(form, 'pk', '')
    return f"{ident}_serviceform_{form_pk}"
class RevisionOptions:
    """
    Reserved values of the 'revision' report setting.

    ``CURRENT`` selects the form's current revision and ``ALL`` selects every
    revision of the form (see fetch_participants); any other value is treated as
    a revision name.
    """
    ALL = '__all'
    CURRENT = '__current'


# Report settings used when nothing has been stored for the requester yet.
settings_defaults = dict(revision=RevisionOptions.CURRENT)
def get_report_settings(request: HttpRequest, parameter: str=None) -> Union[dict, str]:
    """
    Read the report settings stored for the requester in the 'persistent' cache.

    :param request: current request; determines whose settings are read
    :param parameter: name of a single setting to return; if empty, the whole
        settings dict is returned
    :return: the settings dict (a copy of ``settings_defaults`` when nothing is
        stored), or the value of ``parameter`` (None when that key is missing)
    """
    persistent = caches['persistent']
    cache_key = 'settings_for_%s' % _get_ident(request)
    stored = persistent.get(cache_key, settings_defaults.copy())
    if not parameter:
        return stored
    return stored.get(parameter)
def set_report_settings(request: HttpRequest, report_settings: dict) -> None:
    """
    Store the report settings of the requester in the 'persistent' cache.

    :param request: current request; determines whose settings are written
    :param report_settings: settings dict to store
    """
    persistent = caches['persistent']
    cache_key = 'settings_for_%s' % _get_ident(request)
    persistent.set(cache_key, report_settings)
def user_has_serviceform_permission(user: settings.AUTH_USER_MODEL, service_form: 'ServiceForm',
                                    raise_permissiondenied: bool=True):
    """
    Check the object-level 'serviceform.can_access_serviceform' permission.

    :param user: user whose permission is checked
    :param service_form: form the permission is checked against
    :param raise_permissiondenied: raise instead of returning False on failure
    :return: True if the user has the permission, False if not and
        ``raise_permissiondenied`` is false
    :raises PermissionDenied: if the user lacks the permission and
        ``raise_permissiondenied`` is true
    """
    allowed = user.has_perm('serviceform.can_access_serviceform', service_form)
    if allowed:
        return True
    if not raise_permissiondenied:
        return False
    raise PermissionDenied(_('User is not allowed to access document'))
# Participants loaded by fetch_participants(), keyed by primary key.
_participants = {}


def get_participant(_id: int) -> 'Participant':
    """
    Look a participant up in the module-level cache.

    :param _id: primary key of the participant
    :return: the cached participant, or None (after logging an error) if the
        cache has no such entry
    """
    cached = _participants.get(_id)
    if cached is not None:
        return cached
    logger.error('Participant %d was not in cache!', _id)
    return None
def fetch_participants(service_form: 'ServiceForm', revision_name: str) -> None:
    """
    Load participants into the module-level cache read by get_participant().

    The cache is replaced by a new dict mapping participant pk to participant.

    :param service_form: form whose participants are wanted
    :param revision_name: ``RevisionOptions.ALL`` for every revision of the form,
        ``RevisionOptions.CURRENT`` for ``service_form.current_revision``, otherwise
        the name of the revision to load
    """
    global _participants
    from .models import Participant

    base_qs = Participant.objects.prefetch_related('participantlog_set__written_by')
    if revision_name == RevisionOptions.ALL:
        selected = (base_qs.select_related('form_revision')
                    .filter(form_revision__form=service_form)
                    .distinct())
    elif revision_name == RevisionOptions.CURRENT:
        selected = base_qs.filter(form_revision=service_form.current_revision)
    else:
        # NOTE(review): this branch filters by revision name only, not by
        # service_form -- confirm revision names cannot collide across forms.
        selected = base_qs.filter(form_revision__name=revision_name)

    _participants = dict((participant.pk, participant) for participant in selected)
class ClearParticipantCacheMiddleware:
    """
    Middleware that empties the module-level participant and responsible-count
    caches before each request is handled, so that data cached while serving one
    request is not visible to the next one.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Django does not invoke process_request on middleware that is constructed
        # with get_response, so it has to be called explicitly; otherwise the
        # caches would never be cleared.
        self.process_request(request)
        return self.get_response(request)

    def process_request(self, request: HttpRequest):
        """Clear both caches in place."""
        _participants.clear()
        _responsible_counts.clear()
class InvalidateCachalotAfterEachRequestMiddleware(object):
    """
    This middleware clears the cachalot cache at the end of every request.

    The cache named by ``settings.CACHALOT_CACHE`` is cleared once the response has
    been produced, and also when an exception is reported through
    ``process_exception``. Nothing is done unless 'cachalot' is listed in
    ``settings.INSTALLED_APPS``.
    """

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        response = self.get_response(request)
        # Django does not invoke process_response on middleware that is constructed
        # with get_response, so run it here; otherwise the cache would only ever be
        # cleared on exceptions.
        return self.process_response(request, response)

    def _clear_cachalot_cache(self) -> None:
        """Clear the cachalot cache if the cachalot app is installed."""
        if 'cachalot' in settings.INSTALLED_APPS:
            cachalot_cache = settings.CACHALOT_CACHE
            caches[cachalot_cache].clear()

    def process_exception(self, request: HttpRequest, exception: Exception):
        # Returns None, so the exception itself is not handled here.
        self._clear_cachalot_cache()

    def process_response(self, request: HttpRequest, response: HttpResponse):
        self._clear_cachalot_cache()
        return response
# Per-person tally keyed by ResponsibilityPerson pk. Rebuilt by
# init_serviceform_counters() and read through count_for_responsible().
_responsible_counts = defaultdict(int)


def init_serviceform_counters(service_form: 'ServiceForm', all_responsibles: bool=True) -> None:
    """
    Initializes counters and collects responsibles from subitems

    Walks four nested levels of ``sub_items`` below ``service_form`` (called cat1,
    cat2, activity and choice in this function). Visited items get a ``_counter``
    (activities and choices only when ``skip_numbering`` is false) and have
    responsibles added to their ``_responsibles`` set. The module-level
    ``_responsible_counts`` is cleared and refilled.

    :param service_form: form whose item tree is processed
    :param all_responsibles: when True, each item's ``_responsibles`` additionally
        receives the responsibles of the items above it
    :return:
    """
    activity_count = 0  # running activity number across the whole form
    cat1_counter = 0  # zero-based index of the top-level items
    _responsible_counts.clear()

    def _add_responsible(responsibles: 'Iterable[ResponsibilityPerson]',
                         *targets: 'AbstractServiceFormItem',
                         resp_count: bool=False) -> None:
        # Adds every person in `responsibles` to the `_responsibles` set of each
        # target. With resp_count=True it first increments _responsible_counts once
        # for each distinct person found in any target's own `responsibles`.
        if resp_count:
            for r in {resp for target in targets for resp in target.responsibles.all() if resp}:
                _responsible_counts[r.pk] += 1
        for resp in responsibles:
            for t in targets:
                t._responsibles.add(resp)

    for cat1 in service_form.sub_items:
        cat2_counter = 0  # restarts for every top-level item
        cat1._counter = cat1_counter
        cat1_counter += 1
        # Only cat1 gets a fresh set here; the lower levels' `_responsibles` are
        # presumably initialised elsewhere (e.g. by the model) -- verify.
        cat1._responsibles = set(cat1.responsibles.all())
        for cat2 in cat1.sub_items:
            cat2._counter = cat2_counter
            cat2_counter += 1
            # Responsibles propagate upwards: cat2's people are also added to cat1.
            _add_responsible(cat2.responsibles.all(), cat1, cat2)
            if all_responsibles:
                cat2._responsibles.update(set(cat1.responsibles.all()))
            for activity in cat2.sub_items:
                # Activities flagged skip_numbering consume no number and get no
                # _counter assigned here.
                if not activity.skip_numbering:
                    activity_count += 1
                    activity._counter = activity_count
                choice_counter = 0  # restarts for every activity
                # resp_count=True: this activity counts once for everyone responsible
                # for it or for either of its categories.
                _add_responsible(activity.responsibles.all(), cat1, cat2, activity,
                                 resp_count=True)
                if all_responsibles:
                    activity._responsibles.update(
                        set(cat1.responsibles.all()) | set(cat2.responsibles.all()))
                for choice in activity.sub_items:
                    # Same numbering rule as for activities.
                    if not choice.skip_numbering:
                        choice_counter += 1
                        choice._counter = choice_counter
                    # Each choice is counted as well, again once per distinct person
                    # in the whole chain above it.
                    _add_responsible(choice.responsibles.all(), cat1, cat2, activity, choice,
                                     resp_count=True)
                    if all_responsibles:
                        choice._responsibles.update(set(activity.responsibles.all()) |
                                                    set(cat1.responsibles.all()) |
                                                    set(cat2.responsibles.all()))
def shuffle_person_data(service_form: 'ServiceForm') -> None:
    """
    Overwrite the personal data attached to ``service_form`` with random values.

    Participants (matched through ``form_revision__form``) and responsibility
    persons of the form get names drawn at random from the pool of name parts
    already in use, plus generated e-mail, street address, postal code, phone
    number and city wherever those fields were filled in. ``additional_info`` of
    the participants' activities and choices, and answers to short/long text
    questions, are replaced by random letters. Every touched object is saved.

    :param service_form: form whose participants and responsibles are scrambled
    """
    from .models import Participant, ResponsibilityPerson, Question

    # Characters that shuffle() copies through unchanged.
    preserved = '@ .,'

    participants = Participant.objects.filter(form_revision__form=service_form)
    responsibles = ResponsibilityPerson.objects.filter(form=service_form)

    # Collect the name parts in use; replacement names are drawn from these pools.
    forename_pool = set()
    surname_pool = set()
    for p in chain(participants, responsibles):
        forename_pool.update(n.title() for n in p.forenames.split(' ') if n)
        surname_pool.update(n.title() for n in p.surname.split('-') if n)
    forenames = tuple(forename_pool)
    surnames = tuple(surname_pool)

    def shuffle(m, *attrs):
        # Replace each non-empty attribute by random ASCII letters of equal length,
        # keeping the preserved characters in place, then save the object.
        for a in attrs:
            old = getattr(m, a)
            if old:
                setattr(m, a, ''.join(
                    ch if ch in preserved else random.choice(string.ascii_letters)
                    for ch in old))
        m.save()

    def shuffle_question(q):
        # Only free-text answers are scrambled.
        if q.question.answer_type in [Question.ANSWER_LONG_TEXT,
                                      Question.ANSWER_SHORT_TEXT]:
            shuffle(q, 'answer')

    def valid_email(s):
        # Drop everything except ASCII letters, '@', '.' and '-'. Raw string: '\.'
        # is an invalid escape sequence in a normal string literal.
        return re.sub(r'[^a-zA-Z@\.-]', '', s)

    def random_name(pool, sep):
        # One or two random parts from the pool, joined with sep.
        return sep.join(random.choice(pool) for _part in range(random.randint(1, 2)))

    def shuffle_contact_details(p):
        # An empty pool means no person had that name part filled in, so there is
        # nothing to draw from; leave the blank value alone instead of crashing.
        if forenames:
            p.forenames = random_name(forenames, ' ')
        if surnames:
            p.surname = random_name(surnames, '-')
        if p.email:
            p.email = valid_email(
                '%s.%s@email.com' % (p.forenames.replace(' ', '.').lower(), p.surname.lower()))
        if p.street_address:
            p.street_address = 'Kontaktikatu %d' % random.randint(0, 99)
        if p.postal_code:
            p.postal_code = ''.join('%s' % random.randint(0, 9) for _digit in range(5))
        if p.phone_number:
            p.phone_number = ''.join('%s' % random.randint(0, 9) for _digit in range(9))
        if p.city:
            p.city = 'Hemilä'
        p.save()

    for p in chain(participants, responsibles):
        shuffle_contact_details(p)

    for p in participants:
        for a in p.activities:
            shuffle(a, 'additional_info')
            for c in a.choices:
                shuffle(c, 'additional_info')
        for q in p.questions:
            shuffle_question(q)
def count_for_responsible(resp: 'ResponsibilityPerson') -> int:
    """
    Return the tally recorded for ``resp`` by init_serviceform_counters().

    :param resp: responsible person to look up (by ``pk``)
    :return: the recorded count; 0 if nothing was recorded for this person
    """
    responsible_pk = resp.pk
    return _responsible_counts[responsible_pk]
def generate_uuid() -> str:
    """Return a newly generated random (version 4) UUID in its string form."""
    fresh = uuid.uuid4()
    return str(fresh)
# Alias for plain ``str``, used to annotate colour strings in the helpers below.
ColorStr = str # TODO: Type validation against RGB_REGEX.pattern?
def darker_color(color: ColorStr) -> ColorStr:
    """
    Return a darker variant of ``color``.

    Hue and saturation are kept; the HSL lightness is multiplied by 0.75.

    :param color: colour in any form accepted by ``Color``
    :return: the darkened colour as returned by ``Color.get_hex()``
    """
    shade = Color(color)
    hue, saturation, lightness = shade.get_hsl()
    shade.set_hsl((hue, saturation, lightness * 0.75))
    return shade.get_hex()
def lighter_color(color: ColorStr) -> ColorStr:
    """
    Return a lighter variant of ``color``.

    Hue and saturation are kept; the HSL lightness is moved halfway towards 1.0.

    :param color: colour in any form accepted by ``Color``
    :return: the lightened colour as returned by ``Color.get_hex()``
    """
    tint = Color(color)
    hue, saturation, lightness = tint.get_hsl()
    tint.set_hsl((hue, saturation, lightness + (1.0 - lightness) * 0.5))
    return tint.get_hex()
def not_black(color: ColorStr) -> Optional[ColorStr]:
    """
    Treat pure black as "no colour".

    :param color: colour string
    :return: None if ``color`` is exactly '#000000', otherwise ``color`` unchanged
    """
    if color == '#000000':
        return None
    return color
def color_for_count(count: int) -> ColorStr:
    """
    Map a count to a colour.

    A falsy count gives white. Otherwise the count is capped at 10 and the hue
    falls linearly towards 0.0 as the count approaches the cap, with saturation
    1.0 and luminance 0.7.

    :param count: the number to visualise
    :return: colour as returned by ``Color.get_hex()``
    """
    if not count:
        return Color('white').get_hex()

    ceiling = 10
    capped = min(ceiling, count)

    result = Color('green')
    result.hue = (ceiling - capped) / ceiling * 0.33
    result.saturation = 1.0
    result.luminance = 0.7
    return result.get_hex()
def update_serviceform_default_emails() -> None:
    """
    Call ``create_email_templates()`` on every ServiceForm with the 'fi' translation
    active.

    The language is switched only for the duration of the loop; the previously
    active language is restored afterwards, so the caller's locale is not changed
    as a side effect.
    """
    from .models import ServiceForm

    # override() restores the previous language on exit, even if an exception is
    # raised; a bare activate() would leave 'fi' active for the rest of the thread.
    with translation.override('fi'):
        for s in ServiceForm.objects.all():
            s.create_email_templates()
def clean_session(request: HttpRequest):
    """
    Remove this module's authentication-related keys from the session.

    Keys that are not present are ignored; all other session data is left alone.

    :param request: request whose ``session`` is cleaned
    """
    session = request.session
    for name in ('max_category', 'authenticated_participant',
                 'authenticated_responsibility', 'verification_sent'):
        session.pop(name, None)
def get_responsible(request: HttpRequest):
    """
    Return the responsible person the session is authenticated as.

    :param request: request whose session holds 'authenticated_responsibility'
    :return: the ResponsibilityPerson with that pk, or None if no row matches
    """
    from .models import ResponsibilityPerson

    pk = request.session.get('authenticated_responsibility')
    matches = ResponsibilityPerson.objects.filter(pk=pk)
    return matches.first()
def safe_join(sep: str, args_generator: Iterable[str]):
    """
    Join strings with ``sep``, starting from an empty safe string.

    The separator is marked safe; the items are concatenated as they are, without
    any escaping. Whether the result is still marked safe is decided by Django's
    safe-string concatenation (presumably only when every item is itself safe --
    verify).

    :param sep: separator, trusted by the caller
    :param args_generator: strings to join; may be any iterable, including an
        empty one
    :return: the joined string; an empty safe string for an empty iterable
    """
    safe_sep = mark_safe(sep)
    result = mark_safe('')
    for index, item in enumerate(args_generator):
        if index:
            # The separator goes between items only, never before the first one.
            result += safe_sep
        result += item
    return result
def expire_auth_link(request: HttpRequest, obj: 'Union[Participant, ResponsibilityPerson]') \
        -> HttpResponse:
    """
    Handle an expired authentication link: send a new one and redirect to login.

    :param request: WSGI request
    :param obj: either Participant or ResponsibilityPerson
    :return: redirect to 'password_login' for the slug of ``obj.form``
    """
    obj.resend_auth_link()
    notice = _('Your authentication URL was expired. New link has been sent to {}').format(
        obj.email)
    messages.info(request, notice)
    return redirect('password_login', obj.form.slug)
def encode(number: int) -> str:
    """
    Turn a number into a short code string.

    ``100000 + number`` is written in octal and reversed; then the first occurrence
    of each of the digits 0, 2, 4, 6, 8 is replaced by the corresponding character
    of ``settings.CODE_LETTERS`` (positions 0-4).

    :param number: number to encode
    :return: the code string
    """
    letters = settings.CODE_LETTERS
    code = ('%o' % (100000 + number))[::-1]
    for position, digit in enumerate('02468'):
        # count=1: only the first occurrence of each digit becomes a letter.
        code = code.replace(digit, letters[position], 1)
    return code
def decode(number: str) -> Optional[int]:
    """
    Reverse of encode(): turn a code string back into a number.

    Every character of ``settings.CODE_LETTERS`` (positions 0-4) is replaced by the
    digit 0, 2, 4, 6 or 8 respectively; the result is reversed, parsed as octal and
    reduced by 100000.

    :param number: code string, or None
    :return: the decoded number, or None if ``number`` is None, is not valid octal
        after substitution, or decodes to a negative value
    """
    letters = settings.CODE_LETTERS
    if number is None:
        return None

    for position, digit in enumerate('02468'):
        number = number.replace(letters[position], digit)

    try:
        value = int(number[::-1], 8) - 100000
    except ValueError:
        return None
    return value if value >= 0 else None