Send AP text into Factorio worlds

This commit is contained in:
Fabian Dill 2021-04-13 14:49:32 +02:00
parent a995627e98
commit ee30914b2c
3 changed files with 51 additions and 13 deletions

View File

@ -129,7 +129,7 @@ class CommonContext():
self.input_requests = 0 self.input_requests = 0
# game state # game state
self.player_names: typing.Dict[int: str] = {} self.player_names: typing.Dict[int: str] = {0: "Server"}
self.exit_event = asyncio.Event() self.exit_event = asyncio.Event()
self.watcher_event = asyncio.Event() self.watcher_event = asyncio.Event()
@ -195,6 +195,7 @@ class CommonContext():
def consume_players_package(self, package: typing.List[tuple]): def consume_players_package(self, package: typing.List[tuple]):
self.player_names = {slot: name for team, slot, name, orig_name in package if self.team == team} self.player_names = {slot: name for team, slot, name, orig_name in package if self.team == team}
self.player_names[0] = "Server"
def event_invalid_slot(self): def event_invalid_slot(self):
raise Exception('Invalid Slot; please verify that you have connected to the correct world.') raise Exception('Invalid Slot; please verify that you have connected to the correct world.')
@ -213,6 +214,14 @@ class CommonContext():
await self.disconnect() await self.disconnect()
self.server_task = asyncio.create_task(server_loop(self, address)) self.server_task = asyncio.create_task(server_loop(self, address))
def on_print(self, args: dict):
logger.info(args["text"])
def on_print_json(self, args: dict):
if not self.found_items and args.get("type", None) == "ItemSend" and args["receiving"] == args["sending"]:
pass # don't want info on other player's local pickups.
logger.info(self.jsontotextparser(args["data"]))
async def server_loop(ctx: CommonContext, address=None): async def server_loop(ctx: CommonContext, address=None):
ui_node = getattr(ctx, "ui_node", None) ui_node = getattr(ctx, "ui_node", None)
@ -394,12 +403,10 @@ async def process_server_cmd(ctx: CommonContext, args: dict):
ctx.hint_points = args['hint_points'] ctx.hint_points = args['hint_points']
elif cmd == 'Print': elif cmd == 'Print':
logger.info(args["text"]) ctx.on_print(args)
elif cmd == 'PrintJSON': elif cmd == 'PrintJSON':
if not ctx.found_items and args.get("type", None) == "ItemSend" and args["receiving"] == args["sending"]: ctx.on_print_json(args)
pass # don't want info on other player's local pickups.
logger.info(ctx.jsontotextparser(args["data"]))
elif cmd == 'InvalidArguments': elif cmd == 'InvalidArguments':
logger.warning(f"Invalid Arguments: {args['text']}") logger.warning(f"Invalid Arguments: {args['text']}")

View File

