# UNET SEGMENTATION (SUJAL)

In [None]:
## Imports
import os
import sys
import random

import numpy as np
import cv2
import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow import keras

## Seeding
# The seeding helpers must be *called*. Assigning to them (for example
# `random.seed = seed`) only replaces the function with an integer and leaves
# every random number generator unseeded.
seed = 42
random.seed(seed)
np.random.seed(seed)
tf.random.set_seed(seed)

## Data 

In [None]:
    class DataGen(keras.utils.Sequence):
        """Keras ``Sequence`` that yields batches of (image, mask) arrays.

        Images are read from ``<path>/images/<id>`` and masks from
        ``<path>/masks/<id>``. Both are resized to ``image_size`` x
        ``image_size`` and divided by 255.

        Args:
            ids: File names shared by the image and mask directories.
            path: Dataset root containing the ``images`` and ``masks`` folders.
            batch_size: Maximum number of samples per batch.
            image_size: Side length (in pixels) images and masks are resized to.
        """

        def __init__(self, ids, path, batch_size=8, image_size=128):
            # Let the Keras base class set up its own state.
            super().__init__()
            self.ids = ids
            self.path = path
            self.batch_size = batch_size
            self.image_size = image_size
            self.on_epoch_end()

        def __load__(self, id_name):
            """Load one image/mask pair.

            Returns:
                ``(image, mask)`` as arrays scaled by 1/255, the mask with a
                trailing channel axis, or ``(None, None)`` when either file
                cannot be read by OpenCV.
            """
            image_path = os.path.join(self.path, "images", id_name)
            mask_path = os.path.join(self.path, "masks", id_name)

            # cv2.imread returns None (it does not raise) for unreadable files.
            image = cv2.imread(image_path, 1)
            if image is None:
                print(f"Warning: Failed to load image: {image_path}")
                return None, None

            image = cv2.resize(image, (self.image_size, self.image_size))

            # Flag 0 reads the mask as a single-channel (grayscale) image.
            mask = cv2.imread(mask_path, 0)
            if mask is None:
                print(f"Warning: Failed to load mask: {mask_path}")
                return None, None

            mask = cv2.resize(mask, (self.image_size, self.image_size))
            mask = np.expand_dims(mask, axis=-1)

            # Normalize
            image = image / 255.0
            mask = mask / 255.0

            return image, mask

        def __getitem__(self, index):
            """Return batch number ``index`` as ``(images, masks)`` arrays.

            The last batch may be shorter than ``batch_size``, and any batch
            may shrink further because unreadable files are skipped.

            Raises:
                ValueError: If no sample of the batch could be loaded (this
                    includes an ``index`` past the end of the data).
            """
            # Compute the slice bounds locally. self.batch_size must never be
            # modified here: __len__ and the offsets of every other batch are
            # derived from it.
            start = index * self.batch_size
            stop = min(start + self.batch_size, len(self.ids))
            files_batch = self.ids[start:stop]

            image = []
            mask = []

            for id_name in files_batch:
                _img, _mask = self.__load__(id_name)
                if _img is not None and _mask is not None:  # Only add valid samples
                    image.append(_img)
                    mask.append(_mask)

            if len(image) == 0:  # Handle case where all files in the batch fail
                raise ValueError("No valid images or masks found in the current batch.")

            image = np.array(image)
            mask = np.array(mask)

            return image, mask

        def on_epoch_end(self):
            """Hook called at the end of each epoch; intentionally a no-op."""
            pass

        def __len__(self):
            """Number of batches per epoch, counting a trailing partial batch."""
            return int(np.ceil(len(self.ids) / float(self.batch_size)))

## Hyperparameters

In [None]:
image_size = 128
train_path = "/Users/sandhyakilari/Desktop/Fall Semester 2024/Computer Vision/Segmentation Project"
epochs = 30
batch_size = 8

## Training Ids
# os.listdir returns entries in arbitrary order. Sorting makes the
# train/validation split below identical on every run and machine.
train_ids = sorted(os.listdir(os.path.join(train_path, "images")))


## Validation Data Size
val_data_size = 10

# The first `val_data_size` files are held out for validation.
valid_ids = train_ids[:val_data_size]
train_ids = train_ids[val_data_size:]

