mne/_fiff/utils.py

# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

import os
import os.path as op
from pathlib import Path

import numpy as np

from .constants import FIFF
from .meas_info import _get_valid_units


def _check_orig_units(orig_units):
    """Check original units from a raw file.

    Units that are close to a valid_unit but not equal can be remapped to fit
    into the valid_units. All other units that are not valid will be replaced
    with "n/a".

    Parameters
    ----------
    orig_units : dict
        Dictionary mapping channel names to their units as specified in
        the header file. Example: {'FC1': 'nV'}

    Returns
    -------
    orig_units_remapped : dict
        Dictionary mapping channel names to their VALID units as specified in
        the header file. Invalid units are now labeled "n/a".
        Example: {'FC1': 'nV', 'Hfp3erz': 'n/a'}
    """
    if orig_units is None:
        return
    valid_units = _get_valid_units()
    valid_units_lowered = [unit.lower() for unit in valid_units]
    orig_units_remapped = dict(orig_units)
    for ch_name, unit in orig_units.items():
        # Be lenient: we ignore case for now.
        if unit.lower() in valid_units_lowered:
            continue

        # Common "invalid units" can be remapped to their valid equivalent
        remap_dict = dict()
        remap_dict["uv"] = "µV"
        remap_dict["μv"] = "µV"  # greek letter mu vs micro sign. use micro
        remap_dict["\x83\xeav"] = "µV"  # for shift-jis mu, use micro
        if unit.lower() in remap_dict:
            orig_units_remapped[ch_name] = remap_dict[unit.lower()]
            continue

        # Some units cannot be saved, they are invalid: assign "n/a"
        orig_units_remapped[ch_name] = "n/a"
    return orig_units_remapped
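

# Illustrative sketch (editor's addition, not part of the upstream module):
# how the remapping above is expected to behave, assuming plain-ASCII "uV" is
# absent from _get_valid_units() so it falls through to the remap table.
def _example_check_orig_units():
    remapped = _check_orig_units({"FC1": "uV", "Hfp3erz": "foo"})
    # "uV" hits the remap table and becomes "µV"; the unknown unit becomes "n/a".
    return remapped  # expected: {"FC1": "µV", "Hfp3erz": "n/a"}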


def _find_channels(ch_names, ch_type="EOG"):
    """Find EOG channel."""
    substrings = (ch_type,)
    substrings = [s.upper() for s in substrings]
    if ch_type == "EOG":
        substrings = ("EOG", "EYE")
    eog_idx = [
        idx
        for idx, ch in enumerate(ch_names)
        if any(substring in ch.upper() for substring in substrings)
    ]
    return eog_idx
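

# Illustrative sketch (editor's addition, not in the upstream module): matching
# is case-insensitive, and for EOG the substring "EYE" also counts.
def _example_find_channels():
    picks = _find_channels(["Fp1", "VEOG", "EyeLeft", "ECG"], ch_type="EOG")
    return picks  # expected: [1, 2]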


def _mult_cal_one(data_view, one, idx, cals, mult):
    """Take a chunk of raw data, multiply by mult or cals, and store."""
    one = np.asarray(one, dtype=data_view.dtype)
    assert data_view.shape[1] == one.shape[1], (
        data_view.shape[1],
        one.shape[1],
    )  # noqa: E501
    if mult is not None:
        assert mult.ndim == one.ndim == 2
        data_view[:] = mult @ one[idx]
    else:
        assert cals is not None
        if isinstance(idx, slice):
            data_view[:] = one[idx]
        else:
            # faster than doing one = one[idx]
            np.take(one, idx, axis=0, out=data_view)
        data_view *= cals
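

# Illustrative sketch (editor's addition, not in the upstream module): with
# ``mult=None`` the picked rows are copied into ``data_view`` and then scaled
# in place by the per-channel calibration factors in ``cals``.
def _example_mult_cal_one():
    one = np.arange(12.0).reshape(4, 3)  # 4 channels x 3 samples
    data_view = np.empty((2, 3))  # output buffer for 2 picked channels
    cals = np.array([[1.0], [10.0]])  # per-channel calibrations
    _mult_cal_one(data_view, one, idx=[0, 2], cals=cals, mult=None)
    return data_view  # expected: [[0., 1., 2.], [60., 70., 80.]]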


def _blk_read_lims(start, stop, buf_len):
    """Deal with indexing in the middle of a data block.

    Parameters
    ----------
    start : int
        Starting index.
    stop : int
        Ending index (exclusive).
    buf_len : int
        Buffer size in samples.

    Returns
    -------
    block_start_idx : int
        The first block to start reading from.
    r_lims : list
        The read limits.
    d_lims : list
        The write limits.

    Notes
    -----
    Consider this example::

        >>> start, stop, buf_len = 2, 27, 10

                        +---------+---------+---------
        File structure: |  buf0   |  buf1   |  buf2   |
                        +---------+---------+---------
        File time:      0        10        20        30
                        +---------+---------+---------
        Requested time:   2                        27

                        |                             |
                   blockstart                     blockstop
                          |                        |
                        start                     stop

    We need 27 - 2 = 25 samples (per channel) to store our data, and
    we need to read from 3 buffers (30 samples) to get all of our data.

    On all reads but the first, the data we read starts at
    the first sample of the buffer. On all reads but the last,
    the data we read ends on the last sample of the buffer.

    We call ``this_data`` the variable that stores the current buffer's data,
    and ``data`` the variable that stores the total output.

    On the first read, we need to do this::

        >>> data[0:buf_len-2] = this_data[2:buf_len]  # doctest: +SKIP

    On the second read, we need to do::

        >>> data[1*buf_len-2:2*buf_len-2] = this_data[0:buf_len]  # doctest: +SKIP

    On the final read, we need to do::

        >>> data[2*buf_len-2:3*buf_len-2-3] = this_data[0:buf_len-3]  # doctest: +SKIP

    This function encapsulates this logic to allow a loop over blocks, where
    data is stored using the following limits::

        >>> data[d_lims[ii, 0]:d_lims[ii, 1]] = this_data[r_lims[ii, 0]:r_lims[ii, 1]]  # doctest: +SKIP

    """  # noqa: E501
    # this is used to deal with indexing in the middle of a sampling period
    assert all(isinstance(x, int) for x in (start, stop, buf_len))
    block_start_idx = start // buf_len
    block_start = block_start_idx * buf_len
    last_used_samp = stop - 1
    block_stop = last_used_samp - last_used_samp % buf_len + buf_len
    read_size = block_stop - block_start
    n_blk = read_size // buf_len + (read_size % buf_len != 0)
    start_offset = start - block_start
    end_offset = block_stop - stop
    d_lims = np.empty((n_blk, 2), int)
    r_lims = np.empty((n_blk, 2), int)
    for bi in range(n_blk):
        # Triage start (sidx) and end (eidx) indices for
        # data (d) and read (r)
        if bi == 0:
            d_sidx = 0
            r_sidx = start_offset
        else:
            d_sidx = bi * buf_len - start_offset
            r_sidx = 0
        if bi == n_blk - 1:
            d_eidx = stop - start
            r_eidx = buf_len - end_offset
        else:
            d_eidx = (bi + 1) * buf_len - start_offset
            r_eidx = buf_len
        d_lims[bi] = [d_sidx, d_eidx]
        r_lims[bi] = [r_sidx, r_eidx]
    return block_start_idx, r_lims, d_lims
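

# Illustrative sketch (editor's addition, not in the upstream module): the
# limits computed for the docstring's scenario (start=2, stop=27, buf_len=10).
def _example_blk_read_lims():
    block_start_idx, r_lims, d_lims = _blk_read_lims(2, 27, 10)
    # Reading starts in buffer 0; 25 output samples come from three reads.
    assert block_start_idx == 0
    assert r_lims.tolist() == [[2, 10], [0, 10], [0, 7]]
    assert d_lims.tolist() == [[0, 8], [8, 18], [18, 25]]
    return block_start_idx, r_lims, d_lims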


def _file_size(fname):
    """Get the file size in bytes."""
    with open(fname, "rb") as f:
        f.seek(0, os.SEEK_END)
        return f.tell()


def _read_segments_file(
    raw,
    data,
    idx,
    fi,
    start,
    stop,
    cals,
    mult,
    dtype,
    n_channels=None,
    offset=0,
    trigger_ch=None,
):
    """Read a chunk of raw data."""
    if n_channels is None:
        n_channels = raw._raw_extras[fi]["orig_nchan"]

    n_bytes = np.dtype(dtype).itemsize
    # data_offset is the byte offset of the first requested sample; data_left
    # counts the remaining data samples (channels x time points), not bytes.
    data_offset = n_channels * start * n_bytes + offset
    data_left = (stop - start) * n_channels

    # Read up to 100 MB of data at a time, block_size is in data samples
    block_size = ((int(100e6) // n_bytes) // n_channels) * n_channels
    block_size = min(data_left, block_size)
    with open(raw.filenames[fi], "rb", buffering=0) as fid:
        fid.seek(data_offset)
        # extract data in chunks
        for sample_start in np.arange(0, data_left, block_size) // n_channels:
            count = min(block_size, data_left - sample_start * n_channels)
            block = np.fromfile(fid, dtype, count)
            if block.size != count:
                raise RuntimeError(
                    f"Incorrect number of samples ({block.size} != {count}), please "
                    "report this error to MNE-Python developers"
                )
            block = block.reshape(n_channels, -1, order="F")
            n_samples = block.shape[1]  # = count // n_channels
            sample_stop = sample_start + n_samples
            if trigger_ch is not None:
                stim_ch = trigger_ch[start:stop][sample_start:sample_stop]
                block = np.vstack((block, stim_ch))
            data_view = data[:, sample_start:sample_stop]
            _mult_cal_one(data_view, block, idx, cals, mult)
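

# Illustrative sketch (editor's addition, not in the upstream module): the
# ~100 MB chunking arithmetic above, worked out for a hypothetical recording
# with 64 float32 channels.
def _example_read_block_size():
    n_channels = 64  # hypothetical channel count
    n_bytes = np.dtype("<f4").itemsize  # 4 bytes per sample
    block_size = ((int(100e6) // n_bytes) // n_channels) * n_channels
    # 25_000_000 samples per read (exactly 100 MB here), rounded down to a
    # whole number of channel frames so each chunk reshapes cleanly.
    return block_size  # expected: 25000000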


def read_str(fid, count=1):
    """Read string from a binary file in a python version compatible way."""
    dtype = np.dtype(f">S{count}")
    string = fid.read(dtype.itemsize)
    data = np.frombuffer(string, dtype=dtype)[0]
    bytestr = b"".join([data[0 : data.index(b"\x00") if b"\x00" in data else count]])

    return str(bytestr.decode("ascii"))  # Return native str type for Py2/3
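

# Illustrative sketch (editor's addition, not in the upstream module): a
# fixed-width, NUL-padded field decodes to the text before the first NUL byte.
def _example_read_str():
    import io

    return read_str(io.BytesIO(b"EEG 001\x00\x00\x00"), count=10)  # expected: 'EEG 001'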


def _create_chs(ch_names, cals, ch_coil, ch_kind, eog, ecg, emg, misc):
    """Initialize info['chs'] for eeg channels."""
    chs = list()
    for idx, ch_name in enumerate(ch_names):
        if ch_name in eog or idx in eog:
            coil_type = FIFF.FIFFV_COIL_NONE
            kind = FIFF.FIFFV_EOG_CH
        elif ch_name in ecg or idx in ecg:
            coil_type = FIFF.FIFFV_COIL_NONE
            kind = FIFF.FIFFV_ECG_CH
        elif ch_name in emg or idx in emg:
            coil_type = FIFF.FIFFV_COIL_NONE
            kind = FIFF.FIFFV_EMG_CH
        elif ch_name in misc or idx in misc:
            coil_type = FIFF.FIFFV_COIL_NONE
            kind = FIFF.FIFFV_MISC_CH
        else:
            coil_type = ch_coil
            kind = ch_kind

        chan_info = {
            "cal": cals[idx],
            "logno": idx + 1,
            "scanno": idx + 1,
            "range": 1.0,
            "unit_mul": FIFF.FIFF_UNITM_NONE,
            "ch_name": ch_name,
            "unit": FIFF.FIFF_UNIT_V,
            "coord_frame": FIFF.FIFFV_COORD_HEAD,
            "coil_type": coil_type,
            "kind": kind,
            "loc": np.zeros(12),
        }
        if coil_type == FIFF.FIFFV_COIL_EEG:
            chan_info["loc"][:3] = np.nan
        chs.append(chan_info)
    return chs
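

# Illustrative sketch (editor's addition, not in the upstream module): channels
# can be flagged as EOG either by name or by index; everything else gets the
# default coil type and kind passed in.
def _example_create_chs():
    chs = _create_chs(
        ["Fz", "HEOG"],
        cals=np.ones(2),
        ch_coil=FIFF.FIFFV_COIL_EEG,
        ch_kind=FIFF.FIFFV_EEG_CH,
        eog=["HEOG"],
        ecg=[],
        emg=[],
        misc=[],
    )
    # "Fz" stays EEG (its location is unknown, so loc[:3] is NaN); "HEOG"
    # becomes an EOG channel with no coil.
    return [(ch["ch_name"], ch["kind"], ch["coil_type"]) for ch in chs]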


def _construct_bids_filename(base, ext, part_idx, validate=True):
    """Construct a BIDS compatible filename for split files."""
    # insert index in filename
    dirname = op.dirname(base)
    base = op.basename(base)
    deconstructed_base = base.split("_")
    if len(deconstructed_base) < 2 and validate:
        raise ValueError(
            "Filename base must end with an underscore followed "
            f"by the modality (e.g., _eeg or _meg), got {base}"
        )
    suffix = deconstructed_base[-1]
    base = "_".join(deconstructed_base[:-1])
    use_fname = f"{base}_split-{part_idx + 1:02}_{suffix}{ext}"
    if dirname:
        use_fname = op.join(dirname, use_fname)

    return use_fname
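

# Illustrative sketch (editor's addition, not in the upstream module): the
# split entity is inserted just before the trailing BIDS suffix (modality).
def _example_construct_bids_filename():
    fname = _construct_bids_filename("sub-01_task-rest_eeg", ".fif", part_idx=0)
    return fname  # expected: 'sub-01_task-rest_split-01_eeg.fif'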


def _make_split_fnames(fname, n_splits, split_naming):
    """Make a list of split filenames."""
    if n_splits == 1:
        fname = Path(fname)
        return [fname]
    res = []
    base, ext = op.splitext(fname)
    for i in range(n_splits):
        if split_naming == "neuromag":
            path = Path(f"{base}-{i:d}{ext}" if i else fname)
            res.append(path)
        else:
            assert split_naming == "bids"
            path = Path(_construct_bids_filename(base, ext, i))
            res.append(path)
    return res
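

# Illustrative sketch (editor's addition, not in the upstream module): the
# "neuromag" scheme keeps the first split name unchanged and appends "-1",
# "-2", ... to the rest; the "bids" scheme defers to _construct_bids_filename.
def _example_make_split_fnames():
    paths = _make_split_fnames("run1_raw.fif", n_splits=3, split_naming="neuromag")
    return [p.name for p in paths]
    # expected: ['run1_raw.fif', 'run1_raw-1.fif', 'run1_raw-2.fif']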