Diff of /hrv.py [000000] .. [1a3ddc]

Switch to side-by-side view

--- a
+++ b/hrv.py
@@ -0,0 +1,337 @@
+# A collection of heartrate variability algorithms for
+# both the timedomain and frequency domain.
+#
+# Copyright (C) 2019 Luis Howell & Bernd Porr
+# GPL GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007
+#
+
+
+import numpy as np
+import random
+import subprocess
+from scipy.interpolate import interp1d
+from gatspy.periodic import LombScargleFast
+
+class HRV:
+    """
+    Heartrate variability class which calcualtes the standard HRV
+    parameters with the help of Python functions and for cross
+    validation also via the physionet's get_hrv script.
+    """
+
+    def __init__(self, sampling_frequency):
+        """
+        Constructor takes the sampling frequency.
+        All rr_sample data is in sample number and
+        will assume it's at this sampling rate.
+        """
+
+        self.fs = float(sampling_frequency)
+        self.period = 1.0/sampling_frequency
+
+    
+    def _intervals(self, rr_samples):
+        """Calculate the RR intervals in ms from sample numbers.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: RR intervals in milliseconds
+        :rtype: ndarray
+        """
+
+        rr_intervals = np.diff(np.array(rr_samples)*self.period*1000)
+
+        return rr_intervals
+
+    
+    def _timestamps(self, rr_samples):
+        """Calculate the timestamps in ms from sample locations.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: The timestamps in milliseconds
+        :rtype: array_like
+        """
+
+        ts = np.array(rr_samples)*self.period*1000
+
+        return ts
+
+    
+    def _succ_diffs(self, rr_samples):
+        """Calculate the successive differences of R peaks.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: The successive differences of R peaks
+        :rtype: ndarray
+        """
+
+        rr_ints = self._intervals(rr_samples)
+
+        succ_diffs = []
+
+        for i in range(len(rr_ints)-1):
+
+            diff = rr_ints[i+1] - rr_ints[i]            
+            succ_diffs.append(diff)
+
+        return np.array(succ_diffs)
+
+   
+    def SDNN(self, rr_samples, normalise=False):
+        """Calculate SDNN, the standard deviation of NN intervals.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :param normalise: normalise the SDNN against the average RR interval, defaults to False
+        :type normalise: bool, optional
+        :return: SDNN, the standard deviation of NN intervals
+        :rtype: float
+        """
+
+        rr_intervals = self._intervals(rr_samples) 
+        rr_std = np.std(rr_intervals)
+
+        if normalise:
+            rr_mean_interval = np.mean(rr_intervals)
+            rr_std = rr_std/rr_mean_interval
+
+        return rr_std
+
+    
+    def SDANN(self, rr_samples, average_period=5.0, normalise=False):
+        """Calculate SDANN, the standard deviation of the average RR intervals calculated over short periods.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :param average_period: The averging period in minutes, defaults to 5.0
+        :type average_period: float, optional
+        :param normalise: normalise the SDANN against the average RR interval, defaults to False
+        :type normalise: bool, optional
+        :return: SDANN, the standard deviation of the average RR intervals calculated over short periods
+        :rtype: float
+        """
+
+        average_period_samples = int(self.fs*average_period*60)
+        average_rr_intervals = []
+
+        sections = int((np.max(rr_samples)/average_period_samples)+0.5)
+
+        if sections<1:
+                sections = 1
+
+        for i in range(sections):
+
+                idx = np.where((rr_samples>=(i*average_period_samples)) &
+                               (rr_samples<((i+1)*average_period_samples)))
+                idx = idx[0]
+                section_rr = rr_samples[idx[0]:idx[-1]+1]
+
+                avg_rr_int = np.mean(self._intervals(section_rr))
+                average_rr_intervals.append(avg_rr_int)
+
+        rr_std = np.std(average_rr_intervals)
+
+        if normalise:
+                rr_mean_interval = np.mean(average_rr_intervals)
+                rr_std = rr_std/rr_mean_interval
+
+        return rr_std
+
+
+    def RMSSD(self, rr_samples, normalise = False):
+        """Calculate RMSSD (root mean square of successive differences).
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :param normalise: normalise the RMSSD against the average RR interval, defaults to False
+        :type normalise: bool, optional
+        :return: RMSSD (root mean square of successive differences)
+        :rtype: float
+        """
+
+        succ_diffs = self._succ_diffs(rr_samples)
+        succ_diffs = succ_diffs*succ_diffs
+
+        rms = np.sqrt(np.mean(succ_diffs))
+
+        if normalise:
+            rms = rms / np.mean(self._intervals(rr_samples))
+
+        return rms
+
+
+    def SDSD(self, rr_samples):
+        """Calculate SDSD (standard deviation of successive differences), the standard deviation of the successive differences between adjacent NNs.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: SDSD (standard deviation of successive differences)
+        :rtype: float
+        """
+
+        succ_diffs = self._succ_diffs(rr_samples)
+
+        return np.std(succ_diffs)        
+
+    
+    def NN50(self, rr_samples):
+        """Calculate NN50, the number of pairs of successive NNs that differ by more than 50 ms.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: NN50
+        :rtype: float
+        """
+
+        succ_diffs = self._succ_diffs(rr_samples)
+
+        over_50 = np.where(abs(succ_diffs)>50)
+        over_50 = over_50[0]
+
+        return len(over_50)
+
+
+    def pNN50(self, rr_samples):
+        """Calculate pNN50, the proportion of NN50 divided by total number of NNs.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: pNN50
+        :rtype: float
+        """
+
+        return self.NN50(rr_samples)/(len(rr_samples)-1)
+
+
+    def NN20(self, rr_samples):
+        """Calculate NN20, the number of pairs of successive NNs that differ by more than 20 ms.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: NN20
+        :rtype: float
+        """
+
+        succ_diffs = self._succ_diffs(rr_samples)
+
+        over_20 = np.where(abs(succ_diffs)>20)
+        over_20 = over_20[0]
+
+        return len(over_20)
+
+
+    def pNN20(self, rr_samples):
+        """Calculate pNN20, the proportion of NN20 divided by total number of NNs.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: pNN20
+        :rtype: float
+        """
+
+        return self.NN20(rr_samples)/(len(rr_samples)-1)
+
+    
+    def HR(self, rr_samples):
+        """Calculate heart-rates from R peak samples.
+        
+        :param rr_samples: R peak sample locations
+        :type rr_samples: array_like
+        :return: Heart-rates in BPM
+        :rtype: ndarray
+        """
+        
+        rr_intervals = np.diff(rr_samples)
+        heart_rates = 60.0/(rr_intervals/float(self.fs))
+
+        return heart_rates
+
+
+    def add_rr_error(self, rr_samples, error):
+        """
+        Adds jitter to the heartrate timestamps. 
+        The error and the rr_samples are in timestamps.
+        Returns the noisy timestamps in samples.
+        """
+        if error==0:
+            return rr_samples
+
+        error_values = np.random.randint(-abs(error), abs(error)+1, len(rr_samples))
+        noisy_rr_samples = rr_samples+error_values
+
+        return noisy_rr_samples
+
+    
+    def fAnalysis(self, rr_samples):
+        """
+        Frequency analysis to calc self.lf, self.hf, returns the LF/HF-ratio and
+        also calculates the spectrum as pairs of (self.f_hr_axis,self.f_hr).
+        The input arrary is in sample points where R peaks have been detected.
+        """
+        # discrete timestamps
+        self.hr_discrete = self._intervals(rr_samples) / 1000
+        # hr positions in time
+        self.t_hr_discrete  = [i/self.fs for i in rr_samples[1:]]
+        # now let's create function which approximates the hr(t) relationship
+        self.hr_func = interp1d(self.t_hr_discrete, self.hr_discrete)
+        # we take 1024 samples for a linear time array for hr(t)
+        nsamp = 1000
+        # linear time array for the heartrate
+        self.t_hr_linear = np.linspace(self.t_hr_discrete[1],
+                                       self.t_hr_discrete[len(self.t_hr_discrete)-2],
+                                       num=nsamp)
+        # duration in secs of the heartrate array minus the ends b/c of the approx
+        duration = self.t_hr_discrete[len(self.t_hr_discrete)-2] - self.t_hr_discrete[1];
+        # heartrate linearly approximated between discrete samples
+        self.hr_linear = self.hr_func(self.t_hr_linear)
+        model = LombScargleFast().fit(self.t_hr_discrete, self.hr_discrete, 1E-2)
+        fmax = 1
+        fmin = 0.01
+        df = (fmax - fmin) / nsamp
+        self.f_hr = model.score_frequency_grid(fmin, df, nsamp)
+        self.f_hr_axis = fmin + df * np.arange(nsamp)
+        # lf
+        self.lf = 0
+        # hf
+        self.hf = 0
+        for i in range(0,int(nsamp/2)):
+            if (self.f_hr_axis[i] >= 0.04) and (self.f_hr_axis[i] <= 0.15):
+                self.lf = self.lf + self.f_hr[i]
+            if (self.f_hr_axis[i] >= 0.15) and (self.f_hr_axis[i] <= 0.4):
+                self.hf = self.hf + self.f_hr[i]
+        # hf
+        return self.lf/self.hf
+
+    
+    def hrv_toolkit(self, rr_samples):
+        """
+        Calculating the HRV parameters with the HRV toolkit.
+        It calls the commandline file get_hrv and then parses the output.
+        To be able to run check out the subdir HRV.src and compile/install
+        the binaries in /usr/local/bin.
+        rr_samples are the samples numbers of the R-peaks.
+        Returns the lf/hf ratio and also provides self.sdnn, 
+        self.rmssd, self.lf and self.hf.
+        """
+        m = np.hstack((np.vstack(self._timestamps(rr_samples[1:])/1000),
+                       np.vstack(self._intervals(rr_samples)/1000)))
+        np.savetxt("hr.txt",m,delimiter = ' ', newline=" N\n")
+        r = subprocess.check_output(['get_hrv', '-s', '-R', 'hr.txt'])
+        r = str(r)
+        a = r.split("\\n")
+        for b in a:
+            x = b.split("=")
+            #print(x)
+            if "SDNN" in x[0]:
+                self.sdnn = float(x[1])
+            elif "rMSSD" in x[0]:
+                self.rmssd = float(x[1])
+            elif "LF PWR" in x[0]:
+                self.lf = float(x[1])
+            elif "HF PWR" in x[0]:
+                self.hf = float(x[1])
+            elif "LF/HF" in x[0]:
+                self.lfhf = float(x[1])
+        return self.lfhf