Switch to unified view

a b/tests/deseq2/test_dispersions.py
1
import unittest
2
3
import numpy as np
4
import pandas as pd
5
import patsy
6
from scipy.optimize import minimize
7
8
from inmoose.deseq2 import (
9
    DESeq,
10
    DESeqDataSet,
11
    estimateDispersionsFit,
12
    estimateDispersionsGeneEst,
13
    estimateDispersionsMAP,
14
    makeExampleDESeqDataSet,
15
)
16
from inmoose.deseq2.deseq2_cpp import fitDisp
17
from inmoose.deseq2.fitNbinomGLMs import fitNbinomGLMs
18
from inmoose.utils import Factor, dnbinom_mu, dnorm
19
20
21
class Test(unittest.TestCase):
22
    def test_dispersion_errors(self):
23
        """test that expected errors are thrown during dispersion estimation"""
24
        dds = makeExampleDESeqDataSet(n=100, m=2)
25
        dds = dds.estimateSizeFactors()
26
        with self.assertRaisesRegex(
27
            ValueError,
28
            expected_regex="the number of samples and the number of model coefficients are equal",
29
        ):
30
            estimateDispersionsGeneEst(dds)
31
32
        dds = makeExampleDESeqDataSet(
33
            n=100,
34
            m=4,
35
            dispMeanRel=lambda x: 0.001 + x / 1e3,
36
            interceptMean=8,
37
            interceptSD=2,
38
        )
39
        dds = dds.estimateSizeFactors()
40
        dds.var["dispGeneEst"] = np.repeat(1e-7, 100)
41
        with self.assertRaisesRegex(
42
            ValueError,
43
            expected_regex="all gene-wise dispersion estimates are within 2 orders of magnitude",
44
        ):
45
            estimateDispersionsFit(dds)
46
        dds = estimateDispersionsGeneEst(dds)
47
        # TODO expect message "note: fitType='parametric', but the dispersion trend was not well captured"
48
        # estimateDispersionsFit(dds)
49
50
        dds = makeExampleDESeqDataSet(n=100, m=4)
51
        dds = dds.estimateSizeFactors()
52
        dds.var["dispGeneEst"] = np.repeat(1e-7, 100)
53
        dds.setDispFunction(lambda x: 1e-6)
54
        with self.assertLogs("inmoose", level="WARNING") as logChecker:
55
            estimateDispersionsMAP(dds)
56
        self.assertRegex(
57
            logChecker.output[0],
58
            "all genes have dispersion estimates < 1e-06, returning disp = 1e-07",
59
        )
60
61
        dds = makeExampleDESeqDataSet(n=100, m=4)
62
        dds = dds.estimateSizeFactors()
63
        dds.obs["condition"] = Factor(dds.obs["condition"]).add_categories("C")
64
        dds.design = "~condition"
65
        with self.assertRaisesRegex(
66
            ValueError, expected_regex="the model matrix is not full rank"
67
        ):
68
            dds.estimateDispersions()
69
        dds.obs["condition"] = Factor(dds.obs["condition"]).droplevels()
70
        dds.obs["group"] = dds.obs["condition"]
71
        dds.design = "~ group + condition"
72
        with self.assertRaisesRegex(
73
            ValueError, expected_regex="the model matrix is not full rank"
74
        ):
75
            dds.estimateDispersions()
76
77
        dds = makeExampleDESeqDataSet(n=100, m=2)
78
        with self.assertRaisesRegex(
79
            ValueError,
80
            expected_regex="The design matrix has the same number of samples and coefficients to fit",
81
        ):
82
            DESeq(dds)
83
84
    def test_dispersion_fitting(self):
85
        """test that the fitting of dispersion gives the expected values using various methods"""
86
        # test the optimization of the logarithm of dispersion (alpha)
87
        # parameter with Cox-Reid adjustment and prior distribution.
88
        # also test the derivatives of the log posterior w.r.t. log alpha
89
        m = 10
90
        # y = scipy.stats.poisson.rvs(20, size=m, random_state=42)
91
        y = np.array([17, 25, 25, 21, 13, 22, 23, 22, 18, 26])
92
        sf = np.ones(m)
93
        condition = Factor(np.repeat([0, 1], [m / 2, m / 2]))
94
        x = patsy.dmatrix("1+condition", data={"condition": condition})
95
96
        lambda_ = 2
97
        alpha = 0.5
98
99
        # make a DESeqDataSet but don't use the design formula
100
        # instead we supply directly a model matrix
101
        dds = DESeqDataSet(
102
            y[:, None],
103
            clinicalData=pd.DataFrame({"condition": condition}),
104
            design="~condition",
105
        )
