279 lines
13 KiB
Python
279 lines
13 KiB
Python
import base64
|
|
import logging
|
|
import time
|
|
|
|
from NetUtils import ClientStatus
|
|
from worlds._bizhawk.client import BizHawkClient
|
|
from worlds._bizhawk import read, write, guarded_write
|
|
|
|
from .locations import location_data
|
|
|
|
logger = logging.getLogger("Client")
|
|
|
|
BANK_EXCHANGE_RATE = 50000000
|
|
|
|
DATA_LOCATIONS = {
|
|
"ItemIndex": (0x1A6E, 0x02),
|
|
"Deathlink": (0x00FD, 0x01),
|
|
"APItem": (0x00FF, 0x01),
|
|
"EventFlag": (0x1735, 0x140),
|
|
"Missable": (0x161A, 0x20),
|
|
"Hidden": (0x16DE, 0x0E),
|
|
"Rod": (0x1716, 0x01),
|
|
"DexSanityFlag": (0x1A71, 19),
|
|
"GameStatus": (0x1A84, 0x01),
|
|
"Money": (0x141F, 3),
|
|
"ResetCheck": (0x0100, 4),
|
|
# First and second Vermilion Gym trash can selection. Second is not used, so should always be 0.
|
|
# First should never be above 0x0F. This is just before Event Flags.
|
|
"CrashCheck1": (0x1731, 2),
|
|
# Unused, should always be 0. This is just before Missables flags.
|
|
"CrashCheck2": (0x1617, 1),
|
|
# Progressive keys, should never be above 10. Just before Dexsanity flags.
|
|
"CrashCheck3": (0x1A70, 1),
|
|
# Route 18 script value. Should never be above 2. Just before Hidden items flags.
|
|
"CrashCheck4": (0x16DD, 1),
|
|
}
|
|
|
|
location_map = {"Rod": {}, "EventFlag": {}, "Missable": {}, "Hidden": {}, "list": {}, "DexSanityFlag": {}}
|
|
location_bytes_bits = {}
|
|
for location in location_data:
|
|
if location.ram_address is not None:
|
|
if type(location.ram_address) == list:
|
|
location_map[type(location.ram_address).__name__][(location.ram_address[0].flag, location.ram_address[1].flag)] = location.address
|
|
location_bytes_bits[location.address] = [{'byte': location.ram_address[0].byte, 'bit': location.ram_address[0].bit},
|
|
{'byte': location.ram_address[1].byte, 'bit': location.ram_address[1].bit}]
|
|
else:
|
|
location_map[type(location.ram_address).__name__][location.ram_address.flag] = location.address
|
|
location_bytes_bits[location.address] = {'byte': location.ram_address.byte, 'bit': location.ram_address.bit}
|
|
|
|
location_name_to_id = {location.name: location.address for location in location_data if location.type == "Item"
|
|
and location.address is not None}
|
|
|
|
|
|
class PokemonRBClient(BizHawkClient):
|
|
system = ("GB", "SGB")
|
|
patch_suffix = (".apred", ".apblue")
|
|
game = "Pokemon Red and Blue"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.auto_hints = set()
|
|
self.locations_array = None
|
|
self.disconnect_pending = False
|
|
self.set_deathlink = False
|
|
self.banking_command = None
|
|
self.game_state = False
|
|
self.last_death_link = 0
|
|
|
|
async def validate_rom(self, ctx):
|
|
game_name = await read(ctx.bizhawk_ctx, [(0x134, 12, "ROM")])
|
|
game_name = game_name[0].decode("ascii")
|
|
if game_name in ("POKEMON RED\00", "POKEMON BLUE"):
|
|
ctx.game = self.game
|
|
ctx.items_handling = 0b001
|
|
ctx.command_processor.commands["bank"] = cmd_bank
|
|
seed_name = await read(ctx.bizhawk_ctx, [(0xFFDB, 21, "ROM")])
|
|
ctx.seed_name = seed_name[0].split(b"\0")[0].decode("ascii")
|
|
self.set_deathlink = False
|
|
self.banking_command = None
|
|
self.locations_array = None
|
|
self.disconnect_pending = False
|
|
return True
|
|
return False
|
|
|
|
async def set_auth(self, ctx):
|
|
auth_name = await read(ctx.bizhawk_ctx, [(0xFFC6, 21, "ROM")])
|
|
if auth_name[0] == bytes([0] * 21):
|
|
# rom was patched before rom names implemented, use player name
|
|
auth_name = await read(ctx.bizhawk_ctx, [(0xFFF0, 16, "ROM")])
|
|
auth_name = auth_name[0].decode("ascii").split("\x00")[0]
|
|
else:
|
|
auth_name = base64.b64encode(auth_name[0]).decode()
|
|
ctx.auth = auth_name
|
|
|
|
async def game_watcher(self, ctx):
|
|
if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed:
|
|
return
|
|
|
|
data = await read(ctx.bizhawk_ctx, [(loc_data[0], loc_data[1], "WRAM")
|
|
for loc_data in DATA_LOCATIONS.values()])
|
|
data = {data_set_name: data_name for data_set_name, data_name in zip(DATA_LOCATIONS.keys(), data)}
|
|
|
|
if self.set_deathlink:
|
|
self.set_deathlink = False
|
|
await ctx.update_death_link(True)
|
|
|
|
if self.disconnect_pending:
|
|
self.disconnect_pending = False
|
|
await ctx.disconnect()
|
|
|
|
if data["GameStatus"][0] == 0 or data["ResetCheck"] == b'\xff\xff\xff\x7f':
|
|
# Do not handle anything before game save is loaded
|
|
self.game_state = False
|
|
return
|
|
elif (data["GameStatus"][0] not in (0x2A, 0xAC)
|
|
or data["CrashCheck1"][0] & 0xF0 or data["CrashCheck1"][1] & 0xFF
|
|
or data["CrashCheck2"][0]
|
|
or data["CrashCheck3"][0] > 10
|
|
or data["CrashCheck4"][0] > 2):
|
|
# Should mean game crashed
|
|
logger.warning("Pokémon Red/Blue game may have crashed. Disconnecting from server.")
|
|
self.game_state = False
|
|
await ctx.disconnect()
|
|
return
|
|
self.game_state = True
|
|
|
|
# SEND ITEMS TO CLIENT
|
|
|
|
if data["APItem"][0] == 0:
|
|
item_index = int.from_bytes(data["ItemIndex"], "little")
|
|
if len(ctx.items_received) > item_index:
|
|
item_code = ctx.items_received[item_index].item - 172000000
|
|
if item_code > 255:
|
|
item_code -= 256
|
|
await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["APItem"][0],
|
|
[item_code], "WRAM")])
|
|
|
|
# LOCATION CHECKS
|
|
|
|
locations = set()
|
|
|
|
for flag_type, loc_map in location_map.items():
|
|
for flag, loc_id in loc_map.items():
|
|
if flag_type == "list":
|
|
if (data["EventFlag"][location_bytes_bits[loc_id][0]['byte']] & 1 <<
|
|
location_bytes_bits[loc_id][0]['bit']
|
|
and data["Missable"][location_bytes_bits[loc_id][1]['byte']] & 1 <<
|
|
location_bytes_bits[loc_id][1]['bit']):
|
|
locations.add(loc_id)
|
|
elif data[flag_type][location_bytes_bits[loc_id]['byte']] & 1 << location_bytes_bits[loc_id]['bit']:
|
|
locations.add(loc_id)
|
|
|
|
if locations != self.locations_array:
|
|
if locations:
|
|
self.locations_array = locations
|
|
await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locations)}])
|
|
|
|
# AUTO HINTS
|
|
|
|
hints = []
|
|
if data["EventFlag"][280] & 16:
|
|
hints.append("Cerulean Bicycle Shop")
|
|
if data["EventFlag"][280] & 32:
|
|
hints.append("Route 2 Gate - Oak's Aide")
|
|
if data["EventFlag"][280] & 64:
|
|
hints.append("Route 11 Gate 2F - Oak's Aide")
|
|
if data["EventFlag"][280] & 128:
|
|
hints.append("Route 15 Gate 2F - Oak's Aide")
|
|
if data["EventFlag"][281] & 1:
|
|
hints += ["Celadon Prize Corner - Item Prize 1", "Celadon Prize Corner - Item Prize 2",
|
|
"Celadon Prize Corner - Item Prize 3"]
|
|
if (location_name_to_id["Fossil - Choice A"] in ctx.checked_locations and location_name_to_id[
|
|
"Fossil - Choice B"]
|
|
not in ctx.checked_locations):
|
|
hints.append("Fossil - Choice B")
|
|
elif (location_name_to_id["Fossil - Choice B"] in ctx.checked_locations and location_name_to_id[
|
|
"Fossil - Choice A"]
|
|
not in ctx.checked_locations):
|
|
hints.append("Fossil - Choice A")
|
|
hints = [
|
|
location_name_to_id[loc] for loc in hints if location_name_to_id[loc] not in self.auto_hints and
|
|
location_name_to_id[loc] in ctx.missing_locations and
|
|
location_name_to_id[loc] not in ctx.locations_checked
|
|
]
|
|
if hints:
|
|
await ctx.send_msgs([{"cmd": "LocationScouts", "locations": hints, "create_as_hint": 2}])
|
|
self.auto_hints.update(hints)
|
|
|
|
# DEATHLINK
|
|
|
|
if "DeathLink" in ctx.tags:
|
|
if data["Deathlink"][0] == 3:
|
|
await ctx.send_death(ctx.player_names[ctx.slot] + " is out of usable Pokémon! "
|
|
+ ctx.player_names[ctx.slot] + " blacked out!")
|
|
await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [0], "WRAM")])
|
|
self.last_death_link = ctx.last_death_link
|
|
elif ctx.last_death_link > self.last_death_link:
|
|
self.last_death_link = ctx.last_death_link
|
|
await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["Deathlink"][0], [1], "WRAM")])
|
|
|
|
# BANK
|
|
|
|
if self.banking_command:
|
|
original_money = data["Money"]
|
|
# Money is stored as binary-coded decimal.
|
|
money = int(original_money.hex())
|
|
if self.banking_command > money:
|
|
logger.warning(f"You do not have ${self.banking_command} to deposit!")
|
|
elif (-self.banking_command * BANK_EXCHANGE_RATE) > (ctx.stored_data[f"EnergyLink{ctx.team}"] or 0):
|
|
logger.warning("Not enough money in the EnergyLink storage!")
|
|
else:
|
|
if self.banking_command + money > 999999:
|
|
self.banking_command = 999999 - money
|
|
money = str(money - self.banking_command).zfill(6)
|
|
money = [int(money[:2], 16), int(money[2:4], 16), int(money[4:], 16)]
|
|
money_written = await guarded_write(ctx.bizhawk_ctx, [(0x141F, money, "WRAM")],
|
|
[(0x141F, original_money, "WRAM")])
|
|
if money_written:
|
|
if self.banking_command >= 0:
|
|
deposit = self.banking_command - int(self.banking_command / 4)
|
|
tax = self.banking_command - deposit
|
|
logger.info(f"Deposited ${deposit}, and charged a tax of ${tax}.")
|
|
self.banking_command = deposit
|
|
else:
|
|
logger.info(f"Withdrew ${-self.banking_command}.")
|
|
await ctx.send_msgs([{
|
|
"cmd": "Set", "key": f"EnergyLink{ctx.team}", "operations":
|
|
[{"operation": "add", "value": self.banking_command * BANK_EXCHANGE_RATE},
|
|
{"operation": "max", "value": 0}],
|
|
}])
|
|
self.banking_command = None
|
|
|
|
# VICTORY
|
|
|
|
if data["EventFlag"][280] & 1 and not ctx.finished_game:
|
|
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
|
|
ctx.finished_game = True
|
|
|
|
def on_package(self, ctx, cmd, args):
|
|
if cmd == 'Connected':
|
|
if 'death_link' in args['slot_data'] and args['slot_data']['death_link']:
|
|
self.set_deathlink = True
|
|
self.last_death_link = time.time()
|
|
ctx.set_notify(f"EnergyLink{ctx.team}")
|
|
elif cmd == 'RoomInfo':
|
|
if ctx.seed_name and ctx.seed_name != args["seed_name"]:
|
|
# CommonClient's on_package displays an error to the user in this case, but connection is not cancelled.
|
|
self.game_state = False
|
|
self.disconnect_pending = True
|
|
super().on_package(ctx, cmd, args)
|
|
|
|
|
|
def cmd_bank(self, cmd: str = "", amount: str = ""):
|
|
"""Deposit or withdraw money with the server's EnergyLink storage.
|
|
/bank - check server balance.
|
|
/bank deposit # - deposit money. One quarter of the amount will be lost to taxation.
|
|
/bank withdraw # - withdraw money."""
|
|
if self.ctx.game != "Pokemon Red and Blue":
|
|
logger.warning("This command can only be used while playing Pokémon Red and Blue")
|
|
return
|
|
if (not self.ctx.server) or self.ctx.server.socket.closed or not self.ctx.client_handler.game_state:
|
|
logger.info(f"Must be connected to server and in game.")
|
|
return
|
|
elif not cmd:
|
|
logger.info(f"Money available: {int((self.ctx.stored_data[f'EnergyLink{self.ctx.team}'] or 0) / BANK_EXCHANGE_RATE)}")
|
|
return
|
|
elif not amount:
|
|
logger.warning("You must specify an amount.")
|
|
elif cmd == "withdraw":
|
|
self.ctx.client_handler.banking_command = -int(amount)
|
|
elif cmd == "deposit":
|
|
if int(amount) < 4:
|
|
logger.warning("You must deposit at least $4, for tax purposes.")
|
|
return
|
|
self.ctx.client_handler.banking_command = int(amount)
|
|
else:
|
|
logger.warning(f"Invalid bank command {cmd}")
|
|
return
|