From 0b941e22682c85f5fcae6493523a701b40b5cab6 Mon Sep 17 00:00:00 2001 From: Fabian Dill Date: Sat, 7 Jan 2023 10:22:15 +0100 Subject: [PATCH] LttP: attempt at preventing ghost location checks (#1355) --- worlds/alttp/Client.py | 231 +++++++++++++++++++++-------------------- 1 file changed, 119 insertions(+), 112 deletions(-) diff --git a/worlds/alttp/Client.py b/worlds/alttp/Client.py index b3a12a7f..3484355d 100644 --- a/worlds/alttp/Client.py +++ b/worlds/alttp/Client.py @@ -322,7 +322,7 @@ location_table_misc = {'Bottle Merchant': (0x3c9, 0x2), location_table_misc_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_misc.items()} -async def track_locations(ctx, roomid, roomdata): +async def track_locations(ctx, roomid, roomdata) -> bool: from SNIClient import snes_read, snes_buffered_write, snes_flush_writes new_locations = [] @@ -451,10 +451,126 @@ async def track_locations(ctx, roomid, roomdata): if misc_data_changed: snes_buffered_write(ctx, SAVEDATA_START + 0x3c6, bytes(misc_data)) - if new_locations: - await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": new_locations}]) + # verify rom is still the same: + rom_name = await snes_read(ctx, ROMNAME_START, ROMNAME_SIZE) + if rom_name is None or all(byte == b"\x00" for byte in rom_name) or rom_name[:2] != b"AP" or \ + rom_name != ctx.rom: + snes_logger.info(f"Discarding recent {len(new_locations)} checks as ROM Status has changed.") + return False + else: + await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": new_locations}]) await snes_flush_writes(ctx) + return True + + +class ALTTPSNIClient(SNIClient): + game = "A Link to the Past" + + async def deathlink_kill_player(self, ctx): + from SNIClient import DeathState, snes_read, snes_buffered_write, snes_flush_writes + invincible = await snes_read(ctx, WRAM_START + 0x037B, 1) + last_health = await snes_read(ctx, WRAM_START + 0xF36D, 1) + await asyncio.sleep(0.25) + health = await snes_read(ctx, WRAM_START + 0xF36D, 1) + if not invincible or not last_health or not health: + ctx.death_state = DeathState.dead + ctx.last_death_link = time.time() + return + if not invincible[0] and last_health[0] == health[0]: + snes_buffered_write(ctx, WRAM_START + 0xF36D, bytes([0])) # set current health to 0 + snes_buffered_write(ctx, WRAM_START + 0x0373, + bytes([8])) # deal 1 full heart of damage at next opportunity + + await snes_flush_writes(ctx) + await asyncio.sleep(1) + + gamemode = await snes_read(ctx, WRAM_START + 0x10, 1) + if not gamemode or gamemode[0] in DEATH_MODES: + ctx.death_state = DeathState.dead + + async def validate_rom(self, ctx) -> bool: + from SNIClient import snes_read + + rom_name = await snes_read(ctx, ROMNAME_START, ROMNAME_SIZE) + if rom_name is None or all(byte == b"\x00" for byte in rom_name) or rom_name[:2] != b"AP": + return False + + ctx.game = self.game + ctx.items_handling = 0b001 # full local + + ctx.rom = rom_name + + death_link = await snes_read(ctx, DEATH_LINK_ACTIVE_ADDR, 1) + + if death_link: + ctx.allow_collect = bool(death_link[0] & 0b100) + ctx.death_link_allow_survive = bool(death_link[0] & 0b10) + await ctx.update_death_link(bool(death_link[0] & 0b1)) + + return True + + async def game_watcher(self, ctx): + from SNIClient import snes_read, snes_buffered_write, snes_flush_writes + gamemode = await snes_read(ctx, WRAM_START + 0x10, 1) + if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time(): + currently_dead = gamemode[0] in DEATH_MODES + await ctx.handle_deathlink_state(currently_dead) + + gameend = await snes_read(ctx, SAVEDATA_START + 0x443, 1) + game_timer = await snes_read(ctx, SAVEDATA_START + 0x42E, 4) + if gamemode is None or gameend is None or game_timer is None or \ + (gamemode[0] not in INGAME_MODES and gamemode[0] not in ENDGAME_MODES): + return + + if gameend[0]: + if not ctx.finished_game: + await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) + ctx.finished_game = True + + if gamemode in ENDGAME_MODES: # triforce room and credits + return + + data = await snes_read(ctx, RECV_PROGRESS_ADDR, 8) + if data is None: + return + + recv_index = data[0] | (data[1] << 8) + recv_item = data[2] + roomid = data[4] | (data[5] << 8) + roomdata = data[6] + scout_location = data[7] + + if recv_index < len(ctx.items_received) and recv_item == 0: + item = ctx.items_received[recv_index] + recv_index += 1 + logging.info('Received %s from %s (%s) (%d/%d in list)' % ( + color(ctx.item_names[item.item], 'red', 'bold'), + color(ctx.player_names[item.player], 'yellow'), + ctx.location_names[item.location], recv_index, len(ctx.items_received))) + + snes_buffered_write(ctx, RECV_PROGRESS_ADDR, + bytes([recv_index & 0xFF, (recv_index >> 8) & 0xFF])) + snes_buffered_write(ctx, RECV_ITEM_ADDR, + bytes([item.item])) + snes_buffered_write(ctx, RECV_ITEM_PLAYER_ADDR, + bytes([min(ROM_PLAYER_LIMIT, item.player) if item.player != ctx.slot else 0])) + if scout_location > 0 and scout_location in ctx.locations_info: + snes_buffered_write(ctx, SCOUTREPLY_LOCATION_ADDR, + bytes([scout_location])) + snes_buffered_write(ctx, SCOUTREPLY_ITEM_ADDR, + bytes([ctx.locations_info[scout_location].item])) + snes_buffered_write(ctx, SCOUTREPLY_PLAYER_ADDR, + bytes([min(ROM_PLAYER_LIMIT, ctx.locations_info[scout_location].player)])) + + await snes_flush_writes(ctx) + + if scout_location > 0 and scout_location not in ctx.locations_scouted: + ctx.locations_scouted.add(scout_location) + await ctx.send_msgs([{"cmd": "LocationScouts", "locations": [scout_location]}]) + same_rom = await track_locations(ctx, roomid, roomdata) + if not same_rom: + return def get_alttp_settings(romfile: str): @@ -582,112 +698,3 @@ def get_alttp_settings(romfile: str): else: adjusted = False return adjustedromfile, adjusted - - -class ALTTPSNIClient(SNIClient): - game = "A Link to the Past" - - async def deathlink_kill_player(self, ctx): - from SNIClient import DeathState, snes_read, snes_buffered_write, snes_flush_writes - invincible = await snes_read(ctx, WRAM_START + 0x037B, 1) - last_health = await snes_read(ctx, WRAM_START + 0xF36D, 1) - await asyncio.sleep(0.25) - health = await snes_read(ctx, WRAM_START + 0xF36D, 1) - if not invincible or not last_health or not health: - ctx.death_state = DeathState.dead - ctx.last_death_link = time.time() - return - if not invincible[0] and last_health[0] == health[0]: - snes_buffered_write(ctx, WRAM_START + 0xF36D, bytes([0])) # set current health to 0 - snes_buffered_write(ctx, WRAM_START + 0x0373, - bytes([8])) # deal 1 full heart of damage at next opportunity - - await snes_flush_writes(ctx) - await asyncio.sleep(1) - - gamemode = await snes_read(ctx, WRAM_START + 0x10, 1) - if not gamemode or gamemode[0] in DEATH_MODES: - ctx.death_state = DeathState.dead - - - async def validate_rom(self, ctx): - from SNIClient import snes_read, snes_buffered_write, snes_flush_writes - - rom_name = await snes_read(ctx, ROMNAME_START, ROMNAME_SIZE) - if rom_name is None or rom_name == bytes([0] * ROMNAME_SIZE) or rom_name[:2] != b"AP": - return False - - ctx.game = self.game - ctx.items_handling = 0b001 # full local - - ctx.rom = rom_name - - death_link = await snes_read(ctx, DEATH_LINK_ACTIVE_ADDR, 1) - - if death_link: - ctx.allow_collect = bool(death_link[0] & 0b100) - ctx.death_link_allow_survive = bool(death_link[0] & 0b10) - await ctx.update_death_link(bool(death_link[0] & 0b1)) - - return True - - - async def game_watcher(self, ctx): - from SNIClient import snes_read, snes_buffered_write, snes_flush_writes - gamemode = await snes_read(ctx, WRAM_START + 0x10, 1) - if "DeathLink" in ctx.tags and gamemode and ctx.last_death_link + 1 < time.time(): - currently_dead = gamemode[0] in DEATH_MODES - await ctx.handle_deathlink_state(currently_dead) - - gameend = await snes_read(ctx, SAVEDATA_START + 0x443, 1) - game_timer = await snes_read(ctx, SAVEDATA_START + 0x42E, 4) - if gamemode is None or gameend is None or game_timer is None or \ - (gamemode[0] not in INGAME_MODES and gamemode[0] not in ENDGAME_MODES): - return - - if gameend[0]: - if not ctx.finished_game: - await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) - ctx.finished_game = True - - if gamemode in ENDGAME_MODES: # triforce room and credits - return - - data = await snes_read(ctx, RECV_PROGRESS_ADDR, 8) - if data is None: - return - - recv_index = data[0] | (data[1] << 8) - recv_item = data[2] - roomid = data[4] | (data[5] << 8) - roomdata = data[6] - scout_location = data[7] - - if recv_index < len(ctx.items_received) and recv_item == 0: - item = ctx.items_received[recv_index] - recv_index += 1 - logging.info('Received %s from %s (%s) (%d/%d in list)' % ( - color(ctx.item_names[item.item], 'red', 'bold'), - color(ctx.player_names[item.player], 'yellow'), - ctx.location_names[item.location], recv_index, len(ctx.items_received))) - - snes_buffered_write(ctx, RECV_PROGRESS_ADDR, - bytes([recv_index & 0xFF, (recv_index >> 8) & 0xFF])) - snes_buffered_write(ctx, RECV_ITEM_ADDR, - bytes([item.item])) - snes_buffered_write(ctx, RECV_ITEM_PLAYER_ADDR, - bytes([min(ROM_PLAYER_LIMIT, item.player) if item.player != ctx.slot else 0])) - if scout_location > 0 and scout_location in ctx.locations_info: - snes_buffered_write(ctx, SCOUTREPLY_LOCATION_ADDR, - bytes([scout_location])) - snes_buffered_write(ctx, SCOUTREPLY_ITEM_ADDR, - bytes([ctx.locations_info[scout_location].item])) - snes_buffered_write(ctx, SCOUTREPLY_PLAYER_ADDR, - bytes([min(ROM_PLAYER_LIMIT, ctx.locations_info[scout_location].player)])) - - await snes_flush_writes(ctx) - - if scout_location > 0 and scout_location not in ctx.locations_scouted: - ctx.locations_scouted.add(scout_location) - await ctx.send_msgs([{"cmd": "LocationScouts", "locations": [scout_location]}]) - await track_locations(ctx, roomid, roomdata)