# --- File-listing metadata (non-code residue, preserved as comments) ---
# Files
# blazes/predictor.py
# 2026-03-13 20:53:20 -07:00
#
# 405 lines
# 16 KiB
# Python
"""
Filename: predictor.py
Description: BLAZES machine learning
Author: Tyler de Zeeuw
License: GPL-3.0
"""
# Built-in imports
import inspect
from datetime import datetime
# External library imports
import numpy as np
import joblib
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
# To be used once multiple models are supported and functioning:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import xgboost as xgb
# from sklearn.svm import SVC
# import os
VERBOSITY = 1
GEOMETRY_LIBRARY = {
# --- Distances (Point A, Point B) ---
"dist_l_wrist_nose": ("dist", [9, 0], True),
"dist_r_wrist_nose": ("dist", [10, 0], True),
"dist_l_ear_r_shld": ("dist", [3, 6], True),
"dist_r_ear_l_shld": ("dist", [4, 5], True),
"dist_l_wrist_pelvis": ("dist", [9, [11, 12]], True),
"dist_r_wrist_pelvis": ("dist", [10, [11, 12]], True),
"dist_l_ankl_pelvis": ("dist", [15, [11, 12]], True),
"dist_r_ankl_pelvis": ("dist", [16, [11, 12]], True),
"dist_nose_pelvis": ("dist", [0, [11, 12]], True),
"dist_ankl_ankl": ("dist", [15, 16], True),
# NEW: Cross-Body and Pure Extension Distances
"dist_l_wri_r_shld": ("dist", [9, 6], True), # Reach across body
"dist_r_wri_l_shld": ("dist", [10, 5], True), # Reach across body
"dist_l_wri_l_shld": ("dist", [9, 5], True), # Pure arm extension
"dist_r_wri_r_shld": ("dist", [10, 6], True), # Pure arm extension
# --- Angles (Point A, Center B, Point C) ---
"angle_l_elbow": ("angle", [5, 7, 9]),
"angle_r_elbow": ("angle", [6, 8, 10]),
"angle_l_shoulder": ("angle", [11, 5, 7]),
"angle_r_shoulder": ("angle", [12, 6, 8]),
"angle_l_knee": ("angle", [11, 13, 15]),
"angle_r_knee": ("angle", [12, 14, 16]),
"angle_l_hip": ("angle", [5, 11, 13]),
"angle_r_hip": ("angle", [6, 12, 14]),
# --- Custom/Derived ---
"asym_wrist": ("z_diff", [9, 10]),
"asym_ankl": ("z_diff", [15, 16]),
"offset_head": ("head_offset", [0, 5, 6]),
"diff_ear_shld": ("subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]),
"abs_diff_ear_shld": ("abs_subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]),
# NEW: Verticality and Contralateral Contrast
"height_l_ankl": ("y_diff", [15, 11]), # Foot height relative to hip
"height_r_ankl": ("y_diff", [16, 12]), # Foot height relative to hip
"diff_knee_angle": ("subtraction", ["angle_l_knee", "angle_r_knee"]),
"asym_wri_shld": ("subtraction", ["dist_l_wri_l_shld", "dist_r_wri_r_shld"])
}
# The Target Activity Map
ACTIVITY_MAP = {
"Mouthing": [
"dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow",
"angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder",
"asym_wrist", "offset_head"
],
"Head Movement": [
"dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow",
"angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder",
"asym_wrist", "offset_head", "dist_l_ear_r_shld",
"dist_r_ear_l_shld", "diff_ear_shld", "abs_diff_ear_shld"
],
"Reach (Left)": [
"dist_l_wrist_pelvis", "dist_l_wrist_nose", "dist_l_wri_l_shld",
"dist_l_wri_r_shld", "angle_l_elbow", "angle_l_shoulder",
"asym_wri_shld"
],
"Reach (Right)": [
"dist_r_wrist_pelvis", "dist_r_wrist_nose", "dist_r_wri_r_shld",
"dist_r_wri_l_shld", "angle_r_elbow", "angle_r_shoulder",
"asym_wri_shld"
],
"Kick (Left)": [
"dist_l_ankl_pelvis", "angle_l_knee", "angle_l_hip",
"height_l_ankl", "dist_ankl_ankl", "asym_ankl",
"diff_knee_angle", "dist_nose_pelvis"
],
"Kick (Right)": [
"dist_r_ankl_pelvis", "angle_r_knee", "angle_r_hip",
"height_r_ankl", "dist_ankl_ankl", "asym_ankl",
"diff_knee_angle", "dist_nose_pelvis"
]
}
def debug_print():
if VERBOSITY:
frame = inspect.currentframe().f_back
qualname = frame.f_code.co_qualname
print(qualname)
class GeneralPredictor:
def __init__(self):
debug_print()
self.base_paths = {
"Random Forest": "rf.pkl",
"XGBoost": "xgb.json",
"SVM": "svm.pkl",
"LSTM": "lstm.pth",
"1D-CNN": "cnn.pth"
}
self.raw_participant_buffer = []
self.current_target = ""
self.scaler_cache = {}
def add_to_raw_buffer(self, raw_payload, y_labels):
"""
Adds a participant's raw kinematic components to the pool.
raw_payload should contain: 'z_kps', 'directions', 'raw_kps'
"""
debug_print()
entry = {
"raw_data": raw_payload,
"labels": y_labels
}
self.raw_participant_buffer.append(entry)
return f"Added participant to pool. Total participants: {len(self.raw_participant_buffer)}"
def clear_buffer(self):
"""Clears the raw pool."""
debug_print()
self.raw_participant_buffer = []
def calculate_and_train(self, model_type, target_name):
"""
The 'On-the-Fly' engine. Loops through the raw buffer,
calculates features for the SELECTED target, and trains.
"""
debug_print()
self.current_target = target_name
all_X = []
all_y = []
# 1. Process every participant in the pool
for participant in self.raw_participant_buffer:
raw = participant["raw_data"]
all_tracks = participant["labels"]
# Pull the specific track that was requested
track_key = f"OBS: {target_name}"
if track_key not in all_tracks:
print(f"Warning: Track {track_key} not found for a participant. Skipping.")
continue
y = all_tracks[track_key]
# Extract lists from the payload
z_scores = raw["z_kps"]
dirs = raw["directions"]
kpts = raw["raw_kps"]
# Calculate geometric features for every frame
participant_features = []
for i in range(len(y)):
feat = self.format_features(z_scores[i], dirs[i], kpts[i])
participant_features.append(feat)
all_X.append(np.array(participant_features))
all_y.append(y)
# 2. Prepare for Training
X_combined = np.vstack(all_X)
y_combined = np.concatenate(all_y)
# 3. Scale the data specifically for this target/model combo
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_combined)
scaler_path = self.get_path(model_type, is_scaler=True)
joblib.dump(scaler, scaler_path)
# 4. Train/Test Split
X_train, X_test, y_train, y_test = train_test_split(
X_scaled, y_combined, test_size=0.2, stratify=y_combined, random_state=42
)
# 5. Process with corresponding Model
if model_type == "Random Forest":
model = RandomForestClassifier(max_depth=15, n_estimators=100, class_weight="balanced")
model.fit(X_train, y_train)
# Save the model
save_path = self.get_path(model_type)
joblib.dump(model, save_path)
y_pred = model.predict(X_test)
# Feature Importance for the UI
labels_names = self.get_feature_labels()
importances = model.feature_importances_
feature_data = sorted(zip(labels_names, importances), key=lambda x: x[1], reverse=True)
ui_extras = "<b>Top Predictors:</b><br>" + "<br>".join([f"{n}: {v:.3f}" for n, v in feature_data])
file_extras = "Top Predictors:\n" + "\n".join([f"- {n}: {v:.3f}" for n, v in feature_data])
return self._evaluate_and_report(model_type, y_test, y_pred, ui_extras=ui_extras, file_extras=file_extras, target_name=target_name)
# TODO: More than random forest
else:
return "Model type not yet implemented in calculate_and_train."
def get_path(self, model_type, is_scaler=False):
"""Returns the specific file path for the target/model or its scaler."""
debug_print()
suffix = self.base_paths[model_type]
if is_scaler:
suffix = suffix.split('.')[0] + "_scaler.pkl"
return f"ml_{self.current_target}_{suffix}"
def get_feature_labels(self):
"""Returns labels only for features active in the current target."""
debug_print()
active_keys = ACTIVITY_MAP.get(self.current_target, [])
return active_keys
def format_features(self, z_scores, directions, kpts):
"""The 'Universal Parser' for geometric features."""
# debug_print()
# Internal Math Helpers
if self.current_target == "ALL_FEATURES":
active_list = list(GEOMETRY_LIBRARY.keys())
else:
active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"])
def resolve_pt(idx):
if isinstance(idx, list):
# Calculate midpoint of all indices in the list
pts = [kpts[i] for i in idx]
return np.mean(pts, axis=0)
return kpts[idx]
def get_dist(p1, p2): return np.linalg.norm(p1 - p2)
def get_angle(a, b, c):
try:
ba, bc = a - b, c - b
denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
cos = np.dot(ba, bc) / denom
return np.degrees(np.arccos(np.clip(cos, -1.0, 1.0))) / 180.0
except: return 0.0
calculated_pool = {}
try:
if kpts is None or len(kpts) < 13: raise ValueError()
# Reference scale (Shoulders)
scale = get_dist(kpts[5], kpts[6]) + 1e-6
# First Pass: Direct Geometries
for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items():
if f_type == "dist":
# Use resolve_pt for both indices
p1 = resolve_pt(indices[0])
p2 = resolve_pt(indices[1])
calculated_pool[name] = get_dist(p1, p2) / scale
elif f_type == "angle":
# Use resolve_pt for all three indices
p1 = resolve_pt(indices[0])
p2 = resolve_pt(indices[1])
p3 = resolve_pt(indices[2])
calculated_pool[name] = get_angle(p1, p2, p3)
elif f_type == "z_diff":
# Z-scores are usually single indices, but we handle lists just in case
z1 = np.mean([z_scores[i] for i in indices[0]]) if isinstance(indices[0], list) else z_scores[indices[0]]
z2 = np.mean([z_scores[i] for i in indices[1]]) if isinstance(indices[1], list) else z_scores[indices[1]]
calculated_pool[name] = abs(z1 - z2)
elif f_type == "head_offset":
p_target = resolve_pt(indices[0])
p_mid = resolve_pt([indices[1], indices[2]]) # Midpoint of shoulders
calculated_pool[name] = abs(p_target[0] - p_mid[0]) / scale
# Second Pass: Composite Geometries (Subtractions/Symmetry)
# We do this after so 'dist_l_ear_r_shld' is already calculated
for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items():
if f_type == "subtraction":
calculated_pool[name] = calculated_pool[indices[0]] - calculated_pool[indices[1]]
elif f_type == "abs_subtraction":
calculated_pool[name] = abs(calculated_pool[indices[0]] - calculated_pool[indices[1]])
except Exception:
# If a frame fails, fill the pool with zeros to prevent crashes
calculated_pool = {name: 0.0 for name in GEOMETRY_LIBRARY.keys()}
# Final Extraction based on current_target
active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"])
feature_vector = [calculated_pool[feat] for feat in active_list]
return np.array(feature_vector, dtype=np.float32)
def _prepare_pool_data(self):
"""Merges buffer and fits scaler."""
debug_print()
if not self.X_buffer:
return None, None, None
X_total = np.vstack(self.X_buffer)
y_total = np.concatenate(self.y_buffer)
# We always fit a fresh scaler on the current pool
scaler_file = f"{self.current_target}_scaler.pkl"
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_total)
joblib.dump(scaler, scaler_file)
return X_scaled, y_total, scaler
def _evaluate_and_report(self, model_name, y_test, y_pred, extra_text="", ui_extras="", file_extras="", target_name=""):
"""Generates unified metrics, confusion matrix, and reports for ANY model"""
debug_print()
prec = precision_score(y_test, y_pred, zero_division=0)
rec = recall_score(y_test, y_pred, zero_division=0)
f1 = f1_score(y_test, y_pred, zero_division=0)
target = getattr(self, 'current_target', 'Activity')
display_labels = ['Rest', target]
# Plot Confusion Matrix
cm = confusion_matrix(y_test, y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=display_labels,
yticklabels=display_labels)
plt.title(f'{model_name} Detection: Predicted vs Actual')
plt.ylabel('Actual State')
plt.xlabel('Predicted State')
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
plt.savefig(f"ml_{target_name}_confusion_matrix_rf_{timestamp}.png")
plt.close()
# Classification Report String
report_str = classification_report(y_test, y_pred,
target_names=display_labels,
zero_division=0)
# Build TXT File Content
report_text = f"MODEL PERFORMANCE REPORT: {model_name}\nGenerated: {timestamp}\n"
report_text += "="*40 + "\n"
report_text += report_str + "\n"
report_text += f"Precision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1:.4f}\n"
report_text += "="*40 + "\n" + extra_text
report_text += "="*40 + "\n" + file_extras
with open(f"ml_{target_name}_performance_rf_{timestamp}.txt", "w") as f:
f.write(report_text)
# Build UI String
ui_report = f"""
<b>{model_name} Performance:</b><br>
Precision: {prec:.2f} | Recall: {rec:.2f} | <b>F1: {f1:.2f}</b><br>
<hr>
{ui_extras}
"""
return ui_report
def calculate_directions(self, analysis_kps):
debug_print()
all_dirs = np.zeros((len(analysis_kps), 17))
for f in range(1, len(analysis_kps)):
deltas = analysis_kps[f] - analysis_kps[f-1] # Shape (17, 2)
angles = np.arctan2(-deltas[:, 1], deltas[:, 0])
all_dirs[f] = angles
return all_dirs