"""
This file contains functions to edit/preprocess volumes (i.e. not tensors!).
These functions are sorted in five categories:
1- volume editing: this can be applied to any volume (i.e. images or label maps). It contains:
-mask_volume
-rescale_volume
-crop_volume
-crop_volume_around_region
-crop_volume_with_idx
-pad_volume
-flip_volume
-resample_volume
-resample_volume_like
-get_ras_axes
-align_volume_to_ref
-blur_volume
2- label map editing: can be applied to label maps only. It contains:
-correct_label_map
-mask_label_map
-smooth_label_map
-erode_label_map
-get_largest_connected_component
-compute_hard_volumes
-compute_distance_map
3- editing all volumes in a folder: functions are more or less the same as 1, but they now apply to all the volumes
in a given folder. Thus we provide folder paths rather than numpy arrays as inputs. It contains:
-mask_images_in_dir
-rescale_images_in_dir
-crop_images_in_dir
-crop_images_around_region_in_dir
-pad_images_in_dir
-flip_images_in_dir
-align_images_in_dir
-correct_nans_images_in_dir
-blur_images_in_dir
-create_mutlimodal_images
-convert_images_in_dir_to_nifty
-mri_convert_images_in_dir
-samseg_images_in_dir
-niftyreg_images_in_dir
-upsample_anisotropic_images
-simulate_upsampled_anisotropic_images
-check_images_in_dir
4- label maps in dir: same as 3 but for label map-specific functions. It contains:
-correct_labels_in_dir
-mask_labels_in_dir
-smooth_labels_in_dir
-erode_labels_in_dir
-upsample_labels_in_dir
-compute_hard_volumes_in_dir
-build_atlas
5- dataset editing: functions for editing datasets (i.e. images with corresponding label maps). It contains:
-check_images_and_labels
-crop_dataset_to_minimum_size
-subdivide_dataset_to_patches
If you use this code, please cite the first SynthSeg paper:
https://github.com/BBillot/lab2im/blob/master/bibtex.bib
Copyright 2020 Benjamin Billot
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing permissions and limitations under the
License.
"""
# python imports
import os
import csv
import shutil
import numpy as np
import tensorflow as tf
import keras.layers as KL
from keras.models import Model
from scipy.interpolate import RegularGridInterpolator
from scipy.ndimage import convolve
from scipy.ndimage import label as scipy_label
from scipy.ndimage import binary_dilation, binary_erosion, binary_fill_holes, gaussian_filter
from scipy.ndimage import distance_transform_edt
# project imports
from ext.lab2im import utils
from ext.lab2im.layers import GaussianBlur, ConvertLabels
from ext.lab2im.edit_tensors import blurring_sigma_for_downsampling
# ---------------------------------------------------- edit volume -----------------------------------------------------
def mask_volume(volume, mask=None, threshold=0.1, dilate=0, erode=0, fill_holes=False, masking_value=0,
                return_mask=False, return_copy=True):
    """Mask a volume, either with a provided mask or by thresholding its intensities.
    :param volume: a numpy array, possibly with several channels
    :param mask: (optional) a numpy array to mask volume with. It doesn't have to be 0/1: every strictly positive
    value is treated as foreground. Must match the spatial shape of volume; for multi-channel volumes it can be
    uni-channel (applied to all channels) or multi-channel.
    :param threshold: (optional) if mask is None, the mask is obtained by keeping all values >= threshold.
    :param dilate: (optional) number of voxels by which to dilate the provided or computed mask.
    :param erode: (optional) number of voxels by which to erode the provided or computed mask.
    :param fill_holes: (optional) whether to fill the holes in the provided or computed mask.
    :param masking_value: (optional) value assigned to voxels outside the mask.
    :param return_mask: (optional) whether to also return the mask that was applied.
    :param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
    :return: the masked volume, and the applied mask if return_mask is True.
    """

    # work on a copy unless told otherwise
    out = volume.copy() if return_copy else volume
    shape = list(out.shape)
    n_dims, n_channels = utils.get_dims(shape)

    # build the binary mask: threshold the volume if no mask was given
    if mask is None:
        mask = out >= threshold
    else:
        assert list(mask.shape[:n_dims]) == shape[:n_dims], 'mask should have shape {0}, or {1}, had {2}'.format(
            shape[:n_dims], shape[:n_dims] + [n_channels], list(mask.shape))
        mask = mask > 0

    # optional morphology, applied in this order: dilation, then erosion, then hole filling
    if dilate > 0:
        final_mask = binary_dilation(mask, utils.build_binary_structure(dilate, n_dims))
    else:
        final_mask = mask
    if erode > 0:
        final_mask = binary_erosion(final_mask, utils.build_binary_structure(erode, n_dims))
    if fill_holes:
        final_mask = binary_fill_holes(final_mask)

    # set voxels outside the mask to masking_value (broadcast the mask across channels if needed)
    if final_mask.shape == out.shape:
        out[np.logical_not(final_mask)] = masking_value
    else:
        out[np.stack([np.logical_not(final_mask)] * n_channels, axis=-1)] = masking_value

    return (out, final_mask) if return_mask else out
def rescale_volume(volume, new_min=0, new_max=255, min_percentile=2, max_percentile=98, use_positive_only=False):
    """Linearly map the (robust) intensity range of a volume onto [new_min, new_max].
    :param volume: a numpy array
    :param new_min: (optional) minimum value of the rescaled volume.
    :param new_max: (optional) maximum value of the rescaled volume.
    :param min_percentile: (optional) percentile used as robust minimum (float in [0, 100]), where 0 = np.min.
    :param max_percentile: (optional) percentile used as robust maximum (float in [0, 100]), where 100 = np.max.
    :param use_positive_only: (optional) whether to estimate the percentiles from strictly positive values only.
    :return: rescaled volume
    """

    out = volume.copy()

    # intensities used to estimate the robust extrema
    samples = out[out > 0] if use_positive_only else out.flatten()

    # robust extrema (exact min/max when the percentile is 0/100)
    low = np.min(samples) if min_percentile == 0 else np.percentile(samples, min_percentile)
    high = np.max(samples) if max_percentile == 100 else np.percentile(samples, max_percentile)

    # clip outliers, then rescale; a flat intensity range would divide by zero, so return zeros instead
    out = np.clip(out, low, high)
    if low == high:
        return np.zeros_like(out)
    return new_min + (out - low) / (high - low) * (new_max - new_min)
def crop_volume(volume, cropping_margin=None, cropping_shape=None, aff=None, return_crop_idx=False, mode='center'):
    """Crop a volume either by a fixed margin on every side, or to a target shape.
    :param volume: 2d or 3d numpy array (possibly with multiple channels)
    :param cropping_margin: (optional) margin by which to crop the volume on both sides of each dimension.
    Can be an int, sequence or 1d numpy array of size n_dims. Exclusive with cropping_shape.
    :param cropping_shape: (optional) shape to crop the volume to. Can be an int, sequence or 1d numpy array of
    size n_dims. Exclusive with cropping_margin.
    :param aff: (optional) affine matrix of the input volume. If given, an updated affine is also returned.
    :param return_crop_idx: (optional) whether to also return the cropping indices that were used.
    :param mode: (optional) with cropping_shape, 'center' extracts the centre of the volume, while 'random' draws a
    random crop location. Default is 'center'.
    :return: cropped volume, then the updated affine if aff is not None, then the crop indices (in the order
    [lower_bound_dim_1, ..., upper_bound_dim_1, ...]) if return_crop_idx is True.
    """

    assert (cropping_margin is not None) | (cropping_shape is not None), \
        'cropping_margin or cropping_shape should be provided'
    assert not ((cropping_margin is not None) & (cropping_shape is not None)), \
        'only one of cropping_margin or cropping_shape should be provided'

    # work on a copy of the input
    new_volume = volume.copy()
    vol_shape = new_volume.shape
    n_dims, _ = utils.get_dims(vol_shape)

    # compute lower/upper cropping bounds for each dimension
    if cropping_margin is not None:
        # margin-based: dimensions too small to lose 2 * margin voxels are left untouched
        cropping_margin = utils.reformat_to_list(cropping_margin, length=n_dims)
        do_cropping = np.array(vol_shape[:n_dims]) > 2 * np.array(cropping_margin)
        min_crop_idx, max_crop_idx = [], []
        for i in range(n_dims):
            min_crop_idx.append(cropping_margin[i] if do_cropping[i] else 0)
            max_crop_idx.append(vol_shape[i] - cropping_margin[i] if do_cropping[i] else vol_shape[i])
    else:
        cropping_shape = utils.reformat_to_list(cropping_shape, length=n_dims)
        if mode == 'center':
            # centre the cropping window, clipped to the volume boundaries
            min_crop_idx = np.maximum([int((vol_shape[i] - cropping_shape[i]) / 2) for i in range(n_dims)], 0)
            max_crop_idx = np.minimum([min_crop_idx[i] + cropping_shape[i] for i in range(n_dims)],
                                      np.array(vol_shape)[:n_dims])
        elif mode == 'random':
            # draw a random lower bound, then clip the upper bound to the volume boundaries
            crop_max_val = np.maximum(np.array([vol_shape[i] - cropping_shape[i] for i in range(n_dims)]), 0)
            min_crop_idx = np.random.randint(0, high=crop_max_val + 1)
            max_crop_idx = np.minimum(min_crop_idx + np.array(cropping_shape), np.array(vol_shape)[:n_dims])
        else:
            raise ValueError('mode should be either "center" or "random", had %s' % mode)
    crop_idx = np.concatenate([np.array(min_crop_idx), np.array(max_crop_idx)])

    # apply the crop (trailing channel dimension, if any, is kept whole)
    if n_dims == 2:
        new_volume = new_volume[crop_idx[0]: crop_idx[2], crop_idx[1]: crop_idx[3], ...]
    elif n_dims == 3:
        new_volume = new_volume[crop_idx[0]: crop_idx[3], crop_idx[1]: crop_idx[4], crop_idx[2]: crop_idx[5], ...]

    # assemble outputs, translating the affine origin by the lower cropping bounds
    output = [new_volume]
    if aff is not None:
        aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ np.array(min_crop_idx)
        output.append(aff)
    if return_crop_idx:
        output.append(crop_idx)
    return output[0] if len(output) == 1 else tuple(output)
def crop_volume_around_region(volume,
                              mask=None,
                              masking_labels=None,
                              threshold=0.1,
                              margin=0,
                              cropping_shape=None,
                              cropping_shape_div_by=None,
                              aff=None,
                              overflow='strict'):
    """Crop a volume around a specific region.
    This region is defined by a mask obtained by either:
    1) directly specifying it as input (see mask)
    2) keeping a set of label values if the volume is a label map (see masking_labels).
    3) thresholding the input volume (see threshold)
    The cropping region is defined by the bounding box of the mask, which we can further modify by either:
    1) extending it by a margin (see margin)
    2) providing a specific cropping shape, in this case the cropping region will be centered around the bounding box
    (see cropping_shape).
    3) extending it to a shape that is divisible by a given number. Again, the cropping region will be centered around
    the bounding box (see cropping_shape_div_by).
    Finally, if the size of the cropping region has been modified, and this modified size overflows out of the
    image (e.g. because the center of the mask is close to the edge), we can either:
    1) 'strict': stick to the valid image space (the size of the modified cropping region won't be respected)
    2) 'shift-strict': shift the cropping region so that it lies on the valid image space, and if it still overflows,
    then we restrict to the valid image space.
    3) 'padding': pad the image with zeros, such that the cropping region is not ill-defined anymore.
    4) 'shift-padding': shift the cropping region to the valid image space, and if it still overflows, then we pad
    with zeros.
    :param volume: a 2d or 3d numpy array
    :param mask: (optional) mask of region to crop around. Must be same size as volume. Can either be boolean or 0/1.
    If no mask is given, it will be computed by either thresholding the input volume or using masking_labels.
    :param masking_labels: (optional) if mask is None, and if the volume is a label map, it can be cropped around a
    set of labels specified in masking_labels, which can either be a single int, a sequence or a 1d numpy array.
    :param threshold: (optional) if mask and masking_labels are None, lower bound to determine values to crop around.
    :param margin: (optional) add margin around mask
    :param cropping_shape: (optional) shape to which the input volumes must be cropped. Volumes are padded around the
    centre of the above-defined mask if they are too small for the given shape. Can be an integer or sequence.
    Cannot be given at the same time as margin or cropping_shape_div_by.
    :param cropping_shape_div_by: (optional) makes sure the shape of the cropped region is divisible by the provided
    number. If it is not, then we enlarge the cropping area. If the enlarged area is too big for the input volume, we
    pad it with 0. Must be an integer. Cannot be given at the same time as margin or cropping_shape.
    :param aff: (optional) if specified, this function returns an updated affine matrix of the volume after cropping.
    :param overflow: (optional) how to proceed when the cropping region overflows outside the initial image space.
    Can either be 'strict' (default), 'shift-strict', 'padding', or 'shift-padding'.
    :return: the cropped volume, the cropping indices (in the order [lower_bound_dim_1, ..., upper_bound_dim_1, ...]),
    and the updated affine matrix if aff is not None.
    """
    assert not ((margin > 0) & (cropping_shape is not None)), "margin and cropping_shape can't be given together."
    assert not ((margin > 0) & (cropping_shape_div_by is not None)), \
        "margin and cropping_shape_div_by can't be given together."
    assert not ((cropping_shape_div_by is not None) & (cropping_shape is not None)), \
        "cropping_shape_div_by and cropping_shape can't be given together."
    new_vol = volume.copy()
    n_dims, n_channels = utils.get_dims(new_vol.shape)
    vol_shape = np.array(new_vol.shape[:n_dims])
    # mask ROIs for cropping
    if mask is None:
        if masking_labels is not None:
            _, mask = mask_label_map(new_vol, masking_values=masking_labels, return_mask=True)
        else:
            mask = new_vol > threshold
    # find cropping indices
    if np.any(mask):
        indices = np.nonzero(mask)
        min_idx = np.array([np.min(idx) for idx in indices])
        max_idx = np.array([np.max(idx) for idx in indices])
        intermediate_vol_shape = max_idx - min_idx
        # size of the cropping region: tight bounding box by default, otherwise enlarged by margin,
        # imposed by cropping_shape, or rounded up to the next multiple of cropping_shape_div_by
        if (margin == 0) & (cropping_shape is None) & (cropping_shape_div_by is None):
            cropping_shape = intermediate_vol_shape
        if margin:
            cropping_shape = intermediate_vol_shape + 2 * margin
        elif cropping_shape is not None:
            cropping_shape = np.array(utils.reformat_to_list(cropping_shape, length=n_dims))
        elif cropping_shape_div_by is not None:
            cropping_shape = [utils.find_closest_number_divisible_by_m(s, cropping_shape_div_by, answer_type='higher')
                              for s in intermediate_vol_shape]
        # centre the cropping region around the bounding box and measure by how much it overflows the image
        min_idx = min_idx - np.int32(np.ceil((cropping_shape - intermediate_vol_shape) / 2))
        max_idx = max_idx + np.int32(np.floor((cropping_shape - intermediate_vol_shape) / 2))
        min_overflow = np.abs(np.minimum(min_idx, 0))
        max_overflow = np.maximum(max_idx - vol_shape, 0)
        # 'strict' and 'shift-strict' never pad, so the recorded overflow is reset to zero
        if 'strict' in overflow:
            min_overflow = np.zeros_like(min_overflow)
            max_overflow = np.zeros_like(min_overflow)
        if overflow == 'shift-strict':
            min_idx -= max_overflow
            max_idx += min_overflow
        # 'shift-padding': absorb the overflow by shifting the window, and keep any remainder as padding
        if overflow == 'shift-padding':
            for ii in range(n_dims):
                # no need to do anything if both min/max_overflow are 0 (no padding/shifting required at all)
                # or if both are positive, because in this case we don't shift at all and we pad directly
                if (min_overflow[ii] > 0) & (max_overflow[ii] == 0):
                    max_idx_new = max_idx[ii] + min_overflow[ii]
                    if max_idx_new <= vol_shape[ii]:
                        max_idx[ii] = max_idx_new
                        min_overflow[ii] = 0
                    else:
                        min_overflow[ii] = min_overflow[ii] - (vol_shape[ii] - max_idx[ii])
                        max_idx[ii] = vol_shape[ii]
                elif (min_overflow[ii] == 0) & (max_overflow[ii] > 0):
                    min_idx_new = min_idx[ii] - max_overflow[ii]
                    if min_idx_new >= 0:
                        min_idx[ii] = min_idx_new
                        max_overflow[ii] = 0
                    else:
                        max_overflow[ii] = max_overflow[ii] - min_idx[ii]
                        min_idx[ii] = 0
        # crop volume if necessary
        min_idx = np.maximum(min_idx, 0)
        max_idx = np.minimum(max_idx, vol_shape)
        cropping = np.concatenate([min_idx, max_idx])
        if np.any(cropping[:3] > 0) or np.any(cropping[3:] != vol_shape):
            if n_dims == 3:
                new_vol = new_vol[cropping[0]:cropping[3], cropping[1]:cropping[4], cropping[2]:cropping[5], ...]
            elif n_dims == 2:
                new_vol = new_vol[cropping[0]:cropping[2], cropping[1]:cropping[3], ...]
            else:
                raise ValueError('cannot crop volumes with more than 3 dimensions')
        # pad volume if necessary
        if np.any(min_overflow > 0) | np.any(max_overflow > 0):
            pad_margins = tuple([(min_overflow[i], max_overflow[i]) for i in range(n_dims)])
            pad_margins = tuple(list(pad_margins) + [(0, 0)]) if n_channels > 1 else pad_margins
            new_vol = np.pad(new_vol, pad_margins, mode='constant', constant_values=0)
    # if there's nothing to crop around, we return the input as is
    else:
        min_idx = min_overflow = np.zeros(3)
        cropping = None
    # return results
    if aff is not None:
        if n_dims == 2:
            min_idx = np.append(min_idx, 0)
            min_overflow = np.append(min_overflow, 0)
        # shift the origin by the cropping lower bounds, then compensate for any zero-padding
        aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ min_idx
        aff[:-1, -1] = aff[:-1, -1] - aff[:-1, :-1] @ min_overflow
        return new_vol, cropping, aff
    else:
        return new_vol, cropping
def crop_volume_with_idx(volume, crop_idx, aff=None, n_dims=None, return_copy=True):
    """Crop a volume with given indices.
    :param volume: a 2d or 3d numpy array
    :param crop_idx: cropping indices, in the order [lower_bound_dim_1, ..., upper_bound_dim_1, ...].
    Can be a list or a 1d numpy array.
    :param aff: (optional) if aff is specified, this function returns an updated affine matrix of the volume after
    cropping.
    :param n_dims: (optional) number of dimensions (excluding channels) of the volume. If not provided, n_dims will be
    inferred from the input volume.
    :param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
    :return: the cropped volume, and the updated affine matrix if aff is not None.
    """

    # get info
    new_volume = volume.copy() if return_copy else volume
    n_dims = int(np.array(crop_idx).shape[0] / 2) if n_dims is None else n_dims

    # crop image
    if n_dims == 2:
        new_volume = new_volume[crop_idx[0]:crop_idx[2], crop_idx[1]:crop_idx[3], ...]
    elif n_dims == 3:
        new_volume = new_volume[crop_idx[0]:crop_idx[3], crop_idx[1]:crop_idx[4], crop_idx[2]:crop_idx[5], ...]
    else:
        raise Exception('cannot crop volumes with more than 3 dimensions')

    # update the affine translation with the lower-bound indices.
    # N.B. the previous code used crop_idx[:3], which for 2d volumes wrongly included the upper bound of the first
    # axis as a translation component; we now take the n_dims lower bounds and pad with zeros (as done in
    # crop_volume_around_region).
    if aff is not None:
        min_crop_idx = np.append(np.array(crop_idx[:n_dims]), np.zeros(3 - n_dims, dtype='int32'))
        aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ min_crop_idx
        return new_volume, aff
    else:
        return new_volume
def pad_volume(volume, padding_shape, padding_value=0, aff=None, return_pad_idx=False):
    """Pad a volume with a constant value up to a given shape (dimensions already large enough are left unchanged).
    :param volume: volume to be padded
    :param padding_shape: shape to pad volume to. Can be a number, a sequence or a 1d numpy array.
    :param padding_value: (optional) value used for padding
    :param aff: (optional) affine matrix of the volume
    :param return_pad_idx: (optional) the pad_idx corresponds to the indices where we should crop the resulting
    padded image (ie the output of this function) to go back to the original volume (ie the input of this function).
    :return: padded volume, then the updated affine if aff is not None, then pad_idx if return_pad_idx is True.
    """

    # gather shape information
    new_volume = volume.copy()
    vol_shape = new_volume.shape
    n_dims, n_channels = utils.get_dims(vol_shape)
    padding_shape = utils.reformat_to_list(padding_shape, length=n_dims, dtype='int')

    target = np.array(padding_shape, dtype='int32')
    current = np.array(vol_shape[:n_dims], dtype='int32')

    if np.any(target > current):
        # split the total padding between both sides (the extra voxel, if any, goes to the upper side)
        diff = np.array(padding_shape) - np.array(vol_shape)[:n_dims]
        min_margins = np.maximum(np.int32(np.floor(diff / 2)), 0)
        max_margins = np.maximum(np.int32(np.ceil(diff / 2)), 0)
        pad_idx = np.concatenate([min_margins, min_margins + np.array(vol_shape[:n_dims])])
        pad_margins = tuple((min_margins[i], max_margins[i]) for i in range(n_dims))
        if n_channels > 1:
            pad_margins = pad_margins + ((0, 0),)

        # pad, and shift the affine origin to account for the new lower bounds
        new_volume = np.pad(new_volume, pad_margins, mode='constant', constant_values=padding_value)
        if aff is not None:
            if n_dims == 2:
                min_margins = np.append(min_margins, 0)
            aff[:-1, -1] = aff[:-1, -1] - aff[:-1, :-1] @ min_margins
    else:
        # nothing to pad: the crop-back indices cover the whole volume
        pad_idx = np.concatenate([np.array([0] * n_dims), np.array(vol_shape[:n_dims])])

    # sort outputs
    output = [new_volume]
    if aff is not None:
        output.append(aff)
    if return_pad_idx:
        output.append(pad_idx)
    return output[0] if len(output) == 1 else tuple(output)
def flip_volume(volume, axis=None, direction=None, aff=None, return_copy=True):
    """Flip a volume along a given axis, or along an anatomical direction inferred from an affine matrix.
    :param volume: a numpy array
    :param axis: (optional) axis along which to flip the volume. Can either be an int or a tuple.
    :param direction: (optional) if axis is None, the volume can be flipped along an anatomical direction:
    'rl' (right/left), 'ap' (anterior/posterior), 'si' (superior/inferior).
    :param aff: (optional) please provide an affine matrix if direction is not None
    :param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
    :return: flipped volume
    """

    new_volume = volume.copy() if return_copy else volume
    assert (axis is not None) | ((aff is not None) & (direction is not None)), \
        'please provide either axis, or an affine matrix with a direction'

    # infer the flipping axis from the anatomical direction when it is not given explicitly
    if (axis is None) & (aff is not None):
        ras_axes = get_ras_axes(aff)
        direction_to_index = {'rl': 0, 'ap': 1, 'si': 2}
        if direction not in direction_to_index:
            raise ValueError("direction should be 'rl', 'ap', or 'si', had %s" % direction)
        axis = ras_axes[direction_to_index[direction]]

    return np.flip(new_volume, axis=axis)
def resample_volume(volume, aff, new_vox_size, interpolation='linear', blur=True):
    """Resample a 3d volume to a new voxel size, updating the affine so the volume stays aligned in world space.
    :param volume: a numpy array
    :param aff: affine matrix of the volume
    :param new_vox_size: new voxel size (3 - element numpy vector) in mm
    :param interpolation: (optional) type of interpolation. Can be 'linear' or 'nearest'. Default is 'linear'.
    :param blur: (optional) whether to blur before resampling to avoid aliasing effects.
    Only used if the input volume is downsampled. Default is True.
    :return: new volume and affine matrix
    """

    # current voxel size, from the norms of the affine's direction columns
    pixdim = np.sqrt(np.sum(aff * aff, axis=0))[:-1]
    new_vox_size = np.array(new_vox_size)
    factor = pixdim / new_vox_size

    # anti-aliasing blur along downsampled axes only (sigma is set to 0 for upsampled ones)
    sigmas = 0.25 / factor
    sigmas[factor > 1] = 0
    volume_filt = gaussian_filter(volume, sigmas) if blur else volume

    # interpolator defined on the source voxel grid
    source_grid = tuple(np.arange(0, volume_filt.shape[d]) for d in range(3))
    interpolator = RegularGridInterpolator(source_grid, volume_filt, method=interpolation)

    # sampling locations of the new grid, clamped to the valid index range of the source volume
    start = - (factor - 1) / (2 * factor)
    step = 1.0 / factor
    stop = start + step * np.ceil(volume_filt.shape * factor)
    new_coords = []
    for d in range(3):
        coords = np.arange(start=start[d], stop=stop[d], step=step[d])
        coords[coords < 0] = 0
        coords[coords > (volume_filt.shape[d] - 1)] = volume_filt.shape[d] - 1
        new_coords.append(coords)
    grids = np.meshgrid(new_coords[0], new_coords[1], new_coords[2], indexing='ij', sparse=True)
    volume2 = interpolator(tuple(grids))

    # update the affine: rescale the direction columns, and shift the origin to the centre of the first new voxel
    aff2 = aff.copy()
    for c in range(3):
        aff2[:-1, c] = aff2[:-1, c] / factor[c]
    aff2[:-1, -1] = aff2[:-1, -1] - np.matmul(aff2[:-1, :-1], 0.5 * (factor - 1))
    return volume2, aff2
def resample_volume_like(vol_ref, aff_ref, vol_flo, aff_flo, interpolation='linear'):
    """Reslice a floating 3d volume onto the voxel grid of a reference volume.
    :param vol_ref: a numpy array with the reference volume
    :param aff_ref: affine matrix of the reference volume
    :param vol_flo: a numpy array with the floating volume
    :param aff_flo: affine matrix of the floating volume
    :param interpolation: (optional) type of interpolation. Can be 'linear' or 'nearest'. Default is 'linear'.
    :return: resliced volume (voxels falling outside the floating volume are set to 0)
    """

    # voxel-to-voxel transform mapping reference coordinates to floating coordinates
    ref2flo = np.matmul(np.linalg.inv(aff_flo), aff_ref)

    # interpolator on the floating grid, returning 0 outside its bounds
    flo_grid = tuple(np.arange(0, vol_flo.shape[d]) for d in range(3))
    interpolator = RegularGridInterpolator(flo_grid, vol_flo, bounds_error=False, fill_value=0.0,
                                           method=interpolation)

    # homogeneous coordinates of every reference voxel
    ref_axes = [np.arange(0, vol_ref.shape[d]) for d in range(3)]
    grids = np.meshgrid(ref_axes[0], ref_axes[1], ref_axes[2], indexing='ij', sparse=False)
    flat = [g.reshape(-1) for g in grids]
    coords = np.stack(flat + [np.ones_like(flat[0])])

    # map reference voxels into the floating space and interpolate
    mapped = np.matmul(ref2flo, coords)[:-1, :]
    values = interpolator((mapped[0, :], mapped[1, :], mapped[2, :]))
    return values.reshape(vol_ref.shape)
def get_ras_axes(aff, n_dims=3):
    """Find which volume dimension corresponds to each RAS axis, based on the volume's affine matrix.
    :param aff: affine matrix. Can be a 2d numpy array of size n_dims*n_dims, n_dims+1*n_dims+1, or n_dims*n_dims+1.
    :param n_dims: number of dimensions (excluding channels) of the volume corresponding to the provided affine matrix.
    :return: a 1d numpy array of length n_dims, where entry i gives the volume axis corresponding to RAS axis i.
    """
    inv_aff = np.linalg.inv(aff)
    ras_axes = np.argmax(np.absolute(inv_aff[:n_dims, :n_dims]), axis=0)
    # argmax can map two RAS axes to the same volume axis (near-oblique affines);
    # reassign the last duplicate to whichever axis is missing so the result is a permutation
    for dim in range(n_dims):
        if dim not in ras_axes:
            values, counts = np.unique(ras_axes, return_counts=True)
            duplicate = values[np.argmax(counts)]
            ras_axes[np.where(ras_axes == duplicate)[0][-1]] = dim
    return ras_axes
def align_volume_to_ref(volume, aff, aff_ref=None, return_aff=False, n_dims=None, return_copy=True):
    """This function aligns a volume to a reference orientation (axis and direction) specified by an affine matrix.
    :param volume: a numpy array
    :param aff: affine matrix of the floating volume
    :param aff_ref: (optional) affine matrix of the target orientation. Default is identity matrix.
    :param return_aff: (optional) whether to return the affine matrix of the aligned volume
    :param n_dims: (optional) number of dimensions (excluding channels) of the volume. If not provided, n_dims will be
    inferred from the input volume.
    :param return_copy: (optional) whether to return the original volume or a copy. Default is copy.
    :return: aligned volume, with corresponding affine matrix if return_aff is True.
    """
    # work on copy
    new_volume = volume.copy() if return_copy else volume
    aff_flo = aff.copy()
    # default value for aff_ref
    if aff_ref is None:
        aff_ref = np.eye(4)
    # extract ras axes
    if n_dims is None:
        n_dims, _ = utils.get_dims(new_volume.shape)
    ras_axes_ref = get_ras_axes(aff_ref, n_dims=n_dims)
    ras_axes_flo = get_ras_axes(aff_flo, n_dims=n_dims)
    # align axes: permute the volume axes (and affine columns) so both volumes list their RAS axes in the same
    # order; ras_axes_flo is updated after each swap so later iterations see the new axis positions
    aff_flo[:, ras_axes_ref] = aff_flo[:, ras_axes_flo]
    for i in range(n_dims):
        if ras_axes_flo[i] != ras_axes_ref[i]:
            new_volume = np.swapaxes(new_volume, ras_axes_flo[i], ras_axes_ref[i])
            swapped_axis_idx = np.where(ras_axes_flo == ras_axes_ref[i])
            ras_axes_flo[swapped_axis_idx], ras_axes_flo[i] = ras_axes_flo[i], ras_axes_flo[swapped_axis_idx]
    # align directions: a negative dot product between matching affine columns means the axis runs in the opposite
    # direction, so flip the volume along it and shift the origin to the (new) first voxel of that axis
    dot_products = np.sum(aff_flo[:3, :3] * aff_ref[:3, :3], axis=0)
    for i in range(n_dims):
        if dot_products[i] < 0:
            new_volume = np.flip(new_volume, axis=i)
            aff_flo[:, i] = - aff_flo[:, i]
            aff_flo[:3, 3] = aff_flo[:3, 3] - aff_flo[:3, i] * (new_volume.shape[i] - 1)
    if return_aff:
        return new_volume, aff_flo
    else:
        return new_volume
def blur_volume(volume, sigma, mask=None):
    """Blur a volume with a gaussian kernel of the given standard deviation.
    :param volume: 2d or 3d numpy array
    :param sigma: standard deviation of the gaussian kernels. Can be a number, a sequence or a 1d numpy array
    :param mask: (optional) numpy array of the same shape as volume to correct for edge blurring effects.
    Mask can be a boolean or numerical array. In the latter, the mask is computed by keeping all values above zero.
    :return: blurred volume
    """

    # format sigma to one value per spatial dimension
    out = volume.copy()
    n_dims, _ = utils.get_dims(out.shape)
    sigma = utils.reformat_to_list(sigma, length=n_dims, dtype='float')

    # 'nearest' edge padding limits intensity darkening near the volume borders
    out = gaussian_filter(out, sigma=sigma, mode='nearest')

    # renormalise by the blurred mask to undo intensity leakage across the mask boundary
    if mask is not None:
        assert out.shape == mask.shape, 'volume and mask should have the same dimensions: ' \
                                        'got {0} and {1}'.format(out.shape, mask.shape)
        binary = (mask > 0) * 1.0
        blurred_mask = gaussian_filter(binary, sigma=sigma, mode='nearest')
        out = out / (blurred_mask + 1e-6)
        out[binary == 0] = 0
    return out
