[074d3d]: / mne / forward / _compute_forward.py

Download this file

898 lines (775 with data), 34.2 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
# The computations in this code were primarily derived from Matti Hämäläinen's
# C code.
#
# Many of the idealized equations behind these calculations can be found in:
# 1) Realistic conductivity geometry model of the human head for interpretation
# of neuromagnetic data. Hämäläinen and Sarvas, 1989. Specific to MNE
# 2) EEG and MEG: forward solutions for inverse methods. Mosher, Leahy, and
# Lewis, 1999. Generalized discussion of forward solutions.
from copy import deepcopy
import numpy as np
from .._fiff.constants import FIFF
from ..bem import _import_openmeeg, _make_openmeeg_geometry
from ..fixes import bincount, jit
from ..parallel import parallel_func
from ..surface import _jit_cross, _project_onto_surface
from ..transforms import apply_trans, invert_transform
from ..utils import _check_option, _pl, fill_doc, logger, verbose, warn
# #############################################################################
# COIL SPECIFICATION AND FIELD COMPUTATION MATRIX
def _dup_coil_set(coils, coord_frame, t):
"""Make a duplicate."""
if t is not None and coord_frame != t["from"]:
raise RuntimeError("transformation frame does not match the coil set")
coils = deepcopy(coils)
if t is not None:
coord_frame = t["to"]
for coil in coils:
for key in ("ex", "ey", "ez"):
if key in coil:
coil[key] = apply_trans(t["trans"], coil[key], False)
coil["r0"] = apply_trans(t["trans"], coil["r0"])
coil["rmag"] = apply_trans(t["trans"], coil["rmag"])
coil["cosmag"] = apply_trans(t["trans"], coil["cosmag"], False)
coil["coord_frame"] = t["to"]
return coils, coord_frame
def _check_coil_frame(coils, coord_frame, bem):
"""Check to make sure the coils are in the correct coordinate frame."""
if coord_frame != FIFF.FIFFV_COORD_MRI:
if coord_frame == FIFF.FIFFV_COORD_HEAD:
# Make a transformed duplicate
coils, coord_frame = _dup_coil_set(coils, coord_frame, bem["head_mri_t"])
else:
raise RuntimeError(f"Bad coil coordinate frame {coord_frame}")
return coils, coord_frame
@fill_doc
def _lin_field_coeff(surf, mult, rmags, cosmags, ws, bins, n_jobs):
"""Parallel wrapper for _do_lin_field_coeff to compute linear coefficients.
Parameters
----------
surf : dict
Dict containing information for one surface of the BEM
mult : float
Multiplier for particular BEM surface (Iso Skull Approach discussed in
Mosher et al., 1999 and Hämäläinen and Sarvas, 1989 Section III?)
rmag : ndarray, shape (n_integration_pts, 3)
3D positions of MEG coil integration points (from coil['rmag'])
cosmag : ndarray, shape (n_integration_pts, 3)
Direction of the MEG coil integration points (from coil['cosmag'])
ws : ndarray, shape (n_integration_pts,)
Weights for MEG coil integration points
bins : ndarray, shape (n_integration_points,)
The sensor assignments for each rmag/cosmag/w.
%(n_jobs)s
Returns
-------
coeff : list
Linear coefficients with lead fields for each BEM vertex on each sensor
(?)
"""
parallel, p_fun, n_jobs = parallel_func(
_do_lin_field_coeff, n_jobs, max_jobs=len(surf["tris"])
)
nas = np.array_split
coeffs = parallel(
p_fun(surf["rr"], t, tn, ta, rmags, cosmags, ws, bins)
for t, tn, ta in zip(
nas(surf["tris"], n_jobs),
nas(surf["tri_nn"], n_jobs),
nas(surf["tri_area"], n_jobs),
)
)
return mult * np.sum(coeffs, axis=0)
@jit()
def _do_lin_field_coeff(bem_rr, tris, tn, ta, rmags, cosmags, ws, bins):
"""Compute field coefficients (parallel-friendly).
See section IV of Mosher et al., 1999 (specifically equation 35).
Parameters
----------
bem_rr : ndarray, shape (n_BEM_vertices, 3)
Positions on one BEM surface in 3-space. 2562 BEM vertices for BEM with
5120 triangles (ico-4)
tris : ndarray, shape (n_BEM_vertices, 3)
Vertex indices for each triangle (referring to bem_rr)
tn : ndarray, shape (n_BEM_vertices, 3)
Triangle unit normal vectors
ta : ndarray, shape (n_BEM_vertices,)
Triangle areas
rmag : ndarray, shape (n_sensor_pts, 3)
3D positions of MEG coil integration points (from coil['rmag'])
cosmag : ndarray, shape (n_sensor_pts, 3)
Direction of the MEG coil integration points (from coil['cosmag'])
ws : ndarray, shape (n_sensor_pts,)
Weights for MEG coil integration points
bins : ndarray, shape (n_sensor_pts,)
The sensor assignments for each rmag/cosmag/w.
Returns
-------
coeff : ndarray, shape (n_MEG_sensors, n_BEM_vertices)
Linear coefficients with effect of each BEM vertex on each sensor (?)
"""
coeff = np.zeros((bins[-1] + 1, len(bem_rr)))
w_cosmags = ws.reshape(-1, 1) * cosmags
diff = rmags.reshape(rmags.shape[0], 1, rmags.shape[1]) - bem_rr
den = np.sum(diff * diff, axis=-1)
den *= np.sqrt(den)
den *= 3
for ti in range(len(tris)):
tri, tri_nn, tri_area = tris[ti], tn[ti], ta[ti]
# Accumulate the coefficients for each triangle node and add to the
# corresponding coefficient matrix
# Simple version (bem_lin_field_coeffs_simple)
# The following is equivalent to:
# tri_rr = bem_rr[tri]
# for j, coil in enumerate(coils['coils']):
# x = func(coil['rmag'], coil['cosmag'],
# tri_rr, tri_nn, tri_area)
# res = np.sum(coil['w'][np.newaxis, :] * x, axis=1)
# coeff[j][tri + off] += mult * res
c = np.empty((diff.shape[0], tri.shape[0], diff.shape[2]))
_jit_cross(c, diff[:, tri], tri_nn)
c *= w_cosmags.reshape(w_cosmags.shape[0], 1, w_cosmags.shape[1])
for ti in range(3):
x = np.sum(c[:, ti], axis=-1)
x /= den[:, tri[ti]] / tri_area
coeff[:, tri[ti]] += bincount(bins, weights=x, minlength=bins[-1] + 1)
return coeff
def _concatenate_coils(coils):
"""Concatenate MEG coil parameters."""
rmags = np.concatenate([coil["rmag"] for coil in coils])
cosmags = np.concatenate([coil["cosmag"] for coil in coils])
ws = np.concatenate([coil["w"] for coil in coils])
n_int = np.array([len(coil["rmag"]) for coil in coils])
if n_int[-1] == 0:
# We assume each sensor has at least one integration point,
# which should be a safe assumption. But let's check it here, since
# our code elsewhere relies on bins[-1] + 1 being the number of sensors
raise RuntimeError("not supported")
bins = np.repeat(np.arange(len(n_int)), n_int)
return rmags, cosmags, ws, bins
@fill_doc
def _bem_specify_coils(bem, coils, coord_frame, mults, n_jobs):
"""Set up for computing the solution at a set of MEG coils.
Parameters
----------
bem : instance of ConductorModel
BEM information
coils : list of dict, len(n_MEG_sensors)
MEG sensor information dicts
coord_frame : int
Class constant identifying coordinate frame
mults : ndarray, shape (1, n_BEM_vertices)
Multiplier for every vertex in BEM
%(n_jobs)s
Returns
-------
sol: ndarray, shape (n_MEG_sensors, n_BEM_vertices)
MEG solution
"""
# Make sure MEG coils are in MRI coordinate frame to match BEM coords
coils, coord_frame = _check_coil_frame(coils, coord_frame, bem)
# leaving this in in case we want to easily add in the future
# if method != 'simple': # in ['ferguson', 'urankar']:
# raise NotImplementedError
# Compute the weighting factors to obtain the magnetic field in the linear
# potential approximation
# Process each of the surfaces
rmags, cosmags, ws, bins = _triage_coils(coils)
del coils
lens = np.cumsum(np.r_[0, [len(s["rr"]) for s in bem["surfs"]]])
sol = np.zeros((bins[-1] + 1, bem["solution"].shape[1]))
lims = np.concatenate([np.arange(0, sol.shape[0], 100), [sol.shape[0]]])
# Put through the bem (in channel-based chunks to save memory)
for start, stop in zip(lims[:-1], lims[1:]):
mask = np.logical_and(bins >= start, bins < stop)
r, c, w, b = rmags[mask], cosmags[mask], ws[mask], bins[mask] - start
# Compute coeffs for each surface, one at a time
for o1, o2, surf, mult in zip(
lens[:-1], lens[1:], bem["surfs"], bem["field_mult"]
):
coeff = _lin_field_coeff(surf, mult, r, c, w, b, n_jobs)
sol[start:stop] += np.dot(coeff, bem["solution"][o1:o2])
sol *= mults
return sol
def _bem_specify_els(bem, els, mults):
"""Set up for computing the solution at a set of EEG electrodes.
Parameters
----------
bem : instance of ConductorModel
BEM information
els : list of dict, len(n_EEG_sensors)
List of EEG sensor information dicts
mults: ndarray, shape (1, n_BEM_vertices)
Multiplier for every vertex in BEM
Returns
-------
sol : ndarray, shape (n_EEG_sensors, n_BEM_vertices)
EEG solution
"""
sol = np.zeros((len(els), bem["solution"].shape[1]))
scalp = bem["surfs"][0]
# Operate on all integration points for all electrodes (in MRI coords)
rrs = np.concatenate(
[apply_trans(bem["head_mri_t"]["trans"], el["rmag"]) for el in els], axis=0
)
ws = np.concatenate([el["w"] for el in els])
tri_weights, tri_idx = _project_onto_surface(rrs, scalp)
tri_weights *= ws[:, np.newaxis]
weights = np.matmul(
tri_weights[:, np.newaxis], bem["solution"][scalp["tris"][tri_idx]]
)[:, 0]
# there are way more vertices than electrodes generally, so let's iterate
# over the electrodes
edges = np.concatenate([[0], np.cumsum([len(el["w"]) for el in els])])
for ii, (start, stop) in enumerate(zip(edges[:-1], edges[1:])):
sol[ii] = weights[start:stop].sum(0)
sol *= mults
return sol
# #############################################################################
# BEM COMPUTATION
_MAG_FACTOR = 1e-7 # μ_0 / (4π)
# def _bem_inf_pot(rd, Q, rp):
# """The infinite medium potential in one direction. See Eq. (8) in
# Mosher, 1999"""
# NOTE: the (μ_0 / (4π) factor has been moved to _prep_field_communication
# diff = rp - rd # (Observation point position) - (Source position)
# diff2 = np.sum(diff * diff, axis=1) # Squared magnitude of diff
# # (Dipole moment) dot (diff) / (magnitude ^ 3)
# return np.sum(Q * diff, axis=1) / (diff2 * np.sqrt(diff2))
@jit()
def _bem_inf_pots(mri_rr, bem_rr, mri_Q=None):
"""Compute the infinite medium potential in all 3 directions.
Parameters
----------
mri_rr : ndarray, shape (n_dipole_vertices, 3)
Chunk of 3D dipole positions in MRI coordinates
bem_rr: ndarray, shape (n_BEM_vertices, 3)
3D vertex positions for one BEM surface
mri_Q : ndarray, shape (3, 3)
3x3 head -> MRI transform. I.e., head_mri_t.dot(np.eye(3))
Returns
-------
ndarray : shape(n_dipole_vertices, 3, n_BEM_vertices)
"""
# NOTE: the (μ_0 / (4π) factor has been moved to _prep_field_communication
# Get position difference vector between BEM vertex and dipole
diff = np.empty((len(mri_rr), 3, len(bem_rr)))
for ri in range(mri_rr.shape[0]):
rr = mri_rr[ri]
this_diff = bem_rr - rr
diff_norm = np.sum(this_diff * this_diff, axis=1)
diff_norm *= np.sqrt(diff_norm)
diff_norm[diff_norm == 0] = 1.0
if mri_Q is not None:
this_diff = np.dot(this_diff, mri_Q.T)
this_diff /= diff_norm.reshape(-1, 1)
diff[ri] = this_diff.T
return diff
# This function has been refactored to process all points simultaneously
# def _bem_inf_field(rd, Q, rp, d):
# """Infinite-medium magnetic field. See (7) in Mosher, 1999"""
# # Get vector from source to sensor integration point
# diff = rp - rd
# diff2 = np.sum(diff * diff, axis=1) # Get magnitude of diff
#
# # Compute cross product between diff and dipole to get magnetic field at
# # integration point
# x = fast_cross_3d(Q[np.newaxis, :], diff)
#
# # Take magnetic field dotted by integration point normal to get magnetic
# # field threading the current loop. Divide by R^3 (equivalently, R^2 * R)
# return np.sum(x * d, axis=1) / (diff2 * np.sqrt(diff2))
@jit()
def _bem_inf_fields(rr, rmag, cosmag):
"""Compute infinite-medium magnetic field at one MEG sensor.
This operates on all dipoles in all 3 basis directions.
Parameters
----------
rr : ndarray, shape (n_source_points, 3)
3D dipole source positions
rmag : ndarray, shape (n_sensor points, 3)
3D positions of 1 MEG coil's integration points (from coil['rmag'])
cosmag : ndarray, shape (n_sensor_points, 3)
Direction of 1 MEG coil's integration points (from coil['cosmag'])
Returns
-------
ndarray, shape (n_dipoles, 3, n_integration_pts)
Magnetic field from all dipoles at each MEG sensor integration point
"""
# rr, rmag refactored according to Equation (19) in Mosher, 1999
# Knowing that we're doing all directions, refactor above function:
# rr, 3, rmag
diff = rmag.T.reshape(1, 3, rmag.shape[0]) - rr.reshape(rr.shape[0], 3, 1)
diff_norm = np.sum(diff * diff, axis=1) # rr, rmag
diff_norm *= np.sqrt(diff_norm) # Get magnitude of distance cubed
diff_norm_ = diff_norm.reshape(-1)
diff_norm_[diff_norm_ == 0] = 1 # avoid nans
# This is the result of cross-prod calcs with basis vectors,
# as if we had taken (Q=np.eye(3)), then multiplied by cosmags
# factor, and then summed across directions
x = np.empty((rr.shape[0], 3, rmag.shape[0]))
x[:, 0] = diff[:, 1] * cosmag[:, 2] - diff[:, 2] * cosmag[:, 1]
x[:, 1] = diff[:, 2] * cosmag[:, 0] - diff[:, 0] * cosmag[:, 2]
x[:, 2] = diff[:, 0] * cosmag[:, 1] - diff[:, 1] * cosmag[:, 0]
diff_norm = diff_norm_.reshape((rr.shape[0], 1, rmag.shape[0]))
x /= diff_norm
# x.shape == (rr.shape[0], 3, rmag.shape[0])
return x
@fill_doc
def _bem_pot_or_field(rr, mri_rr, mri_Q, coils, solution, bem_rr, n_jobs, coil_type):
"""Calculate the magnetic field or electric potential forward solution.
The code is very similar between EEG and MEG potentials, so combine them.
This does the work of "fwd_comp_field" (which wraps to "fwd_bem_field")
and "fwd_bem_pot_els" in MNE-C.
Parameters
----------
rr : ndarray, shape (n_dipoles, 3)
3D dipole source positions
mri_rr : ndarray, shape (n_dipoles, 3)
3D source positions in MRI coordinates
mri_Q :
3x3 head -> MRI transform. I.e., head_mri_t.dot(np.eye(3))
coils : list of dict, len(sensors)
List of sensors where each element contains sensor specific information
solution : ndarray, shape (n_sensors, n_BEM_rr)
Comes from _bem_specify_coils
bem_rr : ndarray, shape (n_BEM_vertices, 3)
3D vertex positions for all surfaces in the BEM
%(n_jobs)s
coil_type : str
'meg' or 'eeg'
Returns
-------
B : ndarray, shape (n_dipoles * 3, n_sensors)
Forward solution for a set of sensors
"""
# Both MEG and EEG have the inifinite-medium potentials
# This could be just vectorized, but eats too much memory, so instead we
# reduce memory by chunking within _do_inf_pots and parallelize, too:
parallel, p_fun, n_jobs = parallel_func(_do_inf_pots, n_jobs, max_jobs=len(rr))
nas = np.array_split
B = np.sum(
parallel(
p_fun(
mri_rr, sr.copy(), np.ascontiguousarray(mri_Q), np.array(sol)
) # copy and contig
for sr, sol in zip(nas(bem_rr, n_jobs), nas(solution.T, n_jobs))
),
axis=0,
)
# The copy()s above should make it so the whole objects don't need to be
# pickled...
# Only MEG coils are sensitive to the primary current distribution.
if coil_type == "meg":
# Primary current contribution (can be calc. in coil/dipole coords)
parallel, p_fun, n_jobs = parallel_func(_do_prim_curr, n_jobs)
pcc = np.concatenate(parallel(p_fun(r, coils) for r in nas(rr, n_jobs)), axis=0)
B += pcc
B *= _MAG_FACTOR
return B
def _do_prim_curr(rr, coils):
"""Calculate primary currents in a set of MEG coils.
See Mosher et al., 1999 Section II for discussion of primary vs. volume
currents.
Parameters
----------
rr : ndarray, shape (n_dipoles, 3)
3D dipole source positions in head coordinates
coils : list of dict
List of MEG coils where each element contains coil specific information
Returns
-------
pc : ndarray, shape (n_sources, n_MEG_sensors)
Primary current for set of MEG coils due to all sources
"""
rmags, cosmags, ws, bins = _triage_coils(coils)
n_coils = bins[-1] + 1
del coils
pc = np.empty((len(rr) * 3, n_coils))
for start, stop in _rr_bounds(rr, chunk=1):
pp = _bem_inf_fields(rr[start:stop], rmags, cosmags)
pp *= ws
pp.shape = (3 * (stop - start), -1)
pc[3 * start : 3 * stop] = [
bincount(bins, this_pp, bins[-1] + 1) for this_pp in pp
]
return pc
def _rr_bounds(rr, chunk=200):
# chunk data nicely
bounds = np.concatenate([np.arange(0, len(rr), chunk), [len(rr)]])
return zip(bounds[:-1], bounds[1:])
def _do_inf_pots(mri_rr, bem_rr, mri_Q, sol):
"""Calculate infinite potentials for MEG or EEG sensors using chunks.
Parameters
----------
mri_rr : ndarray, shape (n_dipoles, 3)
3D dipole source positions in MRI coordinates
bem_rr : ndarray, shape (n_BEM_vertices, 3)
3D vertex positions for all surfaces in the BEM
mri_Q :
3x3 head -> MRI transform. I.e., head_mri_t.dot(np.eye(3))
sol : ndarray, shape (n_sensors_subset, n_BEM_vertices_subset)
Comes from _bem_specify_coils
Returns
-------
B : ndarray, (n_dipoles * 3, n_sensors)
Forward solution for sensors due to volume currents
"""
# Doing work of 'fwd_bem_pot_calc' in MNE-C
# The following code is equivalent to this, but saves memory
# v0s = _bem_inf_pots(rr, bem_rr, Q) # n_rr x 3 x n_bem_rr
# v0s.shape = (len(rr) * 3, v0s.shape[2])
# B = np.dot(v0s, sol)
# We chunk the source mri_rr's in order to save memory
B = np.empty((len(mri_rr) * 3, sol.shape[1]))
for start, stop in _rr_bounds(mri_rr):
# v0 in Hämäläinen et al., 1989 == v_inf in Mosher, et al., 1999
v0s = _bem_inf_pots(mri_rr[start:stop], bem_rr, mri_Q)
v0s = v0s.reshape(-1, v0s.shape[2])
B[3 * start : 3 * stop] = np.dot(v0s, sol)
return B
# #############################################################################
# SPHERE COMPUTATION
def _sphere_pot_or_field(rr, mri_rr, mri_Q, coils, solution, bem_rr, n_jobs, coil_type):
"""Do potential or field for spherical model."""
fun = _eeg_spherepot_coil if coil_type == "eeg" else _sphere_field
parallel, p_fun, n_jobs = parallel_func(fun, n_jobs, max_jobs=len(rr))
B = np.concatenate(
parallel(p_fun(r, coils, sphere=solution) for r in np.array_split(rr, n_jobs))
)
return B
def _sphere_field(rrs, coils, sphere):
"""Compute field for spherical model using Jukka Sarvas' field computation.
Jukka Sarvas, "Basic mathematical and electromagnetic concepts of the
biomagnetic inverse problem", Phys. Med. Biol. 1987, Vol. 32, 1, 11-22.
The formulas have been manipulated for efficient computation
by Matti Hämäläinen, February 1990
"""
rmags, cosmags, ws, bins = _triage_coils(coils)
return _do_sphere_field(rrs, rmags, cosmags, ws, bins, sphere["r0"])
@jit()
def _do_sphere_field(rrs, rmags, cosmags, ws, bins, r0):
n_coils = bins[-1] + 1
# Shift to the sphere model coordinates
rrs = rrs - r0
B = np.zeros((3 * len(rrs), n_coils))
for ri in range(len(rrs)):
rr = rrs[ri]
# Check for a dipole at the origin
if np.sqrt(np.dot(rr, rr)) <= 1e-10:
continue
this_poss = rmags - r0
# Vector from dipole to the field point
a_vec = this_poss - rr
a = np.sqrt(np.sum(a_vec * a_vec, axis=1))
r = np.sqrt(np.sum(this_poss * this_poss, axis=1))
rr0 = np.sum(this_poss * rr, axis=1)
ar = (r * r) - rr0
ar0 = ar / a
F = a * (r * a + ar)
gr = (a * a) / r + ar0 + 2.0 * (a + r)
g0 = a + 2 * r + ar0
# Compute the dot products needed
re = np.sum(this_poss * cosmags, axis=1)
r0e = np.sum(rr * cosmags, axis=1)
g = (g0 * r0e - gr * re) / (F * F)
good = (a > 0) | (r > 0) | ((a * r) + 1 > 1e-5)
rr_ = rr.reshape(1, 3)
v1 = np.empty((cosmags.shape[0], 3))
_jit_cross(v1, rr_, cosmags)
v2 = np.empty((cosmags.shape[0], 3))
_jit_cross(v2, rr_, this_poss)
xx = (good * ws).reshape(-1, 1) * (
v1 / F.reshape(-1, 1) + v2 * g.reshape(-1, 1)
)
for jj in range(3):
zz = bincount(bins, xx[:, jj], n_coils)
B[3 * ri + jj, :] = zz
B *= _MAG_FACTOR
return B
def _eeg_spherepot_coil(rrs, coils, sphere):
"""Calculate the EEG in the sphere model."""
rmags, cosmags, ws, bins = _triage_coils(coils)
n_coils = bins[-1] + 1
del coils
# Shift to the sphere model coordinates
rrs = rrs - sphere["r0"]
B = np.zeros((3 * len(rrs), n_coils))
for ri, rr in enumerate(rrs):
# Only process dipoles inside the innermost sphere
if np.sqrt(np.dot(rr, rr)) >= sphere["layers"][0]["rad"]:
continue
# fwd_eeg_spherepot_vec
vval_one = np.zeros((len(rmags), 3))
# Make a weighted sum over the equivalence parameters
for eq in range(sphere["nfit"]):
# Scale the dipole position
rd = sphere["mu"][eq] * rr
rd2 = np.sum(rd * rd)
rd2_inv = 1.0 / rd2
# Go over all electrodes
this_pos = rmags - sphere["r0"]
# Scale location onto the surface of the sphere (not used)
# if sphere['scale_pos']:
# pos_len = (sphere['layers'][-1]['rad'] /
# np.sqrt(np.sum(this_pos * this_pos, axis=1)))
# this_pos *= pos_len
# Vector from dipole to the field point
a_vec = this_pos - rd
# Compute the dot products needed
a = np.sqrt(np.sum(a_vec * a_vec, axis=1))
a3 = 2.0 / (a * a * a)
r2 = np.sum(this_pos * this_pos, axis=1)
r = np.sqrt(r2)
rrd = np.sum(this_pos * rd, axis=1)
ra = r2 - rrd
rda = rrd - rd2
# The main ingredients
F = a * (r * a + ra)
c1 = a3 * rda + 1.0 / a - 1.0 / r
c2 = a3 + (a + r) / (r * F)
# Mix them together and scale by lambda/(rd*rd)
m1 = c1 - c2 * rrd
m2 = c2 * rd2
vval_one += (
sphere["lambda"][eq]
* rd2_inv
* (m1[:, np.newaxis] * rd + m2[:, np.newaxis] * this_pos)
)
# compute total result
xx = vval_one * ws[:, np.newaxis]
zz = np.array([bincount(bins, x, bins[-1] + 1) for x in xx.T])
B[3 * ri : 3 * ri + 3, :] = zz
# finishing by scaling by 1/(4*M_PI)
B *= 0.25 / np.pi
return B
def _triage_coils(coils):
return coils if isinstance(coils, tuple) else _concatenate_coils(coils)
# #############################################################################
# MAGNETIC DIPOLE (e.g. CHPI)
_MIN_DIST_LIMIT = 1e-5
def _magnetic_dipole_field_vec(rrs, coils, too_close="raise"):
rmags, cosmags, ws, bins = _triage_coils(coils)
fwd, min_dist = _compute_mdfv(rrs, rmags, cosmags, ws, bins, too_close)
if min_dist < _MIN_DIST_LIMIT:
msg = f"Coil too close (dist = {min_dist * 1000:g} mm)"
if too_close == "raise":
raise RuntimeError(msg)
func = warn if too_close == "warning" else logger.info
func(msg)
return fwd
@jit()
def _compute_mdfv(rrs, rmags, cosmags, ws, bins, too_close):
"""Compute an MEG forward solution for a set of magnetic dipoles."""
# The code below is a more efficient version (~30x) of this:
# for ri, rr in enumerate(rrs):
# for k in range(len(coils)):
# this_coil = coils[k]
# # Go through all points
# diff = this_coil['rmag'] - rr
# dist2 = np.sum(diff * diff, axis=1)[:, np.newaxis]
# dist = np.sqrt(dist2)
# if (dist < 1e-5).any():
# raise RuntimeError('Coil too close')
# dist5 = dist2 * dist2 * dist
# sum_ = (3 * diff * np.sum(diff * this_coil['cosmag'],
# axis=1)[:, np.newaxis] -
# dist2 * this_coil['cosmag']) / dist5
# fwd[3*ri:3*ri+3, k] = 1e-7 * np.dot(this_coil['w'], sum_)
fwd = np.zeros((3 * len(rrs), bins[-1] + 1))
min_dist = np.inf
ws2 = ws.reshape(-1, 1)
for ri in range(len(rrs)):
rr = rrs[ri]
diff = rmags - rr
dist2_ = np.sum(diff * diff, axis=1)
dist2 = dist2_.reshape(-1, 1)
dist = np.sqrt(dist2)
min_dist = min(dist.min(), min_dist)
if min_dist < _MIN_DIST_LIMIT and too_close == "raise":
break
t_ = np.sum(diff * cosmags, axis=1)
t = t_.reshape(-1, 1)
sum_ = ws2 * (3 * diff * t - dist2 * cosmags) / (dist2 * dist2 * dist)
for ii in range(3):
fwd[3 * ri + ii] = bincount(bins, sum_[:, ii], bins[-1] + 1)
fwd *= _MAG_FACTOR
return fwd, min_dist
# #############################################################################
# MAIN TRIAGING FUNCTION
@verbose
def _prep_field_computation(rr, *, sensors, bem, n_jobs, verbose=None):
"""Precompute and store some things that are used for both MEG and EEG.
Calculation includes multiplication factors, coordinate transforms,
compensations, and forward solutions. All are stored in modified fwd_data.
Parameters
----------
rr : ndarray, shape (n_dipoles, 3)
3D dipole source positions in head coordinates
bem : instance of ConductorModel
Boundary Element Model information
fwd_data : dict
Dict containing sensor information in the head coordinate frame.
Gets updated here with BEM and sensor information for later forward
calculations.
%(n_jobs)s
%(verbose)s
"""
bem_rr = mults = mri_Q = head_mri_t = None
if not bem["is_sphere"]:
if bem["bem_method"] != FIFF.FIFFV_BEM_APPROX_LINEAR:
raise RuntimeError("only linear collocation supported")
# Store (and apply soon) μ_0/(4π) factor before source computations
mults = np.repeat(
bem["source_mult"] / (4.0 * np.pi), [len(s["rr"]) for s in bem["surfs"]]
)[np.newaxis, :]
# Get positions of BEM points for every surface
bem_rr = np.concatenate([s["rr"] for s in bem["surfs"]])
# The dipole location and orientation must be transformed
head_mri_t = bem["head_mri_t"]
mri_Q = bem["head_mri_t"]["trans"][:3, :3].T
solutions = dict()
for coil_type in sensors:
coils = sensors[coil_type]["defs"]
if not bem["is_sphere"]:
if coil_type == "meg":
# MEG field computation matrices for BEM
start = "Composing the field computation matrix"
logger.info("\n" + start + "...")
cf = FIFF.FIFFV_COORD_HEAD
# multiply solution by "mults" here for simplicity
solution = _bem_specify_coils(bem, coils, cf, mults, n_jobs)
else:
# Compute solution for EEG sensor
logger.info("Setting up for EEG...")
solution = _bem_specify_els(bem, coils, mults)
else:
solution = bem
if coil_type == "eeg":
logger.info(
"Using the equivalent source approach in the "
"homogeneous sphere for EEG"
)
sensors[coil_type]["defs"] = _triage_coils(coils)
solutions[coil_type] = solution
# Get appropriate forward physics function depending on sphere or BEM model
fun = _sphere_pot_or_field if bem["is_sphere"] else _bem_pot_or_field
# Update fwd_data with
# bem_rr (3D BEM vertex positions)
# mri_Q (3x3 Head->MRI coord transformation applied to identity matrix)
# head_mri_t (head->MRI coord transform dict)
# fun (_bem_pot_or_field if not 'sphere'; otherwise _sph_pot_or_field)
# solutions (len 2 list; [ndarray, shape (n_MEG_sens, n BEM vertices),
# ndarray, shape (n_EEG_sens, n BEM vertices)]
fwd_data = dict(
bem_rr=bem_rr, mri_Q=mri_Q, head_mri_t=head_mri_t, fun=fun, solutions=solutions
)
return fwd_data
@fill_doc
def _compute_forwards_meeg(rr, *, sensors, fwd_data, n_jobs, silent=False):
"""Compute MEG and EEG forward solutions for all sensor types."""
Bs = dict()
# The dipole location and orientation must be transformed to mri coords
mri_rr = None
if fwd_data["head_mri_t"] is not None:
mri_rr = np.ascontiguousarray(apply_trans(fwd_data["head_mri_t"]["trans"], rr))
mri_Q, bem_rr, fun = fwd_data["mri_Q"], fwd_data["bem_rr"], fwd_data["fun"]
solutions = fwd_data["solutions"]
del fwd_data
for coil_type, sens in sensors.items():
coils = sens["defs"]
compensator = sens.get("compensator", None)
post_picks = sens.get("post_picks", None)
solution = solutions.get(coil_type, None)
# Do the actual forward calculation for a list MEG/EEG sensors
if not silent:
logger.info(
f"Computing {coil_type.upper()} at {len(rr)} source location{_pl(rr)} "
"(free orientations)..."
)
# Calculate forward solution using spherical or BEM model
B = fun(
rr,
mri_rr,
mri_Q,
coils=coils,
solution=solution,
bem_rr=bem_rr,
n_jobs=n_jobs,
coil_type=coil_type,
)
# Compensate if needed (only done for MEG systems w/compensation)
if compensator is not None:
B = B @ compensator.T
if post_picks is not None:
B = B[:, post_picks]
Bs[coil_type] = B
return Bs
@verbose
def _compute_forwards(rr, *, bem, sensors, n_jobs, verbose=None):
"""Compute the MEG and EEG forward solutions."""
# Split calculation into two steps to save (potentially) a lot of time
# when e.g. dipole fitting
solver = bem.get("solver", "mne")
_check_option("solver", solver, ("mne", "openmeeg"))
if bem["is_sphere"] or solver == "mne":
fwd_data = _prep_field_computation(rr, sensors=sensors, bem=bem, n_jobs=n_jobs)
Bs = _compute_forwards_meeg(
rr, sensors=sensors, fwd_data=fwd_data, n_jobs=n_jobs
)
else:
Bs = _compute_forwards_openmeeg(rr, bem=bem, sensors=sensors)
n_sensors_want = sum(len(s["ch_names"]) for s in sensors.values())
n_sensors = sum(B.shape[1] for B in Bs.values())
n_sources = list(Bs.values())[0].shape[0]
assert (n_sources, n_sensors) == (len(rr) * 3, n_sensors_want)
return Bs
def _compute_forwards_openmeeg(rr, *, bem, sensors):
"""Compute the MEG and EEG forward solutions for OpenMEEG."""
if len(bem["surfs"]) != 3:
raise RuntimeError("Only 3-layer BEM is supported for OpenMEEG.")
om = _import_openmeeg("compute a forward solution using OpenMEEG")
hminv = om.SymMatrix(bem["solution"])
geom = _make_openmeeg_geometry(bem, invert_transform(bem["head_mri_t"]))
# Make dipoles for all XYZ orientations
dipoles = np.c_[
np.kron(rr.T, np.ones(3)[None, :]).T,
np.kron(np.ones(len(rr))[:, None], np.eye(3)),
]
dipoles = np.asfortranarray(dipoles)
dipoles = om.Matrix(dipoles)
dsm = om.DipSourceMat(geom, dipoles, "Brain")
Bs = dict()
if "eeg" in sensors:
rmags, _, ws, bins = _concatenate_coils(sensors["eeg"]["defs"])
rmags = np.asfortranarray(rmags.astype(np.float64))
eeg_sensors = om.Sensors(om.Matrix(np.asfortranarray(rmags)), geom)
h2em = om.Head2EEGMat(geom, eeg_sensors)
eeg_fwd_full = om.GainEEG(hminv, dsm, h2em).array()
Bs["eeg"] = np.array(
[bincount(bins, ws * x, bins[-1] + 1) for x in eeg_fwd_full.T], float
)
if "meg" in sensors:
rmags, cosmags, ws, bins = _concatenate_coils(sensors["meg"]["defs"])
rmags = np.asfortranarray(rmags.astype(np.float64))
cosmags = np.asfortranarray(cosmags.astype(np.float64))
labels = [str(ii) for ii in range(len(rmags))]
weights = radii = np.ones(len(labels))
meg_sensors = om.Sensors(labels, rmags, cosmags, weights, radii)
h2mm = om.Head2MEGMat(geom, meg_sensors)
ds2mm = om.DipSource2MEGMat(dipoles, meg_sensors)
meg_fwd_full = om.GainMEG(hminv, dsm, h2mm, ds2mm).array()
B = np.array(
[bincount(bins, ws * x, bins[-1] + 1) for x in meg_fwd_full.T], float
)
compensator = sensors["meg"].get("compensator", None)
post_picks = sensors["meg"].get("post_picks", None)
if compensator is not None:
B = B @ compensator.T
if post_picks is not None:
B = B[:, post_picks]
Bs["meg"] = B
return Bs