361 lines
16 KiB
Python
361 lines
16 KiB
Python
from BaseClasses import ItemClassification
|
|
from worlds.Files import APDeltaPatch
|
|
|
|
import Utils
|
|
import os
|
|
import hashlib
|
|
import bsdiff4
|
|
from .lz10 import gba_decompress, gba_compress
|
|
|
|
from .BN3RomUtils import ArchiveToReferences, read_u16_le, read_u32_le, int16_to_byte_list_le, int32_to_byte_list_le, \
|
|
generate_progressive_undernet, ArchiveToSizeComp, ArchiveToSizeUncomp, generate_item_message, \
|
|
generate_external_item_message, generate_text_bytes, dictChar
|
|
|
|
from .Items import ItemType
|
|
|
|
# MD5 hex digest the supplied base ROM must match; get_base_rom_bytes() rejects any other file.
CHECKSUM_BLUE = "6fe31df0144759b34ad666badaacc442"
|
|
|
|
|
|
def list_contains_subsequence(lst, sublist) -> bool:
    """Return True when ``sublist`` appears as a contiguous run inside ``lst``.

    Every possible start position is tested, so a partial match that fails
    does not hide a real match beginning inside it (for example ``[1, 2]``
    is found in ``[1, 1, 2]``).

    :param lst: the sequence to search in.
    :param sublist: the run of items to look for; an empty run is always found.
    :return: True if the run is present, False otherwise.
    """
    needle = list(sublist)
    if not needle:
        # An empty run is trivially contained in any sequence.
        return True

    haystack = list(lst)
    width = len(needle)
    return any(haystack[start:start + width] == needle
               for start in range(len(haystack) - width + 1))
|
|
|
|
|
|
class ArchiveScript:
    """A single script of a text archive, held as a list of message boxes.

    Each message box is a list of byte values. ``set_bytes`` splits a flat
    byte sequence into boxes and ``get_bytes`` joins them back together.
    """

    def __init__(self, index, message_bytes):
        """Store the script index and split ``message_bytes`` into message boxes."""
        self.index = index
        self.messageBoxes = []

        self.set_bytes(message_bytes)

    def get_bytes(self):
        """Return every message box concatenated into one flat list of bytes."""
        return [byte for message in self.messageBoxes for byte in message]

    def set_bytes(self, message_bytes):
        """Replace the message boxes with the ones parsed from ``message_bytes``.

        A 0xE9 byte closes the current box and is kept as its last byte. A 0xE7
        byte closes the current box and is stored as a box of its own. Bytes
        that belong to the arguments of a 0xF6 / 0xF3 / 0xF2 command are never
        treated as box terminators.

        :param message_bytes: indexable sequence of byte values for the script.
        """
        self.messageBoxes = []
        message_box = []

        # Count of bytes (including the current one) still belonging to a
        # command; while it is positive, 0xE9 / 0xE7 are plain data.
        command_index = 0
        for position, byte in enumerate(message_bytes):
            if command_index <= 0 and byte in (0xE9, 0xE7):
                if byte == 0xE9:  # More textboxes to come, don't end it yet
                    message_box.append(byte)
                    self.messageBoxes.append(message_box)
                else:  # End of the script, add another message to end it after this one
                    self.messageBoxes.append(message_box)
                    self.messageBoxes.append([0xE7])
                message_box = []
                continue

            if command_index <= 0:
                # We can hit a command that might contain an E9 or an E7.
                # If we do, skip checking the next few bytes.
                if byte == 0xF6:  # CheckItem
                    command_index = 7
                if byte == 0xF3:  # CheckFlag
                    # The "Check Navi Customizer" command is one byte shorter than the
                    # other check flags: if the next byte is 0x28 the command is only
                    # 5 bytes long. A 0xF3 that is the very last byte has no next byte
                    # to inspect, so it is treated as the regular form.
                    following = position + 1
                    if following < len(message_bytes) and message_bytes[following] == 0x28:
                        command_index = 5
                    else:
                        command_index = 6
                if byte == 0xF2:  # FlagSet
                    command_index = 4
            command_index -= 1
            message_box.append(byte)

        # If there's still bytes left over, add them even if we didn't hit an end
        if message_box:
            self.messageBoxes.append(message_box)

    def __str__(self):
        """Return the script index followed by one hex-formatted line per message box."""
        lines = [str(self.index)+' - \n']
        for messageBox in self.messageBoxes:
            lines.append(' '+str(["{:02x}".format(x) for x in messageBox])+'\n')
        return ''.join(lines)
