# stacked_ae.py

import numpy as np
import cv2

from keras.models import Sequential
from keras.layers import Dense
from keras import regularizers
from keras import losses

from utils import *
from train_cnn import run as run_cnn  # aliased so the run() defined below does not shadow it
|
|
def open_data_AE(X_fullsize, y_pred, contour_mask):
    """
    Open the dataset from the output of the CNN and
    unroll each 64x64 ROI into a vector of 4096 elements.
    :param X_fullsize: full size training set (256x256)
    :param y_pred: CNN output
    :param contour_mask: expert contour masks
    :return: AE input vectors, expert contour vectors
    """
    input_AE = []
    contour_experts = []
    for j in range(y_pred.shape[0]):
        in_AE = cv2.resize(compute_roi_pred(X_fullsize, y_pred, contour_mask, j, roi_shape=32)[0], (64, 64))
        # nearest-neighbour interpolation keeps the contour mask binary
        # (cv2.INTERSECT_NONE, used before, is not an interpolation flag)
        contour = cv2.resize(compute_roi_pred(X_fullsize, y_pred, contour_mask, j)[2], (64, 64),
                             interpolation=cv2.INTER_NEAREST)
        input_AE.append(in_AE)
        contour_experts.append(contour)
    return np.array(input_AE).reshape((-1, 64*64)), np.array(contour_experts).reshape((-1, 64*64))
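
# Usage sketch (illustrative): the three arrays are assumed to come from the
# CNN stage in train_cnn; both returned matrices have one 4096-element row
# per ROI.
#   X_ae, Y_ae = open_data_AE(X_fullsize, y_pred, contour_mask)
#   assert X_ae.shape[1] == 64 * 64 and Y_ae.shape == X_ae.shape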
|
|
def customized_loss(y_true, y_pred, alpha=0.0001, beta=3):
    """
    Create a customized loss for the stacked AE:
    a weighted combination of MAE and KL divergence.
    """
    # customize your own loss components
    loss1 = losses.mean_absolute_error(y_true, y_pred)
    loss2 = losses.kullback_leibler_divergence(y_true, y_pred)
    # adjust the weight between loss components
    return (alpha/2) * loss1 + beta * loss2
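
# Minimal sanity check (sketch, not part of the pipeline): evaluate the loss
# on dummy tensors via the Keras backend; `y_a`/`y_b` are illustrative names.
#   import keras.backend as K
#   y_a = K.constant(np.random.uniform(0.01, 1.0, (2, 4096)))
#   y_b = K.constant(np.random.uniform(0.01, 1.0, (2, 4096)))
#   print(K.eval(customized_loss(y_a, y_b)))  # one scalar per sample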
|
|
def model1(X_train, param_reg, get_history=False, verbose=0, loss="customized_loss"):
    """
    First part of the stacked AE: train an AE on the ROI input images.
    :param X_train: ROI input images
    :param get_history: boolean to return the loss history
    :param loss: loss identifier; "customized_loss" selects customized_loss
    :return: encoded ROI images and the trained encoding layer
    """
    autoencoder_0 = Sequential()
    encoder_0 = Dense(input_dim=4096, units=100, kernel_regularizer=regularizers.l2(param_reg))
    decoder_0 = Dense(input_dim=100, units=4096, kernel_regularizer=regularizers.l2(param_reg))
    autoencoder_0.add(encoder_0)
    autoencoder_0.add(decoder_0)
    if loss == "customized_loss":
        loss = customized_loss
    autoencoder_0.compile(loss=loss, optimizer='adam', metrics=['accuracy'])
    h = autoencoder_0.fit(X_train, X_train, epochs=100, verbose=verbose)

    # reuse the trained encoding layer to produce the codes for the next stage
    temp_0 = Sequential()
    temp_0.add(encoder_0)
    temp_0.compile(loss=loss, optimizer='adam', metrics=['accuracy'])
    encoded_X = temp_0.predict(X_train, verbose=0)
    if get_history:
        return h.history['loss'], encoded_X, encoder_0
    else:
        return encoded_X, encoder_0
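
# Greedy pre-training sketch (illustrative; `X_train` is assumed to come from
# open_data_AE):
#   encoded_X, encoder_0 = model1(X_train, param_reg=0.003)
#   encoded_X.shape  # -> (n_samples, 100)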
|
|
def model2(encoder_0, encoded_X, X_train, param_reg, get_history=False, verbose=0, loss="customized_loss"):
    """
    Second part of the stacked AE: train a second AE on the codes from model1.
    :param encoded_X: encoded ROI images produced by model1
    :param get_history: boolean to return the loss history
    :return: encoding layer
    Note: encoder_0 and X_train are kept in the signature for compatibility
    with run() but are not used here.
    """
    autoencoder_1 = Sequential()
    encoder_1 = Dense(input_dim=100, units=100, kernel_regularizer=regularizers.l2(param_reg))
    decoder_1 = Dense(input_dim=100, units=100, kernel_regularizer=regularizers.l2(param_reg))
    autoencoder_1.add(encoder_1)
    autoencoder_1.add(decoder_1)
    if loss == "customized_loss":
        loss = customized_loss
    autoencoder_1.compile(loss=loss, optimizer='adam', metrics=['accuracy'])
    h = autoencoder_1.fit(encoded_X, encoded_X, epochs=100, verbose=verbose)
    if get_history:
        return h.history['loss'], encoder_1
    else:
        return encoder_1
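
# Continuing the sketch above, the second stage trains on the 100-dim codes:
#   encoder_1 = model2(encoder_0, encoded_X, X_train, param_reg=0.003)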
|
|
def model3(X_train, Y_train, encoder_0, encoder_1, init, param_reg,
           get_history=False, verbose=0, loss="MSE"):
    """
    Last part of the stacked AE: stack both encoders, add a decoding layer
    and fine-tune the whole network on the expert contours.
    :param X_train: ROI input images
    :param init: set the initial kernel weights (None for uniform)
    :param get_history: boolean to return the loss history
    :return: final model
    """
    model = Sequential()
    model.add(encoder_0)
    model.add(encoder_1)
    model.add(Dense(input_dim=100, units=4096, kernel_initializer=init,
                    kernel_regularizer=regularizers.l2(param_reg)))
    if loss == "customized_loss":
        loss = customized_loss
    model.compile(optimizer='adam', loss=loss, metrics=['accuracy'])
    h = model.fit(X_train, Y_train, epochs=20, verbose=verbose)
    if get_history:
        return h.history['loss'], model
    else:
        return model
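
# Continuing the sketch, fine-tune the stacked encoders against the expert
# contours (Y_train from open_data_AE) and keep the loss history:
#   loss_hist, model = model3(X_train, Y_train, encoder_0, encoder_1,
#                             init='zero', param_reg=0.003, get_history=True)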
|
|
def run(X_fullsize, y_pred, contour_mask, X_to_pred=None, verbose=0, param_reg=0.003,
        init_3='zero', history=False, loss1="customized_loss", loss2="customized_loss", loss3="MSE"):
    """
    Full pipeline for the stacked AE: load the dataset, train the three stages
    and predict binary ROI contours.
    :param X_fullsize: full size training set (256x256)
    :param y_pred: CNN output for training
    :param X_to_pred: input for predictions after training (X_train if not specified)
    :param verbose: int for verbose
    :return: X_train, Y_train, binary contour predictions, model (and loss history if history=True)
    """
    X_train, Y_train = open_data_AE(X_fullsize, y_pred, contour_mask)
    encoded_X, encoder_0 = model1(X_train, param_reg=param_reg, loss=loss1)
    encoder_1 = model2(encoder_0, encoded_X, X_train, param_reg=param_reg, loss=loss2)
    h, model = model3(X_train, Y_train, encoder_0, encoder_1, param_reg=param_reg,
                      init=init_3, get_history=True, verbose=verbose, loss=loss3)
    if X_to_pred is None:  # numpy arrays have no truth value, so test against None
        X_to_pred = X_train
    contours = model.predict(X_to_pred)
    binaries = np.array([cv2.threshold(contour, 0, 1, cv2.THRESH_BINARY)[1].reshape((64, 64))
                         for contour in contours])
    if history:
        return X_train, Y_train, binaries, model, h
    else:
        return X_train, Y_train, binaries, model
|
|
def inference(X_fullsize, y_pred, contour_mask, model):
    """
    Run a trained stacked AE on a new set of CNN outputs and
    return the thresholded binary contours.
    """
    X_test, Y_test = open_data_AE(X_fullsize, y_pred, contour_mask)
    contours = model.predict(X_test)
    binaries = np.array([cv2.threshold(contour, 0, 1, cv2.THRESH_BINARY)[1].reshape((64, 64))
                         for contour in contours])
    return X_test, Y_test, binaries
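
# End-to-end usage sketch (illustrative only): `X_fullsize`, `y_pred` and
# `contour_mask` are assumed to be produced by the CNN stage (see train_cnn,
# imported above as run_cnn); none of these names are defined in this file.
#   X_train, Y_train, binaries, model = run(X_fullsize, y_pred, contour_mask)
#   X_test, Y_test, test_binaries = inference(X_fullsize_test, y_pred_test,
#                                             contour_mask_test, model)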
|
|