[21363a]: / preprocessing / covs_alex.py

Download this file

136 lines (112 with data), 4.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
"""
Covariance model by alex.
@author: Alexandre Barachant
"""
import numpy as np
from pyriemann.classification import MDM
from pyriemann.utils.mean import mean_covariance
from sklearn.base import BaseEstimator, TransformerMixin
from multiprocessing import Pool
from functools import partial
from mne.event import _find_stim_steps
def create_sequence(events):
"""create sequence from events.
Create a sequence of non-overlapped States from labels.
"""
# init variable
sequence = np.zeros((events.shape[1], 1))
# get hand start
handStart = np.int64(_find_stim_steps(np.atleast_2d(events[0]), 0)[::2, 0])
# lift
lift_on = np.int64(_find_stim_steps(np.atleast_2d(events[1]), 0)[::2, 0])
lift_off = np.int64(_find_stim_steps(np.atleast_2d(events[3]), 0)[1::2, 0])
# replace
replace_on = np.int64(_find_stim_steps(np.atleast_2d(events[4]), 0)
[::2, 0])
replace_off = np.int64(_find_stim_steps(np.atleast_2d(events[5]), 0)
[1::2, 0])
for i in range(len(handStart)):
j = 1
sequence[(handStart[i] - 250):handStart[i]] = j
j += 1
sequence[handStart[i]:lift_on[i]] = j
j += 1
sequence[lift_on[i]:lift_off[i]] = j
j += 1
sequence[lift_off[i]:replace_on[i]] = j
j += 1
sequence[replace_on[i]:replace_off[i]] = j
j += 1
sequence[replace_off[i]:(replace_off[i] + 250)] = j
j += 1
return sequence
class DistanceCalculatorAlex(BaseEstimator, TransformerMixin):
"""Distance Calulator Based on MDM."""
def __init__(self, metric_mean='logeuclid', metric_dist=['riemann'],
n_jobs=7, subsample=10):
"""Init."""
self.metric_mean = metric_mean
self.metric_dist = metric_dist
self.n_jobs = n_jobs
self.subsample = subsample
def fit(self, X, y):
"""Fit."""
self.mdm = MDM(metric=self.metric_mean, n_jobs=self.n_jobs)
labels = np.squeeze(create_sequence(y.T)[::self.subsample])
self.mdm.fit(X, labels)
return self
def transform(self, X, y=None):
"""Transform."""
feattr = []
for metric in self.metric_dist:
self.mdm.metric_dist = metric
feat = self.mdm.transform(X)
# substract distance of the class 0
feat = feat[:, 1:] - np.atleast_2d(feat[:, 0]).T
feattr.append(feat)
feattr = np.concatenate(feattr, axis=1)
feattr[np.isnan(feattr)] = 0
return feattr
def fit_transform(self, X, y):
"""Fit and transform."""
self.fit(X, y)
return self.transform(X)
class DistanceCalculatorRafal(BaseEstimator, TransformerMixin):
"""Distance Calulator Based on MDM Rafal style."""
def __init__(self, metric_mean='logeuclid', metric_dist=['riemann'],
n_jobs=12, subsample=10):
"""Init."""
self.metric_mean = metric_mean
self.metric_dist = metric_dist
self.n_jobs = n_jobs
self.subsample = subsample
def fit(self, X, y):
"""Fit."""
self.mdm = MDM(metric=self.metric_mean, n_jobs=self.n_jobs)
labels = y[::self.subsample]
pCalcMeans = partial(mean_covariance, metric=self.metric_mean)
pool = Pool(processes=6)
mc1 = pool.map(pCalcMeans, [X[labels[:, i] == 1] for i in range(6)])
pool.close()
pool = Pool(processes=6)
mc0 = pool.map(pCalcMeans, [X[labels[:, i] == 0] for i in range(6)])
pool.close()
self.mdm.covmeans = mc1 + mc0
return self
def transform(self, X, y=None):
"""Transform."""
feattr = []
for metric in self.metric_dist:
self.mdm.metric_dist = metric
feat = self.mdm.transform(X)
# substract distance of the class 0
feat = feat[:, 0:6] - feat[:, 6:]
feattr.append(feat)
feattr = np.concatenate(feattr, axis=1)
feattr[np.isnan(feattr)] = 0
return feattr
def fit_transform(self, X, y):
"""Fit and transform."""
self.fit(X, y)
return self.transform(X)