--- a/dhedfreader.py
+++ b/dhedfreader.py
@@ -0,0 +1,216 @@
+'''
+Reader for EDF+ files.
+
+TODO:
+ - add support for log-transformed channels:
+   http://www.edfplus.info/specs/edffloat.html and test with
+   data generated with
+   http://www.edfplus.info/downloads/software/NeuroLoopGain.zip.
+ - check annotations with Schalk's Physiobank data.
+
+Copyright (c) 2012 Boris Reuderink.
+'''
+
+import re, datetime, operator, logging
+import numpy as np
+from collections import namedtuple
+
+EVENT_CHANNEL = 'EDF Annotations'
+log = logging.getLogger(__name__)
+
+
+class EDFEndOfData(BaseException): pass
+
+
+def tal(tal_str):
+  '''Return a list with (onset, duration, annotation) tuples for an EDF+ TAL
+  stream.
+  '''
+  exp = '(?P<onset>[+\-]\d+(?:\.\d*)?)' + \
+    '(?:\x15(?P<duration>\d+(?:\.\d*)?))?' + \
+    '(\x14(?P<annotation>[^\x00]*))?' + \
+    '(?:\x14\x00)'
+
+  def annotation_to_list(annotation):
+    return unicode(annotation, 'utf-8').split('\x14') if annotation else []
+
+  def parse(dic):
+    return (
+      float(dic['onset']),
+      float(dic['duration']) if dic['duration'] else 0.,
+      annotation_to_list(dic['annotation']))
+
+  return [parse(m.groupdict()) for m in re.finditer(exp, tal_str)]
+
+
+def edf_header(f):
+  h = {}
+  assert f.tell() == 0  # check file position
+  assert f.read(8) == '0       '  # version field: '0' padded to 8 bytes
+
+  # recording info
+  h['local_subject_id'] = f.read(80).strip()
+  h['local_recording_id'] = f.read(80).strip()
+
+  # parse timestamp
+  (day, month, year) = [int(x) for x in re.findall('(\d+)', f.read(8))]
+  (hour, minute, sec) = [int(x) for x in re.findall('(\d+)', f.read(8))]
+  h['date_time'] = str(datetime.datetime(year + 2000, month, day,
+    hour, minute, sec))
+
+  # misc
+  header_nbytes = int(f.read(8))
+  subtype = f.read(44)[:5]
+  h['EDF+'] = subtype in ['EDF+C', 'EDF+D']
+  h['contiguous'] = subtype != 'EDF+D'
+  h['n_records'] = int(f.read(8))
+  h['record_length'] = float(f.read(8))  # in seconds
+  nchannels = h['n_channels'] = int(f.read(4))
+
+  # read channel info
+  channels = range(h['n_channels'])
+  h['label'] = [f.read(16).strip() for n in channels]
+  h['transducer_type'] = [f.read(80).strip() for n in channels]
+  h['units'] = [f.read(8).strip() for n in channels]
+  h['physical_min'] = np.asarray([float(f.read(8)) for n in channels])
+  h['physical_max'] = np.asarray([float(f.read(8)) for n in channels])
+  h['digital_min'] = np.asarray([float(f.read(8)) for n in channels])
+  h['digital_max'] = np.asarray([float(f.read(8)) for n in channels])
+  h['prefiltering'] = [f.read(80).strip() for n in channels]
+  h['n_samples_per_record'] = [int(f.read(8)) for n in channels]
+  f.read(32 * nchannels)  # reserved
+
+  assert f.tell() == header_nbytes
+  return h
+
+
+class BaseEDFReader:
+  def __init__(self, file):
+    self.file = file
+
+  def read_header(self):
+    self.header = h = edf_header(self.file)
+
+    # calculate ranges for rescaling
+    self.dig_min = h['digital_min']
+    self.phys_min = h['physical_min']
+    phys_range = h['physical_max'] - h['physical_min']
+    dig_range = h['digital_max'] - h['digital_min']
+    assert np.all(phys_range > 0)
+    assert np.all(dig_range > 0)
+    self.gain = phys_range / dig_range

+  def read_raw_record(self):
+    '''Read a record of data and return a list containing arrays with raw
+    bytes.
+    '''
+    result = []
+    for nsamp in self.header['n_samples_per_record']:
+      samples = self.file.read(nsamp * 2)
+      if len(samples) != nsamp * 2:
+        raise EDFEndOfData
+      result.append(samples)
+    return result
+
+  def convert_record(self, raw_record):
+    '''Convert a raw record to a (time, signals, events) tuple based on
+    information in the header.
+    '''
+    h = self.header
+    dig_min, phys_min, gain = self.dig_min, self.phys_min, self.gain
+    time = float('nan')
+    signals = []
+    events = []
+    for (i, samples) in enumerate(raw_record):
+      if h['label'][i] == EVENT_CHANNEL:
+        ann = tal(samples)
+        time = ann[0][0]
+        events.extend(ann[1:])
+      else:
+        # 2-byte little-endian integers
+        dig = np.fromstring(samples, '<i2').astype(np.float32)
+        phys = (dig - dig_min[i]) * gain[i] + phys_min[i]
+        signals.append(phys)
+
+    return time, signals, events
+
+  def read_record(self):
+    return self.convert_record(self.read_raw_record())
+
+  def records(self):
+    '''
+    Record generator.
+    '''
+    try:
+      while True:
+        yield self.read_record()
+    except EDFEndOfData:
+      pass
+
+
+def load_edf(edffile):
+  '''Load an EDF+ file.
+
+  Very basic reader for EDF and EDF+ files. While BaseEDFReader does support
+  exotic features like non-homogeneous sample rates and loading only parts of
+  the stream, load_edf expects a single fixed sample rate for all channels and
+  tries to load the whole file.
+
+  Parameters
+  ----------
+  edffile : file-like object or string
+
+  Returns
+  -------
+  Named tuple with the fields:
+    X : NumPy array with shape p by n.
+      Raw recording of n samples in p dimensions.
+    sample_rate : float
+      The sample rate of the recording. Note that mixed sample rates are not
+      supported.
+    chan_lab : list of length p with strings
+      The labels of the sensors used to record X.
+    time : NumPy array with length n
+      The time offset in the recording for each sample.
+    annotations : a list with tuples
+      EDF+ annotations are stored in (start, duration, description) tuples.
+      start : float
+        Indicates the start of the event in seconds.
+      duration : float
+        Indicates the duration of the event in seconds.
+      description : list with strings
+        Contains one or more descriptions of the annotation event.
+  '''
+  if isinstance(edffile, basestring):
+    with open(edffile, 'rb') as f:
+      return load_edf(f)  # convert filename to file
+
+  reader = BaseEDFReader(edffile)
+  reader.read_header()
+
+  h = reader.header
+  log.debug('EDF header: %s' % h)
+
+  # get sample rate info
+  nsamp = np.unique(
+    [n for (l, n) in zip(h['label'], h['n_samples_per_record'])
+     if l != EVENT_CHANNEL])
+  assert nsamp.size == 1, 'Multiple sample rates not supported!'
+  sample_rate = float(nsamp[0]) / h['record_length']
+
+  rectime, X, annotations = zip(*reader.records())
+  X = np.hstack(X)
+  annotations = reduce(operator.add, annotations)
+  chan_lab = [lab for lab in reader.header['label'] if lab != EVENT_CHANNEL]
+
+  # create timestamps
+  if reader.header['contiguous']:
+    time = np.arange(X.shape[1]) / sample_rate
+  else:
+    reclen = reader.header['record_length']
+    # nsamp is a 1-element array; linspace expects an integer count
+    within_rec_time = np.linspace(0, reclen, int(nsamp[0]), endpoint=False)
+    time = np.hstack([t + within_rec_time for t in rectime])
+
+  tup = namedtuple('EDF', 'X sample_rate chan_lab time annotations')
+  return tup(X, sample_rate, chan_lab, time, annotations)
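For reference, a minimal usage sketch of the module added in this patch. It is not part of the diff itself; it assumes Python 2 (the module relies on unicode, basestring and reduce) and an EDF+ file whose name, recording.edf, is only a placeholder.

  # Usage sketch (not part of the patch). Assumes Python 2 and an EDF+ file
  # named 'recording.edf' in the working directory (placeholder name).
  import dhedfreader

  # One-shot loading: all signal channels must share one fixed sample rate.
  edf = dhedfreader.load_edf('recording.edf')
  print('channels: %s' % (edf.chan_lab,))
  print('sample rate: %.1f Hz' % edf.sample_rate)
  print('X shape (p x n): %s' % (edf.X.shape,))
  print('first annotations: %s' % (edf.annotations[:3],))

  # Record-by-record streaming via BaseEDFReader, e.g. for long recordings:
  with open('recording.edf', 'rb') as f:
    reader = dhedfreader.BaseEDFReader(f)
    reader.read_header()
    for time, signals, events in reader.records():
      pass  # process one EDF data record at a time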