"""Tests for the normalization functions exposed under ``ehrapy.pp``.

Each normalization method (scale, minmax, maxabs, robust_scale, quantile,
power, log) is checked against hard-coded expected values, with extra keyword
arguments, with per-group normalization via ``group_key`` and on integer data.
"""

import warnings
from collections import OrderedDict
from pathlib import Path

import dask.array as da
import numpy as np
import pandas as pd
import pytest
from anndata import AnnData
from scipy import sparse

import ehrapy as ep
from ehrapy.anndata._constants import CATEGORICAL_TAG, FEATURE_TYPE_KEY, NUMERIC_TAG
from ehrapy.io._read import read_csv
from tests.conftest import ARRAY_TYPES, TEST_DATA_PATH

CURRENT_DIR = Path(__file__).parent


@pytest.fixture
def adata_mini():
    """Return the first eight rows of ``dataset1.csv`` read with four ``columns_obs_only``."""
    return read_csv(
        f"{TEST_DATA_PATH}/dataset1.csv",
        columns_obs_only=["glucose", "weight", "disease", "station"],
    )[:8]


@pytest.fixture
def adata_mini_integers_in_X():
    """Return ``dataset1.csv`` with ``X`` cast to ``int32`` and ``in_days`` tagged as numeric."""
    adata = read_csv(
        f"{TEST_DATA_PATH}/dataset1.csv",
        columns_obs_only=["idx", "sys_bp_entry", "dia_bp_entry", "glucose", "weight", "disease", "station"],
    )
    # cast data in X to integers; pd.read generates floats generously, but want to test integer normalization
    adata.X = adata.X.astype(np.int32)
    ep.ad.infer_feature_types(adata)
    ep.ad.replace_feature_types(adata, ["in_days"], "numeric")
    return adata


@pytest.fixture
def adata_to_norm():
    """Return a small label-encoded AnnData with three rows and mixed feature types."""
    obs_data = {"ID": ["Patient1", "Patient2", "Patient3"], "Age": [31, 94, 62]}

    X_data = np.array(
        [
            [1, 3.4, -2.0, 1.0, "A string", "A different string"],
            [2, 5.4, 5.0, 2.0, "Silly string", "A different string"],
            [2, 5.7, 3.0, np.nan, "A string", "What string?"],
        ],
        dtype=np.dtype(object),
    )
    # the "ignore" tag is used to make the column being ignored; the original test selecting a few
    # columns induces a specific ordering which is kept for now
    var_data = {
        "Feature": [
            "Integer1",
            "Numeric1",
            "Numeric2",
            "Numeric3",
            "String1",
            "String2",
        ],
        "Type": ["Integer", "Numeric", "Numeric", "Numeric", "String", "String"],
        FEATURE_TYPE_KEY: [
            CATEGORICAL_TAG,
            NUMERIC_TAG,
            NUMERIC_TAG,
            "ignore",
            CATEGORICAL_TAG,
            CATEGORICAL_TAG,
        ],
    }
    adata = AnnData(
        X=X_data,
        obs=pd.DataFrame(data=obs_data),
        var=pd.DataFrame(data=var_data, index=var_data["Feature"]),
        uns=OrderedDict(),
    )

    adata = ep.pp.encode(adata, autodetect=True, encodings="label")

    return adata


def test_vars_checks(adata_to_norm):
    """Test for checks that vars argument is valid."""
    with pytest.raises(ValueError, match=r"Some selected vars are not numeric"):
        ep.pp.scale_norm(adata_to_norm, vars=["String1"])


