Sample implemention of JSONtoTextParser
This commit is contained in:
parent
3d81f0cca7
commit
1da1e4eac6
|
@ -43,7 +43,6 @@ def create_named_task(coro, *args, name=None):
|
|||
return asyncio.create_task(coro, *args, name=name)
|
||||
|
||||
|
||||
coloramaparser = HTMLtoColoramaParser()
|
||||
|
||||
|
||||
class Context():
|
||||
|
@ -101,6 +100,8 @@ class Context():
|
|||
self.finished_game = False
|
||||
self.slow_mode = False
|
||||
|
||||
self.jsontotextparser = JSONtoTextParser(self)
|
||||
|
||||
@property
|
||||
def endpoints(self):
|
||||
if self.server:
|
||||
|
@ -957,8 +958,8 @@ async def process_server_cmd(ctx: Context, cmd: str, args: typing.Optional[dict]
|
|||
elif cmd == 'Print':
|
||||
logger.info(args["text"])
|
||||
|
||||
elif cmd == 'PrintHTML':
|
||||
logger.info(coloramaparser.get_colorama_text(args["text"]))
|
||||
elif cmd == 'PrintJSON':
|
||||
logger.info(ctx.jsontotextparser(args["data"]))
|
||||
|
||||
elif cmd == 'HintPointUpdate':
|
||||
ctx.hint_points = args['points']
|
||||
|
@ -1411,7 +1412,6 @@ async def main():
|
|||
help="Turn off emitting a webserver for the webbrowser based user interface.")
|
||||
args = parser.parse_args()
|
||||
logging.basicConfig(format='%(message)s', level=getattr(logging, args.loglevel.upper(), logging.INFO))
|
||||
|
||||
if args.diff_file:
|
||||
import Patch
|
||||
logging.info("Patch file was supplied. Creating sfc rom..")
|
||||
|
|
|
@ -592,8 +592,8 @@ class CommandMeta(type):
|
|||
commands = attrs["commands"] = {}
|
||||
for base in bases:
|
||||
commands.update(base.commands)
|
||||
commands.update({name[5:].lower(): method for name, method in attrs.items() if
|
||||
name.startswith("_cmd_")})
|
||||
commands.update({command_name[5:]: method for command_name, method in attrs.items() if
|
||||
command_name.startswith("_cmd_")})
|
||||
return super(CommandMeta, cls).__new__(cls, name, bases, attrs)
|
||||
|
||||
|
||||
|
|
85
NetUtils.py
85
NetUtils.py
|
@ -2,7 +2,6 @@ from __future__ import annotations
|
|||
import asyncio
|
||||
import logging
|
||||
import typing
|
||||
from html.parser import HTMLParser
|
||||
from json import loads, dumps
|
||||
|
||||
import websockets
|
||||
|
@ -55,43 +54,61 @@ class Endpoint:
|
|||
raise NotImplementedError
|
||||
|
||||
|
||||
class HTMLtoColoramaParser(HTMLParser):
|
||||
def get_colorama_text(self, input_text: str) -> str:
|
||||
self.feed(input_text)
|
||||
self.close()
|
||||
data = self.data
|
||||
self.reset()
|
||||
return data
|
||||
class HandlerMeta(type):
|
||||
def __new__(mcs, name, bases, attrs):
|
||||
handlers = attrs["handlers"] = {}
|
||||
trigger: str = "_handle_"
|
||||
for base in bases:
|
||||
handlers.update(base.commands)
|
||||
handlers.update({handler_name[len(trigger):]: method for handler_name, method in attrs.items() if
|
||||
handler_name.startswith(trigger)})
|
||||
|
||||
def handle_data(self, data):
|
||||
self.data += data
|
||||
orig_init = attrs.get('__init__', None)
|
||||
|
||||
def handle_starttag(self, tag, attrs):
|
||||
if tag in {"span", "div", "p"}:
|
||||
for attr in attrs:
|
||||
subtag, data = attr
|
||||
if subtag == "style":
|
||||
for subdata in data.split(";"):
|
||||
if subdata.startswith("color"):
|
||||
color = subdata.split(":", 1)[-1].strip()
|
||||
if color in color_codes:
|
||||
self.data += color_code(color)
|
||||
self.colored = tag
|
||||
def __init__(self, *args, **kwargs):
|
||||
# turn functions into bound methods
|
||||
self.handlers = {name: method.__get__(self, type(self)) for name, method in
|
||||
handlers.items()}
|
||||
if orig_init:
|
||||
orig_init(self, *args, **kwargs)
|
||||
|
||||
def handle_endtag(self, tag):
|
||||
if tag == self.colored:
|
||||
self.colored = False
|
||||
self.data += color_code("reset")
|
||||
attrs['__init__'] = __init__
|
||||
return super(HandlerMeta, mcs).__new__(mcs, name, bases, attrs)
|
||||
|
||||
def reset(self):
|
||||
super(HTMLtoColoramaParser, self).reset()
|
||||
self.data = ""
|
||||
self.colored = False
|
||||
|
||||
def close(self):
|
||||
super(HTMLtoColoramaParser, self).close()
|
||||
if self.colored:
|
||||
self.handle_endtag(self.colored)
|
||||
class JSONtoTextParser(metaclass=HandlerMeta):
|
||||
def __init__(self, ctx: "MultiClient.Context"):
|
||||
self.ctx = ctx
|
||||
|
||||
def __call__(self, input_object: typing.List[dict]) -> str:
|
||||
return "".join(self.handle_node(section) for section in input_object)
|
||||
|
||||
def handle_node(self, node: dict):
|
||||
type = node.get("type", None)
|
||||
handler = self.handlers.get(type, self.handlers["text"])
|
||||
return handler(node)
|
||||
|
||||
def _handle_color(self, node: dict):
|
||||
if node["color"] in color_codes:
|
||||
return color_code(node["color"]) + self._handle_text(node) + color_code("reset")
|
||||
else:
|
||||
logging.warning(f"Unknown color in node {node}")
|
||||
return self._handle_text(node)
|
||||
|
||||
def _handle_text(self, node: dict):
|
||||
return node.get("text", "")
|
||||
|
||||
def _handle_player_id(self, node: dict):
|
||||
player = node["player"]
|
||||
node["color"] = 'yellow' if player != self.ctx.slot else 'magenta'
|
||||
node["text"] = self.ctx.player_names[player]
|
||||
return self._handle_color(node)
|
||||
|
||||
# for other teams, spectators etc.? Only useful if player isn't in the clientside mapping
|
||||
def _handle_player_name(self, node: dict):
|
||||
node["color"] = 'yellow'
|
||||
return self._handle_color(node)
|
||||
|
||||
|
||||
|
||||
color_codes = {'reset': 0, 'bold': 1, 'underline': 4, 'black': 30, 'red': 31, 'green': 32, 'yellow': 33, 'blue': 34,
|
||||
|
@ -110,4 +127,4 @@ def color(text, *args):
|
|||
CLIENT_UNKNOWN = 0
|
||||
CLIENT_READY = 10
|
||||
CLIENT_PLAYING = 20
|
||||
CLIENT_GOAL = 30
|
||||
CLIENT_GOAL = 30
|
||||
|
|
Loading…
Reference in New Issue