# --------------------------------------------------- edit label map ---------------------------------------------------
def correct_label_map(labels, list_incorrect_labels, list_correct_labels=None, use_nearest_label=False,
                      remove_zero=False, smooth=False):
    """This function corrects specified label values in a label map by either a list of given values, or by the nearest
    label.
    :param labels: a 2d or 3d label map
    :param list_incorrect_labels: list of all label values to correct (eg [1, 2, 3]). Can also be a path to such a list.
    :param list_correct_labels: (optional) list of correct label values to replace the incorrect ones.
    Correct values must have the same order as their corresponding value in list_incorrect_labels.
    When several correct values are possible for the same incorrect value, the nearest correct value will be selected at
    each voxel to correct. In that case, the different correct values must be specified inside a list within
    list_correct_labels (e.g. [10, 20, 30, [40, 50]]).
    :param use_nearest_label: (optional) whether to correct the incorrect label values with the nearest labels.
    :param remove_zero: (optional) if use_nearest_label is True, set to True not to consider zero among the potential
    candidates for the nearest neighbour. -1 will be returned when no solution is possible.
    :param smooth: (optional) whether to smooth the corrected label map
    :return: corrected label map
    """
    # the two correction modes are mutually exclusive, and exactly one must be selected
    assert (list_correct_labels is not None) | use_nearest_label, \
        'please provide a list of correct labels, or set use_nearest_label to True.'
    assert (list_correct_labels is None) | (not use_nearest_label), \
        'cannot provide a list of correct values and set use_nearest_label to True'

    # initialisation
    new_labels = labels.copy()
    list_incorrect_labels = utils.reformat_to_list(utils.load_array_if_path(list_incorrect_labels))
    volume_labels = np.unique(labels)
    n_dims, _ = utils.get_dims(labels.shape)

    # mode 1: use the provided list of correct values
    if list_correct_labels is not None:
        list_correct_labels = utils.reformat_to_list(utils.load_array_if_path(list_correct_labels))

        # loop over label values
        for incorrect_label, correct_label in zip(list_incorrect_labels, list_correct_labels):
            if incorrect_label in volume_labels:

                # only one possible value to replace with: direct substitution
                if isinstance(correct_label, (int, float, np.int64, np.int32, np.int16, np.int8)):
                    incorrect_voxels = np.where(labels == incorrect_label)
                    new_labels[incorrect_voxels] = correct_label

                # several possibilities: pick, per voxel, the nearest of the candidate values
                elif isinstance(correct_label, (tuple, list)):

                    # make sure at least one correct label is present
                    if not any([lab in volume_labels for lab in correct_label]):
                        print('no correct values found in volume, please adjust: '
                              'incorrect: {}, correct: {}'.format(incorrect_label, correct_label))

                    # crop around the incorrect region, enlarging the margin until a candidate label is included
                    correct_label_not_found = True
                    margin_mult = 1
                    tmp_labels = None
                    crop = None
                    while correct_label_not_found:
                        tmp_labels, crop = crop_volume_around_region(labels,
                                                                     masking_labels=incorrect_label,
                                                                     margin=10 * margin_mult)
                        correct_label_not_found = not any([lab in np.unique(tmp_labels) for lab in correct_label])
                        margin_mult += 1

                    # calculate distance maps for all new label candidates (in cropped space, for efficiency)
                    incorrect_voxels = np.where(tmp_labels == incorrect_label)
                    distance_map_list = [distance_transform_edt(tmp_labels != lab) for lab in correct_label]
                    distances_correct = np.stack([dist[incorrect_voxels] for dist in distance_map_list])

                    # select nearest candidate per voxel, shifting cropped indices back to full-volume coordinates
                    idx_correct_lab = np.argmin(distances_correct, axis=0)
                    incorrect_voxels = tuple([incorrect_voxels[i] + crop[i] for i in range(n_dims)])
                    new_labels[incorrect_voxels] = np.array(correct_label)[idx_correct_lab]

    # mode 2: replace each incorrect region by its nearest neighbouring label
    else:

        # loop over label values
        for incorrect_label in list_incorrect_labels:
            if incorrect_label in volume_labels:

                # process each connected component of the incorrect label independently
                components, n_components = scipy_label(labels == incorrect_label)
                loop_info = utils.LoopInfo(n_components + 1, 100, 'correcting')
                for i in range(1, n_components + 1):
                    loop_info.update(i)

                    # crop around the current component (margin of 1 to include surrounding labels)
                    _, crop = crop_volume_around_region(components, masking_labels=i, margin=1)
                    tmp_labels = crop_volume_with_idx(labels, crop)
                    tmp_new_labels = crop_volume_with_idx(new_labels, crop)

                    # list all possible correct labels (everything except the incorrect values, and optionally zero)
                    correct_labels = np.unique(tmp_labels)
                    for il in list_incorrect_labels:
                        correct_labels = np.delete(correct_labels, np.where(correct_labels == il))
                    if remove_zero:
                        correct_labels = np.delete(correct_labels, np.where(correct_labels == 0))

                    # replace incorrect voxels by the nearest candidate (-1 when no candidate exists)
                    incorrect_voxels = np.where(tmp_labels == incorrect_label)
                    if len(correct_labels) == 0:
                        tmp_new_labels[incorrect_voxels] = -1
                    else:
                        if len(correct_labels) == 1:
                            idx_correct_lab = np.zeros(len(incorrect_voxels[0]), dtype='int32')
                        else:
                            distance_map_list = [distance_transform_edt(tmp_labels != lab) for lab in correct_labels]
                            distances_correct = np.stack([dist[incorrect_voxels] for dist in distance_map_list])
                            idx_correct_lab = np.argmin(distances_correct, axis=0)
                        tmp_new_labels[incorrect_voxels] = np.array(correct_labels)[idx_correct_lab]

                    # paste the corrected crop back into the full label map
                    if n_dims == 2:
                        new_labels[crop[0]:crop[2], crop[1]:crop[3], ...] = tmp_new_labels
                    else:
                        new_labels[crop[0]:crop[3], crop[1]:crop[4], crop[2]:crop[5], ...] = tmp_new_labels

    # optional smoothing of the corrected map (majority vote in a 3^n_dims neighbourhood)
    if smooth:
        kernel = np.ones(tuple([3] * n_dims))
        new_labels = smooth_label_map(new_labels, kernel)

    return new_labels
def mask_label_map(labels, masking_values, masking_value=0, return_mask=False):
    """Mask a label map around a list of specified values (i.e. keep those values, replace everything else).
    :param labels: input label map
    :param masking_values: list of values to mask around
    :param masking_value: (optional) value assigned to all masked-out voxels
    :param return_mask: (optional) whether to also return the applied mask
    :return: the masked label map, and the applied mask (as a 0/1 array) if return_mask is True.
    """
    # build the union of the binary masks of all values to keep
    mask = np.zeros(labels.shape, dtype=bool)
    for value in utils.reformat_to_list(masking_values):
        mask |= (labels == value)

    # apply the mask to a copy of the input
    masked_labels = labels.copy()
    masked_labels[np.logical_not(mask)] = masking_value

    if return_mask:
        return masked_labels, mask * 1
    else:
        return masked_labels
def smooth_label_map(labels, kernel, labels_list=None, print_progress=0):
    """This function smooths an input label map by replacing each voxel by the value of its most numerous neighbour.
    :param labels: input label map
    :param kernel: kernel used when counting neighbours. Must contain only zeros or ones.
    :param labels_list: list of label values to smooth. Default is None, where all labels are smoothed.
    :param print_progress: (optional) If not 0, interval at which to print the number of processed labels.
    :return: smoothed label map
    """
    # get info
    labels_shape = labels.shape
    unique_labels = np.unique(labels).astype('int32')
    if labels_list is None:
        # smooth every label value: no voxel is protected from modification
        labels_list = unique_labels
        new_labels = mask_new_labels = None
    else:
        # labels outside labels_list must stay untouched: mask them here and paste them back at the end
        labels_to_keep = [lab for lab in unique_labels if lab not in labels_list]
        new_labels, mask_new_labels = mask_label_map(labels, labels_to_keep, return_mask=True)

    # loop through label values, keeping for each voxel the label with the highest neighbour count.
    # Ties are broken in favour of the label processed first (strict ">" below).
    count = np.zeros(labels_shape)
    labels_smoothed = np.zeros(labels_shape, dtype='int')
    loop_info = utils.LoopInfo(len(labels_list), print_progress, 'smoothing')
    for la, label in enumerate(labels_list):
        if print_progress:
            loop_info.update(la)

        # count neighbours with same value
        mask = (labels == label) * 1
        n_neighbours = convolve(mask, kernel)

        # update label map and maximum neighbour counts
        idx = n_neighbours > count
        count[idx] = n_neighbours[idx]
        labels_smoothed[idx] = label
    labels_smoothed = labels_smoothed.astype('int32')

    # restore the protected labels if only a subset was smoothed
    if new_labels is None:
        new_labels = labels_smoothed
    else:
        new_labels = np.where(mask_new_labels, new_labels, labels_smoothed)
    return new_labels
def erode_label_map(labels, labels_to_erode, erosion_factors=1., gpu=False, model=None, return_model=False):
    """Erode a given set of label values within a label map.
    :param labels: a 2d or 3d label map
    :param labels_to_erode: list of label values to erode
    :param erosion_factors: (optional) list of erosion factors to use for each label. If values are integers, normal
    erosion applies. If float, we first 1) blur a mask of the corresponding label value, and 2) use the erosion factor
    as a threshold in the blurred mask.
    If erosion_factors is a single value, the same factor will be applied to all labels.
    :param gpu: (optional) whether to use a fast gpu model for blurring (if erosion factors are floats)
    :param model: (optional) gpu model for blurring masks (if erosion factors are floats)
    :param return_model: (optional) whether to return the gpu blurring model
    :return: eroded label map, and the gpu blurring model if return_model is True.
    """
    # reformat labels_to_erode and erosion factors (a single factor is broadcast to all labels)
    new_labels = labels.copy()
    labels_to_erode = utils.reformat_to_list(labels_to_erode)
    erosion_factors = utils.reformat_to_list(erosion_factors, length=len(labels_to_erode))
    labels_shape = list(new_labels.shape)
    n_dims, _ = utils.get_dims(labels_shape)

    # loop over labels to erode
    for label_to_erode, erosion_factor in zip(labels_to_erode, erosion_factors):
        assert erosion_factor > 0, 'all erosion factors should be strictly positive, had {}'.format(erosion_factor)

        # get mask of current label value
        mask = (new_labels == label_to_erode)

        # erode as usual if erosion factor is an integer
        if int(erosion_factor) == erosion_factor:
            erode_struct = utils.build_binary_structure(int(erosion_factor), n_dims)
            eroded_mask = binary_erosion(mask, erode_struct)

        # if the erosion factor is a float: blur the mask and use the factor as a threshold
        else:
            if gpu:
                if model is None:
                    # NOTE(review): the blurring kernel is built with [1] * 3, i.e. assuming 3 dimensions —
                    # confirm this is intended when eroding 2d label maps.
                    mask_in = KL.Input(shape=labels_shape + [1], dtype='float32')
                    blurred_mask = GaussianBlur([1] * 3)(mask_in)
                    model = Model(inputs=mask_in, outputs=blurred_mask)
                eroded_mask = model.predict(utils.add_axis(np.float32(mask), axis=[0, -1]))
            else:
                eroded_mask = blur_volume(np.array(mask, dtype='float32'), 1)
            eroded_mask = np.squeeze(eroded_mask) > erosion_factor

        # the voxels to relabel are those removed by the erosion; crop around them for efficiency
        mask = mask & np.logical_not(eroded_mask)
        cropped_lab_mask, cropping = crop_volume_around_region(mask, margin=3)
        cropped_labels = crop_volume_with_idx(new_labels, cropping)

        # calculate distance maps for all labels in cropped_labels (except the eroded label itself)
        labels_list = np.unique(cropped_labels)
        labels_list = labels_list[labels_list != label_to_erode]
        list_dist_maps = [distance_transform_edt(np.logical_not(cropped_labels == la)) for la in labels_list]
        candidate_distances = np.stack([dist[cropped_lab_mask] for dist in list_dist_maps])

        # select nearest value and paste the cropped labels back into the full label map
        idx_correct_lab = np.argmin(candidate_distances, axis=0)
        cropped_labels[cropped_lab_mask] = np.array(labels_list)[idx_correct_lab]
        if n_dims == 2:
            new_labels[cropping[0]:cropping[2], cropping[1]:cropping[3], ...] = cropped_labels
        elif n_dims == 3:
            new_labels[cropping[0]:cropping[3], cropping[1]:cropping[4], cropping[2]:cropping[5], ...] = cropped_labels

    if return_model:
        return new_labels, model
    else:
        return new_labels
def get_largest_connected_component(mask, structure=None):
    """Keep only the largest connected component of a binary input.
    :param mask: a 2d or 3d label map of boolean type.
    :param structure: numpy array defining the connectivity.
    :return: boolean mask of the largest component, or a copy of the input if it contains no component.
    """
    components, n_components = scipy_label(mask, structure)
    if n_components > 0:
        # component labels start at 1, hence the shift after reading the size histogram
        component_sizes = np.bincount(components.flat)[1:]
        return components == (np.argmax(component_sizes) + 1)
    else:
        return mask.copy()
def compute_hard_volumes(labels, voxel_volume=1., label_list=None, skip_background=True):
    """Compute hard volumes in a label map.
    :param labels: a label map
    :param voxel_volume: (optional) volume of voxel. Default is 1 (i.e. returned volumes are voxel counts).
    :param label_list: (optional) list of labels to compute volumes for. Can be an int, a sequence, or a numpy array.
    If None, the volumes of all label values are computed.
    :param skip_background: (optional) whether to skip computing the volume of the background.
    If label_list is None, this assumes the background is the lowest label value (typically 0).
    If label_list is not None, this assumes the background is the first value in label_list.
    :return: numpy 1d vector with the volumes of each structure, in the order of label_list
    """
    # determine the list of labels to quantify
    subject_label_list = utils.reformat_to_list(np.unique(labels), dtype='int')
    if label_list is None:
        label_list = subject_label_list
    else:
        label_list = utils.reformat_to_list(label_list)
    if skip_background:
        label_list = label_list[1:]

    # count voxels for each label (np.zeros already initialises absent labels to 0,
    # and summing a boolean mask directly avoids building an intermediate int array)
    volumes = np.zeros(len(label_list))
    for idx, label in enumerate(label_list):
        if label in subject_label_list:
            volumes[idx] = np.sum(labels == label)

    # scale the voxel counts by the physical voxel volume
    return volumes * voxel_volume
def compute_distance_map(labels, masking_labels=None, crop_margin=None):
    """Compute distance map for a given list of label values in a label map.
    :param labels: a label map
    :param masking_labels: (optional) list of label values to mask the label map with. The distances will be computed
    for these labels only. Default is None, where all positive values are considered.
    :param crop_margin: (optional) margin with which to crop the input label maps around the labels for which we
    want to compute the distance maps.
    :return: a distance map with positive values inside the considered regions, and negative values outside."""
    n_dims, _ = utils.get_dims(labels.shape)

    # crop label map if necessary (distances are computed in cropped space for efficiency)
    if crop_margin is not None:
        tmp_labels, crop_idx = crop_volume_around_region(labels, margin=crop_margin)
    else:
        tmp_labels = labels
        crop_idx = None

    # mask label map around specified values
    if masking_labels is not None:
        masking_labels = utils.reformat_to_list(masking_labels)
        mask = np.zeros(tmp_labels.shape, dtype='bool')
        for masking_label in masking_labels:
            # BUG FIX: parentheses are required here — "|" binds tighter than "==" in Python, so the
            # unparenthesised form evaluated as (mask | tmp_labels) == masking_label, which discarded
            # previously accumulated labels whenever several masking values were given.
            mask = mask | (tmp_labels == masking_label)
    else:
        mask = tmp_labels > 0
    not_mask = np.logical_not(mask)

    # compute signed distances (the +/- 0.5 shifts place the zero-crossing on the region boundary)
    dist_in = distance_transform_edt(mask)
    dist_in = np.where(mask, dist_in - 0.5, dist_in)
    dist_out = - distance_transform_edt(not_mask)
    dist_out = np.where(not_mask, dist_out + 0.5, dist_out)
    tmp_dist = dist_in + dist_out

    # put back in original matrix if we cropped, padding with the minimum (most negative) distance
    if crop_idx is not None:
        dist = np.min(tmp_dist) * np.ones(labels.shape, dtype='float32')
        if n_dims == 3:
            dist[crop_idx[0]:crop_idx[3], crop_idx[1]:crop_idx[4], crop_idx[2]:crop_idx[5], ...] = tmp_dist
        elif n_dims == 2:
            dist[crop_idx[0]:crop_idx[2], crop_idx[1]:crop_idx[3], ...] = tmp_dist
    else:
        dist = tmp_dist

    return dist
# ------------------------------------------------- edit volumes in dir ------------------------------------------------
def mask_images_in_dir(image_dir, result_dir, mask_dir=None, threshold=0.1, dilate=0, erode=0, fill_holes=False,
                       masking_value=0, write_mask=False, mask_result_dir=None, recompute=True):
    """Mask all volumes in a folder, either with masks in a specified folder, or by keeping only the intensity values
    above a specified threshold.
    :param image_dir: path of directory with images to mask
    :param result_dir: path of directory where masked images will be writen
    :param mask_dir: (optional) path of directory containing masks. Masks are matched to images by sorting order.
    Mask volumes don't have to be boolean or 0/1 arrays as all strictly positive values are used to build the masks.
    Masks should have the same size as images. If images are multi-channel, masks can either be uni- or multi-channel.
    In the first case, the same mask is applied to all channels.
    :param threshold: (optional) If mask is None, masking is performed by thresholding the input.
    :param dilate: (optional) number of voxels by which to dilate the provided or computed masks.
    :param erode: (optional) number of voxels by which to erode the provided or computed masks.
    :param fill_holes: (optional) whether to fill the holes in the provided or computed masks.
    :param masking_value: (optional) masking value
    :param write_mask: (optional) whether to write the applied masks
    :param mask_result_dir: (optional) path of resulting masks, if write_mask is True
    :param recompute: (optional) whether to recompute result files even if they already exists
    """
    # create result dir(s)
    utils.mkdir(result_dir)
    if mask_result_dir is not None:
        utils.mkdir(mask_result_dir)

    # get path masks if necessary (masks are matched to images by sorting order)
    path_images = utils.list_images_in_folder(image_dir)
    if mask_dir is not None:
        path_masks = utils.list_images_in_folder(mask_dir)
    else:
        path_masks = [None] * len(path_images)

    # loop over images
    loop_info = utils.LoopInfo(len(path_images), 10, 'masking', True)
    for idx, (path_image, path_mask) in enumerate(zip(path_images, path_masks)):
        loop_info.update(idx)

        # mask images
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_result)) | recompute:
            im, aff, h = utils.load_volume(path_image, im_only=False)
            if path_mask is not None:
                mask = utils.load_volume(path_mask)
            else:
                mask = None
            # NOTE(review): write_mask is passed as the last positional argument of mask_volume (presumably its
            # return_mask flag), so when write_mask is True, im is expected to be a (masked_image, mask) tuple —
            # confirm against mask_volume's signature.
            im = mask_volume(im, mask, threshold, dilate, erode, fill_holes, masking_value, write_mask)

            # write mask if necessary
            if write_mask:
                assert mask_result_dir is not None, 'if write_mask is True, mask_result_dir has to be specified as well'
                mask_result_path = os.path.join(mask_result_dir, os.path.basename(path_image))
                utils.save_volume(im[1], aff, h, mask_result_path)
                utils.save_volume(im[0], aff, h, path_result)
            else:
                utils.save_volume(im, aff, h, path_result)
