emory-libraries/eulfedora

View on GitHub
scripts/fedora-checksums

Summary

Maintainability
Test Coverage
#!/usr/bin/env python

# file scripts/fedora-checksums
#
#   Copyright 2012 Emory University Libraries
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.


# NOTE: more detailed documentation & usage examples are included in
# scripts/__init__.py for inclusion in sphinx docs.

import argparse
from collections import defaultdict
import csv
from eulfedora.server import Repository
from eulfedora.rdfns import model as modelns
from getpass import getpass
import os
import signal
import six
from six.moves.queue import Queue, Empty as EmptyQueue
import sys
import threading
from time import sleep


try:
    from progressbar import ProgressBar, Bar, Percentage, ETA, SimpleProgress
except ImportError:
    ProgressBar = None


class FedoraChecksums(object):
    '''Command-line script logic for validating or repairing datastream
    checksums for content in a Fedora repository.

    :meth:`run` parses the command-line arguments, determines which
    objects to process, queues one job per datastream (or datastream
    version) on a ``to-do`` queue that is consumed by worker threads,
    waits for the ``to-do`` and ``done`` queues to drain, and then
    prints a summary.
    '''

    # counters used for the summary report; the same mapping is handed
    # to the worker and reporter threads, which update it as they go
    stats = defaultdict(int)

    # open file handle and csv writer; only set in validate mode when
    # a csv output file was requested
    csv_file = None
    csv = None

    # interrupt flag to exit the main processing loop when a signal is caught
    interrupted = False

    # URI for Fedora object content model
    object_model = 'info:fedora/fedora-system:FedoraObject-3.0'

    args = None
    parser = None

    def config_arg_parser(self):
        '''Build the :mod:`argparse` parser (stored as ``self.parser``)
        with the ``validate`` and ``repair`` subcommands and the
        options shared by both.'''

        # common args for either mode
        common_args = argparse.ArgumentParser(add_help=False)
        # fedora connection options
        repo_args = common_args.add_argument_group('Fedora repository connection options')
        repo_args.add_argument('--fedora-root', dest='fedora_root', required=True,
                               help='URL for accessing fedora, e.g. http://localhost:8080/fedora/')
        repo_args.add_argument('--fedora-user', dest='fedora_user', default=None,
                               help='Fedora username')
        # nargs='?' allows the option to be given with no value, in which
        # case the action prompts for the password (as the help promises)
        repo_args.add_argument('--fedora-password', dest='fedora_password', metavar='PASSWORD',
                               default=None, nargs='?', action=PasswordAction,
                               help='Password for the specified Fedora user (leave blank to be prompted)')
        # general script options
        common_args.add_argument('--quiet', '-q', default=False, action='store_true',
                                 help='Quiet mode: only output summary report')
        common_args.add_argument('--max', '-m', type=int, metavar='N',
                                 help='Stop after processing the first %(metavar)s objects')
        common_args.add_argument('--concurrency', type=int, metavar='N', default=5,
                                 help='Number of concurrent validation/repair threads to run (default: %(default)d)')
        common_args.add_argument('--pid-list', metavar='PIDLIST_FILE',
                                 help='File with a list of pids to be checked, one on each line')
        common_args.add_argument('pids', metavar='PID', nargs='*',
                                 help='list specific pids to be checked (optional)')

        # actual main argument parser
        self.parser = argparse.ArgumentParser(description='''Validate or repair datastream
            checksums for Fedora repository content.  By default, iterates through all content
            objects that are findable via RIsearch and checks or repairs all datastreams.
            ''')

        # add subcommands for script modes
        subparsers = self.parser.add_subparsers(dest='mode', title='subcommands')
        # validate
        v_parser = subparsers.add_parser('validate',
                                         help='check for invalid and missing checksums',
                                         parents=[common_args])
        v_parser.add_argument('--csv-file', dest='csv_file', default=None,
                              help='Output results to the specified CSV file')
        v_parser.add_argument('--csv-all', dest='csv_all', action='store_true', default=False,
                              help='Include all results (valid/invalid/missing) in CSV output')
        v_parser.add_argument('--all-versions', '-a', dest='all_versions', action='store_true',
                              help='''Check all versions of datastreams
                              (by default, only current versions are checked)''')
        v_parser.add_argument('--missing-only', dest='missing_only', action='store_true',
                              help='''Only check for datastreams with no checksum''')
        # repair
        r_parser = subparsers.add_parser('repair',
                                         help='repair missing checksums',
                                         parents=[common_args])
        r_parser.add_argument('--checksum-type', dest='checksum_type', default='DEFAULT',
                              help='''Checksum type to use; if not specified,
                              will prompt Fedora to use the configured default checksum type''')

        r_parser.add_argument('--force', dest='force_ds_ids', action=CSVAction,
                              help='''A comma separated list of datastream IDs to repair even if checksum is present''')

    def run(self):
        '''Script entry point: parse arguments, process the requested
        objects in the requested mode, and print a summary.'''
        # bind a handler for interrupt signal
        signal.signal(signal.SIGINT, self.interrupt_handler)

        self.config_arg_parser()
        self.args = self.parser.parse_args()

        if not self.args.mode:
            self.parser.print_help()
            return

        # if in validation mode and csv-file is specified, open the
        # file and write the header row
        if self.args.mode == 'validate' and self.args.csv_file:
            self._open_csv_output()

        try:
            self._process_objects()
        finally:
            # if a csv file was opened, close it (even if processing failed)
            if self.csv_file:
                self.csv_file.close()

    def _open_csv_output(self):
        '''Open the requested CSV output file and write the header row.'''
        # TODO: error handling for file open/write failure
        # The csv module wants a binary file on Python 2 but a text file
        # opened with newline='' on Python 3.
        if six.PY2:
            self.csv_file = open(self.args.csv_file, 'wb')
        else:
            self.csv_file = open(self.args.csv_file, 'w', newline='')
        self.csv = csv.writer(self.csv_file, quoting=csv.QUOTE_ALL)
        self.csv.writerow(['pid', 'datastream id', 'date created', 'status',
                           'mimetype', 'versioned', 'control group', 'content models'])

    def _load_object_pids(self, repo):
        '''Return the collection of pids to process: pids given on the
        command line, pids read from the ``--pid-list`` file, or
        otherwise every object RIsearch reports with the base Fedora
        object content model.'''
        if self.args.pids:
            # if pids were specified on the command line, use those
            # get distinct pid list (only process each object once)
            return set(self.args.pids)

        if self.args.pid_list:
            # file with pids, one per line
            # skip blank lines and lines starting with #; strip all
            # surrounding whitespace so '\r\n' line endings or trailing
            # spaces do not end up as part of the pid
            with open(self.args.pid_list) as pidfile:
                stripped = (line.strip() for line in pidfile)
                return set(pid for pid in stripped
                           if pid and not pid.startswith('#'))

        # otherwise, process all find-able objects
        return list(repo.risearch.get_subjects(modelns.hasModel, self.object_model))

    def _start_progress_bar(self, total):
        '''Start and return a progress bar for ``total`` objects, or
        return None when progressbar is not installed, stderr is not a
        terminal, or fewer than 10 objects will be processed.'''
        if total >= 10 and ProgressBar and os.isatty(sys.stderr.fileno()):
            widgets = [Percentage(), ' (', SimpleProgress(), ')',
                       Bar(), ETA()]
            return ProgressBar(widgets=widgets, maxval=total).start()
        return None

    def _process_objects(self):
        '''Connect to the repository, start the worker and reporter
        threads, queue jobs for every datastream of every requested
        object, wait for the queues to drain, and print the summary.'''
        # TODO: needs fedora error handling (e.g., bad password, hostname, etc)
        repo = Repository(self.args.fedora_root,
                          self.args.fedora_user, self.args.fedora_password)

        object_pids = self._load_object_pids(repo)

        # initialize progress bar; never expect more objects than exist,
        # even when --max is larger than the number of pids
        total = len(object_pids)
        if self.args.max:
            total = min(total, self.args.max)
        pid_pbar = self._start_progress_bar(total)

        self.todo_queue = Queue()
        self.done_queue = Queue()

        # count of # datastreams queued for processing, keyed on pid
        # (used to determine when an object is finished)
        self.queued = {}

        # determine worker and reporter to use for requested items
        if self.args.mode == 'validate':
            worker = ValidationWorker
            reporter = ValidationReporter
        elif self.args.mode == 'repair':
            worker = RepairWorker
            reporter = RepairReporter

        # start the requested number of worker threads
        # NOTE: might not want multiple threads for just a few pids...
        for _ in range(self.args.concurrency):
            worker(self.todo_queue, self.done_queue,
                   self.args, self.queued, self.stats).start()

        # start a single reporter thread to pick up completed items
        reporter(self.done_queue, self.args, self.stats,
                 self.queued, pid_pbar, self.csv).start()

        # keep track of number of jobs queued per item
        for pid in object_pids:
            obj = repo.get_object(pid=pid)
            if not obj.exists:
                print("Error: %s does not exist or is inaccessible" % pid)
                continue

            self.queued[obj.pid] = 0

            for dsid in six.iterkeys(obj.ds_list):
                dsobj = obj.getDatastreamObject(dsid)
                self.stats['ds'] += 1

                if self.args.mode == 'validate':
                    self.queue_validation(dsobj)

                elif self.args.mode == 'repair':
                    self.queue_repair(dsobj)

            # if interrupted or at a specified max, quit
            at_max = self.args.max and len(self.queued) >= self.args.max
            if self.interrupted or at_max:
                # the progress bar is optional, so only touch it if present
                if self.interrupted and pid_pbar:
                    # update progressbar to show queued items
                    pid_pbar.maxval = max(len(self.queued), self.stats['objects'])
                    pid_pbar.update(self.stats['objects'])
                break

        self._wait_for_queues()

        if pid_pbar and not self.interrupted:
            pid_pbar.finish()

        # summarize what was done
        if self.args.mode == 'validate':
            self.validation_summary()
        elif self.args.mode == 'repair':
            self.repair_summary()

    def _wait_for_queues(self):
        '''Block until every queued job has been processed and reported.'''
        # Queue.join blocks; poll with a short sleep first so the main
        # thread wakes up regularly while work is still outstanding
        while not self.todo_queue.empty():
            sleep(1)
        self.todo_queue.join()

        while not self.done_queue.empty():
            sleep(1)
        self.done_queue.join()

    def queue_validation(self, dsobj):
        '''Add information to the ``to-do`` queue for datastreams
        versions to be validated.

        :param dsobj: datastream object; its ``obj.pid``, ``id`` and
            (with ``--all-versions``) ``history().versions`` are used
        '''
        pid = dsobj.obj.pid
        if pid not in self.queued:
            self.queued[pid] = 0

        if self.args.all_versions:
            # check every version of this datastream
            try:
                for ds in dsobj.history().versions:
                    # count the job before queueing it, so a worker cannot
                    # pick it up and decrement the count first
                    self.queued[pid] += 1
                    self.stats['ds_versions'] += 1
                    self.todo_queue.put((pid, dsobj.id, ds.created))
            except Exception as e:
                print("error getting history for %s %r" % (pid, e))

        else:
            # current version only
            self.queued[pid] += 1
            self.todo_queue.put((pid, dsobj.id, None))

    def validation_summary(self):
        '''Summarize what was done when the script was run in
        **validation** mode.'''
        totals = '\nTested %(objects)d object(s), %(ds)d datastream(s)' % self.stats
        if self.args.all_versions:
            totals += ', %(ds_versions)d datastream version(s)' % self.stats
        print(totals)
        if not self.args.missing_only:
            print('%(invalid)d invalid checksum(s)' % self.stats)
        print('%(missing)d datastream(s) with no checksum' % self.stats)

    def queue_repair(self, dsobj):
        '''Add information to the ``to-do`` queue for datastream
        to be checked for repair.

        :param dsobj: datastream object; its ``obj.pid`` and ``id``
            are used
        '''
        pid = dsobj.obj.pid
        if pid not in self.queued:
            self.queued[pid] = 0
        # count the job before queueing it (see queue_validation)
        self.queued[pid] += 1
        self.todo_queue.put((pid, dsobj.id))

    def repair_datastream(self, dsobj):
        '''Check for and repair a missing checksum on a single
        datastream.  If checksum type is ``DISABLED`` or checksum
        value is ``none``, or the datastream id was listed with
        ``--force``, update the checksum type and save the
        datastream, prompting Fedora to calculate a new checksum of
        the requested type.

        :param dsobj: :class:`~eulfedora.models.DatastreamObject`
        '''
        # --force is optional; argparse leaves it as None when not given
        forced_ids = self.args.force_ds_ids or []

        if dsobj.checksum_type == 'DISABLED' or dsobj.checksum == 'none' \
           or dsobj.id in forced_ids:
            dsobj.checksum_type = self.args.checksum_type
            try:
                saved = dsobj.save('updating checksum')
                if saved:
                    self.stats['ds_updated'] += 1
            except Exception as e:
                print('Error saving %s/%s : %s' % \
                      (dsobj.obj.pid, dsobj.id, e))
                self.stats['ds_err'] += 1

    def repair_summary(self):
        '''Summarize what was done when the script was run in
        **repair** mode.'''
        print('\nChecked %(objects)d object(s), updated %(ds_updated)d datastream(s)' % \
              self.stats)
        if self.stats['ds_err']:
            print('Error saving %(ds_err)d datastream(s)' % self.stats)

    def interrupt_handler(self, signum, frame):
        '''Gracefully handle a SIGINT, if possible. Sets a flag so main script
        loop can exit cleanly, and restores the default SIGINT behavior,
        so that a second interrupt will stop the script.
        '''
        if signum == signal.SIGINT:
            # restore default signal handler so a second SIGINT can be used to quit
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            # set interrupt flag so main loop knows to quit at a reasonable time
            self.interrupted = True
            # report if script is in the middle of an object
            print('Script will exit after processing the current object.' + \
                 '\n(Ctrl-C / Interrupt again to quit immediately)')


