initial commit

This commit is contained in:
2025-08-19 09:13:22 -07:00
parent 28464811d6
commit 0977a3e14d
820 changed files with 1003358 additions and 2 deletions

7
mne/io/ctf/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""CTF module for conversion to FIF."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
from .ctf import read_raw_ctf, RawCTF

38
mne/io/ctf/constants.py Normal file
View File

@@ -0,0 +1,38 @@
"""CTF constants."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
from ...utils import BunchConst
CTF = BunchConst()
# ctf_types.h
CTF.CTFV_MAX_AVERAGE_BINS = 8
CTF.CTFV_MAX_COILS = 8
CTF.CTFV_MAX_BALANCING = 50
CTF.CTFV_SENSOR_LABEL = 31
CTF.CTFV_COIL_LPA = 1
CTF.CTFV_COIL_RPA = 2
CTF.CTFV_COIL_NAS = 3
CTF.CTFV_COIL_SPARE = 4
CTF.CTFV_REF_MAG_CH = 0
CTF.CTFV_REF_GRAD_CH = 1
CTF.CTFV_MEG_CH = 5
CTF.CTFV_EEG_CH = 9
CTF.CTFV_STIM_CH = 11
CTF.CTFV_FILTER_LOWPASS = 1
CTF.CTFV_FILTER_HIGHPASS = 2
# read_res4.c
CTF.FUNNY_POS = 1844
# read_write_data.c
CTF.HEADER_SIZE = 8
CTF.BLOCK_SIZE = 2000
CTF.SYSTEM_CLOCK_CH = "SCLK01-177"

303
mne/io/ctf/ctf.py Normal file
View File

@@ -0,0 +1,303 @@
"""Conversion tool from CTF to FIF."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import os
import numpy as np
from ..._fiff._digitization import _format_dig_points
from ..._fiff.utils import _blk_read_lims, _mult_cal_one
from ...utils import (
_check_fname,
_check_option,
_clean_names,
fill_doc,
logger,
verbose,
)
from ..base import BaseRaw
from .constants import CTF
from .eeg import _read_eeg, _read_pos
from .hc import _read_hc
from .info import _annotate_bad_segments, _compose_meas_info, _read_bad_chans
from .markers import _read_annotations_ctf_call
from .res4 import _make_ctf_name, _read_res4
from .trans import _make_ctf_coord_trans_set
@fill_doc
def read_raw_ctf(
directory, system_clock="truncate", preload=False, clean_names=False, verbose=None
) -> "RawCTF":
"""Raw object from CTF directory.
Parameters
----------
directory : path-like
Path to the CTF data (ending in ``'.ds'``).
system_clock : str
How to treat the system clock. Use "truncate" (default) to truncate
the data file when the system clock drops to zero, and use "ignore"
to ignore the system clock (e.g., if head positions are measured
multiple times during a recording).
%(preload)s
clean_names : bool, optional
If True main channel names and compensation channel names will
be cleaned from CTF suffixes. The default is False.
%(verbose)s
Returns
-------
raw : instance of RawCTF
The raw data.
Notes
-----
.. versionadded:: 0.11
To read in the Polhemus digitization data (for example, from
a .pos file), include the file in the CTF directory. The
points will then automatically be read into the `mne.io.Raw`
instance via `mne.io.read_raw_ctf`.
"""
return RawCTF(
directory,
system_clock,
preload=preload,
clean_names=clean_names,
verbose=verbose,
)
@fill_doc
class RawCTF(BaseRaw):
"""Raw object from CTF directory.
Parameters
----------
directory : path-like
Path to the CTF data (ending in ``'.ds'``).
system_clock : str
How to treat the system clock. Use ``"truncate"`` (default) to truncate
the data file when the system clock drops to zero, and use ``"ignore"``
to ignore the system clock (e.g., if head positions are measured
multiple times during a recording).
%(preload)s
clean_names : bool, optional
If True main channel names and compensation channel names will
be cleaned from CTF suffixes. The default is False.
%(verbose)s
See Also
--------
mne.io.Raw : Documentation of attributes and methods.
"""
@verbose
def __init__(
self,
directory,
system_clock="truncate",
preload=False,
verbose=None,
clean_names=False,
):
# adapted from mne_ctf2fiff.c
directory = str(
_check_fname(directory, "read", True, "directory", need_dir=True)
)
if not directory.endswith(".ds"):
raise TypeError(
f'directory must be a directory ending with ".ds", got {directory}'
)
_check_option("system_clock", system_clock, ["ignore", "truncate"])
logger.info(f"ds directory : {directory}")
res4 = _read_res4(directory) # Read the magical res4 file
coils = _read_hc(directory) # Read the coil locations
eeg = _read_eeg(directory) # Read the EEG electrode loc info
# Investigate the coil location data to get the coordinate trans
coord_trans = _make_ctf_coord_trans_set(res4, coils)
digs = _read_pos(directory, coord_trans)
# Compose a structure which makes fiff writing a piece of cake
info = _compose_meas_info(res4, coils, coord_trans, eeg)
with info._unlock():
info["dig"] += digs
info["dig"] = _format_dig_points(info["dig"])
info["bads"] += _read_bad_chans(directory, info)
# Determine how our data is distributed across files
fnames = list()
last_samps = list()
raw_extras = list()
missing_names = list()
no_samps = list()
while True:
suffix = "meg4" if len(fnames) == 0 else f"{len(fnames)}_meg4"
meg4_name, found = _make_ctf_name(directory, suffix, raise_error=False)
if not found:
missing_names.append(os.path.relpath(meg4_name, directory))
break
# check how much data is in the file
sample_info = _get_sample_info(meg4_name, res4, system_clock)
if sample_info["n_samp"] == 0:
no_samps.append(os.path.relpath(meg4_name, directory))
break
if len(fnames) == 0:
buffer_size_sec = sample_info["block_size"] / info["sfreq"]
else:
buffer_size_sec = 1.0
fnames.append(meg4_name)
last_samps.append(sample_info["n_samp"] - 1)
raw_extras.append(sample_info)
first_samps = [0] * len(last_samps)
if len(fnames) == 0:
raise OSError(
f"Could not find any data, could not find the following "
f"file(s): {missing_names}, and the following file(s) had no "
f"valid samples: {no_samps}"
)
super().__init__(
info,
preload,
first_samps=first_samps,
last_samps=last_samps,
filenames=fnames,
raw_extras=raw_extras,
orig_format="int",
buffer_size_sec=buffer_size_sec,
verbose=verbose,
)
# Add bad segments as Annotations (correct for start time)
start_time = -res4["pre_trig_pts"] / float(info["sfreq"])
annot = _annotate_bad_segments(directory, start_time, info["meas_date"])
marker_annot = _read_annotations_ctf_call(
directory=directory,
total_offset=(res4["pre_trig_pts"] / res4["sfreq"]),
trial_duration=(res4["nsamp"] / res4["sfreq"]),
meas_date=info["meas_date"],
)
annot = marker_annot if annot is None else annot + marker_annot
self.set_annotations(annot)
if clean_names:
_clean_names_inst(self)
def _read_segment_file(self, data, idx, fi, start, stop, cals, mult):
"""Read a chunk of raw data."""
si = self._raw_extras[fi]
offset = 0
trial_start_idx, r_lims, d_lims = _blk_read_lims(
start, stop, int(si["block_size"])
)
with open(self.filenames[fi], "rb") as fid:
for bi in range(len(r_lims)):
samp_offset = (bi + trial_start_idx) * si["res4_nsamp"]
n_read = min(si["n_samp_tot"] - samp_offset, si["block_size"])
# read the chunk of data
# have to be careful on Windows and make sure we are using
# 64-bit integers here
with np.errstate(over="raise"):
pos = np.int64(CTF.HEADER_SIZE)
pos += np.int64(samp_offset) * si["n_chan"] * 4
fid.seek(pos, 0)
this_data = np.fromfile(fid, ">i4", count=si["n_chan"] * n_read)
this_data.shape = (si["n_chan"], n_read)
this_data = this_data[:, r_lims[bi, 0] : r_lims[bi, 1]]
data_view = data[:, d_lims[bi, 0] : d_lims[bi, 1]]
_mult_cal_one(data_view, this_data, idx, cals, mult)
offset += n_read
def _clean_names_inst(inst):
"""Clean up CTF suffixes from channel names."""
mapping = dict(zip(inst.ch_names, _clean_names(inst.ch_names)))
inst.rename_channels(mapping)
for comp in inst.info["comps"]:
for key in ("row_names", "col_names"):
comp["data"][key] = _clean_names(comp["data"][key])
def _get_sample_info(fname, res4, system_clock):
"""Determine the number of valid samples."""
logger.info(f"Finding samples for {fname}: ")
if CTF.SYSTEM_CLOCK_CH in res4["ch_names"]:
clock_ch = res4["ch_names"].index(CTF.SYSTEM_CLOCK_CH)
else:
clock_ch = None
for k, ch in enumerate(res4["chs"]):
if ch["ch_name"] == CTF.SYSTEM_CLOCK_CH:
clock_ch = k
break
with open(fname, "rb") as fid:
fid.seek(0, os.SEEK_END)
st_size = fid.tell()
fid.seek(0, 0)
if (st_size - CTF.HEADER_SIZE) % (4 * res4["nsamp"] * res4["nchan"]) != 0:
raise RuntimeError(
"The number of samples is not an even multiple of the trial size"
)
n_samp_tot = (st_size - CTF.HEADER_SIZE) // (4 * res4["nchan"])
n_trial = n_samp_tot // res4["nsamp"]
n_samp = n_samp_tot
if clock_ch is None:
logger.info(
" System clock channel is not available, assuming "
"all samples to be valid."
)
elif system_clock == "ignore":
logger.info(" System clock channel is available, but ignored.")
else: # use it
logger.info(
" System clock channel is available, checking "
"which samples are valid."
)
for t in range(n_trial):
# Skip to the correct trial
samp_offset = t * res4["nsamp"]
offset = (
CTF.HEADER_SIZE
+ (samp_offset * res4["nchan"] + (clock_ch * res4["nsamp"])) * 4
)
fid.seek(offset, 0)
this_data = np.fromfile(fid, ">i4", res4["nsamp"])
if len(this_data) != res4["nsamp"]:
raise RuntimeError(f"Cannot read data for trial {t+1}.")
end = np.where(this_data == 0)[0]
if len(end) > 0:
n_samp = samp_offset + end[0]
break
if n_samp < res4["nsamp"]:
n_trial = 1
logger.info(
" %d x %d = %d samples from %d chs",
n_trial,
n_samp,
n_samp,
res4["nchan"],
)
else:
n_trial = n_samp // res4["nsamp"]
n_omit = n_samp_tot - n_samp
logger.info(
" %d x %d = %d samples from %d chs",
n_trial,
res4["nsamp"],
n_samp,
res4["nchan"],
)
if n_omit != 0:
logger.info(" %d samples omitted at the end", n_omit)
return dict(
n_samp=n_samp,
n_samp_tot=n_samp_tot,
block_size=res4["nsamp"],
res4_nsamp=res4["nsamp"],
n_chan=res4["nchan"],
)

