UliEngineering/SignalProcessing/Resampling.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities for resampling signals and their timestamps
"""
import math
import functools
import numpy as np
import bisect
import concurrent.futures
import scipy.interpolate
from UliEngineering.Utils.Concurrency import QueuedThreadExecutor
from .Utils import LinRange

__all__ = ["resample_discard", "resampled_timespace",
           "parallel_resample", "signal_samplerate",
           "serial_resample"]

def signal_samplerate(t, ignore_percentile=10, mean_method=np.mean):
    """
    Compute the samplerate of a signal
    using a percentile-based method to exclude
    outliers in the time delta domain, then
    computing the samplerate as 1 / mean(time delta).

    Using a low ignore_percentile value is only
    desirable if the dataset is small and therefore
    does not average properly due to lack of samples.
    In most cases, a high ignore_percentile
    like 10 is recommended.
    
    Returns a float (samplerate) [1/s].

    If t is a LinRange() object, returns t.samplerate()

    Parameters
    ----------
    t : numpy array of datetime64 type (or LinRange)
        Timestamps associated with the signal
    ignore_percentile : number
        This percentile of outliers is ignored
        for the mean calculation at both the top
        and the bottom end.
        "5" means considering the 5th...95th percentile
        for averaging.
    mean_method : unary function
        Used to compute the mean after excluding outliers.
        Except for special usecases, arithmetic mean (np.mean)
        is recommended.
    """
    # Special rule for LinRange objects that have a defined samplerate
    if isinstance(t, LinRange):
        return t.samplerate()
    tdelta = np.diff(t)
    # Ignore time deltas outside the [ignore_percentile, 100 - ignore_percentile] band
    lower = np.percentile(tdelta, ignore_percentile)
    upper = np.percentile(tdelta, 100 - ignore_percentile)
    filtered = tdelta[np.logical_and(tdelta >= lower, tdelta <= upper)]
    # Fall back to the unfiltered deltas if the filter removed almost everything
    if len(filtered) < 0.1 * len(tdelta):
        filtered = tdelta
    mean_sample_period = mean_method(filtered)
    mean_sample_period = mean_sample_period.astype("timedelta64[ns]").astype(np.float64)
    return 1e9 / mean_sample_period  # 1e9: nanoseconds per second
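
# Illustrative usage sketch (hypothetical helper, not part of the library API):
# on a synthetic 1 kHz datetime64 timebase, signal_samplerate() should return
# approximately 1000.0. Assumes a NumPy version whose np.percentile supports
# timedelta64 arrays, as the implementation above requires.
def _example_signal_samplerate():
    # 1000 timestamps spaced 1 ms apart => 1 kHz samplerate
    t = np.datetime64("2024-01-01T00:00:00") + \
        np.arange(1000) * np.timedelta64(1, "ms")
    return signal_samplerate(t)  # ~1000.0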

def resample_discard(arr, divisor, ofs=0):
    """
    Resample with an integral divisor, discarding all other samples.
    Returns a view of the data.
    Very fast as this doesn't need to read the data.
    """
    return arr[ofs::divisor]
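
# Illustrative usage sketch (hypothetical helper): decimate by an integral
# factor of 10. The result is a strided view, so no data is copied and
# writing to it modifies the original array.
def _example_resample_discard():
    arr = np.arange(100)
    return resample_discard(arr, 10)  # array([ 0, 10, 20, ..., 90])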

def resampled_timespace(t, new_samplerate, assume_sorted=True, time_factor=1e6):
    """
    Compute the new timespace after resampling an input timestamp array
    (the input need not be lazy).

    Parameters
    ----------
    t : numpy array-like
        The source timestamps.
        If these are numbers, you must supply time_factor to
        specify the resolution of the numbers.
        If they are of dtype datetime64, time_factor is ignored.
    new_samplerate : float
        The new datarate in Hz
    assume_sorted : bool
        If this is True, the code assumes the source
        timestamp array is monotonically increasing, i.e.
        the lowest timestamp comes first and the highest last.
        If this is False, the code determines
        the min/max value by reading the entire array.
    time_factor : float
        Ignored if t is of dtype datetime64
        Defines what timestamps in the source (and result)
        array means. This is required to interpret new_samplerate.
        If time_factor=1e6, it means that a difference of 1.0
        in two timestamps means a difference of 1/1e6 seconds.
    
    Returns
    -------
    A LinRange() (acts like a numpy array but doesn't consume any memory)
    that represents the new timespace.
    """
    if len(t) == 0:
        raise ValueError("Empty time array given - can not perform any resampling")
    if len(t) == 1:
        raise ValueError("Time array has only one value - can not perform any resampling")
    # Handle numpy datetime64 input
    if "datetime64" in t.dtype.name:
        t = t.astype('datetime64[ns]').astype(np.float64)
        time_factor = 1e9
    # Compute time endpoints
    dst_tdelta = time_factor / new_samplerate
    startt, endt = (t[0], t[-1]) if assume_sorted else (np.min(t), np.max(t))
    src_tdelta = endt - startt
    if src_tdelta < dst_tdelta:
        raise ValueError("The source time span is smaller than a single destination sample period - can not perform resampling")
    # Use a lazy linrange to represent time interval
    return LinRange.range(startt, endt, dst_tdelta)
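
# Illustrative usage sketch (hypothetical helper, assumed values): compute the
# 50 Hz output timebase for a 10-second span of microsecond-resolution
# timestamps (time_factor=1e6). The result is a lazy LinRange with one entry
# every 1e6 / 50 = 20000 microseconds.
def _example_resampled_timespace():
    t = np.linspace(0.0, 10e6, 1000)  # 10 s, expressed in microseconds
    return resampled_timespace(t, 50.0, time_factor=1e6)  # ~500 entries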


def __parallel_resample_worker(torig, tnew, y, out, i, chunksize, ovp_size, prefilter, fitkind):
    # Slice the current chunk out of the target timebase.
    # i is the sample offset of the chunk in the output array.
    t_target = tnew[i:i + chunksize]
    # Find the corresponding time range in the source timebase
    srcstart = bisect.bisect_left(torig, t_target[0])
    srcend = bisect.bisect_right(torig, t_target[-1])
    # Compute start and end index with overprovisioning.
    # These might reach outside the source array, but max()
    # and slice clamping take care of that.
    srcstart_ovp = max(0, srcstart - ovp_size)  # Must not get negative indices
    srcend_ovp = srcend + ovp_size
    # Compute source slices
    tsrc_chunk = torig[srcstart_ovp:srcend_ovp]
    ysrc_chunk = y[srcstart_ovp:srcend_ovp]

    # Perform prefilter
    if prefilter is not None:
        tsrc_chunk, ysrc_chunk = prefilter(tsrc_chunk, ysrc_chunk)

    # Compute interpolating spline (might also be piecewise linear)...
    fit = scipy.interpolate.interp1d(tsrc_chunk, ysrc_chunk, kind=fitkind)
    # ... and evaluate it on the target chunk
    out[i:i + chunksize] = fit(t_target)


def serial_resample(t, y, new_samplerate, out=None, prefilter=None,
                      time_factor=1e6,
                      fitkind='linear', chunksize=10000,
                      overprovisioning_factor=0.01):
    """
    A resampler that uses scipy.interpolate.interp1d but splits the
    input into chunks that can be processed.
    The chunksize is applied to the output timebase.

    The input x array is assumed to be sorted, facilitating binary search.
    If the output array is not given, it is automatically allocated with the correct size.

    The chunk workers are executed in parallel in a concurrent.futures thread pool.

    In order to account for vector end effects, an overprovisioning factor
    can be provided so that a fraction of the chunksize is added at both ends of
    the source chunk.
    This
    A overprovisioning factor of 0.01 means that 1% of the chunksize is added on the left
    and 1% is added on the right. This does not affect leftmost and rightmost
    border of the input array.

    Returns the output array.

    Applies an optional prefilter to the input data while resampling. If the timebase of
    the input data is off significantly, this might produce unexpected results.
    The prefilter must be a reentrant functor that takes (t, x) data and returns
    a (t, x) tuple. The returned tuple can be of arbitrary size (assuming t and x
    have the same length) but its t range must include the t range that is being interpolated.
    Note that the prefilter is performed after overprovisioning, so setting a higher
    overprovisioning factor (see below) might help dealing with prefilters that
    return too small arrays, however at the start and the end of the input array,
    no overprovisioning values can be added.
    """
    # Lazily compute the new timebase
    new_t = resampled_timespace(t, new_samplerate, time_factor=time_factor)
    if out is None:
        out = np.zeros(len(new_t))
    ovp_size = int(math.floor(overprovisioning_factor * chunksize))
    # Process one chunk of the output timebase at a time.
    # i is the sample offset of the chunk, not the chunk index.
    for i in range(0, len(new_t), chunksize):
        __parallel_resample_worker(i=i, torig=t, tnew=new_t,
            y=y, out=out, chunksize=chunksize,
            ovp_size=ovp_size, prefilter=prefilter,
            fitkind=fitkind)

    return out
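
# Illustrative usage sketch (hypothetical helper, assumed values): downsample
# a 0.5 Hz sine wave sampled at 1 kHz on a microsecond timebase
# (time_factor=1e6) to 100 Hz using the default linear interpolation.
def _example_serial_resample():
    t = np.arange(100000) * 1000.0  # 100 s at 1 kHz, in microseconds
    y = np.sin(2 * np.pi * 0.5 * (t / 1e6))
    return serial_resample(t, y, 100.0, time_factor=1e6)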


def parallel_resample(t, y, new_samplerate, out=None, prefilter=None,
                      executor=None, time_factor=1e6,
                      fitkind='linear', chunksize=10000,
                      overprovisioning_factor=0.01):
    """
    Parallel variant of serial_resample().
    The chunk workers are executed in a concurrent.futures thread pool.
    If executor is None, a QueuedThreadExecutor is created.
    """
    # Lazily compute the new timebase
    new_t = resampled_timespace(t, new_samplerate, time_factor=time_factor)
    if out is None:
        out = np.zeros(len(new_t))
    if executor is None:
        executor = QueuedThreadExecutor()
    ovp_size = int(math.floor(overprovisioning_factor * chunksize))

    # Bind constant arguments
    f = functools.partial(__parallel_resample_worker, torig=t, tnew=new_t,
                          y=y, out=out, chunksize=chunksize,
                          ovp_size=ovp_size, prefilter=prefilter,
                          fitkind=fitkind)

    # Submit one worker per chunk of the output timebase.
    # i is the sample offset of the chunk, not the chunk index.
    futures = [executor.submit(f, i=i) for i in range(0, len(new_t), chunksize)]
    # Wait for futures to finish
    concurrent.futures.wait(futures)

    return out
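
# Illustrative usage sketch (hypothetical helper, assumed values): like the
# serial example above, but the chunks are interpolated by workers in a
# thread pool. A smaller chunksize yields more chunks and thus more
# opportunity for parallelism.
def _example_parallel_resample():
    t = np.arange(100000) * 1000.0  # 100 s at 1 kHz, in microseconds
    y = np.sin(2 * np.pi * 0.5 * (t / 1e6))
    return parallel_resample(t, y, 100.0, time_factor=1e6, chunksize=2000)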