# TODO: check this for each function, with just default settings?
@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        (da.array, None),
        (sparse.csr_matrix, NotImplementedError),
    ],
)
def test_norm_scale_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``scale_norm`` accepts and which it rejects."""
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.scale_norm(adata_to_norm)
    else:
        # Without this call the supported cases would pass without running anything.
        ep.pp.scale_norm(adata_to_norm)


@pytest.mark.parametrize("array_type", [np.array, da.array])
def test_norm_scale(adata_to_norm, array_type):
    """Test for the scaling normalization method."""
    warnings.filterwarnings("ignore")
    adata_to_norm.X = array_type(adata_to_norm.X)
    # First normalize in place, then once more into a copy; the expected values
    # below are asserted on the copy.
    ep.pp.scale_norm(adata_to_norm)

    adata_norm = ep.pp.scale_norm(adata_to_norm, copy=True)

    num1_norm = np.array([-1.4039999, 0.55506986, 0.84893], dtype=np.float32)
    num2_norm = np.array([-1.3587323, 1.0190493, 0.3396831], dtype=np.float32)

    assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
    assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
    assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)
    assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)


def test_norm_scale_integers(adata_mini_integers_in_X):
    """Test scaling normalization on integer-typed ``X``."""
    adata_norm = ep.pp.scale_norm(adata_mini_integers_in_X, copy=True)
    in_days_norm = np.array(
        [
            [-0.4472136],
            [0.4472136],
            [-1.34164079],
            [-0.4472136],
            [-1.34164079],
            [-0.4472136],
            [0.4472136],
            [1.34164079],
            [2.23606798],
            [-0.4472136],
            [0.4472136],
            [-0.4472136],
        ]
    )
    assert np.allclose(adata_norm.X, in_days_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_scale_kwargs(array_type, adata_to_norm):
    """Test that extra keyword arguments (``with_mean=False``) affect ``scale_norm``."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.scale_norm(adata_to_norm, copy=True, with_mean=False)

    num1_norm = np.array([3.3304186, 5.2894883, 5.5833483], dtype=np.float32)
    num2_norm = np.array([-0.6793662, 1.6984155, 1.0190493], dtype=np.float32)

    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_scale_group(array_type, adata_mini):
    """Test ``scale_norm`` with ``group_key``, including an invalid key."""
    adata_mini_casted = adata_mini.copy()
    adata_mini_casted.X = array_type(adata_mini_casted.X)

    with pytest.raises(KeyError):
        ep.pp.scale_norm(adata_mini_casted, group_key="invalid_key", copy=True)

    adata_mini_norm = ep.pp.scale_norm(
        adata_mini_casted,
        vars=["sys_bp_entry", "dia_bp_entry"],
        group_key="disease",
        copy=True,
    )
    col1_norm = np.array(
        [
            -1.34164079,
            -0.4472136,
            0.4472136,
            1.34164079,
            -1.34164079,
            -0.4472136,
            0.4472136,
            1.34164079,
        ]
    )
    col2_norm = col1_norm
    # Column 0 is not in ``vars`` and must be left untouched.
    assert np.allclose(adata_mini_norm.X[:, 0], adata_mini_casted.X[:, 0])
    assert np.allclose(adata_mini_norm.X[:, 1], col1_norm)
    assert np.allclose(adata_mini_norm.X[:, 2], col2_norm)


@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        (da.array, None),
        (sparse.csr_matrix, NotImplementedError),
    ],
)
def test_norm_minmax_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``minmax_norm`` accepts and which it rejects."""
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.minmax_norm(adata_to_norm)
    else:
        # Without this call the supported cases would pass without running anything.
        ep.pp.minmax_norm(adata_to_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_minmax(array_type, adata_to_norm):
    """Test for the minmax normalization method."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.minmax_norm(adata_to_norm, copy=True)

    num1_norm = np.array([0.0, 0.86956537, 0.9999999], dtype=np.float32)
    num2_norm = np.array([0.0, 1.0, 0.71428573], dtype=np.float32)

    assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
    assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
    assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)
    assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)


