a b/src/cloudbrain/connectors/neurosky.py
1
# -*- coding: utf-8 -*-
2
3
"""
4
Copyright Puzzlebox Productions, LLC (2010-2016)
5
6
Ported from Puzzlebox Synapse
7
ThinkGear code imported from Puzzlebox.Synapse.ThinkGear.Server
8
http://puzzlebox.io
9
10
This code is released under the GNU Public License (GPL) version 3
11
For more information please refer to http://www.gnu.org/copyleft/gpl.html
12
13
Author: Steve Castellotti <sc@puzzlebox.io>
14
"""
15
16
# Free-form changelog string; it currently records only the last-update date.
__changelog__ = """Last Update: 2017.05.28"""
17
import threading
18
import signal
19
import sys
20
21
from cloudbrain.connectors.thinkgear import SerialDevice
22
from cloudbrain.connectors.thinkgear import puzzlebox_synapse_protocol_thinkgear
23
24
# Serial device path used when no ``device_address`` is supplied to
# ``NeuroskyConnector`` or ``run``.
THINKGEAR_DEVICE_SERIAL_PORT = '/dev/tty.MindWaveMobile-DevA'

# Metric names accepted as keys of the ``callback_functions`` mapping given to
# ``NeuroskyConnector``; any other key is rejected with a ``ValueError``.
VALID_METRICS = [
    'eeg',
    'poorSignalLevel',
    'attention',
    'meditation',
    'delta',
    'theta',
    'lowAlpha',
    'highAlpha',
    'lowBeta',
    'highBeta',
    'lowGamma',
    'highGamma',
]
28
29
30
def displayCSVHeader():
    """Print the CSV column header line to standard output.

    The line consists of ``timestamp`` followed by the twelve metric names,
    comma-separated, in the same column order that ``displayCSV`` uses for
    its values.
    """
    column_names = (
        'timestamp',
        'eeg',
        'poorSignalLevel',
        'attention',
        'meditation',
        'delta',
        'theta',
        'lowAlpha',
        'highAlpha',
        'lowBeta',
        'highBeta',
        'lowGamma',
        'highGamma',
    )
    header_format = "%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s"
    print(header_format % column_names)
