Switch to side-by-side view

--- a
+++ b/preprocessOfApneaECG/fileIO.py
@@ -0,0 +1,203 @@
+"""
+	This file has some functions to read or write files.
+"""
+
+import random
+from preprocessOfApneaECG.mit2Segments import Mit2Segment, SEGMENTS_BASE_PATH, SEGMENTS_NUMBER_TRAIN
+
+
+def get_database(database_name, rdf, numRead=-1, is_debug=False):
+	"""
+	Return the database you want to obtain.
+	:param list database_name: name of database
+	:param int rdf: 0 means read raw ecg, 1 means denoised ecg.
+	:param int numRead: the number you want read from database, -1 means read all data.
+	:param bool is_debug: whether is debug mode.
+	:return Mit2Segment list: database
+	"""
+	
+	if database_name == ["apnea-ecg", "train"]:
+		base_floder_path = SEGMENTS_BASE_PATH + "train/"
+	elif database_name == ["apnea-ecg", "test"]:
+		base_floder_path = SEGMENTS_BASE_PATH + "test/"
+	else:
+		raise Exception("Error database name.")
+	
+	# get the segment amount
+	read_file_path = base_floder_path + "/extra_info.txt"
+	with open(read_file_path) as f:
+		_ = f.readline()
+		attrs_value = f.readline().replace("\n", "").split(" ")
+		segment_amount = int(attrs_value[0])
+	# read ecg segment
+	read_num = 0
+	database = []
+	for segment_number in range(segment_amount):
+		if is_debug is True:
+			print("now read file: " + str(segment_number))
+		if numRead != -1 and read_num >= numRead:
+			break
+		eds = Mit2Segment()
+		eds.global_id = segment_number
+		eds.read_ecg_segment(rdf, database_name)
+		database.append(eds)
+		read_num += 1
+	if is_debug is True:
+		print("length of database: %s" % len(database))
+	return database
+
+
+def my_random_func(length, min_value, max_value):
+	"""
+	Given a range and the list length, return a no duplicates list.
+	:param int length: list length.
+	:param int min_value: min of range.
+	:param int max_value: max of range.
+	:return list: random list.
+	"""
+	random_list = []
+	count = 0
+	while count < length:
+		random_value = random.randint(min_value, max_value)
+		if random_value not in random_list:
+			random_list.append(random_value)
+			count = count + 1
+	
+	return random_list
+
+
+def write_txt_file(list_info, write_file_path):
+	"""
+	Write list object to TXT file.
+	:param list list_info: List object you want to write.
+	:param string write_file_path: TXT file path.
+	:return: None
+	"""
+	with open(write_file_path, "w") as f:
+		for info in list_info:
+			f.write(str(info) + "\n")
+
+
+def read_txt_file(read_file_path):
+	"""
+	Read TXT file to list object.
+	:param string read_file_path: TXT file path.
+	:return list: list object
+	"""
+	with open(read_file_path) as f:
+		list_t = []
+		lines = f.readlines()
+		for line in lines:
+			float_list = []
+			str_t = line.replace("[", "").replace("]", "").replace("\n", "").split(",")
+			for str_s in str_t:
+				if str_s != "":
+					float_list.append(float(str_s))
+				else:
+					print(read_file_path)
+			list_t.append(float_list)
+	
+	return list_t
+
+
+
+
+def get_noise_dataset():
+	"""
+	读取ECG噪音片段.
+	:return:
+	"""
+	
+	id_path = "G:/Apnea-ecg/ecg_segments/data/denosing ecg data/" + "train/train_noise_id_matlab.txt"
+	
+	id_set = []
+	with open(id_path) as f:
+		lines = f.readlines()
+		for line in lines:
+			line = line.replace("\n", "")
+			id_set.append(int(float(line)))
+	
+	# read ecg segment
+	database = []
+	for segment_number in id_set:
+		print("now read file: " + str(segment_number))
+		
+		eds = ECGDataSegment()
+		eds.global_id = segment_number
+		# read_file_path = base_floder_path + str(segment_number) + "/" + "ecg segment data.txt"
+		eds.read_denosing_ecg_segment("train")
+		database.append(eds)
+	
+	print("length of database: %s" % len(database))
+	
+	return database
+
+
+def get_some_ecg_segments(number):
+	"""
+	Return some ecg samples for debug.
+	:param int number: the number of segments you want to sample from all ecg segments.
+	:return list: ecg segments.
+	"""
+	
+	random_list = my_random_func(number, 0, SEGMENTS_NUMBER_TRAIN)
+	
+	base_floder_path = SEGMENTS_BASE_PATH + "raw ecg data/train/"
+	
+	sample_segments = []
+	for value in random_list:
+		print("read %s" % value)
+		eds = ECGDataSegment()
+		eds.global_id = value
+		read_file_path = base_floder_path + str(value) + "/" + "ecg segment data.txt"
+		eds.read_raw_ecg_segment(read_file_path)
+		sample_segments.append(eds)
+	return sample_segments
+
+
+def get_A_N_number(database):
+	"""
+	求出数据库database中A和N的片段数量.
+	:param database:
+	:return:
+	"""
+	
+	A_number = 0
+	N_number = 0
+	for segment in database:
+		if segment.label == 0:
+			N_number += 1
+		else:
+			A_number += 1
+	print("A number...")
+	print(A_number)
+	print("N number...")
+	print(N_number)
+
+
+if __name__ == '__main__':
+	"""
+	Test sentences:
+	"""
+	# Read trainset of apnea-ecg
+	# trainset = get_database(["apnea-ecg", "train"], 0, is_debug=True)
+	# Read testset of apnea-ecg
+	testset = get_database(["apnea-ecg", "test"], 0, is_debug=True)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+