import logging import time from enum import IntEnum from base64 import b64encode from typing import TYPE_CHECKING, Dict, Tuple, List, Optional, Any from NetUtils import ClientStatus, color, NetworkItem from worlds._bizhawk.client import BizHawkClient if TYPE_CHECKING: from worlds._bizhawk.context import BizHawkClientContext, BizHawkClientCommandProcessor nes_logger = logging.getLogger("NES") logger = logging.getLogger("Client") MM2_ROBOT_MASTERS_UNLOCKED = 0x8A MM2_ROBOT_MASTERS_DEFEATED = 0x8B MM2_ITEMS_ACQUIRED = 0x8C MM2_LAST_WILY = 0x8D MM2_RECEIVED_ITEMS = 0x8E MM2_DEATHLINK = 0x8F MM2_ENERGYLINK = 0x90 MM2_RBM_STROBE = 0x91 MM2_WEAPONS_UNLOCKED = 0x9A MM2_ITEMS_UNLOCKED = 0x9B MM2_WEAPON_ENERGY = 0x9C MM2_E_TANKS = 0xA7 MM2_LIVES = 0xA8 MM2_DIFFICULTY = 0xCB MM2_HEALTH = 0x6C0 MM2_COMPLETED_STAGES = 0x770 MM2_CONSUMABLES = 0x780 MM2_SFX_QUEUE = 0x580 MM2_SFX_STROBE = 0x66 MM2_CONSUMABLE_TABLE: Dict[int, Tuple[int, int]] = { # Item: (byte offset, bit mask) 0x880201: (0, 8), 0x880202: (16, 1), 0x880203: (16, 2), 0x880204: (16, 4), 0x880205: (16, 8), 0x880206: (16, 16), 0x880207: (16, 32), 0x880208: (16, 64), 0x880209: (16, 128), 0x88020A: (20, 1), 0x88020B: (20, 4), 0x88020C: (20, 64), 0x88020D: (21, 1), 0x88020E: (21, 2), 0x88020F: (21, 4), 0x880210: (24, 1), 0x880211: (24, 2), 0x880212: (24, 4), 0x880213: (28, 1), 0x880214: (28, 2), 0x880215: (28, 4), 0x880216: (33, 4), 0x880217: (33, 8), 0x880218: (37, 8), 0x880219: (37, 16), 0x88021A: (38, 1), 0x88021B: (38, 2), 0x880227: (38, 4), 0x880228: (38, 32), 0x880229: (38, 128), 0x88022A: (39, 4), 0x88022B: (39, 2), 0x88022C: (39, 1), 0x88022D: (38, 64), 0x88022E: (38, 16), 0x88022F: (38, 8), 0x88021C: (39, 32), 0x88021D: (39, 64), 0x88021E: (39, 128), 0x88021F: (41, 16), 0x880220: (42, 2), 0x880221: (42, 4), 0x880222: (42, 8), 0x880223: (46, 1), 0x880224: (46, 2), 0x880225: (46, 4), 0x880226: (46, 8), } class MM2EnergyLinkType(IntEnum): Life = 0 AtomicFire = 1 AirShooter = 2 LeafShield = 3 BubbleLead = 4 QuickBoomerang = 5 TimeStopper = 6 MetalBlade = 7 CrashBomber = 8 Item1 = 9 Item2 = 10 Item3 = 11 OneUP = 12 request_to_name: Dict[str, str] = { "HP": "health", "AF": "Atomic Fire energy", "AS": "Air Shooter energy", "LS": "Leaf Shield energy", "BL": "Bubble Lead energy", "QB": "Quick Boomerang energy", "TS": "Time Stopper energy", "MB": "Metal Blade energy", "CB": "Crash Bomber energy", "I1": "Item 1 energy", "I2": "Item 2 energy", "I3": "Item 3 energy", "1U": "lives" } HP_EXCHANGE_RATE = 500000000 WEAPON_EXCHANGE_RATE = 250000000 ONEUP_EXCHANGE_RATE = 14000000000 def cmd_pool(self: "BizHawkClientCommandProcessor") -> None: """Check the current pool of EnergyLink, and requestable refills from it.""" if self.ctx.game != "Mega Man 2": logger.warning("This command can only be used when playing Mega Man 2.") return if not self.ctx.server or not self.ctx.slot: logger.warning("You must be connected to a server to use this command.") return energylink = self.ctx.stored_data.get(f"EnergyLink{self.ctx.team}", 0) health_points = energylink // HP_EXCHANGE_RATE weapon_points = energylink // WEAPON_EXCHANGE_RATE lives = energylink // ONEUP_EXCHANGE_RATE logger.info(f"Healing available: {health_points}\n" f"Weapon refill available: {weapon_points}\n" f"Lives available: {lives}") def cmd_request(self: "BizHawkClientCommandProcessor", amount: str, target: str) -> None: from worlds._bizhawk.context import BizHawkClientContext """Request a refill from EnergyLink.""" if self.ctx.game != "Mega Man 2": logger.warning("This command can only be used when playing Mega Man 2.") return if not self.ctx.server or not self.ctx.slot: logger.warning("You must be connected to a server to use this command.") return valid_targets: Dict[str, MM2EnergyLinkType] = { "HP": MM2EnergyLinkType.Life, "AF": MM2EnergyLinkType.AtomicFire, "AS": MM2EnergyLinkType.AirShooter, "LS": MM2EnergyLinkType.LeafShield, "BL": MM2EnergyLinkType.BubbleLead, "QB": MM2EnergyLinkType.QuickBoomerang, "TS": MM2EnergyLinkType.TimeStopper, "MB": MM2EnergyLinkType.MetalBlade, "CB": MM2EnergyLinkType.CrashBomber, "I1": MM2EnergyLinkType.Item1, "I2": MM2EnergyLinkType.Item2, "I3": MM2EnergyLinkType.Item3, "1U": MM2EnergyLinkType.OneUP } if target.upper() not in valid_targets: logger.warning(f"Unrecognized target {target.upper()}. Available targets: {', '.join(valid_targets.keys())}") return ctx = self.ctx assert isinstance(ctx, BizHawkClientContext) client = ctx.client_handler assert isinstance(client, MegaMan2Client) client.refill_queue.append((valid_targets[target.upper()], int(amount))) logger.info(f"Restoring {amount} {request_to_name[target.upper()]}.") def cmd_autoheal(self) -> None: """Enable auto heal from EnergyLink.""" if self.ctx.game != "Mega Man 2": logger.warning("This command can only be used when playing Mega Man 2.") return if not self.ctx.server or not self.ctx.slot: logger.warning("You must be connected to a server to use this command.") return else: assert isinstance(self.ctx.client_handler, MegaMan2Client) if self.ctx.client_handler.auto_heal: self.ctx.client_handler.auto_heal = False logger.info(f"Auto healing disabled.") else: self.ctx.client_handler.auto_heal = True logger.info(f"Auto healing enabled.") def get_sfx_writes(sfx: int) -> Tuple[Tuple[int, bytes, str], ...]: return (MM2_SFX_QUEUE, sfx.to_bytes(1, 'little'), "RAM"), (MM2_SFX_STROBE, 0x01.to_bytes(1, "little"), "RAM") class MegaMan2Client(BizHawkClient): game = "Mega Man 2" system = "NES" patch_suffix = ".apmm2" item_queue: List[NetworkItem] = [] pending_death_link: bool = False # default to true, as we don't want to send a deathlink until Mega Man's HP is initialized once sending_death_link: bool = True death_link: bool = False energy_link: bool = False rom: Optional[bytes] = None weapon_energy: int = 0 health_energy: int = 0 auto_heal: bool = False refill_queue: List[Tuple[MM2EnergyLinkType, int]] = [] last_wily: Optional[int] = None # default to wily 1 async def validate_rom(self, ctx: "BizHawkClientContext") -> bool: from worlds._bizhawk import RequestFailedError, read from . import MM2World try: game_name, version = (await read(ctx.bizhawk_ctx, [(0x3FFB0, 21, "PRG ROM"), (0x3FFC8, 3, "PRG ROM")])) if game_name[:3] != b"MM2" or version != bytes(MM2World.world_version): if game_name[:3] == b"MM2": # I think this is an easier check than the other? older_version = "0.2.1" if version == b"\xFF\xFF\xFF" else f"{version[0]}.{version[1]}.{version[2]}" logger.warning(f"This Mega Man 2 patch was generated for an different version of the apworld. " f"Please use that version to connect instead.\n" f"Patch version: ({older_version})\n" f"Client version: ({'.'.join([str(i) for i in MM2World.world_version])})") if "pool" in ctx.command_processor.commands: ctx.command_processor.commands.pop("pool") if "request" in ctx.command_processor.commands: ctx.command_processor.commands.pop("request") if "autoheal" in ctx.command_processor.commands: ctx.command_processor.commands.pop("autoheal") return False except UnicodeDecodeError: return False except RequestFailedError: return False # Should verify on the next pass ctx.game = self.game self.rom = game_name ctx.items_handling = 0b111 ctx.want_slot_data = False deathlink = (await read(ctx.bizhawk_ctx, [(0x3FFC5, 1, "PRG ROM")]))[0][0] if deathlink & 0x01: self.death_link = True if deathlink & 0x02: self.energy_link = True if self.energy_link: if "pool" not in ctx.command_processor.commands: ctx.command_processor.commands["pool"] = cmd_pool if "request" not in ctx.command_processor.commands: ctx.command_processor.commands["request"] = cmd_request if "autoheal" not in ctx.command_processor.commands: ctx.command_processor.commands["autoheal"] = cmd_autoheal return True async def set_auth(self, ctx: "BizHawkClientContext") -> None: if self.rom: ctx.auth = b64encode(self.rom).decode() def on_package(self, ctx: "BizHawkClientContext", cmd: str, args: Dict[str, Any]) -> None: if cmd == "Bounced": if "tags" in args: assert ctx.slot is not None if "DeathLink" in args["tags"] and args["data"]["source"] != ctx.slot_info[ctx.slot].name: self.on_deathlink(ctx) elif cmd == "Retrieved": if f"MM2_LAST_WILY_{ctx.team}_{ctx.slot}" in args["keys"]: self.last_wily = args["keys"][f"MM2_LAST_WILY_{ctx.team}_{ctx.slot}"] elif cmd == "Connected": if self.energy_link: ctx.set_notify(f"EnergyLink{ctx.team}") if ctx.ui: ctx.ui.enable_energy_link() async def send_deathlink(self, ctx: "BizHawkClientContext") -> None: self.sending_death_link = True ctx.last_death_link = time.time() await ctx.send_death("Mega Man was defeated.") def on_deathlink(self, ctx: "BizHawkClientContext") -> None: ctx.last_death_link = time.time() self.pending_death_link = True async def game_watcher(self, ctx: "BizHawkClientContext") -> None: from worlds._bizhawk import read, write if ctx.server is None: return if ctx.slot is None: return # get our relevant bytes robot_masters_unlocked, robot_masters_defeated, items_acquired, \ weapons_unlocked, items_unlocked, items_received, \ completed_stages, consumable_checks, \ e_tanks, lives, weapon_energy, health, difficulty, death_link_status, \ energy_link_packet, last_wily = await read(ctx.bizhawk_ctx, [ (MM2_ROBOT_MASTERS_UNLOCKED, 1, "RAM"), (MM2_ROBOT_MASTERS_DEFEATED, 1, "RAM"), (MM2_ITEMS_ACQUIRED, 1, "RAM"), (MM2_WEAPONS_UNLOCKED, 1, "RAM"), (MM2_ITEMS_UNLOCKED, 1, "RAM"), (MM2_RECEIVED_ITEMS, 1, "RAM"), (MM2_COMPLETED_STAGES, 0xE, "RAM"), (MM2_CONSUMABLES, 52, "RAM"), (MM2_E_TANKS, 1, "RAM"), (MM2_LIVES, 1, "RAM"), (MM2_WEAPON_ENERGY, 11, "RAM"), (MM2_HEALTH, 1, "RAM"), (MM2_DIFFICULTY, 1, "RAM"), (MM2_DEATHLINK, 1, "RAM"), (MM2_ENERGYLINK, 1, "RAM"), (MM2_LAST_WILY, 1, "RAM"), ]) if difficulty[0] not in (0, 1): return # Game is not initialized if not ctx.finished_game and completed_stages[0xD] != 0: # this sets on credits fade, no real better way to do this await ctx.send_msgs([{ "cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL }]) writes = [] # deathlink if self.death_link: await ctx.update_death_link(self.death_link) if self.pending_death_link: writes.append((MM2_DEATHLINK, bytes([0x01]), "RAM")) self.pending_death_link = False self.sending_death_link = True if "DeathLink" in ctx.tags and ctx.last_death_link + 1 < time.time(): if health[0] == 0x00 and not self.sending_death_link: await self.send_deathlink(ctx) elif health[0] != 0x00 and not death_link_status[0]: self.sending_death_link = False if self.last_wily != last_wily[0]: if self.last_wily is None: # revalidate last wily from data storage await ctx.send_msgs([{"cmd": "Set", "key": f"MM2_LAST_WILY_{ctx.team}_{ctx.slot}", "operations": [ {"operation": "default", "value": 8} ]}]) await ctx.send_msgs([{"cmd": "Get", "keys": [f"MM2_LAST_WILY_{ctx.team}_{ctx.slot}"]}]) elif last_wily[0] == 0: writes.append((MM2_LAST_WILY, self.last_wily.to_bytes(1, "little"), "RAM")) else: # correct our setting self.last_wily = last_wily[0] await ctx.send_msgs([{"cmd": "Set", "key": f"MM2_LAST_WILY_{ctx.team}_{ctx.slot}", "operations": [ {"operation": "replace", "value": self.last_wily} ]}]) # handle receiving items recv_amount = items_received[0] if recv_amount < len(ctx.items_received): item = ctx.items_received[recv_amount] logging.info('Received %s from %s (%s) (%d/%d in list)' % ( color(ctx.item_names.lookup_in_game(item.item), 'red', 'bold'), color(ctx.player_names[item.player], 'yellow'), ctx.location_names.lookup_in_slot(item.location, item.player), recv_amount, len(ctx.items_received))) if item.item & 0x130 == 0: # Robot Master Weapon new_weapons = weapons_unlocked[0] | (1 << ((item.item & 0xF) - 1)) writes.append((MM2_WEAPONS_UNLOCKED, new_weapons.to_bytes(1, 'little'), "RAM")) writes.extend(get_sfx_writes(0x21)) elif item.item & 0x30 == 0: # Robot Master Stage Access new_stages = robot_masters_unlocked[0] & ~(1 << ((item.item & 0xF) - 1)) writes.append((MM2_ROBOT_MASTERS_UNLOCKED, new_stages.to_bytes(1, 'little'), "RAM")) writes.extend(get_sfx_writes(0x3a)) writes.append((MM2_RBM_STROBE, b"\x01", "RAM")) elif item.item & 0x20 == 0: # Items new_items = items_unlocked[0] | (1 << ((item.item & 0xF) - 1)) writes.append((MM2_ITEMS_UNLOCKED, new_items.to_bytes(1, 'little'), "RAM")) writes.extend(get_sfx_writes(0x21)) else: # append to the queue, so we handle it later self.item_queue.append(item) recv_amount += 1 writes.append((MM2_RECEIVED_ITEMS, recv_amount.to_bytes(1, 'little'), "RAM")) if energy_link_packet[0]: pickup = energy_link_packet[0] if pickup in (0x76, 0x77): # Health pickups if pickup == 0x77: value = 2 else: value = 10 exchange_rate = HP_EXCHANGE_RATE elif pickup in (0x78, 0x79): # Weapon Energy if pickup == 0x79: value = 2 else: value = 10 exchange_rate = WEAPON_EXCHANGE_RATE elif pickup == 0x7B: # 1-Up value = 1 exchange_rate = ONEUP_EXCHANGE_RATE else: # if we managed to pickup something else, we should just fall through value = 0 exchange_rate = 0 contribution = (value * exchange_rate) >> 1 if contribution: await ctx.send_msgs([{ "cmd": "Set", "key": f"EnergyLink{ctx.team}", "slot": ctx.slot, "operations": [{"operation": "add", "value": contribution}, {"operation": "max", "value": 0}]}]) logger.info(f"Deposited {contribution / HP_EXCHANGE_RATE} health into the pool.") writes.append((MM2_ENERGYLINK, 0x00.to_bytes(1, "little"), "RAM")) if self.weapon_energy: # Weapon Energy # We parse the whole thing to spread it as thin as possible current_energy = self.weapon_energy weapon_energy = bytearray(weapon_energy) for i, weapon in zip(range(len(weapon_energy)), weapon_energy): if weapon < 0x1C: missing = 0x1C - weapon if missing > self.weapon_energy: missing = self.weapon_energy self.weapon_energy -= missing weapon_energy[i] = weapon + missing if not self.weapon_energy: writes.append((MM2_WEAPON_ENERGY, weapon_energy, "RAM")) break else: if current_energy != self.weapon_energy: writes.append((MM2_WEAPON_ENERGY, weapon_energy, "RAM")) if self.health_energy or self.auto_heal: # Health Energy # We save this if the player has not taken any damage current_health = health[0] if 0 < current_health < 0x1C: health_diff = 0x1C - current_health if self.health_energy: if health_diff > self.health_energy: health_diff = self.health_energy self.health_energy -= health_diff else: pool = ctx.stored_data.get(f"EnergyLink{ctx.team}", 0) if health_diff * HP_EXCHANGE_RATE > pool: health_diff = int(pool // HP_EXCHANGE_RATE) await ctx.send_msgs([{ "cmd": "Set", "key": f"EnergyLink{ctx.team}", "slot": ctx.slot, "operations": [{"operation": "add", "value": -health_diff * HP_EXCHANGE_RATE}, {"operation": "max", "value": 0}]}]) current_health += health_diff writes.append((MM2_HEALTH, current_health.to_bytes(1, 'little'), "RAM")) if self.refill_queue: refill_type, refill_amount = self.refill_queue.pop() if refill_type == MM2EnergyLinkType.Life: exchange_rate = HP_EXCHANGE_RATE elif refill_type == MM2EnergyLinkType.OneUP: exchange_rate = ONEUP_EXCHANGE_RATE else: exchange_rate = WEAPON_EXCHANGE_RATE pool = ctx.stored_data.get(f"EnergyLink{ctx.team}", 0) request = exchange_rate * refill_amount if request > pool: logger.warning( f"Not enough energy to fulfill the request. Maximum request: {pool // exchange_rate}") else: await ctx.send_msgs([{ "cmd": "Set", "key": f"EnergyLink{ctx.team}", "slot": ctx.slot, "operations": [{"operation": "add", "value": -request}, {"operation": "max", "value": 0}]}]) if refill_type == MM2EnergyLinkType.Life: refill_ptr = MM2_HEALTH elif refill_type == MM2EnergyLinkType.OneUP: refill_ptr = MM2_LIVES else: refill_ptr = MM2_WEAPON_ENERGY - 1 + refill_type current_value = (await read(ctx.bizhawk_ctx, [(refill_ptr, 1, "RAM")]))[0][0] new_value = min(0x1C if refill_type != MM2EnergyLinkType.OneUP else 99, current_value + refill_amount) writes.append((refill_ptr, new_value.to_bytes(1, "little"), "RAM")) if len(self.item_queue): item = self.item_queue.pop(0) idx = item.item & 0xF if idx == 0: # 1-Up current_lives = lives[0] if current_lives > 99: self.item_queue.append(item) else: current_lives += 1 writes.append((MM2_LIVES, current_lives.to_bytes(1, 'little'), "RAM")) writes.extend(get_sfx_writes(0x42)) elif idx == 1: self.weapon_energy += 0xE writes.extend(get_sfx_writes(0x28)) elif idx == 2: self.health_energy += 0xE writes.extend(get_sfx_writes(0x28)) elif idx == 3: # E-Tank # visuals only allow 4, but we're gonna go up to 9 anyway? May change current_tanks = e_tanks[0] if current_tanks < 9: current_tanks += 1 writes.append((MM2_E_TANKS, current_tanks.to_bytes(1, 'little'), "RAM")) writes.extend(get_sfx_writes(0x42)) else: self.item_queue.append(item) await write(ctx.bizhawk_ctx, writes) new_checks = [] # check for locations for i in range(8): flag = 1 << i if robot_masters_defeated[0] & flag: wep_id = 0x880101 + i if wep_id not in ctx.checked_locations: new_checks.append(wep_id) for i in range(3): flag = 1 << i if items_acquired[0] & flag: itm_id = 0x880111 + i if itm_id not in ctx.checked_locations: new_checks.append(itm_id) for i in range(0xD): rbm_id = 0x880001 + i if completed_stages[i] != 0: if rbm_id not in ctx.checked_locations: new_checks.append(rbm_id) for consumable in MM2_CONSUMABLE_TABLE: if consumable not in ctx.checked_locations: is_checked = consumable_checks[MM2_CONSUMABLE_TABLE[consumable][0]] \ & MM2_CONSUMABLE_TABLE[consumable][1] if is_checked: new_checks.append(consumable) for new_check_id in new_checks: ctx.locations_checked.add(new_check_id) location = ctx.location_names.lookup_in_game(new_check_id) nes_logger.info( f'New Check: {location} ({len(ctx.locations_checked)}/' f'{len(ctx.missing_locations) + len(ctx.checked_locations)})') await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": [new_check_id]}])