scripts/fedora-checksums
#!/usr/bin/env python
# file scripts/fedora-checksums
#
# Copyright 2012 Emory University Libraries
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTE: more detailed documentation & usage examples are included in
# scripts/__init__.py for inclusion in sphinx docs.
import argparse
from collections import defaultdict
import csv
from eulfedora.server import Repository
from eulfedora.rdfns import model as modelns
from getpass import getpass
import os
import signal
import six
from six.moves.queue import Queue, Empty as EmptyQueue
import sys
import threading
from time import sleep
try:
from progressbar import ProgressBar, Bar, Percentage, ETA, SimpleProgress
except ImportError:
ProgressBar = None
class FedoraChecksums(object):
    '''Command-line driver for validating or repairing datastream
    checksums in a Fedora repository.

    :meth:`run` parses the command line, starts the worker threads and a
    single reporter thread, queues one item per datastream (or datastream
    version) and prints a summary when both queues have drained.
    '''

    # running totals; the same mapping is handed to worker/reporter threads
    stats = defaultdict(int)

    # open file handle and csv writer (validate mode with --csv-file only)
    csv_file = None
    csv = None

    # interrupt flag to exit the main processing loop when a signal is caught
    interrupted = False

    # URI for Fedora object content model
    object_model = 'info:fedora/fedora-system:FedoraObject-3.0'

    # parsed command-line options and the parser that produced them
    args = None
    parser = None

    def config_arg_parser(self):
        '''Build the argument parser (stored on ``self.parser``) with the
        ``validate`` and ``repair`` subcommands.'''
        # common args for either mode
        common_args = argparse.ArgumentParser(add_help=False)

        # fedora connection options
        repo_args = common_args.add_argument_group('Fedora repository connection options')
        repo_args.add_argument('--fedora-root', dest='fedora_root', required=True,
                               help='URL for accessing fedora, e.g. http://localhost:8080/fedora/')
        repo_args.add_argument('--fedora-user', dest='fedora_user', default=None,
                               help='Fedora username')
        repo_args.add_argument('--fedora-password', dest='fedora_password', metavar='PASSWORD',
                               default=None, action=PasswordAction,
                               help='Password for the specified Fedora user (leave blank to be prompted)')

        # general script options
        common_args.add_argument('--quiet', '-q', default=False, action='store_true',
                                 help='Quiet mode: only output summary report')
        common_args.add_argument('--max', '-m', type=int, metavar='N',
                                 help='Stop after processing the first %(metavar)s objects')
        common_args.add_argument('--concurrency', type=int, metavar='N', default=5,
                                 help='Number of concurrent validation/repair threads to run (default: %(default)d)')
        common_args.add_argument('--pid-list', metavar='PIDLIST_FILE',
                                 help='File with a list of pids to be checked, one on each line')
        common_args.add_argument('pids', metavar='PID', nargs='*',
                                 help='list specific pids to be checked (optional)')

        # actual main argument parser
        self.parser = argparse.ArgumentParser(description='''Validate or repair datastream
        checksums for Fedora repository content. By default, iterates through all content
        objects that are findable via RIsearch and checks or repairs all datastreams.
        ''')

        # add subcommands for script modes
        subparsers = self.parser.add_subparsers(dest='mode', title='subcommands')

        # validate
        v_parser = subparsers.add_parser('validate',
                                         help='check for invalid and missing checksums',
                                         parents=[common_args])
        v_parser.add_argument('--csv-file', dest='csv_file', default=None,
                              help='Output results to the specified CSV file')
        v_parser.add_argument('--csv-all', dest='csv_all', action='store_true', default=False,
                              help='Include all results (valid/invalid/missing) in CSV output')
        v_parser.add_argument('--all-versions', '-a', dest='all_versions', action='store_true',
                              help='''Check all versions of datastreams
                              (by default, only current versions are checked)''')
        v_parser.add_argument('--missing-only', dest='missing_only', action='store_true',
                              help='''Only check for datastreams with no checksum''')

        # repair
        r_parser = subparsers.add_parser('repair',
                                         help='repair missing checksums',
                                         parents=[common_args])
        r_parser.add_argument('--checksum-type', dest='checksum_type', default='DEFAULT',
                              help='''Checksum type to use; if not specified,
                              will prompt Fedora to use the configured default checksum type''')
        r_parser.add_argument('--force', dest='force_ds_ids', action=CSVAction,
                              help='''A comma separated list of datastream IDs to repair even if checksum is present''')

    def run(self):
        '''Script entry point: parse arguments, process the requested
        objects and print a summary.'''
        # bind a handler for interrupt signal
        signal.signal(signal.SIGINT, self.interrupt_handler)
        self.config_arg_parser()
        self.args = self.parser.parse_args()

        # no subcommand given (possible on Python 3, where subparsers
        # are optional): show usage instead of failing later
        if not self.args.mode:
            self.parser.print_help()
            return

        # if in validation mode and csv-file is specified, open the
        # file and write the header row
        if self.args.mode == 'validate' and self.args.csv_file:
            self._open_csv(self.args.csv_file)

        try:
            self._process_objects()
        finally:
            # if a csv file was opened, close it (even on error/interrupt)
            if self.csv_file:
                self.csv_file.close()

    def _open_csv(self, path):
        '''Open ``path`` for CSV output and write the header row.'''
        # TODO: error handling for file open/write failure
        # The csv module needs a binary file on Python 2 but a text file
        # opened with newline='' on Python 3.
        if six.PY2:
            self.csv_file = open(path, 'wb')
        else:
            self.csv_file = open(path, 'w', newline='')
        self.csv = csv.writer(self.csv_file, quoting=csv.QUOTE_ALL)
        self.csv.writerow(['pid', 'datastream id', 'date created', 'status',
                           'mimetype', 'versioned', 'control group', 'content models'])

    def _load_pids(self, repo):
        '''Return the pids to process: command-line pids, then the pid-list
        file, then every object RIsearch reports with the Fedora object
        content model.'''
        if self.args.pids:
            # get distinct pid list (only process each object once)
            return set(self.args.pids)

        if self.args.pid_list:
            # file with pids, one per line; skip blank lines and lines
            # starting with #.  Lines are fully stripped so trailing
            # whitespace or '\r' never ends up inside a pid.
            with open(self.args.pid_list) as pidfile:
                stripped = (line.strip() for line in pidfile)
                return set(pid for pid in stripped
                           if pid and not pid.startswith('#'))

        # otherwise, process all find-able objects
        return list(repo.risearch.get_subjects(modelns.hasModel, self.object_model))

    def _process_objects(self):
        '''Queue work for every requested object, wait for the worker and
        reporter threads to drain the queues, then print the summary.'''
        # TODO: needs fedora error handling (e.g., bad password, hostname, etc)
        repo = Repository(self.args.fedora_root,
                          self.args.fedora_user, self.args.fedora_password)
        object_pids = self._load_pids(repo)

        # initialize progress bar
        pid_pbar = None
        total = len(object_pids)
        if self.args.max:
            # never promise more objects than are actually available
            total = min(self.args.max, total)
        # init progress bar if available and we're checking enough objects
        if total >= 10 and ProgressBar and os.isatty(sys.stderr.fileno()):
            widgets = [Percentage(), ' (', SimpleProgress(), ')',
                       Bar(), ETA()]
            pid_pbar = ProgressBar(widgets=widgets, maxval=total).start()

        self.todo_queue = Queue()
        self.done_queue = Queue()
        # count of # datastreams queued for processing, keyed on pid
        # (used to determine when an object is finished)
        self.queued = {}

        # determine worker and reporter to use for requested items
        if self.args.mode == 'validate':
            worker = ValidationWorker
            reporter = ValidationReporter
        elif self.args.mode == 'repair':
            worker = RepairWorker
            reporter = RepairReporter

        # start the requested number of worker threads (at least one,
        # otherwise nothing would ever drain the to-do queue)
        # NOTE: might not want multiple threads for just a few pids...
        for _ in range(max(1, self.args.concurrency)):
            worker(self.todo_queue, self.done_queue,
                   self.args, self.queued, self.stats).start()

        # start a single reporter thread to pick up completed items
        reporter(self.done_queue, self.args, self.stats,
                 self.queued, pid_pbar, self.csv).start()

        # keep track of number of jobs queued per item
        for pid in object_pids:
            obj = repo.get_object(pid=pid)
            if not obj.exists:
                print("Error: %s does not exist or is inaccessible" % pid)
                continue

            # NOTE(review): workers decrement this count while datastreams
            # are still being queued, so an object's count can touch zero
            # before its last datastream is queued - confirm whether the
            # 'objects' total can over-count for fast workers.
            self.queued[obj.pid] = 0
            for dsid in six.iterkeys(obj.ds_list):
                dsobj = obj.getDatastreamObject(dsid)
                self.stats['ds'] += 1

                if self.args.mode == 'validate':
                    self.queue_validation(dsobj)
                elif self.args.mode == 'repair':
                    self.queue_repair(dsobj)

            # if interrupted or at a specified max, quit
            if self.interrupted or \
               (self.args.max and len(self.queued) >= self.args.max):
                # the progress bar only exists for large, interactive runs
                if self.interrupted and pid_pbar:
                    # update progressbar to show queued items
                    pid_pbar.maxval = max(len(self.queued), self.stats['objects'])
                    pid_pbar.update(self.stats['objects'])
                break

        # queue.join blocks; poll first so the main thread wakes up
        # periodically and can react to an interrupt
        while not self.todo_queue.empty():
            sleep(1)
        self.todo_queue.join()
        while not self.done_queue.empty():
            sleep(1)
        self.done_queue.join()

        if pid_pbar and not self.interrupted:
            pid_pbar.finish()

        # summarize what was done
        if self.args.mode == 'validate':
            self.validation_summary()
        elif self.args.mode == 'repair':
            self.repair_summary()

    def queue_validation(self, dsobj):
        '''Add information to the ``to-do`` queue for datastream
        versions to be validated.

        :param dsobj: datastream object; its ``id``, ``obj.pid`` and
            (with ``--all-versions``) ``history().versions`` are used
        '''
        pid = dsobj.obj.pid
        if pid not in self.queued:
            self.queued[pid] = 0

        if self.args.all_versions:
            # check every version of this datastream
            try:
                for ds in dsobj.history().versions:
                    # count the item before it becomes visible to workers,
                    # so a fast worker cannot decrement first
                    self.queued[pid] += 1
                    self.todo_queue.put((pid, dsobj.id, ds.created))
                    self.stats['ds_versions'] += 1
            except Exception as e:
                print("error getting history for %s %r" % (pid, e))
        else:
            # current version only
            self.queued[pid] += 1
            self.todo_queue.put((pid, dsobj.id, None))

    def validation_summary(self):
        '''Summarize what was done when the script was run in
        **validation** mode.'''
        totals = '\nTested %(objects)d object(s), %(ds)d datastream(s)' % self.stats
        if self.args.all_versions:
            totals += ', %(ds_versions)d datastream version(s)' % self.stats
        print(totals)
        if not self.args.missing_only:
            print('%(invalid)d invalid checksum(s)' % self.stats)
        print('%(missing)d datastream(s) with no checksum' % self.stats)

    def queue_repair(self, dsobj):
        '''Add information to the ``to-do`` queue for datastream
        to be checked for repair.

        :param dsobj: datastream object; its ``id`` and ``obj.pid`` are used
        '''
        pid = dsobj.obj.pid
        if pid not in self.queued:
            self.queued[pid] = 0
        self.queued[pid] += 1
        self.todo_queue.put((pid, dsobj.id))

    def repair_datastream(self, dsobj):
        '''Check for and repair a missing checksum on a single
        datastream.  If checksum type is ``DISABLED``, checksum value is
        ``none``, or the datastream id was listed with ``--force``, update
        the checksum type and save the datastream, prompting Fedora to
        calculate a new checksum of the requested type.

        :param dsobj: :class:`~eulfedora.models.DatastreamObject`
        '''
        # --force is optional, so the parsed value may be None
        forced_ids = getattr(self.args, 'force_ds_ids', None) or ()
        if dsobj.checksum_type == 'DISABLED' or dsobj.checksum == 'none' \
           or dsobj.id in forced_ids:
            dsobj.checksum_type = self.args.checksum_type
            try:
                saved = dsobj.save('updating checksum')
                if saved:
                    self.stats['ds_updated'] += 1
            except Exception as e:
                print('Error saving %s/%s : %s' %
                      (dsobj.obj.pid, dsobj.id, e))
                self.stats['ds_err'] += 1

    def repair_summary(self):
        '''Summarize what was done when the script was run in
        **repair** mode.'''
        print('\nChecked %(objects)d object(s), updated %(ds_updated)d datastream(s)' %
              self.stats)
        if self.stats['ds_err']:
            print('Error saving %(ds_err)d datastream(s)' % self.stats)

    def interrupt_handler(self, signum, frame):
        '''Gracefully handle a SIGINT, if possible. Sets a flag so main script
        loop can exit cleanly, and restores the default SIGINT behavior,
        so that a second interrupt will stop the script.
        '''
        if signum == signal.SIGINT:
            # restore default signal handler so a second SIGINT can be used to quit
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            # set interrupt flag so main loop knows to quit at a reasonable time
            self.interrupted = True
            # report if script is in the middle of an object
            print('Script will exit after processing the current object.' +
                  '\n(Ctrl-C / Interrupt again to quit immediately)')