|
|
|
|
|
|
class TextArchive:
    """A text archive read from ROM data: an offset table followed by scripts.

    The archive is parsed into ``ArchiveScript`` objects keyed by script index,
    can have item messages and extra scripts injected, and can be written back
    to the end of the ROM with every known reference repointed to the new copy.
    """

    def __init__(self, data, offset, size, compressed=True):
        """Parse the archive found at ``offset``.

        :param data: raw archive bytes (compressed or not, see ``compressed``).
        :param offset: archive address; also the key into ``ArchiveToReferences``.
        :param size: size of ``data`` as stored in the ROM.
        :param compressed: whether ``data`` must be run through ``gba_decompress``.
        """
        self.startOffset = offset
        self.compressed = compressed
        self.scripts = {}
        self.scriptCount = 0xFF
        self.references = ArchiveToReferences[offset]
        self.unused_indices = []  # A list of places it's okay to inject new scripts
        self.progressive_undernet_indices = []  # If this archive has progressive undernet, here they are in order

        self.text_changed = False

        if compressed:
            self.compressedSize = size
            self.compressedData = data
            self.uncompressedData = gba_decompress(self.compressedData)
            self.uncompressedSize = len(self.uncompressedData)
        else:
            self.uncompressedSize = size
            self.uncompressedData = data
            self.compressedData = gba_compress(self.uncompressedData)
            self.compressedSize = len(self.compressedData)

        # The first table entry is halved to get the number of 2-byte entries.
        self.scriptCount = read_u16_le(self.uncompressedData, 0) >> 1

        for i in range(self.scriptCount):
            start_offset = read_u16_le(self.uncompressedData, i * 2)
            # NOTE(review): for the last index this reads the two bytes right after
            # the offset table rather than a table entry -- confirm this is intended.
            next_offset = read_u16_le(self.uncompressedData, (i + 1) * 2)

            if start_offset != next_offset:
                message_bytes = list(self.uncompressedData[start_offset:next_offset])
                self.scripts[i] = ArchiveScript(i, message_bytes)
            else:
                # Two equal consecutive offsets mean the slot holds no bytes.
                self.unused_indices.append(i)

    def generate_data(self, compressed=True):
        """Serialise the archive: a rebuilt offset table followed by all script bytes.

        :param compressed: run the result through ``gba_compress`` when True.
        :return: the archive as a ``bytearray``.
        """
        header = []
        body = []
        byte_offset = self.scriptCount * 2
        for i in range(self.scriptCount):
            header.extend(int16_to_byte_list_le(byte_offset))
            if i in self.scripts:
                script_bytes = self.scripts[i].get_bytes()
                body.extend(script_bytes)
                byte_offset += len(script_bytes)

        byte_data = bytes(header + body)
        if compressed:
            byte_data = gba_compress(byte_data)

        return bytearray(byte_data)

    def inject_item_message(self, script_index, message_indices, new_bytes):
        """Replace the given message boxes of a script with ``new_bytes``.

        Flag set / flag clear commands found in the replaced boxes are appended
        to the new message so they still run. The first listed box receives the
        message; the remaining listed boxes are emptied. ``new_bytes`` itself is
        not modified.

        :param script_index: index of the script to edit.
        :param message_indices: indices of the message boxes being replaced.
        :param new_bytes: bytes of the replacement message.
        """
        message_boxes = self.scripts[script_index].messageBoxes
        replacement = list(new_bytes)

        # First step, if the old message had any flag sets or flag clears, we need to keep them.
        # Mystery data has a flag set to actually remove the mystery data, and jobs often have a completion flag
        for message_index in message_indices:
            oldbytes = message_boxes[message_index]
            for i in range(len(oldbytes)-3):
                # F2 00 is the code for "flagSet", with the two bytes after it being the flag to set.
                # F2 04 is the code for "flagClear", which also needs to come along for the ride
                # Add those to the message box after the other text.
                if oldbytes[i] == 0xF2 and oldbytes[i+1] in (0x00, 0x04):
                    replacement.extend(oldbytes[i:i+4])

        # Then, overwrite the existing script with the new one
        message_boxes[message_indices[0]] = replacement
        for index in message_indices[1:]:
            message_boxes[index] = []

    def inject_into_rom(self, modified_rom_data):
        """Append this archive to ``modified_rom_data`` and repoint its references.

        The buffer is padded with 0xFF up to a multiple of four, the archive is
        appended, and the four bytes at every offset in ``self.references`` are
        overwritten with the new address.

        :param modified_rom_data: mutable ROM buffer; modified in place.
        :return: the same buffer, for convenience.
        """
        working_data = self.generate_data(self.compressed)

        # It needs to start on a byte divisible by 4. If the rom data is not, add FFs
        padding = -len(modified_rom_data) % 4
        modified_rom_data.extend([0xFF] * padding)

        # 0x08000000 is added to the file offset to form the stored pointer
        # (presumably the address the ROM is mapped at -- TODO confirm).
        new_start_offset = 0x08000000 + len(modified_rom_data)
        offset_byte = int32_to_byte_list_le(new_start_offset)
        modified_rom_data.extend(working_data)
        for offset in self.references:
            modified_rom_data[offset:offset+4] = offset_byte
        return modified_rom_data

    def add_progression_scripts(self):
        """Create the nine progressive undernet scripts in unused script slots.

        Each new script is also given the unused index that follows its own, so
        ten unused indices must be available even though only nine are consumed.

        :raises AssertionError: if the archive does not have enough unused slots.
        """
        if len(self.unused_indices) < 10:
            # As far as I know, this should literally not be possible.
            # Every script I've looked at has dozens of unused indices, so finding enough
            # (8 plus one "ending" script) should be no problem. We re-use these so we
            # don't have to worry about an area getting tons of these
            raise AssertionError("Error in generation -- not enough room for progressive undernet in archive "
                                 + hex(self.startOffset))
        for i in range(9):  # There are 8 progressive undernet ranks, plus the ending script
            new_script_index = self.unused_indices[i]
            new_script = ArchiveScript(new_script_index,
                                       generate_progressive_undernet(i, self.unused_indices[i+1]))
            self.scripts[new_script_index] = new_script
            self.progressive_undernet_indices.append(new_script_index)
        self.unused_indices = self.unused_indices[9:]  # Remove the nine indices that were just used

    def inject_item_text(self, item_text, next_message=""):
        """Replace every 0x68 0x68 placeholder in the archive with ``item_text``.

        Message boxes are rebuilt from a left-to-right scan and then updated in
        place, so a replacement shorter or longer than the placeholder cannot
        disturb the scan. ``text_changed`` is set when a placeholder was found.

        :param item_text: text substituted for each placeholder.
        :param next_message: optional extra text appended to a box once for each
            placeholder that was replaced in it.
        """
        item_text_bytes = generate_text_bytes(item_text)
        next_message_bytes = generate_text_bytes(next_message)
        for script in self.scripts.values():
            for message_box in script.messageBoxes:
                rebuilt = []
                replaced = 0
                position = 0
                box_length = len(message_box)
                while position < box_length:
                    is_placeholder = (position + 1 < box_length
                                      and message_box[position] == 0x68
                                      and message_box[position + 1] == 0x68)
                    if is_placeholder:
                        rebuilt.extend(item_text_bytes)
                        replaced += 1
                        position += 2
                    else:
                        rebuilt.append(message_box[position])
                        position += 1

                if not replaced:
                    continue
                self.text_changed = True

                # If there's another text box to display, add it to the message bytes
                if len(next_message) > 0:
                    for _ in range(replaced):
                        rebuilt.extend(next_message_bytes)
                        # TODO append end message nextline etc.
                        # I think this is "wait for button press" then "clearmessage"
                        rebuilt.extend([0xEB, 0xE9])

                # Update the existing list object so other references to the box see the change.
                message_box[:] = rebuilt
