"""Read .res4 files."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import os.path as op
import numpy as np
from ...utils import logger
from .constants import CTF
def _make_ctf_name(directory, extra, raise_error=True):
    """Build the path of a standard file inside a CTF dataset directory.

    The file name is the base name of ``directory`` with its last three
    characters removed (the ``.ds`` suffix of a CTF dataset), followed by
    ``"." + extra``.

    Parameters
    ----------
    directory : path-like
        The dataset directory. Trailing path separators are ignored when the
        base name is derived.
    extra : str
        Extension of the wanted file, without the leading dot.
    raise_error : bool
        If True (default), raise when the file does not exist.

    Returns
    -------
    fname : str
        The constructed file path (``directory`` joined with the file name).
    found : bool
        Whether ``fname`` is an existing regular file.

    Raises
    ------
    OSError
        If the file does not exist and ``raise_error`` is True.
    """
    # basename() yields "" for "foo.ds/", which would produce ".<extra>";
    # normalising first strips trailing separators.
    ds_name = op.basename(op.normpath(directory))
    fname = op.join(directory, ds_name[:-3] + "." + extra)
    found = op.isfile(fname)
    if raise_error and not found:
        raise OSError(f"Standard file {fname} not found")
    return fname, found
def _read_double(fid, n=1):
    """Read ``n`` big-endian 64-bit floats from ``fid`` as a 1D array."""
    values = np.fromfile(fid, dtype=">f8", count=n)
    return values
def _read_string(fid, n_bytes, decode=True):
    """Read a fixed-width, NUL-terminated string field.

    Exactly ``n_bytes`` are consumed from ``fid``; everything from the first
    NUL byte onwards is discarded. The result is decoded as UTF-8 unless
    ``decode`` is False, in which case the raw bytes are returned.
    """
    raw = fid.read(n_bytes)
    text, _, _ = raw.partition(b"\x00")
    if decode:
        return text.decode("utf-8")
    return text
def _read_ustring(fid, n_bytes):
    """Read ``n_bytes`` unsigned 8-bit values from ``fid`` as a 1D array."""
    raw = np.fromfile(fid, dtype=">B", count=n_bytes)
    return raw
def _read_int2(fid):
    """Read one big-endian 16-bit integer and upcast it via ``_auto_cast``."""
    raw = np.fromfile(fid, dtype=">i2", count=1)
    return _auto_cast(raw[0])
def _read_int(fid):
    """Read one big-endian 32-bit integer from ``fid``.

    The value is returned as the NumPy scalar taken from the array read by
    ``np.fromfile``; no upcasting is applied here.
    """
    raw = np.fromfile(fid, dtype=">i4", count=1)
    return raw[0]
def _move_to_next(fid, byte=8):
    """Advance ``fid`` to the next multiple of ``byte``.

    Nothing happens when the current position is already aligned.
    """
    position = fid.tell()
    overshoot = position % byte
    if overshoot != 0:
        fid.seek(position - overshoot + byte, 0)
def _read_filter(fid):
    """Read one filter record from ``fid``.

    The record is read in this order: one double (frequency), two 32-bit
    integers (class and type), a 16-bit parameter count and then that many
    doubles.

    Returns
    -------
    f : dict
        Keys ``"freq"``, ``"class"``, ``"type"``, ``"npar"`` and ``"pars"``.
    """
    freq = _read_double(fid)[0]
    filt_class = _read_int(fid)
    filt_type = _read_int(fid)
    n_pars = _read_int2(fid)
    pars = _read_double(fid, n_pars)
    return {
        "freq": freq,
        "class": filt_class,
        "type": filt_type,
        "npar": n_pars,
        "pars": pars,
    }
def _read_comp_coeff(fid, d):
    """Read the compensation coefficient records into ``d``.

    Parameters
    ----------
    fid : file object
        Binary file positioned at the 16-bit record count that precedes the
        compensation records.
    d : dict
        Must already contain ``"ch_names"``. Modified in place: ``"ncomp"``
        receives the record count and ``"comp"`` a list with one dict per
        record (keys ``"sensor_name"``, ``"coeff_type"``, ``"ncoeff"``,
        ``"sensors"``, ``"coeffs"`` and ``"scanno"``).
    """

    def _label(raw):
        # Names are NUL-terminated within their fixed-width field
        return raw.split(b"\x00")[0].decode("utf-8")

    n_comp = _read_int2(fid)
    d["ncomp"] = n_comp
    d["comp"] = []

    # Layout of a single record in the file (big-endian numbers)
    record_dt = np.dtype(
        [
            ("sensor_name", "S32"),
            ("coeff_type", ">i4"),
            ("d0", ">i4"),
            ("ncoeff", ">i2"),
            ("sensors", f"S{CTF.CTFV_SENSOR_LABEL}", CTF.CTFV_MAX_BALANCING),
            ("coeffs", ">f8", CTF.CTFV_MAX_BALANCING),
        ]
    )
    records = np.fromfile(fid, record_dt, n_comp)

    for idx in range(n_comp):
        entry = {}
        d["comp"].append(entry)
        rec = records[idx]
        n_used = rec["ncoeff"].item()
        entry["sensor_name"] = _label(rec["sensor_name"])
        entry["coeff_type"] = rec["coeff_type"].item()
        entry["ncoeff"] = n_used
        # Only the first ``ncoeff`` slots of the fixed-size arrays are kept
        entry["sensors"] = [_label(raw) for raw in rec["sensors"][:n_used]]
        entry["coeffs"] = rec["coeffs"][:n_used]
        # Position of the compensated sensor within the channel name list
        entry["scanno"] = d["ch_names"].index(entry["sensor_name"])
def _read_res4(dsdir):
    """Read the magical res4 file.

    Parameters
    ----------
    dsdir : path-like
        The CTF dataset directory; the ``res4`` file inside it is located
        with ``_make_ctf_name``.

    Returns
    -------
    res : dict
        The header fields keyed by name, plus ``"filters"`` (list of dict),
        ``"ch_names"`` (list of str), ``"chs"`` (list of per-channel dict)
        and the entries added by ``_read_comp_coeff`` (``"ncomp"`` and
        ``"comp"``). ``"run_desc"`` is only present when ``"rdlen"`` is
        positive.

    Raises
    ------
    OSError
        If the res4 file does not exist.
    """
    # adapted from read_res4.c
    name, _ = _make_ctf_name(dsdir, "res4")
    res = dict()
    with open(name, "rb") as fid:
        # Read the fields
        # NOTE: the fields are read sequentially, so the order of the calls
        # below defines the expected byte layout; the _move_to_next calls
        # skip ahead to the next 4- or 8-byte boundary where needed.
        res["head"] = _read_string(fid, 8)
        res["appname"] = _read_string(fid, 256)
        res["origin"] = _read_string(fid, 256)
        res["desc"] = _read_string(fid, 256)
        res["nave"] = _read_int2(fid)
        res["data_time"] = _read_string(fid, 255)
        res["data_date"] = _read_string(fid, 255)
        # Seems that date and time can be swapped
        # (are they entered manually?!)
        # Heuristic: a "/" in the time field together with a ":" in the date
        # field means the two were stored the wrong way round.
        if "/" in res["data_time"] and ":" in res["data_date"]:
            data_date = res["data_date"]
            res["data_date"] = res["data_time"]
            res["data_time"] = data_date
        res["nsamp"] = _read_int(fid)
        res["nchan"] = _read_int2(fid)
        _move_to_next(fid, 8)
        res["sfreq"] = _read_double(fid)[0]
        res["epoch_time"] = _read_double(fid)[0]
        res["no_trials"] = _read_int2(fid)
        _move_to_next(fid, 4)
        res["pre_trig_pts"] = _read_int(fid)
        res["no_trials_done"] = _read_int2(fid)
        res["no_trials_bst_message_windowlay"] = _read_int2(fid)
        _move_to_next(fid, 4)
        res["save_trials"] = _read_int(fid)
        # The trigger fields are kept as raw single-byte ``bytes`` objects
        res["primary_trigger"] = fid.read(1)
        res["secondary_trigger"] = [
            fid.read(1) for k in range(CTF.CTFV_MAX_AVERAGE_BINS)
        ]
        res["trigger_polarity_mask"] = fid.read(1)
        res["trigger_mode"] = _read_int2(fid)
        _move_to_next(fid, 4)
        res["accept_reject"] = _read_int(fid)
        res["run_time_bst_message_windowlay"] = _read_int2(fid)
        _move_to_next(fid, 4)
        res["zero_head"] = _read_int(fid)
        _move_to_next(fid, 4)
        res["artifact_mode"] = _read_int(fid)
        _read_int(fid)  # padding
        res["nf_run_name"] = _read_string(fid, 32)
        res["nf_run_title"] = _read_string(fid, 256)
        res["nf_instruments"] = _read_string(fid, 32)
        res["nf_collect_descriptor"] = _read_string(fid, 32)
        res["nf_subject_id"] = _read_string(fid, 32)
        res["nf_operator"] = _read_string(fid, 32)
        # An empty operator name is stored as None
        if len(res["nf_operator"]) == 0:
            res["nf_operator"] = None
        # Kept as an array of unsigned bytes, not decoded to str
        res["nf_sensor_file_name"] = _read_ustring(fid, 60)
        _move_to_next(fid, 4)
        res["rdlen"] = _read_int(fid)
        # Jump to the absolute offset CTF.FUNNY_POS before reading the
        # variable-length part of the file
        fid.seek(CTF.FUNNY_POS, 0)
        if res["rdlen"] > 0:
            res["run_desc"] = _read_string(fid, res["rdlen"])
        # Filters
        res["nfilt"] = _read_int2(fid)
        res["filters"] = list()
        for k in range(res["nfilt"]):
            res["filters"].append(_read_filter(fid))
        # Channel information (names, then data)
        res["ch_names"] = list()
        for k in range(res["nchan"]):
            ch_name = _read_string(fid, 32)
            res["ch_names"].append(ch_name)
        # Layout of one coil entry inside a channel record; the ``d*``
        # fields are read but not used anywhere in this function
        _coil_dt = np.dtype(
            [
                ("pos", ">f8", 3),
                ("d0", ">f8"),
                ("norm", ">f8", 3),
                ("d1", ">f8"),
                ("turns", ">i2"),
                ("d2", ">i4"),
                ("d3", ">i2"),
                ("area", ">f8"),
            ]
        )
        # Layout of one channel record, holding two arrays of coil entries
        _ch_dt = np.dtype(
            [
                ("sensor_type_index", ">i2"),
                ("original_run_no", ">i2"),
                ("coil_type", ">i4"),
                ("proper_gain", ">f8"),
                ("qgain", ">f8"),
                ("io_gain", ">f8"),
                ("io_offset", ">f8"),
                ("num_coils", ">i2"),
                ("grad_order_no", ">i2"),
                ("d0", ">i4"),
                ("coil", _coil_dt, CTF.CTFV_MAX_COILS),
                ("head_coil", _coil_dt, CTF.CTFV_MAX_COILS),
            ]
        )
        # All channel records are read in a single call
        chs = np.fromfile(fid, _ch_dt, res["nchan"])
        # Rescale positions and areas in place, before the records are
        # converted to dicts.
        # NOTE(review): the factors look like cm -> m and cm^2 -> m^2;
        # confirm against the CTF format description.
        for coil in (chs["coil"], chs["head_coil"]):
            coil["pos"] /= 100.0
            coil["area"] *= 1e-4
        # convert to dict
        chs = [dict(zip(chs.dtype.names, x)) for x in chs]
        # Upcast NumPy integer/float scalars to 64 bit; other values (such
        # as the coil arrays) are returned unchanged by _auto_cast
        for ch in chs:
            for key, val in ch.items():
                ch[key] = _auto_cast(val)
        res["chs"] = chs
        # Attach the names read earlier to the matching channel dicts
        for k in range(res["nchan"]):
            res["chs"][k]["ch_name"] = res["ch_names"][k]
        # The compensation coefficients
        _read_comp_coeff(fid, res)
    logger.info(" res4 data read.")
    return res
def _auto_cast(x):
    """Upcast NumPy signed-integer and float scalars to 64 bit.

    Parameters
    ----------
    x : object
        Any value.

    Returns
    -------
    x : object
        ``np.int64`` for a NumPy signed integer scalar, ``np.float64`` for a
        NumPy floating scalar, and the input itself for everything else
        (arrays, unsigned integers, Python builtins, ...).
    """
    # np.ScalarType also lists Python builtins (int, float, str, bytes, ...)
    # which have no ``dtype`` attribute; np.generic matches NumPy scalars
    # only, so builtins pass through instead of raising AttributeError.
    if isinstance(x, np.generic):
        kind = x.dtype.kind
        if kind == "i" and x.dtype != np.int64:
            x = x.astype(np.int64)
        elif kind == "f" and x.dtype != np.float64:
            x = x.astype(np.float64)
    return x