Archipelago/worlds/saving_princess/Client.py

259 lines
9.7 KiB
Python

import argparse
import zipfile
from io import BytesIO
import bsdiff4
from datetime import datetime
import hashlib
import json
import logging
import os
import requests
import secrets
import shutil
import subprocess
from tkinter import messagebox
from typing import Any, Dict, Set
import urllib
import urllib.parse
import Utils
from .Constants import *
from . import SavingPrincessWorld
files_to_clean: Set[str] = {
"D3DX9_43.dll",
"data.win",
"m_boss.ogg",
"m_brainos.ogg",
"m_coldarea.ogg",
"m_escape.ogg",
"m_hotarea.ogg",
"m_hsis_dark.ogg",
"m_hsis_power.ogg",
"m_introarea.ogg",
"m_malakhov.ogg",
"m_miniboss.ogg",
"m_ninja.ogg",
"m_purple.ogg",
"m_space_idle.ogg",
"m_stonearea.ogg",
"m_swamp.ogg",
"m_zzz.ogg",
"options.ini",
"Saving Princess v0_8.exe",
"splash.png",
"gm-apclientpp.dll",
"LICENSE",
"original_data.win",
"versions.json",
}
file_hashes: Dict[str, str] = {
"D3DX9_43.dll": "86e39e9161c3d930d93822f1563c280d",
"Saving Princess v0_8.exe": "cc3ad10c782e115d93c5b9fbc5675eaf",
"original_data.win": "f97b80204bd9ae535faa5a8d1e5eb6ca",
}
class UrlResponse:
def __init__(self, response_code: int, data: Any):
self.response_code = response_code
self.data = data
def get_date(target_asset: str) -> str:
"""Provided the name of an asset, fetches its update date"""
try:
with open("versions.json", "r") as versions_json:
return json.load(versions_json)[target_asset]
except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError):
return "2000-01-01T00:00:00Z" # arbitrary old date
def set_date(target_asset: str, date: str) -> None:
"""Provided the name of an asset and a date, sets it update date"""
try:
with open("versions.json", "r") as versions_json:
versions = json.load(versions_json)
versions[target_asset] = date
except (FileNotFoundError, KeyError, json.decoder.JSONDecodeError):
versions = {target_asset: date}
with open("versions.json", "w") as versions_json:
json.dump(versions, versions_json)
def get_timestamp(date: str) -> float:
"""Parses a GitHub REST API date into a timestamp"""
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ").timestamp()
def send_request(request_url: str) -> UrlResponse:
"""Fetches status code and json response from given url"""
response = requests.get(request_url)
if response.status_code == 200: # success
try:
data = response.json()
except requests.exceptions.JSONDecodeError:
raise RuntimeError(f"Unable to fetch data. (status code {response.status_code}).")
else:
data = {}
return UrlResponse(response.status_code, data)
def update(target_asset: str, url: str) -> bool:
"""
Returns True if the data was fetched and installed
(or it was already on the latest version, or the user refused the update)
Returns False if rate limit was exceeded
"""
try:
logging.info(f"Checking for {target_asset} updates.")
response = send_request(url)
if response.response_code == 403: # rate limit exceeded
return False
assets = response.data[0]["assets"]
for asset in assets:
if target_asset in asset["name"]:
newest_date: str = asset["updated_at"]
release_url: str = asset["browser_download_url"]
break
else:
raise RuntimeError(f"Failed to locate {target_asset} amongst the assets.")
except (KeyError, IndexError, TypeError, RuntimeError):
update_error = f"Failed to fetch latest {target_asset}."
messagebox.showerror("Failure", update_error)
raise RuntimeError(update_error)
try:
update_available = get_timestamp(newest_date) > get_timestamp(get_date(target_asset))
if update_available and messagebox.askyesnocancel(f"New {target_asset}",
"Would you like to install the new version now?"):
# unzip and patch
with urllib.request.urlopen(release_url) as download:
with zipfile.ZipFile(BytesIO(download.read())) as zf:
zf.extractall()
patch_game()
set_date(target_asset, newest_date)
except (ValueError, RuntimeError, urllib.error.HTTPError):
update_error = f"Failed to apply update."
messagebox.showerror("Failure", update_error)
raise RuntimeError(update_error)
return True
def patch_game() -> None:
"""Applies the patch to data.win"""
logging.info("Proceeding to patch.")
with open(PATCH_NAME, "rb") as patch:
with open("original_data.win", "rb") as data:
patched_data = bsdiff4.patch(data.read(), patch.read())
with open("data.win", "wb") as data:
data.write(patched_data)
logging.info("Done!")
def is_install_valid() -> bool:
"""Checks that the mandatory files that we cannot replace do exist in the current folder"""
for file_name, expected_hash in file_hashes.items():
if not os.path.exists(file_name):
return False
with open(file_name, "rb") as clean:
current_hash = hashlib.md5(clean.read()).hexdigest()
if not secrets.compare_digest(current_hash, expected_hash):
return False
return True
def install() -> None:
"""Extracts all the game files into the mod installation folder"""
logging.info("Mod installation missing or corrupted, proceeding to reinstall.")
# get the cab file and extract it into the installation folder
with open(SavingPrincessWorld.settings.exe_path, "rb") as exe:
# find the cab header
logging.info("Looking for cab archive inside exe.")
cab_found: bool = False
while not cab_found:
cab_found = exe.read(1) == b'M' and exe.read(1) == b'S' and exe.read(1) == b'C' and exe.read(1) == b'F'
exe.read(4) # skip reserved1, always 0
cab_size: int = int.from_bytes(exe.read(4), "little") # read size in bytes
exe.seek(-12, 1) # move the cursor back to the start of the cab file
logging.info(f"Archive found at offset {hex(exe.seek(0, 1))}, size: {hex(cab_size)}.")
logging.info("Extracting cab archive from exe.")
with open("saving_princess.cab", "wb") as cab:
cab.write(exe.read(cab_size))
# clean up files from previous installations
for file_name in files_to_clean:
if os.path.exists(file_name):
os.remove(file_name)
logging.info("Extracting files from cab archive.")
if Utils.is_windows:
subprocess.run(["Extrac32", "/Y", "/E", "saving_princess.cab"])
else:
if shutil.which("wine") is not None:
subprocess.run(["wine", "Extrac32", "/Y", "/E", "saving_princess.cab"])
elif shutil.which("7z") is not None:
subprocess.run(["7z", "e", "saving_princess.cab"])
else:
error = "Could not find neither wine nor 7z.\n\nPlease install either the wine or the p7zip package."
messagebox.showerror("Missing package!", f"Error: {error}")
raise RuntimeError(error)
os.remove("saving_princess.cab") # delete the cab file
shutil.copyfile("data.win", "original_data.win") # and make a copy of data.win
logging.info("Done!")
def launch(*args: str) -> Any:
"""Check args, then the mod installation, then launch the game"""
name: str = ""
password: str = ""
server: str = ""
if args:
parser = argparse.ArgumentParser(description=f"{GAME_NAME} Client Launcher")
parser.add_argument("url", type=str, nargs="?", help="Archipelago Webhost uri to auto connect to.")
args = parser.parse_args(args)
# handle if text client is launched using the "archipelago://name:pass@host:port" url from webhost
if args.url:
url = urllib.parse.urlparse(args.url)
if url.scheme == "archipelago":
server = f'--server="{url.hostname}:{url.port}"'
if url.username:
name = f'--name="{urllib.parse.unquote(url.username)}"'
if url.password:
password = f'--password="{urllib.parse.unquote(url.password)}"'
else:
parser.error(f"bad url, found {args.url}, expected url in form of archipelago://archipelago.gg:38281")
Utils.init_logging(CLIENT_NAME, exception_logger="Client")
os.chdir(SavingPrincessWorld.settings.install_folder)
# check that the mod installation is valid
if not is_install_valid():
if messagebox.askyesnocancel(f"Mod installation missing or corrupted!",
"Would you like to reinstall now?"):
install()
# if there is no mod installation, and we are not installing it, then there isn't much to do
else:
return
# check for updates
if not update(DOWNLOAD_NAME, DOWNLOAD_URL):
messagebox.showinfo("Rate limit exceeded",
"GitHub REST API limit exceeded, could not check for updates.\n\n"
"This will not prevent the game from being played if it was already playable.")
# and try to launch the game
if SavingPrincessWorld.settings.launch_game:
logging.info("Launching game.")
try:
subprocess.Popen(f"{SavingPrincessWorld.settings.launch_command} {name} {password} {server}")
except FileNotFoundError:
error = ("Could not run the game!\n\n"
"Please check that launch_command in options.yaml or host.yaml is set up correctly.")
messagebox.showerror("Command error!", f"Error: {error}")
raise RuntimeError(error)