[d437d0]: / Testing ds creator.ipynb

Download this file

2 lines (1 with data), 8.9 kB

{"cells":[{"cell_type":"code","execution_count":null,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"executionInfo":{"elapsed":31262,"status":"ok","timestamp":1662566142239,"user":{"displayName":"Edouard .C","userId":"07320798258634684264"},"user_tz":-120},"id":"7udATNDUW1mz","outputId":"baed0eed-8dbf-46b5-b863-271deb71354a"},"outputs":[],"source":["import os\n","import numpy as np\n","import pandas as pd\n","import scipy\n","from IPython.display import clear_output\n","import scipy.stats as stats\n","import heartpy\n","import json"]},{"cell_type":"code","execution_count":null,"metadata":{},"outputs":[],"source":["\"\"\"Please extract the files in WESAD.zip in a file that will contains S2,S3,...Sk folders of the subject\n","\n","If you want to use a subject for testing (I have used the S17 for testing) I suggest you to move your S17 file to a Testing folder.\n","That is to say for eg : /content/data/WESAD contains S2,S3,...Sk folders but does not contain S17 folder\n","                        /content/data/Testing contains S17 folder\n","\n","\"\"\"\n","DIR_TESTING_FOLDER=\"/content/data/Testing/\" # Please complete the path to your file that contains subject test folder. eg : DIR_TESTING_FOLDER=/content/data/Testing/\n","DIR_SAVING_DATA_TESTING= \"/content/data/Testing/\" #Please complete the path where you want to save your data once treated\n","SUBJECT_USED_FOR_TESTING = \"S17\" #S17 for example"]},{"cell_type":"markdown","metadata":{"id":"Px7fRkoQJN0F"},"source":["# Fonctions extraction ECG"]},{"cell_type":"code","execution_count":3,"metadata":{"executionInfo":{"elapsed":17,"status":"ok","timestamp":1662567751255,"user":{"displayName":"Edouard .C","userId":"07320798258634684264"},"user_tz":-120},"id":"roLCFX6OJRlG"},"outputs":[],"source":["def peak_pos(x : np.array,threshhold: float):\n","  assert len(x.shape)==1\n","  x=(x-np.mean(x))/np.std(x)\n","  smoothed=[]\n","  conv=np.array([0.2,0.2,0.2,0.2,0.2])\n","  smoothed=np.convolve(x,conv, mode='same')\n","  baseline=float(np.mean(smoothed))\n","  peakindex=-1\n","  peakindices=[]\n","  peakvalue=-100\n","  for index in range(0,len(smoothed)):\n","    value=smoothed[index]\n","    if value>baseline:\n","      if peakvalue ==-100 or value>peakvalue :\n","        peakindex=index\n","        peakvalue=value\n","    if value<baseline and peakindex!=-1 and peakvalue>threshhold:\n","      peakindices.append(peakindex)\n","      peakindex=-1\n","      peakvalue=-100\n","  if peakindex!=-1 and peakvalue>threshhold:\n","    peakindices.append(peakindex)\n","  return np.array(peakindices)\n","\n","\n","def TINN(x:np.array):\n","  kernel = stats.gaussian_kde(x)\n","  absi=np.linspace(np.min(x),np.max(x),len(x))\n","  val=kernel.evaluate(absi)\n","  ecart=absi[1]-absi[0]\n","  maxind=np.argmax(val)\n","  max_pos=absi[maxind]\n","  maxvalue=np.amax(val)\n","  N_abs=absi[0:maxind+1]\n","  M_abs=absi[maxind:]\n","  HRVindex=len(x)/maxvalue\n","  err_N=[]\n","  for i in range(0,len(N_abs)-1):\n","    N=N_abs[i]\n","    slope=(maxvalue)/(max_pos-N)\n","    D=val[0:maxind+1]\n","    q=np.clip(slope*ecart*np.arange(-i,-i+maxind+1),0,None)\n","    diff=D-q\n","    err=np.multiply(diff,diff)\n","    err1=np.delete(err,-1)\n","    err2=np.delete(err, 0)\n","    errint=(err1+err2)/2\n","    errtot=np.linalg.norm(errint)\n","    err_N.append((errtot,N,N_abs,q))\n","  err_M=[]\n","  for i in range(1,len(M_abs)):\n","    M=M_abs[i]\n","    slope=(maxvalue)/(max_pos-M)\n","    D=val[maxind:]\n","    q=np.clip(slope*ecart*np.arange(-i,len(D)-i),0,None)\n","    diff=D-q\n","    err=np.multiply(diff,diff)\n","    err1=np.delete(err,-1)\n","    err2=np.delete(err, 0)\n","    errint=(err1+err2)/2\n","    errtot=np.linalg.norm(errint)\n","    err_M.append((errtot,M,M_abs,q))\n","  return (err_N,err_M,absi,val,HRVindex)\n","\n","def best_TINN(x:np.array):\n","  err_N,err_M,_,_,HRVindex=TINN(x)\n","  N=np.argmin(np.array(err_N,dtype=object)[:,0])\n","  M=np.argmin(np.array(err_M,dtype=object)[:,0])\n","  absN=err_N[N][1]\n","  absM=err_M[M][1]\n","  return float(absN),float(absM),float(absM-absN),HRVindex\n","\n","def num_compare_NN50(x,i):\n","  ref=x[i]\n","  k=0\n","  diff=np.absolute(x-ref)\n","  k+=np.sum(np.where(diff>0.05,1,0))\n","  return k \n","\n","def compare_NN50(x):\n","  k=0\n","  for i in range(0,len(x)):\n","    k+=num_compare_NN50(x,i)\n","  if k==0:\n","    k=1\n","  return k,(k/(len(x)*len(x)))\n","\n","def get_freq_features_ecg(x):\n","  mean=np.mean(x)\n","  yf=np.array(scipy.fft.fft(x-mean))\n","  xf=scipy.fft.fftfreq(len(x),mean)[0:len(x)//2]\n","  psd=(2/len(yf))*np.abs(yf)[0:len(x)//2]\n","  fmean=np.mean(xf)\n","  fstd=np.std(xf)\n","  sumpsd=np.sum(psd)\n","  return fmean,fstd,sumpsd #ULf,Lf,Hf,UHf,ratioLH,\n","\n","def get_data_ecg(x):\n","  working,mes=heartpy.process(x,700)\n","  peak=working[\"peaklist\"]\n","  periods=np.array([(peak[i+1]-peak[i])/700 for i in range(0,len(peak)-1)])\n","  frequency=1/periods\n","  meanfreq = np.mean(frequency)\n","  stdfreq = np.std(frequency)\n","  HRV=np.array([(peak[i]-peak[i-1])/700 for i in range(1,len(peak))])\n","  _,_,T,HRVindex=best_TINN(HRV)\n","  num50,p50=compare_NN50(HRV)\n","  meanHRV=np.mean(HRV)\n","  stdHRV=np.std(HRV)\n","  rmsHRV=np.sqrt(np.mean(HRV**2))\n","  fmean,fstd,sumpsd=get_freq_features_ecg(HRV)\n","  return np.array([meanfreq,stdfreq,T,HRVindex,num50,p50,meanHRV,stdHRV,rmsHRV,fmean,fstd,sumpsd]) #ULf,Lf,Hf,UHf,ratioLH\n","\n","def slice_per_label(labelseq,flabel,time_window,step):\n","  pts_window=time_window*flabel\n","  taken_indices=[]\n","  conv=np.array([1 for i in range(0,pts_window)])\n","  for i in range(0,len(labelseq)-pts_window,flabel*step):   #Sliding 5s window, step 1s\n","    extr=labelseq[i:i+pts_window]\n","    res=np.sum(np.multiply(extr,conv))\n","    l=labelseq[i]\n","    if l in [1,2,3,4]:\n","      condition=l*pts_window==res\n","      if condition==True:\n","        taken_indices.append((i,l))\n","  return taken_indices\n","\n","def get_ecg_f_from_file(path):\n","  id=[]\n","  labels=[]\n","  features=[]\n","  taken_indices=[]\n","  fecg=700\n","  i=0\n","  discard=0\n","  discardbis=0\n","  pts_window=20*700\n","  df = pd.read_pickle(path)\n","  label=df['label']\n","  print(\"openned\")\n","  indice_neutral=np.where(label==1)[0]        #Find where is the neutral phase for baseline\n","  taken_indices=slice_per_label(label,700,20,1)   #get all indices of the start of a slice of the sequence with cst label\n","  print(\"indiced\")\n","  ECG_neutral = np.array(df[\"signal\"][\"chest\"][\"ECG\"][indice_neutral[0]:indice_neutral[-1]*700][:,0]) #Baseline\n","  features_neutral=get_data_ecg(ECG_neutral)        #Baseline extraction\n","  for x in taken_indices:\n","    i+=1\n","    if i%100==0:\n","      clear_output(wait=True)\n","      print(path)\n","      print(i/len(taken_indices))\n","    indice = x[0]\n","    try:\n","      ECG=np.array(df['signal']['chest']['ECG'][(indice*fecg)//700:(pts_window+indice)*fecg//700][:,0])\n","      result=np.divide(get_data_ecg(ECG),features_neutral)\n","      if not (np.isinf(result).any() or np.isnan(result).any()):\n","        features.append(result.tolist())\n","        labels.append(int(x[1]))\n","      else : \n","        discard+=1\n","    except KeyboardInterrupt:\n","      print(1/0)\n","    except : \n","      discardbis+=1\n","  print(\"reject because infinit : \" +str(discard))\n","  print(\"reject because error in algorithm : \" +str(discardbis))\n","  return features,id,labels,discard,discardbis,ECG_neutral"]},{"cell_type":"markdown","metadata":{"id":"IirNJcHSxSKQ"},"source":["# Extract ECG"]},{"cell_type":"code","execution_count":null,"metadata":{"colab":{"base_uri":"https://localhost:8080/"},"executionInfo":{"elapsed":70051,"status":"ok","timestamp":1662567822913,"user":{"displayName":"Edouard .C","userId":"07320798258634684264"},"user_tz":-120},"id":"JfKHsLkv5Sd7","outputId":"bb6f4e42-e1ff-44b2-ed7a-8d7b9cf1f3e3"},"outputs":[],"source":["name=SUBJECT_USED_FOR_TESTING\n","data_w={}\n","path =str(DIR_TESTING_FOLDER+name+\"/\"+name+\".pkl\")\n","X,Y,Z,discard,discardbis,neutr=get_ecg_f_from_file(path)\n","data_w[\"id\"]=Y\n","data_w[\"label\"]=Z\n","data_w[\"features\"]=X\n","with open(DIR_SAVING_DATA_TESTING+\"WESADECG_\"+name+\".json\", 'w') as f:\n","  json.dump(data_w, f)\n","  "]}],"metadata":{"colab":{"authorship_tag":"ABX9TyPb1NdC8dXylUXQ+vHRAeAC","collapsed_sections":["Px7fRkoQJN0F"],"provenance":[{"file_id":"1OSDtQEG7si_vZiRYHE_pS9UMZSEA45pT","timestamp":1645769491332},{"file_id":"1RVFHhENCfsih5pcT4rRuH4DqdJ7HBWaz","timestamp":1645763238456}]},"kernelspec":{"display_name":"Python 3.9.7 ('venv': venv)","language":"python","name":"python3"},"language_info":{"name":"python","version":"3.9.7"},"vscode":{"interpreter":{"hash":"b57312d11beb16f41471443c1511f1ce5acdfda8670f4b8103c754f2bf25a58c"}}},"nbformat":4,"nbformat_minor":0}