emory-libraries/eulxml

View on GitHub
eulxml/xmlmap/core.py

Summary

Maintainability
D
2 days
Test Coverage
# file eulxml/xmlmap/core.py
#
#   Copyright 2010,2011 Emory University Libraries
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from __future__ import unicode_literals
import logging
import os
import warnings
import urllib
import time
from lxml import etree
from lxml.builder import ElementMaker
import six
from six.moves.urllib.request import urlopen

from eulxml.utils.compat import u
from eulxml.xmlmap.fields import Field


logger = logging.getLogger(__name__)

__all__ = ['XmlObject', 'parseUri', 'parseString', 'loadSchema',
    'load_xmlobject_from_string', 'load_xmlobject_from_file',
    'load_xslt']

# NB: When parsing XML in this module, we explicitly create a new parser
#   each time. Without this, lxml 2.2.7 uses a global default parser. When
#   parsing strings, lxml appears to set that parser into no-network mode,
#   causing subsequent network-based parses to fail. Specifically, under
#   lxml 2.2.7, the second call here fails::
#
#   >>> etree.fromstring('<foo/>') # set global parser to no-network
#   >>> etree.parse('http://www.w3.org/2001/xml.xsd') # fails in no-network mode
#
#   If we simply construct a separate parser each time, parses will be
#   marginally slower, but this lxml bug will not affect us.
#
#   This lxml behavior has been logged as a bug:
#   https://bugs.launchpad.net/lxml/+bug/673205



def parseUri(stream, uri=None):
    """Parse an XML document from a stream (or anything else accepted by
    :func:`lxml.etree.parse`) and return the resulting :mod:`lxml.etree`
    document.

    :param stream: source to be parsed; handed to :func:`lxml.etree.parse`
    :param uri: optional base URL passed to lxml as ``base_url``
    """
    # a fresh parser is created for every parse (see module-level note)
    xml_parser = _get_xmlparser()
    return etree.parse(stream, parser=xml_parser, base_url=uri)


def parseString(string, uri=None):
    """Parse an XML document supplied as a byte string and return the
    result of :func:`lxml.etree.fromstring`.

    :param string: XML content as a byte string (not a Unicode string)
    :param uri: optional base URL passed to lxml as ``base_url``; should be
        provided so that relative URIs can be calculated
    """
    # a fresh parser is created for every parse (see module-level note)
    xml_parser = _get_xmlparser()
    return etree.fromstring(string, parser=xml_parser, base_url=uri)

# internal cache for loaded schemas, so we only load each schema once
_loaded_schemas = {}


def loadSchema(uri, base_uri=None):
    """Load an XSD XML document (specified by filename or URL), and return a
    :class:`lxml.etree.XMLSchema`.

    Loaded schemas are stored in the module-level ``_loaded_schemas``
    dictionary keyed on ``uri``, so each schema is only parsed once.

    :param uri: filename or URL of the schema to load
    :param base_uri: optional base URL passed to lxml when parsing
    :raises IOError: if the schema document cannot be loaded
    :raises lxml.etree.XMLSchemaParseError: if the document is not a
        usable schema
    """
    # serve from the cache when this uri has already been loaded
    try:
        return _loaded_schemas[uri]
    except KeyError:
        pass

    # uri to use for reporting errors - include base uri if any
    if base_uri is None:
        error_uri = uri
    else:
        error_uri = uri + ' (base URI %s)' % base_uri

    try:
        logger.debug('Loading schema %s' % uri)
        schema_doc = etree.parse(uri, parser=_get_xmlparser(),
                                 base_url=base_uri)
        schema = etree.XMLSchema(schema_doc)
        _loaded_schemas[uri] = schema
        return schema
    except IOError as io_err:
        # still an IO error, but with the schema location in the message
        raise IOError('Failed to load schema %s : %s' % (error_uri, io_err))
    except etree.XMLSchemaParseError as parse_err:
        # still a schema parse error, but identifying the schema being loaded
        raise etree.XMLSchemaParseError('Failed to parse schema %s -- %s' % (error_uri, parse_err))