45
46
47
def displayCSV(packet):
    """Print one packet as a single comma-separated line on standard output.

    Args:
        packet: Mapping that must contain ``timestamp`` and every one of the
            twelve metric keys listed below; values are rendered with ``%s``.

    Raises:
        KeyError: If any of the expected keys is absent from ``packet``.
    """
    ordered_keys = (
        'timestamp',
        'eeg',
        'poorSignalLevel',
        'attention',
        'meditation',
        'delta',
        'theta',
        'lowAlpha',
        'highAlpha',
        'lowBeta',
        'highBeta',
        'lowGamma',
        'highGamma',
    )
    # Look the values up in column order so the row lines up with the header.
    row = tuple(packet[key] for key in ordered_keys)
    print("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % row)
62
63
64
class NeuroskyConnector(threading.Thread):
    """Connect a NeuroSky ThinkGear headset to per-metric callbacks.

    Constructing the connector immediately creates and starts a
    ``SerialDevice`` and a ``puzzlebox_synapse_protocol_thinkgear`` object,
    passing itself to both as ``parent``. Packets handed to
    ``processPacketThinkGear`` are merged with the most recently cached
    signal-quality, eSense and band-power values and dispatched to the
    registered callbacks.

    NOTE(review): the worker objects presumably call back into this class
    (``processPacketThinkGear``, ``incrementPacketCount`` ...) through their
    ``parent`` reference -- confirm against the thinkgear module.
    """

    # Band-power keys copied from an ``eegPower`` packet into ``self.data``.
    _EEG_POWER_BANDS = ('delta', 'theta', 'lowAlpha', 'highAlpha',
                        'lowBeta', 'highBeta', 'lowGamma', 'highGamma')

    # eSense keys copied from an ``eSense`` packet when they are present.
    _ESENSE_KEYS = ('attention', 'meditation')

    def __init__(self, callback_functions,
                 device_address=THINKGEAR_DEVICE_SERIAL_PORT,
                 verbosity=0):
        """Validate the callbacks and start the serial/protocol workers.

        Args:
            callback_functions: Mapping of metric name to a callable invoked
                as ``callback(timestamp, value)``; every key must appear in
                ``VALID_METRICS``.
            device_address: Serial device path of the headset.
            verbosity: ``>= 1`` prints each raw-EEG packet as a CSV row,
                ``>= 2`` additionally prints every packet as received.

        Raises:
            ValueError: If a key of ``callback_functions`` is not a valid
                metric name.
        """
        self.parent = None
        self.protocol = None
        self.serial_device = None
        self.log = None
        self.verbosity = verbosity

        # Thread's first positional parameter is ``group`` (which must be
        # None), so the parent reference is deliberately not passed along.
        threading.Thread.__init__(self)

        self.device_address = device_address

        # Latest non-raw values; 200 is the initial poorSignalLevel until the
        # first report arrives.
        self.data = {
            'poorSignalLevel': 200, 'attention': 0, 'meditation': 0, 'delta': 0,
            'theta': 0, 'lowAlpha': 0, 'highAlpha': 0, 'lowBeta': 0,
            'highBeta': 0, 'lowGamma': 0, 'highGamma': 0
        }

        for metric in callback_functions:
            if metric not in VALID_METRICS:
                raise ValueError('Output metric is set to "%s" but must be in '
                                 'the valid metrics list: %s' % (metric,
                                                                 VALID_METRICS))

        self.callback_functions = callback_functions

        # Every attribute read by processPacketThinkGear must exist before
        # configureEEG() starts the workers, which receive ``parent=self``.
        self.displayCSVHeader = True

        # Final setup
        self.configureEEG()

        print("Attempting to connect to NeuroSky headset ...")
        print("WARNING: Make sure the headset is on, paired, and "
              "has enough battery.")

    def setPacketCount(self, value):
        """Forward ``value`` to ``parent.setPacketCount`` when a parent is set."""
        if self.parent is not None:
            self.parent.setPacketCount(value)

    def setBadPackets(self, value):
        """Forward ``value`` to ``parent.setBadPackets`` when a parent is set."""
        if self.parent is not None:
            self.parent.setBadPackets(value)

    def incrementPacketCount(self):
        """Call ``parent.incrementPacketCount`` when a parent is set."""
        if self.parent is not None:
            self.parent.incrementPacketCount()

    def incrementBadPackets(self):
        """Call ``parent.incrementBadPackets`` when a parent is set."""
        if self.parent is not None:
            self.parent.incrementBadPackets()

    def resetSessionStartTime(self):
        """Call ``parent.resetSessionStartTime`` when a parent is set."""
        if self.parent is not None:
            self.parent.resetSessionStartTime()

    def configureEEG(self):
        """Create and start the serial device, then the protocol handler."""
        self.serial_device = SerialDevice(
            self.log,
            device_address=self.device_address,
            DEBUG=0,
            parent=self)

        self.serial_device.start()

        self.protocol = puzzlebox_synapse_protocol_thinkgear(
            self.log,
            self.serial_device,
            device_model='NeuroSky MindWave',
            DEBUG=0,
            parent=self)

        self.protocol.start()

    def processPacketThinkGear(self, packet):
        """Handle one decoded packet.

        A packet containing ``rawEeg`` is renamed to ``eeg``, enriched with
        the cached values from ``self.data`` and passed to every registered
        callback. Any other packet only refreshes the cache from its
        ``poorSignalLevel``, ``eegPower`` and ``eSense`` entries.

        Args:
            packet: Dict describing the packet; it is modified in place when
                it carries ``rawEeg``.

        Raises:
            KeyError: If a raw packet lacks ``timestamp`` while callbacks are
                registered or CSV output is enabled, or if an ``eegPower``
                entry is missing one of the eight bands.
        """
        # The header is emitted once, on the first packet of any kind.
        if self.displayCSVHeader:
            displayCSVHeader()
            self.displayCSVHeader = False

        if self.verbosity >= 2:
            print(packet)

        if 'rawEeg' in packet:
            packet['eeg'] = packet.pop('rawEeg')
            # self.data holds exactly the eleven non-raw metrics, so this
            # attaches the latest known value of each to the raw sample.
            packet.update(self.data)

            if self.verbosity >= 1:
                displayCSV(packet)

            for metric, callback in self.callback_functions.items():
                callback(packet['timestamp'], packet[metric])

        else:
            if 'poorSignalLevel' in packet:
                self.data['poorSignalLevel'] = packet['poorSignalLevel']

            if 'eegPower' in packet:
                eeg_power = packet['eegPower']
                for band in self._EEG_POWER_BANDS:
                    self.data[band] = eeg_power[band]

            if 'eSense' in packet:
                esense = packet['eSense']
                # eSense packets may carry only one of the two values.
                for key in self._ESENSE_KEYS:
                    if key in esense:
                        self.data[key] = esense[key]

    def resetDevice(self):
        """Stop the current serial/protocol workers and start fresh ones."""
        if self.serial_device is not None:
            self.serial_device.exitThread()

        if self.protocol is not None:
            self.protocol.exitThread()

        self.configureEEG()

    def exitThread(self, callThreadQuit=True):
        """Disconnect the hardware and shut the connector down.

        Args:
            callThreadQuit: When true, also join this thread if it is
                currently running.

        Raises:
            SystemExit: Via ``sys.exit()`` when the connector has no parent.
        """
        # Disconnect the hardware before any worker is told to exit.
        if self.protocol is not None:
            self.protocol.disconnectHardware()

        if self.serial_device is not None:
            self.serial_device.exitThread()

        if self.protocol is not None:
            self.protocol.exitThread()

        # join() raises RuntimeError on a thread that was never started, so
        # only wait for a thread that is actually running.
        if callThreadQuit and self.is_alive():
            self.join()

        if self.parent is None:
            sys.exit()
221
222
223
def callback_factory(metric_name):
    """Build a callback that prints every sample, tagged with ``metric_name``.

    Args:
        metric_name: Label included in each printed line.

    Returns:
        A function ``print_callback(timestamp, sample)`` that writes one line
        to standard output and returns ``None``.
    """
    def print_callback(timestamp, sample):
        message = 'metric_name=%s, timestamp=%s, sample=%s' % (
            metric_name, timestamp, sample)
        print(message)

    return print_callback
229
230
231
def run(device_address=THINKGEAR_DEVICE_SERIAL_PORT):
    """Start a NeuroskyConnector that prints every valid metric.

    Args:
        device_address: Serial device path handed to the connector.
    """
    # Restore the default SIGINT action so Ctrl-C ends the process.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # One printing callback per metric name in VALID_METRICS.
    callbacks = {}
    for metric in VALID_METRICS:
        callbacks[metric] = callback_factory(metric)

    connector = NeuroskyConnector(callback_functions=callbacks,
                                  device_address=device_address)
    connector.start()
244
245
246
# Script entry point: start the connector with the default serial device.
if __name__ == "__main__":
    run()