135 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
import logging
 | 
						|
import time
 | 
						|
import typing
 | 
						|
from logging import Logger
 | 
						|
from typing import Optional
 | 
						|
 | 
						|
from NetUtils import ClientStatus, NetworkItem
 | 
						|
from worlds.AutoSNIClient import SNIClient
 | 
						|
from .Enemies import enemy_id_to_name
 | 
						|
from .Items import start_id as items_start_id
 | 
						|
from .Locations import start_id as locations_start_id
 | 
						|
 | 
						|
if typing.TYPE_CHECKING:
 | 
						|
    from SNIClient import SNIContext
 | 
						|
else:
 | 
						|
    SNIContext = typing.Any
 | 
						|
 | 
						|
snes_logger: Logger = logging.getLogger("SNES")
 | 
						|
 | 
						|
SRAM_START: int = 0xE00000
 | 
						|
L2AC_ROMNAME_START: int = 0x007FC0
 | 
						|
L2AC_SIGN_ADDR: int = SRAM_START + 0x2000
 | 
						|
L2AC_GOAL_ADDR: int = SRAM_START + 0x2030
 | 
						|
L2AC_DEATH_ADDR: int = SRAM_START + 0x203D
 | 
						|
L2AC_TX_ADDR: int = SRAM_START + 0x2040
 | 
						|
L2AC_RX_ADDR: int = SRAM_START + 0x2800
 | 
						|
 | 
						|
 | 
						|
