Diff of /test/test_utils.py [000000] .. [433586]

Switch to unified view

a b/test/test_utils.py
1
import pytest
2
import numpy as np
3
import pandas as pd
4
from scipy import spatial, cluster
5
6
from maui import utils
7
8
9
def test_merge_factors():
    """merge_factors should collapse groups of near-identical latent factors."""
    # Columns LF0/LF1/LF2 are identical across samples and LF3/LF7 are
    # near-identical, so merging should reduce 9 factors to 6.
    factors = pd.DataFrame(
        [
            [1, 1, 1, 0, 0, 0, 1, 0, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 1, 0, 0, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 1, 1, 1, 0],
        ],
        index=[f"sample {i}" for i in range(11)],
        columns=[f"LF{i}" for i in range(9)],
        dtype=float,
    )

    merged = utils.merge_factors(factors, metric="euclidean", plot_dendro=False)

    assert merged.shape[1] == 6
    # Merged factors are named by joining the merged column indices.
    assert "0_1_2" in merged.columns
    assert "3_7" in merged.columns
34
35
36
def test_merge_factors_with_custom_linkage():
    """merge_factors should accept a pre-computed linkage matrix via ``l=``."""
    # Same fixture as test_merge_factors: LF0/LF1/LF2 and LF3/LF7 should merge.
    factors = pd.DataFrame(
        [
            [1, 1, 1, 0, 0, 0, 1, 0, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 1, 0, 0, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 1, 1, 1, 0],
        ],
        index=[f"sample {i}" for i in range(11)],
        columns=[f"LF{i}" for i in range(9)],
        dtype=float,
    )

    # Average-linkage clustering of the factor columns on Minkowski distances.
    # Local renamed from ``l`` (ambiguous per E741); the keyword stays ``l=``
    # because that is the parameter name in the utils API.
    linkage_matrix = cluster.hierarchy.linkage(
        spatial.distance.pdist(factors.T, "minkowski"), "average"
    )

    merged = utils.merge_factors(factors, l=linkage_matrix, plot_dendro=False)

    assert merged.shape[1] == 6
    assert "0_1_2" in merged.columns
    assert "3_7" in merged.columns
63
64
65
def test_filter_factors_by_r2():
    """Factors that explain no variance in the data should be dropped."""
    latent = pd.DataFrame(
        [[0, 1, 2], [1, 0, 2]],
        index=["sample 1", "sample 2"],
        columns=["LF1", "LF2", "LF3"],
    )

    # Features-by-samples matrix (note the transpose). LF3 is constant
    # across both samples, so it cannot explain any variance.
    features = pd.DataFrame(
        [[1.0, 1.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]],
        columns=[f"feature{i}" for i in range(6)],
        index=["sample 1", "sample 2"],
    ).T

    kept = utils.filter_factors_by_r2(latent, features)
    assert list(kept.columns) == ["LF1", "LF2"]
80
81
def test_map_factors_to_feaures_using_linear_models():
    """Linear models should recover feature weights separating the factors.

    The function name mirrors the (misspelled) ``utils`` API name on purpose.
    """
    latent = pd.DataFrame(
        [[0, 1], [1, 0]], index=["sample 1", "sample 2"], columns=["LF1", "LF2"]
    )

    # Features-by-samples: the first three features track sample 1, the
    # last three track sample 2.
    features = pd.DataFrame(
        [[1.0, 1.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]],
        columns=[f"feature{i}" for i in range(6)],
        index=["sample 1", "sample 2"],
    ).T

    # Three features weighted (-2, 2), three weighted (2, -2).
    expected = np.array([[-2.0, 2.0]] * 3 + [[2.0, -2.0]] * 3)

    weights = utils.map_factors_to_feaures_using_linear_models(latent, features)

    assert np.allclose(weights, expected)
99
100
101
def test_correlate_factors_and_features():
    """Factor/feature correlations should be exactly +/-1 on this fixture."""
    latent = pd.DataFrame(
        [[0, 1], [1, 0]], index=["sample 1", "sample 2"], columns=["LF1", "LF2"]
    )

    # Samples-by-features here (no transpose, unlike the linear-model test).
    features = pd.DataFrame(
        [[1, 1, 1, 0, 0, 0], [0, 0, 0, 1, 1, 1]],
        columns=[f"feature{i}" for i in range(6)],
        index=["sample 1", "sample 2"],
    )

    # Perfect anti-/correlation: three features at (-1, 1), three at (1, -1).
    expected = np.array([[-1.0, 1.0]] * 3 + [[1.0, -1.0]] * 3)

    corrs = utils.correlate_factors_and_features(latent, features)

    assert np.allclose(corrs, expected)
119
120
121
def test_compute_roc():
    """compute_roc should return per-class ROC curves from CV predictions."""
    np.random.seed(0)  # fixes the stochastic parts of CV — presumably fold shuffling
    latent = pd.DataFrame(
        [
            [0, 1, 1, 1, 0, 1, 1, 0, 0],
            [1, 0, 0, 0, 0, 0, 1, 1, 0],
            [1, 0, 1, 0, 0, 0, 1, 1, 0],
            [1, 0, 0, 1, 0, 0, 1, 1, 0],
            [1, 0, 0, 0, 1, 1, 1, 1, 0],
            [1, 1, 1, 0, 0, 0, 1, 1, 1],
        ],
        index=[f"sample {i}" for i in range(6)],
        columns=[f"LF{i}" for i in range(9)],
    )
    labels = pd.Series(["a", "b", "a", "c", "b", "c"], index=latent.index)

    curves = utils.compute_roc(latent, labels, cv_folds=2)
    # Expected FPR values are pinned by the seed above.
    assert np.allclose(curves["a"].FPR, [0.0, 0.5, 0.5, 0.75, 1.0])
