From 947a10aed3b255a58a19c88d1efcce403e2d950b Mon Sep 17 00:00:00 2001 From: Thomas Kpenou Date: Thu, 25 Jun 2026 07:56:21 -0400 Subject: [PATCH] test: fix flaky PTY env test (read output after stream close) test_PYTHONUNBUFFERED intermittently failed on CI with "'1' != None": the PYTHONUNBUFFERED line was missing from the captured env. Root cause is a race in how the test read the output. start() reads the process output on a separate thread that pushes chunks into a ReplayObservable and close()s it when done. Reading via read_until_closed() while that thread is still pushing races ReplayObservable.subscribe (replay buffered chunks, then register) against push (append chunk, fire to observers): a chunk pushed in the gap is neither replayed nor delivered, so the last env line could be dropped. The tiny, fast printenv.sh output made this most likely (and 3.14/loaded runners hit it). Fix the tests to wait for output_stream.wait_close() before reading: once closed there is no concurrent pusher, and close() keeps the replay buffer, so the late subscribe replays every chunk deterministically. Applied via a shared _run_and_read_all() helper to the env tests and the unicode/encoding tests (same latent race). No production code changed. Co-Authored-By: Claude Opus 4.8 --- src/tests/execution/process_pty_test.py | 39 ++++++++++++------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/src/tests/execution/process_pty_test.py b/src/tests/execution/process_pty_test.py index 80260ae5..f724a4d0 100644 --- a/src/tests/execution/process_pty_test.py +++ b/src/tests/execution/process_pty_test.py @@ -1,5 +1,4 @@ import os -import threading import unittest from pathlib import Path @@ -9,6 +8,22 @@ from utils import file_utils +def _run_and_read_all(process_wrapper): + """Start the process and return its full output, deterministically. + + The output is read on a separate thread (started by start()) that pushes + chunks into a ReplayObservable and closes it once reading is done. Reading + the output via read_until_closed() *while* that thread is still pushing races + the (unsynchronised) ReplayObservable.subscribe vs push and can drop the last + chunk — the source of the flaky test_PYTHONUNBUFFERED failure. Waiting for the + stream to close first means there is no concurrent pusher, and close() keeps + the replay buffer, so the late subscribe replays every chunk. + """ + process_wrapper.start() + process_wrapper.output_stream.wait_close() + return ''.join(read_until_closed(process_wrapper.output_stream)) + + class TestEnvironmentVariables(unittest.TestCase): def test_default_variables(self): @@ -38,11 +53,7 @@ def execute_and_get_passed_env(custom_variables): env_variables = test_utils.env_variables process_wrapper = PtyProcessWrapper( str(Path(__file__).parent.parent / 'scripts' / 'printenv.sh'), '.', env_variables.build_env_vars(custom_variables)) - process_wrapper.start() - thread = threading.Thread(target=process_wrapper.wait_finish, daemon=True) - thread.start() - thread.join(timeout=0.1) - output = ''.join(read_until_closed(process_wrapper.output_stream)) + output = _run_and_read_all(process_wrapper) lines = output.split('\n') env_dict = {line.split('=', 2)[0]: line.split('=', 2)[1] for line in lines if '=' in line} return env_dict @@ -54,13 +65,7 @@ def test_many_unicode_characters(self): test_utils.create_file('test.txt', text=long_unicode_text) process_wrapper = PtyProcessWrapper(['cat', 'test.txt'], test_utils.temp_folder, {}) - process_wrapper.start() - - thread = threading.Thread(target=process_wrapper.wait_finish, daemon=True) - thread.start() - thread.join(timeout=0.1) - - output = ''.join(read_until_closed(process_wrapper.output_stream)) + output = _run_and_read_all(process_wrapper) self.assertEqual(long_unicode_text, output) self.assertEqual(0, process_wrapper.get_return_code()) @@ -73,13 +78,7 @@ def test_mixed_encoding(self): byte_content=True) process_wrapper = PtyProcessWrapper(['cat', 'test.txt'], test_utils.temp_folder, {}) - process_wrapper.start() - - thread = threading.Thread(target=process_wrapper.wait_finish, daemon=True) - thread.start() - thread.join(timeout=0.1) - - output = ''.join(read_until_closed(process_wrapper.output_stream)) + output = _run_and_read_all(process_wrapper) self.assertEqual('gültig\n läuft verändert für Ändern \nPrüfung gültig läuft ࠀ 𒀀!', output) def setUp(self):