Diff of /data/warps.py [000000] .. [5c09f6]

Switch to side-by-side view

--- a
+++ b/data/warps.py
@@ -0,0 +1,309 @@
+import numpy as np
+import matplotlib.pyplot as plt
+import pandas as pd
+
+import seaborn as sns
+import sys
+
+sys.path.append("../..")
+
+
+# from gp_functions import rbf_covariance
+from gpsa.util import rbf_kernel_numpy as rbf_covariance
+from gpsa import polar_warp
+from scipy.stats import multivariate_normal as mvnpy
+
+
+def apply_gp_warp(
+    X_orig_single,
+    Y_orig_single,
+    n_views,
+    # n_samples_per_view,
+    noise_variance=0.0,
+    kernel_variance=1.0,
+    kernel_lengthscale=1.0,
+    mean_slope=1.0,
+    mean_intercept=0.0,
+):
+
+    n_samples_per_view = X_orig_single.shape[0]
+    n_spatial_dims = X_orig_single.shape[1]
+
+    kernel = rbf_covariance
+    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
+
+    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
+
+    n_samples_list = [n_samples_per_view] * n_views
+    cumulative_sums = np.cumsum(n_samples_list)
+    cumulative_sums = np.insert(cumulative_sums, 0, 0)
+    view_idx = np.array(
+        [
+            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
+            for ii in range(n_views)
+        ]
+    )
+    n = np.sum(n_samples_list)
+
+    X = X_orig.copy()
+
+    # Draw warped coordinates from a GP
+    warp_kernel_params_true = np.array(
+        [np.log(kernel_variance), np.log(kernel_lengthscale)]
+    )
+
+    for vv in range(n_views):
+
+        for ss in range(n_spatial_dims):
+            X_curr_view_warped = mvnpy.rvs(
+                mean=X_orig_single[:, ss] * mean_slope + mean_intercept,
+                cov=kernel(X_orig_single, X_orig_single, warp_kernel_params_true),
+            )
+            # import ipdb; ipdb.set_trace()
+            X[
+                n_samples_per_view * vv : n_samples_per_view * (vv + 1), ss
+            ] = X_curr_view_warped
+
+    Y = np.concatenate([Y_orig_single] * n_views, axis=0)
+    Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape)
+
+    return X, Y, n_samples_list, view_idx
+
+
+def apply_gp_warp_multimodal(
+    X_orig_singles,
+    Y_orig_singles,
+    n_views,
+    # n_samples_per_view,
+    noise_variance=0.0,
+    kernel_variance=1.0,
+    kernel_lengthscale=1.0,
+    mean_slope=1.0,
+    mean_intercept=0.0,
+):
+    assert len(X_orig_singles) == len(Y_orig_singles)
+
+    n_modalities = len(X_orig_singles)
+
+    modality_idx = np.cumsum([x.shape[0] for x in X_orig_singles])
+    modality_idx = np.insert(modality_idx, 0, 0)
+
+    kernel = rbf_covariance
+    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
+
+    X_orig_single = np.concatenate(X_orig_singles, axis=0)
+
+    X_orig_single = X_orig_single - X_orig_single.min(0)
+    X_orig_single = X_orig_single / X_orig_single.max(0)
+    X_orig_single *= 10
+
+    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
+
+    n_samples_per_view = X_orig_single.shape[0]
+    n_spatial_dims = X_orig_single.shape[1]
+
+    n_samples_list = [n_samples_per_view] * n_views
+    cumulative_sums = np.cumsum(n_samples_list)
+    cumulative_sums = np.insert(cumulative_sums, 0, 0)
+    view_idx = np.array(
+        [
+            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
+            for ii in range(n_views)
+        ]
+    )
+    n = np.sum(n_samples_list)
+
+    X = X_orig.copy()
+
+    # Draw warped coordinates from a GP
+    warp_kernel_params_true = np.array(
+        [np.log(kernel_variance), np.log(kernel_lengthscale)]
+    )
+
+    for vv in range(n_views):
+
+        curr_idx = np.arange(n_samples_per_view * vv, n_samples_per_view * (vv + 1))
+
+        for ss in range(n_spatial_dims):
+            X_curr_view_warped = mvnpy.rvs(
+                mean=X_orig_single[:, ss] * mean_slope + mean_intercept,
+                cov=kernel(X_orig_single, X_orig_single, warp_kernel_params_true),
+            )
+            # import ipdb; ipdb.set_trace()
+            X[curr_idx, ss] = X_curr_view_warped
+
+    view_idx = np.cumsum([n_samples_per_view * vv for vv in range(n_views + 1)])
+
+    X_warped = []
+    Y_warped = []
+    n_samples_list = []
+    for mm in range(n_modalities):
+
+        curr_modality_idx = np.concatenate(
+            [
+                view_idx[vv] + np.arange(modality_idx[mm], modality_idx[mm + 1])
+                for vv in range(n_views)
+            ]
+        )
+        X_warped.append(X[curr_modality_idx])
+
+        Y_full_mm = np.concatenate([Y_orig_singles[mm]] * n_views, axis=0)
+        Y_full_mm += np.random.normal(
+            scale=np.sqrt(noise_variance), size=Y_full_mm.shape
+        )
+        Y_warped.append(Y_full_mm)
+        n_samples_list.append([X_orig_singles[mm].shape[0]] * n_views)
+
+    return X_warped, Y_warped, n_samples_list, view_idx
+
+
+def apply_linear_warp(
+    X_orig_single,
+    Y_orig_single,
+    n_views,
+    linear_slope_variance=0.1,
+    linear_intercept_variance=0.1,
+    noise_variance=0.01,
+    rotation=True,
+):
+
+    n_samples_per_view = X_orig_single.shape[0]
+    n_spatial_dims = X_orig_single.shape[1]
+
+    kernel = rbf_covariance
+    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
+
+    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
+
+    n_samples_list = [n_samples_per_view] * n_views
+    cumulative_sums = np.cumsum(n_samples_list)
+    cumulative_sums = np.insert(cumulative_sums, 0, 0)
+    view_idx = np.array(
+        [
+            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
+            for ii in range(n_views)
+        ]
+    )
+    n = np.sum(n_samples_list)
+
+    X = X_orig.copy()
+
+    for vv in range(n_views):
+
+        # curr_slopes = np.eye(n_spatial_dims) + np.random.normal(
+        #     loc=0,
+        #     scale=np.sqrt(linear_slope_variance),
+        #     size=(n_spatial_dims, n_spatial_dims),
+        # )
+        # import ipdb; ipdb.set_trace()
+        # curr_slopes /= np.linalg.norm(curr_slopes, ord=2, axis=0)
+        # curr_slopes = np.linalg.svd(curr_slopes)[0]
+
+        # curr_slopes = np.random.normal(
+        #     loc=1,
+        #     scale=np.sqrt(linear_slope_variance),
+        #     size=n_spatial_dims,
+        # )
+
+        # curr_intercepts = np.random.normal(
+        #     loc=0, scale=np.sqrt(linear_intercept_variance), size=n_spatial_dims
+        # )
+
+        curr_slopes = np.random.uniform(
+            low=1 - linear_slope_variance,
+            high=1 + linear_slope_variance,
+            size=n_spatial_dims,
+        )
+
+        curr_intercepts = np.random.uniform(
+            low=linear_intercept_variance,
+            high=linear_intercept_variance,
+            size=n_spatial_dims,
+        )
+        # print(curr_slopes, curr_intercepts)
+
+        X_curr_view_warped = X_orig_single * curr_slopes + curr_intercepts
+        X[
+            n_samples_per_view * vv : n_samples_per_view * (vv + 1), :
+        ] = X_curr_view_warped
+
+    Y = np.concatenate([Y_orig_single] * n_views, axis=0)
+    Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape)
+
+    return X, Y, n_samples_list, view_idx
+
+
+def apply_polar_warp(
+    X_orig_single,
+    Y_orig_single,
+    n_views,
+    linear_slope_variance=0.1,
+    linear_intercept_variance=0.1,
+    noise_variance=0.01,
+    rotation=True,
+):
+
+    n_samples_per_view = X_orig_single.shape[0]
+    n_spatial_dims = X_orig_single.shape[1]
+
+    kernel = rbf_covariance
+    kernel_params_true = np.array([np.log(1.0), np.log(1.0)])
+
+    X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0)
+
+    n_samples_list = [n_samples_per_view] * n_views
+    cumulative_sums = np.cumsum(n_samples_list)
+    cumulative_sums = np.insert(cumulative_sums, 0, 0)
+    view_idx = np.array(
+        [
+            np.arange(cumulative_sums[ii], cumulative_sums[ii + 1])
+            for ii in range(n_views)
+        ]
+    )
+    n = np.sum(n_samples_list)
+
+    X = X_orig.copy()
+
+    # no_distortion_B = np.array([
+    #         [0, np.pi * 0.5],
+    #         [0, 0]
+    #     ])
+
+    for vv in range(n_views):
+
+        # B = np.random.normal(
+        #     loc=0,
+        #     scale=np.sqrt(linear_slope_variance),
+        #     size=(n_spatial_dims, n_spatial_dims),
+        # )
+        B = np.random.uniform(
+            low=-linear_slope_variance,
+            high=linear_slope_variance,
+            size=(n_spatial_dims, n_spatial_dims),
+        )
+        polar_params = X_orig_single @ B
+        r, theta = polar_params[:, 0], polar_params[:, 1]
+        X_curr_view_warped = np.array(
+            [
+                X_orig_single[:, 0] + r * np.cos(theta),
+                X_orig_single[:, 1] + r * np.sin(theta),
+            ]
+        ).T
+        # import ipdb; ipdb.set_trace()
+        # additive_warp = B[:, 0] * X_orig_single * np.vstack([np.cos(X_orig_single[:, 0] * B[0, 1]), np.sin(X_orig_single[:, 1] * B[1, 1])]).T
+        # X_curr_view_warped = X_orig_single + additive_warp
+
+        # X_curr_view_warped = X_orig_single @ curr_slopes + curr_intercepts
+        X[
+            n_samples_per_view * vv : n_samples_per_view * (vv + 1), :
+        ] = X_curr_view_warped
+
+    Y = np.concatenate([Y_orig_single] * n_views, axis=0)
+    Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape)
+
+    return X, Y, n_samples_list, view_idx
+
+
+if __name__ == "__main__":
+
+    pass