"""Populate measurement info."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import os.path as op
from calendar import timegm
from time import strptime
import numpy as np
from ..._fiff.constants import FIFF
from ..._fiff.ctf_comp import _add_kind, _calibrate_comp
from ..._fiff.meas_info import _empty_info
from ..._fiff.write import get_new_file_id
from ...annotations import Annotations
from ...transforms import (
_coord_frame_name,
apply_trans,
combine_transforms,
invert_transform,
)
from ...utils import _clean_names, logger, warn
from .constants import CTF
_ctf_to_fiff = {
CTF.CTFV_COIL_LPA: FIFF.FIFFV_POINT_LPA,
CTF.CTFV_COIL_RPA: FIFF.FIFFV_POINT_RPA,
CTF.CTFV_COIL_NAS: FIFF.FIFFV_POINT_NASION,
}
def _pick_isotrak_and_hpi_coils(res4, coils, t):
"""Pick the HPI coil locations given in device coordinates."""
if coils is None:
return list(), list()
dig = list()
hpi_result = dict(dig_points=list())
n_coil_dev = 0
n_coil_head = 0
for p in coils:
if p["valid"]:
if p["kind"] in [CTF.CTFV_COIL_LPA, CTF.CTFV_COIL_RPA, CTF.CTFV_COIL_NAS]:
kind = FIFF.FIFFV_POINT_CARDINAL
ident = _ctf_to_fiff[p["kind"]]
else: # CTF.CTFV_COIL_SPARE
kind = FIFF.FIFFV_POINT_HPI
ident = p["kind"]
if p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_DEVICE:
if t is None or t["t_ctf_dev_dev"] is None:
raise RuntimeError(
"No coordinate transformation available for HPI coil locations"
)
d = dict(
kind=kind,
ident=ident,
r=apply_trans(t["t_ctf_dev_dev"], p["r"]),
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
)
hpi_result["dig_points"].append(d)
n_coil_dev += 1
elif p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
if t is None or t["t_ctf_head_head"] is None:
raise RuntimeError(
"No coordinate transformation "
"available for (virtual) Polhemus data"
)
d = dict(
kind=kind,
ident=ident,
r=apply_trans(t["t_ctf_head_head"], p["r"]),
coord_frame=FIFF.FIFFV_COORD_HEAD,
)
dig.append(d)
n_coil_head += 1
if n_coil_head > 0:
logger.info(" Polhemus data for %d HPI coils added", n_coil_head)
if n_coil_dev > 0:
logger.info(
" Device coordinate locations for %d HPI coils added", n_coil_dev
)
return dig, [hpi_result]
def _convert_time(date_str, time_str):
"""Convert date and time strings to float time."""
if date_str == time_str == "":
date_str = "01/01/1970"
time_str = "00:00:00"
logger.info(
"No date or time found, setting to the start of the "
"POSIX epoch (1970/01/01 midnight)"
)
for fmt in ("%d/%m/%Y", "%d-%b-%Y", "%a, %b %d, %Y", "%Y/%m/%d"):
try:
date = strptime(date_str.strip(), fmt)
except ValueError:
pass
else:
break
else:
raise RuntimeError(
f"Illegal date: {date_str}.\nIf the language of the date does not "
"correspond to your local machine's language try to set the "
"locale to the language of the date string:\n"
'locale.setlocale(locale.LC_ALL, "en_US")'
)
for fmt in ("%H:%M:%S", "%H:%M"):
try:
time = strptime(time_str, fmt)
except ValueError:
pass
else:
break
else:
raise RuntimeError(f"Illegal time: {time_str}")
# MNE-C uses mktime which uses local time, but here we instead decouple
# conversion location from the process, and instead assume that the
# acquisition was in GMT. This will be wrong for most sites, but at least
# the value we obtain here won't depend on the geographical location
# that the file was converted.
res = timegm(
(
date.tm_year,
date.tm_mon,
date.tm_mday,
time.tm_hour,
time.tm_min,
time.tm_sec,
date.tm_wday,
date.tm_yday,
date.tm_isdst,
)
)
return res
def _get_plane_vectors(ez):
"""Get two orthogonal vectors orthogonal to ez (ez will be modified)."""
assert ez.shape == (3,)
ez_len = np.sqrt(np.sum(ez * ez))
if ez_len == 0:
raise RuntimeError("Zero length normal. Cannot proceed.")
if np.abs(ez_len - np.abs(ez[2])) < 1e-5: # ez already in z-direction
ex = np.array([1.0, 0.0, 0.0])
else:
ex = np.zeros(3)
if ez[1] < ez[2]:
ex[0 if ez[0] < ez[1] else 1] = 1.0
else:
ex[0 if ez[0] < ez[2] else 2] = 1.0
ez /= ez_len
ex -= np.dot(ez, ex) * ez
ex /= np.sqrt(np.sum(ex * ex))
ey = np.cross(ez, ex)
return ex, ey
def _at_origin(x):
"""Determine if a vector is at the origin."""
return np.sum(x * x) < 1e-8
def _check_comp_ch(cch, kind, desired=None):
if desired is None:
desired = cch["grad_order_no"]
if cch["grad_order_no"] != desired:
raise RuntimeError(
f"{kind} channel with inconsistent compensation "
f"grade {cch['grad_order_no']}, should be {desired}"
)
return desired
def _convert_channel_info(res4, t, use_eeg_pos):
"""Convert CTF channel information to fif format."""
nmeg = neeg = nstim = nmisc = nref = 0
chs = list()
this_comp = None
for k, cch in enumerate(res4["chs"]):
cal = float(1.0 / (cch["proper_gain"] * cch["qgain"]))
ch = dict(
scanno=k + 1,
range=1.0,
cal=cal,
loc=np.full(12, np.nan),
unit_mul=FIFF.FIFF_UNITM_NONE,
ch_name=cch["ch_name"][:15],
coil_type=FIFF.FIFFV_COIL_NONE,
)
del k
chs.append(ch)
# Create the channel position information
if cch["sensor_type_index"] in (
CTF.CTFV_REF_MAG_CH,
CTF.CTFV_REF_GRAD_CH,
CTF.CTFV_MEG_CH,
):
# Extra check for a valid MEG channel
if (
np.sum(cch["coil"]["pos"][0] ** 2) < 1e-6
or np.sum(cch["coil"]["norm"][0] ** 2) < 1e-6
):
nmisc += 1
ch.update(
logno=nmisc,
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
kind=FIFF.FIFFV_MISC_CH,
unit=FIFF.FIFF_UNIT_V,
)
text = "MEG"
if cch["sensor_type_index"] != CTF.CTFV_MEG_CH:
text += " ref"
warn(
f"{text} channel {ch['ch_name']} did not have position "
"assigned, so it was changed to a MISC channel"
)
continue
ch["unit"] = FIFF.FIFF_UNIT_T
# Set up the local coordinate frame
r0 = cch["coil"]["pos"][0].copy()
ez = cch["coil"]["norm"][0].copy()
# It turns out that positive proper_gain requires swapping
# of the normal direction
if cch["proper_gain"] > 0.0:
ez *= -1
# Check how the other vectors should be defined
off_diag = False
# Default: ex and ey are arbitrary in the plane normal to ez
if cch["sensor_type_index"] == CTF.CTFV_REF_GRAD_CH:
# The off-diagonal gradiometers are an exception:
#
# We use the same convention for ex as for Neuromag planar
# gradiometers: ex pointing in the positive gradient direction
diff = cch["coil"]["pos"][0] - cch["coil"]["pos"][1]
size = np.sqrt(np.sum(diff * diff))
if size > 0.0:
diff /= size
# Is ez normal to the line joining the coils?
if np.abs(np.dot(diff, ez)) < 1e-3:
off_diag = True
# Handle the off-diagonal gradiometer coordinate system
r0 -= size * diff / 2.0
ex = diff
ey = np.cross(ez, ex)
else:
ex, ey = _get_plane_vectors(ez)
else:
ex, ey = _get_plane_vectors(ez)
# Transform into a Neuromag-like device coordinate system
ch["loc"] = np.concatenate(
[
apply_trans(t["t_ctf_dev_dev"], r0),
apply_trans(t["t_ctf_dev_dev"], ex, move=False),
apply_trans(t["t_ctf_dev_dev"], ey, move=False),
apply_trans(t["t_ctf_dev_dev"], ez, move=False),
]
)
del r0, ex, ey, ez
# Set the coil type
if cch["sensor_type_index"] == CTF.CTFV_REF_MAG_CH:
ch["kind"] = FIFF.FIFFV_REF_MEG_CH
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_REF_MAG
nref += 1
ch["logno"] = nref
elif cch["sensor_type_index"] == CTF.CTFV_REF_GRAD_CH:
ch["kind"] = FIFF.FIFFV_REF_MEG_CH
if off_diag:
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_OFFDIAG_REF_GRAD
else:
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_REF_GRAD
nref += 1
ch["logno"] = nref
else:
this_comp = _check_comp_ch(cch, "Gradiometer", this_comp)
ch["kind"] = FIFF.FIFFV_MEG_CH
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_GRAD
nmeg += 1
ch["logno"] = nmeg
# Encode the software gradiometer order
ch["coil_type"] = int(ch["coil_type"] | (cch["grad_order_no"] << 16))
ch["coord_frame"] = FIFF.FIFFV_COORD_DEVICE
elif cch["sensor_type_index"] == CTF.CTFV_EEG_CH:
coord_frame = FIFF.FIFFV_COORD_HEAD
if use_eeg_pos:
# EEG electrode coordinates may be present but in the
# CTF head frame
ch["loc"][:3] = cch["coil"]["pos"][0]
if not _at_origin(ch["loc"][:3]):
if t["t_ctf_head_head"] is None:
warn(
f"EEG electrode ({ch['ch_name']}) location omitted because "
"of missing HPI information"
)
ch["loc"].fill(np.nan)
coord_frame = FIFF.FIFFV_MNE_COORD_CTF_HEAD
else:
ch["loc"][:3] = apply_trans(t["t_ctf_head_head"], ch["loc"][:3])
neeg += 1
ch.update(
logno=neeg,
kind=FIFF.FIFFV_EEG_CH,
unit=FIFF.FIFF_UNIT_V,
coord_frame=coord_frame,
coil_type=FIFF.FIFFV_COIL_EEG,
)
elif cch["sensor_type_index"] == CTF.CTFV_STIM_CH:
nstim += 1
ch.update(
logno=nstim,
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
kind=FIFF.FIFFV_STIM_CH,
unit=FIFF.FIFF_UNIT_V,
)
else:
nmisc += 1
ch.update(
logno=nmisc,
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
kind=FIFF.FIFFV_MISC_CH,
unit=FIFF.FIFF_UNIT_V,
)
return chs
def _comp_sort_keys(c):
"""Sort the compensation data."""
return (int(c["coeff_type"]), int(c["scanno"]))
def _check_comp(comp):
"""Check that conversion to named matrices is possible."""
ref_sens = None
kind = -1
for k, c_k in enumerate(comp):
if c_k["coeff_type"] != kind:
c_ref = c_k
ref_sens = c_ref["sensors"]
kind = c_k["coeff_type"]
elif not c_k["sensors"] == ref_sens:
raise RuntimeError("Cannot use an uneven compensation matrix")
def _conv_comp(comp, first, last, chs):
"""Add a new converted compensation data item."""
ch_names = [c["ch_name"] for c in chs]
n_col = comp[first]["ncoeff"]
col_names = comp[first]["sensors"][:n_col]
row_names = [comp[p]["sensor_name"] for p in range(first, last + 1)]
mask = np.isin(col_names, ch_names) # missing channels excluded
col_names = np.array(col_names)[mask].tolist()
n_col = len(col_names)
n_row = len(row_names)
ccomp = dict(ctfkind=comp[first]["coeff_type"], save_calibrated=False)
_add_kind(ccomp)
data = np.empty((n_row, n_col))
for ii, coeffs in enumerate(comp[first : last + 1]):
# Pick the elements to the matrix
data[ii, :] = coeffs["coeffs"][mask]
ccomp["data"] = dict(
row_names=row_names,
col_names=col_names,
data=data,
nrow=len(row_names),
ncol=len(col_names),
)
mk = ("proper_gain", "qgain")
_calibrate_comp(ccomp, chs, row_names, col_names, mult_keys=mk, flip=True)
return ccomp
def _convert_comp_data(res4):
"""Convert the compensation data into named matrices."""
if res4["ncomp"] == 0:
return
# Sort the coefficients in our favorite order
res4["comp"] = sorted(res4["comp"], key=_comp_sort_keys)
# Check that all items for a given compensation type have the correct
# number of channels
_check_comp(res4["comp"])
# Create named matrices
first = 0
kind = -1
comps = list()
for k in range(len(res4["comp"])):
if res4["comp"][k]["coeff_type"] != kind:
if k > 0:
comps.append(_conv_comp(res4["comp"], first, k - 1, res4["chs"]))
kind = res4["comp"][k]["coeff_type"]
first = k
comps.append(_conv_comp(res4["comp"], first, k, res4["chs"]))
return comps
def _pick_eeg_pos(c):
"""Pick EEG positions."""
eeg = dict(
coord_frame=FIFF.FIFFV_COORD_HEAD,
assign_to_chs=False,
labels=list(),
ids=list(),
rr=list(),
kinds=list(),
np=0,
)
for ch in c["chs"]:
if ch["kind"] == FIFF.FIFFV_EEG_CH and not _at_origin(ch["loc"][:3]):
eeg["labels"].append(ch["ch_name"])
eeg["ids"].append(ch["logno"])
eeg["rr"].append(ch["loc"][:3])
eeg["kinds"].append(FIFF.FIFFV_POINT_EEG)
eeg["np"] += 1
if eeg["np"] == 0:
return None
logger.info("Picked positions of %d EEG channels from channel info", eeg["np"])
return eeg
def _add_eeg_pos(eeg, t, c):
"""Pick the (virtual) EEG position data."""
if eeg is None:
return
if t is None or t["t_ctf_head_head"] is None:
raise RuntimeError(
"No coordinate transformation available for EEG position data"
)
eeg_assigned = 0
if eeg["assign_to_chs"]:
for k in range(eeg["np"]):
# Look for a channel name match
for ch in c["chs"]:
if ch["ch_name"].lower() == eeg["labels"][k].lower():
r0 = ch["loc"][:3]
r0[:] = eeg["rr"][k]
if eeg["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
r0[:] = apply_trans(t["t_ctf_head_head"], r0)
elif eeg["coord_frame"] != FIFF.FIFFV_COORD_HEAD:
raise RuntimeError(
"Illegal coordinate frame for EEG electrode "
f"positions : {_coord_frame_name(eeg['coord_frame'])}"
)
# Use the logical channel number as an identifier
eeg["ids"][k] = ch["logno"]
eeg["kinds"][k] = FIFF.FIFFV_POINT_EEG
eeg_assigned += 1
break
# Add these to the Polhemus data
fid_count = eeg_count = extra_count = 0
for k in range(eeg["np"]):
d = dict(
r=eeg["rr"][k].copy(),
kind=eeg["kinds"][k],
ident=eeg["ids"][k],
coord_frame=FIFF.FIFFV_COORD_HEAD,
)
c["dig"].append(d)
if eeg["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
d["r"] = apply_trans(t["t_ctf_head_head"], d["r"])
elif eeg["coord_frame"] != FIFF.FIFFV_COORD_HEAD:
raise RuntimeError(
"Illegal coordinate frame for EEG electrode positions: "
+ _coord_frame_name(eeg["coord_frame"])
)
if eeg["kinds"][k] == FIFF.FIFFV_POINT_CARDINAL:
fid_count += 1
elif eeg["kinds"][k] == FIFF.FIFFV_POINT_EEG:
eeg_count += 1
else:
extra_count += 1
if eeg_assigned > 0:
logger.info(
" %d EEG electrode locations assigned to channel info.", eeg_assigned
)
for count, kind in zip(
(fid_count, eeg_count, extra_count),
("fiducials", "EEG locations", "extra points"),
):
if count > 0:
logger.info(" %d %s added to Polhemus data.", count, kind)
_filt_map = {CTF.CTFV_FILTER_LOWPASS: "lowpass", CTF.CTFV_FILTER_HIGHPASS: "highpass"}
def _compose_meas_info(res4, coils, trans, eeg):
"""Create meas info from CTF data."""
info = _empty_info(res4["sfreq"])
# Collect all the necessary data from the structures read
info["meas_id"] = get_new_file_id()
info["meas_id"]["usecs"] = 0
info["meas_id"]["secs"] = _convert_time(res4["data_date"], res4["data_time"])
info["meas_date"] = (info["meas_id"]["secs"], info["meas_id"]["usecs"])
info["experimenter"] = res4["nf_operator"]
info["subject_info"] = dict(his_id=res4["nf_subject_id"])
for filt in res4["filters"]:
if filt["type"] in _filt_map:
info[_filt_map[filt["type"]]] = filt["freq"]
info["dig"], info["hpi_results"] = _pick_isotrak_and_hpi_coils(res4, coils, trans)
if trans is not None:
if len(info["hpi_results"]) > 0:
info["hpi_results"][0]["coord_trans"] = trans["t_ctf_head_head"]
if trans["t_dev_head"] is not None:
info["dev_head_t"] = trans["t_dev_head"]
info["dev_ctf_t"] = combine_transforms(
trans["t_dev_head"],
invert_transform(trans["t_ctf_head_head"]),
FIFF.FIFFV_COORD_DEVICE,
FIFF.FIFFV_MNE_COORD_CTF_HEAD,
)
if trans["t_ctf_head_head"] is not None:
info["ctf_head_t"] = trans["t_ctf_head_head"]
info["chs"] = _convert_channel_info(res4, trans, eeg is None)
info["comps"] = _convert_comp_data(res4)
if eeg is None:
# Pick EEG locations from chan info if not read from a separate file
eeg = _pick_eeg_pos(info)
_add_eeg_pos(eeg, trans, info)
logger.info(" Measurement info composed.")
info._unlocked = False
info._update_redundant()
return info
def _read_bad_chans(directory, info):
"""Read Bad channel list and match to internal names."""
fname = op.join(directory, "BadChannels")
if not op.exists(fname):
return []
mapping = dict(zip(_clean_names(info["ch_names"]), info["ch_names"]))
with open(fname) as fid:
bad_chans = [mapping[f.strip()] for f in fid.readlines()]
return bad_chans
def _annotate_bad_segments(directory, start_time, meas_date):
fname = op.join(directory, "bad.segments")
if not op.exists(fname):
return None
# read in bad segment file
onsets = []
durations = []
desc = []
with open(fname) as fid:
for f in fid.readlines():
tmp = f.strip().split()
desc.append(f"bad_{tmp[0]}")
onsets.append(np.float64(tmp[1]) - start_time)
durations.append(np.float64(tmp[2]) - np.float64(tmp[1]))
# return None if there are no bad segments
if len(onsets) == 0:
return None
return Annotations(onsets, durations, desc, meas_date)