emory-libraries/eulxml

View on GitHub
eulxml/xmlmap/cerp.py

Summary

Maintainability
B
5 hrs
Test Coverage
# file eulxml/xmlmap/cerp.py
#
#   Copyright 2010,2011 Emory University Libraries
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from __future__ import unicode_literals
import codecs
import datetime
import email
import logging
import os

import six

from eulxml import xmlmap
from eulxml.utils.compat import u

logger = logging.getLogger(__name__)

# CERP is described at http://siarchives.si.edu/cerp/ . XML spec available at
# http://www.records.ncdcr.gov/emailpreservation/mail-account/mail-account_docs.html
# schema resolves but appears to be empty as of April 2016
# Current schema : http://www.history.ncdcr.gov/SHRAB/ar/emailpreservation/mail-account/mail-account.xsd
# internally-reused and general-utility objects
#

class _BaseCerp(xmlmap.XmlObject):
    'Common CERP namespace declarations'
    ROOT_NS = 'http://www.archives.ncdcr.gov/mail-account'
    ROOT_NAMESPACES = { 'xm': ROOT_NS }


class Parameter(_BaseCerp):
    ROOT_NAME = 'Parameter'
    name = xmlmap.StringField('xm:Name')
    value = xmlmap.StringField('xm:Value')

    def __str__(self):
        return '%s=%s' % (self.name, self.value)

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, str(self))


class Header(_BaseCerp):
    ROOT_NAME = 'Header'
    name = xmlmap.StringField('xm:Name')
    value = xmlmap.StringField('xm:Value')
    comments = xmlmap.StringListField('xm:Comments')

    def __str__(self):
        return '%s: %s' % (self.name, self.value)

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.name)


class _BaseBody(_BaseCerp):
    '''Common email header elements'''
    content_type_list = xmlmap.StringListField('xm:ContentType')
    charset_list = xmlmap.StringListField('xm:Charset')
    content_name_list = xmlmap.StringListField('xm:ContentName')
    content_type_comments_list = xmlmap.StringListField('xm:ContentTypeComments')
    content_type_param_list = xmlmap.NodeListField('xm:ContentTypeParam', Parameter)
    transfer_encoding_list = xmlmap.StringListField('xm:TransferEncoding')
    transfer_encoding_comments_list = xmlmap.StringListField('xm:TransferEncodingComments')
    content_id_list = xmlmap.StringListField('xm:ContentId')
    content_id_comments_list = xmlmap.StringListField('xm:ContentIdComments')
    description_list = xmlmap.StringListField('xm:Description')
    description_comments_list = xmlmap.StringListField('xm:DescriptionComments')
    disposition_list = xmlmap.StringListField('xm:Disposition')
    disposition_file_name_list = xmlmap.StringListField('xm:DispositionFileName')
    disposition_comments_list = xmlmap.StringListField('xm:DispositionComments')

    disposition_params = xmlmap.NodeListField('xm:DispositionParams', Parameter)
    other_mime_headers = xmlmap.NodeListField('xm:OtherMimeHeader', Header)


class Hash(_BaseCerp):
    ROOT_NAME = 'Hash'
    HASH_FUNCTION_CHOICES = [ 'MD5', 'WHIRLPOOL', 'SHA1', 'SHA224',
                              'SHA256', 'SHA384', 'SHA512', 'RIPEMD160']

    value = xmlmap.StringField('xm:Value')
    function = xmlmap.StringField('xm:Function',
            choices=HASH_FUNCTION_CHOICES)

    def __str__(self):
        return self.value

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.function)


class _BaseExternal(_BaseCerp):
    '''Common external entity reference elements'''
    EOL_CHOICES = [ 'CR', 'LF', 'CRLF' ]

    rel_path = xmlmap.StringField('xm:RelPath')
    eol = xmlmap.StringField('xm:Eol', choices=EOL_CHOICES)
    hash = xmlmap.NodeField('xm:Hash', Hash)


class _BaseContent(_BaseCerp):
    '''Common content encoding elements'''
    charset_list = xmlmap.StringListField('xm:CharSet')
    transfer_encoding_list = xmlmap.StringListField('xm:TransferEncoding')


#
# messages and bodies
#

class BodyContent(_BaseContent):
    ROOT_NAME = 'BodyContent'
    content = xmlmap.StringField('xm:Content')


class ExtBodyContent(_BaseExternal, _BaseContent):
    ROOT_NAME = 'ExtBodyContent'
    local_id = xmlmap.IntegerField('xm:LocalId')
    xml_wrapped = xmlmap.SimpleBooleanField('xm:XMLWrapped',
            true='1', false='0')


