13 Commits

10 changed files with 1747 additions and 976 deletions

1
.gitignore vendored
View File

@@ -179,3 +179,4 @@ cython_debug/
*.csv *.csv
*.snirf *.snirf
*.json *.json
flares-*

View File

@@ -27,7 +27,7 @@ This only applies for the first time you attempt to run FLARES. Subsequent times
Due to the cost of a code signing certificate, the application is not digitally signed. Once the application is extracted and attempted to be launched for the first time you will get a popup stating: Due to the cost of a code signing certificate, the application is not digitally signed. Once the application is extracted and attempted to be launched for the first time you will get a popup stating:
"Windows protected your PC - Microsoft Defender SmartScreen prevented an unrecognized app from starting. Running this app might put your PC at risk.", with the options of" More info" or "Don't run". "Windows protected your PC - Microsoft Defender SmartScreen prevented an unrecognized app from starting. Running this app might put your PC at risk.", with the options of "More info" or "Don't run".
The solution around this is to click "More info" and then select "Run anyway". The solution around this is to click "More info" and then select "Run anyway".
@@ -41,4 +41,4 @@ There are no conditions for Linux users at this time.
FLARES is distributed under the GPL-3.0 license. FLARES is distributed under the GPL-3.0 license.
Copyright (C) 2025 Tyler de Zeeuw Copyright (C) 2025-2026 Tyler de Zeeuw

View File