def load_xslt(filename=None, xsl=None):
    '''Load and compile an XSLT document (specified by filename or string)
    for repeated use in transforming XML.

    :param filename: path (or other source accepted by
        :func:`lxml.etree.parse`) of the XSLT document
    :param xsl: XSLT document content as a string; takes precedence over
        ``filename`` when both are given
    :returns: a compiled :class:`lxml.etree.XSLT` object
    :raises ValueError: if neither ``filename`` nor ``xsl`` is specified
    '''
    # fail with a clear message instead of an unbound-variable error
    if filename is None and xsl is None:
        raise ValueError('load_xslt requires either a filename or xsl content')

    parser = _get_xmlparser()
    if xsl is not None:
        # xsl content wins when both are given, so the file is not parsed
        xslt_doc = etree.fromstring(xsl, parser=parser)
    else:
        xslt_doc = etree.parse(filename, parser=parser)

    return etree.XSLT(xslt_doc)


def _http_uri(uri):
    """Return True if ``uri`` begins with the ``http:`` or ``https:``
    scheme prefix, False otherwise."""
    http_prefixes = ('http:', 'https:')
    return uri.startswith(http_prefixes)


class _FieldDescriptor(object):
    """Descriptor that delegates attribute get, set and delete on an
    object to the wrapped field, passing along the object's ``node``
    and ``context``."""

    def __init__(self, field):
        self.field = field

    def __get__(self, obj, objtype):
        if obj is not None:
            # instance access: let the field look up its value
            return self.field.get_for_node(obj.node, obj.context)
        # class access - NOTE: return the *field* here rather than self;
        # allows sphinx autodocumentation to inspect the type properly
        return self.field

    def __set__(self, obj, value):
        wrapped = self.field
        return wrapped.set_for_node(obj.node, obj.context, value)

    def __delete__(self, obj):
        wrapped = self.field
        return wrapped.delete_for_node(obj.node, obj.context)


class XmlObjectType(type):

    """
    A metaclass for :class:`XmlObject`.

    Analogous in principle to Django's ``ModelBase``, this metaclass
    functions rather differently. While it'll likely get a lot closer over
    time, we just haven't been growing ours long enough to demand all of the
    abstractions built into Django's models. For now, we do three things:

      1. take any :class:`~eulxml.xmlmap.fields.Field` members and convert
         them to descriptors,
      2. store all of these fields and all of the base classes' fields in a
         ``_fields`` dictionary on the class, and
      3. if any local (non-parent) fields look like self-referential
         :class:`eulxml.xmlmap.NodeField` objects then patch them up
         to refer to the newly-created :class:`XmlObject`.

    In addition, any ``SchemaField`` member is replaced with the field
    returned by its ``get_field`` method when an ``XSD_SCHEMA`` is
    available, either on the class being defined or on one of its bases.
    """

    def __new__(cls, name, bases, defined_attrs):
        """Build the new class.

        :param name: name of the class being created
        :param bases: tuple of base classes
        :param defined_attrs: dictionary of attributes from the class body
        :raises ValueError: if a field has a string ``node_class`` other
            than ``"self"`` or the name of the class being created
        """
        use_attrs = {}
        fields = {}
        recursive_fields = []

        # Always initialise base_xsd so that the check below cannot hit an
        # unbound name when there are no bases at all.
        base_xsd = None

        # inherit base fields first; that way current class field defs will
        # override parents. note that since the parents already added fields
        # from *their* parents (because they were built from XmlObjectType),
        # we don't have to recurse.
        for base in bases:
            base_fields = getattr(base, '_fields', None)
            if base_fields:
                fields.update(base_fields)
            # keep the first base that actually defines a schema, so that a
            # later base without one cannot reset it to None
            if not base_xsd:
                base_xsd = getattr(base, 'XSD_SCHEMA', None)

        # parsed schema document; only loaded if a SchemaField needs it
        schema_obj = None

        for attr_name, attr_val in defined_attrs.items():
            # XXX: not a fan of isinstance here. maybe use something like
            # django's contribute_to_class?
            if isinstance(attr_val, Field):
                if isinstance(attr_val, SchemaField):
                    # special case: schema field will look at the schema and return appropriate field type
                    if 'XSD_SCHEMA' in defined_attrs or base_xsd:
                        # load schema_obj the first time we need it
                        if schema_obj is None:
                            # if xsd schema is directly defined, use that
                            if 'XSD_SCHEMA' in defined_attrs:
                                schema_obj = load_xmlobject_from_file(defined_attrs['XSD_SCHEMA'],
                                                                      XsdSchema)
                            # otherwise, use nearest parent xsd
                            else:
                                schema_obj = load_xmlobject_from_file(base_xsd, XsdSchema)

                        attr_val = attr_val.get_field(schema_obj)
                field = attr_val
                fields[attr_name] = field
                use_attrs[attr_name] = _FieldDescriptor(field)

                # collect self-referential NodeFields so that we can resolve
                # them once we've created the new class
                node_class = getattr(field, 'node_class', None)
                if isinstance(node_class, six.string_types):
                    if node_class in ('self', name):
                        recursive_fields.append(field)
                    else:
                        msg = ('Class %s has field %s with node_class %s, ' +
                               'but the only supported class names are ' +
                               '"self" and %s.') % (name, attr_val,
                                                    repr(node_class),
                                                    repr(name))
                        raise ValueError(msg)

                # if a field 'foo' has a 'create_for_node' method, then add
                # a 'create_foo' method to call it. generally this isn't
                # helpful, but NodeField uses it.
                if hasattr(attr_val, 'create_for_node'):
                    create_method_name = 'create_' + attr_name
                    create_method = cls._make_create_field(create_method_name, attr_val)
                    use_attrs[create_method_name] = create_method

            else:
                # non-field attributes are passed through untouched
                use_attrs[attr_name] = attr_val
        use_attrs['_fields'] = fields

        super_new = super(XmlObjectType, cls).__new__
        new_class = super_new(cls, name, bases, use_attrs)

        # patch self-referential NodeFields (collected above) with the
        # newly-created class
        for field in recursive_fields:
            assert field.node_class in ('self', name)
            field.node_class = new_class

        return new_class

    @staticmethod
    def _make_create_field(field_name, field):
        """Return a function named ``field_name`` that takes an xmlobject
        and calls ``field.create_for_node`` with its node and context."""
        def create_field(xmlobject):
            field.create_for_node(xmlobject.node, xmlobject.context)
        create_field.__name__ = str(field_name)
        return create_field