class DatastreamWorker(threading.Thread):
    '''Thread class with common logic for processing items in the
    ``to-do`` queue and adding results to the ``done`` queue.

    Subclasses implement :meth:`process_queued_item`.
    '''
    # daemon thread: never blocks interpreter exit
    daemon = True

    def __init__(self, todo_queue, done_queue, options, queued, stats):
        '''
        :param todo_queue: queue of item tuples to process; the first
            element of every item is the object pid
        :param done_queue: queue that receives the processing results
        :param options: parsed command-line options; ``fedora_root``,
            ``fedora_user`` and ``fedora_password`` are used to open this
            thread's own repository connection
        :param queued: dict of outstanding item counts, keyed on pid
        :param stats: dict of running totals shared with the main thread
        '''
        threading.Thread.__init__(self)
        self.todo = todo_queue
        self.done = done_queue
        self.repo = Repository(options.fedora_root,
                               options.fedora_user, options.fedora_password)
        self.queued = queued
        self.stats = stats
        self.options = options

    def run(self):
        '''Process queued items forever (the thread is a daemon).'''
        while True:
            # blocking get: waits until an item is available
            item = self.todo.get()
            try:
                # queued item is a tuple; first element should always be pid
                pid = item[0]
                try:
                    self.done.put(self.process_queued_item(item))
                except Exception as err:
                    # a failure on one item must not kill the thread;
                    # report it and carry on with the next item
                    print('Error processing %s: %r' % (item, err))
                self.queued[pid] -= 1
                if self.queued[pid] == 0:
                    # last outstanding item for this object
                    self.stats['objects'] += 1
            finally:
                # always mark the item finished, otherwise join() on the
                # to-do queue would block forever
                self.todo.task_done()

    def process_queued_item(self, *args, **kwargs):
        '''Hook for subclasses: process one queued item and return the
        result to be placed on the ``done`` queue.'''
        # processing logic goes here
        pass
