""" Filename: main.py Description: FLARES main executable Author: Tyler de Zeeuw License: GPL-3.0 """ # Built-in imports import os import re import sys import json import time import shlex import pickle import shutil import zipfile import platform import traceback import subprocess from pathlib import Path, PurePosixPath from datetime import datetime from multiprocessing import Process, current_process, freeze_support, Manager import numpy as np import pandas as pd # External library imports import psutil import requests from mne.io import read_raw_snirf from mne.preprocessing.nirs import source_detector_distances from mne_nirs.io import write_raw_snirf from mne.channels import make_dig_montage from mne import Annotations from PySide6.QtWidgets import ( QApplication, QWidget, QMessageBox, QVBoxLayout, QHBoxLayout, QTextEdit, QScrollArea, QComboBox, QGridLayout, QPushButton, QMainWindow, QFileDialog, QLabel, QLineEdit, QFrame, QSizePolicy, QGroupBox, QDialog, QListView, ) from PySide6.QtCore import QThread, Signal, Qt, QTimer, QEvent, QSize from PySide6.QtGui import QAction, QKeySequence, QIcon, QIntValidator, QDoubleValidator, QPixmap, QStandardItemModel, QStandardItem from PySide6.QtSvgWidgets import QSvgWidget # needed to show svgs when app is not frozen CURRENT_VERSION = "1.0.0" API_URL = "https://git.research.dezeeuw.ca/api/v1/repos/tyler/flares/releases" API_URL_SECONDARY = "https://git.research2.dezeeuw.ca/api/v1/repos/tyler/flares/releases" PLATFORM_NAME = platform.system().lower() # Selectable parameters on the right side of the window SECTIONS = [ { "title": "Preprocessing", "params": [ {"name": "SECONDS_TO_STRIP", "default": 0, "type": int, "help": "Seconds to remove from beginning of file. Setting this to 0 will remove nothing from the file."}, {"name": "DOWNSAMPLE", "default": True, "type": bool, "help": "Downsample snirf files."}, {"name": "DOWNSAMPLE_FREQUENCY", "default": 25, "type": int, "help": "Frequency (Hz) to downsample to. If this is set higher than the input data, new data will be interpolated."}, ] }, { "title": "Scalp Coupling Index", "params": [ {"name": "SCI", "default": True, "type": bool, "help": "Calculate Scalp Coupling Index."}, {"name": "SCI_TIME_WINDOW", "default": 3, "type": int, "help": "SCI time window."}, {"name": "SCI_THRESHOLD", "default": 0.6, "type": float, "help": "SCI threshold (0-1)."}, ] }, { "title": "Signal to Noise Ratio", "params": [ {"name": "SNR", "default": True, "type": bool, "help": "Calculate Signal to Noise Ratio."}, # {"name": "SNR_TIME_WINDOW", "default": -1, "type": int, "help": "SNR time window."}, {"name": "SNR_THRESHOLD", "default": 5.0, "type": float, "help": "SNR threshold (dB)."}, ] }, { "title": "Peak Spectral Power", "params": [ {"name": "PSP", "default": True, "type": bool, "help": "Calculate Peak Spectral Power."}, {"name": "PSP_TIME_WINDOW", "default": 3, "type": int, "help": "PSP time window."}, {"name": "PSP_THRESHOLD", "default": 0.1, "type": float, "help": "PSP threshold."}, ] }, { "title": "Bad Channels Handling", "params": [ # {"name": "NOT_IMPLEMENTED", "default": True, "type": bool, "help": "Calculate Peak Spectral Power."}, # {"name": "NOT_IMPLEMENTED", "default": 3, "type": int, "help": "PSP time window."}, # {"name": "NOT_IMPLEMENTED", "default": 0.1, "type": float, "help": "PSP threshold."}, ] }, { "title": "Optical Density", "params": [ # Intentionally empty (TODO) ] }, { "title": "Temporal Derivative Distribution Repair filtering", "params": [ {"name": "TDDR", "default": True, "type": bool, "help": "Apply TDDR filtering."}, ] }, { "title": "Haemoglobin Concentration", "params": [ # Intentionally empty (TODO) ] }, { "title": "Enhance Negative Correlation", "params": [ #{"name": "ENHANCE_NEGATIVE_CORRELATION", "default": False, "type": bool, "help": "Calculate Peak Spectral Power."}, ] }, { "title": "Filtering", "params": [ #{"name": "FILTER", "default": True, "type": bool, "help": "Calculate Peak Spectral Power."}, ] }, { "title": "Short Channels", "params": [ {"name": "SHORT_CHANNEL", "default": True, "type": bool, "help": "Does the data have a short channel?"}, ] }, { "title": "Extracting Events", "params": [ #{"name": "EVENTS", "default": True, "type": bool, "help": "Calculate Peak Spectral Power."}, ] }, { "title": "Epoch Calculations", "params": [ #{"name": "EVENTS", "default": True, "type": bool, "help": "Calculate Peak Spectral Power."}, ] }, { "title": "Design Matrix", "params": [ {"name": "REMOVE_EVENTS", "default": "None", "type": list, "help": "Remove events matching the names provided before generating the Design Matrix"}, # {"name": "DRIFT_MODEL", "default": "cosine", "type": str, "help": "Drift model for GLM."}, # {"name": "DURATION_BETWEEN_ACTIVITIES", "default": 35, "type": int, "help": "Time between activities (s)."}, # {"name": "SHORT_CHANNEL_REGRESSION", "default": True, "type": bool, "help": "Use short channel regression."}, ] }, { "title": "General Linear Model", "params": [ {"name": "TIME_WINDOW_START", "default": "0", "type": int, "help": "Where to start averaging the fir model bins. Only affects the significance and contrast images."}, {"name": "TIME_WINDOW_END", "default": "15", "type": int, "help": "Where to end averaging the fir model bins. Only affects the significance and contrast images."}, #{"name": "N_JOBS", "default": 1, "type": int, "help": "Number of jobs for GLM processing."}, ] }, { "title": "Finishing Touches", "params": [ # Intentionally empty (TODO) ] }, { "title": "Other", "params": [ {"name": "MAX_WORKERS", "default": 4, "type": int, "help": "Number of files to process at once."}, ] }, ] class TerminalWindow(QWidget): def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("Terminal - FLARES") self.output_area = QTextEdit() self.output_area.setReadOnly(True) self.input_line = QLineEdit() self.input_line.returnPressed.connect(self.handle_command) layout = QVBoxLayout() layout.addWidget(self.output_area) layout.addWidget(self.input_line) self.setLayout(layout) self.commands = { "hello": self.cmd_hello, "help": self.cmd_help } def handle_command(self): command_text = self.input_line.text() self.input_line.clear() self.output_area.append(f"> {command_text}") parts = command_text.strip().split() if not parts: return command_name = parts[0] args = parts[1:] func = self.commands.get(command_name) if func: try: result = func(*args) if result: self.output_area.append(str(result)) except Exception as e: self.output_area.append(f"[Error] {e}") else: self.output_area.append(f"[Unknown command] '{command_name}'") def cmd_hello(self, *args): return "Hello from the terminal!" def cmd_help(self, *args): return f"Available commands: {', '.join(self.commands.keys())}" class UpdateDownloadThread(QThread): """ Thread that downloads and extracts an update package and emits a signal on completion or error. Args: download_url (str): URL of the update zip file to download. latest_version (str): Version string of the latest update. """ update_ready = Signal(str, str) error_occurred = Signal(str) def __init__(self, download_url, latest_version): super().__init__() self.download_url = download_url self.latest_version = latest_version def run(self): try: local_filename = os.path.basename(self.download_url) if PLATFORM_NAME == 'darwin': tmp_dir = '/tmp/flarestempupdate' os.makedirs(tmp_dir, exist_ok=True) local_path = os.path.join(tmp_dir, local_filename) else: local_path = os.path.join(os.getcwd(), local_filename) # Download the file with requests.get(self.download_url, stream=True, timeout=15) as r: r.raise_for_status() with open(local_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): if chunk: f.write(chunk) # Extract folder name (remove .zip) if PLATFORM_NAME == 'darwin': extract_folder = os.path.splitext(local_filename)[0] extract_path = os.path.join(tmp_dir, extract_folder) else: extract_folder = os.path.splitext(local_filename)[0] extract_path = os.path.join(os.getcwd(), extract_folder) # Create the folder if not exists os.makedirs(extract_path, exist_ok=True) # Extract the zip file contents if PLATFORM_NAME == 'darwin': subprocess.run(['ditto', '-xk', local_path, extract_path], check=True) else: with zipfile.ZipFile(local_path, 'r') as zip_ref: zip_ref.extractall(extract_path) # Remove the zip once extracted and emit a signal os.remove(local_path) self.update_ready.emit(self.latest_version, extract_path) except Exception as e: # Emit a signal signifying failure self.error_occurred.emit(str(e)) class UpdateCheckThread(QThread): """ Thread that checks for updates by querying the API and emits a signal based on the result. Signals: download_requested(str, str): Emitted with (download_url, latest_version) when an update is available. no_update_available(): Emitted when no update is found or current version is up to date. error_occurred(str): Emitted with an error message if the update check fails. """ download_requested = Signal(str, str) no_update_available = Signal() error_occurred = Signal(str) def run(self): if not getattr(sys, 'frozen', False): self.error_occurred.emit("Application is not frozen (Development mode).") return try: latest_version, download_url = self.get_latest_release_for_platform() if not latest_version: self.no_update_available.emit() return if not download_url: self.error_occurred.emit(f"No download available for platform '{PLATFORM_NAME}'") return if self.version_compare(latest_version, CURRENT_VERSION) > 0: self.download_requested.emit(download_url, latest_version) else: self.no_update_available.emit() except Exception as e: self.error_occurred.emit(f"Update check failed: {e}") def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def get_latest_release_for_platform(self): urls = [API_URL, API_URL_SECONDARY] for url in urls: try: response = requests.get(API_URL, timeout=5) response.raise_for_status() releases = response.json() if not releases: return None, None latest = releases[0] tag = latest["tag_name"].lstrip("v") for asset in latest.get("assets", []): if PLATFORM_NAME in asset["name"].lower(): return tag, asset["browser_download_url"] return tag, None except (requests.RequestException, ValueError) as e: continue return None, None class LocalPendingUpdateCheckThread(QThread): """ Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly. Args: current_version (str): Current application version. platform_suffix (str): Platform-specific suffix to identify update folders. """ pending_update_found = Signal(str, str) no_pending_update = Signal() def __init__(self, current_version, platform_suffix): super().__init__() self.current_version = current_version self.platform_suffix = platform_suffix def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def run(self): if PLATFORM_NAME == 'darwin': cwd = '/tmp/flarestempupdate' else: cwd = os.getcwd() pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$") found = False try: for item in os.listdir(cwd): folder_path = os.path.join(cwd, item) if os.path.isdir(folder_path) and item.endswith(self.platform_suffix): match = pattern.match(item) if match: folder_version = match.group(1) if self.version_compare(folder_version, self.current_version) > 0: self.pending_update_found.emit(folder_version, folder_path) found = True break except: pass if not found: self.no_pending_update.emit() class AboutWindow(QWidget): """ Simple About window displaying basic application information. Args: parent (QWidget, optional): Parent widget of this window. Defaults to None. """ def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("About FLARES") self.resize(250, 100) layout = QVBoxLayout() label = QLabel("About FLARES", self) label2 = QLabel("fNIRS Lightweight Analysis, Research, & Evaluation Suite", self) label3 = QLabel("FLARES is licensed under the GPL-3.0 licence. For more information, visit https://www.gnu.org/licenses/gpl-3.0.en.html", self) label4 = QLabel(f"Version v{CURRENT_VERSION}") layout.addWidget(label) layout.addWidget(label2) layout.addWidget(label3) layout.addWidget(label4) self.setLayout(layout) class UserGuideWindow(QWidget): """ Simple User Guide window displaying basic information on how to use the software. Args: parent (QWidget, optional): Parent widget of this window. Defaults to None. """ def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("User Guide for FLARES") self.resize(250, 100) layout = QVBoxLayout() label = QLabel("No user guide available yet!", self) layout.addWidget(label) self.setLayout(layout) class UpdateOptodesWindow(QWidget): def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("Update optode positions") self.resize(760, 200) self.label_file_a = QLabel("SNIRF file:") self.line_edit_file_a = QLineEdit() self.line_edit_file_a.setReadOnly(True) self.btn_browse_a = QPushButton("Browse .snirf") self.btn_browse_a.clicked.connect(self.browse_file_a) self.label_file_b = QLabel("TXT file:") self.line_edit_file_b = QLineEdit() self.line_edit_file_b.setReadOnly(True) self.btn_browse_b = QPushButton("Browse .txt") self.btn_browse_b.clicked.connect(self.browse_file_b) self.label_suffix = QLabel("Suffix to append to filename:") self.line_edit_suffix = QLineEdit() self.line_edit_suffix.setText("flare") self.btn_clear = QPushButton("Clear") self.btn_go = QPushButton("Go") self.btn_clear.clicked.connect(self.clear_files) self.btn_go.clicked.connect(self.go_action) # --- layout = QVBoxLayout() self.description = QLabel() self.description.setTextFormat(Qt.TextFormat.RichText) self.description.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction) self.description.setOpenExternalLinks(False) # Handle the click internally self.description.setText("Some software when creating snirf files will insert a template of optode positions as the correct position of the optodes for the participant.
" "This is rarely correct as each head differs slightly in shape or size, and a lot of calculations require the optodes to be in the correct location.
" "Using a .txt file, we can update the positions in the snirf file to match those of a digitization system such as one from Polhemus or elsewhere.
" "The .txt file should have the fiducials, detectors, and sources clearly labeled, followed by the x, y, and z coordinates seperated by a space.
" "An example format of what a digitization text file should look like can be found by clicking here.") self.description.linkActivated.connect(self.handle_link_click) layout.addWidget(self.description) help_text_a = "Select the SNIRF (.snirf) file to update with new optode positions." file_a_layout = QHBoxLayout() # Help button on the left help_btn_a = QPushButton("?") help_btn_a.setFixedWidth(25) help_btn_a.setToolTip(help_text_a) help_btn_a.clicked.connect(lambda _, text=help_text_a: self.show_help_popup(text)) file_a_layout.addWidget(help_btn_a) # Container for label + line_edit + browse button with tooltip file_a_container = QWidget() file_a_container_layout = QHBoxLayout() file_a_container_layout.setContentsMargins(0, 0, 0, 0) file_a_container_layout.addWidget(self.label_file_a) file_a_container_layout.addWidget(self.line_edit_file_a) file_a_container_layout.addWidget(self.btn_browse_a) file_a_container.setLayout(file_a_container_layout) file_a_container.setToolTip(help_text_a) file_a_layout.addWidget(file_a_container) layout.addLayout(file_a_layout) help_text_b = "Provide a .txt file with labeled optodes (e.g., nz, rpa, lpa, d1, s1) and their x, y, z coordinates." file_b_layout = QHBoxLayout() help_btn_b = QPushButton("?") help_btn_b.setFixedWidth(25) help_btn_b.setToolTip(help_text_b) help_btn_b.clicked.connect(lambda _, text=help_text_b: self.show_help_popup(text)) file_b_layout.addWidget(help_btn_b) file_b_container = QWidget() file_b_container_layout = QHBoxLayout() file_b_container_layout.setContentsMargins(0, 0, 0, 0) file_b_container_layout.addWidget(self.label_file_b) file_b_container_layout.addWidget(self.line_edit_file_b) file_b_container_layout.addWidget(self.btn_browse_b) file_b_container.setLayout(file_b_container_layout) file_b_container.setToolTip(help_text_b) file_b_layout.addWidget(file_b_container) layout.addLayout(file_b_layout) help_text_suffix = "This text will be appended to the original filename when saving. Default is 'flare'." suffix_layout = QHBoxLayout() help_btn_suffix = QPushButton("?") help_btn_suffix.setFixedWidth(25) help_btn_suffix.setToolTip(help_text_suffix) help_btn_suffix.clicked.connect(lambda _, text=help_text_suffix: self.show_help_popup(text)) suffix_layout.addWidget(help_btn_suffix) suffix_container = QWidget() suffix_container_layout = QHBoxLayout() suffix_container_layout.setContentsMargins(0, 0, 0, 0) suffix_container_layout.addWidget(self.label_suffix) suffix_container_layout.addWidget(self.line_edit_suffix) suffix_container.setLayout(suffix_container_layout) suffix_container.setToolTip(help_text_suffix) suffix_layout.addWidget(suffix_container) layout.addLayout(suffix_layout) buttons_layout = QHBoxLayout() buttons_layout.addStretch() buttons_layout.addWidget(self.btn_clear) buttons_layout.addWidget(self.btn_go) layout.addLayout(buttons_layout) self.setLayout(layout) def show_help_popup(self, text): msg = QMessageBox(self) msg.setWindowTitle("Parameter Info") msg.setText(text) msg.exec() def handle_link_click(self, link): if link == "custom_link": msg = QMessageBox(self) msg.setWindowTitle("Example Digitization File") text = "nz: -1.91 85.175 -31.1525\n" \ "rpa: 80.3825 -17.1925 -57.2775\n" \ "lpa: -81.815 -17.1925 -57.965\n" \ "d1: 0.01 -97.5175 62.5875\n" \ "d2: 25.125 -103.415 45.045\n" \ "d3: 49.095 -97.9025 30.2075\n" \ "s1: 0.01 -112.43 32.595\n" \ "s2: 30.325 -84.3125 71.8975\n" \ "s3: 0.01 -70.6875 89.0925\n" msg.setText(text) msg.exec() def browse_file_a(self): file_path, _ = QFileDialog.getOpenFileName(self, "Select SNIRF File", "", "SNIRF Files (*.snirf)") if file_path: self.line_edit_file_a.setText(file_path) def browse_file_b(self): file_path, _ = QFileDialog.getOpenFileName(self, "Select TXT File", "", "Text Files (*.txt)") if file_path: self.line_edit_file_b.setText(file_path) def clear_files(self): self.line_edit_file_a.clear() self.line_edit_file_b.clear() def go_action(self): file_a = self.line_edit_file_a.text() file_b = self.line_edit_file_b.text() suffix = self.line_edit_suffix.text().strip() or "flare" if not file_a: QMessageBox.critical(self, "Missing File", "Please select a SNIRF file.") return if not file_b: QMessageBox.critical(self, "Missing File", "Please select a TXT file.") return # Get original filename without extension base_name = os.path.splitext(os.path.basename(file_a))[0] suggested_name = f"{base_name}_{suffix}.snirf" # Open save dialog with default name save_path, _ = QFileDialog.getSaveFileName( self, "Save SNIRF File As", suggested_name, "SNIRF Files (*.snirf)" ) if not save_path: print("Save cancelled.") return # Ensure .snirf extension if not save_path.lower().endswith(".snirf"): save_path += ".snirf" try: self.update_optode_positions(file_a=file_a, file_b=file_b, save_path=save_path) except Exception as e: QMessageBox.critical(self, "Error", f"Failed to write file:\n{e}") return QMessageBox.information(self, "File Saved", f"File was saved to:\n{save_path}") def update_optode_positions(self, file_a, file_b, save_path): fiducials = {} ch_positions = {} # Read the lines from the optode file with open(file_b, 'r') as f: for line in f: if line.strip(): # Split by the semicolon and convert to meters ch_name, coords_str = line.split(":") coords = np.array(list(map(float, coords_str.strip().split()))) * 0.001 # The key we have is a fiducial if ch_name.lower() in ['lpa', 'nz', 'rpa']: fiducials[ch_name.lower()] = coords # The key we have is a source or detector else: ch_positions[ch_name.upper()] = coords # Create montage with updated coords in head space initial_montage = make_dig_montage(ch_pos=ch_positions, nasion=fiducials.get('nz'), lpa=fiducials.get('lpa'), rpa=fiducials.get('rpa'), coord_frame='head') # type: ignore # Read the SNIRF file, set the montage, and write it back raw = read_raw_snirf(file_a, preload=True) raw.set_montage(initial_montage) write_raw_snirf(raw, save_path) class UpdateEventsWindow(QWidget): def __init__(self, parent=None): super().__init__(parent, Qt.WindowType.Window) self.setWindowTitle("Update event markers") self.resize(760, 200) self.label_file_a = QLabel("SNIRF file:") self.line_edit_file_a = QLineEdit() self.line_edit_file_a.setReadOnly(True) self.btn_browse_a = QPushButton("Browse .snirf") self.btn_browse_a.clicked.connect(self.browse_file_a) self.label_file_b = QLabel("BORIS file:") self.line_edit_file_b = QLineEdit() self.line_edit_file_b.setReadOnly(True) self.btn_browse_b = QPushButton("Browse .boris") self.btn_browse_b.clicked.connect(self.browse_file_b) self.label_suffix = QLabel("Filename in BORIS project file:") self.combo_suffix = QComboBox() self.combo_suffix.setEditable(False) self.combo_suffix.currentIndexChanged.connect(self.on_observation_selected) self.label_events = QLabel("Events in selected observation:") self.combo_events = QComboBox() self.combo_events.setEnabled(False) self.label_snirf_events = QLabel("Events in SNIRF file:") self.combo_snirf_events = QComboBox() self.combo_snirf_events.setEnabled(False) self.btn_clear = QPushButton("Clear") self.btn_go = QPushButton("Go") self.btn_clear.clicked.connect(self.clear_files) self.btn_go.clicked.connect(self.go_action) # --- layout = QVBoxLayout() self.description = QLabel() self.description.setTextFormat(Qt.TextFormat.RichText) self.description.setTextInteractionFlags(Qt.TextInteractionFlag.TextBrowserInteraction) self.description.setOpenExternalLinks(True) self.description.setText("The events that are present in a snirf file may not be the events that are to be studied and examined.
" "Utilizing different software and video recordings, it is easy enough to see when an action actually occured in a file.
" "The software BORIS is used to add these events to video files, and these events can be applied to the snirf file
" "selected below by selecting the correct BORIS observation and time syncing it to an event that it shares with the snirf file.") layout.addWidget(self.description) help_text_a = "Select the SNIRF (.snirf) file to update with new event markers." file_a_layout = QHBoxLayout() # Help button on the left help_btn_a = QPushButton("?") help_btn_a.setFixedWidth(25) help_btn_a.setToolTip(help_text_a) help_btn_a.clicked.connect(lambda _, text=help_text_a: self.show_help_popup(text)) file_a_layout.addWidget(help_btn_a) # Container for label + line_edit + browse button with tooltip file_a_container = QWidget() file_a_container_layout = QHBoxLayout() file_a_container_layout.setContentsMargins(0, 0, 0, 0) file_a_container_layout.addWidget(self.label_file_a) file_a_container_layout.addWidget(self.line_edit_file_a) file_a_container_layout.addWidget(self.btn_browse_a) file_a_container.setLayout(file_a_container_layout) file_a_container.setToolTip(help_text_a) file_a_layout.addWidget(file_a_container) layout.addLayout(file_a_layout) help_text_b = "Provide a .boris project file that contains events for this participant." file_b_layout = QHBoxLayout() help_btn_b = QPushButton("?") help_btn_b.setFixedWidth(25) help_btn_b.setToolTip(help_text_b) help_btn_b.clicked.connect(lambda _, text=help_text_b: self.show_help_popup(text)) file_b_layout.addWidget(help_btn_b) file_b_container = QWidget() file_b_container_layout = QHBoxLayout() file_b_container_layout.setContentsMargins(0, 0, 0, 0) file_b_container_layout.addWidget(self.label_file_b) file_b_container_layout.addWidget(self.line_edit_file_b) file_b_container_layout.addWidget(self.btn_browse_b) file_b_container.setLayout(file_b_container_layout) file_b_container.setToolTip(help_text_b) file_b_layout.addWidget(file_b_container) layout.addLayout(file_b_layout) help_text_suffix = "This participant from the .boris project file matches the .snirf file." suffix_layout = QHBoxLayout() help_btn_suffix = QPushButton("?") help_btn_suffix.setFixedWidth(25) help_btn_suffix.setToolTip(help_text_suffix) help_btn_suffix.clicked.connect(lambda _, text=help_text_suffix: self.show_help_popup(text)) suffix_layout.addWidget(help_btn_suffix) suffix_container = QWidget() suffix_container_layout = QHBoxLayout() suffix_container_layout.setContentsMargins(0, 0, 0, 0) suffix_container_layout.addWidget(self.label_suffix) suffix_container_layout.addWidget(self.combo_suffix) suffix_container.setLayout(suffix_container_layout) suffix_container.setToolTip(help_text_suffix) suffix_layout.addWidget(suffix_container) layout.addLayout(suffix_layout) help_text_suffix = "The events extracted from the BORIS project file for the selected observation." suffix2_layout = QHBoxLayout() help_btn_suffix = QPushButton("?") help_btn_suffix.setFixedWidth(25) help_btn_suffix.setToolTip(help_text_suffix) help_btn_suffix.clicked.connect(lambda _, text=help_text_suffix: self.show_help_popup(text)) suffix2_layout.addWidget(help_btn_suffix) suffix2_container = QWidget() suffix2_container_layout = QHBoxLayout() suffix2_container_layout.setContentsMargins(0, 0, 0, 0) suffix2_container_layout.addWidget(self.label_events) suffix2_container_layout.addWidget(self.combo_events) suffix2_container.setLayout(suffix2_container_layout) suffix2_container.setToolTip(help_text_suffix) suffix2_layout.addWidget(suffix2_container) layout.addLayout(suffix2_layout) snirf_events_layout = QHBoxLayout() help_text_snirf_events = "The event markers extracted from the SNIRF file." help_btn_snirf_events = QPushButton("?") help_btn_snirf_events.setFixedWidth(25) help_btn_snirf_events.setToolTip(help_text_snirf_events) help_btn_snirf_events.clicked.connect(lambda _, text=help_text_snirf_events: self.show_help_popup(text)) snirf_events_layout.addWidget(help_btn_snirf_events) snirf_events_container = QWidget() snirf_events_container_layout = QHBoxLayout() snirf_events_container_layout.setContentsMargins(0, 0, 0, 0) snirf_events_container_layout.addWidget(self.label_snirf_events) snirf_events_container_layout.addWidget(self.combo_snirf_events) snirf_events_container.setLayout(snirf_events_container_layout) snirf_events_container.setToolTip(help_text_snirf_events) snirf_events_layout.addWidget(snirf_events_container) layout.addLayout(snirf_events_layout) buttons_layout = QHBoxLayout() buttons_layout.addStretch() buttons_layout.addWidget(self.btn_clear) buttons_layout.addWidget(self.btn_go) layout.addLayout(buttons_layout) self.setLayout(layout) def show_help_popup(self, text): msg = QMessageBox(self) msg.setWindowTitle("Parameter Info") msg.setText(text) msg.exec() def browse_file_a(self): file_path, _ = QFileDialog.getOpenFileName(self, "Select SNIRF File", "", "SNIRF Files (*.snirf)") if file_path: self.line_edit_file_a.setText(file_path) try: raw = read_raw_snirf(file_path, preload=False) annotations = raw.annotations # Build individual event entries event_entries = [] for onset, description in zip(annotations.onset, annotations.description): event_str = f"{description} @ {onset:.3f}s" event_entries.append(event_str) if not event_entries: QMessageBox.information(self, "No Events", "No events found in SNIRF file.") self.combo_snirf_events.clear() self.combo_snirf_events.setEnabled(False) return self.combo_snirf_events.clear() self.combo_snirf_events.addItems(event_entries) self.combo_snirf_events.setEnabled(True) except Exception as e: QMessageBox.warning(self, "Error", f"Could not read SNIRF file with MNE:\n{str(e)}") self.combo_snirf_events.clear() self.combo_snirf_events.setEnabled(False) def browse_file_b(self): file_path, _ = QFileDialog.getOpenFileName(self, "Select BORIS File", "", "BORIS project Files (*.boris)") if file_path: self.line_edit_file_b.setText(file_path) try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) self.boris_data = data observation_keys = self.extract_boris_observation_keys(data) self.combo_suffix.clear() self.combo_suffix.addItems(observation_keys) except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: QMessageBox.warning(self, "Error", f"Failed to parse BORIS file:\n{e}") def extract_boris_observation_keys(self, data): if "observations" not in data: raise KeyError("Missing 'observations' key in BORIS file.") observations = data["observations"] if not isinstance(observations, dict): raise TypeError("'observations' must be a dictionary.") return list(observations.keys()) def on_observation_selected(self): selected_obs = self.combo_suffix.currentText() if not selected_obs or not hasattr(self, 'boris_data'): self.combo_events.clear() self.combo_events.setEnabled(False) return try: events = self.boris_data["observations"][selected_obs]["events"] except (KeyError, TypeError): self.combo_events.clear() self.combo_events.setEnabled(False) return event_entries = [] for event in events: if isinstance(event, list) and len(event) >= 3: timestamp = event[0] label = event[2] display = f"{label} @ {timestamp:.3f}" event_entries.append(display) self.combo_events.clear() self.combo_events.addItems(event_entries) self.combo_events.setEnabled(bool(event_entries)) def clear_files(self): self.line_edit_file_a.clear() self.line_edit_file_b.clear() def go_action(self): file_a = self.line_edit_file_a.text() file_b = self.line_edit_file_b.text() suffix = "flare" if not hasattr(self, "boris_data") or self.combo_events.count() == 0 or self.combo_snirf_events.count() == 0: QMessageBox.warning(self, "Missing data", "Please make sure a BORIS and SNIRF event are selected.") return # Extract BORIS anchor try: boris_label, boris_time_str = self.combo_events.currentText().split(" @ ") boris_anchor_time = float(boris_time_str.replace("s", "").strip()) except Exception as e: QMessageBox.critical(self, "BORIS Event Error", f"Could not parse BORIS anchor event:\n{e}") return # Extract SNIRF anchor try: snirf_label, snirf_time_str = self.combo_snirf_events.currentText().split(" @ ") snirf_anchor_time = float(snirf_time_str.replace("s", "").strip()) except Exception as e: QMessageBox.critical(self, "SNIRF Event Error", f"Could not parse SNIRF anchor event:\n{e}") return time_shift = snirf_anchor_time - boris_anchor_time selected_obs = self.combo_suffix.currentText() if not selected_obs or selected_obs not in self.boris_data["observations"]: QMessageBox.warning(self, "Invalid selection", "Selected observation not found in BORIS file.") return boris_events = self.boris_data["observations"][selected_obs].get("events", []) if not boris_events: QMessageBox.warning(self, "No BORIS events", "No events found in selected BORIS observation.") return snirf_path = self.line_edit_file_a.text() if not snirf_path: QMessageBox.warning(self, "No SNIRF file", "Please select a SNIRF file.") return base_name = os.path.splitext(os.path.basename(file_a))[0] suggested_name = f"{base_name}_{suffix}.snirf" # Open save dialog save_path, _ = QFileDialog.getSaveFileName( self, "Save SNIRF File As", suggested_name, "SNIRF Files (*.snirf)" ) if not save_path: print("Save cancelled.") return if not save_path.lower().endswith(".snirf"): save_path += ".snirf" try: raw = read_raw_snirf(snirf_path, preload=True) onsets = [] durations = [] descriptions = [] open_events = {} # label -> list of start times label_counts = {} used_times = set() sfreq = raw.info['sfreq'] # sampling frequency in Hz min_shift = 1.0 / sfreq max_attempts = 10 for event in boris_events: if not isinstance(event, list) or len(event) < 3: continue event_time = event[0] label = event[2] count = label_counts.get(label, 0) + 1 label_counts[label] = count if label not in open_events: open_events[label] = [] if count % 2 == 1: # Odd occurrence = start event open_events[label].append(event_time) else: # Even occurrence = end event if open_events[label]: matched_start = open_events[label].pop(0) duration = event_time - matched_start if duration <= 0: print(f"Warning: Duration for {label} is non-positive ({duration}). Skipping.") continue shifted_start = matched_start + time_shift adjusted_time = shifted_start attempts = 0 while round(adjusted_time, 6) in used_times and attempts < max_attempts: adjusted_time += min_shift attempts += 1 if attempts == max_attempts: print(f"Warning: Couldn't find unique time for {label} @ {matched_start}s. Skipping.") continue adjusted_time = round(adjusted_time, 6) used_times.add(adjusted_time) print(f"Adding event: {label} @ {adjusted_time:.3f}s for {duration:.3f}s") onsets.append(adjusted_time) durations.append(duration) descriptions.append(label) else: print(f"Warning: Unmatched end for label '{label}' at {event_time:.3f}s. Skipping.") # Optionally warn about any unmatched starts left open for label, starts in open_events.items(): for start_time in starts: shifted_start = start_time + time_shift adjusted_time = shifted_start attempts = 0 while round(adjusted_time, 6) in used_times and attempts < max_attempts: adjusted_time += min_shift attempts += 1 if attempts == max_attempts: print(f"Warning: Couldn't find unique time for unmatched start {label} @ {start_time}s. Skipping.") continue adjusted_time = round(adjusted_time, 6) used_times.add(adjusted_time) print(f"Warning: Unmatched start for label '{label}' at {start_time:.3f}s. Adding with duration 0.") onsets.append(adjusted_time) durations.append(0.0) descriptions.append(label) new_annotations = Annotations(onset=onsets, duration=durations, description=descriptions) raw.set_annotations(new_annotations) write_raw_snirf(raw, save_path) QMessageBox.information(self, "Success", "SNIRF file updated with aligned BORIS events.") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to update SNIRF file:\n{e}") def update_optode_positions(self, file_a, file_b, save_path): fiducials = {} ch_positions = {} # Read the lines from the optode file with open(file_b, 'r') as f: for line in f: if line.strip(): # Split by the semicolon and convert to meters ch_name, coords_str = line.split(":") coords = np.array(list(map(float, coords_str.strip().split()))) * 0.001 # The key we have is a fiducial if ch_name.lower() in ['lpa', 'nz', 'rpa']: fiducials[ch_name.lower()] = coords # The key we have is a source or detector else: ch_positions[ch_name.upper()] = coords # Create montage with updated coords in head space initial_montage = make_dig_montage(ch_pos=ch_positions, nasion=fiducials.get('nz'), lpa=fiducials.get('lpa'), rpa=fiducials.get('rpa'), coord_frame='head') # type: ignore # Read the SNIRF file, set the montage, and write it back raw = read_raw_snirf(file_a, preload=True) raw.set_montage(initial_montage) write_raw_snirf(raw, save_path) class ProgressBubble(QWidget): """ A clickable widget displaying a progress bar made of colored rectangles and a label. Args: display_name (str): Text to display above the progress bar. file_path (str): Associated file path stored with the bubble. """ clicked = Signal(object) def __init__(self, display_name, file_path): super().__init__() self.layout = QVBoxLayout() self.label = QLabel(display_name) self.base_text = display_name self.label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.label.setStyleSheet(""" QLabel { border: 1px solid #888; border-radius: 10px; padding: 8px 12px; background-color: #e0f0ff; } """) self.progress_layout = QHBoxLayout() self.rects = [] for _ in range(19): rect = QFrame() rect.setFixedSize(10, 18) rect.setStyleSheet("background-color: white; border: 1px solid gray;") self.progress_layout.addWidget(rect) self.rects.append(rect) self.layout.addWidget(self.label) self.layout.addLayout(self.progress_layout) self.setLayout(self.layout) # Store the file path self.file_path = os.path.normpath(file_path) self.current_step = 0 # Make the bubble clickable self.setCursor(Qt.CursorShape.PointingHandCursor) # Resize policy to make bubbles responsive # TODO: Not only do this once but when window is resized too. Also just doesnt work self.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Minimum) def update_progress(self, step_index): self.current_step = step_index for i, rect in enumerate(self.rects): if i < step_index: rect.setStyleSheet("background-color: green; border: 1px solid gray;") elif i == step_index: rect.setStyleSheet("background-color: yellow; border: 1px solid gray;") else: rect.setStyleSheet("background-color: white; border: 1px solid gray;") def mark_cancelled(self): if 0 <= self.current_step < len(self.rects): rect = self.rects[self.current_step] rect.setStyleSheet("background-color: red; border: 1px solid gray;") def mousePressEvent(self, event): self.clicked.emit(self) super().mousePressEvent(event) def setSuffixText(self, suffix): if suffix: self.label.setText(f"{self.base_text} {suffix}") else: self.label.setText(self.base_text) class ParamSection(QWidget): """ A widget section that dynamically creates labeled input fields from parameter metadata. Args: section_data (dict): Dictionary containing section title and list of parameter info. Expected format: { "title": str, "params": [ { "name": str, "type": type, "default": any, "help": str (optional) }, ... ] } """ def __init__(self, section_data): super().__init__() layout = QVBoxLayout() self.setLayout(layout) self.widgets = {} self.selected_path = None # Title label title_label = QLabel(section_data["title"]) title_label.setStyleSheet("font-weight: bold; font-size: 14px; margin-top: 10px; margin-bottom: 5px;") layout.addWidget(title_label) # Horizontal line line = QFrame() line.setFrameShape(QFrame.Shape.HLine) line.setFrameShadow(QFrame.Shadow.Sunken) layout.addWidget(line) for param in section_data["params"]: h_layout = QHBoxLayout() label = QLabel(param["name"]) label.setFixedWidth(180) label.setToolTip(param.get("help", "")) help_text = param.get("help", "") help_btn = QPushButton("?") help_btn.setFixedWidth(25) help_btn.setToolTip(help_text) help_btn.clicked.connect(lambda _, text=help_text: self.show_help_popup(text)) h_layout.addWidget(help_btn) h_layout.setStretch(0, 1) # Set stretch factor for button (10%) h_layout.addWidget(label) h_layout.setStretch(1, 3) # Set the stretch factor for label (40%) # Create input widget based on type if param["type"] == bool: widget = QComboBox() widget.addItems(["True", "False"]) widget.setCurrentText(str(param["default"])) elif param["type"] == int: widget = QLineEdit() widget.setValidator(QIntValidator()) widget.setText(str(param["default"])) elif param["type"] == float: widget = QLineEdit() widget.setValidator(QDoubleValidator()) widget.setText(str(param["default"])) elif param["type"] == list: widget = self._create_multiselect_dropdown(None) else: widget = QLineEdit() widget.setText(str(param["default"])) widget.setToolTip(help_text) h_layout.addWidget(widget) h_layout.setStretch(2, 5) # Set stretch factor for input field (50%) layout.addLayout(h_layout) self.widgets[param["name"]] = { "widget": widget, "type": param["type"] } def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) if items is not None: for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) combo.setInsertPolicy(QComboBox.NoInsert) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) elif item == dummy_item: pass else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) self._updating_checkstates = False for param_name, info in self.widgets.items(): if info["widget"] == combo: self.update_dropdown_label(param_name) break model.itemChanged.connect(on_item_changed) combo.setInsertPolicy(QComboBox.NoInsert) return combo def show_help_popup(self, text): msg = QMessageBox(self) msg.setWindowTitle("Parameter Info") msg.setText(text) msg.exec() def get_param_values(self): values = {} for name, info in self.widgets.items(): widget = info["widget"] expected_type = info["type"] if expected_type == bool: values[name] = widget.currentText() == "True" elif expected_type == list: values[name] = [x.strip() for x in widget.lineEdit().text().split(",") if x.strip()] else: raw_text = widget.text() try: if expected_type == int: values[name] = int(raw_text) elif expected_type == float: values[name] = float(raw_text) elif expected_type == str: values[name] = raw_text else: values[name] = raw_text # Fallback except Exception as e: raise ValueError(f"Invalid value for {name}: {raw_text}") from e return values def update_dropdown_items(self, param_name, new_items): """ Updates the items in a multi-select dropdown parameter field. Args: param_name (str): The parameter name (must match one in self.widgets). new_items (list): The new items to populate in the dropdown. """ widget_info = self.widgets.get(param_name) print("[ParamSection] Current widget keys:", list(self.widgets.keys())) if not widget_info: print(f"[ParamSection] No widget found for param '{param_name}'") return widget = widget_info["widget"] if not isinstance(widget, FullClickComboBox): print(f"[ParamSection] Widget for param '{param_name}' is not a FullClickComboBox") return # Replace the model on the existing widget new_model = QStandardItemModel() dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) new_model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) new_model.appendRow(toggle_item) for item_text in new_items: item = QStandardItem(item_text) item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) item.setData(Qt.Unchecked, Qt.CheckStateRole) new_model.appendRow(item) widget.setModel(new_model) widget.setView(QListView()) # Reset view to refresh properly def on_view_clicked(index): item = new_model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) widget.view().pressed.connect(on_view_clicked) def on_item_changed(item): if getattr(self, "_updating_checkstates", False): return self._updating_checkstates = True normal_items = [new_model.item(i) for i in range(2, new_model.rowCount())] if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) for i in normal_items: i.setCheckState(Qt.Unchecked if all_checked else Qt.Checked) toggle_item.setCheckState(Qt.Unchecked if all_checked else Qt.Checked) else: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) self._updating_checkstates = False for param_name, info in self.widgets.items(): if info["widget"] == widget: self.update_dropdown_label(param_name) break new_model.itemChanged.connect(on_item_changed) widget.lineEdit().setText("") def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_dropdown_label(self, param_name): widget_info = self.widgets.get(param_name) if not widget_info: print(f"[ParamSection] No widget found for param '{param_name}'") return widget = widget_info["widget"] if not isinstance(widget, FullClickComboBox): print(f"[ParamSection] Widget for param '{param_name}' is not a FullClickComboBox") return selected = self._get_checked_items(widget) if not selected: widget.lineEdit().setText("") else: # You can customize how you display selected items here: widget.lineEdit().setText(", ".join(selected)) def update_annotation_dropdown_from_folder(self, folder_path): """ Reads all EEG files in the given folder, extracts annotations using MNE, and updates the dropdown for the `target_param` with the set of common annotations. Args: folder_param (str): The name of the parameter holding the folder path. target_param (str): The name of the multi-select dropdown to update. """ # folder_path_widget = self.widgets.get(folder_param, {}).get("widget") # if not folder_path_widget: # print(f"[ParamSection] Folder path param '{folder_param}' not found.") # return # folder_path = folder_path_widget.text().strip() # if not os.path.isdir(folder_path): # print(f"[ParamSection] '{folder_path}' is not a valid directory.") # return annotation_sets = [] for filename in os.listdir(folder_path): full_path = os.path.join(folder_path, filename) try: raw = read_raw_snirf(full_path, preload=False, verbose="ERROR") annotations = raw.annotations if annotations is not None: labels = set(annotations.description) annotation_sets.append(labels) except Exception as e: print(f"[ParamSection] Skipping file '{filename}' due to error: {e}") if not annotation_sets: print(f"[ParamSection] No annotations found in folder '{folder_path}'") return # Get common annotations common_annotations = set.intersection(*annotation_sets) if len(annotation_sets) > 1 else annotation_sets[0] common_annotations = sorted(list(common_annotations)) # for consistent order print(f"[ParamSection] Common annotations: {common_annotations}") # Update the dropdown self.update_dropdown_items("REMOVE_EVENTS", common_annotations) class FullClickLineEdit(QLineEdit): def mousePressEvent(self, event): combo = self.parent() if isinstance(combo, QComboBox): combo.showPopup() super().mousePressEvent(event) class FullClickComboBox(QComboBox): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.setLineEdit(FullClickLineEdit(self)) self.lineEdit().setReadOnly(True) class ParticipantViewerWidget(QWidget): def __init__(self, haemo_dict, fig_bytes_dict): super().__init__() self.setWindowTitle("FLARES Participant Viewer") self.haemo_dict = haemo_dict self.fig_bytes_dict = fig_bytes_dict # Create mappings: file_path -> participant label and dropdown display text self.participant_map = {} # file_path -> "Participant 1" self.participant_dropdown_items = [] # "Participant 1 (filename)" for i, file_path in enumerate(self.haemo_dict.keys(), start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" self.participant_map[file_path] = short_label self.participant_dropdown_items.append(display_label) self.layout = QVBoxLayout(self) self.top_bar = QHBoxLayout() self.layout.addLayout(self.top_bar) self.participant_dropdown = self._create_multiselect_dropdown(self.participant_dropdown_items) self.participant_dropdown.currentIndexChanged.connect(self.update_participant_dropdown_label) first_fig_dict = next(iter(self.fig_bytes_dict.values())) image_label_items = list(first_fig_dict.keys()) self.image_index_dropdown = self._create_multiselect_dropdown(image_label_items) self.image_index_dropdown.currentIndexChanged.connect(self.update_image_index_dropdown_label) self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.show_selected_images) self.top_bar.addWidget(QLabel("Participants:")) self.top_bar.addWidget(self.participant_dropdown) self.top_bar.addWidget(QLabel("Image Indexes:")) self.top_bar.addWidget(self.image_index_dropdown) self.top_bar.addWidget(self.submit_button) self.scroll = QScrollArea() self.scroll.setWidgetResizable(True) self.scroll_content = QWidget() self.grid_layout = QGridLayout(self.scroll_content) self.scroll.setWidget(self.scroll_content) self.layout.addWidget(self.scroll) self.thumb_size = QSize(280, 180) self.save_button = QPushButton("Save Displayed Images") self.save_button.clicked.connect(self.save_displayed_images) self.top_bar.addWidget(self.save_button) self.showMaximized() def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) combo.setInsertPolicy(QComboBox.NoInsert) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) elif item == dummy_item: pass else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if combo == self.participant_dropdown: self.update_participant_dropdown_label() elif combo == self.image_index_dropdown: self.update_image_index_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) combo.setInsertPolicy(QComboBox.NoInsert) return combo def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) # Skip dummy and toggle items: if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_participant_dropdown_label(self): selected = self._get_checked_items(self.participant_dropdown) if not selected: self.participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] self.participant_dropdown.lineEdit().setText(", ".join(selected_short)) def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: # Only show the index part self.image_index_dropdown.lineEdit().setText(", ".join(selected)) def show_selected_images(self): # Clear previous images for i in reversed(range(self.grid_layout.count())): widget = self.grid_layout.itemAt(i).widget() if widget: widget.setParent(None) selected_display_names = self._get_checked_items(self.participant_dropdown) # Map from display names back to file paths selected_file_paths = [] for display_name in selected_display_names: # Find file_path by matching display name for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break selected_labels = self._get_checked_items(self.image_index_dropdown) row, col = 0, 0 for file_path in selected_file_paths: fig_list = self.fig_bytes_dict.get(file_path, []) participant_label = self.participant_map[file_path] for label in selected_labels: fig_bytes = fig_list.get(label) if not fig_bytes: continue full_pixmap = QPixmap() full_pixmap.loadFromData(fig_bytes) thumbnail_pixmap = full_pixmap.scaled( self.thumb_size, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation ) container = QWidget() hlayout = QHBoxLayout(container) hlayout.setContentsMargins(0, 0, 0, 0) hlayout.setSpacing(0) hlayout.setAlignment(Qt.AlignmentFlag.AlignCenter) image_label = ClickableLabel(full_pixmap, thumbnail_pixmap) image_label.setToolTip(f"{participant_label}\n{label}") hlayout.addWidget(image_label) self.grid_layout.addWidget(container, row, col) col += 1 if col >= 6: col = 0 row += 1 # Update dropdown labels after display self.update_participant_dropdown_label() self.update_image_index_dropdown_label() def save_displayed_images(self): # Ensure the folder exists save_dir = Path("individual_images") save_dir.mkdir(exist_ok=True) selected_display_names = self._get_checked_items(self.participant_dropdown) selected_image_labels = self._get_checked_items(self.image_index_dropdown) for display_name in selected_display_names: # Match display name to file path for file_path, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(file_path)})" if display_name == expected_display: fig_dict = self.fig_bytes_dict.get(file_path, {}) for label in selected_image_labels: if label not in fig_dict: continue fig_bytes = fig_dict[label] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"{os.path.basename(file_path)}_{label}_{timestamp}.png" output_path = save_dir / filename with open(output_path, "wb") as f: f.write(fig_bytes) break # file_path matched; stop loop QMessageBox.information(self, "Save Complete", f"Images saved to {save_dir.resolve()}") class ClickableLabel(QLabel): def __init__(self, full_pixmap: QPixmap, thumbnail_pixmap: QPixmap): super().__init__() self._pixmap_full = full_pixmap self.setPixmap(thumbnail_pixmap) self.setAlignment(Qt.AlignmentFlag.AlignCenter) self.setFixedSize(thumbnail_pixmap.size()) self.setStyleSheet("border: 1px solid gray; margin: 2px;") def mousePressEvent(self, event): viewer = QWidget() viewer.setWindowTitle("Expanded View") layout = QVBoxLayout(viewer) label = QLabel() label.setPixmap(self._pixmap_full) label.setAlignment(Qt.AlignmentFlag.AlignCenter) layout.addWidget(label) viewer.resize(1000, 800) viewer.show() self._expanded_viewer = viewer # keep reference alive class ParticipantBrainViewerWidget(QWidget): def __init__(self, haemo_dict, cha_dict): super().__init__() self.setWindowTitle("FLARES Participant Brain Viewer") self.haemo_dict = haemo_dict self.cha_dict = cha_dict # Create mappings: file_path -> participant label and dropdown display text self.participant_map = {} # file_path -> "Participant 1" self.participant_dropdown_items = [] # "Participant 1 (filename)" for i, file_path in enumerate(self.haemo_dict.keys(), start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" self.participant_map[file_path] = short_label self.participant_dropdown_items.append(display_label) self.layout = QVBoxLayout(self) self.top_bar = QHBoxLayout() self.layout.addLayout(self.top_bar) self.participant_dropdown = self._create_multiselect_dropdown(self.participant_dropdown_items) self.participant_dropdown.currentIndexChanged.connect(self.update_participant_dropdown_label) self.event_dropdown = QComboBox() self.event_dropdown.addItem("") self.index_texts = [ "0 (Brain Landmarks)", "1 (Brain Activity Visualization)", # "2 (third image)", # "3 (fourth image)", ] self.image_index_dropdown = self._create_multiselect_dropdown(self.index_texts) self.image_index_dropdown.currentIndexChanged.connect(self.update_image_index_dropdown_label) self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.show_brain_images) self.top_bar.addWidget(QLabel("Participants:")) self.top_bar.addWidget(self.participant_dropdown) self.top_bar.addWidget(QLabel("Event:")) self.top_bar.addWidget(self.event_dropdown) self.top_bar.addWidget(QLabel("Image Indexes:")) self.top_bar.addWidget(self.image_index_dropdown) self.top_bar.addWidget(self.submit_button) self.scroll = QScrollArea() self.scroll.setWidgetResizable(True) self.scroll_content = QWidget() self.grid_layout = QGridLayout(self.scroll_content) self.scroll.setWidget(self.scroll_content) self.layout.addWidget(self.scroll) self.thumb_size = QSize(280, 180) self.showMaximized() def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) combo.setInsertPolicy(QComboBox.NoInsert) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) elif item == dummy_item: pass else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if combo == self.participant_dropdown: self.update_participant_dropdown_label() elif combo == self.image_index_dropdown: self.update_image_index_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) combo.setInsertPolicy(QComboBox.NoInsert) return combo def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) # Skip dummy and toggle items: if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_participant_dropdown_label(self): selected = self._get_checked_items(self.participant_dropdown) if not selected: self.participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] self.participant_dropdown.lineEdit().setText(", ".join(selected_short)) self._update_event_dropdown() def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: # Only show the index part index_labels = [s.split(" ")[0] for s in selected] self.image_index_dropdown.lineEdit().setText(", ".join(index_labels)) def _update_event_dropdown(self): selected_display_names = self._get_checked_items(self.participant_dropdown) selected_file_paths = [] for display_name in selected_display_names: for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break if not selected_file_paths: self.event_dropdown.clear() self.event_dropdown.addItem("") return annotation_sets = [] for file_path in selected_file_paths: raw = self.haemo_dict.get(file_path) if raw is None or not hasattr(raw, "annotations"): continue annotations = set(raw.annotations.description) annotation_sets.append(annotations) if not annotation_sets: self.event_dropdown.clear() self.event_dropdown.addItem("") return shared_annotations = set.intersection(*annotation_sets) self.event_dropdown.clear() self.event_dropdown.addItem("") for ann in sorted(shared_annotations): self.event_dropdown.addItem(ann) def show_brain_images(self): import flares selected_event = self.event_dropdown.currentText() if selected_event == "": selected_event = None selected_display_names = self._get_checked_items(self.participant_dropdown) selected_file_paths = [] for display_name in selected_display_names: for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break selected_indexes = [ int(s.split(" ")[0]) for s in self._get_checked_items(self.image_index_dropdown) ] parameterized_indexes = { 0: [ { "key": "show_optodes", "label": "Determine what is rendered above the brain. Valid values are 'sensors', 'labels', 'none', 'all'.", "default": "all", "type": str, }, { "key": "show_brodmann", "label": "Show common brodmann areas on the brain.", "default": "True", "type": bool, } ], 1: [ { "key": "show_optodes", "label": "Determine what is rendered above the brain. Valid values are 'sensors', 'labels', 'none', 'all'.", "default": "all", "type": str, }, { "key": "t_or_theta", "label": "Specify if t values or theta values should be plotted. Valid values are 't', 'theta'", "default": "theta", "type": str, }, { "key": "show_text", "label": "Display informative text on the top left corner. THIS DOES NOT WORK AND SHOULD BE LEFT AT FALSE", "default": "False", "type": bool, }, { "key": "brain_bounds", "label": "Graph Upper/Lower Limit", "default": "1.0", "type": float, } ], } # Inject full_text from index_texts for idx, params_list in parameterized_indexes.items(): full_text = self.index_texts[idx] if idx < len(self.index_texts) else f"{idx} (No label found)" for param_info in params_list: param_info["full_text"] = full_text indexes_needing_params = {idx: parameterized_indexes[idx] for idx in selected_indexes if idx in parameterized_indexes} param_values = {} if indexes_needing_params: dialog = ParameterInputDialog(indexes_needing_params, parent=self) if dialog.exec_() == QDialog.Accepted: param_values = dialog.get_values() if param_values is None: return else: return # Pass the necessary arguments to each method for file_path in selected_file_paths: haemo_obj = self.haemo_dict.get(file_path) if haemo_obj is None: continue cha = self.cha_dict.get(file_path) for idx in selected_indexes: if idx == 0: params = param_values.get(idx, {}) show_optodes = params.get("show_optodes", None) show_brodmann = params.get("show_brodmann", None) if show_optodes is None or show_brodmann is None: print(f"Missing parameters for index {idx}, skipping.") continue flares.brain_landmarks_3d(haemo_obj, show_optodes, show_brodmann) elif idx == 1: params = param_values.get(idx, {}) show_optodes = params.get("show_optodes", None) t_or_theta = params.get("t_or_theta", None) show_text = params.get("show_text", None) brain_bounds = params.get("brain_bounds", None) if show_optodes is None or t_or_theta is None or show_text is None or brain_bounds is None: print(f"Missing parameters for index {idx}, skipping.") continue flares.brain_3d_visualization(haemo_obj, cha, selected_event, t_or_theta=t_or_theta, show_optodes=show_optodes, show_text=show_text, brain_bounds=brain_bounds) else: print(f"No method defined for index {idx}") class ParticipantFoldChannelsWidget(QWidget): def __init__(self, haemo_dict, cha_dict): super().__init__() self.setWindowTitle("FLARES Participant Fold Channels Viewer") self.haemo_dict = haemo_dict self.cha_dict = cha_dict # Create mappings: file_path -> participant label and dropdown display text self.participant_map = {} # file_path -> "Participant 1" self.participant_dropdown_items = [] # "Participant 1 (filename)" for i, file_path in enumerate(self.haemo_dict.keys(), start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" self.participant_map[file_path] = short_label self.participant_dropdown_items.append(display_label) self.layout = QVBoxLayout(self) self.top_bar = QHBoxLayout() self.layout.addLayout(self.top_bar) self.participant_dropdown = self._create_multiselect_dropdown(self.participant_dropdown_items) self.participant_dropdown.currentIndexChanged.connect(self.update_participant_dropdown_label) self.index_texts = [ "0 (Fold Channels)", # "1 (second image)", # "2 (third image)", # "3 (fourth image)", ] self.image_index_dropdown = self._create_multiselect_dropdown(self.index_texts) self.image_index_dropdown.currentIndexChanged.connect(self.update_image_index_dropdown_label) self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.show_fold_images) self.top_bar.addWidget(QLabel("Participants:")) self.top_bar.addWidget(self.participant_dropdown) self.top_bar.addWidget(QLabel("Fold Type:")) self.top_bar.addWidget(self.image_index_dropdown) self.top_bar.addWidget(QLabel("This will cause the app to hang for ~30s/Participant!")) self.top_bar.addWidget(self.submit_button) self.scroll = QScrollArea() self.scroll.setWidgetResizable(True) self.scroll_content = QWidget() self.grid_layout = QGridLayout(self.scroll_content) self.scroll.setWidget(self.scroll_content) self.layout.addWidget(self.scroll) self.thumb_size = QSize(280, 180) self.showMaximized() def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) combo.setInsertPolicy(QComboBox.NoInsert) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) elif item == dummy_item: pass else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if combo == self.participant_dropdown: self.update_participant_dropdown_label() elif combo == self.image_index_dropdown: self.update_image_index_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) combo.setInsertPolicy(QComboBox.NoInsert) return combo def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) # Skip dummy and toggle items: if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_participant_dropdown_label(self): selected = self._get_checked_items(self.participant_dropdown) if not selected: self.participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] self.participant_dropdown.lineEdit().setText(", ".join(selected_short)) def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: # Only show the index part index_labels = [s.split(" ")[0] for s in selected] self.image_index_dropdown.lineEdit().setText(", ".join(index_labels)) def show_fold_images(self): import flares selected_display_names = self._get_checked_items(self.participant_dropdown) selected_file_paths = [] for display_name in selected_display_names: for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break selected_indexes = [ int(s.split(" ")[0]) for s in self._get_checked_items(self.image_index_dropdown) ] # Pass the necessary arguments to each method for file_path in selected_file_paths: haemo_obj = self.haemo_dict.get(file_path) if haemo_obj is None: continue #cha = self.cha_dict.get(file_path) for idx in selected_indexes: if idx == 0: flares.fold_channels(haemo_obj) else: print(f"No method defined for index {idx}") class ExportDataAsCSVViewerWidget(QWidget): def __init__(self, haemo_dict, cha_dict, df_ind, design_matrix, group, contrast_results_dict): super().__init__() self.setWindowTitle("FLARES Export Data As CSV Viewer") self.haemo_dict = haemo_dict self.cha_dict = cha_dict self.df_ind = df_ind self.design_matrix = design_matrix self.group = group self.contrast_results_dict = contrast_results_dict # Create mappings: file_path -> participant label and dropdown display text self.participant_map = {} # file_path -> "Participant 1" self.participant_dropdown_items = [] # "Participant 1 (filename)" for i, file_path in enumerate(self.haemo_dict.keys(), start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" self.participant_map[file_path] = short_label self.participant_dropdown_items.append(display_label) self.layout = QVBoxLayout(self) self.top_bar = QHBoxLayout() self.layout.addLayout(self.top_bar) self.participant_dropdown = self._create_multiselect_dropdown(self.participant_dropdown_items) self.participant_dropdown.currentIndexChanged.connect(self.update_participant_dropdown_label) self.index_texts = [ "0 (Export Data to CSV)", # "1 (second image)", # "2 (third image)", # "3 (fourth image)", ] self.image_index_dropdown = self._create_multiselect_dropdown(self.index_texts) self.image_index_dropdown.currentIndexChanged.connect(self.update_image_index_dropdown_label) self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.generate_and_save_csv) self.top_bar.addWidget(QLabel("Participants:")) self.top_bar.addWidget(self.participant_dropdown) self.top_bar.addWidget(QLabel("Export Type:")) self.top_bar.addWidget(self.image_index_dropdown) self.top_bar.addWidget(self.submit_button) self.scroll = QScrollArea() self.scroll.setWidgetResizable(True) self.scroll_content = QWidget() self.grid_layout = QGridLayout(self.scroll_content) self.scroll.setWidget(self.scroll_content) self.layout.addWidget(self.scroll) self.thumb_size = QSize(280, 180) self.showMaximized() def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) combo.setInsertPolicy(QComboBox.NoInsert) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) elif item == dummy_item: pass else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if combo == self.participant_dropdown: self.update_participant_dropdown_label() elif combo == self.image_index_dropdown: self.update_image_index_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) combo.setInsertPolicy(QComboBox.NoInsert) return combo def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) # Skip dummy and toggle items: if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_participant_dropdown_label(self): selected = self._get_checked_items(self.participant_dropdown) if not selected: self.participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] self.participant_dropdown.lineEdit().setText(", ".join(selected_short)) def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: # Only show the index part index_labels = [s.split(" ")[0] for s in selected] self.image_index_dropdown.lineEdit().setText(", ".join(index_labels)) def generate_and_save_csv(self): selected_display_names = self._get_checked_items(self.participant_dropdown) selected_file_paths = [] for display_name in selected_display_names: for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break selected_indexes = [ int(s.split(" ")[0]) for s in self._get_checked_items(self.image_index_dropdown) ] # Pass the necessary arguments to each method for file_path in selected_file_paths: haemo_obj = self.haemo_dict.get(file_path) if haemo_obj is None: continue cha = self.cha_dict.get(file_path) for idx in selected_indexes: if idx == 0: try: suggested_name = f"{file_path}.csv" # Open save dialog save_path, _ = QFileDialog.getSaveFileName( self, "Save SNIRF File As", suggested_name, "CSV Files (*.csv)" ) if not save_path: print("Save cancelled.") return if not save_path.lower().endswith(".csv"): save_path += ".csv" # Save the CSV here cha.to_csv(save_path) QMessageBox.information(self, "Success", "CSV file has been saved.") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to update SNIRF file:\n{e}") else: print(f"No method defined for index {idx}") class ClickableLabel(QLabel): def __init__(self, full_pixmap: QPixmap, thumbnail_pixmap: QPixmap): super().__init__() self._pixmap_full = full_pixmap self.setPixmap(thumbnail_pixmap) self.setAlignment(Qt.AlignmentFlag.AlignCenter) self.setFixedSize(thumbnail_pixmap.size()) self.setStyleSheet("border: 1px solid gray; margin: 2px;") def mousePressEvent(self, event): viewer = QWidget() viewer.setWindowTitle("Expanded View") layout = QVBoxLayout(viewer) label = QLabel() label.setPixmap(self._pixmap_full) label.setAlignment(Qt.AlignmentFlag.AlignCenter) layout.addWidget(label) viewer.resize(1000, 800) viewer.show() self._expanded_viewer = viewer # keep reference alive class ParameterInputDialog(QDialog): def __init__(self, params_dict, parent=None): """ params_dict format: { idx: [ { "key": "p_val", "label": "Significance threshold P-value (e.g. 0.05)", "default": "0.05", "type": float, }, { "key": "graph_scale", "label": "Graph scale factor", "default": "1", "type": int, } ], ... } """ super().__init__(parent) self.setWindowTitle("Input Parameters") self.params_dict = params_dict self.inputs = {} # {(idx, param_key): QLineEdit} layout = QVBoxLayout(self) intro_label = QLabel( "Some methods require parameters to continue:\n" "Clicking OK will simply use default values if input is left empty." ) layout.addWidget(intro_label) for idx, param_list in params_dict.items(): full_text = param_list[0].get('full_text', f"Index [{idx}]") group_label = QLabel(f"{full_text} requires parameters:") group_label.setStyleSheet("font-weight: bold; margin-top: 10px;") layout.addWidget(group_label) for param_info in param_list: label = QLabel(param_info["label"]) layout.addWidget(label) line_edit = QLineEdit(self) line_edit.setPlaceholderText(str(param_info.get("default", ""))) layout.addWidget(line_edit) self.inputs[(idx, param_info["key"])] = line_edit # Buttons btn_layout = QHBoxLayout() ok_btn = QPushButton("OK", self) cancel_btn = QPushButton("Cancel", self) btn_layout.addWidget(ok_btn) btn_layout.addWidget(cancel_btn) layout.addLayout(btn_layout) ok_btn.clicked.connect(self.accept) cancel_btn.clicked.connect(self.reject) def get_values(self): """ Validate and return values dict in form: { idx: { param_key: value, ... }, ... } Returns None if validation fails (error dialog shown). """ values = {} for (idx, param_key), line_edit in self.inputs.items(): text = line_edit.text().strip() # Find param info dict param_info = None for p in self.params_dict[idx]: if p['key'] == param_key: param_info = p break if param_info is None: # This shouldn't happen, but just in case: self._show_error(f"Internal error: No param info for index {idx} key '{param_key}'") return None if not text: text = str(param_info.get('default', '')) param_type = param_info.get('type', str) try: if param_type == int: val = int(text) elif param_type == float: val = float(text) elif param_type == bool: # Convert common bool strings to bool val_lower = text.lower() if val_lower in ('true', '1', 'yes', 'y'): val = True elif val_lower in ('false', '0', 'no', 'n'): val = False else: raise ValueError(f"Invalid bool value: {text}") elif param_type == str: val = text else: val = text # fallback except (ValueError, TypeError): self._show_error( f"Invalid input for index {idx} parameter '{param_key}': '{text}'\n" f"Expected type: {param_type.__name__}" ) return None if idx not in values: values[idx] = {} values[idx][param_key] = val return values def _show_error(self, message): error_box = QMessageBox(self) error_box.setIcon(QMessageBox.Critical) error_box.setWindowTitle("Input Error") error_box.setText(message) error_box.exec_() class GroupViewerWidget(QWidget): def __init__(self, haemo_dict, cha, df_ind, design_matrix, contrast_results, group): super().__init__() self.setWindowTitle("FLARES Group Viewer") self.haemo_dict = haemo_dict self.cha = cha self.df_ind = df_ind self.design_matrix = design_matrix self.contrast_results = contrast_results self.group = group # Create mappings: file_path -> participant label and dropdown display text self.participant_map = {} # file_path -> "Participant 1" self.participant_dropdown_items = [] # "Participant 1 (filename)" for i, file_path in enumerate(self.haemo_dict.keys(), start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" self.participant_map[file_path] = short_label self.participant_dropdown_items.append(display_label) self.layout = QVBoxLayout(self) self.top_bar = QHBoxLayout() self.layout.addLayout(self.top_bar) self.group_to_paths = {} for file_path, group_name in self.group.items(): self.group_to_paths.setdefault(group_name, []).append(file_path) self.group_names = sorted(self.group_to_paths.keys()) self.group_dropdown = QComboBox() self.group_dropdown.addItem("") self.group_dropdown.addItems(self.group_names) self.group_dropdown.setCurrentIndex(0) self.group_dropdown.currentIndexChanged.connect(self.update_participant_list_for_group) self.participant_dropdown = self._create_multiselect_dropdown(self.participant_dropdown_items) self.participant_dropdown.currentIndexChanged.connect(self.update_participant_dropdown_label) self.participant_dropdown.setEnabled(False) self.event_dropdown = QComboBox() self.event_dropdown.addItem("") self.index_texts = [ "0 (GLM Results)", "1 (Significance)", "2 (Brain Activity Visualization)", # "3 (fourth image)", ] self.image_index_dropdown = self._create_multiselect_dropdown(self.index_texts) self.image_index_dropdown.currentIndexChanged.connect(self.update_image_index_dropdown_label) self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.show_brain_images) self.top_bar.addWidget(QLabel("Group:")) self.top_bar.addWidget(self.group_dropdown) self.top_bar.addWidget(QLabel("Participants:")) self.top_bar.addWidget(self.participant_dropdown) self.top_bar.addWidget(QLabel("Event:")) self.top_bar.addWidget(self.event_dropdown) self.top_bar.addWidget(QLabel("Image Indexes:")) self.top_bar.addWidget(self.image_index_dropdown) self.top_bar.addWidget(self.submit_button) self.scroll = QScrollArea() self.scroll.setWidgetResizable(True) self.scroll_content = QWidget() self.grid_layout = QGridLayout(self.scroll_content) self.scroll.setWidget(self.scroll_content) self.layout.addWidget(self.scroll) self.thumb_size = QSize(280, 180) self.showMaximized() def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) combo.setInsertPolicy(QComboBox.NoInsert) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) elif item == dummy_item: pass else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if combo == self.participant_dropdown: self.update_participant_dropdown_label() elif combo == self.image_index_dropdown: self.update_image_index_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) combo.setInsertPolicy(QComboBox.NoInsert) return combo def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) # Skip dummy and toggle items: if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_participant_list_for_group(self): selected_group = self.group_dropdown.currentText() model = self.participant_dropdown.model() model.clear() self.participant_map.clear() # Add dummy and toggle select all items again dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) if selected_group == "": # Disable participant dropdown when no group selected self.participant_dropdown.setEnabled(False) self.update_participant_dropdown_label() return # Enable participant dropdown since a valid group is selected self.participant_dropdown.setEnabled(True) group_file_paths = self.group_to_paths.get(selected_group, []) for i, file_path in enumerate(group_file_paths, start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" self.participant_map[file_path] = short_label item = QStandardItem(display_label) item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(item) self._connect_select_all_toggle(toggle_item, model) self.update_participant_dropdown_label() def _connect_select_all_toggle(self, toggle_item, model): """Helper function to connect the Select All functionality.""" normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if self.participant_dropdown: self.update_participant_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) def update_participant_dropdown_label(self): selected = self._get_checked_items(self.participant_dropdown) if not selected: self.participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] self.participant_dropdown.lineEdit().setText(", ".join(selected_short)) self._update_event_dropdown() def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: # Only show the index part index_labels = [s.split(" ")[0] for s in selected] self.image_index_dropdown.lineEdit().setText(", ".join(index_labels)) def _update_event_dropdown(self): selected_display_names = self._get_checked_items(self.participant_dropdown) selected_file_paths = [] for display_name in selected_display_names: for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break if not selected_file_paths: self.event_dropdown.clear() self.event_dropdown.addItem("") return annotation_sets = [] for file_path in selected_file_paths: raw = self.haemo_dict.get(file_path) if raw is None or not hasattr(raw, "annotations"): continue annotations = set(raw.annotations.description) annotation_sets.append(annotations) if not annotation_sets: self.event_dropdown.clear() self.event_dropdown.addItem("") return shared_annotations = set.intersection(*annotation_sets) self.event_dropdown.clear() self.event_dropdown.addItem("") for ann in sorted(shared_annotations): self.event_dropdown.addItem(ann) def show_brain_images(self): import flares selected_event = self.event_dropdown.currentText() if selected_event == "": selected_event = None selected_display_names = self._get_checked_items(self.participant_dropdown) selected_file_paths = [] for display_name in selected_display_names: for fp, short_label in self.participant_map.items(): expected_display = f"{short_label} ({os.path.basename(fp)})" if display_name == expected_display: selected_file_paths.append(fp) break selected_indexes = [ int(s.split(" ")[0]) for s in self._get_checked_items(self.image_index_dropdown) ] if not selected_file_paths: print("No participants selected.") return # Only keep indexes 0 and 1 that need parameters parameterized_indexes = { 0: [ { "key": "lower_bound", "label": "Lower bound + ", "default": "-0.3", "type": float, # specify int here }, { "key": "upper_bound", "label": "Upper bound + ", "default": "0.8", "type": float, # specify int here } ], 1: [ { "key": "p_value", "label": "Significance threshold P-value (e.g. 0.05)", "default": "0.05", "type": float, }, { "key": "graph_bounds", "label": "Graph Upper/Lower Limit", "default": "3.0", "type": float, } ], 2: [ { "key": "show_optodes", "label": "Determine what is rendered above the brain. Valid values are 'sensors', 'labels', 'none', 'all'.", "default": "all", "type": str, }, { "key": "t_or_theta", "label": "Specify if t values or theta values should be plotted. Valid values are 't', 'theta'", "default": "theta", "type": str, }, { "key": "show_text", "label": "Display informative text on the top left corner. THIS DOES NOT WORK AND SHOULD BE LEFT AT FALSE", "default": "False", "type": bool, }, { "key": "brain_bounds", "label": "Graph Upper/Lower Limit", "default": "1.0", "type": float, } ], } # Inject full_text from index_texts for idx, params_list in parameterized_indexes.items(): full_text = self.index_texts[idx] if idx < len(self.index_texts) else f"{idx} (No label found)" for param_info in params_list: param_info["full_text"] = full_text indexes_needing_params = {idx: parameterized_indexes[idx] for idx in selected_indexes if idx in parameterized_indexes} param_values = {} if indexes_needing_params: dialog = ParameterInputDialog(indexes_needing_params, parent=self) if dialog.exec_() == QDialog.Accepted: param_values = dialog.get_values() if param_values is None: return else: return all_cha = pd.DataFrame() for file_path in selected_file_paths: haemo_obj = self.haemo_dict.get(file_path) if haemo_obj is None: continue cha_df = self.cha.get(file_path) if cha_df is not None: all_cha = pd.concat([all_cha, cha_df], ignore_index=True) # Pass the necessary arguments to each method file_path = selected_file_paths[0] p_haemo = self.haemo_dict.get(file_path) p_design_matrix = self.design_matrix.get(file_path) df_group = pd.DataFrame() if selected_file_paths: for file_path in selected_file_paths: df = self.df_ind.get(file_path) if df is not None: df_group = pd.concat([df_group, df], ignore_index=True) for idx in selected_indexes: if idx == 0: params = param_values.get(idx, {}) lower_bound = params.get("lower_bound", None) upper_bound = params.get("upper_bound", None) if lower_bound is None or upper_bound is None: print(f"Missing parameters for index {idx}, skipping.") continue flares.plot_fir_model_results(df_group, p_haemo, p_design_matrix, selected_event, lower_bound, upper_bound) elif idx == 1: params = param_values.get(idx, {}) p_val = params.get("p_value", None) graph_bounds = params.get("graph_bounds", None) if p_val is None or graph_bounds is None: print(f"Missing parameters for index {idx}, skipping.") continue all_contrasts = [] for fp in selected_file_paths: condition_dfs = self.contrast_results.get(fp, {}) if selected_event in condition_dfs: df = condition_dfs[selected_event].copy() df["ID"] = fp all_contrasts.append(df) if not all_contrasts: print("No contrast data found for selected participants and event.") return df_contrasts = pd.concat(all_contrasts, ignore_index=True) flares.run_second_level_analysis(df_contrasts, p_haemo, p_val, graph_bounds) elif idx == 2: params = param_values.get(idx, {}) show_optodes = params.get("show_optodes", None) t_or_theta = params.get("t_or_theta", None) show_text = params.get("show_text", None) brain_bounds = params.get("brain_bounds", None) if show_optodes is None or t_or_theta is None or show_text is None or brain_bounds is None: print(f"Missing parameters for index {idx}, skipping.") continue flares.brain_3d_visualization(haemo_obj, all_cha, selected_event, t_or_theta=t_or_theta, show_optodes=show_optodes, show_text=show_text, brain_bounds=brain_bounds) elif idx == 3: pass else: print(f"No method defined for index {idx}") class GroupBrainViewerWidget(QWidget): def __init__(self, haemo_dict, df_ind, design_matrix, group, contrast_results_dict): super().__init__() self.setWindowTitle("Group Brain Viewer") self.haemo_dict = haemo_dict self.df_ind = df_ind self.design_matrix = design_matrix self.group = group self.contrast_results_dict = contrast_results_dict self.group_to_paths = {} for file_path, group_name in self.group.items(): self.group_to_paths.setdefault(group_name, []).append(file_path) self.group_names = sorted(self.group_to_paths.keys()) self.layout = QVBoxLayout(self) self.top_bar = QHBoxLayout() self.layout.addLayout(self.top_bar) self.group_a_dropdown = QComboBox() self.group_a_dropdown.addItem("") self.group_a_dropdown.addItems(self.group_names) self.group_a_dropdown.currentIndexChanged.connect(self._update_group_a_options) self.group_b_dropdown = QComboBox() self.group_b_dropdown.addItem("") self.group_b_dropdown.addItems(self.group_names) self.group_b_dropdown.currentIndexChanged.connect(self._update_group_b_options) self.event_dropdown = QComboBox() self.event_dropdown.addItem("") self.participant_dropdown_a = self._create_multiselect_dropdown([]) self.participant_dropdown_a.lineEdit().setPlaceholderText("Select participants (Group A)") self.participant_dropdown_a.model().itemChanged.connect(self._on_participants_changed) self.participant_dropdown_b = self._create_multiselect_dropdown([]) self.participant_dropdown_b.lineEdit().setPlaceholderText("Select participants (Group B)") self.participant_dropdown_b.model().itemChanged.connect(self._on_participants_changed) self.index_texts = [ "0 (Contrast Image)", # "1 (3D Brain Contrast)", # "2 (third image)", # "3 (fourth image)", ] self.image_index_dropdown = self._create_multiselect_dropdown(self.index_texts) self.image_index_dropdown.currentIndexChanged.connect(self.update_image_index_dropdown_label) self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.show_brain_images) self.top_bar.addWidget(QLabel("Group A:")) self.top_bar.addWidget(self.group_a_dropdown) self.top_bar.addWidget(QLabel("Participants (Group A):")) self.top_bar.addWidget(self.participant_dropdown_a) self.top_bar.addWidget(QLabel("Group B:")) self.top_bar.addWidget(self.group_b_dropdown) self.top_bar.addWidget(QLabel("Participants (Group B):")) self.top_bar.addWidget(self.participant_dropdown_b) self.top_bar.addWidget(QLabel("Event:")) self.top_bar.addWidget(self.event_dropdown) self.top_bar.addWidget(QLabel("Image Indexes:")) self.top_bar.addWidget(self.image_index_dropdown) self.top_bar.addWidget(self.submit_button) self.scroll = QScrollArea() self.scroll.setWidgetResizable(True) self.scroll_content = QWidget() self.grid_layout = QGridLayout(self.scroll_content) self.scroll.setWidget(self.scroll_content) self.layout.addWidget(self.scroll) self.thumb_size = QSize(280, 180) self.showMaximized() def _update_group_b_options(self): selected = self.group_a_dropdown.currentText() self._refresh_group_dropdown(self.group_b_dropdown, exclude=selected) self._update_event_dropdown() group_b = self.group_b_dropdown.currentText() self.update_participant_list_for_group(group_b, self.participant_dropdown_b) def _update_group_a_options(self): selected = self.group_b_dropdown.currentText() self._refresh_group_dropdown(self.group_a_dropdown, exclude=selected) self._update_event_dropdown() group_a = self.group_a_dropdown.currentText() self.update_participant_list_for_group(group_a, self.participant_dropdown_a) def update_participant_list_for_group(self, group_name: str, participant_dropdown: FullClickComboBox): model = participant_dropdown.model() model.clear() # Maintain separate participant maps for A and B to avoid conflicts if participant_dropdown == self.participant_dropdown_a: participant_map = self.participant_map_a = {} else: participant_map = self.participant_map_b = {} # Add dummy and toggle select all items again dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) if group_name == "": participant_dropdown.setEnabled(False) self._update_participant_dropdown_label(participant_dropdown) return participant_dropdown.setEnabled(True) group_file_paths = self.group_to_paths.get(group_name, []) for i, file_path in enumerate(group_file_paths, start=1): short_label = f"Participant {i}" display_label = f"{short_label} ({os.path.basename(file_path)})" participant_map[file_path] = short_label item = QStandardItem(display_label) item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(item) self._connect_select_all_toggle(toggle_item, model) self._update_participant_dropdown_label(participant_dropdown) def _update_participant_dropdown_label(self, participant_dropdown): selected = self._get_checked_items(participant_dropdown) if not selected: participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] participant_dropdown.lineEdit().setText(", ".join(selected_short)) self._update_event_dropdown() def _connect_select_all_toggle(self, toggle_item, model): normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle def on_item_changed(item): if getattr(self, "_updating_checkstates", False): return self._updating_checkstates = True if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) else: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text for participant dropdowns if hasattr(self, 'participant_dropdown_a') and model == self.participant_dropdown_a.model(): self._update_participant_dropdown_label(self.participant_dropdown_a) elif hasattr(self, 'participant_dropdown_b') and model == self.participant_dropdown_b.model(): self._update_participant_dropdown_label(self.participant_dropdown_b) self._updating_checkstates = False model.itemChanged.connect(on_item_changed) def _on_participants_changed(self, item=None): self._update_event_dropdown() def _update_event_dropdown(self): participants_a = self._get_checked_items(self.participant_dropdown_a) participants_b = self._get_checked_items(self.participant_dropdown_b) if not participants_a or not participants_b: self.event_dropdown.clear() self.event_dropdown.addItem("") return selected_file_paths_a = [ fp for display_name in participants_a for fp, short_label in self.participant_map_a.items() if display_name == f"{short_label} ({os.path.basename(fp)})" ] selected_file_paths_b = [ fp for display_name in participants_b for fp, short_label in self.participant_map_b.items() if display_name == f"{short_label} ({os.path.basename(fp)})" ] all_selected_file_paths = set(selected_file_paths_a + selected_file_paths_b) if not all_selected_file_paths: self.event_dropdown.clear() self.event_dropdown.addItem("") return annotation_sets = [] for file_path in all_selected_file_paths: raw = self.haemo_dict.get(file_path) if raw is None or not hasattr(raw, "annotations"): continue annotation_sets.append(set(raw.annotations.description)) if not annotation_sets: self.event_dropdown.clear() self.event_dropdown.addItem("") return shared_annotations = set.intersection(*annotation_sets) self.event_dropdown.clear() self.event_dropdown.addItem("") for ann in sorted(shared_annotations): self.event_dropdown.addItem(ann) def _refresh_group_dropdown(self, dropdown, exclude): current = dropdown.currentText() dropdown.blockSignals(True) dropdown.clear() dropdown.addItem("") for group in self.group_names: if group != exclude: dropdown.addItem(group) # Restore previous selection if still valid if current != "" and current != exclude and dropdown.findText(current) != -1: dropdown.setCurrentText(current) else: dropdown.setCurrentIndex(0) # Reset to "" dropdown.blockSignals(False) def _create_multiselect_dropdown(self, items): combo = FullClickComboBox() combo.setView(QListView()) model = QStandardItemModel() combo.setModel(model) combo.setEditable(True) combo.lineEdit().setReadOnly(True) combo.lineEdit().setPlaceholderText("Select...") dummy_item = QStandardItem("") dummy_item.setFlags(Qt.ItemIsEnabled) model.appendRow(dummy_item) toggle_item = QStandardItem("Toggle Select All") toggle_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) toggle_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(toggle_item) for item in items: standard_item = QStandardItem(item) standard_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled) standard_item.setData(Qt.Unchecked, Qt.CheckStateRole) model.appendRow(standard_item) def on_view_clicked(index): item = model.itemFromIndex(index) if item.isCheckable(): new_state = Qt.Checked if item.checkState() == Qt.Unchecked else Qt.Unchecked item.setCheckState(new_state) combo.view().pressed.connect(on_view_clicked) self._updating_checkstates = False def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True normal_items = [model.item(i) for i in range(2, model.rowCount())] if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) for i in normal_items: i.setCheckState(Qt.Unchecked if all_checked else Qt.Checked) toggle_item.setCheckState(Qt.Unchecked if all_checked else Qt.Checked) toggle_item.setCheckState(Qt.Checked if all(i.checkState() == Qt.Checked for i in normal_items) else Qt.Unchecked) self.update_image_index_dropdown_label() self._updating_checkstates = False model.itemChanged.connect(on_item_changed) return combo def _get_checked_items(self, combo): checked = [] model = combo.model() for i in range(model.rowCount()): item = model.item(i) if item.text() in ("", "Toggle Select All"): continue if item.checkState() == Qt.Checked: checked.append(item.text()) return checked def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: index_labels = [s.split(" ")[0] for s in selected] self.image_index_dropdown.lineEdit().setText(", ".join(index_labels)) def _connect_select_all_toggle(self, toggle_item, model): """Helper function to connect the Select All functionality.""" normal_items = [model.item(i) for i in range(2, model.rowCount())] # skip dummy and toggle def on_item_changed(item): if self._updating_checkstates: return self._updating_checkstates = True if item == toggle_item: all_checked = all(i.checkState() == Qt.Checked for i in normal_items) if all_checked: for i in normal_items: i.setCheckState(Qt.Unchecked) toggle_item.setCheckState(Qt.Unchecked) else: for i in normal_items: i.setCheckState(Qt.Checked) toggle_item.setCheckState(Qt.Checked) else: # When normal items change, update toggle item all_checked = all(i.checkState() == Qt.Checked for i in normal_items) toggle_item.setCheckState(Qt.Checked if all_checked else Qt.Unchecked) # Update label text immediately after change if hasattr(self, 'participant_dropdown_a') and model == self.participant_dropdown_a.model(): self._update_participant_dropdown_label(self.participant_dropdown_a) elif hasattr(self, 'participant_dropdown_b') and model == self.participant_dropdown_b.model(): self._update_participant_dropdown_label(self.participant_dropdown_b) self._updating_checkstates = False model.itemChanged.connect(on_item_changed) def update_participant_dropdown_label(self): selected = self._get_checked_items(self.participant_dropdown) if not selected: self.participant_dropdown.lineEdit().setText("") else: # Extract just "Participant N" from "Participant N (filename)" selected_short = [s.split(" ")[0] + " " + s.split(" ")[1] for s in selected] self.participant_dropdown.lineEdit().setText(", ".join(selected_short)) def update_image_index_dropdown_label(self): selected = self._get_checked_items(self.image_index_dropdown) if not selected: self.image_index_dropdown.lineEdit().setText("") else: # Only show the index part index_labels = [s.split(" ")[0] for s in selected] self.image_index_dropdown.lineEdit().setText(", ".join(index_labels)) def _get_file_paths_from_labels(self, labels, group_name): file_paths = [] if group_name == self.group_a_dropdown.currentText(): participant_map = self.participant_map_a elif group_name == self.group_b_dropdown.currentText(): participant_map = self.participant_map_b else: return [] # Reverse map: display label -> file path reverse_map = { f"{label} ({os.path.basename(fp)})": fp for fp, label in participant_map.items() } for label in labels: file_path = reverse_map.get(label) if file_path: file_paths.append(file_path) return file_paths def show_brain_images(self): import flares selected_event = self.event_dropdown.currentText() if selected_event == "": selected_event = None # Group A participants_a = self._get_checked_items(self.participant_dropdown_a) file_paths_a = self._get_file_paths_from_labels(participants_a, self.group_a_dropdown.currentText()) # Group B participants_b = self._get_checked_items(self.participant_dropdown_b) file_paths_b = self._get_file_paths_from_labels(participants_b, self.group_b_dropdown.currentText()) selected_indexes = [ int(s.split(" ")[0]) for s in self._get_checked_items(self.image_index_dropdown) ] parameterized_indexes = { 0: [ { "key": "show_optodes", "label": "Determine what is rendered above the brain. Valid values are 'sensors', 'labels', 'none', 'all'.", "default": "all", "type": str, }, { "key": "t_or_theta", "label": "Specify if t values or theta values should be plotted. Valid values are 't', 'theta'", "default": "theta", "type": str, }, { "key": "show_text", "label": "Display informative text on the top left corner about the contrast.", "default": "True", "type": bool, }, { "key": "brain_bounds", "label": "Graph Upper/Lower Limit", "default": "1.0", "type": float, }, { "key": "is_3d", "label": "Should we display the results in a 3D interactive window?", "default": "True", "type": bool, } ], } # Inject full_text from index_texts for idx, params_list in parameterized_indexes.items(): full_text = self.index_texts[idx] if idx < len(self.index_texts) else f"{idx} (No label found)" for param_info in params_list: param_info["full_text"] = full_text indexes_needing_params = {idx: parameterized_indexes[idx] for idx in selected_indexes if idx in parameterized_indexes} param_values = {} if indexes_needing_params: dialog = ParameterInputDialog(indexes_needing_params, parent=self) if dialog.exec_() == QDialog.Accepted: param_values = dialog.get_values() if param_values is None: return else: return # Build group-level contrast DataFrames def concat_group_contrasts(file_paths: list[str], event: str | None) -> pd.DataFrame: group_df = pd.DataFrame() for fp in file_paths: print(f"Looking up contrast for: {fp}") event_con_dict = self.contrast_results_dict.get(fp, {}) print("Available events for this file:", list(event_con_dict.keys())) if event and event in event_con_dict: df = event_con_dict[event] print(f"Appending contrast df for event: {event}") group_df = pd.concat([group_df, df], ignore_index=True) else: print(f"Event '{event}' not found for {fp}") return group_df print("Selected event:", selected_event) print("File paths A:", file_paths_a) print("File paths B:", file_paths_b) contrast_df_a = concat_group_contrasts(file_paths_a, selected_event) contrast_df_b = concat_group_contrasts(file_paths_b, selected_event) print("contrast_df_a empty?", contrast_df_a.empty) print("contrast_df_b empty?", contrast_df_b.empty) # Get one person for their layout rep_raw = None for fp in file_paths_a + file_paths_b: rep_raw = self.haemo_dict.get(fp) if rep_raw: break print(rep_raw) # Visualizations for idx in selected_indexes: if idx == 0: params = param_values.get(idx, {}) show_optodes = params.get("show_optodes", None) t_or_theta = params.get("t_or_theta", None) show_text = params.get("show_text", None) brain_bounds = params.get("brain_bounds", None) is_3d = params.get("is_3d", None) if show_optodes is None or t_or_theta is None or show_text is None or brain_bounds is None or is_3d is None: print(f"Missing parameters for index {idx}, skipping.") continue if not contrast_df_a.empty and not contrast_df_b.empty and rep_raw: flares.plot_2d_3d_contrasts_between_groups( contrast_df_a, contrast_df_b, raw_haemo=rep_raw, group_a_name=self.group_a_dropdown.currentText(), group_b_name=self.group_b_dropdown.currentText(), is_3d=is_3d, t_or_theta=t_or_theta, show_optodes=show_optodes, show_text=show_text, brain_bounds=brain_bounds ) else: print("no") class ViewerLauncherWidget(QWidget): def __init__(self, haemo_dict, fig_bytes_dict, cha_dict, contrast_results_dict, df_ind, design_matrix, group): super().__init__() self.setWindowTitle("Viewer Launcher") layout = QVBoxLayout(self) btn1 = QPushButton("Open Participant Viewer") btn1.clicked.connect(lambda: self.open_participant_viewer(haemo_dict, fig_bytes_dict)) btn2 = QPushButton("Open Participant Brain Viewer") btn2.clicked.connect(lambda: self.open_participant_brain_viewer(haemo_dict, cha_dict)) btn3 = QPushButton("Open Participant Fold Channels Viewer") btn3.clicked.connect(lambda: self.open_participant_fold_channels_viewer(haemo_dict, cha_dict)) btn4 = QPushButton("Open Inter-Group Viewer") btn4.clicked.connect(lambda: self.open_group_viewer(haemo_dict, cha_dict, df_ind, design_matrix, contrast_results_dict, group)) btn5 = QPushButton("Open Cross Group Brain Viewer") btn5.clicked.connect(lambda: self.open_group_brain_viewer(haemo_dict, df_ind, design_matrix, group, contrast_results_dict)) btn6 = QPushButton("Open Export Data As CSV Viewer") btn6.clicked.connect(lambda: self.open_export_data_as_csv_viewer(haemo_dict, cha_dict, df_ind, design_matrix, group, contrast_results_dict)) layout.addWidget(btn1) layout.addWidget(btn2) layout.addWidget(btn3) layout.addWidget(btn4) layout.addWidget(btn5) layout.addWidget(btn6) def open_participant_viewer(self, haemo_dict, fig_bytes_dict): self.participant_viewer = ParticipantViewerWidget(haemo_dict, fig_bytes_dict) self.participant_viewer.show() def open_participant_brain_viewer(self, haemo_dict, cha_dict): self.participant_brain_viewer = ParticipantBrainViewerWidget(haemo_dict, cha_dict) self.participant_brain_viewer.show() def open_participant_fold_channels_viewer(self, haemo_dict, cha_dict): self.participant_fold_channels_viewer = ParticipantFoldChannelsWidget(haemo_dict, cha_dict) self.participant_fold_channels_viewer.show() def open_group_viewer(self, haemo_dict, cha_dict, df_ind, design_matrix, contrast_results_dict, group): self.participant_brain_viewer = GroupViewerWidget(haemo_dict, cha_dict, df_ind, design_matrix, contrast_results_dict, group) self.participant_brain_viewer.show() def open_group_brain_viewer(self, haemo_dict, df_ind, design_matrix, group, contrast_results_dict): self.participant_brain_viewer = GroupBrainViewerWidget(haemo_dict, df_ind, design_matrix, group, contrast_results_dict) self.participant_brain_viewer.show() def open_export_data_as_csv_viewer(self, haemo_dict, cha_dict, df_ind, design_matrix, group, contrast_results_dict): self.export_data_as_csv_viewer = ExportDataAsCSVViewerWidget(haemo_dict, cha_dict, df_ind, design_matrix, group, contrast_results_dict) self.export_data_as_csv_viewer.show() class MainApplication(QMainWindow): """ Main application window that creates and sets up the UI. """ progress_update_signal = Signal(str, int) def __init__(self): super().__init__() self.setWindowTitle("FLARES") self.setGeometry(100, 100, 1280, 720) self.about = None self.help = None self.optodes = None self.events = None self.terminal = None self.bubble_widgets = {} self.param_sections = [] self.folder_paths = [] self.section_widget = None self.first_run = True self.init_ui() self.create_menu_bar() self.platform_suffix = "-" + PLATFORM_NAME self.pending_update_version = None self.pending_update_path = None self.last_clicked_bubble = None self.installEventFilter(self) self.file_metadata = {} self.current_file = None # Start local pending update check thread self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix) self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) self.local_check_thread.start() def init_ui(self): # Central widget and main horizontal layout central = QWidget() self.setCentralWidget(central) main_layout = QHBoxLayout() central.setLayout(main_layout) # Left container with vertical layout: top left + bottom left left_container = QWidget() left_layout = QVBoxLayout() left_container.setLayout(left_layout) left_container.setMinimumWidth(300) top_left_container = QGroupBox() top_left_container.setTitle("File information") top_left_container.setStyleSheet("QGroupBox { font-weight: bold; }") # Style if needed top_left_container.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Expanding) top_left_layout = QHBoxLayout() top_left_container.setLayout(top_left_layout) # QTextEdit with fixed height, but only 80% width self.top_left_widget = QTextEdit() self.top_left_widget.setReadOnly(True) self.top_left_widget.setPlaceholderText("Click a file below to get started! No files below? Open one using File > Open!") # Add QTextEdit to the layout with a stretch factor top_left_layout.addWidget(self.top_left_widget, stretch=4) # 80% # Create a vertical box layout for the right 20% self.right_column_widget = QWidget() right_column_layout = QVBoxLayout() self.right_column_widget.setLayout(right_column_layout) self.meta_fields = { "AGE": QLineEdit(), "GENDER": QLineEdit(), "GROUP": QLineEdit(), } for key, field in self.meta_fields.items(): label = QLabel(key.capitalize()) field.setPlaceholderText(f"Enter {key}") right_column_layout.addWidget(label) right_column_layout.addWidget(field) right_column_layout.addStretch() # Push fields to top self.right_column_widget.hide() # Add right column widget to the top-left layout (takes 20% width) top_left_layout.addWidget(self.right_column_widget, stretch=1) # Add top_left_container to the main left_layout left_layout.addWidget(top_left_container) # Bottom left: the bubbles inside the scroll area self.bubble_container = QWidget() self.bubble_layout = QGridLayout() self.bubble_layout.setAlignment(Qt.AlignmentFlag.AlignTop) self.bubble_container.setLayout(self.bubble_layout) self.scroll_area = QScrollArea() self.scroll_area.setWidgetResizable(True) self.scroll_area.setWidget(self.bubble_container) self.scroll_area.setMinimumHeight(300) # Add top left and bottom left to left layout left_layout.addWidget(self.scroll_area) self.progress_update_signal.connect(self.update_file_progress) # Right widget (full height on right side) self.right_container = QWidget() right_container_layout = QVBoxLayout() self.right_container.setLayout(right_container_layout) # Content widget inside scroll area self.right_content_widget = QWidget() right_content_layout = QVBoxLayout() self.right_content_widget.setLayout(right_content_layout) # Option selector dropdown self.option_selector = QComboBox() self.option_selector.addItems(["FIR"]) right_content_layout.addWidget(self.option_selector) # Container for the sections self.rows_container = QWidget() self.rows_layout = QVBoxLayout() self.rows_layout.setSpacing(10) self.rows_container.setLayout(self.rows_layout) right_content_layout.addWidget(self.rows_container) # Spacer at bottom inside scroll area content to push content up right_content_layout.addStretch() # Scroll area for the right side content self.right_scroll_area = QScrollArea() self.right_scroll_area.setWidgetResizable(True) self.right_scroll_area.setWidget(self.right_content_widget) # Buttons widget (fixed below the scroll area) buttons_widget = QWidget() buttons_layout = QHBoxLayout() buttons_widget.setLayout(buttons_layout) buttons_layout.addStretch() self.button1 = QPushButton("Process") self.button2 = QPushButton("Clear") self.button3 = QPushButton("Analysis") buttons_layout.addWidget(self.button1) buttons_layout.addWidget(self.button2) buttons_layout.addWidget(self.button3) self.button1.setMinimumSize(100, 40) self.button2.setMinimumSize(100, 40) self.button3.setMinimumSize(100, 40) self.button1.setVisible(False) self.button3.setVisible(False) self.button1.clicked.connect(self.on_run_task) self.button2.clicked.connect(self.clear_all) self.button3.clicked.connect(self.open_launcher_window) # Add scroll area and buttons widget to right container layout right_container_layout.addWidget(self.right_scroll_area) right_container_layout.addWidget(buttons_widget) # Add left and right containers to main layout main_layout.addWidget(left_container, stretch=55) main_layout.addWidget(self.right_container, stretch=45) # Set size policy to expand self.right_container.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) self.right_scroll_area.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) # Store ParamSection widgets self.option_selector.currentIndexChanged.connect(self.update_sections) # Initial build self.update_sections(0) def create_menu_bar(self): '''Menu Bar at the top of the screen''' menu_bar = self.menuBar() def make_action(name, shortcut=None, slot=None, checkable=False, checked=False, icon=None): action = QAction(name, self) if shortcut: action.setShortcut(QKeySequence(shortcut)) if slot: action.triggered.connect(slot) if checkable: action.setCheckable(True) action.setChecked(checked) if icon: action.setIcon(QIcon(icon)) return action # File menu and actions file_menu = menu_bar.addMenu("File") file_actions = [ ("Open File...", "Ctrl+O", self.open_file_dialog, resource_path("icons/file_open_24dp_1F1F1F.svg")), ("Open Folder...", "Ctrl+Alt+O", self.open_folder_dialog, resource_path("icons/folder_24dp_1F1F1F.svg")), ("Open Folders...", "Ctrl+Shift+O", self.open_multiple_folders_dialog, resource_path("icons/folder_copy_24dp_1F1F1F.svg")), ("Load Project...", "Ctrl+L", self.load_project, resource_path("icons/article_24dp_1F1F1F.svg")), ("Save Project...", "Ctrl+S", self.save_project, resource_path("icons/save_24dp_1F1F1F.svg")), ("Save Project As...", "Ctrl+Shift+S", self.save_project, resource_path("icons/save_as_24dp_1F1F1F.svg")), ] for i, (name, shortcut, slot, icon) in enumerate(file_actions): file_menu.addAction(make_action(name, shortcut, slot, icon=icon)) if i == 2: # after the first 3 actions (0,1,2) file_menu.addSeparator() file_menu.addSeparator() file_menu.addAction(make_action("Exit", "Ctrl+Q", QApplication.instance().quit, icon=resource_path("icons/exit_to_app_24dp_1F1F1F.svg"))) # Edit menu edit_menu = menu_bar.addMenu("Edit") edit_actions = [ ("Cut", "Ctrl+X", self.cut_text, resource_path("icons/content_cut_24dp_1F1F1F.svg")), ("Copy", "Ctrl+C", self.copy_text, resource_path("icons/content_copy_24dp_1F1F1F.svg")), ("Paste", "Ctrl+V", self.paste_text, resource_path("icons/content_paste_24dp_1F1F1F.svg")) ] for name, shortcut, slot, icon in edit_actions: edit_menu.addAction(make_action(name, shortcut, slot, icon=icon)) # View menu view_menu = menu_bar.addMenu("View") toggle_statusbar_action = make_action("Toggle Status Bar", checkable=True, checked=True, slot=None) view_menu.addAction(toggle_statusbar_action) # Options menu (Help & About) options_menu = menu_bar.addMenu("Options") options_actions = [ ("User Guide", "F1", self.user_guide, resource_path("icons/help_24dp_1F1F1F.svg")), ("Check for Updates", "F5", self.manual_check_for_updates, resource_path("icons/update_24dp_1F1F1F.svg")), ("Update optodes in snirf file...", "F6", self.update_optode_positions, resource_path("icons/upgrade_24dp_1F1F1F.svg")), ("Update events in snirf file...", "F7", self.update_event_markers, resource_path("icons/upgrade_24dp_1F1F1F.svg")), ("About", "F12", self.about_window, resource_path("icons/info_24dp_1F1F1F.svg")) ] for i, (name, shortcut, slot, icon) in enumerate(options_actions): options_menu.addAction(make_action(name, shortcut, slot, icon=icon)) if i == 1 or i == 3: # after the first 2 actions (0,1) options_menu.addSeparator() terminal_menu = menu_bar.addMenu("Terminal") terminal_actions = [ ("New Terminal", "Ctrl+Alt+T", self.terminal_gui, resource_path("icons/terminal_24dp_1F1F1F.svg")), ] for name, shortcut, slot, icon in terminal_actions: terminal_menu.addAction(make_action(name, shortcut, slot, icon=icon)) self.statusbar = self.statusBar() self.statusbar.showMessage("Ready") def update_sections(self, index): # Clear previous sections for i in reversed(range(self.rows_layout.count())): widget = self.rows_layout.itemAt(i).widget() if widget is not None: widget.deleteLater() self.param_sections.clear() # Add ParamSection widgets from SECTIONS for section in SECTIONS: self.section_widget = ParamSection(section) self.rows_layout.addWidget(self.section_widget) self.param_sections.append(self.section_widget) def clear_all(self): self.cancel_task() self.right_column_widget.hide() # Clear the bubble layout while self.bubble_layout.count(): item = self.bubble_layout.takeAt(0) widget = item.widget() if widget: widget.deleteLater() # Clear file data self.bubble_widgets.clear() self.statusBar().clearMessage() self.raw_haemo_dict = None self.epochs_dict = None self.fig_bytes_dict = None self.cha_dict = None self.contrast_results_dict = None self.df_ind_dict = None self.design_matrix_dict = None self.age_dict = None self.gender_dict = None self.group_dict = None self.valid_dict = None # Reset any visible UI elements self.button1.setVisible(False) self.button3.setVisible(False) self.top_left_widget.clear() def open_launcher_window(self): self.launcher_window = ViewerLauncherWidget(self.raw_haemo_dict, self.fig_bytes_dict, self.cha_dict, self.contrast_results_dict, self.df_ind_dict, self.design_matrix_dict, self.group_dict) self.launcher_window.show() def copy_text(self): self.top_left_widget.copy() # Trigger copy self.statusbar.showMessage("Copied to clipboard") # Show status message def cut_text(self): self.top_left_widget.cut() # Trigger cut self.statusbar.showMessage("Cut to clipboard") # Show status message def paste_text(self): self.top_left_widget.paste() # Trigger paste self.statusbar.showMessage("Pasted from clipboard") # Show status message def about_window(self): if self.about is None or not self.about.isVisible(): self.about = AboutWindow(self) self.about.show() def user_guide(self): if self.help is None or not self.help.isVisible(): self.help = UserGuideWindow(self) self.help.show() def terminal_gui(self): if self.terminal is None or not self.terminal.isVisible(): self.terminal = TerminalWindow(self) self.terminal.show() def update_optode_positions(self): if self.optodes is None or not self.optodes.isVisible(): self.optodes = UpdateOptodesWindow(self) self.optodes.show() def update_event_markers(self): if self.events is None or not self.events.isVisible(): self.events = UpdateEventsWindow(self) self.events.show() def open_file_dialog(self): file_path, _ = QFileDialog.getOpenFileName( self, "Open File", "", "All Files (*);;Text Files (*.txt)" ) if file_path: self.selected_path = file_path # store the file path self.show_files_as_bubbles(file_path) self.button1.setVisible(True) def open_folder_dialog(self): folder_path = QFileDialog.getExistingDirectory( self, "Select Folder", "" ) if folder_path: self.selected_path = folder_path # store the folder path self.show_files_as_bubbles(folder_path) for section_widget in self.param_sections: if "REMOVE_EVENTS" in section_widget.widgets: section_widget.update_annotation_dropdown_from_folder(folder_path) break else: print("[MainWindow] Could not find ParamSection with 'REMOVE_EVENTS' widget") self.button1.setVisible(True) def open_multiple_folders_dialog(self): while True: folder = QFileDialog.getExistingDirectory(self, "Select Folder") if not folder: break if not hasattr(self, 'selected_paths'): self.selected_paths = [] if folder not in self.selected_paths: self.selected_paths.append(folder) self.show_files_as_bubbles(self.selected_paths) # Ask if the user wants to add another more = QMessageBox.question( self, "Add Another?", "Do you want to select another folder?", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No, ) if more == QMessageBox.StandardButton.No: break self.button1.setVisible(True) def save_project(self): filename, _ = QFileDialog.getSaveFileName( self, "Save Project", "", "FLARE Project (*.flare)" ) if not filename: return try: # Ensure the filename has the proper extension if not filename.endswith(".flare"): filename += ".flare" project_path = Path(filename).resolve() project_dir = project_path.parent file_list = [ str(PurePosixPath(Path(bubble.file_path).resolve().relative_to(project_dir))) for bubble in self.bubble_widgets.values() ] progress_states = { str(PurePosixPath(Path(bubble.file_path).resolve().relative_to(project_dir))): bubble.current_step for bubble in self.bubble_widgets.values() } project_data = { "file_list": file_list, "progress_states": progress_states, "raw_haemo_dict": self.raw_haemo_dict, "epochs_dict": self.epochs_dict, "fig_bytes_dict": self.fig_bytes_dict, "cha_dict": self.cha_dict, "contrast_results_dict": self.contrast_results_dict, "df_ind_dict": self.df_ind_dict, "design_matrix_dict": self.design_matrix_dict, "age_dict": self.age_dict, "gender_dict": self.gender_dict, "group_dict": self.group_dict, "valid_dict": self.valid_dict, } def sanitize(obj): if isinstance(obj, Path): return str(PurePosixPath(obj)) elif isinstance(obj, dict): return {sanitize(k): sanitize(v) for k, v in obj.items()} elif isinstance(obj, list): return [sanitize(i) for i in obj] return obj project_data = sanitize(project_data) with open(filename, "wb") as f: pickle.dump(project_data, f) QMessageBox.information(self, "Success", f"Project saved to:\n{filename}") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to save project:\n{e}") def load_project(self): filename, _ = QFileDialog.getOpenFileName( self, "Load Project", "", "FLARE Project (*.flare)" ) if not filename: return try: with open(filename, "rb") as f: data = pickle.load(f) self.raw_haemo_dict = data.get("raw_haemo_dict", {}) self.epochs_dict = data.get("epochs_dict", {}) self.fig_bytes_dict = data.get("fig_bytes_dict", {}) self.cha_dict = data.get("cha_dict", {}) self.contrast_results_dict = data.get("contrast_results_dict", {}) self.df_ind_dict = data.get("df_ind_dict", {}) self.design_matrix_dict = data.get("design_matrix_dict", {}) self.age_dict = data.get("age_dict", {}) self.gender_dict = data.get("gender_dict", {}) self.group_dict = data.get("group_dict", {}) self.valid_dict = data.get("valid_dict", {}) project_dir = Path(filename).parent # Convert saved relative paths to absolute paths file_list = [str((project_dir / Path(rel_path)).resolve()) for rel_path in data["file_list"]] # Also resolve progress_states with updated paths raw_progress = data.get("progress_states", {}) progress_states = { str((project_dir / Path(rel_path)).resolve()): step for rel_path, step in raw_progress.items() } self.show_files_as_bubbles_from_list(file_list, progress_states, filename) # Re-enable buttons # self.button1.setVisible(True) self.button3.setVisible(True) QMessageBox.information(self, "Loaded", f"Project loaded from:\n{filename}") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to load project:\n{e}") def show_files_as_bubbles(self, folder_paths): if isinstance(folder_paths, str): folder_paths = [folder_paths] # Clear previous bubbles while self.bubble_layout.count(): item = self.bubble_layout.takeAt(0) widget = item.widget() if widget: widget.deleteLater() temp_bubble = ProgressBubble("Test Bubble", "") # A dummy bubble for measurement temp_bubble.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy. Preferred) # temp_bubble.setAttribute(Qt.WA_OpaquePaintEvent) # Improve rendering? temp_bubble.adjustSize() # Adjust size after the widget is created bubble_width = temp_bubble.width() # Get the actual width of a bubble available_width = self.bubble_container.width() cols = max(1, available_width // bubble_width) # Ensure at least 1 column index = 0 for folder_path in folder_paths: if not os.path.isdir(folder_path): continue files = os.listdir(folder_path) files = [f for f in files if os.path.isfile(os.path.join(folder_path, f))] for filename in files: full_path = os.path.join(folder_path, filename) display_name = f"{os.path.basename(folder_path)} / {filename}" bubble = ProgressBubble(display_name, full_path) bubble.clicked.connect(self.on_bubble_clicked) self.bubble_widgets[filename] = bubble row = index // cols col = index % cols self.bubble_layout.addWidget(bubble, row, col) index += 1 self.statusBar().showMessage(f"{index} file(s) loaded from: {', '.join(folder_paths)}") def show_files_as_bubbles_from_list(self, file_list, progress_states=None, filenames=None): progress_states = progress_states or {} # Clear old while self.bubble_layout.count(): item = self.bubble_layout.takeAt(0) widget = item.widget() if widget: widget.deleteLater() self.bubble_widgets = {} temp_bubble = ProgressBubble("Test Bubble Test Bubble", "") # A dummy bubble for measurement temp_bubble.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Preferred) # temp_bubble.setAttribute(Qt.WA_OpaquePaintEvent) # Improves rendering? temp_bubble.adjustSize() # Adjust size after the widget is created bubble_width = temp_bubble.width() # Get the actual width of a bubble available_width = self.bubble_container.width() cols = max(1, available_width // bubble_width) # Ensure at least 1 column index = 0 for index, file_path in enumerate(file_list): filename = os.path.basename(file_path) display_name = f"{os.path.basename(os.path.dirname(file_path))} / {filename}" # Create bubble with full path bubble = ProgressBubble(display_name, file_path) bubble.clicked.connect(self.on_bubble_clicked) self.bubble_widgets[file_path] = bubble step = progress_states.get(file_path, 0) bubble.update_progress(step) row = index // cols col = index % cols self.bubble_layout.addWidget(bubble, row, col) self.statusBar().showMessage(f"{len(file_list)} files loaded from from {os.path.abspath(filenames)}.") def get_suffix_from_meta_fields(self): parts = [] for key, line_edit in self.meta_fields.items(): val = line_edit.text().strip() if val: parts.append(f"{key}: {val}") return ", ".join(parts) def on_bubble_clicked(self, bubble): if self.current_file: self.save_metadata(self.current_file) if self.last_clicked_bubble and self.last_clicked_bubble != bubble: suffix = self.get_suffix_from_meta_fields() self.last_clicked_bubble.setSuffixText(suffix) self.last_clicked_bubble = bubble # show age / gender / group self.right_column_widget.show() file_path = bubble.file_path if not os.path.exists(file_path): self.top_left_widget.setText("File not found.") return size = os.path.getsize(file_path) created = time.ctime(os.path.getctime(file_path)) modified = time.ctime(os.path.getmtime(file_path)) snirf_info = self.get_snirf_metadata_mne(file_path) info = f"""\ File: {os.path.basename(file_path)} Size: {size:,} bytes Created: {created} Modified: {modified} Full Path: {file_path} """ if snirf_info is None: info += f"\nSNIRF Metadata could not be loaded!" else: info += "\nSNIRF Metadata:\n" for k, v in snirf_info.items(): if isinstance(v, list): info += f" {k}:\n" for item in v: info += f" - {item}\n" else: info += f" {k}: {v}\n" self.top_left_widget.setText(info) clicked_bubble = self.sender() file_path = clicked_bubble.file_path # Save current file's metadata if self.current_file: self.save_metadata(self.current_file) # Update current file self.current_file = file_path if file_path not in self.file_metadata: self.file_metadata[file_path] = {key: "" for key in self.meta_fields} # Load new file's metadata into the fields metadata = self.file_metadata.get(file_path, {}) for key, field in self.meta_fields.items(): field.blockSignals(True) field.setText(metadata.get(key, "")) field.blockSignals(False) def eventFilter(self, watched, event): if event.type() == QEvent.Type.MouseButtonPress: widget = self.childAt(event.pos()) if isinstance(widget, ProgressBubble): pass else: if self.last_clicked_bubble: if not self.last_clicked_bubble.isAncestorOf(widget): if self.current_file: self.save_metadata(self.current_file) suffix = self.get_suffix_from_meta_fields() self.last_clicked_bubble.setSuffixText(suffix) self.last_clicked_bubble = None return super().eventFilter(watched, event) def placeholder(self): QMessageBox.information(self, "Placeholder", "This feature is not implemented yet.") def save_metadata(self, file_path): if not file_path: return self.file_metadata[file_path] = { key: field.text() for key, field in self.meta_fields.items() } def get_all_metadata(self): # First, make sure current file's edits are saved for field in self.meta_fields.values(): field.clearFocus() # Save current file's metadata if self.current_file: self.save_metadata(self.current_file) return self.file_metadata def cancel_task(self): self.button1.clicked.disconnect(self.cancel_task) self.button1.setText("Stopping...") if hasattr(self, "result_process") and self.result_process.is_alive(): parent = psutil.Process(self.result_process.pid) children = parent.children(recursive=True) for child in children: try: child.kill() except psutil.NoSuchProcess: pass self.result_process.terminate() self.result_process.join() if hasattr(self, "result_timer") and self.result_timer.isActive(): self.result_timer.stop() # if hasattr(self, "result_process") and self.result_process.is_alive(): # self.result_process.terminate() # Forcefully terminate the process # self.result_process.join() # Wait for it to properly close # # Stop the QTimer if running # if hasattr(self, "result_timer") and self.result_timer.isActive(): # self.result_timer.stop() for bubble in self.bubble_widgets.values(): bubble.mark_cancelled() self.statusbar.showMessage("Processing cancelled.") self.button1.clicked.connect(self.on_run_task) self.button1.setText("Process") '''MODULE FILE''' def on_run_task(self): self.button1.clicked.disconnect(self.on_run_task) self.button1.setText("Cancel") self.button1.clicked.connect(self.cancel_task) if not self.first_run: for bubble in self.bubble_widgets.values(): bubble.mark_cancelled() self.first_run = False # Collect all selected snirf files in a flat list snirf_files = [] if hasattr(self, "selected_paths") and self.selected_paths: for path in self.selected_paths: p = Path(path) if p.is_dir(): snirf_files += [str(f) for f in p.glob("*.snirf")] elif p.is_file() and p.suffix == ".snirf": snirf_files.append(str(p)) elif hasattr(self, "selected_path") and self.selected_path: p = Path(self.selected_path) if p.is_dir(): snirf_files += [str(f) for f in p.glob("*.snirf")] elif p.is_file() and p.suffix == ".snirf": snirf_files.append(str(p)) else: raise ValueError("No file(s) selected") if not snirf_files: raise ValueError("No .snirf files found in selection") all_params = {} for section_widget in self.param_sections: section_params = section_widget.get_param_values() all_params.update(section_params) collected_data = { "SNIRF_FILES": snirf_files, "PARAMS": all_params, # add this line "METADATA": self.get_all_metadata(), # optionally add metadata if needed } # Start processing if current_process().name == 'MainProcess': self.manager = Manager() self.result_queue = self.manager.Queue() self.progress_queue = self.manager.Queue() self.result_process = Process( target=run_gui_entry_wrapper, args=(collected_data, self.result_queue, self.progress_queue) ) self.result_process.daemon = False self.result_process.start() self.statusbar.showMessage("Running processing in background...") self.result_timer = QTimer() self.result_timer.timeout.connect(self.check_for_pipeline_results) self.result_timer.start() self.statusbar.showMessage("Task started in separate process.") def check_for_pipeline_results(self): while not self.result_queue.empty(): msg = self.result_queue.get() if isinstance(msg, dict): if msg.get("success"): results = msg["result"] # from flares.py # Initialize storage # TODO: Is this check needed? if not hasattr(self, 'raw_haemo_dict'): self.raw_haemo_dict = {} self.epochs_dict = {} self.fig_bytes_dict = {} self.cha_dict = {} self.contrast_results_dict = {} self.df_ind_dict = {} self.design_matrix_dict = {} self.age_dict = {} self.gender_dict = {} self.group_dict = {} self.valid_dict = {} # Combine all results into the dicts for file_path, (raw_haemo, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, age, gender, group, valid) in results.items(): self.raw_haemo_dict[file_path] = raw_haemo self.epochs_dict[file_path] = epochs self.fig_bytes_dict[file_path] = fig_bytes self.cha_dict[file_path] = cha self.contrast_results_dict[file_path] = contrast_results self.df_ind_dict[file_path] = df_ind self.design_matrix_dict[file_path] = design_matrix self.age_dict[file_path] = age self.gender_dict[file_path] = gender self.group_dict[file_path] = group self.valid_dict[file_path] = valid # self.statusbar.showMessage(f"Processing complete! Time elapsed: {elapsed_time:.2f} seconds") self.statusbar.showMessage(f"Processing complete!") self.button3.setVisible(True) else: error_msg = msg.get("error", "Unknown error") print("Error during processing:", error_msg) self.statusbar.showMessage(f"Processing failed! {error_msg}") self.result_timer.stop() self.cleanup_after_process() return elif isinstance(msg, tuple) and msg[0] == 'progress': _, file_path, step_index = msg file_name = os.path.basename(file_path) # extract file name self.progress_update_signal.emit(file_name, step_index) def cleanup_after_process(self): if hasattr(self, 'result_process'): self.result_process.join(timeout=0) if self.result_process.is_alive(): self.result_process.terminate() self.result_process.join() if hasattr(self, 'result_queue'): if 'AutoProxy' in repr(self.result_queue): pass else: self.result_queue.close() self.result_queue.join_thread() if hasattr(self, 'progress_queue'): if 'AutoProxy' in repr(self.progress_queue): pass else: self.progress_queue.close() self.progress_queue.join_thread() # Shutdown manager to kill its server process and clean up if hasattr(self, 'manager'): self.manager.shutdown() def update_file_progress(self, filename, step_index): bubble = self.bubble_widgets.get(filename) if bubble: bubble.update_progress(step_index) '''UPDATER''' def manual_check_for_updates(self): self.local_check_thread = LocalPendingUpdateCheckThread(CURRENT_VERSION, self.platform_suffix) self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) self.local_check_thread.start() def on_pending_update_found(self, version, folder_path): self.statusBar().showMessage(f"Pending update found: version {version}") self.pending_update_version = version self.pending_update_path = folder_path self.show_pending_update_popup() def on_no_pending_update(self): # No pending update found locally, start server check directly self.statusBar().showMessage("No pending local update found. Checking server...") self.start_update_check_thread() def show_pending_update_popup(self): msg_box = QMessageBox(self) msg_box.setWindowTitle("Pending Update Found") msg_box.setText(f"A previously downloaded update (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?") install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) msg_box.exec() if msg_box.clickedButton() == install_now_button: self.install_update(self.pending_update_path) else: self.statusBar().showMessage("Pending update available. Install later.") # After user dismisses, still check the server for new updates self.start_update_check_thread() def start_update_check_thread(self): self.check_thread = UpdateCheckThread() self.check_thread.download_requested.connect(self.on_server_update_requested) self.check_thread.no_update_available.connect(self.on_server_no_update) self.check_thread.error_occurred.connect(self.on_error) self.check_thread.start() def on_server_no_update(self): self.statusBar().showMessage("No new updates found on server.", 5000) def on_server_update_requested(self, download_url, latest_version): if self.pending_update_version: cmp = self.version_compare(latest_version, self.pending_update_version) if cmp > 0: # Server version is newer than pending update self.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...") try: shutil.rmtree(self.pending_update_path) self.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}") except Exception as e: self.statusBar().showMessage(f"Failed to delete old update folder: {e}") # Clear pending update info so new download proceeds self.pending_update_version = None self.pending_update_path = None # Download the new update self.download_update(download_url, latest_version) elif cmp == 0: # Versions equal, no download needed self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.") else: # Server version older than pending? Unlikely but just keep pending update self.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.") else: # No pending update, just download self.download_update(download_url, latest_version) def download_update(self, download_url, latest_version): self.statusBar().showMessage("Downloading update...") self.download_thread = UpdateDownloadThread(download_url, latest_version) self.download_thread.update_ready.connect(self.on_update_ready) self.download_thread.error_occurred.connect(self.on_error) self.download_thread.start() def on_update_ready(self, latest_version, extract_folder): self.statusBar().showMessage("Update downloaded and extracted.") msg_box = QMessageBox(self) msg_box.setWindowTitle("Update Ready") msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?") install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) msg_box.exec() if msg_box.clickedButton() == install_now_button: self.install_update(extract_folder) else: self.statusBar().showMessage("Update ready. Install later.") def install_update(self, extract_folder): # Path to updater executable if PLATFORM_NAME == 'windows': updater_path = os.path.join(os.getcwd(), "flares_updater.exe") elif PLATFORM_NAME == 'darwin': if getattr(sys, 'frozen', False): updater_path = os.path.join(os.path.dirname(sys.executable), "../../../flares_updater.app") else: updater_path = os.path.join(os.getcwd(), "../flares_updater.app") elif PLATFORM_NAME == 'linux': updater_path = os.path.join(os.getcwd(), "flares_updater") else: updater_path = os.getcwd() if not os.path.exists(updater_path): QMessageBox.critical(self, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}") return # Launch updater with extracted folder path as argument try: # Pass current app's executable path for updater to relaunch main_app_executable = os.path.abspath(sys.argv[0]) print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"') if PLATFORM_NAME == 'darwin': subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable]) else: subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path)) # Close the current app so updater can replace files sys.exit(0) except Exception as e: QMessageBox.critical(self, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}") def on_error(self, message): # print(f"Error: {message}") self.statusBar().showMessage(f"Error occurred during update process. {message}") def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def get_snirf_metadata_mne(self, file_name): try: raw = read_raw_snirf(file_name, preload=True) snirf_info = {} # Measurement date snirf_info['Measurement Date'] = str(raw.info.get('meas_date')) # Source-detector distances distances = source_detector_distances(raw.info) distance_info = [] for ch_name, dist in zip(raw.info['ch_names'], distances): distance_info.append(f"{ch_name}: {dist:.4f} m") snirf_info['Source-Detector Distances'] = distance_info # Digitization points / optode positions dig = raw.info.get('dig', None) if dig is not None: dig_info = [] for point in dig: kind = point['kind'] ident = point['ident'] coord = point['r'] dig_info.append(f"Kind: {kind}, ID: {ident}, Coord: {coord}") snirf_info['Digitization Points'] = dig_info else: snirf_info['Digitization Points'] = "Not found" if raw.annotations is not None and len(raw.annotations) > 0: annot_info = [] for onset, duration, desc in zip(raw.annotations.onset, raw.annotations.duration, raw.annotations.description): annot_info.append(f"Onset: {onset:.2f}s, Duration: {duration:.2f}s, Description: {desc}") snirf_info['Annotations'] = annot_info else: snirf_info['Annotations'] = "No annotations found" return snirf_info except: return None def closeEvent(self, event): # Gracefully shut down multiprocessing children print("Window is closing. Cleaning up...") if hasattr(self, 'manager'): self.manager.shutdown() for child in self.findChildren(QWidget): if child is not self and child.isVisible(): child.close() kill_child_processes() event.accept() def wait_for_process_to_exit(process_name, timeout=10): """ Waits for a process with the specified name to exit within a timeout period. Args: process_name (str): Name (or part of the name) of the process to wait for. timeout (int, optional): Maximum time to wait in seconds. Defaults to 10. Returns: bool: True if the process exited before the timeout, False otherwise. """ print(f"Waiting for {process_name} to exit...") deadline = time.time() + timeout while time.time() < deadline: still_running = False for proc in psutil.process_iter(['name']): try: if proc.info['name'] and process_name.lower() in proc.info['name'].lower(): still_running = True print(f"Still running: {proc.info['name']} (PID: {proc.pid})") break except (psutil.NoSuchProcess, psutil.AccessDenied): continue if not still_running: print(f"{process_name} has exited.") return True time.sleep(0.5) print(f"{process_name} did not exit in time.") return False def finish_update_if_needed(): """ Completes a pending application update if '--finish-update' is present in the command-line arguments. """ if "--finish-update" in sys.argv: print("Finishing update...") if PLATFORM_NAME == 'darwin': app_dir = '/tmp/flarestempupdate' else: app_dir = os.getcwd() # 1. Find update folder update_folder = None for entry in os.listdir(app_dir): entry_path = os.path.join(app_dir, entry) if os.path.isdir(entry_path) and entry.startswith("flares-") and entry.endswith("-" + PLATFORM_NAME): update_folder = os.path.join(app_dir, entry) break if update_folder is None: print("No update folder found. Skipping update steps.") return if PLATFORM_NAME == 'darwin': update_folder = os.path.join(update_folder, "flares-darwin") # 2. Wait for flares_updater to exit print("Waiting for flares_updater to exit...") for proc in psutil.process_iter(['pid', 'name']): if proc.info['name'] and "flares_updater" in proc.info['name'].lower(): try: proc.wait(timeout=5) except psutil.TimeoutExpired: print("Force killing lingering flares_updater") proc.kill() # 3. Replace the updater if PLATFORM_NAME == 'windows': new_updater = os.path.join(update_folder, "flares_updater.exe") dest_updater = os.path.join(app_dir, "flares_updater.exe") elif PLATFORM_NAME == 'darwin': new_updater = os.path.join(update_folder, "flares_updater.app") dest_updater = os.path.abspath(os.path.join(sys.executable, "../../../../flares_updater.app")) elif PLATFORM_NAME == 'linux': new_updater = os.path.join(update_folder, "flares_updater") dest_updater = os.path.join(app_dir, "flares_updater") else: print("No platform??") new_updater = os.getcwd() dest_updater = os.getcwd() print(f"New updater is {new_updater}") print(f"Dest updater is {dest_updater}") print("Writable?", os.access(dest_updater, os.W_OK)) print("Executable path:", sys.executable) print("Trying to copy:", new_updater, "->", dest_updater) if os.path.exists(new_updater): try: if os.path.exists(dest_updater): if PLATFORM_NAME == 'darwin': try: if os.path.isdir(dest_updater): shutil.rmtree(dest_updater) print(f"Deleted directory: {dest_updater}") else: os.remove(dest_updater) print(f"Deleted file: {dest_updater}") except Exception as e: print(f"Error deleting {dest_updater}: {e}") else: os.remove(dest_updater) if PLATFORM_NAME == 'darwin': wait_for_process_to_exit("flares_updater", timeout=10) subprocess.check_call(["ditto", new_updater, dest_updater]) else: shutil.copy2(new_updater, dest_updater) if PLATFORM_NAME in ('linux', 'darwin'): os.chmod(dest_updater, 0o755) if PLATFORM_NAME == 'darwin': remove_quarantine(dest_updater) print("flares_updater replaced.") except Exception as e: print(f"Failed to replace flares_updater: {e}") # 4. Delete the update folder try: if PLATFORM_NAME == 'darwin': shutil.rmtree(app_dir) else: shutil.rmtree(update_folder) except Exception as e: print(f"Failed to delete update folder: {e}") QMessageBox.information(None, "Update Complete", "The application has been successfully updated.") sys.argv.remove("--finish-update") def remove_quarantine(app_path): """ Removes the macOS quarantine attribute from the specified application path. """ script = f''' do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "FLARES needs privileges to finish the update. (2/2)" ''' try: subprocess.run(['osascript', '-e', script], check=True) print("✅ Quarantine attribute removed.") except subprocess.CalledProcessError as e: print("❌ Failed to remove quarantine attribute.") print(e) def run_gui_entry_wrapper(config, gui_queue, progress_queue): """ Where the processing happens """ try: import flares flares.gui_entry(config, gui_queue, progress_queue) sys.exit(0) except Exception as e: tb_str = traceback.format_exc() gui_queue.put({ "success": False, "error": f"Child process crashed: {str(e)}\nTraceback:\n{tb_str}" }) def resource_path(relative_path): """ Get absolute path to resource regardless of running directly or packaged using PyInstaller """ if hasattr(sys, '_MEIPASS'): # PyInstaller bundle path base_path = sys._MEIPASS else: base_path = os.path.dirname(os.path.abspath(__file__)) return os.path.join(base_path, relative_path) def kill_child_processes(): """ Goodbye children """ try: parent = psutil.Process(os.getpid()) children = parent.children(recursive=True) for child in children: try: child.kill() except psutil.NoSuchProcess: pass psutil.wait_procs(children, timeout=5) except Exception as e: print(f"Error killing child processes: {e}") def exception_hook(exc_type, exc_value, exc_traceback): """ Method that will display a popup when the program hard crashes containg what went wrong """ error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) print(error_msg) # also print to console kill_child_processes() # Show error message box # Make sure QApplication exists (or create a minimal one) app = QApplication.instance() if app is None: app = QApplication(sys.argv) QMessageBox.critical(None, "Unexpected Error", f"An unhandled exception occurred:\n\n{error_msg}") # Exit the app after user acknowledges sys.exit(1) if __name__ == "__main__": # Redirect exceptions to the popup window sys.excepthook = exception_hook # Set up application logging if PLATFORM_NAME == "darwin": log_path = os.path.join(os.path.dirname(sys.executable), "../../../flares.log") else: log_path = os.path.join(os.getcwd(), "flares.log") try: os.remove(log_path) except: pass sys.stdout = open(log_path, "a", buffering=1) sys.stderr = sys.stdout print(f"\n=== App started at {datetime.now()} ===\n") freeze_support() # Required for PyInstaller + multiprocessing # Only run GUI in the main process if current_process().name == 'MainProcess': app = QApplication(sys.argv) finish_update_if_needed() window = MainApplication() if PLATFORM_NAME == "darwin": app.setWindowIcon(QIcon(resource_path("icons/main.icns"))) window.setWindowIcon(QIcon(resource_path("icons/main.icns"))) else: app.setWindowIcon(QIcon(resource_path("icons/main.ico"))) window.setWindowIcon(QIcon(resource_path("icons/main.ico"))) window.show() sys.exit(app.exec()) # Not 4000 lines yay!