gwpy/io/gwf.py

Summary

Maintainability
B
4 hrs
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2014-2020)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""I/O utilities for GWF files using the lalframe or frameCPP APIs
"""

import warnings

import numpy

from ..segments import (Segment, SegmentList)
from ..time import (to_gps, LIGOTimeGPS)
from .cache import read_cache
from .utils import file_path

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

# first 4 bytes of any valid GWF file (see LIGO-T970130 §4.3.1)
GWF_SIGNATURE = b'IGWD'


# -- i/o ----------------------------------------------------------------------

def identify_gwf(origin, filepath, fileobj, *args, **kwargs):
    """Identify a filename or file object as GWF

    This function is overloaded in that it will also identify a cache file
    as 'gwf' if the first entry in the cache contains a GWF file extension
    """
    # pylint: disable=unused-argument

    # try and read file descriptor
    if fileobj is not None:
        loc = fileobj.tell()
        fileobj.seek(0)
        try:
            if fileobj.read(4) == GWF_SIGNATURE:
                return True
        finally:
            fileobj.seek(loc)
    if filepath is not None:
        if filepath.endswith('.gwf'):
            return True
        if filepath.endswith(('.lcf', '.cache')):
            try:
                cache = read_cache(filepath)
            except IOError:
                return False
            else:
                if cache[0].path.endswith('.gwf'):
                    return True


def open_gwf(filename, mode='r'):
    """Open a filename for reading or writing GWF format data

    Parameters
    ----------
    filename : `str`
        the path to read from, or write to

    mode : `str`, optional
        either ``'r'`` (read) or ``'w'`` (write)

    Returns
    -------
    `LDAStools.frameCPP.IFrameFStream`
        the input frame stream (if `mode='r'`), or
    `LDAStools.frameCPP.IFrameFStream`
        the output frame stream (if `mode='w'`)
    """
    if mode not in ('r', 'w'):
        raise ValueError("mode must be either 'r' or 'w'")
    from LDAStools import frameCPP
    filename = file_path(filename)
    if mode == 'r':
        return frameCPP.IFrameFStream(str(filename))
    return frameCPP.OFrameFStream(str(filename))


def write_frames(filename, frames, compression='GZIP', compression_level=None):
    """Write a list of frame objects to a file

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    filename : `str`
        path to write into

    frames : `list` of `LDAStools.frameCPP.FrameH`
        list of frames to write into file

    compression : `int`, `str`, optional
        name of compresion algorithm to use, or its endian-appropriate
        ID, choose from

        - ``'RAW'``
        - ``'GZIP'``
        - ``'DIFF_GZIP'``
        - ``'ZERO_SUPPRESS'``
        - ``'ZERO_SUPPRESS_OTHERWISE_GZIP'``

    compression_level : `int`, optional
        compression level for given method, default is ``6`` for GZIP-based
        methods, otherwise ``0``
    """
    from LDAStools import frameCPP
    from ._framecpp import (Compression, DefaultCompressionLevel)

    # handle compression arguments
    if not isinstance(compression, int):
        compression = Compression[compression]
    if compression_level is None:
        compression_level = DefaultCompressionLevel[compression.name]

    # open stream
    stream = open_gwf(filename, 'w')

    # write frames one-by-one
    if isinstance(frames, frameCPP.FrameH):
        frames = [frames]
    for frame in frames:
        stream.WriteFrame(frame, int(compression), int(compression_level))
    # stream auto-closes (apparently)


def create_frame(time=0, duration=None, name='gwpy', run=-1, ifos=None):
    """Create a new :class:`~LDAStools.frameCPP.FrameH`

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    time : `float`, optional
        frame start time in GPS seconds

    duration : `float`, optional
        frame length in seconds

    name : `str`, optional
        name of project or other experiment description

    run : `int`, optional
        run number (number < 0 reserved for simulated data); monotonic for
        experimental runs

    ifos : `list`, optional
        list of interferometer prefices (e.g. ``'L1'``) associated with this
        frame

    Returns
    -------
    frame : :class:`~LDAStools.frameCPP.FrameH`
        the newly created frame header
    """
    from LDAStools import frameCPP
    from ._framecpp import DetectorLocation

    # create frame
    frame = frameCPP.FrameH()

    # add timing
    gps = to_gps(time)
    gps = frameCPP.GPSTime(gps.gpsSeconds, gps.gpsNanoSeconds)
    frame.SetGTime(gps)
    if duration is not None:
        frame.SetDt(float(duration))

    # add FrDetectors
    for prefix in ifos or []:
        frame.AppendFrDetector(
            frameCPP.GetDetector(DetectorLocation[prefix], gps),
        )

    # add descriptions
    frame.SetName(name)
    frame.SetRun(run)

    return frame


def create_fradcdata(series, frame_epoch=0,
                     channelgroup=0, channelid=0, nbits=16):
    """Create a `~frameCPP.FrAdcData` from a `~gwpy.types.Series`

    .. note::

       Currently this method is restricted to 1-dimensional arrays.

    Parameters
    ----------
    series : `~gwpy.types.Series`
        the input data array to store

    frame_epoch : `float`, `int`, optional
        the GPS start epoch of the `Frame` that will contain this
        data structure

    Returns
    -------
    frdata : `~frameCPP.FrAdcData`
        the newly created data structure

    Notes
    -----
    See Table 10 (§4.3.2.4) of LIGO-T970130 for more details
    """
    from LDAStools import frameCPP

    # assert correct type
    if not series.xunit.is_equivalent('s') or series.ndim != 1:
        raise TypeError("only 1-dimensional timeseries data can be "
                        "written as FrAdcData")

    frdata = frameCPP.FrAdcData(
        _series_name(series),
        channelgroup,
        channelid,
        nbits,
        (1 / series.dx.to('s')).value
    )
    frdata.SetTimeOffset(
        float(to_gps(series.x0.value) - to_gps(frame_epoch)),
    )
    return frdata


def _get_series_trange(series):
    if series.xunit.is_equivalent('s'):
        return abs(series.xspan)
    return 0


def _get_series_frange(series):
    if series.xunit.is_equivalent('Hz'):  # FrequencySeries
        return abs(series.xspan)
    elif series.ndim == 2 and series.yunit.is_equivalent('Hz'):  # Spectrogram
        return abs(series.yspan)
    return 0


def create_frprocdata(series, frame_epoch=0, comment=None,
                      type=None, subtype=None, trange=None,
                      fshift=0, phase=0, frange=None, bandwidth=0):
    """Create a `~frameCPP.FrAdcData` from a `~gwpy.types.Series`

    .. note::

       Currently this method is restricted to 1-dimensional arrays.

    Parameters
    ----------
    series : `~gwpy.types.Series`
        the input data array to store

    frame_epoch : `float`, `int`, optional
        the GPS start epoch of the `Frame` that will contain this
        data structure

    comment : `str`, optional
        comment

    type : `int`, `str`, optional
        type of data object

    subtype : `int`, `str`, optional
        subtype for f-Series

    trange : `float`, optional
        duration of sampled data

    fshift : `float`, optional
        frequency in the original data that corresponds to 0 Hz in the
        heterodyned series

    phase : `float`, optional
        phase of the heterodyning signal at start of dataset

    frange : `float`, optional
        frequency range

    bandwidth : `float, optional
        reoslution bandwidth

    Returns
    -------
    frdata : `~frameCPP.FrAdcData`
        the newly created data structure

    Notes
    -----
    See Table 17 (§4.3.2.11) of LIGO-T970130 for more details
    """
    from LDAStools import frameCPP

    # format auxiliary data
    if trange is None:
        trange = _get_series_trange(series)
    if frange is None:
        frange = _get_series_frange(series)

    return frameCPP.FrProcData(
        _series_name(series),
        str(comment or series.name),
        _get_frprocdata_type(series, type),
        _get_frprocdata_subtype(series, subtype),
        float(to_gps(series.x0.value) - to_gps(frame_epoch)),
        trange,
        fshift,
        phase,
        frange,
        bandwidth,
    )


def create_frsimdata(series, frame_epoch=0, comment=None, fshift=0, phase=0):
    """Create a `~frameCPP.FrAdcData` from a `~gwpy.types.Series`

    .. note::

       Currently this method is restricted to 1-dimensional arrays.

    Parameters
    ----------
    series : `~gwpy.types.Series`
        the input data array to store

    frame_epoch : `float`, `int`, optional
        the GPS start epoch of the `Frame` that will contain this
        data structure

    fshift : `float`, optional
        frequency in the original data that corresponds to 0 Hz in the
        heterodyned series

    phase : `float`, optional
        phase of the heterodyning signal at start of dataset

    Returns
    -------
    frdata : `~frameCPP.FrSimData`
        the newly created data structure

    Notes
    -----
    See Table 20 (§4.3.2.14) of LIGO-T970130 for more details
    """
    from LDAStools import frameCPP

    # assert correct type
    if not series.xunit.is_equivalent('s'):
        raise TypeError("only timeseries data can be written as FrSimData")

    return frameCPP.FrSimData(
        _series_name(series),
        str(comment or series.name),
        (1 / series.dx.to('s')).value,
        float(to_gps(series.x0.value) - to_gps(frame_epoch)),
        fshift,
        phase,
    )


def create_frvect(series):
    """Create a `~frameCPP.FrVect` from a `~gwpy.types.Series`

    .. note::

       Currently this method is restricted to 1-dimensional arrays.

    Parameters
    ----------
    series : `~gwpy.types.Series`
        the input data array to store

    Returns
    -------
    frvect : `~frameCPP.FrVect`
        the newly created data vector
    """
    from LDAStools import frameCPP
    from ._framecpp import FrVectType

    # create dimensions
    dims = frameCPP.Dimension(
        series.shape[0],  # num elements
        series.dx.value,  # step size
        str(series.dx.unit),  # unit
        0,  # starting value
    )

    # create FrVect
    vect = frameCPP.FrVect(
        _series_name(series),  # name
        int(FrVectType.find(series.dtype)),  # data type enum
        series.ndim,  # num dimensions
        dims,  # dimension object
        str(series.unit),  # unit
    )

    # populate FrVect
    vect.GetDataArray()[:] = numpy.require(series.value, requirements=['C'])

    return vect


# -- utilities ----------------------------------------------------------------

def num_channels(framefile):
    """Find the total number of channels in this framefile

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    framefile : `str`
        path to GWF-format file on disk

    Returns
    -------
    n : `int`
        the total number of channels found in the table of contents for this
        file
    """
    return len(get_channel_names(framefile))


def get_channel_type(channel, framefile):
    """Find the channel type in a given GWF file

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    channel : `str`, `~gwpy.detector.Channel`
        name of data channel to find

    framefile : `str`
        path of GWF file in which to search

    Returns
    -------
    ctype : `str`
        the type of the channel ('adc', 'sim', or 'proc')

    Raises
    ------
    ValueError
        if the channel is not found in the table-of-contents
    """
    channel = str(channel)
    for name, type_ in _iter_channels(framefile):
        if channel == name:
            return type_
    raise ValueError(
        f"'{channel}' not found in table-of-contents for {framefile}",
    )


def channel_in_frame(channel, framefile):
    """Determine whether a channel is stored in this framefile

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    channel : `str`
        name of channel to find

    framefile : `str`
        path of GWF file to test

    Returns
    -------
    inframe : `bool`
        whether this channel is included in the table of contents for
        the given framefile
    """
    channel = str(channel)
    for name in iter_channel_names(framefile):
        if channel == name:
            return True
    return False


def iter_channel_names(framefile):
    """Iterate over the names of channels found in a GWF file

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    framefile : `str`
        path of GWF file to read

    Returns
    -------
    channels : `generator`
        an iterator that will loop over the names of channels as read from
        the table of contents of the given GWF file
    """
    for name, _ in _iter_channels(framefile):
        yield name


def get_channel_names(framefile):
    """Return a list of all channel names found in a GWF file

    This method just returns

    >>> list(iter_channel_names(framefile))

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    framefile : `str`
        path of GWF file to read

    Returns
    -------
    channels : `list` of `str`
        a `list` of channel names as read from the table of contents of
        the given GWF file
    """
    return list(iter_channel_names(framefile))


def _iter_channels(framefile):
    """Yields the name and type of each channel in a GWF file TOC

    **Requires:** |LDAStools.frameCPP|_

    Parameters
    ----------
    framefile : `str`, `LDAStools.frameCPP.IFrameFStream`
        path of GWF file, or open file stream, to read
    """
    from LDAStools import frameCPP
    if not isinstance(framefile, frameCPP.IFrameFStream):
        framefile = open_gwf(framefile, 'r')
    toc = framefile.GetTOC()
    for typename in ('Sim', 'Proc', 'ADC'):
        typen = typename.lower()
        for name in getattr(toc, f"Get{typename}")():
            yield name, typen


def data_segments(paths, channel, warn=True):
    """Returns the segments containing data for a channel

    **Requires:** |LDAStools.frameCPP|_

    A frame is considered to contain data if a valid FrData structure
    (of any type) exists for the channel in that frame.  No checks
    are directly made against the underlying FrVect structures.

    Parameters
    ----------
    paths : `list` of `str`
        a list of GWF file paths

    channel : `str`
        the name to check in each frame

    warn : `bool`, optional
        emit a `UserWarning` when a channel is not found in a frame

    Returns
    -------
    segments : `~gwpy.segments.SegmentList`
        the list of segments containing data
    """
    segments = SegmentList()
    for path in paths:
        segments.extend(_gwf_channel_segments(path, channel, warn=warn))
    return segments.coalesce()


def _gwf_channel_segments(path, channel, warn=True):
    """Yields the segments containing data for ``channel`` in this GWF path
    """
    stream = open_gwf(path)
    # get segments for frames
    toc = stream.GetTOC()
    secs = toc.GetGTimeS()
    nano = toc.GetGTimeN()
    dur = toc.GetDt()

    readers = [getattr(stream, f"ReadFr{type_.title()}Data") for
               type_ in ("proc", "sim", "adc")]

    # for each segment, try and read the data for this channel
    for i, (s, ns, dt) in enumerate(zip(secs, nano, dur)):
        for read in readers:
            try:
                read(i, channel)
            except (IndexError, ValueError):
                continue
            readers = [read]  # use this one from now on
            epoch = LIGOTimeGPS(s, ns)
            yield Segment(epoch, epoch + dt)
            break
        else:  # none of the readers worked for this channel, warn
            if warn:
                warnings.warn(
                    f"'{channel}' not found in frame {i} of {path}",
                )


def _get_type(type_, enum):
    """Handle a type string, or just return an `int`

    Only to be called in relation to FrProcDataType and FrProcDataSubType
    """
    if isinstance(type_, int):
        return type_
    return enum[str(type_).upper()]


def _get_frprocdata_type(series, type_):
    """Determine the appropriate `FrProcDataType` for this series

    Notes
    -----
    See Table 17 (§4.3.2.11) of LIGO-T970130 for more details
    """
    from ._framecpp import FrProcDataType

    if type_ is not None:  # format user value
        return _get_type(type_, FrProcDataType)

    if series.ndim == 1 and series.xunit.is_equivalent("s"):
        type_ = FrProcDataType.TIME_SERIES
    elif series.ndim == 1 and series.xunit.is_equivalent("Hz"):
        type_ = FrProcDataType.FREQUENCY_SERIES
    elif series.ndim == 1:
        type_ = FrProcDataType.OTHER_1D_SERIES_DATA
    elif (
            series.ndim == 2
            and series.xunit.is_equivalent("s")
            and series.yunit.is_equivalent("Hz")
    ):
        type_ = FrProcDataType.TIME_FREQUENCY
    elif series.ndim > 2:
        type_ = FrProcDataType.MULTI_DIMENSIONAL
    else:
        type_ = FrProcDataType.UNKNOWN

    return type_


def _get_frprocdata_subtype(series, subtype):
    """Determine the appropriate `FrProcDataSubType` for this series

    Notes
    -----
    See Table 17 (§4.3.2.11) of LIGO-T970130 for more details
    """
    from ._framecpp import FrProcDataSubType

    if subtype is not None:  # format user value
        return _get_type(subtype, FrProcDataSubType)

    if series.unit == 'coherence':
        return FrProcDataSubType.COHERENCE
    return FrProcDataSubType.UNKNOWN


def _series_name(series):
    """Returns the 'name' of a `Series` that should be written to GWF

    This is basically `series.name or str(series.channel) or ""`

    Parameters
    ----------
    series : `gwpy.types.Series`
        the input series that will be written

    Returns
    -------
    name : `str`
        the name to use when storing this series
    """
    return (
        series.name
        or str(series.channel or "")
        or None
    )