|
|
|
|
|
|
class LocalRom:
    """In-memory copy of the patched ROM plus the text archives edited so far."""

    def __init__(self, file, name=None):
        """Load the patched ROM bytes for ``file`` into a mutable buffer.

        :param file: base ROM file name, forwarded to ``get_patched_rom_bytes``.
        :param name: optional name stored on the instance.
        """
        self.name = name
        self.changed_archives = {}

        self.rom_data = bytearray(get_patched_rom_bytes(file))

    def get_data_chunk(self, start_offset, size):
        """Return ``size`` bytes of ROM data starting at ``start_offset``.

        A request that runs past the end of the ROM prints a warning and yields
        whatever bytes are available.
        """
        if start_offset+size > len(self.rom_data):
            print("Attempting to get data chunk beyond the size of the ROM: "+hex(start_offset)+", ROM size ends at: "+hex(len(self.rom_data)))
        return self.rom_data[start_offset:start_offset + size]

    def _get_archive(self, offset):
        """Return the TextArchive for ``offset``, loading and caching it on first use."""
        # If the archive is already loaded, use that
        if offset in self.changed_archives:
            return self.changed_archives[offset]

        is_compressed = offset in ArchiveToSizeComp
        size = ArchiveToSizeComp[offset] if is_compressed else ArchiveToSizeUncomp[offset]
        data = self.get_data_chunk(offset, size)

        # Check if the archive we want to load has been moved by the patch. This is indicated by a 0xFF 0xFF
        # as the first two bytes of the chunk
        if data[0] == 0xFF and data[1] == 0xFF:
            new_size_bytes = data[2:4]
            new_address_le = data[4:8]
            # Last byte should be zero since we're dealing with purely ROM address space
            new_address_le[3] = 0x0
            size = read_u16_le(new_size_bytes, 0)
            data = self.get_data_chunk(read_u32_le(new_address_le, 0), size)

        archive = TextArchive(data, offset, size, is_compressed)
        self.changed_archives[offset] = archive
        return archive

    def replace_item(self, location, item):
        """Rewrite the message at ``location`` so that it hands out ``item``.

        :param location: object providing ``text_archive_address``,
            ``text_script_index`` and ``text_box_indices``.
        :param item: object providing ``type`` and, for external items,
            ``itemName`` and ``recipient``.
        """
        archive = self._get_archive(location.text_archive_address)

        if item.type == ItemType.Undernet:
            if len(archive.progressive_undernet_indices) == 0:
                archive.add_progression_scripts()  # Generate the new scripts
            # Replace the item text box as normal. We just also add a new jump at the end of the script
            item_bytes = generate_item_message(item)
            changed_script = archive.scripts[location.text_script_index]
            # There isn't a "Jump unconditional", so we fake one. Check flag 0 and jump
            # to the start of our progression regardless of outcome
            first_undernet_index = archive.progressive_undernet_indices[0]
            jump_to_first_undernet_bytes = [0xF3, 0x00,
                                            0x00, 0x00,
                                            first_undernet_index,
                                            first_undernet_index]
            # Insert the new message second-to-last (the last index should be an end all by itself)
            changed_script.messageBoxes.insert(-1, jump_to_first_undernet_bytes)
        elif item.type == ItemType.External:
            item_bytes = generate_external_item_message(item.itemName, item.recipient)
        else:
            item_bytes = generate_item_message(item)

        archive.inject_item_message(location.text_script_index, location.text_box_indices,
                                    item_bytes)

    def insert_hint_text(self, location, short_text, long_text = ""):
        """
        Replaces the placeholder text in this location's archive with short_text,
        gives another text box for long_text if it's present

        Does nothing when ``location.inject_name`` is falsy.

        :raises AssertionError: if the location's archive has not been loaded yet.
        """
        # Replace item name placeholders
        if not location.inject_name:
            return

        offset = location.text_archive_address
        if offset not in self.changed_archives:
            # It should be theoretically impossible to call insert_hint_text before actually injecting the item.
            raise AssertionError("Inserting a hint at a location that doesn't have an item!")
        self.changed_archives[offset].inject_item_text(short_text, long_text)

    def inject_name(self, player):
        """Write ``player`` into the 63-byte field at 0x7FFFC0, zero padded.

        The name is UTF-8 encoded first and then cut or padded to exactly the
        field size, so the ROM length never changes whatever the name contains.
        """
        field_start, field_end = 0x7FFFC0, 0x7FFFFF
        field_size = field_end - field_start

        encoded = bytes(player, 'utf8')[:field_size]
        # Cutting at a byte boundary can split a multi-byte character; drop the partial tail.
        encoded = encoded.decode('utf8', errors='ignore').encode('utf8')
        self.rom_data[field_start:field_end] = encoded.ljust(field_size, b'\x00')

    def write_changed_rom(self):
        """Append every edited archive to the ROM buffer and repoint its references."""
        for archive in self.changed_archives.values():
            self.rom_data = archive.inject_into_rom(self.rom_data)

    def write_to_file(self, out_path):
        """Write the current ROM buffer to ``out_path``."""
        with open(out_path, "wb") as rom:
            rom.write(self.rom_data)
