Switch to side-by-side view

--- a
+++ b/aggmap/aggmodel/explainer.py
@@ -0,0 +1,526 @@
+# -*- coding: utf-8 -*-
+"""
+Created on Fri Sep. 17 17:10:53 2021
+
+@author: wanxiang.shen@u.nus.edu
+"""
+
+
+import numpy as np
+import pandas as pd
+
+from tqdm import tqdm
+from copy import copy
+import shap
+
+
+from sklearn.metrics import mean_squared_error, log_loss
+from sklearn.preprocessing import StandardScaler
+
+from aggmap.utils.matrixopt import conv2
+from aggmap.utils.logtools import print_warn, print_info
+
+
+
class shapley_explainer:
    """Kernel SHAP based model explanation.

    Limitations of Shapley-value-based explanations are discussed in
    https://christophm.github.io/interpretable-ml-book/shapley.html#disadvantages-16
    <Problems with Shapley-value-based explanations as feature importance measures>.
    The SHAP values do not identify causality. Global mean absolute SHAP
    feature importance is the average impact on model output magnitude.

    Parameters
    ----------
    estimator:
        model with a predict or predict_proba method
    mp:
        aggmap object
    backgroud: string or int
        {'min', 'global_min', 'all', int}.
        if 'min', use the per-feature min values as the backgroud data (equals to 1 sample);
        if 'global_min', use the min value of all data as the backgroud data;
        if int, sample that many samples as the backgroud data;
        if 'all', use all of the train data as the backgroud data for shap.
    k_means_sampling: bool,
        whether to use k-means to sample the backgroud values or not
        (only used when `backgroud` is an int)
    link :
        {"identity", "logit"}. A generalized linear model link to connect the
        feature importance values to the model output. Since the feature
        importance values, phi, sum up to the model output, it often makes
        sense to connect them to the output with a link function where
        link(output) = sum(phi). If the model output is a probability then
        the LogitLink link function makes the feature importance values have
        log-odds units.
    args:
        Other parameters for shap.KernelExplainer.

    Examples
    --------
    >>> import seaborn as sns
    >>> from aggmap.aggmodel.explainer import shapley_explainer
    >>> ## shapley explainer
    >>> shap_explainer = shapley_explainer(estimator, mp)
    >>> global_imp_shap = shap_explainer.global_explain(clf.X_)
    >>> local_imp_shap = shap_explainer.local_explain(clf.X_[[0]])
    >>> ## S-map of shapley explainer
    >>> sns.heatmap(local_imp_shap.shapley_importance_class_1.values.reshape(mp.fmap_shape), 
    >>> cmap = 'rainbow')
    >>> ## shapley plot
    >>> shap.summary_plot(shap_explainer.shap_values, 
    >>> feature_names = shap_explainer.feature_names) # #global  plot_type='bar
    >>> shap.initjs()
    >>> shap.force_plot(shap_explainer.explainer.expected_value[1], 
    >>> shap_explainer.shap_values[1], feature_names = shap_explainer.feature_names)
    """

    def __init__(self, estimator, mp, backgroud = 'min', k_means_sampling = True, link='identity', **args):
        """Build the background data and the underlying shap.KernelExplainer.

        See the class docstring for the meaning of each parameter.
        """
        self.estimator = estimator
        self.mp = mp
        self.link = link
        self.backgroud = backgroud
        self.k_means_sampling = k_means_sampling

        # Flatten the 4D training fmaps into a 2D (n_samples, n_features) frame.
        train_features = self.covert_mpX_to_shapely_df(self.estimator.X_)

        if isinstance(backgroud, int):
            # Sample K background rows, optionally summarised by k-means.
            if self.k_means_sampling:
                self.backgroud_data = shap.kmeans(train_features, backgroud)
            else:
                self.backgroud_data = shap.sample(train_features, backgroud)
        elif backgroud == 'min':
            # One synthetic sample holding each feature's minimum.
            self.backgroud_data = train_features.min().to_frame().T.values
        elif backgroud == 'global_min':
            # One synthetic sample filled with the global minimum value.
            gmin = train_features.min().min()
            self.backgroud_data = np.full(shape=(1, train_features.shape[1]),
                                          fill_value = gmin)
        else:
            # 'all' (or anything else): use the full training set.
            self.backgroud_data = train_features

        self.explainer = shap.KernelExplainer(self._predict_warpper, self.backgroud_data, link=self.link, **args)
        self.feature_names = train_features.columns.tolist() # mp.alist

    def _predict_warpper(self, X):
        """Prediction wrapper fed to KernelExplainer.

        Transforms flat 2D inputs back into AggMap fmaps, then predicts.
        Regression estimators use ``predict``; all others ``predict_proba``.
        """
        X_new = self.mp.batch_transform(X, scale=False)
        if self.estimator.name == 'AggMap Regression Estimator': # case regression task
            predict_results = self.estimator.predict(X_new)
        else:
            predict_results = self.estimator.predict_proba(X_new)
        return predict_results

    def get_shap_values(self, X, nsamples = 'auto', **args):
        """Compute SHAP values for X and cache a per-class importance frame.

        Side effects: sets ``self.shap_values`` and ``self.df_imp``.
        Returns the raw shap values from KernelExplainer.
        """
        df_to_explain = self.covert_mpX_to_shapely_df(X)
        shap_values = self.explainer.shap_values(df_to_explain, nsamples=nsamples, **args)
        # NOTE(review): assumes KernelExplainer returns one array per output
        # class (multi-output model); confirm for single-output regression.
        all_imps = []
        for i, data in enumerate(shap_values):
            name = 'shapley_importance_class_' + str(i)
            # Global importance = mean absolute SHAP value per feature.
            imp = abs(pd.DataFrame(data, columns = self.feature_names)).mean().to_frame(name = name)
            all_imps.append(imp)

        # Align importances with the fmap grid layout; missing cells -> 0.
        df_reshape = self.mp.df_grid_reshape.set_index('v')
        df_reshape.index = self.mp.feature_names_reshape
        df_imp = df_reshape.join(pd.concat(all_imps, axis=1)).fillna(0)
        self.df_imp = df_imp
        self.shap_values = shap_values
        return shap_values

    def local_explain(self, X=None, idx=0, nsamples = 'auto', **args):
        '''
        Explanation of one sample only.

        Parameters
        ----------
        X: None or 4D array, where the shape is (1, w, h, c)
           the 4D array of AggMap multi-channel fmaps.
           Noted if X is None, then use the estimator.X_[[idx]] instead, namely explain the idx-th training sample
        nsamples: {'auto', int}
            Number of times to re-evaluate the model when explaining each prediction.
            More samples lead to lower variance estimates of the SHAP values. The "auto" setting uses nsamples = 2 * X.shape[1] + 2048
        args: other parameters in the shap_values method of shap.KernelExplainer
        '''
        if X is None:
            print_info('Explaining the first sample only')
            # Bug fix: the model is stored as self.estimator, not self.clf.
            X = self.estimator.X_[[idx]]
        assert len(X.shape) == 4, 'input X must be a 4D array: (1, w, h, c)'
        assert len(X) == 1,  'Input X must has one sample only, but got %s' % len(X)

        shap_values = self.get_shap_values(X, nsamples = nsamples, **args)
        self.shap_values = shap_values
        return self.df_imp

    def global_explain(self, X=None, nsamples = 'auto', **args):
        '''
        Explanation of many samples.

        Parameters
        ----------
        X: None or 4D array, where the shape is (n, w, h, c)
           the 4D array of AggMap multi-channel fmaps.
           Noted that if X is None, then use the estimator.X_ instead, namely explain the training set of the estimator
        nsamples: {'auto', int}
            Number of times to re-evaluate the model when explaining each prediction.
            More samples lead to lower variance estimates of the SHAP values. The "auto" setting uses nsamples = 2 * X.shape[1] + 2048
        args: other parameters in the shap_values method of shap.KernelExplainer
        '''
        if X is None:
            # Bug fix: the model is stored as self.estimator, not self.clf.
            X = self.estimator.X_
            print_info('Explaining the whole samples of the training Set')
        assert len(X.shape) == 4, 'input X must be a 4D array: (n, w, h, c)'

        shap_values = self.get_shap_values(X, nsamples = nsamples, **args)
        self.shap_values = shap_values
        return self.df_imp

    def _covert_x_2D(self, X, feature_names):
        """Collapse the channel axis and flatten each (w, h) fmap to a row.

        Returns a DataFrame of shape (n, w*h) with `feature_names` columns.
        """
        n, w, h, c = X.shape
        assert len(feature_names) == w*h, 'length of feature_names should be w*h of X.shape (n, w, h,c)'
        X_2D = X.sum(axis=-1).reshape(n, w*h)
        return pd.DataFrame(X_2D, columns = feature_names)

    def covert_mpX_to_shapely_df(self, X):
        """Convert 4D AggMap fmaps to the 2D frame SHAP expects.

        Columns are re-ordered to mp.alist; features absent from the grid
        are filled with 0.
        """
        dfx_stack_reshape = self._covert_x_2D(X, feature_names = self.mp.feature_names_reshape)
        shapely_df = pd.DataFrame(index=self.mp.alist).join(dfx_stack_reshape.T).T
        shapely_df = shapely_df.fillna(0)
        return shapely_df
