Archipelago/WebHostLib/upload.py

222 lines
8.8 KiB
Python
Raw Normal View History

2021-05-15 23:16:51 +00:00
import base64
2022-10-16 23:08:31 +00:00
import json
import pickle
2022-10-16 23:08:31 +00:00
import typing
2021-09-17 23:02:26 +00:00
import uuid
2022-10-16 23:08:31 +00:00
import zipfile
import zlib
from io import BytesIO
from flask import request, flash, redirect, url_for, session, render_template
from markupsafe import Markup
from pony.orm import commit, flush, select, rollback
from pony.orm.core import TransactionIntegrityError
import schema
2022-10-16 23:08:31 +00:00
import MultiServer
from NetUtils import SlotType
from Utils import VersionException, __version__
from worlds import GamesPackage
from worlds.Files import AutoPatchRegister
from worlds.AutoWorld import data_package_checksum
2022-10-16 23:08:31 +00:00
from . import app
from .models import Seed, Room, Slot, GameDataPackage
# Extensions treated as console ROM images; banned_file() matches against
# these so uploads containing such files can be rejected.
banned_extensions = (
    ".sfc", ".z64", ".n64", ".nes", ".smc", ".sms", ".gb", ".gbc", ".gba",
)
# Extensions that allowed_options() accepts as player options files.
allowed_options_extensions = (
    ".yaml", ".json", ".yml", ".txt", ".zip",
)
# Extensions that allowed_generation() accepts as an uploaded generated game.
allowed_generation_extensions = (
    ".archipelago", ".zip",
)
# Shape a per-game data package must have before it is stored;
# process_multidata() validates every uploaded package against this schema.
games_package_schema = schema.Schema({
    "item_name_groups": {str: [str]},
    "item_name_to_id": {str: int},
    "location_name_groups": {str: [str]},
    "location_name_to_id": {str: int},
    # Optional keys: a package validates with or without them.
    schema.Optional("checksum"): str,
    schema.Optional("version"): int,
})
def allowed_options(filename: str) -> bool:
    """Return True if *filename* ends with one of ``allowed_options_extensions``."""
    return any(filename.endswith(extension) for extension in allowed_options_extensions)
def allowed_generation(filename: str) -> bool:
    """Return True if *filename* ends with one of ``allowed_generation_extensions``."""
    return any(filename.endswith(extension) for extension in allowed_generation_extensions)
def banned_file(filename: str) -> bool:
    """Return True if *filename* carries one of the ``banned_extensions``.

    The comparison is case-insensitive: the entries of ``banned_extensions``
    are all lower case, and a file named ``GAME.SFC`` must be rejected just
    like ``game.sfc``.
    """
    return filename.lower().endswith(banned_extensions)
