--- a +++ b/heartbeat.py @@ -0,0 +1,397 @@ +import time +import pandas as pd +import numpy as np +from collections import Counter +from scipy import signal +from scipy.signal import find_peaks, resample +import matplotlib.pyplot as plt +import seaborn as sns +import os +from os import listdir +from os.path import isfile, join +import sys +import warnings + +classes_further= {'N':'Normal beat','L':'Left bundle branch block beat','R':'Right bundle branch block beat', +'A':'Atrial premature beat','a':'Aberrated atrial premature beat','J':'Nodal (junctional) premature beat', +'S':'Supraventricular premature beat','V':'Premature ventricular contraction','F':'Fusion of ventricular and normal beat', +'[':'Start of ventricular flutter/fibrillation','!':'Ventricular flutter wave',']':'End of ventricular flutter/fibrillation','e':'Atrial escape beat', +'j':'Nodal (junctional) escape beat','E':'Ventricular escape beat','/':'Paced beat', +'f':'Fusion of paced and normal beat','x':'Non-conducted P-wave (blocked APB)','Q':'Unclassifiable beat', +'|':'Isolated QRS-like artifact'} + +#https://arxiv.org/abs/1805.00794 (original paper) + +def longest(list1): + """ + Returns index for length of longest list for a list of lists + """ + + return max(enumerate(list1), key = lambda tup: len(tup[1]))[0] + +def moving_average(x,window): + """ + Numpy based moving average function. + Input is a signal and window size + Output is average + """ + return np.convolve(x, np.ones((window,))/window, mode='valid') + +def amplitude_ratio(ecg_signal): + """ + From signal get Ratio of + Postive Signal to Negative + """ + return ecg_signal[np.where(ecg_signal>0)].mean()/abs(ecg_signal[np.where(ecg_signal<0)].mean()) + +def distribution_bar(patients,classes,classes_reducer=None): + """ + Generate simple bar plot graphic + of the condition distributions + across all patients and per patient + + Returns a patient information dictionary + that has a counter list of each examined class + patient_dic['101']= [('N', 1860), ('S', 3), ('V', 0), ('F', 0), ('Q', 2)] + + """ + print('Generating_plot(s)...') + patient_dic= {} + for patient in patients: + sig,ecg_notes= get_patient_data(patient) + patient_list= [] + for c in classes.values(): + summed=0 + if classes_reducer != None: + for i in classes_reducer[c]: + summed += Counter(ecg_notes.type)[i] + patient_list.append((c, summed)) + else: + patient_list.append((c, Counter(ecg_notes.type)[c])) + patient_dic[patient]= patient_list + + r_len = len(patients) + barWidth = 0.25 + bars={} + r={} + for i in range(len(classes)): + bars[i] = [x[i][1] for x in list(patient_dic.values())] + if i == 0: + r[0] = np.arange(r_len).tolist() + else: + r[i] = [x + 1/8 for x in r[i-1]] + + + plt.figure(figsize=(25,20)) + # Make the plot + plt.subplot(211) + condition_count={} + for condition,count in bars.items(): + condition_count[condition] = sum(count) + + s = pd.Series( + list(condition_count.values()), + index = [classes[i] for i in list(condition_count.keys())]) + s.plot(kind='bar') + plt.ylabel('Count') + plt.xlabel('Condition') + + plt.subplot(212) + for i in range(0,len(classes)): + plt.bar(r[i], bars[i], width=barWidth, edgecolor='white', label=classes[i]) + n=r[0] + plt.xticks([n + barWidth for n in range(r_len)],patients) + plt.ylabel('Count', fontweight='bold') + plt.xlabel('Patients ({})'.format(r_len), fontweight='bold') + plt.legend() + plt.show() + return patient_dic + +def update_progress(progress,barLength=30): + """ + Simple Progess Bar. Used in each iteration for + each patient that is processed. Shows percent + completion and an arrow + + """ + s=time.time() + status = "" + if isinstance(progress, int): + progress = float(progress) + if not isinstance(progress, float): + progress = 0 + status = "error: progress var must be float\r\n" + if progress < 0: + progress = 0 + status = "Halt...\r\n" + if progress >= 1: + progress = 1 + status = "Done...\r\n" + t=time.time()-s; + block = int(round(barLength*progress)) + text = "\rPercent: [{}] {:.2f}% {}".format( ">"*block + "-"*(barLength-block), progress*100,status) + sys.stdout.write(text) + sys.stdout.flush() + +def zero_pad(lst): + """ + Import a list of lists [[1 x n],[1 x c],[1 x m]] + Create np array size: with the number of columns + being the length of longest list within list of lists. + All shorter other links are zero padded. + + Ex. + Given [[1 x n],[1 x c],[1 x m]] and m > n > c + return array [3,m] + + [[1 x 1],[1 x 2],[1 x 3]] --> [a1 0 0], + [b1 b2 0], + [m1 m2m m3] + + Input: [[list],[list],[list],...,] + Output: Zero Padded Array [len(list) X len_longest_list] + + """ + pad = len(max(lst, key=len)) + return np.array([i + [0]*(pad-len(i)) for i in lst]) + +def get_HR(peaklist,fs): + """ + Takes in List of Sample Peaks and sampling freq. + Computes average distance between peaks w/r time. + Returns BPM + + Inputs: + peaklist:: list of HR R peaks + fs:: sampling rate + Output: + HR (float) + """ + RR_list = [] + for beat in range(len(peaklist)-1): + RR_interval = (peaklist[beat+1] - peaklist[beat]) #Calculate distance between beats in # of samples + ms_dist = ((RR_interval / fs)) #Convert sample distances to s distances + RR_list.append(ms_dist) + bpm =60 / np.mean(RR_list) # (1 minute) / average R-R interval of signal + return bpm + +def all_patients(): + """ + Assumes that all folder called mit_data is next folder + in current directory. This function can be bypassed if + user acquires list of patients (str or int). + + Inputs: None + + Outputs: List of Patients + + """ + + onlyfiles = [f for f in listdir(os.getcwd()+'/mit_data') if isfile(join(os.getcwd()+'/mit_data', f))] + return np.unique([file[0:3] for file in onlyfiles])[1:].tolist() + +def get_patient_data(patient,norm=True, sample_plot=False): + """ + Assumes that all folder called mit_data is next folder + in current directory. Can change this function internally + or write your own personalized one. + + Input: + patient:: Patient Number [Str or Int] + norm:: (optional) =True --> Normalize Data + sample_plot:: (optional) Show Patient ECG Signal [True or False] + Output: + Normalized Signal Data, Ecg Notes + Ecg_Notes:: Labeled Sample Peaks and Heart Conditions + Ecg_Data:: np.array of signal + """ + widths= [4,8,11,6,3,5,5,8] + + patient=str(patient) + ecg_notes= pd.read_fwf('mit_data/{}annotations.txt'.format(patient),widths=widths).drop(['Unnamed: 0'],axis=1) + ecg_data= pd.read_csv('mit_data/{}.csv'.format(patient)) + ecg_data.columns= ['samp_num','signal','V'] + ecg_notes=ecg_notes[['Sample #','Type','Aux']] + ecg_notes.columns=['sample_num','type','aux'] + if norm == True: + ecg_data.signal= z_norm(ecg_data.signal) + if sample_plot == True: + peaklist= ecg_notes.sample_num.astype(int).values + plt.figure() + b=np.random.choice(len(ecg_data.signal)) + plt.plot(ecg_data.signal) + plt.xlim(b,b+5000) + plt.plot(peaklist, ecg_data.signal[peaklist], "x") + plt.title(' Sample Patient {} data'.format(patient)) + return None + + return ecg_data.signal,ecg_notes + +def z_norm(result): + """ + Normalize Data. This fits + all values between 0 and 1. + """ + result = (result-min(result))/(max(result)-min(result)) + return result + +def hr_sample_len(HR,fs=360): + """ + Convert a HR to sample len + + """ + return int((fs*60)/HR)# 60 seconds * samples/sec + +def isolate_patient_data(patients,classes,classes_further,classes_reducer=None, min_HR= 40,max_HR= 140,fs=360,verbose=False,plot_figs=True): + """ + Isolation Model. Examines Patients, Normalizes Signal, + and creates a python array with a length of the number of heat + beates by the lenght of the longest heart rate signal. Signals + that are smaller this are zero padded. These represent the X + values that used for training. They data includes patient number, + patient heart rate, and class of condition the heart beat corres- + ponds too. + + + Input: + patients:: Patient Numbers list of Patient numbers [list] + classes:: classes to be examined {dic} + classes_further:: expansion of previous classes with names {dic} + classes_reducer:: optional dictionary to reduce classes + min_HR:: (optional) minimum HR to consider (longer HR Sample Rate) + max_HR:: (optional) max HR to consider (longer HR Sample Rate) + fs:: (optional) sampling frequency --> 360 for this database + verbose:: (optional) prints out some information per patient if true [boolean] + plot_figs:: (optional) prints out HR and Heat Beat distributions + + Output: + X,y np arrays + Isolated beat:: list of lists of each patient ecg data (unpadded) + """ + isolated_beat= [] + start=time.time() + print('Examining {} patients...'.format(len(patients))) + + for i,patient in enumerate(patients): + ecg_signal,ecg_notes= get_patient_data(patient) + peaklist= ecg_notes.sample_num.astype(int).values + for c in classes.values(): + class_loc=[] + if classes_reducer != None: + for rc in classes_reducer[c]: + class_loc.extend(ecg_notes.loc[ecg_notes.type == rc]['sample_num'].values.tolist()) + else: + class_loc= ecg_notes.loc[ecg_notes.type == c]['sample_num'].values + for n in range(1,len(peaklist)-1): + + if peaklist[n] in set(class_loc): + delta1= int(np.round((peaklist[n+1]-peaklist[n])/2)) + delta2= int(np.round((peaklist[n]-peaklist[n-1])/2)) + peak_data= ecg_signal[peaklist[n]-delta2:peaklist[n]+delta1] + if hr_sample_len(max_HR) <= len(peak_data) <= hr_sample_len(min_HR): + isolated_beat.append([patient,get_HR(peaklist,fs=fs),c]+peak_data.tolist()) + + if verbose == True: + print('\nPatient {}...'.format(patient)) + print('Normalizing --> [0 1]') + print('Patient HR',get_HR(peaklist,fs=fs)) + if len(patients) <=1: + update_progress(i/(len(patients))) + else: + update_progress(i/(len(patients)-1)) + + print('\nPadding...\n') + isolated_beats= zero_pad(isolated_beat) + X=isolated_beats[:,3:].astype(float) + y=isolated_beats[:,:3] + avg_samp=np.array([len(l) for l in isolated_beat]).mean() + print('\nAverage HR Sample Len: {:.2f} samples ({:.2f}s per beat)'.format(avg_samp,avg_samp/360)) + print('Average HR: {:.2f} bpm'.format(y[:,1].astype(float).mean())) + + if plot_figs== True: + if len(patients)==1: + print('\n*****Error Will Arise with Plot because only single sample used*****\n') + print('Plotting...\n') + plt.figure(figsize=(20,10)) + plt.subplot(121) + x=[len(elem) for elem in isolated_beat] + warnings.filterwarnings("ignore") + sns.distplot(x,rug=True) + plt.title('Heart Rate RR Width') + plt.xlabel('HR Sample Interval Length') + plt.subplot(122) + sns.distplot(y[:,1].astype(float),rug=True) + plt.title('Heart Rate Distribution') + plt.xlabel('HR [bpm]') + plt.show() + warnings.resetwarnings() + print('Data Loaded | Shape:{}\n'.format(isolated_beats.shape)) + for label,count in Counter(y[:,2].tolist()).items(): + print(' {} cases of {}\n'.format(count,classes_further[label])) + print('{:.2f}min Runtime'.format((time.time()-start)/60)) + return X,y,isolated_beat + +def show_sample_plots(X,y,classes,classes_further,num_sigs=5,fs=360,plot_xlim=1,dims=[2,4]): + """ + Sample Plot Generator Function. Show user selected amount of + signals per class to show on plt subplots. (One per class --> 2x4) + + Input: + X:: Isolated Patient HR [nd array] + y:: Grouping of (Patient, HR, Class) [nd array] + classes:: classes to be examined {dic} + classes_further:: expansion of previous classes with names {dic} + fs:: (optional) sampling frequency --> 360 for this database + plot_xlim:: xlim dimenstions + Output: + Sample signal plots per class + """ + time= np.arange(0, X.shape[1])/fs + print("MAX HB TIME:",(X.shape[1])/fs) + colors=['m','c','b','g','r'] + plt.figure(figsize=(25,15)) + for c in range(len(classes)): + samples = np.argwhere(y[:,2] == classes[c]).flatten() + rand=np.random.choice(len(samples)) + samples=samples[0+rand:num_sigs+rand] + if len(samples)>0: + subplot_val=dims[0]*100+dims[1]*10+1+c + plt.subplot(subplot_val) + label=classes[c] + plt.title(classes_further[label]+': '+classes[c]) + for samp in samples: + plt.plot(time,X[samp]) + plt.xlim(0,plot_xlim) + plt.xlabel('time [s]') + plt.ylabel('Voltage [-1,1]') + +def most_common_conditions(patients,top_k): + """ + Illustration of the top k classes + for the input number of patients + + """ + allp=[] #all patient notes + for p in patients: + sig,notes=get_patient_data(p) + allp.extend(notes.type.values.tolist()) + + return Counter(allp).most_common(top_k) + +def resample_vals(X,samp_len=187): + """ + Signal resampling function + """ + + X_resamp= np.zeros((X.shape[0],samp_len)) + for i,x in enumerate(X): + X_resamp[i]=resample(x,samp_len) + return X_resamp + +##################MAIN######################### +if __name__ == '__main__': + classes= {0:'N',1:'L',2:'R',3:'V',4:'/',5:'A',6:'f',7:'F'} + patients = all_patients() + patients=[101] + X,y,isolated_beat=isolate_patient_data(patients=patients, classes=classes,classes_further=classes_further, + fs=360,verbose=False,plot_figs=True) \ No newline at end of file