def test_norm_minmax_integers(adata_mini_integers_in_X):
    """Test minmax normalization on integer-typed ``X``."""
    adata_norm = ep.pp.minmax_norm(adata_mini_integers_in_X, copy=True)
    in_days_norm = np.array([[0.25], [0.5], [0.0], [0.25], [0.0], [0.25], [0.5], [0.75], [1.0], [0.25], [0.5], [0.25]])
    assert np.allclose(adata_norm.X, in_days_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_minmax_kwargs(array_type, adata_to_norm):
    """Test that extra keyword arguments (``feature_range``) affect ``minmax_norm``."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.minmax_norm(adata_to_norm, copy=True, feature_range=(0, 2))

    num1_norm = np.array([0.0, 1.7391307, 1.9999998], dtype=np.float32)
    num2_norm = np.array([0.0, 2.0, 1.4285715], dtype=np.float32)

    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_minmax_group(array_type, adata_mini):
    """Test ``minmax_norm`` with ``group_key``, including an invalid key."""
    adata_mini_casted = adata_mini.copy()
    adata_mini_casted.X = array_type(adata_mini_casted.X)

    with pytest.raises(KeyError):
        ep.pp.minmax_norm(adata_mini_casted, group_key="invalid_key", copy=True)

    adata_mini_norm = ep.pp.minmax_norm(
        adata_mini_casted,
        vars=["sys_bp_entry", "dia_bp_entry"],
        group_key="disease",
        copy=True,
    )
    col1_norm = np.array([0.0, 0.33333333, 0.66666667, 1.0, 0.0, 0.33333333, 0.66666667, 1.0])
    col2_norm = col1_norm
    # Column 0 is not in ``vars`` and must be left untouched.
    assert np.allclose(adata_mini_norm.X[:, 0], adata_mini_casted.X[:, 0])
    assert np.allclose(adata_mini_norm.X[:, 1], col1_norm)
    assert np.allclose(adata_mini_norm.X[:, 2], col2_norm)


@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        (da.array, NotImplementedError),
        (sparse.csr_matrix, NotImplementedError),
    ],
)
def test_norm_maxabs_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``maxabs_norm`` accepts and which it rejects."""
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.maxabs_norm(adata_to_norm)
    else:
        ep.pp.maxabs_norm(adata_to_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_maxabs(array_type, adata_to_norm):
    """Test for the maxabs normalization method."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    if "dask" in array_type.__name__:
        with pytest.raises(NotImplementedError):
            ep.pp.maxabs_norm(adata_to_norm, copy=True)

    else:
        adata_norm = ep.pp.maxabs_norm(adata_to_norm, copy=True)

        num1_norm = np.array([0.5964913, 0.94736844, 1.0], dtype=np.float32)
        num2_norm = np.array([-0.4, 1.0, 0.6], dtype=np.float32)

        assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
        assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
        assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
        assert np.allclose(adata_norm.X[:, 3], num1_norm)
        assert np.allclose(adata_norm.X[:, 4], num2_norm)
        assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)


def test_norm_maxabs_integers(adata_mini_integers_in_X):
    """Test maxabs normalization on integer-typed ``X``."""
    adata_norm = ep.pp.maxabs_norm(adata_mini_integers_in_X, copy=True)
    in_days_norm = np.array([[0.25], [0.5], [0.0], [0.25], [0.0], [0.25], [0.5], [0.75], [1.0], [0.25], [0.5], [0.25]])
    assert np.allclose(adata_norm.X, in_days_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_maxabs_group(array_type, adata_mini):
    """Test ``maxabs_norm`` with ``group_key``; dask input is expected to be rejected."""
    adata_mini_casted = adata_mini.copy()
    adata_mini_casted.X = array_type(adata_mini_casted.X)

    if "dask" in array_type.__name__:
        with pytest.raises(NotImplementedError):
            ep.pp.maxabs_norm(adata_mini_casted, copy=True)
    else:
        with pytest.raises(KeyError):
            ep.pp.maxabs_norm(adata_mini_casted, group_key="invalid_key", copy=True)

        adata_mini_norm = ep.pp.maxabs_norm(
            adata_mini_casted,
            vars=["sys_bp_entry", "dia_bp_entry"],
            group_key="disease",
            copy=True,
        )
        col1_norm = np.array(
            [
                0.9787234,
                0.9858156,
                0.9929078,
                1.0,
                0.98013245,
                0.98675497,
                0.99337748,
                1.0,
            ]
        )
        col2_norm = np.array([0.96296296, 0.97530864, 0.98765432, 1.0, 0.9625, 0.975, 0.9875, 1.0])
        # Column 0 is not in ``vars`` and must be left untouched.
        assert np.allclose(adata_mini_norm.X[:, 0], adata_mini_casted.X[:, 0])
        assert np.allclose(adata_mini_norm.X[:, 1], col1_norm)
        assert np.allclose(adata_mini_norm.X[:, 2], col2_norm)


@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        (da.array, None),
        (sparse.csr_matrix, NotImplementedError),
    ],
)
def test_norm_robust_scale_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``robust_scale_norm`` accepts and which it rejects."""
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.robust_scale_norm(adata_to_norm)
    else:
        # Without this call the supported cases would pass without running anything.
        ep.pp.robust_scale_norm(adata_to_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_robust_scale(array_type, adata_to_norm):
    """Test for the robust_scale normalization method."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.robust_scale_norm(adata_to_norm, copy=True)

    num1_norm = np.array([-1.73913043, 0.0, 0.26086957], dtype=np.float32)
    num2_norm = np.array([-1.4285715, 0.5714286, 0.0], dtype=np.float32)

    assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
    assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
    assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)
    assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)


def test_norm_robust_scale_integers(adata_mini_integers_in_X):
    """Test robust_scale normalization on integer-typed ``X``."""
    adata_norm = ep.pp.robust_scale_norm(adata_mini_integers_in_X, copy=True)
    in_days_norm = np.array([[0.0], [1.0], [-1.0], [0.0], [-1.0], [0.0], [1.0], [2.0], [3.0], [0.0], [1.0], [0.0]])
    assert np.allclose(adata_norm.X, in_days_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_robust_scale_kwargs(adata_to_norm, array_type):
    """Test that extra keyword arguments (``with_scaling=False``) affect ``robust_scale_norm``."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.robust_scale_norm(adata_to_norm, copy=True, with_scaling=False)

    num1_norm = np.array([-2.0, 0.0, 0.2999997], dtype=np.float32)
    num2_norm = np.array([-5.0, 2.0, 0.0], dtype=np.float32)

    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_robust_scale_group(array_type, adata_mini):
    """Test ``robust_scale_norm`` with ``group_key``, including an invalid key."""
    adata_mini_casted = adata_mini.copy()
    adata_mini_casted.X = array_type(adata_mini_casted.X)

    with pytest.raises(KeyError):
        ep.pp.robust_scale_norm(adata_mini_casted, group_key="invalid_key", copy=True)

    adata_mini_norm = ep.pp.robust_scale_norm(
        adata_mini_casted,
        vars=["sys_bp_entry", "dia_bp_entry"],
        group_key="disease",
        copy=True,
    )
    col1_norm = np.array(
        [-1.0, -0.33333333, 0.33333333, 1.0, -1.0, -0.33333333, 0.33333333, 1.0],
        dtype=np.float32,
    )
    col2_norm = col1_norm
    # Column 0 is not in ``vars`` and must be left untouched.
    assert np.allclose(adata_mini_norm.X[:, 0], adata_mini_casted.X[:, 0])
    assert np.allclose(adata_mini_norm.X[:, 1], col1_norm)
    assert np.allclose(adata_mini_norm.X[:, 2], col2_norm)


@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        (da.array, None),
        (sparse.csr_matrix, NotImplementedError),
    ],
)
def test_norm_quantile_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``quantile_norm`` accepts and which it rejects."""
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.quantile_norm(adata_to_norm)
    else:
        # Without this call the supported cases would pass without running anything.
        ep.pp.quantile_norm(adata_to_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_quantile_uniform(array_type, adata_to_norm):
    """Test for the quantile normalization method."""
    warnings.filterwarnings("ignore", category=UserWarning)
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.quantile_norm(adata_to_norm, copy=True)

    num1_norm = np.array([0.0, 0.5, 1.0], dtype=np.float32)
    num2_norm = np.array([0.0, 1.0, 0.5], dtype=np.float32)

    assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
    assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
    assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)
    assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)


def test_norm_quantile_integers(adata_mini_integers_in_X):
    """Test quantile normalization on integer-typed ``X``."""
    adata_norm = ep.pp.quantile_norm(adata_mini_integers_in_X, copy=True)
    in_days_norm = np.array(
        [
            [0.36363636],
            [0.72727273],
            [0.0],
            [0.36363636],
            [0.0],
            [0.36363636],
            [0.72727273],
            [0.90909091],
            [1.0],
            [0.36363636],
            [0.72727273],
            [0.36363636],
        ]
    )
    assert np.allclose(adata_norm.X, in_days_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_quantile_uniform_kwargs(array_type, adata_to_norm):
    """Test that extra keyword arguments (``output_distribution``) affect ``quantile_norm``."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    adata_norm = ep.pp.quantile_norm(adata_to_norm, copy=True, output_distribution="normal")

    num1_norm = np.array([-5.19933758, 0.0, 5.19933758], dtype=np.float32)
    num2_norm = np.array([-5.19933758, 5.19933758, 0.0], dtype=np.float32)

    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_quantile_uniform_group(array_type, adata_mini):
    """Test ``quantile_norm`` with ``group_key``, including an invalid key."""
    adata_mini_casted = adata_mini.copy()
    adata_mini_casted.X = array_type(adata_mini_casted.X)

    with pytest.raises(KeyError):
        ep.pp.quantile_norm(adata_mini_casted, group_key="invalid_key", copy=True)

    adata_mini_norm = ep.pp.quantile_norm(
        adata_mini_casted,
        vars=["sys_bp_entry", "dia_bp_entry"],
        group_key="disease",
        copy=True,
    )
    col1_norm = np.array(
        [0.0, 0.33333333, 0.66666667, 1.0, 0.0, 0.33333333, 0.66666667, 1.0],
        dtype=np.float32,
    )
    col2_norm = col1_norm
    # Column 0 is not in ``vars`` and must be left untouched.
    assert np.allclose(adata_mini_norm.X[:, 0], adata_mini_casted.X[:, 0])
    assert np.allclose(adata_mini_norm.X[:, 1], col1_norm)
    assert np.allclose(adata_mini_norm.X[:, 2], col2_norm)


@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        # The other power_norm tests in this module expect NotImplementedError
        # for dask input, so the same expectation is stated here.
        (da.array, NotImplementedError),
        (sparse.csr_matrix, NotImplementedError),
    ],
)
def test_norm_power_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``power_norm`` accepts and which it rejects."""
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.power_norm(adata_to_norm)
    else:
        # Without this call the supported cases would pass without running anything.
        ep.pp.power_norm(adata_to_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_power(array_type, adata_to_norm):
    """Test for the power transformation normalization method."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    if "dask" in array_type.__name__:
        with pytest.raises(NotImplementedError):
            ep.pp.power_norm(adata_to_norm, copy=True)
    else:
        adata_norm = ep.pp.power_norm(adata_to_norm, copy=True)

        num1_norm = np.array([-1.3821232, 0.43163615, 0.950487], dtype=np.float32)
        num2_norm = np.array([-1.340104, 1.0613203, 0.27878374], dtype=np.float32)

        assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
        assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
        assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
        # NOTE(review): rtol=1.1 makes these two comparisons very permissive;
        # presumably for the same dependency-version reason given in
        # test_norm_power_group - confirm before tightening.
        assert np.allclose(adata_norm.X[:, 3], num1_norm, rtol=1.1)
        assert np.allclose(adata_norm.X[:, 4], num2_norm, rtol=1.1)
        assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)


