# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
# Parts of this code were copied from NiTime http://nipy.sourceforge.net/nitime
import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.integrate import trapezoid
from scipy.signal import get_window
from scipy.signal.windows import dpss as sp_dpss
from ..parallel import parallel_func
from ..utils import _check_option, logger, verbose, warn
def dpss_windows(N, half_nbw, Kmax, *, sym=True, norm=None, low_bias=True):
"""Compute Discrete Prolate Spheroidal Sequences.
Will give of orders [0,Kmax-1] for a given frequency-spacing multiple
NW and sequence length N.
.. note:: Copied from NiTime.
Parameters
----------
N : int
Sequence length.
half_nbw : float
Standardized half bandwidth corresponding to 2 * half_bw = BW*f0
= BW*N/dt but with dt taken as 1.
Kmax : int
Number of DPSS windows to return is Kmax (orders 0 through Kmax-1).
sym : bool
Whether to generate a symmetric window (``True``, for filter design) or
a periodic window (``False``, for spectral analysis). Default is
``True``.
.. versionadded:: 1.3
norm : 2 | ``'approximate'`` | ``'subsample'`` | None
Window normalization method. If ``'approximate'`` or ``'subsample'``,
windows are normalized by the maximum, and a correction scale-factor
for even-length windows is applied either using
``N**2/(N**2+half_nbw)`` ("approximate") or a FFT-based subsample shift
("subsample"). ``2`` uses the L2 norm. ``None`` (the default) uses
``"approximate"`` when ``Kmax=None`` and ``2`` otherwise.
.. versionadded:: 1.3
low_bias : bool
Keep only tapers with eigenvalues > 0.9.
Returns
-------
v, e : tuple,
The v array contains DPSS windows shaped (Kmax, N).
e are the eigenvalues.
Notes
-----
Tridiagonal form of DPSS calculation from :footcite:`Slepian1978`.
References
----------
.. footbibliography::
"""
# TODO VERSION can be removed with SciPy 1.16 is min,
# workaround for https://github.com/scipy/scipy/pull/22344
if N <= 1:
dpss, eigvals = np.ones((1, 1)), np.ones(1)
else:
dpss, eigvals = sp_dpss(
N, half_nbw, Kmax, sym=sym, norm=norm, return_ratios=True
)
if low_bias:
idx = eigvals > 0.9
if not idx.any():
warn("Could not properly use low_bias, keeping lowest-bias taper")
idx = [np.argmax(eigvals)]
dpss, eigvals = dpss[idx], eigvals[idx]
assert len(dpss) > 0 # should never happen
assert dpss.shape[1] == N # old nitime bug
return dpss, eigvals
def _psd_from_mt_adaptive(x_mt, eigvals, freq_mask, max_iter=250, return_weights=False):
r"""Use iterative procedure to compute the PSD from tapered spectra.
.. note:: Modified from NiTime.
Parameters
----------
x_mt : array, shape=(n_signals, n_tapers, n_freqs)
The DFTs of the tapered sequences (only positive frequencies)
eigvals : array, length n_tapers
The eigenvalues of the DPSS tapers
freq_mask : array
Frequency indices to keep
max_iter : int
Maximum number of iterations for weight computation.
return_weights : bool
Also return the weights
Returns
-------
psd : array, shape=(n_signals, np.sum(freq_mask))
The computed PSDs
weights : array shape=(n_signals, n_tapers, np.sum(freq_mask))
The weights used to combine the tapered spectra
Notes
-----
The weights to use for making the multitaper estimate, such that
:math:`S_{mt} = \sum_{k} |w_k|^2S_k^{mt} / \sum_{k} |w_k|^2`
"""
n_signals, n_tapers, n_freqs = x_mt.shape
if len(eigvals) != n_tapers:
raise ValueError("Need one eigenvalue for each taper")
if n_tapers < 3:
raise ValueError("Not enough tapers to compute adaptive weights.")
rt_eig = np.sqrt(eigvals)
# estimate the variance from an estimate with fixed weights
psd_est = _psd_from_mt(x_mt, rt_eig[np.newaxis, :, np.newaxis])
x_var = trapezoid(psd_est, dx=np.pi / n_freqs) / (2 * np.pi)
del psd_est
# allocate space for output
psd = np.empty((n_signals, np.sum(freq_mask)))
# only keep the frequencies of interest
x_mt = x_mt[:, :, freq_mask]
if return_weights:
weights = np.empty((n_signals, n_tapers, psd.shape[1]))
for i, (xk, var) in enumerate(zip(x_mt, x_var)):
# combine the SDFs in the traditional way in order to estimate
# the variance of the timeseries
# The process is to iteratively switch solving for the following
# two expressions:
# (1) Adaptive Multitaper SDF:
# S^{mt}(f) = [ sum |d_k(f)|^2 S_k(f) ]/ sum |d_k(f)|^2
#
# (2) Weights
# d_k(f) = [sqrt(lam_k) S^{mt}(f)] / [lam_k S^{mt}(f) + E{B_k(f)}]
#
# Where lam_k are the eigenvalues corresponding to the DPSS tapers,
# and the expected value of the broadband bias function
# E{B_k(f)} is replaced by its full-band integration
# (1/2pi) int_{-pi}^{pi} E{B_k(f)} = sig^2(1-lam_k)
# start with an estimate from incomplete data--the first 2 tapers
psd_iter = _psd_from_mt(xk[:2, :], rt_eig[:2, np.newaxis])
err = np.zeros_like(xk)
for n in range(max_iter):
d_k = psd_iter / (
eigvals[:, np.newaxis] * psd_iter + (1 - eigvals[:, np.newaxis]) * var
)
d_k *= rt_eig[:, np.newaxis]
# Test for convergence -- this is overly conservative, since
# iteration only stops when all frequencies have converged.
# A better approach is to iterate separately for each freq, but
# that is a nonvectorized algorithm.
# Take the RMS difference in weights from the previous iterate
# across frequencies. If the maximum RMS error across freqs is
# less than 1e-10, then we're converged
err -= d_k
if np.max(np.mean(err**2, axis=0)) < 1e-10:
break
# update the iterative estimate with this d_k
psd_iter = _psd_from_mt(xk, d_k)
err = d_k
if n == max_iter - 1:
warn("Iterative multi-taper PSD computation did not converge.")
psd[i, :] = psd_iter
if return_weights:
weights[i, :, :] = d_k
if return_weights:
return psd, weights
else:
return psd
def _psd_from_mt(x_mt, weights):
"""Compute PSD from tapered spectra.
Parameters
----------
x_mt : array, shape=(..., n_tapers, n_freqs)
Tapered spectra
weights : array, shape=(n_tapers,)
Weights used to combine the tapered spectra
Returns
-------
psd : array, shape=(..., n_freqs)
The computed PSD
"""
psd = weights * x_mt
psd *= psd.conj()
psd = psd.real.sum(axis=-2)
psd *= 2 / (weights * weights.conj()).real.sum(axis=-2)
return psd
def _csd_from_mt(x_mt, y_mt, weights_x, weights_y):
"""Compute CSD from tapered spectra.
Parameters
----------
x_mt : array, shape=(..., n_tapers, n_freqs)
Tapered spectra for x
y_mt : array, shape=(..., n_tapers, n_freqs)
Tapered spectra for y
weights_x : array, shape=(n_tapers,)
Weights used to combine the tapered spectra of x_mt
weights_y : array, shape=(n_tapers,)
Weights used to combine the tapered spectra of y_mt
Returns
-------
csd: array
The computed CSD
"""
csd = np.sum(weights_x * x_mt * (weights_y * y_mt).conj(), axis=-2)
denom = np.sqrt((weights_x * weights_x.conj()).real.sum(axis=-2)) * np.sqrt(
(weights_y * weights_y.conj()).real.sum(axis=-2)
)
csd *= 2 / denom
return csd
def _mt_spectra(x, dpss, sfreq, n_fft=None, remove_dc=True):
"""Compute tapered spectra.
Parameters
----------
x : array, shape=(..., n_times)
Input signal
dpss : array, shape=(n_tapers, n_times)
The tapers
sfreq : float
The sampling frequency
n_fft : int | None
Length of the FFT. If None, the number of samples in the input signal
will be used.
%(remove_dc)s
Returns
-------
x_mt : array, shape=(..., n_tapers, n_freqs)
The tapered spectra
freqs : array, shape=(n_freqs,)
The frequency points in Hz of the spectra
"""
if n_fft is None:
n_fft = x.shape[-1]
# remove mean (do not use in-place subtraction as it may modify input x)
if remove_dc:
x = x - np.mean(x, axis=-1, keepdims=True)
# only keep positive frequencies
freqs = rfftfreq(n_fft, 1.0 / sfreq)
# The following is equivalent to this, but uses less memory:
# x_mt = fftpack.fft(x[:, np.newaxis, :] * dpss, n=n_fft)
n_tapers = dpss.shape[0] if dpss.ndim > 1 else 1
x_mt = np.zeros(x.shape[:-1] + (n_tapers, len(freqs)), dtype=np.complex128)
for idx, sig in enumerate(x):
x_mt[idx] = rfft(sig[..., np.newaxis, :] * dpss, n=n_fft)
# Adjust DC and maybe Nyquist, depending on one-sided transform
x_mt[..., 0] /= np.sqrt(2.0)
if n_fft % 2 == 0:
x_mt[..., -1] /= np.sqrt(2.0)
return x_mt, freqs
@verbose
def _compute_mt_params(n_times, sfreq, bandwidth, low_bias, adaptive, verbose=None):
"""Triage windowing and multitaper parameters."""
# Compute standardized half-bandwidth
if isinstance(bandwidth, str):
logger.info(f' Using standard spectrum estimation with "{bandwidth}" window')
window_fun = get_window(bandwidth, n_times)[np.newaxis]
return window_fun, np.ones(1), False
if bandwidth is not None:
half_nbw = float(bandwidth) * n_times / (2.0 * sfreq)
else:
half_nbw = 4.0
if half_nbw < 0.5:
raise ValueError(
f"bandwidth value {bandwidth} yields a normalized half-bandwidth of "
f"{half_nbw} < 0.5, use a value of at least {sfreq / n_times}"
)
# Compute DPSS windows
n_tapers_max = int(2 * half_nbw)
window_fun, eigvals = dpss_windows(
n_times, half_nbw, n_tapers_max, sym=False, low_bias=low_bias
)
logger.info(
f" Using multitaper spectrum estimation with {len(eigvals)} DPSS windows"
)
if adaptive and len(eigvals) < 3:
warn(
"Not adaptively combining the spectral estimators due to a "
f"low number of tapers ({len(eigvals)} < 3)."
)
adaptive = False
return window_fun, eigvals, adaptive
@verbose
def psd_array_multitaper(
x,
sfreq,
fmin=0.0,
fmax=np.inf,
bandwidth=None,
adaptive=False,
low_bias=True,
normalization="length",
remove_dc=True,
output="power",
n_jobs=None,
*,
max_iter=150,
verbose=None,
):
r"""Compute power spectral density (PSD) using a multi-taper method.
The power spectral density is computed with DPSS
tapers :footcite:p:`Slepian1978`.
Parameters
----------
x : array, shape=(..., n_times)
The data to compute PSD from.
sfreq : float
The sampling frequency.
%(fmin_fmax_psd)s
bandwidth : float
Frequency bandwidth of the multi-taper window function in Hz. For a
given frequency, frequencies at ``± bandwidth / 2`` are smoothed
together. The default value is a bandwidth of
``8 * (sfreq / n_times)``.
adaptive : bool
Use adaptive weights to combine the tapered spectra into PSD
(slow, use n_jobs >> 1 to speed up computation).
low_bias : bool
Only use tapers with more than 90%% spectral concentration within
bandwidth.
%(normalization)s
%(remove_dc)s
output : str
The format of the returned ``psds`` array, ``'complex'`` or
``'power'``:
* ``'power'`` : the power spectral density is returned.
* ``'complex'`` : the complex fourier coefficients are returned per
taper.
%(n_jobs)s
%(max_iter_multitaper)s
%(verbose)s
Returns
-------
psds : ndarray, shape (..., n_freqs) or (..., n_tapers, n_freqs)
The power spectral densities. All dimensions up to the last (or the
last two if ``output='complex'``) will be the same as input.
freqs : array
The frequency points in Hz of the PSD.
weights : ndarray
The weights used for averaging across tapers. Only returned if
``output='complex'``.
See Also
--------
csd_multitaper
mne.io.Raw.compute_psd
mne.Epochs.compute_psd
mne.Evoked.compute_psd
Notes
-----
.. versionadded:: 0.14.0
References
----------
.. footbibliography::
"""
_check_option("normalization", normalization, ["length", "full"])
# Reshape data so its 2-D for parallelization
ndim_in = x.ndim
x = np.atleast_2d(x)
n_times = x.shape[-1]
dshape = x.shape[:-1]
x = x.reshape(-1, n_times)
dpss, eigvals, adaptive = _compute_mt_params(
n_times, sfreq, bandwidth, low_bias, adaptive
)
n_tapers = len(dpss)
weights = np.sqrt(eigvals)[np.newaxis, :, np.newaxis]
# decide which frequencies to keep
freqs = rfftfreq(n_times, 1.0 / sfreq)
freq_mask = (freqs >= fmin) & (freqs <= fmax)
freqs = freqs[freq_mask]
n_freqs = len(freqs)
if output == "complex":
psd = np.zeros((x.shape[0], n_tapers, n_freqs), dtype="complex")
else:
psd = np.zeros((x.shape[0], n_freqs))
# Let's go in up to 50 MB chunks of signals to save memory
n_chunk = max(50000000 // (len(freq_mask) * len(eigvals) * 16), 1)
offsets = np.concatenate((np.arange(0, x.shape[0], n_chunk), [x.shape[0]]))
for start, stop in zip(offsets[:-1], offsets[1:]):
x_mt = _mt_spectra(x[start:stop], dpss, sfreq, remove_dc=remove_dc)[0]
if output == "power":
if not adaptive:
psd[start:stop] = _psd_from_mt(x_mt[:, :, freq_mask], weights)
else:
parallel, my_psd_from_mt_adaptive, n_jobs = parallel_func(
_psd_from_mt_adaptive, n_jobs
)
n_splits = min(stop - start, n_jobs)
out = parallel(
my_psd_from_mt_adaptive(x, eigvals, freq_mask, max_iter)
for x in np.array_split(x_mt, n_splits)
)
psd[start:stop] = np.concatenate(out)
else:
psd[start:stop] = x_mt[:, :, freq_mask]
if normalization == "full":
psd /= sfreq
# Combining/reshaping to original data shape
last_dims = (n_freqs,) if output == "power" else (n_tapers, n_freqs)
psd.shape = dshape + last_dims
if ndim_in == 1:
psd = psd[0]
if output == "complex":
return psd, freqs, weights
else:
return psd, freqs
@verbose
def tfr_array_multitaper(
data,
sfreq,
freqs,
n_cycles=7.0,
zero_mean=True,
time_bandwidth=4.0,
use_fft=True,
decim=1,
output="complex",
n_jobs=None,
*,
return_weights=False,
verbose=None,
):
"""Compute Time-Frequency Representation (TFR) using DPSS tapers.
Same computation as `~mne.time_frequency.tfr_multitaper`, but operates on
:class:`NumPy arrays <numpy.ndarray>` instead of `~mne.Epochs` or
`~mne.Evoked` objects.
Parameters
----------
data : array of shape (n_epochs, n_channels, n_times)
The epochs.
sfreq : float
Sampling frequency of the data in Hz.
%(freqs_tfr_array)s
%(n_cycles_tfr)s
zero_mean : bool
If True, make sure the wavelets have a mean of zero. Defaults to True.
%(time_bandwidth_tfr)s
use_fft : bool
Use the FFT for convolutions or not. Defaults to True.
%(decim_tfr)s
output : str, default 'complex'
* ``'complex'`` : single trial per taper complex values.
* ``'power'`` : single trial power.
* ``'phase'`` : single trial per taper phase.
* ``'avg_power'`` : average of single trial power.
* ``'itc'`` : inter-trial coherence.
* ``'avg_power_itc'`` : average of single trial power and inter-trial
coherence across trials.
%(n_jobs)s
The parallelization is implemented across channels.
return_weights : bool, default False
If True, return the taper weights. Only applies if ``output='complex'`` or
``'phase'``.
.. versionadded:: 1.10.0
%(verbose)s
Returns
-------
out : array
Time frequency transform of ``data``.
- if ``output in ('complex',' 'phase')``, array of shape
``(n_epochs, n_chans, n_tapers, n_freqs, n_times)``
- if ``output`` is ``'power'``, array of shape ``(n_epochs, n_chans,
n_freqs, n_times)``
- else, array of shape ``(n_chans, n_freqs, n_times)``
If ``output`` is ``'avg_power_itc'``, the real values in ``out``
contain the average power and the imaginary values contain the
inter-trial coherence: :math:`out = power_{avg} + i * ITC`.
weights : array of shape (n_tapers, n_freqs)
The taper weights. Only returned if ``output='complex'`` or ``'phase'`` and
``return_weights=True``.
See Also
--------
mne.time_frequency.tfr_multitaper
mne.time_frequency.tfr_morlet
mne.time_frequency.tfr_array_morlet
mne.time_frequency.tfr_stockwell
mne.time_frequency.tfr_array_stockwell
Notes
-----
%(temporal_window_tfr_intro)s
%(temporal_window_tfr_multitaper_notes)s
%(time_bandwidth_tfr_notes)s
.. versionadded:: 0.14.0
"""
from .tfr import _compute_tfr
return _compute_tfr(
data,
freqs,
sfreq=sfreq,
method="multitaper",
n_cycles=n_cycles,
zero_mean=zero_mean,
time_bandwidth=time_bandwidth,
use_fft=use_fft,
decim=decim,
output=output,
return_weights=return_weights,
n_jobs=n_jobs,
verbose=verbose,
)