a b/data/warps.py
1
import numpy as np
2
import matplotlib.pyplot as plt
3
import pandas as pd
4
5
import seaborn as sns
6
import sys
7
8
sys.path.append("../..")
9
10
11
# from gp_functions import rbf_covariance
12
from gpsa.util import rbf_kernel_numpy as rbf_covariance
13
from gpsa import polar_warp
14
from scipy.stats import multivariate_normal as mvnpy
15
16
17
def apply_gp_warp(
18
    X_orig_single,
19
    Y_orig_single,
20
    n_views,
21
    # n_samples_per_view,
22
    noise_variance=0.0,
23
    kernel_variance=1.0,
24
    kernel_lengthscale=1.0,
25
    mean_slope=1.0,
26
    mean_intercept=0.0,
27
):
28
29
    n_samples_per_view = X_orig_single.shape[0]
30
    n_spatial_dims = X_orig_single.shape[1]
31
32
    kernel = rbf_covariance
33
    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
34
35
    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
36
37
    n_samples_list = [n_samples_per_view] * n_views
38
    cumulative_sums = np.cumsum(n_samples_list)
39
    cumulative_sums = np.insert(cumulative_sums, 0, 0)
40
    view_idx = np.array(
41
        [
42
            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
43
            for ii in range(n_views)
44
        ]
45
    )
46
    n = np.sum(n_samples_list)
47
48
    X = X_orig.copy()
49
50
    # Draw warped coordinates from a GP
51
    warp_kernel_params_true = np.array(
52
        [np.log(kernel_variance), np.log(kernel_lengthscale)]
53
    )
54
55
    for vv in range(n_views):
56
57
        for ss in range(n_spatial_dims):
58
            X_curr_view_warped = mvnpy.rvs(
59
                mean=X_orig_single[:, ss] * mean_slope + mean_intercept,
60
                cov=kernel(X_orig_single, X_orig_single, warp_kernel_params_true),
61
            )
62
            # import ipdb; ipdb.set_trace()
63
            X[
64
                n_samples_per_view * vv : n_samples_per_view * (vv + 1), ss
65
            ] = X_curr_view_warped
66
67
    Y = np.concatenate([Y_orig_single] * n_views, axis=0)
68
    Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape)
69
70
    return X, Y, n_samples_list, view_idx
71
72
73
def apply_gp_warp_multimodal(
74
    X_orig_singles,
75
    Y_orig_singles,
76
    n_views,
77
    # n_samples_per_view,
78
    noise_variance=0.0,
79
    kernel_variance=1.0,
80
    kernel_lengthscale=1.0,
81
    mean_slope=1.0,
82
    mean_intercept=0.0,
83
):
84
    assert len(X_orig_singles) == len(Y_orig_singles)
85
86
    n_modalities = len(X_orig_singles)
87
88
    modality_idx = np.cumsum([x.shape[0] for x in X_orig_singles])
89
    modality_idx = np.insert(modality_idx, 0, 0)
90
91
    kernel = rbf_covariance
92
    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
93
94
    X_orig_single = np.concatenate(X_orig_singles, axis=0)
95
96
    X_orig_single = X_orig_single - X_orig_single.min(0)
97
    X_orig_single = X_orig_single / X_orig_single.max(0)
98
    X_orig_single *= 10
99
100
    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
101
102
    n_samples_per_view = X_orig_single.shape[0]
103
    n_spatial_dims = X_orig_single.shape[1]
104
105
    n_samples_list = [n_samples_per_view] * n_views
106
    cumulative_sums = np.cumsum(n_samples_list)
107
    cumulative_sums = np.insert(cumulative_sums, 0, 0)
108
    view_idx = np.array(
109
        [
110
            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
111
            for ii in range(n_views)
112
        ]
113
    )
114
    n = np.sum(n_samples_list)
115
116
    X = X_orig.copy()
117
118
    # Draw warped coordinates from a GP
119
    warp_kernel_params_true = np.array(
120
        [np.log(kernel_variance), np.log(kernel_lengthscale)]
121
    )
122
123
    for vv in range(n_views):
124
125
        curr_idx = np.arange(n_samples_per_view * vv, n_samples_per_view * (vv + 1))
126
127
        for ss in range(n_spatial_dims):
128
            X_curr_view_warped = mvnpy.rvs(
129
                mean=X_orig_single[:, ss] * mean_slope + mean_intercept,
130
                cov=kernel(X_orig_single, X_orig_single, warp_kernel_params_true),
131
            )
132
            # import ipdb; ipdb.set_trace()
133
            X[curr_idx, ss] = X_curr_view_warped
134
135
    view_idx = np.cumsum([n_samples_per_view * vv for vv in range(n_views + 1)])
136
137
    X_warped = []
138
    Y_warped = []
139
    n_samples_list = []
140
    for mm in range(n_modalities):
141
142
        curr_modality_idx = np.concatenate(
143
            [
144
                view_idx[vv] + np.arange(modality_idx[mm], modality_idx[mm + 1])
145
                for vv in range(n_views)
146
            ]
147
        )
148
        X_warped.append(X[curr_modality_idx])
149
150
        Y_full_mm = np.concatenate([Y_orig_singles[mm]] * n_views, axis=0)
151
        Y_full_mm += np.random.normal(
152
            scale=np.sqrt(noise_variance), size=Y_full_mm.shape
153
        )
154
        Y_warped.append(Y_full_mm)
