Diff of /Features/Variables.py [000000] .. [b4a150]

--- a
+++ b/Features/Variables.py
@@ -0,0 +1,390 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+#
+# Copyright 2017 University of Westminster. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+""" It reads and process variables.
+"""
+
+from typing import List, Tuple, TypeVar, Dict
+import sys
+import logging
+import pandas as pd
+from ReadersWriters.ReadersWriters import ReadersWriters
+from Features.FeatureParser import FeatureParser
+from Configs.CONSTANTS import CONSTANTS
+
+PandasDataFrame = TypeVar('DataFrame')
+FeaturesFeatureParser = TypeVar('FeatureParser')
+
+__author__ = "Mohsen Mesgarpour"
+__copyright__ = "Copyright 2016, https://github.com/mesgarpour"
+__credits__ = ["Mohsen Mesgarpour"]
+__license__ = "GPL"
+__version__ = "1.1"
+__maintainer__ = "Mohsen Mesgarpour"
+__email__ = "mohsen.mesgarpour@gmail.com"
+__status__ = "Release"
+
+
+class Variables:
+    def __init__(self,
+                 model_features_table: str,
+                 input_path: str,
+                 output_path: str,
+                 input_features_configs: str,
+                 output_table: str):
+        """Initialise the objects and constants.
+        :param model_features_table: the feature table name.
+        :param input_path: the input path.
+        :param output_path: the output path.
+        :param input_features_configs: the input features' configuration file.
+        :param output_table: the output table name.
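+
+        Example (illustrative only; the actual table, path and file names depend on the local configuration)::
+
+            variables = Variables(model_features_table="model_features",
+                                  input_path="ConfigInputs/",
+                                  output_path="Outputs/",
+                                  input_features_configs="features_configuration.csv",
+                                  output_table="features")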
+        """
+        self.__logger = logging.getLogger(CONSTANTS.app_name)
+        self.__logger.debug(__name__)
+        self.__model_features_table = model_features_table
+        self.__output_path = output_path
+        self.__output_table = output_table
+        self.__readers_writers = ReadersWriters()
+        # initialise settings
+        self.__variables_settings = self.__init_settings(input_path, input_features_configs)
+        self.__features_dic_names = self.__init_features_names()
+        self.__features_dic_dtypes = self.__init_features_dtypes()
+        self.__init_output(output_path, output_table)
+
+    def set(self,
+            input_schemas: List,
+            input_tables: List,
+            history_tables: List,
+            column_index: str,
+            query_batch_size: int):
+        """Set the variables by reading the selected features from MySQL database.
+        :param input_schemas: the mysql database schemas.
+        :param input_tables: the mysql table names.
+        :param history_tables: the source tables' alias names (a.k.a. history table names) that the features
+            belong to (e.g. inpatient or outpatient).
+        :param column_index: the name of the index column (a unique integer value) in the database table, which is
+            used to read the input in batches.
+        :param query_batch_size: the number of rows to be read in each batch.
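+
+        Example (illustrative values; "localID" is the index column assumed by the batch reader)::
+
+            variables.set(input_schemas=["my_db"],
+                          input_tables=["tbl_inpatient"],
+                          history_tables=["inpatient"],
+                          column_index="localID",
+                          query_batch_size=100000)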
+        """
+        self.__logger.debug(__name__)
+        query_batch_start, query_batch_max = self.__init_batch(input_schemas[0], input_tables[0])
+        features_names, features_dtypes = self.__set_features_names_types()
+        self.__validate_mysql_names(input_schemas, input_tables, history_tables)
+        prevalence = self.__init_prevalence(input_schemas, input_tables, history_tables)
+        self.__set_batch(features_names, features_dtypes, input_schemas, input_tables, history_tables, column_index,
+                         prevalence, query_batch_start, query_batch_max, query_batch_size)
+
+    def __init_settings(self,
+                        input_path: str,
+                        input_features_configs: str) -> PandasDataFrame:
+        """Read and set the settings of input variables that are selected.
+        :param input_path: the path of the input file.
+        :param input_features_configs: the input features' configuration file.
+        :return: the input variables settings.
+        """
+        self.__logger.debug(__name__)
+        variables_settings = self.__readers_writers.load_csv(input_path, input_features_configs, 0, True)
+        variables_settings = variables_settings.loc[
+            (variables_settings["Selected"] == 1) &
+            (variables_settings["Table_Reference_Name"] == self.__model_features_table)]
+        variables_settings = variables_settings.reset_index()
+        return variables_settings
+
+    def __init_features_names(self) -> Dict:
+        """Generate the features names, based on variable name, source table alias name (a.k.a. history table
+            name), and the aggregation function name.
+        :return: the name of features.
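+
+        Example: a hypothetical settings row with Table_History_Name "inpatient", Variable_Name "age" and
+            Variable_Aggregation "min, max" yields {"inpatient": ["age_min", "age_max"]}; a row without an
+            aggregation contributes the plain variable name.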
+        """
+        self.__logger.debug(__name__)
+        table_history_names = set(self.__variables_settings["Table_History_Name"])
+        features_names = {name: [] for name in table_history_names}
+        for _, row in self.__variables_settings.iterrows():
+            if not pd.isnull(row["Variable_Aggregation"]):
+                postfixes = row["Variable_Aggregation"].replace(' ', '').split(',')
+                for postfix in postfixes:
+                    features_names[row["Table_History_Name"]].append(row["Variable_Name"] + "_" + postfix)
+            else:
+                features_names[row["Table_History_Name"]].append(row["Variable_Name"])
+        return features_names
+
+    def __init_features_dtypes(self) -> Dict:
+        """Generate the features types, based on the input configuration file.
+        :return: the dtypes of features.
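+
+        Example: a hypothetical settings row with Table_History_Name "inpatient" and Variable_dType "int, int"
+            yields {"inpatient": ["int", "int"]} (one dtype string per generated feature).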
+        """
+        self.__logger.debug(__name__)
+        table_history_names = set(self.__variables_settings["Table_History_Name"])
+        features_dtypes = {name: [] for name in table_history_names}
+        for _, row in self.__variables_settings.iterrows():
+            feature_types = row["Variable_dType"].replace(' ', '').split(',')
+            for feature_type in feature_types:
+                features_dtypes[row["Table_History_Name"]].append(feature_type)
+        return features_dtypes
+
+    def __init_output(self,
+                      output_path: str,
+                      output_table: str):
+        """Initialise the output file by writing the header row.
+        :param output_path: the output path.
+        :param output_table: the output table name.
+        """
+        self.__logger.debug(__name__)
+        keys = sorted(self.__features_dic_names.keys())
+        features_names = [f for k in keys for f in self.__features_dic_names[k]]
+        self.__readers_writers.reset_csv(output_path, output_table)
+        self.__readers_writers.save_csv(output_path, output_table, features_names, append=False)
+
+    def __init_prevalence(self,
+                          input_schemas: List,
+                          input_tables: List,
+                          history_tables: List)-> Dict:
+        """Generate the prevalence dictionary of values for all the variables.
+        :param input_schemas: the mysql database schemas.
+        :param input_tables: the mysql table names.
+        :param history_tables: the source tables' alias names (a.k.a. history table names) that the features
+            belong to (e.g. inpatient or outpatient).
+        :return: the prevalence dictionary of values for all the variables.
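+
+        Example of the returned structure (hypothetical names; assuming FeatureParser.prevalence returns the
+            variable's values ranked by frequency)::
+
+            {"tbl_inpatient": {"diagnosis_code": [most_frequent_value, second_most_frequent_value, ...]}}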
+        """
+        self.__readers_writers.save_text(
+            self.__output_path, self.__output_table,
+            ["Feature Name", "Top Prevalence Feature Name"], append=False, ext="ini")
+        self.__readers_writers.save_text(
+            self.__output_path, self.__output_table,
+            ["Feature Name", "Prevalence & Freq."], append=False, ext="txt")
+        feature_parser = FeatureParser(self.__variables_settings, self.__output_path, self.__output_table)
+        prevalence = dict()
+
+        # for tables
+        for table_i in range(len(input_schemas)):
+            variables_settings = self.__variables_settings[
+                self.__variables_settings["Table_History_Name"] == history_tables[table_i]]
+            prevalence[input_tables[table_i]] = dict()
+
+            # for features
+            for _, row in variables_settings.iterrows():
+                self.__logger.info("Prevalence: " + row["Variable_Name"] + " ...")
+                if not pd.isnull(row["Variable_Aggregation"]):
+                    # read features
+                    variables = self.__init_prevalence_read(
+                        input_schemas[table_i], input_tables[table_i], row["Variable_Name"])
+
+                    # validate
+                    if variables is None or len(variables) == 0:
+                        continue
+
+                    # prevalence
+                    prevalence[input_tables[table_i]][row["Variable_Name"]] = \
+                        feature_parser.prevalence(variables[row["Variable_Name"]], row["Variable_Name"])
+
+                    # for sub features
+                    postfixes = row["Variable_Aggregation"].replace(' ', '').split(',')
+                    for p in range(len(postfixes)):
+                        feature_name = row["Variable_Name"] + "_" + postfixes[p]
+                        if len(postfixes[p]) > 11 and postfixes[p][0:11] == "prevalence_":
+                            index = int(postfixes[p].split('_')[1]) - 1
+                            feature_name_prevalence = "None"
+                            if index < len(prevalence[input_tables[table_i]][row["Variable_Name"]]):
+                                feature_name_prevalence = \
+                                    feature_name + "_" + \
+                                    str(prevalence[input_tables[table_i]][row["Variable_Name"]][index])
+                            # save prevalence
+                            self.__readers_writers.save_text(self.__output_path, self.__output_table,
+                                                             [feature_name, feature_name_prevalence],
+                                                             append=True, ext="ini")
+        return prevalence
+
+    def __init_prevalence_read(self,
+                               input_schema: str,
+                               input_table: str,
+                               variable_name: str) -> PandasDataFrame:
+        """Read a variable from database, to calculate the prevalence of the values.
+        :param input_schema: the mysql database schema.
+        :param input_table: the mysql database table.
+        :param variable_name: the variable name.
+        :return: the selected variable.
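+
+        Example of the generated query (hypothetical names)::
+
+            SELECT `diagnosis_code` FROM `tbl_inpatient`;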
+        """
+        query = "SELECT `" + variable_name + "` FROM `" + input_table + "`;"
+        return self.__readers_writers.load_mysql_query(query, input_schema, dataframing=True)
+
+    def __init_batch(self,
+                     input_schema: str,
+                     input_table: str) -> [int, int]:
+        """Find the minimum and maximum value of the index column, to use when reading mysql tables in
+            batches.
+        :param input_schema: the mysql database schema.
+        :param input_table: the mysql database table.
+        :return: the minimum and maximum of the index column.
+        """
+        self.__logger.debug(__name__)
+        query = "select min(localID), max(localID) from `" + input_table + "`;"
+        output = list(self.__readers_writers.load_mysql_query(query, input_schema, dataframing=False))
+        if [r[0] for r in output][0] is None:
+            self.__logger.error(__name__ + " No data is found: " + query)
+            sys.exit()
+
+        query_batch_start = int([r[0] for r in output][0])
+        query_batch_max = int([r[1] for r in output][0])
+        return query_batch_start, query_batch_max
+
+    def __set_features_names_types(self):
+        """Produce the sorted lists of features names and features dtypes.
+        :return: the sorted lists of features names and features dtypes.
+        """
+        self.__logger.debug(__name__)
+        keys = sorted(self.__features_dic_names.keys())
+        features_names = [f for k in keys for f in self.__features_dic_names[k]]
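+        # Build one empty Series per feature, using the dtype string from the configuration (e.g. "int" or
+        # "float"), then collect them into an empty DataFrame so pandas resolves the final dtype of each column.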
+        features_dtypes = [pd.Series(dtype=f) for k in keys for f in self.__features_dic_dtypes[k]]
+        features_dtypes = pd.DataFrame(dict(zip(features_names, features_dtypes))).dtypes
+        return features_names, features_dtypes
+
+    def __set_batch(self,
+                    features_names: List,
+                    features_dtypes: Dict,
+                    input_schemas: List,
+                    input_tables: List,
+                    history_tables: List,
+                    column_index: str,
+                    prevalence: Dict,
+                    query_batch_start: int,
+                    query_batch_max: int,
+                    query_batch_size: int):
+        """Using batch processing first read variables, then generate features and write them into output.
+        :param features_names: the name of features that are selected.
+        :param features_dtypes: the dtypes of features that are selected.
+        :param input_schemas: the mysql database schemas.
+        :param input_tables: the mysql table names.
+        :param history_tables: the source tables' alias names (a.k.a. history table names) that the features
+            belong to (e.g. inpatient or outpatient).
+        :param column_index: the name of the index column (a unique integer value) in the database table, which is
+            used to read the input in batches.
+        :param prevalence: the prevalence dictionary of values for all the variables.
+        :param query_batch_start: the minimum value of the column index.
+        :param query_batch_max: the maximum value of the column index.
+        :param query_batch_size: the number of rows to be read in each batch.
+        """
+        self.__logger.debug(__name__)
+        feature_parser = FeatureParser(self.__variables_settings, self.__output_path, self.__output_table)
+        step = -1
+        batch_break = False
+
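+        # Read, process and write one batch of `query_batch_size` rows at a time, across all input tables,
+        # until the batch start index passes the maximum value of the index column.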
+        while not batch_break:
+            step += 1
+            features = None
+            for table_i in range(len(input_schemas)):
+                self.__logger.info("Batch: " + str(step) + "; Table: " + input_tables[table_i])
+
+                # read job
+                variables = self.__set_batch_read(input_schemas[table_i], input_tables[table_i], step, column_index,
+                                                  query_batch_start, query_batch_max, query_batch_size)
+
+                # validate
+                if variables is None:
+                    batch_break = True
+                    break
+                elif len(variables) == 0:
+                    continue
+
+                # process job
+                if features is None:
+                    features = pd.DataFrame(0, index=range(len(variables)), columns=features_names)
+                    features = features.astype(dtype=features_dtypes)
+                features = self.__set_batch_process(
+                    feature_parser, history_tables[table_i], features, variables, prevalence[input_tables[table_i]])
+
+            # write job
+            if features is not None:
+                features = features.astype(dtype=features_dtypes)
+                self.__set_batch_write(features)
+
+    def __set_batch_read(self,
+                         input_schema: str,
+                         input_table: str,
+                         step: int,
+                         column_index: str,
+                         query_batch_start: int,
+                         query_batch_max: int,
+                         query_batch_size: int) -> Callable[[PandasDataFrame, None], None]:
+        """Read the queried variables.
+        :param input_schema: the mysql database schema.
+        :param input_table: the mysql database table.
+        :param step: the batch id.
+        :param column_index: the name of the index column (a unique integer value) in the database table, which is
+            used to read the input in batches.
+        :param query_batch_start: the minimum value of the column index.
+        :param query_batch_max: the maximum value of the column index.
+        :param query_batch_size: the number of rows to be read in each batch.
+        :return: the queried variables, or None once the batch start index reaches or exceeds the maximum index value.
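+
+        Example of the generated query (hypothetical names; step=2, query_batch_start=1, query_batch_size=1000)::
+
+            SELECT * FROM `tbl_inpatient` WHERE `localID` >= 2001 AND `localID` < 3001;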
+        """
+        step_start = query_batch_start + step * query_batch_size
+        step_end = step_start + query_batch_size
+        if step_start >= query_batch_max:
+            return None
+        # read
+        query = "SELECT * FROM `" + input_table + \
+                "` WHERE `" + str(column_index) + "` >= " + str(step_start) + \
+                " AND `" + str(column_index) + "` < " + str(step_end) + ";"
+        return self.__readers_writers.load_mysql_query(query, input_schema, dataframing=True)
+
+    def __set_batch_process(self,
+                            feature_parser: FeaturesFeatureParser,
+                            history_table: str,
+                            features: PandasDataFrame,
+                            variables: PandasDataFrame,
+                            prevalence: Dict) -> PandasDataFrame:
+        """Process variables and generate features.
+        :param feature_parser: the feature parser object that generates the features from the variables.
+        :param history_table: the source table alias name (a.k.a. history table name) that the features belong to
+            (e.g. inpatient or outpatient).
+        :param features: the output features.
+        :param variables: the input variables.
+        :param prevalence: the prevalence dictionary of values for the variables of this table.
+        :return: the generated features.
+        """
+        return feature_parser.generate(history_table, features, variables, prevalence)
+
+    def __set_batch_write(self,
+                          features: PandasDataFrame):
+        """Write the features into an output file.
+        :param features: the output features.
+        """
+        self.__readers_writers.save_csv(self.__output_path, self.__output_table, features, append=True)
+
+    def __validate_mysql_names(self,
+                               input_schemas: List,
+                               input_tables: List,
+                               history_tables: List):
+        """Validate the mysql tables and their columns, and log an error and exit if a table or column is invalid.
+        :param input_schemas: the mysql database schemas.
+        :param input_tables: the mysql table names.
+        :param history_tables: the source tables' alias names (a.k.a. history table names) that the features
+            belong to (e.g. inpatient or outpatient).
+        """
+        # for tables
+        for table_i in range(len(input_schemas)):
+            variables_settings = self.__variables_settings[
+                self.__variables_settings["Table_History_Name"] == history_tables[table_i]]
+            # validate the table name
+            if not self.__readers_writers.exists_mysql(
+                    input_schemas[table_i], input_tables[table_i]):
+                self.__logger.error(__name__ + " - Table does not exist: " + input_tables[table_i])
+                sys.exit()
+
+            # for features
+            for _, row in variables_settings.iterrows():
+                # validate the column name
+                if not self.__readers_writers.exists_mysql_column(
+                        input_schemas[table_i], input_tables[table_i], row["Variable_Name"]):
+                    self.__logger.error(__name__ + " - Column does not exist: " + row["Variable_Name"])
+                    sys.exit()