Multidata is now able to provide items/location IDs to server.

This commit is contained in:
CaitSith2 2021-01-21 15:59:28 -08:00
parent aa22653bfc
commit f47a85d435
1 changed files with 52 additions and 27 deletions

View File

@ -113,6 +113,11 @@ class Context(Node):
self.save_dirty = False
self.tags = ['Berserker']
self.minimum_client_versions: typing.Dict[typing.Tuple[int, int], Utils.Version] = {}
self.lookup_items_id_to_name = Items.lookup_id_to_name
self.lookup_region_id_to_name = Regions.lookup_id_to_name
self.lookup_region_name_to_id = Regions.lookup_name_to_id
self.console_names = console_names
self.item_name_groups = Items.item_name_groups
def load(self, multidatapath: str, use_embedded_server_options: bool = False):
with open(multidatapath, 'rb') as f:
@ -150,6 +155,29 @@ class Context(Node):
server_options = jsonobj.get("server_options", {})
self._set_options(server_options)
new_console_names = set()
lookups = {"lookup_items_id_to_name": False, "lookup_region_id_to_name": False, "item_name_groups": False}
if "lookup_items_id_to_name" in jsonobj:
lookups["lookup_items_id_to_name"] = True
self.lookup_items_id_to_name = jsonobj["lookup_items_id_to_name"]
new_console_names |= set(self.lookup_items_id_to_name.values())
if "lookup_region_id_to_name" in jsonobj:
lookups["lookup_region_id_to_name"] = True
self.lookup_region_id_to_name = jsonobj["lookup_region_id_to_name"]
self.lookup_region_name_to_id = {value: key for key, value in self.lookup_region_id_to_name.items()}
new_console_names |= set(self.lookup_region_id_to_name.values())
if "item_name_groups" in jsonobj:
lookups["item_name_groups"] = True
self.item_name_groups = {key: set(value) for key, value in jsonobj["item_name_groups"]}
new_console_names |= set(self.item_name_groups.keys())
if not all(lookups.values()):
new_console_names |= self.console_names
self.console_names = frozenset(new_console_names)
def _set_options(self, server_options: dict):
for key, value in server_options.items():
data_type = self.simple_options.get(key, None)
@ -553,7 +581,7 @@ def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[
def collect_hints_location(ctx: Context, team: int, slot: int, location: str) -> typing.List[Utils.Hint]:
hints = []
seeked_location = Regions.lookup_name_to_id[location]
seeked_location = ctx.lookup_region_name_to_id[location]
for check, result in ctx.locations.items():
location_id, finding_player = check
if finding_player == slot and location_id == seeked_location:
@ -567,7 +595,7 @@ def collect_hints_location(ctx: Context, team: int, slot: int, location: str) ->
def format_hint(ctx: Context, team: int, hint: Utils.Hint) -> str:
text = f"[Hint]: {ctx.player_names[team, hint.receiving_player]}'s " \
f"{Items.lookup_id_to_name[hint.item]} is " \
f"{ctx.lookup_items_id_to_name[hint.item]} is " \
f"at {get_location_name_from_address(hint.location)} " \
f"in {ctx.player_names[team, hint.finding_player]}'s World"
@ -576,7 +604,7 @@ def format_hint(ctx: Context, team: int, hint: Utils.Hint) -> str:
return text + (". (found)" if hint.found else ".")
def get_intended_text(input_text: str, possible_answers: typing.Iterable[str]= console_names) -> typing.Tuple[str, bool, str]:
def get_intended_text(input_text: str, possible_answers: typing.Iterable[str]) -> typing.Tuple[str, bool, str]:
picks = fuzzy_process.extract(input_text, possible_answers, limit=2)
if len(picks) > 1:
dif = picks[0][1] - picks[1][1]
@ -801,29 +829,26 @@ class ClientMessageProcessor(CommonCommandProcessor):
"Your client is too old to send game beaten information. Please update, load you savegame and reconnect.")
return False
def remaining_items(self) -> bool:
remaining_item_ids = get_remaining(self.ctx, self.client.team, self.client.slot)
if remaining_item_ids:
self.output("Remaining items: " + ", ".join(self.ctx.lookup_items_id_to_name.get(item_id, "unknown item")
for item_id in remaining_item_ids))
else:
self.output("No remaining items found.")
return True
def _cmd_remaining(self) -> bool:
"""List remaining items in your game, but not their location or recipient"""
if self.ctx.remaining_mode == "enabled":
remaining_item_ids = get_remaining(self.ctx, self.client.team, self.client.slot)
if remaining_item_ids:
self.output("Remaining items: " + ", ".join(Items.lookup_id_to_name.get(item_id, "unknown item")
for item_id in remaining_item_ids))
else:
self.output("No remaining items found.")
return True
return self.remaining_items()
elif self.ctx.remaining_mode == "disabled":
self.output(
"Sorry, !remaining has been disabled on this server.")
return False
else: # is goal
if self.ctx.client_game_state[self.client.team, self.client.slot] == CLIENT_GOAL:
remaining_item_ids = get_remaining(self.ctx, self.client.team, self.client.slot)
if remaining_item_ids:
self.output("Remaining items: " + ", ".join(Items.lookup_id_to_name.get(item_id, "unknown item")
for item_id in remaining_item_ids))
else:
self.output("No remaining items found.")
return True
return self.remaining_items()
else:
self.output(
"Sorry, !remaining requires you to have beaten the game on this server")
@ -873,7 +898,7 @@ class ClientMessageProcessor(CommonCommandProcessor):
def _cmd_getitem(self, item_name: str) -> bool:
"""Cheat in an item, if it is enabled on this server"""
if self.ctx.item_cheat:
item_name, usable, response = get_intended_text(item_name, Items.item_table.keys())
item_name, usable, response = get_intended_text(item_name, self.ctx.lookup_items_id_to_name.values())
if usable:
new_item = ReceivedItem(Items.item_table[item_name][3], -1, self.client.slot)
get_received_items(self.ctx, self.client.team, self.client.slot).append(new_item)
@ -900,14 +925,14 @@ class ClientMessageProcessor(CommonCommandProcessor):
notify_hints(self.ctx, self.client.team, list(hints))
return True
else:
item_name, usable, response = get_intended_text(item_or_location)
item_name, usable, response = get_intended_text(item_or_location, self.ctx.console_names)
if usable:
if item_name in Items.hint_blacklist:
self.output(f"Sorry, \"{item_name}\" is marked as non-hintable.")
hints = []
elif item_name in Items.item_name_groups:
elif item_name in self.ctx.item_name_groups:
hints = []
for item in Items.item_name_groups[item_name]:
for item in self.ctx.item_name_groups[item_name]:
hints.extend(collect_hints(self.ctx, self.client.team, self.client.slot, item))
elif item_name in Items.item_table: # item name
hints = collect_hints(self.ctx, self.client.team, self.client.slot, item_name)
@ -968,14 +993,14 @@ class ClientMessageProcessor(CommonCommandProcessor):
def get_checked_checks(ctx: Context, client: Client) -> list:
return [Regions.lookup_id_to_name.get(location_id, f'Unknown Location ID: {location_id}') for
return [ctx.lookup_region_id_to_name.get(location_id, f'Unknown Location ID: {location_id}') for
location_id, slot in ctx.locations if
slot == client.slot and
location_id in ctx.location_checks[client.team, client.slot]]
def get_missing_checks(ctx: Context, client: Client) -> list:
return [Regions.lookup_id_to_name.get(location_id, f'Unknown Location ID: {location_id}') for
return [ctx.lookup_region_id_to_name.get(location_id, f'Unknown Location ID: {location_id}') for
location_id, slot in ctx.locations if
slot == client.slot and
location_id not in ctx.location_checks[client.team, client.slot]]
@ -1224,7 +1249,7 @@ class ServerCommandProcessor(CommonCommandProcessor):
seeked_player, usable, response = get_intended_text(player_name, self.ctx.player_names.values())
if usable:
item = " ".join(item_name)
item, usable, response = get_intended_text(item, Items.item_table.keys())
item, usable, response = get_intended_text(item, self.ctx.lookup_items_id_to_name.values())
if usable:
for client in self.ctx.endpoints:
if client.name == seeked_player:
@ -1247,11 +1272,11 @@ class ServerCommandProcessor(CommonCommandProcessor):
for (team, slot), name in self.ctx.player_names.items():
if name == seeked_player:
item = " ".join(item_or_location)
item, usable, response = get_intended_text(item)
item, usable, response = get_intended_text(item, self.ctx.console_names)
if usable:
if item in Items.item_name_groups:
if item in self.ctx.item_name_groups:
hints = []
for item in Items.item_name_groups[item]:
for item in self.ctx.item_name_groups[item]:
hints.extend(collect_hints(self.ctx, team, slot, item))
elif item in Items.item_table: # item name
hints = collect_hints(self.ctx, team, slot, item)