from __future__ import annotations import logging import json import multiprocessing from datetime import timedelta, datetime import concurrent.futures import sys import typing import time from pony.orm import db_session, select, commit from Utils import restricted_loads class CommonLocker(): """Uses a file lock to signal that something is already running""" def __init__(self, lockname: str): self.lockname = lockname self.lockfile = f"./{self.lockname}.lck" class AlreadyRunningException(Exception): pass if sys.platform == 'win32': import os class Locker(CommonLocker): def __enter__(self): try: if os.path.exists(self.lockfile): os.unlink(self.lockfile) self.fp = os.open( self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR) except OSError as e: raise AlreadyRunningException() from e def __exit__(self, _type, value, tb): fp = getattr(self, "fp", None) if fp: os.close(self.fp) os.unlink(self.lockfile) else: # unix import fcntl class Locker(CommonLocker): def __enter__(self): try: self.fp = open(self.lockfile, "wb") fcntl.flock(self.fp.fileno(), fcntl.LOCK_EX) except OSError as e: raise AlreadyRunningException() from e def __exit__(self, _type, value, tb): fcntl.flock(self.fp.fileno(), fcntl.LOCK_UN) self.fp.close() def launch_room(room: Room, config: dict): # requires db_session! if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout): multiworld = multiworlds.get(room.id, None) if not multiworld: multiworld = MultiworldInstance(room, config) multiworld.start() def handle_generation_success(seed_id): logging.info(f"Generation finished for seed {seed_id}") def handle_generation_failure(result: BaseException): try: # hacky way to get the full RemoteTraceback raise result except Exception as e: logging.exception(e) def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation): try: meta = json.loads(generation.meta) options = restricted_loads(generation.options) logging.info(f"Generating {generation.id} for {len(options)} players") pool.apply_async(gen_game, (options,), {"race": meta["race"], "sid": generation.id, "owner": generation.owner}, handle_generation_success, handle_generation_failure) except: generation.state = STATE_ERROR commit() raise else: generation.state = STATE_STARTED def init_db(pony_config: dict): db.bind(**pony_config) db.generate_mapping() def autohost(config: dict): def keep_running(): try: with Locker("autohost"): with multiprocessing.Pool(config["GENERATORS"], initializer=init_db, initargs=(config["PONY"],)) as generator_pool: with db_session: to_start = select(generation for generation in Generation if generation.state == STATE_STARTED) if to_start: logging.info("Resuming generation") for generation in to_start: sid = Seed.get(id=generation.id) if sid: generation.delete() else: launch_generator(generator_pool, generation) commit() select(generation for generation in Generation if generation.state == STATE_ERROR).delete() while 1: time.sleep(0.50) with db_session: rooms = select( room for room in Room if room.last_activity >= datetime.utcnow() - timedelta(days=3)) for room in rooms: launch_room(room, config) to_start = select( generation for generation in Generation if generation.state == STATE_QUEUED) for generation in to_start: launch_generator(generator_pool, generation) except AlreadyRunningException: pass import threading threading.Thread(target=keep_running).start() multiworlds = {} guardians = concurrent.futures.ThreadPoolExecutor(2, thread_name_prefix="Guardian") class MultiworldInstance(): def __init__(self, room: Room, config: dict): self.room_id = room.id self.process: typing.Optional[multiprocessing.Process] = None multiworlds[self.room_id] = self self.ponyconfig = config["PONY"] def start(self): if self.process and self.process.is_alive(): return False logging.info(f"Spinning up {self.room_id}") self.process = multiprocessing.Process(group=None, target=run_server_process, args=(self.room_id, self.ponyconfig), name="MultiHost") self.process.start() self.guardian = guardians.submit(self._collect) def stop(self): if self.process: self.process.terminate() self.process = None def _collect(self): self.process.join() # wait for process to finish self.process = None self.guardian = None from .models import Room, Generation, STATE_QUEUED, STATE_STARTED, STATE_ERROR, db, Seed from .customserver import run_server_process from .generate import gen_game