def test_norm_power_integers(adata_mini_integers_in_X):
    """Test power normalization on integer-typed ``X``."""
    adata_norm = ep.pp.power_norm(adata_mini_integers_in_X, copy=True)
    in_days_norm = np.array(
        [
            [-0.31234142],
            [0.58319338],
            [-1.65324303],
            [-0.31234142],
            [-1.65324303],
            [-0.31234142],
            [0.58319338],
            [1.27419965],
            [1.8444134],
            [-0.31234142],
            [0.58319338],
            [-0.31234142],
        ]
    )
    assert np.allclose(adata_norm.X, in_days_norm)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_power_kwargs(array_type, adata_to_norm):
    """Test that extra keyword arguments (``method``, ``standardize``) affect ``power_norm``."""
    adata_to_norm.X = array_type(adata_to_norm.X)

    if "dask" in array_type.__name__:
        with pytest.raises(NotImplementedError):
            ep.pp.power_norm(adata_to_norm, copy=True)
    else:
        with pytest.raises(ValueError):
            ep.pp.power_norm(adata_to_norm, copy=True, method="box-cox")

        adata_norm = ep.pp.power_norm(adata_to_norm, copy=True, standardize=False)

        num1_norm = np.array([201.03636, 1132.8341, 1399.3877], dtype=np.float32)
        num2_norm = np.array([-1.8225479, 5.921072, 3.397709], dtype=np.float32)

        assert np.allclose(adata_norm.X[:, 3], num1_norm, rtol=1e-02, atol=1e-02)
        assert np.allclose(adata_norm.X[:, 4], num2_norm, rtol=1e-02, atol=1e-02)


