gwpy/timeseries/core.py

Summary

Maintainability
F
3 days
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Louisiana State University (2014-2017)
#               Cardiff University (2017-2021)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""
The TimeSeriesBase
==================

This module defines the following classes

--------------------  ---------------------------------------------------------
`TimeSeriesBase`      base of the `TimeSeries` and `StateVector` classes,
                      provides the constructor and all common methods
                      (mainly everything that isn't signal-processing related)
`TimeSeriesBaseDict`  base of the `TimeSeriesDict`, this exists mainly so that
                      the `TimeSeriesDict` and `StateVectorDict` can be
                      distinct objects
`TimeSeriesBaseList`  base of the `TimeSeriesList` and `StateVectorList`,
                      same reason for living as the `TimeSeriesBaseDict`
--------------------  ---------------------------------------------------------

**None of these objects are really designed to be used other than as bases for
user-facing objects.**
"""

import sys
import warnings
from collections import OrderedDict
from math import ceil

import numpy

from astropy import units
from astropy.io import registry as io_registry

from gwosc.api import DEFAULT_URL as GWOSC_DEFAULT_HOST

from ..types import Series
from ..detector import (Channel, ChannelList)
from ..segments import SegmentList
from ..time import (Time, LIGOTimeGPS, gps_types, to_gps)
from ..utils import gprint

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

__all__ = ['TimeSeriesBase', 'TimeSeriesBaseDict', 'TimeSeriesBaseList']


_UFUNC_STRING = {
    'less': '<',
    'less_equal': '<=',
    'equal': '==',
    'greater_equal': '>=',
    'greater': '>',
}


# -- utilities ----------------------------------------------------------------

def _format_time(gps):
    if isinstance(gps, gps_types):
        return float(gps)
    if isinstance(gps, Time):
        return gps.gps
    return gps


def _dynamic_scaled(scaled, channel):
    """Determine default for scaled based on channel name

    This is mainly to work around LIGO not correctly recording ADC
    scaling parameters for most of Advanced LIGO (through 2023).
    Scaling parameters for H0 and L0 data are also not correct
    starting in mid-2020.

    Parameters
    ----------
    scaled : `bool`, `None`
        the scaled argument as given by the user

    channel : `str`
        the name of the channel to be read

    Returns
    -------
    scaled : `bool`
        `False` if channel is from LIGO, otherwise `True`

    Examples
    --------
    >>> _dynamic_scaled(None, "H1:channel")
    False
    >>> _dynamic_scaled(None, "V1:channel")
    True
    """
    if scaled is not None:
        return scaled
    return not str(channel).startswith(("H0", "L0", "H1", "L1"))


# -- TimeSeriesBase------------------------------------------------------------

class TimeSeriesBase(Series):
    """An `Array` with time-domain metadata.

    Parameters
    ----------
    value : array-like
        input data array

    unit : `~astropy.units.Unit`, optional
        physical unit of these data

    t0 : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
        GPS epoch associated with these data,
        any input parsable by `~gwpy.time.to_gps` is fine

    dt : `float`, `~astropy.units.Quantity`, optional, default: `1`
        time between successive samples (seconds), can also be given inversely
        via `sample_rate`

    sample_rate : `float`, `~astropy.units.Quantity`, optional, default: `1`
        the rate of samples per second (Hertz), can also be given inversely
        via `dt`

    times : `array-like`
        the complete array of GPS times accompanying the data for this series.
        This argument takes precedence over `t0` and `dt` so should be given
        in place of these if relevant, not alongside

    name : `str`, optional
        descriptive title for this array

    channel : `~gwpy.detector.Channel`, `str`, optional
        source data stream for these data

    dtype : `~numpy.dtype`, optional
        input data type

    copy : `bool`, optional, default: `False`
        choose to copy the input data to new memory

    subok : `bool`, optional, default: `True`
        allow passing of sub-classes by the array generator
    """
    _default_xunit = units.second
    _print_slots = ('t0', 'dt', 'name', 'channel')
    DictClass = None

    def __new__(cls, data, unit=None, t0=None, dt=None, sample_rate=None,
                times=None, channel=None, name=None, **kwargs):
        """Generate a new `TimeSeriesBase`.
        """
        # parse t0 or epoch
        epoch = kwargs.pop('epoch', None)
        if epoch is not None and t0 is not None:
            raise ValueError("give only one of epoch or t0")
        if epoch is None and t0 is not None:
            kwargs['x0'] = _format_time(t0)
        elif epoch is not None:
            kwargs['x0'] = _format_time(epoch)
        # parse sample_rate or dt
        if sample_rate is not None and dt is not None:
            raise ValueError("give only one of sample_rate or dt")
        if sample_rate is None and dt is not None:
            kwargs['dx'] = dt
        # parse times
        if times is not None:
            kwargs['xindex'] = times

        # generate TimeSeries
        new = super().__new__(cls, data, name=name, unit=unit,
                              channel=channel, **kwargs)

        # manually set sample_rate if given
        if sample_rate is not None:
            new.sample_rate = sample_rate

        return new

    # -- TimeSeries properties ------------------

    # rename properties from the Series
    t0 = Series.x0
    dt = Series.dx
    span = Series.xspan
    times = Series.xindex

    # -- epoch
    # this gets redefined to attach to the t0 property
    @property
    def epoch(self):
        """GPS epoch for these data.

        This attribute is stored internally by the `t0` attribute

        :type: `~astropy.time.Time`
        """
        try:
            return Time(self.t0, format='gps', scale='utc')
        except AttributeError:
            return None

    @epoch.setter
    def epoch(self, epoch):
        if epoch is None:
            del self.t0
        elif isinstance(epoch, Time):
            self.t0 = epoch.gps
        else:
            try:
                self.t0 = to_gps(epoch)
            except TypeError:
                self.t0 = epoch

    # -- sample_rate
    @property
    def sample_rate(self):
        """Data rate for this `TimeSeries` in samples per second (Hertz).

        This attribute is stored internally by the `dx` attribute

        :type: `~astropy.units.Quantity` scalar
        """
        return (1 / self.dt).to('Hertz')

    @sample_rate.setter
    def sample_rate(self, val):
        if val is None:
            del self.dt
            return
        self.dt = (1 / units.Quantity(val, units.Hertz)).to(self.xunit)

    # -- duration
    @property
    def duration(self):
        """Duration of this series in seconds

        :type: `~astropy.units.Quantity` scalar
        """
        return units.Quantity(self.span[1] - self.span[0], self.xunit,
                              dtype=float)

    # -- TimeSeries accessors -------------------

    @classmethod
    def read(cls, source, *args, **kwargs):
        """Read data into a `TimeSeries`

        Arguments and keywords depend on the output format, see the
        online documentation for full details for each format, the parameters
        below are common to most formats.

        Parameters
        ----------
        source : `str`, `list`
            Source of data, any of the following:

            - `str` path of single data file,
            - `str` path of LAL-format cache file,
            - `list` of paths.

        name : `str`, `~gwpy.detector.Channel`
            the name of the channel to read, or a `Channel` object.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS start time of required data, defaults to start of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS end time of required data, defaults to end of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        format : `str`, optional
            source format identifier. If not given, the format will be
            detected if possible. See below for list of acceptable
            formats.

        nproc : `int`, optional
            number of parallel processes to use, serial process by
            default.

        pad : `float`, optional
            value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        Raises
        ------
        IndexError
            if ``source`` is an empty list

        Notes
        -----"""
        from .io.core import read as timeseries_reader
        return timeseries_reader(cls, source, *args, **kwargs)

    def write(self, target, *args, **kwargs):
        """Write this `TimeSeries` to a file

        Parameters
        ----------
        target : `str`
            path of output file

        format : `str`, optional
            output format identifier. If not given, the format will be
            detected if possible. See below for list of acceptable
            formats.

        Notes
        -----"""
        return io_registry.write(self, target, *args, **kwargs)

    @classmethod
    def fetch(cls, channel, start, end, host=None, port=None, verbose=False,
              connection=None, verify=False, pad=None, allow_tape=None,
              scaled=None, type=None, dtype=None):
        """Fetch data from NDS

        Parameters
        ----------
        channel : `str`, `~gwpy.detector.Channel`
            the data channel for which to query

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS end time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        host : `str`, optional
            URL of NDS server to use, if blank will try any server
            (in a relatively sensible order) to get the data

        port : `int`, optional
            port number for NDS server query, must be given with `host`

        verify : `bool`, optional, default: `False`
            check channels exist in database before asking for data

        scaled : `bool`, optional
            apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect

        connection : `nds2.connection`, optional
            open NDS connection to use

        verbose : `bool`, optional
            print verbose output about NDS progress, useful for debugging;
            if ``verbose`` is specified as a string, this defines the
            prefix for the progress meter

        type : `int`, optional
            NDS2 channel type integer or string name to match

        dtype : `type`, `numpy.dtype`, `str`, optional
            NDS2 data type to match
        """
        return cls.DictClass.fetch(
            [channel], start, end, host=host, port=port, verbose=verbose,
            connection=connection, verify=verify, pad=pad, scaled=scaled,
            allow_tape=allow_tape, type=type, dtype=dtype)[str(channel)]

    @classmethod
    def fetch_open_data(cls, ifo, start, end, sample_rate=4096,
                        version=None, format='hdf5',
                        host=GWOSC_DEFAULT_HOST, verbose=False,
                        cache=None, **kwargs):
        """Fetch open-access data from the LIGO Open Science Center

        Parameters
        ----------
        ifo : `str`
            the two-character prefix of the IFO in which you are interested,
            e.g. `'L1'`

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS start time of required data, defaults to start of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS end time of required data, defaults to end of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        sample_rate : `float`, optional,
            the sample rate of desired data; most data are stored
            by GWOSC at 4096 Hz, however there may be event-related
            data releases with a 16384 Hz rate, default: `4096`

        version : `int`, optional
            version of files to download, defaults to highest discovered
            version

        format : `str`, optional
            the data format to download and parse, default: ``'h5py'``

            - ``'hdf5'``
            - ``'gwf'`` - requires |LDAStools.frameCPP|_

        host : `str`, optional
            HTTP host name of GWOSC server to access

        verbose : `bool`, optional, default: `False`
            print verbose output while fetching data

        cache : `bool`, optional
            save/read a local copy of the remote URL, default: `False`;
            useful if the same remote data are to be accessed multiple times.
            Set `GWPY_CACHE=1` in the environment to auto-cache.

        **kwargs
            any other keyword arguments are passed to the `TimeSeries.read`
            method that parses the file that was downloaded

        Examples
        --------
        >>> from gwpy.timeseries import (TimeSeries, StateVector)
        >>> print(TimeSeries.fetch_open_data('H1', 1126259446, 1126259478))
        TimeSeries([  2.17704028e-19,  2.08763900e-19,  2.39681183e-19,
                    ...,   3.55365541e-20,  6.33533516e-20,
                      7.58121195e-20]
                   unit: Unit(dimensionless),
                   t0: 1126259446.0 s,
                   dt: 0.000244140625 s,
                   name: Strain,
                   channel: None)
        >>> print(StateVector.fetch_open_data('H1', 1126259446, 1126259478))
        StateVector([127,127,127,127,127,127,127,127,127,127,127,127,
                     127,127,127,127,127,127,127,127,127,127,127,127,
                     127,127,127,127,127,127,127,127]
                    unit: Unit(dimensionless),
                    t0: 1126259446.0 s,
                    dt: 1.0 s,
                    name: Data quality,
                    channel: None,
                    bits: Bits(0: data present
                               1: passes cbc CAT1 test
                               2: passes cbc CAT2 test
                               3: passes cbc CAT3 test
                               4: passes burst CAT1 test
                               5: passes burst CAT2 test
                               6: passes burst CAT3 test,
                               channel=None,
                               epoch=1126259446.0))

        For the `StateVector`, the naming of the bits will be
        ``format``-dependent, because they are recorded differently by GWOSC
        in different formats.

        Notes
        -----
        `StateVector` data are not available in ``txt.gz`` format.
        """
        from .io.losc import fetch_gwosc_data
        return fetch_gwosc_data(
            ifo,
            start,
            end,
            sample_rate=sample_rate,
            version=version,
            format=format,
            verbose=verbose,
            cache=cache,
            host=host,
            cls=cls,
            **kwargs,
        )

    @classmethod
    def find(cls, channel, start, end, frametype=None, pad=None,
             scaled=None, nproc=1, verbose=False, **readargs):
        """Find and read data from frames for a channel

        Parameters
        ----------
        channel : `str`, `~gwpy.detector.Channel`
            the name of the channel to read, or a `Channel` object.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS end time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        frametype : `str`, optional
            name of frametype in which this channel is stored, will search
            for containing frame types if necessary

        nproc : `int`, optional, default: `1`
            number of parallel processes to use, serial process by
            default.

        pad : `float`, optional
            value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        allow_tape : `bool`, optional, default: `True`
            allow reading from frame files on (slow) magnetic tape

        verbose : `bool`, optional
            print verbose output about read progress, if ``verbose``
            is specified as a string, this defines the prefix for the
            progress meter

        **readargs
            any other keyword arguments to be passed to `.read()`
        """
        return cls.DictClass.find(
            [channel], start, end,
            frametype=frametype,
            verbose=verbose,
            pad=pad,
            scaled=scaled,
            nproc=nproc,
            **readargs
        )[str(channel)]

    @classmethod
    def get(cls, channel, start, end, pad=None, scaled=None,
            dtype=None, verbose=False, allow_tape=None, **kwargs):
        """Get data for this channel from frames or NDS

        This method dynamically accesses either frames on disk, or a
        remote NDS2 server to find and return data for the given interval

        Parameters
        ----------
        channel : `str`, `~gwpy.detector.Channel`
            the name of the channel to read, or a `Channel` object.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS end time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        pad : `float`, optional
            value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        scaled : `bool`, optional
            apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect

        nproc : `int`, optional, default: `1`
            number of parallel processes to use, serial process by
            default.

        allow_tape : `bool`, optional, default: `None`
            allow the use of frames that are held on tape, default is `None`
            to attempt to allow the `TimeSeries.fetch` method to
            intelligently select a server that doesn't use tapes for
            data storage (doesn't always work), but to eventually allow
            retrieving data from tape if required

        verbose : `bool`, optional
            print verbose output about data access progress, if ``verbose``
            is specified as a string, this defines the prefix for the
            progress meter

        **kwargs
            other keyword arguments to pass to either
            :meth:`.find` (for direct GWF file access) or
            :meth:`.fetch` for remote NDS2 access

        See also
        --------
        TimeSeries.fetch
            for grabbing data from a remote NDS2 server
        TimeSeries.find
            for discovering and reading data from local GWF files
        """
        return cls.DictClass.get(
            [channel], start, end, pad=pad, scaled=scaled, dtype=dtype,
            verbose=verbose, allow_tape=allow_tape, **kwargs)[str(channel)]

    # -- utilities ------------------------------

    def plot(self, method='plot', figsize=(12, 4), xscale='auto-gps',
             **kwargs):
        """Plot the data for this timeseries

        Returns
        -------
        figure : `~matplotlib.figure.Figure`
            the newly created figure, with populated Axes.

        See also
        --------
        matplotlib.pyplot.figure
            for documentation of keyword arguments used to create the
            figure
        matplotlib.figure.Figure.add_subplot
            for documentation of keyword arguments used to create the
            axes
        matplotlib.axes.Axes.plot
            for documentation of keyword arguments used in rendering the data
        """
        kwargs.update(figsize=figsize, xscale=xscale)
        return super().plot(method=method, **kwargs)

    @classmethod
    def from_nds2_buffer(cls, buffer_, scaled=None, copy=True, **metadata):
        """Construct a new series from an `nds2.buffer` object

        **Requires:** |nds2|_

        Parameters
        ----------
        buffer_ : `nds2.buffer`
            the input NDS2-client buffer to read

        scaled : `bool`, optional
            apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect

        copy : `bool`, optional
            if `True`, copy the contained data array to new  to a new array

        **metadata
            any other metadata keyword arguments to pass to the `TimeSeries`
            constructor

        Returns
        -------
        timeseries : `TimeSeries`
            a new `TimeSeries` containing the data from the `nds2.buffer`,
            and the appropriate metadata
        """
        # get Channel from buffer
        channel = Channel.from_nds2(buffer_.channel)

        # set default metadata
        metadata.setdefault('channel', channel)
        metadata.setdefault('epoch', LIGOTimeGPS(buffer_.gps_seconds,
                                                 buffer_.gps_nanoseconds))
        metadata.setdefault('sample_rate', channel.sample_rate)
        metadata.setdefault('unit', channel.unit)
        metadata.setdefault('name', buffer_.name)

        # unwrap data
        scaled = _dynamic_scaled(scaled, channel.name)
        slope = buffer_.signal_slope
        offset = buffer_.signal_offset
        null_scaling = slope == 1. and offset == 0.
        if scaled and not null_scaling:
            data = buffer_.data.copy() * slope + offset
            copy = False
        else:
            data = buffer_.data

        # construct new TimeSeries-like object
        return cls(data, copy=copy, **metadata)

    @classmethod
    def from_lal(cls, lalts, copy=True):
        """Generate a new TimeSeries from a LAL TimeSeries of any type.
        """
        # convert the units
        from ..utils.lal import (from_lal_unit, from_lal_type)
        unit = from_lal_unit(lalts.sampleUnits)

        try:
            dtype = lalts.data.data.dtype
        except AttributeError:  # no data
            dtype = from_lal_type(lalts)
            data = []
        else:
            data = lalts.data.data

        # create new series
        out = cls(
            data,
            dtype=dtype,
            name=lalts.name or None,
            unit=unit,
            t0=lalts.epoch,
            dt=lalts.deltaT,
            channel=None,
            copy=False,
        )

        if copy:
            return out.copy()
        return out

    def to_lal(self):
        """Convert this `TimeSeries` into a LAL TimeSeries.

        .. note::

           This operation always copies data to new memory.
        """
        import lal
        from ..utils.lal import (find_typed_function, to_lal_unit)

        # map unit
        try:
            unit, scale = to_lal_unit(self.unit)
        except ValueError as exc:
            warnings.warn(f"{exc}, defaulting to lal.DimensionlessUnit")
            unit = lal.DimensionlessUnit
            scale = 1

        # create TimeSeries
        create = find_typed_function(self.dtype, 'Create', 'TimeSeries')
        lalts = create(
            self.name or str(self.channel or "") or None,
            LIGOTimeGPS(to_gps(self.epoch.gps)),
            0,
            self.dt.value,
            unit,
            self.shape[0],
        )

        # assign data
        lalts.data.data = self.value
        if scale != 1:
            lalts.data.data *= scale

        return lalts

    @classmethod
    def from_pycbc(cls, pycbcseries, copy=True):
        """Convert a `pycbc.types.timeseries.TimeSeries` into a `TimeSeries`

        Parameters
        ----------
        pycbcseries : `pycbc.types.timeseries.TimeSeries`
            the input PyCBC `~pycbc.types.timeseries.TimeSeries` array

        copy : `bool`, optional, default: `True`
            if `True`, copy these data to a new array

        Returns
        -------
        timeseries : `TimeSeries`
            a GWpy version of the input timeseries
        """
        return cls(pycbcseries.data, t0=pycbcseries.start_time,
                   dt=pycbcseries.delta_t, copy=copy)

    def to_pycbc(self, copy=True):
        """Convert this `TimeSeries` into a PyCBC
        `~pycbc.types.timeseries.TimeSeries`

        Parameters
        ----------
        copy : `bool`, optional, default: `True`
            if `True`, copy these data to a new array

        Returns
        -------
        timeseries : `~pycbc.types.timeseries.TimeSeries`
            a PyCBC representation of this `TimeSeries`
        """
        from pycbc import types
        return types.TimeSeries(self.value,
                                delta_t=self.dt.to('s').value,
                                epoch=self.epoch.gps, copy=copy)

    # -- TimeSeries operations ------------------

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        out = super().__array_ufunc__(ufunc, method, *inputs, **kwargs)
        if out.dtype is numpy.dtype(bool) and len(inputs) == 2:
            from .statevector import StateTimeSeries
            orig, value = inputs
            try:
                op_ = _UFUNC_STRING[ufunc.__name__]
            except KeyError:
                op_ = ufunc.__name__
            out = out.view(StateTimeSeries)
            out.__metadata_finalize__(orig)
            out.override_unit('')
            oname = orig.name if isinstance(orig, type(self)) else orig
            vname = value.name if isinstance(value, type(self)) else value
            out.name = '{0!s} {1!s} {2!s}'.format(oname, op_, vname)
        return out

    # Quantity overrides __eq__ and __ne__ in a way that doesn't work for us,
    # so we just undo that
    def __eq__(self, other):
        return numpy.ndarray.__eq__(self, other)

    def __ne__(self, other):
        return numpy.ndarray.__ne__(self, other)


# -- TimeSeriesBaseDict -------------------------------------------------------

def as_series_dict_class(seriesclass):
    """Decorate a `dict` class to declare itself as the `DictClass` for
    its `EntryClass`

    This method should be used to decorate sub-classes of the
    `TimeSeriesBaseDict` to provide a reference to that class from the
    relevant subclass of `TimeSeriesBase`.
    """
    def decorate_class(cls):
        """Set ``cls`` as the `DictClass` attribute for this series type
        """
        seriesclass.DictClass = cls
        return cls
    return decorate_class


@as_series_dict_class(TimeSeriesBase)
class TimeSeriesBaseDict(OrderedDict):
    """Ordered key-value mapping of named `TimeSeriesBase` objects

    This object is designed to hold data for many different sources (channels)
    for a single time span.

    The main entry points for this object are the
    :meth:`~TimeSeriesBaseDict.read` and :meth:`~TimeSeriesBaseDict.fetch`
    data access methods.
    """
    EntryClass = TimeSeriesBase

    @property
    def span(self):
        """The GPS ``[start, stop)`` extent of data in this `dict`

        :type: `~gwpy.segments.Segment`
        """
        span = SegmentList()
        for value in self.values():
            span.append(value.span)
        try:
            return span.extent()
        except ValueError as exc:  # empty list
            exc.args = (
                'cannot calculate span for empty {0}'.format(
                    type(self).__name__),
            )
            raise

    @classmethod
    def read(cls, source, *args, **kwargs):
        """Read data for multiple channels into a `TimeSeriesDict`

        Parameters
        ----------
        source : `str`, `list`
            Source of data, any of the following:

            - `str` path of single data file,
            - `str` path of LAL-format cache file,
            - `list` of paths.

        channels : `~gwpy.detector.channel.ChannelList`, `list`
            a list of channels to read from the source.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str` optional
            GPS start time of required data, anything parseable by
            :func:`~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS end time of required data, anything parseable by
            :func:`~gwpy.time.to_gps` is fine

        format : `str`, optional
            source format identifier. If not given, the format will be
            detected if possible. See below for list of acceptable
            formats.

        nproc : `int`, optional
            number of parallel processes to use, serial process by
            default.

        pad : `float`, optional
            value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        Returns
        -------
        tsdict : `TimeSeriesDict`
            a `TimeSeriesDict` of (`channel`, `TimeSeries`) pairs. The keys
            are guaranteed to be the ordered list `channels` as given.

        Notes
        -----"""
        from .io.core import read as timeseries_reader
        return timeseries_reader(cls, source, *args, **kwargs)

    def write(self, target, *args, **kwargs):
        """Write this `TimeSeriesDict` to a file

        Arguments and keywords depend on the output format, see the
        online documentation for full details for each format.

        Parameters
        ----------
        target : `str`
            output filename

        format : `str`, optional
            output format identifier. If not given, the format will be
            detected if possible. See below for list of acceptable
            formats.

        Notes
        -----"""
        return io_registry.write(self, target, *args, **kwargs)

    def __iadd__(self, other):
        return self.append(other)

    def copy(self):
        """Return a copy of this dict with each value copied to new memory
        """
        new = self.__class__()
        for key, val in self.items():
            new[key] = val.copy()
        return new

    def append(self, other, copy=True, **kwargs):
        """Append the dict ``other`` to this one

        Parameters
        ----------
        other : `dict` of `TimeSeries`
            the container to append to this one

        copy : `bool`, optional
            if `True` copy data from ``other`` before storing, only
            affects those keys in ``other`` that aren't in ``self``

        **kwargs
            other keyword arguments to send to `TimeSeries.append`

        See also
        --------
        TimeSeries.append
            for details of the underlying series append operation
        """
        for key, series in other.items():
            if key in self:
                self[key].append(series, **kwargs)
            elif copy:
                self[key] = series.copy()
            else:
                self[key] = series
        return self

    def prepend(self, other, **kwargs):
        """Prepend the dict ``other`` to this one

        Parameters
        ----------
        other : `dict` of `TimeSeries`
            the container to prepend to this one

        copy : `bool`, optional
            if `True` copy data from ``other`` before storing, only
            affects those keys in ``other`` that aren't in ``self``

        **kwargs
            other keyword arguments to send to `TimeSeries.prepend`

        See also
        --------
        TimeSeries.prepend
            for details of the underlying series prepend operation
        """
        for key, series in other.items():
            if key in self:
                self[key].prepend(series, **kwargs)
            else:
                self[key] = series
        return self

    def crop(self, start=None, end=None, copy=False):
        """Crop each entry of this `dict`.

        This method calls the :meth:`crop` method of all entries and
        modifies this dict in place.

        Parameters
        ----------
        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS end time of required data, defaults to end of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        copy : `bool`, optional, default: `False`
            If `True` copy the data for each entry to fresh memory,
            otherwise return a view.

        See also
        --------
        TimeSeries.crop
            for more details
        """
        for key, val in self.items():
            self[key] = val.crop(start=start, end=end, copy=copy)
        return self

    def resample(self, rate, **kwargs):
        """Resample items in this dict.

        This operation over-writes items inplace.

        Parameters
        ----------
        rate : `dict`, `float`
            either a `dict` of (channel, `float`) pairs for key-wise
            resampling, or a single float/int to resample all items.

        **kwargs
             other keyword arguments to pass to each item's resampling
             method.
        """
        if not isinstance(rate, dict):
            rate = dict((c, rate) for c in self)
        for key, resamp in rate.items():
            self[key] = self[key].resample(resamp, **kwargs)
        return self

    @classmethod
    def fetch(cls, channels, start, end, host=None, port=None,
              verify=False, verbose=False, connection=None,
              pad=None, scaled=None, allow_tape=None, type=None,
              dtype=None):
        """Fetch data from NDS for a number of channels.

        Parameters
        ----------
        channels : `list`
            required data channels.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS end time of required data, defaults to end of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        host : `str`, optional
            URL of NDS server to use, if blank will try any server
            (in a relatively sensible order) to get the data

        port : `int`, optional
            port number for NDS server query, must be given with `host`.

        verify : `bool`, optional, default: `True`
            check channels exist in database before asking for data

        verbose : `bool`, optional
            print verbose output about NDS download progress, if ``verbose``
            is specified as a string, this defines the prefix for the
            progress meter

        connection : `nds2.connection`, optional
            open NDS connection to use.

        scaled : `bool`, optional
            apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect.

        allow_tape : `bool`, optional
            allow data access from slow tapes. If `host` or `connection` is
            given, the default is to do whatever the server default is,
            otherwise servers will be searched in logical order allowing tape
            access if necessary to retrieve the data

        type : `int`, `str`, optional
            NDS2 channel type integer or string name to match.

        dtype : `numpy.dtype`, `str`, `type`, or `dict`
            NDS2 data type to match

        Returns
        -------
        data : :class:`~gwpy.timeseries.TimeSeriesBaseDict`
            a new `TimeSeriesBaseDict` of (`str`, `TimeSeries`) pairs fetched
            from NDS.
        """
        from ..io import nds2 as io_nds2
        from .io.nds2 import (print_verbose, fetch)

        if dtype is None:
            dtype = {}

        # -- open a connection ------------------

        # open connection to specific host
        if connection is None and host is not None:
            print_verbose("Opening new connection to {0}...".format(host),
                          end=' ', verbose=verbose)
            connection = io_nds2.auth_connect(host, port)
            print_verbose('connected', verbose=verbose)
        # otherwise cycle through connections in logical order
        elif connection is None:
            ifos = set([Channel(channel).ifo for channel in channels])
            if len(ifos) == 1:
                ifo = list(ifos)[0]
            else:
                ifo = None
            hostlist = io_nds2.host_resolution_order(ifo, epoch=start)
            if allow_tape is None:
                tapes = [False, True]
            else:
                tapes = [allow_tape]
            for allow_tape_ in tapes:
                error = ""  # container for error message from cls.fetch()
                for host_, port_ in hostlist:
                    try:
                        return cls.fetch(channels, start, end, host=host_,
                                         port=port_, verbose=verbose,
                                         type=type, dtype=dtype, pad=pad,
                                         scaled=scaled, allow_tape=allow_tape_)
                    except (RuntimeError, ValueError) as exc:
                        error = str(exc)  # need to assign to take out of scope
                        msg = error.split('\n', 1)[0]
                        warnings.warn(
                            f"failed to fetch data for {', '.join(channels)} "
                            f"in interval [{start}, {end}): {msg}",
                            io_nds2.NDSWarning,
                        )

                # if failing occurred because of data on tape, don't try
                # reading channels individually, the same error will occur
                if not allow_tape_ and 'Requested data is on tape' in error:
                    continue

                # if we got this far, we can't get all channels in one go
                if len(channels) > 1:
                    return cls(
                        (c, cls.EntryClass.fetch(c, start, end,
                                                 verbose=verbose, type=type,
                                                 verify=verify,
                                                 dtype=dtype.get(c), pad=pad,
                                                 scaled=scaled,
                                                 allow_tape=allow_tape_))
                        for c in channels)
            err = "Cannot find all relevant data on any known server."
            if not verbose:
                err += (" Try again using the verbose=True keyword argument "
                        " to see detailed failures.")
            raise RuntimeError(err)

        # -- at this point we have an open connection, so perform fetch

        start = to_gps(start)
        end = to_gps(end)
        istart = int(start)
        iend = int(ceil(end))

        return fetch(channels, istart, iend, connection=connection,
                     host=host, port=port, verbose=verbose, type=type,
                     dtype=dtype, pad=pad, allow_tape=allow_tape,
                     scaled=scaled,
                     series_class=cls.EntryClass).crop(start, end)

    @classmethod
    def find(cls, channels, start, end, frametype=None,
             frametype_match=None, pad=None, scaled=None, nproc=1,
             verbose=False, allow_tape=True, observatory=None, **readargs):
        """Find and read data from frames for a number of channels.

        This method uses :mod:`gwdatafind` to discover the (`file://`) URLs
        that provide the requested data, then reads those files using
        :meth:`TimeSeriesDict.read()`.

        Parameters
        ----------
        channels : `list`
            Required data channels.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS end time of required data, defaults to end of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        frametype : `str`
            Name of frametype in which this channel is stored; if not given
            all frametypes discoverable via GWDataFind will be searched for
            the required channels.

        frametype_match : `str`
            Regular expression to use for frametype matching.

        pad : `float`
            Value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        scaled : `bool`
            Apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect.

        nproc : `int`
            Number of parallel processes to use.

        allow_tape : `bool`
            Allow reading from frame files on (slow) magnetic tape.

        verbose : `bool`, optional
            Print verbose output about read progress, if ``verbose``
            is specified as a string, this defines the prefix for the
            progress meter.

        readargs
            Any other keyword arguments to be passed to `.read()`.

        Raises
        ------
        requests.exceptions.HTTPError
            If the GWDataFind query fails for any reason.

        RuntimeError
            If no files are found to read, or if the read operation
            fails.
        """
        from ..io import datafind as io_datafind

        start = to_gps(start)
        end = to_gps(end)

        # -- find frametype(s)

        frametypes = {}

        if frametype is None:
            matched = io_datafind.find_best_frametype(
                channels,
                start,
                end,
                frametype_match=frametype_match,
                allow_tape=allow_tape,
            )

            # flip dict to frametypes with a list of channels
            for name, ftype in matched.items():
                try:
                    frametypes[ftype].append(name)
                except KeyError:
                    frametypes[ftype] = [name]

            if verbose and len(frametypes) > 1:
                gprint(f"Determined {len(frametypes)} frametypes to read")
            elif verbose:
                gprint(f"Determined best frametype as '{list(frametypes)[0]}'")
        else:  # use the given frametype for all channels
            frametypes[frametype] = channels

        # -- read data

        out = cls()
        for frametype, clist in frametypes.items():
            if verbose:
                verbose = f"Reading '{frametype}' data"

            # parse as a ChannelList
            channellist = ChannelList.from_names(*clist)
            # strip trend tags from channel names
            names = [c.name for c in channellist]

            # find observatory for this group
            if observatory is None:
                try:
                    obs = ''.join(
                        sorted(set(c.ifo[0] for c in channellist)))
                except TypeError as exc:
                    raise ValueError(
                        "Cannot parse list of IFOs from channel names",
                    ) from exc

            else:
                obs = observatory
            # find frames
            cache = io_datafind.find_urls(
                obs,
                frametype,
                start,
                end,
                on_gaps="error" if pad is None else "warn",
            )
            if not cache:
                raise RuntimeError(
                    f"No {observatory}-{frametype} URLs found for "
                    f"[{start}, {end})",
                )

            # read data
            new = cls.read(
                cache,
                names,
                start=start,
                end=end,
                pad=pad,
                scaled=scaled,
                nproc=nproc,
                verbose=verbose,
                **readargs,
            )

            # map back to user-given channel name and append
            out.append(type(new)(
                (key, new[chan]) for (key, chan) in zip(clist, names)
            ))
        return out

    @classmethod
    def get(cls, channels, start, end, pad=None, scaled=None,
            dtype=None, verbose=False, allow_tape=None, **kwargs):
        """Retrieve data for multiple channels from frames or NDS

        This method dynamically accesses either frames on disk, or a
        remote NDS2 server to find and return data for the given interval

        Parameters
        ----------
        channels : `list`
            required data channels.

        start : `~gwpy.time.LIGOTimeGPS`, `float`, `str`
            GPS start time of required data,
            any input parseable by `~gwpy.time.to_gps` is fine

        end : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
            GPS end time of required data, defaults to end of data found;
            any input parseable by `~gwpy.time.to_gps` is fine

        frametype : `str`, optional
            name of frametype in which this channel is stored, by default
            will search for all required frame types

        pad : `float`, optional
            value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        scaled : `bool`, optional
            apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect.

        nproc : `int`, optional, default: `1`
            number of parallel processes to use, serial process by
            default.

        allow_tape : `bool`, optional, default: `None`
            allow the use of frames that are held on tape, default is `None`
            to attempt to allow the `TimeSeries.fetch` method to
            intelligently select a server that doesn't use tapes for
            data storage (doesn't always work), but to eventually allow
            retrieving data from tape if required

        verbose : `bool`, optional
            print verbose output about data access progress, if ``verbose``
            is specified as a string, this defines the prefix for the
            progress meter

        **kwargs
            other keyword arguments to pass to either
            `TimeSeriesBaseDict.find` (for direct GWF file access) or
            `TimeSeriesBaseDict.fetch` for remote NDS2 access
        """
        # separate non-None nds2-only keywords here
        nds_kw = {}
        for key in ('host', 'port', 'connection', 'type', 'dtype'):
            val = kwargs.pop(key, None)
            if val is not None:
                nds_kw[key] = val

        # try and find from frames
        if not nds_kw:
            if verbose:
                gprint("Attempting to access data from frames...")
            try:
                return cls.find(channels, start, end, pad=pad, scaled=scaled,
                                verbose=verbose,
                                allow_tape=allow_tape or False,
                                **kwargs)
            except (ImportError, RuntimeError, ValueError) as exc:
                if verbose:
                    gprint(str(exc), file=sys.stderr)
                    gprint("Failed to access data from frames, trying NDS...")

        # remove kwargs for .find()
        for key in ('nproc', 'frametype', 'frametype_match', 'observatory'):
            kwargs.pop(key, None)
        kwargs.update(nds_kw)  # replace nds keywords

        # otherwise fetch from NDS
        try:
            return cls.fetch(channels, start, end, pad=pad, scaled=scaled,
                             dtype=dtype, allow_tape=allow_tape,
                             verbose=verbose, **kwargs)
        except RuntimeError as exc:
            # if all else fails, try and get each channel individually
            if len(channels) == 1:
                raise
            else:
                if verbose:
                    gprint(str(exc), file=sys.stderr)
                    gprint("Failed to access data for all channels as a "
                           "group, trying individually:")
                return cls(
                    (c, cls.EntryClass.get(c, start, end, pad=pad,
                                           scaled=scaled, dtype=dtype,
                                           allow_tape=allow_tape,
                                           verbose=verbose, **kwargs))
                    for c in channels)

    @classmethod
    def from_nds2_buffers(cls, buffers, scaled=None, copy=True, **metadata):
        """Construct a new dict from a list of `nds2.buffer` objects

        **Requires:** |nds2|_

        Parameters
        ----------
        buffers : `list` of `nds2.buffer`
            the input NDS2-client buffers to read

        scaled : `bool`, optional
            apply slope and bias calibration to ADC data, for non-ADC data
            this option has no effect.

        copy : `bool`, optional
            if `True`, copy the contained data array to new  to a new array

        **metadata
            any other metadata keyword arguments to pass to the `TimeSeries`
            constructor

        Returns
        -------
        dict : `TimeSeriesDict`
            a new `TimeSeriesDict` containing the data from the given buffers
        """
        tsd = cls()
        for buf in buffers:
            tsd[buf.channel.name] = tsd.EntryClass.from_nds2_buffer(
                buf, scaled=scaled, copy=copy, **metadata)
        return tsd

    def plot(self, label='key', method='plot', figsize=(12, 4),
             xscale='auto-gps', **kwargs):
        """Plot the data for this `TimeSeriesBaseDict`.

        Parameters
        ----------
        label : `str`, optional
            labelling system to use, or fixed label for all elements
            Special values include

            - ``'key'``: use the key of the `TimeSeriesBaseDict`,
            - ``'name'``: use the :attr:`~TimeSeries.name` of each element

            If anything else, that fixed label will be used for all lines.

        **kwargs
            all other keyword arguments are passed to the plotter as
            appropriate
        """
        kwargs.update({
            "method": method,
            "label": label,
        })

        # make plot
        from ..plot import Plot

        if kwargs.get("separate", False):
            plot = Plot(*self.values(), **kwargs)
        else:
            plot = Plot(self.values(), **kwargs)

        # update labels
        artmap = {'plot': 'lines', 'scatter': 'collections'}
        artists = [x for ax in plot.axes for
                   x in getattr(ax, artmap.get(method, 'lines'))]
        for key, artist in zip(self, artists):
            if label.lower() == 'name':
                lab = self[key].name
            elif label.lower() == 'key':
                lab = key
            else:
                lab = label
            artist.set_label(lab)

        return plot

    def step(self, label='key', where='post', figsize=(12, 4),
             xscale='auto-gps', **kwargs):
        """Create a step plot of this dict.

        Parameters
        ----------
        label : `str`, optional
            labelling system to use, or fixed label for all elements
            Special values include

            - ``'key'``: use the key of the `TimeSeriesBaseDict`,
            - ``'name'``: use the :attr:`~TimeSeries.name` of each element

            If anything else, that fixed label will be used for all lines.

        **kwargs
            all other keyword arguments are passed to the plotter as
            appropriate
        """
        kwargs.setdefault(
            "drawstyle",
            "steps-{}".format(where),
        )
        tmp = type(self)()
        for key, series in self.items():
            tmp[key] = series.append(series.value[-1:], inplace=False)

        return tmp.plot(label=label, figsize=figsize, xscale=xscale,
                        **kwargs)


# -- TimeSeriesBaseList -------------------------------------------------------

class TimeSeriesBaseList(list):
    """Fancy list representing a list of `TimeSeriesBase`

    The `TimeSeriesBaseList` provides an easy way to collect and organise
    `TimeSeriesBase` for a single `Channel` over multiple segments.

    Parameters
    ----------
    *items
        any number of `TimeSeriesBase`

    Returns
    -------
    list
        a new `TimeSeriesBaseList`

    Raises
    ------
    TypeError
        if any elements are not `TimeSeriesBase`
    """
    EntryClass = TimeSeriesBase

    def __init__(self, *items):
        """Initialise a new list
        """
        super().__init__()
        for item in items:
            self.append(item)

    @property
    def segments(self):
        """The `span` of each series in this list
        """
        from ..segments import SegmentList
        return SegmentList([item.span for item in self])

    def append(self, item):
        if not isinstance(item, self.EntryClass):
            raise TypeError("Cannot append type '%s' to %s"
                            % (type(item).__name__, type(self).__name__))
        super().append(item)
        return self
    append.__doc__ = list.append.__doc__

    def extend(self, item):
        item = TimeSeriesBaseList(*item)
        super().extend(item)
    extend.__doc__ = list.extend.__doc__

    def coalesce(self):
        """Merge contiguous elements of this list into single objects

        This method implicitly sorts and potentially shortens this list.
        """
        self.sort(key=lambda ts: ts.t0.value)
        i = j = 0
        N = len(self)
        while j < N:
            this = self[j]
            j += 1
            if j < N and this.is_contiguous(self[j]) == 1:
                while j < N and this.is_contiguous(self[j]):
                    try:
                        this = self[i] = this.append(self[j])
                    except ValueError as exc:
                        if 'cannot resize this array' in str(exc):
                            this = this.copy()
                            this = self[i] = this.append(self[j])
                        else:
                            raise
                    j += 1
            else:
                self[i] = this
            i += 1
        del self[i:]
        return self

    def join(self, pad=None, gap=None):
        """Concatenate all of the elements of this list into a single object

        Parameters
        ----------
        pad : `float`, optional
            value with which to fill gaps in the source data,
            by default gaps will result in a `ValueError`.

        gap : `str`, optional, default: `'raise'`
            what to do if there are gaps in the data, one of

            - ``'raise'`` - raise a `ValueError`
            - ``'ignore'`` - remove gap and join data
            - ``'pad'`` - pad gap with zeros

            If `pad` is given and is not `None`, the default is ``'pad'``,
            otherwise ``'raise'``.

        Returns
        -------
        series : `gwpy.types.TimeSeriesBase` subclass
             a single series containing all data from each entry in this list

        See also
        --------
        TimeSeries.append
            for details on how the individual series are concatenated together
        """
        if not self:
            return self.EntryClass(numpy.empty((0,) * self.EntryClass._ndim))
        self.sort(key=lambda t: t.epoch.gps)
        out = self[0].copy()
        for series in self[1:]:
            out.append(series, gap=gap, pad=pad)
        return out

    def __getslice__(self, i, j):
        return type(self)(*super().__getslice__(i, j))

    def __getitem__(self, key):
        if isinstance(key, slice):
            return type(self)(
                *super().__getitem__(key))
        return super().__getitem__(key)

    def copy(self):
        """Return a copy of this list with each element copied to new memory
        """
        out = type(self)()
        for series in self:
            out.append(series.copy())
        return out