gwpy/detector/channel.py

Summary

Maintainability
C
1 day
Test Coverage
# -*- coding: utf-8 -*-
# Copyright (C) Duncan Macleod (2014-2020)
#
# This file is part of GWpy.
#
# GWpy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GWpy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GWpy.  If not, see <http://www.gnu.org/licenses/>.

"""This module defines the `Channel` and `ChannelList` classes.
"""

import re
from copy import copy
from math import ceil
from urllib.parse import urlparse

from astropy import units
from astropy.io import registry as io_registry

from ..io import nds2 as io_nds2
from ..time import to_gps
from ..utils.misc import if_not_none
from .units import parse_unit

__author__ = 'Duncan Macleod <duncan.macleod@ligo.org>'

# matches a run of whitespace and/or quote characters (" or ') anchored at
# the very start or the very end of a string
QUOTE_REGEX = re.compile(r'^[\s\"\']+|[\s\"\']+$')


class Channel(object):
    """Representation of a laser-interferometer data channel.

    Parameters
    ----------
    name : `str`, `Channel`
        name of this Channel (or another Channel itself).
        If a `Channel` is given, all other parameters not set explicitly
        will be copied over.
    sample_rate : `float`, `~astropy.units.Quantity`, optional
        number of samples per second
    unit : `~astropy.units.Unit`, `str`, optional
        name of the unit for the data of this channel
    frequency_range : `tuple` of `float`
        [low, high) spectral frequency range of interest for this channel
    safe : `bool`, optional
        is this channel 'safe' to use as a witness of non-gravitational-wave
        noise in the interferometer
    dtype : `numpy.dtype`, optional
        numeric type of data for this channel
    frametype : `str`, optional
        LDAS name for frames that contain this channel
    model : `str`, optional
        name of the SIMULINK front-end model that produces this channel

    Notes
    -----
    The `Channel` structure implemented here is designed to match the
    data recorded in the LIGO Channel Information System
    (https://cis.ligo.org) for which a query interface is provided.
    """
    MATCH = re.compile(
        r'((?:(?P<ifo>[A-Z]\d))?|[\w-]+):'  # match IFO prefix
        r'(?:(?P<system>[a-zA-Z0-9]+))?'  # match system
        r'(?:[-_](?P<subsystem>[a-zA-Z0-9]+))?'  # match subsystem
        r'(?:[-_](?P<signal>[a-zA-Z0-9_-]+?))?'  # match signal
        r'(?:[\.-](?P<trend>[a-z]+))?'  # match trend type
        r'(?:,(?P<type>([a-z]-)?[a-z]+))?$'  # match channel type
    )

    def __init__(self, name, **params):
        """Create a new `Channel`

        Parameters
        ----------
        name : `str`, `Channel`
            the name of the new channel, or an existing channel to copy

        **params
            key, value pairs for attributes of new channel; each is applied
            with `setattr`, so it goes through the matching property setter
        """
        # init properties
        self._name = None
        self._ifo = None
        self._system = None
        self._subsystem = None
        self._signal = None
        self._trend = None
        self._sample_rate = None
        self._unit = None
        self._frequency_range = None
        self._safe = None
        self._type = None
        self._dtype = None
        self._frametype = None
        self._model = None
        self._url = None

        # copy existing Channel
        if isinstance(name, Channel):
            self._init_from_channel(name)

        # parse name into component parts
        else:
            self._init_name(name)

        # set metadata (after the copy/parse so explicit values win)
        for key, value in params.items():
            setattr(self, key, value)

    def _init_from_channel(self, other):
        """Copy every instance attribute of ``other`` onto this channel.

        Each value is shallow-copied via `copy.copy` and stored directly
        under the private attribute name, bypassing the property setters.
        """
        for key, value in vars(other).items():
            setattr(self, key, copy(value))

    def _init_name(self, name):
        """Set the name of this channel and, if possible, its components.

        If ``name`` cannot be parsed by `parse_channel_name` it is stored
        verbatim (as a `str`) and no components are set.  Otherwise the
        stored name is everything before the first comma (dropping the NDS
        type suffix) and each parsed component is assigned.
        """
        try:
            parts = self.parse_channel_name(name)
        except (TypeError, ValueError):
            self.name = str(name)
        else:
            self.name = str(name).split(',')[0]
            for key, val in parts.items():
                try:
                    setattr(self, key, val)
                except AttributeError:
                    # read-only property (no setter): write to the
                    # underlying private attribute instead
                    setattr(self, f'_{key}', val)

    # -- properties -----------------------------

    @property
    def name(self):
        """Name of this channel.

        This should follow the naming convention, with the following
        format: 'IFO:SYSTEM-SUBSYSTEM_SIGNAL'

        :type: `str`
        """
        return self._name

    @name.setter
    def name(self, n):
        self._name = n

    @property
    def sample_rate(self):
        """Rate of samples (Hertz) for this channel.

        A `~astropy.units.Quantity` is stored as given; any other
        non-`None` value is converted with `float` and given units of Hertz.

        :type: `~astropy.units.Quantity`
        """
        return self._sample_rate

    @sample_rate.setter
    def sample_rate(self, rate):
        if rate is None:
            self._sample_rate = None
        elif isinstance(rate, units.Quantity):
            self._sample_rate = rate
        else:
            # pylint: disable=no-member
            self._sample_rate = units.Quantity(float(rate), unit=units.Hertz)

    @property
    def unit(self):
        """Data unit for this channel.

        :type: `~astropy.units.Unit`
        """
        return self._unit

    @unit.setter
    def unit(self, u):
        self._unit = if_not_none(parse_unit, u)

    @property
    def frequency_range(self):
        """Frequency range of interest (Hertz) for this channel

        Setting `None` (or deleting the attribute) resets the range
        to `None`.

        :type: `~astropy.units.Quantity` array
        """
        return self._frequency_range

    @frequency_range.setter
    def frequency_range(self, frange):
        if frange is None:
            del self.frequency_range
        else:
            low, hi = frange
            self._frequency_range = units.Quantity((low, hi), unit='Hz')

    @frequency_range.deleter
    def frequency_range(self):
        self._frequency_range = None

    @property
    def safe(self):
        """Whether this channel is 'safe' to use as a noise witness

        Any channel that records part or all of a GW signal as it
        interacts with the interferometer is not safe to use as a noise
        witness

        A safe value of `None` simply indicates that the safety of this
        channel has not been determined

        :type: `bool` or `None`
        """
        return self._safe

    @safe.setter
    def safe(self, s):
        if s is None:
            self._safe = None
        else:
            self._safe = bool(s)

    @property
    def model(self):
        """Name of the SIMULINK front-end model that defines this channel.

        Truthy values are stored lower-cased; falsy values are stored as-is.

        :type: `str`
        """
        return self._model

    @model.setter
    def model(self, mdl):
        self._model = mdl.lower() if mdl else mdl

    @property
    def type(self):
        """DAQ data type for this channel.

        Valid values for this field are restricted to those understood
        by the NDS2 client software, namely:

        'm-trend', 'online', 'raw', 'reduced', 's-trend', 'static', 'test-pt'

        :type: `str`
        """
        try:
            return self._type
        except AttributeError:
            # attribute was never initialised: default it, then retry
            self._type = None
            return self.type

    @type.setter
    def type(self, type_):
        if type_ is None:
            self._type = None
        else:
            self._type = io_nds2.Nds2ChannelType.find(type_).nds2name

    @property
    def ndstype(self):
        """NDS type integer for this channel.

        This property is mapped to the `Channel.type` string, and is
        `None` when no type is set.
        """
        if self.type is not None:
            return io_nds2.Nds2ChannelType.find(self.type).value
        return None

    @ndstype.setter
    def ndstype(self, type_):
        self.type = type_

    @property
    def dtype(self):
        """Numeric type for data in this channel.

        :type: `~numpy.dtype`
        """
        return self._dtype

    @dtype.setter
    def dtype(self, type_):
        if type_ is None:
            self._dtype = None
        else:
            self._dtype = io_nds2.Nds2DataType.find(type_).dtype

    @property
    def url(self):
        """CIS browser url for this channel.

        Only URLs with an ``http``, ``https`` or ``file`` scheme are
        accepted; anything else raises `ValueError` on assignment.

        :type: `str`
        """
        return self._url

    @url.setter
    def url(self, href):
        if href is None:
            self._url = None
            return
        try:
            scheme = urlparse(href).scheme
        except (AttributeError, ValueError) as exc:
            raise ValueError(f"Invalid URL {href!r}") from exc
        # explicit check rather than `assert`, which is removed under -O
        if scheme not in ('http', 'https', 'file'):
            raise ValueError(f"Invalid URL {href!r}")
        self._url = href

    @property
    def frametype(self):
        """LDAS type description for frame files containing this channel.
        """
        return self._frametype

    @frametype.setter
    def frametype(self, ft):
        self._frametype = ft

    # -- read-only properties -------------------

    @property
    def ifo(self):
        """Interferometer prefix for this channel.

        :type: `str`
        """
        try:
            return self._ifo
        except AttributeError:
            self._ifo = None
            return None

    @property
    def system(self):
        """Instrumental system for this channel.

        :type: `str`
        """
        try:
            return self._system
        except AttributeError:
            self._system = None
            return None

    @property
    def subsystem(self):
        """Instrumental sub-system for this channel.

        :type: `str`
        """
        try:
            return self._subsystem
        except AttributeError:
            self._subsystem = None
            return None

    @property
    def signal(self):
        """Instrumental signal for this channel.

        :type: `str`
        """
        try:
            return self._signal
        except AttributeError:
            self._signal = None
            return None

    @property
    def trend(self):
        """Trend type for this channel.

        :type: `str`
        """
        try:
            return self._trend
        except AttributeError:
            self._trend = None
            return None

    @property
    def texname(self):
        """Name of this channel in LaTeX printable format.

        Every underscore in the name is escaped with a backslash.
        """
        return str(self).replace("_", r"\_")

    @property
    def ndsname(self):
        """Name of this channel as stored in the NDS database

        The channel type is appended as a ``,<type>`` suffix unless it is
        `None`, ``'raw'``, ``'reduced'`` or ``'online'``.
        """
        if self.type not in [None, 'raw', 'reduced', 'online']:
            return f'{self.name},{self.type}'
        return self.name

    # -- classmethods ---------------------------

    @classmethod
    def query(cls, name, kerberos=None, **kwargs):
        """Query the LIGO Channel Information System for the `Channel`
        matching the given name

        Parameters
        ----------
        name : `str`
            name of channel

        kerberos : `bool`, optional
            use an existing Kerberos ticket as the authentication credential,
            default behaviour will check for credentials and request username
            and password if none are found (`None`)

        kwargs
            other keyword arguments are passed directly to
            :func:`ciecplib.get`

        Returns
        -------
        c : `Channel`
             a new `Channel` containing all of the attributes set from
             its entry in the CIS

        Raises
        ------
        ValueError
            if the query returns no channels, or more than one channel
        """
        channellist = ChannelList.query(name, kerberos=kerberos, **kwargs)
        if not channellist:
            raise ValueError(f"No channels found matching '{name}'")
        if len(channellist) > 1:
            raise ValueError(
                f"{len(channellist)} channels found matching '{name}', "
                "please refine search, or use `ChannelList.query` to "
                "return all results"
            )
        return channellist[0]

    @classmethod
    def query_nds2(cls, name, host=None, port=None, connection=None,
                   type=None):
        """Query an NDS server for channel information

        Parameters
        ----------
        name : `str`
            name of requested channel
        host : `str`, optional
            name of NDS2 server.
        port : `int`, optional
            port number for NDS2 connection
        connection : `nds2.connection`
            open connection to use for query
        type : `str`, `int`
            NDS2 channel type with which to restrict query

        Returns
        -------
        channel : `Channel`
            channel with metadata retrieved from NDS2 server

        Raises
        ------
        ValueError
            if multiple channels are found for a given name

        Notes
        -----
        .. warning::

           A `host` is required if an open `connection` is not given
        """
        return ChannelList.query_nds2([name], host=host, port=port,
                                      connection=connection, type=type,
                                      unique=True)[0]

    @classmethod
    def from_nds2(cls, nds2channel):
        """Generate a new channel using an existing nds2.channel object

        The name, sample rate, signal units (`None` if empty), data type
        and channel type are read from ``nds2channel``.
        """
        return cls(
            nds2channel.name,
            sample_rate=nds2channel.sample_rate,
            unit=nds2channel.signal_units or None,
            dtype=nds2channel.data_type,
            type=nds2channel.channel_type,
        )

    # -- methods --------------------------------

    @classmethod
    def parse_channel_name(cls, name, strict=True):
        """Decompose a channel name string into its components

        Parameters
        ----------
        name : `str`
            name to parse
        strict : `bool`, optional
            require exact matching of format, with no surrounding text,
            default `True`

        Returns
        -------
        match : `dict`
            `dict` of channel name components with the following keys:

            - `'ifo'`: the letter-number interferometer prefix
            - `'system'`: the top-level system name
            - `'subsystem'`: the second-level sub-system name
            - `'signal'`: the remaining underscore-delimited signal name
            - `'trend'`: the trend type
            - `'type'`: the NDS2 channel type suffix

            Any optional keys that aren't found will return a value of `None`

        Raises
        ------
        ValueError
            if the name does not match the `Channel.MATCH` pattern or,
            when ``strict=True``, if the match does not span the whole of
            ``name``

        Examples
        --------
        >>> Channel.parse_channel_name('L1:LSC-DARM_IN1_DQ')
        {'ifo': 'L1',
         'system': 'LSC',
         'subsystem': 'DARM',
         'signal': 'IN1_DQ',
         'trend': None,
         'type': None}

        >>> Channel.parse_channel_name(
        ...     'H1:ISI-BS_ST1_SENSCOR_GND_STS_X_BLRMS_100M_300M.rms,m-trend')
        {'ifo': 'H1',
         'system': 'ISI',
         'subsystem': 'BS',
         'signal': 'ST1_SENSCOR_GND_STS_X_BLRMS_100M_300M',
         'trend': 'rms',
         'type': 'm-trend'}
        """
        match = cls.MATCH.search(name)
        if match is None or (strict and (
                match.start() != 0 or match.end() != len(name))):
            raise ValueError("Cannot parse channel name according to LIGO "
                             "channel-naming convention T990033")
        return match.groupdict()

    def find_frametype(self, gpstime=None, frametype_match=None,
                       host=None, port=None, return_all=False,
                       allow_tape=True):
        """Find the containing frametype(s) for this `Channel`

        Parameters
        ----------
        gpstime : `int`
            a reference GPS time at which to search for frame files
        frametype_match : `str`
            a regular expression string to use to down-select from the
            list of all available frametypes
        host : `str`
            the name of the datafind server to use for frame file discovery
        port : `int`
            the port of the datafind server on the given host
        return_all: `bool`, default: `False`
            return all matched frame types, otherwise only the first match is
            returned
        allow_tape : `bool`, default: `True`
            include frame files on (slow) magnetic tape in the search

        Returns
        -------
        frametype : `str`, `list`
            the first matching frametype containing this channel
            (`return_all=False`), or a `list` of all matches
        """
        # imported here rather than at module level
        from ..io import datafind
        return datafind.find_frametype(
            self, gpstime=gpstime, frametype_match=frametype_match,
            host=host, port=port, return_all=return_all,
            allow_tape=allow_tape)

    def copy(self):
        """Returns a copy of this channel

        A new instance of the same class is created from the channel name,
        then every instance attribute is shallow-copied across.
        """
        new = type(self)(str(self))
        new._init_from_channel(self)
        return new

    def __str__(self):
        return self.name

    def __repr__(self):
        repr_ = f'<Channel("{self}"'
        if self.type:
            repr_ += f' [{self.type}]'
        repr_ += f', {self.sample_rate}'
        return repr_ + f') at {hex(id(self))}>'

    def __eq__(self, other):
        """Compare name, sample_rate, unit, url, type and dtype.

        Returns `NotImplemented` if ``other`` lacks one of those
        attributes, so comparison with unrelated objects (e.g. a `str`)
        evaluates to `False` instead of raising `AttributeError`.
        """
        for attr in ['name', 'sample_rate', 'unit', 'url', 'type', 'dtype']:
            try:
                if getattr(self, attr) != getattr(other, attr):
                    return False
            except AttributeError:
                # `other` is not channel-like
                return NotImplemented
            except TypeError:
                return False
        return True

    def __hash__(self):
        # hash over the same attributes that __eq__ compares
        hash_ = 0
        for attr in ['name', 'sample_rate', 'unit', 'url', 'type', 'dtype']:
            hash_ += hash(getattr(self, attr))
        return hash_


