"""Background threads that launch seed generation and room hosting processes."""
from __future__ import annotations

import json
import logging
import multiprocessing
import typing
from datetime import timedelta, datetime
from threading import Event, Thread
from typing import Any
from uuid import UUID

from pony.orm import db_session, select, commit

from Utils import restricted_loads
from .locker import Locker, AlreadyRunningException

# Event captured by each autohost/autogen thread when it starts; stop() swaps
# it for a fresh one so threads launched afterwards are not affected.
_stop_event = Event()


def stop() -> None:
    """Stops previously launched threads"""
    global _stop_event
    stop_event = _stop_event
    _stop_event = Event()  # new event for new threads
    stop_event.set()


def handle_generation_success(seed_id) -> None:
    """Success callback for the generator pool: log the value returned by the task."""
    logging.info(f"Generation finished for seed {seed_id}")


def handle_generation_failure(result: BaseException) -> None:
    """Error callback for the generator pool: log the exception with its traceback."""
    try:  # hacky way to get the full RemoteTraceback
        raise result
    except Exception as e:
        logging.exception(e)


def launch_generator(pool: multiprocessing.pool.Pool, generation: Generation) -> None:
    """Submit one queued Generation to the worker pool.

    The generation's ``meta`` is decoded as JSON and its ``options`` via
    ``restricted_loads``; ``gen_game`` is then scheduled asynchronously.
    If anything raises while preparing or submitting, the generation is marked
    ``STATE_ERROR`` and committed immediately; otherwise it is marked
    ``STATE_STARTED`` (committed by the caller's db_session).
    """
    try:
        meta = json.loads(generation.meta)
        options = restricted_loads(generation.options)
        logging.info(f"Generating {generation.id} for {len(options)} players")
        pool.apply_async(gen_game, (options,),
                         {"meta": meta,
                          "sid": generation.id,
                          "owner": generation.owner},
                         handle_generation_success, handle_generation_failure)
    except Exception as e:
        generation.state = STATE_ERROR
        commit()
        logging.exception(e)
    else:
        generation.state = STATE_STARTED


def init_generator(config: dict[str, Any]) -> None:
    """Initializer run in each generator pool worker.

    Applies the configured address-space soft limit where the ``resource``
    module exists, then binds the database using ``config["PONY"]``.
    """
    try:
        import resource
    except ModuleNotFoundError:
        pass  # unix only module
    else:
        # Set the soft memory limit from config (original note: default 4GiB).
        soft_limit = config["GENERATOR_MEMORY_LIMIT"]
        old_limit, hard_limit = resource.getrlimit(resource.RLIMIT_AS)
        if soft_limit != old_limit:
            resource.setrlimit(resource.RLIMIT_AS, (soft_limit, hard_limit))
            logging.debug(f"Changed AS mem limit {old_limit} -> {soft_limit}")
        # Nothing below uses these names.
        del resource, soft_limit, hard_limit

    pony_config = config["PONY"]
    db.bind(**pony_config)
    db.generate_mapping()


def cleanup() -> None:
    """delete unowned user-content"""
    with db_session:
        # Ownership is compared against UUID(int=0) explicitly because a zero
        # UUID is still truthy:
        # >>> bool(uuid.UUID(int=0))
        # True
        rooms = Room.select(lambda room: room.owner == UUID(int=0)).delete(bulk=True)
        seeds = Seed.select(lambda seed: seed.owner == UUID(int=0) and not seed.rooms).delete(bulk=True)
        slots = Slot.select(lambda slot: not slot.seed).delete(bulk=True)
        # Command gets deleted by ponyorm Cascade Delete, as Room is Required
    if rooms or seeds or slots:
        logging.info(f"{rooms} Rooms, {seeds} Seeds and {slots} Slots have been deleted.")


def autohost(config: dict) -> None:
    """Start the "AP_Autohost" thread that keeps recently active rooms hosted.

    The thread takes the "autohost" lock, runs cleanup(), starts
    ``config["HOSTERS"]`` MultiworldInstance processes and then polls the
    database every 0.1 seconds until the stop event captured at thread start
    is set. If the lock is already held, it logs that and exits.
    """
    def keep_running():
        stop_event = _stop_event
        try:
            with Locker("autohost"):
                cleanup()
                hosters = []
                for x in range(config["HOSTERS"]):
                    hoster = MultiworldInstance(config, x)
                    hosters.append(hoster)
                    hoster.start()

                while not stop_event.wait(0.1):
                    with db_session:
                        rooms = select(
                            room for room in Room if
                            room.last_activity >= datetime.utcnow() - timedelta(days=3))
                        for room in rooms:
                            # we have to filter twice, as the per-room timeout can't currently be PonyORM transpiled.
                            if room.last_activity >= datetime.utcnow() - timedelta(seconds=room.timeout + 5):
                                # A room always maps to the same hoster, chosen by its id.
                                hosters[room.id.int % len(hosters)].start_room(room.id)

        except AlreadyRunningException:
            logging.info("Autohost reports as already running, not starting another.")

    Thread(target=keep_running, name="AP_Autohost").start()


