--- a
+++ b/src/evaluation/statistical_tests.py
@@ -0,0 +1,401 @@
+"""Statistical tests for the experiments of the N2C2 and DDI corpora."""
+
+# Base Dependencies
+# -----------------
+import numpy as np
+from pathlib import Path
+from os.path import join as pjoin
+from typing import List
+
+# Package Dependencies
+# --------------------
+from .io import *
+
+# 3rd Party Dependencies
+# ----------------------
+import pandas as pd
+import matplotlib.pyplot as plt
+import statsmodels.api as sm
+
+from glob import glob
+from scipy.stats import wilcoxon, levene, shapiro, f_oneway, f, kruskal, chi2
+from statsmodels.stats.multitest import multipletests
+from Orange.evaluation import compute_CD, graph_ranks
+from statsmodels.formula.api import ols
+from statsmodels.stats.multicomp import pairwise_tukeyhsd
+from tabulate import tabulate
+
+# Constants
+# ---------
+from constants import METHODS_NAMES, DATASETS, DATASETS_PATHS
+
+ALPHA = 0.05
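+
+# Active learning query strategies evaluated for each method; every strategy is
+# compared against the random-sampling baseline (BASELINE_NAME).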
+STRATEGIES = {
+    "rf": ["LC", "BatchLC"],
+    "bilstm": ["LC", "BatchBALD"],
+    "bert": ["LC", "BatchBALD"],
+    "bert-pairs": ["LC", "BatchBALD"],
+}
+
+BASELINE_NAME = "random"
+
+
+# ANOVA Tests
+# -----------
+def one_way_anova_pl(df: pd.DataFrame, corpus: str, alpha: float):
+    """Performs a one-way ANOVA test to determine if there are significant differences between the methods"""
+    # group results by method
+    grouped = []
+    for method in df["method"].unique():
+        grouped.append(list(df[df["method"] == method]["Micro_f1"]))
+
+    # Perform a one-way ANOVA test
+    _, pval = f_oneway(*grouped)
+
+    print("\n\n**** One-way ANOVA test  ****")
+    print("  - Corpus: ", corpus)
+    print(
+        "  - Description: determines if there are significant differences between the methods"
+    )
+    print("  - p-value: ", pval)
+    print("  - Significant difference: ", bool(pval < alpha))
+
+    # if there is a significant difference between the methods, perform Tukey's HSD test
+    if pval < alpha:
+        tukey_hsd(df, alpha)
+
+
+def two_way_anova_pl(df: pd.DataFrame):
+    """Performs a two-way ANOVA test to determine if there are significant differences between the methods and corpora"""
+    # two-way ANOVA test
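+    # Model: Micro-F1 explained by the main effects of method and corpus plus their
+    # interaction; anova_lm with typ=2 uses Type II sums of squares.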
+    formula = "Micro_f1 ~ C(method) + C(corpus) + C(method):C(corpus)"
+    model = ols(formula, data=df).fit()
+    anova_table = sm.stats.anova_lm(model, typ=2)
+
+    # Check the normality of residuals using Shapiro-Wilk test
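+    # (the null hypothesis is that the residuals are normally distributed,
+    # so p-value > ALPHA means no evidence against normality)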
+    residuals = model.resid
+    w, p_value = shapiro(residuals)
+
+    print("\n\n**** Two-way ANOVA assumptions  ****")
+    print(" Normality of residuals - Shapiro-Wilk test:")
+    print("     W:", w)
+    print("     p-value:", p_value, "(", (p_value > ALPHA), ")")
+
+    # create Q-Q plot of residuals
+    sm.qqplot(model.resid, line="s")
+    plt.show()
+
+    # Check the homogeneity of variances using Levene's test
+    grouped_data = df.groupby(["method", "corpus"])["Micro_f1"]
+    w, p_value = levene(*[group.values for name, group in grouped_data])
+    print(" Homogeneity of variances - Levene's test:")
+    print("     W:", w)
+    print("     p-value:", p_value, "(", p_value > ALPHA, ")")
+
+    # two-way ANOVA results
+    print("\n\n**** Two-way ANOVA test  ****")
+    print(anova_table)
+
+
+# Post-hoc tests
+# --------------
+def tukey_hsd(df: pd.DataFrame, alpha: float = ALPHA):
+    """Performs Tukey's HSD test to determine which pairs of methods have significantly different means"""
+    corpora = list(df["corpus"].unique())
+
+    # Perform Tukey's HSD test
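+    # compare the methods; when results from more than one corpus are pooled,
+    # also compare the corpora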
+    tukey_results_method = pairwise_tukeyhsd(df["Micro_f1"], df["method"], alpha=alpha)
+    if len(corpora) > 1:
+        tukey_results_corpus = pairwise_tukeyhsd(
+            df["Micro_f1"], df["corpus"], alpha=alpha
+        )
+
+    print("\n\n**** Tukey's HDS test  ({})****".format(corpora))
+    print(
+        "  - Description: determines which pairs of methods have significantly different means"
+    )
+    print(tukey_results_method)
+    if len(corpora) > 1:
+        print(tukey_results_corpus)
+
+
+def nemenyi_test(avg_ranks: List[float], methods: List[str], N: int):
+    """Performs Nemenyi's test to determine which pairs of methods have significantly different means"""
+    cd_nemenyi = compute_CD(
+        avranks=list(avg_ranks), n=N, alpha=str(ALPHA), test="nemenyi"
+    )
+    print("\n\n**** Nemenyi's test  ****")
+    print(" - critical distance (alpha = {}) = ".format(ALPHA), cd_nemenyi)
+    print(" - avg. ranks:")
+    for i in range(len(methods)):
+        print("     - {}: {}".format(methods[i], avg_ranks[i]))
+    graph_ranks(
+        avg_ranks,
+        methods,
+        cd=cd_nemenyi,
+        width=9,
+        textspace=1,
+    )
+    plt.show()
+
+
+def bonferroni_test(avg_ranks: List[float], methods: List[str], N: int):
+    """Performs Bonferroni's test to determine which pairs of methods have significantly different means"""
+    cd_bonferroni = compute_CD(
+        avranks=list(avg_ranks), n=N, alpha=str(ALPHA), test="bonferroni-dunn"
+    )
+    print("\n\n**** Bonferroni's test  ****")
+    print(" - critical distance (alpha = {}) = ".format(ALPHA), cd_bonferroni)
+    print(" - avg. ranks:")
+    for i in range(len(methods)):
+        print("     - {}: {}".format(methods[i], avg_ranks[i]))
+    graph_ranks(
+        avg_ranks,
+        methods,
+        cd=cd_bonferroni,
+        width=9,
+        textspace=1,
+    )
+    plt.show()
+
+
+# Non-parametric tests
+# --------------------
+def kruskal_wallis_method(df: pd.DataFrame):
+    """Performs the Kruskal-Wallis test to determine if there are significant differences between the methods"""
+    metric = "Micro_f1"
+    corpora = list(df["corpus"].unique())
+    methods = list(METHODS_NAMES.keys())
+
+    # Collect the Micro-F1 values of each method
+    grouped = [df[df["method"] == method][metric] for method in methods]
+
+    # Perform the Kruskal-Wallis test
+    H, p = kruskal(*grouped)
+
+    # Print the test results
+    print("\n\n**** Kruskal-Wallis test  ({})****".format(corpora))
+    print("Kruskal-Wallis H statistic: ", H)
+    print("p-value: ", p)
+
+
+def iman_and_davenport_test(df: pd.DataFrame):
+    """Performs the Iman and Davenport test (a corrected Friedman test) to determine whether there are significant differences between the methods"""
+    N = len(df["corpus"].unique())  # number of corpora
+    K = len(df["method"].unique())  # number of methods
+
+    # calculate mean of f1 for each combination of method and corpus
+    df_mean = df.groupby(["method", "corpus"], as_index=False).mean()
+
+    # pivot the dataframe
+    df_pivot = df_mean.pivot(index="corpus", columns="method", values="Micro_f1")
+    print(df_pivot)
+
+    # rank the methods within each corpus (rank 1 = best Micro-F1)
+    df_ranks = df_pivot.rank(axis=1, ascending=False)
+
+    avg_ranks = df_ranks.mean(axis=0).to_dict()
+    sqr_avg_ranks = np.array(list(map(lambda x: x**2, avg_ranks.values())))
+
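+    # Friedman statistic: chi2_F = 12N / (K(K+1)) * (sum_j R_j^2 - K(K+1)^2 / 4),
+    # where R_j is the average rank of method j over the N corpora.
+    # Iman-Davenport correction: F_F = (N - 1) * chi2_F / (N(K - 1) - chi2_F),
+    # which follows an F distribution with K-1 and (K-1)(N-1) degrees of freedom.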
+    # perform Friedman test
+    friedman = (
+        (12 * N) / (K * (K + 1)) * (sum(sqr_avg_ranks) - (K / 4 * ((K + 1) ** 2)))
+    )
+    F_f = (N - 1) * friedman / (N * (K - 1) - friedman)
+    p_value = 1 - f.cdf(F_f, (K - 1), (K - 1) * (N - 1))
+
+    print("\n\n**** Ivan and Davenport test  ****")
+    print("F_F statistic: ", F_f)
+    print("p-value: ", p_value)
+
+    print()
+
+    # perform post-hoc tests (Nemenyi and Bonferroni-Dunn) if the Iman-Davenport test is significant
+    if p_value < ALPHA:
+        nemenyi_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+        bonferroni_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+
+
+def friedman_test(df: pd.DataFrame):
+    """Performs Friedman's test to determine if there is a significant difference between the methods"""
+    N = len(df["corpus"].unique())  # number of corpora
+    K = len(df["method"].unique())  # number of methods
+
+    # calculate mean of f1 for each combination of method and corpus
+    df_mean = df.groupby(["method", "corpus"], as_index=False).mean()
+
+    # pivot the dataframe
+    df_pivot = df_mean.pivot(index="corpus", columns="method", values="Micro_f1")
+
+    # rank the methods within each corpus (rank 1 = best Micro-F1)
+    df_ranks = df_pivot.rank(axis=1, ascending=False)
+    avg_ranks = df_ranks.mean(axis=0).to_dict()
+    sqr_avg_ranks = np.array(list(map(lambda x: x**2, avg_ranks.values())))
+    assert len(sqr_avg_ranks) == K
+
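+    # Friedman statistic: chi2_F = 12N / (K(K+1)) * (sum_j R_j^2 - K(K+1)^2 / 4),
+    # approximately chi-square distributed with K - 1 degrees of freedom.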
+    # perform Friedman test
+    friedman = (
+        (12 * N) / (K * (K + 1)) * (sum(sqr_avg_ranks) - (K / 4 * ((K + 1) ** 2)))
+    )
+    p_value = 1 - chi2.cdf(friedman, K - 1)
+
+    print("\n\n**** Friedman test  ****")
+    print("Friedman statistic: ", friedman)
+    print("p-value: ", p_value)
+
+    print()
+
+    # perform post-hoc tests (Nemenyi and Bonferroni-Dunn) if there is a significant difference
+    if p_value < ALPHA:
+        nemenyi_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+        bonferroni_test(list(avg_ranks.values()), list(avg_ranks.keys()), N)
+
+
+def load_method_data(method: str, strategy: str):
+    """Loads the data for a given method and strategy."""
+    data = []
+
+    for corpus in DATASETS:
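+        # n2c2 results are stored under an additional "all" subdirectory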
+        if corpus == "n2c2":
+            path = Path(
+                pjoin("results", "n2c2", "all", method, "active learning", strategy)
+            )
+        else:
+            path = Path(pjoin("results", "ddi", method, "active learning", strategy))
+
+        for exp in glob(str(pjoin(path, "*f1.csv"))):
+            df = pd.read_csv(exp)
+            data = data + df["Micro_f1"].tolist()
+
+    return data
+
+
+# Main
+# ----
+def pl_statistical_tests():
+    """Statistical tests for the passive learning experiments."""
+
+    # load results
+    n2c2_results = collect_pl_results_n2c2(Path(pjoin("results", "n2c2", "all")))
+    ddi_results = collect_pl_results_ddi(Path(pjoin("results", "ddi")))
+
+    # keep the micro-averaged F1 scores, label the corpora, and concatenate the results
+    n2c2_results = n2c2_results[n2c2_results["relation"] == "Micro"]
+    n2c2_results = n2c2_results[["f1", "method"]]
+    n2c2_results.columns = ["Micro_f1", "method"]
+    n2c2_results["corpus"] = "n2c2"
+    ddi_results["corpus"] = "ddi"
+    results = pd.concat([n2c2_results, ddi_results])
+
+    # friedman_test(results)
+    iman_and_davenport_test(results)
+
+
+def al_performance_statistical_tests():
+    """Statistical tests for the active learning experiments."""
+
+    # array of corrected p-values: rows = strategies, columns = methods
+    p_values = np.zeros((2, len(METHODS_NAMES)))
+
+    for j, method in enumerate(METHODS_NAMES.keys()):
+        method_p_values = []
+        strategies = STRATEGIES[method]
+
+        for i, strategy_name in enumerate(strategies):
+            # load the strategy results and the matching random-baseline results
+            strategy_data = load_method_data(method=method, strategy=strategy_name)
+            baseline_data = load_method_data(method=method, strategy=BASELINE_NAME)
+
+            assert len(strategy_data) == len(baseline_data)
+
+            # calculate the Wilcoxon signed-rank test p-value for the pairs
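+            # one-sided alternative ("greater"): tests whether the strategy's
+            # Micro-F1 scores are systematically higher than the baseline's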
+            _, p_value = wilcoxon(
+                x=strategy_data, y=baseline_data, alternative="greater"
+            )
+
+            # store the p-value for the current strategy and method
+            method_p_values.append(p_value)
+
+        # perform Bonferroni correction on the p-values for the current method
+        rejected, corrected_p_values, _, _ = multipletests(
+            method_p_values, alpha=ALPHA, method="bonferroni"
+        )
+
+        # store the corrected p-values in the array
+        p_values[:, j] = corrected_p_values
+
+    # print the corrected p-values and indicate whether the null hypothesis is rejected or not
+    for j, method in enumerate(METHODS_NAMES.keys()):
+        strategies = STRATEGIES[method]
+
+        for i, strategy_name in enumerate(strategies):
+            is_rejected = p_values[i, j] <= ALPHA
+            print(
+                f"Strategy {strategy_name} vs. {BASELINE_NAME} with method {method}: "
+                f"p-value = {p_values[i, j]:.10f}, "
+                f"null hypothesis is {'rejected' if is_rejected else 'not rejected'}"
+            )
+            print()
+
+
+def ar_statistical_test():
+    """Statistical tests for the annotation rates (TAR and CAR) of the active learning experiments."""
+    ar_ddi = collect_annotation_rates(Path(pjoin("results", "ddi")))
+    ar_n2c2 = collect_annotation_rates(Path(pjoin("results", "n2c2", "all")))
+    ar_ddi["Corpus"] = "DDI"
+    ar_n2c2["Corpus"] = "n2c2"
+    ar_results = pd.concat([ar_ddi, ar_n2c2])
+
+    # run the tests separately for each annotation-rate metric
+
+    for metric in ["TAR", "CAR"]:
+        p_values = np.zeros((2, len(METHODS_NAMES)))
+
+        for j, method in enumerate(METHODS_NAMES.keys()):
+            method_p_values = []
+
+            for i, strategy_name in enumerate(["LC", "BatchLC / BatchBALD"]):
+                # select the strategy results and the matching random-baseline results
+                strategy_data = ar_results.loc[
+                    (ar_results["method"] == method)
+                    & (ar_results["strategy"] == strategy_name)
+                ][metric]
+                baseline_data = ar_results.loc[
+                    (ar_results["method"] == method)
+                    & (ar_results["strategy"] == BASELINE_NAME)
+                ][metric]
+
+                assert len(strategy_data) == len(baseline_data)
+
+                # calculate the Wilcoxon signed-rank test p-value for the pairs
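+                # two-sided alternative: the annotation rate may differ from the
+                # baseline in either direction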
+                _, p_value = wilcoxon(
+                    x=strategy_data, y=baseline_data, alternative="two-sided"
+                )
+
+                # store the p-value for the current strategy and method
+                method_p_values.append(p_value)
+
+            # perform Bonferroni correction on the p-values for the current method
+            rejected, corrected_p_values, _, _ = multipletests(
+                method_p_values, alpha=ALPHA, method="bonferroni"
+            )
+
+            # store the corrected p-values in the array
+            p_values[:, j] = corrected_p_values
+
+        # print the corrected p-values for the comparisons where the null hypothesis is rejected
+        print("\nMetric: ", metric)
+        for j, method in enumerate(METHODS_NAMES.keys()):
+            strategies = STRATEGIES[method]
+
+            for i, strategy_name in enumerate(strategies):
+                is_rejected = p_values[i, j] <= ALPHA
+                if is_rejected:
+                    print(
+                        f"Strategy {strategy_name} vs. {BASELINE_NAME} with method {method}: "
+                        f"p-value = {p_values[i, j]:.10f}, "
+                        f"null hypothesis is {'rejected' if is_rejected else 'not rejected'}"
+                    )
+                    print()