--- /dev/null
+++ b/src/evaluation/statistical_tests.py
@@ -0,0 +1,401 @@
+"""Statistical tests for the experiments on the N2C2 and DDI corpora."""
+
+# Base Dependencies
+# -----------------
+import numpy as np
+from pathlib import Path
+from os.path import join as pjoin
+from typing import List
+
+# Package Dependencies
+# --------------------
+from .io import *
+
+# 3rd Party Dependencies
+# ----------------------
+import pandas as pd
+import matplotlib.pyplot as plt
+import statsmodels.api as sm
+
+from glob import glob
+from scipy.stats import wilcoxon, levene, shapiro, f_oneway, f, kruskal, chi2
+from statsmodels.stats.multitest import multipletests
+from Orange.evaluation import compute_CD, graph_ranks
+from statsmodels.formula.api import ols
+from statsmodels.stats.multicomp import pairwise_tukeyhsd
+from tabulate import tabulate
+
+# Constants
+# ---------
+from constants import METHODS_NAMES, DATASETS, DATASETS_PATHS
+
+ALPHA = 0.05
+STRATEGIES = {
+    "rf": ["LC", "BatchLC"],
+    "bilstm": ["LC", "BatchBALD"],
+    "bert": ["LC", "BatchBALD"],
+    "bert-pairs": ["LC", "BatchBALD"],
+}
+
+BASELINE_NAME = "random"
+
+
+# ANOVA Tests
+# -----------
+def one_way_anova_pl(df: pd.DataFrame, corpus: str, alpha: float):
+    """Performs a one-way ANOVA test to determine whether there are significant differences between the methods."""
+    # group results by method
+    grouped = []
+    for method in df["method"].unique():
+        grouped.append(list(df[df["method"] == method]["Micro_f1"]))
+
+    # perform a one-way ANOVA test
+    _, pval = f_oneway(*grouped)
+
+    print("\n\n**** One-way ANOVA test ****")
+    print(" - Corpus: ", corpus)
+    print(
+        " - Description: determines whether there are significant differences between the methods"
+    )
+    print(" - p-value: ", pval)
+    print(" - Significant difference: ", bool(pval < alpha))
+
+    # if there is a significant difference between the methods, perform Tukey's HSD test
+    if pval < alpha:
+        tukey_hsd(df, alpha)
+
+
+def two_way_anova_pl(df: pd.DataFrame):
+    """Performs a two-way ANOVA test to determine whether there are significant differences between the methods and corpora."""
+    # two-way ANOVA test
+    formula = "Micro_f1 ~ C(method) + C(corpus) + C(method):C(corpus)"
+    model = ols(formula, data=df).fit()
+    anova_table = sm.stats.anova_lm(model, typ=2)
+
+    # check the normality of residuals using the Shapiro-Wilk test
+    residuals = model.resid
+    w, p_value = shapiro(residuals)
+
+    print("\n\n**** Two-way ANOVA assumptions ****")
+    print(" Normality of residuals - Shapiro-Wilk test:")
+    print(" W:", w)
+    print(" p-value:", p_value, "(", (p_value > ALPHA), ")")
+
+    # create a Q-Q plot of the residuals
+    sm.qqplot(model.resid, line="s")
+    plt.show()
+
+    # check the homogeneity of variances using Levene's test
+    grouped_data = df.groupby(["method", "corpus"])["Micro_f1"]
+    w, p_value = levene(*[group.values for name, group in grouped_data])
+    print(" Homogeneity of variances - Levene's test:")
+    print(" W:", w)
+    print(" p-value:", p_value, "(", p_value > ALPHA, ")")
+
+    # ANOVA table
+    print("\n\n**** Two-way ANOVA test ****")
+    print(anova_table)
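+
+
+# NOTE (illustrative only): the ANOVA helpers above, and the rank-based tests
+# below, expect a long-format results frame with one row per run and the
+# columns "method", "corpus" and "Micro_f1", e.g. (hypothetical values):
+#
+#     method  corpus  Micro_f1
+#     rf      n2c2    0.81
+#     rf      ddi     0.74
+#     bert    n2c2    0.88
+#     bert    ddi     0.83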
+
+
+# Post-hoc tests
+# --------------
+def tukey_hsd(df: pd.DataFrame, alpha: float = ALPHA):
+    """Performs Tukey's HSD test to determine which pairs of methods have significantly different means."""
+    corpora = list(df["corpus"].unique())
+
+    # perform Tukey's HSD test
+    tukey_results_method = pairwise_tukeyhsd(df["Micro_f1"], df["method"], alpha=alpha)
+    if len(corpora) > 1:
+        tukey_results_corpus = pairwise_tukeyhsd(
+            df["Micro_f1"], df["corpus"], alpha=alpha
+        )
+
+    print("\n\n**** Tukey's HSD test ({})****".format(corpora))
+    print(
+        " - Description: determines which pairs of methods have significantly different means"
+    )
+    print(tukey_results_method)
+    if len(corpora) > 1:
+        print(tukey_results_corpus)
+
+
+def nemenyi_test(avg_ranks: List[float], methods: List[str], N: int):
+    """Performs the Nemenyi test to determine which pairs of methods have significantly different average ranks."""
+    cd_nemenyi = compute_CD(
+        avranks=list(avg_ranks), n=N, alpha=str(ALPHA), test="nemenyi"
+    )
+    print("\n\n**** Nemenyi's test ****")
+    print(" - critical distance (alpha = {}) = ".format(ALPHA), cd_nemenyi)
+    print(" - avg. ranks:")
+    for i in range(len(methods)):
+        print(" - {}: {}".format(methods[i], avg_ranks[i]))
+    graph_ranks(
+        avg_ranks,
+        methods,
+        cd=cd_nemenyi,
+        width=9,
+        textspace=1,
+    )
+    plt.show()
+
+
+def bonferroni_test(avg_ranks: List[float], methods: List[str], N: int):
+    """Performs the Bonferroni-Dunn test, which compares each method's average rank against a control."""
+    # Orange's compute_CD expects test="bonferroni-dunn" for this critical distance
+    cd_bonferroni = compute_CD(
+        avranks=list(avg_ranks), n=N, alpha=str(ALPHA), test="bonferroni-dunn"
+    )
+    print("\n\n**** Bonferroni-Dunn test ****")
+    print(" - critical distance (alpha = {}) = ".format(ALPHA), cd_bonferroni)
+    print(" - avg. ranks:")
+    for i in range(len(methods)):
+        print(" - {}: {}".format(methods[i], avg_ranks[i]))
+    graph_ranks(
+        avg_ranks,
+        methods,
+        cd=cd_bonferroni,
+        width=9,
+        textspace=1,
+    )
+    plt.show()
+
+
+# Non-parametric tests
+# --------------------
+def kruskal_wallis_method(df: pd.DataFrame):
+    """Performs the Kruskal-Wallis test to determine whether there are significant differences between the methods."""
+    metric = "Micro_f1"
+    corpora = list(df["corpus"].unique())
+    methods = list(METHODS_NAMES.keys())
+
+    # create a list with the data of each method
+    method_data = [df[df["method"] == method][metric] for method in methods]
+
+    # perform the Kruskal-Wallis test
+    H, p = kruskal(*method_data)
+
+    # print the test results
+    print("\n\n**** Kruskal-Wallis test ({})****".format(corpora))
+    print("Kruskal-Wallis H statistic: ", H)
+    print("p-value: ", p)
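+
+
+# The two omnibus tests below rank the K methods within each of the N corpora
+# (1 = best Micro F1) and compute the Friedman statistic from the average
+# ranks R_j:
+#     chi2_F = 12N / (K(K+1)) * [ sum_j R_j^2 - K(K+1)^2 / 4 ]
+# which follows a chi-squared distribution with K-1 degrees of freedom. The
+# Iman-Davenport correction
+#     F_F = (N-1) chi2_F / (N(K-1) - chi2_F)
+# follows an F distribution with K-1 and (K-1)(N-1) degrees of freedom and is
+# less conservative for a small number of corpora.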
+
+
+def iman_and_davenport_test(df: pd.DataFrame):
+    """Performs the Iman-Davenport test to determine whether there are significant differences between the methods across corpora."""
+    N = len(df["corpus"].unique())  # number of corpora
+    K = len(df["method"].unique())  # number of methods
+
+    # calculate the mean Micro F1 of each combination of method and corpus
+    df_mean = df.groupby(["method", "corpus"], as_index=False).mean()
+
+    # pivot the dataframe
+    df_pivot = df_mean.pivot(index="corpus", columns="method", values="Micro_f1")
+    print(df_pivot)
+
+    # rank the methods within each corpus and average the ranks
+    df_ranks = df_pivot.rank(axis=1, ascending=False)
+
+    avg_ranks = df_ranks.mean(axis=0).to_dict()
+    sqr_avg_ranks = np.array(list(map(lambda x: x**2, avg_ranks.values())))
+
+    # compute the Friedman statistic and the Iman-Davenport correction
+    friedman = (
+        (12 * N) / (K * (K + 1)) * (sum(sqr_avg_ranks) - (K / 4 * ((K + 1) ** 2)))
+    )
+    F_f = (N - 1) * friedman / (N * (K - 1) - friedman)
+    p_value = 1 - f.cdf(F_f, (K - 1), (K - 1) * (N - 1))
+
+    print("\n\n**** Iman-Davenport test ****")
+    print("F_F statistic: ", F_f)
+    print("p-value: ", p_value)
+
+    print()
+
+    # perform post-hoc tests if the omnibus test is significant
+    if p_value < ALPHA:
+        nemenyi_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+        bonferroni_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+
+
+def friedman_test(df: pd.DataFrame):
+    """Performs the Friedman test to determine whether there is a significant difference between the methods."""
+    N = len(df["corpus"].unique())  # number of corpora
+    K = len(df["method"].unique())  # number of methods
+
+    # calculate the mean Micro F1 of each combination of method and corpus
+    df_mean = df.groupby(["method", "corpus"], as_index=False).mean()
+
+    # pivot the dataframe
+    df_pivot = df_mean.pivot(index="corpus", columns="method", values="Micro_f1")
+
+    # rank the methods within each corpus and average the ranks
+    df_ranks = df_pivot.rank(axis=1, ascending=False)
+    avg_ranks = df_ranks.mean(axis=0).to_dict()
+    sqr_avg_ranks = np.array(list(map(lambda x: x**2, avg_ranks.values())))
+    assert len(sqr_avg_ranks) == K
+
+    # compute the Friedman statistic
+    friedman = (
+        (12 * N) / (K * (K + 1)) * (sum(sqr_avg_ranks) - (K / 4 * ((K + 1) ** 2)))
+    )
+    p_value = 1 - chi2.cdf(friedman, K - 1)
+
+    print("\n\n**** Friedman test ****")
+    print("Friedman statistic: ", friedman)
+    print("p-value: ", p_value)
+
+    print()
+
+    # perform post-hoc tests if there is a significant difference
+    if p_value < ALPHA:
+        nemenyi_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+        bonferroni_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+
+
+def load_method_data(method: str, strategy: str):
+    """Loads the Micro F1 scores of a given method and query strategy."""
+    data = []
+
+    for corpus in DATASETS:
+        if corpus == "n2c2":
+            path = Path(
+                pjoin("results", "n2c2", "all", method, "active learning", strategy)
+            )
+        else:
+            path = Path(pjoin("results", "ddi", method, "active learning", strategy))
+
+        for exp in glob(str(pjoin(path, "*f1.csv"))):
+            df = pd.read_csv(exp)
+            data = data + df["Micro_f1"].tolist()
+
+    return data
+
+
+# Main
+# ----
+def pl_statistical_tests():
+    """Statistical tests for the passive learning experiments."""
+
+    # load results
+    n2c2_results = collect_pl_results_n2c2(Path(pjoin("results", "n2c2", "all")))
+    ddi_results = collect_pl_results_ddi(Path(pjoin("results", "ddi")))
+
+    # keep the micro-averaged F1 scores and concatenate the results
+    n2c2_results = n2c2_results[n2c2_results["relation"] == "Micro"]
+    n2c2_results = n2c2_results[["f1", "method"]]
+    n2c2_results.columns = ["Micro_f1", "method"]
+    n2c2_results["corpus"] = "n2c2"
+    ddi_results["corpus"] = "ddi"
+    results = pd.concat([n2c2_results, ddi_results])
+
+    # friedman_test(results)
+    iman_and_davenport_test(results)
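+
+
+# The active learning tests below compare each query strategy against the
+# random-sampling baseline with a paired Wilcoxon signed-rank test and apply a
+# Bonferroni correction over the strategies tested for each method.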
+
+
+def al_performance_statistical_tests():
+    """Statistical tests for the active learning experiments."""
+
+    # create an array to store the p-values of each strategy and method
+    p_values = np.zeros((2, len(METHODS_NAMES)))
+
+    for j, method in enumerate(METHODS_NAMES.keys()):
+        method_p_values = []
+        strategies = STRATEGIES[method]
+
+        for i, strategy_name in enumerate(strategies):
+            # load the scores of the strategy and of the baseline
+            strategy_data = load_method_data(method=method, strategy=strategy_name)
+            baseline_data = load_method_data(method=method, strategy=BASELINE_NAME)
+
+            assert len(strategy_data) == len(baseline_data)
+
+            # calculate the Wilcoxon signed-rank test p-value for the pairs
+            _, p_value = wilcoxon(
+                x=strategy_data, y=baseline_data, alternative="greater"
+            )
+
+            # store the p-value for the current strategy and method
+            method_p_values.append(p_value)
+
+        # perform Bonferroni correction on the p-values of the current method
+        rejected, corrected_p_values, _, _ = multipletests(
+            method_p_values, alpha=ALPHA, method="bonferroni"
+        )
+
+        # store the corrected p-values in the array
+        p_values[:, j] = corrected_p_values
+
+    # print the corrected p-values and indicate whether the null hypothesis is rejected or not
+    for j, method in enumerate(METHODS_NAMES.keys()):
+        strategies = STRATEGIES[method]
+
+        for i, strategy_name in enumerate(strategies):
+            is_rejected = p_values[i, j] <= ALPHA
+            print(
+                f"Strategy {strategy_name} vs. {BASELINE_NAME} with method {method}: "
+                f"p-value = {p_values[i, j]:.10f}, "
+                f"null hypothesis is {'rejected' if is_rejected else 'not rejected'}"
+            )
+        print()
+
+
+def ar_statistical_test():
+    """Statistical tests for the annotation rates (TAR and CAR) of the active learning experiments."""
+    ar_ddi = collect_annotation_rates(Path(pjoin("results", "ddi")))
+    ar_n2c2 = collect_annotation_rates(Path(pjoin("results", "n2c2", "all")))
+    ar_ddi["Corpus"] = "DDI"
+    ar_n2c2["Corpus"] = "n2c2"
+    ar_results = pd.concat([ar_ddi, ar_n2c2])
+
+    for metric in ["TAR", "CAR"]:
+        # create an array to store the p-values of each strategy and method
+        p_values = np.zeros((2, len(METHODS_NAMES)))
+
+        for j, method in enumerate(METHODS_NAMES.keys()):
+            method_p_values = []
+
+            for i, strategy_name in enumerate(["LC", "BatchLC / BatchBALD"]):
+                # select the annotation rates of the strategy and of the baseline
+                strategy_data = ar_results.loc[
+                    (ar_results["method"] == method)
+                    & (ar_results["strategy"] == strategy_name)
+                ][metric]
+                baseline_data = ar_results.loc[
+                    (ar_results["method"] == method)
+                    & (ar_results["strategy"] == BASELINE_NAME)
+                ][metric]
+
+                assert len(strategy_data) == len(baseline_data)
+
+                # calculate the Wilcoxon signed-rank test p-value for the pairs
+                _, p_value = wilcoxon(
+                    x=strategy_data, y=baseline_data, alternative="two-sided"
+                )
+
+                # store the p-value for the current strategy and method
+                method_p_values.append(p_value)
+
+            # perform Bonferroni correction on the p-values of the current method
+            rejected, corrected_p_values, _, _ = multipletests(
+                method_p_values, alpha=ALPHA, method="bonferroni"
+            )
+
+            # store the corrected p-values in the array
+            p_values[:, j] = corrected_p_values
+
+        # print the corrected p-values of the significant comparisons
+        print("\nMetric: ", metric)
+        for j, method in enumerate(METHODS_NAMES.keys()):
+            strategies = STRATEGIES[method]
+
+            for i, strategy_name in enumerate(strategies):
+                is_rejected = p_values[i, j] <= ALPHA
+                if is_rejected:
+                    print(
+                        f"Strategy {strategy_name} vs. {BASELINE_NAME} with method {method}: "
+                        f"p-value = {p_values[i, j]:.10f}, "
+                        f"null hypothesis is {'rejected' if is_rejected else 'not rejected'}"
+                    )
+            print()
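+
+
+# Hypothetical convenience entry point (an assumption, not part of the original
+# experiment runner): because of the relative import of `.io`, this module has to
+# be executed as a package module, e.g. `python -m src.evaluation.statistical_tests`
+# from the project root, and it assumes the `results/` directory produced by the
+# experiments is available.
+if __name__ == "__main__":
+    pl_statistical_tests()
+    al_performance_statistical_tests()
+    ar_statistical_test()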