Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Lib/test/libregrtest/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def __init__(self, **kwargs) -> None:
self._add_python_opts = True
self.xmlpath = None
self.single_process = False
self.single_process_per_case = False

super().__init__(**kwargs)

Expand Down Expand Up @@ -368,6 +369,11 @@ def _create_parser():
group.add_argument('--list-cases', action='store_true',
help='only write the name of test cases that will be run, '
'don\'t execute them')
group.add_argument('--single-process-per-case', action='store_true',
help='run each test case in its own process. '
'(slow; for debugging order dependencies '
"and environment leaks). Test cases from "
'the same module run sequentially.')
group.add_argument('-P', '--pgo', dest='pgo', action='store_true',
help='enable Profile Guided Optimization (PGO) training')
group.add_argument('--pgo-extended', action='store_true',
Expand Down
40 changes: 32 additions & 8 deletions Lib/test/libregrtest/findtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,43 @@ def list_cases(tests: TestTuple, *,
test_dir: StrPath | None = None) -> None:
support.verbose = False
set_match_tests(match_tests)
cases_by_module, skipped = collect_cases(tests, match_tests=match_tests,
test_dir=test_dir)
for cases in cases_by_module.values():
for case_id in cases:
print(case_id)
if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)

skipped = []
def collect_cases(tests: TestTuple, *,
match_tests: TestFilter | None = None,
test_dir: StrPath | None = None
) -> tuple[dict[TestName, list[str]], list[TestName]]:
result: dict[TestName, list[str]] = {}
skipped: list[TestName] = []
for test_name in tests:
module_name = abs_module_name(test_name, test_dir)
cases: list[str] = []
try:
suite = unittest.defaultTestLoader.loadTestsFromName(module_name)
_list_cases(suite)
_collect_cases(suite, cases)
except unittest.SkipTest:
skipped.append(test_name)
continue
if cases:
result[test_name] = cases
return result, skipped

if skipped:
sys.stdout.flush()
stderr = sys.stderr
print(file=stderr)
print(count(len(skipped), "test"), "skipped:", file=stderr)
printlist(skipped, file=stderr)
def _collect_cases(suite: unittest.TestSuite, out: list[str]) -> None:
for test in suite:
if isinstance(test, unittest.loader._FailedTest):
continue
if isinstance(test, unittest.TestSuite):
_collect_cases(test, out)
elif isinstance(test, unittest.TestCase):
if match_test(test):
out.append(test.id())
25 changes: 24 additions & 1 deletion Lib/test/libregrtest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from test.support import os_helper, MS_WINDOWS, flush_std_streams

from .cmdline import _parse_args, Namespace
from .findtests import findtests, split_test_packages, list_cases
from .findtests import findtests, split_test_packages, list_cases, collect_cases
from .logger import Logger
from .pgo import setup_pgo_tests
from .result import TestResult
Expand Down Expand Up @@ -73,6 +73,7 @@ def __init__(self, ns: Namespace, _add_python_opts: bool = False):
self.want_header: bool = ns.header
self.want_list_tests: bool = ns.list_tests
self.want_list_cases: bool = ns.list_cases
self.want_single_process_per_case: bool = ns.single_process_per_case
self.want_wait: bool = ns.wait
self.want_cleanup: bool = ns.cleanup
self.want_rerun: bool = ns.rerun
Expand Down Expand Up @@ -521,6 +522,8 @@ def create_run_tests(self, tests: TestTuple) -> RunTests:
randomize=self.randomize,
random_seed=self.random_seed,
parallel_threads=self.parallel_threads,
single_process_per_case=self.want_single_process_per_case,
case_groups=None,
)

def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
Expand All @@ -529,6 +532,10 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
"less than 3 warmup repetitions can give false positives!")
print(msg, file=sys.stdout, flush=True)

if self.want_single_process_per_case and self.num_workers == 0:
# Each test case must run in its own subprocess
self.num_workers = 1

if self.num_workers < 0:
# Use all CPUs + 2 extra worker processes for tests
# that like to sleep
Expand All @@ -546,6 +553,22 @@ def _run_tests(self, selected: TestTuple, tests: TestList | None) -> int:
print("Using random seed:", self.random_seed)

runtests = self.create_run_tests(selected)
if self.want_single_process_per_case:
cases_by_module, _ = collect_cases(
selected,
match_tests=self.match_tests,
test_dir=self.test_dir)
case_groups = tuple(
(module_name, tuple(cases))
for module_name, cases in cases_by_module.items()
)
if not case_groups:
self.log("No test cases found")
return 0
case_ids = tuple(
case_id for _, cases in case_groups for case_id in cases
)
runtests = runtests.copy(tests=case_ids, case_groups=case_groups)
self.first_runtests = runtests
self.logger.set_tests(runtests)

Expand Down
99 changes: 90 additions & 9 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ def stop(self):
with self.lock:
self.tests_iter = None

