Diff of /test/test_utils.py [000000] .. [433586]

Switch to side-by-side view

--- a
+++ b/test/test_utils.py
@@ -0,0 +1,267 @@
+import pytest
+import numpy as np
+import pandas as pd
+from scipy import spatial, cluster
+
+from maui import utils
+
+
+def test_merge_factors():
+    """Check that ``merge_factors`` collapses duplicated latent factors.
+
+    In the fixture, columns LF0-LF2 hold identical values and so do LF3 and
+    LF7. The test expects the nine input factors to come back as six, with
+    merged columns named ``"0_1_2"`` and ``"3_7"``.
+    """
+    rows = [
+        [1, 1, 1, 0, 0, 0, 1, 0, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 1, 0, 0, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 1, 1, 1, 0],
+    ]
+    sample_names = [f"sample {idx}" for idx in range(len(rows))]
+    factor_names = [f"LF{idx}" for idx in range(len(rows[0]))]
+    factors = pd.DataFrame(
+        rows, index=sample_names, columns=factor_names, dtype=float
+    )
+
+    merged = utils.merge_factors(factors, metric="euclidean", plot_dendro=False)
+    merged_names = merged.columns
+
+    # 9 factors - 3 (LF0..LF2) - 2 (LF3, LF7) + 2 merged columns = 6
+    assert merged.shape[1] == 6
+    assert "0_1_2" in merged_names
+    assert "3_7" in merged_names
+
+
+def test_merge_factors_with_custom_linkage():
+    """Check ``merge_factors`` when the caller supplies its own linkage.
+
+    The linkage is computed here from Minkowski distances between the factor
+    columns using average linkage, then handed over through the ``l``
+    argument. Columns LF0-LF2 are identical, as are LF3 and LF7, so the same
+    six-column result with ``"0_1_2"`` and ``"3_7"`` is expected.
+    """
+    rows = [
+        [1, 1, 1, 0, 0, 0, 1, 0, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 1, 0, 0, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 1, 1, 1, 0],
+    ]
+    sample_names = [f"sample {idx}" for idx in range(len(rows))]
+    factor_names = [f"LF{idx}" for idx in range(len(rows[0]))]
+    factors = pd.DataFrame(
+        rows, index=sample_names, columns=factor_names, dtype=float
+    )
+
+    # Factors are the columns, so transpose to get one observation per factor.
+    factor_distances = spatial.distance.pdist(factors.T, "minkowski")
+    linkage_matrix = cluster.hierarchy.linkage(factor_distances, "average")
+
+    merged = utils.merge_factors(factors, l=linkage_matrix, plot_dendro=False)
+    merged_names = merged.columns
+
+    assert merged.shape[1] == 6
+    assert "0_1_2" in merged_names
+    assert "3_7" in merged_names
+
+
+def test_filter_factors_by_r2():
+    """Check which factors ``filter_factors_by_r2`` keeps on a toy input.
+
+    LF1 and LF2 differ between the two samples while LF3 is constant (2, 2).
+    The test expects only LF1 and LF2 to remain.
+    """
+    sample_names = ["sample 1", "sample 2"]
+
+    dummy_z = pd.DataFrame(
+        [[0, 1, 2], [1, 0, 2]],
+        index=sample_names,
+        columns=["LF1", "LF2", "LF3"],
+    )
+
+    # Built as samples x features, then transposed so features are the rows.
+    samples_by_features = pd.DataFrame(
+        [[1.0, 1.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]],
+        columns=[f"feature{idx}" for idx in range(6)],
+        index=sample_names,
+    )
+    dummy_x = samples_by_features.transpose()
+
+    kept = utils.filter_factors_by_r2(dummy_z, dummy_x)
+    assert kept.columns.tolist() == ["LF1", "LF2"]
+
+
+def test_map_factors_to_feaures_using_linear_models():
+    """Check the factor-to-feature weights computed on a two-sample toy input.
+
+    The first three features are 1 in sample 1 and 0 in sample 2, and the
+    last three are the reverse. The expected weights are (-2, 2) for the
+    first three features and (2, -2) for the last three.
+    """
+    sample_names = ["sample 1", "sample 2"]
+
+    dummy_z = pd.DataFrame(
+        [[0, 1], [1, 0]], index=sample_names, columns=["LF1", "LF2"]
+    )
+
+    # Built as samples x features, then transposed so features are the rows.
+    samples_by_features = pd.DataFrame(
+        [[1.0, 1.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]],
+        columns=[f"feature{idx}" for idx in range(6)],
+        index=sample_names,
+    )
+    dummy_x = samples_by_features.transpose()
+
+    expected_w = np.array([[-2.0, 2.0]] * 3 + [[2.0, -2.0]] * 3)
+
+    weights = utils.map_factors_to_feaures_using_linear_models(dummy_z, dummy_x)
+
+    assert np.allclose(weights, expected_w)
+
+
+def test_correlate_factors_and_features():
+    """Check factor/feature correlations on a two-sample toy input.
+
+    The first three features are 1 in sample 1 and 0 in sample 2, and the
+    last three are the reverse. The expected correlations are (-1, 1) for the
+    first three features and (1, -1) for the last three.
+    """
+    sample_names = ["sample 1", "sample 2"]
+
+    dummy_z = pd.DataFrame(
+        [[0, 1], [1, 0]], index=sample_names, columns=["LF1", "LF2"]
+    )
+
+    # Unlike the linear-model tests, the features stay as columns here.
+    dummy_x = pd.DataFrame(
+        [[1, 1, 1, 0, 0, 0], [0, 0, 0, 1, 1, 1]],
+        columns=[f"feature{idx}" for idx in range(6)],
+        index=sample_names,
+    )
+
+    expected_corrs = np.array([[-1.0, 1.0]] * 3 + [[1.0, -1.0]] * 3)
+
+    observed_corrs = utils.correlate_factors_and_features(dummy_z, dummy_x)
+
+    assert np.allclose(observed_corrs, expected_corrs)
+
+
+def test_compute_roc():
+    """Check the false-positive rates ``compute_roc`` reports for class "a".
+
+    The global NumPy RNG is seeded first so the run is repeatable; the
+    expected FPR values are pinned to that seed.
+    """
+    np.random.seed(0)
+
+    factor_rows = [
+        [0, 1, 1, 1, 0, 1, 1, 0, 0],
+        [1, 0, 0, 0, 0, 0, 1, 1, 0],
+        [1, 0, 1, 0, 0, 0, 1, 1, 0],
+        [1, 0, 0, 1, 0, 0, 1, 1, 0],
+        [1, 0, 0, 0, 1, 1, 1, 1, 0],
+        [1, 1, 1, 0, 0, 0, 1, 1, 1],
+    ]
+    dummy_z = pd.DataFrame(
+        factor_rows,
+        index=[f"sample {idx}" for idx in range(len(factor_rows))],
+        columns=[f"LF{idx}" for idx in range(len(factor_rows[0]))],
+    )
+    # Three classes, two samples each.
+    dummy_y = pd.Series(list("abacbc"), index=dummy_z.index)
+
+    roc_curves = utils.compute_roc(dummy_z, dummy_y, cv_folds=2)
+
+    expected_fpr = [0.0, 0.5, 0.5, 0.75, 1.0]
+    assert np.allclose(roc_curves["a"].FPR, expected_fpr)
+
+
+def test_compute_auc():
+    """Check ``utils.auc`` on a small, hand-computable step-shaped curve.
+
+    The area under the points below is 0.5 * 0.5 + 0.5 * 1.0 = 0.75.
+    """
+    fpr = [0.0, 0.0, 0.5, 0.5, 1.0]
+    tpr = [0.0, 0.5, 0.5, 1.0, 1.0]
+    roc = utils.auc(fpr, tpr)
+    # Compare the absolute difference: a signed difference would also accept
+    # every value below 0.75, so an AUC of 0.0 would pass.
+    assert abs(roc - 0.75) < 1e-6
+
+
+def test_estimate_km():
+    """Check that ``estimate_kaplan_meier`` returns one column per group.
+
+    Six samples are split into groups "a" and "b" with randomly drawn
+    durations and event indicators; the result is expected to have a column
+    for each group label.
+    """
+    # Seed the global RNG so the random survival data is the same on every
+    # run, as the other tests in this module that use randomness do.
+    np.random.seed(0)
+
+    sample_names = [f"Sample {i}" for i in range(6)]
+    yhat = pd.Series(["a", "a", "a", "b", "b", "b"], index=sample_names)
+    durations = np.random.poisson(6, 6)
+    observed = np.random.randn(6) > 0.1
+    survival = pd.DataFrame(
+        dict(duration=durations, observed=observed),
+        index=sample_names,
+    )
+    km = utils.estimate_kaplan_meier(yhat, survival)
+
+    assert "a" in km.columns
+    assert "b" in km.columns
+
+
+def test_multivariate_logrank_test():
+    """Check that ``multivariate_logrank_test`` returns a valid p-value.
+
+    Six samples are split into groups "a" and "b" with randomly drawn
+    durations and event indicators. The call must return a
+    ``(statistic, p-value)`` pair whose p-value lies in ``[0, 1]``.
+    """
+    # Seed the global RNG so the random survival data is the same on every
+    # run, as the other tests in this module that use randomness do.
+    np.random.seed(0)
+
+    sample_names = [f"Sample {i}" for i in range(6)]
+    yhat = pd.Series(["a", "a", "a", "b", "b", "b"], index=sample_names)
+    durations = np.random.poisson(6, 6)
+    observed = np.random.randn(6) > 0.1
+    survival = pd.DataFrame(
+        dict(duration=durations, observed=observed),
+        index=sample_names,
+    )
+    # Unpacking also verifies that exactly two values are returned.
+    _test_stat, p_val = utils.multivariate_logrank_test(yhat, survival)
+    # A p-value is a probability, so check both bounds (NaN fails as well).
+    assert 0.0 <= p_val <= 1.0
+
+
+def test_select_clinical_factors():
+    """Check that ``select_clinical_factors`` keeps survival-related factors.
+
+    LF0-LF2 are 1 for samples 0-5 and 0 for samples 6-10, which is exactly
+    the split between short and long durations below. The test expects
+    LF0-LF2 to be selected and LF3-LF5 to be dropped.
+    """
+    factor_rows = [
+        [1, 1, 1, 0, 0, 0, 1, 0, 1],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 1, 0, 0, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 1, 1, 1, 1],
+    ]
+    sample_names = [f"sample {idx}" for idx in range(len(factor_rows))]
+    dummy_z = pd.DataFrame(
+        factor_rows,
+        index=sample_names,
+        columns=[f"LF{idx}" for idx in range(len(factor_rows[0]))],
+    )
+
+    # Samples 0-5 get short durations (1..6), samples 6-10 long ones
+    # (1000..5000).
+    short_durations = list(range(1, 7))
+    long_durations = list(range(1000, 5001, 1000))
+    durations = short_durations + long_durations
+    observed = [True] * len(durations)  # every event is observed
+    survival = pd.DataFrame(
+        {"duration": durations, "observed": observed},
+        index=sample_names,
+    )
+
+    z_clinical = utils.select_clinical_factors(
+        dummy_z, survival, cox_penalizer=1, alpha=0.1
+    )
+    selected = z_clinical.columns
+
+    for expected_name in ("LF0", "LF1", "LF2"):
+        assert expected_name in selected
+
+    for rejected_name in ("LF3", "LF4", "LF5"):
+        assert rejected_name not in selected
+
+
+def test_compute_harrells_c():
+    """Check the per-fold Harrell's C values on the clinical toy data set.
+
+    Factors are first narrowed down with ``select_clinical_factors``. The RNG
+    is then seeded and ``compute_harrells_c`` is run with three folds; its
+    output is compared against values pinned to that seed, within 0.05.
+    """
+    factor_rows = [
+        [1, 1, 1, 0, 0, 0, 1, 0, 1],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 0, 1, 1, 1, 0],
+        [1, 1, 1, 1, 1, 0, 0, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 0, 1, 1, 0],
+        [0, 0, 0, 1, 0, 1, 1, 1, 1],
+    ]
+    sample_names = [f"sample {idx}" for idx in range(len(factor_rows))]
+    dummy_z = pd.DataFrame(
+        factor_rows,
+        index=sample_names,
+        columns=[f"LF{idx}" for idx in range(len(factor_rows[0]))],
+    )
+
+    # Samples 0-5 get short durations (1..6), samples 6-10 long ones
+    # (1000..5000).
+    short_durations = list(range(1, 7))
+    long_durations = list(range(1000, 5001, 1000))
+    durations = short_durations + long_durations
+    observed = [True] * len(durations)  # every event is observed
+    survival = pd.DataFrame(
+        {"duration": durations, "observed": observed},
+        index=sample_names,
+    )
+    z_clinical = utils.select_clinical_factors(
+        dummy_z, survival, cox_penalizer=1, alpha=0.1
+    )
+
+    # Seed only after factor selection, immediately before the CV run.
+    np.random.seed(0)
+    fold_scores = utils.compute_harrells_c(z_clinical, survival, cv_folds=3)
+    assert np.allclose(fold_scores, [0.5, 0.8, 0.5], atol=0.05)