Diff of /preprocessing/aux1.py [000000] .. [21363a]

Switch to unified view

a b/preprocessing/aux1.py
1
# -*- coding: utf-8 -*-
2
"""
3
Created on Wed Jul  8 22:00:08 2015.
4
5
@author: rc, alexandre
6
"""
7
8
9
import numpy as np
10
import pandas as pd
11
from mne.io import RawArray
12
from mne.channels import read_montage
13
from mne import create_info, concatenate_raws, pick_types
14
from sklearn.base import BaseEstimator, TransformerMixin
15
from glob import glob
16
17
18
def getChannelNames():
19
    """Return Channels names."""
20
    return ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'FC5', 'FC1', 'FC2',
21
            'FC6', 'T7', 'C3', 'Cz', 'C4', 'T8', 'TP9', 'CP5', 'CP1', 'CP2',
22
            'CP6', 'TP10', 'P7', 'P3', 'Pz', 'P4', 'P8', 'PO9', 'O1', 'Oz',
23
            'O2', 'PO10']
24
25
26
def getEventNames():
27
    """Return Event name."""
28
    return ['HandStart', 'FirstDigitTouch', 'BothStartLoadPhase', 'LiftOff',
29
            'Replace', 'BothReleased']
30
31
32
def load_raw_data(subject, test=False):
33
    """Load Raw data from files.
34
35
    For a given subject, csv files are loaded, converted to MNE raw instance
36
    and concatenated.
37
    If test is True, training data are composed of series 1 to 8 and test data
38
    of series 9 and test. Otherwise, training data are series 1 to 6 and test
39
    data series 7 and 8.
40
    """
41
    fnames_train = glob('../data/train/subj%d_series*_data.csv' % (subject))
42
    fnames_train.sort()
43
    if test:
44
        fnames_test = glob('../data/test/subj%d_series*_data.csv' % (subject))
45
        fnames_test.sort()
46
    else:
47
        fnames_test = fnames_train[-2:]
48
        fnames_train = fnames_train[:-2]
49
50
    # read and concatenate all the files
51
    raw_train = [creat_mne_raw_object(fname) for fname in fnames_train]
52
    raw_train = concatenate_raws(raw_train)
53
    # pick eeg signal
54
    picks = pick_types(raw_train.info, eeg=True)
55
56
    # get training data
57
    data_train = raw_train._data[picks].T
58
    labels_train = raw_train._data[32:].T
59
60
    raw_test = [creat_mne_raw_object(fname, read_events=not test) for fname in
61
                fnames_test]
62
    raw_test = concatenate_raws(raw_test)
63
    data_test = raw_test._data[picks].T
64
65
    # extract labels if validating on series 7&8
66
    labels_test = None
67
    if not test:
68
        labels_test = raw_test._data[32:].T
69
70
    return data_train, labels_train, data_test, labels_test
71
72
73
def creat_mne_raw_object(fname, read_events=True):
74
    """Create a mne raw instance from csv file."""
75
    # Read EEG file
76
    data = pd.read_csv(fname)
77
78
    # get chanel names
79
    ch_names = list(data.columns[1:])
80
81
    # read EEG standard montage from mne
82
    montage = read_montage('standard_1005', ch_names)
83
84
    ch_type = ['eeg']*len(ch_names)
85
    data = 1e-6*np.array(data[ch_names]).T
86
87
    if read_events:
88
        # events file
89
        ev_fname = fname.replace('_data', '_events')
90
        # read event file
91
        events = pd.read_csv(ev_fname)
92
        events_names = events.columns[1:]
93
        events_data = np.array(events[events_names]).T
94
95
        # define channel type, the first is EEG, the last 6 are stimulations
96
        ch_type.extend(['stim']*6)
97
        ch_names.extend(events_names)
98
        # concatenate event file and data
99
        data = np.concatenate((data, events_data))
100
101
    # create and populate MNE info structure
102
    info = create_info(ch_names, sfreq=500.0, ch_types=ch_type,
103
                       montage=montage)
104
    info['filename'] = fname
105
106
    # create raw object
107
    raw = RawArray(data, info, verbose=False)
108
109
    return raw
110
111
112
def sliding_window(sig, window=512, subsample=10, estimator=None):
113
    """Extract a slinding window from signal.
114
115
    Raw signal is padded with zeros on the left to avoid use of future data.
116
    """
117
    Ne, Ns = sig.shape
118
    # get the index before padding
119
    ix = range(0, Ns, subsample)
120
121
    # padd data
122
    padd = np.zeros((Ne, int(window) - 1))
123
    sig = np.concatenate((padd, sig), axis=1)
124
    Ne, Ns = sig.shape
125
126
    if estimator is None:
127
        estimator = np.array
