scripts/validate-checksums
#!/usr/bin/env python
# file scripts/validate-checksums
#
# Copyright 2014 Emory University Libraries
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# NOTE: more detailed documentation & usage examples are included in
# scripts/__init__.py for inclusion in sphinx docs.
import argparse
import base64
from collections import defaultdict
import datetime
from email.mime.text import MIMEText
from getpass import getpass
from io import StringIO
import rdflib
from rdflib.namespace import Namespace
import signal
import smtplib
import uuid
import six
from six.moves import configparser
from eulxml import xmlmap
from eulxml.xmlmap import premis
from eulfedora import __version__
from eulfedora.server import Repository
from eulfedora.models import DigitalObject, Relation, DigitalObjectSaveFailure, \
XmlDatastream
from eulfedora.util import RequestFailed, force_text
from eulfedora import cryptutil
from eulfedora.syncutil import humanize_file_size
REPOMGMT = Namespace(rdflib.URIRef('http://pid.emory.edu/ns/2011/repo-management/#'))
PREMIS_V1_NAMESPACE = 'http://www.loc.gov/standards/premis/v1'
class BasePremisV1(premis.BasePremis):
ROOT_NS = PREMIS_V1_NAMESPACE
ROOT_NAMESPACES = {
'p': PREMIS_V1_NAMESPACE
}
class PremisObjectV1(BasePremisV1, premis.Object):
pass
class PremisEvent(premis.Event):
# extend base premis class to add outcome detail field
outcome_detail = xmlmap.StringField('p:eventOutcomeInformation/p:eventOutcomeDetail/p:eventOutcomeDetailNote', required=False)
'''outcome of the event (`eventOutcomeInformation/eventOutcomeDetail/eventOutcomeDetailNote`).'''
class PremisEventV1(BasePremisV1, PremisEvent):
pass
class PremisV1(BasePremisV1, premis.Premis):
XSD_SCHEMA = 'http://www.loc.gov/standards/premis/v1/PREMIS-v1-1.xsd'
object = xmlmap.NodeField('p:object', PremisObjectV1)
'a single PREMIS :class:`object`'
events = xmlmap.NodeListField('p:event', PremisEventV1)
def __init__(self, *args, **kwargs):
# override default version 2.0 in base premis class
if 'version' not in kwargs:
kwargs['version'] = None
super(PremisV1, self).__init__(*args, **kwargs)
class FixityObject(DigitalObject):
'''Generic digital object with access to last fixity check in rels-ext'''
last_fixity_check = Relation(REPOMGMT.lastFixityCheck,
ns_prefix={'eul-repomgmt': REPOMGMT}, rdf_type=rdflib.XSD.dateTime,
related_name='+')
old_premis = XmlDatastream('PREMIS', 'PREMIS metadata', PremisV1)
'''Optional ``premis`` datastream (expects PREMIS) for older
objects that do not follow Hydra naming conventions'''
provenance = XmlDatastream('provenanceMetadata',
'Provenance metadata', premis.Premis)
'''Optional ``provenanceMetadata`` datastream (expects PREMIS) for
objects that follow Hydra naming conventions'''
@property
def premis_ds(self):
# clear any cached version not loaded as premis xml
for premis_id in ['provenanceMetadata', 'PREMIS']:
if premis_id in self.dscache:
del self.dscache[premis_id]
if self.provenance.exists:
return self.provenance
elif self.old_premis.exists:
return self.old_premis
# otherwise none
class ValidateChecksums(object):
#: dictionary to keep counts of objects, datastreams checked, errors found, etc
stats = defaultdict(int)
#: dictionaries to keep track of objects that will need to be reported via email
#: likely should include pid (key), list of tuples (dsid, date)
invalid = {}
missing = {}
#: list of pids with save errors
save_errors = []
#: list of pids with rels-ext errors
relsext_errors = []
#: list of pids with premis errors
premis_errors = []
#: list of error messages where any fedora error occurred
fedora_errors = []
#: interrupt flag to exit the main processing loop when a signal is caught
interrupted = False
#: URI for Fedora object content model
object_model = 'info:fedora/fedora-system:FedoraObject-3.0'
#: default number of days between fixity checks before another
default_days_betweeen_checks = 30
def config_arg_parser(self):
# configure argument parser
# common args for either mode
self.parser = argparse.ArgumentParser()
# general script options
self.parser.add_argument('--quiet', '-q', default=False, action='store_true',
help='Quiet mode: only output summary report')
# config file options
cfg_args = self.parser.add_argument_group('Config file options')
cfg_args.add_argument('--generate-config', '-g', default=False, dest='gen_config',
help='''Create a sample config file at the specified location, including any options passed.
Specify the --fedora-password option to generate an encrypted password in the config file.''')
cfg_args.add_argument('--config', '-c', help='Load the specified config file')
cfg_args.add_argument('--key', '-k',
help='''Optional encryption key for encrypting and decrypting the password in the
config file (you must use the same key for generating and loading)''')
# fedora connection options
repo_args = self.parser.add_argument_group('Fedora repository connection options')
repo_args.add_argument('--fedora-root', dest='fedora_root',
help='URL for accessing fedora, e.g. http://localhost:8080/fedora/')
repo_args.add_argument('--fedora-user', dest='fedora_user', default=None,
help='Fedora username')
repo_args.add_argument('--fedora-password', dest='fedora_password', metavar='PASSWORD',
default=None, action=PasswordAction,
help='Password for the specified Fedora user (leave blank to be prompted)')
# processing opts
proc_args = self.parser.add_argument_group('Processing options')
proc_args.add_argument('--max', '-m', type=int, metavar='N',
help='Stop after processing the first %(metavar)s objects')
proc_args.add_argument('--all-versions', '-a', dest='all_versions', action='store_true',
help='''Check all versions of datastreams
(by default, only current versions are checked)''')
proc_args.add_argument('--since', '-s', dest='since', type=int,
default=self.default_days_betweeen_checks,
help='''Check objects with a last fixity check older
than the specified number of days (default: %(default)s)''')
proc_args.add_argument('--time-limit', '-t', dest='timelimit', type=int,
help='''Only run for the specified duration in minutes''')
proc_args.add_argument('--max-size', dest='max_size', type=int,
help='''Skip datastreams larger than the specified size in bytes''')
# email opts
email_args = self.parser.add_argument_group('Email options')
email_args.add_argument('--email', '-e',
help='''One or more email addresses (comma separated) where a report
should be sent if any errors are encountered''')
email_args.add_argument('--smtp', help='''SMTP server to use for sending email (required for email)''')
email_args.add_argument('--from', dest='from_email',
help='''Email address reports should come from (required for email)''')
# optional list of pids
self.parser.add_argument('pids', metavar='PID', nargs='*',
help='list specific pids to be checked (optional)')
def run(self):
# bind a handler for interrupt signal
signal.signal(signal.SIGINT, self.interrupt_handler)
self.config_arg_parser()
self.args = self.parser.parse_args()
# if requested, generate config file and exit
if self.args.key:
# patch in cryptutil encryption key with specified one
cryptutil.ENCRYPTION_KEY = self.args.key
# if requested, load config file and set arguments
if self.args.config:
self.load_configfile()
# if requested, generate a config file with any options specified so far,
# and then quit
if self.args.gen_config:
self.generate_configfile()
return
# if email is specified without smtp, warn (unless quiet mode)
if self.args.email:
if not self.args.smtp or not self.args.from_email \
and not self.args.quiet:
print('Email address specified without an SMTP server; no email will be sent')
if not self.args.fedora_root:
print('Error: Fedora URL (--fedora-root) is required\n')
self.parser.print_help()
return
# TODO: needs fedora error handling (e.g., bad password, hostname, etc)
self.repo = Repository(self.args.fedora_root,
self.args.fedora_user, self.args.fedora_password)
if self.args.pids:
# if pids were specified on the command line, use those
# get distinct pid list (only process each object once)
object_pids = set(pid for pid in self.args.pids)
else:
# otherwise, process unchecked or last fixity check older
# than the specified time period
object_pids = self.pids_to_check(days=self.args.since)
# if a time limit is requested, calculate when to stop
if self.args.timelimit:
end_time = datetime.datetime.now() + datetime.timedelta(minutes=self.args.timelimit)
if not self.args.quiet:
print('Time limit of %d minutes requested; processing will end at %s' \
% (self.args.timelimit, end_time))
for pid in object_pids:
if not self.args.quiet:
print(pid)
obj = self.repo.get_object(pid=pid, type=FixityObject)
if not obj.exists:
print("Error: %s does not exist or is inaccessible" % pid)
continue
self.stats['objects'] += 1
ds_results = {}
for dsid in six.iterkeys(obj.ds_list):
dsobj = obj.getDatastreamObject(dsid)
# if a maximum size is set, check datastream size
if self.args.max_size is not None and dsobj.size > self.args.max_size:
print('Datastream %s/%s size (%s) is above configured max, skipping' % \
(pid, dsid, humanize_file_size(dsobj.size)))
continue
self.stats['ds'] += 1
res = self.validate_datastream(dsobj)
ds_results[dsid] = res
# whether success or failure, update object as checked
now = datetime.datetime.now()
# needs to be in a format fedora accepts as xsd:dateTime,
# isoformat doesn't seem to work (maybe because without timezone?)
try:
obj.last_fixity_check = now.strftime('%Y-%m-%dT%H:%M:%S')
except:
# in a few cases, objects have malformed RELS-EXT
# (e.g., about=pid instead of rdf:about=pid)
# - catch and report on those
self.stats['relsext_errors'] += 1
self.relsext_errors.append(pid)
continue
self.record_fixity_event(obj, ds_results)
try:
obj.save('datastream fixity check')
except DigitalObjectSaveFailure as err:
print('Error saving %s : %s' % (pid, err))
self.stats['save_errors'] += 1
self.save_errors.append(pid)
# check if any of our end conditions are met
# - interrupted by SIGINT
if self.interrupted:
break
if self.args.max is not None and self.stats['objects'] >= int(self.args.max):
if not self.args.quiet:
print('Processed %d objects (requested maximum of %d); stopping' \
% (self.stats['objects'], self.args.max))
break
if self.args.timelimit and datetime.datetime.now() >= end_time:
if not self.args.quiet:
print('Processing has exceeded requested time limit of %d minutes; stopping' \
% self.args.timelimit)
break
# summary report
totals = '\nChecked %(objects)d object(s), %(ds)d datastream(s)' % self.stats
if self.args.all_versions:
totals += ', %(ds_versions)d datastream version(s)' % self.stats
print(totals)
print('%(invalid)d invalid checksum(s)' % self.stats)
print('''%(save_errors)d save error(s), %(relsext_errors)d object(s) with RELS-EXT errors,
%(premis_errors)d object(s) with PREMIS errors''' % self.stats)
# send an email report if appropriate
self.email_report()
#: SPARQL query to find objects without a fixity check recorded
#: Sort by oldest modification time (since the content that has not been
#: modified the longest is most likely higher risk)
#: NOTE: this is for Sparql 1.0; for 1.1 or higher, use FILTER NOT EXISTS
#: 2nd NOTE: apparently modified must be returned to use in ordering
SPARQL_FIND_UNCHECKED = '''
PREFIX eul-repomgmt: <%s>
SELECT ?pid ?modified
WHERE {
?pid <fedora-model:hasModel> <%s> .
?pid <fedora-view:lastModifiedDate> ?modified
OPTIONAL {
?pid <eul-repomgmt:lastFixityCheck> ?checked
}
FILTER (!BOUND(?checked))
} ORDER BY ?modified ''' % (REPOMGMT, object_model)
SPARQL_FIND_UNCHECKED_SINCE = '''
PREFIX eul-repomgmt: <%s>
SELECT ?pid ?checked
WHERE {
?pid <eul-repomgmt:lastFixityCheck> ?checked
FILTER (?checked < xsd:dateTime('%%s'))
} ORDER BY ?checked ''' % (REPOMGMT, )
# number of results to grab at a time from risearch (10, 100, 1000)
RISEARCH_CHUNKSIZE = 100
def pids_to_check(self, days):
'''Generator that returns a list of pids where the object has never had
a fixity check recorded or where the object has not
been checked since the specified number of days.'''
unchecked_count = self.repo.risearch.sparql_count(self.SPARQL_FIND_UNCHECKED)
if not self.args.quiet:
print('\nFound %d unchecked pids.' % unchecked_count)
if unchecked_count:
# Iterate over unchecked items in chunks so we don't load too many
# pids at once. NOTE: this relies on pids getting successfully
# updated and removed from the unchecked list.
results = self.repo.risearch.sparql_query(self.SPARQL_FIND_UNCHECKED,
limit=self.RISEARCH_CHUNKSIZE)
while results:
# dictreader doesn't provide a built in count, so keep
# track of when we hit a result with rows and bail out
has_results = False
for row in results:
# if a pid failed to save, it could show up again
# skip it and don't count it as having results
if row['pid'] in self.save_errors:
continue
yield row['pid']
has_results = True
if not has_results:
break
results = self.repo.risearch.sparql_query(self.SPARQL_FIND_UNCHECKED,
limit=self.RISEARCH_CHUNKSIZE)
delta = datetime.timedelta(days=days)
datesince = datetime.datetime.now() - delta
query = self.SPARQL_FIND_UNCHECKED_SINCE % datesince.strftime('%Y-%m-%dT%H:%M:%S')
unchecked_since_count = self.repo.risearch.sparql_count(query)
if not self.args.quiet:
print('\nFound %d pids not checked in the last %d days.' % \
(unchecked_since_count, self.args.since))
if unchecked_since_count:
# Iterate over recently unchecked items in chunks so we don't
# load too many pids at once.
# NOTE: this relies on pids getting successfully updated
# so they will no longer match this query.
results = self.repo.risearch.sparql_query(query,
limit=self.RISEARCH_CHUNKSIZE)
while results:
has_results = False
for row in results:
# if a pid failed to save, it could show up again
# skip it and don't count it as having results
if row['pid'] in self.save_errors:
continue
yield row['pid']
has_results = True
# if this query had no results, bail out - we've hit the end
if not has_results:
break
results = self.repo.risearch.sparql_query(query,
limit=self.RISEARCH_CHUNKSIZE)
def validate_datastream(self, dsobj):
'''returns a status (valid/invalid/missing) or list of statuses for
all-versions mode'''
if self.args.all_versions:
result = []
# check every version of this datastream
try:
history = dsobj.history()
except Exception as err:
msg = 'Error: failed to get datastream history for %s/%s : %s' % \
(dsobj.obj.pid, dsobj.id, err)
self.fedora_errors.append(msg)
print(msg)
# bail out
return
for ds in history.versions:
try:
res = self.check_datastream(dsobj, ds.created)
self.stats['ds_versions'] += 1
result.append(res)
except Exception as err:
msg = 'Error checking datastream %s/%s %s : %s' % \
(dsobj.obj.pid, dsobj.id, ds.created, err)
self.fedora_errors.append(msg)
print(msg)
return result
else:
# current version only
try:
result = self.check_datastream(dsobj)
return result
except Exception as err:
msg = 'Error checking datastream %s/%s : %s' % \
(dsobj.obj.pid, dsobj.id, err)
self.fedora_errors.append(msg)
print(msg)
def check_datastream(self, dsobj, date=None):
'''Check the validity of a particular datastream. Checks for
invalid datastreams using
:meth:`~eulfedora.models.DatastreamObject.validate_checksum`
:param dsobj: :class:`~eulfedora.models.DatastreamObject` to
be checked
:param date: optional date/time for a particular
version of the datastream to be checked; when not specified,
the current version will be checked
'''
valid = dsobj.validate_checksum(date=date)
missing = False
if not valid:
self.stats['invalid'] += 1
print("Error: %s/%s - invalid checksum (%s)" % \
(dsobj.obj.pid, dsobj.id, date or dsobj.created))
else:
# if checksum is not invalid it could still be missing
# NOTE: missing checksum test can currently only be done on latest version
if date is None and dsobj.checksum_type == 'DISABLED' or \
dsobj.checksum == 'none':
self.stats['missing'] += 1
print("Error: %s/%s - missing checksum (%s)" % \
(dsobj.obj.pid, dsobj.id, date or dsobj.created))
# if invalid or missing, add to the appropriate dictionary for
# tracking and reporting
if not valid or missing:
# determine which dictionary it should be added to
if not valid:
tracker = self.invalid
else:
tracker = self.missing
# key is pid, value is a list of tuples for the datastream id and version date
pid = dsobj.obj.pid
if pid not in tracker:
tracker[pid] = []
tracker[pid].append((dsobj.id, date or dsobj.created))
if valid and not missing:
return 'valid'
elif missing:
return 'missing'
else:
return 'invalid'
def record_fixity_event(self, obj, ds_results):
premis_ds = obj.premis_ds
# if object does not have a premis datastream, there is nothing to do
if premis_ds is None:
return
# brief outcome of the fixity check - pass if everything is valid,
# fail if anything is not valid
if self.args.all_versions:
# in all versions mode, dict value is a list of results
all_results = []
for val in ds_results.values():
all_results += val
result = 'pass' if all(v == 'valid' for v in all_results) else 'fail'
else:
result = 'pass' if all(v == 'valid' for v in ds_results.values()) else 'fail'
if result == 'pass':
if self.args.all_versions:
detailed_info = 'Datastreams checked: %s' % \
', '.join('%s (%d)' % (dsid, len(vals)) for dsid, vals in ds_results.iteritems())
else:
detailed_info = 'Datastreams checked: %s' % ', '.join(ds_results.keys())
else:
if self.args.all_versions:
detailed_info = 'Datastream results: '
details = []
for dsid, results in ds_results.iteritems():
ds_counts = defaultdict(int)
for r in results:
ds_counts[r] += 1
ds_info = []
for result_type in ['valid', 'missing', 'invalid']:
if ds_counts[result_type]:
# i.e. '%(missing)d missing'
template = '%%(%s)d %s' % (result_type, result_type)
ds_info.append(template % ds_counts)
details.append('%s: %s' % (dsid, ', '.join(ds_info)))
detailed_info = 'Datastream results: %s' % ', '.join(details)
else:
detailed_info = 'Datastream results: %s' % \
', '.join(['%s: %s' % (k, v) for k, v in ds_results.iteritems()])
# use appropriate version of premis to keep content valid
try:
if isinstance(premis_ds.content, PremisV1):
event = PremisEventV1()
else:
event = PremisEvent()
except RequestFailed:
print("Error loading PREMIS datastream (%s/%s) to record fixity check event" % \
(obj.pid, premis_ds.id))
self.stats['premis_errors'] += 1
self.premis_errors.append(obj.pid)
return
event.id_type = 'UUID'
event.id = uuid.uuid1()
event.type = 'fixity check'
event.date = datetime.datetime.now().isoformat()
# event detail should be about the program generating the event
# follow convention po parse out program name/version in case of latter processing
event.detail = 'program="eulfedora validate-checksums"; version="%s"' % __version__
# outcome as basic success or failure pass/fail
event.outcome = result
# detailed outcome results
event.outcome_detail = detailed_info
event.agent_type = 'fedora user'
event.agent_id = self.args.fedora_user
premis_ds.content.events.append(event)
# NOTE: could do schema validation here, but not sure it makes
# sense for an automated process like this one should be...
# valid = premis_ds.content.schema_valid()
# print 'schema valid? ', valid
# if not valid:
# print premis_ds.content.validation_errors()
def email_report(self):
# if email or smtp not specified, can't send
if not self.args.email or not self.args.smtp or not self.args.from_email:
return
# if there are no errors or problems, don't send an email
if not any([self.invalid, self.missing, self.save_errors,
self.relsext_errors, self.premis_errors]):
return
# construct the body of the email
output = StringIO()
for tracker, label in [(self.invalid, 'Invalid'),
(self.missing, 'Missing')]:
# if any items were found in this category (invalid or missing)
if tracker:
# heading label
output.write('\n%s checksums detected:' % label)
# list of pids and information about the datastreams with errors
for pid, vals in tracker.iteritems():
output.write(' %s' % pid)
for dsid, d in vals:
output.write('\t%s (%s)' % (dsid, d))
if self.save_errors:
output.write(u'\nError saving the following objects:')
output.write(u' ' + '\n '.join(self.save_errors))
if self.relsext_errors:
output.write(u'\nError updating the RELS-EXT for the following objects:')
output.write(u' ' + '\n '.join(self.relsext_errors))
if self.premis_errors:
output.write(u'\nError adding PREMIS fixity event for the following objects:')
output.write(u' ' + '\n '.join(self.premis_errors))
if self.fedora_errors:
output.write(u'\nFedora errors when attempting to validate checksums:')
output.write(u' ' + u'\n '.join(self.fedora_errors))
msg = MIMEText(output.getvalue())
output.close()
if ',' in self.args.email:
to_addresses = [e.strip() for e in self.args.email.split(',')]
else:
to_addresses = [self.args.email]
msg['Subject'] = 'Checksum Validation report for %s on %s' % \
(self.args.fedora_root, datetime.date.today())
msg['From'] = self.args.from_email
msg['To'] = ', '.join(to_addresses)
if not self.args.quiet:
print('Sending email report to %s' % ', '.join(to_addresses))
# Send the message via our the specified SMTP server
s = smtplib.SMTP(self.args.smtp)
s.sendmail(self.args.from_email, to_addresses, msg.as_string())
s.quit()
def interrupt_handler(self, signum, frame):
'''Gracefully handle a SIGINT, if possible. Sets a flag so main script
loop can exit cleanly, and restores the default SIGINT behavior,
so that a second interrupt will stop the script.
'''
if signum == signal.SIGINT:
# restore default signal handler so a second SIGINT can be used to quit
signal.signal(signal.SIGINT, signal.SIG_DFL)
# set interrupt flag so main loop knows to quit at a reasonable time
self.interrupted = True
# report if script is in the middle of an object
print('Script will exit after processing the current object.')
print('(Ctrl-C / Interrupt again to quit immediately)')
## config file handling (generate config, load config)
repo_cfg = 'Fedora Settings'
proc_cfg = 'Processing Options'
email_cfg = 'Email options'
# patch in a non-django encryption key
cryptutil.ENCRYPTION_KEY = 'ae764f8rRBN8Y1n4CG56188JXZH1nox6'
def setup_configparser(self):
# define a config file parser with same basic options as command line
# - use command line settings as initial values
config = configparser.ConfigParser()
# fedora connection settings
config.add_section(self.repo_cfg)
config.set(self.repo_cfg, 'fedora_root', self.args.fedora_root)
config.set(self.repo_cfg, 'fedora_user', self.args.fedora_user)
# encrypt passwd before storing it
pwd = self.args.fedora_password
if pwd is not None:
config.set(self.repo_cfg, 'fedora_password', force_text(base64.b64encode(cryptutil.encrypt(pwd))))
# processing options
config.add_section(self.proc_cfg)
config.set(self.proc_cfg, 'max', str(self.args.max))
config.set(self.proc_cfg, 'all_versions', str(self.args.all_versions))
config.set(self.proc_cfg, 'since', str(self.args.since))
config.set(self.proc_cfg, 'timelimit', str(self.args.timelimit))
config.set(self.proc_cfg, 'max_size', str(self.args.max_size))
# email options
config.add_section(self.email_cfg)
config.set(self.email_cfg, 'email', str(self.args.email))
config.set(self.email_cfg, 'from', str(self.args.from_email))
config.set(self.email_cfg, 'smtp', str(self.args.smtp))
return config
def generate_configfile(self):
config = self.setup_configparser()
with open(self.args.gen_config, 'w') as cfgfile:
config.write(cfgfile)
if not self.args.quiet:
print('Config file created at %s' % self.args.gen_config)
def load_configfile(self):
cfg = configparser.ConfigParser()
with open(self.args.config) as cfgfile:
cfg.readfp(cfgfile)
# set args from config, making sure not to override any
# non-defaults sepcified on the command line
# - fedora opts
if cfg.has_section(self.repo_cfg):
if cfg.has_option(self.repo_cfg, 'fedora_root') and \
not self.args.fedora_root:
self.args.fedora_root = cfg.get(self.repo_cfg, 'fedora_root')
if cfg.has_option(self.repo_cfg, 'fedora_user') and \
not self.args.fedora_user:
self.args.fedora_user = cfg.get(self.repo_cfg, 'fedora_user')
if cfg.has_option(self.repo_cfg, 'fedora_password') and \
not self.args.fedora_password:
self.args.fedora_password = force_text(cryptutil.decrypt(base64.b64decode(cfg.get(self.repo_cfg, 'fedora_password'))))
# - processing opts
if cfg.has_section(self.proc_cfg):
if cfg.has_option(self.proc_cfg, 'max') and not self.args.max:
try:
self.args.max = cfg.getint(self.proc_cfg, 'max')
except:
self.args.max = None
if cfg.has_option(self.proc_cfg, 'all_versions') and not \
self.args.all_versions:
self.args.all_versions = cfg.getboolean(self.proc_cfg, 'all_versions')
# since is always set, so command-line opt should only override if
# set to a non-default value, to allow config take precedence
if cfg.has_option(self.proc_cfg, 'since') and \
self.args.since == self.default_days_betweeen_checks:
self.args.since = cfg.getint(self.proc_cfg, 'since')
if cfg.has_option(self.proc_cfg, 'timelimit') and not \
self.args.timelimit:
try:
self.args.timelimit = cfg.getint(self.proc_cfg, 'timelimit')
except:
# if None or non-numeric, getint will fail; ignore invalid number
pass
if cfg.has_option(self.proc_cfg, 'max_size') and not \
self.args.max_size:
try:
self.args.max_size = cfg.getint(self.proc_cfg, 'max_size')
except:
# if None or non-numeric, getint will fail; ignore invalid number
pass
# - email opts
if cfg.has_section(self.email_cfg):
if cfg.has_option(self.email_cfg, 'email') and not self.args.email:
self.args.email = cfg.get(self.email_cfg, 'email')
if self.args.email == 'None':
self.args.email = None
if cfg.has_option(self.email_cfg, 'from') and not self.args.from_email:
self.args.from_email = cfg.get(self.email_cfg, 'from')
if self.args.from_email == 'None':
self.args.from_email = None
if cfg.has_option(self.email_cfg, 'smtp') and not \
self.args.smtp:
self.args.smtp = cfg.get(self.email_cfg, 'smtp')
if self.args.smtp == 'None':
self.args.smtp = None
class PasswordAction(argparse.Action):
'''Use :meth:`getpass.getpass` to prompt for a password for a
command-line argument.'''
def __call__(self, parser, namespace, value, option_string=None):
# if a value was specified on the command-line, use that
if value:
setattr(namespace, self.dest, value)
# otherwise, use getpass to prompt for a password
else:
setattr(namespace, self.dest, getpass())
if __name__ == '__main__':
ValidateChecksums().run()