mac + windows standalone craching fixes

This commit is contained in:
2026-03-23 16:49:02 -07:00
parent 9fe4a4dc30
commit 3afcb54b77
2 changed files with 100 additions and 100 deletions
+30 -28
View File
@@ -251,25 +251,18 @@ REQUIRED_KEYS: dict[str, Any] = {
}
# audit_log = logging.getLogger("memory_audit")
# audit_log.setLevel(logging.INFO)
# audit_log.propagate = False # This prevents it from talking to other loggers
# # 2. Add a file handler specifically for this audit logger
# if not audit_log.handlers:
# fh = logging.FileHandler('flares_memory_audit.log')
# fh.setFormatter(logging.Formatter('%(asctime)s | PID: %(process)d | %(message)s'))
# audit_log.addHandler(fh)
import logging
import os
import psutil
import traceback
audit_log = logging.getLogger("memory_audit")
audit_log.setLevel(logging.INFO)
audit_log.propagate = False # This prevents it from talking to other loggers
# 2. Add a file handler specifically for this audit logger
if not audit_log.handlers:
fh = logging.FileHandler('flares_memory_audit.log')
fh.setFormatter(logging.Formatter('%(asctime)s | PID: %(process)d | %(message)s'))
audit_log.addHandler(fh)
def get_mem_mb():
return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
# def get_mem_mb():
# return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
@@ -375,10 +368,10 @@ def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) -
max_workers = file_params.get("MAX_WORKERS", int(os.cpu_count()/4))
results = process_multiple_participants(
file_paths, file_params, file_metadata, progress_queue, max_workers
file_paths, file_params, file_metadata, progress_queue, gui_queue, max_workers
)
gui_queue.put({"success": True, "result": results})
gui_queue.put({"success": True})
except Exception as e:
gui_queue.put({
@@ -423,15 +416,15 @@ def process_participant_worker(file_path, file_params, file_metadata, result_que
finally:
# --- THE FIX: MANDATORY EXIT ---
# Explicitly flush the logs and force the process to terminate
audit_log.info(f"Worker for {file_name} calling hard exit.")
#audit_log.info(f"Worker for {file_name} calling hard exit.")
sys.stdout.flush()
sys.stderr.flush()
# We use os._exit(0) as a nuclear option if sys.exit() is being caught by a try/except
os._exit(0)
def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, max_workers=6):
audit_log.info(f"--- SESSION START: {len(file_paths)} files ---")
def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, gui_queue=None, max_workers=6):
#audit_log.info(f"--- SESSION START: {len(file_paths)} files ---")
pending_files = list(file_paths)
active_processes = [] # List of tuples: (Process object, file_path)
@@ -454,16 +447,25 @@ def process_multiple_participants(file_paths, file_params, file_metadata, progre
)
p.start()
active_processes.append((p, file_path))
audit_log.info(f"Spawned worker. Active processes: {len(active_processes)}")
#audit_log.info(f"Spawned worker. Active processes: {len(active_processes)}")
# 2. COLLECT RESULTS: Drain the queue continuously so workers don't deadlock
while not result_queue.empty():
try:
res_path, result, error = result_queue.get_nowait()
if not error:
results_by_file[res_path] = result
# SEND IMMEDIATELY TO THE MAIN GUI
if gui_queue:
gui_queue.put({
"type": "file_done",
"file": res_path,
"success": error is None,
"result": result if error is None else None,
"error": error if error else None
})
else:
audit_log.error(f"Worker failed on {os.path.basename(res_path)}: {error}")
# Fallback if no GUI queue (e.g., CLI mode)
results_by_file[res_path] = result
except Exception:
break # Queue is empty or busy
@@ -472,12 +474,12 @@ def process_multiple_participants(file_paths, file_params, file_metadata, progre
if not p.is_alive():
p.join() # Formally close the process to free OS resources
active_processes.remove((p, f_path))
audit_log.info(f"Worker finished. Active processes dropping to: {len(active_processes)}")
#audit_log.info(f"Worker finished. Active processes dropping to: {len(active_processes)}")
# Brief pause to prevent this while loop from pegging your CPU to 100%
time.sleep(0.5)
audit_log.info("--- SESSION COMPLETE ---")
#audit_log.info("--- SESSION COMPLETE ---")
return results_by_file
+64 -66
View File
@@ -21,6 +21,7 @@ from enum import Enum, auto
from pathlib import Path, PurePosixPath
from datetime import datetime
from multiprocessing import Process, current_process, freeze_support, Manager, Queue
from queue import Empty
# External library imports
import numpy as np
@@ -5784,95 +5785,91 @@ class MainApplication(QMainWindow):
def check_for_pipeline_results(self):
while not self.result_queue.empty():
msg = self.result_queue.get()
try:
while True:
try:
msg = self.result_queue.get_nowait()
except Empty:
break
if isinstance(msg, dict):
if msg.get("success"):
if isinstance(msg, dict) and msg.get("type") == "file_done":
file_path = msg["file"]
self.files_done.add(file_path)
results = msg["result"] # from flares.py
# print(f"[DEBUG] File Done: {os.path.basename(file_path)}")
# print(f"[DEBUG] Progress: {len(self.files_done)} / {self.files_total}")
for file_path, result_tuple in results.items():
self.files_done.add(file_path)
self.files_results[file_path] = result_tuple
if msg.get("success"):
# Unpack the massive tuple
raw_haemo, config, epochs, fig_bytes, cha, contrast, df_ind, design, valid = msg["result"]
# Initialize storage
# TODO: Is this check needed? Edit: yes very much so
if getattr(self, 'raw_haemo_dict', None) is None:
self.raw_haemo_dict = {}
self.config_dict = {}
self.epochs_dict = {}
self.fig_bytes_dict = {}
self.cha_dict = {}
self.contrast_results_dict = {}
self.df_ind_dict = {}
self.design_matrix_dict = {}
self.valid_dict = {}
# Initialize dictionaries once if needed
if not hasattr(self, 'raw_haemo_dict') or self.raw_haemo_dict is None:
attrs = ['raw_haemo_dict', 'config_dict', 'epochs_dict', 'fig_bytes_dict',
'cha_dict', 'contrast_results_dict', 'df_ind_dict',
'design_matrix_dict', 'valid_dict']
for attr in attrs:
setattr(self, attr, {})
# Combine all results into the dicts
for file_path, (raw_haemo, config, epochs, fig_bytes, cha, contrast_results, df_ind, design_matrix, valid) in results.items():
self.files_results[file_path] = msg["result"]
self.raw_haemo_dict[file_path] = raw_haemo
self.config_dict[file_path] = config
self.epochs_dict[file_path] = epochs
self.fig_bytes_dict[file_path] = fig_bytes
self.cha_dict[file_path] = cha
self.contrast_results_dict[file_path] = contrast_results
self.contrast_results_dict[file_path] = contrast
self.df_ind_dict[file_path] = df_ind
self.design_matrix_dict[file_path] = design_matrix
self.design_matrix_dict[file_path] = design
self.valid_dict[file_path] = valid
# self.statusbar.showMessage(f"Processing complete! Time elapsed: {elapsed_time:.2f} seconds")
# self.statusbar.showMessage(f"Processing complete!")
self.statusbar.showMessage(f"Processed: {os.path.basename(file_path)}")
self.button3.setVisible(True)
else:
self.files_failed.add(file_path)
error_msg = msg.get("error", "Unknown worker error")
print(f"[DEBUG] File Failed: {os.path.basename(file_path)} - {error_msg}")
self.show_error_popup(f"Error: {file_path}", error_msg, msg.get("traceback", ""))
self.statusbar.showMessage(f"Failed: {os.path.basename(file_path)}")
elif isinstance(msg, dict) and msg.get("success") is True:
self.statusbar.showMessage("All files processed successfully!")
elif msg.get("success") is False:
elif isinstance(msg, dict) and (msg.get("success") is False or msg.get("type") == "error"):
file_path = msg.get("file", "Process")
error_msg = msg.get("error", "Unknown error")
traceback_str = msg.get("traceback", "")
self.show_error_popup("Processing failed!", error_msg, traceback_str)
self.files_done = set(self.files_results.keys())
self.statusbar.showMessage(f"Processing failed!")
self.result_timer.stop()
self.cleanup_after_process()
return
elif msg.get("type") == "error":
# Error forwarded from a single file (e.g. from a worker)
file_path = msg.get("file", "Unknown file")
error_msg = msg.get("error", "Unknown error")
traceback_str = msg.get("traceback", "")
self.show_error_popup(f"Error: {file_path}", error_msg, msg.get("traceback", ""))
self.files_done.add(file_path)
self.files_failed.add(file_path)
if msg.get("success") is False: # Fatal crash
self.result_timer.stop()
self.cleanup_after_process()
return
self.show_error_popup(f"{file_path}", error_msg, traceback_str)
self.statusbar.showMessage(f"Error processing {file_path}")
if file_path in self.bubble_widgets:
self.bubble_widgets[file_path].mark_cancelled()
elif isinstance(msg, tuple) and msg[0] == 'progress':
_, file_path, step_index = msg
self.progress_update_signal.emit(file_path, step_index)
if len(self.files_done) >= self.files_total:
self.result_timer.stop()
self.cleanup_after_process()
elif isinstance(msg, tuple) and msg[0] == 'progress':
_, file_path, step_index = msg
self.progress_update_signal.emit(file_path, step_index)
success_count = len(self.files_results)
fail_count = self.files_total - success_count
self.statusbar.showMessage(f"Complete: {success_count} succeeded, {fail_count} failed.")
if success_count > 0:
self.button3.setVisible(True)
if len(self.files_done) == self.files_total:
self.result_timer.stop()
self.cleanup_after_process()
try:
self.button1.clicked.disconnect()
except: pass
self.button1.setText("Process")
self.button1.clicked.connect(self.on_run_task)
success_count = len(self.files_results)
fail_count = len(self.files_failed)
summary_msg = f"Processing complete: {success_count} succeeded, {fail_count} failed."
self.statusbar.showMessage(summary_msg)
if success_count > 0:
self.button3.setVisible(True)
self.button1.clicked.disconnect(self.cancel_task)
self.button1.setText("Process")
self.button1.clicked.connect(self.on_run_task)
except Exception as e:
print(f"Error in timer loop: {e}")
if not self.result_process.is_alive() and len(self.files_done) < self.files_total:
self.statusbar.showMessage("Background process died.")
self.result_timer.stop()
def show_error_popup(self, title, error_message, traceback_str=""):
@@ -6061,7 +6058,7 @@ def run_gui_entry_wrapper(config, gui_queue, progress_queue):
import flares
flares.gui_entry(config, gui_queue, progress_queue)
gui_queue.close()
gui_queue.join_thread()
#gui_queue.join_thread()
sys.exit(0)
except Exception as e:
@@ -6070,6 +6067,7 @@ def run_gui_entry_wrapper(config, gui_queue, progress_queue):
"success": False,
"error": f"Child process crashed: {str(e)}\nTraceback:\n{tb_str}"
})
sys.exit(1)
def resource_path(relative_path):