Archipelago/Patch.py

213 lines
8.5 KiB
Python
Raw Normal View History

2021-11-13 19:52:30 +00:00
# TODO: convert this into a system like AutoWorld
import bsdiff4
import yaml
import os
import lzma
import threading
import concurrent.futures
import zipfile
2020-04-15 08:11:47 +00:00
import sys
2020-03-28 20:55:41 +00:00
from typing import Tuple, Optional
import Utils
current_patch_version = 3
2021-11-12 13:36:34 +00:00
GAME_ALTTP = "A Link to the Past"
GAME_SM = "Super Metroid"
2021-11-13 19:52:30 +00:00
GAME_SOE = "Secret of Evermore"
supported_games = {"A Link to the Past", "Super Metroid", "Secret of Evermore"}
preferred_endings = {
GAME_ALTTP: "apbp",
GAME_SM: "apm3",
GAME_SOE: "apsoe"
}
2021-11-12 13:36:34 +00:00
2021-11-12 13:36:34 +00:00
def generate_yaml(patch: bytes, metadata: Optional[dict] = None, game: str = GAME_ALTTP) -> bytes:
if game == GAME_ALTTP:
2021-11-13 19:52:30 +00:00
from worlds.alttp.Rom import JAP10HASH as HASH
elif game == GAME_SM:
2021-11-13 19:52:30 +00:00
from worlds.sm.Rom import JAP10HASH as HASH
elif game == GAME_SOE:
from worlds.soe.Patch import USHASH as HASH
else:
2021-11-13 19:52:30 +00:00
raise RuntimeError(f"Selected game {game} for base rom not found.")
patch = yaml.dump({"meta": metadata,
"patch": patch,
2021-11-12 13:36:34 +00:00
"game": game,
# minimum version of patch system expected for patching to be successful
"compatible_version": 3,
2020-10-24 03:38:56 +00:00
"version": current_patch_version,
2021-11-13 19:52:30 +00:00
"base_checksum": HASH})
return patch.encode(encoding="utf-8-sig")
2021-11-12 13:36:34 +00:00
def generate_patch(rom: bytes, metadata: Optional[dict] = None, game: str = GAME_ALTTP) -> bytes:
if metadata is None:
metadata = {}
2021-11-14 20:03:17 +00:00
patch = bsdiff4.diff(get_base_rom_data(game), rom)
return generate_yaml(patch, metadata, game)
def create_patch_file(rom_file_to_patch: str, server: str = "", destination: str = None,
2021-11-12 13:36:34 +00:00
player: int = 0, player_name: str = "", game: str = GAME_ALTTP) -> str:
meta = {"server": server, # allow immediate connection to server in multiworld. Empty string otherwise
"player_id": player,
"player_name": player_name}
bytes = generate_patch(load_bytes(rom_file_to_patch),
meta,
game)
2021-11-12 13:36:34 +00:00
target = destination if destination else os.path.splitext(rom_file_to_patch)[0] + (
".apbp" if game == GAME_ALTTP else ".apm3")
write_lzma(bytes, target)
return target
def create_rom_bytes(patch_file: str, ignore_version: bool = False) -> Tuple[dict, str, bytearray]:
data = Utils.parse_yaml(lzma.decompress(load_bytes(patch_file)).decode("utf-8-sig"))
game_name = data["game"]
if not ignore_version and data["compatible_version"] > current_patch_version:
2020-10-24 03:38:56 +00:00
raise RuntimeError("Patch file is incompatible with this patcher, likely an update is required.")
2021-11-14 20:03:17 +00:00
patched_data = bsdiff4.patch(get_base_rom_data(game_name), data["patch"])
rom_hash = patched_data[int(0x7FC0):int(0x7FD5)]
data["meta"]["hash"] = "".join(chr(x) for x in rom_hash)
target = os.path.splitext(patch_file)[0] + ".sfc"
2020-06-09 19:18:48 +00:00
return data["meta"], target, patched_data
2021-11-14 20:03:17 +00:00
def get_base_rom_data(game: str):
if game == GAME_ALTTP:
from worlds.alttp.Rom import get_base_rom_bytes
elif game == "alttp": # old version for A Link to the Past
from worlds.alttp.Rom import get_base_rom_bytes
elif game == GAME_SM:
from worlds.sm.Rom import get_base_rom_bytes
elif game == GAME_SOE:
file_name = Utils.get_options()["soe_options"]["rom"]
get_base_rom_bytes = lambda: bytes(read_rom(open(file_name, "rb")))
else:
raise RuntimeError("Selected game for base rom not found.")
return get_base_rom_bytes()
2020-06-09 19:18:48 +00:00
def create_rom_file(patch_file: str) -> Tuple[dict, str]:
data, target, patched_data = create_rom_bytes(patch_file)
with open(target, "wb") as f:
f.write(patched_data)
2020-06-09 19:18:48 +00:00
return data, target
def update_patch_data(patch_data: bytes, server: str = "") -> bytes:
data = Utils.parse_yaml(lzma.decompress(patch_data).decode("utf-8-sig"))
data["meta"]["server"] = server
bytes = generate_yaml(data["patch"], data["meta"], data["game"])
return lzma.compress(bytes)
2020-04-15 08:11:47 +00:00
def load_bytes(path: str) -> bytes:
with open(path, "rb") as f:
return f.read()
def write_lzma(data: bytes, path: str):
with lzma.LZMAFile(path, 'wb') as f:
f.write(data)
2020-03-17 18:16:11 +00:00
2021-11-12 13:36:34 +00:00
def read_rom(stream, strip_header=True) -> bytearray:
"""Reads rom into bytearray and optionally strips off any smc header"""
buffer = bytearray(stream.read())
if strip_header and len(buffer) % 0x400 == 0x200:
return buffer[0x200:]
return buffer
2020-03-17 18:16:11 +00:00
if __name__ == "__main__":
host = Utils.get_public_ipv4()
2020-04-15 08:11:47 +00:00
options = Utils.get_options()['server_options']
if options['host']:
host = options['host']
2020-04-15 08:11:47 +00:00
address = f"{host}:{options['port']}"
ziplock = threading.Lock()
2020-04-15 08:11:47 +00:00
print(f"Host for patches to be created is {address}")
with concurrent.futures.ThreadPoolExecutor() as pool:
for rom in sys.argv:
try:
if rom.endswith(".sfc"):
print(f"Creating patch for {rom}")
result = pool.submit(create_patch_file, rom, address)
result.add_done_callback(lambda task: print(f"Created patch {task.result()}"))
elif rom.endswith(".apbp"):
print(f"Applying patch {rom}")
data, target = create_rom_file(rom)
romfile, adjusted = Utils.get_adjuster_settings(target)
if adjusted:
try:
os.replace(romfile, target)
romfile = target
except Exception as e:
print(e)
print(f"Created rom {romfile if adjusted else target}.")
if 'server' in data:
Utils.persistent_store("servers", data['hash'], data['server'])
print(f"Host is {data['server']}")
elif rom.endswith(".apm3"):
print(f"Applying patch {rom}")
data, target = create_rom_file(rom)
print(f"Created rom {target}.")
if 'server' in data:
Utils.persistent_store("servers", data['hash'], data['server'])
print(f"Host is {data['server']}")
2021-01-03 13:32:32 +00:00
elif rom.endswith(".archipelago"):
import json
import zlib
2021-01-03 13:32:32 +00:00
with open(rom, 'rb') as fr:
multidata = zlib.decompress(fr.read()).decode("utf-8")
with open(rom + '.txt', 'w') as fw:
fw.write(multidata)
multidata = json.loads(multidata)
for romname in multidata['roms']:
Utils.persistent_store("servers", "".join(chr(byte) for byte in romname[2]), address)
from Utils import get_options
multidata["server_options"] = get_options()["server_options"]
multidata = zlib.compress(json.dumps(multidata).encode("utf-8"), 9)
2021-01-03 13:32:32 +00:00
with open(rom + "_updated.archipelago", 'wb') as f:
f.write(multidata)
elif rom.endswith(".zip"):
print(f"Updating host in patch files contained in {rom}")
def _handle_zip_file_entry(zfinfo: zipfile.ZipInfo, server: str):
data = zfr.read(zfinfo)
if zfinfo.filename.endswith(".apbp") or zfinfo.filename.endswith(".apm3"):
data = update_patch_data(data, server)
with ziplock:
zfw.writestr(zfinfo, data)
return zfinfo.filename
2020-03-17 18:16:11 +00:00
futures = []
with zipfile.ZipFile(rom, "r") as zfr:
updated_zip = os.path.splitext(rom)[0] + "_updated.zip"
with zipfile.ZipFile(updated_zip, "w", compression=zipfile.ZIP_DEFLATED,
compresslevel=9) as zfw:
for zfname in zfr.namelist():
2020-04-15 08:11:47 +00:00
futures.append(pool.submit(_handle_zip_file_entry, zfr.getinfo(zfname), address))
for future in futures:
print(f"File {future.result()} added to {os.path.split(updated_zip)[1]}")
except:
import traceback
traceback.print_exc()
2021-11-12 13:36:34 +00:00
input("Press enter to close.")