class DatastreamWorker(threading.Thread):
    '''Thread class with common logic for processing items in the
    ``to-do`` queue and adding results to the ``done`` queue.

    Subclasses implement :meth:`process_queued_item`.

    :param todo_queue: queue of item tuples to process; the first
        element of each tuple is the object pid
    :param done_queue: queue where results are placed for the reporter
    :param options: parsed command-line options; ``fedora_root``,
        ``fedora_user`` and ``fedora_password`` are used to connect
    :param queued: dict of outstanding job counts, keyed on pid
    :param stats: dict of counters; ``objects`` is incremented when
        the last outstanding job for a pid completes
    '''
    daemon = True

    def __init__(self, todo_queue, done_queue, options, queued, stats):
        threading.Thread.__init__(self)
        self.todo = todo_queue
        self.done = done_queue
        # each worker gets its own repository connection
        self.repo = Repository(options.fedora_root,
                          options.fedora_user, options.fedora_password)
        self.queued = queued
        self.stats = stats
        self.options = options

    def run(self):
        '''Process queued items forever (the thread is a daemon, so it
        ends when the main thread exits).'''
        while True:
            # blocking get: waits until an item is available
            item = self.todo.get()
            try:
                # queued item is a tuple; first element should always be pid
                pid = item[0]
                try:
                    self.done.put(self.process_queued_item(item))
                except Exception as err:
                    # a failure on one item must not kill the worker thread
                    print('Error processing %r: %r' % (item, err))

                self.queued[pid] -= 1
                if self.queued[pid] == 0:
                    self.stats['objects'] += 1
            finally:
                # always mark the item finished; otherwise anything
                # waiting on the to-do queue's join() would block forever
                self.todo.task_done()

    def process_queued_item(self, *args, **kwargs):
        '''Process a single queued item and return the result to be
        placed on the ``done`` queue.  Does nothing in the base class.'''
        # processing logic goes here
        pass