+
+    
+    
+
class simply_explainer:

    """Simply-explainer for model explanation via feature permutation.

    Each grid cell is replaced in turn by its background value; the increase
    in model loss relative to the unperturbed loss is that cell's importance.

    Parameters
    ----------
    estimator: object
        model with a predict or predict_proba method
    mp: object
        aggmap object
    backgroud: {'min', 'global_min', 'zeros'}, default: 'min'.
        if 'min', use the per-feature min value of the training set,
        if 'global_min', use the min value of the whole training set,
        if 'zeros', use all zeros as the backgroud data.
    apply_logrithm: bool, default: False
        apply the logarithm to the feature importance score
    apply_smoothing: bool, default: False
        apply gaussian smoothing on the feature importance score (Saliency map)
    kernel_size: int, default: 5.
        the kernel size for the smoothing
    sigma: float, default: 1.0.
        the sigma for the smoothing.

    Examples
    --------
    >>> import seaborn as sns
    >>> from aggmap.aggmodel.explainer import simply_explainer
    >>> simp_explainer = simply_explainer(estimator, mp)
    >>> global_imp_simp = simp_explainer.global_explain(clf.X_, clf.y_)
    >>> local_imp_simp = simp_explainer.local_explain(clf.X_[[0]], clf.y_[[0]])    
    >>> ## S-map of simply explainer
    >>> sns.heatmap(local_imp_simp.simply_importance.values.reshape(mp.fmap_shape), 
    >>> cmap = 'rainbow')
    """

    def __init__(self,
                 estimator,
                 mp,
                 backgroud = 'min',
                 apply_logrithm = False,
                 apply_smoothing = False,
                 kernel_size = 5,
                 sigma = 1.
                ):
        """Set up the background vector, scaler, grid frame and loss function.

        See the class docstring for parameter meanings.
        """
        self.estimator = estimator
        self.mp = mp
        self.apply_logrithm = apply_logrithm
        self.apply_smoothing = apply_smoothing
        self.kernel_size = kernel_size
        self.sigma = sigma
        self.backgroud = backgroud
        # Background vector: one fill value per grid cell (flattened fmap).
        if backgroud == 'min':
            self.backgroud_data = mp.transform_mpX_to_df(self.estimator.X_).min().values
        elif backgroud == 'zeros':
            self.backgroud_data = np.zeros(shape=(len(mp.df_grid_reshape), ))
        else:
            # 'global_min' (or anything else): one scalar for every cell.
            gmin = self.estimator.X_.min()
            self.backgroud_data = np.full(shape=(len(mp.df_grid_reshape), ),
                                          fill_value = gmin)

        self.scaler = StandardScaler()

        # Grid frame mapping each feature name to its (x, y) fmap position.
        df_grid = mp.df_grid_reshape.set_index('v')
        df_grid.index = self.mp.feature_names_reshape
        self.df_grid = df_grid

        # Loss: MSE for regression, log-loss for classification tasks.
        if self.estimator.name == 'AggMap Regression Estimator':
            self._f = mean_squared_error
        else:
            self._f = log_loss

    def _sigmoid(self, x):
        """Elementwise logistic function (used for multi-label logits)."""
        return 1 / (1 + np.exp(-x))

    def _islice(self, lst, n):
        """Split `lst` into consecutive chunks of length n (last may be shorter)."""
        return [lst[i:i + n] for i in range(0, len(lst), n)]

    def global_explain(self, X=None, y=None):
        '''
        Explanation of many samples.

        Parameters
        ----------
        X: None or 4D array, where the shape is (n, w, h, c)
           the 4D array of AggMap multi-channel fmaps
        y: None or 2D array, where the shape is (n, class_num)
           the True label
        Noted that if X and y are None, then use the estimator.X_ and estimator.y_ instead, namely explain the training set of the estimator
        '''

        if X is None:
            X = self.estimator.X_
            y = self.estimator.y_
            print_info('Explaining the whole samples of the training Set')

        assert len(X.shape) == 4, 'input X must be a 4D array: (n, w, h, c)'
        N, W, H, C = X.shape

        dfY = pd.DataFrame(y)
        Y_true = y
        Y_prob = self.estimator._model.predict(X, verbose = 0)

        T = len(self.df_grid)
        nX = 20 # number of permuted copies of X to predict per model call

        if self.estimator.name == 'AggMap MultiLabels Estimator':
            Y_prob = self._sigmoid(Y_prob)
        final_res = {}
        for k, col in enumerate(dfY.columns):
            print_info('calculating feature importance for class %s ...' % col)
            results = []
            # Baseline loss on the unperturbed inputs for this class.
            loss = self._f(Y_true[:, k].tolist(), Y_prob[:, k].tolist())

            tmp_X = []
            n_pending = 0
            for i in tqdm(range(T), ascii= True):
                ts = self.df_grid.iloc[i]
                y = ts.y
                x = ts.x
                ## step 1: make permutations (mask one grid cell with its background value)
                X1 = np.array(X)
                vmin = self.backgroud_data[i]
                X1[:, y, x,:] = np.full(X1[:, y, x,:].shape, fill_value = vmin)
                tmp_X.append(X1)
                # Bug fix: increment BEFORE the flush check so batches hold
                # exactly nX copies (the original off-by-one yielded nX+1).
                n_pending += 1

                if (n_pending == nX) | (i == T-1):
                    X2p = np.concatenate(tmp_X)
                    ## step 2: make predictions in one batched call (predicting
                    ## one by one is not efficient)
                    Y_pred_prob = self.estimator._model.predict(X2p, verbose = 0)
                    if self.estimator.name == 'AggMap MultiLabels Estimator':
                        Y_pred_prob = self._sigmoid(Y_pred_prob)

                    ## step 3: calculate loss change per permuted copy
                    for Y_pred in self._islice(Y_pred_prob, N):
                        mut_loss = self._f(Y_true[:, k].tolist(), Y_pred[:, k].tolist())
                        res =  mut_loss - loss # if res > 0, important, otherwise, not important
                        results.append(res)
                    n_pending = 0
                    tmp_X = []

            ## step 4: apply scaling or smoothing
            s = pd.DataFrame(results).values
            if self.apply_logrithm:
                s = np.log(s)
            smin = np.nanmin(s[s != -np.inf])
            smax = np.nanmax(s[s != np.inf])
            s = np.nan_to_num(s, nan=smin, posinf=smax, neginf=smin) #fillna with smin
            a = self.scaler.fit_transform(s)
            a = a.reshape(*self.mp.fmap_shape)
            if self.apply_smoothing:
                covda = conv2(a, kernel_size=self.kernel_size, sigma=self.sigma)
                results = covda.reshape(-1,).tolist()
            else:
                results = a.reshape(-1,).tolist()
            final_res.update({col:results})

        df = pd.DataFrame(final_res, index = self.mp.feature_names_reshape)
        df.columns = df.columns.astype(str)
        df.columns = 'simply_importance_class_' + df.columns
        df = self.df_grid.join(df)
        return df

    def local_explain(self, X=None, y=None, idx=0):
        '''
        Explanation of one sample only.

        Parameters
        ----------
        X: None or 4D array, where the shape is (1, w, h, c)
        y: the True label, None or 2D array, where the shape is (1, class_num).
        idx: int,
             index of the sample to interpret
             Noted that if X and y are None, then use the estimator.X_[[idx]] and estimator.y_[[idx]] instead, namely explain the first sample if idx=0.

        Return
        ----------
            Feature importance of the current class

        '''

        if X is None:
            X = self.estimator.X_[[idx]]
            y = self.estimator.y_[[idx]]
            print_info('Explaining the one sample of the training Set')

        assert len(X.shape) == 4, 'input X must be a 4D array: (1, w, h, c)'
        assert (len(X) == 1) & (len(y) == 1), 'Input X, y must have one sample only, but got %s, %s' % (len(X), len(y))

        N, W, H, C = X.shape

        dfY = pd.DataFrame(y)
        Y_true = y
        Y_prob = self.estimator._model.predict(X, verbose = 0)

        T = len(self.df_grid)

        if self.estimator.name == 'AggMap MultiLabels Estimator':
            Y_prob = self._sigmoid(Y_prob)

        results = []
        # Baseline loss on the unperturbed sample.
        loss = self._f(Y_true.ravel().tolist(),  Y_prob.ravel().tolist())

        # One permuted copy per grid cell; all predicted in a single batch.
        all_X1 = []
        for i in tqdm(range(T), ascii= True):
            ts = self.df_grid.iloc[i]
            y = ts.y
            x = ts.x
            X1 = np.array(X)
            vmin = self.backgroud_data[i]
            X1[:, y, x,:] = np.full(X1[:, y, x,:].shape, fill_value = vmin)
            all_X1.append(X1)

        all_X = np.concatenate(all_X1)
        all_Y_pred_prob = self.estimator._model.predict(all_X, verbose = 0)

        for Y_pred_prob in all_Y_pred_prob:
            if self.estimator.name == 'AggMap MultiLabels Estimator':
                Y_pred_prob = self._sigmoid(Y_pred_prob)
            mut_loss = self._f(Y_true.ravel().tolist(), Y_pred_prob.ravel().tolist())
            res =  mut_loss - loss # if res > 0, important, otherwise, not important
            results.append(res)

        ## apply smoothing and scalings
        s = pd.DataFrame(results).values
        if self.apply_logrithm:
            s = np.log(s)
        smin = np.nanmin(s[s != -np.inf])
        smax = np.nanmax(s[s != np.inf])
        s = np.nan_to_num(s, nan=smin, posinf=smax, neginf=smin) #fillna with smin
        a = self.scaler.fit_transform(s)
        a = a.reshape(*self.mp.fmap_shape)
        if self.apply_smoothing:
            covda = conv2(a, kernel_size=self.kernel_size, sigma=self.sigma)
            results = covda.reshape(-1,).tolist()
        else:
            results = a.reshape(-1,).tolist()

        df = pd.DataFrame(results,
                          index = self.mp.feature_names_reshape,
                          columns = ['simply_importance'])
        df = self.df_grid.join(df)
        return df
