Download this file

204 lines (152 with data), 4.7 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
This file has some functions to read or write files.
"""
import random
from preprocessOfApneaECG.mit2Segments import Mit2Segment, SEGMENTS_BASE_PATH, SEGMENTS_NUMBER_TRAIN
def get_database(database_name, rdf, numRead=-1, is_debug=False):
    """
    Return the requested split of the apnea-ecg segment database.

    The segment count is taken from the first space-separated field on the
    second line of ``extra_info.txt`` inside the split's folder. Segments are
    then loaded one at a time, with global ids counting up from 0.

    :param list database_name: ["apnea-ecg", "train"] or ["apnea-ecg", "test"].
    :param int rdf: 0 means read raw ecg, 1 means denoised ecg
        (forwarded unchanged to ``Mit2Segment.read_ecg_segment``).
    :param int numRead: the number you want read from database, -1 means read all data.
    :param bool is_debug: whether to print progress information.
    :return Mit2Segment list: database
    :raises ValueError: if ``database_name`` is not one of the known splits.
    """
    if database_name == ["apnea-ecg", "train"]:
        base_folder_path = SEGMENTS_BASE_PATH + "train/"
    elif database_name == ["apnea-ecg", "test"]:
        base_folder_path = SEGMENTS_BASE_PATH + "test/"
    else:
        # ValueError is a subclass of Exception, so callers that caught the
        # previous generic Exception keep working.
        raise ValueError("Error database name.")

    # Get the segment amount. The first line of the file is skipped
    # (presumably the attribute names -- confirm against the writer).
    read_file_path = base_folder_path + "/extra_info.txt"
    with open(read_file_path) as f:
        f.readline()
        attrs_value = f.readline().replace("\n", "").split(" ")
    segment_amount = int(attrs_value[0])

    # Read the ecg segments.
    database = []
    for segment_number in range(segment_amount):
        # Check the limit before logging so the debug output only names
        # segments that are actually read.
        if numRead != -1 and len(database) >= numRead:
            break
        if is_debug:
            print("now read file: " + str(segment_number))
        eds = Mit2Segment()
        eds.global_id = segment_number
        eds.read_ecg_segment(rdf, database_name)
        database.append(eds)

    if is_debug:
        print("length of database: %s" % len(database))

    return database
def my_random_func(length, min_value, max_value):
    """
    Given a range and the list length, return a list without duplicates.

    Values are drawn with ``random.randint``, so both ``min_value`` and
    ``max_value`` are possible results. Duplicated draws are discarded and
    retried until ``length`` distinct values have been collected.

    :param int length: list length. Zero or negative yields an empty list.
    :param int min_value: min of range (inclusive).
    :param int max_value: max of range (inclusive).
    :return list: random list, in the order the values were drawn.
    :raises ValueError: if the range contains fewer than ``length`` distinct
        integers (the retry loop could otherwise never terminate).
    """
    if length <= 0:
        return []

    available = max_value - min_value + 1
    if length > available:
        raise ValueError(
            "cannot draw %s distinct values from [%s, %s]"
            % (length, min_value, max_value)
        )

    random_list = []
    seen = set()  # O(1) duplicate check; random_list keeps the draw order
    while len(random_list) < length:
        random_value = random.randint(min_value, max_value)
        if random_value not in seen:
            seen.add(random_value)
            random_list.append(random_value)

    return random_list
def write_txt_file(list_info, write_file_path):
    """
    Dump a list to a text file, one element per line.

    Each element is converted with ``str()`` and followed by a newline. The
    target file is opened in "w" mode, so existing content is replaced.

    :param list list_info: items to write.
    :param string write_file_path: path of the TXT file to create/overwrite.
    :return: None
    """
    with open(write_file_path, "w") as handle:
        handle.writelines(str(item) + "\n" for item in list_info)
def read_txt_file(read_file_path):
    """
    Parse a text file into a list of float lists, one inner list per line.

    Square brackets and the newline are removed from every line, the rest is
    split on commas and each non-empty piece is converted with ``float()``.
    For every empty piece the file path is printed and the piece is skipped.

    :param string read_file_path: TXT file path.
    :return list: list of lists of floats, in file order.
    """
    # Deleting "[", "]" and "\n" in one pass.
    strip_table = str.maketrans("", "", "[]\n")

    rows = []
    with open(read_file_path) as handle:
        for raw_line in handle:
            row = []
            for token in raw_line.translate(strip_table).split(","):
                if token == "":
                    print(read_file_path)
                    continue
                row.append(float(token))
            rows.append(row)

    return rows
def get_noise_dataset(id_path=None):
    """
    Read the ECG noise segments.

    The ids of the noisy segments are read from a text file holding one
    numeric id per line (values such as "12.0" are accepted and truncated to
    int). One segment object is then loaded per id.

    :param string id_path: path of the id list file. When omitted, the
        original hard-coded location on drive G: is used.
    :return list: the loaded segment objects, in the order of the id file.
    """
    if id_path is None:
        id_path = "G:/Apnea-ecg/ecg_segments/data/denosing ecg data/" + "train/train_noise_id_matlab.txt"

    id_set = []
    with open(id_path) as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines (e.g. a trailing newline) carry no id.
                continue
            id_set.append(int(float(line)))

    # read ecg segment
    database = []
    for segment_number in id_set:
        print("now read file: " + str(segment_number))
        # NOTE(review): ECGDataSegment is not among the imports visible at the
        # top of this file -- confirm where it is supposed to come from.
        eds = ECGDataSegment()
        eds.global_id = segment_number
        eds.read_denosing_ecg_segment("train")
        database.append(eds)
    print("length of database: %s" % len(database))

    return database
def get_some_ecg_segments(number):
    """
    Load a random sample of raw training ecg segments, for debugging.

    Segment ids are picked without repetition by ``my_random_func`` from 0 to
    ``SEGMENTS_NUMBER_TRAIN``; each picked id is printed before it is loaded.

    :param int number: how many segments to sample.
    :return list: the loaded ecg segments, in the order they were picked.
    """
    # NOTE(review): the upper bound is inclusive (my_random_func draws with
    # random.randint) -- confirm SEGMENTS_NUMBER_TRAIN itself is a valid id.
    chosen_ids = my_random_func(number, 0, SEGMENTS_NUMBER_TRAIN)
    train_folder = SEGMENTS_BASE_PATH + "raw ecg data/train/"

    picked = []
    for segment_id in chosen_ids:
        print("read %s" % segment_id)
        # NOTE(review): ECGDataSegment is not among the imports visible at the
        # top of this file -- confirm where it is supposed to come from.
        segment = ECGDataSegment()
        segment.global_id = segment_id
        segment.read_raw_ecg_segment(
            train_folder + str(segment_id) + "/" + "ecg segment data.txt"
        )
        picked.append(segment)

    return picked
def get_A_N_number(database):
    """
    Count the A and N segments in ``database``.

    A segment whose ``label`` equals 0 is counted as N; every other segment
    is counted as A (presumably apnea vs. normal -- confirm against the
    labelling code). Both counts are printed and also returned, so callers
    can use them programmatically.

    :param database: iterable of objects exposing a ``label`` attribute.
    :return tuple: ``(A_number, N_number)``.
    """
    A_number = 0
    N_number = 0
    for segment in database:
        if segment.label == 0:
            N_number += 1
        else:
            A_number += 1

    print("A number...")
    print(A_number)
    print("N number...")
    print(N_number)

    return A_number, N_number
if __name__ == "__main__":
    # Manual test statements, run only when this file is executed directly.

    # Read trainset of apnea-ecg
    # trainset = get_database(["apnea-ecg", "train"], 0, is_debug=True)

    # Read testset of apnea-ecg
    testset = get_database(database_name=["apnea-ecg", "test"], rdf=0, is_debug=True)