|
|
|
|
|
|
class MMBN3DeltaPatch(APDeltaPatch):
    """APDeltaPatch settings for MegaMan Battle Network 3."""

    game = "MegaMan Battle Network 3"
    # The base ROM is identified by the module-level checksum constant.
    hash = CHECKSUM_BLUE
    patch_file_ending = ".apbn3"
    result_file_ending = ".gba"

    @classmethod
    def get_source_data(cls) -> bytes:
        """Return the bytes of the base ROM, as provided by get_base_rom_bytes()."""
        return get_base_rom_bytes()
|
|
|
|
|
|
def get_base_rom_path(file_name: str = "") -> str:
    """Resolve the path of the base ROM file.

    When ``file_name`` is empty, the name is taken from the ``rom_file`` entry
    of the ``mmbn3_options`` section of the options, or a built-in default
    name if that section is absent. A name that does not exist as given is
    passed through ``Utils.local_path``.

    :param file_name: explicit ROM file name, or empty to use the options.
    :return: the resolved file name.
    """
    options = Utils.get_options()
    if not file_name:
        bn3_options = options.get("mmbn3_options", None)
        file_name = ("Mega Man Battle Network 3 - Blue Version (USA).gba"
                     if bn3_options is None
                     else bn3_options["rom_file"])

    if os.path.exists(file_name):
        return file_name
    return Utils.local_path(file_name)
