"""
Functions for loading data.
Loads raw data into several "experiences" (tasks)
for continual learning training scenario.
Tasks split by given demographic.
Loads:
- MIMIC-III - ICU time-series data
- eICU-CRD - ICU time-series data
- random - sequential data
"""
import copy
import json
import pandas as pd
import numpy as np
from pathlib import Path
import torch
import sparse
import random
from avalanche.benchmarks.generators import tensors_benchmark
DATA_DIR = Path(__file__).parents[2] / "data"
# Reproducibility
SEED = 12345
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
# JA: Save as .json?
DEMO_COL_PREFIXES = {
"mimic3": {
"sex": "GENDER_value:F",
"age": "AGE_value:",
"ethnicity": "ETHNICITY_value:",
"ethnicity_coarse": "ETHNICITY_COARSE_value:",
"ward": "FIRST_CAREUNIT_value:",
},
"eicu": {
"sex": "gender_value:",
"age": "age_value:",
"ethnicity": "ethnicity_value:",
"hospital": "hospitalid_value:",
"ward": "unittype_value:",
},
}
########################
# Simulated (random) data
########################
def random_data(seq_len=48, n_vars=6, n_tasks=3, n_samples=150, p_outcome=0.1):
"""
Returns a sequence of random sequential data and associated binary targets.
"""
tasks = [
(
torch.randn(n_samples, seq_len, n_vars),
(torch.rand(n_samples) < p_outcome).long(),
)
for _ in range(n_tasks)
]
return tasks
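# Illustrative sketch (not part of the pipeline): the shapes random_data()
# produces with its defaults (n_samples=150, seq_len=48, n_vars=6).
def _demo_random_data():
    tasks = random_data()
    x, y = tasks[0]
    assert x.shape == (150, 48, 6)  # (samples, time_steps, variables)
    assert y.shape == (150,) and y.dtype == torch.long  # binary 0/1 targets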
#######################
# ALL
#######################
def cache_processed_dataset():
    # Given dataset/demo/outcome:
    # - create train/val and train/test datasets
    # - save as numpy arrays in data/preprocessed/dataset/outcome/demo
    # - load the cached numpy arrays on subsequent calls
    raise NotImplementedError
def load_data(data, demo, outcome, validate=False):
"""
Data of form:
(
x:(samples, variables, time_steps),
y:(outcome,)
)
"""
# JA: Implement "Save tensor as .np object" on first load, load local copy if exists
if data == "random":
experiences = random_data()
test_experiences = copy.deepcopy(experiences)
weights = None
elif data in ("mimic3", "eicu"):
tasks = split_tasks_fiddle(data, demo, outcome)
experiences, test_experiences = split_trainvaltest_fiddle(
tasks, print_task_partitions=not validate
)
experiences = [
(torch.FloatTensor(feat), torch.LongTensor(target))
for feat, target in experiences
]
test_experiences = [
(torch.FloatTensor(feat), torch.LongTensor(target))
for feat, target in test_experiences
]
        # Class weights for loss balancing, estimated over the first two tasks:
        # weights = [n_pos / n_neg, 1], so the (typically minority) positive
        # class keeps weight 1 and the negative class is down-weighted.
        class1_count = sum(experiences[0][1]) + sum(experiences[1][1])
        class0_count = len(experiences[0][1]) + len(experiences[1][1]) - class1_count
        weights = class1_count / torch.LongTensor([class0_count, class1_count])
if validate:
experiences = experiences[:2]
test_experiences = test_experiences[:2]
else:
# Cap n tasks
experiences = experiences[:20]
test_experiences = test_experiences[:20]
# Do not use validation sets for training
if not validate and len(experiences) > 5:
experiences = experiences[2:]
test_experiences = test_experiences[2:]
n_tasks = len(experiences)
n_timesteps = experiences[0][0].shape[-2]
n_channels = experiences[0][0].shape[-1]
scenario = tensors_benchmark(
train_tensors=experiences,
test_tensors=test_experiences,
task_labels=[0 for _ in range(n_tasks)], # Task label of each train exp
complete_test_set_only=False,
)
# JA: Investigate from avalanche.benchmarks.utils.avalanche_dataset import AvalancheDataset
return scenario, n_tasks, n_timesteps, n_channels, weights
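# Usage sketch (hedged): iterating the returned Avalanche benchmark. Uses the
# synthetic "random" dataset so no FIDDLE files are needed; demo/outcome are
# ignored in that branch.
def _demo_load_data():
    scenario, n_tasks, n_timesteps, n_channels, weights = load_data(
        "random", demo=None, outcome=None
    )
    for experience in scenario.train_stream:
        # Each experience wraps one task's (features, targets) tensor pair
        print(experience.current_experience, len(experience.dataset))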
##########
# FIDDLE
##########
def get_ethnicity_coarse(data, outcome):
"""
MIMIC-3 has detailed ethnicity values, but some of these groups have no mortality data.
Hence create broader groups to get better binary class balance of tasks.
"""
features_X, features_s, X_feature_names, s_feature_names, df_outcome = load_fiddle(
data=data, outcome=outcome
)
eth_map = {}
eth_map["ETHNICITY_COARSE_value:WHITE"] = [
c for c in s_feature_names if c.startswith("ETHNICITY_value:WHITE")
]
eth_map["ETHNICITY_COARSE_value:ASIAN"] = [
c for c in s_feature_names if c.startswith("ETHNICITY_value:ASIAN")
]
eth_map["ETHNICITY_COARSE_value:BLACK"] = [
c for c in s_feature_names if c.startswith("ETHNICITY_value:BLACK")
]
eth_map["ETHNICITY_COARSE_value:HISPA"] = [
c for c in s_feature_names if c.startswith("ETHNICITY_value:HISPANIC")
]
eth_map["ETHNICITY_COARSE_value:OTHER"] = [
c
for c in s_feature_names
if c.startswith("ETHNICITY_value:")
and c
not in eth_map["ETHNICITY_COARSE_value:WHITE"]
+ eth_map["ETHNICITY_COARSE_value:BLACK"]
+ eth_map["ETHNICITY_COARSE_value:ASIAN"]
+ eth_map["ETHNICITY_COARSE_value:HISPA"]
]
    # Append each coarse group as a new one-hot column (OR over member columns)
    for k, cols in eth_map.items():
s_feature_names.append(k)
idx = [s_feature_names.index(col) for col in cols]
features_s = np.append(
features_s, features_s[:, idx].any(axis=1)[:, np.newaxis], axis=1
)
return features_X, features_s, X_feature_names, s_feature_names, df_outcome
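# Illustrative sketch (not part of the pipeline): each coarse one-hot above is
# the logical OR over its member fine-grained one-hots, appended as a new column.
def _demo_coarse_or():
    s = np.array([[1, 0], [0, 1], [0, 0]])  # two fine-grained ethnicity cols
    coarse = s[:, [0, 1]].any(axis=1)[:, np.newaxis]
    print(np.append(s, coarse, axis=1))  # third column is the OR of the first two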
def recover_admission_time(data, outcome):
"""
Function to recover datetime info for admission from FIDDLE.
"""
*_, df_outcome = load_fiddle(data, outcome)
df_outcome["SUBJECT_ID"] = (
df_outcome["stay"].str.split("_", expand=True)[0].astype(int)
)
df_outcome["stay_number"] = (
df_outcome["stay"]
.str.split("_", expand=True)[1]
.str.replace("episode", "")
.astype(int)
)
# load original MIMIC-III csv
df_mimic = pd.read_csv(
DATA_DIR / "FIDDLE_mimic3" / "ADMISSIONS.csv", parse_dates=["ADMITTIME"]
)
    # grab quarter (season) from the admission date, and the stay number per patient
df_mimic["quarter"] = df_mimic["ADMITTIME"].dt.quarter
admission_group = df_mimic.sort_values("ADMITTIME").groupby("SUBJECT_ID")
df_mimic["stay_number"] = admission_group.cumcount() + 1
df_mimic = df_mimic[["SUBJECT_ID", "stay_number", "quarter"]]
return df_outcome.merge(df_mimic, on=["SUBJECT_ID", "stay_number"])
def get_eicu_region(df):
raise NotImplementedError
def load_fiddle(data, outcome, n=None, vitals_only=True):
"""
- `data`: ['eicu', 'mimic3']
- `task`: ['ARF_4h','ARF_12h','Shock_4h','Shock_12h','mortality_48h']
- `n`: number of samples to pick
features of form N_patients x Seq_len x Features
"""
data_dir = DATA_DIR / f"FIDDLE_{data}"
with open(
data_dir / "features" / outcome / "X.feature_names.json", encoding="utf-8"
) as X_file:
X_feature_names = json.load(X_file)
with open(
data_dir / "features" / outcome / "s.feature_names.json", encoding="utf-8"
) as s_file:
s_feature_names = json.load(s_file)
# Take only subset of vars to reduce mem overhead
if data == "eicu":
vitals = ["Vital Signs|"]
elif data == "mimic3":
vitals = ["HR", "RR", "SpO2", "SBP", "Heart Rhythm", "SysBP", "DiaBP"]
    vital_col_ids = [
        i
        for i, var in enumerate(X_feature_names)
        if any(var.startswith(prefix) for prefix in vitals)
    ]
if vitals_only:
X_feature_names = [X_feature_names[i] for i in vital_col_ids]
features_X_subset_ids = vital_col_ids
    else:
        X_n = len(X_feature_names)
        # X_n = 400
        # Sorted so that column order stays aligned with X_feature_names
        features_X_subset_ids = sorted(set(range(X_n)).union(vital_col_ids))
# Loading np arrays
features_X = sparse.load_npz(data_dir / "features" / outcome / "X.npz")[
:n, :, features_X_subset_ids
].todense()
features_s = sparse.load_npz(data_dir / "features" / outcome / "s.npz")[
:n
].todense()
df_outcome = pd.read_csv(data_dir / "population" / f"{outcome}.csv")[:n]
df_outcome["y_true"] = df_outcome[f"{outcome.split('_')[0]}_LABEL"]
return features_X, features_s, X_feature_names, s_feature_names, df_outcome
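# Usage sketch (hedged; requires the FIDDLE_mimic3 files under DATA_DIR). The
# asserts check the column/name alignment the rest of the module relies on.
def _demo_load_fiddle():
    X, s, X_names, s_names, df = load_fiddle("mimic3", "mortality_48h", n=100)
    assert X.shape[-1] == len(X_names)  # (samples, seq_len, vital features)
    assert s.shape[-1] == len(s_names)  # (samples, static features)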
def get_modes(x, feat, seq_dim=1):
"""
For a tensor of shape NxLxF
Returns modal value for given feature across sequence dim.
"""
# JA: Check conversion to tnsor, dtype etc
return torch.LongTensor(x[:, :, feat]).mode(dim=seq_dim)[0].clone().detach().numpy()
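# Illustrative sketch (not part of the pipeline): a static one-hot padded
# across time is constant over the sequence, so its per-sample mode recovers it.
def _demo_get_modes():
    x = np.zeros((4, 48, 3))
    x[:2, :, 0] = 1  # feature 0 is 1 at every time step for samples 0-1
    print(get_modes(x, feat=0))  # -> [1 1 0 0]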
def split_tasks_fiddle(data, demo, outcome, order="random", seed=SEED):
"""
Takes FIDDLE format data and given an outcome and demographic,
splits the input data across that demographic into multiple
tasks/experiences.
"""
if demo == "ethnicity_coarse":
(
features_X,
features_s,
X_feature_names,
s_feature_names,
df_outcome,
) = get_ethnicity_coarse(data, outcome)
else:
(
features_X,
features_s,
X_feature_names,
s_feature_names,
df_outcome,
) = load_fiddle(data, outcome)
static_onehot_demos = [
"sex",
"age",
"ethnicity",
"ethnicity_coarse",
"hospital",
"ward",
]
if demo in static_onehot_demos:
cols = [
c for c in s_feature_names if c.startswith(DEMO_COL_PREFIXES[data][demo])
]
demo_onehots = [s_feature_names.index(col) for col in cols]
tasks_idx = [features_s[:, i] == 1 for i in demo_onehots]
elif demo == "time_season":
seasons = recover_admission_time(data, outcome)["quarter"]
tasks_idx = [seasons == i for i in range(1, 5)]
else:
raise NotImplementedError
all_features = concat_timevar_static_feats(features_X, features_s)
# Reproducible RNG
if order == "random":
rng = np.random.default_rng(seed)
rng.shuffle(tasks_idx)
elif order == "reverse":
tasks_idx = reversed(tasks_idx)
tasks = [(all_features[idx], df_outcome[idx]) for idx in tasks_idx]
return tasks
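# Usage sketch (hedged; requires the FIDDLE files under DATA_DIR):
#     tasks = split_tasks_fiddle("mimic3", demo="sex", outcome="mortality_48h")
#     # -> one (features, outcome-df) pair per demographic group, shuffled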
def concat_timevar_static_feats(features_X, features_s):
"""
Concatenates time-varying features with static features.
Static features padded to length of sequence,
and appended along feature axis.
"""
# JA: Need to test this has no bugs.
# Repeat static vals length of sequence across new axis
s_expanded = np.expand_dims(features_s, 1).repeat(features_X.shape[1], axis=1)
# Concatenate across feat axis
all_feats = np.concatenate((features_X, s_expanded), -1)
return all_feats
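# Illustrative sketch (not part of the pipeline): shape bookkeeping for the
# concatenation above.
def _demo_concat_feats():
    X = np.zeros((10, 48, 6))  # (samples, time_steps, time-varying feats)
    s = np.ones((10, 4))       # (samples, static feats)
    out = concat_timevar_static_feats(X, s)
    assert out.shape == (10, 48, 10)  # static feats tiled along the time axis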
def split_trainvaltest_fiddle(
tasks, val_as_test=True, print_task_partitions=True, seed=SEED
):
"""
Takes a dataset of multiple tasks/experiences and splits it into train and val/test sets.
Assumes FIDDLE style outcome/partition cols in df of outcome values.
"""
# Only MIMIC-III mortality_48h has train/val/test split
# JA: This currently splits on sample/admission. DOES NOT SPLIT ON PATIENT ID
# Need to incorporate patient ID split from
# https://github.com/MLD3/FIDDLE-experiments/blob/master/mimic3_experiments/1_data_extraction/extract_data.py
# elsewhere defined?
# Train/val/test/split
for i in range(len(tasks)):
if "partition" not in tasks[i][1]:
# Reproducible RNG
rng = np.random.default_rng(seed)
n = len(tasks[i][1])
partition = rng.choice(["train", "val", "test"], n, p=[0.7, 0.15, 0.15])
tasks[i][1]["partition"] = partition
if print_task_partitions:
partitions = get_task_partition_sizes(tasks)
for p in partitions:
print(p)
if val_as_test:
tasks_train = [
(
t[0][t[1]["partition"] == "train"],
t[1][t[1]["partition"] == "train"]["y_true"].values,
)
for t in tasks
]
tasks_test = [
(
t[0][t[1]["partition"] == "val"],
t[1][t[1]["partition"] == "val"]["y_true"].values,
)
for t in tasks
]
else:
tasks_train = [
(
t[0][t[1]["partition"].isin(["train", "val"])],
t[1][t[1]["partition"].isin(["train", "val"])]["y_true"].values,
)
for t in tasks
]
tasks_test = [
(
t[0][t[1]["partition"] == "test"],
t[1][t[1]["partition"] == "test"]["y_true"].values,
)
for t in tasks
]
return tasks_train, tasks_test
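# Illustrative sketch (not part of the pipeline): with a synthetic task shaped
# like FIDDLE output, the splitter assigns ~70/15/15 partitions; with the
# default val_as_test=True, the "val" rows are returned as the test set.
def _demo_split_trainvaltest():
    df = pd.DataFrame({"y_true": np.zeros(200, dtype=int)})
    tasks = [(np.zeros((200, 48, 10)), df)]
    train, test = split_trainvaltest_fiddle(tasks, print_task_partitions=False)
    print(len(train[0][1]), len(test[0][1]))  # roughly 140 / 30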
#############################
# Helper funcs for figs, data, info for paper
#############################
def get_corr_feats_target(df, target):
    """Returns the correlation of each feature column with the target."""
    return df.corr()[target].drop(target)
def get_demo_labels(data, demo, outcome):
"""
Gets labels for demo splits from feature col names.
"""
data_dir = DATA_DIR / f"FIDDLE_{data}"
with open(
data_dir / "features" / outcome / "s.feature_names.json", encoding="utf-8"
) as s_file:
s_feature_names = json.load(s_file)
cols = [
col.split(":")[-1]
for col in s_feature_names
if col.startswith(DEMO_COL_PREFIXES[data][demo])
]
return cols
def get_demo_labels_table(demo, datasets=["mimic3", "eicu"]):
# pd.options.display.max_colwidth = 1000
# Domainshifts present (over outcomes)
task_data = []
outcomes = ["mortality_48h", "ARF_4h", "Shock_4h", "ARF_12h", "Shock_12h"]
all_tasks = set.union(
*[
set(get_demo_labels(data, demo, outcome))
for outcome in outcomes
for data in datasets
]
)
cols = ["Dataset", "Outcome"] + list(all_tasks)
for data in datasets:
for outcome in outcomes:
tasks = get_demo_labels(data, demo, outcome)
task_data.append(
[data, outcome.replace("_", " ")]
+ ["\checkmark" if task in tasks else " " for task in all_tasks]
)
df = pd.DataFrame(columns=cols, data=task_data)
df = df.set_index(["Dataset", "Outcome"])
    # Order columns by how often each task appears across datasets/outcomes
    s = df.sum()
    df = df[s.sort_values(ascending=False).index]
return df
def get_task_partition_sizes(tasks):
"""
Prints the number of positive and negative samples in each train/val/test split
for each task.
"""
tables = []
for t in tasks:
tables.append(
t[1][["partition", "y_true"]]
.groupby("partition")
.agg(Total=("y_true", "count"), Outcome=("y_true", "sum"))
)
return tables
def generate_data_tables(data, demo, outcome, seed=SEED):
"""Generate latex tables describing data."""
tasks = split_tasks_fiddle(data, demo, outcome)
for i in range(len(tasks)):
if "partition" not in tasks[i][1]:
# Reproducible RNG
rng = np.random.default_rng(seed)
n = len(tasks[i][1])
partition = rng.choice(["train", "val", "test"], n, p=[0.7, 0.15, 0.15])
tasks[i][1]["partition"] = partition
dfs = get_task_partition_sizes(tasks)
for i, df in enumerate(dfs):
df["task"] = i
df = pd.concat(dfs)
df = df.set_index(["task"], append=True)
df = df.unstack()
df = df.reorder_levels([-1, -2], axis=1)
df = df.sort_index(axis=1, level=0)
df = df.reindex(columns=df.columns.reindex(["Total", "Outcome"], level=1)[0])
return df
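# Usage sketch (hedged; requires the FIDDLE files under DATA_DIR):
#     df = generate_data_tables("mimic3", "sex", "mortality_48h")
#     # -> rows: partition; columns: (task, [Total, Outcome]); e.g. df.to_latex()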