--- a +++ b/data/warps.py @@ -0,0 +1,309 @@ +import numpy as np +import matplotlib.pyplot as plt +import pandas as pd + +import seaborn as sns +import sys + +sys.path.append("../..") + + +# from gp_functions import rbf_covariance +from gpsa.util import rbf_kernel_numpy as rbf_covariance +from gpsa import polar_warp +from scipy.stats import multivariate_normal as mvnpy + + +def apply_gp_warp( + X_orig_single, + Y_orig_single, + n_views, + # n_samples_per_view, + noise_variance=0.0, + kernel_variance=1.0, + kernel_lengthscale=1.0, + mean_slope=1.0, + mean_intercept=0.0, +): + + n_samples_per_view = X_orig_single.shape[0] + n_spatial_dims = X_orig_single.shape[1] + + kernel = rbf_covariance + kernel_params_true = np.array([np.log(1.0), np.log(1.0)]) + + X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0) + + n_samples_list = [n_samples_per_view] * n_views + cumulative_sums = np.cumsum(n_samples_list) + cumulative_sums = np.insert(cumulative_sums, 0, 0) + view_idx = np.array( + [ + np.arange(cumulative_sums[ii], cumulative_sums[ii + 1]) + for ii in range(n_views) + ] + ) + n = np.sum(n_samples_list) + + X = X_orig.copy() + + # Draw warped coordinates from a GP + warp_kernel_params_true = np.array( + [np.log(kernel_variance), np.log(kernel_lengthscale)] + ) + + for vv in range(n_views): + + for ss in range(n_spatial_dims): + X_curr_view_warped = mvnpy.rvs( + mean=X_orig_single[:, ss] * mean_slope + mean_intercept, + cov=kernel(X_orig_single, X_orig_single, warp_kernel_params_true), + ) + # import ipdb; ipdb.set_trace() + X[ + n_samples_per_view * vv : n_samples_per_view * (vv + 1), ss + ] = X_curr_view_warped + + Y = np.concatenate([Y_orig_single] * n_views, axis=0) + Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape) + + return X, Y, n_samples_list, view_idx + + +def apply_gp_warp_multimodal( + X_orig_singles, + Y_orig_singles, + n_views, + # n_samples_per_view, + noise_variance=0.0, + kernel_variance=1.0, + kernel_lengthscale=1.0, + mean_slope=1.0, + mean_intercept=0.0, +): + assert len(X_orig_singles) == len(Y_orig_singles) + + n_modalities = len(X_orig_singles) + + modality_idx = np.cumsum([x.shape[0] for x in X_orig_singles]) + modality_idx = np.insert(modality_idx, 0, 0) + + kernel = rbf_covariance + kernel_params_true = np.array([np.log(1.0), np.log(1.0)]) + + X_orig_single = np.concatenate(X_orig_singles, axis=0) + + X_orig_single = X_orig_single - X_orig_single.min(0) + X_orig_single = X_orig_single / X_orig_single.max(0) + X_orig_single *= 10 + + X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0) + + n_samples_per_view = X_orig_single.shape[0] + n_spatial_dims = X_orig_single.shape[1] + + n_samples_list = [n_samples_per_view] * n_views + cumulative_sums = np.cumsum(n_samples_list) + cumulative_sums = np.insert(cumulative_sums, 0, 0) + view_idx = np.array( + [ + np.arange(cumulative_sums[ii], cumulative_sums[ii + 1]) + for ii in range(n_views) + ] + ) + n = np.sum(n_samples_list) + + X = X_orig.copy() + + # Draw warped coordinates from a GP + warp_kernel_params_true = np.array( + [np.log(kernel_variance), np.log(kernel_lengthscale)] + ) + + for vv in range(n_views): + + curr_idx = np.arange(n_samples_per_view * vv, n_samples_per_view * (vv + 1)) + + for ss in range(n_spatial_dims): + X_curr_view_warped = mvnpy.rvs( + mean=X_orig_single[:, ss] * mean_slope + mean_intercept, + cov=kernel(X_orig_single, X_orig_single, warp_kernel_params_true), + ) + # import ipdb; ipdb.set_trace() + X[curr_idx, ss] = X_curr_view_warped + + view_idx = np.cumsum([n_samples_per_view * vv for vv in range(n_views + 1)]) + + X_warped = [] + Y_warped = [] + n_samples_list = [] + for mm in range(n_modalities): + + curr_modality_idx = np.concatenate( + [ + view_idx[vv] + np.arange(modality_idx[mm], modality_idx[mm + 1]) + for vv in range(n_views) + ] + ) + X_warped.append(X[curr_modality_idx]) + + Y_full_mm = np.concatenate([Y_orig_singles[mm]] * n_views, axis=0) + Y_full_mm += np.random.normal( + scale=np.sqrt(noise_variance), size=Y_full_mm.shape + ) + Y_warped.append(Y_full_mm) + n_samples_list.append([X_orig_singles[mm].shape[0]] * n_views) + + return X_warped, Y_warped, n_samples_list, view_idx + + +def apply_linear_warp( + X_orig_single, + Y_orig_single, + n_views, + linear_slope_variance=0.1, + linear_intercept_variance=0.1, + noise_variance=0.01, + rotation=True, +): + + n_samples_per_view = X_orig_single.shape[0] + n_spatial_dims = X_orig_single.shape[1] + + kernel = rbf_covariance + kernel_params_true = np.array([np.log(1.0), np.log(1.0)]) + + X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0) + + n_samples_list = [n_samples_per_view] * n_views + cumulative_sums = np.cumsum(n_samples_list) + cumulative_sums = np.insert(cumulative_sums, 0, 0) + view_idx = np.array( + [ + np.arange(cumulative_sums[ii], cumulative_sums[ii + 1]) + for ii in range(n_views) + ] + ) + n = np.sum(n_samples_list) + + X = X_orig.copy() + + for vv in range(n_views): + + # curr_slopes = np.eye(n_spatial_dims) + np.random.normal( + # loc=0, + # scale=np.sqrt(linear_slope_variance), + # size=(n_spatial_dims, n_spatial_dims), + # ) + # import ipdb; ipdb.set_trace() + # curr_slopes /= np.linalg.norm(curr_slopes, ord=2, axis=0) + # curr_slopes = np.linalg.svd(curr_slopes)[0] + + # curr_slopes = np.random.normal( + # loc=1, + # scale=np.sqrt(linear_slope_variance), + # size=n_spatial_dims, + # ) + + # curr_intercepts = np.random.normal( + # loc=0, scale=np.sqrt(linear_intercept_variance), size=n_spatial_dims + # ) + + curr_slopes = np.random.uniform( + low=1 - linear_slope_variance, + high=1 + linear_slope_variance, + size=n_spatial_dims, + ) + + curr_intercepts = np.random.uniform( + low=linear_intercept_variance, + high=linear_intercept_variance, + size=n_spatial_dims, + ) + # print(curr_slopes, curr_intercepts) + + X_curr_view_warped = X_orig_single * curr_slopes + curr_intercepts + X[ + n_samples_per_view * vv : n_samples_per_view * (vv + 1), : + ] = X_curr_view_warped + + Y = np.concatenate([Y_orig_single] * n_views, axis=0) + Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape) + + return X, Y, n_samples_list, view_idx + + +def apply_polar_warp( + X_orig_single, + Y_orig_single, + n_views, + linear_slope_variance=0.1, + linear_intercept_variance=0.1, + noise_variance=0.01, + rotation=True, +): + + n_samples_per_view = X_orig_single.shape[0] + n_spatial_dims = X_orig_single.shape[1] + + kernel = rbf_covariance + kernel_params_true = np.array([np.log(1.0), np.log(1.0)]) + + X_orig = np.concatenate([X_orig_single.copy()] * n_views, axis=0) + + n_samples_list = [n_samples_per_view] * n_views + cumulative_sums = np.cumsum(n_samples_list) + cumulative_sums = np.insert(cumulative_sums, 0, 0) + view_idx = np.array( + [ + np.arange(cumulative_sums[ii], cumulative_sums[ii + 1]) + for ii in range(n_views) + ] + ) + n = np.sum(n_samples_list) + + X = X_orig.copy() + + # no_distortion_B = np.array([ + # [0, np.pi * 0.5], + # [0, 0] + # ]) + + for vv in range(n_views): + + # B = np.random.normal( + # loc=0, + # scale=np.sqrt(linear_slope_variance), + # size=(n_spatial_dims, n_spatial_dims), + # ) + B = np.random.uniform( + low=-linear_slope_variance, + high=linear_slope_variance, + size=(n_spatial_dims, n_spatial_dims), + ) + polar_params = X_orig_single @ B + r, theta = polar_params[:, 0], polar_params[:, 1] + X_curr_view_warped = np.array( + [ + X_orig_single[:, 0] + r * np.cos(theta), + X_orig_single[:, 1] + r * np.sin(theta), + ] + ).T + # import ipdb; ipdb.set_trace() + # additive_warp = B[:, 0] * X_orig_single * np.vstack([np.cos(X_orig_single[:, 0] * B[0, 1]), np.sin(X_orig_single[:, 1] * B[1, 1])]).T + # X_curr_view_warped = X_orig_single + additive_warp + + # X_curr_view_warped = X_orig_single @ curr_slopes + curr_intercepts + X[ + n_samples_per_view * vv : n_samples_per_view * (vv + 1), : + ] = X_curr_view_warped + + Y = np.concatenate([Y_orig_single] * n_views, axis=0) + Y += np.random.normal(scale=np.sqrt(noise_variance), size=Y.shape) + + return X, Y, n_samples_list, view_idx + + +if __name__ == "__main__": + + pass