@six.python_2_unicode_compatible
class XmlObject(six.with_metaclass(XmlObjectType, object)):

    """
    A Python object wrapped around an XML node.

    Typical programs will define subclasses of :class:`XmlObject` with
    various field members. Some programs will use
    :func:`load_xmlobject_from_string` and :func:`load_xmlobject_from_file`
    to create instances of these subclasses. Other programs will create them
    directly, passing a node argument to the constructor. If the
    subclass defines a :attr:`ROOT_NAME` then this node argument is
    optional: Programs may then create instances directly with no
    constructor arguments.

    Programs can also pass an optional dictionary to the constructor to
    specify namespaces for XPath evaluation.

    If keyword arguments are passed in to the constructor, they will be used to
    set initial values for the corresponding fields on the :class:`XmlObject`.
    (Only currently supported for non-list fields.)

    Custom equality/non-equality tests: two instances of :class:`XmlObject` are
    considered equal if they point to the same lxml element node, or if
    they serialize to the same content.
    """

    node = None
    """The top-level xml node wrapped by the object"""

    ROOT_NAME = None
    """A default root element name (without namespace prefix) used when an object
    of this type is created from scratch."""
    ROOT_NS = None
    """The default namespace used when an object of this type is created from
    scratch."""
    ROOT_NAMESPACES = {}
    """A dictionary whose keys are namespace prefixes and whose values are
    namespace URIs. These namespaces are used to create the root element when an
    object of this type is created from scratch; should include the namespace
    and prefix for the root element, if it has one. Any additional namespaces
    will be added to the root element."""

    XSD_SCHEMA = None
    """URI or file path to the XSD schema associated with this :class:`XmlObject`,
    if any.  If configured, will be used for optional validation when calling
    :meth:`load_xmlobject_from_string` and :meth:`load_xmlobject_from_file`,
    and with :meth:`is_valid`.
    """

    schema_validate = True
    '''Override for schema validation; if a schema must be defined for
     the use of :class:`xmlmap.fields.SchemaField` for a sub-xmlobject
     that should not be validated, set to False.'''

    @property
    def xmlschema(self):
        """A parsed XSD schema instance of
        :class:`lxml.etree.XMLSchema`; will be loaded the first time
        it is requested on any instance of this class if XSD_SCHEMA is
        set and xmlschema is None.  If you wish to load and parse the
        schema at class definition time, instead of at class instance
        initialization time, you may want to define your schema in
        your subclass like this::

          XSD_SCHEMA = "http://www.openarchives.org/OAI/2.0/oai_dc.xsd"
          xmlschema = xmlmap.loadSchema(XSD_SCHEMA)

        Returns None when no XSD_SCHEMA is configured.
        """
        if self.XSD_SCHEMA:
            return loadSchema(self.XSD_SCHEMA)

    # NOTE: DTD and RNG validation could be handled similarly to XSD validation logic

    def __init__(self, node=None, context=None, **kwargs):
        """Wrap an XML node, building a new root element when none is given.

        :param node: node to wrap (optional; if not specified, a new root
            element is built via :meth:`_build_root_element`)
        :param context: optional dictionary merged into the context used
            for XPath evaluation
        :param kwargs: initial values for fields, set via ``setattr``
        """
        if node is None:
            node = self._build_root_element()

        self.node = node
        # FIXME: context probably needs work
        # get namespaces from current node OR its parent (in case of an lxml 'smart' string)
        if hasattr(node, 'nsmap'):
            nsmap = node.nsmap
        elif hasattr(node, 'getparent'):
            # lxml smart strings have no nsmap of their own; use the
            # parent element's, if there is a parent
            parent = node.getparent()
            nsmap = getattr(parent, 'nsmap', None) or {}
        else:
            nsmap = {}

        # xpath has no notion of a default namespace - omit any namespace with no prefix
        self.context = {'namespaces': {prefix: ns for prefix, ns
                                       in six.iteritems(nsmap) if prefix}}

        if context is not None:
            # NOTE: if the supplied context has its own 'namespaces' entry,
            # that dictionary replaces the one built above and is the one
            # updated with ROOT_NAMESPACES below.
            self.context.update(context)
        if hasattr(self, 'ROOT_NAMESPACES'):
            # also include any root namespaces to guarantee that expected prefixes are available
            self.context['namespaces'].update(self.ROOT_NAMESPACES)

        for field, value in six.iteritems(kwargs):
            # TODO (maybe): handle setting/creating list fields
            setattr(self, field, value)

    def _build_root_element(self):
        """Create a new root element named :attr:`ROOT_NAME`, using
        :attr:`ROOT_NS` and :attr:`ROOT_NAMESPACES` for the namespace and
        namespace map."""
        opts = {}
        if hasattr(self, 'ROOT_NS'):
            opts['namespace'] = self.ROOT_NS
        if hasattr(self, 'ROOT_NAMESPACES'):
            opts['nsmap'] = self.ROOT_NAMESPACES

        E = ElementMaker(**opts)
        root = E(self.ROOT_NAME)
        return root

    def xsl_transform(self, filename=None, xsl=None, return_type=None, **params):
        """Run an xslt transform on the contents of the XmlObject.

        XSLT can be passed in as an XSLT object generated by :meth:`load_xslt`
        or as filename or string. If a params dictionary is specified, its items
        will be passed as parameters to the XSL transformation, and any string
        values will automatically be encoded as XSL string parameters.

        .. Note::

            If XSL is being used multiple times, it is recommended to
            use :meth:`load_xslt` to load and compile the XSLT once.

        :param filename: xslt filename (optional, one of file and xsl is required)
        :param xsl: xslt as string OR compiled XSLT object as returned by
            :meth:`load_xslt` (optional)
        :param return_type: type of object to return; optional, defaults to
            :class:`XmlObject`; specify unicode or string for text output
        :returns: an instance of :class:`XmlObject` or the return_type
            specified; None if the transform generated an empty result
        :raises ValueError: if neither filename nor xsl is specified
        """
        # NOTE: converting _XSLTResultTree to XmlObject because of a bug in its unicode method
        # - to output xml result, use serialize instead of unicode
        if return_type is None:
            return_type = XmlObject

        # automatically encode any string params as XSLT string parameters
        for key, val in six.iteritems(params):
            if isinstance(val, six.string_types):
                params[key] = etree.XSLT.strparam(val)

        parser = _get_xmlparser()
        # if a compiled xslt object is passed in, use that first
        if xsl is not None and isinstance(xsl, etree.XSLT):
            result = xsl(self.node, **params)
        else:
            # fail with a clear message instead of an unbound-variable error
            if filename is None and xsl is None:
                raise ValueError('xsl_transform requires either a filename or xsl')

            # otherwise, load the xslt
            if filename is not None:
                xslt_doc = etree.parse(filename, parser=parser)
            if xsl is not None:
                xslt_doc = etree.fromstring(xsl, parser=parser)

            # NOTE: there is a memory bug that results in malloc errors and
            # segfaults when using the parsed etree.XSLT approach here.
            # As a workaround, using the document xslt method instead.

            if self.node == self.node.getroottree().getroot():
                # if current node is root node, use entire document for transform
                xmltree = self.node.getroottree()
            else:
                # otherwise, construct a temporary partial document from this node
                partial_doc = etree.fromstring(self.serialize(), parser=parser)
                xmltree = partial_doc.getroottree()

            result = xmltree.xslt(xslt_doc, **params)

        # If XSLT returns nothing, transform returns an _XSLTResultTree
        # with no root node.  Log a warning, and don't generate an
        # empty xmlobject which will behave unexpectedly.

        # text output does not include a root node, so check separately
        if issubclass(return_type, six.string_types):
            if result is None:
                logger.warning("XSL transform generated an empty result")
                return
            else:
                return return_type(result)

        if result is None or result.getroot() is None:
            logger.warning("XSL transform generated an empty result")
        else:
            # pass in root node, rather than the result tree object
            return return_type(result.getroot())

    def __str__(self):
        # a string node is its own text; otherwise use the node's
        # whitespace-normalized text content
        if isinstance(self.node, six.string_types):
            return self.node
        return self.node.xpath("normalize-space(.)")

    def __string__(self):
        # NOTE(review): ``__string__`` is not a special method name that
        # Python itself calls; presumably kept for callers that invoke it
        # directly - confirm before renaming or removing.
        if isinstance(self.node, six.string_types):
            return self.node
        return u(self).encode('ascii', 'xmlcharrefreplace')

    def __eq__(self, other):
        # consider two xmlobjects equal if they are pointing to the same xml node
        if hasattr(other, 'node') and self.node == other.node:
            return True
        # consider two xmlobjects equal if they serialize the same
        if hasattr(other, 'serialize') and self.serialize() == other.serialize():
            return True
        # NOTE: does not address "equivalent" xml, which is potentially very complex
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def serialize(self, stream=None, pretty=False):
        """Serialize the contents of the XmlObject to a stream.  Serializes
        current node only; for the entire XML document, use :meth:`serializeDocument`.

        If no stream is specified, returns the serialized content
        (UTF-8 encoded).

        :param stream: stream or other file-like object to write content to (optional)
        :param pretty: pretty-print the XML output; boolean, defaults to False
        :returns: the stream passed in, or the serialized content if no
            stream was specified
        """
        return self._serialize(self.node, stream=stream, pretty=pretty)

    def serializeDocument(self, stream=None, pretty=False):
        """Serialize the contents of the entire XML document (including Doctype
        declaration, if there is one), with an XML declaration, for the current
        XmlObject to a stream.

        If no stream is specified, returns the serialized content
        (UTF-8 encoded).

        :param stream: stream or other file-like object to write content to (optional)
        :param pretty: pretty-print the XML output; boolean, defaults to False
        :returns: the stream passed in, or the serialized content if no
            stream was specified
        """
        return self._serialize(self.node.getroottree(), stream=stream, pretty=pretty,
                                xml_declaration=True)

    def _serialize(self, node, stream=None, pretty=False, xml_declaration=False):
        # actual logic of xml serialization
        if stream is None:
            # no stream given: collect output in memory and return its value
            string_mode = True
            stream = six.BytesIO()
        else:
            string_mode = False

        # NOTE: etree c14n doesn't seem to like fedora info: URIs
        stream.write(etree.tostring(node, encoding='UTF-8', pretty_print=pretty,
                                    xml_declaration=xml_declaration))

        if string_mode:
            data = stream.getvalue()
            stream.close()
            return data

        return stream

    def is_valid(self):
        """Determine if the current document is valid as far as we can determine.
        If there is a schema associated, check for schema validity.  Otherwise,
        return True.

        :rtype: boolean
        """
        # valid if there are no validation errors
        return self.validation_errors() == []

    def validation_errors(self):
        """Return a list of validation errors.  Returns an empty list
        if the xml is schema valid or no schema is defined.  If a
        schema is defined but :attr:`schema_validate` is False, schema
        validation will be skipped.

        Currently only supports schema validation.

        :rtype: list
        """
        # if we add other types of validation (DTD, RNG), incorporate them here
        if self.xmlschema and self.schema_validate and not self.schema_valid():
            return self.schema_validation_errors()
        return []

    def schema_valid(self):
        """Determine if the current document is schema-valid according to the
        configured XSD Schema associated with this instance of :class:`XmlObject`.

        :rtype: boolean
        :raises: Exception if no XSD schema is defined for this XmlObject instance
        """
        if self.xmlschema is not None:
            # clear out errors so they are not duplicated by repeated
            # validations on the same schema object
            self.xmlschema._clear_error_log()
            # NOTE: _clear_error_log is technically private, but I can't find
            # any public method to clear the validation log.
            return self.xmlschema.validate(self.node)
        else:
            raise Exception('No XSD schema is defined, cannot validate document')

    def schema_validation_errors(self):
        """
        Retrieve any validation errors that occurred during schema validation
        done via :meth:`is_valid`.

        :returns: the ``error_log`` of the schema object
        :raises: Exception if no XSD schema is defined for this XmlObject instance
        """
        if self.xmlschema is not None:
            return self.xmlschema.error_log
        else:
            raise Exception('No XSD schema is defined, cannot return validation errors')

    def is_empty(self):
        """
        Returns True if the root node contains no child elements, no
        attributes, and no text. Returns False if any are present.
        """
        return len(self.node) == 0 and len(self.node.attrib) == 0 \
            and not self.node.text and not self.node.tail  # regular text or text after a node



