[42b7b1]: / src / LFBNet / preprocessing / preprocessing.py

Download this file

624 lines (492 with data), 21.9 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
""" Script to preprocess a given FDG PET image in .nii.
"""
# Import libraries
import glob
import os
import numpy as np
from numpy import ndarray
from numpy.random import seed
from tqdm import tqdm
from typing import List, Tuple
import matplotlib.pyplot as plt
import warnings
from skimage.transform import resize
import scipy.ndimage
import nibabel as nib
# seed random number generator
seed(1)
def directory_exist(dir_check: str = None):
""" Check if the given path exists.
Args:
dir_check: directory path to check
Raises:
Raises exception if the given directory path doesn't exist.
"""
if os.path.exists(dir_check):
# print("The directory %s does exist \n" % dir_check)
pass
else:
raise Exception("Please provide the correct path to the processed data: %s not found \n" % (dir_check))
def generate_mip_show(pet: int = None, gt: int = None, identifier: str = None):
""" Display the MIP images of a given 3D FDG PET image , and corresponding ground truth gt.
Args:
pet: 3D array of PET images
gt: 3D array of reference or ground truth lymphoma segmentation.
identifier: Name (identifier) of the patient.
"""
# consider axis 0 for sagittal and axis 1 for coronal views
pet, gt = np.array(pet), np.array(gt)
pet = [np.amax(pet, axis=0), np.amax(pet, axis=1)]
gt = [np.amax(gt, axis=0), np.amax(gt, axis=1)]
try:
pet = np.squeeze(pet)
gt = np.squeeze(gt)
except:
pass
img_gt = np.concatenate((pet, gt, pet), axis=0)
display_image(img_gt, identifier=identifier)
def display_image(im_display: ndarray, identifier: str = None):
""" display given array of images.
Args:
im_display: array of images to show
identifier: patient name to display as title
"""
plt.figure(figsize=(10, 1))
plt.subplots_adjust(hspace=0.015)
plt.suptitle("Showing image: " + str(identifier), fontsize=12, y=0.95)
# loop through the length of tickers and keep track of index
for n, im in enumerate(im_display):
# add a new subplot iteratively
plt.subplot(int(len(im_display) // 2), 2, n + 1)
plt.imshow(np.log(im + 1))
plt.show()
def get_pet_gt(current_dir: str = None):
""" Read pet and corresponding reference images.
Args:
current_dir: directory path to the PET and corresponding ground truth image.
Returns:
Returns array of pet and ground truth images.
"""
def get_nii_files(nii_path):
""" Read .nii data in the given nii_path.
if there are more .nii or .nii.gz file inside the nii_path directory, the function will return the
first read image.
Args:
nii_path: directory path to the .nii or .nii.gz file.
Returns:
Returns loaded nibble format if it exists, else it returns empty array.
"""
# more than one .nii or .nii.gz is found in the folder the first will be returned
full_path = [path_i for path_i in glob.glob(str(nii_path) + "/*.nii.gz")]
if not len(full_path): # if no file exists that ends wtih .nii.gz
full_path = [path_i for path_i in glob.glob(str(nii_path) + "/*.nii")]
if not len(full_path):
get_image = None # raise Exception("No .nii or .nii.gz found in %s dirctory" % nii_path)
else:
print("%d files found in %s \t the first will be read " % (len(full_path), nii_path))
print("reading ... %s" % full_path[0])
get_image = nib.load(full_path[0])
return get_image
# all folders files
# all_ = [path_i for path_i in glob.glob(str(nii_path) + "/*.nii")]
# Get pet image
# if "pet" in str()
pet_path = str(current_dir) + "/pet/"
pet = get_nii_files(pet_path)
# Get gt image
try:
# if the ground truth exists
gt_path = str(current_dir) + "/gt/"
gt = get_nii_files(gt_path)
except:
# if the ground truth does not exist
gt = None
return [pet, gt]
def resize_nii_to_desired_spacing(
data: int = None, data_spacing: Tuple[float] = None, desired_spacing: ndarray = None,
interpolation_order_value: int = None
):
""" resizes a given input data into the desired spacing using the specified interpolation order.
Args:
data: array of input data to resize.
data_spacing: original input data spacing.
desired_spacing: required output data spacing.
interpolation_order_value: interpolation order. E.g., 0 for binary images, and 3 for cubic interpolation.
Returns:
resized data.
"""
if desired_spacing is None:
desired_spacing_x, desired_spacing_y, desired_spacing_z = [4.0, 4.0, 4.0]
else:
desired_spacing_x, desired_spacing_y, desired_spacing_z = desired_spacing
print("Given data spacing \t Desired spacing \n")
print(data_spacing, "\t", end="")
print(desired_spacing, "\n")
if not isinstance(data, list):
data = np.array(data)
# New resolution, consider at least the input image is two-dimensional
new_x_resolution = np.ceil(data.shape[0] * (data_spacing[0] / desired_spacing_x))
new_y_resolution = np.ceil(data.shape[1] * (data_spacing[1] / desired_spacing_y))
print('resizing')
print(np.array(data).shape)
if len(data_spacing) == 3 and len(np.squeeze(data).shape) == 3: # 3D input image
new_z_resolution = np.ceil(data.shape[2] * (data_spacing[2] / desired_spacing_z))
# resize to new image resolution
image_resized = resize(
data, (new_x_resolution, new_y_resolution, new_z_resolution), order=interpolation_order_value,
preserve_range=True, anti_aliasing=False
)
else: # if the given input image is 2D
image_resized = resize(
data, (new_x_resolution, new_y_resolution), order=interpolation_order_value, preserve_range=True,
anti_aliasing=False
)
return image_resized
def z_score(image):
""" z-score operation on the input data.
Args:
image: input data to apply the z-score.
Returns:
Standardized value of the input using z-score. Z-score = (input - mean(input))/(std(input))
"""
image = image.copy()
if not isinstance(image, ndarray):
image = np.array(image)
image_nan = np.where(image > 0, image, np.nan)
means = np.nanmean(image_nan, axis=(image.shape))
stds = np.nanstd(image_nan, axis=(image.shape))
image_norm = (image - means) / (stds + 1e-8)
return image_norm
# cropping from the center based:
def crop_nii_to_desired_resolution(data: ndarray = None, cropped_resolution: List[int] = None):
""" Crops the input data in the given cropped resolution.
Args:
data: input data to be cropped.
cropped_resolution: desired output resolution.
Returns:
Returns cropped data of size cropped resolution.
"""
#
if not isinstance(data, list): # if data is not array change it to array
data = np.array(data)
try:
data = np.squeeze(data)
except:
pass
if cropped_resolution is not None:
cropped_resolution = [128, 128, 256]
print("\n Initial data size \t Cropped data size ")
print(data.shape, "\t", end=" ")
print(cropped_resolution, "\n")
if len(cropped_resolution) == 3 and len(data.shape) == 3: # 3D data
x, y, z = data.shape
else:
raise Exception("Input image not !")
# start cropping : get middle x, y, z values by dividing by 2 and subtract the desired center of image resolution
start_cropping_at_x = (x // 2 - (cropped_resolution[0] // 2))
start_cropping_at_y = (y // 2 - (cropped_resolution[1] // 2))
start_cropping_at_z = (z // 2 - (cropped_resolution[2] // 2))
# check for off sets: mean the new cropping resolution is bigger than the input image's resolution
off_set_x, off_set_y, off_set_z = 0, 0, 0
if start_cropping_at_x < 0:
off_set_x = np.abs(start_cropping_at_x)
# set the first pixel at zero
start_cropping_at_x = 0
if start_cropping_at_y < 0:
off_set_y = np.abs(start_cropping_at_y)
# set the first pixel at zero
start_cropping_at_y = 0
if start_cropping_at_z < 0:
off_set_z = np.abs(start_cropping_at_z)
# set the first pixel at zero
start_cropping_at_z = 0
else:
# take [0:cropped_resolution[0]]
# Patients are given [x, y, z] when we say z it starts at zero (leg) to z (head).
start_cropping_at_z = 2 * (z // 2 - (cropped_resolution[2] // 2))
npad = ((off_set_x, off_set_x), (off_set_y, off_set_y), (2 * off_set_z, 0))
# set zero value to the off set pixels mode='constant',
data = np.pad(data, pad_width=npad, constant_values=0)
# cropping to the given or set cropping resolution
data = data[start_cropping_at_x:start_cropping_at_x + cropped_resolution[0],
start_cropping_at_y:start_cropping_at_y + cropped_resolution[1],
start_cropping_at_z:start_cropping_at_z + cropped_resolution[2]]
return data
def save_nii_images(
image: List[ndarray] = None, affine: ndarray = None, path_save: str = None, identifier: str = None,
name: List[str] = None
):
""" Save given images into the given directory. If no saving directory is given it will save into
./data/predicted/' directory.
Args:
image: data to save.
affine: affine value.
path_save: saving directory path.
identifier: unique name of the case.
name: name of the case to read.
Returns:
Saved image.
"""
try:
directory_exist(path_save)
except:
try:
os.mkdir(path_save)
except:
if not os.path.exists("../data"):
os.mkdir("../data")
if not os.path.exists('../data/predicted/'):
os.mkdir('../data/predicted/')
path_save = '../data/predicted/'
if identifier is not None: # associate file name e.g. patient name
dir = os.path.join(path_save, str(identifier))
else:
dir = path_save
if not os.path.exists(dir):
os.mkdir(dir) # os.mkdir("./" + str(identifier))
# print('saving it to %s\n' % dir)
# .nii name, e.g. pet, gt, but if not given the base name of given directory
if name is None:
name = ['image_' + str(os.path.basename(dir)).split('.')[0]]
if affine is None:
affine = np.diag([4, 4, 4, 1])
# image = np.flip(image, axis=-1)
for select_image in range(len(image)):
im_ = nib.Nifti1Image(image[select_image], affine)
save_to = str(dir) + "/" + str(name[select_image])
im_.to_filename(save_to)
def generate_mip_from_3D(given_3d_nii: ndarray = None, mip_axis: int = 0):
""" Projects a 3D data into MIP along the selected MIP axis.
Args:
given_3d_nii: 3D array to project into MIP.
mip_axis: Projecting axis.
Returns:
Returns MIP image.
"""
if given_3d_nii is None:
raise Exception("3D image not given")
if not isinstance(given_3d_nii, list):
given_3d_nii = np.array(given_3d_nii)
mip = np.amax(given_3d_nii, axis=mip_axis)
mip = np.asarray(mip)
return mip
def transform_coordinate_space(modality_1, modality_2, mode='nearest'):
""" Apply affine transformation on given data using affine from the second data.
Adapted from: https://gist.github.com/zivy/79d7ee0490faee1156c1277a78e4a4c4
Transfers coordinate space from modality_2 to modality_1
Input images are in nifty/nibabel format (.nii or .nii.gz)
Args:
modality_1: reference modality.
modality_2: image modality to apply affine transformation.
Returns:
Returns affine transformed form of modality_2 using modality_1 as reference.
"""
aff_t1 = modality_1.affine
try:
aff_t2 = modality_2.affine
except:
aff_t2 = aff_t1
inv_af_2 = np.linalg.inv(aff_t2)
out_shape = modality_1.get_fdata().shape
# desired transformation
T = inv_af_2.dot(aff_t1)
# apply transformation
transformed_img = scipy.ndimage.affine_transform(modality_2.get_fdata(), T, output_shape=out_shape, mode=mode)
return transformed_img
# read PET and GT images
def read_pet_gt_resize_crop_save_as_3d_andor_mip(
data_path: str = None, data_name: str = None, saving_dir: str = None, save_3D: bool = False, crop: bool = True,
output_resolution: List[int] = None, desired_spacing: List[float] = None, generate_mip: bool = False
):
""" Read pet and ground truth images from teh input data path. It also apply resize, and cropping operations.
Args:
data_path: directory to the raw 3D pet and ground truth .nii fies.
data_name: unique the whole data identifier.
saving_dir: directory path to save the processed images.
save_3D: Save the processed 3D data or not.
crop: apply cropping operations.
output_resolution: desired output resolution.
desired_spacing: required to be output spacing.
generate_mip: project the processed 3D data into MIPs.
Returns:
Returns processed data.
"""
if output_resolution is not None:
# output resized and cropped image resolution
rows, columns, depths = output_resolution
else: # default values
# output resized and cropped image resolution=
output_resolution = [128, 128, 256]
if data_name is None:
data_name = "unknown_data"
'''
get directory name or patient id of the PET and gt images
Assuming the file structure is :
---patient_id
-- PET
-- *.nii or .nii.gz
-- gt
-- *.nii or .nii.gz
'''
# check if the directory exist
directory_exist(data_path)
# by default the processed 3d and 2D MIP will be saved into the 'data' subdirectory, respectively with name tags
# as '_default_3d_dir' and '_default_MIP_dir'
def create_directory(directory_to_create: list):
"""
:param directory_to_create:
"""
for directory in directory_to_create:
if not os.path.exists(directory):
os.mkdir(directory) # os.mkdir("./" + str(identifier))
# if saving_dir is None:
# if not os.path.exists("../data"):
# os.mkdir("../data")
#
# saving_dir_3d = "../data/" + str(data_name) + "_default_3d_dir"
#
# # create directory in the data with name resized_cropped_MIP_default
# saving_dir_mip = "../data/" + str(data_name) + "_default_MIP_dir"
#
# create_directory([saving_dir_3d, saving_dir_mip])
#
# # If the saving directory is given, a sub folder for the processed 3d and 2D MIP will be created
# else:
data_3d = str(data_name) + "_default_3d_dir_"
data_mip = str(data_name) + "_default_MIP_dir"
saving_dir_3d = os.path.join(saving_dir, data_3d)
saving_dir_mip = os.path.join(saving_dir, data_mip)
create_directory([saving_dir_3d, saving_dir_mip])
# all patient ids
case_ids = os.listdir(data_path)
if not len(case_ids): # reise exception if the directory is empty
raise Exception("Directory %s is empty" % data_path)
else: # read the pet and gt images
print("Continuing to read %d cases" % len(case_ids))
# resize, to given resolution
if desired_spacing is None:
desired_spacing = [4.0, 4.0, 4.0]
# print where will be saved the 3d and mip
print("\n")
print(40 * "**==**")
print("3D resized and cropped images will be saved to %s, if set save" % saving_dir_3d)
print("Generated MIPs will be saved to %s, if set to save" % saving_dir_mip)
print(40 * "**==**")
print("\n")
for image_name in tqdm(list(case_ids)):
# if image_name in images:
current_id = os.path.join(data_path, image_name)
# read, resize, crop and save as 3D
pet, gt = get_pet_gt(current_id)
# resolution
res_pet = pet.header.get_zooms()
res_pet = tuple([float(round(res, 2)) for res in list(res_pet[:3])])
affine = pet.affine
print(f'Size of the PET input image: {np.asanyarray(pet.dataobj).shape}')
# if there is a ground truth:
if gt is not None:
res_gt = gt.header.get_zooms()
res_gt = tuple([float(round(res, 2)) for res in list(res_gt[:3])])
# check if pet and gt are on the same spacing, otherwise gt could be generated from CT images
if not res_pet[:3] == res_gt[:3]: # assert (res_pet == res_gt)
print("pet, and gt resolutions, respectively \n")
print(res_pet, "\t", res_gt)
warnings.warn("Pet and gt have different spacing, Alignment continue...")
# apply affine transformation to move the gt to the PET space,
# probably the gt was generated from the CT images
gt = transform_coordinate_space(pet, gt, mode='constant')
# raise Exception('Ground truth and pet images are not on the same spacing')
gt[gt >= 1] = 1
gt[gt < 1] = 0
else:
gt = np.asanyarray(gt.dataobj)
"""
For remarc data the ground truth is inverted along the z-axis
"""
if data_name == "remarc":
gt = np.flip(gt, axis=-1)
gt = resize_nii_to_desired_spacing(
gt, data_spacing=res_pet, desired_spacing=desired_spacing, interpolation_order_value=0
)
pet = np.asanyarray(pet.dataobj)
# if the given image has stacked as channel example one image in remarc : 175x175x274x2
if pet.shape[-1] == 2:
pet = pet[..., 0]
# generate_mip_show(pet, gt, identifier=str(image_name))
pet = resize_nii_to_desired_spacing(
pet, data_spacing=res_pet, desired_spacing=desired_spacing, interpolation_order_value=3
)
'''
if most data are with brain images at the very top, avoid cropping the brain images,instead crop from bottom
or the leg part
'''
if gt is not None:
if str(data_name).lower() == 'lnh':
# flip left to right to mach lnh data to remarc
gt = np.flip(gt, axis=-1)
crop_zero_above_brain = True
if crop_zero_above_brain:
# remove all zero pixels just before the brian image
xs, ys, zs = np.where(pet != 0)
if len(zs): # use zs for cropping the ground truth data also
pet = pet[:, :, min(zs):]
# if gt is None:
# generate_mip_show(pet, pet, identifier=str(image_name))
# else:
# generate_mip_show(pet, gt, identifier=str(image_name))
if crop:
pet = crop_nii_to_desired_resolution(pet.copy(), cropped_resolution=output_resolution.copy())
if gt is not None:
# remove the zero values of pet image above the brain
if len(zs): # use zs for cropping the ground truth data also
gt = gt[:, :, min(zs):]
gt = crop_nii_to_desired_resolution(gt.copy(), cropped_resolution=output_resolution.copy())
# if gt is None:
# generate_mip_show(pet, pet, identifier=str(image_name))
# else:
# generate_mip_show(pet, gt, identifier=str(image_name))
if save_3D:
# output image affine
affine = np.diag([desired_spacing[0], desired_spacing[1], desired_spacing[2], 1])
if gt is not None:
save_nii_images(
[pet, gt], affine=affine, path_save=saving_dir_3d, identifier=str(image_name),
name=['pet', 'ground_truth']
)
else:
save_nii_images([pet], affine=affine, path_save=saving_dir_3d, identifier=str(image_name), name=['pet'])
# generate Sagittal and coronal MIPs
if generate_mip:
for sagittal_coronal in range(2):
pet_mip = generate_mip_from_3D(pet, mip_axis=int(sagittal_coronal)) # pet mip
# assuming sagittal is on axis 0, and coronal is on axis 1
if sagittal_coronal == 0: # sagittal
naming_ = "sagittal"
elif sagittal_coronal == 1:
naming_ = "coronal"
if gt is not None:
gt_mip = generate_mip_from_3D(gt, mip_axis=int(sagittal_coronal)) # gt mip
# save the generated MIP
save_nii_images(
[pet_mip, gt_mip], affine, path_save=saving_dir_mip, identifier=str(image_name),
name=['pet_' + str(naming_), 'ground_truth_' + str(naming_)]
)
else:
# save the generated MIP
save_nii_images(
[pet_mip], affine, path_save=saving_dir_mip, identifier=str(image_name),
name=['pet_' + str(naming_)]
)
return saving_dir_mip
# Read .nii files using itk
if __name__ == '__main__':
# how to run examples.
# input_path = r"F:\Data\Remarc\REMARC/"
# data_ = "remarc"
#
input_path = r"F:\Data\Vienna\No_ground_truth/"
data_ = "LNH"
saving_dir_mip = read_pet_gt_resize_crop_save_as_3d_andor_mip(
data_path=input_path, data_name=data_, saving_dir=None, save_3D=True, crop=True,
output_resolution=[128, 128, 256], desired_spacing=None, generate_mip=True
)