emory-libraries/eulfedora

View on GitHub
eulfedora/syncutil.py

Summary

Maintainability
D
2 days
Test Coverage
# file eulfedora/syncutil.py
#
#   Copyright 2016 Emory University Libraries & IT Services
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.


import binascii
import hashlib
import io
import logging
import math
import re
import six
from six.moves.urllib.parse import urlparse

try:
    import progressbar
except ImportError:
    progressbar = None

from eulfedora.util import force_bytes, force_text


logger = logging.getLogger(__name__)


def sync_object(src_obj, dest_repo, export_context='migrate',
                overwrite=False, show_progress=False,
                requires_auth=False, omit_checksums=False,
                verify=False):
    '''Copy an object from one repository to another using the Fedora
    export functionality.

    :param src_obj: source :class:`~eulfedora.models.DigitalObject` to
        be copied
    :param dest_repo: destination  :class:`~eulfedora.server.Repository`
        where the object will be copied to
    :param export_context: Fedora export format to use, one of "migrate",
        "archive" or "archive-xml"; migrate is generally faster, but
        requires access from destination repository to source and may
        result in checksum errors for some content; archive exports take
        longer to process (default: migrate)
    :param overwrite: if an object with the same pid is already present
        in the destination repository, it will be removed only if
        overwrite is set to true (default: false)
    :param show_progress: if True, displays a progress bar with content size,
        progress, speed, and ETA (only applicable to archive exports;
        ignored when the progressbar module is not available)
    :param requires_auth: content datastreams require authentication,
        and should have credentials patched in (currently only supported
        in archive-xml export mode)  (default: False)
    :param omit_checksums: scrubs contentDigest -- aka checksums -- from datastreams;
        helpful for datastreams with Redirect (R) or External (E) contexts
        (default: False)
    :param verify: passed through to :class:`ArchiveExport`; only used
        for archive exports (default: False)
    :returns: result of Fedora ingest on the destination repository on
        success (as text), or False if the object already exists in the
        destination repository and overwrite was not requested
    :raises Exception: if export_context is not a supported value
    '''

    # NOTE: currently exceptions are expected to be handled by the
    # calling method; see repo-cp script for an example

    # if overwrite is not requested, check first and bail out
    dest_obj = dest_repo.get_object(src_obj.pid)
    if not overwrite and dest_obj.exists:
        logger.info('%s exists in destination repo and no overwrite; skipping',
                    src_obj.pid)
        return False

    if show_progress and progressbar:
        # calculate rough estimate of object size
        size_estimate = estimate_object_size(src_obj,
            archive=(export_context in ['archive', 'archive-xml']))
        # create a new progress bar with current pid and size
        widgets = [src_obj.pid,
            ' Estimated size: %s // ' % humanize_file_size(size_estimate),
            'Read: ', progressbar.widgets.DataSize(), ' ',
            progressbar.widgets.AdaptiveTransferSpeed(), ' ',
            '| Uploaded: ', progressbar.widgets.DataSize(value='upload'), ' // ',
            # FileTransferSpeed('upload'), currently no way to track upload speed...
             progressbar.widgets.Timer(), ' | ', progressbar.widgets.AdaptiveETA()
            ]

        class DownUpProgressBar(progressbar.ProgressBar):
            # progress bar that also exposes an uploaded byte count
            # to its widgets under the 'upload' data key
            upload = 0
            def data(self):
                data = super(DownUpProgressBar, self).data()
                data['upload'] = self.upload
                return data

        pbar = DownUpProgressBar(widgets=widgets, max_value=size_estimate)
    else:
        pbar = None

    # migrate export can simply be read and uploaded to dest fedora
    if export_context == 'migrate':
        # NOTE(review): the object itself (not its pid) is passed here,
        # while ArchiveExport.get_export passes the pid -- confirm that
        # api.export accepts both
        response = src_obj.api.export(src_obj, context=export_context, stream=True)
        export_data = response.iter_content(4096*1024)

    # archive export needs additional processing to handle large binary content
    elif export_context in ['archive', 'archive-xml']:
        export = ArchiveExport(src_obj, dest_repo,
            progress_bar=pbar, requires_auth=requires_auth,
            xml_only=(export_context == 'archive-xml'),
            verify=verify)
        # NOTE: should be possible to pass BytesIO to be read, but that is failing
        export_data = export.object_data().getvalue()

    else:
        # interpolate the context into the message (it was previously
        # passed as a second exception argument and never formatted)
        raise Exception('Unsupported export context %s' % export_context)

    # wipe checksums from FOXML if flagged in options
    if omit_checksums:
        checksum_re = re.compile(b'<foxml:contentDigest.+?/>')
        if isinstance(export_data, bytes):
            # archive exports: the complete foxml is already in memory
            export_data = checksum_re.sub(b'', export_data)
        else:
            # migrate exports: an iterator of chunks; scrub lazily, chunk
            # by chunk.  The replacement must be bytes to match the pattern.
            # NOTE: a contentDigest tag split across two chunks
            # will not be matched
            export_data = (checksum_re.sub(b'', chunk)
                           for chunk in export_data)

    if overwrite and dest_obj.exists:
        logger.debug('Overwriting %s in destination repository', src_obj.pid)
        dest_repo.purge_object(src_obj.pid)

    result = dest_repo.ingest(export_data)
    if pbar:
        pbar.finish()
    return force_text(result)