def autogen(config: dict) -> None:
    """Start the "AP_Autogen" thread that runs queued generations.

    The thread takes the "autogen" lock and creates a pool of
    ``config["GENERATORS"]`` workers. On startup it relaunches generations
    still marked ``STATE_STARTED`` (deleting those whose Seed already exists)
    and deletes generations in ``STATE_ERROR``. It then polls every 0.1
    seconds for ``STATE_QUEUED`` generations until the stop event captured at
    thread start is set. If the lock is already held, it logs that and exits.
    """
    def keep_running():
        stop_event = _stop_event
        try:
            with Locker("autogen"):

                with multiprocessing.Pool(config["GENERATORS"], initializer=init_generator,
                                          initargs=(config,), maxtasksperchild=10) as generator_pool:
                    with db_session:
                        to_start = select(generation for generation in Generation if generation.state == STATE_STARTED)

                        if to_start:
                            logging.info("Resuming generation")
                            for generation in to_start:
                                sid = Seed.get(id=generation.id)
                                if sid:
                                    # A Seed with this id already exists, so the entry is stale.
                                    generation.delete()
                                else:
                                    launch_generator(generator_pool, generation)

                            commit()
                        select(generation for generation in Generation if generation.state == STATE_ERROR).delete()

                    while not stop_event.wait(0.1):
                        with db_session:
                            # for update locks the database row(s) during transaction, preventing writes from elsewhere
                            to_start = select(
                                generation for generation in Generation
                                if generation.state == STATE_QUEUED).for_update()
                            for generation in to_start:
                                launch_generator(generator_pool, generation)
        except AlreadyRunningException:
            logging.info("Autogen reports as already running, not starting another.")

    Thread(target=keep_running, name="AP_Autogen").start()


multiworlds: typing.Dict[type(Room.id), MultiworldInstance] = {}


class MultiworldInstance:
    """Handle for one hosting subprocess and the queues used to talk to it."""

    def __init__(self, config: dict, id: int):
        # Room ids this instance has already been asked to host.
        self.room_ids = set()
        self.process: typing.Optional[multiprocessing.Process] = None
        self.ponyconfig = config["PONY"]
        self.cert = config["SELFLAUNCHCERT"]
        self.key = config["SELFLAUNCHKEY"]
        self.host = config["HOST_ADDRESS"]
        # Room ids handed to the subprocess by start_room().
        self.rooms_to_start = multiprocessing.Queue()
        # Room ids drained by start_room(); presumably filled by the
        # subprocess when a room shuts down - confirm in customserver.
        self.rooms_shutting_down = multiprocessing.Queue()
        self.name = f"MultiHoster{id}"

    def start(self) -> typing.Optional[bool]:
        """Launch the subprocess unless one is already alive.

        Returns False when a live process already exists; otherwise starts a
        new process and returns None.
        """
        if self.process and self.process.is_alive():
            return False

        process = multiprocessing.Process(group=None, target=run_server_process,
                                          args=(self.name, self.ponyconfig, get_static_server_data(),
                                                self.cert, self.key, self.host,
                                                self.rooms_to_start, self.rooms_shutting_down),
                                          name=self.name)
        process.start()
        self.process = process

    def start_room(self, room_id) -> None:
        """Queue room_id for hosting unless this instance already tracks it."""
        while not self.rooms_shutting_down.empty():
            # discard() rather than remove(): an id that is not tracked must
            # not raise KeyError and take down the polling thread.
            self.room_ids.discard(self.rooms_shutting_down.get(block=True, timeout=None))
        if room_id in self.room_ids:
            return  # should already be hosted currently.
        self.room_ids.add(room_id)
        self.rooms_to_start.put(room_id)

    def stop(self) -> None:
        """Terminate the subprocess, if any, and forget it."""
        if self.process:
            self.process.terminate()
            self.process = None

    def done(self) -> bool:
        """Return True when a process was started and is no longer alive."""
        return self.process is not None and not self.process.is_alive()

    def collect(self) -> None:
        """Wait for the subprocess to exit and forget it; no-op without a process."""
        if self.process is None:
            return
        self.process.join()  # wait for process to finish
        self.process = None


# Kept at the bottom of the module as in the original layout; presumably this
# avoids a circular import between these modules - confirm before moving.
from .models import Room, Generation, STATE_QUEUED, STATE_STARTED, STATE_ERROR, db, Seed, Slot
from .customserver import run_server_process, get_static_server_data
from .generate import gen_game