"""
Filename: predictor.py

Description: BLAZES machine learning

Author: Tyler de Zeeuw
License: GPL-3.0
"""
|
|
|
|
# Built-in imports
import inspect
from datetime import datetime

# External library imports
import joblib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, f1_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# To be used once multiple models are supported and functioning:
# import torch
# import torch.nn as nn
# import torch.optim as optim
# import xgboost as xgb
# from sklearn.svm import SVC
# import os

# Any truthy value makes debug_print() trace each entered function; 0 silences it.
VERBOSITY = 1
|
|
|
|
# Catalog of every geometric feature the parser knows how to compute.
# Value layout: (feature_type, indices, *flags). Indices refer to keypoints;
# a nested list means "use the midpoint of those keypoints". String "indices"
# (subtraction / abs_subtraction) reference other features by name.
GEOMETRY_LIBRARY = {
    # --- Distances (Point A, Point B) ---
    "dist_l_wrist_nose": ("dist", [9, 0], True),
    "dist_r_wrist_nose": ("dist", [10, 0], True),
    "dist_l_ear_r_shld": ("dist", [3, 6], True),
    "dist_r_ear_l_shld": ("dist", [4, 5], True),

    # Distances measured against the pelvis midpoint (hips 11 & 12).
    "dist_l_wrist_pelvis": ("dist", [9, [11, 12]], True),
    "dist_r_wrist_pelvis": ("dist", [10, [11, 12]], True),
    "dist_l_ankl_pelvis": ("dist", [15, [11, 12]], True),
    "dist_r_ankl_pelvis": ("dist", [16, [11, 12]], True),
    "dist_nose_pelvis": ("dist", [0, [11, 12]], True),
    "dist_ankl_ankl": ("dist", [15, 16], True),

    # NEW: Cross-Body and Pure Extension Distances
    "dist_l_wri_r_shld": ("dist", [9, 6], True),   # Reach across body
    "dist_r_wri_l_shld": ("dist", [10, 5], True),  # Reach across body
    "dist_l_wri_l_shld": ("dist", [9, 5], True),   # Pure arm extension
    "dist_r_wri_r_shld": ("dist", [10, 6], True),  # Pure arm extension

    # --- Angles (Point A, Center B, Point C) ---
    "angle_l_elbow": ("angle", [5, 7, 9]),
    "angle_r_elbow": ("angle", [6, 8, 10]),
    "angle_l_shoulder": ("angle", [11, 5, 7]),
    "angle_r_shoulder": ("angle", [12, 6, 8]),
    "angle_l_knee": ("angle", [11, 13, 15]),
    "angle_r_knee": ("angle", [12, 14, 16]),
    "angle_l_hip": ("angle", [5, 11, 13]),
    "angle_r_hip": ("angle", [6, 12, 14]),

    # --- Custom/Derived ---
    "asym_wrist": ("z_diff", [9, 10]),
    "asym_ankl": ("z_diff", [15, 16]),
    "offset_head": ("head_offset", [0, 5, 6]),
    "diff_ear_shld": ("subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]),
    "abs_diff_ear_shld": ("abs_subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]),

    # NEW: Verticality and Contralateral Contrast
    "height_l_ankl": ("y_diff", [15, 11]),  # Foot height relative to hip
    "height_r_ankl": ("y_diff", [16, 12]),  # Foot height relative to hip
    "diff_knee_angle": ("subtraction", ["angle_l_knee", "angle_r_knee"]),
    "asym_wri_shld": ("subtraction", ["dist_l_wri_l_shld", "dist_r_wri_r_shld"]),
}
|
|
|
|
|
|
# The Target Activity Map: for each trainable activity, the subset of
# GEOMETRY_LIBRARY feature names used as the model's input vector.
ACTIVITY_MAP = {
    "Mouthing": [
        "dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow",
        "angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder",
        "asym_wrist", "offset_head",
    ],
    "Head Movement": [
        "dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow",
        "angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder",
        "asym_wrist", "offset_head", "dist_l_ear_r_shld",
        "dist_r_ear_l_shld", "diff_ear_shld", "abs_diff_ear_shld",
    ],
    "Reach (Left)": [
        "dist_l_wrist_pelvis", "dist_l_wrist_nose", "dist_l_wri_l_shld",
        "dist_l_wri_r_shld", "angle_l_elbow", "angle_l_shoulder",
        "asym_wri_shld",
    ],
    "Reach (Right)": [
        "dist_r_wrist_pelvis", "dist_r_wrist_nose", "dist_r_wri_r_shld",
        "dist_r_wri_l_shld", "angle_r_elbow", "angle_r_shoulder",
        "asym_wri_shld",
    ],
    "Kick (Left)": [
        "dist_l_ankl_pelvis", "angle_l_knee", "angle_l_hip",
        "height_l_ankl", "dist_ankl_ankl", "asym_ankl",
        "diff_knee_angle", "dist_nose_pelvis",
    ],
    "Kick (Right)": [
        "dist_r_ankl_pelvis", "angle_r_knee", "angle_r_hip",
        "height_r_ankl", "dist_ankl_ankl", "asym_ankl",
        "diff_knee_angle", "dist_nose_pelvis",
    ],
}
|
|
|
|
|
|
def debug_print():
    """Print the qualified name of the calling function when VERBOSITY is truthy.

    Lightweight call tracer: instrumented functions call debug_print() on
    entry so the console shows the execution path.
    """
    if not VERBOSITY:
        return
    frame = inspect.currentframe()
    # currentframe() may return None on Python implementations without
    # stack-frame support; guard instead of raising AttributeError.
    caller = frame.f_back if frame is not None else None
    if caller is None:
        return
    code = caller.f_code
    # co_qualname exists only on Python 3.11+; fall back to the bare name.
    print(getattr(code, "co_qualname", code.co_name))
|
|
|
|
|
|
class GeneralPredictor:
    """Pools participants' raw kinematics and trains activity-detection models."""

    def __init__(self):
        """Set up per-model artifact names and an empty participant pool."""
        debug_print()
        # Artifact file suffix for each supported model type.
        self.base_paths = {
            "Random Forest": "rf.pkl",
            "XGBoost": "xgb.json",
            "SVM": "svm.pkl",
            "LSTM": "lstm.pth",
            "1D-CNN": "cnn.pth",
        }
        # Pooled per-participant raw kinematics plus their label tracks.
        self.raw_participant_buffer = []
        # Name of the activity currently being trained/predicted.
        self.current_target = ""
        # Reserved for caching fitted scalers per target/model combo.
        self.scaler_cache = {}
|
|
|
|
|
|
def add_to_raw_buffer(self, raw_payload, y_labels):
|
|
"""
|
|
Adds a participant's raw kinematic components to the pool.
|
|
raw_payload should contain: 'z_kps', 'directions', 'raw_kps'
|
|
"""
|
|
debug_print()
|
|
entry = {
|
|
"raw_data": raw_payload,
|
|
"labels": y_labels
|
|
}
|
|
self.raw_participant_buffer.append(entry)
|
|
return f"Added participant to pool. Total participants: {len(self.raw_participant_buffer)}"
|
|
|
|
|
|
def clear_buffer(self):
|
|
"""Clears the raw pool."""
|
|
debug_print()
|
|
self.raw_participant_buffer = []
|
|
|
|
|
|
def calculate_and_train(self, model_type, target_name):
|
|
"""
|
|
The 'On-the-Fly' engine. Loops through the raw buffer,
|
|
calculates features for the SELECTED target, and trains.
|
|
"""
|
|
debug_print()
|
|
self.current_target = target_name
|
|
all_X = []
|
|
all_y = []
|
|
|
|
# 1. Process every participant in the pool
|
|
for participant in self.raw_participant_buffer:
|
|
raw = participant["raw_data"]
|
|
all_tracks = participant["labels"]
|
|
|
|
# Pull the specific track that was requested
|
|
track_key = f"OBS: {target_name}"
|
|
if track_key not in all_tracks:
|
|
print(f"Warning: Track {track_key} not found for a participant. Skipping.")
|
|
continue
|
|
|
|
y = all_tracks[track_key]
|
|
|
|
# Extract lists from the payload
|
|
z_scores = raw["z_kps"]
|
|
dirs = raw["directions"]
|
|
kpts = raw["raw_kps"]
|
|
|
|
# Calculate geometric features for every frame
|
|
participant_features = []
|
|
for i in range(len(y)):
|
|
feat = self.format_features(z_scores[i], dirs[i], kpts[i])
|
|
participant_features.append(feat)
|
|
|
|
all_X.append(np.array(participant_features))
|
|
all_y.append(y)
|
|
|
|
# 2. Prepare for Training
|
|
X_combined = np.vstack(all_X)
|
|
y_combined = np.concatenate(all_y)
|
|
|
|
# 3. Scale the data specifically for this target/model combo
|
|
scaler = StandardScaler()
|
|
X_scaled = scaler.fit_transform(X_combined)
|
|
scaler_path = self.get_path(model_type, is_scaler=True)
|
|
joblib.dump(scaler, scaler_path)
|
|
|
|
# 4. Train/Test Split
|
|
X_train, X_test, y_train, y_test = train_test_split(
|
|
X_scaled, y_combined, test_size=0.2, stratify=y_combined, random_state=42
|
|
)
|
|
|
|
# 5. Process with corresponding Model
|
|
if model_type == "Random Forest":
|
|
model = RandomForestClassifier(max_depth=15, n_estimators=100, class_weight="balanced")
|
|
model.fit(X_train, y_train)
|
|
|
|
# Save the model
|
|
save_path = self.get_path(model_type)
|
|
joblib.dump(model, save_path)
|
|
|
|
y_pred = model.predict(X_test)
|
|
|
|
# Feature Importance for the UI
|
|
labels_names = self.get_feature_labels()
|
|
importances = model.feature_importances_
|
|
feature_data = sorted(zip(labels_names, importances), key=lambda x: x[1], reverse=True)
|
|
ui_extras = "<b>Top Predictors:</b><br>" + "<br>".join([f"{n}: {v:.3f}" for n, v in feature_data])
|
|
file_extras = "Top Predictors:\n" + "\n".join([f"- {n}: {v:.3f}" for n, v in feature_data])
|
|
|
|
return self._evaluate_and_report(model_type, y_test, y_pred, ui_extras=ui_extras, file_extras=file_extras, target_name=target_name)
|
|
|
|
# TODO: More than random forest
|
|
else:
|
|
return "Model type not yet implemented in calculate_and_train."
|
|
|
|
|
|
def get_path(self, model_type, is_scaler=False):
|
|
"""Returns the specific file path for the target/model or its scaler."""
|
|
debug_print()
|
|
suffix = self.base_paths[model_type]
|
|
|
|
if is_scaler:
|
|
suffix = suffix.split('.')[0] + "_scaler.pkl"
|
|
|
|
return f"ml_{self.current_target}_{suffix}"
|
|
|
|
|
|
def get_feature_labels(self):
|
|
"""Returns labels only for features active in the current target."""
|
|
debug_print()
|
|
active_keys = ACTIVITY_MAP.get(self.current_target, [])
|
|
return active_keys
|
|
|
|
|
|
def format_features(self, z_scores, directions, kpts):
|
|
"""The 'Universal Parser' for geometric features."""
|
|
# debug_print()
|
|
# Internal Math Helpers
|
|
if self.current_target == "ALL_FEATURES":
|
|
active_list = list(GEOMETRY_LIBRARY.keys())
|
|
else:
|
|
active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"])
|
|
|
|
def resolve_pt(idx):
|
|
if isinstance(idx, list):
|
|
# Calculate midpoint of all indices in the list
|
|
pts = [kpts[i] for i in idx]
|
|
return np.mean(pts, axis=0)
|
|
return kpts[idx]
|
|
|
|
def get_dist(p1, p2): return np.linalg.norm(p1 - p2)
|
|
def get_angle(a, b, c):
|
|
try:
|
|
ba, bc = a - b, c - b
|
|
denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
|
|
cos = np.dot(ba, bc) / denom
|
|
return np.degrees(np.arccos(np.clip(cos, -1.0, 1.0))) / 180.0
|
|
except: return 0.0
|
|
|
|
calculated_pool = {}
|
|
|
|
try:
|
|
if kpts is None or len(kpts) < 13: raise ValueError()
|
|
# Reference scale (Shoulders)
|
|
scale = get_dist(kpts[5], kpts[6]) + 1e-6
|
|
|
|
# First Pass: Direct Geometries
|
|
for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items():
|
|
if f_type == "dist":
|
|
# Use resolve_pt for both indices
|
|
p1 = resolve_pt(indices[0])
|
|
p2 = resolve_pt(indices[1])
|
|
calculated_pool[name] = get_dist(p1, p2) / scale
|
|
|
|
elif f_type == "angle":
|
|
# Use resolve_pt for all three indices
|
|
p1 = resolve_pt(indices[0])
|
|
p2 = resolve_pt(indices[1])
|
|
p3 = resolve_pt(indices[2])
|
|
calculated_pool[name] = get_angle(p1, p2, p3)
|
|
|
|
elif f_type == "z_diff":
|
|
# Z-scores are usually single indices, but we handle lists just in case
|
|
z1 = np.mean([z_scores[i] for i in indices[0]]) if isinstance(indices[0], list) else z_scores[indices[0]]
|
|
z2 = np.mean([z_scores[i] for i in indices[1]]) if isinstance(indices[1], list) else z_scores[indices[1]]
|
|
calculated_pool[name] = abs(z1 - z2)
|
|
|
|
elif f_type == "head_offset":
|
|
p_target = resolve_pt(indices[0])
|
|
p_mid = resolve_pt([indices[1], indices[2]]) # Midpoint of shoulders
|
|
calculated_pool[name] = abs(p_target[0] - p_mid[0]) / scale
|
|
|
|
# Second Pass: Composite Geometries (Subtractions/Symmetry)
|
|
# We do this after so 'dist_l_ear_r_shld' is already calculated
|
|
for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items():
|
|
if f_type == "subtraction":
|
|
calculated_pool[name] = calculated_pool[indices[0]] - calculated_pool[indices[1]]
|
|
elif f_type == "abs_subtraction":
|
|
calculated_pool[name] = abs(calculated_pool[indices[0]] - calculated_pool[indices[1]])
|
|
|
|
except Exception:
|
|
# If a frame fails, fill the pool with zeros to prevent crashes
|
|
calculated_pool = {name: 0.0 for name in GEOMETRY_LIBRARY.keys()}
|
|
|
|
# Final Extraction based on current_target
|
|
|
|
active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"])
|
|
feature_vector = [calculated_pool[feat] for feat in active_list]
|
|
|
|
return np.array(feature_vector, dtype=np.float32)
|
|
|
|
def _prepare_pool_data(self):
|
|
"""Merges buffer and fits scaler."""
|
|
debug_print()
|
|
if not self.X_buffer:
|
|
return None, None, None
|
|
|
|
X_total = np.vstack(self.X_buffer)
|
|
y_total = np.concatenate(self.y_buffer)
|
|
|
|
# We always fit a fresh scaler on the current pool
|
|
scaler_file = f"{self.current_target}_scaler.pkl"
|
|
scaler = StandardScaler()
|
|
X_scaled = scaler.fit_transform(X_total)
|
|
joblib.dump(scaler, scaler_file)
|
|
|
|
return X_scaled, y_total, scaler
|
|
|
|
|
|
    def _evaluate_and_report(self, model_name, y_test, y_pred, extra_text="", ui_extras="", file_extras="", target_name=""):
        """Generates unified metrics, confusion matrix, and reports for ANY model

        Side effects: saves a confusion-matrix PNG and a plain-text report to
        the working directory (both timestamped). Returns an HTML snippet for
        the UI. Metrics use sklearn's binary defaults, which assume labels
        {0, 1} with 1 = the detected activity — TODO confirm label encoding.
        """
        debug_print()
        # zero_division=0 avoids warnings/NaN when a class is never predicted.
        prec = precision_score(y_test, y_pred, zero_division=0)
        rec = recall_score(y_test, y_pred, zero_division=0)
        f1 = f1_score(y_test, y_pred, zero_division=0)

        # Fall back to a generic label if current_target was never set.
        target = getattr(self, 'current_target', 'Activity')
        display_labels = ['Rest', target]
        # Plot Confusion Matrix
        cm = confusion_matrix(y_test, y_pred)
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=display_labels,
                    yticklabels=display_labels)
        plt.title(f'{model_name} Detection: Predicted vs Actual')
        plt.ylabel('Actual State')
        plt.xlabel('Predicted State')

        # Timestamp keys every artifact so repeated runs never overwrite.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        plt.savefig(f"ml_{target_name}_confusion_matrix_rf_{timestamp}.png")
        plt.close()  # release the figure so repeated runs don't leak memory

        # Classification Report String
        report_str = classification_report(y_test, y_pred,
                                           target_names=display_labels,
                                           zero_division=0)

        # Build TXT File Content
        report_text = f"MODEL PERFORMANCE REPORT: {model_name}\nGenerated: {timestamp}\n"
        report_text += "="*40 + "\n"
        report_text += report_str + "\n"
        report_text += f"Precision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1:.4f}\n"
        report_text += "="*40 + "\n" + extra_text
        report_text += "="*40 + "\n" + file_extras

        with open(f"ml_{target_name}_performance_rf_{timestamp}.txt", "w") as f:
            f.write(report_text)

        # Build UI String (HTML snippet consumed by the front-end)
        ui_report = f"""
        <b>{model_name} Performance:</b><br>
        Precision: {prec:.2f} | Recall: {rec:.2f} | <b>F1: {f1:.2f}</b><br>
        <hr>
        {ui_extras}
        """
        return ui_report
|
|
|
|
def calculate_directions(self, analysis_kps):
|
|
debug_print()
|
|
all_dirs = np.zeros((len(analysis_kps), 17))
|
|
|
|
for f in range(1, len(analysis_kps)):
|
|
deltas = analysis_kps[f] - analysis_kps[f-1] # Shape (17, 2)
|
|
|
|
angles = np.arctan2(-deltas[:, 1], deltas[:, 0])
|
|
all_dirs[f] = angles
|
|
|
|
return all_dirs |