2020-04-22 03:09:46 +00:00
|
|
|
from __future__ import annotations
|
2021-01-02 11:49:43 +00:00
|
|
|
|
2022-11-02 14:51:35 +00:00
|
|
|
import asyncio
|
2023-03-20 16:01:08 +00:00
|
|
|
import json
|
2020-06-21 13:32:31 +00:00
|
|
|
import typing
|
2021-11-28 03:06:30 +00:00
|
|
|
import builtins
|
|
|
|
import os
|
|
|
|
import subprocess
|
|
|
|
import sys
|
|
|
|
import pickle
|
|
|
|
import functools
|
|
|
|
import io
|
|
|
|
import collections
|
|
|
|
import importlib
|
|
|
|
import logging
|
2023-10-02 06:34:50 +00:00
|
|
|
import warnings
|
2022-09-29 22:36:30 +00:00
|
|
|
|
2023-08-25 20:25:02 +00:00
|
|
|
from argparse import Namespace
|
2023-07-05 20:39:35 +00:00
|
|
|
from settings import Settings, get_settings
|
|
|
|
from typing import BinaryIO, Coroutine, Optional, Set, Dict, Any, Union
|
2022-08-11 22:32:37 +00:00
|
|
|
from yaml import load, load_all, dump, SafeLoader
|
|
|
|
|
|
|
|
try:
|
|
|
|
from yaml import CLoader as UnsafeLoader
|
|
|
|
from yaml import CDumper as Dumper
|
|
|
|
except ImportError:
|
|
|
|
from yaml import Loader as UnsafeLoader
|
|
|
|
from yaml import Dumper
|
2022-05-18 20:30:19 +00:00
|
|
|
|
|
|
|
if typing.TYPE_CHECKING:
|
2022-08-11 22:32:37 +00:00
|
|
|
import tkinter
|
|
|
|
import pathlib
|
2023-10-01 23:56:55 +00:00
|
|
|
from BaseClasses import Region
|
2020-06-21 13:32:31 +00:00
|
|
|
|
|
|
|
|
2021-08-06 17:33:17 +00:00
|
|
|
def tuplize_version(version: str) -> Version:
|
2020-12-29 18:23:14 +00:00
|
|
|
return Version(*(int(piece, 10) for piece in version.split(".")))
|
|
|
|
|
2020-06-21 13:32:31 +00:00
|
|
|
|
2020-12-29 18:23:14 +00:00
|
|
|
class Version(typing.NamedTuple):
|
|
|
|
major: int
|
|
|
|
minor: int
|
2021-02-21 22:46:05 +00:00
|
|
|
build: int
|
2020-04-22 03:09:46 +00:00
|
|
|
|
2023-04-25 11:26:52 +00:00
|
|
|
def as_simple_string(self) -> str:
|
|
|
|
return ".".join(str(item) for item in self)
|
|
|
|
|
2021-07-01 23:29:49 +00:00
|
|
|
|
2023-09-16 17:23:22 +00:00
|
|
|
__version__ = "0.4.3"
|
2021-06-18 20:15:54 +00:00
|
|
|
version_tuple = tuplize_version(__version__)
|
2020-04-20 12:50:49 +00:00
|
|
|
|
2022-08-11 22:32:37 +00:00
|
|
|
is_linux = sys.platform.startswith("linux")
|
|
|
|
is_macos = sys.platform == "darwin"
|
2022-06-04 16:10:34 +00:00
|
|
|
is_windows = sys.platform in ("win32", "cygwin", "msys")
|
|
|
|
|
2017-11-28 14:36:32 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
def int16_as_bytes(value: int) -> typing.List[int]:
|
2018-02-17 23:38:54 +00:00
|
|
|
value = value & 0xFFFF
|
|
|
|
return [value & 0xFF, (value >> 8) & 0xFF]
|
|
|
|
|
2020-02-16 14:32:40 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
def int32_as_bytes(value: int) -> typing.List[int]:
|
2018-02-17 23:38:54 +00:00
|
|
|
value = value & 0xFFFFFFFF
|
|
|
|
return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF]
|
|
|
|
|
2020-02-16 14:32:40 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
def pc_to_snes(value: int) -> int:
|
2021-01-02 11:49:43 +00:00
|
|
|
return ((value << 1) & 0x7F0000) | (value & 0x7FFF) | 0x8000
|
2018-09-23 02:51:54 +00:00
|
|
|
|
2020-07-21 21:15:19 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
def snes_to_pc(value: int) -> int:
|
2021-01-02 11:49:43 +00:00
|
|
|
return ((value & 0x7F0000) >> 1) | (value & 0x7FFF)
|
2018-09-23 02:51:54 +00:00
|
|
|
|
2020-07-21 21:15:19 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
RetType = typing.TypeVar("RetType")
|
|
|
|
|
|
|
|
|
|
|
|
def cache_argsless(function: typing.Callable[[], RetType]) -> typing.Callable[[], RetType]:
|
2022-04-30 02:39:08 +00:00
|
|
|
assert not function.__code__.co_argcount, "Can only cache 0 argument functions with this cache."
|
2021-07-09 15:44:24 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
sentinel = object()
|
|
|
|
result: typing.Union[object, RetType] = sentinel
|
2021-07-09 15:44:24 +00:00
|
|
|
|
2022-04-28 16:03:44 +00:00
|
|
|
def _wrap() -> RetType:
|
2021-07-09 15:44:24 +00:00
|
|
|
nonlocal result
|
|
|
|
if result is sentinel:
|
|
|
|
result = function()
|
2022-04-28 16:03:44 +00:00
|
|
|
return typing.cast(RetType, result)
|
2021-07-09 15:44:24 +00:00
|
|
|
|
|
|
|
return _wrap
|
|
|
|
|
|
|
|
|
2021-07-19 19:52:08 +00:00
|
|
|
def is_frozen() -> bool:
|
2022-04-28 16:03:44 +00:00
|
|
|
return typing.cast(bool, getattr(sys, 'frozen', False))
|
2017-11-28 14:36:32 +00:00
|
|
|
|
2020-07-21 21:15:19 +00:00
|
|
|
|
2022-03-31 03:08:15 +00:00
|
|
|
def local_path(*path: str) -> str:
|
2023-03-29 18:14:45 +00:00
|
|
|
"""
|
|
|
|
Returns path to a file in the local Archipelago installation or source.
|
|
|
|
This might be read-only and user_path should be used instead for ROMs, configuration, etc.
|
|
|
|
"""
|
2022-03-31 03:08:15 +00:00
|
|
|
if hasattr(local_path, 'cached_path'):
|
|
|
|
pass
|
2021-07-19 19:52:08 +00:00
|
|
|
elif is_frozen():
|
2020-03-23 06:45:40 +00:00
|
|
|
if hasattr(sys, "_MEIPASS"):
|
|
|
|
# we are running in a PyInstaller bundle
|
|
|
|
local_path.cached_path = sys._MEIPASS # pylint: disable=protected-access,no-member
|
|
|
|
else:
|
|
|
|
# cx_Freeze
|
|
|
|
local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0]))
|
2017-11-28 14:36:32 +00:00
|
|
|
else:
|
2020-03-23 06:45:40 +00:00
|
|
|
import __main__
|
2022-12-11 12:15:23 +00:00
|
|
|
if hasattr(__main__, "__file__") and os.path.isfile(__main__.__file__):
|
2021-04-04 01:18:19 +00:00
|
|
|
# we are running in a normal Python environment
|
|
|
|
local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__))
|
|
|
|
else:
|
|
|
|
# pray
|
|
|
|
local_path.cached_path = os.path.abspath(".")
|
2020-03-15 18:32:00 +00:00
|
|
|
|
2020-08-25 11:22:47 +00:00
|
|
|
return os.path.join(local_path.cached_path, *path)
|
2017-11-28 14:36:32 +00:00
|
|
|
|
2021-01-02 11:49:43 +00:00
|
|
|
|
2022-03-31 03:08:15 +00:00
|
|
|
def home_path(*path: str) -> str:
|
|
|
|
"""Returns path to a file in the user home's Archipelago directory."""
|
|
|
|
if hasattr(home_path, 'cached_path'):
|
|
|
|
pass
|
|
|
|
elif sys.platform.startswith('linux'):
|
|
|
|
home_path.cached_path = os.path.expanduser('~/Archipelago')
|
|
|
|
os.makedirs(home_path.cached_path, 0o700, exist_ok=True)
|
|
|
|
else:
|
|
|
|
# not implemented
|
|
|
|
home_path.cached_path = local_path() # this will generate the same exceptions we got previously
|
|
|
|
|
|
|
|
return os.path.join(home_path.cached_path, *path)
|
2017-11-28 14:36:32 +00:00
|
|
|
|
2020-08-25 11:22:47 +00:00
|
|
|
|
2022-03-31 03:08:15 +00:00
|
|
|
def user_path(*path: str) -> str:
|
|
|
|
"""Returns either local_path or home_path based on write permissions."""
|
2022-08-11 22:32:37 +00:00
|
|
|
if hasattr(user_path, "cached_path"):
|
2022-03-31 03:08:15 +00:00
|
|
|
pass
|
|
|
|
elif os.access(local_path(), os.W_OK):
|
|
|
|
user_path.cached_path = local_path()
|
|
|
|
else:
|
|
|
|
user_path.cached_path = home_path()
|
2023-07-05 20:39:35 +00:00
|
|
|
# populate home from local
|
|
|
|
if user_path.cached_path != local_path():
|
|
|
|
import filecmp
|
|
|
|
if not os.path.exists(user_path("manifest.json")) or \
|
|
|
|
not filecmp.cmp(local_path("manifest.json"), user_path("manifest.json"), shallow=True):
|
|
|
|
import shutil
|
|
|
|
for dn in ("Players", "data/sprites"):
|
|
|
|
shutil.copytree(local_path(dn), user_path(dn), dirs_exist_ok=True)
|
|
|
|
for fn in ("manifest.json",):
|
|
|
|
shutil.copy2(local_path(fn), user_path(fn))
|
2022-03-31 03:08:15 +00:00
|
|
|
|
|
|
|
return os.path.join(user_path.cached_path, *path)
|
|
|
|
|
|
|
|
|
2023-03-20 16:01:08 +00:00
|
|
|
def cache_path(*path: str) -> str:
|
|
|
|
"""Returns path to a file in the user's Archipelago cache directory."""
|
|
|
|
if hasattr(cache_path, "cached_path"):
|
|
|
|
pass
|
|
|
|
else:
|
2023-03-30 13:30:43 +00:00
|
|
|
import platformdirs
|
|
|
|
cache_path.cached_path = platformdirs.user_cache_dir("Archipelago", False)
|
2023-03-20 16:01:08 +00:00
|
|
|
|
|
|
|
return os.path.join(cache_path.cached_path, *path)
|
|
|
|
|
|
|
|
|
2022-10-25 17:54:43 +00:00
|
|
|
def output_path(*path: str) -> str:
|
2022-03-31 03:08:15 +00:00
|
|
|
if hasattr(output_path, 'cached_path'):
|
2020-08-25 11:22:47 +00:00
|
|
|
return os.path.join(output_path.cached_path, *path)
|
2022-03-31 03:08:15 +00:00
|
|
|
output_path.cached_path = user_path(get_options()["general_options"]["output_path"])
|
2020-08-25 11:22:47 +00:00
|
|
|
path = os.path.join(output_path.cached_path, *path)
|
2020-08-01 14:52:11 +00:00
|
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
|
|
return path
|
2017-11-28 14:36:32 +00:00
|
|
|
|
2021-01-02 11:49:43 +00:00
|
|
|
|
2022-08-11 22:32:37 +00:00
|
|
|
def open_file(filename: typing.Union[str, "pathlib.Path"]) -> None:
|
|
|
|
if is_windows:
|
2017-11-28 14:36:32 +00:00
|
|
|
os.startfile(filename)
|
|
|
|
else:
|
2022-08-11 22:32:37 +00:00
|
|
|
from shutil import which
|
|
|
|
open_command = which("open") if is_macos else (which("xdg-open") or which("gnome-open") or which("kde-open"))
|
2017-11-28 14:36:32 +00:00
|
|
|
subprocess.call([open_command, filename])
|
2017-12-02 14:21:04 +00:00
|
|
|
|
2021-01-02 11:49:43 +00:00
|
|
|
|
2022-01-19 03:26:25 +00:00
|
|
|
# from https://gist.github.com/pypt/94d747fe5180851196eb#gistcomment-4015118 with some changes
|
|
|
|
class UniqueKeyLoader(SafeLoader):
|
|
|
|
def construct_mapping(self, node, deep=False):
|
|
|
|
mapping = set()
|
|
|
|
for key_node, value_node in node.value:
|
|
|
|
key = self.construct_object(key_node, deep=deep)
|
|
|
|
if key in mapping:
|
2022-01-25 03:20:08 +00:00
|
|
|
logging.error(f"YAML duplicates sanity check failed{key_node.start_mark}")
|
|
|
|
raise KeyError(f"Duplicate key {key} found in YAML. Already found keys: {mapping}.")
|
2022-01-19 03:26:25 +00:00
|
|
|
mapping.add(key)
|
|
|
|
return super().construct_mapping(node, deep)
|
|
|
|
|
|
|
|
|
|
|
|
parse_yaml = functools.partial(load, Loader=UniqueKeyLoader)
|
2022-04-12 08:57:29 +00:00
|
|
|
parse_yamls = functools.partial(load_all, Loader=UniqueKeyLoader)
|
2022-08-11 22:32:37 +00:00
|
|
|
unsafe_parse_yaml = functools.partial(load, Loader=UnsafeLoader)
|
|
|
|
|
|
|
|
del load, load_all # should not be used. don't leak their names
|
2020-02-16 14:32:40 +00:00
|
|
|
|
2021-07-30 23:40:27 +00:00
|
|
|
|
2021-11-13 22:14:26 +00:00
|
|
|
def get_cert_none_ssl_context():
|
|
|
|
import ssl
|
|
|
|
ctx = ssl.create_default_context()
|
|
|
|
ctx.check_hostname = False
|
|
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
return ctx
|
|
|
|
|
|
|
|
|
2021-07-09 15:44:24 +00:00
|
|
|
@cache_argsless
|
2020-03-05 23:48:23 +00:00
|
|
|
def get_public_ipv4() -> str:
|
|
|
|
import socket
|
|
|
|
import urllib.request
|
2023-10-02 06:34:50 +00:00
|
|
|
try:
|
|
|
|
ip = socket.gethostbyname(socket.gethostname())
|
|
|
|
except socket.gaierror:
|
|
|
|
# if hostname or resolvconf is not set up properly, this may fail
|
|
|
|
warnings.warn("Could not resolve own hostname, falling back to 127.0.0.1")
|
|
|
|
ip = "127.0.0.1"
|
|
|
|
|
2021-11-13 22:14:26 +00:00
|
|
|
ctx = get_cert_none_ssl_context()
|
2020-03-05 23:48:23 +00:00
|
|
|
try:
|
2023-03-09 20:31:00 +00:00
|
|
|
ip = urllib.request.urlopen("https://checkip.amazonaws.com/", context=ctx, timeout=10).read().decode("utf8").strip()
|
2020-03-05 23:48:23 +00:00
|
|
|
except Exception as e:
|
2022-08-11 22:32:37 +00:00
|
|
|
# noinspection PyBroadException
|
2020-03-05 23:48:23 +00:00
|
|
|
try:
|
2023-03-09 20:31:00 +00:00
|
|
|
ip = urllib.request.urlopen("https://v4.ident.me", context=ctx, timeout=10).read().decode("utf8").strip()
|
2022-08-11 22:32:37 +00:00
|
|
|
except Exception:
|
2020-03-05 23:48:23 +00:00
|
|
|
logging.exception(e)
|
|
|
|
pass # we could be offline, in a local game, so no point in erroring out
|
|
|
|
return ip
|
2020-03-15 18:32:00 +00:00
|
|
|
|
2021-07-30 23:40:27 +00:00
|
|
|
|
2021-07-09 15:44:24 +00:00
|
|
|
@cache_argsless
|
2020-06-14 07:06:37 +00:00
|
|
|
def get_public_ipv6() -> str:
|
|
|
|
import socket
|
|
|
|
import urllib.request
|
2023-10-02 06:34:50 +00:00
|
|
|
try:
|
|
|
|
ip = socket.gethostbyname(socket.gethostname())
|
|
|
|
except socket.gaierror:
|
|
|
|
# if hostname or resolvconf is not set up properly, this may fail
|
|
|
|
warnings.warn("Could not resolve own hostname, falling back to ::1")
|
|
|
|
ip = "::1"
|
|
|
|
|
2021-11-13 22:14:26 +00:00
|
|
|
ctx = get_cert_none_ssl_context()
|
2020-06-14 07:06:37 +00:00
|
|
|
try:
|
2023-03-09 20:31:00 +00:00
|
|
|
ip = urllib.request.urlopen("https://v6.ident.me", context=ctx, timeout=10).read().decode("utf8").strip()
|
2020-06-14 07:06:37 +00:00
|
|
|
except Exception as e:
|
|
|
|
logging.exception(e)
|
2020-06-21 14:13:42 +00:00
|
|
|
pass # we could be offline, in a local game, or ipv6 may not be available
|
2020-06-14 07:06:37 +00:00
|
|
|
return ip
|
2020-03-15 18:32:00 +00:00
|
|
|
|
2021-07-30 23:40:27 +00:00
|
|
|
|
2023-07-05 20:39:35 +00:00
|
|
|
OptionsType = Settings # TODO: remove ~2 versions after 0.4.1
|
2022-09-28 21:54:10 +00:00
|
|
|
|
|
|
|
|
2021-07-09 15:44:24 +00:00
|
|
|
@cache_argsless
|
2023-07-05 20:39:35 +00:00
|
|
|
def get_default_options() -> Settings: # TODO: remove ~2 versions after 0.4.1
|
|
|
|
return Settings(None)
|
2020-11-28 19:34:29 +00:00
|
|
|
|
2021-07-30 23:40:27 +00:00
|
|
|
|
2023-07-05 20:39:35 +00:00
|
|
|
get_options = get_settings # TODO: add a warning ~2 versions after 0.4.1 and remove once all games are ported
|
2020-04-14 18:22:42 +00:00
|
|
|
|
|
|
|
|
2021-02-25 01:07:28 +00:00
|
|
|
def persistent_store(category: str, key: typing.Any, value: typing.Any):
|
2022-03-31 03:08:15 +00:00
|
|
|
path = user_path("_persistent_storage.yaml")
|
2020-04-24 03:29:02 +00:00
|
|
|
storage: dict = persistent_load()
|
|
|
|
category = storage.setdefault(category, {})
|
|
|
|
category[key] = value
|
|
|
|
with open(path, "wt") as f:
|
2022-08-11 22:32:37 +00:00
|
|
|
f.write(dump(storage, Dumper=Dumper))
|
2020-04-24 03:29:02 +00:00
|
|
|
|
|
|
|
|
2022-08-11 22:32:37 +00:00
|
|
|
def persistent_load() -> typing.Dict[str, dict]:
|
2020-06-04 19:27:29 +00:00
|
|
|
storage = getattr(persistent_load, "storage", None)
|
|
|
|
if storage:
|
|
|
|
return storage
|
2022-03-31 03:08:15 +00:00
|
|
|
path = user_path("_persistent_storage.yaml")
|
2020-04-24 03:29:02 +00:00
|
|
|
storage: dict = {}
|
|
|
|
if os.path.exists(path):
|
|
|
|
try:
|
|
|
|
with open(path, "r") as f:
|
2020-07-05 00:06:00 +00:00
|
|
|
storage = unsafe_parse_yaml(f.read())
|
2020-04-24 03:29:02 +00:00
|
|
|
except Exception as e:
|
|
|
|
logging.debug(f"Could not read store: {e}")
|
2020-04-30 05:42:26 +00:00
|
|
|
if storage is None:
|
|
|
|
storage = {}
|
2020-06-04 19:27:29 +00:00
|
|
|
persistent_load.storage = storage
|
2020-04-24 03:29:02 +00:00
|
|
|
return storage
|
|
|
|
|
|
|
|
|
2023-03-20 16:01:08 +00:00
|
|
|
def get_file_safe_name(name: str) -> str:
|
|
|
|
return "".join(c for c in name if c not in '<>:"/\\|?*')
|
|
|
|
|
|
|
|
|
|
|
|
def load_data_package_for_checksum(game: str, checksum: typing.Optional[str]) -> Dict[str, Any]:
|
|
|
|
if checksum and game:
|
|
|
|
if checksum != get_file_safe_name(checksum):
|
|
|
|
raise ValueError(f"Bad symbols in checksum: {checksum}")
|
|
|
|
path = cache_path("datapackage", get_file_safe_name(game), f"{checksum}.json")
|
|
|
|
if os.path.exists(path):
|
|
|
|
try:
|
|
|
|
with open(path, "r", encoding="utf-8-sig") as f:
|
|
|
|
return json.load(f)
|
|
|
|
except Exception as e:
|
|
|
|
logging.debug(f"Could not load data package: {e}")
|
|
|
|
|
|
|
|
# fall back to old cache
|
|
|
|
cache = persistent_load().get("datapackage", {}).get("games", {}).get(game, {})
|
|
|
|
if cache.get("checksum") == checksum:
|
|
|
|
return cache
|
|
|
|
|
|
|
|
# cache does not match
|
|
|
|
return {}
|
|
|
|
|
|
|
|
|
|
|
|
def store_data_package_for_checksum(game: str, data: typing.Dict[str, Any]) -> None:
|
|
|
|
checksum = data.get("checksum")
|
|
|
|
if checksum and game:
|
|
|
|
if checksum != get_file_safe_name(checksum):
|
|
|
|
raise ValueError(f"Bad symbols in checksum: {checksum}")
|
|
|
|
game_folder = cache_path("datapackage", get_file_safe_name(game))
|
|
|
|
os.makedirs(game_folder, exist_ok=True)
|
|
|
|
try:
|
|
|
|
with open(os.path.join(game_folder, f"{checksum}.json"), "w", encoding="utf-8-sig") as f:
|
|
|
|
json.dump(data, f, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
except Exception as e:
|
|
|
|
logging.debug(f"Could not store data package: {e}")
|
|
|
|
|
2023-08-25 20:25:02 +00:00
|
|
|
def get_default_adjuster_settings(game_name: str) -> Namespace:
|
|
|
|
import LttPAdjuster
|
|
|
|
adjuster_settings = Namespace()
|
|
|
|
if game_name == LttPAdjuster.GAME_ALTTP:
|
|
|
|
return LttPAdjuster.get_argparser().parse_known_args(args=[])[0]
|
2023-03-20 16:01:08 +00:00
|
|
|
|
2022-01-20 03:19:58 +00:00
|
|
|
return adjuster_settings
|
2020-06-07 19:04:33 +00:00
|
|
|
|
2021-07-31 13:13:55 +00:00
|
|
|
|
2023-08-25 20:25:02 +00:00
|
|
|
def get_adjuster_settings_no_defaults(game_name: str) -> Namespace:
|
|
|
|
return persistent_load().get("adjuster", {}).get(game_name, Namespace())
|
|
|
|
|
|
|
|
|
|
|
|
def get_adjuster_settings(game_name: str) -> Namespace:
|
|
|
|
adjuster_settings = get_adjuster_settings_no_defaults(game_name)
|
|
|
|
default_settings = get_default_adjuster_settings(game_name)
|
|
|
|
|
|
|
|
# Fill in any arguments from the argparser that we haven't seen before
|
|
|
|
return Namespace(**vars(adjuster_settings), **{k:v for k,v in vars(default_settings).items() if k not in vars(adjuster_settings)})
|
|
|
|
|
|
|
|
|
2021-07-09 15:44:24 +00:00
|
|
|
@cache_argsless
|
2020-06-04 19:27:29 +00:00
|
|
|
def get_unique_identifier():
|
|
|
|
uuid = persistent_load().get("client", {}).get("uuid", None)
|
|
|
|
if uuid:
|
|
|
|
return uuid
|
|
|
|
|
|
|
|
import uuid
|
|
|
|
uuid = uuid.getnode()
|
|
|
|
persistent_store("client", "uuid", uuid)
|
|
|
|
return uuid
|
2020-09-08 23:41:37 +00:00
|
|
|
|
|
|
|
|
2022-08-11 22:32:37 +00:00
|
|
|
safe_builtins = frozenset((
|
2020-09-08 23:41:37 +00:00
|
|
|
'set',
|
|
|
|
'frozenset',
|
2022-08-11 22:32:37 +00:00
|
|
|
))
|
2020-09-08 23:41:37 +00:00
|
|
|
|
|
|
|
|
|
|
|
class RestrictedUnpickler(pickle.Unpickler):
|
2023-09-20 14:05:56 +00:00
|
|
|
generic_properties_module: Optional[object]
|
|
|
|
|
2021-09-17 23:02:26 +00:00
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
super(RestrictedUnpickler, self).__init__(*args, **kwargs)
|
|
|
|
self.options_module = importlib.import_module("Options")
|
|
|
|
self.net_utils_module = importlib.import_module("NetUtils")
|
2023-09-20 14:05:56 +00:00
|
|
|
self.generic_properties_module = None
|
2021-09-17 23:02:26 +00:00
|
|
|
|
2020-09-08 23:41:37 +00:00
|
|
|
def find_class(self, module, name):
|
|
|
|
if module == "builtins" and name in safe_builtins:
|
|
|
|
return getattr(builtins, name)
|
2021-09-17 23:02:26 +00:00
|
|
|
# used by MultiServer -> savegame/multidata
|
2022-01-30 12:57:12 +00:00
|
|
|
if module == "NetUtils" and name in {"NetworkItem", "ClientStatus", "Hint", "SlotType", "NetworkSlot"}:
|
2021-09-17 23:02:26 +00:00
|
|
|
return getattr(self.net_utils_module, name)
|
2021-09-23 00:29:24 +00:00
|
|
|
# Options and Plando are unpickled by WebHost -> Generate
|
|
|
|
if module == "worlds.generic" and name in {"PlandoItem", "PlandoConnection"}:
|
2023-09-20 14:05:56 +00:00
|
|
|
if not self.generic_properties_module:
|
|
|
|
self.generic_properties_module = importlib.import_module("worlds.generic")
|
2021-09-23 00:29:24 +00:00
|
|
|
return getattr(self.generic_properties_module, name)
|
2022-09-28 21:54:10 +00:00
|
|
|
# pep 8 specifies that modules should have "all-lowercase names" (options, not Options)
|
|
|
|
if module.lower().endswith("options"):
|
2021-09-17 23:02:26 +00:00
|
|
|
if module == "Options":
|
|
|
|
mod = self.options_module
|
|
|
|
else:
|
|
|
|
mod = importlib.import_module(module)
|
|
|
|
obj = getattr(mod, name)
|
|
|
|
if issubclass(obj, self.options_module.Option):
|
2021-05-16 20:59:45 +00:00
|
|
|
return obj
|
2020-09-08 23:41:37 +00:00
|
|
|
# Forbid everything else.
|
2022-08-11 22:32:37 +00:00
|
|
|
raise pickle.UnpicklingError(f"global '{module}.{name}' is forbidden")
|
2020-09-08 23:41:37 +00:00
|
|
|
|
|
|
|
|
|
|
|
def restricted_loads(s):
|
|
|
|
"""Helper function analogous to pickle.loads()."""
|
2021-07-07 08:14:58 +00:00
|
|
|
return RestrictedUnpickler(io.BytesIO(s)).load()
|
|
|
|
|
2021-07-09 15:44:24 +00:00
|
|
|
|
2023-05-02 06:23:39 +00:00
|
|
|
class ByValue:
|
|
|
|
"""
|
|
|
|
Mixin for enums to pickle value instead of name (restores pre-3.11 behavior). Use as left-most parent.
|
|
|
|
See https://github.com/python/cpython/pull/26658 for why this exists.
|
|
|
|
"""
|
|
|
|
def __reduce_ex__(self, prot):
|
|
|
|
return self.__class__, (self._value_, )
|
|
|
|
|
|
|
|
|
2021-07-07 08:14:58 +00:00
|
|
|
class KeyedDefaultDict(collections.defaultdict):
|
2022-08-12 04:52:01 +00:00
|
|
|
"""defaultdict variant that uses the missing key as argument to default_factory"""
|
|
|
|
default_factory: typing.Callable[[typing.Any], typing.Any]
|
|
|
|
|
2021-07-07 08:14:58 +00:00
|
|
|
def __missing__(self, key):
|
|
|
|
self[key] = value = self.default_factory(key)
|
2021-10-16 17:40:27 +00:00
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
def get_text_between(text: str, start: str, end: str) -> str:
|
|
|
|
return text[text.index(start) + len(start): text.rindex(end)]
|
2021-11-10 14:35:43 +00:00
|
|
|
|
|
|
|
|
2022-08-17 22:27:37 +00:00
|
|
|
def get_text_after(text: str, start: str) -> str:
|
|
|
|
return text[text.index(start) + len(start):]
|
|
|
|
|
|
|
|
|
2021-11-10 14:35:43 +00:00
|
|
|
loglevel_mapping = {'error': logging.ERROR, 'info': logging.INFO, 'warning': logging.WARNING, 'debug': logging.DEBUG}
|
|
|
|
|
|
|
|
|
|
|
|
def init_logging(name: str, loglevel: typing.Union[str, int] = logging.INFO, write_mode: str = "w",
|
2022-06-07 22:34:45 +00:00
|
|
|
log_format: str = "[%(name)s at %(asctime)s]: %(message)s",
|
|
|
|
exception_logger: typing.Optional[str] = None):
|
2022-11-17 20:27:44 +00:00
|
|
|
import datetime
|
2021-11-10 14:35:43 +00:00
|
|
|
loglevel: int = loglevel_mapping.get(loglevel, loglevel)
|
2022-03-31 03:08:15 +00:00
|
|
|
log_folder = user_path("logs")
|
2021-11-10 14:35:43 +00:00
|
|
|
os.makedirs(log_folder, exist_ok=True)
|
|
|
|
root_logger = logging.getLogger()
|
|
|
|
for handler in root_logger.handlers[:]:
|
|
|
|
root_logger.removeHandler(handler)
|
|
|
|
handler.close()
|
|
|
|
root_logger.setLevel(loglevel)
|
2023-05-20 17:18:25 +00:00
|
|
|
logging.getLogger("websockets").setLevel(loglevel) # make sure level is applied for websockets
|
2022-11-17 20:27:44 +00:00
|
|
|
if "a" not in write_mode:
|
|
|
|
name += f"_{datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
|
2021-11-10 14:35:43 +00:00
|
|
|
file_handler = logging.FileHandler(
|
|
|
|
os.path.join(log_folder, f"{name}.txt"),
|
|
|
|
write_mode,
|
|
|
|
encoding="utf-8-sig")
|
|
|
|
file_handler.setFormatter(logging.Formatter(log_format))
|
2023-10-08 11:26:14 +00:00
|
|
|
|
|
|
|
class Filter(logging.Filter):
|
|
|
|
def __init__(self, filter_name, condition):
|
|
|
|
super().__init__(filter_name)
|
|
|
|
self.condition = condition
|
|
|
|
|
|
|
|
def filter(self, record: logging.LogRecord) -> bool:
|
|
|
|
return self.condition(record)
|
|
|
|
|
|
|
|
file_handler.addFilter(Filter("NoStream", lambda record: not getattr(record, "NoFile", False)))
|
2021-11-10 14:35:43 +00:00
|
|
|
root_logger.addHandler(file_handler)
|
|
|
|
if sys.stdout:
|
2023-10-08 11:26:14 +00:00
|
|
|
stream_handler = logging.StreamHandler(sys.stdout)
|
|
|
|
stream_handler.addFilter(Filter("NoFile", lambda record: not getattr(record, "NoStream", False)))
|
|
|
|
root_logger.addHandler(stream_handler)
|
2021-11-17 21:46:32 +00:00
|
|
|
|
|
|
|
# Relay unhandled exceptions to logger.
|
|
|
|
if not getattr(sys.excepthook, "_wrapped", False): # skip if already modified
|
|
|
|
orig_hook = sys.excepthook
|
|
|
|
|
|
|
|
def handle_exception(exc_type, exc_value, exc_traceback):
|
|
|
|
if issubclass(exc_type, KeyboardInterrupt):
|
|
|
|
sys.__excepthook__(exc_type, exc_value, exc_traceback)
|
|
|
|
return
|
|
|
|
logging.getLogger(exception_logger).exception("Uncaught exception",
|
|
|
|
exc_info=(exc_type, exc_value, exc_traceback))
|
|
|
|
return orig_hook(exc_type, exc_value, exc_traceback)
|
|
|
|
|
|
|
|
handle_exception._wrapped = True
|
|
|
|
|
|
|
|
sys.excepthook = handle_exception
|
2021-11-28 03:06:30 +00:00
|
|
|
|
2022-11-17 20:27:44 +00:00
|
|
|
def _cleanup():
|
|
|
|
for file in os.scandir(log_folder):
|
|
|
|
if file.name.endswith(".txt"):
|
|
|
|
last_change = datetime.datetime.fromtimestamp(file.stat().st_mtime)
|
|
|
|
if datetime.datetime.now() - last_change > datetime.timedelta(days=7):
|
|
|
|
try:
|
|
|
|
os.unlink(file.path)
|
|
|
|
except Exception as e:
|
|
|
|
logging.exception(e)
|
|
|
|
else:
|
2023-01-23 01:23:16 +00:00
|
|
|
logging.debug(f"Deleted old logfile {file.path}")
|
2022-11-17 20:27:44 +00:00
|
|
|
import threading
|
|
|
|
threading.Thread(target=_cleanup, name="LogCleaner").start()
|
2022-11-28 01:52:36 +00:00
|
|
|
import platform
|
|
|
|
logging.info(
|
|
|
|
f"Archipelago ({__version__}) logging initialized"
|
|
|
|
f" on {platform.platform()}"
|
|
|
|
f" running Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
|
|
|
|
)
|
2022-06-07 22:34:45 +00:00
|
|
|
|
2021-11-28 03:06:30 +00:00
|
|
|
|
|
|
|
def stream_input(stream, queue):
|
|
|
|
def queuer():
|
|
|
|
while 1:
|
2022-08-03 12:53:14 +00:00
|
|
|
try:
|
|
|
|
text = stream.readline().strip()
|
|
|
|
except UnicodeDecodeError as e:
|
|
|
|
logging.exception(e)
|
|
|
|
else:
|
|
|
|
if text:
|
|
|
|
queue.put_nowait(text)
|
2021-11-28 03:06:30 +00:00
|
|
|
|
|
|
|
from threading import Thread
|
|
|
|
thread = Thread(target=queuer, name=f"Stream handler for {stream.name}", daemon=True)
|
|
|
|
thread.start()
|
|
|
|
return thread
|
2022-01-18 07:23:38 +00:00
|
|
|
|
|
|
|
|
2022-08-11 22:32:37 +00:00
|
|
|
def tkinter_center_window(window: "tkinter.Tk") -> None:
|
2022-01-20 03:19:58 +00:00
|
|
|
window.update()
|
2022-08-11 22:32:37 +00:00
|
|
|
x = int(window.winfo_screenwidth() / 2 - window.winfo_reqwidth() / 2)
|
|
|
|
y = int(window.winfo_screenheight() / 2 - window.winfo_reqheight() / 2)
|
|
|
|
window.geometry(f"+{x}+{y}")
|
2022-01-20 03:19:58 +00:00
|
|
|
|
2022-02-24 03:47:01 +00:00
|
|
|
|
2022-01-18 07:23:38 +00:00
|
|
|
class VersionException(Exception):
|
|
|
|
pass
|
2022-01-20 03:19:58 +00:00
|
|
|
|
2022-02-24 03:47:01 +00:00
|
|
|
|
2022-06-21 18:50:40 +00:00
|
|
|
def chaining_prefix(index: int, labels: typing.Tuple[str]) -> str:
|
|
|
|
text = ""
|
|
|
|
max_label = len(labels) - 1
|
|
|
|
while index > max_label:
|
|
|
|
text += labels[-1]
|
|
|
|
index -= max_label
|
|
|
|
return labels[index] + text
|
|
|
|
|
|
|
|
|
2022-04-30 02:39:08 +00:00
|
|
|
# noinspection PyPep8Naming
|
2022-08-11 22:46:11 +00:00
|
|
|
def format_SI_prefix(value, power=1000, power_labels=("", "k", "M", "G", "T", "P", "E", "Z", "Y")) -> str:
|
2022-06-21 18:50:40 +00:00
|
|
|
"""Formats a value into a value + metric/si prefix. More info at https://en.wikipedia.org/wiki/Metric_prefix"""
|
2022-08-12 21:02:56 +00:00
|
|
|
import decimal
|
2022-02-24 03:47:01 +00:00
|
|
|
n = 0
|
2022-06-21 18:50:40 +00:00
|
|
|
value = decimal.Decimal(value)
|
2022-08-11 22:46:11 +00:00
|
|
|
limit = power - decimal.Decimal("0.005")
|
|
|
|
while value >= limit:
|
2022-02-24 03:47:01 +00:00
|
|
|
value /= power
|
|
|
|
n += 1
|
2022-06-21 18:50:40 +00:00
|
|
|
|
|
|
|
return f"{value.quantize(decimal.Decimal('1.00'))} {chaining_prefix(n, power_labels)}"
|
2022-05-09 05:18:50 +00:00
|
|
|
|
|
|
|
|
2022-05-09 15:03:16 +00:00
|
|
|
def get_fuzzy_results(input_word: str, wordlist: typing.Sequence[str], limit: typing.Optional[int] = None) \
|
|
|
|
-> typing.List[typing.Tuple[str, int]]:
|
2022-08-11 22:32:37 +00:00
|
|
|
import jellyfish
|
|
|
|
|
|
|
|
def get_fuzzy_ratio(word1: str, word2: str) -> float:
|
|
|
|
return (1 - jellyfish.damerau_levenshtein_distance(word1.lower(), word2.lower())
|
|
|
|
/ max(len(word1), len(word2)))
|
|
|
|
|
2022-05-09 15:03:16 +00:00
|
|
|
limit: int = limit if limit else len(wordlist)
|
|
|
|
return list(
|
|
|
|
map(
|
|
|
|
lambda container: (container[0], int(container[1]*100)), # convert up to limit to int %
|
|
|
|
sorted(
|
|
|
|
map(lambda candidate:
|
|
|
|
(candidate, get_fuzzy_ratio(input_word, candidate)),
|
|
|
|
wordlist),
|
|
|
|
key=lambda element: element[1],
|
|
|
|
reverse=True)[0:limit]
|
|
|
|
)
|
|
|
|
)
|
2022-06-04 15:02:02 +00:00
|
|
|
|
|
|
|
|
2023-07-05 20:39:35 +00:00
|
|
|
def open_filename(title: str, filetypes: typing.Sequence[typing.Tuple[str, typing.Sequence[str]]], suggest: str = "") \
|
2022-06-04 16:36:50 +00:00
|
|
|
-> typing.Optional[str]:
|
|
|
|
def run(*args: str):
|
2022-08-11 22:32:37 +00:00
|
|
|
return subprocess.run(args, capture_output=True, text=True).stdout.split("\n", 1)[0] or None
|
2022-06-04 16:36:50 +00:00
|
|
|
|
|
|
|
if is_linux:
|
|
|
|
# prefer native dialog
|
2022-08-11 22:32:37 +00:00
|
|
|
from shutil import which
|
|
|
|
kdialog = which("kdialog")
|
2022-06-04 16:36:50 +00:00
|
|
|
if kdialog:
|
|
|
|
k_filters = '|'.join((f'{text} (*{" *".join(ext)})' for (text, ext) in filetypes))
|
2023-07-05 20:39:35 +00:00
|
|
|
return run(kdialog, f"--title={title}", "--getopenfilename", suggest or ".", k_filters)
|
2022-08-11 22:32:37 +00:00
|
|
|
zenity = which("zenity")
|
2022-06-04 16:36:50 +00:00
|
|
|
if zenity:
|
|
|
|
z_filters = (f'--file-filter={text} ({", ".join(ext)}) | *{" *".join(ext)}' for (text, ext) in filetypes)
|
2023-09-24 08:30:33 +00:00
|
|
|
selection = (f"--filename={suggest}",) if suggest else ()
|
2023-07-05 20:39:35 +00:00
|
|
|
return run(zenity, f"--title={title}", "--file-selection", *z_filters, *selection)
|
|
|
|
|
|
|
|
# fall back to tk
|
|
|
|
try:
|
|
|
|
import tkinter
|
|
|
|
import tkinter.filedialog
|
|
|
|
except Exception as e:
|
|
|
|
logging.error('Could not load tkinter, which is likely not installed. '
|
|
|
|
f'This attempt was made because open_filename was used for "{title}".')
|
|
|
|
raise e
|
|
|
|
else:
|
2023-09-24 08:30:33 +00:00
|
|
|
try:
|
|
|
|
root = tkinter.Tk()
|
|
|
|
except tkinter.TclError:
|
|
|
|
return None # GUI not available. None is the same as a user clicking "cancel"
|
2023-07-05 20:39:35 +00:00
|
|
|
root.withdraw()
|
|
|
|
return tkinter.filedialog.askopenfilename(title=title, filetypes=((t[0], ' '.join(t[1])) for t in filetypes),
|
|
|
|
initialfile=suggest or None)
|
|
|
|
|
|
|
|
|
|
|
|
def open_directory(title: str, suggest: str = "") -> typing.Optional[str]:
|
|
|
|
def run(*args: str):
|
|
|
|
return subprocess.run(args, capture_output=True, text=True).stdout.split("\n", 1)[0] or None
|
|
|
|
|
|
|
|
if is_linux:
|
|
|
|
# prefer native dialog
|
|
|
|
from shutil import which
|
2023-09-24 08:30:33 +00:00
|
|
|
kdialog = which("kdialog")
|
2023-07-05 20:39:35 +00:00
|
|
|
if kdialog:
|
2023-09-24 08:30:33 +00:00
|
|
|
return run(kdialog, f"--title={title}", "--getexistingdirectory",
|
|
|
|
os.path.abspath(suggest) if suggest else ".")
|
|
|
|
zenity = which("zenity")
|
2023-07-05 20:39:35 +00:00
|
|
|
if zenity:
|
|
|
|
z_filters = ("--directory",)
|
2023-09-24 08:30:33 +00:00
|
|
|
selection = (f"--filename={os.path.abspath(suggest)}/",) if suggest else ()
|
2023-07-05 20:39:35 +00:00
|
|
|
return run(zenity, f"--title={title}", "--file-selection", *z_filters, *selection)
|
2022-06-04 16:36:50 +00:00
|
|
|
|
|
|
|
# fall back to tk
|
|
|
|
try:
|
|
|
|
import tkinter
|
|
|
|
import tkinter.filedialog
|
|
|
|
except Exception as e:
|
|
|
|
logging.error('Could not load tkinter, which is likely not installed. '
|
|
|
|
f'This attempt was made because open_filename was used for "{title}".')
|
|
|
|
raise e
|
|
|
|
else:
|
2023-09-24 08:30:33 +00:00
|
|
|
try:
|
|
|
|
root = tkinter.Tk()
|
|
|
|
except tkinter.TclError:
|
|
|
|
return None # GUI not available. None is the same as a user clicking "cancel"
|
2022-06-04 16:36:50 +00:00
|
|
|
root.withdraw()
|
2023-07-05 20:39:35 +00:00
|
|
|
return tkinter.filedialog.askdirectory(title=title, mustexist=True, initialdir=suggest or None)
|
2022-06-04 16:36:50 +00:00
|
|
|
|
|
|
|
|
2022-06-04 15:02:02 +00:00
|
|
|
def messagebox(title: str, text: str, error: bool = False) -> None:
|
2022-06-23 17:26:30 +00:00
|
|
|
def run(*args: str):
|
2022-08-11 22:32:37 +00:00
|
|
|
return subprocess.run(args, capture_output=True, text=True).stdout.split("\n", 1)[0] or None
|
2022-06-23 17:26:30 +00:00
|
|
|
|
2022-06-04 15:02:02 +00:00
|
|
|
def is_kivy_running():
|
2022-08-11 22:32:37 +00:00
|
|
|
if "kivy" in sys.modules:
|
2022-06-04 15:02:02 +00:00
|
|
|
from kivy.app import App
|
|
|
|
return App.get_running_app() is not None
|
|
|
|
return False
|
|
|
|
|
|
|
|
if is_kivy_running():
|
|
|
|
from kvui import MessageBox
|
|
|
|
MessageBox(title, text, error).open()
|
|
|
|
return
|
|
|
|
|
2022-08-11 22:32:37 +00:00
|
|
|
if is_linux and "tkinter" not in sys.modules:
|
2022-06-23 17:26:30 +00:00
|
|
|
# prefer native dialog
|
2022-08-11 22:32:37 +00:00
|
|
|
from shutil import which
|
|
|
|
kdialog = which("kdialog")
|
2022-06-23 17:26:30 +00:00
|
|
|
if kdialog:
|
2022-08-11 22:32:37 +00:00
|
|
|
return run(kdialog, f"--title={title}", "--error" if error else "--msgbox", text)
|
|
|
|
zenity = which("zenity")
|
2022-06-23 17:26:30 +00:00
|
|
|
if zenity:
|
2022-08-11 22:32:37 +00:00
|
|
|
return run(zenity, f"--title={title}", f"--text={text}", "--error" if error else "--info")
|
2022-06-23 17:26:30 +00:00
|
|
|
|
2022-06-04 15:02:02 +00:00
|
|
|
# fall back to tk
|
|
|
|
try:
|
|
|
|
import tkinter
|
|
|
|
from tkinter.messagebox import showerror, showinfo
|
|
|
|
except Exception as e:
|
|
|
|
logging.error('Could not load tkinter, which is likely not installed. '
|
|
|
|
f'This attempt was made because messagebox was used for "{title}".')
|
|
|
|
raise e
|
|
|
|
else:
|
|
|
|
root = tkinter.Tk()
|
|
|
|
root.withdraw()
|
|
|
|
showerror(title, text) if error else showinfo(title, text)
|
|
|
|
root.update()
|
2022-08-09 20:21:45 +00:00
|
|
|
|
|
|
|
|
|
|
|
def title_sorted(data: typing.Sequence, key=None, ignore: typing.Set = frozenset(("a", "the"))):
|
|
|
|
"""Sorts a sequence of text ignoring typical articles like "a" or "the" in the beginning."""
|
2023-02-17 18:16:37 +00:00
|
|
|
def sorter(element: Union[str, Dict[str, Any]]) -> str:
|
|
|
|
if (not isinstance(element, str)):
|
|
|
|
element = element["title"]
|
|
|
|
|
2022-08-09 20:21:45 +00:00
|
|
|
parts = element.split(maxsplit=1)
|
|
|
|
if parts[0].lower() in ignore:
|
2022-08-26 14:44:09 +00:00
|
|
|
return parts[1].lower()
|
2022-08-09 20:21:45 +00:00
|
|
|
else:
|
2022-08-26 14:44:09 +00:00
|
|
|
return element.lower()
|
2022-08-09 20:21:45 +00:00
|
|
|
return sorted(data, key=lambda i: sorter(key(i)) if key else sorter(i))
|
2022-09-29 22:36:30 +00:00
|
|
|
|
|
|
|
|
|
|
|
def read_snes_rom(stream: BinaryIO, strip_header: bool = True) -> bytearray:
|
|
|
|
"""Reads rom into bytearray and optionally strips off any smc header"""
|
|
|
|
buffer = bytearray(stream.read())
|
|
|
|
if strip_header and len(buffer) % 0x400 == 0x200:
|
|
|
|
return buffer[0x200:]
|
|
|
|
return buffer
|
2022-11-02 14:51:35 +00:00
|
|
|
|
|
|
|
|
2023-06-29 13:06:58 +00:00
|
|
|
_faf_tasks: "Set[asyncio.Task[typing.Any]]" = set()
|
2022-11-02 14:51:35 +00:00
|
|
|
|
|
|
|
|
2023-06-29 13:06:58 +00:00
|
|
|
def async_start(co: Coroutine[None, None, typing.Any], name: Optional[str] = None) -> None:
|
2022-11-02 14:51:35 +00:00
|
|
|
"""
|
|
|
|
Use this to start a task when you don't keep a reference to it or immediately await it,
|
|
|
|
to prevent early garbage collection. "fire-and-forget"
|
|
|
|
"""
|
|
|
|
# https://docs.python.org/3.10/library/asyncio-task.html#asyncio.create_task
|
|
|
|
# Python docs:
|
|
|
|
# ```
|
|
|
|
# Important: Save a reference to the result of [asyncio.create_task],
|
|
|
|
# to avoid a task disappearing mid-execution.
|
|
|
|
# ```
|
|
|
|
# This implementation follows the pattern given in that documentation.
|
|
|
|
|
2023-06-29 13:06:58 +00:00
|
|
|
task: asyncio.Task[typing.Any] = asyncio.create_task(co, name=name)
|
2022-11-02 14:51:35 +00:00
|
|
|
_faf_tasks.add(task)
|
|
|
|
task.add_done_callback(_faf_tasks.discard)
|
2023-06-19 07:57:17 +00:00
|
|
|
|
|
|
|
|
|
|
|
def deprecate(message: str):
|
|
|
|
if __debug__:
|
|
|
|
raise Exception(message)
|
|
|
|
import warnings
|
|
|
|
warnings.warn(message)
|
2023-06-25 00:24:43 +00:00
|
|
|
|
|
|
|
def _extend_freeze_support() -> None:
|
|
|
|
"""Extend multiprocessing.freeze_support() to also work on Non-Windows for spawn."""
|
|
|
|
# upstream issue: https://github.com/python/cpython/issues/76327
|
|
|
|
# code based on https://github.com/pyinstaller/pyinstaller/blob/develop/PyInstaller/hooks/rthooks/pyi_rth_multiprocessing.py#L26
|
|
|
|
import multiprocessing
|
|
|
|
import multiprocessing.spawn
|
|
|
|
|
|
|
|
def _freeze_support() -> None:
|
|
|
|
"""Minimal freeze_support. Only apply this if frozen."""
|
|
|
|
from subprocess import _args_from_interpreter_flags
|
|
|
|
|
|
|
|
# Prevent `spawn` from trying to read `__main__` in from the main script
|
|
|
|
multiprocessing.process.ORIGINAL_DIR = None
|
|
|
|
|
|
|
|
# Handle the first process that MP will create
|
|
|
|
if (
|
|
|
|
len(sys.argv) >= 2 and sys.argv[-2] == '-c' and sys.argv[-1].startswith((
|
|
|
|
'from multiprocessing.semaphore_tracker import main', # Py<3.8
|
|
|
|
'from multiprocessing.resource_tracker import main', # Py>=3.8
|
|
|
|
'from multiprocessing.forkserver import main'
|
|
|
|
)) and set(sys.argv[1:-2]) == set(_args_from_interpreter_flags())
|
|
|
|
):
|
|
|
|
exec(sys.argv[-1])
|
|
|
|
sys.exit()
|
|
|
|
|
|
|
|
# Handle the second process that MP will create
|
|
|
|
if multiprocessing.spawn.is_forking(sys.argv):
|
|
|
|
kwargs = {}
|
|
|
|
for arg in sys.argv[2:]:
|
|
|
|
name, value = arg.split('=')
|
|
|
|
if value == 'None':
|
|
|
|
kwargs[name] = None
|
|
|
|
else:
|
|
|
|
kwargs[name] = int(value)
|
|
|
|
multiprocessing.spawn.spawn_main(**kwargs)
|
|
|
|
sys.exit()
|
|
|
|
|
|
|
|
if not is_windows and is_frozen():
|
|
|
|
multiprocessing.freeze_support = multiprocessing.spawn.freeze_support = _freeze_support
|
|
|
|
|
|
|
|
|
|
|
|
def freeze_support() -> None:
|
|
|
|
"""This behaves like multiprocessing.freeze_support but also works on Non-Windows."""
|
|
|
|
import multiprocessing
|
|
|
|
_extend_freeze_support()
|
|
|
|
multiprocessing.freeze_support()
|
2023-10-01 23:56:55 +00:00
|
|
|
|
|
|
|
|
|
|
|
def visualize_regions(root_region: Region, file_name: str, *,
|
|
|
|
show_entrance_names: bool = False, show_locations: bool = True, show_other_regions: bool = True,
|
|
|
|
linetype_ortho: bool = True) -> None:
|
|
|
|
"""Visualize the layout of a world as a PlantUML diagram.
|
|
|
|
|
|
|
|
:param root_region: The region from which to start the diagram from. (Usually the "Menu" region of your world.)
|
|
|
|
:param file_name: The name of the destination .puml file.
|
|
|
|
:param show_entrance_names: (default False) If enabled, the name of the entrance will be shown near each connection.
|
|
|
|
:param show_locations: (default True) If enabled, the locations will be listed inside each region.
|
|
|
|
Priority locations will be shown in bold.
|
|
|
|
Excluded locations will be stricken out.
|
|
|
|
Locations without ID will be shown in italics.
|
|
|
|
Locked locations will be shown with a padlock icon.
|
|
|
|
For filled locations, the item name will be shown after the location name.
|
|
|
|
Progression items will be shown in bold.
|
|
|
|
Items without ID will be shown in italics.
|
|
|
|
:param show_other_regions: (default True) If enabled, regions that can't be reached by traversing exits are shown.
|
|
|
|
:param linetype_ortho: (default True) If enabled, orthogonal straight line parts will be used; otherwise polylines.
|
|
|
|
|
|
|
|
Example usage in World code:
|
|
|
|
from Utils import visualize_regions
|
|
|
|
visualize_regions(self.multiworld.get_region("Menu", self.player), "my_world.puml")
|
|
|
|
|
|
|
|
Example usage in Main code:
|
|
|
|
from Utils import visualize_regions
|
|
|
|
for player in world.player_ids:
|
|
|
|
visualize_regions(world.get_region("Menu", player), f"{world.get_out_file_name_base(player)}.puml")
|
|
|
|
"""
|
|
|
|
assert root_region.multiworld, "The multiworld attribute of root_region has to be filled"
|
|
|
|
from BaseClasses import Entrance, Item, Location, LocationProgressType, MultiWorld, Region
|
|
|
|
from collections import deque
|
|
|
|
import re
|
|
|
|
|
|
|
|
uml: typing.List[str] = list()
|
|
|
|
seen: typing.Set[Region] = set()
|
|
|
|
regions: typing.Deque[Region] = deque((root_region,))
|
|
|
|
multiworld: MultiWorld = root_region.multiworld
|
|
|
|
|
|
|
|
def fmt(obj: Union[Entrance, Item, Location, Region]) -> str:
|
|
|
|
name = obj.name
|
|
|
|
if isinstance(obj, Item):
|
|
|
|
name = multiworld.get_name_string_for_object(obj)
|
|
|
|
if obj.advancement:
|
|
|
|
name = f"**{name}**"
|
|
|
|
if obj.code is None:
|
|
|
|
name = f"//{name}//"
|
|
|
|
if isinstance(obj, Location):
|
|
|
|
if obj.progress_type == LocationProgressType.PRIORITY:
|
|
|
|
name = f"**{name}**"
|
|
|
|
elif obj.progress_type == LocationProgressType.EXCLUDED:
|
|
|
|
name = f"--{name}--"
|
|
|
|
if obj.address is None:
|
|
|
|
name = f"//{name}//"
|
|
|
|
return re.sub("[\".:]", "", name)
|
|
|
|
|
|
|
|
def visualize_exits(region: Region) -> None:
|
|
|
|
for exit_ in region.exits:
|
|
|
|
if exit_.connected_region:
|
|
|
|
if show_entrance_names:
|
|
|
|
uml.append(f"\"{fmt(region)}\" --> \"{fmt(exit_.connected_region)}\" : \"{fmt(exit_)}\"")
|
|
|
|
else:
|
|
|
|
try:
|
|
|
|
uml.remove(f"\"{fmt(exit_.connected_region)}\" --> \"{fmt(region)}\"")
|
|
|
|
uml.append(f"\"{fmt(exit_.connected_region)}\" <--> \"{fmt(region)}\"")
|
|
|
|
except ValueError:
|
|
|
|
uml.append(f"\"{fmt(region)}\" --> \"{fmt(exit_.connected_region)}\"")
|
|
|
|
else:
|
|
|
|
uml.append(f"circle \"unconnected exit:\\n{fmt(exit_)}\"")
|
|
|
|
uml.append(f"\"{fmt(region)}\" --> \"unconnected exit:\\n{fmt(exit_)}\"")
|
|
|
|
|
|
|
|
def visualize_locations(region: Region) -> None:
|
|
|
|
any_lock = any(location.locked for location in region.locations)
|
|
|
|
for location in region.locations:
|
|
|
|
lock = "<&lock-locked> " if location.locked else "<&lock-unlocked,color=transparent> " if any_lock else ""
|
|
|
|
if location.item:
|
|
|
|
uml.append(f"\"{fmt(region)}\" : {{method}} {lock}{fmt(location)}: {fmt(location.item)}")
|
|
|
|
else:
|
|
|
|
uml.append(f"\"{fmt(region)}\" : {{field}} {lock}{fmt(location)}")
|
|
|
|
|
|
|
|
def visualize_region(region: Region) -> None:
|
|
|
|
uml.append(f"class \"{fmt(region)}\"")
|
|
|
|
if show_locations:
|
|
|
|
visualize_locations(region)
|
|
|
|
visualize_exits(region)
|
|
|
|
|
|
|
|
def visualize_other_regions() -> None:
|
|
|
|
if other_regions := [region for region in multiworld.get_regions(root_region.player) if region not in seen]:
|
|
|
|
uml.append("package \"other regions\" <<Cloud>> {")
|
|
|
|
for region in other_regions:
|
|
|
|
uml.append(f"class \"{fmt(region)}\"")
|
|
|
|
uml.append("}")
|
|
|
|
|
|
|
|
uml.append("@startuml")
|
|
|
|
uml.append("hide circle")
|
|
|
|
uml.append("hide empty members")
|
|
|
|
if linetype_ortho:
|
|
|
|
uml.append("skinparam linetype ortho")
|
|
|
|
while regions:
|
|
|
|
if (current_region := regions.popleft()) not in seen:
|
|
|
|
seen.add(current_region)
|
|
|
|
visualize_region(current_region)
|
|
|
|
regions.extend(exit_.connected_region for exit_ in current_region.exits if exit_.connected_region)
|
|
|
|
if show_other_regions:
|
|
|
|
visualize_other_regions()
|
|
|
|
uml.append("@enduml")
|
|
|
|
|
|
|
|
with open(file_name, "wt", encoding="utf-8") as f:
|
|
|
|
f.write("\n".join(uml))
|