Factorio integration

This commit is contained in:
Fabian Dill 2021-04-01 11:40:58 +02:00
parent 1f5bcb6273
commit dc73fa0f33
36 changed files with 1154 additions and 551 deletions

1
.gitignore vendored
View File

@ -35,3 +35,4 @@ mystery_result_*.yaml
success.txt
output/
Output Logs/
/factorio/

View File

@ -158,6 +158,10 @@ class MultiWorld():
def hk_player_ids(self):
yield from (player for player in range(1, self.players + 1) if self.game[player] == "Hollow Knight")
@property
def factorio_player_ids(self):
yield from (player for player in range(1, self.players + 1) if self.game[player] == "Factorio")
def get_name_string_for_object(self, obj) -> str:
return obj.name if self.players == 1 else f'{obj.name} ({self.get_player_names(obj.player)})'
@ -1302,9 +1306,6 @@ class Spoiler(object):
self.bosses[str(player)]["Ganons Tower"] = "Agahnim 2"
self.bosses[str(player)]["Ganon"] = "Ganon"
if self.world.players == 1:
self.bosses = self.bosses["1"]
from Utils import __version__ as APVersion
self.metadata = {'version': APVersion,
'logic': self.world.logic,

429
CommonClient.py Normal file
View File

@ -0,0 +1,429 @@
from __future__ import annotations
import logging
import typing
import asyncio
import urllib.parse
import prompt_toolkit
import websockets
from prompt_toolkit.patch_stdout import patch_stdout
import Utils
from MultiServer import CommandProcessor
from NetUtils import Endpoint, decode, NetworkItem, encode, JSONtoTextParser, color, ClientStatus
from Utils import Version
# logging note:
# logging.* gets send to only the text console, logger.* gets send to the WebUI as well, if it's initialized.
from worlds import network_data_package
from worlds.alttp import Items, Regions
logger = logging.getLogger("Client")
class ClientCommandProcessor(CommandProcessor):
def __init__(self, ctx: CommonContext):
self.ctx = ctx
def output(self, text: str):
logger.info(text)
def _cmd_exit(self) -> bool:
"""Close connections and client"""
self.ctx.exit_event.set()
return True
def _cmd_connect(self, address: str = "") -> bool:
"""Connect to a MultiWorld Server"""
self.ctx.server_address = None
asyncio.create_task(self.ctx.connect(address if address else None))
return True
def _cmd_disconnect(self) -> bool:
"""Disconnect from a MultiWorld Server"""
self.ctx.server_address = None
asyncio.create_task(self.ctx.disconnect())
return True
def _cmd_received(self) -> bool:
"""List all received items"""
logger.info('Received items:')
for index, item in enumerate(self.ctx.items_received, 1):
self.ctx.ui_node.notify_item_received(self.ctx.player_names[item.player], self.ctx.item_name_getter(item.item),
self.ctx.location_name_getter(item.location), index,
len(self.ctx.items_received),
self.ctx.item_name_getter(item.item) in Items.progression_items)
logging.info('%s from %s (%s) (%d/%d in list)' % (
color(self.ctx.item_name_getter(item.item), 'red', 'bold'),
color(self.ctx.player_names[item.player], 'yellow'),
self.ctx.location_name_getter(item.location), index, len(self.ctx.items_received)))
return True
def _cmd_missing(self) -> bool:
"""List all missing location checks, from your local game state"""
count = 0
checked_count = 0
for location, location_id in Regions.lookup_name_to_id.items():
if location_id < 0:
continue
if location_id not in self.ctx.locations_checked:
if location_id in self.ctx.missing_locations:
self.output('Missing: ' + location)
count += 1
elif location_id in self.ctx.checked_locations:
self.output('Checked: ' + location)
count += 1
checked_count += 1
if count:
self.output(
f"Found {count} missing location checks{f'. {checked_count} location checks previously visited.' if checked_count else ''}")
else:
self.output("No missing location checks found.")
return True
def _cmd_ready(self):
self.ctx.ready = not self.ctx.ready
if self.ctx.ready:
state = ClientStatus.CLIENT_READY
self.output("Readied up.")
else:
state = ClientStatus.CLIENT_CONNECTED
self.output("Unreadied.")
asyncio.create_task(self.ctx.send_msgs([{"cmd": "StatusUpdate", "status": state}]))
def default(self, raw: str):
asyncio.create_task(self.ctx.send_msgs([{"cmd": "Say", "text": raw}]))
class CommonContext():
starting_reconnect_delay = 5
current_reconnect_delay = starting_reconnect_delay
command_processor = ClientCommandProcessor
def __init__(self, server_address, password, found_items: bool):
# server state
self.server_address = server_address
self.password = password
self.server_task = None
self.server: typing.Optional[Endpoint] = None
self.server_version = Version(0, 0, 0)
# own state
self.finished_game = False
self.ready = False
self.found_items = found_items
self.team = None
self.slot = None
self.auth = None
self.ui_node = None
self.locations_checked: typing.Set[int] = set()
self.locations_scouted: typing.Set[int] = set()
self.items_received = []
self.missing_locations: typing.List[int] = []
self.checked_locations: typing.List[int] = []
self.locations_info = {}
self.input_queue = asyncio.Queue()
self.input_requests = 0
# game state
self.player_names: typing.Dict[int: str] = {}
self.exit_event = asyncio.Event()
self.watcher_event = asyncio.Event()
self.slow_mode = False
self.jsontotextparser = JSONtoTextParser(self)
self.set_getters(network_data_package)
async def connection_closed(self):
self.auth = None
self.items_received = []
self.locations_info = {}
self.server_version = Version(0, 0, 0)
if self.server and self.server.socket is not None:
await self.server.socket.close()
self.server = None
self.server_task = None
def set_getters(self, data_package: dict, network=False):
if not network: # local data; check if newer data was already downloaded
local_package = Utils.persistent_load().get("datapackage", {}).get("latest", {})
if local_package and local_package["version"] > network_data_package["version"]:
data_package: dict = local_package
elif network: # check if data from server is newer
for key, value in data_package.items():
if type(value) == dict: # convert to int keys
data_package[key] = \
{int(subkey) if subkey.isdigit() else subkey: subvalue for subkey, subvalue in value.items()}
if data_package["version"] > network_data_package["version"]:
Utils.persistent_store("datapackage", "latest", network_data_package)
item_lookup: dict = data_package["lookup_any_item_id_to_name"]
locations_lookup: dict = data_package["lookup_any_location_id_to_name"]
def get_item_name_from_id(code: int):
return item_lookup.get(code, f'Unknown item (ID:{code})')
self.item_name_getter = get_item_name_from_id
def get_location_name_from_address(address: int):
return locations_lookup.get(address, f'Unknown location (ID:{address})')
self.location_name_getter = get_location_name_from_address
@property
def endpoints(self):
if self.server:
return [self.server]
else:
return []
async def disconnect(self):
if self.server and not self.server.socket.closed:
await self.server.socket.close()
self.ui_node.send_connection_status(self)
if self.server_task is not None:
await self.server_task
async def send_msgs(self, msgs):
if not self.server or not self.server.socket.open or self.server.socket.closed:
return
await self.server.socket.send(encode(msgs))
def consume_players_package(self, package: typing.List[tuple]):
self.player_names = {slot: name for team, slot, name, orig_name in package if self.team == team}
def event_invalid_slot(self):
raise Exception('Invalid Slot; please verify that you have connected to the correct world.')
async def server_auth(self, password_requested):
if password_requested and not self.password:
logger.info('Enter the password required to join this game:')
self.password = await self.console_input()
return self.password
async def console_input(self):
self.input_requests += 1
return await self.input_queue.get()
async def connect(self, address= None):
await self.disconnect()
self.server_task = asyncio.create_task(server_loop(self, address))
async def server_loop(ctx: CommonContext, address=None):
ui_node = getattr(ctx, "ui_node", None)
if ui_node:
ui_node.send_connection_status(ctx)
cached_address = None
if ctx.server and ctx.server.socket:
logger.error('Already connected')
return
if address is None: # set through CLI or APBP
address = ctx.server_address
# Wait for the user to provide a multiworld server address
if not address:
logger.info('Please connect to an Archipelago server.')
if ui_node:
ui_node.poll_for_server_ip()
return
address = f"ws://{address}" if "://" not in address else address
port = urllib.parse.urlparse(address).port or 38281
logger.info(f'Connecting to Archipelago server at {address}')
try:
socket = await websockets.connect(address, port=port, ping_timeout=None, ping_interval=None)
ctx.server = Endpoint(socket)
logger.info('Connected')
ctx.server_address = address
if ui_node:
ui_node.send_connection_status(ctx)
ctx.current_reconnect_delay = ctx.starting_reconnect_delay
async for data in ctx.server.socket:
for msg in decode(data):
await process_server_cmd(ctx, msg)
logger.warning('Disconnected from multiworld server, type /connect to reconnect')
except ConnectionRefusedError:
if cached_address:
logger.error('Unable to connect to multiworld server at cached address. '
'Please use the connect button above.')
else:
logger.error('Connection refused by the multiworld server')
except (OSError, websockets.InvalidURI):
logger.error('Failed to connect to the multiworld server')
except Exception as e:
logger.error('Lost connection to the multiworld server, type /connect to reconnect')
if not isinstance(e, websockets.WebSocketException):
logger.exception(e)
finally:
await ctx.connection_closed()
if ctx.server_address:
logger.info(f"... reconnecting in {ctx.current_reconnect_delay}s")
if ui_node:
ui_node.send_connection_status(ctx)
asyncio.create_task(server_autoreconnect(ctx))
ctx.current_reconnect_delay *= 2
async def server_autoreconnect(ctx: CommonContext):
await asyncio.sleep(ctx.current_reconnect_delay)
if ctx.server_address and ctx.server_task is None:
ctx.server_task = asyncio.create_task(server_loop(ctx))
async def process_server_cmd(ctx: CommonContext, args: dict):
try:
cmd = args["cmd"]
except:
logger.exception(f"Could not get command from {args}")
raise
if cmd == 'RoomInfo':
logger.info('--------------------------------')
logger.info('Room Information:')
logger.info('--------------------------------')
version = args["version"]
ctx.server_version = tuple(version)
version = ".".join(str(item) for item in version)
logger.info(f'Server protocol version: {version}')
logger.info("Server protocol tags: " + ", ".join(args["tags"]))
if args['password']:
logger.info('Password required')
logger.info(f"Forfeit setting: {args['forfeit_mode']}")
logger.info(f"Remaining setting: {args['remaining_mode']}")
logger.info(f"A !hint costs {args['hint_cost']} points and you get {args['location_check_points']}"
f" for each location checked.")
ctx.hint_cost = int(args['hint_cost'])
ctx.check_points = int(args['location_check_points'])
ctx.forfeit_mode = args['forfeit_mode']
ctx.remaining_mode = args['remaining_mode']
if ctx.ui_node:
ctx.ui_node.send_game_info(ctx)
if len(args['players']) < 1:
logger.info('No player connected')
else:
args['players'].sort()
current_team = -1
logger.info('Players:')
for team, slot, name in args['players']:
if team != current_team:
logger.info(f' Team #{team + 1}')
current_team = team
logger.info(' %s (Player %d)' % (name, slot))
if args["datapackage_version"] > network_data_package["version"]:
await ctx.send_msgs([{"cmd": "GetDataPackage"}])
await ctx.server_auth(args['password'])
elif cmd == 'DataPackage':
logger.info("Got new ID/Name Datapackage")
ctx.set_getters(args['data'], network=True)
elif cmd == 'ConnectionRefused':
errors = args["errors"]
if 'InvalidSlot' in errors:
ctx.event_invalid_slot()
elif 'SlotAlreadyTaken' in errors:
raise Exception('Player slot already in use for that team')
elif 'IncompatibleVersion' in errors:
raise Exception('Server reported your client version as incompatible')
# last to check, recoverable problem
elif 'InvalidPassword' in errors:
logger.error('Invalid password')
ctx.password = None
await ctx.server_auth(True)
else:
raise Exception("Unknown connection errors: " + str(errors))
raise Exception('Connection refused by the multiworld host, no reason provided')
elif cmd == 'Connected':
ctx.team = args["team"]
ctx.slot = args["slot"]
ctx.consume_players_package(args["players"])
msgs = []
if ctx.locations_checked:
msgs.append({"cmd": "LocationChecks",
"locations": list(ctx.locations_checked)})
if ctx.locations_scouted:
msgs.append({"cmd": "LocationScouts",
"locations": list(ctx.locations_scouted)})
if msgs:
await ctx.send_msgs(msgs)
if ctx.finished_game:
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": CLientStatus.CLIENT_GOAL}])
# Get the server side view of missing as of time of connecting.
# This list is used to only send to the server what is reported as ACTUALLY Missing.
# This also serves to allow an easy visual of what locations were already checked previously
# when /missing is used for the client side view of what is missing.
ctx.missing_locations = args["missing_locations"]
ctx.checked_locations = args["checked_locations"]
elif cmd == 'ReceivedItems':
start_index = args["index"]
if start_index == 0:
ctx.items_received = []
elif start_index != len(ctx.items_received):
sync_msg = [{'cmd': 'Sync'}]
if ctx.locations_checked:
sync_msg.append({"cmd": "LocationChecks",
"locations": list(ctx.locations_checked)})
await ctx.send_msgs(sync_msg)
if start_index == len(ctx.items_received):
for item in args['items']:
ctx.items_received.append(NetworkItem(*item))
ctx.watcher_event.set()
elif cmd == 'LocationInfo':
for item, location, player in args['locations']:
if location not in ctx.locations_info:
ctx.locations_info[location] = (item, player)
ctx.watcher_event.set()
elif cmd == "RoomUpdate":
if "players" in args:
ctx.consume_players_package(args["players"])
if "hint_points" in args:
ctx.hint_points = args['hint_points']
elif cmd == 'Print':
logger.info(args["text"])
elif cmd == 'PrintJSON':
if not ctx.found_items and args.get("type", None) == "ItemSend" and args["receiving"] == args["sending"]:
pass # don't want info on other player's local pickups.
logger.info(ctx.jsontotextparser(args["data"]))
elif cmd == 'InvalidArguments':
logger.warning(f"Invalid Arguments: {args['text']}")
else:
logger.debug(f"unknown command {cmd}")
async def console_loop(ctx: CommonContext):
import sys
commandprocessor = ctx.command_processor(ctx)
while not ctx.exit_event.is_set():
try:
input_text = await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline
)
input_text = input_text.strip()
if ctx.input_requests > 0:
ctx.input_requests -= 1
ctx.input_queue.put_nowait(input_text)
continue
if input_text:
commandprocessor(input_text)
except Exception as e:
logging.exception(e)

