#!/usr/bin/env python2.7
from __future__ import print_function
from yapsy.PluginManager import PluginManager
import argparse # new in Python2.7
import atexit
import logging
import string
import sys
import threading
import time
logging.basicConfig(level=logging.ERROR)
# Load the plugins from the plugin directory.
manager = PluginManager()
if __name__ == '__main__':
print("------------user.py-------------")
parser = argparse.ArgumentParser(description="OpenBCI 'user'")
parser.add_argument('--board', default="cyton",
help="Choose between [cyton] and [ganglion] boards.")
parser.add_argument('-l', '--list', action='store_true',
help="List available plugins.")
parser.add_argument('-i', '--info', metavar='PLUGIN',
help="Show more information about a plugin.")
parser.add_argument('-p', '--port',
help="For Cyton, port to connect to OpenBCI Dongle " +
"( ex /dev/ttyUSB0 or /dev/tty.usbserial-* ). " +
"For Ganglion, MAC address of the board. For both, AUTO to attempt auto-detection.")
parser.set_defaults(port="AUTO")
# baud rate is not currently used
parser.add_argument('-b', '--baud', default=115200, type=int,
help="Baud rate (not currently used)")
parser.add_argument('--no-filtering', dest='filtering',
action='store_false',
help="Disable notch filtering")
parser.set_defaults(filtering=True)
parser.add_argument('-d', '--daisy', dest='daisy',
action='store_true',
help="Force daisy mode (cyton board)")
parser.add_argument('-x', '--aux', dest='aux',
action='store_true',
help="Enable accelerometer/AUX data (ganglion board)")
# first argument: plugin name, then parameters for plugin
parser.add_argument('-a', '--add', metavar=('PLUGIN', 'PARAM'),
action='append', nargs='+',
help="Select which plugins to activate and set parameters.")
parser.add_argument('--log', dest='log', action='store_true',
help="Log program")
parser.add_argument('--plugins-path', dest='plugins_path', nargs='+',
help="Additional path(s) to look for plugins")
parser.set_defaults(daisy=False, log=False)
args = parser.parse_args()
if not args.add:
print("WARNING: no plugin selected, you will only be able to communicate with the board. "
"You should select at least one plugin with '--add [plugin_name]'. "
"Use '--list' to show available plugins or '--info [plugin_name]' to get more information.")
if args.board == "cyton":
print("Board type: OpenBCI Cyton (v3 API)")
import openbci.cyton as bci
elif args.board == "ganglion":
print("Board type: OpenBCI Ganglion")
import openbci.ganglion as bci
else:
raise ValueError('Board type %r was not recognized. Known are 3 and 4' % args.board)
# Check AUTO port selection, a "None" parameter for the board API
if "AUTO" == args.port.upper():
print("Will try do auto-detect board's port. Set it manually with '--port' if it goes wrong.")
args.port = None
else:
print("Port: ", args.port)
plugins_paths = ["openbci/plugins"]
if args.plugins_path:
plugins_paths += args.plugins_path
manager.setPluginPlaces(plugins_paths)
manager.collectPlugins()
# Print list of available plugins and exit
if args.list:
print("Available plugins:")
for plugin in manager.getAllPlugins():
print("\t- " + plugin.name)
exit()
# User wants more info about a plugin...
if args.info:
plugin = manager.getPluginByName(args.info)
if plugin == None:
# eg: if an import fail inside a plugin, yapsy skip it
print("Error: [ " + args.info +
" ] not found or could not be loaded. Check name and requirements.")
else:
print(plugin.description)
plugin.plugin_object.show_help()
exit()
print("\n------------SETTINGS-------------")
print("Notch filtering:" + str(args.filtering))
# Logging
if args.log:
print("Logging Enabled: " + str(args.log))
logging.basicConfig(filename="OBCI.log", format='%(asctime)s - %(levelname)s : %(message)s',
level=logging.DEBUG)
logging.getLogger('yapsy').setLevel(logging.DEBUG)
logging.info('---------LOG START-------------')
logging.info(args)
else:
print("user.py: Logging Disabled.")
print("\n-------INSTANTIATING BOARD-------")
if args.board == "cyton":
board = bci.OpenBCICyton(port=args.port,
baud=args.baud,
daisy=args.daisy,
filter_data=args.filtering,
scaled_output=True,
log=args.log)
elif args.board == "ganglion":
board = bci.OpenBCIGanglion(port=args.port,
filter_data=args.filtering,
scaled_output=True,
log=args.log,
aux=args.aux)
# Info about effective number of channels and sampling rate
if board.daisy:
print("Force daisy mode:")
else:
print("No daisy:")
print(board.getNbEEGChannels(), "EEG channels and", board.getNbAUXChannels(), "AUX channels at",
board.getSampleRate(), "Hz.")
print("\n------------PLUGINS--------------")
# Loop round the plugins and print their names.
print("Found plugins:")
for plugin in manager.getAllPlugins():
print("[ " + plugin.name + " ]")
print("\n")
# Fetch plugins, try to activate them, add to the list if OK
plug_list = []
callback_list = []
if args.add:
for plug_candidate in args.add:
# first value: plugin name, then optional arguments
plug_name = plug_candidate[0]
plug_args = plug_candidate[1:]
# Try to find name
plug = manager.getPluginByName(plug_name)
if plug == None:
# eg: if an import fail inside a plugin, yapsy skip it
print("Error: [ " + plug_name + " ] not found or could not be loaded. Check name and requirements.")
else:
print("\nActivating [ " + plug_name + " ] plugin...")
if not plug.plugin_object.pre_activate(plug_args, sample_rate=board.getSampleRate(),
eeg_channels=board.getNbEEGChannels(),
aux_channels=board.getNbAUXChannels(),
imp_channels=board.getNbImpChannels()):
print("Error while activating [ " + plug_name + " ], check output for more info.")
else:
print("Plugin [ " + plug_name + "] added to the list")
plug_list.append(plug.plugin_object)
callback_list.append(plug.plugin_object)
if len(plug_list) == 0:
fun = None
else:
fun = callback_list
def cleanUp():
board.disconnect()
print("Deactivating Plugins...")
for plug in plug_list:
plug.deactivate()
print("User.py exiting...")
atexit.register(cleanUp)
print("--------------INFO---------------")
print("User serial interface enabled...\n\
View command map at http://docs.openbci.com.\n\
Type /start to run (/startimp for impedance \n\
checking, if supported) -- and /stop\n\
before issuing new commands afterwards.\n\
Type /exit to exit. \n\
Board outputs are automatically printed as: \n\
% <tab> message\n\
$$$ signals end of message")
print("\n-------------BEGIN---------------")
# Init board state
# s: stop board streaming; v: soft reset of the 32-bit board (no effect with 8bit board)
s = 'sv'
# Tell the board to enable or not daisy module
if board.daisy:
s = s + 'C'
else:
s = s + 'c'
# d: Channels settings back to default
s = s + 'd'
while s != "/exit":
# Send char and wait for registers to set
if not s:
pass
elif "help" in s:
print("View command map at: \
http://docs.openbci.com/software/01-OpenBCI_SDK.\n\
For user interface: read README or view \
https://github.com/OpenBCI/OpenBCI_Python")
elif board.streaming and s != "/stop":
print("Error: the board is currently streaming data, please type '/stop' before issuing new commands.")
else:
# read silently incoming packet if set (used when stream is stopped)
flush = False
if '/' == s[0]:
s = s[1:]
rec = False # current command is recognized or fot
if "T:" in s:
lapse = int(s[string.find(s, "T:") + 2:])
rec = True
elif "t:" in s:
lapse = int(s[string.find(s, "t:") + 2:])
rec = True
else:
lapse = -1
if 'startimp' in s:
if board.getBoardType() == "cyton":
print("Impedance checking not supported on cyton.")
else:
board.setImpedance(True)
if (fun != None):
# start streaming in a separate thread so we could always send commands in here
boardThread = threading.Thread(target=board.start_streaming, args=(fun, lapse))
boardThread.daemon = True # will stop on exit
try:
boardThread.start()
except:
raise
else:
print("No function loaded")
rec = True
elif "start" in s:
board.setImpedance(False)
if fun != None:
# start streaming in a separate thread so we could always send commands in here
boardThread = threading.Thread(target=board.start_streaming, args=(fun, lapse))
boardThread.daemon = True # will stop on exit
try:
boardThread.start()
except:
raise
else:
print("No function loaded")
rec = True
elif 'test' in s:
test = int(s[s.find("test") + 4:])
board.test_signal(test)
rec = True
elif 'stop' in s:
board.stop()
rec = True
flush = True
if rec == False:
print("Command not recognized...")
elif s:
for c in s:
if sys.hexversion > 0x03000000:
board.ser_write(bytes(c, 'utf-8'))
else:
board.ser_write(bytes(c))
time.sleep(0.100)
line = ''
time.sleep(0.1) # Wait to see if the board has anything to report
# The Cyton nicely return incoming packets -- here supposedly messages
# whereas the Ganglion prints incoming ASCII message by itself
if board.getBoardType() == "cyton":
while board.ser_inWaiting():
# we're supposed to get UTF8 text, but the board might behave otherwise
c = board.ser_read().decode('utf-8', errors='replace')
line += c
time.sleep(0.001)
if (c == '\n') and not flush:
print('%\t' + line[:-1])
line = ''
elif board.getBoardType() == "ganglion":
while board.ser_inWaiting():
board.waitForNotifications(0.001)
if not flush:
print(line)
# Take user input
# s = input('--> ')
if sys.hexversion > 0x03000000:
s = input('--> ')
else:
s = raw_input('--> ')