Core remove legacy patch (#1047)

Co-authored-by: black-sliver <59490463+black-sliver@users.noreply.github.com>
This commit is contained in:
Fabian Dill 2022-09-30 00:36:30 +02:00 committed by GitHub
parent 8ab0b410c3
commit 61e39f355d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 259 additions and 491 deletions

View File

@ -139,7 +139,7 @@ def adjust(args):
vanillaRom = args.baserom
if not os.path.exists(vanillaRom) and not os.path.isabs(vanillaRom):
vanillaRom = local_path(vanillaRom)
if os.path.splitext(args.rom)[-1].lower() in {'.apbp', '.aplttp'}:
if os.path.splitext(args.rom)[-1].lower() == '.aplttp':
import Patch
meta, args.rom = Patch.create_rom_file(args.rom)
@ -195,7 +195,7 @@ def adjustGUI():
romEntry2 = Entry(romDialogFrame, textvariable=romVar2)
def RomSelect2():
rom = filedialog.askopenfilename(filetypes=[("Rom Files", (".sfc", ".smc", ".apbp")), ("All Files", "*")])
rom = filedialog.askopenfilename(filetypes=[("Rom Files", (".sfc", ".smc", ".aplttp")), ("All Files", "*")])
romVar2.set(rom)
romSelectButton2 = Button(romDialogFrame, text='Select Rom', command=RomSelect2)
@ -725,7 +725,7 @@ def get_rom_options_frame(parent=None):
vars.auto_apply = StringVar(value=adjuster_settings.auto_apply)
autoApplyFrame = Frame(romOptionsFrame)
autoApplyFrame.grid(row=9, column=0, columnspan=2, sticky=W)
filler = Label(autoApplyFrame, text="Automatically apply last used settings on opening .apbp files")
filler = Label(autoApplyFrame, text="Automatically apply last used settings on opening .aplttp files")
filler.pack(side=TOP, expand=True, fill=X)
askRadio = Radiobutton(autoApplyFrame, text='Ask', variable=vars.auto_apply, value='ask')
askRadio.pack(side=LEFT, padx=5, pady=5)

View File

@ -5,7 +5,8 @@ import multiprocessing
import subprocess
from asyncio import StreamReader, StreamWriter
from CommonClient import CommonContext, server_loop, gui_enabled, console_loop, \
# CommonClient import first to trigger ModuleUpdater
from CommonClient import CommonContext, server_loop, gui_enabled, \
ClientCommandProcessor, logger, get_base_parser
import Utils
from worlds import network_data_package

418
Patch.py
View File

@ -1,274 +1,33 @@
from __future__ import annotations
import shutil
import json
import bsdiff4 # type: ignore
import yaml
import os
import lzma
import threading
import concurrent.futures
import zipfile
import sys
from typing import ClassVar, List, Tuple, Optional, Dict, Any, Union, BinaryIO
from typing import Tuple, Optional, TypedDict
import ModuleUpdate
ModuleUpdate.update()
if __name__ == "__main__":
import ModuleUpdate
ModuleUpdate.update()
import Utils
current_patch_version = 5
from worlds.Files import AutoPatchRegister, APDeltaPatch
class AutoPatchRegister(type):
patch_types: ClassVar[Dict[str, AutoPatchRegister]] = {}
file_endings: ClassVar[Dict[str, AutoPatchRegister]] = {}
def __new__(cls, name: str, bases: Tuple[type, ...], dct: Dict[str, Any]) -> AutoPatchRegister:
# construct class
new_class = super().__new__(cls, name, bases, dct)
if "game" in dct:
AutoPatchRegister.patch_types[dct["game"]] = new_class
if not dct["patch_file_ending"]:
raise Exception(f"Need an expected file ending for {name}")
AutoPatchRegister.file_endings[dct["patch_file_ending"]] = new_class
return new_class
@staticmethod
def get_handler(file: str) -> Optional[AutoPatchRegister]:
for file_ending, handler in AutoPatchRegister.file_endings.items():
if file.endswith(file_ending):
return handler
return None
class APContainer:
"""A zipfile containing at least archipelago.json"""
version: int = current_patch_version
compression_level: int = 9
compression_method: int = zipfile.ZIP_DEFLATED
game: Optional[str] = None
# instance attributes:
path: Optional[str]
player: Optional[int]
player_name: str
server: str
def __init__(self, path: Optional[str] = None, player: Optional[int] = None,
player_name: str = "", server: str = ""):
self.path = path
self.player = player
self.player_name = player_name
self.server = server
def write(self, file: Optional[Union[str, BinaryIO]] = None) -> None:
zip_file = file if file else self.path
if not zip_file:
raise FileNotFoundError(f"Cannot write {self.__class__.__name__} due to no path provided.")
with zipfile.ZipFile(zip_file, "w", self.compression_method, True, self.compression_level) \
as zf:
if file:
self.path = zf.filename
self.write_contents(zf)
def write_contents(self, opened_zipfile: zipfile.ZipFile) -> None:
manifest = self.get_manifest()
try:
manifest_str = json.dumps(manifest)
except Exception as e:
raise Exception(f"Manifest {manifest} did not convert to json.") from e
else:
opened_zipfile.writestr("archipelago.json", manifest_str)
def read(self, file: Optional[Union[str, BinaryIO]] = None) -> None:
"""Read data into patch object. file can be file-like, such as an outer zip file's stream."""
zip_file = file if file else self.path
if not zip_file:
raise FileNotFoundError(f"Cannot read {self.__class__.__name__} due to no path provided.")
with zipfile.ZipFile(zip_file, "r") as zf:
if file:
self.path = zf.filename
self.read_contents(zf)
def read_contents(self, opened_zipfile: zipfile.ZipFile) -> None:
with opened_zipfile.open("archipelago.json", "r") as f:
manifest = json.load(f)
if manifest["compatible_version"] > self.version:
raise Exception(f"File (version: {manifest['compatible_version']}) too new "
f"for this handler (version: {self.version})")
self.player = manifest["player"]
self.server = manifest["server"]
self.player_name = manifest["player_name"]
def get_manifest(self) -> Dict[str, Any]:
return {
"server": self.server, # allow immediate connection to server in multiworld. Empty string otherwise
"player": self.player,
"player_name": self.player_name,
"game": self.game,
# minimum version of patch system expected for patching to be successful
"compatible_version": 4,
"version": current_patch_version,
}
class APDeltaPatch(APContainer, metaclass=AutoPatchRegister):
"""An APContainer that additionally has delta.bsdiff4
containing a delta patch to get the desired file, often a rom."""
hash: Optional[str] # base checksum of source file
patch_file_ending: str = ""
delta: Optional[bytes] = None
result_file_ending: str = ".sfc"
source_data: bytes
def __init__(self, *args: Any, patched_path: str = "", **kwargs: Any) -> None:
self.patched_path = patched_path
super(APDeltaPatch, self).__init__(*args, **kwargs)
def get_manifest(self) -> Dict[str, Any]:
manifest = super(APDeltaPatch, self).get_manifest()
manifest["base_checksum"] = self.hash
manifest["result_file_ending"] = self.result_file_ending
manifest["patch_file_ending"] = self.patch_file_ending
return manifest
@classmethod
def get_source_data(cls) -> bytes:
"""Get Base data"""
raise NotImplementedError()
@classmethod
def get_source_data_with_cache(cls) -> bytes:
if not hasattr(cls, "source_data"):
cls.source_data = cls.get_source_data()
return cls.source_data
def write_contents(self, opened_zipfile: zipfile.ZipFile):
super(APDeltaPatch, self).write_contents(opened_zipfile)
# write Delta
opened_zipfile.writestr("delta.bsdiff4",
bsdiff4.diff(self.get_source_data_with_cache(), open(self.patched_path, "rb").read()),
compress_type=zipfile.ZIP_STORED) # bsdiff4 is a format with integrated compression
def read_contents(self, opened_zipfile: zipfile.ZipFile):
super(APDeltaPatch, self).read_contents(opened_zipfile)
self.delta = opened_zipfile.read("delta.bsdiff4")
def patch(self, target: str):
"""Base + Delta -> Patched"""
if not self.delta:
self.read()
result = bsdiff4.patch(self.get_source_data_with_cache(), self.delta)
with open(target, "wb") as f:
f.write(result)
# legacy patch handling follows:
GAME_ALTTP = "A Link to the Past"
GAME_SM = "Super Metroid"
GAME_SOE = "Secret of Evermore"
GAME_SMZ3 = "SMZ3"
GAME_DKC3 = "Donkey Kong Country 3"
GAME_SMW = "Super Mario World"
supported_games = {"A Link to the Past", "Super Metroid", "Secret of Evermore", "SMZ3", "Donkey Kong Country 3"}
preferred_endings = {
GAME_ALTTP: "apbp",
GAME_SM: "apm3",
GAME_SOE: "apsoe",
GAME_SMZ3: "apsmz",
GAME_DKC3: "apdkc3"
}
def generate_yaml(patch: bytes, metadata: Optional[dict] = None, game: str = GAME_ALTTP) -> bytes:
if game == GAME_ALTTP:
from worlds.alttp.Rom import LTTPJPN10HASH as HASH
elif game == GAME_SM:
from worlds.sm.Rom import SMJUHASH as HASH
elif game == GAME_SOE:
from worlds.soe.Patch import USHASH as HASH
elif game == GAME_SMZ3:
from worlds.alttp.Rom import LTTPJPN10HASH as ALTTPHASH
from worlds.sm.Rom import SMJUHASH as SMHASH
HASH = ALTTPHASH + SMHASH
elif game == GAME_DKC3:
from worlds.dkc3.Rom import USHASH as HASH
else:
raise RuntimeError(f"Selected game {game} for base rom not found.")
patch = yaml.dump({"meta": metadata,
"patch": patch,
"game": game,
# minimum version of patch system expected for patching to be successful
"compatible_version": 3,
"version": current_patch_version,
"base_checksum": HASH})
return patch.encode(encoding="utf-8-sig")
class RomMeta(TypedDict):
server: str
player: Optional[int]
player_name: str
def generate_patch(rom: bytes, metadata: Optional[Dict[str, Any]] = None, game: str = GAME_ALTTP) -> bytes:
if metadata is None:
metadata = {}
patch = bsdiff4.diff(get_base_rom_data(game), rom)
return generate_yaml(patch, metadata, game)
def create_patch_file(rom_file_to_patch: str,
server: str = "",
destination: Optional[str] = None,
player: int = 0,
player_name: str = "",
game: str = GAME_ALTTP) -> str:
meta = {"server": server, # allow immediate connection to server in multiworld. Empty string otherwise
"player_id": player,
"player_name": player_name}
bytes = generate_patch(load_bytes(rom_file_to_patch),
meta,
game)
target = destination if destination else os.path.splitext(rom_file_to_patch)[0] + (
".apbp" if game == GAME_ALTTP
else ".apsmz" if game == GAME_SMZ3
else ".apdkc3" if game == GAME_DKC3
else ".apm3")
write_lzma(bytes, target)
return target
def create_rom_bytes(patch_file: str, ignore_version: bool = False) -> Tuple[Dict[str, Any], str, bytearray]:
data = Utils.parse_yaml(lzma.decompress(load_bytes(patch_file)).decode("utf-8-sig"))
game_name = data["game"]
if not ignore_version and data["compatible_version"] > current_patch_version:
raise RuntimeError("Patch file is incompatible with this patcher, likely an update is required.")
patched_data: bytearray = bsdiff4.patch(get_base_rom_data(game_name), data["patch"])
rom_hash = patched_data[int(0x7FC0):int(0x7FD5)]
data["meta"]["hash"] = "".join(chr(x) for x in rom_hash)
target = os.path.splitext(patch_file)[0] + ".sfc"
return data["meta"], target, patched_data
def get_base_rom_data(game: str) -> bytes:
if game == GAME_ALTTP:
from worlds.alttp.Rom import get_base_rom_bytes
elif game == "alttp": # old version for A Link to the Past
from worlds.alttp.Rom import get_base_rom_bytes
elif game == GAME_SM:
from worlds.sm.Rom import get_base_rom_bytes
elif game == GAME_SOE:
from worlds.soe.Patch import get_base_rom_path
get_base_rom_bytes = lambda: bytes(read_rom(open(get_base_rom_path(), "rb")))
elif game == GAME_SMZ3:
from worlds.smz3.Rom import get_base_rom_bytes
elif game == GAME_DKC3:
from worlds.dkc3.Rom import get_base_rom_bytes
else:
raise RuntimeError("Selected game for base rom not found.")
return get_base_rom_bytes()
def create_rom_file(patch_file: str) -> Tuple[Dict[str, Any], str]:
def create_rom_file(patch_file: str) -> Tuple[RomMeta, str]:
auto_handler = AutoPatchRegister.get_handler(patch_file)
if auto_handler:
handler: APDeltaPatch = auto_handler(patch_file)
@ -277,157 +36,10 @@ def create_rom_file(patch_file: str) -> Tuple[Dict[str, Any], str]:
return {"server": handler.server,
"player": handler.player,
"player_name": handler.player_name}, target
else:
data, target, patched_data = create_rom_bytes(patch_file)
with open(target, "wb") as f:
f.write(patched_data)
return data, target
def update_patch_data(patch_data: bytes, server: str = "") -> bytes:
data = Utils.parse_yaml(lzma.decompress(patch_data).decode("utf-8-sig"))
data["meta"]["server"] = server
bytes = generate_yaml(data["patch"], data["meta"], data["game"])
return lzma.compress(bytes)
def load_bytes(path: str) -> bytes:
with open(path, "rb") as f:
return f.read()
def write_lzma(data: bytes, path: str):
with lzma.LZMAFile(path, 'wb') as f:
f.write(data)
def read_rom(stream: BinaryIO, strip_header: bool = True) -> bytearray:
"""Reads rom into bytearray and optionally strips off any smc header"""
buffer = bytearray(stream.read())
if strip_header and len(buffer) % 0x400 == 0x200:
return buffer[0x200:]
return buffer
raise NotImplementedError(f"No Handler for {patch_file} found.")
if __name__ == "__main__":
host = Utils.get_public_ipv4()
options = Utils.get_options()['server_options']
if options['host']:
host = options['host']
address = f"{host}:{options['port']}"
ziplock = threading.Lock()
print(f"Host for patches to be created is {address}")
with concurrent.futures.ThreadPoolExecutor() as pool:
for rom in sys.argv:
try:
if rom.endswith(".sfc"):
print(f"Creating patch for {rom}")
result = pool.submit(create_patch_file, rom, address)
result.add_done_callback(lambda task: print(f"Created patch {task.result()}"))
elif rom.endswith(".apbp"):
print(f"Applying patch {rom}")
data, target = create_rom_file(rom)
# romfile, adjusted = Utils.get_adjuster_settings(target)
adjuster_settings = Utils.get_adjuster_settings(GAME_ALTTP)
adjusted = False
if adjuster_settings:
import pprint
from worlds.alttp.Rom import get_base_rom_path
adjuster_settings.rom = target
adjuster_settings.baserom = get_base_rom_path()
adjuster_settings.world = None
whitelist = {"music", "menuspeed", "heartbeep", "heartcolor", "ow_palettes", "quickswap",
"uw_palettes", "sprite", "sword_palettes", "shield_palettes", "hud_palettes",
"reduceflashing", "deathlink"}
printed_options = {name: value for name, value in vars(adjuster_settings).items() if name in whitelist}
if hasattr(adjuster_settings, "sprite_pool"):
sprite_pool = {}
for sprite in getattr(adjuster_settings, "sprite_pool"):
if sprite in sprite_pool:
sprite_pool[sprite] += 1
else:
sprite_pool[sprite] = 1
if sprite_pool:
printed_options["sprite_pool"] = sprite_pool
adjust_wanted = str('no')
if not hasattr(adjuster_settings, 'auto_apply') or 'ask' in adjuster_settings.auto_apply:
adjust_wanted = input(f"Last used adjuster settings were found. Would you like to apply these? \n"
f"{pprint.pformat(printed_options)}\n"
f"Enter yes, no, always or never: ")
if adjuster_settings.auto_apply == 'never': # never adjust, per user request
adjust_wanted = 'no'
elif adjuster_settings.auto_apply == 'always':
adjust_wanted = 'yes'
if adjust_wanted and "never" in adjust_wanted:
adjuster_settings.auto_apply = 'never'
Utils.persistent_store("adjuster", GAME_ALTTP, adjuster_settings)
elif adjust_wanted and "always" in adjust_wanted:
adjuster_settings.auto_apply = 'always'
Utils.persistent_store("adjuster", GAME_ALTTP, adjuster_settings)
if adjust_wanted and adjust_wanted.startswith("y"):
if hasattr(adjuster_settings, "sprite_pool"):
from LttPAdjuster import AdjusterWorld
adjuster_settings.world = AdjusterWorld(getattr(adjuster_settings, "sprite_pool"))
adjusted = True
import LttPAdjuster
_, romfile = LttPAdjuster.adjust(adjuster_settings)
if hasattr(adjuster_settings, "world"):
delattr(adjuster_settings, "world")
else:
adjusted = False
if adjusted:
try:
shutil.move(romfile, target)
romfile = target
except Exception as e:
print(e)
print(f"Created rom {romfile if adjusted else target}.")
if 'server' in data:
Utils.persistent_store("servers", data['hash'], data['server'])
print(f"Host is {data['server']}")
elif rom.endswith(".apm3") \
or rom.endswith(".apsmz") \
or rom.endswith(".apdkc3"):
print(f"Applying patch {rom}")
data, target = create_rom_file(rom)
print(f"Created rom {target}.")
if 'server' in data:
Utils.persistent_store("servers", data['hash'], data['server'])
print(f"Host is {data['server']}")
elif rom.endswith(".zip"):
print(f"Updating host in patch files contained in {rom}")
def _handle_zip_file_entry(zfinfo: zipfile.ZipInfo, server: str) -> str:
data = zfr.read(zfinfo)
if zfinfo.filename.endswith(".apbp") or \
zfinfo.filename.endswith(".apm3") or \
zfinfo.filename.endswith(".apdkc3"):
data = update_patch_data(data, server)
with ziplock:
zfw.writestr(zfinfo, data)
return zfinfo.filename
futures: List[concurrent.futures.Future[str]] = []
with zipfile.ZipFile(rom, "r") as zfr:
updated_zip = os.path.splitext(rom)[0] + "_updated.zip"
with zipfile.ZipFile(updated_zip, "w", compression=zipfile.ZIP_DEFLATED,
compresslevel=9) as zfw:
for zfname in zfr.namelist():
futures.append(pool.submit(_handle_zip_file_entry, zfr.getinfo(zfname), address))
for future in futures:
print(f"File {future.result()} added to {os.path.split(updated_zip)[1]}")
except:
import traceback
traceback.print_exc()
input("Press enter to close.")
for file in sys.argv[1:]:
meta_data, result_file = create_rom_file(file)
print(f"Patch with meta-data {meta_data} was written to {result_file}")

View File

@ -15,10 +15,13 @@ import typing
from json import loads, dumps
from Utils import init_logging, messagebox
# CommonClient import first to trigger ModuleUpdater
from CommonClient import CommonContext, server_loop, ClientCommandProcessor, gui_enabled, get_base_parser
import Utils
if __name__ == "__main__":
init_logging("SNIClient", exception_logger="Client")
Utils.init_logging("SNIClient", exception_logger="Client")
import colorama
import websockets
@ -28,10 +31,9 @@ from worlds.alttp import Regions, Shops
from worlds.alttp.Rom import ROM_PLAYER_LIMIT
from worlds.sm.Rom import ROM_PLAYER_LIMIT as SM_ROM_PLAYER_LIMIT
from worlds.smz3.Rom import ROM_PLAYER_LIMIT as SMZ3_ROM_PLAYER_LIMIT
import Utils
from CommonClient import CommonContext, server_loop, ClientCommandProcessor, gui_enabled, get_base_parser
from Patch import GAME_ALTTP, GAME_SM, GAME_SMZ3, GAME_DKC3, GAME_SMW
snes_logger = logging.getLogger("SNES")
from MultiServer import mark_raw
@ -1336,20 +1338,18 @@ async def main():
try:
meta, romfile = Patch.create_rom_file(args.diff_file)
except Exception as e:
messagebox('Error', str(e), True)
Utils.messagebox('Error', str(e), True)
raise
if "server" in meta:
args.connect = meta["server"]
args.connect = meta["server"]
logging.info(f"Wrote rom file to {romfile}")
if args.diff_file.endswith(".apsoe"):
import webbrowser
webbrowser.open("http://www.evermizer.com/apclient/" +
(f"#server={meta['server']}" if "server" in meta else ""))
webbrowser.open(f"http://www.evermizer.com/apclient/#server={meta['server']}")
logging.info("Starting Evermizer Client in your Browser...")
import time
time.sleep(3)
sys.exit()
elif args.diff_file.endswith((".apbp", ".apz3", ".aplttp")):
elif args.diff_file.endswith(".aplttp"):
adjustedromfile, adjusted = get_alttp_settings(romfile)
asyncio.create_task(run_game(adjustedromfile if adjusted else romfile))
else:

View File

@ -11,6 +11,8 @@ import io
import collections
import importlib
import logging
from typing import BinaryIO
from yaml import load, load_all, dump, SafeLoader
try:
@ -632,3 +634,11 @@ def title_sorted(data: typing.Sequence, key=None, ignore: typing.Set = frozenset
else:
return element.lower()
return sorted(data, key=lambda i: sorter(key(i)) if key else sorter(i))
def read_snes_rom(stream: BinaryIO, strip_header: bool = True) -> bytearray:
"""Reads rom into bytearray and optionally strips off any smc header"""
buffer = bytearray(stream.read())
if strip_header and len(buffer) % 0x400 == 0x200:
return buffer[0x200:]
return buffer

View File

@ -10,7 +10,6 @@ from flask_compress import Compress
from werkzeug.routing import BaseConverter
from Utils import title_sorted
from .models import *
UPLOAD_FOLDER = os.path.relpath('uploads')
LOGS_FOLDER = os.path.relpath('logs')
@ -73,8 +72,10 @@ def register():
"""Import submodules, triggering their registering on flask routing.
Note: initializes worlds subsystem."""
# has automatic patch integration
import Patch
app.jinja_env.filters['supports_apdeltapatch'] = lambda game_name: game_name in Patch.AutoPatchRegister.patch_types
import worlds.AutoWorld
import worlds.Files
app.jinja_env.filters['supports_apdeltapatch'] = lambda game_name: \
game_name in worlds.Files.AutoPatchRegister.patch_types
from WebHostLib.customserver import run_server_process
# to trigger app routing picking up on it

View File

@ -7,7 +7,8 @@ from . import api_endpoints
from flask import request, session, url_for
from pony.orm import commit
from WebHostLib import app, Generation, STATE_QUEUED, Seed, STATE_ERROR
from WebHostLib import app
from WebHostLib.models import Generation, STATE_QUEUED, Seed, STATE_ERROR
from WebHostLib.check import get_yaml_data, roll_options
from WebHostLib.generate import get_meta

View File

@ -5,8 +5,9 @@ from io import BytesIO
from flask import send_file, Response, render_template
from pony.orm import select
from Patch import update_patch_data, preferred_endings, AutoPatchRegister
from WebHostLib import app, Slot, Room, Seed, cache
from worlds.Files import AutoPatchRegister
from . import app, cache
from .models import Slot, Room, Seed
@app.route("/dl_patch/<suuid:room_id>/<int:patch_id>")
@ -41,12 +42,7 @@ def download_patch(room_id, patch_id):
new_file.seek(0)
return send_file(new_file, as_attachment=True, download_name=fname)
else:
patch_data = update_patch_data(patch.data, server=f"{app.config['PATCH_TARGET']}:{last_port}")
patch_data = BytesIO(patch_data)
fname = f"P{patch.player_id}_{patch.player_name}_{app.jinja_env.filters['suuid'](room_id)}." \
f"{preferred_endings[patch.game]}"
return send_file(patch_data, as_attachment=True, download_name=fname)
return "Old Patch file, no longer compatible."
@app.route("/dl_spoiler/<suuid:seed_id>")

View File

@ -8,7 +8,8 @@ import datetime
from uuid import UUID
from worlds.alttp import Items
from WebHostLib import app, cache, Room
from . import app, cache
from .models import Room
from Utils import restricted_loads
from worlds import lookup_any_item_id_to_name, lookup_any_location_id_to_name
from MultiServer import Context

View File

@ -1,6 +1,5 @@
import typing
import zipfile
import lzma
import json
import base64
import MultiServer
@ -10,9 +9,10 @@ from io import BytesIO
from flask import request, flash, redirect, url_for, session, render_template
from pony.orm import flush, select
from WebHostLib import app, Seed, Room, Slot
from Utils import parse_yaml, VersionException, __version__
from Patch import preferred_endings, AutoPatchRegister
from . import app
from .models import Seed, Room, Slot
from Utils import VersionException, __version__
from worlds.Files import AutoPatchRegister
from NetUtils import NetworkSlot, SlotType
banned_zip_contents = (".sfc",)
@ -38,17 +38,6 @@ def upload_zip_to_db(zfile: zipfile.ZipFile, owner=None, meta={"race": False}, s
player_name=patch.player_name,
player_id=patch.player,
game=patch.game))
elif file.filename.endswith(tuple(preferred_endings.values())):
data = zfile.open(file, "r").read()
yaml_data = parse_yaml(lzma.decompress(data).decode("utf-8-sig"))
if yaml_data["version"] < 2:
return "Old format cannot be uploaded (outdated .apbp)"
metadata = yaml_data["meta"]
slots.add(Slot(data=data,
player_name=metadata["player_name"],
player_id=metadata["player_id"],
game=yaml_data["game"]))
elif file.filename.endswith(".apmc"):
data = zfile.open(file, "r").read()

Binary file not shown.

BIN
data/basepatch.bsdiff4 Normal file

Binary file not shown.

View File

@ -221,7 +221,7 @@ Starting with version 4 of the APBP format, this is a ZIP file containing metada
files required by the game / patching process. For ROM-based games the ZIP will include a `delta.bsdiff4` which is the
bsdiff between the original and the randomized ROM.
To make using APBP easy, they can be generated by inheriting from `Patch.APDeltaPatch`.
To make using APBP easy, they can be generated by inheriting from `worlds.Files.APDeltaPatch`.
### Mod files
Games which support modding will usually just let you drag and drop the mods files into a folder somewhere.
@ -230,7 +230,7 @@ They can either be generic and modify the game using a seed or `slot_data` from
generated per seed.
If the mod is generated by AP and is installed from a ZIP file, it may be possible to include APBP metadata for easy
integration into the Webhost by inheriting from `Patch.APContainer`.
integration into the Webhost by inheriting from `worlds.Files.APContainer`.
## Archipelago Integration

156
worlds/Files.py Normal file
View File

@ -0,0 +1,156 @@
from __future__ import annotations
import json
import zipfile
from typing import ClassVar, Dict, Tuple, Any, Optional, Union, BinaryIO
import bsdiff4
class AutoPatchRegister(type):
patch_types: ClassVar[Dict[str, AutoPatchRegister]] = {}
file_endings: ClassVar[Dict[str, AutoPatchRegister]] = {}
def __new__(mcs, name: str, bases: Tuple[type, ...], dct: Dict[str, Any]) -> AutoPatchRegister:
# construct class
new_class = super().__new__(mcs, name, bases, dct)
if "game" in dct:
AutoPatchRegister.patch_types[dct["game"]] = new_class
if not dct["patch_file_ending"]:
raise Exception(f"Need an expected file ending for {name}")
AutoPatchRegister.file_endings[dct["patch_file_ending"]] = new_class
return new_class
@staticmethod
def get_handler(file: str) -> Optional[AutoPatchRegister]:
for file_ending, handler in AutoPatchRegister.file_endings.items():
if file.endswith(file_ending):
return handler
return None
current_patch_version: int = 5
class APContainer:
"""A zipfile containing at least archipelago.json"""
version: int = current_patch_version
compression_level: int = 9
compression_method: int = zipfile.ZIP_DEFLATED
game: Optional[str] = None
# instance attributes:
path: Optional[str]
player: Optional[int]
player_name: str
server: str
def __init__(self, path: Optional[str] = None, player: Optional[int] = None,
player_name: str = "", server: str = ""):
self.path = path
self.player = player
self.player_name = player_name
self.server = server
def write(self, file: Optional[Union[str, BinaryIO]] = None) -> None:
zip_file = file if file else self.path
if not zip_file:
raise FileNotFoundError(f"Cannot write {self.__class__.__name__} due to no path provided.")
with zipfile.ZipFile(zip_file, "w", self.compression_method, True, self.compression_level) \
as zf:
if file:
self.path = zf.filename
self.write_contents(zf)
def write_contents(self, opened_zipfile: zipfile.ZipFile) -> None:
manifest = self.get_manifest()
try:
manifest_str = json.dumps(manifest)
except Exception as e:
raise Exception(f"Manifest {manifest} did not convert to json.") from e
else:
opened_zipfile.writestr("archipelago.json", manifest_str)
def read(self, file: Optional[Union[str, BinaryIO]] = None) -> None:
"""Read data into patch object. file can be file-like, such as an outer zip file's stream."""
zip_file = file if file else self.path
if not zip_file:
raise FileNotFoundError(f"Cannot read {self.__class__.__name__} due to no path provided.")
with zipfile.ZipFile(zip_file, "r") as zf:
if file:
self.path = zf.filename
self.read_contents(zf)
def read_contents(self, opened_zipfile: zipfile.ZipFile) -> None:
with opened_zipfile.open("archipelago.json", "r") as f:
manifest = json.load(f)
if manifest["compatible_version"] > self.version:
raise Exception(f"File (version: {manifest['compatible_version']}) too new "
f"for this handler (version: {self.version})")
self.player = manifest["player"]
self.server = manifest["server"]
self.player_name = manifest["player_name"]
def get_manifest(self) -> Dict[str, Any]:
return {
"server": self.server, # allow immediate connection to server in multiworld. Empty string otherwise
"player": self.player,
"player_name": self.player_name,
"game": self.game,
# minimum version of patch system expected for patching to be successful
"compatible_version": 4,
"version": current_patch_version,
}
class APDeltaPatch(APContainer, metaclass=AutoPatchRegister):
"""An APContainer that additionally has delta.bsdiff4
containing a delta patch to get the desired file, often a rom."""
hash: Optional[str] # base checksum of source file
patch_file_ending: str = ""
delta: Optional[bytes] = None
result_file_ending: str = ".sfc"
source_data: bytes
def __init__(self, *args: Any, patched_path: str = "", **kwargs: Any) -> None:
self.patched_path = patched_path
super(APDeltaPatch, self).__init__(*args, **kwargs)
def get_manifest(self) -> Dict[str, Any]:
manifest = super(APDeltaPatch, self).get_manifest()
manifest["base_checksum"] = self.hash
manifest["result_file_ending"] = self.result_file_ending
manifest["patch_file_ending"] = self.patch_file_ending
return manifest
@classmethod
def get_source_data(cls) -> bytes:
"""Get Base data"""
raise NotImplementedError()
@classmethod
def get_source_data_with_cache(cls) -> bytes:
if not hasattr(cls, "source_data"):
cls.source_data = cls.get_source_data()
return cls.source_data
def write_contents(self, opened_zipfile: zipfile.ZipFile):
super(APDeltaPatch, self).write_contents(opened_zipfile)
# write Delta
opened_zipfile.writestr("delta.bsdiff4",
bsdiff4.diff(self.get_source_data_with_cache(), open(self.patched_path, "rb").read()),
compress_type=zipfile.ZIP_STORED) # bsdiff4 is a format with integrated compression
def read_contents(self, opened_zipfile: zipfile.ZipFile):
super(APDeltaPatch, self).read_contents(opened_zipfile)
self.delta = opened_zipfile.read("delta.bsdiff4")
def patch(self, target: str):
"""Base + Delta -> Patched"""
if not self.delta:
self.read()
result = bsdiff4.patch(self.get_source_data_with_cache(), self.delta)
with open(target, "wb") as f:
f.write(result)

View File

@ -1,11 +1,12 @@
from __future__ import annotations
import Utils
from Patch import read_rom
import worlds.AutoWorld
import worlds.Files
LTTPJPN10HASH = '03a63945398191337e896e5771f77173'
RANDOMIZERBASEHASH = '9952c2a3ec1b421e408df0d20c8f0c7f'
ROM_PLAYER_LIMIT = 255
LTTPJPN10HASH: str = "03a63945398191337e896e5771f77173"
RANDOMIZERBASEHASH: str = "9952c2a3ec1b421e408df0d20c8f0c7f"
ROM_PLAYER_LIMIT: int = 255
import io
import json
@ -34,7 +35,7 @@ from worlds.alttp.Text import KingsReturn_texts, Sanctuary_texts, Kakariko_texts
DeathMountain_texts, \
LostWoods_texts, WishingWell_texts, DesertPalace_texts, MountainTower_texts, LinksHouse_texts, Lumberjacks_texts, \
SickKid_texts, FluteBoy_texts, Zora_texts, MagicShop_texts, Sahasrahla_names
from Utils import local_path, user_path, int16_as_bytes, int32_as_bytes, snes_to_pc, is_frozen, parse_yaml
from Utils import local_path, user_path, int16_as_bytes, int32_as_bytes, snes_to_pc, is_frozen, parse_yaml, read_snes_rom
from worlds.alttp.Items import ItemFactory, item_table, item_name_groups, progression_items
from worlds.alttp.EntranceShuffle import door_addresses
from worlds.alttp.Options import smallkey_shuffle
@ -57,13 +58,13 @@ class LocalRom(object):
self.orig_buffer = None
with open(file, 'rb') as stream:
self.buffer = read_rom(stream)
self.buffer = read_snes_rom(stream)
if patch:
self.patch_base_rom()
self.orig_buffer = self.buffer.copy()
if vanillaRom:
with open(vanillaRom, 'rb') as vanillaStream:
self.orig_buffer = read_rom(vanillaStream)
self.orig_buffer = read_snes_rom(vanillaStream)
def read_byte(self, address: int) -> int:
return self.buffer[address]
@ -123,29 +124,24 @@ class LocalRom(object):
return expected == buffermd5.hexdigest()
def patch_base_rom(self):
if os.path.isfile(local_path('basepatch.sfc')):
with open(local_path('basepatch.sfc'), 'rb') as stream:
if os.path.isfile(user_path('basepatch.sfc')):
with open(user_path('basepatch.sfc'), 'rb') as stream:
buffer = bytearray(stream.read())
if self.verify(buffer):
self.buffer = buffer
if not os.path.exists(local_path('data', 'basepatch.apbp')):
Patch.create_patch_file(local_path('basepatch.sfc'))
return
if not os.path.isfile(local_path('data', 'basepatch.apbp')):
raise RuntimeError('Base patch unverified. Unable to continue.')
with open(local_path("data", "basepatch.bsdiff4"), "rb") as f:
delta = f.read()
if os.path.isfile(local_path('data', 'basepatch.apbp')):
_, target, buffer = Patch.create_rom_bytes(local_path('data', 'basepatch.apbp'), ignore_version=True)
if self.verify(buffer):
self.buffer = bytearray(buffer)
with open(user_path('basepatch.sfc'), 'wb') as stream:
stream.write(buffer)
return
raise RuntimeError('Base patch unverified. Unable to continue.')
raise RuntimeError('Could not find Base Patch. Unable to continue.')
buffer = bsdiff4.patch(get_base_rom_bytes(), delta)
if self.verify(buffer):
self.buffer = bytearray(buffer)
with open(user_path('basepatch.sfc'), 'wb') as stream:
stream.write(buffer)
return
raise RuntimeError('Base patch unverified. Unable to continue.')
def write_crc(self):
crc = (sum(self.buffer[:0x7FDC] + self.buffer[0x7FE0:]) + 0x01FE) & 0xFFFF
@ -544,7 +540,7 @@ class Sprite():
def get_vanilla_sprite_data(self):
file_name = get_base_rom_path()
base_rom_bytes = bytes(read_rom(open(file_name, "rb")))
base_rom_bytes = bytes(read_snes_rom(open(file_name, "rb")))
Sprite.sprite = base_rom_bytes[0x80000:0x87000]
Sprite.palette = base_rom_bytes[0xDD308:0xDD380]
Sprite.glove_palette = base_rom_bytes[0xDEDF5:0xDEDF9]
@ -2906,7 +2902,7 @@ hash_alphabet = [
]
class LttPDeltaPatch(Patch.APDeltaPatch):
class LttPDeltaPatch(worlds.Files.APDeltaPatch):
hash = LTTPJPN10HASH
game = "A Link to the Past"
patch_file_ending = ".aplttp"
@ -2920,7 +2916,7 @@ def get_base_rom_bytes(file_name: str = "") -> bytes:
base_rom_bytes = getattr(get_base_rom_bytes, "base_rom_bytes", None)
if not base_rom_bytes:
file_name = get_base_rom_path(file_name)
base_rom_bytes = bytes(read_rom(open(file_name, "rb")))
base_rom_bytes = bytes(read_snes_rom(open(file_name, "rb")))
basemd5 = hashlib.md5()
basemd5.update(base_rom_bytes)

View File

@ -1,5 +1,6 @@
import Utils
from Patch import read_rom, APDeltaPatch
from Utils import read_snes_rom
from worlds.Files import APDeltaPatch
from .Locations import lookup_id_to_name, all_locations
from .Levels import level_list, level_dict
@ -440,13 +441,13 @@ class LocalRom(object):
self.orig_buffer = None
with open(file, 'rb') as stream:
self.buffer = read_rom(stream)
self.buffer = read_snes_rom(stream)
#if patch:
# self.patch_rom()
# self.orig_buffer = self.buffer.copy()
#if vanillaRom:
# with open(vanillaRom, 'rb') as vanillaStream:
# self.orig_buffer = read_rom(vanillaStream)
# self.orig_buffer = read_snes_rom(vanillaStream)
def read_bit(self, address: int, bit_number: int) -> bool:
bitflag = (1 << bit_number)
@ -724,7 +725,7 @@ def get_base_rom_bytes(file_name: str = "") -> bytes:
base_rom_bytes = getattr(get_base_rom_bytes, "base_rom_bytes", None)
if not base_rom_bytes:
file_name = get_base_rom_path(file_name)
base_rom_bytes = bytes(read_rom(open(file_name, "rb")))
base_rom_bytes = bytes(read_snes_rom(open(file_name, "rb")))
basemd5 = hashlib.md5()
basemd5.update(base_rom_bytes)

View File

@ -11,6 +11,8 @@ import shutil
import Utils
import Patch
import worlds.AutoWorld
import worlds.Files
from . import Options
from .Technologies import tech_table, recipes, free_sample_exclusions, progressive_technology_table, \
@ -57,7 +59,7 @@ recipe_time_ranges = {
}
class FactorioModFile(Patch.APContainer):
class FactorioModFile(worlds.Files.APContainer):
game = "Factorio"
compression_method = zipfile.ZIP_DEFLATED # Factorio can't load LZMA archives

View File

@ -3,7 +3,8 @@ import os
import json
import Utils
from Patch import read_rom, APDeltaPatch
from Utils import read_snes_rom
from worlds.Files import APDeltaPatch
SMJUHASH = '21f3e98df4780ee1c667b84e57d88675'
ROM_PLAYER_LIMIT = 65535 # max archipelago player ID. note, SM ROM itself will only store 201 names+ids max
@ -22,7 +23,7 @@ def get_base_rom_bytes(file_name: str = "") -> bytes:
base_rom_bytes = getattr(get_base_rom_bytes, "base_rom_bytes", None)
if not base_rom_bytes:
file_name = get_base_rom_path(file_name)
base_rom_bytes = bytes(read_rom(open(file_name, "rb")))
base_rom_bytes = bytes(read_snes_rom(open(file_name, "rb")))
basemd5 = hashlib.md5()
basemd5.update(base_rom_bytes)

View File

@ -1,8 +1,7 @@
import Utils
from Patch import read_rom, APDeltaPatch
from worlds.Files import APDeltaPatch
from .Aesthetics import generate_shuffled_header_data
from .Locations import lookup_id_to_name, all_locations
from .Levels import level_info_dict, full_level_list, submap_level_list, location_id_to_level_id
from .Levels import level_info_dict
from .Names.TextBox import generate_goal_text, title_text_mapping, generate_text_box
USHASH = 'cdd3c8c37322978ca8669b34bc89c804'
@ -69,7 +68,7 @@ class LocalRom:
self.orig_buffer = None
with open(file, 'rb') as stream:
self.buffer = read_rom(stream)
self.buffer = Utils.read_snes_rom(stream)
def read_bit(self, address: int, bit_number: int) -> bool:
bitflag = (1 << bit_number)
@ -827,7 +826,7 @@ def get_base_rom_bytes(file_name: str = "") -> bytes:
base_rom_bytes = getattr(get_base_rom_bytes, "base_rom_bytes", None)
if not base_rom_bytes:
file_name = get_base_rom_path(file_name)
base_rom_bytes = bytes(read_rom(open(file_name, "rb")))
base_rom_bytes = bytes(Utils.read_snes_rom(open(file_name, "rb")))
basemd5 = hashlib.md5()
basemd5.update(base_rom_bytes)
@ -837,6 +836,7 @@ def get_base_rom_bytes(file_name: str = "") -> bytes:
get_base_rom_bytes.base_rom_bytes = base_rom_bytes
return base_rom_bytes
def get_base_rom_path(file_name: str = "") -> str:
options = Utils.get_options()
if not file_name:

View File

@ -14,7 +14,6 @@ from ..generic.Rules import add_rule
from .Names import ItemName, LocationName
from ..AutoWorld import WebWorld, World
from .Rom import LocalRom, patch_rom, get_base_rom_path, SMWDeltaPatch
import Patch
class SMWWeb(WebWorld):
@ -146,6 +145,7 @@ class SMWWorld(World):
def generate_output(self, output_directory: str):
rompath = "" # if variable is not declared finally clause may fail
try:
world = self.world
player = self.player
@ -167,9 +167,9 @@ class SMWWorld(World):
except:
raise
finally:
self.rom_name_available_event.set() # make sure threading continues and errors are collected
if os.path.exists(rompath):
os.unlink(rompath)
self.rom_name_available_event.set() # make sure threading continues and errors are collected
def modify_multidata(self, multidata: dict):
import base64

View File

@ -2,7 +2,8 @@ import hashlib
import os
import Utils
from Patch import read_rom, APDeltaPatch
from Utils import read_snes_rom
from worlds.Files import APDeltaPatch
SMJUHASH = '21f3e98df4780ee1c667b84e57d88675'
LTTPJPN10HASH = '03a63945398191337e896e5771f77173'
@ -23,7 +24,7 @@ def get_base_rom_bytes() -> bytes:
base_rom_bytes = getattr(get_base_rom_bytes, "base_rom_bytes", None)
if not base_rom_bytes:
sm_file_name = get_sm_base_rom_path()
sm_base_rom_bytes = bytes(read_rom(open(sm_file_name, "rb")))
sm_base_rom_bytes = bytes(read_snes_rom(open(sm_file_name, "rb")))
basemd5 = hashlib.md5()
basemd5.update(sm_base_rom_bytes)
@ -31,7 +32,7 @@ def get_base_rom_bytes() -> bytes:
raise Exception('Supplied Base Rom does not match known MD5 for SM Japan+US release. '
'Get the correct game and version, then dump it')
lttp_file_name = get_lttp_base_rom_path()
lttp_base_rom_bytes = bytes(read_rom(open(lttp_file_name, "rb")))
lttp_base_rom_bytes = bytes(read_snes_rom(open(lttp_file_name, "rb")))
basemd5 = hashlib.md5()
basemd5.update(lttp_base_rom_bytes)

View File

@ -2,7 +2,7 @@ import bsdiff4
import yaml
from typing import Optional
import Utils
from Patch import APDeltaPatch
from worlds.Files import APDeltaPatch
import os