|
a |
|
b/hrv.py |
|
|
1 |
# A collection of heartrate variability algorithms for |
|
|
2 |
# both the timedomain and frequency domain. |
|
|
3 |
# |
|
|
4 |
# Copyright (C) 2019 Luis Howell & Bernd Porr |
|
|
5 |
# GPL GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007 |
|
|
6 |
# |
|
|
7 |
|
|
|
8 |
|
|
|
9 |
import numpy as np |
|
|
10 |
import random |
|
|
11 |
import subprocess |
|
|
12 |
from scipy.interpolate import interp1d |
|
|
13 |
from gatspy.periodic import LombScargleFast |
|
|
14 |
|
|
|
15 |
class HRV: |
|
|
16 |
""" |
|
|
17 |
Heartrate variability class which calcualtes the standard HRV |
|
|
18 |
parameters with the help of Python functions and for cross |
|
|
19 |
validation also via the physionet's get_hrv script. |
|
|
20 |
""" |
|
|
21 |
|
|
|
22 |
def __init__(self, sampling_frequency): |
|
|
23 |
""" |
|
|
24 |
Constructor takes the sampling frequency. |
|
|
25 |
All rr_sample data is in sample number and |
|
|
26 |
will assume it's at this sampling rate. |
|
|
27 |
""" |
|
|
28 |
|
|
|
29 |
self.fs = float(sampling_frequency) |
|
|
30 |
self.period = 1.0/sampling_frequency |
|
|
31 |
|
|
|
32 |
|
|
|
33 |
def _intervals(self, rr_samples): |
|
|
34 |
"""Calculate the RR intervals in ms from sample numbers. |
|
|
35 |
|
|
|
36 |
:param rr_samples: R peak sample locations |
|
|
37 |
:type rr_samples: array_like |
|
|
38 |
:return: RR intervals in milliseconds |
|
|
39 |
:rtype: ndarray |
|
|
40 |
""" |
|
|
41 |
|
|
|
42 |
rr_intervals = np.diff(np.array(rr_samples)*self.period*1000) |
|
|
43 |
|
|
|
44 |
return rr_intervals |
|
|
45 |
|
|
|
46 |
|
|
|
47 |
def _timestamps(self, rr_samples): |
|
|
48 |
"""Calculate the timestamps in ms from sample locations. |
|
|
49 |
|
|
|
50 |
:param rr_samples: R peak sample locations |
|
|
51 |
:type rr_samples: array_like |
|
|
52 |
:return: The timestamps in milliseconds |
|
|
53 |
:rtype: array_like |
|
|
54 |
""" |
|
|
55 |
|
|
|
56 |
ts = np.array(rr_samples)*self.period*1000 |
|
|
57 |
|
|
|
58 |
return ts |
|
|
59 |
|
|
|
60 |
|
|
|
61 |
def _succ_diffs(self, rr_samples): |
|
|
62 |
"""Calculate the successive differences of R peaks. |
|
|
63 |
|
|
|
64 |
:param rr_samples: R peak sample locations |
|
|
65 |
:type rr_samples: array_like |
|
|
66 |
:return: The successive differences of R peaks |
|
|
67 |
:rtype: ndarray |
|
|
68 |
""" |
|
|
69 |
|
|
|
70 |
rr_ints = self._intervals(rr_samples) |
|
|
71 |
|
|
|
72 |
succ_diffs = [] |
|
|
73 |
|
|
|
74 |
for i in range(len(rr_ints)-1): |
|
|
75 |
|
|
|
76 |
diff = rr_ints[i+1] - rr_ints[i] |
|
|
77 |
succ_diffs.append(diff) |
|
|
78 |
|
|
|
79 |
return np.array(succ_diffs) |
|
|
80 |
|
|
|
81 |
|
|
|
82 |
def SDNN(self, rr_samples, normalise=False): |
|
|
83 |
"""Calculate SDNN, the standard deviation of NN intervals. |
|
|
84 |
|
|
|
85 |
:param rr_samples: R peak sample locations |
|
|
86 |
:type rr_samples: array_like |
|
|
87 |
:param normalise: normalise the SDNN against the average RR interval, defaults to False |
|
|
88 |
:type normalise: bool, optional |
|
|
89 |
:return: SDNN, the standard deviation of NN intervals |
|
|
90 |
:rtype: float |
|
|
91 |
""" |
|
|
92 |
|
|
|
93 |
rr_intervals = self._intervals(rr_samples) |
|
|
94 |
rr_std = np.std(rr_intervals) |
|
|
95 |
|
|
|
96 |
if normalise: |
|
|
97 |
rr_mean_interval = np.mean(rr_intervals) |
|
|
98 |
rr_std = rr_std/rr_mean_interval |
|
|
99 |
|
|
|
100 |
return rr_std |
|
|
101 |
|
|
|
102 |
|
|
|
103 |
def SDANN(self, rr_samples, average_period=5.0, normalise=False): |
|
|
104 |
"""Calculate SDANN, the standard deviation of the average RR intervals calculated over short periods. |
|
|
105 |
|
|
|
106 |
:param rr_samples: R peak sample locations |
|
|
107 |
:type rr_samples: array_like |
|
|
108 |
:param average_period: The averging period in minutes, defaults to 5.0 |
|
|
109 |
:type average_period: float, optional |
|
|
110 |
:param normalise: normalise the SDANN against the average RR interval, defaults to False |
|
|
111 |
:type normalise: bool, optional |
|
|
112 |
:return: SDANN, the standard deviation of the average RR intervals calculated over short periods |
|
|
113 |
:rtype: float |
|
|
114 |
""" |
|
|
115 |
|
|
|
116 |
average_period_samples = int(self.fs*average_period*60) |
|
|
117 |
average_rr_intervals = [] |
|
|
118 |
|
|
|
119 |
sections = int((np.max(rr_samples)/average_period_samples)+0.5) |
|
|
120 |
|
|
|
121 |
if sections<1: |
|
|
122 |
sections = 1 |
|
|
123 |
|
|
|
124 |
for i in range(sections): |
|
|
125 |
|
|
|
126 |
idx = np.where((rr_samples>=(i*average_period_samples)) & |
|
|
127 |
(rr_samples<((i+1)*average_period_samples))) |
|
|
128 |
idx = idx[0] |
|
|
129 |
section_rr = rr_samples[idx[0]:idx[-1]+1] |
|
|
130 |
|
|
|
131 |
avg_rr_int = np.mean(self._intervals(section_rr)) |
|
|
132 |
average_rr_intervals.append(avg_rr_int) |
|
|
133 |
|
|
|
134 |
rr_std = np.std(average_rr_intervals) |
|
|
135 |
|
|
|
136 |
if normalise: |
|
|
137 |
rr_mean_interval = np.mean(average_rr_intervals) |
|
|
138 |
rr_std = rr_std/rr_mean_interval |
|
|
139 |
|
|
|
140 |
return rr_std |
|
|
141 |
|
|
|
142 |
|
|
|
143 |
def RMSSD(self, rr_samples, normalise = False): |
|
|
144 |
"""Calculate RMSSD (root mean square of successive differences). |
|
|
145 |
|
|
|
146 |
:param rr_samples: R peak sample locations |
|
|
147 |
:type rr_samples: array_like |
|
|
148 |
:param normalise: normalise the RMSSD against the average RR interval, defaults to False |
|
|
149 |
:type normalise: bool, optional |
|
|
150 |
:return: RMSSD (root mean square of successive differences) |
|
|
151 |
:rtype: float |
|
|
152 |
""" |
|
|
153 |
|
|
|
154 |
succ_diffs = self._succ_diffs(rr_samples) |
|
|
155 |
succ_diffs = succ_diffs*succ_diffs |
|
|
156 |
|
|
|
157 |
rms = np.sqrt(np.mean(succ_diffs)) |
|
|
158 |
|
|
|
159 |
if normalise: |
|
|
160 |
rms = rms / np.mean(self._intervals(rr_samples)) |
|
|
161 |
|
|
|
162 |
return rms |
|
|
163 |
|
|
|
164 |
|
|
|
165 |
def SDSD(self, rr_samples): |
|
|
166 |
"""Calculate SDSD (standard deviation of successive differences), the standard deviation of the successive differences between adjacent NNs. |
|
|
167 |
|
|
|
168 |
:param rr_samples: R peak sample locations |
|
|
169 |
:type rr_samples: array_like |
|
|
170 |
:return: SDSD (standard deviation of successive differences) |
|
|
171 |
:rtype: float |
|
|
172 |
""" |
|
|
173 |
|
|
|
174 |
succ_diffs = self._succ_diffs(rr_samples) |
|
|
175 |
|
|
|
176 |
return np.std(succ_diffs) |
|
|
177 |
|
|
|
178 |
|
|
|
179 |
def NN50(self, rr_samples): |
|
|
180 |
"""Calculate NN50, the number of pairs of successive NNs that differ by more than 50 ms. |
|
|
181 |
|
|
|
182 |
:param rr_samples: R peak sample locations |
|
|
183 |
:type rr_samples: array_like |
|
|
184 |
:return: NN50 |
|
|
185 |
:rtype: float |
|
|
186 |
""" |
|
|
187 |
|
|
|
188 |
succ_diffs = self._succ_diffs(rr_samples) |
|
|
189 |
|
|
|
190 |
over_50 = np.where(abs(succ_diffs)>50) |
|
|
191 |
over_50 = over_50[0] |
|
|
192 |
|
|
|
193 |
return len(over_50) |
|
|
194 |
|
|
|
195 |
|
|
|
196 |
def pNN50(self, rr_samples): |
|
|
197 |
"""Calculate pNN50, the proportion of NN50 divided by total number of NNs. |
|
|
198 |
|
|
|
199 |
:param rr_samples: R peak sample locations |
|
|
200 |
:type rr_samples: array_like |
|
|
201 |
:return: pNN50 |
|
|
202 |
:rtype: float |
|
|
203 |
""" |
|
|
204 |
|
|
|
205 |
return self.NN50(rr_samples)/(len(rr_samples)-1) |
|
|
206 |
|
|
|
207 |
|
|
|
208 |
def NN20(self, rr_samples): |
|
|
209 |
"""Calculate NN20, the number of pairs of successive NNs that differ by more than 20 ms. |
|
|
210 |
|
|
|
211 |
:param rr_samples: R peak sample locations |
|
|
212 |
:type rr_samples: array_like |
|
|
213 |
:return: NN20 |
|
|
214 |
:rtype: float |
|
|
215 |
""" |
|
|
216 |
|
|
|
217 |
succ_diffs = self._succ_diffs(rr_samples) |
|
|
218 |
|
|
|
219 |
over_20 = np.where(abs(succ_diffs)>20) |
|
|
220 |
over_20 = over_20[0] |
|
|
221 |
|
|
|
222 |
return len(over_20) |
|
|
223 |
|
|
|
224 |
|
|
|
225 |
def pNN20(self, rr_samples): |
|
|
226 |
"""Calculate pNN20, the proportion of NN20 divided by total number of NNs. |
|
|
227 |
|
|
|
228 |
:param rr_samples: R peak sample locations |
|
|
229 |
:type rr_samples: array_like |
|
|
230 |
:return: pNN20 |
|
|
231 |
:rtype: float |
|
|
232 |
""" |
|
|
233 |
|
|
|
234 |
return self.NN20(rr_samples)/(len(rr_samples)-1) |
|
|
235 |
|
|
|
236 |
|
|
|
237 |
def HR(self, rr_samples): |
|
|
238 |
"""Calculate heart-rates from R peak samples. |
|
|
239 |
|
|
|
240 |
:param rr_samples: R peak sample locations |
|
|
241 |
:type rr_samples: array_like |
|
|
242 |
:return: Heart-rates in BPM |
|
|
243 |
:rtype: ndarray |
|
|
244 |
""" |
|
|
245 |
|
|
|
246 |
rr_intervals = np.diff(rr_samples) |
|
|
247 |
heart_rates = 60.0/(rr_intervals/float(self.fs)) |
|
|
248 |
|
|
|
249 |
return heart_rates |
|
|
250 |
|
|
|
251 |
|
|
|
252 |
def add_rr_error(self, rr_samples, error): |
|
|
253 |
""" |
|
|
254 |
Adds jitter to the heartrate timestamps. |
|
|
255 |
The error and the rr_samples are in timestamps. |
|
|
256 |
Returns the noisy timestamps in samples. |
|
|
257 |
""" |
|
|
258 |
if error==0: |
|
|
259 |
return rr_samples |
|
|
260 |
|
|
|
261 |
error_values = np.random.randint(-abs(error), abs(error)+1, len(rr_samples)) |
|
|
262 |
noisy_rr_samples = rr_samples+error_values |
|
|
263 |
|
|
|
264 |
return noisy_rr_samples |
|
|
265 |
|
|
|
266 |
|
|
|
267 |
def fAnalysis(self, rr_samples): |
|
|
268 |
""" |
|
|
269 |
Frequency analysis to calc self.lf, self.hf, returns the LF/HF-ratio and |
|
|
270 |
also calculates the spectrum as pairs of (self.f_hr_axis,self.f_hr). |
|
|
271 |
The input arrary is in sample points where R peaks have been detected. |
|
|
272 |
""" |
|
|
273 |
# discrete timestamps |
|
|
274 |
self.hr_discrete = self._intervals(rr_samples) / 1000 |
|
|
275 |
# hr positions in time |
|
|
276 |
self.t_hr_discrete = [i/self.fs for i in rr_samples[1:]] |
|
|
277 |
# now let's create function which approximates the hr(t) relationship |
|
|
278 |
self.hr_func = interp1d(self.t_hr_discrete, self.hr_discrete) |
|
|
279 |
# we take 1024 samples for a linear time array for hr(t) |
|
|
280 |
nsamp = 1000 |
|
|
281 |
# linear time array for the heartrate |
|
|
282 |
self.t_hr_linear = np.linspace(self.t_hr_discrete[1], |
|
|
283 |
self.t_hr_discrete[len(self.t_hr_discrete)-2], |
|
|
284 |
num=nsamp) |
|
|
285 |
# duration in secs of the heartrate array minus the ends b/c of the approx |
|
|
286 |
duration = self.t_hr_discrete[len(self.t_hr_discrete)-2] - self.t_hr_discrete[1]; |
|
|
287 |
# heartrate linearly approximated between discrete samples |
|
|
288 |
self.hr_linear = self.hr_func(self.t_hr_linear) |
|
|
289 |
model = LombScargleFast().fit(self.t_hr_discrete, self.hr_discrete, 1E-2) |
|
|
290 |
fmax = 1 |
|
|
291 |
fmin = 0.01 |
|
|
292 |
df = (fmax - fmin) / nsamp |
|
|
293 |
self.f_hr = model.score_frequency_grid(fmin, df, nsamp) |
|
|
294 |
self.f_hr_axis = fmin + df * np.arange(nsamp) |
|
|
295 |
# lf |
|
|
296 |
self.lf = 0 |
|
|
297 |
# hf |
|
|
298 |
self.hf = 0 |
|
|
299 |
for i in range(0,int(nsamp/2)): |
|
|
300 |
if (self.f_hr_axis[i] >= 0.04) and (self.f_hr_axis[i] <= 0.15): |
|
|
301 |
self.lf = self.lf + self.f_hr[i] |
|
|
302 |
if (self.f_hr_axis[i] >= 0.15) and (self.f_hr_axis[i] <= 0.4): |
|
|
303 |
self.hf = self.hf + self.f_hr[i] |
|
|
304 |
# hf |
|
|
305 |
return self.lf/self.hf |
|
|
306 |
|
|
|
307 |
|
|
|
308 |
def hrv_toolkit(self, rr_samples): |
|
|
309 |
""" |
|
|
310 |
Calculating the HRV parameters with the HRV toolkit. |
|
|
311 |
It calls the commandline file get_hrv and then parses the output. |
|
|
312 |
To be able to run check out the subdir HRV.src and compile/install |
|
|
313 |
the binaries in /usr/local/bin. |
|
|
314 |
rr_samples are the samples numbers of the R-peaks. |
|
|
315 |
Returns the lf/hf ratio and also provides self.sdnn, |
|
|
316 |
self.rmssd, self.lf and self.hf. |
|
|
317 |
""" |
|
|
318 |
m = np.hstack((np.vstack(self._timestamps(rr_samples[1:])/1000), |
|
|
319 |
np.vstack(self._intervals(rr_samples)/1000))) |
|
|
320 |
np.savetxt("hr.txt",m,delimiter = ' ', newline=" N\n") |
|
|
321 |
r = subprocess.check_output(['get_hrv', '-s', '-R', 'hr.txt']) |
|
|
322 |
r = str(r) |
|
|
323 |
a = r.split("\\n") |
|
|
324 |
for b in a: |
|
|
325 |
x = b.split("=") |
|
|
326 |
#print(x) |
|
|
327 |
if "SDNN" in x[0]: |
|
|
328 |
self.sdnn = float(x[1]) |
|
|
329 |
elif "rMSSD" in x[0]: |
|
|
330 |
self.rmssd = float(x[1]) |
|
|
331 |
elif "LF PWR" in x[0]: |
|
|
332 |
self.lf = float(x[1]) |
|
|
333 |
elif "HF PWR" in x[0]: |
|
|
334 |
self.hf = float(x[1]) |
|
|
335 |
elif "LF/HF" in x[0]: |
|
|
336 |
self.lfhf = float(x[1]) |
|
|
337 |
return self.lfhf |