fix(windows): RPC port handshake for py/lua subprocesses

The subprocess now binds port 0, prints the bound port on stdout after
listen(), and the parent connects only once it reads that port. Removes
the reserve/close/rebind race and SO_REUSEADDR, and the connect-before-
ready timing guess that failed intermittently on Windows. wait_ready()
no longer hangs when a connection attempt fails.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-07 18:29:04 +02:00
parent 9dae210f7f
commit 2241dfb8c7
8 changed files with 175 additions and 57 deletions

View File

@@ -97,6 +97,15 @@ All dialog items (`dialog_image`, `dialog_question`, `dialog_references`, `dialo
- For the live stream (terminal in batch / GUI panel), prefixes every line emitted from a branch's thread with `[<branch_name>] ` so concurrent branches stay readable.
- Exposes `write` / `writeln` / `flush` (Python 3.14's `unittest` calls `stream.writeln()` directly without `_WritelnDecorator`).
### Subprocess RPC startup handshake (py_func / lua_func / eval_proc)
The parent ↔ subprocess JSON-RPC link runs over a localhost TCP socket. The **subprocess** owns the port: it binds `port 0` (OS-assigned), `listen()`s, then prints `__TESTIUM_RPC_PORT__=<port>` on stdout (constant `RPC_PORT_SENTINEL` in `runtime/jrpc.py`). The parent reads that line (`proc_drain.drain_and_read_port` + `wait_for_port`, deadline `gd("proc_start_timeout", 30)`) and only *then* connects — the server is guaranteed to be listening, so the connect succeeds on the first attempt.
This replaced the previous fragile scheme (parent reserved a port via `bind(0)`+close, child re-bound the same port, parent connected on a timing guess) which broke intermittently on Windows: cold-start/antivirus variance pushed the worker past the connect deadline, and `connect()` to a not-yet-listening localhost port *times out* (≈1 s) instead of refusing, exhausting the retry budget. Notes:
- The server no longer sets `SO_REUSEADDR` (a fresh ephemeral port needs no TIME_WAIT override; on Windows it would enable port hijacking).
- `JsonRpcBase.wait_ready()` always settles (event set on success **and** failure) and returns the actual connection outcome — a connect failure no longer hangs a `wait_ready()` caller.
- Non-sentinel subprocess stdout/stderr is still forwarded to the parent log (early-startup errors stay visible).
### Subprocess API contract (py_func / lua_func)
User test scripts running inside a `py_func` or `lua_func` subprocess **must** use the JSON-RPC bridge to interact with testium state:
@@ -279,6 +288,7 @@ The `testium_assist` editor extension is a thin LSP client that spawns `testium
Both Flatpak and AppImage export `TESTIUM_VERSION` from a launcher (Flatpak: launcher script in `org.testium.Testium.yaml`; AppImage: `runtime.env` in `AppImageBuilder.yml`). `get_testium_version()` checks `/.flatpak-info` / `APPIMAGE` and reads `TESTIUM_VERSION` rather than relying on package metadata or repo introspection.
## Recent fixes / notable changes
- Subprocess RPC startup handshake: the `py_func`/`lua_func`/`eval_proc` worker now picks its own port (`bind 0`), announces it on stdout (`__TESTIUM_RPC_PORT__=`), and the parent connects only after reading it. Fixes intermittent Windows `failed to connect : timeout` and the matching `wait_ready()` hang; removes the reserve/close/rebind race and `SO_REUSEADDR`. See "Subprocess RPC startup handshake".
- `build_all.sh`: builds the four heavy channels in parallel (serial prep for the shared venv + wheel), results in completion order, Ctrl+C kills the whole job tree; `--ram` puts the build scratch on tmpfs (`/dev/shm`) + skips UPX for fast builds on USB/SD storage (Flatpak excluded — rofiles-fuse can't mount tmpfs). See the "Building all channels" section.
- LSP across packaging channels: `testium lsp` (and the `testium_assist` editor extension that spawns it) now works from source, wheel, PyInstaller, Flatpak and AppImage. Two enablers — (1) action items declare a class-level `ACTIONS = {key: class}` registry (like `PARAMS`), so `lsp/schema.py` builds the full schema from class attributes with no `inspect.getsource`/AST (which broke under frozen PyInstaller); (2) the `[lsp]` extra (pygls) is wired into every full-app channel. `test/validation/lsp_check.py`, run by `run.sh` before the suite, asserts per-channel that `schema` keeps its actions and `lsp` answers `initialize`. See the matching architecture sections.
- Declarative test item parameters (v0.2): each `TestItem` subclass exposes a `PARAMS = ParamSet(...)` class attribute consumed by the base `__init__`. Catches unknown YAML keys (typo warnings listing the accepted names) and missing required params (load-time errors with `.tum` context). Lays the schema foundation for a future LSP server and auto-generated manual sections. See the matching architecture section.

View File

@@ -90,7 +90,7 @@ class TestItemPyFunc(TestItem):
if not engine.is_alive():
engine.start()
if not engine.wait_ready():
if not engine.wait_ready(10):
raise ETUMRuntimeError(
f"""Impossible to start the external python execution process.
Is the python path correct ?

View File

@@ -1,14 +1,13 @@
import os
import sys
import subprocess
import socket
import api.testium as tm
from runtime.jrpc import JsonRpcClient
from interpreter.utils.paths import subproc_path
from runtime.tum_except import ETUMRuntimeError
from interpreter.utils import bins
from interpreter.utils.proc_drain import drain_to_log
from interpreter.utils.proc_drain import drain_and_read_port, wait_for_port
class LuaProcessBase:
@@ -79,12 +78,7 @@ class LuaProcessBase:
else:
env[k] = e + ";" + env.get(k, "")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("localhost", 0))
self._port = sock.getsockname()[1]
sock.close()
# POpen params
# POpen params (port 0 -> the Lua server picks a free port and reports it)
cmd_args = [
"main.lua",
"--timeout",
@@ -92,7 +86,7 @@ class LuaProcessBase:
"--host",
"127.0.0.1",
"--port",
f"{self._port}",
"0",
]
if tm.debug_enabled() and tm.gd("debug_rpc", False):
@@ -122,10 +116,18 @@ class LuaProcessBase:
restore_signals=False,
**popen_kwargs,
)
# Route subprocess stdout/stderr (lua require failures, syntax
# errors, anything written to fd 1/2 before the in-script
# remote_print is set up) into the parent's log.
drain_to_log(self._process, prefix="[lua_func] ")
# Route subprocess stdout/stderr into the parent's log and read the
# startup port sentinel before connecting.
holder = drain_and_read_port(self._process, prefix="[lua_func] ")
self._port = wait_for_port(
self._process, holder, tm.gd("proc_start_timeout", 30)
)
if self._port is None:
# Worker died before announcing its port: tear down fully so a
# later start() (e.g. a reused context_id engine) can retry cleanly.
self.stop()
self.join()
return
self._rpc = JsonRpcClient(
"localhost", self._port, req_handler=self._req_handler

View File

@@ -8,6 +8,9 @@ exceptions before the in-process redirection kicks in, lua
``require`` failures, anything written to fd 1/2 directly).
"""
import threading
from time import monotonic
from runtime.jrpc import RPC_PORT_SENTINEL
def _drain_pipe(pipe, prefix):
@@ -46,3 +49,62 @@ def drain_to_log(process, prefix=""):
t.start()
threads.append(t)
return threads
def drain_and_read_port(process, prefix=""):
"""Like :func:`drain_to_log`, but the stdout reader also watches for the
startup port sentinel. Returns a ``holder`` dict (passed to
:func:`wait_for_port`); all non-sentinel lines are still forwarded to the
log. stderr is drained as usual.
"""
holder = {"port": None, "evt": threading.Event()}
def _read_stdout(pipe):
try:
for raw in iter(pipe.readline, b""):
line = raw.decode("utf-8", errors="replace").rstrip("\r\n")
if holder["port"] is None and line.startswith(RPC_PORT_SENTINEL):
try:
holder["port"] = int(line[len(RPC_PORT_SENTINEL):].strip())
except ValueError:
continue
holder["evt"].set()
continue
if line:
print(f"{prefix}{line}" if prefix else line)
finally:
try:
pipe.close()
except Exception:
pass
# Unblock the waiter on EOF even if the sentinel never came.
holder["evt"].set()
if process.stdout is not None:
threading.Thread(
target=_read_stdout, args=(process.stdout,), daemon=True,
).start()
if process.stderr is not None:
threading.Thread(
target=_drain_pipe, args=(process.stderr, prefix), daemon=True,
).start()
return holder
def wait_for_port(process, holder, deadline):
"""Block until the port sentinel arrives, the process dies, or *deadline*
seconds elapse. Returns the port int or ``None``.
"""
end = monotonic() + deadline
while holder["port"] is None:
remaining = end - monotonic()
if remaining <= 0:
break
holder["evt"].wait(min(remaining, 0.2))
if holder["port"] is not None:
break
if process.poll() is not None:
# Child exited; give the reader a moment to flush a trailing line.
holder["evt"].wait(0.2)
break
return holder["port"]

View File

@@ -1,13 +1,12 @@
import os
import sys
import subprocess
import socket
from runtime.jrpc import JsonRpcClient
import api.testium as tm
from runtime.tum_except import ETUMRuntimeError
from interpreter.utils.paths import testium_path, subproc_path
from interpreter.utils import bins
from interpreter.utils.proc_drain import drain_to_log
from interpreter.utils.proc_drain import drain_and_read_port, wait_for_port
class PyProcessBase:
@@ -54,13 +53,6 @@ class PyProcessBase:
else:
env[k] = e + os.pathsep + env.get(k, "")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("localhost", 0))
self._port = sock.getsockname()[1]
# Port was reserved until the sub-process is started. Now released.
if sock is not None:
sock.close()
# In Flatpak the host can't see /app/lib/testium, so use a staged copy
# under /tmp (shared between sandbox and host) for both cwd and as the
# root in PYTHONPATH. Outside Flatpak the original paths are used.
@@ -75,7 +67,7 @@ class PyProcessBase:
cmd_args = [
"py_func",
"-p",
f"{self._port}",
"0",
"-t",
f"{self._timeout}",
]
@@ -107,11 +99,20 @@ class PyProcessBase:
restore_signals=False,
**popen_kwargs,
)
# Route subprocess stdout/stderr (early-startup errors,
# unhandled exceptions, anything written to fd 1/2 before the
# in-process JSON-RPC stdio_redir kicks in) into the parent's
# log.
drain_to_log(self._process, prefix="[py_func] ")
# Route subprocess stdout/stderr into the parent's log and read the
# startup port sentinel. Startup variance (cold start, antivirus) is
# absorbed here: we wait for the worker to announce its port before
# connecting.
holder = drain_and_read_port(self._process, prefix="[py_func] ")
self._port = wait_for_port(
self._process, holder, tm.gd("proc_start_timeout", 30)
)
if self._port is None:
# Worker died before announcing its port: tear down fully so a
# later start() (e.g. a reused context_id engine) can retry cleanly.
self.stop()
self.join()
return
self._rpc = JsonRpcClient(
"localhost", self._port, req_handler=self._req_handler

View File

@@ -3,7 +3,7 @@
-- =========================
local config = {
host = "0.0.0.0",
port = 9000,
port = 0, -- 0 = OS-assigned; actual port is reported on stdout
timeout = 60,
verbose = false,
}
@@ -76,6 +76,10 @@ server_sock:listen(1)
local ip, port = server_sock:getsockname()
utils.log("listening on %s:%d for %.1f secs", ip, port, config.timeout)
-- Announce the actual bound port so the parent connects only once we listen.
io.stdout:write("__TESTIUM_RPC_PORT__=" .. port .. "\n")
io.stdout:flush()
server_sock:settimeout(config.timeout) -- Prevents hanging on dead connections
-- Main Server Loop

View File

@@ -1,7 +1,9 @@
#!/usr/bin/env python
import sys
import multiprocessing
from py_func.tm import _init_api, _remote_print
from runtime.stdout_redirect import stdio_redir
from runtime.jrpc import RPC_PORT_SENTINEL
class TcpStdOut:
@@ -24,21 +26,30 @@ def main():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--ip", type=str, help="Ip address or hostname to listen to",
default="localhost")
parser.add_argument("-p", "--port", type=int, help="port to listen to",
default=9000)
parser.add_argument("-p", "--port", type=int, help="port to listen to (0 = OS-assigned)",
default=0)
parser.add_argument("-t", "--timeout", type=float, help="Timeout waiting for connection",
default=10)
parser.add_argument("-v", "--verbose", action='store_true', help="port to listen to")
args = parser.parse_args()
thrd_api = _init_api(args.ip, args.port, args.timeout)
# redirect I/O
outstream = TcpStdOut()
stdio_redir.redirect(outstream)
# debug the server
if args.verbose:
thrd_api.dbg_out = stdio_redir.ini_stdout
thrd_api.start()
# Announce the actual bound port on real stdout (before redirection) so the
# parent connects only once we are listening.
port = thrd_api.wait_bound(args.timeout)
if port is None:
print("py_func: failed to bind a listening port", file=sys.stderr, flush=True)
return
print(f"{RPC_PORT_SENTINEL}{port}", flush=True)
# redirect I/O
outstream = TcpStdOut()
stdio_redir.redirect(outstream)
try:
while thrd_api.is_alive():
thrd_api.join(1)

View File

@@ -12,6 +12,11 @@ except:
from runtime.tum_except import ETUMRuntimeError
# Startup handshake: the subprocess prints this line (followed by the actual
# bound port) on stdout once its server is listening; the parent reads it and
# connects. Avoids guessing the port and connecting before the server is ready.
RPC_PORT_SENTINEL = "__TESTIUM_RPC_PORT__="
"""Lightweight JSON-RPC 2.0 helpers over TCP sockets.
This module implements a minimal JSON-RPC 2.0 messaging layer using
@@ -279,6 +284,9 @@ class JsonRpcBase(threading.Thread):
self._req_handler = req_handler
self._dbg_out = dbg_out
self._event_ready = threading.Event()
# Event is set on success AND failure so wait_ready() never hangs;
# _connected carries the actual outcome.
self._connected = False
def handle_request(self, method, params):
"""Override to implement server-side request handling.
@@ -314,10 +322,12 @@ class JsonRpcBase(threading.Thread):
self.name, sock, self.handle_request, dbg_out=self.dbg_out
)
self._rpc.wait_ready()
self._connected = True
self._event_ready.set()
def wait_ready(self, timeout=None):
return self._event_ready.wait(timeout)
self._event_ready.wait(timeout)
return self._connected
@property
def dbg_out(self):
@@ -348,20 +358,34 @@ class JsonRpcSrv(JsonRpcBase):
def __init__(self, host, port, req_handler=None, timeout=10):
super().__init__(host, port, req_handler, timeout)
self.name = f"JsonRpcSvr_{port}"
self._bound_port = None
self._bound_evt = threading.Event()
@property
def bound_port(self):
return self._bound_port
def wait_bound(self, timeout=None):
self._bound_evt.wait(timeout)
return self._bound_port
def run(self):
# TCP/IP socket creation
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# No SO_REUSEADDR: the OS picks a fresh ephemeral port (port 0),
# so there is no TIME_WAIT to override, and on Windows REUSEADDR
# would let another process hijack the port.
# Link of the socket at the configured port
# Bind (port may be 0 -> OS-assigned) then publish the actual port
sock.bind((self._host, self._port))
# Listens incoming connections
sock.listen(1)
self.print_info(f"listening on {self._host}:{self._port}")
self._bound_port = sock.getsockname()[1]
self._bound_evt.set()
self.print_info(f"listening on {self._host}:{self._bound_port}")
self.print_info(f"awaiting connection for {self._timeout} secs")
sock.settimeout(self._timeout)
@@ -382,6 +406,8 @@ class JsonRpcSrv(JsonRpcBase):
sleep(0.1)
finally:
# Unblock wait_bound() even if bind/accept failed.
self._bound_evt.set()
if self._rpc is not None:
self._rpc.stop()
self._rpc.join()
@@ -407,35 +433,37 @@ class JsonRpcClient(JsonRpcBase):
self.name = f"JsonRpcClt_{port}"
def run(self):
try:
if tm.OS() == "Windows":
self.run_win()
else:
self.run_lin()
except Exception as e:
self.print_info(f"connection failed: {e}")
finally:
# Settle wait_ready() whatever the outcome (_connected stays False
# on failure).
self._event_ready.set()
def run_win(self):
# TCP/IP socket creation
tslice = 1
t = self._timeout
# Server is already listening (port handshake), so connect succeeds on
# the first attempt; retry on refused/timeout until the deadline anyway.
deadline = monotonic() + self._timeout
sock = None
try:
while t >= 0:
while True:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(tslice)
# Link of the socket at the configured port
sock.settimeout(0.5)
try:
sock.connect((self._host, self._port))
break
except socket.timeout:
except OSError as e:
sock.close()
t -= tslice
if t < 0:
if monotonic() >= deadline:
raise ETUMRuntimeError(
f"{self.name}: failed to connect : timeout"
f"{self.name}: failed to connect : {e}"
)
else:
sleep(tslice)
except socket.error as e:
raise ETUMRuntimeError(f"{self.name}: failed to connect : {e}")
sleep(0.1)
self.print_info("Connected to server")
self.connect(sock)