140 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			140 lines
		
	
	
		
			5.3 KiB
		
	
	
	
		
			Python
		
	
	
	
import math
 | 
						|
from typing import TYPE_CHECKING, List, Optional, Set
 | 
						|
 | 
						|
from NetUtils import ClientStatus, NetworkItem
 | 
						|
 | 
						|
import worlds._bizhawk as bizhawk
 | 
						|
from worlds._bizhawk.client import BizHawkClient
 | 
						|
from . import item_to_index
 | 
						|
 | 
						|
if TYPE_CHECKING:
 | 
						|
    from worlds._bizhawk.context import BizHawkClientContext
 | 
						|
 | 
						|
 | 
						|
class YuGiOh2006Client(BizHawkClient):
 | 
						|
    game = "Yu-Gi-Oh! 2006"
 | 
						|
    system = "GBA"
 | 
						|
    patch_suffix = ".apygo06"
 | 
						|
    local_checked_locations: Set[int]
 | 
						|
    goal_flag: int
 | 
						|
    rom_slot_name: Optional[str]
 | 
						|
 | 
						|
    def __init__(self) -> None:
 | 
						|
        super().__init__()
 | 
						|
        self.local_checked_locations = set()
 | 
						|
        self.rom_slot_name = None
 | 
						|
 | 
						|
    async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
 | 
						|
        from CommonClient import logger
 | 
						|
 | 
						|
        try:
 | 
						|
            # Check if ROM is some version of Yu-Gi-Oh! 2006
 | 
						|
            game_name = ((await bizhawk.read(ctx.bizhawk_ctx, [(0xA0, 11, "ROM")]))[0]).decode("ascii")
 | 
						|
            if game_name != "YUGIOHWCT06":
 | 
						|
                return False
 | 
						|
 | 
						|
            # Check if we can read the slot name. Doing this here instead of set_auth as a protection against
 | 
						|
            # validating a ROM where there's no slot name to read.
 | 
						|
            try:
 | 
						|
                slot_name_bytes = (await bizhawk.read(ctx.bizhawk_ctx, [(0x30, 32, "ROM")]))[0]
 | 
						|
                self.rom_slot_name = bytes([byte for byte in slot_name_bytes if byte != 0]).decode("utf-8")
 | 
						|
            except UnicodeDecodeError:
 | 
						|
                logger.info("Could not read slot name from ROM. Are you sure this ROM matches this client version?")
 | 
						|
                return False
 | 
						|
        except UnicodeDecodeError:
 | 
						|
            return False
 | 
						|
        except bizhawk.RequestFailedError:
 | 
						|
            return False  # Should verify on the next pass
 | 
						|
 | 
						|
        ctx.game = self.game
 | 
						|
        ctx.items_handling = 0b001
 | 
						|
        ctx.want_slot_data = False
 | 
						|
        return True
 | 
						|
 | 
						|
    async def set_auth(self, ctx: "BizHawkClientContext") -> None:
 | 
						|
        ctx.auth = self.rom_slot_name
 | 
						|
 | 
						|
    async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
 | 
						|
        try:
 | 
						|
            read_state = await bizhawk.read(
 | 
						|
                ctx.bizhawk_ctx,
 | 
						|
                [
 | 
						|
                    (0x0, 8, "EWRAM"),
 | 
						|
                    (0x52E8, 32, "EWRAM"),
 | 
						|
                    (0x5308, 32, "EWRAM"),
 | 
						|
                    (0x5325, 1, "EWRAM"),
 | 
						|
                    (0x6C38, 4, "EWRAM"),
 | 
						|
                ],
 | 
						|
            )
 | 
						|
            game_state = read_state[0].decode("utf-8")
 | 
						|
            locations = read_state[1]
 | 
						|
            items = read_state[2]
 | 
						|
            amount_items = int.from_bytes(read_state[3], "little")
 | 
						|
            money = int.from_bytes(read_state[4], "little")
 | 
						|
 | 
						|
            # make sure save was created
 | 
						|
            if game_state != "YWCT2006":
 | 
						|
                return
 | 
						|
            local_items = bytearray(items)
 | 
						|
            await bizhawk.guarded_write(
 | 
						|
                ctx.bizhawk_ctx,
 | 
						|
                [(0x5308, parse_items(bytearray(items), ctx.items_received), "EWRAM")],
 | 
						|
                [(0x5308, local_items, "EWRAM")],
 | 
						|
            )
 | 
						|
            money_received = 0
 | 
						|
            for item in ctx.items_received:
 | 
						|
                if item.item == item_to_index["5000DP"] + 5730000:
 | 
						|
                    money_received += 1
 | 
						|
            if money_received > amount_items:
 | 
						|
                await bizhawk.guarded_write(
 | 
						|
                    ctx.bizhawk_ctx,
 | 
						|
                    [
 | 
						|
                        (0x6C38, (money + (money_received - amount_items) * 5000).to_bytes(4, "little"), "EWRAM"),
 | 
						|
                        (0x5325, money_received.to_bytes(2, "little"), "EWRAM"),
 | 
						|
                    ],
 | 
						|
                    [
 | 
						|
                        (0x6C38, money.to_bytes(4, "little"), "EWRAM"),
 | 
						|
                        (0x5325, amount_items.to_bytes(2, "little"), "EWRAM"),
 | 
						|
                    ],
 | 
						|
                )
 | 
						|
 | 
						|
            locs_to_send = set()
 | 
						|
 | 
						|
            # Check for set location flags.
 | 
						|
            for byte_i, byte in enumerate(bytearray(locations)):
 | 
						|
                for i in range(8):
 | 
						|
                    and_value = 1 << i
 | 
						|
                    if byte & and_value != 0:
 | 
						|
                        flag_id = byte_i * 8 + i
 | 
						|
 | 
						|
                        location_id = flag_id + 5730001
 | 
						|
                        if location_id in ctx.server_locations:
 | 
						|
                            locs_to_send.add(location_id)
 | 
						|
 | 
						|
            # Send locations if there are any to send.
 | 
						|
            if locs_to_send != self.local_checked_locations:
 | 
						|
                self.local_checked_locations = locs_to_send
 | 
						|
 | 
						|
                if locs_to_send is not None:
 | 
						|
                    await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locs_to_send)}])
 | 
						|
 | 
						|
            # Send game clear if we're in either any ending cutscene or the credits state.
 | 
						|
            if not ctx.finished_game and locations[18] & (1 << 5) != 0:
 | 
						|
                await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
 | 
						|
 | 
						|
        except bizhawk.RequestFailedError:
 | 
						|
            # Exit handler and return to main loop to reconnect.
 | 
						|
            pass
 | 
						|
 | 
						|
 | 
						|
# Parses bit-map for local items and adds the received items to that bit-map
 | 
						|
def parse_items(local_items: bytearray, items: List[NetworkItem]) -> bytearray:
 | 
						|
    array = local_items
 | 
						|
    for item in items:
 | 
						|
        index = item.item - 5730001
 | 
						|
        if index != 254:
 | 
						|
            byte = math.floor(index / 8)
 | 
						|
            bit = index % 8
 | 
						|
            array[byte] = array[byte] | (1 << bit)
 | 
						|
    return array
 |