data_capture/admin.py

Summary

Maintainability
B
6 hrs
Test Coverage
A
100%
import os
import re
from django.contrib import admin
from django.http import HttpResponse, HttpResponseForbidden
from django.db import models, transaction
from django import forms
from django.core.urlresolvers import reverse
from django.utils.safestring import mark_safe
from django.utils.html import format_html
from django.utils.text import slugify
from django.contrib import messages
from django.contrib.auth.models import User
from django.contrib.auth.admin import UserAdmin
from django.contrib.auth.forms import UserChangeForm
from django.conf.urls import url
from django.shortcuts import get_object_or_404


from frontend.upload import DEFAULT_FILE_EXTENSIONS
from .management.commands.initgroups import VIEW_ATTEMPT_PERMISSION
from . import email
from .schedules import registry
from .models import (SubmittedPriceList, SubmittedPriceListRow,
                     AttemptedPriceListSubmission)
from .templatetags.data_capture import tz_timestamp


class UniqueEmailFormMixin:
    '''
    A mixin that enforces the uniqueness, relative to the User model,
    of an 'email' field in the form it's mixed-in with.

    Taken from https://gist.github.com/gregplaysguitar/1184995.
    '''

    def clean_email(self):
        qs = User.objects.filter(email__iexact=self.cleaned_data['email'])
        if self.instance:
            qs = qs.exclude(pk=self.instance.pk)
        if qs.count():
            raise forms.ValidationError(
                'That email address is already in use.'
            )
        else:
            return self.cleaned_data['email']


class CustomUserCreationForm(forms.ModelForm, UniqueEmailFormMixin):
    '''
    A substitute for django.contrib.auth.forms.UserCreationForm which
    doesn't ask for information about new users that's irrelevant
    to how CALC works.
    '''

    email = forms.EmailField(required=True)

    class Meta:
        model = User
        fields = ('email',)

    def generate_username(self, email, max_attempts=100):
        '''
        Generate a unique username based on the given email address
        by slugifying the first several characters of the username
        part of the email. If needed, a number is added at the end
        to avoid conflicts with existing usernames.
        '''

        basename = slugify(email.split('@')[0])[:15]
        for i in range(max_attempts):
            if i == 0:
                username = basename
            else:
                username = '{}{}'.format(basename, i)
            if not User.objects.filter(username=username).exists():
                return username
        raise Exception(
            'unable to generate username for {} after {} attempts'.format(
                email,
                max_attempts
            )
        )

    def clean(self):
        email = self.cleaned_data.get('email')

        if email:
            self.cleaned_data['username'] = self.generate_username(email)

        return self.cleaned_data

    def save(self, commit=True):
        user = super().save(commit=False)
        user.username = self.cleaned_data['username']
        if commit:
            user.save()
        return user


class CustomUserChangeForm(UserChangeForm, UniqueEmailFormMixin):
    email = forms.EmailField(required=True)


class CustomUserAdmin(UserAdmin):
    '''
    Simplified user admin for non-superusers, which also prevents such
    users from upgrading themselves to superuser.
    '''

    form = CustomUserChangeForm

    add_form_template = 'admin/data_capture/add_user_form.html'

    add_form = CustomUserCreationForm

    add_fieldsets = (
        (None, {
            'classes': ('wide',),
            'fields': ('email',),
        }),
    )

    non_superuser_fieldsets = (
        (None, {'fields': (
            'username',
            # Even though we don't need/use the password field, showing it
            # is apparently required to make submitting changes work.
            'password'
        )}),
        ('Personal info', {'fields': ('first_name', 'last_name', 'email')}),
        ('Permissions', {'fields': ('is_active', 'is_staff', 'groups')}),
        ('Important dates', {'fields': ('last_login', 'date_joined')}),
    )

    list_display = ('email', 'is_staff')

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        if not request.user.is_superuser:
            qs = qs.filter(is_superuser=False)
        return qs

    def get_fieldsets(self, request, obj=None):
        if obj is not None and not request.user.is_superuser:
            return self.non_superuser_fieldsets
        return super().get_fieldsets(request, obj)


admin.site.unregister(User)
admin.site.register(User, CustomUserAdmin)


class SubmittedPriceListRowInline(admin.TabularInline):
    model = SubmittedPriceListRow

    can_delete = False

    fields = (
        'labor_category',
        'education_level',
        'min_years_experience',
        'base_year_rate',
        'sin',
        'is_muted',
    )

    readonly_fields = ()

    formfield_overrides = {
        models.TextField: {'widget': forms.TextInput}
    }

    def has_add_permission(self, request):
        return False

    def get_readonly_fields(self, request, obj=None):
        if obj and obj.status is SubmittedPriceList.STATUS_APPROVED:
            return self.fields
        return self.readonly_fields