""" April 2016. Removed Urllib2Resolver so that we can support
  loading local copies of schemas and skip validation in _get_xmlparser """


def _get_xmlparser(xmlclass=XmlObject, validate=False, resolver=None):
    """Build a new :class:`lxml.etree.XMLParser` configured for the
    requested kind of validation.

    :param xmlclass: :class:`XmlObject` class whose ``XSD_SCHEMA``, if set,
        is used for schema validation
    :param validate: boolean; when true, validate against the class's XSD
        schema if it has one and otherwise use DTD validation
    :param resolver: optional resolver to add to the parser; defaults to None
    :returns: the configured parser
    """
    if not validate:
        # If validation is not requested, then the parsing should fail
        # only for well-formedness issues.
        #
        # Therefore, we must turn off collect_ids, otherwise lxml will
        # have a problem with duplicate IDs as it collects
        # them. However, the XML spec declares ID uniqueness as a
        # validation constraint, not a well-formedness
        # constraint. (See https://www.w3.org/TR/xml/#id.)
        parser_opts = {"collect_ids": False}
    elif getattr(xmlclass, 'XSD_SCHEMA', None) is not None:
        # Prefer a schema that is already loaded on the class.
        # (since we are accessing the *class*, 'xmlschema' would give us the
        # property rather than an initialized schema object).
        schema = getattr(xmlclass, '_xmlschema', None)
        if schema is None:
            schema = loadSchema(xmlclass.XSD_SCHEMA)
        parser_opts = {'schema': schema}
    else:
        # no schema configured on the class: assume DTD validation
        parser_opts = {'dtd_validation': True}

    xml_parser = etree.XMLParser(**parser_opts)
    if resolver is not None:
        xml_parser.resolvers.add(resolver)
    return xml_parser


