--- a +++ b/Features/FeatureParserThread.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- +# +# Copyright 2017 University of Westminster. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" It reads and parses the variables, then it generate features, in threaded batches. +""" + +from typing import List, TypeVar, Dict +import numpy as np +import statistics +from scipy.stats import itemfreq + +PandasDataFrame = TypeVar('DataFrame') +NumpyNdarray = TypeVar('ndarray') + +__author__ = "Mohsen Mesgarpour" +__copyright__ = "Copyright 2016, https://github.com/mesgarpour" +__credits__ = ["Mohsen Mesgarpour"] +__license__ = "GPL" +__version__ = "1.1" +__maintainer__ = "Mohsen Mesgarpour" +__email__ = "mohsen.mesgarpour@gmail.com" +__status__ = "Release" + + +class FeatureParserThread: + + @staticmethod + def aggregate_cell(postfixes: str, + variable_type: str, + prevalence: Dict, + variable_cell: str) -> NumpyNdarray: + """Aggregate the variable value, based on the selected aggregated functions. + :param postfixes: the aggregated variable. + :param variable_type: the type of the input variable. + :param prevalence: the prevalence dictionary of values for all the variables. + :param variable_cell: the variable value (a single row) to aggregate. + :return: the aggregated value (a single row). + """ + features_temp = np.zeros([len(postfixes)]) + + # if null or empty + if variable_cell is None or variable_cell == "": + return features_temp + + # parse variables + # Note: replace None with '0' + variable_cell = variable_cell.split('|') + variable_cell = [v2 for v1 in variable_cell for v2 in set(v1.split(','))] + variable_cell = [v if v != "" else 0 for v in variable_cell] + if variable_type == "INT": + variable_cell = list(map(int, variable_cell)) + + # generate freq. table + freq = itemfreq(variable_cell) + freq = np.array([tuple(row) for row in freq if row[1] != 0], dtype=[('value', 'int'), ('freq', 'int')]) + freq_sorted = np.sort(freq, order=['freq', 'value'])[::-1]['value'] + freq_dic = dict(zip(map(str, freq['value']), freq['freq'])) + + # set + for p in range(len(postfixes)): + if len(postfixes[p]) > 11 and postfixes[p][0:11] == "prevalence_": + index = int(postfixes[p].split('_')[1]) - 1 + if index < len(prevalence): + value = prevalence[index] + if str(value) in freq_dic.keys(): + features_temp[p] = freq_dic[value] + elif len(postfixes[p]) > 9 and postfixes[p][0:9] == "max_freq_": + index = int(postfixes[p][9:]) - 1 + if len(freq_sorted) > index: + features_temp[p] = freq_sorted[index] + elif postfixes[p] == "others_cnt": + features_temp[p] = len(freq_sorted) # np.count_nonzero(variable_cell) + elif postfixes[p] == "max": + features_temp[p] = max(variable_cell) + elif postfixes[p] == "avg": + features_temp[p] = statistics.mean(variable_cell) + elif postfixes[p] == "min": + features_temp[p] = min(variable_cell) + elif postfixes[p] == "median": + features_temp[p] = statistics.median(variable_cell) + else: + raise ValueError(postfixes) + return features_temp + + @staticmethod + def prevalence_cell(variable_cell: str) -> List: + """Parse the inputted variable value (a single row), to a list of value. + :param variable_cell: the variable value (a single row), to calculate the prevalence. + :return: the list of values of the current variable value. + """ + # if null or empty + if variable_cell is None or variable_cell == "": + return [] + else: + # parse variables + # Note: replace None with '0' + variable_cell = variable_cell.split('|') + variable_cell = [v2 for v1 in variable_cell for v2 in set(v1.split(','))] + variable_cell = [v if v != "" else 0 for v in variable_cell] + variable_cell = list(map(str, variable_cell)) + return variable_cell