neurokit2/ecg/ecg_findpeaks.py
# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.ndimage
import scipy.signal
import scipy.stats
from ..signal import (
signal_findpeaks,
signal_plot,
signal_sanitize,
signal_smooth,
signal_zerocrossings,
)
def ecg_findpeaks(ecg_cleaned, sampling_rate=1000, method="neurokit", show=False, **kwargs):
"""**Locate R-peaks**
    Low-level function used by :func:`ecg_peaks` to identify R-peaks in an ECG signal using one
    of several algorithms. Use the main function and see its documentation for details.
Parameters
----------
ecg_cleaned : Union[list, np.array, pd.Series]
See :func:`ecg_peaks()`.
sampling_rate : int
See :func:`ecg_peaks()`.
method : string
See :func:`ecg_peaks()`.
show : bool
        If ``True``, will display a plot visualizing the thresholds used in the algorithm.
        Useful for debugging.
    **kwargs
        Additional keyword arguments, usually specific to each ``method``.
Returns
-------
info : dict
A dictionary containing additional information, in this case the
samples at which R-peaks occur, accessible with the key ``"ECG_R_Peaks"``.
See Also
--------
ecg_peaks, .signal_fixpeaks
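
    Examples
    --------
    A minimal usage sketch (assuming ``neurokit2`` is installed and imported as ``nk``; the
    simulated signal below is only for illustration)::

        import neurokit2 as nk

        ecg = nk.ecg_simulate(duration=10, sampling_rate=1000)
        cleaned = nk.ecg_clean(ecg, sampling_rate=1000)
        info = nk.ecg_findpeaks(cleaned, sampling_rate=1000, method="neurokit")
        info["ECG_R_Peaks"]  # sample indices of the detected R-peaks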
"""
# Try retrieving right column
if isinstance(ecg_cleaned, pd.DataFrame):
try:
ecg_cleaned = ecg_cleaned["ECG_Clean"]
except (NameError, KeyError):
try:
ecg_cleaned = ecg_cleaned["ECG_Raw"]
except (NameError, KeyError):
ecg_cleaned = ecg_cleaned["ECG"]
# Sanitize input
ecg_cleaned = signal_sanitize(ecg_cleaned)
method = method.lower() # remove capitalised letters
# Run peak detection algorithm
try:
func = _ecg_findpeaks_findmethod(method)
rpeaks = func(ecg_cleaned, sampling_rate=sampling_rate, show=show, **kwargs)
except ValueError as error:
raise error
# Prepare output.
info = {"ECG_R_Peaks": rpeaks}
return info
# Returns the peak detector function by name
def _ecg_findpeaks_findmethod(method):
if method in ["nk", "nk2", "neurokit", "neurokit2"]:
return _ecg_findpeaks_neurokit
elif method in ["pantompkins", "pantompkins1985"]:
return _ecg_findpeaks_pantompkins
elif method in ["nabian", "nabian2018"]:
return _ecg_findpeaks_nabian2018
elif method in ["gamboa2008", "gamboa"]:
return _ecg_findpeaks_gamboa
elif method in ["ssf", "slopesumfunction", "zong", "zong2003"]:
return _ecg_findpeaks_ssf
elif method in ["hamilton", "hamilton2002"]:
return _ecg_findpeaks_hamilton
elif method in ["christov", "christov2004"]:
return _ecg_findpeaks_christov
elif method in ["engzee", "engzee2012", "engzeemod", "engzeemod2012"]:
return _ecg_findpeaks_engzee
elif method in ["elgendi", "elgendi2010"]:
return _ecg_findpeaks_elgendi
elif method in ["kalidas2017", "swt", "kalidas", "kalidastamil", "kalidastamil2017"]:
return _ecg_findpeaks_kalidas
elif method in ["martinez2003", "martinez"]:
return _ecg_findpeaks_WT
elif method in ["rodrigues2020", "rodrigues2021", "rodrigues", "asi"]:
return _ecg_findpeaks_rodrigues
elif method in ["promac", "all"]:
return _ecg_findpeaks_promac
else:
raise ValueError(f"NeuroKit error: ecg_findpeaks(): '{method}' not implemented.")
# =============================================================================
# Probabilistic Methods-Agreement via Convolution (ProMAC)
# =============================================================================
def _ecg_findpeaks_promac(
signal,
sampling_rate=1000,
show=False,
promac_methods=[
"neurokit",
"gamboa",
"ssf",
"engzee",
"elgendi",
"kalidas",
"martinez",
"rodrigues",
],
threshold=0.33,
gaussian_sd=100,
**kwargs,
):
"""Probabilistic Methods-Agreement via Convolution (ProMAC).
    ProMAC combines the result of several R-peak detectors in a probabilistic way. For a given
    peak detector, the binary signal representing the peak locations is convolved with a Gaussian
    distribution, resulting in a probabilistic representation of each peak location. This
    procedure is repeated for all selected ``promac_methods`` and the resulting signals are
    accumulated. Finally, a threshold is used to accept or reject the peak locations.
See this discussion for more information on the origins of the method:
https://github.com/neuropsychology/NeuroKit/issues/222
Parameters
----------
signal : Union[list, np.array, pd.Series]
The (cleaned) ECG channel, e.g. as returned by `ecg_clean()`.
sampling_rate : int
        The sampling frequency of ``signal`` (in Hz, i.e., samples/second).
        Defaults to 1000.
show : bool
        If True, will display a plot visualizing the thresholds used in the algorithm.
Useful for debugging.
promac_methods : list of string
The algorithms to be used for R-peak detection. See the list of acceptable algorithms for
the 'ecg_peaks' function.
    threshold : float
        The tolerance for peak acceptance. This value is a proportion of the signal's maximum
        value. Only peaks above this tolerance are ultimately kept as actual peaks.
    gaussian_sd : int
        The standard deviation of the Gaussian distribution used to represent the peak location
        probability. This value should be in milliseconds and is usually taken as the size of
        QRS complexes.
Returns
-------
rpeaks : list of int
A list of array positions at which R-peaks occur.
"""
x = np.zeros(len(signal))
promac_methods = [method.lower() for method in promac_methods] # remove capitalised letters
error_list = [] # Stores the failed methods
for method in promac_methods:
try:
func = _ecg_findpeaks_findmethod(method)
x = _ecg_findpeaks_promac_addconvolve(
signal, sampling_rate, x, func, gaussian_sd=gaussian_sd, **kwargs
)
except ValueError:
error_list.append(f"Method '{method}' is not valid.")
except Exception as error:
error_list.append(f"{method} error: {error}")
# Rescale
x = x / np.max(x)
convoluted = x.copy()
# Remove below threshold
x[x < threshold] = 0
# Find peaks
peaks = signal_findpeaks(x, height_min=threshold)["Peaks"]
if show is True:
signal_plot(pd.DataFrame({"ECG": signal, "Convoluted": convoluted}), standardize=True)
[
plt.axvline(x=peak, color="red", linestyle="--") for peak in peaks
] # pylint: disable=W0106
    # Not sure whether printing is the best way to report failed methods
    if error_list:  # only report if some methods failed
        print(error_list)
return peaks
# _ecg_findpeaks_promac_addmethod + _ecg_findpeaks_promac_convolve
# Joining them makes parameters exposition more consistent
def _ecg_findpeaks_promac_addconvolve(signal, sampling_rate, x, fun, gaussian_sd=100, **kwargs):
peaks = fun(signal, sampling_rate=sampling_rate, **kwargs)
mask = np.zeros(len(signal))
mask[peaks] = 1
    # SD is defined as a typical QRS size, which for adults is about 100 ms
sd = sampling_rate * gaussian_sd / 1000
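    # Build a Gaussian kernel spanning roughly +/- 4 SD around each peak location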
shape = scipy.stats.norm.pdf(np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd)
x += np.convolve(mask, shape, "same")
return x
# =============================================================================
# NeuroKit
# =============================================================================
def _ecg_findpeaks_neurokit(
signal,
sampling_rate=1000,
smoothwindow=0.1,
avgwindow=0.75,
gradthreshweight=1.5,
minlenweight=0.4,
mindelay=0.3,
show=False,
):
"""All tune-able parameters are specified as keyword arguments.
The `signal` must be the highpass-filtered raw ECG with a lowcut of .5 Hz.
"""
if show is True:
__, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True)
# Compute the ECG's gradient as well as the gradient threshold. Run with
# show=True in order to get an idea of the threshold.
grad = np.gradient(signal)
absgrad = np.abs(grad)
smooth_kernel = int(np.rint(smoothwindow * sampling_rate))
avg_kernel = int(np.rint(avgwindow * sampling_rate))
smoothgrad = signal_smooth(absgrad, kernel="boxcar", size=smooth_kernel)
avggrad = signal_smooth(smoothgrad, kernel="boxcar", size=avg_kernel)
gradthreshold = gradthreshweight * avggrad
mindelay = int(np.rint(sampling_rate * mindelay))
if show is True:
ax1.plot(signal)
ax2.plot(smoothgrad)
ax2.plot(gradthreshold)
# Identify start and end of QRS complexes.
qrs = smoothgrad > gradthreshold
beg_qrs = np.where(np.logical_and(np.logical_not(qrs[0:-1]), qrs[1:]))[0]
end_qrs = np.where(np.logical_and(qrs[0:-1], np.logical_not(qrs[1:])))[0]
# Throw out QRS-ends that precede first QRS-start.
end_qrs = end_qrs[end_qrs > beg_qrs[0]]
# Identify R-peaks within QRS (ignore QRS that are too short).
num_qrs = min(beg_qrs.size, end_qrs.size)
min_len = np.mean(end_qrs[:num_qrs] - beg_qrs[:num_qrs]) * minlenweight
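    # Candidate QRS shorter than `minlenweight` times the average QRS width are
    # considered spurious and skipped in the loop below.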
peaks = [0]
for i in range(num_qrs):
beg = beg_qrs[i]
end = end_qrs[i]
len_qrs = end - beg
if len_qrs < min_len:
continue
if show is True:
ax2.axvspan(beg, end, facecolor="m", alpha=0.5)
# Find local maxima and their prominence within QRS.
data = signal[beg:end]
locmax, props = scipy.signal.find_peaks(data, prominence=(None, None))
if locmax.size > 0:
# Identify most prominent local maximum.
peak = beg + locmax[np.argmax(props["prominences"])]
# Enforce minimum delay between peaks.
if peak - peaks[-1] > mindelay:
peaks.append(peak)
peaks.pop(0)
if show is True:
ax1.scatter(peaks, signal[peaks], c="r")
peaks = np.asarray(peaks).astype(int) # Convert to int
return peaks
# =============================================================================
# Pan & Tompkins (1985)
# =============================================================================
def _ecg_findpeaks_pantompkins(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Jiapu Pan and Willis J. Tompkins. A Real-Time QRS Detection Algorithm.
In: IEEE Transactions on Biomedical Engineering BME-32.3 (1985), pp. 230–236.
"""
diff = np.diff(signal)
squared = diff * diff
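    # Differentiating and squaring emphasises the steep QRS slopes; the result is then
    # integrated over a 120 ms moving window (Pan-Tompkins pre-processing).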
N = int(0.12 * sampling_rate)
mwa = _ecg_findpeaks_MWA(squared, N)
mwa[: int(0.2 * sampling_rate)] = 0
mwa_peaks = _ecg_findpeaks_peakdetect(mwa, sampling_rate)
mwa_peaks = np.array(mwa_peaks, dtype="int")
return mwa_peaks
# ===========================================================================
# Nabian et al. (2018)
# ===========================================================================
def _ecg_findpeaks_nabian2018(signal, sampling_rate=1000, **kwargs):
"""R peak detection method by Nabian et al. (2018) inspired by the Pan-Tompkins algorithm.
- Nabian, M., Yin, Y., Wormwood, J., Quigley, K. S., Barrett, L. F., & Ostadabbas, S. (2018).
An Open-Source Feature Extraction Tool for the Analysis of Peripheral Physiological Data.
IEEE Journal of Translational Engineering in Health and Medicine, 6, 1-11.
doi:10.1109/jtehm.2018.2878000
"""
window_size = int(0.4 * sampling_rate)
peaks = np.zeros(len(signal))
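    # A sample is marked as an R-peak if it is the maximum of the window spanning
    # +/- window_size (0.4 s on each side) centred on it.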
for i in range(1 + window_size, len(signal) - window_size):
ecg_window = signal[i - window_size : i + window_size]
rpeak = np.argmax(ecg_window)
if i == (i - window_size - 1 + rpeak):
peaks[i] = 1
rpeaks = np.where(peaks == 1)[0]
# min_distance = 200
return rpeaks
# =============================================================================
# Hamilton (2002)
# =============================================================================
def _ecg_findpeaks_hamilton(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Hamilton, Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.
"""
diff = abs(np.diff(signal))
b = np.ones(int(0.08 * sampling_rate))
b = b / int(0.08 * sampling_rate)
a = [1]
ma = scipy.signal.lfilter(b, a, diff)
ma[0 : len(b) * 2] = 0
n_pks = []
n_pks_ave = 0.0
s_pks = []
s_pks_ave = 0.0
QRS = [0]
RR = []
RR_ave = 0.0
th = 0.0
i = 0
idx = []
peaks = []
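    # Adaptive threshold: `th` lies 45% of the way from the running noise-peak average
    # (n_pks) to the running signal-peak average (s_pks). A local maximum of `ma` is
    # accepted as QRS only if it exceeds `th` and occurs at least 300 ms after the
    # previous QRS; a search-back for missed beats is triggered when the current RR
    # interval exceeds 1.5 times the running average RR.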
for i in range(len(ma)): # pylint: disable=C0200,R1702
if (
i > 0 and i < len(ma) - 1 and ma[i - 1] < ma[i] and ma[i + 1] < ma[i]
): # pylint: disable=R1716
peak = i
peaks.append(peak)
if ma[peak] > th and (peak - QRS[-1]) > 0.3 * sampling_rate:
QRS.append(peak)
idx.append(peak)
s_pks.append(ma[peak])
if len(n_pks) > 8:
s_pks.pop(0)
s_pks_ave = np.mean(s_pks)
if RR_ave != 0.0 and QRS[-1] - QRS[-2] > 1.5 * RR_ave:
missed_peaks = peaks[idx[-2] + 1 : idx[-1]]
for missed_peak in missed_peaks:
if (
missed_peak - peaks[idx[-2]] > int(0.36 * sampling_rate)
and ma[missed_peak] > 0.5 * th
):
QRS.append(missed_peak)
QRS.sort()
break
if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
if len(RR) > 8:
RR.pop(0)
RR_ave = int(np.mean(RR))
else:
n_pks.append(ma[peak])
if len(n_pks) > 8:
n_pks.pop(0)
n_pks_ave = np.mean(n_pks)
th = n_pks_ave + 0.45 * (s_pks_ave - n_pks_ave)
i += 1
QRS.pop(0)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Slope Sum Function (SSF) - Zong et al. (2003)
# =============================================================================
def _ecg_findpeaks_ssf(signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01, **kwargs):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L448.
- W. Zong, T. Heldt, G.B. Moody, and R.G. Mark. An open-source algorithm to detect onset of arterial
blood pressure pulses. In Computers in Cardiology, 2003, pages 259–262, 2003.
"""
    # TODO: Doesn't really seem to work
# convert to samples
winB = int(before * sampling_rate)
winA = int(after * sampling_rate)
Rset = set()
length = len(signal)
# diff
dx = np.diff(signal)
dx[dx >= 0] = 0
dx = dx ** 2
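    # Only negative slopes are kept (and squared); samples where this slope-sum signal
    # exceeds `threshold` seed search windows in which the R-peak is taken as the local
    # maximum of the raw signal.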
# detection
(idx,) = np.nonzero(dx > threshold)
idx0 = np.hstack(([0], idx))
didx = np.diff(idx0)
# search
sidx = idx[didx > 1]
for item in sidx:
a = item - winB
if a < 0:
a = 0
b = item + winA
if b > length:
continue
r = np.argmax(signal[a:b]) + a
Rset.add(r)
# output
rpeaks = list(Rset)
rpeaks.sort()
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Christov (2004)
# =============================================================================
def _ecg_findpeaks_christov(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Ivaylo I. Christov, Real time electrocardiogram QRS detection using combined adaptive threshold,
BioMedical Engineering OnLine 2004, vol. 3:28, 2004.
"""
total_taps = 0
b = np.ones(int(0.02 * sampling_rate))
b = b / int(0.02 * sampling_rate)
total_taps += len(b)
a = [1]
MA1 = scipy.signal.lfilter(b, a, signal)
b = np.ones(int(0.028 * sampling_rate))
b = b / int(0.028 * sampling_rate)
total_taps += len(b)
a = [1]
MA2 = scipy.signal.lfilter(b, a, MA1)
Y = []
for i in range(1, len(MA2) - 1):
diff = abs(MA2[i + 1] - MA2[i - 1])
Y.append(diff)
b = np.ones(int(0.040 * sampling_rate))
b = b / int(0.040 * sampling_rate)
total_taps += len(b)
a = [1]
MA3 = scipy.signal.lfilter(b, a, Y)
MA3[0:total_taps] = 0
ms50 = int(0.05 * sampling_rate)
ms200 = int(0.2 * sampling_rate)
ms1200 = int(1.2 * sampling_rate)
ms350 = int(0.35 * sampling_rate)
M = 0
newM5 = 0
M_list = []
MM = []
M_slope = np.linspace(1.0, 0.6, ms1200 - ms200)
F = 0
F_list = []
R = 0
RR = []
Rm = 0
R_list = []
MFR = 0
MFR_list = []
QRS = []
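    # Combined adaptive threshold MFR = M + F + R (Christov, 2004): M tracks the
    # amplitude of recent complexes and decays along M_slope, F follows changes of the
    # signal over the last 350 ms, and R lowers the threshold once two thirds of the
    # expected RR interval (Rm) have elapsed since the last detection.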
for i in range(len(MA3)): # pylint: disable=C0200
# M
if i < 5 * sampling_rate:
M = 0.6 * np.max(MA3[: i + 1])
MM.append(M)
if len(MM) > 5:
MM.pop(0)
elif QRS and i < QRS[-1] + ms200:
newM5 = 0.6 * np.max(MA3[QRS[-1] : i])
if newM5 > 1.5 * MM[-1]:
newM5 = 1.1 * MM[-1]
elif QRS and i == QRS[-1] + ms200:
if newM5 == 0:
newM5 = MM[-1]
MM.append(newM5)
if len(MM) > 5:
MM.pop(0)
M = np.mean(MM)
elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200:
M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)]
elif QRS and i > QRS[-1] + ms1200:
M = 0.6 * np.mean(MM)
# F
if i > ms350:
F_section = MA3[i - ms350 : i]
max_latest = np.max(F_section[-ms50:])
max_earliest = np.max(F_section[:ms50])
F += (max_latest - max_earliest) / 150.0
# R
if QRS and i < QRS[-1] + int((2.0 / 3.0 * Rm)):
R = 0
elif QRS and i > QRS[-1] + int((2.0 / 3.0 * Rm)) and i < QRS[-1] + Rm:
dec = (M - np.mean(MM)) / 1.4
R = 0 + dec
MFR = M + F + R
M_list.append(M)
F_list.append(F)
R_list.append(R)
MFR_list.append(MFR)
if not QRS and MA3[i] > MFR:
QRS.append(i)
elif QRS and i > QRS[-1] + ms200 and MA3[i] > MFR:
QRS.append(i)
if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
if len(RR) > 5:
RR.pop(0)
Rm = int(np.mean(RR))
QRS.pop(0)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Gamboa (2008)
# =============================================================================
def _ecg_findpeaks_gamboa(signal, sampling_rate=1000, tol=0.002, **kwargs):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L834.
- Gamboa, H. (2008). Multi-modal behavioral biometrics based on hci and electrophysiology.
PhD ThesisUniversidade.
"""
hist, edges = np.histogram(signal, 100, density=True)
TH = 0.01
F = np.cumsum(hist)
v0 = edges[np.nonzero(F > TH)[0][0]]
v1 = edges[np.nonzero(F < (1 - TH))[0][-1]]
nrm = max([abs(v0), abs(v1)])
norm_signal = signal / float(nrm)
d2 = np.diff(norm_signal, 2)
b = np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2 # pylint: disable=E1130
b = np.intersect1d(b, np.nonzero(-d2 > tol)[0]) # pylint: disable=E1130
rpeaks = []
if len(b) >= 3:
b = b.astype("float")
previous = b[0]
# convert to samples
v_100ms = int(0.1 * sampling_rate)
v_300ms = int(0.3 * sampling_rate)
for i in b[1:]:
if i - previous > v_300ms:
previous = i
rpeaks.append(np.argmax(signal[int(i) : int(i + v_100ms)]) + i)
rpeaks = sorted(list(set(rpeaks)))
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Engzee Modified (2012)
# =============================================================================
def _ecg_findpeaks_engzee(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
    - W. Engelse and C. Zeelenberg, A single scan algorithm for QRS detection and feature
      extraction, IEEE Comp. in Cardiology, vol. 6, pp. 37-42, 1979.
- A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred, "Real Time Electrocardiogram Segmentation
for Finger Based ECG Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012.
"""
engzee_fake_delay = 0
diff = np.zeros(len(signal))
for i in range(4, len(diff)):
diff[i] = signal[i] - signal[i - 4]
ci = [1, 4, 6, 4, 1]
low_pass = scipy.signal.lfilter(ci, 1, diff)
low_pass[: int(0.2 * sampling_rate)] = 0
ms200 = int(0.2 * sampling_rate)
ms1200 = int(1.2 * sampling_rate)
ms160 = int(0.16 * sampling_rate)
neg_threshold = int(0.01 * sampling_rate)
M = 0
M_list = []
neg_m = []
MM = []
M_slope = np.linspace(1.0, 0.6, ms1200 - ms200)
QRS = []
r_peaks = []
counter = 0
thi_list = []
thi = False
thf_list = []
thf = False
newM5 = False
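    # The threshold M is adapted as in Christov's detector. A QRS is confirmed when,
    # within 160 ms of the positive threshold crossing, the filtered signal stays below
    # -M for more than `neg_threshold` samples; the R-peak is then taken as the maximum
    # of the raw signal over that stretch.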
for i in range(len(low_pass)): # pylint: disable=C0200
# M
if i < 5 * sampling_rate:
M = 0.6 * np.max(low_pass[: i + 1])
MM.append(M)
if len(MM) > 5:
MM.pop(0)
elif QRS and i < QRS[-1] + ms200:
newM5 = 0.6 * np.max(low_pass[QRS[-1] : i])
if newM5 > 1.5 * MM[-1]:
newM5 = 1.1 * MM[-1]
elif newM5 and QRS and i == QRS[-1] + ms200:
MM.append(newM5)
if len(MM) > 5:
MM.pop(0)
M = np.mean(MM)
elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200:
M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)]
elif QRS and i > QRS[-1] + ms1200:
M = 0.6 * np.mean(MM)
M_list.append(M)
neg_m.append(-M)
if not QRS and low_pass[i] > M:
QRS.append(i)
thi_list.append(i)
thi = True
elif QRS and i > QRS[-1] + ms200 and low_pass[i] > M:
QRS.append(i)
thi_list.append(i)
thi = True
if thi and i < thi_list[-1] + ms160:
if low_pass[i] < -M and low_pass[i - 1] > -M:
# thf_list.append(i)
thf = True
if thf and low_pass[i] < -M:
thf_list.append(i)
counter += 1
elif low_pass[i] > -M and thf:
counter = 0
thi = False
thf = False
elif thi and i > thi_list[-1] + ms160:
counter = 0
thi = False
thf = False
if counter > neg_threshold:
unfiltered_section = signal[thi_list[-1] - int(0.01 * sampling_rate) : i]
r_peaks.append(
engzee_fake_delay
+ np.argmax(unfiltered_section)
+ thi_list[-1]
- int(0.01 * sampling_rate)
)
counter = 0
thi = False
thf = False
    # Remove the first detection: the algorithm needs a first QRS complex amplitude
    # before the threshold becomes meaningful.
    r_peaks.pop(0)
r_peaks = np.array(r_peaks, dtype="int")
return r_peaks
# =============================================================================
# Stationary Wavelet Transform (SWT) - Kalidas and Tamil (2017)
# =============================================================================
def _ecg_findpeaks_kalidas(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
    - Vignesh Kalidas and Lakshman Tamil (2017). Real-time QRS detector using Stationary Wavelet
      Transform for Automated ECG Analysis. In: 2017 IEEE 17th International Conference on
      Bioinformatics and Bioengineering (BIBE). Uses the Pan and Tompkins thresholding.
"""
# Try loading pywt
try:
import pywt
except ImportError as import_error:
raise ImportError(
"NeuroKit error: ecg_findpeaks(): the 'PyWavelets' module is required for"
" this method to run. Please install it first (`pip install PyWavelets`)."
) from import_error
signal_length = len(signal)
swt_level = 3
padding = -1
for i in range(1000):
if (len(signal) + i) % 2 ** swt_level == 0:
padding = i
break
if padding > 0:
signal = np.pad(signal, (0, padding), "edge")
elif padding == -1:
print("Padding greater than 1000 required\n")
swt_ecg = pywt.swt(signal, "db3", level=swt_level)
swt_ecg = np.array(swt_ecg)
swt_ecg = swt_ecg[0, 1, :]
squared = swt_ecg * swt_ecg
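    # The squared detail coefficients are band-pass filtered (0.01-10 Hz) and passed to
    # the Pan-Tompkins style thresholding below.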
f1 = 0.01 / (0.5 * sampling_rate)
f2 = 10 / (0.5 * sampling_rate)
sos = scipy.signal.butter(3, [f1, f2], btype="bandpass", output="sos")
filtered_squared = scipy.signal.sosfilt(sos, squared)
# Drop padding to avoid detecting peaks inside it (#456)
filtered_squared = filtered_squared[:signal_length]
filt_peaks = _ecg_findpeaks_peakdetect(filtered_squared, sampling_rate)
filt_peaks = np.array(filt_peaks, dtype="int")
return filt_peaks
# =============================================================================
# Elgendi et al. (2010)
# =============================================================================
def _ecg_findpeaks_elgendi(signal, sampling_rate=1000, **kwargs):
"""From https://github.com/berndporr/py-ecg-detectors/
- Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS Detection.
The 3rd International Conference on Bio-inspired Systems and Signal Processing (BIOSIGNALS2010).
428-431.
"""
window1 = int(0.12 * sampling_rate)
mwa_qrs = _ecg_findpeaks_MWA(abs(signal), window1)
window2 = int(0.6 * sampling_rate)
mwa_beat = _ecg_findpeaks_MWA(abs(signal), window2)
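    # Blocks of interest are where the short (120 ms) QRS moving average exceeds the
    # longer (600 ms) beat moving average (Elgendi et al., 2010).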
blocks = np.zeros(len(signal))
block_height = np.max(signal)
for i in range(len(mwa_qrs)): # pylint: disable=C0200
blocks[i] = block_height if mwa_qrs[i] > mwa_beat[i] else 0
QRS = []
for i in range(1, len(blocks)):
if blocks[i - 1] == 0 and blocks[i] == block_height:
start = i
elif blocks[i - 1] == block_height and blocks[i] == 0:
end = i - 1
if end - start > int(0.08 * sampling_rate):
detection = np.argmax(signal[start : end + 1]) + start
if QRS:
if detection - QRS[-1] > int(0.3 * sampling_rate):
QRS.append(detection)
else:
QRS.append(detection)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Continuous Wavelet Transform (CWT) - Martinez et al. (2003)
# =============================================================================
#
def _ecg_findpeaks_WT(signal, sampling_rate=1000, **kwargs):
# Try loading pywt
try:
import pywt
except ImportError as import_error:
raise ImportError(
"NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for"
" this method to run. Please install it first (`pip install PyWavelets`)."
) from import_error
    # The first derivative of the Gaussian is used as the wavelet ("gaus1")
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(signal, scales, "gaus1", sampling_period=1.0 / sampling_rate)
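    # Candidate peaks are found at the largest scale and matched to the nearest maxima
    # at each successively smaller scale; the R-peak is finally located as a zero-crossing
    # of the smallest-scale coefficients between a negative and a positive modulus maximum.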
# For wt of scale 2^4
signal_4 = cwtmatr[4, :]
epsilon_4 = np.sqrt(np.mean(np.square(signal_4)))
peaks_4, _ = scipy.signal.find_peaks(np.abs(signal_4), height=epsilon_4)
# For wt of scale 2^3
signal_3 = cwtmatr[3, :]
epsilon_3 = np.sqrt(np.mean(np.square(signal_3)))
peaks_3, _ = scipy.signal.find_peaks(np.abs(signal_3), height=epsilon_3)
# Keep only peaks_3 that are nearest to peaks_4
peaks_3_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)): # pylint: disable=C0200
peaks_distance = abs(peaks_4[i] - peaks_3)
peaks_3_keep[i] = peaks_3[np.argmin(peaks_distance)]
# For wt of scale 2^2
signal_2 = cwtmatr[2, :]
epsilon_2 = np.sqrt(np.mean(np.square(signal_2)))
peaks_2, _ = scipy.signal.find_peaks(np.abs(signal_2), height=epsilon_2)
# Keep only peaks_2 that are nearest to peaks_3
peaks_2_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_3_keep[i] - peaks_2)
peaks_2_keep[i] = peaks_2[np.argmin(peaks_distance)]
# For wt of scale 2^1
signal_1 = cwtmatr[1, :]
epsilon_1 = np.sqrt(np.mean(np.square(signal_1)))
peaks_1, _ = scipy.signal.find_peaks(np.abs(signal_1), height=epsilon_1)
# Keep only peaks_1 that are nearest to peaks_2
peaks_1_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_2_keep[i] - peaks_1)
peaks_1_keep[i] = peaks_1[np.argmin(peaks_distance)]
# Find R peaks
max_R_peak_dist = int(0.1 * sampling_rate)
rpeaks = []
for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]):
correct_sign = signal_1[index_cur] < 0 and signal_1[index_next] > 0 # pylint: disable=R1716
near = (index_next - index_cur) < max_R_peak_dist # limit 2
if near and correct_sign:
rpeaks.append(signal_zerocrossings(signal_1[index_cur : index_next + 1])[0] + index_cur)
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# ASI (FSM based 2020)
# =============================================================================
def _ecg_findpeaks_rodrigues(signal, sampling_rate=1000, **kwargs):
"""Segmenter by Tiago Rodrigues, inspired by on Gutierrez-Rivas (2015) and Sadhukhan (2012).
References
----------
- Gutiérrez-Rivas, R., García, J. J., Marnane, W. P., & Hernández, A. (2015). Novel real-time
low-complexity QRS complex detector based on adaptive thresholding. IEEE Sensors Journal,
15(10), 6036-6043.
- Sadhukhan, D., & Mitra, M. (2012). R-peak detection algorithm for ECG using double difference
and RR interval processing. Procedia Technology, 4, 873-877.
"""
N = int(np.round(3 * sampling_rate / 128))
Nd = N - 1
Pth = (0.7 * sampling_rate) / 128 + 2.7
# Pth = 3, optimal for fs = 250 Hz
Rmin = 0.26
rpeaks = []
i = 1
Ramptotal = 0
# Double derivative squared
diff_ecg = [signal[i] - signal[i - Nd] for i in range(Nd, len(signal))]
ddiff_ecg = [diff_ecg[i] - diff_ecg[i - 1] for i in range(1, len(diff_ecg))]
squar = np.square(ddiff_ecg)
# Integrate moving window
b = np.array(np.ones(N))
a = [1]
processed_ecg = scipy.signal.lfilter(b, a, squar)
tf = len(processed_ecg)
# R-peak finder FSM
while i < tf: # ignore last second of recording
# State 1: looking for maximum
tf1 = np.round(i + Rmin * sampling_rate)
Rpeakamp = 0
while i < tf1:
# Rpeak amplitude and position
if processed_ecg[i] > Rpeakamp:
Rpeakamp = processed_ecg[i]
rpeakpos = i + 1
i += 1
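        # Running average of R-peak amplitudes (19/20 old + 1/20 new), used as the
        # starting value of the decaying threshold in state 3.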
Ramptotal = (19 / 20) * Ramptotal + (1 / 20) * Rpeakamp
rpeaks.append(rpeakpos)
# State 2: waiting state
d = tf1 - rpeakpos
tf2 = i + np.round(0.2 * 2 - d)
while i <= tf2:
i += 1
# State 3: decreasing threshold
Thr = Ramptotal
while i < tf and processed_ecg[i] < Thr:
Thr *= np.exp(-Pth / sampling_rate)
i += 1
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Utilities
# =============================================================================
def _ecg_findpeaks_MWA(signal, window_size, **kwargs):
"""Based on https://github.com/berndporr/py-ecg-detectors/
Optimized for vectorized computation.
"""
window_size = int(window_size)
# Scipy's uniform_filter1d is a fast and accurate way of computing
# moving averages. By default it computes the averages of `window_size`
# elements centered around each element in the input array, including
# `(window_size - 1) // 2` elements after the current element (when
# `window_size` is even, the extra element is taken from before). To
# return causal moving averages, i.e. each output element is the average
# of window_size input elements ending at that position, we use the
# `origin` argument to shift the filter computation accordingly.
mwa = scipy.ndimage.uniform_filter1d(signal, window_size, origin=(window_size - 1) // 2)
# Compute actual moving averages for the first `window_size - 1` elements,
# which the uniform_filter1d function computes using padding. We want
# those output elements to be averages of only the input elements until
# that position.
head_size = min(window_size - 1, len(signal))
mwa[:head_size] = np.cumsum(signal[:head_size]) / np.linspace(1, head_size, head_size)
return mwa
def _ecg_findpeaks_peakdetect(detection, sampling_rate=1000, **kwargs):
"""Based on https://github.com/berndporr/py-ecg-detectors/
Optimized for vectorized computation.
"""
min_peak_distance = int(0.3 * sampling_rate)
min_missed_distance = int(0.25 * sampling_rate)
signal_peaks = []
SPKI = 0.0
NPKI = 0.0
last_peak = 0
last_index = -1
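    # Pan-Tompkins style thresholding: SPKI and NPKI track running estimates of signal
    # and noise peak amplitudes, threshold_I1 sits a quarter of the way between them,
    # and a search-back with the lower threshold_I2 is triggered when an RR interval
    # exceeds 1.66 times the average of the previous eight intervals.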
# NOTE: Using plateau_size=(1,1) here avoids detecting flat peaks and
# maintains original py-ecg-detectors behaviour. Flat peaks are typically
# found in measurement artifacts where the signal saturates at maximum
# recording amplitude. Such cases should not be detected as peaks. If we
# do encounter recordings where even normal R peaks are flat, then changing
# this to something like plateau_size=(1, sampling_rate // 10) might make
# sense. See also https://github.com/neuropsychology/NeuroKit/pull/450.
peaks, _ = scipy.signal.find_peaks(detection, plateau_size=(1, 1))
for index, peak in enumerate(peaks):
peak_value = detection[peak]
threshold_I1 = NPKI + 0.25 * (SPKI - NPKI)
if peak_value > threshold_I1 and peak > last_peak + min_peak_distance:
signal_peaks.append(peak)
# RR_missed threshold is based on the previous eight R-R intervals
if len(signal_peaks) > 9:
RR_ave = (signal_peaks[-2] - signal_peaks[-10]) // 8
RR_missed = int(1.66 * RR_ave)
if peak - last_peak > RR_missed:
missed_peaks = peaks[last_index + 1 : index]
missed_peaks = missed_peaks[
(missed_peaks > last_peak + min_missed_distance)
& (missed_peaks < peak - min_missed_distance)
]
threshold_I2 = 0.5 * threshold_I1
missed_peaks = missed_peaks[detection[missed_peaks] > threshold_I2]
if len(missed_peaks) > 0:
signal_peaks[-1] = missed_peaks[np.argmax(detection[missed_peaks])]
signal_peaks.append(peak)
last_peak = peak
last_index = index
SPKI = 0.125 * peak_value + 0.875 * SPKI
else:
NPKI = 0.125 * peak_value + 0.875 * NPKI
return signal_peaks