"""Flask views and helpers that roll uploaded player options into a multiworld seed."""
import json
import os
import pickle
import random
import tempfile
import zipfile
import concurrent.futures
from collections import Counter
from typing import Dict, Optional, Any, Union, List

from flask import request, flash, redirect, url_for, session, render_template
from pony.orm import commit, db_session

from BaseClasses import seeddigits, get_seed
from Generate import handle_name, PlandoOptions
from Main import main as ERmain
from Utils import __version__
from WebHostLib import app
from worlds.alttp.EntranceRandomizer import parse_arguments
from .check import get_yaml_data, roll_options
from .models import Generation, STATE_ERROR, STATE_QUEUED, Seed, UUID
from .upload import upload_zip_to_db


def get_meta(options_source: dict, race: bool = False) -> Dict[str, Union[List[str], Dict[str, Any]]]:
    """Collect server, plando and generator options from a mapping of raw values.

    :param options_source: mapping (e.g. the submitted form) queried with ``.get``;
        missing keys fall back to the defaults written below.
    :param race: when true, ``item_cheat`` is forced off, ``remaining_mode`` is
        forced to ``"disabled"`` and ``spoiler`` is forced to ``0``.
    :return: dict with the keys ``"server_options"``, ``"plando_options"`` (a list)
        and ``"generator_options"``.
    :raises ValueError: if ``hint_cost``, ``item_cheat`` or ``spoiler`` cannot be
        converted with ``int``.
    """
    plando_options = {
        options_source.get("plando_bosses", ""),
        options_source.get("plando_items", ""),
        options_source.get("plando_connections", ""),
        options_source.get("plando_texts", "")
    }
    # Missing plando entries all collapse to "", which is not a real option.
    plando_options -= {""}

    server_options = {
        "hint_cost": int(options_source.get("hint_cost", 10)),
        "release_mode": options_source.get("release_mode", "goal"),
        "remaining_mode": options_source.get("remaining_mode", "disabled"),
        "collect_mode": options_source.get("collect_mode", "disabled"),
        "item_cheat": bool(int(options_source.get("item_cheat", 1))),
        "server_password": options_source.get("server_password", None),
    }
    generator_options = {
        "spoiler": int(options_source.get("spoiler", 0)),
        "race": race
    }

    if race:
        server_options["item_cheat"] = False
        server_options["remaining_mode"] = "disabled"
        generator_options["spoiler"] = 0

    return {
        "server_options": server_options,
        "plando_options": list(plando_options),
        "generator_options": generator_options,
    }


@app.route('/generate', methods=['GET', 'POST'])
@app.route('/generate/', methods=['GET', 'POST'])
def generate(race=False):
    """Render the generate page and, on POST, roll the uploaded option files.

    Depending on the number of rolled players the request is rejected
    (more than ``MAX_ROLL``), queued as a ``Generation`` row
    (``JOB_THRESHOLD`` or more) or generated within this request via
    :func:`gen_game`.
    """
    if request.method == 'POST':
        # check if the post request has the file part
        if 'file' not in request.files:
            flash('No file part')
        else:
            files = request.files.getlist('file')
            options = get_yaml_data(files)
            if isinstance(options, str):
                # A string result is flashed to the user instead of being rolled.
                flash(options)
            else:
                meta = get_meta(request.form, race)
                results, gen_options = roll_options(options, set(meta["plando_options"]))

                if any(isinstance(result, str) for result in results.values()):
                    return render_template("checkResult.html", results=results)
                elif len(gen_options) > app.config["MAX_ROLL"]:
                    flash(f"Sorry, generating of multiworlds is limited to {app.config['MAX_ROLL']} players. "
                          f"If you have a larger group, please generate it yourself and upload it.")
                elif len(gen_options) >= app.config["JOB_THRESHOLD"]:
                    gen = Generation(
                        options=pickle.dumps({name: vars(options) for name, options in gen_options.items()}),
                        # convert to json compatible
                        meta=json.dumps(meta),
                        state=STATE_QUEUED,
                        owner=session["_id"])
                    commit()

                    return redirect(url_for("wait_seed", seed=gen.id))
                else:
                    try:
                        seed_id = gen_game({name: vars(options) for name, options in gen_options.items()},
                                           meta=meta, owner=session["_id"].int)
                    except BaseException as e:
                        from .autolauncher import handle_generation_failure
                        handle_generation_failure(e)
                        return render_template("seedError.html", seed_error=(e.__class__.__name__ + ": " + str(e)))

                    return redirect(url_for("view_seed", seed=seed_id))

    return render_template("generate.html", race=race, version=__version__)


