diff --git a/BaseClasses.py b/BaseClasses.py index 5e0f91b5..10223cf9 100644 --- a/BaseClasses.py +++ b/BaseClasses.py @@ -33,14 +33,20 @@ class MultiWorld(): dark_room_logic: Dict[int, str] restrict_dungeon_item_on_boss: Dict[int, bool] plando_texts: List[Dict[str, str]] - plando_items: List + plando_items: List[List[Dict[str, Any]]] plando_connections: List worlds: Dict[int, Any] groups: Dict[int, Group] + itempool: List[Item] is_race: bool = False precollected_items: Dict[int, List[Item]] state: CollectionState + accessibility: Dict[int, Options.Accessibility] + local_items: Dict[int, Options.LocalItems] + non_local_items: Dict[int, Options.NonLocalItems] + progression_balancing: Dict[int, Options.ProgressionBalancing] + class AttributeProxy(): def __init__(self, rule): self.rule = rule @@ -65,7 +71,7 @@ class MultiWorld(): self._cached_entrances = None self._cached_locations = None self._entrance_cache = {} - self._location_cache = {} + self._location_cache: Dict[Tuple[str, int], Location] = {} self.required_locations = [] self.light_world_light_cone = False self.dark_world_light_cone = False @@ -387,7 +393,7 @@ class MultiWorld(): def clear_location_cache(self): self._cached_locations = None - def get_unfilled_locations(self, player=None) -> List[Location]: + def get_unfilled_locations(self, player: Optional[int] = None) -> List[Location]: if player is not None: return [location for location in self.get_locations() if location.player == player and not location.item] @@ -396,13 +402,13 @@ class MultiWorld(): def get_unfilled_dungeon_locations(self): return [location for location in self.get_locations() if not location.item and location.parent_region.dungeon] - def get_filled_locations(self, player=None) -> List[Location]: + def get_filled_locations(self, player: Optional[int] = None) -> List[Location]: if player is not None: return [location for location in self.get_locations() if location.player == player and location.item is not None] return [location for location in self.get_locations() if location.item is not None] - def get_reachable_locations(self, state=None, player=None) -> List[Location]: + def get_reachable_locations(self, state: Optional[CollectionState] = None, player: Optional[int] = None) -> List[Location]: if state is None: state = self.state return [location for location in self.get_locations() if @@ -414,7 +420,7 @@ class MultiWorld(): return [location for location in self.get_locations() if (player is None or location.player == player) and location.item is None and location.can_reach(state)] - def get_unfilled_locations_for_players(self, locations, players: Iterable[int]): + def get_unfilled_locations_for_players(self, locations: List[str], players: Iterable[int]): for player in players: if len(locations) == 0: locations = [location.name for location in self.get_unfilled_locations(player)] @@ -423,7 +429,7 @@ class MultiWorld(): if location is not None and location.item is None: yield location - def unlocks_new_location(self, item) -> bool: + def unlocks_new_location(self, item: Item) -> bool: temp_state = self.state.copy() temp_state.collect(item, True) @@ -433,7 +439,7 @@ class MultiWorld(): return False - def has_beaten_game(self, state: CollectionState, player: Optional[int] = None): + def has_beaten_game(self, state: CollectionState, player: Optional[int] = None) -> bool: if player: return self.completion_condition[player](state) else: @@ -617,7 +623,10 @@ class CollectionState(): ret = function(self, ret) return ret - def can_reach(self, spot: Union[Location, Entrance, Region, str], resolution_hint=None, player=None) -> bool: + def can_reach(self, + spot: Union[Location, Entrance, Region, str], + resolution_hint: Optional[str] = None, + player: Optional[int] = None) -> bool: if not hasattr(spot, "can_reach"): # try to resolve a name if resolution_hint == 'Location': @@ -833,7 +842,7 @@ class CollectionState(): def can_bomb_clip(self, region: Region, player: int) -> bool: return self.is_not_bunny(region, player) and self.has('Pegasus Boots', player) - def collect(self, item: Item, event: bool = False, location: Location = None) -> bool: + def collect(self, item: Item, event: bool = False, location: Optional[Location] = None) -> bool: if location: self.locations_checked.add(location) diff --git a/Fill.py b/Fill.py index f35e22a2..eda8e67e 100644 --- a/Fill.py +++ b/Fill.py @@ -13,7 +13,7 @@ class FillError(RuntimeError): pass -def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item] = tuple()): +def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item] = tuple()) -> CollectionState: new_state = base_state.copy() for item in itempool: new_state.collect(item, True) @@ -22,12 +22,12 @@ def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item] def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations: typing.List[Location], - itempool: typing.List[Item], single_player_placement=False, lock=False): - unplaced_items = [] + itempool: typing.List[Item], single_player_placement: bool = False, lock: bool = False) -> None: + unplaced_items: typing.List[Item] = [] placements: typing.List[Location] = [] - swapped_items = Counter() - reachable_items: typing.Dict[int, deque] = {} + swapped_items: typing.Counter[typing.Tuple[int, str]] = Counter() + reachable_items: typing.Dict[int, typing.Deque[Item]] = {} for item in itempool: reachable_items.setdefault(item.player, deque()).append(item) @@ -46,7 +46,8 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations: spot_to_fill: typing.Optional[Location] = None if world.accessibility[item_to_place.player] == 'minimal': perform_access_check = not world.has_beaten_game(maximum_exploration_state, - item_to_place.player) if single_player_placement else not has_beaten_game + item_to_place.player) \ + if single_player_placement else not has_beaten_game else: perform_access_check = True @@ -127,18 +128,18 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations: itempool.extend(unplaced_items) -def distribute_items_restrictive(world: MultiWorld): +def distribute_items_restrictive(world: MultiWorld) -> None: fill_locations = sorted(world.get_unfilled_locations()) world.random.shuffle(fill_locations) # get items to distribute itempool = sorted(world.itempool) world.random.shuffle(itempool) - progitempool = [] - nonexcludeditempool = [] - localrestitempool = {player: [] for player in range(1, world.players + 1)} - nonlocalrestitempool = [] - restitempool = [] + progitempool: typing.List[Item] = [] + nonexcludeditempool: typing.List[Item] = [] + localrestitempool: typing.Dict[int, typing.List[Item]] = {player: [] for player in range(1, world.players + 1)} + nonlocalrestitempool: typing.List[Item] = [] + restitempool: typing.List[Item] = [] for item in itempool: if item.advancement: @@ -188,7 +189,7 @@ def distribute_items_restrictive(world: MultiWorld): world.random.shuffle(defaultlocations) if any(localrestitempool.values()): # we need to make sure some fills are limited to certain worlds - local_locations = {player: [] for player in world.player_ids} + local_locations: typing.Dict[int, typing.List[Location]] = {player: [] for player in world.player_ids} for location in defaultlocations: local_locations[location.player].append(location) for player_locations in local_locations.values(): @@ -232,15 +233,16 @@ def distribute_items_restrictive(world: MultiWorld): logging.info(f'Per-Player counts: {print_data})') -def fast_fill(world: MultiWorld, item_pool: typing.List, fill_locations: typing.List) -> typing.Tuple[ - typing.List, typing.List]: +def fast_fill(world: MultiWorld, + item_pool: typing.List[Item], + fill_locations: typing.List[Location]) -> typing.Tuple[typing.List[Item], typing.List[Location]]: placing = min(len(item_pool), len(fill_locations)) for item, location in zip(item_pool, fill_locations): world.push_item(location, item, False) return item_pool[placing:], fill_locations[placing:] -def flood_items(world: MultiWorld): +def flood_items(world: MultiWorld) -> None: # get items to distribute world.random.shuffle(world.itempool) itempool = world.itempool @@ -279,7 +281,8 @@ def flood_items(world: MultiWorld): item_to_place = item break - # we might be in a situation where all new locations require multiple items to reach. If that is the case, just place any advancement item we've found and continue trying + # we might be in a situation where all new locations require multiple items to reach. + # If that is the case, just place any advancement item we've found and continue trying if item_to_place is None: if candidate_item_to_place is not None: item_to_place = candidate_item_to_place @@ -300,7 +303,7 @@ def flood_items(world: MultiWorld): break -def balance_multiworld_progression(world: MultiWorld): +def balance_multiworld_progression(world: MultiWorld) -> None: # A system to reduce situations where players have no checks remaining, popularly known as "BK mode." # Overall progression balancing algorithm: # Gather up all locations in a sphere. @@ -313,24 +316,30 @@ def balance_multiworld_progression(world: MultiWorld): else: logging.info(f'Balancing multiworld progression for {len(balanceable_players)} Players.') state = CollectionState(world) - checked_locations = set() + checked_locations: typing.Set[Location] = set() unchecked_locations = set(world.get_locations()) - reachable_locations_count = {player: 0 for player in world.player_ids if len(world.get_filled_locations(player)) != 0} + reachable_locations_count = { + player: 0 + for player in world.player_ids + if len(world.get_filled_locations(player)) != 0 + } total_locations_count = Counter(location.player for location in world.get_locations() if not location.locked) balanceable_players = {player for player in balanceable_players if total_locations_count[player]} sphere_num = 1 moved_item_count = 0 - def get_sphere_locations(sphere_state, locations): + def get_sphere_locations(sphere_state: CollectionState, + locations: typing.Set[Location]) -> typing.Set[Location]: sphere_state.sweep_for_events(key_only=True, locations=locations) return {loc for loc in locations if sphere_state.can_reach(loc)} - def item_percentage(player, num): + def item_percentage(player: int, num: int) -> float: return num / total_locations_count[player] while True: - # Gather non-locked locations. This ensures that only shuffled locations get counted for progression balancing, + # Gather non-locked locations. + # This ensures that only shuffled locations get counted for progression balancing, # i.e. the items the players will be checking. sphere_locations = get_sphere_locations(state, unchecked_locations) for location in sphere_locations: @@ -340,12 +349,18 @@ def balance_multiworld_progression(world: MultiWorld): logging.debug(f"Sphere {sphere_num}") logging.debug(f"Reachable locations: {reachable_locations_count}") - logging.debug(f"Reachable percentages: { {player: round(item_percentage(player, num), 2) for player, num in reachable_locations_count.items()} }\n") + debug_percentages = { + player: round(item_percentage(player, num), 2) + for player, num in reachable_locations_count.items() + } + logging.debug(f"Reachable percentages: {debug_percentages}\n") sphere_num += 1 if checked_locations: - # The 10% threshold can be modified for "progression balancing strength" -- right now it approximates the old 20/216 bound. - threshold_percentage = max(map(lambda p: item_percentage(p, reachable_locations_count[p]), reachable_locations_count)) - 0.10 + # The 10% threshold can be modified for "progression balancing strength" + # right now it approximates the old 20/216 bound. + threshold_percentage = max(map(lambda p: item_percentage(p, reachable_locations_count[p]), + reachable_locations_count)) - 0.10 logging.debug(f"Threshold: {threshold_percentage}") balancing_players = {player for player, reachables in reachable_locations_count.items() if item_percentage(player, reachables) < threshold_percentage and player in balanceable_players} @@ -354,7 +369,7 @@ def balance_multiworld_progression(world: MultiWorld): balancing_unchecked_locations = unchecked_locations.copy() balancing_reachables = reachable_locations_count.copy() balancing_sphere = sphere_locations.copy() - candidate_items = collections.defaultdict(set) + candidate_items: typing.Dict[int, typing.Set[Location]] = collections.defaultdict(set) while True: # Check locations in the current sphere and gather progression items to swap earlier for location in balancing_sphere: @@ -380,19 +395,21 @@ def balance_multiworld_progression(world: MultiWorld): elif not balancing_sphere: raise RuntimeError('Not all required items reachable. Something went terribly wrong here.') # Gather a set of locations which we can swap items into - unlocked_locations = collections.defaultdict(set) + unlocked_locations: typing.Dict[int, typing.Set[Location]] = collections.defaultdict(set) for l in unchecked_locations: if l not in balancing_unchecked_locations: unlocked_locations[l.player].add(l) - items_to_replace = [] + items_to_replace: typing.List[Location] = [] for player in balancing_players: locations_to_test = unlocked_locations[player] items_to_test = candidate_items[player] while items_to_test: testing = items_to_test.pop() reducing_state = state.copy() - for location in itertools.chain((l for l in items_to_replace if l.item.player == player), - items_to_test): + for location in itertools.chain(( + l for l in items_to_replace + if l.item.player == player + ), items_to_test): reducing_state.collect(location.item, True, location) reducing_state.sweep_for_events(locations=locations_to_test) @@ -402,7 +419,8 @@ def balance_multiworld_progression(world: MultiWorld): items_to_replace.append(testing) else: reduced_sphere = get_sphere_locations(reducing_state, locations_to_test) - if item_percentage(player, reachable_locations_count[player] + len(reduced_sphere)) < threshold_percentage: + p = item_percentage(player, reachable_locations_count[player] + len(reduced_sphere)) + if p < threshold_percentage: items_to_replace.append(testing) replaced_items = False @@ -452,7 +470,7 @@ def balance_multiworld_progression(world: MultiWorld): break -def swap_location_item(location_1: Location, location_2: Location, check_locked=True): +def swap_location_item(location_1: Location, location_2: Location, check_locked: bool = True) -> None: """Swaps Items of locations. Does NOT swap flags like shop_slot or locked, but does swap event""" if check_locked: if location_1.locked: @@ -465,14 +483,14 @@ def swap_location_item(location_1: Location, location_2: Location, check_locked= location_1.event, location_2.event = location_2.event, location_1.event -def distribute_planned(world: MultiWorld): - def warn(warning: str, force): +def distribute_planned(world: MultiWorld) -> None: + def warn(warning: str, force: typing.Union[bool, str]) -> None: if force in [True, 'fail', 'failure', 'none', False, 'warn', 'warning']: logging.warning(f'{warning}') else: logging.debug(f'{warning}') - def failed(warning: str, force): + def failed(warning: str, force: typing.Union[bool, str]) -> None: if force in [True, 'fail', 'failure']: raise Exception(warning) else: @@ -482,7 +500,8 @@ def distribute_planned(world: MultiWorld): from worlds.alttp.Regions import key_drop_data world_name_lookup = world.world_name_lookup - plando_blocks = [] + block_value = typing.Union[typing.List[str], typing.Dict[str, typing.Any], str] + plando_blocks: typing.List[typing.Dict[str, typing.Any]] = [] player_ids = set(world.player_ids) for player in player_ids: for block in world.plando_items[player]: @@ -493,7 +512,7 @@ def distribute_planned(world: MultiWorld): block['from_pool'] = True if 'world' not in block: block['world'] = False - items = [] + items: block_value = [] if "items" in block: items = block["items"] if 'count' not in block: @@ -506,7 +525,7 @@ def distribute_planned(world: MultiWorld): failed("You must specify at least one item to place items with plando.", block['force']) continue if isinstance(items, dict): - item_list = [] + item_list: typing.List[str] = [] for key, value in items.items(): if value is True: value = world.itempool.count(world.worlds[player].create_item(key)) @@ -516,7 +535,7 @@ def distribute_planned(world: MultiWorld): items = [items] block['items'] = items - locations = [] + locations: block_value = [] if 'location' in block: locations = block['location'] # just allow 'location' to keep old yamls compatible elif 'locations' in block: @@ -529,8 +548,6 @@ def distribute_planned(world: MultiWorld): for key, value in locations.items(): location_list += [key] * value locations = location_list - if isinstance(locations, str): - locations = [locations] block['locations'] = locations if not block['count']: @@ -572,20 +589,19 @@ def distribute_planned(world: MultiWorld): maxcount = placement['count']['target'] from_pool = placement['from_pool'] if target_world is False or world.players == 1: # target own world - worlds = {player} + worlds: typing.Set[int] = {player} elif target_world is True: # target any worlds besides own worlds = set(world.player_ids) - {player} elif target_world is None: # target all worlds worlds = set(world.player_ids) elif type(target_world) == list: # list of target worlds - worlds = [] + worlds = set() for listed_world in target_world: if listed_world not in world_name_lookup: failed(f"Cannot place item to {target_world}'s world as that world does not exist.", placement['force']) continue - worlds.append(world_name_lookup[listed_world]) - worlds = set(worlds) + worlds.add(world_name_lookup[listed_world]) elif type(target_world) == int: # target world by slot number if target_world not in range(1, world.players + 1): failed( @@ -605,8 +621,8 @@ def distribute_planned(world: MultiWorld): world.random.shuffle(candidates) world.random.shuffle(items) count = 0 - err = [] - successful_pairs = [] + err: typing.List[str] = [] + successful_pairs: typing.List[typing.Tuple[Item, Location]] = [] for item_name in items: item = world.worlds[player].create_item(item_name) for location in reversed(candidates): @@ -617,7 +633,7 @@ def distribute_planned(world: MultiWorld): if not location.item: if location.item_rule(item): if location.can_fill(world.state, item, False): - successful_pairs.append([item, location]) + successful_pairs.append((item, location)) candidates.remove(location) count = count + 1 break @@ -630,10 +646,9 @@ def distribute_planned(world: MultiWorld): if count == maxcount: break if count < placement['count']['min']: - err = " ".join(err) m = placement['count']['min'] failed( - f"Plando block failed to place {m - count} of {m} item(s) for {world.player_name[player]}, error(s): {err}", + f"Plando block failed to place {m - count} of {m} item(s) for {world.player_name[player]}, error(s): {' '.join(err)}", placement['force']) for (item, location) in successful_pairs: world.push_item(location, item, collect=False) diff --git a/Options.py b/Options.py index b26d55df..e20f5098 100644 --- a/Options.py +++ b/Options.py @@ -53,9 +53,11 @@ class AssembleOptions(type): return super(AssembleOptions, mcs).__new__(mcs, name, bases, attrs) +T = typing.TypeVar('T') -class Option(metaclass=AssembleOptions): - value: int + +class Option(typing.Generic[T], metaclass=AssembleOptions): + value: T name_lookup: typing.Dict[int, str] default = 0 @@ -98,7 +100,7 @@ class Option(metaclass=AssembleOptions): raise NotImplementedError -class Toggle(Option): +class Toggle(Option[int]): option_false = 0 option_true = 1 default = 0 @@ -150,7 +152,7 @@ class DefaultOnToggle(Toggle): default = 1 -class Choice(Option): +class Choice(Option[int]): auto_display_name = True def __init__(self, value: int): @@ -207,7 +209,7 @@ class Choice(Option): __hash__ = Option.__hash__ # see https://docs.python.org/3/reference/datamodel.html#object.__hash__ -class Range(Option, int): +class Range(Option[int], int): range_start = 0 range_end = 1 @@ -300,10 +302,9 @@ class VerifyKeys: f"is not a valid location name from {world.game}") -class OptionDict(Option, VerifyKeys): +class OptionDict(Option[typing.Dict[str, typing.Any]], VerifyKeys): default = {} supports_weighting = False - value: typing.Dict[str, typing.Any] def __init__(self, value: typing.Dict[str, typing.Any]): self.value = value @@ -332,10 +333,9 @@ class ItemDict(OptionDict): super(ItemDict, self).__init__(value) -class OptionList(Option, VerifyKeys): +class OptionList(Option[typing.List[typing.Any]], VerifyKeys): default = [] supports_weighting = False - value: list def __init__(self, value: typing.List[typing.Any]): self.value = value or [] @@ -359,10 +359,9 @@ class OptionList(Option, VerifyKeys): return item in self.value -class OptionSet(Option, VerifyKeys): +class OptionSet(Option[typing.Set[str]], VerifyKeys): default = frozenset() supports_weighting = False - value: set def __init__(self, value: typing.Union[typing.Set[str, typing.Any], typing.List[str, typing.Any]]): self.value = set(value)