## constants for the binary content start and end tags; export data
## is split into sections on these tags (see binarycontent_sections)
#: foxml binary content start tag (bytes, to match export data)
BINARY_CONTENT_START = b'<foxml:binaryContent>'
#: foxml binary content end tag (bytes, to match export data)
BINARY_CONTENT_END = b'</foxml:binaryContent>'


class ArchiveExport(object):
    '''Iteratively process a Fedora archival export in order to copy
    an object into another fedora repository.  Use :meth:`object_data`
    to process the content and provide the foxml to be ingested into
    the destination repository.

    :param obj: source :class:`~eulfedora.models.DigitalObject` to
        be copied
    :param dest_repo: destination :class:`~eulfedora.server.Repository`
        where the object will be copied to
    :param verify: if True, datastream sizes and MD5 checksums will
        be calculated as they are decoded and logged for verification
        (default: False)
    :param progress_bar: optional progressbar object to be updated as
        the export is read and processed
    :param requires_auth: content datastreams require authentication,
        and should have credentials patched in; currently only relevant
        when xml_only is True. (default: False)
    :param xml_only: only use archival data for xml datastreams;
        use fedora datastream dissemination urls for all non-xml content
        (optionally with credentials, if requires_auth is set).
        (default: False)
    '''

    #: regular expression used to identify datastream version information
    #: that is needed for processing datastream content in an archival export
    dsinfo_regex = re.compile(
        r'ID="(?P<id>[^"<>]+)"[^>]*CREATED="(?P<created>[^"<>]+)"[^>]' + \
        r'*MIMETYPE="(?P<mimetype>[^"<>]+)"[^>]*SIZE="(?P<size>\d+)">' + \
        r'\s*<foxml:contentDigest\s+TYPE="(?P<type>[^<>"]+)"[^>]*' + \
        r'DIGEST="(?P<digest>[0-9a-f]*)"/>\s*',
        flags=re.MULTILINE|re.DOTALL)

    # NOTE: regex allows for digest to be empty

    #: url credentials, if needed for datastream content urls
    url_credentials = None

    def __init__(self, obj, dest_repo, verify=False, progress_bar=None,
        requires_auth=False, xml_only=False):
        self.obj = obj
        self.dest_repo = dest_repo
        self.verify = verify
        self.xml_only = xml_only
        self.progress_bar = progress_bar
        if requires_auth:
            # if auth is required, create a credentials string
            # in format to be inserted into a url
            # NOTE(review): username and password are inserted verbatim;
            # characters such as '@', ':' or '/' would presumably need to
            # be percent-encoded to produce a valid url -- confirm
            self.url_credentials = '%s:%s@' % (obj.api.username, obj.api.password)

        # number of bytes of export data handed out so far (for progress)
        self.processed_size = 0
        # buffer where the foxml to be ingested is accumulated
        self.foxml_buffer = io.BytesIO()
        # true while the current position is inside binary content
        self.within_file = False

    # cached export response, so the export is only requested once
    _export_response = None

    def get_export(self):
        '''Request the archive export for the source object (streamed),
        or return the previously requested response.'''
        if self._export_response is None:
            self._export_response = self.obj.api.export(self.obj.pid,
                context='archive', stream=True)
        return self._export_response

    #: number of bytes requested for each read of the export response
    # NOTE(review): this is 4 GiB per read, while sync_object reads
    # migrate exports in 4096*1024 byte blocks -- confirm the larger
    # size is intentional
    read_block_size = 4096*1024*1024

    # iterator over the export response content; created on first read
    _iter_content = None

    # most recently read chunk of export data
    _current_chunk = None

    def current_chunk(self):
        '''The most recently read chunk of export data, or None if
        nothing has been read yet.'''
        return self._current_chunk

    # reset to False by get_next_chunk; not otherwise used in this class
    partial_chunk = False
    # not used within this class
    section_start_idx = None
    # last 400 bytes of the previous chunk, kept so that datastream
    # information split across a chunk boundary can still be found
    end_of_last_chunk = None
    # partial binary content tag held back from the end of the last
    # chunk, to be prepended to the next one
    _chunk_leftover = b''

    def get_next_chunk(self):
        '''Read the next chunk of export data and make it the current
        chunk.  If the data read ends with part of a binary content
        start or end tag, that partial tag is held back and prepended
        to the next chunk so that tags are never split across chunks.

        :returns: the new current chunk
        :raises StopIteration: when the export content is exhausted
        '''
        self.partial_chunk = False
        if self._iter_content is None:
            self._iter_content = self.get_export().iter_content(self.read_block_size)

        if self._current_chunk is not None:
            self.end_of_last_chunk = self._current_chunk[-400:]

        self._current_chunk = self._chunk_leftover + six.next(self._iter_content)

        # check if chunk ends with a partial binary content tag
        len_to_save = (endswith_partial(self._current_chunk, BINARY_CONTENT_START) \
                    or endswith_partial(self._current_chunk, BINARY_CONTENT_END))
        # if it does, save that content for the next chunk
        if len_to_save:
            self._chunk_leftover = self._current_chunk[-len_to_save:]
            self._current_chunk = self._current_chunk[:-len_to_save]
        else:
            self._chunk_leftover = b''

        return self._current_chunk

    # sections of the current chunk that have not been handed out yet
    _current_sections = None

    def get_next_section(self):
        '''Return the next section of export data, where sections are
        the pieces of each chunk split on binary content start and end
        tags (with the tags themselves returned as sections).  Reads
        further chunks as needed, and updates the processed size and
        the progress bar.

        :raises StopIteration: when the export content is exhausted
        '''
        if self._current_sections is None:
            if self._current_chunk is None:
                self.get_next_chunk()

            self._current_sections = list(binarycontent_sections(self.current_chunk()))

        # if current list of sections is empty, look for more content;
        # this will raise stop iteration at end of content
        while not self._current_sections:
            self.get_next_chunk()
            self._current_sections = list(binarycontent_sections(self.current_chunk()))

        next_section = self._current_sections.pop(0)
        self.processed_size += len(next_section)
        self.update_progressbar()
        return next_section

    def get_datastream_info(self, dsinfo):
        '''Use regular expressions to pull datastream [version]
        details (id, creation date, mimetype, size, and checksum) for
        binary content, in order to sanity check the decoded data.

        :param dsinfo: text content just before a binaryContent tag
        :returns: dict with keys for id, created, mimetype, size, type
            and digest, or None if no match is found
        '''
        # we only need to look at the end of this section of content;
        # a missing section is treated as empty, so that it results
        # in no match rather than a type error
        dsinfo = (dsinfo or b'')[-750:]
        # if not enough content is present, include the end of
        # the last read chunk, if available
        if len(dsinfo) < 750 and self.end_of_last_chunk is not None:
            dsinfo = self.end_of_last_chunk + dsinfo

        # force text needed for python 3 compatibility (in python 3
        # dsinfo is bytes instead of a string)
        try:
            text = force_text(dsinfo)
        except UnicodeDecodeError as err:
            # it's possible to see a unicode character split across
            # read blocks; if we get an "invalid start byte" unicode
            # decode error, try converting the text without the first
            # character; if that's the problem, it's not needed
            # for datastream context
            if 'invalid start byte' in force_text(err):
                text = force_text(dsinfo[1:])
            else:
                raise

        # in case the text contains multiple datastream ids, find
        # all matches and then use the last, since we want the last one
        # in this section, just before the datastream content
        matches = list(self.dsinfo_regex.finditer(text))
        if matches:
            return matches[-1].groupdict()
        return None

    def update_progressbar(self):
        '''Update the progress bar, if there is one, with the amount of
        export data processed so far.'''
        # update progressbar if we have one
        if self.progress_bar is not None:
            # progressbar doesn't like it when size exceeds maxval,
            # but we don't actually know maxval; adjust the maxval up
            # when necessary
            if self.progress_bar.max_value < self.processed_size:
                self.progress_bar.max_value = self.processed_size
            self.progress_bar.update(self.processed_size)

    @staticmethod
    def _datastream_id(version_id):
        '''Get the datastream id from a datastream version id by
        removing a trailing numeric version suffix (e.g. ``DC.0`` to
        ``DC``).  Only the final ``.#`` is removed, so ids that contain
        periods are preserved; an id without a numeric version suffix
        is returned as is.'''
        base_id, separator, version = version_id.rpartition('.')
        if separator and base_id and version.isdigit():
            return base_id
        return version_id

    def object_data(self):
        '''Process the archival export and return a buffer with foxml
        content for ingest into the destination repository.  Binary
        content in the export is replaced with a content location:
        either the result of uploading the decoded content to the
        destination repository or, for non-xml content when xml_only
        is set, a datastream content url on the source repository.

        :returns: :class:`io.BytesIO` for ingest, with references
            to uploaded datastream content or content location urls
        :raises Exception: if datastream information cannot be found
            for binary content
        '''
        self.foxml_buffer = io.BytesIO()

        if self.progress_bar:
            self.progress_bar.start()

        previous_section = None
        while True:
            try:
                section = self.get_next_section()
            except StopIteration:
                break

            if section == BINARY_CONTENT_START:
                self.within_file = True

                # get datastream info from the end of the section just before this one
                # (needed to provide size to upload request)
                dsinfo = self.get_datastream_info(previous_section)
                if dsinfo:
                    logger.info('Found encoded datastream %(id)s (%(mimetype)s, size %(size)s, %(type)s %(digest)s)',
                                dsinfo)
                else:
                    # error if datastream info is not found, because either
                    # size or version date is required to handle content
                    raise Exception('Failed to find datastream information for %s from \n%s' \
                        % (self.obj.pid, previous_section))

                if self.xml_only and \
                   dsinfo['mimetype'] not in ['text/xml', 'application/rdf+xml',
                                              'application/xml']:
                    # possibly other mimetypes also?

                    # datastream id without the .# version suffix
                    dsid = self._datastream_id(dsinfo['id'])

                    if self.url_credentials:
                        # if url credentials are set, parse the base fedora api
                        # url so they can be inserted at the right place
                        parsed_url = urlparse(self.obj.api.base_url)
                        # reassemble base url, adding in credentials
                        base_url = ''.join([parsed_url.scheme, '://',
                            self.url_credentials, parsed_url.netloc,
                            parsed_url.path])
                    else:
                        base_url = self.obj.api.base_url

                    # versioned datastream dissemination url
                    content_location = '%sobjects/%s/datastreams/%s/content?asOfDateTime=%s' % \
                        (base_url, self.obj.pid, dsid, dsinfo['created'])

                    # NOTE: within_file is still set here, so the encoded
                    # content that follows is skipped by the loop below
                    # until the binary content end tag is reached
                else:
                    upload_args = {}
                    if self.progress_bar:
                        def upload_callback(monitor):
                            self.progress_bar.upload = monitor.bytes_read
                        upload_args = {'callback': upload_callback}

                    # use upload id as content location; the generator
                    # passed in reads sections up to and including the
                    # binary content end tag
                    content_location = self.dest_repo.api.upload(self.encoded_datastream(),
                        size=int(dsinfo['size']), **upload_args)

                self.foxml_buffer.write(force_bytes('<foxml:contentLocation REF="%s" TYPE="URL"/>' \
                    % content_location))

            elif section == BINARY_CONTENT_END:
                # reached when the encoded content was skipped rather than
                # uploaded; otherwise this section is processed by the
                # encoded_datastream method
                self.within_file = False

            elif self.within_file:
                # binary content within a file - ignore here
                # (otherwise handled by encoded_datastream method)
                continue

            else:
                # not start or end of binary content, and not
                # within a file, so write as is (e.g., datastream tags
                # between small files)
                self.foxml_buffer.write(section)

            previous_section = section

        return self.foxml_buffer

    # generator to iterate through sections and possibly next chunk
    # for upload to fedora
    def encoded_datastream(self):
        '''Generator for datastream content. Reads successive sections
        of export data (retrieving further chunks as needed), runs a
        base64 decode, and yields the decoded data until the binary
        content end tag is found.  If verify is set, computes the
        datastream size and MD5 as data is decoded and logs them for
        sanity-checking purposes.  Clears the within_file flag once the
        end of the binary content has been reached.
        '''

        # return a generator of data to be uploaded to fedora
        size = 0
        if self.verify:
            md5 = hashlib.md5()
        leftover = None

        while self.within_file:
            content = self.get_next_section()
            if content == BINARY_CONTENT_END:
                if leftover:
                    # encoded content that could not be decoded on its
                    # own and was never completed by a later section
                    logger.warning('Discarding %d bytes of undecoded content at end of datastream for %s',
                        len(leftover), self.obj.pid)

                if self.verify:
                    logger.info('Decoded content size %s (%s) MD5 %s',
                        size, humanize_file_size(size), md5.hexdigest())

                self.within_file = False

            else:
                # if there was leftover binary content from the last chunk,
                # add it to the content now
                if leftover is not None:
                    content = b''.join([leftover, content])
                    leftover = None

                try:
                    # decode method used by base64.decode
                    decoded_content = binascii.a2b_base64(content)
                except binascii.Error:
                    # decoding can fail with a padding error when
                    # a line of encoded content runs across a read chunk
                    lines = content.split(b'\n')
                    # decode and yield all but the last line of encoded content
                    decoded_content = binascii.a2b_base64(b''.join(lines[:-1]))
                    # store the leftover to be decoded with the next chunk
                    leftover = lines[-1]

                if self.verify:
                    md5.update(decoded_content)

                size += len(decoded_content)
                yield decoded_content


