b/Model.py
|
|
# -*- coding: utf-8 -*-
"""
Created on Tue Sep 17 11:16:34 2019

@author: anne marie delaney
         eoin brophy

Module of the GAN model for time series synthesis.
"""

import torch
import torch.nn as nn

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
|
|
"""
NN Definitions
--------------
Defines the neural network classes to be evaluated in this notebook.

Minibatch Discrimination
------------------------
A module for minibatch discrimination, used to avoid mode collapse, as described in:
https://arxiv.org/pdf/1606.03498.pdf
https://torchgan.readthedocs.io/en/latest/modules/layers.html#minibatch-discrimination
"""
|
|
class MinibatchDiscrimination(nn.Module):
    def __init__(self, input_features, output_features, minibatch_normal_init,
                 hidden_features=16):
        super(MinibatchDiscrimination, self).__init__()

        self.input_features = input_features
        self.output_features = output_features
        self.hidden_features = hidden_features
        # T maps each input vector onto output_features similarity kernels,
        # each of dimension hidden_features.
        self.T = nn.Parameter(torch.randn(self.input_features, self.output_features,
                                          self.hidden_features))
        if minibatch_normal_init:
            # nn.init.normal is deprecated; the in-place normal_ is the current API.
            nn.init.normal_(self.T, 0, 1)

    def forward(self, x):
        M = torch.mm(x, self.T.view(self.input_features, -1))
        M = M.view(-1, self.output_features, self.hidden_features).unsqueeze(0)
        M_t = M.permute(1, 0, 2, 3)
        # Broadcasting reduces the pairwise matrix subtraction to the form desired
        # in the paper; subtracting 1 removes each sample's similarity to itself.
        out = torch.sum(torch.exp(-(torch.abs(M - M_t).sum(3))), dim=0) - 1
        # Append the batch-similarity features to the input features.
        return torch.cat([x, out], 1)
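
"""
Usage sketch
------------
A minimal shape check for MinibatchDiscrimination (an added illustration,
not part of the original training code; the sizes are arbitrary). A batch
of 8 vectors with 30 input features and output_features=5 similarity
features should come back with 30 + 5 = 35 features per sample.
"""
if __name__ == '__main__':
    mb = MinibatchDiscrimination(input_features=30, output_features=5,
                                 minibatch_normal_init=True)
    x_check = torch.randn(8, 30)
    print(mb(x_check).shape)  # expected: torch.Size([8, 35])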
|
|
"""
Discriminator Class
-------------------
This discriminator has a parameter num_cv that lets the user specify
whether it uses one or two convolutional layers.
"""
|
|
class Discriminator(nn.Module):
    def __init__(self, seq_length, batch_size, minibatch_normal_init, n_features=1,
                 num_cv=1, minibatch=0, cv1_out=10, cv1_k=3, cv1_s=4, p1_k=3, p1_s=3,
                 cv2_out=10, cv2_k=3, cv2_s=3, p2_k=3, p2_s=3):
        super(Discriminator, self).__init__()
        self.n_features = n_features
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.num_cv = num_cv
        self.minibatch = minibatch
        # Sequence length after each conv + max-pool stage, applying
        # L_out = floor((L_in - kernel) / stride) + 1 to both operations.
        self.cv1_dims = int((((((seq_length - cv1_k) / cv1_s) + 1) - p1_k) / p1_s) + 1)
        self.cv2_dims = int((((((self.cv1_dims - cv2_k) / cv2_s) + 1) - p2_k) / p2_s) + 1)
        self.cv1_out = cv1_out
        self.cv2_out = cv2_out

        # Input should have size (batch_size, n_features, seq_length) for the convolutional layer.
        self.CV1 = nn.Sequential(
            nn.Conv1d(in_channels=self.n_features, out_channels=int(cv1_out),
                      kernel_size=int(cv1_k), stride=int(cv1_s)),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=int(p1_k), stride=int(p1_s))
        )

        # Two convolutional layers
        if self.num_cv > 1:
            self.CV2 = nn.Sequential(
                nn.Conv1d(in_channels=int(cv1_out), out_channels=int(cv2_out),
                          kernel_size=int(cv2_k), stride=int(cv2_s)),
                nn.ReLU(),
                nn.MaxPool1d(kernel_size=int(p2_k), stride=int(p2_s))
            )

            # Minibatch discrimination lets the discriminator compare samples across
            # the batch, so the generator is pushed to produce sequences that differ
            # from each other.
            if self.minibatch > 0:
                self.mb1 = MinibatchDiscrimination(self.cv2_dims * cv2_out, self.minibatch,
                                                   minibatch_normal_init)
                # Sigmoid keeps the output between 0 and 1.
                self.out = nn.Sequential(nn.Linear(int(self.cv2_dims * cv2_out) + self.minibatch, 1),
                                         nn.Sigmoid())
            else:
                self.out = nn.Sequential(nn.Linear(int(self.cv2_dims * cv2_out), 1),
                                         nn.Sigmoid())

        # One convolutional layer
        else:
            # Minibatch discrimination, as above, pushes the generator towards
            # producing sequences that differ from each other.
            if self.minibatch > 0:
                self.mb1 = MinibatchDiscrimination(int(self.cv1_dims * cv1_out), self.minibatch,
                                                   minibatch_normal_init)
                self.out = nn.Sequential(nn.Linear(int(self.cv1_dims * cv1_out) + self.minibatch, 1),
                                         nn.Dropout(0.2), nn.Sigmoid())
            else:
                self.out = nn.Sequential(nn.Linear(int(self.cv1_dims * cv1_out), 1),
                                         nn.Sigmoid())
|
|
    def forward(self, x):
        # print("Calculated output dims after CV1: " + str(self.cv1_dims))
        # print("input: " + str(x.size()))
        x = self.CV1(x.view(self.batch_size, 1, self.seq_length))
        # print("CV1 output: " + str(x.size()))

        # Two convolutional layers
        if self.num_cv > 1:
            x = self.CV2(x)
            x = x.view(self.batch_size, -1)
            # print("CV2 output: " + str(x.size()))
            if self.minibatch > 0:
                x = self.mb1(x.squeeze())
                # print("minibatch output: " + str(x.size()))
                x = self.out(x.squeeze())
            else:
                x = self.out(x.squeeze())

        # One convolutional layer
        else:
            x = x.view(self.batch_size, -1)

            # One convolutional layer with minibatch discrimination
            if self.minibatch > 0:
                x = self.mb1(x)
                x = self.out(x)
            # One convolutional layer without minibatch discrimination
            else:
                x = self.out(x)

        return x
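
"""
Usage sketch
------------
An illustrative forward pass through the default one-layer discriminator
(an added example, not part of the original module; the sizes are
arbitrary). With seq_length=100, cv1_k=3 and cv1_s=4 the conv output
length is floor((100 - 3)/4) + 1 = 25; max-pooling with p1_k=3, p1_s=3
reduces it to floor((25 - 3)/3) + 1 = 8, so cv1_dims * cv1_out = 8 * 10 = 80
features feed the final linear layer.
"""
if __name__ == '__main__':
    D = Discriminator(seq_length=100, batch_size=8, minibatch_normal_init=True)
    fake_batch = torch.rand(8, 100)
    print(D(fake_batch).shape)  # expected: torch.Size([8, 1])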
|
|
"""
Generator Class
---------------
This defines the generator for evaluation. The generator consists of a
two-layer LSTM followed by a final fully connected layer.
"""
|
|
class Generator(nn.Module):
    def __init__(self, seq_length, batch_size, n_features=1, hidden_dim=50,
                 num_layers=2, tanh_output=False):
        super(Generator, self).__init__()
        self.n_features = n_features
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.tanh_output = tanh_output

        self.layer1 = nn.LSTM(input_size=self.n_features, hidden_size=self.hidden_dim,
                              num_layers=self.num_layers, batch_first=True)  # dropout=0.2 removed
        if self.tanh_output:
            # Tanh bounds the output to (-1, 1); the earlier nn.Sigmoid() was removed.
            self.out = nn.Sequential(nn.Linear(self.hidden_dim, 1), nn.Tanh())
        else:
            self.out = nn.Linear(self.hidden_dim, 1)

    def init_hidden(self):
        # Zero-initialise (h_0, c_0) with the same dtype and device as the parameters.
        weight = next(self.parameters()).data
        hidden = (weight.new(self.num_layers, self.batch_size, self.hidden_dim).zero_().to(device),
                  weight.new(self.num_layers, self.batch_size, self.hidden_dim).zero_().to(device))
        return hidden

    def forward(self, x, hidden):
        x, hidden = self.layer1(x.view(self.batch_size, self.seq_length, 1), hidden)
        x = self.out(x)
        return x  # ,hidden
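
"""
Usage sketch
------------
Generating a batch of synthetic sequences from Gaussian noise (an added
example; the noise follows the (batch, seq_length, n_features) layout the
forward pass expects, but the sizes are arbitrary).
"""
if __name__ == '__main__':
    G = Generator(seq_length=100, batch_size=8).to(device)
    noise = torch.randn(8, 100, 1).to(device)
    hidden_state = G.init_hidden()
    synthetic = G(noise, hidden_state)
    print(synthetic.shape)  # expected: torch.Size([8, 100, 1])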