259 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			9.7 KiB
		
	
	
	
		
			Python
		
	
	
	
import argparse
 | 
						|
import zipfile
 | 
						|
from io import BytesIO
 | 
						|
 | 
						|
import bsdiff4
 | 
						|
from datetime import datetime
 | 
						|
import hashlib
 | 
						|
import json
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import requests
 | 
						|
import secrets
 | 
						|
import shutil
 | 
						|
import subprocess
 | 
						|
from tkinter import messagebox
 | 
						|
from typing import Any, Dict, Set
 | 
						|
import urllib
 | 
						|
import urllib.parse
 | 
						|
 | 
						|
import Utils
 | 
						|
from .Constants import *
 | 
						|
from . import SavingPrincessWorld
 | 
						|
 | 
						|
# Names of the files that install() deletes from the installation folder
# (when present) before unpacking the cab archive again.
files_to_clean: Set[str] = {
    # binaries and game data
    "D3DX9_43.dll",
    "gm-apclientpp.dll",
    "Saving Princess v0_8.exe",
    "data.win",
    "original_data.win",
    # music tracks
    "m_boss.ogg", "m_brainos.ogg", "m_coldarea.ogg", "m_escape.ogg",
    "m_hotarea.ogg", "m_hsis_dark.ogg", "m_hsis_power.ogg", "m_introarea.ogg",
    "m_malakhov.ogg", "m_miniboss.ogg", "m_ninja.ogg", "m_purple.ogg",
    "m_space_idle.ogg", "m_stonearea.ogg", "m_swamp.ogg", "m_zzz.ogg",
    # everything else
    "options.ini",
    "splash.png",
    "LICENSE",
    "versions.json",
}
 | 
						|
 | 
						|
# Expected MD5 hex digest for each file that is_install_valid() requires to be
# present in the installation folder.
file_hashes: Dict[str, str] = {
    "D3DX9_43.dll":
        "86e39e9161c3d930d93822f1563c280d",
    "Saving Princess v0_8.exe":
        "cc3ad10c782e115d93c5b9fbc5675eaf",
    "original_data.win":
        "f97b80204bd9ae535faa5a8d1e5eb6ca",
}
 | 
						|
 | 
						|
 | 
						|
class UrlResponse:
    """Pairs the status code of an HTTP response with its decoded payload"""

    def __init__(self, response_code: int, data: Any):
        # plain value holder: both arguments are stored exactly as given
        self.response_code, self.data = response_code, data
 | 
						|
 | 
						|
 | 
						|
def get_date(target_asset: str) -> str:
    """Provided the name of an asset, fetches its update date.

    The date is read from ``versions.json`` in the current working directory.

    :param target_asset: key of the asset inside ``versions.json``
    :return: the stored date, or an arbitrary old date when the file is missing,
        is not valid JSON, is not a JSON object, has no entry for the asset,
        or the stored entry is not a string
    """
    fallback = "2000-01-01T00:00:00Z"  # arbitrary old date
    try:
        with open("versions.json", "r", encoding="utf-8") as versions_json:
            stored = json.load(versions_json)[target_asset]
    # TypeError: the top-level JSON value is not an object (e.g. a list)
    # ValueError: covers json.JSONDecodeError as well as undecodable bytes
    except (FileNotFoundError, KeyError, TypeError, ValueError):
        return fallback
    # a non-string entry could not be parsed as a date by the caller anyway
    return stored if isinstance(stored, str) else fallback
 | 
						|
 | 
						|
 | 
						|
