##############################################
#                                            #
#   Custom data generator                    #
#                                            #
#   Author: Amine Neggazi                    #
#   Email: neggazimedlamine@gmail/com        #
#   Nick: nemo256                            #
#                                            #
#   Please read bc-count/LICENSE             #
#                                            #
##############################################

import os
import json

import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras

# custom imports
from config import *


def _pad_spatial(arr, padding):
    '''
    Zero-pad the two leading axes of an array by the same amount on each side.
    Any further axis (e.g. a channel axis) is left unpadded.
    :param arr --> the array to pad (at least 2 dimensions)
    :param padding --> number of elements added before and after each leading axis

    :return padded --> the padded copy of arr
    '''
    widths = ((padding, padding), (padding, padding)) + ((0, 0),) * (arr.ndim - 2)
    return np.pad(arr, widths, mode='constant')


def _window(center, size, scale=1):
    '''
    Compute the integer (start, stop) bounds of a window around a centre.
    :param center --> the centre coordinate of the window
    :param size --> the unscaled window length
    :param scale --> multiplier applied to half the window length

    :return tuple(start, stop) --> both bounds truncated to int
    '''
    half = (size / 2) * scale
    return int(center - half), int(center + half)


def _normalize_chip(chip):
    '''
    Rescale a chip to float32 using value * 2 / 255 - 1.
    :param chip --> the image chip to rescale

    :return chip --> a new float32 array
    '''
    chip = chip.astype(np.float32) * 2
    chip /= 255
    chip -= 1
    return chip


def _as_target(arr):
    '''
    Turn a mask/edge chip into a binary float target with a trailing axis.
    :param arr --> the mask or edge chip

    :return target --> 1.0 where arr > 0, 0.0 elsewhere, with one extra last axis
    '''
    return (arr > 0).astype(float)[..., np.newaxis]


def _chip_centers(length, padding, input_size, output_size):
    '''
    Centre coordinates used to tile one image axis with chips.
    :param length --> size of the image along the axis
    :param padding --> the padding that is part of the first centre offset
    :param input_size --> the input chip size
    :param output_size --> the output chip size, also used as the step

    :return centers --> numpy array of centre coordinates
    '''
    center_offset = padding + (output_size / 2)
    return np.arange(center_offset, length - input_size / 2, output_size)


def load_image_list(img_files, gray=False):
    '''
    This is the load image list function, which reads every file in
    img_files with OpenCV.
    :param img_files --> the input image files which we want to read
    :param gray --> when True, read each file as grayscale and binarise it
                    with a threshold of 127 (output values are 0 or 255)

    :return imgs --> the images that we read, in the same order as img_files
    '''
    if not gray:
        return [cv2.imread(image_file) for image_file in img_files]

    imgs = []
    for image_file in img_files:
        img = cv2.imread(image_file, cv2.IMREAD_GRAYSCALE)
        # cv2.threshold returns (retval, image); only the image is kept
        imgs.append(cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)[1])
    return imgs


def clahe_images(img_list):
    '''
    This is the clahe images function, which applies CLAHE to the first
    channel of the LAB conversion of every image in the list.
    The list is modified in place and also returned.
    :param img_list --> the input list of BGR images

    :return img_list --> the same list, now holding the processed images
    '''
    # the CLAHE settings are constant, so a single object serves every image
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))

    for i, img in enumerate(img_list):
        lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
        lab[..., 0] = clahe.apply(lab[..., 0])
        img_list[i] = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
    return img_list


def preprocess_image(imgs, padding=padding[1]):
    '''
    This is the preprocess image function, which adds a padding to
    the input images.
    :param imgs --> the input list of images.
    :param padding --> the input padding which is going to be applied
                       (default comes from config: padding[1]).

    :return imgs --> output images with added padding.
    '''
    return [_pad_spatial(img, padding) for img in imgs]


def preprocess_data(imgs, mask, edge=None, padding=padding[1]):
    '''
    This is the preprocess data function, which adds a padding to
    the input images, masks and edges if there are any.
    :param imgs --> the input list of images.
    :param mask --> the input list of masks.
    :param edge --> the input list of edges (optional).
    :param padding --> the input padding which is going to be applied
                       (default comes from config: padding[1]).

    :return tuple(imgs, mask, edge if exists) --> output images, masks and edges with padding added.
    '''
    imgs = [_pad_spatial(img, padding) for img in imgs]
    mask = [_pad_spatial(single_mask, padding) for single_mask in mask]
    if edge is None:
        return imgs, mask

    edge = [_pad_spatial(single_edge, padding) for single_edge in edge]
    return imgs, mask, edge


