#!/usr/bin/env python
# Do *not* edit this script.
# These are helper functions that you can use with your code.
import os, numpy as np
import shutil
# Check if a variable is a number or represents a number.
def is_number(x):
try:
float(x)
return True
except (ValueError, TypeError):
return False
# Check if a variable is an integer or represents an integer.
def is_integer(x):
if is_number(x):
return float(x).is_integer()
else:
return False
# Check if a variable is a a finite number or represents a finite number.
def is_finite_number(x):
if is_number(x):
return np.isfinite(float(x))
else:
return False
# (Re)sort leads using the standard order of leads for the standard twelve-lead ECG.
def sort_leads(leads):
x = ('I', 'II', 'III', 'aVR', 'aVL', 'aVF', 'V1', 'V2', 'V3', 'V4', 'V5', 'V6')
leads = sorted(leads, key=lambda lead: (x.index(lead) if lead in x else len(x) + leads.index(lead)))
return tuple(leads)
# Find header and recording files.
def find_challenge_files(data_directory):
header_files = list()
recording_files = list()
for f in os.listdir(data_directory):
root, extension = os.path.splitext(f)
if not root.startswith('.') and extension=='.hea':
header_file = os.path.join(data_directory, root + '.hea')
recording_file = os.path.join(data_directory, root + '.mat')
if os.path.isfile(header_file) and os.path.isfile(recording_file):
header_files.append(header_file)
recording_files.append(recording_file)
return header_files, recording_files
# Load header file as a string.
def load_header(header_file):
with open(header_file, 'r') as f:
header = f.read()
return header
# Load recording file as an array.
def load_recording(recording_file, header=None, leads=None, key='val'):
from scipy.io import loadmat
recording = loadmat(recording_file)[key]
if header and leads:
recording = choose_leads(recording, header, leads)
return recording
# Choose leads from the recording file.
def choose_leads(recording, header, leads):
num_leads = len(leads)
num_samples = np.shape(recording)[1]
chosen_recording = np.zeros((num_leads, num_samples), recording.dtype)
available_leads = get_leads(header)
for i, lead in enumerate(leads):
if lead in available_leads:
j = available_leads.index(lead)
chosen_recording[i, :] = recording[j, :]
return chosen_recording
# Get recording ID.
def get_recording_id(header):
recording_id = None
for i, l in enumerate(header.split('\n')):
if i==0:
try:
recording_id = l.split(' ')[0]
except:
pass
else:
break
return recording_id
# Get leads from header.
def get_leads(header):
leads = list()
for i, l in enumerate(header.split('\n')):
entries = l.split(' ')
if i==0:
num_leads = int(entries[1])
elif i<=num_leads:
leads.append(entries[-1])
else:
break
return tuple(leads)
# Get age from header.
def get_age(header):
age = None
for l in header.split('\n'):
if l.startswith('#Age'):
try:
age = float(l.split(': ')[1].strip())
except:
age = float('nan')
return age
# Get sex from header.
def get_sex(header):
sex = None
for l in header.split('\n'):
if l.startswith('#Sex'):
try:
sex = l.split(': ')[1].strip()
except:
pass
return sex
# Get frequency from header.
def get_num_leads(header):
num_leads = None
for i, l in enumerate(header.split('\n')):
if i==0:
try:
num_samples = float(l.split(' ')[1])
except:
pass
else:
break
return num_leads
# Get frequency from header.
def get_frequency(header):
frequency = None
for i, l in enumerate(header.split('\n')):
if i==0:
try:
frequency = float(l.split(' ')[2])
except:
pass
else:
break
return frequency
# Get number of samples from header.
def get_num_samples(header):
num_samples = None
for i, l in enumerate(header.split('\n')):
if i==0:
try:
num_samples = float(l.split(' ')[3])
except:
pass
else:
break
return num_samples
# Get analog-to-digital converter (ADC) gains from header.
def get_adc_gains(header, leads):
adc_gains = np.zeros(len(leads))
for i, l in enumerate(header.split('\n')):
entries = l.split(' ')
if i==0:
num_leads = int(entries[1])
elif i<=num_leads:
current_lead = entries[-1]
if current_lead in leads:
j = leads.index(current_lead)
try:
adc_gains[j] = float(entries[2].split('/')[0])
except:
pass
else:
break
return adc_gains
# Get baselines from header.
def get_baselines(header, leads):
baselines = np.zeros(len(leads))
for i, l in enumerate(header.split('\n')):
entries = l.split(' ')
if i==0:
num_leads = int(entries[1])
elif i<=num_leads:
current_lead = entries[-1]
if current_lead in leads:
j = leads.index(current_lead)
try:
baselines[j] = float(entries[4].split('/')[0])
except:
pass
else:
break
return baselines
# Get labels from header.
def get_labels(header):
labels = list()
for l in header.split('\n'):
if l.startswith('#Dx'):
try:
entries = l.split(': ')[1].split(',')
for entry in entries:
labels.append(entry.strip())
except:
pass
return labels
# Save outputs from model.
def save_outputs(output_file, recording_id, classes, labels, probabilities):
# Format the model outputs.
recording_string = '#{}'.format(recording_id)
class_string = ','.join(str(c) for c in classes)
label_string = ','.join(str(l) for l in labels)
probabilities_string = ','.join(str(p) for p in probabilities)
output_string = recording_string + '\n' + class_string + '\n' + label_string + '\n' + probabilities_string + '\n'
# Save the model outputs.
if os.path.isfile(output_file) :
os.remove(output_file)
with open(output_file, 'w') as f:
f.write(output_string)
# Load outputs from model.
def load_outputs(output_file):
with open(output_file, 'r') as f:
for i, l in enumerate(f):
if i==0:
recording_id = l[1:] if len(l)>1 else None
elif i==1:
classes = tuple(entry.strip() for entry in l.split(','))
elif i==2:
labels = tuple(entry.strip() for entry in l.split(','))
elif i==3:
probabilities = tuple(float(entry) if is_finite_number(entry) else float('nan') for entry in l.split(','))
else:
break
return recording_id, classes, labels, probabilities