[59083a]: / utils / preprocess.py

Download this file

190 lines (148 with data), 7.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
from sklearn.feature_selection import VarianceThreshold
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.impute import KNNImputer
import warnings
# Suppress two specific pandas FutureWarnings, matched by message text:
# interpolating object-dtype data, and calling fillna with the `method` argument.
warnings.filterwarnings("ignore", category=FutureWarning, message=".*interpolate with object dtype is deprecated.*")
warnings.filterwarnings("ignore", category=FutureWarning, message=".*fillna with 'method' is deprecated.*")
# Process-wide pandas setting: enable the 'future.no_silent_downcasting' option.
pd.set_option('future.no_silent_downcasting', True)
class Preprocess:
    """Clean a tabular dataset whose LAST column is the target.

    The pipeline run by :meth:`apply` is:

    1. replace categorical labels with the values from ``.mapping``,
    2. drop feature columns with too many missing values,
    3. drop feature columns whose variance is not above a threshold.

    The trailing (target) column is never considered for removal.

    Args:
        dataframe: Input data; the last column is treated as the target.
        missing_value_per: A feature column is kept only if its fraction of
            missing values is strictly below this number.
        variance_threshold: Passed to ``VarianceThreshold(threshold=...)``.
        min_null_per: Used by :meth:`_remove_sparse_row`. Despite the name,
            it is the minimum fraction of NON-null cells a row must have
            to be kept.
    """

    def __init__(self, dataframe, missing_value_per, variance_threshold, min_null_per):
        self.dataframe = dataframe
        self.missing_value_per = missing_value_per
        self.variance_threshold = variance_threshold
        self.min_null_per = min_null_per

    def describe(self):
        """Return ``DataFrame.describe(include='all')`` for the current data."""
        return self.dataframe.describe(include='all')

    def _apply_mapping(self, df, column, mapping):
        """Map the values of ``column`` through ``mapping``, in place.

        Values without an entry in ``mapping`` keep their original value.
        If ``column`` is not present, ``df`` is returned untouched.
        """
        if column in df.columns:
            # map() yields NaN for unmapped values; restore the originals.
            df[column] = df[column].map(mapping).fillna(df[column])
        return df

    def _mapping(self, df):
        """Apply every known column mapping to ``df`` (mutated in place)."""
        from .mapping import (
            age_mapping,
            age_binary_mapping,
            gender_mapping,
            hypertension_mapping,
            other_conditions_mapping,
            albumin_median_mapping,
            albumin_mean_mapping,
            albumin_min_mapping,
            albumin_diff_mapping,
            be_arterial_min_mapping,
            bicarb_venous_min_mapping,
            calcium_max_mapping,
            lymphocytes_median_mapping,
            neutrophils_mean_mapping,
            pcr_diff_mapping,
            platelets_mean_mapping,
            potassium_median,
            satO2_median_mapping,
            sodium_mapping,
            dimer_mapping,
            resp_mapping,
            target_mapping,
            data_tags_mapping,
            observation_mapping
        )

        # (column label, mapping) pairs, applied in this order.
        # NOTE(review): labels are reproduced verbatim, including the '#'
        # characters -- confirm they match the dataset headers.
        column_mappings = (
            ('age more than 65', age_binary_mapping),
            ('age', age_mapping),
            ('gender', gender_mapping),
            ('hypertension', hypertension_mapping),
            ('other conditions', other_conditions_mapping),
            ('albumin# median', albumin_median_mapping),
            ('albumin mean', albumin_mean_mapping),
            ('albumin min', albumin_min_mapping),
            ('albumin diff', albumin_diff_mapping),
            ('be arterial min', be_arterial_min_mapping),
            ('bicarb venous min', bicarb_venous_min_mapping),
            ('calcium max', calcium_max_mapping),
            ('lymphocytes median', lymphocytes_median_mapping),
            ('neutrophils mean', neutrophils_mean_mapping),
            ('pcr diff', pcr_diff_mapping),
            ('#platelets mean', platelets_mean_mapping),
            ('potassium median', potassium_median),
            ('satO2 arterial median', satO2_median_mapping),
            ('sodium diff', sodium_mapping),
            ('d dimer mean', dimer_mapping),
            ('resp rate min', resp_mapping),
            ('target label / yes no', target_mapping),
            ('data tags', data_tags_mapping),
            ('observation window', observation_mapping),
        )
        for column, mapping in column_mappings:
            self._apply_mapping(df, column, mapping)
        return df

    def _remove_missing_values(self, df):
        """Drop feature columns whose missing fraction is not below the limit.

        Returns a new frame (an independent copy) holding the surviving
        feature columns followed by the target column; ``df`` is not modified.
        """
        features = df.iloc[:, :-1]
        missing_fraction = features.isnull().sum() / len(df)
        keep = (missing_fraction < self.missing_value_per).to_numpy()
        # Select by position and always keep the trailing target column.
        # A single selection avoids assigning into a slice of ``df``.
        keep = np.append(keep, True)
        return df.loc[:, keep].copy()

    def _remove_by_variance(self, df):
        """Drop feature columns rejected by ``VarianceThreshold``.

        Returns a new frame (an independent copy) holding the surviving
        feature columns followed by the target column; ``df`` is not modified.
        """
        features = df.iloc[:, :-1]
        selector = VarianceThreshold(threshold=self.variance_threshold)
        selector.fit(features)
        # Boolean support mask over the features, plus the target column.
        keep = np.append(selector.get_support(), True)
        return df.loc[:, keep].copy()

    def _remove_sparse_row(self, df):
        """Keep rows whose non-null cell count is at least
        ``min_null_per * number_of_columns``."""
        non_null_counts = df.notna().sum(axis=1)
        required = self.min_null_per * df.shape[1]
        return df[non_null_counts >= required]

    def apply(self):
        """Run mapping, missing-value filtering and variance filtering.

        The result is stored on ``self.dataframe`` and also returned.
        """
        self.dataframe = self._mapping(self.dataframe)
        self.dataframe = self._remove_missing_values(self.dataframe)
        self.dataframe = self._remove_by_variance(self.dataframe)
        # NOTE: sparse-row removal (_remove_sparse_row) is intentionally not
        # part of the pipeline; call it explicitly if needed.
        return self.dataframe