class Reporter(threading.Thread):
    '''Thread class with common logic for handling items in the
    ``done`` queue and reporting where appropriate.

    Subclasses implement :meth:`process_finished_item`.
    '''
    # daemon thread: never blocks interpreter exit
    daemon = True

    def __init__(self, done_queue, options, stats, queued,
                 pbar=None, csv=None):
        '''
        :param done_queue: queue of results produced by the worker threads
        :param options: parsed command-line options (``quiet`` is read here)
        :param stats: dict of running totals shared with the main thread
        :param queued: dict of outstanding item counts, keyed on pid
        :param pbar: optional progress bar, updated after each result
        :param csv: optional csv writer for result output
        '''
        threading.Thread.__init__(self)
        self.done = done_queue
        self.quiet = options.quiet
        self.csv = csv
        self.pbar = pbar
        self.stats = stats
        self.queued = queued
        self.options = options

    def run(self):
        '''Handle finished items forever (the thread is a daemon).'''
        while True:
            # blocking get: waits until a result is available
            result = self.done.get()
            try:
                self.process_finished_item(result)
                if self.pbar:
                    # maxval/stats could get out of sync when interrupted
                    self.pbar.maxval = max(self.pbar.maxval, self.stats['objects'])
                    self.pbar.update(self.stats['objects'])
            except Exception as err:
                # a failure on one result must not kill the only reporter
                print('Error reporting result: %r' % (err,))
            finally:
                # always mark the item finished, otherwise join() on the
                # done queue would block forever
                self.done.task_done()

    def process_finished_item(self, *args, **kwargs):
        '''Hook for subclasses: report on one finished item.'''
        # result processing logic goes here
        pass
