Changes to patch.py (#65)
* Changes to patch.py * Get hos/port from host.yaml if applicable. * Extract rom/host information from existing patch files. * Give users a chance to read the info as required. * Update patches in zip file with new host. (Kind of wish I didn't have to create a new zip file to do this, but unfortunately there is no options in zipfile to overwrite/delete existing files.)
This commit is contained in:
		
							parent
							
								
									5996d6d978
								
							
						
					
					
						commit
						4a3d941d20
					
				
							
								
								
									
										67
									
								
								Patch.py
								
								
								
								
							
							
						
						
									
										67
									
								
								Patch.py
								
								
								
								
							| 
						 | 
				
			
			@ -3,6 +3,9 @@ import yaml
 | 
			
		|||
import os
 | 
			
		||||
import lzma
 | 
			
		||||
import hashlib
 | 
			
		||||
import threading
 | 
			
		||||
import concurrent.futures
 | 
			
		||||
import zipfile
 | 
			
		||||
from typing import Tuple, Optional
 | 
			
		||||
 | 
			
		||||
import Utils
 | 
			
		||||
| 
						 | 
				
			
			@ -56,6 +59,13 @@ def create_rom_file(patch_file) -> Tuple[dict, str]:
 | 
			
		|||
    return data["meta"], target
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def update_patch_data(patch_data: bytes, server: str = "") -> bytes:
 | 
			
		||||
    data = Utils.parse_yaml(lzma.decompress(patch_data).decode("utf-8-sig"))
 | 
			
		||||
    data["meta"]["server"] = server
 | 
			
		||||
    bytes = generate_patch(data["patch"], data["meta"])
 | 
			
		||||
    return lzma.compress(bytes)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def load_bytes(path: str):
 | 
			
		||||
    with open(path, "rb") as f:
 | 
			
		||||
        return f.read()
 | 
			
		||||
| 
						 | 
				
			
			@ -66,11 +76,58 @@ def write_lzma(data: bytes, path: str):
 | 
			
		|||
        f.write(data)
 | 
			
		||||
 | 
			
		||||
if __name__ == "__main__":
 | 
			
		||||
    ipv4 = Utils.get_public_ipv4()
 | 
			
		||||
    host = Utils.get_public_ipv4()
 | 
			
		||||
    try:
 | 
			
		||||
        options = Utils.get_options()['server_options']
 | 
			
		||||
        if options['host']:
 | 
			
		||||
            ipv4 = options['host'] + ":" + str(options['port'])
 | 
			
		||||
        else:
 | 
			
		||||
            ipv4 = host + ":" + options['port']
 | 
			
		||||
    except:
 | 
			
		||||
        ipv4 = host + ":38281"
 | 
			
		||||
 | 
			
		||||
    ziplock = threading.Lock()
 | 
			
		||||
 | 
			
		||||
    print(f"Host for patches to be created is {ipv4}")
 | 
			
		||||
    import sys
 | 
			
		||||
 | 
			
		||||
    Processed = False
 | 
			
		||||
    for rom in sys.argv:
 | 
			
		||||
        if rom.endswith(".sfc"):
 | 
			
		||||
            print(f"Creating patch for {rom}")
 | 
			
		||||
            result = create_patch_file(rom, ipv4)
 | 
			
		||||
            print(f"Created patch {result}")
 | 
			
		||||
        Processed |= rom.endswith(".sfc") or rom.endswith(".bmbp") or rom.endswith(".zip")
 | 
			
		||||
        try:
 | 
			
		||||
            if rom.endswith(".sfc"):
 | 
			
		||||
                print(f"Creating patch for {rom}")
 | 
			
		||||
                result = create_patch_file(rom, ipv4)
 | 
			
		||||
                print(f"Created patch {result}")
 | 
			
		||||
            elif rom.endswith(".bmbp"):
 | 
			
		||||
                print(f"Applying patch {rom}")
 | 
			
		||||
                data, target = create_rom_file(rom)
 | 
			
		||||
                print(f"Created rom {target}.")
 | 
			
		||||
                if 'server' in data:
 | 
			
		||||
                    print(f"Host is {data['server']}")
 | 
			
		||||
            elif rom.endswith(".zip"):
 | 
			
		||||
                print(f"Updating host in patch files contained in {rom}")
 | 
			
		||||
                def _handle_zip_file_entry(zfinfo : zipfile.ZipInfo, server: str):
 | 
			
		||||
                    data = zfr.read(zfinfo)
 | 
			
		||||
                    if zfinfo.filename.endswith(".bmbp"):
 | 
			
		||||
                        data = update_patch_data(data, server)
 | 
			
		||||
                    with ziplock:
 | 
			
		||||
                        zfw.writestr(zfinfo, data)
 | 
			
		||||
                    return zfinfo.filename
 | 
			
		||||
 | 
			
		||||
                with concurrent.futures.ThreadPoolExecutor() as pool:
 | 
			
		||||
                    futures = []
 | 
			
		||||
                    with zipfile.ZipFile(rom, "r") as zfr:
 | 
			
		||||
                        updated_zip = os.path.splitext(rom)[0] + "_updated.zip"
 | 
			
		||||
                        with zipfile.ZipFile(updated_zip, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zfw:
 | 
			
		||||
                            for zfname in zfr.namelist():
 | 
			
		||||
                                futures.append(pool.submit(_handle_zip_file_entry, zfr.getinfo(zfname), ipv4))
 | 
			
		||||
                            for future in futures:
 | 
			
		||||
                                print(f"File {future.result()} added to {os.path.split(updated_zip)[1]}")
 | 
			
		||||
 | 
			
		||||
        except:
 | 
			
		||||
            import traceback
 | 
			
		||||
            traceback.print_exc()
 | 
			
		||||
 | 
			
		||||
    if Processed:
 | 
			
		||||
        input("Press enter to close.")
 | 
			
		||||
		Loading…
	
		Reference in New Issue