In [None]:
# Smoke test: fetch the first training batch and print the array shapes.
gen = DataGen(train_ids, train_path, image_size=image_size, batch_size=batch_size)
x, y = gen[0]
print(x.shape, y.shape)

In [None]:
# Pick one sample of the batch at random and show it beside its mask.
r = random.randrange(len(x))

fig = plt.figure()
fig.subplots_adjust(wspace=0.4, hspace=0.4)

# Left panel: the input image.
ax = fig.add_subplot(1, 2, 1)
ax.imshow(x[r])

# Right panel: the mask, with its trailing channel axis dropped for display.
ax = fig.add_subplot(1, 2, 2)
ax.imshow(y[r].reshape(image_size, image_size), cmap="gray")

## Different Convolutional Blocks

In [None]:
def down_block(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    """Encoder stage: two ReLU convolutions followed by 2x2 max pooling.

    Returns:
        ``(c, p)``: the output of the second convolution (used as the skip
        connection) and the max-pooled version of it.
    """
    conv_options = dict(padding=padding, strides=strides, activation="relu")

    c = x
    for _ in range(2):
        c = keras.layers.Conv2D(filters, kernel_size, **conv_options)(c)

    p = keras.layers.MaxPool2D(pool_size=(2, 2), strides=(2, 2))(c)
    return c, p

def up_block(x, skip, filters, kernel_size=(3, 3), padding="same", strides=1):
    """Decoder stage: 2x upsampling, concatenation with ``skip``, two ReLU convolutions.

    Returns:
        The output tensor of the second convolution.
    """
    conv_options = dict(padding=padding, strides=strides, activation="relu")

    upsampled = keras.layers.UpSampling2D(size=(2, 2))(x)
    c = keras.layers.Concatenate()([upsampled, skip])
    for _ in range(2):
        c = keras.layers.Conv2D(filters, kernel_size, **conv_options)(c)
    return c

def bottleneck(x, filters, kernel_size=(3, 3), padding="same", strides=1):
    """Apply two ReLU convolutions in sequence, with no pooling or upsampling."""
    conv_options = dict(padding=padding, strides=strides, activation="relu")

    c = x
    for _ in range(2):
        c = keras.layers.Conv2D(filters, kernel_size, **conv_options)(c)
    return c

## UNet Model

In [None]:
def UNet():
    """Build the U-Net: four encoder stages, a bottleneck and four decoder stages.

    The input shape is ``(image_size, image_size, 3)`` (``image_size`` is the
    module-level setting). The output is a single-channel sigmoid map.
    """
    f = [16, 32, 64, 128, 256]
    inputs = keras.layers.Input((image_size, image_size, 3))

    # Encoder: each stage halves the spatial size and keeps its pre-pooling
    # activations for the matching decoder stage.
    skips = []
    tensor = inputs
    for n_filters in f[:-1]:
        skip, tensor = down_block(tensor, n_filters)
        skips.append(skip)

    tensor = bottleneck(tensor, f[-1])

    # Decoder: walk the encoder stages in reverse, doubling the size each time.
    for n_filters, skip in zip(reversed(f[:-1]), reversed(skips)):
        tensor = up_block(tensor, skip, n_filters)

    outputs = keras.layers.Conv2D(1, (1, 1), padding="same", activation="sigmoid")(tensor)
    return keras.models.Model(inputs, outputs)

In [None]:
# Build the U-Net and configure it for training on binary masks.
model = UNet()
model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=["acc"],  # the history keys 'acc' / 'val_acc' come from this name
)
model.summary()

## Training the model

In [None]:
# One generator per split; both read from the same dataset root.
train_gen = DataGen(train_ids, train_path, batch_size=batch_size, image_size=image_size)
valid_gen = DataGen(valid_ids, train_path, batch_size=batch_size, image_size=image_size)

# Floor division: only whole batches are counted as steps.
train_steps = len(train_ids) // batch_size
valid_steps = len(valid_ids) // batch_size

history = model.fit(
    train_gen,
    epochs=epochs,
    steps_per_epoch=train_steps,
    validation_data=valid_gen,
    validation_steps=valid_steps,
)

# Save the weights
model.save_weights("UNetW.weights.h5")

# Plot training & validation accuracy and loss
metrics = history.history
acc, val_acc = metrics['acc'], metrics['val_acc']
loss, val_loss = metrics['loss'], metrics['val_loss']

