Add parallel test item with thread-aware stdout routing
The parallel item runs branches concurrently with sync:all or sync:any policy and optional per-branch wait_for synchronization. Each branch runs in its own daemon thread and produces a clean per-item entry in the SQLite report; the live output is prefixed [<branch_name>] so concurrent branches stay readable. Supporting changes: - StdoutProxy (lib/stdout_redirect.py): thread-aware sys.stdout/stderr with per-thread capture buffers and per-branch live-output prefix. Adds writeln() for Python 3.14 unittest compatibility. - TestItemContainer: shared base extracted from Group/Cycle for the sequential children execution pattern. - TestItemSleep: interruptible loop polling _is_stopped so sync:any can cancel slow branches quickly. - TestReport: thread-safe SQLite (check_same_thread=False + lock). Also drops the unused -m/--terminal mode and its module. Validation: 11 scenarios in test/validation/items/parallel covering sync:all/any, no_fail, wait_for + timeout, conditions, multi-branch, nested parallel, parallel inside loop, real branch failure. Documentation: new parallel_test_item.rst added to the manual. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -1,8 +1,88 @@
|
||||
import sys
|
||||
import threading
|
||||
from threading import (Thread, Event)
|
||||
from lib.string_queue import StringQueue
|
||||
from time import (sleep)
|
||||
|
||||
|
||||
class StdoutProxy:
|
||||
"""Thread-aware stdout proxy.
|
||||
|
||||
Each writing thread can be associated with:
|
||||
- a per-thread buffer (StringQueue) where its writes are captured for the
|
||||
per-item SQLite log column;
|
||||
- a 'branch' label, used to prefix each line in the live (parent-visible)
|
||||
output stream so concurrent branches are easy to read.
|
||||
|
||||
Threads with no association fall back to the default buffer (the "main"
|
||||
thread's buffer) and write to live output without prefix.
|
||||
"""
|
||||
|
||||
def __init__(self, live_stream, default_buffer):
|
||||
self.live_stream = live_stream
|
||||
self.default_buffer = default_buffer
|
||||
self._buffers = {}
|
||||
self._branches = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def register(self, tid=None, buffer=None, branch=None):
|
||||
if tid is None:
|
||||
tid = threading.get_ident()
|
||||
with self._lock:
|
||||
if buffer is not None:
|
||||
self._buffers[tid] = buffer
|
||||
if branch is not None:
|
||||
self._branches[tid] = branch
|
||||
|
||||
def unregister(self, tid=None):
|
||||
if tid is None:
|
||||
tid = threading.get_ident()
|
||||
with self._lock:
|
||||
self._buffers.pop(tid, None)
|
||||
self._branches.pop(tid, None)
|
||||
|
||||
def get_buffer(self, tid=None):
|
||||
if tid is None:
|
||||
tid = threading.get_ident()
|
||||
with self._lock:
|
||||
return self._buffers.get(tid, self.default_buffer)
|
||||
|
||||
def write(self, s):
|
||||
if not s:
|
||||
return
|
||||
tid = threading.get_ident()
|
||||
with self._lock:
|
||||
buf = self._buffers.get(tid, self.default_buffer)
|
||||
branch = self._branches.get(tid)
|
||||
# Per-thread capture: clean, no prefix
|
||||
buf.write(s)
|
||||
# Live stream: prefix each line with the branch label
|
||||
if branch:
|
||||
self.live_stream.write(self._prefix(s, f'[{branch}] '))
|
||||
else:
|
||||
self.live_stream.write(s)
|
||||
|
||||
@staticmethod
|
||||
def _prefix(s, prefix):
|
||||
ends_nl = s.endswith('\n')
|
||||
body = s[:-1] if ends_nl else s
|
||||
if body == '':
|
||||
return s
|
||||
prefixed = '\n'.join(prefix + line for line in body.split('\n'))
|
||||
if ends_nl:
|
||||
prefixed += '\n'
|
||||
return prefixed
|
||||
|
||||
def writeln(self, s=''):
|
||||
self.write(s + '\n')
|
||||
|
||||
def flush(self):
|
||||
try:
|
||||
self.live_stream.flush()
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
|
||||
class StdioRedirect:
|
||||
|
||||
def __init__(self):
|
||||
@@ -28,48 +108,38 @@ class StdioRedirect:
|
||||
|
||||
def intercept(self):
|
||||
if not self.spy_enabled:
|
||||
self.thr_started = Event()
|
||||
self.log_buf = StringQueue()
|
||||
self.in_stream = StringQueue()
|
||||
self.stop_output = Event()
|
||||
self.thrd_out = Thread(target=self.interceptStdOut)
|
||||
self.thrd_out.daemon = True
|
||||
sys.stdout = self.in_stream
|
||||
sys.stderr = self.in_stream
|
||||
self.stream = self.in_stream
|
||||
self.thrd_out.start()
|
||||
self.thr_started.wait()
|
||||
self.log_buf = StringQueue() # default buffer (main thread)
|
||||
self.proxy = StdoutProxy(self.out_stream, self.log_buf)
|
||||
sys.stdout = self.proxy
|
||||
sys.stderr = self.proxy
|
||||
self.stream = self.proxy
|
||||
self.spy_enabled = True
|
||||
|
||||
|
||||
def stop(self):
|
||||
if self.spy_enabled:
|
||||
sys.stdout = self.out_stream
|
||||
sys.stderr = self.out_stream
|
||||
self.stream = self.out_stream
|
||||
self.stop_output.set()
|
||||
self.thrd_out.join()
|
||||
del self.log_buf
|
||||
del self.in_stream
|
||||
del self.stop_output
|
||||
del self.thrd_out
|
||||
del self.thr_started
|
||||
del self.proxy
|
||||
|
||||
self.spy_enabled = False
|
||||
|
||||
def interceptStdOut(self):
|
||||
self.thr_started.set()
|
||||
while not self.stop_output.is_set():
|
||||
data = self.in_stream.read()
|
||||
self.log_buf.write(data)
|
||||
self.out_stream.write(data)
|
||||
if data == '':
|
||||
sleep(0.1)
|
||||
|
||||
def read(self):
|
||||
ret = ''
|
||||
"""Read accumulated content from the calling thread's buffer."""
|
||||
if not self.spy_enabled:
|
||||
return ''
|
||||
return self.proxy.get_buffer().read()
|
||||
|
||||
def register_thread(self, buffer=None, branch=None):
|
||||
"""Register the calling thread's per-thread buffer and/or branch label."""
|
||||
if self.spy_enabled:
|
||||
ret = self.log_buf.read()
|
||||
return ret
|
||||
self.proxy.register(buffer=buffer, branch=branch)
|
||||
|
||||
def unregister_thread(self):
|
||||
"""Drop the calling thread's registration."""
|
||||
if self.spy_enabled:
|
||||
self.proxy.unregister()
|
||||
|
||||
|
||||
stdio_redir = StdioRedirect()
|
||||
|
||||
Reference in New Issue
Block a user