"""The medical image object.
This module defines :class:`MedicalVolume`, which is a wrapper for nD volumes.
"""
import warnings
from copy import deepcopy
from mmap import mmap
from numbers import Number
from typing import Sequence, Tuple, Union
import nibabel as nib
import numpy as np
import pydicom
from nibabel.spatialimages import SpatialFirstSlicer as _SpatialFirstSlicerNib
from numpy.lib.mixins import NDArrayOperatorsMixin
from packaging import version
from dosma.core import orientation as stdo
from dosma.core.device import Device, cpu_device, get_array_module, get_device, to_device
from dosma.core.io.format_io import ImageDataFormat
from dosma.defaults import SCANNER_ORIGIN_DECIMAL_PRECISION
from dosma.utils import env
if env.sitk_available():
import SimpleITK as sitk
if env.cupy_available():
import cupy as cp
if env.package_available("h5py"):
import h5py
__all__ = ["MedicalVolume"]
# PyTorch version introducing complex tensor support.
_TORCH_COMPLEX_SUPPORT_VERSION = version.Version("1.5.0")
class MedicalVolume(NDArrayOperatorsMixin):
"""The class for medical images.
Medical volumes use ndarrays to represent medical data. However, unlike standard ndarrays,
these volumes have inherent spatial metadata, such as pixel/voxel spacing, global coordinates,
rotation information, all of which can be characterized by an affine matrix following the
RAS+ coordinate system. The code below creates a random 300x300x40 medical volume with
scanner origin ``(0, 0, 0)`` and voxel spacing of ``(1,1,1)``:
>>> mv = MedicalVolume(np.random.rand(300, 300, 40), np.eye(4))
Medical volumes can also store header information that accompanies pixel data
(e.g. DICOM headers). These headers are used to expose metadata, which can be fetched
and set using :meth:`get_metadata()` and :meth:`set_metadata()`, respectively. Headers are
also auto-aligned, which means that headers will be aligned with the slice(s) of data from
which they originated, which makes Python slicing feasible. Currently, medical volumes
support DICOM headers using ``pydicom`` when loaded with :class:`dosma.DicomReader`.
>>> mv.get_metadata("EchoTime") # Returns EchoTime
>>> mv.set_metadata("EchoTime", 10.0) # Sets EchoTime to 10.0
Standard math and boolean operations are supported with other ``MedicalVolume`` objects,
numpy arrays (following standard broadcasting), and scalars. Boolean operations are performed
elementwise, resulting in a volume with shape as ``self.volume.shape``.
If performing operations between ``MedicalVolume`` objects, both objects must have
the same shape and affine matrix (spacing, direction, and origin). Header information
is not deep copied when performing these operations to reduce computational and memory
overhead. The affine matrix (``self.affine``) is copied as it is lightweight and
often modified.
2D images are also supported when viewed trivial 3D volumes with shape ``(H, W, 1)``:
>>> mv = MedicalVolume(np.random.rand(10,20,1), np.eye(4))
Many operations are in-place and modify the instance directly (e.g. `reformat(inplace=True)`).
To allow chaining operations, operations that are in-place return ``self``.
>>> mv2 = mv.reformat(ornt, inplace=True)
>>> id(mv2) == id(mv)
True
Medical volumes can interface with the gpu using the :mod:`cupy` library.
Volumes can be moved between devices (see :class:`Device`) using the ``.to()`` method.
Only the volume data will be moved to the gpu. Headers and affine matrix will remain on
the cpu. The following code moves a MedicalVolume to gpu 0 and back to the cpu:
>>> from dosma import Device
>>> mv = MedicalVolume(np.random.rand((10,20,30)), np.eye(4))
>>> mv_gpu = mv.to(Device(0))
>>> mv_cpu = mv.cpu()
Note, moving data across devices results in a full copy. Above, ``mv_cpu.volume`` and
``mv.volume`` do not share memory. Saving volumes and converting to other images
(e.g. ``SimpleITK.Image``) are only supported for cpu volumes. Volumes can also only
be compared when on the same device. For example, both commands below will raise a
RuntimeError:
>>> mv_gpu == mv_cpu
>>> mv_gpu.is_identical(mv_cpu)
While CuPy requires the current device be set using ``cp.cuda.Device(X).use()`` or inside
the ``with`` context, ``MedicalVolume`` automatically sets the appropriate context
for performing operations. This means the CuPy current device need to be the same as the
``MedicalVolume`` object. For example, the following still works:
>>> cp.cuda.Device(0).use()
>>> mv_gpu = MedicalVolume(cp.ones((3,3,3)), np.eye(4))
>>> cp.cuda.Device(1).use()
>>> mv_gpu *= 2
MedicalVolumes also have a limited NumPy/CuPy-compatible interface.
Standard numpy/cupy functions that preserve array shapes can be performed
on MedicalVolume objects:
>>> log_arr = np.log(mv)
>>> type(log_arr)
<class 'dosma.io.MedicalVolume'>
>>> exp_arr_gpu = cp.exp(mv_gpu)
>>> type(exp_arr_gpu)
<class 'dosma.io.MedicalVolume'>
**ALPHA**: MedicalVolumes are also interoperable with popular image data structures
with zero-copy, meaning array data will not be copied. Formats currently include the
SimpleITK Image, Nibabel Nifti1Image, and PyTorch tensors:
>>> sitk_img = mv.to_sitk() # Convert to SimpleITK Image
>>> mv_from_sitk = MedicalVolume.from_sitk(sitk_img) # Convert back to MedicalVolume
>>> nib_img = mv.to_nib() # Convert to nibabel Nifti1Image
>>> mv_from_nib = MedicalVolume.from_nib(nib_img)
>>> torch_tensor = mv.to_torch() # Convert to torch tensor
>>> mv_from_tensor = MedicalVolume.from_torch(torch_tensor, affine)
**ALPHA**: MedicalVolumes can also be used with memmapped arrays.
This makes loading much faster and allows interaction with larger-than-memory
arrays. Only when the volume is modified will the volume be loaded
into memory and modified. If you take a slice of the memmaped array, the underlying
array will also remain memmapped:
>>> arr = np.load("/path/to/volume.npy", mmap_mode="r")
>>> mv = MedicalVolume(arr, np.eye(4))
>>> mv.is_mmap # returns True
We also preserve Nibabel's memmapping of certain file types (e.g. ``.nii``):
>>> nib_img = nibabel.load("path/to/volume.nii")
>>> mv = MedicalVolume.from_nib(nib_img, mmap=True)
Args:
volume (array-like): nD medical image.
affine (array-like): 4x4 array corresponding to affine matrix transform in RAS+ coordinates.
Must be on cpu (i.e. no ``cupy.ndarray``).
headers (array-like[pydicom.FileDataset]): Headers for DICOM files.
"""
def __init__(self, volume, affine, headers=None):
if not isinstance(volume, np.memmap):
xp = get_array_module(volume)
volume = xp.asarray(volume)
self._volume = volume
self._affine = np.array(affine)
self._headers = self._validate_and_format_headers(headers) if headers is not None else None
def save_volume(self, file_path: str, data_format: ImageDataFormat = ImageDataFormat.nifti):
"""Write volumes in specified data format.
Args:
file_path (str): File path to save data. May be modified to follow convention
given by the data format in which the volume will be saved.
data_format (ImageDataFormat): Format to save data.
"""
import dosma.core.io.format_io_utils
device = self.device
if device != cpu_device:
raise RuntimeError(f"MedicalVolume must be on cpu, got {self.device}")
writer = dosma.core.io.format_io_utils.get_writer(data_format)
writer.save(self, file_path)
def reformat(self, new_orientation: Sequence, inplace: bool = False) -> "MedicalVolume":
"""Reorients volume to a specified orientation.
Flipping and transposing the volume array (``self.volume``) returns a view if possible.
Reorientation method:
---------------------
- Axis transpose and flipping are linear operations and therefore can be treated
independently.
- working example: ('AP', 'SI', 'LR') --> ('RL', 'PA', 'SI')
1. Transpose volume and RAS orientation to appropriate column in matrix
eg. ('AP', 'SI', 'LR') --> ('LR', 'AP', 'SI') - transpose_inds=[2, 0, 1]
2. Flip volume across corresponding axes
eg. ('LR', 'AP', 'SI') --> ('RL', 'PA', 'SI') - flip axes 0,1
Reorientation method implementation:
------------------------------------
1. Transpose: Switching (transposing) axes in volume is the same as switching columns
in affine matrix
2. Flipping: Negate each column corresponding to pixel axis to flip (i, j, k) and
reestablish origins based on flipped axes
Args:
new_orientation (Sequence): New orientation.
inplace (bool, optional): If `True`, do operation in-place and return ``self``.
Returns:
MedicalVolume: The reformatted volume. If ``inplace=True``, returns ``self``.
"""
xp = self.device.xp
device = self.device
headers = self._headers
new_orientation = tuple(new_orientation)
if new_orientation == self.orientation:
if inplace:
return self
return self._partial_clone(volume=self._volume)
temp_orientation = self.orientation
temp_affine = np.array(self._affine)
transpose_inds = stdo.get_transpose_inds(temp_orientation, new_orientation)
all_transpose_inds = transpose_inds + tuple(range(3, self._volume.ndim))
with device:
volume = xp.transpose(self.volume, all_transpose_inds)
if headers is not None:
headers = np.transpose(headers, all_transpose_inds)
for i in range(len(transpose_inds)):
temp_affine[..., i] = self._affine[..., transpose_inds[i]]
temp_orientation = tuple([self.orientation[i] for i in transpose_inds])
flip_axs_inds = list(stdo.get_flip_inds(temp_orientation, new_orientation))
with device:
volume = xp.flip(volume, axis=tuple(flip_axs_inds))
if headers is not None:
headers = np.flip(headers, axis=tuple(flip_axs_inds))
a_vecs = temp_affine[:3, :3]
a_origin = temp_affine[:3, 3]
# phi is a vector of 1s and -1s, where 1 indicates no flip, and -1 indicates flip
# phi is used to determine which columns in affine matrix to flip
phi = np.ones([1, len(a_origin)]).flatten()
phi[flip_axs_inds] *= -1
b_vecs = np.array(a_vecs)
for i in range(len(phi)):
b_vecs[:, i] *= phi[i]
# get number of pixels to shift by on each axis.
# Should be 0 when not flipping - i.e. phi<0 mask
vol_shape_vec = (
(np.asarray(volume.shape[:3]) - 1) * (phi < 0).astype(np.float32)
).transpose()
b_origin = np.round(
a_origin.flatten() - np.matmul(b_vecs, vol_shape_vec).flatten(),
SCANNER_ORIGIN_DECIMAL_PRECISION,
)
temp_affine = np.array(self.affine)
temp_affine[:3, :3] = b_vecs
temp_affine[:3, 3] = b_origin
temp_affine[temp_affine == 0] = 0 # get rid of negative 0s
if inplace:
self._affine = temp_affine
self._volume = volume
self._headers = headers
mv = self
else:
mv = self._partial_clone(volume=volume, affine=temp_affine, headers=headers)
assert (
mv.orientation == new_orientation
), f"Orientation mismatch: Expected: {self.orientation}. Got {new_orientation}"
return mv
def reformat_as(self, other, inplace: bool = False) -> "MedicalVolume":
"""Reformat this to the same orientation as ``other``.
Equivalent to ``self.reformat(other.orientation, inplace)``.
Args:
other (MedicalVolume): The result volume has the same orientation as ``other``.
inplace (bool, optional): If `True`, do operation in-place and return ``self``.
Returns:
MedicalVolume: The reformatted volume. If ``inplace=True``, returns ``self``.
"""
return self.reformat(other.orientation, inplace=inplace)
def is_identical(self, mv):
"""Check if another medical volume is identical.
Two volumes are identical if they have the same pixel_spacing, orientation,
scanner_origin, and volume.
Args:
mv (MedicalVolume): Volume to compare with.
Returns:
bool: `True` if identical, `False` otherwise.
"""
if not isinstance(mv, MedicalVolume):
raise TypeError("`mv` must be a MedicalVolume.")
idevice = self.device
odevice = mv.device
if idevice != odevice:
raise RuntimeError(f"Expected device {idevice}, got {odevice}.")
with idevice:
return self.is_same_dimensions(mv) and (mv.volume == self.volume).all()
def _allclose_spacing(self, mv, precision: int = None, ignore_origin: bool = False):
"""Check if spacing between self and another medical volume is within tolerance.
Tolerance is `10 ** (-precision)`.
Args:
mv (MedicalVolume): Volume to compare with.
precision (`int`, optional): Number of significant figures after the decimal.
If not specified, check that affine matrices between two volumes are identical.
Defaults to `None`.
ignore_origin (bool, optional): If ``True``, ignore matching origin in the affine
matrix.
Returns:
bool: `True` if spacing between two volumes within tolerance, `False` otherwise.
"""
if precision is not None:
tol = 10 ** (-precision)
return np.allclose(mv.affine[:3, :3], self.affine[:3, :3], atol=tol) and (
ignore_origin or np.allclose(mv.scanner_origin, self.scanner_origin, rtol=tol)
)
else:
return (mv.affine == self.affine).all() or (
ignore_origin and (mv.affine[:, :3] == self.affine[:, :3]).all()
)
def is_same_dimensions(self, mv, precision: int = None, err: bool = False):
"""Check if two volumes have the same dimensions.
Two volumes have the same dimensions if they have the same pixel_spacing,
orientation, and scanner_origin.
Args:
mv (MedicalVolume): Volume to compare with.
precision (`int`, optional): Number of significant figures after the decimal.
If not specified, check that affine matrices between two volumes are identical.
Defaults to `None`.
err (bool, optional): If `True` and volumes do not have same dimensions,
raise descriptive ValueError.
Returns:
bool: ``True`` if pixel spacing, orientation, and scanner origin
between two volumes within tolerance, ``False`` otherwise.
Raises:
TypeError: If ``mv`` is not a MedicalVolume.
ValueError: If ``err=True`` and two volumes do not have same dimensions.
"""
if not isinstance(mv, MedicalVolume):
raise TypeError("`mv` must be a MedicalVolume.")
is_close_spacing = self._allclose_spacing(mv, precision)
is_same_orientation = mv.orientation == self.orientation
is_same_shape = mv.volume.shape == self.volume.shape
out = is_close_spacing and is_same_orientation and is_same_shape
if err and not out:
tol_str = f" (tol: 1e-{precision})" if precision else ""
if not is_close_spacing:
raise ValueError(
"Affine matrices not equal{}:\n{}\n{}".format(tol_str, self._affine, mv._affine)
)
if not is_same_orientation:
raise ValueError(
"Orientations not equal: {}, {}".format(self.orientation, mv.orientation)
)
if not is_same_shape:
raise ValueError(
"Shapes not equal: {}, {}".format(self._volume.shape, mv._volume.shape)
)
assert False # should not reach here
return out
def match_orientation(self, mv):
"""Reorient another MedicalVolume to orientation specified by self.orientation.
Args:
mv (MedicalVolume): Volume to reorient.
"""
warnings.warn(
"`match_orientation` is deprecated and will be removed in v0.1. "
"Use `mv.reformat_as(self, inplace=True)` instead.",
DeprecationWarning,
)
if not isinstance(mv, MedicalVolume):
raise TypeError("`mv` must be a MedicalVolume.")
mv.reformat(self.orientation, inplace=True)
def match_orientation_batch(self, mvs): # pragma: no cover
"""Reorient a collection of MedicalVolumes to orientation specified by self.orientation.
Args:
mvs (list[MedicalVolume]): Collection of MedicalVolumes.
"""
warnings.warn(
"`match_orientation_batch` is deprecated and will be removed in v0.1. "
"Use `[x.reformat_as(self, inplace=True) for x in mvs]` instead.",
DeprecationWarning,
)
for mv in mvs:
self.match_orientation(mv)
def clone(self, headers=True):
"""Clones the medical volume.
Args:
headers (bool, optional): If `True`, clone headers.
If `False`, headers have shared memory.
Returns:
mv (MedicalVolume): A cloned MedicalVolume.
"""
return MedicalVolume(
self.volume.copy(),
self.affine.copy(),
headers=deepcopy(self._headers) if headers else self._headers,
)
def to(self, device):
"""Move to device.
If on same device, no-op and returns ``self``.
Args:
device: The device to move to.
Returns:
MedicalVolume
"""
device = Device(device)
if self.device == device:
return self
return self._partial_clone(volume=to_device(self._volume, device))
def cpu(self):
"""Move to cpu."""
return self.to("cpu")
def astype(self, dtype, **kwargs):
"""Modifies dtype of ``self._volume``.
Note this operation is done in place. ``self._volume`` is modified, based
on the ``astype`` implementation of the type associated with ``self._volume``.
No new MedicalVolume is created - ``self`` is returned.
Args:
dtype (str or dtype): Typecode or data-type to which the array is cast.
Returns:
self
"""
if (
env.package_available("h5py")
and isinstance(self._volume, h5py.Dataset)
and version.parse(env.get_version(h5py)) < version.parse("3.0.0")
):
raise ValueError("Cannot cast h5py.Dataset to dtype for h5py<3.0.0")
self._volume = self._volume.astype(dtype, **kwargs)
return self
def to_nib(self):
"""Converts to nibabel Nifti1Image.
Returns:
nibabel.Nifti1Image: The nibabel image.
Raises:
RuntimeError: If medical volume is not on the cpu.
Examples:
>>> mv = MedicalVolume(np.ones((10,20,30)), np.eye(4))
>>> mv.to_nib()
<nibabel.nifti1.Nifti1Image>
"""
device = self.device
if device != cpu_device:
raise RuntimeError(f"MedicalVolume must be on cpu, got {self.device}")
return nib.Nifti1Image(self.A, self.affine.copy())
def to_sitk(self, vdim: int = None, transpose_inplane: bool = False):
"""Converts to SimpleITK Image.
SimpleITK Image objects support vector pixel types, which are represented
as an extra dimension in numpy arrays. The vector dimension can be specified
with ``vdim``.
MedicalVolume must be on cpu. Use ``self.cpu()`` to move.
SimpleITK loads DICOM files as individual slices that get stacked in ``(z, x, y)``
order. Thus, ``sitk.GetArrayFromImage`` returns an array in ``(y, x, z)`` order.
To return a SimpleITK Image that will follow this convention, set
``transpose_inplace=True``. If you have been using SimpleITK to load DICOM files,
you will likely want to specify this parameter.
Args:
vdim (int, optional): The vector dimension.
transpose_inplane (bool, optional): If ``True``, transpose inplane axes.
Recommended to be ``True`` for users who are familiar with SimpleITK's
DICOM loading convention.
Returns:
SimpleITK.Image
Raises:
ImportError: If `SimpleITK` is not installed.
RuntimeError: If MedicalVolume is not on cpu.
Note:
Header information is not currently copied.
"""
if not env.sitk_available():
raise ImportError("SimpleITK is not installed. Install it with `pip install simpleitk`")
device = self.device
if device != cpu_device:
raise RuntimeError(f"MedicalVolume must be on cpu, got {self.device}")
arr = self.volume
ndim = arr.ndim
if vdim is not None:
if vdim < 0:
vdim = ndim + vdim
axes = tuple(i for i in range(ndim) if i != vdim)[::-1] + (vdim,)
else:
axes = range(ndim)[::-1]
arr = np.transpose(arr, axes)
affine = self.affine.copy()
affine[:2] = -affine[:2] # RAS+ -> LPS+
origin = tuple(affine[:3, 3])
spacing = self.pixel_spacing
direction = affine[:3, :3] / np.asarray(spacing)
img = sitk.GetImageFromArray(arr, isVector=vdim is not None)
img.SetOrigin(origin)
img.SetSpacing(spacing)
img.SetDirection(tuple(direction.flatten()))
if transpose_inplane:
pa = sitk.PermuteAxesImageFilter()
pa.SetOrder([1, 0, 2])
img = pa.Execute(img)
return img
def to_torch(
self, requires_grad: bool = False, contiguous: bool = False, view_as_real: bool = False
):
"""Zero-copy conversion to torch tensor.
If torch version supports complex tensors (i.e. torch>=1.5.0), complex MedicalVolume
arrays will be converted into complex tensors (torch.complex64/torch.complex128).
Otherwise, tensors will be returned as the real view, where the last dimension has
two channels (`tensor.shape[-1]==2`). `[..., 0]` and `[..., 1]` correspond to the
real/imaginary channels, respectively.
Args:
requires_grad (bool, optional): Set ``.requires_grad`` for output tensor.
contiguous (bool, optional): Make output tensor contiguous before returning.
view_as_real (bool, optional): If ``True`` and underlying array is complex,
returns a real view of a complex tensor.
Returns:
torch.Tensor: The torch tensor.
Raises:
ImportError: If ``torch`` is not installed.
Note:
This method does not convert affine matrices and headers to tensor types.
Examples:
>>> mv = MedicalVolume(np.ones((2,2,2)), np.eye(4)) # zero-copy on CPU
>>> mv.to_torch()
tensor([[[1., 1.],
[1., 1.]],
[[1., 1.],
[1., 1.]]], dtype=torch.float64)
>>> mv_gpu = MedicalVolume(cp.ones((2,2,2)), np.eye(4)) # zero-copy on GPU
>>> mv.to_torch()
tensor([[[1., 1.],
[1., 1.]],
[[1., 1.],
[1., 1.]]], device="cuda:0", dtype=torch.float64)
>>> # view complex array as real tensor
>>> mv = MedicalVolume(np.ones((3,4,5), dtype=np.complex), np.eye(4))
>>> tensor = mv.to_torch(view_as_real)
>>> tensor.shape
(3, 4, 5, 2)
"""
if not env.package_available("torch"):
raise ImportError( # pragma: no cover
"torch is not installed. Install it with `pip install torch`. "
"See https://pytorch.org/ for more information."
)
import torch
from torch.utils.dlpack import from_dlpack
device = self.device
array = self.A
if any(np.issubdtype(array.dtype, dtype) for dtype in (np.complex64, np.complex128)):
torch_version = env.get_version(torch)
supports_cplx = version.Version(torch_version) >= _TORCH_COMPLEX_SUPPORT_VERSION
if not supports_cplx or view_as_real:
with device:
shape = array.shape
array = array.view(dtype=array.real.dtype)
array = array.reshape(shape + (2,))
if device == cpu_device:
tensor = torch.from_numpy(array)
else:
tensor = from_dlpack(array.toDlpack())
tensor.requires_grad = requires_grad
if contiguous:
tensor = tensor.contiguous()
return tensor
def headers(self, flatten=False):
"""Returns headers.
If headers exist, they are currently stored as an array of
pydicom dataset headers, though this is subject to change.
Args:
flatten (bool, optional): If ``True``, flattens header array
before returning.
Returns:
Optional[ndarray[pydicom.dataset.FileDataset]]: Array of headers (if they exist).
"""
if flatten and self._headers is not None:
return self._headers.flatten()
return self._headers
def get_metadata(self, key, dtype=None, default=np._NoValue):
"""Get metadata value from first header.
The first header is defined as the first header in ``np.flatten(self._headers)``.
To extract header information for other headers, use ``self.headers()``.
Args:
key (``str`` or pydicom.BaseTag``): Metadata field to access.
dtype (type, optional): If specified, data type to cast value to.
By default for DICOM headers, data will be in the value
representation format specified by pydicom. See
``pydicom.valuerep``.
default (Any): Default value to return if `key`` not found in header.
If not specified and ``key`` not found in header, raises a KeyError.
Examples:
>>> mv.get_metadata("EchoTime")
'10.0' # this is a number type ``pydicom.valuerep.DSDecimal``
>>> mv.get_metadata("EchoTime", dtype=float)
10.0
>>> mv.get_metadata("foobar", default=0)
0
Raises:
RuntimeError: If ``self._headers`` is ``None``.
KeyError: If ``key`` not found and ``default`` not specified.
Note:
Currently header information is tied to the ``pydicom.FileDataset`` implementation.
This function is synonymous to ``dataset.<key>`` in ``pydicom.FileDataset``.
"""
if self._headers is None:
raise RuntimeError("No headers found. MedicalVolume must be initialized with `headers`")
headers = self.headers(flatten=True)
if key not in headers[0] and default != np._NoValue:
return default
else:
element = headers[0][key]
val = element.value
if dtype is not None:
val = dtype(val)
return val
def set_metadata(self, key, value, force: bool = False):
"""Sets metadata for all headers.
Args:
key (str or pydicom.BaseTag): Metadata field to access.
value (Any): The value.
force (bool, optional): If ``True``, force the header to
set key even if key does not exist in header.
Raises:
RuntimeError: If ``self._headers`` is ``None``.
"""
if self._headers is None:
if not force:
raise ValueError(
"No headers found. To generate headers and write keys, `force` must be True."
)
self._headers = self._validate_and_format_headers([pydicom.Dataset()])
warnings.warn(
"Headers were generated and may not contain all attributes "
"required to save the volume in DICOM format."
)
VR_registry = {float: "DS", int: "IS", str: "LS"}
for h in self.headers(flatten=True):
if force and key not in h:
try:
setattr(h, key, value)
except TypeError:
h.add_new(key, VR_registry[type(value)], value)
else:
h[key].value = value
def materialize(self):
if not self.is_mmap:
return self
def round(self, decimals=0, affine=False) -> "MedicalVolume":
"""Round array (and optionally affine matrix).
Args:
decimals (int, optional): Number of decimals to round to.
affine (bool, optional): The rounded medical volume.
Returns:
MedicalVolume: MedicalVolume with rounded.
"""
from dosma.core.numpy_routines import around
return around(self, decimals, affine)
def sum(
self,
axis=None,
dtype=None,
out=None,
keepdims=False,
initial=np._NoValue,
where=np._NoValue,
) -> "MedicalVolume":
"""Compute the arithmetic sum along the specified axis. Identical to :meth:`sum_np`.
See :meth:`sum_np` for more information.
Args:
axis: Same as :meth:`sum_np`.
dtype: Same as :meth:`sum_np`.
out: Same as :meth:`sum_np`.
keepdims: Same as :meth:`sum_np`.
initial: Same as :meth:`sum_np`.
where: Same as :meth:`sum_np`.
Returns:
Union[Number, MedicalVolume]: If ``axis=None``, returns a number or a scalar type of
the underlying ndarray. Otherwise, returns a medical volume containing sum
values.
"""
from dosma.core.numpy_routines import sum_np
# `out` is required for cupy arrays because of how cupy calls array.
if out is not None:
raise ValueError("`out` must be None")
return sum_np(self, axis=axis, dtype=dtype, keepdims=keepdims, initial=initial, where=where)
def mean(
self, axis=None, dtype=None, out=None, keepdims=False, where=np._NoValue
) -> Union[Number, "MedicalVolume"]:
"""Compute the arithmetic mean along the specified axis. Identical to :meth:`mean_np`.
See :meth:`mean_np` for more information.
Args:
axis: Same as :meth:`mean_np`.
dtype: Same as :meth:`mean_np`.
out: Same as :meth:`mean_np`.
keepdims: Same as :meth:`mean_np`.
initial: Same as :meth:`mean_np`.
where: Same as :meth:`mean_np`.
Returns:
Union[Number, MedicalVolume]: If ``axis=None``, returns a number or a scalar type of
the underlying ndarray. Otherwise, returns a medical volume containing mean
values.
"""
from dosma.core.numpy_routines import mean_np
# `out` is required for cupy arrays because of how cupy calls array.
if out is not None:
raise ValueError("`out` must be None")
return mean_np(self, axis=axis, dtype=dtype, keepdims=keepdims, where=where)
@property
def A(self):
"""The pixel array. Same as ``self.volume``.
Examples:
>>> mv = MedicalVolume([[[1,2],[3,4]]], np.eye(4))
>>> mv.A
array([[[1, 2],
[3, 4]]])
"""
return self.volume
@property
def volume(self):
"""ndarray: ndarray representing volume values."""
return self._volume
@volume.setter
def volume(self, value):
"""
If the volume is of a different shape, the headers are no longer valid,
so delete all reorientations are done as part of MedicalVolume,
so reorientations are permitted.
However, external setting of the volume to a different shape array is not allowed.
"""
if value.ndim != self._volume.ndim:
raise ValueError("New volume must be same as current volume")
if self._volume.shape != value.shape:
self._headers = None
self._volume = value
self._device = get_device(self._volume)
@property
def pixel_spacing(self):
"""tuple[float]: Pixel spacing in order of current orientation."""
vecs = self._affine[:3, :3]
ps = tuple(np.sqrt(np.sum(vecs ** 2, axis=0)))
assert len(ps) == 3, "Pixel spacing must have length of 3"
return ps
@property
def orientation(self):
"""tuple[str]: Image orientation in standard orientation format.
See orientation.py for more information on conventions.
"""
nib_orientation = nib.aff2axcodes(self._affine)
return stdo.orientation_nib_to_standard(nib_orientation)
@property
def scanner_origin(self):
"""tuple[float]: Scanner origin in global RAS+ x,y,z coordinates."""
return tuple(self._affine[:3, 3])
@property
def affine(self):
"""np.ndarray: 4x4 affine matrix for volume in current orientation."""
return self._affine
@property
def shape(self) -> Tuple[int, ...]:
"""The shape of the underlying ndarray."""
return self._volume.shape
@property
def ndim(self) -> int:
"""int: The number of dimensions of the underlying ndarray."""
return self._volume.ndim
@property
def device(self) -> Device:
"""The device the object is on."""
return get_device(self._volume)
@property
def dtype(self):
"""The ``dtype`` of the ndarray. Same as ``self.volume.dtype``."""
return self._volume.dtype
@property
def is_mmap(self) -> bool:
"""bool: Whether the volume is a memory-mapped array."""
# important to check if .base is a python mmap object, since a view of a mmap
# is also a memmap object, but should not be symlinked or copied
return isinstance(self.A, np.memmap) and isinstance(self.A.base, mmap)
@classmethod
def from_nib(
cls, image, affine_precision: int = None, origin_precision: int = None, mmap: bool = False
) -> "MedicalVolume":
"""Constructs MedicalVolume from nibabel images.
Args:
image (nibabel.Nifti1Image): The nibabel image to convert.
affine_precision (int, optional): If specified, rounds the i/j/k coordinate
vectors in the affine matrix to this decimal precision.
origin_precision (int, optional): If specified, rounds the scanner origin
in the affine matrix to this decimal precision.
mmap (bool, optional): If True, memory map the image.
Returns:
MedicalVolume: The medical image.
Examples:
>>> import nibabel as nib
>>> nib_img = nib.Nifti1Image(np.ones((10,20,30)), np.eye(4))
>>> MedicalVolume.from_nib(nib_img)
MedicalVolume(
shape=(10, 20, 30),
ornt=('LR', 'PA', 'IS')),
spacing=(1.0, 1.0, 1.0),
origin=(0.0, 0.0, 0.0),
device=Device(type='cpu')
)
"""
affine = np.array(image.affine) # Make a copy of the affine matrix.
if affine_precision is not None:
affine[:3, :3] = np.round(affine[:3, :3], affine_precision)
if origin_precision:
affine[:3, 3] = np.round(affine[:3, 3], origin_precision)
data = image.dataobj.__array__() if mmap else image.get_fdata()
mv = cls(data, affine)
if mmap and not mv.is_mmap:
raise ValueError(
"Underlying array in the nibabel image is not mem-mapped. " "Please set mmap=False."
)
return mv
@classmethod
def from_sitk(cls, image, copy=False, transpose_inplane: bool = False) -> "MedicalVolume":
"""Constructs MedicalVolume from SimpleITK.Image.
Use ``transpose_inplane=True`` if the SimpleITK image was loaded with SimpleITK's
DICOM reader or if ``transpose_inplace=True`` was used to create the Image
with :meth:`to_sitk`. See the discussion of SimpleITK's data ordering convention
in :meth:`to_sitk` for more information.
If you are getting a segmentation fault, try using ``copy=True``.
Args:
image (SimpleITK.Image): The image.
copy (bool, optional): If ``True``, copies array.
transpose_inplane (bool, optional): If ``True``, transposes the inplane axes.
Set this to ``True`` if the SimpleITK image was loaded with SimpleITK's
DICOM reader. May need to set ``copy=True`` to avoid segmentation fault.
Returns:
MedicalVolume
Note:
Metadata information is not copied.
"""
if not env.sitk_available():
raise ImportError("SimpleITK is not installed. Install it with `pip install simpleitk`")
if len(image.GetSize()) < 3:
raise ValueError("`image` must be 3D.")
is_vector_image = image.GetNumberOfComponentsPerPixel() > 1
if transpose_inplane:
pa = sitk.PermuteAxesImageFilter()
pa.SetOrder([1, 0, 2])
image = pa.Execute(image)
if copy:
arr = sitk.GetArrayFromImage(image)
else:
arr = sitk.GetArrayViewFromImage(image)
ndim = arr.ndim
if is_vector_image:
axes = tuple(range(ndim)[-2::-1]) + (ndim - 1,)
else:
axes = range(ndim)[::-1]
arr = np.transpose(arr, axes)
origin = image.GetOrigin()
spacing = image.GetSpacing()
direction = np.asarray(image.GetDirection()).reshape(-1, 3)
affine = np.zeros((4, 4))
affine[:3, :3] = direction * np.asarray(spacing)
affine[:3, 3] = origin
affine[:2] = -affine[:2] # LPS+ -> RAS+
affine[3, 3] = 1
return cls(arr, affine)
@classmethod
def from_torch(cls, tensor, affine, headers=None, to_complex: bool = None) -> "MedicalVolume":
"""Zero-copy construction from PyTorch tensor.
Args:
tensor (torch.Tensor): A PyTorch tensor where first three dimensions correspond
to spatial dimensions.
affine (np.ndarray): See class parameters.
headers (np.ndarray[pydicom.FileDataset], optional): See class parameters.
to_complex (bool, optional): If ``True``, interprets tensor as real view of complex
tensor and attempts to restructure it as a complex array.
Returns:
MedicalVolume: A medical image.
Raises:
RuntimeError: If ``affine`` is not on the cpu.
ValueError: If ``tensor`` does not have at least three spatial dimensions.
ValueError: If ``to_complex=True`` and shape is not size ``(..., 2)``.
ImportError: If ``tensor`` on GPU and ``cupy`` not installed.
Examples:
>>> import torch
>>> tensor = torch.ones((2,2,2))
>>> MedicalVolume.from_torch(tensor, affine=np.eye(4))
MedicalVolume(
shape=(2, 2, 2),
ornt=('LR', 'PA', 'IS')),
spacing=(1.0, 1.0, 1.0),
origin=(0.0, 0.0, 0.0),
device=Device(type='cpu')
)
>>> tensor = torch.ones((2,2,2), device="cuda") # zero-copy from GPU 0
>>> MedicalVolume.from_torch(tensor, affine=np.eye(4))
MedicalVolume(
shape=(2, 2, 2),
ornt=('LR', 'PA', 'IS')),
spacing=(1.0, 1.0, 1.0),
origin=(0.0, 0.0, 0.0),
device=Device(type='cuda', index=0)
)
>>> tensor = torch.ones((3,4,5,2)) # treat this tensor as view of complex tensor
>>> mv = MedicalVolume.from_torch(tensor, affine=np.eye(4), to_complex=True)
>>> print(mv)
MedicalVolume(
shape=(3,4,5),
ornt=('LR', 'PA', 'IS')),
spacing=(1.0, 1.0, 1.0),
origin=(0.0, 0.0, 0.0),
device=Device(type='cuda', index=0)
)
>>> mv.dtype
np.complex128
"""
if not env.package_available("torch"):
raise ImportError( # pragma: no cover
"torch is not installed. Install it with `pip install torch`. "
"See https://pytorch.org/ for more information."
)
import torch
from torch.utils.dlpack import to_dlpack
torch_version = env.get_version(torch)
supports_cplx = version.Version(torch_version) >= _TORCH_COMPLEX_SUPPORT_VERSION
# Check if tensor needs to be converted to np.complex type.
# If tensor is of torch.complex64 or torch.complex128 dtype, then from_numpy will take
# care of conversion to appropriate numpy dtype, and we do not need to do the to_complex
# logic.
to_complex = to_complex and (
not supports_cplx
or (supports_cplx and tensor.dtype not in (torch.complex64, torch.complex128))
)
if isinstance(affine, torch.Tensor):
if Device(affine.device) != cpu_device:
raise RuntimeError("Affine matrix must be on the cpu")
affine = affine.numpy()
if (not to_complex and tensor.ndim < 3) or (to_complex and tensor.ndim < 4):
raise ValueError(
f"Tensor must have three spatial dimensions. Got shape {tensor.shape}."
)
if to_complex and tensor.shape[-1] != 2:
raise ValueError(
f"tensor.shape[-1] must have shape 2 when to_complex is specified. "
f"Got shape {tensor.shape}."
)
device = Device(tensor.device)
if device == cpu_device:
array = tensor.detach().numpy()
else:
if env.cupy_available():
array = cp.fromDlpack(to_dlpack(tensor))
else:
raise ImportError( # pragma: no cover
"CuPy is required to convert a GPU torch.Tensor to array. "
"Follow instructions at https://docs.cupy.dev/en/stable/install.html to "
"install the correct binary."
)
if to_complex:
with get_device(array):
if array.dtype == np.float32:
array = array.view(np.complex64)
elif array.dtype == np.float64:
array = array.view(np.complex128)
array = array.reshape(array.shape[:-1])
return cls(array, affine, headers=headers)
def _partial_clone(self, **kwargs) -> "MedicalVolume":
"""Copies constructor information from ``self`` if not available in ``kwargs``."""
if kwargs.get("volume", None) is False:
# special use case to not clone volume
kwargs["volume"] = self._volume
for k in ("volume", "affine"):
if k not in kwargs or (kwargs[k] is True):
kwargs[k] = getattr(self, f"_{k}").copy()
if "headers" not in kwargs:
kwargs["headers"] = self._headers
elif isinstance(kwargs["headers"], bool) and kwargs["headers"]:
kwargs["headers"] = deepcopy(self._headers)
return self.__class__(**kwargs)
def _validate_and_format_headers(self, headers):
"""Validate headers are of appropriate shape and format into standardized shape.
Headers are stored an ndarray of dictionary-like objects with explicit dimensions
that match the dimensions of ``self._volume``. If header objects are not
Assumes ``self._volume`` and ``self._affine`` have been set.
"""
headers = np.asarray(headers)
if headers.ndim > self._volume.ndim:
raise ValueError(
f"`headers` has too many dimensions. "
f"Got headers.ndim={headers.ndim}, but volume.ndim={self._volume.ndim}"
)
for dim in range(-headers.ndim, 0)[::-1]:
if headers.shape[dim] not in (1, self._volume.shape[dim]):
raise ValueError(
f"`headers` must follow standard broadcasting shape. "
f"Got headers.shape={headers.shape}, but volume.shape={self._volume.shape}"
)
ndim = self._volume.ndim
shape = (1,) * (ndim - len(headers.shape)) + headers.shape
headers = np.reshape(headers, shape)
return headers
def _extract_input_array_ufunc(self, input, device=None):
if device is None:
device = self.device
device_err = "Expected device {} but got device ".format(device) + "{}"
if isinstance(input, Number):
return input
elif isinstance(input, np.ndarray):
if device != cpu_device:
raise RuntimeError(device_err.format(cpu_device))
return input
elif env.cupy_available() and isinstance(input, cp.ndarray):
if device != input.device:
raise RuntimeError(device_err.format(Device(input.device)))
return input
elif isinstance(input, MedicalVolume):
if device != input.device:
raise RuntimeError(device_err.format(Device(input.device)))
assert self.is_same_dimensions(input, err=True)
return input._volume
else:
return NotImplemented
def _check_reduce_axis(self, axis: Union[int, Sequence[int]]) -> Tuple[int]:
if axis is None:
return None
is_sequence = isinstance(axis, Sequence)
if not is_sequence:
axis = (axis,)
axis = tuple(x if x >= 0 else self.volume.ndim + x for x in axis)
assert all(x >= 0 for x in axis)
if any(x < 3 for x in axis):
raise ValueError("Cannot reduce MedicalVolume along spatial dimensions")
if not is_sequence:
axis = axis[0]
return axis
def _reduce_array(self, func, *inputs, **kwargs) -> "MedicalVolume":
"""
Assumes inputs have been verified.
"""
device = self.device
xp = device.xp
keepdims = kwargs.get("keepdims", False)
reduce_axis = self._check_reduce_axis(kwargs["axis"])
kwargs["axis"] = reduce_axis
if not isinstance(reduce_axis, Sequence):
reduce_axis = (reduce_axis,)
with device:
volume = func(*inputs, **kwargs)
if xp.isscalar(volume) or volume.ndim == 0:
return volume
if self._headers is not None:
headers_slices = tuple(
slice(None) if x not in reduce_axis else slice(0, 1) if keepdims else 0
for x in range(self._headers.ndim)
)
headers = self._headers[headers_slices]
else:
headers = None
return self._partial_clone(volume=volume, headers=headers)
def __getitem__(self, _slice):
if isinstance(_slice, MedicalVolume):
_slice = _slice.reformat_as(self).A
slicer = _SpatialFirstSlicer(self)
try:
_slice = slicer.check_slicing(_slice)
except ValueError as err:
raise IndexError(*err.args)
volume = self._volume[_slice]
if any(dim == 0 for dim in volume.shape):
raise IndexError("Empty slice requested")
headers = self._headers
if headers is not None:
_slice_headers = []
for idx, x in enumerate(_slice):
if headers.shape[idx] == 1 and not isinstance(x, int):
_slice_headers.append(slice(None))
elif headers.shape[idx] == 1 and isinstance(x, int):
_slice_headers.append(0)
else:
_slice_headers.append(x)
headers = headers[_slice_headers]
affine = slicer.slice_affine(_slice)
return self._partial_clone(volume=volume, affine=affine, headers=headers)
def __setitem__(self, _slice, value):
"""
Note:
When ``value`` is a ``MedicalVolume``, the headers from that value
are not copied over. This may be changed in the future.
"""
if isinstance(value, MedicalVolume):
image = self[_slice]
assert value.is_same_dimensions(image, err=True)
value = value._volume
with self.device:
self._volume[_slice] = value
if self.is_mmap and self._volume.mode == "c":
self._volume = np.asarray(self._volume)
def __repr__(self) -> str:
nl = "\n"
nltb = "\n "
return (
f"{self.__class__.__name__}({nltb}shape={self.shape},{nltb}"
f"ornt={self.orientation}),{nltb}spacing={self.pixel_spacing},{nltb}"
f"origin={self.scanner_origin},{nltb}device={self.device}{nl})"
)
def _iops(self, other, op):
"""Helper function for i-type ops (__iadd__, __isub__, etc.)"""
if isinstance(other, MedicalVolume):
assert self.is_same_dimensions(other, err=True)
other = other.volume
if isinstance(op, str):
op = getattr(self._volume, op)
op(other)
if self.is_mmap and self._volume.mode == "c":
self._volume = np.asarray(self._volume)
return self
def __iadd__(self, other):
return self._iops(other, self._volume.__iadd__)
def __ifloordiv__(self, other):
return self._iops(other, self._volume.__ifloordiv__)
def __imul__(self, other):
return self._iops(other, self._volume.__imul__)
def __ipow__(self, other):
return self._iops(other, self._volume.__ipow__)
def __isub__(self, other):
return self._iops(other, self._volume.__isub__)
def __itruediv__(self, other):
return self._iops(other, self._volume.__itruediv__)
def __array__(self):
"""Wrapper for performing numpy operations on MedicalVolume array.
Examples:
>>> a = np.asarray(mv)
>>> type(a)
<class 'numpy.ndarray'>
Note:
This is not valid when ``self.volume`` is a ``cupy.ndarray``.
All CUDA ndarrays must first be moved to the cpu.
"""
try:
return np.asarray(self.volume)
except TypeError:
raise TypeError(
"Implicit conversion to a NumPy array is not allowed. "
"Please use `.cpu()` to move the array to the cpu explicitly "
"before constructing a NumPy array."
)
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
def _extract_inputs(inputs, device):
_inputs = []
for input in inputs:
input = self._extract_input_array_ufunc(input, device)
if input is NotImplemented:
return input
_inputs.append(input)
return _inputs
if method not in ["__call__", "reduce"]:
return NotImplemented
device = self.device
_inputs = _extract_inputs(inputs, device)
if _inputs is NotImplemented:
return NotImplemented
if method == "__call__":
with device:
volume = ufunc(*_inputs, **kwargs)
if volume.shape != self._volume.shape:
raise ValueError(
f"{self.__class__.__name__} does not support operations that change shape. "
f"Use operations on `self.volume` to modify array objects."
)
return self._partial_clone(volume=volume)
elif method == "reduce":
return self._reduce_array(ufunc.reduce, *_inputs, **kwargs)
def __array_function__(self, func, types, args, kwargs):
from dosma.core.numpy_routines import _HANDLED_NUMPY_FUNCTIONS
if func not in _HANDLED_NUMPY_FUNCTIONS:
return NotImplemented
# Note: this allows subclasses that don't override
# __array_function__ to handle MedicalVolume objects.
if not all(issubclass(t, (MedicalVolume, self.__class__)) for t in types):
return NotImplemented
return _HANDLED_NUMPY_FUNCTIONS[func](*args, **kwargs)
@property
def __cuda_array_interface__(self):
"""Wrapper for performing cupy operations on MedicalVolume array."""
if self.device == cpu_device:
raise TypeError(
"Implicit conversion to a CuPy array is not allowed. "
"Please use `.to(device)` to move the array to the gpu explicitly "
"before constructing a CuPy array."
)
return self.volume.__cuda_array_interface__
class _SpatialFirstSlicer(_SpatialFirstSlicerNib):
def __init__(self, img):
self.img = img
def __getitem__(self, slicer):
raise NotImplementedError("Slicing should be done by `MedicalVolume`")