class SingleBody(_BaseBody):
    ROOT_NAME = 'SingleBody'

    body_content = xmlmap.NodeField('xm:BodyContent', BodyContent)
    ext_body_content = xmlmap.NodeField('xm:ExtBodyContent', ExtBodyContent)
    child_message = xmlmap.NodeField('xm:ChildMessage', None) # this will be fixed below
    @property
    def content(self):
        return self.body_content or \
               self.ext_body_content or \
               self.child_message

    phantom_body = xmlmap.StringField('xm:PhantomBody')


class MultiBody(_BaseCerp):
    ROOT_NAME = 'MultiBody'
    preamble = xmlmap.StringField('xm:Preamble')
    epilogue = xmlmap.StringField('xm:Epilogue')

    single_body = xmlmap.NodeField('xm:SingleBody', SingleBody)
    multi_body = xmlmap.NodeField('xm:MultiBody', 'self')
    @property
    def body(self):
        return self.single_body or self.multi_body


class Incomplete(_BaseCerp):
    ROOT_NAME = 'Incomplete'
    error_type = xmlmap.StringField('xm:ErrorType')
    error_location = xmlmap.StringField('xm:ErrorLocation')

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.error_type)


class _BaseMessage(_BaseCerp):
    '''Common message elements'''
    local_id = xmlmap.IntegerField('xm:LocalId')
    message_id = xmlmap.StringField('xm:MessageId')
    message_id_supplied = xmlmap.SimpleBooleanField('xm:MessageId/@Supplied',
            true='1', false=None)
    mime_version = xmlmap.StringField('xm:MimeVersion')
    orig_date_list = xmlmap.StringListField('xm:OrigDate') # FIXME: really datetime
    # NOTE: eulxml.xmlmap.DateTimeField supports specifying format,
    # but we might need additional work since %z only works with
    # strftime, not strptime
    from_list = xmlmap.StringListField('xm:From')
    sender_list = xmlmap.StringListField('xm:Sender')
    to_list = xmlmap.StringListField('xm:To')
    cc_list = xmlmap.StringListField('xm:Cc')
    bcc_list = xmlmap.StringListField('xm:Bcc')
    in_reply_to_list = xmlmap.StringListField('xm:InReplyTo')
    references_list = xmlmap.StringListField('xm:References')
    subject_list = xmlmap.StringListField('xm:Subject')
    comments_list = xmlmap.StringListField('xm:Comments')
    keywords_list = xmlmap.StringListField('xm:Keywords')
    headers = xmlmap.NodeListField('xm:Header', Header)

    single_body = xmlmap.NodeField('xm:SingleBody', SingleBody)
    multi_body = xmlmap.NodeField('xm:MultiBody', MultiBody)
    @property
    def body(self):
        return self.single_body or self.multi_body

    incomplete_list = xmlmap.NodeField('xm:Incomplete', Incomplete)

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__,
                self.message_id or self.local_id or '(no id)')