# Only plot as many epochs as both accuracy series contain.
num_epochs = min(len(acc), len(val_acc))
epochs_range = range(1, num_epochs + 1)

plt.figure(figsize=(16, 6))

# One panel per metric: accuracy on the left, loss on the right.
panels = (
    ('Accuracy', acc, val_acc),
    ('Loss', loss, val_loss),
)
for position, (name, train_values, val_values) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(epochs_range, train_values[:num_epochs], label=f'Training {name}')
    plt.plot(epochs_range, val_values[:num_epochs], label=f'Validation {name}')
    plt.title(f'Training & Validation {name}')
    plt.xlabel('Epoch')
    plt.ylabel(name)
    plt.legend()

plt.show()

## Testing the model

In [None]:
## Save the Weights
model.save_weights("UNetW.weights.h5")

## Dataset for prediction
# Use batch 0. With the settings above (10 validation samples, batch size 8)
# the validation generator only has batches 0 and 1, so asking for batch 2
# reads past the end of the data and raises ValueError.
x, y = valid_gen.__getitem__(0)
result = model.predict(x)

# Threshold the sigmoid outputs into a boolean mask.
result = result > 0.5

In [None]:

# Load the weights
model.load_weights("UNetW.weights.h5")

# Now you can use the model for predictions or further training.
# Use batch 0. With 10 validation samples and a batch size of 8 the validation
# generator only has batches 0 and 1, so batch 2 is past the end of the data.
x, y = valid_gen.__getitem__(0)
result = model.predict(x)
# Threshold the sigmoid outputs into a boolean mask.
result = result > 0.5

In [None]:
# Sample 0 of the batch: ground-truth mask (left) next to the thresholded
# prediction (right), both scaled to 0..255 for display.
fig = plt.figure()
fig.subplots_adjust(wspace=0.4, hspace=0.4)

for position, batch in enumerate((y, result), start=1):
    ax = fig.add_subplot(1, 2, position)
    ax.imshow((batch[0] * 255).reshape(image_size, image_size), cmap="gray")

In [None]:
# Sample 1 of the batch: ground-truth mask (left) next to the thresholded
# prediction (right), both scaled to 0..255 for display.
fig = plt.figure()
fig.subplots_adjust(wspace=0.4, hspace=0.4)

for position, batch in enumerate((y, result), start=1):
    ax = fig.add_subplot(1, 2, position)
    ax.imshow((batch[1] * 255).reshape(image_size, image_size), cmap="gray")

# CNN Model (Madhurya)

In [None]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt

import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Reshape
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from keras import regularizers

# Set your data directory
data_dir = '/Users/sandhyakilari/Desktop/Fall Semester 2024/Computer Vision/Segmentation Project'
image_dir = os.path.join(data_dir, "images")
mask_dir = os.path.join(data_dir, "masks")

# Pair images and masks by file name. Only names present in BOTH directories
# are kept, and the same list is used for both sides, so the zip() below can
# never pair an image with a different image's mask.
available_masks = set(os.listdir(mask_dir))  # set: O(1) membership tests
image_names = sorted(name for name in os.listdir(image_dir) if name in available_masks)
mask_names = list(image_names)

# Explicit check instead of `assert`, which is stripped under `python -O`.
if not image_names:
    raise ValueError("No image/mask pairs with matching file names were found.")

# Desired shapes
input_image_shape = (64, 64)  # For the CNN input
mask_shape = (32, 32)         # For the CNN output

X = []
Y = []

for img_name, msk_name in zip(image_names, mask_names):
    img_path = os.path.join(image_dir, img_name)
    msk_path = os.path.join(mask_dir, msk_name)

    # Read images in grayscale
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    msk = cv2.imread(msk_path, cv2.IMREAD_GRAYSCALE)

    # cv2.imread returns None (it does not raise) for files it cannot decode,
    # e.g. stray non-image files; skip the pair instead of crashing in resize.
    if img is None or msk is None:
        print(f"Warning: skipping unreadable pair: {img_path} / {msk_path}")
        continue

    # Resize to desired shapes
    img = cv2.resize(img, input_image_shape)
    msk = cv2.resize(msk, mask_shape)

    # Scale pixel values from 0..255 to 0..1
    img = img.astype(np.float32) / 255.0
    msk = msk.astype(np.float32) / 255.0

    # Add channel dimension
    img = np.expand_dims(img, axis=-1)  # (64,64,1)
    msk = np.expand_dims(msk, axis=-1)  # (32,32,1)

    X.append(img)
    Y.append(msk)

