improvements

2025-10-20 09:33:50 -07:00
parent b5afcec37d
commit 20b255321b
5 changed files with 476 additions and 130 deletions

@@ -268,40 +268,42 @@ def set_metadata(file_path, metadata: dict[str, Any]) -> None:
         val = file_metadata.get(key, None)
         if val not in (None, '', [], {}, ()): # check for "empty" values
             globals()[key] = val
 from queue import Empty # This works with multiprocessing.Manager().Queue()
 def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) -> None:
     try:
         # Start a thread to forward progress messages back to GUI
-        def forward_progress():
-            while True:
-                try:
-                    msg = progress_queue.get(timeout=1)
-                    if msg == "__done__":
-                        break
-                    gui_queue.put(msg)
-                except:
-                    continue
+        def forward_progress():
+            while True:
+                try:
+                    msg = progress_queue.get(timeout=1)
+                    if msg == "__done__":
+                        break
+                    gui_queue.put(msg)
+                except Empty:
+                    continue
+                except Exception as e:
+                    gui_queue.put({
+                        "type": "error",
+                        "error": f"Forwarding thread crashed: {e}",
+                        "traceback": traceback.format_exc()
+                    })
+                    break
-        t = threading.Thread(target=forward_progress, daemon=True)
-        t.start()
+        t = threading.Thread(target=forward_progress, daemon=True)
+        t.start()
         try:
             file_paths = config['SNIRF_FILES']
             file_params = config['PARAMS']
             file_metadata = config['METADATA']
             max_workers = file_params.get("MAX_WORKERS", int(os.cpu_count()/4))
             # Run the actual processing, with progress_queue passed down
             print("actual call")
-            results = process_multiple_participants(file_paths, file_params, file_metadata, progress_queue, max_workers)
+            results = process_multiple_participants(
+                file_paths, file_params, file_metadata, progress_queue, max_workers
+            )
             # Signal end of progress
             progress_queue.put("__done__")
             t.join()
             gui_queue.put({"success": True, "result": results})
         except Exception as e:
             gui_queue.put({
                 "success": False,
@@ -309,6 +311,14 @@ def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) -
"traceback": traceback.format_exc()
})
finally:
# Always send done to the thread and avoid hanging
try:
progress_queue.put("__done__")
except:
pass
t.join(timeout=5) # prevent permanent hang
def process_participant_worker(args):
@@ -343,9 +353,16 @@ def process_multiple_participants(file_paths, file_params, file_metadata, progre
         try:
             file_path, result, error = future.result()
             if error:
-                print(f"Error processing {file_path}: {error[0]}")
-                print(error[1])
+                error_message, error_traceback = error
+                if progress_queue:
+                    progress_queue.put({
+                        "type": "error",
+                        "file": file_path,
+                        "error": error_message,
+                        "traceback": error_traceback
+                    })
                 continue
             results_by_file[file_path] = result
         except Exception as e:
             print(f"Unexpected error processing {file_path}: {e}")