deepsleepnet_data/dhedfreader.py
# Source: https://github.com/akaraspt/deepsleepnet

'''
Reader for EDF+ files.
TODO:
 - add support for log-transformed channels:
   http://www.edfplus.info/specs/edffloat.html and test with
   data generated with
   http://www.edfplus.info/downloads/software/NeuroLoopGain.zip.
 - check annotations with Schalk's Physiobank data.
Copyright (c) 2012 Boris Reuderink.
'''

import re, datetime, operator, logging
import numpy as np
from collections import namedtuple

EVENT_CHANNEL = 'EDF Annotations'
log = logging.getLogger(__name__)

class EDFEndOfData(Exception): pass


def tal(tal_str):
  '''Return a list with (onset, duration, annotation) tuples for an EDF+ TAL
  stream.
  '''
  exp = '(?P<onset>[+\-]\d+(?:\.\d*)?)' + \
    '(?:\x15(?P<duration>\d+(?:\.\d*)?))?' + \
    '(\x14(?P<annotation>[^\x00]*))?' + \
    '(?:\x14\x00)'

  def annotation_to_list(annotation):
    return unicode(annotation, 'utf-8').split('\x14') if annotation else []

  def parse(dic):
    return (
      float(dic['onset']),
      float(dic['duration']) if dic['duration'] else 0.,
      annotation_to_list(dic['annotation']))

  return [parse(m.groupdict()) for m in re.finditer(exp, tal_str)]
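
# Illustrative example (not in the original source): a TAL stream such as
#   '+30\x151800\x14Sleep stage W\x14\x00'
# is parsed by tal() into [(30.0, 1800.0, [u'Sleep stage W'])], i.e. an event
# starting at 30 s, lasting 1800 s, annotated 'Sleep stage W'; a bare
# record-onset marker such as '+0\x14\x14\x00' yields (0.0, 0.0, []).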


def edf_header(f):
  h = {}
  assert f.tell() == 0  # check file position
  assert f.read(8) == '0       '

  # recording info
  h['local_subject_id'] = f.read(80).strip()
  h['local_recording_id'] = f.read(80).strip()

  # parse timestamp (EDF stores two-digit years; this reader assumes 20xx)
  (day, month, year) = [int(x) for x in re.findall('(\d+)', f.read(8))]
  (hour, minute, sec) = [int(x) for x in re.findall('(\d+)', f.read(8))]
  h['date_time'] = str(datetime.datetime(year + 2000, month, day,
    hour, minute, sec))

  # misc
  header_nbytes = int(f.read(8))
  subtype = f.read(44)[:5]
  h['EDF+'] = subtype in ['EDF+C', 'EDF+D']
  h['contiguous'] = subtype != 'EDF+D'
  h['n_records'] = int(f.read(8))
  h['record_length'] = float(f.read(8))  # in seconds
  nchannels = h['n_channels'] = int(f.read(4))

  # read channel info
  channels = range(h['n_channels'])
  h['label'] = [f.read(16).strip() for n in channels]
  h['transducer_type'] = [f.read(80).strip() for n in channels]
  h['units'] = [f.read(8).strip() for n in channels]
  h['physical_min'] = np.asarray([float(f.read(8)) for n in channels])
  h['physical_max'] = np.asarray([float(f.read(8)) for n in channels])
  h['digital_min'] = np.asarray([float(f.read(8)) for n in channels])
  h['digital_max'] = np.asarray([float(f.read(8)) for n in channels])
  h['prefiltering'] = [f.read(80).strip() for n in channels]
  h['n_samples_per_record'] = [int(f.read(8)) for n in channels]
  f.read(32 * nchannels)  # reserved

  assert f.tell() == header_nbytes
  return h
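
# Hypothetical usage sketch (not part of the original module): edf_header()
# returns a plain dict and leaves the file positioned at the first data
# record, so the header can be inspected on its own, e.g.
#
#   with open('SC4001E0-PSG.edf', 'rb') as f:   # file name is illustrative
#     h = edf_header(f)
#     print(h['n_channels'], h['record_length'], h['n_samples_per_record'])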


class BaseEDFReader:
  def __init__(self, file):
    self.file = file


  def read_header(self):
    self.header = h = edf_header(self.file)

    # calculate ranges for rescaling
    self.dig_min = h['digital_min']
    self.phys_min = h['physical_min']
    phys_range = h['physical_max'] - h['physical_min']
    dig_range = h['digital_max'] - h['digital_min']
    assert np.all(phys_range > 0)
    assert np.all(dig_range > 0)
    self.gain = phys_range / dig_range  # physical units per digital unit


  def read_raw_record(self):
    '''Read a record with data and return a list of strings containing the
    raw bytes for each channel.
    '''
    result = []
    for nsamp in self.header['n_samples_per_record']:
      samples = self.file.read(nsamp * 2)
      if len(samples) != nsamp * 2:
        raise EDFEndOfData
      result.append(samples)
    return result


  def convert_record(self, raw_record):
    '''Convert a raw record to a (time, signals, events) tuple based on
    information in the header.
    '''
    h = self.header
    dig_min, phys_min, gain = self.dig_min, self.phys_min, self.gain
    time = float('nan')
    signals = []
    events = []
    for (i, samples) in enumerate(raw_record):
      if h['label'][i] == EVENT_CHANNEL:
        ann = tal(samples)
        time = ann[0][0]
        events.extend(ann[1:])
        # print(i, samples)
        # exit()
      else:
        # 2-byte little-endian integers
        dig = np.fromstring(samples, '<i2').astype(np.float32)
        phys = (dig - dig_min[i]) * gain[i] + phys_min[i]
        signals.append(phys)

    return time, signals, events
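
  # Worked example (illustrative values, not from the original source): with
  # digital_min = -2048, digital_max = 2047, physical_min = -250.0 uV and
  # physical_max = 250.0 uV, gain = 500.0 / 4095, so a stored sample of 0
  # maps to (0 - (-2048)) * 500.0 / 4095 + (-250.0) ~= 0.06 uV.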


  def read_record(self):
    return self.convert_record(self.read_raw_record())


  def records(self):
    '''
    Record generator.
    '''
    try:
      while True:
        yield self.read_record()
    except EDFEndOfData:
      pass
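
  # Hypothetical streaming sketch (not part of the original module): records()
  # yields one (time, signals, events) tuple per EDF data record, so large
  # files can be processed incrementally instead of via load_edf(), e.g.
  #
  #   with open('SC4001E0-PSG.edf', 'rb') as f:   # file name is illustrative
  #     reader = BaseEDFReader(f)
  #     reader.read_header()
  #     for rec_time, signals, events in reader.records():
  #       pass  # process one record at a time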


def load_edf(edffile):
  '''Load an EDF+ file.

  Very basic reader for EDF and EDF+ files. While BaseEDFReader does support
  exotic features like non-homogeneous sample rates and loading only parts of
  the stream, load_edf expects a single fixed sample rate for all channels and
  tries to load the whole file.

  Parameters
  ----------
  edffile : file-like object or string

  Returns
  -------
  Named tuple with the fields:
    X : NumPy array with shape p by n.
      Raw recording of n samples in p dimensions.
    sample_rate : float
      The sample rate of the recording. Note that mixed sample rates are not
      supported.
    chan_lab : list of length p with strings
      The labels of the channels used to record X.
    time : NumPy array with length n
      The time offset in the recording for each sample.
    annotations : a list with tuples
      EDF+ annotations are stored in (start, duration, description) tuples.
      start : float
        Indicates the start of the event in seconds.
      duration : float
        Indicates the duration of the event in seconds.
      description : list with strings
        Contains one or more descriptions of the annotation event.
  '''
  if isinstance(edffile, basestring):
    with open(edffile, 'rb') as f:
      return load_edf(f)  # convert filename to file

  reader = BaseEDFReader(edffile)
  reader.read_header()

  h = reader.header
  log.debug('EDF header: %s' % h)

  # get sample rate info
  nsamp = np.unique(
    [n for (l, n) in zip(h['label'], h['n_samples_per_record'])
    if l != EVENT_CHANNEL])
  assert nsamp.size == 1, 'Multiple sample rates not supported!'
  sample_rate = float(nsamp[0]) / h['record_length']
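
  # Worked example (illustrative values): a record_length of 30 s with 3000
  # samples per record for every signal channel gives
  # sample_rate = 3000 / 30.0 = 100.0 Hz.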

  rectime, X, annotations = zip(*reader.records())
  X = np.hstack(X)
  annotations = reduce(operator.add, annotations)
  chan_lab = [lab for lab in reader.header['label'] if lab != EVENT_CHANNEL]

  # create timestamps
  if reader.header['contiguous']:
    time = np.arange(X.shape[1]) / sample_rate
  else:
    reclen = reader.header['record_length']
    within_rec_time = np.linspace(0, reclen, int(nsamp[0]), endpoint=False)
    time = np.hstack([t + within_rec_time for t in rectime])

  tup = namedtuple('EDF', 'X sample_rate chan_lab time annotations')
  return tup(X, sample_rate, chan_lab, time, annotations)
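

# Hypothetical usage sketch (not part of the original deepsleepnet module),
# assuming an EDF+ file path with a single fixed sample rate is passed on the
# command line, e.g.  python dhedfreader.py recording.edf
if __name__ == '__main__':
  import sys
  edf = load_edf(sys.argv[1])
  print(edf.X.shape)          # (n_signal_channels, n_samples)
  print(edf.sample_rate)      # sample rate in Hz
  print(edf.chan_lab)         # channel labels, minus the annotation channel
  print(edf.annotations[:5])  # first few (onset, duration, description) tuples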