server command processor

some commands were renamed at this opportunity
This commit is contained in:
Fabian Dill 2020-04-13 11:26:50 +02:00
parent 34df144f8c
commit 8b8e279015
1 changed files with 133 additions and 81 deletions

View File

@ -6,6 +6,7 @@ import logging
import zlib
import collections
import typing
import inspect
import ModuleUpdate
@ -66,6 +67,7 @@ class Context:
self.hints_used = collections.defaultdict(int)
self.hints_sent = collections.defaultdict(set)
self.item_cheat = item_cheat
self.running = True
def get_save(self) -> dict:
return {
@ -124,7 +126,7 @@ def notify_client(client: Client, text: str):
asyncio.create_task(send_msgs(client.socket, [['Print', text]]))
# separated out, due to compatibilty between client's
# separated out, due to compatibilty between clients
def notify_hints(ctx: Context, team: int, hints: typing.List[Utils.Hint]):
cmd = [["Hint", hints]]
texts = [['Print', format_hint(ctx, team, hint)] for hint in hints]
@ -499,94 +501,145 @@ async def process_client_cmd(ctx: Context, client: Client, cmd, args):
notify_client(client, response)
def set_password(ctx : Context, password):
def set_password(ctx: Context, password):
ctx.password = password
logging.warning('Password set to ' + password if password else 'Password disabled')
class CommandProcessor():
commands: typing.Dict[str, typing.Callable]
def __init__(self):
self.commands = {name[5:].lower(): method for name, method in inspect.getmembers(self) if
name.startswith("_cmd_")}
def output(self, text: str):
print(text)
def __call__(self, raw: str):
if not raw:
return
command = raw.split()
basecommand = command[0]
if basecommand[0] == "/":
method = self.commands.get(basecommand[1:].lower(), None)
if not method:
self._error_unknown_command(basecommand[1:])
else:
method(*command[1:])
else:
self.default(raw)
def get_help_text(self) -> str:
s = ""
for command, method in self.commands.items():
spec = inspect.signature(method).parameters
s += f"/{command} {' '.join(spec)}\n {method.__doc__}\n"
return s
def _cmd_help(self):
"""Returns the help listing"""
self.output(self.get_help_text())
def default(self, raw: str):
self.output("Echo: " + raw)
def _error_unknown_command(self, raw: str):
self.output(f"Could not find command {raw}. Known commands: {', '.join(self.commands)}")
class ServerCommandProcessor(CommandProcessor):
ctx: Context
def __init__(self, ctx: Context):
self.ctx = ctx
super(ServerCommandProcessor, self).__init__()
def default(self, raw: str):
notify_all(self.ctx, '[Server]: ' + raw)
def _cmd_kick(self, player_name: str):
"""Kick specified player from the server"""
for client in self.ctx.clients:
if client.auth and client.name.lower() == player_name.lower() and client.socket and not client.socket.closed:
asyncio.create_task(client.socket.close())
self.output(f"Kicked {client.name}")
break
else:
self.output("Could not find player to kick")
def _cmd_players(self):
"""Get information about connected players"""
self.output(get_connected_players_string(self.ctx))
def _cmd_exit(self):
"""Shutdown the server"""
asyncio.create_task(self.ctx.server.ws_server._close())
self.ctx.running = False
def _cmd_password(self, new_password: str = ""):
"""Set the server password. Leave the password text empty to remove the password"""
set_password(self.ctx, new_password if new_password else None)
def _cmd_forfeit(self, player_name: str):
"""Send out the remaining items from a player's game to their intended recipients"""
seeked_player = player_name.lower()
for (team, slot), name in self.ctx.player_names.items():
if name.lower() == seeked_player:
forfeit_player(self.ctx, team, slot)
break
else:
self.output("Could not find player to forfeit")
def _cmd_send(self, player_name: str, *item_name: str):
"""Sends an item to the specified player"""
seeked_player, usable, response = get_intended_text(player_name, self.ctx.player_names.values())
if usable:
item = " ".join(item_name)
item, usable, response = get_intended_text(item, Items.item_table.keys())
if usable:
for client in self.ctx.clients:
if client.name == seeked_player:
new_item = ReceivedItem(Items.item_table[item][3], -1, client.slot)
get_received_items(self.ctx, client.team, client.slot).append(new_item)
notify_all(self.ctx, 'Cheat console: sending "' + item + '" to ' + client.name)
send_new_items(self.ctx)
return
else:
self.output(response)
else:
self.output(response)
def _cmd_hint(self, player_name: str, *item_or_location: str):
"""Send out a hint for a player's item or location to their team"""
seeked_player, usable, response = get_intended_text(player_name, self.ctx.player_names.values())
if usable:
for (team, slot), name in self.ctx.player_names.items():
if name == seeked_player:
item = " ".join(item_or_location)
item, usable, response = get_intended_text(item)
if usable:
if item in Items.item_table: # item name
hints = collect_hints(self.ctx, team, slot, item)
notify_hints(self.ctx, team, hints)
else: # location name
hints = collect_hints_location(self.ctx, team, slot, item)
notify_hints(self.ctx, team, hints)
else:
self.output(response)
return
else:
self.output(response)
async def console(ctx: Context):
session = prompt_toolkit.PromptSession()
running = True
while running:
cmd_processor = ServerCommandProcessor(ctx)
while ctx.running:
with patch_stdout():
input_text = await session.prompt_async()
try:
command = input_text.split()
if not command:
continue
if command[0] == '/exit':
await ctx.server.ws_server._close()
running = False
if command[0] == '/players':
logging.info(get_connected_players_string(ctx))
if command[0] == '/password':
set_password(ctx, command[1] if len(command) > 1 else None)
if command[0] == '/kick' and len(command) > 1:
team = int(command[2]) - 1 if len(command) > 2 and command[2].isdigit() else None
for client in ctx.clients:
if client.auth and client.name.lower() == command[1].lower() and (team is None or team == client.team):
if client.socket and not client.socket.closed:
await client.socket.close()
if command[0] == '/forfeitslot' and len(command) > 1 and command[1].isdigit():
if len(command) > 2 and command[2].isdigit():
team = int(command[1]) - 1
slot = int(command[2])
else:
team = 0
slot = int(command[1])
forfeit_player(ctx, team, slot)
if command[0] == '/forfeitplayer' and len(command) > 1:
seeked_player = command[1].lower()
for (team, slot), name in ctx.player_names.items():
if name.lower() == seeked_player:
forfeit_player(ctx, team, slot)
if command[0] == '/senditem':
if len(command) <= 2:
logging.info("Use /senditem {Playername} {itemname}\nFor example /senditem Berserker Lamp")
else:
seeked_player, usable, response = get_intended_text(command[1], ctx.player_names.values())
if usable:
item = " ".join(command[2:])
item, usable, response = get_intended_text(item, Items.item_table.keys())
if usable:
for client in ctx.clients:
if client.name == seeked_player:
new_item = ReceivedItem(Items.item_table[item][3], -1, client.slot)
get_received_items(ctx, client.team, client.slot).append(new_item)
notify_all(ctx, 'Cheat console: sending "' + item + '" to ' + client.name)
send_new_items(ctx)
else:
logging.warning(response)
else:
logging.warning(response)
if command[0] == '/hint':
if len(command) <= 2:
logging.info("Use /hint {Playername} {itemname/locationname}\nFor example /hint Berserker Lamp")
else:
seeked_player, usable, response = get_intended_text(command[1], ctx.player_names.values())
if usable:
for (team, slot), name in ctx.player_names.items():
if name == seeked_player:
item = " ".join(command[2:])
item, usable, response = get_intended_text(item)
if usable:
if item in Items.item_table: #item name
hints = collect_hints(ctx, team, slot, item)
notify_hints(ctx, team, hints)
else: #location name
hints = collect_hints_location(ctx, team, slot, item)
notify_hints(ctx, team, hints)
else:
logging.warning(response)
else:
logging.warning(response)
if command[0][0] != '/':
notify_all(ctx, '[Server]: ' + input_text)
cmd_processor(input_text)
except:
import traceback
traceback.print_exc()
@ -702,4 +755,3 @@ async def main(args: argparse.Namespace):
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(parse_args()))
loop.close()