def load_xmlobject_from_string(string, xmlclass=XmlObject, validate=False,
        resolver=None):
    """Parse XML content from a string and wrap the resulting element in
    an :class:`~eulxml.xmlmap.XmlObject`.

    When ``xmlclass`` is given, an instance of that class (expected to be a
    subclass of XmlObject) is constructed instead; its constructor receives
    the parsed element as its only argument.

    When validation is requested, the parser validates against the
    XSD_SCHEMA of ``xmlclass`` if one is defined; otherwise it is configured
    for DTD validation and the xml content is expected to include a Doctype
    declaration.

    :param string: xml content to be loaded, as a string
    :param xmlclass: subclass of :class:`~eulxml.xmlmap.XmlObject` to initialize
    :param validate: boolean, enable validation; defaults to false
    :param resolver: optional resolver to add to the parser
    :rtype: instance of :class:`~eulxml.xmlmap.XmlObject` requested
    """
    xml_parser = _get_xmlparser(xmlclass=xmlclass, validate=validate,
                                resolver=resolver)
    root = etree.fromstring(string, parser=xml_parser)
    return xmlclass(root)


def load_xmlobject_from_file(filename, xmlclass=XmlObject, validate=False,
        resolver=None):
    """Parse XML content from a file and wrap the document's root element
    in an :class:`~eulxml.xmlmap.XmlObject`.

    Takes the same parameters as :meth:`load_xmlobject_from_string` and
    configures the parser the same way; the only difference is that the
    content is read from ``filename`` instead of from a string.

    :param filename: name of the file that should be loaded as an xmlobject.
        :meth:`etree.lxml.parse` will accept a file name/path, a file object, a
        file-like object, or an HTTP or FTP url, however file path and URL are
        recommended, as they are generally faster for lxml to handle.
    """
    xml_parser = _get_xmlparser(xmlclass=xmlclass, validate=validate,
                                resolver=resolver)
    document = etree.parse(filename, parser=xml_parser)
    return xmlclass(document.getroot())

