from __future__ import annotations __version__ = "2.3.1" _version_tuple = tuple(int(piece, 10) for piece in __version__.split(".")) import os import subprocess import sys import typing import functools from yaml import load, dump try: from yaml import CLoader as Loader except ImportError: from yaml import Loader def int16_as_bytes(value): value = value & 0xFFFF return [value & 0xFF, (value >> 8) & 0xFF] def int32_as_bytes(value): value = value & 0xFFFFFFFF return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF] def pc_to_snes(value): return ((value<<1) & 0x7F0000)|(value & 0x7FFF)|0x8000 def snes_to_pc(value): return ((value & 0x7F0000)>>1)|(value & 0x7FFF) def parse_player_names(names, players, teams): names = tuple(n for n in (n.strip() for n in names.split(",")) if n) ret = [] while names or len(ret) < teams: team = [n[:16] for n in names[:players]] # where does the 16 character limit come from? while len(team) != players: team.append(f"Player{len(team) + 1}") ret.append(team) names = names[players:] return ret def is_bundled(): return getattr(sys, 'frozen', False) def local_path(path): if local_path.cached_path: return os.path.join(local_path.cached_path, path) elif is_bundled(): if hasattr(sys, "_MEIPASS"): # we are running in a PyInstaller bundle local_path.cached_path = sys._MEIPASS # pylint: disable=protected-access,no-member else: # cx_Freeze local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0])) else: # we are running in a normal Python environment import __main__ local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__)) return os.path.join(local_path.cached_path, path) local_path.cached_path = None def output_path(path): if output_path.cached_path: return os.path.join(output_path.cached_path, path) if not is_bundled() and not hasattr(sys, "_MEIPASS"): # this should trigger if it's cx_freeze bundling output_path.cached_path = '.' return os.path.join(output_path.cached_path, path) else: # has been PyInstaller packaged, so cannot use CWD for output. if sys.platform == 'win32': # windows import ctypes.wintypes CSIDL_PERSONAL = 5 # My Documents SHGFP_TYPE_CURRENT = 0 # Get current, not default value buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH) ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf) documents = buf.value elif sys.platform == 'darwin': from AppKit import NSSearchPathForDirectoriesInDomains # pylint: disable=import-error # http://developer.apple.com/DOCUMENTATION/Cocoa/Reference/Foundation/Miscellaneous/Foundation_Functions/Reference/reference.html#//apple_ref/c/func/NSSearchPathForDirectoriesInDomains NSDocumentDirectory = 9 NSUserDomainMask = 1 # True for expanding the tilde into a fully qualified path documents = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0] else: raise NotImplementedError('Not supported yet') output_path.cached_path = os.path.join(documents, 'ALttPEntranceRandomizer') if not os.path.exists(output_path.cached_path): os.mkdir(output_path.cached_path) return os.path.join(output_path.cached_path, path) output_path.cached_path = None def open_file(filename): if sys.platform == 'win32': os.startfile(filename) else: open_command = 'open' if sys.platform == 'darwin' else 'xdg-open' subprocess.call([open_command, filename]) def close_console(): if sys.platform == 'win32': #windows import ctypes.wintypes try: ctypes.windll.kernel32.FreeConsole() except Exception: pass def make_new_base2current(old_rom='Zelda no Densetsu - Kamigami no Triforce (Japan).sfc', new_rom='working.sfc'): from collections import OrderedDict import json import hashlib with open(old_rom, 'rb') as stream: old_rom_data = bytearray(stream.read()) with open(new_rom, 'rb') as stream: new_rom_data = bytearray(stream.read()) # extend to 2 mb old_rom_data.extend(bytearray([0x00]) * (2097152 - len(old_rom_data))) out_data = OrderedDict() for idx, old in enumerate(old_rom_data): new = new_rom_data[idx] if old != new: out_data[idx] = [int(new)] for offset in reversed(list(out_data.keys())): if offset - 1 in out_data: out_data[offset-1].extend(out_data.pop(offset)) with open('data/base2current.json', 'wt') as outfile: json.dump([{key: value} for key, value in out_data.items()], outfile, separators=(",", ":")) basemd5 = hashlib.md5() basemd5.update(new_rom_data) return "New Rom Hash: " + basemd5.hexdigest() parse_yaml = functools.partial(load, Loader=Loader) class Hint(typing.NamedTuple): receiving_player: int finding_player: int location: int item: int found: bool entrance: str = "" def re_check(self, ctx, team) -> Hint: if self.found: return self found = self.location in ctx.location_checks[team, self.finding_player] if found: return Hint(self.receiving_player, self.finding_player, self.location, self.item, found, self.entrance) return self def as_legacy(self) -> tuple: return self.receiving_player, self.finding_player, self.location, self.item, self.found def __hash__(self): return hash((self.receiving_player, self.finding_player, self.location, self.item, self.entrance)) def get_public_ipv4() -> str: import socket import urllib.request import logging ip = socket.gethostbyname(socket.gethostname()) try: ip = urllib.request.urlopen('https://checkip.amazonaws.com/').read().decode('utf8').strip() except Exception as e: try: ip = urllib.request.urlopen('https://v4.ident.me').read().decode('utf8').strip() except: logging.exception(e) pass # we could be offline, in a local game, so no point in erroring out return ip def get_options() -> dict: if not hasattr(get_options, "options"): locations = ("options.yaml", "host.yaml", local_path("options.yaml"), local_path("host.yaml")) for location in locations: if os.path.exists(location): with open(location) as f: get_options.options = parse_yaml(f.read()) break else: raise FileNotFoundError(f"Could not find {locations[1]} to load options.") return get_options.options def get_item_name_from_id(code): import Items return Items.lookup_id_to_name.get(code, f'Unknown item (ID:{code})') def get_location_name_from_address(address): import Regions return Regions.lookup_id_to_name.get(address, f'Unknown location (ID:{address})') def persistent_store(category, key, value): path = local_path("_persistent_storage.yaml") storage: dict = persistent_load() category = storage.setdefault(category, {}) category[key] = value with open(path, "wt") as f: f.write(dump(storage)) def persistent_load() -> typing.Dict[dict]: storage = getattr(persistent_load, "storage", None) if storage: return storage path = local_path("_persistent_storage.yaml") storage: dict = {} if os.path.exists(path): try: with open(path, "r") as f: storage = parse_yaml(f.read()) except Exception as e: import logging logging.debug(f"Could not read store: {e}") if storage is None: storage = {} persistent_load.storage = storage return storage def get_adjuster_settings(romfile: str) -> typing.Tuple[str, bool]: if hasattr(get_adjuster_settings, "adjuster_settings"): adjuster_settings = getattr(get_adjuster_settings, "adjuster_settings") else: adjuster_settings = persistent_load().get("adjuster", {}).get("last_settings", {}) if adjuster_settings: import pprint import Patch adjuster_settings.rom = romfile adjuster_settings.baserom = Patch.get_base_rom_path() whitelist = {"disablemusic", "fastmenu", "heartbeep", "heartcolor", "ow_palettes", "quickswap", "uw_palettes"} printed_options = {name: value for name, value in vars(adjuster_settings).items() if name in whitelist} sprite = getattr(adjuster_settings, "sprite", None) if sprite: printed_options["sprite"] = adjuster_settings.sprite.name if hasattr(get_adjuster_settings, "adjust_wanted"): adjust_wanted = getattr(get_adjuster_settings, "adjust_wanted") else: adjust_wanted = input(f"Last used adjuster settings were found. Would you like to apply these? \n" f"{pprint.pformat(printed_options)}\n" f"Enter yes or no: ") if adjust_wanted and adjust_wanted.startswith("y"): adjusted = True import AdjusterMain _, romfile = AdjusterMain.adjust(adjuster_settings) else: adjusted = False import logging if not hasattr(get_adjuster_settings, "adjust_wanted"): logging.info(f"Skipping post-patch adjustment") get_adjuster_settings.adjuster_settings = adjuster_settings get_adjuster_settings.adjust_wanted = adjust_wanted return romfile, adjusted return romfile, False class ReceivedItem(typing.NamedTuple): item: int location: int player: int def get_unique_identifier(): uuid = persistent_load().get("client", {}).get("uuid", None) if uuid: return uuid import uuid uuid = uuid.getnode() persistent_store("client", "uuid", uuid) return uuid