141 lines
5.5 KiB
Python
141 lines
5.5 KiB
Python
"""
|
|
Copy of the script in test/benchmark, adapted to Stardew Valley.
|
|
|
|
Run with `python -m worlds.stardew_valley.test.script.benchmark_locations --options minimal_locations_maximal_items`
|
|
"""
|
|
|
|
import argparse
|
|
import collections
|
|
import gc
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import typing
|
|
|
|
from BaseClasses import CollectionState, Location
|
|
from Utils import init_logging
|
|
from worlds.stardew_valley.stardew_rule.rule_explain import explain
|
|
from ... import test
|
|
|
|
|
|
def run_locations_benchmark():
|
|
init_logging("Benchmark Runner")
|
|
logger = logging.getLogger("Benchmark")
|
|
|
|
class BenchmarkRunner:
|
|
gen_steps: typing.Tuple[str, ...] = (
|
|
"generate_early", "create_regions", "create_items", "set_rules", "generate_basic", "pre_fill")
|
|
rule_iterations: int = 100_000
|
|
|
|
@staticmethod
|
|
def format_times_from_counter(counter: collections.Counter[str], top: int = 5) -> str:
|
|
return "\n".join(f" {time:.4f} in {name}" for name, time in counter.most_common(top))
|
|
|
|
def location_test(self, test_location: Location, state: CollectionState, state_name: str) -> float:
|
|
with TimeIt(f"{test_location.game} {self.rule_iterations} "
|
|
f"runs of {test_location}.access_rule({state_name})", logger) as t:
|
|
for _ in range(self.rule_iterations):
|
|
test_location.access_rule(state)
|
|
# if time is taken to disentangle complex ref chains,
|
|
# this time should be attributed to the rule.
|
|
gc.collect()
|
|
return t.dif
|
|
|
|
def main(self):
|
|
game = "Stardew Valley"
|
|
summary_data: typing.Dict[str, collections.Counter[str]] = {
|
|
"empty_state": collections.Counter(),
|
|
"all_state": collections.Counter(),
|
|
}
|
|
try:
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--options', help="Define the option set to use, from the preset in test/__init__.py .", type=str, required=True)
|
|
parser.add_argument('--seed', help="Define the seed to use.", type=int, required=True)
|
|
parser.add_argument('--location', help="Define the specific location to benchmark.", type=str, default=None)
|
|
parser.add_argument('--state', help="Define the state in which the location will be benchmarked.", type=str, default=None)
|
|
args = parser.parse_args()
|
|
options_set = args.options
|
|
options = getattr(test, options_set)()
|
|
seed = args.seed
|
|
location = args.location
|
|
state = args.state
|
|
|
|
multiworld = test.setup_solo_multiworld(options, seed)
|
|
gc.collect()
|
|
|
|
if location:
|
|
locations = [multiworld.get_location(location, 1)]
|
|
else:
|
|
locations = sorted(multiworld.get_unfilled_locations())
|
|
|
|
all_state = multiworld.get_all_state(False)
|
|
for location in locations:
|
|
if state != "all_state":
|
|
time_taken = self.location_test(location, multiworld.state, "empty_state")
|
|
summary_data["empty_state"][location.name] = time_taken
|
|
|
|
if state != "empty_state":
|
|
time_taken = self.location_test(location, all_state, "all_state")
|
|
summary_data["all_state"][location.name] = time_taken
|
|
|
|
total_empty_state = sum(summary_data["empty_state"].values())
|
|
total_all_state = sum(summary_data["all_state"].values())
|
|
|
|
logger.info(f"{game} took {total_empty_state / len(locations):.4f} "
|
|
f"seconds per location in empty_state and {total_all_state / len(locations):.4f} "
|
|
f"in all_state. (all times summed for {self.rule_iterations} runs.)")
|
|
logger.info(f"Top times in empty_state:\n"
|
|
f"{self.format_times_from_counter(summary_data['empty_state'])}")
|
|
logger.info(f"Top times in all_state:\n"
|
|
f"{self.format_times_from_counter(summary_data['all_state'])}")
|
|
|
|
if len(locations) == 1:
|
|
logger.info(str(explain(locations[0].access_rule, all_state, False)))
|
|
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
|
|
runner = BenchmarkRunner()
|
|
runner.main()
|
|
|
|
|
|
class TimeIt:
|
|
def __init__(self, name: str, time_logger=None):
|
|
self.name = name
|
|
self.logger = time_logger
|
|
self.timer = None
|
|
self.end_timer = None
|
|
|
|
def __enter__(self):
|
|
self.timer = time.perf_counter()
|
|
return self
|
|
|
|
@property
|
|
def dif(self):
|
|
return self.end_timer - self.timer
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
if not self.end_timer:
|
|
self.end_timer = time.perf_counter()
|
|
if self.logger:
|
|
self.logger.info(f"{self.dif:.4f} seconds in {self.name}.")
|
|
|
|
|
|
def change_home():
|
|
"""Allow scripts to run from "this" folder."""
|
|
old_home = os.path.dirname(__file__)
|
|
sys.path.remove(old_home)
|
|
new_home = os.path.normpath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
|
|
os.chdir(new_home)
|
|
sys.path.append(new_home)
|
|
# fallback to local import
|
|
sys.path.append(old_home)
|
|
|
|
from Utils import local_path
|
|
local_path.cached_path = new_home
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_locations_benchmark()
|