X = np.array(X)
Y = np.array(Y)

print('Dataset shape :', X.shape, Y.shape)  # X: (N,64,64,1), Y: (N,32,32,1)

# Create the CNN model
def create_model(input_shape=(64, 64, 1)):
    """Build a simple convnet that maps an image to a 32x32 single-channel map.

    Architecture: one 11x11 convolution (100 filters, 'valid' padding), one
    6x6 average pooling, a flatten step, one fully connected sigmoid layer of
    1024 units, and a final reshape to ``(32, 32, 1)``.

    Args:
        input_shape: Shape of one input image as ``(height, width, channels)``.

    Returns:
        The uncompiled ``Sequential`` model.
    """
    model = Sequential()
    # Conv layer
    model.add(Conv2D(100, (11, 11), padding='valid', strides=(1, 1), input_shape=input_shape))
    # Average Pooling
    model.add(AveragePooling2D((6, 6)))
    # Flatten infers the feature count from the incoming tensor. For the
    # default input this is (64x64x1) -> Conv(11x11): (54x54x100)
    # -> AvgPool(6x6): (9x9x100) = 8100 features, exactly what the previous
    # hard-coded Reshape((8100,)) produced, but other input shapes work too.
    model.add(Flatten())
    model.add(Dense(1024, activation='sigmoid', kernel_regularizer=regularizers.l2(0.0001)))
    # Now we have (1024,). We want (32,32,1):
    # 32*32 = 1024, so we reshape to (32,32,1)
    model.add(Reshape((32, 32, 1)))
    return model

m = create_model()
m.compile(loss='mean_squared_error', optimizer='adam', metrics=['accuracy'])
print('Model Summary:')
m.summary()

# Train the model
# NOTE: these assignments rebind the module-level `epochs` / `batch_size`
# names that the U-Net cells above also read.
epochs = 20
batch_size = 16
history = m.fit(X, Y, batch_size=batch_size, epochs=epochs, validation_split=0.2)

# Plot training and validation loss
plt.figure(figsize=(10,5))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss', linestyle='--')
plt.title("Learning Curve")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

# Save the weights
# Use a file name of its own: "UNetW.weights.h5" is the U-Net's weights file,
# and writing the CNN weights there would overwrite it.
m.save_weights("CNNW.weights.h5")

# Example prediction
y_pred = m.predict(X, batch_size=batch_size)
print("y_pred shape:", y_pred.shape)

# Visualize a sample
idx = 0
fig, ax = plt.subplots(1, 2, figsize=(8,4))
ax[0].imshow(X[idx].reshape(64,64), cmap='gray')
ax[0].set_title('Input Image')
ax[1].imshow(y_pred[idx].reshape(32,32), cmap='gray')
ax[1].set_title('Predicted Mask')
plt.show()

# AttentionUNET (Vishal)

In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, concatenate, Activation, BatchNormalization, Add, Multiply
from tensorflow.keras.models import Model
import os
import numpy as np
from tensorflow.keras.preprocessing.image import load_img, img_to_array

def attention_block(x, g, inter_channel):
    """
    Attention gate: rescale the skip tensor by a learned single-channel map.

    x: Input tensor from the encoder (skip connection)
    g: Gating signal from the decoder (upsampled tensor); it is added to the
       projection of x, so both must have the same spatial size
    inter_channel: Number of channels of the two intermediate 1x1 projections

    Returns x multiplied by sigmoid attention coefficients.
    """
    pointwise = dict(kernel_size=(1, 1), strides=(1, 1), padding='same')

    # Project the skip tensor and the gating signal to the same channel count.
    skip_projection = Conv2D(inter_channel, **pointwise)(x)
    gate_projection = Conv2D(inter_channel, **pointwise)(g)

    # Combine both projections and apply ReLU.
    combined = Add()([skip_projection, gate_projection])
    combined = Activation('relu')(combined)

    # Collapse to one channel and squash to (0, 1) to get the attention map.
    attention = Conv2D(1, **pointwise)(combined)
    attention = Activation('sigmoid')(attention)

    # Weight the skip tensor by the attention map.
    return Multiply()([x, attention])

