improve handling of embedded server options:
use whitelist attempt to set, skip otherwise attempt to convert data type
This commit is contained in:
		
							parent
							
								
									fd1a25e362
								
							
						
					
					
						commit
						f56efbc9e3
					
				| 
						 | 
				
			
			@ -58,6 +58,14 @@ class Client(Endpoint):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
class Context(Node):
 | 
			
		||||
    simple_options = {"hint_cost": int,
 | 
			
		||||
                      "location_check_points": int,
 | 
			
		||||
                      "server_password": str,
 | 
			
		||||
                      "password": str,
 | 
			
		||||
                      "forfeit_mode": str,
 | 
			
		||||
                      "item_cheat": bool,
 | 
			
		||||
                      "compatibility": int}
 | 
			
		||||
 | 
			
		||||
    def __init__(self, host: str, port: int, server_password: str, password: str, location_check_points: int,
 | 
			
		||||
                 hint_cost: int, item_cheat: bool, forfeit_mode: str = "disabled", remaining_mode: str = "disabled",
 | 
			
		||||
                 auto_shutdown: typing.SupportsFloat = 0, compatibility: int = 2):
 | 
			
		||||
| 
						 | 
				
			
			@ -131,15 +139,23 @@ class Context(Node):
 | 
			
		|||
            self._set_options(server_options)
 | 
			
		||||
 | 
			
		||||
    def _set_options(self, server_options: dict):
 | 
			
		||||
 | 
			
		||||
        sentinel = object()
 | 
			
		||||
        for key, value in server_options.items():
 | 
			
		||||
            if key not in self.embedded_blacklist:
 | 
			
		||||
                current = getattr(self, key, sentinel)
 | 
			
		||||
                if current is not sentinel:
 | 
			
		||||
                    logging.debug(f"Setting server option {key} to {value} from supplied multidata")
 | 
			
		||||
                    setattr(self, key, value)
 | 
			
		||||
        self.item_cheat = not server_options.get("disable_item_cheat", True)
 | 
			
		||||
            data_type = self.simple_options.get(key, None)
 | 
			
		||||
            if data_type is not None:
 | 
			
		||||
                if value not in {False, True, None}: # some can be boolean OR text, such as password
 | 
			
		||||
                    try:
 | 
			
		||||
                        value = data_type(value)
 | 
			
		||||
                    except Exception as e:
 | 
			
		||||
                        try:
 | 
			
		||||
                            raise Exception(f"Could not set server option {key}, skipping.") from e
 | 
			
		||||
                        except Exception as e:
 | 
			
		||||
                            logging.exception(e)
 | 
			
		||||
                logging.debug(f"Setting server option {key} to {value} from supplied multidata")
 | 
			
		||||
                setattr(self, key, value)
 | 
			
		||||
            elif key == "disable_item_cheat":
 | 
			
		||||
                self.item_cheat = not bool(value)
 | 
			
		||||
            else:
 | 
			
		||||
                logging.debug(f"Unrecognized server option {key}")
 | 
			
		||||
 | 
			
		||||
    def save(self, now=False) -> bool:
 | 
			
		||||
        if self.saving:
 | 
			
		||||
| 
						 | 
				
			
			@ -651,15 +667,6 @@ class CommandProcessor(metaclass=CommandMeta):
 | 
			
		|||
class CommonCommandProcessor(CommandProcessor):
 | 
			
		||||
    ctx: Context
 | 
			
		||||
 | 
			
		||||
    simple_options = {"hint_cost": int,
 | 
			
		||||
                      "location_check_points": int,
 | 
			
		||||
                      "server_password": str,
 | 
			
		||||
                      "password": str,
 | 
			
		||||
                      "forfeit_mode": str,
 | 
			
		||||
                      "item_cheat": bool,
 | 
			
		||||
                      "auto_save_interval": int,
 | 
			
		||||
                      "compatibility": int}
 | 
			
		||||
 | 
			
		||||
    def _cmd_countdown(self, seconds: str = "10") -> bool:
 | 
			
		||||
        """Start a countdown in seconds"""
 | 
			
		||||
        try:
 | 
			
		||||
| 
						 | 
				
			
			@ -672,7 +679,7 @@ class CommonCommandProcessor(CommandProcessor):
 | 
			
		|||
    def _cmd_options(self):
 | 
			
		||||
        """List all current options. Warning: lists password."""
 | 
			
		||||
        self.output("Current options:")
 | 
			
		||||
        for option in self.simple_options:
 | 
			
		||||
        for option in self.ctx.simple_options:
 | 
			
		||||
            if option == "server_password" and self.marker == "!":  #Do not display the server password to the client.
 | 
			
		||||
                self.output(f"Option server_password is set to {('*' * random.randint(4,16))}")
 | 
			
		||||
            else:
 | 
			
		||||
| 
						 | 
				
			
			@ -1231,7 +1238,7 @@ class ServerCommandProcessor(CommonCommandProcessor):
 | 
			
		|||
    def _cmd_option(self, option_name: str, option: str):
 | 
			
		||||
        """Set options for the server. Warning: expires on restart"""
 | 
			
		||||
 | 
			
		||||
        attrtype = self.simple_options.get(option_name, None)
 | 
			
		||||
        attrtype = self.ctx.simple_options.get(option_name, None)
 | 
			
		||||
        if attrtype:
 | 
			
		||||
            if attrtype == bool:
 | 
			
		||||
                def attrtype(input_text: str):
 | 
			
		||||
| 
						 | 
				
			
			@ -1245,7 +1252,7 @@ class ServerCommandProcessor(CommonCommandProcessor):
 | 
			
		|||
            self.output(f"Set option {option_name} to {getattr(self.ctx, option_name)}")
 | 
			
		||||
            return True
 | 
			
		||||
        else:
 | 
			
		||||
            known = (f"{option}:{otype}" for option, otype in self.simple_options.items())
 | 
			
		||||
            known = (f"{option}:{otype}" for option, otype in self.ctx.simple_options.items())
 | 
			
		||||
            self.output(f"Unrecognized Option {option_name}, known: "
 | 
			
		||||
                        f"{', '.join(known)}")
 | 
			
		||||
            return False
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue