import base64
import logging
import time

from NetUtils import ClientStatus
from worlds._bizhawk.client import BizHawkClient
from worlds._bizhawk import read, write, guarded_write

from .locations import location_data

logger = logging.getLogger("Client")

# Multiplier applied to in-game money when it is added to / taken from the EnergyLink storage.
BANK_EXCHANGE_RATE = 50000000

# Largest amount the 3-byte binary-coded-decimal money field can represent (six decimal digits).
MAX_MONEY = 999999

# Name -> (WRAM address, number of bytes). Every entry is read once per game_watcher() call.
DATA_LOCATIONS = {
    "ItemIndex": (0x1A6E, 0x02),
    "Deathlink": (0x00FD, 0x01),
    "APItem": (0x00FF, 0x01),
    "EventFlag": (0x1735, 0x140),
    "Missable": (0x161A, 0x20),
    "Hidden": (0x16DE, 0x0E),
    "Rod": (0x1716, 0x01),
    "DexSanityFlag": (0x1A71, 19),
    "GameStatus": (0x1A84, 0x01),
    "Money": (0x141F, 3),
    "ResetCheck": (0x0100, 4),
    # First and second Vermilion Gym trash can selection. Second is not used, so should always be 0.
    # First should never be above 0x0F. This is just before Event Flags.
    "CrashCheck1": (0x1731, 2),
    # Unused, should always be 0. This is just before Missables flags.
    "CrashCheck2": (0x1617, 1),
    # Progressive keys, should never be above 10. Just before Dexsanity flags.
    "CrashCheck3": (0x1A70, 1),
    # Route 18 Gate script value. Should never be above 3. Just before Hidden items flags.
    "CrashCheck4": (0x16DD, 1),
}

# Flag-type name -> {flag (or pair of flags for "list") -> location id}.
location_map = {"Rod": {}, "EventFlag": {}, "Missable": {}, "Hidden": {}, "list": {}, "DexSanityFlag": {}}
# Location id -> {'byte', 'bit'} position of its flag, or a two-element list of such positions
# for locations whose ram_address is a list of two flags.
location_bytes_bits = {}
for location in location_data:
    ram_address = location.ram_address
    if ram_address is None:
        continue
    if isinstance(ram_address, list):
        first_flag, second_flag = ram_address[0], ram_address[1]
        location_map["list"][(first_flag.flag, second_flag.flag)] = location.address
        location_bytes_bits[location.address] = [
            {'byte': first_flag.byte, 'bit': first_flag.bit},
            {'byte': second_flag.byte, 'bit': second_flag.bit},
        ]
    else:
        # The class name of the flag object selects which flag table the location belongs to.
        location_map[type(ram_address).__name__][ram_address.flag] = location.address
        location_bytes_bits[location.address] = {'byte': ram_address.byte, 'bit': ram_address.bit}

location_name_to_id = {
    location.name: location.address
    for location in location_data
    if location.type == "Item" and location.address is not None
}


def _flag_is_set(flag_bytes, position):
    """Return True when the bit described by position ({'byte', 'bit'}) is set in flag_bytes."""
    return bool(flag_bytes[position['byte']] & (1 << position['bit']))


class PokemonRBClient(BizHawkClient):
    """BizHawk client handler for Pokemon Red and Blue."""

    system = ("GB", "SGB")
    patch_suffix = (".apred", ".apblue")
    game = "Pokemon Red and Blue"

    def __init__(self):
        super().__init__()
        self.auto_hints = set()          # location ids already sent as automatic hints
        self.locations_array = None      # last set of checked locations reported to the server
        self.disconnect_pending = False  # set by on_package() on a seed mismatch
        self.set_deathlink = False       # set by on_package() when slot data enables death link
        self.banking_command = None      # pending /bank amount: positive deposits, negative withdraws
        self.game_state = False          # True while a save is loaded and the crash checks pass
        self.last_death_link = 0

    async def validate_rom(self, ctx):
        """Return True and configure ctx when the loaded ROM's title is Pokemon Red or Blue."""
        game_name = await read(ctx.bizhawk_ctx, [(0x134, 12, "ROM")])
        try:
            game_name = game_name[0].decode("ascii")
        except UnicodeDecodeError:
            # A title that is not ASCII cannot be one of the two titles accepted below.
            return False
        if game_name not in ("POKEMON RED\00", "POKEMON BLUE"):
            return False

        ctx.game = self.game
        ctx.items_handling = 0b001
        ctx.command_processor.commands["bank"] = cmd_bank
        seed_name = await read(ctx.bizhawk_ctx, [(0xFFDB, 21, "ROM")])
        ctx.seed_name = seed_name[0].split(b"\0")[0].decode("ascii")
        # Reset per-session state.
        self.set_deathlink = False
        self.banking_command = None
        self.locations_array = None
        self.disconnect_pending = False
        return True

    async def set_auth(self, ctx):
        """Set ctx.auth from the name stored in the ROM."""
        auth_name = await read(ctx.bizhawk_ctx, [(0xFFC6, 21, "ROM")])
        if auth_name[0] == bytes([0] * 21):
            # rom was patched before rom names implemented, use player name
            auth_name = await read(ctx.bizhawk_ctx, [(0xFFF0, 16, "ROM")])
            auth_name = auth_name[0].decode("ascii").split("\x00")[0]
        else:
            auth_name = base64.b64encode(auth_name[0]).decode()
        ctx.auth = auth_name

    async def game_watcher(self, ctx):
        """Read the watched WRAM regions once and act on them.

        In order: apply pending death-link / disconnect requests, bail out if no save is loaded
        or the crash checks fail, hand the next received item to the game, report checked
        locations, send automatic hints, exchange death links, process a pending /bank command
        and report the goal.
        """
        if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed:
            return

        raw = await read(ctx.bizhawk_ctx, [(address, size, "WRAM") for address, size in DATA_LOCATIONS.values()])
        data = dict(zip(DATA_LOCATIONS, raw))

        if self.set_deathlink:
            self.set_deathlink = False
            await ctx.update_death_link(True)

        if self.disconnect_pending:
            self.disconnect_pending = False
            await ctx.disconnect()

        if data["GameStatus"][0] == 0 or data["ResetCheck"] == b'\xff\xff\xff\x7f':
            # Do not handle anything before game save is loaded
            self.game_state = False
            return
        elif (data["GameStatus"][0] not in (0x2A, 0xAC)
                or data["CrashCheck1"][0] & 0xF0
                or data["CrashCheck1"][1] & 0xFF
                or data["CrashCheck2"][0]
                or data["CrashCheck3"][0] > 10
                or data["CrashCheck4"][0] > 3):
            # Should mean game crashed
            logger.warning("Pokémon Red/Blue game may have crashed. Disconnecting from server.")
            self.game_state = False
            await ctx.disconnect()
            return
        self.game_state = True

        # SEND ITEMS TO CLIENT

        if data["APItem"][0] == 0:
            item_index = int.from_bytes(data["ItemIndex"], "little")
            if len(ctx.items_received) > item_index:
                item_code = ctx.items_received[item_index].item - 172000000
                if item_code > 255:
                    item_code -= 256
                await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["APItem"][0], [item_code], "WRAM")])

        # LOCATION CHECKS

        locations = set()
        for flag_type, loc_map in location_map.items():
            for loc_id in loc_map.values():
                position = location_bytes_bits[loc_id]
                if flag_type == "list":
                    # Two-flag locations need both their event flag and their missable flag set.
                    if (_flag_is_set(data["EventFlag"], position[0])
                            and _flag_is_set(data["Missable"], position[1])):
                        locations.add(loc_id)
                elif _flag_is_set(data[flag_type], position):
                    locations.add(loc_id)

        if locations != self.locations_array and locations:
            self.locations_array = locations
            await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locations)}])

        # AUTO HINTS

        hint_names = []
        if data["EventFlag"][280] & 16:
            hint_names.append("Cerulean Bicycle Shop")
        if data["EventFlag"][280] & 32:
            hint_names.append("Route 2 Gate - Oak's Aide")
        if data["EventFlag"][280] & 64:
            hint_names.append("Route 11 Gate 2F - Oak's Aide")
        if data["EventFlag"][280] & 128:
            hint_names.append("Route 15 Gate 2F - Oak's Aide")
        if data["EventFlag"][281] & 1:
            hint_names += ["Celadon Prize Corner - Item Prize 1", "Celadon Prize Corner - Item Prize 2",
                           "Celadon Prize Corner - Item Prize 3"]
        fossil_a = location_name_to_id["Fossil - Choice A"]
        fossil_b = location_name_to_id["Fossil - Choice B"]
        if fossil_a in ctx.checked_locations and fossil_b not in ctx.checked_locations:
            hint_names.append("Fossil - Choice B")
        elif fossil_b in ctx.checked_locations and fossil_a not in ctx.checked_locations:
            hint_names.append("Fossil - Choice A")

        hints = [
            loc_id
            for loc_id in (location_name_to_id[name] for name in hint_names)
            if loc_id not in self.auto_hints
            and loc_id in ctx.missing_locations
            and loc_id not in ctx.locations_checked
        ]
        if hints:
            await ctx.send_msgs([{"cmd": "LocationScouts", "locations": hints, "create_as_hint": 2}])
            self.auto_hints.update(hints)

        # DEATHLINK

        if "DeathLink" in ctx.tags:
            if data["Deathlink"][0] == 3:
                player_name = ctx.player_names[ctx.slot]
                await ctx.send_death(f"{player_name} is out of usable Pokémon! {player_name} blacked out!")
                await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [0], "WRAM")])
                self.last_death_link = ctx.last_death_link
            elif ctx.last_death_link > self.last_death_link:
                self.last_death_link = ctx.last_death_link
                await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [1], "WRAM")])

        # BANK

        if self.banking_command:
            money_address = DATA_LOCATIONS["Money"][0]
            original_money = data["Money"]
            # Money is stored as binary-coded decimal.
            money = int(original_money.hex())
            # The key is absent until the server has answered the notify request, so do not index.
            energy_link = ctx.stored_data.get(f"EnergyLink{ctx.team}") or 0
            if self.banking_command > money:
                logger.warning(f"You do not have ${self.banking_command} to deposit!")
            elif (-self.banking_command * BANK_EXCHANGE_RATE) > energy_link:
                logger.warning("Not enough money in the EnergyLink storage!")
            else:
                # The new balance is money - banking_command, so only a withdrawal (negative
                # command) can overflow the six-digit field; shrink it to what still fits.
                if money - self.banking_command > MAX_MONEY:
                    self.banking_command = money - MAX_MONEY
                if self.banking_command == 0:
                    logger.warning("Your wallet is already full!")
                else:
                    digits = str(money - self.banking_command).zfill(6)
                    # Reading each decimal digit pair as hex yields the BCD byte.
                    new_money = [int(digits[i:i + 2], 16) for i in (0, 2, 4)]
                    # Only write if the in-game amount has not changed since it was read.
                    money_written = await guarded_write(ctx.bizhawk_ctx,
                                                        [(money_address, new_money, "WRAM")],
                                                        [(money_address, original_money, "WRAM")])
                    if money_written:
                        if self.banking_command >= 0:
                            deposit = self.banking_command - self.banking_command // 4
                            tax = self.banking_command - deposit
                            logger.info(f"Deposited ${deposit}, and charged a tax of ${tax}.")
                            self.banking_command = deposit
                        else:
                            logger.info(f"Withdrew ${-self.banking_command}.")
                        await ctx.send_msgs([{
                            "cmd": "Set",
                            "key": f"EnergyLink{ctx.team}",
                            "operations": [
                                {"operation": "add", "value": self.banking_command * BANK_EXCHANGE_RATE},
                                {"operation": "max", "value": 0},
                            ],
                        }])
            # The request is consumed whether or not it succeeded.
            self.banking_command = None

        # VICTORY

        if data["EventFlag"][280] & 1 and not ctx.finished_game:
            await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
            ctx.finished_game = True

    def on_package(self, ctx, cmd, args):
        """React to 'Connected' and 'RoomInfo' packages, then defer to the base class."""
        if cmd == 'Connected':
            if args['slot_data'].get('death_link'):
                self.set_deathlink = True
                self.last_death_link = time.time()
            ctx.set_notify(f"EnergyLink{ctx.team}")
        elif cmd == 'RoomInfo':
            if ctx.seed_name and ctx.seed_name != args["seed_name"]:
                # CommonClient's on_package displays an error to the user in this case, but connection is not
                # cancelled.
                self.game_state = False
                self.disconnect_pending = True
        super().on_package(ctx, cmd, args)


