from sklearn.feature_selection import VarianceThreshold
import pandas as pd
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.impute import KNNImputer
import warnings
warnings.filterwarnings("ignore", category=FutureWarning, message=".*interpolate with object dtype is deprecated.*")
warnings.filterwarnings("ignore", category=FutureWarning, message=".*fillna with 'method' is deprecated.*")
pd.set_option('future.no_silent_downcasting', True)
class Preprocess:
    """Encode and filter the columns of a feature table.

    The last column of the frame is treated specially throughout: it is never
    considered for removal and is always carried over to the result
    (presumably it is the target/label column -- confirm against callers).

    Attributes:
        dataframe: The working frame; replaced by the processed frame when
            :meth:`apply` runs.
        missing_value_per: A feature column is kept only if its fraction of
            missing values is strictly below this value.
        variance_threshold: Threshold handed to scikit-learn's
            ``VarianceThreshold``; features whose variance does not exceed it
            are removed.
        min_null_per: Despite the name, the minimum fraction of *non-null*
            cells a row needs in order to survive ``_remove_sparse_row``.
    """

    def __init__(self, dataframe, missing_value_per, variance_threshold, min_null_per):
        self.dataframe = dataframe
        self.missing_value_per = missing_value_per
        self.variance_threshold = variance_threshold
        self.min_null_per = min_null_per

    def describe(self):
        """Return summary statistics for every column of the working frame."""
        return self.dataframe.describe(include='all')

    def _apply_mapping(self, df, column, mapping):
        """Translate the values of ``column`` through ``mapping``, in place.

        Values without an entry in ``mapping`` keep their original value.
        A frame that has no such column is returned untouched.
        """
        if column in df.columns:
            df[column] = df[column].map(mapping).fillna(df[column])
        return df

    def _mapping(self, df):
        """Return a copy of ``df`` with every known column encoded.

        The lookup tables come from the sibling ``mapping`` module.  Columns
        that are absent from ``df`` are skipped.
        """
        from .mapping import (
            age_mapping,
            age_binary_mapping,
            gender_mapping,
            hypertension_mapping,
            other_conditions_mapping,
            albumin_median_mapping,
            albumin_mean_mapping,
            albumin_min_mapping,
            albumin_diff_mapping,
            be_arterial_min_mapping,
            bicarb_venous_min_mapping,
            calcium_max_mapping,
            lymphocytes_median_mapping,
            neutrophils_mean_mapping,
            pcr_diff_mapping,
            platelets_mean_mapping,
            potassium_median,
            satO2_median_mapping,
            sodium_mapping,
            dimer_mapping,
            resp_mapping,
            target_mapping,
            data_tags_mapping,
            observation_mapping
        )

        column_mappings = (
            ('age more than 65', age_binary_mapping),
            ('age', age_mapping),
            ('gender', gender_mapping),
            ('hypertension', hypertension_mapping),
            ('other conditions', other_conditions_mapping),
            ('albumin# median', albumin_median_mapping),
            ('albumin mean', albumin_mean_mapping),
            ('albumin min', albumin_min_mapping),
            ('albumin diff', albumin_diff_mapping),
            ('be arterial min', be_arterial_min_mapping),
            ('bicarb venous min', bicarb_venous_min_mapping),
            ('calcium max', calcium_max_mapping),
            ('lymphocytes median', lymphocytes_median_mapping),
            ('neutrophils mean', neutrophils_mean_mapping),
            ('pcr diff', pcr_diff_mapping),
            ('#platelets mean', platelets_mean_mapping),
            ('potassium median', potassium_median),
            ('satO2 arterial median', satO2_median_mapping),
            ('sodium diff', sodium_mapping),
            ('d dimer mean', dimer_mapping),
            ('resp rate min', resp_mapping),
            ('target label / yes no', target_mapping),
            ('data tags', data_tags_mapping),
            ('observation window', observation_mapping),
        )

        # Work on a copy: ``_apply_mapping`` assigns columns in place and the
        # frame handed to the constructor belongs to the caller.
        df = df.copy()
        for column, mapping in column_mappings:
            self._apply_mapping(df, column, mapping)
        return df

    @staticmethod
    def _keep_features_and_last(df, feature_positions):
        """Return a copy of ``df`` holding the given feature columns plus the last column."""
        positions = [*feature_positions, df.shape[1] - 1]
        # Positional selection stays correct with duplicate column labels, and
        # the explicit copy keeps later assignments away from ``df``.
        return df.iloc[:, positions].copy()

    def _remove_missing_values(self, df):
        """Drop feature columns whose missing fraction is not below ``missing_value_per``.

        The last column is excluded from the check and always kept.
        """
        if df.shape[1] == 0:
            return df.copy()
        missing_fraction = df.iloc[:, :-1].isnull().mean()
        keep = np.flatnonzero((missing_fraction < self.missing_value_per).to_numpy())
        return self._keep_features_and_last(df, keep)

    def _remove_by_variance(self, df):
        """Drop feature columns rejected by ``VarianceThreshold``.

        The last column is excluded from the check and always kept.
        """
        if df.shape[1] <= 1:
            # No feature columns to assess; fitting the selector on zero
            # features would only raise.
            return df.copy()
        selector = VarianceThreshold(threshold=self.variance_threshold)
        selector.fit(df.iloc[:, :-1])
        return self._keep_features_and_last(df, selector.get_support(indices=True))

    def _remove_sparse_row(self, df):
        """Keep the rows whose non-null cell count is at least ``min_null_per * n_columns``."""
        non_null_counts = df.notna().sum(axis=1)
        required = self.min_null_per * df.shape[1]
        return df[non_null_counts >= required]

    def apply(self):
        """Run the pipeline and store the result in ``self.dataframe``.

        Steps, in order: value mapping, missing-value column filter, variance
        column filter.  ``_remove_sparse_row`` is not part of the pipeline.
        """
        self.dataframe = self._mapping(self.dataframe)
        self.dataframe = self._remove_missing_values(self.dataframe)
        self.dataframe = self._remove_by_variance(self.dataframe)
