diff --git a/Lib/test/libregrtest/cmdline.py b/Lib/test/libregrtest/cmdline.py index 64c035307e6654..4ee4c207307fd5 100644 --- a/Lib/test/libregrtest/cmdline.py +++ b/Lib/test/libregrtest/cmdline.py @@ -194,6 +194,7 @@ def __init__(self, **kwargs) -> None: self._add_python_opts = True self.xmlpath = None self.single_process = False + self.single_process_per_case = False super().__init__(**kwargs) @@ -368,6 +369,11 @@ def _create_parser(): group.add_argument('--list-cases', action='store_true', help='only write the name of test cases that will be run, ' 'don\'t execute them') + group.add_argument('--single-process-per-case', action='store_true', + help='run each test case in its own process. ' + '(slow; for debugging order dependencies ' + "and environment leaks). Test cases from " + 'the same module run sequentially.') group.add_argument('-P', '--pgo', dest='pgo', action='store_true', help='enable Profile Guided Optimization (PGO) training') group.add_argument('--pgo-extended', action='store_true', diff --git a/Lib/test/libregrtest/findtests.py b/Lib/test/libregrtest/findtests.py index 6c0e50846a466b..9794cf7309e6fb 100644 --- a/Lib/test/libregrtest/findtests.py +++ b/Lib/test/libregrtest/findtests.py @@ -94,19 +94,43 @@ def list_cases(tests: TestTuple, *, test_dir: StrPath | None = None) -> None: support.verbose = False set_match_tests(match_tests) + cases_by_module, skipped = collect_cases(tests, match_tests=match_tests, + test_dir=test_dir) + for cases in cases_by_module.values(): + for case_id in cases: + print(case_id) + if skipped: + sys.stdout.flush() + stderr = sys.stderr + print(file=stderr) + print(count(len(skipped), "test"), "skipped:", file=stderr) + printlist(skipped, file=stderr) - skipped = [] +def collect_cases(tests: TestTuple, *, + match_tests: TestFilter | None = None, + test_dir: StrPath | None = None + ) -> tuple[dict[TestName, list[str]], list[TestName]]: + result: dict[TestName, list[str]] = {} + skipped: list[TestName] = [] for test_name in tests: module_name = abs_module_name(test_name, test_dir) + cases: list[str] = [] try: suite = unittest.defaultTestLoader.loadTestsFromName(module_name) - _list_cases(suite) + _collect_cases(suite, cases) except unittest.SkipTest: skipped.append(test_name) + continue + if cases: + result[test_name] = cases + return result, skipped - if skipped: - sys.stdout.flush() - stderr = sys.stderr - print(file=stderr) - print(count(len(skipped), "test"), "skipped:", file=stderr) - printlist(skipped, file=stderr) +def _collect_cases(suite: unittest.TestSuite, out: list[str]) -> None: + for test in suite: + if isinstance(test, unittest.loader._FailedTest): + continue + if isinstance(test, unittest.TestSuite): + _collect_cases(test, out) + elif isinstance(test, unittest.TestCase): + if match_test(test): + out.append(test.id()) diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py index 8773e9df73263b..0e31859cd9dfaf 100644 --- a/Lib/test/libregrtest/main.py +++ b/Lib/test/libregrtest/main.py @@ -12,7 +12,7 @@ from test.support import os_helper, MS_WINDOWS, flush_std_streams from .cmdline import _parse_args, Namespace -from .findtests import findtests, split_test_packages, list_cases +from .findtests import findtests, split_test_packages, list_cases, collect_cases from .logger import Logger from .pgo import setup_pgo_tests from .result import TestResult @@ -73,6 +73,7 @@ def __init__(self, ns: Namespace, _add_python_opts: bool = False): self.want_header: bool = ns.header self.want_list_tests: bool = ns.list_tests self.want_list_cases: bool = ns.list_cases + self.want_single_process_per_case: bool = ns.single_process_per_case self.want_wait: bool = ns.wait self.want_cleanup: bool = ns.cleanup self.want_rerun: bool = ns.rerun @@ -521,6 +522,8 @@ def create_run_tests(self, tests: TestTuple) -> RunTests: randomize=self.randomize, random_seed=self.random_seed, parallel_threads=self.parallel_threads, + single_process_per_case=self.want_single_process_per_case, + case_groups=None, ) def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: @@ -529,6 +532,10 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: "less than 3 warmup repetitions can give false positives!") print(msg, file=sys.stdout, flush=True) + if self.want_single_process_per_case and self.num_workers == 0: + # Each test case must run in its own subprocess + self.num_workers = 1 + if self.num_workers < 0: # Use all CPUs + 2 extra worker processes for tests # that like to sleep @@ -546,6 +553,22 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int: print("Using random seed:", self.random_seed) runtests = self.create_run_tests(selected) + if self.want_single_process_per_case: + cases_by_module, _ = collect_cases( + selected, + match_tests=self.match_tests, + test_dir=self.test_dir) + case_groups = tuple( + (module_name, tuple(cases)) + for module_name, cases in cases_by_module.items() + ) + if not case_groups: + self.log("No test cases found") + return 0 + case_ids = tuple( + case_id for _, cases in case_groups for case_id in cases + ) + runtests = runtests.copy(tests=case_ids, case_groups=case_groups) self.first_runtests = runtests self.logger.set_tests(runtests) diff --git a/Lib/test/libregrtest/run_workers.py b/Lib/test/libregrtest/run_workers.py index befdac7ee77f10..4c845842cd1fc0 100644 --- a/Lib/test/libregrtest/run_workers.py +++ b/Lib/test/libregrtest/run_workers.py @@ -70,6 +70,26 @@ def stop(self): with self.lock: self.tests_iter = None +class GroupedMultiprocessIterator: + """Provide test groups safely across multiple worker threads.""" + + def __init__(self, groups_iter): + self.lock = threading.Lock() + self.groups_iter = groups_iter + + def next_group(self): + with self.lock: + if self.groups_iter is None: + return None + try: + return next(self.groups_iter) + except StopIteration: + return None + + def stop(self): + with self.lock: + self.groups_iter = None + @dataclasses.dataclass(slots=True, frozen=True) class MultiprocessResult: @@ -119,6 +139,7 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None: self._popen: subprocess.Popen[str] | None = None self._killed = False self._stopped = False + self.current_module: TestName = "" def __repr__(self) -> str: info = [f'WorkerThread #{self.worker_id}'] @@ -267,15 +288,18 @@ def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextI return (json_file, json_tmpfile) def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> WorkerRunTests: - tests = (test_name,) - if self.runtests.rerun: - match_tests = self.runtests.get_match_tests(test_name) + kwargs: dict[str, Any] = {} + + if self.runtests.single_process_per_case: + tests = (self.current_module,) + kwargs['match_tests'] = [(test_name, True)] else: - match_tests = None + tests = (test_name,) + if self.runtests.rerun: + match_tests = self.runtests.get_match_tests(test_name) + if match_tests: + kwargs['match_tests'] = [(test, True) for test in match_tests] - kwargs: dict[str, Any] = {} - if match_tests: - kwargs['match_tests'] = [(test, True) for test in match_tests] if self.runtests.output_on_failure: kwargs['verbose'] = True kwargs['output_on_failure'] = False @@ -388,6 +412,13 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult: return MultiprocessResult(result, stdout) def run(self) -> None: + if self.runtests.single_process_per_case: + self._run_grouped() + else: + self._run_flat() + + def _run_flat(self) -> None: + """Original behavior: one test name (module) per iteration.""" fail_fast = self.runtests.fail_fast fail_env_changed = self.runtests.fail_env_changed try: @@ -417,6 +448,52 @@ def run(self) -> None: finally: self.output.put(WorkerThreadExited()) + def _run_grouped(self) -> None: + """Execute all tests in a group on the same thread before moving on.""" + fail_fast = self.runtests.fail_fast + fail_env_changed = self.runtests.fail_env_changed + try: + while not self._stopped: + group = self.pending.next_group() + if group is None: + break + + module_name, case_ids = group + must_stop = False + for test_name in case_ids: + if self._stopped: + break + self.current_module = module_name + self.start_time = time.monotonic() + self.test_name = test_name + try: + mp_result = self._runtest(test_name) + except WorkerError as exc: + mp_result = exc.mp_result + finally: + self.test_name = _NOT_RUNNING + + mp_result = dataclasses.replace( + mp_result, + result=dataclasses.replace( + mp_result.result, + test_name=test_name, + duration=time.monotonic() - self.start_time)) + + self.output.put((False, mp_result)) + if mp_result.result.must_stop(fail_fast, fail_env_changed): + must_stop = True + break + + if must_stop: + break + except ExitThread: + pass + except BaseException: + self.output.put((True, traceback.format_exc())) + finally: + self.output.put(WorkerThreadExited()) + def _wait_completed(self) -> None: popen = self._popen # only needed for mypy: @@ -486,8 +563,12 @@ def __init__(self, num_workers: int, runtests: RunTests, self.live_worker_count = 0 self.output: queue.Queue[QueueContent] = queue.Queue() - tests_iter = runtests.iter_tests() - self.pending = MultiprocessIterator(tests_iter) + if runtests.single_process_per_case: + groups_iter = runtests.iter_case_groups() + self.pending = GroupedMultiprocessIterator(groups_iter) + else: + tests_iter = runtests.iter_tests() + self.pending = MultiprocessIterator(tests_iter) self.timeout = runtests.timeout if self.timeout is not None: # Rely on faulthandler to kill a worker process. This timouet is diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py index 0a9edce1085be5..fbb04b5b705ece 100644 --- a/Lib/test/libregrtest/runtests.py +++ b/Lib/test/libregrtest/runtests.py @@ -101,6 +101,8 @@ class RunTests: randomize: bool random_seed: int | str parallel_threads: int | None + single_process_per_case: bool + case_groups: tuple[tuple[TestName, tuple[TestName, ...]], ...] | None def copy(self, **override) -> 'RunTests': state = dataclasses.asdict(self) @@ -132,6 +134,20 @@ def iter_tests(self) -> Iterator[TestName]: else: yield from self.tests + def iter_case_groups(self) -> Iterator[tuple[TestName, tuple[TestName, ...]]]: + """ + Yield (module_name, case_ids) pairs. All case_ids in a group + must run sequentially on the same worker thread. + """ + if self.case_groups is None: + for name in self.iter_tests(): + yield (name, (name,)) + elif self.forever: + while True: + yield from self.case_groups + else: + yield from self.case_groups + def json_file_use_stdout(self) -> bool: # Use STDOUT in two cases: #