--- a +++ b/AICare-baselines/metrics/es.py @@ -0,0 +1,99 @@ +import numpy as np + + +def calculate_confusion_matrix_value_result(outcome_pred, outcome_true): + outcome_pred = 1 if outcome_pred > 0.5 else 0 + if outcome_pred == 1 and outcome_true == 1: + return "tp" + elif outcome_pred == 0 and outcome_true == 0: + return "tn" + elif outcome_pred == 1 and outcome_true == 0: + return "fp" + elif outcome_pred == 0 and outcome_true == 1: + return "fn" + else: + raise ValueError("Unknown value occurred") + +def calculate_es(los_true, threshold, case="tp"): + metric = 0.0 + if case == "tp": + if los_true >= threshold: # predict correct in early stage + metric = 1 + else: + metric = los_true / threshold + elif case == "fn": + if los_true >= threshold: # predict wrong in early stage + metric = 0 + else: + metric = los_true / threshold - 1 + elif case == "tn": + metric = 0.0 + elif case == "fp": + metric = -0.1 # penalty term + return metric + + +def es_score( + y_true_outcome, + y_true_los, + y_pred_outcome, + threshold, + verbose=0 +): + """ + Args: + - threshold: 50%*mean_los (patient-wise) + + Note: + - y/predictions are already flattened here + - so we don't need to consider visits_length + """ + metric = [] + metric_optimal = [] + num_records = len(y_pred_outcome) + for i in range(num_records): + cur_outcome_pred = y_pred_outcome[i] + cur_outcome_true = y_true_outcome[i] + cur_los_true = y_true_los[i] + prediction_result = calculate_confusion_matrix_value_result(cur_outcome_pred, cur_outcome_true) + prediction_result_optimal = calculate_confusion_matrix_value_result(cur_outcome_true, cur_outcome_true) + metric.append( + calculate_es( + cur_los_true, + threshold, + case=prediction_result, + ) + ) + metric_optimal.append( + calculate_es( + cur_los_true, + threshold, + case=prediction_result_optimal, + ) + ) + metric = np.array(metric) + metric_optimal = np.array(metric_optimal) + result = 0.0 + if metric_optimal.sum() > 0.0: + result = metric.sum() / metric_optimal.sum() + result = max(result, -1.0) + if verbose: + print("ES Score:", result) + if isinstance(result, np.float64): + result = result.item() + return {"es": result} + +if __name__ == "__main__": + y_true_outcome = np.array([0,1]) + y_true_los = np.array([5,5]) + y_pred_outcome = np.array([0.7,0.7]) + y_pred_los = np.array([10,10]) + large_los = 110 + threshold = 10 + print(es_score( + y_true_outcome, + y_true_los, + y_pred_outcome, + threshold, + verbose=0 + )) \ No newline at end of file