class ValidationResult(object):
    '''Outcome of validating a single datastream (version), passed from
    the validation worker threads to the reporter thread.'''

    def __init__(self, pid, dsid, date, status, mimetype, versionable,
                 control_group, content_models):
        # identify what was checked
        self.pid, self.dsid, self.date = pid, dsid, date
        # outcome of the check
        self.status = status
        # descriptive datastream details carried along for reporting
        self.mimetype, self.versionable = mimetype, versionable
        self.control_group = control_group
        # content models are flattened into one comma-separated string
        self.content_models = ', '.join(content_models)

    @property
    def valid(self):
        'True only when the validation status is ``ok``'
        return self.status == 'ok'

    def msg(self):
        'Text shown for this result in verbose output'
        details = (self.pid, self.dsid, self.status, self.date)
        return "%s/%s - %s checksum (%s)" % details

    def csv_data(self):
        'Row of values for this result, in CSV header order'
        row = [self.pid, self.dsid, self.date, self.status]
        row.extend([self.mimetype, self.versionable, self.control_group,
                    self.content_models])
        return row
class ValidationWorker(DatastreamWorker):
    '''Worker thread that checks the checksum of queued datastreams.'''

    def process_queued_item(self, item):
        '''Validate one datastream (version) and report the outcome.

        The resulting status is one of ``invalid``, ``missing``, ``ok``,
        or ``error`` when the check itself raised an exception.

        :param item: tuple of (pid, datastream id, version date or None)
        :returns: :class:`ValidationResult`
        '''
        pid, dsid, date = item
        fedora_obj = self.repo.get_object(pid)
        datastream = fedora_obj.getDatastreamObject(dsid)

        try:
            checksum_ok = datastream.validate_checksum(date=date)
            if not checksum_ok:
                status = 'invalid'
            else:
                # A checksum stored in fedora as DISABLED/none still
                # passes validate_checksum; that is reported as missing
                # rather than ok.
                no_checksum = (datastream.checksum_type == 'DISABLED'
                               or datastream.checksum == 'none')
                status = 'missing' if no_checksum else 'ok'
        except Exception as e:
            status = 'error'
            print("error validating checksum %s %s %r" % (pid, dsid, e))

        return ValidationResult(
            pid, dsid,
            date or datastream.created,
            status,
            datastream.mimetype,
            datastream.versionable,
            datastream.control_group,
            fedora_obj.get_models())