def gen_game(gen_options: dict, meta: Optional[Dict[str, Any]] = None, owner=None, sid=None):
    """Generate a multiworld from rolled options and upload the result.

    The generation runs on a single worker thread and is awaited for at most
    ``app.config["JOB_TIME"]``.

    :param gen_options: mapping of player file name to a dict of settings.
    :param meta: options as produced by :func:`get_meta`; missing sections are
        filled with defaults (the passed dict is updated in place).
    :param owner: forwarded to :func:`upload_to_db`.
    :param sid: id of a queued ``Generation`` row, if any; used to record errors
        on that row and forwarded to :func:`upload_to_db`.
    :return: the value returned by :func:`upload_to_db`, or ``None`` when the
        allowed time is exceeded (the timeout is recorded on the ``Generation``
        row when ``sid`` is given, but it is not re-raised).
    :raises BaseException: any non-timeout error from the generation task is
        recorded on the ``Generation`` row (when ``sid`` is given) and re-raised.
    """
    if not meta:
        meta = {}

    meta.setdefault("server_options", {}).setdefault("hint_cost", 10)
    race = meta.setdefault("generator_options", {}).setdefault("race", False)

    def task():
        # The context manager removes the output directory as soon as the upload
        # has finished (or failed) instead of waiting for garbage collection.
        with tempfile.TemporaryDirectory() as output_dir:
            playercount = len(gen_options)
            seed = get_seed()

            if race:
                random.seed()  # use time-based random source
            else:
                random.seed(seed)

            seedname = "W" + (f"{random.randint(0, pow(10, seeddigits) - 1)}".zfill(seeddigits))

            erargs = parse_arguments(['--multi', str(playercount)])
            erargs.seed = seed
            erargs.name = {x: "" for x in range(1, playercount + 1)}  # only so it can be overwritten in mystery
            erargs.spoiler = meta["generator_options"].get("spoiler", 0)
            erargs.race = race
            erargs.outputname = seedname
            erargs.outputpath = output_dir
            erargs.teams = 1
            erargs.plando_options = PlandoOptions.from_set(
                meta.setdefault("plando_options", {"bosses", "items", "connections", "texts"}))
            erargs.skip_prog_balancing = False

            name_counter = Counter()
            for player, (playerfile, settings) in enumerate(gen_options.items(), 1):
                # Copy every non-None setting into the per-player dict on erargs.
                for k, v in settings.items():
                    if v is not None:
                        if hasattr(erargs, k):
                            getattr(erargs, k)[player] = v
                        else:
                            setattr(erargs, k, {player: v})

                # Fall back to the file name (without extension) when no name was set.
                if not erargs.name[player]:
                    erargs.name[player] = os.path.splitext(os.path.split(playerfile)[-1])[0]
                erargs.name[player] = handle_name(erargs.name[player], player, name_counter)
            if len(set(erargs.name.values())) != len(erargs.name):
                raise Exception(f"Names have to be unique. Names: {Counter(erargs.name.values())}")

            ERmain(erargs, seed, baked_server_options=meta["server_options"])

            return upload_to_db(output_dir, sid, owner, race)

    thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    thread = thread_pool.submit(task)

    # The handlers below use their own ``gen_meta`` name: ``meta`` is read by
    # ``task`` through the closure and the task may still be running on timeout.
    try:
        return thread.result(app.config["JOB_TIME"])
    except concurrent.futures.TimeoutError as e:
        if sid:
            with db_session:
                gen = Generation.get(id=sid)
                if gen is not None:
                    gen.state = STATE_ERROR
                    gen_meta = json.loads(gen.meta)
                    gen_meta["error"] = (
                            "Allowed time for Generation exceeded, please consider generating locally instead. " +
                            e.__class__.__name__ + ": " + str(e))
                    gen.meta = json.dumps(gen_meta)
                    commit()
    except BaseException as e:
        if sid:
            with db_session:
                gen = Generation.get(id=sid)
                if gen is not None:
                    gen.state = STATE_ERROR
                    gen_meta = json.loads(gen.meta)
                    gen_meta["error"] = (e.__class__.__name__ + ": " + str(e))
                    gen.meta = json.dumps(gen_meta)
                    commit()
        raise


# NOTE(review): this rule has no URL variable although the view requires ``seed``;
# presumably a ``<converter:seed>`` segment is missing here - confirm against the app's routing.
@app.route('/wait/')
def wait_seed(seed: UUID):
    """Redirect to the finished seed, or show the waiting/error page for a generation."""
    seed_id = seed
    seed = Seed.get(id=seed_id)
    if seed:
        return redirect(url_for("view_seed", seed=seed_id))
    generation = Generation.get(id=seed_id)

    if not generation:
        return "Generation not found."
    elif generation.state == STATE_ERROR:
        return render_template("seedError.html", seed_error=generation.meta)
    return render_template("waitSeed.html", seed_id=seed_id)


def upload_to_db(folder, sid, owner, race):
    """Upload the first usable ``.zip`` found in *folder* and return the seed id.

    :param folder: directory that is searched (non-recursively) for ``.zip`` files.
    :param sid: forwarded to ``upload_zip_to_db``.
    :param owner: forwarded to ``upload_zip_to_db``.
    :param race: stored as ``{"race": race}`` in the meta passed to ``upload_zip_to_db``.
    :return: ``id`` of the object returned by ``upload_zip_to_db``.
    :raises Exception: with the returned message if ``upload_zip_to_db`` returns a
        string, or with "Generation zipfile not found." if no zip yields a result.
    """
    for file in os.listdir(folder):
        file = os.path.join(folder, file)
        if file.endswith(".zip"):
            with db_session:
                with zipfile.ZipFile(file) as zfile:
                    res = upload_zip_to_db(zfile, owner, {"race": race}, sid)
                # A string result is an error message, not a seed.
                if isinstance(res, str):
                    raise Exception(res)
                elif res:
                    seed = res
                    # A Generation row sharing the seed's id is no longer needed.
                    gen = Generation.get(id=seed.id)
                    if gen is not None:
                        gen.delete()
                    return seed.id
    raise Exception("Generation zipfile not found.")