--- a
+++ b/brainDecode/towardMoabbIntegration/brainDecodeSKLearnWrapper/ShallowFBCSPNet_SpecializedTrainer.py
@@ -0,0 +1,281 @@
+
+
+# TODO, get this to work:
+import sys
+
+import numpy as np
+import torch
+import torch.nn.functional as F
+from numpy.random import RandomState
+from sklearn.base import BaseEstimator, ClassifierMixin
+from torch import nn, optim
+
+from braindecode.datautil.iterators import get_balanced_batches
+from braindecode.datautil.signal_target import SignalAndTarget
+from braindecode.models.shallow_fbcsp import ShallowFBCSPNet
+from braindecode.torch_ext.util import np_to_var, var_to_np
+
+
+class ShallowFBCSPNet_SpecializedTrainer(BaseEstimator, ClassifierMixin):
+
+    model = None
+
+    def __init__(self, network=None, filename=None):
+        self.cuda = True
+        if network is not None:
+            self._decorateNetwork(network)
+        elif filename is not None:
+            self._loadFromFile(filename)
+        else:
+            print("unsupported option")
+            sys.exit(-1)
+
+        # set default parameters
+        self.configure()
+
+    def configure(self,
+                  nb_epoch=160,
+                  initial_lr=0.00006,
+                  trainTestRatio=(7/8)):
+        self.nb_epoch = nb_epoch
+        self.lr = initial_lr
+        self.trainTestRatio = trainTestRatio
+
+    def _decorateNetwork(self, network):
+
+        self.model = network  # TODO: make a deep copy
+
+        # deactivate training for all layers
+        #for param in network.conv_classifier.parameters():
+        #    param.requires_grad = False
+
+        # replace the last layer with a brand new one
+        # (for which requires_grad is True by default)
+        self.model.conv_classifier = nn.Conv2d(5, 2, (116, 1), bias=True).cuda()
+        self.optimizer = optim.Adam(self.model.parameters())
+
+        # save/load only the model parameters (preferred solution)
+        # TODO: ask Yannick how to download, I guess
+        torch.save(self.model.state_dict(), "myModel.pth")
+
+        return
+
+    def _loadFromFile(self, filename):
+
+        # TODO: integrate these in the saved file parameters somehow
+        #n_filters_time=10
+        #filter_time_length=75
+        #n_filters_spat=5
+        #pool_time_length=60
+        #pool_time_stride=30
+        #in_chans=15
+        #input_time_length=3584
+
+        # final_conv_length='auto' ensures we only get a single output
+        # in the time dimension
+        self.model = ShallowFBCSPNet(in_chans=15,
+                                     n_classes=2,
+                                     input_time_length=3584,
+                                     n_filters_time=10,
+                                     filter_time_length=75,
+                                     n_filters_spat=5,
+                                     pool_time_length=60,
+                                     pool_time_stride=30,
+                                     final_conv_length='auto').create_network()
+
+        # set the model up for cuda
+        if self.cuda:
+            self.model.cuda()
+
+        # load the saved network (makes it possible to resume from the same
+        # starting point)
+        self.model.load_state_dict(torch.load(filename))
+        return
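+    # A minimal sketch (unused by the trainer itself) of the freezing idea
+    # left commented out in _decorateNetwork: turn off gradients for every
+    # existing layer, then swap in a fresh conv_classifier, which is the
+    # only layer left trainable. The layer shapes are the same assumptions
+    # used in _decorateNetwork above.
+    def _freezeAllButClassifier(self, network):
+        # deactivate training for all existing parameters
+        for param in network.parameters():
+            param.requires_grad = False
+        # the new classifier layer has requires_grad=True by default
+        network.conv_classifier = nn.Conv2d(5, 2, (116, 1), bias=True).cuda()
+        return network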
+    def fit(self, X, y):
+        """
+        Fit the network.
+        Params:
+            X, data array in the format (...)
+            y, labels
+        ref: http://danielhnyk.cz/creating-your-own-estimator-scikit-learn/
+        """
+        # prepare an optimizer; only the parameters of the classifier
+        # layer are trained
+        self.optimizer = optim.Adam(self.model.conv_classifier.parameters(),
+                                    lr=self.lr)
+
+        # define the number of training trials
+        nb_train_trials = int(np.floor(self.trainTestRatio * X.shape[0]))
+
+        # split the dataset
+        train_set = SignalAndTarget(X[:nb_train_trials], y=y[:nb_train_trials])
+        test_set = SignalAndTarget(X[nb_train_trials:], y=y[nb_train_trials:])
+
+        # random generator
+        self.rng = RandomState(None)
+
+        # arrays that track results
+        self.loss_rec = np.zeros((self.nb_epoch, 2))
+        self.accuracy_rec = np.zeros((self.nb_epoch, 2))
+
+        # run all epochs
+        for i_epoch in range(self.nb_epoch):
+            self._batchTrain(i_epoch, train_set)
+            self._evalTraining(i_epoch, train_set, test_set)
+
+        return self
+
+    def _batchTrain(self, i_epoch, train_set):
+        """
+        Training iteration: train the network on the train_set.
+        Params:
+            i_epoch, current epoch iteration
+            train_set, training set
+        """
+        # get a set of balanced batches
+        i_trials_in_batch = get_balanced_batches(len(train_set.X), self.rng,
+                                                 shuffle=True, batch_size=32)
+
+        self.adjust_learning_rate(self.optimizer, i_epoch)
+
+        # set model to training mode
+        self.model.train()
+
+        # go through all batches
+        for i_trials in i_trials_in_batch:
+
+            # have to add an empty fourth dimension to X
+            batch_X = train_set.X[i_trials][:, :, :, None]
+            batch_y = train_set.y[i_trials]
+
+            net_in = np_to_var(batch_X)
+            net_target = np_to_var(batch_y)
+
+            # if cuda, copy to cuda memory
+            if self.cuda:
+                net_in = net_in.cuda()
+                net_target = net_target.cuda()
+
+            # remove gradients of the last backward pass from all parameters
+            self.optimizer.zero_grad()
+            # compute the outputs of the network
+            outputs = self.model(net_in)
+            # compute the loss
+            loss = F.nll_loss(outputs, net_target)
+            # do the backpropagation
+            loss.backward()
+            # update parameters with the optimizer
+            self.optimizer.step()
+
+        return
+
+    def _evalTraining(self, i_epoch, train_set, test_set):
+        """
+        Evaluation iteration: compute the performance of the network.
+        Params:
+            i_epoch, current epoch iteration
+            train_set, training set
+            test_set, test set
+        """
+        # print some statistics each epoch
+        self.model.eval()
+        print("Epoch {:d}".format(i_epoch))
+
+        sets = {'Train': 0, 'Test': 1}
+
+        # run the evaluation on both the train and the test sets
+        for setname, dataset in (('Train', train_set), ('Test', test_set)):
+
+            # get balanced batches
+            i_trials_in_batch = get_balanced_batches(len(dataset.X), self.rng,
+                                                     batch_size=32,
+                                                     shuffle=False)
+
+            outputs = []
+            net_targets = []
+
+            # for all trials in the set
+            for i_trials in i_trials_in_batch:
+
+                # have to add an empty fourth dimension to X
+                batch_X = dataset.X[i_trials][:, :, :, None]
+                batch_y = dataset.y[i_trials]
+
+                # convert to torch variables
+                net_in = np_to_var(batch_X)
+                net_target = np_to_var(batch_y)
+
+                # if cuda, copy to cuda memory
+                if self.cuda:
+                    net_in = net_in.cuda()
+                    net_target = net_target.cuda()
+
+                net_target = var_to_np(net_target)
+                output = var_to_np(self.model(net_in))
+                outputs.append(output)
+                net_targets.append(net_target)
+
+            net_targets = np_to_var(np.concatenate(net_targets))
+            outputs = np_to_var(np.concatenate(outputs))
+            loss = F.nll_loss(outputs, net_targets)
+
+            print("{:6s} Loss: {:.5f}".format(setname, float(var_to_np(loss))))
+            self.loss_rec[i_epoch, sets[setname]] = var_to_np(loss)
+
+            predicted_labels = np.argmax(var_to_np(outputs), axis=1)
+            accuracy = np.mean(dataset.y == predicted_labels)
+            print("{:6s} Accuracy: {:.1f}%".format(setname, accuracy * 100))
+            self.accuracy_rec[i_epoch, sets[setname]] = accuracy
+
+        return
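+    # An assumption-based companion to predict() below, not part of the
+    # original trainer: ShallowFBCSPNet ends in a log-softmax (which is why
+    # F.nll_loss is used above), so exponentiating the network outputs
+    # yields class probabilities. It reuses the same batching as predict().
+    def predict_proba(self, X):
+        self.model.eval()
+
+        i_trials_in_batch = get_balanced_batches(len(X), self.rng,
+                                                 batch_size=32, shuffle=False)
+
+        outputs = []
+        for i_trials in i_trials_in_batch:
+            net_in = np_to_var(X[i_trials][:, :, :, None])
+            if self.cuda:
+                net_in = net_in.cuda()
+            outputs.append(var_to_np(self.model(net_in)))
+
+        # convert log-probabilities to probabilities
+        return np.exp(np.concatenate(outputs))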
+    def predict(self, X):
+        """
+        Predict the class labels of the trials in X.
+        """
+        self.model.eval()
+
+        i_trials_in_batch = get_balanced_batches(len(X), self.rng,
+                                                 batch_size=32, shuffle=False)
+
+        outputs = []
+
+        for i_trials in i_trials_in_batch:
+            # have to add an empty fourth dimension to X
+            batch_X = X[i_trials][:, :, :, None]
+
+            net_in = np_to_var(batch_X)
+
+            if self.cuda:
+                net_in = net_in.cuda()
+
+            output = var_to_np(self.model(net_in))
+            outputs.append(output)
+
+        # a scikit-learn classifier is expected to return labels, so pick
+        # the class with the highest (log-)probability for each trial
+        return np.argmax(np.concatenate(outputs), axis=1)
+
+    def adjust_learning_rate(self, optimizer, epoch):
+        """Set the learning rate to the initial LR decayed by a factor of 10
+        every 30 epochs, e.g. epochs 0-29 -> lr, 30-59 -> lr/10, ..."""
+        lr = self.lr * (0.1 ** (epoch // 30))
+        for param_group in optimizer.param_groups:
+            param_group['lr'] = lr
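+
+
+# A minimal usage sketch under stated assumptions: "myModel.pth" was saved
+# beforehand by _decorateNetwork, CUDA is available, and the random arrays
+# below merely stand in for real EEG trials shaped
+# (trials, 15 channels, 3584 samples).
+if __name__ == "__main__":
+    X = np.random.randn(64, 15, 3584).astype(np.float32)
+    y = np.random.randint(0, 2, 64).astype(np.int64)
+
+    clf = ShallowFBCSPNet_SpecializedTrainer(filename="myModel.pth")
+    clf.configure(nb_epoch=20)
+    clf.fit(X, y)
+    print(clf.predict(X))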