# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
# Copyright 2003-2010 Jürgen Kayser <rjk23@columbia.edu>
#
# The original CSD Toolbox can be found at
# http://psychophysiology.cpmc.columbia.edu/Software/CSDtoolbox/
#
# Relicensed under BSD-3-Clause and adapted with permission from authors of original GPL
# code.
import numpy as np
from scipy.optimize import minimize_scalar
from scipy.stats import gaussian_kde
from .._fiff.constants import FIFF
from .._fiff.pick import pick_types
from ..bem import fit_sphere_to_headshape
from ..channels.interpolation import _calc_g, _calc_h
from ..epochs import BaseEpochs, make_fixed_length_epochs
from ..evoked import Evoked
from ..io import BaseRaw
from ..utils import _check_preload, _ensure_int, _validate_type, logger, verbose
def _prepare_G(G, lambda2):
G.flat[:: len(G) + 1] += lambda2
# compute the CSD
Gi = np.linalg.inv(G)
TC = Gi.sum(0)
sgi = np.sum(TC) # compute sum total
return Gi, TC, sgi
def _compute_csd(G_precomputed, H, radius):
"""Compute the CSD."""
n_channels = H.shape[0]
data = np.eye(n_channels)
mu = data.mean(0)
Z = data - mu
Gi, TC, sgi = G_precomputed
Cp2 = np.dot(Gi, Z)
c02 = np.sum(Cp2, axis=0) / sgi
C2 = Cp2 - np.dot(TC[:, np.newaxis], c02[np.newaxis, :])
X = np.dot(C2.T, H).T / radius**2
return X
@verbose
def compute_current_source_density(
    inst,
    sphere="auto",
    lambda2=1e-5,
    stiffness=4,
    n_legendre_terms=50,
    copy=True,
    *,
    verbose=None,
):
    """Get the current source density (CSD) transformation.

    Transformation based on spherical spline surface Laplacian
    :footcite:`PerrinEtAl1987,PerrinEtAl1989,Cohen2014,KayserTenke2015`.

    This function can be used to re-reference the signal using a Laplacian
    (LAP) "reference-free" transformation.

    Parameters
    ----------
    inst : instance of Raw, Epochs or Evoked
        The data to be transformed.
    sphere : array-like, shape (4,) | str
        The sphere, head-model of the form (x, y, z, r) where x, y, z
        is the center of the sphere and r is the radius in meters.
        Can also be "auto" to use a digitization-based fit.
    lambda2 : float
        Regularization parameter, produces smoothness. Defaults to 1e-5.
    stiffness : float
        Stiffness of the spline.
    n_legendre_terms : int
        Number of Legendre terms to evaluate.
    copy : bool
        Whether to overwrite instance data or create a copy.
    %(verbose)s

    Returns
    -------
    inst_csd : instance of Raw, Epochs or Evoked
        The transformed data. Output type will match input type.

    Notes
    -----
    .. versionadded:: 0.20

    References
    ----------
    .. footbibliography::
    """
    _validate_type(inst, (BaseEpochs, BaseRaw, Evoked), "inst")
    _check_preload(inst, "Computing CSD")
    # Applying CSD twice would compound the Laplacian; refuse up front.
    if inst.info["custom_ref_applied"] == FIFF.FIFFV_MNE_CUSTOM_REF_CSD:
        raise ValueError("CSD already applied, should not be reapplied")
    _validate_type(copy, (bool), "copy")
    inst = inst.copy() if copy else inst
    picks = pick_types(inst.info, meg=False, eeg=True, exclude=[])
    # Bad channels would corrupt the spline fit across all channels.
    if any(ch in np.array(inst.ch_names)[picks] for ch in inst.info["bads"]):
        raise ValueError(
            "CSD cannot be computed with bad EEG channels. Either"
            " drop (inst.drop_channels(inst.info['bads']) "
            "or interpolate (`inst.interpolate_bads()`) "
            "bad EEG channels."
        )
    if len(picks) == 0:
        raise ValueError("No EEG channels found.")
    _validate_type(lambda2, "numeric", "lambda2")
    if not 0 <= lambda2 < 1:
        raise ValueError(f"lambda2 must be between 0 and 1, got {lambda2}")
    _validate_type(stiffness, "numeric", "stiffness")
    if stiffness < 0:
        raise ValueError(f"stiffness must be non-negative got {stiffness}")
    n_legendre_terms = _ensure_int(n_legendre_terms, "n_legendre_terms")
    if n_legendre_terms < 1:
        raise ValueError(
            f"n_legendre_terms must be greater than 0, got {n_legendre_terms}"
        )
    if isinstance(sphere, str) and sphere == "auto":
        # Fit a sphere to the digitized head points; express the origin
        # relative to the device frame.
        radius, origin_head, origin_device = fit_sphere_to_headshape(inst.info)
        x, y, z = origin_head - origin_device
        sphere = (x, y, z, radius)
    try:
        sphere = np.array(sphere, float)
        x, y, z, radius = sphere
    except Exception:
        raise ValueError(
            f'sphere must be "auto" or array-like with shape (4,), got {sphere}'
        )
    _validate_type(x, "numeric", "x")
    _validate_type(y, "numeric", "y")
    _validate_type(z, "numeric", "z")
    _validate_type(radius, "numeric", "radius")
    if radius <= 0:
        # BUG FIX: was a plain string, so the literal "{radius}" was printed.
        raise ValueError(f"sphere radius must be greater than 0, got {radius}")

    pos = np.array([inst.info["chs"][pick]["loc"][:3] for pick in picks])
    if not np.isfinite(pos).all() or np.isclose(pos, 0.0).all(1).any():
        raise ValueError("Zero or infinite position found in chs")
    pos -= (x, y, z)
    # Project onto a unit sphere to compute the cosine similarity:
    pos /= np.linalg.norm(pos, axis=1, keepdims=True)
    cos_dist = np.clip(np.dot(pos, pos.T), -1, 1)
    # This is equivalent to doing one minus half the squared Euclidean:
    # from scipy.spatial.distance import squareform, pdist
    # cos_dist = 1 - squareform(pdist(pos, 'sqeuclidean')) / 2.
    del pos
    G = _calc_g(cos_dist, stiffness=stiffness, n_legendre_terms=n_legendre_terms)
    H = _calc_h(cos_dist, stiffness=stiffness, n_legendre_terms=n_legendre_terms)
    G_precomputed = _prepare_G(G, lambda2)
    trans_csd = _compute_csd(G_precomputed=G_precomputed, H=H, radius=radius)
    # Raw/Evoked store a 2D array; wrap it so the same loop handles Epochs' 3D.
    epochs = [inst._data] if not isinstance(inst, BaseEpochs) else inst._data
    for epo in epochs:
        epo[picks] = np.dot(trans_csd, epo[picks])
    with inst.info._unlock():
        inst.info["custom_ref_applied"] = FIFF.FIFFV_MNE_CUSTOM_REF_CSD
    # CSD channels have new coil type and units (V/m^2).
    for pick in picks:
        inst.info["chs"][pick].update(
            coil_type=FIFF.FIFFV_COIL_EEG_CSD, unit=FIFF.FIFF_UNIT_V_M2
        )
    # Remove rejection thresholds for EEG: thresholds in Volts no longer
    # apply to data in V/m^2.
    if isinstance(inst, BaseEpochs):
        if inst.reject and "eeg" in inst.reject:
            del inst.reject["eeg"]
        if inst.flat and "eeg" in inst.flat:
            del inst.flat["eeg"]
    return inst
