Diff of /dhedfreader.py [000000] .. [3f9044]

Switch to unified view

a b/dhedfreader.py
1
'''
2
Reader for EDF+ files.
3
TODO:
4
 - add support for log-transformed channels:
5
   http://www.edfplus.info/specs/edffloat.html and test with
6
   data generated with
7
   http://www.edfplus.info/downloads/software/NeuroLoopGain.zip.
8
 - check annotations with Schalk's Physiobank data.
9
Copyright (c) 2012 Boris Reuderink.
10
'''
11
12
import re, datetime, operator, logging
13
import numpy as np
14
from collections import namedtuple
15
16
EVENT_CHANNEL = 'EDF Annotations'
17
log = logging.getLogger(__name__)
18
19
class EDFEndOfData: pass
20
21
22
def tal(tal_str):
23
  '''Return a list with (onset, duration, annotation) tuples for an EDF+ TAL
24
  stream.
25
  '''
26
  exp = '(?P<onset>[+\-]\d+(?:\.\d*)?)' + \
27
    '(?:\x15(?P<duration>\d+(?:\.\d*)?))?' + \
28
    '(\x14(?P<annotation>[^\x00]*))?' + \
29
    '(?:\x14\x00)'
30
31
  def annotation_to_list(annotation):
32
    return unicode(annotation, 'utf-8').split('\x14') if annotation else []
33
34
  def parse(dic):
35
    return (
36
      float(dic['onset']),
37
      float(dic['duration']) if dic['duration'] else 0.,
38
      annotation_to_list(dic['annotation']))
39
40
  return [parse(m.groupdict()) for m in re.finditer(exp, tal_str)]
41
42
43
def edf_header(f):
44
  h = {}
45
  assert f.tell() == 0  # check file position
46
  assert f.read(8) == '0       '
47
48
  # recording info)
49
  h['local_subject_id'] = f.read(80).strip()
50
  h['local_recording_id'] = f.read(80).strip()
51
52
  # parse timestamp
53
  (day, month, year) = [int(x) for x in re.findall('(\d+)', f.read(8))]
54
  (hour, minute, sec)= [int(x) for x in re.findall('(\d+)', f.read(8))]
55
  h['date_time'] = str(datetime.datetime(year + 2000, month, day,
56
    hour, minute, sec))
57
58
  # misc
59
  header_nbytes = int(f.read(8))
60
  subtype = f.read(44)[:5]
61
  h['EDF+'] = subtype in ['EDF+C', 'EDF+D']
62
  h['contiguous'] = subtype != 'EDF+D'
63
  h['n_records'] = int(f.read(8))
64
  h['record_length'] = float(f.read(8))  # in seconds
65
  nchannels = h['n_channels'] = int(f.read(4))
66
67
  # read channel info
68
  channels = range(h['n_channels'])
69
  h['label'] = [f.read(16).strip() for n in channels]
70
  h['transducer_type'] = [f.read(80).strip() for n in channels]
71
  h['units'] = [f.read(8).strip() for n in channels]
72
  h['physical_min'] = np.asarray([float(f.read(8)) for n in channels])
73
  h['physical_max'] = np.asarray([float(f.read(8)) for n in channels])
74
  h['digital_min'] = np.asarray([float(f.read(8)) for n in channels])
75
  h['digital_max'] = np.asarray([float(f.read(8)) for n in channels])
76
  h['prefiltering'] = [f.read(80).strip() for n in channels]
77
  h['n_samples_per_record'] = [int(f.read(8)) for n in channels]
78
  f.read(32 * nchannels)  # reserved
79
80
  assert f.tell() == header_nbytes
81
  return h
82
83
84
class BaseEDFReader:
85
  def __init__(self, file):
86
    self.file = file
87
88
89
  def read_header(self):
90
    self.header = h = edf_header(self.file)
91
92
    # calculate ranges for rescaling
93
    self.dig_min = h['digital_min']
94
    self.phys_min = h['physical_min']
95
    phys_range = h['physical_max'] - h['physical_min']
96
    dig_range = h['digital_max'] - h['digital_min']
97
    assert np.all(phys_range > 0)
98
    assert np.all(dig_range > 0)
99
    self.gain = phys_range / dig_range
