[42b7b1]: / src / LFBNet / utilities / compute_surrogate_features.py

Download this file

304 lines (239 with data), 12.6 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""This script computes the surrogate features of the MIP images such as sTMTV and sDmax.
"""
import os
import numpy as np
import pandas as pd
import glob
import csv
import time
from numpy import ndarray
from tqdm import tqdm
from typing import List, Tuple
import random
from scipy.ndimage import label
import nibabel as nib
random.seed(7)
class ComputesTMTVsDmaxFromNii:
""" computes the surrogate features from given 2D coronal and sagittal masks (ground truths).
Args:
data_path: path to the sagittal and coronal masks.
get_identifier: a label to identify mask or ground truth (gt) from expert and predicted masks (prd).
Returns:
Returns a saved csv files with the computed surrogate biomarkers.
"""
def __init__(self, data_path: str = None, get_identifier: str = "predicted"):
self.data_path = data_path
self.get_identifier = get_identifier
def compute_and_save_surrogate_features(self):
""" Compute the surrogate features and save as csv file.
Returns:
Saved surrogate features from either the expert or from the ai in csv file.
"""
# get list of data to compute the surrogate features
case_ids = [index for index in os.listdir(self.data_path) if not str(index).endswith('csv')]
# voxel size of the images is considered constant
voxel_size = [4.0, 4.0, 4.0]
def get_features(mask_to_compute_feature_on: ndarray = None):
""" Computes the surrogate MTV and surrogate dissemination.
Args:
mask_to_compute_feature_on: binary mask to compute the surrogate features.
Returns:
Returns the computed surrogate MTV and dissemination along the height (z) and width (xy) of the image.
"""
prof_xy, prof_z, stmtv = ComputesTMTVsDmaxFromNii.num_white_pixels(mask_to_compute_feature_on.copy())
sdmax_xy = ComputesTMTVsDmaxFromNii.compute_surrogate_dissemination(prof_xy, percentile=[2, 98])
sdmax_z = ComputesTMTVsDmaxFromNii.compute_surrogate_dissemination(prof_z, percentile=[2, 98])
return stmtv, sdmax_xy, sdmax_z
# store all calculated features:
case_name_sagittal_coronal_axial_x_y_z_features = [
['PID', 'sTMTV_sagittal', 'sTMTV_coronal', "sTMTV_(mm\u00b2)", 'Sagittal_xy', 'Sagittal_z', 'Coronal_xy',
'Coronal_z', "sDmax_(mm)", "sDmax_(mm)_euclidean", 'X', 'Y', 'Z']]
for n, id in tqdm(enumerate(case_ids), total=(len(case_ids))):
img_folder = os.path.join(self.data_path, str(id))
case_ids_img_name = os.listdir(img_folder)
# if there is any ids that ends with _0 or _1, the coronal and sagittal images are saved separately,
# otherwise not. sagittal with '_o' and coronal with '_1'.
saved_sagittal_coronal_separately = any([True for case_id in case_ids_img_name if "_sagittal" in case_id])
# dictionary to store the values for sagittal and coronal features
sagittal = dict(smtv=0, sdmax_xy=0, sdmax_z=0)
coronal = dict(smtv=0, sdmax_xy=0, sdmax_z=0)
for index, read_image in tqdm(enumerate(case_ids_img_name), total=(len(case_ids_img_name))):
# get number of files ending with the identifier, i.e., predicted (prd) or ground truth (gt).
if str(self.get_identifier) in str(read_image):
read_image_path = os.path.join(img_folder, read_image)
mask, _ = ComputesTMTVsDmaxFromNii.get_image(read_image_path)
if saved_sagittal_coronal_separately:
# We have sagittal and coronal images saved separately.
if "_sagittal" in str(read_image): # sagittal
sagittal['smtv'], sagittal['sdmax_xy'], sagittal['sdmax_z'] = get_features(mask)
elif "_coronal" in str(read_image): # coronal
coronal['smtv'], coronal['sdmax_xy'], coronal['sdmax_z'] = get_features(mask)
else:
# sagittal and coronal given as one nifti image.
for sagittal_coronal in range(2):
mask_ = mask[sagittal_coronal]
if sagittal_coronal == 0: # sagittal
sagittal['smtv'], sagittal['sdmax_xy'], sagittal['sdmax_z'] = get_features(mask_)
else: # coroanl
coronal['smtv'], coronal['sdmax_xy'], coronal['sdmax_z'] = get_features(mask_)
# combine the sagittal and coronal features, and compute them in physical space.
sTMTV, sDmax_abs, sDmax_sqrt = ComputesTMTVsDmaxFromNii.compute_features_in_physical_space(
sagittal, coronal
)
# add the given patient's features into all dataset.
case_name_sagittal_coronal_axial_x_y_z_features.append(
[str(id), sagittal['smtv'], coronal['smtv'], sTMTV, sagittal['sdmax_xy'], sagittal['sdmax_z'],
coronal['sdmax_xy'], coronal['sdmax_z'], sDmax_abs, sDmax_sqrt, voxel_size[0], voxel_size[1],
voxel_size[2]]
)
# save the computed features into csv file
ComputesTMTVsDmaxFromNii.write_it_to_csv(
data=case_name_sagittal_coronal_axial_x_y_z_features, dir_name=self.data_path,
identifier=self.get_identifier
)
return case_name_sagittal_coronal_axial_x_y_z_features
@staticmethod
def compute_features_in_physical_space(
sagittal: dict = None, coronal: dict = None, voxel_size=None
):
""" Compute features in physical space. Basically multiply the given TMV or dissemination by the voxel space.
Args:
sagittal: sagittal view features.
coronal: coronal view features
voxel_size: voxel size of the mask
Returns:
Returns the biomarker features in physical space, e.g., in mm.
"""
# add the surrogate volume of the coronal and sagittal images.
if voxel_size is None:
voxel_size = [4.0, 4.0, 4.0]
sTMTV = (sagittal['smtv'] + coronal['smtv']) * voxel_size[0] * voxel_size[2]
# add the dissemination of the coronal and sagittal images. Absolute distance.
sDmax_abs = sagittal['sdmax_xy'] * voxel_size[0] + sagittal['sdmax_z'] * voxel_size[2] + coronal['sdmax_xy'] * \
voxel_size[0] + coronal['sdmax_z'] * voxel_size[2]
# add the dissemination of the coronal and sagittal images. Square distance.
sDmax_ecludian = np.sqrt(
np.power(sagittal['sdmax_xy'] * voxel_size[0], 2) + np.power(
sagittal['sdmax_z'] * voxel_size[2], 2
) + np.power(
coronal['sdmax_xy'] * voxel_size[0], 2
) + np.power(coronal['sdmax_z'] * voxel_size[2], 2)
)
return sTMTV, sDmax_abs, sDmax_ecludian
@staticmethod
def threshold(input_image: ndarray = None, cut_off: float = 0.5):
""" threshold the input array value using the input cut-off.
Args:
input_image: array data to threshold
cut_off: thresholding value
Returns:
Returns the binary image threshold from the cut-off value array.
"""
input_image = np.array(input_image)
input_image[input_image < cut_off] = 0
input_image[input_image >= cut_off] = 1
return input_image
@staticmethod
def num_white_pixels(input_image):
""" Computes the surrogate volume of the mask, and the projection profile of the non-zero values into
horizontal (z)
and vertical profiles (xy).
Args:
Input: binary image mask img.
Returns:
Returns the number of pixels across each row of the image in profile_axis_z
and number of pixels across each column of the image in profile axis_xy.
"""
input_image = ComputesTMTVsDmaxFromNii.threshold(input_image)
# remove small nodes < 4.8 cm2
input_image = ComputesTMTVsDmaxFromNii.remove_outliers(input_image, minimum_cc_sum=30)
profile_axis_z, profile_axis_xy = [], []
for index in range(input_image.shape[0]):
profile_axis_xy.append(np.sum(input_image[index, :] > 0))
for index in range(input_image.shape[1]):
profile_axis_z.append(np.sum(input_image[:, index] > 0))
smtv = np.sum(input_image > 0)
return profile_axis_xy, profile_axis_z, smtv
@staticmethod
def remove_outliers(input_image: ndarray = None, minimum_cc_sum: float = None):
""" Remove outliers. Outliers are considered to be small regions (less than the given minimum_cc_sum).
Args:
input_image: input mask.
minimum_cc_sum: threshold value to remove tumor size less than this value.
Returns:
Returns mask with outliers or small independent regions of tumor removed.
"""
binary_mask = input_image.copy()
labelled_mask, num_labels = label(binary_mask)
# Let us now remove all the small regions, i.e., less than the specified mm.
refined_mask = input_image.copy()
for get_label in range(num_labels):
if np.sum(refined_mask[labelled_mask == get_label]) <= minimum_cc_sum:
refined_mask[labelled_mask == get_label] = 0
return refined_mask
@staticmethod
def compute_surrogate_dissemination(input_surrogate: List[ndarray] = None, percentile: List = None):
""" calculate the dissemination.
Args:
input_surrogate: input mask image.
percentile: percentile of the tumor regions to consider for the dissemination computation.
Returns:
Returns calculated surrogate tumor dissemination.
"""
# if the input image has tumor regions, foreground regions.
if np.asarray(input_surrogate).any() > 0:
# get the positions where there is tumor region
ls = [index for index, element in enumerate(input_surrogate) if element != 0]
# distance is the difference between the first and the last index plus one. s
n = int(ls[-1] - ls[0] + 1)
distance = np.absolute(np.ceil(n * percentile[0] / 100) - np.ceil(n * percentile[1] / 100))
else:
distance = 0
return distance
@staticmethod
def get_image(img_mask_pt):
""" get nifti image
Args:
img_mask_pt:path to the mask image to read.
Returns:
Returns threshold mask image.
"""
# get the nifti image
mask = nib.load(img_mask_pt)
# get the voxel spacing
voxel_size = mask.header.get_zooms()
mask = np.asanyarray(mask.dataobj)
mask = ComputesTMTVsDmaxFromNii.threshold(mask)
return mask, voxel_size
@staticmethod
def write_it_to_csv(data: List = None, dir_name: str = None, identifier: str = "predicted"):
""" Write the surrogate feature in xls files
Args:
data: input array to write
dir_name: path to save the xls/csv file
identifier: unique identifier or name of the xls file
"""
data = np.array(data)
if dir_name is None:
dir = "./csv"
else:
dir = os.path.dirname(dir_name)
if not os.path.exists(dir):
os.mkdir(dir)
file_name = os.path.join(dir, 'surrogate_' + str(identifier) + '.csv')
with open(file_name, 'w', newline='') as output:
output_data = csv.writer(output, delimiter=',')
# write the column name in the first row
# columns = [str(i) for i in range(data.shape[1])]
#
# output_data.writerow(columns)
for row_in_data in range(data.shape[0] + 1):
# first row jump it
if row_in_data != 0:
output_data.writerow(data[row_in_data - 1][:])
print("Total data processed %0.3d" % len(data))
print("CSV file saved to: ", dir)
if __name__ == '__main__':
data_pth = r"E:\ai4elife\data\predicted/"
cls = ComputesTMTVsDmaxFromNii(data_path=data_pth, get_identifier="predicted")
cls.compute_and_save_surrogate_features()