WebHost: Some refactors and additional checks when uploading files. (#2549)
This commit is contained in:
parent
db1d195cb0
commit
0eefe9e936
|
@ -1,3 +1,4 @@
|
||||||
|
import os
|
||||||
import zipfile
|
import zipfile
|
||||||
import base64
|
import base64
|
||||||
from typing import Union, Dict, Set, Tuple
|
from typing import Union, Dict, Set, Tuple
|
||||||
|
@ -6,13 +7,7 @@ from flask import request, flash, redirect, url_for, render_template
|
||||||
from markupsafe import Markup
|
from markupsafe import Markup
|
||||||
|
|
||||||
from WebHostLib import app
|
from WebHostLib import app
|
||||||
|
from WebHostLib.upload import allowed_options, allowed_options_extensions, banned_file
|
||||||
banned_zip_contents = (".sfc",)
|
|
||||||
|
|
||||||
|
|
||||||
def allowed_file(filename):
|
|
||||||
return filename.endswith(('.txt', ".yaml", ".zip"))
|
|
||||||
|
|
||||||
|
|
||||||
from Generate import roll_settings, PlandoOptions
|
from Generate import roll_settings, PlandoOptions
|
||||||
from Utils import parse_yamls
|
from Utils import parse_yamls
|
||||||
|
@ -51,33 +46,41 @@ def mysterycheck():
|
||||||
def get_yaml_data(files) -> Union[Dict[str, str], str, Markup]:
|
def get_yaml_data(files) -> Union[Dict[str, str], str, Markup]:
|
||||||
options = {}
|
options = {}
|
||||||
for uploaded_file in files:
|
for uploaded_file in files:
|
||||||
# if user does not select file, browser also
|
if banned_file(uploaded_file.filename):
|
||||||
# submit an empty part without filename
|
return ("Uploaded data contained a rom file, which is likely to contain copyrighted material. "
|
||||||
if uploaded_file.filename == '':
|
"Your file was deleted.")
|
||||||
return 'No selected file'
|
# If the user does not select file, the browser will still submit an empty string without a file name.
|
||||||
|
elif uploaded_file.filename == "":
|
||||||
|
return "No selected file."
|
||||||
elif uploaded_file.filename in options:
|
elif uploaded_file.filename in options:
|
||||||
return f'Conflicting files named {uploaded_file.filename} submitted'
|
return f"Conflicting files named {uploaded_file.filename} submitted."
|
||||||
elif uploaded_file and allowed_file(uploaded_file.filename):
|
elif uploaded_file and allowed_options(uploaded_file.filename):
|
||||||
if uploaded_file.filename.endswith(".zip"):
|
if uploaded_file.filename.endswith(".zip"):
|
||||||
|
if not zipfile.is_zipfile(uploaded_file):
|
||||||
|
return f"Uploaded file {uploaded_file.filename} is not a valid .zip file and cannot be opened."
|
||||||
|
|
||||||
with zipfile.ZipFile(uploaded_file, 'r') as zfile:
|
uploaded_file.seek(0) # offset from is_zipfile check
|
||||||
infolist = zfile.infolist()
|
with zipfile.ZipFile(uploaded_file, "r") as zfile:
|
||||||
|
for file in zfile.infolist():
|
||||||
|
# Remove folder pathing from str (e.g. "__MACOSX/" folder paths from archives created by macOS).
|
||||||
|
base_filename = os.path.basename(file.filename)
|
||||||
|
|
||||||
if any(file.filename.endswith(".archipelago") for file in infolist):
|
if base_filename.endswith(".archipelago"):
|
||||||
return Markup("Error: Your .zip file contains an .archipelago file. "
|
return Markup("Error: Your .zip file contains an .archipelago file. "
|
||||||
'Did you mean to <a href="/uploads">host a game</a>?')
|
'Did you mean to <a href="/uploads">host a game</a>?')
|
||||||
|
elif base_filename.endswith(".zip"):
|
||||||
for file in infolist:
|
return "Nested .zip files inside a .zip are not supported."
|
||||||
if file.filename.endswith(banned_zip_contents):
|
elif banned_file(base_filename):
|
||||||
return ("Uploaded data contained a rom file, "
|
return ("Uploaded data contained a rom file, which is likely to contain copyrighted "
|
||||||
"which is likely to contain copyrighted material. "
|
"material. Your file was deleted.")
|
||||||
"Your file was deleted.")
|
# Ignore dot-files.
|
||||||
elif file.filename.endswith((".yaml", ".json", ".yml", ".txt")):
|
elif not base_filename.startswith(".") and allowed_options(base_filename):
|
||||||
options[file.filename] = zfile.open(file, "r").read()
|
options[file.filename] = zfile.open(file, "r").read()
|
||||||
else:
|
else:
|
||||||
options[uploaded_file.filename] = uploaded_file.read()
|
options[uploaded_file.filename] = uploaded_file.read()
|
||||||
|
|
||||||
if not options:
|
if not options:
|
||||||
return "Did not find a .yaml file to process."
|
return f"Did not find any valid files to process. Accepted formats: {allowed_options_extensions}"
|
||||||
return options
|
return options
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,22 @@ from worlds.Files import AutoPatchRegister
|
||||||
from . import app
|
from . import app
|
||||||
from .models import Seed, Room, Slot, GameDataPackage
|
from .models import Seed, Room, Slot, GameDataPackage
|
||||||
|
|
||||||
banned_zip_contents = (".sfc", ".z64", ".n64", ".sms", ".gb")
|
banned_extensions = (".sfc", ".z64", ".n64", ".nes", ".smc", ".sms", ".gb", ".gbc", ".gba")
|
||||||
|
allowed_options_extensions = (".yaml", ".json", ".yml", ".txt", ".zip")
|
||||||
|
allowed_generation_extensions = (".archipelago", ".zip")
|
||||||
|
|
||||||
|
|
||||||
|
def allowed_options(filename: str) -> bool:
|
||||||
|
return filename.endswith(allowed_options_extensions)
|
||||||
|
|
||||||
|
|
||||||
|
def allowed_generation(filename: str) -> bool:
|
||||||
|
return filename.endswith(allowed_generation_extensions)
|
||||||
|
|
||||||
|
|
||||||
|
def banned_file(filename: str) -> bool:
|
||||||
|
return filename.endswith(banned_extensions)
|
||||||
|
|
||||||
|
|
||||||
def process_multidata(compressed_multidata, files={}):
|
def process_multidata(compressed_multidata, files={}):
|
||||||
decompressed_multidata = MultiServer.Context.decompress(compressed_multidata)
|
decompressed_multidata = MultiServer.Context.decompress(compressed_multidata)
|
||||||
|
@ -61,8 +76,8 @@ def upload_zip_to_db(zfile: zipfile.ZipFile, owner=None, meta={"race": False}, s
|
||||||
if not owner:
|
if not owner:
|
||||||
owner = session["_id"]
|
owner = session["_id"]
|
||||||
infolist = zfile.infolist()
|
infolist = zfile.infolist()
|
||||||
if all(file.filename.endswith((".yaml", ".yml")) or file.is_dir() for file in infolist):
|
if all(allowed_options(file.filename) or file.is_dir() for file in infolist):
|
||||||
flash(Markup("Error: Your .zip file only contains .yaml files. "
|
flash(Markup("Error: Your .zip file only contains options files. "
|
||||||
'Did you mean to <a href="/generate">generate a game</a>?'))
|
'Did you mean to <a href="/generate">generate a game</a>?'))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -73,7 +88,7 @@ def upload_zip_to_db(zfile: zipfile.ZipFile, owner=None, meta={"race": False}, s
|
||||||
# Load files.
|
# Load files.
|
||||||
for file in infolist:
|
for file in infolist:
|
||||||
handler = AutoPatchRegister.get_handler(file.filename)
|
handler = AutoPatchRegister.get_handler(file.filename)
|
||||||
if file.filename.endswith(banned_zip_contents):
|
if banned_file(file.filename):
|
||||||
return "Uploaded data contained a rom file, which is likely to contain copyrighted material. " \
|
return "Uploaded data contained a rom file, which is likely to contain copyrighted material. " \
|
||||||
"Your file was deleted."
|
"Your file was deleted."
|
||||||
|
|
||||||
|
@ -136,35 +151,34 @@ def upload_zip_to_db(zfile: zipfile.ZipFile, owner=None, meta={"race": False}, s
|
||||||
flash("No multidata was found in the zip file, which is required.")
|
flash("No multidata was found in the zip file, which is required.")
|
||||||
|
|
||||||
|
|
||||||
@app.route('/uploads', methods=['GET', 'POST'])
|
@app.route("/uploads", methods=["GET", "POST"])
|
||||||
def uploads():
|
def uploads():
|
||||||
if request.method == 'POST':
|
if request.method == "POST":
|
||||||
# check if the post request has the file part
|
# check if the POST request has a file part.
|
||||||
if 'file' not in request.files:
|
if "file" not in request.files:
|
||||||
flash('No file part')
|
flash("No file part in POST request.")
|
||||||
else:
|
else:
|
||||||
file = request.files['file']
|
uploaded_file = request.files["file"]
|
||||||
# if user does not select file, browser also
|
# If the user does not select file, the browser will still submit an empty string without a file name.
|
||||||
# submit an empty part without filename
|
if uploaded_file.filename == "":
|
||||||
if file.filename == '':
|
flash("No selected file.")
|
||||||
flash('No selected file')
|
elif uploaded_file and allowed_generation(uploaded_file.filename):
|
||||||
elif file and allowed_file(file.filename):
|
if zipfile.is_zipfile(uploaded_file):
|
||||||
if zipfile.is_zipfile(file):
|
with zipfile.ZipFile(uploaded_file, "r") as zfile:
|
||||||
with zipfile.ZipFile(file, 'r') as zfile:
|
|
||||||
try:
|
try:
|
||||||
res = upload_zip_to_db(zfile)
|
res = upload_zip_to_db(zfile)
|
||||||
except VersionException:
|
except VersionException:
|
||||||
flash(f"Could not load multidata. Wrong Version detected.")
|
flash(f"Could not load multidata. Wrong Version detected.")
|
||||||
else:
|
else:
|
||||||
if type(res) == str:
|
if res is str:
|
||||||
return res
|
return res
|
||||||
elif res:
|
elif res:
|
||||||
return redirect(url_for("view_seed", seed=res.id))
|
return redirect(url_for("view_seed", seed=res.id))
|
||||||
else:
|
else:
|
||||||
file.seek(0) # offset from is_zipfile check
|
uploaded_file.seek(0) # offset from is_zipfile check
|
||||||
# noinspection PyBroadException
|
# noinspection PyBroadException
|
||||||
try:
|
try:
|
||||||
multidata = file.read()
|
multidata = uploaded_file.read()
|
||||||
slots, multidata = process_multidata(multidata)
|
slots, multidata = process_multidata(multidata)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
flash(f"Could not load multidata. File may be corrupted or incompatible. ({e})")
|
flash(f"Could not load multidata. File may be corrupted or incompatible. ({e})")
|
||||||
|
@ -182,7 +196,3 @@ def user_content():
|
||||||
rooms = select(room for room in Room if room.owner == session["_id"])
|
rooms = select(room for room in Room if room.owner == session["_id"])
|
||||||
seeds = select(seed for seed in Seed if seed.owner == session["_id"])
|
seeds = select(seed for seed in Seed if seed.owner == session["_id"])
|
||||||
return render_template("userContent.html", rooms=rooms, seeds=seeds)
|
return render_template("userContent.html", rooms=rooms, seeds=seeds)
|
||||||
|
|
||||||
|
|
||||||
def allowed_file(filename):
|
|
||||||
return filename.endswith(('.archipelago', ".zip"))
|
|
||||||
|
|
Loading…
Reference in New Issue