19 Commits

Author SHA1 Message Date
1b5c31e4ef add pyinstaller to dependcies 2026-03-16 14:20:39 -07:00
276e1b1876 fix for xlsx and upgrade dependencies 2026-03-16 14:08:21 -07:00
3cfc6f01a2 typo 2026-03-13 16:57:51 -07:00
91fb9b38d4 copyright 2026-03-13 16:55:49 -07:00
1e346f8ef2 generalization and status bar fix 2026-03-13 14:07:40 -07:00
8c207b17ad more generalization 2026-03-13 13:40:49 -07:00
f794b98d18 changelog and generalization of updater exe 2026-03-13 13:14:00 -07:00
06972bee28 plz 2026-03-13 13:01:47 -07:00
c279f269da update logic seperated and blazes import 2026-03-13 13:00:57 -07:00
c7d044beed group fc 2026-02-03 17:29:34 -08:00
76df19f332 further issue fixes 2026-02-02 13:08:00 -08:00
22695a2281 functions instead of repeating 6 times 2026-02-01 14:12:46 -08:00
f1dd9bd184 release worthy? 2026-01-31 23:42:49 -08:00
dd2ac058af temp fc 2026-01-30 20:16:55 -08:00
98c749477c preferences 2026-01-30 15:38:12 -08:00
92973da658 fix to fold and memory leaks 2026-01-29 22:30:28 -08:00
f82978e2e8 changes and improvements 2026-01-29 17:23:52 -08:00
7007478c3b update ignore 2026-01-28 10:10:26 -08:00
fb728d5033 added support updating optode positions from .xlsx 2026-01-28 10:09:06 -08:00
8 changed files with 2996 additions and 1663 deletions

4
.gitignore vendored
View File

@@ -175,3 +175,7 @@ cython_debug/
.pypirc
/individual_images
*.xlsx
*.csv
*.snirf
*.json

View File

@@ -41,4 +41,4 @@ There are no conditions for Linux users at this time.
FLARES is distributed under the GPL-3.0 license.
Copyright (C) 2025 Tyler de Zeeuw
Copyright (C) 2025-2026 Tyler de Zeeuw

View File