|
|
|
|
|
|
def get_base_rom_bytes(file_name: str = "") -> bytes:
    """Return the bytes of the base ROM, reading and verifying it only once.

    The verified bytes are cached as an attribute on this function; later
    calls return the cached value and ignore ``file_name``.

    :param file_name: ROM file name forwarded to ``get_base_rom_path``.
    :return: the contents of the base ROM.
    :raises Exception: if the file's MD5 digest differs from ``CHECKSUM_BLUE``.
    """
    base_rom_bytes = getattr(get_base_rom_bytes, "base_rom_bytes", None)
    if not base_rom_bytes:
        file_name = get_base_rom_path(file_name)
        # Use a context manager so the file handle is closed deterministically.
        with open(file_name, "rb") as rom_file:
            base_rom_bytes = rom_file.read()

        basemd5 = hashlib.md5()
        basemd5.update(base_rom_bytes)
        if CHECKSUM_BLUE != basemd5.hexdigest():
            raise Exception('Supplied Base Rom does not match US GBA Blue Version. '
                            'Please provide the correct ROM version')

        # Cache only after the checksum matched, so a bad file is never reused.
        get_base_rom_bytes.base_rom_bytes = base_rom_bytes
    return base_rom_bytes
|
|
|
|
|
|
def get_patched_rom_bytes(file_name: str = "") -> bytes:
    """
    Return the ROM data produced by applying the bundled ap-patch diff to the provided ROM.

    The diff is generated by https://github.com/digiholic/bn3-ap-patch
    and should contain all changed text banks and assembly code.
    """
    import pkgutil

    source_rom = get_base_rom_bytes(file_name)
    diff = pkgutil.get_data(__name__, "data/bn3-ap-patch.bsdiff")
    return bsdiff4.patch(source_rom, diff)
|