@pytest.mark.parametrize("array_type", ARRAY_TYPES)
def test_norm_power_group(array_type, adata_mini):
    """Test ``power_norm`` with ``group_key``; dask input is expected to be rejected."""
    adata_mini_casted = adata_mini.copy()
    adata_mini_casted.X = array_type(adata_mini_casted.X)

    if "dask" in array_type.__name__:
        with pytest.raises(NotImplementedError):
            ep.pp.power_norm(adata_mini_casted, copy=True)
    else:
        with pytest.raises(KeyError):
            ep.pp.power_norm(adata_mini_casted, group_key="invalid_key", copy=True)

        adata_mini_norm = ep.pp.power_norm(
            adata_mini_casted,
            vars=["sys_bp_entry", "dia_bp_entry"],
            group_key="disease",
            copy=True,
        )
        col1_norm = np.array(
            [
                -1.34266204,
                -0.44618949,
                0.44823148,
                1.34062005,
                -1.34259417,
                -0.44625773,
                0.44816403,
                1.34068786,
            ],
            dtype=np.float32,
        )
        # One-dimensional like col1_norm, so it lines up with the column slice
        # it is compared against without relying on broadcasting.
        col2_norm = np.array(
            [
                -1.3650659,
                -0.41545486,
                0.45502198,
                1.3254988,
                -1.3427324,
                -0.4461177,
                0.44829938,
                1.3405508,
            ],
            dtype=np.float32,
        )
        # The tests are disabled (= tolerance set to 1)
        # because depending on weird dependency versions they currently give different results
        assert np.allclose(adata_mini_norm.X[:, 0], adata_mini_casted.X[:, 0], rtol=1, atol=1)
        assert np.allclose(adata_mini_norm.X[:, 1], col1_norm, rtol=1, atol=1)
        assert np.allclose(adata_mini_norm.X[:, 2], col2_norm, rtol=1, atol=1)