@@ -1,7 +1,39 @@
# Version 1.2.2
- Added 'Update events in snirf file (BLAZES)...' and renamed 'Update events in snirf file...' to 'Update events in snirf file (BORIS)...'
- The BLAZES option will assign events that are exported directly from the software [BLAZES](https://git.research.dezeeuw.ca/tyler/blazes)
- Moved the updating logic to a seperate file for better reusability and generalization
- Fixed 'Toggle Status Bar' having no effect on the visibility of the status bar
- Fixed a bug when updating optode positions that would prevent .txt files from being selected. Fixes [Issue 54](https://git.research.dezeeuw.ca/tyler/flares/issues/54)
- Fixed a missing dependency in the standalone application when attempting to use an .xlsx file to update optode positions
# Version 1.2.1
- Added a requirements.txt file to ensure compatibility
- Added new options 'Missing Events Bypass' and 'Analysis Clearing Bypass' to the Preferences Menu
- Missing Events Bypass allows comparing events in the Group Viewers even if not all participants in the group have the event present. Fixes [Issue 28](https://git.research.dezeeuw.ca/tyler/flares/issues/28)
- Clicking Process after an analysis has been performed will now clear the existing analysis by default with a popup warning that the analysis will be cleared
- Analysis Clearing Bypass will prevent the popup and will not clear the existing analysis data. Fixes [Issue 41](https://git.research.dezeeuw.ca/tyler/flares/issues/41)
- Clicking 'Clear' should now actually properly clear all data. Hopefully fixes [Issue 9](https://git.research.dezeeuw.ca/tyler/flares/issues/9) for good
- Setting SHORT_CHANNEL to False will now grey out SHORT_CHANNEL_REGRESSION, as it is impossible to regress what does not exist. Sets SHORT_CHANNEL_REGRESSION to False under the hood when it is greyed out regardless of what is displayed. Fixes [Issue 47](https://git.research.dezeeuw.ca/tyler/flares/issues/47)
- Projects can now be saves if files have different parent folders. Fixes [Issue 48](https://git.research.dezeeuw.ca/tyler/flares/issues/48)
- It is no longer possible to attempt a save before any data has been processed. A popup will now display if a save is attempted with nothing to save
- Fixed a bug where LONG_CHANNEL_THRESH was not being applied in the processing steps
- Added a new option in the Analysis window for Group Functional Connectivity. Implements [Issue 50](https://git.research.dezeeuw.ca/tyler/flares/issues/50)
- Group Functional connectivity is still in development and the results should currently be taken with a grain of salt
- A warning is displayed when entering the Group Functional Connectivity Viewer disclosing this
- Fixed a bug when updating optode positions that would prevent .txt files from being selected. Fixes [Issue 54](https://git.research.dezeeuw.ca/tyler/flares/issues/54)
- Fixed a bug where the secondary download server would never get contacted if the primary failed
- Automatic downloads will now ignore prerelease versions. Fixes [Issue 52](https://git.research.dezeeuw.ca/tyler/flares/issues/52)
# Version 1.2.0
- This is a save-breaking release due to a new save file format. Please update your project files to ensure compatibility. Fixes [Issue 30](https://git.research.dezeeuw.ca/tyler/flares/issues/30)
- Added new parameters to the right side of the screen
- These parameters include SHOW_OPTODE_NAMES, SECONDS_TO_STRIP_HR, MAX_LOW_HR, MAX_HIGH_HR, SMOOTHING_WINDOW_HR, HEART_RATE_WINDOW, BAD_CHANNELS_HANDLING, MAX_DIST, MIN_NEIGHBORS, L_TRANS_BANDWIDTH, H_TRANS_BANDWIDTH, RESAMPLE, RESAMPLE_FREQ, STIM_DUR, HRF_MODEL, HIGH_PASS, DRIFT_ORDER, FIR_DELAYS, MIN_ONSET, OVERSAMPLING, SHORT_CHANNEL_REGRESSION, NOISE_MODEL, BINS, and VERBOSITY.
- Certain parameters now have dependencies on other parameters and will now grey out if they are not used
- All the new parameters have default values matching the underlying values in version 1.1.7
- The order of the parameters have changed to match the order that the code runs when the Process button is clicked
- Moved TIME_WINDOW_START and TIME_WINDOW_END to the 'Other' category
@@ -15,6 +47,20 @@
- Fixed the User Guide window to properly display information about the 24 stages and added a link to the Git wiki page
- MAX_WORKERS should now properly repect the value set
- Added a new CSV export option to be used by other applications
- Added support for updating optode positions directly from an .xlsx file from a Polhemius system
- Fixed an issue where the dropdowns in the Viewer windows would immediately open and close when using a trackpad
- glover and spm hrf models now function as intended without crashing. Currently, group analysis is still only supported by fir. Fixes [Issue 8](https://git.research.dezeeuw.ca/tyler/flares/issues/8)
- Clicking 'Clear' should now properly clear all data. Fixes [Issue 9](https://git.research.dezeeuw.ca/tyler/flares/issues/9)
- Revamped the fold channels viewer to not hang the application and to better process multiple participants at once. Fixes [Issue 34](https://git.research.dezeeuw.ca/tyler/flares/issues/34), [Issue 31](https://git.research.dezeeuw.ca/tyler/flares/issues/31)
- Added a Preferences menu to the navigation bar
- Two preferences have been added allowing to bypass the warning of 2D data detected and save files being from previous, potentially breaking versions
- Fixed a typo when saving a CSV that stated a SNIRF was being saved
- Loading a save file now properly restores AGE, GENDER, and GROUP. Fixes [Issue 40](https://git.research.dezeeuw.ca/tyler/flares/issues/40)
- Saving a project now no longer makes the main window go not responding. Fixes [Issue 43](https://git.research.dezeeuw.ca/tyler/flares/issues/43)
- Memory usage should no longer grow when generating lots of images multiple times. Fixes [Issue 36](https://git.research.dezeeuw.ca/tyler/flares/issues/36)
- Added a new option in the Analysis window for Functional Connectivity
- Functional connectivity is still in development and the results should currently be taken with a grain of salt
- A warning is displayed when entering the Functional Connectivity Viewer disclosing this
# Version 1.1.7
@@ -105,7 +151,7 @@
- Added a group option when clicking on a participant's file
- If no group is specified, the participant will be added to the "Default" group
- Added option to update the optode positions in a snirf file from the Options menu (F6)
- Fixed [Issue 3](https://git.research.dezeeuw.ca/tyler/flares/issues/3), [Issue 4](https://git.research.dezeeuw.ca/tyler/flares/issues/4), [Issue 17](https://git.research.dezeeuw.ca/tyler/flares/issues/17), [Issue 21](https://git.research.dezeeuw.ca/tyler/flares/issues/21), [Issue 22](https://git.research.dezeeuw.ca/tyler/flares/issues/22)
- Fixed [Issue 3](https://git.research.dezeeuw.ca/tyler/flares/issues/3), [Issue 5](https://git.research.dezeeuw.ca/tyler/flares/issues/5), [Issue 17](https://git.research.dezeeuw.ca/tyler/flares/issues/17), [Issue 21](https://git.research.dezeeuw.ca/tyler/flares/issues/21), [Issue 22](https://git.research.dezeeuw.ca/tyler/flares/issues/22)
# Version 1.0.1

483
flares.py
View File

@@ -47,9 +47,9 @@ from nilearn.glm.regression import OLSModel
import statsmodels.formula.api as smf # type: ignore
from statsmodels.stats.multitest import multipletests
from scipy import stats
from scipy.spatial.distance import cdist
from scipy.signal import welch, butter, filtfilt # type: ignore
from scipy.stats import pearsonr, zscore, t
import pywt # type: ignore
import neurokit2 as nk # type: ignore
@@ -58,6 +58,7 @@ import neurokit2 as nk # type: ignore
import pyvistaqt # type: ignore
import vtkmodules.util.data_model
import vtkmodules.util.execution_model
import xlrd
# External library imports for mne
from mne import (
@@ -90,6 +91,9 @@ from mne_nirs.io.fold import fold_channel_specificity # type: ignore
from mne_nirs.preprocessing import peak_power # type: ignore
from mne_nirs.statistics._glm_level_first import RegressionResults # type: ignore
from mne_connectivity.viz import plot_connectivity_circle
from mne_connectivity import envelope_correlation, spectral_connectivity_epochs, spectral_connectivity_time
# Needs to be set for mne
os.environ["SUBJECTS_DIR"] = str(data_path()) + "/subjects" # type: ignore
@@ -123,8 +127,6 @@ SECONDS_TO_KEEP: float
OPTODE_PLACEMENT: bool
SHOW_OPTODE_NAMES: bool
HEART_RATE: bool
SHORT_CHANNEL: bool
SHORT_CHANNEL_THRESH: float
LONG_CHANNEL_THRESH: float
@@ -189,9 +191,9 @@ TIME_WINDOW_END: int
MAX_WORKERS: int
VERBOSITY: bool
AGE = 25 # Assume 25 if not set from the GUI. This will result in a reasonable PPF
GENDER = ""
GROUP = "Default"
AGE: int = 25 # Assume 25 if not set from the GUI. This will result in a reasonable PPF
GENDER: str = ""
GROUP: str = "Default"
# These are parameters that are required for the analysis
REQUIRED_KEYS: dict[str, Any] = {
@@ -928,11 +930,12 @@ def interpolate_fNIRS_bads_weighted_average(raw, max_dist=0.03, min_neighbors=2)
raw.info['bads'] = [ch for ch in raw.info['bads'] if ch not in bad_ch_to_remove]
print("\nInterpolation complete.\n")
print("Bads cleared:", raw.info['bads'])
raw.info['bads'] = []
for ch in raw.info['bads']:
print(f"Channel {ch} still marked as bad.")
print("Bads cleared:", raw.info['bads'])
fig_raw_after = raw.plot(duration=raw.times[-1], n_channels=raw.info['nchan'], title="After interpolation", show=False)
return raw, fig_raw_after
@@ -1333,7 +1336,7 @@ def make_design_matrix(raw_haemo, short_chans):
drift_model=DRIFT_MODEL,
high_pass=HIGH_PASS,
drift_order=DRIFT_ORDER,
fir_delays=range(15),
fir_delays=FIR_DELAYS,
add_regs=short_chans.get_data().T,
add_reg_names=short_chans.ch_names,
min_onset=MIN_ONSET,
@@ -1347,7 +1350,7 @@ def make_design_matrix(raw_haemo, short_chans):
drift_model=DRIFT_MODEL,
high_pass=HIGH_PASS,
drift_order=DRIFT_ORDER,
fir_delays=range(15),
fir_delays=FIR_DELAYS,
min_onset=MIN_ONSET,
oversampling=OVERSAMPLING
)
@@ -1659,8 +1662,11 @@ def fold_channels(raw: BaseRaw) -> None:
landmark_color_map = {landmark: colors[i % len(colors)] for i, landmark in enumerate(landmarks)}
# Iterate over each channel
print(len(hbo_channel_names))
for idx, channel_name in enumerate(hbo_channel_names):
print(idx, channel_name)
# Run the fOLD on the selected channel
channel_data = raw.copy().pick(picks=channel_name) # type: ignore
@@ -1703,6 +1709,7 @@ def fold_channels(raw: BaseRaw) -> None:
landmark_specificity_data = []
# TODO: Fix this
if True:
handles = [
@@ -1725,8 +1732,9 @@ def fold_channels(raw: BaseRaw) -> None:
for ax in axes[len(hbo_channel_names):]:
ax.axis('off')
plt.show()
return fig, legend_fig
#plt.show()
fig_dict = {"main": fig, "legend": legend_fig}
return convert_fig_dict_to_png_bytes(fig_dict)
@@ -2246,9 +2254,15 @@ def brain_3d_visualization(raw_haemo, df_cha, selected_event, t_or_theta: Litera
# Get all activity conditions
for cond in [f'{selected_event}']:
if True:
ch_summary = df_cha.query(f"Condition.str.startswith('{cond}_delay_') and Chroma == 'hbo'", engine='python') # type: ignore
print(ch_summary)
if ch_summary.empty:
#not fir model
print("No data found for this condition.")
ch_summary = df_cha.query(f"Condition in [@cond] and Chroma == 'hbo'", engine='python')
# Use ordinary least squares (OLS) if only one participant
# TODO: Fix.
if True:
@@ -2269,6 +2283,9 @@ def brain_3d_visualization(raw_haemo, df_cha, selected_event, t_or_theta: Litera
valid_channels = ch_summary["ch_name"].unique().tolist() # type: ignore
raw_for_plot = raw_haemo.copy().pick(picks=valid_channels) # type: ignore
print(f"DEBUG: Model DF rows: {len(model_df)}")
print(f"DEBUG: Raw channels: {len(raw_for_plot.ch_names)}")
brain = plot_3d_evoked_array(raw_for_plot.pick(picks="hbo"), model_df, view="dorsal", distance=0.02, colorbar=True, clim=clim, mode="weighted", size=(800, 700)) # type: ignore
if show_optodes == 'all' or show_optodes == 'sensors':
@@ -2804,7 +2821,7 @@ def run_second_level_analysis(df_contrasts, raw, p, bounds):
result = model.fit(Y)
t_val = result.t(0).item()
p_val = 2 * stats.t.sf(np.abs(t_val), df=result.df_model)
p_val = 2 * t.sf(np.abs(t_val), df=result.df_model)
mean_beta = np.mean(Y)
group_results.append({
@@ -3299,7 +3316,7 @@ def hr_calc(raw):
# --- Parameters for PSD ---
desired_bin_hz = 0.1
nperseg = int(sfreq / desired_bin_hz)
hr_range = (30, 180)
hr_range = (30, 180) # TODO: SHould this not use the user defined values?
# --- Function to find strongest local peak ---
def find_hr_from_psd(ch_data):
@@ -3343,6 +3360,7 @@ def process_participant(file_path, progress_callback=None):
logger.info("Step 1 Completed.")
# Step 2: Trimming
# TODO: Clean this into a method
if TRIM:
if hasattr(raw, 'annotations') and len(raw.annotations) > 0:
# Get time of first event
@@ -3385,7 +3403,7 @@ def process_participant(file_path, progress_callback=None):
fig_individual["short"] = fig_short_chans
else:
short_chans = None
get_long_channels(raw, min_dist=SHORT_CHANNEL_THRESH, max_dist=LONG_CHANNEL_THRESH) # Don't update the existing raw
raw = get_long_channels(raw, min_dist=0, max_dist=LONG_CHANNEL_THRESH) # keep both short channels and all channels up to the threshold length
if progress_callback: progress_callback(4)
logger.info("Step 4 Completed.")
@@ -3527,6 +3545,7 @@ def process_participant(file_path, progress_callback=None):
logger.info("19")
# Step 20: Generate GLM Results
if "derivative" not in HRF_MODEL.lower():
fig_glm_result = plot_glm_results(file_path, raw_haemo, glm_est, design_matrix)
for name, fig in fig_glm_result:
fig_individual[f"GLM {name}"] = fig
@@ -3534,6 +3553,7 @@ def process_participant(file_path, progress_callback=None):
logger.info("20")
# Step 21: Generate Channel Significance
if HRF_MODEL == "fir":
fig_significance = individual_significance(raw_haemo, glm_est)
for name, fig in fig_significance:
fig_individual[f"Significance {name}"] = fig
@@ -3568,6 +3588,7 @@ def process_participant(file_path, progress_callback=None):
[(column, contrast_matrix[i]) for i, column in enumerate(design_matrix.columns)]
)
if HRF_MODEL == "fir":
all_delay_cols = [col for col in design_matrix.columns if "_delay_" in col]
all_conditions = sorted({col.split("_delay_")[0] for col in all_delay_cols})
@@ -3599,6 +3620,8 @@ def process_participant(file_path, progress_callback=None):
# Step 23: Compute Contrast Results
contrast_results = {}
if HRF_MODEL == "fir":
for cond, contrast_vector in contrast_dict.items():
contrast = glm_est.compute_contrast(contrast_vector) # type: ignore
df = contrast.to_dataframe()
@@ -3618,7 +3641,17 @@ def process_participant(file_path, progress_callback=None):
if progress_callback: progress_callback(25)
logger.info("25")
return raw_haemo, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, AGE, GENDER, GROUP, True
# TODO: Tidy up
# Extract the parameters this file was ran with. No need to return age, gender, group?
config = {
k: globals()[k]
for k in __annotations__
if k in globals() and k != "REQUIRED_KEYS"
}
print(config)
return raw_haemo, config, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, True
def sanitize_paths_for_pickle(raw_haemo, epochs):
@@ -3629,3 +3662,421 @@ def sanitize_paths_for_pickle(raw_haemo, epochs):
# Fix epochs._raw._filenames
if hasattr(epochs, '_raw') and hasattr(epochs._raw, '_filenames'):
epochs._raw._filenames = [str(p) for p in epochs._raw._filenames]
def functional_connectivity_spectral_epochs(epochs, n_lines, vmin):
# will crash without this load
epochs.load_data()
hbo_epochs = epochs.copy().pick(picks="hbo")
data = hbo_epochs.get_data()
names = hbo_epochs.ch_names
sfreq = hbo_epochs.info["sfreq"]
con = spectral_connectivity_epochs(
data,
method=["coh", "plv"],
mode="multitaper",
sfreq=sfreq,
fmin=0.04,
fmax=0.2,
faverage=True,
verbose=True
)
con_coh, con_plv = con
coh = con_coh.get_data(output="dense").squeeze()
plv = con_plv.get_data(output="dense").squeeze()
np.fill_diagonal(coh, 0)
np.fill_diagonal(plv, 0)
plot_connectivity_circle(
coh,
names,
title="fNIRS Functional Connectivity (HbO - Coherence)",
n_lines=n_lines,
vmin=vmin
)
def functional_connectivity_spectral_time(epochs, n_lines, vmin):
# will crash without this load
epochs.load_data()
hbo_epochs = epochs.copy().pick(picks="hbo")
data = hbo_epochs.get_data()
names = hbo_epochs.ch_names
sfreq = hbo_epochs.info["sfreq"]
freqs = np.linspace(0.04, 0.2, 10)
n_cycles = freqs * 2
con = spectral_connectivity_time(
data,
freqs=freqs,
method=["coh", "plv"],
mode="multitaper",
sfreq=sfreq,
fmin=0.04,
fmax=0.2,
n_cycles=n_cycles,
faverage=True,
verbose=True
)
con_coh, con_plv = con
coh = con_coh.get_data(output="dense").squeeze()
plv = con_plv.get_data(output="dense").squeeze()
np.fill_diagonal(coh, 0)
np.fill_diagonal(plv, 0)
plot_connectivity_circle(
coh,
names,
title="fNIRS Functional Connectivity (HbO - Coherence)",
n_lines=n_lines,
vmin=vmin
)
def functional_connectivity_envelope(epochs, n_lines, vmin):
# will crash without this load
epochs.load_data()
hbo_epochs = epochs.copy().pick(picks="hbo")
data = hbo_epochs.get_data()
env = envelope_correlation(
data,
orthogonalize=False,
absolute=True
)
env_data = env.get_data(output="dense")
env_corr = env_data.mean(axis=0)
env_corr = np.squeeze(env_corr)
np.fill_diagonal(env_corr, 0)
plot_connectivity_circle(
env_corr,
hbo_epochs.ch_names,
title="fNIRS HbO Envelope Correlation (Task Connectivity)",
n_lines=n_lines,
vmin=vmin
)
def functional_connectivity_betas(raw_hbo, n_lines, vmin, event_name=None):
raw_hbo = raw_hbo.copy().pick(picks="hbo")
onsets = raw_hbo.annotations.onset
# CRITICAL: Update the Raw object's annotations so the GLM sees unique events
ann = raw_hbo.annotations
new_desc = []
for i, desc in enumerate(ann.description):
new_desc.append(f"{desc}__trial_{i:03d}")
ann.description = np.array(new_desc)
# shoudl use user defiuned!!!!
design_matrix = make_first_level_design_matrix(
raw=raw_hbo,
hrf_model='fir',
fir_delays=np.arange(0, 12, 1),
drift_model='cosine',
drift_order=1
)
# 3. Run GLM & Extract Betas
glm_results = run_glm(raw_hbo, design_matrix)
betas = np.array(glm_results.theta())
reg_names = list(design_matrix.columns)
n_channels = betas.shape[0]
# ------------------------------------------------------------------
# 5. Find unique trial tags (optionally filtered by event)
# ------------------------------------------------------------------
trial_tags = sorted({
col.split("_delay")[0]
for col in reg_names
if (
("__trial_" in col)
and (event_name is None or col.startswith(event_name + "__"))
)
})
if len(trial_tags) == 0:
raise ValueError(f"No trials found for event_name={event_name}")
# ------------------------------------------------------------------
# 6. Build beta series (average across FIR delays per trial)
# ------------------------------------------------------------------
beta_series = np.zeros((n_channels, len(trial_tags)))
for t, tag in enumerate(trial_tags):
idx = [
i for i, col in enumerate(reg_names)
if col.startswith(f"{tag}_delay")
]
beta_series[:, t] = np.mean(betas[:, idx], axis=1).flatten()
# n_channels, n_trials = betas.shape[0], len(onsets)
# beta_series = np.zeros((n_channels, n_trials))
# for t in range(n_trials):
# trial_indices = [i for i, col in enumerate(reg_names) if col.startswith(f"trial_{t:03d}_delay")]
# if trial_indices:
# beta_series[:, t] = np.mean(betas[:, trial_indices], axis=1).flatten()
# Normalize each channel so they are on the same scale
# Without this, everything is connected to everything. Apparently this is a big issue in fNIRS?
beta_series = zscore(beta_series, axis=1)
global_signal = np.mean(beta_series, axis=0)
beta_series_clean = np.zeros_like(beta_series)
for i in range(n_channels):
slope, _ = np.polyfit(global_signal, beta_series[i, :], 1)
beta_series_clean[i, :] = beta_series[i, :] - (slope * global_signal)
# 4. Correlation & Strict Filtering
corr_matrix = np.zeros((n_channels, n_channels))
p_matrix = np.ones((n_channels, n_channels))
for i in range(n_channels):
for j in range(i + 1, n_channels):
r, p = pearsonr(beta_series_clean[i, :], beta_series_clean[j, :])
corr_matrix[i, j] = corr_matrix[j, i] = r
p_matrix[i, j] = p_matrix[j, i] = p
# 5. High-Bar Thresholding
reject, _ = multipletests(p_matrix[np.triu_indices(n_channels, k=1)], method='fdr_bh', alpha=0.05)[:2]
sig_corr_matrix = np.zeros_like(corr_matrix)
triu = np.triu_indices(n_channels, k=1)
for idx, is_sig in enumerate(reject):
r_val = corr_matrix[triu[0][idx], triu[1][idx]]
# Only keep the absolute strongest connections
if is_sig and abs(r_val) > 0.7:
sig_corr_matrix[triu[0][idx], triu[1][idx]] = r_val
sig_corr_matrix[triu[1][idx], triu[0][idx]] = r_val
# 6. Plot
plot_connectivity_circle(
sig_corr_matrix,
raw_hbo.ch_names,
title="Strictly Filtered Connectivity (TDDR + GSR + Z-Score)",
n_lines=None,
vmin=0.7,
vmax=1.0,
colormap='hot' # Use 'hot' to make positive connections pop
)
def get_single_subject_beta_corr(raw_hbo, event_name=None, config=None):
"""Processes one participant and returns their correlation matrix."""
raw_hbo = raw_hbo.copy().pick(picks="hbo")
ann = raw_hbo.annotations
# Rename for trial-level GLM
new_desc = [f"{desc}__trial_{i:03d}" for i, desc in enumerate(ann.description)]
ann.description = np.array(new_desc)
if config == None:
print("no config")
design_matrix = make_first_level_design_matrix(
raw=raw_hbo, hrf_model='fir',
fir_delays=np.arange(0, 12, 1),
drift_model='cosine', drift_order=1
)
else:
print("config")
if config.get("SHORT_CHANNEL_REGRESSION") == True:
short_chans = get_short_channels(raw_hbo, max_dist=config.get("SHORT_CHANNEL_THRESH"))
design_matrix = make_first_level_design_matrix(
raw=raw_hbo,
stim_dur=config.get("STIM_DUR"),
hrf_model=config.get("HRF_MODEL"),
drift_model=config.get("DRIFT_MODEL"),
high_pass=config.get("HIGH_PASS"),
drift_order=config.get("DRIFT_ORDER"),
fir_delays=config.get("FIR_DELAYS"),
add_regs=short_chans.get_data().T,
add_reg_names=short_chans.ch_names,
min_onset=config.get("MIN_ONSET"),
oversampling=config.get("OVERSAMPLING")
)
print("yep")
else:
design_matrix = make_first_level_design_matrix(
raw=raw_hbo,
stim_dur=config.get("STIM_DUR"),
hrf_model=config.get("HRF_MODEL"),
drift_model=config.get("DRIFT_MODEL"),
high_pass=config.get("HIGH_PASS"),
drift_order=config.get("DRIFT_ORDER"),
fir_delays=config.get("FIR_DELAYS"),
min_onset=config.get("MIN_ONSET"),
oversampling=config.get("OVERSAMPLING")
)
glm_results = run_glm(raw_hbo, design_matrix)
betas = np.array(glm_results.theta())
reg_names = list(design_matrix.columns)
n_channels = betas.shape[0]
# Filter trials by event name
trial_tags = sorted({
col.split("_delay")[0] for col in reg_names
if "__trial_" in col and (event_name is None or col.startswith(event_name + "__"))
})
if not trial_tags:
return None, None
# Build Beta Series
beta_series = np.zeros((n_channels, len(trial_tags)))
for t, tag in enumerate(trial_tags):
idx = [i for i, col in enumerate(reg_names) if col.startswith(f"{tag}_delay")]
beta_series[:, t] = np.mean(betas[:, idx], axis=1).flatten()
#beta_series[:, t] = np.max(betas[:, idx], axis=1).flatten() #TODO: Figure out which one to use
# Z-score and GSR (Global Signal Regression)
beta_series = zscore(beta_series, axis=1)
global_signal = np.mean(beta_series, axis=0)
for i in range(n_channels):
slope, _ = np.polyfit(global_signal, beta_series[i, :], 1)
beta_series[i, :] -= (slope * global_signal)
# Correlation Matrix
corr_matrix = np.corrcoef(beta_series)
return corr_matrix, raw_hbo.ch_names
def run_group_functional_connectivity(haemo_dict, config_dict, selected_paths, event_name, n_lines, vmin):
"""Aggregates multiple participants and triggers the plot."""
all_z_matrices = []
common_names = None
for path in selected_paths:
raw = haemo_dict.get(path)
config = config_dict.get(path)
if raw is None: continue
print(config)
corr, names = get_single_subject_beta_corr(raw, event_name, config)
if corr is not None:
# Fisher Z-transform for averaging
z_mat = np.arctanh(np.clip(corr, -0.99, 0.99))
all_z_matrices.append(z_mat)
common_names = names
from scipy.stats import ttest_1samp
# 1. Convert list to 3D array: (Participants, Channels, Channels)
group_z_data = np.array(all_z_matrices)
print("1")
# 2. Perform a T-Test across the participant dimension (axis 0)
# We test if the mean Z-score is different from 0
# C:\Users\tyler\Desktop\research\.venv\Lib\site-packages\scipy\stats\_axis_nan_policy.py:611: RuntimeWarning: Precision loss occurred in moment calculation due to catastrophic cancellation. This occurs when the data are nearly identical. Results may be unreliable.
# res = hypotest_fun_out(*samples, axis=axis, **kwds)
print("--- Variance Check ---")
# ADD THIS LINE: Define n_channels based on the data shape
# group_z_data.shape is (n_participants, n_channels, n_channels)
n_channels = group_z_data.shape[1]
variance_matrix = np.var(group_z_data, axis=0)
# Find where variance is exactly 0 (or very close to it)
zero_var_indices = np.where(variance_matrix < 1e-15)
coords = list(zip(zero_var_indices[0], zero_var_indices[1]))
diag_count = 0
non_diag_pairs = []
for r, c in coords:
if r == c:
diag_count += 1
else:
non_diag_pairs.append((r, c))
print(f"Total pairs with zero variance: {len(coords)}")
print(f"Identical diagonals: {diag_count}/{n_channels}")
if non_diag_pairs:
print(f"Warning: {len(non_diag_pairs)} non-diagonal pairs have zero variance!")
for r, c in non_diag_pairs[:10]: # Print first 10
print(f" - Pair: Channel {r} & Channel {c}")
else:
print("Clean! Zero variance only exists on the diagonals.")
print("----------------------")
t_stats, p_values = ttest_1samp(group_z_data, popmean=0, axis=0)
print("2")
# 3. Multiple Comparisons Correction (FDR)
# We only care about the upper triangle (unique connections)
n_channels = p_values.shape[0]
triu_indices = np.triu_indices(n_channels, k=1)
flat_p = p_values[triu_indices]
reject, corrected_p = multipletests(flat_p, method='fdr_bh', alpha=0.05)[:2]
# 4. Create the final "Significant" Matrix
avg_r = np.tanh(np.mean(group_z_data, axis=0))
sig_avg_r = np.zeros_like(avg_r)
# Only keep connections that are Significant AND above your VMIN (r-threshold)
for idx, is_sig in enumerate(reject):
row, col = triu_indices[0][idx], triu_indices[1][idx]
r_val = avg_r[row, col]
if is_sig and abs(r_val) >= vmin:
sig_avg_r[row, col] = sig_avg_r[col, row] = r_val
# 5. Plot the significant results
# if not all_z_matrices:
# return
# # Average and convert back to R
# avg_z = np.mean(all_z_matrices, axis=0)
# avg_r = np.tanh(avg_z)
# # Thresholding
# avg_r[np.abs(avg_r) < vmin] = 0
plot_connectivity_circle(
sig_avg_r, common_names, n_lines=n_lines,
title=f"Group Connectivity: {event_name if event_name else 'All Events'}",
vmin=vmin, vmax=1.0, colormap='hot'
)

View File

@@ -18,11 +18,12 @@ import subprocess
from datetime import datetime
PLATFORM_NAME = platform.system().lower()
APP_NAME = "flares"
if PLATFORM_NAME == 'darwin':
LOG_FILE = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.log")
LOG_FILE = os.path.join(os.path.dirname(sys.executable), f"../../../{APP_NAME}_updater.log")
else:
LOG_FILE = os.path.join(os.getcwd(), "flares_updater.log")
LOG_FILE = os.path.join(os.getcwd(), f"{APP_NAME}_updater.log")
def log(msg):
@@ -147,7 +148,7 @@ def copy_update_files_darwin(src_folder, dest_folder, updater_name):
def remove_quarantine(app_path):
script = f'''
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (1/2)"
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{APP_NAME} needs privileges to finish the update. (1/2)"
'''
try:
subprocess.run(['osascript', '-e', script], check=True)
@@ -162,7 +163,7 @@ def main():
log(f"[Updater] sys.argv: {sys.argv}")
if len(sys.argv) != 3:
log("Invalid arguments. Usage: flares_updater <update_folder> <main_app_executable>")
log(f"Invalid arguments. Usage: {APP_NAME}_updater <update_folder> <main_app_executable>")
sys.exit(1)
update_folder = sys.argv[1]
@@ -215,7 +216,7 @@ def main():
if PLATFORM_NAME == 'darwin':
log(f'Attempting to delete {ppparent_dir}')
delete_path(ppparent_dir)
update_folder = os.path.join(sys.argv[1], "flares-darwin")
update_folder = os.path.join(sys.argv[1], f"{APP_NAME}-darwin")
copy_update_files_darwin(update_folder, pppparent_dir, updater_name)
else:

3446
main.py

File diff suppressed because it is too large Load Diff

BIN
requirements.txt Normal file

Binary file not shown.

539
updater.py Normal file
View File

@@ -0,0 +1,539 @@
"""
Filename: updater.py
Description: Generic updater file
Author: Tyler de Zeeuw
License: GPL-3.0
"""
# Built-in imports
import os
import re
import sys
import time
import shlex
import shutil
import zipfile
import traceback
import subprocess
# External library imports
import psutil
import requests
from PySide6.QtWidgets import QMessageBox
from PySide6.QtCore import QThread, Signal, QObject
class UpdateDownloadThread(QThread):
"""
Thread that downloads and extracts an update package and emits a signal on completion or error.
Args:
download_url (str): URL of the update zip file to download.
latest_version (str): Version string of the latest update.
"""
update_ready = Signal(str, str)
error_occurred = Signal(str)
def __init__(self, download_url, latest_version, platform_name, app_name):
super().__init__()
self.download_url = download_url
self.latest_version = latest_version
self.platform_name = platform_name
self.app_name = app_name
def run(self):
try:
local_filename = os.path.basename(self.download_url)
if self.platform_name == 'darwin':
tmp_dir = f'/tmp/{self.app_name}tempupdate'
os.makedirs(tmp_dir, exist_ok=True)
local_path = os.path.join(tmp_dir, local_filename)
else:
local_path = os.path.join(os.getcwd(), local_filename)
# Download the file
with requests.get(self.download_url, stream=True, timeout=15) as r:
r.raise_for_status()
with open(local_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Extract folder name (remove .zip)
if self.platform_name == 'darwin':
extract_folder = os.path.splitext(local_filename)[0]
extract_path = os.path.join(tmp_dir, extract_folder)
else:
extract_folder = os.path.splitext(local_filename)[0]
extract_path = os.path.join(os.getcwd(), extract_folder)
# Create the folder if not exists
os.makedirs(extract_path, exist_ok=True)
# Extract the zip file contents
if self.platform_name == 'darwin':
subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
else:
with zipfile.ZipFile(local_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# Remove the zip once extracted and emit a signal
os.remove(local_path)
self.update_ready.emit(self.latest_version, extract_path)
except Exception as e:
# Emit a signal signifying failure
self.error_occurred.emit(str(e))
class UpdateCheckThread(QThread):
"""
Thread that checks for updates by querying the API and emits a signal based on the result.
Signals:
download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
no_update_available(): Emitted when no update is found or current version is up to date.
error_occurred(str): Emitted with an error message if the update check fails.
"""
download_requested = Signal(str, str)
no_update_available = Signal()
error_occurred = Signal(str)
def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name):
super().__init__()
self.api_url = api_url
self.api_url_sec = api_url_sec
self.current_version = current_version
self.platform_name = platform_name
self.app_name = app_name
def run(self):
# if not getattr(sys, 'frozen', False):
# self.error_occurred.emit("Application is not frozen (Development mode).")
# return
try:
latest_version, download_url = self.get_latest_release_for_platform()
if not latest_version:
self.no_update_available.emit()
return
if not download_url:
self.error_occurred.emit(f"No download available for platform '{self.platform_name}'")
return
if self.version_compare(latest_version, self.current_version) > 0:
self.download_requested.emit(download_url, latest_version)
else:
self.no_update_available.emit()
except Exception as e:
self.error_occurred.emit(f"Update check failed: {e}")
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def get_latest_release_for_platform(self):
urls = [self.api_url, self.api_url_sec]
for url in urls:
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
releases = response.json()
if not releases:
continue
latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
if not latest:
continue
tag = latest["tag_name"].lstrip("v")
for asset in latest.get("assets", []):
if self.platform_name in asset["name"].lower():
return tag, asset["browser_download_url"]
return tag, None
except (requests.RequestException, ValueError) as e:
continue
return None, None
class LocalPendingUpdateCheckThread(QThread):
"""
Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
Args:
current_version (str): Current application version.
platform_suffix (str): Platform-specific suffix to identify update folders.
"""
pending_update_found = Signal(str, str)
no_pending_update = Signal()
def __init__(self, current_version, platform_suffix, platform_name, app_name):
super().__init__()
self.current_version = current_version
self.platform_suffix = platform_suffix
self.platform_name = platform_name
self.app_name = app_name
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def run(self):
if self.platform_name == 'darwin':
cwd = f'/tmp/{self.app_name}tempupdate'
else:
cwd = os.getcwd()
pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
found = False
try:
for item in os.listdir(cwd):
folder_path = os.path.join(cwd, item)
if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
match = pattern.match(item)
if match:
folder_version = match.group(1)
if self.version_compare(folder_version, self.current_version) > 0:
self.pending_update_found.emit(folder_version, folder_path)
found = True
break
except:
pass
if not found:
self.no_pending_update.emit()
class UpdateManager(QObject):
"""
Orchestrates the update process.
Main apps should instantiate this and call check_for_updates().
"""
def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name):
super().__init__()
self.parent = main_window
self.api_url = api_url
self.api_url_sec = api_url_sec
self.current_version = current_version
self.platform_name = platform_name
self.platform_suffix = platform_suffix
self.app_name = app_name
self.pending_update_version = None
self.pending_update_path = None
def manual_check_for_updates(self):
self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name)
self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
self.local_check_thread.start()
def on_pending_update_found(self, version, folder_path):
self.parent.statusBar().showMessage(f"Pending update found: version {version}")
self.pending_update_version = version
self.pending_update_path = folder_path
self.show_pending_update_popup()
def on_no_pending_update(self):
# No pending update found locally, start server check directly
self.parent.statusBar().showMessage("No pending local update found. Checking server...")
self.start_update_check_thread()
def show_pending_update_popup(self):
msg_box = QMessageBox(self.parent)
msg_box.setWindowTitle("Pending Update Found")
msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
if msg_box.clickedButton() == install_now_button:
self.install_update(self.pending_update_path)
else:
self.parent.statusBar().showMessage("Pending update available. Install later.")
# After user dismisses, still check the server for new updates
self.start_update_check_thread()
def start_update_check_thread(self):
self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name)
self.check_thread.download_requested.connect(self.on_server_update_requested)
self.check_thread.no_update_available.connect(self.on_server_no_update)
self.check_thread.error_occurred.connect(self.on_error)
self.check_thread.start()
def on_server_no_update(self):
self.parent.statusBar().showMessage("No new updates found on server.", 5000)
def on_server_update_requested(self, download_url, latest_version):
if self.pending_update_version:
cmp = self.version_compare(latest_version, self.pending_update_version)
if cmp > 0:
# Server version is newer than pending update
self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
try:
shutil.rmtree(self.pending_update_path)
self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
except Exception as e:
self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}")
# Clear pending update info so new download proceeds
self.pending_update_version = None
self.pending_update_path = None
# Download the new update
self.download_update(download_url, latest_version)
elif cmp == 0:
# Versions equal, no download needed
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
else:
# Server version older than pending? Unlikely but just keep pending update
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
else:
# No pending update, just download
self.download_update(download_url, latest_version)
def download_update(self, download_url, latest_version):
self.parent.statusBar().showMessage("Downloading update...")
self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name)
self.download_thread.update_ready.connect(self.on_update_ready)
self.download_thread.error_occurred.connect(self.on_error)
self.download_thread.start()
def on_update_ready(self, latest_version, extract_folder):
self.parent.statusBar().showMessage("Update downloaded and extracted.")
msg_box = QMessageBox(self.parent)
msg_box.setWindowTitle("Update Ready")
msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
if msg_box.clickedButton() == install_now_button:
self.install_update(extract_folder)
else:
self.parent.statusBar().showMessage("Update ready. Install later.")
def install_update(self, extract_folder):
# Path to updater executable
if self.platform_name == 'windows':
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe")
elif self.platform_name == 'darwin':
if getattr(sys, 'frozen', False):
updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app")
else:
updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app")
elif self.platform_name == 'linux':
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater")
else:
updater_path = os.getcwd()
if not os.path.exists(updater_path):
QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
return
# Launch updater with extracted folder path as argument
try:
# Pass current app's executable path for updater to relaunch
main_app_executable = os.path.abspath(sys.argv[0])
print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
if self.platform_name == 'darwin':
subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
else:
subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
# Close the current app so updater can replace files
sys.exit(0)
except Exception as e:
QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
def on_error(self, message):
# print(f"Error: {message}")
self.parent.statusBar().showMessage(f"Error occurred during update process. {message}")
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def wait_for_process_to_exit(process_name, timeout=10):
"""
Waits for a process with the specified name to exit within a timeout period.
Args:
process_name (str): Name (or part of the name) of the process to wait for.
timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
Returns:
bool: True if the process exited before the timeout, False otherwise.
"""
print(f"Waiting for {process_name} to exit...")
deadline = time.time() + timeout
while time.time() < deadline:
still_running = False
for proc in psutil.process_iter(['name']):
try:
if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
still_running = True
print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not still_running:
print(f"{process_name} has exited.")
return True
time.sleep(0.5)
print(f"{process_name} did not exit in time.")
return False
def finish_update_if_needed(platform_name, app_name):
"""
Completes a pending application update if '--finish-update' is present in the command-line arguments.
"""
if "--finish-update" in sys.argv:
print("Finishing update...")
if platform_name == 'darwin':
app_dir = f'/tmp/{app_name}tempupdate'
else:
app_dir = os.getcwd()
# 1. Find update folder
update_folder = None
for entry in os.listdir(app_dir):
entry_path = os.path.join(app_dir, entry)
if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name):
update_folder = os.path.join(app_dir, entry)
break
if update_folder is None:
print("No update folder found. Skipping update steps.")
return
if platform_name == 'darwin':
update_folder = os.path.join(update_folder, f"{app_name}-darwin")
# 2. Wait for updater to exit
print(f"Waiting for {app_name}_updater to exit...")
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower():
try:
proc.wait(timeout=5)
except psutil.TimeoutExpired:
print(f"Force killing lingering {app_name}_updater")
proc.kill()
# 3. Replace the updater
if platform_name == 'windows':
new_updater = os.path.join(update_folder, f"{app_name}_updater.exe")
dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe")
elif platform_name == 'darwin':
new_updater = os.path.join(update_folder, f"{app_name}_updater.app")
dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app"))
elif platform_name == 'linux':
new_updater = os.path.join(update_folder, f"{app_name}_updater")
dest_updater = os.path.join(app_dir, f"{app_name}_updater")
else:
print("Unknown Platform")
new_updater = os.getcwd()
dest_updater = os.getcwd()
print(f"New updater is {new_updater}")
print(f"Dest updater is {dest_updater}")
print("Writable?", os.access(dest_updater, os.W_OK))
print("Executable path:", sys.executable)
print("Trying to copy:", new_updater, "->", dest_updater)
if os.path.exists(new_updater):
try:
if os.path.exists(dest_updater):
if platform_name == 'darwin':
try:
if os.path.isdir(dest_updater):
shutil.rmtree(dest_updater)
print(f"Deleted directory: {dest_updater}")
else:
os.remove(dest_updater)
print(f"Deleted file: {dest_updater}")
except Exception as e:
print(f"Error deleting {dest_updater}: {e}")
else:
os.remove(dest_updater)
if platform_name == 'darwin':
wait_for_process_to_exit(f"{app_name}_updater", timeout=10)
subprocess.check_call(["ditto", new_updater, dest_updater])
else:
shutil.copy2(new_updater, dest_updater)
if platform_name in ('linux', 'darwin'):
os.chmod(dest_updater, 0o755)
if platform_name == 'darwin':
remove_quarantine(dest_updater, app_name)
print(f"{app_name}_updater replaced.")
except Exception as e:
print(f"Failed to replace {app_name}_updater: {e}")
# 4. Delete the update folder
try:
if platform_name == 'darwin':
shutil.rmtree(app_dir)
else:
shutil.rmtree(update_folder)
except Exception as e:
print(f"Failed to delete update folder: {e}")
QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
sys.argv.remove("--finish-update")
def remove_quarantine(app_path, app_name):
"""
Removes the macOS quarantine attribute from the specified application path.
"""
script = f'''
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)"
'''
try:
subprocess.run(['osascript', '-e', script], check=True)
print("✅ Quarantine attribute removed.")
except subprocess.CalledProcessError as e:
print("❌ Failed to remove quarantine attribute.")
print(e)