def process_multidata(compressed_multidata, files=None):
    """Store an uploaded multidata blob's data packages and slots in the database.

    The multidata is decompressed, every embedded game data package that
    carries a checksum is validated and saved as a ``GameDataPackage``, and the
    package inside the multidata is replaced by a small ``version``/``checksum``
    stub. A ``Slot`` is created for every non-group entry of ``slot_info``.

    :param compressed_multidata: raw bytes of the uploaded multidata.
    :param files: optional mapping of player id to that player's file data;
        looked up per slot, missing entries become ``None``.
    :return: ``(slots, compressed_multidata)`` - the set of created ``Slot``
        entities and the re-compressed multidata with stripped data packages.
    :raises Exception: if a package's declared checksum differs from the one
        calculated here. Errors raised by ``games_package_schema.validate`` and
        ``MultiServer.Context.decompress`` propagate unchanged.
    """
    game_data: GamesPackage
    if files is None:
        files = {}

    # NOTE(review): this is user-uploaded data. The result is re-serialised with
    # pickle below, so decompress() presumably unpickles it - confirm that it
    # uses a restricted unpickler.
    decompressed_multidata = MultiServer.Context.decompress(compressed_multidata)

    slots: typing.Set[Slot] = set()
    if "datapackage" in decompressed_multidata:
        # strip datapackage from multidata, leaving only the checksums
        for game, game_data in decompressed_multidata["datapackage"].items():
            # Packages without a checksum are left in the multidata untouched.
            if not game_data.get("checksum"):
                continue
            original_checksum = game_data.pop("checksum")
            game_data = games_package_schema.validate(game_data)
            # Sort keys so the checksum is calculated over a stable ordering.
            game_data = {key: value for key, value in sorted(game_data.items())}
            game_data["checksum"] = data_package_checksum(game_data)
            # Verify before creating the entity, so a rejected upload leaves
            # nothing pending in the database session.
            if original_checksum != game_data["checksum"]:
                raise Exception(f"Original checksum {original_checksum} != "
                                f"calculated checksum {game_data['checksum']} "
                                f"for game {game}.")
            GameDataPackage(checksum=game_data["checksum"],
                            data=pickle.dumps(game_data))
            decompressed_multidata["datapackage"][game] = {
                "version": game_data.get("version", 0),
                "checksum": game_data["checksum"],
            }
            try:
                commit()  # commit game data package
            except TransactionIntegrityError:
                # The commit was rejected for this package; discard it and move on.
                rollback()

    if "slot_info" in decompressed_multidata:
        for slot, slot_info in decompressed_multidata["slot_info"].items():
            # Ignore Player Groups (e.g. item links)
            if slot_info.type == SlotType.group:
                continue
            slots.add(Slot(data=files.get(slot, None),
                           player_name=slot_info.name,
                           player_id=slot,
                           game=slot_info.game))
        flush()  # commit slots

    # Keep the original first byte and re-compress the stripped payload after it.
    compressed_multidata = compressed_multidata[0:1] + zlib.compress(pickle.dumps(decompressed_multidata), 9)
    return slots, compressed_multidata
2021-09-17 23:02:26 +00:00
def upload_zip_to_db(zfile: zipfile.ZipFile, owner=None, meta={"race": False}, sid=None):
    """Create a ``Seed`` (and its slots) from an uploaded zip of a generated game.

    Every archive member is classified by handler or file name: patch/container
    files are stored per player, a ``.txt`` becomes the spoiler, and the
    ``.archipelago`` file is the multidata, which is required.

    :param zfile: the opened upload archive.
    :param owner: owner id for the seed; falls back to ``session["_id"]``.
    :param meta: metadata stored JSON-encoded on the seed. It is only
        serialised, never mutated, so the shared default dict is safe.
    :param sid: id for the new seed; a random UUID is used when falsy.
    :return: the created ``Seed`` on success; a ``str`` message when the
        archive contains a banned (ROM) file; ``None`` when an error was
        flashed to the user instead.
    """
    if not owner:
        owner = session["_id"]
    infolist = zfile.infolist()
    if all(allowed_options(file.filename) or file.is_dir() for file in infolist):
        flash(Markup("Error: Your .zip file only contains options files. "
                     'Did you mean to <a href="/generate">generate a game</a>?'))
        return

    spoiler = ""
    files = {}
    multidata = None

    # Load files.
    for file in infolist:
        handler = AutoPatchRegister.get_handler(file.filename)
        if banned_file(file.filename):
            return "Uploaded data contained a rom file, which is likely to contain copyrighted material. " \
                   "Your file was deleted."

        # AP Container
        elif handler:
            data = zfile.read(file)
            patch = handler(BytesIO(data))
            patch.read()
            files[patch.player] = data

        # Spoiler
        elif file.filename.endswith(".txt"):
            spoiler = zfile.read(file).decode("utf-8-sig")

        # Multi-data
        elif file.filename.endswith(".archipelago"):
            try:
                multidata = zfile.read(file)
            except Exception:
                flash("Could not load multidata. File may be corrupted or incompatible.")
                multidata = None

        # Minecraft
        elif file.filename.endswith(".apmc"):
            data = zfile.read(file)
            metadata = json.loads(base64.b64decode(data).decode("utf-8"))
            files[metadata["player_id"]] = data

        # Factorio
        elif file.filename.endswith(".zip"):
            try:
                _, _, slot_id, *_ = file.filename.split('_')[0].split('-', 3)
                # Inside the try: a non-numeric slot token must also be
                # reported as an unexpected file rather than crash the request.
                player_id = int(slot_id[1:])
            except ValueError:
                flash("Error: Unexpected file found in .zip: " + file.filename)
                return
            files[player_id] = zfile.read(file)

        # All other files using the standard MultiWorld.get_out_file_name_base method
        else:
            try:
                _, _, slot_id, *_ = file.filename.split('.')[0].split('_', 3)
                player_id = int(slot_id[1:])
            except ValueError:
                flash("Error: Unexpected file found in .zip: " + file.filename)
                return
            files[player_id] = zfile.read(file)

    # Load multi data.
    if multidata:
        slots, multidata = process_multidata(multidata, files)

        seed = Seed(multidata=multidata, spoiler=spoiler, slots=slots, owner=owner, meta=json.dumps(meta),
                    id=sid if sid else uuid.uuid4())
        flush()  # create seed
        for slot in slots:
            slot.seed = seed
        return seed
    else:
        flash("No multidata was found in the zip file, which is required.")