def load_data(img_list, mask_list, edge_list=None, padding=padding[1]):
    '''
    This is the load data function, which will handle image loading and preprocessing.
    :param img_list --> list of input images
    :param mask_list --> list of input masks
    :param edge_list --> list of input edges (skipped when empty or None)
    :param padding --> padding to be applied on preprocessing

    :return tuple(imgs, masks and edges if exists) --> the output preprocessed imgs, masks and edges.
    '''
    imgs = clahe_images(load_image_list(img_list))
    mask = load_image_list(mask_list, gray=True)
    edge = load_image_list(edge_list, gray=True) if edge_list else None

    return preprocess_data(imgs, mask, edge, padding=padding)


def load_image(img_list, padding=padding[1]):
    '''
    This is the load image function, which will handle image loading and preprocessing
    when there are no masks or edges.
    :param img_list --> list of input images
    :param padding --> padding to be applied on preprocessing

    :return imgs --> the output preprocessed imgs.
    '''
    imgs = clahe_images(load_image_list(img_list))
    return preprocess_image(imgs, padding=padding)


def aug_lum(image, factor=None):
    '''
    This is the augment luminosity function, which we apply to
    augment the luminosity of an input image.
    :param image --> the input image we want to augment
    :param factor --> the multiplier applied to the V channel
                      (default is 0.5 + a uniform random number in [0, 1))

    :return image --> the output luminosity augmented image
    '''
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV).astype(np.float64)

    # the random draw only happens when no explicit factor is given
    lum_offset = 0.5 + np.random.uniform() if factor is None else factor

    # scale the V channel and cap it at 255 before going back to uint8
    hsv[..., 2] = np.minimum(hsv[..., 2] * lum_offset, 255)
    hsv = hsv.astype(np.uint8)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


def aug_img(image):
    '''
    This is the augment colors function, which we apply to
    augment the colors of a given image by scaling its H, S and V
    channels with independent random factors.
    :param image --> the input image we want to augment

    :return image --> the output colors augmented image
    '''
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV).astype(np.float64)

    # draw order (hue, saturation, luminosity) is kept so seeded runs match
    hue_offset = 0.8 + 0.4*np.random.uniform()
    sat_offset = 0.5 + np.random.uniform()
    lum_offset = 0.5 + np.random.uniform()

    # scale each channel by its own factor, then cap every channel at 255
    hsv = hsv * np.array([hue_offset, sat_offset, lum_offset])
    np.minimum(hsv, 255, out=hsv)

    hsv = hsv.astype(np.uint8)

    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)


