[637b40]: / adpkd_segmentation / utils / nifti_utils.py

Download this file

151 lines (120 with data), 4.4 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""Utilities for converting nifti annotations to png"""
from functools import lru_cache
import os
from cv2 import imwrite
import nibabel as nib
import numpy as np
from pathlib import Path
import SimpleITK as sitk
import shutil
from adpkd_segmentation.datasets.masks import (
BACKGROUND_INT,
L_KIDNEY_INT,
R_KIDNEY_INT,
)
NIFTI_FILE = "Untitled.nii.gz"
DICOM_FOLDER = "DICOM_anon"
GROUND = "Ground"
NIFTI_FOLDER = "Nifti"
png_int_to_itk = {BACKGROUND_INT: 0, R_KIDNEY_INT: 1, L_KIDNEY_INT: 2}
@lru_cache()
def create_png_int_mat(shape):
return np.stack(
[
np.broadcast_to(BACKGROUND_INT, shape=shape).astype("uint8"),
np.broadcast_to(L_KIDNEY_INT, shape=shape).astype("uint8"),
np.broadcast_to(R_KIDNEY_INT, shape=shape).astype("uint8"),
]
)
@lru_cache()
def create_nifti_int_mat(shape):
return np.stack(
[
np.broadcast_to(png_int_to_itk[BACKGROUND_INT], shape=shape),
np.broadcast_to(png_int_to_itk[L_KIDNEY_INT], shape=shape),
np.broadcast_to(png_int_to_itk[R_KIDNEY_INT], shape=shape),
]
)
def nifti_to_png_array(nifti_array):
shape = nifti_array.shape
x, y = shape
nifti_array = nifti_array[::-1]
nifti_array = np.rot90(nifti_array, k=3)
if x != y:
shape = (y,x)
mask = (nifti_array == create_nifti_int_mat(shape)).astype("uint8")
png_array = (mask * create_png_int_mat(shape)).sum(axis=0)
return png_array
def load_nifti(nifti_path):
nimg = nib.load(nifti_path)
data = nimg.get_data()
return data
def traverse_folder(folder):
out = []
children = [c for c in folder.iterdir() if c.is_dir()]
if folder.is_dir():
out.append(folder)
for c in children:
out.extend(traverse_folder(c))
return out
def process_dcm_folder(dcm_folder, target_dir):
dcm_folder_str = str(dcm_folder)
parent_name = dcm_folder.parent.name
target_study = target_dir / parent_name
nifti_folder = target_study / NIFTI_FOLDER
ground_folder = target_study / GROUND
target_dcm_folder = target_study / DICOM_FOLDER
nifti_target_file = target_study / NIFTI_FOLDER / NIFTI_FILE
if target_study.exists():
print(f"Already exists, skipping {target_study}")
return
os.makedirs(target_study)
os.makedirs(nifti_folder)
os.makedirs(ground_folder)
os.makedirs(target_dcm_folder)
shutil.copy(dcm_folder / NIFTI_FILE, nifti_target_file)
nifti_array = load_nifti(str(nifti_target_file))
num_annotated = nifti_array.shape[-1]
reader = sitk.ImageSeriesReader()
series_IDs = reader.GetGDCMSeriesIDs(dcm_folder_str)
# more series, but only one of them is annotated
if len(series_IDs) > 1:
correct = None
for idx, series in enumerate(series_IDs):
dcm_files = reader.GetGDCMSeriesFileNames(dcm_folder_str, series)
if len(dcm_files) == num_annotated:
if correct is None:
correct = idx
else:
shutil.rmtree(target_study)
print(f"Can't tell which study annotated in {dcm_folder}")
raise Exception
if correct is None:
shutil.rmtree(target_study)
print(f"Dicoms and nifti annotations not a match in {dcm_folder}")
raise Exception
else:
correct = 0
dcm_files = reader.GetGDCMSeriesFileNames(
dcm_folder_str, series_IDs[correct]
)
for idx, dcm in enumerate(dcm_files):
shutil.copy(dcm, target_dcm_folder)
png_array = nifti_to_png_array(nifti_array[:, :, idx])
png_name = Path(dcm.replace(".dcm", ".png")).name
png_path = str(ground_folder / png_name)
imwrite(png_path, png_array)
def process_nifti_dirs(source_dir, target_dir):
if isinstance(source_dir, str):
source_dir = Path(source_dir)
if isinstance(target_dir, str):
target_dir = Path(target_dir)
subfolders = traverse_folder(source_dir)
for folder in subfolders:
if folder.name == DICOM_FOLDER:
print(f"Processing study {folder.parent.name}")
try:
process_dcm_folder(folder, target_dir)
except Exception:
print(f"Couldn't process {folder.parent}")
continue