def rescale_images_in_dir(image_dir, result_dir,
                          new_min=0, new_max=255,
                          min_percentile=2, max_percentile=98, use_positive_only=True,
                          recompute=True):
    """Linearly rescale the intensities of all volumes in image_dir between new_min and new_max.
    :param image_dir: path of directory with images to rescale
    :param result_dir: path of directory where rescaled images will be writen
    :param new_min: (optional) minimum value for the rescaled images.
    :param new_max: (optional) maximum value for the rescaled images.
    :param min_percentile: (optional) percentile for estimating robust minimum of each volume (float in [0, 100]),
    where 0 = np.min
    :param max_percentile: (optional) percentile for estimating robust maximum of each volume (float in [0, 100]),
    where 100 = np.max
    :param use_positive_only: (optional) whether to use only positive values when estimating the min and max percentile
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    utils.mkdir(result_dir)

    # process every image of the folder, one at a time
    path_images = utils.list_images_in_folder(image_dir)
    loop_info = utils.LoopInfo(len(path_images), 10, 'rescaling', True)
    for idx, path_image in enumerate(path_images):
        loop_info.update(idx)

        # skip images whose result already exists, unless recompute is requested
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if os.path.isfile(path_result) and not recompute:
            continue

        # rescale and save
        im, aff, h = utils.load_volume(path_image, im_only=False)
        im = rescale_volume(im, new_min, new_max, min_percentile, max_percentile, use_positive_only)
        utils.save_volume(im, aff, h, path_result)
def crop_images_in_dir(image_dir, result_dir, cropping_margin=None, cropping_shape=None, recompute=True):
    """Crop all volumes in a folder by a given margin, or to a given shape.
    :param image_dir: path of directory with images to crop
    :param result_dir: path of directory where cropped images will be writen
    :param cropping_margin: (optional) margin by which to crop the volume.
    Can be an int, a sequence or a 1d numpy array. Should be given if cropping_shape is None.
    :param cropping_shape: (optional) shape to which the volume will be cropped.
    Can be an int, a sequence or a 1d numpy array. Should be given if cropping_margin is None.
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    utils.mkdir(result_dir)

    # crop each image of the folder in turn
    path_images = utils.list_images_in_folder(image_dir)
    loop_info = utils.LoopInfo(len(path_images), 10, 'cropping', True)
    for idx, path_image in enumerate(path_images):
        loop_info.update(idx)

        # skip if the result already exists and recompute was not requested
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if os.path.isfile(path_result) and not recompute:
            continue

        # crop and save (the affine is updated to account for the translated origin)
        volume, aff, h = utils.load_volume(path_image, im_only=False)
        volume, aff = crop_volume(volume, cropping_margin, cropping_shape, aff)
        utils.save_volume(volume, aff, h, path_result)
def crop_images_around_region_in_dir(image_dir,
                                     result_dir,
                                     mask_dir=None,
                                     threshold=0.1,
                                     masking_labels=None,
                                     crop_margin=5,
                                     recompute=True):
    """Crop all volumes in a folder around a region, which is defined for each volume by a mask obtained by either
    1) directly providing it as input
    2) thresholding the input volume
    3) keeping a set of label values if the volume is a label map.
    :param image_dir: path of directory with images to crop
    :param result_dir: path of directory where cropped images will be writen
    :param mask_dir: (optional) path of directory of input masks. Masks are matched to images by sorting order.
    :param threshold: (optional) lower bound to determine values to crop around
    :param masking_labels: (optional) if the volume is a label map, it can be cropped around a given set of labels by
    specifying them in masking_labels, which can either be a single int, a list or a 1d numpy array.
    :param crop_margin: (optional) cropping margin
    :param recompute: (optional) whether to recompute result files even if they already exists
    """
    # create result dir
    utils.mkdir(result_dir)

    # list volumes and masks
    path_images = utils.list_images_in_folder(image_dir)
    if mask_dir is not None:
        path_masks = utils.list_images_in_folder(mask_dir)
    else:
        path_masks = [None] * len(path_images)

    # loop over images and masks
    loop_info = utils.LoopInfo(len(path_images), 10, 'cropping', True)
    for idx, (path_image, path_mask) in enumerate(zip(path_images, path_masks)):
        loop_info.update(idx)

        # crop image
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_result)) | recompute:
            # BUG FIX: im_only must be False here (it was True). With im_only=True, load_volume returns
            # only the image array, so the 3-way unpacking below raised at runtime; im_only=False is also
            # what every sibling *_in_dir function uses for this pattern.
            volume, aff, h = utils.load_volume(path_image, im_only=False)
            if path_mask is not None:
                mask = utils.load_volume(path_mask)
            else:
                mask = None
            volume, cropping, aff = crop_volume_around_region(volume, mask, threshold, masking_labels, crop_margin, aff)
            utils.save_volume(volume, aff, h, path_result)
def pad_images_in_dir(image_dir, result_dir, max_shape=None, padding_value=0, recompute=True):
    """Pad all the volumes in a folder to the same shape (either provided or computed).
    :param image_dir: path of directory with images to pad
    :param result_dir: path of directory where padded images will be writen
    :param max_shape: (optional) shape to pad the volumes to. Can be an int, a sequence or a 1d numpy array.
    If None, volumes will be padded to the shape of the biggest volume in image_dir.
    :param padding_value: (optional) value to pad the volumes with.
    :param recompute: (optional) whether to recompute result files even if they already exist
    :return: shape of the padded volumes.
    """
    utils.mkdir(result_dir)
    path_images = utils.list_images_in_folder(image_dir)

    # compute the element-wise maximum shape across the folder if no target shape was given
    if max_shape is None:
        max_shape, aff, _, _, h, _ = utils.get_volume_info(path_images[0])
        for path_image in path_images[1:]:
            image_shape, aff, _, _, h, _ = utils.get_volume_info(path_image)
            max_shape = tuple(np.maximum(np.asarray(max_shape), np.asarray(image_shape)))
    max_shape = np.array(max_shape)

    # pad every volume to the target shape
    loop_info = utils.LoopInfo(len(path_images), 10, 'padding', True)
    for idx, path_image in enumerate(path_images):
        loop_info.update(idx)

        # skip if the result already exists and recompute was not requested
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if os.path.isfile(path_result) and not recompute:
            continue

        # pad and save
        im, aff, h = utils.load_volume(path_image, im_only=False)
        im, aff = pad_volume(im, max_shape, padding_value, aff)
        utils.save_volume(im, aff, h, path_result)

    return max_shape
def flip_images_in_dir(image_dir, result_dir, axis=None, direction=None, recompute=True):
    """Flip all images in a directory along a specified axis.
    If unknown, this axis can be replaced by an anatomical direction.
    :param image_dir: path of directory with images to flip
    :param result_dir: path of directory where flipped images will be writen
    :param axis: (optional) axis along which to flip the volume
    :param direction: (optional) if axis is None, the volume can be flipped along an anatomical direction:
    'rl' (right/left), 'ap' (anterior/posterior), 'si' (superior/inferior).
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    utils.mkdir(result_dir)

    # flip each image of the folder in turn
    path_images = utils.list_images_in_folder(image_dir)
    loop_info = utils.LoopInfo(len(path_images), 10, 'flipping', True)
    for idx, path_image in enumerate(path_images):
        loop_info.update(idx)

        # skip if the result already exists and recompute was not requested
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if os.path.isfile(path_result) and not recompute:
            continue

        # flip and save (the affine is used to resolve anatomical directions)
        im, aff, h = utils.load_volume(path_image, im_only=False)
        im = flip_volume(im, axis=axis, direction=direction, aff=aff)
        utils.save_volume(im, aff, h, path_result)
def align_images_in_dir(image_dir, result_dir, aff_ref=None, path_ref=None, recompute=True):
    """Align all images in image_dir to a reference orientation (axes and directions).
    This reference orientation can be directly provided as an affine matrix, or can be specified by a reference volume.
    If neither are provided, the reference orientation is assumed to be an identity matrix.
    :param image_dir: path of directory with images to align
    :param result_dir: path of directory where aligned images will be writen
    :param aff_ref: (optional) reference affine matrix. Can be a numpy array, or the path to such array.
    :param path_ref: (optional) path of a volume to which all images will be aligned. Can also be the path to a folder
    with as many images as in image_dir, in which case each image in image_dir is aligned to its counterpart in path_ref
    (they are matched by sorting order).
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    utils.mkdir(result_dir)
    path_images = utils.list_images_in_folder(image_dir)

    # resolve the reference orientation: a single reference volume, one reference per image,
    # a fixed affine matrix, or identity by default
    path_refs = [None] * len(path_images)
    if path_ref is not None:
        assert aff_ref is None, 'cannot provide aff_ref and path_ref together.'
        basename = os.path.basename(path_ref)
        if ('.nii.gz' in basename) | ('.nii' in basename) | ('.mgz' in basename) | ('.npz' in basename):
            _, aff_ref, _ = utils.load_volume(path_ref, im_only=False)
        else:
            path_refs = utils.list_images_in_folder(path_ref)
    elif aff_ref is not None:
        aff_ref = utils.load_array_if_path(aff_ref)
    else:
        aff_ref = np.eye(4)

    # loop over images
    loop_info = utils.LoopInfo(len(path_images), 10, 'aligning', True)
    for idx, (path_image, path_image_ref) in enumerate(zip(path_images, path_refs)):
        loop_info.update(idx)

        # skip if the result already exists and recompute was not requested
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if os.path.isfile(path_result) and not recompute:
            continue

        # align image (per-image reference overrides the global one when provided)
        im, aff, h = utils.load_volume(path_image, im_only=False)
        if path_image_ref is not None:
            _, aff_ref, _ = utils.load_volume(path_image_ref, im_only=False)
        im, aff = align_volume_to_ref(im, aff, aff_ref=aff_ref, return_aff=True)
        utils.save_volume(im, aff, h, path_result)
def correct_nans_images_in_dir(image_dir, result_dir, recompute=True):
    """Replace all NaN values by zero in every image of a directory.
    :param image_dir: path of directory with images to correct
    :param result_dir: path of directory where corrected images will be writen
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    utils.mkdir(result_dir)

    # correct each image of the folder in turn
    path_images = utils.list_images_in_folder(image_dir)
    loop_info = utils.LoopInfo(len(path_images), 10, 'correcting', True)
    for idx, path_image in enumerate(path_images):
        loop_info.update(idx)

        # skip if the result already exists and recompute was not requested
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if os.path.isfile(path_result) and not recompute:
            continue

        # zero out NaNs and save
        im, aff, h = utils.load_volume(path_image, im_only=False)
        im[np.isnan(im)] = 0
        utils.save_volume(im, aff, h, path_result)
def blur_images_in_dir(image_dir, result_dir, sigma, mask_dir=None, gpu=False, recompute=True):
    """This function blurs all the images in image_dir with kernels of the specified std deviations.
    :param image_dir: path of directory with images to blur
    :param result_dir: path of directory where blurred images will be writen
    :param sigma: standard deviation of the blurring gaussian kernels.
    Can be a number (isotropic blurring), or a sequence with the same length as the number of dimensions of images.
    :param mask_dir: (optional) path of directory with masks of the region to blur.
    Images and masks are matched by sorting order.
    :param gpu: (optional) whether to use a fast gpu model for blurring
    :param recompute: (optional) whether to recompute result files even if they already exists
    """
    # create result dir
    utils.mkdir(result_dir)

    # list images and masks (matched by sorting order)
    path_images = utils.list_images_in_folder(image_dir)
    if mask_dir is not None:
        path_masks = utils.list_images_in_folder(mask_dir)
    else:
        path_masks = [None] * len(path_images)

    # loop over images; the gpu blurring model is cached and only rebuilt when the image shape changes
    previous_model_input_shape = None
    model = None
    loop_info = utils.LoopInfo(len(path_images), 10, 'blurring', True)
    for idx, (path_image, path_mask) in enumerate(zip(path_images, path_masks)):
        loop_info.update(idx)

        # load image
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_result)) | recompute:
            im, im_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_image, return_volume=True)
            if path_mask is not None:
                mask = utils.load_volume(path_mask)
                assert mask.shape == im.shape, 'mask and image should have the same shape'
            else:
                mask = None

            # blur image
            if gpu:
                # rebuild the model only if the input shape changed since the previous image
                if (im_shape != previous_model_input_shape) | (model is None):
                    previous_model_input_shape = im_shape
                    inputs = [KL.Input(shape=im_shape + [1])]
                    # NOTE(review): sigma is reassigned here, so on a later rebuild the already-reformatted
                    # list is reformatted again — harmless when all images share the same dimensionality,
                    # but worth confirming for folders mixing 2d and 3d images.
                    sigma = utils.reformat_to_list(sigma, length=n_dims)
                    if mask is None:
                        image = GaussianBlur(sigma=sigma)(inputs[0])
                    else:
                        # a second input carries the mask used for edge-effect correction
                        inputs.append(KL.Input(shape=im_shape + [1], dtype='float32'))
                        image = GaussianBlur(sigma=sigma, use_mask=True)(inputs)
                    model = Model(inputs=inputs, outputs=image)
                if mask is None:
                    im = np.squeeze(model.predict(utils.add_axis(im, axis=[0, -1])))
                else:
                    im = np.squeeze(model.predict([utils.add_axis(im, [0, -1]), utils.add_axis(mask, [0, -1])]))
            else:
                im = blur_volume(im, sigma, mask=mask)
            utils.save_volume(im, aff, h, path_result)
def create_mutlimodal_images(list_channel_dir, result_dir, recompute=True):
    """Form multimodal images by stacking channels located in different folders.
    (The misspelt function name is kept as-is for backward compatibility with existing callers.)
    :param list_channel_dir: list of all directories, each containing the same channel for all images.
    Channels are matched between folders by sorting order.
    :param result_dir: path of directory where multimodal images will be writen
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    utils.mkdir(result_dir)
    assert isinstance(list_channel_dir, (list, tuple)), 'list_channel_dir should be a list or a tuple'

    # gather the image paths of every channel, and check that all folders have the same number of files
    list_channel_paths = [utils.list_images_in_folder(d) for d in list_channel_dir]
    n_images = len(list_channel_paths[0])
    n_channels = len(list_channel_dir)
    for channel_paths in list_channel_paths:
        if len(channel_paths) != n_images:
            raise ValueError('all directories should have the same number of files')

    # loop over images
    loop_info = utils.LoopInfo(n_images, 10, 'processing', True)
    for idx in range(n_images):
        loop_info.update(idx)

        # skip if the result already exists and recompute was not requested
        path_result = os.path.join(result_dir, os.path.basename(list_channel_paths[0][idx]))
        if os.path.isfile(path_result) and not recompute:
            continue

        # load every channel and stack them along a new trailing axis
        list_channels = list()
        tmp_aff = None
        tmp_h = None
        for channel_idx in range(n_channels):
            tmp_channel, tmp_aff, tmp_h = utils.load_volume(list_channel_paths[channel_idx][idx], im_only=False)
            list_channels.append(tmp_channel)
        im = np.stack(list_channels, axis=-1)
        utils.save_volume(im, tmp_aff, tmp_h, path_result)
