# A collection of heartrate variability algorithms for
# both the timedomain and frequency domain.
#
# Copyright (C) 2019 Luis Howell & Bernd Porr
# GPL GNU GENERAL PUBLIC LICENSE Version 3, 29 June 2007
#
import numpy as np
import random
import subprocess
from scipy.interpolate import interp1d
from gatspy.periodic import LombScargleFast
class HRV:
"""
Heartrate variability class which calcualtes the standard HRV
parameters with the help of Python functions and for cross
validation also via the physionet's get_hrv script.
"""
def __init__(self, sampling_frequency):
"""
Constructor takes the sampling frequency.
All rr_sample data is in sample number and
will assume it's at this sampling rate.
"""
self.fs = float(sampling_frequency)
self.period = 1.0/sampling_frequency
def _intervals(self, rr_samples):
"""Calculate the RR intervals in ms from sample numbers.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: RR intervals in milliseconds
:rtype: ndarray
"""
rr_intervals = np.diff(np.array(rr_samples)*self.period*1000)
return rr_intervals
def _timestamps(self, rr_samples):
"""Calculate the timestamps in ms from sample locations.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: The timestamps in milliseconds
:rtype: array_like
"""
ts = np.array(rr_samples)*self.period*1000
return ts
def _succ_diffs(self, rr_samples):
"""Calculate the successive differences of R peaks.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: The successive differences of R peaks
:rtype: ndarray
"""
rr_ints = self._intervals(rr_samples)
succ_diffs = []
for i in range(len(rr_ints)-1):
diff = rr_ints[i+1] - rr_ints[i]
succ_diffs.append(diff)
return np.array(succ_diffs)
def SDNN(self, rr_samples, normalise=False):
"""Calculate SDNN, the standard deviation of NN intervals.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:param normalise: normalise the SDNN against the average RR interval, defaults to False
:type normalise: bool, optional
:return: SDNN, the standard deviation of NN intervals
:rtype: float
"""
rr_intervals = self._intervals(rr_samples)
rr_std = np.std(rr_intervals)
if normalise:
rr_mean_interval = np.mean(rr_intervals)
rr_std = rr_std/rr_mean_interval
return rr_std
def SDANN(self, rr_samples, average_period=5.0, normalise=False):
"""Calculate SDANN, the standard deviation of the average RR intervals calculated over short periods.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:param average_period: The averging period in minutes, defaults to 5.0
:type average_period: float, optional
:param normalise: normalise the SDANN against the average RR interval, defaults to False
:type normalise: bool, optional
:return: SDANN, the standard deviation of the average RR intervals calculated over short periods
:rtype: float
"""
average_period_samples = int(self.fs*average_period*60)
average_rr_intervals = []
sections = int((np.max(rr_samples)/average_period_samples)+0.5)
if sections<1:
sections = 1
for i in range(sections):
idx = np.where((rr_samples>=(i*average_period_samples)) &
(rr_samples<((i+1)*average_period_samples)))
idx = idx[0]
section_rr = rr_samples[idx[0]:idx[-1]+1]
avg_rr_int = np.mean(self._intervals(section_rr))
average_rr_intervals.append(avg_rr_int)
rr_std = np.std(average_rr_intervals)
if normalise:
rr_mean_interval = np.mean(average_rr_intervals)
rr_std = rr_std/rr_mean_interval
return rr_std
def RMSSD(self, rr_samples, normalise = False):
"""Calculate RMSSD (root mean square of successive differences).
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:param normalise: normalise the RMSSD against the average RR interval, defaults to False
:type normalise: bool, optional
:return: RMSSD (root mean square of successive differences)
:rtype: float
"""
succ_diffs = self._succ_diffs(rr_samples)
succ_diffs = succ_diffs*succ_diffs
rms = np.sqrt(np.mean(succ_diffs))
if normalise:
rms = rms / np.mean(self._intervals(rr_samples))
return rms
def SDSD(self, rr_samples):
"""Calculate SDSD (standard deviation of successive differences), the standard deviation of the successive differences between adjacent NNs.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: SDSD (standard deviation of successive differences)
:rtype: float
"""
succ_diffs = self._succ_diffs(rr_samples)
return np.std(succ_diffs)
def NN50(self, rr_samples):
"""Calculate NN50, the number of pairs of successive NNs that differ by more than 50 ms.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: NN50
:rtype: float
"""
succ_diffs = self._succ_diffs(rr_samples)
over_50 = np.where(abs(succ_diffs)>50)
over_50 = over_50[0]
return len(over_50)
def pNN50(self, rr_samples):
"""Calculate pNN50, the proportion of NN50 divided by total number of NNs.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: pNN50
:rtype: float
"""
return self.NN50(rr_samples)/(len(rr_samples)-1)
def NN20(self, rr_samples):
"""Calculate NN20, the number of pairs of successive NNs that differ by more than 20 ms.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: NN20
:rtype: float
"""
succ_diffs = self._succ_diffs(rr_samples)
over_20 = np.where(abs(succ_diffs)>20)
over_20 = over_20[0]
return len(over_20)
def pNN20(self, rr_samples):
"""Calculate pNN20, the proportion of NN20 divided by total number of NNs.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: pNN20
:rtype: float
"""
return self.NN20(rr_samples)/(len(rr_samples)-1)
def HR(self, rr_samples):
"""Calculate heart-rates from R peak samples.
:param rr_samples: R peak sample locations
:type rr_samples: array_like
:return: Heart-rates in BPM
:rtype: ndarray
"""
rr_intervals = np.diff(rr_samples)
heart_rates = 60.0/(rr_intervals/float(self.fs))
return heart_rates
def add_rr_error(self, rr_samples, error):
"""
Adds jitter to the heartrate timestamps.
The error and the rr_samples are in timestamps.
Returns the noisy timestamps in samples.
"""
if error==0:
return rr_samples
error_values = np.random.randint(-abs(error), abs(error)+1, len(rr_samples))
noisy_rr_samples = rr_samples+error_values
return noisy_rr_samples
def fAnalysis(self, rr_samples):
"""
Frequency analysis to calc self.lf, self.hf, returns the LF/HF-ratio and
also calculates the spectrum as pairs of (self.f_hr_axis,self.f_hr).
The input arrary is in sample points where R peaks have been detected.
"""
# discrete timestamps
self.hr_discrete = self._intervals(rr_samples) / 1000
# hr positions in time
self.t_hr_discrete = [i/self.fs for i in rr_samples[1:]]
# now let's create function which approximates the hr(t) relationship
self.hr_func = interp1d(self.t_hr_discrete, self.hr_discrete)
# we take 1024 samples for a linear time array for hr(t)
nsamp = 1000
# linear time array for the heartrate
self.t_hr_linear = np.linspace(self.t_hr_discrete[1],
self.t_hr_discrete[len(self.t_hr_discrete)-2],
num=nsamp)
# duration in secs of the heartrate array minus the ends b/c of the approx
duration = self.t_hr_discrete[len(self.t_hr_discrete)-2] - self.t_hr_discrete[1];
# heartrate linearly approximated between discrete samples
self.hr_linear = self.hr_func(self.t_hr_linear)
model = LombScargleFast().fit(self.t_hr_discrete, self.hr_discrete, 1E-2)
fmax = 1
fmin = 0.01
df = (fmax - fmin) / nsamp
self.f_hr = model.score_frequency_grid(fmin, df, nsamp)
self.f_hr_axis = fmin + df * np.arange(nsamp)
# lf
self.lf = 0
# hf
self.hf = 0
for i in range(0,int(nsamp/2)):
if (self.f_hr_axis[i] >= 0.04) and (self.f_hr_axis[i] <= 0.15):
self.lf = self.lf + self.f_hr[i]
if (self.f_hr_axis[i] >= 0.15) and (self.f_hr_axis[i] <= 0.4):
self.hf = self.hf + self.f_hr[i]
# hf
return self.lf/self.hf
def hrv_toolkit(self, rr_samples):
"""
Calculating the HRV parameters with the HRV toolkit.
It calls the commandline file get_hrv and then parses the output.
To be able to run check out the subdir HRV.src and compile/install
the binaries in /usr/local/bin.
rr_samples are the samples numbers of the R-peaks.
Returns the lf/hf ratio and also provides self.sdnn,
self.rmssd, self.lf and self.hf.
"""
m = np.hstack((np.vstack(self._timestamps(rr_samples[1:])/1000),
np.vstack(self._intervals(rr_samples)/1000)))
np.savetxt("hr.txt",m,delimiter = ' ', newline=" N\n")
r = subprocess.check_output(['get_hrv', '-s', '-R', 'hr.txt'])
r = str(r)
a = r.split("\\n")
for b in a:
x = b.split("=")
#print(x)
if "SDNN" in x[0]:
self.sdnn = float(x[1])
elif "rMSSD" in x[0]:
self.rmssd = float(x[1])
elif "LF PWR" in x[0]:
self.lf = float(x[1])
elif "HF PWR" in x[0]:
self.hf = float(x[1])
elif "LF/HF" in x[0]:
self.lfhf = float(x[1])
return self.lfhf