[7f9fb8]: / mne / time_frequency / csd.py

Download this file

1607 lines (1386 with data), 50.1 kB

   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import copy as cp
import numbers
import numpy as np
from scipy.fft import rfftfreq
from .._fiff.pick import _picks_to_idx, pick_channels
from ..parallel import parallel_func
from ..time_frequency.multitaper import (
_compute_mt_params,
_csd_from_mt,
_mt_spectra,
_psd_from_mt_adaptive,
)
from ..utils import (
ProgressBar,
_check_fname,
_import_h5io_funcs,
_validate_type,
copy_function_doc_to_method_doc,
logger,
verbose,
warn,
)
from ..viz.misc import plot_csd
from .tfr import EpochsTFR, _cwt_array, _get_nfft, morlet
@verbose
def pick_channels_csd(
csd, include=(), exclude=(), ordered=True, copy=True, *, verbose=None
):
"""Pick channels from cross-spectral density matrix.
Parameters
----------
csd : instance of CrossSpectralDensity
The CSD object to select the channels from.
include : list of str
List of channels to include (if empty, include all available).
exclude : list of str
Channels to exclude (if empty, do not exclude any).
%(ordered)s
copy : bool
If True (the default), return a copy of the CSD matrix with the
modified channels. If False, channels are modified in-place.
.. versionadded:: 0.20.0
%(verbose)s
Returns
-------
res : instance of CrossSpectralDensity
Cross-spectral density restricted to selected channels.
"""
if copy:
csd = csd.copy()
sel = pick_channels(csd.ch_names, include=include, exclude=exclude, ordered=ordered)
data = []
for vec in csd._data.T:
mat = _vector_to_sym_mat(vec)
mat = mat[sel, :][:, sel]
data.append(_sym_mat_to_vector(mat))
ch_names = [csd.ch_names[i] for i in sel]
csd._data = np.array(data).T
csd.ch_names = ch_names
return csd
class CrossSpectralDensity:
"""Cross-spectral density.
Given a list of time series, the CSD matrix denotes for each pair of time
series, the cross-spectral density. This matrix is symmetric and internally
stored as a vector.
This object can store multiple CSD matrices: one for each frequency.
Use ``.get_data(freq)`` to obtain an CSD matrix as an ndarray.
Parameters
----------
data : ndarray, shape ((n_channels**2 + n_channels) // 2, n_frequencies)
For each frequency, the cross-spectral density matrix in vector format.
ch_names : list of str
List of string names for each channel.
frequencies : float | list of float | list of list of float
Frequency or frequencies for which the CSD matrix was calculated. When
averaging across frequencies (see the :func:`CrossSpectralDensity.mean`
function), this will be a list of lists that contains for each
frequency bin, the frequencies that were averaged. Frequencies should
always be sorted.
n_fft : int
The number of FFT points or samples that have been used in the
computation of this CSD.
tmin : float | None
Start of the time window for which CSD was calculated in seconds. Can
be ``None`` (the default) to indicate no timing information is
available.
tmax : float | None
End of the time window for which CSD was calculated in seconds. Can be
``None`` (the default) to indicate no timing information is available.
projs : list of Projection | None
List of projectors to apply to timeseries data when using this CSD
object to compute a DICS beamformer. Defaults to ``None``, which means
no projectors will be applied.
See Also
--------
csd_fourier
csd_multitaper
csd_morlet
csd_array_fourier
csd_array_multitaper
csd_array_morlet
"""
def __init__(
self, data, ch_names, frequencies, n_fft, tmin=None, tmax=None, projs=None
):
data = np.asarray(data)
if data.ndim == 1:
data = data[:, np.newaxis]
elif data.ndim > 2:
raise ValueError("`data` should be either a 1D or 2D array.")
self._data = data
if len(ch_names) != _n_dims_from_triu(len(data)):
raise ValueError(
"Number of ch_names does not match the number of "
"time series in the CSD matrix."
)
self.ch_names = list(ch_names)
self.tmin = tmin
self.tmax = tmax
if isinstance(frequencies, numbers.Number):
frequencies = [frequencies]
if len(frequencies) != data.shape[1]:
raise ValueError(
"Number of frequencies does not match the number of CSD matrices in "
f"the data array ({len(frequencies)} != {data.shape[1]})."
)
self.frequencies = frequencies
self.n_fft = n_fft
if projs is None:
self.projs = []
else:
self.projs = cp.deepcopy(projs)
@property
def n_channels(self):
"""Number of time series defined in this CSD object."""
return len(self.ch_names)
@property
def _is_sum(self):
"""Whether the CSD matrix represents a sum (or average) of freqs."""
# If the CSD is an average, the frequencies will be stored as a list
# of lists (or like-like objects) instead of plain numbers.
return not isinstance(self.frequencies[0], numbers.Number)
def __len__(self): # noqa: D105
"""Return number of frequencies.
Returns
-------
n_freqs : int
The number of frequencies.
"""
return len(self.frequencies)
def __repr__(self): # noqa: D105
# Make a pretty string representation of the frequencies
freq_strs = []
for f in self.frequencies:
if isinstance(f, numbers.Number):
freq_strs.append(str(f))
elif len(f) == 1:
freq_strs.append(str(f[0]))
else:
freq_strs.append(f"{np.min(f)}-{np.max(f)}")
freq_str = ", ".join(freq_strs) + " Hz."
if self.tmin is not None and self.tmax is not None:
time_str = f"{self.tmin} to {self.tmax} s"
else:
time_str = "unknown"
return (
"<CrossSpectralDensity | "
f"n_channels={self.n_channels}, time={time_str}, frequencies={freq_str}>"
)
def sum(self, fmin=None, fmax=None):
"""Calculate the sum CSD in the given frequency range(s).
If the exact given frequencies are not available, the nearest
frequencies will be chosen.
Parameters
----------
fmin : float | list of float | None
Lower bound of the frequency range in Hertz. Defaults to the lowest
frequency available. When a list of frequencies is given, these are
used as the lower bounds (inclusive) of frequency bins and the sum
is taken for each bin.
fmax : float | list of float | None
Upper bound of the frequency range in Hertz. Defaults to the
highest frequency available. When a list of frequencies is given,
these are used as the upper bounds (inclusive) of frequency bins
and the sum is taken for each bin.
Returns
-------
csd : instance of CrossSpectralDensity
The CSD matrix, summed across the given frequency range(s).
"""
if self._is_sum:
raise RuntimeError(
"This CSD matrix already represents a mean or sum across frequencies."
)
# Deal with the various ways in which fmin and fmax can be specified
if fmin is None and fmax is None:
fmin = [self.frequencies[0]]
fmax = [self.frequencies[-1]]
else:
if isinstance(fmin, numbers.Number):
fmin = [fmin]
if isinstance(fmax, numbers.Number):
fmax = [fmax]
if fmin is None:
fmin = [self.frequencies[0]] * len(fmax)
if fmax is None:
fmax = [self.frequencies[-1]] * len(fmin)
if any(fmin_ > fmax_ for fmin_, fmax_ in zip(fmin, fmax)):
raise ValueError(
"Some lower bounds are higher than the corresponding upper bounds."
)
# Find the index of the lower bound of each frequency bin
fmin_inds = [self._get_frequency_index(f) for f in fmin]
fmax_inds = [self._get_frequency_index(f) + 1 for f in fmax]
if len(fmin_inds) != len(fmax_inds):
raise ValueError("The length of fmin does not match the length of fmax.")
# Sum across each frequency bin
n_bins = len(fmin_inds)
new_data = np.zeros((self._data.shape[0], n_bins), dtype=self._data.dtype)
new_frequencies = []
for i, (min_ind, max_ind) in enumerate(zip(fmin_inds, fmax_inds)):
new_data[:, i] = self._data[:, min_ind:max_ind].sum(axis=1)
new_frequencies.append(self.frequencies[min_ind:max_ind])
csd_out = CrossSpectralDensity(
data=new_data,
ch_names=self.ch_names,
tmin=self.tmin,
tmax=self.tmax,
frequencies=new_frequencies,
n_fft=self.n_fft,
projs=self.projs,
)
return csd_out
def mean(self, fmin=None, fmax=None):
"""Calculate the mean CSD in the given frequency range(s).
Parameters
----------
fmin : float | list of float | None
Lower bound of the frequency range in Hertz. Defaults to the lowest
frequency available. When a list of frequencies is given, these are
used as the lower bounds (inclusive) of frequency bins and the mean
is taken for each bin.
fmax : float | list of float | None
Upper bound of the frequency range in Hertz. Defaults to the
highest frequency available. When a list of frequencies is given,
these are used as the upper bounds (inclusive) of frequency bins
and the mean is taken for each bin.
Returns
-------
csd : instance of CrossSpectralDensity
The CSD matrix, averaged across the given frequency range(s).
"""
csd = self.sum(fmin, fmax)
for i, f in enumerate(csd.frequencies):
csd._data[:, i] /= len(f)
return csd
def _get_frequency_index(self, freq):
"""Find the index of the given frequency in ``self.frequencies``.
If the exact given frequency is not available, the nearest frequencies
will be chosen, up to a difference of 1 Hertz.
Parameters
----------
freq : float
The frequency to find the index for
Returns
-------
index : int
The index of the frequency nearest to the requested frequency.
"""
if self._is_sum:
raise ValueError(
"This CSD object represents a mean across "
"frequencies. Cannot select a specific "
"frequency."
)
distance = np.abs(np.asarray(self.frequencies) - freq)
index = np.argmin(distance)
min_dist = distance[index]
if min_dist > 1:
raise IndexError(f"Frequency {freq:f} is not available.")
return index
def pick_frequency(self, freq=None, index=None):
"""Get a CrossSpectralDensity object with only the given frequency.
Parameters
----------
freq : float | None
Return the CSD matrix for a specific frequency. Only available
when no averaging across frequencies has been done.
index : int | None
Return the CSD matrix for the frequency or frequency-bin with the
given index.
Returns
-------
csd : instance of CrossSpectralDensity
A CSD object containing a single CSD matrix that corresponds to the
requested frequency or frequency-bin.
See Also
--------
get_data
"""
if freq is None and index is None:
raise ValueError(
'Use either the "freq" or "index" parameter to '
"select the desired frequency."
)
elif freq is not None:
if index is not None:
raise ValueError("Cannot specify both a frequency and index.")
index = self._get_frequency_index(freq)
return self[index]
def get_data(self, frequency=None, index=None, as_cov=False):
"""Get the CSD matrix for a given frequency as NumPy array.
If there is only one matrix defined in the CSD object, calling this
method without any parameters will return it. If multiple matrices are
defined, use either the ``frequency`` or ``index`` parameter to select
one.
Parameters
----------
frequency : float | None
Return the CSD matrix for a specific frequency. Only available when
no averaging across frequencies has been done.
index : int | None
Return the CSD matrix for the frequency or frequency-bin with the
given index.
as_cov : bool
Whether to return the data as a numpy array (`False`, the default),
or pack it in a :class:`mne.Covariance` object (`True`).
.. versionadded:: 0.20
Returns
-------
csd : ndarray, shape (n_channels, n_channels) | instance of Covariance
The CSD matrix corresponding to the requested frequency.
See Also
--------
pick_frequency
"""
if frequency is None and index is None:
if self._data.shape[1] > 1:
raise ValueError(
"Specify either the frequency or index of "
"the frequency bin for which to obtain the "
"CSD matrix."
)
index = 0
elif frequency is not None:
if index is not None:
raise ValueError("Cannot specify both a frequency and index.")
index = self._get_frequency_index(frequency)
data = _vector_to_sym_mat(self._data[:, index])
if as_cov:
# Pack the data into a Covariance object
from ..cov import Covariance # to avoid circular import
return Covariance(
data, self.ch_names, bads=[], projs=self.projs, nfree=self.n_fft
)
else:
return data
@copy_function_doc_to_method_doc(plot_csd)
def plot(
self,
info=None,
mode="csd",
colorbar=True,
cmap="viridis",
n_cols=None,
show=True,
):
return plot_csd(
self,
info=info,
mode=mode,
colorbar=colorbar,
cmap=cmap,
n_cols=n_cols,
show=show,
)
def __setstate__(self, state): # noqa: D105
# Avoid circular import
from ..proj import Projection
self._data = state["data"]
self.tmin = state["tmin"]
self.tmax = state["tmax"]
self.ch_names = state["ch_names"]
self.frequencies = state["frequencies"]
self.n_fft = state["n_fft"]
self.projs = [Projection(**proj) for proj in state["projs"]]
def __getstate__(self): # noqa: D105
return dict(
data=self._data,
tmin=self.tmin,
tmax=self.tmax,
ch_names=self.ch_names,
frequencies=self.frequencies,
n_fft=self.n_fft,
projs=self.projs,
)
def __getitem__(self, sel): # noqa: D105
"""Subselect frequencies.
Parameters
----------
sel : ndarray
Array of frequency indices to subselect.
Returns
-------
csd : instance of CrossSpectralDensity
A new CSD instance with the subset of frequencies.
"""
return CrossSpectralDensity(
data=self._data[:, sel],
ch_names=self.ch_names,
tmin=self.tmin,
tmax=self.tmax,
frequencies=np.atleast_1d(self.frequencies)[sel].tolist(),
n_fft=self.n_fft,
projs=self.projs,
)
@verbose
def save(self, fname, *, overwrite=False, verbose=None):
"""Save the CSD to an HDF5 file.
Parameters
----------
fname : path-like
The name of the file to save the CSD to. The extension ``'.h5'``
will be appended if the given filename doesn't have it already.
%(overwrite)s
.. versionadded:: 1.0
%(verbose)s
.. versionadded:: 1.0
See Also
--------
read_csd : For reading CSD objects from a file.
"""
_, write_hdf5 = _import_h5io_funcs()
fname = _check_fname(fname, overwrite=True)
if fname.suffix != ".h5":
fname = fname.with_name(f"{fname.name}.h5")
fname = _check_fname(fname, overwrite=overwrite)
write_hdf5(fname, self.__getstate__(), overwrite=True, title="conpy")
def copy(self):
"""Return copy of the CrossSpectralDensity object.
Returns
-------
copy : instance of CrossSpectralDensity
A copy of the object.
"""
return cp.deepcopy(self)
def pick_channels(self, ch_names, ordered=False):
"""Pick channels from this cross-spectral density matrix.
Parameters
----------
ch_names : list of str
List of channels to keep. All other channels are dropped.
ordered : bool
If True (default False), ensure that the order of the channels
matches the order of ``ch_names``.
Returns
-------
csd : instance of CrossSpectralDensity.
The modified cross-spectral density object.
Notes
-----
Operates in-place.
.. versionadded:: 0.20.0
"""
return pick_channels_csd(
self, include=ch_names, exclude=[], ordered=ordered, copy=False
)
def _n_dims_from_triu(n):
"""Compute matrix dims from number of elements in the upper triangle.
Parameters
----------
n : int
Number of elements in the upper triangle of the symmetric matrix.
Returns
-------
dim : int
The dimensions of the symmetric matrix.
"""
return int(np.ceil(np.sqrt(n * 2))) - 1
def _vector_to_sym_mat(vec):
"""Convert vector to a symmetric matrix.
The upper triangle of the matrix (including the diagonal) will be filled
with the values of the vector.
Parameters
----------
vec : list or 1d-array
The vector to convert to a symmetric matrix.
Returns
-------
mat : 2d-array
The symmetric matrix.
See Also
--------
_sym_mat_to_vector
"""
dim = _n_dims_from_triu(len(vec))
mat = np.zeros((dim, dim) + vec.shape[1:], dtype=vec.dtype)
# Fill the upper triangle of the matrix
mat[np.triu_indices(dim)] = vec
# Fill out the lower tri (make conjugate to ensure matrix is hermitian)
mat = mat + np.rollaxis(mat, 1).conj()
# We counted the diagonal twice
if np.issubdtype(mat.dtype, np.integer):
mat[np.diag_indices(dim)] //= 2
else:
mat[np.diag_indices(dim)] /= 2
return mat
def _sym_mat_to_vector(mat):
"""Convert a symmetric matrix to a vector.
The upper triangle of the matrix (including the diagonal) will be used
as the values of the vector.
Parameters
----------
mat : 2d-array
The symmetric matrix to convert to a vector
Returns
-------
vec : 1d-array
A vector consisting of the values of the upper triangle of the matrix.
See Also
--------
_vector_to_sym_mat
"""
return mat[np.triu_indices_from(mat)]
def read_csd(fname):
"""Read a CrossSpectralDensity object from an HDF5 file.
Parameters
----------
fname : path-like
The name of the file to read the CSD from. The extension ``'.h5'`` will
be appended if the given filename doesn't have it already.
Returns
-------
csd : instance of CrossSpectralDensity
The CSD that was stored in the file.
See Also
--------
CrossSpectralDensity.save : For saving CSD objects.
"""
read_hdf5, _ = _import_h5io_funcs()
if not fname.endswith(".h5"):
fname += ".h5"
csd_dict = read_hdf5(fname, title="conpy")
if csd_dict["projs"] is not None:
# Avoid circular import
from ..proj import Projection
csd_dict["projs"] = [Projection(**proj) for proj in csd_dict["projs"]]
return CrossSpectralDensity(**csd_dict)
@verbose
def csd_fourier(
epochs,
fmin=0,
fmax=np.inf,
tmin=None,
tmax=None,
picks=None,
n_fft=None,
projs=None,
n_jobs=None,
*,
verbose=None,
):
"""Estimate cross-spectral density from an array using short-time fourier.
Parameters
----------
epochs : instance of Epochs
The epochs to compute the CSD for.
fmin : float
Minimum frequency of interest, in Hertz.
fmax : float | np.inf
Maximum frequency of interest, in Hertz.
tmin : float | None
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
%(picks_good_data_noref)s
n_fft : int | None
Length of the FFT. If ``None``, the exact number of samples between
``tmin`` and ``tmax`` will be used.
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means the projectors defined in the Epochs object will be copied.
%(n_jobs)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
See Also
--------
csd_array_fourier
csd_array_morlet
csd_array_multitaper
csd_morlet
csd_multitaper
"""
epochs, projs = _prepare_csd(epochs, tmin, tmax, picks, projs)
return csd_array_fourier(
epochs.get_data(copy=False),
sfreq=epochs.info["sfreq"],
t0=epochs.tmin,
fmin=fmin,
fmax=fmax,
tmin=tmin,
tmax=tmax,
ch_names=epochs.ch_names,
n_fft=n_fft,
projs=projs,
n_jobs=n_jobs,
verbose=verbose,
)
@verbose
def csd_array_fourier(
X,
sfreq,
t0=0,
fmin=0,
fmax=np.inf,
tmin=None,
tmax=None,
ch_names=None,
n_fft=None,
projs=None,
n_jobs=None,
*,
verbose=None,
):
"""Estimate cross-spectral density from an array using short-time fourier.
Parameters
----------
X : array-like, shape (n_epochs, n_channels, n_times)
The time series data consisting of n_epochs separate observations
of signals with n_channels time-series of length n_times.
sfreq : float
Sampling frequency of observations.
t0 : float
Time of the first sample relative to the onset of the epoch, in
seconds. Defaults to 0.
fmin : float
Minimum frequency of interest, in Hertz.
fmax : float | np.inf
Maximum frequency of interest, in Hertz.
tmin : float | None
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
ch_names : list of str | None
A name for each time series. If ``None`` (the default), the series will
be named 'SERIES###'.
n_fft : int | None
Length of the FFT. If ``None``, the exact number of samples between
``tmin`` and ``tmax`` will be used.
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means no projectors are stored.
%(n_jobs)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
See Also
--------
csd_array_morlet
csd_array_multitaper
csd_fourier
csd_morlet
csd_multitaper
"""
X, times, tmin, tmax, fmin, fmax = _prepare_csd_array(
X, sfreq, t0, tmin, tmax, fmin, fmax
)
# Slice X to the requested time window
tstart = None if tmin is None else np.searchsorted(times, tmin - 1e-10)
tstop = None if tmax is None else np.searchsorted(times, tmax + 1e-10)
X = X[:, :, tstart:tstop]
times = times[tstart:tstop]
n_times = len(times)
n_fft = n_times if n_fft is None else n_fft
# Preparing frequencies of interest
orig_frequencies = rfftfreq(n_fft, 1.0 / sfreq)
freq_mask = (
(orig_frequencies > 0) & (orig_frequencies >= fmin) & (orig_frequencies <= fmax)
)
frequencies = orig_frequencies[freq_mask]
if len(frequencies) == 0:
raise ValueError(
"No discrete fourier transform results within "
"the given frequency window. Please widen either "
"the frequency window or the time window"
)
# Compute the CSD
return _execute_csd_function(
X,
times,
frequencies,
_csd_fourier,
params=[sfreq, n_times, freq_mask, n_fft],
n_fft=n_fft,
ch_names=ch_names,
projs=projs,
n_jobs=n_jobs,
verbose=verbose,
)
@verbose
def csd_multitaper(
epochs,
fmin=0,
fmax=np.inf,
tmin=None,
tmax=None,
picks=None,
n_fft=None,
bandwidth=None,
adaptive=False,
low_bias=True,
projs=None,
n_jobs=None,
*,
verbose=None,
):
"""Estimate cross-spectral density from epochs using a multitaper method.
Parameters
----------
epochs : instance of Epochs
The epochs to compute the CSD for.
fmin : float | None
Minimum frequency of interest, in Hertz.
fmax : float | np.inf
Maximum frequency of interest, in Hertz.
tmin : float
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
%(picks_good_data_noref)s
n_fft : int | None
Length of the FFT. If ``None``, the exact number of samples between
``tmin`` and ``tmax`` will be used.
bandwidth : float | None
The bandwidth of the multitaper windowing function in Hz.
adaptive : bool
Use adaptive weights to combine the tapered spectra into PSD.
low_bias : bool
Only use tapers with more than 90%% spectral concentration within
bandwidth.
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means the projectors defined in the Epochs object will by copied.
%(n_jobs)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
See Also
--------
csd_array_fourier
csd_array_morlet
csd_array_multitaper
csd_fourier
csd_morlet
"""
epochs, projs = _prepare_csd(epochs, tmin, tmax, picks, projs)
return csd_array_multitaper(
epochs.get_data(copy=False),
sfreq=epochs.info["sfreq"],
t0=epochs.tmin,
fmin=fmin,
fmax=fmax,
tmin=tmin,
tmax=tmax,
ch_names=epochs.ch_names,
n_fft=n_fft,
bandwidth=bandwidth,
adaptive=adaptive,
low_bias=low_bias,
projs=projs,
n_jobs=n_jobs,
verbose=verbose,
)
@verbose
def csd_array_multitaper(
X,
sfreq,
t0=0,
fmin=0,
fmax=np.inf,
tmin=None,
tmax=None,
ch_names=None,
n_fft=None,
bandwidth=None,
adaptive=False,
low_bias=True,
projs=None,
n_jobs=None,
max_iter=250,
*,
verbose=None,
):
"""Estimate cross-spectral density from an array using a multitaper method.
Parameters
----------
X : array-like, shape (n_epochs, n_channels, n_times)
The time series data consisting of n_epochs separate observations
of signals with n_channels time-series of length n_times.
sfreq : float
Sampling frequency of observations.
t0 : float
Time of the first sample relative to the onset of the epoch, in
seconds. Defaults to 0.
fmin : float
Minimum frequency of interest, in Hertz.
fmax : float | np.inf
Maximum frequency of interest, in Hertz.
tmin : float | None
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
ch_names : list of str | None
A name for each time series. If ``None`` (the default), the series will
be named 'SERIES###'.
n_fft : int | None
Length of the FFT. If ``None``, the exact number of samples between
``tmin`` and ``tmax`` will be used.
bandwidth : float | None
The bandwidth of the multitaper windowing function in Hz.
adaptive : bool
Use adaptive weights to combine the tapered spectra into PSD.
low_bias : bool
Only use tapers with more than 90%% spectral concentration within
bandwidth.
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means no projectors are stored.
%(n_jobs)s
%(max_iter_multitaper)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
See Also
--------
csd_array_fourier
csd_array_morlet
csd_fourier
csd_morlet
csd_multitaper
"""
X, times, tmin, tmax, fmin, fmax = _prepare_csd_array(
X, sfreq, t0, tmin, tmax, fmin, fmax
)
# Slice X to the requested time window
tstart = None if tmin is None else np.searchsorted(times, tmin - 1e-10)
tstop = None if tmax is None else np.searchsorted(times, tmax + 1e-10)
X = X[:, :, tstart:tstop]
times = times[tstart:tstop]
n_times = len(times)
n_fft = n_times if n_fft is None else n_fft
window_fun, eigvals, adaptive = _compute_mt_params(
n_times, sfreq, bandwidth, low_bias, adaptive
)
# Preparing frequencies of interest
orig_frequencies = rfftfreq(n_fft, 1.0 / sfreq)
freq_mask = (
(orig_frequencies > 0) & (orig_frequencies >= fmin) & (orig_frequencies <= fmax)
)
frequencies = orig_frequencies[freq_mask]
if len(frequencies) == 0:
raise ValueError(
"No discrete fourier transform results within "
"the given frequency window. Please widen either "
"the frequency window or the time window"
)
# Compute the CSD
return _execute_csd_function(
X,
times,
frequencies,
_csd_multitaper,
params=[
sfreq,
n_times,
window_fun,
eigvals,
freq_mask,
n_fft,
adaptive,
max_iter,
],
n_fft=n_fft,
ch_names=ch_names,
projs=projs,
n_jobs=n_jobs,
verbose=verbose,
)
@verbose
def csd_morlet(
epochs,
frequencies,
tmin=None,
tmax=None,
picks=None,
n_cycles=7,
use_fft=True,
decim=1,
projs=None,
n_jobs=None,
*,
verbose=None,
):
"""Estimate cross-spectral density from epochs using Morlet wavelets.
Parameters
----------
epochs : instance of Epochs
The epochs to compute the CSD for.
frequencies : list of float
The frequencies of interest, in Hertz.
tmin : float | None
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
%(picks_good_data_noref)s
n_cycles : float | list of float | None
Number of cycles to use when constructing Morlet wavelets. Fixed number
or one per frequency. Defaults to 7.
use_fft : bool
Whether to use FFT-based convolution to compute the wavelet transform.
Defaults to True.
decim : int | slice
To reduce memory usage, decimation factor during time-frequency
decomposition. Defaults to 1 (no decimation).
If `int`, uses tfr[..., ::decim].
If `slice`, uses tfr[..., decim].
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means the projectors defined in the Epochs object will be copied.
%(n_jobs)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
See Also
--------
csd_array_fourier
csd_array_morlet
csd_array_multitaper
csd_fourier
csd_multitaper
"""
epochs, projs = _prepare_csd(epochs, tmin, tmax, picks, projs)
return csd_array_morlet(
epochs.get_data(copy=False),
sfreq=epochs.info["sfreq"],
frequencies=frequencies,
t0=epochs.tmin,
tmin=tmin,
tmax=tmax,
ch_names=epochs.ch_names,
n_cycles=n_cycles,
use_fft=use_fft,
decim=decim,
projs=projs,
n_jobs=n_jobs,
verbose=verbose,
)
@verbose
def csd_array_morlet(
X,
sfreq,
frequencies,
t0=0,
tmin=None,
tmax=None,
ch_names=None,
n_cycles=7,
use_fft=True,
decim=1,
projs=None,
n_jobs=None,
*,
verbose=None,
):
"""Estimate cross-spectral density from an array using Morlet wavelets.
Parameters
----------
X : array-like, shape (n_epochs, n_channels, n_times)
The time series data consisting of n_epochs separate observations
of signals with n_channels time-series of length n_times.
sfreq : float
Sampling frequency of observations.
frequencies : list of float
The frequencies of interest, in Hertz.
t0 : float
Time of the first sample relative to the onset of the epoch, in
seconds. Defaults to 0.
tmin : float | None
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
ch_names : list of str | None
A name for each time series. If ``None`` (the default), the series will
be named 'SERIES###'.
n_cycles : float | list of float | None
Number of cycles to use when constructing Morlet wavelets. Fixed number
or one per frequency. Defaults to 7.
use_fft : bool
Whether to use FFT-based convolution to compute the wavelet transform.
Defaults to True.
decim : int | slice
To reduce memory usage, decimation factor during time-frequency
decomposition. Defaults to 1 (no decimation).
If `int`, uses tfr[..., ::decim].
If `slice`, uses tfr[..., decim].
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means the projectors defined in the Epochs object will be copied.
%(n_jobs)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
See Also
--------
csd_array_fourier
csd_array_multitaper
csd_fourier
csd_morlet
csd_multitaper
"""
X, times, tmin, tmax, _, _ = _prepare_csd_array(X, sfreq, t0, tmin, tmax)
n_times = len(times)
# Construct the appropriate Morlet wavelets
wavelets = morlet(sfreq, frequencies, n_cycles)
# Slice X to the requested time window + half the length of the longest
# wavelet.
wave_length = len(wavelets[np.argmin(frequencies)]) // 2
tstart = tstop = None
if tmin is not None:
tstart = np.searchsorted(times, tmin)
tstart = max(0, tstart - wave_length)
if tmax is not None:
tstop = np.searchsorted(times, tmax)
tstop = min(n_times, tstop + wave_length)
X = X[:, :, tstart:tstop]
times = times[tstart:tstop]
# After CSD computation, we slice again to the requested time window.
csd_tstart = None if tmin is None else np.searchsorted(times, tmin - 1e-10)
csd_tstop = None if tmax is None else np.searchsorted(times, tmax + 1e-10)
csd_tslice = slice(csd_tstart, csd_tstop)
times = times[csd_tslice]
# Compute the CSD
nfft = _get_nfft(wavelets, X, use_fft)
return _execute_csd_function(
X,
times,
frequencies,
_csd_morlet,
params=[sfreq, wavelets, nfft, csd_tslice, use_fft, decim],
n_fft=1,
ch_names=ch_names,
projs=projs,
n_jobs=n_jobs,
verbose=verbose,
)
def _prepare_csd(epochs, tmin=None, tmax=None, picks=None, projs=None):
"""Do some checking and preprocessing of common csd_* parameters.
See the csd_* functions for documentation of the parameters.
"""
tstep = epochs.times[1] - epochs.times[0]
if tmin is not None and tmin < epochs.times[0] - tstep:
raise ValueError("tmin should be larger than the smallest data time point")
if tmax is not None and tmax > epochs.times[-1] + tstep:
raise ValueError("tmax should be smaller than the largest data time point")
if tmax is not None and tmin is not None:
if tmax < tmin:
raise ValueError("tmax must be larger than tmin")
if epochs.baseline is None and epochs.info["highpass"] < 0.1:
warn(
"Epochs are not baseline corrected or enough highpass filtered. "
"Cross-spectral density may be inaccurate."
)
picks = _picks_to_idx(epochs.info, picks, "data", with_ref_meg=False)
epochs = epochs.copy().pick(picks)
if projs is None:
projs = epochs.info["projs"]
return epochs, projs
def _prepare_csd_array(X, sfreq, t0, tmin, tmax, fmin=None, fmax=None):
"""Do some checking and preprocessing of common csd_r=array_* parameters.
See the csd_array_* functions for documentation of the parameters.
"""
X = np.asarray(X, dtype=float)
if X.ndim != 3:
raise ValueError("X must be n_epochs x n_channels x n_times.")
n_times = X.shape[2]
tstep = 1.0 / sfreq
times = np.arange(n_times) * tstep + t0
# Check tmin and tmax
if tmax is None:
tmax = times.max()
if tmin is None:
tmin = times.min()
if tmax <= tmin:
raise ValueError("tmax must be larger than tmin")
if tmin < times[0] - tstep:
raise ValueError("tmin should be larger than the smallest data time point")
if tmax > times[-1] + tstep:
raise ValueError("tmax should be smaller than the largest data time point")
# Check fmin and fmax
if fmax is not None and fmin is not None and fmax <= fmin:
raise ValueError("fmax must be larger than fmin")
return X, times, tmin, tmax, fmin, fmax
@verbose
def _execute_csd_function(
X,
times,
frequencies,
csd_function,
params,
n_fft,
ch_names=None,
projs=None,
n_jobs=None,
*,
verbose=None,
):
"""Estimate cross-spectral density with a given function.
This function will apply the given CSD function in parallel across epochs.
Parameters
----------
X : array-like, shape (n_epochs, n_channels, n_times)
The time series data consisting of n_epochs separate observations
of signals with n_channels time-series of length n_times.
times : float
Timestamps for each sample.
frequencies : list of float
The frequencies of interest for which the CSD is going to be computed.
csd_function : function
Function that performs the actual CSD computation
params : list
List of parameters to pass the CSD function.
n_fft : int
Number of FFT points. This is stored in the CSD object.
ch_names : list of str | None
A name for each time series. If ``None`` (the default), the series will
be named 'SERIES###'.
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means the projectors defined in the Epochs object will be copied.
%(n_jobs)s
%(verbose)s
Returns
-------
csd : instance of CrossSpectralDensity
The computed cross-spectral density.
"""
n_epochs, n_channels, _ = X.shape
logger.info("Computing cross-spectral density from epochs...")
n_freqs = len(frequencies)
csds_mean = np.zeros(
(n_channels * (n_channels + 1) // 2, n_freqs), dtype=np.complex128
)
# Prepare the function that does the actual CSD computation for parallel
# execution.
parallel, my_csd, n_jobs = parallel_func(csd_function, n_jobs, verbose=verbose)
# Compute CSD for each trial
n_blocks = int(np.ceil(n_epochs / float(n_jobs)))
for i in ProgressBar(range(n_blocks), mesg="CSD epoch blocks"):
epoch_block = X[i * n_jobs : (i + 1) * n_jobs]
csds = parallel(my_csd(this_epoch, *params) for this_epoch in epoch_block)
# Add CSD matrices in-place
csds_mean += np.sum(csds, axis=0)
csds_mean /= n_epochs
logger.info("[done]")
if ch_names is None:
ch_names = [f"SERIES{i + 1:03}" for i in range(n_channels)]
return CrossSpectralDensity(
csds_mean,
ch_names=ch_names,
tmin=times[0],
tmax=times[-1],
frequencies=frequencies,
n_fft=n_fft,
projs=projs,
)
def _csd_fourier(X, sfreq, n_times, freq_mask, n_fft):
"""Compute cross spectral density (CSD) using short-time fourier transform.
Computes the CSD for a single epoch of data.
Parameters
----------
X : ndarray, shape (n_channels, n_times)
The time series data consisting of n_channels time-series of length
n_times.
sfreq : float
The sampling frequency of the data in Hertz.
n_times : int
Number of time samples
freq_mask : ndarray
Which frequencies to use.
n_fft : int
Length of the FFT.
"""
x_mt, _ = _mt_spectra(X, np.hanning(n_times), sfreq, n_fft)
# Hack so we can sum over axis=-2
weights = np.array([1.0])[:, np.newaxis, np.newaxis, np.newaxis]
x_mt = x_mt[:, :, freq_mask]
# Calculating CSD
# Tiling x_mt so that we can easily use _csd_from_mt()
x_mt = x_mt[:, np.newaxis, :, :]
x_mt = np.tile(x_mt, [1, x_mt.shape[0], 1, 1])
y_mt = np.transpose(x_mt, axes=[1, 0, 2, 3])
weights_y = np.transpose(weights, axes=[1, 0, 2, 3])
csds = _csd_from_mt(x_mt, y_mt, weights, weights_y)
# FIXME: don't compute full matrix in the first place
csds = np.array(
[_sym_mat_to_vector(csds[:, :, i]) for i in range(csds.shape[-1])]
).T
# Scaling by number of samples and compensating for loss of power
# due to windowing (see section 11.5.2 in Bendat & Piersol).
csds /= n_times
csds *= 8 / 3.0
# Scaling by sampling frequency for compatibility with Matlab
csds /= sfreq
return csds
def _csd_multitaper(
X, sfreq, n_times, window_fun, eigvals, freq_mask, n_fft, adaptive, max_iter=250
):
"""Compute cross spectral density (CSD) using multitaper module."""
x_mt, _ = _mt_spectra(X, window_fun, sfreq, n_fft)
if adaptive:
# Compute adaptive weights
_, weights = _psd_from_mt_adaptive(
x_mt, eigvals, freq_mask, max_iter, return_weights=True
)
# Tiling weights so that we can easily use _csd_from_mt()
weights = weights[:, np.newaxis, :, :]
weights = np.tile(weights, [1, x_mt.shape[0], 1, 1])
else:
# Do not use adaptive weights
weights = np.sqrt(eigvals)[np.newaxis, np.newaxis, :, np.newaxis]
x_mt = x_mt[:, :, freq_mask]
# Calculating CSD
# Tiling x_mt so that we can easily use _csd_from_mt()
x_mt = x_mt[:, np.newaxis, :, :]
x_mt = np.tile(x_mt, [1, x_mt.shape[0], 1, 1])
y_mt = np.transpose(x_mt, axes=[1, 0, 2, 3])
weights_y = np.transpose(weights, axes=[1, 0, 2, 3])
csds = _csd_from_mt(x_mt, y_mt, weights, weights_y)
# FIXME: don't compute full matrix in the first place
csds = np.array(
[_sym_mat_to_vector(csds[:, :, i]) for i in range(csds.shape[-1])]
).T
# Scaling by sampling frequency for compatibility with Matlab
csds /= sfreq
return csds
def _csd_morlet(data, sfreq, wavelets, nfft, tslice=None, use_fft=True, decim=1):
"""Compute cross spectral density (CSD) using the given Morlet wavelets.
Computes the CSD for a single epoch of data.
Parameters
----------
data : ndarray, shape (n_channels, n_times)
The time series data consisting of n_channels time-series of length
n_times.
sfreq : float
The sampling frequency of the data in Hertz.
wavelets : list of ndarray
The Morlet wavelets for which to compute the CSD's. These have been
created by the `mne.time_frequency.tfr.morlet` function.
nfft : int
The number of FFT points.
tslice : slice | None
The desired time samples to compute the CSD over. If None, defaults to
including all time samples.
use_fft : bool
Whether to use FFT-based convolution to compute the wavelet transform.
Defaults to True.
decim : int | slice
To reduce memory usage, decimation factor during time-frequency
decomposition. Defaults to 1 (no decimation).
Only used in 'cwt_morlet' mode.
If `int`, uses tfr[..., ::decim].
If `slice`, uses tfr[..., decim].
Returns
-------
csd : ndarray, shape ((n_channels**2 + n_channels) / 2 , n_wavelets)
For each wavelet, the upper triangle of the cross spectral density
matrix.
See Also
--------
_vector_to_sym_mat : For converting the CSD to a full matrix.
"""
# Compute PSD
psds = _cwt_array(data, wavelets, nfft, mode="same", use_fft=use_fft, decim=decim)
if tslice is not None:
tstart = None if tslice.start is None else tslice.start // decim
tstop = None if tslice.stop is None else tslice.stop // decim
tstep = None if tslice.step is None else tslice.step // decim
tslice = slice(tstart, tstop, tstep)
psds = psds[:, :, tslice]
psds_conj = np.conj(psds)
# Compute the spectral density between all pairs of series
n_channels = data.shape[0]
csds = np.vstack(
[np.mean(psds[[i]] * psds_conj[i:], axis=2) for i in range(n_channels)]
)
# Scaling by sampling frequency for compatibility with Matlab
csds /= sfreq
return csds
@verbose
def csd_tfr(epochs_tfr, tmin=None, tmax=None, picks=None, projs=None, verbose=None):
"""Compute covariance matrices across frequencies for TFR epochs.
Parameters
----------
epochs_tfr : EpochsTFR
The time-frequency resolved epochs over which to compute the
covariance.
tmin : float | None
Minimum time instant to consider, in seconds. If ``None`` start at
first sample.
tmax : float | None
Maximum time instant to consider, in seconds. If ``None`` end at last
sample.
%(picks_good_data_noref)s
projs : list of Projection | None
List of projectors to store in the CSD object. Defaults to ``None``,
which means the projectors defined in the EpochsTFR object will be
copied.
%(verbose)s
Returns
-------
res : instance of CrossSpectralDensity
Cross-spectral density restricted to selected channels.
"""
_validate_type(epochs_tfr, EpochsTFR)
epochs_tfr, projs = _prepare_csd(epochs_tfr, tmin, tmax, picks, projs)
X = epochs_tfr.data
times = epochs_tfr.times
n_channels, n_freqs = len(epochs_tfr.ch_names), epochs_tfr.freqs.size
data = np.zeros((n_channels * (n_channels + 1) // 2, n_freqs), dtype=np.complex128)
# Slice X to the requested time window
tstart = None if tmin is None else np.searchsorted(times, tmin - 1e-10)
tstop = None if tmax is None else np.searchsorted(times, tmax + 1e-10)
X = X[:, :, :, tstart:tstop]
for idx, epochs_data in enumerate(X):
# This is equivalent to:
# csds = np.vstack([np.mean(epochs_data[[i]] * epochs_data_conj[i:],
# axis=2) for i in range(n_channels)])
# There is a redundancy in the calculation here because we don't really
# need the lower triangle of the matrix, but it should still be faster
# than a loop (hopefully!).
csds = np.einsum("xft,yft->xyf", epochs_data, np.conj(epochs_data))
csds = csds[np.triu_indices(n_channels) + (slice(None),)]
csds /= epochs_data.shape[-1]
# Scaling by sampling frequency for compatibility with Matlab
csds /= epochs_tfr.info["sfreq"]
data += csds
# scale to compute mean
data /= len(epochs_tfr)
# TO DO: EpochTFR should store n_fft to be consistent
return CrossSpectralDensity(
data=data,
ch_names=epochs_tfr.ch_names,
tmin=tmin,
tmax=tmax,
frequencies=epochs_tfr.freqs,
n_fft=None,
projs=projs,
)