def convert_images_in_dir_to_nifty(image_dir, result_dir, aff=None, ref_aff_dir=None, recompute=True):
    """Converts all images in image_dir to nifty format.
    :param image_dir: path of directory with images to convert
    :param result_dir: path of directory where converted images will be written
    :param aff: (optional) affine matrix in homogeneous coordinates with which to write the images.
    Can also be 'FS' to write images with FreeSurfer typical affine matrix.
    :param ref_aff_dir: (optional) alternatively to providing a fixed aff, different affine matrices can be used for
    each image in image_dir by matching them to corresponding volumes contained in ref_aff_dir.
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(result_dir)

    # list images, with one reference per image when ref_aff_dir is given (None placeholders otherwise)
    path_images = utils.list_images_in_folder(image_dir)
    if ref_aff_dir is None:
        path_ref_images = [None] * len(path_images)
    else:
        path_ref_images = utils.list_images_in_folder(ref_aff_dir)

    # loop over images
    loop_info = utils.LoopInfo(len(path_images), 10, 'converting', True)
    for idx, (path_image, path_ref) in enumerate(zip(path_images, path_ref_images)):
        loop_info.update(idx)

        # result is always written as compressed nifty
        path_result = os.path.join(result_dir, os.path.basename(utils.strip_extension(path_image))) + '.nii.gz'
        if (not os.path.isfile(path_result)) or recompute:
            if utils.get_image_extension(path_image) == 'nii.gz':
                # already in the target format: plain copy (preserves file metadata)
                shutil.copy2(path_image, path_result)
            else:
                im, im_aff, h = utils.load_volume(path_image, im_only=False)
                # a fixed aff takes precedence over a per-image reference affine
                if aff is not None:
                    im_aff = aff
                elif path_ref is not None:
                    _, im_aff, h = utils.load_volume(path_ref, im_only=False)
                utils.save_volume(im, im_aff, h, path_result)
def mri_convert_images_in_dir(image_dir,
                              result_dir,
                              interpolation=None,
                              reference_dir=None,
                              same_reference=False,
                              voxsize=None,
                              path_freesurfer='/usr/local/freesurfer',
                              mri_convert_path='/usr/local/freesurfer/bin/mri_convert',
                              recompute=True):
    """This function launches mri_convert on all images contained in image_dir, and writes the results in result_dir.
    The interpolation type can be specified (i.e. 'nearest'), as well as a folder containing references for resampling.
    reference_dir can be the path of a single *image* if same_reference=True.
    :param image_dir: path of directory with images to convert
    :param result_dir: path of directory where converted images will be written
    :param interpolation: (optional) interpolation type, can be 'inter' (default), 'cubic', 'nearest', 'trilinear'
    :param reference_dir: (optional) path of directory with reference images. If same_reference is False, references
    and images are matched by sorting order. This can also be the path to a single image that will be used as
    reference for all images in image_dir (set same_reference to True in that case).
    :param same_reference: (optional) whether to use a single image as reference for all images to interpolate.
    :param voxsize: (optional) resolution at which to resample converted image. Must be a list of length n_dims.
    :param path_freesurfer: (optional) path of FreeSurfer home
    :param mri_convert_path: (optional) path of the mri_convert binary file
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(result_dir)
    # set up FreeSurfer environment, required for mri_convert to run
    os.environ['FREESURFER_HOME'] = path_freesurfer
    os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
    mri_convert = mri_convert_path + ' '
    # list images, and build the list of references (one entry per image, None when no reference is used)
    path_images = utils.list_images_in_folder(image_dir)
    if reference_dir is not None:
        if same_reference:
            # reference_dir is here the path of a single reference image, reused for every input
            path_references = [reference_dir] * len(path_images)
        else:
            path_references = utils.list_images_in_folder(reference_dir)
            assert len(path_references) == len(path_images), 'different number of files in image_dir and reference_dir'
    else:
        path_references = [None] * len(path_images)
    # loop over images
    loop_info = utils.LoopInfo(len(path_images), 10, 'converting', True)
    for idx, (path_image, path_reference) in enumerate(zip(path_images, path_references)):
        loop_info.update(idx)
        # convert image; output data type is always float (-odt float)
        path_result = os.path.join(result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_result)) | recompute:
            cmd = mri_convert + path_image + ' ' + path_result + ' -odt float'
            if interpolation is not None:
                cmd += ' -rt ' + interpolation
            if reference_dir is not None:
                # -rl resamples the image onto the reference's grid
                cmd += ' -rl ' + path_reference
            if voxsize is not None:
                # voxel sizes are rounded to 3 decimals before being passed on the command line
                voxsize = utils.reformat_to_list(voxsize, dtype='float')
                cmd += ' --voxsize ' + ' '.join([str(np.around(v, 3)) for v in voxsize])
            os.system(cmd)
def samseg_images_in_dir(image_dir,
                         result_dir,
                         atlas_dir=None,
                         threads=4,
                         path_freesurfer='/usr/local/freesurfer',
                         keep_segm_only=True,
                         recompute=True):
    """This function launches samseg for all images contained in image_dir and writes the results in result_dir.
    If keep_segm_only=True, the result segmentation is copied in result_dir and SAMSEG's intermediate result dir is
    deleted.
    :param image_dir: path of directory with input images
    :param result_dir: path of directory where processed images folders (if keep_segm_only is False),
    or samseg segmentation (if keep_segm_only is True) will be written
    :param atlas_dir: (optional) path of samseg atlas directory. If None, use samseg default atlas.
    :param threads: (optional) number of threads to use
    :param path_freesurfer: (optional) path of FreeSurfer home
    :param keep_segm_only: (optional) whether to keep samseg result folders, or only samseg segmentations.
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(result_dir)

    # set up FreeSurfer environment so run_samseg can be called
    os.environ['FREESURFER_HOME'] = path_freesurfer
    os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
    path_samseg = os.path.join(path_freesurfer, 'bin', 'run_samseg')

    # loop over images
    path_images = utils.list_images_in_folder(image_dir)
    loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
    for idx, path_image in enumerate(path_images):
        loop_info.update(idx)

        # samseg writes its outputs in a per-image subfolder; the final result is either that folder's
        # seg.mgz, or a renamed copy directly in result_dir when keep_segm_only is set
        basename = utils.strip_extension(os.path.basename(path_image))
        path_im_result_dir = os.path.join(result_dir, basename)
        path_samseg_result = os.path.join(path_im_result_dir, 'seg.mgz')
        path_result = os.path.join(result_dir, basename + '_seg.mgz') if keep_segm_only else path_samseg_result

        # run samseg
        if (not os.path.isfile(path_result)) or recompute:
            cmd = utils.mkcmd(path_samseg, '-i', path_image, '-o', path_im_result_dir, '--threads', threads)
            if atlas_dir is not None:
                cmd = utils.mkcmd(cmd, '-a', atlas_dir)
            os.system(cmd)

        # keep only the segmentation: move it out and delete samseg's intermediate folder
        if keep_segm_only:
            if os.path.isfile(path_samseg_result):
                shutil.move(path_samseg_result, path_result)
            if os.path.isdir(path_im_result_dir):
                shutil.rmtree(path_im_result_dir)
def niftyreg_images_in_dir(image_dir,
                           reference_dir,
                           nifty_reg_function='reg_resample',
                           input_transformation_dir=None,
                           result_dir=None,
                           result_transformation_dir=None,
                           interpolation=None,
                           same_floating=False,
                           same_reference=False,
                           same_transformation=False,
                           path_nifty_reg='/home/benjamin/Softwares/niftyreg-gpu/build/reg-apps',
                           recompute=True):
    """This function launches one of niftyreg functions (reg_aladin, reg_f3d, reg_resample) on all images contained
    in image_dir.
    :param image_dir: path of directory with images to register. Can also be a single image, in that case set
    same_floating to True.
    :param reference_dir: path of directory with reference images. If same_reference is False, references and images
    are matched by sorting order. This can also be the path to a single image that will be used as reference for all
    images in image_dir (set same_reference to True in that case).
    :param nifty_reg_function: (optional) name of the niftyreg function to use. Can be 'reg_aladin', 'reg_f3d', or
    'reg_resample'. Default is 'reg_resample'.
    :param input_transformation_dir: (optional) path of a directory containing all the input transformations (for
    reg_resample, or reg_f3d). Can also be the path to a single transformation that will be used for all images
    in image_dir (set same_transformation to True in that case).
    :param result_dir: path of directory where output images will be written.
    :param result_transformation_dir: path of directory where resulting transformations will be written (for
    reg_aladin and reg_f3d).
    :param interpolation: (optional) integer describing the order of the interpolation to apply (0 = nearest neighbours)
    :param same_floating: (optional) set to True if only one image is used as floating image.
    :param same_reference: (optional) whether to use a single image as reference for all input images.
    :param same_transformation: (optional) whether to apply the same transformation to all floating images.
    :param path_nifty_reg: (optional) path of the folder containing nifty-reg binaries
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dirs
    if result_dir is not None:
        utils.mkdir(result_dir)
    if result_transformation_dir is not None:
        utils.mkdir(result_transformation_dir)
    nifty_reg = os.path.join(path_nifty_reg, nifty_reg_function)
    # list reference and floating images; duplicate single paths when a same_* flag is set
    path_images = utils.list_images_in_folder(image_dir)
    path_references = utils.list_images_in_folder(reference_dir)
    if same_reference:
        path_references = utils.reformat_to_list(path_references, length=len(path_images))
    if same_floating:
        path_images = utils.reformat_to_list(path_images, length=len(path_references))
    assert len(path_references) == len(path_images), 'different number of files in image_dir and reference_dir'
    # list input transformations (one per image, or None placeholders when none are given)
    if input_transformation_dir is not None:
        if same_transformation:
            path_input_transfs = utils.reformat_to_list(input_transformation_dir, length=len(path_images))
        else:
            path_input_transfs = utils.list_files(input_transformation_dir)
            assert len(path_input_transfs) == len(path_images), 'different number of transformations and images'
    else:
        path_input_transfs = [None] * len(path_images)
    # select the command-line flag used to pass the input transformation (flag name depends on the binary)
    if input_transformation_dir is not None:
        if nifty_reg_function == 'reg_aladin':
            flag_input_trans = '-inaff'
        elif nifty_reg_function == 'reg_f3d':
            flag_input_trans = '-aff'
        elif nifty_reg_function == 'reg_resample':
            flag_input_trans = '-trans'
        else:
            raise Exception('nifty_reg_function can only be "reg_aladin", "reg_f3d", or "reg_resample"')
    else:
        flag_input_trans = None
    # select the flag used to save the result transformation (affine for reg_aladin, control grid for reg_f3d)
    if result_transformation_dir is not None:
        if nifty_reg_function == 'reg_aladin':
            flag_result_trans = '-aff'
        elif nifty_reg_function == 'reg_f3d':
            flag_result_trans = '-cpp'
        else:
            raise Exception('result_transformation_dir can only be used with "reg_aladin" or "reg_f3d"')
    else:
        flag_result_trans = None
    # loop over images
    loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
    for idx, (path_image, path_ref, path_input_trans) in enumerate(zip(path_images,
                                                                       path_references,
                                                                       path_input_transfs)):
        loop_info.update(idx)
        # define path of registered image, named after whichever input varies across the loop
        name = os.path.basename(path_ref) if same_floating else os.path.basename(path_image)
        if result_dir is not None:
            path_result = os.path.join(result_dir, name)
            result_already_computed = os.path.isfile(path_result)
        else:
            path_result = None
            result_already_computed = True
        # define path of resulting transformation (reg_aladin writes a text affine, hence the .txt extension)
        if result_transformation_dir is not None:
            if nifty_reg_function == 'reg_aladin':
                path_result_trans = os.path.join(result_transformation_dir, utils.strip_extension(name) + '.txt')
                result_trans_already_computed = os.path.isfile(path_result_trans)
            else:
                path_result_trans = os.path.join(result_transformation_dir, name)
                result_trans_already_computed = os.path.isfile(path_result_trans)
        else:
            path_result_trans = None
            result_trans_already_computed = True
        # run only if any of the expected outputs is missing, or if recompute is requested
        if (not result_already_computed) | (not result_trans_already_computed) | recompute:
            # build main command (padding value fixed to 0)
            cmd = utils.mkcmd(nifty_reg, '-ref', path_ref, '-flo', path_image, '-pad 0')
            # add options
            if path_result is not None:
                cmd = utils.mkcmd(cmd, '-res', path_result)
            if flag_input_trans is not None:
                cmd = utils.mkcmd(cmd, flag_input_trans, path_input_trans)
            if flag_result_trans is not None:
                cmd = utils.mkcmd(cmd, flag_result_trans, path_result_trans)
            if interpolation is not None:
                cmd = utils.mkcmd(cmd, '-inter', interpolation)
            # execute
            os.system(cmd)