class Message(_BaseMessage, _BaseExternal):
    """A single email message in a :class:`Folder`."""

    ROOT_NAME = 'Message'
    STATUS_FLAG_CHOICES = [ 'Seen', 'Answered', 'Flagged', 'Deleted',
                            'Draft', 'Recent']
    status_flags = xmlmap.StringListField('xm:StatusFlag',
            choices=STATUS_FLAG_CHOICES)

    @classmethod
    def from_email_message(cls, message, local_id=None):
        '''
        Convert an :class:`email.message.Message` or compatible message
        object into a CERP XML :class:`eulxml.xmlmap.cerp.Message`. If an
        id is specified, it will be stored in the Message <LocalId>.

        :param message: `email.message.Message` object
        :param id: optional message id to be set as `local_id`

        :returns: :class:`eulxml.xmlmap.cerp.Message` instance populated
            with message information

        '''
        result = cls()
        if local_id is not None:
            result.local_id = id

        message_id = message.get('Message-Id')
        if message_id:
            result.message_id_supplied = True
            result.message_id = message_id

        result.mime_version = message.get('MIME-Version')

        dates = message.get_all('Date', [])
        result.orig_date_list.extend([parse_mail_date(d) for d in dates])

        result.from_list.extend(message.get_all('From', []))
        result.sender_list.extend(message.get_all('From', []))
        try:
            result.to_list.extend(message.get_all('To', []))
        except UnicodeError:
            print(repr(message['To']))
            raise
        result.cc_list.extend(message.get_all('Cc', []))
        result.bcc_list.extend(message.get_all('Bcc', []))
        result.in_reply_to_list.extend(message.get_all('In-Reply-To', []))
        result.references_list.extend(message.get_all('References', []))
        result.subject_list.extend(message.get_all('Subject', []))
        result.comments_list.extend(message.get_all('Comments', []))
        result.keywords_list.extend(message.get_all('Keywords', []))

        headers = [ Header(name=key, value=val) for key, val in message.items() ]
        result.headers.extend(headers)

        # FIXME: skip multipart messages for now
        if not message.is_multipart():
            result.create_single_body()

            # FIXME: this is a small subset of the actual elements CERP allows.
            # we should add the rest of them, too.

            # message.get_content_type() always returns something. only
            # put it in the CERP if a Content-Type was explicitly specified.
            if message['Content-Type']:
                result.single_body.content_type_list.append(message.get_content_type())
            if message.get_content_charset():
                result.single_body.charset_list.append(message.get_content_charset())
            if message.get_filename():
                result.single_body.content_name_list.append(message.get_filename())

            # FIXME: attaching the body_content only makes sense for text
            # content types. we'll eventually need a better solution for
            # non-text messages
            result.single_body.create_body_content()
            payload = message.get_payload(decode=False)

            # if not unicode, attempt to convert
            if isinstance(payload, six.binary_type):
                charset = message.get_charset()
                # decode according to the specified character set, if any
                if charset is not None:
                    charset_decoder = codecs.getdecoder(str(charset))
                    payload, length = charset_decoder(payload)

                # otherwise, just try to convert
                else:
                    payload = u(payload)

            # remove any control characters not allowed in XML
            control_char_map = dict.fromkeys(range(32))
            for i in [9, 10, 13]: # preserve horizontal tab, line feed, carriage return
                del control_char_map[i]

            payload = u(payload).translate(control_char_map)

            result.single_body.body_content.content = payload

        else:
            # TODO: handle multipart
            logger.warn('CERP conversion does not yet handle multipart')

        # assume we've normalized newlines:
        result.eol = EOLMAP[os.linesep]

        return result


class ChildMessage(_BaseMessage):
    ROOT_NAME = 'ChildMessage'
    # no additional elements

# Patch-up from above. FIXME: This is necessary because of recursive
# NodeFields. eulxml.xmlmap.NodeField doesn't currently support these
SingleBody.child_message.node_class = ChildMessage

#
# accounts and folders
#

class Mbox(_BaseExternal):
    ROOT_NAME = 'Mbox'
    # no additional fields


class Folder(_BaseCerp):
    """A single email folder in an :class:`Account`, composed of multiple
    :class:`Message` objects and associated metadata."""

    ROOT_NAME = 'Folder'

    name = xmlmap.StringField('xm:Name')
    messages = xmlmap.NodeListField('xm:Message', Message)
    subfolders = xmlmap.NodeListField('xm:Folder', 'self')
    mboxes = xmlmap.NodeListField('xm:Mbox', Mbox)

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.name)


class ReferencesAccount(_BaseCerp):
    ROOT_NAME = 'ReferencesAccount'
    REF_TYPE_CHOICES = [ 'PreviousContent', 'SubsequentContent',
                         'Supplemental', 'SeeAlso', 'SeeInstead' ]

    href = xmlmap.StringField('xm:Href')
    email_address = xmlmap.StringField('xm:EmailAddress')
    reference_type = xmlmap.StringField('xm:RefType',
            choices=REF_TYPE_CHOICES)


class Account(_BaseCerp):
    """A single email account associated with a single email address and
    composed of multiple :class:`Folder` objects and additional metadata."""

    ROOT_NAME = 'Account'
    XSD_SCHEMA = 'http://www.history.ncdcr.gov/SHRAB/ar/emailpreservation/mail-account/mail-account.xsd'

    email_address = xmlmap.StringField('xm:EmailAddress')
    global_id = xmlmap.StringField('xm:GlobalId')
    references_accounts = xmlmap.NodeListField('xm:ReferencesAccount',
            ReferencesAccount)
    folders = xmlmap.NodeListField('xm:Folder', Folder)

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__,
                self.global_id or self.email_address or '(no id)')




def parse_mail_date(datestr):
    '''Helper method used by :meth:`Message.from_email_message` to
    convert dates from rfc822 format to iso 8601.

    :param datestr: string containing a date in rfc822 format
    :returns: string with date in iso 8601 format
    '''

    time_tuple = email.utils.parsedate_tz(datestr)
    if time_tuple is None:
        return datestr
    dt = datetime.datetime.fromtimestamp(email.utils.mktime_tz(time_tuple))
    return dt.isoformat()

EOLMAP = {
    '\r': 'CR',
    '\n': 'LF',
    '\r\n': 'CRLF',
}