Archipelago/worlds/lufia2ac/Client.py

135 lines
5.4 KiB
Python

import logging
import time
import typing
from logging import Logger
from typing import Optional
from NetUtils import ClientStatus, NetworkItem
from worlds.AutoSNIClient import SNIClient
from .Enemies import enemy_id_to_name
from .Items import start_id as items_start_id
from .Locations import start_id as locations_start_id
# SNIContext is only needed for annotations. At runtime it is aliased to Any so
# that SNIClient is not imported at module load time.
if not typing.TYPE_CHECKING:
    SNIContext = typing.Any
else:
    from SNIClient import SNIContext

# Logger shared under the "SNES" name.
snes_logger: Logger = logging.getLogger("SNES")

# Base address that every SRAM offset below is added to.
SRAM_START: int = 0xE00000

# Start of the ROM name; the client reads 0x15 bytes here and expects the
# first four to be b"L2AC".
L2AC_ROMNAME_START: int = 0x007FC0

# 16-byte signature; the client expects b"ArchipelagoLufia" here.
L2AC_SIGN_ADDR: int = SRAM_START + 0x2000
# 10-byte goal area; byte 0 selects which of the following bytes is checked.
L2AC_GOAL_ADDR: int = SRAM_START + 0x2030
# 3-byte DeathLink area: +0 is read by the client, +1 is read and cleared by
# the client, +2 is written by the client.
L2AC_DEATH_ADDR: int = SRAM_START + 0x203D
# Game -> client counters (little-endian 16-bit values).
L2AC_TX_ADDR: int = SRAM_START + 0x2040
# Client -> game area: 16-bit received counter followed by item slots.
L2AC_RX_ADDR: int = SRAM_START + 0x2800
class L2ACSNIClient(SNIClient):
    """SNI client handler for the game "Lufia II Ancient Cave".

    The handler recognises the patched ROM by its name, then exchanges goal,
    DeathLink, location-check and received-item data with the game through
    fixed SRAM addresses (the ``L2AC_*_ADDR`` module constants).
    """

    game: str = "Lufia II Ancient Cave"

    async def validate_rom(self, ctx: SNIContext) -> bool:
        """Check whether the connected ROM belongs to this game.

        Reads 0x15 bytes at ``L2AC_ROMNAME_START`` and accepts the ROM when the
        first four bytes are ``b"L2AC"``. On success the context is configured
        for this game (``game``, ``items_handling`` and ``rom`` are set).

        Args:
            ctx: The SNI context of the running client.

        Returns:
            True if the ROM was recognised, False if the read failed or the
            name does not match.
        """
        # Imported locally, as in the rest of this class, rather than at module level.
        from SNIClient import snes_read

        rom_name: Optional[bytes] = await snes_read(ctx, L2AC_ROMNAME_START, 0x15)
        if rom_name is None or rom_name[:4] != b"L2AC":
            return False

        ctx.game = self.game
        ctx.items_handling = 0b111  # fully remote
        ctx.rom = rom_name
        return True

    async def game_watcher(self, ctx: SNIContext) -> None:
        """Run one polling pass that synchronises the game with the server.

        In order, the pass:

        1. verifies that the ROM name still matches ``ctx.rom`` (otherwise
           ``ctx.rom`` is reset and the pass ends),
        2. stops if there is no server connection or slot, or if the SRAM
           signature is not ``b"ArchipelagoLufia"``,
        3. reports goal completion,
        4. forwards a death recorded by the game as a DeathLink message,
        5. sends at most one new location check and updates the counter of
           found items that belong to other players,
        6. delivers at most one newly received item to the game,
        7. flushes all buffered writes.

        Args:
            ctx: The SNI context of the running client.
        """
        from SNIClient import snes_buffered_write, snes_flush_writes, snes_read

        rom: Optional[bytes] = await snes_read(ctx, L2AC_ROMNAME_START, 0x15)
        if rom != ctx.rom:
            # A different ROM (or a failed read): drop the cached ROM name.
            ctx.rom = None
            return

        if ctx.server is None or ctx.slot is None:
            # not successfully connected to a multiworld server, cannot process the game sending items
            return

        signature: Optional[bytes] = await snes_read(ctx, L2AC_SIGN_ADDR, 16)
        if signature != b"ArchipelagoLufia":
            # The signature is absent, so the SRAM areas below are not trusted.
            return

        # Goal
        if not ctx.finished_game:
            goal_data: Optional[bytes] = await snes_read(ctx, L2AC_GOAL_ADDR, 10)
            if goal_data:
                # Byte 0 selects which of the following bytes holds the completion flag.
                # It is an arbitrary byte read from the console, so it must be
                # bounds-checked; an out-of-range selector counts as "goal not reached"
                # instead of raising IndexError inside the watcher.
                goal_index: int = goal_data[0]
                if goal_index < len(goal_data) and goal_data[goal_index] == 0x01:
                    await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
                    ctx.finished_game = True

        # DeathLink TX
        death_data: Optional[bytes] = await snes_read(ctx, L2AC_DEATH_ADDR, 3)
        if death_data is not None:
            # Byte 0 tells the client whether DeathLink should be enabled.
            await ctx.update_death_link(bool(death_data[0]))
            if death_data[1] != 0x00:
                # Byte 1 is non-zero after a death; clear it so it is handled only once.
                snes_buffered_write(ctx, L2AC_DEATH_ADDR + 1, b"\x00")
                # Only report the death if the last DeathLink event is more than one second
                # old (presumably to avoid echoing back a death that was just received).
                if "DeathLink" in ctx.tags and ctx.last_death_link + 1 < time.time():
                    player_name: str = ctx.player_names.get(ctx.slot, str(ctx.slot))
                    # The stored value is the enemy id plus one; unknown ids are shown as hex.
                    enemy_name: str = enemy_id_to_name.get(death_data[1] - 1, hex(death_data[1] - 1))
                    await ctx.send_death(f"{player_name} was totally defeated by {enemy_name}.")

        # TX
        tx_data: Optional[bytes] = await snes_read(ctx, L2AC_TX_ADDR, 8)
        if tx_data is not None:
            # Three little-endian 16-bit counters: checks recorded by the game, checks
            # already forwarded by the client, and the found-items counter last written.
            snes_items_sent = int.from_bytes(tx_data[:2], "little")
            client_items_sent = int.from_bytes(tx_data[2:4], "little")
            client_ap_items_found = int.from_bytes(tx_data[4:6], "little")

            if client_items_sent < snes_items_sent:
                # Forward a single check per pass; location ids are consecutive from the start id.
                location_id: int = locations_start_id + client_items_sent
                location: str = ctx.location_names[location_id]
                client_items_sent += 1

                ctx.locations_checked.add(location_id)
                await ctx.send_msgs([{"cmd": "LocationChecks", "locations": [location_id]}])

                snes_logger.info(
                    "New Check: %s (%d/%d)",
                    location,
                    len(ctx.locations_checked),
                    len(ctx.missing_locations) + len(ctx.checked_locations),
                )
                snes_buffered_write(ctx, L2AC_TX_ADDR + 2, client_items_sent.to_bytes(2, "little"))

            # Number of known location infos whose item belongs to another player.
            ap_items_found: int = sum(net_item.player != ctx.slot for net_item in ctx.locations_info.values())
            if client_ap_items_found < ap_items_found:
                snes_buffered_write(ctx, L2AC_TX_ADDR + 4, ap_items_found.to_bytes(2, "little"))

        # RX
        rx_data: Optional[bytes] = await snes_read(ctx, L2AC_RX_ADDR, 4)
        if rx_data is not None:
            # Little-endian 16-bit count of items the game has received so far.
            snes_items_received = int.from_bytes(rx_data[:2], "little")

            if snes_items_received < len(ctx.items_received):
                # Deliver a single item per pass.
                item: NetworkItem = ctx.items_received[snes_items_received]
                item_code: int = item.item - items_start_id
                snes_items_received += 1

                snes_logger.info(
                    "Received %s from %s (%s) (%d/%d in list)",
                    ctx.item_names[item.item],
                    ctx.player_names[item.player],
                    ctx.location_names[item.location],
                    snes_items_received,
                    len(ctx.items_received),
                )
                # The n-th item (1-based) is stored at byte offset 2 * (n + 1); the updated
                # counter is then written back to the start of the RX area.
                snes_buffered_write(ctx, L2AC_RX_ADDR + 2 * (snes_items_received + 1), item_code.to_bytes(2, "little"))
                snes_buffered_write(ctx, L2AC_RX_ADDR, snes_items_received.to_bytes(2, "little"))

        await snes_flush_writes(ctx)

    async def deathlink_kill_player(self, ctx: SNIContext) -> None:
        """Signal an incoming DeathLink to the game.

        Writes 0x01 to the third byte of the DeathLink area when the
        "DeathLink" tag is active and 0x00 otherwise, flushes the write, and
        marks the context's death state as dead.

        Args:
            ctx: The SNI context of the running client.
        """
        from SNIClient import DeathState, snes_buffered_write, snes_flush_writes

        # DeathLink RX
        kill_flag: bytes = b"\x01" if "DeathLink" in ctx.tags else b"\x00"
        snes_buffered_write(ctx, L2AC_DEATH_ADDR + 2, kill_flag)
        await snes_flush_writes(ctx)
        ctx.death_state = DeathState.dead