108
mne/io/ctf/eeg.py Normal file
View File

@@ -0,0 +1,108 @@
"""Read .eeg files."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
from os import listdir
from os.path import join
import numpy as np
from ..._fiff.constants import FIFF
from ...transforms import apply_trans
from ...utils import logger, warn
from .res4 import _make_ctf_name
_cardinal_dict = dict(
nasion=FIFF.FIFFV_POINT_NASION,
lpa=FIFF.FIFFV_POINT_LPA,
left=FIFF.FIFFV_POINT_LPA,
rpa=FIFF.FIFFV_POINT_RPA,
right=FIFF.FIFFV_POINT_RPA,
)
def _read_eeg(directory):
"""Read the .eeg file."""
# Missing file is ok
fname, found = _make_ctf_name(directory, "eeg", raise_error=False)
if not found:
logger.info(" Separate EEG position data file not present.")
return
eeg = dict(
labels=list(),
kinds=list(),
ids=list(),
rr=list(),
np=0,
assign_to_chs=True,
coord_frame=FIFF.FIFFV_MNE_COORD_CTF_HEAD,
)
with open(fname, "rb") as fid:
for line in fid:
line = line.strip()
if len(line) > 0:
parts = line.decode("utf-8").split()
if len(parts) != 5:
raise RuntimeError(f"Illegal data in EEG position file: {line}")
r = np.array([float(p) for p in parts[2:]]) / 100.0
if (r * r).sum() > 1e-4:
label = parts[1]
eeg["labels"].append(label)
eeg["rr"].append(r)
id_ = _cardinal_dict.get(label.lower(), int(parts[0]))
if label.lower() in _cardinal_dict:
kind = FIFF.FIFFV_POINT_CARDINAL
else:
kind = FIFF.FIFFV_POINT_EXTRA
eeg["ids"].append(id_)
eeg["kinds"].append(kind)
eeg["np"] += 1
logger.info(" Separate EEG position data file read.")
return eeg
def _read_pos(directory, transformations):
"""Read the .pos file and return eeg positions as dig extra points."""
fname = [join(directory, f) for f in listdir(directory) if f.endswith(".pos")]
if len(fname) < 1:
return list()
elif len(fname) > 1:
warn(" Found multiple pos files. Extra digitizer points not added.")
return list()
logger.info(f" Reading digitizer points from {fname}...")
if transformations["t_ctf_head_head"] is None:
warn(" No transformation found. Extra digitizer points not added.")
return list()
fname = fname[0]
digs = list()
i = 2000
with open(fname) as fid:
for line in fid:
line = line.strip()
if len(line) > 0:
parts = line.split()
# The lines can have 4 or 5 parts. First part is for the id,
# which can be an int or a string. The last three are for xyz
# coordinates. The extra part is for additional info
# (e.g. 'Pz', 'Cz') which is ignored.
if len(parts) not in [4, 5]:
continue
try:
ident = int(parts[0]) + 1000
except ValueError: # if id is not an int
ident = i
i += 1
dig = dict(
kind=FIFF.FIFFV_POINT_EXTRA,
ident=ident,
r=list(),
coord_frame=FIFF.FIFFV_COORD_HEAD,
)
r = np.array([float(p) for p in parts[-3:]]) / 100.0 # cm to m
if (r * r).sum() > 1e-4:
r = apply_trans(transformations["t_ctf_head_head"], r)
dig["r"] = r
digs.append(dig)
return digs

89
mne/io/ctf/hc.py Normal file
View File

@@ -0,0 +1,89 @@
"""Read .hc files."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import numpy as np
from ..._fiff.constants import FIFF
from ...utils import logger
from .constants import CTF
from .res4 import _make_ctf_name
_kind_dict = {
"nasion": CTF.CTFV_COIL_NAS,
"left ear": CTF.CTFV_COIL_LPA,
"right ear": CTF.CTFV_COIL_RPA,
"spare": CTF.CTFV_COIL_SPARE,
}
_coord_dict = {
"relative to dewar": FIFF.FIFFV_MNE_COORD_CTF_DEVICE,
"relative to head": FIFF.FIFFV_MNE_COORD_CTF_HEAD,
}
def _read_one_coil_point(fid):
"""Read coil coordinate information from the hc file."""
# Descriptor
one = "#"
while len(one) > 0 and one[0] == "#":
one = fid.readline()
if len(one) == 0:
return None
one = one.strip().decode("utf-8")
if "Unable" in one:
raise RuntimeError("HPI information not available")
# Hopefully this is an unambiguous interpretation
p = dict()
p["valid"] = "measured" in one
for key, val in _coord_dict.items():
if key in one:
p["coord_frame"] = val
break
else:
p["coord_frame"] = -1
for key, val in _kind_dict.items():
if key in one:
p["kind"] = val
break
else:
p["kind"] = -1
# Three coordinates
p["r"] = np.empty(3)
for ii, coord in enumerate("xyz"):
sp = fid.readline().decode("utf-8").strip()
if len(sp) == 0: # blank line
continue
sp = sp.split(" ")
if len(sp) != 3 or sp[0] != coord or sp[1] != "=":
raise RuntimeError(f"Bad line: {one}")
# We do not deal with centimeters
p["r"][ii] = float(sp[2]) / 100.0
return p
def _read_hc(directory):
"""Read the hc file to get the HPI info and to prepare for coord trans."""
fname, found = _make_ctf_name(directory, "hc", raise_error=False)
if not found:
logger.info(" hc data not present")
return None
s = list()
with open(fname, "rb") as fid:
while True:
p = _read_one_coil_point(fid)
if p is None:
# First point bad indicates that the file is empty
if len(s) == 0:
logger.info("hc file empty, no data present")
return None
# Returns None if at EOF
logger.info(" hc data read.")
return s
if p["valid"]:
s.append(p)

