"""Behavior classification utilities.

Loads pose-CSV / label-JSON pairs, derives per-frame geometric features
(as configured in ``analysis_config.json``), and trains a binary
"rest vs. active" Random Forest classifier, saving the model, scaler,
metadata, confusion-matrix plot, and a text performance report.
"""

import csv
import inspect
import json
import os
from datetime import datetime

import joblib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# 0 silences debug_print(); any truthy value enables it.
VERBOSITY = 1


def load_analysis_config(path="analysis_config.json"):
    """Load the analysis configuration file.

    Args:
        path: Path to the JSON configuration file.

    Returns:
        tuple: ``(geometry_library, activity_map)`` — feature definitions
        and the behavior -> required-feature-names mapping.

    Raises:
        FileNotFoundError: If the config file does not exist.
        KeyError: If either expected top-level key is missing.
    """
    with open(path, 'r', encoding='utf-8') as f:
        config = json.load(f)
    return config['geometry_library'], config['activity_map']


try:
    GEOMETRY_LIBRARY, ACTIVITY_MAP = load_analysis_config()
except FileNotFoundError:
    # Degrade gracefully so the module can still be imported without config.
    GEOMETRY_LIBRARY, ACTIVITY_MAP = {}, {}
    print("Warning: analysis_config.json not found. ML functions will fail.")


def debug_print():
    """Print the caller's source file path when VERBOSITY is enabled.

    NOTE(review): this reads ``co_filename`` (the file path), not the
    calling function's qualified name — if a per-function trace was
    intended, ``co_qualname`` (3.11+) is the right attribute. Behavior
    kept as-is.
    """
    if VERBOSITY:
        frame = inspect.currentframe().f_back
        location = frame.f_code.co_filename
        print(f"DEBUG_PRINT: {location}")