def binarycontent_sections(chunk):
    '''Split a chunk of data into sections by start and end binary
    content tags.  The tags themselves are yielded as their own
    sections, so joining the yielded sections reproduces the chunk.

    :param chunk: bytes to be split
    :returns: generator of bytes sections
    '''
    # using string split because it is significantly faster than regex.

    # use common text of start and end tags to split the text
    # (i.e. without < or </ tag beginning)
    binary_content_tag = BINARY_CONTENT_START[1:]
    if binary_content_tag not in chunk:
        # if no tags are present, don't do any extra work
        yield chunk
        return

    # split on common portion of foxml:binaryContent
    sections = chunk.split(binary_content_tag)
    last_index = len(sections) - 1
    for index, sec in enumerate(sections):
        if index == last_index:
            # no delimiter followed the final section, so a trailing
            # < or </ here is plain content rather than part of a tag
            yield sec
            continue

        # check the end of the section to determine start/end tag
        if sec.endswith(b'</'):
            tag_start = b'</'
        elif sec.endswith(b'<'):
            tag_start = b'<'
        else:
            # delimiter text that was not part of a tag; it is still
            # yielded below so that no content is lost
            tag_start = b''

        yield sec[:len(sec) - len(tag_start)]
        # yield the actual binary content tag
        # (delimiter removed by split, but needed for processing)
        yield b''.join([tag_start, binary_content_tag])


