# b/utils.py
import sqlalchemy
import pandas as pd
import numpy as np
import dask.dataframe as dd

# Project-local modules
from constants import variable_type, column_names, ALL
import logger

def psql_connect(user, password, database='postgres', host='localhost', port=5432):
    '''Return a SQLAlchemy engine connected to the given PostgreSQL database.'''
    # We connect with the help of a PostgreSQL URL, e.g.
    # postgresql://federer:grandestslam@localhost:5432/tennis
    url = 'postgresql://{}:{}@{}:{}/{}'
    url = url.format(user, password, host, port, database)

    # The return value of create_engine() is our connection (engine) object
    connection = sqlalchemy.create_engine(url, client_encoding='utf8')

    return connection

def simple_sql_query(table, columns=['*'], where_condition=None):
    where = ''
    if where_condition is not None:
        where = ' WHERE {}'.format(where_condition)
    return 'SELECT {} FROM {}{}'.format(','.join(columns), table, where)

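# Example usage (a sketch; the credentials, table, and column names below are
# hypothetical):
#
#   engine = psql_connect('federer', 'grandestslam', database='tennis')
#   query = simple_sql_query('matches', columns=['id', 'start_dt'], where_condition="year = 2016")
#   df = pd.read_sql(query, engine)
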
def save_df(df, hdf5_fname, path):
    store = pd.HDFStore(hdf5_fname)
    store[path] = df
    store.close()
    return df


def open_df(hdf5_fname, path):
    store = pd.HDFStore(hdf5_fname)
    df = store[path]
    store.close()
    return df

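# Example (hypothetical file name and store key):
#
#   save_df(df, 'cache.h5', 'raw/labs')
#   df = open_df('cache.h5', 'raw/labs')
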
def is_categorical(var_type):
    return var_type in [variable_type.ORDINAL, variable_type.NOMINAL]


class Bunch(object):
    '''Simple attribute container: Bunch(a=1).a == 1'''
    def __init__(self, **kwds):
        self.__dict__.update(kwds)

def add_subindex(df, subindex_name):
    '''Append an integer index level that enumerates rows sharing the same index value.'''
    df = df.sort_index()
    index = df.index
    df[subindex_name] = np.arange(len(index))
    duplicated = index.duplicated(keep=False)
    df.loc[duplicated, subindex_name] = df.loc[duplicated].groupby(level=index.names)[subindex_name].apply(lambda x: x - x.min())
    df.loc[~duplicated, subindex_name] = 0
    df.loc[:, subindex_name] = df.loc[:, subindex_name].astype(int)
    df.set_index(subindex_name, append=True, inplace=True)
    return df

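# Example (illustrative): for a frame indexed by (id, datetime), rows that share the
# same (id, datetime) value receive subindex 0, 1, 2, ... while unique rows get 0:
#
#   df_ts = add_subindex(df_ts, 'seq')
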
def add_same_val_index_level(df, level_val, level_name, axis=0):
    return pd.concat([df], keys=[level_val], names=[level_name], axis=axis)


def set_level_to_same_val(index, level, value):
    # NOTE: MultiIndex.set_labels was renamed to set_codes in newer pandas versions
    return index.set_labels([0] * index.size, level=level).set_levels([value], level=level)

def data_loss(df_start, df_end):
    data_loss = df_start.count().sum() - df_end.count().sum()
    admissions_start = len(df_start.index.get_level_values(column_names.ID).unique().tolist())
    admissions_end = len(df_end.index.get_level_values(column_names.ID).unique().tolist())

    admission_loss = admissions_start - admissions_end
    percent_loss = str(round(float(admission_loss) / admissions_start * 100, 4)) + '% records'

    return df_start.shape, df_end.shape, data_loss, admission_loss, percent_loss

def get_components(df):
    return df.columns.get_level_values('component').unique().tolist()


def filter_columns(df, level, value):
    return df.loc[:, df.columns.get_level_values(level) == value]


def append_to_description(old_desc, addition):
    return old_desc + '(' + addition + ')'

def get_first_dt(df_ts, df_context):
    ids = df_ts.index.get_level_values(column_names.ID).unique()
    admit_dt = get_admit_dt(df_context, ids)

    first_obs_dt = get_first_obs_dt(df_ts)
    first_dt = first_obs_dt.to_frame().join(admit_dt, how='left').apply(np.min, axis=1)
    first_dt.name = column_names.DATETIME
    return first_dt


def get_admit_dt(df_context, ids=ALL):
    # ALL acts as a sentinel for "no filtering", so compare by identity
    if ids is not ALL:
        df_context_filtered = df_context[df_context[column_names.ID].isin(ids)]
    else:
        df_context_filtered = df_context
    admit_dt = df_context_filtered.loc[:, ['id', 'start_dt']].drop_duplicates(subset=['id']).set_index('id')
    return admit_dt


def get_first_obs_dt(df_ts):
    first_obs_dt = df_ts.groupby(level=column_names.ID).apply(lambda x: x.iloc[0].name[-1])
    first_obs_dt.name = 'start_dt_obs'
    return first_obs_dt

def flatten_index(df, join_char='_', suffix=None, axis=0):
    if axis == 0:
        new_vals = []
        for row_name in df.index:
            if type(row_name) is tuple:
                row = join_char.join(map(str, row_name))
            else:
                row = str(row_name)

            if suffix is not None:
                row = row + join_char + suffix

            new_vals.append(row)

        df.index = pd.Index(new_vals)
    elif axis == 1:
        new_vals = []
        for col_name in df.columns:
            if type(col_name) is tuple:
                col = join_char.join(map(str, col_name))
            else:
                col = str(col_name)
            if suffix is not None:
                col = col + join_char + suffix

            new_vals.append(col)

        df.columns = pd.Index(new_vals)
    return df

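# Example (illustrative): with a two-level column index, ('hr', 'mean') becomes
# 'hr_mean', or 'hr_mean_scaled' when suffix='scaled' is given:
#
#   df = flatten_index(df, join_char='_', suffix='scaled', axis=1)
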
def smart_count(col):
    # The variable type is stored in the third level of the column name
    var_type = col.name[2]
    # For uint8 NOMINAL columns (presumably one-hot encoded), sum counts the occurrences
    if (var_type == variable_type.NOMINAL) and (col.dtype == np.uint8):
        return col.sum()

    return col.count()


"""
PyTables/HDF5 I/O with axis deconstruction
"""

def read_and_reconstruct(hdf5_fname, path, where=None):
    # Get the paths of the deconstructed dataframes in the store
    data_path, col_path = deconstructed_paths(path)

    data = pd.read_hdf(hdf5_fname, data_path, where=where)
    columns = pd.read_hdf(hdf5_fname, col_path)
    return reconstruct_df(data, columns)


def reconstruct_df(data, column_df):
    # Reconstruct the column MultiIndex from its dataframe representation
    column_index = reconstruct_columns(column_df, data.columns)
    df = data.copy()
    df.columns = column_index
    return df

def reconstruct_columns(column_df, col_ix=None):
    column_df = column_df.drop('dtype', axis=1)
    if col_ix is None:
        col_ix = column_df.index
    col_arys = column_df.loc[col_ix].T.values
    levels = column_df.columns.tolist()

    return pd.MultiIndex.from_arrays(col_arys, names=levels)

def deconstruct_and_write(df, hdf5_fname, path, append=False):

    # Deconstruct the dataframe into data and column descriptions
    data, columns = deconstruct_df(df)

    # Get the paths of the deconstructed dataframes in the store
    data_path, col_path = deconstructed_paths(path)

    # Open the store and save both pieces
    store = pd.HDFStore(hdf5_fname)
    store.put(data_path, data, append=append, format='t')
    if (not append) or col_path not in store:
        store.put(col_path, columns, format='t')
    store.close()
    return

def deconstruct_df(df):
    # One row per column: the values of each column-index level, plus the dtype
    columns = pd.DataFrame([list(col) for col in df.columns.tolist()], columns=df.columns.names)
    columns['dtype'] = df.dtypes.values.astype(str)
    data = df.copy()
    data.columns = [i for i in range(df.shape[1])]
    return data, columns


def deconstructed_paths(path):
    data_path = '{}/{}'.format(path, 'data')
    col_path = '{}/{}'.format(path, 'columns')
    return data_path, col_path

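# Example round trip (a sketch; file and node names are illustrative). The frame is
# split into a 'data' table and a 'columns' table so its column MultiIndex survives a
# queryable table-format store:
#
#   deconstruct_and_write(df, 'features.h5', 'vitals')
#   df2 = read_and_reconstruct('features.h5', 'vitals', where='id in [101, 102]')
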
def complex_row_mask(df, specs, operator='or'):
    # Initialize our mask dataframe with the row index of the passed-in dataframe
    df_mask = pd.DataFrame(index=df.index)

    # Normalize specs: treat None as "no specs" and wrap a single spec in a list
    if specs is None:
        specs = []
    if not isinstance(specs, list):
        specs = [specs]

    # If we have no specs, then we are not going to mask anything
    if len(specs) == 0:
        df_mask.loc[:, 0] = True

    # Apply the specs to create the mask
    for idx, spec in enumerate(specs):
        df_spec_mask = pd.DataFrame(index=df.index)
        for col_name, spec_info in spec.items():
            if callable(spec_info):
                df_spec_mask.loc[:, col_name] = df.loc[:, col_name].apply(spec_info)
            else:
                if not isinstance(spec_info, list):
                    spec_info = [spec_info]
                df_spec_mask.loc[:, col_name] = df.loc[:, col_name].isin(spec_info)
        df_mask.loc[:, idx] = df_spec_mask.all(axis=1)

    # With 'or', rows matched by any spec are included;
    # with 'and', only rows that meet the criteria of EVERY spec are included
    if operator == 'or':
        mask = df_mask.any(axis=1)
    else:
        mask = df_mask.all(axis=1)
    return mask

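# Example (illustrative column names and values): keep rows whose 'component' is one of
# two codes, OR whose 'value' passes a callable test:
#
#   specs = [{'component': ['hr', 'sbp']}, {'value': lambda v: v > 100}]
#   df_kept = df[complex_row_mask(df, specs, operator='or')]
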
def smart_join(hdf5_fname, paths, joined_path, ids,
               chunksize=5000,
               need_deconstruct=True,
               hdf5_fname_for_join=None,
               overwrite=True):

    logger.log('Smart join: n={}, {}'.format(len(ids), paths), new_level=True)

    if hdf5_fname_for_join is None:
        hdf5_fname_for_join = hdf5_fname

    store = pd.HDFStore(hdf5_fname_for_join)
    if joined_path in store:
        if overwrite:
            del store[joined_path]
        else:
            store.close()
            logger.end_log_level()
            return hdf5_fname_for_join
    store.close()

    # Sort ids; this should speed up where clauses and selects
    ids = sorted(ids)

    # Do the chunked join
    logger.log('JOINING dataframes', new_level=True)
    for ix_start in range(0, len(ids), chunksize):
        ix_end = min(ix_start + chunksize, len(ids))
        id_slice = ids[ix_start:ix_end]

        where = '{id_col} in {id_list}'.format(id_col=column_names.ID, id_list=id_slice)

        logger.log('Slice & Join: {} --> {}, n={}'.format(id_slice[0], id_slice[-1], len(id_slice)), new_level=True)
        df_slice = None
        for path in paths:
            try:
                logger.log(path)
                if need_deconstruct:
                    slice_to_add = read_and_reconstruct(hdf5_fname, path, where=where)
                else:
                    slice_to_add = pd.read_hdf(hdf5_fname, path, where=where)
            except KeyError as err:
                logger.log(end_prev=True, start=False)
                print(err)
                continue

            if df_slice is None:
                df_slice = slice_to_add
            else:
                df_slice = df_slice.join(slice_to_add, how='outer')
            del slice_to_add

        logger.end_log_level()

        if df_slice is None:
            # Nothing was read for this slice (all paths missing); skip it
            continue

        logger.log('Append slice')

        if need_deconstruct:
            deconstruct_and_write(df_slice, hdf5_fname_for_join, joined_path, append=True)
        else:
            df_slice.to_hdf(hdf5_fname_for_join, joined_path, append=True, format='t')

        del df_slice

    logger.end_log_level()
    logger.end_log_level()

    return hdf5_fname_for_join

def make_list_hash(l):
    # Sort and de-duplicate the list before hashing so equal sets hash equally
    l = sorted(list(set(l)))

    # Use a hash so this particular set of ids can be stored under a unique key.
    # Note: Python 3 randomizes str hashes per process unless PYTHONHASHSEED is set,
    # so this key is only stable within a single run.
    key = hash(''.join(map(str, l)))

    return key


"""
Dask intelligent join
"""

def dask_open_and_join(hdf5_fname, path, components, ids=ALL, chunksize=500000):

    df_all = None
    logger.log('DASK OPEN & JOIN n={} components: {}'.format(len(components), components), new_level=True)
    for i, component in enumerate(components):
        logger.log('{}: {}/{}'.format(component.upper(), i + 1, len(components)), new_level=True)

        df_comp = open_df(hdf5_fname, '{}/{}'.format(path, component))
        df_comp.sort_index(inplace=True)
        df_comp.sort_index(inplace=True, axis=1)

        # ALL acts as a sentinel for "no filtering", so compare by identity
        if ids is not ALL:
            df_comp = df_comp[df_comp.index.get_level_values(column_names.ID).isin(ids)]

        logger.log('Convert to dask - {}'.format(df_comp.shape))
        df_dask = dd.from_pandas(df_comp.reset_index(), chunksize=chunksize)
        del df_comp

        logger.log('Join to big DF')

        if df_all is None:
            df_all = df_dask
        else:
            df_all = df_all.merge(df_dask, how='outer', on=['id', 'datetime'])
        del df_dask
        logger.end_log_level()

    logger.log('Dask DF back to pandas')
    df_pd = df_all.compute()
    del df_all
    df_pd.set_index(['id', 'datetime'], inplace=True)

    logger.log('SORT Joined DF')
    df_pd.sort_index(inplace=True)
    df_pd.sort_index(inplace=True, axis=1)
    logger.end_log_level()
    return df_pd

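# Example (a sketch; the store layout and component names are illustrative): join the
# per-component frames saved under '<path>/<component>' into one wide dataframe:
#
#   df_wide = dask_open_and_join('features.h5', 'vitals', ['hr', 'sbp'], ids=ALL)
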
"""
Visualization
"""
import seaborn as sns
import matplotlib.pyplot as plt


def heatmap(df_ts):
    sns.set(context="paper", font="monospace")
    corrmat = df_ts.corr()
    # Set up the matplotlib figure
    f, ax = plt.subplots(figsize=(50, 50))
    # Draw the heatmap using seaborn
    sns.heatmap(corrmat, vmax=1, square=True)
    return