# ppa/archive/import_util.py
"""
Import utility classes for creating :mod:`ppa.archives` records from
external sources.
"""
import glob
import logging
import os
import subprocess
import tempfile
from collections import OrderedDict
from datetime import datetime
from json.decoder import JSONDecodeError
from cached_property import cached_property
from django.conf import settings
from django.contrib.admin.models import ADDITION, LogEntry
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from pairtree.pairtree_path import id_to_dirpath
from parasolr.django.signals import IndexableSignalHandler
from ppa.archive import hathi
from ppa.archive.gale import (
GaleAPI,
GaleAPIError,
GaleItemForbidden,
GaleItemNotFound,
MARCRecordNotFound,
get_marc_record,
)
from ppa.archive.models import DigitizedWork, Page
logger = logging.getLogger(__name__)
class DigitizedWorkImporter:
"""Logic for importing content from external sources (e.g. HathiTrust, Gale/ECCO)
to create :class:`~ppa.archive.models.DigitizedWork`
records. Should be extended for specific source logic.
For use in views and manage commands."""
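    # Typical call sequence for subclasses (a sketch; the subclass name and
    # log message below are hypothetical):
    #   importer = SomeSourceImporter(source_ids=[...])
    #   importer.filter_existing_ids()    # skip ids already in the database
    #   importer.add_items(log_msg_src="manual import")
    #   importer.index()                  # index metadata and page content
    #   report = importer.output_results()
    #: mapping of source id -> database id for records already present;
    #: populated by :meth:`filter_existing_ids`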
existing_ids = None
#: status - successfully imported record
SUCCESS = 1
#: status - skipped because already in the database
SKIPPED = 2
#: invalid id
INVALID_ID = 3
#: human-readable message to display for result status
status_message = {
SUCCESS: "Success",
SKIPPED: "Skipped; already in the database",
INVALID_ID: "Invalid id",
}
def __init__(self, source_ids=None):
self.imported_works = []
self.results = {}
self.source_ids = source_ids or []
def filter_existing_ids(self):
"""Check for any ids that are in the database so they can
be skipped for import. Populates :attr:`existing_ids`
with an :class:`~collections.OrderedDict` of source_id -> id for
ids already in the database and filters :attr:`source_ids`.
        Source ids are expected to correspond to
        :attr:`~ppa.archive.models.DigitizedWork.source_id`.
"""
        # query for digitized works with these source ids, returning
        # source id and db id, and build an ordered dict from the pairs
self.existing_ids = OrderedDict(
DigitizedWork.objects.filter(source_id__in=self.source_ids).values_list(
"source_id", "id"
)
)
# create initial results dict, marking any skipped ids
self.results = OrderedDict(
(id, self.SKIPPED) for id in self.existing_ids.keys()
)
# filter to ids that are not already present in the database
self.source_ids = set(self.source_ids) - set(self.existing_ids.keys())
        # also check for and filter out any invalid ids
self.filter_invalid_ids()
def filter_invalid_ids(self):
# optional filtering hook for subclasses; by default, no filtering
# when implementing, should update self.source_ids
pass
def index(self):
"""Index newly imported content, both metadata and full text."""
if self.imported_works:
DigitizedWork.index_items(self.imported_works)
for work in self.imported_works:
# index page index data in chunks (returns a generator)
DigitizedWork.index_items(Page.page_index_data(work))
def get_status_message(self, status):
"""Get a readable status message for a given status"""
try:
# try message for simple states (success, skipped)
return self.status_message[status]
except KeyError:
# if that fails, check for error message
return self.status_message[status.__class__]
def output_results(self):
"""Provide human-readable report of results for each
id that was processed."""
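        # e.g. (hypothetical ids): {"mdp.39015012345678": "Success",
        #                           "badid": "Invalid id"}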
return OrderedDict(
[
(source_id, self.get_status_message(status))
for source_id, status in self.results.items()
]
)
def add_item_prep(self, user=None):
"""Do any prep needed before calling :meth:`import_digitizedwork`;
extend in subclass when needed.
        :param user: optional user to be included in log entry message
"""
pass
def add_items(self, log_msg_src=None, user=None):
"""Add new items from source.
        :param log_msg_src: optional source of change to be included in
            log entry message
        :param user: optional user to be included in log entry message
"""
# assumes filter_existing_ids has already been called
# if all ids were invalid or already present, bail out
if not self.source_ids:
return
# disconnect indexing signal handler before adding new content
IndexableSignalHandler.disconnect()
self.add_item_prep(user=user)
for source_id in self.source_ids:
self.import_digitizedwork(source_id, log_msg_src, user)
# reconnect indexing signal handler
IndexableSignalHandler.connect()
    def import_digitizedwork(self, source_id, log_msg_src=None, user=None):
        """Import a single item from source. Must be implemented in subclass."""
raise NotImplementedError
class HathiImporter(DigitizedWorkImporter):
"""Logic for creating new :class:`~ppa.archive.models.DigitizedWork`
records from HathiTrust. For use in views and manage commands.
:param list source_ids: list of HathiTrust source ids (htid) to
synchronize (optional)
    :param rsync_output: determines whether rsync output is logged to a file;
        a value of 2 or greater also enables an itemized report (default: False)
:param str output_dir: base directory for rsync output file
(required if `rsync_output` is True)
:raises ValueError: if output_dir is unset when rsync_output is True or
if output_dir is not an existing directory
"""
#: rsync error
RSYNC_ERROR = 4
#: augment base status messages with hathi-specific codes and messages
status_message = DigitizedWorkImporter.status_message.copy()
status_message.update(
{
hathi.HathiItemNotFound: "Error loading record; check that id is valid.",
# possibly irrelevant with removal of data api code
hathi.HathiItemForbidden: "Permission denied to download data.",
RSYNC_ERROR: "Failed to sync data",
            # only encountered this once, but this was the error
JSONDecodeError: "HathiTrust catalog temporarily unavailable "
+ "(malformed response).",
}
)
def __init__(self, source_ids=None, rsync_output=False, output_dir=None):
super().__init__(source_ids)
# track whether (and how much) rsync output is desired
self.rsync_output = rsync_output
# if rsync output is enabled, output directory is required
if self.rsync_output:
if output_dir is None:
raise ValueError("output_dir is required when rsync_output is enabled")
elif not os.path.isdir(output_dir):
raise ValueError(
f"rsync output dir {output_dir} is not an existing directory"
)
self.output_dir = output_dir
def filter_invalid_ids(self):
"""Remove any ids that don't look valid. At minimum, must
include `.` separator required for pairtree path."""
invalid_ids = [htid for htid in self.source_ids if "." not in htid]
# add result code to display in output
for htid in invalid_ids:
self.results[htid] = self.INVALID_ID
# remove from the set of ids to be processed and return the rest
self.source_ids = set(self.source_ids) - set(invalid_ids)
@cached_property
def pairtree_paths(self):
"""Dictionary of pairtree paths for each hathi id to be imported."""
id_paths = {}
for htid in self.source_ids:
            # split institutional prefix from identifier
prefix, ident = htid.split(".", 1)
# generate pairtree path for the item
id_paths[htid] = os.path.join(prefix, "pairtree_root", id_to_dirpath(ident))
# ensure pairtree prefix and version files are included
# for each prefix, so new prefixes will result in valid pairtrees
id_paths[prefix] = "%s/pairtree_prefix" % prefix
# wildcard doesn't work here; if hathitrust ever changes pairtree
# version, this will likely need to change!
id_paths["%s_version" % prefix] = "%s/pairtree_version0_1" % prefix
return id_paths
# rsync command adapted from HathiTrust dataset sync documentation:
# https://github.com/hathitrust/datasets/wiki/Dataset-rsync-instructions
# recursive, copy links, preserve times, delete extra files at destination
# NOTE: add -v if needed for debugging
    rsync_cmd = (
        "rsync -rLt %(output)s --delete --ignore-errors "
        "--files-from=%(path_file)s %(server)s:%(src)s %(dest)s"
    )
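    # when populated, the command looks roughly like this (server and paths
    # are hypothetical stand-ins for the HATHITRUST_RSYNC_SERVER,
    # HATHITRUST_RSYNC_PATH, and HATHI_DATA settings):
    #   rsync -rLt --log-file=/tmp/ppa_hathi_rsync_20240101-120000.log --delete \
    #     --ignore-errors --files-from=/tmp/ppa_hathi_pathlist-xyz.txt \
    #     data.example.org:/hathi/source /var/data/hathi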
RSYNC_RETURN_CODES = {
1: "Syntax or usage error",
2: "Protocol incompatibility",
3: "Errors selecting input/output files, dirs",
4: "Requested action not supported",
# ... : an attempt was made to manipulate 64-bit
# files on a platform that cannot support them; or an option was specified
# that is supported by the client and not by the server.
5: "Error starting client-server protocol",
6: "Daemon unable to append to log-file",
10: "Error in socket I/O",
11: "Error in file I/O",
12: "Error in rsync protocol data stream",
13: "Errors with program diagnostics",
14: "Error in IPC code",
20: "Received SIGUSR1 or SIGINT",
21: "Some error returned by waitpid()",
22: "Error allocating core memory buffers",
23: "Partial transfer due to error",
24: "Partial transfer due to vanished source files",
25: "The --max-delete limit stopped deletions",
30: "Timeout in data send/receive",
35: "Timeout waiting for daemon connection",
}
def rsync_data(self):
"""Use rsync to retrieve data for the volumes to be imported."""
# limit the number of ids included in the log message
log_detail = ""
rsync_count = len(self.source_ids)
if rsync_count <= 10:
log_detail = ", ".join(self.source_ids)
else:
log_detail = "%d volumes" % rsync_count
logger.info("rsyncing pairtree data for %s", log_detail)
# create temp file with list of paths to synchronize
with tempfile.NamedTemporaryFile(
prefix="ppa_hathi_pathlist-",
suffix=".txt",
mode="w+t",
            # temporarily preserve file for development/debugging
delete=False,
) as fp:
file_paths = list(self.pairtree_paths.values())
# sorting makes rsync more efficient
file_paths.sort()
fp.write("\n".join(file_paths))
# flush to make content available to rsync
fp.flush()
# populate rsync command with path file name,
# local hathi data dir, and remote dataset server and source
            # if rsync output requested, include itemize and log file args
output_opts = ""
if self.rsync_output:
outputfilename = os.path.join(
self.output_dir,
"ppa_hathi_rsync_%s.log" % datetime.now().strftime("%Y%m%d-%H%M%S"),
)
# output requested: always log content to a file
output_opts = "--log-file=%s" % outputfilename
# if verbose output requested, itemize report while running
if int(self.rsync_output) >= 2:
output_opts = "-i %s" % output_opts
rsync_cmd = self.rsync_cmd % {
"path_file": fp.name,
"server": settings.HATHITRUST_RSYNC_SERVER,
"src": settings.HATHITRUST_RSYNC_PATH,
"dest": settings.HATHI_DATA,
"output": output_opts,
}
logger.debug("rsync command: %s" % rsync_cmd)
try:
subprocess.run(args=rsync_cmd.split(), check=True)
except subprocess.CalledProcessError as err:
logger.error(
"HathiTrust rsync failed — %s / command: %s"
% (self.RSYNC_RETURN_CODES[err.returncode], rsync_cmd)
)
if self.rsync_output:
return outputfilename
def add_item_prep(self, user=None):
"""Prep before adding new items from HathiTrust.
        :param user: optional user to be included in log entry message
"""
# initialize a bibliographic api client to use the same
# session when adding multiple items
self.bib_api = hathi.HathiBibliographicAPI()
# use rsync to copy data from HathiTrust dataset server
# to the local pairtree datastore for ids to be imported
self.rsync_data()
# FIXME: need better error handling here! rsync can error
# or timeout; should we capture output and report that?
# Indexing logs an error if pairtree is not present for an
# unsuppressed work; perhaps we could do a similar check here?
    def import_digitizedwork(self, htid, log_msg_src, user):
        """Import a single HathiTrust volume into the database."""
# if rsync did not create the expected directory,
# set error code and bail out
# if there is a directory but no zip file, bail out
expected_path = os.path.join(settings.HATHI_DATA, self.pairtree_paths[htid])
if not os.path.isdir(expected_path) or not len(
glob.glob(os.path.join(expected_path, "*", "*.zip"))
):
self.results[htid] = self.RSYNC_ERROR
return
try:
# fetch metadata and add to the database
digwork = DigitizedWork.add_from_hathi(
htid, self.bib_api, log_msg_src=log_msg_src, user=user
)
if digwork:
# populate page count
digwork.count_pages()
# save the page count to the database
if digwork.has_changed("page_count"):
digwork.save()
self.imported_works.append(digwork)
self.results[htid] = self.SUCCESS
except (
hathi.HathiItemNotFound,
JSONDecodeError,
hathi.HathiItemForbidden,
) as err:
# json decode error occurred 3/26/2019 - catalog was broken
# and gave a 200 Ok response with PHP error content
# hopefully temporary, but could occur again...
# store the actual error as the results, so that
# downstream code can report as desired
self.results[htid] = err
# remove the partial record if one was created
# (i.e. if metadata succeeded but data failed)
DigitizedWork.objects.filter(source_id=htid).delete()
class GaleImporter(DigitizedWorkImporter):
"""Logic for creating new :class:`~ppa.archive.models.DigitizedWork`
records from Gale/ECCO. For use in views and manage commands."""
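    # Example use (a sketch; the Gale item id and log message are hypothetical):
    #   importer = GaleImporter(["CW0123456789"])
    #   importer.filter_existing_ids()
    #   importer.add_items(log_msg_src="gale_import command")
    #   importer.output_results()
    # (pages are indexed at import time, so no separate index() call is needed)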
    #: augment base status messages with gale-specific codes and messages
status_message = DigitizedWorkImporter.status_message.copy()
status_message.update(
{
GaleAPIError: "Error getting item information from Gale API",
GaleItemForbidden: "Item forbidden (could be invalid id)",
GaleItemNotFound: "Item not found in Gale API",
MARCRecordNotFound: "MARC record not found",
}
)
def add_item_prep(self, user=None):
"""Prepare for adding new items from Gale.
        :param user: optional user to be included in log entry
"""
# disconnect indexing signal handler before adding new content
IndexableSignalHandler.disconnect()
# find script user if needed
if user is None:
self.script_user = User.objects.get(username=settings.SCRIPT_USERNAME)
self.digwork_contentype = ContentType.objects.get_for_model(DigitizedWork)
self.gale_api = GaleAPI()
def import_digitizedwork(
self, gale_id, log_msg_src="", user=None, collections=None, **kwargs
):
"""Import a single work into the database.
Retrieves bibliographic data from Gale API."""
# NOTE: significant overlap with similar method in import script
try:
item_record = self.gale_api.get_item(gale_id)
except (GaleAPIError, GaleItemForbidden) as err:
# store the error in results for reporting
self.results[gale_id] = err
return
# document metadata is under "doc"
doc_metadata = item_record["doc"]
# create new stub record and populate it from api response
digwork = DigitizedWork(
source_id=gale_id, # or doc_metadata['id']; format CW###
source=DigitizedWork.GALE,
# Gale API now includes ESTC id (updated June 2022)
record_id=doc_metadata["estc"],
source_url=doc_metadata["isShownAt"],
# volume information should be included as volumeNumber when available
enumcron=doc_metadata.get("volumeNumber", "").strip(),
title=doc_metadata["title"].strip(),
page_count=len(item_record["pageResponse"]["pages"]),
# import any notes from csv as private notes
notes=kwargs.get("NOTES", "").strip(),
)
# populate titles, author, publication info from marc record
try:
digwork.metadata_from_marc(get_marc_record(digwork.record_id))
except MARCRecordNotFound as err:
# store the error in results for reporting
self.results[gale_id] = err
return digwork
# set item type when specified and not null;
# otherwise use default as specified in model field
if kwargs.get("item_type"):
digwork.item_type = kwargs["item_type"]
# if item type is article/excerpt,
# override metadata with spreadsheet values
# NOTE: some duplication with hathi_excerpt script
if kwargs["item_type"] != DigitizedWork.FULL:
digwork.title = kwargs.get("Title", digwork.title)
# clear out any existing subtitle; excerpts don't have them
digwork.subtitle = ""
digwork.sort_title = kwargs.get("Sort Title", "").strip()
digwork.book_journal = kwargs.get("Book/Journal Title", "").strip()
# set page range for excerpts from csv when set
# intspan requires commas; allow semicolons in input but convert to commas
digwork.pages_digital = (
kwargs.get("Digital Page Range", "").replace(";", ",").strip()
)
digwork.pages_orig = kwargs.get("Original Page Range", "").strip()
# - optional fields
digwork.author = kwargs.get("Author", "").strip()
digwork.public_notes = kwargs.get("Public Notes", "").strip()
# recalculate page count for the excerpt if page range is set
if digwork.pages_digital:
digwork.page_count = digwork.count_pages()
digwork.save()
self.imported_works.append(digwork)
# use user if specified, otherwise fall back to script user
user = user or self.script_user
# create log entry to document import
change_message = "Created from Gale API"
if log_msg_src:
change_message = ("Created from Gale API %s" % log_msg_src,)
LogEntry.objects.log_action(
user_id=user.pk,
content_type_id=self.digwork_contentype.pk,
object_id=digwork.pk,
object_repr=str(digwork),
change_message=change_message,
action_flag=ADDITION,
)
        # record success in the results
self.results[gale_id] = self.SUCCESS
# set collection membership if any were specified
if collections:
digwork.collections.set(collections)
# index the work once (signals index twice because of m2m change)
DigitizedWork.index_items([digwork])
# item record used for import includes page metadata;
# for efficiency, index pages at import time with the same api response
DigitizedWork.index_items(Page.gale_page_index_data(digwork, item_record))
# return the newly created record
return digwork
def index(self):
# gale records are indexed at import time, to avoid making multiple API calls
pass