gwpy/table/io/ligolw.py

Summary

Maintainability
B
4 hrs
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Louisiana State University (2014-2017)
#               Cardiff University (2017-2021)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""Read LIGO_LW documents into :class:`~ligo.lw.table.Table` objects.
"""

import numpy

try:
    from ligo.lw.lsctables import TableByName as _TableByName
except ImportError:
    LIGOLW_TABLES = set()
else:
    LIGOLW_TABLES = set(_TableByName.values())

from ...io import registry
from ...io.ligolw import (
    is_ligolw,
    patch_ligotimegps,
    read_table as read_ligolw_table,
    to_table_type as to_ligolw_table_type,
    write_tables as write_ligolw_tables,
)
from .. import (Table, EventTable)
from .utils import read_with_selection

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
__all__ = []

# column names that must never be resolved through a ``get_<name>()``
# accessor on the table, see `_get_column`
GET_AS_EXCLUDE = ['column', 'username', 'table', 'time_slide_id']

# map custom object types to numpy-compatible types, this is consulted
# when an 'object' column should be resolved to a real numpy dtype
NUMPY_TYPE_MAP = {}
try:
    from ligo.lw.lsctables import LIGOTimeGPS
except ImportError:  # import failed, leave the map empty
    pass
else:
    # NOTE: the `numpy.float_` alias was removed in numpy 2.0,
    #       `numpy.float64` is the same type and exists on all versions
    NUMPY_TYPE_MAP[LIGOTimeGPS] = numpy.float64


# -- utilities ----------------------------------------------------------------

def _get_property_columns(tabletype, columns):
    """Map GPS property names to the real columns that back them

    Parameters
    ----------
    tabletype : `type`
        the LIGO_LW table class whose ``RowType`` is inspected

    columns : iterable of `str`
        the names to look up, each must be defined directly on
        ``tabletype.RowType``, otherwise a `KeyError` is raised

    Returns
    -------
    extracols : `dict`
        ``{name: (seconds_column, nanoseconds_column)}`` for each name in
        ``columns`` that is a ``gpsproperty``, other names are omitted

    Examples
    --------
    >>> _get_property_columns(lsctables.SnglBurstTable, ['peak'])
    {'peak': ('peak_time', 'peak_time_ns')}
    """
    from ligo.lw.lsctables import gpsproperty as GpsProperty

    # attributes defined on the row class itself (not on instances)
    row_attrs = vars(tabletype.RowType)

    # pair each requested name with its class attribute, then keep only
    # those that are GPS properties
    named_attrs = ((name, row_attrs[name]) for name in columns)
    return {
        name: (attr.s_name, attr.ns_name)
        for name, attr in named_attrs
        if isinstance(attr, GpsProperty)
    }


def _get_property_type(tabletype, column):
    """Return the type of the values held by a property of a table's rows

    Parameters
    ----------
    tabletype : `type`
        the LIGO_LW table class whose ``RowType`` is inspected

    column : `str`
        the name to look up, must be defined directly on
        ``tabletype.RowType``, otherwise a `KeyError` is raised

    Returns
    -------
    type : `type` or `None`
        ``LIGOTimeGPS`` if ``column`` is a ``gpsproperty``,
        otherwise `None`

    Examples
    --------
    >>> _get_property_type(lsctables.SnglBurstTable, 'peak')
    lal.LIGOTimeGPS
    """
    from ligo.lw.lsctables import gpsproperty as GpsProperty

    attr = vars(tabletype.RowType)[column]
    return LIGOTimeGPS if isinstance(attr, GpsProperty) else None


# -- conversions --------------------------------------------------------------

def to_astropy_table(llwtable, apytable, copy=False, columns=None,
                     use_numpy_dtypes=False, rename=None):
    """Convert a :class:`~ligo.lw.table.Table` to an `~astropy.table.Table`

    This method is designed as an internal method to be attached to
    :class:`~ligo.lw.table.Table` objects as `__astropy_table__`.

    Parameters
    ----------
    llwtable : :class:`~ligo.lw.table.Table`
        the LIGO_LW table to convert from

    apytable : `type`
        `astropy.table.Table` class or subclass

    copy : `bool`, optional
        accepted as part of the signature but not consulted by this
        implementation: column data that come back as a `numpy.ndarray`
        are always copied, anything else is passed on without a copy

    columns : `list` of `str`, optional
        the columns to populate, if not given, all columns present in the
        table are mapped

    use_numpy_dtypes : `bool`, optional
        passed to `to_astropy_column` as ``use_numpy_dtype``, to ask for
        object-typed columns to be resolved to a numpy dtype,
        default: `False`

    rename : `dict`, optional
        dict of ('old name', 'new name') pairs to rename columns
        from the original LIGO_LW table

    Returns
    -------
    table : ``apytable``
        a new instance of ``apytable`` holding the requested columns, with
        the LIGO_LW table name stored as ``meta['tablename']``
    """
    # resolve defaults
    renames = {} if rename is None else rename
    names = llwtable.columnnames if columns is None else columns

    def _as_column(colname):
        """Extract one column and wrap it as an ``apytable.Column``"""
        values = _get_column(llwtable, colname)
        return to_astropy_column(
            values,
            apytable.Column,
            copy=isinstance(values, numpy.ndarray),
            use_numpy_dtype=use_numpy_dtypes,
            name=renames.get(colname, colname),
        )

    # build table and return
    return apytable(
        [_as_column(colname) for colname in names],
        copy=False,
        meta={'tablename': str(llwtable.Name)},
    )


def _get_column(llwtable, name):
    """Extract the values for one column, or row property, of a table

    The following sources are tried in order:

    1. for an empty table, an empty `numpy.ndarray` of the type declared
       for ``name`` (if one can be found)
    2. the table's ``get_<name>()`` method, unless ``name`` is listed in
       `GET_AS_EXCLUDE`
    3. an array of the ``name`` attribute of each row
    4. ``llwtable.getColumnByName(name)`` if the rows don't have such
       an attribute
    """
    # handle empty tables as special (since we can't introspect the types)
    if len(llwtable) == 0:
        if name in llwtable.validcolumns:
            empty_type = _get_pytype(llwtable.validcolumns[name])
        else:
            empty_type = _get_property_type(type(llwtable), name)
        if empty_type:
            return numpy.empty((0,), dtype=empty_type)

    # the values are read inside `patch_ligotimegps` for the table's module
    module = type(llwtable).__module__

    # try get_<name>()
    getter = f"get_{name}"
    if name not in GET_AS_EXCLUDE and hasattr(llwtable, getter):
        with patch_ligotimegps(module=module):
            return getattr(llwtable, getter)()

    # try array of property values
    try:
        with patch_ligotimegps(module=module):
            values = [getattr(row, name) for row in llwtable]
            return numpy.asarray(values)
    except AttributeError:
        return llwtable.getColumnByName(name)


def to_astropy_column(llwcol, cls, copy=False, dtype=None,
                      use_numpy_dtype=False, **kwargs):
    """Convert a :class:`~ligo.lw.table.Column` to `astropy.table.Column`

    Parameters
    ----------
    llwcol : :class:`~ligo.lw.table.Column`, `numpy.ndarray`, iterable
        the LIGO_LW column to convert, or an iterable

    cls : `~astropy.table.Column`
        the Astropy `~astropy.table.Column` or subclass to convert to

    copy : `bool`, optional
        if `True` copy the input data, otherwise return a reference,
        default: `False`

    dtype : `type`, optional
        the data type to convert to when creating the `~astropy.table.Column`,
        if not given it is worked out from ``llwcol``

    use_numpy_dtype : `bool`, optional
        convert object type to numpy dtype, default: `False`, only used
        with ``dtype=None``

    **kwargs
        other keyword arguments are passed to the `~astropy.table.Column`
        creator

    Returns
    -------
    column : `~astropy.table.Column`
        an Astropy version of the given LIGO_LW column

    Raises
    ------
    TypeError
        if ``use_numpy_dtype=True`` is given and the column holds an object
        type that has no entry in `NUMPY_TYPE_MAP`
    """
    # an explicit dtype is used as given
    if dtype is not None:
        return cls(data=llwcol, copy=copy, dtype=dtype, **kwargs)

    # otherwise try and find the dtype from the data
    found = _get_column_dtype(llwcol)
    is_object = use_numpy_dtype and numpy.dtype(found).type is numpy.object_
    if is_object:
        # dtype maps to 'object' in numpy, try and resolve real numpy type
        try:
            found = NUMPY_TYPE_MAP[found]
        except KeyError:
            raise TypeError(
                f"no mapping from object type '{found}' to numpy type",
            )
    return cls(data=llwcol, copy=copy, dtype=found, **kwargs)


def _get_column_dtype(llwcol):
    """Get the data type of a LIGO_LW `Column`

    Parameters
    ----------
    llwcol : :class:`~ligo.lw.table.Column`, `numpy.ndarray`, iterable
        a LIGO_LW column, a numpy array, or an indexable sequence

    Returns
    -------
    dtype : `numpy.dtype`, `type`, or `None`
        the first of the following that applies:

        - the ``dtype`` attribute of ``llwcol``, if it has one that is
          not a numpy object dtype
        - the type returned by `_get_pytype` for the column, if ``llwcol``
          has a ``parentNode`` that declares it in ``validcolumns``
        - the type of the first element of ``llwcol``
        - `None`, if ``llwcol`` has no first element (e.g. empty list)
    """
    # maybe it's a numpy array already!
    try:
        dtype = llwcol.dtype
    except AttributeError:
        pass
    else:
        # object dtypes tell us nothing about the values, so fall through
        # and introspect; compare by kind (not identity) so that equivalent
        # object dtypes, e.g. those carrying metadata, are recognised too
        if getattr(dtype, "kind", None) != "O":
            return dtype

    # a ligo.lw.table.Column attached to a table has a declared type
    try:
        llwtype = llwcol.parentNode.validcolumns[llwcol.Name]
    except AttributeError:  # not a column
        pass
    else:  # map column type str to python type
        return _get_pytype(llwtype)

    # otherwise use the type of the first element
    try:
        return type(llwcol[0])
    except IndexError:  # nothing to introspect
        return None


def _get_pytype(llwtype):
    """Return a dtype-compatible type for the given LIGO_LW type string

    The ``ligo.lw.types.ToNumPyType`` mapping is preferred, with
    ``ligo.lw.types.ToPyType`` used for anything it doesn't know about
    (a `KeyError` is raised if neither mapping has an entry).
    """
    from ligo.lw import types as llwtypes

    try:
        pytype = llwtypes.ToNumPyType[llwtype]
    except KeyError:
        pytype = llwtypes.ToPyType[llwtype]
    return pytype


def table_to_ligolw(table, tablename):
    """Convert a `astropy.table.Table` to a :class:`ligo.lw.table.Table`

    Parameters
    ----------
    table : `astropy.table.Table`
        the table to convert

    tablename : `str`
        the key of the target table class in
        ``ligo.lw.lsctables.TableByName``

    Returns
    -------
    llwtable : :class:`ligo.lw.table.Table`
        a new LIGO_LW table with one row for each row of ``table``
    """
    from ligo.lw import lsctables

    # -- work out columns for LIGO_LW table
    # this is overly complicated because of the way that we magically
    # map properties of ligo.lw.table.Table objects to real columns in
    # astropy.table.Table objects, but also because ligo.lw internally
    # combines table names and column names for some columns
    # (e.g. process:process_id)
    names = table.columns.keys()
    tableclass = lsctables.TableByName[tablename]
    blank = lsctables.New(tableclass)
    try:
        realnames = dict(zip(blank.columnnames, blank.columnnamesreal))
    except AttributeError:  # glue doesn't have these attributes
        realnames = {}
    llwcolumns = [realnames.get(name, name) for name in names]

    # swap each GPS property for the real columns that back it,
    # keeping its position in the list
    for prop, realcols in _get_property_columns(tableclass, names).items():
        pos = llwcolumns.index(prop)
        llwcolumns[pos:pos + 1] = realcols

    # create new LIGO_LW table
    llwtable = lsctables.New(tableclass, columns=llwcolumns)

    # map rows across
    for row in table:
        llwrow = llwtable.RowType()
        for name in names:
            value = to_ligolw_table_type(row[name], llwtable, name)
            setattr(llwrow, name, value)
        llwtable.append(llwrow)

    return llwtable


# -- read ---------------------------------------------------------------------

@read_with_selection
def read_table(source, tablename=None, **kwargs):
    """Read a `Table` from one or more LIGO_LW XML documents

    Parameters
    ----------
    source : `file`, `str`, :class:`~ligo.lw.ligolw.Document`, `list`
        one or more open files, file paths, or LIGO_LW `Document` objects

    tablename : `str`, optional
        the `Name` of the relevant `Table` to read, if not given a table will
        be returned if only one exists in the document(s)

    columns : `list` of `str`, optional
        the columns to include in the output table, `None` (default)
        is the same as not giving this keyword

    ligolw_columns : `list` of `str`, optional
        the columns to read from the LIGO_LW table in order to provide
        ``columns``, defaults to ``columns``

    rename : `dict`, optional
        passed to the conversion function, default: ``{}``

    use_numpy_dtypes : `bool`, optional
        passed to the conversion function, default: `False`

    **kwargs
        all other keyword arguments are passed to the read function

    See also
    --------
    gwpy.io.ligolw.read_table
        for details of keyword arguments for the read operation
    gwpy.table.io.ligolw.to_astropy_table
        for details of keyword arguments for the conversion operation
    """
    from ligo.lw import table as ligolw_table
    from ligo.lw.lsctables import TableByName

    # -- keyword handling -----------------------

    # separate keywords for reading and converting from LIGO_LW to Astropy
    read_kw = kwargs  # rename for readability
    convert_kw = {
        'rename': kwargs.pop('rename', None),
        'use_numpy_dtypes': kwargs.pop('use_numpy_dtypes', False),
    }
    if convert_kw['rename'] is None:
        convert_kw['rename'] = {}

    # allow user to specify LIGO_LW columns to read to provide the
    # desired output columns; an explicit `None` means 'not given'
    # (and must not be passed to `list()`)
    columns = kwargs.pop('columns', None)
    if columns is not None:
        columns = list(columns)
    ligolw_columns = kwargs.pop('ligolw_columns', None)
    if ligolw_columns is None:
        read_kw['columns'] = columns
    else:
        read_kw['columns'] = list(ligolw_columns)
    convert_kw['columns'] = columns or read_kw['columns']

    if tablename:
        tableclass = TableByName[ligolw_table.Table.TableName(tablename)]
        # work out if fancy property columns are required
        #     means 'peak_time' and 'peak_time_ns' will get read if 'peak'
        #     is requested
        if convert_kw['columns'] is not None:
            readcols = set(read_kw['columns'])
            propcols = _get_property_columns(tableclass, convert_kw['columns'])
            for col, realcols in propcols.items():
                # only swap properties that were actually going to be read
                if col in readcols:
                    readcols.remove(col)
                    readcols.update(realcols)
            read_kw['columns'] = list(readcols)

    # -- read -----------------------------------

    return Table(read_ligolw_table(source, tablename=tablename, **read_kw),
                 **convert_kw)


# -- write --------------------------------------------------------------------

def write_table(table, target, tablename=None, **kwargs):
    """Write a `~astropy.table.Table` to file in LIGO_LW XML format

    Parameters
    ----------
    table : `~astropy.table.Table`
        the table to write

    target
        the output target, passed on to the LIGO_LW table writer

    tablename : `str`, optional
        the name of the LIGO_LW table to create, if not given
        ``table.meta['tablename']`` is used

    **kwargs
        other keyword arguments are passed to the LIGO_LW table writer

    Raises
    ------
    ValueError
        if ``tablename`` is not given, and is not in the table metadata
    """
    # try and get tablename from metadata if it wasn't given
    name = tablename
    if name is None:
        name = table.meta.get('tablename', None)
    if name is None:  # panic
        raise ValueError("please pass ``tablename=`` to specify the target "
                         "LIGO_LW Table Name")
    converted = table_to_ligolw(table, name)
    return write_ligolw_tables(target, [converted], **kwargs)


# -- register -----------------------------------------------------------------

# attach `to_astropy_table` as the `__astropy_table__` attribute of every
# class in LIGOLW_TABLES (the set is empty if ligo.lw couldn't be imported)
for table_ in LIGOLW_TABLES:
    # register conversion from LIGO_LW to astropy Table
    table_.__astropy_table__ = to_astropy_table

# register the reader, writer, and identifier for the 'ligolw' format
# with the I/O registry, for both `Table` and `EventTable`
for table_class in (Table, EventTable):
    registry.register_reader('ligolw', table_class, read_table)
    registry.register_writer('ligolw', table_class, write_table)
    registry.register_identifier('ligolw', table_class, is_ligolw)