--- /dev/null
+++ b/modules/cluster.py
@@ -0,0 +1,140 @@
+import sys
+sys.path.append('..')
+
+# progress bar import
+from tqdm import tqdm
+
+# numpy, sklearn imports
+import numpy as np
+from sklearn.cluster import KMeans
+from sklearn.decomposition import PCA
+from sklearn.metrics import silhouette_score
+
+# utils imports
+from utils.dataset import ImageCLEFDataset
+
+
+class Cluster:
+
+    def __init__(self, K: int, clef_dataset: ImageCLEFDataset):
+        """ Performs K-Means clustering on the ImageCLEF dataset. We used this system for the contest.
+
+        Args:
+            K (int): The number of clusters.
+            clef_dataset (ImageCLEFDataset): The dataset to cluster. Only ImageCLEF is supported.
+        """
+        self.K = K
+        self.dataset = clef_dataset
+
+    def do_PCA(self, features: dict) -> np.ndarray:
+        """ Performs Principal Component Analysis (PCA) to reduce the dimensionality of the image vectors.
+
+        Args:
+            features (dict): The image_id, image_vector pairs.
+
+        Returns:
+            np.ndarray: The image vectors, reduced to 100 components each.
+        """
+        feat = np.array(list(features.values()))
+        # collapse the singleton axis: (N, 1, D) -> (N, D)
+        feat = feat.reshape(-1, feat.shape[2])
+
+        pca = PCA(n_components=100, random_state=22)
+        pca.fit(feat)
+        x = pca.transform(feat)
+        return x
+
+    def do_Kmeans(self, x: np.ndarray) -> KMeans:
+        """ Fits the K-Means estimator.
+
+        Args:
+            x (np.ndarray): The image vectors.
+
+        Returns:
+            KMeans: The fitted K-Means object.
+        """
+        kmeans = KMeans(n_clusters=self.K, random_state=22)
+        kmeans.fit(x)
+        return kmeans
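+
+    def silhouette_for_k(self, x: np.ndarray) -> float:
+        """ Illustrative sketch, not part of the original pipeline: scores the
+        current K with the mean silhouette coefficient, assuming x is the
+        PCA-reduced matrix returned by do_PCA (higher is better, range [-1, 1]).
+        """
+        kmeans = self.do_Kmeans(x)
+        return silhouette_score(x, kmeans.labels_)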
+ """ + # load splits + train_features, valid_features, test_features = self.load_features() + # get the ids for each split + train_ids, val_ids, test_ids = list(train_features[0].keys()), list(valid_features[0].keys()), list(test_features[0].keys()) + + # concate all features to perform a more efficient K-Means + all_features = dict(train_features, **valid_features) + all_features = dict(all_features, **test_features) + + # reduce size for fast training + pca = self.do_PCA(all_features) + # perform clustering + kmeans = self.do_Kmeans(pca) + + train_index_limit, val_index_limit = len(train_features), len(train_features)+len(valid_features) + # get the clustering labels for each set + train_k_means_labels = kmeans.labels_[:train_index_limit] + valid_k_means_labels = kmeans.labels_[train_index_limit:val_index_limit] + test_k_means_labels = kmeans.labels_[val_index_limit:] + + + print('# train kmeans:', len(train_k_means_labels)) + print('# dev kmeans:', len(valid_k_means_labels)) + print('# test kmeans:', len(test_k_means_labels)) + + + # store the clustered train, validation, test set images + groups_train = {} + for file, cluster in tqdm(zip(train_ids, train_k_means_labels)): + if cluster not in groups_train.keys(): + groups_train[cluster] = [] + groups_train[cluster].append(file) + else: + groups_train[cluster].append(file) + + groups_valid = {} + for file, cluster in tqdm(zip(val_ids, valid_k_means_labels)): + if cluster not in groups_valid.keys(): + groups_valid[cluster] = [] + groups_valid[cluster].append(file) + else: + groups_valid[cluster].append(file) + + groups_test = {} + for file, cluster in tqdm(zip(test_ids, test_k_means_labels)): + if cluster not in groups_test.keys(): + groups_test[cluster] = [] + groups_test[cluster].append(file) + else: + groups_test[cluster].append(file) + + print('# train kmeans:', len(groups_train)) + print('# dev kmeans:', len(groups_valid)) + print('# test kmeans:', len(groups_test)) + return groups_train, groups_valid, groups_test + + +