@transaction.atomic
def approve(modeladmin, request, queryset):
    not_approved = queryset.exclude(status=SubmittedPriceList.STATUS_APPROVED)
    count = not_approved.count()
    for price_list in not_approved:
        price_list.approve(request.user)
        email.price_list_approved(price_list)

    messages.add_message(
        request,
        messages.INFO,
        '{} price list(s) have been approved and added to CALC.'.format(
            count
        )
    )


approve.short_description = (
    'Approve selected price lists (add their data to CALC)'
)


@transaction.atomic
def retire(modeladmin, request, queryset):
    approved = queryset.filter(status=SubmittedPriceList.STATUS_APPROVED)
    count = approved.count()
    for price_list in approved:
        price_list.retire(request.user)
        email.price_list_retired(price_list)
    messages.add_message(
        request,
        messages.INFO,
        '{} price list(s) have been retired and removed from CALC.'.format(
            count
        )
    )


retire.short_description = (
    'Retire selected price lists (remove their data from CALC)'
)


@transaction.atomic
def reject(modeladmin, request, queryset):
    unreviewed = queryset.filter(status=SubmittedPriceList.STATUS_UNREVIEWED)
    count = unreviewed.count()
    for price_list in unreviewed:
        price_list.reject(request.user)
        email.price_list_rejected(price_list)
    messages.add_message(
        request,
        messages.INFO,
        '{} price list(s) have been rejected.'.format(count)
    )


reject.short_description = (
    'Reject selected price lists'
)


class UndeletableModelAdmin(admin.ModelAdmin):
    '''
    Represents a model admin UI that offers no way of deleting
    instances. This is useful to ensure accidental data loss, especially
    when we want to keep it around for historical/data provenance purposes.
    '''

    # http://stackoverflow.com/a/25813184/2422398
    def get_actions(self, request):
        actions = super().get_actions(request)
        del actions['delete_selected']
        return actions

    def has_delete_permission(self, request, obj=None):
        return False


def short_description(desc: str):
    def decorator(func):
        func.short_description = desc
        return func

    return decorator


class BaseSubmittedPriceListAdmin(UndeletableModelAdmin):
    list_display = ('contract_number', 'vendor_name', 'submitter',
                    'tz_created_at', 'status_changed_by_email',
                    'tz_status_changed_at')

    fields = (
        'contract_number',
        'vendor_name',
        'is_small_business',
        'schedule_title',
        'contractor_site',
        'contract_start',
        'contract_end',
        'escalation_rate',
        'submitter',
        'tz_created_at',
        'tz_updated_at',
        'current_status',
        'tz_status_changed_at',
        'status_changed_by_email',
    )

    readonly_fields = (
        'schedule_title',
        'tz_created_at',
        'tz_updated_at',
        'current_status',
        'submitter',
        'tz_status_changed_at',
        'status_changed_by_email',
    )

    inlines = [
        SubmittedPriceListRowInline
    ]

    @short_description('Status changed by')
    def status_changed_by_email(self, instance):
        '''
        custom field to show the email address of the status_changed_by user
        '''
        return instance.status_changed_by.email

    def current_status(self, instance):
        content = instance.get_status_display() + "<br>"
        if instance.status is SubmittedPriceList.STATUS_APPROVED:
            content += ("<span style=\"color: green\">"
                        "This price list has been approved, so its data is "
                        "now in CALC. To retire it, you will need to use "
                        "the 'Retire selected price lists' action from the "
                        "<a href=\"..\">list view</a>. Note also that in "
                        "order to edit the fields in this price list, you "
                        "will first need to retire it.")
        else:
            content += ("<span style=\"color: red\">"
                        "This price list is not currently approved, so its "
                        "data is not in CALC. To approve it, you will need to "
                        "use the 'Approve selected price lists' action from "
                        "the <a href=\"..\">list view</a>.")

        return mark_safe(content)  # nosec

    @short_description('Created at')
    def tz_created_at(self, instance):
        return tz_timestamp(instance.created_at)

    @short_description('Updated at')
    def tz_updated_at(self, instance):
        return tz_timestamp(instance.updated_at)

    @short_description('Status changed at')
    def tz_status_changed_at(self, instance):
        return tz_timestamp(instance.status_changed_at)

    @short_description('Schedule')
    def schedule_title(self, instance):
        return registry.get_class(instance.schedule).title

    def has_add_permission(self, request):
        return False

    def get_readonly_fields(self, request, obj=None):
        if obj and obj.status is SubmittedPriceList.STATUS_APPROVED:
            return self.fields
        return self.readonly_fields