class ChannelList(list):
    """A `list` of `channels <Channel>`, with parsing utilities.
    """

    @property
    def ifos(self):
        """The `set` of interferometer prefixes used in this
        `ChannelList`.
        """
        return {c.ifo for c in self}

    # NOTE(review): the `read` and `write` docstrings deliberately end with
    # an empty "Notes" section -- presumably the astropy I/O registry
    # appends its table of formats there; confirm before editing them.

    @classmethod
    def read(cls, source, *args, **kwargs):
        """Read a `ChannelList` from a file

        Parameters
        ----------
        source : `str`, `file`
            either an open file object, or a file name path to read

        Notes
        -----"""
        return io_registry.read(cls, source, *args, **kwargs)

    def write(self, target, *args, **kwargs):
        """Write a `ChannelList` to a file

        Notes
        -----"""
        return io_registry.write(self, target, *args, **kwargs)

    @classmethod
    def from_names(cls, *names):
        """Create a new `ChannelList` from a list of names

        The list of names can include comma-separated sets of names,
        in which case the return will be a flattened list of all parsed
        channel names.
        """
        new = cls()
        for namestr in names:
            new.extend(Channel(name) for name in cls._split_names(namestr))
        return new

    @staticmethod
    def _split_names(namestr):
        """Split a comma-separated list of channel names.

        A name immediately followed by an NDS2 channel type
        (e.g. ``'X1:CHAN,m-trend'``) is kept together as a single entry.
        """
        out = []
        # drop whitespace/quotes wrapping the whole string
        namestr = QUOTE_REGEX.sub('', namestr)
        while True:
            namestr = namestr.strip('\' \n')
            if ',' not in namestr:
                break
            # try every known NDS2 type first; the trailing '' entry is the
            # fallback that performs a plain split on the first comma
            for nds2type in io_nds2.Nds2ChannelType.nds2names() + ['']:
                if nds2type and f',{nds2type}' in namestr:
                    try:
                        channel, ctype, namestr = namestr.split(',', 2)
                    except ValueError:
                        # only 'name,type' left, nothing after it
                        channel, ctype = namestr.split(',')
                        namestr = ''
                    out.append(f'{channel},{ctype}')
                    break
                elif nds2type == '' and ',' in namestr:
                    channel, namestr = namestr.split(',', 1)
                    out.append(channel)
                    break
        if namestr:
            out.append(namestr)
        return out

    def find(self, name):
        """Find the `Channel` with a specific name in this `ChannelList`.

        Parameters
        ----------
        name : `str`
            name of the `Channel` to find

        Returns
        -------
        index : `int`
            the position of the first `Channel` in this `ChannelList`
            whose `~Channel.name` matches the search key.

        Raises
        ------
        ValueError
            if no matching `Channel` is found.
        """
        for i, chan in enumerate(self):
            if name == chan.name:
                return i
        raise ValueError(name)

    def sieve(self, name=None, sample_rate=None, sample_range=None,
              exact_match=False, **others):
        """Find all `Channels <Channel>` in this list matching the
        specified criteria.

        Parameters
        ----------
        name : `str`, or regular expression, optional
            any part of the channel name against which to match
            (or full name if `exact_match=True` is given); if `None`
            (default) no filtering on name is performed
        sample_rate : `float`, `~astropy.units.Quantity`, optional
            rate (number of samples per second) to match exactly
        sample_range : 2-`tuple`, optional
            `[low, high]` closed interval of rates to match within
        exact_match : `bool`
            return channels matching `name` exactly, default: `False`
        **others
            other ``attribute=value`` pairs; a channel is kept only if it
            has that attribute and its value equals ``value``; pairs whose
            value is `None` are ignored

        Returns
        -------
        new : `ChannelList`
            a new `ChannelList` containing the matching channels

        Notes
        -----
        Channels whose ``sample_rate`` is `None` never match a
        ``sample_rate`` or ``sample_range`` criterion.
        """
        matched = list(self)

        # only build/apply the name regex when a name was actually given;
        # compiling `None` would raise TypeError
        if name is not None:
            if isinstance(name, re.Pattern):
                flags = name.flags
                name = name.pattern
            else:
                flags = 0
            if exact_match:
                name = name if name.startswith(r'\A') else fr"\A{name}"
                name = name if name.endswith(r'\Z') else fr"{name}\Z"
            name_regexp = re.compile(name, flags=flags)
            matched = [entry for entry in matched if
                       name_regexp.search(entry.name) is not None]

        if sample_rate is not None:
            sample_rate = (sample_rate.value if
                           isinstance(sample_rate, units.Quantity) else
                           float(sample_rate))
            matched = [
                entry for entry in matched
                if entry.sample_rate is not None
                and entry.sample_rate.value == sample_rate
            ]

        if sample_range is not None:
            low, high = sample_range[0], sample_range[1]
            matched = [
                e for e in matched
                if e.sample_rate is not None
                and low <= e.sample_rate.value <= high
            ]

        for attr, val in others.items():
            if val is not None:
                matched = [
                    e for e in matched
                    if hasattr(e, attr) and getattr(e, attr) == val
                ]

        return self.__class__(matched)

    @classmethod
    def query(cls, name, kerberos=None, **kwargs):
        """Query the LIGO Channel Information System for a `ChannelList`.

        Parameters
        ----------
        name : `str`
            name of channel, or part of it.

        kerberos : `bool`, optional
            use an existing Kerberos ticket as the authentication credential,
            default behaviour will check for credentials and request username
            and password if none are found (`None`)

        kwargs
            other keyword arguments are passed directly to
            :func:`ciecplib.get`

        Returns
        -------
        channels : `ChannelList`
            a new list containing all `Channels <Channel>` found.
        """
        # imported here rather than at module level
        from .io import cis
        return cis.query(name, kerberos=kerberos, **kwargs)

    @classmethod
    def query_nds2(cls, names, host=None, port=None, connection=None,
                   type=io_nds2.Nds2ChannelType.any(), unique=False):
        """Query an NDS server for channel information

        Parameters
        ----------
        names : `list` of `str`
            names of requested channels
        host : `str`, optional
            name of NDS2 server.
        port : `int`, optional
            port number for NDS2 connection
        connection : `nds2.connection`
            open connection to use for query
        type : `str`, `int`
            NDS2 channel type with which to restrict query
        unique : `bool`, optional
            require a unique query result for each name given, default `False`

        Returns
        -------
        channellist : `ChannelList`
            list of `Channels <Channel>` with metadata retrieved from
            NDS2 server

        Raises
        ------
        ValueError
            if multiple channels are found for a given name and `unique=True`
            is given

        Notes
        -----
        .. warning::

           A `host` is required if an open `connection` is not given
        """
        ndschannels = io_nds2.find_channels(names, host=host, port=port,
                                            connection=connection, type=type,
                                            unique=unique)
        return cls(map(Channel.from_nds2, ndschannels))

    @classmethod
    @io_nds2.open_connection
    def query_nds2_availability(cls, channels, start, end, ctype=126,
                                connection=None, host=None, port=None):
        """Query for when data are available for these channels in NDS2

        Parameters
        ----------
        channels : `list`
            list of `Channel` or `str` for which to search

        start : `int`
            GPS start time of search, or any acceptable input to
            :meth:`~gwpy.time.to_gps`

        end : `int`
            GPS end time of search, or any acceptable input to
            :meth:`~gwpy.time.to_gps`

        ctype : `int`, optional
            NDS2 channel type passed as the ``type`` restriction when
            finding the channels, default ``126``

        connection : `nds2.connection`, optional
            open connection to an NDS(2) server, if not given, one will be
            created based on ``host`` and ``port`` keywords

        host : `str`, optional
            name of NDS server host

        port : `int`, optional
            port number for NDS connection

        Returns
        -------
        segdict : `~gwpy.segments.SegmentListDict`
            dict of ``(name, SegmentList)`` pairs
        """
        # widen to whole seconds: floor the start, ceil the end
        start = int(to_gps(start))
        end = int(ceil(to_gps(end)))
        chans = io_nds2.find_channels(channels, connection=connection,
                                      unique=True, epoch=(start, end),
                                      type=ctype)
        availability = io_nds2.get_availability(chans, start, end,
                                                connection=connection)
        # re-key the result by the channels exactly as the caller gave them
        return type(availability)(zip(channels, availability.values()))