Switch to unified view

a b/preprocessOfApneaECG/fileIO.py
1
"""
2
    This file has some functions to read or write files.
3
"""
4
5
import random
6
from preprocessOfApneaECG.mit2Segments import Mit2Segment, SEGMENTS_BASE_PATH, SEGMENTS_NUMBER_TRAIN
7
8
9
def get_database(database_name, rdf, numRead=-1, is_debug=False):
10
    """
11
    Return the database you want to obtain.
12
    :param list database_name: name of database
13
    :param int rdf: 0 means read raw ecg, 1 means denoised ecg.
14
    :param int numRead: the number you want read from database, -1 means read all data.
15
    :param bool is_debug: whether is debug mode.
16
    :return Mit2Segment list: database
17
    """
18
    
19
    if database_name == ["apnea-ecg", "train"]:
20
        base_floder_path = SEGMENTS_BASE_PATH + "train/"
21
    elif database_name == ["apnea-ecg", "test"]:
22
        base_floder_path = SEGMENTS_BASE_PATH + "test/"
23
    else:
24
        raise Exception("Error database name.")
25
    
26
    # get the segment amount
27
    read_file_path = base_floder_path + "/extra_info.txt"
28
    with open(read_file_path) as f:
29
        _ = f.readline()
30
        attrs_value = f.readline().replace("\n", "").split(" ")
31
        segment_amount = int(attrs_value[0])
32
    # read ecg segment
33
    read_num = 0
34
    database = []
35
    for segment_number in range(segment_amount):
36
        if is_debug is True:
37
            print("now read file: " + str(segment_number))
38
        if numRead != -1 and read_num >= numRead:
39
            break
40
        eds = Mit2Segment()
41
        eds.global_id = segment_number
42
        eds.read_ecg_segment(rdf, database_name)
43
        database.append(eds)
44
        read_num += 1
45
    if is_debug is True:
46
        print("length of database: %s" % len(database))
47
    return database
48
49
50
def my_random_func(length, min_value, max_value):
51
    """
52
    Given a range and the list length, return a no duplicates list.
53
    :param int length: list length.
54
    :param int min_value: min of range.
55
    :param int max_value: max of range.
56
    :return list: random list.
57
    """
58
    random_list = []
59
    count = 0
60
    while count < length:
61
        random_value = random.randint(min_value, max_value)
62
        if random_value not in random_list:
63
            random_list.append(random_value)
64
            count = count + 1
65
    
66
    return random_list
67
68
69
def write_txt_file(list_info, write_file_path):
70
    """
71
    Write list object to TXT file.
72
    :param list list_info: List object you want to write.
73
    :param string write_file_path: TXT file path.
74
    :return: None
75
    """
76
    with open(write_file_path, "w") as f:
77
        for info in list_info:
78
            f.write(str(info) + "\n")
79
80
81
def read_txt_file(read_file_path):
82
    """
83
    Read TXT file to list object.
84
    :param string read_file_path: TXT file path.
85
    :return list: list object
86
    """
87
    with open(read_file_path) as f:
88
        list_t = []
89
        lines = f.readlines()
90
        for line in lines:
91
            float_list = []
92
            str_t = line.replace("[", "").replace("]", "").replace("\n", "").split(",")
93
            for str_s in str_t:
94
                if str_s != "":
95
                    float_list.append(float(str_s))
96
                else:
97
                    print(read_file_path)
98
            list_t.append(float_list)
99
    
100
    return list_t
101
102
103
104
105
def get_noise_dataset():
106
    """
107
    读取ECG噪音片段.
108
    :return:
109
    """
110
    
111
    id_path = "G:/Apnea-ecg/ecg_segments/data/denosing ecg data/" + "train/train_noise_id_matlab.txt"
112
    
113
    id_set = []
114
    with open(id_path) as f:
115
        lines = f.readlines()
116
        for line in lines:
117
            line = line.replace("\n", "")
118
            id_set.append(int(float(line)))
119
    
120
    # read ecg segment
121
    database = []
122
    for segment_number in id_set:
123
        print("now read file: " + str(segment_number))
124
        
125
        eds = ECGDataSegment()
126
        eds.global_id = segment_number
127
        # read_file_path = base_floder_path + str(segment_number) + "/" + "ecg segment data.txt"
128
        eds.read_denosing_ecg_segment("train")
129
        database.append(eds)
130
    
131
    print("length of database: %s" % len(database))
132
    
133
    return database
134
135
136
def get_some_ecg_segments(number):
137
    """
138
    Return some ecg samples for debug.
139
    :param int number: the number of segments you want to sample from all ecg segments.
140
    :return list: ecg segments.
141
    """
142
    
143
    random_list = my_random_func(number, 0, SEGMENTS_NUMBER_TRAIN)
144
    
145
    base_floder_path = SEGMENTS_BASE_PATH + "raw ecg data/train/"
146
    
147
    sample_segments = []
148
    for value in random_list:
149
        print("read %s" % value)
150
        eds = ECGDataSegment()
151
        eds.global_id = value
152
        read_file_path = base_floder_path + str(value) + "/" + "ecg segment data.txt"
153
        eds.read_raw_ecg_segment(read_file_path)
154
        sample_segments.append(eds)
155
    return sample_segments
156
157
158
def get_A_N_number(database):
159
    """
160
    求出数据库database中A和N的片段数量.
161
    :param database:
162
    :return:
163
    """
164
    
165
    A_number = 0
166
    N_number = 0
167
    for segment in database:
168
        if segment.label == 0:
169
            N_number += 1
170
        else:
171
            A_number += 1
172
    print("A number...")
173
    print(A_number)
174
    print("N number...")
175
    print(N_number)
176
177
178
if __name__ == '__main__':
179
    """
180
    Test sentences:
181
    """
182
    # Read trainset of apnea-ecg
183
    # trainset = get_database(["apnea-ecg", "train"], 0, is_debug=True)
184
    # Read testset of apnea-ecg
185
    testset = get_database(["apnea-ecg", "test"], 0, is_debug=True)
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203