19 Commits

SHA1 Message Date
1b5c31e4ef add pyinstaller to dependcies 2026-03-16 14:20:39 -07:00
276e1b1876 fix for xlsx and upgrade dependencies 2026-03-16 14:08:21 -07:00
3cfc6f01a2 typo 2026-03-13 16:57:51 -07:00
91fb9b38d4 copyright 2026-03-13 16:55:49 -07:00
1e346f8ef2 generalization and status bar fix 2026-03-13 14:07:40 -07:00
8c207b17ad more generalization 2026-03-13 13:40:49 -07:00
f794b98d18 changelog and generalization of updater exe 2026-03-13 13:14:00 -07:00
06972bee28 plz 2026-03-13 13:01:47 -07:00
c279f269da update logic seperated and blazes import 2026-03-13 13:00:57 -07:00
c7d044beed group fc 2026-02-03 17:29:34 -08:00
76df19f332 further issue fixes 2026-02-02 13:08:00 -08:00
22695a2281 functions instead of repeating 6 times 2026-02-01 14:12:46 -08:00
f1dd9bd184 release worthy? 2026-01-31 23:42:49 -08:00
dd2ac058af temp fc 2026-01-30 20:16:55 -08:00
98c749477c preferences 2026-01-30 15:38:12 -08:00
92973da658 fix to fold and memory leaks 2026-01-29 22:30:28 -08:00
f82978e2e8 changes and improvements 2026-01-29 17:23:52 -08:00
7007478c3b update ignore 2026-01-28 10:10:26 -08:00
fb728d5033 added support updating optode positions from .xlsx 2026-01-28 10:09:06 -08:00
8 changed files with 2996 additions and 1663 deletions

.gitignore vendored

@@ -175,3 +175,7 @@ cython_debug/
.pypirc
/individual_images
*.xlsx
*.csv
*.snirf
*.json


@@ -27,7 +27,7 @@ This only applies for the first time you attempt to run FLARES. Subsequent times
Due to the cost of a code signing certificate, the application is not digitally signed. When the extracted application is launched for the first time, you will get a popup stating:
"Windows protected your PC - Microsoft Defender SmartScreen prevented an unrecognized app from starting. Running this app might put your PC at risk.", with the options of" More info" or "Don't run".
"Windows protected your PC - Microsoft Defender SmartScreen prevented an unrecognized app from starting. Running this app might put your PC at risk.", with the options of "More info" or "Don't run".
The solution around this is to click "More info" and then select "Run anyway".
@@ -41,4 +41,4 @@ There are no conditions for Linux users at this time.
FLARES is distributed under the GPL-3.0 license.
Copyright (C) 2025 Tyler de Zeeuw
Copyright (C) 2025-2026 Tyler de Zeeuw


@@ -1,7 +1,39 @@
# Version 1.2.2
- Added 'Update events in snirf file (BLAZES)...' and renamed 'Update events in snirf file...' to 'Update events in snirf file (BORIS)...'
- The BLAZES option will assign events that are exported directly from the software [BLAZES](https://git.research.dezeeuw.ca/tyler/blazes)
- Moved the updating logic to a separate file for better reusability and generalization
- Fixed 'Toggle Status Bar' having no effect on the visibility of the status bar
- Fixed a bug when updating optode positions that would prevent .txt files from being selected. Fixes [Issue 54](https://git.research.dezeeuw.ca/tyler/flares/issues/54)
- Fixed a missing dependency in the standalone application when attempting to use an .xlsx file to update optode positions
# Version 1.2.1
- Added a requirements.txt file to ensure compatibility
- Added new options 'Missing Events Bypass' and 'Analysis Clearing Bypass' to the Preferences Menu
- Missing Events Bypass allows comparing events in the Group Viewers even if not all participants in the group have the event present. Fixes [Issue 28](https://git.research.dezeeuw.ca/tyler/flares/issues/28)
- Clicking Process after an analysis has been performed will now clear the existing analysis by default with a popup warning that the analysis will be cleared
- Analysis Clearing Bypass will prevent the popup and will not clear the existing analysis data. Fixes [Issue 41](https://git.research.dezeeuw.ca/tyler/flares/issues/41)
- Clicking 'Clear' should now actually properly clear all data. Hopefully fixes [Issue 9](https://git.research.dezeeuw.ca/tyler/flares/issues/9) for good
- Setting SHORT_CHANNEL to False will now grey out SHORT_CHANNEL_REGRESSION, as it is impossible to regress what does not exist. Sets SHORT_CHANNEL_REGRESSION to False under the hood when it is greyed out regardless of what is displayed. Fixes [Issue 47](https://git.research.dezeeuw.ca/tyler/flares/issues/47)
- Projects can now be saved if files have different parent folders. Fixes [Issue 48](https://git.research.dezeeuw.ca/tyler/flares/issues/48)
- It is no longer possible to attempt a save before any data has been processed. A popup will now display if a save is attempted with nothing to save
- Fixed a bug where LONG_CHANNEL_THRESH was not being applied in the processing steps
- Added a new option in the Analysis window for Group Functional Connectivity. Implements [Issue 50](https://git.research.dezeeuw.ca/tyler/flares/issues/50)
- Group Functional Connectivity is still in development and the results should currently be taken with a grain of salt
- A warning is displayed when entering the Group Functional Connectivity Viewer disclosing this
- Fixed a bug when updating optode positions that would prevent .txt files from being selected. Fixes [Issue 54](https://git.research.dezeeuw.ca/tyler/flares/issues/54)
- Fixed a bug where the secondary download server would never get contacted if the primary failed
- Automatic downloads will now ignore prerelease versions. Fixes [Issue 52](https://git.research.dezeeuw.ca/tyler/flares/issues/52)
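
The prerelease-skipping behaviour described above amounts to filtering release tags before choosing the newest one. A minimal sketch of that idea (function and tag names here are illustrative, not FLARES's actual updater code):

```python
def is_prerelease(tag):
    # SemVer marks prereleases with a hyphenated suffix, e.g. "v1.2.2-rc.1"
    return "-" in tag.lstrip("v")

def latest_stable(tags):
    # Drop prerelease tags, then pick the highest remaining numeric version
    stable = [t for t in tags if not is_prerelease(t)]
    if not stable:
        return None
    return max(stable, key=lambda t: tuple(int(p) for p in t.lstrip("v").split(".")))
```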
# Version 1.2.0
- This is a save-breaking release due to a new save file format. Please update your project files to ensure compatibility. Fixes [Issue 30](https://git.research.dezeeuw.ca/tyler/flares/issues/30)
- Added new parameters to the right side of the screen
- These parameters include SHOW_OPTODE_NAMES, SECONDS_TO_STRIP_HR, MAX_LOW_HR, MAX_HIGH_HR, SMOOTHING_WINDOW_HR, HEART_RATE_WINDOW, BAD_CHANNELS_HANDLING, MAX_DIST, MIN_NEIGHBORS, L_TRANS_BANDWIDTH, H_TRANS_BANDWIDTH, RESAMPLE, RESAMPLE_FREQ, STIM_DUR, HRF_MODEL, HIGH_PASS, DRIFT_ORDER, FIR_DELAYS, MIN_ONSET, OVERSAMPLING, SHORT_CHANNEL_REGRESSION, NOISE_MODEL, BINS, and VERBOSITY.
- Certain parameters now have dependencies on other parameters and will now grey out if they are not used
- All the new parameters have default values matching the underlying values in version 1.1.7
- The order of the parameters has changed to match the order in which the code runs when the Process button is clicked
- Moved TIME_WINDOW_START and TIME_WINDOW_END to the 'Other' category
@@ -15,6 +47,20 @@
- Fixed the User Guide window to properly display information about the 24 stages and added a link to the Git wiki page
- MAX_WORKERS should now properly respect the value set
- Added a new CSV export option to be used by other applications
- Added support for updating optode positions directly from an .xlsx file from a Polhemus system
- Fixed an issue where the dropdowns in the Viewer windows would immediately open and close when using a trackpad
- The glover and spm HRF models now function as intended without crashing. Currently, group analysis is still only supported by the fir model. Fixes [Issue 8](https://git.research.dezeeuw.ca/tyler/flares/issues/8)
- Clicking 'Clear' should now properly clear all data. Fixes [Issue 9](https://git.research.dezeeuw.ca/tyler/flares/issues/9)
- Revamped the fold channels viewer to not hang the application and to better process multiple participants at once. Fixes [Issue 34](https://git.research.dezeeuw.ca/tyler/flares/issues/34), [Issue 31](https://git.research.dezeeuw.ca/tyler/flares/issues/31)
- Added a Preferences menu to the navigation bar
- Two preferences have been added that allow bypassing the warnings for detected 2D data and for save files from previous, potentially breaking versions
- Fixed a typo when saving a CSV that stated a SNIRF was being saved
- Loading a save file now properly restores AGE, GENDER, and GROUP. Fixes [Issue 40](https://git.research.dezeeuw.ca/tyler/flares/issues/40)
- Saving a project now no longer makes the main window go not responding. Fixes [Issue 43](https://git.research.dezeeuw.ca/tyler/flares/issues/43)
- Memory usage should no longer grow when generating lots of images multiple times. Fixes [Issue 36](https://git.research.dezeeuw.ca/tyler/flares/issues/36)
- Added a new option in the Analysis window for Functional Connectivity
- Functional connectivity is still in development and the results should currently be taken with a grain of salt
- A warning is displayed when entering the Functional Connectivity Viewer disclosing this
# Version 1.1.7
@@ -105,7 +151,7 @@
- Added a group option when clicking on a participant's file
- If no group is specified, the participant will be added to the "Default" group
- Added option to update the optode positions in a snirf file from the Options menu (F6)
- Fixed [Issue 3](https://git.research.dezeeuw.ca/tyler/flares/issues/3), [Issue 4](https://git.research.dezeeuw.ca/tyler/flares/issues/4), [Issue 17](https://git.research.dezeeuw.ca/tyler/flares/issues/17), [Issue 21](https://git.research.dezeeuw.ca/tyler/flares/issues/21), [Issue 22](https://git.research.dezeeuw.ca/tyler/flares/issues/22)
- Fixed [Issue 3](https://git.research.dezeeuw.ca/tyler/flares/issues/3), [Issue 5](https://git.research.dezeeuw.ca/tyler/flares/issues/5), [Issue 17](https://git.research.dezeeuw.ca/tyler/flares/issues/17), [Issue 21](https://git.research.dezeeuw.ca/tyler/flares/issues/21), [Issue 22](https://git.research.dezeeuw.ca/tyler/flares/issues/22)
# Version 1.0.1

flares.py

@@ -47,9 +47,9 @@ from nilearn.glm.regression import OLSModel
import statsmodels.formula.api as smf # type: ignore
from statsmodels.stats.multitest import multipletests
from scipy import stats
from scipy.spatial.distance import cdist
from scipy.signal import welch, butter, filtfilt # type: ignore
from scipy.stats import pearsonr, zscore, t
import pywt # type: ignore
import neurokit2 as nk # type: ignore
@@ -58,6 +58,7 @@ import neurokit2 as nk # type: ignore
import pyvistaqt # type: ignore
import vtkmodules.util.data_model
import vtkmodules.util.execution_model
import xlrd
# External library imports for mne
from mne import (
@@ -90,6 +91,9 @@ from mne_nirs.io.fold import fold_channel_specificity # type: ignore
from mne_nirs.preprocessing import peak_power # type: ignore
from mne_nirs.statistics._glm_level_first import RegressionResults # type: ignore
from mne_connectivity.viz import plot_connectivity_circle
from mne_connectivity import envelope_correlation, spectral_connectivity_epochs, spectral_connectivity_time
# Needs to be set for mne
os.environ["SUBJECTS_DIR"] = str(data_path()) + "/subjects" # type: ignore
@@ -123,8 +127,6 @@ SECONDS_TO_KEEP: float
OPTODE_PLACEMENT: bool
SHOW_OPTODE_NAMES: bool
HEART_RATE: bool
SHORT_CHANNEL: bool
SHORT_CHANNEL_THRESH: float
LONG_CHANNEL_THRESH: float
@@ -189,9 +191,9 @@ TIME_WINDOW_END: int
MAX_WORKERS: int
VERBOSITY: bool
AGE = 25 # Assume 25 if not set from the GUI. This will result in a reasonable PPF
GENDER = ""
GROUP = "Default"
AGE: int = 25 # Assume 25 if not set from the GUI. This will result in a reasonable PPF
GENDER: str = ""
GROUP: str = "Default"
# These are parameters that are required for the analysis
REQUIRED_KEYS: dict[str, Any] = {
@@ -928,11 +930,12 @@ def interpolate_fNIRS_bads_weighted_average(raw, max_dist=0.03, min_neighbors=2)
raw.info['bads'] = [ch for ch in raw.info['bads'] if ch not in bad_ch_to_remove]
print("\nInterpolation complete.\n")
print("Bads cleared:", raw.info['bads'])
raw.info['bads'] = []
for ch in raw.info['bads']:
print(f"Channel {ch} still marked as bad.")
print("Bads cleared:", raw.info['bads'])
fig_raw_after = raw.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="After interpolation", show=False)
return raw, fig_raw_after
@@ -1333,7 +1336,7 @@ def make_design_matrix(raw_haemo, short_chans):
drift_model=DRIFT_MODEL,
high_pass=HIGH_PASS,
drift_order=DRIFT_ORDER,
fir_delays=range(15),
fir_delays=FIR_DELAYS,
add_regs=short_chans.get_data().T,
add_reg_names=short_chans.ch_names,
min_onset=MIN_ONSET,
@@ -1347,7 +1350,7 @@ def make_design_matrix(raw_haemo, short_chans):
drift_model=DRIFT_MODEL,
high_pass=HIGH_PASS,
drift_order=DRIFT_ORDER,
fir_delays=range(15),
fir_delays=FIR_DELAYS,
min_onset=MIN_ONSET,
oversampling=OVERSAMPLING
)
@@ -1659,8 +1662,11 @@ def fold_channels(raw: BaseRaw) -> None:
landmark_color_map = {landmark: colors[i % len(colors)] for i, landmark in enumerate(landmarks)}
# Iterate over each channel
print(len(hbo_channel_names))
for idx, channel_name in enumerate(hbo_channel_names):
print(idx, channel_name)
# Run the fOLD on the selected channel
channel_data = raw.copy().pick(picks=channel_name) # type: ignore
@@ -1703,6 +1709,7 @@ def fold_channels(raw: BaseRaw) -> None:
landmark_specificity_data = []
# TODO: Fix this
if True:
handles = [
@@ -1725,8 +1732,9 @@ def fold_channels(raw: BaseRaw) -> None:
for ax in axes[len(hbo_channel_names):]:
ax.axis('off')
plt.show()
return fig, legend_fig
#plt.show()
fig_dict = {"main": fig, "legend": legend_fig}
return convert_fig_dict_to_png_bytes(fig_dict)
@@ -2246,8 +2254,14 @@ def brain_3d_visualization(raw_haemo, df_cha, selected_event, t_or_theta: Litera
# Get all activity conditions
for cond in [f'{selected_event}']:
if True:
ch_summary = df_cha.query(f"Condition.str.startswith('{cond}_delay_') and Chroma == 'hbo'", engine='python') # type: ignore
ch_summary = df_cha.query(f"Condition.str.startswith('{cond}_delay_') and Chroma == 'hbo'", engine='python') # type: ignore
print(ch_summary)
if ch_summary.empty:
#not fir model
print("No data found for this condition.")
ch_summary = df_cha.query(f"Condition in [@cond] and Chroma == 'hbo'", engine='python')
# Use ordinary least squares (OLS) if only one participant
# TODO: Fix.
@@ -2269,6 +2283,9 @@ def brain_3d_visualization(raw_haemo, df_cha, selected_event, t_or_theta: Litera
valid_channels = ch_summary["ch_name"].unique().tolist() # type: ignore
raw_for_plot = raw_haemo.copy().pick(picks=valid_channels) # type: ignore
print(f"DEBUG: Model DF rows: {len(model_df)}")
print(f"DEBUG: Raw channels: {len(raw_for_plot.ch_names)}")
brain = plot_3d_evoked_array(raw_for_plot.pick(picks="hbo"), model_df, view="dorsal", distance=0.02, colorbar=True, clim=clim, mode="weighted", size=(800, 700)) # type: ignore
if show_optodes == 'all' or show_optodes == 'sensors':
@@ -2804,7 +2821,7 @@ def run_second_level_analysis(df_contrasts, raw, p, bounds):
result = model.fit(Y)
t_val = result.t(0).item()
p_val = 2 * stats.t.sf(np.abs(t_val), df=result.df_model)
p_val = 2 * t.sf(np.abs(t_val), df=result.df_model)
mean_beta = np.mean(Y)
group_results.append({
@@ -3299,7 +3316,7 @@ def hr_calc(raw):
# --- Parameters for PSD ---
desired_bin_hz = 0.1
nperseg = int(sfreq / desired_bin_hz)
hr_range = (30, 180)
hr_range = (30, 180) # TODO: Should this not use the user-defined values?
# --- Function to find strongest local peak ---
def find_hr_from_psd(ch_data):
@@ -3343,6 +3360,7 @@ def process_participant(file_path, progress_callback=None):
logger.info("Step 1 Completed.")
# Step 2: Trimming
# TODO: Clean this into a method
if TRIM:
if hasattr(raw, 'annotations') and len(raw.annotations) > 0:
# Get time of first event
@@ -3385,7 +3403,7 @@ def process_participant(file_path, progress_callback=None):
fig_individual["short"] = fig_short_chans
else:
short_chans = None
get_long_channels(raw, min_dist=SHORT_CHANNEL_THRESH, max_dist=LONG_CHANNEL_THRESH) # Don't update the existing raw
raw = get_long_channels(raw, min_dist=0, max_dist=LONG_CHANNEL_THRESH) # keep both short channels and all channels up to the threshold length
if progress_callback: progress_callback(4)
logger.info("Step 4 Completed.")
@@ -3527,16 +3545,18 @@ def process_participant(file_path, progress_callback=None):
logger.info("19")
# Step 20: Generate GLM Results
fig_glm_result = plot_glm_results(file_path, raw_haemo, glm_est, design_matrix)
for name, fig in fig_glm_result:
fig_individual[f"GLM {name}"] = fig
if "derivative" not in HRF_MODEL.lower():
fig_glm_result = plot_glm_results(file_path, raw_haemo, glm_est, design_matrix)
for name, fig in fig_glm_result:
fig_individual[f"GLM {name}"] = fig
if progress_callback: progress_callback(20)
logger.info("20")
# Step 21: Generate Channel Significance
fig_significance = individual_significance(raw_haemo, glm_est)
for name, fig in fig_significance:
fig_individual[f"Significance {name}"] = fig
if HRF_MODEL == "fir":
fig_significance = individual_significance(raw_haemo, glm_est)
for name, fig in fig_significance:
fig_individual[f"Significance {name}"] = fig
if progress_callback: progress_callback(21)
logger.info("21")
@@ -3568,30 +3588,31 @@ def process_participant(file_path, progress_callback=None):
[(column, contrast_matrix[i]) for i, column in enumerate(design_matrix.columns)]
)
all_delay_cols = [col for col in design_matrix.columns if "_delay_" in col]
all_conditions = sorted({col.split("_delay_")[0] for col in all_delay_cols})
if HRF_MODEL == "fir":
all_delay_cols = [col for col in design_matrix.columns if "_delay_" in col]
all_conditions = sorted({col.split("_delay_")[0] for col in all_delay_cols})
if not all_conditions:
raise ValueError("No FIR regressors found in the design matrix.")
if not all_conditions:
raise ValueError("No FIR regressors found in the design matrix.")
# Build contrast vectors for each condition
contrast_dict = {}
# Build contrast vectors for each condition
contrast_dict = {}
for condition in all_conditions:
delay_cols = [
col for col in all_delay_cols
if col.startswith(f"{condition}_delay_") and
TIME_WINDOW_START <= int(col.split("_delay_")[-1]) <= TIME_WINDOW_END
]
for condition in all_conditions:
delay_cols = [
col for col in all_delay_cols
if col.startswith(f"{condition}_delay_") and
TIME_WINDOW_START <= int(col.split("_delay_")[-1]) <= TIME_WINDOW_END
]
if not delay_cols:
continue # skip if no columns found (shouldn't happen?)
if not delay_cols:
continue # skip if no columns found (shouldn't happen?)
# Average across all delay regressors for this condition
contrast_vector = np.sum([basic_conts[col] for col in delay_cols], axis=0)
contrast_vector /= len(delay_cols)
# Average across all delay regressors for this condition
contrast_vector = np.sum([basic_conts[col] for col in delay_cols], axis=0)
contrast_vector /= len(delay_cols)
contrast_dict[condition] = contrast_vector
contrast_dict[condition] = contrast_vector
if progress_callback: progress_callback(22)
logger.info("22")
@@ -3599,11 +3620,13 @@ def process_participant(file_path, progress_callback=None):
# Step 23: Compute Contrast Results
contrast_results = {}
for cond, contrast_vector in contrast_dict.items():
contrast = glm_est.compute_contrast(contrast_vector) # type: ignore
df = contrast.to_dataframe()
df["ID"] = file_path
contrast_results[cond] = df
if HRF_MODEL == "fir":
for cond, contrast_vector in contrast_dict.items():
contrast = glm_est.compute_contrast(contrast_vector) # type: ignore
df = contrast.to_dataframe()
df["ID"] = file_path
contrast_results[cond] = df
cha["ID"] = file_path
@@ -3618,7 +3641,17 @@ def process_participant(file_path, progress_callback=None):
if progress_callback: progress_callback(25)
logger.info("25")
return raw_haemo, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, AGE, GENDER, GROUP, True
# TODO: Tidy up
# Extract the parameters this file was run with. No need to return age, gender, group?
config = {
k: globals()[k]
for k in __annotations__
if k in globals() and k != "REQUIRED_KEYS"
}
print(config)
return raw_haemo, config, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, True
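
The config snapshot above works because every parameter is declared with a module-level type annotation, so its name appears in the module's `__annotations__`; a dict comprehension can then capture current values. A standalone illustration of the same pattern (parameter names invented, mirroring the comprehension in the diff):

```python
# Module-scope annotated "parameters" (names invented for illustration)
TRIM: bool = True
STIM_DUR: float = 5.0
REQUIRED_KEYS: dict = {"TRIM": bool}  # excluded below, as in the diff

def snapshot_config():
    # __annotations__ lists every annotated module-level name; keep the
    # ones that are actually defined, skipping the REQUIRED_KEYS schema
    return {
        k: globals()[k]
        for k in __annotations__
        if k in globals() and k != "REQUIRED_KEYS"
    }
```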
def sanitize_paths_for_pickle(raw_haemo, epochs):
@@ -3629,3 +3662,421 @@ def sanitize_paths_for_pickle(raw_haemo, epochs):
# Fix epochs._raw._filenames
if hasattr(epochs, '_raw') and hasattr(epochs._raw, '_filenames'):
epochs._raw._filenames = [str(p) for p in epochs._raw._filenames]
def functional_connectivity_spectral_epochs(epochs, n_lines, vmin):
# will crash without this load
epochs.load_data()
hbo_epochs = epochs.copy().pick(picks="hbo")
data = hbo_epochs.get_data()
names = hbo_epochs.ch_names
sfreq = hbo_epochs.info["sfreq"]
con = spectral_connectivity_epochs(
data,
method=["coh", "plv"],
mode="multitaper",
sfreq=sfreq,
fmin=0.04,
fmax=0.2,
faverage=True,
verbose=True
)
con_coh, con_plv = con
coh = con_coh.get_data(output="dense").squeeze()
plv = con_plv.get_data(output="dense").squeeze()
np.fill_diagonal(coh, 0)
np.fill_diagonal(plv, 0)
plot_connectivity_circle(
coh,
names,
title="fNIRS Functional Connectivity (HbO - Coherence)",
n_lines=n_lines,
vmin=vmin
)
def functional_connectivity_spectral_time(epochs, n_lines, vmin):
# will crash without this load
epochs.load_data()
hbo_epochs = epochs.copy().pick(picks="hbo")
data = hbo_epochs.get_data()
names = hbo_epochs.ch_names
sfreq = hbo_epochs.info["sfreq"]
freqs = np.linspace(0.04, 0.2, 10)
n_cycles = freqs * 2
con = spectral_connectivity_time(
data,
freqs=freqs,
method=["coh", "plv"],
mode="multitaper",
sfreq=sfreq,
fmin=0.04,
fmax=0.2,
n_cycles=n_cycles,
faverage=True,
verbose=True
)
con_coh, con_plv = con
coh = con_coh.get_data(output="dense").squeeze()
plv = con_plv.get_data(output="dense").squeeze()
np.fill_diagonal(coh, 0)
np.fill_diagonal(plv, 0)
plot_connectivity_circle(
coh,
names,
title="fNIRS Functional Connectivity (HbO - Coherence)",
n_lines=n_lines,
vmin=vmin
)
def functional_connectivity_envelope(epochs, n_lines, vmin):
# will crash without this load
epochs.load_data()
hbo_epochs = epochs.copy().pick(picks="hbo")
data = hbo_epochs.get_data()
env = envelope_correlation(
data,
orthogonalize=False,
absolute=True
)
env_data = env.get_data(output="dense")
env_corr = env_data.mean(axis=0)
env_corr = np.squeeze(env_corr)
np.fill_diagonal(env_corr, 0)
plot_connectivity_circle(
env_corr,
hbo_epochs.ch_names,
title="fNIRS HbO Envelope Correlation (Task Connectivity)",
n_lines=n_lines,
vmin=vmin
)
def functional_connectivity_betas(raw_hbo, n_lines, vmin, event_name=None):
raw_hbo = raw_hbo.copy().pick(picks="hbo")
onsets = raw_hbo.annotations.onset
# CRITICAL: Update the Raw object's annotations so the GLM sees unique events
ann = raw_hbo.annotations
new_desc = []
for i, desc in enumerate(ann.description):
new_desc.append(f"{desc}__trial_{i:03d}")
ann.description = np.array(new_desc)
# TODO: should use the user-defined design matrix parameters
design_matrix = make_first_level_design_matrix(
raw=raw_hbo,
hrf_model='fir',
fir_delays=np.arange(0, 12, 1),
drift_model='cosine',
drift_order=1
)
# 3. Run GLM & Extract Betas
glm_results = run_glm(raw_hbo, design_matrix)
betas = np.array(glm_results.theta())
reg_names = list(design_matrix.columns)
n_channels = betas.shape[0]
# ------------------------------------------------------------------
# 5. Find unique trial tags (optionally filtered by event)
# ------------------------------------------------------------------
trial_tags = sorted({
col.split("_delay")[0]
for col in reg_names
if (
("__trial_" in col)
and (event_name is None or col.startswith(event_name + "__"))
)
})
if len(trial_tags) == 0:
raise ValueError(f"No trials found for event_name={event_name}")
# ------------------------------------------------------------------
# 6. Build beta series (average across FIR delays per trial)
# ------------------------------------------------------------------
beta_series = np.zeros((n_channels, len(trial_tags)))
for t, tag in enumerate(trial_tags):
idx = [
i for i, col in enumerate(reg_names)
if col.startswith(f"{tag}_delay")
]
beta_series[:, t] = np.mean(betas[:, idx], axis=1).flatten()
# n_channels, n_trials = betas.shape[0], len(onsets)
# beta_series = np.zeros((n_channels, n_trials))
# for t in range(n_trials):
# trial_indices = [i for i, col in enumerate(reg_names) if col.startswith(f"trial_{t:03d}_delay")]
# if trial_indices:
# beta_series[:, t] = np.mean(betas[:, trial_indices], axis=1).flatten()
# Normalize each channel so they are on the same scale
# Without this, everything is connected to everything. Apparently this is a big issue in fNIRS?
beta_series = zscore(beta_series, axis=1)
global_signal = np.mean(beta_series, axis=0)
beta_series_clean = np.zeros_like(beta_series)
for i in range(n_channels):
slope, _ = np.polyfit(global_signal, beta_series[i, :], 1)
beta_series_clean[i, :] = beta_series[i, :] - (slope * global_signal)
# 4. Correlation & Strict Filtering
corr_matrix = np.zeros((n_channels, n_channels))
p_matrix = np.ones((n_channels, n_channels))
for i in range(n_channels):
for j in range(i + 1, n_channels):
r, p = pearsonr(beta_series_clean[i, :], beta_series_clean[j, :])
corr_matrix[i, j] = corr_matrix[j, i] = r
p_matrix[i, j] = p_matrix[j, i] = p
# 5. High-Bar Thresholding
reject, _ = multipletests(p_matrix[np.triu_indices(n_channels, k=1)], method='fdr_bh', alpha=0.05)[:2]
sig_corr_matrix = np.zeros_like(corr_matrix)
triu = np.triu_indices(n_channels, k=1)
for idx, is_sig in enumerate(reject):
r_val = corr_matrix[triu[0][idx], triu[1][idx]]
# Only keep the absolute strongest connections
if is_sig and abs(r_val) > 0.7:
sig_corr_matrix[triu[0][idx], triu[1][idx]] = r_val
sig_corr_matrix[triu[1][idx], triu[0][idx]] = r_val
# 6. Plot
plot_connectivity_circle(
sig_corr_matrix,
raw_hbo.ch_names,
title="Strictly Filtered Connectivity (TDDR + GSR + Z-Score)",
n_lines=None,
vmin=0.7,
vmax=1.0,
colormap='hot' # Use 'hot' to make positive connections pop
)
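
The beta-series approach above hinges on renaming each annotation to `<event>__trial_<idx>` so the GLM fits a separate regressor set per trial; recovering per-trial tags from the design-matrix columns is then plain string parsing. A toy version of that grouping step (column names invented for illustration):

```python
def trial_tags(reg_names, event_name=None):
    # Columns look like "Tapping__trial_000_delay_0"; the tag is everything
    # before "_delay", optionally filtered to a single event
    return sorted({
        col.split("_delay")[0]
        for col in reg_names
        if "__trial_" in col
        and (event_name is None or col.startswith(event_name + "__"))
    })

cols = [
    "Tapping__trial_000_delay_0", "Tapping__trial_000_delay_1",
    "Tapping__trial_001_delay_0", "Rest__trial_002_delay_0",
    "drift_1",  # non-trial regressors are ignored
]
```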
def get_single_subject_beta_corr(raw_hbo, event_name=None, config=None):
"""Processes one participant and returns their correlation matrix."""
raw_hbo = raw_hbo.copy().pick(picks="hbo")
ann = raw_hbo.annotations
# Rename for trial-level GLM
new_desc = [f"{desc}__trial_{i:03d}" for i, desc in enumerate(ann.description)]
ann.description = np.array(new_desc)
if config is None:
print("no config")
design_matrix = make_first_level_design_matrix(
raw=raw_hbo, hrf_model='fir',
fir_delays=np.arange(0, 12, 1),
drift_model='cosine', drift_order=1
)
else:
print("config")
if config.get("SHORT_CHANNEL_REGRESSION") == True:
short_chans = get_short_channels(raw_hbo, max_dist=config.get("SHORT_CHANNEL_THRESH"))
design_matrix = make_first_level_design_matrix(
raw=raw_hbo,
stim_dur=config.get("STIM_DUR"),
hrf_model=config.get("HRF_MODEL"),
drift_model=config.get("DRIFT_MODEL"),
high_pass=config.get("HIGH_PASS"),
drift_order=config.get("DRIFT_ORDER"),
fir_delays=config.get("FIR_DELAYS"),
add_regs=short_chans.get_data().T,
add_reg_names=short_chans.ch_names,
min_onset=config.get("MIN_ONSET"),
oversampling=config.get("OVERSAMPLING")
)
print("yep")
else:
design_matrix = make_first_level_design_matrix(
raw=raw_hbo,
stim_dur=config.get("STIM_DUR"),
hrf_model=config.get("HRF_MODEL"),
drift_model=config.get("DRIFT_MODEL"),
high_pass=config.get("HIGH_PASS"),
drift_order=config.get("DRIFT_ORDER"),
fir_delays=config.get("FIR_DELAYS"),
min_onset=config.get("MIN_ONSET"),
oversampling=config.get("OVERSAMPLING")
)
glm_results = run_glm(raw_hbo, design_matrix)
betas = np.array(glm_results.theta())
reg_names = list(design_matrix.columns)
n_channels = betas.shape[0]
# Filter trials by event name
trial_tags = sorted({
col.split("_delay")[0] for col in reg_names
if "__trial_" in col and (event_name is None or col.startswith(event_name + "__"))
})
if not trial_tags:
return None, None
# Build Beta Series
beta_series = np.zeros((n_channels, len(trial_tags)))
for t, tag in enumerate(trial_tags):
idx = [i for i, col in enumerate(reg_names) if col.startswith(f"{tag}_delay")]
beta_series[:, t] = np.mean(betas[:, idx], axis=1).flatten()
#beta_series[:, t] = np.max(betas[:, idx], axis=1).flatten() #TODO: Figure out which one to use
# Z-score and GSR (Global Signal Regression)
beta_series = zscore(beta_series, axis=1)
global_signal = np.mean(beta_series, axis=0)
for i in range(n_channels):
slope, _ = np.polyfit(global_signal, beta_series[i, :], 1)
beta_series[i, :] -= (slope * global_signal)
# Correlation Matrix
corr_matrix = np.corrcoef(beta_series)
return corr_matrix, raw_hbo.ch_names
def run_group_functional_connectivity(haemo_dict, config_dict, selected_paths, event_name, n_lines, vmin):
"""Aggregates multiple participants and triggers the plot."""
all_z_matrices = []
common_names = None
for path in selected_paths:
raw = haemo_dict.get(path)
config = config_dict.get(path)
if raw is None: continue
print(config)
corr, names = get_single_subject_beta_corr(raw, event_name, config)
if corr is not None:
# Fisher Z-transform for averaging
z_mat = np.arctanh(np.clip(corr, -0.99, 0.99))
all_z_matrices.append(z_mat)
common_names = names
from scipy.stats import ttest_1samp
# 1. Convert list to 3D array: (Participants, Channels, Channels)
group_z_data = np.array(all_z_matrices)
print("1")
# 2. Perform a T-Test across the participant dimension (axis 0)
# We test if the mean Z-score is different from 0
# C:\Users\tyler\Desktop\research\.venv\Lib\site-packages\scipy\stats\_axis_nan_policy.py:611: RuntimeWarning: Precision loss occurred in moment calculation due to catastrophic cancellation. This occurs when the data are nearly identical. Results may be unreliable.
# res = hypotest_fun_out(*samples, axis=axis, **kwds)
print("--- Variance Check ---")
# ADD THIS LINE: Define n_channels based on the data shape
# group_z_data.shape is (n_participants, n_channels, n_channels)
n_channels = group_z_data.shape[1]
variance_matrix = np.var(group_z_data, axis=0)
# Find where variance is exactly 0 (or very close to it)
zero_var_indices = np.where(variance_matrix < 1e-15)
coords = list(zip(zero_var_indices[0], zero_var_indices[1]))
diag_count = 0
non_diag_pairs = []
for r, c in coords:
if r == c:
diag_count += 1
else:
non_diag_pairs.append((r, c))
print(f"Total pairs with zero variance: {len(coords)}")
print(f"Identical diagonals: {diag_count}/{n_channels}")
if non_diag_pairs:
print(f"Warning: {len(non_diag_pairs)} non-diagonal pairs have zero variance!")
for r, c in non_diag_pairs[:10]: # Print first 10
print(f" - Pair: Channel {r} & Channel {c}")
else:
print("Clean! Zero variance only exists on the diagonals.")
print("----------------------")
t_stats, p_values = ttest_1samp(group_z_data, popmean=0, axis=0)
print("2")
# 3. Multiple Comparisons Correction (FDR)
# We only care about the upper triangle (unique connections)
n_channels = p_values.shape[0]
triu_indices = np.triu_indices(n_channels, k=1)
flat_p = p_values[triu_indices]
reject, corrected_p = multipletests(flat_p, method='fdr_bh', alpha=0.05)[:2]
# 4. Create the final "Significant" Matrix
avg_r = np.tanh(np.mean(group_z_data, axis=0))
sig_avg_r = np.zeros_like(avg_r)
# Only keep connections that are Significant AND above your VMIN (r-threshold)
for idx, is_sig in enumerate(reject):
row, col = triu_indices[0][idx], triu_indices[1][idx]
r_val = avg_r[row, col]
if is_sig and abs(r_val) >= vmin:
sig_avg_r[row, col] = sig_avg_r[col, row] = r_val
# 5. Plot the significant results
# if not all_z_matrices:
# return
# # Average and convert back to R
# avg_z = np.mean(all_z_matrices, axis=0)
# avg_r = np.tanh(avg_z)
# # Thresholding
# avg_r[np.abs(avg_r) < vmin] = 0
plot_connectivity_circle(
sig_avg_r, common_names, n_lines=n_lines,
title=f"Group Connectivity: {event_name if event_name else 'All Events'}",
vmin=vmin, vmax=1.0, colormap='hot'
)
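
The group aggregation above averages correlation matrices in Fisher z-space (arctanh) so the values are approximately normal before converting back with tanh. A minimal NumPy sketch of just that transform step, with toy matrices and without the t-test/FDR stage:

```python
import numpy as np

def group_average_corr(corr_mats, clip=0.99):
    # arctanh diverges at |r| = 1, so clip first (as the diff does)
    z = np.arctanh(np.clip(np.asarray(corr_mats), -clip, clip))
    return np.tanh(z.mean(axis=0))

a = np.array([[1.0, 0.5], [0.5, 1.0]])
b = np.array([[1.0, 0.3], [0.3, 1.0]])
avg = group_average_corr([a, b])
```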


@@ -18,11 +18,12 @@ import subprocess
 from datetime import datetime
 PLATFORM_NAME = platform.system().lower()
+APP_NAME = "flares"
 if PLATFORM_NAME == 'darwin':
-    LOG_FILE = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.log")
+    LOG_FILE = os.path.join(os.path.dirname(sys.executable), f"../../../{APP_NAME}_updater.log")
 else:
-    LOG_FILE = os.path.join(os.getcwd(), "flares_updater.log")
+    LOG_FILE = os.path.join(os.getcwd(), f"{APP_NAME}_updater.log")
 def log(msg):
@@ -147,7 +148,7 @@ def copy_update_files_darwin(src_folder, dest_folder, updater_name):
 def remove_quarantine(app_path):
     script = f'''
-    do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (1/2)"
+    do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{APP_NAME} needs privileges to finish the update. (1/2)"
     '''
     try:
         subprocess.run(['osascript', '-e', script], check=True)
@@ -162,7 +163,7 @@ def main():
     log(f"[Updater] sys.argv: {sys.argv}")
     if len(sys.argv) != 3:
-        log("Invalid arguments. Usage: flares_updater <update_folder> <main_app_executable>")
+        log(f"Invalid arguments. Usage: {APP_NAME}_updater <update_folder> <main_app_executable>")
         sys.exit(1)
     update_folder = sys.argv[1]
@@ -215,7 +216,7 @@ def main():
     if PLATFORM_NAME == 'darwin':
         log(f'Attempting to delete {ppparent_dir}')
         delete_path(ppparent_dir)
-        update_folder = os.path.join(sys.argv[1], "flares-darwin")
+        update_folder = os.path.join(sys.argv[1], f"{APP_NAME}-darwin")
         copy_update_files_darwin(update_folder, pppparent_dir, updater_name)
     else:
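The hunks above all follow one pattern: hard-coded "flares" strings become f-strings built from an `APP_NAME` constant. A minimal standalone sketch of that generalization, using the same log-path shape as the diff (the `base_dir` values below are illustrative):

```python
import os

APP_NAME = "flares"

def log_file_for(platform_name, base_dir):
    # Same shape as the diff: macOS writes next to the .app bundle
    # (three levels up from the executable), other platforms use the cwd
    if platform_name == "darwin":
        return os.path.join(base_dir, f"../../../{APP_NAME}_updater.log")
    return os.path.join(base_dir, f"{APP_NAME}_updater.log")

print(log_file_for("windows", "/tmp"))  # /tmp/flares_updater.log
```

Renaming the application now only requires changing `APP_NAME`, rather than hunting down every literal string.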

main.py (3488 lines changed): file diff suppressed because it is too large.

requirements.txt (new binary file): not shown.

updater.py (new file, 539 lines):

@@ -0,0 +1,539 @@
"""
Filename: updater.py
Description: Generic updater file
Author: Tyler de Zeeuw
License: GPL-3.0
"""
# Built-in imports
import os
import re
import sys
import time
import shlex
import shutil
import zipfile
import traceback
import subprocess
# External library imports
import psutil
import requests
from PySide6.QtWidgets import QMessageBox
from PySide6.QtCore import QThread, Signal, QObject
class UpdateDownloadThread(QThread):
"""
Thread that downloads and extracts an update package and emits a signal on completion or error.
Args:
download_url (str): URL of the update zip file to download.
latest_version (str): Version string of the latest update.
"""
update_ready = Signal(str, str)
error_occurred = Signal(str)
def __init__(self, download_url, latest_version, platform_name, app_name):
super().__init__()
self.download_url = download_url
self.latest_version = latest_version
self.platform_name = platform_name
self.app_name = app_name
def run(self):
try:
local_filename = os.path.basename(self.download_url)
if self.platform_name == 'darwin':
tmp_dir = f'/tmp/{self.app_name}tempupdate'
os.makedirs(tmp_dir, exist_ok=True)
local_path = os.path.join(tmp_dir, local_filename)
else:
local_path = os.path.join(os.getcwd(), local_filename)
# Download the file
with requests.get(self.download_url, stream=True, timeout=15) as r:
r.raise_for_status()
with open(local_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Extract folder name (remove .zip)
if self.platform_name == 'darwin':
extract_folder = os.path.splitext(local_filename)[0]
extract_path = os.path.join(tmp_dir, extract_folder)
else:
extract_folder = os.path.splitext(local_filename)[0]
extract_path = os.path.join(os.getcwd(), extract_folder)
# Create the folder if not exists
os.makedirs(extract_path, exist_ok=True)
# Extract the zip file contents
if self.platform_name == 'darwin':
subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
else:
with zipfile.ZipFile(local_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# Remove the zip once extracted and emit a signal
os.remove(local_path)
self.update_ready.emit(self.latest_version, extract_path)
except Exception as e:
# Emit a signal signifying failure
self.error_occurred.emit(str(e))
class UpdateCheckThread(QThread):
"""
Thread that checks for updates by querying the API and emits a signal based on the result.
Signals:
download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
no_update_available(): Emitted when no update is found or current version is up to date.
error_occurred(str): Emitted with an error message if the update check fails.
"""
download_requested = Signal(str, str)
no_update_available = Signal()
error_occurred = Signal(str)
def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name):
super().__init__()
self.api_url = api_url
self.api_url_sec = api_url_sec
self.current_version = current_version
self.platform_name = platform_name
self.app_name = app_name
def run(self):
# if not getattr(sys, 'frozen', False):
# self.error_occurred.emit("Application is not frozen (Development mode).")
# return
try:
latest_version, download_url = self.get_latest_release_for_platform()
if not latest_version:
self.no_update_available.emit()
return
if not download_url:
self.error_occurred.emit(f"No download available for platform '{self.platform_name}'")
return
if self.version_compare(latest_version, self.current_version) > 0:
self.download_requested.emit(download_url, latest_version)
else:
self.no_update_available.emit()
except Exception as e:
self.error_occurred.emit(f"Update check failed: {e}")
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def get_latest_release_for_platform(self):
urls = [self.api_url, self.api_url_sec]
for url in urls:
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
releases = response.json()
if not releases:
continue
latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
if not latest:
continue
tag = latest["tag_name"].lstrip("v")
for asset in latest.get("assets", []):
if self.platform_name in asset["name"].lower():
return tag, asset["browser_download_url"]
return tag, None
except (requests.RequestException, ValueError):
continue
return None, None
class LocalPendingUpdateCheckThread(QThread):
"""
Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
Args:
current_version (str): Current application version.
platform_suffix (str): Platform-specific suffix to identify update folders.
"""
pending_update_found = Signal(str, str)
no_pending_update = Signal()
def __init__(self, current_version, platform_suffix, platform_name, app_name):
super().__init__()
self.current_version = current_version
self.platform_suffix = platform_suffix
self.platform_name = platform_name
self.app_name = app_name
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def run(self):
if self.platform_name == 'darwin':
cwd = f'/tmp/{self.app_name}tempupdate'
else:
cwd = os.getcwd()
pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
found = False
try:
for item in os.listdir(cwd):
folder_path = os.path.join(cwd, item)
if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
match = pattern.match(item)
if match:
folder_version = match.group(1)
if self.version_compare(folder_version, self.current_version) > 0:
self.pending_update_found.emit(folder_version, folder_path)
found = True
break
except OSError:
pass
if not found:
self.no_pending_update.emit()
class UpdateManager(QObject):
"""
Orchestrates the update process.
Main apps should instantiate this and call manual_check_for_updates().
"""
def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name):
super().__init__()
self.parent = main_window
self.api_url = api_url
self.api_url_sec = api_url_sec
self.current_version = current_version
self.platform_name = platform_name
self.platform_suffix = platform_suffix
self.app_name = app_name
self.pending_update_version = None
self.pending_update_path = None
def manual_check_for_updates(self):
self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name)
self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
self.local_check_thread.start()
def on_pending_update_found(self, version, folder_path):
self.parent.statusBar().showMessage(f"Pending update found: version {version}")
self.pending_update_version = version
self.pending_update_path = folder_path
self.show_pending_update_popup()
def on_no_pending_update(self):
# No pending update found locally, start server check directly
self.parent.statusBar().showMessage("No pending local update found. Checking server...")
self.start_update_check_thread()
def show_pending_update_popup(self):
msg_box = QMessageBox(self.parent)
msg_box.setWindowTitle("Pending Update Found")
msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
if msg_box.clickedButton() == install_now_button:
self.install_update(self.pending_update_path)
else:
self.parent.statusBar().showMessage("Pending update available. Install later.")
# After user dismisses, still check the server for new updates
self.start_update_check_thread()
def start_update_check_thread(self):
self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name)
self.check_thread.download_requested.connect(self.on_server_update_requested)
self.check_thread.no_update_available.connect(self.on_server_no_update)
self.check_thread.error_occurred.connect(self.on_error)
self.check_thread.start()
def on_server_no_update(self):
self.parent.statusBar().showMessage("No new updates found on server.", 5000)
def on_server_update_requested(self, download_url, latest_version):
if self.pending_update_version:
cmp = self.version_compare(latest_version, self.pending_update_version)
if cmp > 0:
# Server version is newer than pending update
self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
try:
shutil.rmtree(self.pending_update_path)
self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
except Exception as e:
self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}")
# Clear pending update info so new download proceeds
self.pending_update_version = None
self.pending_update_path = None
# Download the new update
self.download_update(download_url, latest_version)
elif cmp == 0:
# Versions equal, no download needed
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
else:
# Server version older than pending? Unlikely but just keep pending update
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
else:
# No pending update, just download
self.download_update(download_url, latest_version)
def download_update(self, download_url, latest_version):
self.parent.statusBar().showMessage("Downloading update...")
self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name)
self.download_thread.update_ready.connect(self.on_update_ready)
self.download_thread.error_occurred.connect(self.on_error)
self.download_thread.start()
def on_update_ready(self, latest_version, extract_folder):
self.parent.statusBar().showMessage("Update downloaded and extracted.")
msg_box = QMessageBox(self.parent)
msg_box.setWindowTitle("Update Ready")
msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
if msg_box.clickedButton() == install_now_button:
self.install_update(extract_folder)
else:
self.parent.statusBar().showMessage("Update ready. Install later.")
def install_update(self, extract_folder):
# Path to updater executable
if self.platform_name == 'windows':
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe")
elif self.platform_name == 'darwin':
if getattr(sys, 'frozen', False):
updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app")
else:
updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app")
elif self.platform_name == 'linux':
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater")
else:
updater_path = os.getcwd()
if not os.path.exists(updater_path):
QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
return
# Launch updater with extracted folder path as argument
try:
# Pass current app's executable path for updater to relaunch
main_app_executable = os.path.abspath(sys.argv[0])
print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
if self.platform_name == 'darwin':
subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
else:
subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
# Close the current app so updater can replace files
sys.exit(0)
except Exception as e:
QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
def on_error(self, message):
# print(f"Error: {message}")
self.parent.statusBar().showMessage(f"Error occurred during update process. {message}")
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def wait_for_process_to_exit(process_name, timeout=10):
"""
Waits for a process with the specified name to exit within a timeout period.
Args:
process_name (str): Name (or part of the name) of the process to wait for.
timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
Returns:
bool: True if the process exited before the timeout, False otherwise.
"""
print(f"Waiting for {process_name} to exit...")
deadline = time.time() + timeout
while time.time() < deadline:
still_running = False
for proc in psutil.process_iter(['name']):
try:
if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
still_running = True
print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not still_running:
print(f"{process_name} has exited.")
return True
time.sleep(0.5)
print(f"{process_name} did not exit in time.")
return False
def finish_update_if_needed(platform_name, app_name):
"""
Completes a pending application update if '--finish-update' is present in the command-line arguments.
"""
if "--finish-update" in sys.argv:
print("Finishing update...")
if platform_name == 'darwin':
app_dir = f'/tmp/{app_name}tempupdate'
else:
app_dir = os.getcwd()
# 1. Find update folder
update_folder = None
for entry in os.listdir(app_dir):
entry_path = os.path.join(app_dir, entry)
if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name):
update_folder = os.path.join(app_dir, entry)
break
if update_folder is None:
print("No update folder found. Skipping update steps.")
return
if platform_name == 'darwin':
update_folder = os.path.join(update_folder, f"{app_name}-darwin")
# 2. Wait for updater to exit
print(f"Waiting for {app_name}_updater to exit...")
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower():
try:
proc.wait(timeout=5)
except psutil.TimeoutExpired:
print(f"Force killing lingering {app_name}_updater")
proc.kill()
# 3. Replace the updater
if platform_name == 'windows':
new_updater = os.path.join(update_folder, f"{app_name}_updater.exe")
dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe")
elif platform_name == 'darwin':
new_updater = os.path.join(update_folder, f"{app_name}_updater.app")
dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app"))
elif platform_name == 'linux':
new_updater = os.path.join(update_folder, f"{app_name}_updater")
dest_updater = os.path.join(app_dir, f"{app_name}_updater")
else:
print("Unknown Platform")
new_updater = os.getcwd()
dest_updater = os.getcwd()
print(f"New updater is {new_updater}")
print(f"Dest updater is {dest_updater}")
print("Writable?", os.access(dest_updater, os.W_OK))
print("Executable path:", sys.executable)
print("Trying to copy:", new_updater, "->", dest_updater)
if os.path.exists(new_updater):
try:
if os.path.exists(dest_updater):
if platform_name == 'darwin':
try:
if os.path.isdir(dest_updater):
shutil.rmtree(dest_updater)
print(f"Deleted directory: {dest_updater}")
else:
os.remove(dest_updater)
print(f"Deleted file: {dest_updater}")
except Exception as e:
print(f"Error deleting {dest_updater}: {e}")
else:
os.remove(dest_updater)
if platform_name == 'darwin':
wait_for_process_to_exit(f"{app_name}_updater", timeout=10)
subprocess.check_call(["ditto", new_updater, dest_updater])
else:
shutil.copy2(new_updater, dest_updater)
if platform_name in ('linux', 'darwin'):
os.chmod(dest_updater, 0o755)
if platform_name == 'darwin':
remove_quarantine(dest_updater, app_name)
print(f"{app_name}_updater replaced.")
except Exception as e:
print(f"Failed to replace {app_name}_updater: {e}")
# 4. Delete the update folder
try:
if platform_name == 'darwin':
shutil.rmtree(app_dir)
else:
shutil.rmtree(update_folder)
except Exception as e:
print(f"Failed to delete update folder: {e}")
QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
sys.argv.remove("--finish-update")
def remove_quarantine(app_path, app_name):
"""
Removes the macOS quarantine attribute from the specified application path.
"""
script = f'''
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)"
'''
try:
subprocess.run(['osascript', '-e', script], check=True)
print("✅ Quarantine attribute removed.")
except subprocess.CalledProcessError as e:
print("❌ Failed to remove quarantine attribute.")
print(e)
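The pending-update scan in `LocalPendingUpdateCheckThread` keys entirely off a folder naming convention, `<name>-X.Y.Z<platform_suffix>`. A standalone sketch of that matching logic (the folder names below are illustrative):

```python
import re

def find_pending_version(folder_names, platform_suffix):
    """Return the version from the first folder matching <name>-X.Y.Z<suffix>, else None."""
    # Same pattern as LocalPendingUpdateCheckThread.run
    pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(platform_suffix) + r"$")
    for name in folder_names:
        m = pattern.match(name)
        if m:
            return m.group(1)
    return None

# Illustrative folder names, as they might appear in the download directory
folders = ["flares-1.2.0-windows", "notes", "flares-1.3.1-windows"]
print(find_pending_version(folders, "-windows"))  # 1.2.0 (first match wins)
```

Note that, like the thread above, this returns the first match in directory-listing order rather than the highest version; the thread then compares that version against the running one with `version_compare` before offering to install.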