539 lines
21 KiB
Python
539 lines
21 KiB
Python
"""
|
|
Filename: updater.py
|
|
Description: Generic updater file
|
|
|
|
Author: Tyler de Zeeuw
|
|
License: GPL-3.0
|
|
"""
|
|
|
|
# Built-in imports
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import shlex
|
|
import shutil
|
|
import zipfile
|
|
import traceback
|
|
import subprocess
|
|
|
|
# External library imports
|
|
import psutil
|
|
import requests
|
|
|
|
from PySide6.QtWidgets import QMessageBox
|
|
from PySide6.QtCore import QThread, Signal, QObject
|
|
|
|
|
|
class UpdateDownloadThread(QThread):
|
|
"""
|
|
Thread that downloads and extracts an update package and emits a signal on completion or error.
|
|
|
|
Args:
|
|
download_url (str): URL of the update zip file to download.
|
|
latest_version (str): Version string of the latest update.
|
|
"""
|
|
|
|
update_ready = Signal(str, str)
|
|
error_occurred = Signal(str)
|
|
|
|
def __init__(self, download_url, latest_version, platform_name, app_name):
|
|
super().__init__()
|
|
self.download_url = download_url
|
|
self.latest_version = latest_version
|
|
self.platform_name = platform_name
|
|
self.app_name = app_name
|
|
|
|
def run(self):
|
|
try:
|
|
local_filename = os.path.basename(self.download_url)
|
|
|
|
if self.platform_name == 'darwin':
|
|
tmp_dir = f'/tmp/{self.app_name}tempupdate'
|
|
os.makedirs(tmp_dir, exist_ok=True)
|
|
local_path = os.path.join(tmp_dir, local_filename)
|
|
else:
|
|
local_path = os.path.join(os.getcwd(), local_filename)
|
|
|
|
# Download the file
|
|
with requests.get(self.download_url, stream=True, timeout=15) as r:
|
|
r.raise_for_status()
|
|
with open(local_path, 'wb') as f:
|
|
for chunk in r.iter_content(chunk_size=8192):
|
|
if chunk:
|
|
f.write(chunk)
|
|
|
|
# Extract folder name (remove .zip)
|
|
if self.platform_name == 'darwin':
|
|
extract_folder = os.path.splitext(local_filename)[0]
|
|
extract_path = os.path.join(tmp_dir, extract_folder)
|
|
|
|
else:
|
|
extract_folder = os.path.splitext(local_filename)[0]
|
|
extract_path = os.path.join(os.getcwd(), extract_folder)
|
|
|
|
# Create the folder if not exists
|
|
os.makedirs(extract_path, exist_ok=True)
|
|
|
|
# Extract the zip file contents
|
|
if self.platform_name == 'darwin':
|
|
subprocess.run(['ditto', '-xk', local_path, extract_path], check=True)
|
|
else:
|
|
with zipfile.ZipFile(local_path, 'r') as zip_ref:
|
|
zip_ref.extractall(extract_path)
|
|
|
|
# Remove the zip once extracted and emit a signal
|
|
os.remove(local_path)
|
|
self.update_ready.emit(self.latest_version, extract_path)
|
|
|
|
except Exception as e:
|
|
# Emit a signal signifying failure
|
|
self.error_occurred.emit(str(e))
|
|
|
|
|
|
|
|
class UpdateCheckThread(QThread):
|
|
"""
|
|
Thread that checks for updates by querying the API and emits a signal based on the result.
|
|
|
|
Signals:
|
|
download_requested(str, str): Emitted with (download_url, latest_version) when an update is available.
|
|
no_update_available(): Emitted when no update is found or current version is up to date.
|
|
error_occurred(str): Emitted with an error message if the update check fails.
|
|
"""
|
|
|
|
download_requested = Signal(str, str)
|
|
no_update_available = Signal()
|
|
error_occurred = Signal(str)
|
|
|
|
def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name):
|
|
super().__init__()
|
|
self.api_url = api_url
|
|
self.api_url_sec = api_url_sec
|
|
self.current_version = current_version
|
|
self.platform_name = platform_name
|
|
self.app_name = app_name
|
|
|
|
def run(self):
|
|
# if not getattr(sys, 'frozen', False):
|
|
# self.error_occurred.emit("Application is not frozen (Development mode).")
|
|
# return
|
|
try:
|
|
latest_version, download_url = self.get_latest_release_for_platform()
|
|
if not latest_version:
|
|
self.no_update_available.emit()
|
|
return
|
|
|
|
if not download_url:
|
|
self.error_occurred.emit(f"No download available for platform '{self.platform_name}'")
|
|
return
|
|
|
|
if self.version_compare(latest_version, self.current_version) > 0:
|
|
self.download_requested.emit(download_url, latest_version)
|
|
else:
|
|
self.no_update_available.emit()
|
|
|
|
except Exception as e:
|
|
self.error_occurred.emit(f"Update check failed: {e}")
|
|
|
|
def version_compare(self, v1, v2):
|
|
def normalize(v): return [int(x) for x in v.split(".")]
|
|
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
|
|
|
|
def get_latest_release_for_platform(self):
|
|
urls = [self.api_url, self.api_url_sec]
|
|
for url in urls:
|
|
try:
|
|
|
|
response = requests.get(url, timeout=5)
|
|
response.raise_for_status()
|
|
releases = response.json()
|
|
|
|
if not releases:
|
|
continue
|
|
|
|
latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None)
|
|
|
|
if not latest:
|
|
continue
|
|
|
|
tag = latest["tag_name"].lstrip("v")
|
|
|
|
for asset in latest.get("assets", []):
|
|
if self.platform_name in asset["name"].lower():
|
|
return tag, asset["browser_download_url"]
|
|
|
|
return tag, None
|
|
except (requests.RequestException, ValueError) as e:
|
|
continue
|
|
return None, None
|
|
|
|
|
|
class LocalPendingUpdateCheckThread(QThread):
|
|
"""
|
|
Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly.
|
|
|
|
Args:
|
|
current_version (str): Current application version.
|
|
platform_suffix (str): Platform-specific suffix to identify update folders.
|
|
"""
|
|
|
|
pending_update_found = Signal(str, str)
|
|
no_pending_update = Signal()
|
|
|
|
def __init__(self, current_version, platform_suffix, platform_name, app_name):
|
|
super().__init__()
|
|
self.current_version = current_version
|
|
self.platform_suffix = platform_suffix
|
|
self.platform_name = platform_name
|
|
self.app_name = app_name
|
|
|
|
def version_compare(self, v1, v2):
|
|
def normalize(v): return [int(x) for x in v.split(".")]
|
|
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
|
|
|
|
def run(self):
|
|
if self.platform_name == 'darwin':
|
|
cwd = f'/tmp/{self.app_name}tempupdate'
|
|
else:
|
|
cwd = os.getcwd()
|
|
|
|
pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$")
|
|
found = False
|
|
|
|
try:
|
|
for item in os.listdir(cwd):
|
|
folder_path = os.path.join(cwd, item)
|
|
if os.path.isdir(folder_path) and item.endswith(self.platform_suffix):
|
|
match = pattern.match(item)
|
|
if match:
|
|
folder_version = match.group(1)
|
|
if self.version_compare(folder_version, self.current_version) > 0:
|
|
self.pending_update_found.emit(folder_version, folder_path)
|
|
found = True
|
|
break
|
|
except:
|
|
pass
|
|
|
|
if not found:
|
|
self.no_pending_update.emit()
|
|
|
|
|
|
|
|
|
|
|
|
class UpdateManager(QObject):
|
|
"""
|
|
Orchestrates the update process.
|
|
Main apps should instantiate this and call check_for_updates().
|
|
"""
|
|
|
|
def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name):
|
|
super().__init__()
|
|
self.parent = main_window
|
|
self.api_url = api_url
|
|
self.api_url_sec = api_url_sec
|
|
self.current_version = current_version
|
|
self.platform_name = platform_name
|
|
self.platform_suffix = platform_suffix
|
|
self.app_name = app_name
|
|
|
|
self.pending_update_version = None
|
|
self.pending_update_path = None
|
|
|
|
|
|
def manual_check_for_updates(self):
|
|
self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name)
|
|
self.local_check_thread.pending_update_found.connect(self.on_pending_update_found)
|
|
self.local_check_thread.no_pending_update.connect(self.on_no_pending_update)
|
|
self.local_check_thread.start()
|
|
|
|
def on_pending_update_found(self, version, folder_path):
|
|
self.parent.statusBar().showMessage(f"Pending update found: version {version}")
|
|
self.pending_update_version = version
|
|
self.pending_update_path = folder_path
|
|
self.show_pending_update_popup()
|
|
|
|
def on_no_pending_update(self):
|
|
# No pending update found locally, start server check directly
|
|
self.parent.statusBar().showMessage("No pending local update found. Checking server...")
|
|
self.start_update_check_thread()
|
|
|
|
def show_pending_update_popup(self):
|
|
msg_box = QMessageBox(self.parent)
|
|
msg_box.setWindowTitle("Pending Update Found")
|
|
msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?")
|
|
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
|
|
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
|
|
msg_box.exec()
|
|
|
|
if msg_box.clickedButton() == install_now_button:
|
|
self.install_update(self.pending_update_path)
|
|
else:
|
|
self.parent.statusBar().showMessage("Pending update available. Install later.")
|
|
# After user dismisses, still check the server for new updates
|
|
self.start_update_check_thread()
|
|
|
|
def start_update_check_thread(self):
|
|
self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name)
|
|
self.check_thread.download_requested.connect(self.on_server_update_requested)
|
|
self.check_thread.no_update_available.connect(self.on_server_no_update)
|
|
self.check_thread.error_occurred.connect(self.on_error)
|
|
self.check_thread.start()
|
|
|
|
def on_server_no_update(self):
|
|
self.parent.statusBar().showMessage("No new updates found on server.", 5000)
|
|
|
|
def on_server_update_requested(self, download_url, latest_version):
|
|
if self.pending_update_version:
|
|
cmp = self.version_compare(latest_version, self.pending_update_version)
|
|
if cmp > 0:
|
|
# Server version is newer than pending update
|
|
self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...")
|
|
try:
|
|
shutil.rmtree(self.pending_update_path)
|
|
self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}")
|
|
except Exception as e:
|
|
self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}")
|
|
|
|
# Clear pending update info so new download proceeds
|
|
self.pending_update_version = None
|
|
self.pending_update_path = None
|
|
|
|
# Download the new update
|
|
self.download_update(download_url, latest_version)
|
|
elif cmp == 0:
|
|
# Versions equal, no download needed
|
|
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.")
|
|
else:
|
|
# Server version older than pending? Unlikely but just keep pending update
|
|
self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.")
|
|
else:
|
|
# No pending update, just download
|
|
self.download_update(download_url, latest_version)
|
|
|
|
def download_update(self, download_url, latest_version):
|
|
self.parent.statusBar().showMessage("Downloading update...")
|
|
self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name)
|
|
self.download_thread.update_ready.connect(self.on_update_ready)
|
|
self.download_thread.error_occurred.connect(self.on_error)
|
|
self.download_thread.start()
|
|
|
|
def on_update_ready(self, latest_version, extract_folder):
|
|
self.parent.statusBar().showMessage("Update downloaded and extracted.")
|
|
|
|
msg_box = QMessageBox(self.parent)
|
|
msg_box.setWindowTitle("Update Ready")
|
|
msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?")
|
|
install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole)
|
|
install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole)
|
|
|
|
msg_box.exec()
|
|
|
|
if msg_box.clickedButton() == install_now_button:
|
|
self.install_update(extract_folder)
|
|
else:
|
|
self.parent.statusBar().showMessage("Update ready. Install later.")
|
|
|
|
|
|
def install_update(self, extract_folder):
|
|
# Path to updater executable
|
|
|
|
if self.platform_name == 'windows':
|
|
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe")
|
|
elif self.platform_name == 'darwin':
|
|
if getattr(sys, 'frozen', False):
|
|
updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app")
|
|
else:
|
|
updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app")
|
|
|
|
elif self.platform_name == 'linux':
|
|
updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater")
|
|
else:
|
|
updater_path = os.getcwd()
|
|
|
|
if not os.path.exists(updater_path):
|
|
QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}")
|
|
return
|
|
|
|
# Launch updater with extracted folder path as argument
|
|
try:
|
|
# Pass current app's executable path for updater to relaunch
|
|
main_app_executable = os.path.abspath(sys.argv[0])
|
|
|
|
print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"')
|
|
|
|
if self.platform_name == 'darwin':
|
|
subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable])
|
|
else:
|
|
subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path))
|
|
|
|
# Close the current app so updater can replace files
|
|
sys.exit(0)
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}")
|
|
|
|
def on_error(self, message):
|
|
# print(f"Error: {message}")
|
|
self.parent.statusBar().showMessage(f"Error occurred during update process. {message}")
|
|
|
|
def version_compare(self, v1, v2):
|
|
def normalize(v): return [int(x) for x in v.split(".")]
|
|
return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2))
|
|
|
|
|
|
def wait_for_process_to_exit(process_name, timeout=10):
|
|
"""
|
|
Waits for a process with the specified name to exit within a timeout period.
|
|
|
|
Args:
|
|
process_name (str): Name (or part of the name) of the process to wait for.
|
|
timeout (int, optional): Maximum time to wait in seconds. Defaults to 10.
|
|
|
|
Returns:
|
|
bool: True if the process exited before the timeout, False otherwise.
|
|
"""
|
|
|
|
print(f"Waiting for {process_name} to exit...")
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
still_running = False
|
|
for proc in psutil.process_iter(['name']):
|
|
try:
|
|
if proc.info['name'] and process_name.lower() in proc.info['name'].lower():
|
|
still_running = True
|
|
print(f"Still running: {proc.info['name']} (PID: {proc.pid})")
|
|
break
|
|
except (psutil.NoSuchProcess, psutil.AccessDenied):
|
|
continue
|
|
if not still_running:
|
|
print(f"{process_name} has exited.")
|
|
return True
|
|
time.sleep(0.5)
|
|
print(f"{process_name} did not exit in time.")
|
|
return False
|
|
|
|
|
|
def finish_update_if_needed(platform_name, app_name):
|
|
"""
|
|
Completes a pending application update if '--finish-update' is present in the command-line arguments.
|
|
"""
|
|
|
|
if "--finish-update" in sys.argv:
|
|
print("Finishing update...")
|
|
|
|
if platform_name == 'darwin':
|
|
app_dir = f'/tmp/{app_name}tempupdate'
|
|
else:
|
|
app_dir = os.getcwd()
|
|
|
|
# 1. Find update folder
|
|
update_folder = None
|
|
for entry in os.listdir(app_dir):
|
|
entry_path = os.path.join(app_dir, entry)
|
|
if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name):
|
|
update_folder = os.path.join(app_dir, entry)
|
|
break
|
|
|
|
if update_folder is None:
|
|
print("No update folder found. Skipping update steps.")
|
|
return
|
|
|
|
if platform_name == 'darwin':
|
|
update_folder = os.path.join(update_folder, f"{app_name}-darwin")
|
|
|
|
# 2. Wait for updater to exit
|
|
print(f"Waiting for {app_name}_updater to exit...")
|
|
for proc in psutil.process_iter(['pid', 'name']):
|
|
if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower():
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except psutil.TimeoutExpired:
|
|
print(f"Force killing lingering {app_name}_updater")
|
|
proc.kill()
|
|
|
|
# 3. Replace the updater
|
|
if platform_name == 'windows':
|
|
new_updater = os.path.join(update_folder, f"{app_name}_updater.exe")
|
|
dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe")
|
|
|
|
elif platform_name == 'darwin':
|
|
new_updater = os.path.join(update_folder, f"{app_name}_updater.app")
|
|
dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app"))
|
|
|
|
elif platform_name == 'linux':
|
|
new_updater = os.path.join(update_folder, f"{app_name}_updater")
|
|
dest_updater = os.path.join(app_dir, f"{app_name}_updater")
|
|
|
|
else:
|
|
print("Unknown Platform")
|
|
new_updater = os.getcwd()
|
|
dest_updater = os.getcwd()
|
|
|
|
print(f"New updater is {new_updater}")
|
|
print(f"Dest updater is {dest_updater}")
|
|
|
|
print("Writable?", os.access(dest_updater, os.W_OK))
|
|
print("Executable path:", sys.executable)
|
|
print("Trying to copy:", new_updater, "->", dest_updater)
|
|
|
|
if os.path.exists(new_updater):
|
|
try:
|
|
if os.path.exists(dest_updater):
|
|
if platform_name == 'darwin':
|
|
try:
|
|
if os.path.isdir(dest_updater):
|
|
shutil.rmtree(dest_updater)
|
|
print(f"Deleted directory: {dest_updater}")
|
|
else:
|
|
os.remove(dest_updater)
|
|
print(f"Deleted file: {dest_updater}")
|
|
except Exception as e:
|
|
print(f"Error deleting {dest_updater}: {e}")
|
|
else:
|
|
os.remove(dest_updater)
|
|
|
|
if platform_name == 'darwin':
|
|
wait_for_process_to_exit(f"{app_name}_updater", timeout=10)
|
|
subprocess.check_call(["ditto", new_updater, dest_updater])
|
|
else:
|
|
shutil.copy2(new_updater, dest_updater)
|
|
|
|
if platform_name in ('linux', 'darwin'):
|
|
os.chmod(dest_updater, 0o755)
|
|
|
|
if platform_name == 'darwin':
|
|
remove_quarantine(dest_updater, app_name)
|
|
|
|
print(f"{app_name}_updater replaced.")
|
|
except Exception as e:
|
|
print(f"Failed to replace {app_name}_updater: {e}")
|
|
|
|
# 4. Delete the update folder
|
|
try:
|
|
if platform_name == 'darwin':
|
|
shutil.rmtree(app_dir)
|
|
else:
|
|
shutil.rmtree(update_folder)
|
|
except Exception as e:
|
|
print(f"Failed to delete update folder: {e}")
|
|
|
|
QMessageBox.information(None, "Update Complete", "The application has been successfully updated.")
|
|
sys.argv.remove("--finish-update")
|
|
|
|
|
|
def remove_quarantine(app_path, app_name):
|
|
"""
|
|
Removes the macOS quarantine attribute from the specified application path.
|
|
"""
|
|
|
|
script = f'''
|
|
do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)"
|
|
'''
|
|
try:
|
|
subprocess.run(['osascript', '-e', script], check=True)
|
|
print("✅ Quarantine attribute removed.")
|
|
except subprocess.CalledProcessError as e:
|
|
print("❌ Failed to remove quarantine attribute.")
|
|
print(e) |