def cmd_bank(self, cmd: str = "", amount: str = ""):
    """Deposit or withdraw money with the server's EnergyLink storage.
    /bank - check server balance.
    /bank deposit # - deposit money. One quarter of the amount will be lost to taxation.
    /bank withdraw # - withdraw money."""
    # Registered on the command processor by PokemonRBClient.validate_rom(); the request is only
    # recorded here and is carried out by the next PokemonRBClient.game_watcher() call.
    if self.ctx.game != "Pokemon Red and Blue":
        logger.warning("This command can only be used while playing Pokémon Red and Blue")
        return
    if (not self.ctx.server) or self.ctx.server.socket.closed or not self.ctx.client_handler.game_state:
        logger.info("Must be connected to server and in game.")
        return
    if not cmd:
        balance = self.ctx.stored_data.get(f"EnergyLink{self.ctx.team}") or 0
        logger.info(f"Money available: {int(balance / BANK_EXCHANGE_RATE)}")
        return
    if not amount:
        logger.warning("You must specify an amount.")
        return
    if cmd not in ("withdraw", "deposit"):
        logger.warning(f"Invalid bank command {cmd}")
        return

    try:
        value = int(amount)
    except ValueError:
        logger.warning(f"Invalid amount {amount}")
        return

    if cmd == "withdraw":
        if value < 1:
            # A negative withdrawal would otherwise be processed as a deposit.
            logger.warning("You must withdraw at least $1.")
            return
        self.ctx.client_handler.banking_command = -value
    else:
        if value < 4:
            logger.warning("You must deposit at least $4, for tax purposes.")
            return
        self.ctx.client_handler.banking_command = value