--- a
+++ b/mimic.py
@@ -0,0 +1,745 @@
+import utils
+import pandas as pd
+from constants import ALL,column_names,NO_UNITS,START_DT,END_DT
+import logger
+from sklearn.base import BaseEstimator, TransformerMixin
+from sklearn.pipeline import Pipeline
+from fuzzywuzzy import fuzz
+import re
+import random
+import units
+import transformers
+import dask.dataframe as dd
+from extract_transform_load import ETLManager
+import matplotlib.pyplot as plt
+
+ITEMID = 'itemid'
+SUBINDEX = 'subindex'
+HADM_ID = 'hadm_id'
+
+UOM_MAP = {
+    '#': 'number',
+    '%': 'percent',
+    '(CU MM|mm3)': 'mm**3',
+    '24(hr|hour)s?': 'day',
+    'Deg\\.? F': 'degF',
+    'Deg\\.? C': 'degC',
+    'I.U.': 'IU',
+    'MEQ': 'mEq',
+    'MM HG': 'mmHg',
+    '\\+': 'pos',
+    '\\-': 'neg',
+    'cmH20': 'cmH2O',
+    'gms': 'grams',
+    'kg x m ': 'kg*m',
+    'lpm': 'liter/min',
+    'm2': 'm**2',
+    'mcghr': 'mcg/hr',
+    'mcgkghr': 'mcg/kg/hr',
+    'mcgkgmin': 'mcg/kg/min',
+    'mcgmin': 'mcg/min',
+    'mghr': 'mg/hr',
+    'mgkghr': 'mg/kg/hr',
+    'mgmin': 'mg/min',
+    '\?F':'degF',
+    '\?C':'degC',
+    'Uhr':'U/hr',
+    'Umin':'U/min',
+    '/mic l':'1/uL',
+    'K/uL':'x10e3/uL'
+    }
+"""
+EXPLORING MIMIC-III database
+"""
+
+class explorer(object):
+
+    def __init__(self, mimic_conn=None):
+        if mimic_conn is None:
+            mimic_conn = connect()
+        columns_to_keep = ['label','abbreviation','itemid','linksto','category','unitname']
+        self.mimic_conn = mimic_conn
+        self.df_all_defs = item_defs(self.mimic_conn)[columns_to_keep]
+        self.df_all_defs.set_index('itemid',inplace=True, drop=True)
+
+    def search(self,terms,loinc_code=None):
+
+        results = None
+        for term in terms:
+            score = self.df_all_defs[['category','label','abbreviation','unitname']].applymap(lambda x: fuzzy_score(str(x),term)).max(axis=1)
+            if results is None: results = score
+            else: results = pd.concat([results, score], axis=1).max(axis=1)
+
+        results.name = 'score'
+        return self.df_all_defs.join(results.to_frame()).sort_values('score',ascending=False)
+
+    def investigate(self, itemid,upper_limit):
+        info = self.df_all_defs.loc[itemid]
+        table = info.loc['linksto']
+
+        df = pd.read_sql_query('SELECT * FROM mimiciii.{} WHERE itemid={}'.format(table,itemid),self.mimic_conn)
+
+        print df.describe(include='all')
+
+        #value
+        print 'value count:',df.value.count()
+        value = df.value
+        if value.count() > 0:
+            print value.value_counts()/df.value.count()
+            value.value_counts().plot('bar')
+            plt.show()
+
+        #valuenum
+        print 'valuenum count:',df.valuenum.count()
+        valuenum = df.loc[df.valuenum < upper_limit].valuenum
+        if valuenum.count() > 0:
+            valuenum.hist()
+            plt.show()
+
+        print 'UOM info:'
+        print df.valueuom.value_counts()
+
+        return df
+
+
+def fuzzy_score(x,y):
+    if len(x)==0 or len(y) == 0: return 0
+    x = x.lower()
+    y = y.lower()
+    score = pd.np.mean([
+        fuzz.partial_ratio(x,y),
+        fuzz.token_sort_ratio(x,y),
+        fuzz.ratio(x,y)
+    ])
+    bonus = 10 if (x in y) or (y in x) else 0
+    return score + bonus
+
+def add_item_mapping(component,item_ids,item_map_fpath='config/mimic_item_map.csv'):
+    item_map = pd.read_csv(item_map_fpath)
+    new_mappings = pd.DataFrame([(component,item_id) for item_id in item_ids],columns=['component','itemid'])
+    item_map = pd.concat([item_map,new_mappings]).reset_index(drop=True)
+    item_map.to_csv(item_map_fpath, index=False)
+    return item_map
+
+"""
+EXTRACTING Data from MIMIC-III
+- Timeseries data
+- Context/demographic data
+"""
+
+class MimicETLManager(ETLManager):
+
+    def __init__(self,hdf5_fname,mimic_item_map_fname,data_dict):
+        self.conn = connect()
+        self.item_map_fname = 
mimic_item_map_fname + self.data_dict = data_dict + cleaners = standard_cleaners(data_dict) + super(MimicETLManager,self).__init__(cleaners,hdf5_fname) + + def extract(self,component): + item_map = pd.read_csv(self.item_map_fname) + return extract_component(self.conn,component,item_map) + + def transform(self,df,component): + transformers = transform_pipeline(component,self.data_dict) + return transformers.fit_transform(df) + + def extracted_ids(self,df_extracted): + return df_extracted[column_names.ID].unique().tolist() + + def extracted_data_count(self,df_extracted): + return df_extracted[column_names.VALUE].count() + + def all_ids(self): + return get_all_hadm_ids(self.conn) + + +def extract_component(mimic_conn,component,item_map,hadm_ids=ALL): + itemids = items_for_components(item_map,[component]) + if len(itemids) == 0: return None + #Get item defs and filter to what we want + df_item_defs = item_defs(mimic_conn) + df_item_defs = df_item_defs[df_item_defs.itemid.isin(itemids)] + df_item_defs = df_item_defs[~(df_item_defs.linksto == '')] + grouped = df_item_defs.groupby('linksto') + + df_list = [] + df_columns = column_map() + + too_many_ids = len(hadm_ids) > 2000 + + for table,group in grouped: + itemids = group.itemid.astype(int).tolist() + logger.log('Extracting {} items from {}'.format(len(itemids),table)) + is_iemv = table == 'inputevents_mv' + df_col = df_columns.columns.tolist() + (['statusdescription'] if is_iemv else []) + for ix,column_set in df_columns.loc[[table]].iterrows(): + psql_col = column_set.tolist() + (['statusdescription'] if is_iemv else []) + query = 'SELECT {} FROM mimiciii.{} WHERE itemid = ANY (ARRAY{})'.format(','.join(psql_col),table,itemids) + if not (hadm_ids == ALL) and not too_many_ids: + query += ' AND hadm_id = ANY (ARRAY{})'.format(hadm_ids) + df = pd.read_sql_query(query,mimic_conn) + df.columns = df_col + if too_many_ids: + df = df[df[column_names.ID].isin(hadm_ids)] + if is_iemv: + df = df.loc[df['statusdescription'].astype(str) != 'Rewritten'] + df.drop('statusdescription', axis=1,inplace=True) + df_list.append(df) + + + + logger.log('Combine DF') + df_all = pd.concat(df_list) + + return df_all + + + +def get_context_data(hadm_ids=ALL,mimic_conn=None): + + if mimic_conn is None: + mimic_conn = connect() + #get HADM info (includes patient demographics) + df_hadm = hadm_data(mimic_conn,hadm_ids) + + #get icu data + df_icu = icu_data(mimic_conn,hadm_ids) + + #merge into single dataframe + df_hadm_info = df_hadm.merge(df_icu,on=HADM_ID,how='left') + + df_hadm_info.rename(columns={HADM_ID : column_names.ID},inplace=True) + + return df_hadm_info + + + +def hadm_data(mimic_conn,hadm_ids): + """ + expects a TUPLE of hadm_ids + """ + + + """ + @@@@@@@@@@@@ + 1. 
Get all demographic data from the ADMISSIONS table = df_hadm + https://mimic.physionet.org/mimictables/admissions/ + + SELECT subject_id, hadm_id, admittime, dischtime, language, religion, + marital_status, ethnicity, diagnosis, admission_location + FROM admissions + WHERE hadm_id IN hadm_ids + @@@@@@@@@@@@ + """ + table = 'mimiciii.admissions' + hadm_where_case = None if hadm_ids == ALL else 'hadm_id IN {}'.format(tuple(hadm_ids)) + col_psql = ['subject_id', HADM_ID, 'admittime', 'dischtime', 'language', + 'religion','marital_status', 'ethnicity', 'diagnosis','admission_location'] + col_df = ['pt_id',HADM_ID,START_DT,END_DT,'lang', + 'religion','marital_status','ethnicity','dx_info','admission_location'] + df_hadm = context_extraction_helper(mimic_conn,table,col_psql,col_df,hadm_where_case) + + """ + @@@@@@@@@@@@ + 2. Get all demographic data from PATIENTS table = df_pt + https://mimic.physionet.org/mimictables/patients/ + + SELECT gender, dob, dod + FROM patients + WHERE subject_id IN pt_ids + @@@@@@@@@@@@ + """ + + table = 'mimiciii.patients' + pt_ids = df_hadm['pt_id'].unique().tolist() + col_psql = ['subject_id','gender','dob','dod'] + col_df = ['pt_id','gender','dob','dod'] + df_pt = context_extraction_helper(mimic_conn,table,col_psql,col_df) + + """ + + @@@@@@@@@@@@ + 3. Get all ICD codes data from DIAGNOSES_ICD table = df_icd + https://mimic.physionet.org/mimictables/diagnoses_icd/ + + SELECT subject_id, hadm_id, seq_num, icd9_code + FROM diagnoses_icd + WHERE hadm_id IN hadm_ids + @@@@@@@@@@@@ + """ + table = 'mimiciii.diagnoses_icd' + col_psql = ['subject_id',HADM_ID,'seq_num','icd9_code'] + col_df = ['pt_id',HADM_ID,'icd_rank','icd_code'] + df_icd = context_extraction_helper(mimic_conn,table,col_psql,col_df,hadm_where_case) + + """ + @@@@@@@@@@@@ + 4. Make df_icd into single rows for each admission, where one + column is an ordered list of ICD codes for that admission + @@@@@@@@@@@@ + """ + df_icd = df_icd.sort_values('icd_rank').groupby(HADM_ID).apply(lambda grp: grp['icd_code'].tolist()) + df_icd.name = 'icd_codes' + df_icd = df_icd.reset_index() + + """ + @@@@@@@@@@@@ + Merging + 5. Merge df_pt and df_hadm on subject_id = demographics_df + 6. Merge demographics_df with df_icd on hadm_id = df_hadm_info + @@@@@@@@@@@@ + """ + df_demographics = df_hadm.merge(df_pt,on='pt_id',how='left') + df_hadm_info = df_demographics.merge(df_icd,on=HADM_ID,how='left') + + """ + @@@@@@@@@@@@ + Cleaning + 7. Remove all NA hadm_ids + 8. Add age column + 9. 
cast hadm_id to int + @@@@@@@@@@@@ + """ + df_hadm_info = df_hadm_info.dropna(subset=[HADM_ID]) + df_hadm_info['age'] = df_hadm_info['start_dt']-df_hadm_info['dob'] + df_hadm_info[HADM_ID] = df_hadm_info[HADM_ID].astype(int) + + return df_hadm_info + +def icu_data(mimic_conn,hadm_ids): + + table = 'mimiciii.icustays' + col_psql = [HADM_ID,'icustay_id','dbsource','first_careunit','last_careunit','intime','outtime','los'] + col_df = [HADM_ID,'icustay_id','dbsource','first_icu','last_icu','intime','outtime','los'] + hadm_where_case = None if hadm_ids == ALL else 'hadm_id IN {}'.format(tuple(hadm_ids)) + df_icu = context_extraction_helper(mimic_conn,table,col_psql,col_df,hadm_where_case) + + + """ + Cleaning + - drop ICUSTAYS without hadm_id + """ + df_icu = df_icu.dropna(subset=[HADM_ID]) + + return df_icu + +def context_extraction_helper(mimic_conn,table,col_psql,col_df,where_case=None): + query = utils.simple_sql_query(table,col_psql,where_case) + df = pd.read_sql_query(query,mimic_conn) + rename_dict = dict(zip(col_psql,col_df)) + df.rename(index=str,columns=rename_dict,inplace=True) + return df + + +def column_map(): + """ + Create column mapping + """ + #definitions + + std_columns = [column_names.ID,column_names.DATETIME,column_names.VALUE,column_names.UNITS,'itemid'] + psql_col = ['hadm_id','charttime','value','valueuom','itemid'] + + col_series = pd.Series( + psql_col, + index=std_columns + ) + + map_list = [] + col_series.name = 'chartevents' + map_list.append(col_series) + + col_series = col_series.copy() + col_series.name = 'labevents' + map_list.append(col_series) + + col_series = col_series.copy() + col_series.name = 'procedureevents_mv' + map_list.append(col_series) + + col_series = col_series.copy() + col_series.name = 'datetimeevents' + map_list.append(col_series) + + col_series = col_series.copy() + col_series.name = 'outputevents' + map_list.append(col_series) + + psql_col = ['hadm_id','starttime','rate','rateuom','itemid'] + col_series = pd.Series( + psql_col, + index=std_columns + ) + col_series.name = 'inputevents_mv' + map_list.append(col_series) + + psql_col = ['hadm_id','endtime','amount','amountuom','itemid'] + col_series = pd.Series( + psql_col, + index=std_columns + ) + col_series.name = 'inputevents_mv' + map_list.append(col_series) + + psql_col = ['hadm_id','charttime','rate','rateuom','itemid'] + col_series = pd.Series( + psql_col, + index=std_columns + ) + col_series.name = 'inputevents_cv' + map_list.append(col_series) + + psql_col = ['hadm_id','charttime','amount','amountuom','itemid'] + col_series = pd.Series( + psql_col, + index=std_columns + ) + col_series.name = 'inputevents_cv' + map_list.append(col_series) + + + return pd.DataFrame(map_list) + +def item_defs(mimic_conn): + + df_items = pd.read_sql_query('SELECT * FROM mimiciii.d_items',mimic_conn) + df_labitems = pd.read_sql_query('SELECT * FROM mimiciii.d_labitems',mimic_conn) + df_labitems['linksto'] = 'labevents' + df_all_items = pd.concat([df_labitems,df_items]) + return df_all_items + +def items_for_components(item_map,components=ALL): + if not (components == ALL): + item_map = item_map[item_map.component.isin(components)] + items = item_map.itemid.unique().astype(int).tolist() + return items + +def get_all_hadm_ids(conn=None): + if conn is None: + conn = connect() + all_ids = pd.read_sql_query('SELECT hadm_id from mimiciii.admissions',conn)['hadm_id'] + all_ids = all_ids[~pd.isnull(all_ids)] + return all_ids.astype(int).sort_values().tolist() + +def sample_hadm_ids(n,seed): + all_ids = 
get_all_hadm_ids() + random.seed(seed) + sampled_ids = random.sample(all_ids,n) + return sampled_ids + +def connect(psql_username='postgres',psql_pass='123'): + return utils.psql_connect(psql_username,psql_pass,'mimic') + + +""" +TRANSFORM Data extracted from MIMIC-III +""" + +class CleanUnits(BaseEstimator,TransformerMixin): + + def __init__(self,component,data_dict): + self.data_dict = data_dict + self.component = component + + def fit(self, x, y=None): + return self + + def transform(self, df): + logger.log('Clean UOM',new_level=True) + df = clean_uom(df,self.component,self.data_dict) + logger.end_log_level() + return df + +def clean_uom(df,component,data_dict): + grouped = df.groupby(column_names.UNITS) + for old_uom,group in grouped: + new_uom = process_uom(old_uom,component,data_dict) + df.loc[group.index,column_names.UNITS] = new_uom + if not (old_uom == new_uom): + df.loc[group.index,ITEMID] = utils.append_to_description(df.loc[group.index,ITEMID].astype(str),old_uom) + return df + +def process_uom(units,component,data_dict): + + if units in ['BPM','bpm']: + if component == data_dict.components.HEART_RATE: units = 'beats/min' + if component == data_dict.components.RESPIRATORY_RATE: units = 'breaths/min' + for to_replace,replacement in UOM_MAP.iteritems(): + units = re.sub(to_replace, replacement,units,flags=re.IGNORECASE) + return units + +class clean_extract(BaseEstimator,TransformerMixin): + + def fit(self, x, y=None): + return self + + def transform(self, df): + """ + FORMAT pre-unstack columns + """ + df = df.replace(to_replace='', value=pd.np.nan) + #drop NAN record_id, timestamps, or value + df.dropna(subset=[column_names.ID,column_names.DATETIME,column_names.VALUE], how='any',inplace=True) + + #ID to integer + df.loc[:,column_names.ID] = df.loc[:,column_names.ID].astype(int) + + #DATETIME to pd.DATETIME + df.loc[:,column_names.DATETIME] = pd.to_datetime(df.loc[:,column_names.DATETIME],errors='raise') + + #set UOM to NO_UOM if not declared + df.loc[:,column_names.UNITS] = df.loc[:,column_names.UNITS].fillna(NO_UNITS) + + df.rename(index=str,columns={ITEMID:column_names.DESCRIPTION},inplace=True) + index_cols = [ + column_names.ID, + column_names.DATETIME, + column_names.DESCRIPTION, + column_names.UNITS + ] + #Set up our row index + df.set_index(index_cols,inplace=True) + + return df + + + +class unstacker(transformers.safe_unstacker): + + def __init__(self): + super(unstacker,self).__init__(column_names.UNITS,column_names.DESCRIPTION) + +def transform_pipeline(component,data_dict): + return Pipeline([ + ('clean_units',CleanUnits(component,data_dict)), + ('clean_df',clean_extract()), + ('unstack',unstacker()), + ('add_level',transformers.add_level(component,'component',axis=1)), + ]) + +def standard_cleaners(data_dict): + category_map = mimic_category_map(data_dict) + ureg = units.MedicalUreg() + return Pipeline([ + ('aggregate_same_datetime',transformers.same_index_aggregator(lambda grp:grp.iloc[0])), + ('split_dtype',transformers.split_dtype()), + ('standardize_columns',transformers.column_standardizer(data_dict,ureg)), + ('standardize_categories',transformers.standardize_categories(data_dict,category_map)), + ('split_bad_categories',transformers.split_bad_categories(data_dict)), + ('one_hotter',transformers.nominal_to_onehot()), + ('drop_oob_values',transformers.oob_value_remover(data_dict)) + ]) + + +def mimic_category_map(data_dict): + return { + data_dict.components.GLASGOW_COMA_SCALE_EYE_OPENING: { + '1 No Response': 6, + '2 To pain': 7, + '3 To speech': 8, + '4 
Spontaneously': 9
+        },
+        data_dict.components.GLASGOW_COMA_SCALE_MOTOR: {
+            '1 No Response': 0,
+            '2 Abnorm extensn': 1,
+            '3 Abnorm flexion': 2,
+            '4 Flex-withdraws': 3,
+            '5 Localizes Pain': 4,
+            '6 Obeys Commands': 5
+        },
+        data_dict.components.GLASGOW_COMA_SCALE_VERBAL: {
+            '1 No Response': 10,
+            '1.0 ET/Trach': 10,
+            '2 Incomp sounds': 11,
+            '3 Inapprop words': 12,
+            '4 Confused': 13,
+            '5 Oriented':14
+        }
+    }
+
+
+def ETL(extractor,
+        components,
+        data_dict,
+        same_dt_aggregator,
+        hdf5_fname=None,joined_path=None,
+        hadm_ids=ALL,
+        use_base_df=True,
+        to_pandas=False,
+        chunksize=500000):
+
+    logger.log('***ETL***',new_level=True)
+    logger.log('SETUP',new_level=True)
+
+    category_map = mimic_category_map(data_dict)
+    ureg = units.MedicalUreg()
+
+    #NOTE: the per-component transform pipeline is built inside the loop below,
+    #since transform_pipeline() needs both the component and the data dictionary.
+
+    standard_clean_pipeline = Pipeline([
+        ('aggregate_same_datetime',same_dt_aggregator),
+        ('split_dtype',transformers.split_dtype()),
+        ('standardize_columns',transformers.column_standardizer(data_dict,ureg)),
+        ('standardize_categories',transformers.standardize_categories(data_dict,category_map)),
+        ('split_bad_categories',transformers.split_bad_categories(data_dict)),
+        # ('one_hotter',transformers.nominal_to_onehot()),
+        ('drop_oob_values',transformers.oob_value_remover(data_dict))
+    ])
+
+    should_save = (hdf5_fname is not None)
+
+    df_base = None
+
+    if should_save and use_base_df:
+        try:
+            df_base = utils.open_df(hdf5_fname,joined_path)
+        except Exception:
+            pass
+
+    if df_base is not None:
+
+        existing_components = df_base.columns.get_level_values(column_names.COMPONENT).unique().tolist()
+        existing_ids = set(df_base.index.get_level_values(column_names.ID).tolist())
+        requested_ids = hadm_ids if hadm_ids != ALL else get_all_hadm_ids()
+
+        new_ids = [ID for ID in requested_ids if ID not in existing_ids]
+
+        #case 1: new ids in existing columns, don't try to be smart with ALL unless not a lot of IDs
+        if len(new_ids) > 0:
+            df_addition = ETL(extractor,
+                              existing_components,
+                              data_dict,
+                              same_dt_aggregator,
+                              hadm_ids=new_ids,
+                              to_pandas=True)
+            if df_addition is not None:
+                df_base = pd.concat([df_base,df_addition])
+        #now we only need to load NEW components
+        components = [comp for comp in components if comp not in existing_components]
+
+        logger.log('Base DF to Dask')
+        df_base = dd.from_pandas(df_base.reset_index(), chunksize=chunksize)
+
+    df_all = df_base
+
+    logger.log('BEGIN ETL for {} admissions and {} components: {}'.format(hadm_ids if hadm_ids == ALL else len(hadm_ids),
+                                                                          len(components),
+                                                                          components),new_level=True,end_level=True)
+    for component in components:
+        logger.log('{}: {}/{}'.format(component.upper(),components.index(component)+1,len(components)),new_level=True)
+
+        """
+        @@@@@@@@@@@@@@@
+        ----EXTRACT----
+        @@@@@@@@@@@@@@@
+        """
+
+        logger.log("Extracting...",new_level=True)
+        df_extracted = extractor.extract_component(component,hadm_ids)
+
+        if df_extracted.empty:
+            print 'EMPTY Dataframe EXTRACTED for {}, n={} ids'.format(component,len(hadm_ids))
+            logger.end_log_level()
+            continue
+
+        if should_save:
+            logger.log('Save EXTRACTED DF = {}'.format(df_extracted.shape))
+            utils.save_df(df_extracted,hdf5_fname,'extracted/{}'.format(component))
+        logger.end_log_level()
+
+        """
+        @@@@@@@@@@@@@@@@@
+        ----TRANSFORM----
+        @@@@@@@@@@@@@@@@@
+        """
+
+        logger.log("Transforming... {}".format(df_extracted.shape),new_level=True)
+        transformer = transform_pipeline(component,data_dict)
+        df_transformed = transformer.fit_transform(df_extracted)
+
+        print 'Data Loss (Extract > Transformed):',utils.data_loss(df_extracted.set_index(column_names.ID).value.to_frame(),df_transformed)
+
+        if df_transformed.empty:
+            print 'EMPTY Dataframe TRANSFORMED for {}, n={} ids'.format(component,len(hadm_ids))
+            logger.end_log_level()
+            continue
+
+        if should_save:
+            logger.log('Save TRANSFORMED DF = {}'.format(df_transformed.shape))
+            utils.save_df(df_transformed,hdf5_fname,'transformed/{}'.format(component))
+        logger.end_log_level()
+
+        """
+        @@@@@@@@@@@@@@@
+        -----CLEAN-----
+        @@@@@@@@@@@@@@@
+        """
+
+        logger.log("Cleaning... {}".format(df_transformed.shape),new_level=True)
+        df = standard_clean_pipeline.transform(df_transformed)
+
+        print 'Data Loss (Extract > Cleaned):', utils.data_loss(df_extracted.set_index(column_names.ID).value.to_frame(),df)
+
+        if df.empty:
+            print 'EMPTY Dataframe CLEANED for {}, n={} ids'.format(component,len(hadm_ids))
+            logger.end_log_level()
+            continue
+
+        if should_save:
+            logger.log('Save CLEANED DF = {}'.format(df.shape))
+            utils.save_df(df,hdf5_fname,'cleaned/{}'.format(component))
+        logger.end_log_level()
+
+        del df_extracted,df_transformed
+
+        logger.log('Filter & sort - {}'.format(df.shape))
+
+        df.sort_index(inplace=True)
+        df.sort_index(inplace=True, axis=1)
+
+        logger.log('Convert to dask - {}'.format(df.shape))
+        df_dask = dd.from_pandas(df.reset_index(), chunksize=chunksize)
+        del df
+
+        logger.log('Join to big DF')
+
+        if df_all is None: df_all = df_dask
+        else:
+            df_all = df_all.merge(df_dask,how='outer', on=['id','datetime'])
+        del df_dask
+
+        logger.end_log_level()
+    logger.end_log_level()
+
+    if df_all is None or not to_pandas:
+        logger.end_log_level()
+        return df_all
+
+    logger.log('Dask DF back to pandas')
+    df_pd = df_all.compute()
+    del df_all
+    df_pd.set_index(['id','datetime'], inplace=True)
+
+    logger.log('SORT Joined DF')
+    df_pd.sort_index(inplace=True)
+    df_pd.sort_index(inplace=True, axis=1)
+
+    if should_save:
+        logger.log('SAVE Big DF')
+        utils.save_df(df_pd,hdf5_fname,joined_path)
+    logger.end_log_level()
+
+    return df_pd
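+
+
+if __name__ == '__main__':
+    # Minimal usage sketch of the pieces defined above. The 'data_dict' import is
+    # an assumed project module (in this file it is only ever passed in as an
+    # argument), and the sample size/seed are illustrative; everything else uses
+    # functions and default paths that already appear in this module.
+    import data_dict  # hypothetical module exposing .components and column definitions
+
+    conn = connect()  # local 'mimic' Postgres database via utils.psql_connect defaults
+    item_map = pd.read_csv('config/mimic_item_map.csv')
+
+    # Work on a small, reproducible sample of admissions.
+    hadm_ids = sample_hadm_ids(100, seed=42)
+
+    # Extract raw heart-rate rows and run the per-component transform pipeline.
+    component = data_dict.components.HEART_RATE
+    df_raw = extract_component(conn, component, item_map, hadm_ids=hadm_ids)
+    if df_raw is not None:
+        df_hr = transform_pipeline(component, data_dict).fit_transform(df_raw)
+
+    # Admission-level context (demographics, ICU stays) for the same sample.
+    df_context = get_context_data(hadm_ids=hadm_ids, mimic_conn=conn)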