diff --git a/main.py b/main.py
index d3c4497..06ed309 100644
--- a/main.py
+++ b/main.py
@@ -8,14 +8,11 @@ License: GPL-3.0
# Built-in imports
import os
-import re
import sys
import json
import time
-import shlex
import pickle
import shutil
-import zipfile
import platform
import traceback
import subprocess
@@ -28,7 +25,8 @@ from enum import Enum, auto
import numpy as np
import pandas as pd
import psutil
-import requests
+
+from updater import finish_update_if_needed, UpdateManager, LocalPendingUpdateCheckThread
from mne.io import read_raw_snirf
from mne.preprocessing.nirs import source_detector_distances
@@ -46,12 +44,12 @@ from PySide6.QtGui import QAction, QKeySequence, QIcon, QIntValidator, QDoubleVa
from PySide6.QtSvgWidgets import QSvgWidget # needed to show svgs when app is not frozen
-CURRENT_VERSION = "1.2.1"
-
+CURRENT_VERSION = "1.2.0"
API_URL = "https://git.research.dezeeuw.ca/api/v1/repos/tyler/flares/releases"
API_URL_SECONDARY = "https://git.research2.dezeeuw.ca/api/v1/repos/tyler/flares/releases"
-
PLATFORM_NAME = platform.system().lower()
+APP_NAME = "flares"
+
# Selectable parameters on the right side of the window
SECTIONS = [
@@ -225,9 +223,6 @@ SECTIONS = [
]
-
-
-
class SaveProjectThread(QThread):
finished_signal = Signal(str)
error_signal = Signal(str)
@@ -316,189 +311,6 @@ class TerminalWindow(QWidget):
return f"Available commands: {', '.join(self.commands.keys())}"
-class UpdateDownloadThread(QThread):
- """
- Thread that downloads and extracts an update package and emits a signal on completion or error.
-
- Args:
- download_url (str): URL of the update zip file to download.
- latest_version (str): Version string of the latest update.
- """
-
- update_ready = Signal(str, str)
- error_occurred = Signal(str)
-
- def __init__(self, download_url, latest_version):
- super().__init__()
- self.download_url = download_url
- self.latest_version = latest_version
-
- def run(self):
- try:
- local_filename = os.path.basename(self.download_url)
-
- if PLATFORM_NAME == 'darwin':
- tmp_dir = '/tmp/flarestempupdate'
- os.makedirs(tmp_dir, exist_ok=True)
- local_path = os.path.join(tmp_dir, local_filename)
- else:
- local_path = os.path.join(os.getcwd(), local_filename)
-
- # Download the file
- with requests.get(self.download_url, stream=True, timeout=15) as r:
- r.raise_for_status()
- with open(local_path, 'wb') as f:
- for chunk in r.iter_content(chunk_size=8192):
- if chunk:
- f.write(chunk)
-
- # Extract folder name (remove .zip)
- if PLATFORM_NAME == 'darwin':
- extract_folder = os.path.splitext(local_filename)[0]
- extract_path = os.path.join(tmp_dir, extract_folder)
-
- else:
- extract_folder = os.path.splitext(local_filename)[0]
- extract_path = os.path.join(os.getcwd(), extract_folder)
-
- # Create the folder if not exists
- os.makedirs(extract_path, exist_ok=True)
-
- # Extract the zip file contents
- if PLATFORM_NAME == 'darwin':
- subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
- else:
- with zipfile.ZipFile(local_path, 'r') as zip_ref:
- zip_ref.extractall(extract_path)
-
- # Remove the zip once extracted and emit a signal
- os.remove(local_path)
- self.update_ready.emit(self.latest_version, extract_path)
-
- except Exception as e:
- # Emit a signal signifying failure
- self.error_occurred.emit(str(e))
-
-
-
-class UpdateCheckThread(QThread):
- """
- Thread that checks for updates by querying the API and emits a signal based on the result.
-
- Signals:
- download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
- no_update_available(): Emitted when no update is found or current version is up to date.
- error_occurred(str): Emitted with an error message if the update check fails.
- """
-
- download_requested = Signal(str, str)
- no_update_available = Signal()
- error_occurred = Signal(str)
-
- def run(self):
- if not getattr(sys, 'frozen', False):
- self.error_occurred.emit("Application is not frozen (Development mode).")
- return
- try:
- latest_version, download_url = self.get_latest_release_for_platform()
- if not latest_version:
- self.no_update_available.emit()
- return
-
- if not download_url:
- self.error_occurred.emit(f"No download available for platform '{PLATFORM_NAME}'")
- return
-
- if self.version_compare(latest_version, CURRENT_VERSION) > 0:
- self.download_requested.emit(download_url, latest_version)
- else:
- self.no_update_available.emit()
-
- except Exception as e:
- self.error_occurred.emit(f"Update check failed: {e}")
-
- def version_compare(self, v1, v2):
- def normalize(v): return [int(x) for x in v.split(".")]
- return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
-
- def get_latest_release_for_platform(self):
- urls = [API_URL, API_URL_SECONDARY]
- for url in urls:
- try:
-
- response = requests.get(url, timeout=5)
- response.raise_for_status()
- releases = response.json()
-
- if not releases:
- continue
-
- latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
-
- if not latest:
- continue
-
- tag = latest["tag_name"].lstrip("v")
-
- for asset in latest.get("assets", []):
- if PLATFORM_NAME in asset["name"].lower():
- return tag, asset["browser_download_url"]
-
- return tag, None
- except (requests.RequestException, ValueError) as e:
- continue
- return None, None
-
-
-class LocalPendingUpdateCheckThread(QThread):
- """
- Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
-
- Args:
- current_version (str): Current application version.
- platform_suffix (str): Platform-specific suffix to identify update folders.
- """
-
- pending_update_found = Signal(str, str)
- no_pending_update = Signal()
-
- def __init__(self, current_version, platform_suffix):
- super().__init__()
- self.current_version = current_version
- self.platform_suffix = platform_suffix
-
- def version_compare(self, v1, v2):
- def normalize(v): return [int(x) for x in v.split(".")]
- return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
-
- def run(self):
- if PLATFORM_NAME == 'darwin':
- cwd = '/tmp/flarestempupdate'
- else:
- cwd = os.getcwd()
-
- pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
- found = False
-
- try:
- for item in os.listdir(cwd):
- folder_path = os.path.join(cwd, item)
- if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
- match = pattern.match(item)
- if match:
- folder_version = match.group(1)
- if self.version_compare(folder_version, self.current_version) > 0:
- self.pending_update_found.emit(folder_version, folder_path)
- found = True
- break
- except:
- pass
-
- if not found:
- self.no_pending_update.emit()
-
-
-
class AboutWindow(QWidget):
"""
Simple About window displaying basic application information.
@@ -1379,6 +1191,399 @@ class UpdateEventsWindow(QWidget):
return save_path
+
+class UpdateEventsBlazesWindow(QWidget):
+
+ def __init__(self, parent=None, mode=EventUpdateMode.WRITE_SNIRF, caller=None):
+ super().__init__(parent, Qt.WindowType.Window)
+
+ self.mode = mode
+ self.caller = caller or self.__class__.__name__
+ self.setWindowTitle("Update event markers (BLAZES)")
+ self.resize(760, 200)
+
+ self.label_file_a = QLabel("SNIRF file:")
+ self.line_edit_file_a = QLineEdit()
+ self.line_edit_file_a.setReadOnly(True)
+ self.btn_browse_a = QPushButton("Browse .snirf")
+ self.btn_browse_a.clicked.connect(self.browse_file_a)
+
+ self.label_file_b = QLabel("BLAZES file:")
+ self.line_edit_file_b = QLineEdit()
+ self.line_edit_file_b.setReadOnly(True)
+ self.btn_browse_b = QPushButton("Browse .blaze")
+ self.btn_browse_b.clicked.connect(self.browse_file_b)
+
+ self.label_events = QLabel("Events in selected blazes file:")
+ self.combo_events = QComboBox()
+ self.combo_events.setEnabled(False)
+
+ self.label_snirf_events = QLabel("Events in SNIRF file:")
+ self.combo_snirf_events = QComboBox()
+ self.combo_snirf_events.setEnabled(False)
+
+ self.btn_clear = QPushButton("Clear")
+ self.btn_go = QPushButton("Go")
+ self.btn_clear.clicked.connect(self.clear_files)
+ self.btn_go.clicked.connect(self.go_action)
+
+ # ---
+ layout = QVBoxLayout()
+ self.description = QLabel()
+ self.description.setTextFormat(Qt.TextFormat.RichText)
+ self.description.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction)
+ self.description.setOpenExternalLinks(True)
+
+ self.description.setText("The events that are present in a snirf file may not be the events that are to be studied and examined.
"
+ "Utilizing different software and video recordings, it is easy enough to see when an action actually occured in a file.
"
+ "The software BLAZES is used to create these events in video files, and these events can be applied to the snirf file
"
+ "selected below by time syncing it to an event that it shares with the snirf file.")
+
+ layout.addWidget(self.description)
+
+ help_text_a = "Select the SNIRF (.snirf) file to update with new event markers."
+
+ file_a_layout = QHBoxLayout()
+
+ # Help button on the left
+ help_btn_a = QPushButton("?")
+ help_btn_a.setFixedWidth(25)
+ help_btn_a.setToolTip(help_text_a)
+ help_btn_a.clicked.connect(lambda _, text=help_text_a: self.show_help_popup(text))
+ file_a_layout.addWidget(help_btn_a)
+
+ # Container for label + line_edit + browse button with tooltip
+ file_a_container = QWidget()
+ file_a_container_layout = QHBoxLayout()
+ file_a_container_layout.setContentsMargins(0, 0, 0, 0)
+ file_a_container_layout.addWidget(self.label_file_a)
+ file_a_container_layout.addWidget(self.line_edit_file_a)
+ file_a_container_layout.addWidget(self.btn_browse_a)
+ file_a_container.setLayout(file_a_container_layout)
+ file_a_container.setToolTip(help_text_a)
+
+ file_a_layout.addWidget(file_a_container)
+ layout.addLayout(file_a_layout)
+
+
+ help_text_b = "Provide a .blaze output file that contains events for this participant."
+
+ file_b_layout = QHBoxLayout()
+
+ help_btn_b = QPushButton("?")
+ help_btn_b.setFixedWidth(25)
+ help_btn_b.setToolTip(help_text_b)
+ help_btn_b.clicked.connect(lambda _, text=help_text_b: self.show_help_popup(text))
+ file_b_layout.addWidget(help_btn_b)
+
+ file_b_container = QWidget()
+ file_b_container_layout = QHBoxLayout()
+ file_b_container_layout.setContentsMargins(0, 0, 0, 0)
+ file_b_container_layout.addWidget(self.label_file_b)
+ file_b_container_layout.addWidget(self.line_edit_file_b)
+ file_b_container_layout.addWidget(self.btn_browse_b)
+ file_b_container.setLayout(file_b_container_layout)
+ file_b_container.setToolTip(help_text_b)
+
+ file_b_layout.addWidget(file_b_container)
+ layout.addLayout(file_b_layout)
+
+
+ help_text_suffix = "The events extracted from the blaze file."
+
+ suffix2_layout = QHBoxLayout()
+
+ help_btn_suffix = QPushButton("?")
+ help_btn_suffix.setFixedWidth(25)
+ help_btn_suffix.setToolTip(help_text_suffix)
+ help_btn_suffix.clicked.connect(lambda _, text=help_text_suffix: self.show_help_popup(text))
+ suffix2_layout.addWidget(help_btn_suffix)
+
+ suffix2_container = QWidget()
+ suffix2_container_layout = QHBoxLayout()
+ suffix2_container_layout.setContentsMargins(0, 0, 0, 0)
+ suffix2_container_layout.addWidget(self.label_events)
+ suffix2_container_layout.addWidget(self.combo_events)
+ suffix2_container.setLayout(suffix2_container_layout)
+ suffix2_container.setToolTip(help_text_suffix)
+
+ suffix2_layout.addWidget(suffix2_container)
+ layout.addLayout(suffix2_layout)
+
+ snirf_events_layout = QHBoxLayout()
+
+ help_text_snirf_events = "The event markers extracted from the SNIRF file."
+ help_btn_snirf_events = QPushButton("?")
+ help_btn_snirf_events.setFixedWidth(25)
+ help_btn_snirf_events.setToolTip(help_text_snirf_events)
+ help_btn_snirf_events.clicked.connect(lambda _, text=help_text_snirf_events: self.show_help_popup(text))
+ snirf_events_layout.addWidget(help_btn_snirf_events)
+
+ snirf_events_container = QWidget()
+ snirf_events_container_layout = QHBoxLayout()
+ snirf_events_container_layout.setContentsMargins(0, 0, 0, 0)
+ snirf_events_container_layout.addWidget(self.label_snirf_events)
+ snirf_events_container_layout.addWidget(self.combo_snirf_events)
+ snirf_events_container.setLayout(snirf_events_container_layout)
+ snirf_events_container.setToolTip(help_text_snirf_events)
+
+ snirf_events_layout.addWidget(snirf_events_container)
+ layout.addLayout(snirf_events_layout)
+
+ buttons_layout = QHBoxLayout()
+ buttons_layout.addStretch()
+ buttons_layout.addWidget(self.btn_clear)
+ buttons_layout.addWidget(self.btn_go)
+ layout.addLayout(buttons_layout)
+
+ self.setLayout(layout)
+
+
+ def show_help_popup(self, text):
+ msg = QMessageBox(self)
+ msg.setWindowTitle("Parameter Info - FLARES")
+ msg.setText(text)
+ msg.exec()
+
+ def browse_file_a(self):
+ file_path, _ = QFileDialog.getOpenFileName(self, "Select SNIRF File", "", "SNIRF Files (*.snirf)")
+ if file_path:
+ self.line_edit_file_a.setText(file_path)
+ try:
+ raw = read_raw_snirf(file_path, preload=False)
+ annotations = raw.annotations
+
+ # Build individual event entries
+ event_entries = []
+ for onset, description in zip(annotations.onset, annotations.description):
+ event_str = f"{description} @ {onset:.3f}s"
+ event_entries.append(event_str)
+
+ if not event_entries:
+ QMessageBox.information(self, "No Events", "No events found in SNIRF file.")
+ self.combo_snirf_events.clear()
+ self.combo_snirf_events.setEnabled(False)
+ return
+
+ self.combo_snirf_events.clear()
+ self.combo_snirf_events.addItems(event_entries)
+ self.combo_snirf_events.setEnabled(True)
+
+ except Exception as e:
+ QMessageBox.warning(self, "Error", f"Could not read SNIRF file with MNE:\n{str(e)}")
+ self.combo_snirf_events.clear()
+ self.combo_snirf_events.setEnabled(False)
+
+ def browse_file_b(self):
+ file_path, _ = QFileDialog.getOpenFileName(self, "Select BLAZES File", "", "BLAZES project Files (*.blaze)")
+ if file_path:
+ self.line_edit_file_b.setText(file_path)
+
+ try:
+ with open(file_path, 'r', encoding='utf-8') as f:
+ data = json.load(f)
+ self.blazes_data = data
+
+ obs_keys = self.extract_blazes_observation_strings(data)
+ self.combo_events.clear()
+ if obs_keys:
+ self.combo_events.addItems(obs_keys)
+ self.combo_events.setEnabled(True)
+ else:
+ QMessageBox.information(self, "No Events", "No observation keys found in BLAZES file.")
+ self.combo_events.setEnabled(False)
+
+ except (json.JSONDecodeError, FileNotFoundError, KeyError, TypeError) as e:
+ QMessageBox.warning(self, "Error", f"Failed to parse BLAZES file:\n{e}")
+ self.combo_events.clear()
+ self.combo_events.setEnabled(False)
+
+
+ def extract_blazes_observation_strings(self, data):
+ if "obs" not in data:
+ raise KeyError("Missing 'obs' key in BLAZES file.")
+
+ obs = data["obs"]
+ event_strings = []
+
+ for event_name, occurrences in obs.items():
+ # occurrences is a list of dicts: [{"start_frame": 642, "start_time_sec": 26.777, ...}]
+ for entry in occurrences:
+ onset = entry.get("start_time_sec", 0.0)
+ # Formatting to match your SNIRF style: "Event Name @ 0.000s"
+ display_str = f"{event_name} @ {onset:.3f}s"
+ event_strings.append(display_str)
+
+ return event_strings
+
+
+ def clear_files(self):
+ self.line_edit_file_a.clear()
+ self.line_edit_file_b.clear()
+
+
+ def go_action(self):
+ file_a = self.line_edit_file_a.text()
+ file_b = self.line_edit_file_b.text()
+ suffix = APP_NAME
+
+ if not hasattr(self, "blazes_data") or self.combo_events.count() == 0 or self.combo_snirf_events.count() == 0:
+ QMessageBox.warning(self, "Missing data", "Please make sure a BLAZES and SNIRF event are selected.")
+ return
+
+ try:
+ blaze_text = self.combo_events.currentText()
+ _, blaze_time_str = blaze_text.split(" @ ")
+ blaze_anchor_time = float(blaze_time_str.replace("s", "").strip())
+ except Exception as e:
+ QMessageBox.critical(self, "BLAZE Event Error", f"Could not parse BLAZE anchor:\n{e}")
+ return
+
+ try:
+ snirf_text = self.combo_snirf_events.currentText()
+ _, snirf_time_str = snirf_text.split(" @ ")
+ snirf_anchor_time = float(snirf_time_str.replace("s", "").strip())
+ except Exception as e:
+ QMessageBox.critical(self, "SNIRF Event Error", f"Could not parse SNIRF anchor:\n{e}")
+ return
+
+ time_shift = snirf_anchor_time - blaze_anchor_time
+
+ onsets, durations, descriptions = [], [], []
+ skipped_count = 0
+
+ try:
+ ai_data = self.blazes_data.get("ai_tracks", {})
+
+ for track_name, events in ai_data.items():
+ clean_name = track_name.replace("AI: ", "").strip()
+
+ for event in events:
+ original_start = event.get("start_time_sec", 0.0)
+ original_end = event.get("end_time_sec", original_start)
+ duration = original_end - original_start
+
+ # FILTER: Minimum 0.1s duration
+ if duration < 0.1:
+ skipped_count += 1
+ continue
+
+ # Apply shift
+ adjusted_onset = original_start + time_shift
+
+ onsets.append(round(adjusted_onset, 6))
+ durations.append(round(duration, 6))
+ descriptions.append(clean_name)
+
+ except Exception as e:
+ QMessageBox.critical(self, "Track Error", f"Failed to process tracks: {e}")
+ return
+
+ if not onsets:
+ QMessageBox.warning(self, "No Data", f"No events met the 0.1s threshold. (Skipped {skipped_count})")
+ return
+
+ if self.mode == EventUpdateMode.WRITE_SNIRF:
+ suggested_name = f"{os.path.splitext(os.path.basename(file_a))[0]}_{suffix}.snirf"
+ save_path, _ = QFileDialog.getSaveFileName(self, "Save SNIRF", suggested_name, "SNIRF Files (*.snirf)")
+
+ if not save_path: return
+ if not save_path.lower().endswith(".snirf"): save_path += ".snirf"
+
+ try:
+ raw = read_raw_snirf(file_a, preload=True)
+
+ # Create annotations
+ new_annotations = Annotations(
+ onset=onsets,
+ duration=durations,
+ description=descriptions
+ )
+
+ # Replace existing annotations with the new aligned AI tracks
+ raw.set_annotations(new_annotations)
+
+ write_raw_snirf(raw, save_path)
+ QMessageBox.information(self, "Success",
+ f"Aligned {len(onsets)} events.\n(Filtered out {skipped_count} short events)")
+ except Exception as e:
+ QMessageBox.critical(self, "Error", f"Failed to update SNIRF file:\n{e}")
+
+
+
+ def update_optode_positions(self, file_a, file_b, save_path):
+
+ fiducials = {}
+ ch_positions = {}
+
+ # Read the lines from the optode file
+ with open(file_b, 'r') as f:
+ for line in f:
+ if line.strip():
+ # Split by the semicolon and convert to meters
+ ch_name, coords_str = line.split(":")
+ coords = np.array(list(map(float, coords_str.strip().split()))) * 0.001
+
+ # The key we have is a fiducial
+ if ch_name.lower() in ['lpa', 'nz', 'rpa']:
+ fiducials[ch_name.lower()] = coords
+
+ # The key we have is a source or detector
+ else:
+ ch_positions[ch_name.upper()] = coords
+
+ # Create montage with updated coords in head space
+ initial_montage = make_dig_montage(ch_pos=ch_positions, nasion=fiducials.get('nz'), lpa=fiducials.get('lpa'), rpa=fiducials.get('rpa'), coord_frame='head') # type: ignore
+
+ # Read the SNIRF file, set the montage, and write it back
+ raw = read_raw_snirf(file_a, preload=True)
+ raw.set_montage(initial_montage)
+ write_raw_snirf(raw, save_path)
+
+
+ def _apply_events_to_snirf(self, raw, new_annotations, save_path):
+ raw.set_annotations(new_annotations)
+ write_raw_snirf(raw, save_path)
+
+ def _write_event_mapping_json(
+ self,
+ file_a,
+ file_b,
+ selected_obs,
+ snirf_anchor,
+ boris_anchor,
+ time_shift,
+ mapped_events,
+ save_path
+ ):
+ import json
+ from datetime import datetime
+ import os
+
+ payload = {
+ "source": {
+ "called_from": self.caller,
+ "snirf_file": os.path.basename(file_a),
+ "boris_file": os.path.basename(file_b),
+ "observation": selected_obs
+ },
+ "alignment": {
+ "snirf_anchor": snirf_anchor,
+ "boris_anchor": boris_anchor,
+ "time_shift_seconds": time_shift
+ },
+ "events": mapped_events,
+ "created_at": datetime.utcnow().isoformat() + "Z"
+ }
+
+ with open(save_path, "w", encoding="utf-8") as f:
+ json.dump(payload, f, indent=2)
+
+ return save_path
+
+
+
+
class ProgressBubble(QWidget):
"""
A clickable widget displaying a progress bar made of colored rectangles and a label.
@@ -2418,8 +2623,6 @@ class ParticipantBrainViewerWidget(FlaresBaseWidget):
self.thumb_size = QSize(280, 180)
self.showMaximized()
-
-
def show_brain_images(self):
@@ -2549,10 +2752,6 @@ class ParticipantBrainViewerWidget(FlaresBaseWidget):
print(f"No method defined for index {idx}")
-
-
-
-
class ParticipantFunctionalConnectivityWidget(FlaresBaseWidget):
def __init__(self, haemo_dict, epochs_dict):
super().__init__("FunctionalConnectivityWidget")
@@ -2860,7 +3059,6 @@ class GroupFunctionalConnectivityWidget(FlaresBaseWidget):
self.showMaximized()
-
def show_brain_images(self):
import flares
@@ -2954,7 +3152,6 @@ class GroupFunctionalConnectivityWidget(FlaresBaseWidget):
-
class MultiProgressDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
@@ -3538,11 +3735,6 @@ class ParameterInputDialog(QDialog):
-
-
-
-
-
class GroupViewerWidget(FlaresBaseWidget):
def __init__(self, haemo_dict, cha, df_ind, design_matrix, contrast_results, group):
super().__init__("GroupViewer")
@@ -3826,8 +4018,6 @@ class GroupViewerWidget(FlaresBaseWidget):
-
-
class GroupBrainViewerWidget(FlaresBaseWidget):
def __init__(self, haemo_dict, df_ind, design_matrix, group, contrast_results_dict):
super().__init__("GroupBrainViewer")
@@ -3926,13 +4116,10 @@ class GroupBrainViewerWidget(FlaresBaseWidget):
group_a = self.group_a_dropdown.currentText()
self.update_participant_list_for_group(group_a, self.participant_dropdown_a)
-
-
def _on_participants_changed(self, item=None):
self._update_event_dropdown()
-
def _update_event_dropdown(self):
participants_a = self._get_checked_items(self.participant_dropdown_a)
participants_b = self._get_checked_items(self.participant_dropdown_b)
@@ -4160,7 +4347,6 @@ class GroupBrainViewerWidget(FlaresBaseWidget):
-
class ViewerLauncherWidget(QWidget):
def __init__(self, haemo_dict, config_dict, fig_bytes_dict, cha_dict, contrast_results_dict, df_ind, design_matrix, epochs_dict):
super().__init__()
@@ -4197,7 +4383,6 @@ class ViewerLauncherWidget(QWidget):
btn6 = QPushButton("Open Export Data As CSV Viewer")
btn6.clicked.connect(lambda: self.open_export_data_as_csv_viewer(haemo_dict, cha_dict, df_ind, design_matrix, group_dict, contrast_results_dict))
-
layout.addWidget(btn1)
layout.addWidget(btn2)
layout.addWidget(btn3)
@@ -4272,10 +4457,21 @@ class MainApplication(QMainWindow):
self.files_failed = set() # set of failed file paths
self.files_results = {} # dict for successful results (if needed)
+ self.platform_suffix = "-" + PLATFORM_NAME
+
+ self.updater = UpdateManager(
+ main_window=self,
+ api_url=API_URL,
+ api_url_sec=API_URL_SECONDARY,
+ current_version=CURRENT_VERSION,
+ platform_name=PLATFORM_NAME,
+ platform_suffix=self.platform_suffix,
+ app_name=APP_NAME
+ )
+
self.init_ui()
self.create_menu_bar()
- self.platform_suffix = "-" + PLATFORM_NAME
self.pending_update_version = None
self.pending_update_path = None
self.last_clicked_bubble = None
@@ -4285,9 +4481,9 @@ class MainApplication(QMainWindow):
self.current_file = None
# Start local pending update check thread
- self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix)
- self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
- self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
+ self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix, PLATFORM_NAME, APP_NAME)
+ self.local_check_thread.pending_update_found.connect(self.updater.on_pending_update_found)
+ self.local_check_thread.no_pending_update.connect(self.updater.on_no_pending_update)
self.local_check_thread.start()
@@ -4503,15 +4699,16 @@ class MainApplication(QMainWindow):
options_actions = [
("User Guide", "F1", self.user_guide, resource_path("icons/help_24dp_1F1F1F.svg")),
- ("Check for Updates", "F5", self.manual_check_for_updates, resource_path("icons/update_24dp_1F1F1F.svg")),
+ ("Check for Updates", "F5", self.updater.manual_check_for_updates, resource_path("icons/update_24dp_1F1F1F.svg")),
("Update optodes in snirf file...", "F6", self.update_optode_positions, resource_path("icons/upgrade_24dp_1F1F1F.svg")),
- ("Update events in snirf file...", "F7", self.update_event_markers, resource_path("icons/upgrade_24dp_1F1F1F.svg")),
+ ("Update events in snirf file (BORIS)...", "F7", self.update_event_markers, resource_path("icons/upgrade_24dp_1F1F1F.svg")),
+ ("Update events in snirf file (BLAZES)...", "F8", self.update_event_markers_blazes, resource_path("icons/upgrade_24dp_1F1F1F.svg")),
("About", "F12", self.about_window, resource_path("icons/info_24dp_1F1F1F.svg"))
]
for i, (name, shortcut, slot, icon) in enumerate(options_actions):
options_menu.addAction(make_action(name, shortcut, slot, icon=icon))
- if i == 1 or i == 3: # after the first 2 actions (0,1)
+ if i == 1 or i == 4: # after the first 2 actions (0,1)
options_menu.addSeparator()
preferences_menu = menu_bar.addMenu("Preferences")
@@ -4595,11 +4792,9 @@ class MainApplication(QMainWindow):
def open_launcher_window(self):
-
self.launcher_window = ViewerLauncherWidget(self.raw_haemo_dict, self.config_dict, self.fig_bytes_dict, self.cha_dict, self.contrast_results_dict, self.df_ind_dict, self.design_matrix_dict, self.epochs_dict)
self.launcher_window.show()
-
def copy_text(self):
self.top_left_widget.copy() # Trigger copy
self.statusbar.showMessage("Copied to clipboard") # Show status message
@@ -4644,12 +4839,16 @@ class MainApplication(QMainWindow):
self.optodes = UpdateOptodesWindow(self)
self.optodes.show()
-
def update_event_markers(self):
if self.events is None or not self.events.isVisible():
self.events = UpdateEventsWindow(self, EventUpdateMode.WRITE_SNIRF, "Manual SNIRF Edit")
self.events.show()
+ def update_event_markers_blazes(self):
+ if self.events is None or not self.events.isVisible():
+ self.events = UpdateEventsBlazesWindow(self, EventUpdateMode.WRITE_SNIRF, "Manual SNIRF Edit")
+ self.events.show()
+
def open_file_dialog(self):
file_path, _ = QFileDialog.getOpenFileName(
self, "Open File", "", "SNIRF Files (*.snirf);;All Files (*)"
@@ -5426,7 +5625,6 @@ class MainApplication(QMainWindow):
-
def check_for_pipeline_results(self):
while not self.result_queue.empty():
msg = self.result_queue.get()
@@ -5578,147 +5776,6 @@ class MainApplication(QMainWindow):
bubble.update_progress(step_index)
- '''UPDATER'''
- def manual_check_for_updates(self):
- self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix)
- self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
- self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
- self.local_check_thread.start()
-
- def on_pending_update_found(self, version, folder_path):
- self.statusBar().showMessage(f"Pending update found: version {version}")
- self.pending_update_version = version
- self.pending_update_path = folder_path
- self.show_pending_update_popup()
-
- def on_no_pending_update(self):
- # No pending update found locally, start server check directly
- self.statusBar().showMessage("No pending local update found. Checking server...")
- self.start_update_check_thread()
-
- def show_pending_update_popup(self):
- msg_box = QMessageBox(self)
- msg_box.setWindowTitle("Pending Update Found")
- msg_box.setText(f"A previously downloaded update (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
- install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
- install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
- msg_box.exec()
-
- if msg_box.clickedButton() == install_now_button:
- self.install_update(self.pending_update_path)
- else:
- self.statusBar().showMessage("Pending update available. Install later.")
- # After user dismisses, still check the server for new updates
- self.start_update_check_thread()
-
- def start_update_check_thread(self):
- self.check_thread = UpdateCheckThread()
- self.check_thread.download_requested.connect(self.on_server_update_requested)
- self.check_thread.no_update_available.connect(self.on_server_no_update)
- self.check_thread.error_occurred.connect(self.on_error)
- self.check_thread.start()
-
- def on_server_no_update(self):
- self.statusBar().showMessage("No new updates found on server.", 5000)
-
- def on_server_update_requested(self, download_url, latest_version):
- if self.pending_update_version:
- cmp = self.version_compare(latest_version, self.pending_update_version)
- if cmp > 0:
- # Server version is newer than pending update
- self.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
- try:
- shutil.rmtree(self.pending_update_path)
- self.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
- except Exception as e:
- self.statusBar().showMessage(f"Failed to delete old update folder: {e}")
-
- # Clear pending update info so new download proceeds
- self.pending_update_version = None
- self.pending_update_path = None
-
- # Download the new update
- self.download_update(download_url, latest_version)
- elif cmp == 0:
- # Versions equal, no download needed
- self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
- else:
- # Server version older than pending? Unlikely but just keep pending update
- self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
- else:
- # No pending update, just download
- self.download_update(download_url, latest_version)
-
- def download_update(self, download_url, latest_version):
- self.statusBar().showMessage("Downloading update...")
- self.download_thread = UpdateDownloadThread(download_url, latest_version)
- self.download_thread.update_ready.connect(self.on_update_ready)
- self.download_thread.error_occurred.connect(self.on_error)
- self.download_thread.start()
-
- def on_update_ready(self, latest_version, extract_folder):
- self.statusBar().showMessage("Update downloaded and extracted.")
-
- msg_box = QMessageBox(self)
- msg_box.setWindowTitle("Update Ready")
- msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
- install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
- install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
-
- msg_box.exec()
-
- if msg_box.clickedButton() == install_now_button:
- self.install_update(extract_folder)
- else:
- self.statusBar().showMessage("Update ready. Install later.")
-
-
- def install_update(self, extract_folder):
- # Path to updater executable
-
- if PLATFORM_NAME == 'windows':
- updater_path = os.path.join(os.getcwd(), "flares_updater.exe")
- elif PLATFORM_NAME == 'darwin':
- if getattr(sys, 'frozen', False):
- updater_path = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.app")
- else:
- updater_path = os.path.join(os.getcwd(), "../flares_updater.app")
-
- elif PLATFORM_NAME == 'linux':
- updater_path = os.path.join(os.getcwd(), "flares_updater")
- else:
- updater_path = os.getcwd()
-
- if not os.path.exists(updater_path):
- QMessageBox.critical(self, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
- return
-
- # Launch updater with extracted folder path as argument
- try:
- # Pass current app's executable path for updater to relaunch
- main_app_executable = os.path.abspath(sys.argv[0])
-
- print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
-
- if PLATFORM_NAME == 'darwin':
- subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
- else:
- subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
-
- # Close the current app so updater can replace files
- sys.exit(0)
-
- except Exception as e:
- QMessageBox.critical(self, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
-
- def on_error(self, message):
- # print(f"Error: {message}")
- self.statusBar().showMessage(f"Error occurred during update process. {message}")
-
- def version_compare(self, v1, v2):
- def normalize(v): return [int(x) for x in v.split(".")]
- return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
-
def get_snirf_metadata_mne(self, file_name):
try:
@@ -5788,162 +5845,6 @@ class MainApplication(QMainWindow):
event.accept()
-def wait_for_process_to_exit(process_name, timeout=10):
- """
- Waits for a process with the specified name to exit within a timeout period.
-
- Args:
- process_name (str): Name (or part of the name) of the process to wait for.
- timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
-
- Returns:
- bool: True if the process exited before the timeout, False otherwise.
- """
-
- print(f"Waiting for {process_name} to exit...")
- deadline = time.time() + timeout
- while time.time() < deadline:
- still_running = False
- for proc in psutil.process_iter(['name']):
- try:
- if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
- still_running = True
- print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
- break
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- continue
- if not still_running:
- print(f"{process_name} has exited.")
- return True
- time.sleep(0.5)
- print(f"{process_name} did not exit in time.")
- return False
-
-
-def finish_update_if_needed():
- """
- Completes a pending application update if '--finish-update' is present in the command-line arguments.
- """
-
- if "--finish-update" in sys.argv:
- print("Finishing update...")
-
- if PLATFORM_NAME == 'darwin':
- app_dir = '/tmp/flarestempupdate'
- else:
- app_dir = os.getcwd()
-
- # 1. Find update folder
- update_folder = None
- for entry in os.listdir(app_dir):
- entry_path = os.path.join(app_dir, entry)
- if os.path.isdir(entry_path) and entry.startswith("flares-") and entry.endswith("-" + PLATFORM_NAME):
- update_folder = os.path.join(app_dir, entry)
- break
-
- if update_folder is None:
- print("No update folder found. Skipping update steps.")
- return
-
- if PLATFORM_NAME == 'darwin':
- update_folder = os.path.join(update_folder, "flares-darwin")
-
- # 2. Wait for flares_updater to exit
- print("Waiting for flares_updater to exit...")
- for proc in psutil.process_iter(['pid', 'name']):
- if proc.info['name'] and "flares_updater" in proc.info['name'].lower():
- try:
- proc.wait(timeout=5)
- except psutil.TimeoutExpired:
- print("Force killing lingering flares_updater")
- proc.kill()
-
- # 3. Replace the updater
- if PLATFORM_NAME == 'windows':
- new_updater = os.path.join(update_folder, "flares_updater.exe")
- dest_updater = os.path.join(app_dir, "flares_updater.exe")
-
- elif PLATFORM_NAME == 'darwin':
- new_updater = os.path.join(update_folder, "flares_updater.app")
- dest_updater = os.path.abspath(os.path.join(sys.executable, "../../../../flares_updater.app"))
-
- elif PLATFORM_NAME == 'linux':
- new_updater = os.path.join(update_folder, "flares_updater")
- dest_updater = os.path.join(app_dir, "flares_updater")
-
- else:
- print("No platform??")
- new_updater = os.getcwd()
- dest_updater = os.getcwd()
-
- print(f"New updater is {new_updater}")
- print(f"Dest updater is {dest_updater}")
-
- print("Writable?", os.access(dest_updater, os.W_OK))
- print("Executable path:", sys.executable)
- print("Trying to copy:", new_updater, "->", dest_updater)
-
- if os.path.exists(new_updater):
- try:
- if os.path.exists(dest_updater):
- if PLATFORM_NAME == 'darwin':
- try:
- if os.path.isdir(dest_updater):
- shutil.rmtree(dest_updater)
- print(f"Deleted directory: {dest_updater}")
- else:
- os.remove(dest_updater)
- print(f"Deleted file: {dest_updater}")
- except Exception as e:
- print(f"Error deleting {dest_updater}: {e}")
- else:
- os.remove(dest_updater)
-
- if PLATFORM_NAME == 'darwin':
- wait_for_process_to_exit("flares_updater", timeout=10)
- subprocess.check_call(["ditto", new_updater, dest_updater])
- else:
- shutil.copy2(new_updater, dest_updater)
-
- if PLATFORM_NAME in ('linux', 'darwin'):
- os.chmod(dest_updater, 0o755)
-
- if PLATFORM_NAME == 'darwin':
- remove_quarantine(dest_updater)
-
- print("flares_updater replaced.")
- except Exception as e:
- print(f"Failed to replace flares_updater: {e}")
-
- # 4. Delete the update folder
- try:
- if PLATFORM_NAME == 'darwin':
- shutil.rmtree(app_dir)
- else:
- shutil.rmtree(update_folder)
- except Exception as e:
- print(f"Failed to delete update folder: {e}")
-
- QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
- sys.argv.remove("--finish-update")
-
-
-def remove_quarantine(app_path):
- """
- Removes the macOS quarantine attribute from the specified application path.
- """
-
- script = f'''
- do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (2/2)"
- '''
- try:
- subprocess.run(['osascript', '-e', script], check=True)
- print("✅ Quarantine attribute removed.")
- except subprocess.CalledProcessError as e:
- print("❌ Failed to remove quarantine attribute.")
- print(e)
-
-
def run_gui_entry_wrapper(config, gui_queue, progress_queue):
"""
Where the processing happens
@@ -6080,7 +5981,7 @@ if __name__ == "__main__":
# Only run GUI in the main process
if current_process().name == 'MainProcess':
app = QApplication(sys.argv)
- finish_update_if_needed()
+ finish_update_if_needed(PLATFORM_NAME, APP_NAME)
window = MainApplication()
if PLATFORM_NAME == "darwin":
@@ -6092,4 +5993,4 @@ if __name__ == "__main__":
window.show()
sys.exit(app.exec())
-# Not 4000 lines yay!
\ No newline at end of file
+# Not 6000 lines yay!
\ No newline at end of file
diff --git a/updater.py b/updater.py
new file mode 100644
index 0000000..154fe18
--- /dev/null
+++ b/updater.py
@@ -0,0 +1,540 @@
+"""
+Filename: updater.py
+Description: Generic updater file
+
+Author: Tyler de Zeeuw
+License: GPL-3.0
+"""
+
+
+# Built-in imports
+import os
+import re
+import sys
+import time
+import shlex
+import shutil
+import zipfile
+import traceback
+import subprocess
+
+# External library imports
+import psutil
+import requests
+
+from PySide6.QtWidgets import QMessageBox
+from PySide6.QtCore import QThread, Signal, QObject
+
+
+class UpdateDownloadThread(QThread):
+ """
+ Thread that downloads and extracts an update package and emits a signal on completion or error.
+
+ Args:
+ download_url (str): URL of the update zip file to download.
+ latest_version (str): Version string of the latest update.
+ """
+
+ update_ready = Signal(str, str)
+ error_occurred = Signal(str)
+
+ def __init__(self, download_url, latest_version, platform_name, app_name):
+ super().__init__()
+ self.download_url = download_url
+ self.latest_version = latest_version
+ self.platform_name = platform_name
+ self.app_name = app_name
+
+ def run(self):
+ try:
+ local_filename = os.path.basename(self.download_url)
+
+ if self.platform_name == 'darwin':
+ tmp_dir = f'/tmp/{self.app_name}tempupdate'
+ os.makedirs(tmp_dir, exist_ok=True)
+ local_path = os.path.join(tmp_dir, local_filename)
+ else:
+ local_path = os.path.join(os.getcwd(), local_filename)
+
+ # Download the file
+ with requests.get(self.download_url, stream=True, timeout=15) as r:
+ r.raise_for_status()
+ with open(local_path, 'wb') as f:
+ for chunk in r.iter_content(chunk_size=8192):
+ if chunk:
+ f.write(chunk)
+
+ # Extract folder name (remove .zip)
+ if self.platform_name == 'darwin':
+ extract_folder = os.path.splitext(local_filename)[0]
+ extract_path = os.path.join(tmp_dir, extract_folder)
+
+ else:
+ extract_folder = os.path.splitext(local_filename)[0]
+ extract_path = os.path.join(os.getcwd(), extract_folder)
+
+ # Create the folder if not exists
+ os.makedirs(extract_path, exist_ok=True)
+
+ # Extract the zip file contents
+ if self.platform_name == 'darwin':
+ subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
+ else:
+ with zipfile.ZipFile(local_path, 'r') as zip_ref:
+ zip_ref.extractall(extract_path)
+
+ # Remove the zip once extracted and emit a signal
+ os.remove(local_path)
+ self.update_ready.emit(self.latest_version, extract_path)
+
+ except Exception as e:
+ # Emit a signal signifying failure
+ self.error_occurred.emit(str(e))
+
+
+
+class UpdateCheckThread(QThread):
+ """
+ Thread that checks for updates by querying the API and emits a signal based on the result.
+
+ Signals:
+ download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
+ no_update_available(): Emitted when no update is found or current version is up to date.
+ error_occurred(str): Emitted with an error message if the update check fails.
+ """
+
+ download_requested = Signal(str, str)
+ no_update_available = Signal()
+ error_occurred = Signal(str)
+
+ def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name):
+ super().__init__()
+ self.api_url = api_url
+ self.api_url_sec = api_url_sec
+ self.current_version = current_version
+ self.platform_name = platform_name
+ self.app_name = app_name
+
+ def run(self):
+ # if not getattr(sys, 'frozen', False):
+ # self.error_occurred.emit("Application is not frozen (Development mode).")
+ # return
+ try:
+ latest_version, download_url = self.get_latest_release_for_platform()
+ if not latest_version:
+ self.no_update_available.emit()
+ return
+
+ if not download_url:
+ self.error_occurred.emit(f"No download available for platform '{self.platform_name}'")
+ return
+
+ if self.version_compare(latest_version, self.current_version) > 0:
+ self.download_requested.emit(download_url, latest_version)
+ else:
+ self.no_update_available.emit()
+
+ except Exception as e:
+ self.error_occurred.emit(f"Update check failed: {e}")
+
+ def version_compare(self, v1, v2):
+ def normalize(v): return [int(x) for x in v.split(".")]
+ return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
+
+ def get_latest_release_for_platform(self):
+ urls = [self.api_url, self.api_url_sec]
+ for url in urls:
+ try:
+
+ response = requests.get(url, timeout=5)
+ response.raise_for_status()
+ releases = response.json()
+
+ if not releases:
+ continue
+
+ latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
+
+ if not latest:
+ continue
+
+ tag = latest["tag_name"].lstrip("v")
+
+ for asset in latest.get("assets", []):
+ if self.platform_name in asset["name"].lower():
+ return tag, asset["browser_download_url"]
+
+ return tag, None
+ except (requests.RequestException, ValueError) as e:
+ continue
+ return None, None
+
+
+class LocalPendingUpdateCheckThread(QThread):
+ """
+ Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
+
+ Args:
+ current_version (str): Current application version.
+ platform_suffix (str): Platform-specific suffix to identify update folders.
+ """
+
+ pending_update_found = Signal(str, str)
+ no_pending_update = Signal()
+
+ def __init__(self, current_version, platform_suffix, platform_name, app_name):
+ super().__init__()
+ self.current_version = current_version
+ self.platform_suffix = platform_suffix
+ self.platform_name = platform_name
+ self.app_name = app_name
+
+ def version_compare(self, v1, v2):
+ def normalize(v): return [int(x) for x in v.split(".")]
+ return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
+
+ def run(self):
+ if self.platform_name == 'darwin':
+ cwd = f'/tmp/{self.app_name}tempupdate'
+ else:
+ cwd = os.getcwd()
+
+ pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
+ found = False
+
+ try:
+ for item in os.listdir(cwd):
+ folder_path = os.path.join(cwd, item)
+ if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
+ match = pattern.match(item)
+ if match:
+ folder_version = match.group(1)
+ if self.version_compare(folder_version, self.current_version) > 0:
+ self.pending_update_found.emit(folder_version, folder_path)
+ found = True
+ break
+ except:
+ pass
+
+ if not found:
+ self.no_pending_update.emit()
+
+
+
+
+
+class UpdateManager(QObject):
+ """
+ Orchestrates the update process.
+ Main apps should instantiate this and call check_for_updates().
+ """
+
+ def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name):
+ super().__init__()
+ self.parent = main_window
+ self.api_url = api_url
+ self.api_url_sec = api_url_sec
+ self.current_version = current_version
+ self.platform_name = platform_name
+ self.platform_suffix = platform_suffix
+ self.app_name = app_name
+
+ self.pending_update_version = None
+ self.pending_update_path = None
+
+
+ def manual_check_for_updates(self):
+ self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name)
+ self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
+ self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
+ self.local_check_thread.start()
+
+ def on_pending_update_found(self, version, folder_path):
+ self.parent.statusBar().showMessage(f"Pending update found: version {version}")
+ self.pending_update_version = version
+ self.pending_update_path = folder_path
+ self.show_pending_update_popup()
+
+ def on_no_pending_update(self):
+ # No pending update found locally, start server check directly
+ self.parent.statusBar().showMessage("No pending local update found. Checking server...")
+ self.start_update_check_thread()
+
+ def show_pending_update_popup(self):
+ msg_box = QMessageBox(self.parent)
+ msg_box.setWindowTitle("Pending Update Found")
+ msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
+ install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
+ install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
+ msg_box.exec()
+
+ if msg_box.clickedButton() == install_now_button:
+ self.install_update(self.pending_update_path)
+ else:
+ self.parent.statusBar().showMessage("Pending update available. Install later.")
+ # After user dismisses, still check the server for new updates
+ self.start_update_check_thread()
+
+ def start_update_check_thread(self):
+ self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name)
+ self.check_thread.download_requested.connect(self.on_server_update_requested)
+ self.check_thread.no_update_available.connect(self.on_server_no_update)
+ self.check_thread.error_occurred.connect(self.on_error)
+ self.check_thread.start()
+
+ def on_server_no_update(self):
+ self.parent.statusBar().showMessage("No new updates found on server.", 5000)
+
+ def on_server_update_requested(self, download_url, latest_version):
+ if self.pending_update_version:
+ cmp = self.version_compare(latest_version, self.pending_update_version)
+ if cmp > 0:
+ # Server version is newer than pending update
+ self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
+ try:
+ shutil.rmtree(self.pending_update_path)
+ self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
+ except Exception as e:
+ self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}")
+
+ # Clear pending update info so new download proceeds
+ self.pending_update_version = None
+ self.pending_update_path = None
+
+ # Download the new update
+ self.download_update(download_url, latest_version)
+ elif cmp == 0:
+ # Versions equal, no download needed
+ self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
+ else:
+ # Server version older than pending? Unlikely but just keep pending update
+ self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
+ else:
+ # No pending update, just download
+ self.download_update(download_url, latest_version)
+
+ def download_update(self, download_url, latest_version):
+ self.parent.statusBar().showMessage("Downloading update...")
+ self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name)
+ self.download_thread.update_ready.connect(self.on_update_ready)
+ self.download_thread.error_occurred.connect(self.on_error)
+ self.download_thread.start()
+
+ def on_update_ready(self, latest_version, extract_folder):
+ self.parent.statusBar().showMessage("Update downloaded and extracted.")
+
+ msg_box = QMessageBox(self.parent)
+ msg_box.setWindowTitle("Update Ready")
+ msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
+ install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
+ install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
+
+ msg_box.exec()
+
+ if msg_box.clickedButton() == install_now_button:
+ self.install_update(extract_folder)
+ else:
+ self.parent.statusBar().showMessage("Update ready. Install later.")
+
+
+ def install_update(self, extract_folder):
+ # Path to updater executable
+
+ if self.platform_name == 'windows':
+ updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe")
+ elif self.platform_name == 'darwin':
+ if getattr(sys, 'frozen', False):
+ updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app")
+ else:
+ updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app")
+
+ elif self.platform_name == 'linux':
+ updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater")
+ else:
+ updater_path = os.getcwd()
+
+ if not os.path.exists(updater_path):
+ QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
+ return
+
+ # Launch updater with extracted folder path as argument
+ try:
+ # Pass current app's executable path for updater to relaunch
+ main_app_executable = os.path.abspath(sys.argv[0])
+
+ print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
+
+ if self.platform_name == 'darwin':
+ subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
+ else:
+ subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
+
+ # Close the current app so updater can replace files
+ sys.exit(0)
+
+ except Exception as e:
+ QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
+
+ def on_error(self, message):
+ # print(f"Error: {message}")
+ self.parent.statusBar().showMessage(f"Error occurred during update process. {message}")
+
+ def version_compare(self, v1, v2):
+ def normalize(v): return [int(x) for x in v.split(".")]
+ return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
+
+
+def wait_for_process_to_exit(process_name, timeout=10):
+ """
+ Waits for a process with the specified name to exit within a timeout period.
+
+ Args:
+ process_name (str): Name (or part of the name) of the process to wait for.
+ timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
+
+ Returns:
+ bool: True if the process exited before the timeout, False otherwise.
+ """
+
+ print(f"Waiting for {process_name} to exit...")
+ deadline = time.time() + timeout
+ while time.time() < deadline:
+ still_running = False
+ for proc in psutil.process_iter(['name']):
+ try:
+ if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
+ still_running = True
+ print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
+ break
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ continue
+ if not still_running:
+ print(f"{process_name} has exited.")
+ return True
+ time.sleep(0.5)
+ print(f"{process_name} did not exit in time.")
+ return False
+
+
+def finish_update_if_needed(platform_name, app_name):
+ """
+ Completes a pending application update if '--finish-update' is present in the command-line arguments.
+ """
+
+ if "--finish-update" in sys.argv:
+ print("Finishing update...")
+
+ if platform_name == 'darwin':
+ app_dir = f'/tmp/{app_name}tempupdate'
+ else:
+ app_dir = os.getcwd()
+
+ # 1. Find update folder
+ update_folder = None
+ for entry in os.listdir(app_dir):
+ entry_path = os.path.join(app_dir, entry)
+ if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name):
+ update_folder = os.path.join(app_dir, entry)
+ break
+
+ if update_folder is None:
+ print("No update folder found. Skipping update steps.")
+ return
+
+ if platform_name == 'darwin':
+ update_folder = os.path.join(update_folder, f"{app_name}-darwin")
+
+ # 2. Wait for updater to exit
+ print(f"Waiting for {app_name}_updater to exit...")
+ for proc in psutil.process_iter(['pid', 'name']):
+ if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower():
+ try:
+ proc.wait(timeout=5)
+ except psutil.TimeoutExpired:
+ print(f"Force killing lingering {app_name}_updater")
+ proc.kill()
+
+ # 3. Replace the updater
+ if platform_name == 'windows':
+ new_updater = os.path.join(update_folder, f"{app_name}_updater.exe")
+ dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe")
+
+ elif platform_name == 'darwin':
+ new_updater = os.path.join(update_folder, f"{app_name}_updater.app")
+ dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app"))
+
+ elif platform_name == 'linux':
+ new_updater = os.path.join(update_folder, f"{app_name}_updater")
+ dest_updater = os.path.join(app_dir, f"{app_name}_updater")
+
+ else:
+ print("Unknown Platform")
+ new_updater = os.getcwd()
+ dest_updater = os.getcwd()
+
+ print(f"New updater is {new_updater}")
+ print(f"Dest updater is {dest_updater}")
+
+ print("Writable?", os.access(dest_updater, os.W_OK))
+ print("Executable path:", sys.executable)
+ print("Trying to copy:", new_updater, "->", dest_updater)
+
+ if os.path.exists(new_updater):
+ try:
+ if os.path.exists(dest_updater):
+ if platform_name == 'darwin':
+ try:
+ if os.path.isdir(dest_updater):
+ shutil.rmtree(dest_updater)
+ print(f"Deleted directory: {dest_updater}")
+ else:
+ os.remove(dest_updater)
+ print(f"Deleted file: {dest_updater}")
+ except Exception as e:
+ print(f"Error deleting {dest_updater}: {e}")
+ else:
+ os.remove(dest_updater)
+
+ if platform_name == 'darwin':
+ wait_for_process_to_exit(f"{app_name}_updater", timeout=10)
+ subprocess.check_call(["ditto", new_updater, dest_updater])
+ else:
+ shutil.copy2(new_updater, dest_updater)
+
+ if platform_name in ('linux', 'darwin'):
+ os.chmod(dest_updater, 0o755)
+
+ if platform_name == 'darwin':
+ remove_quarantine(dest_updater, app_name)
+
+ print(f"{app_name}_updater replaced.")
+ except Exception as e:
+ print(f"Failed to replace {app_name}_updater: {e}")
+
+ # 4. Delete the update folder
+ try:
+ if platform_name == 'darwin':
+ shutil.rmtree(app_dir)
+ else:
+ shutil.rmtree(update_folder)
+ except Exception as e:
+ print(f"Failed to delete update folder: {e}")
+
+ QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
+ sys.argv.remove("--finish-update")
+
+
+def remove_quarantine(app_path, app_name):
+ """
+ Removes the macOS quarantine attribute from the specified application path.
+ """
+
+ script = f'''
+ do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)"
+ '''
+ try:
+ subprocess.run(['osascript', '-e', script], check=True)
+ print("✅ Quarantine attribute removed.")
+ except subprocess.CalledProcessError as e:
+ print("❌ Failed to remove quarantine attribute.")
+ print(e)
\ No newline at end of file