def set_date(target_asset: str, date: str) -> None:
    """Provided the name of an asset and a date, sets its update date.

    The entry is stored in ``versions.json`` in the current working directory.
    Entries of other assets are preserved; a missing or unusable file
    (invalid JSON, or JSON that is not an object) is replaced by a fresh one.

    :param target_asset: key of the asset inside ``versions.json``
    :param date: the date to store, written as given
    """
    try:
        with open("versions.json", "r", encoding="utf-8") as versions_json:
            versions = json.load(versions_json)
    # ValueError covers json.JSONDecodeError as well as undecodable bytes
    except (FileNotFoundError, ValueError):
        versions = {}
    if not isinstance(versions, dict):
        # valid JSON, but not a mapping we could add the entry to: start over
        versions = {}
    versions[target_asset] = date
    with open("versions.json", "w", encoding="utf-8") as versions_json:
        json.dump(versions, versions_json)
 | 
						|
 | 
						|
 | 
						|
def get_timestamp(date: str) -> float:
    """Parses a GitHub REST API date into a timestamp.

    :param date: a date formatted as ``YYYY-MM-DDTHH:MM:SSZ``
    :return: seconds since the Unix epoch
    :raises ValueError: if the date does not match the expected format
    """
    # function-scope import: the module only imports `datetime` from datetime
    from datetime import timezone

    parsed = datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
    # the trailing "Z" means UTC; a naive datetime would be read as local time by
    # timestamp(), shifting the result by the UTC offset and making it ambiguous
    # around daylight saving changes
    return parsed.replace(tzinfo=timezone.utc).timestamp()
 | 
						|
 | 
						|
 | 
						|
def send_request(request_url: str, timeout: float = 30.0) -> UrlResponse:
    """Fetches status code and json response from given url.

    :param request_url: the url to send a GET request to
    :param timeout: seconds to wait for the server before giving up; without
        one, requests waits indefinitely on an unresponsive server
    :return: the status code together with the decoded json body, or with an
        empty dict when the status code is anything other than 200
    :raises RuntimeError: if a 200 response does not contain valid json
    Connection problems and timeouts surface as the exceptions raised by requests.
    """
    response = requests.get(request_url, timeout=timeout)
    if response.status_code != 200:
        # not a success: the body is not inspected at all
        return UrlResponse(response.status_code, {})
    try:
        data = response.json()
    except requests.exceptions.JSONDecodeError as err:
        raise RuntimeError(f"Unable to fetch data. (status code {response.status_code}).") from err
    return UrlResponse(response.status_code, data)
 | 
						|
 | 
						|
 | 
						|
def update(target_asset: str, url: str) -> bool:
    """
    Checks the releases listed at url for a newer target_asset and offers to install it.

    Returns True if the data was fetched and installed
        (or it was already on the latest version, or the user refused the update)
    Returns False if rate limit was exceeded

    :param target_asset: text that the name of the wanted release asset must contain
    :param url: url queried through send_request; the first entry of the response
        is expected to carry an "assets" list
    :raises RuntimeError: if the asset could not be located, or if downloading,
        unpacking or patching failed (an error dialog is shown first)
    """
    # imported explicitly: a bare `import urllib` does not load the request submodule,
    # it would only be available if some other module happened to import it first
    from urllib.request import urlopen

    try:
        logging.info(f"Checking for {target_asset} updates.")
        response = send_request(url)
        if response.response_code == 403:  # rate limit exceeded
            return False
        assets = response.data[0]["assets"]
        for asset in assets:
            if target_asset in asset["name"]:
                newest_date: str = asset["updated_at"]
                release_url: str = asset["browser_download_url"]
                break
        else:
            raise RuntimeError(f"Failed to locate {target_asset} amongst the assets.")
    except (KeyError, IndexError, TypeError, RuntimeError) as err:
        update_error = f"Failed to fetch latest {target_asset}."
        messagebox.showerror("Failure", update_error)
        raise RuntimeError(update_error) from err
    try:
        update_available = get_timestamp(newest_date) > get_timestamp(get_date(target_asset))
        # askyesnocancel gives a falsy answer for both "no" and "cancel": nothing is installed then
        if update_available and messagebox.askyesnocancel(f"New {target_asset}",
                                                          "Would you like to install the new version now?"):
            # unzip and patch
            with urlopen(release_url) as download:
                with zipfile.ZipFile(BytesIO(download.read())) as zf:
                    zf.extractall()
            patch_game()
            set_date(target_asset, newest_date)
    # OSError covers urllib's HTTPError/URLError (network failures) as well as
    # missing or unwritable files; BadZipFile covers a corrupted download
    except (ValueError, RuntimeError, OSError, zipfile.BadZipFile) as err:
        update_error = "Failed to apply update."
        messagebox.showerror("Failure", update_error)
        raise RuntimeError(update_error) from err
    return True
 | 
						|
 | 
						|
 | 
						|