@pytest.mark.parametrize(
    "array_type,expected_error",
    [
        (np.array, None),
        (da.array, None),
        (sparse.csr_matrix, None),
    ],
)
def test_norm_log_norm_array_types(adata_to_norm, array_type, expected_error):
    """Check which array types ``log_norm`` rejects.

    NOTE(review): every case currently expects no error, so ``log_norm`` is
    never called here and the test only exercises the array conversion. A bare
    ``ep.pp.log_norm(adata_to_norm)`` cannot simply be added for the success
    cases, because ``test_norm_log1p`` shows that call raises ``ValueError``
    on this fixture. TODO: add a success-path call on suitable data.
    """
    adata_to_norm.X = array_type(adata_to_norm.X)
    if expected_error:
        with pytest.raises(expected_error):
            ep.pp.log_norm(adata_to_norm)


def test_norm_log1p(adata_to_norm):
    """Test for the log normalization method."""
    # Ensure that some test data is strictly positive
    log_adata = adata_to_norm.copy()
    log_adata.X[0, 4] = 1

    adata_norm = ep.pp.log_norm(log_adata, copy=True)

    num1_norm = np.array([1.4816046, 1.856298, 1.9021075], dtype=np.float32)
    num2_norm = np.array([0.6931472, 1.7917595, 1.3862944], dtype=np.float32)

    assert np.array_equal(adata_norm.X[:, 0], adata_to_norm.X[:, 0])
    assert np.array_equal(adata_norm.X[:, 1], adata_to_norm.X[:, 1])
    assert np.array_equal(adata_norm.X[:, 2], adata_to_norm.X[:, 2])
    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)
    assert np.allclose(adata_norm.X[:, 5], adata_to_norm.X[:, 5], equal_nan=True)

    # Check alternative base works
    adata_norm = ep.pp.log_norm(log_adata, base=10, copy=True)

    # Expected values are the natural-log results divided by ln(10).
    num1_norm = np.divide(np.array([1.4816046, 1.856298, 1.9021075], dtype=np.float32), np.log(10))
    num2_norm = np.divide(np.array([0.6931472, 1.7917595, 1.3862944], dtype=np.float32), np.log(10))

    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)

    # Check alternative offset works
    adata_norm = ep.pp.log_norm(log_adata, offset=0.5, copy=True)

    num1_norm = np.array([1.3609766, 1.7749524, 1.8245492], dtype=np.float32)
    num2_norm = np.array([0.4054651, 1.7047482, 1.252763], dtype=np.float32)

    assert np.allclose(adata_norm.X[:, 3], num1_norm)
    assert np.allclose(adata_norm.X[:, 4], num2_norm)

    # The unmodified fixture holds -2.0 in Numeric2: an offset of 3 must be
    # accepted, while the default offset and an offset of 1 must be rejected.
    try:
        ep.pp.log_norm(adata_to_norm, vars="Numeric2", offset=3, copy=True)
    except ValueError:
        pytest.fail("Unexpected ValueError exception was raised.")

    with pytest.raises(ValueError):
        ep.pp.log_norm(adata_to_norm, copy=True)

    with pytest.raises(ValueError):
        ep.pp.log_norm(adata_to_norm, vars="Numeric2", offset=1, copy=True)