class ValidationReporter(Reporter):
    '''Reporter thread for validation results: tallies each status and
    outputs problems to the console and, if requested, to CSV.'''

    def process_finished_item(self, result):
        '''Record and report a single :class:`ValidationResult`.

        :param result: result object with ``status``, ``valid``,
            ``msg()`` and ``csv_data()``
        '''
        # tally by status
        self.stats[result.status] += 1

        # anything other than a valid checksum is shown unless in quiet mode
        if not result.valid and not self.quiet:
            print(result.msg())

        # output to csv if requested and result is invalid or all output
        # requested.  The parentheses matter: without them --csv-all with
        # no CSV file would call writerow on a missing writer.
        if self.csv and (not result.valid or self.options.csv_all):
            self.csv.writerow(result.csv_data())
class RepairResult(object):
    '''Outcome of a repair attempt on a single datastream, passed from
    the repair worker threads to the reporter thread.'''

    def __init__(self, pid, dsid, saved, error):
        # which datastream was looked at
        self.pid, self.dsid = pid, dsid
        # whether a save happened, and any error message from the attempt
        self.saved, self.error = saved, error
class RepairWorker(DatastreamWorker):
    '''Worker thread that repairs missing checksums on queued datastreams.'''

    def process_queued_item(self, item):
        '''Check for and repair a missing checksum on a single
        datastream.  If checksum type is ``DISABLED``, checksum value is
        ``none``, or the datastream id was listed with ``--force``,
        update the checksum type and save the datastream, prompting
        Fedora to calculate a new checksum of the requested type.

        :param item: tuple of (object pid, datastream id to be checked)
        :returns: :class:`RepairResult`, even when nothing was changed
        '''
        pid, dsid = item
        obj = self.repo.get_object(pid)
        dsobj = obj.getDatastreamObject(dsid)
        saved = False
        err = None

        # --force is optional, so the parsed value may be absent or None
        forced_ids = getattr(self.options, 'force_ds_ids', None) or ()

        if dsobj.checksum_type == 'DISABLED' or dsobj.checksum == 'none' \
           or dsid in forced_ids:
            dsobj.checksum_type = self.options.checksum_type
            try:
                saved = dsobj.save('updating missing checksum')
            except Exception as e:
                err = 'Error saving %s/%s : %r' % (pid, dsid, e)

        # return result even if nothing was done, so reporter thread
        # can update status
        return RepairResult(pid, dsid, saved, err)
