Diff of /modules/DL_utils.py [000000] .. [1a9b40]

Switch to side-by-side view

--- a
+++ b/modules/DL_utils.py
@@ -0,0 +1,603 @@
+# Utility functions for deep learning with Keras
+# Dr. Tirthajyoti Sarkar, Fremont, CA 94536
+# ==============================================
+
+# NOTES
+# Used tf.keras in general except in special functions where older/independent Keras has been used.
+
+import tensorflow as tf
+import numpy as np
+import matplotlib.pyplot as plt
+
+
+class myCallback(tf.keras.callbacks.Callback):
+    """
+  User can pass on the desired accuracy threshold while creating an instance of the class
+  """
+
+    def __init__(self, acc_threshold=0.9, print_msg=True):
+        self.acc_threshold = acc_threshold
+        self.print_msg = print_msg
+
+    def on_epoch_end(self, epoch, logs={}):
+        if logs.get("acc") > self.acc_threshold:
+            if self.print_msg:
+                print(
+                    "\nReached {}% accuracy so cancelling the training!".format(
+                        self.acc_threshold
+                    )
+                )
+            self.model.stop_training = True
+        else:
+            if self.print_msg:
+                print("\nAccuracy not high enough. Starting another epoch...\n")
+
+    def build_classification_model(
+        num_layers=1,
+        architecture=[32],
+        act_func="relu",
+        input_shape=(28, 28),
+        output_class=10,
+    ):
+        """
+  Builds a densely connected neural network model from user input
+  
+  Arguments
+          num_layers: Number of hidden layers
+          architecture: Architecture of the hidden layers (densely connected)
+          act_func: Activation function. Could be 'relu', 'sigmoid', or 'tanh'.
+          input_shape: Dimension of the input vector
+          output_class: Number of classes in the output vector
+  Returns
+          A neural net (Keras) model for classification
+  """
+        layers = [tf.keras.layers.Flatten(input_shape=input_shape)]
+        if act_func == "relu":
+            activation = tf.nn.relu
+        elif act_func == "sigmoid":
+            activation = tf.nn.sigmoid
+        elif act_func == "tanh":
+            activation = tf.nn.tanh
+
+        for i in range(num_layers):
+            layers.append(tf.keras.layers.Dense(architecture[i], activation=tf.nn.relu))
+        layers.append(tf.keras.layers.Dense(output_class, activation=tf.nn.softmax))
+
+        model = tf.keras.models.Sequential(layers)
+        return model
+
+
+def build_regression_model(
+    input_neurons=10, input_dim=1, num_layers=1, architecture=[32], act_func="relu"
+):
+    """
+  Builds a densely connected neural network model from user input
+  
+  Arguments
+          num_layers: Number of hidden layers
+          architecture: Architecture of the hidden layers (densely connected)
+          act_func: Activation function. Could be 'relu', 'sigmoid', or 'tanh'.
+          input_shape: Dimension of the input vector
+  Returns
+          A neural net (Keras) model for regression
+  """
+    if act_func == "relu":
+        activation = tf.nn.relu
+    elif act_func == "sigmoid":
+        activation = tf.nn.sigmoid
+    elif act_func == "tanh":
+        activation = tf.nn.tanh
+
+    layers = [
+        tf.keras.layers.Dense(input_neurons, input_dim=input_dim, activation=activation)
+    ]
+
+    for i in range(num_layers):
+        layers.append(tf.keras.layers.Dense(architecture[i], activation=activation))
+    layers.append(tf.keras.layers.Dense(1))
+
+    model = tf.keras.models.Sequential(layers)
+    return model
+
+
+def compile_train_classification_model(
+    model,
+    x_train,
+    y_train,
+    callbacks=None,
+    learning_rate=0.001,
+    batch_size=1,
+    epochs=10,
+    verbose=0,
+):
+    """
+  Compiles and trains a given Keras model with the given data. 
+  Assumes Adam optimizer for this implementation.
+  Assumes categorical cross-entropy loss.
+  
+  Arguments
+          learning_rate: Learning rate for the optimizer Adam
+          batch_size: Batch size for the mini-batch optimization
+          epochs: Number of epochs to train
+          verbose: Verbosity of the training process
+  
+  Returns
+  A copy of the model
+  """
+
+    model_copy = model
+    model_copy.compile(
+        optimizer=tf.keras.optimizers.Adam(lr=learning_rate),
+        loss="sparse_categorical_crossentropy",
+        metrics=["accuracy"],
+    )
+
+    if callbacks != None:
+        model_copy.fit(
+            x_train,
+            y_train,
+            epochs=epochs,
+            batch_size=batch_size,
+            callbacks=[callbacks],
+            verbose=verbose,
+        )
+    else:
+        model_copy.fit(
+            x_train, y_train, epochs=epochs, batch_size=batch_size, verbose=verbose
+        )
+    return model_copy
+
+
+def compile_train_regression_model(
+    model,
+    x_train,
+    y_train,
+    callbacks=None,
+    learning_rate=0.001,
+    batch_size=1,
+    epochs=10,
+    verbose=0,
+):
+    """
+  Compiles and trains a given Keras model with the given data for regression. 
+  Assumes Adam optimizer for this implementation.
+  Assumes mean-squared-error loss
+  
+  Arguments
+          learning_rate: Learning rate for the optimizer Adam
+          batch_size: Batch size for the mini-batch operation
+          epochs: Number of epochs to train
+          verbose: Verbosity of the training process
+  
+  Returns
+  A copy of the model
+  """
+
+    model_copy = model
+    model_copy.compile(
+        optimizer=tf.keras.optimizers.Adam(lr=learning_rate),
+        loss="mean_squared_error",
+        metrics=["accuracy"],
+    )
+    if callbacks != None:
+        model_copy.fit(
+            x_train,
+            y_train,
+            epochs=epochs,
+            batch_size=batch_size,
+            callbacks=[callbacks],
+            verbose=verbose,
+        )
+    else:
+        model_copy.fit(
+            x_train, y_train, epochs=epochs, batch_size=batch_size, verbose=verbose
+        )
+    return model_copy
+
+
+def plot_loss_acc(model, target_acc=0.9, title=None):
+    """
+  Takes a Keras model and plots the loss and accuracy over epochs.
+  The same plot shows loss and accuracy on two axes - left and right (with separate scales)
+  Users can supply a title if desired
+  Arguments:
+            target_acc (optional): The desired/ target acc for the function to show a horizontal bar.
+            title (optional): A Python string object to show as the plot's title
+  """
+    e = (
+        np.array(model.history.epoch) + 1
+    )  # Add one to the list of epochs which is zero-indexed
+    # Check to see if loss metric is in the model history
+    assert (
+        "loss" in model.history.history.keys()
+    ), "No loss metric found in the model history"
+    l = np.array(model.history.history["loss"])
+    # Check to see if loss metric is in the model history
+    assert (
+        "acc" in model.history.history.keys()
+    ), "No accuracy metric found in the model history"
+    a = np.array(model.history.history["acc"])
+
+    fig, ax1 = plt.subplots()
+
+    color = "tab:red"
+    ax1.set_xlabel("Epochs", fontsize=15)
+    ax1.set_ylabel("Loss", color=color, fontsize=15)
+    ax1.plot(e, l, color=color, lw=2)
+    ax1.tick_params(axis="y", labelcolor=color)
+    ax1.grid(True)
+
+    ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis
+
+    color = "tab:blue"
+    ax2.set_ylabel(
+        "Accuracy", color=color, fontsize=15
+    )  # we already handled the x-label with ax1
+    ax2.plot(e, a, color=color, lw=2)
+    ax2.tick_params(axis="y", labelcolor=color)
+
+    fig.tight_layout()  # otherwise the right y-label is slightly clipped
+    if title != None:
+        plt.title(title)
+    plt.hlines(
+        y=target_acc, xmin=1, xmax=e.max(), colors="k", linestyles="dashed", lw=3
+    )
+    plt.show()
+
+
+def plot_train_val_acc(model, target_acc=0.9, title=None):
+    """
+  Takes a Keras model and plots the training and validation set accuracy over epochs.
+  The same plot shows both the accuracies on two axes - left and right (with separate scales)
+  Users can supply a title if desired
+  Arguments:
+            target_acc (optional): The desired/ target acc for the function to show a horizontal bar.
+            title (optional): A Python string object to show as the plot's title
+  """
+    e = (
+        np.array(model.history.epoch) + 1
+    )  # Add one to the list of epochs which is zero-indexed
+    # Check to see if loss metric is in the model history
+    assert (
+        "acc" in model.history.history.keys()
+    ), "No accuracy metric found in the model history"
+    a = np.array(model.history.history["acc"])
+    # Check to see if loss metric is in the model history
+    assert (
+        "val_acc" in model.history.history.keys()
+    ), "No validation accuracy metric found in the model history"
+    va = np.array(model.history.history["val_acc"])
+
+    fig, ax1 = plt.subplots()
+
+    color = "tab:red"
+    ax1.set_xlabel("Epochs", fontsize=15)
+    ax1.set_ylabel("Training accuracy", color=color, fontsize=15)
+    ax1.plot(e, a, color=color, lw=2)
+    ax1.tick_params(axis="y", labelcolor=color)
+    ax1.grid(True)
+
+    ax2 = ax1.twinx()  # instantiate a second axes that shares the same x-axis
+
+    color = "tab:blue"
+    ax2.set_ylabel(
+        "Validation accuracy", color=color, fontsize=15
+    )  # we already handled the x-label with ax1
+    ax2.plot(e, va, color=color, lw=2)
+    ax2.tick_params(axis="y", labelcolor=color)
+
+    fig.tight_layout()  # otherwise the right y-label is slightly clipped
+    if title != None:
+        plt.title(title)
+
+    plt.hlines(
+        y=target_acc, xmin=1, xmax=e.max(), colors="k", linestyles="dashed", lw=3
+    )
+
+    plt.show()
+
+
+def train_CNN(
+    train_directory,
+    target_size=(256, 256),
+    callbacks=None,
+    classes=None,
+    batch_size=128,
+    num_classes=2,
+    num_epochs=20,
+    verbose=0,
+):
+    """
+    Trains a conv net for a given dataset contained within a training directory.
+    Users can just supply the path of the training directory and get back a fully trained, 5-layer, convolutional network.
+    
+    Arguments:
+            train_directory: The directory where the training images are stored in separate folders.
+                            These folders should be named as per the classes.
+            target_size: Target size for the training images. A tuple e.g. (200,200)
+            classes: A Python list with the classes 
+            batch_size: Batch size for training
+            num_epochs: Number of epochs for training
+            num_classes: Number of output classes to consider
+            verbose: Verbosity level of the training, passed on to the `fit_generator` method
+    Returns:
+            A trained conv net model
+    
+    """
+    from tensorflow.keras.preprocessing.image import ImageDataGenerator
+    import tensorflow as tf
+    from tensorflow.keras.optimizers import RMSprop
+
+    # ImageDataGenerator object instance with scaling
+    train_datagen = ImageDataGenerator(rescale=1 / 255)
+
+    # Flow training images in batches using the generator
+    train_generator = train_datagen.flow_from_directory(
+        train_directory,  # This is the source directory for training images
+        target_size=target_size,  # All images will be resized to 200 x 200
+        batch_size=batch_size,
+        # Specify the classes explicitly
+        classes=classes,
+        # Since we use categorical_crossentropy loss, we need categorical labels
+        class_mode="categorical",
+    )
+
+    input_shape = tuple(list(target_size) + [3])
+
+    # Model architecture
+    model = tf.keras.models.Sequential(
+        [
+            # Note the input shape is the desired size of the image 200x 200 with 3 bytes color
+            # The first convolution
+            tf.keras.layers.Conv2D(
+                16, (3, 3), activation="relu", input_shape=input_shape
+            ),
+            tf.keras.layers.MaxPooling2D(2, 2),
+            # The second convolution
+            tf.keras.layers.Conv2D(32, (3, 3), activation="relu"),
+            tf.keras.layers.MaxPooling2D(2, 2),
+            # The third convolution
+            tf.keras.layers.Conv2D(64, (3, 3), activation="relu"),
+            tf.keras.layers.MaxPooling2D(2, 2),
+            # The fourth convolution
+            tf.keras.layers.Conv2D(64, (3, 3), activation="relu"),
+            tf.keras.layers.MaxPooling2D(2, 2),
+            # The fifth convolution
+            tf.keras.layers.Conv2D(64, (3, 3), activation="relu"),
+            tf.keras.layers.MaxPooling2D(2, 2),
+            # Flatten the results to feed into a dense layer
+            tf.keras.layers.Flatten(),
+            # 512 neuron in the fully-connected layer
+            tf.keras.layers.Dense(512, activation="relu"),
+            # Output neurons for `num_classes` classes with the softmax activation
+            tf.keras.layers.Dense(num_classes, activation="softmax"),
+        ]
+    )
+
+    # Optimizer and compilation
+    model.compile(
+        loss="categorical_crossentropy", optimizer=RMSprop(lr=0.001), metrics=["acc"]
+    )
+
+    # Total sample count
+    total_sample = train_generator.n
+
+    # Training
+    model.fit_generator(
+        train_generator,
+        callbacks=callbacks,
+        steps_per_epoch=int(total_sample / batch_size),
+        epochs=num_epochs,
+        verbose=verbose,
+    )
+
+    return model
+
+
+def train_CNN_keras(
+    train_directory,
+    target_size=(256, 256),
+    classes=None,
+    batch_size=128,
+    num_classes=2,
+    num_epochs=20,
+    verbose=0,
+):
+    """
+    Trains a conv net for a given dataset contained within a training directory.
+    Users can just supply the path of the training directory and get back a fully trained, 5-layer, convolutional network.
+    
+    Arguments:
+            train_directory: The directory where the training images are stored in separate folders.
+                            These folders should be named as per the classes.
+            target_size: Target size for the training images. A tuple e.g. (200,200)
+            classes: A Python list with the classes 
+            batch_size: Batch size for training
+            num_epochs: Number of epochs for training
+            num_classes: Number of output classes to consider
+            verbose: Verbosity level of the training, passed on to the `fit_generator` method
+    Returns:
+            A trained conv net model
+    
+    """
+    from keras.layers import Conv2D, MaxPooling2D
+    from keras.layers import Dense, Dropout, Flatten
+    from keras.models import Sequential
+    from keras.optimizers import RMSprop
+    from keras.preprocessing.image import ImageDataGenerator
+
+    # ImageDataGenerator object instance with scaling
+    train_datagen = ImageDataGenerator(rescale=1 / 255)
+
+    # Flow training images in batches using the generator
+    train_generator = train_datagen.flow_from_directory(
+        train_directory,  # This is the source directory for training images
+        target_size=target_size,  # All images will be resized to 200 x 200
+        batch_size=batch_size,
+        # Specify the classes explicitly
+        classes=classes,
+        # Since we use categorical_crossentropy loss, we need categorical labels
+        class_mode="categorical",
+    )
+
+    input_shape = tuple(list(target_size) + [3])
+
+    # Model architecture
+    model = Sequential(
+        [
+            # Note the input shape is the desired size of the image 200x 200 with 3 bytes color
+            # The first convolution
+            Conv2D(16, (3, 3), activation="relu", input_shape=input_shape),
+            MaxPooling2D(2, 2),
+            # The second convolution
+            Conv2D(32, (3, 3), activation="relu"),
+            MaxPooling2D(2, 2),
+            # The third convolution
+            Conv2D(64, (3, 3), activation="relu"),
+            MaxPooling2D(2, 2),
+            # The fourth convolution
+            Conv2D(64, (3, 3), activation="relu"),
+            MaxPooling2D(2, 2),
+            # The fifth convolution
+            Conv2D(64, (3, 3), activation="relu"),
+            MaxPooling2D(2, 2),
+            # Flatten the results to feed into a dense layer
+            Flatten(),
+            # 512 neuron in the fully-connected layer
+            Dense(512, activation="relu"),
+            # Output neurons for `num_classes` classes with the softmax activation
+            Dense(num_classes, activation="softmax"),
+        ]
+    )
+
+    # Optimizer and compilation
+    model.compile(
+        loss="categorical_crossentropy", optimizer=RMSprop(lr=0.001), metrics=["acc"]
+    )
+
+    # Total sample count
+    total_sample = train_generator.n
+
+    # Training
+    model.fit_generator(
+        train_generator,
+        steps_per_epoch=int(total_sample / batch_size),
+        epochs=num_epochs,
+        verbose=verbose,
+    )
+
+    return model
+
+
+def preprocess_image(img_path, model=None, rescale=255, resize=(256, 256)):
+    """
+    Preprocesses a given image for prediction with a trained model, with rescaling and resizing options
+    
+    Arguments:
+            img_path: The path to the image file
+            rescale: A float or integer indicating required rescaling. 
+                    The image array will be divided (scaled) by this number.
+            resize: A tuple indicating desired target size. 
+                    This should match the input shape as expected by the model
+    Returns:
+            img: A processed image.
+    """
+    from keras.preprocessing.image import img_to_array, load_img
+    import cv2
+    import numpy as np
+
+    assert type(img_path) == str, "Image path must be a string"
+    assert (
+        type(rescale) == int or type(rescale) == float
+    ), "Rescale factor must be either a float or int"
+    assert (
+        type(resize) == tuple and len(resize) == 2
+    ), "Resize target must be a tuple with two elements"
+
+#    img = load_img(img_path)    
+    img = load_img(img_path,grayscale=True)
+    img = img_to_array(img)
+    img = img / float(rescale)
+    img = cv2.resize(img, resize)
+    if model != None:
+        if len(model.input_shape) == 4:
+            img = np.expand_dims(img, axis=0)
+
+    return img
+
+
+def pred_prob_with_model(img_path, model, rescale=255, resize=(256, 256)):
+    """
+    Tests a given image with a trained model, with rescaling and resizing options
+    
+    Arguments:
+            img_path: The path to the image file
+            model: The trained Keras model
+            rescale: A float or integer indicating required rescaling. 
+                    The image array will be divided (scaled) by this number.
+            resize: A tuple indicating desired target size. 
+                    This should match the input shape as expected by the model
+    Returns:
+            pred: A prediction vector (Numpy array).
+                  Could be either classes or probabilities depending on the model.
+    """
+    from keras.preprocessing.image import img_to_array, load_img
+    import cv2
+
+    assert type(img_path) == str, "Image path must be a string"
+    assert (
+        type(rescale) == int or type(rescale) == float
+    ), "Rescale factor must be either a float or int"
+    assert (
+        type(resize) == tuple and len(resize) == 2
+    ), "Resize target must be a tuple with two elements"
+
+    img = load_img(img_path,grayscale=True)
+    img = img_to_array(img)
+    img = img / float(rescale)
+    img = cv2.resize(img, resize)
+    if len(model.input_shape) == 4:
+        img = np.expand_dims(img, axis=0)
+
+    pred = model.predict(img)
+
+    return pred
+
+
+def pred_class_with_model(img_path, model, rescale=255, resize=(256, 256)):
+    """
+    Tests a given image with a trained model, with rescaling and resizing options
+    
+    Arguments:
+            img_path: The path to the image file
+            model: The trained Keras model
+            rescale: A float or integer indicating required rescaling. 
+                    The image array will be divided (scaled) by this number.
+            resize: A tuple indicating desired target size. 
+                    This should match the input shape as expected by the model
+    Returns:
+            pred: A prediction vector (Numpy array).
+                  Could be either classes or probabilities depending on the model.
+    """
+    from keras.preprocessing.image import img_to_array, load_img
+    import cv2
+
+    assert type(img_path) == str, "Image path must be a string"
+    assert (
+        type(rescale) == int or type(rescale) == float
+    ), "Rescale factor must be either a float or int"
+    assert (
+        type(resize) == tuple and len(resize) == 2
+    ), "Resize target must be a tuple with two elements"
+
+    img = load_img(img_path)
+    img = img_to_array(img)
+    img = img / float(rescale)
+    img = cv2.resize(img, resize)
+    if len(model.input_shape) == 4:
+        img = np.expand_dims(img, axis=0)
+
+    pred = model.predict(img)
+    pred_class = pred.argmax(axis=-1)
+
+    return pred_class