100
101
102
  def read_raw_record(self):
103
    '''Read a record with data_2013 and return a list containing arrays with raw
104
    bytes.
105
    '''
106
    result = []
107
    for nsamp in self.header['n_samples_per_record']:
108
      samples = self.file.read(nsamp * 2)
109
      if len(samples) != nsamp * 2:
110
        raise EDFEndOfData
111
      result.append(samples)
112
    return result
113
114
115
  def convert_record(self, raw_record):
116
    '''Convert a raw record to a (time, signals, events) tuple based on
117
    information in the header.
118
    '''
119
    h = self.header
120
    dig_min, phys_min, gain = self.dig_min, self.phys_min, self.gain
121
    time = float('nan')
122
    signals = []
123
    events = []
124
    for (i, samples) in enumerate(raw_record):
125
      if h['label'][i] == EVENT_CHANNEL:
126
        ann = tal(samples)
127
        time = ann[0][0]
128
        events.extend(ann[1:])
129
        # print(i, samples)
130
        # exit()
131
      else:
132
        # 2-byte little-endian integers
133
        dig = np.fromstring(samples, '<i2').astype(np.float32)
134
        phys = (dig - dig_min[i]) * gain[i] + phys_min[i]
135
        signals.append(phys)
136
137
    return time, signals, events
138
139
140
  def read_record(self):
141
    return self.convert_record(self.read_raw_record())
142
143
144
  def records(self):
145
    '''
146
    Record generator.
147
    '''
148
    try:
149
      while True:
150
        yield self.read_record()
151
    except EDFEndOfData:
152
      pass
153
154
155
def load_edf(edffile):
156
  '''Load an EDF+ file.
157
  Very basic reader for EDF and EDF+ files. While BaseEDFReader does support
158
  exotic features like non-homogeneous sample rates and loading only parts of
159
  the stream, load_edf expects a single fixed sample rate for all channels and
160
  tries to load the whole file.
161
  Parameters
162
  ----------
163
  edffile : file-like object or string
164
  Returns
165
  -------
166
  Named tuple with the fields:
167
    X : NumPy array with shape p by n.
168
      Raw recording of n samples in p dimensions.
169
    sample_rate : float
170
      The sample rate of the recording. Note that mixed sample-rates are not
171
      supported.
172
    sens_lab : list of length p with strings
173
      The labels of the sensors used to record X.
174
    time : NumPy array with length n
175
      The time offset in the recording for each sample.
176
    annotations : a list with tuples
177
      EDF+ annotations are stored in (start, duration, description) tuples.
178
      start : float
179
        Indicates the start of the event in seconds.
180
      duration : float
181
        Indicates the duration of the event in seconds.
182
      description : list with strings
183
        Contains (multiple?) descriptions of the annotation event.
184
  '''
185
  if isinstance(edffile, basestring):
186
    with open(edffile, 'rb') as f:
187
      return load_edf(f)  # convert filename to file
188
189
  reader = BaseEDFReader(edffile)
190
  reader.read_header()
191
192
  h = reader.header
193
  log.debug('EDF header: %s' % h)
194
195
  # get sample rate info
196
  nsamp = np.unique(
197
    [n for (l, n) in zip(h['label'], h['n_samples_per_record'])
198
    if l != EVENT_CHANNEL])
199
  assert nsamp.size == 1, 'Multiple sample rates not supported!'
200
  sample_rate = float(nsamp[0]) / h['record_length']
201
202
  rectime, X, annotations = zip(*reader.records())
203
  X = np.hstack(X)
204
  annotations = reduce(operator.add, annotations)
205
  chan_lab = [lab for lab in reader.header['label'] if lab != EVENT_CHANNEL]
206
207
  # create timestamps
208
  if reader.header['contiguous']:
209
    time = np.arange(X.shape[1]) / sample_rate
210
  else:
211
    reclen = reader.header['record_length']
212
    within_rec_time = np.linspace(0, reclen, nsamp, endpoint=False)
213
    time = np.hstack([t + within_rec_time for t in rectime])
214
215
  tup = namedtuple('EDF', 'X sample_rate chan_lab time annotations')
216
  return tup(X, sample_rate, chan_lab, time, annotations)