""" Filename: main.py Description: FLARES main executable Author: Tyler de Zeeuw License: GPL-3.0 """ # Built-in imports import os import re import sys import time import shlex import pickle import shutil import zipfile import platform import traceback import subprocess from pathlib import Path from datetime import datetime from multiprocessing import Process, current_process, freeze_support, Manager # External library imports import psutil import requests from PySide6.QtWidgets import ( QApplication, QWidget, QMessageBox, QVBoxLayout, QHBoxLayout, QTextEdit, QScrollArea, QComboBox, QGridLayout, QPushButton, QMainWindow, QFileDialog, QLabel, QLineEdit, QFrame, QSizePolicy ) from PySide6.QtCore import QThread, Signal, Qt, QTimer from PySide6.QtGui import QAction, QKeySequence, QIcon, QIntValidator, QDoubleValidator CURRENT_VERSION = "1.0.0" API_URL = "https://git.research.dezeeuw.ca/api/v1/repos/tyler/flares/releases" PLATFORM_NAME = platform.system().lower() # Selectable parameters on the right side of the window SECTIONS = [ { "title": "Preprocessing", "params": [ {"name": "SECONDS_TO_STRIP", "default": 0, "type": int, "help": "Seconds to remove from beginning of file. Setting this to 0 will remove nothing from the file."}, {"name": "DOWNSAMPLE", "default": True, "type": bool, "help": "Downsample snirf files."}, {"name": "DOWNSAMPLE_FREQUENCY", "default": 25, "type": int, "help": "Frequency (Hz) to downsample to. If this is set higher than the input data, new data will be interpolated."}, {"name": "FORCE_DROP_CHANNELS", "default": "", "type": str, "help": "Channels to forcibly drop (comma separated)."}, {"name": "SOURCE_DETECTOR_SEPARATOR", "default": "_", "type": str, "help": "Separator between source and detector."}, ] }, { "title": "Update Optode Positions", "params": [ {"name": "OPTODE_FILE", "default": True, "type": bool, "help": "Use optode file to update positions."}, {"name": "OPTODE_FILE_PATH", "default": "", "type": str, "help": "Optode file location."}, {"name": "OPTODE_FILE_SEPARATOR", "default": ":", "type": str, "help": "Separator in optode file."}, ] }, { "title": "Temporal Derivative Distribution Repair filtering", "params": [ {"name": "TDDR", "default": True, "type": bool, "help": "Apply TDDR filtering."}, ] }, { "title": "Wavelet filtering", "params": [ {"name": "WAVELET", "default": True, "type": bool, "help": "Apply Wavelet filtering."}, {"name": "IQR", "default": 1.5, "type": float, "help": "Interquartile Range for Wavelet filter."}, ] }, { "title": "Heart rate", "params": [ {"name": "HEART_RATE", "default": True, "type": bool, "help": "Calculate heart rate."}, {"name": "SECONDS_TO_STRIP_HR", "default": 5, "type": int, "help": "Seconds to strip for HR calculation."}, {"name": "MAX_LOW_HR", "default": 40, "type": int, "help": "Minimum heart rate value."}, {"name": "MAX_HIGH_HR", "default": 200, "type": int, "help": "Maximum heart rate value."}, {"name": "SMOOTHING_WINDOW_HR", "default": 100, "type": int, "help": "Rolling average window for HR."}, {"name": "HEART_RATE_WINDOW", "default": 25, "type": int, "help": "Range of BPM around average."}, {"name": "SHORT_CHANNEL", "default": True, "type": bool, "help": "Indicates if data has short channel."}, {"name": "SHORT_CHANNEL_THRESH", "default": 0.013, "type": float, "help": "Threshold for short channel (m)."}, ] }, { "title": "Scalp Coupling Index / Peak Spectral Power / Signal to Noise Ratio", "params": [ {"name": "SCI", "default": True, "type": bool, "help": "Calculate Scalp Coupling Index."}, {"name": "SCI_TIME_WINDOW", "default": 3, "type": int, "help": "SCI time window."}, {"name": "SCI_THRESHOLD", "default": 0.6, "type": float, "help": "SCI threshold (0-1)."}, {"name": "PSP", "default": True, "type": bool, "help": "Calculate Peak Spectral Power."}, {"name": "PSP_TIME_WINDOW", "default": 3, "type": int, "help": "PSP time window."}, {"name": "PSP_THRESHOLD", "default": 0.1, "type": float, "help": "PSP threshold."}, {"name": "SNR", "default": True, "type": bool, "help": "Calculate Signal to Noise Ratio."}, {"name": "SNR_TIME_WINDOW", "default": -1, "type": int, "help": "SNR time window."}, {"name": "SNR_THRESHOLD", "default": 2.0, "type": float, "help": "SNR threshold (dB)."}, ] }, { "title": "Drop bad channels", "params": [ {"name": "EXCLUDE_CHANNELS", "default": True, "type": bool, "help": "Drop channels failing metrics."}, {"name": "MAX_BAD_CHANNELS", "default": 15, "type": int, "help": "Max bad channels allowed."}, {"name": "LONG_CHANNEL_THRESH", "default": 0.045, "type": float, "help": "Max distance (m) for channel."}, ] }, { "title": "Optical Density", "params": [ # Intentionally empty (TODO) ] }, { "title": "Haemoglobin Concentration", "params": [ {"name": "PPF", "default": 0.1, "type": float, "help": "Partial Pathlength Factor."}, ] }, { "title": "Design Matrix", "params": [ {"name": "DRIFT_MODEL", "default": "cosine", "type": str, "help": "Drift model for GLM."}, {"name": "DURATION_BETWEEN_ACTIVITIES", "default": 35, "type": int, "help": "Time between activities (s)."}, {"name": "SHORT_CHANNEL_REGRESSION", "default": True, "type": bool, "help": "Use short channel regression."}, ] }, { "title": "General Linear Model", "params": [ {"name": "N_JOBS", "default": 1, "type": int, "help": "Number of jobs for processing."}, ] }, { "title": "Finishing Touches", "params": [ # Intentionally empty (TODO) ] }, ] class UpdateDownloadThread(QThread): """ Thread that downloads and extracts an update package and emits a signal on completion or error. Args: download_url (str): URL of the update zip file to download. latest_version (str): Version string of the latest update. """ update_ready = Signal(str, str) error_occurred = Signal(str) def __init__(self, download_url, latest_version): super().__init__() self.download_url = download_url self.latest_version = latest_version def run(self): try: local_filename = os.path.basename(self.download_url) if PLATFORM_NAME == 'darwin': tmp_dir = '/tmp/flarestempupdate' os.makedirs(tmp_dir, exist_ok=True) local_path = os.path.join(tmp_dir, local_filename) else: local_path = os.path.join(os.getcwd(), local_filename) # Download the file with requests.get(self.download_url, stream=True, timeout=15) as r: r.raise_for_status() with open(local_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): if chunk: f.write(chunk) # Extract folder name (remove .zip) if PLATFORM_NAME == 'darwin': extract_folder = os.path.splitext(local_filename)[0] extract_path = os.path.join(tmp_dir, extract_folder) else: extract_folder = os.path.splitext(local_filename)[0] extract_path = os.path.join(os.getcwd(), extract_folder) # Create the folder if not exists os.makedirs(extract_path, exist_ok=True) # Extract the zip file contents if PLATFORM_NAME == 'darwin': subprocess.run(['ditto', '-xk', local_path, extract_path], check=True) else: with zipfile.ZipFile(local_path, 'r') as zip_ref: zip_ref.extractall(extract_path) # Remove the zip once extracted and emit a signal os.remove(local_path) self.update_ready.emit(self.latest_version, extract_path) except Exception as e: # Emit a signal signifying failure self.error_occurred.emit(str(e)) class UpdateCheckThread(QThread): """ Thread that checks for updates by querying the API and emits a signal based on the result. Signals: download_requested(str, str): Emitted with (download_url, latest_version) when an update is available. no_update_available(): Emitted when no update is found or current version is up to date. error_occurred(str): Emitted with an error message if the update check fails. """ download_requested = Signal(str, str) no_update_available = Signal() error_occurred = Signal(str) def run(self): try: latest_version, download_url = self.get_latest_release_for_platform() if not latest_version: self.no_update_available.emit() return if not download_url: self.error_occurred.emit(f"No download available for platform '{PLATFORM_NAME}'") return if self.version_compare(latest_version, CURRENT_VERSION) > 0: self.download_requested.emit(download_url, latest_version) else: self.no_update_available.emit() except Exception as e: self.error_occurred.emit(f"Update check failed: {e}") def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def get_latest_release_for_platform(self): response = requests.get(API_URL, timeout=5) response.raise_for_status() releases = response.json() if not releases: return None, None latest = releases[0] tag = latest["tag_name"].lstrip("v") for asset in latest.get("assets", []): if PLATFORM_NAME in asset["name"].lower(): return tag, asset["browser_download_url"] return tag, None class LocalPendingUpdateCheckThread(QThread): """ Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly. Args: current_version (str): Current application version. platform_suffix (str): Platform-specific suffix to identify update folders. """ pending_update_found = Signal(str, str) no_pending_update = Signal() def __init__(self, current_version, platform_suffix): super().__init__() self.current_version = current_version self.platform_suffix = platform_suffix def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def run(self): if PLATFORM_NAME == 'darwin': cwd = '/tmp/flarestempupdate' else: cwd = os.getcwd() pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$") found = False try: for item in os.listdir(cwd): folder_path = os.path.join(cwd, item) if os.path.isdir(folder_path) and item.endswith(self.platform_suffix): match = pattern.match(item) if match: folder_version = match.group(1) if self.version_compare(folder_version, self.current_version) > 0: self.pending_update_found.emit(folder_version, folder_path) found = True break except: pass if not found: self.no_pending_update.emit() class AboutWindow(QWidget): """ Simple About window displaying basic application information. Args: parent (QWidget, optional): Parent widget of this window. Defaults to None. """ def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("About FLARES") self.resize(250, 100) layout = QVBoxLayout() label = QLabel("About FLARES", self) label2 = QLabel("fNIRS Lightweight Analysis, Research, & Evaluation Suite", self) label3 = QLabel("FLARES is licensed under the GPL-3.0 licence. For more information, visit https://www.gnu.org/licenses/gpl-3.0.en.html", self) label4 = QLabel(f"Version v{CURRENT_VERSION}") layout.addWidget(label) layout.addWidget(label2) layout.addWidget(label3) layout.addWidget(label4) self.setLayout(layout) class UserGuideWindow(QWidget): """ Simple User Guide window displaying basic information on how to use the software. Args: parent (QWidget, optional): Parent widget of this window. Defaults to None. """ def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("User Guide for FLARES") self.resize(250, 100) layout = QVBoxLayout() label = QLabel("No user guide available yet!", self) layout.addWidget(label) self.setLayout(layout) class ProgressBubble(QWidget): """ A clickable widget displaying a progress bar made of colored rectangles and a label. Args: display_name (str): Text to display above the progress bar. file_path (str): Associated file path stored with the bubble. """ clicked = Signal(object) def __init__(self, display_name, file_path): super().__init__() self.layout = QVBoxLayout() self.label = QLabel(display_name) self.label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.label.setStyleSheet(""" QLabel { border: 1px solid #888; border-radius: 10px; padding: 8px 12px; background-color: #e0f0ff; } """) self.progress_layout = QHBoxLayout() self.rects = [] for _ in range(12): rect = QFrame() rect.setFixedSize(10, 20) rect.setStyleSheet("background-color: white; border: 1px solid gray;") self.progress_layout.addWidget(rect) self.rects.append(rect) self.layout.addWidget(self.label) self.layout.addLayout(self.progress_layout) self.setLayout(self.layout) # Store the file path self.file_path = file_path self.current_step = 0 # Make the bubble clickable self.setCursor(Qt.CursorShape.PointingHandCursor) # Resize policy to make bubbles responsive # TODO: Not only do this once but when window is resized too self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum) def update_progress(self, step_index): self.current_step = step_index for i, rect in enumerate(self.rects): if i < step_index: rect.setStyleSheet("background-color: green; border: 1px solid gray;") elif i == step_index: rect.setStyleSheet("background-color: yellow; border: 1px solid gray;") else: rect.setStyleSheet("background-color: white; border: 1px solid gray;") def mousePressEvent(self, event): self.clicked.emit(self) super().mousePressEvent(event) class ParamSection(QWidget): """ A widget section that dynamically creates labeled input fields from parameter metadata. Args: section_data (dict): Dictionary containing section title and list of parameter info. Expected format: { "title": str, "params": [ { "name": str, "type": type, "default": any, "help": str (optional) }, ... ] } """ def __init__(self, section_data): super().__init__() layout = QVBoxLayout() self.setLayout(layout) self.widgets = {} self.selected_path = None # Title label title_label = QLabel(section_data["title"]) title_label.setStyleSheet("font-weight: bold; font-size: 14px; margin-top: 10px; margin-bottom: 5px;") layout.addWidget(title_label) # Horizontal line line = QFrame() line.setFrameShape(QFrame.Shape.HLine) line.setFrameShadow(QFrame.Shadow.Sunken) layout.addWidget(line) for param in section_data["params"]: h_layout = QHBoxLayout() label = QLabel(param["name"]) label.setFixedWidth(180) label.setToolTip(param.get("help", "")) help_text = param.get("help", "") help_btn = QPushButton("?") help_btn.setFixedWidth(25) help_btn.setToolTip(help_text) help_btn.clicked.connect(lambda _, text=help_text: self.show_help_popup(text)) h_layout.addWidget(help_btn) h_layout.setStretch(0, 1) # Set stretch factor for button (10%) h_layout.addWidget(label) h_layout.setStretch(1, 3) # Set the stretch factor for label (40%) # Create input widget based on type if param["type"] == bool: widget = QComboBox() widget.addItems(["True", "False"]) widget.setCurrentText(str(param["default"])) elif param["type"] == int: widget = QLineEdit() widget.setValidator(QIntValidator()) widget.setText(str(param["default"])) elif param["type"] == float: widget = QLineEdit() widget.setValidator(QDoubleValidator()) widget.setText(str(param["default"])) else: # str or list, treat as text for now widget = QLineEdit() widget.setText(str(param["default"])) widget.setToolTip(help_text) h_layout.addWidget(widget) h_layout.setStretch(2, 5) # Set stretch factor for input field (50%) layout.addLayout(h_layout) self.widgets[param["name"]] = widget def show_help_popup(self, text): msg = QMessageBox(self) msg.setWindowTitle("Parameter Info") msg.setText(text) msg.exec() class ViewerWindow(QWidget): """ Window displaying various fNIRS data visualization and analysis options via buttons. Args: all_results (dict): Analysis results data. all_haemo (dict): Haemodynamic data per subject. all_figures (dict): Figures generated from the data. config_snapshot (dict): Configuration snapshot used for analysis. parent (QWidget, optional): Parent widget. Defaults to None. """ def __init__(self, all_results, all_haemo, all_figures, config_snapshot, parent=None): try: super().__init__(parent, Qt.WindowType.Window) self.all_results = all_results self.all_haemo = all_haemo self.all_figures = all_figures self.config_snapshot = config_snapshot if not self.all_haemo: QMessageBox.critical(self, "Data Error", "No haemodynamic data available!") return subjects_dir = resource_path("mne_data/MNE-sample-data/subjects") os.environ["SUBJECTS_DIR"] = subjects_dir # TODO: Thread all of this to not freeze main window import fNIRS_module fNIRS_module.set_config(self.config_snapshot, True) # Set globals in this process layout = QVBoxLayout() button_actions = [ ("Show all Images", lambda: fNIRS_module.show_all_images(self.all_figures)), ("save_all_images", lambda: fNIRS_module.save_all_images(self.all_figures)), ("data_to_csv", lambda: fNIRS_module.data_to_csv(self.all_results)), ("plot_2d_theta_graph", lambda: fNIRS_module.plot_2d_theta_graph(self.all_results)), ("verify_channel_positions", lambda: fNIRS_module.verify_channel_positions(self.all_haemo[list(self.all_haemo.keys())[0]]["full_layout"])), ("brain_landmarks_3d", lambda: fNIRS_module.brain_landmarks_3d(self.all_haemo[list(self.all_haemo.keys())[0]]["full_layout"], 'all')), ("plot_2d_3d_contrasts_between_groups", lambda: fNIRS_module.plot_2d_3d_contrasts_between_groups(self.all_results, self.all_haemo, 'theta', 'all', True)), ("brain_3d_visualization", lambda: fNIRS_module.brain_3d_visualization(self.all_results, self.all_haemo, 0, 't', 'all', True)), ("plot_fir_model_results", lambda: fNIRS_module.plot_fir_model_results(self.all_results, self.all_haemo, 0, 'theta')), ("plot_individual_theta_averages", lambda: fNIRS_module.plot_individual_theta_averages(self.all_results)), ("plot_group_theta_averages", lambda: fNIRS_module.plot_group_theta_averages(self.all_results)), ("plot_avg_significant_activity", lambda: fNIRS_module.plot_avg_significant_activity(self.all_haemo[list(self.all_haemo.keys())[0]]["full_layout"], self.all_results, 'theta')), ("fold_channels", lambda: fNIRS_module.fold_channels(self.all_haemo[list(self.all_haemo.keys())[0]]["full_layout"].copy(), self.all_results, resource_path("mne_data/fOLD/fOLD-public-master/Supplementary"))), ] for text, func in button_actions: btn = QPushButton(text) btn.clicked.connect(self.make_safe_callback(func)) layout.addWidget(btn) self.setLayout(layout) except Exception as e: QMessageBox.critical(None, "Startup Error", f"ViewerWindow failed:\n{str(e)}") def make_safe_callback(self, func): def safe_func(): try: func() except Exception as e: QMessageBox.critical(self, "Error", f"An error occurred:\n{str(e)}") return safe_func class MainApplication(QMainWindow): """ Main application window that creates and sets up the UI. """ progress_update_signal = Signal(str, int) def __init__(self): super().__init__() self.setWindowTitle("FLARES") self.setGeometry(100, 100, 1280, 720) self.about = None self.help = None self.bubble_widgets = {} self.param_sections = [] self.folder_paths = [] self.init_ui() self.create_menu_bar() self.platform_suffix = "-" + PLATFORM_NAME self.pending_update_version = None self.pending_update_path = None # Start local pending update check thread self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix) self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) self.local_check_thread.start() def init_ui(self): # Central widget and main horizontal layout central = QWidget() self.setCentralWidget(central) main_layout = QHBoxLayout() central.setLayout(main_layout) # Left container with vertical layout: top left + bottom left left_container = QWidget() left_layout = QVBoxLayout() left_container.setLayout(left_layout) left_container.setMinimumWidth(300) # Top left widget (for demo, use QTextEdit) self.top_left_widget = QTextEdit() self.top_left_widget.setReadOnly(True) self.top_left_widget.setPlaceholderText("Click a file below to get started! No files below? Open one using File > Open!") self.top_left_widget.setFixedHeight(250) # Bottom left: the bubbles inside a scroll area self.bubble_container = QWidget() self.bubble_layout = QGridLayout() self.bubble_layout.setAlignment(Qt.AlignmentFlag.AlignTop) self.bubble_container.setLayout(self.bubble_layout) self.scroll_area = QScrollArea() self.scroll_area.setWidgetResizable(True) self.scroll_area.setWidget(self.bubble_container) self.scroll_area.setMinimumHeight(300) # Add top left and bottom left to left layout left_layout.addWidget(self.top_left_widget) left_layout.addWidget(self.scroll_area) self.progress_update_signal.connect(self.update_file_progress) # Right widget (full height on right side) — example QTextEdit self.right_container = QWidget() right_container_layout = QVBoxLayout() self.right_container.setLayout(right_container_layout) # Content widget inside scroll area self.right_content_widget = QWidget() right_content_layout = QVBoxLayout() self.right_content_widget.setLayout(right_content_layout) # Option selector dropdown self.option_selector = QComboBox() self.option_selector.addItems(["FIR"]) right_content_layout.addWidget(self.option_selector) # Container for the sections self.rows_container = QWidget() self.rows_layout = QVBoxLayout() self.rows_layout.setSpacing(10) self.rows_container.setLayout(self.rows_layout) right_content_layout.addWidget(self.rows_container) # Spacer at bottom inside scroll area content to push content up right_content_layout.addStretch() # Scroll area for the right side content self.right_scroll_area = QScrollArea() self.right_scroll_area.setWidgetResizable(True) self.right_scroll_area.setWidget(self.right_content_widget) # Buttons widget (fixed below the scroll area) buttons_widget = QWidget() buttons_layout = QHBoxLayout() buttons_widget.setLayout(buttons_layout) buttons_layout.addStretch() self.button1 = QPushButton("Process") self.button2 = QPushButton("Clear") self.button3 = QPushButton("Analysis") buttons_layout.addWidget(self.button1) buttons_layout.addWidget(self.button2) buttons_layout.addWidget(self.button3) self.button1.setMinimumSize(100, 40) self.button2.setMinimumSize(100, 40) self.button3.setMinimumSize(100, 40) self.button3.setVisible(False) self.button1.clicked.connect(self.on_run_task) self.button2.clicked.connect(self.clear_all) self.button3.clicked.connect(self.open_viewer_window) # Add scroll area and buttons widget to right container layout right_container_layout.addWidget(self.right_scroll_area) right_container_layout.addWidget(buttons_widget) # Add left and right containers to main layout main_layout.addWidget(left_container, stretch=55) main_layout.addWidget(self.right_container, stretch=45) # Set size policy to expand self.right_container.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) self.right_scroll_area.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) # Store ParamSection widgets self.option_selector.currentIndexChanged.connect(self.update_sections) # Initial build self.update_sections(0) def create_menu_bar(self): '''Menu Bar at the top of the screen''' menu_bar = self.menuBar() def make_action(name, shortcut=None, slot=None, checkable=False, checked=False, icon=None): action = QAction(name, self) if shortcut: action.setShortcut(QKeySequence(shortcut)) if slot: action.triggered.connect(slot) if checkable: action.setCheckable(True) action.setChecked(checked) if icon: action.setIcon(QIcon(icon)) return action # File menu and actions file_menu = menu_bar.addMenu("File") file_actions = [ ("Open File...", "Ctrl+O", self.open_file_dialog, resource_path("icons/file_open_24dp_1F1F1F.svg")), ("Open Folder...", "Ctrl+Alt+O", self.open_folder_dialog, resource_path("icons/folder_24dp_1F1F1F.svg")), ("Open Folders...", "Ctrl+Shift+O", self.open_multiple_folders_dialog, resource_path("icons/folder_copy_24dp_1F1F1F.svg")), ("Load Project...", "Ctrl+L", self.load_project, resource_path("icons/article_24dp_1F1F1F.svg")), ("Save Project...", "Ctrl+S", self.save_project, resource_path("icons/save_24dp_1F1F1F.svg")), ("Save Project As...", "Ctrl+Shift+S", self.save_project, resource_path("icons/save_as_24dp_1F1F1F.svg")), # Maybe connect a separate method if different ] for i, (name, shortcut, slot, icon) in enumerate(file_actions): file_menu.addAction(make_action(name, shortcut, slot, icon=icon)) if i == 2: # after the first 3 actions (0,1,2) file_menu.addSeparator() file_menu.addSeparator() file_menu.addAction(make_action("Exit", "Ctrl+Q", QApplication.instance().quit, icon=resource_path("icons/exit_to_app_24dp_1F1F1F.svg"))) # Edit menu edit_menu = menu_bar.addMenu("Edit") edit_actions = [ ("Cut", "Ctrl+X", self.cut_text, resource_path("icons/content_cut_24dp_1F1F1F.svg")), ("Copy", "Ctrl+C", self.copy_text, resource_path("icons/content_copy_24dp_1F1F1F.svg")), ("Paste", "Ctrl+V", self.paste_text, resource_path("icons/content_paste_24dp_1F1F1F.svg")) ] for name, shortcut, slot, icon in edit_actions: edit_menu.addAction(make_action(name, shortcut, slot, icon=icon)) # View menu view_menu = menu_bar.addMenu("View") toggle_statusbar_action = make_action("Toggle Status Bar", checkable=True, checked=True, slot=None) view_menu.addAction(toggle_statusbar_action) # Options menu (Help & About) options_menu = menu_bar.addMenu("Options") options_actions = [ ("User Guide", "F1", self.user_guide, resource_path("icons/help_24dp_1F1F1F.svg")), ("Check for Updates", "F5", self.manual_check_for_updates, resource_path("icons/update_24dp_1F1F1F.svg")), ("About", "F12", self.about_window, resource_path("icons/info_24dp_1F1F1F.svg")) ] for i, (name, shortcut, slot, icon) in enumerate(options_actions): options_menu.addAction(make_action(name, shortcut, slot, icon=icon)) if i == 1: # after the first 2 actions (0,1) options_menu.addSeparator() # Optional: status bar self.statusbar = self.statusBar() self.statusbar.showMessage("Ready") def update_sections(self, index): # Clear previous sections for i in reversed(range(self.rows_layout.count())): widget = self.rows_layout.itemAt(i).widget() if widget is not None: widget.deleteLater() self.param_sections.clear() # Add ParamSection widgets from SECTIONS for section in SECTIONS: section_widget = ParamSection(section) self.rows_layout.addWidget(section_widget) self.param_sections.append(section_widget) def clear_all(self): # Clear the bubble layout while self.bubble_layout.count(): item = self.bubble_layout.takeAt(0) widget = item.widget() if widget: widget.deleteLater() # Clear file data self.bubble_widgets.clear() self.statusBar().clearMessage() # Reset any other data variables self.collected_data_snapshot = None self.all_results = None self.all_haemo = None self.all_figures = None # Reset any visible UI elements self.button3.setVisible(False) self.top_left_widget.clear() def open_viewer_window(self): if not hasattr(self, "all_figures") or self.all_figures is None: QMessageBox.warning(self, "No Data", "No figures to show yet!") return self.viewer_window = ViewerWindow(self.all_results, self.all_haemo, self.all_figures, self.collected_data_snapshot, self) self.viewer_window.show() def copy_text(self): self.top_left_widget.copy() # Trigger copy self.statusbar.showMessage("Copied to clipboard") # Show status message def cut_text(self): self.top_left_widget.cut() # Trigger cut self.statusbar.showMessage("Cut to clipboard") # Show status message def paste_text(self): self.top_left_widget.paste() # Trigger paste self.statusbar.showMessage("Pasted from clipboard") # Show status message def about_window(self): if self.about is None or not self.about.isVisible(): self.about = AboutWindow(self) self.about.show() def user_guide(self): if self.help is None or not self.help.isVisible(): self.help = UserGuideWindow(self) self.help.show() def open_file_dialog(self): file_path, _ = QFileDialog.getOpenFileName( self, "Open File", "", "All Files (*);;Text Files (*.txt)" ) if file_path: self.selected_path = file_path # store the file path self.show_files_as_bubbles(file_path) def open_folder_dialog(self): folder_path = QFileDialog.getExistingDirectory( self, "Select Folder", "" ) if folder_path: self.selected_path = folder_path # store the folder path self.show_files_as_bubbles(folder_path) def open_multiple_folders_dialog(self): while True: folder = QFileDialog.getExistingDirectory(self, "Select Folder") if not folder: break if not hasattr(self, 'selected_paths'): self.selected_paths = [] if folder not in self.selected_paths: self.selected_paths.append(folder) self.show_files_as_bubbles(self.selected_paths) # Ask if the user wants to add another more = QMessageBox.question( self, "Add Another?", "Do you want to select another folder?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, ) if more == QMessageBox.StandardButton.No: break def save_project(self): filename, _ = QFileDialog.getSaveFileName( self, "Save Project", "", "FLARE Project (*.flare)" ) if not filename: return try: project_data = { "file_list": [bubble.file_path for bubble in self.bubble_widgets.values()], "progress_states": { bubble.file_path: bubble.current_step for bubble in self.bubble_widgets.values() }, "all_results": self.all_results, "all_haemo": self.all_haemo, "all_figures": self.all_figures, "config_snapshot": self.collected_data_snapshot, } with open(filename, "wb") as f: pickle.dump(project_data, f) QMessageBox.information(self, "Success", f"Project saved to:\n{filename}") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to save project:\n{e}") def load_project(self): filename, _ = QFileDialog.getOpenFileName( self, "Load Project", "", "FLARE Project (*.flare)" ) if not filename: return try: with open(filename, "rb") as f: data = pickle.load(f) self.collected_data_snapshot = data["config_snapshot"] self.all_results = data["all_results"] self.all_haemo = data["all_haemo"] self.all_figures = data["all_figures"] for section_widget in self.param_sections: for name, widget in section_widget.widgets.items(): if name not in self.collected_data_snapshot: continue value = self.collected_data_snapshot[name] if isinstance(widget, QComboBox): widget.setCurrentText("True" if value else "False") elif isinstance(widget, QLineEdit): validator = widget.validator() if isinstance(validator, QIntValidator): widget.setText(str(int(value))) elif isinstance(validator, QDoubleValidator): widget.setText(str(float(value))) else: widget.setText(str(value)) self.show_files_as_bubbles_from_list(data["file_list"], data.get("progress_states", {}), filename) # Re-enable the "Viewer" button self.button3.setVisible(True) QMessageBox.information(self, "Loaded", f"Project loaded from:\n{filename}") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to load project:\n{e}") def show_files_as_bubbles(self, folder_paths): if isinstance(folder_paths, str): folder_paths = [folder_paths] # Clear previous bubbles while self.bubble_layout.count(): item = self.bubble_layout.takeAt(0) widget = item.widget() if widget: widget.deleteLater() temp_bubble = ProgressBubble("Test Bubble", "") # A dummy bubble for measurement temp_bubble.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy. Preferred) # temp_bubble.setAttribute(Qt.WA_OpaquePaintEvent) # Improve rendering? temp_bubble.adjustSize() # Adjust size after the widget is created bubble_width = temp_bubble.width() # Get the actual width of a bubble available_width = self.bubble_container.width() cols = max(1, available_width // bubble_width) # Ensure at least 1 column index = 0 for folder_path in folder_paths: if not os.path.isdir(folder_path): continue files = os.listdir(folder_path) files = [f for f in files if os.path.isfile(os.path.join(folder_path, f))] for filename in files: full_path = os.path.join(folder_path, filename) display_name = f"{os.path.basename(folder_path)} / {filename}" bubble = ProgressBubble(display_name, full_path) bubble.clicked.connect(self.on_bubble_clicked) self.bubble_widgets[filename] = bubble row = index // cols col = index % cols self.bubble_layout.addWidget(bubble, row, col) index += 1 self.statusBar().showMessage(f"{index} file(s) loaded from: {', '.join(folder_paths)}") def show_files_as_bubbles_from_list(self, file_list, progress_states=None, filenames=None): progress_states = progress_states or {} # Clear old while self.bubble_layout.count(): item = self.bubble_layout.takeAt(0) widget = item.widget() if widget: widget.deleteLater() self.bubble_widgets = {} temp_bubble = ProgressBubble("Test Bubble", "") # A dummy bubble for measurement temp_bubble.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Preferred) # temp_bubble.setAttribute(Qt.WA_OpaquePaintEvent) # Improves rendering? temp_bubble.adjustSize() # Adjust size after the widget is created bubble_width = temp_bubble.width() # Get the actual width of a bubble available_width = self.bubble_container.width() cols = max(1, available_width // bubble_width) # Ensure at least 1 column index = 0 for index, file_path in enumerate(file_list): filename = os.path.basename(file_path) display_name = f"{os.path.basename(os.path.dirname(file_path))} / {filename}" # Create bubble with full path bubble = ProgressBubble(display_name, file_path) bubble.clicked.connect(self.on_bubble_clicked) self.bubble_widgets[file_path] = bubble step = progress_states.get(file_path, 0) bubble.update_progress(step) row = index // cols col = index % cols self.bubble_layout.addWidget(bubble, row, col) self.statusBar().showMessage(f"{len(file_list)} files loaded from from {os.path.abspath(filenames)}.") def on_bubble_clicked(self, bubble): file_path = bubble.file_path if not os.path.exists(file_path): self.top_left_widget.setText("File not found.") return size = os.path.getsize(file_path) created = time.ctime(os.path.getctime(file_path)) modified = time.ctime(os.path.getmtime(file_path)) # snirf_info = self.get_snirf_metadata_mne(file_path) info = f"""\ File: {os.path.basename(file_path)} Size: {size:,} bytes Created: {created} Modified: {modified} Full Path: {file_path} """ # if "Error" in snirf_info: # info += f"\nSNIRF Metadata could not be loaded: {snirf_info['Error']}" # else: # info += "\nSNIRF Metadata:\n" # for k, v in snirf_info.items(): # if isinstance(v, list): # info += f" {k}:\n" # for item in v: # info += f" - {item}\n" # else: # info += f" {k}: {v}\n" self.top_left_widget.setText(info) def placeholder(self): QMessageBox.information(self, "Placeholder", "This feature is not implemented yet.") '''MODULE FILE''' def on_run_task(self): collected_data = {} # Add all parameter key-value pairs for section_widget in self.param_sections: for name, widget in section_widget.widgets.items(): if isinstance(widget, QComboBox): val = widget.currentText() == "True" elif isinstance(widget, QLineEdit): text = widget.text() validator = widget.validator() if isinstance(validator, QIntValidator): val = int(text or 0) elif isinstance(validator, QDoubleValidator): val = float(text or 0.0) else: val = text else: val = None collected_data[name] = val # Flattened! if hasattr(self, "selected_paths") and self.selected_paths: # Handle multiple folders parents = [Path(p).parent for p in self.selected_paths] base_parents = set(str(p) for p in parents) if len(base_parents) > 1: raise ValueError("Selected folders must have the same parent directory") collected_data["BASE_SNIRF_FOLDER"] = base_parents.pop() collected_data["SNIRF_SUBFOLDERS"] = [Path(p).name for p in self.selected_paths] collected_data["STIM_DURATION"] = [0 for _ in self.selected_paths] elif hasattr(self, "selected_path") and self.selected_path: # Handle single folder selected_path = Path(self.selected_path) collected_data["BASE_SNIRF_FOLDER"] = str(selected_path.parent) collected_data["SNIRF_SUBFOLDERS"] = [selected_path.name] collected_data["STIM_DURATION"] = [0] else: # No folder selected - handle gracefully or raise error raise ValueError("No folder(s) selected") collected_data["HRF_MODEL"] = 'fir' collected_data["MAX_WORKERS"] = 12 collected_data["FORCE_DROP_CHANNELS"] = [] collected_data["TARGET_ACTIVITY"] = "Reach" collected_data["TARGET_CONTROL"] = "Start of Rest" collected_data["ROI_GROUP_1"] = [[1, 1], [1, 2], [2, 1], [2, 4], [3, 1], # Channel pairings for a region of interest. [2, 2], [4, 3], [4, 4], [5, 5], [6, 4]] collected_data["ROI_GROUP_2"] = [[6, 5], [6, 8], [7, 7], [7, 8], [8, 5], # Channel pairings for another region of interest. [8, 6], [9, 6], [9, 7], [10, 7], [10, 8]] collected_data["ROI_GROUP_1_NAME"] = "Parieto-Ocipital" # Friendly name for the first region of interest group. collected_data["ROI_GROUP_2_NAME"] = "Fronto-Parietal" collected_data["P_THRESHOLD"] = 0.05 collected_data["SEE_BAD_IMAGES"] = True collected_data["ABS_T_VALUE"] = 6 collected_data["ABS_THETA_VALUE"] = 10 collected_data["ABS_CONTRAST_T_VALUE"] = 6 collected_data["ABS_CONTRAST_THETA_VALUE"] = 10 collected_data["ABS_SIGNIFICANCE_T_VALUE"] = 6 collected_data["ABS_SIGNIFICANCE_THETA_VALUE"] = 10 collected_data["BRAIN_DISTANCE"] = 0.02 collected_data["BRAIN_MODE"] = "weighted" collected_data["EPOCH_REJECT_CRITERIA_THRESH"] = 20e-2 collected_data["TIME_MIN_THRESH"] = -5 collected_data["TIME_MAX_THRESH"] = 15 collected_data["VERBOSITY"] = True self.collected_data_snapshot = collected_data.copy() if current_process().name == 'MainProcess': self.manager = Manager() self.result_queue = self.manager.Queue() self.progress_queue = self.manager.Queue() self.result_process = Process( target=run_gui_entry_wrapper, args=(collected_data, self.result_queue, self.progress_queue) ) self.result_process.daemon = False self.result_process.start() print("start was called") self.statusbar.showMessage("Running processing in background...") # Poll the queue periodically self.result_timer = QTimer() self.result_timer.timeout.connect(self.check_for_pipeline_results) self.result_timer.start() self.statusbar.showMessage("Task started in separate process.") def check_for_pipeline_results(self): while not self.result_queue.empty(): msg = self.result_queue.get() if isinstance(msg, dict): if msg.get("success"): all_results, all_haemo, all_figures, all_processes, elapsed_time = msg["result"] self.all_results = all_results self.all_haemo = all_haemo self.all_figures = all_figures self.all_processes = all_processes self.elapsed_time = elapsed_time self.statusbar.showMessage(f"Processing complete! Time elapsed: {elapsed_time:.2f} seconds") self.button3.setVisible(True) else: error_msg = msg.get("error", "Unknown error") print("Error during processing:", error_msg) self.statusbar.showMessage(f"Processing failed! {error_msg}") self.result_timer.stop() self.cleanup_after_process() return elif isinstance(msg, tuple) and msg[0] == 'progress': _, file_name, step_index = msg self.progress_update_signal.emit(file_name, step_index) def cleanup_after_process(self): if hasattr(self, 'result_process'): self.result_process.join(timeout=0) if self.result_process.is_alive(): self.result_process.terminate() self.result_process.join() if hasattr(self, 'result_queue'): if 'AutoProxy' in repr(self.result_queue): pass else: self.result_queue.close() self.result_queue.join_thread() if hasattr(self, 'progress_queue'): if 'AutoProxy' in repr(self.progress_queue): pass else: self.progress_queue.close() self.progress_queue.join_thread() # Shutdown manager to kill its server process and clean up if hasattr(self, 'manager'): self.manager.shutdown() def update_file_progress(self, filename, step_index): bubble = self.bubble_widgets.get(filename) if bubble: bubble.update_progress(step_index) '''UPDATER''' def manual_check_for_updates(self): self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix) self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) self.local_check_thread.start() def on_pending_update_found(self, version, folder_path): self.statusBar().showMessage(f"Pending update found: version {version}") self.pending_update_version = version self.pending_update_path = folder_path self.show_pending_update_popup() def on_no_pending_update(self): # No pending update found locally, start server check directly self.statusBar().showMessage("No pending local update found. Checking server...") self.start_update_check_thread() def show_pending_update_popup(self): msg_box = QMessageBox(self) msg_box.setWindowTitle("Pending Update Found") msg_box.setText(f"A previously downloaded update (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?") install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) msg_box.exec() if msg_box.clickedButton() == install_now_button: self.install_update(self.pending_update_path) else: self.statusBar().showMessage("Pending update available. Install later.") # After user dismisses, still check the server for new updates self.start_update_check_thread() def start_update_check_thread(self): self.check_thread = UpdateCheckThread() self.check_thread.download_requested.connect(self.on_server_update_requested) self.check_thread.no_update_available.connect(self.on_server_no_update) self.check_thread.error_occurred.connect(self.on_error) self.check_thread.start() def on_server_no_update(self): self.statusBar().showMessage("No new updates found on server.", 5000) def on_server_update_requested(self, download_url, latest_version): if self.pending_update_version: cmp = self.version_compare(latest_version, self.pending_update_version) if cmp > 0: # Server version is newer than pending update self.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...") try: shutil.rmtree(self.pending_update_path) self.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}") except Exception as e: self.statusBar().showMessage(f"Failed to delete old update folder: {e}") # Clear pending update info so new download proceeds self.pending_update_version = None self.pending_update_path = None # Download the new update self.download_update(download_url, latest_version) elif cmp == 0: # Versions equal, no download needed self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.") else: # Server version older than pending? Unlikely but just keep pending update self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.") else: # No pending update, just download self.download_update(download_url, latest_version) def download_update(self, download_url, latest_version): self.statusBar().showMessage("Downloading update...") self.download_thread = UpdateDownloadThread(download_url, latest_version) self.download_thread.update_ready.connect(self.on_update_ready) self.download_thread.error_occurred.connect(self.on_error) self.download_thread.start() def on_update_ready(self, latest_version, extract_folder): self.statusBar().showMessage("Update downloaded and extracted.") msg_box = QMessageBox(self) msg_box.setWindowTitle("Update Ready") msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?") install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) msg_box.exec() if msg_box.clickedButton() == install_now_button: self.install_update(extract_folder) else: self.statusBar().showMessage("Update ready. Install later.") def install_update(self, extract_folder): # Path to updater executable if PLATFORM_NAME == 'windows': updater_path = os.path.join(os.getcwd(), "flares_updater.exe") elif PLATFORM_NAME == 'darwin': if getattr(sys, 'frozen', False): updater_path = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.app") else: updater_path = os.path.join(os.getcwd(), "../flares_updater.app") elif PLATFORM_NAME == 'linux': updater_path = os.path.join(os.getcwd(), "flares_updater") else: updater_path = os.getcwd() if not os.path.exists(updater_path): QMessageBox.critical(self, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}") return # Launch updater with extracted folder path as argument try: # Pass current app's executable path for updater to relaunch main_app_executable = os.path.abspath(sys.argv[0]) print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"') if PLATFORM_NAME == 'darwin': subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable]) else: subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path)) # Close the current app so updater can replace files sys.exit(0) except Exception as e: QMessageBox.critical(self, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}") def on_error(self, message): # print(f"Error: {message}") self.statusBar().showMessage(f"Error occurred during update process. {message}") def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def closeEvent(self, event): # Gracefully shut down multiprocessing children print("Window is closing. Cleaning up...") if hasattr(self, 'manager'): self.manager.shutdown() for child in self.findChildren(QWidget): if child is not self and child.isVisible(): child.close() kill_child_processes() event.accept() def wait_for_process_to_exit(process_name, timeout=10): """ Waits for a process with the specified name to exit within a timeout period. Args: process_name (str): Name (or part of the name) of the process to wait for. timeout (int, optional): Maximum time to wait in seconds. Defaults to 10. Returns: bool: True if the process exited before the timeout, False otherwise. """ print(f"Waiting for {process_name} to exit...") deadline = time.time() + timeout while time.time() < deadline: still_running = False for proc in psutil.process_iter(['name']): try: if proc.info['name'] and process_name.lower() in proc.info['name'].lower(): still_running = True print(f"Still running: {proc.info['name']} (PID: {proc.pid})") break except (psutil.NoSuchProcess, psutil.AccessDenied): continue if not still_running: print(f"{process_name} has exited.") return True time.sleep(0.5) print(f"{process_name} did not exit in time.") return False def finish_update_if_needed(): """ Completes a pending application update if '--finish-update' is present in the command-line arguments. """ if "--finish-update" in sys.argv: print("Finishing update...") if PLATFORM_NAME == 'darwin': app_dir = '/tmp/flarestempupdate' else: app_dir = os.getcwd() # 1. Find update folder update_folder = None for entry in os.listdir(app_dir): entry_path = os.path.join(app_dir, entry) if os.path.isdir(entry_path) and entry.startswith("flares-") and entry.endswith("-" + PLATFORM_NAME): update_folder = os.path.join(app_dir, entry) break if update_folder is None: print("No update folder found. Skipping update steps.") return if PLATFORM_NAME == 'darwin': update_folder = os.path.join(update_folder, "flares-darwin") # 2. Wait for flares_updater to exit print("Waiting for flares_updater to exit...") for proc in psutil.process_iter(['pid', 'name']): if proc.info['name'] and "flares_updater" in proc.info['name'].lower(): try: proc.wait(timeout=5) except psutil.TimeoutExpired: print("Force killing lingering flares_updater") proc.kill() # 3. Replace the updater if PLATFORM_NAME == 'windows': new_updater = os.path.join(update_folder, "flares_updater.exe") dest_updater = os.path.join(app_dir, "flares_updater.exe") elif PLATFORM_NAME == 'darwin': new_updater = os.path.join(update_folder, "flares_updater.app") dest_updater = os.path.abspath(os.path.join(sys.executable, "../../../../flares_updater.app")) elif PLATFORM_NAME == 'linux': new_updater = os.path.join(update_folder, "flares_updater") dest_updater = os.path.join(app_dir, "flares_updater") else: print("No platform??") new_updater = os.getcwd() dest_updater = os.getcwd() print(f"New updater is {new_updater}") print(f"Dest updater is {dest_updater}") print("Writable?", os.access(dest_updater, os.W_OK)) print("Executable path:", sys.executable) print("Trying to copy:", new_updater, "->", dest_updater) if os.path.exists(new_updater): try: if os.path.exists(dest_updater): if PLATFORM_NAME == 'darwin': try: if os.path.isdir(dest_updater): shutil.rmtree(dest_updater) print(f"Deleted directory: {dest_updater}") else: os.remove(dest_updater) print(f"Deleted file: {dest_updater}") except Exception as e: print(f"Error deleting {dest_updater}: {e}") else: os.remove(dest_updater) if PLATFORM_NAME == 'darwin': wait_for_process_to_exit("flares_updater", timeout=10) subprocess.check_call(["ditto", new_updater, dest_updater]) else: shutil.copy2(new_updater, dest_updater) if PLATFORM_NAME in ('linux', 'darwin'): os.chmod(dest_updater, 0o755) if PLATFORM_NAME == 'darwin': remove_quarantine(dest_updater) print("flares_updater replaced.") except Exception as e: print(f"Failed to replace flares_updater: {e}") # 4. Delete the update folder try: if PLATFORM_NAME == 'darwin': shutil.rmtree(app_dir) else: shutil.rmtree(update_folder) except Exception as e: print(f"Failed to delete update folder: {e}") QMessageBox.information(None, "Update Complete", "The application has been successfully updated.") sys.argv.remove("--finish-update") def remove_quarantine(app_path): """ Removes the macOS quarantine attribute from the specified application path. """ script = f''' do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (2/2)" ''' try: subprocess.run(['osascript', '-e', script], check=True) print("✅ Quarantine attribute removed.") except subprocess.CalledProcessError as e: print("❌ Failed to remove quarantine attribute.") print(e) def run_gui_entry_wrapper(config, gui_queue, progress_queue): """ Where the processing happens """ try: import fNIRS_module fNIRS_module.gui_entry(config, gui_queue, progress_queue) sys.exit(0) except Exception as e: tb_str = traceback.format_exc() gui_queue.put({ "success": False, "error": f"Child process crashed: {str(e)}\nTraceback:\n{tb_str}" }) def resource_path(relative_path): """ Get absolute path to resource regardless of running directly or packaged using PyInstaller """ if hasattr(sys, '_MEIPASS'): # PyInstaller bundle path base_path = sys._MEIPASS else: base_path = os.path.abspath(".") return os.path.join(base_path, relative_path) def kill_child_processes(): """ Goodbye children """ try: parent = psutil.Process(os.getpid()) children = parent.children(recursive=True) for child in children: try: child.kill() except psutil.NoSuchProcess: pass psutil.wait_procs(children, timeout=5) except Exception as e: print(f"Error killing child processes: {e}") def exception_hook(exc_type, exc_value, exc_traceback): """ Method that will display a popup when the program hard crashes containg what went wrong """ error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) print(error_msg) # also print to console kill_child_processes() # Show error message box # Make sure QApplication exists (or create a minimal one) app = QApplication.instance() if app is None: app = QApplication(sys.argv) QMessageBox.critical(None, "Unexpected Error", f"An unhandled exception occurred:\n\n{error_msg}") # Exit the app after user acknowledges sys.exit(1) if __name__ == "__main__": # Redirect exceptions to the popup window sys.excepthook = exception_hook # Set up application logging if PLATFORM_NAME == "darwin": log_path = os.path.join(os.path.dirname(sys.executable), "../../../flares.log") else: log_path = os.path.join(os.getcwd(), "flares.log") try: os.remove(log_path) except: pass sys.stdout = open(log_path, "a", buffering=1) sys.stderr = sys.stdout print(f"\n=== App started at {datetime.now()} ===\n") freeze_support() # Required for PyInstaller + multiprocessing # Only run GUI in the main process if current_process().name == 'MainProcess': app = QApplication(sys.argv) finish_update_if_needed() window = MainApplication() if PLATFORM_NAME == "darwin": app.setWindowIcon(QIcon(resource_path("icons/main.icns"))) window.setWindowIcon(QIcon(resource_path("icons/main.icns"))) else: app.setWindowIcon(QIcon(resource_path("icons/main.ico"))) window.setWindowIcon(QIcon(resource_path("icons/main.ico"))) window.show() sys.exit(app.exec())