187
FactorioClient.py Normal file
View File

@ -0,0 +1,187 @@
import os
import logging
import json
import string
from concurrent.futures import ThreadPoolExecutor
import colorama
import asyncio
from queue import Queue, Empty
from CommonClient import CommonContext, server_loop, console_loop, ClientCommandProcessor
from MultiServer import mark_raw
import Utils
import random
from worlds.factorio.Technologies import lookup_id_to_name
rcon_port = 24242
rcon_password = ''.join(random.choice(string.ascii_letters) for x in range(32))
save_name = "Archipelago"
server_args = (save_name, "--rcon-port", rcon_port, "--rcon-password", rcon_password)
logging.basicConfig(format='[%(name)s]: %(message)s', level=logging.INFO)
options = Utils.get_options()
executable = options["factorio_options"]["executable"]
bin_dir = os.path.dirname(executable)
if not os.path.isdir(bin_dir):
raise FileNotFoundError(bin_dir)
if not os.path.exists(executable):
if os.path.exists(executable + ".exe"):
executable = executable + ".exe"
else:
raise FileNotFoundError(executable)
script_folder = options["factorio_options"]["script-output"]
threadpool = ThreadPoolExecutor(10)
class FactorioCommandProcessor(ClientCommandProcessor):
@mark_raw
def _cmd_factorio(self, text: str) -> bool:
"""Send the following command to the bound Factorio Server."""
if self.ctx.rcon_client:
result = self.ctx.rcon_client.send_command(text)
if result:
self.output(result)
return True
return False
class FactorioContext(CommonContext):
command_processor = FactorioCommandProcessor
def __init__(self, *args, **kwargs):
super(FactorioContext, self).__init__(*args, **kwargs)
self.send_index = 0
self.rcon_client = None
async def server_auth(self, password_requested):
if password_requested and not self.password:
await super(FactorioContext, self).server_auth(password_requested)
if self.auth is None:
logging.info('Enter the name of your slot to join this game:')
self.auth = await self.console_input()
await self.send_msgs([{"cmd": 'Connect',
'password': self.password, 'name': self.auth, 'version': Utils._version_tuple,
'tags': ['AP'],
'uuid': Utils.get_unique_identifier(), 'game': "Factorio"
}])
async def game_watcher(ctx: FactorioContext):
research_logger = logging.getLogger("FactorioWatcher")
researches_done_file = os.path.join(script_folder, "research_done.json")
if os.path.exists(researches_done_file):
os.remove(researches_done_file)
from worlds.factorio.Technologies import lookup_id_to_name, tech_table
try:
while 1:
if os.path.exists(researches_done_file):
research_logger.info("Found Factorio Bridge file.")
while 1:
with open(researches_done_file) as f:
data = json.load(f)
research_data = {int(tech_name.split("-")[1]) for tech_name in data if tech_name.startswith("ap-")}
if ctx.locations_checked != research_data:
research_logger.info(f"New researches done: "
f"{[lookup_id_to_name[rid] for rid in research_data - ctx.locations_checked]}")
ctx.locations_checked = research_data
await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": tuple(research_data)}])
await asyncio.sleep(1)
else:
research_logger.info("Did not find Factorio Bridge file.")
await asyncio.sleep(5)
except Exception as e:
logging.exception(e)
logging.error("Aborted Factorio Server Bridge")
def stream_factorio_output(pipe, queue):
def queuer():
while 1:
text = pipe.readline().strip()
if text:
queue.put_nowait(text)
from threading import Thread
thread = Thread(target=queuer, name="Factorio Output Queue", daemon=True)
thread.start()
async def factorio_server_watcher(ctx: FactorioContext):
import subprocess
import factorio_rcon
factorio_server_logger = logging.getLogger("FactorioServer")
factorio_process = subprocess.Popen((executable, "--start-server", *(str(elem) for elem in server_args)),
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL,
encoding="utf-8")
factorio_server_logger.info("Started Factorio Server")
factorio_queue = Queue()
stream_factorio_output(factorio_process.stdout, factorio_queue)
stream_factorio_output(factorio_process.stderr, factorio_queue)
try:
while 1:
while not factorio_queue.empty():
msg = factorio_queue.get()
factorio_server_logger.info(msg)
if not ctx.rcon_client and "Hosting game at IP ADDR:" in msg:
ctx.rcon_client = factorio_rcon.RCONClient("localhost", rcon_port, rcon_password)
# trigger lua interface confirmation
ctx.rcon_client.send_command("/sc game.print('Starting Archipelago Bridge')")
ctx.rcon_client.send_command("/sc game.print('Starting Archipelago Bridge')")
if ctx.rcon_client:
while ctx.send_index < len(ctx.items_received):
item_id = ctx.items_received[ctx.send_index].item
item_name = lookup_id_to_name[item_id]
factorio_server_logger.info(f"Sending {item_name} to Nauvis.")
response = ctx.rcon_client.send_command(f'/ap-get-technology {item_name}')
if response:
factorio_server_logger.info(response)
ctx.send_index += 1
await asyncio.sleep(1)
except Exception as e:
logging.exception(e)
logging.error("Aborted Factorio Server Bridge")
async def main():
ctx = FactorioContext(None, None, True)
ctx.server_address = "localhost"
ctx.auth = "Berserker"
if ctx.server_task is None:
ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
await asyncio.sleep(3)
watcher_task = asyncio.create_task(game_watcher(ctx), name="FactorioProgressionWatcher")
input_task = asyncio.create_task(console_loop(ctx), name="Input")
factorio_server_task = asyncio.create_task(factorio_server_watcher(ctx), name="FactorioServer")
await ctx.exit_event.wait()
ctx.server_address = None
ctx.snes_reconnect_address = None
await asyncio.gather(watcher_task, input_task, factorio_server_task)
if ctx.server is not None and not ctx.server.socket.closed:
await ctx.server.socket.close()
if ctx.server_task is not None:
await ctx.server_task
await factorio_server_task
while ctx.input_requests > 0:
ctx.input_queue.put_nowait(None)
ctx.input_requests -= 1
await input_task
if __name__ == '__main__':
colorama.init()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
colorama.deinit()

