Diff of /features.py [000000] .. [418e14]

Switch to unified view

a b/features.py
1
from sklearn.base import TransformerMixin,BaseEstimator,clone
2
from sklearn.pipeline import Pipeline,FeatureUnion
3
from sklearn.preprocessing import FunctionTransformer
4
from constants import column_names,SEG_ID,NO_SEGMENT,ALL,CUSTOM_FILTER,FEATURE_LEVEL
5
import numpy as np
6
import utils
7
import transformers
8
import logger
9
import pandas as pd
10
11
12
"""
13
Feature Creation
14
"""
15
16
class Featurizer(TransformerMixin,BaseEstimator):
    """Turn a dataframe into one resampled/aggregated feature block.

    The work is delegated to an internal sklearn ``Pipeline`` whose steps
    run in this order:

    1. ``col_filter``
    2. ``pre_processor``
    3. ``aggregator`` - a ``ResampleAggregator`` that groups on
       ``column_names.ID`` and resamples on ``column_names.DATETIME`` at
       ``resample_freq`` using ``agg_func``
    4. ``post_processor``
    5. ``drop_na_rows`` - ``transformers.DropNaN(how='all')`` when
       ``dropna`` is truthy, otherwise a do-nothing step
    6. ``fill_na`` - ``fillna_transformer``

    Parameters
    ----------
    agg_func : aggregation passed to ``.agg`` by the aggregator step.
    resample_freq : resampling rule handed to the aggregator step.
    col_filter, pre_processor, post_processor, fillna_transformer :
        transformer objects used as pipeline steps.
    dropna : bool, whether to insert the ``DropNaN(how='all')`` step.

    NOTE: the default transformer arguments are evaluated once, when the
    class is defined, so every instance relying on the defaults shares the
    same ``transformers.do_nothing()`` objects.
    """

    def __init__(self,agg_func,resample_freq,
                    col_filter=transformers.do_nothing(),
                    pre_processor=transformers.do_nothing(),
                    post_processor=transformers.do_nothing(),
                    fillna_transformer=transformers.do_nothing(),
                    dropna=True
                    ):
        self.col_filter = col_filter
        self.agg_func = agg_func
        self.resample_freq = resample_freq
        self.pre_processor = pre_processor
        self.post_processor = post_processor
        self.fillna_transformer = fillna_transformer
        self.dropna = dropna

    def _make_pipeline(self):
        """Assemble the internal pipeline from the current parameters."""
        if self.dropna:
            dropna_transformer = transformers.DropNaN(how='all')
        else:
            dropna_transformer = transformers.do_nothing()

        return Pipeline([
            ('col_filter',self.col_filter),
            ('pre_processor',self.pre_processor),
            ('aggregator',ResampleAggregator(self.agg_func,column_names.ID,column_names.DATETIME,self.resample_freq)),
            ('post_processor',self.post_processor),
            ('drop_na_rows',dropna_transformer),
            ('fill_na',self.fillna_transformer)
        ])

    def fit(self, X, y=None, **fit_params):
        """Build and fit the internal pipeline; return this estimator.

        Returning ``self`` (rather than the inner pipeline) follows the
        scikit-learn estimator contract, so ``fit(X).transform(X)`` goes
        through this class.
        """
        self.pipeline = self._make_pipeline()
        self.pipeline.fit(X, y, **fit_params)
        return self

    def transform(self, X):
        """Run ``X`` through the pipeline built by the last fit call."""
        return self.pipeline.transform(X)

    def fit_transform(self,X, y=None, **fit_params):
        """Rebuild the internal pipeline, fit it on ``X`` and return its output."""
        self.pipeline = self._make_pipeline()
        return self.pipeline.fit_transform(X, y, **fit_params)
