from typing import TYPE_CHECKING, Optional, Set, List, Dict import struct from NetUtils import ClientStatus from .Locations import roomCount, nonBlock, beanstones, roomException, shop, badge, pants, eReward from .Items import items_by_id import asyncio import worlds._bizhawk as bizhawk from worlds._bizhawk.client import BizHawkClient if TYPE_CHECKING: from worlds._bizhawk.context import BizHawkClientContext ROOM_ARRAY_POINTER = 0x51FA00 class MLSSClient(BizHawkClient): game = "Mario & Luigi Superstar Saga" system = "GBA" patch_suffix = ".apmlss" local_checked_locations: Set[int] goal_flag: int rom_slot_name: Optional[str] eUsed: List[int] room: int local_events: List[int] player_name: Optional[str] checked_flags: Dict[int, list] = {} def __init__(self) -> None: super().__init__() self.local_checked_locations = set() self.local_set_events = {} self.local_found_key_items = {} self.rom_slot_name = None self.seed_verify = False self.eUsed = [] self.room = 0 self.local_events = [] async def validate_rom(self, ctx: "BizHawkClientContext") -> bool: from CommonClient import logger try: # Check ROM name/patch version rom_name_bytes = await bizhawk.read(ctx.bizhawk_ctx, [(0xA0, 14, "ROM")]) rom_name = bytes([byte for byte in rom_name_bytes[0] if byte != 0]).decode("UTF-8") if not rom_name.startswith("MARIO&LUIGIU"): return False if rom_name == "MARIO&LUIGIUA8": logger.info( "ERROR: You appear to be running an unpatched version of Mario & Luigi Superstar Saga. " "You need to generate a patch file and use it to create a patched ROM." ) return False if rom_name != "MARIO&LUIGIUAP": logger.info( "ERROR: The patch file used to create this ROM is not compatible with " "this client. Double check your client version against the version being " "used by the generator." ) return False except UnicodeDecodeError: return False except bizhawk.RequestFailedError: return False # Should verify on the next pass ctx.game = self.game ctx.items_handling = 0b101 ctx.want_slot_data = True ctx.watcher_timeout = 0.125 self.rom_slot_name = rom_name self.seed_verify = False name_bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(0xDF0000, 16, "ROM")]))[0] name = bytes([byte for byte in name_bytes if byte != 0]).decode("UTF-8") self.player_name = name for i in range(59): self.checked_flags[i] = [] return True async def set_auth(self, ctx: "BizHawkClientContext") -> None: ctx.auth = self.player_name def on_package(self, ctx, cmd, args) -> None: if cmd == "RoomInfo": ctx.seed_name = args["seed_name"] async def game_watcher(self, ctx: "BizHawkClientContext") -> None: from CommonClient import logger try: if ctx.seed_name is None: return if not self.seed_verify: seed = await bizhawk.read(ctx.bizhawk_ctx, [(0xDF00A0, len(ctx.seed_name), "ROM")]) seed = seed[0].decode("UTF-8") if seed != ctx.seed_name: logger.info( "ERROR: The ROM you loaded is for a different game of AP. " "Please make sure the host has sent you the correct patch file," "and that you have opened the correct ROM." ) raise bizhawk.ConnectorError("Loaded ROM is for Incorrect lobby.") self.seed_verify = True read_state = await bizhawk.read( ctx.bizhawk_ctx, [ (0x4564, 59, "EWRAM"), (0x2330, 2, "IWRAM"), (0x3FE0, 1, "IWRAM"), (0x304A, 1, "EWRAM"), (0x304B, 1, "EWRAM"), (0x304C, 4, "EWRAM"), (0x3060, 6, "EWRAM"), (0x4808, 2, "EWRAM"), (0x4407, 1, "EWRAM"), (0x2339, 1, "IWRAM"), ] ) flags = read_state[0] current_room = int.from_bytes(read_state[1], "little") shop_init = read_state[2][0] shop_scroll = read_state[3][0] & 0x1F is_buy = read_state[4][0] != 0 shop_address = (struct.unpack("= 0xDA0000 and location not in self.local_events: self.local_events += [location] await ctx.send_msgs( [ { "cmd": "Set", "key": f"mlss_flag_{ctx.team}_{ctx.slot}", "default": 0, "want_reply": False, "operations": [{"operation": "or", "value": 1 << (location - 0xDA0000)}], } ] ) continue if location in roomException: if current_room not in roomException[location]: exception = True else: exception = False else: exception = True if location in eReward: if location not in self.eUsed: self.eUsed += [location] location = eReward[len(self.eUsed) - 1] else: continue if (location in ctx.server_locations) and exception: locs_to_send.add(location) # Check for set location flags. for byte_i, byte in enumerate(bytearray(flags)): for j in range(8): if j in self.checked_flags[byte_i]: continue and_value = 1 << j if byte & and_value != 0: flag_id = byte_i * 8 + (j + 1) room, item = find_key(roomCount, flag_id) pointer_arr = await bizhawk.read( ctx.bizhawk_ctx, [(ROOM_ARRAY_POINTER + ((room - 1) * 4), 4, "ROM")] ) pointer = struct.unpack(" value: leftover -= value else: return key, leftover def id_to_RAM(id_: int): code = id_ if 0x1C <= code <= 0x1F: code += 0xE if 0x20 <= code <= 0x26: code -= 0x4 return code