--- a +++ b/ecgdetectors.py @@ -0,0 +1,594 @@ +# +# A collection of 7 ECG heartbeat detection algorithms implemented +# in Python. Developed in conjunction with a new ECG database: +# http://researchdata.gla.ac.uk/716/. +# +# Copyright (C) 2019 Luis Howell & Bernd Porr +# GPL GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007 +# +import numpy as np +import pywt +import pathlib +import scipy.signal as signal +from biosppy import ecg + + +class Detectors: + """ECG heartbeat detection algorithms + General useage instructions: + r_peaks = detectors.the_detector(ecg_in_samples) + The argument ecg_in_samples is a single channel ECG in volt + at the given sample rate. + """ + + def __init__(self, sampling_frequency): + """ + The constructor takes the sampling rate in Hz of the ECG data. + """ + + self.fs = sampling_frequency + + + def hamilton_detector(self, unfiltered_ecg): + """ + P.S. Hamilton, + Open Source ECG Analysis Software Documentation, E.P.Limited, 2002. + """ + + f1 = 8/self.fs + f2 = 16/self.fs + + b, a = signal.butter(1, [f1*2, f2*2], btype='bandpass') + + filtered_ecg = signal.lfilter(b, a, unfiltered_ecg) + + diff = abs(np.diff(filtered_ecg)) + + b = np.ones(int(0.08*self.fs)) + b = b/int(0.08*self.fs) + a = [1] + + ma = signal.lfilter(b, a, diff) + + ma[0:len(b)*2] = 0 + + n_pks = [] + n_pks_ave = 0.0 + s_pks = [] + s_pks_ave = 0.0 + QRS = [0] + RR = [] + RR_ave = 0.0 + + th = 0.0 + + i=0 + idx = [] + peaks = [] + + for i in range(len(ma)): + + if i>0 and i<len(ma)-1: + if ma[i-1]<ma[i] and ma[i+1]<ma[i]: + peak = i + peaks.append(i) + + if ma[peak] > th and (peak-QRS[-1])>0.3*self.fs: + QRS.append(peak) + idx.append(i) + s_pks.append(ma[peak]) + if len(n_pks)>8: + s_pks.pop(0) + s_pks_ave = np.mean(s_pks) + + if RR_ave != 0.0: + if QRS[-1]-QRS[-2] > 1.5*RR_ave: + missed_peaks = peaks[idx[-2]+1:idx[-1]] + for missed_peak in missed_peaks: + if missed_peak-peaks[idx[-2]]>int(0.360*self.fs) and ma[missed_peak]>0.5*th: + QRS.append(missed_peak) + QRS.sort() + break + + if len(QRS)>2: + RR.append(QRS[-1]-QRS[-2]) + if len(RR)>8: + RR.pop(0) + RR_ave = int(np.mean(RR)) + + else: + n_pks.append(ma[peak]) + if len(n_pks)>8: + n_pks.pop(0) + n_pks_ave = np.mean(n_pks) + + th = n_pks_ave + 0.45*(s_pks_ave-n_pks_ave) + + i+=1 + + QRS.pop(0) + + return QRS + + + def christov_detector(self, unfiltered_ecg): + """ + Ivaylo I. Christov, + Real time electrocardiogram QRS detection using combined + adaptive threshold, BioMedical Engineering OnLine 2004, + vol. 3:28, 2004. + """ + total_taps = 0 + + b = np.ones(int(0.02*self.fs)) + b = b/int(0.02*self.fs) + total_taps += len(b) + a = [1] + + MA1 = signal.lfilter(b, a, unfiltered_ecg) + + b = np.ones(int(0.028*self.fs)) + b = b/int(0.028*self.fs) + total_taps += len(b) + a = [1] + + MA2 = signal.lfilter(b, a, MA1) + + Y = [] + for i in range(1, len(MA2)-1): + + diff = abs(MA2[i+1]-MA2[i-1]) + + Y.append(diff) + + b = np.ones(int(0.040*self.fs)) + b = b/int(0.040*self.fs) + total_taps += len(b) + a = [1] + + MA3 = signal.lfilter(b, a, Y) + + MA3[0:total_taps] = 0 + + ms50 = int(0.05*self.fs) + ms200 = int(0.2*self.fs) + ms1200 = int(1.2*self.fs) + ms350 = int(0.35*self.fs) + + M = 0 + newM5 = 0 + M_list = [] + MM = [] + M_slope = np.linspace(1.0, 0.6, ms1200-ms200) + F = 0 + F_list = [] + R = 0 + RR = [] + Rm = 0 + R_list = [] + + MFR = 0 + MFR_list = [] + + QRS = [] + + for i in range(len(MA3)): + + # M + if i < 5*self.fs: + M = 0.6*np.max(MA3[:i+1]) + MM.append(M) + if len(MM)>5: + MM.pop(0) + + elif QRS and i < QRS[-1]+ms200: + newM5 = 0.6*np.max(MA3[QRS[-1]:i]) + if newM5>1.5*MM[-1]: + newM5 = 1.1*MM[-1] + + elif QRS and i == QRS[-1]+ms200: + if newM5==0: + newM5 = MM[-1] + MM.append(newM5) + if len(MM)>5: + MM.pop(0) + M = np.mean(MM) + + elif QRS and i > QRS[-1]+ms200 and i < QRS[-1]+ms1200: + + M = np.mean(MM)*M_slope[i-(QRS[-1]+ms200)] + + elif QRS and i > QRS[-1]+ms1200: + M = 0.6*np.mean(MM) + + # F + if i > ms350: + F_section = MA3[i-ms350:i] + max_latest = np.max(F_section[-ms50:]) + max_earliest = np.max(F_section[:ms50]) + F = F + ((max_latest-max_earliest)/150.0) + + # R + if QRS and i < QRS[-1]+int((2.0/3.0*Rm)): + + R = 0 + + elif QRS and i > QRS[-1]+int((2.0/3.0*Rm)) and i < QRS[-1]+Rm: + + dec = (M-np.mean(MM))/1.4 + R = 0 + dec + + + MFR = M+F+R + M_list.append(M) + F_list.append(F) + R_list.append(R) + MFR_list.append(MFR) + + if not QRS and MA3[i]>MFR: + QRS.append(i) + + elif QRS and i > QRS[-1]+ms200 and MA3[i]>MFR: + QRS.append(i) + if len(QRS)>2: + RR.append(QRS[-1]-QRS[-2]) + if len(RR)>5: + RR.pop(0) + Rm = int(np.mean(RR)) + + QRS.pop(0) + + return QRS + + + def engzee_detector(self, unfiltered_ecg): + """ + C. Zeelenberg, A single scan algorithm for QRS detection and + feature extraction, IEEE Comp. in Cardiology, vol. 6, + pp. 37-42, 1979 with modifications A. Lourenco, H. Silva, + P. Leite, R. Lourenco and A. Fred, “Real Time + Electrocardiogram Segmentation for Finger Based ECG + Biometrics”, BIOSIGNALS 2012, pp. 49-54, 2012. + """ + + f1 = 48/self.fs + f2 = 52/self.fs + b, a = signal.butter(4, [f1*2, f2*2], btype='bandstop') + filtered_ecg = signal.lfilter(b, a, unfiltered_ecg) + + diff = np.zeros(len(filtered_ecg)) + for i in range(4, len(diff)): + diff[i] = filtered_ecg[i]-filtered_ecg[i-4] + + ci = [1,4,6,4,1] + low_pass = signal.lfilter(ci, 1, diff) + + low_pass[:int(0.2*self.fs)] = 0 + + ms200 = int(0.2*self.fs) + ms1200 = int(1.2*self.fs) + ms160 = int(0.16*self.fs) + neg_threshold = int(0.01*self.fs) + + M = 0 + M_list = [] + neg_m = [] + MM = [] + M_slope = np.linspace(1.0, 0.6, ms1200-ms200) + + QRS = [] + r_peaks = [] + + counter = 0 + + thi_list = [] + thi = False + thf_list = [] + thf = False + + for i in range(len(low_pass)): + + # M + if i < 5*self.fs: + M = 0.6*np.max(low_pass[:i+1]) + MM.append(M) + if len(MM)>5: + MM.pop(0) + + elif QRS and i < QRS[-1]+ms200: + + newM5 = 0.6*np.max(low_pass[QRS[-1]:i]) + + if newM5>1.5*MM[-1]: + newM5 = 1.1*MM[-1] + + elif QRS and i == QRS[-1]+ms200: + MM.append(newM5) + if len(MM)>5: + MM.pop(0) + M = np.mean(MM) + + elif QRS and i > QRS[-1]+ms200 and i < QRS[-1]+ms1200: + + M = np.mean(MM)*M_slope[i-(QRS[-1]+ms200)] + + elif QRS and i > QRS[-1]+ms1200: + M = 0.6*np.mean(MM) + + M_list.append(M) + neg_m.append(-M) + + + if not QRS and low_pass[i]>M: + QRS.append(i) + thi_list.append(i) + thi = True + + elif QRS and i > QRS[-1]+ms200 and low_pass[i]>M: + QRS.append(i) + thi_list.append(i) + thi = True + + if thi and i<thi_list[-1]+ms160: + if low_pass[i]<-M and low_pass[i-1]>-M: + #thf_list.append(i) + thf = True + + if thf and low_pass[i]<-M: + thf_list.append(i) + counter += 1 + + elif low_pass[i]>-M and thf: + counter = 0 + thi = False + thf = False + + elif thi and i>thi_list[-1]+ms160: + counter = 0 + thi = False + thf = False + + return thi_list + + + def matched_filter_detector(self, unfiltered_ecg): + """ + FIR matched filter using template of QRS complex. + Template provided for 250Hz and 360Hz. + Uses the Pan and Tompkins thresholding method. + """ + current_dir = pathlib.Path(__file__).resolve() + + if self.fs == 250: + template_dir = current_dir.parent/'templates'/'template_250hz.csv' + template = np.loadtxt(template_dir) + elif self.fs == 360: + template_dir = current_dir.parent/'templates'/'template_360hz.csv' + template = np.loadtxt(template_dir) + else: + print('\n!!No template for this frequency!!\n') + + f0 = 0.1/self.fs + f1 = 48/self.fs + + b, a = signal.butter(4, [f0*2, f1*2], btype='bandpass') + + prefiltered_ecg = signal.lfilter(b, a, unfiltered_ecg) + + matched_coeffs = template[::-1] #time reversing template + + detection = signal.lfilter(matched_coeffs, 1, prefiltered_ecg) # matched filter FIR filtering + squared = detection*detection # squaring matched filter output + squared[:len(template)] = 0 + + squared_peaks = panPeakDetect(squared, self.fs) + + return squared_peaks + + + def swt_detector(self, unfiltered_ecg): + """ + Stationary Wavelet Transform + based on Vignesh Kalidas and Lakshman Tamil. + Real-time QRS detector using Stationary Wavelet Transform + for Automated ECG Analysis. + In: 2017 IEEE 17th International Conference on + Bioinformatics and Bioengineering (BIBE). + Uses the Pan and Tompkins thresolding. + """ + + swt_level=3 + padding = -1 + for i in range(1000): + if (len(unfiltered_ecg)+i)%2**swt_level == 0: + padding = i + break + + if padding > 0: + unfiltered_ecg = np.pad(unfiltered_ecg, (0, padding), 'edge') + elif padding == -1: + print("Padding greater than 1000 required\n") + + swt_ecg = pywt.swt(unfiltered_ecg, 'db3', level=swt_level) + swt_ecg = np.array(swt_ecg) + swt_ecg = swt_ecg[0, 1, :] + + squared = swt_ecg*swt_ecg + + f1 = 0.01/self.fs + f2 = 10/self.fs + + b, a = signal.butter(3, [f1*2, f2*2], btype='bandpass') + filtered_squared = signal.lfilter(b, a, squared) + + filt_peaks = panPeakDetect(filtered_squared, self.fs) + + return filt_peaks + + + def pan_tompkins_detector(self, unfiltered_ecg): + """ + Jiapu Pan and Willis J. Tompkins. + A Real-Time QRS Detection Algorithm. + In: IEEE Transactions on Biomedical Engineering + BME-32.3 (1985), pp. 230–236. + """ + + f1 = 5/self.fs + f2 = 15/self.fs + + b, a = signal.butter(1, [f1*2, f2*2], btype='bandpass') + + filtered_ecg = signal.lfilter(b, a, unfiltered_ecg) + + diff = np.diff(filtered_ecg) + + squared = diff*diff + + N = int(0.12*self.fs) + mwa = MWA(squared, N) + mwa[:int(0.2*self.fs)] = 0 + + mwa_peaks = panPeakDetect(mwa, self.fs) + + return mwa_peaks + + + def two_average_detector(self, unfiltered_ecg): + """ + Elgendi, Mohamed & Jonkman, + Mirjam & De Boer, Friso. (2010). + Frequency Bands Effects on QRS Detection. + The 3rd International Conference on Bio-inspired Systems + and Signal Processing (BIOSIGNALS2010). 428-431. + """ + + f1 = 8/self.fs + f2 = 20/self.fs + + b, a = signal.butter(2, [f1*2, f2*2], btype='bandpass') + + filtered_ecg = signal.lfilter(b, a, unfiltered_ecg) + + window1 = int(0.12*self.fs) + mwa_qrs = MWA(abs(filtered_ecg), window1) + + window2 = int(0.6*self.fs) + mwa_beat = MWA(abs(filtered_ecg), window2) + + blocks = np.zeros(len(unfiltered_ecg)) + block_height = np.max(filtered_ecg) + + for i in range(len(mwa_qrs)): + if mwa_qrs[i] > mwa_beat[i]: + blocks[i] = block_height + else: + blocks[i] = 0 + + QRS = [] + + for i in range(1, len(blocks)): + if blocks[i-1] == 0 and blocks[i] == block_height: + start = i + + elif blocks[i-1] == block_height and blocks[i] == 0: + end = i-1 + + if end-start>int(0.08*self.fs): + detection = np.argmax(filtered_ecg[start:end+1])+start + if QRS: + if detection-QRS[-1]>int(0.3*self.fs): + QRS.append(detection) + else: + QRS.append(detection) + + return QRS + + +def MWA(input_array, window_size): + + mwa = np.zeros(len(input_array)) + for i in range(len(input_array)): + if i < window_size: + section = input_array[0:i] + else: + section = input_array[i-window_size:i] + + if i!=0: + mwa[i] = np.mean(section) + else: + mwa[i] = input_array[i] + + return mwa + + +def normalise(input_array): + + output_array = (input_array-np.min(input_array))/(np.max(input_array)-np.min(input_array)) + + return output_array + + +def panPeakDetect(detection, fs): + + min_distance = int(0.25*fs) + + signal_peaks = [0] + noise_peaks = [] + + SPKI = 0.0 + NPKI = 0.0 + + threshold_I1 = 0.0 + threshold_I2 = 0.0 + + RR_missed = 0 + index = 0 + indexes = [] + + missed_peaks = [] + peaks = [] + + for i in range(len(detection)): + + if i>0 and i<len(detection)-1: + if detection[i-1]<detection[i] and detection[i+1]<detection[i]: + peak = i + peaks.append(i) + + if detection[peak]>threshold_I1 and (peak-signal_peaks[-1])>0.3*fs: + + signal_peaks.append(peak) + indexes.append(index) + SPKI = 0.125*detection[signal_peaks[-1]] + 0.875*SPKI + if RR_missed!=0: + if signal_peaks[-1]-signal_peaks[-2]>RR_missed: + missed_section_peaks = peaks[indexes[-2]+1:indexes[-1]] + missed_section_peaks2 = [] + for missed_peak in missed_section_peaks: + if missed_peak-signal_peaks[-2]>min_distance and signal_peaks[-1]-missed_peak>min_distance and detection[missed_peak]>threshold_I2: + missed_section_peaks2.append(missed_peak) + + if len(missed_section_peaks2)>0: + missed_peak = missed_section_peaks2[np.argmax(detection[missed_section_peaks2])] + missed_peaks.append(missed_peak) + signal_peaks.append(signal_peaks[-1]) + signal_peaks[-2] = missed_peak + + else: + noise_peaks.append(peak) + NPKI = 0.125*detection[noise_peaks[-1]] + 0.875*NPKI + + threshold_I1 = NPKI + 0.25*(SPKI-NPKI) + threshold_I2 = 0.5*threshold_I1 + + if len(signal_peaks)>8: + RR = np.diff(signal_peaks[-9:]) + RR_ave = int(np.mean(RR)) + RR_missed = int(1.66*RR_ave) + + index = index+1 + + signal_peaks.pop(0) + + return signal_peaks