gwpy/table/io/pycbc.py

Summary

Maintainability
A
2 hrs
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2014-2020)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""Read events from the PyCBC live online GW search
"""

import re
from os.path import basename

import numpy

import h5py

from ...io.hdf5 import (identify_hdf5, with_read_hdf5)
from ...io.registry import (register_reader, register_identifier)
from .. import (Table, EventTable)
from ..filter import (filter_table, parse_column_filters)

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'
__credits__ = 'Alex Nitz <alex.nitz@ligo.org>'

PYCBC_LIVE_FORMAT = 'hdf5.pycbc_live'

META_COLUMNS = {'psd', 'loudest'}
PYCBC_FILENAME = re.compile('([A-Z][0-9])+-Live-[0-9.]+-[0-9.]+.(h5|hdf|hdf5)')


@with_read_hdf5
def table_from_file(source, ifo=None, columns=None, selection=None,
                    loudest=False, extended_metadata=True):
    """Read a `Table` from a PyCBC live HDF5 file

    Parameters
    ----------
    source : `str`, `h5py.File`, `h5py.Group`
        the file path of open `h5py` object from which to read the data

    ifo : `str`, optional
        the interferometer prefix (e.g. ``'G1'``) to read; this is required
        if reading from a file path or `h5py.File` and the containing file
        stores data for multiple interferometers

    columns : `list` or `str`, optional
        the list of column names to read, defaults to all in group

    loudest : `bool`, optional
        read only those events marked as 'loudest',
        default: `False` (read all)

    extended_metadata : `bool`, optional
        record non-column datasets found in the H5 group (e.g. ``'psd'``)
        in the ``meta`` dict, default: `True`

    Returns
    -------
    table : `~gwpy.table.EventTable`
    """
    # find group
    if isinstance(source, h5py.File):
        source, ifo = _find_table_group(source, ifo=ifo)

    # -- by this point 'source' is guaranteed to be an h5py.Group

    # parse default columns
    if columns is None:
        columns = list(_get_columns(source))
    readcols = set(columns)

    # parse selections
    selection = parse_column_filters(selection or [])
    if selection:
        readcols.update(list(zip(*selection))[0])

    # set up meta dict
    meta = {'ifo': ifo}
    meta.update(source.attrs)
    if extended_metadata:
        meta.update(_get_extended_metadata(source))

    if loudest:
        loudidx = source['loudest'][:]

    # map data to columns
    data = []
    for name in readcols:
        # convert hdf5 dataset into Column
        try:
            arr = source[name][:]
        except KeyError:
            if name in GET_COLUMN:
                arr = GET_COLUMN[name](source)
            else:
                raise
        if loudest:
            arr = arr[loudidx]
        data.append(Table.Column(arr, name=name))

    # read, applying selection filters, and column filters
    return filter_table(Table(data, meta=meta), selection)[columns]


def _find_table_group(h5file, ifo=None):
    """Find the right `h5py.Group` within the given `h5py.File`
    """
    exclude = ('background',)
    if ifo is None:
        try:
            ifo, = [key for key in h5file if key not in exclude]
        except ValueError as exc:
            exc.args = ("PyCBC live HDF5 file contains dataset groups "
                        "for multiple interferometers, please specify "
                        "the prefix of the relevant interferometer via "
                        "the `ifo` keyword argument, e.g: `ifo=G1`",)
            raise
    try:
        return h5file[ifo], ifo
    except KeyError as exc:
        exc.args = ("No group for ifo %r in PyCBC live HDF5 file" % ifo,)
        raise


def _get_columns(h5group):
    """Find valid column names from a PyCBC HDF5 Group

    Returns a `set` of names.
    """
    columns = set()
    for name in sorted(h5group):
        if (
            not isinstance(h5group[name], h5py.Dataset)
            or name == 'template_boundaries'
        ):
            continue
        if name.endswith('_template') and name[:-9] in columns:
            continue
        columns.add(name)
    return columns - META_COLUMNS


def _get_extended_metadata(h5group):
    """Extract the extended metadata for a PyCBC table in HDF5

    This method packs non-table-column datasets in the given h5group into
    a metadata `dict`

    Returns
    -------
    meta : `dict`
       the metadata dict
    """
    meta = dict()

    # get PSD
    try:
        psd = h5group['psd']
    except KeyError:
        pass
    else:
        from gwpy.frequencyseries import FrequencySeries
        meta['psd'] = FrequencySeries(
            psd[:], f0=0, df=psd.attrs['delta_f'], name='pycbc_live')

    # get everything else
    for key in META_COLUMNS - {'psd'}:
        try:
            value = h5group[key][:]
        except KeyError:
            pass
        else:
            meta[key] = value

    return meta


def filter_empty_files(files, ifo=None):
    """Remove empty PyCBC-HDF5 files from a list

    Parameters
    ----------
    files : `list`
        A list of file paths to test.

    ifo : `str`, optional
        prefix for the interferometer of interest (e.g. ``'L1'``),
        include this for a more robust test of 'emptiness'

    Returns
    -------
    nonempty : `list`
        the subset of the input ``files`` that are considered not empty

    See also
    --------
    empty_hdf5_file
        for details of the 'emptiness' test
    """
    return type(files)([f for f in files if not empty_hdf5_file(f, ifo=ifo)])


@with_read_hdf5
def empty_hdf5_file(h5f, ifo=None):
    """Determine whether PyCBC-HDF5 file is empty

    A file is considered empty if it contains no groups at the base level,
    or if the ``ifo`` group contains only the ``psd`` dataset.

    Parameters
    ----------
    h5f : `str`
        path of the pycbc_live file to test

    ifo : `str`, optional
        prefix for the interferometer of interest (e.g. ``'L1'``),
        include this for a more robust test of 'emptiness'

    Returns
    -------
    empty : `bool`
        `True` if the file looks to have no content, otherwise `False`
    """
    # the decorator opens the HDF5 file for us, so h5f is guaranteed to
    # be an h5py.Group object
    h5f = h5f.file
    # if the root group is empty, the file is empty
    if list(h5f) == []:
        return True

    # for each group (or the IFO group given by the user),
    # check whether there is any useful content
    groups = h5f.keys() if ifo is None else [ifo]
    for group in groups:
        if not set(h5f.get(group, [])).issubset({'gates', 'psd'}):
            return False
    return True


def identify_pycbc_live(origin, filepath, fileobj, *args, **kwargs):
    """Identify a PyCBC Live file as an HDF5 with the correct name.
    """
    return bool(
        identify_hdf5(origin, filepath, fileobj, *args, **kwargs)
        and (
            filepath is not None
            and PYCBC_FILENAME.match(basename(filepath))
        )
    )


# register for unified I/O (with higher priority than HDF5 reader)
register_identifier(PYCBC_LIVE_FORMAT, EventTable, identify_pycbc_live)
register_reader(PYCBC_LIVE_FORMAT, EventTable, table_from_file, priority=1)

# -- processed columns --------------------------------------------------------
#
# Here we define methods required to build commonly desired columns that
# are just a combination of the basic columns.
#
# Each method should take in an `~h5py.Group` and return a `numpy.ndarray`

GET_COLUMN = {}


def _new_snr_scale(redx2, q=6., n=2.):  # pylint: disable=invalid-name
    return (.5 * (1. + redx2 ** (q/n))) ** (-1./q)


def get_new_snr(h5group, q=6., n=2.):  # pylint:  disable=invalid-name
    """Calculate the 'new SNR' column for this PyCBC HDF5 table group
    """
    newsnr = h5group['snr'][:].copy()
    rchisq = h5group['chisq'][:]
    idx = numpy.where(rchisq > 1.)[0]
    newsnr[idx] *= _new_snr_scale(rchisq[idx], q=q, n=n)
    return newsnr


GET_COLUMN['new_snr'] = get_new_snr


def get_mchirp(h5group):
    """Calculate the chipr mass column for this PyCBC HDF5 table group
    """
    mass1 = h5group['mass1'][:]
    mass2 = h5group['mass2'][:]
    return (mass1 * mass2) ** (3/5.) / (mass1 + mass2) ** (1/5.)


GET_COLUMN['mchirp'] = get_mchirp