155
        n_samples_list.append([X_orig_singles[mm].shape[0]] * n_views)
156
157
    return X_warped, Y_warped, n_samples_list, view_idx
158
159
160
def apply_linear_warp(
161
    X_orig_single,
162
    Y_orig_single,
163
    n_views,
164
    linear_slope_variance=0.1,
165
    linear_intercept_variance=0.1,
166
    noise_variance=0.01,
167
    rotation=True,
168
):
169
170
    n_samples_per_view = X_orig_single.shape[0]
171
    n_spatial_dims = X_orig_single.shape[1]
172
173
    kernel = rbf_covariance
174
    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
175
176
    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
177
178
    n_samples_list = [n_samples_per_view] * n_views
179
    cumulative_sums = np.cumsum(n_samples_list)
180
    cumulative_sums = np.insert(cumulative_sums, 0, 0)
181
    view_idx = np.array(
182
        [
183
            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
184
            for ii in range(n_views)
185
        ]
186
    )
187
    n = np.sum(n_samples_list)
188
189
    X = X_orig.copy()
190
191
    for vv in range(n_views):
192
193
        # curr_slopes = np.eye(n_spatial_dims) + np.random.normal(
194
        #     loc=0,
195
        #     scale=np.sqrt(linear_slope_variance),
196
        #     size=(n_spatial_dims, n_spatial_dims),
197
        # )
198
        # import ipdb; ipdb.set_trace()
199
        # curr_slopes /= np.linalg.norm(curr_slopes, ord=2, axis=0)
200
        # curr_slopes = np.linalg.svd(curr_slopes)[0]
201
202
        # curr_slopes = np.random.normal(
203
        #     loc=1,
204
        #     scale=np.sqrt(linear_slope_variance),
205
        #     size=n_spatial_dims,
206
        # )
207
208
        # curr_intercepts = np.random.normal(
209
        #     loc=0, scale=np.sqrt(linear_intercept_variance), size=n_spatial_dims
210
        # )
211
212
        curr_slopes = np.random.uniform(
213
            low=1 - linear_slope_variance,
214
            high=1 + linear_slope_variance,
215
            size=n_spatial_dims,
216
        )
217
218
        curr_intercepts = np.random.uniform(
219
            low=linear_intercept_variance,
220
            high=linear_intercept_variance,
221
            size=n_spatial_dims,
222
        )
223
        # print(curr_slopes, curr_intercepts)
224
225
        X_curr_view_warped = X_orig_single * curr_slopes + curr_intercepts
226
        X[
227
            n_samples_per_view * vv : n_samples_per_view * (vv + 1), :
228
        ] = X_curr_view_warped
229
230
    Y = np.concatenate([Y_orig_single] * n_views, axis=0)
231
    Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape)
232
233
    return X, Y, n_samples_list, view_idx
234
235
236
def apply_polar_warp(
237
    X_orig_single,
238
    Y_orig_single,
239
    n_views,
240
    linear_slope_variance=0.1,
241
    linear_intercept_variance=0.1,
242
    noise_variance=0.01,
243
    rotation=True,
244
):
245
246
    n_samples_per_view = X_orig_single.shape[0]
247
    n_spatial_dims = X_orig_single.shape[1]
248
249
    kernel = rbf_covariance
250
    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
251
252
    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
253
254
    n_samples_list = [n_samples_per_view] * n_views
255
    cumulative_sums = np.cumsum(n_samples_list)
256
    cumulative_sums = np.insert(cumulative_sums, 0, 0)
257
    view_idx = np.array(
258
        [
259
            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
260
            for ii in range(n_views)
261
        ]
262
    )
263
    n = np.sum(n_samples_list)
264
265
    X = X_orig.copy()
266
267
    # no_distortion_B = np.array([
268
    #         [0, np.pi * 0.5],
269
    #         [0, 0]
270
    #     ])
271
272
    for vv in range(n_views):
273
274
        # B = np.random.normal(
275
        #     loc=0,
276
        #     scale=np.sqrt(linear_slope_variance),
277
        #     size=(n_spatial_dims, n_spatial_dims),
278
        # )
279
        B = np.random.uniform(
280
            low=-linear_slope_variance,
281
            high=linear_slope_variance,
282
            size=(n_spatial_dims, n_spatial_dims),
283
        )
284
        polar_params = X_orig_single @ B
285
        r, theta = polar_params[:, 0], polar_params[:, 1]
286
        X_curr_view_warped = np.array(
287
            [
288
                X_orig_single[:, 0] + r * np.cos(theta),
289
                X_orig_single[:, 1] + r * np.sin(theta),
290
            ]
291
        ).T
292
        # import ipdb; ipdb.set_trace()
293
        # additive_warp = B[:, 0] * X_orig_single * np.vstack([np.cos(X_orig_single[:, 0] * B[0, 1]), np.sin(X_orig_single[:, 1] * B[1, 1])]).T
294
        # X_curr_view_warped = X_orig_single + additive_warp
295
296
        # X_curr_view_warped = X_orig_single @ curr_slopes + curr_intercepts
297
        X[
298
            n_samples_per_view * vv : n_samples_per_view * (vv + 1), :
299
        ] = X_curr_view_warped
300
301
    Y = np.concatenate([Y_orig_single] * n_views, axis=0)
302
    Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape)
303
304
    return X, Y, n_samples_list, view_idx
305
306
307
if __name__ == "__main__":
308
309
    pass