class ApprovedPriceList(SubmittedPriceList):
    class Meta:
        proxy = True


@admin.register(ApprovedPriceList)
class ApprovedPriceListAdmin(BaseSubmittedPriceListAdmin):
    actions = [retire]

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.filter(status=SubmittedPriceList.STATUS_APPROVED)


class UnreviewedPriceList(SubmittedPriceList):
    class Meta:
        proxy = True


@admin.register(UnreviewedPriceList)
class UnreviewedPriceListAdmin(BaseSubmittedPriceListAdmin):
    actions = [approve, reject]

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.filter(status=SubmittedPriceList.STATUS_UNREVIEWED)


class RejectedPriceList(SubmittedPriceList):
    class Meta:
        proxy = True


@admin.register(RejectedPriceList)
class RejectedPriceListAdmin(BaseSubmittedPriceListAdmin):
    actions = [approve]

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.filter(status=SubmittedPriceList.STATUS_REJECTED)


class RetiredPriceList(SubmittedPriceList):
    class Meta:
        proxy = True


@admin.register(RetiredPriceList)
class RetiredPriceListAdmin(BaseSubmittedPriceListAdmin):
    actions = [approve]

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.filter(status=SubmittedPriceList.STATUS_RETIRED)


@admin.register(SubmittedPriceListRow)
class SubmittedPriceListRowAdmin(UndeletableModelAdmin):
    list_display = (
        'contract_number',
        'vendor_name',
        'labor_category',
        'education_level',
        'min_years_experience',
        'base_year_rate',
        'sin',
        'is_muted',
    )

    list_editable = (
        'is_muted',
    )

    def contract_number(self, obj):
        # get status label and derive url from it
        status_display = obj.price_list.get_status_display()
        url = reverse(
            'admin:data_capture_{}pricelist_change'.format(status_display),
            args=(obj.price_list.id,))
        return format_html(
            '<a href="{}">{}</a>',
            url,
            obj.price_list.contract_number
        )

    def get_queryset(self, request):
        qs = super().get_queryset(request)
        return qs.exclude(
            price_list__status=SubmittedPriceList.STATUS_APPROVED)

    def vendor_name(self, obj):
        return obj.price_list.vendor_name

    def has_add_permission(self, request):
        return False


def clean_filename(filename):
    '''
    Attempt to clean the filename so it doesn't contain invalid
    characters or potentially dangerous file extensions.

    Examples:

        >>> clean_filename('boop.exe')
        'boop'

        >>> clean_filename('.csv')
        'csv'

        >>> clean_filename('$.csv')
        'data.csv'

        >>> clean_filename('u$goose.xls')
        'ugoose.xls'
    '''

    basename, ext = os.path.splitext(filename)

    basename = ''.join([
        char for char in basename
        if re.match('[A-Za-z0-9_\-]', char)
    ]) or 'data'

    if ext.lower() not in DEFAULT_FILE_EXTENSIONS:
        ext = ''

    return basename + ext


@admin.register(AttemptedPriceListSubmission)
class AttemptedPriceListSubmissionAdmin(admin.ModelAdmin):
    list_display = (
        'created_at',
        'id',
        'submitter',
        'uploaded_file_name',
        'valid_row_count',
        'invalid_row_count',
    )

    change_form_template = 'admin/data_capture/view_attempted_submission.html'

    def has_add_permission(self, request):
        return False

    def send_uploaded_file(self, request, id):
        if not request.user.has_perm(VIEW_ATTEMPT_PERMISSION):
            return HttpResponseForbidden()

        id = int(id)
        obj = get_object_or_404(AttemptedPriceListSubmission, pk=id)
        response = HttpResponse(obj.uploaded_file.contents.read(),
                                content_type='application/octet-stream')
        response['Content-Disposition'] = 'attachment; filename="{}"'.format(
            clean_filename(obj.uploaded_file_name),
        )
        return response

    def get_urls(self):
        return [
            url(r'^(?P<id>\d+)/download/$',
                self.admin_site.admin_view(self.send_uploaded_file),
                name='data_capture_send_uploaded_file'),
        ] + super().get_urls()