class RepairReporter(Reporter):
    '''Reporter thread for repair results: counts updated datastreams
    and prints any save errors.'''

    def process_finished_item(self, result):
        '''Update the shared totals for a single :class:`RepairResult`.'''
        was_saved, problem = result.saved, result.error
        if was_saved:
            self.stats['ds_updated'] += 1
        if problem:
            # errors are always shown
            print(problem)
            self.stats['ds_err'] += 1
class PasswordAction(argparse.Action):
    '''Use :meth:`getpass.getpass` to prompt for a password for a
    command-line argument.

    The option value is optional: when the flag is given without a
    value, the user is prompted for the password instead.
    '''

    def __init__(self, option_strings, dest, nargs=None, **kwargs):
        # Default to an optional value; with argparse's default nargs the
        # flag would require a value and the prompt could never be reached.
        if nargs is None:
            nargs = '?'
        super(PasswordAction, self).__init__(option_strings, dest,
                                             nargs=nargs, **kwargs)

    def __call__(self, parser, namespace, value, option_string=None):
        # if a value was specified on the command-line, use that
        if value:
            setattr(namespace, self.dest, value)
        # otherwise, use getpass to prompt for a password
        else:
            setattr(namespace, self.dest, getpass())
class CSVAction(argparse.Action):
    '''Argparse action that stores a comma-separated option value as a
    python list of its pieces.'''

    def __call__(self, parser, namespace, value, option_string=None):
        pieces = value.split(',')
        setattr(namespace, self.dest, pieces)
if __name__ == '__main__':
    # only run when executed as a script, not when imported
    script = FedoraChecksums()
    script.run()