class Reporter(threading.Thread):
    '''Thread class with common logic for handling items in the
    ``done`` queue and reporting where appropriate.

    Subclasses implement :meth:`process_finished_item`.

    :param done_queue: queue of results to report on
    :param options: parsed command-line options (``quiet`` is read)
    :param stats: dict of counters; ``objects`` drives the progress bar
    :param queued: dict of outstanding job counts, keyed on pid
    :param pbar: optional progress bar to update after each result
    :param csv: optional csv writer for result output
    '''

    daemon = True

    def __init__(self, done_queue, options, stats, queued,
                 pbar=None, csv=None):
        threading.Thread.__init__(self)
        self.done = done_queue
        self.quiet = options.quiet
        self.csv = csv
        self.pbar = pbar
        self.stats = stats
        self.queued = queued
        self.options = options

    def run(self):
        '''Report on finished items forever (the thread is a daemon,
        so it ends when the main thread exits).'''
        while True:
            # blocking get: waits until a result is available
            result = self.done.get()
            try:
                self.process_finished_item(result)

                if self.pbar:
                    # maxval/stats could get out of sync when interrupted
                    self.pbar.maxval = max(self.pbar.maxval, self.stats['objects'])
                    self.pbar.update(self.stats['objects'])
            except Exception as err:
                # a failure on one result must not kill the reporter thread
                print('Error reporting result: %r' % (err,))
            finally:
                # always mark the result handled; otherwise anything
                # waiting on the done queue's join() would block forever
                self.done.task_done()

    def process_finished_item(self, *args, **kwargs):
        '''Handle a single result from the ``done`` queue.  Does
        nothing in the base class.'''
        # result processing logic goes here
        pass



