real init
This commit is contained in:
45
README.md
45
README.md
@@ -1,3 +1,44 @@
|
||||
# blazes
|
||||
BLAZES (Behavioral Learning & Automated Zoned Events Suite)
|
||||
=================================================================
|
||||
|
||||
Behavioral Learning & Automated Zoned Events Suite
|
||||
BLAZES is a standalone application to predict behavioural events from video files.
|
||||
|
||||
BLAZES is free and open-source software that runs on Windows, MacOS, and Linux. Please read the information regarding each operating system below.
|
||||
|
||||
Visit the official [BLAZES web site](https://research.dezeeuw.ca/blazes).
|
||||
|
||||
[](https://www.python.org)
|
||||
|
||||
# For MacOS Users
|
||||
|
||||
Due to the cost of an Apple Developer account, the application is not certified by Apple. Once the application is extracted and attempted to be launched for the first time you will get a popup stating:
|
||||
|
||||
"Apple could not verify blazes.app is free of malware that may harm your Mac or compromise your privacy.", with the options of "Done" or "Move to Trash".
|
||||
|
||||
The solution around this is to use finder and navigate to the blazes-darwin folder. Once the folder has been located, right click the folder and click the option "New Terminal at Folder". Once the terminal opens, run the following command (you can copy + paste):
|
||||
|
||||
```xattr -dr com.apple.quarantine blazes.app & pid1=$!; xattr -dr com.apple.quarantine blazes_updater.app & pid2=$!; wait $pid1 $pid2; exit```
|
||||
|
||||
Once the command has been executed and the text "[Process completed]" appears, you may close the terminal window and attempt to open the application again. If you choose to unrestrict the app through Settings > Privacy & Security, the app may not be able to update correctly in the future.
|
||||
|
||||
This only applies for the first time you attempt to run BLAZES. Subsequent times, including after updates, will function correctly as-is.
|
||||
|
||||
# For Windows Users
|
||||
|
||||
Due to the cost of a code signing certificate, the application is not digitally signed. Once the application is extracted and attempted to be launched for the first time you will get a popup stating:
|
||||
|
||||
"Windows protected your PC - Microsoft Defender SmartScreen prevented an unrecognized app from starting. Running this app might put your PC at risk.", with the options of "More info" or "Don't run".
|
||||
|
||||
The solution around this is to click "More info" and then select "Run anyway".
|
||||
|
||||
This only applies for the first time you attempt to run BLAZES. Subsequent times, including after updates, will function correctly as-is.
|
||||
|
||||
# For Linux Users
|
||||
|
||||
There are no conditions for Linux users at this time.
|
||||
|
||||
# Licence
|
||||
|
||||
BLAZES is distributed under the GPL-3.0 license.
|
||||
|
||||
Copyright (C) 2025-2026 Tyler de Zeeuw
|
||||
211
batch_processing.py
Normal file
211
batch_processing.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""
|
||||
Filename: batch_processing.py
|
||||
Description: BLAZES batch processor
|
||||
|
||||
Author: Tyler de Zeeuw
|
||||
License: GPL-3.0
|
||||
"""
|
||||
|
||||
# Built-in imports
|
||||
import os
|
||||
import csv
|
||||
from pathlib import Path
|
||||
|
||||
# External library imports
|
||||
import cv2
|
||||
import numpy as np
|
||||
from ultralytics import YOLO
|
||||
|
||||
from PySide6.QtWidgets import (QDialog, QVBoxLayout, QHBoxLayout, QLineEdit,
|
||||
QPushButton, QComboBox, QSpinBox, QLabel,
|
||||
QFileDialog, QTextEdit)
|
||||
from PySide6.QtCore import Qt, QObject, Signal, QRunnable, QThreadPool, Slot
|
||||
|
||||
|
||||
JOINT_NAMES = [
|
||||
"Nose", "Left Eye", "Right Eye", "Left Ear", "Right Ear",
|
||||
"Left Shoulder", "Right Shoulder", "Left Elbow", "Right Elbow",
|
||||
"Left Wrist", "Right Wrist", "Left Hip", "Right Hip",
|
||||
"Left Knee", "Right Knee", "Left Ankle", "Right Ankle"
|
||||
]
|
||||
|
||||
class WorkerSignals(QObject):
|
||||
"""Signals to communicate back to the UI from the thread pool."""
|
||||
progress = Signal(str)
|
||||
finished = Signal()
|
||||
|
||||
class VideoWorker(QRunnable):
|
||||
"""A worker task for processing a single video file."""
|
||||
def __init__(self, video_path, model_name):
|
||||
super().__init__()
|
||||
self.video_path = video_path
|
||||
self.model_name = model_name
|
||||
self.signals = WorkerSignals()
|
||||
|
||||
@Slot()
|
||||
def run(self):
|
||||
filename = os.path.basename(self.video_path)
|
||||
self.signals.progress.emit(f"Starting: {filename}")
|
||||
|
||||
try:
|
||||
cap = cv2.VideoCapture(self.video_path)
|
||||
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
||||
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
|
||||
csv_storage_data = []
|
||||
pose_cache = str(Path(self.video_path).with_name(Path(self.video_path).stem + "_pose_raw.csv"))
|
||||
|
||||
if os.path.exists(pose_cache):
|
||||
self.signals.progress.emit(f" - Cache found for {filename}. Skipping inference.")
|
||||
else:
|
||||
# Instantiate model INSIDE the thread for safety
|
||||
model = YOLO(self.model_name)
|
||||
prev_track_id = None
|
||||
|
||||
for i in range(total_frames):
|
||||
ret, frame = cap.read()
|
||||
if not ret: break
|
||||
|
||||
results = model.track(frame, persist=True, verbose=False)
|
||||
track_id, kp, confs, _ = self.get_best_infant_match(results, width, height, prev_track_id)
|
||||
|
||||
if kp is not None:
|
||||
prev_track_id = track_id
|
||||
csv_storage_data.append(np.column_stack((kp, confs)))
|
||||
else:
|
||||
csv_storage_data.append(np.zeros((17, 3)))
|
||||
|
||||
if i % 100 == 0:
|
||||
p = int((i / total_frames) * 100)
|
||||
self.signals.progress.emit(f" - {filename}: {p}%")
|
||||
|
||||
self._save_pose_cache(pose_cache, csv_storage_data)
|
||||
|
||||
cap.release()
|
||||
self.signals.progress.emit(f"COMPLETED: {filename}")
|
||||
except Exception as e:
|
||||
self.signals.progress.emit(f"ERROR on {filename}: {str(e)}")
|
||||
finally:
|
||||
self.signals.finished.emit()
|
||||
|
||||
def get_best_infant_match(self, results, w, h, prev_track_id):
|
||||
if not results[0].boxes or results[0].boxes.id is None:
|
||||
return None, None, None, None
|
||||
|
||||
ids = results[0].boxes.id.int().cpu().tolist()
|
||||
kpts = results[0].keypoints.xy.cpu().numpy()
|
||||
confs = results[0].keypoints.conf.cpu().numpy()
|
||||
|
||||
best_idx, best_score = -1, -1
|
||||
for i, k in enumerate(kpts):
|
||||
vis = np.sum(confs[i] > 0.5)
|
||||
valid = k[confs[i] > 0.5]
|
||||
dist = np.linalg.norm(np.mean(valid, axis=0) - [w/2, h/2]) if len(valid) > 0 else 1000
|
||||
score = (vis * 10) - (dist * 0.1) + (50 if ids[i] == prev_track_id else 0)
|
||||
|
||||
if score > best_score:
|
||||
best_score, best_idx = score, i
|
||||
|
||||
if best_idx == -1:
|
||||
return None, None, None, None
|
||||
return ids[best_idx], kpts[best_idx], confs[best_idx], best_idx
|
||||
|
||||
def _save_pose_cache(self, path, data):
|
||||
with open(path, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
header = []
|
||||
for joint in JOINT_NAMES:
|
||||
header.extend([f"{joint}_x", f"{joint}_y", f"{joint}_conf"])
|
||||
writer.writerow(header)
|
||||
for frame_data in data:
|
||||
writer.writerow(frame_data.flatten())
|
||||
|
||||
class BatchProcessorDialog(QDialog):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self.setWindowTitle("Concurrent Batch Video Processor")
|
||||
self.setMinimumSize(600, 500)
|
||||
self.threadpool = QThreadPool()
|
||||
self.active_workers = 0
|
||||
self.setup_ui()
|
||||
|
||||
def setup_ui(self):
|
||||
layout = QVBoxLayout(self)
|
||||
|
||||
# Folder selection
|
||||
f_lay = QHBoxLayout()
|
||||
self.folder_edit = QLineEdit()
|
||||
btn_browse = QPushButton("Browse")
|
||||
btn_browse.clicked.connect(self.get_folder)
|
||||
f_lay.addWidget(QLabel("Folder:"))
|
||||
f_lay.addWidget(self.folder_edit)
|
||||
f_lay.addWidget(btn_browse)
|
||||
layout.addLayout(f_lay)
|
||||
|
||||
# Dropdown for YOLO Model
|
||||
self.model_combo = QComboBox()
|
||||
# self.model_combo.addItems([
|
||||
# "yolov8n-pose.pt", "yolov8s-pose.pt",
|
||||
# "yolov8m-pose.pt", "yolov8l-pose.pt", "yolov8x-pose.pt"
|
||||
# ])
|
||||
self.model_combo.addItems(["yolov8n-pose.pt"])
|
||||
layout.addWidget(QLabel("Select YOLO Model:"))
|
||||
layout.addWidget(self.model_combo)
|
||||
|
||||
# Int input for Concurrency
|
||||
self.concurrency_spin = QSpinBox()
|
||||
self.concurrency_spin.setRange(1, 16)
|
||||
self.concurrency_spin.setValue(4)
|
||||
layout.addWidget(QLabel("Concurrent Workers (Threads):"))
|
||||
layout.addWidget(self.concurrency_spin)
|
||||
|
||||
# Output Log
|
||||
self.log = QTextEdit()
|
||||
self.log.setReadOnly(True)
|
||||
self.log.setStyleSheet("background-color: #1e1e1e; color: #d4d4d4; font-family: Consolas;")
|
||||
layout.addWidget(self.log)
|
||||
|
||||
# Action Buttons
|
||||
self.btn_run = QPushButton("Run Concurrent Batch")
|
||||
self.btn_run.setFixedHeight(40)
|
||||
self.btn_run.clicked.connect(self.run_batch)
|
||||
layout.addWidget(self.btn_run)
|
||||
|
||||
def get_folder(self):
|
||||
path = QFileDialog.getExistingDirectory(self, "Select Folder")
|
||||
if path: self.folder_edit.setText(path)
|
||||
|
||||
def log_msg(self, msg):
|
||||
self.log.append(msg)
|
||||
self.log.verticalScrollBar().setValue(self.log.verticalScrollBar().maximum())
|
||||
|
||||
def worker_finished(self):
|
||||
self.active_workers -= 1
|
||||
if self.active_workers <= 0:
|
||||
self.btn_run.setEnabled(True)
|
||||
self.log_msg("--- All concurrent tasks finished ---")
|
||||
|
||||
def run_batch(self):
|
||||
folder = self.folder_edit.text()
|
||||
if not os.path.isdir(folder):
|
||||
self.log_msg("!!! Error: Invalid directory.")
|
||||
return
|
||||
|
||||
video_extensions = ('.mp4', '.avi', '.mov', '.mkv')
|
||||
files = [str(f) for f in Path(folder).iterdir() if f.suffix.lower() in video_extensions]
|
||||
|
||||
if not files:
|
||||
self.log_msg("No videos found.")
|
||||
return
|
||||
|
||||
self.btn_run.setEnabled(False)
|
||||
self.active_workers = len(files)
|
||||
self.threadpool.setMaxThreadCount(self.concurrency_spin.value())
|
||||
|
||||
self.log_msg(f"Queueing {len(files)} videos...")
|
||||
for f_path in files:
|
||||
worker = VideoWorker(f_path, self.model_combo.currentText())
|
||||
worker.signals.progress.connect(self.log_msg)
|
||||
worker.signals.finished.connect(self.worker_finished)
|
||||
self.threadpool.start(worker)
|
||||
257
blazes_updater.py
Normal file
257
blazes_updater.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""
|
||||
Filename: blazes_updater.py
|
||||
Description: BLAZES updater executable
|
||||
|
||||
Author: Tyler de Zeeuw
|
||||
License: GPL-3.0
|
||||
"""
|
||||
|
||||
# Built-in imports
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import shlex
|
||||
import psutil
|
||||
import shutil
|
||||
import platform
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
PLATFORM_NAME = platform.system().lower()
|
||||
APP_NAME = "blazes"
|
||||
|
||||
if PLATFORM_NAME == 'darwin':
|
||||
LOG_FILE = os.path.join(os.path.dirname(sys.executable), f"../../../{APP_NAME}_updater.log")
|
||||
else:
|
||||
LOG_FILE = os.path.join(os.getcwd(), f"{APP_NAME}_updater.log")
|
||||
|
||||
|
||||
def log(msg):
|
||||
with open(LOG_FILE, "a", encoding="utf-8") as f:
|
||||
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
f.write(f"{timestamp} - {msg}\n")
|
||||
|
||||
|
||||
def kill_all_processes_by_executable(exe_path):
|
||||
terminated_any = False
|
||||
exe_path = os.path.realpath(exe_path)
|
||||
|
||||
if PLATFORM_NAME == 'windows':
|
||||
for proc in psutil.process_iter(['pid', 'exe']):
|
||||
try:
|
||||
proc_exe = proc.info.get('exe')
|
||||
if proc_exe and os.path.samefile(os.path.realpath(proc_exe), exe_path):
|
||||
log(f"Terminating process: PID {proc.pid}")
|
||||
_terminate_process(proc)
|
||||
terminated_any = True
|
||||
except Exception as e:
|
||||
log(f"Error terminating process (Windows): {e}")
|
||||
elif PLATFORM_NAME == 'linux':
|
||||
for proc in psutil.process_iter(['pid', 'cmdline']):
|
||||
try:
|
||||
cmdline = proc.info.get('cmdline', [])
|
||||
if cmdline:
|
||||
proc_cmd = os.path.realpath(cmdline[0])
|
||||
if os.path.samefile(proc_cmd, exe_path):
|
||||
log(f"Terminating process: PID {proc.pid}")
|
||||
_terminate_process(proc)
|
||||
terminated_any = True
|
||||
except Exception as e:
|
||||
log(f"Error terminating process (Linux): {e}")
|
||||
|
||||
if not terminated_any:
|
||||
log(f"No running processes found for {exe_path}")
|
||||
return terminated_any
|
||||
|
||||
|
||||
def _terminate_process(proc):
|
||||
try:
|
||||
proc.terminate()
|
||||
proc.wait(timeout=10)
|
||||
log(f"Process {proc.pid} terminated gracefully.")
|
||||
except psutil.TimeoutExpired:
|
||||
log(f"Process {proc.pid} did not terminate in time. Killing forcefully.")
|
||||
proc.kill()
|
||||
proc.wait(timeout=5)
|
||||
log(f"Process {proc.pid} killed.")
|
||||
|
||||
|
||||
def wait_for_unlock(path, timeout=100):
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < timeout:
|
||||
try:
|
||||
if os.path.isdir(path):
|
||||
shutil.rmtree(path)
|
||||
else:
|
||||
os.remove(path)
|
||||
log(f"Deleted (after wait): {path}")
|
||||
return
|
||||
except Exception as e:
|
||||
log(f"Still locked: {path} - {e}")
|
||||
time.sleep(1)
|
||||
log(f"Failed to delete after wait: {path}")
|
||||
|
||||
|
||||
def delete_path(path):
|
||||
if os.path.exists(path):
|
||||
try:
|
||||
if os.path.isdir(path):
|
||||
shutil.rmtree(path)
|
||||
log(f"Deleted directory: {path}")
|
||||
else:
|
||||
os.remove(path)
|
||||
log(f"Deleted file: {path}")
|
||||
except Exception as e:
|
||||
log(f"Error deleting {path}: {e}")
|
||||
|
||||
|
||||
def copy_update_files(src_folder, dest_folder, updater_name):
|
||||
for item in os.listdir(src_folder):
|
||||
if item.lower() == updater_name.lower():
|
||||
log(f"Skipping updater executable: {item}")
|
||||
continue
|
||||
s = os.path.join(src_folder, item)
|
||||
d = os.path.join(dest_folder, item)
|
||||
delete_path(d)
|
||||
try:
|
||||
if os.path.isdir(s):
|
||||
shutil.copytree(s, d)
|
||||
log(f"Copied folder: {s} -> {d}")
|
||||
else:
|
||||
shutil.copy2(s, d)
|
||||
log(f"Copied file: {s} -> {d}")
|
||||
except Exception as e:
|
||||
log(f"Error copying {s} -> {d}: {e}")
|
||||
|
||||
|
||||
def copy_update_files_darwin(src_folder, dest_folder, updater_name):
|
||||
|
||||
updater_name = updater_name + ".app"
|
||||
|
||||
for item in os.listdir(src_folder):
|
||||
if item.lower() == updater_name.lower():
|
||||
log(f"Skipping updater executable: {item}")
|
||||
continue
|
||||
s = os.path.join(src_folder, item)
|
||||
d = os.path.join(dest_folder, item)
|
||||
delete_path(d)
|
||||
try:
|
||||
if os.path.isdir(s):
|
||||
subprocess.check_call(["ditto", s, d])
|
||||
log(f"Copied folder with ditto: {s} -> {d}")
|
||||
else:
|
||||
shutil.copy2(s, d)
|
||||
log(f"Copied file: {s} -> {d}")
|
||||
except Exception as e:
|
||||
log(f"Error copying {s} -> {d}: {e}")
|
||||
|
||||
|
||||
def remove_quarantine(app_path):
|
||||
script = f'''
|
||||
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{APP_NAME} needs privileges to finish the update. (1/2)"
|
||||
'''
|
||||
try:
|
||||
subprocess.run(['osascript', '-e', script], check=True)
|
||||
print("✅ Quarantine attribute removed.")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print("❌ Failed to remove quarantine attribute.")
|
||||
print(e)
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
log(f"[Updater] sys.argv: {sys.argv}")
|
||||
|
||||
if len(sys.argv) != 3:
|
||||
log(f"Invalid arguments. Usage: {APP_NAME}_updater <update_folder> <main_app_executable>")
|
||||
sys.exit(1)
|
||||
|
||||
update_folder = sys.argv[1]
|
||||
main_exe = sys.argv[2]
|
||||
|
||||
# Interesting naming convention
|
||||
parent_dir = os.path.dirname(os.path.abspath(main_exe))
|
||||
pparent_dir = os.path.dirname(parent_dir)
|
||||
ppparent_dir = os.path.dirname(pparent_dir)
|
||||
pppparent_dir = os.path.dirname(ppparent_dir)
|
||||
|
||||
updater_name = os.path.basename(sys.argv[0])
|
||||
|
||||
log("Updater started.")
|
||||
log(f"Update folder: {update_folder}")
|
||||
log(f"Main EXE: {main_exe}")
|
||||
log(f"Updater EXE: {updater_name}")
|
||||
if PLATFORM_NAME == 'darwin':
|
||||
log(f"Main App Folder: {ppparent_dir}")
|
||||
|
||||
# Kill all instances of main app
|
||||
kill_all_processes_by_executable(main_exe)
|
||||
|
||||
# Wait until main_exe process is fully gone (polling)
|
||||
for _ in range(20): # wait max 10 seconds
|
||||
running = False
|
||||
for proc in psutil.process_iter(['exe', 'cmdline']):
|
||||
try:
|
||||
if PLATFORM_NAME == 'windows':
|
||||
proc_exe = proc.info.get('exe')
|
||||
if proc_exe and os.path.samefile(os.path.realpath(proc_exe), os.path.realpath(main_exe)):
|
||||
running = True
|
||||
break
|
||||
elif PLATFORM_NAME == 'linux':
|
||||
cmdline = proc.info.get('cmdline', [])
|
||||
if cmdline:
|
||||
proc_cmd = os.path.realpath(cmdline[0])
|
||||
if os.path.samefile(proc_cmd, os.path.realpath(main_exe)):
|
||||
running = True
|
||||
break
|
||||
except Exception as e:
|
||||
log(f"Polling error: {e}")
|
||||
if not running:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
log("Warning: main executable still running after wait timeout.")
|
||||
|
||||
# Delete old version files
|
||||
if PLATFORM_NAME == 'darwin':
|
||||
log(f'Attempting to delete {ppparent_dir}')
|
||||
delete_path(ppparent_dir)
|
||||
update_folder = os.path.join(sys.argv[1], f"{APP_NAME}-darwin")
|
||||
copy_update_files_darwin(update_folder, pppparent_dir, updater_name)
|
||||
|
||||
else:
|
||||
delete_path(main_exe)
|
||||
wait_for_unlock(os.path.join(parent_dir, "_internal"))
|
||||
|
||||
# Copy new files excluding the updater itself
|
||||
copy_update_files(update_folder, parent_dir, updater_name)
|
||||
|
||||
except Exception as e:
|
||||
log(f"Something went wrong: {e}")
|
||||
|
||||
# Relaunch main app
|
||||
try:
|
||||
if PLATFORM_NAME == 'linux':
|
||||
os.chmod(main_exe, 0o755)
|
||||
log("Added executable bit")
|
||||
|
||||
if PLATFORM_NAME == 'darwin':
|
||||
os.chmod(ppparent_dir, 0o755)
|
||||
log("Added executable bit")
|
||||
remove_quarantine(ppparent_dir)
|
||||
log(f"Removed the quarantine flag on {ppparent_dir}")
|
||||
subprocess.Popen(['open', ppparent_dir, "--args", "--finish-update"])
|
||||
else:
|
||||
subprocess.Popen([main_exe, "--finish-update"], cwd=parent_dir)
|
||||
|
||||
log("Relaunched main app.")
|
||||
except Exception as e:
|
||||
log(f"Failed to relaunch main app: {e}")
|
||||
|
||||
log("Updater completed. Exiting.")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
405
predictor.py
Normal file
405
predictor.py
Normal file
@@ -0,0 +1,405 @@
|
||||
"""
|
||||
Filename: predictor.py
|
||||
Description: BLAZES machine learning
|
||||
|
||||
Author: Tyler de Zeeuw
|
||||
License: GPL-3.0
|
||||
"""
|
||||
|
||||
# Built-in imports
|
||||
import inspect
|
||||
from datetime import datetime
|
||||
|
||||
# External library imports
|
||||
import numpy as np
|
||||
import joblib
|
||||
import seaborn as sns
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from sklearn.ensemble import RandomForestClassifier
|
||||
from sklearn.model_selection import train_test_split
|
||||
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score, confusion_matrix
|
||||
from sklearn.preprocessing import StandardScaler
|
||||
|
||||
# To be used once multiple models are supported and functioning:
|
||||
# import torch
|
||||
# import torch.nn as nn
|
||||
# import torch.optim as optim
|
||||
# import xgboost as xgb
|
||||
# from sklearn.svm import SVC
|
||||
# import os
|
||||
|
||||
VERBOSITY = 1
|
||||
|
||||
GEOMETRY_LIBRARY = {
|
||||
# --- Distances (Point A, Point B) ---
|
||||
"dist_l_wrist_nose": ("dist", [9, 0], True),
|
||||
"dist_r_wrist_nose": ("dist", [10, 0], True),
|
||||
"dist_l_ear_r_shld": ("dist", [3, 6], True),
|
||||
"dist_r_ear_l_shld": ("dist", [4, 5], True),
|
||||
|
||||
"dist_l_wrist_pelvis": ("dist", [9, [11, 12]], True),
|
||||
"dist_r_wrist_pelvis": ("dist", [10, [11, 12]], True),
|
||||
"dist_l_ankl_pelvis": ("dist", [15, [11, 12]], True),
|
||||
"dist_r_ankl_pelvis": ("dist", [16, [11, 12]], True),
|
||||
"dist_nose_pelvis": ("dist", [0, [11, 12]], True),
|
||||
"dist_ankl_ankl": ("dist", [15, 16], True),
|
||||
|
||||
# NEW: Cross-Body and Pure Extension Distances
|
||||
"dist_l_wri_r_shld": ("dist", [9, 6], True), # Reach across body
|
||||
"dist_r_wri_l_shld": ("dist", [10, 5], True), # Reach across body
|
||||
"dist_l_wri_l_shld": ("dist", [9, 5], True), # Pure arm extension
|
||||
"dist_r_wri_r_shld": ("dist", [10, 6], True), # Pure arm extension
|
||||
|
||||
# --- Angles (Point A, Center B, Point C) ---
|
||||
"angle_l_elbow": ("angle", [5, 7, 9]),
|
||||
"angle_r_elbow": ("angle", [6, 8, 10]),
|
||||
"angle_l_shoulder": ("angle", [11, 5, 7]),
|
||||
"angle_r_shoulder": ("angle", [12, 6, 8]),
|
||||
"angle_l_knee": ("angle", [11, 13, 15]),
|
||||
"angle_r_knee": ("angle", [12, 14, 16]),
|
||||
"angle_l_hip": ("angle", [5, 11, 13]),
|
||||
"angle_r_hip": ("angle", [6, 12, 14]),
|
||||
|
||||
# --- Custom/Derived ---
|
||||
"asym_wrist": ("z_diff", [9, 10]),
|
||||
"asym_ankl": ("z_diff", [15, 16]),
|
||||
"offset_head": ("head_offset", [0, 5, 6]),
|
||||
"diff_ear_shld": ("subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]),
|
||||
"abs_diff_ear_shld": ("abs_subtraction", ["dist_l_ear_r_shld", "dist_r_ear_l_shld"]),
|
||||
|
||||
# NEW: Verticality and Contralateral Contrast
|
||||
"height_l_ankl": ("y_diff", [15, 11]), # Foot height relative to hip
|
||||
"height_r_ankl": ("y_diff", [16, 12]), # Foot height relative to hip
|
||||
"diff_knee_angle": ("subtraction", ["angle_l_knee", "angle_r_knee"]),
|
||||
"asym_wri_shld": ("subtraction", ["dist_l_wri_l_shld", "dist_r_wri_r_shld"])
|
||||
}
|
||||
|
||||
|
||||
# The Target Activity Map
|
||||
ACTIVITY_MAP = {
|
||||
"Mouthing": [
|
||||
"dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow",
|
||||
"angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder",
|
||||
"asym_wrist", "offset_head"
|
||||
],
|
||||
"Head Movement": [
|
||||
"dist_l_wrist_nose", "dist_r_wrist_nose", "angle_l_elbow",
|
||||
"angle_r_elbow", "angle_l_shoulder", "angle_r_shoulder",
|
||||
"asym_wrist", "offset_head", "dist_l_ear_r_shld",
|
||||
"dist_r_ear_l_shld", "diff_ear_shld", "abs_diff_ear_shld"
|
||||
],
|
||||
"Reach (Left)": [
|
||||
"dist_l_wrist_pelvis", "dist_l_wrist_nose", "dist_l_wri_l_shld",
|
||||
"dist_l_wri_r_shld", "angle_l_elbow", "angle_l_shoulder",
|
||||
"asym_wri_shld"
|
||||
],
|
||||
"Reach (Right)": [
|
||||
"dist_r_wrist_pelvis", "dist_r_wrist_nose", "dist_r_wri_r_shld",
|
||||
"dist_r_wri_l_shld", "angle_r_elbow", "angle_r_shoulder",
|
||||
"asym_wri_shld"
|
||||
],
|
||||
"Kick (Left)": [
|
||||
"dist_l_ankl_pelvis", "angle_l_knee", "angle_l_hip",
|
||||
"height_l_ankl", "dist_ankl_ankl", "asym_ankl",
|
||||
"diff_knee_angle", "dist_nose_pelvis"
|
||||
],
|
||||
"Kick (Right)": [
|
||||
"dist_r_ankl_pelvis", "angle_r_knee", "angle_r_hip",
|
||||
"height_r_ankl", "dist_ankl_ankl", "asym_ankl",
|
||||
"diff_knee_angle", "dist_nose_pelvis"
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def debug_print():
|
||||
if VERBOSITY:
|
||||
frame = inspect.currentframe().f_back
|
||||
qualname = frame.f_code.co_qualname
|
||||
print(qualname)
|
||||
|
||||
|
||||
class GeneralPredictor:
|
||||
def __init__(self):
|
||||
debug_print()
|
||||
self.base_paths = {
|
||||
"Random Forest": "rf.pkl",
|
||||
"XGBoost": "xgb.json",
|
||||
"SVM": "svm.pkl",
|
||||
"LSTM": "lstm.pth",
|
||||
"1D-CNN": "cnn.pth"
|
||||
}
|
||||
self.raw_participant_buffer = []
|
||||
self.current_target = ""
|
||||
self.scaler_cache = {}
|
||||
|
||||
|
||||
def add_to_raw_buffer(self, raw_payload, y_labels):
|
||||
"""
|
||||
Adds a participant's raw kinematic components to the pool.
|
||||
raw_payload should contain: 'z_kps', 'directions', 'raw_kps'
|
||||
"""
|
||||
debug_print()
|
||||
entry = {
|
||||
"raw_data": raw_payload,
|
||||
"labels": y_labels
|
||||
}
|
||||
self.raw_participant_buffer.append(entry)
|
||||
return f"Added participant to pool. Total participants: {len(self.raw_participant_buffer)}"
|
||||
|
||||
|
||||
def clear_buffer(self):
|
||||
"""Clears the raw pool."""
|
||||
debug_print()
|
||||
self.raw_participant_buffer = []
|
||||
|
||||
|
||||
def calculate_and_train(self, model_type, target_name):
|
||||
"""
|
||||
The 'On-the-Fly' engine. Loops through the raw buffer,
|
||||
calculates features for the SELECTED target, and trains.
|
||||
"""
|
||||
debug_print()
|
||||
self.current_target = target_name
|
||||
all_X = []
|
||||
all_y = []
|
||||
|
||||
# 1. Process every participant in the pool
|
||||
for participant in self.raw_participant_buffer:
|
||||
raw = participant["raw_data"]
|
||||
all_tracks = participant["labels"]
|
||||
|
||||
# Pull the specific track that was requested
|
||||
track_key = f"OBS: {target_name}"
|
||||
if track_key not in all_tracks:
|
||||
print(f"Warning: Track {track_key} not found for a participant. Skipping.")
|
||||
continue
|
||||
|
||||
y = all_tracks[track_key]
|
||||
|
||||
# Extract lists from the payload
|
||||
z_scores = raw["z_kps"]
|
||||
dirs = raw["directions"]
|
||||
kpts = raw["raw_kps"]
|
||||
|
||||
# Calculate geometric features for every frame
|
||||
participant_features = []
|
||||
for i in range(len(y)):
|
||||
feat = self.format_features(z_scores[i], dirs[i], kpts[i])
|
||||
participant_features.append(feat)
|
||||
|
||||
all_X.append(np.array(participant_features))
|
||||
all_y.append(y)
|
||||
|
||||
# 2. Prepare for Training
|
||||
X_combined = np.vstack(all_X)
|
||||
y_combined = np.concatenate(all_y)
|
||||
|
||||
# 3. Scale the data specifically for this target/model combo
|
||||
scaler = StandardScaler()
|
||||
X_scaled = scaler.fit_transform(X_combined)
|
||||
scaler_path = self.get_path(model_type, is_scaler=True)
|
||||
joblib.dump(scaler, scaler_path)
|
||||
|
||||
# 4. Train/Test Split
|
||||
X_train, X_test, y_train, y_test = train_test_split(
|
||||
X_scaled, y_combined, test_size=0.2, stratify=y_combined, random_state=42
|
||||
)
|
||||
|
||||
# 5. Process with corresponding Model
|
||||
if model_type == "Random Forest":
|
||||
model = RandomForestClassifier(max_depth=15, n_estimators=100, class_weight="balanced")
|
||||
model.fit(X_train, y_train)
|
||||
|
||||
# Save the model
|
||||
save_path = self.get_path(model_type)
|
||||
joblib.dump(model, save_path)
|
||||
|
||||
y_pred = model.predict(X_test)
|
||||
|
||||
# Feature Importance for the UI
|
||||
labels_names = self.get_feature_labels()
|
||||
importances = model.feature_importances_
|
||||
feature_data = sorted(zip(labels_names, importances), key=lambda x: x[1], reverse=True)
|
||||
ui_extras = "<b>Top Predictors:</b><br>" + "<br>".join([f"{n}: {v:.3f}" for n, v in feature_data])
|
||||
file_extras = "Top Predictors:\n" + "\n".join([f"- {n}: {v:.3f}" for n, v in feature_data])
|
||||
|
||||
return self._evaluate_and_report(model_type, y_test, y_pred, ui_extras=ui_extras, file_extras=file_extras, target_name=target_name)
|
||||
|
||||
# TODO: More than random forest
|
||||
else:
|
||||
return "Model type not yet implemented in calculate_and_train."
|
||||
|
||||
|
||||
def get_path(self, model_type, is_scaler=False):
|
||||
"""Returns the specific file path for the target/model or its scaler."""
|
||||
debug_print()
|
||||
suffix = self.base_paths[model_type]
|
||||
|
||||
if is_scaler:
|
||||
suffix = suffix.split('.')[0] + "_scaler.pkl"
|
||||
|
||||
return f"ml_{self.current_target}_{suffix}"
|
||||
|
||||
|
||||
def get_feature_labels(self):
|
||||
"""Returns labels only for features active in the current target."""
|
||||
debug_print()
|
||||
active_keys = ACTIVITY_MAP.get(self.current_target, [])
|
||||
return active_keys
|
||||
|
||||
|
||||
def format_features(self, z_scores, directions, kpts):
|
||||
"""The 'Universal Parser' for geometric features."""
|
||||
# debug_print()
|
||||
# Internal Math Helpers
|
||||
if self.current_target == "ALL_FEATURES":
|
||||
active_list = list(GEOMETRY_LIBRARY.keys())
|
||||
else:
|
||||
active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"])
|
||||
|
||||
def resolve_pt(idx):
|
||||
if isinstance(idx, list):
|
||||
# Calculate midpoint of all indices in the list
|
||||
pts = [kpts[i] for i in idx]
|
||||
return np.mean(pts, axis=0)
|
||||
return kpts[idx]
|
||||
|
||||
def get_dist(p1, p2): return np.linalg.norm(p1 - p2)
|
||||
def get_angle(a, b, c):
|
||||
try:
|
||||
ba, bc = a - b, c - b
|
||||
denom = (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
|
||||
cos = np.dot(ba, bc) / denom
|
||||
return np.degrees(np.arccos(np.clip(cos, -1.0, 1.0))) / 180.0
|
||||
except: return 0.0
|
||||
|
||||
calculated_pool = {}
|
||||
|
||||
try:
|
||||
if kpts is None or len(kpts) < 13: raise ValueError()
|
||||
# Reference scale (Shoulders)
|
||||
scale = get_dist(kpts[5], kpts[6]) + 1e-6
|
||||
|
||||
# First Pass: Direct Geometries
|
||||
for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items():
|
||||
if f_type == "dist":
|
||||
# Use resolve_pt for both indices
|
||||
p1 = resolve_pt(indices[0])
|
||||
p2 = resolve_pt(indices[1])
|
||||
calculated_pool[name] = get_dist(p1, p2) / scale
|
||||
|
||||
elif f_type == "angle":
|
||||
# Use resolve_pt for all three indices
|
||||
p1 = resolve_pt(indices[0])
|
||||
p2 = resolve_pt(indices[1])
|
||||
p3 = resolve_pt(indices[2])
|
||||
calculated_pool[name] = get_angle(p1, p2, p3)
|
||||
|
||||
elif f_type == "z_diff":
|
||||
# Z-scores are usually single indices, but we handle lists just in case
|
||||
z1 = np.mean([z_scores[i] for i in indices[0]]) if isinstance(indices[0], list) else z_scores[indices[0]]
|
||||
z2 = np.mean([z_scores[i] for i in indices[1]]) if isinstance(indices[1], list) else z_scores[indices[1]]
|
||||
calculated_pool[name] = abs(z1 - z2)
|
||||
|
||||
elif f_type == "head_offset":
|
||||
p_target = resolve_pt(indices[0])
|
||||
p_mid = resolve_pt([indices[1], indices[2]]) # Midpoint of shoulders
|
||||
calculated_pool[name] = abs(p_target[0] - p_mid[0]) / scale
|
||||
|
||||
# Second Pass: Composite Geometries (Subtractions/Symmetry)
|
||||
# We do this after so 'dist_l_ear_r_shld' is already calculated
|
||||
for name, (f_type, indices, *meta) in GEOMETRY_LIBRARY.items():
|
||||
if f_type == "subtraction":
|
||||
calculated_pool[name] = calculated_pool[indices[0]] - calculated_pool[indices[1]]
|
||||
elif f_type == "abs_subtraction":
|
||||
calculated_pool[name] = abs(calculated_pool[indices[0]] - calculated_pool[indices[1]])
|
||||
|
||||
except Exception:
|
||||
# If a frame fails, fill the pool with zeros to prevent crashes
|
||||
calculated_pool = {name: 0.0 for name in GEOMETRY_LIBRARY.keys()}
|
||||
|
||||
# Final Extraction based on current_target
|
||||
|
||||
active_list = ACTIVITY_MAP.get(self.current_target, ACTIVITY_MAP["Mouthing"])
|
||||
feature_vector = [calculated_pool[feat] for feat in active_list]
|
||||
|
||||
return np.array(feature_vector, dtype=np.float32)
|
||||
|
||||
def _prepare_pool_data(self):
|
||||
"""Merges buffer and fits scaler."""
|
||||
debug_print()
|
||||
if not self.X_buffer:
|
||||
return None, None, None
|
||||
|
||||
X_total = np.vstack(self.X_buffer)
|
||||
y_total = np.concatenate(self.y_buffer)
|
||||
|
||||
# We always fit a fresh scaler on the current pool
|
||||
scaler_file = f"{self.current_target}_scaler.pkl"
|
||||
scaler = StandardScaler()
|
||||
X_scaled = scaler.fit_transform(X_total)
|
||||
joblib.dump(scaler, scaler_file)
|
||||
|
||||
return X_scaled, y_total, scaler
|
||||
|
||||
|
||||
def _evaluate_and_report(self, model_name, y_test, y_pred, extra_text="", ui_extras="", file_extras="", target_name=""):
|
||||
"""Generates unified metrics, confusion matrix, and reports for ANY model"""
|
||||
debug_print()
|
||||
prec = precision_score(y_test, y_pred, zero_division=0)
|
||||
rec = recall_score(y_test, y_pred, zero_division=0)
|
||||
f1 = f1_score(y_test, y_pred, zero_division=0)
|
||||
|
||||
target = getattr(self, 'current_target', 'Activity')
|
||||
display_labels = ['Rest', target]
|
||||
# Plot Confusion Matrix
|
||||
cm = confusion_matrix(y_test, y_pred)
|
||||
plt.figure(figsize=(8, 6))
|
||||
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
|
||||
xticklabels=display_labels,
|
||||
yticklabels=display_labels)
|
||||
plt.title(f'{model_name} Detection: Predicted vs Actual')
|
||||
plt.ylabel('Actual State')
|
||||
plt.xlabel('Predicted State')
|
||||
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
plt.savefig(f"ml_{target_name}_confusion_matrix_rf_{timestamp}.png")
|
||||
plt.close()
|
||||
|
||||
# Classification Report String
|
||||
report_str = classification_report(y_test, y_pred,
|
||||
target_names=display_labels,
|
||||
zero_division=0)
|
||||
|
||||
# Build TXT File Content
|
||||
report_text = f"MODEL PERFORMANCE REPORT: {model_name}\nGenerated: {timestamp}\n"
|
||||
report_text += "="*40 + "\n"
|
||||
report_text += report_str + "\n"
|
||||
report_text += f"Precision: {prec:.4f}\nRecall: {rec:.4f}\nF1-Score: {f1:.4f}\n"
|
||||
report_text += "="*40 + "\n" + extra_text
|
||||
report_text += "="*40 + "\n" + file_extras
|
||||
|
||||
with open(f"ml_{target_name}_performance_rf_{timestamp}.txt", "w") as f:
|
||||
f.write(report_text)
|
||||
|
||||
# Build UI String
|
||||
ui_report = f"""
|
||||
<b>{model_name} Performance:</b><br>
|
||||
Precision: {prec:.2f} | Recall: {rec:.2f} | <b>F1: {f1:.2f}</b><br>
|
||||
<hr>
|
||||
{ui_extras}
|
||||
"""
|
||||
return ui_report
|
||||
|
||||
def calculate_directions(self, analysis_kps):
|
||||
debug_print()
|
||||
all_dirs = np.zeros((len(analysis_kps), 17))
|
||||
|
||||
for f in range(1, len(analysis_kps)):
|
||||
deltas = analysis_kps[f] - analysis_kps[f-1] # Shape (17, 2)
|
||||
|
||||
angles = np.arctan2(-deltas[:, 1], deltas[:, 0])
|
||||
all_dirs[f] = angles
|
||||
|
||||
return all_dirs
|
||||
539
updater.py
Normal file
539
updater.py
Normal file
@@ -0,0 +1,539 @@
|
||||
"""
|
||||
Filename: updater.py
|
||||
Description: Generic updater file
|
||||
|
||||
Author: Tyler de Zeeuw
|
||||
License: GPL-3.0
|
||||
"""
|
||||
|
||||
# Built-in imports
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import shlex
|
||||
import shutil
|
||||
import zipfile
|
||||
import traceback
|
||||
import subprocess
|
||||
|
||||
# External library imports
|
||||
import psutil
|
||||
import requests
|
||||
|
||||
from PySide6.QtWidgets import QMessageBox
|
||||
from PySide6.QtCore import QThread, Signal, QObject
|
||||
|
||||
|
||||
class UpdateDownloadThread(QThread):
|
||||
"""
|
||||
Thread that downloads and extracts an update package and emits a signal on completion or error.
|
||||
|
||||
Args:
|
||||
download_url (str): URL of the update zip file to download.
|
||||
latest_version (str): Version string of the latest update.
|
||||
"""
|
||||
|
||||
update_ready = Signal(str, str)
|
||||
error_occurred = Signal(str)
|
||||
|
||||
def __init__(self, download_url, latest_version, platform_name, app_name):
|
||||
super().__init__()
|
||||
self.download_url = download_url
|
||||
self.latest_version = latest_version
|
||||
self.platform_name = platform_name
|
||||
self.app_name = app_name
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
local_filename = os.path.basename(self.download_url)
|
||||
|
||||
if self.platform_name == 'darwin':
|
||||
tmp_dir = f'/tmp/{self.app_name}tempupdate'
|
||||
os.makedirs(tmp_dir, exist_ok=True)
|
||||
local_path = os.path.join(tmp_dir, local_filename)
|
||||
else:
|
||||
local_path = os.path.join(os.getcwd(), local_filename)
|
||||
|
||||
# Download the file
|
||||
with requests.get(self.download_url, stream=True, timeout=15) as r:
|
||||
r.raise_for_status()
|
||||
with open(local_path, 'wb') as f:
|
||||
for chunk in r.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
|
||||
# Extract folder name (remove .zip)
|
||||
if self.platform_name == 'darwin':
|
||||
extract_folder = os.path.splitext(local_filename)[0]
|
||||
extract_path = os.path.join(tmp_dir, extract_folder)
|
||||
|
||||
else:
|
||||
extract_folder = os.path.splitext(local_filename)[0]
|
||||
extract_path = os.path.join(os.getcwd(), extract_folder)
|
||||
|
||||
# Create the folder if not exists
|
||||
os.makedirs(extract_path, exist_ok=True)
|
||||
|
||||
# Extract the zip file contents
|
||||
if self.platform_name == 'darwin':
|
||||
subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
|
||||
else:
|
||||
with zipfile.ZipFile(local_path, 'r') as zip_ref:
|
||||
zip_ref.extractall(extract_path)
|
||||
|
||||
# Remove the zip once extracted and emit a signal
|
||||
os.remove(local_path)
|
||||
self.update_ready.emit(self.latest_version, extract_path)
|
||||
|
||||
except Exception as e:
|
||||
# Emit a signal signifying failure
|
||||
self.error_occurred.emit(str(e))
|
||||
|
||||
|
||||
|
||||
class UpdateCheckThread(QThread):
|
||||
"""
|
||||
Thread that checks for updates by querying the API and emits a signal based on the result.
|
||||
|
||||
Signals:
|
||||
download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
|
||||
no_update_available(): Emitted when no update is found or current version is up to date.
|
||||
error_occurred(str): Emitted with an error message if the update check fails.
|
||||
"""
|
||||
|
||||
download_requested = Signal(str, str)
|
||||
no_update_available = Signal()
|
||||
error_occurred = Signal(str)
|
||||
|
||||
def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name):
|
||||
super().__init__()
|
||||
self.api_url = api_url
|
||||
self.api_url_sec = api_url_sec
|
||||
self.current_version = current_version
|
||||
self.platform_name = platform_name
|
||||
self.app_name = app_name
|
||||
|
||||
def run(self):
|
||||
# if not getattr(sys, 'frozen', False):
|
||||
# self.error_occurred.emit("Application is not frozen (Development mode).")
|
||||
# return
|
||||
try:
|
||||
latest_version, download_url = self.get_latest_release_for_platform()
|
||||
if not latest_version:
|
||||
self.no_update_available.emit()
|
||||
return
|
||||
|
||||
if not download_url:
|
||||
self.error_occurred.emit(f"No download available for platform '{self.platform_name}'")
|
||||
return
|
||||
|
||||
if self.version_compare(latest_version, self.current_version) > 0:
|
||||
self.download_requested.emit(download_url, latest_version)
|
||||
else:
|
||||
self.no_update_available.emit()
|
||||
|
||||
except Exception as e:
|
||||
self.error_occurred.emit(f"Update check failed: {e}")
|
||||
|
||||
def version_compare(self, v1, v2):
|
||||
def normalize(v): return [int(x) for x in v.split(".")]
|
||||
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
|
||||
|
||||
def get_latest_release_for_platform(self):
|
||||
urls = [self.api_url, self.api_url_sec]
|
||||
for url in urls:
|
||||
try:
|
||||
|
||||
response = requests.get(url, timeout=5)
|
||||
response.raise_for_status()
|
||||
releases = response.json()
|
||||
|
||||
if not releases:
|
||||
continue
|
||||
|
||||
latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
|
||||
|
||||
if not latest:
|
||||
continue
|
||||
|
||||
tag = latest["tag_name"].lstrip("v")
|
||||
|
||||
for asset in latest.get("assets", []):
|
||||
if self.platform_name in asset["name"].lower():
|
||||
return tag, asset["browser_download_url"]
|
||||
|
||||
return tag, None
|
||||
except (requests.RequestException, ValueError) as e:
|
||||
continue
|
||||
return None, None
|
||||
|
||||
|
||||
class LocalPendingUpdateCheckThread(QThread):
|
||||
"""
|
||||
Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
|
||||
|
||||
Args:
|
||||
current_version (str): Current application version.
|
||||
platform_suffix (str): Platform-specific suffix to identify update folders.
|
||||
"""
|
||||
|
||||
pending_update_found = Signal(str, str)
|
||||
no_pending_update = Signal()
|
||||
|
||||
def __init__(self, current_version, platform_suffix, platform_name, app_name):
|
||||
super().__init__()
|
||||
self.current_version = current_version
|
||||
self.platform_suffix = platform_suffix
|
||||
self.platform_name = platform_name
|
||||
self.app_name = app_name
|
||||
|
||||
def version_compare(self, v1, v2):
|
||||
def normalize(v): return [int(x) for x in v.split(".")]
|
||||
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
|
||||
|
||||
def run(self):
|
||||
if self.platform_name == 'darwin':
|
||||
cwd = f'/tmp/{self.app_name}tempupdate'
|
||||
else:
|
||||
cwd = os.getcwd()
|
||||
|
||||
pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
|
||||
found = False
|
||||
|
||||
try:
|
||||
for item in os.listdir(cwd):
|
||||
folder_path = os.path.join(cwd, item)
|
||||
if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
|
||||
match = pattern.match(item)
|
||||
if match:
|
||||
folder_version = match.group(1)
|
||||
if self.version_compare(folder_version, self.current_version) > 0:
|
||||
self.pending_update_found.emit(folder_version, folder_path)
|
||||
found = True
|
||||
break
|
||||
except:
|
||||
pass
|
||||
|
||||
if not found:
|
||||
self.no_pending_update.emit()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class UpdateManager(QObject):
|
||||
"""
|
||||
Orchestrates the update process.
|
||||
Main apps should instantiate this and call check_for_updates().
|
||||
"""
|
||||
|
||||
def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name):
|
||||
super().__init__()
|
||||
self.parent = main_window
|
||||
self.api_url = api_url
|
||||
self.api_url_sec = api_url_sec
|
||||
self.current_version = current_version
|
||||
self.platform_name = platform_name
|
||||
self.platform_suffix = platform_suffix
|
||||
self.app_name = app_name
|
||||
|
||||
self.pending_update_version = None
|
||||
self.pending_update_path = None
|
||||
|
||||
|
||||
def manual_check_for_updates(self):
|
||||
self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name)
|
||||
self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
|
||||
self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
|
||||
self.local_check_thread.start()
|
||||
|
||||
def on_pending_update_found(self, version, folder_path):
|
||||
self.parent.statusBar().showMessage(f"Pending update found: version {version}")
|
||||
self.pending_update_version = version
|
||||
self.pending_update_path = folder_path
|
||||
self.show_pending_update_popup()
|
||||
|
||||
def on_no_pending_update(self):
|
||||
# No pending update found locally, start server check directly
|
||||
self.parent.statusBar().showMessage("No pending local update found. Checking server...")
|
||||
self.start_update_check_thread()
|
||||
|
||||
def show_pending_update_popup(self):
|
||||
msg_box = QMessageBox(self.parent)
|
||||
msg_box.setWindowTitle("Pending Update Found")
|
||||
msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
|
||||
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
|
||||
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
|
||||
msg_box.exec()
|
||||
|
||||
if msg_box.clickedButton() == install_now_button:
|
||||
self.install_update(self.pending_update_path)
|
||||
else:
|
||||
self.parent.statusBar().showMessage("Pending update available. Install later.")
|
||||
# After user dismisses, still check the server for new updates
|
||||
self.start_update_check_thread()
|
||||
|
||||
def start_update_check_thread(self):
|
||||
self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name)
|
||||
self.check_thread.download_requested.connect(self.on_server_update_requested)
|
||||
self.check_thread.no_update_available.connect(self.on_server_no_update)
|
||||
self.check_thread.error_occurred.connect(self.on_error)
|
||||
self.check_thread.start()
|
||||
|
||||
def on_server_no_update(self):
|
||||
self.parent.statusBar().showMessage("No new updates found on server.", 5000)
|
||||
|
||||
def on_server_update_requested(self, download_url, latest_version):
|
||||
if self.pending_update_version:
|
||||
cmp = self.version_compare(latest_version, self.pending_update_version)
|
||||
if cmp > 0:
|
||||
# Server version is newer than pending update
|
||||
self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
|
||||
try:
|
||||
shutil.rmtree(self.pending_update_path)
|
||||
self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
|
||||
except Exception as e:
|
||||
self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}")
|
||||
|
||||
# Clear pending update info so new download proceeds
|
||||
self.pending_update_version = None
|
||||
self.pending_update_path = None
|
||||
|
||||
# Download the new update
|
||||
self.download_update(download_url, latest_version)
|
||||
elif cmp == 0:
|
||||
# Versions equal, no download needed
|
||||
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
|
||||
else:
|
||||
# Server version older than pending? Unlikely but just keep pending update
|
||||
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
|
||||
else:
|
||||
# No pending update, just download
|
||||
self.download_update(download_url, latest_version)
|
||||
|
||||
def download_update(self, download_url, latest_version):
|
||||
self.parent.statusBar().showMessage("Downloading update...")
|
||||
self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name)
|
||||
self.download_thread.update_ready.connect(self.on_update_ready)
|
||||
self.download_thread.error_occurred.connect(self.on_error)
|
||||
self.download_thread.start()
|
||||
|
||||
def on_update_ready(self, latest_version, extract_folder):
|
||||
self.parent.statusBar().showMessage("Update downloaded and extracted.")
|
||||
|
||||
msg_box = QMessageBox(self.parent)
|
||||
msg_box.setWindowTitle("Update Ready")
|
||||
msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
|
||||
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
|
||||
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
|
||||
|
||||
msg_box.exec()
|
||||
|
||||
if msg_box.clickedButton() == install_now_button:
|
||||
self.install_update(extract_folder)
|
||||
else:
|
||||
self.parent.statusBar().showMessage("Update ready. Install later.")
|
||||
|
||||
|
||||
def install_update(self, extract_folder):
|
||||
# Path to updater executable
|
||||
|
||||
if self.platform_name == 'windows':
|
||||
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe")
|
||||
elif self.platform_name == 'darwin':
|
||||
if getattr(sys, 'frozen', False):
|
||||
updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app")
|
||||
else:
|
||||
updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app")
|
||||
|
||||
elif self.platform_name == 'linux':
|
||||
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater")
|
||||
else:
|
||||
updater_path = os.getcwd()
|
||||
|
||||
if not os.path.exists(updater_path):
|
||||
QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
|
||||
return
|
||||
|
||||
# Launch updater with extracted folder path as argument
|
||||
try:
|
||||
# Pass current app's executable path for updater to relaunch
|
||||
main_app_executable = os.path.abspath(sys.argv[0])
|
||||
|
||||
print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
|
||||
|
||||
if self.platform_name == 'darwin':
|
||||
subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
|
||||
else:
|
||||
subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
|
||||
|
||||
# Close the current app so updater can replace files
|
||||
sys.exit(0)
|
||||
|
||||
except Exception as e:
|
||||
QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
|
||||
|
||||
def on_error(self, message):
|
||||
# print(f"Error: {message}")
|
||||
self.parent.statusBar().showMessage(f"Error occurred during update process. {message}")
|
||||
|
||||
def version_compare(self, v1, v2):
|
||||
def normalize(v): return [int(x) for x in v.split(".")]
|
||||
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
|
||||
|
||||
|
||||
def wait_for_process_to_exit(process_name, timeout=10):
|
||||
"""
|
||||
Waits for a process with the specified name to exit within a timeout period.
|
||||
|
||||
Args:
|
||||
process_name (str): Name (or part of the name) of the process to wait for.
|
||||
timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
|
||||
|
||||
Returns:
|
||||
bool: True if the process exited before the timeout, False otherwise.
|
||||
"""
|
||||
|
||||
print(f"Waiting for {process_name} to exit...")
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
still_running = False
|
||||
for proc in psutil.process_iter(['name']):
|
||||
try:
|
||||
if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
|
||||
still_running = True
|
||||
print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
|
||||
break
|
||||
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
||||
continue
|
||||
if not still_running:
|
||||
print(f"{process_name} has exited.")
|
||||
return True
|
||||
time.sleep(0.5)
|
||||
print(f"{process_name} did not exit in time.")
|
||||
return False
|
||||
|
||||
|
||||
def finish_update_if_needed(platform_name, app_name):
|
||||
"""
|
||||
Completes a pending application update if '--finish-update' is present in the command-line arguments.
|
||||
"""
|
||||
|
||||
if "--finish-update" in sys.argv:
|
||||
print("Finishing update...")
|
||||
|
||||
if platform_name == 'darwin':
|
||||
app_dir = f'/tmp/{app_name}tempupdate'
|
||||
else:
|
||||
app_dir = os.getcwd()
|
||||
|
||||
# 1. Find update folder
|
||||
update_folder = None
|
||||
for entry in os.listdir(app_dir):
|
||||
entry_path = os.path.join(app_dir, entry)
|
||||
if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name):
|
||||
update_folder = os.path.join(app_dir, entry)
|
||||
break
|
||||
|
||||
if update_folder is None:
|
||||
print("No update folder found. Skipping update steps.")
|
||||
return
|
||||
|
||||
if platform_name == 'darwin':
|
||||
update_folder = os.path.join(update_folder, f"{app_name}-darwin")
|
||||
|
||||
# 2. Wait for updater to exit
|
||||
print(f"Waiting for {app_name}_updater to exit...")
|
||||
for proc in psutil.process_iter(['pid', 'name']):
|
||||
if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower():
|
||||
try:
|
||||
proc.wait(timeout=5)
|
||||
except psutil.TimeoutExpired:
|
||||
print(f"Force killing lingering {app_name}_updater")
|
||||
proc.kill()
|
||||
|
||||
# 3. Replace the updater
|
||||
if platform_name == 'windows':
|
||||
new_updater = os.path.join(update_folder, f"{app_name}_updater.exe")
|
||||
dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe")
|
||||
|
||||
elif platform_name == 'darwin':
|
||||
new_updater = os.path.join(update_folder, f"{app_name}_updater.app")
|
||||
dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app"))
|
||||
|
||||
elif platform_name == 'linux':
|
||||
new_updater = os.path.join(update_folder, f"{app_name}_updater")
|
||||
dest_updater = os.path.join(app_dir, f"{app_name}_updater")
|
||||
|
||||
else:
|
||||
print("Unknown Platform")
|
||||
new_updater = os.getcwd()
|
||||
dest_updater = os.getcwd()
|
||||
|
||||
print(f"New updater is {new_updater}")
|
||||
print(f"Dest updater is {dest_updater}")
|
||||
|
||||
print("Writable?", os.access(dest_updater, os.W_OK))
|
||||
print("Executable path:", sys.executable)
|
||||
print("Trying to copy:", new_updater, "->", dest_updater)
|
||||
|
||||
if os.path.exists(new_updater):
|
||||
try:
|
||||
if os.path.exists(dest_updater):
|
||||
if platform_name == 'darwin':
|
||||
try:
|
||||
if os.path.isdir(dest_updater):
|
||||
shutil.rmtree(dest_updater)
|
||||
print(f"Deleted directory: {dest_updater}")
|
||||
else:
|
||||
os.remove(dest_updater)
|
||||
print(f"Deleted file: {dest_updater}")
|
||||
except Exception as e:
|
||||
print(f"Error deleting {dest_updater}: {e}")
|
||||
else:
|
||||
os.remove(dest_updater)
|
||||
|
||||
if platform_name == 'darwin':
|
||||
wait_for_process_to_exit(f"{app_name}_updater", timeout=10)
|
||||
subprocess.check_call(["ditto", new_updater, dest_updater])
|
||||
else:
|
||||
shutil.copy2(new_updater, dest_updater)
|
||||
|
||||
if platform_name in ('linux', 'darwin'):
|
||||
os.chmod(dest_updater, 0o755)
|
||||
|
||||
if platform_name == 'darwin':
|
||||
remove_quarantine(dest_updater, app_name)
|
||||
|
||||
print(f"{app_name}_updater replaced.")
|
||||
except Exception as e:
|
||||
print(f"Failed to replace {app_name}_updater: {e}")
|
||||
|
||||
# 4. Delete the update folder
|
||||
try:
|
||||
if platform_name == 'darwin':
|
||||
shutil.rmtree(app_dir)
|
||||
else:
|
||||
shutil.rmtree(update_folder)
|
||||
except Exception as e:
|
||||
print(f"Failed to delete update folder: {e}")
|
||||
|
||||
QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
|
||||
sys.argv.remove("--finish-update")
|
||||
|
||||
|
||||
def remove_quarantine(app_path, app_name):
|
||||
"""
|
||||
Removes the macOS quarantine attribute from the specified application path.
|
||||
"""
|
||||
|
||||
script = f'''
|
||||
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)"
|
||||
'''
|
||||
try:
|
||||
subprocess.run(['osascript', '-e', script], check=True)
|
||||
print("✅ Quarantine attribute removed.")
|
||||
except subprocess.CalledProcessError as e:
|
||||
print("❌ Failed to remove quarantine attribute.")
|
||||
print(e)
|
||||
29
version_main.txt
Normal file
29
version_main.txt
Normal file
@@ -0,0 +1,29 @@
|
||||
VSVersionInfo(
|
||||
ffi=FixedFileInfo(
|
||||
filevers=(1, 0, 0, 0),
|
||||
prodvers=(1, 0, 0, 0),
|
||||
mask=0x3f,
|
||||
flags=0x0,
|
||||
OS=0x4,
|
||||
fileType=0x1,
|
||||
subtype=0x0,
|
||||
date=(0, 0)
|
||||
),
|
||||
kids=[
|
||||
StringFileInfo(
|
||||
[
|
||||
StringTable(
|
||||
'040904B0',
|
||||
[StringStruct('CompanyName', 'Tyler de Zeeuw'),
|
||||
StringStruct('FileDescription', 'BLAZES main application'),
|
||||
StringStruct('FileVersion', '1.0.0.0'),
|
||||
StringStruct('InternalName', 'blazes.exe'),
|
||||
StringStruct('LegalCopyright', '© 2025-2026 Tyler de Zeeuw'),
|
||||
StringStruct('OriginalFilename', 'blazes.exe'),
|
||||
StringStruct('ProductName', 'BLAZES'),
|
||||
StringStruct('ProductVersion', '1.0.0.0')])
|
||||
]
|
||||
),
|
||||
VarFileInfo([VarStruct('Translation', [1033, 1200])])
|
||||
]
|
||||
)
|
||||
29
version_updater.txt
Normal file
29
version_updater.txt
Normal file
@@ -0,0 +1,29 @@
|
||||
VSVersionInfo(
|
||||
ffi=FixedFileInfo(
|
||||
filevers=(1, 0, 0, 0),
|
||||
prodvers=(1, 0, 0, 0),
|
||||
mask=0x3f,
|
||||
flags=0x0,
|
||||
OS=0x4,
|
||||
fileType=0x1,
|
||||
subtype=0x0,
|
||||
date=(0, 0)
|
||||
),
|
||||
kids=[
|
||||
StringFileInfo(
|
||||
[
|
||||
StringTable(
|
||||
'040904B0',
|
||||
[StringStruct('CompanyName', 'Tyler de Zeeuw'),
|
||||
StringStruct('FileDescription', 'BLAZES updater application'),
|
||||
StringStruct('FileVersion', '1.0.0.0'),
|
||||
StringStruct('InternalName', 'main.exe'),
|
||||
StringStruct('LegalCopyright', '© 2025-2026 Tyler de Zeeuw'),
|
||||
StringStruct('OriginalFilename', 'blazes_updater.exe'),
|
||||
StringStruct('ProductName', 'BLAZES Updater'),
|
||||
StringStruct('ProductVersion', '1.0.0.0')])
|
||||
]
|
||||
),
|
||||
VarFileInfo([VarStruct('Translation', [1033, 1200])])
|
||||
]
|
||||
)
|
||||
BIN
yolov8n-pose.pt
Normal file
BIN
yolov8n-pose.pt
Normal file
Binary file not shown.
Reference in New Issue
Block a user