improvements

This commit is contained in:
2026-01-14 23:54:03 -08:00
parent 473c945563
commit fe4e8904b4
8 changed files with 225 additions and 186 deletions

307
flares.py
View File

@@ -21,6 +21,7 @@ import os.path as op
import re
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from queue import Empty
# External library imports
import matplotlib.pyplot as plt
@@ -53,7 +54,7 @@ from scipy.signal import welch, butter, filtfilt # type: ignore
import pywt # type: ignore
import neurokit2 as nk # type: ignore
# Backen visualization needed to be defined for pyinstaller
# Backend visualization needed to be defined for pyinstaller
import pyvistaqt # type: ignore
import vtkmodules.util.data_model
import vtkmodules.util.execution_model
@@ -89,9 +90,10 @@ from mne_nirs.io.fold import fold_channel_specificity # type: ignore
from mne_nirs.preprocessing import peak_power # type: ignore
from mne_nirs.statistics._glm_level_first import RegressionResults # type: ignore
# Needs to be set for MNE
os.environ["SUBJECTS_DIR"] = str(data_path()) + "/subjects" # type: ignore
# TODO: Tidy this up
FIXED_CATEGORY_COLORS = {
"SCI only": "skyblue",
"PSP only": "salmon",
@@ -112,10 +114,6 @@ FIXED_CATEGORY_COLORS = {
}
AGE: float
GENDER: str
# SECONDS_TO_STRIP: int
DOWNSAMPLE: bool
DOWNSAMPLE_FREQUENCY: int
@@ -123,21 +121,37 @@ TRIM: bool
SECONDS_TO_KEEP: float
OPTODE_PLACEMENT: bool
SHOW_OPTODE_NAMES: bool
HEART_RATE: bool
SHORT_CHANNEL: bool
SHORT_CHANNEL_THRESH: float
LONG_CHANNEL_THRESH: float
HEART_RATE: bool
SECONDS_TO_STRIP_HR: int
MAX_LOW_HR: int
MAX_HIGH_HR: int
SMOOTHING_WINDOW_HR: int
HEART_RATE_WINDOW: int
SCI: bool
SCI_TIME_WINDOW: int
SCI_THRESHOLD: float
SNR: bool
# SNR_TIME_WINDOW : int
# SNR_TIME_WINDOW : int #TODO: is this needed?
SNR_THRESHOLD: float
PSP: bool
PSP_TIME_WINDOW: int
PSP_THRESHOLD: float
BAD_CHANNELS_HANDLING: str
MAX_DIST: float
MIN_NEIGHBORS: int
TDDR: bool
WAVELET: bool
@@ -145,57 +159,39 @@ IQR: float
WAVELET_TYPE: str
WAVELET_LEVEL: int
HEART_RATE = True # True if heart rate should be calculated. This helps the SCI, PSP, and SNR methods to be more accurate.
SECONDS_TO_STRIP_HR =5 # Amount of seconds to temporarily strip from the data to calculate heart rate more effectively. Useful if participant removed cap while still recording.
MAX_LOW_HR = 40 # Any heart rate values lower than this will be set to this value.
MAX_HIGH_HR = 200 # Any heart rate values higher than this will be set to this value.
SMOOTHING_WINDOW_HR = 100 # Heart rate will be calculated as a rolling average over this many amount of samples.
HEART_RATE_WINDOW = 25 # Amount of BPM above and below the calculated average to use for a range of resting BPM.
ENHANCE_NEGATIVE_CORRELATION: bool
FILTER: bool
L_FREQ: float
H_FREQ: float
L_TRANS_BANDWIDTH: float
H_TRANS_BANDWIDTH: float
SHORT_CHANNEL: bool
SHORT_CHANNEL_THRESH: float
LONG_CHANNEL_THRESH: float
STIM_DUR: float
HRF_MODEL: str
DRIFT_MODEL: str
HIGH_PASS: float
DRIFT_ORDER: int
FIR_DELAYS: range
MIN_ONSET: int
OVERSAMPLING: int
REMOVE_EVENTS: list
SHORT_CHANNEL_REGRESSION: bool
NOISE_MODEL: str
BINS: int
N_JOBS: int
TIME_WINDOW_START: int
TIME_WINDOW_END: int
DRIFT_MODEL: str
VERBOSITY = True
# FIXME: Shouldn't need each ordering - just order it before checking
FIXED_CATEGORY_COLORS = {
"SCI only": "skyblue",
"PSP only": "salmon",
"SNR only": "lightgreen",
"PSP + SCI": "orange",
"SCI + SNR": "violet",
"PSP + SNR": "gold",
"SCI + PSP": "orange",
"SNR + SCI": "violet",
"SNR + PSP": "gold",
"PSP + SNR + SCI": "gray",
"SCI + PSP + SNR": "gray",
"SCI + SNR + PSP": "gray",
"PSP + SCI + SNR": "gray",
"PSP + SNR + SCI": "gray",
"SNR + SCI + PSP": "gray",
"SNR + PSP + SCI": "gray",
}
AGE = 25
AGE = 25 # Assume 25 if not set from the GUI. This will result in a reasonable PPF
GENDER = ""
GROUP = "Default"
# These are parameters that are required for the analysis
REQUIRED_KEYS: dict[str, Any] = {
# "SECONDS_TO_STRIP": int,
@@ -262,7 +258,7 @@ PLATFORM_NAME = platform.system().lower()
# Configure logging to file with timestamps and realtime flush
if PLATFORM_NAME == 'darwin':
logging.basicConfig(
filename=os.path.join(os.path.dirname(sys.executable), "../../../fnirs_analysis.log"),
filename=os.path.join(os.path.dirname(sys.executable), "../../../fnirs_analysis.log"), # Needed to get out of the bundled application
level=logging.INFO,
format='%(asctime)s - %(processName)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
@@ -320,8 +316,6 @@ def set_metadata(file_path, metadata: dict[str, Any]) -> None:
val = file_metadata.get(key, None)
if val not in (None, '', [], {}, ()): # check for "empty" values
globals()[key] = val
from queue import Empty # This works with multiprocessing.Manager().Queue()
def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) -> None:
def forward_progress():
@@ -825,7 +819,7 @@ def get_hbo_hbr_picks(raw):
return hbo_picks, hbr_picks, hbo_wl, hbr_wl
def interpolate_fNIRS_bads_weighted_average(raw, bad_channels, max_dist=0.03, min_neighbors=2):
def interpolate_fNIRS_bads_weighted_average(raw, max_dist=0.03, min_neighbors=2):
"""
Interpolate bad fNIRS channels using a distance-weighted average of nearby good channels.
@@ -1117,17 +1111,17 @@ def mark_bads(raw, bad_sci, bad_snr, bad_psp):
def filter_the_data(raw_haemo):
# --- STEP 5: Filtering (0.010.2 Hz bandpass) ---
# --- STEP 5: Filtering (0.01-0.2 Hz bandpass) ---
fig_filter = raw_haemo.compute_psd(fmax=3).plot(
average=True, color="r", show=False, amplitude=True
)
if L_FREQ == 0 and H_FREQ != 0:
raw_haemo = raw_haemo.filter(l_freq=None, h_freq=H_FREQ, h_trans_bandwidth=0.02)
raw_haemo = raw_haemo.filter(l_freq=None, h_freq=H_FREQ, h_trans_bandwidth=H_TRANS_BANDWIDTH)
elif L_FREQ != 0 and H_FREQ == 0:
raw_haemo = raw_haemo.filter(l_freq=L_FREQ, h_freq=None, l_trans_bandwidth=0.002)
raw_haemo = raw_haemo.filter(l_freq=L_FREQ, h_freq=None, l_trans_bandwidth=L_TRANS_BANDWIDTH)
elif L_FREQ != 0 and H_FREQ != 0:
raw_haemo = raw_haemo.filter(l_freq=L_FREQ, h_freq=H_FREQ, l_trans_bandwidth=0.002, h_trans_bandwidth=0.02)
raw_haemo = raw_haemo.filter(l_freq=L_FREQ, h_freq=H_FREQ, l_trans_bandwidth=L_TRANS_BANDWIDTH, h_trans_bandwidth=H_TRANS_BANDWIDTH)
else:
print("No filter")
#raw_haemo = raw_haemo.filter(l_freq=None, h_freq=0.4, h_trans_bandwidth=0.2)
@@ -1307,6 +1301,19 @@ def epochs_calculations(raw_haemo, events, event_dict):
def make_design_matrix(raw_haemo, short_chans):
events_to_remove = REMOVE_EVENTS
filtered_annotations = [ann for ann in raw_haemo.annotations if ann['description'] not in events_to_remove]
new_annot = Annotations(
onset=[ann['onset'] for ann in filtered_annotations],
duration=[ann['duration'] for ann in filtered_annotations],
description=[ann['description'] for ann in filtered_annotations]
)
# Set the new annotations
raw_haemo.set_annotations(new_annot)
raw_haemo.resample(1, npad="auto")
raw_haemo._data = raw_haemo._data * 1e6
# 2) Create design matrix
@@ -1314,26 +1321,28 @@ def make_design_matrix(raw_haemo, short_chans):
short_chans.resample(1)
design_matrix = make_first_level_design_matrix(
raw=raw_haemo,
hrf_model='fir',
stim_dur=0.5,
fir_delays=range(15),
stim_dur=STIM_DUR,
hrf_model=HRF_MODEL,
drift_model=DRIFT_MODEL,
high_pass=0.01,
oversampling=1,
min_onset=-125,
high_pass=HIGH_PASS,
drift_order=DRIFT_ORDER,
fir_delays=range(15),
add_regs=short_chans.get_data().T,
add_reg_names=short_chans.ch_names
add_reg_names=short_chans.ch_names,
min_onset=MIN_ONSET,
oversampling=OVERSAMPLING
)
else:
design_matrix = make_first_level_design_matrix(
raw=raw_haemo,
hrf_model='fir',
stim_dur=0.5,
fir_delays=range(15),
stim_dur=STIM_DUR,
hrf_model=HRF_MODEL,
drift_model=DRIFT_MODEL,
high_pass=0.01,
oversampling=1,
min_onset=-125,
high_pass=HIGH_PASS,
drift_order=DRIFT_ORDER,
fir_delays=range(15),
min_onset=MIN_ONSET,
oversampling=OVERSAMPLING
)
print(design_matrix.head())
@@ -3310,18 +3319,20 @@ def hr_calc(raw):
return fig, hr1, hr2, low, high
def process_participant(file_path, progress_callback=None):
fig_individual: dict[str, Figure] = {}
# Step 1: Load
# Step 1: Preprocessing
raw = load_snirf(file_path)
fig_raw = raw.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="Loaded Raw", show=False)
fig_individual["Loaded Raw"] = fig_raw
if progress_callback: progress_callback(1)
logger.info("1")
logger.info("Step 1 Completed.")
# Step 2: Trimming
if TRIM:
if hasattr(raw, 'annotations') and len(raw.annotations) > 0:
# Get time of first event
@@ -3329,17 +3340,16 @@ def process_participant(file_path, progress_callback=None):
trim_time = max(0, first_event_time - SECONDS_TO_KEEP) # Ensure we don't go negative
raw.crop(tmin=trim_time)
# Shift annotation onsets to match new t=0
import mne
ann = raw.annotations
ann_shifted = mne.Annotations(
ann_shifted = Annotations(
onset=ann.onset - trim_time, # shift to start at zero
duration=ann.duration,
description=ann.description
)
data = raw.get_data()
info = raw.info.copy()
raw = mne.io.RawArray(data, info)
raw = RawArray(data, info)
raw.set_annotations(ann_shifted)
logger.info(f"Trimmed raw data: start at {trim_time}s (5s before first event), t=0 at new start")
@@ -3349,169 +3359,162 @@ def process_participant(file_path, progress_callback=None):
fig_trimmed = raw.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="Trimmed Raw", show=False)
fig_individual["Trimmed Raw"] = fig_trimmed
if progress_callback: progress_callback(2)
logger.info("2")
logger.info("Step 2 Completed.")
# Step 1.5: Verify optode positions
# Step 3: Verify Optode Placement
if OPTODE_PLACEMENT:
fig_optodes = raw.plot_sensors(show_names=True, to_sphere=True, show=False) # type: ignore
fig_optodes = raw.plot_sensors(show_names=SHOW_OPTODE_NAMES, to_sphere=True, show=False) # type: ignore
fig_individual["Plot Sensors"] = fig_optodes
if progress_callback: progress_callback(3)
logger.info("3")
logger.info("Step 3 Completed.")
# Step 2: Bad from SCI
# Step 4: Short/Long Channels
if SHORT_CHANNEL:
short_chans = get_short_channels(raw, max_dist=SHORT_CHANNEL_THRESH)
fig_short_chans = short_chans.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="Short Channels Only", show=False)
fig_individual["short"] = fig_short_chans
else:
short_chans = None
get_long_channels(raw, min_dist=SHORT_CHANNEL_THRESH, max_dist=LONG_CHANNEL_THRESH) # Don't update the existing raw
if progress_callback: progress_callback(4)
logger.info("Step 4 Completed.")
# Step 5: Heart Rate
if HEART_RATE:
fig, hr1, hr2, low, high = hr_calc(raw)
fig_individual["PSD"] = fig
fig_individual['HeartRate_PSD'] = hr1
fig_individual['HeartRate_Time'] = hr2
if progress_callback: progress_callback(4)
logger.info("4")
if progress_callback: progress_callback(5)
logger.info("Step 5 Completed.")
# Step 6: Scalp Coupling Index
bad_sci = []
if SCI:
bad_sci, fig_sci_1, fig_sci_2 = calculate_scalp_coupling(raw, low, high)
if HEART_RATE:
bad_sci, fig_sci_1, fig_sci_2 = calculate_scalp_coupling(raw, low, high)
else:
bad_sci, fig_sci_1, fig_sci_2 = calculate_scalp_coupling(raw)
fig_individual["SCI1"] = fig_sci_1
fig_individual["SCI2"] = fig_sci_2
if progress_callback: progress_callback(5)
logger.info("5")
if progress_callback: progress_callback(6)
logger.info("Step 6 Completed.")
# Step 2: Bad from SNR
# Step 7: Signal to Noise Ratio
bad_snr = []
if SNR:
bad_snr, fig_snr = calculate_signal_noise_ratio(raw)
fig_individual["SNR1"] = fig_snr
if progress_callback: progress_callback(6)
logger.info("6")
if progress_callback: progress_callback(7)
logger.info("Step 7 Completed.")
# Step 3: Bad from PSP
# Step 8: Peak Spectral Power
bad_psp = []
if PSP:
bad_psp, fig_psp1, fig_psp2 = calculate_peak_power(raw)
fig_individual["PSP1"] = fig_psp1
fig_individual["PSP2"] = fig_psp2
if progress_callback: progress_callback(7)
logger.info("7")
# Step 4: Mark the bad channels
raw, fig_dropped, fig_raw_before, bad_channels = mark_bads(raw, bad_sci, bad_snr, bad_psp)
if fig_dropped and fig_raw_before is not None:
fig_individual["fig2"] = fig_dropped
fig_individual["fig3"] = fig_raw_before
if progress_callback: progress_callback(8)
logger.info("8")
logger.info("Step 8 Completed.")
# Step 9: Bad Channels Handling
if BAD_CHANNELS_HANDLING != "None":
raw, fig_dropped, fig_raw_before, bad_channels = mark_bads(raw, bad_sci, bad_snr, bad_psp)
if fig_dropped and fig_raw_before is not None:
fig_individual["fig2"] = fig_dropped
fig_individual["fig3"] = fig_raw_before
if bad_channels:
if BAD_CHANNELS_HANDLING == "Interpolate":
raw, fig_raw_after = interpolate_fNIRS_bads_weighted_average(raw, max_dist=MAX_DIST, min_neighbors=MIN_NEIGHBORS)
fig_individual["fig4"] = fig_raw_after
elif BAD_CHANNELS_HANDLING == "Remove":
pass
#TODO: Is there more needed here?
# Step 5: Interpolate the bad channels
if bad_channels:
raw, fig_raw_after = interpolate_fNIRS_bads_weighted_average(raw, bad_channels)
fig_individual["fig4"] = fig_raw_after
if progress_callback: progress_callback(9)
logger.info("9")
logger.info("Step 9 Completed.")
# Step 6: Optical Density
# Step 10: Optical Density
raw_od = optical_density(raw)
fig_raw_od = raw_od.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="Optical Density", show=False)
fig_individual["Optical Density"] = fig_raw_od
if progress_callback: progress_callback(10)
logger.info("10")
logger.info("Step 10 Completed.")
# Step 7: TDDR
# Step 11: Temporal Derivative Distribution Repair Filtering
if TDDR:
raw_od = temporal_derivative_distribution_repair(raw_od)
fig_raw_od_tddr = raw_od.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="After TDDR (Motion Correction)", show=False)
fig_individual["TDDR"] = fig_raw_od_tddr
if progress_callback: progress_callback(11)
logger.info("11")
logger.info("Step 11 Completed.")
# Step 12: Wavelet Filtering
if WAVELET:
raw_od, fig = calculate_and_apply_wavelet(raw_od)
fig_individual["Wavelet"] = fig
if progress_callback: progress_callback(12)
logger.info("12")
logger.info("Step 12 Completed.")
# Step 8: BLL
# Step 13: Haemoglobin Concentration
raw_haemo = beer_lambert_law(raw_od, ppf=calculate_dpf(file_path))
fig_raw_haemo_bll = raw_haemo.plot(duration=raw_haemo.times[-1], n_channels=raw_haemo.info['nchan'], title="HbO and HbR Signals", show=False)
fig_individual["BLL"] = fig_raw_haemo_bll
if progress_callback: progress_callback(13)
logger.info("13")
logger.info("Step 13 Completed.")
# Step 9: ENC
# Step 14: Enhance Negative Correlation
if ENHANCE_NEGATIVE_CORRELATION:
raw_haemo = enhance_negative_correlation(raw_haemo)
fig_raw_haemo_enc = raw_haemo.plot(duration=raw_haemo.times[-1], n_channels=raw_haemo.info['nchan'], title="HbO and HbR Signals", show=False)
fig_raw_haemo_enc = raw_haemo.plot(duration=raw_haemo.times[-1], n_channels=raw_haemo.info['nchan'], title="Enhance Negative Correlation", show=False)
fig_individual["ENC"] = fig_raw_haemo_enc
if progress_callback: progress_callback(14)
logger.info("14")
logger.info("Step 14 Completed.")
# Step 10: Filter
# Step 15: Filter
if FILTER:
raw_haemo, fig_filter, fig_raw_haemo_filter = filter_the_data(raw_haemo)
fig_individual["filter1"] = fig_filter
fig_individual["filter2"] = fig_raw_haemo_filter
if progress_callback: progress_callback(15)
logger.info("15")
logger.info("Step 15 Completed.")
# Step 11: Get short / long channels
if SHORT_CHANNEL:
short_chans = get_short_channels(raw_haemo, max_dist=SHORT_CHANNEL_THRESH)
fig_short_chans = short_chans.plot(duration=raw_haemo.times[-1], n_channels=raw_haemo.info['nchan'], title="Short Channels Only", show=False)
fig_individual["short"] = fig_short_chans
else:
short_chans = None
raw_haemo = get_long_channels(raw_haemo, min_dist=SHORT_CHANNEL_THRESH, max_dist=LONG_CHANNEL_THRESH)
if progress_callback: progress_callback(16)
logger.info("16")
# Step 12: Events from annotations
# Step 16: Extracting Events
events, event_dict = events_from_annotations(raw_haemo)
fig_events = plot_events(events, event_id=event_dict, sfreq=raw_haemo.info["sfreq"], show=False)
fig_individual["events"] = fig_events
if progress_callback: progress_callback(17)
logger.info("17")
if progress_callback: progress_callback(16)
logger.info("Step 16 Completed.")
# Step 13: Epoch calculations
# Step 17: Epoch Calculations
epochs, fig_epochs = epochs_calculations(raw_haemo, events, event_dict)
for name, fig in fig_epochs: # Unpack the tuple here
fig_individual[f"epochs_{name}"] = fig # Store only the figure, not the name
if progress_callback: progress_callback(18)
logger.info("18")
# Step 14: Design Matrix
events_to_remove = REMOVE_EVENTS
filtered_annotations = [ann for ann in raw.annotations if ann['description'] not in events_to_remove]
new_annot = Annotations(
onset=[ann['onset'] for ann in filtered_annotations],
duration=[ann['duration'] for ann in filtered_annotations],
description=[ann['description'] for ann in filtered_annotations]
)
# Set the new annotations
raw_haemo.set_annotations(new_annot)
for name, fig in fig_epochs:
fig_individual[f"epochs_{name}"] = fig
if progress_callback: progress_callback(17)
logger.info("Step 17 Completed.")
# Step 18: Design Matrix
design_matrix, fig_design_matrix = make_design_matrix(raw_haemo, short_chans)
fig_individual["Design Matrix"] = fig_design_matrix
if progress_callback: progress_callback(19)
logger.info("19")
if progress_callback: progress_callback(18)
logger.info("Step 18 Completed.")
# Step 15: Run GLM
glm_est = run_glm(raw_haemo, design_matrix)
# Step 19: Run GLM
glm_est = run_glm(raw_haemo, design_matrix, noise_model=NOISE_MODEL, bins=BINS, n_jobs=N_JOBS, verbose=VERBOSITY)
# Not used AppData\Local\Packages\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\LocalCache\local-packages\Python313\site-packages\nilearn\glm\contrasts.py
# Yes used AppData\Local\Packages\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\LocalCache\local-packages\Python313\site-packages\mne_nirs\utils\_io.py
# The p-value is calculated from this t-statistic using the Students t-distribution with appropriate degrees of freedom.
# The p-value is calculated from this t-statistic using the Student's t-distribution with appropriate degrees of freedom.
# p_value = 2 * stats.t.cdf(-abs(t_statistic), df)
# It is a two-tailed p-value.
# It says how likely it is to observe the effect you did (or something more extreme) if the true effect was zero (null hypothesis).
# A small p-value (e.g., < 0.05) suggests the effect is unlikely to be zero — its "statistically significant."
# A small p-value (e.g., < 0.05) suggests the effect is unlikely to be zero — it's "statistically significant."
# A large p-value means the data do not provide strong evidence that the effect is different from zero.
if progress_callback: progress_callback(20)
logger.info("20")
if progress_callback: progress_callback(19)
logger.info("19")
# Step 16: Plot GLM results
fig_glm_result = plot_glm_results(file_path, raw_haemo, glm_est, design_matrix)