Archipelago/Utils.py

401 lines
13 KiB
Python
Raw Normal View History

from __future__ import annotations
2021-01-02 11:49:43 +00:00
import typing
def tuplize_version(version: str) -> typing.Tuple[int, ...]:
return Version(*(int(piece, 10) for piece in version.split(".")))
class Version(typing.NamedTuple):
major: int
minor: int
2021-02-21 22:46:05 +00:00
build: int
2021-07-01 23:29:49 +00:00
2021-07-27 12:57:44 +00:00
__version__ = "0.1.6"
2021-06-18 20:15:54 +00:00
version_tuple = tuplize_version(__version__)
2020-04-20 12:50:49 +00:00
2021-01-03 13:41:21 +00:00
import builtins
import os
import subprocess
import sys
2020-09-08 23:41:37 +00:00
import pickle
import functools
2021-01-03 13:32:32 +00:00
import io
import collections
from yaml import load, dump, safe_load
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
def int16_as_bytes(value):
value = value & 0xFFFF
return [value & 0xFF, (value >> 8) & 0xFF]
def int32_as_bytes(value):
value = value & 0xFFFFFFFF
return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF]
2018-09-23 02:51:54 +00:00
def pc_to_snes(value):
2021-01-02 11:49:43 +00:00
return ((value << 1) & 0x7F0000) | (value & 0x7FFF) | 0x8000
2018-09-23 02:51:54 +00:00
2018-09-23 02:51:54 +00:00
def snes_to_pc(value):
2021-01-02 11:49:43 +00:00
return ((value & 0x7F0000) >> 1) | (value & 0x7FFF)
2018-09-23 02:51:54 +00:00
def parse_player_names(names, players, teams):
names = tuple(n for n in (n.strip() for n in names.split(",")) if n)
if len(names) != len(set(names)):
2021-03-07 19:11:36 +00:00
name_counter = collections.Counter(names)
raise ValueError(f"Duplicate Player names is not supported, "
f'found multiple "{name_counter.most_common(1)[0][0]}".')
ret = []
while names or len(ret) < teams:
team = [n[:16] for n in names[:players]]
# 16 bytes in rom per player, which will map to more in unicode, but those characters later get filtered
while len(team) != players:
2020-04-18 19:46:57 +00:00
team.append(f"Player{len(team) + 1}")
ret.append(team)
names = names[players:]
return ret
2021-07-09 15:44:24 +00:00
def cache_argsless(function):
if function.__code__.co_argcount:
raise Exception("Can only cache 0 argument functions with this cache.")
result = sentinel = object()
def _wrap():
nonlocal result
if result is sentinel:
result = function()
return result
return _wrap
def is_frozen() -> bool:
return getattr(sys, 'frozen', False)
def local_path(*path):
if local_path.cached_path:
return os.path.join(local_path.cached_path, *path)
elif is_frozen():
2020-03-23 06:45:40 +00:00
if hasattr(sys, "_MEIPASS"):
# we are running in a PyInstaller bundle
local_path.cached_path = sys._MEIPASS # pylint: disable=protected-access,no-member
else:
# cx_Freeze
local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0]))
else:
2020-03-23 06:45:40 +00:00
import __main__
if hasattr(__main__, "__file__"):
# we are running in a normal Python environment
local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__))
else:
# pray
local_path.cached_path = os.path.abspath(".")
return os.path.join(local_path.cached_path, *path)
2021-01-02 11:49:43 +00:00
local_path.cached_path = None
def output_path(*path):
if output_path.cached_path:
return os.path.join(output_path.cached_path, *path)
output_path.cached_path = local_path(get_options()["general_options"]["output_path"])
path = os.path.join(output_path.cached_path, *path)
os.makedirs(os.path.dirname(path), exist_ok=True)
return path
2021-01-02 11:49:43 +00:00
output_path.cached_path = None
2021-01-02 11:49:43 +00:00
def open_file(filename):
if sys.platform == 'win32':
os.startfile(filename)
else:
open_command = 'open' if sys.platform == 'darwin' else 'xdg-open'
subprocess.call([open_command, filename])
2021-01-02 11:49:43 +00:00
parse_yaml = safe_load
unsafe_parse_yaml = functools.partial(load, Loader=Loader)
2021-07-30 23:40:27 +00:00
2021-07-09 15:44:24 +00:00
@cache_argsless
def get_public_ipv4() -> str:
import socket
import urllib.request
import logging
ip = socket.gethostbyname(socket.gethostname())
try:
ip = urllib.request.urlopen('https://checkip.amazonaws.com/').read().decode('utf8').strip()
except Exception as e:
try:
ip = urllib.request.urlopen('https://v4.ident.me').read().decode('utf8').strip()
except:
logging.exception(e)
pass # we could be offline, in a local game, so no point in erroring out
return ip
2021-07-30 23:40:27 +00:00
2021-07-09 15:44:24 +00:00
@cache_argsless
2020-06-14 07:06:37 +00:00
def get_public_ipv6() -> str:
import socket
import urllib.request
import logging
ip = socket.gethostbyname(socket.gethostname())
try:
ip = urllib.request.urlopen('https://v6.ident.me').read().decode('utf8').strip()
except Exception as e:
logging.exception(e)
2020-06-21 14:13:42 +00:00
pass # we could be offline, in a local game, or ipv6 may not be available
2020-06-14 07:06:37 +00:00
return ip
2021-07-30 23:40:27 +00:00
2021-07-09 15:44:24 +00:00
@cache_argsless
2020-11-28 19:34:29 +00:00
def get_default_options() -> dict:
2021-07-09 15:44:24 +00:00
# Refer to host.yaml for comments as to what all these options mean.
options = {
"general_options": {
"output_path": "output",
},
"factorio_options": {
"executable": "factorio\\bin\\x64\\factorio",
},
"lttp_options": {
"rom_file": "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc",
"sni": "SNI",
"rom_start": True,
},
"server_options": {
"host": None,
"port": 38281,
"password": None,
"multidata": None,
"savefile": None,
"disable_save": False,
"loglevel": "info",
"server_password": None,
"disable_item_cheat": False,
"location_check_points": 1,
"hint_cost": 10,
"forfeit_mode": "goal",
"remaining_mode": "goal",
"auto_shutdown": 0,
"compatibility": 2,
"log_network": 0
},
"generator": {
2021-07-09 15:44:24 +00:00
"teams": 1,
"enemizer_path": "EnemizerCLI/EnemizerCLI.Core.exe",
"player_files_path": "Players",
"players": 0,
"weights_file_path": "weights.yaml",
"meta_file_path": "meta.yaml",
"spoiler": 2,
2021-07-09 15:44:24 +00:00
"glitch_triforce_room": 1,
"race": 0,
"plando_options": "bosses",
2021-01-02 11:49:43 +00:00
}
2021-07-09 15:44:24 +00:00
}
2021-01-02 11:49:43 +00:00
2021-07-09 15:44:24 +00:00
return options
2020-11-28 19:34:29 +00:00
def update_options(src: dict, dest: dict, filename: str, keys: list) -> dict:
import logging
for key, value in src.items():
new_keys = keys.copy()
new_keys.append(key)
option_name = '.'.join(new_keys)
if key not in dest:
dest[key] = value
2021-07-30 23:40:27 +00:00
if filename.endswith("options.yaml"):
logging.info(f"Warning: {filename} is missing {option_name}")
elif isinstance(value, dict):
if not isinstance(dest.get(key, None), dict):
2021-07-30 23:40:27 +00:00
if filename.endswith("options.yaml"):
logging.info(f"Warning: {filename} has {option_name}, but it is not a dictionary. overwriting.")
dest[key] = value
else:
dest[key] = update_options(value, dest[key], filename, new_keys)
return dest
2020-11-28 19:34:29 +00:00
2021-07-30 23:40:27 +00:00
2021-07-09 15:44:24 +00:00
@cache_argsless
def get_options() -> dict:
2020-03-23 06:59:55 +00:00
if not hasattr(get_options, "options"):
locations = ("options.yaml", "host.yaml",
local_path("options.yaml"), local_path("host.yaml"))
for location in locations:
if os.path.exists(location):
with open(location) as f:
2020-11-28 19:34:29 +00:00
options = parse_yaml(f.read())
get_options.options = update_options(get_default_options(), options, location, list())
2020-03-23 06:59:55 +00:00
break
else:
raise FileNotFoundError(f"Could not find {locations[1]} to load options.")
return get_options.options
def get_item_name_from_id(code: int) -> str:
from worlds import lookup_any_item_id_to_name
return lookup_any_item_id_to_name.get(code, f'Unknown item (ID:{code})')
def get_location_name_from_id(code: int) -> str:
from worlds import lookup_any_location_id_to_name
return lookup_any_location_id_to_name.get(code, f'Unknown location (ID:{code})')
def persistent_store(category: str, key: typing.Any, value: typing.Any):
path = local_path("_persistent_storage.yaml")
storage: dict = persistent_load()
category = storage.setdefault(category, {})
category[key] = value
with open(path, "wt") as f:
f.write(dump(storage))
def persistent_load() -> typing.Dict[dict]:
storage = getattr(persistent_load, "storage", None)
if storage:
return storage
path = local_path("_persistent_storage.yaml")
storage: dict = {}
if os.path.exists(path):
try:
with open(path, "r") as f:
storage = unsafe_parse_yaml(f.read())
except Exception as e:
import logging
logging.debug(f"Could not read store: {e}")
if storage is None:
storage = {}
persistent_load.storage = storage
return storage
def get_adjuster_settings(romfile: str) -> typing.Tuple[str, bool]:
if hasattr(get_adjuster_settings, "adjuster_settings"):
adjuster_settings = getattr(get_adjuster_settings, "adjuster_settings")
else:
2020-11-11 12:15:35 +00:00
adjuster_settings = persistent_load().get("adjuster", {}).get("last_settings_3", {})
if adjuster_settings:
import pprint
import Patch
adjuster_settings.rom = romfile
adjuster_settings.baserom = Patch.get_base_rom_path()
adjuster_settings.world = None
whitelist = {"disablemusic", "fastmenu", "heartbeep", "heartcolor", "ow_palettes", "quickswap",
2020-11-11 12:15:35 +00:00
"uw_palettes", "sprite"}
printed_options = {name: value for name, value in vars(adjuster_settings).items() if name in whitelist}
if hasattr(adjuster_settings, "sprite_pool"):
sprite_pool = {}
for sprite in getattr(adjuster_settings, "sprite_pool"):
if sprite in sprite_pool:
sprite_pool[sprite] += 1
else:
sprite_pool[sprite] = 1
if sprite_pool:
printed_options["sprite_pool"] = sprite_pool
2020-11-11 12:15:35 +00:00
if hasattr(get_adjuster_settings, "adjust_wanted"):
adjust_wanted = getattr(get_adjuster_settings, "adjust_wanted")
2021-01-02 11:49:43 +00:00
elif persistent_load().get("adjuster", {}).get("never_adjust", False): # never adjust, per user request
2020-11-11 12:15:35 +00:00
return romfile, False
else:
adjust_wanted = input(f"Last used adjuster settings were found. Would you like to apply these? \n"
f"{pprint.pformat(printed_options)}\n"
2020-11-11 12:15:35 +00:00
f"Enter yes, no or never: ")
if adjust_wanted and adjust_wanted.startswith("y"):
if hasattr(adjuster_settings, "sprite_pool"):
2021-06-05 20:58:59 +00:00
from LttPAdjuster import AdjusterWorld
adjuster_settings.world = AdjusterWorld(getattr(adjuster_settings, "sprite_pool"))
adjusted = True
2021-06-05 20:58:59 +00:00
import LttPAdjuster
_, romfile = LttPAdjuster.adjust(adjuster_settings)
if hasattr(adjuster_settings, "world"):
delattr(adjuster_settings, "world")
2020-11-11 12:15:35 +00:00
elif adjust_wanted and "never" in adjust_wanted:
persistent_store("adjuster", "never_adjust", True)
return romfile, False
else:
adjusted = False
import logging
if not hasattr(get_adjuster_settings, "adjust_wanted"):
logging.info(f"Skipping post-patch adjustment")
get_adjuster_settings.adjuster_settings = adjuster_settings
get_adjuster_settings.adjust_wanted = adjust_wanted
return romfile, adjusted
return romfile, False
2021-07-09 15:44:24 +00:00
@cache_argsless
def get_unique_identifier():
uuid = persistent_load().get("client", {}).get("uuid", None)
if uuid:
return uuid
import uuid
uuid = uuid.getnode()
persistent_store("client", "uuid", uuid)
return uuid
2020-09-08 23:41:37 +00:00
safe_builtins = {
'set',
'frozenset',
}
class RestrictedUnpickler(pickle.Unpickler):
def find_class(self, module, name):
if module == "builtins" and name in safe_builtins:
return getattr(builtins, name)
if module == "NetUtils" and name in {"NetworkItem", "ClientStatus", "Hint"}:
import NetUtils
return getattr(NetUtils, name)
2021-05-16 20:59:45 +00:00
if module == "Options":
import Options
obj = getattr(Options, name)
if issubclass(obj, Options.Option):
return obj
2020-09-08 23:41:37 +00:00
# Forbid everything else.
raise pickle.UnpicklingError("global '%s.%s' is forbidden" %
(module, name))
def restricted_loads(s):
"""Helper function analogous to pickle.loads()."""
return RestrictedUnpickler(io.BytesIO(s)).load()
2021-07-09 15:44:24 +00:00
class KeyedDefaultDict(collections.defaultdict):
def __missing__(self, key):
self[key] = value = self.default_factory(key)
return value