keras_CNN/keras_evaluate.py

"""
Evaluate a Keras model on a fresh dataset.
"""
from __future__ import print_function
from keras.models import Model
from keras.optimizers import SGD, Adagrad, Adadelta, RMSprop
from keras.utils import np_utils
from keras.models import load_model
from keras.models import model_from_json
from keras import backend as K
import sys, os, numpy as np
from load_tumor_image_data import *
nb_classes = 2
batch_size = 64
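
# A hypothetical invocation (file names are illustrative, not from the source):
#
#   python keras_evaluate.py cnn_model.json cnn_model.weights.hd5 test_cases.hd5 \
#       --layout --list -x expression.tsv -p predictions.tsv
#
# With a .json model file the weights default to <model>.weights.hd5, so the
# second positional argument may be omitted.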

def main():
    args = get_args()
    model_file = args['model_file']
    data_file = args['data_file']
    weights_file = None
    split_model = False
    if args['model_weights'] != '':
        weights_file = args['model_weights']
        split_model = True
    elif os.path.splitext(model_file)[1] == '.json':
        # A .json design implies a separate weights file named after the model.
        weights_file = os.path.splitext(model_file)[0] + '.weights.hd5'
        split_model = True
    window_normalize = args['window_normalize']
    # Give some feedback on settings
    if not args['normalize'] and not window_normalize:
        print("Using raw images.")
    if window_normalize:
        print("Using window normalization.")
    # Load the data file
    (X, y) = load_all_data(data_file, normalize=args['normalize'],
                           window_normalize=window_normalize)
    # Feedback on the data
    print("X shape: {} ; X[0] shape: {} ; X[0][2] shape: {}".format(
        X.shape, X[0].shape, X[0][2].shape))
    img_rows, img_cols = X[0][2].shape
    print('img_rows: {0}, img_cols: {1}'.format(img_rows, img_cols))
    print('X shape:', X.shape)
    print('{0} samples ({1} (+), {2} (-))'.format(X.shape[0], y.sum(), len(y) - y.sum()))
    # Convert class vectors to binary class matrices, keeping the original
    # labels around as ground truth.
    ground_truth = y
    y = np_utils.to_categorical(y, nb_classes)
    # Load the model: either a .json design plus separate weights, or a single
    # full-model file.
    model = None
    if split_model:
        print('Loading model design {0}\nand weights {1}'.format(model_file, weights_file))
        with open(model_file, 'r') as json_file:
            model = model_from_json(json_file.read())
        model.load_weights(weights_file)
    else:
        print('Loading full model file {0}'.format(model_file))
        model = load_model(model_file)
    if args['show_layout']:
        print('Network Layout:')
        model.summary()
        print('\n\n')
    y_pred = model.predict(X, batch_size=batch_size, verbose=1)
    # Column 1 of the softmax output is the positive-class score.
    result = quantify_results(y_pred[:, 1], ground_truth, auc=True)
    if args['list_cases']:
        metadata = load_metadata(data_file)
        print('')
        print_case_table(y_pred, ground_truth, metadata)
        print('\n\n')
    if args['expression_file'] is not None:
        print('\nSaving expression vectors in {0}.'.format(args['expression_file']))
        feature_layer = find_feature_layer(model, index=int(args['expression_layer_index']))
        print("Getting output at layer index {0}. Layer output shape: {1}".format(
            feature_layer, model.layers[feature_layer].output_shape))
        expression = get_output_from_layer(model, X, layer_index=feature_layer)
        with open(args['expression_file'], 'w') as csvfile:
            writer = csv.writer(csvfile, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
            for vec in expression:
                writer.writerow(vec)
    if args['predictions_file'] is not None:
        # Predictions are just expression at the last layer, reduced to a single
        # floating-point value if the number of classes is 2.
        print('\nSaving prediction values in {0}.'.format(args['predictions_file']))
        with open(args['predictions_file'], 'w') as csvfile:
            writer = csv.writer(csvfile, delimiter='\t', quoting=csv.QUOTE_MINIMAL)
            for vec in y_pred:
                try:
                    if len(vec) == 2:
                        vec = [vec[1]]
                except TypeError:
                    vec = [vec]
                writer.writerow(vec)
    print('(+) - Total: {0} ; True: {1} ; False: {2}'.format(result['npos'], result['tp'], result['fp']))
    print('(-) - Total: {0} ; True: {1} ; False: {2}'.format(result['nneg'], result['tn'], result['fn']))
    print('auc: {0}, acc: {1}, sensitivity: {2}, specificity: {3}, precision: {4}'.format(
        result['auc'], result['acc'], result['sens'], result['spec'], result['prec']))
    return 0

def get_output_from_layer(model, X, layer_index=None, train_mode=False, n_categories=2):
    """
    Get the output from a specific layer given the input; defaults to the last
    layer before a reduction to classes (<= n_categories).
    """
    # Keras learning phase: 0 = test, 1 = train.
    mode = 1 if train_mode else 0
    if layer_index is None:
        # Note: this must be find_last_feature_layer, which accepts
        # n_categories; find_feature_layer does not.
        layer_index = find_last_feature_layer(model, n_categories=n_categories)
    # Build a backend function mapping the network input (plus the learning
    # phase flag) to the requested layer's output.
    get_nth_layer_output = K.function([model.layers[0].input, K.learning_phase()],
                                      [model.layers[layer_index].output])
    layer_output = get_nth_layer_output([X, mode])[0]
    return layer_output
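
# Example use (hypothetical, assuming `model` and an input batch `X` exist):
#
#   features = get_output_from_layer(model, X)   # defaults to the last feature layer
#   layer3   = get_output_from_layer(model, X, layer_index=3)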

def find_last_feature_layer(model, n_categories=2):
    """Walk backwards from the output to the last wide (feature) layer."""
    unwanted_layers = ['Dropout', 'Pooling']
    last_layer_index = len(model.layers) - 1
    last_layer = model.layers[last_layer_index]
    # Skip narrow classification layers and any dropout/pooling layers.
    while last_layer.output_shape[-1] <= n_categories \
            or any(ltype in str(type(last_layer)) for ltype in unwanted_layers):
        last_layer_index -= 1
        last_layer = model.layers[last_layer_index]
    return last_layer_index
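
# For a hypothetical tail of [..., Dense(128), Dropout, Dense(2)], this walks
# past Dense(2) (width <= n_categories) and the Dropout layer, returning the
# index of Dense(128).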

def find_feature_layer(model, index=-2):
    """
    Resolve a signed layer index to an absolute one: positive indices count
    from the input, negative indices from the output. Once the count is
    exhausted, keep moving past dropout and pooling layers.
    """
    unwanted_layers = ['Dropout', 'Pooling']
    direction = np.sign(index)
    index = abs(index)
    current_layer_index = len(model.layers) - 1
    current_layer = model.layers[current_layer_index]
    if direction >= 0:
        current_layer = model.layers[0]
        current_layer_index = 0
    while (direction != 0 and index >= 0) \
            or any(ltype in str(type(current_layer)) for ltype in unwanted_layers):
        current_layer_index += direction
        current_layer = model.layers[current_layer_index]
        index -= 1
    return current_layer_index

def quantify_results(predictions, ground_truth, auc=False):
    """Compute a confusion matrix and summary statistics for binary predictions."""
    fp = fn = tp = tn = npos = nneg = 0
    total = len(predictions)
    if auc:
        from sklearn import metrics
        auc_value = round(metrics.roc_auc_score(np.ravel(ground_truth), np.ravel(predictions)), 3)
    # Round the scores to hard 0/1 calls for the confusion matrix.
    predictions = np.round(predictions)
    for idx, predicted in enumerate(predictions):
        if is_positive(ground_truth[idx]):
            npos += 1
        else:
            nneg += 1
        if is_positive(predicted) == is_positive(ground_truth[idx]):
            if is_positive(ground_truth[idx]):
                tp += 1
            else:
                tn += 1
        else:
            if is_positive(predicted):
                fp += 1
            else:
                fn += 1
    # epsilon avoids divide-by-zero in the rare case where all examples fall
    # into a single class.
    epsilon = 1e-20
    acc = round((tp + tn) / float(total), 3)
    sens = round(tp / (float(npos) + epsilon), 3)
    spec = round(tn / (float(nneg) + epsilon), 3)
    ppv = round(tp / (float(tp + fp) + epsilon), 3)
    npv = round(tn / (float(tn + fn) + epsilon), 3)
    result = {
        'fp': fp,
        'tp': tp,
        'fn': fn,
        'tn': tn,
        'total': total,
        'tot': total,
        'npos': npos,
        'nneg': nneg,
        'acc': acc,
        'sens': sens,
        'tpr': sens,
        'spec': spec,
        'spc': spec,
        'tnr': spec,
        'ppv': ppv,
        'prec': ppv,
        'npv': npv,
    }
    if auc:
        result['auc'] = auc_value
    return result
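
# Worked example with hypothetical numbers (not from the source data):
#
#   quantify_results(np.array([0.9, 0.2, 0.7, 0.4]), np.array([1, 0, 1, 1]), auc=True)
#
# rounds the predictions to [1, 0, 1, 0], giving tp=2, tn=1, fp=0, fn=1 and
# hence acc=0.75, sens=0.667, spec=1.0, ppv=1.0, npv=0.5 (and auc=1.0, since
# every positive case outscores the single negative one).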

def print_case_table(y_pred, ground_truth, metadata={}):
    """Print one tab-separated row per case: truth, prediction, and metadata."""
    meta_keys = metadata.keys()
    headings = ['true class', 'predicted', 'correct?']
    headings.extend(meta_keys)
    print('\t'.join(headings))
    for idx, prediction in enumerate(y_pred):
        # Compare the true class index against the arg-max of the prediction vector.
        correct = int(ground_truth[idx][0]) == int(np.argmax(prediction))
        results = [str(x) for x in [ground_truth[idx][0], prediction, correct]]
        for key in meta_keys:
            try:
                results.append(str(metadata[key][idx][0]))
            except TypeError:
                results.append(str(metadata[key][idx]))
        print('\t'.join(results))
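
# Illustrative output (the `case_id` metadata column is hypothetical):
#
#   true class    predicted      correct?    case_id
#   1             [0.12 0.88]    True        42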

def array_like(x):
    try:
        from collections.abc import Sequence  # Python 3.3+
    except ImportError:
        from collections import Sequence  # Python 2
    return isinstance(x, (Sequence, np.ndarray))

def is_positive(cls):
    """Interpret a label as positive/negative: scalars, 1-vectors, or one-hot pairs."""
    if array_like(cls) and len(cls) > 1:
        # One-hot encoding is [negative, positive], so a zero in slot 0 means positive.
        return cls[0] == 0
    else:
        return cls != 0 if not array_like(cls) else cls[0] != 0
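
# e.g. is_positive(np.array([0., 1.])) -> True   (one-hot pair: [negative, positive])
#      is_positive(1) -> True ; is_positive(0) -> False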

def get_args():
    import argparse
    # construct the argument parser and parse the arguments
    ap = argparse.ArgumentParser(prog='{0}'.format(os.path.basename(sys.argv[0])))
    ap.add_argument("--list", dest="list_cases", action='store_true',
                    help="List results case-by-case (verbose output).")
    ap.add_argument("--layout", dest="show_layout", action='store_true',
                    help="Show network layout.")
    ap.add_argument("model_file", metavar='model-file',
                    help="Keras model file (.hd5 or .json)")
    ap.add_argument("model_weights", metavar='model-weights', nargs='?', default="",
                    help="Keras weights file (.hd5); optional if the model file was .hd5 "
                         "or if its name matches the model file except for the extension.")
    ap.add_argument("data_file", metavar='data-file',
                    help="HDF5 file containing the dataset to evaluate.")
    ap.add_argument("--raw", dest='normalize', action='store_false',
                    help="Use raw images; no normalization.")
    ap.add_argument("--window", dest='window_normalize', action='store_true',
                    help="Perform HU window normalization.")
    ap.add_argument("-x", dest='expression_file', required=False,
                    help="If given, \"expression levels\" for each item in the data file "
                         "are saved here in CSV-compatible format.")
    ap.add_argument("-l", dest='expression_layer_index', required=False, default=-2,
                    help="Used in conjunction with '-x' to select a specific layer's "
                         "expression output; positive values count from the beginning, "
                         "negative values from the end. Default is the last layer before "
                         "reducing to classes (-2); pooling and dropout layers do not count.")
    ap.add_argument("-p", dest='predictions_file', required=False,
                    help="If given, \"predictions\" (floating-point) for each item in the "
                         "data file are saved here in CSV-compatible format.")
    args = vars(ap.parse_args())
    return args

if __name__ == '__main__':
    sys.exit(main())