def test_norm_record(adata_to_norm):
    """Test for logging of applied normalization methods."""
    adata_norm = ep.pp.minmax_norm(adata_to_norm, copy=True)

    assert adata_norm.uns["normalization"] == {
        "Numeric1": ["minmax"],
        "Numeric2": ["minmax"],
    }

    adata_norm = ep.pp.maxabs_norm(adata_norm, vars=["Numeric1"], copy=True)

    assert adata_norm.uns["normalization"] == {
        "Numeric1": ["minmax", "maxabs"],
        "Numeric2": ["minmax"],
    }


def test_offset_negative_values():
    """Test for the offset_negative_values method."""
    to_offset_adata = AnnData(X=np.array([[-1, -5, -10], [5, 6, -20]], dtype=np.float32))
    expected_adata = AnnData(X=np.array([[19, 15, 10], [25, 26, 0]], dtype=np.float32))

    assert np.array_equal(expected_adata.X, ep.pp.offset_negative_values(to_offset_adata, copy=True).X)


def test_norm_numerical_only():
    """Test for the log_norm method."""
    to_normalize_adata = AnnData(X=np.array([[1, 0, 0], [0, 0, 1]], dtype=np.float32))
    expected_adata = AnnData(X=np.array([[0.6931472, 0, 0], [0, 0, 0.6931472]], dtype=np.float32))

    # Logarithm results are floating point; compare with a tolerance rather
    # than requiring bit-for-bit equality.
    assert np.allclose(expected_adata.X, ep.pp.log_norm(to_normalize_adata, copy=True).X)