56
57
class DataSpecsFeaturizer(Featurizer):
    """``Featurizer`` whose column filter is built from ``data_specs``.

    The ``col_filter`` step is always ``transformers.DataSpecFilter(data_specs)``;
    all other arguments are forwarded unchanged to ``Featurizer``.

    Parameters
    ----------
    agg_func, resample_freq, pre_processor, post_processor,
    fillna_transformer, dropna : forwarded to ``Featurizer.__init__``.
    data_specs : list or None
        Specs handed to ``transformers.DataSpecFilter``. ``None`` (the
        default) is treated as an empty list.
    """

    def __init__(self,agg_func,resample_freq,
                    data_specs=None,
                    pre_processor=transformers.do_nothing(),
                    post_processor=transformers.do_nothing(),
                    fillna_transformer=transformers.do_nothing(),
                    dropna=True
                    ):
        # A ``[]`` default would be one list shared by every instance that
        # relies on the default; create a fresh list per instance instead.
        if data_specs is None:
            data_specs = []
        self.data_specs = data_specs
        super(DataSpecsFeaturizer,self).__init__(agg_func,resample_freq,
                                                    col_filter=transformers.DataSpecFilter(data_specs),
                                                    pre_processor=pre_processor,
                                                    post_processor=post_processor,
                                                    fillna_transformer=fillna_transformer,
                                                    dropna=dropna
                                                )
73
74
class ResampleAggregator(TransformerMixin,BaseEstimator):
    """Optionally group, optionally resample, then aggregate the input.

    Parameters
    ----------
    agg_func : passed to ``.agg`` on the (grouped / resampled) object.
    groupby_level : index level to group on, or ``None`` to skip grouping.
    resample_level : index level to resample on, or ``None`` to skip
        resampling.
    resample_freq : resampling rule; only used when ``resample_level`` is
        set.
    """

    def __init__(self,agg_func,groupby_level=None,resample_level=None,resample_freq=None):
        self.agg_func = agg_func
        self.groupby_level = groupby_level
        self.resample_level = resample_level
        self.resample_freq = resample_freq

    def fit(self, X, y=None, **fit_params):
        """Stateless: nothing is learned from ``X``."""
        return self

    def transform(self, X):
        """Return ``X`` aggregated with ``agg_func`` after grouping/resampling."""
        # Step 1: group on the requested level, if any.
        grouped = X if self.groupby_level is None else X.groupby(level=self.groupby_level)

        # Step 2: without a resample level, aggregate the grouped object directly.
        if self.resample_level is None:
            return grouped.agg(self.agg_func)

        # Bins are labelled with their right edge.
        resampled = grouped.resample(rule=self.resample_freq,level=self.resample_level,label='right')
        return resampled.agg(self.agg_func)
96
97
class FeatureUnionDF(TransformerMixin,BaseEstimator):
    """Apply several transformers to the same input and outer-join the results.

    Parameters
    ----------
    featurizers : sequence of ``(name, transformer)`` pairs.
    add_name_level : bool
        When truthy, each transformer's output gets an extra column index
        level (named ``FEATURE_LEVEL``) holding the transformer's name,
        added via ``utils.add_same_val_index_level``.
    """

    def __init__(self,featurizers,add_name_level=True):
        self.featurizers = featurizers
        self.add_name_level = add_name_level

    def fit(self, X, y=None, **fit_params):
        """Fit every transformer on ``X`` and return this estimator."""
        for f in self.featurizers:
            # Forward the caller's y instead of a hard-coded None.
            f[1].fit(X, y, **fit_params)
        return self

    def transform(self, X):
        """Transform ``X`` with every (already fitted) transformer and join."""
        # The bound method already supplies ``self``; passing it again
        # shifted every argument by one position.
        return self.do_union(X, False)

    def fit_transform(self,X, y=None, **fit_params):
        """Fit every transformer on ``X`` and return the joined outputs."""
        return self.do_union(X, True, y, **fit_params)

    def do_union(self,X, is_fit, y=None, **fit_params):
        """Run each transformer on ``X`` and outer-join the resulting frames.

        Parameters
        ----------
        X : input handed unchanged to every transformer.
        is_fit : bool, use ``fit_transform`` when truthy, else ``transform``.
        y, **fit_params : forwarded to ``fit_transform`` when ``is_fit``.

        Returns
        -------
        The joined frame, or ``None`` when ``featurizers`` is empty.
        """
        logger.log('Begin union for {} transformers'.format(len(self.featurizers)),new_level=True)
        df_features = None

        for f in self.featurizers:
            name, featurizer = f[0], f[1]
            logger.log(name,new_level=True)

            if is_fit:
                df_ft = featurizer.fit_transform(X, y, **fit_params)
            else:
                df_ft = featurizer.transform(X)

            if self.add_name_level:
                df_ft = utils.add_same_val_index_level(df_ft,level_val=name,level_name=FEATURE_LEVEL,axis=1)

            if df_features is None:
                df_features = df_ft
            else:
                df_features = df_features.join(df_ft,how='outer')
            # Drop the per-transformer reference before the next iteration.
            del df_ft

            logger.end_log_level()
        logger.end_log_level()
        return df_features
