--- a
+++ b/Stats/PreProcess.py
@@ -0,0 +1,670 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+#
+# Copyright 2017 University of Westminster. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""An interface to the pre-processing functions (factoring, near-zero-variance and high-linear-correlation
+filtering) and to the statistical summary functions.
+"""
+
+from typing import Dict, List, TypeVar, Any
+from Configs.CONSTANTS import CONSTANTS
+from ReadersWriters.PyConfigParser import PyConfigParser
+from ReadersWriters.ReadersWriters import ReadersWriters
+from Stats.FactoringThread import FactoringThread
+from Stats.TransformThread import TransformThread
+import os
+import sys
+import numpy as np
+import pandas as pd
+import multiprocessing as mp
+from collections import OrderedDict
+from sklearn import feature_selection
+from scipy.stats import stats
+from functools import partial
+import logging
+
+PandasDataFrame = TypeVar('DataFrame')
+CollectionsOrderedDict = TypeVar('OrderedDict')
+
+__author__ = "Mohsen Mesgarpour"
+__copyright__ = "Copyright 2016, https://github.com/mesgarpour"
+__credits__ = ["Mohsen Mesgarpour"]
+__license__ = "GPL"
+__version__ = "1.1"
+__maintainer__ = "Mohsen Mesgarpour"
+__email__ = "mohsen.mesgarpour@gmail.com"
+__status__ = "Release"
+
+
+class PreProcess:
+    def __init__(self,
+                 output_path: str):
+        """Initialise the objects and constants.
+        :param output_path: the path of the output directory.
+        """
+        self.__logger = logging.getLogger(CONSTANTS.app_name)
+        self.__logger.debug(__name__)
+        self.__output_path = output_path
+        self.__readers_writers = ReadersWriters()
+
+    def stats_discrete_df(self,
+                          df: PandasDataFrame,
+                          includes: List,
+                          file_name: str) -> PandasDataFrame:
+        """Calculate the value frequencies for all the included discrete features and their categorical states.
+        :param df: the features dataframe.
+        :param includes: the name of included features.
+        :param file_name: the name of the summary output file.
+        :return: the summary output.
+        """
+        self.__logger.debug("Produce statistics for discrete features.")
+        summaries = None
+        self.__readers_writers.save_csv(path=self.__output_path,
+                                        title=file_name,
+                                        data=[],
+                                        append=False)
+
+        for f_name in includes:
+            if f_name in df:
+                self.__readers_writers.save_csv(path=self.__output_path,
+                                                title=file_name,
+                                                data=["Feature Name", f_name],
+                                                append=True)
+                summaries = stats.itemfreq(df[f_name])
+                summaries = pd.DataFrame({"value": summaries[:, 0], "freq": summaries[:, 1]})
+                summaries = summaries.sort_values("freq", ascending=False)
+                self.__readers_writers.save_csv(path=self.__output_path,
+                                                title=file_name,
+                                                data=summaries,
+                                                append=True,
+                                                header=True)
+        return summaries
+
+    def stats_continuous_df(self,
+                            df: PandasDataFrame,
+                            includes: List,
+                            file_name: str) -> PandasDataFrame:
+        """Calculate the descriptive statistics for all the included continuous features.
+        :param df: the features dataframe.
+ :param includes: the name of included features. + :param file_name: the name of the summary output file. + :return: the summary output. + """ + self.__logger.debug("Produce statistics for continuous features.") + summaries = None + self.__readers_writers.save_csv(path=self.__output_path, + title=file_name, + data=[], + append=False) + + for f_name in includes: + if f_name in df: + self.__readers_writers.save_csv(path=self.__output_path, + title=file_name, + data=["Feature Name", f_name], + append=True) + summaries = df[f_name].apply(pd.to_numeric).describe( + percentiles=[0.05, 0.25, 0.5, 0.75, 0.95]).transpose() + summaries = pd.Series.to_frame(summaries).transpose() + self.__readers_writers.save_csv(path=self.__output_path, + title=file_name, + data=summaries, + append=True, + header=True) + return summaries + + def factoring_group_wise(self, + df: PandasDataFrame, + categories_dic: Dict, + labels_dic: Dict, + dtypes_dic: Dict, + threaded: bool=False) -> PandasDataFrame: + """Categorise groups of features that are selected. + :param df: the features dataframe. + :param categories_dic: the dictionary of the categorical states for the included features. + :param labels_dic: the dictionary of the features names of the categorised features. + :param dtypes_dic: the dictionary of the dtypes of the categorised features. + :param threaded: indicates if it is multi-threaded. + :return: the inputted dataframe with categorised features (if applicable). + """ + self.__logger.debug("Categorise groups of features.") + categories_dic = OrderedDict(categories_dic) + + if threaded is not True: + pool_df_encoded = self.__factoring_group_wise_series(df, categories_dic, labels_dic) + else: + pool_df_encoded = self.__factoring_group_wise_threaded(df, categories_dic, labels_dic) + + # encoded labels + labels_encoded = [] + for label_group in categories_dic.keys(): + labels_encoded += list(categories_dic[label_group].keys()) + + # preserve types + dtype_orig = {**df.dtypes.to_dict(), **dtypes_dic} + dtype_orig = pd.DataFrame(dtype_orig, index=[0]).dtypes + for label in labels_encoded: + del dtype_orig[label] + + # combine + df = df.drop(labels_encoded, axis=1) + df = pd.concat([df] + pool_df_encoded, axis=1) + df = df.astype(dtype_orig) + return df + + def __factoring_group_wise_series(self, + df: PandasDataFrame, + categories_dic: Dict, + labels_dic: Dict) -> List: + """Categorise a group of features that are selected (single-threaded). + :param df: the features dataframe. + :param categories_dic: the dictionary of the categorical states for the included features. + :param labels_dic: the dictionary of the features names of the categorised features. + :return: the categorised features. + """ + self.__logger.debug("Categorise groups of features (single-threaded).") + factoring_thread = FactoringThread(df, categories_dic, labels_dic) + pool_df_encoded = [] + + try: + for label_group in categories_dic.keys(): + pool_df_encoded.append(factoring_thread.factor_arr_multiple(label_group)) + except ValueError as exception: + self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception)) + sys.exit() + return pool_df_encoded + + def __factoring_group_wise_threaded(self, + df: PandasDataFrame, + categories_dic: Dict, + labels_dic: Dict) -> List: + """Categorise a group of features that are selected (multi-threaded). + :param df: the features dataframe. + :param categories_dic: the dictionary of the categorical states for the included features. 
+ :param labels_dic: the dictionary of the features names of the categorised features. + :return: the categorised features. + """ + self.__logger.debug("Categorise groups of features (multi-threaded).") + factoring_thread = FactoringThread(df, categories_dic, labels_dic) + try: + with mp.Pool(processes=(mp.cpu_count() - 1)) as pool: + pool_df_encoded = pool.map( + partial(factoring_thread.factor_arr_multiple), categories_dic.keys()) + except ValueError as exception: + self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception)) + sys.exit() + return pool_df_encoded + + def factoring_feature_wise(self, + df: PandasDataFrame, + categories_dic: Dict, + labels_dic: Dict, + dtypes_dic: Dict, + threaded: bool=False) -> PandasDataFrame: + """Categorise features that are selected. + :param df: the features dataframe. + :param categories_dic: the dictionary of the categorical states for the included features. + :param labels_dic: the dictionary of the features names of the categorised features. + :param dtypes_dic: the dictionary of the dtypes of the categorised features. + :param threaded: indicates if it is multi-threaded. + :return: the inputted dataframe with categorised features (if applicable). + """ + self.__logger.debug("Categorise.") + categories_dic = OrderedDict(categories_dic) + + if threaded is not True: + pool_df_encoded = self.__factoring_feature_wise_series(df, categories_dic, labels_dic) + else: + pool_df_encoded = self.__factoring_feature_wise_threaded(df, categories_dic, labels_dic) + + # encoded labels + labels_encoded = list(categories_dic.keys()) + + # preserve types + dtype_orig = {**df.dtypes.to_dict(), **dtypes_dic} + dtype_orig = pd.DataFrame(dtype_orig, index=[0]).dtypes + for label in labels_encoded: + del dtype_orig[label] + + # combine + df = df.drop(labels_encoded, axis=1) + df = pd.concat([df] + pool_df_encoded, axis=1) + df = df.astype(dtype_orig) + return df + + def __factoring_feature_wise_series(self, + df: PandasDataFrame, + categories_dic: Dict, + labels_dic: Dict) -> List: + """Categorise features that are selected (single-threaded). + :param df: the features dataframe. + :param categories_dic: the dictionary of the categorical states for the included features. + :param labels_dic: the dictionary of the features names of the categorised features. + :return: the categorised features. + """ + self.__logger.debug("Categorise (single-threaded).") + factoring_thread = FactoringThread(df, categories_dic, labels_dic) + pool_df_encoded = [] + + try: + for label_group in categories_dic.keys(): + pool_df_encoded.append(factoring_thread.factor_arr(label_group)) + except ValueError as exception: + self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception)) + sys.exit() + return pool_df_encoded + + def __factoring_feature_wise_threaded(self, + df: PandasDataFrame, + categories_dic: Dict, + labels_dic: Dict) -> List: + """Categorise features that are selected (multi-threaded). + :param df: the features dataframe. + :param categories_dic: the dictionary of the categorical states for the included features. + :param labels_dic: the dictionary of the features names of the categorised features. + :return: the categorised features. 
+ """ + self.__logger.debug("Categorise (multi-threaded).") + factoring_thread = FactoringThread(df, categories_dic, labels_dic) + try: + with mp.Pool() as pool: + pool_df_encoded = pool.map( + partial(factoring_thread.factor_arr), categories_dic.keys()) + except ValueError as exception: + self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception)) + sys.exit() + return pool_df_encoded + + def transform_df(self, + df: PandasDataFrame, + excludes: List, + transform_type: str, + threaded: bool=False, + method_args: Dict=None, + **kwargs: Any) -> [PandasDataFrame, Dict]: + """Transform the included features, using the selected and configured method. + :param df: the features dataframe. + :param excludes: the name of excluded features. + :param transform_type: the transformation type (options: 'scale', 'robust_scale', 'max_abs_scalar', + 'normalizer', 'kernel_centerer', 'yeo_johnson', 'box_cox') + :param threaded: indicates if it is multi-threaded. + :param method_args: the transformation arguments, which needs to preserved if it is applied to more than + one data set. + :param kwargs: the input argument for the selected transformation function. + :return: the inputted dataframe with transformed features (if applicable). + """ + self.__logger.info("Transform Features.") + excludes = set(excludes) + includes = [label for label in df.columns.values if label not in excludes] + method_args = dict() if method_args is None else method_args + + # preserve types + dtype_orig = df.dtypes.to_dict() + for label in includes: + dtype_orig[label] = 'f8' + dtype_orig = pd.DataFrame(dtype_orig, index=[0]).dtypes + df = df.astype(dtype_orig) + + # transform + if threaded is False: + df, method_args = self.__transform_df_series(df, includes, transform_type, **kwargs) + else: + df, method_args = self.__transform_df_threaded(df, includes, transform_type, method_args, **kwargs) + return df, method_args + + def __transform_df_series(self, + df: PandasDataFrame, + includes: List, + transform_type: str, + method_args: Dict=None, + **kwargs: Any) -> [PandasDataFrame, Dict]: + """Transform the included features, using the selected and configured method (single-threaded). + :param df: the features dataframe. + :param includes: the name of included features. + :param transform_type: the transformation type (options: 'scale', 'robust_scale', 'max_abs_scalar', + 'normalizer', 'kernel_centerer', 'yeo_johnson', 'box_cox') + :param method_args: the transformation arguments, which needs to preserved if it is applied to more than + one data set. + :param kwargs: the input argument for the selected transformation function. + :return: the transformed feature. 
+ """ + self.__logger.debug("Transform features (single-threaded).") + transform_thread = TransformThread(**kwargs) + method_args = dict() if method_args is None else method_args + + try: + if transform_type == "scale": + for name in includes: + transform_thread.transform_scale_arr(df, method_args, name) + elif transform_type == "robust_scale": + for name in includes: + transform_thread.transform_robust_scale_arr(df, method_args, name) + elif transform_type == "max_abs_scalar": + for name in includes: + transform_thread.transform_max_abs_scalar_arr(df, method_args, name) + elif transform_type == "normalizer": + for name in includes: + transform_thread.transform_normalizer_arr(df, method_args, name) + elif transform_type == "kernel_centerer": + for name in includes: + transform_thread.transform_kernel_centerer_arr(df, method_args, name) + elif transform_type == "yeo_johnson": + for name in includes: + transform_thread.transform_yeo_johnson_arr(df, method_args, name) + elif transform_type == "box_cox": + for name in includes: + transform_thread.transform_box_cox_arr(df, method_args, name) + else: + raise Exception(transform_type) + except ValueError as exception: + self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception)) + sys.exit() + + return df, method_args + + def __transform_df_threaded(self, + df: PandasDataFrame, + includes: List, + transform_type: str, + method_args: Dict=None, + **kwargs: Any) -> [PandasDataFrame, Dict]: + """Transform the included features, using the selected and configured method (multi-threaded). + :param df: the features dataframe. + :param includes: the name of included features. + :param transform_type: the transformation arguments, which needs to preserved if it is applied to more than + one data set. + :param method_args: the transformation arguments, which needs to preserved if it is applied to more than + one data set. + :param kwargs: the input argument for the selected transformation function. + :return: the transformed feature. 
+ """ + self.__logger.debug("Transform features (multi-threaded).") + manager = mp.Manager() + dt = manager.dict(list(zip(df[includes].columns, df[includes].T.values.tolist()))) + transform_thread = TransformThread(**kwargs) + method_args = dict() if method_args is None else method_args + + # run + try: + with mp.Pool(processes=(mp.cpu_count() - 1)) as pool: + if transform_type == "scale": + pool.map(partial(transform_thread.transform_scale_arr, dt, method_args), includes) + elif transform_type == "robust_scale": + pool.map(partial(transform_thread.transform_robust_scale_arr, dt, method_args), includes) + elif transform_type == "max_abs_scalar": + pool.map(partial(transform_thread.transform_max_abs_scalar_arr, dt, method_args), includes) + elif transform_type == "normalizer": + pool.map(partial(transform_thread.transform_normalizer_arr, dt, method_args), includes) + elif transform_type == "kernel_centerer": + pool.map(partial(transform_thread.transform_kernel_centerer_arr, dt, method_args), includes) + elif transform_type == "yeo_johnson": + pool.map(partial(transform_thread.transform_yeo_johnson_arr, dt, method_args), includes) + elif transform_type == "box_cox": + pool.map(partial(transform_thread.transform_box_cox_arr, dt, method_args), includes) + else: + raise Exception(transform_type) + except ValueError as exception: + self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception)) + sys.exit() + + # set + for k, v in dt.items(): + df[k] = v + + return df, method_args + + def high_linear_correlation_df(self, + df: PandasDataFrame, + excludes: List, + file_name: str, + thresh_corr_cut: float=0.95, + to_search: bool=True) -> [PandasDataFrame, CollectionsOrderedDict]: + """Find and optionally remove the selected highly linearly correlated features. + The Pearson correlation coefficient was calculated for all the pair of variables to measure linear dependence + between them. + :param df: the features dataframe. + :param excludes: the name of excluded features. + :param file_name: the name of the summary output file. + :param thresh_corr_cut: the numeric value for the pair-wise absolute correlation cutoff. e.g. 0.95. + :param to_search: to search or use the saved configuration. + :return: the inputted dataframe with exclusion of features that were selected to be removed. + """ + self.__logger.debug("Remove features with high linear correlation (if applicable).") + corr = None + df_excludes = df[excludes] + excludes = set(excludes) + matches = [] + summaries = OrderedDict() + + # search + if to_search is True: + corr = df[[col for col in df.columns if col not in excludes]].corr(method='pearson') + for label in corr.columns.values: + matches_temp = list(corr[abs(corr[label]) >= thresh_corr_cut].index) + if len(matches_temp) > 1: + # set matches + try: + matches_temp.remove(label) + except ValueError and AttributeError: + pass # not in some-list! OR not behaving like a list! 
+                    matches = np.union1d(matches, matches_temp)
+
+                    # summaries (iterate over a copy, so that items can be dropped safely)
+                    for match in list(matches_temp):
+                        if match in summaries.keys():
+                            matches_temp.remove(match)
+                    if len(matches_temp) > 0:
+                        summaries[label] = matches_temp
+                        self.__logger.info("High Linear Correlation: " + label + " ~ " + str(matches_temp))
+
+        # delete
+        df = self.__remove(df, summaries, to_search, os.path.join(self.__output_path, file_name + ".ini"))
+        for name in excludes:
+            df[name] = df_excludes[name]
+        if any(np.isnan(df.index)):
+            df = df.reset_index(drop=True)
+
+        # summaries
+        if to_search is True:
+            summaries["Features Matches"] = matches
+            summaries["Correlation Matrix"] = corr
+        return df, summaries
+
+    def near_zero_var_df_sklearn(self,
+                                 df: PandasDataFrame,
+                                 excludes: List,
+                                 file_name: str,
+                                 thresh_variance: float=0.05,
+                                 to_search: bool=True) -> [PandasDataFrame, CollectionsOrderedDict]:
+        """Find and optionally remove the selected near-zero-variance features (Scikit algorithm).
+        Feature selector that removes all low-variance features.
+        This feature selection algorithm looks only at the features (X), not the desired outputs (y), and can thus be
+        used for unsupervised learning.
+        :param df: the features dataframe.
+        :param excludes: the name of excluded features.
+        :param file_name: the name of the summary output file.
+        :param thresh_variance: Features with a training-set variance lower than this threshold will be removed.
+            The default is to keep all features with non-zero variance, i.e. remove the features that have the same
+            value in all samples.
+        :param to_search: to search or use the saved configuration.
+        :return: the inputted dataframe with exclusion of features that were selected to be removed.
+        """
+        self.__logger.debug("Remove features with near-zero-variance (if applicable), using Scikit algorithm.")
+        df_excludes = df[excludes]
+        excludes = set(excludes)
+        matches = []
+        indices = OrderedDict()
+        summaries = OrderedDict()
+
+        # find indices of the included features
+        includes = [label for label in df.columns.values if label not in excludes]
+        for index, label in enumerate(includes):
+            indices[index] = label
+
+        # search
+        if to_search is True:
+            variances_ = feature_selection.VarianceThreshold(thresh_variance)
+            variances_.fit(df[includes])
+            kept_indices = set(variances_.get_support(indices=True))
+            matches_labels = [indices[index] for index in indices.keys() if index not in kept_indices]
+            for match in matches_labels:
+                if match not in excludes:
+                    matches += [match]
+
+        # delete
+        df = self.__remove(df, {'NZV': list(matches)}, to_search, os.path.join(self.__output_path, file_name + ".ini"))
+        for name in excludes:
+            df[name] = df_excludes[name]
+        if any(np.isnan(df.index)):
+            df = df.reset_index(drop=True)
+
+        # summaries
+        if to_search is True:
+            summaries["Features Matches"] = matches
+        return df, summaries
+
+    def near_zero_var_df(self,
+                         df: PandasDataFrame,
+                         excludes: List,
+                         file_name: str,
+                         thresh_unique_cut: float=100,
+                         thresh_freq_cut: float=1000,
+                         to_search: bool=True) -> [PandasDataFrame, CollectionsOrderedDict]:
+        """Find and optionally remove the selected near-zero-variance features (custom algorithm).
+        Features with near-constant values may be filtered out, to exclude highly constant and
+        near-zero-variance features.
+        The rules are as follows:
+        - Frequency ratio: the frequency of the most prevalent value over the second most frequent value must be
+          greater than a threshold;
+        - Percent of unique values: the number of unique values divided by the total number of samples must be
+          greater than a threshold.
+        :param df: the features dataframe.
+        :param excludes: the name of excluded features.
+ :param file_name: the name of the summary output file. + :param thresh_unique_cut: the cutoff for the percentage of distinct values out of the number of total samples + (upper limit). e.g. 10 * 100 / 100. + :param thresh_freq_cut: the cutoff for the ratio of the most common value to the second most common value + (lower limit). e.g. 95/5. + :param to_search: to search or use the saved configuration. + :return: the inputted dataframe with exclusion of features that were selected to be removed. + """ + self.__logger.debug("Remove features with near-zero-variance (if applicable), using custom algorithm.") + df_excludes = df[excludes] + excludes = set(excludes) + matches = [] + summaries = OrderedDict() + + # search + if to_search is True: + for label in df.columns.values: + # set match and summaries + # check of NaN + if not isinstance(df[label].iloc[0], (int, np.int, float, np.float)) \ + or np.isnan(np.sum(df[label])): + matches += [label] + continue + # check of near zero variance + match, summaries[label] = self.__near_zero_var( + df[label], label, excludes, thresh_unique_cut, thresh_freq_cut) + if match is True: + matches += [label] + self.__logger.info("Near Zero Variance: " + label) + + # to_remove + df = self.__remove(df, {'NZV': list(matches)}, to_search, os.path.join(self.__output_path, file_name + ".ini")) + for name in excludes: + df[name] = df_excludes[name] + if any(np.isnan(df.index)): + df = df.reset_index(drop=True) + + # summaries + if to_search is True: + summaries["Features Matches"] = matches + return df, summaries + + def __near_zero_var(self, + arr: List, + label: str, + excludes: set, + thresh_unique_cut: float, + thresh_freq_cut: float) -> [bool, Dict]: + """Assess a single feature for near-zero-variance (custom algorithm). + The features that had constant counts less than or equal a threshold may be filtered out, + to exclude highly constants and near-zero variances. + Rules are as the following: + - Frequency ratio: The frequency of the most prevalent value over the second most frequent value to be + greater than a threshold; + - Percent of unique values: The number of unique values divided by the total number of samples to be greater + than the threshold. + + :param arr: the feature value. + :param label: the feature name. + :param excludes: the name of excluded features. + :param thresh_unique_cut: the cutoff for the percentage of distinct values out of the number of total samples + (upper limit). e.g. 10 * 100 / 100. + :param thresh_freq_cut: the cutoff for the ratio of the most common value to the second most common value + (lower limit). e.g. 95/5. + :return: indicates if the feature has near-zero-variance. 
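+        Example (illustrative): for a feature with values [1, 1, 1, 2], the frequency ratio is 3 / 1 = 3 and the
+            percentage of unique values is 2 * 100 / 4 = 50; the feature is flagged if either value exceeds its
+            cutoff and the feature is not in the excluded list.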
+ """ + self.__logger.debug("Find near-zero-variance (if applicable), using custom algorithm.") + unique, counts = np.unique(arr, return_counts=True) + if len(counts) == 1: + return True, {'unique': list(unique), 'counts': list(counts)} + else: + counts = sorted(counts, reverse=True) + if label not in excludes and (len(unique) * 100) / float(len(arr)) > thresh_unique_cut: + return True, {'unique': list(unique), 'counts': list(counts)} + if label not in excludes and counts[0] / float(counts[1]) > thresh_freq_cut: + return True, {'unique': list(unique), 'counts': list(counts)} + else: + return False, {'unique': list(unique), 'counts': list(counts)} + + def __remove(self, + df: PandasDataFrame, + dict_matches: Dict, + to_search: bool, + path: str, + section: str="features") -> PandasDataFrame: + """Confirm removals and if confirmed, then re-read the selected features, then remove + :param df: the features dataframe. + :param dict_matches: the matched features. + :param to_search: to search or use the saved configuration. + :param path: the file path to the configuration file. + :param section: the section name in the configuration file. + :return: the updated features. + """ + self.__logger.debug("Confirm removals and implement removal process.") + config = PyConfigParser(path, CONSTANTS.app_name) + + if to_search is True: + # write to config + config.reset() + config.write_dict(dict_matches, section) + # confirm + response = self.__readers_writers.question_overwrite( + "the features defined in the following file to be removed: " + path) + if response is False: + config.reset() + return df + + # if to_search is False or response was yes then read from config + config.refresh() + dict_matches = config.read_dict(section) + + # remove + self.__logger.debug("The feature removal list: " + ",".join(dict_matches)) + labels = [label for label_group in dict_matches.values() for label in label_group if label in df] + if len(labels) > 0: + df = df.drop(labels, axis=1) + return df