From dd2ac058af5b2a9722d257187c0eb07c5ad51984 Mon Sep 17 00:00:00 2001 From: tyler Date: Fri, 30 Jan 2026 20:16:55 -0800 Subject: [PATCH] temp fc --- fc.py | 207 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 fc.py diff --git a/fc.py b/fc.py new file mode 100644 index 0000000..fc730fb --- /dev/null +++ b/fc.py @@ -0,0 +1,207 @@ +import mne +import numpy as np + +from mne.preprocessing.nirs import optical_density, beer_lambert_law +from mne_connectivity import spectral_connectivity_epochs +from mne_connectivity.viz import plot_connectivity_circle + +raw = mne.io.read_raw_snirf("E:/CVI_V_Adults_Cor/P21_CVI_V_updated.snirf", preload=True) + +raw.info["bads"] = [] # mark bad channels here if needed + +raw_od = optical_density(raw) +raw_hb = beer_lambert_law(raw_od) + +raw_hbo = raw_hb.copy().pick(picks="hbo") + +raw_hbo.filter( + l_freq=0.01, + h_freq=0.2, + picks="hbo", + verbose=False +) + +events = mne.make_fixed_length_events( + raw_hbo, + duration=30.0 +) + +epochs = mne.Epochs( + raw_hbo, + events, + tmin=0, + tmax=30.0, + baseline=None, + preload=True, + verbose=False +) + +data = epochs.get_data() # (n_epochs, n_channels, n_times) +names = epochs.ch_names +sfreq = epochs.info["sfreq"] + + +con = spectral_connectivity_epochs( + data, + method=["coh", "plv"], + mode="multitaper", + sfreq=sfreq, + fmin=0.04, + fmax=0.2, + faverage=True, + verbose=True +) + + +con_coh, con_plv = con + + +coh = con_coh.get_data(output="dense").squeeze() +plv = con_plv.get_data(output="dense").squeeze() + +np.fill_diagonal(coh, 0) +np.fill_diagonal(plv, 0) + +plot_connectivity_circle( + coh, + names, + title="fNIRS Functional Connectivity (HbO - Coherence)", + n_lines=40 +) + + + +from mne_connectivity import envelope_correlation +env = envelope_correlation( + data, + orthogonalize=False, + absolute=True +) +env_data = env.get_data(output="dense") + +env_corr = env_data.mean(axis=0) + +env_corr = np.squeeze(env_corr) + +np.fill_diagonal(env_corr, 0) + +plot_connectivity_circle( + env_corr, + epochs.ch_names, + title="fNIRS HbO Envelope Correlation (Task Connectivity)", + n_lines=40 +) + + +from mne_nirs.statistics import run_glm +from mne_nirs.experimental_design import make_first_level_design_matrix + +raw_hb.annotations.description = [ + f"Reach_{i}" if d == "Reach" else d + for i, d in enumerate(raw_hb.annotations.description) +] + +design_matrix = make_first_level_design_matrix( + raw_hb, + stim_dur=1.0, # We assume a short burst since duration is unknown + hrf_model='fir', # Finite Impulse Response + fir_delays=np.arange(0, 12) # Look at 0-20 seconds after onset +) + +# 2. Run the GLM +# This calculates the brain's response for every channel +glm_est = run_glm(raw_hb, design_matrix) + +import pandas as pd + +# 3. Extract Beta Weights +beta_df = glm_est.to_dataframe() + +print("\n--- DEBUG: Dataframe Info ---") +print(f"Total rows in beta_df: {len(beta_df)}") +print(f"Columns available: {list(beta_df.columns)}") +print(f"Unique Chroma values: {beta_df['Chroma'].unique()}") +print(f"First 5 unique Conditions: {beta_df['Condition'].unique()[:5]}") + +# FIX: Use .str.contains() because FIR conditions are named like 'Reach[5.0]' +# We filter for HbO AND any condition that starts with 'Reach' +hbo_betas = beta_df[(beta_df['Chroma'] == 'hbo') & + (beta_df['Condition'].str.contains('Reach'))] + +hbo_betas = hbo_betas.copy() +hbo_betas[['Trial', 'Delay']] = hbo_betas['Condition'].str.extract(r'(Reach_\d+)_delay_(\d+)') + +# 2. Find which DELAY (time point) is best across ALL trials +# We convert 'Delay' to numeric so we can sort them properly later if needed +hbo_betas['Delay'] = pd.to_numeric(hbo_betas['Delay']) + +# IMPORTANT: We ignore delay 0 and 1 because they are usually 0.0000 (stimulus onset) +# Brain responses in fNIRS usually peak between delays 4 and 8 (4-8 seconds) + +mask = (hbo_betas['Delay'] >= 4) & (hbo_betas['Delay'] <= 8) +hbo_window = hbo_betas[mask] + +if hbo_window.empty: + print("Warning: No data found in the 4-8s window. Check your 'fir_delays' range.") + # Fallback to whatever is available if 4-8 is missing + mean_by_delay = hbo_betas.groupby('Delay')['theta'].mean() +else: + mean_by_delay = hbo_window.groupby('Delay')['theta'].mean() + +peak_delay_num = mean_by_delay.idxmax() + +print(f"\n--- DEBUG: FIR Timing ---") +print(f"Delays analyzed: {list(mean_by_delay.index)}") +print(f"Peak brain response found at delay: {peak_delay_num}") + +# 3. Filter the data to ONLY include that peak delay across ALL trials +peak_df = hbo_betas[hbo_betas['Delay'] == peak_delay_num] + +mne_order = raw_hbo.ch_names + +# 4. Pivot: Rows = Trials (Reach_1, Reach_2...), Columns = Channels +beta_pivot = peak_df.pivot(index='Trial', columns='ch_name', values='theta') + +beta_pivot = beta_pivot.reindex(columns=mne_order) + +print(f"Pivot table shape: {beta_pivot.shape} (Should be something like 30 trials x 26 channels)") + +# 5. Correlation (Now it has a series of data to correlate!) +beta_corr_matrix = beta_pivot.corr().values +np.fill_diagonal(beta_corr_matrix, 0) + +# Replace any NaNs with 0 (occurs if a channel has 0 variance) +beta_corr_matrix = np.nan_to_num(beta_corr_matrix) + +import matplotlib.pyplot as plt +channel_names = beta_pivot.columns.tolist() +# Create the plot +plot_connectivity_circle( + beta_corr_matrix, + channel_names, + n_lines=40, # Show only the top 40 strongest connections + title=f"FIR Beta Series Connectivity)", +) + +# 1. Aggregate the mean response for each delay across all trials and channels +# We want to see the general 'shape' of the Reach response +time_points = np.arange(0, 12) # Matches your fir_delays +average_response = hbo_betas.groupby('Delay')['theta'].mean() + +# 2. Plotting +plt.figure(figsize=(10, 6)) +for ch in hbo_betas['ch_name'].unique(): + ch_data = hbo_betas[hbo_betas['ch_name'] == ch].groupby('Delay')['theta'].mean() + plt.plot(time_points, ch_data, color='gray', alpha=0.3) # Individual channels + +# Plot the 'Grand Average' in bold red +plt.plot(time_points, average_response, color='red', linewidth=3, label='Grand Average') + +plt.axvline(x=4, color='green', linestyle='--', label='Window Start (4s)') +plt.axvline(x=8, color='green', linestyle='--', label='Window End (8s)') +plt.title("FIR Hemodynamic Response to 'Reach' (HbO)") +plt.xlabel("Seconds after Stimulus") +plt.ylabel("HbO Concentration (Beta Weight)") +plt.legend() +plt.grid(True, alpha=0.3) +plt.show() \ No newline at end of file