Diff of /hrv.py [000000] .. [1a3ddc]

Switch to unified view

a b/hrv.py
1
# A collection of heartrate variability algorithms for
2
# both the timedomain and frequency domain.
3
#
4
# Copyright (C) 2019 Luis Howell & Bernd Porr
5
# GPL GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007
6
#
7
8
9
import numpy as np
10
import random
11
import subprocess
12
from scipy.interpolate import interp1d
13
from gatspy.periodic import LombScargleFast
14
15
class HRV:
16
    """
17
    Heartrate variability class which calcualtes the standard HRV
18
    parameters with the help of Python functions and for cross
19
    validation also via the physionet's get_hrv script.
20
    """
21
22
    def __init__(self, sampling_frequency):
23
        """
24
        Constructor takes the sampling frequency.
25
        All rr_sample data is in sample number and
26
        will assume it's at this sampling rate.
27
        """
28
29
        self.fs = float(sampling_frequency)
30
        self.period = 1.0/sampling_frequency
31
32
    
33
    def _intervals(self, rr_samples):
34
        """Calculate the RR intervals in ms from sample numbers.
35
        
36
        :param rr_samples: R peak sample locations
37
        :type rr_samples: array_like
38
        :return: RR intervals in milliseconds
39
        :rtype: ndarray
40
        """
41
42
        rr_intervals = np.diff(np.array(rr_samples)*self.period*1000)
43
44
        return rr_intervals
45
46
    
47
    def _timestamps(self, rr_samples):
48
        """Calculate the timestamps in ms from sample locations.
49
        
50
        :param rr_samples: R peak sample locations
51
        :type rr_samples: array_like
52
        :return: The timestamps in milliseconds
53
        :rtype: array_like
54
        """
55
56
        ts = np.array(rr_samples)*self.period*1000
57
58
        return ts
59
60
    
61
    def _succ_diffs(self, rr_samples):
62
        """Calculate the successive differences of R peaks.
63
        
64
        :param rr_samples: R peak sample locations
65
        :type rr_samples: array_like
66
        :return: The successive differences of R peaks
67
        :rtype: ndarray
68
        """
69
70
        rr_ints = self._intervals(rr_samples)
71
72
        succ_diffs = []
73
74
        for i in range(len(rr_ints)-1):
75
76
            diff = rr_ints[i+1] - rr_ints[i]            
77
            succ_diffs.append(diff)
78
79
        return np.array(succ_diffs)
80
81
   
82
    def SDNN(self, rr_samples, normalise=False):
83
        """Calculate SDNN, the standard deviation of NN intervals.
84
        
85
        :param rr_samples: R peak sample locations
86
        :type rr_samples: array_like
87
        :param normalise: normalise the SDNN against the average RR interval, defaults to False
88
        :type normalise: bool, optional
89
        :return: SDNN, the standard deviation of NN intervals
90
        :rtype: float
91
        """
92
93
        rr_intervals = self._intervals(rr_samples) 
94
        rr_std = np.std(rr_intervals)
95
96
        if normalise:
97
            rr_mean_interval = np.mean(rr_intervals)
98
            rr_std = rr_std/rr_mean_interval
99
100
        return rr_std
101
102
    
103
    def SDANN(self, rr_samples, average_period=5.0, normalise=False):
104
        """Calculate SDANN, the standard deviation of the average RR intervals calculated over short periods.
105
        
106
        :param rr_samples: R peak sample locations
107
        :type rr_samples: array_like
108
        :param average_period: The averging period in minutes, defaults to 5.0
109
        :type average_period: float, optional
110
        :param normalise: normalise the SDANN against the average RR interval, defaults to False
111
        :type normalise: bool, optional
112
        :return: SDANN, the standard deviation of the average RR intervals calculated over short periods
113
        :rtype: float
114
        """
115
116
        average_period_samples = int(self.fs*average_period*60)
117
        average_rr_intervals = []
118
119
        sections = int((np.max(rr_samples)/average_period_samples)+0.5)
120
121
        if sections<1:
122
                sections = 1
123
124
        for i in range(sections):
