--- a +++ b/cmac/loader.py @@ -0,0 +1,213 @@ +import os +import glob +import pydicom +import aux_dicom +import numpy as np +import pandas as pd +import nibabel as nib +import vtk +from vtk.util.numpy_support import vtk_to_numpy +import warnings +from skimage.measure import find_contours + +class Tag: + def __init__(self, subject_id=1, data_dir='../datasets/CMAC', work_dir='results/data', results_group='MEVIS'): + + self.subject_id = subject_id + + self.data_dir = os.path.abspath(data_dir) + self.work_dir = os.path.abspath(work_dir) + self.raw_dir = os.path.join(self.data_dir, 'raw', f'v{subject_id}', '3DTAG') + self.gt_dir = os.path.join(self.data_dir, 'GT', '3DTAG', f'v{subject_id}') + self.results_dir = os.path.join(self.data_dir, 'RESULTS', results_group, '3DTAG', f'v{subject_id}') + + self.results_group = results_group + + self.affine_axial = nib.load(os.path.join(self.raw_dir, 'NIFTI', 'NIFTI00.nii')).affine + + def load_pts_gt(self, lmks_obs=1, lmks_frame_id=0): + """Load GT mesh and landmarks.""" + coords = { + 'MEVIS': 'VTK_COORDINATES', + 'UPF': 'VTK_COORDINATES', + 'IUCL': 'DICOM_COORDINATES', + 'INRIA': 'INRIA_COORDINATES' + }[self.results_group] + + mesh_gt_mm, region_ids, subpart_ids = load_pts_mm( + os.path.join(self.gt_dir, 'MESH', coords, f'v{self.subject_id}.vtk'), + regionID=True, subpartID=True + ) + self.mesh_gt_mm = mesh_gt_mm + self.regionIDs = region_ids + self.subpartIDs = subpart_ids + + self.lmks_gt_mm = load_pts_mm( + os.path.join(self.gt_dir, 'LMKS', coords, f'obs{lmks_obs}_groundTruth{lmks_frame_id:02d}.vtk') + ) + self.mesh_gt_mm[:, :2] *= -1 + self.lmks_gt_mm[:, :2] *= -1 + + def load_pts(self, frame_id=0, obs=1): + """Load predicted mesh and landmarks.""" + self.mesh_pred_mm = load_pts_mm( + os.path.join(self.results_dir, 'MESH', f'finalMesh{frame_id:03d}.vtk') + ) + self.lmks_pred_mm = load_pts_mm( + os.path.join(self.results_dir, 'LMKS', f'obs{obs}_results{frame_id:03d}.vtk') + ) + self.mesh_pred_mm[:, :2] *= -1 + self.lmks_pred_mm[:, :2] *= -1 + + def load_nifti_3d(self, frame_id, set_affine=None): + nifti = nib.load(os.path.join(self.raw_dir, 'NIFTI', f'NIFTI{frame_id:02d}.nii')) + return nib.Nifti1Image(nifti.get_fdata(), set_affine if set_affine is not None else nifti.affine) + + def load_nifti_4d(self, n_frames=20, set_affine=None): + shape = self.load_nifti_3d(frame_id=0).shape + arr_4d = np.zeros(shape + (n_frames,)) + for frame_id in range(n_frames): + arr_3d = self.load_nifti_3d(frame_id=frame_id).get_fdata() + arr_4d[:, :, :, frame_id] = arr_3d + + affine = set_affine if set_affine is not None else self.affine_axial + return nib.Nifti1Image(arr_4d, affine) + + +class Cine: + def __init__(self, subject_id=1, data_dir='cMAC', work_dir='results/data', results_group='UPF'): + + self.subject_id = subject_id + + self.data_dir = os.path.abspath(data_dir) + self.work_dir = os.path.abspath(work_dir) + self.raw_dir = os.path.join(self.data_dir, 'raw', f'v{subject_id}', 'cSAX') + self.gt_dir = os.path.join(self.data_dir, 'GT', 'SSFP', f'v{subject_id}') + self.results_dir = os.path.join(self.data_dir, 'RESULTS', results_group, 'SSFP', f'v{subject_id}') + + self.results_group = results_group + + self.ijk2ras, self.affine, self.affine_axial = aux_dicom.read_affine_info(os.path.join(self.raw_dir, 'time_1', '*')) + + def load_pts_gt(self, lmks_frame_id=0, obs=1): + """Load GT mesh and landmarks.""" + coords = { + 'MEVIS': 'VTK_COORDINATES', + 'UPF': 'VTK_COORDINATES', + 'IUCL': 'DICOM_COORDINATES', + 'INRIA': 'INRIA_COORDINATES' + }[self.results_group] + + mesh_gt_mm, region_ids, subpart_ids = load_pts_mm( + os.path.join(self.gt_dir, 'MESH', 'VTK_COORDINATES', f'v{self.subject_id}.vtk'), + regionID=True, subpartID=True + ) + self.mesh_gt_mm = mesh_gt_mm + self.regionIDs = region_ids + self.subpartIDs = subpart_ids + + self.lmks_gt_mm = load_pts_mm( + os.path.join(self.gt_dir, 'LMKS', coords, f'obs{obs}_groundTruth{lmks_frame_id:02d}.vtk') + ) + + def load_pts(self, frame_id=0, obs=1): + """Load predicted mesh and landmarks.""" + self.mesh_pred_mm = load_pts_mm( + os.path.join(self.results_dir, 'MESH', f'finalMesh{frame_id:03d}.vtk') + ) + self.lmks_pred_mm = load_pts_mm( + os.path.join(self.results_dir, 'LMKS', f'obs{obs}_results{frame_id:03d}.vtk') + ) + + def load_nifti_3d(self, frame_id, set_affine=None): + dicom_files = glob.glob(os.path.join(self.raw_dir, f'time_{frame_id + 1}', '*')) + cine_arrs_2d = [pydicom.dcmread(file).pixel_array.T for file in dicom_files] + cine_arr_3d = np.stack(cine_arrs_2d, -1) + return nib.Nifti1Image(cine_arr_3d, set_affine if set_affine is not None else self.affine) + + def load_nifti_4d(self, n_frames=20, set_affine=None): + shape = self.load_nifti_3d(frame_id=0).shape + arr_4d = np.zeros(shape + (n_frames,)) + for frame_id in range(n_frames): + arr_3d = self.load_nifti_3d(frame_id=frame_id).get_fdata() + arr_4d[:, :, :, frame_id] = arr_3d + + affine = set_affine if set_affine is not None else self.affine + return nib.Nifti1Image(arr_4d, affine) + + +def load_pts_mm(pts_filename, regionID=False, subpartID=False): + """Load points (mm) in 3D space stored in vtk file.""" + reader = vtk.vtkDataSetReader() + reader.SetFileName(pts_filename) + reader.Update() + pts_mm = vtk_to_numpy(reader.GetOutput().GetPoints().GetData()) + output = [pts_mm] + if regionID: + output.append(vtk_to_numpy(reader.GetOutput().GetPointData().GetArray('regionID'))) + if subpartID: + output.append(vtk_to_numpy(reader.GetOutput().GetPointData().GetArray('subpartID'))) + return output if len(output) > 1 else pts_mm + +def pts_pix_to_mm(pts_pix, affine): + """Apply affine transformation to pixel coordinates.""" + pts_mm = np.dot(np.c_[pts_pix, np.ones(len(pts_pix))], affine.T)[:, :3] + return pts_mm + +def pts_mm_to_pix(pts_mm, affine): + """Apply inverse affine transformation to mm coordinates.""" + pts_pix = np.dot(np.c_[pts_mm, np.ones(len(pts_mm))], np.linalg.inv(affine).T)[:, :3] + return pts_pix + +def cine_pix_to_tag_pix(cine_pts_pix, cine_affine, tag_affine): + """Convert cine pixel space points to tag pixel space points.""" + return pts_mm_to_pix(pts_pix_to_mm(cine_pts_pix, cine_affine), tag_affine) + +def tag_pix_to_cine_pix(tag_pts_pix, tag_affine, cine_affine): + """Convert tag pixel space points to cine pixel space points.""" + return pts_mm_to_pix(pts_pix_to_mm(tag_pts_pix, tag_affine), cine_affine) + +def cine_cnts_pix_to_tag_cnts_pix(cine_cts_pix, cine_affine, tag_affine): + """Convert contours in cine pixel space to points in tag pixel space.""" + tag_cts_pix = [] + for slice_contours in cine_cts_pix: + slice_tag_cts = [cine_pix_to_tag_pix(contour, cine_affine, tag_affine) for contour in slice_contours] + tag_cts_pix.append(slice_tag_cts) + return tag_cts_pix + +def estimate_affine(ins, outs): + """Estimate affine transformation matrix.""" + l = len(ins) + B = np.vstack([np.transpose(ins), np.ones(l)]) + D = 1.0 / np.linalg.det(B) + entry = lambda r, d: np.linalg.det(np.delete(np.vstack([r, B]), (d + 1), axis=0)) + M = [[(-1) ** i * D * entry(R, i) for i in range(l)] for R in np.transpose(outs)] + return np.array(M + [[0, 0, 0, 1]]) + +############################### +####### EXTRA FUNCTIONS ####### +############################### + +def export_pts_mm_to_slicer_fiducials(pts_mm, fiducials_filename_fscv): + """Save points (mm) in 3D space as a slicer fiducial file (.fscv).""" + with open(fiducials_filename_fscv, 'w') as file: + file.write('# Markups fiducial file version = 4.6\n') + file.write('# CoordinateSystem = 0\n') + file.write('# columns = id,x,y,z,ow,ox,oy,oz,vis,sel,lock,label,desc,associatedNodeID\n') + for idx, point in enumerate(pts_mm): + file.write(f'vtkMRMLMarkupsFiducialNode_0,{point[0]},{point[1]},{point[2]},0,0,0,1,1,1,0,F-{idx},,vtkMRMLScalarVolumeNode1\n') + +def import_pts_mm_from_slicer_fiducials(fiducials_filename_fscv): + """Load points (mm) in 3D space from a slicer fiducial file (.fscv).""" + return pd.read_csv(fiducials_filename_fscv, skiprows=3, header=None).iloc[:, 1:4].to_numpy() + +def import_cnts_pix_from_label(label_nifti_filename, label_id=1): + """Load and convert 3D label of myocardial contours to contour points in pixel space.""" + label = nib.load(label_nifti_filename).get_fdata() + epicardium, endocardium = [], [] + for z in range(label.shape[-1]): + contours = find_contours(label[:, :, z] == label_id, 0.8) + if len(contours) == 2: + epicardium.append(np.c_[contours[0], z * np.ones((len(contours[0]), 1))]) + endocardium.append(np.c_[contours[1], z * np.ones((len(contours[1]), 1))]) + return epicardium, endocardium