""" Filename: updater.py Description: Generic updater file Author: Tyler de Zeeuw License: GPL-3.0 """ # Built-in imports import os import re import sys import time import shlex import shutil import zipfile import traceback import subprocess # External library imports import psutil import requests from PySide6.QtWidgets import QMessageBox from PySide6.QtCore import QThread, Signal, QObject class UpdateDownloadThread(QThread): """ Thread that downloads and extracts an update package and emits a signal on completion or error. Args: download_url (str): URL of the update zip file to download. latest_version (str): Version string of the latest update. """ update_ready = Signal(str, str) error_occurred = Signal(str) def __init__(self, download_url, latest_version, platform_name, app_name): super().__init__() self.download_url = download_url self.latest_version = latest_version self.platform_name = platform_name self.app_name = app_name def run(self): try: local_filename = os.path.basename(self.download_url) if self.platform_name == 'darwin': tmp_dir = f'/tmp/{self.app_name}tempupdate' os.makedirs(tmp_dir, exist_ok=True) local_path = os.path.join(tmp_dir, local_filename) else: local_path = os.path.join(os.getcwd(), local_filename) # Download the file with requests.get(self.download_url, stream=True, timeout=15) as r: r.raise_for_status() with open(local_path, 'wb') as f: for chunk in r.iter_content(chunk_size=8192): if chunk: f.write(chunk) # Extract folder name (remove .zip) if self.platform_name == 'darwin': extract_folder = os.path.splitext(local_filename)[0] extract_path = os.path.join(tmp_dir, extract_folder) else: extract_folder = os.path.splitext(local_filename)[0] extract_path = os.path.join(os.getcwd(), extract_folder) # Create the folder if not exists os.makedirs(extract_path, exist_ok=True) # Extract the zip file contents if self.platform_name == 'darwin': subprocess.run(['ditto', '-xk', local_path, extract_path], check=True) else: with zipfile.ZipFile(local_path, 'r') as zip_ref: zip_ref.extractall(extract_path) # Remove the zip once extracted and emit a signal os.remove(local_path) self.update_ready.emit(self.latest_version, extract_path) except Exception as e: # Emit a signal signifying failure self.error_occurred.emit(str(e)) class UpdateCheckThread(QThread): """ Thread that checks for updates by querying the API and emits a signal based on the result. Signals: download_requested(str, str): Emitted with (download_url, latest_version) when an update is available. no_update_available(): Emitted when no update is found or current version is up to date. error_occurred(str): Emitted with an error message if the update check fails. """ download_requested = Signal(str, str) no_update_available = Signal() error_occurred = Signal(str) def __init__(self, api_url, api_url_sec, current_version, platform_name, app_name): super().__init__() self.api_url = api_url self.api_url_sec = api_url_sec self.current_version = current_version self.platform_name = platform_name self.app_name = app_name def run(self): # if not getattr(sys, 'frozen', False): # self.error_occurred.emit("Application is not frozen (Development mode).") # return try: latest_version, download_url = self.get_latest_release_for_platform() if not latest_version: self.no_update_available.emit() return if not download_url: self.error_occurred.emit(f"No download available for platform '{self.platform_name}'") return if self.version_compare(latest_version, self.current_version) > 0: self.download_requested.emit(download_url, latest_version) else: self.no_update_available.emit() except Exception as e: self.error_occurred.emit(f"Update check failed: {e}") def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def get_latest_release_for_platform(self): urls = [self.api_url, self.api_url_sec] for url in urls: try: response = requests.get(url, timeout=5) response.raise_for_status() releases = response.json() if not releases: continue latest = next((r for r in releases if not r.get("prerelease") and not r.get("draft")), None) if not latest: continue tag = latest["tag_name"].lstrip("v") for asset in latest.get("assets", []): if self.platform_name in asset["name"].lower(): return tag, asset["browser_download_url"] return tag, None except (requests.RequestException, ValueError) as e: continue return None, None class LocalPendingUpdateCheckThread(QThread): """ Thread that checks for locally pending updates by scanning the download directory and emits a signal accordingly. Args: current_version (str): Current application version. platform_suffix (str): Platform-specific suffix to identify update folders. """ pending_update_found = Signal(str, str) no_pending_update = Signal() def __init__(self, current_version, platform_suffix, platform_name, app_name): super().__init__() self.current_version = current_version self.platform_suffix = platform_suffix self.platform_name = platform_name self.app_name = app_name def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def run(self): if self.platform_name == 'darwin': cwd = f'/tmp/{self.app_name}tempupdate' else: cwd = os.getcwd() pattern = re.compile(r".*-(\d+\.\d+\.\d+)" + re.escape(self.platform_suffix) + r"$") found = False try: for item in os.listdir(cwd): folder_path = os.path.join(cwd, item) if os.path.isdir(folder_path) and item.endswith(self.platform_suffix): match = pattern.match(item) if match: folder_version = match.group(1) if self.version_compare(folder_version, self.current_version) > 0: self.pending_update_found.emit(folder_version, folder_path) found = True break except: pass if not found: self.no_pending_update.emit() class UpdateManager(QObject): """ Orchestrates the update process. Main apps should instantiate this and call check_for_updates(). """ def __init__(self, main_window, api_url, api_url_sec, current_version, platform_name, platform_suffix, app_name): super().__init__() self.parent = main_window self.api_url = api_url self.api_url_sec = api_url_sec self.current_version = current_version self.platform_name = platform_name self.platform_suffix = platform_suffix self.app_name = app_name self.pending_update_version = None self.pending_update_path = None def manual_check_for_updates(self): self.local_check_thread = LocalPendingUpdateCheckThread(self.current_version, self.platform_suffix, self.platform_name, self.app_name) self.local_check_thread.pending_update_found.connect(self.on_pending_update_found) self.local_check_thread.no_pending_update.connect(self.on_no_pending_update) self.local_check_thread.start() def on_pending_update_found(self, version, folder_path): self.parent.statusBar().showMessage(f"Pending update found: version {version}") self.pending_update_version = version self.pending_update_path = folder_path self.show_pending_update_popup() def on_no_pending_update(self): # No pending update found locally, start server check directly self.parent.statusBar().showMessage("No pending local update found. Checking server...") self.start_update_check_thread() def show_pending_update_popup(self): msg_box = QMessageBox(self.parent) msg_box.setWindowTitle("Pending Update Found") msg_box.setText(f"A previously downloaded update for {self.app_name.upper()} (version {self.pending_update_version}) is available at:\n{self.pending_update_path}\nWould you like to install it now?") install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) msg_box.exec() if msg_box.clickedButton() == install_now_button: self.install_update(self.pending_update_path) else: self.parent.statusBar().showMessage("Pending update available. Install later.") # After user dismisses, still check the server for new updates self.start_update_check_thread() def start_update_check_thread(self): self.check_thread = UpdateCheckThread(self.api_url, self.api_url_sec, self.current_version, self.platform_name, self.app_name) self.check_thread.download_requested.connect(self.on_server_update_requested) self.check_thread.no_update_available.connect(self.on_server_no_update) self.check_thread.error_occurred.connect(self.on_error) self.check_thread.start() def on_server_no_update(self): self.parent.statusBar().showMessage("No new updates found on server.", 5000) def on_server_update_requested(self, download_url, latest_version): if self.pending_update_version: cmp = self.version_compare(latest_version, self.pending_update_version) if cmp > 0: # Server version is newer than pending update self.parent.statusBar().showMessage(f"Newer version {latest_version} available on server. Removing old pending update...") try: shutil.rmtree(self.pending_update_path) self.parent.statusBar().showMessage(f"Deleted old update folder: {self.pending_update_path}") except Exception as e: self.parent.statusBar().showMessage(f"Failed to delete old update folder: {e}") # Clear pending update info so new download proceeds self.pending_update_version = None self.pending_update_path = None # Download the new update self.download_update(download_url, latest_version) elif cmp == 0: # Versions equal, no download needed self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is already latest. No download needed.") else: # Server version older than pending? Unlikely but just keep pending update self.parent.statusBar().showMessage(f"Pending update version {self.pending_update_version} is newer than server version. No action.") else: # No pending update, just download self.download_update(download_url, latest_version) def download_update(self, download_url, latest_version): self.parent.statusBar().showMessage("Downloading update...") self.download_thread = UpdateDownloadThread(download_url, latest_version, self.platform_name, self.app_name) self.download_thread.update_ready.connect(self.on_update_ready) self.download_thread.error_occurred.connect(self.on_error) self.download_thread.start() def on_update_ready(self, latest_version, extract_folder): self.parent.statusBar().showMessage("Update downloaded and extracted.") msg_box = QMessageBox(self.parent) msg_box.setWindowTitle("Update Ready") msg_box.setText(f"Version {latest_version} has been downloaded and extracted to:\n{extract_folder}\nWould you like to install it now?") install_now_button = msg_box.addButton("Install Now", QMessageBox.ButtonRole.AcceptRole) install_later_button = msg_box.addButton("Install Later", QMessageBox.ButtonRole.RejectRole) msg_box.exec() if msg_box.clickedButton() == install_now_button: self.install_update(extract_folder) else: self.parent.statusBar().showMessage("Update ready. Install later.") def install_update(self, extract_folder): # Path to updater executable if self.platform_name == 'windows': updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater.exe") elif self.platform_name == 'darwin': if getattr(sys, 'frozen', False): updater_path = os.path.join(os.path.dirname(sys.executable), f"../../../{self.app_name}_updater.app") else: updater_path = os.path.join(os.getcwd(), f"../{self.app_name}_updater.app") elif self.platform_name == 'linux': updater_path = os.path.join(os.getcwd(), f"{self.app_name}_updater") else: updater_path = os.getcwd() if not os.path.exists(updater_path): QMessageBox.critical(self.parent, "Error", f"Updater not found at:\n{updater_path}. The absolute path was {os.path.abspath(updater_path)}") return # Launch updater with extracted folder path as argument try: # Pass current app's executable path for updater to relaunch main_app_executable = os.path.abspath(sys.argv[0]) print(f'Launching updater with: "{updater_path}" "{extract_folder}" "{main_app_executable}"') if self.platform_name == 'darwin': subprocess.Popen(['open', updater_path, '--args', extract_folder, main_app_executable]) else: subprocess.Popen([updater_path, f'{extract_folder}', f'{main_app_executable}'], cwd=os.path.dirname(updater_path)) # Close the current app so updater can replace files sys.exit(0) except Exception as e: QMessageBox.critical(self.parent, "Error", f"[Updater Launch Failed]\n{str(e)}\n{traceback.format_exc()}") def on_error(self, message): # print(f"Error: {message}") self.parent.statusBar().showMessage(f"Error occurred during update process. {message}") def version_compare(self, v1, v2): def normalize(v): return [int(x) for x in v.split(".")] return (normalize(v1) > normalize(v2)) - (normalize(v1) < normalize(v2)) def wait_for_process_to_exit(process_name, timeout=10): """ Waits for a process with the specified name to exit within a timeout period. Args: process_name (str): Name (or part of the name) of the process to wait for. timeout (int, optional): Maximum time to wait in seconds. Defaults to 10. Returns: bool: True if the process exited before the timeout, False otherwise. """ print(f"Waiting for {process_name} to exit...") deadline = time.time() + timeout while time.time() < deadline: still_running = False for proc in psutil.process_iter(['name']): try: if proc.info['name'] and process_name.lower() in proc.info['name'].lower(): still_running = True print(f"Still running: {proc.info['name']} (PID: {proc.pid})") break except (psutil.NoSuchProcess, psutil.AccessDenied): continue if not still_running: print(f"{process_name} has exited.") return True time.sleep(0.5) print(f"{process_name} did not exit in time.") return False def finish_update_if_needed(platform_name, app_name): """ Completes a pending application update if '--finish-update' is present in the command-line arguments. """ if "--finish-update" in sys.argv: print("Finishing update...") if platform_name == 'darwin': app_dir = f'/tmp/{app_name}tempupdate' else: app_dir = os.getcwd() # 1. Find update folder update_folder = None for entry in os.listdir(app_dir): entry_path = os.path.join(app_dir, entry) if os.path.isdir(entry_path) and entry.startswith(f"{app_name}-") and entry.endswith("-" + platform_name): update_folder = os.path.join(app_dir, entry) break if update_folder is None: print("No update folder found. Skipping update steps.") return if platform_name == 'darwin': update_folder = os.path.join(update_folder, f"{app_name}-darwin") # 2. Wait for updater to exit print(f"Waiting for {app_name}_updater to exit...") for proc in psutil.process_iter(['pid', 'name']): if proc.info['name'] and f"{app_name}_updater" in proc.info['name'].lower(): try: proc.wait(timeout=5) except psutil.TimeoutExpired: print(f"Force killing lingering {app_name}_updater") proc.kill() # 3. Replace the updater if platform_name == 'windows': new_updater = os.path.join(update_folder, f"{app_name}_updater.exe") dest_updater = os.path.join(app_dir, f"{app_name}_updater.exe") elif platform_name == 'darwin': new_updater = os.path.join(update_folder, f"{app_name}_updater.app") dest_updater = os.path.abspath(os.path.join(sys.executable, f"../../../../{app_name}_updater.app")) elif platform_name == 'linux': new_updater = os.path.join(update_folder, f"{app_name}_updater") dest_updater = os.path.join(app_dir, f"{app_name}_updater") else: print("Unknown Platform") new_updater = os.getcwd() dest_updater = os.getcwd() print(f"New updater is {new_updater}") print(f"Dest updater is {dest_updater}") print("Writable?", os.access(dest_updater, os.W_OK)) print("Executable path:", sys.executable) print("Trying to copy:", new_updater, "->", dest_updater) if os.path.exists(new_updater): try: if os.path.exists(dest_updater): if platform_name == 'darwin': try: if os.path.isdir(dest_updater): shutil.rmtree(dest_updater) print(f"Deleted directory: {dest_updater}") else: os.remove(dest_updater) print(f"Deleted file: {dest_updater}") except Exception as e: print(f"Error deleting {dest_updater}: {e}") else: os.remove(dest_updater) if platform_name == 'darwin': wait_for_process_to_exit(f"{app_name}_updater", timeout=10) subprocess.check_call(["ditto", new_updater, dest_updater]) else: shutil.copy2(new_updater, dest_updater) if platform_name in ('linux', 'darwin'): os.chmod(dest_updater, 0o755) if platform_name == 'darwin': remove_quarantine(dest_updater, app_name) print(f"{app_name}_updater replaced.") except Exception as e: print(f"Failed to replace {app_name}_updater: {e}") # 4. Delete the update folder try: if platform_name == 'darwin': shutil.rmtree(app_dir) else: shutil.rmtree(update_folder) except Exception as e: print(f"Failed to delete update folder: {e}") QMessageBox.information(None, "Update Complete", "The application has been successfully updated.") sys.argv.remove("--finish-update") def remove_quarantine(app_path, app_name): """ Removes the macOS quarantine attribute from the specified application path. """ script = f''' do shell script "xattr -d -r com.apple.quarantine {shlex.quote(app_path)}" with administrator privileges with prompt "{app_name.upper()} needs privileges to finish the update. (2/2)" ''' try: subprocess.run(['osascript', '-e', script], check=True) print("✅ Quarantine attribute removed.") except subprocess.CalledProcessError as e: print("❌ Failed to remove quarantine attribute.") print(e)