Make Stop interrupt blocking steps promptly
console.read_until polls a should_stop callback in 0.2s chunks across all protocols. py_func/lua_func override stop() to tear down the worker and wake the parent RPC wait. json_rpc adapters honor should_stop too. Engaged leaf steps now report FAILURE on stop (sleep no-dialog was silently SUCCESS).
This commit is contained in:
@@ -11,6 +11,7 @@ import threading
|
||||
from telnetlib3 import Telnet, DO, WILL, WONT, TTYPE, IAC, SB, SE, theNULL
|
||||
|
||||
TIMEOUT_NULL = 0.000001
|
||||
STOP_POLL_INTERVAL = 0.2
|
||||
|
||||
|
||||
class BytesStore(object):
|
||||
@@ -123,12 +124,14 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
||||
# c = ''
|
||||
return c
|
||||
|
||||
def read_until(self, match, timeout=None, return_data=False, mute=False):
|
||||
def read_until(self, match, timeout=None, return_data=False, mute=False, should_stop=None):
|
||||
"""
|
||||
read until the string 'match is found
|
||||
If timeout is not set (None), this function runs indefinitely
|
||||
If timeout is set to zero, this function returns immediately
|
||||
If mute is set to True the characters read from the console will not be displayed
|
||||
If should_stop is a callable, it is polled between reads (every STOP_POLL_INTERVAL
|
||||
at most) and the loop exits early — like a timeout — when it returns True.
|
||||
|
||||
If function fails (because of a timeout) it will return a 'status' integer set to -1
|
||||
otherwise it will return 0.
|
||||
@@ -139,13 +142,6 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
||||
status = -1
|
||||
if not match:
|
||||
raise ValueError('match parameter can not be empty')
|
||||
# replace all '\r' by '\n' as any '\r' read will undergo the same replacement
|
||||
# match = match.replace('\r\n', '\n')
|
||||
# match = match.replace('\r', '')
|
||||
|
||||
# update the console timeout in conformity with what is required.
|
||||
|
||||
self.set_read_timeout(timeout)
|
||||
|
||||
if timeout is None:
|
||||
timeout = 1000000
|
||||
@@ -159,6 +155,7 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
||||
# buffer is empty
|
||||
# Otherwise we are waiting for the timeout to rise
|
||||
if timeout < TIMEOUT_NULL:
|
||||
self.set_read_timeout(0)
|
||||
data = self.readchar(0)
|
||||
|
||||
while (status < 0) and ((data is not None) and (data != b'')):
|
||||
@@ -191,16 +188,21 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
||||
|
||||
# Timeout different than zero
|
||||
else:
|
||||
# Poll in short chunks so a stop request is honored within
|
||||
# STOP_POLL_INTERVAL, regardless of the per-protocol blocking
|
||||
# behavior of readchar().
|
||||
self.set_read_timeout(STOP_POLL_INTERVAL)
|
||||
|
||||
time_is_out = threading.Event()
|
||||
timer = threading.Timer(timeout, lambda: time_is_out.set())
|
||||
timer.start()
|
||||
|
||||
# We are waiting for the timeout to rise
|
||||
try:
|
||||
while (status < 0) and (not time_is_out.is_set()):
|
||||
if should_stop is not None and should_stop():
|
||||
break
|
||||
|
||||
while (status < 0) and (not time_is_out.isSet()):
|
||||
|
||||
data = self.readchar(timeout)
|
||||
data = self.readchar(STOP_POLL_INTERVAL)
|
||||
if data is not None:
|
||||
data = self._compute_char(data)
|
||||
if data != '':
|
||||
@@ -210,7 +212,6 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
||||
|
||||
search_deque.append(data)
|
||||
if search_deque == match_deque:
|
||||
timer.cancel()
|
||||
status = 0
|
||||
if (not mute) and (data != '\n'):
|
||||
self.string_buffer += '\n'
|
||||
@@ -224,6 +225,8 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
||||
|
||||
date_str = str(datetime.now()).split('.')[0].split(' ')[1]
|
||||
self.string_buffer = '[{} {}]'.format(date_str, self.name)
|
||||
finally:
|
||||
timer.cancel()
|
||||
|
||||
if return_data:
|
||||
return status, read_data
|
||||
|
||||
@@ -307,11 +307,17 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
|
||||
|
||||
try:
|
||||
status, data = cons.read_until(
|
||||
ru, timeout=read_timeout, return_data=True, mute=mute
|
||||
ru, timeout=read_timeout, return_data=True, mute=mute,
|
||||
should_stop=self.isStopped,
|
||||
)
|
||||
if status == 0:
|
||||
self.result.set(TestValue.SUCCESS)
|
||||
self.result.value = data
|
||||
elif self.isStopped():
|
||||
self.result.set(
|
||||
result=TestValue.FAILURE,
|
||||
message="Console read aborted on stop request",
|
||||
)
|
||||
else:
|
||||
self.result.set(result=TestValue.FAILURE, message="No matching text")
|
||||
if mute:
|
||||
|
||||
@@ -105,6 +105,7 @@ class TestItemJSRPCActionQuery(TestItemAction):
|
||||
jrpc_id = randint(1, (2**32) - 1)
|
||||
send_only = self._prms.expanse(self._send_only)
|
||||
timeout = self._prms.expanse(self._timeout)
|
||||
self.token.set_should_stop(self.isStopped)
|
||||
try:
|
||||
success, result = self.token.query(
|
||||
meth, obj, jrpc_id, send_only, timeout=timeout
|
||||
@@ -146,6 +147,7 @@ class TestItemJSRPCActionReceive(TestItemAction):
|
||||
def execute(self):
|
||||
timeout = self._prms.expanse(self._timeout)
|
||||
jrpc_id = self._prms.expanse(self._jrpc_id)
|
||||
self.token.set_should_stop(self.isStopped)
|
||||
|
||||
try:
|
||||
success, result = self.token.receive(jrpc_id, timeout)
|
||||
|
||||
@@ -2,10 +2,11 @@ import json
|
||||
import socket
|
||||
import re
|
||||
import struct
|
||||
import time
|
||||
|
||||
from runtime.tum_except import ETUMRuntimeError
|
||||
import api.testium as tm
|
||||
from api.console import Console
|
||||
from api.console import Console, STOP_POLL_INTERVAL
|
||||
|
||||
|
||||
def is_ip_address(address):
|
||||
@@ -45,9 +46,16 @@ class JrpcAdapter:
|
||||
self._jrpc_version = version
|
||||
self._mute = mute
|
||||
self._timeout = timeout
|
||||
# Optional callable polled by _receive() implementations to abort
|
||||
# waits early when the test is being stopped. Set by the test item
|
||||
# action before each query/receive call.
|
||||
self._should_stop = None
|
||||
if not (version == "1.0" or version == "2.0"):
|
||||
raise ETUMRuntimeError("Invalid JSONRPC version passed.")
|
||||
|
||||
def set_should_stop(self, cb):
|
||||
self._should_stop = cb
|
||||
|
||||
@property
|
||||
def timeout(self):
|
||||
return self._timeout
|
||||
@@ -249,13 +257,23 @@ class JrpcUdpAdapter(JrpcAdapter):
|
||||
print(f" | sent to @{self._server}:{self._snd_port}")
|
||||
|
||||
def _receive(self, timeout: float) -> str:
|
||||
|
||||
# configures the reception timeout
|
||||
self.sock.settimeout(timeout)
|
||||
|
||||
# Receives the answer from the server
|
||||
# Poll in short chunks so a stop request is honored within
|
||||
# STOP_POLL_INTERVAL.
|
||||
self.sock.settimeout(STOP_POLL_INTERVAL)
|
||||
deadline = time.monotonic() + float(timeout)
|
||||
data = None
|
||||
addr = None
|
||||
while True:
|
||||
if self._should_stop is not None and self._should_stop():
|
||||
raise ETUMRuntimeError("JSONRPC udp receive aborted on stop request.")
|
||||
try:
|
||||
data, addr = self.sock.recvfrom(self._bufsize)
|
||||
break
|
||||
except socket.timeout:
|
||||
if time.monotonic() >= deadline:
|
||||
raise ETUMRuntimeError(
|
||||
"JSONRPC udp answer took too long. Try to increase the timeout."
|
||||
)
|
||||
|
||||
# In case of buffer overload we chose to complain
|
||||
if len(data) >= self._bufsize:
|
||||
@@ -271,10 +289,6 @@ class JrpcUdpAdapter(JrpcAdapter):
|
||||
print(f" | UDP answer: '{res}'")
|
||||
print(f" | received from @{addr[0]}:{addr[1]}")
|
||||
|
||||
except socket.timeout:
|
||||
raise ETUMRuntimeError(
|
||||
"JSONRPC udp answer took too long. Try to increase the timeout."
|
||||
)
|
||||
return res
|
||||
|
||||
def _build_query(self, method: str, obj, jrpc_id: int):
|
||||
@@ -339,11 +353,16 @@ class JrpcConsoleAdapter(JrpcAdapter):
|
||||
|
||||
def _receive(self, timeout: float) -> str:
|
||||
status, data = self._cons.read_until(
|
||||
self._endswith, timeout, return_data=True, mute=self._mute
|
||||
self._endswith, timeout, return_data=True, mute=self._mute,
|
||||
should_stop=self._should_stop,
|
||||
)
|
||||
|
||||
# if we did not receive anything, we complain
|
||||
if not status == 0:
|
||||
if self._should_stop is not None and self._should_stop():
|
||||
raise ETUMRuntimeError(
|
||||
f"JSONRPC console receive aborted on stop request."
|
||||
)
|
||||
raise ETUMRuntimeError(
|
||||
f"The '{self._cons.name}' console did not answer in the requested time."
|
||||
)
|
||||
|
||||
@@ -45,6 +45,18 @@ class TestItemLuaFunc(TestItem):
|
||||
tm.setgd(_LUA_FUNC_CONTEXTS_KEY, contexts)
|
||||
return contexts[ctx_id], True
|
||||
|
||||
def stop(self):
|
||||
super().stop()
|
||||
# Tear down the worker so any in-flight func_call returns promptly.
|
||||
# join() clears _rpc/_process so a subsequent item reusing the same
|
||||
# context_id can restart the engine cleanly.
|
||||
try:
|
||||
engine, _ = self._get_engine()
|
||||
engine.stop()
|
||||
engine.join()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@test_run
|
||||
def execute(self):
|
||||
self.result.set(
|
||||
@@ -96,8 +108,14 @@ Is the lua environnment well defined in the "LUA_PATH" and "LUA_CPATH" variables
|
||||
|
||||
return
|
||||
|
||||
except ConnectionAbortedError:
|
||||
self.result.set(TestValue.FAILURE, "lua_func aborted on stop request")
|
||||
print("lua_func aborted on stop request.")
|
||||
except:
|
||||
traceback.print_exception(*sys.exc_info())
|
||||
if self.isStopped():
|
||||
self.result.set(TestValue.FAILURE, "lua_func aborted on stop request")
|
||||
else:
|
||||
self.result.set(
|
||||
TestValue.FAILURE,
|
||||
'Unrecoverable "lua_func" item error from {}'.format(self.func_name),
|
||||
|
||||
@@ -45,6 +45,18 @@ class TestItemPyFunc(TestItem):
|
||||
tm.setgd(_PY_FUNC_CONTEXTS_KEY, contexts)
|
||||
return contexts[ctx_id], True
|
||||
|
||||
def stop(self):
|
||||
super().stop()
|
||||
# Tear down the worker so any in-flight func_call returns promptly.
|
||||
# join() clears _rpc/_process so a subsequent item reusing the same
|
||||
# context_id can restart the engine cleanly.
|
||||
try:
|
||||
engine, _ = self._get_engine()
|
||||
engine.stop()
|
||||
engine.join()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@test_run
|
||||
def execute(self):
|
||||
self.result.set(
|
||||
@@ -94,8 +106,14 @@ python_bin = {tm.gd("python_bin", "no python path defined")}"""
|
||||
|
||||
return
|
||||
|
||||
except ConnectionAbortedError:
|
||||
self.result.set(TestValue.FAILURE, "py_func aborted on stop request")
|
||||
print("py_func aborted on stop request.")
|
||||
except:
|
||||
traceback.print_exception(*sys.exc_info())
|
||||
if self.isStopped():
|
||||
self.result.set(TestValue.FAILURE, "py_func aborted on stop request")
|
||||
else:
|
||||
self.result.set(
|
||||
TestValue.FAILURE,
|
||||
'Unrecoverable "py_func" item error from {}'.format(self.func_name),
|
||||
|
||||
@@ -80,4 +80,7 @@ class TestItemSleep(TestItem):
|
||||
end_time = _time.time() + float(timeout)
|
||||
while _time.time() < end_time and not self._is_stopped:
|
||||
sleep(min(0.05, end_time - _time.time()))
|
||||
if self._is_stopped:
|
||||
self.result.set(TestValue.FAILURE, 'Sleep aborted on stop request')
|
||||
else:
|
||||
self.result.set(TestValue.SUCCESS, 'Sleep %s sec' % (str(timeout)))
|
||||
|
||||
@@ -146,4 +146,12 @@ class LuaProcessBase:
|
||||
"""
|
||||
if self._rpc is not None:
|
||||
self._rpc.stop()
|
||||
# Force-kill the worker if it's still running. Needed when user code
|
||||
# in the worker is stuck and won't notice the parent closing the RPC
|
||||
# socket on its own.
|
||||
if self._process is not None and self._process.poll() is None:
|
||||
try:
|
||||
self._process.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@@ -123,3 +123,11 @@ class PyProcessBase:
|
||||
def stop(self):
|
||||
if self._rpc is not None:
|
||||
self._rpc.stop()
|
||||
# Force-kill the worker if it's still running. Needed when user code
|
||||
# in the worker is stuck (e.g. sleep, blocking I/O) and won't notice
|
||||
# the parent closing the RPC socket on its own.
|
||||
if self._process is not None and self._process.poll() is None:
|
||||
try:
|
||||
self._process.terminate()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -200,6 +200,7 @@ class JsonRpcConnection:
|
||||
|
||||
Raises:
|
||||
TimeoutError: If no response is received within `timeout`.
|
||||
ConnectionAbortedError: If stop() was called while waiting.
|
||||
"""
|
||||
|
||||
req_id = next(self.id_gen)
|
||||
@@ -214,7 +215,12 @@ class JsonRpcConnection:
|
||||
self.pending.pop(req_id, None)
|
||||
raise TimeoutError("Timeout JSON-RPC")
|
||||
|
||||
return self.pending.pop(req_id)["response"]
|
||||
entry = self.pending.pop(req_id)
|
||||
if entry["response"] is None:
|
||||
# Woken by stop() (or by a malformed dispatch) rather than by a
|
||||
# real response — abort the call so callers don't block further.
|
||||
raise ConnectionAbortedError("JSON-RPC client stopped")
|
||||
return entry["response"]
|
||||
|
||||
def print_info(self, msg):
|
||||
if self.dbg_out is not None:
|
||||
@@ -223,6 +229,10 @@ class JsonRpcConnection:
|
||||
def stop(self):
|
||||
if self.running:
|
||||
self.running = False
|
||||
# Wake any in-flight call() so it doesn't sit on its (default 1h)
|
||||
# timeout. The response stays None and call() raises ConnectionAbortedError.
|
||||
for entry in list(self.pending.values()):
|
||||
entry["event"].set()
|
||||
|
||||
def join(self):
|
||||
self.recv_thread.join()
|
||||
|
||||
Reference in New Issue
Block a user