class GroupedMultiprocessIterator:
"""Provide test groups safely across multiple worker threads."""

def __init__(self, groups_iter):
self.lock = threading.Lock()
self.groups_iter = groups_iter

def next_group(self):
with self.lock:
if self.groups_iter is None:
return None
try:
return next(self.groups_iter)
except StopIteration:
return None

def stop(self):
with self.lock:
self.groups_iter = None


@dataclasses.dataclass(slots=True, frozen=True)
class MultiprocessResult:
Expand Down Expand Up @@ -119,6 +139,7 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
self._popen: subprocess.Popen[str] | None = None
self._killed = False
self._stopped = False
self.current_module: TestName = ""

def __repr__(self) -> str:
info = [f'WorkerThread #{self.worker_id}']
Expand Down Expand Up @@ -267,15 +288,18 @@ def create_json_file(self, stack: contextlib.ExitStack) -> tuple[JsonFile, TextI
return (json_file, json_tmpfile)

def create_worker_runtests(self, test_name: TestName, json_file: JsonFile) -> WorkerRunTests:
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
kwargs: dict[str, Any] = {}

if self.runtests.single_process_per_case:
tests = (self.current_module,)
kwargs['match_tests'] = [(test_name, True)]
else:
match_tests = None
tests = (test_name,)
if self.runtests.rerun:
match_tests = self.runtests.get_match_tests(test_name)
if match_tests:
kwargs['match_tests'] = [(test, True) for test in match_tests]

kwargs: dict[str, Any] = {}
if match_tests:
kwargs['match_tests'] = [(test, True) for test in match_tests]
if self.runtests.output_on_failure:
kwargs['verbose'] = True
kwargs['output_on_failure'] = False
Expand Down Expand Up @@ -388,6 +412,13 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
return MultiprocessResult(result, stdout)

def run(self) -> None:
if self.runtests.single_process_per_case:
self._run_grouped()
else:
self._run_flat()

def _run_flat(self) -> None:
"""Original behavior: one test name (module) per iteration."""
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
try:
Expand Down Expand Up @@ -417,6 +448,52 @@ def run(self) -> None:
finally:
self.output.put(WorkerThreadExited())

def _run_grouped(self) -> None:
"""Execute all tests in a group on the same thread before moving on."""
fail_fast = self.runtests.fail_fast
fail_env_changed = self.runtests.fail_env_changed
try:
while not self._stopped:
group = self.pending.next_group()
if group is None:
break

module_name, case_ids = group
must_stop = False
for test_name in case_ids:
if self._stopped:
break
self.current_module = module_name
self.start_time = time.monotonic()
self.test_name = test_name
try:
mp_result = self._runtest(test_name)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = _NOT_RUNNING

mp_result = dataclasses.replace(
mp_result,
result=dataclasses.replace(
mp_result.result,
test_name=test_name,
duration=time.monotonic() - self.start_time))

self.output.put((False, mp_result))
if mp_result.result.must_stop(fail_fast, fail_env_changed):
must_stop = True
break

if must_stop:
break
except ExitThread:
pass
except BaseException:
self.output.put((True, traceback.format_exc()))
finally:
self.output.put(WorkerThreadExited())

def _wait_completed(self) -> None:
popen = self._popen
# only needed for mypy:
Expand Down Expand Up @@ -486,8 +563,12 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.live_worker_count = 0

self.output: queue.Queue[QueueContent] = queue.Queue()
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
if runtests.single_process_per_case:
groups_iter = runtests.iter_case_groups()
self.pending = GroupedMultiprocessIterator(groups_iter)
else:
tests_iter = runtests.iter_tests()
self.pending = MultiprocessIterator(tests_iter)
self.timeout = runtests.timeout
if self.timeout is not None:
# Rely on faulthandler to kill a worker process. This timouet is
Expand Down
16 changes: 16 additions & 0 deletions Lib/test/libregrtest/runtests.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class RunTests:
randomize: bool
random_seed: int | str
parallel_threads: int | None
single_process_per_case: bool
case_groups: tuple[tuple[TestName, tuple[TestName, ...]], ...] | None

def copy(self, **override) -> 'RunTests':
state = dataclasses.asdict(self)
Expand Down Expand Up @@ -132,6 +134,20 @@ def iter_tests(self) -> Iterator[TestName]:
else:
yield from self.tests

def iter_case_groups(self) -> Iterator[tuple[TestName, tuple[TestName, ...]]]:
"""
Yield (module_name, case_ids) pairs. All case_ids in a group
must run sequentially on the same worker thread.
"""
if self.case_groups is None:
for name in self.iter_tests():
yield (name, (name,))
elif self.forever:
while True:
yield from self.case_groups
else:
yield from self.case_groups

def json_file_use_stdout(self) -> bool:
# Use STDOUT in two cases:
#
Expand Down
Loading