|
a |
|
b/features.py |
|
|
1 |
from sklearn.base import TransformerMixin,BaseEstimator,clone |
|
|
2 |
from sklearn.pipeline import Pipeline,FeatureUnion |
|
|
3 |
from sklearn.preprocessing import FunctionTransformer |
|
|
4 |
from constants import column_names,SEG_ID,NO_SEGMENT,ALL,CUSTOM_FILTER,FEATURE_LEVEL |
|
|
5 |
import numpy as np |
|
|
6 |
import utils |
|
|
7 |
import transformers |
|
|
8 |
import logger |
|
|
9 |
import pandas as pd |
|
|
10 |
|
|
|
11 |
|
|
|
12 |
""" |
|
|
13 |
Feature Creation |
|
|
14 |
""" |
|
|
15 |
|
|
|
16 |
class Featurizer(TransformerMixin,BaseEstimator):
    """Chain column filtering, pre-processing, resample/aggregation and clean-up.

    The steps are assembled into a scikit-learn ``Pipeline`` each time the
    estimator is fitted (see ``_make_pipeline``).

    Parameters
    ----------
    agg_func
        Aggregation handed to ``ResampleAggregator``.
    resample_freq
        Resampling rule handed to ``ResampleAggregator``.
    col_filter, pre_processor, post_processor, fillna_transformer
        Transformers used as the correspondingly named pipeline steps.
    dropna : bool
        When true, a ``transformers.DropNaN(how='all')`` step runs after the
        post-processor; otherwise a do-nothing step takes its place.
    """

    # NOTE(review): the default transformer arguments are evaluated once, when
    # the class body is executed, so every instance relying on the defaults
    # shares the same objects. Presumably they are stateless -- confirm.
    def __init__(self,agg_func,resample_freq,
                    col_filter=transformers.do_nothing(),
                    pre_processor=transformers.do_nothing(),
                    post_processor=transformers.do_nothing(),
                    fillna_transformer=transformers.do_nothing(),
                    dropna=True
                ):
        self.col_filter = col_filter
        self.agg_func = agg_func
        self.resample_freq = resample_freq
        self.pre_processor = pre_processor
        self.post_processor = post_processor
        self.fillna_transformer = fillna_transformer
        self.dropna = dropna

    def _make_pipeline(self):
        """Return a new, unfitted ``Pipeline`` built from the current parameters."""
        if self.dropna:
            dropna_transformer = transformers.DropNaN(how='all')
        else:
            dropna_transformer = transformers.do_nothing()

        # Group on the ID index level and resample on the DATETIME index level.
        aggregator = ResampleAggregator(
            self.agg_func,
            column_names.ID,
            column_names.DATETIME,
            self.resample_freq,
        )

        return Pipeline([
            ('col_filter',self.col_filter),
            ('pre_processor',self.pre_processor),
            ('aggregator',aggregator),
            ('post_processor',self.post_processor),
            ('drop_na_rows',dropna_transformer),
            ('fill_na',self.fillna_transformer)
        ])

    def fit(self, X, y=None, **fit_params):
        """Build the internal pipeline, fit it on ``X`` and return ``self``.

        Returning ``self`` (rather than the inner pipeline) follows the
        scikit-learn estimator contract, so chained calls such as
        ``featurizer.fit(X).transform(X)`` go through this object.
        """
        self.pipeline = self._make_pipeline()
        self.pipeline.fit(X, y, **fit_params)
        return self

    def transform(self, X):
        """Apply the pipeline created by the last ``fit``/``fit_transform`` to ``X``."""
        return self.pipeline.transform(X)

    def fit_transform(self,X, y=None, **fit_params):
        """Build the internal pipeline, then fit and transform ``X`` in one pass."""
        self.pipeline = self._make_pipeline()
        return self.pipeline.fit_transform(X, y, **fit_params)
