345 lines
14 KiB
Python
345 lines
14 KiB
Python
import inspect
|
|
import csv
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
|
|
import numpy as np
|
|
import joblib
|
|
import seaborn as sns
|
|
import matplotlib.pyplot as plt
|
|
|
|
from sklearn.ensemble import RandomForestClassifier
|
|
from sklearn.model_selection import train_test_split
|
|
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score, confusion_matrix
|
|
from sklearn.preprocessing import StandardScaler
|
|
|
|
# Module-wide debug switch: any truthy value enables debug_print() call tracing.
VERBOSITY = 1
|
|
|
|
def load_analysis_config(path="analysis_config.json"):
    """Load the analysis configuration file.

    Args:
        path: Location of the JSON config file.

    Returns:
        A ``(geometry_library, activity_map)`` tuple taken from the
        ``geometry_library`` and ``activity_map`` keys of the file.

    Raises:
        FileNotFoundError: If the file does not exist.
        KeyError: If either required key is missing.
    """
    with open(path, 'r') as handle:
        data = json.load(handle)
    geometry = data['geometry_library']
    activities = data['activity_map']
    return geometry, activities
|
|
|
|
try:
    # Load the shared config once at import time; the rest of the module
    # treats these two dicts as module-level constants.
    GEOMETRY_LIBRARY, ACTIVITY_MAP = load_analysis_config()
except FileNotFoundError:
    # Degrade to empty maps so the module still imports; feature extraction
    # and training will produce errors until the config file is provided.
    GEOMETRY_LIBRARY, ACTIVITY_MAP = {}, {}
    print("Warning: analysis_config.json not found. ML functions will fail.")
|
|
|
|
|
|
def debug_print():
    """Print the calling function's source filename when VERBOSITY is truthy.

    Used as a lightweight call tracer throughout this module. Prints a line of
    the form ``DEBUG_PRINT: <caller filename>`` to stdout.
    """
    if VERBOSITY:
        frame = inspect.currentframe().f_back
        try:
            # NOTE: co_filename is the caller's *file path*, not its qualified
            # name — the original local name "qualname" was misleading.
            qualname = frame.f_code.co_filename
            print(f"DEBUG_PRINT: {qualname}")
        finally:
            # Per the inspect docs, release frame references promptly to
            # avoid reference cycles that delay garbage collection.
            del frame
|
|
|
|
|
|
class GeneralPredictor:
    """Trains behavior-detection models from (label JSON, pose CSV) pairs and
    converts frame-level predictions into timeline event blocks.
    """

    def __init__(self):
        debug_print()
        # Maps model-type display name -> base filename suffix for saved artifacts.
        self.base_paths = {
            "Random Forest": "rf.pkl"
        }
        # Name of the behavior/model target currently being trained; "" until
        # calculate_and_train() sets it from the training params.
        self.current_target = ""
        # Sorted geometric feature names used by the active model
        # (populated in calculate_and_train()).
        self.active_feature_keys = []