def patch_game() -> None:
    """Builds data.win by applying the patch file onto original_data.win"""
    logging.info("Proceeding to patch.")
    with open(PATCH_NAME, "rb") as patch_file, open("original_data.win", "rb") as source_file:
        # the unmodified data is read first, then the patch itself
        source_bytes = source_file.read()
        patch_bytes = patch_file.read()
        result = bsdiff4.patch(source_bytes, patch_bytes)
    # only touch data.win once the patched content has been fully computed
    with open("data.win", "wb") as target_file:
        target_file.write(result)
    logging.info("Done!")
 | 
						|
 | 
						|
 | 
						|
def is_install_valid() -> bool:
    """Tells whether every file listed in file_hashes exists in the current folder with the expected MD5"""

    def _is_intact(path: str, expected_md5: str) -> bool:
        # a missing file fails the check without attempting to open it
        if not os.path.exists(path):
            return False
        with open(path, "rb") as handle:
            actual_md5 = hashlib.md5(handle.read()).hexdigest()
        return secrets.compare_digest(actual_md5, expected_md5)

    # all() stops at the first file that is missing or does not match
    return all(_is_intact(path, expected_md5) for path, expected_md5 in file_hashes.items())
 | 
						|
 | 
						|
 | 
						|
def install() -> None:
    """Extracts all the game files into the mod installation folder.

    Locates the cab archive embedded in the exe pointed at by the exe_path setting,
    writes it to the current folder, removes the files of previous installations,
    unpacks the archive with an external tool and keeps a copy of data.win
    as original_data.win.

    :raises RuntimeError: if no cab archive is found inside the exe, or if no
        extraction tool is available outside of Windows (an error dialog is shown first)
    """
    logging.info("Mod installation missing or corrupted, proceeding to reinstall.")
    # get the cab file and extract it into the installation folder
    with open(SavingPrincessWorld.settings.exe_path, "rb") as exe:
        logging.info("Looking for cab archive inside exe.")
        exe_data: bytes = exe.read()

    # find the cab header; unlike reading byte by byte, find() neither loops forever
    # when the signature is absent nor skips a signature preceded by a partial match
    cab_offset: int = exe_data.find(b"MSCF")
    # header layout: 4 bytes signature, 4 bytes reserved1 (always 0), 4 bytes size
    if cab_offset < 0 or len(exe_data) < cab_offset + 12:
        error = "Could not find a cab archive inside the exe."
        messagebox.showerror("Invalid exe!", f"Error: {error}")
        raise RuntimeError(error)
    cab_size: int = int.from_bytes(exe_data[cab_offset + 8:cab_offset + 12], "little")  # size in bytes
    logging.info(f"Archive found at offset {hex(cab_offset)}, size: {hex(cab_size)}.")
    logging.info("Extracting cab archive from exe.")
    with open("saving_princess.cab", "wb") as cab:
        cab.write(exe_data[cab_offset:cab_offset + cab_size])
    del exe_data  # the exe content is not needed anymore

    # clean up files from previous installations
    for file_name in files_to_clean:
        if os.path.exists(file_name):
            os.remove(file_name)

    logging.info("Extracting files from cab archive.")
    if Utils.is_windows:
        subprocess.run(["Extrac32", "/Y", "/E", "saving_princess.cab"])
    else:
        if shutil.which("wine") is not None:
            subprocess.run(["wine", "Extrac32", "/Y", "/E", "saving_princess.cab"])
        elif shutil.which("7z") is not None:
            subprocess.run(["7z", "e", "saving_princess.cab"])
        else:
            error = "Could not find neither wine nor 7z.\n\nPlease install either the wine or the p7zip package."
            messagebox.showerror("Missing package!", f"Error: {error}")
            raise RuntimeError(error)
    os.remove("saving_princess.cab")  # delete the cab file

    shutil.copyfile("data.win", "original_data.win")  # and make a copy of data.win
    logging.info("Done!")
 | 
						|
 | 
						|
 | 
						|
