diff --git a/DESIGN.md b/DESIGN.md index 22a685e..9a1bd26 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -97,6 +97,15 @@ All dialog items (`dialog_image`, `dialog_question`, `dialog_references`, `dialo - For the live stream (terminal in batch / GUI panel), prefixes every line emitted from a branch's thread with `[] ` so concurrent branches stay readable. - Exposes `write` / `writeln` / `flush` (Python 3.14's `unittest` calls `stream.writeln()` directly without `_WritelnDecorator`). +### Subprocess RPC startup handshake (py_func / lua_func / eval_proc) + +The parent ↔ subprocess JSON-RPC link runs over a localhost TCP socket. The **subprocess** owns the port: it binds `port 0` (OS-assigned), `listen()`s, then prints `__TESTIUM_RPC_PORT__=` on stdout (constant `RPC_PORT_SENTINEL` in `runtime/jrpc.py`). The parent reads that line (`proc_drain.drain_and_read_port` + `wait_for_port`, deadline `gd("proc_start_timeout", 30)`) and only *then* connects — the server is guaranteed to be listening, so the connect succeeds on the first attempt. + +This replaced the previous fragile scheme (parent reserved a port via `bind(0)`+close, child re-bound the same port, parent connected on a timing guess) which broke intermittently on Windows: cold-start/antivirus variance pushed the worker past the connect deadline, and `connect()` to a not-yet-listening localhost port *times out* (≈1 s) instead of refusing, exhausting the retry budget. Notes: +- The server no longer sets `SO_REUSEADDR` (a fresh ephemeral port needs no TIME_WAIT override; on Windows it would enable port hijacking). +- `JsonRpcBase.wait_ready()` always settles (event set on success **and** failure) and returns the actual connection outcome — a connect failure no longer hangs a `wait_ready()` caller. +- Non-sentinel subprocess stdout/stderr is still forwarded to the parent log (early-startup errors stay visible). + ### Subprocess API contract (py_func / lua_func) User test scripts running inside a `py_func` or `lua_func` subprocess **must** use the JSON-RPC bridge to interact with testium state: @@ -279,6 +288,7 @@ The `testium_assist` editor extension is a thin LSP client that spawns `testium Both Flatpak and AppImage export `TESTIUM_VERSION` from a launcher (Flatpak: launcher script in `org.testium.Testium.yaml`; AppImage: `runtime.env` in `AppImageBuilder.yml`). `get_testium_version()` checks `/.flatpak-info` / `APPIMAGE` and reads `TESTIUM_VERSION` rather than relying on package metadata or repo introspection. ## Recent fixes / notable changes +- Subprocess RPC startup handshake: the `py_func`/`lua_func`/`eval_proc` worker now picks its own port (`bind 0`), announces it on stdout (`__TESTIUM_RPC_PORT__=`), and the parent connects only after reading it. Fixes intermittent Windows `failed to connect : timeout` and the matching `wait_ready()` hang; removes the reserve/close/rebind race and `SO_REUSEADDR`. See "Subprocess RPC startup handshake". - `build_all.sh`: builds the four heavy channels in parallel (serial prep for the shared venv + wheel), results in completion order, Ctrl+C kills the whole job tree; `--ram` puts the build scratch on tmpfs (`/dev/shm`) + skips UPX for fast builds on USB/SD storage (Flatpak excluded — rofiles-fuse can't mount tmpfs). See the "Building all channels" section. - LSP across packaging channels: `testium lsp` (and the `testium_assist` editor extension that spawns it) now works from source, wheel, PyInstaller, Flatpak and AppImage. Two enablers — (1) action items declare a class-level `ACTIONS = {key: class}` registry (like `PARAMS`), so `lsp/schema.py` builds the full schema from class attributes with no `inspect.getsource`/AST (which broke under frozen PyInstaller); (2) the `[lsp]` extra (pygls) is wired into every full-app channel. `test/validation/lsp_check.py`, run by `run.sh` before the suite, asserts per-channel that `schema` keeps its actions and `lsp` answers `initialize`. See the matching architecture sections. - Declarative test item parameters (v0.2): each `TestItem` subclass exposes a `PARAMS = ParamSet(...)` class attribute consumed by the base `__init__`. Catches unknown YAML keys (typo warnings listing the accepted names) and missing required params (load-time errors with `.tum` context). Lays the schema foundation for a future LSP server and auto-generated manual sections. See the matching architecture section. diff --git a/src/testium/interpreter/test_items/test_item_py_func.py b/src/testium/interpreter/test_items/test_item_py_func.py index ccd90c8..09cbfed 100644 --- a/src/testium/interpreter/test_items/test_item_py_func.py +++ b/src/testium/interpreter/test_items/test_item_py_func.py @@ -90,7 +90,7 @@ class TestItemPyFunc(TestItem): if not engine.is_alive(): engine.start() - if not engine.wait_ready(): + if not engine.wait_ready(10): raise ETUMRuntimeError( f"""Impossible to start the external python execution process. Is the python path correct ? diff --git a/src/testium/interpreter/utils/lua_process.py b/src/testium/interpreter/utils/lua_process.py index ec21eec..7a30052 100644 --- a/src/testium/interpreter/utils/lua_process.py +++ b/src/testium/interpreter/utils/lua_process.py @@ -1,14 +1,13 @@ import os import sys import subprocess -import socket import api.testium as tm from runtime.jrpc import JsonRpcClient from interpreter.utils.paths import subproc_path from runtime.tum_except import ETUMRuntimeError from interpreter.utils import bins -from interpreter.utils.proc_drain import drain_to_log +from interpreter.utils.proc_drain import drain_and_read_port, wait_for_port class LuaProcessBase: @@ -79,12 +78,7 @@ class LuaProcessBase: else: env[k] = e + ";" + env.get(k, "") - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("localhost", 0)) - self._port = sock.getsockname()[1] - sock.close() - - # POpen params + # POpen params (port 0 -> the Lua server picks a free port and reports it) cmd_args = [ "main.lua", "--timeout", @@ -92,7 +86,7 @@ class LuaProcessBase: "--host", "127.0.0.1", "--port", - f"{self._port}", + "0", ] if tm.debug_enabled() and tm.gd("debug_rpc", False): @@ -122,10 +116,18 @@ class LuaProcessBase: restore_signals=False, **popen_kwargs, ) - # Route subprocess stdout/stderr (lua require failures, syntax - # errors, anything written to fd 1/2 before the in-script - # remote_print is set up) into the parent's log. - drain_to_log(self._process, prefix="[lua_func] ") + # Route subprocess stdout/stderr into the parent's log and read the + # startup port sentinel before connecting. + holder = drain_and_read_port(self._process, prefix="[lua_func] ") + self._port = wait_for_port( + self._process, holder, tm.gd("proc_start_timeout", 30) + ) + if self._port is None: + # Worker died before announcing its port: tear down fully so a + # later start() (e.g. a reused context_id engine) can retry cleanly. + self.stop() + self.join() + return self._rpc = JsonRpcClient( "localhost", self._port, req_handler=self._req_handler diff --git a/src/testium/interpreter/utils/proc_drain.py b/src/testium/interpreter/utils/proc_drain.py index 4dd0ccf..d6d039a 100644 --- a/src/testium/interpreter/utils/proc_drain.py +++ b/src/testium/interpreter/utils/proc_drain.py @@ -8,6 +8,9 @@ exceptions before the in-process redirection kicks in, lua ``require`` failures, anything written to fd 1/2 directly). """ import threading +from time import monotonic + +from runtime.jrpc import RPC_PORT_SENTINEL def _drain_pipe(pipe, prefix): @@ -46,3 +49,62 @@ def drain_to_log(process, prefix=""): t.start() threads.append(t) return threads + + +def drain_and_read_port(process, prefix=""): + """Like :func:`drain_to_log`, but the stdout reader also watches for the + startup port sentinel. Returns a ``holder`` dict (passed to + :func:`wait_for_port`); all non-sentinel lines are still forwarded to the + log. stderr is drained as usual. + """ + holder = {"port": None, "evt": threading.Event()} + + def _read_stdout(pipe): + try: + for raw in iter(pipe.readline, b""): + line = raw.decode("utf-8", errors="replace").rstrip("\r\n") + if holder["port"] is None and line.startswith(RPC_PORT_SENTINEL): + try: + holder["port"] = int(line[len(RPC_PORT_SENTINEL):].strip()) + except ValueError: + continue + holder["evt"].set() + continue + if line: + print(f"{prefix}{line}" if prefix else line) + finally: + try: + pipe.close() + except Exception: + pass + # Unblock the waiter on EOF even if the sentinel never came. + holder["evt"].set() + + if process.stdout is not None: + threading.Thread( + target=_read_stdout, args=(process.stdout,), daemon=True, + ).start() + if process.stderr is not None: + threading.Thread( + target=_drain_pipe, args=(process.stderr, prefix), daemon=True, + ).start() + return holder + + +def wait_for_port(process, holder, deadline): + """Block until the port sentinel arrives, the process dies, or *deadline* + seconds elapse. Returns the port int or ``None``. + """ + end = monotonic() + deadline + while holder["port"] is None: + remaining = end - monotonic() + if remaining <= 0: + break + holder["evt"].wait(min(remaining, 0.2)) + if holder["port"] is not None: + break + if process.poll() is not None: + # Child exited; give the reader a moment to flush a trailing line. + holder["evt"].wait(0.2) + break + return holder["port"] diff --git a/src/testium/interpreter/utils/py_process.py b/src/testium/interpreter/utils/py_process.py index 6ce61d6..2248bd3 100644 --- a/src/testium/interpreter/utils/py_process.py +++ b/src/testium/interpreter/utils/py_process.py @@ -1,13 +1,12 @@ import os import sys import subprocess -import socket from runtime.jrpc import JsonRpcClient import api.testium as tm from runtime.tum_except import ETUMRuntimeError from interpreter.utils.paths import testium_path, subproc_path from interpreter.utils import bins -from interpreter.utils.proc_drain import drain_to_log +from interpreter.utils.proc_drain import drain_and_read_port, wait_for_port class PyProcessBase: @@ -54,13 +53,6 @@ class PyProcessBase: else: env[k] = e + os.pathsep + env.get(k, "") - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(("localhost", 0)) - self._port = sock.getsockname()[1] - # Port was reserved until the sub-process is started. Now released. - if sock is not None: - sock.close() - # In Flatpak the host can't see /app/lib/testium, so use a staged copy # under /tmp (shared between sandbox and host) for both cwd and as the # root in PYTHONPATH. Outside Flatpak the original paths are used. @@ -75,7 +67,7 @@ class PyProcessBase: cmd_args = [ "py_func", "-p", - f"{self._port}", + "0", "-t", f"{self._timeout}", ] @@ -107,11 +99,20 @@ class PyProcessBase: restore_signals=False, **popen_kwargs, ) - # Route subprocess stdout/stderr (early-startup errors, - # unhandled exceptions, anything written to fd 1/2 before the - # in-process JSON-RPC stdio_redir kicks in) into the parent's - # log. - drain_to_log(self._process, prefix="[py_func] ") + # Route subprocess stdout/stderr into the parent's log and read the + # startup port sentinel. Startup variance (cold start, antivirus) is + # absorbed here: we wait for the worker to announce its port before + # connecting. + holder = drain_and_read_port(self._process, prefix="[py_func] ") + self._port = wait_for_port( + self._process, holder, tm.gd("proc_start_timeout", 30) + ) + if self._port is None: + # Worker died before announcing its port: tear down fully so a + # later start() (e.g. a reused context_id engine) can retry cleanly. + self.stop() + self.join() + return self._rpc = JsonRpcClient( "localhost", self._port, req_handler=self._req_handler diff --git a/src/testium/lua_func/main.lua b/src/testium/lua_func/main.lua index 2b405bc..e4f4d59 100644 --- a/src/testium/lua_func/main.lua +++ b/src/testium/lua_func/main.lua @@ -3,7 +3,7 @@ -- ========================= local config = { host = "0.0.0.0", - port = 9000, + port = 0, -- 0 = OS-assigned; actual port is reported on stdout timeout = 60, verbose = false, } @@ -76,6 +76,10 @@ server_sock:listen(1) local ip, port = server_sock:getsockname() utils.log("listening on %s:%d for %.1f secs", ip, port, config.timeout) +-- Announce the actual bound port so the parent connects only once we listen. +io.stdout:write("__TESTIUM_RPC_PORT__=" .. port .. "\n") +io.stdout:flush() + server_sock:settimeout(config.timeout) -- Prevents hanging on dead connections -- Main Server Loop diff --git a/src/testium/py_func/__init__.py b/src/testium/py_func/__init__.py index f8e9b35..919d190 100755 --- a/src/testium/py_func/__init__.py +++ b/src/testium/py_func/__init__.py @@ -1,7 +1,9 @@ #!/usr/bin/env python +import sys import multiprocessing from py_func.tm import _init_api, _remote_print from runtime.stdout_redirect import stdio_redir +from runtime.jrpc import RPC_PORT_SENTINEL class TcpStdOut: @@ -24,21 +26,30 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("-i", "--ip", type=str, help="Ip address or hostname to listen to", default="localhost") - parser.add_argument("-p", "--port", type=int, help="port to listen to", - default=9000) + parser.add_argument("-p", "--port", type=int, help="port to listen to (0 = OS-assigned)", + default=0) parser.add_argument("-t", "--timeout", type=float, help="Timeout waiting for connection", default=10) parser.add_argument("-v", "--verbose", action='store_true', help="port to listen to") args = parser.parse_args() thrd_api = _init_api(args.ip, args.port, args.timeout) - # redirect I/O - outstream = TcpStdOut() - stdio_redir.redirect(outstream) # debug the server if args.verbose: thrd_api.dbg_out = stdio_redir.ini_stdout thrd_api.start() + + # Announce the actual bound port on real stdout (before redirection) so the + # parent connects only once we are listening. + port = thrd_api.wait_bound(args.timeout) + if port is None: + print("py_func: failed to bind a listening port", file=sys.stderr, flush=True) + return + print(f"{RPC_PORT_SENTINEL}{port}", flush=True) + + # redirect I/O + outstream = TcpStdOut() + stdio_redir.redirect(outstream) try: while thrd_api.is_alive(): thrd_api.join(1) diff --git a/src/testium/runtime/jrpc.py b/src/testium/runtime/jrpc.py index 7c5878b..c9b8edf 100644 --- a/src/testium/runtime/jrpc.py +++ b/src/testium/runtime/jrpc.py @@ -12,6 +12,11 @@ except: from runtime.tum_except import ETUMRuntimeError +# Startup handshake: the subprocess prints this line (followed by the actual +# bound port) on stdout once its server is listening; the parent reads it and +# connects. Avoids guessing the port and connecting before the server is ready. +RPC_PORT_SENTINEL = "__TESTIUM_RPC_PORT__=" + """Lightweight JSON-RPC 2.0 helpers over TCP sockets. This module implements a minimal JSON-RPC 2.0 messaging layer using @@ -279,6 +284,9 @@ class JsonRpcBase(threading.Thread): self._req_handler = req_handler self._dbg_out = dbg_out self._event_ready = threading.Event() + # Event is set on success AND failure so wait_ready() never hangs; + # _connected carries the actual outcome. + self._connected = False def handle_request(self, method, params): """Override to implement server-side request handling. @@ -314,10 +322,12 @@ class JsonRpcBase(threading.Thread): self.name, sock, self.handle_request, dbg_out=self.dbg_out ) self._rpc.wait_ready() + self._connected = True self._event_ready.set() def wait_ready(self, timeout=None): - return self._event_ready.wait(timeout) + self._event_ready.wait(timeout) + return self._connected @property def dbg_out(self): @@ -348,20 +358,34 @@ class JsonRpcSrv(JsonRpcBase): def __init__(self, host, port, req_handler=None, timeout=10): super().__init__(host, port, req_handler, timeout) self.name = f"JsonRpcSvr_{port}" + self._bound_port = None + self._bound_evt = threading.Event() + + @property + def bound_port(self): + return self._bound_port + + def wait_bound(self, timeout=None): + self._bound_evt.wait(timeout) + return self._bound_port def run(self): # TCP/IP socket creation try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # No SO_REUSEADDR: the OS picks a fresh ephemeral port (port 0), + # so there is no TIME_WAIT to override, and on Windows REUSEADDR + # would let another process hijack the port. - # Link of the socket at the configured port + # Bind (port may be 0 -> OS-assigned) then publish the actual port sock.bind((self._host, self._port)) # Listens incoming connections sock.listen(1) - self.print_info(f"listening on {self._host}:{self._port}") + self._bound_port = sock.getsockname()[1] + self._bound_evt.set() + self.print_info(f"listening on {self._host}:{self._bound_port}") self.print_info(f"awaiting connection for {self._timeout} secs") sock.settimeout(self._timeout) @@ -382,6 +406,8 @@ class JsonRpcSrv(JsonRpcBase): sleep(0.1) finally: + # Unblock wait_bound() even if bind/accept failed. + self._bound_evt.set() if self._rpc is not None: self._rpc.stop() self._rpc.join() @@ -407,35 +433,37 @@ class JsonRpcClient(JsonRpcBase): self.name = f"JsonRpcClt_{port}" def run(self): - if tm.OS() == "Windows": - self.run_win() - else: - self.run_lin() + try: + if tm.OS() == "Windows": + self.run_win() + else: + self.run_lin() + except Exception as e: + self.print_info(f"connection failed: {e}") + finally: + # Settle wait_ready() whatever the outcome (_connected stays False + # on failure). + self._event_ready.set() def run_win(self): - # TCP/IP socket creation - tslice = 1 - t = self._timeout + # Server is already listening (port handshake), so connect succeeds on + # the first attempt; retry on refused/timeout until the deadline anyway. + deadline = monotonic() + self._timeout sock = None try: - while t >= 0: + while True: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(tslice) - # Link of the socket at the configured port + sock.settimeout(0.5) try: sock.connect((self._host, self._port)) break - except socket.timeout: + except OSError as e: sock.close() - t -= tslice - if t < 0: + if monotonic() >= deadline: raise ETUMRuntimeError( - f"{self.name}: failed to connect : timeout" + f"{self.name}: failed to connect : {e}" ) - else: - sleep(tslice) - except socket.error as e: - raise ETUMRuntimeError(f"{self.name}: failed to connect : {e}") + sleep(0.1) self.print_info("Connected to server") self.connect(sock)