--- a
+++ b/utils.py
@@ -0,0 +1,349 @@
+import sqlalchemy
+import pandas as pd
+from constants import variable_type,column_names,ALL
+import logger
+import numpy as np
+import dask.dataframe as dd
+
+
+def psql_connect(user, password, database='postgres', host='localhost', port=5432):
+    """Create a SQLAlchemy engine for a PostgreSQL database.
+
+    The engine URL has the form
+    ``postgresql://user:password@host:port/database``, e.g.
+    ``postgresql://federer:grandestslam@localhost:5432/tennis``.
+
+    Returns the single object produced by ``sqlalchemy.create_engine``;
+    no separate metadata object is returned.
+
+    NOTE(review): ``user`` and ``password`` are inserted into the URL verbatim;
+    values containing characters such as '@' or '/' presumably have to be
+    URL-escaped by the caller - confirm.
+    """
+    engine_url = 'postgresql://{}:{}@{}:{}/{}'.format(user, password, host, port, database)
+    return sqlalchemy.create_engine(engine_url, client_encoding='utf8')
+
+def simple_sql_query(table,columns=('*',),where_condition=None):
+    """Build a plain ``SELECT`` statement as a string.
+
+    table: table name placed after ``FROM``.
+    columns: iterable of column expressions, joined with ','; defaults to
+        every column (``*``). The default is an immutable tuple so it can
+        never be modified between calls.
+    where_condition: optional text placed after ``WHERE``; omitted when None.
+
+    Returns the query string.
+
+    NOTE: every piece is interpolated verbatim, so never pass untrusted
+    input to this function.
+    """
+    query = 'SELECT {} FROM {}'.format(','.join(columns),table)
+    if where_condition is not None:
+        query += ' WHERE {}'.format(where_condition)
+    return query
+
+def save_df(df, hdf5_fname, path):
+    """Store ``df`` under key ``path`` in the HDF5 file ``hdf5_fname``.
+
+    Returns ``df`` unchanged so the call can be chained.
+    """
+    store = pd.HDFStore(hdf5_fname)
+    try:
+        store[path] = df
+    finally:
+        # Always release the file handle, even when the write fails.
+        store.close()
+    return df
+
+def open_df(hdf5_fname, path):
+    """Load and return the object stored under key ``path`` in ``hdf5_fname``."""
+    store = pd.HDFStore(hdf5_fname)
+    try:
+        df = store[path]
+    finally:
+        # Always release the file handle, even when the key is missing.
+        store.close()
+    return df
+
+
+def is_categorical(var_type):
+    """Return True when ``var_type`` is the ORDINAL or the NOMINAL variable type."""
+    categorical_types = (variable_type.ORDINAL, variable_type.NOMINAL)
+    return var_type in categorical_types
+
+class Bunch(object):
+    """Plain attribute container: each keyword argument becomes an attribute."""
+    def __init__(self, **kwds):
+        # Write the keyword arguments straight into the instance namespace.
+        vars(self).update(kwds)
+
+def add_subindex(df,subindex_name):
+    """Append an index level that numbers rows sharing the same index value.
+
+    The frame is sorted by its index first. Rows whose index value is unique
+    get 0; rows with a repeated index value get 0, 1, 2, ... in sorted order.
+
+    df: frame to extend; it is not modified (sorting yields a new frame).
+    subindex_name: name of the new, innermost index level.
+
+    Returns the sorted frame with the extra index level.
+    """
+    df = df.sort_index()
+    # Group on every index level by position so unnamed levels work as well.
+    all_levels = list(range(df.index.nlevels))
+    # cumcount numbers the rows of each group 0..n-1 in their current order.
+    # .values assigns by position, avoiding alignment on a duplicated index.
+    df[subindex_name] = df.groupby(level=all_levels).cumcount().values
+    return df.set_index(subindex_name, append=True)
+
+def add_same_val_index_level(df,level_val,level_name,axis=0):
+    """Add a new outermost level, named ``level_name``, to one axis of ``df``.
+
+    Every entry of the new level holds ``level_val``. ``axis`` selects the
+    row index (0) or the columns (1). A new object is returned.
+    """
+    frames = [df]
+    outer_keys = [level_val]
+    outer_names = [level_name]
+    return pd.concat(frames, axis=axis, keys=outer_keys, names=outer_names)
+
+def set_level_to_same_val(index,level,value):
+    """Return a copy of MultiIndex ``index`` whose ``level`` holds only ``value``.
+
+    index: the MultiIndex to transform (not modified).
+    level: name or position of the level to overwrite.
+    value: the single value every entry of that level receives.
+    """
+    # Newer pandas releases call the integer positions "codes" and no longer
+    # provide set_labels; fall back to the old name on releases without set_codes.
+    set_codes = getattr(index, 'set_codes', None)
+    if set_codes is None:
+        set_codes = index.set_labels
+    # Point every entry at position 0 first, then shrink the level to [value];
+    # doing it the other way round would leave codes pointing past the level.
+    zero_codes = [0]*index.size
+    return set_codes(zero_codes,level=level).set_levels([value],level=level)
+
+def data_loss(df_start,df_end):
+    """Summarise how much data was lost between two versions of a frame.
+
+    Both frames need an index level named ``column_names.ID``.
+
+    Returns a 5-tuple:
+        shape of ``df_start``,
+        shape of ``df_end``,
+        difference in the number of non-null cells,
+        difference in the number of distinct IDs,
+        the ID difference as a percentage of the starting ID count,
+        formatted as a string such as ``'12.5% records'``.
+    """
+    cells_lost = df_start.count().sum() - df_end.count().sum()
+    admissions_start = len(df_start.index.get_level_values(column_names.ID).unique())
+    admissions_end = len(df_end.index.get_level_values(column_names.ID).unique())
+
+    admission_loss = admissions_start - admissions_end
+    # With no IDs to start with there is nothing to lose; report 0% instead of
+    # dividing by zero.
+    if admissions_start:
+        loss_ratio = float(admission_loss)/admissions_start
+    else:
+        loss_ratio = 0.0
+    percent_loss = str(round(loss_ratio * 100,4))+'% records'
+
+    return df_start.shape,df_end.shape,cells_lost,admission_loss,percent_loss
+
+
+def get_components(df):
+    """List the distinct values of the column level named 'component'."""
+    component_labels = df.columns.get_level_values('component')
+    return component_labels.unique().tolist()
+
+def filter_columns(df,level,value):
+    """Keep only the columns whose label at column level ``level`` equals ``value``."""
+    level_labels = df.columns.get_level_values(level)
+    keep = level_labels == value
+    return df.loc[:,keep]
+
+def append_to_description(old_desc,addition):
+    """Return ``old_desc`` followed by ``addition`` wrapped in parentheses."""
+    pieces = [old_desc, '(', addition, ')']
+    return ''.join(pieces)
+
+def get_first_dt(df_ts,df_context):
+    """Return, per ID, the earlier of the first observation and the admission start.
+
+    df_ts: frame indexed (at least) by ``column_names.ID``; the first
+        observation per ID comes from ``get_first_obs_dt``.
+    df_context: frame holding one admission start per ID, as read by
+        ``get_admit_dt``.
+
+    Returns a Series named ``column_names.DATETIME`` indexed by ID. IDs with
+    no row in ``df_context`` keep their first observation value (left join).
+    """
+    ids = df_ts.index.get_level_values(column_names.ID).unique()
+    admit_dt = get_admit_dt(df_context,ids)
+
+    first_obs_dt = get_first_obs_dt(df_ts)
+    # Row-wise minimum over the two candidate start values. Uses the module's
+    # numpy import; the ``pd.np`` alias is not available in current pandas.
+    first_dt = first_obs_dt.to_frame().join(admit_dt,how='left').apply(np.min,axis=1)
+    first_dt.name = column_names.DATETIME
+    return first_dt
+
+def get_admit_dt(df_context,ids=ALL):
+    """Return the 'start_dt' of each ID in ``df_context``, indexed by 'id'.
+
+    df_context: frame with at least the columns 'id' and 'start_dt' (and the
+        column ``column_names.ID``, used for filtering).
+    ids: collection of IDs to keep, or ``ALL`` (the default) for no filtering.
+
+    When an ID occurs more than once only its first row is kept.
+    """
+    # ``ids == ALL`` is element-wise for array-like ids (e.g. a pandas Index)
+    # and cannot be used as a truth value, so only compare scalars.
+    select_all = ids is ALL or (np.isscalar(ids) and ids == ALL)
+    if select_all:
+        df_context_filtered = df_context
+    else:
+        df_context_filtered = df_context[df_context[column_names.ID].isin(ids)]
+    admit_dt = df_context_filtered.loc[:,['id','start_dt']].drop_duplicates(subset=['id']).set_index('id')
+    return admit_dt
+
+def get_first_obs_dt(df_ts):
+    """Return, per ID, the innermost index value of that ID's first row.
+
+    The result is a Series named 'start_dt_obs', indexed by ``column_names.ID``.
+    "First" means first in the stored row order - presumably ``df_ts`` is
+    sorted so that this is the earliest observation; verify against callers.
+    """
+    def _innermost_label_of_first_row(group):
+        # The first row's .name is its full index label; [-1] is the last level.
+        return group.iloc[0].name[-1]
+
+    per_id = df_ts.groupby(level=column_names.ID)
+    first_obs_dt = per_id.apply(_innermost_label_of_first_row)
+    first_obs_dt.name = 'start_dt_obs'
+    return first_obs_dt
+
+def _flatten_labels(labels,join_char,suffix):
+    """Turn each (possibly tuple) label into one joined string; return a pd.Index."""
+    flat = []
+    for label in labels:
+        if isinstance(label, tuple):
+            text = join_char.join(map(str,label))
+        else:
+            text = str(label)
+        if suffix is not None:
+            text = text + join_char + suffix
+        flat.append(text)
+    return pd.Index(flat)
+
+def flatten_index(df,join_char='_',suffix=None,axis=0):
+    """Replace a (multi-level) axis of ``df`` with flat string labels.
+
+    df: frame to relabel; it is modified IN PLACE and also returned.
+    join_char: separator placed between the levels of a label and before
+        the suffix.
+    suffix: optional text appended to every label.
+    axis: 0 flattens the row index, 1 flattens the columns; any other value
+        leaves ``df`` untouched.
+    """
+    if axis==0:
+        df.index = _flatten_labels(df.index,join_char,suffix)
+    elif axis==1:
+        df.columns = _flatten_labels(df.columns,join_char,suffix)
+    return df
+
+def smart_count(col):
+    """Count the entries of a column, treating uint8 NOMINAL columns as indicators.
+
+    col: a Series whose ``name`` is a tuple-like label; its third element is
+        read as the variable type.
+
+    For a NOMINAL column stored as uint8 the sum of the values is returned
+    (presumably these are 0/1 dummy columns - confirm); otherwise the number
+    of non-null entries.
+    """
+    var_type = col.name[2]
+    # np.uint8 via the module's numpy import; the ``pd.np`` alias is not
+    # available in current pandas.
+    if (var_type == variable_type.NOMINAL) and (col.dtype == np.uint8):
+        return col.sum()
+
+    return col.count()
+
+
+"""
+Pytables/HDF5 I/O with axis deconstruction
+"""
+
+def read_and_reconstruct(hdf5_fname,path,where=None):
+    """Read a frame stored in deconstructed form and restore its column labels.
+
+    The data table and the column description live under the two keys
+    returned by ``deconstucted_paths(path)``. Only the data table is
+    filtered with ``where``; the column description is always read whole.
+    """
+    data_key,columns_key = deconstucted_paths(path)
+
+    stored_data = pd.read_hdf(hdf5_fname,data_key,where=where)
+    stored_columns = pd.read_hdf(hdf5_fname,columns_key)
+    return reconstruct_df(stored_data,stored_columns)
+
+
+def reconstruct_df(data,column_df):
+    """Return a copy of ``data`` with its columns rebuilt from ``column_df``.
+
+    The current column labels of ``data`` are used as row keys into
+    ``column_df`` (see ``reconstruct_columns``). ``data`` is not modified.
+    """
+    rebuilt_columns = reconstruct_columns(column_df,data.columns)
+    restored = data.copy()
+    restored.columns = rebuilt_columns
+    return restored
+
+def reconstruct_columns(column_df,col_ix=None):
+    """Build a MultiIndex from the rows of a column-description frame.
+
+    column_df: one row per column; every field except 'dtype' becomes a
+        level of the result, named after that field.
+    col_ix: row labels of ``column_df`` to use, in order; all rows when None.
+    """
+    level_df = column_df.drop('dtype',axis=1)
+    if col_ix is None:
+        col_ix = level_df.index
+    level_names = level_df.columns.tolist()
+    # Transpose so that each array holds the values of one level.
+    level_arrays = level_df.loc[col_ix].T.values
+    return pd.MultiIndex.from_arrays(level_arrays,names=level_names)
+
+def deconstruct_and_write(df,hdf5_fname,path,append=False):
+    """Write ``df`` to an HDF5 file as a data table plus a column description.
+
+    df: frame to store; it is split by ``deconstruct_df``.
+    hdf5_fname: HDF5 file to write to.
+    path: base key; the two tables go to the keys returned by
+        ``deconstucted_paths(path)``.
+    append: when True the data table is appended to, and the column
+        description is written only if it is not in the store yet.
+
+    Returns None.
+    """
+    # Deconstruct the dataframe
+    data,columns = deconstruct_df(df)
+
+    # Get all paths for dataframes in store
+    data_path,col_path = deconstucted_paths(path)
+
+    store = pd.HDFStore(hdf5_fname)
+    try:
+        store.put(data_path,data,append=append,format='t')
+        if (not append) or col_path not in store:
+            # Write through the already-open store rather than opening the
+            # same file a second time.
+            store.put(col_path,columns,format='t')
+    finally:
+        # Release the file handle even when a write fails.
+        store.close()
+    return
+
+def deconstruct_df(df):
+    """Split ``df`` into a plainly-labelled data frame and a column description.
+
+    Returns ``(data, columns)``:
+        data: copy of ``df`` whose columns are relabelled 0..n-1.
+        columns: one row per original column, with one field per column
+            level (named after the level) plus a 'dtype' field holding the
+            column's dtype as a string.
+    ``df`` itself is not modified.
+    """
+    level_rows = [list(label) for label in df.columns.tolist()]
+    columns = pd.DataFrame(level_rows,columns=df.columns.names)
+    columns['dtype'] = df.dtypes.values.astype(str)
+
+    data = df.copy()
+    data.columns = list(range(df.shape[1]))
+    return data,columns
+
+def deconstucted_paths(path):
+    """Return the keys ``'<path>/data'`` and ``'<path>/columns'`` as a tuple."""
+    data_path,col_path = ['{}/{}'.format(path,leaf) for leaf in ('data','columns')]
+    return data_path,col_path
+
+def complex_row_mask(df,specs,operator='or'):
+    """Build a boolean row mask for ``df`` from one or more column specs.
+
+    df: frame whose rows are tested.
+    specs: a spec or a list of specs. A spec maps a column name to either
+        a callable (applied to every value of the column), a single value,
+        or a list of accepted values. A row satisfies a spec when it
+        satisfies every column condition in it. ``None`` or an empty list
+        selects every row.
+    operator: 'or' keeps rows satisfying at least one spec; any other value
+        keeps only rows satisfying every spec.
+
+    Returns a boolean Series indexed like ``df``.
+    """
+    # Normalise specs to a list. None must be handled BEFORE wrapping,
+    # otherwise it would become [None] and fail in the loop below.
+    if specs is None:
+        specs = []
+    elif not isinstance(specs,list):
+        specs = [specs]
+
+    # one boolean column per spec, on the row index of the passed-in frame
+    df_mask = pd.DataFrame(index=df.index)
+
+    # if we have no specs, then we are not going to mask anything
+    if len(specs) == 0:
+        df_mask[0] = True
+
+    for idx,spec in enumerate(specs):
+        df_spec_mask = pd.DataFrame(index=df.index)
+        for col_name,spec_info in spec.items():
+            if callable(spec_info):
+                df_spec_mask[col_name] = df.loc[:,col_name].apply(spec_info)
+            else:
+                if not isinstance(spec_info,list): spec_info = [spec_info]
+                df_spec_mask[col_name] = df.loc[:,col_name].isin(spec_info)
+        # a row meets the spec only if it meets every column condition
+        df_mask[idx] = df_spec_mask.all(axis=1)
+
+    # 'or' includes rows from each spec; otherwise only rows that meet the
+    # criteria of EVERY spec are included
+    if operator == 'or' : mask = df_mask.any(axis=1)
+    else: mask = df_mask.all(axis=1)
+    return mask
+
+def smart_join(hdf5_fname,paths,joined_path,ids,
+                                        chunksize=5000,
+                                        need_deconstruct=True,
+                                        hdf5_fname_for_join=None,
+                                        overwrite=True):
+    """Outer-join several stored frames chunk by chunk and store the result.
+
+    hdf5_fname: HDF5 file the source frames are read from.
+    paths: keys of the frames to join; keys that raise KeyError are skipped.
+    joined_path: key under which the joined frame is stored.
+    ids: IDs to include; they are sorted and processed ``chunksize`` at a time
+        through a ``where`` clause on ``column_names.ID``.
+    need_deconstruct: when True frames are read/written in the deconstructed
+        layout (``read_and_reconstruct`` / ``deconstruct_and_write``),
+        otherwise with plain ``read_hdf`` / ``to_hdf``.
+    hdf5_fname_for_join: file the result is written to; defaults to
+        ``hdf5_fname``.
+    overwrite: when ``joined_path`` already exists, True deletes it first;
+        False returns immediately without joining.
+
+    Returns the name of the file that holds the joined frame.
+    """
+
+    logger.log('Smart join: n={}, {}'.format(len(ids),paths),new_level=True)
+
+    if hdf5_fname_for_join is None: hdf5_fname_for_join=hdf5_fname
+
+    # The store is only needed to inspect/clear the target key. Close it
+    # before the chunk loop re-opens the same file for reading and appending.
+    store = pd.HDFStore(hdf5_fname_for_join)
+    try:
+        already_joined = joined_path in store
+        if already_joined and overwrite: del store[joined_path]
+    finally:
+        store.close()
+
+    if already_joined and not overwrite:
+        logger.end_log_level()
+        return hdf5_fname_for_join
+
+    #sort ids, should speed up where clauses and selects
+    ids = sorted(ids)
+
+    #do chunked join
+    logger.log('JOINING dataframes',new_level=True)
+    for ix_start in range(0,len(ids),chunksize):
+        ix_end = min(ix_start + chunksize,len(ids))
+        id_slice = ids[ix_start:ix_end]
+
+        where = '{id_col} in {id_list}'.format(id_col=column_names.ID,id_list=id_slice)
+
+        logger.log('Slice & Join: {} --> {}, n={}'.format(id_slice[0], id_slice[-1],len(id_slice)),new_level=True)
+        df_slice = None
+        for path in paths:
+            try:
+                logger.log(path)
+                if need_deconstruct: slice_to_add = read_and_reconstruct(hdf5_fname,path,where=where)
+                else: slice_to_add = pd.read_hdf(hdf5_fname,path,where=where)
+            except KeyError as err:
+                # best effort: a missing key is reported and skipped
+                logger.log(end_prev=True,start=False)
+                print(err)
+                continue
+
+            if df_slice is None: df_slice = slice_to_add
+            else:
+                df_slice = df_slice.join(slice_to_add,how='outer')
+                del slice_to_add
+
+        logger.end_log_level()
+
+        # every path was missing for this chunk: there is nothing to append
+        if df_slice is None: continue
+
+        logger.log('Append slice')
+
+        if need_deconstruct: deconstruct_and_write(df_slice,hdf5_fname_for_join,joined_path,append=True)
+        else: df_slice.to_hdf(hdf5_fname_for_join,joined_path,append=True,format='t')
+
+        del df_slice
+
+    logger.end_log_level()
+    logger.end_log_level()
+
+    return hdf5_fname_for_join
+
+def make_list_hash(l):
+    """Return a hash key for the set of values in ``l``.
+
+    Duplicates and ordering are ignored: the distinct values are sorted,
+    converted to text, concatenated without a separator and hashed with the
+    built-in ``hash``.
+
+    NOTE(review): without a separator different sets can produce the same
+    text (e.g. [1, 23] and [12, 3]); and on interpreters with string hash
+    randomisation enabled the value differs between processes - confirm
+    whether these keys are persisted before relying on them.
+    """
+    distinct_sorted = sorted(set(l))
+    joined = ''.join(str(item) for item in distinct_sorted)
+    return hash(joined)
+
+"""
+Dask intelligent join
+"""
+
+def dask_open_and_join(hdf5_fname,path,components,ids=ALL,chunksize=500000):
+    """Load one frame per component and outer-join them with dask.
+
+    hdf5_fname: HDF5 file to read from.
+    path: base key; each component is read from ``'<path>/<component>'``.
+    components: sequence of component names (strings); must not be empty.
+    ids: collection of IDs to keep, or ``ALL`` (the default) for no filtering.
+    chunksize: rows per dask partition.
+
+    Returns a pandas frame indexed by ['id', 'datetime'], sorted on both axes.
+    """
+    # ``ids == ALL`` is element-wise for array-like ids and cannot be used
+    # as a truth value, so only compare scalars.
+    select_all = ids is ALL or (np.isscalar(ids) and ids == ALL)
+
+    df_all=None
+    logger.log('DASK OPEN & JOIN n={} components: {}'.format(len(components),components),new_level=True)
+    # enumerate gives the true position even when a component name repeats
+    for position,component in enumerate(components,1):
+        logger.log('{}: {}/{}'.format(component.upper(),position,len(components)),new_level=True)
+
+        df_comp = open_df(hdf5_fname,'{}/{}'.format(path,component))
+        df_comp.sort_index(inplace=True)
+        df_comp.sort_index(inplace=True, axis=1)
+
+        if not select_all:
+            df_comp = df_comp[df_comp.index.get_level_values(column_names.ID).isin(ids)]
+
+        logger.log('Convert to dask - {}'.format(df_comp.shape))
+        # the index is turned into ordinary columns so dask can merge on them
+        df_dask = dd.from_pandas(df_comp.reset_index(), chunksize=chunksize)
+        del df_comp
+
+        logger.log('Join to big DF')
+
+        if df_all is None: df_all = df_dask
+        else :
+            df_all = df_all.merge(df_dask,how='outer', on=['id','datetime'])
+            del df_dask
+        logger.end_log_level()
+
+    logger.log('Dask DF back to pandas')
+    df_pd = df_all.compute()
+    del df_all
+    df_pd.set_index(['id','datetime'], inplace=True)
+
+    logger.log('SORT Joined DF')
+    df_pd.sort_index(inplace=True)
+    df_pd.sort_index(inplace=True, axis=1)
+    logger.end_log_level()
+    return df_pd
+
+
+"""
+Visualization
+"""
+import seaborn as sns
+import matplotlib.pyplot as plt
+
+def heatmap(df_ts):
+    """Draw a seaborn heatmap of the pairwise column correlations of ``df_ts``.
+
+    Nothing is returned; the plot is drawn as a side effect.
+    """
+    sns.set(context="paper", font="monospace")
+    correlations = df_ts.corr()
+    # Create a large figure. No axes are passed to seaborn below, so it
+    # presumably draws on the currently active axes, i.e. the ones just made.
+    plt.subplots(figsize=(50, 50))
+    sns.heatmap(correlations, vmax=1, square=True)
+    return