|
|
|
|
    def calculate_and_train(self, training_params: dict) -> str:
        """
        Takes the dict from get_selection() in TrainModelWindow.
        Loads CSV/JSON pairs, extracts combined features, and trains Random Forest.

        Args:
            training_params: Expected keys — "folder", "pairs" (list of
                (json_path, csv_path) tuples), "selected_behaviors" (list of
                behavior names), "target_name", and "model_type".

        Returns:
            An HTML performance summary string on success, or an
            "Error: ..." / placeholder message string otherwise.

        Side effects:
            Writes a metadata JSON and a fitted scaler to disk; for
            "Random Forest" also writes the trained model, a confusion-matrix
            PNG and a text report (via _evaluate_and_report).
        """
        debug_print()

        # NOTE(review): "folder" is read but never used below — confirm it can be dropped.
        folder = training_params.get("folder")
        pairs = training_params.get("pairs", [])
        selected_behaviors = training_params.get("selected_behaviors", [])
        self.current_target = training_params.get("target_name", "combined_model")
        model_type = training_params.get("model_type", "Random Forest")

        if not pairs or not selected_behaviors:
            return "Error: Missing data pairs or target behaviors."

        # 1. Determine the union of ALL needed geometric features across selected behaviors
        needed_features = set()
        for b_name in selected_behaviors:
            req_feats = ACTIVITY_MAP.get(b_name, [])
            needed_features.update(req_feats)

        # Sorted so the feature column order is deterministic across runs.
        self.active_feature_keys = sorted(list(needed_features))
        print(self.active_feature_keys)

        # Persisted next to the model so inference can rebuild the same
        # feature order later.
        model_metadata = {
            "target_behavior": self.current_target,
            "feature_keys": self.active_feature_keys,
            "model_type": model_type,
            "timestamp": datetime.now().isoformat()
        }

        if not self.active_feature_keys:
            return "Error: No geometric features mapped to the selected behavior(s) in analysis_config.json."

        all_X = []
        all_y = []

        # 2. Process each Pair (JSON labels + CSV raw pose)
        for json_path, csv_path in pairs:
            # --- Load JSON Labels ---
            try:
                with open(json_path, 'r') as f:
                    label_data = json.load(f)
            except Exception as e:
                # Best-effort: skip unreadable files rather than abort training.
                print(f"Error loading {json_path}: {e}")
                continue

            behaviors = label_data.get("behaviors", {})

            # --- Load CSV Pose Data ---
            try:
                raw_kpts = []
                with open(csv_path, 'r') as f:
                    reader = csv.reader(f)
                    next(reader) # skip header
                    for row in reader:
                        # Each row is reshaped to 17 keypoints x 3 values
                        # (presumably x, y, confidence — TODO confirm).
                        raw_kpts.append(np.array([float(x) for x in row]).reshape(17, 3))
                raw_kpts = np.array(raw_kpts)
            except Exception as e:
                print(f"Error loading {csv_path}: {e}")
                continue

            total_frames = len(raw_kpts)
            if total_frames == 0:
                continue

            # Create binary target array (0 = Rest, 1 = Active)
            y_vector = np.zeros(total_frames, dtype=int)

            # If the frame falls inside ANY of the selected behaviors, mark it 1
            for b_name in selected_behaviors:
                instances = behaviors.get(b_name, [])
                for inst in instances:
                    start = inst.get("start_frame", 0)
                    duration = inst.get("duration_frames", 0)
                    # Clamp to clip length so stray labels cannot index past the end.
                    end = min(start + duration, total_frames)
                    y_vector[start:end] = 1

            # --- Calculate Features per Frame ---
            # To match the new flow, we just need raw_kpts.
            # (Z-scores were previously passed, but those were derived from raw anyway.
            # If you require normalized z-scores for RF, you must recalculate them here
            # using the same baseline logic from the main window. For now, we extract raw geom.)

            participant_features = []
            for i in range(total_frames):
                kpts = raw_kpts[i] # Shape (17, 3)
                feat = self.format_features(kpts)
                participant_features.append(feat)

            all_X.append(np.array(participant_features))
            all_y.append(y_vector)

        # 3. Prepare for Training
        if not all_X:
            return "Error: No valid data extracted from files."

        X_combined = np.vstack(all_X)
        y_combined = np.concatenate(all_y)

        # Check for class imbalance edge case (e.g. 0 instances of behavior found)
        if len(np.unique(y_combined)) < 2:
            return "Error: Training data only contains one class (usually 0/Rest). Model cannot train."

        # NOTE(review): metadata and the scaler are written to disk even for the
        # placeholder model types below — confirm that is intended.
        metadata_path = self.get_path(model_type).replace(".pkl", "_metadata.json")
        with open(metadata_path, 'w') as f:
            json.dump(model_metadata, f, indent=4)

        print(f"[INFO] Metadata saved to: {metadata_path}")

        # 4. Scale Data
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X_combined)
        scaler_path = self.get_path(model_type, is_scaler=True)
        joblib.dump(scaler, scaler_path)

        # 5. Train/Test Split (stratified so both classes appear in the test set;
        # fixed random_state keeps the split reproducible)
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y_combined, test_size=0.2, stratify=y_combined, random_state=42
        )

        # 6. Train Random Forest (Placeholders exist for others)
        if model_type == "Random Forest":
            model = RandomForestClassifier(max_depth=15, n_estimators=100, class_weight="balanced")
            model.fit(X_train, y_train)

            save_path = self.get_path(model_type)
            joblib.dump(model, save_path)

            y_pred = model.predict(X_test)

            # Feature Importance — ranked descending for the report.
            importances = model.feature_importances_
            feature_data = sorted(zip(self.active_feature_keys, importances), key=lambda x: x[1], reverse=True)

            # UI shows only the top 10; the file report lists them all.
            ui_extras = "<b>Top Predictors:</b><br>" + "<br>".join([f"{n}: {v:.3f}" for n, v in feature_data[:10]])
            file_extras = "Top Predictors:\n" + "\n".join([f"- {n}: {v:.3f}" for n, v in feature_data])

            return self._evaluate_and_report(model_type, y_test, y_pred, ui_extras=ui_extras, file_extras=file_extras)

        elif model_type == "1D-CNN":
            return "1D-CNN training placeholder reached. Not yet implemented."
        elif model_type == "LSTM":
            return "LSTM training placeholder reached. Not yet implemented."
        elif model_type == "XGBoost":
            return "XGBoost training placeholder reached. Not yet implemented."
        else:
            return f"Model type {model_type} not supported."