125
126
                idx = np.where((rr_samples>=(i*average_period_samples)) &
127
                               (rr_samples<((i+1)*average_period_samples)))
128
                idx = idx[0]
129
                section_rr = rr_samples[idx[0]:idx[-1]+1]
130
131
                avg_rr_int = np.mean(self._intervals(section_rr))
132
                average_rr_intervals.append(avg_rr_int)
133
134
        rr_std = np.std(average_rr_intervals)
135
136
        if normalise:
137
                rr_mean_interval = np.mean(average_rr_intervals)
138
                rr_std = rr_std/rr_mean_interval
139
140
        return rr_std
141
142
143
    def RMSSD(self, rr_samples, normalise = False):
144
        """Calculate RMSSD (root mean square of successive differences).
145
        
146
        :param rr_samples: R peak sample locations
147
        :type rr_samples: array_like
148
        :param normalise: normalise the RMSSD against the average RR interval, defaults to False
149
        :type normalise: bool, optional
150
        :return: RMSSD (root mean square of successive differences)
151
        :rtype: float
152
        """
153
154
        succ_diffs = self._succ_diffs(rr_samples)
155
        succ_diffs = succ_diffs*succ_diffs
156
157
        rms = np.sqrt(np.mean(succ_diffs))
158
159
        if normalise:
160
            rms = rms / np.mean(self._intervals(rr_samples))
161
162
        return rms
163
164
165
    def SDSD(self, rr_samples):
166
        """Calculate SDSD (standard deviation of successive differences), the standard deviation of the successive differences between adjacent NNs.
167
        
168
        :param rr_samples: R peak sample locations
169
        :type rr_samples: array_like
170
        :return: SDSD (standard deviation of successive differences)
171
        :rtype: float
172
        """
173
174
        succ_diffs = self._succ_diffs(rr_samples)
175
176
        return np.std(succ_diffs)        
177
178
    
179
    def NN50(self, rr_samples):
180
        """Calculate NN50, the number of pairs of successive NNs that differ by more than 50 ms.
181
        
182
        :param rr_samples: R peak sample locations
183
        :type rr_samples: array_like
184
        :return: NN50
185
        :rtype: float
186
        """
187
188
        succ_diffs = self._succ_diffs(rr_samples)
189
190
        over_50 = np.where(abs(succ_diffs)>50)
191
        over_50 = over_50[0]
192
193
        return len(over_50)
194
195
196
    def pNN50(self, rr_samples):
197
        """Calculate pNN50, the proportion of NN50 divided by total number of NNs.
198
        
199
        :param rr_samples: R peak sample locations
200
        :type rr_samples: array_like
201
        :return: pNN50
202
        :rtype: float
203
        """
204
205
        return self.NN50(rr_samples)/(len(rr_samples)-1)
206
207
208
    def NN20(self, rr_samples):
209
        """Calculate NN20, the number of pairs of successive NNs that differ by more than 20 ms.
210
        
211
        :param rr_samples: R peak sample locations
212
        :type rr_samples: array_like
213
        :return: NN20
214
        :rtype: float
215
        """
216
217
        succ_diffs = self._succ_diffs(rr_samples)
218
219
        over_20 = np.where(abs(succ_diffs)>20)
220
        over_20 = over_20[0]
221
222
        return len(over_20)
223
224
225
    def pNN20(self, rr_samples):
226
        """Calculate pNN20, the proportion of NN20 divided by total number of NNs.
227
        
228
        :param rr_samples: R peak sample locations
229
        :type rr_samples: array_like
230
        :return: pNN20
231
        :rtype: float
232
        """
233
234
        return self.NN20(rr_samples)/(len(rr_samples)-1)
235
236
    
237
    def HR(self, rr_samples):
238
        """Calculate heart-rates from R peak samples.
239
        
240
        :param rr_samples: R peak sample locations
241
        :type rr_samples: array_like
242
        :return: Heart-rates in BPM
243
        :rtype: ndarray
244
        """
245
        
246
        rr_intervals = np.diff(rr_samples)
247
        heart_rates = 60.0/(rr_intervals/float(self.fs))
248
249
        return heart_rates
250
251
252
    def add_rr_error(self, rr_samples, error):
