206 lines
7.7 KiB
Python
206 lines
7.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import multiprocessing
|
|
import typing
|
|
from datetime import timedelta, datetime
|
|
from threading import Event, Thread
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from pony.orm import db_session, select, commit
|
|
|
|
from Utils import restricted_loads
|
|
from .locker import Locker, AlreadyRunningException
|
|
|
|
_stop_event = Event()
|
|
|
|
|
|
def stop():
|
|
"""Stops previously launched threads"""
|
|
global _stop_event
|
|
stop_event = _stop_event
|
|
_stop_event = Event() # new event for new threads
|
|
stop_event.set()
|
|
|
|
|
|
def handle_generation_success(seed_id):
|
|
logging.info(f"Generation finished for seed {seed_id}")
|
|
|
|
|
|
def handle_generation_failure(result: BaseException):
|
|
try: # hacky way to get the full RemoteTraceback
|
|
raise result
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
|
|
|
|
def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation):
|
|
try:
|
|
meta = json.loads(generation.meta)
|
|
options = restricted_loads(generation.options)
|
|
logging.info(f"Generating {generation.id} for {len(options)} players")
|
|
pool.apply_async(gen_game, (options,),
|
|
{"meta": meta,
|
|
"sid": generation.id,
|
|
"owner": generation.owner},
|
|
handle_generation_success, handle_generation_failure)
|
|
except Exception as e:
|
|
generation.state = STATE_ERROR
|
|
commit()
|
|
logging.exception(e)
|
|
else:
|
|
generation.state = STATE_STARTED
|
|
|
|
|
|
def init_generator(config: dict[str, Any]) -> None:
|
|
try:
|
|
import resource
|
|
except ModuleNotFoundError:
|
|
pass # unix only module
|
|
else:
|
|
# set soft limit for memory to from config (default 4GiB)
|
|
soft_limit = config["GENERATOR_MEMORY_LIMIT"]
|
|
old_limit, hard_limit = resource.getrlimit(resource.RLIMIT_AS)
|
|
if soft_limit != old_limit:
|
|
resource.setrlimit(resource.RLIMIT_AS, (soft_limit, hard_limit))
|
|
logging.debug(f"Changed AS mem limit {old_limit} -> {soft_limit}")
|
|
del resource, soft_limit, hard_limit
|
|
|
|
pony_config = config["PONY"]
|
|
db.bind(**pony_config)
|
|
db.generate_mapping()
|
|
|
|
|
|
def cleanup():
|
|
"""delete unowned user-content"""
|
|
with db_session:
|
|
# >>> bool(uuid.UUID(int=0))
|
|
# True
|
|
rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True)
|
|
seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True)
|
|
slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True)
|
|
# Command gets deleted by ponyorm Cascade Delete, as Room is Required
|
|
if rooms or seeds or slots:
|
|
logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.")
|
|
|
|
|
|
def autohost(config: dict):
|
|
def keep_running():
|
|
stop_event = _stop_event
|
|
try:
|
|
with Locker("autohost"):
|
|
cleanup()
|
|
hosters = []
|
|
for x in range(config["HOSTERS"]):
|
|
hoster = MultiworldInstance(config, x)
|
|
hosters.append(hoster)
|
|
hoster.start()
|
|
|
|
while not stop_event.wait(0.1):
|
|
with db_session:
|
|
rooms = select(
|
|
room for room in Room if
|
|
room.last_activity >= datetime.utcnow() - timedelta(days=3))
|
|
for room in rooms:
|
|
# we have to filter twice, as the per-room timeout can't currently be PonyORM transpiled.
|
|
if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout + 5):
|
|
hosters[room.id.int % len(hosters)].start_room(room.id)
|
|
|
|
except AlreadyRunningException:
|
|
logging.info("Autohost reports as already running, not starting another.")
|
|
|
|
Thread(target=keep_running, name="AP_Autohost").start()
|
|
|
|
|
|
def autogen(config: dict):
|
|
def keep_running():
|
|
stop_event = _stop_event
|
|
try:
|
|
with Locker("autogen"):
|
|
|
|
with multiprocessing.Pool(config["GENERATORS"], initializer=init_generator,
|
|
initargs=(config,), maxtasksperchild=10) as generator_pool:
|
|
with db_session:
|
|
to_start = select(generation for generation in Generation if generation.state == STATE_STARTED)
|
|
|
|
if to_start:
|
|
logging.info("Resuming generation")
|
|
for generation in to_start:
|
|
sid = Seed.get(id=generation.id)
|
|
if sid:
|
|
generation.delete()
|
|
else:
|
|
launch_generator(generator_pool, generation)
|
|
|
|
commit()
|
|
select(generation for generation in Generation if generation.state == STATE_ERROR).delete()
|
|
|
|
while not stop_event.wait(0.1):
|
|
with db_session:
|
|
# for update locks the database row(s) during transaction, preventing writes from elsewhere
|
|
to_start = select(
|
|
generation for generation in Generation
|
|
if generation.state == STATE_QUEUED).for_update()
|
|
for generation in to_start:
|
|
launch_generator(generator_pool, generation)
|
|
except AlreadyRunningException:
|
|
logging.info("Autogen reports as already running, not starting another.")
|
|
|
|
Thread(target=keep_running, name="AP_Autogen").start()
|
|
|
|
|
|
multiworlds: typing.Dict[type(Room.id), MultiworldInstance] = {}
|
|
|
|
|
|
class MultiworldInstance():
|
|
def __init__(self, config: dict, id: int):
|
|
self.room_ids = set()
|
|
self.process: typing.Optional[multiprocessing.Process] = None
|
|
self.ponyconfig = config["PONY"]
|
|
self.cert = config["SELFLAUNCHCERT"]
|
|
self.key = config["SELFLAUNCHKEY"]
|
|
self.host = config["HOST_ADDRESS"]
|
|
self.rooms_to_start = multiprocessing.Queue()
|
|
self.rooms_shutting_down = multiprocessing.Queue()
|
|
self.name = f"MultiHoster{id}"
|
|
|
|
def start(self):
|
|
if self.process and self.process.is_alive():
|
|
return False
|
|
|
|
process = multiprocessing.Process(group=None, target=run_server_process,
|
|
args=(self.name, self.ponyconfig, get_static_server_data(),
|
|
self.cert, self.key, self.host,
|
|
self.rooms_to_start, self.rooms_shutting_down),
|
|
name=self.name)
|
|
process.start()
|
|
self.process = process
|
|
|
|
def start_room(self, room_id):
|
|
while not self.rooms_shutting_down.empty():
|
|
self.room_ids.remove(self.rooms_shutting_down.get(block=True, timeout=None))
|
|
if room_id in self.room_ids:
|
|
pass # should already be hosted currently.
|
|
else:
|
|
self.room_ids.add(room_id)
|
|
self.rooms_to_start.put(room_id)
|
|
|
|
def stop(self):
|
|
if self.process:
|
|
self.process.terminate()
|
|
self.process = None
|
|
|
|
def done(self):
|
|
return self.process and not self.process.is_alive()
|
|
|
|
def collect(self):
|
|
self.process.join() # wait for process to finish
|
|
self.process = None
|
|
|
|
|
|
from .models import Room, Generation, STATE_QUEUED, STATE_STARTED, STATE_ERROR, db, Seed, Slot
|
|
from .customserver import run_server_process, get_static_server_data
|
|
from .generate import gen_game
|