implement fuzzy text matching

This commit is contained in:
Fabian Dill 2020-02-17 13:57:48 +01:00
parent 95aea8c4b4
commit 81e83ae65d
2 changed files with 89 additions and 54 deletions

View File

@ -3,8 +3,6 @@ import asyncio
import functools import functools
import json import json
import logging import logging
import re
import shlex
import urllib.request import urllib.request
import zlib import zlib
import collections import collections
@ -15,12 +13,15 @@ ModuleUpdate.update()
import websockets import websockets
import aioconsole import aioconsole
from fuzzywuzzy import process as fuzzy_process
import Items import Items
import Regions import Regions
import Utils import Utils
from MultiClient import ReceivedItem, get_item_name_from_id, get_location_name_from_address from MultiClient import ReceivedItem, get_item_name_from_id, get_location_name_from_address
console_names = frozenset(set(Items.item_table) | set(Regions.location_table))
class Client: class Client:
version: typing.List[int] = [0, 0, 0] version: typing.List[int] = [0, 0, 0]
@ -261,7 +262,7 @@ def save(ctx: Context):
def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[Utils.Hint]: def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[Utils.Hint]:
hints = [] hints = []
seeked_item_id = Items.lookup_lower_name_to_id[item] seeked_item_id = Items.lookup_lower_name_to_id[item.lower()]
for check, result in ctx.locations.items(): for check, result in ctx.locations.items():
item_id, receiving_player = result item_id, receiving_player = result
if receiving_player == slot and item_id == seeked_item_id: if receiving_player == slot and item_id == seeked_item_id:
@ -273,7 +274,7 @@ def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[
def collect_hints_location(ctx: Context, team: int, slot: int, location: str) -> typing.List[Utils.Hint]: def collect_hints_location(ctx: Context, team: int, slot: int, location: str) -> typing.List[Utils.Hint]:
hints = [] hints = []
location = Regions.lookup_lower_name_to_name[location] location = Regions.lookup_lower_name_to_name[location.lower()]
seeked_location = Regions.location_table[location][0] seeked_location = Regions.location_table[location][0]
for check, result in ctx.locations.items(): for check, result in ctx.locations.items():
location_id, finding_player = check location_id, finding_player = check
@ -291,6 +292,17 @@ def format_hint(ctx: Context, team: int, hint: Utils.Hint) -> str:
f"in {ctx.player_names[team, hint.finding_player]}'s World." \ f"in {ctx.player_names[team, hint.finding_player]}'s World." \
+ (" (found)" if hint.found else "") + (" (found)" if hint.found else "")
def get_intended_text(input_text: str, possible_answers: typing.Iterable[str]= console_names) -> typing.Tuple[str, bool, str]:
picks = fuzzy_process.extract(input_text, possible_answers, limit=2)
dif = picks[0][1] - picks[1][1]
if picks[0][1] == 100:
return picks[0][0], True, "Perfect Match"
elif picks[0][1] < 75:
return picks[0][0], False, f"Didn't find something that closely matches, did you mean {picks[0][0]}?"
elif dif > 5:
return picks[0][0], True, "Close Match"
else:
return picks[0][0], False, f"Too many close matches, did you mean {picks[0][0]}?"
async def process_client_cmd(ctx: Context, client: Client, cmd, args): async def process_client_cmd(ctx: Context, client: Client, cmd, args):
if type(cmd) is not str: if type(cmd) is not str:
@ -392,40 +404,47 @@ async def process_client_cmd(ctx: Context, client: Client, cmd, args):
points_available = ctx.location_check_points * len(ctx.location_checks[client.team, client.slot]) - \ points_available = ctx.location_check_points * len(ctx.location_checks[client.team, client.slot]) - \
ctx.hint_cost * ctx.hints_used[client.team, client.slot] ctx.hint_cost * ctx.hints_used[client.team, client.slot]
item_name = args[6:].lower() item_name = args[6:].lower()
hints = []
if not item_name: if not item_name:
notify_client(client, "Use !hint {item_name/location_name}, " notify_client(client, "Use !hint {item_name/location_name}, "
"for example !hint Lamp or !hint Link's House. " "for example !hint Lamp or !hint Link's House. "
f"A hint costs {ctx.hint_cost} points. " f"A hint costs {ctx.hint_cost} points. "
f"You have {points_available} points.") f"You have {points_available} points.")
elif item_name in Items.lookup_lower_name_to_id:
hints = collect_hints(ctx, client.team, client.slot, item_name)
elif item_name in Regions.lookup_lower_name_to_name:
hints = collect_hints_location(ctx, client.team, client.slot, item_name)
else: else:
notify_client(client, f'Item/location "{item_name}" not found.') item_name, usable, response = get_intended_text(item_name)
if usable:
if item_name in Items.item_table: # item name
hints = collect_hints(ctx, client.team, client.slot, item_name)
else: # location name
hints = collect_hints_location(ctx, client.team, client.slot, item_name)
if hints: if hints:
found = 0 found = 0
for hint in hints: for hint in hints:
found += 1 - hint.found found += 1 - hint.found
if not found: if not found:
notify_hints(ctx, client.team, hints) notify_hints(ctx, client.team, hints)
notify_client(client, "No new items found, points refunded.") notify_client(client, "No new items found, points refunded.")
else:
if ctx.hint_cost:
can_pay = points_available // (ctx.hint_cost * found) >= 1
else:
can_pay = True
if can_pay:
ctx.hints_used[client.team, client.slot] += found
notify_hints(ctx, client.team, hints)
save(ctx)
else:
notify_client(client, f"You can't afford the hint. "
f"You have {points_available} points and need at least {ctx.hint_cost}, "
f"more if multiple items are still to be found.")
else:
notify_client(client, "Nothing found. Item/Location may not exist.")
else: else:
if ctx.hint_cost: notify_client(client, response)
can_pay = points_available // (ctx.hint_cost * found) >= 1
else:
can_pay = True
if can_pay:
ctx.hints_used[client.team, client.slot] += found
notify_hints(ctx, client.team, hints)
save(ctx)
else:
notify_client(client, f"You can't afford the hint. "
f"You have {points_available} points and need at least {ctx.hint_cost}, "
f"more if multiple items are still to be found.")
def set_password(ctx : Context, password): def set_password(ctx : Context, password):
ctx.password = password ctx.password = password
@ -468,32 +487,47 @@ async def console(ctx : Context):
for (team, slot), name in ctx.player_names.items(): for (team, slot), name in ctx.player_names.items():
if name.lower() == seeked_player: if name.lower() == seeked_player:
forfeit_player(ctx, team, slot) forfeit_player(ctx, team, slot)
if command[0] == '/senditem' and len(command) > 2: if command[0] == '/senditem':
[(player, item)] = re.findall(r'\S* (\S*) (.*)', input) if len(command) <= 2:
item = item.lower() logging.info("Use /senditem {Playername} {itemname}\nFor example /senditem Berserker Lamp")
if item in Items.lookup_lower_name_to_id:
for client in ctx.clients:
if client.name.lower() == player.lower():
new_item = ReceivedItem(Items.lookup_lower_name_to_name[item], "cheat console", client.slot)
get_received_items(ctx, client.team, client.slot).append(new_item)
notify_all(ctx, 'Cheat console: sending "' + item + '" to ' + client.name)
send_new_items(ctx)
else: else:
logging.warning("Unknown item: " + item) seeked_player, usable, response = get_intended_text(command[1], ctx.player_names.values())
if command[0] == '/hint': if usable:
for (team, slot), name in ctx.player_names.items(): item = " ".join(command[2:])
if len(command) == 1: item, usable, response = get_intended_text(item, Items.item_table.keys())
logging.info("Use /hint {Playername} {itemname/locationname}\nFor example /hint Berserker Lamp") if usable:
elif name.lower() == command[1].lower(): for client in ctx.clients:
item = " ".join(command[2:]).lower() if client.name == seeked_player:
if item in Items.lookup_lower_name_to_id: #item name new_item = ReceivedItem(item, "cheat console", client.slot)
hints = collect_hints(ctx, team, slot, item) get_received_items(ctx, client.team, client.slot).append(new_item)
notify_hints(ctx, team, hints) notify_all(ctx, 'Cheat console: sending "' + item + '" to ' + client.name)
elif item in Regions.lookup_lower_name_to_name: #location name send_new_items(ctx)
hints = collect_hints_location(ctx, team, slot, item)
notify_hints(ctx, team, hints)
else: else:
logging.warning("Unknown item/location: " + item) logging.warning(response)
else:
logging.warning(response)
if command[0] == '/hint':
if len(command) <= 2:
logging.info("Use /hint {Playername} {itemname/locationname}\nFor example /hint Berserker Lamp")
else:
seeked_player, usable, response = get_intended_text(command[1], ctx.player_names.values())
if usable:
for (team, slot), name in ctx.player_names.items():
if name == seeked_player:
item = " ".join(command[2:])
item, usable, response = get_intended_text(item)
if usable:
if item in Items.item_table: #item name
hints = collect_hints(ctx, team, slot, item)
notify_hints(ctx, team, hints)
else: #location name
hints = collect_hints_location(ctx, team, slot, item)
notify_hints(ctx, team, hints)
else:
logging.warning(response)
else:
logging.warning(response)
if command[0][0] != '/': if command[0][0] != '/':
notify_all(ctx, '[Server]: ' + input) notify_all(ctx, '[Server]: ' + input)
except: except:

View File

@ -3,3 +3,4 @@ colorama>=0.4.3
websockets>=8.1 websockets>=8.1
PyYAML>=5.3 PyYAML>=5.3
collections_extended>=1.0.3 collections_extended>=1.0.3
fuzzywuzzy>=0.18.0