class ValidationResult(object):
    '''Outcome of validating one datastream (or datastream version),
    passed from a validation worker to the reporter.

    ``content_models`` is given as an iterable of strings and is
    stored as a single comma-separated string.
    '''

    # attribute names in the order they are written to CSV output
    _csv_fields = ('pid', 'dsid', 'date', 'status', 'mimetype',
                   'versionable', 'control_group', 'content_models')

    def __init__(self, pid, dsid, date, status, mimetype, versionable,
                 control_group, content_models):
        self.pid, self.dsid = pid, dsid
        self.date, self.status = date, status
        self.mimetype, self.versionable = mimetype, versionable
        self.control_group = control_group
        self.content_models = ', '.join(content_models)

    @property
    def valid(self):
        'True only when the validation status is ``ok``'
        return self.status == 'ok'

    def msg(self):
        'One-line description of the result, for verbose output'
        details = (self.pid, self.dsid, self.status, self.date)
        return "%s/%s - %s checksum (%s)" % details

    def csv_data(self):
        'Result fields as a list, in CSV column order'
        return [getattr(self, field) for field in self._csv_fields]



class ValidationWorker(DatastreamWorker):
    '''Worker thread that validates datastream checksums.'''

    def process_queued_item(self, item):
        '''Validate the checksum for one queued ``(pid, dsid, date)``
        item and return a :class:`ValidationResult` whose status is
        one of ``invalid``, ``missing``, ``ok``, or ``error`` (when
        validation raised an exception).'''
        pid, dsid, date = item

        fedora_obj = self.repo.get_object(pid)
        datastream = fedora_obj.getDatastreamObject(dsid)

        try:
            checksum_ok = datastream.validate_checksum(date=date)
            if not checksum_ok:
                status = 'invalid'
            elif datastream.checksum_type == 'DISABLED' \
                     or datastream.checksum == 'none':
                # validate_checksum returns True when the checksum in
                # fedora is stored as DISABLED/none, but that is not a
                # usable checksum, so report it as missing
                status = 'missing'
            else:
                status = 'ok'
        except Exception as err:
            status = 'error'
            print("error validating checksum %s %s %r" % (pid, dsid, err))

        # when no version date was requested, report the creation date
        # of the datastream that was checked
        checked_date = date or datastream.created
        return ValidationResult(pid, dsid, checked_date, status,
                                datastream.mimetype, datastream.versionable,
                                datastream.control_group,
                                fedora_obj.get_models())



