neuropsychology/NeuroKit

View on GitHub
neurokit2/ppg/ppg_findpeaks.py

Summary

Maintainability
A
0 mins
Test Coverage
# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np
import scipy.signal

from ..signal import signal_smooth


def ppg_findpeaks(ppg_cleaned, sampling_rate=1000, method="elgendi", show=False):
    """**Find systolic peaks in a photoplethysmogram (PPG) signal**

    Parameters
    ----------
    ppg_cleaned : Union[list, np.array, pd.Series]
        The cleaned PPG channel as returned by :func:`.ppg_clean`.
    sampling_rate : int
        The sampling frequency of the PPG (in Hz, i.e., samples/second). The default is 1000.
    method : str
        The processing pipeline to apply. Can be one of ``"elgendi"``. The default is ``"elgendi"``.
    show : bool
        If ``True``, returns a plot of the thresholds used during peak detection. Useful for
        debugging. The default is ``False``.

    Returns
    -------
    info : dict
        A dictionary containing additional information, in this case the samples at which systolic
        peaks occur, accessible with the key ``"PPG_Peaks"``.

    See Also
    --------
    ppg_simulate, ppg_clean

    Examples
    --------
    .. ipython:: python

      import neurokit2 as nk
      import matplotlib.pyplot as plt

      ppg = nk.ppg_simulate(heart_rate=75, duration=30)
      ppg_clean = nk.ppg_clean(ppg)
      info = nk.ppg_findpeaks(ppg_clean)
      peaks = info["PPG_Peaks"]

      @savefig p_ppg_findpeaks1.png scale=100%
      plt.plot(ppg, label="raw PPG")
      plt.plot(ppg_clean, label="clean PPG")
      plt.scatter(peaks, ppg[peaks], c="r", label="systolic peaks")
      plt.legend()
      @suppress
      plt.close()


    References
    ----------
    * Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in
      Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions.
      PLoS ONE 8(10): e76585. doi:10.1371/journal.pone.0076585.

    """
    method = method.lower()
    if method in ["elgendi"]:
        peaks = _ppg_findpeaks_elgendi(ppg_cleaned, sampling_rate, show=show)
    else:
        raise ValueError("`method` not found. Must be one of the following: 'elgendi'.")

    # Prepare output.
    info = {"PPG_Peaks": peaks}

    return info


def _ppg_findpeaks_elgendi(
    signal, sampling_rate=1000, peakwindow=0.111, beatwindow=0.667, beatoffset=0.02, mindelay=0.3, show=False
):
    """Implementation of Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in
    Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions. PLoS ONE 8(10): e76585.
    doi:10.1371/journal.pone.0076585.

    All tune-able parameters are specified as keyword arguments. `signal` must be the bandpass-filtered raw PPG
    with a lowcut of .5 Hz, a highcut of 8 Hz.

    """
    if show:
        __, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True)
        ax0.plot(signal, label="filtered")

    # Ignore the samples with negative amplitudes and square the samples with
    # values larger than zero.
    signal_abs = signal.copy()
    signal_abs[signal_abs < 0] = 0
    sqrd = signal_abs ** 2

    # Compute the thresholds for peak detection. Call with show=True in order
    # to visualize thresholds.
    ma_peak_kernel = int(np.rint(peakwindow * sampling_rate))
    ma_peak = signal_smooth(sqrd, kernel="boxcar", size=ma_peak_kernel)

    ma_beat_kernel = int(np.rint(beatwindow * sampling_rate))
    ma_beat = signal_smooth(sqrd, kernel="boxcar", size=ma_beat_kernel)

    thr1 = ma_beat + beatoffset * np.mean(sqrd)  # threshold 1

    if show:
        ax1.plot(sqrd, label="squared")
        ax1.plot(thr1, label="threshold")
        ax1.legend(loc="upper right")

    # Identify start and end of PPG waves.
    waves = ma_peak > thr1
    beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0]
    end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0]
    # Throw out wave-ends that precede first wave-start.
    end_waves = end_waves[end_waves > beg_waves[0]]

    # Identify systolic peaks within waves (ignore waves that are too short).
    num_waves = min(beg_waves.size, end_waves.size)
    min_len = int(np.rint(peakwindow * sampling_rate))  # this is threshold 2 in the paper
    min_delay = int(np.rint(mindelay * sampling_rate))
    peaks = [0]

    for i in range(num_waves):

        beg = beg_waves[i]
        end = end_waves[i]
        len_wave = end - beg

        if len_wave < min_len:
            continue

        # Visualize wave span.
        if show:
            ax1.axvspan(beg, end, facecolor="m", alpha=0.5)

        # Find local maxima and their prominence within wave span.
        data = signal[beg:end]
        locmax, props = scipy.signal.find_peaks(data, prominence=(None, None))

        if locmax.size > 0:
            # Identify most prominent local maximum.
            peak = beg + locmax[np.argmax(props["prominences"])]
            # Enforce minimum delay between peaks.
            if peak - peaks[-1] > min_delay:
                peaks.append(peak)

    peaks.pop(0)

    if show:
        ax0.scatter(peaks, signal_abs[peaks], c="r")

    peaks = np.asarray(peaks).astype(int)
    return peaks