Too many fixes to document individually; see the changelog for details.

This commit is contained in:
2026-03-21 15:26:59 -07:00
parent 4922405138
commit 540793d150
5 changed files with 705 additions and 441 deletions

199
flares.py
View File

@@ -16,12 +16,14 @@ from io import BytesIO
from typing import Any, Optional, cast, Literal, Union
from itertools import compress
from copy import deepcopy
from multiprocessing import Queue
from multiprocessing import Queue, Pool
import os.path as op
import re
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from queue import Empty
import time
import multiprocessing as mp
# External library imports
import matplotlib.pyplot as plt
@@ -169,6 +171,8 @@ H_FREQ: float
L_TRANS_BANDWIDTH: float
H_TRANS_BANDWIDTH: float
EPOCH_HANDLING: str
RESAMPLE: bool
RESAMPLE_FREQ: int
STIM_DUR: float
@@ -247,6 +251,28 @@ REQUIRED_KEYS: dict[str, Any] = {
}
import logging
import os
import psutil
import traceback
# Dedicated audit logger for memory diagnostics; writes to its own file.
audit_log = logging.getLogger("memory_audit")
audit_log.setLevel(logging.INFO)
# Keep audit records out of the root/application loggers.
audit_log.propagate = False
# Attach the file handler exactly once, so repeated imports do not
# duplicate handlers (and therefore duplicate every log line).
if not audit_log.handlers:
    _audit_handler = logging.FileHandler('flares_memory_audit.log')
    _audit_formatter = logging.Formatter('%(asctime)s | PID: %(process)d | %(message)s')
    _audit_handler.setFormatter(_audit_formatter)
    audit_log.addHandler(_audit_handler)
def get_mem_mb():
    """Return this process's resident set size (RSS) in megabytes."""
    current_process = psutil.Process(os.getpid())
    rss_bytes = current_process.memory_info().rss
    return rss_bytes / 1024 / 1024
class ProcessingError(Exception):
    """Raised when participant processing fails.

    The message is forwarded to ``Exception.__init__`` so that
    ``str(exc)`` works for the no-argument case and the exception
    pickles/unpickles correctly across process boundaries (the workers
    in this module report errors via ``str(e)`` over multiprocessing
    queues).
    """

    def __init__(self, message: str = "Something went wrong!"):
        # Populate Exception.args; without this, str(ProcessingError())
        # is an empty string and pickling loses the message.
        super().__init__(message)
        self.message = message
@@ -370,58 +396,92 @@ def gui_entry(config: dict[str, Any], gui_queue: Queue, progress_queue: Queue) -
t.join(timeout=5) # prevent permanent hang
# NOTE(review): this span appears to be rendered diff residue — the old
# tuple-returning worker (single packed `args` parameter) followed by its
# queue-based replacement. The second `def` shadows the first at import time,
# so only the queue-based version would be live; confirm against the merged
# source before cleaning up.
def process_participant_worker(args):
    # Old calling convention: one packed tuple (executor.map/submit style).
    file_path, file_params, file_metadata, progress_queue = args
    set_config_me(file_params)
    set_metadata(file_path, file_metadata)
    logger.info(f"DEBUG: Metadata for {file_path}: AGE={globals().get('AGE')}, GENDER={globals().get('GENDER')}, GROUP={globals().get('GROUP')}")
    def progress_callback(step_idx):
        # Forward per-step progress to the GUI, if a queue was supplied.
        if progress_queue:
            progress_queue.put(('progress', file_path, step_idx))
