[00c700]: / src / cloudbrain / core / model.py

Download this file

55 lines (44 with data), 1.6 kB

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""
Cloudbrain's OO data model.
"""
class MetricBuffer(object):
    """
    Fixed-size buffer for multi-channel metric data points.

    Accumulates data points (dicts keyed by channel name plus a timestamp)
    and releases them in batches of ``buffer_size`` via :meth:`add`.
    """

    def __init__(self, name, num_channels, buffer_size):
        """
        :param name: human-readable name of this metric buffer.
        :type name: str
        :param num_channels: number of data channels; determines the required
            keys ``channel_0`` .. ``channel_<num_channels - 1>``.
        :type num_channels: int
        :param buffer_size: number of data points to accumulate before
            :meth:`add` flushes and returns the batch.
        :type buffer_size: int
        """
        self.name = name
        self.num_channels = num_channels
        # Expected datum keys: one per channel, plus "timestamp".
        self.metric_names = ["channel_%s" % i for i in range(self.num_channels)]
        self.metric_names.append("timestamp")
        self.buffer_size = buffer_size
        self.data_buffer = []

    def _validate_datum(self, datum):
        """
        Validate if the datum being sent is following the right schema.

        :param datum: MetricBuffer data point. E.g:
            {"timestamp": <float>, "channel_0": <float>, ..., "channel_7": <float>}
        :type datum: dict
        :raises ValueError: if the datum's keys do not exactly match the
            expected channel names plus "timestamp".
        """
        # Sorted comparison checks for an exact key match regardless of order.
        if sorted(datum.keys()) != sorted(self.metric_names):
            raise ValueError("MetricBuffer keys should be %s but are %s" % (
                self.metric_names, datum.keys()))

    def add(self, datum):
        """
        Append datum to the buffer.

        :param datum: metric data point with the following format:
            {"timestamp": <float>, "channel_0": <float>, ..., "channel_7": <float>}
        :type datum: dict
        :returns: 'None' if the buffer isn't full yet.
            A list of dicts otherwise. E.g:
            [
              {"timestamp": <float>, "channel_0": <float>, ..., "channel_7": <float>},
              ...
              {"timestamp": <float>, "channel_0": <float>, ..., "channel_7": <float>}
            ]
        :rtype: list of dicts, or None
        :raises ValueError: if datum does not match the expected schema.
        """
        self._validate_datum(datum)
        self.data_buffer.append(datum)
        if len(self.data_buffer) >= self.buffer_size:
            # Flush: hand back the full batch and start a fresh buffer.
            data_buffer = self.data_buffer
            self.data_buffer = []
            return data_buffer
        else:
            return None