[6536f9]: / run.py

Download this file

159 lines (113 with data), 6.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import os
import torch
import random
from torch import nn
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import neural
from class_ecgdataset import ECGDataset
from class_deepdataset import DeepDatasetV2
from train_and_eval import train_model, evaluate_model
def run(NUM_SEGMENTS, NUM_SECONDS, NUM_BATCH, LEADS, NUM_EPOCHS, DATA_PATH, FS, NUM_PATIENTS=None):
    '''
    Load ECG recordings, build the dataset, perform a stratified 50/50
    train/test split, then train a DeepECG model and evaluate it.

    Operates on a configurable number of subjects (when NUM_PATIENTS is
    given), segments, and leads.

    Returns:
    - accuracy (float): the accuracy of the model on the test set
    - total_train_time (int): the total training time in seconds.
    - total_eval_time (int): the total evaluation time in seconds.
    '''
    # Pin both RNGs so splits / shuffling are reproducible across runs.
    torch.manual_seed(44)
    random.seed(44)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f'Used device: {device}')

    print('Loading signals...')
    ids_path = os.path.join(DATA_PATH, 'patient_ids.txt')
    with open(ids_path, 'r') as ids_file:
        patient_ids = [row.strip() for row in ids_file.readlines()]
    if NUM_PATIENTS is not None:
        patient_ids = patient_ids[:NUM_PATIENTS]

    full_dataset = ECGDataset(DATA_PATH, patient_ids, FS, NUM_SEGMENTS, NUM_SECONDS, LEADS)

    # stratified split to maintain equal distribution of each patient's data
    train_idx, test_idx = train_test_split(
        range(len(full_dataset)), test_size=0.5, stratify=full_dataset.labels
    )
    train_loader = DataLoader(Subset(full_dataset, train_idx), batch_size=NUM_BATCH, shuffle=True)
    test_loader = DataLoader(Subset(full_dataset, test_idx), batch_size=NUM_BATCH, shuffle=False)

    # Forward one batch through the dummy network so the real model can be
    # built with the flattened feature size its classifier head requires.
    sample_batch, _ = next(iter(train_loader))
    output_shape = len(patient_ids)  # one class per patient
    hidden_units = 32
    probe = neural.DeepECG_DUMMY(len(LEADS), hidden_units, output_shape).to(device)
    sample_batch = sample_batch.to(device)
    final_features = probe(sample_batch)

    model = neural.DeepECG(len(LEADS), hidden_units, output_shape, final_features).to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(params=model.parameters(), lr=0.01)

    total_train_time = train_model(
        model=model,
        data_loader=train_loader,
        loss_fn=loss_fn,
        optimizer=optimizer,
        device=device,
        num_epochs=NUM_EPOCHS,
        output_shape=output_shape,
    )
    accuracy, total_eval_time = evaluate_model(
        model=model,
        test_loader=test_loader,
        loss_fn=loss_fn,
        device=device,
        output_shape=output_shape,
    )
    return accuracy, total_train_time, total_eval_time
def run_deepecg(NUM_BATCH, LEAD, NUM_EPOCHS, DATA_PATH, FS):
    '''
    Load ECG recordings, build the dataset with the DeepECG methodology,
    perform a stratified 50/50 train/test split, then train a DeepECG model
    and evaluate it.

    Operates on a single specified lead of the ECG recordings.

    Returns:
    - accuracy (float): the accuracy of the model on the test set
    - total_train_time (int): the total training time in seconds.
    - total_eval_time (int): the total evaluation time in seconds.
    '''
    # Pin both RNGs so splits / shuffling are reproducible across runs.
    torch.manual_seed(44)
    random.seed(44)

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f'Used device: {device}')

    print('Loading signals...')
    ids_path = os.path.join(DATA_PATH, 'patient_ids.txt')
    with open(ids_path, 'r') as ids_file:
        patient_ids = [row.strip() for row in ids_file.readlines()]

    full_dataset = DeepDatasetV2(DATA_PATH, patient_ids, FS, LEAD)

    # stratified split to maintain equal distribution of each patient's data
    train_idx, test_idx = train_test_split(
        range(len(full_dataset)), test_size=0.5, stratify=full_dataset.labels
    )
    train_loader = DataLoader(Subset(full_dataset, train_idx), batch_size=NUM_BATCH, shuffle=True)
    test_loader = DataLoader(Subset(full_dataset, test_idx), batch_size=NUM_BATCH, shuffle=False)

    # Forward one batch through the dummy network so the real model can be
    # built with the flattened feature size its classifier head requires.
    sample_batch, _ = next(iter(train_loader))
    num_leads = 1  # DeepECG methodology: single-lead input
    output_shape = len(patient_ids)  # one class per patient
    hidden_units = 32
    probe = neural.DeepECG_DUMMY(num_leads, hidden_units, output_shape).to(device)
    sample_batch = sample_batch.to(device)
    final_features = probe(sample_batch)

    model = neural.DeepECG(num_leads, hidden_units, output_shape, final_features).to(device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(params=model.parameters(), lr=0.01)

    total_train_time = train_model(
        model=model,
        data_loader=train_loader,
        loss_fn=loss_fn,
        optimizer=optimizer,
        device=device,
        num_epochs=NUM_EPOCHS,
        output_shape=output_shape,
    )
    accuracy, total_eval_time = evaluate_model(
        model=model,
        test_loader=test_loader,
        loss_fn=loss_fn,
        device=device,
        output_shape=output_shape,
    )
    return accuracy, total_train_time, total_eval_time