Switch to side-by-side view

--- a
+++ b/src/cloudbrain/modules/sources/muse.py
@@ -0,0 +1,66 @@
+import logging
+import time
+
+from cloudbrain.connectors.muse import MuseConnector
+from cloudbrain.modules.interface import ModuleInterface
+
+_LOGGER = logging.getLogger(__name__)
+_LOGGER.level = logging.DEBUG
+_LOGGER.addHandler(logging.StreamHandler())
+
+
+
+class MuseSource(ModuleInterface):
+    def __init__(self, subscribers, publishers, ip, port, start_muse_io):
+
+        super(MuseSource, self).__init__(subscribers, publishers)
+        self.ip = ip
+        self.port = port
+        self.start_muse_io = start_muse_io
+
+        _LOGGER.debug("Subscribers: %s" % self.subscribers)
+        _LOGGER.debug("Publishers: %s" % self.publishers)
+
+
+    def start(self):
+
+        # Callback functions to handle the sample for that metric.
+        # Each metric has a specific number of channels.
+        callback_functions = {}
+
+        for publisher in self.publishers:
+            for routing_key, metric_buffer in publisher.metric_buffers.items():
+                # Note: routing_key is "user_key:metric_name" here.
+                if routing_key not in callback_functions:
+                    callback_functions[routing_key] = self.callback_factory(
+                        metric_buffer.name, metric_buffer.num_channels,
+                        publisher)
+
+        connector = MuseConnector(ip=self.ip,
+                                  port=self.port,
+                                  start_muse_io=self.start_muse_io,
+                                  callback_functions=callback_functions)
+
+        _LOGGER.info('Starting muse connector ...')
+        connector.start()
+
+
+    def callback_factory(self, metric_name, num_channels, publisher):
+        """
+        Callback function generator
+        :return: callback function
+        """
+
+
+        def callback(*data):
+            """
+            Handle muse samples for this metric
+            """
+            data = data[1:] # the first element is the OSC path
+            message = {'timestamp': int(time.time() * 1000000)}  # microseconds
+            for i in range(num_channels):
+                message["channel_%s" % i] = data[i]
+            publisher.publish(metric_name, message)
+
+
+        return callback