--- /dev/null
+++ b/src/cloudbrain/modules/sinks/csvout.py
@@ -0,0 +1,130 @@
+import csv
+import errno
+import json
+import logging
+import os
+import re
+import time
+
+from cloudbrain.modules.interface import ModuleInterface
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def mkdir_p(path):
+    """
+    Emulate the "mkdir -p" functionality of the shell. See:
+    https://stackoverflow.com/questions/600268/
+    mkdir-p-functionality-in-python/600612#600612
+    """
+    try:
+        os.makedirs(path)
+    except OSError as exc:  # Python >2.5
+        if exc.errno == errno.EEXIST and os.path.isdir(path):
+            pass
+        else:
+            raise
+
+
+def _clean_key(key):
+    """
+    Strip the colons from routing key names and return a name more
+    appropriate for writing output files. This should probably be moved to
+    a cloudbrain library if it ends up being a standard method of
+    generating output files.
+
+    key: A routing key string
+    return: The key with colons replaced with underscores
+    """
+    return key.replace(':', '_')
+
+
+def _clean_string(s):
+    """
+    Return the given string converted to a string that can be used for a
+    clean filename. Remove leading and trailing spaces; convert other
+    spaces to underscores; and remove anything that is not an alphanumeric,
+    dash, underscore, or dot.
+
+    >>> _clean_string("john's portrait in 2004.jpg")
+    'johns_portrait_in_2004.jpg'
+    """
+    s = str(s).strip().replace(' ', '_')
+    return re.sub(r'(?u)[^-\w.]', '', s)
+
+
+class CSVOutSink(ModuleInterface):
+    """Sink module that writes each subscribed metric stream to a CSV file."""
+
+    def __init__(self, subscribers, publishers, out_dir):
+        """
+        Set up the CSV writers for each subscribed topic and metric.
+
+        TODO: Delete, back up, or warn of existing files to prevent
+        new headers from corrupting existing data.
+        """
+        super(CSVOutSink, self).__init__(subscribers, publishers)
+        _LOGGER.debug("Subscribers: %s", self.subscribers)
+        _LOGGER.debug("Publishers: %s", self.publishers)
+
+        if not os.path.exists(out_dir):
+            _LOGGER.info("Creating missing directory %s", out_dir)
+            mkdir_p(out_dir)
+
+        # When True, start() keeps blocking after registering its callbacks.
+        self.thread_event = False
+        self.headers = {}
+        self.file_name_pattern = '{base_routing_key}_{metric_name}.csv'
+        self.out_files = {}
+        # For each subscriber, get each routing key.
+        for subscriber in self.subscribers:
+            base_routing_key = subscriber.base_routing_key
+            # For each metric buffer in the router, open a file handle and
+            # save the corresponding CSV writer object.
+            for metric_buffer in subscriber.metric_buffers.values():
+                d = {'base_routing_key': _clean_key(base_routing_key),
+                     'metric_name': _clean_string(metric_buffer.name)}
+                file_name = self.file_name_pattern.format(**d)
+                _LOGGER.info("Opening file: %s", file_name)
+                f = open(os.path.join(out_dir, file_name), 'w')
+                writer = csv.writer(f)
+                headers = metric_buffer.metric_names
+                self.headers[file_name] = headers
+                writer.writerow(headers)
+                self.out_files[file_name] = writer
+
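+    # Illustrative message format handled by the callbacks built below
+    # (example values only): with headers ['timestamp', 'channel_0'], the
+    # JSON body '[{"timestamp": 1, "channel_0": 0.5}]' becomes the row "1,0.5".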
+ """ + for subscriber in self.subscribers: + base_routing_key = subscriber.base_routing_key + for metric_buffer in subscriber.metric_buffers.values(): + num_channels = metric_buffer.num_channels + d = {'base_routing_key': _clean_key(base_routing_key), + 'metric_name': _clean_string(metric_buffer.name)} + file_name = self.file_name_pattern.format(**d) + csv_callback = self._csv_callback_factory(file_name) + routing_key = subscriber.base_routing_key + _LOGGER.debug('Subscribing to %s' % metric_buffer.name) + subscriber.subscribe(metric_buffer.name, csv_callback) + while self.thread_event: + time.sleep(1)