@ -2,16 +2,18 @@ import os
import logging import logging
import json import json
import string import string
import copy
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import colorama import colorama
import asyncio import asyncio
from queue import Queue, Empty from queue import Queue, Empty
from CommonClient import CommonContext, server_loop, console_loop, ClientCommandProcessor from CommonClient import CommonContext, server_loop, console_loop, ClientCommandProcessor, logger
from MultiServer import mark_raw from MultiServer import mark_raw
import Utils import Utils
import random import random
from NetUtils import RawJSONtoTextParser, NetworkItem
from worlds.factorio.Technologies import lookup_id_to_name from worlds.factorio.Technologies import lookup_id_to_name
@ -61,6 +63,7 @@ class FactorioContext(CommonContext):
super(FactorioContext, self).__init__(*args, **kwargs) super(FactorioContext, self).__init__(*args, **kwargs)
self.send_index = 0 self.send_index = 0
self.rcon_client = None self.rcon_client = None
self.raw_json_text_parser = RawJSONtoTextParser(self)
async def server_auth(self, password_requested): async def server_auth(self, password_requested):
if password_requested and not self.password: if password_requested and not self.password:
@ -75,6 +78,18 @@ class FactorioContext(CommonContext):
'uuid': Utils.get_unique_identifier(), 'game': "Factorio" 'uuid': Utils.get_unique_identifier(), 'game': "Factorio"
}]) }])
def on_print(self, args: dict):
logger.info(args["text"])
if self.rcon_client:
self.rcon_client.send_command(f"Archipelago: {args['text']}")
def on_print_json(self, args: dict):
if not self.found_items and args.get("type", None) == "ItemSend" and args["receiving"] == args["sending"]:
pass # don't want info on other player's local pickups.
copy_data = copy.deepcopy(args["data"]) # jsontotextparser is destructive currently
logger.info(self.jsontotextparser(args["data"]))
if self.rcon_client:
self.rcon_client.send_command(f"Archipelago: {self.raw_json_text_parser(copy_data)}")
async def game_watcher(ctx: FactorioContext): async def game_watcher(ctx: FactorioContext):
bridge_logger = logging.getLogger("FactorioWatcher") bridge_logger = logging.getLogger("FactorioWatcher")
@ -146,8 +161,9 @@ async def factorio_server_watcher(ctx: FactorioContext):
ctx.rcon_client.send_command("/sc game.print('Starting Archipelago Bridge')") ctx.rcon_client.send_command("/sc game.print('Starting Archipelago Bridge')")
if ctx.rcon_client: if ctx.rcon_client:
while ctx.send_index < len(ctx.items_received): while ctx.send_index < len(ctx.items_received):
item_id = ctx.items_received[ctx.send_index].item transfer_item: NetworkItem = ctx.items_received[ctx.send_index]
player_name = ctx.player_names[ctx.send_index].player item_id = transfer_item.item
player_name = ctx.player_names[transfer_item.player]
if item_id not in lookup_id_to_name: if item_id not in lookup_id_to_name:
logging.error(f"Cannot send unknown item ID: {item_id}") logging.error(f"Cannot send unknown item ID: {item_id}")
else: else:

View File

