857 lines
33 KiB
Python
857 lines
33 KiB
Python
"""
|
|
Application settings / host.yaml interface using type hints.
|
|
This is different from player settings.
|
|
"""
|
|
|
|
import os.path
|
|
import shutil
|
|
import sys
|
|
import typing
|
|
import warnings
|
|
from enum import IntEnum
|
|
from threading import Lock
|
|
from typing import cast, Any, BinaryIO, ClassVar, Dict, Iterator, List, Optional, TextIO, Tuple, Union, TypeVar
|
|
import os
|
|
|
|
__all__ = [
|
|
"get_settings", "fmt_doc", "no_gui",
|
|
"Group", "Bool", "Path", "UserFilePath", "UserFolderPath", "LocalFilePath", "LocalFolderPath",
|
|
"OptionalUserFilePath", "OptionalUserFolderPath", "OptionalLocalFilePath", "OptionalLocalFolderPath",
|
|
"GeneralOptions", "ServerOptions", "GeneratorOptions", "SNIOptions", "Settings"
|
|
]
|
|
|
|
no_gui = False
|
|
skip_autosave = False
|
|
_world_settings_name_cache: Dict[str, str] = {} # TODO: cache on disk and update when worlds change
|
|
_world_settings_name_cache_updated = False
|
|
_lock = Lock()
|
|
|
|
|
|
def _update_cache() -> None:
|
|
"""Load all worlds and update world_settings_name_cache"""
|
|
global _world_settings_name_cache_updated
|
|
if _world_settings_name_cache_updated:
|
|
return
|
|
|
|
try:
|
|
from worlds.AutoWorld import AutoWorldRegister
|
|
for world in AutoWorldRegister.world_types.values():
|
|
annotation = world.__annotations__.get("settings", None)
|
|
if annotation is None or annotation == "ClassVar[Optional['Group']]":
|
|
continue
|
|
_world_settings_name_cache[world.settings_key] = f"{world.__module__}.{world.__name__}"
|
|
finally:
|
|
_world_settings_name_cache_updated = True
|
|
|
|
|
|
def fmt_doc(cls: type, level: int) -> str:
|
|
comment = cls.__doc__
|
|
assert comment, f"{cls} has no __doc__"
|
|
indent = level * 2 * " "
|
|
return "\n".join(map(lambda s: f"{indent}# {s}", filter(None, map(lambda s: s.strip(), comment.split("\n")))))
|
|
|
|
|
|
class Group:
|
|
_type_cache: ClassVar[Optional[Dict[str, Any]]] = None
|
|
_dumping: bool = False
|
|
_has_attr: bool = False
|
|
_changed: bool = False
|
|
_dumper: ClassVar[type]
|
|
|
|
def __getitem__(self, key: str) -> Any:
|
|
try:
|
|
return getattr(self, key)
|
|
except NameError:
|
|
raise KeyError(key)
|
|
|
|
def __iter__(self) -> Iterator[str]:
|
|
cls_members = dir(self.__class__)
|
|
members = filter(lambda k: not k.startswith("_") and (k not in cls_members or k in self.__annotations__),
|
|
list(self.__annotations__) +
|
|
[name for name in dir(self) if name not in self.__annotations__])
|
|
return members.__iter__()
|
|
|
|
def __contains__(self, key: str) -> bool:
|
|
try:
|
|
self._has_attr = True
|
|
return hasattr(self, key)
|
|
finally:
|
|
self._has_attr = False
|
|
|
|
def __setitem__(self, key: str, value: Any) -> None:
|
|
setattr(self, key, value)
|
|
|
|
def __getattribute__(self, item: str) -> Any:
|
|
attr = super().__getattribute__(item)
|
|
if isinstance(attr, Path) and not super().__getattribute__("_dumping"):
|
|
if attr.required and not attr.exists() and not super().__getattribute__("_has_attr"):
|
|
# if a file is required, and the one from settings does not exist, ask the user to provide it
|
|
# unless we are dumping the settings, because that would ask for each entry
|
|
with _lock: # lock to avoid opening multiple
|
|
new = None if no_gui else attr.browse()
|
|
if new is None:
|
|
raise FileNotFoundError(f"{attr} does not exist, but "
|
|
f"{self.__class__.__name__}.{item} is required")
|
|
setattr(self, item, new)
|
|
self._changed = True
|
|
attr = new
|
|
# resolve the path immediately when accessing it
|
|
return attr.__class__(attr.resolve())
|
|
return attr
|
|
|
|
@property
|
|
def changed(self) -> bool:
|
|
return self._changed or any(map(lambda v: isinstance(v, Group) and v.changed,
|
|
self.__dict__.values()))
|
|
|
|
@classmethod
|
|
def get_type_hints(cls) -> Dict[str, Any]:
|
|
"""Returns resolved type hints for the class"""
|
|
if cls._type_cache is None:
|
|
if not isinstance(next(iter(cls.__annotations__.values())), str):
|
|
# non-str: assume already resolved
|
|
cls._type_cache = cls.__annotations__
|
|
else:
|
|
# str: build dicts and resolve with eval
|
|
mod = sys.modules[cls.__module__] # assume the module wasn't deleted
|
|
mod_dict = {k: getattr(mod, k) for k in dir(mod)}
|
|
cls._type_cache = typing.get_type_hints(cls, globalns=mod_dict, localns=cls.__dict__)
|
|
return cls._type_cache
|
|
|
|
def get(self, key: str, default: Any = None) -> Any:
|
|
if key in self:
|
|
return self[key]
|
|
return default
|
|
|
|
def items(self) -> List[Tuple[str, Any]]:
|
|
return [(key, getattr(self, key)) for key in self]
|
|
|
|
def update(self, dct: Dict[str, Any]) -> None:
|
|
assert isinstance(dct, dict), f"{self.__class__.__name__}.update called with " \
|
|
f"{dct.__class__.__name__} instead of dict."
|
|
|
|
for k in self.__annotations__:
|
|
if not k.startswith("_") and k not in dct:
|
|
self._changed = True # key missing from host.yaml
|
|
|
|
for k, v in dct.items():
|
|
# don't do getattr to stay lazy with world group init/loading
|
|
# instead we assign unknown groups as dicts and a later getattr will upcast them
|
|
attr = self.__dict__[k] if k in self.__dict__ else \
|
|
self.__class__.__dict__[k] if k in self.__class__.__dict__ else None
|
|
if isinstance(attr, Group):
|
|
# update group
|
|
if k not in self.__dict__:
|
|
attr = attr.__class__() # make a copy of default
|
|
setattr(self, k, attr)
|
|
if isinstance(v, dict):
|
|
attr.update(v)
|
|
else:
|
|
warnings.warn(f"{self.__class__.__name__}.{k} "
|
|
f"tried to update Group from {type(v)}")
|
|
elif isinstance(attr, dict):
|
|
# update dict
|
|
if k not in self.__dict__:
|
|
attr = attr.copy() # make a copy of default
|
|
setattr(self, k, attr)
|
|
if isinstance(v, dict):
|
|
attr.update(v)
|
|
else:
|
|
warnings.warn(f"{self.__class__.__name__}.{k} "
|
|
f"tried to update dict from {type(v)}")
|
|
else:
|
|
# assign value, try to upcast to type hint
|
|
annotation = self.get_type_hints().get(k, None)
|
|
candidates = [] if annotation is None else \
|
|
typing.get_args(annotation) if typing.get_origin(annotation) is Union else [annotation]
|
|
none_type = type(None)
|
|
for cls in candidates:
|
|
assert isinstance(cls, type), f"{self.__class__.__name__}.{k}: type {cls} not supported in settings"
|
|
if v is None and cls is none_type:
|
|
# assign None, i.e. from Optional
|
|
setattr(self, k, v)
|
|
break
|
|
if cls is bool and isinstance(v, bool):
|
|
# assign bool - special handling because issubclass(int, bool) is True
|
|
setattr(self, k, v)
|
|
break
|
|
if cls is not bool and issubclass(cls, type(v)):
|
|
# upcast, i.e. int -> IntEnum, str -> Path
|
|
setattr(self, k, cls.__call__(v))
|
|
break
|
|
if issubclass(cls, (tuple, set)) and isinstance(v, list):
|
|
# convert or upcast from list
|
|
setattr(self, k, cls.__call__(v))
|
|
break
|
|
else:
|
|
# assign scalar and hope for the best
|
|
setattr(self, k, v)
|
|
if annotation:
|
|
warnings.warn(f"{self.__class__.__name__}.{k} "
|
|
f"assigned from incompatible type {type(v).__name__}")
|
|
|
|
def as_dict(self, *args: str, downcast: bool = True) -> Dict[str, Any]:
|
|
return {
|
|
name: _to_builtin(cast(object, getattr(self, name))) if downcast else getattr(self, name)
|
|
for name in self if not args or name in args
|
|
}
|
|
|
|
@classmethod
|
|
def _dump_value(cls, value: Any, f: TextIO, indent: str) -> None:
|
|
"""Write a single yaml line to f"""
|
|
from Utils import dump, Dumper as BaseDumper
|
|
yaml_line: str = dump(value, Dumper=cast(BaseDumper, cls._dumper))
|
|
assert yaml_line.count("\n") == 1, f"Unexpected input for yaml dumper: {value}"
|
|
f.write(f"{indent}{yaml_line}")
|
|
|
|
@classmethod
|
|
def _dump_item(cls, name: Optional[str], attr: object, f: TextIO, level: int) -> None:
|
|
"""Write a group, dict or sequence item to f, where attr can be a scalar or a collection"""
|
|
|
|
# lazy construction of yaml Dumper to avoid loading Utils early
|
|
from Utils import Dumper as BaseDumper
|
|
from yaml import ScalarNode, MappingNode
|
|
if not hasattr(cls, "_dumper"):
|
|
if cls is Group or not hasattr(Group, "_dumper"):
|
|
class Dumper(BaseDumper):
|
|
def represent_mapping(self, tag: str, mapping: Any, flow_style: Any = None) -> MappingNode:
|
|
from yaml import ScalarNode
|
|
res: MappingNode = super().represent_mapping(tag, mapping, flow_style)
|
|
pairs = cast(List[Tuple[ScalarNode, Any]], res.value)
|
|
for k, v in pairs:
|
|
k.style = None # remove quotes from keys
|
|
return res
|
|
|
|
def represent_str(self, data: str) -> ScalarNode:
|
|
# default double quote all strings
|
|
return self.represent_scalar("tag:yaml.org,2002:str", data, style='"')
|
|
|
|
Dumper.add_representer(str, Dumper.represent_str)
|
|
Group._dumper = Dumper
|
|
if cls is not Group:
|
|
cls._dumper = Group._dumper
|
|
|
|
indent = " " * level
|
|
start = f"{indent}-\n" if name is None else f"{indent}{name}:\n"
|
|
if isinstance(attr, Group):
|
|
# handle group
|
|
f.write(start)
|
|
attr.dump(f, level=level+1)
|
|
elif isinstance(attr, (list, tuple, set)) and attr:
|
|
# handle non-empty sequence; empty use one-line [] syntax
|
|
f.write(start)
|
|
for value in attr:
|
|
cls._dump_item(None, value, f, level=level + 1)
|
|
elif isinstance(attr, dict) and attr:
|
|
# handle non-empty dict; empty use one-line {} syntax
|
|
f.write(start)
|
|
for dict_key, value in attr.items():
|
|
# not dumping doc string here, since there is no way to upcast it after dumping
|
|
assert dict_key is not None, "Key None is reserved for sequences"
|
|
cls._dump_item(dict_key, value, f, level=level + 1)
|
|
else:
|
|
# dump scalar or empty sequence or mapping item
|
|
line = [_to_builtin(attr)] if name is None else {name: _to_builtin(attr)}
|
|
cls._dump_value(line, f, indent=indent)
|
|
|
|
def dump(self, f: TextIO, level: int = 0) -> None:
|
|
"""Dump Group to stream f at given indentation level"""
|
|
# There is no easy way to generate extra lines into default yaml output,
|
|
# so we format part of it by hand using an odd recursion here and in _dump_*.
|
|
|
|
self._dumping = True
|
|
try:
|
|
# fetch class to avoid going through getattr
|
|
cls = self.__class__
|
|
type_hints = cls.get_type_hints()
|
|
# validate group
|
|
for name in cls.__annotations__.keys():
|
|
assert hasattr(cls, name), f"{cls}.{name} is missing a default value"
|
|
# dump ordered members
|
|
for name in self:
|
|
attr = cast(object, getattr(self, name))
|
|
attr_cls = type_hints[name] if name in type_hints else attr.__class__
|
|
attr_cls_origin = typing.get_origin(attr_cls)
|
|
while attr_cls_origin is Union: # resolve to first type for doc string
|
|
attr_cls = typing.get_args(attr_cls)[0]
|
|
attr_cls_origin = typing.get_origin(attr_cls)
|
|
if attr_cls.__doc__ and attr_cls.__module__ != "builtins":
|
|
f.write(fmt_doc(attr_cls, level=level) + "\n")
|
|
self._dump_item(name, attr, f, level=level)
|
|
self._changed = False
|
|
finally:
|
|
self._dumping = False
|
|
|
|
|
|
class Bool:
|
|
# can't subclass bool, so we use this and Union or type: ignore
|
|
def __bool__(self) -> bool:
|
|
raise NotImplementedError()
|
|
|
|
|
|
# Types for generic settings
|
|
T = TypeVar("T", bound="Path")
|
|
|
|
|
|
def _resolve_exe(s: str) -> str:
|
|
"""Append exe file extension if the file is an executable"""
|
|
if isinstance(s, Path):
|
|
from Utils import is_windows
|
|
if s.is_exe and is_windows and not s.lower().endswith(".exe"):
|
|
return str(s + ".exe")
|
|
return str(s)
|
|
|
|
|
|
def _to_builtin(o: object) -> Any:
|
|
"""Downcast object to a builtin type for output"""
|
|
if o is None:
|
|
return None
|
|
c = o.__class__
|
|
while c.__module__ != "builtins":
|
|
c = c.__base__
|
|
return c.__call__(o)
|
|
|
|
|
|
class Path(str):
|
|
# paths in host.yaml are str
|
|
required: bool = True
|
|
"""Marks the file as required and opens a file browser when missing"""
|
|
is_exe: bool = False
|
|
"""Special cross-platform handling for executables"""
|
|
description: Optional[str] = None
|
|
"""Title to display when browsing for the file"""
|
|
copy_to: Optional[str] = None
|
|
"""If not None, copy to AP folder instead of linking it"""
|
|
|
|
@classmethod
|
|
def validate(cls, path: str) -> None:
|
|
"""Overload and raise to validate input files from browse"""
|
|
pass
|
|
|
|
def browse(self: T, **kwargs: Any) -> Optional[T]:
|
|
"""Opens a file browser to search for the file"""
|
|
raise NotImplementedError(f"Please use a subclass of Path for {self.__class__.__name__}")
|
|
|
|
def resolve(self) -> str:
|
|
return _resolve_exe(self)
|
|
|
|
def exists(self) -> bool:
|
|
return os.path.exists(self.resolve())
|
|
|
|
|
|
class _UserPath(str):
|
|
def resolve(self) -> str:
|
|
if os.path.isabs(self):
|
|
return str(self)
|
|
from Utils import user_path
|
|
return user_path(_resolve_exe(self))
|
|
|
|
|
|
class _LocalPath(str):
|
|
def resolve(self) -> str:
|
|
if os.path.isabs(self):
|
|
return str(self)
|
|
from Utils import local_path
|
|
return local_path(_resolve_exe(self))
|
|
|
|
|
|
class FilePath(Path):
|
|
# path to a file
|
|
|
|
md5s: ClassVar[List[Union[str, bytes]]] = []
|
|
"""MD5 hashes for default validator."""
|
|
|
|
def browse(self: T,
|
|
filetypes: Optional[typing.Sequence[typing.Tuple[str, typing.Sequence[str]]]] = None, **kwargs: Any)\
|
|
-> Optional[T]:
|
|
from Utils import open_filename, is_windows
|
|
if not filetypes:
|
|
if self.is_exe:
|
|
name, ext = "Program", ".exe" if is_windows else ""
|
|
else:
|
|
ext = os.path.splitext(self)[1]
|
|
name = ext[1:] if ext else "File"
|
|
filetypes = [(name, [ext])]
|
|
res = open_filename(f"Select {self.description or self.__class__.__name__}", filetypes, self)
|
|
if res:
|
|
self.validate(res)
|
|
if self.copy_to:
|
|
# instead of linking the file, copy it
|
|
dst = self.__class__(self.copy_to).resolve()
|
|
shutil.copy(res, dst, follow_symlinks=True)
|
|
res = dst
|
|
try:
|
|
rel = os.path.relpath(res, self.__class__("").resolve())
|
|
if not rel.startswith(".."):
|
|
res = rel
|
|
except ValueError:
|
|
pass
|
|
return self.__class__(res)
|
|
return None
|
|
|
|
@classmethod
|
|
def _validate_stream_hashes(cls, f: BinaryIO) -> None:
|
|
"""Helper to efficiently validate stream against hashes"""
|
|
if not cls.md5s:
|
|
return # no hashes to validate against
|
|
|
|
pos = f.tell()
|
|
try:
|
|
from hashlib import md5
|
|
file_md5 = md5()
|
|
block = bytearray(64*1024)
|
|
view = memoryview(block)
|
|
while n := f.readinto(view): # type: ignore
|
|
file_md5.update(view[:n])
|
|
file_md5_hex = file_md5.hexdigest()
|
|
for valid_md5 in cls.md5s:
|
|
if isinstance(valid_md5, str):
|
|
if valid_md5.lower() == file_md5_hex:
|
|
break
|
|
elif valid_md5 == file_md5.digest():
|
|
break
|
|
else:
|
|
raise ValueError(f"Hashes do not match for {cls.__name__}")
|
|
finally:
|
|
f.seek(pos)
|
|
|
|
@classmethod
|
|
def validate(cls, path: str) -> None:
|
|
"""Try to open and validate file against hashes"""
|
|
with open(path, "rb", buffering=0) as f:
|
|
try:
|
|
cls._validate_stream_hashes(f)
|
|
except ValueError:
|
|
raise ValueError(f"File hash does not match for {path}")
|
|
|
|
|
|
class FolderPath(Path):
|
|
# path to a folder
|
|
|
|
def browse(self: T, **kwargs: Any) -> Optional[T]:
|
|
from Utils import open_directory
|
|
res = open_directory(f"Select {self.description or self.__class__.__name__}", self)
|
|
if res:
|
|
try:
|
|
rel = os.path.relpath(res, self.__class__("").resolve())
|
|
if not rel.startswith(".."):
|
|
res = rel
|
|
except ValueError:
|
|
pass
|
|
return self.__class__(res)
|
|
return None
|
|
|
|
|
|
class UserFilePath(_UserPath, FilePath):
|
|
pass
|
|
|
|
|
|
class UserFolderPath(_UserPath, FolderPath):
|
|
pass
|
|
|
|
|
|
class OptionalUserFilePath(UserFilePath):
|
|
required = False
|
|
|
|
|
|
class OptionalUserFolderPath(UserFolderPath):
|
|
required = False
|
|
|
|
|
|
class LocalFilePath(_LocalPath, FilePath):
|
|
pass
|
|
|
|
|
|
class LocalFolderPath(_LocalPath, FolderPath):
|
|
pass
|
|
|
|
|
|
class OptionalLocalFilePath(LocalFilePath):
|
|
required = False
|
|
|
|
|
|
class OptionalLocalFolderPath(LocalFolderPath):
|
|
required = False
|
|
|
|
|
|
class SNESRomPath(UserFilePath):
|
|
# Special UserFilePath that ignores an optional header when validating
|
|
|
|
@classmethod
|
|
def validate(cls, path: str) -> None:
|
|
"""Try to open and validate file against hashes"""
|
|
with open(path, "rb", buffering=0) as f:
|
|
f.seek(0, os.SEEK_END)
|
|
size = f.tell()
|
|
if size % 1024 == 512:
|
|
f.seek(512) # skip header
|
|
elif size % 1024 == 0:
|
|
f.seek(0) # header-less
|
|
else:
|
|
raise ValueError(f"Unexpected file size for {path}")
|
|
|
|
try:
|
|
cls._validate_stream_hashes(f)
|
|
except ValueError:
|
|
raise ValueError(f"File hash does not match for {path}")
|
|
|
|
|
|
# World-independent setting groups
|
|
|
|
class GeneralOptions(Group):
|
|
class OutputPath(OptionalUserFolderPath):
|
|
"""
|
|
Where to place output files
|
|
"""
|
|
# created on demand, so marked as optional
|
|
|
|
output_path: OutputPath = OutputPath("output")
|
|
|
|
|
|
class ServerOptions(Group):
|
|
"""
|
|
Options for MultiServer
|
|
Null means nothing, for the server this means to default the value
|
|
These overwrite command line arguments!
|
|
"""
|
|
|
|
class ServerPassword(str):
|
|
"""
|
|
Allows for clients to log on and manage the server. If this is null, no remote administration is possible.
|
|
"""
|
|
|
|
class DisableItemCheat(Bool):
|
|
"""Disallow !getitem"""
|
|
|
|
class LocationCheckPoints(int):
|
|
"""
|
|
Client hint system
|
|
Points given to a player for each acquired item in their world
|
|
"""
|
|
|
|
class HintCost(int):
|
|
"""
|
|
Relative point cost to receive a hint via !hint for players
|
|
so for example hint_cost: 20 would mean that for every 20% of available checks, you get the ability to hint,
|
|
for a total of 5
|
|
"""
|
|
|
|
class ReleaseMode(str):
|
|
"""
|
|
Release modes
|
|
A Release sends out the remaining items *from* a world that releases
|
|
"disabled" -> clients can't release,
|
|
"enabled" -> clients can always release
|
|
"auto" -> automatic release on goal completion
|
|
"auto-enabled" -> automatic release on goal completion and manual release is also enabled
|
|
"goal" -> release is allowed after goal completion
|
|
"""
|
|
|
|
class CollectMode(str):
|
|
"""
|
|
Collect modes
|
|
A Collect sends the remaining items *to* a world that collects
|
|
"disabled" -> clients can't collect,
|
|
"enabled" -> clients can always collect
|
|
"auto" -> automatic collect on goal completion
|
|
"auto-enabled" -> automatic collect on goal completion and manual collect is also enabled
|
|
"goal" -> collect is allowed after goal completion
|
|
"""
|
|
|
|
class RemainingMode(str):
|
|
"""
|
|
Remaining modes
|
|
!remaining handling, that tells a client which items remain in their pool
|
|
"enabled" -> Client can always ask for remaining items
|
|
"disabled" -> Client can never ask for remaining items
|
|
"goal" -> Client can ask for remaining items after goal completion
|
|
"""
|
|
|
|
class AutoShutdown(int):
|
|
"""Automatically shut down the server after this many seconds without new location checks, 0 to keep running"""
|
|
|
|
class Compatibility(IntEnum):
|
|
"""
|
|
Compatibility handling
|
|
2 -> Recommended for casual/cooperative play, attempt to be compatible with everything across all versions
|
|
1 -> No longer in use, kept reserved in case of future use
|
|
0 -> Recommended for tournaments to force a level playing field, only allow an exact version match
|
|
"""
|
|
OFF = 0
|
|
ON = 1
|
|
FULL = 2
|
|
|
|
class LogNetwork(IntEnum):
|
|
"""log all server traffic, mostly for dev use"""
|
|
OFF = 0
|
|
ON = 1
|
|
|
|
host: Optional[str] = None
|
|
port: int = 38281
|
|
password: Optional[str] = None
|
|
multidata: Optional[str] = None
|
|
savefile: Optional[str] = None
|
|
disable_save: bool = False
|
|
loglevel: str = "info"
|
|
server_password: Optional[ServerPassword] = None
|
|
disable_item_cheat: Union[DisableItemCheat, bool] = False
|
|
location_check_points: LocationCheckPoints = LocationCheckPoints(1)
|
|
hint_cost: HintCost = HintCost(10)
|
|
release_mode: ReleaseMode = ReleaseMode("goal")
|
|
collect_mode: CollectMode = CollectMode("goal")
|
|
remaining_mode: RemainingMode = RemainingMode("goal")
|
|
auto_shutdown: AutoShutdown = AutoShutdown(0)
|
|
compatibility: Compatibility = Compatibility(2)
|
|
log_network: LogNetwork = LogNetwork(0)
|
|
|
|
|
|
class GeneratorOptions(Group):
|
|
"""Options for Generation"""
|
|
|
|
class EnemizerPath(LocalFilePath):
|
|
"""Location of your Enemizer CLI, available here: https://github.com/Ijwu/Enemizer/releases"""
|
|
is_exe = True
|
|
|
|
class PlayerFilesPath(OptionalUserFolderPath):
|
|
"""Folder from which the player yaml files are pulled from"""
|
|
# created on demand, so marked as optional
|
|
|
|
class Players(int):
|
|
"""amount of players, 0 to infer from player files"""
|
|
|
|
class WeightsFilePath(str):
|
|
"""
|
|
general weights file, within the stated player_files_path location
|
|
gets used if players is higher than the amount of per-player files found to fill remaining slots
|
|
"""
|
|
# this is special because the path is relative to player_files_path
|
|
|
|
class MetaFilePath(str):
|
|
"""Meta file name, within the stated player_files_path location"""
|
|
# this is special because the path is relative to player_files_path
|
|
|
|
class Spoiler(IntEnum):
|
|
"""
|
|
Create a spoiler file
|
|
0 -> None
|
|
1 -> Spoiler without playthrough or paths to playthrough required items
|
|
2 -> Spoiler with playthrough (viable solution to goals)
|
|
3 -> Spoiler with playthrough and traversal paths towards items
|
|
"""
|
|
NONE = 0
|
|
BASIC = 1
|
|
PLAYTHROUGH = 2
|
|
FULL = 3
|
|
|
|
class GlitchTriforceRoom(IntEnum):
|
|
"""
|
|
Glitch to Triforce room from Ganon
|
|
When disabled, you have to have a weapon that can hurt ganon (master sword or swordless/easy item functionality
|
|
+ hammer) and have completed the goal required for killing ganon to be able to access the triforce room.
|
|
1 -> Enabled.
|
|
0 -> Disabled (except in no-logic)
|
|
"""
|
|
OFF = 0
|
|
ON = 1
|
|
|
|
class PlandoOptions(str):
|
|
"""
|
|
List of options that can be plando'd. Can be combined, for example "bosses, items"
|
|
Available options: bosses, items, texts, connections
|
|
"""
|
|
|
|
class Race(IntEnum):
|
|
"""Create encrypted race roms and flag games as race mode"""
|
|
OFF = 0
|
|
ON = 1
|
|
|
|
enemizer_path: EnemizerPath = EnemizerPath("EnemizerCLI/EnemizerCLI.Core") # + ".exe" is implied on Windows
|
|
player_files_path: PlayerFilesPath = PlayerFilesPath("Players")
|
|
players: Players = Players(0)
|
|
weights_file_path: WeightsFilePath = WeightsFilePath("weights.yaml")
|
|
meta_file_path: MetaFilePath = MetaFilePath("meta.yaml")
|
|
spoiler: Spoiler = Spoiler(3)
|
|
glitch_triforce_room: GlitchTriforceRoom = GlitchTriforceRoom(1) # why is this here?
|
|
race: Race = Race(0)
|
|
plando_options: PlandoOptions = PlandoOptions("bosses")
|
|
|
|
|
|
class SNIOptions(Group):
|
|
class SNIPath(LocalFolderPath):
|
|
"""
|
|
Set this to your SNI folder location if you want the MultiClient to attempt an auto start, \
|
|
does nothing if not found
|
|
"""
|
|
|
|
class SnesRomStart(str):
|
|
"""
|
|
Set this to false to never autostart a rom (such as after patching)
|
|
True for operating system default program
|
|
Alternatively, a path to a program to open the .sfc file with
|
|
"""
|
|
|
|
sni_path: SNIPath = SNIPath("SNI")
|
|
snes_rom_start: Union[SnesRomStart, bool] = True
|
|
|
|
|
|
class BizHawkClientOptions(Group):
|
|
class EmuHawkPath(UserFilePath):
|
|
"""
|
|
The location of the EmuHawk you want to auto launch patched ROMs with
|
|
"""
|
|
is_exe = True
|
|
description = "EmuHawk Executable"
|
|
|
|
class RomStart(str):
|
|
"""
|
|
Set this to true to autostart a patched ROM in BizHawk with the connector script,
|
|
to false to never open the patched rom automatically,
|
|
or to a path to an external program to open the ROM file with that instead.
|
|
"""
|
|
|
|
emuhawk_path: EmuHawkPath = EmuHawkPath(None)
|
|
rom_start: Union[RomStart, bool] = True
|
|
|
|
|
|
# Top-level group with lazy loading of worlds
|
|
|
|
class Settings(Group):
|
|
general_options: GeneralOptions = GeneralOptions()
|
|
server_options: ServerOptions = ServerOptions()
|
|
generator: GeneratorOptions = GeneratorOptions()
|
|
sni_options: SNIOptions = SNIOptions()
|
|
bizhawkclient_options: BizHawkClientOptions = BizHawkClientOptions()
|
|
|
|
_filename: Optional[str] = None
|
|
|
|
def __getattribute__(self, key: str) -> Any:
|
|
if key.startswith("_") or key in self.__class__.__dict__:
|
|
# not a group or a hard-coded group
|
|
pass
|
|
elif key not in dir(self) or isinstance(super().__getattribute__(key), dict):
|
|
# settings class not loaded yet
|
|
if key not in _world_settings_name_cache:
|
|
# find world that provides the settings class
|
|
_update_cache()
|
|
# check for missing keys to update _changed
|
|
for world_settings_name in _world_settings_name_cache:
|
|
if world_settings_name not in dir(self):
|
|
self._changed = True
|
|
if key not in _world_settings_name_cache:
|
|
# not a world group
|
|
return super().__getattribute__(key)
|
|
# directly import world and grab settings class
|
|
world_mod, world_cls_name = _world_settings_name_cache[key].rsplit(".", 1)
|
|
world = cast(type, getattr(__import__(world_mod, fromlist=[world_cls_name]), world_cls_name))
|
|
assert getattr(world, "settings_key") == key
|
|
try:
|
|
cls_or_name = world.__annotations__["settings"]
|
|
except KeyError:
|
|
import warnings
|
|
warnings.warn(f"World {world_cls_name} does not define settings. Please consider upgrading the world.")
|
|
return super().__getattribute__(key)
|
|
if isinstance(cls_or_name, str):
|
|
# Try to resolve type. Sadly we can't use get_type_hints, see https://bugs.python.org/issue43463
|
|
cls_name = cls_or_name
|
|
if "[" in cls_name: # resolve ClassVar[]
|
|
cls_name = cls_name.split("[", 1)[1].rsplit("]", 1)[0]
|
|
cls = cast(type, getattr(__import__(world_mod, fromlist=[cls_name]), cls_name))
|
|
else:
|
|
type_args = typing.get_args(cls_or_name) # resolve ClassVar[]
|
|
cls = type_args[0] if type_args else cast(type, cls_or_name)
|
|
impl: Group = cast(Group, cls())
|
|
assert isinstance(impl, Group), f"{world_cls_name}.settings has to inherit from settings.Group. " \
|
|
"If that's already the case, please avoid recursive partial imports."
|
|
# above assert fails for recursive partial imports
|
|
# upcast loaded data to settings class
|
|
try:
|
|
dct = super().__getattribute__(key)
|
|
if isinstance(dct, dict):
|
|
impl.update(dct)
|
|
else:
|
|
self._changed = True # key is a class var -> new section
|
|
except AttributeError:
|
|
self._changed = True # key is unknown -> new section
|
|
setattr(self, key, impl)
|
|
|
|
return super().__getattribute__(key)
|
|
|
|
def __init__(self, location: Optional[str]): # change to PathLike[str] once we drop 3.8?
|
|
super().__init__()
|
|
if location:
|
|
from Utils import parse_yaml
|
|
with open(location, encoding="utf-8-sig") as f:
|
|
options = parse_yaml(f.read())
|
|
# TODO: detect if upgrade is required
|
|
# TODO: once we have a cache for _world_settings_name_cache, detect if any game section is missing
|
|
self.update(options or {})
|
|
self._filename = location
|
|
|
|
def autosave() -> None:
|
|
if __debug__:
|
|
import __main__
|
|
main_file = getattr(__main__, "__file__", "")
|
|
assert "pytest" not in main_file and "unittest" not in main_file, \
|
|
f"Auto-saving {self._filename} during unittests"
|
|
if self._filename and self.changed and not skip_autosave:
|
|
self.save()
|
|
|
|
if not skip_autosave:
|
|
import atexit
|
|
atexit.register(autosave)
|
|
|
|
def save(self, location: Optional[str] = None) -> None: # as above
|
|
location = location or self._filename
|
|
assert location, "No file specified"
|
|
temp_location = location + ".tmp" # not using tempfile to test expected file access
|
|
# remove old temps
|
|
if os.path.exists(temp_location):
|
|
os.unlink(temp_location)
|
|
# can't use utf-8-sig because it breaks backward compat: pyyaml on Windows with bytes does not strip the BOM
|
|
with open(temp_location, "w", encoding="utf-8") as f:
|
|
self.dump(f)
|
|
# replace old with new
|
|
if os.path.exists(location):
|
|
os.unlink(location)
|
|
os.rename(temp_location, location)
|
|
self._filename = location
|
|
|
|
def dump(self, f: TextIO, level: int = 0) -> None:
|
|
# load all world setting classes
|
|
_update_cache()
|
|
for key in _world_settings_name_cache:
|
|
self.__getattribute__(key) # load all worlds
|
|
super().dump(f, level)
|
|
|
|
@property
|
|
def filename(self) -> Optional[str]:
|
|
return self._filename
|
|
|
|
|
|
# host.yaml loader
|
|
|
|
def get_settings() -> Settings:
|
|
"""Returns settings from the default host.yaml"""
|
|
with _lock: # make sure we only have one instance
|
|
res = getattr(get_settings, "_cache", None)
|
|
if not res:
|
|
import os
|
|
from Utils import user_path, local_path
|
|
filenames = ("options.yaml", "host.yaml")
|
|
locations: List[str] = []
|
|
if os.path.join(os.getcwd()) != local_path():
|
|
locations += filenames # use files from cwd only if it's not the local_path
|
|
locations += [user_path(filename) for filename in filenames]
|
|
for location in locations:
|
|
try:
|
|
res = Settings(location)
|
|
break
|
|
except FileNotFoundError:
|
|
continue
|
|
else:
|
|
warnings.warn(f"Could not find {filenames[1]} to load options. Creating a new one.")
|
|
res = Settings(None)
|
|
res.save(user_path(filenames[1]))
|
|
setattr(get_settings, "_cache", res)
|
|
return res
|