ulikoehler/UliEngineering

View on GitHub
UliEngineering/SignalProcessing/Utils.py

Summary

Maintainability
A
3 hrs
Test Coverage
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Unsorted signal processing utilities
"""
import numpy as np
from toolz import functoolz
import numbers
import warnings
from .Selection import find_true_runs
from UliEngineering.EngineerIO import normalize_numeric

__all__ = ["remove_mean", "rms", "peak_to_peak", "unstair", "optimum_polyfit", "LinRange", "aggregate", "zero_crossings", "rms_to_peak_to_peak"]

_unstep_reduction_methods = {
    "left": lambda a: a[:, 0],
    "right": lambda a: a[:, 1],
    "middle": lambda a: np.sum(a, axis=1) // 2,
    "reduce": lambda a: a.flatten()
}

def remove_mean(arr):
    """
    Substract the DC signal component, i.e. the arithmetic mean of the array,
    from the array and return the modified array
    """
    return arr - np.mean(arr)

def rms(arr):
    """
    Compute the root-mean-square value of the given array
    """
    return np.sqrt(np.mean(np.square(arr)))

def rms_to_peak_to_peak(rms_val):
    """
    Given an RMS value, returns the peak to peak value
    of a sinusoid signal with that RMS value.
    """
    rms_val = normalize_numeric(rms_val)
    return rms_val * np.sqrt(2)

def peak_to_peak(arr):
    """
    Compute max(arr) - min(arr)
    """
    if arr is None or len(arr) == 0:
        # This causes numpy ValueError since some Numpy version
        return 0.
    return np.max(arr) - np.min(arr)

def unstair(x, y, method="diff", tolerance=1e-9):
    """
    Remove stairs (adjacent equal values) from a function with quantized values.
    The first and the last values are always returned.

    Currently available methods:
        - left: Use the leftmost value of every step
        - middle: Use shrinked ranges to return values in the middle of a step range
        - right: Use the rightmost value of every step
        - reduce: Return exactly the same function, but remove intermediate values below the tolerance.
                  This can be used for data reduction without actually changing the value.

    :param tolerance: The diff value allowed to assume a step.
                      The diff is compared to this on a per-sample basis.
    :return A tuple (x, y) with the corresponding new values.
    """
    # Separate steps and values which are not in a step
    stairs = np.abs(np.diff(y)) < tolerance
    normals = np.logical_not(stairs) # Values which are not in a step
    # Add first and last index. np.diff() returns idxs backshifted by one, so add 1 now
    normal_idxs = np.concatenate(([0, y.size - 1], normals.nonzero()[0] + 1))
    # Convert step sequences to ranges and remove some of their samples, depending on the method
    zero_runs = find_true_runs(stairs)
    zero_runs[:,1] += 1 # End index must not be inclusive
    stair_idxs = _unstep_reduction_methods[method](zero_runs)
    # Return all normal values, plus the reduced step values
    idxs = np.concatenate((normal_idxs, stair_idxs))
    # Postprocessing for special methods
    if method in ["right", "middle"]:
        idxs = np.setdiff1d(idxs, _unstep_reduction_methods["left"](zero_runs))
        # Add first index if not present
        if not (idxs == 0).any():
            idxs = np.concatenate(([0], idxs))
        if not (idxs == 0).any():
            idxs = np.concatenate((idxs, [y.size - 1]))
    else:
        idxs = np.unique(idxs)
    return x[idxs], y[idxs]


def optimum_polyfit(x, y, score=functoolz.compose(np.max, np.abs), max_degree=50, stop_at=1e-10):
    """
    Optimize the degree of a polyfit polynomial so that score(y - poly(x)) is minimized.

    :param max_degree: The maximum degree to try. LinAlgErrors are automatically ignored.
    :param stop_at: If a score lower than this is reached, the function returns early
    :param score: The score function that is applied to y - poly(x). Default: max deviation.
    :return A tuple (poly1d object, degree, score)
    """
    scores = np.empty(max_degree - 1, dtype=np.float64)
    # Ignore rank warnings now, but do not ignore for the final polynomial if not early returning
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', np.RankWarning)
        for deg in range(1, max_degree):
            # Set score to max float value
            try:
                poly = np.poly1d(np.polyfit(x, y, deg))
            except np.linalg.LinAlgError:
                scores[deg - 1] = np.finfo(np.float64).max
                continue
            scores[deg - 1] = score(y - poly(x))
            # Early return if we found a polynomial that is good enough
            if scores[deg - 1] <= stop_at:
                return poly, deg, scores[deg - 1]
    # Find minimum score
    deg = np.argmin(scores) + 1
    # Compute polyfit for that degreet
    poly = np.poly1d(np.polyfit(x, y, deg))
    return poly, deg, np.min(scores)


class LinRange(object):
    """
    Combines the properties of numpy.linspace and Python3's range by providing
    a floating-point capable lazy range generator that does not keep the entire array
    in memory (but calculates slices on the fly.

    Use [:] or any other slice to obtain a numpy array. dtype is used as a wrapper function.
    Requires the use of numpy dtypes.

    Behaves like np.linspace. Use .view() to obtain a LinRange slice.
    """
    def __init__(self, start, stop, n, endpoint=True, dtype=float):
        "Create a new LinRange object using a numpy.linspace-like constructor"
        self.start = start
        self.stop = stop
        n = int(n)
        self.endpoint = endpoint
        self.step = (stop - start) / (n - 1 if endpoint else n)
        self.size = n
        self.dtype = dtype

    @staticmethod
    def range(start, stop, step):
        "Create a new LinRange object using a range()-like constructor"
        return LinRange(start, stop, int((stop - start) / step))

    def __len__(self):
        return self.size

    @property
    def mid(self):
        "Return the middle of the current interval as a floating point value"
        return self.dtype((self.start + self.stop) / 2.)

    @property
    def shape(self):
        return (self.size,)

    def astype(self, typearg):
        """
        Return self copy with a different data type
        """
        return LinRange(self.start, self.stop, self.size, dtype=typearg)

    def copy(self):
        """
        Return self as a numpy array.
        Designed to be compatible with numpy objects
        """
        return np.linspace(self.start, self.stop, self.size,
                           endpoint=self.endpoint, dtype=self.dtype)

    def view(self, start=None, stop=None, step=None):
        """
        Return a LinSpace view of self
        """
        istart, istop, istep = slice(start, stop, step).indices(self.size)
        return LinRange(self[istart], self[istop], (istop - istart) // istep, endpoint=False)

    def __getitem__(self, key):
        """
        Get:
            - A numpy linrange slice
        """
        if isinstance(key, slice):
            istart, istop, istep = key.indices(self.size)
            return np.linspace(self[istart], self[istop], (istop - istart) // istep,
                               endpoint=False, dtype=self.dtype)
        elif isinstance(key, numbers.Number):
            if key < 0:
                key = len(self) + key  # NOTE: Key is negative, so result is < len(self)!
            val = self.start + self.step * key
            # Convert to dtype
            if self.dtype == float:
                return val
            else:
                # TODO find better method, maybe using a scalar
                return np.asarray([val]).astype(self.dtype)[0]
        else:
            raise TypeError("Invalid argument type for slicing: {0}".format(type(key)))

    def __dtype_name(self):
        if hasattr(self.dtype, "__qualname__"):
            return self.dtype.__qualname__
        else:
            return str(self.dtype)

    def __repr__(self):
        return "LinRange({}, {}, {}{})".format(
            self.start,
            self.stop,
            str(self.step) if type(self.step) == np.timedelta64 else self.step,
            "" if self.dtype == float else ", dtype={}".format(self.__dtype_name())
            )

    def __eq__(self, other):
        return self.start == other.start and self.stop == other.stop and self.step == other.step

    def samplerate(self):
        """
        Returns the samplerate as float (unit: Hz).
        This works especially if step is a np.timedelta64 object.
        Else "step" is assumed to be a <seconds> value
        """
        if type(self.step) == np.timedelta64:
            ns = self.step.astype("timedelta64[ns]").astype(int)
            s = ns*1e-9
            return 1./s
        else:
            return 1./self.step

def aggregate(gen):
    """
    Takes any iterable and aggregates subsequent values
    yielded by the iterable into a single value with a counter.

    Yields (value, count) pairs

    Will NOT handle None values correctly
    """
    current = None
    cnt = 0
    for item in gen:
        if item == current:
            cnt += 1
        else:
            if current is not None:
                yield (current, cnt)
            current = item
            cnt = 1
    # Yield remaining item, if any
    if current is not None:
        yield (current, cnt)

def zero_crossings(data):
    """
    Compute indexes in the given data array just before a zero crossing occurs.
    A zero crossing at index i is defined as:
        - data[i] is positive
        - data[i + 1] exists and is negative
    or:
        - data[i] is negative
        - data[i + 1] exists and is positive.

    Returns an 1D numpy array with indexes.

    Thanks to Jim Brissom on Stack Overflow for this solution:
    https://stackoverflow.com/a/3843124/2597135
    """
    return np.where(np.diff(np.sign(data)))[0]