|
|
56 |
|
|
|
57 |
class DataSpecsFeaturizer(Featurizer):
    """``Featurizer`` whose column filter is a ``transformers.DataSpecFilter``.

    ``data_specs`` is stored on the instance and also used to construct the
    filter; all other arguments are passed through to ``Featurizer``.
    """

    # NOTE(review): ``data_specs=[]`` is a mutable default shared between
    # calls; it is harmless only as long as nothing mutates it -- confirm.
    def __init__(self,agg_func,resample_freq,
                    data_specs=[],
                    pre_processor=transformers.do_nothing(),
                    post_processor=transformers.do_nothing(),
                    fillna_transformer=transformers.do_nothing(),
                    dropna=True
                ):
        self.data_specs = data_specs
        spec_filter = transformers.DataSpecFilter(data_specs)
        base_kwargs = dict(
            col_filter=spec_filter,
            pre_processor=pre_processor,
            post_processor=post_processor,
            fillna_transformer=fillna_transformer,
            dropna=dropna,
        )
        super(DataSpecsFeaturizer,self).__init__(agg_func,resample_freq,**base_kwargs)
|
|
73 |
|
|
|
74 |
class ResampleAggregator(TransformerMixin,BaseEstimator):
    """Optionally group by an index level, optionally resample, then aggregate.

    Parameters
    ----------
    agg_func
        Passed to ``.agg`` on the (grouped and/or resampled) object.
    groupby_level
        Index level for ``X.groupby(level=...)``; ``None`` skips grouping.
    resample_level
        Index level for ``.resample(level=...)``; ``None`` skips resampling.
    resample_freq
        Rule passed to ``.resample(rule=...)``.
    """

    def __init__(self,agg_func,groupby_level=None,resample_level=None,resample_freq=None):
        self.agg_func=agg_func
        self.groupby_level=groupby_level
        self.resample_level=resample_level
        self.resample_freq=resample_freq

    def fit(self, X, y=None, **fit_params):
        """Nothing is learned; return ``self``."""
        return self

    def transform(self, X):
        """Return ``X`` aggregated with ``agg_func`` after optional group/resample."""
        grouped = X if self.groupby_level is None else X.groupby(level=self.groupby_level)

        if self.resample_level is None:
            return grouped.agg(self.agg_func)

        # Bins are labelled with their right edge.
        resampled = grouped.resample(
            rule=self.resample_freq,
            level=self.resample_level,
            label='right',
        )
        return resampled.agg(self.agg_func)
|
|
96 |
|
|
|
97 |
class FeatureUnionDF(TransformerMixin,BaseEstimator):
    """Run several named transformers on the same input and outer-join the results.

    Parameters
    ----------
    featurizers
        Sequence of ``(name, transformer)`` pairs.
    add_name_level : bool
        When true, each transformer's output gets an extra column index level
        (named ``FEATURE_LEVEL``) holding that transformer's name before joining.
    """

    def __init__(self,featurizers,add_name_level=True):
        self.featurizers = featurizers
        self.add_name_level = add_name_level

    def fit(self, X, y=None, **fit_params):
        """Fit every child transformer on ``X`` and return ``self``."""
        for f in self.featurizers:
            # NOTE(review): ``y`` is not forwarded to the children (kept as in
            # the original code) -- confirm whether that is intended.
            f[1].fit(X, y=None, **fit_params)
        return self

    def transform(self, X):
        """Transform ``X`` with every (already fitted) child and join the outputs."""
        # Fixed: the method is bound, so ``self`` must not be passed again;
        # doing so shifted ``X`` into the ``is_fit`` slot.
        return self.do_union(X, False)

    def fit_transform(self,X, y=None, **fit_params):
        """Fit-transform ``X`` with every child and join the outputs."""
        return self.do_union(X, True, y, **fit_params)

    def do_union(self,X, is_fit, y=None, **fit_params):
        """Apply each child to ``X`` and outer-join the resulting frames.

        ``is_fit`` selects ``fit_transform`` over ``transform`` on the
        children. Returns ``None`` when ``featurizers`` is empty.
        """
        logger.log('Begin union for {} transformers'.format(len(self.featurizers)),new_level=True)
        df_features = None

        for f in self.featurizers:
            name, transformer = f[0], f[1]
            logger.log(name,new_level=True)

            # NOTE(review): ``y`` and ``fit_params`` are accepted but not
            # passed on to the children (kept as in the original code).
            if is_fit:
                df_ft = transformer.fit_transform(X)
            else:
                df_ft = transformer.transform(X)

            if self.add_name_level:
                df_ft = utils.add_same_val_index_level(df_ft,level_val=name,level_name=FEATURE_LEVEL,axis=1)

            if df_features is None:
                df_features = df_ft
            else:
                df_features = df_features.join(df_ft,how='outer')

            # Drop the local reference before the next transformer runs.
            del df_ft

            logger.end_log_level()

        logger.end_log_level()
        return df_features
