2 lines (1 with data), 12.6 kB
{"cells":[{"cell_type":"code","execution_count":null,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"executionInfo":{"elapsed":10000,"status":"ok","timestamp":1662474870845,"user":{"displayName":"Edouard .C","userId":"07320798258634684264"},"user_tz":-120},"id":"7udATNDUW1mz","outputId":"43047e21-d0f8-4bf1-f6e1-39e90d5fdaa3"},"outputs":[],"source":["import os\n","import numpy as np\n","import pandas as pd\n","import scipy\n","from IPython.display import clear_output\n","import scipy.stats as stats\n","import heartpy\n","import json"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["\"\"\"Please extract the files in WESAD.zip in a file that will contains S2,S3,...Sk folders of the subject\n","\n","If you want to use a subject for testing (I have used the S17 for testing) I suggest you to move your S17 file to a Testing folder.\n","That is to say for eg : /content/data/WESAD contains S2,S3,...Sk folders but does not contain S17 folder\n"," /content/data/Testing contains S17 folder\n","\"\"\"\n","\n","DIR_WESAD=\"/content/data/WESAD/\" # Please complete the path to your file that contains S2,S3,...Sk folders. eg : DIR_WESAD=/content/data/WESAD/\n","DIR_SAVING_DATA= \"/content/data/Dataset/\" #Please complete the path where you want to save your data once treated"]},{"cell_type":"markdown","metadata":{"id":"Px7fRkoQJN0F"},"source":["# Fonctions extraction ECG"]},{"cell_type":"code","execution_count":null,"metadata":{"id":"roLCFX6OJRlG"},"outputs":[],"source":["def peak_pos(x : np.array,threshhold: float):\n"," \"\"\" Detect the ECG peaks on the signal x, peaks detected must go above the float value threshold\n"," \n"," The signal is smoothed before the peaks detection using the mean on 5 values with a convolution \n"," Returns the index of the peaks \n"," \"\"\"\n"," assert len(x.shape)==1\n"," x=(x-np.mean(x))/np.std(x)\n"," smoothed=[]\n"," conv=np.array([0.2,0.2,0.2,0.2,0.2])\n"," smoothed=np.convolve(x,conv, mode='same')\n"," baseline=float(np.mean(smoothed))\n"," peakindex=-1\n"," peakindices=[]\n"," peakvalue=-100\n","\n"," for index in range(0,len(smoothed)):\n"," value=smoothed[index]\n","\n"," if value>baseline:\n","\n"," if peakvalue ==-100 or value>peakvalue :\n"," peakindex=index\n"," peakvalue=value\n","\n"," if value<baseline and peakindex!=-1 and peakvalue>threshhold:\n"," peakindices.append(peakindex)\n"," peakindex=-1\n"," peakvalue=-100\n","\n"," if peakindex!=-1 and peakvalue>threshhold:\n"," peakindices.append(peakindex)\n","\n"," return np.array(peakindices)\n","\n","\n","def TINN(x:np.array):\n"," \"\"\" Compute all the triangular interpolation to calculate the TINN scores. It also computes HRV index from an array x which contains \n"," all the interbeats times for a given ECG signal.\n","\n"," The axis is divided in 2 parts respectively on the right and left of the abscissa of the maximum value of the gaussian distribution\n"," The TINN score calculation is defined in the WESAD Dataset paper, to calculate it we needthe closest triangular interpolation \n"," of the gaussian distribution of the interbeats times. The triangular interpolation is defined by 2 lines that meet at the maximum value\n"," of the gaussian distribution and cross the x-axis in N on the first half of the x-axis and M on the second half of the x-axis. \n"," Thus inside ]N;M[ the interpolation function != 0\n"," Outside of ]N;M[ the interpolation function equals 0.\n"," \"\"\"\n","\n"," kernel = stats.gaussian_kde(x) #Create an approximated kernel for gaussian distribution from the x array (interbeats times)\n"," absi=np.linspace(np.min(x),np.max(x),len(x)) # Compute the x-axis of the interbeats distribution (from minimum interbeat time to maximum interbeat time)\n"," val=kernel.evaluate(absi) # Fit the gaussian distribution to the created x-axis\n"," ecart=absi[1]-absi[0] # Space between 2 values on the axis\n"," maxind=np.argmax(val) # Select the index for which the gaussian distribution (val array) is maximum \n"," max_pos=absi[maxind] # Interbeat time (abscissa) for which the gaussian distribution is maximum\n"," maxvalue=np.amax(val) # Max of the gaussian distribution\n"," N_abs=absi[0:maxind+1] # First half of the x-axis\n"," M_abs=absi[maxind:] # Second half of the x-axis\n"," HRVindex=len(x)/maxvalue\n"," err_N=[]\n"," err_M=[]\n","\n"," for i in range(0,len(N_abs)-1):\n"," N=N_abs[i]\n"," slope=(maxvalue)/(max_pos-N)\n"," D=val[0:maxind+1]\n"," q=np.clip(slope*ecart*np.arange(-i,-i+maxind+1),0,None) #Triangular interpolation on the First half of the x-axis\n"," diff=D-q \n"," err=np.multiply(diff,diff)\n"," err1=np.delete(err,-1)\n"," err2=np.delete(err, 0)\n"," errint=(err1+err2)/2\n"," errtot=np.linalg.norm(errint) # Error area between the triangular interpolation and the gaussian distribution on the first half of the x-axis\n"," err_N.append((errtot,N,N_abs,q))\n"," \n"," for i in range(1,len(M_abs)):\n"," M=M_abs[i]\n"," slope=(maxvalue)/(max_pos-M)\n"," D=val[maxind:]\n"," q=np.clip(slope*ecart*np.arange(-i,len(D)-i),0,None) #Triangular interpolation on the second half of the x-axis\n"," diff=D-q\n"," err=np.multiply(diff,diff)\n"," err1=np.delete(err,-1)\n"," err2=np.delete(err, 0)\n"," errint=(err1+err2)/2\n"," errtot=np.linalg.norm(errint) # Error area between the triangular interpolation and the gaussian distribution on the second half of the x-axis\n"," err_M.append((errtot,M,M_abs,q))\n","\n"," return (err_N,err_M,absi,val,HRVindex)\n","\n","def best_TINN(x:np.array):\n"," \"\"\"Select the best N and M that give the best triangular interpolation function approximation of the gaussian distrbution and return\n"," N; M; the TINN score = M-N ; and the HRV index\n"," \n"," \"\"\"\n"," err_N,err_M,_,_,HRVindex=TINN(x)\n"," N=np.argmin(np.array(err_N,dtype=object)[:,0])\n"," M=np.argmin(np.array(err_M,dtype=object)[:,0])\n"," absN=err_N[N][1]\n"," absM=err_M[M][1]\n"," return float(absN),float(absM),float(absM-absN),HRVindex\n","\n","def num_compare_NN50(x,i):\n"," \"\"\"Count the number of HRV intervals differing more than 50 ms for a given HRV interval x[i]\n"," \n"," \"\"\"\n"," ref=x[i]\n"," k=0\n"," diff=np.absolute(x-ref)\n"," k+=np.sum(np.where(diff>0.05,1,0))\n"," return k \n","\n","def compare_NN50(x):\n"," \"\"\" Returns the number and percentage of HRV intervals differing more than 50ms for all intervals\n"," \n"," \"\"\"\n"," k=0\n"," for i in range(0,len(x)):\n"," k+=num_compare_NN50(x,i)\n"," if k==0:\n"," k=1\n"," return k,(k/(len(x)*len(x)))\n","\n","def get_freq_features_ecg(x):\n"," \"\"\" Returns frequential features of the Heart Rate Variability signal (interbeats times) by computing FFT, to compute the Fouriers \n"," Frequencies the mean of the Heart Rate variability is used as sampling period \n"," \"\"\"\n"," mean=np.mean(x)\n"," yf=np.array(scipy.fft.fft(x-mean))\n"," xf=scipy.fft.fftfreq(len(x),mean)[0:len(x)//2]\n"," psd=(2/len(yf))*np.abs(yf)[0:len(x)//2]\n"," fmean=np.mean(xf)\n"," fstd=np.std(xf)\n"," sumpsd=np.sum(psd)\n"," return fmean,fstd,sumpsd\n","\n","def get_data_ecg(x):\n"," \"\"\" Collect the features of a given ECG signal x, using HeartPy package to compute the peak list (not the previous developed peak \n"," detection function). \n"," \"\"\"\n"," working,mes=heartpy.process(x,700)\n"," peak=working[\"peaklist\"]\n"," periods=np.array([(peak[i+1]-peak[i])/700 for i in range(0,len(peak)-1)])\n"," frequency=1/periods\n"," meanfreq = np.mean(frequency)\n"," stdfreq = np.std(frequency)\n"," HRV=np.array([(peak[i]-peak[i-1])/700 for i in range(1,len(peak))])\n"," _,_,T,HRVindex=best_TINN(HRV)\n"," num50,p50=compare_NN50(HRV)\n"," meanHRV=np.mean(HRV)\n"," stdHRV=np.std(HRV)\n"," rmsHRV=np.sqrt(np.mean(HRV**2))\n"," fmean,fstd,sumpsd=get_freq_features_ecg(HRV)\n"," return np.array([meanfreq,stdfreq,T,HRVindex,num50,p50,meanHRV,stdHRV,rmsHRV,fmean,fstd,sumpsd])\n","\n","def slice_per_label(labelseq,flabel,time_window,step):\n"," \"\"\"Return a list of index i of the ECG signal that in [i;i+pts_windows] the label (= emotionnal state of the user) is constant.\n"," The window is defined by the user by time_window (s) and flabel (freq of sampling for the label, eg 700Hz) (Hz).\n"," \"\"\"\n"," pts_window=time_window*flabel\n"," taken_indices=[]\n"," conv=np.array([1 for i in range(0,pts_window)])\n"," for i in range(0,len(labelseq)-pts_window,flabel*step): #Sliding 5s window, step 1s\n"," extr=labelseq[i:i+pts_window]\n"," res=np.sum(np.multiply(extr,conv))\n"," l=labelseq[i]\n"," if l in [1,2,3,4]:\n"," condition=l*pts_window==res\n"," if condition==True:\n"," taken_indices.append((i,l))\n"," return taken_indices\n","\n","def get_ecg_f_from_file(path):\n"," \"\"\"Extract all the ECG features for each window of the ECG signal of the given file. Once it is extracted the features are\n"," normalized with the features computed on an ECG signal when the subject is in a neutral state. If the extraction fails for a \n"," given window the extracted features are discarded.\n"," At the end the number of discarded extract is printed\n"," \"\"\"\n"," id=[]\n"," labels=[]\n"," features=[]\n"," taken_indices=[]\n"," fecg=700\n"," i=0\n"," discard=0\n"," discardbis=0\n"," pts_window=20*700\n"," df = pd.read_pickle(path)\n"," label=df['label']\n"," print(\"openned\")\n"," indice_neutral=np.where(label==1)[0] #Find where is the neutral phase for baseline\n"," taken_indices=slice_per_label(label,700,20,1) #get all indices of the start of a slice of the sequence with cst label\n"," print(\"indiced\")\n"," ECG_neutral = np.array(df[\"signal\"][\"chest\"][\"ECG\"][indice_neutral[0]:indice_neutral[-1]*700][:,0]) #Baseline\n"," features_neutral=get_data_ecg(ECG_neutral) #Baseline extraction\n"," for x in taken_indices:\n"," i+=1\n"," if i%100==0:\n"," clear_output(wait=True)\n"," print(path)\n"," print(i/len(taken_indices))\n"," indice = x[0]\n"," try:\n"," ECG=np.array(df['signal']['chest']['ECG'][(indice*fecg)//700:(pts_window+indice)*fecg//700][:,0])\n"," result=np.divide(get_data_ecg(ECG),features_neutral)\n"," if not (np.isinf(result).any() or np.isnan(result).any()):\n"," features.append(result.tolist())\n"," labels.append(int(x[1]))\n"," else : \n"," discard+=1\n"," except KeyboardInterrupt:\n"," print(1/0)\n"," except : \n"," discardbis+=1\n"," print(\"reject because infinit : \" +str(discard))\n"," print(\"reject because error in algorithm : \" +str(discardbis))\n"," return features,id,labels,discard,discardbis,ECG_neutral"]},{"cell_type":"markdown","metadata":{"id":"IirNJcHSxSKQ"},"source":["# Extract ECG"]},{"cell_type":"code","execution_count":null,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"executionInfo":{"elapsed":1124560,"status":"ok","timestamp":1662476164697,"user":{"displayName":"Edouard .C","userId":"07320798258634684264"},"user_tz":-120},"id":"JfKHsLkv5Sd7","outputId":"d52dd227-0057-41f7-cfc8-93f85e7ec4bf"},"outputs":[],"source":["\"\"\" The extracted features and label (id is not used anymore) are stored in a dictionnary for each subject and each dictionnary is written\n","in a json file named with the number of the subject\n","\"\"\"\n","\n","dir_wesad=\" \"\n","l= os.listdir(DIR_WESAD) \n","del l[l.index('wesad_readme.pdf')]\n","try :\n"," del l[l.index('wesad_readme.pdf')]\n","except :\n"," pass\n","i=0\n","for name in l:\n"," i+=1\n"," data_w={}\n"," path =str(DIR_WESAD+name+\"/\"+name+\".pkl\")\n"," print(name)\n"," print(i/len(l))\n"," print(len(l))\n"," X,Y,Z,discard,discardbis,neutr=get_ecg_f_from_file(path)\n"," data_w[\"id\"]=Y\n"," data_w[\"label\"]=Z\n"," data_w[\"features\"]=X\n"," with open(DIR_SAVING_DATA+\"WESADECG_\"+name+\".json\", 'w') as f:\n"," json.dump(data_w, f)"]}],"metadata":{"colab":{"collapsed_sections":["Px7fRkoQJN0F"],"provenance":[{"file_id":"1RVFHhENCfsih5pcT4rRuH4DqdJ7HBWaz","timestamp":1645763238456}]},"kernelspec":{"display_name":"Python 3.9.7 64-bit","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.9.7"},"vscode":{"interpreter":{"hash":"acb408c112aa627a318ac6bee697c54a21dc0d988d17c05deacc60f98e48531a"}}},"nbformat":4,"nbformat_minor":0}