[03464c]: / datasets / a_dataset.py

Download this file

197 lines (181 with data), 10.3 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import os.path
from datasets import load_file
from datasets import get_survival_y_true
from datasets.basic_dataset import BasicDataset
import numpy as np
import pandas as pd
import torch
class ADataset(BasicDataset):
    """
    A dataset class for gene expression dataset.

    File should be prepared as '/path/to/data/A.tsv'.
    In the omics file each column should be a sample and each row should be a molecular feature.

    Depending on ``param.downstream_task`` additional per-sample targets are read from
    ``param.data_root``:

    * 'classification' -- 'labels.tsv'
    * 'regression'     -- 'values.tsv'
    * 'survival'       -- 'survival.tsv' (plus 'labels.tsv' when ``param.stratify_label`` is set)
    * 'multitask'      -- 'labels.tsv', 'values.tsv' and 'survival.tsv'
    * 'alltask'        -- 'labels_1.tsv' ... 'labels_<task_num - 2>.tsv', 'values.tsv' and 'survival.tsv'

    Every target table is read with its first column as the index and its rows are
    selected (and ordered) by ``self.sample_list``.
    """

    def __init__(self, param):
        """
        Initialize this dataset class.

        Parameters:
            param -- the options object; this constructor reads data_root, use_sample_list,
                     use_feature_lists, downstream_task and, depending on the task,
                     survival_loss, stratify_label and task_num. add_channel is read
                     through self.param (expected to be set by BasicDataset.__init__).
        """
        BasicDataset.__init__(self, param)
        self.omics_dims = []

        # Load data for A
        A_df = load_file(param, 'A')

        # Get the sample list
        if param.use_sample_list:
            sample_list_path = os.path.join(param.data_root, 'sample_list.tsv')  # get the path of sample list
            # NOTE(review): '<U32' is a fixed-width dtype, so sample IDs longer than
            # 32 characters are presumably truncated here -- confirm against the data.
            self.sample_list = np.loadtxt(sample_list_path, delimiter='\t', dtype='<U32')
        else:
            self.sample_list = A_df.columns

        # Get the feature list for A
        if param.use_feature_lists:
            feature_list_A_path = os.path.join(param.data_root, 'feature_list_A.tsv')  # get the path of feature list
            feature_list_A = np.loadtxt(feature_list_A_path, delimiter='\t', dtype='<U32')
        else:
            feature_list_A = A_df.index

        # Keep only the requested features (rows) and samples (columns), in list order
        A_df = A_df.loc[feature_list_A, self.sample_list]
        self.A_dim = A_df.shape[0]
        self.sample_num = A_df.shape[1]
        A_array = A_df.values
        if self.param.add_channel:
            # Add one dimension for the channel
            A_array = A_array[np.newaxis, :, :]
        self.A_tensor_all = torch.Tensor(A_array)
        self.omics_dims.append(self.A_dim)

        self.class_num = 0
        # Only built when the survival loss is 'MTLR'; None means "no y_true available",
        # which __getitem__ checks before indexing.
        self.y_true_tensor = None

        task = param.downstream_task
        if task == 'classification':
            self._load_labels(param.data_root)
        elif task == 'regression':
            self._load_values(param.data_root)
        elif task == 'survival':
            self._load_survival(param)
            if param.stratify_label:
                # Labels are loaded here too, but class_num is left at 0 for this task
                self.labels_array = self._load_label_column(param.data_root, 'labels.tsv').values
        elif task == 'multitask':
            self._load_labels(param.data_root)
            self._load_values(param.data_root)
            self._load_survival(param)
        elif task == 'alltask':
            # One label file per classification task; labels_array and class_num become lists
            self.labels_array = []
            self.class_num = []
            for task_idx in range(param.task_num - 2):
                label_col = self._load_label_column(param.data_root, 'labels_' + str(task_idx + 1) + '.tsv')
                self.labels_array.append(label_col.values)
                # Get the class number
                self.class_num.append(len(label_col.unique()))
            self._load_values(param.data_root)
            self._load_survival(param)

    def _load_sample_table(self, data_root, file_name):
        """
        Read the tab-separated table data_root/file_name (header in the first row, index in
        the first column) and return its rows selected by self.sample_list.
        """
        table_path = os.path.join(data_root, file_name)
        return pd.read_csv(table_path, sep='\t', header=0, index_col=0).loc[self.sample_list, :]

    def _load_label_column(self, data_root, file_name):
        """
        Return the last column of the label table data_root/file_name as a pandas Series.
        """
        return self._load_sample_table(data_root, file_name).iloc[:, -1]

    def _load_labels(self, data_root):
        """
        Load 'labels.tsv' and set self.labels_array and self.class_num (number of unique labels).
        """
        label_col = self._load_label_column(data_root, 'labels.tsv')
        self.labels_array = label_col.values
        # Get the class number
        self.class_num = len(label_col.unique())

    def _load_values(self, data_root):
        """
        Load 'values.tsv' and set self.values_array (last column as float) with its max and min.
        """
        values_df = self._load_sample_table(data_root, 'values.tsv')
        self.values_array = values_df.iloc[:, -1].astype(float).values
        self.values_max = self.values_array.max()
        self.values_min = self.values_array.min()

    def _load_survival(self, param):
        """
        Load 'survival.tsv' and set the survival attributes.

        T is taken from the second-to-last column (cast to float) and E from the last column.
        When param.survival_loss is 'MTLR', self.y_true_tensor is built with get_survival_y_true.
        """
        survival_df = self._load_sample_table(param.data_root, 'survival.tsv')
        self.survival_T_array = survival_df.iloc[:, -2].astype(float).values
        self.survival_E_array = survival_df.iloc[:, -1].values
        self.survival_T_max = self.survival_T_array.max()
        self.survival_T_min = self.survival_T_array.min()
        if param.survival_loss == 'MTLR':
            self.y_true_tensor = get_survival_y_true(param, self.survival_T_array, self.survival_E_array)

    def _survival_item(self, index):
        """
        Return the survival entries of one data point as a dictionary.

        'y_true' is included only when self.y_true_tensor was built in __init__ (i.e. for
        the 'MTLR' survival loss), so other survival losses no longer fail on a missing attribute.
        """
        entries = {
            'survival_T': self.survival_T_array[index],
            'survival_E': self.survival_E_array[index],
        }
        if self.y_true_tensor is not None:
            entries['y_true'] = self.y_true_tensor[index, :]
        return entries

    def __getitem__(self, index):
        """
        Return a data point and its metadata information.

        Returns a dictionary that always contains
            input_omics (list)  -- [A_tensor, B_tensor, C_tensor]; B and C are placeholders here
            index (int)         -- the index of this data point
        and, depending on the downstream task,
            label               -- label of the sample (a list of labels for 'alltask')
            value               -- regression target of the sample
            survival_T, survival_E -- survival data of the sample
            y_true              -- row of y_true_tensor (only when it was built for 'MTLR')
        """
        # Get the tensor of A (samples are indexed along the last axis)
        if self.param.add_channel:
            A_tensor = self.A_tensor_all[:, :, index]
        else:
            A_tensor = self.A_tensor_all[:, index]

        # Get the tensor of B (placeholder only, this dataset has no B data)
        if self.param.ch_separate:
            # NOTE(review): 23 placeholders, presumably one per separated chromosome -- confirm
            B_tensor = list(np.zeros(23))
        else:
            B_tensor = 0

        # Get the tensor of C (placeholder only)
        C_tensor = 0

        item = {'input_omics': [A_tensor, B_tensor, C_tensor]}
        task = self.param.downstream_task
        if task == 'classification':
            # Get label
            item['label'] = self.labels_array[index]
        elif task == 'regression':
            # Get target value
            item['value'] = self.values_array[index]
        elif task == 'survival':
            # Get survival T and E
            item.update(self._survival_item(index))
        elif task == 'multitask':
            item['label'] = self.labels_array[index]
            item['value'] = self.values_array[index]
            item.update(self._survival_item(index))
        elif task == 'alltask':
            # One label per classification task
            item['label'] = [self.labels_array[i][index] for i in range(self.param.task_num - 2)]
            item['value'] = self.values_array[index]
            item.update(self._survival_item(index))
        # 'index' is added last to keep the original key order of the returned dictionary
        item['index'] = index
        return item

    def __len__(self):
        """
        Return the number of data points in the dataset.
        """
        return self.sample_num