132
133
134
class DataSetFactory(TransformerMixin,BaseEstimator):
    """Build a joined feature dataframe from component data.

    Fitting assembles and fits a pipeline with these steps:

    1. ``pre_processors`` - a ``FeatureUnionDF`` (without name level) of one
       loader + pre-processor pipeline per entry in ``components``
    2. ``feature_union`` - a ``FeatureUnionDF`` of the featurizers, each
       re-created with the factory-wide ``resample_freq``
    3. ``post_processor``
    4. ``fillna`` - ``LocAndFillNaN`` over the original featurizers, only
       when ``should_fillna`` is truthy

    Parameters
    ----------
    featurizers : sequence of ``(name, featurizer)`` pairs.
    resample_freq : resampling rule applied to every featurizer.
    components : iterable of component identifiers to load.
    etl_manager : object handed to ``ComponentDataLoader`` to load data.
    pre_processor : transformer cloned once per component.
    post_processor : transformer applied to the joined features.
    should_fillna : bool, whether to append the ``fillna`` step.
    """

    def __init__(self,
                 featurizers,
                 resample_freq,
                 components,
                 etl_manager,
                 pre_processor=transformers.do_nothing(),
                 post_processor=transformers.do_nothing(),
                 should_fillna=True):
        self.featurizers = featurizers
        self.resample_freq = resample_freq
        self.components = components
        self.etl_manager=etl_manager
        self.pre_processor = pre_processor
        self.post_processor = post_processor
        self.should_fillna=should_fillna

    def fit(self,X,y=None, **fit_params):
        """Fit the feature pipeline on ``X`` and return this estimator."""
        self.fit_transform(X, y, **fit_params)
        return self

    def transform(self, X):
        """Build the feature set for ``X`` using the already fitted pipeline."""
        # The bound method already supplies ``self``; passing it again
        # shifted every argument by one position.
        return self.make_feature_set(X, False)

    def fit_transform(self,X, y=None, **fit_params):
        """Fit the feature pipeline on ``X`` and return the feature set."""
        return self.make_feature_set(X, True, y, **fit_params)

    def make_feature_set(self, ids, fit, y=None, **fit_params):
        """Create the feature dataframe for ``ids``.

        Parameters
        ----------
        ids : sized input handed to the first pipeline step.
        fit : bool
            When truthy, a new pipeline is built, fitted and stored on
            ``self.ft_union_pipeline``; otherwise the stored pipeline is
            reused.
        y, **fit_params : forwarded to the pipeline's ``fit_transform``.

        Raises
        ------
        AttributeError
            If called with ``fit`` falsy before any fit.
        """
        # Check before logging so the log nesting stays balanced.
        if not fit and getattr(self, 'ft_union_pipeline', None) is None:
            raise AttributeError('DataSetFactory must be fitted before transform is called.')

        logger.log("Make Feature Set. id_count={}, #features={}, components={}".format(len(ids),len(self.featurizers),self.components),new_level=True)

        if fit:
            self.ft_union_pipeline = self._build_pipeline()
            df = self.ft_union_pipeline.fit_transform(ids, y, **fit_params)
        else:
            # Reuse the fitted pipeline: rebuilding it here would create
            # fresh, unfitted Featurizer objects.
            df = self.ft_union_pipeline.transform(ids)

        logger.end_log_level()
        return df

    def _build_pipeline(self):
        """Assemble a new, unfitted pipeline from the current parameters."""
        self.comp_preprocessors = [(c,self.preprocessor_pipeline(c)) for c in self.components]
        adjusted_featurizers = [(ft_name,self.adjust_featurizer(ft)) for ft_name,ft in self.featurizers]

        pipeline_steps = [
            ('pre_processors',FeatureUnionDF(self.comp_preprocessors, add_name_level=False)),
            ('feature_union',FeatureUnionDF(adjusted_featurizers)),
            ('post_processor',self.post_processor),
        ]

        if self.should_fillna:
            pipeline_steps.append(('fillna',LocAndFillNaN(self.featurizers)))

        return Pipeline(pipeline_steps)

    def adjust_featurizer(self,ft):
        """Return a new ``Featurizer`` based on ``ft`` for use in the union.

        ``agg_func``, ``col_filter``, ``pre_processor`` and
        ``post_processor`` are taken from ``ft``; ``resample_freq`` comes
        from this factory, row dropping is disabled and the fill-NaN step
        is left at its default.
        """
        return Featurizer(ft.agg_func,
                            resample_freq=self.resample_freq,
                            col_filter=ft.col_filter,
                            pre_processor=ft.pre_processor,
                            post_processor=ft.post_processor,
                            dropna=False
                        )

    def preprocessor_pipeline(self,comp):
        """Return a loader + cloned pre-processor pipeline for one component."""
        return Pipeline([
            ('data_loader',ComponentDataLoader(comp, self.etl_manager)),
            ('pre_processor',clone(self.pre_processor))
        ])
