Diff of /bc-count/data.py [000000] .. [0be6a8]

Switch to side-by-side view

--- a
+++ b/bc-count/data.py
@@ -0,0 +1,476 @@
+##############################################
+#                                            #
+#           Custom data generator            #
+#                                            #
+# Author: Amine Neggazi                      #
+# Email: neggazimedlamine@gmail/com          #
+# Nick: nemo256                              #
+#                                            #
+# Please read bc-count/LICENSE               #
+#                                            #
+##############################################
+
+import os
+import json
+
+import cv2
+import numpy as np
+import tensorflow as tf
+from tensorflow import keras
+
+# custom imports
+from config import *
+
+
+def load_image_list(img_files, gray=False):
+    '''
+    This is the load image list function, which loads an enumerate
+    of images (param: img_files)
+    :param img_files --> the input image files which we want to read
+
+    :return imgs --> the images that we read
+    '''
+    imgs = []
+    if gray:
+        for image_file in img_files:
+            img = cv2.imread(image_file, cv2.IMREAD_GRAYSCALE)
+            img = cv2.threshold(img, 127, 255, cv2.THRESH_BINARY)[1]
+            imgs += [img]
+
+    else:
+        for image_file in img_files:
+            imgs += [cv2.imread(image_file)]
+    return imgs
+
+
+def clahe_images(img_list):
+    '''
+    This is the clahe images function, which applies a clahe threshold
+    the input image list.
+    :param img_files --> the input image files which we want to read
+
+    :return img_list --> the output images
+    '''
+    for i, img in enumerate(img_list):
+        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
+
+        lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
+        lab[..., 0] = clahe.apply(lab[..., 0])
+        img_list[i] = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)
+    return img_list
+
+
+def preprocess_image(imgs, padding=padding[1]):
+    '''
+    This is the preprocess data function, which adds a padding to 
+    the input images, masks and edges if there are any.
+    :param imgs --> the input list of images.
+    :param padding --> the input padding which is going to be applied.
+
+    :return imgs --> output images with added padding.
+    '''
+    imgs = [np.pad(img, ((padding, padding),
+                         (padding, padding), (0, 0)), mode='constant') for img in imgs]
+    return imgs
+
+
+def preprocess_data(imgs, mask, edge=None, padding=padding[1]):
+    '''
+    This is the preprocess data function, which adds a padding to 
+    the input images, masks and edges if there are any.
+    :param imgs --> the input list of images.
+    :param mask --> the input list of masks.
+    :param edge --> the input list of edges.
+    :param padding --> the input padding which is going to be applied.
+
+    :return tuple(imgs, mask, edge if exists) --> output images, masks and edges with padding added.
+    '''
+    imgs = [np.pad(img, ((padding, padding),
+                         (padding, padding), (0, 0)), mode='constant') for img in imgs]
+    mask = [np.pad(mask, ((padding, padding),
+                          (padding, padding)), mode='constant') for mask in mask]
+    if edge is not None:
+        edge = [np.pad(edge, ((padding, padding),
+                              (padding, padding)), mode='constant') for edge in edge]
+
+    if edge is not None:
+        return imgs, mask, edge
+
+    return imgs, mask
+
+
+def load_data(img_list, mask_list, edge_list=None, padding=padding[1]):
+    '''
+    This is the load data function, which will handle image loading and preprocessing.
+    :param img_list --> list of input images
+    :param mask_list --> list of input masks
+    :param edge_list --> list of input edges
+    :param padding --> padding to be applied on preprocessing
+
+    :return tuple(imgs, masks and edges if exists) --> the output preprocessed imgs, masks and edges.
+    '''
+    imgs = load_image_list(img_list)
+    imgs = clahe_images(imgs)
+
+    mask = load_image_list(mask_list, gray=True)
+    if edge_list:
+        edge = load_image_list(edge_list, gray=True)
+    else:
+        edge = None
+
+    return preprocess_data(imgs, mask, edge, padding=padding)
+
+
+def load_image(img_list, padding=padding[1]):
+    '''
+    This is the load data function, which will handle image loading and preprocessing.
+    :param img_list --> list of input images
+    :param padding --> padding to be applied on preprocessing
+
+    :return imgs --> the output preprocessed imgs.
+    '''
+    imgs = load_image_list(img_list)
+    imgs = clahe_images(imgs)
+    return preprocess_image(imgs, padding=padding)
+
+
+def aug_lum(image, factor=None):
+    '''
+    This is the augment luminosity function, which we apply to
+    augment the luminosity of an input image.
+    :param image --> the input image we want to augment
+    :param factor --> the factor of luminosity augment (default is 0.5 * random number)
+
+    :return image --> the output luminosity augmented image
+    '''
+    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
+    hsv = hsv.astype(np.float64)
+
+    if factor is None:
+        lum_offset = 0.5 + np.random.uniform()
+    else:
+        lum_offset = factor
+
+    hsv[..., 2] = hsv[..., 2] * lum_offset
+    hsv[..., 2][hsv[..., 2] > 255] = 255
+    hsv = hsv.astype(np.uint8)
+
+    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
+
+
+def aug_img(image):
+    '''
+    This is the augment colors function, which we apply to
+    augment the colors of an given image.
+    :param image --> the input image we want to augment
+
+    :return image --> the output colors augmented image
+    '''
+    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
+    hsv = hsv.astype(np.float64)
+
+    hue_offset = 0.8 + 0.4*np.random.uniform()
+    sat_offset = 0.5 + np.random.uniform()
+    lum_offset = 0.5 + np.random.uniform()
+
+    hsv[..., 0] = hsv[..., 0] * hue_offset
+    hsv[..., 1] = hsv[..., 1] * sat_offset
+    hsv[..., 2] = hsv[..., 2] * lum_offset
+
+    hsv[..., 0][hsv[..., 0] > 255] = 255
+    hsv[..., 1][hsv[..., 1] > 255] = 255
+    hsv[..., 2][hsv[..., 2] > 255] = 255
+
+    hsv = hsv.astype(np.uint8)
+
+    return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
+
+
+def train_generator(imgs, mask, edge=None,
+                    scale_range=None,
+                    padding=padding[1],
+                    input_size=input_shape[0],
+                    output_size=output_shape[0],
+                    skip_empty=False):
+    '''
+    This is the train generator function, which generates the train dataset.
+    :param imgs --> the input images
+    :param mask --> the input masks
+    :param edge --> the input edges if there are any (red blood cells only)
+    :param scale_range --> the factor (i, j) of rescaling.
+    :param padding --> the padding which will be applied to each image
+    :param input_size --> the input shape
+    :param output_size --> the output shape
+    :param skip_empty --> skip empty chips (random if not set)
+
+    :return chips --> yields an image, mask and edge chip each time it gets executed (called)
+    '''
+    if scale_range is not None:
+        scale_range = [1 - scale_range, 1 + scale_range]
+    while True:
+        # select which type of cell to return
+        chip_type = np.random.choice([True, False])
+
+        while True:
+            # pick random image
+            i = np.random.randint(len(imgs))
+
+            # pick random central location in the image (200 + 196/2)
+            center_offset = padding + (output_size / 2)
+            x = np.random.randint(center_offset, imgs[i].shape[0] - center_offset)
+            y = np.random.randint(center_offset, imgs[i].shape[1] - center_offset)
+
+            # scale the box randomly from x0.8 - 1.2x original size
+            scale = 1
+            if scale_range is not None:
+                scale = scale_range[0] + ((scale_range[0] - scale_range[0]) * np.random.random())
+
+            # find the edges of a box around the image chip and the mask chip
+            chip_x_l = int(x - ((input_size / 2) * scale))
+            chip_x_r = int(x + ((input_size / 2) * scale))
+            chip_y_l = int(y - ((input_size / 2) * scale))
+            chip_y_r = int(y + ((input_size / 2) * scale))
+
+            mask_x_l = int(x - ((output_size / 2) * scale))
+            mask_x_r = int(x + ((output_size / 2) * scale))
+            mask_y_l = int(y - ((output_size / 2) * scale))
+            mask_y_r = int(y + ((output_size / 2) * scale))
+
+            # take a slice of the image and mask accordingly
+            temp_chip = imgs[i][chip_x_l:chip_x_r, chip_y_l:chip_y_r]
+            temp_mask = mask[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
+            if edge is not None:
+                temp_edge = edge[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
+
+            if skip_empty:
+                if ((temp_mask > 0).sum() > 5) is chip_type:
+                    continue
+
+            # resize the image chip back to 380 and the mask chip to 196
+            temp_chip = cv2.resize(temp_chip,
+                                   (input_size, input_size),
+                                   interpolation=cv2.INTER_CUBIC)
+            temp_mask = cv2.resize(temp_mask,
+                                   (output_size, output_size),
+                                   interpolation=cv2.INTER_NEAREST)
+            if edge is not None:
+                temp_edge = cv2.resize(temp_edge,
+                                       (output_size, output_size),
+                                       interpolation=cv2.INTER_NEAREST)
+
+            # randomly rotate (like below)
+            rot = np.random.randint(4)
+            temp_chip = np.rot90(temp_chip, k=rot, axes=(0, 1))
+            temp_mask = np.rot90(temp_mask, k=rot, axes=(0, 1))
+            if edge is not None:
+                temp_edge = np.rot90(temp_edge, k=rot, axes=(0, 1))
+
+            # randomly flip
+            if np.random.random() > 0.5:
+                temp_chip = np.flip(temp_chip, axis=1)
+                temp_mask = np.flip(temp_mask, axis=1)
+                if edge is not None:
+                    temp_edge = np.flip(temp_edge, axis=1)
+
+            # randomly luminosity augment
+            temp_chip = aug_lum(temp_chip)
+
+            # randomly augment chip
+            temp_chip = aug_img(temp_chip)
+
+            # rescale the image
+            temp_chip = temp_chip.astype(np.float32) * 2
+            temp_chip /= 255
+            temp_chip -= 1
+
+            # later on ... randomly adjust colours
+            if edge is not None:
+                yield temp_chip, ((temp_mask > 0).astype(float)[..., np.newaxis], 
+                                  (temp_edge > 0).astype(float)[..., np.newaxis])
+            else:
+                yield temp_chip, ((temp_mask > 0).astype(float)[..., np.newaxis])
+            break
+
+
+def test_chips(imgs, mask,
+               edge=None,
+               padding=padding[1],
+               input_size=input_shape[0],
+               output_size=output_shape[0]):
+    '''
+    This is the test chips function, which generates the test dataset.
+    :param imgs --> the input images
+    :param mask --> the input masks
+    :param edge --> the input edges if there are any (red blood cells only)
+    :param padding --> the padding which will be applied to each image
+    :param input_size --> the input shape
+    :param output_size --> the output shape
+
+    :return chips --> yields an image, mask and edge chip each time it gets executed (called)
+    '''
+    center_offset = padding + (output_size / 2)
+    for i, _ in enumerate(imgs):
+        for x in np.arange(center_offset, imgs[i].shape[0] - input_size / 2, output_size):
+            for y in np.arange(center_offset, imgs[i].shape[1] - input_size / 2, output_size):
+                chip_x_l = int(x - (input_size / 2))
+                chip_x_r = int(x + (input_size / 2))
+                chip_y_l = int(y - (input_size / 2))
+                chip_y_r = int(y + (input_size / 2))
+
+                mask_x_l = int(x - (output_size / 2))
+                mask_x_r = int(x + (output_size / 2))
+                mask_y_l = int(y - (output_size / 2))
+                mask_y_r = int(y + (output_size / 2))
+
+                temp_chip = imgs[i][chip_x_l:chip_x_r, chip_y_l:chip_y_r]
+                temp_mask = mask[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
+                if edge is not None:
+                    temp_edge = edge[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
+
+                temp_chip = temp_chip.astype(np.float32) * 2
+                temp_chip /= 255
+                temp_chip -= 1
+
+                if edge is not None:
+                    yield temp_chip, ((temp_mask > 0).astype(float)[..., np.newaxis], 
+                                      (temp_edge > 0).astype(float)[..., np.newaxis])
+                else:
+                    yield temp_chip, ((temp_mask > 0).astype(float)[..., np.newaxis])
+                break
+
+
+def slice_image(imgs,
+                padding=padding[1],
+                input_size=input_shape[0],
+                output_size=output_shape[0]):
+    '''
+    This is the slice function, which slices each image into image chips.
+    :param imgs --> the input images
+    :param padding --> the padding which will be applied to each image
+    :param input_size --> the input shape
+    :param output_size --> the output shape
+
+    :return list tuple (list, list, list) --> the tuple list of output (image, mask and edge chips)
+    '''
+    img_chips = []
+
+    center_offset = padding + (output_size / 2)
+    for i, _ in enumerate(imgs):
+        for x in np.arange(center_offset, imgs[i].shape[0] - input_size / 2, output_size):
+            for y in np.arange(center_offset, imgs[i].shape[1] - input_size / 2, output_size):
+                chip_x_l = int(x - (input_size / 2))
+                chip_x_r = int(x + (input_size / 2))
+                chip_y_l = int(y - (input_size / 2))
+                chip_y_r = int(y + (input_size / 2))
+
+                temp_chip = imgs[i][chip_x_l:chip_x_r, chip_y_l:chip_y_r]
+
+                temp_chip = temp_chip.astype(np.float32) * 2
+                temp_chip /= 255
+                temp_chip -= 1
+
+                img_chips += [temp_chip]
+    return np.array(img_chips)
+
+
+def slice(imgs, mask,
+          edge=None,
+          padding=padding[1],
+          input_size=input_shape[0],
+          output_size=output_shape[0]):
+    '''
+    This is the slice function, which slices each image into image chips.
+    :param imgs --> the input images
+    :param mask --> the input masks
+    :param edge --> the input edges if there are any (red blood cells only)
+    :param padding --> the padding which will be applied to each image
+    :param input_size --> the input shape
+    :param output_size --> the output shape
+
+    :return list tuple (list, list, list) --> the tuple list of output (image, mask and edge chips)
+    '''
+    img_chips = []
+    mask_chips = []
+    if edge is not None:
+        edge_chips = []
+
+    center_offset = padding + (output_size / 2)
+    for i, _ in enumerate(imgs):
+        for x in np.arange(center_offset, imgs[i].shape[0] - input_size / 2, output_size):
+            for y in np.arange(center_offset, imgs[i].shape[1] - input_size / 2, output_size):
+                chip_x_l = int(x - (input_size / 2))
+                chip_x_r = int(x + (input_size / 2))
+                chip_y_l = int(y - (input_size / 2))
+                chip_y_r = int(y + (input_size / 2))
+
+                mask_x_l = int(x - (output_size / 2))
+                mask_x_r = int(x + (output_size / 2))
+                mask_y_l = int(y - (output_size / 2))
+                mask_y_r = int(y + (output_size / 2))
+
+                temp_chip = imgs[i][chip_x_l:chip_x_r, chip_y_l:chip_y_r]
+                temp_mask = mask[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
+                if edge is not None:
+                    temp_edge = edge[i][mask_x_l:mask_x_r, mask_y_l:mask_y_r]
+
+                temp_chip = temp_chip.astype(np.float32) * 2
+                temp_chip /= 255
+                temp_chip -= 1
+
+                img_chips += [temp_chip]
+                mask_chips += [(temp_mask > 0).astype(float)[..., np.newaxis]]
+                if edge is not None:
+                    edge_chips += [(temp_edge > 0).astype(float)[..., np.newaxis]]
+
+    img_chips = np.array(img_chips)
+    mask_chips = np.array(mask_chips)
+    if edge is not None:
+        edge_chips = np.array(edge_chips)
+
+    if edge is not None:
+        return img_chips, mask_chips, edge_chips
+
+    return img_chips, mask_chips
+
+
+def generator(img_list, mask_list, edge_list=None, type='train'):
+    '''
+    This is the generator function, which provides the list of image, mask and edge lists to the train generator and test chips functions.
+    :param img_list --> the input list of images
+    :param mask_list --> the input list of masks
+    :param edge_list --> the input list of edges if there are any
+    :param type --> can be either train or test, used to determine which generator function is to be called
+
+    :return tensorflow dataset --> the output generated functions fed to tensorflow
+    '''
+    if cell_type == 'rbc':
+        img, mask, edge = load_data(img_list, mask_list, edge_list)
+    elif cell_type == 'wbc' or cell_type == 'plt':
+        img, mask = load_data(img_list, mask_list)
+        edge = None
+
+    def gen():
+        if type == 'train':
+            return train_generator(img, mask, edge,
+                                   padding=padding[0],
+                                   input_size=input_shape[0],
+                                   output_size=output_shape[0])
+        elif type == 'test':
+            return test_chips(img, mask, edge,
+                              padding=padding[0],
+                              input_size=input_shape[0],
+                              output_size=output_shape[0])
+
+    # load train dataset to tensorflow for training
+    if cell_type == 'rbc':
+        return tf.data.Dataset.from_generator(
+            gen,
+            (tf.float64, ((tf.float64), (tf.float64))),
+            (input_shape, (output_shape, output_shape))
+        )
+    elif cell_type == 'wbc' or cell_type == 'plt':
+        return tf.data.Dataset.from_generator(
+            gen,
+            (tf.float64, (tf.float64)),
+            (input_shape, (output_shape))
+        )