[Core] Allow multiple worlds in one yaml (#428)

This commit is contained in:
Jarno Westhof 2022-04-12 10:57:29 +02:00 committed by GitHub
parent 8e68aa0ccd
commit 618bdfc917
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 53 deletions

View File

@ -3,7 +3,7 @@ import logging
import random
import urllib.request
import urllib.parse
import typing
from typing import Set, Dict, Tuple, Callable, Any, Union
import os
from collections import Counter
import string
@ -15,7 +15,7 @@ ModuleUpdate.update()
import Utils
from worlds.alttp import Options as LttPOptions
from worlds.generic import PlandoConnection
from Utils import parse_yaml, version_tuple, __version__, tuplize_version, get_options, local_path, user_path
from Utils import parse_yamls, version_tuple, __version__, tuplize_version, get_options, local_path, user_path
from worlds.alttp.EntranceRandomizer import parse_arguments
from Main import main as ERmain
from BaseClasses import seeddigits, get_seed
@ -32,7 +32,7 @@ def mystery_argparse():
options = get_options()
defaults = options["generator"]
def resolve_path(path: str, resolver: typing.Callable[[str], str]) -> str:
def resolve_path(path: str, resolver: Callable[[str], str]) -> str:
return path if os.path.isabs(path) else resolver(path)
parser = argparse.ArgumentParser(description="CMD Generation Interface, defaults come from host.yaml.")
@ -64,7 +64,7 @@ def mystery_argparse():
args.weights_file_path = os.path.join(args.player_files_path, args.weights_file_path)
if not os.path.isabs(args.meta_file_path):
args.meta_file_path = os.path.join(args.player_files_path, args.meta_file_path)
args.plando: typing.Set[str] = {arg.strip().lower() for arg in args.plando.split(",")}
args.plando: Set[str] = {arg.strip().lower() for arg in args.plando.split(",")}
return args, options
@ -83,21 +83,21 @@ def main(args=None, callback=ERmain):
if args.race:
random.seed() # reset to time-based random source
weights_cache = {}
weights_cache: Dict[str, Tuple[Any, ...]] = {}
if args.weights_file_path and os.path.exists(args.weights_file_path):
try:
weights_cache[args.weights_file_path] = read_weights_yaml(args.weights_file_path)
weights_cache[args.weights_file_path] = read_weights_yamls(args.weights_file_path)
except Exception as e:
raise ValueError(f"File {args.weights_file_path} is destroyed. Please fix your yaml.") from e
print(f"Weights: {args.weights_file_path} >> "
f"{get_choice('description', weights_cache[args.weights_file_path], 'No description specified')}")
f"{get_choice('description', weights_cache[args.weights_file_path][-1], 'No description specified')}")
if args.meta_file_path and os.path.exists(args.meta_file_path):
try:
weights_cache[args.meta_file_path] = read_weights_yaml(args.meta_file_path)
weights_cache[args.meta_file_path] = read_weights_yamls(args.meta_file_path)
except Exception as e:
raise ValueError(f"File {args.meta_file_path} is destroyed. Please fix your yaml.") from e
meta_weights = weights_cache[args.meta_file_path]
meta_weights = weights_cache[args.meta_file_path][-1]
print(f"Meta: {args.meta_file_path} >> {get_choice('meta_description', meta_weights)}")
del(meta_weights["meta_description"])
if args.samesettings:
@ -111,14 +111,15 @@ def main(args=None, callback=ERmain):
if file.is_file() and os.path.join(args.player_files_path, fname) not in {args.meta_file_path, args.weights_file_path}:
path = os.path.join(args.player_files_path, fname)
try:
weights_cache[fname] = read_weights_yaml(path)
weights_cache[fname] = read_weights_yamls(path)
except Exception as e:
raise ValueError(f"File {fname} is destroyed. Please fix your yaml.") from e
else:
print(f"P{player_id} Weights: {fname} >> "
f"{get_choice('description', weights_cache[fname], 'No description specified')}")
player_files[player_id] = fname
player_id += 1
for yaml in weights_cache[fname]:
print(f"P{player_id} Weights: {fname} >> "
f"{get_choice('description', yaml, 'No description specified')}")
player_files[player_id] = fname
player_id += 1
args.multi = max(player_id-1, args.multi)
print(f"Generating for {args.multi} player{'s' if args.multi > 1 else ''}, {seed_name} Seed {seed} with plando: "
@ -141,8 +142,9 @@ def main(args=None, callback=ERmain):
erargs.sm_rom = args.sm_rom
erargs.enemizercli = args.enemizercli
settings_cache = {k: (roll_settings(v, args.plando) if args.samesettings else None)
for k, v in weights_cache.items()}
settings_cache: Dict[str, Tuple[argparse.Namespace, ...]] = \
{fname: ( tuple(roll_settings(yaml, args.plando) for yaml in yamls) if args.samesettings else None)
for fname, yamls in weights_cache.items()}
player_path_cache = {}
for player in range(1, args.multi + 1):
player_path_cache[player] = player_files.get(player, args.weights_file_path)
@ -153,38 +155,45 @@ def main(args=None, callback=ERmain):
option = get_choice(key, category_dict)
if option is not None:
for player, path in player_path_cache.items():
if category_name is None:
weights_cache[path][key] = option
elif category_name not in weights_cache[path]:
logging.warning(f"Meta: Category {category_name} is not present in {path}.")
else:
weights_cache[path][category_name][key] = option
for yaml in weights_cache[path]:
if category_name is None:
yaml[key] = option
elif category_name not in yaml:
logging.warning(f"Meta: Category {category_name} is not present in {path}.")
else:
yaml[category_name][key] = option
name_counter = Counter()
erargs.player_settings = {}
for player in range(1, args.multi + 1):
player = 1
while player <= args.multi:
path = player_path_cache[player]
if path:
try:
settings = settings_cache[path] if settings_cache[path] else \
roll_settings(weights_cache[path], args.plando)
for k, v in vars(settings).items():
if v is not None:
try:
getattr(erargs, k)[player] = v
except AttributeError:
setattr(erargs, k, {player: v})
except Exception as e:
raise Exception(f"Error setting {k} to {v} for player {player}") from e
settings: Tuple[argparse.Namespace, ...] = settings_cache[path] if settings_cache[path] else \
tuple(roll_settings(yaml, args.plando) for yaml in weights_cache[path])
for settingsObject in settings:
for k, v in vars(settingsObject).items():
if v is not None:
try:
getattr(erargs, k)[player] = v
except AttributeError:
setattr(erargs, k, {player: v})
except Exception as e:
raise Exception(f"Error setting {k} to {v} for player {player}") from e
if path == args.weights_file_path: # if name came from the weights file, just use base player name
erargs.name[player] = f"Player{player}"
elif not erargs.name[player]: # if name was not specified, generate it from filename
erargs.name[player] = os.path.splitext(os.path.split(path)[-1])[0]
erargs.name[player] = handle_name(erargs.name[player], player, name_counter)
player += 1
except Exception as e:
raise ValueError(f"File {path} is destroyed. Please fix your yaml.") from e
else:
raise RuntimeError(f'No weights specified for player {player}')
if path == args.weights_file_path: # if name came from the weights file, just use base player name
erargs.name[player] = f"Player{player}"
elif not erargs.name[player]: # if name was not specified, generate it from filename
erargs.name[player] = os.path.splitext(os.path.split(path)[-1])[0]
erargs.name[player] = handle_name(erargs.name[player], player, name_counter)
if len(set(erargs.name.values())) != len(erargs.name):
raise Exception(f"Names have to be unique. Names: {Counter(erargs.name.values())}")
@ -214,7 +223,7 @@ def main(args=None, callback=ERmain):
callback(erargs, seed)
def read_weights_yaml(path):
def read_weights_yamls(path) -> Tuple[Any, ...]:
try:
if urllib.parse.urlparse(path).scheme in ('https', 'file'):
yaml = str(urllib.request.urlopen(path).read(), "utf-8-sig")
@ -224,7 +233,7 @@ def read_weights_yaml(path):
except Exception as e:
raise Exception(f"Failed to read weights ({path})") from e
return parse_yaml(yaml)
return tuple(parse_yamls(yaml))
def interpret_on_off(value) -> bool:
@ -235,7 +244,7 @@ def convert_to_on_off(value) -> str:
return {True: "on", False: "off"}.get(value, value)
def get_choice_legacy(option, root, value=None) -> typing.Any:
def get_choice_legacy(option, root, value=None) -> Any:
if option not in root:
return value
if type(root[option]) is list:
@ -250,7 +259,7 @@ def get_choice_legacy(option, root, value=None) -> typing.Any:
raise RuntimeError(f"All options specified in \"{option}\" are weighted as zero.")
def get_choice(option, root, value=None) -> typing.Any:
def get_choice(option, root, value=None) -> Any:
if option not in root:
return value
if type(root[option]) is list:
@ -283,16 +292,16 @@ def handle_name(name: str, player: int, name_counter: Counter):
return new_name
def prefer_int(input_data: str) -> typing.Union[str, int]:
def prefer_int(input_data: str) -> Union[str, int]:
try:
return int(input_data)
except:
return input_data
available_boss_names: typing.Set[str] = {boss.lower() for boss in Bosses.boss_table if boss not in
available_boss_names: Set[str] = {boss.lower() for boss in Bosses.boss_table if boss not in
{'Agahnim', 'Agahnim2', 'Ganon'}}
available_boss_locations: typing.Set[str] = {f"{loc.lower()}{f' {level}' if level else ''}" for loc, level in
available_boss_locations: Set[str] = {f"{loc.lower()}{f' {level}' if level else ''}" for loc, level in
Bosses.boss_location_table}
boss_shuffle_options = {None: 'none',
@ -317,7 +326,7 @@ goals = {
}
def roll_percentage(percentage: typing.Union[int, float]) -> bool:
def roll_percentage(percentage: Union[int, float]) -> bool:
"""Roll a percentage chance.
percentage is expected to be in range [0, 100]"""
return random.random() < (float(percentage) / 100)
@ -387,7 +396,7 @@ def roll_triggers(weights: dict, triggers: list) -> dict:
return weights
def get_plando_bosses(boss_shuffle: str, plando_options: typing.Set[str]) -> str:
def get_plando_bosses(boss_shuffle: str, plando_options: Set[str]) -> str:
if boss_shuffle in boss_shuffle_options:
return boss_shuffle_options[boss_shuffle]
elif "bosses" in plando_options:
@ -439,7 +448,7 @@ def handle_option(ret: argparse.Namespace, game_weights: dict, option_key: str,
setattr(ret, option_key, option(option.default))
def roll_settings(weights: dict, plando_options: typing.Set[str] = frozenset(("bosses",))):
def roll_settings(weights: dict, plando_options: Set[str] = frozenset(("bosses",))):
if "linked_options" in weights:
weights = roll_linked_options(weights)

View File

@ -28,7 +28,7 @@ class Version(typing.NamedTuple):
__version__ = "0.3.2"
version_tuple = tuplize_version(__version__)
from yaml import load, dump, SafeLoader
from yaml import load, load_all, dump, SafeLoader
try:
from yaml import CLoader as Loader
@ -159,6 +159,7 @@ class UniqueKeyLoader(SafeLoader):
parse_yaml = functools.partial(load, Loader=UniqueKeyLoader)
parse_yamls = functools.partial(load_all, Loader=UniqueKeyLoader)
unsafe_parse_yaml = functools.partial(load, Loader=Loader)

View File

@ -13,7 +13,7 @@ def allowed_file(filename):
from Generate import roll_settings
from Utils import parse_yaml
from Utils import parse_yamls
@app.route('/check', methods=['GET', 'POST'])
@ -68,14 +68,19 @@ def roll_options(options: Dict[str, Union[dict, str]]) -> Tuple[Dict[str, Union[
for filename, text in options.items():
try:
if type(text) is dict:
yaml_data = text
yaml_datas = (text, )
else:
yaml_data = parse_yaml(text)
yaml_datas = tuple(parse_yamls(text))
except Exception as e:
results[filename] = f"Failed to parse YAML data in {filename}: {e}"
else:
try:
rolled_results[filename] = roll_settings(yaml_data,
if len(yaml_datas) == 1:
rolled_results[filename] = roll_settings(yaml_datas[0],
plando_options={"bosses", "items", "connections", "texts"})
else:
for i, yaml_data in enumerate(yaml_datas):
rolled_results[f"{filename}/{i + 1}"] = roll_settings(yaml_data,
plando_options={"bosses", "items", "connections", "texts"})
except Exception as e:
results[filename] = f"Failed to generate mystery in {filename}: {e}"