200
201
class LocAndFillNaN(TransformerMixin,BaseEstimator):
    """Apply each featurizer's ``fillna_transformer`` to its own columns.

    Parameters
    ----------
    featurizers : sequence of ``(name, featurizer)`` pairs; ``name`` is
        used as the column key into the dataframe and each featurizer must
        expose a ``fillna_transformer`` attribute.
    """

    def __init__(self,featurizers):
        self.featurizers = featurizers

    def fit(self, df, y=None, **fit_params):
        """Fit every fill transformer on the columns selected by its name."""
        for name, featurizer in self.featurizers:
            filler = featurizer.fillna_transformer
            filler.fit(df[name],y,**fit_params)
        return self

    def transform(self, df):
        """Return a copy of ``df`` with every named block filled."""
        filled = df.copy()  # leave the caller's frame untouched
        for name, featurizer in self.featurizers:
            filler = featurizer.fillna_transformer
            filled[name] = filler.transform(filled[name])
        return filled
216
217
class ComponentDataLoader(TransformerMixin,BaseEstimator):
    """Load one component's dataframe through an ETL manager.

    Parameters
    ----------
    component : str, name of the component to load.
    etl_manager : object providing ``open_df(component, ids=...)``.
    """

    def __init__(self,component,etl_manager):
        self.component = component
        self.etl_manager = etl_manager

    def fit(self, X, y=None, **fit_params):
        """Stateless: nothing is learned from ``X``."""
        return self

    def transform(self, X):
        """Return the component dataframe restricted to the ids found in ``X``.

        ``X`` may be a DataFrame/Series (its index is used), a pandas Index
        (unique values of the ``column_names.ID`` level are used), or any
        other object, which is passed through as the ids unchanged.
        """
        logger.log('Load data from component: {}'.format(self.component.upper()),new_level=True)

        # Reduce frames and series to their index first.
        source = X.index if isinstance(X, (pd.DataFrame, pd.Series)) else X

        if isinstance(source, pd.Index):
            ids = source.get_level_values(column_names.ID).unique().tolist()
        else:
            ids = source

        df_component = self.etl_manager.open_df(self.component,ids=ids)

        logger.end_log_level()

        return df_component