From 3afcb54b7705e5bbf7d3fef8a5ac33428bb822d7 Mon Sep 17 00:00:00 2001 From: Tyler Date: Mon, 23 Mar 2026 16:49:02 -0700 Subject: [PATCH] Mac + Windows standalone crash fixes --- flares.py | 58 +++++++++++----------- main.py | 142 +++++++++++++++++++++++++++--------------------------- 2 files changed, 100 insertions(+), 100 deletions(-) diff --git a/flares.py b/flares.py index 7a43581..91322a3 100644 --- a/flares.py +++ b/flares.py @@ -251,25 +251,18 @@ REQUIRED_KEYS: dict[str, Any] = { } +# audit_log = logging.getLogger("memory_audit") +# audit_log.setLevel(logging.INFO) +# audit_log.propagate = False # This prevents it from talking to other loggers +# # 2. Add a file handler specifically for this audit logger +# if not audit_log.handlers: +# fh = logging.FileHandler('flares_memory_audit.log') +# fh.setFormatter(logging.Formatter('%(asctime)s | PID: %(process)d | %(message)s')) +# audit_log.addHandler(fh) -import logging -import os -import psutil -import traceback - -audit_log = logging.getLogger("memory_audit") -audit_log.setLevel(logging.INFO) -audit_log.propagate = False # This prevents it from talking to other loggers - -# 2. 
Add a file handler specifically for this audit logger -if not audit_log.handlers: - fh = logging.FileHandler('flares_memory_audit.log') - fh.setFormatter(logging.Formatter('%(asctime)s | PID: %(process)d | %(message)s')) - audit_log.addHandler(fh) - -def get_mem_mb(): - return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 +# def get_mem_mb(): +# return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 @@ -375,10 +368,10 @@ def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) - max_workers = file_params.get("MAX_WORKERS", int(os.cpu_count()/4)) results = process_multiple_participants( - file_paths, file_params, file_metadata, progress_queue, max_workers + file_paths, file_params, file_metadata, progress_queue, gui_queue, max_workers ) - gui_queue.put({"success": True, "result": results}) + gui_queue.put({"success": True}) except Exception as e: gui_queue.put({ @@ -423,15 +416,15 @@ def process_participant_worker(file_path, file_params, file_metadata, result_que finally: # --- THE FIX: MANDATORY EXIT --- # Explicitly flush the logs and force the process to terminate - audit_log.info(f"Worker for {file_name} calling hard exit.") + #audit_log.info(f"Worker for {file_name} calling hard exit.") sys.stdout.flush() sys.stderr.flush() # We use os._exit(0) as a nuclear option if sys.exit() is being caught by a try/except os._exit(0) -def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, max_workers=6): - audit_log.info(f"--- SESSION START: {len(file_paths)} files ---") +def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, gui_queue=None, max_workers=6): + #audit_log.info(f"--- SESSION START: {len(file_paths)} files ---") pending_files = list(file_paths) active_processes = [] # List of tuples: (Process object, file_path) @@ -454,16 +447,25 @@ def process_multiple_participants(file_paths, file_params, file_metadata, progre ) p.start() 
active_processes.append((p, file_path)) - audit_log.info(f"Spawned worker. Active processes: {len(active_processes)}") + #audit_log.info(f"Spawned worker. Active processes: {len(active_processes)}") # 2. COLLECT RESULTS: Drain the queue continuously so workers don't deadlock while not result_queue.empty(): try: res_path, result, error = result_queue.get_nowait() - if not error: - results_by_file[res_path] = result + # SEND IMMEDIATELY TO THE MAIN GUI + if gui_queue: + gui_queue.put({ + "type": "file_done", + "file": res_path, + "success": error is None, + "result": result if error is None else None, + "error": error if error else None + }) else: - audit_log.error(f"Worker failed on {os.path.basename(res_path)}: {error}") + # Fallback if no GUI queue (e.g., CLI mode) + results_by_file[res_path] = result + except Exception: break # Queue is empty or busy @@ -472,12 +474,12 @@ def process_multiple_participants(file_paths, file_params, file_metadata, progre if not p.is_alive(): p.join() # Formally close the process to free OS resources active_processes.remove((p, f_path)) - audit_log.info(f"Worker finished. Active processes dropping to: {len(active_processes)}") + #audit_log.info(f"Worker finished. 
Active processes dropping to: {len(active_processes)}") # Brief pause to prevent this while loop from pegging your CPU to 100% time.sleep(0.5) - audit_log.info("--- SESSION COMPLETE ---") + #audit_log.info("--- SESSION COMPLETE ---") return results_by_file diff --git a/main.py b/main.py index b8f5baa..4b9c085 100644 --- a/main.py +++ b/main.py @@ -21,6 +21,7 @@ from enum import Enum, auto from pathlib import Path, PurePosixPath from datetime import datetime from multiprocessing import Process, current_process, freeze_support, Manager, Queue +from queue import Empty # External library imports import numpy as np @@ -5784,95 +5785,91 @@ class MainApplication(QMainWindow): def check_for_pipeline_results(self): - while not self.result_queue.empty(): - msg = self.result_queue.get() + try: + while True: + try: + msg = self.result_queue.get_nowait() + except Empty: + break - if isinstance(msg, dict): - if msg.get("success"): - - results = msg["result"] # from flares.py - - for file_path, result_tuple in results.items(): - self.files_done.add(file_path) - self.files_results[file_path] = result_tuple + if isinstance(msg, dict) and msg.get("type") == "file_done": + file_path = msg["file"] + self.files_done.add(file_path) - # Initialize storage - # TODO: Is this check needed? 
Edit: yes very much so - if getattr(self, 'raw_haemo_dict', None) is None: - self.raw_haemo_dict = {} - self.config_dict = {} - self.epochs_dict = {} - self.fig_bytes_dict = {} - self.cha_dict = {} - self.contrast_results_dict = {} - self.df_ind_dict = {} - self.design_matrix_dict = {} - self.valid_dict = {} + # print(f"[DEBUG] File Done: {os.path.basename(file_path)}") + # print(f"[DEBUG] Progress: {len(self.files_done)} / {self.files_total}") + + if msg.get("success"): + # Unpack the massive tuple + raw_haemo, config, epochs, fig_bytes, cha, contrast, df_ind, design, valid = msg["result"] + + # Initialize dictionaries once if needed + if not hasattr(self, 'raw_haemo_dict') or self.raw_haemo_dict is None: + attrs = ['raw_haemo_dict', 'config_dict', 'epochs_dict', 'fig_bytes_dict', + 'cha_dict', 'contrast_results_dict', 'df_ind_dict', + 'design_matrix_dict', 'valid_dict'] + for attr in attrs: + setattr(self, attr, {}) - # Combine all results into the dicts - for file_path, (raw_haemo, config, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, valid) in results.items(): + self.files_results[file_path] = msg["result"] self.raw_haemo_dict[file_path] = raw_haemo self.config_dict[file_path] = config self.epochs_dict[file_path] = epochs self.fig_bytes_dict[file_path] = fig_bytes self.cha_dict[file_path] = cha - self.contrast_results_dict[file_path] = contrast_results + self.contrast_results_dict[file_path] = contrast self.df_ind_dict[file_path] = df_ind - self.design_matrix_dict[file_path] = design_matrix + self.design_matrix_dict[file_path] = design self.valid_dict[file_path] = valid - # self.statusbar.showMessage(f"Processing complete! 
Time elapsed: {elapsed_time:.2f} seconds") - # self.statusbar.showMessage(f"Processing complete!") + self.statusbar.showMessage(f"Processed: {os.path.basename(file_path)}") - self.button3.setVisible(True) - + else: + self.files_failed.add(file_path) + error_msg = msg.get("error", "Unknown worker error") + print(f"[DEBUG] File Failed: {os.path.basename(file_path)} - {error_msg}") + self.show_error_popup(f"Error: {file_path}", error_msg, msg.get("traceback", "")) + self.statusbar.showMessage(f"Failed: {os.path.basename(file_path)}") - elif msg.get("success") is False: + elif isinstance(msg, dict) and msg.get("success") is True: + self.statusbar.showMessage("All files processed successfully!") + + elif isinstance(msg, dict) and (msg.get("success") is False or msg.get("type") == "error"): + file_path = msg.get("file", "Process") error_msg = msg.get("error", "Unknown error") - traceback_str = msg.get("traceback", "") - self.show_error_popup("Processing failed!", error_msg, traceback_str) - self.files_done = set(self.files_results.keys()) - self.statusbar.showMessage(f"Processing failed!") - self.result_timer.stop() - self.cleanup_after_process() - return - - elif msg.get("type") == "error": - # Error forwarded from a single file (e.g. 
from a worker) - file_path = msg.get("file", "Unknown file") - error_msg = msg.get("error", "Unknown error") - traceback_str = msg.get("traceback", "") - + self.show_error_popup(f"Error: {file_path}", error_msg, msg.get("traceback", "")) self.files_done.add(file_path) - self.files_failed.add(file_path) + if msg.get("success") is False: # Fatal crash + self.result_timer.stop() + self.cleanup_after_process() + return - self.show_error_popup(f"{file_path}", error_msg, traceback_str) - self.statusbar.showMessage(f"Error processing {file_path}") - if file_path in self.bubble_widgets: - self.bubble_widgets[file_path].mark_cancelled() - + elif isinstance(msg, tuple) and msg[0] == 'progress': + _, file_path, step_index = msg + self.progress_update_signal.emit(file_path, step_index) - elif isinstance(msg, tuple) and msg[0] == 'progress': - _, file_path, step_index = msg - self.progress_update_signal.emit(file_path, step_index) - - - if len(self.files_done) == self.files_total: - self.result_timer.stop() - self.cleanup_after_process() - - success_count = len(self.files_results) - fail_count = len(self.files_failed) - - summary_msg = f"Processing complete: {success_count} succeeded, {fail_count} failed." 
- self.statusbar.showMessage(summary_msg) - - if success_count > 0: - self.button3.setVisible(True) + if len(self.files_done) >= self.files_total: + self.result_timer.stop() + self.cleanup_after_process() - self.button1.clicked.disconnect(self.cancel_task) - self.button1.setText("Process") - self.button1.clicked.connect(self.on_run_task) + success_count = len(self.files_results) + fail_count = self.files_total - success_count + self.statusbar.showMessage(f"Complete: {success_count} succeeded, {fail_count} failed.") + + if success_count > 0: + self.button3.setVisible(True) + + try: + self.button1.clicked.disconnect() + except: pass + self.button1.setText("Process") + self.button1.clicked.connect(self.on_run_task) + + except Exception as e: + print(f"Error in timer loop: {e}") + if not self.result_process.is_alive() and len(self.files_done) < self.files_total: + self.statusbar.showMessage("Background process died.") + self.result_timer.stop() def show_error_popup(self, title, error_message, traceback_str=""): @@ -6061,7 +6058,7 @@ def run_gui_entry_wrapper(config, gui_queue, progress_queue): import flares flares.gui_entry(config, gui_queue, progress_queue) gui_queue.close() - gui_queue.join_thread() + #gui_queue.join_thread() sys.exit(0) except Exception as e: @@ -6070,6 +6067,7 @@ def run_gui_entry_wrapper(config, gui_queue, progress_queue): "success": False, "error": f"Child process crashed: {str(e)}\nTraceback:\n{tb_str}" }) + sys.exit(1) def resource_path(relative_path):