def conv_block(x, filters):
    """
    Convolutional block: (3x3 Conv -> BatchNorm -> ReLU), applied twice.

    x: Input tensor
    filters: Number of output filters of each convolution
    """
    for _ in range(2):
        x = Conv2D(filters, kernel_size=(3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
    return x

def attention_unet(input_shape, num_classes):
    """
    Build the Attention U-Net.

    input_shape: Shape of input images (H, W, C)
    num_classes: Number of output channels; the final activation is softmax
                 when this is greater than 1 and sigmoid otherwise
    """
    encoder_widths = (64, 128, 256, 512)
    inputs = Input(input_shape)

    # Encoder: conv block, remember its output as the skip, then downsample by 2.
    skips = []
    features = inputs
    for width in encoder_widths:
        features = conv_block(features, width)
        skips.append(features)
        features = MaxPooling2D((2, 2))(features)

    # Bottleneck (lowest level of the U-Net)
    features = conv_block(features, 1024)

    # Decoder: upsample, gate the matching skip with attention, merge, convolve.
    for width, skip in zip(reversed(encoder_widths), reversed(skips)):
        upsampled = UpSampling2D((2, 2))(features)
        attended = attention_block(skip, upsampled, width)
        merged = concatenate([upsampled, attended], axis=-1)
        features = conv_block(merged, width)

    # Output layer for segmentation
    if num_classes > 1:
        final_activation = 'softmax'
    else:
        final_activation = 'sigmoid'
    outputs = Conv2D(num_classes, (1, 1), activation=final_activation)(features)

    return Model(inputs=inputs, outputs=outputs)

# Function to load and preprocess images and masks
def load_data(image_dir, mask_dir, image_size):
    """
    Load and preprocess images and masks for training.

    Files are paired by their position in the sorted directory listings.
    Pairs that fail to load are reported and skipped.

    image_dir: Path to the directory containing input images
    mask_dir: Path to the directory containing segmentation masks
    image_size: Tuple specifying the size (height, width) to resize the images and masks

    Returns a tuple (images, masks) of numpy arrays. Image values are divided
    by 255; mask values are divided by 255 and rounded to 0 or 1.
    """
    images = []
    masks = []
    image_files = sorted(os.listdir(image_dir))
    mask_files = sorted(os.listdir(mask_dir))

    # zip() stops at the shorter list, so a count mismatch would silently drop
    # files and may pair the remaining images with the wrong masks. Say so.
    if len(image_files) != len(mask_files):
        print(
            f"Warning: found {len(image_files)} images but {len(mask_files)} masks; "
            "files are paired by sorted position and the extra ones are ignored."
        )

    for img_file, mask_file in zip(image_files, mask_files):
        try:
            # Load and preprocess images
            img_path = os.path.join(image_dir, img_file)
            mask_path = os.path.join(mask_dir, mask_file)

            img = load_img(img_path, target_size=image_size)  # Resize image
            mask = load_img(mask_path, target_size=image_size, color_mode='grayscale')  # Resize mask

            # Convert to numpy arrays and normalize
            img = img_to_array(img) / 255.0
            mask = img_to_array(mask) / 255.0
            mask = np.round(mask)  # Ensure masks are binary

            images.append(img)
            masks.append(mask)
        except Exception as e:
            # Deliberate best-effort: one unreadable file must not abort the load.
            print(f"Error loading {img_file} or {mask_file}: {e}. Skipping...")

    return np.array(images), np.array(masks)

# Example usage
if __name__ == "__main__":
    # Dataset location and target resolution
    image_size = (128, 128)       # Resize all images to 128x128
    image_dir = "./images/"  # Replace with your image directory
    mask_dir = "./masks/"    # Replace with your mask directory
    images, masks = load_data(image_dir, mask_dir, image_size)

    # Binary segmentation model for three-channel inputs at the size above
    model = attention_unet(input_shape=image_size + (3,), num_classes=1)
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    # Train, holding out 10% of the samples for validation
    model.fit(images, masks, epochs=20, batch_size=8, validation_split=0.1)