Diff of /mimic.py [000000] .. [418e14]

b/mimic.py
import utils
import pandas as pd
from constants import ALL,column_names,NO_UNITS,START_DT,END_DT
import logger
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from fuzzywuzzy import fuzz
import re
import random
import units
import transformers
import dask.dataframe as dd
from extract_transform_load import ETLManager
import matplotlib.pyplot as plt

ITEMID = 'itemid'
SUBINDEX = 'subindex'
HADM_ID = 'hadm_id'

UOM_MAP = {
    '#': 'number',
    '%': 'percent',
    '(CU MM|mm3)': 'mm**3',
    '24(hr|hour)s?': 'day',
    r'Deg\.? F': 'degF',
    r'Deg\.? C': 'degC',
    'I.U.': 'IU',
    'MEQ': 'mEq',
    'MM HG': 'mmHg',
    r'\+': 'pos',
    r'\-': 'neg',
    'cmH20': 'cmH2O',
    'gms': 'grams',
    'kg x m ': 'kg*m',
    'lpm': 'liter/min',
    'm2': 'm**2',
    'mcghr': 'mcg/hr',
    'mcgkghr': 'mcg/kg/hr',
    'mcgkgmin': 'mcg/kg/min',
    'mcgmin': 'mcg/min',
    'mghr': 'mg/hr',
    'mgkghr': 'mg/kg/hr',
    'mgmin': 'mg/min',
    r'\?F': 'degF',
    r'\?C': 'degC',
    'Uhr': 'U/hr',
    'Umin': 'U/min',
    '/mic l': '1/uL',
    'K/uL': 'x10e3/uL'
    }
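
# Illustrative note: process_uom() below applies each key of UOM_MAP as a
# case-insensitive regular expression via re.sub, normalising raw MIMIC unit
# strings in place. For example:
#
#     >>> re.sub('MEQ', UOM_MAP['MEQ'], 'MEQ', flags=re.IGNORECASE)
#     'mEq'
#     >>> re.sub(r'Deg\.? F', 'degF', 'Deg. F', flags=re.IGNORECASE)
#     'degF'
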
"""
52
EXPLORING MIMIC-III database
53
"""
54
55
class explorer(object):

    def __init__(self, mimic_conn=None):
        if mimic_conn is None:
            mimic_conn = connect()
        columns_to_keep = ['label','abbreviation','itemid','linksto','category','unitname']
        self.mimic_conn = mimic_conn
        self.df_all_defs = item_defs(self.mimic_conn)[columns_to_keep]
        self.df_all_defs.set_index('itemid',inplace=True, drop=True)

    def search(self,terms,loinc_code=None):

        results = None
        for term in terms:
            score = self.df_all_defs[['category','label','abbreviation','unitname']].applymap(lambda x: fuzzy_score(str(x),term)).max(axis=1)
            if results is None: results = score
            else: results = pd.concat([results, score], axis=1).max(axis=1)

        results.name = 'score'
        return self.df_all_defs.join(results.to_frame()).sort_values('score',ascending=False)

    def investigate(self, itemid,upper_limit):
        info = self.df_all_defs.loc[itemid]
        table = info.loc['linksto']

        df = pd.read_sql_query('SELECT * FROM mimiciii.{} WHERE itemid={}'.format(table,itemid),self.mimic_conn)

        print df.describe(include='all')

        #value
        print 'value count:',df.value.count()
        value = df.value
        if value.count() > 0:
            print value.value_counts()/df.value.count()
            value.value_counts().plot(kind='bar')
            plt.show()

        #valuenum
        print 'valuenum count:',df.valuenum.count()
        valuenum = df.loc[df.valuenum < upper_limit].valuenum
        if valuenum.count() > 0:
            valuenum.hist()
            plt.show()

        print 'UOM info:'
        print df.valueuom.value_counts()

        return df

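# Usage sketch (illustrative; assumes a local MIMIC-III postgres instance reachable
# through connect() with its default credentials):
#
#     >>> exp = explorer()
#     >>> hits = exp.search(['heart rate'])                      # fuzzy-ranked item definitions
#     >>> df = exp.investigate(hits.index[0], upper_limit=300)   # summarise & plot one item
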
def fuzzy_score(x,y):
    if len(x)==0 or len(y) == 0: return 0
    x = x.lower()
    y = y.lower()
    score = pd.np.mean([
                fuzz.partial_ratio(x,y),
                fuzz.token_sort_ratio(x,y),
                fuzz.ratio(x,y)
                ])
    bonus = 10 if (x in y) or (y in x) else 0
    return score + bonus

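# Worked example: an exact match (after lower-casing) scores 100 on all three
# fuzzywuzzy ratios and earns the containment bonus:
#
#     >>> fuzzy_score('Heart Rate', 'heart rate')
#     110.0
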
def add_item_mapping(component,item_ids,item_map_fpath='config/mimic_item_map.csv'):
    item_map = pd.read_csv(item_map_fpath)
    new_mappings = pd.DataFrame([(component,item_id) for item_id in item_ids],columns=['component','itemid'])
    item_map = pd.concat([item_map,new_mappings]).reset_index(drop=True)
    item_map.to_csv(item_map_fpath, index=False)
    return item_map

"""
EXTRACTING Data from MIMIC-III
- Timeseries data
- Context/demographic data
"""

class MimicETLManager(ETLManager):

    def __init__(self,hdf5_fname,mimic_item_map_fname,data_dict):
        self.conn = connect()
        self.item_map_fname = mimic_item_map_fname
        self.data_dict = data_dict
        cleaners = standard_cleaners(data_dict)
        super(MimicETLManager,self).__init__(cleaners,hdf5_fname)

    def extract(self,component):
        item_map = pd.read_csv(self.item_map_fname)
        return extract_component(self.conn,component,item_map)

    def transform(self,df,component):
        pipeline = transform_pipeline(component,self.data_dict)
        return pipeline.fit_transform(df)

    def extracted_ids(self,df_extracted):
        return df_extracted[column_names.ID].unique().tolist()

    def extracted_data_count(self,df_extracted):
        return df_extracted[column_names.VALUE].count()

    def all_ids(self):
        return get_all_hadm_ids(self.conn)


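# Usage sketch (illustrative; file names are hypothetical and data_dict is the
# project's data dictionary object used throughout this module):
#
#     >>> etl = MimicETLManager('mimic.h5','config/mimic_item_map.csv',data_dict)
#     >>> df_raw = etl.extract('heart_rate')             # long-format rows from postgres
#     >>> df_wide = etl.transform(df_raw,'heart_rate')   # unstacked, component-labelled
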
def extract_component(mimic_conn,component,item_map,hadm_ids=ALL):
    itemids = items_for_components(item_map,[component])
    if len(itemids) == 0: return None
    #Get item defs and filter to what we want
    df_item_defs = item_defs(mimic_conn)
    df_item_defs = df_item_defs[df_item_defs.itemid.isin(itemids)]
    df_item_defs = df_item_defs[~(df_item_defs.linksto == '')]
    grouped = df_item_defs.groupby('linksto')

    df_list = []
    df_columns = column_map()

    too_many_ids = not (hadm_ids == ALL) and len(hadm_ids) > 2000

    for table,group in grouped:
        itemids = group.itemid.astype(int).tolist()
        logger.log('Extracting {} items from {}'.format(len(itemids),table))
        is_iemv = table == 'inputevents_mv'
        df_col = df_columns.columns.tolist() + (['statusdescription'] if is_iemv else [])
        for ix,column_set in df_columns.loc[[table]].iterrows():
            psql_col = column_set.tolist() + (['statusdescription'] if is_iemv else [])
            query = 'SELECT {} FROM mimiciii.{} WHERE itemid = ANY (ARRAY{})'.format(','.join(psql_col),table,itemids)
            if not (hadm_ids == ALL) and not too_many_ids:
                query += ' AND hadm_id = ANY (ARRAY{})'.format(hadm_ids)
            df = pd.read_sql_query(query,mimic_conn)
            df.columns = df_col
            if too_many_ids:
                df = df[df[column_names.ID].isin(hadm_ids)]
            if is_iemv:
                df = df.loc[df['statusdescription'].astype(str) != 'Rewritten']
                df.drop('statusdescription', axis=1,inplace=True)
            df_list.append(df)

    logger.log('Combine DF')
    df_all = pd.concat(df_list)

    return df_all

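# Illustrative query shape (itemids and hadm_ids made up): for the chartevents row of
# column_map() the loop above issues roughly
#
#     SELECT hadm_id,charttime,value,valueuom,itemid FROM mimiciii.chartevents
#         WHERE itemid = ANY (ARRAY[220045, 211]) AND hadm_id = ANY (ARRAY[100001, 100003])
#
# with the hadm_id clause omitted when hadm_ids is ALL or when there are too many ids,
# in which case the rows are filtered in pandas instead.
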
def get_context_data(hadm_ids=ALL,mimic_conn=None):
    if mimic_conn is None:
        mimic_conn = connect()
    #get HADM info (includes patient demographics)
    df_hadm = hadm_data(mimic_conn,hadm_ids)

    #get icu data
    df_icu = icu_data(mimic_conn,hadm_ids)

    #merge into single dataframe
    df_hadm_info = df_hadm.merge(df_icu,on=HADM_ID,how='left')

    df_hadm_info.rename(columns={HADM_ID : column_names.ID},inplace=True)

    return df_hadm_info


def hadm_data(mimic_conn,hadm_ids):
    """
    expects a TUPLE of hadm_ids
    """

    """
    @@@@@@@@@@@@
    1. Get all demographic data from the ADMISSIONS table = df_hadm
       https://mimic.physionet.org/mimictables/admissions/

       SELECT subject_id, hadm_id, admittime, dischtime, language, religion,
           marital_status, ethnicity, diagnosis, admission_location
       FROM admissions
       WHERE hadm_id IN hadm_ids
    @@@@@@@@@@@@
    """
    table = 'mimiciii.admissions'
    hadm_where_case = None if hadm_ids == ALL else 'hadm_id IN {}'.format(tuple(hadm_ids))
    col_psql = ['subject_id', HADM_ID, 'admittime', 'dischtime', 'language',
                        'religion','marital_status', 'ethnicity', 'diagnosis','admission_location']
    col_df = ['pt_id',HADM_ID,START_DT,END_DT,'lang',
                        'religion','marital_status','ethnicity','dx_info','admission_location']
    df_hadm = context_extraction_helper(mimic_conn,table,col_psql,col_df,hadm_where_case)

    """
    @@@@@@@@@@@@
    2. Get all demographic data from PATIENTS table = df_pt
       https://mimic.physionet.org/mimictables/patients/

       SELECT gender, dob, dod
       FROM patients
       WHERE subject_id IN pt_ids
    @@@@@@@@@@@@
    """

    table = 'mimiciii.patients'
    pt_ids = df_hadm['pt_id'].unique().tolist()
    pt_where_case = None if hadm_ids == ALL else 'subject_id IN {}'.format(tuple(pt_ids))
    col_psql = ['subject_id','gender','dob','dod']
    col_df = ['pt_id','gender','dob','dod']
    df_pt = context_extraction_helper(mimic_conn,table,col_psql,col_df,pt_where_case)

    """
    @@@@@@@@@@@@
    3. Get all ICD codes data from DIAGNOSES_ICD table = df_icd
       https://mimic.physionet.org/mimictables/diagnoses_icd/

       SELECT subject_id, hadm_id, seq_num, icd9_code
       FROM diagnoses_icd
       WHERE hadm_id IN hadm_ids
    @@@@@@@@@@@@
    """
    table = 'mimiciii.diagnoses_icd'
    col_psql = ['subject_id',HADM_ID,'seq_num','icd9_code']
    col_df = ['pt_id',HADM_ID,'icd_rank','icd_code']
    df_icd = context_extraction_helper(mimic_conn,table,col_psql,col_df,hadm_where_case)

    """
    @@@@@@@@@@@@
    4. Make df_icd into single rows for each admission, where one
     column is an ordered list of ICD codes for that admission
    @@@@@@@@@@@@
    """
    df_icd = df_icd.sort_values('icd_rank').groupby(HADM_ID).apply(lambda grp: grp['icd_code'].tolist())
    df_icd.name = 'icd_codes'
    df_icd = df_icd.reset_index()

    """
    @@@@@@@@@@@@
    Merging
    5. Merge df_pt and df_hadm on subject_id = demographics_df
    6. Merge demographics_df with df_icd on hadm_id = df_hadm_info
    @@@@@@@@@@@@
    """
    df_demographics = df_hadm.merge(df_pt,on='pt_id',how='left')
    df_hadm_info = df_demographics.merge(df_icd,on=HADM_ID,how='left')

    """
    @@@@@@@@@@@@
    Cleaning
    7. Remove all NA hadm_ids
    8. Add age column
    9. cast hadm_id to int
    @@@@@@@@@@@@
    """
    df_hadm_info = df_hadm_info.dropna(subset=[HADM_ID])
    df_hadm_info['age'] = df_hadm_info[START_DT]-df_hadm_info['dob']
    df_hadm_info[HADM_ID] = df_hadm_info[HADM_ID].astype(int)

    return df_hadm_info

def icu_data(mimic_conn,hadm_ids):

    table = 'mimiciii.icustays'
    col_psql = [HADM_ID,'icustay_id','dbsource','first_careunit','last_careunit','intime','outtime','los']
    col_df = [HADM_ID,'icustay_id','dbsource','first_icu','last_icu','intime','outtime','los']
    hadm_where_case = None if hadm_ids == ALL else 'hadm_id IN {}'.format(tuple(hadm_ids))
    df_icu = context_extraction_helper(mimic_conn,table,col_psql,col_df,hadm_where_case)

    """
    Cleaning
    - drop ICUSTAYS without hadm_id
    """
    df_icu = df_icu.dropna(subset=[HADM_ID])

    return df_icu

def context_extraction_helper(mimic_conn,table,col_psql,col_df,where_case=None):
    query = utils.simple_sql_query(table,col_psql,where_case)
    df = pd.read_sql_query(query,mimic_conn)
    rename_dict = dict(zip(col_psql,col_df))
    df.rename(index=str,columns=rename_dict,inplace=True)
    return df


def column_map():
    """
    Create column mapping
    """
    #definitions

    std_columns = [column_names.ID,column_names.DATETIME,column_names.VALUE,column_names.UNITS,'itemid']
    psql_col = ['hadm_id','charttime','value','valueuom','itemid']

    col_series = pd.Series(
        psql_col,
        index=std_columns
        )

    map_list = []
    col_series.name = 'chartevents'
    map_list.append(col_series)

    col_series = col_series.copy()
    col_series.name = 'labevents'
    map_list.append(col_series)

    col_series = col_series.copy()
    col_series.name = 'procedureevents_mv'
    map_list.append(col_series)

    col_series = col_series.copy()
    col_series.name = 'datetimeevents'
    map_list.append(col_series)

    col_series = col_series.copy()
    col_series.name = 'outputevents'
    map_list.append(col_series)

    psql_col = ['hadm_id','starttime','rate','rateuom','itemid']
    col_series = pd.Series(
        psql_col,
        index=std_columns
        )
    col_series.name = 'inputevents_mv'
    map_list.append(col_series)

    psql_col = ['hadm_id','endtime','amount','amountuom','itemid']
    col_series = pd.Series(
        psql_col,
        index=std_columns
        )
    col_series.name = 'inputevents_mv'
    map_list.append(col_series)

    psql_col = ['hadm_id','charttime','rate','rateuom','itemid']
    col_series = pd.Series(
        psql_col,
        index=std_columns
        )
    col_series.name = 'inputevents_cv'
    map_list.append(col_series)

    psql_col = ['hadm_id','charttime','amount','amountuom','itemid']
    col_series = pd.Series(
        psql_col,
        index=std_columns
        )
    col_series.name = 'inputevents_cv'
    map_list.append(col_series)

    return pd.DataFrame(map_list)

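# Shape note (illustrative): column_map() returns a DataFrame indexed by MIMIC table
# name (with repeated rows for the rate/amount variants of inputevents_*), whose
# columns are the standard names and whose values are the matching psql columns,
# e.g. the 'chartevents' row maps column_names.DATETIME -> 'charttime' and
# column_names.VALUE -> 'value'; extract_component() iterates these rows per table.
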
def item_defs(mimic_conn):

    df_items = pd.read_sql_query('SELECT * FROM mimiciii.d_items',mimic_conn)
    df_labitems = pd.read_sql_query('SELECT * FROM mimiciii.d_labitems',mimic_conn)
    df_labitems['linksto'] = 'labevents'
    df_all_items = pd.concat([df_labitems,df_items])
    return df_all_items

def items_for_components(item_map,components=ALL):
    if not (components == ALL):
        item_map = item_map[item_map.component.isin(components)]
    items = item_map.itemid.unique().astype(int).tolist()
    return items

def get_all_hadm_ids(conn=None):
    if conn is None:
        conn = connect()
    all_ids = pd.read_sql_query('SELECT hadm_id from mimiciii.admissions',conn)['hadm_id']
    all_ids = all_ids[~pd.isnull(all_ids)]
    return all_ids.astype(int).sort_values().tolist()

def sample_hadm_ids(n,seed):
    all_ids = get_all_hadm_ids()
    random.seed(seed)
    sampled_ids = random.sample(all_ids,n)
    return sampled_ids

def connect(psql_username='postgres',psql_pass='123'):
    return utils.psql_connect(psql_username,psql_pass,'mimic')


"""
438
TRANSFORM Data extracted from MIMIC-III
439
"""
440
441
class CleanUnits(BaseEstimator,TransformerMixin):

    def __init__(self,component,data_dict):
        self.data_dict = data_dict
        self.component = component

    def fit(self, x, y=None):
        return self

    def transform(self, df):
        logger.log('Clean UOM',new_level=True)
        df = clean_uom(df,self.component,self.data_dict)
        logger.end_log_level()
        return df

def clean_uom(df,component,data_dict):
    grouped = df.groupby(column_names.UNITS)
    for old_uom,group in grouped:
        new_uom = process_uom(old_uom,component,data_dict)
        df.loc[group.index,column_names.UNITS] = new_uom
        if not (old_uom == new_uom):
            df.loc[group.index,ITEMID] = utils.append_to_description(df.loc[group.index,ITEMID].astype(str),old_uom)
    return df

def process_uom(units,component,data_dict):

    if units in ['BPM','bpm']:
        if component == data_dict.components.HEART_RATE: units = 'beats/min'
        if component == data_dict.components.RESPIRATORY_RATE: units = 'breaths/min'
    for to_replace,replacement in UOM_MAP.iteritems():
        units = re.sub(to_replace, replacement,units,flags=re.IGNORECASE)
    return units

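# Worked examples (illustrative; the component constants come from data_dict, and
# POTASSIUM is only a hypothetical stand-in for a non-rate component):
#
#     >>> process_uom('BPM', data_dict.components.HEART_RATE, data_dict)
#     'beats/min'
#     >>> process_uom('MEQ', data_dict.components.POTASSIUM, data_dict)
#     'mEq'
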
class clean_extract(BaseEstimator,TransformerMixin):

    def fit(self, x, y=None):
        return self

    def transform(self, df):
        """
        FORMAT pre-unstack columns
        """
        df = df.replace(to_replace='', value=pd.np.nan)
        #drop NAN record_id, timestamps, or value
        df.dropna(subset=[column_names.ID,column_names.DATETIME,column_names.VALUE], how='any',inplace=True)

        #ID to integer
        df.loc[:,column_names.ID] = df.loc[:,column_names.ID].astype(int)

        #DATETIME to pd.DATETIME
        df.loc[:,column_names.DATETIME] = pd.to_datetime(df.loc[:,column_names.DATETIME],errors='raise')

        #set UOM to NO_UOM if not declared
        df.loc[:,column_names.UNITS] = df.loc[:,column_names.UNITS].fillna(NO_UNITS)

        df.rename(index=str,columns={ITEMID:column_names.DESCRIPTION},inplace=True)
        index_cols = [
                    column_names.ID,
                    column_names.DATETIME,
                    column_names.DESCRIPTION,
                    column_names.UNITS
                ]
        #Set up our row index
        df.set_index(index_cols,inplace=True)

        return df


class unstacker(transformers.safe_unstacker):

    def __init__(self):
        super(unstacker,self).__init__(column_names.UNITS,column_names.DESCRIPTION)

def transform_pipeline(component,data_dict):
    return Pipeline([
        ('clean_units',CleanUnits(component,data_dict)),
        ('clean_df',clean_extract()),
        ('unstack',unstacker()),
        ('add_level',transformers.add_level(component,'component',axis=1)),
    ])

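# Usage sketch (illustrative): this is the pipeline MimicETLManager.transform builds per
# component; the result is indexed by (id, datetime) with units/description unstacked
# into the columns and a 'component' level added on top by the add_level step.
#
#     >>> pipe = transform_pipeline('heart_rate', data_dict)
#     >>> df_wide = pipe.fit_transform(df_extracted)
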
def standard_cleaners(data_dict):
    category_map = mimic_category_map(data_dict)
    ureg = units.MedicalUreg()
    return Pipeline([
        ('aggregate_same_datetime',transformers.same_index_aggregator(lambda grp:grp.iloc[0])),
        ('split_dtype',transformers.split_dtype()),
        ('standardize_columns',transformers.column_standardizer(data_dict,ureg)),
        ('standardize_categories',transformers.standardize_categories(data_dict,category_map)),
        ('split_bad_categories',transformers.split_bad_categories(data_dict)),
        ('one_hotter',transformers.nominal_to_onehot()),
        ('drop_oob_values',transformers.oob_value_remover(data_dict))
    ])


def mimic_category_map(data_dict):
    return {
        data_dict.components.GLASGOW_COMA_SCALE_EYE_OPENING: {
            '1 No Response': 6,
            '2 To pain': 7,
            '3 To speech': 8,
            '4 Spontaneously': 9
        },
        data_dict.components.GLASGOW_COMA_SCALE_MOTOR: {
            '1 No Response': 0,
            '2 Abnorm extensn': 1,
            '3 Abnorm flexion': 2,
            '4 Flex-withdraws': 3,
            '5 Localizes Pain': 4,
            '6 Obeys Commands': 5
        },
        data_dict.components.GLASGOW_COMA_SCALE_VERBAL: {
            '1 No Response': 10,
            '1.0 ET/Trach': 10,
            '2 Incomp sounds': 11,
            '3 Inapprop words': 12,
            '4 Confused': 13,
            '5 Oriented': 14
        }
    }


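# Note (illustrative): this map feeds transformers.standardize_categories in the cleaning
# pipelines above, recoding the free-text GCS chart strings (e.g. '4 Spontaneously') into
# the numeric category codes expected by data_dict.
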
def ETL(extractor,
        components,
        data_dict,
        same_dt_aggregator,
        hdf5_fname=None,joined_path=None,
        hadm_ids=ALL,
        use_base_df=True,
        to_pandas=False,
        chunksize=500000):

    logger.log('***ETL***',new_level=True)
    logger.log('SETUP',new_level=True)

    category_map = mimic_category_map(data_dict)
    ureg = units.MedicalUreg()

    #component-specific params are filled in per-iteration via set_params below
    transformer = transform_pipeline(None,data_dict)

    standard_clean_pipeline = Pipeline([
        ('aggregate_same_datetime',same_dt_aggregator),
        ('split_dtype',transformers.split_dtype()),
        ('standardize_columns',transformers.column_standardizer(data_dict,ureg)),
        ('standardize_categories',transformers.standardize_categories(data_dict,category_map)),
        ('split_bad_categories',transformers.split_bad_categories(data_dict)),
        # ('one_hotter',transformers.nominal_to_onehot()),
        ('drop_oob_values',transformers.oob_value_remover(data_dict))
    ])

    should_save = (hdf5_fname is not None)

    df_base = None

    if should_save and use_base_df:
        try:
            df_base = utils.open_df(hdf5_fname,joined_path)
        except Exception:
            pass

    if df_base is not None:

        existing_components = df_base.columns.get_level_values(column_names.COMPONENT).unique().tolist()
        existing_ids = set(df_base.index.get_level_values(column_names.ID).tolist())
        requested_ids = hadm_ids if hadm_ids != ALL else get_all_hadm_ids()

        new_ids = [ID for ID in requested_ids if ID not in existing_ids]

        #case 1: new ids in existing columns, don't try to be smart with ALL unless not a lot of IDs
        if len(new_ids) > 0:
            df_addition = ETL(extractor,
                                existing_components,
                                data_dict,
                                same_dt_aggregator,
                                hadm_ids=new_ids,
                                to_pandas=True)
            if df_addition is not None:
                df_base = pd.concat([df_base,df_addition])
            #now we only need to load NEW components
            components = [comp for comp in components if comp not in existing_components]

        logger.log('Base DF to Dask')
        df_base = dd.from_pandas(df_base.reset_index(), chunksize=chunksize)

    df_all = df_base

    logger.log('BEGIN ETL for {} admissions and {} components: {}'.format(hadm_ids if hadm_ids == ALL else len(hadm_ids),
                                                                            len(components),
                                                                            components),new_level=True,end_level=True)
    for component in components:
        logger.log('{}: {}/{}'.format(component.upper(),components.index(component)+1,len(components)),new_level=True)

        """
        @@@@@@@@@@@@@@@
        ----EXTRACT----
        @@@@@@@@@@@@@@@
        """

        logger.log("Extracting...",new_level=True)
        df_extracted = extractor.extract_component(component,hadm_ids)

        if df_extracted.empty:
            print 'EMPTY Dataframe EXTRACTED for {}, n={} ids'.format(component,len(hadm_ids))
            logger.end_log_level()
            continue

        if should_save:
            logger.log('Save EXTRACTED DF = {}'.format(df_extracted.shape))
            utils.save_df(df_extracted,hdf5_fname,'extracted/{}'.format(component))
        logger.end_log_level()

        """
        @@@@@@@@@@@@@@@@@
        ----TRANSFORM----
        @@@@@@@@@@@@@@@@@
        """

        logger.log("Transforming... {}".format(df_extracted.shape),new_level=True)
        transformer.set_params(clean_units__component=component,add_level__level_val=component)
        df_transformed = transformer.transform(df_extracted)

        print 'Data Loss (Extract > Transformed):',utils.data_loss(df_extracted.set_index(column_names.ID).value.to_frame(),df_transformed)

        if df_transformed.empty:
            print 'EMPTY Dataframe TRANSFORMED for {}, n={} ids'.format(component,len(hadm_ids))
            logger.end_log_level()
            continue

        if should_save:
            logger.log('Save TRANSFORMED DF = {}'.format(df_transformed.shape))
            utils.save_df(df_transformed,hdf5_fname,'transformed/{}'.format(component))
        logger.end_log_level()

        """
        @@@@@@@@@@@@@@@
        -----CLEAN-----
        @@@@@@@@@@@@@@@
        """

        logger.log("Cleaning... {}".format(df_transformed.shape),new_level=True)
        df = standard_clean_pipeline.transform(df_transformed)

        print 'Data Loss (Extract > Cleaned):', utils.data_loss(df_extracted.set_index(column_names.ID).value.to_frame(),df)

        if df.empty:
            print 'EMPTY Dataframe CLEANED for {}, n={} ids'.format(component,len(hadm_ids))
            logger.end_log_level()
            continue

        if should_save:
            logger.log('Save CLEANED DF = {}'.format(df.shape))
            utils.save_df(df,hdf5_fname,'cleaned/{}'.format(component))
        logger.end_log_level()

        del df_extracted,df_transformed

        logger.log('Filter & sort - {}'.format(df.shape))

        df.sort_index(inplace=True)
        df.sort_index(inplace=True, axis=1)

        logger.log('Convert to dask - {}'.format(df.shape))
        df_dask = dd.from_pandas(df.reset_index(), chunksize=chunksize)
        del df

        logger.log('Join to big DF')

        if df_all is None: df_all = df_dask
        else:
            df_all = df_all.merge(df_dask,how='outer', on=[column_names.ID,column_names.DATETIME])
            del df_dask

        logger.end_log_level()
    logger.end_log_level()

    if df_all is None or not to_pandas:
        logger.end_log_level()
        return df_all

    logger.log('Dask DF back to pandas')
    df_pd = df_all.compute()
    del df_all
    df_pd.set_index([column_names.ID,column_names.DATETIME], inplace=True)

    logger.log('SORT Joined DF')
    df_pd.sort_index(inplace=True)
    df_pd.sort_index(inplace=True, axis=1)

    if should_save:
        logger.log('SAVE Big DF')
        utils.save_df(df_pd,hdf5_fname,joined_path)
    logger.end_log_level()

    return df_pd
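

# End-to-end usage sketch (illustrative): `extractor` is assumed to be any object
# exposing extract_component(component, hadm_ids); file names and component names
# below are hypothetical, and a local MIMIC-III postgres instance is assumed.
#
#     >>> same_dt = transformers.same_index_aggregator(lambda grp: grp.iloc[0])
#     >>> df_joined = ETL(extractor, ['heart_rate','respiratory_rate'], data_dict, same_dt,
#     ...                 hdf5_fname='mimic.h5', joined_path='joined/all',
#     ...                 hadm_ids=sample_hadm_ids(100, seed=1), to_pandas=True)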