"""
Simulation utils, allowing to flexibly consider different DGPs
"""
# Author: Alicia Curth
from typing import Any, Optional, Tuple

import numpy as np
from scipy.special import expit


def simulate_treatment_setup(
    n: int,
    d: int = 25,
    n_w: int = 0,
    n_c: int = 0,
    n_o: int = 0,
    n_t: int = 0,
    covariate_model: Any = None,
    covariate_model_params: Optional[dict] = None,
    propensity_model: Any = None,
    propensity_model_params: Optional[dict] = None,
    mu_0_model: Any = None,
    mu_0_model_params: Optional[dict] = None,
    mu_1_model: Any = None,
    mu_1_model_params: Optional[dict] = None,
    error_sd: float = 1,
    seed: int = 42,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Generic function to flexibly simulate a treatment setup.

    Covariates are laid out column-wise in the fixed order
    [treatment-assignment factor (n_w) | confounders (n_c) |
    outcome factor (n_o) | purely predictive / tau-support (n_t) |
    nuisance (d - sum of the above)]; the component models index into X
    relying on exactly this order.

    Parameters
    ----------
    n: int
        Number of observations to generate
    d: int
        Dimension of X to generate
    n_o: int
        Dimension of outcome factor
    n_c: int
        Dimension of confounding factor
    n_t: int
        Dimension of purely predictive variables (support of tau(x))
    n_w: int
        Dimension of treatment assignment factor
    covariate_model:
        Model to generate covariates. Default: multivariate normal
    covariate_model_params: dict
        Additional parameters to pass to covariate model
    propensity_model:
        Model to generate propensity scores
    propensity_model_params:
        Additional parameters to pass to propensity model
    mu_0_model:
        Model to generate untreated outcomes
    mu_0_model_params:
        Additional parameters to pass to untreated outcome model
    mu_1_model:
        Model to generate treated outcomes.
    mu_1_model_params:
        Additional parameters to pass to treated outcome model
    error_sd: float, default 1
        Standard deviation of normal errors
    seed: int
        Seed

    Returns
    -------
    X, y, w, p, t - Covariates, observed outcomes, treatment indicators,
    propensities, CATE

    Raises
    ------
    ValueError
        If n_w + n_c + n_o + n_t exceeds d.
    """
    # input checks: whatever is left over of d becomes pure nuisance noise
    n_nuisance = d - (n_c + n_o + n_w + n_t)
    if n_nuisance < 0:
        raise ValueError("Dimensions should add up to maximally d.")

    # set defaults (AISTATS-paper DGP components)
    if covariate_model is None:
        covariate_model = normal_covariate_model

    if covariate_model_params is None:
        covariate_model_params = {}

    if propensity_model is None:
        propensity_model = propensity_AISTATS

    if propensity_model_params is None:
        propensity_model_params = {}

    if mu_0_model is None:
        mu_0_model = mu0_AISTATS

    if mu_0_model_params is None:
        mu_0_model_params = {}

    if mu_1_model is None:
        mu_1_model = mu1_AISTATS

    if mu_1_model_params is None:
        mu_1_model_params = {}

    # NOTE: legacy global seeding kept on purpose so that existing
    # experiments reproduce identical draws for a given seed.
    np.random.seed(seed)

    # generate data and potential outcomes
    X = covariate_model(
        n=n,
        n_nuisance=n_nuisance,
        n_c=n_c,
        n_o=n_o,
        n_w=n_w,
        n_t=n_t,
        **covariate_model_params
    )
    mu_0 = mu_0_model(X, n_c=n_c, n_o=n_o, n_w=n_w, **mu_0_model_params)
    mu_1 = mu_1_model(
        X, n_c=n_c, n_o=n_o, n_w=n_w, n_t=n_t, mu_0=mu_0, **mu_1_model_params
    )
    # CATE is the difference of the noiseless potential-outcome surfaces
    t = mu_1 - mu_0

    # generate treatments from the propensity scores
    p = propensity_model(X, n_c=n_c, n_w=n_w, **propensity_model_params)
    w = np.random.binomial(1, p=p)

    # generate observables: factual outcome plus homoskedastic normal noise
    y = w * mu_1 + (1 - w) * mu_0 + np.random.normal(0, error_sd, n)

    return X, y, w, p, t


# normal covariate model (Adapted from Hassanpour & Greiner, 2020) -------------
def get_multivariate_normal_params(
    m: int, correlated: bool = False
) -> Tuple[np.ndarray, np.ndarray]:
    """Return (mean, covariance) for an m-dimensional normal.

    If ``correlated``, off-diagonal entries are random but symmetric
    (scaled to keep the matrix well-conditioned); otherwise standard normal.
    Adapted from Hassanpour & Greiner (2020).
    """
    if correlated:
        mu = np.zeros(m)
        # random symmetric off-diagonal structure, unit-ish diagonal
        temp = np.random.uniform(size=(m, m))
        temp = 0.5 * (np.transpose(temp) + temp)
        sig = (np.ones((m, m)) - np.eye(m)) * temp / 10 + 0.5 * np.eye(m)
    else:
        mu = np.zeros(m)
        sig = np.eye(m)

    return mu, sig


def get_set_normal_covariates(m: int, n: int, correlated: bool = False) -> np.ndarray:
    """Draw an (n, m) block of multivariate-normal covariates.

    Returns ``None`` for ``m == 0`` (callers are expected to skip
    zero-dimensional blocks before stacking).
    """
    if m == 0:
        return None
    mu, sig = get_multivariate_normal_params(m, correlated=correlated)
    return np.random.multivariate_normal(mean=mu, cov=sig, size=n)


def normal_covariate_model(
    n: int,
    n_nuisance: int = 25,
    n_c: int = 0,
    n_o: int = 0,
    n_w: int = 0,
    n_t: int = 0,
    correlated: bool = False,
) -> np.ndarray:
    """Stack normal covariate blocks in the canonical [w|c|o|t|nuisance] order.

    Raises
    ------
    ValueError
        If every block dimension is zero (nothing to generate).
    """
    blocks = [
        get_set_normal_covariates(n_x, n, correlated)
        for n_x in (n_w, n_c, n_o, n_t, n_nuisance)
        if n_x > 0
    ]
    if not blocks:
        # previously surfaced as an opaque np.hstack error
        raise ValueError("At least one covariate dimension must be positive.")
    return np.hstack(blocks)


def propensity_AISTATS(
    X: np.ndarray,
    n_c: int = 0,
    n_w: int = 0,
    xi: float = 0.5,
    nonlinear: bool = True,
    offset: Any = 0,
    target_prop: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Propensity model used in the AISTATS paper.

    Uses the first ``n_c + n_w`` columns of X (confounders and assignment
    factors). ``offset`` is either a numeric shift of the logit, or the
    string ``"center"`` to center the logits at their median. If
    ``target_prop`` is given, scores are rescaled to that average.

    Raises
    ------
    ValueError
        If ``offset`` is neither numeric nor ``"center"``.
    """
    if n_c + n_w == 0:
        # no relevant covariates: constant propensity
        return xi * np.ones(X.shape[0])

    coefs = np.ones(n_c + n_w)
    if nonlinear:
        z = np.dot(X[:, : (n_c + n_w)] ** 2, coefs) / (n_c + n_w)
    else:
        z = np.dot(X[:, : (n_c + n_w)], coefs) / (n_c + n_w)

    # accept plain and NumPy numeric offsets (previously only exact
    # builtin int/float passed the type check)
    if isinstance(offset, (int, float, np.integer, np.floating)):
        prop = expit(xi * z + offset)
    elif offset == "center":
        # center the propensity scores to median 0.5
        prop = expit(xi * (z - np.median(z)))
    else:
        raise ValueError("Not a valid value for offset")

    if target_prop is not None:
        # rescale so that the average propensity hits target_prop
        prop = target_prop / np.average(prop) * prop
    return prop


def propensity_constant(
    X: np.ndarray, n_c: int = 0, n_w: int = 0, xi: float = 0.5
) -> np.ndarray:
    """Constant propensity ``xi`` for every observation (n_c/n_w ignored)."""
    return xi * np.ones(X.shape[0])


def mu0_AISTATS(
    X: np.ndarray, n_w: int = 0, n_c: int = 0, n_o: int = 0, scale: bool = False
) -> np.ndarray:
    """Untreated outcome surface: sum of squared confounder/outcome columns.

    Returns zeros when there are no relevant columns. With ``scale``,
    coefficients are normalized to sum to 10 instead of all-ones.
    """
    if n_c + n_o == 0:
        return np.zeros((X.shape[0]))
    if not scale:
        coefs = np.ones(n_c + n_o)
    else:
        coefs = 10 * np.ones(n_c + n_o) / (n_c + n_o)
    return np.dot(X[:, n_w : (n_w + n_c + n_o)] ** 2, coefs)


def mu1_AISTATS(
    X: np.ndarray,
    n_w: int = 0,
    n_c: int = 0,
    n_o: int = 0,
    n_t: int = 0,
    mu_0: Optional[np.ndarray] = None,
    nonlinear: int = 2,
    withbase: bool = True,
    scale: bool = False,
) -> np.ndarray:
    """Treated outcome surface: mu_0 plus a polynomial additive effect.

    The effect lives on the ``n_t`` tau-support columns raised to the
    ``nonlinear`` power. With ``n_t == 0`` the treatment effect is zero
    (returns ``mu_0`` unchanged); with ``withbase=False`` the base surface
    is dropped entirely.
    """
    if n_t == 0:
        # no effect-support covariates -> zero treatment effect
        return mu_0
    if scale:
        coefs = 10 * np.ones(n_t) / n_t
    else:
        coefs = np.ones(n_t)
    X_sel = X[:, (n_w + n_c + n_o) : (n_w + n_c + n_o + n_t)]
    if withbase:
        return mu_0 + np.dot(X_sel ** nonlinear, coefs)
    return np.dot(X_sel ** nonlinear, coefs)


# Other simulation settings not used in AISTATS paper
# uniform covariate model
def uniform_covariate_model(
    n: int,
    n_nuisance: int = 0,
    n_c: int = 0,
    n_o: int = 0,
    n_w: int = 0,
    n_t: int = 0,
    low: int = -1,
    high: int = 1,
) -> np.ndarray:
    """Generate all d covariates iid Uniform[low, high]."""
    d = n_nuisance + n_c + n_o + n_w + n_t
    return np.random.uniform(low=low, high=high, size=(n, d))


def mu1_additive(
    X: np.ndarray,
    n_w: int = 0,
    n_c: int = 0,
    n_o: int = 0,
    n_t: int = 0,
    mu_0: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Linear treated surface on the tau-support columns (random coefs).

    NOTE: when ``n_t > 0`` this intentionally does NOT add ``mu_0`` —
    the treated surface is the linear term alone.
    """
    if n_t == 0:
        return mu_0
    coefs = np.random.normal(size=n_t)
    return np.dot(X[:, (n_w + n_c + n_o) : (n_w + n_c + n_o + n_t)], coefs) / n_t


# regression surfaces from Hassanpour & Greiner
def mu0_hg(X: np.ndarray, n_w: int = 0, n_c: int = 0, n_o: int = 0) -> np.ndarray:
    """Untreated surface from Hassanpour & Greiner: linear with random coefs."""
    if n_c + n_o == 0:
        return np.zeros((X.shape[0]))
    coefs = np.random.normal(size=n_c + n_o)
    return np.dot(X[:, n_w : (n_w + n_c + n_o)], coefs) / (n_c + n_o)


def mu1_hg(
    X: np.ndarray,
    n_w: int = 0,
    n_c: int = 0,
    n_o: int = 0,
    n_t: int = 0,
    mu_0: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Treated surface from Hassanpour & Greiner: quadratic, random coefs.

    NOTE: uses the confounder/outcome columns (not the tau-support block);
    ``n_t`` and ``mu_0`` are accepted for interface compatibility only.
    """
    if n_c + n_o == 0:
        return np.zeros((X.shape[0]))
    coefs = np.random.normal(size=n_c + n_o)
    return np.dot(X[:, n_w : (n_w + n_c + n_o)] ** 2, coefs) / (n_c + n_o)


def propensity_hg(
    X: np.ndarray, n_c: int = 0, n_w: int = 0, xi: Optional[float] = None
) -> np.ndarray:
    """Propensity set-up used in Hassanpour & Greiner (2020): linear logit."""
    if n_c + n_w == 0:
        return 0.5 * np.ones(X.shape[0])
    if xi is None:
        xi = 1

    coefs = np.random.normal(size=n_c + n_w)
    z = np.dot(X[:, : (n_c + n_w)], coefs)
    return expit(xi * z)