@app.route("/uploads", methods=["GET", "POST"])
def uploads():
    """Render the host-game page and, on POST, accept an uploaded generated game.

    Accepts either a zip archive (handled by ``upload_zip_to_db``) or a bare
    multidata file (handled by ``process_multidata``). On success the client is
    redirected to the new seed's page; on failure a message is flashed and the
    upload page is rendered again.
    """
    if request.method == "POST":
        # check if the POST request has a file part.
        if "file" not in request.files:
            flash("No file part in POST request.")
        else:
            uploaded_file = request.files["file"]
            # If the user does not select file, the browser will still submit an empty string without a file name.
            if uploaded_file.filename == "":
                flash("No selected file.")
            elif uploaded_file and allowed_generation(uploaded_file.filename):
                if zipfile.is_zipfile(uploaded_file):
                    with zipfile.ZipFile(uploaded_file, "r") as zfile:
                        try:
                            res = upload_zip_to_db(zfile)
                        except VersionException:
                            flash("Could not load multidata. Wrong Version detected.")
                        else:
                            # A str result is an error message to show as the
                            # response body. `res is str` compared against the
                            # type object and was never true.
                            if isinstance(res, str):
                                return res
                            elif res:
                                return redirect(url_for("view_seed", seed=res.id))
                else:
                    uploaded_file.seek(0)  # offset from is_zipfile check
                    # noinspection PyBroadException
                    try:
                        multidata = uploaded_file.read()
                        slots, multidata = process_multidata(multidata)
                    except Exception as e:
                        flash(f"Could not load multidata. File may be corrupted or incompatible. ({e})")
                    else:
                        seed = Seed(multidata=multidata, slots=slots, owner=session["_id"])
                        flush()  # place into DB and generate ids
                        return redirect(url_for("view_seed", seed=seed.id))
            else:
                flash("Not recognized file format. Awaiting a .archipelago file or .zip containing one.")
    return render_template("hostGame.html", version=__version__)
2020-12-04 23:22:12 +00:00
@app.route('/user-content', methods=['GET'])
def user_content():
    """Render the page listing the rooms and seeds owned by the current session."""
    owned_rooms = select(r for r in Room if r.owner == session["_id"])
    owned_seeds = select(s for s in Seed if s.owner == session["_id"])
    return render_template(
        "userContent.html",
        rooms=owned_rooms,
        seeds=owned_seeds,
    )