gwpy/timeseries/io/losc.py

Summary

Maintainability
B
5 hrs
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Louisiana State University (2014-2017)
#               Cardiff University (2017-2022)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""Read and write HDF5 files in the LIGO Open Science Center format

For more details, see :ref:`gwpy-table-io`.
"""

import os.path
import re
from math import ceil
from urllib.parse import urlparse

from astropy.io import registry
from astropy.units import Quantity
from astropy.utils.data import get_readable_fileobj

from gwosc.locate import get_urls

from .. import (StateVector, TimeSeries)
from ...io import (gwf as io_gwf, hdf5 as io_hdf5)
from ...io.cache import (
    file_segment,
    sieve as sieve_cache,
)
from ...io.utils import file_path
from ...detector.units import parse_unit
from ...segments import Segment
from ...time import to_gps
from ...utils.env import bool_env

DQMASK_CHANNEL_REGEX = re.compile(r"\A[A-Z]\d:(GW|L)OSC-.*DQMASK\Z")
STRAIN_CHANNEL_REGEX = re.compile(r"\A[A-Z]\d:(GW|L)OSC-.*STRAIN\Z")

GWOSC_LOCATE_KWARGS = (
    'sample_rate',
    'version',
    'host',
    'format',
    'dataset',
)


# -- utilities ----------------------------------------------------------------

def _download_file(url, cache=None, verbose=False):
    if cache is None:
        cache = bool_env('GWPY_CACHE', False)
    return get_readable_fileobj(url, cache=cache, show_progress=verbose)


def _fetch_gwosc_data_file(url, *args, **kwargs):
    """Fetch a single GWOSC file and return a `Series`.
    """
    cls = kwargs.pop('cls', TimeSeries)
    cache = kwargs.pop('cache', None)
    verbose = kwargs.pop('verbose', False)

    # match file format
    if url.endswith('.gz'):
        ext = os.path.splitext(url[:-3])[-1]
    else:
        ext = os.path.splitext(url)[-1]
    if ext == '.hdf5':
        kwargs.setdefault('format', 'hdf5.gwosc')
    elif ext == '.txt':
        kwargs.setdefault('format', 'ascii.gwosc')
    elif ext == '.gwf':
        kwargs.setdefault('format', 'gwf')

    with _download_file(url, cache, verbose=verbose) as rem:
        # get channel for GWF if not given
        if ext == ".gwf" and (not args or args[0] is None):
            args = (_gwf_channel(rem, cls, kwargs.get("verbose")),)
        if verbose:
            print('Reading data...', end=' ')
        try:
            series = cls.read(rem, *args, **kwargs)
        except Exception as exc:
            if verbose:
                print('')
            exc.args = ("Failed to read GWOSC data from %r: %s"
                        % (url, str(exc)),)
            raise
        else:
            # parse bits from unit in GWF
            if ext == '.gwf' and isinstance(series, StateVector):
                try:
                    bits = {}
                    for bit in str(series.unit).split():
                        a, b = bit.split(':', 1)
                        bits[int(a)] = b
                    series.bits = bits
                    series.override_unit('')
                except (TypeError, ValueError):  # don't care, bad GWOSC
                    pass

            if verbose:
                print('[Done]')
            return series


def _overlapping(files):
    """Quick method to see if a file list contains overlapping files
    """
    segments = set()
    for path in files:
        seg = file_segment(path)
        for s in segments:
            if seg.intersects(s):
                return True
        segments.add(seg)
    return False


# -- remote data access (the main event) --------------------------------------

def fetch_gwosc_data(detector, start, end, cls=TimeSeries, **kwargs):
    """Fetch GWOSC data for a given detector

    This function is for internal purposes only, all users should instead
    use the interface provided by `TimeSeries.fetch_open_data` (and similar
    for `StateVector.fetch_open_data`).
    """
    # format arguments
    start = to_gps(start)
    end = to_gps(end)
    span = Segment(start, end)
    kwargs.update({
        'start': start,
        'end': end,
    })

    # find URLs (requires gwopensci)
    url_kw = {key: kwargs.pop(key) for key in GWOSC_LOCATE_KWARGS if
              key in kwargs}
    if 'sample_rate' in url_kw:  # format as Hertz
        url_kw['sample_rate'] = Quantity(url_kw['sample_rate'], 'Hz').value
    cache = sieve_cache(
        get_urls(detector, int(start), int(ceil(end)), **url_kw),
        segment=span,
    )
    # if event dataset, pick shortest file that covers the request
    # -- this is a bit hacky, and presumes that only an event dataset
    # -- would be produced with overlapping files.
    # -- This should probably be improved to use dataset information
    if len(cache) and _overlapping(cache):
        cache.sort(key=lambda x: abs(file_segment(x)))
        for url in cache:
            a, b = file_segment(url)
            if a <= start and b >= end:
                cache = [url]
                break
    if kwargs.get('verbose', False):  # get_urls() guarantees len(cache) >= 1
        host = urlparse(cache[0]).netloc
        print("Fetched {0} URLs from {1} for [{2} .. {3}))".format(
            len(cache), host, int(start), int(ceil(end))))

    is_gwf = cache[0].endswith('.gwf')
    if is_gwf and len(cache):
        args = (kwargs.pop('channel', None),)
    else:
        args = ()

    # read data
    out = None
    kwargs['cls'] = cls
    for url in cache:
        keep = file_segment(url) & span
        kwargs["start"], kwargs["end"] = keep
        new = _fetch_gwosc_data_file(url, *args, **kwargs)
        if is_gwf and (not args or args[0] is None):
            args = (new.name,)
        if out is None:
            out = new.copy()
        else:
            out.append(new, resize=True)
    return out


# -- I/O ----------------------------------------------------------------------

@io_hdf5.with_read_hdf5
def read_gwosc_hdf5(
    h5f,
    path='strain/Strain',
    start=None,
    end=None,
    copy=False,
):
    """Read a `TimeSeries` from a GWOSC-format HDF file.

    Parameters
    ----------
    h5f : `str`, `h5py.HLObject`
        path of HDF5 file, or open `H5File`

    path : `str`
        name of HDF5 dataset to read.

    Returns
    -------
    data : `~gwpy.timeseries.TimeSeries`
        a new `TimeSeries` containing the data read from disk
    """
    dataset = io_hdf5.find_dataset(h5f, path)
    # read data
    nddata = dataset[()]
    # read metadata
    xunit = parse_unit(dataset.attrs['Xunits'])
    epoch = dataset.attrs['Xstart']
    dt = Quantity(dataset.attrs['Xspacing'], xunit)
    unit = dataset.attrs['Yunits']
    # build and return
    return TimeSeries(nddata, epoch=epoch, sample_rate=(1/dt).to('Hertz'),
                      unit=unit, name=path.rsplit('/', 1)[1],
                      copy=copy).crop(start=start, end=end)


@io_hdf5.with_read_hdf5
def read_gwosc_hdf5_state(
    f,
    path='quality/simple',
    start=None,
    end=None,
    copy=False,
):
    """Read a `StateVector` from a GWOSC-format HDF file.

    Parameters
    ----------
    f : `str`, `h5py.HLObject`
        path of HDF5 file, or open `H5File`

    path : `str`
        path of HDF5 dataset to read.

    start : `Time`, `~gwpy.time.LIGOTimeGPS`, optional
        start GPS time of desired data

    end : `Time`, `~gwpy.time.LIGOTimeGPS`, optional
        end GPS time of desired data

    copy : `bool`, default: `False`
        create a fresh-memory copy of the underlying array

    Returns
    -------
    data : `~gwpy.timeseries.TimeSeries`
        a new `TimeSeries` containing the data read from disk
    """
    # find data
    dataset = io_hdf5.find_dataset(f, '%s/DQmask' % path)
    maskset = io_hdf5.find_dataset(f, '%s/DQDescriptions' % path)
    # read data
    nddata = dataset[()]
    bits = [bytes.decode(bytes(b), 'utf-8') for b in maskset[()]]
    # read metadata
    epoch = dataset.attrs['Xstart']
    try:
        dt = dataset.attrs['Xspacing']
    except KeyError:
        dt = Quantity(1, 's')
    else:
        xunit = parse_unit(dataset.attrs['Xunits'])
        dt = Quantity(dt, xunit)
    return StateVector(nddata, bits=bits, t0=epoch, name='Data quality',
                       dx=dt, copy=copy).crop(start=start, end=end)


def _gwf_channel(path, series_class=TimeSeries, verbose=False):
    """Find the right channel name for a GWOSC GWF file
    """
    channels = list(io_gwf.iter_channel_names(file_path(path)))
    if issubclass(series_class, StateVector):
        regex = DQMASK_CHANNEL_REGEX
    else:
        regex = STRAIN_CHANNEL_REGEX
    found, = list(filter(regex.match, channels))
    if verbose:
        print("Using channel {0!r}".format(found))
    return found


# register
registry.register_reader('hdf5.gwosc', TimeSeries, read_gwosc_hdf5)
registry.register_reader('hdf5.gwosc', StateVector, read_gwosc_hdf5_state)