@@ -1,3 +1,36 @@
# Version 1.3.0
- This is a save-changing release due to a new save file format. Please update your project files to ensure compatibility
- It is still potentially possible to load older saves by enabling 'Incompatible Save Bypass' from the Preferences menu
- Fixed workers not releasing memory when processing multiple participants. Fixes [Issue 55](https://git.research.dezeeuw.ca/tyler/flares/issues/55)
- Fixed part of an issue where memory could increase over time despite clicking the clear button. There is still some edge cases where this can occur
- Fixed an issue when clearing a bubble, reloading the same file, and clicking it would cause the app to crash. Fixes [Issue 57](https://git.research.dezeeuw.ca/tyler/flares/issues/57)
- Picking a .txt or .xlsx file now has both in the same file selection instead of having to select which extension was desired
- Fixed an issue where the fOLD files were not included in the Windows version. Fixes [Issue 60](https://git.research.dezeeuw.ca/tyler/flares/issues/60)
- Added a new parameter to the right side of the screen: EPOCH_EVENTS_HANDLING. Fixes [Issue 58](https://git.research.dezeeuw.ca/tyler/flares/issues/58)
- EPOCH_EVENTS_HANDLING defaults to 'shift' compared to previous versions where the default would have been equivalent to 'strict'
- The label for ENHANCE_NEGATIVE_CORRELATION no longer gets cut off by its dropdown selection
- Loading in files and folders have changes to immediately show their bubbles having a respective loading symbol on each bubble
- Once the file has been completely loaded and processed, the loading symbol will change to a green checkmark and clicking will be enabled
- The metadata in the File infomation widget is now saved to prevent recalculations every time the bubble is selected
- The status bar will now say loading while the bubbles are being processed, and loaded once the processing has completed
- This new loading method will prevent the application from hanging when loading lots of files at once. Fixes [Issue 59](https://git.research.dezeeuw.ca/tyler/flares/issues/59)
- Fixed text allignment for the first paragraph when a bubble is selected in the 'File information' widget
- The three main widgets are now resizable! All of them have minimum widths to ensure they do not get too squished
- Added a new option 'Reset Window Layout' under the View menu that will resize all widgets back to their default sizes
- Added a new terminal command 'version' that will print the applications current version
# Version 1.2.2
- Added 'Update events in snirf file (BLAZES)...' and renamed 'Update events in snirf file...' to 'Update events in snirf file (BORIS)...' under the Options menu
- The BLAZES option will assign events that are exported directly from the software [BLAZES](https://git.research.dezeeuw.ca/tyler/blazes)
- Moved the updating logic to a seperate file for better reusability and generalization
- Fixed 'Toggle Status Bar' having no effect on the visibility of the status bar
- Fixed a bug when updating optode positions that would prevent .txt files from being selected. Fixes [Issue 54](https://git.research.dezeeuw.ca/tyler/flares/issues/54)
- Fixed a missing dependency in the standalone application when attempting to use an .xlsx file to update optode positions
# Version 1.2.1 # Version 1.2.1
- Added a requirements.txt file to ensure compatibility - Added a requirements.txt file to ensure compatibility

179
flares.py
View File

@@ -16,12 +16,14 @@ from io import BytesIO
from typing import Any, Optional, cast, Literal, Union from typing import Any, Optional, cast, Literal, Union
from itertools import compress from itertools import compress
from copy import deepcopy from copy import deepcopy
from multiprocessing import Queue from multiprocessing import Queue, Pool
import os.path as op import os.path as op
import re import re
import traceback import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed from concurrent.futures import ProcessPoolExecutor, as_completed
from queue import Empty from queue import Empty
import time
import multiprocessing as mp
# External library imports # External library imports
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@@ -169,6 +171,8 @@ H_FREQ: float
L_TRANS_BANDWIDTH: float L_TRANS_BANDWIDTH: float
H_TRANS_BANDWIDTH: float H_TRANS_BANDWIDTH: float
EPOCH_HANDLING: str
RESAMPLE: bool RESAMPLE: bool
RESAMPLE_FREQ: int RESAMPLE_FREQ: int
STIM_DUR: float STIM_DUR: float
@@ -247,6 +251,28 @@ REQUIRED_KEYS: dict[str, Any] = {
} }
import logging
import os
import psutil
import traceback
audit_log = logging.getLogger("memory_audit")
audit_log.setLevel(logging.INFO)
audit_log.propagate = False # This prevents it from talking to other loggers
# 2. Add a file handler specifically for this audit logger
if not audit_log.handlers:
fh = logging.FileHandler('flares_memory_audit.log')
fh.setFormatter(logging.Formatter('%(asctime)s | PID: %(process)d | %(message)s'))
audit_log.addHandler(fh)
def get_mem_mb():
return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
class ProcessingError(Exception): class ProcessingError(Exception):
def __init__(self, message: str = "Something went wrong!"): def __init__(self, message: str = "Something went wrong!"):
self.message = message self.message = message
@@ -370,58 +396,92 @@ def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) -
t.join(timeout=5) # prevent permanent hang t.join(timeout=5) # prevent permanent hang
def process_participant_worker(file_path, file_params, file_metadata, result_queue, progress_queue):
file_name = os.path.basename(file_path)
def process_participant_worker(args): try:
file_path, file_params, file_metadata, progress_queue = args # 1. Setup
set_config_me(file_params) set_config_me(file_params)
set_metadata(file_path, file_metadata) set_metadata(file_path, file_metadata)
logger.info(f"DEBUG: Metadata for {file_path}: AGE={globals().get('AGE')}, GENDER={globals().get('GENDER')}, GROUP={globals().get('GROUP')}")
def progress_callback(step_idx): def progress_callback(step_idx):
if progress_queue: if progress_queue:
progress_queue.put(('progress', file_path, step_idx)) # We use put_nowait to prevent the worker from hanging on a full queue
try: try:
progress_queue.put_nowait(('progress', file_path, step_idx))
except: pass
# 2. Process
result = process_participant(file_path, progress_callback=progress_callback) result = process_participant(file_path, progress_callback=progress_callback)
return file_path, result, None
# 3. Report Success
result_queue.put((file_path, result, None))
except Exception as e: except Exception as e:
error_trace = traceback.format_exc() result_queue.put((file_path, None, str(e)))
return file_path, None, (str(e), error_trace)
finally:
# --- THE FIX: MANDATORY EXIT ---
# Explicitly flush the logs and force the process to terminate
audit_log.info(f"Worker for {file_name} calling hard exit.")
sys.stdout.flush()
sys.stderr.flush()
# We use os._exit(0) as a nuclear option if sys.exit() is being caught by a try/except
os._exit(0)
def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, max_workers=None): def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, max_workers=6):
audit_log.info(f"--- SESSION START: {len(file_paths)} files ---")
pending_files = list(file_paths)
active_processes = [] # List of tuples: (Process object, file_path)
results_by_file = {} results_by_file = {}
file_args = [(file_path, file_params, file_metadata, progress_queue) for file_path in file_paths] # We use a manager queue so it handles IPC serialization cleanly
manager = mp.Manager()
result_queue = manager.Queue()
with ProcessPoolExecutor(max_workers=max_workers) as executor: # Loop continues as long as there are files to process OR workers still running
futures = {executor.submit(process_participant_worker, arg): arg[0] for arg in file_args} while pending_files or active_processes:
for future in as_completed(futures): # 1. SPWAN WORKERS: Only spawn if we are under the limit AND have files left
file_path = futures[future] while len(active_processes) < max_workers and pending_files:
file_path = pending_files.pop(0)
p = mp.Process(
target=process_participant_worker,
args=(file_path, file_params, file_metadata, result_queue, progress_queue)
)
p.start()
active_processes.append((p, file_path))
audit_log.info(f"Spawned worker. Active processes: {len(active_processes)}")
# 2. COLLECT RESULTS: Drain the queue continuously so workers don't deadlock
while not result_queue.empty():
try: try:
file_path, result, error = future.result() res_path, result, error = result_queue.get_nowait()
if error: if not error:
error_message, error_traceback = error results_by_file[res_path] = result
if progress_queue: else:
progress_queue.put({ audit_log.error(f"Worker failed on {os.path.basename(res_path)}: {error}")
"type": "error", except Exception:
"file": file_path, break # Queue is empty or busy
"error": error_message,
"traceback": error_traceback
})
continue
results_by_file[file_path] = result # 3. CLEANUP: Check for finished processes and remove them
except Exception as e: for p, f_path in active_processes[:]: # Iterate over a slice copy
print(f"Unexpected error processing {file_path}: {e}") if not p.is_alive():
p.join() # Formally close the process to free OS resources
active_processes.remove((p, f_path))
audit_log.info(f"Worker finished. Active processes dropping to: {len(active_processes)}")
# Brief pause to prevent this while loop from pegging your CPU to 100%
time.sleep(0.5)
audit_log.info("--- SESSION COMPLETE ---")
return results_by_file return results_by_file
def markbad(data, ax, ch_names: list[str]) -> None: def markbad(data, ax, ch_names: list[str]) -> None:
""" """
Add a strikethrough to a plot for channels marked as bad. Add a strikethrough to a plot for channels marked as bad.
@@ -1143,16 +1203,48 @@ def filter_the_data(raw_haemo):
def safe_create_epochs(raw, events, event_dict, tmin, tmax, baseline):
"""
Attempts to create epochs, shifting event times slightly if
sample collisions are detected.
"""
shift_increment = 1.0 / raw.info['sfreq'] # The duration of exactly one sample
for attempt in range(10): # Limit attempts to avoid infinite loops
try:
epochs = Epochs(
raw, events, event_id=event_dict,
tmin=tmin, tmax=tmax, baseline=baseline,
preload=True, verbose=False
)
return epochs
except RuntimeError as e:
if "Event time samples were not unique" in str(e):
# Find duplicates in the events array (column 0 is the sample index)
vals, counts = np.unique(events[:, 0], return_counts=True)
duplicates = vals[counts > 1]
# Shift the second occurrence of every duplicate by 1 sample
for dup in duplicates:
idx = np.where(events[:, 0] == dup)[0][1:] # Get all but the first
events[idx, 0] += 1
print(f"Collision detected. Nudging events by {shift_increment:.4f}s and retrying...")
continue
else:
raise e # Raise if it's a different Runtime Error
raise RuntimeError("Could not resolve event collisions after 10 attempts.")
def epochs_calculations(raw_haemo, events, event_dict): def epochs_calculations(raw_haemo, events, event_dict):
fig_epochs = [] # List to store figures fig_epochs = [] # List to store figures
# Create epochs from raw data if EPOCH_HANDLING == 'shift':
epochs = Epochs(raw_haemo, epochs = safe_create_epochs(raw=raw_haemo, events=events, event_dict=event_dict, tmin=-5, tmax=15, baseline=(None, 0))
events, else:
event_id=event_dict, epochs = Epochs(raw_haemo, events, event_id=event_dict, tmin=-5, tmax=15, baseline=(None, 0))
tmin=-5,
tmax=15,
baseline=(None, 0))
# Make a copy of the epochs and drop bad ones # Make a copy of the epochs and drop bad ones
epochs2 = epochs.copy() epochs2 = epochs.copy()
@@ -1582,16 +1674,13 @@ def resource_path(relative_path):
def fold_channels(raw: BaseRaw) -> None: def fold_channels(raw: BaseRaw) -> None:
# if getattr(sys, 'frozen', False): # Locate the fOLD excel files
if getattr(sys, 'frozen', False):
set_config('MNE_NIRS_FOLD_PATH', resource_path("../../mne_data/fOLD/fOLD-public-master/Supplementary")) # type: ignore
else:
path = os.path.expanduser("~") + "/mne_data/fOLD/fOLD-public-master/Supplementary" path = os.path.expanduser("~") + "/mne_data/fOLD/fOLD-public-master/Supplementary"
logger.info(path)
set_config('MNE_NIRS_FOLD_PATH', resource_path(path)) # type: ignore set_config('MNE_NIRS_FOLD_PATH', resource_path(path)) # type: ignore
# # Locate the fOLD excel files
# else:
# logger.info("yabba")
# set_config('MNE_NIRS_FOLD_PATH', resource_path("../../mne_data/fOLD/fOLD-public-master/Supplementary")) # type: ignore
output = None output = None
# List to store the results # List to store the results

View File

@@ -18,11 +18,12 @@ import subprocess
from datetime import datetime from datetime import datetime
PLATFORM_NAME = platform.system().lower() PLATFORM_NAME = platform.system().lower()
APP_NAME = "flares"
if PLATFORM_NAME == 'darwin': if PLATFORM_NAME == 'darwin':
LOG_FILE = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.log") LOG_FILE = os.path.join(os.path.dirname(sys.executable), f"../../../{APP_NAME}_updater.log")
else: else:
LOG_FILE = os.path.join(os.getcwd(), "flares_updater.log") LOG_FILE = os.path.join(os.getcwd(), f"{APP_NAME}_updater.log")
def log(msg): def log(msg):
@@ -147,7 +148,7 @@ def copy_update_files_darwin(src_folder, dest_folder, updater_name):
def remove_quarantine(app_path): def remove_quarantine(app_path):
script = f''' script = f'''
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (1/2)" do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{APP_NAME} needs privileges to finish the update. (1/2)"
''' '''
try: try:
subprocess.run(['osascript', '-e', script], check=True) subprocess.run(['osascript', '-e', script], check=True)
@@ -162,7 +163,7 @@ def main():
log(f"[Updater] sys.argv: {sys.argv}") log(f"[Updater] sys.argv: {sys.argv}")
if len(sys.argv) != 3: if len(sys.argv) != 3:
log("Invalid arguments. Usage: flares_updater <update_folder> <main_app_executable>") log(f"Invalid arguments. Usage: {APP_NAME}_updater <update_folder> <main_app_executable>")
sys.exit(1) sys.exit(1)
update_folder = sys.argv[1] update_folder = sys.argv[1]
@@ -215,7 +216,7 @@ def main():
if PLATFORM_NAME == 'darwin': if PLATFORM_NAME == 'darwin':
log(f'Attempting to delete {ppparent_dir}') log(f'Attempting to delete {ppparent_dir}')
delete_path(ppparent_dir) delete_path(ppparent_dir)
update_folder = os.path.join(sys.argv[1], "flares-darwin") update_folder = os.path.join(sys.argv[1], f"{APP_NAME}-darwin")
copy_update_files_darwin(update_folder, pppparent_dir, updater_name) copy_update_files_darwin(update_folder, pppparent_dir, updater_name)
else: else:

View File

@@ -1,2 +0,0 @@
[LocalizedFileNames]
updater.png=@updater.png,0

View File

@@ -0,0 +1 @@
<svg xmlns="http://www.w3.org/2000/svg" height="24px" viewBox="0 -960 960 960" width="24px" fill="#1f1f1f"><path d="M200-120q-33 0-56.5-23.5T120-200v-560q0-33 23.5-56.5T200-840h560q33 0 56.5 23.5T840-760v560q0 33-23.5 56.5T760-120H200Zm0-80h320v-560H200v560Zm560 0v-560H600v560h160Z"/></svg>

After

Width:  |  Height:  |  Size: 291 B

1895
main.py

File diff suppressed because it is too large Load Diff

Binary file not shown.

539
updater.py Normal file
View File

@@ -0,0 +1,539 @@
"""
Filename: updater.py
Description: Generic updater file
Author: Tyler de Zeeuw
License: GPL-3.0
"""
# Built-in imports
import os
import re
import sys
import time
import shlex
import shutil
import zipfile
import traceback
import subprocess
# External library imports
import psutil
import requests
from PySide6.QtWidgets import QMessageBox
from PySide6.QtCore import QThread, Signal, QObject
class UpdateDownloadThread(QThread):
"""
Thread that downloads and extracts an update package and emits a signal on completion or error.
Args:
download_url (str): URL of the update zip file to download.
latest_version (str): Version string of the latest update.
"""
update_ready = Signal(str, str)
error_occurred = Signal(str)
def __init__(self, download_url, latest_version, platform_name, app_name):
super().__init__()
self.download_url = download_url
self.latest_version = latest_version
self.platform_name = platform_name
self.app_name = app_name
def run(self):
try:
local_filename = os.path.basename(self.download_url)
if self.platform_name == 'darwin':
tmp_dir = f'/tmp/{self.app_name}tempupdate'
os.makedirs(tmp_dir, exist_ok=True)
local_path = os.path.join(tmp_dir, local_filename)
else:
local_path = os.path.join(os.getcwd(), local_filename)
# Download the file
with requests.get(self.download_url, stream=True, timeout=15) as r:
r.raise_for_status()
with open(local_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
# Extract folder name (remove .zip)
if self.platform_name == 'darwin':
extract_folder = os.path.splitext(local_filename)[0]
extract_path = os.path.join(tmp_dir, extract_folder)
else:
extract_folder = os.path.splitext(local_filename)[0]
extract_path = os.path.join(os.getcwd(), extract_folder)
# Create the folder if not exists
os.makedirs(extract_path, exist_ok=True)
# Extract the zip file contents
if self.platform_name == 'darwin':
subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
else:
with zipfile.ZipFile(local_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
# Remove the zip once extracted and emit a signal
os.remove(local_path)
self.update_ready.emit(self.latest_version, extract_path)
except Exception as e:
# Emit a signal signifying failure
self.error_occurred.emit(str(e))
class UpdateCheckThread(QThread):
"""
Thread that checks for updates by querying the API and emits a signal based on the result.
Signals:
download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
no_update_available(): Emitted when no update is found or current version is up to date.
error_occurred(str): Emitted with an error message if the update check fails.
"""
download_requested = Signal(str, str)
no_update_available = Signal()
error_occurred = Signal(str)
def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name):
super().__init__()
self.api_url = api_url
self.api_url_sec = api_url_sec
self.current_version = current_version
self.platform_name = platform_name
self.app_name = app_name
def run(self):
# if not getattr(sys, 'frozen', False):
# self.error_occurred.emit("Application is not frozen (Development mode).")
# return
try:
latest_version, download_url = self.get_latest_release_for_platform()
if not latest_version:
self.no_update_available.emit()
return
if not download_url:
self.error_occurred.emit(f"No download available for platform '{self.platform_name}'")
return
if self.version_compare(latest_version, self.current_version) > 0:
self.download_requested.emit(download_url, latest_version)
else:
self.no_update_available.emit()
except Exception as e:
self.error_occurred.emit(f"Update check failed: {e}")
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def get_latest_release_for_platform(self):
urls = [self.api_url, self.api_url_sec]
for url in urls:
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
releases = response.json()
if not releases:
continue
latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
if not latest:
continue
tag = latest["tag_name"].lstrip("v")
for asset in latest.get("assets", []):
if self.platform_name in asset["name"].lower():
return tag, asset["browser_download_url"]
return tag, None
except (requests.RequestException, ValueError) as e:
continue
return None, None
class LocalPendingUpdateCheckThread(QThread):
"""
Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
Args:
current_version (str): Current application version.
platform_suffix (str): Platform-specific suffix to identify update folders.
"""
pending_update_found = Signal(str, str)
no_pending_update = Signal()
def __init__(self, current_version, platform_suffix, platform_name, app_name):
super().__init__()
self.current_version = current_version
self.platform_suffix = platform_suffix
self.platform_name = platform_name
self.app_name = app_name
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def run(self):
if self.platform_name == 'darwin':
cwd = f'/tmp/{self.app_name}tempupdate'
else:
cwd = os.getcwd()
pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
found = False
try:
for item in os.listdir(cwd):
folder_path = os.path.join(cwd, item)
if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
match = pattern.match(item)
if match:
folder_version = match.group(1)
if self.version_compare(folder_version, self.current_version) > 0:
self.pending_update_found.emit(folder_version, folder_path)
found = True
break
except:
pass
if not found:
self.no_pending_update.emit()
class UpdateManager(QObject):
"""
Orchestrates the update process.
Main apps should instantiate this and call check_for_updates().
"""
def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name):
super().__init__()
self.parent = main_window
self.api_url = api_url
self.api_url_sec = api_url_sec
self.current_version = current_version
self.platform_name = platform_name
self.platform_suffix = platform_suffix
self.app_name = app_name
self.pending_update_version = None
self.pending_update_path = None
def manual_check_for_updates(self):
self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name)
self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
self.local_check_thread.start()
def on_pending_update_found(self, version, folder_path):
self.parent.statusBar().showMessage(f"Pending update found: version {version}")
self.pending_update_version = version
self.pending_update_path = folder_path
self.show_pending_update_popup()
def on_no_pending_update(self):
# No pending update found locally, start server check directly
self.parent.statusBar().showMessage("No pending local update found. Checking server...")
self.start_update_check_thread()
def show_pending_update_popup(self):
msg_box = QMessageBox(self.parent)
msg_box.setWindowTitle("Pending Update Found")
msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
if msg_box.clickedButton() == install_now_button:
self.install_update(self.pending_update_path)
else:
self.parent.statusBar().showMessage("Pending update available. Install later.")
# After user dismisses, still check the server for new updates
self.start_update_check_thread()
def start_update_check_thread(self):
self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name)
self.check_thread.download_requested.connect(self.on_server_update_requested)
self.check_thread.no_update_available.connect(self.on_server_no_update)
self.check_thread.error_occurred.connect(self.on_error)
self.check_thread.start()
def on_server_no_update(self):
self.parent.statusBar().showMessage("No new updates found on server.", 5000)
def on_server_update_requested(self, download_url, latest_version):
if self.pending_update_version:
cmp = self.version_compare(latest_version, self.pending_update_version)
if cmp > 0:
# Server version is newer than pending update
self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
try:
shutil.rmtree(self.pending_update_path)
self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
except Exception as e:
self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}")
# Clear pending update info so new download proceeds
self.pending_update_version = None
self.pending_update_path = None
# Download the new update
self.download_update(download_url, latest_version)
elif cmp == 0:
# Versions equal, no download needed
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
else:
# Server version older than pending? Unlikely but just keep pending update
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
else:
# No pending update, just download
self.download_update(download_url, latest_version)
def download_update(self, download_url, latest_version):
self.parent.statusBar().showMessage("Downloading update...")
self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name)
self.download_thread.update_ready.connect(self.on_update_ready)
self.download_thread.error_occurred.connect(self.on_error)
self.download_thread.start()
def on_update_ready(self, latest_version, extract_folder):
self.parent.statusBar().showMessage("Update downloaded and extracted.")
msg_box = QMessageBox(self.parent)
msg_box.setWindowTitle("Update Ready")
msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
msg_box.exec()
if msg_box.clickedButton() == install_now_button:
self.install_update(extract_folder)
else:
self.parent.statusBar().showMessage("Update ready. Install later.")
def install_update(self, extract_folder):
# Path to updater executable
if self.platform_name == 'windows':
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe")
elif self.platform_name == 'darwin':
if getattr(sys, 'frozen', False):
updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app")
else:
updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app")
elif self.platform_name == 'linux':
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater")
else:
updater_path = os.getcwd()
if not os.path.exists(updater_path):
QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
return
# Launch updater with extracted folder path as argument
try:
# Pass current app's executable path for updater to relaunch
main_app_executable = os.path.abspath(sys.argv[0])
print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
if self.platform_name == 'darwin':
subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
else:
subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
# Close the current app so updater can replace files
sys.exit(0)
except Exception as e:
QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
def on_error(self, message):
# print(f"Error: {message}")
self.parent.statusBar().showMessage(f"Error occurred during update process. {message}")
def version_compare(self, v1, v2):
def normalize(v): return [int(x) for x in v.split(".")]
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
def wait_for_process_to_exit(process_name, timeout=10):
"""
Waits for a process with the specified name to exit within a timeout period.
Args:
process_name (str): Name (or part of the name) of the process to wait for.
timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
Returns:
bool: True if the process exited before the timeout, False otherwise.
"""
print(f"Waiting for {process_name} to exit...")
deadline = time.time() + timeout
while time.time() < deadline:
still_running = False
for proc in psutil.process_iter(['name']):
try:
if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
still_running = True
print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not still_running:
print(f"{process_name} has exited.")
return True
time.sleep(0.5)
print(f"{process_name} did not exit in time.")
return False
def finish_update_if_needed(platform_name, app_name):
"""
Completes a pending application update if '--finish-update' is present in the command-line arguments.
"""
if "--finish-update" in sys.argv:
print("Finishing update...")
if platform_name == 'darwin':
app_dir = f'/tmp/{app_name}tempupdate'
else:
app_dir = os.getcwd()
# 1. Find update folder
update_folder = None
for entry in os.listdir(app_dir):
entry_path = os.path.join(app_dir, entry)
if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name):
update_folder = os.path.join(app_dir, entry)
break
if update_folder is None:
print("No update folder found. Skipping update steps.")
return
if platform_name == 'darwin':
update_folder = os.path.join(update_folder, f"{app_name}-darwin")
# 2. Wait for updater to exit
print(f"Waiting for {app_name}_updater to exit...")
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower():
try:
proc.wait(timeout=5)
except psutil.TimeoutExpired:
print(f"Force killing lingering {app_name}_updater")
proc.kill()
# 3. Replace the updater
if platform_name == 'windows':
new_updater = os.path.join(update_folder, f"{app_name}_updater.exe")
dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe")
elif platform_name == 'darwin':
new_updater = os.path.join(update_folder, f"{app_name}_updater.app")
dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app"))
elif platform_name == 'linux':
new_updater = os.path.join(update_folder, f"{app_name}_updater")
dest_updater = os.path.join(app_dir, f"{app_name}_updater")
else:
print("Unknown Platform")
new_updater = os.getcwd()
dest_updater = os.getcwd()
print(f"New updater is {new_updater}")
print(f"Dest updater is {dest_updater}")
print("Writable?", os.access(dest_updater, os.W_OK))
print("Executable path:", sys.executable)
print("Trying to copy:", new_updater, "->", dest_updater)
if os.path.exists(new_updater):
try:
if os.path.exists(dest_updater):
if platform_name == 'darwin':
try:
if os.path.isdir(dest_updater):
shutil.rmtree(dest_updater)
print(f"Deleted directory: {dest_updater}")
else:
os.remove(dest_updater)
print(f"Deleted file: {dest_updater}")
except Exception as e:
print(f"Error deleting {dest_updater}: {e}")
else:
os.remove(dest_updater)
if platform_name == 'darwin':
wait_for_process_to_exit(f"{app_name}_updater", timeout=10)
subprocess.check_call(["ditto", new_updater, dest_updater])
else:
shutil.copy2(new_updater, dest_updater)
if platform_name in ('linux', 'darwin'):
os.chmod(dest_updater, 0o755)
if platform_name == 'darwin':
remove_quarantine(dest_updater, app_name)
print(f"{app_name}_updater replaced.")
except Exception as e:
print(f"Failed to replace {app_name}_updater: {e}")
# 4. Delete the update folder
try:
if platform_name == 'darwin':
shutil.rmtree(app_dir)
else:
shutil.rmtree(update_folder)
except Exception as e:
print(f"Failed to delete update folder: {e}")
QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
sys.argv.remove("--finish-update")
def remove_quarantine(app_path, app_name):
"""
Removes the macOS quarantine attribute from the specified application path.
"""
script = f'''
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)"
'''
try:
subprocess.run(['osascript', '-e', script], check=True)
print("✅ Quarantine attribute removed.")
except subprocess.CalledProcessError as e:
print("❌ Failed to remove quarantine attribute.")
print(e)