
View on GitHub


4 hrs
Test Coverage
from django.urls import re_path
from django.contrib import admin
from django.db.models import Count
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from import_export import fields, resources
from import_export.admin import ExportActionMixin, ExportMixin
from parasolr.django import SolrClient

from ppa.archive.models import (
from ppa.archive.views import ImportView
from ppa.archive.templatetags.ppa_tags import hathi_page_url, gale_page_url

# import/export resource
class DigitizedWorkResource(resources.ModelResource):
    """Import/export resource that customizes how digitized works are exported."""

    # declare export fields to customize output
    # - get display value for choice fields
    # NOTE(review): the three Field(...) declarations below are cut off in
    # this copy of the file (arguments and closing parentheses are missing);
    # restore them from version control before relying on this class.
    item_type = fields.Field(
    source = fields.Field(
    status = fields.Field(

    class Meta:
        # the model this resource exports
        model = DigitizedWork
        # leave the protected field flags out of the export
        exclude = ("protected_fields",)
        # NOTE(review): export_order and widgets are also truncated here
        # (entries and closing brackets are missing).
        export_order = (
            "item_type",  # display
            "collections",  # multiple, names
        widgets = {
            # customize many-to-many output for collections
            "collections": {"separator": "; ", "field": "name"},
            # output cluster id instead of pk
            "cluster": {"field": "cluster_id"},

    def get_queryset(self):
        """Return the parent queryset with ``collections`` and ``cluster``
        prefetched."""
        # prefetch related object to make download more efficient
        return super().get_queryset().prefetch_related("collections", "cluster")

class DigitizedWorkAdmin(ExportActionMixin, ExportMixin, admin.ModelAdmin):
    """Admin configuration for digitized works, with export support
    and bulk actions for adding works to collections and suppressing them."""

    resource_class = DigitizedWorkResource  # resource for export

    # enable "save as new" button to copy and create a new record
    save_as = True

    # NOTE(review): several tuple definitions below (list_display, fields,
    # hathi_readonly_fields, search_fields) are cut off in this copy of the
    # file; their contents must be restored from version control.
    list_display = (
    fields = (
        ("source", "source_id"),
        ("pages_orig", "pages_digital"),
    # fields that are always read only
    readonly_fields = ("added", "updated", "protected_fields")
    # fields that are read only for HathiTrust records
    hathi_readonly_fields = (

    search_fields = (
    # edit collection membership with the horizontal filter widget
    filter_horizontal = ("collections",)
    # look up clusters with an autocomplete widget
    autocomplete_fields = ["cluster"]
    # date_hierarchy = 'added'  # is this useful?
    list_filter = ["collections", "status", "source", "item_type", "cluster"]
    # custom bulk actions; each name refers to a method on this class
    actions = ["add_works_to_collection", "suppress_works"]

    def get_readonly_fields(self, request, obj=None):
        """
        Determine read only fields based on item source, to prevent
        editing of HathiTrust fields that should not be changed.

        :param request: current request; its POST data is checked for the
            ``_saveasnew`` flag
        :param obj: the record being edited, or None when there is no
            existing record
        :returns: tuple of field names to display as read only
        """
        # existing HathiTrust records: lock the HathiTrust-specific fields
        # in addition to the fields that are always read only
        if obj and obj.source == DigitizedWork.HATHI:
            return self.hathi_readonly_fields + self.readonly_fields

        if request.POST.get("_saveasnew"):
            # protected fields must not be read-only in order
            # to preserve/copy when saving as new
            return ("added", "updated")

        return self.readonly_fields

    def list_collections(self, obj):
        """Return a list of :class:`ppa.archive.models.Collection` object names
        as a comma separated list to populate a change_list column.

        :param obj: record whose ``collections`` relation is listed
        :returns: collection names sorted by name and joined with ``", "``;
            an empty string when the record has no collections
        """
        return ", ".join(
            coll.name for coll in obj.collections.all().order_by("name")
        )

    list_collections.short_description = "Collections"

    def source_link(self, obj):
        """Source id as an html link to source record, when source url is available.

        :param obj: record providing ``source_id``, ``source_url``,
            ``source`` and ``pages_digital``
        :returns: the plain source id when there is no source url; otherwise
            an HTML anchor pointing at the source record (or at the first
            digital page for HathiTrust and Gale records with a page range)
        """
        if not obj.source_url:
            return obj.source_id

        source_url = obj.source_url
        # hathi/gale excerpt links should include first page
        if obj.pages_digital:
            if obj.source == DigitizedWork.HATHI:
                # hathi page url method requires source id
                source_url = hathi_page_url(obj.source_id, obj.first_page_digital())
            elif obj.source == DigitizedWork.GALE:
                # gale page url method requires source url
                source_url = gale_page_url(obj.source_url, obj.first_page_digital())
        # format_html escapes both values before marking the result safe,
        # so stored ids and urls cannot inject markup into the change list
        return format_html(
            '<a href="{}" target="_blank">{}</a>', source_url, obj.source_id
        )

    source_link.short_description = "Source id"
    source_link.admin_order_field = "source_id"

    def change_view(self, request, object_id, form_url="", extra_context=None):
        """Customize the change view for "save as new" requests.

        When the POST data has ``_saveasnew`` set but no ``source`` value,
        values from the existing record are copied into a mutable copy of
        the POST data, which then replaces ``request.POST`` before the
        parent ``change_view`` is called.

        NOTE(review): several lines of this method are missing from this
        copy of the file (see the inline notes); restore them from version
        control before editing the logic.
        """
        # customize behavior when copying a record and saving as new
        if request.POST.get("_saveasnew"):
            # if source is unset, this means we are loading the "save as new"
            # form for a hathitrust record
            if not request.POST.get("source"):
                # customize save as new field contents
                instance = DigitizedWork.objects.get(pk=object_id)
                # make a copy of the querydict so we can update it
                post_params = request.POST.copy()
                # read-only fields should be preserved
                post_params["source"] = instance.source
                post_params["source_id"] = instance.source_id
                post_params["source_url"] = instance.source_url
                post_params["record_id"] = instance.record_id
                # copy protected field flags in simple string format
                # NOTE(review): the left-hand side of this assignment is cut off
                ] = instance.protected_fields.to_simple_str()

                # clear out fields that should be changed when excerpting
                # NOTE(review): the list contents, its closing bracket, and the
                # try/pass lines of the loop below are missing from this copy
                clear_fields = [
                    # "page_count",  # read-only, does not automatically propagate
                for field in clear_fields:
                        del post_params[field]
                    except KeyError:

                # update request with our modified post parameters
                request.POST = post_params

        # NOTE(review): the arguments of this call are cut off
        return super().change_view(

    def save_model(self, request, obj, form, change):
        """Note any fields in the protected list that have been changed in
        the admin and preserve in database.

        :param request: current request
        :param obj: the model instance built from the submitted form
        :param form: the admin form used to build ``obj``
        :param change: True when editing an existing record, False when adding
        """

        # If new object, created from scratch, nothing to track and preserve
        # or if item is not a HathiTrust item, save and return
        if not change or obj.source != DigitizedWork.HATHI:
            super().save_model(request, obj, form, change)
            # stop here: without this the record would be saved a second
            # time and a brand-new object would be looked up before it exists
            return

        # has_changes only works for objects that have been changed on their
        # instance -- obj is a new instance *not* a modified one,
        # so compare against database
        db_obj = DigitizedWork.objects.get(pk=obj.pk)
        changed_fields = obj.compare_protected_fields(db_obj)
        # iterate over changed fields and 'append' (OR) to flags
        for field in changed_fields:
            obj.protected_fields = obj.protected_fields | ProtectedWorkFieldFlags(field)
        super().save_model(request, obj, form, change)

    def save_related(self, request, form, formsets, change):
        """Ensure reindex is called when admin form is saved"""
        # m2m relations are handled separately by the admin form, so the standard
        # save override will not help: the m2m relationships are not yet set when
        # the model's save method is called. See the docstring for save_related
        # in the Django ModelAdmin documentation.

        super(DigitizedWorkAdmin, self).save_related(request, form, formsets, change)
        # NOTE(review): the lookup arguments of this call and the reindex step
        # that the docstring describes are missing from this copy of the file
        digwork = DigitizedWork.objects.get(

    def add_works_to_collection(self, request, queryset):
        """
        Bulk add a queryset of :class:`ppa.archive.DigitizedWork` to
        a :class:`ppa.archive.Collection`.

        Stores the selected ids and the current list filters in the session,
        then redirects to the ``archive:add-to-collection`` view.

        :param request: current request; its session is updated
        :param queryset: the works selected in the admin change list
        :returns: redirect to the bulk add view
        """
        # Uses POST from admin rather than a database query to get the pks
        # per the suggested practices in Django documentation
        selected = list(queryset.order_by("id").values_list("id", flat=True))
        # keep the current filter parameters in the session so that the bulk
        # add view can return the user to the same admin list view upon
        # completion.
        request.session["collection-add-filters"] = request.GET
        request.session["collection-add-ids"] = selected
        return HttpResponseRedirect(reverse("archive:add-to-collection"))

    add_works_to_collection.short_description = (
        "Add selected digitized works to collections"
    )
    add_works_to_collection.allowed_permissions = ("change",)

    def suppress_works(self, request, queryset):
        """Set status to suppressed for every item in the queryset
        that is not already suppressed.

        Works that are already suppressed are left alone and counted as
        skipped in the summary text built at the end of the method.

        NOTE(review): the Solr delete call and the call that reports the
        summary text are cut off in this copy of the file.
        """
        non_suppressed = queryset.exclude(status=DigitizedWork.SUPPRESSED)
        # save the list of ids being suppressed to update the index after;
        # this must be evaluated before the update below changes the status
        ids_to_suppress = list(non_suppressed.values_list("source_id", flat=True))
        # change status in the database
        updated = non_suppressed.update(status=DigitizedWork.SUPPRESSED)
        # queryset.update does not trigger save signals;
        # clear suppressed page + work content from the index
        # delete all pages and works associated with any of these source ids
        if ids_to_suppress:
            solr = SolrClient()
            # NOTE(review): the start of the delete call is missing; only the
            # quoted, OR-joined id list of the query remains
                % " OR ".join(['"%s"' % val for val in ids_to_suppress])
        # report on what was done, including any skipped
        skipped = ""
        qs_total = queryset.count()
        if qs_total != updated:
            skipped = " Skipped %d (already suppressed)." % (qs_total - updated)
            # NOTE(review): the call that receives this text is missing
            "Suppressed %d digitized work%s.%s"
            % (updated, "" if updated == 1 else "s", skipped),

    suppress_works.short_description = "Suppress selected digitized works"

    def get_urls(self):
        """Add url for import admin form"""
        urls = super(DigitizedWorkAdmin, self).get_urls()
        # NOTE(review): the entries of this list and its closing bracket are
        # missing from this copy of the file
        my_urls = [
        # custom urls are placed ahead of the default admin urls
        return my_urls + urls

class CollectionAdmin(admin.ModelAdmin):
    """Admin options for collections: list the name and ``exclude`` columns,
    with ``exclude`` editable directly from the change list."""

    # ``exclude`` can be changed without opening each record
    list_editable = ("exclude",)
    # change list columns
    list_display = ("name", "exclude")

class DigitizedWorkInline(admin.TabularInline):
    """Tabular inline for displaying digitized works on another admin form."""

    # do not render any extra blank rows
    extra = 0
    # columns shown for each inline work
    fields = ("source", "source_id", "title", "subtitle", "author")
    model = DigitizedWork

class ClusterAdmin(admin.ModelAdmin):
    """Admin configuration for clusters of digitized works."""

    # change list columns; "works" is a method defined on this class
    list_display = ("cluster_id", "works")
    # url name of the digitized work change list
    # (presumably reversed to build the link shown in the works column — confirm)
    digwork_admin_url = "admin:archive_digitizedwork_changelist"
    # NOTE(review): the contents and closing bracket of this list are
    # missing from this copy of the file
    inlines = [
    search_fields = ("cluster_id",)

    def get_queryset(self, request):
        """Return the cluster queryset, annotated with a count of related
        digitized works except when serving the admin autocomplete view."""
        queryset = super().get_queryset(request)
        # The count annotation used by the list view makes the search too
        # slow for autocomplete, so it is left off for that one path only.
        needs_counts = not (request and request.path == "/admin/autocomplete/")
        if needs_counts:
            return queryset.annotate(Count("digitizedwork"))
        # autocomplete: plain, unannotated queryset
        return queryset

        description="# works in this cluster",
    def works(self, obj):
        """Custom property to display number of works in a cluster and link
        to a filtered view of the digitized works list."""
        return format_html(
            '<a href="{0}?cluster__id__exact={1!s}">{2}</a>',
        ), DigitizedWorkAdmin), CollectionAdmin), ClusterAdmin)