gwpy/types/io/ligolw.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2017-2020)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""Read/write series in LIGO_LW-XML
"""

from .. import Series
from ...io.ligolw import read_ligolw
from ...time import to_gps

__author__ = "Duncan Macleod <duncan.macleod@ligo.org>"


def series_contenthandler():
    """Build a `~xml.sax.handlers.ContentHandler` to read a LIGO_LW <Array>
    """
    from ligo.lw import (
        ligolw,
        array as ligolw_array,
        param as ligolw_param
    )

    @ligolw_array.use_in
    @ligolw_param.use_in
    class ArrayContentHandler(ligolw.LIGOLWContentHandler):
        """`~xml.sax.handlers.ContentHandler` to read a LIGO_LW ``<Array>``
        """
        pass

    return ArrayContentHandler


def _match_name(elem, name):
    """Returns `True` if the ``elem``'s Name matches ``name``
    """
    try:
        return elem.Name == name
    except AttributeError:  # no name
        return False


def _get_time(time):
    """Returns the Time element of a ``<LIGO_LW>``.
    """
    from ligo.lw.ligolw import Time
    if not isinstance(time, Time):
        t, = time.getElementsByTagName(Time.tagName)
        return _get_time(t)
    return to_gps(time.pcdata)


def _match_time(elem, gps):
    """Returns `True` if the ``elem``'s ``<Time>`` matches ``gps``

    This will return `False` if not exactly one ``<Time>`` element
    is found.
    """
    try:
        return _get_time(elem) == to_gps(gps)
    except (AttributeError, ValueError):  # not exactly one Time
        return False


def _match_array(xmldoc, name=None, epoch=None, **params):
    from ligo.lw.ligolw import Array
    from ligo.lw.param import (Param, get_param)

    def _is_match(arr):
        """Work out whether this `<Array>` element matches the request
        """
        parent = arr.parentNode
        if (
            (name is not None and not _match_name(arr, name))
            or (epoch is not None and not _match_time(parent, epoch))
        ):
            return False
        for key, value in params.items():
            try:
                if get_param(parent, key).pcdata != value:
                    return False
            except ValueError:  # no Param with this Name
                return False
        return True

    def _get_filter_keys(arrays, **given):
        """Returns the set of keyword arguments that can be used to filter

        This is just to format a helpful error message for users to show them
        what params they could use to select the right array.
        """
        # return name and epoch if not given by the user
        keys = set(k for k in ("name", "epoch") if given.pop(k, None) is None)
        # add all of the params found in _any_ array
        return (keys | set(
            p.Name for arr in arrays for
            p in arr.parentNode.getElementsByTagName(Param.tagName)
        )) - set(given.keys())

    # parse out correct element
    matches = list(filter(
        _is_match,
        xmldoc.getElementsByTagName(Array.tagName),
    ))
    try:
        arr, = matches
    except ValueError as exc:
        if not matches:  # no arrays found
            exc.args = ("no <Array> elements found matching request",)
        else:  # multiple arrays found
            keys = _get_filter_keys(matches, name=name, epoch=epoch, **params)
            exc.args = (
                "multiple <Array> elements found matching the request, "
                "please use one of the following keyword arguments to "
                "specify the correct array to read: {}".format(
                    ", ".join(map(repr, sorted(keys))),
                ),
            )
        raise

    return arr


def _update_metadata_from_ligolw(array, kwargs):
    from ligo.lw.ligolw import Time
    from ligo.lw.param import get_param

    parent = array.parentNode

    # pick out the epoch
    try:
        time, = parent.getElementsByTagName(Time.tagName)
    except ValueError:
        pass
    else:
        kwargs.setdefault('epoch', _get_time(time))

    # copy over certain other params, if they exist
    for key in ('channel',):
        try:
            kwargs[key] = get_param(parent, key)
        except ValueError:
            pass

    return kwargs


def read_series(source, name=None, epoch=None, contenthandler=None, **params):
    """Read a `Series` from ``LIGO_LW`` XML

    Parameters
    ----------
    source : `file`, `str`, :class:`~ligo.lw.ligolw.Document`
        file path or open ``LIGO_LW``-format XML file

    name : `str`, optional
        name of the relevant ``<Array>`` element to read

    epoch : `float`, `int`, optional
        GPS time epoch of ``<LIGO_LW>`` element to read

    contenthandler : `~xml.sax.handler.ContentHandler`, optional
        the content handler to use when parsing the document, see
        :func:`series_contenthandler` for the default handler

    **params
        other ``<Param>`` ``(name, value)`` pairs to use in matching
        the parent correct ``<LIGO_LW>`` element to read

    Returns
    -------
    series : `~gwpy.types.Series`
        a series with metadata read from the ``<Array>``
    """
    from ligo.lw.ligolw import Dim

    # read document and find relevant <Array> element
    xmldoc = read_ligolw(
        source,
        contenthandler=contenthandler or series_contenthandler(),
    )
    array = _match_array(xmldoc, name=name, epoch=epoch, **params)

    # parse dimensions
    dims = array.getElementsByTagName(Dim.tagName)
    xdim, ydim = dims
    x0 = xdim.Start
    dx = xdim.Scale
    xunit = xdim.Unit
    if ydim.n > Series._ndim + 1:  # check that we can store these data
        raise ValueError(
            "Cannot parse LIGO_LW Array with {} dimensions in a Series".format(
                ydim.n,
            ),
        )

    # parse metadata
    array_kw = {
        'name': array.Name,
        'unit': array.Unit,
        'xunit': xunit,
    }

    # update metadata from parent <LIGO_LW> element
    array_kw = _update_metadata_from_ligolw(array, array_kw)

    # normalize units (mainly for FrequencySeries)
    if array_kw.get("xunit") == "s^-1":
        array_kw["xunit"] = "Hz"
    if (array_kw.get("unit") or "").startswith("s "):
        array_kw["unit"] = array_kw["unit"].split(" ", 1)[1] + " Hz^-1"

    # build Series
    # note: in order to match the functionality of lal.series,
    #       we discard the actual frequency array, I (DMM) am
    #       not convinced this is the right thing to do, but
    #       it at least provides consistency across multiple
    #       implementations
    return Series(array.array[-1], x0=x0, dx=dx, **array_kw)