class ValidationReporter(Reporter):
    '''Reporter thread for validation results.'''

    def process_finished_item(self, result):
        '''Count the result by status, print a message for anything
        that is not valid (unless in quiet mode), and add it to the
        CSV output when appropriate.

        :param result: :class:`ValidationResult`
        '''
        self.stats[result.status] += 1

        if not result.valid and not self.quiet:
            print(result.msg())

        # output to csv only if a csv writer is available, and then only
        # when the result is invalid or all output was requested
        # (explicit parentheses: ``and`` binds tighter than ``or``, so
        # without them csv_all alone would attempt a write with no writer)
        if self.csv and (not result.valid or self.options.csv_all):
            self.csv.writerow(result.csv_data())



class RepairResult(object):
    '''Outcome of a repair attempt on one datastream, passed from a
    repair worker to the reporter.

    ``saved`` is the value returned by the datastream save (False when
    nothing was saved); ``error`` is an error message, or None.
    '''

    def __init__(self, pid, dsid, saved, error):
        self.pid, self.dsid = pid, dsid
        self.saved, self.error = saved, error



class RepairWorker(DatastreamWorker):
    '''Worker thread that repairs missing datastream checksums.'''

    def process_queued_item(self, item):
        '''Check for and repair a missing checksum on a single
        datastream.  If checksum type is ``DISABLED`` or checksum
        value is ``none``, or the datastream id is in the list given
        with ``--force``, update the checksum type and save the
        datastream, prompting Fedora to calculate a new checksum of
        the requested type.

        :param item: tuple of object pid and datastream id to be checked
        :returns: :class:`RepairResult`, even when nothing was done
        '''

        pid, dsid = item
        obj = self.repo.get_object(pid)
        dsobj = obj.getDatastreamObject(dsid)
        saved = False
        err = None

        # --force is optional; the option is None when it was not given
        forced_ids = getattr(self.options, 'force_ds_ids', None) or []

        if dsobj.checksum_type == 'DISABLED' or dsobj.checksum == 'none' \
           or dsid in forced_ids:
            dsobj.checksum_type = self.options.checksum_type

            try:
                saved = dsobj.save('updating missing checksum')
            except Exception as e:
                err = 'Error saving %s/%s : %r' % (pid, dsid, e)

        # return result even if nothing was done, so reporter thread
        # can update status
        return RepairResult(pid, dsid, saved, err)



class RepairReporter(Reporter):
    '''Reporter thread for repair results.'''

    def process_finished_item(self, result):
        '''Update the ``ds_updated`` / ``ds_err`` counters for one
        :class:`RepairResult` and print its error message, if any.'''
        if result.saved:
            self.stats['ds_updated'] += 1

        # nothing more to do unless the save failed
        if not result.error:
            return
        print(result.error)
        self.stats['ds_err'] += 1



class PasswordAction(argparse.Action):
    '''Argparse action for a password option: store the value given on
    the command line, or prompt with :meth:`getpass.getpass` when the
    value is empty.'''
    def __call__(self, parser, namespace, value, option_string=None):
        # only prompt when no (non-empty) value was supplied
        password = value if value else getpass()
        setattr(namespace, self.dest, password)


class CSVAction(argparse.Action):
    '''Argparse action that splits a comma-separated option value into
    a python list of strings.'''
    def __call__(self, parser, namespace, value, option_string=None):
        items = value.split(',')
        setattr(namespace, self.dest, items)


# only run the checksum script when executed directly, not on import
if __name__ == '__main__':
    checksum_script = FedoraChecksums()
    checksum_script.run()