a b/Models/main-pretrain_model.py
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""
5
Notice: 
6
[1] This code is only for reference.
7
Please modify the codes to fit your own data.
8
[2] The Code is based on TensorFlow 2.X.
9
Please install the TensorFlow 2.X version.
10
"""
11
12
import numpy as np
13
import pandas as pd
14
15
import tensorflow as tf
16
from tensorflow.keras import layers
17
from tensorflow import keras
18
19
# Read Training Data
20
train_data = pd.read_csv('../source/S-traindata.csv', header=None)
21
train_data = np.array(train_data).astype('float32')
22
23
# Read Training Labels
24
train_labels = pd.read_csv('../source/S-trainlabel.csv', header=None)
25
train_labels = np.array(train_labels).astype('float32')
26
train_labels = tf.one_hot(indices=train_labels, depth=2)
27
train_labels = np.squeeze(train_labels)
28
29
# Read Testing Data
30
test_data = pd.read_csv('../source/S-testdata.csv', header=None)
31
test_data = np.array(test_data).astype('float32')
32
33
# Read Testing Labels
34
test_labels = pd.read_csv('../source/S-testlabel.csv', header=None)
35
test_labels = np.array(test_labels).astype('float32')
36
test_labels = tf.one_hot(indices=test_labels, depth=2)
37
test_labels = np.squeeze(test_labels)
38
39
40
class CatgoricalTP(tf.keras.metrics.Metric):
41
    def __init__(self, name='categorical_tp', **kwargs):
42
        super(CatgoricalTP, self).__init__(name=name, **kwargs)
43
        self.tp = self.add_weight(name='tp', initializer='zeros')
44
45
    def update_state(self, y_true, y_pred, sample_weight=None):
46
        y_pred = tf.argmax(y_pred, axis=-1)
47
        y_true = tf.argmax(y_true, axis=-1)
48
        values = tf.equal(tf.cast(y_pred, 'int32'), tf.cast(y_true, 'int32'))
49
        values = tf.cast(values, 'float32')
50
        if sample_weight is not None:
51
            sample_weights = tf.cast(sample_weight, 'float32')
52
            values = tf.multiply(values, sample_weights)
53
54
        self.tp.assign_add(tf.reduce_sum(values))
55
56
    def result(self):
57
        return self.tp
58
59
    def reset_states(self):
60
        self.tp.assign(0.)
61
62
63
class TransformerBlock(layers.Layer):
64
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.5):
65
        super(TransformerBlock, self).__init__()
66
        self.att = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
67
        self.ffn = keras.Sequential([layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim), ])
68
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
69
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
70
        self.dropout1 = layers.Dropout(rate)
71
        self.dropout2 = layers.Dropout(rate)
72
73
        self.embed_dim = embed_dim
74
        self.num_heads = num_heads
75
        self.ff_dim = ff_dim
76
        self.rate = rate
77
78
    def call(self, inputs, training):
79
        attn_output = self.att(inputs, inputs)
80
        attn_output = self.dropout1(attn_output, training=training)
81
        out1 = self.layernorm1(inputs + attn_output)
82
        ffn_output = self.ffn(out1)
83
        ffn_output = self.dropout2(ffn_output, training=training)
84
        out = self.layernorm2(out1 + ffn_output)
85
86
        return out
87
88
89
class TokenAndPositionEmbedding(layers.Layer):
90
    def __init__(self, maxlen, embed_dim):
91
        super(TokenAndPositionEmbedding, self).__init__()
92
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)
93
        self.maxlen = maxlen
94
        self.embed_dim = embed_dim
95
96
    def call(self, x):
97
        positions = tf.range(start=0, limit=self.maxlen, delta=1)
98
        positions = self.pos_emb(positions)
99
        x = tf.reshape(x, [-1, self.maxlen, self.embed_dim])
100
        out = x + positions
101
102
        return out
103
104
105
maxlen = 3  # (Maximum) length of the signals
106
embed_dim = 97  # Number of features of one time point
107
num_heads = 8  # Number of attention heads
108
ff_dim = 64  # Hidden layer size in feed forward network inside transformer
109
110
111
def get_model():
112
    # Input Time-series
113
    inputs = layers.Input(shape=(maxlen * embed_dim,))
114
    embedding_layer = TokenAndPositionEmbedding(maxlen, embed_dim)
115
    x = embedding_layer(inputs)
116
117
    # Encoder Architecture
118
    transformer_block_1 = TransformerBlock(embed_dim=embed_dim, num_heads=num_heads, ff_dim=ff_dim)
119
    transformer_block_2 = TransformerBlock(embed_dim=embed_dim, num_heads=num_heads, ff_dim=ff_dim)
120
    x = transformer_block_1(x)
121
    x = transformer_block_2(x)
122
123
    # Output
124
    x = layers.GlobalMaxPooling1D()(x)
125
    x = layers.Dropout(0.5)(x)
126
    x = layers.Dense(64, activation="relu")(x)
127
    x = layers.Dropout(0.5)(x)
128
    outputs = layers.Dense(2, activation="softmax")(x)
129
130
    model = keras.Model(inputs=inputs, outputs=outputs)
131
132
    return model
133
134
135
model = get_model()
136
model.compile(optimizer=tf.keras.optimizers.Adam(lr=1e-4),
137
              loss="categorical_crossentropy",
138
              metrics=["accuracy", CatgoricalTP()])
139
140
history = model.fit(
141
    train_data, train_labels, batch_size=64, epochs=100, validation_data=(test_data, test_labels)
142
)
143
144
model.save_weights('model_weight')