|
|
|
|
def get_path(self, model_type, is_scaler=False):
|
|
debug_print()
|
|
suffix = self.base_paths.get(model_type, "model.pkl")
|
|
if is_scaler:
|
|
suffix = suffix.split('.')[0] + "_scaler.pkl"
|
|
return f"ml_{self.current_target}_{suffix}"
|
|
|
|
def format_features(self, kpts):
|
|
"""
|
|
Calculates only the geometric features required by self.active_feature_keys.
|
|
"""
|
|
def resolve_pt(idx):
|
|
if isinstance(idx, list):
|
|
pts = [kpts[i][:2] for i in idx] # Ensure X/Y only
|
|
return np.mean(pts, axis=0)
|
|
return kpts[idx][:2]
|
|
|
|
def get_dist(p1, p2): return np.linalg.norm(p1 - p2)
|
|
def get_angle(a, b, c):
|
|
try:
|
|
ba, bc = a - b, c - b
|
|
denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
|
|
cos = np.dot(ba, bc) / denom
|
|
return np.degrees(np.arccos(np.clip(cos, -1.0, 1.0))) / 180.0
|
|
except: return 0.0
|
|
|
|
calculated_pool = {}
|
|
|
|
try:
|
|
if kpts is None or len(kpts) < 13: raise ValueError()
|
|
scale = get_dist(kpts[5][:2], kpts[6][:2]) + 1e-6
|
|
|
|
# First Pass: Direct Geometries (Only calculate what is needed or what is a dependency)
|
|
for name, config_data in GEOMETRY_LIBRARY.items():
|
|
f_type = config_data[0]
|
|
indices = config_data[1]
|
|
|
|
if f_type == "dist":
|
|
p1 = resolve_pt(indices[0])
|
|
p2 = resolve_pt(indices[1])
|
|
calculated_pool[name] = get_dist(p1, p2) / scale
|
|
|
|
elif f_type == "angle":
|
|
p1 = resolve_pt(indices[0])
|
|
p2 = resolve_pt(indices[1])
|
|
p3 = resolve_pt(indices[2])
|
|
calculated_pool[name] = get_angle(p1, p2, p3)
|
|
|
|
elif f_type == "head_offset":
|
|
p_target = resolve_pt(indices[0])
|
|
p_mid = resolve_pt([indices[1], indices[2]])
|
|
calculated_pool[name] = abs(p_target[0] - p_mid[0]) / scale
|
|
|
|
elif f_type == "y_diff": # NEW from JSON
|
|
p1 = resolve_pt(indices[0])
|
|
p2 = resolve_pt(indices[1])
|
|
calculated_pool[name] = abs(p1[1] - p2[1]) / scale
|
|
|
|
# Second Pass: Subtractions (Requires first pass to be complete)
|
|
for name, config_data in GEOMETRY_LIBRARY.items():
|
|
f_type = config_data[0]
|
|
indices = config_data[1]
|
|
|
|
if f_type == "subtraction":
|
|
val1 = calculated_pool.get(indices[0], 0)
|
|
val2 = calculated_pool.get(indices[1], 0)
|
|
calculated_pool[name] = val1 - val2
|
|
elif f_type == "abs_subtraction":
|
|
val1 = calculated_pool.get(indices[0], 0)
|
|
val2 = calculated_pool.get(indices[1], 0)
|
|
calculated_pool[name] = abs(val1 - val2)
|
|
|
|
except Exception:
|
|
calculated_pool = {name: 0.0 for name in GEOMETRY_LIBRARY.keys()}
|
|
|
|
# Final Extraction based on the set of needed features
|
|
feature_vector = [calculated_pool.get(feat, 0.0) for feat in self.active_feature_keys]
|
|
|
|
return np.array(feature_vector, dtype=np.float32)
|
|
|
|
    def _evaluate_and_report(self, model_name, y_test, y_pred, ui_extras="", file_extras=""):
        """
        Score predictions, save a confusion-matrix PNG and a text report to
        disk, and return an HTML summary for the UI.

        Args:
            model_name: Display name used in titles and report headers.
            y_test: Ground-truth binary labels.
            y_pred: Predicted binary labels.
            ui_extras: Extra HTML appended to the returned summary.
            file_extras: Extra plain text appended to the saved report.

        Returns:
            An HTML snippet summarizing precision/recall/F1.
        """
        debug_print()
        # Binary metrics for the positive ("Active") class; zero_division=0
        # avoids warnings/NaNs when a class is absent from predictions.
        prec = precision_score(y_test, y_pred, zero_division=0)
        rec = recall_score(y_test, y_pred, zero_division=0)
        f1 = f1_score(y_test, y_pred, zero_division=0)

        display_labels = ['Rest', self.current_target]
        cm = confusion_matrix(y_test, y_pred)

        # Confusion-matrix heatmap is saved to disk, never shown interactively.
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=display_labels, yticklabels=display_labels)
        plt.title(f'{model_name} Detection: {self.current_target}')
        plt.ylabel('Actual State')
        plt.xlabel('Predicted State')

        # Timestamp is shared by the PNG and the text report filenames.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        plt.savefig(f"ml_{self.current_target}_cm_{timestamp}.png")
        plt.close()

        report_str = classification_report(y_test, y_pred, target_names=display_labels, zero_division=0)

        # Plain-text report written next to the model artifacts.
        report_text = f"MODEL PERFORMANCE REPORT: {model_name}\nGenerated: {timestamp}\n"
        report_text += "="*40 + "\n"
        report_text += report_str + "\n"
        report_text += f"Precision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1:.4f}\n"
        report_text += "="*40 + "\n" + file_extras

        with open(f"ml_{self.current_target}_performance_{timestamp}.txt", "w") as f:
            f.write(report_text)

        # Compact HTML snippet for display in the UI.
        ui_report = f"""
<b>{model_name} Model for '{self.current_target}'</b><br>
Precision: {prec:.2f} | Recall: {rec:.2f} | <b>F1: {f1:.2f}</b><br>
<hr>
{ui_extras}
"""
        return ui_report
|
|
|
|
|
|
# Inside predictor.py -> GeneralPredictor class
|
|
def convert_to_events(self, predictions, track_name="🤖 AI: Predicted"):
|
|
"""
|
|
Converts a 1D array of class labels into a dictionary of timeline blocks.
|
|
predictions: np.array of 0s and 1s
|
|
track_name: The name for the resulting timeline row
|
|
"""
|
|
events = {track_name: []}
|
|
current_class = None
|
|
start_frame = 0
|
|
|
|
for i, pred in enumerate(predictions):
|
|
# We only care about the transition into or out of class 1
|
|
if pred != current_class:
|
|
# If we were in an active block (1), close it
|
|
if current_class == 1:
|
|
events[track_name].append([start_frame, i, "Normal", "ML Prediction"])
|
|
|
|
# If we are starting a new active block (1), mark the start
|
|
if pred == 1:
|
|
start_frame = i
|
|
|
|
current_class = pred
|
|
|
|
# Close the final block if the video ends while the behavior is active
|
|
if current_class == 1:
|
|
events[track_name].append([start_frame, len(predictions), "Normal", "ML Prediction"])
|
|
|
|
return events |