def upsample_anisotropic_images(image_dir,
                                resample_image_result_dir,
                                resample_like_dir,
                                path_freesurfer='/usr/local/freesurfer/',
                                recompute=True):
    """This function takes as input a set of LR images and resamples them to HR with respect to reference images.
    It additionally writes, for each image, a distance map giving for every HR voxel its distance (in mm) to the
    nearest acquired LR voxel.
    :param image_dir: path of directory with input images (only uni-modal images supported)
    :param resample_image_result_dir: path of directory where resampled images will be written
    :param resample_like_dir: path of directory with reference images. References are matched to images by
    sorting order.
    :param path_freesurfer: (optional) path of FreeSurfer home, as this function uses mri_convert
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(resample_image_result_dir)
    # set up FreeSurfer environment, required for mri_convert to run
    os.environ['FREESURFER_HOME'] = path_freesurfer
    os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
    mri_convert = os.path.join(path_freesurfer, 'bin/mri_convert')
    # list images and their references (matched by sorting order)
    path_images = utils.list_images_in_folder(image_dir)
    path_ref_images = utils.list_images_in_folder(resample_like_dir)
    assert len(path_images) == len(path_ref_images), \
        'the folders containing the images and their references are not the same size'
    # loop over images
    loop_info = utils.LoopInfo(len(path_images), 10, 'upsampling', True)
    for idx, (path_image, path_ref) in enumerate(zip(path_images, path_ref_images)):
        loop_info.update(idx)
        # upsample image onto the reference grid with mri_convert (float output)
        _, _, n_dims, _, _, image_res = utils.get_volume_info(path_image, return_volume=False)
        path_im_upsampled = os.path.join(resample_image_result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_im_upsampled)) | recompute:
            cmd = utils.mkcmd(mri_convert, path_image, path_im_upsampled, '-rl', path_ref, '-odt float')
            os.system(cmd)
        # build a distance map: resample the LR voxel coordinates to HR, then measure for each HR voxel its
        # distance to the nearest LR grid point
        path_dist_map = os.path.join(resample_image_result_dir, 'dist_map_' + os.path.basename(path_image))
        if (not os.path.isfile(path_dist_map)) | recompute:
            im, aff, h = utils.load_volume(path_image, im_only=False)
            # one integer coordinate map per axis, at LR resolution
            dist_map = np.meshgrid(*[np.arange(s) for s in im.shape], indexing='ij')
            tmp_dir = utils.strip_extension(path_im_upsampled) + '_meshes'
            utils.mkdir(tmp_dir)
            path_meshes_up = list()
            for (i, maps) in enumerate(dist_map):
                # save each coordinate map and resample it onto the HR grid of the upsampled image
                path_mesh = os.path.join(tmp_dir, '%s_' % i + os.path.basename(path_image))
                path_mesh_up = os.path.join(tmp_dir, 'up_%s_' % i + os.path.basename(path_image))
                utils.save_volume(maps, aff, h, path_mesh)
                cmd = utils.mkcmd(mri_convert, path_mesh, path_mesh_up, '-rl', path_im_upsampled, '-odt float')
                os.system(cmd)
                path_meshes_up.append(path_mesh_up)
            # stack the upsampled coordinate maps into a single (..., n_dims) array and clean up temp files
            mesh_up_0, aff, h = utils.load_volume(path_meshes_up[0], im_only=False)
            mesh_up = np.stack([mesh_up_0] + [utils.load_volume(p) for p in path_meshes_up[1:]], -1)
            shutil.rmtree(tmp_dir)
            # per-axis distance to the nearest integer LR coordinate, scaled by the LR resolution, then L2-combined
            floor = np.floor(mesh_up)
            ceil = np.ceil(mesh_up)
            f_dist = mesh_up - floor
            c_dist = ceil - mesh_up
            dist = np.minimum(f_dist, c_dist) * utils.add_axis(image_res, axis=[0] * n_dims)
            dist = np.sqrt(np.sum(dist ** 2, axis=-1))
            utils.save_volume(dist, aff, h, path_dist_map)
def simulate_upsampled_anisotropic_images(image_dir,
                                          downsample_image_result_dir,
                                          resample_image_result_dir,
                                          data_res,
                                          labels_dir=None,
                                          downsample_labels_result_dir=None,
                                          slice_thickness=None,
                                          build_dist_map=False,
                                          path_freesurfer='/usr/local/freesurfer/',
                                          gpu=True,
                                          recompute=True):
    """This function takes as input a set of HR images and creates two datasets with it:
    1) a set of LR images obtained by downsampling the HR images with nearest neighbour interpolation,
    2) a set of HR images obtained by resampling the LR images to native HR with linear interpolation.
    Additionally, this function can also create a set of LR labels from label maps corresponding to the input images.
    :param image_dir: path of directory with input images (only uni-modal images supported)
    :param downsample_image_result_dir: path of directory where downsampled images will be written
    :param resample_image_result_dir: path of directory where resampled images will be written
    :param data_res: resolution of LR images. Can either be: an int, a float, a list or a numpy array.
    :param labels_dir: (optional) path of directory with label maps corresponding to input images
    :param downsample_labels_result_dir: (optional) path of directory where downsampled label maps will be written
    :param slice_thickness: (optional) thickness of slices to simulate. Can be a number, a list or a numpy array.
    :param build_dist_map: (optional) whether to also write, for each resampled image, a map giving the distance of
    each voxel to the nearest acquired (LR) voxel. Default is False.
    :param path_freesurfer: (optional) path of FreeSurfer home, as this function uses mri_convert
    :param gpu: (optional) whether to use a fast gpu model for blurring
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dirs
    utils.mkdir(resample_image_result_dir)
    utils.mkdir(downsample_image_result_dir)
    if labels_dir is not None:
        assert downsample_labels_result_dir is not None, \
            'downsample_labels_result_dir should not be None if labels_dir is specified'
        utils.mkdir(downsample_labels_result_dir)
    # set up FreeSurfer environment, required for mri_convert to run
    os.environ['FREESURFER_HOME'] = path_freesurfer
    os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
    mri_convert = os.path.join(path_freesurfer, 'bin/mri_convert')
    # list images and labels (None placeholders when no labels are given)
    path_images = utils.list_images_in_folder(image_dir)
    path_labels = [None] * len(path_images) if labels_dir is None else utils.list_images_in_folder(labels_dir)
    # initialisation: dimensions/resolution are read from the first image, then target res/thickness are reformatted
    _, _, n_dims, _, _, image_res = utils.get_volume_info(path_images[0], return_volume=False, aff_ref=np.eye(4))
    data_res = np.squeeze(utils.reformat_to_n_channels_array(data_res, n_dims, n_channels=1))
    slice_thickness = utils.reformat_to_list(slice_thickness, length=n_dims)
    # loop over images; the gpu blurring model is rebuilt only when the input shape changes
    previous_model_input_shape = None
    model = None
    loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
    # NOTE(review): the loop variable path_labels shadows the list of the same name; this is safe because zip()
    # captures the list before the first iteration, but renaming the loop variable would be clearer.
    for idx, (path_image, path_labels) in enumerate(zip(path_images, path_labels)):
        loop_info.update(idx)
        # downsample image: first blur to simulate partial voluming, then resample to LR with mri_convert
        path_im_downsampled = os.path.join(downsample_image_result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_im_downsampled)) | recompute:
            im, _, aff, n_dims, _, h, image_res = utils.get_volume_info(path_image, return_volume=True)
            im, aff_aligned = align_volume_to_ref(im, aff, aff_ref=np.eye(4), return_aff=True, n_dims=n_dims)
            im_shape = list(im.shape[:n_dims])
            sigma = blurring_sigma_for_downsampling(image_res, data_res, thickness=slice_thickness)
            # no blurring along axes where the target resolution equals the native resolution
            sigma = [0 if data_res[i] == image_res[i] else sigma[i] for i in range(n_dims)]
            # blur image (gpu model is cached across images of identical shape)
            if gpu:
                if (im_shape != previous_model_input_shape) | (model is None):
                    previous_model_input_shape = im_shape
                    image_in = KL.Input(shape=im_shape + [1])
                    image = GaussianBlur(sigma=sigma)(image_in)
                    model = Model(inputs=image_in, outputs=image)
                im = np.squeeze(model.predict(utils.add_axis(im, axis=[0, -1])))
            else:
                im = blur_volume(im, sigma, mask=None)
            utils.save_volume(im, aff_aligned, h, path_im_downsampled)
            # downsample the blurred image to the LR grid (nearest neighbour, overwrites in place)
            voxsize = ' '.join([str(r) for r in data_res])
            cmd = utils.mkcmd(mri_convert, path_im_downsampled, path_im_downsampled, '--voxsize', voxsize,
                              '-odt float -rt nearest')
            os.system(cmd)
        # downsample labels if necessary, aligned to the downsampled image grid (nearest neighbour)
        if path_labels is not None:
            path_lab_downsampled = os.path.join(downsample_labels_result_dir, os.path.basename(path_labels))
            if (not os.path.isfile(path_lab_downsampled)) | recompute:
                cmd = utils.mkcmd(mri_convert, path_labels, path_lab_downsampled, '-rl', path_im_downsampled,
                                  '-odt float -rt nearest')
                os.system(cmd)
        # upsample image back to the native HR grid (mri_convert default interpolation)
        path_im_upsampled = os.path.join(resample_image_result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_im_upsampled)) | recompute:
            cmd = utils.mkcmd(mri_convert, path_im_downsampled, path_im_upsampled, '-rl', path_image, '-odt float')
            os.system(cmd)
        if build_dist_map:
            # build a distance map: resample the LR voxel coordinates to HR, then measure for each HR voxel
            # its distance to the nearest LR grid point
            path_dist_map = os.path.join(resample_image_result_dir, 'dist_map_' + os.path.basename(path_image))
            if (not os.path.isfile(path_dist_map)) | recompute:
                im, aff, h = utils.load_volume(path_im_downsampled, im_only=False)
                # one integer coordinate map per axis, at LR resolution
                dist_map = np.meshgrid(*[np.arange(s) for s in im.shape], indexing='ij')
                tmp_dir = utils.strip_extension(path_im_downsampled) + '_meshes'
                utils.mkdir(tmp_dir)
                path_meshes_up = list()
                for (i, d_map) in enumerate(dist_map):
                    # save each coordinate map and resample it onto the native HR grid
                    path_mesh = os.path.join(tmp_dir, '%s_' % i + os.path.basename(path_image))
                    path_mesh_up = os.path.join(tmp_dir, 'up_%s_' % i + os.path.basename(path_image))
                    utils.save_volume(d_map, aff, h, path_mesh)
                    cmd = utils.mkcmd(mri_convert, path_mesh, path_mesh_up, '-rl', path_image, '-odt float')
                    os.system(cmd)
                    path_meshes_up.append(path_mesh_up)
                # stack the upsampled coordinate maps into a single (..., n_dims) array and clean up temp files
                mesh_up_0, aff, h = utils.load_volume(path_meshes_up[0], im_only=False)
                mesh_up = np.stack([mesh_up_0] + [utils.load_volume(p) for p in path_meshes_up[1:]], -1)
                shutil.rmtree(tmp_dir)
                # per-axis distance to the nearest integer LR coordinate, scaled by the LR resolution, L2-combined
                floor = np.floor(mesh_up)
                ceil = np.ceil(mesh_up)
                f_dist = mesh_up - floor
                c_dist = ceil - mesh_up
                dist = np.minimum(f_dist, c_dist) * utils.add_axis(data_res, axis=[0] * n_dims)
                dist = np.sqrt(np.sum(dist ** 2, axis=-1))
                utils.save_volume(dist, aff, h, path_dist_map)
def check_images_in_dir(image_dir, check_values=False, keep_unique=True, max_channels=10, verbose=True):
    """Check if all volumes within the same folder share the same characteristics: shape, affine matrix, resolution.
    Also have option to check if all volumes have the same intensity values (useful for label maps).
    :return four lists, each containing the different values detected for a specific parameter among those to check."""
    # one container per checked property; unique values only, unless keep_unique is disabled
    list_shape, list_aff, list_res, list_axes = [], [], [], []
    list_unique_values = [] if check_values else None

    # loop through files
    path_images = utils.list_images_in_folder(image_dir)
    loop_info = utils.LoopInfo(len(path_images), 10, 'checking', verbose) if verbose else None
    for idx, path_image in enumerate(path_images):
        if loop_info is not None:
            loop_info.update(idx)

        # get info for the current volume
        im, shape, aff, n_dims, _, h, res = utils.get_volume_info(path_image, True, np.eye(4), max_channels)

        # reorder affine columns to RAS, then round aff/res to 2 decimals for robust comparison
        axes = get_ras_axes(aff, n_dims=n_dims).tolist()
        aff[:, np.arange(n_dims)] = aff[:, axes]
        aff = (np.int32(np.round(np.array(aff[:3, :3]), 2) * 100) / 100).tolist()
        res = (np.int32(np.round(np.array(res), 2) * 100) / 100).tolist()

        # record each property, skipping duplicates when keep_unique is set
        for value, container in zip([shape, aff, res, axes], [list_shape, list_aff, list_res, list_axes]):
            if (value not in container) or (not keep_unique):
                container.append(value)
        if list_unique_values is not None:
            uni = np.unique(im).tolist()
            if (uni not in list_unique_values) or (not keep_unique):
                list_unique_values.append(uni)

    return list_shape, list_aff, list_res, list_axes, list_unique_values
# ----------------------------------------------- edit label maps in dir -----------------------------------------------
def correct_labels_in_dir(labels_dir, results_dir, incorrect_labels, correct_labels=None,
                          use_nearest_label=False, remove_zero=False, smooth=False, recompute=True):
    """This function corrects label values for all label maps in a folder with either
    - a list of given values,
    - or with the nearest label value.
    :param labels_dir: path of directory with input label maps
    :param results_dir: path of directory where corrected label maps will be written
    :param incorrect_labels: list of all label values to correct (e.g. [1, 2, 3, 4]).
    :param correct_labels: (optional) list of correct label values to replace the incorrect ones.
    Correct values must have the same order as their corresponding value in incorrect_labels.
    When several correct values are possible for the same incorrect value, the nearest correct value will be selected
    at each voxel to correct. In that case, the different correct values must be specified inside a list within
    correct_labels (e.g. [10, 20, 30, [40, 50]]).
    :param use_nearest_label: (optional) whether to correct the incorrect label values with the nearest labels.
    :param remove_zero: (optional) if use_nearest_label is True, set to True not to consider zero among the potential
    candidates for the nearest neighbour.
    :param smooth: (optional) whether to smooth the corrected label maps
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(results_dir)

    # loop over label maps
    path_labels = utils.list_images_in_folder(labels_dir)
    loop_info = utils.LoopInfo(len(path_labels), 10, 'correcting', True)
    for idx, path_label in enumerate(path_labels):
        loop_info.update(idx)

        # load, correct, and save each label map under the same file name
        path_result = os.path.join(results_dir, os.path.basename(path_label))
        if (not os.path.isfile(path_result)) or recompute:
            lab, aff, h = utils.load_volume(path_label, im_only=False, dtype='int32')
            lab = correct_label_map(lab, incorrect_labels, correct_labels, use_nearest_label, remove_zero, smooth)
            utils.save_volume(lab, aff, h, path_result)
def mask_labels_in_dir(labels_dir, result_dir, values_to_keep, masking_value=0, mask_result_dir=None, recompute=True):
    """This function masks all label maps in a folder by keeping a set of given label values.
    :param labels_dir: path of directory with input label maps
    :param result_dir: path of directory where masked label maps will be written
    :param values_to_keep: list of values for masking the label maps.
    :param masking_value: (optional) value to mask the label maps with
    :param mask_result_dir: (optional) path of directory where applied masks will be written
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dirs
    utils.mkdir(result_dir)
    if mask_result_dir is not None:
        utils.mkdir(mask_result_dir)

    # reformat values to keep
    values_to_keep = utils.reformat_to_list(values_to_keep, load_as_numpy=True)

    # loop over label maps
    path_labels = utils.list_images_in_folder(labels_dir)
    loop_info = utils.LoopInfo(len(path_labels), 10, 'masking', True)
    for idx, path_label in enumerate(path_labels):
        loop_info.update(idx)

        # output paths (mask path is only meaningful when mask_result_dir is given)
        basename = os.path.basename(path_label)
        path_result = os.path.join(result_dir, basename)
        path_result_mask = os.path.join(mask_result_dir, basename) if mask_result_dir is not None else ''

        # recompute when either expected output is missing, or when explicitly requested
        needs_result = not os.path.isfile(path_result)
        needs_mask = (mask_result_dir is not None) and (not os.path.isfile(path_result_mask))
        if needs_result or needs_mask or recompute:
            lab, aff, h = utils.load_volume(path_label, im_only=False)
            if mask_result_dir is not None:
                labels, mask = mask_label_map(lab, values_to_keep, masking_value, return_mask=True)
                utils.save_volume(mask, aff, h, path_result_mask)
            else:
                labels = mask_label_map(lab, values_to_keep, masking_value, return_mask=False)
            utils.save_volume(labels, aff, h, path_result)
