import argparse import zipfile from io import BytesIO import bsdiff4 from datetime import datetime import hashlib import json import logging import os import requests import secrets import shutil import subprocess from tkinter import messagebox from typing import Any, Dict, Set import urllib import urllib.parse import Utils from .Constants import * from . import SavingPrincessWorld files_to_clean: Set[str] = { "D3DX9_43.dll", "data.win", "m_boss.ogg", "m_brainos.ogg", "m_coldarea.ogg", "m_escape.ogg", "m_hotarea.ogg", "m_hsis_dark.ogg", "m_hsis_power.ogg", "m_introarea.ogg", "m_malakhov.ogg", "m_miniboss.ogg", "m_ninja.ogg", "m_purple.ogg", "m_space_idle.ogg", "m_stonearea.ogg", "m_swamp.ogg", "m_zzz.ogg", "options.ini", "Saving Princess v0_8.exe", "splash.png", "gm-apclientpp.dll", "LICENSE", "original_data.win", "versions.json", } file_hashes: Dict[str, str] = { "D3DX9_43.dll": "86e39e9161c3d930d93822f1563c280d", "Saving Princess v0_8.exe": "cc3ad10c782e115d93c5b9fbc5675eaf", "original_data.win": "f97b80204bd9ae535faa5a8d1e5eb6ca", } class UrlResponse: def __init__(self, response_code: int, data: Any): self.response_code = response_code self.data = data def get_date(target_asset: str) -> str: """Provided the name of an asset, fetches its update date""" try: with open("versions.json", "r") as versions_json: return json.load(versions_json)[target_asset] except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError): return "2000-01-01T00:00:00Z" # arbitrary old date def set_date(target_asset: str, date: str) -> None: """Provided the name of an asset and a date, sets it update date""" try: with open("versions.json", "r") as versions_json: versions = json.load(versions_json) versions[target_asset] = date except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError): versions = {target_asset: date} with open("versions.json", "w") as versions_json: json.dump(versions, versions_json) def get_timestamp(date: str) -> float: """Parses a GitHub REST API date into a timestamp""" return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").timestamp() def send_request(request_url: str) -> UrlResponse: """Fetches status code and json response from given url""" response = requests.get(request_url) if response.status_code == 200: # success try: data = response.json() except requests.exceptions.JSONDecodeError: raise RuntimeError(f"Unable to fetch data. (status code {response.status_code}).") else: data = {} return UrlResponse(response.status_code, data) def update(target_asset: str, url: str) -> bool: """ Returns True if the data was fetched and installed (or it was already on the latest version, or the user refused the update) Returns False if rate limit was exceeded """ try: logging.info(f"Checking for {target_asset} updates.") response = send_request(url) if response.response_code == 403: # rate limit exceeded return False assets = response.data[0]["assets"] for asset in assets: if target_asset in asset["name"]: newest_date: str = asset["updated_at"] release_url: str = asset["browser_download_url"] break else: raise RuntimeError(f"Failed to locate {target_asset} amongst the assets.") except (KeyError, IndexError, TypeError, RuntimeError): update_error = f"Failed to fetch latest {target_asset}." messagebox.showerror("Failure", update_error) raise RuntimeError(update_error) try: update_available = get_timestamp(newest_date) > get_timestamp(get_date(target_asset)) if update_available and messagebox.askyesnocancel(f"New {target_asset}", "Would you like to install the new version now?"): # unzip and patch with urllib.request.urlopen(release_url) as download: with zipfile.ZipFile(BytesIO(download.read())) as zf: zf.extractall() patch_game() set_date(target_asset, newest_date) except (ValueError, RuntimeError, urllib.error.HTTPError): update_error = f"Failed to apply update." messagebox.showerror("Failure", update_error) raise RuntimeError(update_error) return True def patch_game() -> None: """Applies the patch to data.win""" logging.info("Proceeding to patch.") with open(PATCH_NAME, "rb") as patch: with open("original_data.win", "rb") as data: patched_data = bsdiff4.patch(data.read(), patch.read()) with open("data.win", "wb") as data: data.write(patched_data) logging.info("Done!") def is_install_valid() -> bool: """Checks that the mandatory files that we cannot replace do exist in the current folder""" for file_name, expected_hash in file_hashes.items(): if not os.path.exists(file_name): return False with open(file_name, "rb") as clean: current_hash = hashlib.md5(clean.read()).hexdigest() if not secrets.compare_digest(current_hash, expected_hash): return False return True def install() -> None: """Extracts all the game files into the mod installation folder""" logging.info("Mod installation missing or corrupted, proceeding to reinstall.") # get the cab file and extract it into the installation folder with open(SavingPrincessWorld.settings.exe_path, "rb") as exe: # find the cab header logging.info("Looking for cab archive inside exe.") cab_found: bool = False while not cab_found: cab_found = exe.read(1) == b'M' and exe.read(1) == b'S' and exe.read(1) == b'C' and exe.read(1) == b'F' exe.read(4) # skip reserved1, always 0 cab_size: int = int.from_bytes(exe.read(4), "little") # read size in bytes exe.seek(-12, 1) # move the cursor back to the start of the cab file logging.info(f"Archive found at offset {hex(exe.seek(0, 1))}, size: {hex(cab_size)}.") logging.info("Extracting cab archive from exe.") with open("saving_princess.cab", "wb") as cab: cab.write(exe.read(cab_size)) # clean up files from previous installations for file_name in files_to_clean: if os.path.exists(file_name): os.remove(file_name) logging.info("Extracting files from cab archive.") if Utils.is_windows: subprocess.run(["Extrac32", "/Y", "/E", "saving_princess.cab"]) else: if shutil.which("wine") is not None: subprocess.run(["wine", "Extrac32", "/Y", "/E", "saving_princess.cab"]) elif shutil.which("7z") is not None: subprocess.run(["7z", "e", "saving_princess.cab"]) else: error = "Could not find neither wine nor 7z.\n\nPlease install either the wine or the p7zip package." messagebox.showerror("Missing package!", f"Error: {error}") raise RuntimeError(error) os.remove("saving_princess.cab") # delete the cab file shutil.copyfile("data.win", "original_data.win") # and make a copy of data.win logging.info("Done!") def launch(*args: str) -> Any: """Check args, then the mod installation, then launch the game""" name: str = "" password: str = "" server: str = "" if args: parser = argparse.ArgumentParser(description=f"{GAME_NAME} Client Launcher") parser.add_argument("url", type=str, nargs="?", help="Archipelago Webhost uri to auto connect to.") args = parser.parse_args(args) # handle if text client is launched using the "archipelago://name:pass@host:port" url from webhost if args.url: url = urllib.parse.urlparse(args.url) if url.scheme == "archipelago": server = f'--server="{url.hostname}:{url.port}"' if url.username: name = f'--name="{urllib.parse.unquote(url.username)}"' if url.password: password = f'--password="{urllib.parse.unquote(url.password)}"' else: parser.error(f"bad url, found {args.url}, expected url in form of archipelago://archipelago.gg:38281") Utils.init_logging(CLIENT_NAME, exception_logger="Client") os.chdir(SavingPrincessWorld.settings.install_folder) # check that the mod installation is valid if not is_install_valid(): if messagebox.askyesnocancel(f"Mod installation missing or corrupted!", "Would you like to reinstall now?"): install() # if there is no mod installation, and we are not installing it, then there isn't much to do else: return # check for updates if not update(DOWNLOAD_NAME, DOWNLOAD_URL): messagebox.showinfo("Rate limit exceeded", "GitHub REST API limit exceeded, could not check for updates.\n\n" "This will not prevent the game from being played if it was already playable.") # and try to launch the game if SavingPrincessWorld.settings.launch_game: logging.info("Launching game.") try: subprocess.Popen(f"{SavingPrincessWorld.settings.launch_command} {name} {password} {server}") except FileNotFoundError: error = ("Could not run the game!\n\n" "Please check that launch_command in options.yaml or host.yaml is set up correctly.") messagebox.showerror("Command error!", f"Error: {error}") raise RuntimeError(error)