def launch(*args: str) -> Any:
    """Check args, then the mod installation, then launch the game.

    :param args: command line arguments; the only one understood is an optional
        "archipelago://name:pass@host:port" url whose parts are forwarded to the game
    :return: None, either after launching or when the user declines to reinstall
    :raises RuntimeError: if installing, updating or running the game fails
    A url that is not a valid archipelago url ends the process through argparse's error handling.
    """
    # function-scope import: only needed to build the argument list outside of Windows
    import shlex

    name: str = ""
    password: str = ""
    server: str = ""
    if args:
        parser = argparse.ArgumentParser(description=f"{GAME_NAME} Client Launcher")
        parser.add_argument("url", type=str, nargs="?", help="Archipelago Webhost uri to auto connect to.")
        parsed_args = parser.parse_args(args)

        # handle if text client is launched using the "archipelago://name:pass@host:port" url from webhost
        if parsed_args.url:
            bad_url = f"bad url, found {parsed_args.url}, expected url in form of archipelago://archipelago.gg:38281"
            url = urllib.parse.urlparse(parsed_args.url)
            if url.scheme != "archipelago":
                parser.error(bad_url)
            try:
                port = url.port  # raises ValueError when the port is not a valid number
            except ValueError:
                parser.error(bad_url)
            if url.hostname:
                # without a port only the host is passed on, rather than a literal "host:None"
                # NOTE(review): assumes the game client accepts a server without a port - confirm
                address = url.hostname if port is None else f"{url.hostname}:{port}"
                server = f'--server="{address}"'
            if url.username:
                name = f'--name="{urllib.parse.unquote(url.username)}"'
            if url.password:
                password = f'--password="{urllib.parse.unquote(url.password)}"'

    Utils.init_logging(CLIENT_NAME, exception_logger="Client")

    os.chdir(SavingPrincessWorld.settings.install_folder)

    # check that the mod installation is valid
    if not is_install_valid():
        if messagebox.askyesnocancel("Mod installation missing or corrupted!",
                                     "Would you like to reinstall now?"):
            install()
        # if there is no mod installation, and we are not installing it, then there isn't much to do
        else:
            return

    # check for updates
    if not update(DOWNLOAD_NAME, DOWNLOAD_URL):
        messagebox.showinfo("Rate limit exceeded",
                            "GitHub REST API limit exceeded, could not check for updates.\n\n"
                            "This will not prevent the game from being played if it was already playable.")

    # and try to launch the game
    if SavingPrincessWorld.settings.launch_game:
        logging.info("Launching game.")
        command = f"{SavingPrincessWorld.settings.launch_command} {name} {password} {server}"
        try:
            if Utils.is_windows:
                # Windows receives the command line as a single string
                subprocess.Popen(command)
            else:
                # on POSIX a plain string is taken as the path of the program itself,
                # so the command has to be split into a list of arguments
                subprocess.Popen(shlex.split(command))
        # ValueError: the command could not be split (e.g. unbalanced quotes in launch_command)
        except (FileNotFoundError, ValueError) as err:
            error = ("Could not run the game!\n\n"
                     "Please check that launch_command in options.yaml or host.yaml is set up correctly.")
            messagebox.showerror("Command error!", f"Error: {error}")
            raise RuntimeError(error) from err
 |