106
        dds.sizeFactors = sf
107
        dds.var["dispersion"] = alpha
108
        dds.var["baseMean"] = np.mean(y)
109
110
        # for testing we convert beta to the natural log scale:
111
        # convert lambda from log to log2 scale by multiplying by log(2)**2
112
        # then convert beta back from log2 to log scale by multiplying by log(2)
113
        betaDESeq = (
114
            np.log(2)
115
            * fitNbinomGLMs(dds, lambda_=[0, lambda_ * np.log(2) ** 2], modelMatrix=x)[
116
                "betaMatrix"
117
            ]
118
        )
119
        log_alpha_prior_mean = 0.5
120
        log_alpha_prior_sigmasq = 1
121
        mu_hat = np.exp(x @ betaDESeq.T).values.squeeze()
122
123
        dispRes = fitDisp(
124
            y[:, None],
125
            x=x,
126
            mu_hat=mu_hat[:, None],
127
            log_alpha=0,
128
            log_alpha_prior_mean=log_alpha_prior_mean,
129
            log_alpha_prior_sigmasq=log_alpha_prior_sigmasq,
130
            min_log_alpha=np.log(1e-8),
131
            kappa_0=1,
132
            tol=1e-16,
133
            maxit=100,
134
            usePrior=True,
135
            weights=np.ones((len(y), 1)),
136
            useWeights=False,
137
            weightThreshold=1e-2,
138
            useCR=True,
139
        )
140
141
        # maximum a posteriori (MAP) estimate from DESeq
142
        dispDESeq = dispRes["log_alpha"]
143
144
        # MAP estimate using optim
145
        def logPost(log_alpha):
146
            alpha = np.exp(log_alpha)
147
            w = np.diag(1 / (1 / mu_hat**2 * (mu_hat + alpha * mu_hat**2)))
148
            logLike = np.sum(dnbinom_mu(y, mu=mu_hat, size=1 / alpha, log=True))
149
            coxReid = -0.5 * np.linalg.slogdet(x.T @ w @ x)[1]
150
            logPrior = dnorm(
151
                log_alpha,
152
                log_alpha_prior_mean,
153
                np.sqrt(log_alpha_prior_sigmasq),
154
                log=True,
155
            )
156
            res = logLike + coxReid + logPrior
157
            return res
158
159
        dispOptim = minimize(lambda p: -logPost(p), 0).x
160
161
        self.assertTrue(np.allclose(dispDESeq, dispOptim))
162
163
        # check derivatives:
164
165
        # from Ted Harding https://stat.ethz.ch/pipermail/r-help/2007-September/140013.html
166
        def num_deriv(f, x, h=0.001):
167
            return (f(x + h / 2) - f(x - h / 2)) / h
168
169
        def num_2nd_deriv(f, x, h=0.001):
170
            return (f(x + h) - 2 * f(x) + f(x - h)) / h**2
171
172
        # first derivative of log posterior wrt log alpha at start
173
        dispDerivDESeq = dispRes["initial_dlp"]
174
        dispDerivNum = num_deriv(logPost, 0)
175
        self.assertTrue(np.allclose(dispDerivDESeq, dispDerivNum))
176
177
        # second derivative at finish
178
        dispD2DESeq = dispRes["last_d2lp"]
179
        dispD2Num = num_2nd_deriv(logPost, dispRes["log_alpha"])
180
        self.assertTrue(np.allclose(dispD2DESeq, dispD2Num))
181
182
        # test fit alternative
183
        dds = makeExampleDESeqDataSet()
184
        dds = dds.estimateSizeFactors()
185
        # ddsLocal = dds.copy().estimateDispersions(fitType="local")
186
        dds.copy().estimateDispersions(fitType="mean")
187
        ddsMed = estimateDispersionsGeneEst(dds.copy())
188
        useForMedian = ddsMed.var["dispGeneEst"] > 1e-7
189
        medianDisp = np.nanmedian(ddsMed.var["dispGeneEst"][useForMedian])
190
        ddsMed.setDispFunction(lambda x: medianDisp)
191
        ddsMed = estimateDispersionsMAP(ddsMed)
192
193
        # test iterative
194
        dds = makeExampleDESeqDataSet(m=50, n=100, betaSD=1, interceptMean=8, seed=42)
195
        dds = dds.estimateSizeFactors()
196
        dds = estimateDispersionsGeneEst(dds, niter=5)
197
        dds = dds[:, ~dds.var["allZero"]]
198
        self.assertTrue(
199
            np.allclose(dds.var["trueDisp"], dds.var["dispGeneEst"], rtol=0.7)
200
        )