from sklearn.base import BaseEstimator, TransformerMixin
import abc
import numpy as np
import pandas as pd
import utils
from constants import variable_type, column_names, NO_UNITS, ALL
import logger
class safe_unstacker(BaseEstimator,TransformerMixin):
def __init__(self, *levels):
self.levels = levels
def fit(self, x, y=None):
return self
def transform(self, df):
        return safe_unstack(df, list(self.levels))
def safe_unstack(df,levels):
subindex = 'subindex'
#add subindex to facilitate unstacking
df = utils.add_subindex(df,subindex)
#unstack!
df_unstacked = df.unstack(levels)
#drop "value" level, which is derivative from value column that is being unstacked against
df_unstacked.columns = df_unstacked.columns.droplevel(0)
# Drop subindex
df_unstacked.index = df_unstacked.index.droplevel(subindex)
df_unstacked.dropna(axis=1,inplace=True,how='all')
return df_unstacked
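# Illustrative usage (a sketch; it assumes utils.add_subindex appends an
# integer counter level so duplicate index entries survive the unstack
# instead of raising):
#
#   >>> long_df                                  # index ('id', 'variable'), one 'value' column
#   >>> wide_df = safe_unstacker('variable').fit_transform(long_df)
#   >>> wide_df                                  # one column per variable; repeated entries kept as rows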
class add_level(BaseEstimator,TransformerMixin):
def __init__(self,level_val,level_name,axis=0):
self.level_val = level_val
self.level_name = level_name
self.axis = axis
def fit(self, x, y=None):
return self
def transform(self, df):
return utils.add_same_val_index_level(df,self.level_val,self.level_name,self.axis)
class column_standardizer(BaseEstimator,TransformerMixin):
def __init__(self,data_dict,ureg,convert_units=True):
self.data_dict = data_dict
self.ureg = ureg
self.convert_units=convert_units
def fit(self, x, y=None):
return self
def transform(self, df):
df = df.copy()
col_cnt = df.columns.size
if col_cnt == 0: return df
names = ['component','status','variable_type','units','description']
tuples=[]
for col_ix in range(0,col_cnt):
col = df.iloc[:,col_ix]
new_col,new_name = self.standardize(col)
df.iloc[:,col_ix] = new_col
            tuples.append(tuple(str(part) for part in new_name))
df.columns = pd.MultiIndex.from_tuples(tuples,names=names)
df.sort_index(axis=1, inplace=True)
return df
def standardize(self,col):
old_col_name = col.name
guess_component = old_col_name[0]
units = old_col_name[-2]
desc = old_col_name[-1]
dtype = col.dtype
defs = self.data_dict.tables.definitions
defs = defs[defs.component == guess_component]
best_def = None
for ix,row in defs.iterrows():
def_units = row['units']
if can_convert(def_units,units,self.ureg):
best_def = row
break
        if (best_def is None) and (dtype != object):
            status = 'unknown'
            var_type = variable_type.QUANTITATIVE
        elif (best_def is None) or ((best_def['variable_type'] == variable_type.QUANTITATIVE) and (dtype == object)):
            status = 'unknown'
            var_type = variable_type.NOMINAL
if units != NO_UNITS:
desc = utils.append_to_description(desc,units)
units = NO_UNITS
else:
status = 'known'
var_type = best_def['variable_type']
new_units = best_def['units']
if new_units != units:
if not self.ureg.same_units(units,new_units) and self.convert_units:
col = self.ureg.convert_units(units,new_units,col)
desc = utils.append_to_description(str(desc),units)
units = new_units
return (col,(guess_component,status,var_type,units,desc))
def can_convert(unit1,unit2,med_ureg):
if (unit1 == unit2): return True
if (NO_UNITS in [unit1,unit2]): return False
return med_ureg.same_dimensionality(unit1,unit2)
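# Behaviour sketch (assumes the medical unit registry exposes
# same_dimensionality as used above):
#
#   >>> can_convert('mg', 'g', ureg)        # same dimensionality -> True
#   >>> can_convert('mg', 'mL', ureg)       # mass vs volume -> False
#   >>> can_convert(NO_UNITS, 'g', ureg)    # unitless never converts -> False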
class oob_value_remover(BaseEstimator,TransformerMixin):
def __init__(self,data_dict):
self.data_dict = data_dict
def fit(self, x, y=None):
return self
def transform(self, df):
logger.log('Drop OOB data | {}'.format(df.shape),new_level=True)
df = df.copy()
idx = pd.IndexSlice
df = df.sort_index(axis=1).sort_index()
for component in df.columns.get_level_values('component').unique().tolist():
component_defs = self.data_dict.defs_for_component(component)
for units in df[component].columns.get_level_values(column_names.UNITS).unique().tolist():
df_slice = df.loc[:,idx[component,:,:,units,:]]
logger.log('{}, {}, {}'.format(component,units,df_slice.count().sum()))
matching_defs = component_defs[(component_defs.units == units)]
if matching_defs.empty: continue
def_row = matching_defs.iloc[0]
lower = def_row['lower']
upper = def_row['upper']
df.loc[:,idx[component,:,:,units,:]] = remove_oob_values(df_slice,lower,upper)
df.dropna(how='all',inplace=True,axis=1)
logger.end_log_level()
return df
def remove_oob_values(data,lower,upper):
oob_mask = (data < lower) | (data > upper)
return data[~oob_mask]
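# Minimal runnable sketch of the masking above: on a DataFrame, boolean
# indexing keeps the shape and turns out-of-bounds entries into NaN.
def _example_remove_oob_values():
    data = pd.DataFrame({'hr': [50.0, 120.0, 400.0]})
    # with bounds [60, 250], the first and last readings become NaN
    return remove_oob_values(data, lower=60, upper=250)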
class split_dtype(BaseEstimator,TransformerMixin):
def fit(self, x, y=None):
return self
def transform(self, df):
if df.empty: return df
df_numeric = df.apply(pd.to_numeric,errors='coerce')
is_string = pd.isnull(df_numeric) & ~pd.isnull(df)
df_string = df[is_string].dropna(how='all')
        # String values cannot keep numeric units: fold the displaced levels into the description and mark the column unitless
        tuples = [(col_name[0], NO_UNITS, utils.append_to_description(*map(str, col_name[3:0:-1]))) for col_name in df_string.columns]
df_string.columns = pd.MultiIndex.from_tuples(tuples,names = df_string.columns.names)
df_string = utils.add_same_val_index_level(df_string,level_val='string',level_name='dtype',axis=1)
df_numeric = df_numeric.dropna(how='all')
df_numeric = utils.add_same_val_index_level(df_numeric,level_val='number',level_name='dtype',axis=1)
df_joined = df_numeric.join(df_string,how='outer')
del df_string,df_numeric
df_joined.columns = df_joined.columns.droplevel('dtype')
df_joined.dropna(how='all',inplace=True,axis=1)
return df_joined
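# Behaviour sketch: pd.to_numeric(errors='coerce') parses what it can, and
# entries that were non-null but failed to parse are routed to parallel
# unitless 'string' columns before both halves are re-joined, so no
# observation is lost:
#
#   >>> split_dtype().fit_transform(df_mixed)   # e.g. '7.4' -> 7.4, 'clear' -> string column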
class combine_like_cols(BaseEstimator,TransformerMixin):
def fit(self, df, y=None, **fit_params):
logger.log('FIT Combine like columns {}'.format(df.shape),new_level=True)
self.columns_to_combine = {}
groupby_cols = list(df.columns.names)
groupby_cols.remove(column_names.DESCRIPTION)
grouped = df.groupby(level=groupby_cols,axis=1)
        for index, group in grouped:
logger.log(index)
if index[2] == variable_type.NOMINAL: continue
            # Order the group's columns by descending non-null count so the fullest column is combined first
            ordered_cols = group[group.count().sort_values(ascending=False).index.tolist()].columns.tolist()
self.columns_to_combine[index] = ordered_cols
logger.end_log_level()
return self
def transform(self, df):
logger.log('TRANSFORM Combine like columns {}'.format(df.shape),new_level=True)
column_list = []
        for index, columns in self.columns_to_combine.items():
logger.log(index)
df_list=[]
for col_name in columns:
if col_name not in df.columns:
                    df[col_name] = np.nan
col = df[col_name].dropna()
col.name = index + (ALL,)
df_list.append(col)
df_combined = pd.concat(df_list).to_frame()
            # Drop duplicate index entries, keeping the first occurrence. Because
            # fit ordered each group's columns by descending non-null count, values
            # from the fullest column win whenever index entries collide.
duplicates_to_drop = df_combined.index.duplicated(keep='first')
df_combined = df_combined.loc[~duplicates_to_drop]
#drop the combined columns
df.drop(columns,axis=1,inplace=True)
#join the combined column back to the DF
df = df.join(df_combined,how='outer')
df.sort_index(inplace=True)
df.sort_index(inplace=True,axis=1)
logger.end_log_level()
return df
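# Usage sketch of the fit/transform contract: fit memorizes, per
# (component, status, variable_type, units) group, that group's columns
# ordered by descending non-null count; transform then collapses each group
# into a single ALL column, with the fullest training column winning ties:
#
#   >>> combiner = combine_like_cols().fit(train_df)
#   >>> combined = combiner.transform(test_df)  # columns unseen at fit time get NaN stubs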
class flatten_index(BaseEstimator,TransformerMixin):
def __init__(self,axis=0,suffix=None):
self.axis=axis
self.suffix=suffix
def fit(self, x, y=None):
return self
def transform(self, df):
df = utils.flatten_index(df,axis=self.axis,suffix=self.suffix)
return df
"""
Deal with categorical data
"""
class standardize_categories(BaseEstimator,TransformerMixin):
def __init__(self,data_dict,category_map,use_numeric=True):
self.data_dict = data_dict
self.category_map = category_map
self.use_numeric = use_numeric
def fit(self, x, y=None):
return self
def transform(self, df):
for component in utils.get_components(df):
cat_map = self.category_map.get(component,None)
if cat_map is None: continue
df_slice = df.loc[:,[component]]
categorical_mask = df_slice.columns.get_level_values('variable_type').isin([variable_type.NOMINAL,variable_type.ORDINAL])
df_categories = self.data_dict.tables.categories
            to_replace = list(cat_map.keys())
col = 'val_numeric' if self.use_numeric else 'val_text'
values = [df_categories.loc[cat_ix,col] for cat_ix in cat_map.values()]
df_slice.loc[:,categorical_mask] = df_slice.loc[:,categorical_mask].replace(to_replace=to_replace,value=values)
if not self.use_numeric:
to_replace = [df_categories.loc[cat_ix,'val_numeric'] for cat_ix in cat_map.values()]
df_slice.loc[:,categorical_mask] = df_slice.loc[:,categorical_mask].replace(to_replace=to_replace,value=values)
df.loc[:,[component]] = df_slice
return df
class split_bad_categories(BaseEstimator,TransformerMixin):
def __init__(self,data_dict,use_numeric=True):
self.data_dict = data_dict
self.use_numeric = use_numeric
def fit(self, x, y=None):
return self
def transform(self, df):
for component in utils.get_components(df):
df_categories = self.data_dict.get_categories(component)
if df_categories is None: continue
df_slice = df.loc[:,[component]]
col = 'val_numeric' if self.use_numeric else 'val_text'
valid_values = df_categories.loc[:,col]
categorical_mask = df_slice.columns.get_level_values('variable_type').isin([variable_type.NOMINAL,variable_type.ORDINAL])
categorical_slice = df_slice.loc[:,categorical_mask]
df_valid_mask = categorical_slice.apply(lambda x: x.isin(valid_values))
df_slice.loc[:,categorical_mask] = categorical_slice[df_valid_mask]
df.loc[:,[component]] = df_slice
df_invalid = categorical_slice[~df_valid_mask]
df_invalid.columns = utils.set_level_to_same_val(df_invalid.columns,'status','unknown')
df_invalid.columns = utils.set_level_to_same_val(df_invalid.columns,'variable_type',variable_type.NOMINAL)
df = df.join(df_invalid,how='outer')
del df_invalid
df.dropna(how='all',inplace=True,axis=1)
return df
class nominal_to_onehot(BaseEstimator,TransformerMixin):
def fit(self, x, y=None):
return self
def transform(self, df):
if df.empty: return df
logger.log('Nominal to OneHot',new_level=True)
nominal_cols = df.columns.get_level_values('variable_type') == variable_type.NOMINAL
for col_name in df.loc[:,nominal_cols]:
column = df[col_name]
df.drop(col_name,axis=1,inplace=True)
df_dummies = pd.get_dummies(column)
if df_dummies.empty: continue
dummy_col_names = [col_name[:-1] + ('{}_{}'.format(col_name[-1],text),) for text in df_dummies.columns]
df_dummies.columns = pd.MultiIndex.from_tuples(dummy_col_names,names=df.columns.names)
df = df.join(df_dummies,how='outer')
logger.end_log_level()
return df
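# Minimal runnable sketch of the dummy-naming rule above: every dummy keeps
# the original tuple prefix and appends '<description>_<category>' as the
# final level (column values are illustrative).
def _example_dummy_names():
    col_name = ('rhythm', 'known', variable_type.NOMINAL, NO_UNITS, 'ecg rhythm')
    categories = ['sinus', 'afib']
    return [col_name[:-1] + ('{}_{}'.format(col_name[-1], text),) for text in categories]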
"""
Duplicate index aggregators
"""
class same_index_aggregator(BaseEstimator,TransformerMixin):
def __init__(self,agg_func):
self.agg_func = agg_func
def fit(self, x, y=None):
return self
def transform(self, df):
duplicated = df.index.duplicated(keep=False)
df_safe = df[~duplicated]
df_duplicated = df[duplicated]
df_fixed = df_duplicated.groupby(level=df_duplicated.index.names).agg(lambda x:self.agg_func(x))
df_no_dups = pd.concat([df_safe,df_fixed])
df_no_dups.sort_index(inplace=True)
return df_no_dups
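# Minimal runnable sketch: duplicate index entries are collapsed with the
# supplied aggregate while unique rows pass through untouched.
def _example_same_index_aggregator():
    idx = pd.Index([1, 1, 2], name='id')
    df = pd.DataFrame({'hr': [60.0, 70.0, 80.0]}, index=idx)
    return same_index_aggregator(lambda x: x.mean()).fit_transform(df)  # id 1 -> 65.0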
"""
Fill NA
"""
class NaNFiller(BaseEstimator,TransformerMixin):
    def fit(self, X, y=None, **fit_params):
self.fill_vals = self.get_fill_vals(X, y, **fit_params)
return self
def transform(self,df):
return df.apply(lambda col: col.fillna(self.fill_vals[col.name]))
def get_fill_vals(self, X, y, **fit_params):
        return pd.Series(np.nan, index=X.columns)
class FillerZero(NaNFiller):
def get_fill_vals(self, X, y, **fit_params):
return pd.Series(0,index=X.columns)
class FillerMean(NaNFiller):
def get_fill_vals(self, X, y, **fit_params):
return X.mean()
class FillerMode(NaNFiller):
def get_fill_vals(self, X, y, **fit_params):
return X.mode().iloc[0]
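# Usage sketch: fill values are learned from the frame seen at fit time, so
# transform never leaks statistics from the data being filled.
def _example_filler_mean():
    train = pd.DataFrame({'hr': [60.0, 80.0]})
    test = pd.DataFrame({'hr': [np.nan, 100.0]})
    return FillerMean().fit(train).transform(test)  # NaN -> 70.0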
class do_nothing(BaseEstimator,TransformerMixin):
def fit(self, x, y=None):
return self
def transform(self, df):
return df
class GroupbyAndFFill(BaseEstimator,TransformerMixin):
def __init__(self,level=None,by=None):
self.level=level
self.by=by
def fit(self, x, y=None):
return self
def transform(self, df):
return df.groupby(level=self.level,by=self.by).ffill()
class GroupbyAndBFill(BaseEstimator,TransformerMixin):
def __init__(self,level=None,by=None):
self.level=level
self.by=by
def fit(self, x, y=None):
return self
def transform(self, df):
return df.groupby(level=self.level,by=self.by).bfill()
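# Usage sketch: filling inside a groupby keeps imputation from bleeding
# across records.
def _example_groupby_ffill():
    idx = pd.MultiIndex.from_tuples([(1, 0), (1, 1), (2, 0)], names=['id', 'hour'])
    df = pd.DataFrame({'hr': [60.0, np.nan, np.nan]}, index=idx)
    return GroupbyAndFFill(level='id').fit_transform(df)  # id 1 fills to 60.0; id 2 stays NaN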
"""
filtering
"""
class column_filter(BaseEstimator,TransformerMixin):
def fit(self, df, y=None, **fit_params):
        logger.log('*fit* Filter columns ({}) {}'.format(self.__class__.__name__, df.shape), new_level=True)
if df.empty:
self.cols_to_keep = []
else:
self.cols_to_keep = self.get_columns_to_keep(df, y, **fit_params)
logger.end_log_level()
return self
def transform(self, df):
logger.log('*transform* Filter columns ({}) {}'.format(self.__class__.__name__, df.shape))
df_out = None
if df.empty or len(self.cols_to_keep) == 0: df_out = df.drop(df.columns,axis=1)
else: df_out = df.loc[:,self.cols_to_keep]
logger.log(end_prev=True)
return df_out
def get_columns_to_keep(self,df, y=None, **fit_params):
return df.columns
class DataSpecFilter(column_filter):
def __init__(self,data_specs):
self.data_specs = data_specs
def get_columns_to_keep(self, df, y=None, **fit_params):
        df_cols = pd.DataFrame([list(col) for col in df.columns], columns=df.columns.names)
mask = utils.complex_row_mask(df_cols,self.data_specs)
return [tuple(x) for x in df_cols[mask].to_records(index=False)]
class max_col_only(column_filter):
def get_columns_to_keep(self, df, y=None, **fit_params):
self.max_col = df.apply(utils.smart_count).sort_values().index.tolist()[-1]
return [self.max_col]
class remove_small_columns(column_filter):
def __init__(self,threshold):
self.threshold = threshold
def get_columns_to_keep(self, df, y=None, **fit_params):
return df.loc[:,df.apply(utils.smart_count) > self.threshold].columns
class multislice_filter(column_filter):
def __init__(self,slice_dict_list):
self.slice_dict_list = slice_dict_list
def get_columns_to_keep(self,df, y=None, **fit_params):
cols = []
for slice_dict in self.slice_dict_list:
            levels = list(slice_dict.keys())
            vals = tuple(slice_dict.values())
            cols += df.xs(vals, level=levels, axis=1, drop_level=False).columns.tolist()
return cols
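# Usage sketch: each dict pins column-index levels to required values, and a
# column is kept when it matches any one dict:
#
#   >>> f = multislice_filter([{'component': 'hr'},
#   ...                        {'component': 'sbp', 'units': 'mmHg'}])
#   >>> df_vitals = f.fit_transform(df)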
class DataNeedsFilter(multislice_filter):
def __init__(self,data_needs):
comp_dict = {}
for dn in data_needs:
component = dn[0]
units = dn[1]
units_list = comp_dict.get(component,[])
units_list.append(units)
comp_dict[component] = units_list
slice_dict_list = []
        for component, units_list in comp_dict.items():
if ALL in units_list:
slice_dict_list.append({column_names.COMPONENT: component})
continue
            for unit in units_list:
                slice_dict_list.append({
                    column_names.COMPONENT: component,
                    column_names.UNITS: unit
                })
super(DataNeedsFilter,self).__init__(slice_dict_list)
class func_filter(column_filter):
def __init__(self,filter_func):
self.filter_func = filter_func
def get_columns_to_keep(self,df, y=None, **fit_params):
return df.loc[:,df.apply(self.filter_func)].columns
class record_threshold(func_filter):
def __init__(self,threshold):
self.threshold = threshold
filter_func = lambda col: col.dropna().index.get_level_values(column_names.ID).unique().size > self.threshold
super(record_threshold,self).__init__(filter_func)
class drop_all_nan_cols(func_filter):
def __init__(self):
filter_func = lambda col: ~pd.isnull(col).all()
super(drop_all_nan_cols,self).__init__(filter_func)
class known_col_only(func_filter):
def __init__(self):
filter_func = lambda col: col.name[1] == 'known'
super(known_col_only,self).__init__(filter_func)
class filter_to_component(func_filter):
def __init__(self,components):
self.components = components
filter_func = lambda col: col.name[0] in self.components
super(filter_to_component,self).__init__(filter_func)
class filter_var_type(func_filter):
def __init__(self,var_types):
self.var_types =var_types
filter_func = lambda col: col.name[2] in self.var_types
super(filter_var_type,self).__init__(filter_func)
class summable_only(func_filter):
def __init__(self,ureg,ignore_component_list):
self.ureg = ureg
self.ignore_component_list = ignore_component_list
filter_func = lambda col:summable_only_filter(col,self.ureg,self.ignore_component_list)
super(summable_only,self).__init__(filter_func)
def summable_only_filter(col, ureg, ignore_component_list):
    # A column is summable when it carries a real mass or volume unit and its
    # component is not explicitly ignored.
    units = str(col.name[-2])
    is_summable_unit = (col.name[-2] != NO_UNITS) and (ureg.is_volume(units) or ureg.is_mass(units))
    should_ignore_component = col.name[0] in ignore_component_list
    return is_summable_unit and not should_ignore_component
class DropNaN(BaseEstimator,TransformerMixin):
def __init__(self,axis=0,how='any',thresh=None):
self.axis=axis
self.how=how
self.thresh=thresh
def fit(self, df, y=None):
return self
def transform(self, df):
return df.dropna(axis=self.axis,how=self.how,thresh=self.thresh)
class filter_ids(BaseEstimator,TransformerMixin):
def __init__(self,print_loss=False,ids=None):
self.print_loss = print_loss
self.ids = ids
def fit(self, x, y=None, **fit_params):
if self.ids is None:
ids = fit_params.get('ids',None)
if (ids is None) and (y is not None):
ids = y.index.get_level_values(column_names.ID).unique().tolist()
self.ids = ids
return self
def transform(self, df):
if self.ids is not None:
out_df = df.loc[df.index.get_level_values(column_names.ID).isin(self.ids)]
else: out_df = df
if self.print_loss:
            print('Data Loss:', utils.data_loss(df, out_df))
return out_df
class more_than_n_component(BaseEstimator,TransformerMixin):
def __init__(self,n,component):
self.n = n
self.component = component
def fit(self, df, y=None):
return self
def transform(self, df):
if df.empty: return df.drop(df.index)
        # Keep only ids with more than n non-null observations of the component
        good_ids = df.loc[:, [self.component]].dropna(how='all').groupby(level=column_names.ID).count() > self.n
good_ids = good_ids.loc[good_ids.iloc[:,0]].index.unique().tolist()
return df.loc[df.index.get_level_values(column_names.ID).isin(good_ids)]
"""
Simple Data Manipulation
"""
class TimeShifter(TransformerMixin,BaseEstimator):
def __init__(self,datetime_level,shift='infer',n=1):
self.shift=shift
self.datetime_level = datetime_level
self.n=n
def fit(self, X, y=None, **fit_params):
return self
def transform(self, df):
shift = self.shift
        if shift == 'infer':
            # Infer each record's sampling frequency and shift by the modal inferred frequency
            infer_freq = lambda grp: grp.index.get_level_values(self.datetime_level).inferred_freq
            inferred_freqs = df.groupby(level=column_names.ID).apply(infer_freq)
            shift = inferred_freqs.value_counts().sort_values().index[-1]
df = df.reset_index(level=self.datetime_level)
df.loc[:,self.datetime_level] = df.loc[:,self.datetime_level] + self.n*pd.Timedelta(shift)
df.set_index(self.datetime_level,append=True,inplace=True)
return df
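# Usage sketch: with shift='infer', the modal per-record frequency (e.g.
# hourly) is used, so transform advances every timestamp by n such periods
# (the level name below is illustrative):
#
#   >>> TimeShifter(datetime_level='charttime', n=1).fit_transform(df)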
class RowShifter(TransformerMixin,BaseEstimator):
def __init__(self,n):
self.n=n
def fit(self, X, y=None, **fit_params):
return self
def transform(self, df):
return df.shift(self.n)
class Replacer(TransformerMixin,BaseEstimator):
def __init__(self,to_replace=None, value=None, regex=False, method='pad'):
self.to_replace = to_replace
self.value = value
self.regex=regex
self.method = method
def fit(self, X, y=None, **fit_params):
return self
def transform(self, df):
return df.replace(
to_replace=self.to_replace,
value=self.value,
regex=self.regex,
method=self.method
)
class Delta(TransformerMixin,BaseEstimator):
def fit(self, X, y=None, **fit_params):
return self
def transform(self, df):
df_last = df.ffill().dropna(how='any')
df_last = utils.add_same_val_index_level(df_last,'last','temp',axis=1)
df_next = df.shift(-1).dropna(how='any')
df_next = utils.add_same_val_index_level(df_next,'next','temp',axis=1)
df_all = df_last.join(df_next,how='inner')
return df_all.loc[:,'next'] - df_all.loc[:,'last']
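# Behaviour sketch: each row's forward-filled current value ('last') is paired
# with the following row's value ('next'); the result is next - last, the
# per-step change, and rows without a successor drop out of the inner join:
#
#   >>> step_delta = Delta().fit_transform(df_vitals)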
class ToGroupby(TransformerMixin,BaseEstimator):
def __init__(self, by=None, axis=0, level=None, as_index=True):
self.by=by
self.axis=axis
self.level=level
self.as_index = as_index
def fit(self, X, y=None, **fit_params):
return self
    def transform(self, df):
        # Note: returns a pandas GroupBy object (not a DataFrame) for downstream aggregation steps
        return df.groupby(by=self.by, axis=self.axis, level=self.level, as_index=self.as_index)