diff --git a/CommonClient.py b/CommonClient.py index 567a55cb..9a928846 100644 --- a/CommonClient.py +++ b/CommonClient.py @@ -5,6 +5,7 @@ import urllib.parse import sys import os import typing +import time import websockets @@ -135,6 +136,8 @@ class CommonContext(): self.input_queue = asyncio.Queue() self.input_requests = 0 + self.last_death_link: float = time.time() # last send/received death link on AP layer + # game state self.player_names: typing.Dict[int: str] = {0: "Archipelago"} self.exit_event = asyncio.Event() @@ -256,6 +259,20 @@ class CommonContext(): except Exception as e: # safeguard against permissions that may be implemented in the future logger.exception(e) + def on_deathlink(self, data: dict): + """Gets dispatched when a new DeathLink is triggered by another linked player.""" + raise NotImplementedError + + async def send_death(self): + self.last_death_link = time.time() + await self.send_msgs([{ + "cmd": "Bounce", "tags": ["DeathLink"], + "data": { + "time": self.last_death_link, + "source": self.player_names[self.slot] + } + }]) + async def keep_alive(ctx: CommonContext, seconds_between_checks=100): """some ISPs/network configurations drop TCP connections if no payload is sent (ignore TCP-keep-alive) @@ -461,7 +478,9 @@ async def process_server_cmd(ctx: CommonContext, args: dict): logger.warning(f"Invalid Packet of {args['type']}: {args['text']}") elif cmd == "Bounced": - pass + tags = args.get("tags", []) + if "DeathLink" in tags and ctx.last_death_link != args["data"]["time"]: + ctx.on_deathlink(args["data"]) else: logger.debug(f"unknown command {cmd}") diff --git a/FactorioClient.py b/FactorioClient.py index 7d7454f2..916ff327 100644 --- a/FactorioClient.py +++ b/FactorioClient.py @@ -58,7 +58,6 @@ class FactorioContext(CommonContext): self.rcon_client = None self.awaiting_bridge = False self.write_data_path = None - self.last_death_link: float = time.time() # last send/received death link on AP layer self.death_link_tick: int = 0 # last send death link on Factorio layer self.factorio_json_text_parser = FactorioJSONtoTextParser(self) @@ -102,6 +101,10 @@ class FactorioContext(CommonContext): self.rcon_client.send_command(f"/ap-print [font=default-large-bold]Archipelago:[/font] " f"{text}") + def on_deathlink(self, data: dict): + if self.rcon_client: + self.rcon_client.send_command(f"/ap-deathlink {data['source']}") + def on_package(self, cmd: str, args: dict): if cmd in {"Connected", "RoomUpdate"}: # catch up sync anything that is already cleared. @@ -109,12 +112,6 @@ class FactorioContext(CommonContext): self.rcon_client.send_commands({item_name: f'/ap-get-technology ap-{item_name}-\t-1' for item_name in args["checked_locations"]}) - elif cmd == "Bounced": - if self.rcon_client: - tags = args.get("tags", []) - if "DeathLink" in tags and self.last_death_link != args["data"]["time"]: - self.rcon_client.send_command(f"/ap-deathlink {args['data']['source']}") - async def game_watcher(ctx: FactorioContext): bridge_logger = logging.getLogger("FactorioWatcher") @@ -150,14 +147,8 @@ async def game_watcher(ctx: FactorioContext): death_link_tick = data.get("death_link_tick", 0) if death_link_tick != ctx.death_link_tick: ctx.death_link_tick = death_link_tick - ctx.last_death_link = time.time() - await ctx.send_msgs([{ - "cmd": "Bounce", "tags": ["DeathLink"], - "data": { - "time": ctx.last_death_link, - "source": ctx.player_names[ctx.slot] - } - }]) + await ctx.send_death() + await asyncio.sleep(0.1) except Exception as e: diff --git a/LttPClient.py b/LttPClient.py index 79b21534..73227e58 100644 --- a/LttPClient.py +++ b/LttPClient.py @@ -1,5 +1,7 @@ +from __future__ import annotations import argparse import atexit + exit_func = atexit.register(input, "Press enter to close.") import threading import time @@ -12,13 +14,10 @@ import logging import asyncio from json import loads, dumps -from Utils import get_item_name_from_id - - import ModuleUpdate - ModuleUpdate.update() +from Utils import get_item_name_from_id import colorama from NetUtils import * @@ -35,7 +34,10 @@ snes_logger = logging.getLogger("SNES") from MultiServer import mark_raw + class LttPCommandProcessor(ClientCommandProcessor): + ctx: Context + def _cmd_slow_mode(self, toggle: str = ""): """Toggle slow mode, which limits how fast you send / receive items.""" if toggle: @@ -47,17 +49,18 @@ class LttPCommandProcessor(ClientCommandProcessor): @mark_raw def _cmd_snes(self, snes_options: str = "") -> bool: - """Connect to a snes. Optionally include network address of a snes to connect to, otherwise show available devices; and a SNES device number if more than one SNES is detected""" - + """Connect to a snes. Optionally include network address of a snes to connect to, + otherwise show available devices; and a SNES device number if more than one SNES is detected""" + snes_address = self.ctx.snes_address snes_device_number = -1 - + options = snes_options.split() num_options = len(options) - + if num_options > 0: snes_address = options[0] - + if num_options > 1: try: snes_device_number = int(options[1]) @@ -94,6 +97,7 @@ class Context(CommonContext): self.snes_request_lock = asyncio.Lock() self.snes_write_buffer = [] self.snes_connector_lock = threading.Lock() + self.death_state = False # for death link flop behaviour self.awaiting_rom = False self.rom = None @@ -109,7 +113,7 @@ class Context(CommonContext): raise Exception('Invalid ROM detected, ' 'please verify that you have loaded the correct rom and reconnect your snes (/snes)') - async def server_auth(self, password_requested): + async def server_auth(self, password_requested: bool = False): if password_requested and not self.password: await super(Context, self).server_auth(password_requested) if self.rom is None: @@ -121,10 +125,17 @@ class Context(CommonContext): self.auth = self.rom auth = base64.b64encode(self.rom).decode() await self.send_msgs([{"cmd": 'Connect', - 'password': self.password, 'name': auth, 'version': Utils.version_tuple, - 'tags': self.tags, - 'uuid': Utils.get_unique_identifier(), 'game': "A Link to the Past" - }]) + 'password': self.password, 'name': auth, 'version': Utils.version_tuple, + 'tags': self.tags, + 'uuid': Utils.get_unique_identifier(), 'game': "A Link to the Past" + }]) + + def on_deathlink(self, data: dict): + snes_buffered_write(self, WRAM_START+0xF36D, bytes([0])) + snes_buffered_write(self, WRAM_START+0x0373, bytes([8])) + asyncio.create_task(snes_flush_writes(self)) + self.death_state = True + snes_logger.info(f"Received DeathLink from {data['source']}") def color_item(item_id: int, green: bool = False) -> str: @@ -147,6 +158,7 @@ ROMNAME_SIZE = 0x15 INGAME_MODES = {0x07, 0x09, 0x0b} ENDGAME_MODES = {0x19, 0x1a} +DEATH_MODES = {0x12} SAVEDATA_START = WRAM_START + 0xF000 SAVEDATA_SIZE = 0x500 @@ -162,6 +174,8 @@ SCOUTREPLY_ITEM_ADDR = SAVEDATA_START + 0x4D9 # 1 byte SCOUTREPLY_PLAYER_ADDR = SAVEDATA_START + 0x4DA # 1 byte SHOP_ADDR = SAVEDATA_START + 0x302 # 2 bytes +DEATH_LINK_ACTIVE_ADDR = ROM_START + 0x18008D # 1 byte + location_shop_ids = set([info[0] for name, info in Shops.shop_table.items()]) location_table_uw = {"Blind's Hideout - Top": (0x11d, 0x10), @@ -385,7 +399,7 @@ location_table_uw = {"Blind's Hideout - Top": (0x11d, 0x10), 'Ganons Tower - Pre-Moldorm Chest': (0x3d, 0x40), 'Ganons Tower - Validation Chest': (0x4d, 0x10)} -location_table_uw_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_uw.items()} +location_table_uw_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_uw.items()} location_table_npc = {'Mushroom': 0x1000, 'King Zora': 0x2, @@ -401,7 +415,7 @@ location_table_npc = {'Mushroom': 0x1000, 'Stumpy': 0x8, 'Bombos Tablet': 0x200} -location_table_npc_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_npc.items()} +location_table_npc_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_npc.items()} location_table_ow = {'Flute Spot': 0x2a, 'Sunken Treasure': 0x3b, @@ -416,14 +430,15 @@ location_table_ow = {'Flute Spot': 0x2a, 'Bumper Cave Ledge': 0x4a, 'Floating Island': 0x5} -location_table_ow_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_ow.items()} +location_table_ow_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_ow.items()} location_table_misc = {'Bottle Merchant': (0x3c9, 0x2), 'Purple Chest': (0x3c9, 0x10), "Link's Uncle": (0x3c6, 0x1), 'Hobo': (0x3c9, 0x1)} -location_table_misc_id = {Regions.lookup_name_to_id[name] : data for name, data in location_table_misc.items()} +location_table_misc_id = {Regions.lookup_name_to_id[name]: data for name, data in location_table_misc.items()} + class SNESState(enum.IntEnum): SNES_DISCONNECTED = 0 @@ -446,10 +461,11 @@ def launch_sni(ctx: Context): if os.path.isfile(sni_path): snes_logger.info(f"Attempting to start {sni_path}") import subprocess - if Utils.is_frozen(): # if it spawns a visible console, may as well populate it + if Utils.is_frozen(): # if it spawns a visible console, may as well populate it subprocess.Popen(sni_path, cwd=os.path.dirname(sni_path)) else: - subprocess.Popen(sni_path, cwd=os.path.dirname(sni_path), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.Popen(sni_path, cwd=os.path.dirname(sni_path), stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL) else: snes_logger.info( f"Attempt to start SNI was aborted as path {sni_path} was not found, " @@ -500,12 +516,11 @@ async def get_snes_devices(ctx: Context): reply = loads(await socket.recv()) devices = reply['Results'] if 'Results' in reply and len(reply['Results']) > 0 else None - await socket.close() return devices -async def snes_connect(ctx: Context, address, deviceIndex = -1): +async def snes_connect(ctx: Context, address, deviceIndex=-1): global SNES_RECONNECT_DELAY if ctx.snes_socket is not None and ctx.snes_state == SNESState.SNES_CONNECTED: if ctx.rom: @@ -534,7 +549,8 @@ async def snes_connect(ctx: Context, address, deviceIndex = -1): device = devices[ctx.snes_attached_device[0]] elif numDevices > 1: if deviceIndex == -1: - snes_logger.info("Found " + str(numDevices) + " SNES devices; connect to one with /snes