class L2ACSNIClient(SNIClient):
 | 
						|
    game: str = "Lufia II Ancient Cave"
 | 
						|
 | 
						|
    async def validate_rom(self, ctx: SNIContext) -> bool:
 | 
						|
        from SNIClient import snes_read
 | 
						|
 | 
						|
        rom_name: Optional[bytes] = await snes_read(ctx, L2AC_ROMNAME_START, 0x15)
 | 
						|
        if rom_name is None or rom_name[:4] != b"L2AC":
 | 
						|
            return False
 | 
						|
 | 
						|
        ctx.game = self.game
 | 
						|
        ctx.items_handling = 0b111  # fully remote
 | 
						|
 | 
						|
        ctx.rom = rom_name
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
    async def game_watcher(self, ctx: SNIContext) -> None:
 | 
						|
        from SNIClient import snes_buffered_write, snes_flush_writes, snes_read
 | 
						|
 | 
						|
        rom: Optional[bytes] = await snes_read(ctx, L2AC_ROMNAME_START, 0x15)
 | 
						|
        if rom != ctx.rom:
 | 
						|
            ctx.rom = None
 | 
						|
            return
 | 
						|
 | 
						|
        if ctx.server is None or ctx.slot is None:
 | 
						|
            # not successfully connected to a multiworld server, cannot process the game sending items
 | 
						|
            return
 | 
						|
 | 
						|
        signature: Optional[bytes] = await snes_read(ctx, L2AC_SIGN_ADDR, 16)
 | 
						|
        if signature != b"ArchipelagoLufia":
 | 
						|
            return
 | 
						|
 | 
						|
        # Goal
 | 
						|
        if not ctx.finished_game:
 | 
						|
            goal_data: Optional[bytes] = await snes_read(ctx, L2AC_GOAL_ADDR, 10)
 | 
						|
            if goal_data is not None and goal_data[goal_data[0]] == 0x01:
 | 
						|
                await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
 | 
						|
                ctx.finished_game = True
 | 
						|
 | 
						|
        # DeathLink TX
 | 
						|
        death_data: Optional[bytes] = await snes_read(ctx, L2AC_DEATH_ADDR, 3)
 | 
						|
        if death_data is not None:
 | 
						|
            await ctx.update_death_link(bool(death_data[0]))
 | 
						|
            if death_data[1] != 0x00:
 | 
						|
                snes_buffered_write(ctx, L2AC_DEATH_ADDR + 1, b"\x00")
 | 
						|
                if "DeathLink" in ctx.tags and ctx.last_death_link + 1 < time.time():
 | 
						|
                    player_name: str = ctx.player_names.get(ctx.slot, str(ctx.slot))
 | 
						|
                    enemy_name: str = enemy_id_to_name.get(death_data[1] - 1, hex(death_data[1] - 1))
 | 
						|
                    await ctx.send_death(f"{player_name} was totally defeated by {enemy_name}.")
 | 
						|
 | 
						|
        # TX
 | 
						|
        tx_data: Optional[bytes] = await snes_read(ctx, L2AC_TX_ADDR, 8)
 | 
						|
        if tx_data is not None:
 | 
						|
            snes_items_sent = int.from_bytes(tx_data[:2], "little")
 | 
						|
            client_items_sent = int.from_bytes(tx_data[2:4], "little")
 | 
						|
            client_ap_items_found = int.from_bytes(tx_data[4:6], "little")
 | 
						|
 | 
						|
            if client_items_sent < snes_items_sent:
 | 
						|
                location_id: int = locations_start_id + client_items_sent
 | 
						|
                location: str = ctx.location_names[location_id]
 | 
						|
                client_items_sent += 1
 | 
						|
 | 
						|
                ctx.locations_checked.add(location_id)
 | 
						|
                await ctx.send_msgs([{"cmd": "LocationChecks", "locations": [location_id]}])
 | 
						|
 | 
						|
                snes_logger.info("New Check: %s (%d/%d)" % (
 | 
						|
                    location,
 | 
						|
                    len(ctx.locations_checked),
 | 
						|
                    len(ctx.missing_locations) + len(ctx.checked_locations)))
 | 
						|
                snes_buffered_write(ctx, L2AC_TX_ADDR + 2, client_items_sent.to_bytes(2, "little"))
 | 
						|
 | 
						|
            ap_items_found: int = sum(net_item.player != ctx.slot for net_item in ctx.locations_info.values())
 | 
						|
            if client_ap_items_found < ap_items_found:
 | 
						|
                snes_buffered_write(ctx, L2AC_TX_ADDR + 4, ap_items_found.to_bytes(2, "little"))
 | 
						|
 | 
						|
        # RX
 | 
						|
        rx_data: Optional[bytes] = await snes_read(ctx, L2AC_RX_ADDR, 4)
 | 
						|
        if rx_data is not None:
 | 
						|
            snes_items_received = int.from_bytes(rx_data[:2], "little")
 | 
						|
 | 
						|
            if snes_items_received < len(ctx.items_received):
 | 
						|
                item: NetworkItem = ctx.items_received[snes_items_received]
 | 
						|
                item_code: int = item.item - items_start_id
 | 
						|
                snes_items_received += 1
 | 
						|
 | 
						|
                snes_logger.info("Received %s from %s (%s) (%d/%d in list)" % (
 | 
						|
                    ctx.item_names[item.item],
 | 
						|
                    ctx.player_names[item.player],
 | 
						|
                    ctx.location_names[item.location],
 | 
						|
                    snes_items_received, len(ctx.items_received)))
 | 
						|
                snes_buffered_write(ctx, L2AC_RX_ADDR + 2 * (snes_items_received + 1), item_code.to_bytes(2, "little"))
 | 
						|
                snes_buffered_write(ctx, L2AC_RX_ADDR, snes_items_received.to_bytes(2, "little"))
 | 
						|
 | 
						|
        await snes_flush_writes(ctx)
 | 
						|
 | 
						|
    async def deathlink_kill_player(self, ctx: SNIContext) -> None:
 | 
						|
        from SNIClient import DeathState, snes_buffered_write, snes_flush_writes
 | 
						|
 | 
						|
        # DeathLink RX
 | 
						|
        if "DeathLink" in ctx.tags:
 | 
						|
            snes_buffered_write(ctx, L2AC_DEATH_ADDR + 2, b"\x01")
 | 
						|
        else:
 | 
						|
            snes_buffered_write(ctx, L2AC_DEATH_ADDR + 2, b"\x00")
 | 
						|
        await snes_flush_writes(ctx)
 | 
						|
        ctx.death_state = DeathState.dead
 |