--- a +++ b/src/cloudbrain/connectors/neurosky.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- + +""" +Copyright Puzzlebox Productions, LLC (2010-2016) + +Ported from Puzzlebox Synapse +ThinkGear code imported from Puzzlebox.Synapse.ThinkGear.Server +http://puzzlebox.io + +This code is released under the GNU Pulic License (GPL) version 3 +For more information please refer to http://www.gnu.org/copyleft/gpl.html + +Author: Steve Castellotti <sc@puzzlebox.io> +""" + +__changelog__ = """Last Update: 2017.05.28""" +import threading +import signal +import sys + +from cloudbrain.connectors.thinkgear import SerialDevice +from cloudbrain.connectors.thinkgear import puzzlebox_synapse_protocol_thinkgear + +THINKGEAR_DEVICE_SERIAL_PORT = '/dev/tty.MindWaveMobile-DevA' +VALID_METRICS = ['eeg', 'poorSignalLevel', 'attention', 'meditation', 'delta', + 'theta', 'lowAlpha', 'highAlpha', 'lowBeta', 'highBeta', + 'lowGamma', 'highGamma'] + + +def displayCSVHeader(): + print("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % ( + 'timestamp', + 'eeg', + 'poorSignalLevel', + 'attention', + 'meditation', + 'delta', + 'theta', + 'lowAlpha', + 'highAlpha', + 'lowBeta', + 'highBeta', + 'lowGamma', + 'highGamma')) + + +def displayCSV(packet): + print("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s" % ( + packet['timestamp'], + packet['eeg'], + packet['poorSignalLevel'], + packet['attention'], + packet['meditation'], + packet['delta'], + packet['theta'], + packet['lowAlpha'], + packet['highAlpha'], + packet['lowBeta'], + packet['highBeta'], + packet['lowGamma'], + packet['highGamma'])) + + +class NeuroskyConnector(threading.Thread): + def __init__(self, callback_functions, + device_address=THINKGEAR_DEVICE_SERIAL_PORT, + verbosity=0): + + self.parent = None + self.protocol = None + self.serial_device = None + self.log = None + self.verbosity = verbosity + + threading.Thread.__init__(self, self.parent) + + self.device_address = device_address + + self.data = { + 'poorSignalLevel': 200, 'attention': 0, 'meditation': 0, 'delta': 0, + 'theta': 0, 'lowAlpha': 0, 'highAlpha': 0, 'lowBeta': 0, + 'highBeta': 0, 'lowGamma': 0, 'highGamma': 0 + } + + for metric in callback_functions.keys(): + if metric not in VALID_METRICS: + raise ValueError('Output metric is set to "%s" but must be in ' + 'the valid metrics list: %s' % (metric, + VALID_METRICS)) + + self.callback_functions = callback_functions + + # Final setup + self.configureEEG() + self.displayCSVHeader = True + + print("Attempting to connect to NeuroSky headset ...") + print("WARNING: Make sure the headset is on, paired, and " + "has enough battery.") + + def setPacketCount(self, value): + + if self.parent is not None: + self.parent.setPacketCount(value) + + def setBadPackets(self, value): + + if self.parent is not None: + self.parent.setBadPackets(value) + + def incrementPacketCount(self): + + if self.parent is not None: + self.parent.incrementPacketCount() + + def incrementBadPackets(self): + + if self.parent is not None: + self.parent.incrementBadPackets() + + def resetSessionStartTime(self): + + if self.parent is not None: + self.parent.resetSessionStartTime() + + def configureEEG(self): + + self.serial_device = SerialDevice( + self.log, + device_address=self.device_address, + DEBUG=0, + parent=self) + + self.serial_device.start() + + self.protocol = puzzlebox_synapse_protocol_thinkgear( + self.log, + self.serial_device, + device_model='NeuroSky MindWave', + DEBUG=0, + parent=self) + + self.protocol.start() + + def processPacketThinkGear(self, packet): + + if self.displayCSVHeader: + displayCSVHeader() + self.displayCSVHeader = False + + if self.verbosity >= 2: + print(packet) + + if 'rawEeg' in packet.keys(): + + packet['eeg'] = packet.pop('rawEeg') + packet['poorSignalLevel'] = self.data['poorSignalLevel'] + packet['attention'] = self.data['attention'] + packet['meditation'] = self.data['meditation'] + packet['delta'] = self.data['delta'] + packet['theta'] = self.data['theta'] + packet['lowAlpha'] = self.data['lowAlpha'] + packet['highAlpha'] = self.data['highAlpha'] + packet['lowBeta'] = self.data['lowBeta'] + packet['highBeta'] = self.data['highBeta'] + packet['lowGamma'] = self.data['lowGamma'] + packet['highGamma'] = self.data['highGamma'] + + if self.verbosity >= 1: + displayCSV(packet) + + for metric, callback in self.callback_functions.items(): + callback(packet['timestamp'], packet[metric]) + + else: + if 'poorSignalLevel' in packet.keys(): + self.data['poorSignalLevel'] = packet['poorSignalLevel'] + + if 'eegPower' in packet.keys(): + self.data['delta'] = packet['eegPower']['delta'] + self.data['theta'] = packet['eegPower']['theta'] + self.data['lowAlpha'] = packet['eegPower']['lowAlpha'] + self.data['highAlpha'] = packet['eegPower']['highAlpha'] + self.data['lowBeta'] = packet['eegPower']['lowBeta'] + self.data['highBeta'] = packet['eegPower']['highBeta'] + self.data['lowGamma'] = packet['eegPower']['lowGamma'] + self.data['highGamma'] = packet['eegPower']['highGamma'] + + if 'eSense' in packet.keys(): + if 'attention' in packet['eSense'].keys(): + self.data['attention'] = packet['eSense']['attention'] + if 'meditation' in packet['eSense'].keys(): + self.data['meditation'] = packet['eSense']['meditation'] + + def resetDevice(self): + + if self.serial_device is not None: + self.serial_device.exitThread() + + if self.protocol is not None: + self.protocol.exitThread() + + self.configureEEG() + + def exitThread(self, callThreadQuit=True): + + # Call disconnect block in protocol first due to above error + self.protocol.disconnectHardware() + + if self.serial_device is not None: + self.serial_device.exitThread() + + if self.protocol is not None: + self.protocol.exitThread() + + if callThreadQuit: + self.join() + + if self.parent is None: + sys.exit() + + +def callback_factory(metric_name): + def print_callback(timestamp, sample): + print('metric_name=%s, timestamp=%s, sample=%s' % (metric_name, + timestamp, sample)) + + return print_callback + + +def run(device_address=THINKGEAR_DEVICE_SERIAL_PORT): + """Run the NeuroskyConnector.""" + + # Perform correct KeyboardInterrupt handling + signal.signal(signal.SIGINT, signal.SIG_DFL) + + callbacks = {metric: callback_factory(metric) for metric in VALID_METRICS} + + connector = NeuroskyConnector( + callback_functions=callbacks, + device_address=device_address) + + connector.start() + + +if __name__ == "__main__": + run()