[03245f]: / modules / cluster.py

Download this file

135 lines (102 with data), 4.8 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# import sys
import sys
sys.path.append('..')
# progress bar import
from tqdm import tqdm
# numpy, sklearn imports
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_samples, silhouette_score
from tensorflow.keras.preprocessing.image import load_img
# utils imports
from models import *
from utils.dataset import ImageCLEFDataset
class Cluster:
    def __init__(self, K: int, clef_dataset: ImageCLEFDataset):
        """Class to perform K-Means clustering on the ImageCLEF dataset.

        We used this system for the contest.

        Args:
            K (int): The number of clusters (K) we want.
            clef_dataset (ImageCLEFDataset): The dataset we employed. Only CLEF is acceptable.
        """
        self.K = K
        self.dataset = clef_dataset

    def do_PCA(self, features: dict) -> np.array:
        """Performs Principal Component Analysis (PCA) to reduce the huge size of the arrays.

        Args:
            features (dict): The image_ids, image_vectors pairs.

        Returns:
            np.array: The image vectors, reduced to 100 components each.
        """
        feat = np.array(list(features.values()))
        # Flatten to (N, D). NOTE(review): assumes each stored vector is
        # shaped (1, D) so the stacked array is 3-D — confirm against the
        # dataset's feature extractor.
        feat = feat.reshape(-1, feat.shape[2])
        # Fixed random_state keeps the projection reproducible across runs.
        pca = PCA(n_components=100, random_state=22)
        pca.fit(feat)
        return pca.transform(feat)

    def do_Kmeans(self, x: np.array) -> KMeans:
        """Fit the K-Means.

        Args:
            x (np.array): The image vectors.

        Returns:
            KMeans: The fitted K-Means object.
        """
        kmeans = KMeans(n_clusters=self.K, random_state=22)
        kmeans.fit(x)
        return kmeans

    def load_features(self) -> tuple[list[dict], list[dict], list[dict]]:
        """Loads the train, validation, and test sets.

        Returns:
            tuple[list[dict], list[dict], list[dict]]: The train, validation,
            test sets in dictionary format.
        """
        return self.dataset.get_splits_sets()

    def _group(self, pairs) -> dict:
        """Bucket (image_id, cluster_label) pairs into {label: [image_ids]}.

        Args:
            pairs: Iterable of (image_id, cluster_label) tuples.

        Returns:
            dict: Mapping from cluster label to the list of image ids in it.
        """
        groups = {}
        for file, cluster in pairs:
            # setdefault replaces the old copy-pasted if/else whose two
            # branches both appended — same behavior, one code path.
            groups.setdefault(cluster, []).append(file)
        return groups

    def clustering(self) -> tuple[dict, dict, dict]:
        """Performs the K-Means clustering using the fitted K-Means object.

        Returns:
            tuple[dict, dict, dict]: The clustered train, val, test
            image_ids, image_vectors pairs.
        """
        # load splits
        train_features, valid_features, test_features = self.load_features()
        # get the ids for each split
        train_ids = list(train_features[0].keys())
        val_ids = list(valid_features[0].keys())
        test_ids = list(test_features[0].keys())
        # concatenate all features to perform a more efficient K-Means
        # NOTE(review): train_features is indexed with [0] above (list-like)
        # yet merged with dict(...) here and measured with len() below
        # (mapping-like) — verify the exact shape get_splits_sets() returns.
        all_features = dict(train_features, **valid_features)
        all_features = dict(all_features, **test_features)
        # reduce size for fast training
        pca = self.do_PCA(all_features)
        # perform clustering
        kmeans = self.do_Kmeans(pca)
        # Labels come back in insertion order, so slice them back into splits.
        train_index_limit = len(train_features)
        val_index_limit = train_index_limit + len(valid_features)
        train_k_means_labels = kmeans.labels_[:train_index_limit]
        valid_k_means_labels = kmeans.labels_[train_index_limit:val_index_limit]
        test_k_means_labels = kmeans.labels_[val_index_limit:]
        print('# train kmeans:', len(train_k_means_labels))
        print('# dev kmeans:', len(valid_k_means_labels))
        print('# test kmeans:', len(test_k_means_labels))
        # store the clustered train, validation, test set images
        groups_train = self._group(tqdm(zip(train_ids, train_k_means_labels)))
        groups_valid = self._group(tqdm(zip(val_ids, valid_k_means_labels)))
        groups_test = self._group(tqdm(zip(test_ids, test_k_means_labels)))
        print('# train kmeans:', len(groups_train))
        print('# dev kmeans:', len(groups_valid))
        print('# test kmeans:', len(groups_test))
        return groups_train, groups_valid, groups_test