Diff of /utils.py [000000] .. [418e14]

import sqlalchemy
import pandas as pd
from constants import variable_type, column_names, ALL
import logger
import numpy as np
import dask.dataframe as dd

def psql_connect(user, password, database='postgres', host='localhost', port=5432):
    '''Return a SQLAlchemy engine connected to the given PostgreSQL database.'''
    # We connect with the help of the PostgreSQL URL, e.g.
    # postgresql://federer:grandestslam@localhost:5432/tennis
    url = 'postgresql://{}:{}@{}:{}/{}'
    url = url.format(user, password, host, port, database)

    # The return value of create_engine() is our connection object
    connection = sqlalchemy.create_engine(url, client_encoding='utf8')

    return connection

def simple_sql_query(table, columns=['*'], where_condition=None):
    where = ''
    if where_condition is not None:
        where = ' WHERE {}'.format(where_condition)
    return 'SELECT {} FROM {}{}'.format(','.join(columns), table, where)

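# A minimal usage sketch (hypothetical credentials, database, and table; the
# helper is defined but never called on import). It composes psql_connect()
# and simple_sql_query() with pandas:
def _example_psql_query():
    connection = psql_connect('federer', 'grandestslam', database='tennis')
    # -> 'SELECT id,winner FROM matches WHERE year = 2016'
    query = simple_sql_query('matches', columns=['id', 'winner'],
                             where_condition='year = 2016')
    return pd.read_sql(query, connection)
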
def save_df(df, hdf5_fname, path):
    store = pd.HDFStore(hdf5_fname)
    store[path] = df
    store.close()
    return df

def open_df(hdf5_fname, path):
    store = pd.HDFStore(hdf5_fname)
    df = store[path]
    store.close()
    return df

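# Round-trip sketch ('example_store.h5' is a hypothetical file name):
def _example_hdf_roundtrip():
    df = pd.DataFrame({'a': [1, 2, 3]})
    save_df(df, 'example_store.h5', 'raw/demo')
    return open_df('example_store.h5', 'raw/demo')
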
def is_categorical(var_type):
    return var_type in [variable_type.ORDINAL, variable_type.NOMINAL]

class Bunch(object):
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

def add_subindex(df, subindex_name):
    df = df.sort_index()
    index = df.index
    df[subindex_name] = range(0, len(index))
    # Rows sharing an index key get a counter that restarts at 0 within each
    # group; rows with a unique key just get 0
    duplicated = index.duplicated(keep=False)
    df.loc[duplicated, subindex_name] = df.loc[duplicated].groupby(level=index.names)[subindex_name].apply(lambda x: x - x.min())
    df.loc[~duplicated, subindex_name] = 0
    df.loc[:, subindex_name] = df.loc[:, subindex_name].astype(int)
    df.set_index(subindex_name, append=True, inplace=True)
    return df

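# Sketch: the duplicated key 'a' is enumerated 0, 1 while the unique key 'b'
# gets 0, turning the index into a MultiIndex of (id, seq):
def _example_add_subindex():
    df = pd.DataFrame({'val': [10, 20, 30]},
                      index=pd.Index(['a', 'a', 'b'], name='id'))
    return add_subindex(df, 'seq')  # index: ('a', 0), ('a', 1), ('b', 0)
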
def add_same_val_index_level(df, level_val, level_name, axis=0):
    return pd.concat([df], keys=[level_val], names=[level_name], axis=axis)

def set_level_to_same_val(index, level, value):
    # Note: MultiIndex.set_labels was renamed to set_codes in pandas 0.24
    return index.set_labels([0] * index.size, level=level).set_levels([value], level=level)

def data_loss(df_start, df_end):
    # Number of non-null values lost between the two frames
    value_loss = df_start.count().sum() - df_end.count().sum()
    admissions_start = len(df_start.index.get_level_values(column_names.ID).unique())
    admissions_end = len(df_end.index.get_level_values(column_names.ID).unique())

    admission_loss = admissions_start - admissions_end
    percent_loss = str(round(float(admission_loss) / admissions_start * 100, 4)) + '% records'

    return df_start.shape, df_end.shape, value_loss, admission_loss, percent_loss

def get_components(df):
    return df.columns.get_level_values('component').unique().tolist()

def filter_columns(df, level, value):
    return df.loc[:, df.columns.get_level_values(level) == value]

def append_to_description(old_desc, addition):
    return old_desc + '(' + addition + ')'

def get_first_dt(df_ts, df_context):
    ids = df_ts.index.get_level_values(column_names.ID).unique()
    admit_dt = get_admit_dt(df_context, ids)

    # First timestamp per admission: the earlier of admit time and first observation
    first_obs_dt = get_first_obs_dt(df_ts)
    first_dt = first_obs_dt.to_frame().join(admit_dt, how='left').apply(np.min, axis=1)
    first_dt.name = column_names.DATETIME
    return first_dt

def get_admit_dt(df_context, ids=ALL):
    # Identity check against the ALL sentinel; '==' would compare elementwise
    # when ids is an Index
    if ids is not ALL:
        df_context_filtered = df_context[df_context[column_names.ID].isin(ids)]
    else:
        df_context_filtered = df_context
    admit_dt = df_context_filtered.loc[:, ['id', 'start_dt']].drop_duplicates(subset=['id']).set_index('id')
    return admit_dt

def get_first_obs_dt(df_ts):
    # The observation datetime is the last element of each row's index tuple
    first_obs_dt = df_ts.groupby(level=column_names.ID).apply(lambda x: x.iloc[0].name[-1])
    first_obs_dt.name = 'start_dt_obs'
    return first_obs_dt

def flatten_index(df, join_char='_', suffix=None, axis=0):
    def flatten_name(name):
        flat = join_char.join(map(str, name)) if isinstance(name, tuple) else str(name)
        if suffix is not None:
            flat = flat + join_char + suffix
        return flat

    if axis == 0:
        df.index = pd.Index([flatten_name(row_name) for row_name in df.index])
    elif axis == 1:
        df.columns = pd.Index([flatten_name(col_name) for col_name in df.columns])
    return df

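# Sketch: two-level column labels collapse to single joined strings.
def _example_flatten_index():
    df = pd.DataFrame([[1, 2]],
                      columns=pd.MultiIndex.from_tuples([('hr', 'mean'), ('hr', 'max')]))
    return flatten_index(df, axis=1).columns.tolist()  # ['hr_mean', 'hr_max']
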
def smart_count(col):
    # The column label is a tuple; its third level carries the variable type
    var_type = col.name[2]
    # One-hot nominal columns are uint8 flags, so counting observations means
    # summing the ones rather than counting non-null cells
    if (var_type == variable_type.NOMINAL) and (col.dtype == np.uint8):
        return col.sum()

    return col.count()

"""
143
Pytables/HDF5 I/O with axis deconstruction
144
"""
145
146
def read_and_reconstruct(hdf5_fname,path,where=None):
147
    # Get all paths for dataframes in store
148
    data_path,col_path = deconstucted_paths(path)
149
150
    data = pd.read_hdf(hdf5_fname,data_path,where=where)
151
    columns = pd.read_hdf(hdf5_fname,col_path)
152
    return reconstruct_df(data,columns)
153
154
155
def reconstruct_df(data,column_df):
156
    #reconstruct the columns from dataframe
157
    column_index = reconstruct_columns(column_df,data.columns)
158
    df = data.copy()
159
    df.columns = column_index
160
    return df
161
162
def reconstruct_columns(column_df,col_ix=None):
163
    column_df = column_df.drop('dtype',axis=1)
164
    if col_ix is None: col_ix = column_df.index
165
    col_arys = column_df.loc[col_ix].T.values
166
    levels = column_df.columns.tolist()
167
168
    return pd.MultiIndex.from_arrays(col_arys,names=levels)
169
170
def deconstruct_and_write(df,hdf5_fname,path,append=False):
171
172
    # Deconstruct the dataframe
173
    data,columns = deconstruct_df(df)
174
175
    # Get all paths for dataframes in store
176
    data_path,col_path = deconstucted_paths(path)
177
178
    #Open store and save df
179
    store = pd.HDFStore(hdf5_fname)
180
    store.put(data_path,data,append=append,format='t')
181
    if (not append) or col_path not in store:
182
        columns.to_hdf(hdf5_fname,col_path,format='t')
183
    store.close()
184
    return
185
186
def deconstruct_df(df):
187
    columns = pd.DataFrame(map(list,df.columns.tolist()),columns=df.columns.names)
188
    columns['dtype'] = df.dtypes.values.astype(str)
189
    data = df.copy()
190
    data.columns = [i for i in range(df.shape[1])]
191
    return data,columns
192
193
def deconstucted_paths(path):
194
    data_path = '{}/{}'.format(path,'data')
195
    col_path = '{}/{}'.format(path,'columns')
196
    return data_path,col_path
197
198
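# Round-trip sketch (hypothetical file name): table-format HDF5 cannot store
# MultiIndex columns directly, so the frame is split into a data table and a
# column table and reassembled on read.
def _example_deconstructed_roundtrip():
    cols = pd.MultiIndex.from_tuples([('hr', 'mean'), ('hr', 'max')],
                                     names=['component', 'stat'])
    df = pd.DataFrame([[60.0, 90.0]], columns=cols)
    deconstruct_and_write(df, 'example_store.h5', 'ts/demo')
    return read_and_reconstruct('example_store.h5', 'ts/demo')
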
def complex_row_mask(df, specs, operator='or'):
    # Init our mask dataframe with the row index of the passed-in dataframe
    df_mask = pd.DataFrame(index=df.index)

    # Make sure our specs are a list; check for None before wrapping so a bare
    # spec dict and no spec at all are both handled
    if specs is None: specs = []
    if not isinstance(specs, list): specs = [specs]

    # If we have no specs, then we are not going to mask anything
    if len(specs) == 0:
        df_mask.loc[:, 0] = True

    # Apply specs to create the mask: each spec maps column name -> allowed
    # value(s) or a callable predicate, AND-ed across the spec's columns
    for idx, spec in enumerate(specs):
        df_spec_mask = pd.DataFrame(index=df.index)
        for col_name, spec_info in spec.items():
            if callable(spec_info):
                df_spec_mask.loc[:, col_name] = df.loc[:, col_name].apply(spec_info)
            else:
                if not isinstance(spec_info, list): spec_info = [spec_info]
                df_spec_mask.loc[:, col_name] = df.loc[:, col_name].isin(spec_info)
        df_mask.loc[:, idx] = df_spec_mask.all(axis=1)

    # If 'or', we include rows matched by any spec;
    #   if 'and', only rows that meet the criteria of EVERY spec are included
    if operator == 'or': mask = df_mask.any(axis=1)
    else: mask = df_mask.all(axis=1)
    return mask

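# Sketch: keep rows whose component is 'hr' OR whose value exceeds 100.
def _example_complex_row_mask():
    df = pd.DataFrame({'component': ['hr', 'bp', 'bp'], 'value': [60, 120, 80]})
    specs = [{'component': 'hr'}, {'value': lambda v: v > 100}]
    return df[complex_row_mask(df, specs, operator='or')]  # rows 0 and 1
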
def smart_join(hdf5_fname, paths, joined_path, ids,
               chunksize=5000,
               need_deconstruct=True,
               hdf5_fname_for_join=None,
               overwrite=True):

    logger.log('Smart join: n={}, {}'.format(len(ids), paths), new_level=True)

    if hdf5_fname_for_join is None: hdf5_fname_for_join = hdf5_fname

    store = pd.HDFStore(hdf5_fname_for_join)
    if joined_path in store:
        if overwrite: del store[joined_path]
        else:
            store.close()
            logger.end_log_level()
            return hdf5_fname_for_join
    # Close the handle before the chunked writes below reopen the file
    store.close()

    # Sort ids; should speed up where clauses and selects
    ids = sorted(ids)

    # Do the chunked join
    logger.log('JOINING dataframes', new_level=True)
    for ix_start in range(0, len(ids), chunksize):
        ix_end = min(ix_start + chunksize, len(ids))
        id_slice = ids[ix_start:ix_end]

        where = '{id_col} in {id_list}'.format(id_col=column_names.ID, id_list=id_slice)

        logger.log('Slice & Join: {} --> {}, n={}'.format(id_slice[0], id_slice[-1], len(id_slice)), new_level=True)
        df_slice = None
        for path in paths:
            try:
                logger.log(path)
                if need_deconstruct: slice_to_add = read_and_reconstruct(hdf5_fname, path, where=where)
                else: slice_to_add = pd.read_hdf(hdf5_fname, path, where=where)
            except KeyError as err:
                logger.log(end_prev=True, start=False)
                print(err)
                continue

            if df_slice is None: df_slice = slice_to_add
            else:
                df_slice = df_slice.join(slice_to_add, how='outer')
                del slice_to_add

        logger.end_log_level()
        logger.log('Append slice')

        if need_deconstruct: deconstruct_and_write(df_slice, hdf5_fname_for_join, joined_path, append=True)
        else: df_slice.to_hdf(hdf5_fname_for_join, joined_path, append=True, format='t')

        del df_slice

    logger.end_log_level()
    logger.end_log_level()

    return hdf5_fname_for_join

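# Invocation sketch (hypothetical store layout): chunk the ids, pull each
# listed path with an 'id in [...]' where clause, outer-join the slices, and
# append the result under 'ts/joined':
# smart_join('example_store.h5', ['ts/hr', 'ts/bp'], 'ts/joined', ids=range(10000))
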
def make_list_hash(l):
    # Sort and de-duplicate so that equal sets of ids produce the same key
    l = sorted(set(l))

    # Use a hash to store this set of ids uniquely
    key = hash(''.join(map(str, l)))

    return key

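# Sketch: order and duplicates do not change the key. Note that Python 3
# randomizes str hashes per process unless PYTHONHASHSEED is set, so keys are
# only stable within a single run:
# make_list_hash([3, 1, 2, 3]) == make_list_hash([1, 2, 3])
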
"""
295
Dask intelligent join
296
"""
297
298
def dask_open_and_join(hdf5_fname,path,components,ids=ALL,chunksize=500000):
299
300
    df_all=None
301
    logger.log('DASK OPEN & JOIN n={} components: {}'.format(len(components),components),new_level=True)
302
    for component in components:
303
        logger.log('{}: {}/{}'.format(component.upper(),components.index(component)+1,len(components)),new_level=True)
304
305
        df_comp = open_df(hdf5_fname,'{}/{}'.format(path,component))
306
        df_comp.sort_index(inplace=True)
307
        df_comp.sort_index(inplace=True, axis=1)
308
309
        if not ids == ALL:
310
            df_comp = df_comp[df_comp.index.get_level_values(column_names.ID).isin(ids)]
311
312
        logger.log('Convert to dask - {}'.format(df_comp.shape))
313
        df_dask = dd.from_pandas(df_comp.reset_index(), chunksize=chunksize)
314
        del df_comp
315
316
        logger.log('Join to big DF')
317
318
        if df_all is None: df_all = df_dask
319
        else :
320
            df_all = df_all.merge(df_dask,how='outer', on=['id','datetime'])
321
            del df_dask
322
        logger.end_log_level()
323
324
    logger.log('Dask DF back to pandas')
325
    df_pd = df_all.compute()
326
    del df_all
327
    df_pd.set_index(['id','datetime'], inplace=True)
328
329
    logger.log('SORT Joined DF')
330
    df_pd.sort_index(inplace=True)
331
    df_pd.sort_index(inplace=True, axis=1)
332
    logger.end_log_level()
333
    return df_pd
334
335
336
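# Minimal sketch of the dask pattern used above, on toy frames: convert to
# dask, outer-merge on the shared keys, then compute() back to pandas.
def _example_dask_outer_merge():
    left = dd.from_pandas(pd.DataFrame({'id': [1, 2], 'datetime': [0, 1], 'hr': [60, 70]}), chunksize=2)
    right = dd.from_pandas(pd.DataFrame({'id': [1, 3], 'datetime': [0, 2], 'bp': [120, 110]}), chunksize=2)
    return left.merge(right, how='outer', on=['id', 'datetime']).compute()
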
"""
337
Visualization
338
"""
339
import seaborn as sns
340
import matplotlib.pyplot as plt
341
342
def heatmap(df_ts):
343
    sns.set(context="paper", font="monospace")
344
    corrmat = df_ts.corr()
345
    # Set up the matplotlib figure
346
    f, ax = plt.subplots(figsize=(50, 50))
347
    # Draw the heatmap using seaborn
348
    sns.heatmap(corrmat, vmax=1, square=True)
349
    return