împlement optional hint system (defaults to off)

Fabian Dill 2020-02-09 05:28:48 +01:00
5 changed files with 160 additions and 85 deletions

__author__ = "Berserker55" # you can find me on the ALTTP Randomizer Discord
__version__ = 1.5
__version__ = 1.6
This script launches a Multiplayer "Multiworld" Mystery Game
It is still up to the host to forward the correct port (38281 by default) and distribute the roms to the players.
Regular Mystery has to work for this first, such as a ALTTP Base ROM and Enemizer Setup.
A guide can be found here: https://docs.google.com/document/d/19FoqUkuyStMqhOq8uGiocskMo1KMjOW4nEeG81xrKoI/edit
This script itself should be placed within the Bonta Multiworld folder, that you download in step 1
Configuration can be found in host.yaml
#location of your Enemizer CLI, available here: https://github.com/Bonta0/Enemizer/releases
enemizer_location:str = "EnemizerCLI/EnemizerCLI.Core.exe"
#Where to place the resulting files
outputpath:str = "MultiMystery"
#automatically launches {player_name}.yaml's ROM file using the OS's default program once generation completes. (likely your emulator)
#does nothing if the name is not found
#example: player_name = "Berserker"
player_name:str = ""
#Zip the resulting roms
#0 -> Don't
#1 -> Create a zip
#2 -> Create a zip and delete the ROMs that will be in it, except the hosts (requires player_name to be set correctly)
zip_roms:int = 1
#create a spoiler file
create_spoiler:bool = True
#create roms as race coms
race:bool= False
#folder from which the player yaml files are pulled from
player_files_folder:str = "Players"
#Version of python to use for Bonta Multiworld. Probably leave this as is, if you don't know what this does.
#can be tagged for bitness, for example "3.8-32" would be latest installed 3.8 on 32 bits
#special case: None -> use the python which was used to launch this file.
py_version:str = None
####end of config####
import os
import subprocess
import sys
if __name__ == "__main__":
if not py_version:
py_version = f"{sys.version_info.major}.{sys.version_info.minor}"
print(f"{__author__}'s MultiMystery Launcher V{__version__}")
import ModuleUpdate
print(f"{__author__}'s MultiMystery Launcher V{__version__}")
if not os.path.exists(enemizer_location):
feedback(f"Enemizer not found at {enemizer_location}, please adjust the path in MultiMystery.py's config or put Enemizer in the default location.")
from Utils import parse_yaml
multi_mystery_options = parse_yaml(open("host.yaml").read())["multi_mystery_options"]
output_path = multi_mystery_options["output_path"]
enemizer_path = multi_mystery_options["enemizer_path"]
player_files_path = multi_mystery_options["player_files_path"]
race = multi_mystery_options["race"]
create_spoiler = multi_mystery_options["create_spoiler"]
zip_roms = multi_mystery_options["zip_roms"]
player_name = multi_mystery_options["player_name"]
py_version = f"{sys.version_info.major}.{sys.version_info.minor}"
if not os.path.exists(enemizer_path):
feedback(f"Enemizer not found at {enemizer_path}, please adjust the path in MultiMystery.py's config or put Enemizer in the default location.")
if not os.path.exists("Zelda no Densetsu - Kamigami no Triforce (Japan).sfc"):
feedback("Base rom is expected as Zelda no Densetsu - Kamigami no Triforce (Japan).sfc in the Multiworld root folder please place/rename it there.")
player_files = []
os.makedirs(player_files_folder, exist_ok=True)
for file in os.listdir(player_files_folder):
os.makedirs(player_files_path, exist_ok=True)
for file in os.listdir(player_files_path):
if file.lower().endswith(".yaml"):
print(f"Player {file[:-5]} found.")
player_count = len(player_files)
if player_count == 0:
feedback(f"No player files found. Please put them in a {player_files_folder} folder.")
feedback(f"No player files found. Please put them in a {player_files_path} folder.")
print(player_count, "Players found.")
player_string = ""
for i,file in enumerate(player_files):
player_string += f"--p{i+1} {os.path.join(player_files_folder, file)} "
player_string += f"--p{i+1} {os.path.join(player_files_path, file)} "
player_names = list(file[:-5] for file in player_files)
basemysterycommand = f"py -{py_version} Mystery.py" #source
command = f"{basemysterycommand} --multi {len(player_files)} {player_string} " \
f"--names {','.join(player_names)} --enemizercli {enemizer_location} " \
f"--outputpath {outputpath}" + " --create_spoiler" if create_spoiler else "" + " --race" if race else ""
f"--names {','.join(player_names)} --enemizercli {enemizer_path} " \
f"--outputpath {output_path}" + " --create_spoiler" if create_spoiler else "" + " --race" if race else ""
import time
start = time.perf_counter()
except IndexError:
print(f"Could not find Player {player_name}")
romfilename = os.path.join(outputpath, f"ER_{seedname}_P{index+1}_{player_name}.sfc")
romfilename = os.path.join(output_path, f"ER_{seedname}_P{index+1}_{player_name}.sfc")
import webbrowser
if os.path.exists(romfilename):
print(f"Launching ROM file {romfilename}")
if zip_roms:
zipname = os.path.join(outputpath, f"ER_{seedname}.zip")
zipname = os.path.join(output_path, f"ER_{seedname}.zip")
print(f"Creating zipfile {zipname}")
import zipfile
with zipfile.ZipFile(zipname, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf:
for file in os.listdir(outputpath):
for file in os.listdir(output_path):
if file.endswith(".sfc") and seedname in file:
zf.write(os.path.join(outputpath, file), file)
zf.write(os.path.join(output_path, file), file)
print(f"Packed {file} into zipfile {zipname}")
if zip_roms == 2 and player_name.lower() not in file.lower():
baseservercommand = f"py -{py_version} MultiServer.py" # source
#don't have a mac to test that. If you try to run compiled on mac, good luck.
subprocess.call(f"{baseservercommand} --multidata {os.path.join(outputpath, multidataname)}")
subprocess.call(f"{baseservercommand} --multidata {os.path.join(output_path, multidataname)}")
import shlex
import urllib.request
import zlib
import collections
import ModuleUpdate
import Items
import Regions
import Utils
from MultiClient import ReceivedItem, get_item_name_from_id, get_location_name_from_address
class Client:
self.send_index = 0
class Context:
def __init__(self, host, port, password):
def __init__(self, host:str, port:int, password:str, location_check_points:int, hint_cost:int):
self.data_filename = None
self.save_filename = None
self.disable_save = False
self.countdown_timer = 0
self.clients = []
self.received_items = {}
self.location_checks = collections.defaultdict(lambda: 0)
self.hint_cost = hint_cost
self.location_check_points = location_check_points
self.hints_used = collections.defaultdict(lambda: 0)
def get_save(self) -> dict:
return {
"rom_names": list(self.rom_names.items()),
"received_items": tuple((k, [i.__dict__ for i in v]) for k, v in self.received_items.items()),
"hints_used" : tuple((key,value) for key, value in self.hints_used.items()),
"location_checks" : tuple((key,value) for key, value in self.location_checks.items())
def set_save(self, savedata: dict):
rom_names = savedata["rom_names"]
received_items = {tuple(k): [ReceivedItem(**i) for i in v] for k, v in savedata["received_items"]}
if not all([self.rom_names[tuple(rom)] == (team, slot) for rom, (team, slot) in rom_names]):
raise Exception('Save file mismatch, will start a new game')
self.received_items = received_items
self.hints_used.update({tuple(key): value for key, value in savedata["hints_used"]})
self.location_checks.update({tuple(key): value for key, value in savedata["location_checks"]})
logging.info(f'Loaded save file with {sum([len(p) for p in received_items.values()])} received items '
f'for {len(received_items)} players')
if not websocket or not websocket.open or websocket.closed:
@ -174,7 +199,9 @@ def register_location_checks(ctx : Context, team, slot, locations):
if recvd_item.location == location and recvd_item.player == slot:
found = True
if not found:
ctx.location_checks[team, slot] += 1
new_item = ReceivedItem(target_item, location, slot)
if slot != target_player:
@ -183,15 +210,32 @@ def register_location_checks(ctx : Context, team, slot, locations):
found_items = True
if found_items and not ctx.disable_save:
if found_items:
if not ctx.disable_save:
with open(ctx.save_filename, "wb") as f:
jsonstr = json.dumps((list(ctx.rom_names.items()),
[(k, [i.__dict__ for i in v]) for k, v in ctx.received_items.items()]))
jsonstr = json.dumps(ctx.get_save())
except Exception as e:
def hint(ctx:Context, team, slot, item:str):
found = 0
seeked_item_id = Items.item_table[item][3]
for check, result in ctx.locations.items():
item_id, receiving_player = result
if receiving_player == slot and item_id == seeked_item_id:
location_id, finding_player = check
hint = f"[Hint]: {ctx.player_names[(team, slot)]}'s {item} can be found at " \
f"{get_location_name_from_address(location_id)} in {ctx.player_names[team, finding_player]}'s World"
notify_team(ctx, team, hint)
found += 1
return found
async def process_client_cmd(ctx : Context, client : Client, cmd, args):
if type(cmd) is not str:
await send_msgs(client.socket, [['InvalidCmd']])
if args.startswith('!players'):
notify_all(ctx, get_connected_players_string(ctx))
if args.startswith('!forfeit'):
elif args.startswith('!forfeit'):
forfeit_player(ctx, client.team, client.slot)
if args.startswith('!countdown'):
elif args.startswith('!countdown'):
timer = int(args.split()[1])
except (IndexError, ValueError):
timer = 10
asyncio.create_task(countdown(ctx, timer))
elif args.startswith("!hint"):
points_available = ctx.location_check_points * ctx.location_checks[client.team, client.slot] - ctx.hint_cost*ctx.hints_used[client.team, client.slot]
itemname = args[6:]
if not itemname:
notify_client(client, "Use !hint {itemname}. For example !hint Lamp. "
f"A hint costs {ctx.hint_cost} points.\n"
f"You have {points_available} points.")
elif itemname in Items.item_table:
if ctx.hint_cost: can_pay = points_available // ctx.hint_cost >= 1
else: can_pay = True
if can_pay:
found = hint(ctx, client.team, client.slot, itemname)
ctx.hints_used[client.team, client.slot] += found
if not found:
notify_client(client, "No items found, points refunded.")
notify_client(client, f"You can't afford the hint. "
f"You have {points_available} points and need {ctx.hint_cost}")
notify_client(client, f'Item "{itemname}" not found.')
def set_password(ctx : Context, password):
ctx.password = password
logging.warning('Password set to ' + password if password is not None else 'Password disabled')
if command[0] == '/hint':
for (team, slot), name in ctx.player_names.items():
if len(command) == 1:
print("Use /hint {Playername} {itemname}\nFor example /hint Berserker Lamp")
logging.info("Use /hint {Playername} {itemname}\nFor example /hint Berserker Lamp")
elif name.lower() == command[1].lower():
item = " ".join(command[2:])
if item in Items.item_table:
seeked_item_id = Items.item_table[item][3]
for check, result in ctx.locations.items():
item_id, receiving_player = result
if receiving_player == slot and item_id == seeked_item_id:
location_id, finding_player = check
name_finder = ctx.player_names[team, finding_player]
hint = f"[Hint]: {name}'s {item} can be found at " \
f"{get_location_name_from_address(location_id)} in {name_finder}'s World"
notify_team(ctx, team, hint)
hint(ctx, team, slot, item)
logging.warning("Unknown item: " + item)
if command[0][0] != '/':
parser.add_argument('--savefile', default=None)
parser.add_argument('--disable_save', default=False, action='store_true')
parser.add_argument('--loglevel', default='info', choices=['debug', 'info', 'warning', 'error', 'critical'])
parser.add_argument('--location_check_points', default=1, type=int)
parser.add_argument('--hint_cost', default=1000, type=int)
args = parser.parse_args()
file_options = Utils.parse_yaml(open("host.yaml").read())["server_options"]
for key, value in file_options.items():
if value is not None:
setattr(args, key, value)
logging.basicConfig(format='[%(asctime)s] %(message)s', level=getattr(logging, args.loglevel.upper(), logging.INFO))
ctx = Context(args.host, args.port, args.password)
ctx = Context(args.host, args.port, args.password, args.location_check_points, args.hint_cost)
ctx.data_filename = args.multidata
with open(ctx.save_filename, 'rb') as f:
jsonobj = json.loads(zlib.decompress(f.read()).decode("utf-8"))
rom_names = jsonobj[0]
received_items = {tuple(k): [ReceivedItem(**i) for i in v] for k, v in jsonobj[1]}
if not all([ctx.rom_names[tuple(rom)] == (team, slot) for rom, (team, slot) in rom_names]):
raise Exception('Save file mismatch, will start a new game')
ctx.received_items = received_items
logging.info('Loaded save file with %d received items for %d players' % (sum([len(p) for p in received_items.values()]), len(received_items)))
except FileNotFoundError:
logging.error('No save data found, starting a new game')
except Exception as e:
ctx.server = websockets.serve(functools.partial(server,ctx=ctx), ctx.host, ctx.port, ping_timeout=None, ping_interval=None)
await ctx.server

from yaml import load
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
from Utils import parse_yaml
from Rom import get_sprite_from_name
from EntranceRandomizer import parse_arguments
from Main import main as ERmain
parse_yaml = functools.partial(load, Loader=Loader)
basemd5 = hashlib.md5()
return "New Rom Hash: " + basemd5.hexdigest()
from yaml import load
import functools
try: from yaml import CLoader as Loader
except ImportError: from yaml import Loader
parse_yaml = functools.partial(load, Loader=Loader)

#options for MultiServer
#null means nothing, for the server this means to default the value
#these overwrite command line arguments!
host: null
port: null
password: null
multidata: null
savefile: null
disable_save: null
loglevel: null
#Client hint system
#points given to player for each acquired item
location_check_points: 1
#point cost to receive a hint via !hint for players
hint_cost: 1000
#options for MultiMystery.py
#Where to place the resulting files
output_path: "MultiMystery"
#location of your Enemizer CLI, available here: https://github.com/Bonta0/Enemizer/releases
enemizer_path: "EnemizerCLI/EnemizerCLI.Core.exe"
#folder from which the player yaml files are pulled from
player_files_path: "Players"
#automatically launches {player_name}.yaml's ROM file using the OS's default program once generation completes. (likely your emulator)
#does nothing if the name is not found
#example: player_name = "Berserker"
player_name: "" # the hosts name
#Zip the resulting roms
#0 -> Don't
#1 -> Create a zip
#2 -> Create a zip and delete the ROMs that will be in it, except the hosts (requires player_name to be set correctly)
zip_roms: 1
#create a spoiler file
create_spoiler: 1
#create roms flagged as race roms
race: 0