def smooth_labels_in_dir(labels_dir, result_dir, gpu=False, labels_list=None, connectivity=1, recompute=True):
    """Smooth all label maps in a folder by replacing each voxel by the value of its most numerous neighbours.
    :param labels_dir: path of directory with input label maps
    :param result_dir: path of directory where smoothed label maps will be written
    :param gpu: (optional) whether to use a gpu implementation for faster processing
    :param labels_list: (optional) if gpu is True, path of numpy array with all label values.
    Automatically computed if not provided.
    :param connectivity: (optional) connectivity to use when smoothing the label maps
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(result_dir)
    # list label maps
    path_labels = utils.list_images_in_folder(labels_dir)
    if labels_list is not None:
        # load the provided label list, sorted in FreeSurfer order
        labels_list, _ = utils.get_list_labels(label_list=labels_list, FS_sort=True)
    if gpu:
        # initialisation: the smoothing model is rebuilt only when the label map shape changes
        previous_model_input_shape = None
        smoothing_model = None
        # loop over label maps
        loop_info = utils.LoopInfo(len(path_labels), 10, 'smoothing', True)
        for idx, path_label in enumerate(path_labels):
            loop_info.update(idx)
            # smooth label map
            path_result = os.path.join(result_dir, os.path.basename(path_label))
            if (not os.path.isfile(path_result)) | recompute:
                labels, label_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_label, return_volume=True)
                if label_shape != previous_model_input_shape:
                    previous_model_input_shape = label_shape
                    # NOTE(review): smoothing_gpu_model reads label_list.shape, so this branch appears to require
                    # labels_list to be provided when gpu=True — confirm with callers.
                    smoothing_model = smoothing_gpu_model(label_shape, labels_list, connectivity)
                unique_labels = np.unique(labels).astype('int32')
                if labels_list is None:
                    smoothed_labels = smoothing_model.predict(utils.add_axis(labels))
                else:
                    # labels absent from labels_list are masked out before smoothing, then restored afterwards
                    labels_to_keep = [lab for lab in unique_labels if lab not in labels_list]
                    new_labels, mask_new_labels = mask_label_map(labels, labels_to_keep, return_mask=True)
                    smoothed_labels = np.squeeze(smoothing_model.predict(utils.add_axis(labels)))
                    smoothed_labels = np.where(mask_new_labels, new_labels, smoothed_labels)
                # restore voxels that smoothing erased to background
                mask_new_zeros = (labels > 0) & (smoothed_labels == 0)
                smoothed_labels[mask_new_zeros] = labels[mask_new_zeros]
                utils.save_volume(smoothed_labels, aff, h, path_result, dtype='int32')
    else:
        # build the binary neighbourhood kernel once, from the dimension of the first label map
        _, _, n_dims, _, _, _ = utils.get_volume_info(path_labels[0])
        kernel = utils.build_binary_structure(connectivity, n_dims, shape=n_dims)
        # loop over label maps
        loop_info = utils.LoopInfo(len(path_labels), 10, 'smoothing', True)
        for idx, path in enumerate(path_labels):
            loop_info.update(idx)
            # smooth label map
            path_result = os.path.join(result_dir, os.path.basename(path))
            if (not os.path.isfile(path_result)) | recompute:
                volume, aff, h = utils.load_volume(path, im_only=False)
                new_volume = smooth_label_map(volume, kernel, labels_list)
                utils.save_volume(new_volume, aff, h, path_result, dtype='int32')
def smoothing_gpu_model(label_shape, label_list, connectivity=1):
    """This function builds a gpu model in keras with a tensorflow backend to smooth label maps.
    This model replaces each voxel of the input by the value of its most numerous neighbour.
    :param label_shape: shape of the label map
    :param label_list: numpy array of all label values to consider (must not be None, its shape is read below)
    :param connectivity: (optional) connectivity to use when smoothing the label maps
    :return: gpu smoothing model
    """
    # convert labels so values are in [0, ..., N-1] and use one hot encoding (one channel per label)
    n_labels = label_list.shape[0]
    labels_in = KL.Input(shape=label_shape, name='lab_input', dtype='int32')
    labels = ConvertLabels(label_list)(labels_in)
    labels = KL.Lambda(lambda x: tf.one_hot(tf.cast(x, dtype='int32'), depth=n_labels, axis=-1))(labels)
    # count neighbouring voxels of each label by convolving every one-hot channel with a binary kernel
    n_dims, _ = utils.get_dims(label_shape)
    k = utils.add_axis(utils.build_binary_structure(connectivity, n_dims, shape=n_dims), axis=[-1, -1])
    # this lambda ignores its (empty) input: it only wraps the constant kernel k as a tensor in the graph
    kernel = KL.Lambda(lambda x: tf.convert_to_tensor(k, dtype='float32'))([])
    split = KL.Lambda(lambda x: tf.split(x, [1] * n_labels, axis=-1))(labels)
    labels = KL.Lambda(lambda x: tf.nn.convolution(x[0], x[1], padding='SAME'))([split[0], kernel])
    for i in range(1, n_labels):
        # convolve each remaining channel separately and concatenate the neighbour counts back together
        tmp = KL.Lambda(lambda x: tf.nn.convolution(x[0], x[1], padding='SAME'))([split[i], kernel])
        labels = KL.Lambda(lambda x: tf.concat([x[0], x[1]], -1))([labels, tmp])
    # take the argmax over neighbour counts and convert channel indices back to the original label values
    labels = KL.Lambda(lambda x: tf.math.argmax(x, -1))(labels)
    labels = ConvertLabels(np.arange(n_labels), label_list)(labels)
    return Model(inputs=labels_in, outputs=labels)
def erode_labels_in_dir(labels_dir, result_dir, labels_to_erode, erosion_factors=1., gpu=False, recompute=True):
    """Erode a given set of label values for all label maps in a folder.
    :param labels_dir: path of directory with input label maps
    :param result_dir: path of directory where eroded label maps will be writen
    :param labels_to_erode: list of label values to erode
    :param erosion_factors: (optional) list of erosion factors to use for each label value. If values are integers,
    normal erosion applies. If float, we first 1) blur a mask of the corresponding label value with a gpu model,
    and 2) use the erosion factor as a threshold in the blurred mask.
    If erosion_factors is a single value, the same factor will be applied to all labels.
    :param gpu: (optional) whether to use a fast gpu model for blurring (if erosion factors are floats)
    :param recompute: (optional) whether to recompute result files even if they already exists
    """

    # create result dir
    utils.mkdir(result_dir)

    # loop over label maps
    model = None  # blurring model is built once and reused across label maps
    path_labels = utils.list_images_in_folder(labels_dir)
    loop_info = utils.LoopInfo(len(path_labels), 5, 'eroding', True)
    for idx, path_label in enumerate(path_labels):
        loop_info.update(idx)

        # erode label map
        path_result = os.path.join(result_dir, os.path.basename(path_label))
        if (not os.path.isfile(path_result)) | recompute:
            # bug fix: only load the volume when the result actually needs recomputing
            # (previously it was loaded unconditionally, wasting I/O on skipped files)
            labels, aff, h = utils.load_volume(path_label, im_only=False)
            labels, model = erode_label_map(labels, labels_to_erode, erosion_factors, gpu, model, return_model=True)
            utils.save_volume(labels, aff, h, path_result)
def upsample_labels_in_dir(labels_dir,
                           target_res,
                           result_dir,
                           path_label_list=None,
                           path_freesurfer='/usr/local/freesurfer/',
                           recompute=True):
    """This function upsamples all label maps within a folder. Importantly, each label map is converted into probability
    maps for all label values, and all these maps are upsampled separately. The upsampled label maps are recovered by
    taking the argmax of the label values probability maps.
    :param labels_dir: path of directory with label maps to upsample
    :param target_res: resolution at which to upsample the label maps. can be a single number (isotropic), or a list.
    :param result_dir: path of directory where the upsampled label maps will be writen
    :param path_label_list: (optional) path of numpy array containing all label values.
    Computed automatically if not given.
    :param path_freesurfer: (optional) path freesurfer home (upsampling performed with mri_convert)
    :param recompute: (optional) whether to recompute result files even if they already exists
    """

    # prepare result dir
    utils.mkdir(result_dir)

    # set up FreeSurfer (mri_convert performs the actual upsampling)
    os.environ['FREESURFER_HOME'] = path_freesurfer
    os.system(os.path.join(path_freesurfer, 'SetUpFreeSurfer.sh'))
    mri_convert = os.path.join(path_freesurfer, 'bin/mri_convert')

    # list label maps
    path_labels = utils.list_images_in_folder(labels_dir)
    labels_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_labels[0], max_channels=3)

    # build command
    target_res = utils.reformat_to_list(target_res, length=n_dims)
    post_cmd = '-voxsize ' + ' '.join([str(r) for r in target_res]) + ' -odt float'

    # load label list and corresponding LUT to make sure that labels go from 0 to N-1
    label_list, _ = utils.get_list_labels(path_label_list, labels_dir=path_labels, FS_sort=False)
    new_label_list = np.arange(len(label_list), dtype='int32')
    lut = utils.get_mapping_lut(label_list)

    # loop over label maps
    loop_info = utils.LoopInfo(len(path_labels), 5, 'upsampling', True)
    for idx, path_label in enumerate(path_labels):
        loop_info.update(idx)
        path_result = os.path.join(result_dir, os.path.basename(path_label))
        if (not os.path.isfile(path_result)) | recompute:

            # load volume and remap its label values to [0, ..., N-1]
            labels, aff, h = utils.load_volume(path_label, im_only=False)
            labels = lut[labels.astype('int')]

            # create individual folders for label map
            basefilename = utils.strip_extension(os.path.basename(path_label))
            indiv_label_dir = os.path.join(result_dir, basefilename)
            upsample_indiv_label_dir = os.path.join(result_dir, basefilename + '_upsampled')
            utils.mkdir(indiv_label_dir)
            utils.mkdir(upsample_indiv_label_dir)

            # loop over label values: write a binary mask per label and upsample it with mri_convert
            for label in new_label_list:
                path_mask = os.path.join(indiv_label_dir, str(label) + '.nii.gz')
                path_mask_upsampled = os.path.join(upsample_indiv_label_dir, str(label) + '.nii.gz')
                if not os.path.isfile(path_mask):
                    mask = (labels == label) * 1.0
                    utils.save_volume(mask, aff, h, path_mask)
                if not os.path.isfile(path_mask_upsampled):
                    cmd = utils.mkcmd(mri_convert, path_mask, path_mask_upsampled, post_cmd)
                    os.system(cmd)

            # compute argmax of upsampled probability maps (upload them one at a time)
            probmax, aff, h = utils.load_volume(os.path.join(upsample_indiv_label_dir, '0.nii.gz'), im_only=False)
            labels = np.zeros(probmax.shape, dtype='int')
            for label in new_label_list:
                prob = utils.load_volume(os.path.join(upsample_indiv_label_dir, str(label) + '.nii.gz'))
                # bug fix (readability): this mask used to be named 'idx', shadowing the enumerate
                # index of the enclosing loop
                mask_higher = prob > probmax
                labels[mask_higher] = label
                probmax[mask_higher] = prob[mask_higher]
            utils.save_volume(label_list[labels], aff, h, path_result, dtype='int32')
def compute_hard_volumes_in_dir(labels_dir,
                                voxel_volume=None,
                                path_label_list=None,
                                skip_background=True,
                                path_numpy_result=None,
                                path_csv_result=None,
                                FS_sort=False):
    """Compute hard volumes of structures for all label maps in a folder.
    :param labels_dir: path of directory with input label maps
    :param voxel_volume: (optional) volume of the voxels. If None, it will be directly inferred from the file header
    of each subject. Set to 1 for a voxel count.
    :param path_label_list: (optional) list of labels to compute volumes for.
    Can be an int, a sequence, or a numpy array. If None, the volumes of all label values are computed for each subject.
    :param skip_background: (optional) whether to skip computing the volume of the background.
    If label_list is None, this assumes background value is 0.
    If label_list is not None, this assumes the background is the first value in label list.
    :param path_numpy_result: (optional) path where to write the result volumes as a numpy array.
    :param path_csv_result: (optional) path where to write the results as csv file.
    :param FS_sort: (optional) whether to sort the labels in FreeSurfer order.
    :return: numpy array with the volume of each structure for all subjects.
    Rows represent label values, and columns represent subjects.
    """

    # create result directories
    if path_numpy_result is not None:
        utils.mkdir(os.path.dirname(path_numpy_result))
    if path_csv_result is not None:
        utils.mkdir(os.path.dirname(path_csv_result))

    # load or compute labels list
    label_list, _ = utils.get_list_labels(path_label_list, labels_dir, FS_sort=FS_sort)

    # create csv volume file if necessary (the with-statement closes the file, no explicit close needed)
    if path_csv_result is not None:
        if skip_background:
            cvs_header = [['subject'] + [str(lab) for lab in label_list[1:]]]
        else:
            cvs_header = [['subject'] + [str(lab) for lab in label_list]]
        with open(path_csv_result, 'w') as csvFile:
            writer = csv.writer(csvFile)
            writer.writerows(cvs_header)

    # loop over label maps
    path_labels = utils.list_images_in_folder(labels_dir)
    if skip_background:
        volumes = np.zeros((label_list.shape[0] - 1, len(path_labels)))
    else:
        volumes = np.zeros((label_list.shape[0], len(path_labels)))
    loop_info = utils.LoopInfo(len(path_labels), 10, 'processing', True)
    for idx, path_label in enumerate(path_labels):
        loop_info.update(idx)

        # load segmentation, and compute unique labels
        labels, _, _, _, _, _, subject_res = utils.get_volume_info(path_label, return_volume=True)
        # bug fix: infer the voxel volume from each subject's own header — previously the value
        # computed from the first subject was assigned to voxel_volume itself and silently reused
        # for all following subjects, even when their resolutions differed
        if voxel_volume is None:
            subject_voxel_volume = float(np.prod(subject_res))
        else:
            subject_voxel_volume = voxel_volume
        subject_volumes = compute_hard_volumes(labels, subject_voxel_volume, label_list, skip_background)
        volumes[:, idx] = subject_volumes

        # write volumes
        if path_csv_result is not None:
            subject_volumes = np.around(volumes[:, idx], 3)
            row = [utils.strip_suffix(os.path.basename(path_label))] + [str(vol) for vol in subject_volumes]
            with open(path_csv_result, 'a') as csvFile:
                writer = csv.writer(csvFile)
                writer.writerow(row)

    # write numpy array if necessary
    if path_numpy_result is not None:
        np.save(path_numpy_result, volumes)

    return volumes
def build_atlas(labels_dir,
                label_list,
                align_centre_of_mass=False,
                margin=15,
                shape=None,
                path_atlas=None):
    """This function builds a binary atlas (defined by label values > 0) from several label maps.
    :param labels_dir: path of directory with input label maps
    :param label_list: list of all labels in the label maps. If there is more than 1 value here, the different channels
    of the atlas (each corresponding to the probability map of a given label) will in the same order as in this list.
    :param align_centre_of_mass: whether to build the atlas by aligning the center of mass of each label map.
    If False, the atlas has the same size as the input label maps, which are assumed to be aligned.
    :param margin: (optional) If align_centre_of_mass is True, margin by which to crop the input label maps around
    their center of mass. Therefore it controls the size of the output atlas: (2*margin + 1)**n_dims.
    :param shape: shape of the output atlas.
    :param path_atlas: (optional) path where the output atlas will be writen.
    Default is None, where the atlas is not saved.
    :return: the built atlas (normalised by the number of input label maps)."""

    # list of all label maps and create result dir
    path_labels = utils.list_images_in_folder(labels_dir)
    n_label_maps = len(path_labels)
    # bug fix: only create the result directory when a saving path is given
    # (os.path.dirname(None) raised a TypeError with the default path_atlas=None)
    if path_atlas is not None:
        utils.mkdir(os.path.dirname(path_atlas))

    # read list labels and create lut
    label_list = np.array(utils.reformat_to_list(label_list, load_as_numpy=True, dtype='int'))
    lut = utils.get_mapping_lut(label_list)
    n_labels = len(label_list)

    # create empty atlas
    im_shape, aff, n_dims, _, h, _ = utils.get_volume_info(path_labels[0], aff_ref=np.eye(4))
    if align_centre_of_mass:
        shape = [margin * 2] * n_dims
    else:
        shape = utils.reformat_to_list(shape, length=n_dims) if shape is not None else im_shape
    shape = shape + [n_labels] if n_labels > 1 else shape
    atlas = np.zeros(shape)

    # loop over label maps
    loop_info = utils.LoopInfo(n_label_maps, 10, 'processing', True)
    for idx, path_label in enumerate(path_labels):
        loop_info.update(idx)

        # load label map and build mask
        lab = utils.load_volume(path_label, dtype='int32', aff_ref=np.eye(4))
        # NOTE(review): hard-coded relabelling of values 31/63/72 — this looks dataset-specific,
        # confirm before reusing this function on other data
        lab = correct_label_map(lab, [31, 63, 72], [4, 43, 0])
        lab = lut[lab.astype('int')]
        lab = pad_volume(lab, shape[:n_dims])
        lab = crop_volume(lab, cropping_shape=shape[:n_dims])
        indices = np.where(lab > 0)
        if len(label_list) > 1:
            lab = np.identity(n_labels)[lab]  # one-hot encoding of the remapped labels

        # crop label map around centre of mass
        if align_centre_of_mass:
            centre_of_mass = np.array([np.mean(indices[0]), np.mean(indices[1]), np.mean(indices[2])], dtype='int32')
            min_crop = centre_of_mass - margin
            max_crop = centre_of_mass + margin
            atlas += lab[min_crop[0]:max_crop[0], min_crop[1]:max_crop[1], min_crop[2]:max_crop[2], ...]

        # otherwise just add the one-hot labels
        else:
            atlas += lab

    # normalise atlas and save it if necessary
    atlas /= n_label_maps
    atlas = align_volume_to_ref(atlas, np.eye(4), aff_ref=aff, n_dims=n_dims)
    if path_atlas is not None:
        utils.save_volume(atlas, aff, h, path_atlas)
    return atlas
# ---------------------------------------------------- edit dataset ----------------------------------------------------
def check_images_and_labels(image_dir, labels_dir, verbose=True):
    """Check if corresponding images and labels have the same affine matrices and shapes.
    Labels are matched to images by sorting order.
    :param image_dir: path of directory with input images
    :param labels_dir: path of directory with corresponding label maps
    :param verbose: whether to print out info
    """

    # gather the two sorted file lists and make sure they can be paired one-to-one
    path_images = utils.list_images_in_folder(image_dir)
    path_labels = utils.list_images_in_folder(labels_dir)
    assert len(path_images) == len(path_labels), 'different number of files in image_dir and labels_dir'

    # compare each image/label pair
    loop_info = utils.LoopInfo(len(path_images), 10, 'checking', verbose) if verbose else None
    for i, (path_image, path_label) in enumerate(zip(path_images, path_labels)):
        if loop_info is not None:
            loop_info.update(i)

        # load both volumes together with their affine matrices
        image, image_aff, _ = utils.load_volume(path_image, im_only=False)
        labels, labels_aff, _ = utils.load_volume(path_label, im_only=False)
        rounded_aff_image = np.round(image_aff, 2).tolist()
        rounded_aff_labels = np.round(labels_aff, 2).tolist()

        # report affine mismatches (affines compared after rounding to 2 decimals)
        if rounded_aff_labels != rounded_aff_image:
            print('aff mismatch :\n' + path_image)
            print(rounded_aff_image)
            print(path_label)
            print(rounded_aff_labels)
            print('')

        # report shape mismatches
        if labels.shape != image.shape:
            print('shape mismatch :\n' + path_image)
            print(image.shape)
            print('\n' + path_label)
            print(labels.shape)
            print('')
def crop_dataset_to_minimum_size(labels_dir, result_dir, image_dir=None, image_result_dir=None, margin=5):
    """Crop all label maps in a directory to the minimum possible common size, with a margin.
    This is achieved by cropping each label map individually to the minimum size, and by padding all the cropped maps to
    the same size (taken to be the maximum size of the cropped maps).
    If images are provided, they undergo the same transformations as their corresponding label maps.
    :param labels_dir: path of directory with input label maps
    :param result_dir: path of directory where cropped label maps will be writen
    :param image_dir: (optional) if not None, the cropping will be applied to all images in this directory
    :param image_result_dir: (optional) path of directory where cropped images will be writen
    :param margin: (optional) margin to apply around the label maps during cropping
    """

    # prepare output directories
    utils.mkdir(result_dir)
    if image_dir is not None:
        assert image_result_dir is not None, 'image_result_dir should not be None if image_dir is specified'
        utils.mkdir(image_result_dir)

    # pair each label map with its image (or with None when no images are given)
    path_labels = utils.list_images_in_folder(labels_dir)
    path_images = utils.list_images_in_folder(image_dir) if image_dir is not None else [None] * len(path_labels)
    _, _, n_dims, _, _, _ = utils.get_volume_info(path_labels[0])

    # first pass: crop every label map (and image) to its own minimal region,
    # while keeping track of the largest cropped shape encountered
    print('\ncropping labels to individual minimum size')
    max_shape = np.zeros(n_dims)
    loop_info = utils.LoopInfo(len(path_labels), 10, 'cropping', True)
    for i, (path_label, path_image) in enumerate(zip(path_labels, path_images)):
        loop_info.update(i)

        # crop label map and update the running maximum of cropped shapes
        lab, lab_aff, lab_h = utils.load_volume(path_label, im_only=False)
        lab, crop_idx, lab_aff = crop_volume_around_region(lab, aff=lab_aff)
        utils.save_volume(lab, lab_aff, lab_h, os.path.join(result_dir, os.path.basename(path_label)))
        max_shape = np.maximum(max_shape, np.array(lab.shape) + margin * 2)  # *2 to add margin on each side

        # apply the same cropping to the corresponding image if required
        if path_image is not None:
            im, im_aff, im_h = utils.load_volume(path_image, im_only=False)
            im, im_aff = crop_volume_with_idx(im, crop_idx, aff=im_aff)
            utils.save_volume(im, im_aff, im_h, os.path.join(image_result_dir, os.path.basename(path_image)))

    # second pass: pad all cropped volumes to the common maximum shape
    print('\npadding labels to same size')
    loop_info = utils.LoopInfo(len(path_labels), 10, 'padding', True)
    for i, (path_label, path_image) in enumerate(zip(path_labels, path_images)):
        loop_info.update(i)

        # pad the cropped label map in place
        path_result = os.path.join(result_dir, os.path.basename(path_label))
        lab, lab_aff, lab_h = utils.load_volume(path_result, im_only=False)
        lab, lab_aff = pad_volume(lab, max_shape, aff=lab_aff)
        utils.save_volume(lab, lab_aff, lab_h, path_result)

        # pad the cropped image in place if required
        if path_image is not None:
            path_result = os.path.join(image_result_dir, os.path.basename(path_image))
            im, im_aff, im_h = utils.load_volume(path_result, im_only=False)
            im, im_aff = pad_volume(im, max_shape, aff=im_aff)
            utils.save_volume(im, im_aff, im_h, path_result)
def crop_dataset_around_region_of_same_size(labels_dir,
                                            result_dir,
                                            image_dir=None,
                                            image_result_dir=None,
                                            margin=0,
                                            recompute=True):
    """Crop all label maps (and corresponding images, if provided) around their foreground to a common shape.
    The common shape is the maximum of the per-map bounding boxes of the largest connected component (plus margin),
    so no foreground voxel is ever cropped out. Volumes smaller than the target shape are zero-padded.
    Images are matched to label maps by sorting order.
    :param labels_dir: path of directory with input label maps
    :param result_dir: path of directory where cropped label maps will be writen
    :param image_dir: (optional) if not None, the same cropping is applied to all images in this directory
    :param image_result_dir: (optional) path of directory where cropped images will be writen
    :param margin: (optional) margin to add around the tightest bounding box (single number or list)
    :param recompute: (optional) whether to recompute result files even if they already exist
    """

    # create result dir
    utils.mkdir(result_dir)
    if image_dir is not None:
        assert image_result_dir is not None, 'image_result_dir should not be None if image_dir is specified'
        utils.mkdir(image_result_dir)

    # list labels and images
    path_labels = utils.list_images_in_folder(labels_dir)
    path_images = utils.list_images_in_folder(image_dir) if image_dir is not None else [None] * len(path_labels)
    _, _, n_dims, _, _, _ = utils.get_volume_info(path_labels[0])
    recompute_labels = any([not os.path.isfile(os.path.join(result_dir, os.path.basename(path)))
                            for path in path_labels])
    if (image_dir is not None) & (not recompute_labels):
        recompute_labels = any([not os.path.isfile(os.path.join(image_result_dir, os.path.basename(path)))
                                for path in path_images])

    # get minimum patch shape so that no labels are left out when doing the cropping later on
    max_crop_shape = np.zeros(n_dims)
    if recompute_labels:
        for path_label in path_labels:
            label, aff, _ = utils.load_volume(path_label, im_only=False)
            label = align_volume_to_ref(label, aff, aff_ref=np.eye(4))
            label = get_largest_connected_component(label > 0, structure=np.ones((3, 3, 3)))
            _, cropping = crop_volume_around_region(label)
            max_crop_shape = np.maximum(cropping[n_dims:] - cropping[:n_dims], max_crop_shape)
    max_crop_shape += np.array(utils.reformat_to_list(margin, length=n_dims, dtype='int'))
    print('max_crop_shape: ', max_crop_shape)

    # crop shapes (possibly with padding if images are smaller than crop shape)
    for path_label, path_image in zip(path_labels, path_images):
        path_label_result = os.path.join(result_dir, os.path.basename(path_label))
        # bug fix: guard against path_image being None (when image_dir is not given) —
        # os.path.basename(None) previously raised a TypeError here
        if path_image is not None:
            path_image_result = os.path.join(image_result_dir, os.path.basename(path_image))
            missing_result = (not os.path.isfile(path_image_result)) | (not os.path.isfile(path_label_result))
        else:
            path_image_result = None
            missing_result = not os.path.isfile(path_label_result)
        if missing_result | recompute:

            # load labels and align them to RAS
            label, aff, h_la = utils.load_volume(path_label, im_only=False, dtype='int32')
            label, aff_new = align_volume_to_ref(label, aff, aff_ref=np.eye(4), return_aff=True)
            vol_shape = np.array(label.shape[:n_dims])
            if path_image is not None:
                image, _, h_im = utils.load_volume(path_image, im_only=False)
                # NOTE(review): the image is aligned with the labels' affine, which assumes both
                # volumes share the same header — confirm against the data
                image = align_volume_to_ref(image, aff, aff_ref=np.eye(4))
            else:
                image = h_im = None

            # mask labels: keep only the largest connected component of the foreground
            mask = get_largest_connected_component(label > 0, structure=np.ones((3, 3, 3)))
            label[np.logical_not(mask)] = 0

            # find cropping indices (tightest bounding box, expanded by margin, clipped to the volume)
            indices = np.nonzero(mask)
            min_idx = np.maximum(np.array([np.min(idx) for idx in indices]) - margin, 0)
            max_idx = np.minimum(np.array([np.max(idx) for idx in indices]) + 1 + margin, vol_shape)

            # expand/retract (depending on the desired shape) the cropping region around the centre
            intermediate_vol_shape = max_idx - min_idx
            min_idx = min_idx - np.int32(np.ceil((max_crop_shape - intermediate_vol_shape) / 2))
            max_idx = max_idx + np.int32(np.floor((max_crop_shape - intermediate_vol_shape) / 2))

            # check if we need to pad the output to the desired shape
            min_padding = np.abs(np.minimum(min_idx, 0))
            max_padding = np.maximum(max_idx - vol_shape, 0)
            if np.any(min_padding > 0) | np.any(max_padding > 0):
                pad_margins = tuple([(min_padding[i], max_padding[i]) for i in range(n_dims)])
            else:
                pad_margins = None
            cropping = np.concatenate([np.maximum(min_idx, 0), np.minimum(max_idx, vol_shape)])

            # crop volume
            label = crop_volume_with_idx(label, cropping, n_dims=n_dims)
            if path_image is not None:
                image = crop_volume_with_idx(image, cropping, n_dims=n_dims)

            # pad volume if necessary
            if pad_margins is not None:
                label = np.pad(label, pad_margins, mode='constant', constant_values=0)
                if path_image is not None:
                    _, n_channels = utils.get_dims(image.shape)
                    pad_margins = tuple(list(pad_margins) + [(0, 0)]) if n_channels > 1 else pad_margins
                    image = np.pad(image, pad_margins, mode='constant', constant_values=0)

            # update aff: shift the origin by the (possibly negative) cropping offset
            if n_dims == 2:
                min_idx = np.append(min_idx, 0)
            aff_new[0:3, -1] = aff_new[0:3, -1] + aff_new[:3, :3] @ min_idx

            # write labels (realigned to the original orientation)
            label, aff_final = align_volume_to_ref(label, aff_new, aff_ref=aff, return_aff=True)
            utils.save_volume(label, aff_final, h_la, path_label_result, dtype='int32')
            if path_image is not None:
                image = align_volume_to_ref(image, aff_new, aff_ref=aff)
                utils.save_volume(image, aff_final, h_im, path_image_result)
def crop_dataset_around_region(image_dir, labels_dir, image_result_dir, labels_result_dir, margin=0,
                               cropping_shape_div_by=None, recompute=True):
    """Crop every image and its corresponding label map around the largest connected component of the
    label map foreground (values > 0), with an optional margin, and write the results to disk.
    Images and label maps are matched by sorting order.
    :param image_dir: path of directory with input images
    :param labels_dir: path of directory with corresponding label maps
    :param image_result_dir: path of directory where cropped images will be writen
    :param labels_result_dir: path of directory where cropped label maps will be writen
    :param margin: (optional) margin (in voxels) to add around the region before cropping
    :param cropping_shape_div_by: (optional) the cropping shape in each dimension is enlarged to the closest
    higher number divisible by this value; volumes are zero-padded if the enlarged region exceeds their bounds.
    NOTE(review): no handling of the default None is visible here — confirm that
    utils.find_closest_number_divisible_by_m accepts m=None, otherwise this parameter is effectively required.
    :param recompute: (optional) whether to recompute result files even if they already exist
    """
    # create result dir
    utils.mkdir(image_result_dir)
    utils.mkdir(labels_result_dir)

    # list volumes and masks
    path_images = utils.list_images_in_folder(image_dir)
    path_labels = utils.list_images_in_folder(labels_dir)
    _, _, n_dims, n_channels, _, _ = utils.get_volume_info(path_labels[0])

    # loop over images and labels
    loop_info = utils.LoopInfo(len(path_images), 10, 'cropping', True)
    for idx, (path_image, path_label) in enumerate(zip(path_images, path_labels)):
        loop_info.update(idx)
        path_label_result = os.path.join(labels_result_dir, os.path.basename(path_label))
        path_image_result = os.path.join(image_result_dir, os.path.basename(path_image))
        if (not os.path.isfile(path_label_result)) | (not os.path.isfile(path_image_result)) | recompute:
            image, aff, h_im = utils.load_volume(path_image, im_only=False)
            label, _, h_lab = utils.load_volume(path_label, im_only=False)
            # keep only the largest connected component of the foreground, zero out the rest
            mask = get_largest_connected_component(label > 0, structure=np.ones((3, 3, 3)))
            label[np.logical_not(mask)] = 0
            vol_shape = np.array(label.shape[:n_dims])

            # find cropping indices: tightest bounding box of the mask, expanded by margin, clipped to the volume
            indices = np.nonzero(mask)
            min_idx = np.maximum(np.array([np.min(idx) for idx in indices]) - margin, 0)
            max_idx = np.minimum(np.array([np.max(idx) for idx in indices]) + 1 + margin, vol_shape)

            # expand/retract (depending on the desired shape) the cropping region around the centre
            intermediate_vol_shape = max_idx - min_idx
            cropping_shape = np.array([utils.find_closest_number_divisible_by_m(s, cropping_shape_div_by,
                                                                                answer_type='higher')
                                       for s in intermediate_vol_shape])
            min_idx = min_idx - np.int32(np.ceil((cropping_shape - intermediate_vol_shape) / 2))
            max_idx = max_idx + np.int32(np.floor((cropping_shape - intermediate_vol_shape) / 2))

            # check if we need to pad the output to the desired shape
            min_padding = np.abs(np.minimum(min_idx, 0))
            max_padding = np.maximum(max_idx - vol_shape, 0)
            if np.any(min_padding > 0) | np.any(max_padding > 0):
                pad_margins = tuple([(min_padding[i], max_padding[i]) for i in range(n_dims)])
            else:
                pad_margins = None
            cropping = np.concatenate([np.maximum(min_idx, 0), np.minimum(max_idx, vol_shape)])

            # crop volume
            label = crop_volume_with_idx(label, cropping, n_dims=n_dims)
            image = crop_volume_with_idx(image, cropping, n_dims=n_dims)

            # pad volume if necessary
            if pad_margins is not None:
                label = np.pad(label, pad_margins, mode='constant', constant_values=0)
                # images may be multi-channel: pad the channel axis with zero margins
                pad_margins = tuple(list(pad_margins) + [(0, 0)]) if n_channels > 1 else pad_margins
                image = np.pad(image, pad_margins, mode='constant', constant_values=0)

            # update aff: shift the origin by the (possibly negative) cropping offset
            if n_dims == 2:
                min_idx = np.append(min_idx, 0)
            aff[0:3, -1] = aff[0:3, -1] + aff[:3, :3] @ min_idx

            # write results
            utils.save_volume(image, aff, h_im, path_image_result)
            utils.save_volume(label, aff, h_lab, path_label_result, dtype='int32')
def subdivide_dataset_to_patches(patch_shape,
                                 image_dir=None,
                                 image_result_dir=None,
                                 labels_dir=None,
                                 labels_result_dir=None,
                                 full_background=True,
                                 remove_after_dividing=False):
    """This function subdivides images and/or label maps into several smaller patches of specified shape.
    :param patch_shape: shape of patches to create. Can either be an int, a sequence, or a 1d numpy array.
    :param image_dir: (optional) path of directory with input images
    :param image_result_dir: (optional) path of directory where image patches will be writen
    :param labels_dir: (optional) path of directory with input label maps
    :param labels_result_dir: (optional) path of directory where label map patches will be writen
    :param full_background: (optional) whether to keep patches only labelled as background (only if label maps are
    provided).
    :param remove_after_dividing: (optional) whether to delete input images after having divided them in smaller
    patches. This enables to save disk space in the subdivision process.
    """

    # create result dir and list images and label maps
    assert (image_dir is not None) | (labels_dir is not None), \
        'at least one of image_dir or labels_dir should not be None.'
    if image_dir is not None:
        assert image_result_dir is not None, 'image_result_dir should not be None if image_dir is specified'
        utils.mkdir(image_result_dir)
        path_images = utils.list_images_in_folder(image_dir)
    else:
        path_images = None
    if labels_dir is not None:
        assert labels_result_dir is not None, 'labels_result_dir should not be None if labels_dir is specified'
        utils.mkdir(labels_result_dir)
        path_labels = utils.list_images_in_folder(labels_dir)
    else:
        path_labels = None
    if path_images is None:
        path_images = [None] * len(path_labels)
    if path_labels is None:
        path_labels = [None] * len(path_images)

    # reformat path_shape
    patch_shape = utils.reformat_to_list(patch_shape)
    n_dims, _ = utils.get_dims(patch_shape)

    # loop over images and labels
    loop_info = utils.LoopInfo(len(path_images), 10, 'processing', True)
    for idx, (path_image, path_label) in enumerate(zip(path_images, path_labels)):
        loop_info.update(idx)

        # load image and labels
        if path_image is not None:
            im, aff_im, h_im = utils.load_volume(path_image, im_only=False, squeeze=False)
        else:
            im = aff_im = h_im = None
        if path_label is not None:
            lab, aff_lab, h_lab = utils.load_volume(path_label, im_only=False, squeeze=True)
        else:
            lab = aff_lab = h_lab = None

        # get volume shape
        shape = im.shape if im is not None else lab.shape

        # crop image and label map to size divisible by patch_shape
        new_size = np.array([utils.find_closest_number_divisible_by_m(shape[i], patch_shape[i]) for i in range(n_dims)])
        crop = np.round((np.array(shape[:n_dims]) - new_size) / 2).astype('int')
        crop = np.concatenate((crop, crop + new_size), axis=0)
        if (im is not None) & (n_dims == 2):
            im = im[crop[0]:crop[2], crop[1]:crop[3], ...]
        elif (im is not None) & (n_dims == 3):
            im = im[crop[0]:crop[3], crop[1]:crop[4], crop[2]:crop[5], ...]
        if (lab is not None) & (n_dims == 2):
            lab = lab[crop[0]:crop[2], crop[1]:crop[3], ...]
        elif (lab is not None) & (n_dims == 3):
            lab = lab[crop[0]:crop[3], crop[1]:crop[4], crop[2]:crop[5], ...]

        # helper writing one (image, label) patch pair; returns the updated patch counter
        def write_patch(temp_im, temp_la, n_im):
            if temp_la is not None:
                # skip patches that only contain background, unless full_background is set
                if full_background | (not (temp_la == 0).all()):
                    n_im += 1
                    utils.save_volume(temp_la, aff_lab, h_lab, os.path.join(
                        labels_result_dir, os.path.basename(path_label.replace('.nii.gz', '_%d.nii.gz' % n_im))))
                    if temp_im is not None:
                        utils.save_volume(temp_im, aff_im, h_im, os.path.join(
                            image_result_dir, os.path.basename(path_image.replace('.nii.gz', '_%d.nii.gz' % n_im))))
            else:
                # bug fix: the counter was previously not incremented in the image-only case,
                # so every patch was written to the same '_0' file, overwriting the previous ones
                n_im += 1
                utils.save_volume(temp_im, aff_im, h_im, os.path.join(
                    image_result_dir, os.path.basename(path_image.replace('.nii.gz', '_%d.nii.gz' % n_im))))
            return n_im

        # loop over patches
        n_im = 0
        n_crop = (new_size / patch_shape).astype('int')
        for i in range(n_crop[0]):
            i *= patch_shape[0]
            for j in range(n_crop[1]):
                j *= patch_shape[1]
                if n_dims == 2:
                    temp_la = lab[i:i + patch_shape[0], j:j + patch_shape[1], ...] if lab is not None else None
                    temp_im = im[i:i + patch_shape[0], j:j + patch_shape[1], ...] if im is not None else None
                    n_im = write_patch(temp_im, temp_la, n_im)
                elif n_dims == 3:
                    for k in range(n_crop[2]):
                        k *= patch_shape[2]
                        temp_la = lab[i:i + patch_shape[0], j:j + patch_shape[1], k:k + patch_shape[2], ...] \
                            if lab is not None else None
                        temp_im = im[i:i + patch_shape[0], j:j + patch_shape[1], k:k + patch_shape[2], ...] \
                            if im is not None else None
                        n_im = write_patch(temp_im, temp_la, n_im)

        # optionally delete the source volumes to save disk space
        if remove_after_dividing:
            if path_image is not None:
                os.remove(path_image)
            if path_label is not None:
                os.remove(path_label)