[00c700]: / src / cloudbrain / modules / sources / muse.py

Download this file

67 lines (47 with data), 2.2 kB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import logging
import time
from cloudbrain.connectors.muse import MuseConnector
from cloudbrain.modules.interface import ModuleInterface
_LOGGER = logging.getLogger(__name__)
_LOGGER.level = logging.DEBUG
_LOGGER.addHandler(logging.StreamHandler())
class MuseSource(ModuleInterface):
def __init__(self, subscribers, publishers, ip, port, start_muse_io):
super(MuseSource, self).__init__(subscribers, publishers)
self.ip = ip
self.port = port
self.start_muse_io = start_muse_io
_LOGGER.debug("Subscribers: %s" % self.subscribers)
_LOGGER.debug("Publishers: %s" % self.publishers)
def start(self):
# Callback functions to handle the sample for that metric.
# Each metric has a specific number of channels.
callback_functions = {}
for publisher in self.publishers:
for routing_key, metric_buffer in publisher.metric_buffers.items():
# Note: routing_key is "user_key:metric_name" here.
if routing_key not in callback_functions:
callback_functions[routing_key] = self.callback_factory(
metric_buffer.name, metric_buffer.num_channels,
publisher)
connector = MuseConnector(ip=self.ip,
port=self.port,
start_muse_io=self.start_muse_io,
callback_functions=callback_functions)
_LOGGER.info('Starting muse connector ...')
connector.start()
def callback_factory(self, metric_name, num_channels, publisher):
"""
Callback function generator
:return: callback function
"""
def callback(*data):
"""
Handle muse samples for this metric
"""
data = data[1:] # the first element is the OSC path
message = {'timestamp': int(time.time() * 1000000)} # microseconds
for i in range(num_channels):
message["channel_%s" % i] = data[i]
publisher.publish(metric_name, message)
return callback