def process_participant_worker(file_path, file_params, file_metadata, result_queue, progress_queue):
    # Replacement worker: runs inside its own mp.Process and reports via
    # result_queue instead of a return value; always hard-exits in `finally`.
    file_name = os.path.basename(file_path)
    try:
        # 1. Setup
        set_config_me(file_params)
        set_metadata(file_path, file_metadata)
        def progress_callback(step_idx):
            if progress_queue:
                # We use put_nowait to prevent the worker from hanging on a full queue
                try:
                    progress_queue.put_nowait(('progress', file_path, step_idx))
                except: pass  # NOTE(review): bare except silently drops progress updates
        # 2. Process
        result = process_participant(file_path, progress_callback=progress_callback)
        return file_path, result, None
        # NOTE(review): the `return` above looks like a leftover from the old
        # version — it makes the queue put below unreachable, and the
        # os._exit(0) in `finally` would discard the return value anyway.
        # 3. Report Success
        result_queue.put((file_path, result, None))
    except Exception as e:
        error_trace = traceback.format_exc()
        return file_path, None, (str(e), error_trace)
        # NOTE(review): same leftover pattern — this return pre-empts the put.
        result_queue.put((file_path, None, str(e)))
    finally:
        # --- THE FIX: MANDATORY EXIT ---
        # Explicitly flush the logs and force the process to terminate
        audit_log.info(f"Worker for {file_name} calling hard exit.")
        sys.stdout.flush()
        sys.stderr.flush()
        # We use os._exit(0) as a nuclear option if sys.exit() is being caught by a try/except
        os._exit(0)
# NOTE(review): diff residue — both the old ProcessPoolExecutor signature and
# the new manual-process signature appear below, and several stray lines from
# the removed implementation are interleaved in the body (flagged inline).
# Reconcile against the actual merged file before treating this as runnable.
def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, max_workers=None):
def process_multiple_participants(file_paths, file_params, file_metadata, progress_queue=None, max_workers=6):
    # Orchestrates one mp.Process per file, capped at max_workers, draining a
    # manager queue for results until every file has been handled.
    audit_log.info(f"--- SESSION START: {len(file_paths)} files ---")
    pending_files = list(file_paths)
    active_processes = []  # List of tuples: (Process object, file_path)
    results_by_file = {}
    # We use a manager queue so it handles IPC serialization cleanly
    manager = mp.Manager()
    result_queue = manager.Queue()
    # NOTE(review): file_args is a leftover from the executor version; nothing
    # in the live loop below reads it.
    file_args = [(file_path, file_params, file_metadata, progress_queue) for file_path in file_paths]
    # Loop continues as long as there are files to process OR workers still running
    while pending_files or active_processes:
        # 1. SPAWN WORKERS: Only spawn if we are under the limit AND have files left
        while len(active_processes) < max_workers and pending_files:
            file_path = pending_files.pop(0)
            p = mp.Process(
                target=process_participant_worker,
                args=(file_path, file_params, file_metadata, result_queue, progress_queue)
            )
            p.start()
            active_processes.append((p, file_path))
            audit_log.info(f"Spawned worker. Active processes: {len(active_processes)}")
        # NOTE(review): the executor block below is the removed implementation,
        # interleaved here by the diff renderer.
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(process_participant_worker, arg): arg[0] for arg in file_args}
            for future in as_completed(futures):
                file_path = futures[future]
        # 2. COLLECT RESULTS: Drain the queue continuously so workers don't deadlock
        while not result_queue.empty():
            try:
                # NOTE(review): the future-based body and first except clause are
                # removed lines; the live path is the get_nowait branch below.
                file_path, result, error = future.result()
                if error:
                    error_message, error_traceback = error
                    if progress_queue:
                        progress_queue.put({
                            "type": "error",
                            "file": file_path,
                            "error": error_message,
                            "traceback": error_traceback
                        })
                    continue
                results_by_file[file_path] = result
            except Exception as e:
                print(f"Unexpected error processing {file_path}: {e}")
                res_path, result, error = result_queue.get_nowait()
                if not error:
                    results_by_file[res_path] = result
                else:
                    audit_log.error(f"Worker failed on {os.path.basename(res_path)}: {error}")
            except Exception:
                break  # Queue is empty or busy
        # 3. CLEANUP: Check for finished processes and remove them
        for p, f_path in active_processes[:]:  # Iterate over a slice copy
            if not p.is_alive():
                p.join()  # Formally close the process to free OS resources
                active_processes.remove((p, f_path))
                audit_log.info(f"Worker finished. Active processes dropping to: {len(active_processes)}")
        # Brief pause to prevent this while loop from pegging your CPU to 100%
        time.sleep(0.5)
    audit_log.info("--- SESSION COMPLETE ---")
    return results_by_file