|
|
132 |
|
|
|
133 |
|
|
|
134 |
class DataSetFactory(TransformerMixin,BaseEstimator):
    """Load component data for a set of ids and turn it into one feature frame.

    The work is done by a ``Pipeline`` with these steps:

    1. ``pre_processors`` -- per component, a ``ComponentDataLoader`` followed
       by a clone of ``pre_processor``; outputs are joined without a name level.
    2. ``feature_union`` -- every featurizer, re-created with this factory's
       ``resample_freq`` and ``dropna=False``.
    3. ``post_processor``.
    4. ``fillna`` (only when ``should_fillna`` is true) -- ``LocAndFillNaN``
       over the original featurizers.

    Parameters
    ----------
    featurizers
        Sequence of ``(name, featurizer)`` pairs.
    resample_freq
        Resampling rule applied to every adjusted featurizer.
    components
        Iterable of component identifiers passed to ``ComponentDataLoader``.
    etl_manager
        Object passed to ``ComponentDataLoader`` for reading data.
    pre_processor, post_processor
        Transformers applied before and after the feature union.
    should_fillna : bool
        Whether to append the ``LocAndFillNaN`` step.
    """

    def __init__(self,
                    featurizers,
                    resample_freq,
                    components,
                    etl_manager,
                    pre_processor=transformers.do_nothing(),
                    post_processor=transformers.do_nothing(),
                    should_fillna=True):
        self.featurizers = featurizers
        self.resample_freq = resample_freq
        self.components = components
        self.etl_manager=etl_manager
        self.pre_processor = pre_processor
        self.post_processor = post_processor
        self.should_fillna=should_fillna

    def fit(self,X,y=None, **fit_params):
        """Fit by running the full fit-transform, discard the output, return ``self``."""
        self.fit_transform(X, y, **fit_params)
        return self

    def transform(self, X):
        """Build the feature set for ``X`` using the pipeline fitted earlier."""
        # Fixed: ``self`` was passed a second time, shifting every argument.
        return self.make_feature_set(X, False)

    def fit_transform(self,X, y=None, **fit_params):
        """Fit the internal pipeline on ``X`` and return the resulting feature set."""
        return self.make_feature_set(X, True, y, **fit_params)

    def make_feature_set(self, ids, fit, y=None, **fit_params):
        """Run the feature pipeline over ``ids``.

        When ``fit`` is true a new pipeline is built, fitted and stored on the
        instance; otherwise the stored pipeline is reused, so the transformers
        fitted earlier are the ones applied.

        Raises
        ------
        AttributeError
            If called with ``fit=False`` before any fit.
        """
        if not fit and getattr(self, 'feature_pipeline_', None) is None:
            raise AttributeError(
                "DataSetFactory is not fitted yet; call fit or fit_transform first."
            )

        # Fixed: the format string had no placeholder for the components.
        logger.log("Make Feature Set. id_count={}, #features={}, components={}".format(len(ids),len(self.featurizers),self.components),new_level=True)

        if fit:
            self.comp_preprocessors = [(c,self.preprocessor_pipeline(c)) for c in self.components]
            # Keep the fitted pipeline: rebuilding it for transform would
            # create fresh Featurizer objects that were never fitted.
            self.feature_pipeline_ = self._build_feature_pipeline()
            df = self.feature_pipeline_.fit_transform(ids, y, **fit_params)
        else:
            df = self.feature_pipeline_.transform(ids)

        logger.end_log_level()
        return df

    def _build_feature_pipeline(self):
        """Assemble the unfitted load -> featurize -> post-process (-> fillna) pipeline."""
        adjusted_featurizers = [(ft_name,self.adjust_featurizer(ft)) for ft_name,ft in self.featurizers]

        pipeline_steps = [
            ('pre_processors',FeatureUnionDF(self.comp_preprocessors, add_name_level=False)),
            ('feature_union',FeatureUnionDF(adjusted_featurizers)),
            ('post_processor',self.post_processor),
        ]

        if self.should_fillna:
            pipeline_steps.append(('fillna',LocAndFillNaN(self.featurizers)))

        return Pipeline(pipeline_steps)

    def adjust_featurizer(self,ft):
        """Return a ``Featurizer`` copying ``ft``'s steps but using this factory's frequency.

        Row dropping is disabled and ``ft.fillna_transformer`` is not passed on.
        """
        return Featurizer(ft.agg_func,
                            resample_freq=self.resample_freq,
                            col_filter=ft.col_filter,
                            pre_processor=ft.pre_processor,
                            post_processor=ft.post_processor,
                            dropna=False
                        )

    def preprocessor_pipeline(self,comp):
        """Return a pipeline that loads component ``comp`` and pre-processes it."""
        return Pipeline([
            ('data_loader',ComponentDataLoader(comp, self.etl_manager)),
            ('pre_processor',clone(self.pre_processor))
        ])