class MissingValue:
    """Choose, per column, the imputation strategy that best recovers hidden cells.

    On construction a random fraction of the cells of ``original_df`` is
    replaced by NaN in a copy (``modified_df``). :meth:`fill_dataframe` then
    imputes that copy with several strategies, scores each strategy against
    the true values of the hidden cells, and fills the real gaps of
    ``original_df`` with the best one.

    Args:
        original_df: Numeric data to impute. It is never modified.
        test_size: Fraction of all cells to hide for evaluation.
        random_state: Optional seed for a reproducible choice of hidden
            cells. When ``None`` (default) the global NumPy random state is
            used, as before.
    """

    def __init__(self, original_df: pd.DataFrame, test_size: float = 0.1, random_state=None):
        self.original_df = original_df
        rng = np.random if random_state is None else np.random.RandomState(random_state)

        n_cells = original_df.size
        n_samples = int(test_size * n_cells)
        picked = rng.choice(n_cells, size=n_samples, replace=False)

        # Flat (row-major) cell indices -> boolean mask of hidden cells.
        hidden = np.zeros(original_df.shape, dtype=bool)
        hidden.flat[picked] = True

        # mask() builds a new frame and upcasts integer columns as needed,
        # avoiding cell-by-cell NaN assignment into integer columns.
        self.modified_df = original_df.mask(hidden)

    def fill_dataframe(self):
        """Return a copy of ``original_df`` with its missing values imputed.

        For each column that has missing values, every strategy (KNN, mean,
        interpolation, mode) is run on the masked column and scored by mean
        absolute error on the cells that were hidden in ``__init__`` and
        whose true value is known. The strategy with the lowest error (ties
        resolved in the order knn, avg, interpolation, mode) is then applied
        to the original column. Known values are never overwritten.

        Columns without missing values, or with no observed value at all,
        are returned unchanged.
        """
        filled_df = self.original_df.copy()
        strategies = {
            "knn": self._knn,
            "avg": self._avg,
            "interpolation": self._interpolation,
            "mode": self._mode,
        }

        for column in self.original_df.columns:
            truth = self.original_df[column]
            n_missing = int(truth.isna().sum())
            if n_missing == 0 or n_missing == len(truth):
                # Nothing to fill, or nothing to learn from.
                continue

            masked = self.modified_df[[column]].astype(float)
            # Cells hidden for evaluation whose real value is known.
            held_out = masked[column].isna().to_numpy() & truth.notna().to_numpy()

            best = "knn"  # fallback when no cell can be scored
            if held_out.any() and masked[column].notna().any():
                expected = truth.to_numpy(dtype=float)[held_out]
                errors = {}
                for name, impute in strategies.items():
                    predicted = impute(masked)[column].to_numpy(dtype=float)[held_out]
                    errors[name] = mean_absolute_error(expected, predicted)
                best = min(errors, key=errors.get)

            source = self.original_df[[column]].astype(float)
            filled_df[column] = strategies[best](source)[column]
        return filled_df

    def _knn(self, df: pd.DataFrame, neighbor: int = 3):
        """Impute ``df`` with ``KNNImputer(n_neighbors=neighbor)``.

        NOTE(review): callers in this class pass a single-column frame, so
        no other feature is available to measure neighbour distance --
        confirm this is intended.
        """
        knn_imputer = KNNImputer(n_neighbors=neighbor)
        return pd.DataFrame(
            knn_imputer.fit_transform(df), columns=df.columns, index=df.index
        )

    def _avg(self, df: pd.DataFrame):
        """Fill missing values with each column's mean."""
        return df.fillna(df.mean())

    def _mode(self, df: pd.DataFrame):
        """Fill missing values with each column's (first) mode."""
        modes = df.mode()
        if modes.empty:
            # No observed value at all: there is no mode to fill with.
            return df.copy()
        return df.fillna(modes.iloc[0])

    def _interpolation(self, df, method='linear', limit_direction='both'):
        """Interpolate missing values, then back/forward fill any leftovers."""
        df_filled = df.interpolate(method=method, limit_direction=limit_direction)
        # bfill()/ffill() replace the deprecated fillna(method=...) form.
        return df_filled.bfill().ffill()