ulikoehler/UliEngineering

View on GitHub
UliEngineering/SignalProcessing/Selection.py

Summary

Maintainability
C
7 hrs
Test Coverage
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilities for selecting and finding specific attributes in datasets
"""
import numpy as np
import datetime
import numbers
import functools
from toolz import functoolz
from bisect import bisect_left, bisect_right
import collections

__all__ = ["select_by_datetime", "find_sorted_extrema",
           "select_by_threshold", "find_true_runs", "find_false_runs", "filter_runs",
           "runs_ignore_borders", "shrink_ranges", "IntInterval",
           "random_slice", "find_nearest_idx", "resample_discard",
           "GeneratorCounter", "majority_vote_all", "majority_vote",
           "extract_by_reference", "select_ranges",
           "sorted_range_indices", "multiselect", "find_closest_index"]

# Define interval class and override to obtain operator overridability
__Interval = collections.namedtuple("Interval", ["start", "end"])


class IntInterval(__Interval):
    """
    Tuple-like type that represents an integral interval (or a slice) inside an
    integral space.

    This class:
        - allows easy ofsetting by e.g. adding a scalar and
    other convenience operations.
        - overrides __call__() for convenient slicing.
        - override __len__() for size determination
        - override __mul__ in a center-preserving manner

    Multiplying preserves the center of the interval (might be offset by one
        due to integral properties).
    Multiplying by 1.0 does not perform any change. Multiplication by 0.5
    halves the interval while multiplication by 4 quadruples its size.
    Multiplication doe not use 0-bounded arithmetic to allow multiply-then-add
    offsetting. If the multiplication factor is less than 1 but > 0,
    the interval will maintain a size of at least one.

    Multiplication by zero results in a zero-sized interval.
    """
    @staticmethod
    def from_ranges(ranges):
        """
        Create a list of IntInterval instances from a range array

        Parameters
        ----------
        ranges : array_like
            A (n,2) array containing instances
        """
        return [IntInterval(r[0], r[1]) for r in ranges]

    @staticmethod
    def to_ranges(intervals):
        """
        Converts a list of IntInterval instances to a 2d range array,
        like the one returned by find_true_runs(),
        """
        return np.asarray([(interval[0], interval[1]) for interval in intervals])


    def __radd__(self, i):
        if not isinstance(i, numbers.Integral):
            raise TypeError("Can only add integers to an interval")
        return IntInterval(self.start + i, self.end + i)

    def __add__(self, i):
        return self.__radd__(i)

    def __rsub__(self, i):
        if not isinstance(i, numbers.Integral):
            raise TypeError("Can only substract integers from an interval")
        return IntInterval(i - self.start, i - self.end)

    def __sub__(self, i):
        if not isinstance(i, numbers.Integral):
            raise TypeError("Can only substract integers from an interval")
        return IntInterval(self.start - i, self.end - i)

    def __call__(self, *args):
        if not args:
            raise TypeError("Use one or multiple arrays to produce a slice of them")
        if len(args) == 1:
            return args[0][self.start:self.end]
        return tuple([arr[self.start:self.end] for arr in args])

    def __len__(self):
        return self.end - self.start

    def __mul__(self, n):
        if not isinstance(n, numbers.Number):
            raise TypeError("Intervals can only be multiplied by numbers")
        if n == 1:
            return self
        elif n == 0:  # Return size-0 interval
            center = (self.end + self.start) // 2
            return IntInterval(center, center)
        elif n < 1:  # Shrink
            # Compute what to remove at each end
            toRemove = int(round(len(self) * (1.0 - n) / 2))
            return IntInterval(self.start + toRemove, self.end - toRemove)
        else:  # n > 1: Expand
            # Compute what to remove at each end
            toAdd = int(round(len(self) * (n - 1.0) / 2))
            return IntInterval(self.start - toAdd, self.end + toAdd)

    def __rmul__(self, n):
        return self.__mul__(n)

    def __truediv__(self, n):
        if not isinstance(n, numbers.Number):
            raise TypeError("Intervals can only be divided by numbers")
        return self.__mul__(1.0 / n)


def select_by_datetime(timestamps, time, factor=1.0, around=None, ofs=0.0, side="left"):
    """
    Find the index in a timestamp array that is closest to a given value.
    The timestamp array expected to be sorted in ascending order. It is
    expected to contain Epoch timestamps as either integral or floating-point type.

    This function does perform any copying or modification of the array, so it is suitable
    for huge mmapped arrays.

    Depending on the scale of the timestamps, the factor argument needs to be adjusted
    accordingly. For example, 1e3 means that a difference of 1.0 in two timestamps means
    a difference of 1.0 ms. A factor of 1e6 means that a difference
    of 1.0 in two timestamp values means a difference of 1.0 µs. The default factor of 1.0
    means that 1.0 difference in two timestamps means a difference of 1.0 second and therefore
    is compatible with datetime.timestamp() values

    time may be either a datetime argument or a string in the %Y-%m-%d %H:%M:%S format.
    If the time string contains a decimal dot, the %Y-%m-%d %H:%M:%S.%f format, which
    allows specification up to microseconds, is used.

    Optionally, an offset value may be given that is substracted from the
    calculated timestamp before applying the factor.
    When using a timestamp offset of 1e9 with a factor of 1.0, for example,
    the value in the timestamp array for a given date is expected to be 1e9 less
    than the epoch time.

    This function uses bisect_left internally. Refer to its documentation
    for exact behavioural description.

    If around is None, returns the idx of the closest match.
    Else, returns (idx - around, idx + around), i.e. with idx right in the middle.
    This allows easy generation of ranges centered around a specific value.
    """
    if isinstance(time, str):
        if "." in time:
            time = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S.%f")
        else:  # No microsecond part
            time = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
    bisect_func = {"left": bisect_left, "right": bisect_right}[side]
    idx = bisect_func(timestamps, (time.timestamp() - ofs) * factor)
    if around is None:
        return idx
    else:  # Return range
        return IntInterval(idx - around, idx + around)

def sorted_range_indices(arr, low, high):
    """
    Compute (startidx, endidx) for a given sorted array for a given low, high range
    so that all x in arr[startidx:endidx] is within (low, high)

    Commonly used for selecting frequency ranges from an FFT frequency.
    """
    startidx = np.searchsorted(arr >= low, True) if low is not None else None
    endidx = np.searchsorted(arr >= high, True) if high is not None else None
    return (startidx, endidx)

def __mapAndSortIndices(x, y, idxs, sort_descending=True):
    """
    Map a list of indices to a y array and sort it by y.
    This is used in multiple selectByX() functions.
    """
    xvals = x[idxs]
    yvals = y[idxs]
    idxs = np.argsort(yvals)
    if sort_descending:
        idxs = np.flipud(idxs)
    # Copy x/y values to new array
    return np.column_stack((xvals[idxs], yvals[idxs]))

def _check_extrema_comparator(comparator):
    """
    Raise if the given comparator is neither np.greater nor np.less
    """
    if comparator != np.greater and comparator != np.less:
        raise ValueError("Comparator may only be np.greater or np.less")


try:
    import scipy.signal
    def find_sorted_extrema(x, y, comparator=np.greater, order=1, mode='clip'):
        """
        Find extrema using the given method and parameters, order them by y value and
        return a (n, 2)-shaped array that contains (for each extremum 0..n-1) the
        x and y value, with the 1st dimension being sorted in descending order.

        The comparator may be 

        This means that ret[0] contains the x, y coordinate of the most significant extremum
        (where the significancy is determined by the comparator)

        Parameters
        ----------
        mode : string
            How the edges of the vector are treated.
            Either 'clip', 'raise' or 'wrap',
            see numpy.take for more details
        comparator:
            Either np.greater or np.less.
            np.greater => Find maxima
            np.less => Find minima
        """
        _check_extrema_comparator(comparator)
        # Determine extrema and x/y values at those indices
        extrema = scipy.signal.argrelextrema(y, comparator, 0, order, mode)[0]
        return __mapAndSortIndices(x, y, extrema, comparator == np.greater)
except ModuleNotFoundError:
    def find_sorted_extrema(*args, **kwargs):
        raise NotImplementedError("You need to install scipy to use find_sorted_extrema()!")

def select_by_threshold(fx, fy, thresh, comparator=np.greater):
    """
    Select values where a specific absolute threshold applies
    Returns a (n, 2)-shaped array where
    ret[i] = (x, y) contains the x and y values
    and the array is sorted in descending order by absolute y values.
    """
    _check_extrema_comparator(comparator)
    idxs = np.where(comparator(fy, thresh))
    return __mapAndSortIndices(fx, fy, idxs, comparator == np.greater)

def find_true_runs(arr):
    """
    Find runs of True values in a boolean array.
    This function is not intended to be used with arrays other than Booleans.
    The end element of the ranges is inclusive.
    Return a (n, 2)-shaped array where the 2nd dimension contain start and end indices.
    """
    # Ensure the ends don't cause issues and we are calculating in int-space
    oneZero = np.zeros(1)
    cated = np.concatenate((oneZero, arr, oneZero)).view(dtype=float)
    diffs = np.diff(cated)
    starts = np.nonzero(diffs >= 1)[0]
    ends = np.nonzero(diffs <= -1)[0] - 1
    return np.vstack((starts, ends)).T

def find_false_runs(arr):
    """Alias for find_true_runs(np.logical_not(arr))"""
    return find_true_runs(np.logical_not(arr))

def runs_ignore_borders(runs, size=-1, ignore_start=True, ignore_end=True):
    """
    Ignore the first and/or the last run if they start at 0 or end at the array size respectively.
    
    Parameters
    ----------
    runs : array_like
        A (n,2) array such as returned by find_true_runs()
    size : int
        The size of the original array. Must be set appropriately when the end range shall be removed
    ignore_start : bool
        If true, the first run will be ignored if its start index is 0
    ignore_end : bool
        If true, the first run will be ignored if its start index is 0.
        The size parameter must be set correctly so it can be recognized
        if the last run ends at the array end.
        If size=-1, ignore_end is forced to False
    """
    startidx = 1 if (runs.size and ignore_start and runs[0,0] == 0) else None
    endidx = -1 if (runs.size and ignore_end and runs[-1,1] == size) else None
    return runs[startidx:endidx]


def __run_size_filter(minsize, maxsize):
    """Return a run filter function that checks >= minsize && <= maxsize."""
    def _filt(run):
        delta = run[1] - run[0] # It is assumed it's a correct run, i.e. run.size == 2 && b > a
        return delta >= minsize and delta <= maxsize
    return _filt

def filter_runs(runs, minsize=2, maxsize=np.inf):
    """
    Given a (n,2) array such as returned by findTrueRuns(), returns a new
    (n-x,2) run list that contains only runs at least minSize in size and
    """
    return np.asarray(list(filter(__run_size_filter(minsize, maxsize), runs)))

def __select_y(ranges, y, selector):
    """maxy selector for shrink_ranges"""
    return np.asarray([start + selector(y[start:end + 1]) for start, end in ranges])

__shrinkRangeMethodLUT = {
    "min": lambda arr: arr[:, 0],
    "max": lambda arr: arr[:, 1],
    "middle": lambda arr: (arr[:, 0] + arr[:, 1]) // 2,
    # Special
    "miny": functools.partial(__select_y, selector=np.argmin),
    "maxy": functools.partial(__select_y, selector=np.argmax),
}

def shrink_ranges(ranges, method="middle", **kwargs):
    """
    Take a (n, 2)-shaped range list like the one returned by find_true_runs()
    and shrink the ranges so they are only 1 wide. Returns a (n)-shaped array of indices.

    Currently supported shrinking methods are:
        - min: Selects the first index of the slice
        - max: Selects the last index of the slice
        - middle: Selects the index (start+end) / 2. y may be None.

    There also some special reducers that require additional kwargs
        - maxy Requires y=array_like kwarg. Selects the index where y is maximal in the range.
        - miny Requires y=array_like kwarg. Selects the index where y is minimal in the range.

    """
    return __shrinkRangeMethodLUT[method](ranges, **kwargs)


def select_ranges(ranges, arr):
    """
    Apply a range array like the one returned by find_true_runs().
    Yields each array slice in order

    Parameters
    ----------
    ranges : array_like
        The (n, 2) range array
    arr : array_like
        The array to apply the ranges to
    """
    for _range in ranges:
        yield arr[_range[0]:_range[1]]



def ranges_to_IntInterva(ranges):
    """
    Convert a 2d range array, like the one returned by find_true_runs(),
    to a list of int ranges).
    """


def find_nearest_idx(arr, v):
    """
    Find the index in the array which refers to the value with the least
    absolute difference from v.
    """
    # Original idea by unutbu @SO: http://stackoverflow.com/a/2566508/2597135
    return (np.abs(arr - v)).argmin()


def random_slice(arr, size):
    """
    Select a uniformly random slice of exactly a given size from the given array.

    Array may be a 1D numpy array or an integral which represents the array size.

    Returns an IntInterval instance or raise if the array is not large enough
    """
    if isinstance(arr, numbers.Integral):
        alen = arr
    else:  # Assume numpy-like
        alen = arr.shape[0]
    if alen < size:
        msg = "Array of size {0} is not large enough to hold interval of size {1}"\
              .format(alen, size)
        raise ValueError(msg)
    elif alen == size:
        return IntInterval(0, alen)
    r = np.random.randint(0, alen - size)
    return IntInterval(r, r + size)


class GeneratorCounter(object):
    """
    Utility class that provides zero-overhead counting for generators.
    At any point in time, len(...) of this class provides the number of
    items iterated so far

    Usage example:

        mycountinggen = CountingTee(mygen)
        myfunc(mycountinggen) # Same result as myfunc(mygen)
        print(len(mycountinggen))
    """
    def __init__(self, gen):
        self.gen = gen
        self.count = 0
        self.iter = self.gen.__iter__()

    def reiter(self, reset_count=False):
        """
        Attempts to restart iterating over the generator.
        This will work properly for lists, for example.

        Set reset_count to True to also reset the internal counter
        """
        if reset_count:
            self.count = 0
        self.iter = self.gen.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        result = next(self.iter)  # raises if no next value
        self.count += 1
        return result

    def __len__(self):
        return self.count


def majority_vote_all(arr, return_absolute=False):
    """
    Perform a majority selection on the value in an array.
    The values are required to be quantized, i.e. if no values are equal
    but only close together, this method won't work (try using a KDE-based method)
    This algorithm is fast and works for huge datasets, however,
    and is well suited, for example, for analyzing FFT outputs
    (as FFT generates quantized outputs).

    majority_vote_all() return a list [(v, f)] which is ordered
    by descending commonness (equally common values are not ordered).
    f is the fraction of the total size of arr.
    If return_absolute == True, f is an absolute frequency.

    arr maybe any array-like structure or generator.
    """
    c = collections.Counter()
    gc = GeneratorCounter(arr)
    c.update(gc)
    most_common = c.most_common()
    if return_absolute:
        return most_common
    return [(rec[0], rec[1] / len(gc)) for rec in most_common]


def majority_vote(arr):
    """
    Wrapper for majority_vote_all() that only returns the most common value.
    The frequency of the value is ignored. None is returned if arr is empty.
    """
    mv = majority_vote_all(arr)
    return mv[0][0] if mv else None


def resample_discard(arr, divisor, ofs=0):
    """
    Resample with an integral divisor, discarding all other samples
    """
    return arr[ofs::divisor]


def extract_by_reference(fx, fy, ref):
    """
    When ref is an array of arbitrary timestamp-like values,
    extracts a range from fx, fy so that the range represented by
    ref matches the range represented by the return value as close as possible.

    Basically, when you have a time range, this function selects the same
    time range from another dataset, even if fx, fy and ref have different
    time resolution and time shifts.

    Returns (fx_new, fy_ne) so that fx_new[0] <= ref[0] and
    fx_new[-1] >= ref[-1] unless fx is too small.

    It is required that fx is sorted and ref[0] > ref[-1].
    A tuple (start, end) can be used instead of ref
    """
    start, end = ref[0], ref[-1]
    idx1 = bisect_left(fx, start)
    idx2 = bisect_left(fx, end)
    return fx[idx1:idx2], fy[idx1:idx2]


def multiselect(lst, indices, convert=functoolz.identity):
    """
    Creates a new list from a list-like object, selecting only the indices
    in the index list, in the specified order.

    This works like numpy indexing with an index array.

    Parameters
    ----------
    lst : object with [] operator
        An object where all indices as listed in indices
        can be used
    indices : iterable
        The indices to use for selecting from lst.
    convert : unary function
        This function is called on indices[i] for every iteration.
        This can be e.g. set to tuple if lst accepts tuple indices
        but the indices argument consists of lists.

    Examples
    --------
    >>> multiselect([1,2,3,4,5,6], [3,1,5])
    [4, 2, 6]
    """
    return [lst[convert(idx)] for idx in indices]


def find_closest_index(frequencies, frequency):
    """
    Find the closest frequency bin in an array of frequencies
    and return its index in the frequency array.
    """
    return np.argmin(np.abs(frequencies - frequency))