class MissingValue:
    """Fill missing values, choosing the imputation method separately per column.

    On construction a random fraction of the cells is hidden (set to NaN) in a
    copy of the data.  :meth:`fill_dataframe` imputes that masked copy with
    each candidate method, measures how well every method recovers the hidden
    values that were actually known, and then fills the gaps of the original
    data with the best method for each column.

    Attributes:
        original_df: The frame passed in; never modified.
        modified_df: Copy of ``original_df`` with the hidden cells set to NaN.
    """

    # Candidate methods in tie-breaking order: on equal error the earlier wins.
    _METHODS = ("knn", "avg", "interpolation", "mode")

    def __init__(self, original_df: pd.DataFrame, test_size: float = 0.1, random_state=None):
        """Hide ``test_size`` of all cells in a copy of ``original_df``.

        Args:
            original_df: Data to impute.
            test_size: Fraction (0..1) of all cells to hide for scoring.
            random_state: Optional seed for choosing the hidden cells.  When
                ``None`` the global NumPy random state is used, so seeding
                with ``np.random.seed`` beforehand keeps working.

        Raises:
            ValueError: If ``test_size`` is outside ``[0, 1]``.
        """
        if not 0 <= test_size <= 1:
            raise ValueError(f"test_size must be between 0 and 1, got {test_size!r}")

        self.original_df = original_df
        n_rows, n_cols = original_df.shape
        n_cells = n_rows * n_cols
        n_hidden = int(test_size * n_cells)

        if n_hidden == 0:
            self.modified_df = original_df.copy()
            return

        rng = np.random if random_state is None else np.random.RandomState(random_state)
        hidden_cells = rng.choice(n_cells, size=n_hidden, replace=False)
        hidden = np.zeros(n_cells, dtype=bool)
        hidden[hidden_cells] = True
        # One vectorised mask instead of a per-cell ``.iat`` write: integer
        # columns are upcast so they can hold NaN, rather than receiving an
        # incompatible value.  Flat cell ids map to (row, col) in row-major order.
        self.modified_df = original_df.mask(hidden.reshape(n_rows, n_cols))

    def fill_dataframe(self):
        """Return a copy of ``original_df`` with its missing values filled.

        For every column that has gaps, each candidate method is scored by its
        mean absolute error on the cells hidden in ``modified_df`` whose true
        value is known, and the best method is applied to the original column.
        Observed values are therefore never replaced; columns without gaps are
        returned unchanged.  If a column has no scorable cell, the first
        method in ``_METHODS`` is used.
        """
        original = self.original_df
        filled_df = original.copy()
        has_gaps = original.isna().any()
        if not has_gaps.any():
            return filled_df

        masked = self.modified_df
        # KNN is run on the whole frame: on a single column there is no other
        # feature to measure neighbour distance on.
        knn_masked = self._knn(masked)
        knn_original = None  # computed only if some column selects KNN

        for position, column in enumerate(original.columns):
            if not has_gaps.iloc[position]:
                continue

            truth = original[column].astype(float).to_numpy()
            # Cells hidden by the constructor for which the true value exists.
            held_out = masked[column].isna().to_numpy() & ~np.isnan(truth)

            scores = {}
            for method in self._METHODS:
                estimate = self._impute_column(masked, column, method, knn_masked)
                scores[method] = self._holdout_mae(
                    truth[held_out],
                    estimate.astype(float).to_numpy()[held_out],
                )
            best = min(scores, key=scores.get)

            if best == "knn" and knn_original is None:
                knn_original = self._knn(original)
            filled_df[column] = self._impute_column(original, column, best, knn_original)

        return filled_df

    def _impute_column(self, frame, column, method, knn_frame):
        """Return ``frame[column]`` imputed with ``method``.

        ``knn_frame`` is the KNN-imputed version of ``frame``; it is only read
        when ``method`` is ``"knn"``.
        """
        if method == "knn":
            return knn_frame[column]
        single = frame[[column]]
        if method == "avg":
            return self._avg(single)[column]
        if method == "interpolation":
            return self._interpolation(single.astype(float))[column]
        return self._mode(single)[column]

    @staticmethod
    def _holdout_mae(truth, estimate):
        """Mean absolute error; ``inf`` when there is nothing to score or the result is NaN."""
        if truth.size == 0:
            return float("inf")
        error = float(np.mean(np.abs(truth - estimate)))
        return error if np.isfinite(error) else float("inf")

    def _knn(self, df: pd.DataFrame, neighbor: int = 3):
        """Impute ``df`` with ``KNNImputer`` using ``neighbor`` neighbours."""
        knn_imputer = KNNImputer(n_neighbors=neighbor)
        return pd.DataFrame(
            knn_imputer.fit_transform(df),
            columns=df.columns,
            index=df.index,
        )

    def _avg(self, df: pd.DataFrame):
        """Fill the gaps of every column with that column's mean."""
        return df.fillna(df.mean())

    def _mode(self, df: pd.DataFrame):
        """Fill the gaps of every column with that column's first mode."""
        modes = df.mode()
        if modes.empty:
            # No observed value at all, hence no mode to fill with.
            return df.copy()
        return df.fillna(modes.iloc[0])

    def _interpolation(self, df, method='linear', limit_direction='both'):
        """Interpolate the gaps, then back-/forward-fill whatever is left."""
        df_filled = df.interpolate(method=method, limit_direction=limit_direction)
        # ``fillna(method=...)`` is deprecated; these are the direct replacements.
        return df_filled.bfill().ffill()