# stdlib
import random
from abc import ABC, abstractmethod
from typing import Optional, Tuple

# third party
import numpy as np
import torch
from scipy.special import expit
from scipy.stats import zscore

from src.iterpretability.utils import enable_reproducible_results

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EPS = 0
class SyntheticSimulatorBase(ABC):
"""
Data generation process base class.
"""
def __init__(
self,
seed: int = 42,
num_important_features: int = 10,
random_feature_selection: bool = True,
        X: Optional[np.ndarray] = None,
) -> None:
enable_reproducible_results(seed=seed)
self.seed = seed
# Get masks and weights
self.prog_mask, self.pred0_mask, self.pred1_mask, self.select_mask = self.get_important_features(
X, num_important_features, random_feature_selection
)
self.generate_uniform_weights(X)
    @abstractmethod
    def predict(self, X: np.ndarray) -> Tuple:
        raise NotImplementedError
def simulate_dataset(
self,
X: np.ndarray,
predictive_scale: float = 1,
scale_factor: float = 1,
treatment_assign: str = "random",
noise: bool = True,
err_std: float = 0.1,
prop_scale: float = 1,
addvar_scale: float = 0.1,
binary_outcome: bool = False,
) -> Tuple:
enable_reproducible_results(self.seed)
self.scale_factor = scale_factor
self.predictive_scale = predictive_scale
self.treatment_assign = treatment_assign
self.noise = noise
self.err_std = err_std
self.prop_scale = prop_scale
self.addvar_scale = addvar_scale
self.top_idx = None
# Get evaluations of modeled mechanisms
prog, pred0, pred1, prop = self.predict(X)
        # Model Propensity/Policy (branch logic shared with self.prop)
        propensity = self._compute_propensity(X, prog, pred0, pred1, prop)
W_synth = np.random.binomial(1, p=propensity)
po0 = self.scale_factor * (prog + self.predictive_scale * pred0)
po1 = self.scale_factor * (prog + self.predictive_scale * pred1)
error = 0
        if self.noise:
            # additive Gaussian observation noise
            error = torch.empty(len(X)).normal_(std=self.err_std).numpy()
Y_synth = W_synth * po1 + (1 - W_synth) * po0 + error
if binary_outcome:
Y_prob = expit(2 * (Y_synth - np.mean(Y_synth)) / np.std(Y_synth))
Y_synth = np.random.binomial(1, Y_prob)
return X, W_synth, Y_synth, po0, po1, propensity
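    # Note: simulate_dataset must be called before po0/po1/te/prop below, since
    # those helpers read scale_factor, predictive_scale and treatment_assign,
    # which simulate_dataset sets.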
def po0(self, X: np.ndarray) -> np.ndarray:
prog_factor, pred0_factor, _, _ = self.predict(X)
po0 = self.scale_factor * (prog_factor + self.predictive_scale * pred0_factor)
return po0
def po1(self, X: np.ndarray) -> np.ndarray:
prog_factor, _, pred1_factor, _ = self.predict(X)
po1 = self.scale_factor * (prog_factor + self.predictive_scale * pred1_factor)
return po1
def te(self, X: np.ndarray) -> np.ndarray:
_, pred0_factor, pred1_factor, _ = self.predict(X)
te = self.scale_factor * self.predictive_scale * (pred1_factor - pred0_factor)
return te
    def prop(self, X: np.ndarray) -> np.ndarray:
        prog, pred0, pred1, prop = self.predict(X)
        # Model Propensity/Policy
        return self._compute_propensity(X, prog, pred0, pred1, prop)

    def _compute_propensity(
        self,
        X: np.ndarray,
        prog: np.ndarray,
        pred0: np.ndarray,
        pred1: np.ndarray,
        prop: np.ndarray,
    ) -> np.ndarray:
        if self.treatment_assign == "selective":
            # assign treatments according to the selective (propensity) score
            prop_score = zscore(prop)
            propensity = expit(self.prop_scale * prop_score)
        elif self.treatment_assign == "random":
            # randomly assign treatments
            propensity = 0.5 * self.prop_scale * np.ones(len(X))
        elif self.treatment_assign == "prog":
            # assign treatments according to prognostic score ('self-selection')
            # compute normalized prognostic score to get propensity score
            prog_score = zscore(prog)
            propensity = expit(self.prop_scale * prog_score)
        elif self.treatment_assign == "pred":
            # assign treatments according to predictive score ('doctor knows CATE')
            # compute normalized predictive score to get propensity score
            pred_score = zscore(pred1 - pred0)
            propensity = expit(self.prop_scale * pred_score)
        elif self.treatment_assign == "linear":
            # assign treatment according to a random linear predictor, as in GANITE
            coef = np.random.uniform(-0.01, 0.01, size=[X.shape[1], 1])
            exponent = zscore(np.matmul(X, coef)).reshape(X.shape[0])
            propensity = expit(self.prop_scale * exponent)
        elif self.treatment_assign == "irrelevant_var":
            # assign treatment based on a feature that gets zero outcome weight
            all_act = self.prog_weights + self.pred1_weights + self.pred0_weights
            top_idx = np.argmin(np.abs(all_act))  # chooses an index with 0 weight
            self.top_idx = top_idx
            exponent = zscore(X[:, top_idx]).reshape(X.shape[0])
            propensity = expit(self.prop_scale * exponent)
        else:
            raise ValueError(
                f"{self.treatment_assign} is not a valid treatment assignment mechanism."
            )
        return propensity
def prog(self, X: np.ndarray) -> np.ndarray:
prog_factor, _, _, _ = self.predict(X)
return self.scale_factor * prog_factor
def get_all_important_features(self, with_selective=False) -> np.ndarray:
if with_selective:
all_important_features = np.union1d(self.get_predictive_features(), self.get_prognostic_features())
all_important_features = np.union1d(all_important_features, self.get_selective_features())
else:
all_important_features = np.union1d(
self.get_predictive_features(), self.get_prognostic_features()
)
return all_important_features
def get_predictive_features(self) -> np.ndarray:
pred_features = np.union1d(
np.where((self.pred0_mask).astype(np.int32) != 0)[0],
np.where((self.pred1_mask).astype(np.int32) != 0)[0],
)
return pred_features
    def get_prognostic_features(self) -> np.ndarray:
        # index the np.where result so an index array is returned, not a tuple
        prog_features = np.where(self.prog_mask.astype(np.int32) != 0)[0]
        return prog_features

    def get_selective_features(self) -> np.ndarray:
        select_features = np.where(self.select_mask.astype(np.int32) != 0)[0]
        return select_features
def get_important_features(
self,
X: np.ndarray,
num_important_features: int,
random_feature_selection: bool = True,
) -> Tuple:
        assert num_important_features <= int(X.shape[1] / 4), (
            "Need at least 4 * num_important_features covariates to draw "
            "disjoint prognostic, predictive and selective feature sets."
        )
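        # Illustration (hypothetical shapes): with X.shape[1] == 8 and
        # num_important_features == 2, the (optionally shuffled) indices are
        # split into four disjoint blocks of two, one per mask below.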
prog_mask = np.zeros(shape=(X.shape[1]))
pred0_mask = np.zeros(shape=(X.shape[1]))
pred1_mask = np.zeros(shape=(X.shape[1]))
select_mask = np.zeros(shape=(X.shape[1]))
all_indices = np.array(range(X.shape[1]))
if random_feature_selection:
np.random.shuffle(all_indices)
prog_indices = all_indices[:num_important_features]
pred0_indices = all_indices[
num_important_features : (2 * num_important_features)
]
pred1_indices = all_indices[
(2 * num_important_features) : (3 * num_important_features)
]
select_indices = all_indices[
(3 * num_important_features) : (4 * num_important_features)
]
prog_mask[prog_indices] = 1
pred0_mask[pred0_indices] = 1
pred1_mask[pred1_indices] = 1
select_mask[select_indices] = 1
return prog_mask, pred0_mask, pred1_mask, select_mask
def get_weights(self):
return self.prog_weights, self.pred0_weights, self.pred1_weights, self.select_weights
def generate_uniform_weights(self, X: np.ndarray):
self.prog_weights = (
np.random.uniform(-1, 1, size=(X.shape[1])) * self.prog_mask
)
self.pred0_weights = (
np.random.uniform(-1, 1, size=(X.shape[1])) * self.pred0_mask
)
self.pred1_weights = (
np.random.uniform(-1, 1, size=(X.shape[1])) * self.pred1_mask
)
self.select_weights = (
np.random.uniform(-1, 1, size=(X.shape[1])) * self.select_mask
)
class SyntheticSimulatorLinear(SyntheticSimulatorBase):
"""
Data generation process for linear outcomes.
Args:
X: np.ndarray/pd.DataFrame
Baseline covariates
num_important_features: Number of features that contribute to EACH outcome (prog, pred0 and pred1)
random_feature_selection: Whether to select features randomly.
seed: int
Random seed
Returns:
X: the same set of covariates
W_synth: simulated treatments
Y_synth: simulated outcomes
po0, po1: potential outcomes for X
propensity: propensity scores
"""
def __init__(
self,
X: np.ndarray,
num_important_features: int = 10,
random_feature_selection: bool = True,
seed: int = 42,
) -> None:
        super().__init__(
            seed=seed,
            X=X,
            num_important_features=num_important_features,
            random_feature_selection=random_feature_selection,
        )
def predict(self, X: np.ndarray) -> Tuple:
prog = np.dot(X, self.prog_weights)
pred0 = np.dot(X, self.pred0_weights)
pred1 = np.dot(X, self.pred1_weights)
prop = np.dot(X, self.select_weights)
return prog, pred0, pred1, prop
class SyntheticSimulatorModulatedNonLinear(SyntheticSimulatorBase):
"""
Data generation process for non-linear outcomes.
Args:
X: np.ndarray/pd.DataFrame
Baseline covariates
num_important_features: Number of features that contribute to EACH outcome (prog, pred0 and pred1)
random_feature_selection: Whether to select features randomly.
seed: int
Random seed
Returns:
X: the same set of covariates
W_synth: simulated treatments
Y_synth: simulated outcomes
po0, po1: potential outcomes for X
propensity: propensity scores
"""
nonlinearities = [
lambda x: np.abs(x),
lambda x: np.exp(-(x**2) / 2),
lambda x: 1 / (1 + x**2),
lambda x: np.cos(x),
lambda x: np.arctan(x),
lambda x: np.tanh(x),
lambda x: np.sin(x),
lambda x: np.log(1 + x**2),
lambda x: np.sqrt(1 + x**2),
lambda x: np.cosh(x),
]
def __init__(
self,
X: np.ndarray,
non_linearity_scale: float,
num_important_features: int = 10,
selection_type: str = "random",
random_feature_selection: bool = True,
seed: int = 42,
) -> None:
        super().__init__(
            seed=seed,
            X=X,
            num_important_features=num_important_features,
            random_feature_selection=random_feature_selection,
        )
assert selection_type in {"random"}
assert 0 <= non_linearity_scale <= 1
self.selection_type = selection_type
self.random_feature_selection = random_feature_selection
self.non_linearity_scale = non_linearity_scale
(
self.prog_nonlin,
self.pred0_nonlin,
self.pred1_nonlin,
self.select_nonlin,
) = self.sample_nonlinearities()
def predict(self, X: np.ndarray) -> Tuple:
prog_lin = np.dot(X, self.prog_weights)
pred0_lin = np.dot(X, self.pred0_weights)
pred1_lin = np.dot(X, self.pred1_weights)
prop_lin = np.dot(X, self.select_weights)
prog = (
1 - self.non_linearity_scale
) * prog_lin + self.non_linearity_scale * self.prog_nonlin(prog_lin)
pred0 = (
1 - self.non_linearity_scale
) * pred0_lin + self.non_linearity_scale * self.pred0_nonlin(pred0_lin)
pred1 = (
1 - self.non_linearity_scale
) * pred1_lin + self.non_linearity_scale * self.pred1_nonlin(pred1_lin)
prop = (
1 - self.non_linearity_scale
) * prop_lin + self.non_linearity_scale * self.select_nonlin(prop_lin)
return prog, pred0, pred1, prop
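    # For example (illustrative values): with non_linearity_scale == 0.5 and
    # prog_nonlin == np.abs, a linear index of -2.0 maps to
    # 0.5 * (-2.0) + 0.5 * 2.0 == 0.0.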
def sample_nonlinearities(self):
random.seed(self.seed)
if self.selection_type == "random":
return random.choices(population=self.nonlinearities, k=4)
else:
raise ValueError(f"Unknown selection type {self.selection_type}.")