|
|
200 |
|
|
|
201 |
class LocAndFillNaN(TransformerMixin,BaseEstimator):
    """Apply each featurizer's ``fillna_transformer`` to its own ``df[name]`` selection.

    ``featurizers`` is a sequence of ``(name, featurizer)`` pairs; ``name`` is
    used as the key into the frame.
    """

    def __init__(self,featurizers):
        self.featurizers = featurizers

    def fit(self, df, y=None, **fit_params):
        """Fit every ``fillna_transformer`` on ``df[name]`` and return ``self``."""
        for name, featurizer in self.featurizers:
            filler = featurizer.fillna_transformer
            filler.fit(df[name],y,**fit_params)
        return self

    def transform(self, df):
        """Return a copy of ``df`` with each ``df[name]`` replaced by its filled version."""
        filled = df.copy()
        for name, featurizer in self.featurizers:
            filler = featurizer.fillna_transformer
            filled[name] = filler.transform(filled[name])
        return filled
|
|
216 |
|
|
|
217 |
class ComponentDataLoader(TransformerMixin,BaseEstimator):
    """Transformer that reads one component's data through ``etl_manager.open_df``."""

    def __init__(self,component,etl_manager):
        self.component = component
        self.etl_manager = etl_manager

    def fit(self, X, y=None, **fit_params):
        """Nothing is learned; return ``self``."""
        return self

    def transform(self, X):
        """Return the component's data for the ids described by ``X``.

        ``X`` may be a DataFrame/Series (its index is used), a pandas Index
        (unique values of the ``column_names.ID`` level are used), or anything
        else, which is passed to ``open_df`` unchanged as ``ids``.
        """
        logger.log('Load data from component: {}'.format(self.component.upper()),new_level=True)

        source = X.index if isinstance(X, (pd.DataFrame, pd.Series)) else X

        if isinstance(source, pd.Index):
            id_values = source.get_level_values(column_names.ID)
            ids = id_values.unique().tolist()
        else:
            ids = source

        df_component = self.etl_manager.open_df(self.component,ids=ids)

        logger.end_log_level()

        return df_component