@verbose
def compute_bridged_electrodes(
    inst,
    lm_cutoff=16,
    epoch_threshold=0.5,
    l_freq=0.5,
    h_freq=30,
    epoch_duration=2,
    bw_method=None,
    verbose=None,
):
    r"""Compute bridged EEG electrodes using the intrinsic Hjorth algorithm.

    First, an electrical distance matrix is computed by taking the pairwise
    variance between electrodes. Local minimums in this matrix below
    ``lm_cutoff`` are indicative of bridging between a pair of electrodes.

    Pairs of electrodes are marked as bridged as long as their electrical
    distance is below ``lm_cutoff`` on more than the ``epoch_threshold``
    proportion of epochs.

    Based on :footcite:`TenkeKayser2001,GreischarEtAl2004,DelormeMakeig2004`
    and the `EEGLAB implementation
    <https://psychophysiology.cpmc.columbia.edu/>`__.

    Parameters
    ----------
    inst : instance of Raw, Epochs or Evoked
        The data to compute electrode bridging on.
    lm_cutoff : float
        The distance in :math:`{\mu}V^2` cutoff below which to
        search for a local minimum (lm) indicative of bridging.
        EEGLAB defaults to 5 :math:`{\mu}V^2`. MNE defaults to
        16 :math:`{\mu}V^2` to be conservative based on the distributions in
        :footcite:t:`GreischarEtAl2004`.
    epoch_threshold : float
        The proportion of epochs with electrical distance less than
        ``lm_cutoff`` in order to consider the channel bridged.
        The default is 0.5.
    l_freq : float
        The low cutoff frequency to use. Default is 0.5 Hz.
    h_freq : float
        The high cutoff frequency to use. Default is 30 Hz.
    epoch_duration : float
        The time in seconds to divide the raw into fixed-length epochs
        to check for consistent bridging. Only used if ``inst`` is
        :class:`mne.io.BaseRaw`. The default is 2 seconds.
    bw_method : None
        ``bw_method`` to pass to :class:`scipy.stats.gaussian_kde`.
    %(verbose)s

    Returns
    -------
    bridged_idx : list of tuple
        The indices of channels marked as bridged with each bridged
        pair stored as a tuple.
    ed_matrix : ndarray of float, shape (n_epochs, n_channels, n_channels)
        The electrical distance matrix for each pair of EEG electrodes.

    Notes
    -----
    .. versionadded:: 1.1

    References
    ----------
    .. footbibliography::
    """
    _check_preload(inst, "Computing bridged electrodes")
    inst = inst.copy()  # don't modify original
    picks = pick_types(inst.info, eeg=True)
    if len(picks) == 0:
        raise RuntimeError("No EEG channels found, cannot compute electrode bridging")
    # first, filter
    inst.filter(l_freq=l_freq, h_freq=h_freq, picks=picks, verbose=False)
    # Raw data is chopped into fixed-length epochs so bridging can be
    # checked for consistency over time rather than on one long segment.
    if isinstance(inst, BaseRaw):
        inst = make_fixed_length_epochs(
            inst, duration=epoch_duration, preload=True, verbose=False
        )
    # standardize shape
    data = inst.get_data(picks=picks)
    if isinstance(inst, Evoked):
        data = data[np.newaxis, ...]  # expand evoked
    # next, compute electrical distance matrix, upper triangular
    n_epochs = data.shape[0]
    # NaN-fill so the unused lower triangle and diagonal are excluded when
    # flattening below (multiplying zeros by NaN yields an all-NaN array).
    ed_matrix = np.zeros((n_epochs, picks.size, picks.size)) * np.nan
    for i in range(picks.size):
        for j in range(i + 1, picks.size):
            # Electrical distance = variance of the difference signal.
            ed_matrix[:, i, j] = np.var(data[:, i] - data[:, j], axis=1)
    # scale, fill in other half, diagonal
    ed_matrix *= 1e12  # scale to muV**2
    # initialize bridged indices
    bridged_idx = list()
    # if not enough values below local minimum cutoff, return no bridges
    ed_flat = ed_matrix[~np.isnan(ed_matrix)]  # upper-triangle entries only
    if ed_flat[ed_flat < lm_cutoff].size / n_epochs < epoch_threshold:
        return bridged_idx, ed_matrix
    # kernel density estimation
    # Estimate the distribution of small electrical distances; its local
    # minimum separates "bridged" from "normal" pairs.
    kde = gaussian_kde(ed_flat[ed_flat < lm_cutoff], bw_method=bw_method)
    with np.errstate(invalid="ignore"):
        # Restrict the search to (0, lm_cutoff) by returning inf outside it.
        local_minimum = float(
            minimize_scalar(
                lambda x: kde(x) if x < lm_cutoff and x > 0 else np.inf
            ).x.item()
        )
    logger.info(f"Local minimum {local_minimum} found")
    # find electrodes that are below the cutoff local minimum on
    # `epochs_threshold` proportion of epochs
    for i in range(picks.size):
        for j in range(i + 1, picks.size):
            bridged_count = np.sum(ed_matrix[:, i, j] < local_minimum)
            if bridged_count / n_epochs > epoch_threshold:
                logger.info(
                    "Bridge detected between "
                    f"{inst.ch_names[picks[i]]} and "
                    f"{inst.ch_names[picks[j]]}"
                )
                bridged_idx.append((picks[i], picks[j]))
    return bridged_idx, ed_matrix