122 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			122 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
| import importlib
 | |
| import os
 | |
| import sys
 | |
| import warnings
 | |
| import zipimport
 | |
| import time
 | |
| import dataclasses
 | |
| from typing import Dict, List, TypedDict, Optional
 | |
| 
 | |
| from Utils import local_path, user_path
 | |
| 
 | |
# Directory that contains this module.
local_folder = os.path.dirname(__file__)

# User-specific "worlds" directory; None when the user path and the local path are the same.
if user_path() != local_path():
    user_folder = user_path("worlds")
else:
    user_folder = None
 | |
| 
 | |
# Public names of this module.
# This has to be a sequence rather than a set: a star-import reads __all__ by
# position, which raises TypeError on a set.
__all__ = [
    "network_data_package",
    "AutoWorldRegister",
    "world_sources",
    "local_folder",
    "user_folder",
    "GamesPackage",
    "DataPackage",
    "failed_world_loads",
]


# Base names (file extension stripped) of world sources whose load() raised an exception.
failed_world_loads: List[str] = []
 | |
| 
 | |
| 
 | |
class GamesPackage(TypedDict, total=False):
    """Typed dictionary for a single game's data package entry.

    Declared with ``total=False``, so every key listed here may be omitted.
    """

    item_name_groups: Dict[str, List[str]]
    item_name_to_id: Dict[str, int]
    location_name_groups: Dict[str, List[str]]
    location_name_to_id: Dict[str, int]
    checksum: str
    version: int  # TODO: Remove support after per game data packages API change.
 | |
| 
 | |
| 
 | |
class DataPackage(TypedDict):
    """Typed dictionary with one required key, ``games``, mapping a string key to a GamesPackage."""

    games: Dict[str, GamesPackage]
 | |
| 
 | |
| 
 | |
@dataclasses.dataclass(order=True)
class WorldSource:
    """A folder or zip-importable ``.apworld`` file that can be imported as a world.

    Instances compare and sort by ``path``, ``is_zip`` and ``relative``.
    ``time_taken`` is bookkeeping only and does not take part in comparisons.
    """

    path: str  # typically relative path from this module
    is_zip: bool = False
    relative: bool = True  # relative to regular world import folder
    # Duration in seconds of the last successful load(); None until then.
    # Excluded from comparisons: otherwise ordering a loaded source against an
    # unloaded one with the same identity would compare float with None and raise.
    time_taken: Optional[float] = dataclasses.field(default=None, compare=False)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.path}, is_zip={self.is_zip}, relative={self.relative})"

    @property
    def resolved_path(self) -> str:
        """Return ``path`` joined onto ``local_folder`` when relative, otherwise ``path`` unchanged."""
        if self.relative:
            return os.path.join(local_folder, self.path)
        return self.path

    def load(self) -> bool:
        """Import this world and record how long the import took in ``time_taken``.

        Returns True on success. If anything raises, the failure is logged, the
        source's base name (extension stripped) is appended to
        ``failed_world_loads`` and False is returned instead of propagating.
        """
        try:
            start = time.perf_counter()
            if self.is_zip:
                # ``import importlib`` alone does not guarantee that the ``util``
                # submodule is bound, so fetch the function explicitly.
                from importlib.util import module_from_spec

                importer = zipimport.zipimporter(self.resolved_path)
                module_name = os.path.basename(self.path).rsplit(".", 1)[0]
                if hasattr(importer, "find_spec"):  # new in Python 3.10
                    spec = importer.find_spec(module_name)
                    # Explicit raise instead of assert so the check survives ``python -O``.
                    if not spec:
                        raise ImportError(f"{self.path} is not a loadable module")
                    mod = module_from_spec(spec)
                else:  # TODO: remove with 3.8 support
                    mod = importer.load_module(module_name)

                # Register the module under the ``worlds.`` prefix before executing it.
                mod.__package__ = f"worlds.{mod.__package__}"
                mod.__name__ = f"worlds.{mod.__name__}"
                sys.modules[mod.__name__] = mod
                with warnings.catch_warnings():
                    # The rename above makes __package__ disagree with __spec__.parent.
                    warnings.filterwarnings("ignore", message="__package__ != __spec__.parent")
                    # Found no equivalent for < 3.10
                    if hasattr(importer, "exec_module"):
                        importer.exec_module(mod)
            else:
                importlib.import_module(f".{self.path}", "worlds")
            self.time_taken = time.perf_counter()-start
            return True

        except Exception:
            # A single world failing can still mean enough is working for the user, log and carry on
            import traceback
            import io
            file_like = io.StringIO()
            print(f"Could not load world {self}:", file=file_like)
            traceback.print_exc(file=file_like)
            file_like.seek(0)
            import logging
            logging.exception(file_like.read())
            failed_world_loads.append(os.path.basename(self.path).rsplit(".", 1)[0])
            return False
 | |
| 
 | |
| 
 | |
# Collect candidate world containers: plain folders and zip-importable .apworld files.
world_sources: List[WorldSource] = []
for folder in filter(None, (user_folder, local_folder)):
    relative = folder == local_folder
    for entry in os.scandir(folder):
        # Skip "_"-prefixed names (keeps __pycache__ out and allows _* for non-world folders)
        # and "."-prefixed names (disabled files/folders).
        if entry.name.startswith(("_", ".")):
            continue
        # Entries of the local folder are stored by bare name, all others by joined path.
        file_name = entry.name if relative else os.path.join(folder, entry.name)
        if entry.is_dir():
            world_sources.append(WorldSource(file_name, relative=relative))
        elif entry.is_file() and entry.name.endswith(".apworld"):
            world_sources.append(WorldSource(file_name, is_zip=True, relative=relative))

# Import every discovered source, in sorted order, to trigger AutoWorldRegister.
world_sources.sort()
for world_source in world_sources:
    world_source.load()
 | |
| 
 | |
# Assemble the data package: one entry per registered world type, keyed by its name.
from .AutoWorld import AutoWorldRegister

network_data_package: DataPackage = {
    "games": {
        game_name: world_type.get_data_package_data()
        for game_name, world_type in AutoWorldRegister.world_types.items()
    },
}
 |