0
Adjuster.py → LttPAdjuster.py Executable file → Normal file
View File

View File

@ -1,5 +1,4 @@
import argparse
import urllib.parse
import atexit
import time
import functools
@ -23,52 +22,70 @@ import ModuleUpdate
ModuleUpdate.update()
import colorama
import prompt_toolkit
from prompt_toolkit.patch_stdout import patch_stdout
from NetUtils import *
import WebUI
from worlds.alttp import Regions, Shops
from worlds.alttp import Items
from worlds import network_data_package
import Utils
# logging note:
# logging.* gets send to only the text console, logger.* gets send to the WebUI as well, if it's initialized.
logger = logging.getLogger("Client")
from CommonClient import CommonContext, server_loop, logger, console_loop, ClientCommandProcessor
def create_named_task(coro, *args, name=None):
if not name:
name = coro.__name__
return asyncio.create_task(coro, *args, name=name)
from MultiServer import mark_raw
class Context():
class LttPCommandProcessor(ClientCommandProcessor):
def _cmd_slow_mode(self, toggle: str = ""):
"""Toggle slow mode, which limits how fast you send / receive items."""
if toggle:
self.ctx.slow_mode = toggle.lower() in {"1", "true", "on"}
else:
self.ctx.slow_mode = not self.ctx.slow_mode
self.output(f"Setting slow mode to {self.ctx.slow_mode}")
def _cmd_web(self):
if self.ctx.webui_socket_port:
webbrowser.open(f'http://localhost:5050?port={self.ctx.webui_socket_port}')
else:
self.output("Web UI was never started.")
@mark_raw
def _cmd_snes(self, snes_address: str = "") -> bool:
"""Connect to a snes. Optionally include network address of a snes to connect to, otherwise show available devices"""
self.ctx.snes_reconnect_address = None
asyncio.create_task(snes_connect(self.ctx, snes_address if snes_address else self.ctx.snes_address))
return True
def _cmd_snes_close(self) -> bool:
"""Close connection to a currently connected snes"""
self.ctx.snes_reconnect_address = None
if self.ctx.snes_socket is not None and not self.ctx.snes_socket.closed:
asyncio.create_task(self.ctx.snes_socket.close())
return True
else:
return False
class Context(CommonContext):
command_processor = LttPCommandProcessor
def __init__(self, snes_address, server_address, password, found_items, port: int):
self.snes_address = snes_address
self.server_address = server_address
super(Context, self).__init__(server_address, password, found_items)
# WebUI Stuff
self.ui_node = WebUI.WebUiClient()
logger.addHandler(self.ui_node)
self.ready = False
self.custom_address = None
self.webui_socket_port: typing.Optional[int] = port
self.hint_cost = 0
self.check_points = 0
self.forfeit_mode = ''
self.remaining_mode = ''
self.hint_points = 0
# End WebUI Stuff
self.exit_event = asyncio.Event()
self.watcher_event = asyncio.Event()
self.input_queue = asyncio.Queue()
self.input_requests = 0
# End of WebUI Stuff
# snes stuff
self.snes_address = snes_address
self.snes_socket = None
self.snes_state = SNESState.SNES_DISCONNECTED
self.snes_attached_device = None
@ -78,79 +95,36 @@ class Context():
self.is_sd2snes = False
self.snes_write_buffer = []
self.server_task = None
self.server: typing.Optional[Endpoint] = None
self.password = password
self.server_version = Version(0, 0, 0)
self.team = None
self.slot = None
self.player_names: typing.Dict[int: str] = {}
self.locations_recognized = set()
self.locations_checked:typing.Set[int] = set()
self.locations_scouted:typing.Set[int] = set()
self.items_received = []
self.missing_locations: typing.List[int] = []
self.checked_locations: typing.List[int] = []
self.locations_info = {}
self.awaiting_rom = False
self.rom = None
self.prev_rom = None
self.auth = None
self.found_items = found_items
self.finished_game = False
self.slow_mode = False
self.jsontotextparser = JSONtoTextParser(self)
self.set_getters(network_data_package)
def set_getters(self, data_package: dict, network=False):
if not network: # local data; check if newer data was already downloaded
local_package = Utils.persistent_load().get("datapackage", {}).get("latest", {})
if local_package and local_package["version"] > network_data_package["version"]:
data_package: dict = local_package
elif network: # check if data from server is newer
for key, value in data_package.items():
if type(value) == dict: # convert to int keys
data_package[key] = \
{int(subkey) if subkey.isdigit() else subkey: subvalue for subkey, subvalue in value.items()}
async def connection_closed(self):
await super(Context, self).connection_closed()
self.awaiting_rom = False
if data_package["version"] > network_data_package["version"]:
Utils.persistent_store("datapackage", "latest", network_data_package)
def event_invalid_slot(self):
if self.snes_socket is not None and not self.snes_socket.closed:
asyncio.create_task(self.snes_socket.close())
raise Exception('Invalid ROM detected, '
'please verify that you have loaded the correct rom and reconnect your snes (/snes)')
item_lookup: dict = data_package["lookup_any_item_id_to_name"]
locations_lookup: dict = data_package["lookup_any_location_id_to_name"]
def get_item_name_from_id(code: int):
return item_lookup.get(code, f'Unknown item (ID:{code})')
self.item_name_getter = get_item_name_from_id
def get_location_name_from_address(address: int):
return locations_lookup.get(address, f'Unknown location (ID:{address})')
self.location_name_getter = get_location_name_from_address
@property
def endpoints(self):
if self.server:
return [self.server]
else:
return []
async def disconnect(self):
if self.server and not self.server.socket.closed:
await self.server.socket.close()
self.ui_node.send_connection_status(self)
if self.server_task is not None:
await self.server_task
async def send_msgs(self, msgs):
if not self.server or not self.server.socket.open or self.server.socket.closed:
async def server_auth(self, password_requested):
if password_requested and not self.password:
await super(Context, self).server_auth(password_requested)
if self.rom is None:
self.awaiting_rom = True
logger.info(
'No ROM detected, awaiting snes connection to authenticate to the multiworld server (/snes)')
return
await self.server.socket.send(encode(msgs))
def consume_players_package(self, package: typing.List[tuple]):
self.player_names = {slot: name for team, slot, name, orig_name in package if self.team == team}
self.awaiting_rom = False
self.auth = self.rom
auth = base64.b64encode(self.rom).decode()
await self.send_msgs([{"cmd": 'Connect',
'password': self.password, 'name': auth, 'version': Utils._version_tuple,
'tags': get_tags(self),
'uuid': Utils.get_unique_identifier(), 'game': "A Link to the Past"
}])
def color_item(item_id: int, green: bool = False) -> str:
@ -161,9 +135,7 @@ def color_item(item_id: int, green: bool = False) -> str:
return color(item_name, *item_colors)
START_RECONNECT_DELAY = 5
SNES_RECONNECT_DELAY = 5
SERVER_RECONNECT_DELAY = 5
ROM_START = 0x000000
WRAM_START = 0xF50000
@ -463,7 +435,7 @@ class SNESState(enum.IntEnum):
def launch_qusb2snes(ctx: Context):
qusb2snes_path = Utils.get_options()["general_options"]["qusb2snes"]
qusb2snes_path = Utils.get_options()["lttp_options"]["qusb2snes"]
if not os.path.isfile(qusb2snes_path):
qusb2snes_path = Utils.local_path(qusb2snes_path)
@ -580,7 +552,7 @@ async def snes_connect(ctx: Context, address):
ctx.snes_reconnect_address = address
recv_task = asyncio.create_task(snes_recv_loop(ctx))
SNES_RECONNECT_DELAY = START_RECONNECT_DELAY
SNES_RECONNECT_DELAY = ctx.starting_reconnect_delay
except Exception as e:
if recv_task is not None:
@ -754,385 +726,14 @@ async def snes_flush_writes(ctx: Context):
await snes_write(ctx, writes)
async def server_loop(ctx: Context, address=None):
global SERVER_RECONNECT_DELAY
ctx.ui_node.send_connection_status(ctx)
cached_address = None
if ctx.server and ctx.server.socket:
logger.error('Already connected')
return
if address is None: # set through CLI or APBP
address = ctx.server_address
if address is None: # see if this is an old connection
await asyncio.sleep(0.5) # wait for snes connection to succeed if possible.
rom = ctx.rom if ctx.rom else None
if rom:
servers = Utils.persistent_load()["servers"]
if rom in servers:
address = servers[rom]
cached_address = True
# Wait for the user to provide a multiworld server address
if not address:
logger.info('Please connect to a multiworld server.')
ctx.ui_node.poll_for_server_ip()
return
address = f"ws://{address}" if "://" not in address else address
port = urllib.parse.urlparse(address).port or 38281
logger.info('Connecting to multiworld server at %s' % address)
try:
socket = await websockets.connect(address, port=port, ping_timeout=None, ping_interval=None)
ctx.server = Endpoint(socket)
logger.info('Connected')
ctx.server_address = address
ctx.ui_node.send_connection_status(ctx)
SERVER_RECONNECT_DELAY = START_RECONNECT_DELAY
async for data in ctx.server.socket:
for msg in decode(data):
await process_server_cmd(ctx, msg)
logger.warning('Disconnected from multiworld server, type /connect to reconnect')
except WebUI.WaitingForUiException:
pass
except ConnectionRefusedError:
if cached_address:
logger.error('Unable to connect to multiworld server at cached address. '
'Please use the connect button above.')
else:
logger.error('Connection refused by the multiworld server')
except (OSError, websockets.InvalidURI):
logger.error('Failed to connect to the multiworld server')
except Exception as e:
logger.error('Lost connection to the multiworld server, type /connect to reconnect')
if not isinstance(e, websockets.WebSocketException):
logger.exception(e)
finally:
ctx.awaiting_rom = False
ctx.auth = None
ctx.items_received = []
ctx.locations_info = {}
ctx.server_version = Version(0, 0, 0)
if ctx.server and ctx.server.socket is not None:
await ctx.server.socket.close()
ctx.server = None
ctx.server_task = None
if ctx.server_address:
logger.info(f"... reconnecting in {SERVER_RECONNECT_DELAY}s")
ctx.ui_node.send_connection_status(ctx)
asyncio.create_task(server_autoreconnect(ctx))
SERVER_RECONNECT_DELAY *= 2
async def server_autoreconnect(ctx: Context):
# unfortunately currently broken. See: https://github.com/prompt-toolkit/python-prompt-toolkit/issues/1033
# with prompt_toolkit.shortcuts.ProgressBar() as pb:
# for _ in pb(range(100)):
# await asyncio.sleep(RECONNECT_DELAY/100)
await asyncio.sleep(SERVER_RECONNECT_DELAY)
if ctx.server_address and ctx.server_task is None:
ctx.server_task = asyncio.create_task(server_loop(ctx))
async def process_server_cmd(ctx: Context, args: dict):
try:
cmd = args["cmd"]
except:
logger.exception(f"Could not get command from {args}")
raise
if cmd == 'RoomInfo':
logger.info('--------------------------------')
logger.info('Room Information:')
logger.info('--------------------------------')
version = args["version"]
ctx.server_version = tuple(version)
version = ".".join(str(item) for item in version)
logger.info(f'Server protocol version: {version}')
logger.info("Server protocol tags: " + ", ".join(args["tags"]))
if args['password']:
logger.info('Password required')
logging.info(f"Forfeit setting: {args['forfeit_mode']}")
logging.info(f"Remaining setting: {args['remaining_mode']}")
logging.info(f"A !hint costs {args['hint_cost']} points and you get {args['location_check_points']}"
f" for each location checked.")
ctx.hint_cost = int(args['hint_cost'])
ctx.check_points = int(args['location_check_points'])
ctx.forfeit_mode = args['forfeit_mode']
ctx.remaining_mode = args['remaining_mode']
ctx.ui_node.send_game_info(ctx)
if len(args['players']) < 1:
logger.info('No player connected')
else:
args['players'].sort()
current_team = -1
logger.info('Players:')
for team, slot, name in args['players']:
if team != current_team:
logger.info(f' Team #{team + 1}')
current_team = team
logger.info(' %s (Player %d)' % (name, slot))
if args["datapackage_version"] > network_data_package["version"]:
await ctx.send_msgs([{"cmd": "GetDataPackage"}])
await server_auth(ctx, args['password'])
elif cmd == 'DataPackage':
ctx.set_getters(args['data'], network=True)
elif cmd == 'ConnectionRefused':
errors = args["errors"]
if 'InvalidSlot' in errors:
if ctx.snes_socket is not None and not ctx.snes_socket.closed:
asyncio.create_task(ctx.snes_socket.close())
raise Exception('Invalid ROM detected, '
'please verify that you have loaded the correct rom and reconnect your snes (/snes)')
elif 'SlotAlreadyTaken' in errors:
Utils.persistent_store("servers", ctx.rom, ctx.server_address)
raise Exception('Player slot already in use for that team')
elif 'IncompatibleVersion' in errors:
raise Exception('Server reported your client version as incompatible')
# last to check, recoverable problem
elif 'InvalidPassword' in errors:
logger.error('Invalid password')
ctx.password = None
await server_auth(ctx, True)
else:
raise Exception("Unknown connection errors: " + str(errors))
raise Exception('Connection refused by the multiworld host, no reason provided')
elif cmd == 'Connected':
Utils.persistent_store("servers", ctx.rom, ctx.server_address)
ctx.team = args["team"]
ctx.slot = args["slot"]
ctx.consume_players_package(args["players"])
msgs = []
if ctx.locations_checked:
msgs.append({"cmd": "LocationChecks",
"locations": list(ctx.locations_checked)})
if ctx.locations_scouted:
msgs.append({"cmd": "LocationScouts",
"locations": list(ctx.locations_scouted)})
if msgs:
await ctx.send_msgs(msgs)
if ctx.finished_game:
await send_finished_game(ctx)
# Get the server side view of missing as of time of connecting.
# This list is used to only send to the server what is reported as ACTUALLY Missing.
# This also serves to allow an easy visual of what locations were already checked previously
# when /missing is used for the client side view of what is missing.
ctx.missing_locations = args["missing_locations"]
ctx.checked_locations = args["checked_locations"]
elif cmd == 'ReceivedItems':
start_index = args["index"]
if start_index == 0:
ctx.items_received = []
elif start_index != len(ctx.items_received):
sync_msg = [{'cmd': 'Sync'}]
if ctx.locations_checked:
sync_msg.append({"cmd": "LocationChecks",
"locations": list(ctx.locations_checked)})
await ctx.send_msgs(sync_msg)
if start_index == len(ctx.items_received):
for item in args['items']:
ctx.items_received.append(NetworkItem(*item))
ctx.watcher_event.set()
elif cmd == 'LocationInfo':
for item, location, player in args['locations']:
if location not in ctx.locations_info:
ctx.locations_info[location] = (item, player)
ctx.watcher_event.set()
elif cmd == "RoomUpdate":
if "players" in args:
ctx.consume_players_package(args["players"])
if "hint_points" in args:
ctx.hint_points = args['hint_points']
elif cmd == 'Print':
logger.info(args["text"])
elif cmd == 'PrintJSON':
if not ctx.found_items and args.get("type", None) == "ItemSend" and args["receiving"] == args["sending"]:
pass # don't want info on other player's local pickups.
logger.info(ctx.jsontotextparser(args["data"]))
elif cmd == 'InvalidArguments':
logger.warning(f"Invalid Arguments: {args['text']}")
else:
logger.debug(f"unknown command {cmd}")
# kept as function for easier wrapping by plugins
def get_tags(ctx: Context):
tags = ['AP']
return tags
async def server_auth(ctx: Context, password_requested):
if password_requested and not ctx.password:
logger.info('Enter the password required to join this game:')
ctx.password = await console_input(ctx)
if ctx.rom is None:
ctx.awaiting_rom = True
logger.info(
'No ROM detected, awaiting snes connection to authenticate to the multiworld server (/snes)')
return
ctx.awaiting_rom = False
ctx.auth = ctx.rom
auth = base64.b64encode(ctx.rom).decode()
await ctx.send_msgs([{"cmd": 'Connect',
'password': ctx.password, 'name': auth, 'version': Utils._version_tuple,
'tags': get_tags(ctx),
'uuid': Utils.get_unique_identifier(), 'game': "A Link to the Past"
}])
async def console_input(ctx: Context):
ctx.input_requests += 1
return await ctx.input_queue.get()
async def connect(ctx: Context, address=None):
await ctx.disconnect()
ctx.server_task = asyncio.create_task(server_loop(ctx, address))
from MultiServer import CommandProcessor, mark_raw
class ClientCommandProcessor(CommandProcessor):
def __init__(self, ctx: Context):
self.ctx = ctx
def output(self, text: str):
logger.info(text)
def _cmd_exit(self) -> bool:
"""Close connections and client"""
self.ctx.exit_event.set()
return True
@mark_raw
def _cmd_snes(self, snes_address: str = "") -> bool:
"""Connect to a snes. Optionally include network address of a snes to connect to, otherwise show available devices"""
self.ctx.snes_reconnect_address = None
asyncio.create_task(snes_connect(self.ctx, snes_address if snes_address else self.ctx.snes_address))
return True
def _cmd_snes_close(self) -> bool:
"""Close connection to a currently connected snes"""
self.ctx.snes_reconnect_address = None
if self.ctx.snes_socket is not None and not self.ctx.snes_socket.closed:
asyncio.create_task(self.ctx.snes_socket.close())
return True
else:
return False
def _cmd_connect(self, address: str = "") -> bool:
"""Connect to a MultiWorld Server"""
self.ctx.server_address = None
asyncio.create_task(connect(self.ctx, address if address else None))
return True
def _cmd_disconnect(self) -> bool:
"""Disconnect from a MultiWorld Server"""
self.ctx.server_address = None
asyncio.create_task(self.ctx.disconnect())
return True
def _cmd_received(self) -> bool:
"""List all received items"""
logger.info('Received items:')
for index, item in enumerate(self.ctx.items_received, 1):
self.ctx.ui_node.notify_item_received(self.ctx.player_names[item.player], self.ctx.item_name_getter(item.item),
self.ctx.location_name_getter(item.location), index,
len(self.ctx.items_received),
self.ctx.item_name_getter(item.item) in Items.progression_items)
logging.info('%s from %s (%s) (%d/%d in list)' % (
color(self.ctx.item_name_getter(item.item), 'red', 'bold'),
color(self.ctx.player_names[item.player], 'yellow'),
self.ctx.location_name_getter(item.location), index, len(self.ctx.items_received)))
return True
def _cmd_missing(self) -> bool:
"""List all missing location checks, from your local game state"""
count = 0
checked_count = 0
for location, location_id in Regions.lookup_name_to_id.items():
if location_id < 0:
continue
if location_id not in self.ctx.locations_checked:
if location_id in self.ctx.missing_locations:
self.output('Missing: ' + location)
count += 1
elif location_id in self.ctx.checked_locations:
self.output('Checked: ' + location)
count += 1
checked_count += 1
if count:
self.output(
f"Found {count} missing location checks{f'. {checked_count} locations checks previously visited.' if checked_count else ''}")
else:
self.output("No missing location checks found.")
return True
def _cmd_slow_mode(self, toggle: str = ""):
"""Toggle slow mode, which limits how fast you send / receive items."""
if toggle:
self.ctx.slow_mode = toggle.lower() in {"1", "true", "on"}
else:
self.ctx.slow_mode = not self.ctx.slow_mode
self.output(f"Setting slow mode to {self.ctx.slow_mode}")
def _cmd_web(self):
if self.ctx.webui_socket_port:
webbrowser.open(f'http://localhost:5050?port={self.ctx.webui_socket_port}')
else:
self.output("Web UI was never started.")
def _cmd_ready(self):
self.ctx.ready = not self.ctx.ready
if self.ctx.ready:
state = CLientStatus.CLIENT_READY
self.output("Readied up.")
else:
state = CLientStatus.CLIENT_CONNECTED
self.output("Unreadied.")
asyncio.create_task(self.ctx.send_msgs([{"cmd": "StatusUpdate", "status": state}]))
def default(self, raw: str):
asyncio.create_task(self.ctx.send_msgs([{"cmd": "Say", "text": raw}]))
async def console_loop(ctx: Context):
session = prompt_toolkit.PromptSession()
commandprocessor = ClientCommandProcessor(ctx)
while not ctx.exit_event.is_set():
try:
with patch_stdout():
input_text = await session.prompt_async()
if ctx.input_requests > 0:
ctx.input_requests -= 1
ctx.input_queue.put_nowait(input_text)
continue
if not input_text:
continue
commandprocessor(input_text)
except Exception as e:
logging.exception(e)
await snes_flush_writes(ctx)
async def track_locations(ctx: Context, roomid, roomdata):
new_locations = []
@ -1218,9 +819,6 @@ async def track_locations(ctx: Context, roomid, roomdata):
await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": new_locations}])
async def send_finished_game(ctx: Context):
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": CLientStatus.CLIENT_GOAL}])
async def game_watcher(ctx: Context):
prev_game_timer = 0
perf_counter = time.perf_counter()
@ -1244,7 +842,7 @@ async def game_watcher(ctx: Context):
ctx.prev_rom = ctx.rom
if ctx.awaiting_rom:
await server_auth(ctx, False)
await ctx.server_auth(False)
if ctx.auth and ctx.auth != ctx.rom:
logger.warning("ROM change detected, please reconnect to the multiworld server")
@ -1260,7 +858,7 @@ async def game_watcher(ctx: Context):
delay = 7 if ctx.slow_mode else 2
if gameend[0]:
if not ctx.finished_game:
await send_finished_game(ctx)
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
ctx.finished_game = True
if time.perf_counter() - perf_counter < delay:
@ -1314,7 +912,7 @@ async def game_watcher(ctx: Context):
async def run_game(romfile):
auto_start = Utils.get_options()["general_options"].get("rom_start", True)
auto_start = Utils.get_options()["lttp_options"].get("rom_start", True)
if auto_start is True:
import webbrowser
webbrowser.open(romfile)
@ -1326,7 +924,7 @@ async def run_game(romfile):
async def websocket_server(websocket: websockets.WebSocketServerProtocol, path, ctx: Context):
endpoint = Endpoint(websocket)
ctx.ui_node.endpoints.append(endpoint)
process_command = ClientCommandProcessor(ctx)
process_command = LttPCommandProcessor(ctx)
try:
async for incoming_data in websocket:
data = loads(incoming_data)
@ -1347,7 +945,7 @@ async def websocket_server(websocket: websockets.WebSocketServerProtocol, path,
elif data['type'] == 'webConfig':
if 'serverAddress' in data['content']:
ctx.server_address = data['content']['serverAddress']
await connect(ctx, data['content']['serverAddress'])
await ctx.connect(data['content']['serverAddress'])
elif 'deviceId' in data['content']:
# Allow a SNES disconnect via UI sending -1 as new device
if data['content']['deviceId'] == "-1":
@ -1416,16 +1014,16 @@ async def main():
port, on_start=threading.Timer(1, webbrowser.open, (f'http://localhost:5050?port={port}',)).start)
ctx = Context(args.snes, args.connect, args.password, args.founditems, port)
input_task = create_named_task(console_loop(ctx), name="Input")
input_task = asyncio.create_task(console_loop(ctx), name="Input")
if not args.disable_web_ui:
ui_socket = websockets.serve(functools.partial(websocket_server, ctx=ctx),
'localhost', port, ping_timeout=None, ping_interval=None)
await ui_socket
if ctx.server_task is None:
ctx.server_task = create_named_task(server_loop(ctx), name="ServerLoop")
ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
watcher_task = create_named_task(game_watcher(ctx), name="GameWatcher")
watcher_task = asyncio.create_task(game_watcher(ctx), name="GameWatcher")
await ctx.exit_event.wait()
ctx.server_address = None

