953 lines
38 KiB
Python
953 lines
38 KiB
Python
from __future__ import annotations
|
|
|
|
import multiprocessing
|
|
import logging
|
|
import asyncio
|
|
import os.path
|
|
|
|
import nest_asyncio
|
|
import sc2
|
|
|
|
from sc2.main import run_game
|
|
from sc2.data import Race
|
|
from sc2.bot_ai import BotAI
|
|
from sc2.player import Bot
|
|
|
|
from worlds.sc2wol.Regions import MissionInfo
|
|
from worlds.sc2wol.MissionTables import lookup_id_to_mission
|
|
from worlds.sc2wol.Items import lookup_id_to_name, item_table
|
|
from worlds.sc2wol.Locations import SC2WOL_LOC_ID_OFFSET
|
|
from worlds.sc2wol import SC2WoLWorld
|
|
|
|
from pathlib import Path
|
|
import re
|
|
from MultiServer import mark_raw
|
|
import ctypes
|
|
import sys
|
|
|
|
from Utils import init_logging, is_windows
|
|
|
|
if __name__ == "__main__":
|
|
init_logging("SC2Client", exception_logger="Client")
|
|
|
|
logger = logging.getLogger("Client")
|
|
sc2_logger = logging.getLogger("Starcraft2")
|
|
|
|
import colorama
|
|
|
|
from NetUtils import *
|
|
from CommonClient import CommonContext, server_loop, ClientCommandProcessor, gui_enabled, get_base_parser
|
|
|
|
nest_asyncio.apply()
|
|
|
|
|
|
class StarcraftClientProcessor(ClientCommandProcessor):
|
|
ctx: SC2Context
|
|
|
|
def _cmd_difficulty(self, difficulty: str = "") -> bool:
|
|
"""Overrides the current difficulty set for the seed. Takes the argument casual, normal, hard, or brutal"""
|
|
options = difficulty.split()
|
|
num_options = len(options)
|
|
difficulty_choice = options[0].lower()
|
|
|
|
if num_options > 0:
|
|
if difficulty_choice == "casual":
|
|
self.ctx.difficulty_override = 0
|
|
elif difficulty_choice == "normal":
|
|
self.ctx.difficulty_override = 1
|
|
elif difficulty_choice == "hard":
|
|
self.ctx.difficulty_override = 2
|
|
elif difficulty_choice == "brutal":
|
|
self.ctx.difficulty_override = 3
|
|
else:
|
|
self.output("Unable to parse difficulty '" + options[0] + "'")
|
|
return False
|
|
|
|
self.output("Difficulty set to " + options[0])
|
|
return True
|
|
|
|
else:
|
|
self.output("Difficulty needs to be specified in the command.")
|
|
return False
|
|
|
|
def _cmd_disable_mission_check(self) -> bool:
|
|
"""Disables the check to see if a mission is available to play. Meant for co-op runs where one player can play
|
|
the next mission in a chain the other player is doing."""
|
|
self.ctx.missions_unlocked = True
|
|
sc2_logger.info("Mission check has been disabled")
|
|
return True
|
|
|
|
def _cmd_play(self, mission_id: str = "") -> bool:
|
|
"""Start a Starcraft 2 mission"""
|
|
|
|
options = mission_id.split()
|
|
num_options = len(options)
|
|
|
|
if num_options > 0:
|
|
mission_number = int(options[0])
|
|
|
|
self.ctx.play_mission(mission_number)
|
|
|
|
else:
|
|
sc2_logger.info(
|
|
"Mission ID needs to be specified. Use /unfinished or /available to view ids for available missions.")
|
|
return False
|
|
|
|
return True
|
|
|
|
def _cmd_available(self) -> bool:
|
|
"""Get what missions are currently available to play"""
|
|
|
|
request_available_missions(self.ctx.checked_locations, self.ctx.mission_req_table, self.ctx.ui)
|
|
return True
|
|
|
|
def _cmd_unfinished(self) -> bool:
|
|
"""Get what missions are currently available to play and have not had all locations checked"""
|
|
|
|
request_unfinished_missions(self.ctx.checked_locations, self.ctx.mission_req_table, self.ctx.ui, self.ctx)
|
|
return True
|
|
|
|
@mark_raw
|
|
def _cmd_set_path(self, path: str = '') -> bool:
|
|
"""Manually set the SC2 install directory (if the automatic detection fails)."""
|
|
if path:
|
|
os.environ["SC2PATH"] = path
|
|
check_mod_install()
|
|
return True
|
|
else:
|
|
sc2_logger.warning("When using set_path, you must type the path to your SC2 install directory.")
|
|
return False
|
|
|
|
|
|
class SC2Context(CommonContext):
|
|
command_processor = StarcraftClientProcessor
|
|
game = "Starcraft 2 Wings of Liberty"
|
|
items_handling = 0b111
|
|
difficulty = -1
|
|
all_in_choice = 0
|
|
mission_req_table = None
|
|
items_rec_to_announce = []
|
|
rec_announce_pos = 0
|
|
items_sent_to_announce = []
|
|
sent_announce_pos = 0
|
|
announcements = []
|
|
announcement_pos = 0
|
|
sc2_run_task: typing.Optional[asyncio.Task] = None
|
|
missions_unlocked = False
|
|
current_tooltip = None
|
|
last_loc_list = None
|
|
difficulty_override = -1
|
|
|
|
async def server_auth(self, password_requested: bool = False):
|
|
if password_requested and not self.password:
|
|
await super(SC2Context, self).server_auth(password_requested)
|
|
await self.get_username()
|
|
await self.send_connect()
|
|
|
|
def on_package(self, cmd: str, args: dict):
|
|
if cmd in {"Connected"}:
|
|
self.difficulty = args["slot_data"]["game_difficulty"]
|
|
self.all_in_choice = args["slot_data"]["all_in_map"]
|
|
slot_req_table = args["slot_data"]["mission_req"]
|
|
self.mission_req_table = {}
|
|
# Compatibility for 0.3.2 server data.
|
|
if "category" not in next(iter(slot_req_table)):
|
|
for i, mission_data in enumerate(slot_req_table.values()):
|
|
mission_data["category"] = wol_default_categories[i]
|
|
for mission in slot_req_table:
|
|
self.mission_req_table[mission] = MissionInfo(**slot_req_table[mission])
|
|
|
|
# Look for and set SC2PATH.
|
|
# check_game_install_path() returns True if and only if it finds + sets SC2PATH.
|
|
if "SC2PATH" not in os.environ and check_game_install_path():
|
|
check_mod_install()
|
|
|
|
if cmd in {"PrintJSON"}:
|
|
if "receiving" in args:
|
|
if self.slot_concerns_self(args["receiving"]):
|
|
self.announcements.append(args["data"])
|
|
return
|
|
if "item" in args:
|
|
if self.slot_concerns_self(args["item"].player):
|
|
self.announcements.append(args["data"])
|
|
|
|
def run_gui(self):
|
|
from kvui import GameManager, HoverBehavior, ServerToolTip, fade_in_animation
|
|
from kivy.app import App
|
|
from kivy.clock import Clock
|
|
from kivy.uix.tabbedpanel import TabbedPanelItem
|
|
from kivy.uix.gridlayout import GridLayout
|
|
from kivy.lang import Builder
|
|
from kivy.uix.label import Label
|
|
from kivy.uix.button import Button
|
|
from kivy.uix.floatlayout import FloatLayout
|
|
from kivy.properties import StringProperty
|
|
|
|
import Utils
|
|
|
|
class HoverableButton(HoverBehavior, Button):
|
|
pass
|
|
|
|
class MissionButton(HoverableButton):
|
|
tooltip_text = StringProperty("Test")
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(HoverableButton, self).__init__(*args, **kwargs)
|
|
self.layout = FloatLayout()
|
|
self.popuplabel = ServerToolTip(text=self.text)
|
|
self.layout.add_widget(self.popuplabel)
|
|
|
|
def on_enter(self):
|
|
self.popuplabel.text = self.tooltip_text
|
|
|
|
if self.ctx.current_tooltip:
|
|
App.get_running_app().root.remove_widget(self.ctx.current_tooltip)
|
|
|
|
if self.tooltip_text == "":
|
|
self.ctx.current_tooltip = None
|
|
else:
|
|
App.get_running_app().root.add_widget(self.layout)
|
|
self.ctx.current_tooltip = self.layout
|
|
|
|
def on_leave(self):
|
|
if self.ctx.current_tooltip:
|
|
App.get_running_app().root.remove_widget(self.ctx.current_tooltip)
|
|
|
|
self.ctx.current_tooltip = None
|
|
|
|
@property
|
|
def ctx(self) -> CommonContext:
|
|
return App.get_running_app().ctx
|
|
|
|
class MissionLayout(GridLayout):
|
|
pass
|
|
|
|
class MissionCategory(GridLayout):
|
|
pass
|
|
|
|
class SC2Manager(GameManager):
|
|
logging_pairs = [
|
|
("Client", "Archipelago"),
|
|
("Starcraft2", "Starcraft2"),
|
|
]
|
|
base_title = "Archipelago Starcraft 2 Client"
|
|
|
|
mission_panel = None
|
|
last_checked_locations = {}
|
|
mission_id_to_button = {}
|
|
launching = False
|
|
refresh_from_launching = True
|
|
first_check = True
|
|
|
|
def __init__(self, ctx):
|
|
super().__init__(ctx)
|
|
|
|
def build(self):
|
|
container = super().build()
|
|
|
|
panel = TabbedPanelItem(text="Starcraft 2 Launcher")
|
|
self.mission_panel = panel.content = MissionLayout()
|
|
|
|
self.tabs.add_widget(panel)
|
|
|
|
Clock.schedule_interval(self.build_mission_table, 0.5)
|
|
|
|
return container
|
|
|
|
def build_mission_table(self, dt):
|
|
if (not self.launching and (not self.last_checked_locations == self.ctx.checked_locations or
|
|
not self.refresh_from_launching)) or self.first_check:
|
|
self.refresh_from_launching = True
|
|
|
|
self.mission_panel.clear_widgets()
|
|
|
|
if self.ctx.mission_req_table:
|
|
self.last_checked_locations = self.ctx.checked_locations.copy()
|
|
self.first_check = False
|
|
|
|
self.mission_id_to_button = {}
|
|
categories = {}
|
|
available_missions = []
|
|
unfinished_locations = initialize_blank_mission_dict(self.ctx.mission_req_table)
|
|
unfinished_missions = calc_unfinished_missions(self.ctx.checked_locations,
|
|
self.ctx.mission_req_table,
|
|
self.ctx, available_missions=available_missions,
|
|
unfinished_locations=unfinished_locations)
|
|
|
|
# separate missions into categories
|
|
for mission in self.ctx.mission_req_table:
|
|
if not self.ctx.mission_req_table[mission].category in categories:
|
|
categories[self.ctx.mission_req_table[mission].category] = []
|
|
|
|
categories[self.ctx.mission_req_table[mission].category].append(mission)
|
|
|
|
for category in categories:
|
|
category_panel = MissionCategory()
|
|
category_panel.add_widget(Label(text=category, size_hint_y=None, height=50, outline_width=1))
|
|
|
|
# Map is completed
|
|
for mission in categories[category]:
|
|
text = mission
|
|
tooltip = ""
|
|
|
|
# Map has uncollected locations
|
|
if mission in unfinished_missions:
|
|
text = f"[color=6495ED]{text}[/color]"
|
|
|
|
tooltip = f"Uncollected locations:\n"
|
|
tooltip += "\n".join(location for location in unfinished_locations[mission])
|
|
elif mission in available_missions:
|
|
text = f"[color=FFFFFF]{text}[/color]"
|
|
# Map requirements not met
|
|
else:
|
|
text = f"[color=a9a9a9]{text}[/color]"
|
|
tooltip = f"Requires: "
|
|
if len(self.ctx.mission_req_table[mission].required_world) > 0:
|
|
tooltip += ", ".join(list(self.ctx.mission_req_table)[req_mission-1] for
|
|
req_mission in
|
|
self.ctx.mission_req_table[mission].required_world)
|
|
|
|
if self.ctx.mission_req_table[mission].number > 0:
|
|
tooltip += " and "
|
|
if self.ctx.mission_req_table[mission].number > 0:
|
|
tooltip += f"{self.ctx.mission_req_table[mission].number} missions completed"
|
|
|
|
mission_button = MissionButton(text=text, size_hint_y=None, height=50)
|
|
mission_button.tooltip_text = tooltip
|
|
mission_button.bind(on_press=self.mission_callback)
|
|
self.mission_id_to_button[self.ctx.mission_req_table[mission].id] = mission_button
|
|
category_panel.add_widget(mission_button)
|
|
|
|
category_panel.add_widget(Label(text=""))
|
|
self.mission_panel.add_widget(category_panel)
|
|
|
|
elif self.launching:
|
|
self.refresh_from_launching = False
|
|
|
|
self.mission_panel.clear_widgets()
|
|
self.mission_panel.add_widget(Label(text="Launching Mission"))
|
|
|
|
def mission_callback(self, button):
|
|
if not self.launching:
|
|
self.ctx.play_mission(list(self.mission_id_to_button.keys())
|
|
[list(self.mission_id_to_button.values()).index(button)])
|
|
self.launching = True
|
|
Clock.schedule_once(self.finish_launching, 10)
|
|
|
|
def finish_launching(self, dt):
|
|
self.launching = False
|
|
|
|
self.ui = SC2Manager(self)
|
|
self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
|
|
|
|
Builder.load_file(Utils.local_path(os.path.dirname(SC2WoLWorld.__file__), "Starcraft2.kv"))
|
|
|
|
async def shutdown(self):
|
|
await super(SC2Context, self).shutdown()
|
|
if self.sc2_run_task:
|
|
self.sc2_run_task.cancel()
|
|
|
|
def play_mission(self, mission_id):
|
|
if self.missions_unlocked or \
|
|
is_mission_available(mission_id, self.checked_locations, self.mission_req_table):
|
|
if self.sc2_run_task:
|
|
if not self.sc2_run_task.done():
|
|
sc2_logger.warning("Starcraft 2 Client is still running!")
|
|
self.sc2_run_task.cancel() # doesn't actually close the game, just stops the python task
|
|
if self.slot is None:
|
|
sc2_logger.warning("Launching Mission without Archipelago authentication, "
|
|
"checks will not be registered to server.")
|
|
self.sc2_run_task = asyncio.create_task(starcraft_launch(self, mission_id),
|
|
name="Starcraft 2 Launch")
|
|
else:
|
|
sc2_logger.info(
|
|
f"{lookup_id_to_mission[mission_id]} is not currently unlocked. "
|
|
f"Use /unfinished or /available to see what is available.")
|
|
|
|
|
|
async def main():
|
|
multiprocessing.freeze_support()
|
|
parser = get_base_parser()
|
|
parser.add_argument('--name', default=None, help="Slot Name to connect as.")
|
|
args = parser.parse_args()
|
|
|
|
ctx = SC2Context(args.connect, args.password)
|
|
ctx.auth = args.name
|
|
if ctx.server_task is None:
|
|
ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
|
|
|
|
if gui_enabled:
|
|
ctx.run_gui()
|
|
ctx.run_cli()
|
|
|
|
await ctx.exit_event.wait()
|
|
|
|
await ctx.shutdown()
|
|
|
|
|
|
maps_table = [
|
|
"ap_traynor01", "ap_traynor02", "ap_traynor03",
|
|
"ap_thanson01", "ap_thanson02", "ap_thanson03a", "ap_thanson03b",
|
|
"ap_ttychus01", "ap_ttychus02", "ap_ttychus03", "ap_ttychus04", "ap_ttychus05",
|
|
"ap_ttosh01", "ap_ttosh02", "ap_ttosh03a", "ap_ttosh03b",
|
|
"ap_thorner01", "ap_thorner02", "ap_thorner03", "ap_thorner04", "ap_thorner05s",
|
|
"ap_tzeratul01", "ap_tzeratul02", "ap_tzeratul03", "ap_tzeratul04",
|
|
"ap_tvalerian01", "ap_tvalerian02a", "ap_tvalerian02b", "ap_tvalerian03"
|
|
]
|
|
|
|
wol_default_categories = [
|
|
"Mar Sara", "Mar Sara", "Mar Sara", "Colonist", "Colonist", "Colonist", "Colonist",
|
|
"Artifact", "Artifact", "Artifact", "Artifact", "Artifact", "Covert", "Covert", "Covert", "Covert",
|
|
"Rebellion", "Rebellion", "Rebellion", "Rebellion", "Rebellion", "Prophecy", "Prophecy", "Prophecy", "Prophecy",
|
|
"Char", "Char", "Char", "Char"
|
|
]
|
|
|
|
|
|
def calculate_items(items):
|
|
unit_unlocks = 0
|
|
armory1_unlocks = 0
|
|
armory2_unlocks = 0
|
|
upgrade_unlocks = 0
|
|
building_unlocks = 0
|
|
merc_unlocks = 0
|
|
lab_unlocks = 0
|
|
protoss_unlock = 0
|
|
minerals = 0
|
|
vespene = 0
|
|
supply = 0
|
|
|
|
for item in items:
|
|
data = lookup_id_to_name[item.item]
|
|
|
|
if item_table[data].type == "Unit":
|
|
unit_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Upgrade":
|
|
upgrade_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Armory 1":
|
|
armory1_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Armory 2":
|
|
armory2_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Building":
|
|
building_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Mercenary":
|
|
merc_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Laboratory":
|
|
lab_unlocks += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Protoss":
|
|
protoss_unlock += (1 << item_table[data].number)
|
|
elif item_table[data].type == "Minerals":
|
|
minerals += item_table[data].number
|
|
elif item_table[data].type == "Vespene":
|
|
vespene += item_table[data].number
|
|
elif item_table[data].type == "Supply":
|
|
supply += item_table[data].number
|
|
|
|
return [unit_unlocks, upgrade_unlocks, armory1_unlocks, armory2_unlocks, building_unlocks, merc_unlocks,
|
|
lab_unlocks, protoss_unlock, minerals, vespene, supply]
|
|
|
|
|
|
def calc_difficulty(difficulty):
|
|
if difficulty == 0:
|
|
return 'C'
|
|
elif difficulty == 1:
|
|
return 'N'
|
|
elif difficulty == 2:
|
|
return 'H'
|
|
elif difficulty == 3:
|
|
return 'B'
|
|
|
|
return 'X'
|
|
|
|
|
|
async def starcraft_launch(ctx: SC2Context, mission_id):
|
|
ctx.rec_announce_pos = len(ctx.items_rec_to_announce)
|
|
ctx.sent_announce_pos = len(ctx.items_sent_to_announce)
|
|
ctx.announcements_pos = len(ctx.announcements)
|
|
|
|
sc2_logger.info(f"Launching {lookup_id_to_mission[mission_id]}. If game does not launch check log file for errors.")
|
|
|
|
with DllDirectory(None):
|
|
run_game(sc2.maps.get(maps_table[mission_id - 1]), [Bot(Race.Terran, ArchipelagoBot(ctx, mission_id),
|
|
name="Archipelago", fullscreen=True)], realtime=True)
|
|
|
|
|
|
class ArchipelagoBot(sc2.bot_ai.BotAI):
|
|
game_running = False
|
|
mission_completed = False
|
|
first_bonus = False
|
|
second_bonus = False
|
|
third_bonus = False
|
|
fourth_bonus = False
|
|
fifth_bonus = False
|
|
sixth_bonus = False
|
|
seventh_bonus = False
|
|
eight_bonus = False
|
|
ctx: SC2Context = None
|
|
mission_id = 0
|
|
|
|
can_read_game = False
|
|
|
|
last_received_update = 0
|
|
|
|
def __init__(self, ctx: SC2Context, mission_id):
|
|
self.ctx = ctx
|
|
self.mission_id = mission_id
|
|
|
|
super(ArchipelagoBot, self).__init__()
|
|
|
|
async def on_step(self, iteration: int):
|
|
game_state = 0
|
|
if iteration == 0:
|
|
start_items = calculate_items(self.ctx.items_received)
|
|
if self.ctx.difficulty_override >= 0:
|
|
difficulty = calc_difficulty(self.ctx.difficulty_override)
|
|
else:
|
|
difficulty = calc_difficulty(self.ctx.difficulty)
|
|
await self.chat_send("ArchipelagoLoad {} {} {} {} {} {} {} {} {} {} {} {} {}".format(
|
|
difficulty,
|
|
start_items[0], start_items[1], start_items[2], start_items[3], start_items[4],
|
|
start_items[5], start_items[6], start_items[7], start_items[8], start_items[9],
|
|
self.ctx.all_in_choice, start_items[10]))
|
|
self.last_received_update = len(self.ctx.items_received)
|
|
|
|
else:
|
|
if self.ctx.announcement_pos < len(self.ctx.announcements):
|
|
index = 0
|
|
message = ""
|
|
while index < len(self.ctx.announcements[self.ctx.announcement_pos]):
|
|
message += self.ctx.announcements[self.ctx.announcement_pos][index]["text"]
|
|
index += 1
|
|
|
|
index = 0
|
|
start_rem_pos = -1
|
|
# Remove unneeded [Color] tags
|
|
while index < len(message):
|
|
if message[index] == '[':
|
|
start_rem_pos = index
|
|
index += 1
|
|
elif message[index] == ']' and start_rem_pos > -1:
|
|
temp_msg = ""
|
|
|
|
if start_rem_pos > 0:
|
|
temp_msg = message[:start_rem_pos]
|
|
if index < len(message) - 1:
|
|
temp_msg += message[index + 1:]
|
|
|
|
message = temp_msg
|
|
index += start_rem_pos - index
|
|
start_rem_pos = -1
|
|
else:
|
|
index += 1
|
|
|
|
await self.chat_send("SendMessage " + message)
|
|
self.ctx.announcement_pos += 1
|
|
|
|
# Archipelago reads the health
|
|
for unit in self.all_own_units():
|
|
if unit.health_max == 38281:
|
|
game_state = int(38281 - unit.health)
|
|
self.can_read_game = True
|
|
|
|
if iteration == 160 and not game_state & 1:
|
|
await self.chat_send("SendMessage Warning: Archipelago unable to connect or has lost connection to " +
|
|
"Starcraft 2 (This is likely a map issue)")
|
|
|
|
if self.last_received_update < len(self.ctx.items_received):
|
|
current_items = calculate_items(self.ctx.items_received)
|
|
await self.chat_send("UpdateTech {} {} {} {} {} {} {} {}".format(
|
|
current_items[0], current_items[1], current_items[2], current_items[3], current_items[4],
|
|
current_items[5], current_items[6], current_items[7]))
|
|
self.last_received_update = len(self.ctx.items_received)
|
|
|
|
if game_state & 1:
|
|
if not self.game_running:
|
|
print("Archipelago Connected")
|
|
self.game_running = True
|
|
|
|
if self.can_read_game:
|
|
if game_state & (1 << 1) and not self.mission_completed:
|
|
if self.mission_id != 29:
|
|
print("Mission Completed")
|
|
await self.ctx.send_msgs([
|
|
{"cmd": 'LocationChecks', "locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id]}])
|
|
self.mission_completed = True
|
|
else:
|
|
print("Game Complete")
|
|
await self.ctx.send_msgs([{"cmd": 'StatusUpdate', "status": ClientStatus.CLIENT_GOAL}])
|
|
self.mission_completed = True
|
|
|
|
if game_state & (1 << 2) and not self.first_bonus:
|
|
print("1st Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 1]}])
|
|
self.first_bonus = True
|
|
|
|
if not self.second_bonus and game_state & (1 << 3):
|
|
print("2nd Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 2]}])
|
|
self.second_bonus = True
|
|
|
|
if not self.third_bonus and game_state & (1 << 4):
|
|
print("3rd Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 3]}])
|
|
self.third_bonus = True
|
|
|
|
if not self.fourth_bonus and game_state & (1 << 5):
|
|
print("4th Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 4]}])
|
|
self.fourth_bonus = True
|
|
|
|
if not self.fifth_bonus and game_state & (1 << 6):
|
|
print("5th Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 5]}])
|
|
self.fifth_bonus = True
|
|
|
|
if not self.sixth_bonus and game_state & (1 << 7):
|
|
print("6th Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 6]}])
|
|
self.sixth_bonus = True
|
|
|
|
if not self.seventh_bonus and game_state & (1 << 8):
|
|
print("6th Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 7]}])
|
|
self.seventh_bonus = True
|
|
|
|
if not self.eight_bonus and game_state & (1 << 9):
|
|
print("6th Bonus Collected")
|
|
await self.ctx.send_msgs(
|
|
[{"cmd": 'LocationChecks',
|
|
"locations": [SC2WOL_LOC_ID_OFFSET + 100 * self.mission_id + 8]}])
|
|
self.eight_bonus = True
|
|
|
|
else:
|
|
await self.chat_send("LostConnection - Lost connection to game.")
|
|
|
|
|
|
def calc_objectives_completed(mission, missions_info, locations_done, unfinished_locations, ctx):
|
|
objectives_complete = 0
|
|
|
|
if missions_info[mission].extra_locations > 0:
|
|
for i in range(missions_info[mission].extra_locations):
|
|
if (missions_info[mission].id * 100 + SC2WOL_LOC_ID_OFFSET + i) in locations_done:
|
|
objectives_complete += 1
|
|
else:
|
|
unfinished_locations[mission].append(ctx.location_names[
|
|
missions_info[mission].id * 100 + SC2WOL_LOC_ID_OFFSET + i])
|
|
|
|
return objectives_complete
|
|
|
|
else:
|
|
return -1
|
|
|
|
|
|
def request_unfinished_missions(locations_done, location_table, ui, ctx):
|
|
if location_table:
|
|
message = "Unfinished Missions: "
|
|
unlocks = initialize_blank_mission_dict(location_table)
|
|
unfinished_locations = initialize_blank_mission_dict(location_table)
|
|
|
|
unfinished_missions = calc_unfinished_missions(locations_done, location_table, ctx, unlocks=unlocks,
|
|
unfinished_locations=unfinished_locations)
|
|
|
|
message += ", ".join(f"{mark_up_mission_name(mission, location_table, ui,unlocks)}[{location_table[mission].id}] " +
|
|
mark_up_objectives(
|
|
f"[{unfinished_missions[mission]}/{location_table[mission].extra_locations}]",
|
|
ctx, unfinished_locations, mission)
|
|
for mission in unfinished_missions)
|
|
|
|
if ui:
|
|
ui.log_panels['All'].on_message_markup(message)
|
|
ui.log_panels['Starcraft2'].on_message_markup(message)
|
|
else:
|
|
sc2_logger.info(message)
|
|
else:
|
|
sc2_logger.warning("No mission table found, you are likely not connected to a server.")
|
|
|
|
|
|
def calc_unfinished_missions(locations_done, locations, ctx, unlocks=None, unfinished_locations=None,
|
|
available_missions=[]):
|
|
unfinished_missions = []
|
|
locations_completed = []
|
|
|
|
if not unlocks:
|
|
unlocks = initialize_blank_mission_dict(locations)
|
|
|
|
if not unfinished_locations:
|
|
unfinished_locations = initialize_blank_mission_dict(locations)
|
|
|
|
if len(available_missions) > 0:
|
|
available_missions = []
|
|
|
|
available_missions.extend(calc_available_missions(locations_done, locations, unlocks))
|
|
|
|
for name in available_missions:
|
|
if not locations[name].extra_locations == -1:
|
|
objectives_completed = calc_objectives_completed(name, locations, locations_done, unfinished_locations, ctx)
|
|
|
|
if objectives_completed < locations[name].extra_locations:
|
|
unfinished_missions.append(name)
|
|
locations_completed.append(objectives_completed)
|
|
|
|
else:
|
|
unfinished_missions.append(name)
|
|
locations_completed.append(-1)
|
|
|
|
return {unfinished_missions[i]: locations_completed[i] for i in range(len(unfinished_missions))}
|
|
|
|
|
|
def is_mission_available(mission_id_to_check, locations_done, locations):
|
|
unfinished_missions = calc_available_missions(locations_done, locations)
|
|
|
|
return any(mission_id_to_check == locations[mission].id for mission in unfinished_missions)
|
|
|
|
|
|
def mark_up_mission_name(mission, location_table, ui, unlock_table):
|
|
"""Checks if the mission is required for game completion and adds '*' to the name to mark that."""
|
|
|
|
if location_table[mission].completion_critical:
|
|
if ui:
|
|
message = "[color=AF99EF]" + mission + "[/color]"
|
|
else:
|
|
message = "*" + mission + "*"
|
|
else:
|
|
message = mission
|
|
|
|
if ui:
|
|
unlocks = unlock_table[mission]
|
|
|
|
if len(unlocks) > 0:
|
|
pre_message = f"[ref={list(location_table).index(mission)}|Unlocks: "
|
|
pre_message += ", ".join(f"{unlock}({location_table[unlock].id})" for unlock in unlocks)
|
|
pre_message += f"]"
|
|
message = pre_message + message + "[/ref]"
|
|
|
|
return message
|
|
|
|
|
|
def mark_up_objectives(message, ctx, unfinished_locations, mission):
|
|
formatted_message = message
|
|
|
|
if ctx.ui:
|
|
locations = unfinished_locations[mission]
|
|
|
|
pre_message = f"[ref={list(ctx.mission_req_table).index(mission)+30}|"
|
|
pre_message += "<br>".join(location for location in locations)
|
|
pre_message += f"]"
|
|
formatted_message = pre_message + message + "[/ref]"
|
|
|
|
return formatted_message
|
|
|
|
|
|
def request_available_missions(locations_done, location_table, ui):
|
|
if location_table:
|
|
message = "Available Missions: "
|
|
|
|
# Initialize mission unlock table
|
|
unlocks = initialize_blank_mission_dict(location_table)
|
|
|
|
missions = calc_available_missions(locations_done, location_table, unlocks)
|
|
message += \
|
|
", ".join(f"{mark_up_mission_name(mission, location_table, ui, unlocks)}[{location_table[mission].id}]"
|
|
for mission in missions)
|
|
|
|
if ui:
|
|
ui.log_panels['All'].on_message_markup(message)
|
|
ui.log_panels['Starcraft2'].on_message_markup(message)
|
|
else:
|
|
sc2_logger.info(message)
|
|
else:
|
|
sc2_logger.warning("No mission table found, you are likely not connected to a server.")
|
|
|
|
|
|
def calc_available_missions(locations_done, locations, unlocks=None):
|
|
available_missions = []
|
|
missions_complete = 0
|
|
|
|
# Get number of missions completed
|
|
for loc in locations_done:
|
|
if loc % 100 == 0:
|
|
missions_complete += 1
|
|
|
|
for name in locations:
|
|
# Go through the required missions for each mission and fill up unlock table used later for hover-over tooltips
|
|
if unlocks:
|
|
for unlock in locations[name].required_world:
|
|
unlocks[list(locations)[unlock-1]].append(name)
|
|
|
|
if mission_reqs_completed(name, missions_complete, locations_done, locations):
|
|
available_missions.append(name)
|
|
|
|
return available_missions
|
|
|
|
|
|
def mission_reqs_completed(location_to_check, missions_complete, locations_done, locations):
|
|
"""Returns a bool signifying if the mission has all requirements complete and can be done
|
|
|
|
Keyword arguments:
|
|
locations_to_check -- the mission string name to check
|
|
missions_complete -- an int of how many missions have been completed
|
|
locations_done -- a list of the location ids that have been complete
|
|
locations -- a dict of MissionInfo for mission requirements for this world"""
|
|
if len(locations[location_to_check].required_world) >= 1:
|
|
# A check for when the requirements are being or'd
|
|
or_success = False
|
|
|
|
# Loop through required missions
|
|
for req_mission in locations[location_to_check].required_world:
|
|
req_success = True
|
|
|
|
# Check if required mission has been completed
|
|
if not (locations[list(locations)[req_mission-1]].id * 100 + SC2WOL_LOC_ID_OFFSET) in locations_done:
|
|
if not locations[location_to_check].or_requirements:
|
|
return False
|
|
else:
|
|
req_success = False
|
|
|
|
# Recursively check required mission to see if it's requirements are met, in case !collect has been done
|
|
if not mission_reqs_completed(list(locations)[req_mission-1], missions_complete, locations_done,
|
|
locations):
|
|
if not locations[location_to_check].or_requirements:
|
|
return False
|
|
else:
|
|
req_success = False
|
|
|
|
# If requirement check succeeded mark or as satisfied
|
|
if locations[location_to_check].or_requirements and req_success:
|
|
or_success = True
|
|
|
|
if locations[location_to_check].or_requirements:
|
|
# Return false if or requirements not met
|
|
if not or_success:
|
|
return False
|
|
|
|
# Check number of missions
|
|
if missions_complete >= locations[location_to_check].number:
|
|
return True
|
|
else:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
|
|
def initialize_blank_mission_dict(location_table):
|
|
unlocks = {}
|
|
|
|
for mission in list(location_table):
|
|
unlocks[mission] = []
|
|
|
|
return unlocks
|
|
|
|
|
|
def check_game_install_path() -> bool:
|
|
# First thing: go to the default location for ExecuteInfo.
|
|
# An exception for Windows is included because it's very difficult to find ~\Documents if the user moved it.
|
|
if is_windows:
|
|
# The next five lines of utterly inscrutable code are brought to you by copy-paste from Stack Overflow.
|
|
# https://stackoverflow.com/questions/6227590/finding-the-users-my-documents-path/30924555#
|
|
import ctypes.wintypes
|
|
CSIDL_PERSONAL = 5 # My Documents
|
|
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
|
|
|
|
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
|
|
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
|
|
documentspath = buf.value
|
|
einfo = str(documentspath / Path("StarCraft II\\ExecuteInfo.txt"))
|
|
else:
|
|
einfo = str(sc2.paths.get_home() / Path(sc2.paths.USERPATH[sc2.paths.PF]))
|
|
|
|
# Check if the file exists.
|
|
if os.path.isfile(einfo):
|
|
|
|
# Open the file and read it, picking out the latest executable's path.
|
|
with open(einfo) as f:
|
|
content = f.read()
|
|
if content:
|
|
base = re.search(r" = (.*)Versions", content).group(1)
|
|
if os.path.exists(base):
|
|
executable = sc2.paths.latest_executeble(Path(base).expanduser() / "Versions")
|
|
|
|
# Finally, check the path for an actual executable.
|
|
# If we find one, great. Set up the SC2PATH.
|
|
if os.path.isfile(executable):
|
|
sc2_logger.info(f"Found an SC2 install at {base}!")
|
|
sc2_logger.debug(f"Latest executable at {executable}.")
|
|
os.environ["SC2PATH"] = base
|
|
sc2_logger.debug(f"SC2PATH set to {base}.")
|
|
return True
|
|
else:
|
|
sc2_logger.warning(f"We may have found an SC2 install at {base}, but couldn't find {executable}.")
|
|
else:
|
|
sc2_logger.warning(f"{einfo} pointed to {base}, but we could not find an SC2 install there.")
|
|
else:
|
|
sc2_logger.warning(f"Couldn't find {einfo}. Please run /set_path with your SC2 install directory.")
|
|
return False
|
|
|
|
|
|
def check_mod_install() -> bool:
|
|
# Pull up the SC2PATH if set. If not, encourage the user to manually run /set_path.
|
|
try:
|
|
# Check inside the Mods folder for Archipelago.SC2Mod. If found, tell user. If not, tell user.
|
|
if os.path.isfile(modfile := (os.environ["SC2PATH"] / Path("Mods") / Path("Archipelago.SC2Mod"))):
|
|
sc2_logger.info(f"Archipelago mod found at {modfile}.")
|
|
return True
|
|
else:
|
|
sc2_logger.warning(f"Archipelago mod could not be found at {modfile}. Please install the mod file there.")
|
|
except KeyError:
|
|
sc2_logger.warning(f"SC2PATH isn't set. Please run /set_path with the path to your SC2 install.")
|
|
return False
|
|
|
|
|
|
class DllDirectory:
|
|
# Credit to Black Sliver for this code.
|
|
# More info: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setdlldirectoryw
|
|
_old: typing.Optional[str] = None
|
|
_new: typing.Optional[str] = None
|
|
|
|
def __init__(self, new: typing.Optional[str]):
|
|
self._new = new
|
|
|
|
def __enter__(self):
|
|
old = self.get()
|
|
if self.set(self._new):
|
|
self._old = old
|
|
|
|
def __exit__(self, *args):
|
|
if self._old is not None:
|
|
self.set(self._old)
|
|
|
|
@staticmethod
|
|
def get() -> str:
|
|
if sys.platform == "win32":
|
|
n = ctypes.windll.kernel32.GetDllDirectoryW(0, None)
|
|
buf = ctypes.create_unicode_buffer(n)
|
|
ctypes.windll.kernel32.GetDllDirectoryW(n, buf)
|
|
return buf.value
|
|
# NOTE: other OS may support os.environ["LD_LIBRARY_PATH"], but this fix is windows-specific
|
|
return None
|
|
|
|
@staticmethod
|
|
def set(s: typing.Optional[str]) -> bool:
|
|
if sys.platform == "win32":
|
|
return ctypes.windll.kernel32.SetDllDirectoryW(s) != 0
|
|
# NOTE: other OS may support os.environ["LD_LIBRARY_PATH"], but this fix is windows-specific
|
|
return False
|
|
|
|
|
|
if __name__ == '__main__':
|
|
colorama.init()
|
|
asyncio.run(main())
|
|
colorama.deinit()
|