561
mne/io/ctf/info.py Normal file
View File

@@ -0,0 +1,561 @@
"""Populate measurement info."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import os.path as op
from calendar import timegm
from time import strptime
import numpy as np
from ..._fiff.constants import FIFF
from ..._fiff.ctf_comp import _add_kind, _calibrate_comp
from ..._fiff.meas_info import _empty_info
from ..._fiff.write import get_new_file_id
from ...annotations import Annotations
from ...transforms import (
_coord_frame_name,
apply_trans,
combine_transforms,
invert_transform,
)
from ...utils import _clean_names, logger, warn
from .constants import CTF
_ctf_to_fiff = {
CTF.CTFV_COIL_LPA: FIFF.FIFFV_POINT_LPA,
CTF.CTFV_COIL_RPA: FIFF.FIFFV_POINT_RPA,
CTF.CTFV_COIL_NAS: FIFF.FIFFV_POINT_NASION,
}
def _pick_isotrak_and_hpi_coils(res4, coils, t):
"""Pick the HPI coil locations given in device coordinates."""
if coils is None:
return list(), list()
dig = list()
hpi_result = dict(dig_points=list())
n_coil_dev = 0
n_coil_head = 0
for p in coils:
if p["valid"]:
if p["kind"] in [CTF.CTFV_COIL_LPA, CTF.CTFV_COIL_RPA, CTF.CTFV_COIL_NAS]:
kind = FIFF.FIFFV_POINT_CARDINAL
ident = _ctf_to_fiff[p["kind"]]
else: # CTF.CTFV_COIL_SPARE
kind = FIFF.FIFFV_POINT_HPI
ident = p["kind"]
if p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_DEVICE:
if t is None or t["t_ctf_dev_dev"] is None:
raise RuntimeError(
"No coordinate transformation "
"available for HPI coil locations"
)
d = dict(
kind=kind,
ident=ident,
r=apply_trans(t["t_ctf_dev_dev"], p["r"]),
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
)
hpi_result["dig_points"].append(d)
n_coil_dev += 1
elif p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
if t is None or t["t_ctf_head_head"] is None:
raise RuntimeError(
"No coordinate transformation "
"available for (virtual) Polhemus data"
)
d = dict(
kind=kind,
ident=ident,
r=apply_trans(t["t_ctf_head_head"], p["r"]),
coord_frame=FIFF.FIFFV_COORD_HEAD,
)
dig.append(d)
n_coil_head += 1
if n_coil_head > 0:
logger.info(" Polhemus data for %d HPI coils added", n_coil_head)
if n_coil_dev > 0:
logger.info(
" Device coordinate locations for %d HPI coils added", n_coil_dev
)
return dig, [hpi_result]
def _convert_time(date_str, time_str):
"""Convert date and time strings to float time."""
if date_str == time_str == "":
date_str = "01/01/1970"
time_str = "00:00:00"
logger.info(
"No date or time found, setting to the start of the "
"POSIX epoch (1970/01/01 midnight)"
)
for fmt in ("%d/%m/%Y", "%d-%b-%Y", "%a, %b %d, %Y", "%Y/%m/%d"):
try:
date = strptime(date_str.strip(), fmt)
except ValueError:
pass
else:
break
else:
raise RuntimeError(
f"Illegal date: {date_str}.\nIf the language of the date does not "
"correspond to your local machine's language try to set the "
"locale to the language of the date string:\n"
'locale.setlocale(locale.LC_ALL, "en_US")'
)
for fmt in ("%H:%M:%S", "%H:%M"):
try:
time = strptime(time_str, fmt)
except ValueError:
pass
else:
break
else:
raise RuntimeError(f"Illegal time: {time_str}")
# MNE-C uses mktime which uses local time, but here we instead decouple
# conversion location from the process, and instead assume that the
# acquisition was in GMT. This will be wrong for most sites, but at least
# the value we obtain here won't depend on the geographical location
# that the file was converted.
res = timegm(
(
date.tm_year,
date.tm_mon,
date.tm_mday,
time.tm_hour,
time.tm_min,
time.tm_sec,
date.tm_wday,
date.tm_yday,
date.tm_isdst,
)
)
return res
def _get_plane_vectors(ez):
"""Get two orthogonal vectors orthogonal to ez (ez will be modified)."""
assert ez.shape == (3,)
ez_len = np.sqrt(np.sum(ez * ez))
if ez_len == 0:
raise RuntimeError("Zero length normal. Cannot proceed.")
if np.abs(ez_len - np.abs(ez[2])) < 1e-5: # ez already in z-direction
ex = np.array([1.0, 0.0, 0.0])
else:
ex = np.zeros(3)
if ez[1] < ez[2]:
ex[0 if ez[0] < ez[1] else 1] = 1.0
else:
ex[0 if ez[0] < ez[2] else 2] = 1.0
ez /= ez_len
ex -= np.dot(ez, ex) * ez
ex /= np.sqrt(np.sum(ex * ex))
ey = np.cross(ez, ex)
return ex, ey
def _at_origin(x):
"""Determine if a vector is at the origin."""
return np.sum(x * x) < 1e-8
def _check_comp_ch(cch, kind, desired=None):
if desired is None:
desired = cch["grad_order_no"]
if cch["grad_order_no"] != desired:
raise RuntimeError(
f"{kind} channel with inconsistent compensation "
f"grade {cch['grad_order_no']}, should be {desired}"
)
return desired
def _convert_channel_info(res4, t, use_eeg_pos):
"""Convert CTF channel information to fif format."""
nmeg = neeg = nstim = nmisc = nref = 0
chs = list()
this_comp = None
for k, cch in enumerate(res4["chs"]):
cal = float(1.0 / (cch["proper_gain"] * cch["qgain"]))
ch = dict(
scanno=k + 1,
range=1.0,
cal=cal,
loc=np.full(12, np.nan),
unit_mul=FIFF.FIFF_UNITM_NONE,
ch_name=cch["ch_name"][:15],
coil_type=FIFF.FIFFV_COIL_NONE,
)
del k
chs.append(ch)
# Create the channel position information
if cch["sensor_type_index"] in (
CTF.CTFV_REF_MAG_CH,
CTF.CTFV_REF_GRAD_CH,
CTF.CTFV_MEG_CH,
):
# Extra check for a valid MEG channel
if (
np.sum(cch["coil"]["pos"][0] ** 2) < 1e-6
or np.sum(cch["coil"]["norm"][0] ** 2) < 1e-6
):
nmisc += 1
ch.update(
logno=nmisc,
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
kind=FIFF.FIFFV_MISC_CH,
unit=FIFF.FIFF_UNIT_V,
)
text = "MEG"
if cch["sensor_type_index"] != CTF.CTFV_MEG_CH:
text += " ref"
warn(
f"{text} channel {ch['ch_name']} did not have position "
"assigned, so it was changed to a MISC channel"
)
continue
ch["unit"] = FIFF.FIFF_UNIT_T
# Set up the local coordinate frame
r0 = cch["coil"]["pos"][0].copy()
ez = cch["coil"]["norm"][0].copy()
# It turns out that positive proper_gain requires swapping
# of the normal direction
if cch["proper_gain"] > 0.0:
ez *= -1
# Check how the other vectors should be defined
off_diag = False
# Default: ex and ey are arbitrary in the plane normal to ez
if cch["sensor_type_index"] == CTF.CTFV_REF_GRAD_CH:
# The off-diagonal gradiometers are an exception:
#
# We use the same convention for ex as for Neuromag planar
# gradiometers: ex pointing in the positive gradient direction
diff = cch["coil"]["pos"][0] - cch["coil"]["pos"][1]
size = np.sqrt(np.sum(diff * diff))
if size > 0.0:
diff /= size
# Is ez normal to the line joining the coils?
if np.abs(np.dot(diff, ez)) < 1e-3:
off_diag = True
# Handle the off-diagonal gradiometer coordinate system
r0 -= size * diff / 2.0
ex = diff
ey = np.cross(ez, ex)
else:
ex, ey = _get_plane_vectors(ez)
else:
ex, ey = _get_plane_vectors(ez)
# Transform into a Neuromag-like device coordinate system
ch["loc"] = np.concatenate(
[
apply_trans(t["t_ctf_dev_dev"], r0),
apply_trans(t["t_ctf_dev_dev"], ex, move=False),
apply_trans(t["t_ctf_dev_dev"], ey, move=False),
apply_trans(t["t_ctf_dev_dev"], ez, move=False),
]
)
del r0, ex, ey, ez
# Set the coil type
if cch["sensor_type_index"] == CTF.CTFV_REF_MAG_CH:
ch["kind"] = FIFF.FIFFV_REF_MEG_CH
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_REF_MAG
nref += 1
ch["logno"] = nref
elif cch["sensor_type_index"] == CTF.CTFV_REF_GRAD_CH:
ch["kind"] = FIFF.FIFFV_REF_MEG_CH
if off_diag:
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_OFFDIAG_REF_GRAD
else:
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_REF_GRAD
nref += 1
ch["logno"] = nref
else:
this_comp = _check_comp_ch(cch, "Gradiometer", this_comp)
ch["kind"] = FIFF.FIFFV_MEG_CH
ch["coil_type"] = FIFF.FIFFV_COIL_CTF_GRAD
nmeg += 1
ch["logno"] = nmeg
# Encode the software gradiometer order
ch["coil_type"] = int(ch["coil_type"] | (cch["grad_order_no"] << 16))
ch["coord_frame"] = FIFF.FIFFV_COORD_DEVICE
elif cch["sensor_type_index"] == CTF.CTFV_EEG_CH:
coord_frame = FIFF.FIFFV_COORD_HEAD
if use_eeg_pos:
# EEG electrode coordinates may be present but in the
# CTF head frame
ch["loc"][:3] = cch["coil"]["pos"][0]
if not _at_origin(ch["loc"][:3]):
if t["t_ctf_head_head"] is None:
warn(
f"EEG electrode ({ch['ch_name']}) location omitted because "
"of missing HPI information"
)
ch["loc"].fill(np.nan)
coord_frame = FIFF.FIFFV_MNE_COORD_CTF_HEAD
else:
ch["loc"][:3] = apply_trans(t["t_ctf_head_head"], ch["loc"][:3])
neeg += 1
ch.update(
logno=neeg,
kind=FIFF.FIFFV_EEG_CH,
unit=FIFF.FIFF_UNIT_V,
coord_frame=coord_frame,
coil_type=FIFF.FIFFV_COIL_EEG,
)
elif cch["sensor_type_index"] == CTF.CTFV_STIM_CH:
nstim += 1
ch.update(
logno=nstim,
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
kind=FIFF.FIFFV_STIM_CH,
unit=FIFF.FIFF_UNIT_V,
)
else:
nmisc += 1
ch.update(
logno=nmisc,
coord_frame=FIFF.FIFFV_COORD_UNKNOWN,
kind=FIFF.FIFFV_MISC_CH,
unit=FIFF.FIFF_UNIT_V,
)
return chs
def _comp_sort_keys(c):
"""Sort the compensation data."""
return (int(c["coeff_type"]), int(c["scanno"]))
def _check_comp(comp):
"""Check that conversion to named matrices is possible."""
ref_sens = None
kind = -1
for k, c_k in enumerate(comp):
if c_k["coeff_type"] != kind:
c_ref = c_k
ref_sens = c_ref["sensors"]
kind = c_k["coeff_type"]
elif not c_k["sensors"] == ref_sens:
raise RuntimeError("Cannot use an uneven compensation matrix")
def _conv_comp(comp, first, last, chs):
"""Add a new converted compensation data item."""
ch_names = [c["ch_name"] for c in chs]
n_col = comp[first]["ncoeff"]
col_names = comp[first]["sensors"][:n_col]
row_names = [comp[p]["sensor_name"] for p in range(first, last + 1)]
mask = np.isin(col_names, ch_names) # missing channels excluded
col_names = np.array(col_names)[mask].tolist()
n_col = len(col_names)
n_row = len(row_names)
ccomp = dict(ctfkind=comp[first]["coeff_type"], save_calibrated=False)
_add_kind(ccomp)
data = np.empty((n_row, n_col))
for ii, coeffs in enumerate(comp[first : last + 1]):
# Pick the elements to the matrix
data[ii, :] = coeffs["coeffs"][mask]
ccomp["data"] = dict(
row_names=row_names,
col_names=col_names,
data=data,
nrow=len(row_names),
ncol=len(col_names),
)
mk = ("proper_gain", "qgain")
_calibrate_comp(ccomp, chs, row_names, col_names, mult_keys=mk, flip=True)
return ccomp
def _convert_comp_data(res4):
"""Convert the compensation data into named matrices."""
if res4["ncomp"] == 0:
return
# Sort the coefficients in our favorite order
res4["comp"] = sorted(res4["comp"], key=_comp_sort_keys)
# Check that all items for a given compensation type have the correct
# number of channels
_check_comp(res4["comp"])
# Create named matrices
first = 0
kind = -1
comps = list()
for k in range(len(res4["comp"])):
if res4["comp"][k]["coeff_type"] != kind:
if k > 0:
comps.append(_conv_comp(res4["comp"], first, k - 1, res4["chs"]))
kind = res4["comp"][k]["coeff_type"]
first = k
comps.append(_conv_comp(res4["comp"], first, k, res4["chs"]))
return comps
def _pick_eeg_pos(c):
"""Pick EEG positions."""
eeg = dict(
coord_frame=FIFF.FIFFV_COORD_HEAD,
assign_to_chs=False,
labels=list(),
ids=list(),
rr=list(),
kinds=list(),
np=0,
)
for ch in c["chs"]:
if ch["kind"] == FIFF.FIFFV_EEG_CH and not _at_origin(ch["loc"][:3]):
eeg["labels"].append(ch["ch_name"])
eeg["ids"].append(ch["logno"])
eeg["rr"].append(ch["loc"][:3])
eeg["kinds"].append(FIFF.FIFFV_POINT_EEG)
eeg["np"] += 1
if eeg["np"] == 0:
return None
logger.info("Picked positions of %d EEG channels from channel info", eeg["np"])
return eeg
def _add_eeg_pos(eeg, t, c):
"""Pick the (virtual) EEG position data."""
if eeg is None:
return
if t is None or t["t_ctf_head_head"] is None:
raise RuntimeError(
"No coordinate transformation available for EEG position data"
)
eeg_assigned = 0
if eeg["assign_to_chs"]:
for k in range(eeg["np"]):
# Look for a channel name match
for ch in c["chs"]:
if ch["ch_name"].lower() == eeg["labels"][k].lower():
r0 = ch["loc"][:3]
r0[:] = eeg["rr"][k]
if eeg["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
r0[:] = apply_trans(t["t_ctf_head_head"], r0)
elif eeg["coord_frame"] != FIFF.FIFFV_COORD_HEAD:
raise RuntimeError(
"Illegal coordinate frame for EEG electrode "
f"positions : {_coord_frame_name(eeg['coord_frame'])}"
)
# Use the logical channel number as an identifier
eeg["ids"][k] = ch["logno"]
eeg["kinds"][k] = FIFF.FIFFV_POINT_EEG
eeg_assigned += 1
break
# Add these to the Polhemus data
fid_count = eeg_count = extra_count = 0
for k in range(eeg["np"]):
d = dict(
r=eeg["rr"][k].copy(),
kind=eeg["kinds"][k],
ident=eeg["ids"][k],
coord_frame=FIFF.FIFFV_COORD_HEAD,
)
c["dig"].append(d)
if eeg["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
d["r"] = apply_trans(t["t_ctf_head_head"], d["r"])
elif eeg["coord_frame"] != FIFF.FIFFV_COORD_HEAD:
raise RuntimeError(
"Illegal coordinate frame for EEG electrode positions: "
+ _coord_frame_name(eeg["coord_frame"])
)
if eeg["kinds"][k] == FIFF.FIFFV_POINT_CARDINAL:
fid_count += 1
elif eeg["kinds"][k] == FIFF.FIFFV_POINT_EEG:
eeg_count += 1
else:
extra_count += 1
if eeg_assigned > 0:
logger.info(
" %d EEG electrode locations assigned to channel info.", eeg_assigned
)
for count, kind in zip(
(fid_count, eeg_count, extra_count),
("fiducials", "EEG locations", "extra points"),
):
if count > 0:
logger.info(" %d %s added to Polhemus data.", count, kind)
_filt_map = {CTF.CTFV_FILTER_LOWPASS: "lowpass", CTF.CTFV_FILTER_HIGHPASS: "highpass"}
def _compose_meas_info(res4, coils, trans, eeg):
"""Create meas info from CTF data."""
info = _empty_info(res4["sfreq"])
# Collect all the necessary data from the structures read
info["meas_id"] = get_new_file_id()
info["meas_id"]["usecs"] = 0
info["meas_id"]["secs"] = _convert_time(res4["data_date"], res4["data_time"])
info["meas_date"] = (info["meas_id"]["secs"], info["meas_id"]["usecs"])
info["experimenter"] = res4["nf_operator"]
info["subject_info"] = dict(his_id=res4["nf_subject_id"])
for filt in res4["filters"]:
if filt["type"] in _filt_map:
info[_filt_map[filt["type"]]] = filt["freq"]
info["dig"], info["hpi_results"] = _pick_isotrak_and_hpi_coils(res4, coils, trans)
if trans is not None:
if len(info["hpi_results"]) > 0:
info["hpi_results"][0]["coord_trans"] = trans["t_ctf_head_head"]
if trans["t_dev_head"] is not None:
info["dev_head_t"] = trans["t_dev_head"]
info["dev_ctf_t"] = combine_transforms(
trans["t_dev_head"],
invert_transform(trans["t_ctf_head_head"]),
FIFF.FIFFV_COORD_DEVICE,
FIFF.FIFFV_MNE_COORD_CTF_HEAD,
)
if trans["t_ctf_head_head"] is not None:
info["ctf_head_t"] = trans["t_ctf_head_head"]
info["chs"] = _convert_channel_info(res4, trans, eeg is None)
info["comps"] = _convert_comp_data(res4)
if eeg is None:
# Pick EEG locations from chan info if not read from a separate file
eeg = _pick_eeg_pos(info)
_add_eeg_pos(eeg, trans, info)
logger.info(" Measurement info composed.")
info._unlocked = False
info._update_redundant()
return info
def _read_bad_chans(directory, info):
"""Read Bad channel list and match to internal names."""
fname = op.join(directory, "BadChannels")
if not op.exists(fname):
return []
mapping = dict(zip(_clean_names(info["ch_names"]), info["ch_names"]))
with open(fname) as fid:
bad_chans = [mapping[f.strip()] for f in fid.readlines()]
return bad_chans
def _annotate_bad_segments(directory, start_time, meas_date):
fname = op.join(directory, "bad.segments")
if not op.exists(fname):
return None
# read in bad segment file
onsets = []
durations = []
desc = []
with open(fname) as fid:
for f in fid.readlines():
tmp = f.strip().split()
desc.append(f"bad_{tmp[0]}")
onsets.append(np.float64(tmp[1]) - start_time)
durations.append(np.float64(tmp[2]) - np.float64(tmp[1]))
# return None if there are no bad segments
if len(onsets) == 0:
return None
return Annotations(onsets, durations, desc, meas_date)

89
mne/io/ctf/markers.py Normal file
View File

@@ -0,0 +1,89 @@
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import os.path as op
from io import BytesIO
import numpy as np
from ...annotations import Annotations
from .info import _convert_time
from .res4 import _read_res4
def _get_markers(fname):
def consume(fid, predicate): # just a consumer to move around conveniently
while predicate(fid.readline()):
pass
def parse_marker(string): # XXX: there should be a nicer way to do that
data = np.genfromtxt(
BytesIO(string.encode()), dtype=[("trial", int), ("sync", float)]
)
return int(data["trial"]), float(data["sync"])
markers = dict()
with open(fname) as fid:
consume(fid, lambda line: not line.startswith("NUMBER OF MARKERS:"))
num_of_markers = int(fid.readline())
for _ in range(num_of_markers):
consume(fid, lambda line: not line.startswith("NAME:"))
label = fid.readline().strip("\n")
consume(fid, lambda line: not line.startswith("NUMBER OF SAMPLES:"))
n_markers = int(fid.readline())
consume(fid, lambda line: not line.startswith("LIST OF SAMPLES:"))
next(fid) # skip the samples header
markers[label] = [parse_marker(next(fid)) for _ in range(n_markers)]
return markers
def _get_res4_info_needed_by_markers(directory):
"""Get required information from CTF res4 information file."""
# we only need a few values from res4. Maybe we can read them directly
# instead of parsing the entire res4 file.
res4 = _read_res4(directory)
total_offset_duration = res4["pre_trig_pts"] / res4["sfreq"]
trial_duration = res4["nsamp"] / res4["sfreq"]
meas_date = (_convert_time(res4["data_date"], res4["data_time"]), 0)
return total_offset_duration, trial_duration, meas_date
def _read_annotations_ctf(directory):
total_offset, trial_duration, meas_date = _get_res4_info_needed_by_markers(
directory
)
return _read_annotations_ctf_call(
directory, total_offset, trial_duration, meas_date
)
def _read_annotations_ctf_call(directory, total_offset, trial_duration, meas_date):
fname = op.join(directory, "MarkerFile.mrk")
if not op.exists(fname):
return Annotations(list(), list(), list(), orig_time=meas_date)
else:
markers = _get_markers(fname)
onset = [
synctime + (trialnum * trial_duration) + total_offset
for _, m in markers.items()
for (trialnum, synctime) in m
]
description = np.concatenate(
[np.repeat(label, len(m)) for label, m in markers.items()]
)
return Annotations(
onset=onset,
duration=np.zeros_like(onset),
description=description,
orig_time=meas_date,
)

232
mne/io/ctf/res4.py Normal file
View File

@@ -0,0 +1,232 @@
"""Read .res4 files."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import os.path as op
import numpy as np
from ...utils import logger
from .constants import CTF
def _make_ctf_name(directory, extra, raise_error=True):
"""Make a CTF name."""
fname = op.join(directory, op.basename(directory)[:-3] + "." + extra)
found = True
if not op.isfile(fname):
if raise_error:
raise OSError(f"Standard file {fname} not found")
found = False
return fname, found
def _read_double(fid, n=1):
"""Read a double."""
return np.fromfile(fid, ">f8", n)
def _read_string(fid, n_bytes, decode=True):
"""Read string."""
s0 = fid.read(n_bytes)
s = s0.split(b"\x00")[0]
return s.decode("utf-8") if decode else s
def _read_ustring(fid, n_bytes):
"""Read unsigned character string."""
return np.fromfile(fid, ">B", n_bytes)
def _read_int2(fid):
"""Read int from short."""
return _auto_cast(np.fromfile(fid, ">i2", 1)[0])
def _read_int(fid):
"""Read a 32-bit integer."""
return np.fromfile(fid, ">i4", 1)[0]
def _move_to_next(fid, byte=8):
"""Move to next byte boundary."""
now = fid.tell()
if now % byte != 0:
now = now - (now % byte) + byte
fid.seek(now, 0)
def _read_filter(fid):
"""Read filter information."""
f = dict()
f["freq"] = _read_double(fid)[0]
f["class"] = _read_int(fid)
f["type"] = _read_int(fid)
f["npar"] = _read_int2(fid)
f["pars"] = _read_double(fid, f["npar"])
return f
def _read_comp_coeff(fid, d):
"""Read compensation coefficients."""
# Read the coefficients and initialize
d["ncomp"] = _read_int2(fid)
d["comp"] = list()
# Read each record
dt = np.dtype(
[
("sensor_name", "S32"),
("coeff_type", ">i4"),
("d0", ">i4"),
("ncoeff", ">i2"),
("sensors", f"S{CTF.CTFV_SENSOR_LABEL}", CTF.CTFV_MAX_BALANCING),
("coeffs", ">f8", CTF.CTFV_MAX_BALANCING),
]
)
comps = np.fromfile(fid, dt, d["ncomp"])
for k in range(d["ncomp"]):
comp = dict()
d["comp"].append(comp)
comp["sensor_name"] = comps["sensor_name"][k].split(b"\x00")[0].decode("utf-8")
comp["coeff_type"] = comps["coeff_type"][k].item()
comp["ncoeff"] = comps["ncoeff"][k].item()
comp["sensors"] = [
s.split(b"\x00")[0].decode("utf-8")
for s in comps["sensors"][k][: comp["ncoeff"]]
]
comp["coeffs"] = comps["coeffs"][k][: comp["ncoeff"]]
comp["scanno"] = d["ch_names"].index(comp["sensor_name"])
def _read_res4(dsdir):
"""Read the magical res4 file."""
# adapted from read_res4.c
name, _ = _make_ctf_name(dsdir, "res4")
res = dict()
with open(name, "rb") as fid:
# Read the fields
res["head"] = _read_string(fid, 8)
res["appname"] = _read_string(fid, 256)
res["origin"] = _read_string(fid, 256)
res["desc"] = _read_string(fid, 256)
res["nave"] = _read_int2(fid)
res["data_time"] = _read_string(fid, 255)
res["data_date"] = _read_string(fid, 255)
# Seems that date and time can be swapped
# (are they entered manually?!)
if "/" in res["data_time"] and ":" in res["data_date"]:
data_date = res["data_date"]
res["data_date"] = res["data_time"]
res["data_time"] = data_date
res["nsamp"] = _read_int(fid)
res["nchan"] = _read_int2(fid)
_move_to_next(fid, 8)
res["sfreq"] = _read_double(fid)[0]
res["epoch_time"] = _read_double(fid)[0]
res["no_trials"] = _read_int2(fid)
_move_to_next(fid, 4)
res["pre_trig_pts"] = _read_int(fid)
res["no_trials_done"] = _read_int2(fid)
res["no_trials_bst_message_windowlay"] = _read_int2(fid)
_move_to_next(fid, 4)
res["save_trials"] = _read_int(fid)
res["primary_trigger"] = fid.read(1)
res["secondary_trigger"] = [
fid.read(1) for k in range(CTF.CTFV_MAX_AVERAGE_BINS)
]
res["trigger_polarity_mask"] = fid.read(1)
res["trigger_mode"] = _read_int2(fid)
_move_to_next(fid, 4)
res["accept_reject"] = _read_int(fid)
res["run_time_bst_message_windowlay"] = _read_int2(fid)
_move_to_next(fid, 4)
res["zero_head"] = _read_int(fid)
_move_to_next(fid, 4)
res["artifact_mode"] = _read_int(fid)
_read_int(fid) # padding
res["nf_run_name"] = _read_string(fid, 32)
res["nf_run_title"] = _read_string(fid, 256)
res["nf_instruments"] = _read_string(fid, 32)
res["nf_collect_descriptor"] = _read_string(fid, 32)
res["nf_subject_id"] = _read_string(fid, 32)
res["nf_operator"] = _read_string(fid, 32)
if len(res["nf_operator"]) == 0:
res["nf_operator"] = None
res["nf_sensor_file_name"] = _read_ustring(fid, 60)
_move_to_next(fid, 4)
res["rdlen"] = _read_int(fid)
fid.seek(CTF.FUNNY_POS, 0)
if res["rdlen"] > 0:
res["run_desc"] = _read_string(fid, res["rdlen"])
# Filters
res["nfilt"] = _read_int2(fid)
res["filters"] = list()
for k in range(res["nfilt"]):
res["filters"].append(_read_filter(fid))
# Channel information (names, then data)
res["ch_names"] = list()
for k in range(res["nchan"]):
ch_name = _read_string(fid, 32)
res["ch_names"].append(ch_name)
_coil_dt = np.dtype(
[
("pos", ">f8", 3),
("d0", ">f8"),
("norm", ">f8", 3),
("d1", ">f8"),
("turns", ">i2"),
("d2", ">i4"),
("d3", ">i2"),
("area", ">f8"),
]
)
_ch_dt = np.dtype(
[
("sensor_type_index", ">i2"),
("original_run_no", ">i2"),
("coil_type", ">i4"),
("proper_gain", ">f8"),
("qgain", ">f8"),
("io_gain", ">f8"),
("io_offset", ">f8"),
("num_coils", ">i2"),
("grad_order_no", ">i2"),
("d0", ">i4"),
("coil", _coil_dt, CTF.CTFV_MAX_COILS),
("head_coil", _coil_dt, CTF.CTFV_MAX_COILS),
]
)
chs = np.fromfile(fid, _ch_dt, res["nchan"])
for coil in (chs["coil"], chs["head_coil"]):
coil["pos"] /= 100.0
coil["area"] *= 1e-4
# convert to dict
chs = [dict(zip(chs.dtype.names, x)) for x in chs]
for ch in chs:
for key, val in ch.items():
ch[key] = _auto_cast(val)
res["chs"] = chs
for k in range(res["nchan"]):
res["chs"][k]["ch_name"] = res["ch_names"][k]
# The compensation coefficients
_read_comp_coeff(fid, res)
logger.info(" res4 data read.")
return res
def _auto_cast(x):
# Upcast scalars
if isinstance(x, np.ScalarType):
if x.dtype.kind == "i":
if x.dtype != np.int64:
x = x.astype(np.int64)
elif x.dtype.kind == "f":
if x.dtype != np.float64:
x = x.astype(np.float64)
return x

