# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import numpy as np
import pytest
from numpy.fft import rfft, rfftfreq
from mne import create_info
from mne._fiff.pick import _pick_data_channels
from mne.datasets import testing
from mne.io import RawArray, read_raw_fif
from mne.preprocessing import oversampled_temporal_projection
from mne.utils import catch_logging
data_path = testing.data_path(download=False)
erm_fname = data_path / "SSS" / "test_move_anon_erm_raw.fif"
triux_fname = data_path / "SSS" / "TRIUX" / "triux_bmlhus_erm_raw.fif"
skip_fname = data_path / "misc" / "intervalrecording_raw.fif"
def test_otp_array():
"""Test the OTP algorithm on artificial data."""
n_channels, n_time, sfreq = 10, 2000, 1000.0
signal_f = 2.0
rng = np.random.RandomState(0)
data = rng.randn(n_channels, n_time)
raw = RawArray(data, create_info(n_channels, sfreq, "eeg"))
raw.info["bads"] = [raw.ch_names[0]]
signal = np.sin(2 * np.pi * signal_f * raw.times)
raw._data += signal
# Check SNR
def snr(data):
"""Check SNR according to the simulation model."""
data_fft = rfft(data)
freqs = rfftfreq(n_time, 1.0 / 1000.0)
sidx = np.where(freqs == signal_f)[0][0]
oidx = list(range(sidx)) + list(range(sidx + 1, len(freqs)))
snr = (data_fft[:, sidx] * data_fft[:, sidx].conj()).real.sum() / (
data_fft[:, oidx] * data_fft[:, oidx].conj()
).real.sum()
return snr
orig_snr = snr(raw[:][0])
with catch_logging() as log:
raw_otp = oversampled_temporal_projection(raw, duration=2.0, verbose=True)
otp_2_snr = snr(raw_otp[:][0])
assert otp_2_snr > 3 + orig_snr
assert "1 data chunk" in log.getvalue()
with catch_logging() as log:
raw_otp = oversampled_temporal_projection(raw, duration=1.2, verbose=True)
otp_1p5_snr = snr(raw_otp[:][0])
assert otp_1p5_snr > 3 + orig_snr
assert "2 data chunks" in log.getvalue()
with catch_logging() as log:
raw_otp = oversampled_temporal_projection(raw, duration=1.0, verbose=True)
otp_1_snr = snr(raw_otp[:][0])
assert otp_1_snr > 2 + orig_snr
assert "3 data chunks" in log.getvalue()
# Pure-noise test
raw._data -= signal
raw_otp = oversampled_temporal_projection(raw, 2.0)
reduction = np.linalg.norm(raw[:][0], axis=-1) / np.linalg.norm(
raw_otp[:][0], axis=-1
)
assert reduction.min() > 9.0
# Degenerate conditions
raw = RawArray(np.zeros((200, 1000)), create_info(200, sfreq, "eeg"))
with pytest.raises(ValueError): # too short
oversampled_temporal_projection(raw, duration=198.0 / sfreq)
with pytest.raises(ValueError): # duration invalid
oversampled_temporal_projection(
raw, duration=raw.times[-1] + 2.0 / raw.info["sfreq"], verbose=True
)
raw._data[0, 0] = np.inf
with pytest.raises(RuntimeError): # too short
oversampled_temporal_projection(raw, duration=1.0)
@testing.requires_testing_data
def test_otp_real():
"""Test OTP on real data."""
for fname in (erm_fname, triux_fname):
raw = read_raw_fif(fname, allow_maxshield="yes").crop(0, 1)
raw.load_data().pick(raw.ch_names[:10])
raw_otp = oversampled_temporal_projection(raw, 1.0)
picks = _pick_data_channels(raw.info)
reduction = np.linalg.norm(raw[picks][0], axis=-1) / np.linalg.norm(
raw_otp[picks][0], axis=-1
)
assert reduction.min() > 1
# Handling of acquisition skips
raw = read_raw_fif(skip_fname, preload=True)
raw.pick(raw.ch_names[:10])
raw_otp = oversampled_temporal_projection(raw, duration=1.0)