BizHawkClient: Use built-ins for typing (#4508)

This commit is contained in:
Bryce Wilson 2025-01-19 01:23:06 -08:00 committed by GitHub
parent 0bb657d2c8
commit 9183e8f9c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 24 deletions

View File

@ -10,7 +10,7 @@ import base64
import enum
import json
import sys
import typing
from typing import Any, Sequence
BIZHAWK_SOCKET_PORT_RANGE_START = 43055
@ -44,10 +44,10 @@ class SyncError(Exception):
class BizHawkContext:
streams: typing.Optional[typing.Tuple[asyncio.StreamReader, asyncio.StreamWriter]]
streams: tuple[asyncio.StreamReader, asyncio.StreamWriter] | None
connection_status: ConnectionStatus
_lock: asyncio.Lock
_port: typing.Optional[int]
_port: int | None
def __init__(self) -> None:
self.streams = None
@ -122,12 +122,12 @@ async def get_script_version(ctx: BizHawkContext) -> int:
return int(await ctx._send_message("VERSION"))
async def send_requests(ctx: BizHawkContext, req_list: typing.List[typing.Dict[str, typing.Any]]) -> typing.List[typing.Dict[str, typing.Any]]:
async def send_requests(ctx: BizHawkContext, req_list: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Sends a list of requests to the BizHawk connector and returns their responses.
It's likely you want to use the wrapper functions instead of this."""
responses = json.loads(await ctx._send_message(json.dumps(req_list)))
errors: typing.List[ConnectorError] = []
errors: list[ConnectorError] = []
for response in responses:
if response["type"] == "ERROR":
@ -180,7 +180,7 @@ async def get_system(ctx: BizHawkContext) -> str:
return res["value"]
async def get_cores(ctx: BizHawkContext) -> typing.Dict[str, str]:
async def get_cores(ctx: BizHawkContext) -> dict[str, str]:
"""Gets the preferred cores for systems with multiple cores. Only systems with multiple available cores have
entries."""
res = (await send_requests(ctx, [{"type": "PREFERRED_CORES"}]))[0]
@ -233,8 +233,8 @@ async def set_message_interval(ctx: BizHawkContext, value: float) -> None:
raise SyncError(f"Expected response of type SET_MESSAGE_INTERVAL_RESPONSE but got {res['type']}")
async def guarded_read(ctx: BizHawkContext, read_list: typing.Sequence[typing.Tuple[int, int, str]],
guard_list: typing.Sequence[typing.Tuple[int, typing.Sequence[int], str]]) -> typing.Optional[typing.List[bytes]]:
async def guarded_read(ctx: BizHawkContext, read_list: Sequence[tuple[int, int, str]],
guard_list: Sequence[tuple[int, Sequence[int], str]]) -> list[bytes] | None:
"""Reads an array of bytes at 1 or more addresses if and only if every byte in guard_list matches its expected
value.
@ -262,7 +262,7 @@ async def guarded_read(ctx: BizHawkContext, read_list: typing.Sequence[typing.Tu
"domain": domain
} for address, size, domain in read_list])
ret: typing.List[bytes] = []
ret: list[bytes] = []
for item in res:
if item["type"] == "GUARD_RESPONSE":
if not item["value"]:
@ -276,7 +276,7 @@ async def guarded_read(ctx: BizHawkContext, read_list: typing.Sequence[typing.Tu
return ret
async def read(ctx: BizHawkContext, read_list: typing.Sequence[typing.Tuple[int, int, str]]) -> typing.List[bytes]:
async def read(ctx: BizHawkContext, read_list: Sequence[tuple[int, int, str]]) -> list[bytes]:
"""Reads data at 1 or more addresses.
Items in `read_list` should be organized `(address, size, domain)` where
@ -288,8 +288,8 @@ async def read(ctx: BizHawkContext, read_list: typing.Sequence[typing.Tuple[int,
return await guarded_read(ctx, read_list, [])
async def guarded_write(ctx: BizHawkContext, write_list: typing.Sequence[typing.Tuple[int, typing.Sequence[int], str]],
guard_list: typing.Sequence[typing.Tuple[int, typing.Sequence[int], str]]) -> bool:
async def guarded_write(ctx: BizHawkContext, write_list: Sequence[tuple[int, Sequence[int], str]],
guard_list: Sequence[tuple[int, Sequence[int], str]]) -> bool:
"""Writes data to 1 or more addresses if and only if every byte in guard_list matches its expected value.
Items in `write_list` should be organized `(address, value, domain)` where
@ -326,7 +326,7 @@ async def guarded_write(ctx: BizHawkContext, write_list: typing.Sequence[typing.
return True
async def write(ctx: BizHawkContext, write_list: typing.Sequence[typing.Tuple[int, typing.Sequence[int], str]]) -> None:
async def write(ctx: BizHawkContext, write_list: Sequence[tuple[int, Sequence[int], str]]) -> None:
"""Writes data to 1 or more addresses.
Items in write_list should be organized `(address, value, domain)` where

View File

@ -5,7 +5,7 @@ A module containing the BizHawkClient base class and metaclass
from __future__ import annotations
import abc
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, ClassVar
from worlds.LauncherComponents import Component, SuffixIdentifier, Type, components, launch_subprocess
@ -24,9 +24,9 @@ components.append(component)
class AutoBizHawkClientRegister(abc.ABCMeta):
game_handlers: ClassVar[Dict[Tuple[str, ...], Dict[str, BizHawkClient]]] = {}
game_handlers: ClassVar[dict[tuple[str, ...], dict[str, BizHawkClient]]] = {}
def __new__(cls, name: str, bases: Tuple[type, ...], namespace: Dict[str, Any]) -> AutoBizHawkClientRegister:
def __new__(cls, name: str, bases: tuple[type, ...], namespace: dict[str, Any]) -> AutoBizHawkClientRegister:
new_class = super().__new__(cls, name, bases, namespace)
# Register handler
@ -54,7 +54,7 @@ class AutoBizHawkClientRegister(abc.ABCMeta):
return new_class
@staticmethod
async def get_handler(ctx: "BizHawkClientContext", system: str) -> Optional[BizHawkClient]:
async def get_handler(ctx: "BizHawkClientContext", system: str) -> BizHawkClient | None:
for systems, handlers in AutoBizHawkClientRegister.game_handlers.items():
if system in systems:
for handler in handlers.values():
@ -65,13 +65,13 @@ class AutoBizHawkClientRegister(abc.ABCMeta):
class BizHawkClient(abc.ABC, metaclass=AutoBizHawkClientRegister):
system: ClassVar[Union[str, Tuple[str, ...]]]
system: ClassVar[str | tuple[str, ...]]
"""The system(s) that the game this client is for runs on"""
game: ClassVar[str]
"""The game this client is for"""
patch_suffix: ClassVar[Optional[Union[str, Tuple[str, ...]]]]
patch_suffix: ClassVar[str | tuple[str, ...] | None]
"""The file extension(s) this client is meant to open and patch (e.g. ".apz3")"""
@abc.abstractmethod

View File

@ -6,7 +6,7 @@ checking or launching the client, otherwise it will probably cause circular impo
import asyncio
import enum
import subprocess
from typing import Any, Dict, Optional
from typing import Any
from CommonClient import CommonContext, ClientCommandProcessor, get_base_parser, server_loop, logger, gui_enabled
import Patch
@ -43,15 +43,15 @@ class BizHawkClientContext(CommonContext):
command_processor = BizHawkClientCommandProcessor
auth_status: AuthStatus
password_requested: bool
client_handler: Optional[BizHawkClient]
slot_data: Optional[Dict[str, Any]] = None
rom_hash: Optional[str] = None
client_handler: BizHawkClient | None
slot_data: dict[str, Any] | None = None
rom_hash: str | None = None
bizhawk_ctx: BizHawkContext
watcher_timeout: float
"""The maximum amount of time the game watcher loop will wait for an update from the server before executing"""
def __init__(self, server_address: Optional[str], password: Optional[str]):
def __init__(self, server_address: str | None, password: str | None):
super().__init__(server_address, password)
self.auth_status = AuthStatus.NOT_AUTHENTICATED
self.password_requested = False