[214c6e]: 01_jund_prediction/model_utils.py


import sys
import traceback

from keras import backend as K
from keras import optimizers
from keras.callbacks import EarlyStopping
from keras.layers import BatchNormalization
from keras.layers import Concatenate
from keras.layers import Conv2D
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers import GlobalMaxPooling2D
from keras.layers import MaxPooling2D
from sklearn.metrics import average_precision_score

from janggu import Janggu
from janggu import inputlayer
from janggu.layers import DnaConv2D

from data_utils import get_data


def get_opt(name):
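    """Return a gradient-clipped Keras optimizer for the given name.

    'amsgrad' selects Adam with AMSGrad, 'sgd' selects SGD with Nesterov
    momentum; any other name falls back to RMSprop.
    """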
    if name == 'amsgrad':
        opt = optimizers.Adam(amsgrad=True, clipvalue=.5, clipnorm=1.)
    elif name == 'sgd':
        opt = optimizers.SGD(lr=0.01, decay=1e-6,
                             momentum=0.9, nesterov=True,
                             clipvalue=.5, clipnorm=1.)
    else:
        opt = optimizers.RMSprop(clipvalue=.5, clipnorm=1.)
    return opt


@inputlayer
def dna_model(inputs, inp, oup, params):
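    """DNA subnetwork: optional sequence dropout, a motif convolution
    (scanning both strands when params['stranded'] == 'double'), max
    pooling, batch normalization, and a second convolution."""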
    with inputs.use('dna') as dna_in:
        layer = dna_in
    if params['seq_dropout'] > 0.0:
        layer = Dropout(params['seq_dropout'])(layer)
    cl = Conv2D(params['nmotifs1'], (params['motiflen'], 1), activation='relu')
    if params['stranded'] == 'double':
        # DnaConv2D scans forward and reverse strand with shared weights
        layer = DnaConv2D(cl)(layer)
    else:
        layer = cl(layer)
    layer = MaxPooling2D((params['pool1'], 1))(layer)
    layer = BatchNormalization()(layer)
    layer = Conv2D(params['nmotifs2'], (params['hypermotiflen'], 1),
                   activation='relu')(layer)
    return inputs, layer


@inputlayer
def dnase_model(inputs, inp, oup, params):
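    """DNase subnetwork: two stacked convolutions with max pooling in between."""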
    with inputs.use('dnase') as dnase_in:
        layer = dnase_in
    cl = Conv2D(params['nkernel1'], (params['kernel1len'], 2),
                activation='relu', name='dnase_conv_1')
    layer = cl(layer)
    layer = MaxPooling2D((params['kernel1pool'], 1),
                         name='dnase_maxpooling_1')(layer)
    layer = Conv2D(params['nkernel2'], (params['kernel2len'], 1),
                   activation='relu', name='dnase_conv_2')(layer)
    return inputs, layer


@inputlayer
def dnase_dna_model(inputs, inp, oup, params):
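    """Joint model: pools (or flattens) the DNA and DNase subnetworks,
    concatenates them, and adds two dense layers, optionally followed by an
    inception-style concatenation of all intermediate representations."""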
    xin_dna, dna_hidden = dna_model(inputs, inp, oup, params)
    xin_dnase, dnase_hidden = dnase_model(inputs, inp, oup, params)
    if params['concat'] == 'flatten':
        dna_hidden = Flatten()(dna_hidden)
        dnase_hidden = Flatten()(dnase_hidden)
    else:
        dna_hidden = GlobalMaxPooling2D()(dna_hidden)
        dnase_hidden = GlobalMaxPooling2D()(dnase_hidden)
    layer_c = Concatenate()([dna_hidden, dnase_hidden])
    layer_1 = Dense(32, activation='relu')(layer_c)
    layer_2 = Dense(16, activation='relu')(layer_1)
    if params['inception']:
        layer = Concatenate()([layer_c, layer_1, layer_2])
    else:
        layer = layer_2
    return inputs, layer


@inputlayer
def get_model(inputs, inp, oup, params):
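    """Assemble the model variant selected by params['type'] and attach the
    sigmoid output layer for peak prediction."""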
    if params['type'] == 'dna_only':
        xin, layer = dna_model(inputs, inp, oup, params)
        layer = GlobalMaxPooling2D(name='dna_global_max')(layer)
        layer = BatchNormalization(name='dna_batch_2')(layer)
    elif params['type'] == 'dnase_dna':
        xin, layer = dnase_dna_model(inputs, inp, oup, params)
    elif params['type'] in ['dnase_only', 'dnase_bam_only']:
        xin, layer = dnase_model(inputs, inp, oup, params)
        layer = GlobalMaxPooling2D(name='dnase_global_max')(layer)
        layer = BatchNormalization(name='dnase_batch_2')(layer)
    output = Dense(1, activation='sigmoid', name='peaks')(layer)
    return inputs, output


def objective(params):
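    """Objective for hyperparameter search: build, train, and evaluate one
    model configuration, returning its losses and the area under the
    precision-recall curve on the validation and test sets."""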
    train_data, val_data, test_data = get_data(params)
    # build and fit the model variant selected by params; a failed model
    # construction is reported back to the search as {'status': 'fail'}
    try:
        K.clear_session()
        model = Janggu.create(get_model, params, train_data[0], train_data[1],
                              name=params['name'])
        model.compile(optimizer=get_opt(params['opt']), loss='binary_crossentropy',
                      metrics=['acc'])
        hist = model.fit(train_data[0], train_data[1], epochs=params['epochs'],
                         batch_size=64, validation_data=val_data,
                         callbacks=[EarlyStopping(patience=5,
                                                  restore_best_weights=True)])
    except ValueError:
        traceback.print_stack()
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print(repr(traceback.extract_tb(exc_traceback)))
        return {'status': 'fail'}
    print('#' * 40)
    for key in hist.history:
        print('{}: {}'.format(key, hist.history[key][-1]))
    print('#' * 40)
    pred_test = model.predict(test_data[0])
    pred_val = model.predict(val_data[0])
    model.evaluate(val_data[0], val_data[1], callbacks=['auprc', 'auroc'],
                   datatags=['val'])
    model.evaluate(test_data[0], test_data[1], callbacks=['auprc', 'auroc'],
                   datatags=['test'])
    auprc_val = average_precision_score(val_data[1][:], pred_val)
    auprc_test = average_precision_score(test_data[1][:], pred_test)
    model.summary()
    print('auprc_val: {:.2%}'.format(auprc_val))
    print('auprc_test: {:.2%}'.format(auprc_test))
    return {'loss': hist.history['val_loss'][-1], 'status': 'ok',
            'all_losses': hist.history,
            'auprc_val': auprc_val,
            'auprc_test': auprc_test,
            'model_config': model.kerasmodel.to_json(),
            'model_weights': model.kerasmodel.get_weights(),
            'concrete_params': params,
            'modelname': model.name}
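
For orientation, here is a minimal sketch of how `objective` might be invoked. The keys below are exactly the ones this file reads; the values are illustrative assumptions, and `get_data` (imported from `data_utils`) may consume additional keys, so the real search space is defined wherever `objective` is handed to the optimizer (e.g. hyperopt's `fmin`):

    params = {
        'name': 'jund_dnase_dna',   # illustrative run name
        'type': 'dnase_dna',        # 'dna_only', 'dnase_dna', 'dnase_only' or 'dnase_bam_only'
        'opt': 'amsgrad',           # 'amsgrad', 'sgd', or anything else for RMSprop
        'epochs': 30,
        'seq_dropout': 0.2,
        'stranded': 'double',       # 'double' wraps the first conv in DnaConv2D
        'nmotifs1': 10, 'motiflen': 11, 'pool1': 30,
        'nmotifs2': 8, 'hypermotiflen': 4,
        'nkernel1': 5, 'kernel1len': 5, 'kernel1pool': 2,
        'nkernel2': 4, 'kernel2len': 4,
        'concat': 'global_max',     # anything but 'flatten' selects GlobalMaxPooling2D
        'inception': True,
    }
    result = objective(params)
    print(result['status'], result.get('auprc_val'))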