139
140
141
def test_compute_auc():
    """auc should integrate this step-shaped ROC curve to exactly 0.75."""
    fpr = [0.0, 0.0, 0.5, 0.5, 1.0]
    tpr = [0.0, 0.5, 0.5, 1.0, 1.0]
    auc_value = utils.auc(fpr, tpr)
    # BUG FIX: the original one-sided check ``auc_value - 0.75 < 1e-6``
    # passed for ANY value below 0.75; take the absolute difference so
    # both too-small and too-large results fail.
    assert abs(auc_value - 0.75) < 1e-6
146
147
148
def test_estimate_km():
    """estimate_kaplan_meier should return one survival column per class."""
    # FIX: seed the global RNG — the original drew unseeded random survival
    # data, making the test nondeterministic across runs.
    np.random.seed(0)
    yhat = pd.Series(
        ["a", "a", "a", "b", "b", "b"], index=[f"Sample {i}" for i in range(6)]
    )
    # Random event times and censoring indicators (reproducible via the seed;
    # ``duration`` is drawn before ``observed`` to preserve the RNG sequence).
    durations = np.random.poisson(6, 6)
    observed = np.random.randn(6) > 0.1
    survival = pd.DataFrame(
        dict(duration=durations, observed=observed),
        index=[f"Sample {i}" for i in range(6)],
    )
    km = utils.estimate_kaplan_meier(yhat, survival)

    # One Kaplan-Meier curve per predicted class label.
    assert "a" in km.columns
    assert "b" in km.columns
162
163
164
def test_multivariate_logrank_test():
    """multivariate_logrank_test should return a valid test statistic and p-value."""
    # FIX: seed the global RNG — the original drew unseeded random survival
    # data, making the test nondeterministic across runs.
    np.random.seed(0)
    yhat = pd.Series(
        ["a", "a", "a", "b", "b", "b"], index=[f"Sample {i}" for i in range(6)]
    )
    # Random event times and censoring indicators (reproducible via the seed;
    # ``duration`` is drawn before ``observed`` to preserve the RNG sequence).
    durations = np.random.poisson(6, 6)
    observed = np.random.randn(6) > 0.1
    survival = pd.DataFrame(
        dict(duration=durations, observed=observed),
        index=[f"Sample {i}" for i in range(6)],
    )
    test_stat, p_val = utils.multivariate_logrank_test(yhat, survival)
    # A p-value can never exceed 1; this is a smoke-test sanity bound.
    assert p_val <= 1.0
176
177
178
def test_select_clinical_factors():
    """Only factors associated with survival should be selected."""
    # LF0-LF2 split the samples into two groups; the remaining factors do not.
    latent = pd.DataFrame(
        [
            [1, 1, 1, 0, 0, 0, 1, 0, 1],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 1, 0, 0, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 1, 1, 1, 1],
        ],
        index=[f"sample {i}" for i in range(11)],
        columns=[f"LF{i}" for i in range(9)],
    )

    # The first six samples have short survival, the last five much longer,
    # matching the group structure in LF0-LF2. All events are observed.
    durations = [1, 2, 3, 4, 5, 6] + [1000 * i for i in range(1, 6)]
    survival = pd.DataFrame(
        {"duration": durations, "observed": [True] * 11},
        index=[f"sample {i}" for i in range(11)],
    )

    selected = utils.select_clinical_factors(
        latent, survival, cox_penalizer=1, alpha=0.1
    )

    for factor in ("LF0", "LF1", "LF2"):
        assert factor in selected.columns
    for factor in ("LF3", "LF4", "LF5"):
        assert factor not in selected.columns
224
225
226
def test_compute_harrells_c():
    """compute_harrells_c should report per-fold concordance for clinical factors."""
    # Same fixture as test_select_clinical_factors: LF0-LF2 split the
    # samples into two survival groups; the other factors do not.
    latent = pd.DataFrame(
        [
            [1, 1, 1, 0, 0, 0, 1, 0, 1],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 0, 1, 1, 1, 0],
            [1, 1, 1, 1, 1, 0, 0, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 0, 1, 1, 0],
            [0, 0, 0, 1, 0, 1, 1, 1, 1],
        ],
        index=[f"sample {i}" for i in range(11)],
        columns=[f"LF{i}" for i in range(9)],
    )

    # Short survival for the first six samples, much longer for the last
    # five; every event is observed (no censoring).
    durations = [1, 2, 3, 4, 5, 6] + [1000 * i for i in range(1, 6)]
    survival = pd.DataFrame(
        {"duration": durations, "observed": [True] * 11},
        index=[f"sample {i}" for i in range(11)],
    )

    clinical = utils.select_clinical_factors(
        latent, survival, cox_penalizer=1, alpha=0.1
    )

    np.random.seed(0)  # pins the CV split so the per-fold values below hold
    c_indices = utils.compute_harrells_c(clinical, survival, cv_folds=3)
    assert np.allclose(c_indices, [0.5, 0.8, 0.5], atol=0.05)