+
+    
+
if __name__ == '__main__':
    '''
    Model explaination using two methods: simply explainer and shapley explainer
    '''
    # NOTE(review): this demo block references `estimator`, `mp` and `clf`,
    # none of which are defined in this module -- running it directly raises
    # NameError. Presumably `estimator`/`clf` are a fitted AggMap estimator
    # and `mp` the fitted aggmap object from a prior session; confirm and
    # define them before executing.

    import seaborn as sns
    
    ## simply explainer
    simp_explainer = simply_explainer(estimator, mp)
    global_imp_simp = simp_explainer.global_explain(clf.X_, clf.y_)
    local_imp_simp = simp_explainer.local_explain(clf.X_[[0]], clf.y_[[0]])    
    
    ## S-map of simply explainer: reshape the flat importance vector back
    ## onto the fmap grid and plot it as a heatmap.
    sns.heatmap(local_imp_simp.simply_importance.values.reshape(mp.fmap_shape), cmap = 'rainbow')
    
    
    
    ## shapley explainer
    shap_explainer = shapley_explainer(estimator, mp)
    global_imp_shap = shap_explainer.global_explain(clf.X_)
    local_imp_shap = shap_explainer.local_explain(clf.X_[[0]])
    
    ## S-map of shapley explainer (class-1 importance on the fmap grid)
    sns.heatmap(local_imp_shap.shapley_importance_class_1.values.reshape(mp.fmap_shape), cmap = 'rainbow')

    ## shapley plot
    shap.summary_plot(shap_explainer.shap_values, feature_names = shap_explainer.feature_names) # #global  plot_type='bar
    shap.initjs()
    shap.force_plot(shap_explainer.explainer.expected_value[1], shap_explainer.shap_values[1], feature_names = shap_explainer.feature_names)