@ -9,6 +9,7 @@ import websockets
from Utils import Version from Utils import Version
class JSONMessagePart(typing.TypedDict, total=False): class JSONMessagePart(typing.TypedDict, total=False):
text: str text: str
# optional # optional
@ -18,7 +19,6 @@ class JSONMessagePart(typing.TypedDict, total=False):
found: bool found: bool
class ClientStatus(enum.IntEnum): class ClientStatus(enum.IntEnum):
CLIENT_UNKNOWN = 0 CLIENT_UNKNOWN = 0
CLIENT_CONNECTED = 5 CLIENT_CONNECTED = 5
@ -61,10 +61,12 @@ _encode = JSONEncoder(
def encode(obj): def encode(obj):
return _encode(_scan_for_TypedTuples(obj)) return _encode(_scan_for_TypedTuples(obj))
def get_any_version(data: dict) -> Version: def get_any_version(data: dict) -> Version:
data = {key.lower(): value for key, value in data.items()} # .NET version classes have capitalized keys data = {key.lower(): value for key, value in data.items()} # .NET version classes have capitalized keys
return Version(int(data["major"]), int(data["minor"]), int(data["build"])) return Version(int(data["major"]), int(data["minor"]), int(data["build"]))
whitelist = {"NetworkPlayer": NetworkPlayer, whitelist = {"NetworkPlayer": NetworkPlayer,
"NetworkItem": NetworkItem, "NetworkItem": NetworkItem,
} }
@ -73,6 +75,7 @@ custom_hooks = {
"Version": get_any_version "Version": get_any_version
} }
def _object_hook(o: typing.Any) -> typing.Any: def _object_hook(o: typing.Any) -> typing.Any:
if isinstance(o, dict): if isinstance(o, dict):
hook = custom_hooks.get(o.get("class", None), None) hook = custom_hooks.get(o.get("class", None), None)
@ -82,7 +85,7 @@ def _object_hook(o: typing.Any) -> typing.Any:
if cls: if cls:
for key in tuple(o): for key in tuple(o):
if key not in cls._fields: if key not in cls._fields:
del(o[key]) del (o[key])
return cls(**o) return cls(**o)
return o return o
@ -151,11 +154,16 @@ class HandlerMeta(type):
handlers = attrs["handlers"] = {} handlers = attrs["handlers"] = {}
trigger: str = "_handle_" trigger: str = "_handle_"
for base in bases: for base in bases:
handlers.update(base.commands) handlers.update(base.handlers)
handlers.update({handler_name[len(trigger):]: method for handler_name, method in attrs.items() if handlers.update({handler_name[len(trigger):]: method for handler_name, method in attrs.items() if
handler_name.startswith(trigger)}) handler_name.startswith(trigger)})
orig_init = attrs.get('__init__', None) orig_init = attrs.get('__init__', None)
if not orig_init:
for base in bases:
orig_init = getattr(base, '__init__', None)
if orig_init:
break
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# turn functions into bound methods # turn functions into bound methods
@ -167,6 +175,7 @@ class HandlerMeta(type):
attrs['__init__'] = __init__ attrs['__init__'] = __init__
return super(HandlerMeta, mcs).__new__(mcs, name, bases, attrs) return super(HandlerMeta, mcs).__new__(mcs, name, bases, attrs)
class JSONTypes(str, enum.Enum): class JSONTypes(str, enum.Enum):
color = "color" color = "color"
text = "text" text = "text"
@ -178,6 +187,7 @@ class JSONTypes(str, enum.Enum):
location_id = "location_id" location_id = "location_id"
entrance_name = "entrance_name" entrance_name = "entrance_name"
class JSONtoTextParser(metaclass=HandlerMeta): class JSONtoTextParser(metaclass=HandlerMeta):
def __init__(self, ctx): def __init__(self, ctx):
self.ctx = ctx self.ctx = ctx
@ -236,6 +246,11 @@ class JSONtoTextParser(metaclass=HandlerMeta):
return self._handle_color(node) return self._handle_color(node)
class RawJSONtoTextParser(JSONtoTextParser):
def _handle_color(self, node: JSONMessagePart):
return self._handle_text(node)
color_codes = {'reset': 0, 'bold': 1, 'underline': 4, 'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34, color_codes = {'reset': 0, 'bold': 1, 'underline': 4, 'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34,
'magenta': 35, 'cyan': 36, 'white': 37, 'black_bg': 40, 'red_bg': 41, 'green_bg': 42, 'yellow_bg': 43, 'magenta': 35, 'cyan': 36, 'white': 37, 'black_bg': 40, 'red_bg': 41, 'green_bg': 42, 'yellow_bg': 43,
'blue_bg': 44, 'purple_bg': 45, 'cyan_bg': 46, 'white_bg': 47} 'blue_bg': 44, 'purple_bg': 45, 'cyan_bg': 46, 'white_bg': 47}
@ -281,7 +296,7 @@ class Hint(typing.NamedTuple):
add_json_text(parts, " is at ") add_json_text(parts, " is at ")
add_json_text(parts, self.location, type="location_id") add_json_text(parts, self.location, type="location_id")
add_json_text(parts, " in ") add_json_text(parts, " in ")
add_json_text(parts, self.finding_player, type ="player_id") add_json_text(parts, self.finding_player, type="player_id")
if self.entrance: if self.entrance:
add_json_text(parts, "'s World at ") add_json_text(parts, "'s World at ")
add_json_text(parts, self.entrance, type="entrance_name") add_json_text(parts, self.entrance, type="entrance_name")
@ -292,4 +307,4 @@ class Hint(typing.NamedTuple):
else: else:
add_json_text(parts, ".") add_json_text(parts, ".")
return {"cmd": "PrintJSON", "data": parts, "type": "hint"} return {"cmd": "PrintJSON", "data": parts, "type": "hint"}