class GeneralPredictor:
    """Trains per-frame behavior classifiers and converts predictions
    back into timeline events."""

    def __init__(self):
        debug_print()
        # Filename suffix per supported model type (see get_path()).
        self.base_paths = {
            "Random Forest": "rf.pkl"
        }
        self.current_target = ""       # behavior/model name used in file paths
        self.active_feature_keys = []  # sorted feature names used for training

    def calculate_and_train(self, training_params):
        """
        Takes the dict from get_selection() in TrainModelWindow.
        Loads CSV/JSON pairs, extracts combined features, and trains
        Random Forest.

        Args:
            training_params: dict with keys ``pairs`` (list of
                ``(json_path, csv_path)`` tuples), ``selected_behaviors``
                (list of behavior names), ``target_name`` and
                ``model_type``.

        Returns:
            str: A human-readable report on success, or an "Error: ..."
            message describing why training was aborted.
        """
        debug_print()
        pairs = training_params.get("pairs", [])
        selected_behaviors = training_params.get("selected_behaviors", [])
        self.current_target = training_params.get("target_name", "combined_model")
        model_type = training_params.get("model_type", "Random Forest")

        if not pairs or not selected_behaviors:
            return "Error: Missing data pairs or target behaviors."

        # 1. Determine the union of ALL needed geometric features across
        # the selected behaviors.
        needed_features = set()
        for b_name in selected_behaviors:
            needed_features.update(ACTIVITY_MAP.get(b_name, []))
        self.active_feature_keys = sorted(needed_features)
        print(self.active_feature_keys)

        model_metadata = {
            "target_behavior": self.current_target,
            "feature_keys": self.active_feature_keys,
            "model_type": model_type,
            "timestamp": datetime.now().isoformat()
        }

        if not self.active_feature_keys:
            return ("Error: No geometric features mapped to the selected "
                    "behavior(s) in analysis_config.json.")

        all_X = []
        all_y = []

        # 2. Process each pair (JSON labels + CSV raw pose). Bad files
        # are reported and skipped rather than aborting the whole run.
        for json_path, csv_path in pairs:
            behaviors = self._load_labels(json_path)
            if behaviors is None:
                continue

            raw_kpts = self._load_pose_csv(csv_path)
            if raw_kpts is None:
                continue

            total_frames = len(raw_kpts)
            if total_frames == 0:
                continue

            y_vector = self._build_target_vector(
                behaviors, selected_behaviors, total_frames)

            # --- Calculate features per frame ---
            # Only raw keypoints are needed here. (Z-scores were previously
            # passed, but those were derived from raw anyway. If you require
            # normalized z-scores for RF, recalculate them here using the
            # same baseline logic from the main window. For now, raw geometry.)
            participant_features = [self.format_features(raw_kpts[i])
                                    for i in range(total_frames)]

            all_X.append(np.array(participant_features))
            all_y.append(y_vector)

        # 3. Prepare for training.
        if not all_X:
            return "Error: No valid data extracted from files."

        X_combined = np.vstack(all_X)
        y_combined = np.concatenate(all_y)

        # Class-imbalance edge case: no instances of the behavior found.
        if len(np.unique(y_combined)) < 2:
            return ("Error: Training data only contains one class (usually "
                    "0/Rest). Model cannot train.")

        metadata_path = self.get_path(model_type).replace(".pkl", "_metadata.json")
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(model_metadata, f, indent=4)
        print(f"[INFO] Metadata saved to: {metadata_path}")

        # 4. Scale data; persist the scaler so inference matches training.
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_combined)
        joblib.dump(scaler, self.get_path(model_type, is_scaler=True))

        # 5. Stratified split keeps the class ratio in both halves.
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y_combined, test_size=0.2, stratify=y_combined,
            random_state=42
        )

        # 6. Train Random Forest (placeholders exist for others).
        if model_type == "Random Forest":
            model = RandomForestClassifier(max_depth=15, n_estimators=100,
                                           class_weight="balanced")
            model.fit(X_train, y_train)
            joblib.dump(model, self.get_path(model_type))
            y_pred = model.predict(X_test)

            # Feature importance, most predictive first.
            importances = model.feature_importances_
            feature_data = sorted(zip(self.active_feature_keys, importances),
                                  key=lambda x: x[1], reverse=True)
            ui_extras = "Top Predictors:\n" + "\n".join(
                [f"{n}: {v:.3f}" for n, v in feature_data[:10]])
            file_extras = "Top Predictors:\n" + "\n".join(
                [f"- {n}: {v:.3f}" for n, v in feature_data])
            return self._evaluate_and_report(model_type, y_test, y_pred,
                                             ui_extras=ui_extras,
                                             file_extras=file_extras)
        elif model_type == "1D-CNN":
            return "1D-CNN training placeholder reached. Not yet implemented."
        elif model_type == "LSTM":
            return "LSTM training placeholder reached. Not yet implemented."
        elif model_type == "XGBoost":
            return "XGBoost training placeholder reached. Not yet implemented."
        else:
            return f"Model type {model_type} not supported."

    @staticmethod
    def _load_labels(json_path):
        """Load a label JSON; return its "behaviors" dict or None on failure."""
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                label_data = json.load(f)
        except Exception as e:
            print(f"Error loading {json_path}: {e}")
            return None
        return label_data.get("behaviors", {})

    @staticmethod
    def _load_pose_csv(csv_path):
        """Load raw keypoints from CSV as an (n_frames, 17, 3) array, or None.

        Each CSV row (after the header) is 51 floats reshaped to (17, 3)
        — presumably x, y, confidence per keypoint; TODO confirm.
        """
        try:
            raw_kpts = []
            with open(csv_path, 'r') as f:
                reader = csv.reader(f)
                next(reader)  # skip header
                for row in reader:
                    raw_kpts.append(
                        np.array([float(x) for x in row]).reshape(17, 3))
            return np.array(raw_kpts)
        except Exception as e:
            print(f"Error loading {csv_path}: {e}")
            return None

    @staticmethod
    def _build_target_vector(behaviors, selected_behaviors, total_frames):
        """Binary per-frame target: 1 inside ANY selected behavior instance,
        0 (= Rest) elsewhere."""
        y_vector = np.zeros(total_frames, dtype=int)
        for b_name in selected_behaviors:
            for inst in behaviors.get(b_name, []):
                start = inst.get("start_frame", 0)
                duration = inst.get("duration_frames", 0)
                # Clamp so a label running past the video end can't overflow.
                end = min(start + duration, total_frames)
                y_vector[start:end] = 1
        return y_vector

    def get_path(self, model_type, is_scaler=False):
        """Build the on-disk filename for a model or its scaler.

        Args:
            model_type: Key into ``self.base_paths`` ("model.pkl" fallback).
            is_scaler: If True, return the companion scaler's path.

        Returns:
            str: e.g. ``ml_<target>_rf.pkl`` / ``ml_<target>_rf_scaler.pkl``.
        """
        debug_print()
        suffix = self.base_paths.get(model_type, "model.pkl")
        if is_scaler:
            # rsplit: only strip the final extension, keep any earlier dots.
            suffix = suffix.rsplit('.', 1)[0] + "_scaler.pkl"
        return f"ml_{self.current_target}_{suffix}"

    def format_features(self, kpts):
        """
        Calculates only the geometric features required by
        self.active_feature_keys.

        Args:
            kpts: Keypoints for one frame; assumed shape (17, 3) with
                x, y, confidence — TODO confirm against the pose CSV.

        Returns:
            np.ndarray: float32 vector, one value per active feature key
            (all zeros when the frame is unusable).
        """

        def resolve_pt(idx):
            # A list of indices means "midpoint of those keypoints".
            if isinstance(idx, list):
                pts = [kpts[i][:2] for i in idx]  # Ensure X/Y only
                return np.mean(pts, axis=0)
            return kpts[idx][:2]

        def get_dist(p1, p2):
            return np.linalg.norm(p1 - p2)

        def get_angle(a, b, c):
            # Angle at vertex b, normalized to [0, 1]; epsilon guards the
            # degenerate zero-length case.
            try:
                ba, bc = a - b, c - b
                denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
                cos = np.dot(ba, bc) / denom
                return np.degrees(np.arccos(np.clip(cos, -1.0, 1.0))) / 180.0
            except Exception:  # was a bare except; don't mask SystemExit etc.
                return 0.0

        calculated_pool = {}
        try:
            if kpts is None or len(kpts) < 13:
                # NOTE(review): indices up to 16 may be referenced below;
                # the < 13 guard looks loose — confirm the minimum count.
                raise ValueError("insufficient keypoints")
            # Shoulder distance as scale factor; epsilon avoids div-by-zero.
            scale = get_dist(kpts[5][:2], kpts[6][:2]) + 1e-6

            # First pass: direct geometries (everything that only needs
            # raw keypoints, including dependencies of the second pass).
            for name, config_data in GEOMETRY_LIBRARY.items():
                f_type = config_data[0]
                indices = config_data[1]
                if f_type == "dist":
                    p1 = resolve_pt(indices[0])
                    p2 = resolve_pt(indices[1])
                    calculated_pool[name] = get_dist(p1, p2) / scale
                elif f_type == "angle":
                    p1 = resolve_pt(indices[0])
                    p2 = resolve_pt(indices[1])
                    p3 = resolve_pt(indices[2])
                    calculated_pool[name] = get_angle(p1, p2, p3)
                elif f_type == "head_offset":
                    # Horizontal offset of a target point from the midpoint
                    # of two reference keypoints.
                    p_target = resolve_pt(indices[0])
                    p_mid = resolve_pt([indices[1], indices[2]])
                    calculated_pool[name] = abs(p_target[0] - p_mid[0]) / scale
                elif f_type == "y_diff":  # NEW from JSON
                    p1 = resolve_pt(indices[0])
                    p2 = resolve_pt(indices[1])
                    calculated_pool[name] = abs(p1[1] - p2[1]) / scale

            # Second pass: derived features referencing first-pass names
            # (requires the first pass to be complete).
            for name, config_data in GEOMETRY_LIBRARY.items():
                f_type = config_data[0]
                indices = config_data[1]
                if f_type == "subtraction":
                    val1 = calculated_pool.get(indices[0], 0)
                    val2 = calculated_pool.get(indices[1], 0)
                    calculated_pool[name] = val1 - val2
                elif f_type == "abs_subtraction":
                    val1 = calculated_pool.get(indices[0], 0)
                    val2 = calculated_pool.get(indices[1], 0)
                    calculated_pool[name] = abs(val1 - val2)
        except Exception:
            # Any failure yields an all-zero feature frame.
            calculated_pool = {name: 0.0 for name in GEOMETRY_LIBRARY.keys()}

        # Final extraction based on the set of needed features.
        feature_vector = [calculated_pool.get(feat, 0.0)
                          for feat in self.active_feature_keys]
        return np.array(feature_vector, dtype=np.float32)

    def _evaluate_and_report(self, model_name, y_test, y_pred,
                             ui_extras="", file_extras=""):
        """Compute metrics, save a confusion-matrix PNG and a text report.

        Args:
            model_name: Display name of the trained model.
            y_test, y_pred: Ground-truth and predicted binary labels.
            ui_extras: Short extra text appended to the UI summary.
            file_extras: Extra text appended to the saved report file.

        Returns:
            str: Short summary suitable for display in the UI.
        """
        debug_print()
        prec = precision_score(y_test, y_pred, zero_division=0)
        rec = recall_score(y_test, y_pred, zero_division=0)
        f1 = f1_score(y_test, y_pred, zero_division=0)

        display_labels = ['Rest', self.current_target]
        cm = confusion_matrix(y_test, y_pred)

        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=display_labels, yticklabels=display_labels)
        plt.title(f'{model_name} Detection: {self.current_target}')
        plt.ylabel('Actual State')
        plt.xlabel('Predicted State')

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        plt.savefig(f"ml_{self.current_target}_cm_{timestamp}.png")
        plt.close()  # free the figure; this runs headless

        report_str = classification_report(y_test, y_pred,
                                           target_names=display_labels,
                                           zero_division=0)
        report_text = (f"MODEL PERFORMANCE REPORT: {model_name}\n"
                       f"Generated: {timestamp}\n")
        report_text += "=" * 40 + "\n"
        report_text += report_str + "\n"
        report_text += f"Precision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1:.4f}\n"
        report_text += "=" * 40 + "\n" + file_extras

        with open(f"ml_{self.current_target}_performance_{timestamp}.txt",
                  "w", encoding='utf-8') as f:
            f.write(report_text)

        ui_report = f"""
{model_name} Model for '{self.current_target}'
Precision: {prec:.2f} | Recall: {rec:.2f} | F1: {f1:.2f}

{ui_extras}
"""
        return ui_report

    def convert_to_events(self, predictions, track_name="🤖 AI: Predicted"):
        """
        Converts a 1D array of class labels into a dictionary of timeline blocks.

        Args:
            predictions: np.array (or sequence) of 0s and 1s.
            track_name: The name for the resulting timeline row.

        Returns:
            dict: ``{track_name: [[start, end, "Normal", "ML Prediction"], ...]}``
            where each entry is a half-open [start, end) frame range of a
            contiguous run of 1s.
        """
        events = {track_name: []}
        current_class = None
        start_frame = 0

        for i, pred in enumerate(predictions):
            # We only care about the transition into or out of class 1.
            if pred != current_class:
                # If we were in an active block (1), close it.
                if current_class == 1:
                    events[track_name].append(
                        [start_frame, i, "Normal", "ML Prediction"])
                # If we are starting a new active block (1), mark the start.
                if pred == 1:
                    start_frame = i
                current_class = pred

        # Close the final block if the sequence ends while still active.
        if current_class == 1:
            events[track_name].append(
                [start_frame, len(predictions), "Normal", "ML Prediction"])
        return events