Archipelago/Utils.py

241 lines
7.7 KiB
Python
Raw Normal View History

from __future__ import annotations
__version__ = "2.1.2"
2020-04-20 12:50:49 +00:00
_version_tuple = tuple(int(piece, 10) for piece in __version__.split("."))
import os
import subprocess
import sys
import typing
import functools
from yaml import load, dump
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
def int16_as_bytes(value):
value = value & 0xFFFF
return [value & 0xFF, (value >> 8) & 0xFF]
def int32_as_bytes(value):
value = value & 0xFFFFFFFF
return [value & 0xFF, (value >> 8) & 0xFF, (value >> 16) & 0xFF, (value >> 24) & 0xFF]
2018-09-23 02:51:54 +00:00
def pc_to_snes(value):
return ((value<<1) & 0x7F0000)|(value & 0x7FFF)|0x8000
def snes_to_pc(value):
return ((value & 0x7F0000)>>1)|(value & 0x7FFF)
def parse_player_names(names, players, teams):
names = tuple(n for n in (n.strip() for n in names.split(",")) if n)
ret = []
while names or len(ret) < teams:
team = [n[:16] for n in names[:players]]
2020-02-23 16:06:44 +00:00
# where does the 16 character limit come from?
while len(team) != players:
2020-04-18 19:46:57 +00:00
team.append(f"Player{len(team) + 1}")
ret.append(team)
names = names[players:]
return ret
def is_bundled():
return getattr(sys, 'frozen', False)
def local_path(path):
if local_path.cached_path:
return os.path.join(local_path.cached_path, path)
2020-03-23 06:45:40 +00:00
elif is_bundled():
if hasattr(sys, "_MEIPASS"):
# we are running in a PyInstaller bundle
local_path.cached_path = sys._MEIPASS # pylint: disable=protected-access,no-member
else:
# cx_Freeze
local_path.cached_path = os.path.dirname(os.path.abspath(sys.argv[0]))
else:
# we are running in a normal Python environment
2020-03-23 06:45:40 +00:00
import __main__
local_path.cached_path = os.path.dirname(os.path.abspath(__main__.__file__))
return os.path.join(local_path.cached_path, path)
local_path.cached_path = None
def output_path(path):
if output_path.cached_path:
return os.path.join(output_path.cached_path, path)
if not is_bundled() and not hasattr(sys, "_MEIPASS"):
# this should trigger if it's cx_freeze bundling
output_path.cached_path = '.'
return os.path.join(output_path.cached_path, path)
else:
# has been PyInstaller packaged, so cannot use CWD for output.
if sys.platform == 'win32':
# windows
import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
documents = buf.value
elif sys.platform == 'darwin':
from AppKit import NSSearchPathForDirectoriesInDomains # pylint: disable=import-error
# http://developer.apple.com/DOCUMENTATION/Cocoa/Reference/Foundation/Miscellaneous/Foundation_Functions/Reference/reference.html#//apple_ref/c/func/NSSearchPathForDirectoriesInDomains
NSDocumentDirectory = 9
NSUserDomainMask = 1
# True for expanding the tilde into a fully qualified path
documents = NSSearchPathForDirectoriesInDomains(NSDocumentDirectory, NSUserDomainMask, True)[0]
else:
raise NotImplementedError('Not supported yet')
output_path.cached_path = os.path.join(documents, 'ALttPEntranceRandomizer')
if not os.path.exists(output_path.cached_path):
os.mkdir(output_path.cached_path)
return os.path.join(output_path.cached_path, path)
output_path.cached_path = None
def open_file(filename):
if sys.platform == 'win32':
os.startfile(filename)
else:
open_command = 'open' if sys.platform == 'darwin' else 'xdg-open'
subprocess.call([open_command, filename])
def close_console():
if sys.platform == 'win32':
#windows
import ctypes.wintypes
try:
ctypes.windll.kernel32.FreeConsole()
except Exception:
pass
def make_new_base2current(old_rom='Zelda no Densetsu - Kamigami no Triforce (Japan).sfc', new_rom='working.sfc'):
from collections import OrderedDict
import json
import hashlib
with open(old_rom, 'rb') as stream:
old_rom_data = bytearray(stream.read())
with open(new_rom, 'rb') as stream:
new_rom_data = bytearray(stream.read())
# extend to 2 mb
old_rom_data.extend(bytearray([0x00]) * (2097152 - len(old_rom_data)))
out_data = OrderedDict()
for idx, old in enumerate(old_rom_data):
new = new_rom_data[idx]
if old != new:
out_data[idx] = [int(new)]
for offset in reversed(list(out_data.keys())):
if offset - 1 in out_data:
out_data[offset-1].extend(out_data.pop(offset))
with open('data/base2current.json', 'wt') as outfile:
json.dump([{key: value} for key, value in out_data.items()], outfile, separators=(",", ":"))
basemd5 = hashlib.md5()
basemd5.update(new_rom_data)
return "New Rom Hash: " + basemd5.hexdigest()
parse_yaml = functools.partial(load, Loader=Loader)
class Hint(typing.NamedTuple):
receiving_player: int
finding_player: int
location: int
item: int
found: bool
def re_check(self, ctx, team) -> Hint:
if self.found:
return self
found = self.location in ctx.location_checks[team, self.finding_player]
if found:
return Hint(self.receiving_player, self.finding_player, self.location, self.item, found)
return self
2020-04-22 13:50:14 +00:00
def __hash__(self):
return hash((self.receiving_player, self.finding_player, self.location, self.item))
def get_public_ipv4() -> str:
import socket
import urllib.request
import logging
ip = socket.gethostbyname(socket.gethostname())
try:
ip = urllib.request.urlopen('https://checkip.amazonaws.com/').read().decode('utf8').strip()
except Exception as e:
try:
ip = urllib.request.urlopen('https://v4.ident.me').read().decode('utf8').strip()
except:
logging.exception(e)
pass # we could be offline, in a local game, so no point in erroring out
return ip
def get_options() -> dict:
2020-03-23 06:59:55 +00:00
if not hasattr(get_options, "options"):
locations = ("options.yaml", "host.yaml",
local_path("options.yaml"), local_path("host.yaml"))
for location in locations:
if os.path.exists(location):
with open(location) as f:
2020-03-23 06:59:55 +00:00
get_options.options = parse_yaml(f.read())
break
else:
raise FileNotFoundError(f"Could not find {locations[1]} to load options.")
return get_options.options
def get_item_name_from_id(code):
import Items
return Items.lookup_id_to_name.get(code, f'Unknown item (ID:{code})')
def get_location_name_from_address(address):
import Regions
return Regions.lookup_id_to_name.get(address, f'Unknown location (ID:{address})')
def persistent_store(category, key, value):
path = local_path("_persistent_storage.yaml")
storage: dict = persistent_load()
category = storage.setdefault(category, {})
category[key] = value
with open(path, "wt") as f:
f.write(dump(storage))
def persistent_load() -> typing.Dict[dict]:
path = local_path("_persistent_storage.yaml")
storage: dict = {}
if os.path.exists(path):
try:
with open(path, "r") as f:
storage = parse_yaml(f.read())
except Exception as e:
import logging
logging.debug(f"Could not read store: {e}")
return storage
class ReceivedItem(typing.NamedTuple):
item: int
location: int
player: int