132
mne/io/ctf/trans.py Normal file
View File

@@ -0,0 +1,132 @@
"""Create coordinate transforms."""
# Authors: The MNE-Python contributors.
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.
import numpy as np
from ..._fiff.constants import FIFF
from ...transforms import (
Transform,
_fit_matched_points,
_quat_to_affine,
apply_trans,
combine_transforms,
get_ras_to_neuromag_trans,
invert_transform,
)
from ...utils import logger
from .constants import CTF
def _make_transform_card(fro, to, r_lpa, r_nasion, r_rpa):
"""Make a transform from cardinal landmarks."""
return invert_transform(
Transform(to, fro, get_ras_to_neuromag_trans(r_nasion, r_lpa, r_rpa))
)
def _quaternion_align(from_frame, to_frame, from_pts, to_pts, diff_tol=1e-4):
"""Perform an alignment using the unit quaternions (modifies points)."""
assert from_pts.shape[1] == to_pts.shape[1] == 3
trans = _quat_to_affine(_fit_matched_points(from_pts, to_pts)[0])
# Test the transformation and print the results
logger.info(" Quaternion matching (desired vs. transformed):")
for fro, to in zip(from_pts, to_pts):
rr = apply_trans(trans, fro)
diff = np.linalg.norm(to - rr)
logger.info(
" %7.2f %7.2f %7.2f mm <-> %7.2f %7.2f %7.2f mm "
"(orig : %7.2f %7.2f %7.2f mm) diff = %8.3f mm"
% (tuple(1000 * to) + tuple(1000 * rr) + tuple(1000 * fro) + (1000 * diff,))
)
if diff > diff_tol:
raise RuntimeError(
"Something is wrong: quaternion matching did not work (see above)"
)
return Transform(from_frame, to_frame, trans)
def _make_ctf_coord_trans_set(res4, coils):
"""Figure out the necessary coordinate transforms."""
# CTF head > Neuromag head
lpa = rpa = nas = T1 = T2 = T3 = T5 = None
if coils is not None:
for p in coils:
if p["valid"] and (p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD):
if lpa is None and p["kind"] == CTF.CTFV_COIL_LPA:
lpa = p
elif rpa is None and p["kind"] == CTF.CTFV_COIL_RPA:
rpa = p
elif nas is None and p["kind"] == CTF.CTFV_COIL_NAS:
nas = p
if lpa is None or rpa is None or nas is None:
raise RuntimeError(
"Some of the mandatory HPI device-coordinate info was not there."
)
t = _make_transform_card("head", "ctf_head", lpa["r"], nas["r"], rpa["r"])
T3 = invert_transform(t)
# CTF device -> Neuromag device
#
# Rotate the CTF coordinate frame by 45 degrees and shift by 190 mm
# in z direction to get a coordinate system comparable to the Neuromag one
#
R = np.eye(4)
R[:3, 3] = [0.0, 0.0, 0.19]
val = 0.5 * np.sqrt(2.0)
R[0, 0] = val
R[0, 1] = -val
R[1, 0] = val
R[1, 1] = val
T4 = Transform("ctf_meg", "meg", R)
# CTF device -> CTF head
# We need to make the implicit transform explicit!
h_pts = dict()
d_pts = dict()
kinds = (
CTF.CTFV_COIL_LPA,
CTF.CTFV_COIL_RPA,
CTF.CTFV_COIL_NAS,
CTF.CTFV_COIL_SPARE,
)
if coils is not None:
for p in coils:
if p["valid"]:
if p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_HEAD:
for kind in kinds:
if kind not in h_pts and p["kind"] == kind:
h_pts[kind] = p["r"]
elif p["coord_frame"] == FIFF.FIFFV_MNE_COORD_CTF_DEVICE:
for kind in kinds:
if kind not in d_pts and p["kind"] == kind:
d_pts[kind] = p["r"]
if any(kind not in h_pts for kind in kinds[:-1]):
raise RuntimeError(
"Some of the mandatory HPI device-coordinate info was not there."
)
if any(kind not in d_pts for kind in kinds[:-1]):
raise RuntimeError(
"Some of the mandatory HPI head-coordinate info was not there."
)
use_kinds = [kind for kind in kinds if (kind in h_pts and kind in d_pts)]
r_head = np.array([h_pts[kind] for kind in use_kinds])
r_dev = np.array([d_pts[kind] for kind in use_kinds])
T2 = _quaternion_align("ctf_meg", "ctf_head", r_dev, r_head)
# The final missing transform
if T3 is not None and T2 is not None:
T5 = combine_transforms(T2, T3, "ctf_meg", "head")
T1 = combine_transforms(invert_transform(T4), T5, "meg", "head")
s = dict(
t_dev_head=T1,
t_ctf_dev_ctf_head=T2,
t_ctf_head_head=T3,
t_ctf_dev_dev=T4,
t_ctf_dev_head=T5,
)
logger.info(" Coordinate transformations established.")
return s