128
    # call this to get the shape
129
    X = estimator(sig[:, 0:window])
130
    dims = list(X.shape)
131
    dims.insert(0, len(ix))
132
    dims = tuple(dims)
133
134
    # allocate array
135
    X = np.empty(dims, dtype=X.dtype)
136
    for i in range(len(ix)):
137
        X[i] = estimator(sig[:, ix[i]:(ix[i] + window)])
138
139
    return X
140
141
142
def delay_preds(X, delay=100, skip=2, subsample=1, start=0, jump=None):
143
    """Delay predictions.
144
145
    Create a feature vector by concatenation of present and past sample.
146
    The concatenation is done by shifting data to the right :
147
148
    out = | x1 x2 x3 ...  xn   |
149
          | 0  x1 x2 ...  xn-1 |
150
          | 0  0  x1 ...  xn-2 |
151
152
    No use of future data.
153
    """
154
    if jump is None:
155
        jump = range(0, delay, skip)
156
    Ns, Ne = X.shape
157
    Ns_subsampled = len(range(start, Ns, subsample))
158
    out = np.zeros((Ns_subsampled, Ne * len(jump)))
159
    for i, sk in enumerate(jump):
160
        chunk = X[0:(Ns - sk)][start::subsample]
161
        out[(Ns_subsampled-chunk.shape[0]):, (i * Ne):((i + 1) * Ne)] = chunk
162
    return out
163
164
165
def delay_preds_2d(X, delay=100, skip=2, subsample=1, start=0, jump=None):
166
    """Delay predictions with 2d shape.
167
168
    Same thing as delay_pred, but return delayed prediction with a 2d shape.
169
    """
170
    if jump is None:
171
        jump = range(0, delay, skip)
172
    Ns, Ne = X.shape
173
    Ns_subsampled = len(range(start, Ns, subsample))
174
    out = np.zeros((Ns_subsampled, len(jump), Ne))
175
    for i, sk in enumerate(jump):
176
        chunk = X[0:(Ns - sk)][start::subsample]
177
        out[(Ns_subsampled-chunk.shape[0]):, i, :] = chunk
178
    return out[:, ::-1, :]
179
180
181
class SlidingWindow(BaseEstimator, TransformerMixin):
182
183
    """Sliding Window tranformer Mixin."""
184
185
    def __init__(self, window=500, subsample=10, estimator=np.array):
186
        """Init."""
187
        self.window = window
188
        self.subsample = subsample
189
        self.estimator = estimator
190
191
    def fit(self, X, y=None):
192
        """Fit, not used."""
193
        return self
194
195
    def transform(self, X, y=None):
196
        """Transform."""
197
        return sliding_window(X.T, window=self.window,
198
                              subsample=self.subsample,
199
                              estimator=self.estimator)
200
201
    def update_subsample(self, old_sub, new_sub):
202
        """update subsampling."""
203
        self.subsample = new_sub
204
205
206
class SubSample(BaseEstimator, TransformerMixin):
207
208
    """Subsample tranformer Mixin."""
209
210
    def __init__(self, subsample=10):
211
        """Init."""
212
        self.subsample = subsample
213
214
    def fit(self, X, y=None):
215
        """Fit, not used."""
216
        return self
217
218
    def transform(self, X, y=None):
219
        """Transform."""
220
        return X[::self.subsample]
221
222
    def update_subsample(self, old_sub, new_sub):
223
        """update subsampling."""
224
        self.subsample = new_sub
225
226
227
class DelayPreds(BaseEstimator, TransformerMixin):
228
229
    """Delayed prediction tranformer Mixin."""
230
231
    def __init__(self, delay=1000, skip=100, two_dim=False):
232
        """Init."""
233
        self.delay = delay
234
        self.skip = skip
235
        self.two_dim = two_dim
236
237
    def fit(self, X, y=None):
238
        """Fit, not used."""
239
        return self
240
241
    def transform(self, X, y=None):
242
        """Transform."""
243
        if self.two_dim:
244
            return delay_preds_2d(X, delay=self.delay, skip=self.skip)
245
        else:
246
            return delay_preds(X, delay=self.delay, skip=self.skip)
247
248
    def update_subsample(self, old_sub, new_sub):
249
        """update subsampling."""
250
        ratio = old_sub / new_sub
251
        self.delay = int(self.delay * ratio)
252
        self.skip = int(self.skip * ratio)
253
254
255
class NoneTransformer(BaseEstimator, TransformerMixin):
256
257
    """Return None Transformer."""
258
259
    def __init__(self):
260
        """Init."""
261
        pass
262
263
    def fit(self, X, y=None):
264
        """Fit, not used."""
265
        return self
266
267
    def transform(self, X, y=None):
268
        """Transform."""
269
        return None