from eulxml.xmlmap.fields import *
# Import these for backward compatibility. Should consider deprecating these
# and asking new code to pull them from descriptor


# XSD schema xmlobjects - used in XmlObjectType to process SchemaFields
# FIXME: where should these actually go? depends on both XmlObject and fields


class XsdType(XmlObject):
    """XmlObject for a type definition within an XSD schema, exposing its
    name, restriction base and enumerated values."""
    ROOT_NAME = 'simpleType'
    name = StringField('@name')
    base = StringField('xs:restriction/@base')
    restricted_values = StringListField('xs:restriction/xs:enumeration/@value')

    def base_type(self):
        """Return the restriction base with any namespace prefix removed."""
        # for now, only supports simple types - eventually, may want logic to
        # traverse extended types to get to base XSD type
        qualified = self.base
        if ':' not in qualified:
            return qualified
        # for now, ignore prefix (could be xsd, xs, etc. - how to know which?)
        _prefix, local_name = qualified.split(':')
        return local_name


class XsdSchema(XmlObject):
    """XmlObject for an XSD schema document; used to look up type
    definitions within the schema."""
    ROOT_NAME = 'schema'
    ROOT_NS = 'http://www.w3.org/2001/XMLSchema'
    ROOT_NAMESPACES = {'xs': ROOT_NS}

    def get_type(self, name=None, xpath=None):
        """Find a single definition in the schema and return it as an
        :class:`XsdType`.

        :param name: value of the ``name`` attribute to search for; only
            used when no xpath is given
        :param xpath: xpath identifying the definition
        :raises: Exception if neither name nor xpath is given, or if the
            xpath does not match exactly one node
        """
        if xpath is None and name is None:
            raise Exception("Must specify either name or xpath")
        if xpath is None:
            # no explicit xpath: match any node with the requested name
            xpath = '//*[@name="%s"]' % name

        matches = self.node.xpath(xpath)
        found = len(matches)
        if found == 0:
            raise Exception("No Schema type definition found for xpath '%s'" % xpath)
        if found > 1:
            raise Exception("Too many schema type definitions found for xpath '%s' (found %d)"
                            % (xpath, found))
        # pass in namespaces
        return XsdType(matches[0], context=self.context)