a b/Features/FeatureParser.py
1
#!/usr/bin/env python
2
# -*- coding: UTF-8 -*-
3
#
4
# Copyright 2017 University of Westminster. All Rights Reserved.
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
#     http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
# ==============================================================================
18
""" It reads and parses the variables, then it generate features.
19
"""
20
21
from typing import List, TypeVar, Dict
22
import sys
23
import pandas as pd
24
import numpy as np
25
import multiprocessing as mp
26
from functools import partial
27
from collections import Counter
28
from ReadersWriters.ReadersWriters import ReadersWriters
29
import logging
30
from Features.FeatureParserThread import FeatureParserThread
31
from Configs.CONSTANTS import CONSTANTS
32
33
# Placeholder type markers that the signatures in this module use in place of
# the concrete pandas / numpy classes.
PandasDataFrame = TypeVar("DataFrame")
NumpyNdarray = TypeVar("ndarray")

# Module metadata.
# NOTE(review): the file header states the Apache License 2.0 while
# __license__ says "GPL" - confirm which one applies.
__author__ = "Mohsen Mesgarpour"
__copyright__ = "Copyright 2016, https://github.com/mesgarpour"
__credits__ = ["Mohsen Mesgarpour"]
__license__ = "GPL"
__version__ = "1.1"
__maintainer__ = "Mohsen Mesgarpour"
__email__ = "mohsen.mesgarpour@gmail.com"
__status__ = "Release"
44
45
46
class FeatureParser:
    """Turn the raw variables of a history table into model features.

    The variables' settings table decides, per variable, whether its cells are
    aggregated in a process pool (one output feature per aggregation postfix)
    or copied across with missing values replaced by zero.
    """

    def __init__(self,
                 variables_settings: pd.DataFrame,
                 output_path: str,
                 output_table: str):
        """Initialise the objects and constants.
        :param variables_settings: the variables' settings table; ``generate`` reads its
            ``Table_History_Name``, ``Variable_Name``, ``Variable_Aggregation`` and
            ``Variable_Type_Original`` columns.
        :param output_path: the output path.
        :param output_table: the output table name.
        """
        self.__logger = logging.getLogger(CONSTANTS.app_name)
        self.__logger.debug(__name__)
        self.__variables_settings = variables_settings
        self.__output_path = output_path
        self.__output_table = output_table
        self.__readers_writers = ReadersWriters()
        self.__FeatureParserThread = FeatureParserThread()

    def generate(self,
                 history_table: str,
                 features: pd.DataFrame,
                 variables: pd.DataFrame,
                 prevalence: Dict) -> pd.DataFrame:
        """Generate the features of every variable configured for a history table.
        :param history_table: the source table alias name (a.k.a. history table name) that features belong to
            (e.g. inpatient, or outpatient).
        :param features: the output features; new columns are added to it in place.
        :param variables: the input variables.
        :param prevalence: the prevalence dictionary of values for all the variables.
        :return: the output features (the same object as ``features``).
        """
        settings = self.__variables_settings
        settings = settings[settings["Table_History_Name"] == history_table]

        for _, row in settings.iterrows():
            name = row["Variable_Name"]
            self.__logger.info("variable: %s ...", name)

            if not pd.isnull(row["Variable_Aggregation"]):
                postfixes = row["Variable_Aggregation"].replace(' ', '').split(',')
                # aggregate stats: one row per cell, one column per postfix
                aggregated = self.__aggregate(
                    variables[name], row["Variable_Type_Original"], postfixes, prevalence[name])
                for p, postfix in enumerate(postfixes):
                    features[name + "_" + postfix] = aggregated[:, p]
            else:
                # nan_to_num only touches floating-point arrays; object columns pass
                # through it unchanged, so zero any remaining None/NaN explicitly.
                values = np.nan_to_num(variables[name])
                values = np.where(pd.isnull(values), 0, values)
                features[name] = values
        return features

    def __map_cells(self, cell_function, variable) -> List:
        """Apply a cell-level function to every cell of a variable in a process pool.
        :param cell_function: the picklable callable applied to each cell.
        :param variable: the iterable of cells.
        :return: the list of per-cell results, in input order.
        """
        try:
            with mp.Pool() as pool:
                return pool.map(cell_function, variable)
        except ValueError as exception:
            self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception))
            # a bare sys.exit() would report success (status 0) to the caller
            sys.exit(1)

    def __aggregate(self,
                    variable: pd.Series,
                    variable_type: str,
                    postfixes: List[str],
                    prevalence: Dict) -> np.ndarray:
        """Aggregate every cell of a variable with the configured aggregation functions.
        :param variable: the input variable.
        :param variable_type: the type of input variable.
        :param postfixes: names of the aggregation functions.
        :param prevalence: the prevalence entry of this variable.
        :return: the aggregated variable; ``generate`` reads it as one column per postfix.
        """
        cell_function = partial(
            self.__FeatureParserThread.aggregate_cell, postfixes, variable_type, prevalence)
        features_temp = np.asarray(self.__map_cells(cell_function, variable))

        # an empty variable produces a 1-D empty array; keep it 2-D so that the
        # column indexing in generate() still works
        if features_temp.shape == (0,):
            features_temp = features_temp.reshape(0, len(postfixes))
        return features_temp

    def prevalence(self,
                   variable: pd.Series,
                   variable_name: str) -> List:
        """Rank the values of a variable by frequency and append the counts to the output text file.
        :param variable: the input variable (assumed to be a single column, as in ``__aggregate`` -
            confirm against callers).
        :param variable_name: the name of the input variable.
        :return: the distinct values of the variable, most frequent first.
        """
        cells = self.__map_cells(self.__FeatureParserThread.prevalence_cell, variable)

        # flatten the per-cell value lists and count the occurrences
        counts = Counter(value for cell in cells for value in cell).most_common()
        self.__readers_writers.save_text(
            self.__output_path, self.__output_table,
            [variable_name, '; '.join(str(value) + ":" + str(count) for value, count in counts)],
            append=True, ext="txt")
        return [value for value, _ in counts]