[e5f1db]: / tests / tools / test_sa.py

Download this file

132 lines (107 with data), 5.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import numpy as np
import pytest
import statsmodels
from lifelines import (
CoxPHFitter,
KaplanMeierFitter,
LogLogisticAFTFitter,
NelsonAalenFitter,
WeibullAFTFitter,
WeibullFitter,
)
import ehrapy as ep
@pytest.fixture
def mimic_2_sa():
adata = ep.dt.mimic_2(encoded=False)
adata[:, ["censor_flg"]].X = np.where(adata[:, ["censor_flg"]].X == 0, 1, 0)
adata = adata[:, ["mort_day_censored", "censor_flg"]].copy()
duration_col, event_col = "mort_day_censored", "censor_flg"
return adata, duration_col, event_col
class TestSA:
def test_ols(self):
adata = ep.dt.mimic_2(encoded=False)
formula = "tco2_first ~ pco2_first"
var_names = ["tco2_first", "pco2_first"]
ols = ep.tl.ols(adata, var_names, formula, missing="drop")
s = ols.fit().params.iloc[1]
i = ols.fit().params.iloc[0]
assert isinstance(ols, statsmodels.regression.linear_model.OLS)
assert 0.18857179158259973 == pytest.approx(s)
assert 16.210859352601442 == pytest.approx(i)
def test_glm(self):
adata = ep.dt.mimic_2(encoded=False)
formula = "day_28_flg ~ age"
var_names = ["day_28_flg", "age"]
family = "Binomial"
glm = ep.tl.glm(adata, var_names, formula, family, missing="drop", as_continuous=["age"])
Intercept = glm.fit().params.iloc[0]
age = glm.fit().params.iloc[1]
assert isinstance(glm, statsmodels.genmod.generalized_linear_model.GLM)
assert 5.778006344870297 == pytest.approx(Intercept)
assert -0.06523274132877163 == pytest.approx(age)
@pytest.mark.parametrize("weightings", ["wilcoxon", "tarone-ware", "peto", "fleming-harrington"])
def test_calculate_logrank_pvalue(self, weightings):
durations_A = [1, 2, 3]
event_observed_A = [1, 1, 0]
durations_B = [1, 2, 3, 4]
event_observed_B = [1, 0, 0, 1]
kmf1 = KaplanMeierFitter()
kmf1.fit(durations_A, event_observed_A)
kmf2 = KaplanMeierFitter()
kmf2.fit(durations_B, event_observed_B)
results_pairwise = ep.tl.test_kmf_logrank(kmf1, kmf2)
p_value_pairwise = results_pairwise.p_value
assert 0 < p_value_pairwise < 1
def test_anova_glm(self):
adata = ep.dt.mimic_2(encoded=False)
formula = "day_28_flg ~ age"
var_names = ["day_28_flg", "age"]
family = "Binomial"
age_glm = ep.tl.glm(adata, var_names, formula, family, missing="drop", as_continuous=["age"])
age_glm_result = age_glm.fit()
formula = "day_28_flg ~ age + service_unit"
var_names = ["day_28_flg", "age", "service_unit"]
ageunit_glm = ep.tl.glm(adata, var_names, formula, family="Binomial", missing="drop", as_continuous=["age"])
ageunit_glm_result = ageunit_glm.fit()
dataframe = ep.tl.anova_glm(
age_glm_result, ageunit_glm_result, "day_28_flg ~ age", "day_28_flg ~ age + service_unit"
)
assert len(dataframe) == 2
assert dataframe.shape == (2, 6)
assert dataframe.iloc[1, 4] == 2
assert pytest.approx(dataframe.iloc[1, 5], 0.1) == 0.103185
def _sa_function_assert(self, model, model_class, adata=None):
assert isinstance(model, model_class)
assert len(model.durations) == 1776
assert sum(model.event_observed) == 497
if adata is not None: # doing it disway, due to legacy kmf function
model_summary = adata.uns.get("test")
assert model_summary is not None
if isinstance(model, KaplanMeierFitter) or isinstance(
model, NelsonAalenFitter
): # kmf and nelson_aalen have event_table
assert model_summary.equals(model.event_table)
else:
assert model_summary.equals(model.summary)
def _sa_func_test(self, sa_function, sa_class, mimic_2_sa):
adata, duration_col, event_col = mimic_2_sa
sa = sa_function(adata, duration_col=duration_col, event_col=event_col, uns_key="test")
self._sa_function_assert(sa, sa_class, adata)
def test_kmf(self, mimic_2_sa):
# check for deprecation warning
with pytest.warns(DeprecationWarning):
adata, _, _ = mimic_2_sa
kmf = ep.tl.kmf(adata[:, ["mort_day_censored"]].X, adata[:, ["censor_flg"]].X)
self._sa_function_assert(kmf, KaplanMeierFitter)
def test_kaplan_meier(self, mimic_2_sa):
self._sa_func_test(ep.tl.kaplan_meier, KaplanMeierFitter, mimic_2_sa)
def test_cox_ph(self, mimic_2_sa):
self._sa_func_test(ep.tl.cox_ph, CoxPHFitter, mimic_2_sa)
def test_nelson_aalen(self, mimic_2_sa):
self._sa_func_test(ep.tl.nelson_aalen, NelsonAalenFitter, mimic_2_sa)
def test_weibull(self, mimic_2_sa):
self._sa_func_test(ep.tl.weibull, WeibullFitter, mimic_2_sa)
def test_weibull_aft(self, mimic_2_sa):
self._sa_func_test(ep.tl.weibull_aft, WeibullAFTFitter, mimic_2_sa)
def test_log_logistic(self, mimic_2_sa):
self._sa_func_test(ep.tl.log_logistic_aft, LogLogisticAFTFitter, mimic_2_sa)