def train_generator(imgs, mask, edge=None,
                    scale_range=None,
                    padding=padding[1],
                    input_size=input_shape[0],
                    output_size=output_shape[0],
                    skip_empty=False):
    '''
    This is the train generator function, which generates the train dataset.
    It never terminates: every iteration yields one randomly located,
    randomly augmented chip.
    :param imgs --> the input images
    :param mask --> the input masks
    :param edge --> the input edges if there are any (red blood cells only)
    :param scale_range --> the half-width r of the rescaling interval; the
                           scale is drawn uniformly from [1 - r, 1 + r)
    :param padding --> the padding which will be applied to each image
    :param input_size --> the input shape
    :param output_size --> the output shape
    :param skip_empty --> filter candidate chips on whether their mask has
                          more than 5 positive pixels (no filtering if not set)

    :return chips --> yields an image, mask and edge chip each time it gets executed (called)
    '''
    if scale_range is not None:
        scale_range = [1 - scale_range, 1 + scale_range]

    # distance kept between a chip centre and the image border
    center_offset = padding + (output_size / 2)

    while True:
        # select which type of cell to return
        chip_type = np.random.choice([True, False])

        while True:
            # pick random image
            i = np.random.randint(len(imgs))

            # pick random central location in the image; bounds are
            # truncated to int explicitly instead of handing floats to randint
            x = np.random.randint(int(center_offset),
                                  int(imgs[i].shape[0] - center_offset))
            y = np.random.randint(int(center_offset),
                                  int(imgs[i].shape[1] - center_offset))

            # scale the box randomly within scale_range
            scale = 1
            if scale_range is not None:
                # fixed: the span used to be scale_range[0] - scale_range[0],
                # which pinned the scale to the lower bound
                span = scale_range[1] - scale_range[0]
                scale = scale_range[0] + (span * np.random.random())

            # find the edges of a box around the image chip and the mask chip
            chip_x_l, chip_x_r = _window(x, input_size, scale)
            chip_y_l, chip_y_r = _window(y, input_size, scale)
            mask_x_l, mask_x_r = _window(x, output_size, scale)
            mask_y_l, mask_y_r = _window(y, output_size, scale)

            # take a slice of the image and mask accordingly
            temp_chip = imgs[i][chip_x_l:chip_x_r, chip_y_l:chip_y_r]
            temp_mask = mask[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
            if edge is not None:
                temp_edge = edge[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]

            if skip_empty:
                has_cells = (temp_mask > 0).sum() > 5
                # compare by value: identity (`is`) between NumPy bool
                # scalars is an implementation detail.
                # NOTE(review): the candidate is rejected when the two flags
                # are EQUAL, exactly as before — confirm this is the intent.
                if bool(has_cells) == bool(chip_type):
                    continue

            # resize the image chip back to input_size and the mask chip to output_size
            temp_chip = cv2.resize(temp_chip,
                                   (input_size, input_size),
                                   interpolation=cv2.INTER_CUBIC)
            temp_mask = cv2.resize(temp_mask,
                                   (output_size, output_size),
                                   interpolation=cv2.INTER_NEAREST)
            if edge is not None:
                temp_edge = cv2.resize(temp_edge,
                                       (output_size, output_size),
                                       interpolation=cv2.INTER_NEAREST)

            # randomly rotate by a multiple of 90 degrees
            rot = np.random.randint(4)
            temp_chip = np.rot90(temp_chip, k=rot, axes=(0, 1))
            temp_mask = np.rot90(temp_mask, k=rot, axes=(0, 1))
            if edge is not None:
                temp_edge = np.rot90(temp_edge, k=rot, axes=(0, 1))

            # randomly flip
            if np.random.random() > 0.5:
                temp_chip = np.flip(temp_chip, axis=1)
                temp_mask = np.flip(temp_mask, axis=1)
                if edge is not None:
                    temp_edge = np.flip(temp_edge, axis=1)

            # randomly luminosity augment
            temp_chip = aug_lum(temp_chip)

            # randomly augment chip
            temp_chip = aug_img(temp_chip)

            # rescale the image
            temp_chip = _normalize_chip(temp_chip)

            if edge is not None:
                yield temp_chip, (_as_target(temp_mask), _as_target(temp_edge))
            else:
                yield temp_chip, _as_target(temp_mask)
            # a chip was produced: go back and draw a new chip_type
            break


def test_chips(imgs, mask,
               edge=None,
               padding=padding[1],
               input_size=input_shape[0],
               output_size=output_shape[0]):
    '''
    This is the test chips function, which generates the test dataset by
    tiling an image with chips on a regular grid (step = output_size).
    :param imgs --> the input images
    :param mask --> the input masks
    :param edge --> the input edges if there are any (red blood cells only)
    :param padding --> the padding which will be applied to each image
    :param input_size --> the input shape
    :param output_size --> the output shape

    :return chips --> yields an image, mask and edge chip each time it gets executed (called)
    '''
    for i, img in enumerate(imgs):
        for x in _chip_centers(img.shape[0], padding, input_size, output_size):
            for y in _chip_centers(img.shape[1], padding, input_size, output_size):
                chip_x_l, chip_x_r = _window(x, input_size)
                chip_y_l, chip_y_r = _window(y, input_size)
                mask_x_l, mask_x_r = _window(x, output_size)
                mask_y_l, mask_y_r = _window(y, output_size)

                temp_chip = img[chip_x_l:chip_x_r, chip_y_l:chip_y_r]
                temp_mask = mask[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
                if edge is not None:
                    temp_edge = edge[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]

                temp_chip = _normalize_chip(temp_chip)

                if edge is not None:
                    yield temp_chip, (_as_target(temp_mask), _as_target(temp_edge))
                else:
                    yield temp_chip, _as_target(temp_mask)
        # NOTE(review): the nesting level of this `break` could not be
        # recovered from the whitespace-collapsed source. It is placed at the
        # image loop, so only the first image is chipped — confirm against
        # the original file.
        break


def slice_image(imgs,
                padding=padding[1],
                input_size=input_shape[0],
                output_size=output_shape[0]):
    '''
    This is the slice image function, which slices each image into
    normalised image chips on a regular grid (step = output_size).
    :param imgs --> the input images
    :param padding --> the padding which will be applied to each image
    :param input_size --> the input shape
    :param output_size --> the output shape

    :return img_chips --> numpy array holding every image chip
    '''
    img_chips = []

    for img in imgs:
        for x in _chip_centers(img.shape[0], padding, input_size, output_size):
            for y in _chip_centers(img.shape[1], padding, input_size, output_size):
                chip_x_l, chip_x_r = _window(x, input_size)
                chip_y_l, chip_y_r = _window(y, input_size)

                temp_chip = img[chip_x_l:chip_x_r, chip_y_l:chip_y_r]
                img_chips.append(_normalize_chip(temp_chip))
    return np.array(img_chips)


def slice(imgs, mask,
          edge=None,
          padding=padding[1],
          input_size=input_shape[0],
          output_size=output_shape[0]):
    '''
    This is the slice function, which slices each image into image chips
    together with the matching mask (and edge) chips.
    The name shadows the builtin `slice`; it is kept so existing callers
    continue to work.
    :param imgs --> the input images
    :param mask --> the input masks
    :param edge --> the input edges if there are any (red blood cells only)
    :param padding --> the padding which will be applied to each image
    :param input_size --> the input shape
    :param output_size --> the output shape

    :return tuple(img_chips, mask_chips, edge_chips if edge exists) --> numpy arrays of chips
    '''
    img_chips = []
    mask_chips = []
    edge_chips = []

    for i, img in enumerate(imgs):
        for x in _chip_centers(img.shape[0], padding, input_size, output_size):
            for y in _chip_centers(img.shape[1], padding, input_size, output_size):
                chip_x_l, chip_x_r = _window(x, input_size)
                chip_y_l, chip_y_r = _window(y, input_size)
                mask_x_l, mask_x_r = _window(x, output_size)
                mask_y_l, mask_y_r = _window(y, output_size)

                temp_chip = img[chip_x_l:chip_x_r, chip_y_l:chip_y_r]
                temp_mask = mask[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]

                img_chips.append(_normalize_chip(temp_chip))
                mask_chips.append(_as_target(temp_mask))
                if edge is not None:
                    temp_edge = edge[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
                    edge_chips.append(_as_target(temp_edge))

    img_chips = np.array(img_chips)
    mask_chips = np.array(mask_chips)
    if edge is None:
        return img_chips, mask_chips

    return img_chips, mask_chips, np.array(edge_chips)


def generator(img_list, mask_list, edge_list=None, type='train'):
    '''
    This is the generator function, which provides the list of image, mask and edge lists to the train generator and test chips functions.
    The global cell_type (from config) selects whether edges are used.
    The parameter name `type` shadows the builtin; it is kept so existing
    keyword callers continue to work.
    :param img_list --> the input list of images
    :param mask_list --> the input list of masks
    :param edge_list --> the input list of edges if there are any
    :param type --> can be either train or test, used to determine which generator function is to be called

    :return tensorflow dataset --> the output generated functions fed to tensorflow
    :raise ValueError --> when type is not 'train'/'test' or cell_type is not 'rbc'/'wbc'/'plt'
    '''
    # fail early instead of building a dataset whose generator returns None
    if type not in ('train', 'test'):
        raise ValueError(f"type must be 'train' or 'test', got {type!r}")

    if cell_type == 'rbc':
        img, mask, edge = load_data(img_list, mask_list, edge_list)
    elif cell_type in ('wbc', 'plt'):
        img, mask = load_data(img_list, mask_list)
        edge = None
    else:
        raise ValueError(f"unsupported cell_type: {cell_type!r}")

    def gen():
        chipper = train_generator if type == 'train' else test_chips
        return chipper(img, mask, edge,
                       padding=padding[0],
                       input_size=input_shape[0],
                       output_size=output_shape[0])

    # load the dataset to tensorflow; rbc has two targets (mask and edge)
    if cell_type == 'rbc':
        return tf.data.Dataset.from_generator(
            gen,
            (tf.float64, ((tf.float64), (tf.float64))),
            (input_shape, (output_shape, output_shape))
        )

    return tf.data.Dataset.from_generator(
        gen,
        (tf.float64, (tf.float64)),
        (input_shape, (output_shape))
    )