from __future__ import annotations import typing def tuplize_version(version: str) -> typing.Tuple[int, ...]: return tuple(int(piece, 10) for piece in version.split(".")) __version__ = "3.3.3" _version_tuple = tuplize_version(__version__) import os import subprocess import sys import pickle import io import builtins import functools from yaml import load, dump, safe_load try: from yaml import CLoader as Loader except ImportError: from yaml import Loader def int16_as_bytes(value): value = value & 0xFFFF return [value & 0xFF, (value >> 8) & 0xFF] def int32_as_bytes(value): value = value & 0xFFFFFFFF return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF] def pc_to_snes(value): return ((value<<1) & 0x7F0000)|(value & 0x7FFF)|0x8000 def snes_to_pc(value): return ((value & 0x7F0000)>>1)|(value & 0x7FFF) def parse_player_names(names, players, teams): names = tuple(n for n in (n.strip() for n in names.split(",")) if n) if len(names) != len(set(names)): raise ValueError("Duplicate Player names is not supported.") ret = [] while names or len(ret) < teams: team = [n[:16] for n in names[:players]] # 16 bytes in rom per player, which will map to more in unicode, but those characters later get filtered while len(team) != players: team.append(f"Player{len(team) + 1}") ret.append(team) names = names[players:] return ret def is_bundled() -> bool: return getattr(sys, 'frozen', False) def local_path(*path): if local_path.cached_path: return os.path.join(local_path.cached_path, *path) elif is_bundled(): if hasattr(sys, "_MEIPASS"): # we are running in a PyInstaller bundle local_path.cached_path = sys._MEIPASS # pylint: disable=protected-access,no-member else: # cx_Freeze local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0])) else: # we are running in a normal Python environment import __main__ local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__)) return os.path.join(local_path.cached_path, *path) local_path.cached_path = None def output_path(*path): if output_path.cached_path: return os.path.join(output_path.cached_path, *path) output_path.cached_path = local_path(get_options()["general_options"]["output_path"]) path = os.path.join(output_path.cached_path, *path) os.makedirs(os.path.dirname(path), exist_ok=True) return path output_path.cached_path = None def open_file(filename): if sys.platform == 'win32': os.startfile(filename) else: open_command = 'open' if sys.platform == 'darwin' else 'xdg-open' subprocess.call([open_command, filename]) def close_console(): if sys.platform == 'win32': #windows import ctypes.wintypes try: ctypes.windll.kernel32.FreeConsole() except Exception: pass parse_yaml = safe_load unsafe_parse_yaml = functools.partial(load, Loader=Loader) class Hint(typing.NamedTuple): receiving_player: int finding_player: int location: int item: int found: bool entrance: str = "" def re_check(self, ctx, team) -> Hint: if self.found: return self found = self.location in ctx.location_checks[team, self.finding_player] if found: return Hint(self.receiving_player, self.finding_player, self.location, self.item, found, self.entrance) return self def as_legacy(self) -> tuple: return self.receiving_player, self.finding_player, self.location, self.item, self.found def __hash__(self): return hash((self.receiving_player, self.finding_player, self.location, self.item, self.entrance)) def get_public_ipv4() -> str: import socket import urllib.request import logging ip = socket.gethostbyname(socket.gethostname()) try: ip = urllib.request.urlopen('https://checkip.amazonaws.com/').read().decode('utf8').strip() except Exception as e: try: ip = urllib.request.urlopen('https://v4.ident.me').read().decode('utf8').strip() except: logging.exception(e) pass # we could be offline, in a local game, so no point in erroring out return ip def get_public_ipv6() -> str: import socket import urllib.request import logging ip = socket.gethostbyname(socket.gethostname()) try: ip = urllib.request.urlopen('https://v6.ident.me').read().decode('utf8').strip() except Exception as e: logging.exception(e) pass # we could be offline, in a local game, or ipv6 may not be available return ip def get_default_options() -> dict: if not hasattr(get_default_options, "options"): options = dict() # Refer to host.yaml for comments as to what all these options mean. generaloptions = dict() generaloptions["rom_file"] = "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc" generaloptions["qusb2snes"] = "QUsb2Snes\\QUsb2Snes.exe" generaloptions["rom_start"] = True generaloptions["output_path"] = "output" options["general_options"] = generaloptions serveroptions = dict() serveroptions["host"] = None serveroptions["port"] = 38281 serveroptions["password"] = None serveroptions["multidata"] = None serveroptions["savefile"] = None serveroptions["disable_save"] = False serveroptions["loglevel"] = "info" serveroptions["server_password"] = None serveroptions["disable_item_cheat"] = False serveroptions["location_check_points"] = 1 serveroptions["hint_cost"] = 1000 serveroptions["forfeit_mode"] = "goal" serveroptions["remaining_mode"] = "goal" serveroptions["auto_shutdown"] = 0 serveroptions["compatibility"] = 2 options["server_options"] = serveroptions multimysteryoptions = dict() multimysteryoptions["teams"] = 1 multimysteryoptions["enemizer_path"] = "EnemizerCLI/EnemizerCLI.Core.exe" multimysteryoptions["player_files_path"] = "Players" multimysteryoptions["players"] = 0 multimysteryoptions["weights_file_path"] = "weights.yaml" multimysteryoptions["meta_file_path"] = "meta.yaml" multimysteryoptions["player_name"] = "" multimysteryoptions["create_spoiler"] = 1 multimysteryoptions["zip_roms"] = 0 multimysteryoptions["zip_diffs"] = 2 multimysteryoptions["zip_spoiler"] = 0 multimysteryoptions["zip_multidata"] = 1 multimysteryoptions["zip_format"] = 1 multimysteryoptions["race"] = 0 multimysteryoptions["cpu_threads"] = 0 multimysteryoptions["max_attempts"] = 0 multimysteryoptions["take_first_working"] = False multimysteryoptions["keep_all_seeds"] = False multimysteryoptions["log_output_path"] = "Output Logs" multimysteryoptions["log_level"] = None options["multi_mystery_options"] = multimysteryoptions get_default_options.options = options return get_default_options.options blacklisted_options = {"multi_mystery_options.cpu_threads", "multi_mystery_options.max_attempts", "multi_mystery_options.take_first_working", "multi_mystery_options.keep_all_seeds", "multi_mystery_options.log_output_path", "multi_mystery_options.log_level"} def update_options(src: dict, dest: dict, filename: str, keys: list) -> dict: import logging for key, value in src.items(): new_keys = keys.copy() new_keys.append(key) option_name = '.'.join(new_keys) if key not in dest: dest[key] = value if filename.endswith("options.yaml") and option_name not in blacklisted_options: logging.info(f"Warning: {filename} is missing {option_name}") elif isinstance(value, dict): if not isinstance(dest.get(key, None), dict): if filename.endswith("options.yaml") and option_name not in blacklisted_options: logging.info(f"Warning: {filename} has {option_name}, but it is not a dictionary. overwriting.") dest[key] = value else: dest[key] = update_options(value, dest[key], filename, new_keys) return dest def get_options() -> dict: if not hasattr(get_options, "options"): locations = ("options.yaml", "host.yaml", local_path("options.yaml"), local_path("host.yaml")) for location in locations: if os.path.exists(location): with open(location) as f: options = parse_yaml(f.read()) get_options.options = update_options(get_default_options(), options, location, list()) break else: raise FileNotFoundError(f"Could not find {locations[1]} to load options.") return get_options.options def get_item_name_from_id(code): import Items return Items.lookup_id_to_name.get(code, f'Unknown item (ID:{code})') def get_location_name_from_address(address): import Regions return Regions.lookup_id_to_name.get(address, f'Unknown location (ID:{address})') def persistent_store(category, key, value): path = local_path("_persistent_storage.yaml") storage: dict = persistent_load() category = storage.setdefault(category, {}) category[key] = value with open(path, "wt") as f: f.write(dump(storage)) def persistent_load() -> typing.Dict[dict]: storage = getattr(persistent_load, "storage", None) if storage: return storage path = local_path("_persistent_storage.yaml") storage: dict = {} if os.path.exists(path): try: with open(path, "r") as f: storage = unsafe_parse_yaml(f.read()) except Exception as e: import logging logging.debug(f"Could not read store: {e}") if storage is None: storage = {} persistent_load.storage = storage return storage def get_adjuster_settings(romfile: str) -> typing.Tuple[str, bool]: if hasattr(get_adjuster_settings, "adjuster_settings"): adjuster_settings = getattr(get_adjuster_settings, "adjuster_settings") else: adjuster_settings = persistent_load().get("adjuster", {}).get("last_settings_3", {}) if adjuster_settings: import pprint import Patch adjuster_settings.rom = romfile adjuster_settings.baserom = Patch.get_base_rom_path() whitelist = {"disablemusic", "fastmenu", "heartbeep", "heartcolor", "ow_palettes", "quickswap", "uw_palettes", "sprite"} printed_options = {name: value for name, value in vars(adjuster_settings).items() if name in whitelist} if hasattr(get_adjuster_settings, "adjust_wanted"): adjust_wanted = getattr(get_adjuster_settings, "adjust_wanted") elif persistent_load().get("adjuster", {}).get("never_adjust", False): # never adjust, per user request return romfile, False else: adjust_wanted = input(f"Last used adjuster settings were found. Would you like to apply these? \n" f"{pprint.pformat(printed_options)}\n" f"Enter yes, no or never: ") if adjust_wanted and adjust_wanted.startswith("y"): adjusted = True import AdjusterMain _, romfile = AdjusterMain.adjust(adjuster_settings) elif adjust_wanted and "never" in adjust_wanted: persistent_store("adjuster", "never_adjust", True) return romfile, False else: adjusted = False import logging if not hasattr(get_adjuster_settings, "adjust_wanted"): logging.info(f"Skipping post-patch adjustment") get_adjuster_settings.adjuster_settings = adjuster_settings get_adjuster_settings.adjust_wanted = adjust_wanted return romfile, adjusted return romfile, False class ReceivedItem(typing.NamedTuple): item: int location: int player: int def get_unique_identifier(): uuid = persistent_load().get("client", {}).get("uuid", None) if uuid: return uuid import uuid uuid = uuid.getnode() persistent_store("client", "uuid", uuid) return uuid safe_builtins = { 'set', 'frozenset', } class RestrictedUnpickler(pickle.Unpickler): def find_class(self, module, name): if module == "builtins" and name in safe_builtins: return getattr(builtins, name) # Forbid everything else. raise pickle.UnpicklingError("global '%s.%s' is forbidden" % (module, name)) def restricted_loads(s): """Helper function analogous to pickle.loads().""" return RestrictedUnpickler(io.BytesIO(s)).load()