Diff of /ecgdetectors.py [000000] .. [1a3ddc]

Switch to side-by-side view

--- a
+++ b/ecgdetectors.py
@@ -0,0 +1,594 @@
+#
+# A collection of 7 ECG heartbeat detection algorithms implemented
+# in Python. Developed in conjunction with a new ECG database:
+# http://researchdata.gla.ac.uk/716/.
+#
+# Copyright (C) 2019 Luis Howell & Bernd Porr
+# GPL GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007
+#
+import numpy as np
+import pywt
+import pathlib
+import scipy.signal as signal
+from biosppy import ecg
+
+
+class Detectors:
+    """ECG heartbeat detection algorithms
+    General useage instructions:
+    r_peaks = detectors.the_detector(ecg_in_samples)
+    The argument ecg_in_samples is a single channel ECG in volt
+    at the given sample rate.
+    """
+    
+    def __init__(self, sampling_frequency):
+        """
+        The constructor takes the sampling rate in Hz of the ECG data.
+        """
+
+        self.fs = sampling_frequency
+
+
+    def hamilton_detector(self, unfiltered_ecg):
+        """
+        P.S. Hamilton, 
+        Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.
+        """
+        
+        f1 = 8/self.fs
+        f2 = 16/self.fs
+
+        b, a = signal.butter(1, [f1*2, f2*2], btype='bandpass')
+
+        filtered_ecg = signal.lfilter(b, a, unfiltered_ecg)
+
+        diff = abs(np.diff(filtered_ecg))
+
+        b = np.ones(int(0.08*self.fs))
+        b = b/int(0.08*self.fs)
+        a = [1]
+
+        ma = signal.lfilter(b, a, diff)
+
+        ma[0:len(b)*2] = 0
+
+        n_pks = []
+        n_pks_ave = 0.0
+        s_pks = []
+        s_pks_ave = 0.0
+        QRS = [0]
+        RR = []
+        RR_ave = 0.0
+
+        th = 0.0
+
+        i=0
+        idx = []
+        peaks = []  
+
+        for i in range(len(ma)):
+
+            if i>0 and i<len(ma)-1:
+                if ma[i-1]<ma[i] and ma[i+1]<ma[i]:
+                    peak = i
+                    peaks.append(i)
+
+                    if ma[peak] > th and (peak-QRS[-1])>0.3*self.fs:        
+                        QRS.append(peak)
+                        idx.append(i)
+                        s_pks.append(ma[peak])
+                        if len(n_pks)>8:
+                            s_pks.pop(0)
+                        s_pks_ave = np.mean(s_pks)
+
+                        if RR_ave != 0.0:
+                            if QRS[-1]-QRS[-2] > 1.5*RR_ave:
+                                missed_peaks = peaks[idx[-2]+1:idx[-1]]
+                                for missed_peak in missed_peaks:
+                                    if missed_peak-peaks[idx[-2]]>int(0.360*self.fs) and ma[missed_peak]>0.5*th:
+                                        QRS.append(missed_peak)
+                                        QRS.sort()
+                                        break
+
+                        if len(QRS)>2:
+                            RR.append(QRS[-1]-QRS[-2])
+                            if len(RR)>8:
+                                RR.pop(0)
+                            RR_ave = int(np.mean(RR))
+
+                    else:
+                        n_pks.append(ma[peak])
+                        if len(n_pks)>8:
+                            n_pks.pop(0)
+                        n_pks_ave = np.mean(n_pks)
+
+                    th = n_pks_ave + 0.45*(s_pks_ave-n_pks_ave)
+
+                    i+=1
+
+        QRS.pop(0)
+
+        return QRS
+
+    
+    def christov_detector(self, unfiltered_ecg):
+        """
+        Ivaylo I. Christov, 
+        Real time electrocardiogram QRS detection using combined 
+        adaptive threshold, BioMedical Engineering OnLine 2004, 
+        vol. 3:28, 2004.
+        """
+        total_taps = 0
+
+        b = np.ones(int(0.02*self.fs))
+        b = b/int(0.02*self.fs)
+        total_taps += len(b)
+        a = [1]
+
+        MA1 = signal.lfilter(b, a, unfiltered_ecg)
+
+        b = np.ones(int(0.028*self.fs))
+        b = b/int(0.028*self.fs)
+        total_taps += len(b)
+        a = [1]
+
+        MA2 = signal.lfilter(b, a, MA1)
+
+        Y = []
+        for i in range(1, len(MA2)-1):
+            
+            diff = abs(MA2[i+1]-MA2[i-1])
+
+            Y.append(diff)
+
+        b = np.ones(int(0.040*self.fs))
+        b = b/int(0.040*self.fs)
+        total_taps += len(b)
+        a = [1]
+
+        MA3 = signal.lfilter(b, a, Y)
+
+        MA3[0:total_taps] = 0
+
+        ms50 = int(0.05*self.fs)
+        ms200 = int(0.2*self.fs)
+        ms1200 = int(1.2*self.fs)
+        ms350 = int(0.35*self.fs)
+
+        M = 0
+        newM5 = 0
+        M_list = []
+        MM = []
+        M_slope = np.linspace(1.0, 0.6, ms1200-ms200)
+        F = 0
+        F_list = []
+        R = 0
+        RR = []
+        Rm = 0
+        R_list = []
+
+        MFR = 0
+        MFR_list = []
+
+        QRS = []
+
+        for i in range(len(MA3)):
+
+            # M
+            if i < 5*self.fs:
+                M = 0.6*np.max(MA3[:i+1])
+                MM.append(M)
+                if len(MM)>5:
+                    MM.pop(0)
+
+            elif QRS and i < QRS[-1]+ms200:
+                newM5 = 0.6*np.max(MA3[QRS[-1]:i])
+                if newM5>1.5*MM[-1]:
+                    newM5 = 1.1*MM[-1]
+
+            elif QRS and i == QRS[-1]+ms200:
+                if newM5==0:
+                    newM5 = MM[-1]
+                MM.append(newM5)
+                if len(MM)>5:
+                    MM.pop(0)    
+                M = np.mean(MM)    
+            
+            elif QRS and i > QRS[-1]+ms200 and i < QRS[-1]+ms1200:
+
+                M = np.mean(MM)*M_slope[i-(QRS[-1]+ms200)]
+
+            elif QRS and i > QRS[-1]+ms1200:
+                M = 0.6*np.mean(MM)
+
+            # F
+            if i > ms350:
+                F_section = MA3[i-ms350:i]
+                max_latest = np.max(F_section[-ms50:])
+                max_earliest = np.max(F_section[:ms50])
+                F = F + ((max_latest-max_earliest)/150.0)
+
+            # R
+            if QRS and i < QRS[-1]+int((2.0/3.0*Rm)):
+
+                R = 0
+
+            elif QRS and i > QRS[-1]+int((2.0/3.0*Rm)) and i < QRS[-1]+Rm:
+
+                dec = (M-np.mean(MM))/1.4
+                R = 0 + dec
+
+
+            MFR = M+F+R
+            M_list.append(M)
+            F_list.append(F)
+            R_list.append(R)
+            MFR_list.append(MFR)
+
+            if not QRS and MA3[i]>MFR:
+                QRS.append(i)
+            
+            elif QRS and i > QRS[-1]+ms200 and MA3[i]>MFR:
+                QRS.append(i)
+                if len(QRS)>2:
+                    RR.append(QRS[-1]-QRS[-2])
+                    if len(RR)>5:
+                        RR.pop(0)
+                    Rm = int(np.mean(RR))
+
+        QRS.pop(0)
+        
+        return QRS
+
+    
+    def engzee_detector(self, unfiltered_ecg):
+        """
+        C. Zeelenberg, A single scan algorithm for QRS detection and
+        feature extraction, IEEE Comp. in Cardiology, vol. 6,
+        pp. 37-42, 1979 with modifications A. Lourenco, H. Silva,
+        P. Leite, R. Lourenco and A. Fred, “Real Time
+        Electrocardiogram Segmentation for Finger Based ECG
+        Biometrics”, BIOSIGNALS 2012, pp. 49-54, 2012.
+        """
+                
+        f1 = 48/self.fs
+        f2 = 52/self.fs
+        b, a = signal.butter(4, [f1*2, f2*2], btype='bandstop')
+        filtered_ecg = signal.lfilter(b, a, unfiltered_ecg)
+
+        diff = np.zeros(len(filtered_ecg))
+        for i in range(4, len(diff)):
+            diff[i] = filtered_ecg[i]-filtered_ecg[i-4]
+
+        ci = [1,4,6,4,1]        
+        low_pass = signal.lfilter(ci, 1, diff)
+
+        low_pass[:int(0.2*self.fs)] = 0
+      
+        ms200 = int(0.2*self.fs)
+        ms1200 = int(1.2*self.fs)        
+        ms160 = int(0.16*self.fs)
+        neg_threshold = int(0.01*self.fs)
+
+        M = 0
+        M_list = []
+        neg_m = []
+        MM = []
+        M_slope = np.linspace(1.0, 0.6, ms1200-ms200)
+
+        QRS = []
+        r_peaks = []
+
+        counter = 0
+
+        thi_list = []
+        thi = False
+        thf_list = []
+        thf = False
+
+        for i in range(len(low_pass)):
+
+            # M
+            if i < 5*self.fs:
+                M = 0.6*np.max(low_pass[:i+1])
+                MM.append(M)
+                if len(MM)>5:
+                    MM.pop(0)
+
+            elif QRS and i < QRS[-1]+ms200:
+
+                newM5 = 0.6*np.max(low_pass[QRS[-1]:i])
+
+                if newM5>1.5*MM[-1]:
+                    newM5 = 1.1*MM[-1]
+
+            elif QRS and i == QRS[-1]+ms200:
+                MM.append(newM5)
+                if len(MM)>5:
+                    MM.pop(0)    
+                M = np.mean(MM)    
+            
+            elif QRS and i > QRS[-1]+ms200 and i < QRS[-1]+ms1200:
+
+                M = np.mean(MM)*M_slope[i-(QRS[-1]+ms200)]
+
+            elif QRS and i > QRS[-1]+ms1200:
+                M = 0.6*np.mean(MM)
+
+            M_list.append(M)
+            neg_m.append(-M)
+
+
+            if not QRS and low_pass[i]>M:
+                QRS.append(i)
+                thi_list.append(i)
+                thi = True
+            
+            elif QRS and i > QRS[-1]+ms200 and low_pass[i]>M:
+                QRS.append(i)
+                thi_list.append(i)
+                thi = True
+
+            if thi and i<thi_list[-1]+ms160:
+                if low_pass[i]<-M and low_pass[i-1]>-M:
+                    #thf_list.append(i)
+                    thf = True
+                    
+                if thf and low_pass[i]<-M:
+                    thf_list.append(i)
+                    counter += 1
+                
+                elif low_pass[i]>-M and thf:
+                    counter = 0
+                    thi = False
+                    thf = False
+            
+            elif thi and i>thi_list[-1]+ms160:
+                    counter = 0
+                    thi = False
+                    thf = False                                        
+            
+        return thi_list
+
+    
+    def matched_filter_detector(self, unfiltered_ecg):
+        """
+        FIR matched filter using template of QRS complex.
+        Template provided for 250Hz and 360Hz.
+        Uses the Pan and Tompkins thresholding method.
+        """
+        current_dir = pathlib.Path(__file__).resolve()
+        
+        if self.fs == 250:
+            template_dir = current_dir.parent/'templates'/'template_250hz.csv'
+            template = np.loadtxt(template_dir)
+        elif self.fs == 360:
+            template_dir = current_dir.parent/'templates'/'template_360hz.csv'
+            template = np.loadtxt(template_dir)
+        else:
+            print('\n!!No template for this frequency!!\n')
+
+        f0 = 0.1/self.fs
+        f1 = 48/self.fs
+
+        b, a = signal.butter(4, [f0*2, f1*2], btype='bandpass')
+
+        prefiltered_ecg = signal.lfilter(b, a, unfiltered_ecg)
+
+        matched_coeffs = template[::-1]  #time reversing template
+
+        detection = signal.lfilter(matched_coeffs, 1, prefiltered_ecg)  # matched filter FIR filtering
+        squared = detection*detection  # squaring matched filter output
+        squared[:len(template)] = 0
+
+        squared_peaks = panPeakDetect(squared, self.fs)
+  
+        return squared_peaks
+
+    
+    def swt_detector(self, unfiltered_ecg):
+        """
+        Stationary Wavelet Transform 
+        based on Vignesh Kalidas and Lakshman Tamil. 
+        Real-time QRS detector using Stationary Wavelet Transform 
+        for Automated ECG Analysis. 
+        In: 2017 IEEE 17th International Conference on 
+        Bioinformatics and Bioengineering (BIBE). 
+        Uses the Pan and Tompkins thresolding.
+        """
+        
+        swt_level=3
+        padding = -1
+        for i in range(1000):
+            if (len(unfiltered_ecg)+i)%2**swt_level == 0:
+                padding = i
+                break
+
+        if padding > 0:
+            unfiltered_ecg = np.pad(unfiltered_ecg, (0, padding), 'edge')
+        elif padding == -1:
+            print("Padding greater than 1000 required\n")    
+
+        swt_ecg = pywt.swt(unfiltered_ecg, 'db3', level=swt_level)
+        swt_ecg = np.array(swt_ecg)
+        swt_ecg = swt_ecg[0, 1, :]
+
+        squared = swt_ecg*swt_ecg
+
+        f1 = 0.01/self.fs
+        f2 = 10/self.fs
+
+        b, a = signal.butter(3, [f1*2, f2*2], btype='bandpass')
+        filtered_squared = signal.lfilter(b, a, squared)       
+
+        filt_peaks = panPeakDetect(filtered_squared, self.fs)
+        
+        return filt_peaks
+
+
+    def pan_tompkins_detector(self, unfiltered_ecg):
+        """
+        Jiapu Pan and Willis J. Tompkins.
+        A Real-Time QRS Detection Algorithm. 
+        In: IEEE Transactions on Biomedical Engineering 
+        BME-32.3 (1985), pp. 230–236.
+        """
+        
+        f1 = 5/self.fs
+        f2 = 15/self.fs
+
+        b, a = signal.butter(1, [f1*2, f2*2], btype='bandpass')
+
+        filtered_ecg = signal.lfilter(b, a, unfiltered_ecg)        
+
+        diff = np.diff(filtered_ecg) 
+
+        squared = diff*diff
+
+        N = int(0.12*self.fs)
+        mwa = MWA(squared, N)
+        mwa[:int(0.2*self.fs)] = 0
+
+        mwa_peaks = panPeakDetect(mwa, self.fs)
+
+        return mwa_peaks
+
+
+    def two_average_detector(self, unfiltered_ecg):
+        """
+        Elgendi, Mohamed & Jonkman, 
+        Mirjam & De Boer, Friso. (2010).
+        Frequency Bands Effects on QRS Detection.
+        The 3rd International Conference on Bio-inspired Systems 
+        and Signal Processing (BIOSIGNALS2010). 428-431.
+        """
+        
+        f1 = 8/self.fs
+        f2 = 20/self.fs
+
+        b, a = signal.butter(2, [f1*2, f2*2], btype='bandpass')
+
+        filtered_ecg = signal.lfilter(b, a, unfiltered_ecg)
+
+        window1 = int(0.12*self.fs)
+        mwa_qrs = MWA(abs(filtered_ecg), window1)
+
+        window2 = int(0.6*self.fs)
+        mwa_beat = MWA(abs(filtered_ecg), window2)
+
+        blocks = np.zeros(len(unfiltered_ecg))
+        block_height = np.max(filtered_ecg)
+
+        for i in range(len(mwa_qrs)):
+            if mwa_qrs[i] > mwa_beat[i]:
+                blocks[i] = block_height
+            else:
+                blocks[i] = 0
+
+        QRS = []
+
+        for i in range(1, len(blocks)):
+            if blocks[i-1] == 0 and blocks[i] == block_height:
+                start = i
+            
+            elif blocks[i-1] == block_height and blocks[i] == 0:
+                end = i-1
+
+                if end-start>int(0.08*self.fs):
+                    detection = np.argmax(filtered_ecg[start:end+1])+start
+                    if QRS:
+                        if detection-QRS[-1]>int(0.3*self.fs):
+                            QRS.append(detection)
+                    else:
+                        QRS.append(detection)
+
+        return QRS
+
+
+def MWA(input_array, window_size):
+
+    mwa = np.zeros(len(input_array))
+    for i in range(len(input_array)):
+        if i < window_size:
+            section = input_array[0:i]
+        else:
+            section = input_array[i-window_size:i]
+        
+        if i!=0:
+            mwa[i] = np.mean(section)
+        else:
+            mwa[i] = input_array[i]
+
+    return mwa
+
+
+def normalise(input_array):
+
+    output_array = (input_array-np.min(input_array))/(np.max(input_array)-np.min(input_array))
+
+    return output_array
+
+
+def panPeakDetect(detection, fs):    
+
+    min_distance = int(0.25*fs)
+
+    signal_peaks = [0]
+    noise_peaks = []
+
+    SPKI = 0.0
+    NPKI = 0.0
+
+    threshold_I1 = 0.0
+    threshold_I2 = 0.0
+
+    RR_missed = 0
+    index = 0
+    indexes = []
+
+    missed_peaks = []
+    peaks = []
+
+    for i in range(len(detection)):
+
+        if i>0 and i<len(detection)-1:
+            if detection[i-1]<detection[i] and detection[i+1]<detection[i]:
+                peak = i
+                peaks.append(i)
+
+                if detection[peak]>threshold_I1 and (peak-signal_peaks[-1])>0.3*fs:
+                        
+                    signal_peaks.append(peak)
+                    indexes.append(index)
+                    SPKI = 0.125*detection[signal_peaks[-1]] + 0.875*SPKI
+                    if RR_missed!=0:
+                        if signal_peaks[-1]-signal_peaks[-2]>RR_missed:
+                            missed_section_peaks = peaks[indexes[-2]+1:indexes[-1]]
+                            missed_section_peaks2 = []
+                            for missed_peak in missed_section_peaks:
+                                if missed_peak-signal_peaks[-2]>min_distance and signal_peaks[-1]-missed_peak>min_distance and detection[missed_peak]>threshold_I2:
+                                    missed_section_peaks2.append(missed_peak)
+
+                            if len(missed_section_peaks2)>0:           
+                                missed_peak = missed_section_peaks2[np.argmax(detection[missed_section_peaks2])]
+                                missed_peaks.append(missed_peak)
+                                signal_peaks.append(signal_peaks[-1])
+                                signal_peaks[-2] = missed_peak   
+
+                else:
+                    noise_peaks.append(peak)
+                    NPKI = 0.125*detection[noise_peaks[-1]] + 0.875*NPKI
+
+                threshold_I1 = NPKI + 0.25*(SPKI-NPKI)
+                threshold_I2 = 0.5*threshold_I1
+
+                if len(signal_peaks)>8:
+                    RR = np.diff(signal_peaks[-9:])
+                    RR_ave = int(np.mean(RR))
+                    RR_missed = int(1.66*RR_ave)
+
+                index = index+1      
+    
+    signal_peaks.pop(0)
+
+    return signal_peaks