From c279f269daa48ae382f68a27887a176839b27d38 Mon Sep 17 00:00:00 2001 From: tyler Date: Fri, 13 Mar 2026 13:00:57 -0700 Subject: [PATCH] update logic seperated and blazes import --- main.py | 947 ++++++++++++++++++++++++----------------------------- updater.py | 540 ++++++++++++++++++++++++++++++ 2 files changed, 964 insertions(+), 523 deletions(-) create mode 100644 updater.py diff --git a/main.py b/main.py index d3c4497..06ed309 100644 --- a/main.py +++ b/main.py @@ -8,14 +8,11 @@ License: GPL-3.0 # Built-in imports import os -import re import sys import json import time -import shlex import pickle import shutil -import zipfile import platform import traceback import subprocess @@ -28,7 +25,8 @@ from enum import Enum, auto import numpy as np import pandas as pd import psutil -import requests + +from updater import finish_update_if_needed, UpdateManager, LocalPendingUpdateCheckThread from mne.io import read_raw_snirf from mne.preprocessing.nirs import source_detector_distances @@ -46,12 +44,12 @@ from PySide6.QtGui import QAction, QKeySequence, QIcon, QIntValidator, QDoubleVa from PySide6.QtSvgWidgets import QSvgWidget # needed to show svgs when app is not frozen -CURRENT_VERSION = "1.2.1" - +CURRENT_VERSION = "1.2.0" API_URL = "https://git.research.dezeeuw.ca/api/v1/repos/tyler/flares/releases" API_URL_SECONDARY = "https://git.research2.dezeeuw.ca/api/v1/repos/tyler/flares/releases" - PLATFORM_NAME = platform.system().lower() +APP_NAME = "flares" + # Selectable parameters on the right side of the window SECTIONS = [ @@ -225,9 +223,6 @@ SECTIONS = [ ] - - - class SaveProjectThread(QThread): finished_signal = Signal(str) error_signal = Signal(str) @@ -316,189 +311,6 @@ class TerminalWindow(QWidget): return f"Available commands: {', '.join(self.commands.keys())}" -class UpdateDownloadThread(QThread): - """ - Thread that downloads and extracts an update package and emits a signal on completion or error. - - Args: - download_url (str): URL of the update zip file to download. - latest_version (str): Version string of the latest update. - """ - - update_ready = Signal(str, str) - error_occurred = Signal(str) - - def __init__(self, download_url, latest_version): - super().__init__() - self.download_url = download_url - self.latest_version = latest_version - - def run(self): - try: - local_filename = os.path.basename(self.download_url) - - if PLATFORM_NAME == 'darwin': - tmp_dir = '/tmp/flarestempupdate' - os.makedirs(tmp_dir, exist_ok=True) - local_path = os.path.join(tmp_dir, local_filename) - else: - local_path = os.path.join(os.getcwd(), local_filename) - - # Download the file - with requests.get(self.download_url, stream=True, timeout=15) as r: - r.raise_for_status() - with open(local_path, 'wb') as f: - for chunk in r.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) - - # Extract folder name (remove .zip) - if PLATFORM_NAME == 'darwin': - extract_folder = os.path.splitext(local_filename)[0] - extract_path = os.path.join(tmp_dir, extract_folder) - - else: - extract_folder = os.path.splitext(local_filename)[0] - extract_path = os.path.join(os.getcwd(), extract_folder) - - # Create the folder if not exists - os.makedirs(extract_path, exist_ok=True) - - # Extract the zip file contents - if PLATFORM_NAME == 'darwin': - subprocess.run(['ditto', '-xk', local_path, extract_path], check=True) - else: - with zipfile.ZipFile(local_path, 'r') as zip_ref: - zip_ref.extractall(extract_path) - - # Remove the zip once extracted and emit a signal - os.remove(local_path) - self.update_ready.emit(self.latest_version, extract_path) - - except Exception as e: - # Emit a signal signifying failure - self.error_occurred.emit(str(e)) - - - -class UpdateCheckThread(QThread): - """ - Thread that checks for updates by querying the API and emits a signal based on the result. - - Signals: - download_requested(str, str): Emitted with (download_url, latest_version) when an update is available. - no_update_available(): Emitted when no update is found or current version is up to date. - error_occurred(str): Emitted with an error message if the update check fails. - """ - - download_requested = Signal(str, str) - no_update_available = Signal() - error_occurred = Signal(str) - - def run(self): - if not getattr(sys, 'frozen', False): - self.error_occurred.emit("Application is not frozen (Development mode).") - return - try: - latest_version, download_url = self.get_latest_release_for_platform() - if not latest_version: - self.no_update_available.emit() - return - - if not download_url: - self.error_occurred.emit(f"No download available for platform '{PLATFORM_NAME}'") - return - - if self.version_compare(latest_version, CURRENT_VERSION) > 0: - self.download_requested.emit(download_url, latest_version) - else: - self.no_update_available.emit() - - except Exception as e: - self.error_occurred.emit(f"Update check failed: {e}") - - def version_compare(self, v1, v2): - def normalize(v): return [int(x) for x in v.split(".")] - return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) - - def get_latest_release_for_platform(self): - urls = [API_URL, API_URL_SECONDARY] - for url in urls: - try: - - response = requests.get(url, timeout=5) - response.raise_for_status() - releases = response.json() - - if not releases: - continue - - latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None) - - if not latest: - continue - - tag = latest["tag_name"].lstrip("v") - - for asset in latest.get("assets", []): - if PLATFORM_NAME in asset["name"].lower(): - return tag, asset["browser_download_url"] - - return tag, None - except (requests.RequestException, ValueError) as e: - continue - return None, None - - -class LocalPendingUpdateCheckThread(QThread): - """ - Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly. - - Args: - current_version (str): Current application version. - platform_suffix (str): Platform-specific suffix to identify update folders. - """ - - pending_update_found = Signal(str, str) - no_pending_update = Signal() - - def __init__(self, current_version, platform_suffix): - super().__init__() - self.current_version = current_version - self.platform_suffix = platform_suffix - - def version_compare(self, v1, v2): - def normalize(v): return [int(x) for x in v.split(".")] - return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) - - def run(self): - if PLATFORM_NAME == 'darwin': - cwd = '/tmp/flarestempupdate' - else: - cwd = os.getcwd() - - pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$") - found = False - - try: - for item in os.listdir(cwd): - folder_path = os.path.join(cwd, item) - if os.path.isdir(folder_path) and item.endswith(self.platform_suffix): - match = pattern.match(item) - if match: - folder_version = match.group(1) - if self.version_compare(folder_version, self.current_version) > 0: - self.pending_update_found.emit(folder_version, folder_path) - found = True - break - except: - pass - - if not found: - self.no_pending_update.emit() - - - class AboutWindow(QWidget): """ Simple About window displaying basic application information. @@ -1379,6 +1191,399 @@ class UpdateEventsWindow(QWidget): return save_path + +class UpdateEventsBlazesWindow(QWidget): + + def __init__(self, parent=None, mode=EventUpdateMode.WRITE_SNIRF, caller=None): + super().__init__(parent, Qt.WindowType.Window) + + self.mode = mode + self.caller = caller or self.__class__.__name__ + self.setWindowTitle("Update event markers (BLAZES)") + self.resize(760, 200) + + self.label_file_a = QLabel("SNIRF file:") + self.line_edit_file_a = QLineEdit() + self.line_edit_file_a.setReadOnly(True) + self.btn_browse_a = QPushButton("Browse .snirf") + self.btn_browse_a.clicked.connect(self.browse_file_a) + + self.label_file_b = QLabel("BLAZES file:") + self.line_edit_file_b = QLineEdit() + self.line_edit_file_b.setReadOnly(True) + self.btn_browse_b = QPushButton("Browse .blaze") + self.btn_browse_b.clicked.connect(self.browse_file_b) + + self.label_events = QLabel("Events in selected blazes file:") + self.combo_events = QComboBox() + self.combo_events.setEnabled(False) + + self.label_snirf_events = QLabel("Events in SNIRF file:") + self.combo_snirf_events = QComboBox() + self.combo_snirf_events.setEnabled(False) + + self.btn_clear = QPushButton("Clear") + self.btn_go = QPushButton("Go") + self.btn_clear.clicked.connect(self.clear_files) + self.btn_go.clicked.connect(self.go_action) + + # --- + layout = QVBoxLayout() + self.description = QLabel() + self.description.setTextFormat(Qt.TextFormat.RichText) + self.description.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction) + self.description.setOpenExternalLinks(True) + + self.description.setText("The events that are present in a snirf file may not be the events that are to be studied and examined.
" + "Utilizing different software and video recordings, it is easy enough to see when an action actually occured in a file.
" + "The software BLAZES is used to create these events in video files, and these events can be applied to the snirf file
" + "selected below by time syncing it to an event that it shares with the snirf file.") + + layout.addWidget(self.description) + + help_text_a = "Select the SNIRF (.snirf) file to update with new event markers." + + file_a_layout = QHBoxLayout() + + # Help button on the left + help_btn_a = QPushButton("?") + help_btn_a.setFixedWidth(25) + help_btn_a.setToolTip(help_text_a) + help_btn_a.clicked.connect(lambda _, text=help_text_a: self.show_help_popup(text)) + file_a_layout.addWidget(help_btn_a) + + # Container for label + line_edit + browse button with tooltip + file_a_container = QWidget() + file_a_container_layout = QHBoxLayout() + file_a_container_layout.setContentsMargins(0, 0, 0, 0) + file_a_container_layout.addWidget(self.label_file_a) + file_a_container_layout.addWidget(self.line_edit_file_a) + file_a_container_layout.addWidget(self.btn_browse_a) + file_a_container.setLayout(file_a_container_layout) + file_a_container.setToolTip(help_text_a) + + file_a_layout.addWidget(file_a_container) + layout.addLayout(file_a_layout) + + + help_text_b = "Provide a .blaze output file that contains events for this participant." + + file_b_layout = QHBoxLayout() + + help_btn_b = QPushButton("?") + help_btn_b.setFixedWidth(25) + help_btn_b.setToolTip(help_text_b) + help_btn_b.clicked.connect(lambda _, text=help_text_b: self.show_help_popup(text)) + file_b_layout.addWidget(help_btn_b) + + file_b_container = QWidget() + file_b_container_layout = QHBoxLayout() + file_b_container_layout.setContentsMargins(0, 0, 0, 0) + file_b_container_layout.addWidget(self.label_file_b) + file_b_container_layout.addWidget(self.line_edit_file_b) + file_b_container_layout.addWidget(self.btn_browse_b) + file_b_container.setLayout(file_b_container_layout) + file_b_container.setToolTip(help_text_b) + + file_b_layout.addWidget(file_b_container) + layout.addLayout(file_b_layout) + + + help_text_suffix = "The events extracted from the blaze file." + + suffix2_layout = QHBoxLayout() + + help_btn_suffix = QPushButton("?") + help_btn_suffix.setFixedWidth(25) + help_btn_suffix.setToolTip(help_text_suffix) + help_btn_suffix.clicked.connect(lambda _, text=help_text_suffix: self.show_help_popup(text)) + suffix2_layout.addWidget(help_btn_suffix) + + suffix2_container = QWidget() + suffix2_container_layout = QHBoxLayout() + suffix2_container_layout.setContentsMargins(0, 0, 0, 0) + suffix2_container_layout.addWidget(self.label_events) + suffix2_container_layout.addWidget(self.combo_events) + suffix2_container.setLayout(suffix2_container_layout) + suffix2_container.setToolTip(help_text_suffix) + + suffix2_layout.addWidget(suffix2_container) + layout.addLayout(suffix2_layout) + + snirf_events_layout = QHBoxLayout() + + help_text_snirf_events = "The event markers extracted from the SNIRF file." + help_btn_snirf_events = QPushButton("?") + help_btn_snirf_events.setFixedWidth(25) + help_btn_snirf_events.setToolTip(help_text_snirf_events) + help_btn_snirf_events.clicked.connect(lambda _, text=help_text_snirf_events: self.show_help_popup(text)) + snirf_events_layout.addWidget(help_btn_snirf_events) + + snirf_events_container = QWidget() + snirf_events_container_layout = QHBoxLayout() + snirf_events_container_layout.setContentsMargins(0, 0, 0, 0) + snirf_events_container_layout.addWidget(self.label_snirf_events) + snirf_events_container_layout.addWidget(self.combo_snirf_events) + snirf_events_container.setLayout(snirf_events_container_layout) + snirf_events_container.setToolTip(help_text_snirf_events) + + snirf_events_layout.addWidget(snirf_events_container) + layout.addLayout(snirf_events_layout) + + buttons_layout = QHBoxLayout() + buttons_layout.addStretch() + buttons_layout.addWidget(self.btn_clear) + buttons_layout.addWidget(self.btn_go) + layout.addLayout(buttons_layout) + + self.setLayout(layout) + + + def show_help_popup(self, text): + msg = QMessageBox(self) + msg.setWindowTitle("Parameter Info - FLARES") + msg.setText(text) + msg.exec() + + def browse_file_a(self): + file_path, _ = QFileDialog.getOpenFileName(self, "Select SNIRF File", "", "SNIRF Files (*.snirf)") + if file_path: + self.line_edit_file_a.setText(file_path) + try: + raw = read_raw_snirf(file_path, preload=False) + annotations = raw.annotations + + # Build individual event entries + event_entries = [] + for onset, description in zip(annotations.onset, annotations.description): + event_str = f"{description} @ {onset:.3f}s" + event_entries.append(event_str) + + if not event_entries: + QMessageBox.information(self, "No Events", "No events found in SNIRF file.") + self.combo_snirf_events.clear() + self.combo_snirf_events.setEnabled(False) + return + + self.combo_snirf_events.clear() + self.combo_snirf_events.addItems(event_entries) + self.combo_snirf_events.setEnabled(True) + + except Exception as e: + QMessageBox.warning(self, "Error", f"Could not read SNIRF file with MNE:\n{str(e)}") + self.combo_snirf_events.clear() + self.combo_snirf_events.setEnabled(False) + + def browse_file_b(self): + file_path, _ = QFileDialog.getOpenFileName(self, "Select BLAZES File", "", "BLAZES project Files (*.blaze)") + if file_path: + self.line_edit_file_b.setText(file_path) + + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + self.blazes_data = data + + obs_keys = self.extract_blazes_observation_strings(data) + self.combo_events.clear() + if obs_keys: + self.combo_events.addItems(obs_keys) + self.combo_events.setEnabled(True) + else: + QMessageBox.information(self, "No Events", "No observation keys found in BLAZES file.") + self.combo_events.setEnabled(False) + + except (json.JSONDecodeError, FileNotFoundError, KeyError, TypeError) as e: + QMessageBox.warning(self, "Error", f"Failed to parse BLAZES file:\n{e}") + self.combo_events.clear() + self.combo_events.setEnabled(False) + + + def extract_blazes_observation_strings(self, data): + if "obs" not in data: + raise KeyError("Missing 'obs' key in BLAZES file.") + + obs = data["obs"] + event_strings = [] + + for event_name, occurrences in obs.items(): + # occurrences is a list of dicts: [{"start_frame": 642, "start_time_sec": 26.777, ...}] + for entry in occurrences: + onset = entry.get("start_time_sec", 0.0) + # Formatting to match your SNIRF style: "Event Name @ 0.000s" + display_str = f"{event_name} @ {onset:.3f}s" + event_strings.append(display_str) + + return event_strings + + + def clear_files(self): + self.line_edit_file_a.clear() + self.line_edit_file_b.clear() + + + def go_action(self): + file_a = self.line_edit_file_a.text() + file_b = self.line_edit_file_b.text() + suffix = APP_NAME + + if not hasattr(self, "blazes_data") or self.combo_events.count() == 0 or self.combo_snirf_events.count() == 0: + QMessageBox.warning(self, "Missing data", "Please make sure a BLAZES and SNIRF event are selected.") + return + + try: + blaze_text = self.combo_events.currentText() + _, blaze_time_str = blaze_text.split(" @ ") + blaze_anchor_time = float(blaze_time_str.replace("s", "").strip()) + except Exception as e: + QMessageBox.critical(self, "BLAZE Event Error", f"Could not parse BLAZE anchor:\n{e}") + return + + try: + snirf_text = self.combo_snirf_events.currentText() + _, snirf_time_str = snirf_text.split(" @ ") + snirf_anchor_time = float(snirf_time_str.replace("s", "").strip()) + except Exception as e: + QMessageBox.critical(self, "SNIRF Event Error", f"Could not parse SNIRF anchor:\n{e}") + return + + time_shift = snirf_anchor_time - blaze_anchor_time + + onsets, durations, descriptions = [], [], [] + skipped_count = 0 + + try: + ai_data = self.blazes_data.get("ai_tracks", {}) + + for track_name, events in ai_data.items(): + clean_name = track_name.replace("AI: ", "").strip() + + for event in events: + original_start = event.get("start_time_sec", 0.0) + original_end = event.get("end_time_sec", original_start) + duration = original_end - original_start + + # FILTER: Minimum 0.1s duration + if duration < 0.1: + skipped_count += 1 + continue + + # Apply shift + adjusted_onset = original_start + time_shift + + onsets.append(round(adjusted_onset, 6)) + durations.append(round(duration, 6)) + descriptions.append(clean_name) + + except Exception as e: + QMessageBox.critical(self, "Track Error", f"Failed to process tracks: {e}") + return + + if not onsets: + QMessageBox.warning(self, "No Data", f"No events met the 0.1s threshold. (Skipped {skipped_count})") + return + + if self.mode == EventUpdateMode.WRITE_SNIRF: + suggested_name = f"{os.path.splitext(os.path.basename(file_a))[0]}_{suffix}.snirf" + save_path, _ = QFileDialog.getSaveFileName(self, "Save SNIRF", suggested_name, "SNIRF Files (*.snirf)") + + if not save_path: return + if not save_path.lower().endswith(".snirf"): save_path += ".snirf" + + try: + raw = read_raw_snirf(file_a, preload=True) + + # Create annotations + new_annotations = Annotations( + onset=onsets, + duration=durations, + description=descriptions + ) + + # Replace existing annotations with the new aligned AI tracks + raw.set_annotations(new_annotations) + + write_raw_snirf(raw, save_path) + QMessageBox.information(self, "Success", + f"Aligned {len(onsets)} events.\n(Filtered out {skipped_count} short events)") + except Exception as e: + QMessageBox.critical(self, "Error", f"Failed to update SNIRF file:\n{e}") + + + + def update_optode_positions(self, file_a, file_b, save_path): + + fiducials = {} + ch_positions = {} + + # Read the lines from the optode file + with open(file_b, 'r') as f: + for line in f: + if line.strip(): + # Split by the semicolon and convert to meters + ch_name, coords_str = line.split(":") + coords = np.array(list(map(float, coords_str.strip().split()))) * 0.001 + + # The key we have is a fiducial + if ch_name.lower() in ['lpa', 'nz', 'rpa']: + fiducials[ch_name.lower()] = coords + + # The key we have is a source or detector + else: + ch_positions[ch_name.upper()] = coords + + # Create montage with updated coords in head space + initial_montage = make_dig_montage(ch_pos=ch_positions, nasion=fiducials.get('nz'), lpa=fiducials.get('lpa'), rpa=fiducials.get('rpa'), coord_frame='head') # type: ignore + + # Read the SNIRF file, set the montage, and write it back + raw = read_raw_snirf(file_a, preload=True) + raw.set_montage(initial_montage) + write_raw_snirf(raw, save_path) + + + def _apply_events_to_snirf(self, raw, new_annotations, save_path): + raw.set_annotations(new_annotations) + write_raw_snirf(raw, save_path) + + def _write_event_mapping_json( + self, + file_a, + file_b, + selected_obs, + snirf_anchor, + boris_anchor, + time_shift, + mapped_events, + save_path + ): + import json + from datetime import datetime + import os + + payload = { + "source": { + "called_from": self.caller, + "snirf_file": os.path.basename(file_a), + "boris_file": os.path.basename(file_b), + "observation": selected_obs + }, + "alignment": { + "snirf_anchor": snirf_anchor, + "boris_anchor": boris_anchor, + "time_shift_seconds": time_shift + }, + "events": mapped_events, + "created_at": datetime.utcnow().isoformat() + "Z" + } + + with open(save_path, "w", encoding="utf-8") as f: + json.dump(payload, f, indent=2) + + return save_path + + + + class ProgressBubble(QWidget): """ A clickable widget displaying a progress bar made of colored rectangles and a label. @@ -2418,8 +2623,6 @@ class ParticipantBrainViewerWidget(FlaresBaseWidget): self.thumb_size = QSize(280, 180) self.showMaximized() - - def show_brain_images(self): @@ -2549,10 +2752,6 @@ class ParticipantBrainViewerWidget(FlaresBaseWidget): print(f"No method defined for index {idx}") - - - - class ParticipantFunctionalConnectivityWidget(FlaresBaseWidget): def __init__(self, haemo_dict, epochs_dict): super().__init__("FunctionalConnectivityWidget") @@ -2860,7 +3059,6 @@ class GroupFunctionalConnectivityWidget(FlaresBaseWidget): self.showMaximized() - def show_brain_images(self): import flares @@ -2954,7 +3152,6 @@ class GroupFunctionalConnectivityWidget(FlaresBaseWidget): - class MultiProgressDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) @@ -3538,11 +3735,6 @@ class ParameterInputDialog(QDialog): - - - - - class GroupViewerWidget(FlaresBaseWidget): def __init__(self, haemo_dict, cha, df_ind, design_matrix, contrast_results, group): super().__init__("GroupViewer") @@ -3826,8 +4018,6 @@ class GroupViewerWidget(FlaresBaseWidget): - - class GroupBrainViewerWidget(FlaresBaseWidget): def __init__(self, haemo_dict, df_ind, design_matrix, group, contrast_results_dict): super().__init__("GroupBrainViewer") @@ -3926,13 +4116,10 @@ class GroupBrainViewerWidget(FlaresBaseWidget): group_a = self.group_a_dropdown.currentText() self.update_participant_list_for_group(group_a, self.participant_dropdown_a) - - def _on_participants_changed(self, item=None): self._update_event_dropdown() - def _update_event_dropdown(self): participants_a = self._get_checked_items(self.participant_dropdown_a) participants_b = self._get_checked_items(self.participant_dropdown_b) @@ -4160,7 +4347,6 @@ class GroupBrainViewerWidget(FlaresBaseWidget): - class ViewerLauncherWidget(QWidget): def __init__(self, haemo_dict, config_dict, fig_bytes_dict, cha_dict, contrast_results_dict, df_ind, design_matrix, epochs_dict): super().__init__() @@ -4197,7 +4383,6 @@ class ViewerLauncherWidget(QWidget): btn6 = QPushButton("Open Export Data As CSV Viewer") btn6.clicked.connect(lambda: self.open_export_data_as_csv_viewer(haemo_dict, cha_dict, df_ind, design_matrix, group_dict, contrast_results_dict)) - layout.addWidget(btn1) layout.addWidget(btn2) layout.addWidget(btn3) @@ -4272,10 +4457,21 @@ class MainApplication(QMainWindow): self.files_failed = set() # set of failed file paths self.files_results = {} # dict for successful results (if needed) + self.platform_suffix = "-" + PLATFORM_NAME + + self.updater = UpdateManager( + main_window=self, + api_url=API_URL, + api_url_sec=API_URL_SECONDARY, + current_version=CURRENT_VERSION, + platform_name=PLATFORM_NAME, + platform_suffix=self.platform_suffix, + app_name=APP_NAME + ) + self.init_ui() self.create_menu_bar() - self.platform_suffix = "-" + PLATFORM_NAME self.pending_update_version = None self.pending_update_path = None self.last_clicked_bubble = None @@ -4285,9 +4481,9 @@ class MainApplication(QMainWindow): self.current_file = None # Start local pending update check thread - self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix) - self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) - self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) + self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix, PLATFORM_NAME, APP_NAME) + self.local_check_thread.pending_update_found.connect(self.updater.on_pending_update_found) + self.local_check_thread.no_pending_update.connect(self.updater.on_no_pending_update) self.local_check_thread.start() @@ -4503,15 +4699,16 @@ class MainApplication(QMainWindow): options_actions = [ ("User Guide", "F1", self.user_guide, resource_path("icons/help_24dp_1F1F1F.svg")), - ("Check for Updates", "F5", self.manual_check_for_updates, resource_path("icons/update_24dp_1F1F1F.svg")), + ("Check for Updates", "F5", self.updater.manual_check_for_updates, resource_path("icons/update_24dp_1F1F1F.svg")), ("Update optodes in snirf file...", "F6", self.update_optode_positions, resource_path("icons/upgrade_24dp_1F1F1F.svg")), - ("Update events in snirf file...", "F7", self.update_event_markers, resource_path("icons/upgrade_24dp_1F1F1F.svg")), + ("Update events in snirf file (BORIS)...", "F7", self.update_event_markers, resource_path("icons/upgrade_24dp_1F1F1F.svg")), + ("Update events in snirf file (BLAZES)...", "F8", self.update_event_markers_blazes, resource_path("icons/upgrade_24dp_1F1F1F.svg")), ("About", "F12", self.about_window, resource_path("icons/info_24dp_1F1F1F.svg")) ] for i, (name, shortcut, slot, icon) in enumerate(options_actions): options_menu.addAction(make_action(name, shortcut, slot, icon=icon)) - if i == 1 or i == 3: # after the first 2 actions (0,1) + if i == 1 or i == 4: # after the first 2 actions (0,1) options_menu.addSeparator() preferences_menu = menu_bar.addMenu("Preferences") @@ -4595,11 +4792,9 @@ class MainApplication(QMainWindow): def open_launcher_window(self): - self.launcher_window = ViewerLauncherWidget(self.raw_haemo_dict, self.config_dict, self.fig_bytes_dict, self.cha_dict, self.contrast_results_dict, self.df_ind_dict, self.design_matrix_dict, self.epochs_dict) self.launcher_window.show() - def copy_text(self): self.top_left_widget.copy() # Trigger copy self.statusbar.showMessage("Copied to clipboard") # Show status message @@ -4644,12 +4839,16 @@ class MainApplication(QMainWindow): self.optodes = UpdateOptodesWindow(self) self.optodes.show() - def update_event_markers(self): if self.events is None or not self.events.isVisible(): self.events = UpdateEventsWindow(self, EventUpdateMode.WRITE_SNIRF, "Manual SNIRF Edit") self.events.show() + def update_event_markers_blazes(self): + if self.events is None or not self.events.isVisible(): + self.events = UpdateEventsBlazesWindow(self, EventUpdateMode.WRITE_SNIRF, "Manual SNIRF Edit") + self.events.show() + def open_file_dialog(self): file_path, _ = QFileDialog.getOpenFileName( self, "Open File", "", "SNIRF Files (*.snirf);;All Files (*)" @@ -5426,7 +5625,6 @@ class MainApplication(QMainWindow): - def check_for_pipeline_results(self): while not self.result_queue.empty(): msg = self.result_queue.get() @@ -5578,147 +5776,6 @@ class MainApplication(QMainWindow): bubble.update_progress(step_index) - '''UPDATER''' - def manual_check_for_updates(self): - self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix) - self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) - self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) - self.local_check_thread.start() - - def on_pending_update_found(self, version, folder_path): - self.statusBar().showMessage(f"Pending update found: version {version}") - self.pending_update_version = version - self.pending_update_path = folder_path - self.show_pending_update_popup() - - def on_no_pending_update(self): - # No pending update found locally, start server check directly - self.statusBar().showMessage("No pending local update found. Checking server...") - self.start_update_check_thread() - - def show_pending_update_popup(self): - msg_box = QMessageBox(self) - msg_box.setWindowTitle("Pending Update Found") - msg_box.setText(f"A previously downloaded update (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?") - install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) - install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) - msg_box.exec() - - if msg_box.clickedButton() == install_now_button: - self.install_update(self.pending_update_path) - else: - self.statusBar().showMessage("Pending update available. Install later.") - # After user dismisses, still check the server for new updates - self.start_update_check_thread() - - def start_update_check_thread(self): - self.check_thread = UpdateCheckThread() - self.check_thread.download_requested.connect(self.on_server_update_requested) - self.check_thread.no_update_available.connect(self.on_server_no_update) - self.check_thread.error_occurred.connect(self.on_error) - self.check_thread.start() - - def on_server_no_update(self): - self.statusBar().showMessage("No new updates found on server.", 5000) - - def on_server_update_requested(self, download_url, latest_version): - if self.pending_update_version: - cmp = self.version_compare(latest_version, self.pending_update_version) - if cmp > 0: - # Server version is newer than pending update - self.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...") - try: - shutil.rmtree(self.pending_update_path) - self.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}") - except Exception as e: - self.statusBar().showMessage(f"Failed to delete old update folder: {e}") - - # Clear pending update info so new download proceeds - self.pending_update_version = None - self.pending_update_path = None - - # Download the new update - self.download_update(download_url, latest_version) - elif cmp == 0: - # Versions equal, no download needed - self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.") - else: - # Server version older than pending? Unlikely but just keep pending update - self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.") - else: - # No pending update, just download - self.download_update(download_url, latest_version) - - def download_update(self, download_url, latest_version): - self.statusBar().showMessage("Downloading update...") - self.download_thread = UpdateDownloadThread(download_url, latest_version) - self.download_thread.update_ready.connect(self.on_update_ready) - self.download_thread.error_occurred.connect(self.on_error) - self.download_thread.start() - - def on_update_ready(self, latest_version, extract_folder): - self.statusBar().showMessage("Update downloaded and extracted.") - - msg_box = QMessageBox(self) - msg_box.setWindowTitle("Update Ready") - msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?") - install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) - install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) - - msg_box.exec() - - if msg_box.clickedButton() == install_now_button: - self.install_update(extract_folder) - else: - self.statusBar().showMessage("Update ready. Install later.") - - - def install_update(self, extract_folder): - # Path to updater executable - - if PLATFORM_NAME == 'windows': - updater_path = os.path.join(os.getcwd(), "flares_updater.exe") - elif PLATFORM_NAME == 'darwin': - if getattr(sys, 'frozen', False): - updater_path = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.app") - else: - updater_path = os.path.join(os.getcwd(), "../flares_updater.app") - - elif PLATFORM_NAME == 'linux': - updater_path = os.path.join(os.getcwd(), "flares_updater") - else: - updater_path = os.getcwd() - - if not os.path.exists(updater_path): - QMessageBox.critical(self, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}") - return - - # Launch updater with extracted folder path as argument - try: - # Pass current app's executable path for updater to relaunch - main_app_executable = os.path.abspath(sys.argv[0]) - - print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"') - - if PLATFORM_NAME == 'darwin': - subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable]) - else: - subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path)) - - # Close the current app so updater can replace files - sys.exit(0) - - except Exception as e: - QMessageBox.critical(self, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}") - - def on_error(self, message): - # print(f"Error: {message}") - self.statusBar().showMessage(f"Error occurred during update process. {message}") - - def version_compare(self, v1, v2): - def normalize(v): return [int(x) for x in v.split(".")] - return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) - def get_snirf_metadata_mne(self, file_name): try: @@ -5788,162 +5845,6 @@ class MainApplication(QMainWindow): event.accept() -def wait_for_process_to_exit(process_name, timeout=10): - """ - Waits for a process with the specified name to exit within a timeout period. - - Args: - process_name (str): Name (or part of the name) of the process to wait for. - timeout (int, optional): Maximum time to wait in seconds. Defaults to 10. - - Returns: - bool: True if the process exited before the timeout, False otherwise. - """ - - print(f"Waiting for {process_name} to exit...") - deadline = time.time() + timeout - while time.time() < deadline: - still_running = False - for proc in psutil.process_iter(['name']): - try: - if proc.info['name'] and process_name.lower() in proc.info['name'].lower(): - still_running = True - print(f"Still running: {proc.info['name']} (PID: {proc.pid})") - break - except (psutil.NoSuchProcess, psutil.AccessDenied): - continue - if not still_running: - print(f"{process_name} has exited.") - return True - time.sleep(0.5) - print(f"{process_name} did not exit in time.") - return False - - -def finish_update_if_needed(): - """ - Completes a pending application update if '--finish-update' is present in the command-line arguments. - """ - - if "--finish-update" in sys.argv: - print("Finishing update...") - - if PLATFORM_NAME == 'darwin': - app_dir = '/tmp/flarestempupdate' - else: - app_dir = os.getcwd() - - # 1. Find update folder - update_folder = None - for entry in os.listdir(app_dir): - entry_path = os.path.join(app_dir, entry) - if os.path.isdir(entry_path) and entry.startswith("flares-") and entry.endswith("-" + PLATFORM_NAME): - update_folder = os.path.join(app_dir, entry) - break - - if update_folder is None: - print("No update folder found. Skipping update steps.") - return - - if PLATFORM_NAME == 'darwin': - update_folder = os.path.join(update_folder, "flares-darwin") - - # 2. Wait for flares_updater to exit - print("Waiting for flares_updater to exit...") - for proc in psutil.process_iter(['pid', 'name']): - if proc.info['name'] and "flares_updater" in proc.info['name'].lower(): - try: - proc.wait(timeout=5) - except psutil.TimeoutExpired: - print("Force killing lingering flares_updater") - proc.kill() - - # 3. Replace the updater - if PLATFORM_NAME == 'windows': - new_updater = os.path.join(update_folder, "flares_updater.exe") - dest_updater = os.path.join(app_dir, "flares_updater.exe") - - elif PLATFORM_NAME == 'darwin': - new_updater = os.path.join(update_folder, "flares_updater.app") - dest_updater = os.path.abspath(os.path.join(sys.executable, "../../../../flares_updater.app")) - - elif PLATFORM_NAME == 'linux': - new_updater = os.path.join(update_folder, "flares_updater") - dest_updater = os.path.join(app_dir, "flares_updater") - - else: - print("No platform??") - new_updater = os.getcwd() - dest_updater = os.getcwd() - - print(f"New updater is {new_updater}") - print(f"Dest updater is {dest_updater}") - - print("Writable?", os.access(dest_updater, os.W_OK)) - print("Executable path:", sys.executable) - print("Trying to copy:", new_updater, "->", dest_updater) - - if os.path.exists(new_updater): - try: - if os.path.exists(dest_updater): - if PLATFORM_NAME == 'darwin': - try: - if os.path.isdir(dest_updater): - shutil.rmtree(dest_updater) - print(f"Deleted directory: {dest_updater}") - else: - os.remove(dest_updater) - print(f"Deleted file: {dest_updater}") - except Exception as e: - print(f"Error deleting {dest_updater}: {e}") - else: - os.remove(dest_updater) - - if PLATFORM_NAME == 'darwin': - wait_for_process_to_exit("flares_updater", timeout=10) - subprocess.check_call(["ditto", new_updater, dest_updater]) - else: - shutil.copy2(new_updater, dest_updater) - - if PLATFORM_NAME in ('linux', 'darwin'): - os.chmod(dest_updater, 0o755) - - if PLATFORM_NAME == 'darwin': - remove_quarantine(dest_updater) - - print("flares_updater replaced.") - except Exception as e: - print(f"Failed to replace flares_updater: {e}") - - # 4. Delete the update folder - try: - if PLATFORM_NAME == 'darwin': - shutil.rmtree(app_dir) - else: - shutil.rmtree(update_folder) - except Exception as e: - print(f"Failed to delete update folder: {e}") - - QMessageBox.information(None, "Update Complete", "The application has been successfully updated.") - sys.argv.remove("--finish-update") - - -def remove_quarantine(app_path): - """ - Removes the macOS quarantine attribute from the specified application path. - """ - - script = f''' - do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (2/2)" - ''' - try: - subprocess.run(['osascript', '-e', script], check=True) - print("✅ Quarantine attribute removed.") - except subprocess.CalledProcessError as e: - print("❌ Failed to remove quarantine attribute.") - print(e) - - def run_gui_entry_wrapper(config, gui_queue, progress_queue): """ Where the processing happens @@ -6080,7 +5981,7 @@ if __name__ == "__main__": # Only run GUI in the main process if current_process().name == 'MainProcess': app = QApplication(sys.argv) - finish_update_if_needed() + finish_update_if_needed(PLATFORM_NAME, APP_NAME) window = MainApplication() if PLATFORM_NAME == "darwin": @@ -6092,4 +5993,4 @@ if __name__ == "__main__": window.show() sys.exit(app.exec()) -# Not 4000 lines yay! \ No newline at end of file +# Not 6000 lines yay! \ No newline at end of file diff --git a/updater.py b/updater.py new file mode 100644 index 0000000..154fe18 --- /dev/null +++ b/updater.py @@ -0,0 +1,540 @@ +""" +Filename: updater.py +Description: Generic updater file + +Author: Tyler de Zeeuw +License: GPL-3.0 +""" + + +# Built-in imports +import os +import re +import sys +import time +import shlex +import shutil +import zipfile +import traceback +import subprocess + +# External library imports +import psutil +import requests + +from PySide6.QtWidgets import QMessageBox +from PySide6.QtCore import QThread, Signal, QObject + + +class UpdateDownloadThread(QThread): + """ + Thread that downloads and extracts an update package and emits a signal on completion or error. + + Args: + download_url (str): URL of the update zip file to download. + latest_version (str): Version string of the latest update. + """ + + update_ready = Signal(str, str) + error_occurred = Signal(str) + + def __init__(self, download_url, latest_version, platform_name, app_name): + super().__init__() + self.download_url = download_url + self.latest_version = latest_version + self.platform_name = platform_name + self.app_name = app_name + + def run(self): + try: + local_filename = os.path.basename(self.download_url) + + if self.platform_name == 'darwin': + tmp_dir = f'/tmp/{self.app_name}tempupdate' + os.makedirs(tmp_dir, exist_ok=True) + local_path = os.path.join(tmp_dir, local_filename) + else: + local_path = os.path.join(os.getcwd(), local_filename) + + # Download the file + with requests.get(self.download_url, stream=True, timeout=15) as r: + r.raise_for_status() + with open(local_path, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + # Extract folder name (remove .zip) + if self.platform_name == 'darwin': + extract_folder = os.path.splitext(local_filename)[0] + extract_path = os.path.join(tmp_dir, extract_folder) + + else: + extract_folder = os.path.splitext(local_filename)[0] + extract_path = os.path.join(os.getcwd(), extract_folder) + + # Create the folder if not exists + os.makedirs(extract_path, exist_ok=True) + + # Extract the zip file contents + if self.platform_name == 'darwin': + subprocess.run(['ditto', '-xk', local_path, extract_path], check=True) + else: + with zipfile.ZipFile(local_path, 'r') as zip_ref: + zip_ref.extractall(extract_path) + + # Remove the zip once extracted and emit a signal + os.remove(local_path) + self.update_ready.emit(self.latest_version, extract_path) + + except Exception as e: + # Emit a signal signifying failure + self.error_occurred.emit(str(e)) + + + +class UpdateCheckThread(QThread): + """ + Thread that checks for updates by querying the API and emits a signal based on the result. + + Signals: + download_requested(str, str): Emitted with (download_url, latest_version) when an update is available. + no_update_available(): Emitted when no update is found or current version is up to date. + error_occurred(str): Emitted with an error message if the update check fails. + """ + + download_requested = Signal(str, str) + no_update_available = Signal() + error_occurred = Signal(str) + + def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name): + super().__init__() + self.api_url = api_url + self.api_url_sec = api_url_sec + self.current_version = current_version + self.platform_name = platform_name + self.app_name = app_name + + def run(self): + # if not getattr(sys, 'frozen', False): + # self.error_occurred.emit("Application is not frozen (Development mode).") + # return + try: + latest_version, download_url = self.get_latest_release_for_platform() + if not latest_version: + self.no_update_available.emit() + return + + if not download_url: + self.error_occurred.emit(f"No download available for platform '{self.platform_name}'") + return + + if self.version_compare(latest_version, self.current_version) > 0: + self.download_requested.emit(download_url, latest_version) + else: + self.no_update_available.emit() + + except Exception as e: + self.error_occurred.emit(f"Update check failed: {e}") + + def version_compare(self, v1, v2): + def normalize(v): return [int(x) for x in v.split(".")] + return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) + + def get_latest_release_for_platform(self): + urls = [self.api_url, self.api_url_sec] + for url in urls: + try: + + response = requests.get(url, timeout=5) + response.raise_for_status() + releases = response.json() + + if not releases: + continue + + latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None) + + if not latest: + continue + + tag = latest["tag_name"].lstrip("v") + + for asset in latest.get("assets", []): + if self.platform_name in asset["name"].lower(): + return tag, asset["browser_download_url"] + + return tag, None + except (requests.RequestException, ValueError) as e: + continue + return None, None + + +class LocalPendingUpdateCheckThread(QThread): + """ + Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly. + + Args: + current_version (str): Current application version. + platform_suffix (str): Platform-specific suffix to identify update folders. + """ + + pending_update_found = Signal(str, str) + no_pending_update = Signal() + + def __init__(self, current_version, platform_suffix, platform_name, app_name): + super().__init__() + self.current_version = current_version + self.platform_suffix = platform_suffix + self.platform_name = platform_name + self.app_name = app_name + + def version_compare(self, v1, v2): + def normalize(v): return [int(x) for x in v.split(".")] + return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) + + def run(self): + if self.platform_name == 'darwin': + cwd = f'/tmp/{self.app_name}tempupdate' + else: + cwd = os.getcwd() + + pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$") + found = False + + try: + for item in os.listdir(cwd): + folder_path = os.path.join(cwd, item) + if os.path.isdir(folder_path) and item.endswith(self.platform_suffix): + match = pattern.match(item) + if match: + folder_version = match.group(1) + if self.version_compare(folder_version, self.current_version) > 0: + self.pending_update_found.emit(folder_version, folder_path) + found = True + break + except: + pass + + if not found: + self.no_pending_update.emit() + + + + + +class UpdateManager(QObject): + """ + Orchestrates the update process. + Main apps should instantiate this and call check_for_updates(). + """ + + def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name): + super().__init__() + self.parent = main_window + self.api_url = api_url + self.api_url_sec = api_url_sec + self.current_version = current_version + self.platform_name = platform_name + self.platform_suffix = platform_suffix + self.app_name = app_name + + self.pending_update_version = None + self.pending_update_path = None + + + def manual_check_for_updates(self): + self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name) + self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) + self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) + self.local_check_thread.start() + + def on_pending_update_found(self, version, folder_path): + self.parent.statusBar().showMessage(f"Pending update found: version {version}") + self.pending_update_version = version + self.pending_update_path = folder_path + self.show_pending_update_popup() + + def on_no_pending_update(self): + # No pending update found locally, start server check directly + self.parent.statusBar().showMessage("No pending local update found. Checking server...") + self.start_update_check_thread() + + def show_pending_update_popup(self): + msg_box = QMessageBox(self.parent) + msg_box.setWindowTitle("Pending Update Found") + msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?") + install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) + install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) + msg_box.exec() + + if msg_box.clickedButton() == install_now_button: + self.install_update(self.pending_update_path) + else: + self.parent.statusBar().showMessage("Pending update available. Install later.") + # After user dismisses, still check the server for new updates + self.start_update_check_thread() + + def start_update_check_thread(self): + self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name) + self.check_thread.download_requested.connect(self.on_server_update_requested) + self.check_thread.no_update_available.connect(self.on_server_no_update) + self.check_thread.error_occurred.connect(self.on_error) + self.check_thread.start() + + def on_server_no_update(self): + self.parent.statusBar().showMessage("No new updates found on server.", 5000) + + def on_server_update_requested(self, download_url, latest_version): + if self.pending_update_version: + cmp = self.version_compare(latest_version, self.pending_update_version) + if cmp > 0: + # Server version is newer than pending update + self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...") + try: + shutil.rmtree(self.pending_update_path) + self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}") + except Exception as e: + self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}") + + # Clear pending update info so new download proceeds + self.pending_update_version = None + self.pending_update_path = None + + # Download the new update + self.download_update(download_url, latest_version) + elif cmp == 0: + # Versions equal, no download needed + self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.") + else: + # Server version older than pending? Unlikely but just keep pending update + self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.") + else: + # No pending update, just download + self.download_update(download_url, latest_version) + + def download_update(self, download_url, latest_version): + self.parent.statusBar().showMessage("Downloading update...") + self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name) + self.download_thread.update_ready.connect(self.on_update_ready) + self.download_thread.error_occurred.connect(self.on_error) + self.download_thread.start() + + def on_update_ready(self, latest_version, extract_folder): + self.parent.statusBar().showMessage("Update downloaded and extracted.") + + msg_box = QMessageBox(self.parent) + msg_box.setWindowTitle("Update Ready") + msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?") + install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) + install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) + + msg_box.exec() + + if msg_box.clickedButton() == install_now_button: + self.install_update(extract_folder) + else: + self.parent.statusBar().showMessage("Update ready. Install later.") + + + def install_update(self, extract_folder): + # Path to updater executable + + if self.platform_name == 'windows': + updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe") + elif self.platform_name == 'darwin': + if getattr(sys, 'frozen', False): + updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app") + else: + updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app") + + elif self.platform_name == 'linux': + updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater") + else: + updater_path = os.getcwd() + + if not os.path.exists(updater_path): + QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}") + return + + # Launch updater with extracted folder path as argument + try: + # Pass current app's executable path for updater to relaunch + main_app_executable = os.path.abspath(sys.argv[0]) + + print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"') + + if self.platform_name == 'darwin': + subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable]) + else: + subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path)) + + # Close the current app so updater can replace files + sys.exit(0) + + except Exception as e: + QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}") + + def on_error(self, message): + # print(f"Error: {message}") + self.parent.statusBar().showMessage(f"Error occurred during update process. {message}") + + def version_compare(self, v1, v2): + def normalize(v): return [int(x) for x in v.split(".")] + return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) + + +def wait_for_process_to_exit(process_name, timeout=10): + """ + Waits for a process with the specified name to exit within a timeout period. + + Args: + process_name (str): Name (or part of the name) of the process to wait for. + timeout (int, optional): Maximum time to wait in seconds. Defaults to 10. + + Returns: + bool: True if the process exited before the timeout, False otherwise. + """ + + print(f"Waiting for {process_name} to exit...") + deadline = time.time() + timeout + while time.time() < deadline: + still_running = False + for proc in psutil.process_iter(['name']): + try: + if proc.info['name'] and process_name.lower() in proc.info['name'].lower(): + still_running = True + print(f"Still running: {proc.info['name']} (PID: {proc.pid})") + break + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + if not still_running: + print(f"{process_name} has exited.") + return True + time.sleep(0.5) + print(f"{process_name} did not exit in time.") + return False + + +def finish_update_if_needed(platform_name, app_name): + """ + Completes a pending application update if '--finish-update' is present in the command-line arguments. + """ + + if "--finish-update" in sys.argv: + print("Finishing update...") + + if platform_name == 'darwin': + app_dir = f'/tmp/{app_name}tempupdate' + else: + app_dir = os.getcwd() + + # 1. Find update folder + update_folder = None + for entry in os.listdir(app_dir): + entry_path = os.path.join(app_dir, entry) + if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name): + update_folder = os.path.join(app_dir, entry) + break + + if update_folder is None: + print("No update folder found. Skipping update steps.") + return + + if platform_name == 'darwin': + update_folder = os.path.join(update_folder, f"{app_name}-darwin") + + # 2. Wait for updater to exit + print(f"Waiting for {app_name}_updater to exit...") + for proc in psutil.process_iter(['pid', 'name']): + if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower(): + try: + proc.wait(timeout=5) + except psutil.TimeoutExpired: + print(f"Force killing lingering {app_name}_updater") + proc.kill() + + # 3. Replace the updater + if platform_name == 'windows': + new_updater = os.path.join(update_folder, f"{app_name}_updater.exe") + dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe") + + elif platform_name == 'darwin': + new_updater = os.path.join(update_folder, f"{app_name}_updater.app") + dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app")) + + elif platform_name == 'linux': + new_updater = os.path.join(update_folder, f"{app_name}_updater") + dest_updater = os.path.join(app_dir, f"{app_name}_updater") + + else: + print("Unknown Platform") + new_updater = os.getcwd() + dest_updater = os.getcwd() + + print(f"New updater is {new_updater}") + print(f"Dest updater is {dest_updater}") + + print("Writable?", os.access(dest_updater, os.W_OK)) + print("Executable path:", sys.executable) + print("Trying to copy:", new_updater, "->", dest_updater) + + if os.path.exists(new_updater): + try: + if os.path.exists(dest_updater): + if platform_name == 'darwin': + try: + if os.path.isdir(dest_updater): + shutil.rmtree(dest_updater) + print(f"Deleted directory: {dest_updater}") + else: + os.remove(dest_updater) + print(f"Deleted file: {dest_updater}") + except Exception as e: + print(f"Error deleting {dest_updater}: {e}") + else: + os.remove(dest_updater) + + if platform_name == 'darwin': + wait_for_process_to_exit(f"{app_name}_updater", timeout=10) + subprocess.check_call(["ditto", new_updater, dest_updater]) + else: + shutil.copy2(new_updater, dest_updater) + + if platform_name in ('linux', 'darwin'): + os.chmod(dest_updater, 0o755) + + if platform_name == 'darwin': + remove_quarantine(dest_updater, app_name) + + print(f"{app_name}_updater replaced.") + except Exception as e: + print(f"Failed to replace {app_name}_updater: {e}") + + # 4. Delete the update folder + try: + if platform_name == 'darwin': + shutil.rmtree(app_dir) + else: + shutil.rmtree(update_folder) + except Exception as e: + print(f"Failed to delete update folder: {e}") + + QMessageBox.information(None, "Update Complete", "The application has been successfully updated.") + sys.argv.remove("--finish-update") + + +def remove_quarantine(app_path, app_name): + """ + Removes the macOS quarantine attribute from the specified application path. + """ + + script = f''' + do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)" + ''' + try: + subprocess.run(['osascript', '-e', script], check=True) + print("✅ Quarantine attribute removed.") + except subprocess.CalledProcessError as e: + print("❌ Failed to remove quarantine attribute.") + print(e) \ No newline at end of file