def markbad(data, ax, ch_names: list[str]) -> None:
"""
Add a strikethrough to a plot for channels marked as bad.
@@ -1143,16 +1203,48 @@ def filter_the_data(raw_haemo):
def safe_create_epochs(raw, events, event_dict, tmin, tmax, baseline):
    """
    Attempts to create epochs, shifting event times slightly if
    sample collisions are detected.
    """
    shift_increment = 1.0 / raw.info['sfreq']  # The duration of exactly one sample
    max_attempts = 10  # bounded so a pathological events array cannot loop forever
    for _ in range(max_attempts):
        try:
            return Epochs(
                raw, events, event_id=event_dict,
                tmin=tmin, tmax=tmax, baseline=baseline,
                preload=True, verbose=False
            )
        except RuntimeError as e:
            # Only collision errors are recoverable; anything else propagates.
            if "Event time samples were not unique" not in str(e):
                raise e
            # Column 0 of the events array holds the sample index; find the
            # values that occur more than once.
            sample_vals, sample_counts = np.unique(events[:, 0], return_counts=True)
            for dup in sample_vals[sample_counts > 1]:
                # Leave the first occurrence in place; push every later
                # occurrence one sample forward (mutates `events` in place).
                later = np.where(events[:, 0] == dup)[0][1:]
                events[later, 0] += 1
            print(f"Collision detected. Nudging events by {shift_increment:.4f}s and retrying...")
    raise RuntimeError("Could not resolve event collisions after 10 attempts.")
def epochs_calculations(raw_haemo, events, event_dict):
fig_epochs = [] # List to store figures
# Create epochs from raw data
epochs = Epochs(raw_haemo,
events,
event_id=event_dict,
tmin=-5,
tmax=15,
baseline=(None, 0))
if EPOCH_HANDLING == 'shift':
epochs = safe_create_epochs(raw=raw_haemo, events=events, event_dict=event_dict, tmin=-5, tmax=15, baseline=(None, 0))
else:
epochs = Epochs(raw_haemo, events, event_id=event_dict, tmin=-5, tmax=15, baseline=(None, 0))
# Make a copy of the epochs and drop bad ones
epochs2 = epochs.copy()
@@ -1582,15 +1674,12 @@ def resource_path(relative_path):
def fold_channels(raw: BaseRaw) -> None:
    # Points MNE-NIRS at the fOLD Supplementary spreadsheets via the
    # MNE_NIRS_FOLD_PATH config key.
    # NOTE(review): diff residue — the unconditional path/set_config pair just
    # below is the removed implementation; the frozen-aware if/else afterwards
    # is its replacement. Only one of the two should survive in the merged file.
    # if getattr(sys, 'frozen', False):
    path = os.path.expanduser("~") + "/mne_data/fOLD/fOLD-public-master/Supplementary"
    logger.info(path)
    set_config('MNE_NIRS_FOLD_PATH', resource_path(path)) # type: ignore
    # # Locate the fOLD excel files
    # else:
    # logger.info("yabba")
    # set_config('MNE_NIRS_FOLD_PATH', resource_path("../../mne_data/fOLD/fOLD-public-master/Supplementary")) # type: ignore
    # Locate the fOLD excel files
    if getattr(sys, 'frozen', False):
        # Frozen (e.g. PyInstaller) bundle: resolve relative to the bundle.
        set_config('MNE_NIRS_FOLD_PATH', resource_path("../../mne_data/fOLD/fOLD-public-master/Supplementary")) # type: ignore
    else:
        # Normal interpreter run: use the per-user mne_data directory.
        path = os.path.expanduser("~") + "/mne_data/fOLD/fOLD-public-master/Supplementary"
        set_config('MNE_NIRS_FOLD_PATH', resource_path(path)) # type: ignore
output = None