""" Filename: predictor.py Description: BLAZES machine learning Author: Tyler de Zeeuw License: GPL-3.0 """ # Built-in imports import inspect from datetime import datetime # External library imports import numpy as np import joblib import seaborn as sns import matplotlib.pyplot as plt from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report, f1_score, precision_score, recall_score, confusion_matrix from sklearn.preprocessing import StandardScaler # To be used once multiple models are supported and functioning: # import torch # import torch.nn as nn # import torch.optim as optim # import xgboost as xgb # from sklearn.svm import SVC # import os VERBOSITY = 1 GEOMETRY_LIBRARY = { # --- Distances (Point A, Point B) --- "dist_l_wrist_nose": ("dist", [9, 0], True), "dist_r_wrist_nose": ("dist", [10, 0], True), "dist_l_ear_r_shld": ("dist", [3, 6], True), "dist_r_ear_l_shld": ("dist", [4, 5], True), "dist_l_wrist_pelvis": ("dist", [9, [11, 12]], True), "dist_r_wrist_pelvis": ("dist", [10, [11, 12]], True), "dist_l_ankl_pelvis": ("dist", [15, [11, 12]], True), "dist_r_ankl_pelvis": ("dist", [16, [11, 12]], True), "dist_nose_pelvis": ("dist", [0, [11, 12]], True), "dist_ankl_ankl": ("dist", [15, 16], True), # NEW: Cross-Body and Pure Extension Distances "dist_l_wri_r_shld": ("dist", [9, 6], True), # Reach across body "dist_r_wri_l_shld": ("dist", [10, 5], True), # Reach across body "dist_l_wri_l_shld": ("dist", [9, 5], True), # Pure arm extension "dist_r_wri_r_shld": ("dist", [10, 6], True), # Pure arm extension # --- Angles (Point A, Center B, Point C) --- "angle_l_elbow": ("angle", [5, 7, 9]), "angle_r_elbow": ("angle", [6, 8, 10]), "angle_l_shoulder": ("angle", [11, 5, 7]), "angle_r_shoulder": ("angle", [12, 6, 8]), "angle_l_knee": ("angle", [11, 13, 15]), "angle_r_knee": ("angle", [12, 14, 16]), "angle_l_hip": ("angle", [5, 11, 13]), "angle_r_hip": ("angle", [6, 12, 14]), # --- Custom/Derived --- "asym_wrist": ("z_diff", [9, 10]), "asym_ankl": ("z_diff", [15, 16]), "offset_head": ("head_offset", [0, 5, 6]), "diff_ear_shld": ("subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]), "abs_diff_ear_shld": ("abs_subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]), # NEW: Verticality and Contralateral Contrast "height_l_ankl": ("y_diff", [15, 11]), # Foot height relative to hip "height_r_ankl": ("y_diff", [16, 12]), # Foot height relative to hip "diff_knee_angle": ("subtraction", ["angle_l_knee", "angle_r_knee"]), "asym_wri_shld": ("subtraction", ["dist_l_wri_l_shld", "dist_r_wri_r_shld"]) } # The Target Activity Map ACTIVITY_MAP = { "Mouthing": [ "dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow", "angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder", "asym_wrist", "offset_head" ], "Head Movement": [ "dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow", "angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder", "asym_wrist", "offset_head", "dist_l_ear_r_shld", "dist_r_ear_l_shld", "diff_ear_shld", "abs_diff_ear_shld" ], "Reach (Left)": [ "dist_l_wrist_pelvis", "dist_l_wrist_nose", "dist_l_wri_l_shld", "dist_l_wri_r_shld", "angle_l_elbow", "angle_l_shoulder", "asym_wri_shld" ], "Reach (Right)": [ "dist_r_wrist_pelvis", "dist_r_wrist_nose", "dist_r_wri_r_shld", "dist_r_wri_l_shld", "angle_r_elbow", "angle_r_shoulder", "asym_wri_shld" ], "Kick (Left)": [ "dist_l_ankl_pelvis", "angle_l_knee", "angle_l_hip", "height_l_ankl", "dist_ankl_ankl", "asym_ankl", "diff_knee_angle", "dist_nose_pelvis" ], "Kick (Right)": [ "dist_r_ankl_pelvis", "angle_r_knee", "angle_r_hip", "height_r_ankl", "dist_ankl_ankl", "asym_ankl", "diff_knee_angle", "dist_nose_pelvis" ] } def debug_print(): if VERBOSITY: frame = inspect.currentframe().f_back qualname = frame.f_code.co_qualname print(qualname) class GeneralPredictor: def __init__(self): debug_print() self.base_paths = { "Random Forest": "rf.pkl", "XGBoost": "xgb.json", "SVM": "svm.pkl", "LSTM": "lstm.pth", "1D-CNN": "cnn.pth" } self.raw_participant_buffer = [] self.current_target = "" self.scaler_cache = {} def add_to_raw_buffer(self, raw_payload, y_labels): """ Adds a participant's raw kinematic components to the pool. raw_payload should contain: 'z_kps', 'directions', 'raw_kps' """ debug_print() entry = { "raw_data": raw_payload, "labels": y_labels } self.raw_participant_buffer.append(entry) return f"Added participant to pool. Total participants: {len(self.raw_participant_buffer)}" def clear_buffer(self): """Clears the raw pool.""" debug_print() self.raw_participant_buffer = [] def calculate_and_train(self, model_type, target_name): """ The 'On-the-Fly' engine. Loops through the raw buffer, calculates features for the SELECTED target, and trains. """ debug_print() self.current_target = target_name all_X = [] all_y = [] # 1. Process every participant in the pool for participant in self.raw_participant_buffer: raw = participant["raw_data"] all_tracks = participant["labels"] # Pull the specific track that was requested track_key = f"OBS: {target_name}" if track_key not in all_tracks: print(f"Warning: Track {track_key} not found for a participant. Skipping.") continue y = all_tracks[track_key] # Extract lists from the payload z_scores = raw["z_kps"] dirs = raw["directions"] kpts = raw["raw_kps"] # Calculate geometric features for every frame participant_features = [] for i in range(len(y)): feat = self.format_features(z_scores[i], dirs[i], kpts[i]) participant_features.append(feat) all_X.append(np.array(participant_features)) all_y.append(y) # 2. Prepare for Training X_combined = np.vstack(all_X) y_combined = np.concatenate(all_y) # 3. Scale the data specifically for this target/model combo scaler = StandardScaler() X_scaled = scaler.fit_transform(X_combined) scaler_path = self.get_path(model_type, is_scaler=True) joblib.dump(scaler, scaler_path) # 4. Train/Test Split X_train, X_test, y_train, y_test = train_test_split( X_scaled, y_combined, test_size=0.2, stratify=y_combined, random_state=42 ) # 5. Process with corresponding Model if model_type == "Random Forest": model = RandomForestClassifier(max_depth=15, n_estimators=100, class_weight="balanced") model.fit(X_train, y_train) # Save the model save_path = self.get_path(model_type) joblib.dump(model, save_path) y_pred = model.predict(X_test) # Feature Importance for the UI labels_names = self.get_feature_labels() importances = model.feature_importances_ feature_data = sorted(zip(labels_names, importances), key=lambda x: x[1], reverse=True) ui_extras = "Top Predictors:
" + "
".join([f"{n}: {v:.3f}" for n, v in feature_data]) file_extras = "Top Predictors:\n" + "\n".join([f"- {n}: {v:.3f}" for n, v in feature_data]) return self._evaluate_and_report(model_type, y_test, y_pred, ui_extras=ui_extras, file_extras=file_extras, target_name=target_name) # TODO: More than random forest else: return "Model type not yet implemented in calculate_and_train." def get_path(self, model_type, is_scaler=False): """Returns the specific file path for the target/model or its scaler.""" debug_print() suffix = self.base_paths[model_type] if is_scaler: suffix = suffix.split('.')[0] + "_scaler.pkl" return f"ml_{self.current_target}_{suffix}" def get_feature_labels(self): """Returns labels only for features active in the current target.""" debug_print() active_keys = ACTIVITY_MAP.get(self.current_target, []) return active_keys def format_features(self, z_scores, directions, kpts): """The 'Universal Parser' for geometric features.""" # debug_print() # Internal Math Helpers if self.current_target == "ALL_FEATURES": active_list = list(GEOMETRY_LIBRARY.keys()) else: active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"]) def resolve_pt(idx): if isinstance(idx, list): # Calculate midpoint of all indices in the list pts = [kpts[i] for i in idx] return np.mean(pts, axis=0) return kpts[idx] def get_dist(p1, p2): return np.linalg.norm(p1 - p2) def get_angle(a, b, c): try: ba, bc = a - b, c - b denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6) cos = np.dot(ba, bc) / denom return np.degrees(np.arccos(np.clip(cos, -1.0, 1.0))) / 180.0 except: return 0.0 calculated_pool = {} try: if kpts is None or len(kpts) < 13: raise ValueError() # Reference scale (Shoulders) scale = get_dist(kpts[5], kpts[6]) + 1e-6 # First Pass: Direct Geometries for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items(): if f_type == "dist": # Use resolve_pt for both indices p1 = resolve_pt(indices[0]) p2 = resolve_pt(indices[1]) calculated_pool[name] = get_dist(p1, p2) / scale elif f_type == "angle": # Use resolve_pt for all three indices p1 = resolve_pt(indices[0]) p2 = resolve_pt(indices[1]) p3 = resolve_pt(indices[2]) calculated_pool[name] = get_angle(p1, p2, p3) elif f_type == "z_diff": # Z-scores are usually single indices, but we handle lists just in case z1 = np.mean([z_scores[i] for i in indices[0]]) if isinstance(indices[0], list) else z_scores[indices[0]] z2 = np.mean([z_scores[i] for i in indices[1]]) if isinstance(indices[1], list) else z_scores[indices[1]] calculated_pool[name] = abs(z1 - z2) elif f_type == "head_offset": p_target = resolve_pt(indices[0]) p_mid = resolve_pt([indices[1], indices[2]]) # Midpoint of shoulders calculated_pool[name] = abs(p_target[0] - p_mid[0]) / scale # Second Pass: Composite Geometries (Subtractions/Symmetry) # We do this after so 'dist_l_ear_r_shld' is already calculated for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items(): if f_type == "subtraction": calculated_pool[name] = calculated_pool[indices[0]] - calculated_pool[indices[1]] elif f_type == "abs_subtraction": calculated_pool[name] = abs(calculated_pool[indices[0]] - calculated_pool[indices[1]]) except Exception: # If a frame fails, fill the pool with zeros to prevent crashes calculated_pool = {name: 0.0 for name in GEOMETRY_LIBRARY.keys()} # Final Extraction based on current_target active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"]) feature_vector = [calculated_pool[feat] for feat in active_list] return np.array(feature_vector, dtype=np.float32) def _prepare_pool_data(self): """Merges buffer and fits scaler.""" debug_print() if not self.X_buffer: return None, None, None X_total = np.vstack(self.X_buffer) y_total = np.concatenate(self.y_buffer) # We always fit a fresh scaler on the current pool scaler_file = f"{self.current_target}_scaler.pkl" scaler = StandardScaler() X_scaled = scaler.fit_transform(X_total) joblib.dump(scaler, scaler_file) return X_scaled, y_total, scaler def _evaluate_and_report(self, model_name, y_test, y_pred, extra_text="", ui_extras="", file_extras="", target_name=""): """Generates unified metrics, confusion matrix, and reports for ANY model""" debug_print() prec = precision_score(y_test, y_pred, zero_division=0) rec = recall_score(y_test, y_pred, zero_division=0) f1 = f1_score(y_test, y_pred, zero_division=0) target = getattr(self, 'current_target', 'Activity') display_labels = ['Rest', target] # Plot Confusion Matrix cm = confusion_matrix(y_test, y_pred) plt.figure(figsize=(8, 6)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=display_labels, yticklabels=display_labels) plt.title(f'{model_name} Detection: Predicted vs Actual') plt.ylabel('Actual State') plt.xlabel('Predicted State') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') plt.savefig(f"ml_{target_name}_confusion_matrix_rf_{timestamp}.png") plt.close() # Classification Report String report_str = classification_report(y_test, y_pred, target_names=display_labels, zero_division=0) # Build TXT File Content report_text = f"MODEL PERFORMANCE REPORT: {model_name}\nGenerated: {timestamp}\n" report_text += "="*40 + "\n" report_text += report_str + "\n" report_text += f"Precision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1:.4f}\n" report_text += "="*40 + "\n" + extra_text report_text += "="*40 + "\n" + file_extras with open(f"ml_{target_name}_performance_rf_{timestamp}.txt", "w") as f: f.write(report_text) # Build UI String ui_report = f""" {model_name} Performance:
Precision: {prec:.2f} | Recall: {rec:.2f} | F1: {f1:.2f}

{ui_extras} """ return ui_report def calculate_directions(self, analysis_kps): debug_print() all_dirs = np.zeros((len(analysis_kps), 17)) for f in range(1, len(analysis_kps)): deltas = analysis_kps[f] - analysis_kps[f-1] # Shape (17, 2) angles = np.arctan2(-deltas[:, 1], deltas[:, 0]) all_dirs[f] = angles return all_dirs