"""Tests for `wsic` package."""
import sys
import warnings
from pathlib import Path
from typing import Dict
import cv2 as _cv2 # Avoid adding "cv2" to sys.modules for fallback tests
import numpy as np
import pytest
import tifffile
import zarr
from wsic import readers, utils, writers
from wsic.enums import Codec, ColorSpace
from wsic.readers import Reader
from wsic.writers import Writer
def test_jp2_to_deflate_tiled_tiff(samples_path, tmp_path):
"""Test that we can convert a JP2 to a DEFLATE compressed tiled TIFF."""
with warnings.catch_warnings():
warnings.simplefilter("error")
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="deflate",
microns_per_pixel=(0.5, 0.5), # the input .jp2 has no resolution box
)
writer.copy_from_reader(reader=reader, num_workers=3, read_tile_size=(512, 512))
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
def test_jp2_to_deflate_pyramid_tiff(samples_path, tmp_path):
"""Test that we can convert a JP2 to a DEFLATE compressed pyramid TIFF."""
pyramid_downsamples = [2, 4]
with warnings.catch_warnings():
warnings.simplefilter("error")
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="deflate",
pyramid_downsamples=pyramid_downsamples,
microns_per_pixel=(0.5, 0.5), # the input .jp2 has no resolution box
)
writer.copy_from_reader(reader=reader, num_workers=3, read_tile_size=(512, 512))
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
tif = tifffile.TiffFile(writer.path)
assert len(tif.series[0].levels) == len(pyramid_downsamples) + 1
def test_no_tqdm(samples_path, tmp_path, monkeypatch):
"""Test making a pyramid TIFF with no tqdm (progress bar) installed."""
# Make tqdm unavailable
monkeypatch.setitem(sys.modules, "tqdm", None)
monkeypatch.setitem(sys.modules, "tqdm.auto", None)
# Sanity check the imports fail
with pytest.raises(ImportError):
import tqdm # noqa
with pytest.raises(ImportError):
from tqdm.auto import tqdm # noqa
pyramid_downsamples = [2, 4]
with warnings.catch_warnings():
warnings.simplefilter("error")
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="deflate",
pyramid_downsamples=pyramid_downsamples,
microns_per_pixel=(0.5, 0.5), # the input .jp2 has no resolution box
)
writer.copy_from_reader(reader=reader, num_workers=3, read_tile_size=(512, 512))
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
tif = tifffile.TiffFile(writer.path)
assert len(tif.series[0].levels) == len(pyramid_downsamples) + 1
def test_pyramid_tiff(samples_path, tmp_path, monkeypatch):
"""Test pyramid generation using OpenCV to downsample."""
# Try to make a pyramid TIFF
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
pyramid_downsamples = [2, 4]
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="deflate",
pyramid_downsamples=pyramid_downsamples,
)
writer.copy_from_reader(
reader=reader, num_workers=3, read_tile_size=(512, 512), downsample_method="cv2"
)
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
tif = tifffile.TiffFile(writer.path)
assert len(tif.series[0].levels) == len(pyramid_downsamples) + 1
def test_pyramid_tiff_no_cv2(samples_path, tmp_path, monkeypatch):
"""Test pyramid generation when cv2 is not installed.
This will use SciPy. This method has a high error on synthetic data,
e.g. a test grid image. It performns better on natural images.
"""
# Make cv2 unavailable
monkeypatch.setitem(sys.modules, "cv2", None)
# Sanity check the import fails
with pytest.raises(ImportError):
import cv2 # noqa # skipcq
# Try to make a pyramid TIFF
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
pyramid_downsamples = [2, 4]
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="deflate",
pyramid_downsamples=pyramid_downsamples,
)
writer.copy_from_reader(
reader=reader,
num_workers=3,
read_tile_size=(512, 512),
downsample_method="scipy",
)
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
tif = tifffile.TiffFile(writer.path)
level_0 = tif.series[0].levels[0].asarray()
assert len(tif.series[0].levels) == len(pyramid_downsamples) + 1
for level in tif.series[0].levels[:2]:
level_array = level.asarray()
level_size = level_array.shape[:2][::-1]
resized_level_0 = _cv2.resize(level_0, level_size)
level_array = _cv2.GaussianBlur(level_array, (11, 11), 0)
resized_level_0 = _cv2.GaussianBlur(resized_level_0, (11, 11), 0)
mse = ((level_array.astype(float) - resized_level_0.astype(float)) ** 2).mean()
assert mse < 200
assert len(np.unique(level_array)) > 1
assert resized_level_0.mean() == pytest.approx(level_array.mean(), abs=5)
assert np.allclose(level_array, resized_level_0, atol=50)
def test_pyramid_tiff_no_cv2_no_scipy(samples_path, tmp_path, monkeypatch):
"""Test pyramid generation when neither cv2 or scipy are installed."""
# Make cv2 and scipy unavailable
monkeypatch.setitem(sys.modules, "cv2", None)
monkeypatch.setitem(sys.modules, "scipy", None)
# Sanity check the imports fail
with pytest.raises(ImportError):
import cv2 # noqa # skipcq
with pytest.raises(ImportError):
import scipy # noqa # skipcq
# Try to make a pyramid TIFF
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
pyramid_downsamples = [2, 4]
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="deflate",
pyramid_downsamples=pyramid_downsamples,
)
writer.copy_from_reader(
reader=reader, num_workers=3, read_tile_size=(512, 512), downsample_method="np"
)
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
tif = tifffile.TiffFile(writer.path)
level_0 = tif.series[0].levels[0].asarray()
assert len(tif.series[0].levels) == len(pyramid_downsamples) + 1
# Check that the levels are not blank and have a sensible range
for level in tif.series[0].levels[:2]:
level_array = level.asarray()
level_size = level_array.shape[:2][::-1]
resized_level_0 = _cv2.resize(level_0, level_size)
mse = ((level_array.astype(float) - resized_level_0.astype(float)) ** 2).mean()
assert mse < 10
assert len(np.unique(level_array)) > 1
assert resized_level_0.mean() == pytest.approx(level_array.mean(), abs=1)
assert np.allclose(level_array, resized_level_0, atol=1)
def test_jp2_to_webp_tiled_tiff(samples_path, tmp_path):
"""Test that we can convert a JP2 to a WebP compressed tiled TIFF."""
with warnings.catch_warnings():
warnings.simplefilter("error")
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
writer = writers.TIFFWriter(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="WebP",
compression_level=-1, # <0 for lossless
microns_per_pixel=(0.5, 0.5), # input has no resolution box
)
writer.copy_from_reader(reader=reader, num_workers=3, read_tile_size=(512, 512))
assert writer.path.exists()
assert writer.path.is_file()
assert writer.path.stat().st_size > 0
output = tifffile.imread(writer.path)
assert np.all(reader[:512, :512] == output[:512, :512])
def test_jp2_to_zarr(samples_path, tmp_path):
"""Convert JP2 to a single level Zarr."""
with warnings.catch_warnings():
warnings.simplefilter("error")
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
writer = writers.ZarrWriter(
path=tmp_path / "XYC.zarr",
shape=reader.shape,
)
writer.copy_from_reader(
reader=reader,
num_workers=3,
read_tile_size=(512, 512),
)
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
assert np.all(reader[:512, :512] == output[0][:512, :512])
def test_jp2_to_pyramid_zarr(samples_path, tmp_path):
"""Convert JP2 to a pyramid Zarr."""
with warnings.catch_warnings():
warnings.simplefilter("error")
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
pyramid_downsamples = [2, 4, 8, 16, 32]
writer = writers.ZarrWriter(
path=tmp_path / "XYC.zarr",
shape=reader.shape,
pyramid_downsamples=pyramid_downsamples,
tile_size=(256, 256),
)
writer.copy_from_reader(reader=reader, num_workers=3, read_tile_size=(256, 256))
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
assert np.all(reader[:512, :512] == output[0][:512, :512])
for level, dowmsample in zip(output.values(), [1] + pyramid_downsamples):
assert level.shape[:2] == (
reader.shape[0] // dowmsample,
reader.shape[1] // dowmsample,
)
def test_warn_unused(samples_path, tmp_path):
"""Test the warning about unsued arguments."""
reader = readers.Reader.from_file(samples_path / "XYC.jp2")
with pytest.warns(UserWarning):
writers.JP2Writer(
path=tmp_path / "XYC.tiff",
shape=reader.shape,
overwrite=False,
tile_size=(256, 256),
codec="WebP",
compression_level=70,
)
def test_read_zarr_array(tmp_path):
"""Test that we can open a Zarr array."""
# Create a Zarr array
array = zarr.open(
tmp_path / "test.zarr",
mode="w",
shape=(10, 10),
chunks=(2, 2),
dtype=np.uint8,
)
array[:] = np.random.randint(0, 255, size=(10, 10))
# Open the array
reader = readers.Reader.from_file(tmp_path / "test.zarr")
assert reader.shape == (10, 10)
assert reader.dtype == np.uint8
def test_tiff_get_tile(samples_path):
"""Test getting a tile from a TIFF."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
tile = reader.get_tile((1, 1), decode=False)
assert isinstance(tile, bytes)
def test_transcode_jpeg_svs_to_zarr(samples_path, tmp_path):
"""Test that we can transcode an JPEG SVS to a Zarr."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.ZarrWriter(
path=tmp_path / "CMU-1-Small-Region.zarr",
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
dtype=reader.dtype,
)
writer.transcode_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
assert np.all(reader[...] == output[0][...])
def test_transcode_svs_to_zarr(samples_path, tmp_path):
"""Test that we can transcode an J2K SVS to a Zarr."""
reader = readers.Reader.from_file(
samples_path
/ "bfconvert"
/ (
"XYC_-compression_JPEG-2000"
"_-tilex_128_-tiley_128"
"_-pyramid-scale_2"
"_-merge.ome.tiff"
)
)
writer = writers.ZarrWriter(
path=tmp_path
/ (
"XYC_-compression_JPEG-2000"
"_-tilex_128_-tiley_128_"
"-pyramid-scale_2_"
"-merge.zarr"
),
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
dtype=reader.dtype,
)
writer.transcode_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
original = reader[...]
new = output[0][...]
assert np.array_equal(original, new)
def test_transcode_svs_to_pyramid_ome_zarr(samples_path, tmp_path):
"""Test that we can transcode an J2K SVS to a pyramid OME-Zarr (NGFF)."""
reader = readers.Reader.from_file(
samples_path
/ "bfconvert"
/ (
"XYC_-compression_JPEG-2000"
"_-tilex_128_-tiley_128"
"_-pyramid-scale_2"
"_-merge.ome.tiff"
)
)
out_path = tmp_path / (
"XYC_-compression_JPEG-2000"
"_-tilex_128_-tiley_128_"
"-pyramid-scale_2_"
"-merge.zarr"
)
writer = writers.ZarrWriter(
path=out_path,
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
dtype=reader.dtype,
pyramid_downsamples=[2, 4, 8],
ome=True,
)
writer.transcode_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
original = reader[...]
new = output[0][...]
assert np.array_equal(original, new)
assert "_creator" in writer.zarr.attrs
assert "omero" in writer.zarr.attrs
assert "multiscales" in writer.zarr.attrs
def test_transcode_jpeg_dicom_wsi_to_zarr(samples_path, tmp_path):
"""Test that we can transcode a JPEG compressed DICOM WSI to a Zarr."""
reader = readers.DICOMWSIReader(samples_path / "CMU-1-Small-Region")
writer = writers.ZarrWriter(
path=tmp_path / "CMU-1.zarr",
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
dtype=reader.dtype,
)
writer.transcode_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
original = reader[...]
new = output[0][...]
assert original.shape == new.shape
# Allow for some slight differences in the pixel values due to
# different decoders.
difference = original.astype(np.float16) - new.astype(np.float16)
mse = (difference**2).mean()
assert mse < 1.5
assert np.percentile(np.abs(difference), 95) < 1
def test_transcode_j2k_dicom_wsi_to_zarr(samples_path, tmp_path):
"""Test that we can transcode a J2K compressed DICOM WSI to a Zarr."""
reader = readers.Reader.from_file(samples_path / "CMU-1-Small-Region-J2K")
writer = writers.ZarrWriter(
path=tmp_path / "CMU-1.zarr",
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
dtype=reader.dtype,
)
writer.transcode_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_dir()
assert list(writer.path.iterdir())
output = zarr.open(writer.path)
original = reader[...]
new = output[0][...]
assert original.shape == new.shape
# Allow for some slight differences in the pixel values due to
# different decoders.
difference = original.astype(np.float16) - new.astype(np.float16)
mse = (difference**2).mean()
assert mse < 1.5
assert np.percentile(np.abs(difference), 95) < 1
def test_tiff_res_tags(samples_path):
"""Test that we can read the resolution tags from a TIFF."""
reader = readers.Reader.from_file(samples_path / "XYC-half-mpp.tiff")
assert reader.microns_per_pixel == (0.5, 0.5)
def test_copy_from_reader_timeout(samples_path, tmp_path):
"""Check that Writer.copy_from_reader raises IOError when timed out."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.ZarrWriter(
path=tmp_path / "CMU-1-Small-Region.zarr",
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
dtype=reader.dtype,
)
warnings.simplefilter("ignore")
with pytest.raises(IOError, match="timed out"):
writer.copy_from_reader(reader=reader, timeout=0)
def test_block_downsample_shape():
"""Test that the block downsample shape is correct."""
shape = (135, 145)
block_shape = (32, 32)
downsample = 3
# (32, 32) / 3 = (10, 10)
# (135, 145) / 32 = (4.21875, 4.53125)
# floor((0.21875, 0.53125) * 10) = (2, 5)
# ((4, 4) * 10) + (2, 5) = (42, 45)
expected = (42, 45)
result_shape, result_tile_shape = utils.block_downsample_shape(
shape=shape, block_shape=block_shape, downsample=downsample
)
assert result_shape == expected
assert result_tile_shape == (10, 10)
def test_thumbnail(samples_path):
"""Test generating a thumbnail from a reader."""
# Compare with cv2 downsampling
reader = readers.TIFFReader(samples_path / "XYC-half-mpp.tiff")
thumbnail = reader.thumbnail(shape=(64, 64))
cv2_thumbnail = _cv2.resize(reader[...], (64, 64), interpolation=_cv2.INTER_AREA)
assert thumbnail.shape == (64, 64, 3)
assert np.allclose(thumbnail, cv2_thumbnail, atol=1)
def test_thumbnail_pil(samples_path, monkeypatch):
"""Test generating a thumbnail from a reader without cv2 installed.
This should fall back to Pillow.
"""
from PIL import Image
# Monkeypatch cv2 to not be installed
monkeypatch.setitem(sys.modules, "cv2", None)
# Sanity check that cv2 is not installed
with pytest.raises(ImportError):
import cv2 # noqa # skipcq
reader = readers.TIFFReader(samples_path / "XYC-half-mpp.tiff")
thumbnail = reader.thumbnail(shape=(64, 64))
pil_thumbnail = Image.fromarray(reader[...]).resize(
(64, 64),
resample=Image.Resampling.BOX,
)
assert thumbnail.shape == (64, 64, 3)
mse = np.mean((thumbnail - pil_thumbnail) ** 2)
assert mse < 1
assert np.allclose(thumbnail, pil_thumbnail, atol=1)
def test_thumbnail_no_cv2_no_pil(samples_path, monkeypatch):
"""Test generating a thumbnail from a reader without cv2 or Pillow installed.
This should fall back to scipy.ndimage.zoom.
"""
# Monkeypatch cv2 and Pillow to not be installed
monkeypatch.setitem(sys.modules, "cv2", None)
monkeypatch.setitem(sys.modules, "PIL", None)
# Sanity check that cv2 and Pillow are not installed
with pytest.raises(ImportError):
import cv2 # noqa # skipcq
with pytest.raises(ImportError):
import PIL # noqa # skipcq
reader = readers.TIFFReader(samples_path / "XYC-half-mpp.tiff")
thumbnail = reader.thumbnail(shape=(64, 64))
zoom = np.divide((64, 64), reader.shape[:2])
zoom = np.append(zoom, 1)
cv2_thumbnail = _cv2.resize(reader[...], (64, 64), interpolation=_cv2.INTER_AREA)
assert thumbnail.shape == (64, 64, 3)
assert np.allclose(thumbnail, cv2_thumbnail, atol=1)
def test_thumbnail_no_cv2_no_pil_no_scipy(samples_path, monkeypatch):
"""Test generating a thumbnail with nearest neighbor subsampling.
This should be the raw numpy fallaback.
"""
# Monkeypatch cv2 and Pillow to not be installed
monkeypatch.setitem(sys.modules, "cv2", None)
monkeypatch.setitem(sys.modules, "PIL", None)
monkeypatch.setitem(sys.modules, "scipy", None)
# Sanity check that modules are not installed
with pytest.raises(ImportError):
import cv2 # noqa # skipcq
with pytest.raises(ImportError):
import PIL # noqa # skipcq
with pytest.raises(ImportError):
import scipy # noqa # skipcq
reader = readers.TIFFReader(samples_path / "XYC-half-mpp.tiff")
with pytest.warns(UserWarning, match="slower"):
thumbnail = reader.thumbnail(shape=(64, 64))
cv2_thumbnail = _cv2.resize(reader[...], (64, 64), interpolation=_cv2.INTER_AREA)
assert thumbnail.shape == (64, 64, 3)
assert np.allclose(thumbnail, cv2_thumbnail, atol=1)
def test_thumbnail_non_power_two(samples_path):
"""Test generating a thumbnail from a reader.
Outputs a non power of two sized thumbnail.
"""
# Compare with cv2 downsampling
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
thumbnail = reader.thumbnail(shape=(59, 59))
cv2_thumbnail = _cv2.resize(reader[...], (59, 59), interpolation=_cv2.INTER_AREA)
assert thumbnail.shape == (59, 59, 3)
mse = np.mean((thumbnail - cv2_thumbnail) ** 2)
psnr = 10 * np.log10(255**2 / mse)
assert psnr > 30
def test_write_rgb_jpeg_svs(samples_path, tmp_path):
"""Test writing an SVS file with RGB JPEG compression."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.SVSWriter(
path=tmp_path / "Neo-CMU-1-Small-Region.svs",
shape=reader.shape,
pyramid_downsamples=[2, 4],
compression_level=70,
)
writer.copy_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_file()
# Pass the tiffile is_svs test
tiff = tifffile.TiffFile(str(writer.path))
assert tiff.is_svs
# Read and compare with OpenSlide
import openslide
with openslide.OpenSlide(str(writer.path)) as slide:
new_svs_region = slide.read_region((0, 0), 0, (1024, 1024))
with openslide.OpenSlide(str(samples_path / "CMU-1-Small-Region.svs")) as slide:
old_svs_region = slide.read_region((0, 0), 0, (1024, 1024))
# Check mean squared error
# There will be some error due to JPEG compression
mse = (np.subtract(new_svs_region, old_svs_region) ** 2).mean()
assert mse < 10
def test_write_ycbcr_jpeg_svs(samples_path, tmp_path):
"""Test writing an SVS file with YCbCr JPEG compression."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.SVSWriter(
path=tmp_path / "Neo-CMU-1-Small-Region.svs",
shape=reader.shape,
pyramid_downsamples=[2, 4],
compression_level=70,
color_mode="YCbCr",
)
writer.copy_from_reader(reader=reader)
assert writer.path.exists()
assert writer.path.is_file()
# Pass the tiffile is_svs test
tiff = tifffile.TiffFile(str(writer.path))
assert tiff.is_svs
# Read and compare with OpenSlide
import openslide
with openslide.OpenSlide(str(writer.path)) as slide:
new_svs_region = slide.read_region((0, 0), 0, (1024, 1024))
with openslide.OpenSlide(str(samples_path / "CMU-1-Small-Region.svs")) as slide:
old_svs_region = slide.read_region((0, 0), 0, (1024, 1024))
# Check mean squared error
mse = (np.subtract(new_svs_region, old_svs_region) ** 2).mean()
assert mse < 10
def test_write_ycrcb_j2k_svs_fails(samples_path, tmp_path):
"""Test writing an SVS file with YCrCb JP2 compression fails."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
with pytest.raises(ValueError, match="only supports JPEG"):
writers.SVSWriter(
path=tmp_path / "Neo-CMU-1-Small-Region.svs",
shape=reader.shape,
pyramid_downsamples=[2, 4],
codec=Codec.JPEG2000,
compression_level=70,
photometric=ColorSpace.YCBCR,
)
def test_write_jp2_resolution(samples_path, tmp_path):
"""Test writing a JP2 with capture resolution metadata."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
out_path = tmp_path / "CMU-1-Small-Region.jp2"
writer = writers.JP2Writer(
path=out_path,
shape=reader.shape,
pyramid_downsamples=[2, 4, 8],
compression_level=70,
microns_per_pixel=(0.5, 0.5),
)
writer.copy_from_reader(reader=reader)
jp2_reader = readers.JP2Reader(out_path)
assert jp2_reader.microns_per_pixel == (0.5, 0.5)
def test_missing_imagecodecs_codec(samples_path, tmp_path):
"""Test writing an SVS file with YCrCb JP2 compression fails."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
with pytest.raises(ValueError, match="Unknown"):
writers.ZarrWriter(
path=tmp_path / "test.zarr",
shape=reader.shape,
pyramid_downsamples=[2, 4],
codec="foo",
compression_level=70,
color_space=ColorSpace.RGB,
)
# Zarr tests for alternate stores
def test_write_read_sqlite_store_zarr(samples_path, tmp_path):
"""Test writing and reading a Zarr with an SQLite store."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.ZarrWriter(
shape=reader.shape,
store=zarr.SQLiteStore(tmp_path / "test.zarr.sqlite"),
)
writer.copy_from_reader(reader=reader)
writer.close()
# SQLite doesn't have a standard file extension. Therefore it is
# hard to infer that the file is a SQLite file. We must pass a
# zarr.SQLiteStore instance to open it.
readers.ZarrReader(zarr.SQLiteStore(tmp_path / "test.zarr.sqlite"))
def test_write_read_zip_store_zarr(samples_path, tmp_path):
"""Test writing and reading a Zarr with a Zip store."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.ZarrWriter(
path=tmp_path / "test.zarr.zip",
shape=reader.shape,
)
writer.copy_from_reader(reader=reader)
writer.close() # Important for zip store!
readers.ZarrReader(tmp_path / "test.zarr.zip")
def test_write_read_temp_store_zarr(samples_path):
"""Test writing and reading a Zarr with a Temp store."""
reader = readers.TIFFReader(samples_path / "CMU-1-Small-Region.svs")
writer = writers.ZarrWriter(
shape=reader.shape,
store=zarr.TempStore("test.zarr"),
)
writer.copy_from_reader(reader=reader)
readers.ZarrReader(writer.zarr.store)
# Test Scenarios
WRITER_EXT_MAPPING = {
".zarr": writers.ZarrWriter,
".tiff": writers.TIFFWriter,
".dcm": writers.DICOMWSIWriter,
}
class TestTranscodeScenarios:
"""Test scenarios for the transcoding WSIs."""
scenarios = [
(
"jpeg_svs_to_zarr",
{
"sample_name": "CMU-1-Small-Region.svs",
"reader_cls": readers.TIFFReader,
"out_reader": readers.ZarrReader,
"out_ext": ".zarr",
},
),
(
"jpeg_tiff_to_zarr",
{
"sample_name": "CMU-1-Small-Region.jpeg.tiff",
"reader_cls": readers.TIFFReader,
"out_reader": readers.ZarrReader,
"out_ext": ".zarr",
},
),
(
"webp_tiff_to_zarr",
{
"sample_name": "CMU-1-Small-Region.webp.tiff",
"reader_cls": readers.TIFFReader,
"out_reader": readers.ZarrReader,
"out_ext": ".zarr",
},
),
(
"jp2_tiff_to_zarr",
{
"sample_name": "CMU-1-Small-Region.jp2.tiff",
"reader_cls": readers.TIFFReader,
"out_reader": readers.ZarrReader,
"out_ext": ".zarr",
},
),
(
"jpeg_dicom_to_zarr",
{
"sample_name": "CMU-1-Small-Region",
"reader_cls": readers.DICOMWSIReader,
"out_reader": readers.ZarrReader,
"out_ext": ".zarr",
},
),
(
"j2k_dicom_to_zarr",
{
"sample_name": "CMU-1-Small-Region-J2K",
"reader_cls": readers.DICOMWSIReader,
"out_reader": readers.ZarrReader,
"out_ext": ".zarr",
},
),
(
"jpeg_dicom_to_tiff",
{
"sample_name": "CMU-1-Small-Region",
"reader_cls": readers.DICOMWSIReader,
"out_reader": readers.TIFFReader,
"out_ext": ".tiff",
},
),
(
"j2k_dicom_to_tiff",
{
"sample_name": "CMU-1-Small-Region-J2K",
"reader_cls": readers.DICOMWSIReader,
"out_reader": readers.TIFFReader,
"out_ext": ".tiff",
},
),
(
"webp_tiff_to_tiff",
{
"sample_name": "CMU-1-Small-Region.webp.tiff",
"reader_cls": readers.TIFFReader,
"out_reader": readers.TIFFReader,
"out_ext": ".tiff",
},
),
(
"jpeg_tiff_to_jpeg_dicom",
{
"sample_name": "CMU-1-Small-Region.jpeg.tiff",
"reader_cls": readers.TIFFReader,
"out_reader": readers.DICOMWSIReader,
"out_ext": ".dcm",
},
),
]
@staticmethod
def test_transcode_tiled(
samples_path: Path,
sample_name: str,
reader_cls: readers.Reader,
out_reader: readers.Reader,
out_ext: str,
tmp_path: Path,
):
"""Test transcoding a tiled WSI."""
in_path = samples_path / sample_name
out_path = (tmp_path / sample_name).with_suffix(out_ext)
reader: Reader = reader_cls(in_path)
writer_cls = WRITER_EXT_MAPPING[out_ext]
writer: Writer = writer_cls(
path=out_path,
shape=reader.shape,
tile_size=reader.tile_shape[::-1],
)
writer.transcode_from_reader(reader=reader)
output_reader: Reader = out_reader(out_path)
assert output_reader.shape == reader.shape
assert output_reader.tile_shape == reader.tile_shape
# Calculate error metrics
reader_img = reader[...]
output_reader_img = output_reader[...]
squared_err = np.subtract(reader_img, output_reader_img) ** 2
abs_err = np.abs(np.subtract(reader_img, output_reader_img))
# A lot of the image should have zero error
assert np.count_nonzero(abs_err) / abs_err.size < 0.25
# Check mean squared error is low
assert np.all(squared_err.mean() < 2.56)
# Check mean absolute error is low
assert np.all(abs_err.mean() < 25.6)
def visually_compare_readers(
self,
in_path: Path,
out_path: Path,
reader: readers.Reader,
output_reader: readers.Reader,
) -> Dict[str, bool]:
"""Compare two readers for manual visual inspection.
Used for debugging.
Args:
in_path:
Path to the input file.
out_path:
Path to the output file.
reader:
Reader for the input file.
output_reader:
Reader for the output file.
"""
import inspect
from matplotlib import pyplot as plt # type: ignore
from matplotlib.widgets import Button # type: ignore
current_frame = inspect.currentframe()
class_name = self.__class__.__name__
function_name = current_frame.f_back.f_code.co_name
# Create a dictionary of arg names to values
args, _, _, values = inspect.getargvalues(current_frame)
args_dict = {arg: values[arg] for arg in args}
function_arguments = ",\n ".join(
f"{k}={v}" if k not in ("self",) else k for k, v in args_dict.items()
)
# Display the function signature and arguments in axs[0]
text_figure = plt.gcf()
text_figure.canvas.set_window_title(f"{class_name} - {function_name}")
text_figure.set_size_inches(8, 2)
plt.suptitle(
f"{function_name}(\n {function_arguments}\n)",
horizontalalignment="left",
verticalalignment="top",
x=0,
)
plt.show(block=False)
# Plot the readers to compare
_, axs = plt.subplots(1, 3, sharex=True, sharey=True)
axs[0].imshow(reader[...])
axs[0].set_title(f"Input\n({in_path.name})")
axs[1].imshow(output_reader[...])
axs[1].set_title(f"Output\n({out_path.name})")
diff = np.abs(np.subtract(reader[...], output_reader[...], dtype=float))
axs[2].imshow(diff.mean(-1))
max_diff = diff.max(axis=(0, 1))
mean_diff = diff.mean(axis=(0, 1))
axs[2].set_title(
f"Difference\nChannel Max Diff {max_diff}\nChannel Mean Diff {mean_diff}"
)
# Set the window title
plt.gcf().canvas.set_window_title(f"{class_name} - {function_name}")
# Add Pass / Fail Buttons with function callbacks
visual_inspections_passed = {}
def pass_callback(event):
"""Callback for the pass button."""
visual_inspections_passed[function_name] = True
plt.close(text_figure)
plt.close()
def fail_callback(event):
"""Callback for the fail button."""
plt.close(text_figure)
plt.close()
ax_pass = plt.axes([0.8, 0.05, 0.1, 0.075])
btn_pass = Button(ax_pass, "Pass", color="lightgreen")
btn_pass.on_clicked(pass_callback)
ax_fail = plt.axes([0.9, 0.05, 0.1, 0.075])
btn_fail = Button(ax_fail, "Fail", color="red")
btn_fail.on_clicked(fail_callback)
# Set suptitle to the function name
plt.suptitle("\n".join([class_name, function_name]))
plt.tight_layout()
plt.show(block=True)
return visual_inspections_passed # noqa: R504
class TestConvertScenarios:
"""Test scenarios for converting between formats."""
scenarios = [
(
"j2k_dicom_to_zarr",
{
"sample_name": "CMU-1-Small-Region-J2K",
"reader_cls": readers.DICOMWSIReader,
"writer_cls": writers.ZarrWriter,
"out_ext": ".zarr",
"codec": "blosc",
},
),
(
"jpeg_dicom_to_blosc_zarr",
{
"sample_name": "CMU-1-Small-Region",
"reader_cls": readers.DICOMWSIReader,
"writer_cls": writers.ZarrWriter,
"out_ext": ".zarr",
"codec": "blosc",
},
),
(
"jpeg_dicom_to_jpeg_zarr",
{
"sample_name": "CMU-1-Small-Region",
"reader_cls": readers.DICOMWSIReader,
"writer_cls": writers.ZarrWriter,
"out_ext": ".zarr",
"codec": "jpeg",
},
),
(
"jp2_to_jpeg_tiff",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.TIFFWriter,
"out_ext": ".tiff",
"codec": "jpeg",
},
),
(
"jp2_to_zarr",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.ZarrWriter,
"out_ext": ".zarr",
"codec": "blosc",
},
),
(
"jp2_to_jpeg_svs",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.SVSWriter,
"out_ext": ".svs",
"codec": "jpeg",
},
),
(
"tiff_to_jp2",
{
"sample_name": "XYC-half-mpp.tiff",
"reader_cls": readers.TIFFReader,
"writer_cls": writers.JP2Writer,
"out_ext": ".jp2",
"codec": "jpeg2000",
},
),
(
"jp2_to_zstd_tiff",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.TIFFWriter,
"out_ext": ".tiff",
"codec": "zstd",
},
),
(
"jp2_to_png_tiff",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.TIFFWriter,
"out_ext": ".tiff",
"codec": "png",
},
),
(
"jp2_to_jpegxr_tiff",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.TIFFWriter,
"out_ext": ".tiff",
"codec": "jpegxr",
},
),
(
"jp2_to_deflate_tiff",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.TIFFWriter,
"out_ext": ".tiff",
"codec": "deflate",
},
),
(
"jp2_to_jpegxl_tiff",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.TIFFWriter,
"out_ext": ".tiff",
"codec": "jpegxl",
},
),
(
"jp2_to_jpeg_dicom",
{
"sample_name": "XYC.jp2",
"reader_cls": readers.JP2Reader,
"writer_cls": writers.DICOMWSIWriter,
"out_ext": ".dcm",
"codec": "jpeg",
},
),
(
"svs_to_jppeg_dicom",
{
"sample_name": "CMU-1-Small-Region.svs",
"reader_cls": readers.OpenSlideReader,
"writer_cls": writers.DICOMWSIWriter,
"out_ext": ".dcm",
"codec": "jpeg",
},
),
(
"tiff_to_dicom",
{
"sample_name": "XYC-half-mpp.tiff",
"reader_cls": readers.TIFFReader,
"writer_cls": writers.DICOMWSIWriter,
"out_ext": ".dcm",
"codec": "jpeg",
},
),
(
"zarr_to_jpeg_dicom",
{
"sample_name": "CMU-1-Small-Region-JPEG.zarr",
"reader_cls": readers.ZarrReader,
"writer_cls": writers.DICOMWSIWriter,
"out_ext": ".tiff",
"codec": "jpeg",
},
),
]
@staticmethod
def test_convert(
samples_path: Path,
sample_name: str,
reader_cls: readers.Reader,
writer_cls: writers.Writer,
out_ext: str,
tmp_path: Path,
codec: str,
):
"""Test converting between formats."""
in_path = samples_path / sample_name
out_path = (tmp_path / sample_name).with_suffix(out_ext)
reader: Reader = reader_cls(in_path)
writer: Writer = writer_cls(
out_path,
shape=reader.shape,
codec=codec,
compression_level=4
if codec
in {
"blosc",
}
else 100,
)
writer.copy_from_reader(
reader,
num_workers=1,
timeout=1e32,
)
# Check that the output file exists
assert out_path.exists()
# Check that the output file has non-zero size
assert out_path.stat().st_size > 0
# Check that the output looks the same as the input
output_reader = readers.Reader.from_file(out_path)
mse = np.mean(np.square(reader[...] - output_reader[...]))
assert mse < 100
class TestWriterScenarios:
"""Test scenarios for writing to formats with codecs."""
scenarios = [
("svs_jpeg", {"writer_cls": writers.SVSWriter, "codec": "jpeg"}),
# Unsupported by tifffile
# ("tiff_blosc", {"writer_cls": writers.TIFFWriter, "codec": "blosc"}),
# ("tiff_blosc2", {"writer_cls": writers.TIFFWriter, "codec": "blosc2"}),
# ("tiff_brotli", {"writer_cls": writers.TIFFWriter, "codec": "brotli"}),
("tiff_deflate", {"writer_cls": writers.TIFFWriter, "codec": "deflate"}),
# Unsupported by tifffile
# ("tiff_j2k", {"writer_cls": writers.TIFFWriter, "codec": "j2k"}),
("tiff_jp2", {"writer_cls": writers.TIFFWriter, "codec": "jpeg2000"}),
("tiff_jpeg", {"writer_cls": writers.TIFFWriter, "codec": "jpeg"}),
# Unsupported by tifffile
# ("tiff_jpls", {"writer_cls": writers.TIFFWriter, "codec": "jpegls"}),
("tiff_jpxl", {"writer_cls": writers.TIFFWriter, "codec": "jpegxl"}),
("tiff_jpxr", {"writer_cls": writers.TIFFWriter, "codec": "jpegxr"}),
# Unsupported by tifffile
# ("tiff_lz4", {"writer_cls": writers.TIFFWriter, "codec": "lz4"}),
# Encode unsupported by imagecodecs
# ("tiff_lzw", {"writer_cls": writers.TIFFWriter, "codec": "lzw"}),
("tiff_png", {"writer_cls": writers.TIFFWriter, "codec": "png"}),
("tiff_webp", {"writer_cls": writers.TIFFWriter, "codec": "webp"}),
# Unsupported by tifffile
# ("tiff_zfp", {"writer_cls": writers.TIFFWriter, "codec": "zfp"}),
("tiff_zstd", {"writer_cls": writers.TIFFWriter, "codec": "zstd"}),
("zarr_blosc", {"writer_cls": writers.ZarrWriter, "codec": "blosc"}),
("zarr_blosc2", {"writer_cls": writers.ZarrWriter, "codec": "blosc2"}),
("zarr_brotli", {"writer_cls": writers.ZarrWriter, "codec": "brotli"}),
("zarr_deflate", {"writer_cls": writers.ZarrWriter, "codec": "deflate"}),
("zarr_j2k", {"writer_cls": writers.ZarrWriter, "codec": "j2k"}),
("zarr_jp2", {"writer_cls": writers.ZarrWriter, "codec": "jpeg2000"}),
("zarr_jpeg", {"writer_cls": writers.ZarrWriter, "codec": "jpeg"}),
("zarr_jpls", {"writer_cls": writers.ZarrWriter, "codec": "jpegls"}),
("zarr_jpxl", {"writer_cls": writers.ZarrWriter, "codec": "jpegxl"}),
("zarr_jpxr", {"writer_cls": writers.ZarrWriter, "codec": "jpegxr"}),
("zarr_lz4", {"writer_cls": writers.ZarrWriter, "codec": "lz4"}),
# Encode unsupported by imagecodecs
# ("zarr_lzw", {"writer_cls": writers.ZarrWriter, "codec": "lzw"}),
("zarr_png", {"writer_cls": writers.ZarrWriter, "codec": "png"}),
("zarr_webp", {"writer_cls": writers.ZarrWriter, "codec": "webp"}),
(
"zarr_zfp",
{"writer_cls": writers.ZarrWriter, "codec": "zfp"},
), # Wrong data type
("zarr_zstd", {"writer_cls": writers.ZarrWriter, "codec": "zstd"}),
]
@staticmethod
def test_write(
samples_path: Path, tmp_path: Path, writer_cls: writers.Writer, codec: str
):
"""Test writing to a format does not error."""
reader = readers.Reader.from_file(samples_path / "CMU-1-Small-Region.svs")
writer = writer_cls(
tmp_path / "image",
shape=reader.shape,
codec=codec,
dtype=float if codec == "zfp" else np.uint8,
)
writer.copy_from_reader(reader)
class TestReaderScenarios:
"""Test scenarios for readers."""
scenarios = [
(
"jpeg_svs_tifffile",
{
"sample_name": "CMU-1-Small-Region.svs",
"reader_cls": readers.TIFFReader,
},
),
(
"jpeg_svs_openslide",
{
"sample_name": "CMU-1-Small-Region.svs",
"reader_cls": readers.OpenSlideReader,
},
),
(
"j2k_dicom",
{
"sample_name": "CMU-1-Small-Region-J2K",
"reader_cls": readers.DICOMWSIReader,
},
),
(
"jpeg_dicom",
{
"sample_name": "CMU-1-Small-Region",
"reader_cls": readers.DICOMWSIReader,
},
),
(
"jpeg_zarr",
{
"sample_name": "CMU-1-Small-Region-JPEG.zarr",
"reader_cls": readers.ZarrReader,
},
),
]
@staticmethod
def test_thumbnail_512_512_approx(
samples_path: Path,
sample_name: str,
reader_cls: readers.Reader,
):
"""Test creating a thumbnail."""
in_path = samples_path / sample_name
reader: readers.Reader = reader_cls(in_path)
reader.thumbnail(
shape=(512, 512),
approx_ok=True,
)
@staticmethod
def test_mpp_found(
samples_path: Path,
sample_name: str,
reader_cls: readers.Reader,
):
"""Check that resolution/mpp is read from the file (not None)."""
in_path = samples_path / sample_name
reader: readers.Reader = reader_cls(in_path)
assert hasattr(reader, "microns_per_pixel")
# A plain zarr doesn't have a resolution attribute so skip this.
# Maybe in duture this could be done via the xarray metadata.
if (
not isinstance(reader, readers.ZarrReader)
or sample_name != "CMU-1-Small-Region-JPEG.zarr"
):
assert reader.microns_per_pixel is not None