gwpy/spectrogram/spectrogram.py

Summary

Maintainability
A
3 hrs
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Louisiana State University (2014-2017)
#               Cardiff University (2017-2022)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""Spectrogram object
"""

import warnings

import numpy

from astropy import units
from astropy.io import registry as io_registry

from ..types import (Array2D, Series)
from ..timeseries import (TimeSeries, TimeSeriesList)
from ..timeseries.core import _format_time
from ..frequencyseries import FrequencySeries
from ..frequencyseries._fdcommon import fdfilter

__author__ = "Duncan Macleod <duncan.macleod@ligo.org"

__all__ = ['Spectrogram', 'SpectrogramList']


def _ordinal(n):
    """Returns the ordinal string for a given integer

    See https://stackoverflow.com/a/20007730/1307974

    Parameters
    ----------
    n : `int`
        the number to convert to ordinal

    Examples
    --------
    >>> _ordinal(11)
    '11th'
    >>> _ordinal(102)
    '102nd'
    """
    idx = int((n//10 % 10 != 1) * (n % 10 < 4) * n % 10)
    return '{}{}'.format(n, "tsnrhtdd"[idx::4])


class Spectrogram(Array2D):
    """A 2D array holding a spectrogram of time-frequency data

    Parameters
    ----------
    value : array-like
        input data array

    unit : `~astropy.units.Unit`, optional
        physical unit of these data

    epoch : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
        GPS epoch associated with these data,
        any input parsable by `~gwpy.time.to_gps` is fine

    sample_rate : `float`, `~astropy.units.Quantity`, optional, default: `1`
        the rate of samples per second (Hertz)

    times : `array-like`
        the complete array of GPS times accompanying the data for this series.
        This argument takes precedence over `epoch` and `sample_rate` so should
        be given in place of these if relevant, not alongside

    f0 : `float`, `~astropy.units.Quantity`, optional, default: `0`
        starting frequency for these data

    df : `float`, `~astropy.units.Quantity`, optional, default: `1`
        frequency resolution for these data

    frequencies : `array-like`
        the complete array of frequencies indexing the data.
        This argument takes precedence over `f0` and `df` so should
        be given in place of these if relevant, not alongside

    epoch : `~gwpy.time.LIGOTimeGPS`, `float`, `str`, optional
        GPS epoch associated with these data,
        any input parsable by `~gwpy.time.to_gps` is fine

    name : `str`, optional
        descriptive title for this array

    channel : `~gwpy.detector.Channel`, `str`, optional
        source data stream for these data

    dtype : `~numpy.dtype`, optional
        input data type

    copy : `bool`, optional, default: `False`
        choose to copy the input data to new memory

    subok : `bool`, optional, default: `True`
        allow passing of sub-classes by the array generator

    Notes
    -----
    Key methods:

    .. autosummary::

       ~Spectrogram.read
       ~Spectrogram.write
       ~Spectrogram.plot
       ~Spectrogram.zpk
    """
    _metadata_slots = Series._metadata_slots + ('y0', 'dy', 'yindex')
    _default_xunit = TimeSeries._default_xunit
    _default_yunit = FrequencySeries._default_xunit
    _rowclass = TimeSeries
    _columnclass = FrequencySeries

    def __new__(cls, data, unit=None, t0=None, dt=None, f0=None, df=None,
                times=None, frequencies=None,
                name=None, channel=None, **kwargs):
        """Generate a new Spectrogram.
        """
        # parse t0 or epoch
        epoch = kwargs.pop('epoch', None)
        if epoch is not None and t0 is not None:
            raise ValueError("give only one of epoch or t0")
        if epoch is None and t0 is not None:
            kwargs['x0'] = _format_time(t0)
        elif epoch is not None:
            kwargs['x0'] = _format_time(epoch)
        # parse sample_rate or dt
        if dt is not None:
            kwargs['dx'] = dt
        # parse times
        if times is not None:
            kwargs['xindex'] = times

        # parse y-axis params
        if f0 is not None:
            kwargs['y0'] = f0
        if df is not None:
            kwargs['dy'] = df
        if frequencies is not None:
            kwargs['yindex'] = frequencies

        # generate Spectrogram
        return super().__new__(cls, data, unit=unit, name=name,
                               channel=channel, **kwargs)

    # -- Spectrogram properties -----------------

    epoch = property(TimeSeries.epoch.__get__, TimeSeries.epoch.__set__,
                     TimeSeries.epoch.__delete__,
                     """Starting GPS epoch for this `Spectrogram`

                     :type: `~gwpy.segments.Segment`
                     """)

    t0 = property(TimeSeries.t0.__get__, TimeSeries.t0.__set__,
                  TimeSeries.t0.__delete__,
                  """GPS time of first time bin

                  :type: `~astropy.units.Quantity` in seconds
                  """)

    dt = property(TimeSeries.dt.__get__, TimeSeries.dt.__set__,
                  TimeSeries.dt.__delete__,
                  """Time-spacing for this `Spectrogram`

                  :type: `~astropy.units.Quantity` in seconds
                  """)

    span = property(TimeSeries.span.__get__, TimeSeries.span.__set__,
                    TimeSeries.span.__delete__,
                    """GPS [start, stop) span for this `Spectrogram`

                    :type: `~gwpy.segments.Segment`
                    """)

    f0 = property(Array2D.y0.__get__, Array2D.y0.__set__,
                  Array2D.y0.__delete__,
                  """Starting frequency for this `Spectrogram`

                  :type: `~astropy.units.Quantity` in Hertz
                  """)

    df = property(Array2D.dy.__get__, Array2D.dy.__set__,
                  Array2D.dy.__delete__,
                  """Frequency spacing of this `Spectrogram`

                  :type: `~astropy.units.Quantity` in Hertz
                  """)

    times = property(fget=Array2D.xindex.__get__,
                     fset=Array2D.xindex.__set__,
                     fdel=Array2D.xindex.__delete__,
                     doc="""Series of GPS times for each sample""")

    frequencies = property(fget=Array2D.yindex.__get__,
                           fset=Array2D.yindex.__set__,
                           fdel=Array2D.yindex.__delete__,
                           doc="Series of frequencies for this Spectrogram")

    band = property(fget=Array2D.yspan.__get__,
                    fset=Array2D.yspan.__set__,
                    fdel=Array2D.yspan.__delete__,
                    doc="""Frequency band described by this `Spectrogram`""")

    # -- Spectrogram i/o ------------------------

    @classmethod
    def read(cls, source, *args, **kwargs):
        """Read data into a `Spectrogram`

        Arguments and keywords depend on the output format, see the
        online documentation for full details for each format, the
        parameters below are common to most formats.

        Parameters
        ----------
        source : `str`, `list`
            Source of data, any of the following:

            - `str` path of single data file,
            - `str` path of LAL-format cache file,
            - `list` of paths.

        *args
            Other arguments are (in general) specific to the given
            ``format``.

        format : `str`, optional
            Source format identifier. If not given, the format will be
            detected if possible. See below for list of acceptable
            formats.

        **kwargs
            Other keywords are (in general) specific to the given ``format``.

        Returns
        -------
        specgram : `Spectrogram`

        Notes
        -----"""
        return io_registry.read(cls, source, *args, **kwargs)

    def write(self, target, *args, **kwargs):
        """Write this `Spectrogram` to a file

        Arguments and keywords depend on the output format, see the
        online documentation for full details for each format, the
        parameters below are common to most formats.

        Parameters
        ----------
        target : `str`
            output filename

        format : `str`, optional
            output format identifier. If not given, the format will be
            detected if possible. See below for list of acceptable
            formats.

        Notes
        -----"""
        return io_registry.write(self, target, *args, **kwargs)

    # -- Spectrogram methods --------------------

    def ratio(self, operand):
        """Calculate the ratio of this `Spectrogram` against a reference

        Parameters
        ----------
        operand : `str`, `FrequencySeries`, `Quantity`
            a `~gwpy.frequencyseries.FrequencySeries` or
            `~astropy.units.Quantity` to weight against, or one of

            - ``'mean'`` : weight against the mean of each spectrum
              in this Spectrogram
            - ``'median'`` : weight against the median of each spectrum
              in this Spectrogram

        Returns
        -------
        spectrogram : `Spectrogram`
            a new `Spectrogram`

        Raises
        ------
        ValueError
            if ``operand`` is given as a `str` that isn't supported
        """
        if isinstance(operand, str):
            if operand == 'mean':
                operand = self.mean(axis=0)
            elif operand == 'median':
                operand = self.median(axis=0)
            else:
                raise ValueError("operand %r unrecognised, please give a "
                                 "Quantity or one of: 'mean', 'median'"
                                 % operand)
        out = self / operand
        return out

    def plot(
        self,
        method="pcolormesh",
        figsize=(12, 6),
        xscale="auto-gps",
        **kwargs,
    ):
        """Plot the data for this `Spectrogram`

        Parameters
        ----------
        method : `str`, optional
            which plotting method to use to render this spectrogram,
            either ``'pcolormesh'`` (default) or ``'imshow'``

        figsize : `tuple` of `float`, optional
            ``(width, height)`` (inches) of the output figure

        xscale : `str`, optional
            the X-axis scale

        **kwargs
            all keyword arguments are passed along to underlying
            functions, see below for references

        Returns
        -------
        plot : `~gwpy.plot.Plot`
            the `Plot` containing the data

        See also
        --------
        matplotlib.pyplot.figure
            for documentation of keyword arguments used to create the
            figure
        matplotlib.figure.Figure.add_subplot
            for documentation of keyword arguments used to create the
            axes
        gwpy.plot.Axes.imshow
        gwpy.plot.Axes.pcolormesh
            for documentation of keyword arguments used in rendering the
            `Spectrogram` data
        """
        return super().plot(
            method=method,
            figsize=figsize,
            xscale=xscale,
            **kwargs,
        )

    @classmethod
    def from_spectra(cls, *spectra, **kwargs):
        """Build a new `Spectrogram` from a list of spectra.

        Parameters
        ----------
        *spectra
            any number of `~gwpy.frequencyseries.FrequencySeries` series
        dt : `float`, `~astropy.units.Quantity`, optional
            stride between given spectra

        Returns
        -------
        Spectrogram
            a new `Spectrogram` from a vertical stacking of the spectra
            The new object takes the metadata from the first given
            `~gwpy.frequencyseries.FrequencySeries` if not given explicitly

        Notes
        -----
        Each `~gwpy.frequencyseries.FrequencySeries` passed to this
        constructor must be the same length.
        """
        data = numpy.vstack([s.value for s in spectra])
        spec1 = list(spectra)[0]
        if not all(s.f0 == spec1.f0 for s in spectra):
            raise ValueError("Cannot stack spectra with different f0")
        if not all(s.df == spec1.df for s in spectra):
            raise ValueError("Cannot stack spectra with different df")
        kwargs.setdefault('name', spec1.name)
        kwargs.setdefault('channel', spec1.channel)
        kwargs.setdefault('epoch', spec1.epoch)
        kwargs.setdefault('f0', spec1.f0)
        kwargs.setdefault('df', spec1.df)
        kwargs.setdefault('unit', spec1.unit)
        if not ('dt' in kwargs or 'times' in kwargs):
            try:
                kwargs.setdefault('dt', spectra[1].epoch.gps - spec1.epoch.gps)
            except (AttributeError, IndexError):
                raise ValueError("Cannot determine dt (time-spacing) for "
                                 "Spectrogram from inputs")
        return Spectrogram(data, **kwargs)

    def percentile(self, percentile):
        """Calculate a given spectral percentile for this `Spectrogram`.

        Parameters
        ----------
        percentile : `float`
            percentile (0 - 100) of the bins to compute

        Returns
        -------
        spectrum : `~gwpy.frequencyseries.FrequencySeries`
            the given percentile `FrequencySeries` calculated from this
            `SpectralVaraicence`
        """
        out = numpy.percentile(self.value, percentile, axis=0)
        if self.name is not None:
            name = '{}: {} percentile'.format(self.name, _ordinal(percentile))
        else:
            name = None
        return FrequencySeries(
            out, epoch=self.epoch, channel=self.channel, name=name,
            f0=self.f0, df=self.df, unit=self.unit, frequencies=(
                hasattr(self, '_frequencies') and self.frequencies or None))

    def zpk(self, zeros, poles, gain, analog=True):
        """Filter this `Spectrogram` by applying a zero-pole-gain filter

        Parameters
        ----------
        zeros : `array-like`
            list of zero frequencies (in Hertz)

        poles : `array-like`
            list of pole frequencies (in Hertz)

        gain : `float`
            DC gain of filter

        analog : `bool`, optional
            type of ZPK being applied, if `analog=True` all parameters
            will be converted in the Z-domain for digital filtering

        Returns
        -------
        specgram : `Spectrogram`
            the frequency-domain filtered version of the input data

        See also
        --------
        Spectrogram.filter
            for details on how a digital ZPK-format filter is applied

        Examples
        --------
        To apply a zpk filter with file poles at 100 Hz, and five zeros at
        1 Hz (giving an overall DC gain of 1e-10)::

            >>> data2 = data.zpk([100]*5, [1]*5, 1e-10)
        """
        return self.filter(zeros, poles, gain, analog=analog)

    def filter(self, *filt, **kwargs):
        """Apply the given filter to this `Spectrogram`.

        Parameters
        ----------
        *filt : filter arguments
            1, 2, 3, or 4 arguments defining the filter to be applied,

                - an ``Nx1`` `~numpy.ndarray` of FIR coefficients
                - an ``Nx6`` `~numpy.ndarray` of SOS coefficients
                - ``(numerator, denominator)`` polynomials
                - ``(zeros, poles, gain)``
                - ``(A, B, C, D)`` 'state-space' representation

        analog : `bool`, optional
            if `True`, filter definition will be converted from Hertz
            to Z-domain digital representation, default: `False`

        inplace : `bool`, optional
            if `True`, this array will be overwritten with the filtered
            version, default: `False`

        Returns
        -------
        result : `Spectrogram`
            the filtered version of the input `Spectrogram`,
            if ``inplace=True`` was given, this is just a reference to
            the modified input array

        Raises
        ------
        ValueError
            if ``filt`` arguments cannot be interpreted properly
        """
        return fdfilter(self, *filt, **kwargs)

    def variance(self, bins=None, low=None, high=None, nbins=500,
                 log=False, norm=False, density=False):
        """Calculate the `SpectralVariance` of this `Spectrogram`.

        Parameters
        ----------
        bins : `~numpy.ndarray`, optional, default `None`
            array of histogram bin edges, including the rightmost edge
        low : `float`, optional, default: `None`
            left edge of lowest amplitude bin, only read
            if ``bins`` is not given
        high : `float`, optional, default: `None`
            right edge of highest amplitude bin, only read
            if ``bins`` is not given
        nbins : `int`, optional, default: `500`
            number of bins to generate, only read if ``bins`` is not
            given
        log : `bool`, optional, default: `False`
            calculate amplitude bins over a logarithmic scale, only
            read if ``bins`` is not given
        norm : `bool`, optional, default: `False`
            normalise bin counts to a unit sum
        density : `bool`, optional, default: `False`
            normalise bin counts to a unit integral

        Returns
        -------
        specvar : `SpectralVariance`
            2D-array of spectral frequency-amplitude counts

        See also
        --------
        numpy.histogram
            for details on specifying bins and weights
        """
        from ..frequencyseries import SpectralVariance
        return SpectralVariance.from_spectrogram(
            self, bins=bins, low=low, high=high, nbins=nbins, log=log,
            norm=norm, density=density)

    # -- Spectrogram connectors -----------------

    def crop_frequencies(self, low=None, high=None, copy=False):
        """Crop this `Spectrogram` to the specified frequencies

        Parameters
        ----------
        low : `float`
            lower frequency bound for cropped `Spectrogram`
        high : `float`
            upper frequency bound for cropped `Spectrogram`
        copy : `bool`
            if `False` return a view of the original data, otherwise create
            a fresh memory copy

        Returns
        -------
        spec : `Spectrogram`
            A new `Spectrogram` with a subset of data from the frequency
            axis
        """
        if low is not None:
            low = units.Quantity(low, self._default_yunit)
        if high is not None:
            high = units.Quantity(high, self._default_yunit)
        # check low frequency
        if low is not None and low == self.f0:
            low = None
        elif low is not None and low < self.f0:
            warnings.warn('Spectrogram.crop_frequencies given low frequency '
                          'cutoff below f0 of the input Spectrogram. Low '
                          'frequency crop will have no effect.')
        # check high frequency
        if high is not None and high.value == self.band[1]:
            high = None
        elif high is not None and high.value > self.band[1]:
            warnings.warn('Spectrogram.crop_frequencies given high frequency '
                          'cutoff above cutoff of the input Spectrogram. High '
                          'frequency crop will have no effect.')
        # find low index
        if low is None:
            idx0 = None
        else:
            idx0 = int(float(low.value - self.f0.value) // self.df.value)
        # find high index
        if high is None:
            idx1 = None
        else:
            idx1 = int(float(high.value - self.f0.value) // self.df.value)
        # crop
        if copy:
            return self[:, idx0:idx1].copy()
        return self[:, idx0:idx1]

    # -- Spectrogram ufuncs ---------------------

    def _wrap_function(self, function, *args, **kwargs):
        out = super()._wrap_function(function, *args, **kwargs)
        # requested frequency axis, return a FrequencySeries
        if out.ndim == 1 and out.x0.unit == self.y0.unit:
            return FrequencySeries(out.value, name=out.name, unit=out.unit,
                                   epoch=out.epoch, channel=out.channel,
                                   f0=out.x0.value, df=out.dx.value)
        # requested time axis, return a TimeSeries
        elif out.ndim == 1:
            return TimeSeries(out.value, name=out.name, unit=out.unit,
                              epoch=out.epoch, channel=out.channel, dx=out.dx)
        # otherwise return whatever we got back from super (Quantity)
        return out
    _wrap_function.__doc__ = Array2D._wrap_function.__doc__

    # -- other ----------------------------------

    def __getitem__(self, item):
        out = super().__getitem__(item)

        # set epoch manually, because Spectrogram doesn't store self._epoch
        if isinstance(out, self._columnclass):
            out.epoch = self.epoch

        return out


class SpectrogramList(TimeSeriesList):
    """Fancy list representing a list of `Spectrogram`

    The `SpectrogramList` provides an easy way to collect and organise
    `Spectrogram` for a single `Channel` over multiple segments.

    Parameters
    ----------
    *items
        any number of `Spectrogram` series

    Returns
    -------
    list
        a new `SpectrogramList`

    Raises
    ------
    TypeError
        if any elements are not of type `Spectrogram`
    """
    EntryClass = Spectrogram