|
a |
|
b/Features/FeatureParser.py |
|
|
1 |
#!/usr/bin/env python |
|
|
2 |
# -*- coding: UTF-8 -*- |
|
|
3 |
# |
|
|
4 |
# Copyright 2017 University of Westminster. All Rights Reserved. |
|
|
5 |
# |
|
|
6 |
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
7 |
# you may not use this file except in compliance with the License. |
|
|
8 |
# You may obtain a copy of the License at |
|
|
9 |
# |
|
|
10 |
# http://www.apache.org/licenses/LICENSE-2.0 |
|
|
11 |
# |
|
|
12 |
# Unless required by applicable law or agreed to in writing, software |
|
|
13 |
# distributed under the License is distributed on an "AS IS" BASIS, |
|
|
14 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
|
15 |
# See the License for the specific language governing permissions and |
|
|
16 |
# limitations under the License. |
|
|
17 |
# ============================================================================== |
|
|
18 |
""" It reads and parses the variables, then it generate features. |
|
|
19 |
""" |
|
|
20 |
|
|
|
21 |
from typing import List, TypeVar, Dict |
|
|
22 |
import sys |
|
|
23 |
import pandas as pd |
|
|
24 |
import numpy as np |
|
|
25 |
import multiprocessing as mp |
|
|
26 |
from functools import partial |
|
|
27 |
from collections import Counter |
|
|
28 |
from ReadersWriters.ReadersWriters import ReadersWriters |
|
|
29 |
import logging |
|
|
30 |
from Features.FeatureParserThread import FeatureParserThread |
|
|
31 |
from Configs.CONSTANTS import CONSTANTS |
|
|
32 |
|
|
|
33 |
PandasDataFrame = TypeVar('DataFrame') |
|
|
34 |
NumpyNdarray = TypeVar('ndarray') |
|
|
35 |
|
|
|
36 |
__author__ = "Mohsen Mesgarpour" |
|
|
37 |
__copyright__ = "Copyright 2016, https://github.com/mesgarpour" |
|
|
38 |
__credits__ = ["Mohsen Mesgarpour"] |
|
|
39 |
__license__ = "GPL" |
|
|
40 |
__version__ = "1.1" |
|
|
41 |
__maintainer__ = "Mohsen Mesgarpour" |
|
|
42 |
__email__ = "mohsen.mesgarpour@gmail.com" |
|
|
43 |
__status__ = "Release" |
|
|
44 |
|
|
|
45 |
|
|
|
46 |
class FeatureParser:
    """Generate features from parsed variables, driven by a variables-settings table."""

    def __init__(self,
                 variables_settings: PandasDataFrame,
                 output_path: str,
                 output_table: str):
        """Initialise the objects and constants.
        :param variables_settings: the settings table; ``generate`` reads its "Table_History_Name",
            "Variable_Name", "Variable_Aggregation" and "Variable_Type_Original" columns.
        :param output_path: the output path.
        :param output_table: the output table name.
        """
        self.__logger = logging.getLogger(CONSTANTS.app_name)
        self.__logger.debug(__name__)
        self.__variables_settings = variables_settings
        self.__output_path = output_path
        self.__output_table = output_table
        self.__readers_writers = ReadersWriters()
        self.__FeatureParserThread = FeatureParserThread()

    def generate(self,
                 history_table: str,
                 features: PandasDataFrame,
                 variables: PandasDataFrame,
                 prevalence: Dict) -> PandasDataFrame:
        """Fill the features table with one or more columns per configured variable.

        Variables with a "Variable_Aggregation" setting produce one column per aggregation
        postfix, named ``<Variable_Name>_<postfix>``. The remaining variables are copied across
        under their own name, with NaN and None replaced by zero.
        :param history_table: the source table alias name (a.k.a. history table name) that features belong to
            (e.g. inpatient, or outpatient).
        :param features: the output features (modified in place).
        :param variables: the input variables.
        :param prevalence: the prevalence dictionary of values for all the variables, keyed by variable name.
        :return: the output features (the same object as ``features``).
        """
        # only the variables configured for the requested history table are processed
        settings = self.__variables_settings[
            self.__variables_settings["Table_History_Name"] == history_table]

        for _, row in settings.iterrows():
            name = row["Variable_Name"]
            self.__logger.info("variable: " + name + " ...")

            if pd.isnull(row["Variable_Aggregation"]):
                # no aggregation: replace NaN by zero, then None by zero
                # (non-float arrays pass through nan_to_num untouched, hence the second step)
                values = np.nan_to_num(variables[name])
                values = np.where(values == np.array(None), 0, values)
                features[name] = values
                continue

            # comma-separated aggregation names, whitespace ignored
            postfixes = row["Variable_Aggregation"].replace(' ', '').split(',')
            # aggregate stats: one output column per postfix
            aggregated = self.__aggregate(
                variables[name], row["Variable_Type_Original"],
                postfixes, prevalence[name])
            for column, postfix in enumerate(postfixes):
                features[name + "_" + postfix] = aggregated[:, column]
        return features

    def __aggregate(self,
                    variable: PandasDataFrame,
                    variable_type: str,
                    postfixes: List[str],
                    prevalence: Dict) -> NumpyNdarray:
        """Aggregate every cell of a variable in a pool of worker processes.
        :param variable: the input variable.
        :param variable_type: the type of input variable.
        :param postfixes: names of the aggregation functions.
        :param prevalence: the prevalence of values for this variable.
        :return: the aggregated variable, one row per input cell; ``generate`` reads one column per postfix.
        """
        try:
            with mp.Pool() as pool:
                cells = pool.map(
                    partial(self.__FeatureParserThread.aggregate_cell, postfixes, variable_type, prevalence),
                    variable)
        except ValueError as exception:
            self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception))
            # exit with a non-zero status, so the failure is visible to the calling process
            sys.exit(1)

        aggregated = np.asarray(cells)
        if aggregated.ndim == 1 and aggregated.size == 0:
            # an empty variable gives a flat empty array; keep it column-indexable for the caller
            aggregated = aggregated.reshape(0, len(postfixes))
        return aggregated

    def prevalence(self,
                   variable: PandasDataFrame,
                   variable_name: str) -> List:
        """Rank the values of a variable by frequency and append the counts to the output text file.
        :param variable: the input variable.
        :param variable_name: the name of the input variable.
        :return: the distinct values of the variable, ordered from the most to the least common.
        """
        try:
            with mp.Pool() as pool:
                cells = pool.map(self.__FeatureParserThread.prevalence_cell, variable)
        except ValueError as exception:
            self.__logger.error(__name__ + " - Invalid configuration(s): " + str(exception))
            # exit with a non-zero status, so the failure is visible to the calling process
            sys.exit(1)

        # flatten the per-cell value lists, then count the occurrences
        values = [value for cell in cells for value in cell]
        ranked = Counter(values).most_common()
        summary = '; '.join([str(value) + ":" + str(count) for value, count in ranked])
        self.__readers_writers.save_text(self.__output_path, self.__output_table,
                                         [variable_name, summary],
                                         append=True, ext="txt")
        return [value for value, _ in ranked]