Implement PrintJSON
Remove deprecated network packets Make Client send new locations checked only once Simplify register_location_checks Rip out Server savegame compatibility
This commit is contained in:
parent
07b9fcfefc
commit
7871555620
|
@ -932,58 +932,9 @@ async def process_server_cmd(ctx: Context, args: dict):
|
|||
elif cmd == 'LocationInfo':
|
||||
for item, location, player in args['locations']:
|
||||
if location not in ctx.locations_info:
|
||||
replacements = {0xA2: 'Small Key', 0x9D: 'Big Key', 0x8D: 'Compass', 0x7D: 'Map'}
|
||||
item_name = replacements.get(item, ctx.item_name_getter(item))
|
||||
logger.info(
|
||||
f"Saw {color(item_name, 'red', 'bold')} at {list(Regions.location_table.keys())[location - 1]}")
|
||||
ctx.locations_info[location] = (item, player)
|
||||
ctx.watcher_event.set()
|
||||
|
||||
elif cmd == 'ItemSent': # going away
|
||||
found = NetworkItem(*args["item"])
|
||||
receiving_player = args["receiver"]
|
||||
ctx.ui_node.notify_item_sent(ctx.player_names[found.player], ctx.player_names[receiving_player],
|
||||
ctx.item_name_getter(found.item), ctx.location_name_getter(found.location),
|
||||
found.player == ctx.slot, receiving_player == ctx.slot,
|
||||
ctx.item_name_getter(found.item) in Items.progression_items)
|
||||
item = color(ctx.item_name_getter(found.item), 'cyan' if found.player != ctx.slot else 'green')
|
||||
found_player = color(ctx.player_names[found.player], 'yellow' if found.player != ctx.slot else 'magenta')
|
||||
receiving_player = color(ctx.player_names[receiving_player],
|
||||
'yellow' if receiving_player != ctx.slot else 'magenta')
|
||||
logging.info(
|
||||
'%s sent %s to %s (%s)' % (found_player, item, receiving_player,
|
||||
color(ctx.location_name_getter(found.location), 'blue_bg', 'white')))
|
||||
|
||||
elif cmd == 'ItemFound': # going away
|
||||
found = NetworkItem(*args["item"])
|
||||
ctx.ui_node.notify_item_found(ctx.player_names[found.player], ctx.item_name_getter(found.item),
|
||||
ctx.location_name_getter(found.location), found.player == ctx.slot,
|
||||
ctx.item_name_getter(found.item) in Items.progression_items)
|
||||
item = color_item(found.item, found.player == ctx.slot)
|
||||
player_sent = color(ctx.player_names[found.player], 'yellow' if found.player != ctx.slot else 'magenta')
|
||||
logging.info('%s found %s (%s)' % (player_sent, item, color(ctx.location_name_getter(found.location),
|
||||
'blue_bg', 'white')))
|
||||
|
||||
elif cmd == 'Hint': # going away
|
||||
hints = [Utils.Hint(*hint) for hint in args["hints"]]
|
||||
for hint in hints:
|
||||
ctx.ui_node.send_hint(ctx.player_names[hint.finding_player], ctx.player_names[hint.receiving_player],
|
||||
ctx.item_name_getter(hint.item), ctx.location_name_getter(hint.location),
|
||||
hint.found, hint.finding_player == ctx.slot, hint.receiving_player == ctx.slot,
|
||||
hint.entrance if hint.entrance else None)
|
||||
item = color_item(hint.item, hint.found)
|
||||
player_find = color(ctx.player_names[hint.finding_player],
|
||||
'yellow' if hint.finding_player != ctx.slot else 'magenta')
|
||||
player_recvd = color(ctx.player_names[hint.receiving_player],
|
||||
'yellow' if hint.receiving_player != ctx.slot else 'magenta')
|
||||
|
||||
text = f"[Hint]: {player_recvd}'s {item} is " \
|
||||
f"at {color(ctx.location_name_getter(hint.location), 'blue_bg', 'white')} " \
|
||||
f"in {player_find}'s World"
|
||||
if hint.entrance:
|
||||
text += " at " + color(hint.entrance, 'white_bg', 'black')
|
||||
logging.info(text + (f". {color('(found)', 'green_bg', 'black')} " if hint.found else "."))
|
||||
|
||||
elif cmd == "RoomUpdate":
|
||||
if "players" in args:
|
||||
ctx.consume_players_package(args["players"])
|
||||
|
@ -994,7 +945,7 @@ async def process_server_cmd(ctx: Context, args: dict):
|
|||
logger.info(args["text"])
|
||||
|
||||
elif cmd == 'PrintJSON':
|
||||
logger.info(ctx.jsontotextparser(args["data"]))
|
||||
logger.info(ctx.jsontotextparser(args["text"]))
|
||||
|
||||
elif cmd == 'InvalidArguments':
|
||||
logger.warning(f"Invalid Arguments: {args['text']}")
|
||||
|
@ -1163,6 +1114,7 @@ async def track_locations(ctx: Context, roomid, roomdata):
|
|||
new_locations = []
|
||||
|
||||
def new_check(location):
|
||||
new_locations.append(Regions.lookup_name_to_id.get(location, Shops.shop_table_by_location.get(location, -1)))
|
||||
ctx.locations_checked.add(location)
|
||||
|
||||
check = None
|
||||
|
@ -1227,7 +1179,7 @@ async def track_locations(ctx: Context, roomid, roomdata):
|
|||
if ow_data[screenid - ow_begin] & 0x40 != 0:
|
||||
new_check(location)
|
||||
|
||||
if not all([location in ctx.locations_checked for location in location_table_npc.keys()]):
|
||||
if not all(location in ctx.locations_checked for location in location_table_npc.keys()):
|
||||
npc_data = await snes_read(ctx, SAVEDATA_START + 0x410, 2)
|
||||
if npc_data is not None:
|
||||
npc_value = npc_data[0] | (npc_data[1] << 8)
|
||||
|
@ -1235,7 +1187,7 @@ async def track_locations(ctx: Context, roomid, roomdata):
|
|||
if npc_value & mask != 0 and location not in ctx.locations_checked:
|
||||
new_check(location)
|
||||
|
||||
if not all([location in ctx.locations_checked for location in location_table_misc.keys()]):
|
||||
if not all(location in ctx.locations_checked for location in location_table_misc.keys()):
|
||||
misc_data = await snes_read(ctx, SAVEDATA_START + 0x3c6, 4)
|
||||
if misc_data is not None:
|
||||
for location, (offset, mask) in location_table_misc.items():
|
||||
|
@ -1243,13 +1195,6 @@ async def track_locations(ctx: Context, roomid, roomdata):
|
|||
if misc_data[offset - 0x3c6] & mask != 0 and location not in ctx.locations_checked:
|
||||
new_check(location)
|
||||
|
||||
for location in ctx.locations_checked:
|
||||
try:
|
||||
my_id = Regions.lookup_name_to_id.get(location, Shops.shop_table_by_location.get(location, -1))
|
||||
new_locations.append(my_id)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
logger.info(f"Exception: {e}")
|
||||
|
||||
if new_locations:
|
||||
await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": new_locations}])
|
||||
|
|
158
MultiServer.py
158
MultiServer.py
|
@ -13,9 +13,9 @@ import datetime
|
|||
import threading
|
||||
import random
|
||||
import pickle
|
||||
from json import loads, dumps
|
||||
|
||||
import ModuleUpdate
|
||||
import NetUtils
|
||||
|
||||
ModuleUpdate.update()
|
||||
|
||||
|
@ -92,7 +92,7 @@ class Context(Node):
|
|||
self.hint_cost = hint_cost
|
||||
self.location_check_points = location_check_points
|
||||
self.hints_used = collections.defaultdict(int)
|
||||
self.hints: typing.Dict[typing.Tuple[int, int], typing.Set[Utils.Hint]] = collections.defaultdict(set)
|
||||
self.hints: typing.Dict[typing.Tuple[int, int], typing.Set[NetUtils.Hint]] = collections.defaultdict(set)
|
||||
self.forfeit_mode: str = forfeit_mode
|
||||
self.remaining_mode: str = remaining_mode
|
||||
self.item_cheat = item_cheat
|
||||
|
@ -245,50 +245,21 @@ class Context(Node):
|
|||
|
||||
def set_save(self, savedata: dict):
|
||||
rom_names = savedata["rom_names"] # convert from TrackerList to List in case of ponyorm
|
||||
try:
|
||||
adjusted = {rom: (team, slot) for rom, (team, slot) in rom_names}
|
||||
except TypeError:
|
||||
adjusted = {tuple(rom): (team, slot) for (rom, (team, slot)) in rom_names} # old format, ponyorm friendly
|
||||
if self.connect_names != adjusted:
|
||||
logging.warning('Save file mismatch, will start a new game')
|
||||
return
|
||||
else:
|
||||
if adjusted != self.connect_names:
|
||||
logging.warning('Save file mismatch, will start a new game')
|
||||
return
|
||||
|
||||
received_items = {tuple(k): [NetworkItem(*i) for i in v] for k, v in savedata["received_items"]}
|
||||
|
||||
self.received_items = received_items
|
||||
self.hints_used.update({tuple(key): value for key, value in savedata["hints_used"]})
|
||||
if "hints" in savedata:
|
||||
self.hints.update(
|
||||
{tuple(key): set(Utils.Hint(*hint) for hint in value) for key, value in savedata["hints"]})
|
||||
else: # backwards compatiblity for <= 2.0.2
|
||||
old_hints = {tuple(key): set(value) for key, value in savedata["hints_sent"]}
|
||||
for team_slot, item_or_location_s in old_hints.items():
|
||||
team, slot = team_slot
|
||||
for item_or_location in item_or_location_s:
|
||||
if item_or_location in Items.item_table:
|
||||
hints = collect_hints(self, team, slot, item_or_location)
|
||||
else:
|
||||
hints = collect_hints_location(self, team, slot, item_or_location)
|
||||
for hint in hints:
|
||||
self.hints[team, hint.receiving_player].add(hint)
|
||||
# even if it is the same hint, it won't be duped due to set
|
||||
self.hints[team, hint.finding_player].add(hint)
|
||||
if "name_aliases" in savedata:
|
||||
self.name_aliases.update({tuple(key): value for key, value in savedata["name_aliases"]})
|
||||
if "client_game_state" in savedata:
|
||||
self.client_game_state.update({tuple(key): value for key, value in savedata["client_game_state"]})
|
||||
if "client_activity_timers" in savedata:
|
||||
self.client_connection_timers.update(
|
||||
{tuple(key): datetime.datetime.fromtimestamp(value, datetime.timezone.utc) for key, value
|
||||
in savedata["client_connection_timers"]})
|
||||
self.client_activity_timers.update(
|
||||
{tuple(key): datetime.datetime.fromtimestamp(value, datetime.timezone.utc) for key, value
|
||||
in savedata["client_activity_timers"]})
|
||||
self.hints.update(
|
||||
{tuple(key): set(NetUtils.Hint(*hint) for hint in value) for key, value in savedata["hints"]})
|
||||
|
||||
self.name_aliases.update({tuple(key): value for key, value in savedata["name_aliases"]})
|
||||
self.client_game_state.update({tuple(key): value for key, value in savedata["client_game_state"]})
|
||||
self.client_connection_timers.update(
|
||||
{tuple(key): datetime.datetime.fromtimestamp(value, datetime.timezone.utc) for key, value
|
||||
in savedata["client_connection_timers"]})
|
||||
self.client_activity_timers.update(
|
||||
{tuple(key): datetime.datetime.fromtimestamp(value, datetime.timezone.utc) for key, value
|
||||
in savedata["client_activity_timers"]})
|
||||
self.location_checks.update({tuple(key): set(value) for key, value in savedata["location_checks"]})
|
||||
|
||||
logging.info(f'Loaded save file with {sum([len(p) for p in received_items.values()])} received items '
|
||||
|
@ -316,9 +287,10 @@ class Context(Node):
|
|||
asyncio.create_task(self.send_msgs(client, [{"cmd": "Print", "text": text} for text in texts]))
|
||||
|
||||
def broadcast_team(self, team, msgs):
|
||||
msgs = self.dumper(msgs)
|
||||
for client in self.endpoints:
|
||||
if client.auth and client.team == team:
|
||||
asyncio.create_task(self.send_msgs(client, msgs))
|
||||
asyncio.create_task(self.send_encoded_msgs(client, msgs))
|
||||
|
||||
def broadcast_all(self, msgs):
|
||||
msgs = self.dumper(msgs)
|
||||
|
@ -331,15 +303,17 @@ class Context(Node):
|
|||
await on_client_disconnected(self, endpoint)
|
||||
|
||||
|
||||
# separated out, due to compatibility between clients
|
||||
def notify_hints(ctx: Context, team: int, hints: typing.List[Utils.Hint]):
|
||||
def notify_hints(ctx: Context, team: int, hints: typing.List[NetUtils.Hint]):
|
||||
cmd = ctx.dumper([{"cmd": "Hint", "hints" : hints}])
|
||||
texts = ([format_hint(ctx, team, hint)] for hint in hints)
|
||||
for text in texts:
|
||||
commands = ctx.dumper([hint.as_network_message() for hint in hints])
|
||||
|
||||
for text in (format_hint(ctx, team, hint) for hint in hints):
|
||||
logging.info("Notice (Team #%d): %s" % (team + 1, text))
|
||||
|
||||
for client in ctx.endpoints:
|
||||
if client.auth and client.team == team:
|
||||
asyncio.create_task(ctx.send_encoded_msgs(client, cmd))
|
||||
asyncio.create_task(ctx.send_encoded_msgs(client, commands))
|
||||
|
||||
|
||||
def update_aliases(ctx: Context, team: int, client: typing.Optional[Client] = None):
|
||||
|
@ -449,15 +423,14 @@ def tuplize_received_items(items):
|
|||
|
||||
def send_new_items(ctx: Context):
|
||||
for client in ctx.endpoints:
|
||||
if not client.auth:
|
||||
continue
|
||||
items = get_received_items(ctx, client.team, client.slot)
|
||||
if len(items) > client.send_index:
|
||||
asyncio.create_task(ctx.send_msgs(client, [{
|
||||
"cmd": "ReceivedItems",
|
||||
"index": client.send_index,
|
||||
"items": tuplize_received_items(items)[client.send_index:]}]))
|
||||
client.send_index = len(items)
|
||||
if client.auth: # can't send to disconnected client
|
||||
items = get_received_items(ctx, client.team, client.slot)
|
||||
if len(items) > client.send_index:
|
||||
asyncio.create_task(ctx.send_msgs(client, [{
|
||||
"cmd": "ReceivedItems",
|
||||
"index": client.send_index,
|
||||
"items": tuplize_received_items(items)[client.send_index:]}]))
|
||||
client.send_index = len(items)
|
||||
|
||||
|
||||
def forfeit_player(ctx: Context, team: int, slot: int):
|
||||
|
@ -476,50 +449,29 @@ def get_remaining(ctx: Context, team: int, slot: int) -> typing.List[int]:
|
|||
|
||||
|
||||
def register_location_checks(ctx: Context, team: int, slot: int, locations: typing.Iterable[int]):
|
||||
found_items = False
|
||||
new_locations = set(locations) - ctx.location_checks[team, slot]
|
||||
known_locations = set()
|
||||
if new_locations:
|
||||
ctx.client_activity_timers[team, slot] = datetime.datetime.now(datetime.timezone.utc)
|
||||
for location in new_locations:
|
||||
if (location, slot) in ctx.locations:
|
||||
known_locations.add(location)
|
||||
target_item, target_player = ctx.locations[(location, slot)]
|
||||
item_id, target_player = ctx.locations[(location, slot)]
|
||||
new_item = NetworkItem(item_id, location, slot)
|
||||
if target_player != slot or slot in ctx.remote_items:
|
||||
found: bool = False
|
||||
recvd_items = get_received_items(ctx, team, target_player)
|
||||
for recvd_item in recvd_items:
|
||||
if recvd_item.location == location and recvd_item.player == slot:
|
||||
found = True
|
||||
break
|
||||
get_received_items(ctx, team, target_player).append(new_item)
|
||||
|
||||
if not found:
|
||||
new_item = NetworkItem(target_item, location, slot)
|
||||
recvd_items.append(new_item)
|
||||
if slot != target_player:
|
||||
ctx.broadcast_team(team,
|
||||
[{"cmd": "ItemSent",
|
||||
"item": new_item,
|
||||
"receiver" : target_player}])
|
||||
logging.info('(Team #%d) %s sent %s to %s (%s)' % (
|
||||
team + 1, ctx.player_names[(team, slot)], get_item_name_from_id(target_item),
|
||||
ctx.player_names[(team, target_player)], get_location_name_from_address(location)))
|
||||
found_items = True
|
||||
elif target_player == slot: # local pickup, notify clients of the pickup
|
||||
if location not in ctx.location_checks[team, slot]:
|
||||
for client in ctx.endpoints:
|
||||
if client.team == team and client.wants_item_notification:
|
||||
asyncio.create_task(
|
||||
ctx.send_msgs(client, [{"cmd": "ItemFound",
|
||||
"item": NetworkItem(target_item, location, slot)}]))
|
||||
ctx.location_checks[team, slot] |= known_locations
|
||||
logging.info('(Team #%d) %s sent %s to %s (%s)' % (
|
||||
team + 1, ctx.player_names[(team, slot)], get_item_name_from_id(item_id),
|
||||
ctx.player_names[(team, target_player)], get_location_name_from_address(location)))
|
||||
info_text = json_format_send_event(new_item, target_player)
|
||||
ctx.broadcast_team(team, [info_text])
|
||||
|
||||
ctx.location_checks[team, slot] |= new_locations
|
||||
send_new_items(ctx)
|
||||
for client in ctx.endpoints:
|
||||
if client.team == team and client.slot == slot:
|
||||
asyncio.create_task(ctx.send_msgs(client, [{"cmd": "RoomUpdate",
|
||||
"hint_points": get_client_points(ctx, client)}]))
|
||||
|
||||
if found_items:
|
||||
for client in ctx.endpoints:
|
||||
if client.team == team and client.slot == slot:
|
||||
asyncio.create_task(ctx.send_msgs(client, [{"cmd": "RoomUpdate",
|
||||
"hint_points": get_client_points(ctx, client)}]))
|
||||
ctx.save()
|
||||
|
||||
|
||||
|
@ -529,7 +481,7 @@ def notify_team(ctx: Context, team: int, text: str):
|
|||
|
||||
|
||||
|
||||
def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[Utils.Hint]:
|
||||
def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[NetUtils.Hint]:
|
||||
hints = []
|
||||
seeked_item_id = Items.item_table[item][2]
|
||||
for check, result in ctx.locations.items():
|
||||
|
@ -538,12 +490,12 @@ def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[
|
|||
location_id, finding_player = check
|
||||
found = location_id in ctx.location_checks[team, finding_player]
|
||||
entrance = ctx.er_hint_data.get(finding_player, {}).get(location_id, "")
|
||||
hints.append(Utils.Hint(receiving_player, finding_player, location_id, item_id, found, entrance))
|
||||
hints.append(NetUtils.Hint(receiving_player, finding_player, location_id, item_id, found, entrance))
|
||||
|
||||
return hints
|
||||
|
||||
|
||||
def collect_hints_location(ctx: Context, team: int, slot: int, location: str) -> typing.List[Utils.Hint]:
|
||||
def collect_hints_location(ctx: Context, team: int, slot: int, location: str) -> typing.List[NetUtils.Hint]:
|
||||
hints = []
|
||||
seeked_location = Regions.lookup_name_to_id[location]
|
||||
for check, result in ctx.locations.items():
|
||||
|
@ -552,12 +504,12 @@ def collect_hints_location(ctx: Context, team: int, slot: int, location: str) ->
|
|||
item_id, receiving_player = result
|
||||
found = location_id in ctx.location_checks[team, finding_player]
|
||||
entrance = ctx.er_hint_data.get(finding_player, {}).get(location_id, "")
|
||||
hints.append(Utils.Hint(receiving_player, finding_player, location_id, item_id, found, entrance))
|
||||
hints.append(NetUtils.Hint(receiving_player, finding_player, location_id, item_id, found, entrance))
|
||||
break # each location has 1 item
|
||||
return hints
|
||||
|
||||
|
||||
def format_hint(ctx: Context, team: int, hint: Utils.Hint) -> str:
|
||||
def format_hint(ctx: Context, team: int, hint: NetUtils.Hint) -> str:
|
||||
text = f"[Hint]: {ctx.player_names[team, hint.receiving_player]}'s " \
|
||||
f"{Items.lookup_id_to_name[hint.item]} is " \
|
||||
f"at {get_location_name_from_address(hint.location)} " \
|
||||
|
@ -567,6 +519,19 @@ def format_hint(ctx: Context, team: int, hint: Utils.Hint) -> str:
|
|||
text += f" at {hint.entrance}"
|
||||
return text + (". (found)" if hint.found else ".")
|
||||
|
||||
def json_format_send_event(net_item: NetworkItem, receiving_player: int):
|
||||
parts = []
|
||||
NetUtils.add_json_text(parts, net_item.player, type=NetUtils.JSONTypes.player_id)
|
||||
NetUtils.add_json_text(parts, " sent ")
|
||||
NetUtils.add_json_text(parts, net_item.item, type=NetUtils.JSONTypes.item_id)
|
||||
NetUtils.add_json_text(parts, " to ")
|
||||
NetUtils.add_json_text(parts, receiving_player, type=NetUtils.JSONTypes.player_id)
|
||||
NetUtils.add_json_text(parts, " (")
|
||||
NetUtils.add_json_text(parts, net_item.location, type=NetUtils.JSONTypes.location_id)
|
||||
NetUtils.add_json_text(parts, ")")
|
||||
|
||||
return {"cmd": "PrintJSON", "text": parts, "type": "ItemSend",
|
||||
"receiving": receiving_player, "sending": net_item.player}
|
||||
|
||||
def get_intended_text(input_text: str, possible_answers: typing.Iterable[str]= console_names) -> typing.Tuple[str, bool, str]:
|
||||
picks = fuzzy_process.extract(input_text, possible_answers, limit=2)
|
||||
|
@ -824,9 +789,8 @@ class ClientMessageProcessor(CommonCommandProcessor):
|
|||
"""List all missing location checks from the server's perspective"""
|
||||
|
||||
locations = get_missing_checks(self.ctx, self.client)
|
||||
checked_locations = get_checked_checks(self.ctx, self.client)
|
||||
|
||||
if len(locations) > 0:
|
||||
if locations:
|
||||
texts = [f'Missing: {location}\n' for location in locations]
|
||||
texts.append(f"Found {len(locations)} missing location checks")
|
||||
self.ctx.notify_client_multiple(self.client, texts)
|
||||
|
|
107
NetUtils.py
107
NetUtils.py
|
@ -9,10 +9,15 @@ import websockets
|
|||
|
||||
from Utils import Version
|
||||
|
||||
class JSONMessagePart(typing.TypedDict):
|
||||
type: typing.Optional[str]
|
||||
color: typing.Optional[str]
|
||||
text: typing.Optional[str]
|
||||
class JSONMessagePart(typing.TypedDict, total=False):
|
||||
text: str
|
||||
# optional
|
||||
type: str
|
||||
color: str
|
||||
# mainly for items, optional
|
||||
found: bool
|
||||
|
||||
|
||||
|
||||
class CLientStatus(enum.IntEnum):
|
||||
CLIENT_UNKNOWN = 0
|
||||
|
@ -144,6 +149,16 @@ class HandlerMeta(type):
|
|||
attrs['__init__'] = __init__
|
||||
return super(HandlerMeta, mcs).__new__(mcs, name, bases, attrs)
|
||||
|
||||
class JSONTypes(str, enum.Enum):
|
||||
color = "color"
|
||||
text = "text"
|
||||
player_id = "player_id"
|
||||
player_name = "player_name"
|
||||
item_name = "item_name"
|
||||
item_id = "item_id"
|
||||
location_name = "location_name"
|
||||
location_id = "location_id"
|
||||
entrance_name = "entrance_name"
|
||||
|
||||
class JSONtoTextParser(metaclass=HandlerMeta):
|
||||
def __init__(self, ctx):
|
||||
|
@ -158,18 +173,16 @@ class JSONtoTextParser(metaclass=HandlerMeta):
|
|||
return handler(node)
|
||||
|
||||
def _handle_color(self, node: JSONMessagePart):
|
||||
if node["color"] in color_codes:
|
||||
return color_code(node["color"]) + self._handle_text(node) + color_code("reset")
|
||||
else:
|
||||
logging.warning(f"Unknown color in node {node}")
|
||||
return self._handle_text(node)
|
||||
codes = node["color"].split(";")
|
||||
buffer = "".join(color_code(code) for code in codes)
|
||||
return buffer + self._handle_text(node) + color_code("reset")
|
||||
|
||||
def _handle_text(self, node: JSONMessagePart):
|
||||
return node.get("text", "")
|
||||
|
||||
def _handle_player_id(self, node: JSONMessagePart):
|
||||
player = node["text"]
|
||||
node["color"] = 'yellow' if player != self.ctx.slot else 'magenta'
|
||||
player = int(node["text"])
|
||||
node["color"] = 'magenta' if player == self.ctx.slot else 'yellow'
|
||||
node["text"] = self.ctx.player_names[player]
|
||||
return self._handle_color(node)
|
||||
|
||||
|
@ -178,6 +191,32 @@ class JSONtoTextParser(metaclass=HandlerMeta):
|
|||
node["color"] = 'yellow'
|
||||
return self._handle_color(node)
|
||||
|
||||
def _handle_item_name(self, node: JSONMessagePart):
|
||||
# todo: use a better info source
|
||||
from worlds.alttp.Items import progression_items
|
||||
node["color"] = 'green' if node.get("found", False) else 'cyan'
|
||||
if node["text"] in progression_items:
|
||||
node["color"] += ";white_bg"
|
||||
return self._handle_color(node)
|
||||
|
||||
def _handle_item_id(self, node: JSONMessagePart):
|
||||
item_id = int(node["text"])
|
||||
node["text"] = self.ctx.item_name_getter(item_id)
|
||||
return self._handle_item_name(node)
|
||||
|
||||
def _handle_location_name(self, node: JSONMessagePart):
|
||||
node["color"] = 'blue_bg;white'
|
||||
return self._handle_color(node)
|
||||
|
||||
def _handle_location_id(self, node: JSONMessagePart):
|
||||
item_id = int(node["text"])
|
||||
node["text"] = self.ctx.location_name_getter(item_id)
|
||||
return self._handle_item_name(node)
|
||||
|
||||
def _handle_entrance_name(self, node: JSONMessagePart):
|
||||
node["color"] = 'white_bg;black'
|
||||
return self._handle_color(node)
|
||||
|
||||
|
||||
color_codes = {'reset': 0, 'bold': 1, 'underline': 4, 'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34,
|
||||
'magenta': 35, 'cyan': 36, 'white': 37, 'black_bg': 40, 'red_bg': 41, 'green_bg': 42, 'yellow_bg': 43,
|
||||
|
@ -190,3 +229,49 @@ def color_code(*args):
|
|||
|
||||
def color(text, *args):
|
||||
return color_code(*args) + text + color_code('reset')
|
||||
|
||||
|
||||
def add_json_text(parts: list, text: typing.Any, **kwargs) -> None:
|
||||
parts.append({"text": str(text), **kwargs})
|
||||
|
||||
|
||||
class Hint(typing.NamedTuple):
|
||||
receiving_player: int
|
||||
finding_player: int
|
||||
location: int
|
||||
item: int
|
||||
found: bool
|
||||
entrance: str = ""
|
||||
|
||||
def re_check(self, ctx, team) -> Hint:
|
||||
if self.found:
|
||||
return self
|
||||
found = self.location in ctx.location_checks[team, self.finding_player]
|
||||
if found:
|
||||
return Hint(self.receiving_player, self.finding_player, self.location, self.item, found, self.entrance)
|
||||
return self
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.receiving_player, self.finding_player, self.location, self.item, self.entrance))
|
||||
|
||||
def as_network_message(self) -> dict:
|
||||
parts = []
|
||||
add_json_text(parts, "[Hint]: ")
|
||||
add_json_text(parts, self.receiving_player, type="player_id")
|
||||
add_json_text(parts, "'s ")
|
||||
add_json_text(parts, self.item, type="item_id", found=self.found)
|
||||
add_json_text(parts, " is at ")
|
||||
add_json_text(parts, self.location, type="location_id")
|
||||
add_json_text(parts, " in ")
|
||||
add_json_text(parts, self.finding_player, type ="player_id")
|
||||
if self.entrance:
|
||||
add_json_text(parts, "'s World at ")
|
||||
add_json_text(parts, self.entrance, type="entrance_name")
|
||||
else:
|
||||
add_json_text(parts, "'s World")
|
||||
if self.found:
|
||||
add_json_text(parts, ". (found)")
|
||||
else:
|
||||
add_json_text(parts, ".")
|
||||
|
||||
return {"cmd": "PrintJSON", "text": parts, "type": "hint"}
|
23
Utils.py
23
Utils.py
|
@ -125,29 +125,6 @@ parse_yaml = safe_load
|
|||
unsafe_parse_yaml = functools.partial(load, Loader=Loader)
|
||||
|
||||
|
||||
class Hint(typing.NamedTuple):
|
||||
receiving_player: int
|
||||
finding_player: int
|
||||
location: int
|
||||
item: int
|
||||
found: bool
|
||||
entrance: str = ""
|
||||
|
||||
def re_check(self, ctx, team) -> Hint:
|
||||
if self.found:
|
||||
return self
|
||||
found = self.location in ctx.location_checks[team, self.finding_player]
|
||||
if found:
|
||||
return Hint(self.receiving_player, self.finding_player, self.location, self.item, found, self.entrance)
|
||||
return self
|
||||
|
||||
def as_legacy(self) -> tuple:
|
||||
return self.receiving_player, self.finding_player, self.location, self.item, self.found
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.receiving_player, self.finding_player, self.location, self.item, self.entrance))
|
||||
|
||||
|
||||
def get_public_ipv4() -> str:
|
||||
import socket
|
||||
import urllib.request
|
||||
|
|
|
@ -7,7 +7,7 @@ from uuid import UUID
|
|||
|
||||
from worlds.alttp import Items, Regions
|
||||
from WebHostLib import app, cache, Room
|
||||
from Utils import Hint
|
||||
from NetUtils import Hint
|
||||
|
||||
|
||||
def get_id(item_name):
|
||||
|
|
|
@ -78,7 +78,7 @@ progressive: # Enable or disable progressive items (swords, shields, bow)
|
|||
on: 50 # All items are progressive
|
||||
off: 0 # No items are progressive
|
||||
random: 0 # Randomly decides for all items. Swords could be progressive, shields might not be
|
||||
entrance_shuffle: # Documentation: https://alttpr.com/en/options#entrance_shuffle
|
||||
entrance_shuffle:
|
||||
none: 50 # Vanilla game map. All entrances and exits lead to their original locations. You probably want this option
|
||||
dungeonssimple: 0 # Shuffle just dungeons amongst each other, swapping dungeons entirely, so Hyrule Castle is always 1 dungeon
|
||||
dungeonsfull: 0 # Shuffle any dungeon entrance with any dungeon interior, so Hyrule Castle can be 4 different dungeons
|
||||
|
|
|
@ -674,6 +674,7 @@ class Sprite(object):
|
|||
rom.write_bytes(0x307000, self.palette)
|
||||
rom.write_bytes(0x307078, self.glove_palette)
|
||||
|
||||
|
||||
bonk_addresses = [0x4CF6C, 0x4CFBA, 0x4CFE0, 0x4CFFB, 0x4D018, 0x4D01B, 0x4D028, 0x4D03C, 0x4D059, 0x4D07A,
|
||||
0x4D09E, 0x4D0A8, 0x4D0AB, 0x4D0AE, 0x4D0BE, 0x4D0DD,
|
||||
0x4D16A, 0x4D1E5, 0x4D1EE, 0x4D20B, 0x4CBBF, 0x4CBBF, 0x4CC17, 0x4CC1A, 0x4CC4A, 0x4CC4D,
|
||||
|
|
Loading…
Reference in New Issue