28
Main.py
View File

@ -25,6 +25,8 @@ from worlds.alttp.ItemPool import generate_itempool, difficulties, fill_prizes
from Utils import output_path, parse_player_names, get_options, __version__, _version_tuple
from worlds.hk import gen_hollow, set_rules as set_hk_rules
from worlds.hk import create_regions as hk_create_regions
from worlds.factorio import gen_factorio, factorio_create_regions
from worlds.factorio.Mod import generate_mod
from worlds.generic.Rules import locality_rules
from worlds import Games
import Patch
@ -74,6 +76,7 @@ def main(args, seed=None):
world.timer = args.timer.copy()
world.progressive = args.progressive.copy()
world.goal = args.goal.copy()
world.local_items = args.local_items.copy()
if hasattr(args, "algorithm"): # current GUI options
world.algorithm = args.algorithm
world.shuffleganon = args.shuffleganon
@ -202,6 +205,9 @@ def main(args, seed=None):
for player in world.hk_player_ids:
hk_create_regions(world, player)
for player in world.factorio_player_ids:
factorio_create_regions(world, player)
for player in world.alttp_player_ids:
if world.open_pyramid[player] == 'goal':
world.open_pyramid[player] = world.goal[player] in {'crystals', 'ganontriforcehunt', 'localganontriforcehunt', 'ganonpedestal'}
@ -251,14 +257,16 @@ def main(args, seed=None):
if world.players > 1:
for player in world.player_ids:
locality_rules(world, player)
for player in world.alttp_player_ids:
set_rules(world, player)
logger.info("Doing Hollow Knight things")
for player in world.hk_player_ids:
gen_hollow(world, player)
for player in world.factorio_player_ids:
gen_factorio(world, player)
logger.info("Running Item Plando")
for item in world.itempool:
@ -405,10 +413,12 @@ def main(args, seed=None):
if not args.suppress_rom:
rom_futures = []
mod_futures = []
for team in range(world.teams):
for player in world.alttp_player_ids:
rom_futures.append(pool.submit(_gen_rom, team, player))
for player in world.factorio_player_ids:
mod_futures.append(pool.submit(generate_mod, world, player))
def get_entrance_to_region(region: Region):
for entrance in region.entrances:
@ -440,7 +450,7 @@ def main(args, seed=None):
for location in [loc for loc in world.get_filled_locations() if type(loc.address) is int]:
main_entrance = get_entrance_to_region(location.parent_region)
if location.game == Games.HK:
if location.game != Games.LTTP:
checks_in_area[location.player]["Light World"].append(location.address)
elif location.parent_region.dungeon:
dungeonname = {'Inverted Agahnims Tower': 'Agahnims Tower',
@ -477,7 +487,7 @@ def main(args, seed=None):
FillDisabledShopSlots(world)
def write_multidata(roms):
def write_multidata(roms, mods):
import base64
for future in roms:
rom_name = future.result()
@ -488,13 +498,13 @@ def main(args, seed=None):
for i, team in enumerate(parsed_names):
for player, name in enumerate(team, 1):
if player in world.hk_player_ids:
if player not in world.alttp_player_ids:
connect_names[name] = (i, player)
multidata = zlib.compress(pickle.dumps({"names": parsed_names,
"connect_names": connect_names,
"remote_items": {player for player in range(1, world.players + 1) if
world.remote_items[player] or
world.game[player] == "Hollow Knight"},
world.game[player] != "Hollow Knight"},
"locations": {
(location.address, location.player):
(location.item.code, location.item.player)
@ -512,8 +522,10 @@ def main(args, seed=None):
with open(output_path('%s.archipelago' % outfilebase), 'wb') as f:
f.write(bytes([1])) # version of format
f.write(multidata)
for future in mods:
future.result() # collect errors if they occured
multidata_task = pool.submit(write_multidata, rom_futures)
multidata_task = pool.submit(write_multidata, rom_futures, mod_futures)
if not check_accessibility_task.result():
if not world.can_beat_game():
raise Exception("Game appears as unbeatable. Aborting.")

View File

@ -15,7 +15,8 @@ def update_command():
naming_specialties = {"PyYAML": "yaml", # PyYAML is imported as the name yaml
"maseya-z3pr": "maseya"}
"maseya-z3pr": "maseya",
"factorio-rcon-py": "factorio_rcon"}
def update():

View File

@ -48,7 +48,7 @@ if __name__ == "__main__":
weights_file_path = multi_mystery_options["weights_file_path"]
pre_roll = multi_mystery_options["pre_roll"]
teams = multi_mystery_options["teams"]
rom_file = options["general_options"]["rom_file"]
rom_file = options["lttp_options"]["rom_file"]
host = options["server_options"]["host"]
port = options["server_options"]["port"]

View File

@ -31,7 +31,7 @@ from worlds import network_data_package, lookup_any_item_id_to_name, lookup_any_
import Utils
from Utils import get_item_name_from_id, get_location_name_from_address, \
_version_tuple, restricted_loads, Version
from NetUtils import Node, Endpoint, CLientStatus, NetworkItem, decode
from NetUtils import Node, Endpoint, ClientStatus, NetworkItem, decode
colorama.init()
lttp_console_names = frozenset(set(Items.item_table) | set(Items.item_name_groups) | set(Regions.lookup_name_to_id))
@ -370,7 +370,7 @@ async def on_client_disconnected(ctx: Context, client: Client):
async def on_client_joined(ctx: Context, client: Client):
update_client_status(ctx, client, CLientStatus.CLIENT_CONNECTED)
update_client_status(ctx, client, ClientStatus.CLIENT_CONNECTED)
version_str = '.'.join(str(x) for x in client.version)
ctx.notify_all(
f"{ctx.get_aliased_name(client.team, client.slot)} (Team #{client.team + 1}) has joined the game. "
@ -379,7 +379,7 @@ async def on_client_joined(ctx: Context, client: Client):
ctx.client_connection_timers[client.team, client.slot] = datetime.datetime.now(datetime.timezone.utc)
async def on_client_left(ctx: Context, client: Client):
update_client_status(ctx, client, CLientStatus.CLIENT_UNKNOWN)
update_client_status(ctx, client, ClientStatus.CLIENT_UNKNOWN)
ctx.notify_all("%s (Team #%d) has left the game" % (ctx.get_aliased_name(client.team, client.slot), client.team + 1))
ctx.client_connection_timers[client.team, client.slot] = datetime.datetime.now(datetime.timezone.utc)
@ -486,7 +486,7 @@ def notify_team(ctx: Context, team: int, text: str):
def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[NetUtils.Hint]:
hints = []
seeked_item_id = Items.item_table[item][2]
seeked_item_id = lookup_any_item_name_to_id[item]
for check, result in ctx.locations.items():
item_id, receiving_player = result
if receiving_player == slot and item_id == seeked_item_id:
@ -514,7 +514,7 @@ def collect_hints_location(ctx: Context, team: int, slot: int, location: str) ->
def format_hint(ctx: Context, team: int, hint: NetUtils.Hint) -> str:
text = f"[Hint]: {ctx.player_names[team, hint.receiving_player]}'s " \
f"{Items.lookup_id_to_name[hint.item]} is " \
f"{lookup_any_item_id_to_name[hint.item]} is " \
f"at {get_location_name_from_address(hint.location)} " \
f"in {ctx.player_names[team, hint.finding_player]}'s World"
@ -750,7 +750,7 @@ class ClientMessageProcessor(CommonCommandProcessor):
"Sorry, client forfeiting has been disabled on this server. You can ask the server admin for a /forfeit")
return False
else: # is auto or goal
if self.ctx.client_game_state[self.client.team, self.client.slot] == CLientStatus.CLIENT_GOAL:
if self.ctx.client_game_state[self.client.team, self.client.slot] == ClientStatus.CLIENT_GOAL:
forfeit_player(self.ctx, self.client.team, self.client.slot)
return True
else:
@ -774,7 +774,7 @@ class ClientMessageProcessor(CommonCommandProcessor):
"Sorry, !remaining has been disabled on this server.")
return False
else: # is goal
if self.ctx.client_game_state[self.client.team, self.client.slot] == CLientStatus.CLIENT_GOAL:
if self.ctx.client_game_state[self.client.team, self.client.slot] == ClientStatus.CLIENT_GOAL:
remaining_item_ids = get_remaining(self.ctx, self.client.team, self.client.slot)
if remaining_item_ids:
self.output("Remaining items: " + ", ".join(Items.lookup_id_to_name.get(item_id, "unknown item")
@ -859,7 +859,7 @@ class ClientMessageProcessor(CommonCommandProcessor):
hints = []
for item in Items.item_name_groups[item_name]:
hints.extend(collect_hints(self.ctx, self.client.team, self.client.slot, item))
elif item_name in Items.item_table: # item name
elif item_name in lookup_any_item_name_to_id: # item name
hints = collect_hints(self.ctx, self.client.team, self.client.slot, item_name)
else: # location name
hints = collect_hints_location(self.ctx, self.client.team, self.client.slot, item_name)
@ -1056,10 +1056,10 @@ async def process_client_cmd(ctx: Context, client: Client, args: dict):
client.messageprocessor(args["text"])
def update_client_status(ctx: Context, client: Client, new_status: CLientStatus):
def update_client_status(ctx: Context, client: Client, new_status: ClientStatus):
current = ctx.client_game_state[client.team, client.slot]
if current != CLientStatus.CLIENT_GOAL: # can't undo goal completion
if new_status == CLientStatus.CLIENT_GOAL:
if current != ClientStatus.CLIENT_GOAL: # can't undo goal completion
if new_status == ClientStatus.CLIENT_GOAL:
finished_msg = f'{ctx.get_aliased_name(client.team, client.slot)} (Team #{client.team + 1}) has completed their goal.'
ctx.notify_all(finished_msg)
if "auto" in ctx.forfeit_mode:
@ -1219,7 +1219,10 @@ class ServerCommandProcessor(CommonCommandProcessor):
hints = collect_hints(self.ctx, team, slot, item)
else: # location name
hints = collect_hints_location(self.ctx, team, slot, item)
notify_hints(self.ctx, team, hints)
if hints:
notify_hints(self.ctx, team, hints)
else:
self.output("No hints found.")
return True
else:
self.output(response)
@ -1336,7 +1339,8 @@ async def auto_shutdown(ctx, to_cancel=None):
async def main(args: argparse.Namespace):
logging.basicConfig(format='[%(asctime)s] %(message)s', level=getattr(logging, args.loglevel.upper(), logging.INFO))
logging.basicConfig(force = True,
format='[%(asctime)s] %(message)s', level=getattr(logging, args.loglevel.upper(), logging.INFO))
ctx = Context(args.host, args.port, args.server_password, args.password, args.location_check_points,
args.hint_cost, not args.disable_item_cheat, args.forfeit_mode, args.remaining_mode,

View File

@ -473,6 +473,8 @@ def roll_settings(weights: dict, plando_options: typing.Set[str] = frozenset(("b
elif ret.game == "Hollow Knight":
for option_name, option in Options.hollow_knight_options.items():
setattr(ret, option_name, option.from_any(get_choice(option_name, weights, True)))
elif ret.game == "Factorio":
pass
else:
raise Exception(f"Unsupported game {ret.game}")
return ret

View File

@ -19,7 +19,7 @@ class JSONMessagePart(typing.TypedDict, total=False):
class CLientStatus(enum.IntEnum):
class ClientStatus(enum.IntEnum):
CLIENT_UNKNOWN = 0
CLIENT_CONNECTED = 5
CLIENT_READY = 10

View File

@ -18,7 +18,7 @@ current_patch_version = 1
def get_base_rom_path(file_name: str = "") -> str:
options = Utils.get_options()
if not file_name:
file_name = options["general_options"]["rom_file"]
file_name = options["lttp_options"]["rom_file"]
if not os.path.exists(file_name):
file_name = Utils.local_path(file_name)
return file_name

View File

@ -12,7 +12,7 @@ class Version(typing.NamedTuple):
minor: int
build: int
__version__ = "0.0.1"
__version__ = "0.0.2"
_version_tuple = tuplize_version(__version__)
import builtins
@ -162,10 +162,17 @@ def get_default_options() -> dict:
# Refer to host.yaml for comments as to what all these options mean.
options = {
"general_options": {
"output_path": "output",
},
"factorio_options": {
"executable": "factorio\\bin\\x64\\factorio",
"script-output": "factorio\\script-output",
},
"lttp_options": {
"rom_file": "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc",
"qusb2snes": "QUsb2Snes\\QUsb2Snes.exe",
"rom_start": True,
"output_path": "output",
},
"server_options": {
"host": None,

View File

@ -1,7 +1,7 @@
flask>=1.1.2
pony>=0.7.14
waitress>=1.4.4
flask-caching>=1.9.0
waitress>=2.0.0
flask-caching>=1.10.1
Flask-Autoversion>=0.2.0
Flask-Compress>=1.9.0
Flask-Limiter>=1.4

View File

@ -10,7 +10,7 @@ import asyncio
from functools import partial
from NetUtils import Node
from MultiClient import Context
from LttPClient import Context
import Utils
@ -116,10 +116,6 @@ class WebUiClient(Node, logging.Handler):
}))
class WaitingForUiException(Exception):
pass
web_thread = None
PORT = 5050

View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2021 Berserker55
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,108 @@
-- for testing
script.on_event(defines.events.on_tick, function(event)
if event.tick%600 == 0 then
dumpTech()
end
end)
-- hook into researches done
script.on_event(defines.events.on_research_finished, function(event)
game.print("Research done")
dumpTech()
end)
function dumpTech()
local force = game.forces["player"]
local data_collection = {}
for tech_name, tech in pairs(force.technologies) do
if tech.researched and string.find(tech_name, "ap-") == 1 then
data_collection[tech_name] = tech.researched
end
end
game.write_file("research_done.json", game.table_to_json(data_collection), false)
-- game.write_file("research_done.json", game.table_to_json(data_collection), false, 0)
-- game.print("Sent progress to Archipelago.")
end
function dumpGameInfo()
-- dump Game Information that the Archipelago Randomizer needs.
local data_collection = {}
local force = game.forces["player"]
for tech_name, tech in pairs(force.technologies) do
if tech.enabled then
local tech_data = {}
local unlocks = {}
tech_data["unlocks"] = unlocks
local requires = {}
tech_data["requires"] = requires
local ingredients = {}
tech_data["ingredients"] = ingredients
for tech_requirement, _ in pairs(tech.prerequisites) do
table.insert(requires, tech_requirement)
end
for _, modifier in pairs(tech.effects) do
if modifier.type == "unlock-recipe" then
table.insert(unlocks, modifier.recipe)
end
end
for _, ingredient in pairs(tech.research_unit_ingredients) do
table.insert(ingredients, ingredient.name)
end
data_collection[tech_name] = tech_data
end
game.write_file("techs.json", game.table_to_json(data_collection), false)
game.print("Exported Tech Data")
end
data_collection = {}
for recipe_name, recipe in pairs(force.recipes) do
local recipe_data = {}
recipe_data["ingredients"] = {}
recipe_data["products"] = {}
recipe_data["category"] = recipe.category
for _, ingredient in pairs(recipe.ingredients) do
table.insert(recipe_data["ingredients"], ingredient.name)
end
for _, product in pairs(recipe.products) do
table.insert(recipe_data["products"], product.name)
end
data_collection[recipe_name] = recipe_data
end
game.write_file("recipes.json", game.table_to_json(data_collection), false)
game.print("Exported Recipe Data")
-- data.raw can't be accessed from control.lua, need to find a better method
-- data_collection = {}
-- for machine_name, machine in pairs(data.raw["assembling_machine"]) do
-- local machine_data = {}
-- machine_data["categories"] = table.deepcopy(machine.crafting_categories)
-- data_collection[machine.name] = machine_data
-- end
-- game.write_file("machines.json", game.table_to_json(data_collection), false)
-- game.print("Exported Machine Data")
end
-- add / commands
commands.add_command("ap-get-info-dump", "Dump Game Info, used by Archipelago.", function(call)
dumpGameInfo()
end)
commands.add_command("ap-sync", "Run manual Research Sync with Archipelago.", function(call)
dumpTech()
end)
commands.add_command("ap-get-technology", "Grant a technology, used by the Archipelago Client.", function(call)
local force = game.forces["player"]
local tech_name = call.parameter
local tech = force.technologies[tech_name]
if tech ~= nil then
if tech.researched ~= true then
tech.researched = true
game.print({"", "Received ", tech.localised_name, " from Archipelago"})
game.play_sound({path="utility/research_completed"})
end
else
game.print("Unknown Technology " .. tech_name)
end
end)

View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 264 KiB

View File

@ -0,0 +1,9 @@
{
"name": "archipelago-client",
"version": "0.0.1",
"title": "Archipelago",
"author": "Berserker",
"homepage": "https://archipelago.gg",
"description": "Integration client for the Archipelago Randomizer",
"factorio_version": "1.1"
}

View File

@ -0,0 +1,34 @@
-- this file gets written automatically by the Archipelago Randomizer and is in its raw form a Jinja2 Template
local technologies = data.raw["technology"]
local original_tech
local new_tree_copy
local template_tech = table.deepcopy(technologies["automation"])
{#- ensure the copy unlocks nothing #}
template_tech.unlocks = {}
template_tech.upgrade = false
template_tech.effects = {}
{# each randomized tech gets set to be invisible, with new nodes added that trigger those #}
{%- for original_tech_name, item_name, receiving_player in locations %}
original_tech = technologies["{{original_tech_name}}"]
{#- the tech researched by the local player #}
new_tree_copy = table.deepcopy(template_tech)
new_tree_copy.name = "ap-{{ tech_table[original_tech_name] }}-"{# use AP ID #}
{#- hide and disable original tech; which will be shown, unlocked and enabled by AP Client #}
original_tech.enabled = false
{#- copy original tech costs #}
new_tree_copy.unit = table.deepcopy(original_tech.unit)
{% if item_name in tech_table %}
{#- copy Factorio Technology Icon #}
new_tree_copy.icon = table.deepcopy(technologies["{{ item_name }}"].icon)
new_tree_copy.icons = table.deepcopy(technologies["{{ item_name }}"].icons)
new_tree_copy.icon_size = table.deepcopy(technologies["{{ item_name }}"].icon_size)
{% else %}
{#- use default AP icon if no Factorio graphics exist #}
new_tree_copy.icon = "__{{ mod_name }}__/graphics/icons/ap.png"
new_tree_copy.icons = nil
new_tree_copy.icon_size = 512
{% endif %}
{#- add new technology to game #}
data:extend{new_tree_copy}
{% endfor %}

View File

@ -0,0 +1,8 @@
[technology-name]
{% for original_tech_name, item_name, receiving_player in locations %}
ap-{{ tech_table[original_tech_name] }}-={{ player_names[receiving_player] }}'s {{ item_name }}
{% endfor %}
[technology-description]
{% for original_tech_name, item_name, receiving_player in locations %}
"ap-{{ tech_table[original_tech_name] }}-=Researching this technology sends {{ item_name }} to {{ player_names[receiving_player] }}.
{% endfor %}

File diff suppressed because one or more lines are too long

1
data/factorio/techs.json Normal file

File diff suppressed because one or more lines are too long

View File

@ -1,12 +1,4 @@
general_options:
# File name of the v1.0 J rom
rom_file: "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc"
# Set this to your (Q)Usb2Snes location if you want the MultiClient to attempt an auto start, does nothing if not found
qusb2snes: "QUsb2Snes\\QUsb2Snes.exe"
# Set this to false to never autostart a rom (such as after patching)
# True for operating system default program
# Alternatively, a path to a program to open the .sfc file with
rom_start: true
# Where to place output files
output_path: "output"
# Options for MultiServer
@ -104,3 +96,15 @@ multi_mystery_options:
# List of options that can be plando'd. Can be combined, for example "bosses, items"
# Available options: bosses, items, texts, connections
plando_options: "bosses"
lttp_options:
# File name of the v1.0 J rom
rom_file: "Zelda no Densetsu - Kamigami no Triforce (Japan).sfc"
# Set this to your (Q)Usb2Snes location if you want the MultiClient to attempt an auto start, does nothing if not found
qusb2snes: "QUsb2Snes\\QUsb2Snes.exe"
# Set this to false to never autostart a rom (such as after patching)
# True for operating system default program
# Alternatively, a path to a program to open the .sfc file with
rom_start: true
factorio_options:
executable: "factorio\\bin\\x64\\factorio"
script-output: "factorio\\script-output"

View File

@ -6,7 +6,7 @@
[Setup]
; NOTE: The value of AppId uniquely identifies this application.
; Do not use the same AppId value in installers for other applications.
AppId={{6D826EE0-49BE-4B36-BACE-09C6971CD85C}}
AppId={{918BA46A-FAB8-460C-9DFF-AE691E1C865B}}
AppName={#MyAppName}
AppVerName={#MyAppName}
DefaultDirName={commonappdata}\{#MyAppName}

View File

@ -6,7 +6,7 @@
[Setup]
; NOTE: The value of AppId uniquely identifies this application.
; Do not use the same AppId value in installers for other applications.
AppId={{6D826EE0-49BE-4B36-BACE-09C6971CD85C}}
AppId={{918BA46A-FAB8-460C-9DFF-AE691E1C865B}}
AppName={#MyAppName}
AppVerName={#MyAppName}
DefaultDirName={commonappdata}\{#MyAppName}

View File

@ -21,6 +21,19 @@ description: Template Name # Used to describe your yaml. Useful if you have mult
name: YourName{number} # Your name in-game. Spaces will be replaced with underscores and there is a 16 character limit
#{player} will be replaced with the player's slot number.
#{number} will be replaced with the counter value of the name.
game:
A Link to the Past: 1
Hollow Knight: 1
Factorio: 1
# Shared Options supported by all games:
accessibility:
items: 0 # Guarantees you will be able to acquire all items, but you may not be able to access all locations
locations: 50 # Guarantees you will be able to access all locations, and therefore all items
none: 0 # Guarantees only that the game is beatable. You may not be able to access all locations or acquire all items
progression_balancing:
on: 50 # A system to reduce BK, as in times during which you can't do anything by moving your items into an earlier access sphere to make it likely you have stuff to do
off: 0 # Turn this off if you don't mind a longer multiworld, or can glitch/sequence break around missing items.
# A Link to the Past options:
### Logic Section ###
# Warning: overworld_glitches is not available and minor_glitches is only partially implemented on the door-rando version
glitches_required: # Determine the logic required to complete the seed
@ -37,13 +50,6 @@ restrict_dungeon_item_on_boss: # aka ambrosia boss items
on: 0 # prevents unshuffled compasses, maps and keys to be boss drops, they can still drop keysanity and other players' items
off: 50
### End of Logic Section ###
meta_ignore: # Nullify options specified in the meta.yaml file. Adding an option here guarantees it will not occur in your seed, even if the .yaml file specifies it
mode:
- inverted # Never play inverted seeds
retro:
- on # Never play retro seeds
weapons:
- swordless # Never play a swordless seed
map_shuffle: # Shuffle dungeon maps into the world and other dungeons, including other players' worlds
on: 0
off: 50
@ -72,10 +78,6 @@ dungeon_counters:
pickup: 50 # Show when compass is picked up
default: 0 # Show when compass is picked up if the compass itself is shuffled
off: 0 # Never show item count in dungeons
accessibility:
items: 0 # Guarantees you will be able to acquire all items, but you may not be able to access all locations
locations: 50 # Guarantees you will be able to access all locations, and therefore all items
none: 0 # Guarantees only that the game is beatable. You may not be able to access all locations or acquire all items
progressive: # Enable or disable progressive items (swords, shields, bow)
on: 50 # All items are progressive
off: 0 # No items are progressive
@ -95,8 +97,8 @@ entrance_shuffle:
crossed-group-myfriends: 0 # using this method, everyone with "group-myfriends" will share the same seed
goals:
ganon: 50 # Climb GT, defeat Agahnim 2, and then kill Ganon
fast_ganon: 0 # Only killing Ganon is required. However, items may still be placed in GT
dungeons: 0 # Defeat the boss of all dungeons, including Agahnim's tower and GT (Aga 2)
crystals: 0 # Only killing Ganon is required. However, items may still be placed in GT
bosses: 0 # Defeat the boss of all dungeons, including Agahnim's tower and GT (Aga 2)
pedestal: 0 # Pull the Triforce from the Master Sword pedestal
ganon_pedestal: 0 # Pull the Master Sword pedestal, then kill Ganon
triforce_hunt: 0 # Collect 20 of 30 Triforce pieces spread throughout the worlds, then turn them in to Murahadala in front of Hyrule Castle
@ -183,9 +185,6 @@ item_functionality:
normal: 50 # Vanilla item functionality
hard: 0 # Reduced helpfulness of items (potions less effective, can't catch faeries, cape uses double magic, byrna does not grant invulnerability, boomerangs do not stun, silvers disabled outside ganon)
expert: 0 # Vastly reduces the helpfulness of items (potions barely effective, can't catch faeries, cape uses double magic, byrna does not grant invulnerability, boomerangs and hookshot do not stun, silvers disabled outside ganon)
progression_balancing:
on: 50 # A system to reduce BK, as in times during which you can't do anything by moving your items into an earlier access sphere to make it likely you have stuff to do
off: 0 # Turn this off if you don't mind a longer multiworld, or can glitch around missing items.
tile_shuffle: # Randomize the tile layouts in flying tile rooms
on: 0
off: 50
@ -202,9 +201,9 @@ turtle_rock_medallion: # required medallion to open Turtle Rock front entrance
### Enemizer Section ###
boss_shuffle:
none: 50 # Vanilla bosses
simple: 0 # Existing bosses except Ganon and Agahnim are shuffled throughout dungeons
full: 0 # 3 bosses can occur twice
random: 0 # Any boss can appear any amount of times
basic: 0 # Existing bosses except Ganon and Agahnim are shuffled throughout dungeons
random: 0 # 3 bosses can occur twice
chaos: 0 # Any boss can appear any amount of times
singularity: 0 # Picks a boss, tries to put it everywhere that works, if there's spaces remaining it picks a boss to fill those
enemy_shuffle: # Randomize enemy placement
on: 0
@ -294,6 +293,14 @@ green_clock_time: # For all timer modes, the amount of time in minutes to gain o
glitch_boots:
on: 50 # Start with Pegasus Boots in any glitched logic mode that makes use of them
off: 0
# meta_ignore, linked_options and triggers work for any game
meta_ignore: # Nullify options specified in the meta.yaml file. Adding an option here guarantees it will not occur in your seed, even if the .yaml file specifies it
mode:
- inverted # Never play inverted seeds
retro:
- on # Never play retro seeds
weapons:
- swordless # Never play a swordless seed
linked_options:
- name: crosskeys
options: # These overwrite earlier options if the percentage chance triggers
@ -336,7 +343,7 @@ triggers:
percentage: 0 # AND has a 0 percent chance (meaning this is default disabled, just to show how it works)
options: # then inserts these options
swords: assured
### door rando only options ###
### door rando only options (not supported at all yet on this branch) ###
door_shuffle: # Only available if the host uses the doors branch, it is ignored otherwise
vanilla: 50 # Everything should be like in vanilla
basic: 0 # Dungeons are shuffled within themselves

View File

@ -3,7 +3,8 @@ websockets>=8.1
PyYAML>=5.4.1
fuzzywuzzy>=0.18.0
bsdiff4>=1.2.0
prompt_toolkit>=3.0.16
prompt_toolkit>=3.0.18
appdirs>=1.4.4
maseya-z3pr>=1.0.0rc1
xxtea>=2.0.0.post0
factorio-rcon-py>=1.2.1

View File

@ -54,12 +54,12 @@ def manifest_creation():
print("Created Manifest")
scripts = {"MultiClient.py": "ArchipelagoLttPClient",
scripts = {"LttPClient.py": "ArchipelagoLttPClient",
"MultiMystery.py": "ArchipelagoMultiMystery",
"MultiServer.py": "ArchipelagoServer",
"gui.py": "ArchipelagoLttPCreator",
"Mystery.py": "ArchipelagoMystery",
"Adjuster.py": "ArchipelagoLttPAdjuster"}
"LttPAdjuster.py": "ArchipelagoLttPAdjuster"}
exes = []

View File

@ -7,20 +7,26 @@ __all__ = {"lookup_any_item_id_to_name",
from .alttp.Items import lookup_id_to_name as alttp
from .hk.Items import lookup_id_to_name as hk
lookup_any_item_id_to_name = {**alttp, **hk}
from .factorio import Technologies
lookup_any_item_id_to_name = {**alttp, **hk, **Technologies.lookup_id_to_name}
lookup_any_item_name_to_id = {name: id for id, name in lookup_any_item_id_to_name.items()}
from .alttp import Regions
from .hk import Locations
lookup_any_location_id_to_name = {**Regions.lookup_id_to_name, **Locations.lookup_id_to_name}
lookup_any_location_id_to_name = {**Regions.lookup_id_to_name, **Locations.lookup_id_to_name,
**Technologies.lookup_id_to_name}
lookup_any_location_name_to_id = {name: id for id, name in lookup_any_location_id_to_name.items()}
network_data_package = {"lookup_any_location_id_to_name": lookup_any_location_id_to_name,
"lookup_any_item_id_to_name": lookup_any_item_id_to_name,
"version": 1}
"version": 2}
@enum.unique
class Games(str, enum.Enum):
HK = "Hollow Knight"
LTTP = "A Link to the Past"
Factorio = "Factorio"

58
worlds/factorio/Mod.py Normal file
View File

@ -0,0 +1,58 @@
"""Outputs a Factorio Mod to facilitate integration with Archipelago"""
import os
from typing import Optional
import threading
import json
import jinja2
import Utils
import shutil
from BaseClasses import MultiWorld
from .Technologies import tech_table
template: Optional[jinja2.Template] = None
locale_template: Optional[jinja2.Template] = None
template_load_lock = threading.Lock()
base_info = {
"version": Utils.__version__,
"title": "Archipelago",
"author": "Berserker",
"homepage": "https://archipelago.gg",
"description": "Integration client for the Archipelago Randomizer",
"factorio_version": "1.1"
}
def generate_mod(world: MultiWorld, player: int):
global template, locale_template
with template_load_lock:
if not template:
template = jinja2.Template(open(Utils.local_path("data", "factorio", "mod_template", "data-final-fixes.lua")).read())
locale_template = jinja2.Template(open(Utils.local_path("data", "factorio", "mod_template", "locale", "en", "locale.cfg")).read())
# get data for templates
player_names = {x: world.player_names[x][0] for x in world.player_ids}
locations = []
for location in world.get_filled_locations(player):
if not location.name.startswith("recipe-"): # introduce this a new location property?
locations.append((location.name, location.item.name, location.item.player))
mod_name = f"archipelago-client-{world.seed}-{player}"
template_data = {"locations": locations, "player_names" : player_names, "tech_table": tech_table,
"mod_name": mod_name}
mod_code = template.render(**template_data)
mod_dir = Utils.output_path(mod_name)
en_locale_dir = os.path.join(mod_dir, "locale", "en")
os.makedirs(en_locale_dir, exist_ok=True)
shutil.copytree(Utils.local_path("data", "factorio", "mod"), mod_dir, dirs_exist_ok=True)
with open(os.path.join(mod_dir, "data-final-fixes.lua"), "wt") as f:
f.write(mod_code)
locale_content = locale_template.render(**template_data)
with open(os.path.join(en_locale_dir, "locale.cfg"), "wt") as f:
f.write(locale_content)
info = base_info.copy()
info["name"] = mod_name
with open(os.path.join(mod_dir, "info.json"), "wt") as f:
json.dump(info, f, indent=4)

View File

@ -0,0 +1,46 @@
# Factorio technologies are imported from a .json document in /data
from typing import Dict
import os
import json
import Utils
factorio_id = 2**17
source_file = Utils.local_path("data", "factorio", "techs.json")
with open(source_file) as f:
raw = json.load(f)
tech_table = {}
requirements = {}
ingredients = {}
all_ingredients = set()
# TODO: export this dynamically, or filter it during export
starting_ingredient_recipes = {"automation-science-pack"}
# recipes and technologies can share names in Factorio
for technology in sorted(raw):
data = raw[technology]
tech_table[technology] = factorio_id
factorio_id += 1
if data["requires"]:
requirements[technology] = set(data["requires"])
current_ingredients = set(data["ingredients"])-starting_ingredient_recipes
if current_ingredients:
all_ingredients |= current_ingredients
current_ingredients = {"recipe-"+ingredient for ingredient in current_ingredients}
ingredients[technology] = current_ingredients
recipe_sources = {}
for technology, data in raw.items():
recipe_source = all_ingredients & set(data["unlocks"])
for recipe in recipe_source:
recipe_sources["recipe-"+recipe] = technology
all_ingredients = {"recipe-"+ingredient for ingredient in all_ingredients}
del(raw)
lookup_id_to_name: Dict[int, str] = {item_id: item_name for item_name, item_id in tech_table.items()}

View File

@ -0,0 +1,59 @@
import logging
from BaseClasses import Region, Entrance, Location, MultiWorld, Item
from .Technologies import tech_table, requirements, ingredients, all_ingredients, recipe_sources
static_nodes = {"automation", "logistics"}
def gen_factorio(world: MultiWorld, player: int):
for tech_name, tech_id in tech_table.items():
tech_item = Item(tech_name, True, tech_id, player)
tech_item.game = "Factorio"
if tech_name in static_nodes:
loc = world.get_location(tech_name, player)
loc.item = tech_item
loc.locked = loc.event = True
else:
world.itempool.append(tech_item)
set_rules(world, player)
def factorio_create_regions(world: MultiWorld, player: int):
menu = Region("Menu", None, "Menu", player)
crash = Entrance(player, "Crash Land", menu)
menu.exits.append(crash)
nauvis = Region("Nauvis", None, "Nauvis", player)
nauvis.world = menu.world = world
for tech_name, tech_id in tech_table.items():
tech = Location(player, tech_name, tech_id, nauvis)
nauvis.locations.append(tech)
tech.game = "Factorio"
for ingredient in all_ingredients: # register science packs as events
ingredient_location = Location(player, ingredient, 0, nauvis)
ingredient_location.item = Item(ingredient, True, 0, player)
ingredient_location.event = ingredient_location.locked = True
menu.locations.append(ingredient_location)
crash.connect(nauvis)
world.regions += [menu, nauvis]
def set_rules(world: MultiWorld, player: int):
if world.logic[player] != 'nologic':
from worlds.generic import Rules
for tech_name in tech_table:
# vanilla layout, to be implemented
# rules = requirements.get(tech_name, set()) | ingredients.get(tech_name, set())
# loose nodes
rules = ingredients.get(tech_name, set())
if rules:
location = world.get_location(tech_name, player)
Rules.set_rule(location, lambda state, rules=rules: all(state.has(rule, player) for rule in rules))
for recipe, technology in recipe_sources.items():
Rules.set_rule(world.get_location(recipe, player), lambda state, tech=technology: state.has(tech, player))
world.completion_condition[player] = lambda state: all(state.has(ingredient, player)
for ingredient in all_ingredients)

View File

@ -24,12 +24,14 @@ def create_region(world: MultiWorld, player: int, name: str, locations=None, exi
return ret
class HKLocation(Location):
game: str = "Hollow Knight"
def __init__(self, player: int, name: str, address=None, parent=None):
super(HKLocation, self).__init__(player, name, address, parent)
class HKItem(Item):
game = "Hollow Knight"
@ -44,11 +46,10 @@ def gen_hollow(world: MultiWorld, player: int):
set_rules(world, player)
def link_regions(world: MultiWorld, player: int):
world.get_entrance('Hollow Nest S&Q', player).connect(world.get_region('Hollow Nest', player))
not_shufflable_types = {"Essence_Boss"}
option_to_type_lookup = {