Diff of /src/ml_models/rf.py [000000] .. [735bb5]

Switch to side-by-side view

--- a
+++ b/src/ml_models/rf.py
@@ -0,0 +1,271 @@
+# Base Dependencies
+# ----------------
+import numpy as np
+
+from typing import Any
+
+# Local Dependencies
+# ------------------
+from features import RandomForestFeatures
+from utils import ddi_binary_relation
+
+# 3rd-Party Dependencies
+# ----------------------
+from sklearn.base import BaseEstimator
+from sklearn.ensemble import RandomForestClassifier
+from imblearn.ensemble import BalancedRandomForestClassifier
+from sklearn.pipeline import Pipeline
+from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
+
+# Constants
+# ---------
+# Hyperparameter search space for the random forest estimators in this module.
+# It is handed to RandomizedSearchCV, so each key must be a constructor
+# parameter of the estimator being tuned.
+RF_HYPERPARAM_GRID = {
+    "bootstrap": [True, False],
+    "max_depth": [2, 5, 10, 20, 30, 40, 50],
+    "max_features": ["sqrt", "log2", None],
+    "min_samples_leaf": [2, 3, 4],
+    "min_samples_split": [2, 5, 10],
+}
+
+# Candidate values for the `threshold` parameter of the binary first-stage
+# classifier; explored exhaustively with GridSearchCV in the two-stage model.
+RF_BINARY_THRESHOLD_GRID = [0.5, 0.55, 0.6, 0.65, 0.7, 0.8, 0.85, 0.9]
+
+
+# ML Models
+# ---------
+class RandomForestClassifierOneStage(BaseEstimator):
+    """Random Forest Classifier One Stage
+
+    Random Forest classifier that can be used for both N2C2 and DDI datasets.
+    It consideres a single stage of classification. For the n2c2 corpus, it
+    is used to classify between positive and negative relations. For the N2C2
+    corpus, it is used to classify between the 5 relation types, including
+    the NO-REL type.
+    """
+
+    def __init__(self, dataset: str) -> None:
+        """Initializes the model
+
+        Args:
+            dataset (str): dataset's name
+        """
+        super().__init__()
+        self.dataset = dataset
+        self.clf = RandomForestClassifier(class_weight="balanced")
+
+    @property
+    def scoring(self) -> str:
+        """Scoring metric to use for hyperparameter tuning"""
+        if self.dataset == "n2c2":
+            return "f1"
+        else:
+            return "f1_micro"
+
+    def score(self, X: np.array, Y: np.array, sample_weight=None) -> float:
+        """Scores the model
+
+        Args:
+            X (np.array): Feature matrix
+            Y (np.array): Label vector
+
+        Returns:
+            float: Score
+        """
+        from sklearn.metrics import f1_score
+        return f1_score(Y, self.predict(X), sample_weight=sample_weight)
+    
+    def fit(self, X: np.array, Y: np.array):
+        """Fits the model. It uses 5-fold cross validation to find the best hyperparameters.
+
+        Args:
+            X (np.array): Feature matrix
+            Y (np.array): Label vector
+
+        Returns:
+            RandomForestClassifierOneStage: Fitted model
+        """
+        assert X.shape[0] == len(Y)
+        assert len(X.shape) == 2
+
+        search = RandomizedSearchCV(self.clf, RF_HYPERPARAM_GRID, scoring=self.scoring)
+        search = search.fit(X, Y)
+        self.clf = search.best_estimator_
+
+        return self
+
+    def predict(self, X: np.array):
+        """Predicts the class of a given sample
+
+        Args:
+            X (np.array): Feature matrix
+
+        Returns:
+            np.array: Predicted class
+        """
+        return self.clf.predict(X)
+
+    def predict_log_probab(self, X: np.array):
+        """Predicts the log probability of a given sample
+
+        Args:
+            X (np.array): Feature matrix
+
+        Returns:
+            np.array: Predicted log probability
+        """
+        return self.clf.predict_log_proba(X)
+
+    def predict_proba(self, X: np.array):
+        """Predicts the probability of a given sample
+
+        Args:
+            X (np.array): Feature matrix
+
+        Returns:
+            np.array: Predicted probability
+        """
+        return self.clf.predict_proba(X)
+
+
+class RandomForestClassifierTwoStage(BaseEstimator):
+    """Random Forest Classifier Two Stage 
+
+    Random Forest Classifier that can be used for the DDI dataset. It considers
+    a two stage classification. The first stage is used to classify between posivite
+    and negative relations. The second stage is used to classify between the 4
+    relation types.
+    """
+
+    class BalancedRandomForestClassifierBinary(BaseEstimator):
+        """Balanced Random Forest Classifier Binary
+
+        Random Forest Classifier used in the first stage of the two stage classification.
+        It classifies between positive and negative relations.
+
+        """
+
+        def __init__(
+            self,
+            dataset: str,
+            threshold: float = 0.7,
+        ) -> None:
+            """Initializes the model
+
+            Args:
+                dataset (str): dataset's name
+                threshold (float, optional): classification threshold to classify instances
+                    as positive. Defaults to 0.7.
+            """
+            super().__init__()
+            self.dataset = dataset
+            self.threshold = threshold
+            self.clf = BalancedRandomForestClassifier(class_weight="balanced")
+
+        def make_binary(self, y: Any) -> np.array:
+            
+            if self.dataset == "ddi":
+                return ddi_binary_relation(y)
+            else:
+                raise NotImplementedError
+            
+        def fit(self, X: np.array, Y: np.array):
+            """Fits the model.
+
+            Args:
+                X (np.array): Feature matrix
+                Y (np.array): Label vector
+
+            Returns:
+                BalancedRandomForestClassifierBinary: Fitted model
+            """
+            assert X.shape[0] == len(Y)
+            assert len(X.shape) == 2
+
+            # fit binary random forest
+            self.clf = self.clf.fit(X, Y)
+
+            return self
+
+        def predict(self, X: np.array):
+            """Predicts the class of a given sample
+
+            Args:
+                X (np.array): Feature matrix
+
+            Returns:
+                np.array: Predicted class
+            """
+            Y = (self.clf.predict_proba(X)[:, 1] >= self.threshold).astype(bool)
+            return Y
+
+    def __init__(self) -> None:
+        super().__init__()
+        # 1st classifier - Detect relations - classify between positive and negative
+        self.clf1 = self.BalancedRandomForestClassifierBinary()
+
+        # 2nd classifier - Classifiy relation - classify positive relations into a relation type
+        self.clf2 = BalancedRandomForestClassifier()
+
+    def fit(self, X: np.array, Y: np.array):
+        """Fits the model.
+
+        Args:
+            X (np.array): Feature matrix
+            Y (np.array): Label vector
+
+        Returns:
+            RandomForestClassifierTwoStage: Fitted model
+        """
+        assert X.shape[0] == len(Y)
+        assert len(X.shape) == 2
+        Y_1 = np.array(list(map(lambda y: self.make_binary(y), Y)))
+
+        # fit 1st classifier
+        search1 = GridSearchCV(
+            estimator=self.clf1,
+            param_grid={"threshold": RF_BINARY_THRESHOLD_GRID},
+            scoring="f1",
+        )
+        search1 = search1.fit(X, Y_1)
+        self.clf1 = search1.best_estimator_
+
+        # fit 2nd classifier
+        search2 = RandomizedSearchCV(
+            estimator=self.clf2,
+            param_distributions=RF_HYPERPARAM_GRID,
+            scoring="f1_micro",
+        )
+        search2 = search2.fit(X[Y > 0, :], Y[Y > 0])
+        self.clf2 = search2.best_estimator_
+
+        return self
+
+    def predict(self, X: np.array):
+        """Predicts the class of a given sample
+
+        Args:
+            X (np.array): Feature matrix
+
+        Returns:
+            np.array: Predicted class
+        """
+        Y = self.clf1.predict(X)
+        Y = np.array(Y, dtype=np.int8)
+        Y[Y > 0] = self.clf2.predict(X[Y > 0])
+        return Y
+
+
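+# ML Pipelines
+# ------------
+# NOTE: the pipelines below are disabled. They refer to
+# RandomForestClassifierOneStageN2C2 / RandomForestClassifierOneStageDDI, which
+# are not defined in this module; the one-stage classifier defined here is
+# RandomForestClassifierOneStage, which takes the dataset's name as argument.
+# RandomForestPipelineN2C2 = Pipeline(
+#     [
+#         ("encoder", RandomForestFeatures("n2c2")),
+#         ("clf", RandomForestClassifierOneStageN2C2()),
+#     ]
+# )
+
+# RandomForestPipelineDDI = Pipeline(
+#     [
+#         ("encoder", RandomForestFeatures("ddi")),
+#         ("clf", RandomForestClassifierOneStageDDI()),
+#     ]
+# )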