def estimate_object_size(obj, archive=True):
    '''Roughly estimate the size of an object by adding up the sizes
    of every version of each of its datastreams, on top of a fixed
    allowance for the foxml itself.  When archive is true, the sizes
    of managed datastream versions (control group ``M``) are adjusted
    to account for base64 encoding.
    '''
    # start from a rough allowance for the foxml
    total = 250000
    for dsid in obj.ds_list:
        history = obj.getDatastreamObject(dsid).history()
        for ds_version in history.versions:
            is_encoded = archive and ds_version.control_group == 'M'
            total += base64_size(ds_version.size) if is_encoded \
                else ds_version.size

    return total


def base64_size(input_size):
    '''Calculate the size of base64 encoded content, including one
    newline character for every 76 characters of encoded output.

    :param input_size: size of the content to be encoded
    :returns: size of the encoded content; an integer when input_size
        is an integer
    '''
    # from http://stackoverflow.com/questions/1533113/calculate-the-size-to-a-base-64-encoded-message

    # every three bytes of input (rounded up, to allow for padding)
    # become four encoded characters; floor division keeps the result
    # integral under python 3, where / is true division
    code_padded_size = ((input_size + 2) // 3) * 4
    newline_size = code_padded_size // 76
    return code_padded_size + newline_size


def humanize_file_size(size):
    '''Format a size in bytes as human-readable text using binary
    units, with two decimal places (e.g. ``1.50KiB``).  The sign of
    the size is ignored; a size of zero is returned as ``0B``.

    :param size: size in bytes
    :returns: string with the size and unit
    '''
    # human-readable file size adapted from
    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
    size = abs(size)
    if size == 0:
        return "0B"
    units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB']
    # scale down by 1024 until the value fits the unit or the units run
    # out; sizes below one byte stay in bytes and sizes beyond the
    # largest unit are reported in that unit instead of failing
    scaled = float(size)
    unit_index = 0
    while scaled >= 1024 and unit_index < len(units) - 1:
        scaled /= 1024
        unit_index += 1
    return "%.2f%s" % (scaled, units[unit_index])


def endswith_partial(text, partial_str):
    '''Determine whether the text ends with the beginning of the
    specified string, i.e. with an incomplete version of it.

    :returns: length of the longest incomplete leading portion of
        partial_str found at the end of the text, or False if there
        is none
    '''
    # a complete occurrence at the end of the text is not of interest,
    # so start one character short of the full string and work down
    # through progressively shorter leading segments
    for length in range(len(partial_str) - 1, 0, -1):
        if text.endswith(partial_str[:length]):
            return length
    return False