Post-Merge Cleanup

This commit is contained in:
Fabian Dill 2021-11-12 14:36:34 +01:00
parent 77ec8d4141
commit 4a8ba0575f
6 changed files with 51 additions and 51 deletions

View File

@ -10,14 +10,14 @@ from typing import Tuple, Optional
import Utils import Utils
current_patch_version = 3 current_patch_version = 3
GAME_ALTTP = 0 GAME_ALTTP = "A Link to the Past"
GAME_SM = 1 GAME_SM = "Super Metroid"
supported_games = ["A Link to the Past", "Super Metroid"] supported_games = {"A Link to the Past", "Super Metroid"}
def generate_yaml(patch: bytes, metadata: Optional[dict] = None, game: int = GAME_ALTTP) -> bytes:
def generate_yaml(patch: bytes, metadata: Optional[dict] = None, game: str = GAME_ALTTP) -> bytes:
if game == GAME_ALTTP: if game == GAME_ALTTP:
from worlds.alttp.Rom import JAP10HASH from worlds.alttp.Rom import JAP10HASH
elif game == GAME_SM: elif game == GAME_SM:
@ -27,7 +27,7 @@ def generate_yaml(patch: bytes, metadata: Optional[dict] = None, game: int = GAM
patch = yaml.dump({"meta": metadata, patch = yaml.dump({"meta": metadata,
"patch": patch, "patch": patch,
"game": supported_games[game], "game": game,
# minimum version of patch system expected for patching to be successful # minimum version of patch system expected for patching to be successful
"compatible_version": 3, "compatible_version": 3,
"version": current_patch_version, "version": current_patch_version,
@ -35,7 +35,7 @@ def generate_yaml(patch: bytes, metadata: Optional[dict] = None, game: int = GAM
return patch.encode(encoding="utf-8-sig") return patch.encode(encoding="utf-8-sig")
def generate_patch(rom: bytes, metadata: Optional[dict] = None, game: int = GAME_ALTTP) -> bytes: def generate_patch(rom: bytes, metadata: Optional[dict] = None, game: str = GAME_ALTTP) -> bytes:
if game == GAME_ALTTP: if game == GAME_ALTTP:
from worlds.alttp.Rom import get_base_rom_bytes from worlds.alttp.Rom import get_base_rom_bytes
elif game == GAME_SM: elif game == GAME_SM:
@ -50,14 +50,15 @@ def generate_patch(rom: bytes, metadata: Optional[dict] = None, game: int = GAME
def create_patch_file(rom_file_to_patch: str, server: str = "", destination: str = None, def create_patch_file(rom_file_to_patch: str, server: str = "", destination: str = None,
player: int = 0, player_name: str = "", game: int = GAME_ALTTP) -> str: player: int = 0, player_name: str = "", game: str = GAME_ALTTP) -> str:
meta = {"server": server, # allow immediate connection to server in multiworld. Empty string otherwise meta = {"server": server, # allow immediate connection to server in multiworld. Empty string otherwise
"player_id": player, "player_id": player,
"player_name": player_name} "player_name": player_name}
bytes = generate_patch(load_bytes(rom_file_to_patch), bytes = generate_patch(load_bytes(rom_file_to_patch),
meta, meta,
game) game)
target = destination if destination else os.path.splitext(rom_file_to_patch)[0] + (".apbp" if game == GAME_ALTTP else ".apm3") target = destination if destination else os.path.splitext(rom_file_to_patch)[0] + (
".apbp" if game == GAME_ALTTP else ".apm3")
write_lzma(bytes, target) write_lzma(bytes, target)
return target return target
@ -66,13 +67,16 @@ def create_rom_bytes(patch_file: str, ignore_version: bool = False) -> Tuple[dic
data = Utils.parse_yaml(lzma.decompress(load_bytes(patch_file)).decode("utf-8-sig")) data = Utils.parse_yaml(lzma.decompress(load_bytes(patch_file)).decode("utf-8-sig"))
game_name = data["game"] game_name = data["game"]
if game_name in supported_games: if game_name in supported_games:
game_index = supported_games.index(game_name) if game_name == GAME_ALTTP:
if game_index == GAME_ALTTP:
from worlds.alttp.Rom import get_base_rom_bytes from worlds.alttp.Rom import get_base_rom_bytes
elif game_index == GAME_SM: elif game_name == GAME_SM:
from worlds.sm.Rom import get_base_rom_bytes from worlds.sm.Rom import get_base_rom_bytes
else: else:
raise Exception(f"No Patch handler for game {game_name}")
elif game_name == "alttp": # old version for A Link to the Past
from worlds.alttp.Rom import get_base_rom_bytes from worlds.alttp.Rom import get_base_rom_bytes
else:
raise Exception(f"Cannot handle game {game_name}")
if not ignore_version and data["compatible_version"] > current_patch_version: if not ignore_version and data["compatible_version"] > current_patch_version:
raise RuntimeError("Patch file is incompatible with this patcher, likely an update is required.") raise RuntimeError("Patch file is incompatible with this patcher, likely an update is required.")
@ -107,6 +111,14 @@ def write_lzma(data: bytes, path: str):
f.write(data) f.write(data)
def read_rom(stream, strip_header=True) -> bytearray:
"""Reads rom into bytearray and optionally strips off any smc header"""
buffer = bytearray(stream.read())
if strip_header and len(buffer) % 0x400 == 0x200:
return buffer[0x200:]
return buffer
if __name__ == "__main__": if __name__ == "__main__":
host = Utils.get_public_ipv4() host = Utils.get_public_ipv4()
options = Utils.get_options()['server_options'] options = Utils.get_options()['server_options']
@ -191,12 +203,4 @@ if __name__ == "__main__":
import traceback import traceback
traceback.print_exc() traceback.print_exc()
input("Press enter to close.") input("Press enter to close.")
def read_rom(stream, strip_header=True) -> bytearray:
"""Reads rom into bytearray and optionally strips off any smc header"""
buffer = bytearray(stream.read())
if strip_header and len(buffer) % 0x400 == 0x200:
return buffer[0x200:]
return buffer

View File

@ -122,7 +122,6 @@ class Context(CommonContext):
self.awaiting_rom = False self.awaiting_rom = False
self.rom = None self.rom = None
self.prev_rom = None self.prev_rom = None
self.gameID = None
async def connection_closed(self): async def connection_closed(self):
await super(Context, self).connection_closed() await super(Context, self).connection_closed()
@ -148,7 +147,8 @@ class Context(CommonContext):
await self.send_msgs([{"cmd": 'Connect', await self.send_msgs([{"cmd": 'Connect',
'password': self.password, 'name': auth, 'version': Utils.version_tuple, 'password': self.password, 'name': auth, 'version': Utils.version_tuple,
'tags': self.tags, 'tags': self.tags,
'uuid': Utils.get_unique_identifier(), 'game': "Super Metroid" if self.rom[:2] == b"SM" else "A Link to the Past" 'uuid': Utils.get_unique_identifier(),
'game': self.game
}]) }])
def on_deathlink(self, data: dict): def on_deathlink(self, data: dict):
@ -161,25 +161,23 @@ async def deathlink_kill_player(ctx: Context):
ctx.death_state = DeathState.killing_player ctx.death_state = DeathState.killing_player
while ctx.death_state == DeathState.killing_player and \ while ctx.death_state == DeathState.killing_player and \
ctx.snes_state == SNESState.SNES_ATTACHED: ctx.snes_state == SNESState.SNES_ATTACHED:
if ctx.gameID == GAME_ALTTP: if ctx.game == GAME_ALTTP:
snes_buffered_write(ctx, WRAM_START + 0xF36D, bytes([0])) # set current health to 0 snes_buffered_write(ctx, WRAM_START + 0xF36D, bytes([0])) # set current health to 0
snes_buffered_write(ctx, WRAM_START + 0x0373, bytes([8])) # deal 1 full heart of damage at next opportunity snes_buffered_write(ctx, WRAM_START + 0x0373, bytes([8])) # deal 1 full heart of damage at next opportunity
elif ctx.gameID == GAME_SM: elif ctx.game == GAME_SM:
snes_buffered_write(ctx, WRAM_START + 0x09C2, bytes([0, 0])) # set current health to 0 snes_buffered_write(ctx, WRAM_START + 0x09C2, bytes([0, 0])) # set current health to 0
await snes_flush_writes(ctx) await snes_flush_writes(ctx)
await asyncio.sleep(1) await asyncio.sleep(1)
gamemode = None gamemode = None
if ctx.gameID == GAME_ALTTP: if ctx.game == GAME_ALTTP:
gamemode = await snes_read(ctx, WRAM_START + 0x10, 1) gamemode = await snes_read(ctx, WRAM_START + 0x10, 1)
elif ctx.gameID == GAME_SM: elif ctx.game == GAME_SM:
gamemode = await snes_read(ctx, WRAM_START + 0x0998, 1) gamemode = await snes_read(ctx, WRAM_START + 0x0998, 1)
if not gamemode or gamemode[0] in (DEATH_MODES if ctx.gameID == GAME_ALTTP else SM_DEATH_MODES): if not gamemode or gamemode[0] in (DEATH_MODES if ctx.game == GAME_ALTTP else SM_DEATH_MODES):
ctx.death_state = DeathState.dead ctx.death_state = DeathState.dead
ctx.last_death_link = time.time() ctx.last_death_link = time.time()
def color_item(item_id: int, green: bool = False) -> str: def color_item(item_id: int, green: bool = False) -> str:
item_name = get_item_name_from_id(item_id) item_name = get_item_name_from_id(item_id)
item_colors = ['green' if green else 'cyan'] item_colors = ['green' if green else 'cyan']
@ -886,16 +884,17 @@ async def game_watcher(ctx: Context):
if gameName is None: if gameName is None:
continue continue
elif gameName == b"SM": elif gameName == b"SM":
ctx.gameID = GAME_SM ctx.game = GAME_SM
else: else:
ctx.gameID = GAME_ALTTP ctx.game = GAME_ALTTP
rom = await snes_read(ctx, SM_ROMNAME_START if ctx.gameID == GAME_SM else ROMNAME_START, ROMNAME_SIZE) rom = await snes_read(ctx, SM_ROMNAME_START if ctx.game == GAME_SM else ROMNAME_START, ROMNAME_SIZE)
if rom is None or rom == bytes([0] * ROMNAME_SIZE): if rom is None or rom == bytes([0] * ROMNAME_SIZE):
continue continue
ctx.rom = rom ctx.rom = rom
death_link = await snes_read(ctx, DEATH_LINK_ACTIVE_ADDR if ctx.gameID == GAME_ALTTP else SM_DEATH_LINK_ACTIVE_ADDR, 1) death_link = await snes_read(ctx, DEATH_LINK_ACTIVE_ADDR if ctx.game == GAME_ALTTP else
SM_DEATH_LINK_ACTIVE_ADDR, 1)
if death_link: if death_link:
death_link = bool(death_link[0] & 0b1) death_link = bool(death_link[0] & 0b1)
old_tags = ctx.tags.copy() old_tags = ctx.tags.copy()
@ -917,7 +916,7 @@ async def game_watcher(ctx: Context):
snes_logger.warning("ROM change detected, please reconnect to the multiworld server") snes_logger.warning("ROM change detected, please reconnect to the multiworld server")
await ctx.disconnect() await ctx.disconnect()
if ctx.gameID == GAME_ALTTP: if ctx.game == GAME_ALTTP:
gamemode = await snes_read(ctx, WRAM_START + 0x10, 1) gamemode = await snes_read(ctx, WRAM_START + 0x10, 1)
if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time(): if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time():
currently_dead = gamemode[0] in DEATH_MODES currently_dead = gamemode[0] in DEATH_MODES
@ -998,7 +997,7 @@ async def game_watcher(ctx: Context):
ctx.locations_scouted.add(scout_location) ctx.locations_scouted.add(scout_location)
await ctx.send_msgs([{"cmd": "LocationScouts", "locations": [scout_location]}]) await ctx.send_msgs([{"cmd": "LocationScouts", "locations": [scout_location]}])
await track_locations(ctx, roomid, roomdata) await track_locations(ctx, roomid, roomdata)
elif ctx.gameID == GAME_SM: elif ctx.game == GAME_SM:
gamemode = await snes_read(ctx, WRAM_START + 0x0998, 1) gamemode = await snes_read(ctx, WRAM_START + 0x0998, 1)
if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time(): if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time():
currently_dead = gamemode[0] in SM_DEATH_MODES currently_dead = gamemode[0] in SM_DEATH_MODES

View File

@ -13,7 +13,7 @@ class Version(typing.NamedTuple):
build: int build: int
__version__ = "0.1.9" __version__ = "0.2.0"
version_tuple = tuplize_version(__version__) version_tuple = tuplize_version(__version__)
import builtins import builtins
@ -162,7 +162,7 @@ def get_default_options() -> dict:
"executable": "factorio\\bin\\x64\\factorio", "executable": "factorio\\bin\\x64\\factorio",
}, },
"sm_options": { "sm_options": {
"rom_file": "Super Metroid (JU)[!].sfc", "rom_file": "Super Metroid (JU).sfc",
"sni": "SNI", "sni": "SNI",
"rom_start": True, "rom_start": True,
}, },

View File

@ -93,7 +93,7 @@ lttp_options:
rom_start: true rom_start: true
sm_options: sm_options:
# File name of the v1.0 J rom # File name of the v1.0 J rom
rom_file: "Super Metroid (JU)[!].sfc" rom_file: "Super Metroid (JU).sfc"
# Set this to your SNI folder location if you want the MultiClient to attempt an auto start, does nothing if not found # Set this to your SNI folder location if you want the MultiClient to attempt an auto start, does nothing if not found
sni: "SNI" sni: "SNI"
# Set this to false to never autostart a rom (such as after patching) # Set this to false to never autostart a rom (such as after patching)

View File

@ -324,7 +324,7 @@ class ALTTPWorld(World):
del (multidata["connect_names"][self.world.player_name[self.player]]) del (multidata["connect_names"][self.world.player_name[self.player]])
def get_required_client_version(self) -> tuple: def get_required_client_version(self) -> tuple:
return max((0, 1, 4), super(ALTTPWorld, self).get_required_client_version()) return max((0, 2, 0), super(ALTTPWorld, self).get_required_client_version())
def create_item(self, name: str) -> Item: def create_item(self, name: str) -> Item:
return ALttPItem(name, self.player, **as_dict_item_table[name]) return ALttPItem(name, self.player, **as_dict_item_table[name])

View File

@ -28,13 +28,10 @@ from logic.logic import Logic
from randomizer import VariaRandomizer from randomizer import VariaRandomizer
from Utils import output_path
from shutil import copy2
class SMWorld(World): class SMWorld(World):
game: str = "Super Metroid" game: str = "Super Metroid"
topology_present = True topology_present = True
data_version = 0 data_version = 1
options = sm_options options = sm_options
item_names: Set[str] = frozenset(items_lookup_name_to_id) item_names: Set[str] = frozenset(items_lookup_name_to_id)
location_names: Set[str] = frozenset(locations_lookup_name_to_id) location_names: Set[str] = frozenset(locations_lookup_name_to_id)