253
        """
254
        Adds jitter to the heartrate timestamps. 
255
        The error and the rr_samples are in timestamps.
256
        Returns the noisy timestamps in samples.
257
        """
258
        if error==0:
259
            return rr_samples
260
261
        error_values = np.random.randint(-abs(error), abs(error)+1, len(rr_samples))
262
        noisy_rr_samples = rr_samples+error_values
263
264
        return noisy_rr_samples
265
266
    
267
    def fAnalysis(self, rr_samples):
268
        """
269
        Frequency analysis to calc self.lf, self.hf, returns the LF/HF-ratio and
270
        also calculates the spectrum as pairs of (self.f_hr_axis,self.f_hr).
271
        The input arrary is in sample points where R peaks have been detected.
272
        """
273
        # discrete timestamps
274
        self.hr_discrete = self._intervals(rr_samples) / 1000
275
        # hr positions in time
276
        self.t_hr_discrete  = [i/self.fs for i in rr_samples[1:]]
277
        # now let's create function which approximates the hr(t) relationship
278
        self.hr_func = interp1d(self.t_hr_discrete, self.hr_discrete)
279
        # we take 1024 samples for a linear time array for hr(t)
280
        nsamp = 1000
281
        # linear time array for the heartrate
282
        self.t_hr_linear = np.linspace(self.t_hr_discrete[1],
283
                                       self.t_hr_discrete[len(self.t_hr_discrete)-2],
284
                                       num=nsamp)
285
        # duration in secs of the heartrate array minus the ends b/c of the approx
286
        duration = self.t_hr_discrete[len(self.t_hr_discrete)-2] - self.t_hr_discrete[1];
287
        # heartrate linearly approximated between discrete samples
288
        self.hr_linear = self.hr_func(self.t_hr_linear)
289
        model = LombScargleFast().fit(self.t_hr_discrete, self.hr_discrete, 1E-2)
290
        fmax = 1
291
        fmin = 0.01
292
        df = (fmax - fmin) / nsamp
293
        self.f_hr = model.score_frequency_grid(fmin, df, nsamp)
294
        self.f_hr_axis = fmin + df * np.arange(nsamp)
295
        # lf
296
        self.lf = 0
297
        # hf
298
        self.hf = 0
299
        for i in range(0,int(nsamp/2)):
300
            if (self.f_hr_axis[i] >= 0.04) and (self.f_hr_axis[i] <= 0.15):
301
                self.lf = self.lf + self.f_hr[i]
302
            if (self.f_hr_axis[i] >= 0.15) and (self.f_hr_axis[i] <= 0.4):
303
                self.hf = self.hf + self.f_hr[i]
304
        # hf
305
        return self.lf/self.hf
306
307
    
308
    def hrv_toolkit(self, rr_samples):
309
        """
310
        Calculating the HRV parameters with the HRV toolkit.
311
        It calls the commandline file get_hrv and then parses the output.
312
        To be able to run check out the subdir HRV.src and compile/install
313
        the binaries in /usr/local/bin.
314
        rr_samples are the samples numbers of the R-peaks.
315
        Returns the lf/hf ratio and also provides self.sdnn, 
316
        self.rmssd, self.lf and self.hf.
317
        """
318
        m = np.hstack((np.vstack(self._timestamps(rr_samples[1:])/1000),
319
                       np.vstack(self._intervals(rr_samples)/1000)))
320
        np.savetxt("hr.txt",m,delimiter = ' ', newline=" N\n")
321
        r = subprocess.check_output(['get_hrv', '-s', '-R', 'hr.txt'])
322
        r = str(r)
323
        a = r.split("\\n")
324
        for b in a:
325
            x = b.split("=")
326
            #print(x)
327
            if "SDNN" in x[0]:
328
                self.sdnn = float(x[1])
329
            elif "rMSSD" in x[0]:
330
                self.rmssd = float(x[1])
331
            elif "LF PWR" in x[0]:
332
                self.lf = float(x[1])
333
            elif "HF PWR" in x[0]:
334
                self.hf = float(x[1])
335
            elif "LF/HF" in x[0]:
336
                self.lfhf = float(x[1])
337
        return self.lfhf