diff --git a/src/testium/api/console.py b/src/testium/api/console.py index 924da1c..cf72a41 100755 --- a/src/testium/api/console.py +++ b/src/testium/api/console.py @@ -11,6 +11,7 @@ import threading from telnetlib3 import Telnet, DO, WILL, WONT, TTYPE, IAC, SB, SE, theNULL TIMEOUT_NULL = 0.000001 +STOP_POLL_INTERVAL = 0.2 class BytesStore(object): @@ -123,12 +124,14 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type # c = '' return c - def read_until(self, match, timeout=None, return_data=False, mute=False): + def read_until(self, match, timeout=None, return_data=False, mute=False, should_stop=None): """ read until the string 'match is found If timeout is not set (None), this function runs indefinitely If timeout is set to zero, this function returns immediately If mute is set to True the characters read from the console will not be displayed + If should_stop is a callable, it is polled between reads (every STOP_POLL_INTERVAL + at most) and the loop exits early — like a timeout — when it returns True. If function fails (because of a timeout) it will return a 'status' integer set to -1 otherwise it will return 0. @@ -139,13 +142,6 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type status = -1 if not match: raise ValueError('match parameter can not be empty') - # replace all '\r' by '\n' as any '\r' read will undergo the same replacement - # match = match.replace('\r\n', '\n') - # match = match.replace('\r', '') - - # update the console timeout in conformity with what is required. - - self.set_read_timeout(timeout) if timeout is None: timeout = 1000000 @@ -159,6 +155,7 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type # buffer is empty # Otherwise we are waiting for the timeout to rise if timeout < TIMEOUT_NULL: + self.set_read_timeout(0) data = self.readchar(0) while (status < 0) and ((data is not None) and (data != b'')): @@ -191,39 +188,45 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type # Timeout different than zero else: + # Poll in short chunks so a stop request is honored within + # STOP_POLL_INTERVAL, regardless of the per-protocol blocking + # behavior of readchar(). + self.set_read_timeout(STOP_POLL_INTERVAL) time_is_out = threading.Event() timer = threading.Timer(timeout, lambda: time_is_out.set()) timer.start() - # We are waiting for the timeout to rise + try: + while (status < 0) and (not time_is_out.is_set()): + if should_stop is not None and should_stop(): + break - while (status < 0) and (not time_is_out.isSet()): - - data = self.readchar(timeout) - if data is not None: - data = self._compute_char(data) - if data != '': - if not mute: - self.string_buffer += data - read_data += data - - search_deque.append(data) - if search_deque == match_deque: - timer.cancel() - status = 0 - if (not mute) and (data != '\n'): - self.string_buffer += '\n' - - if data == '\n' or (status >= 0): - # the datas are written line by line for display optimisation in GUI mode + data = self.readchar(STOP_POLL_INTERVAL) + if data is not None: + data = self._compute_char(data) + if data != '': if not mute: - self.string_buffer = self.string_buffer.replace('\r\n', '\n') - self.string_buffer = self.string_buffer.replace('\r', '') - self.stream.write(self.string_buffer) + self.string_buffer += data + read_data += data - date_str = str(datetime.now()).split('.')[0].split(' ')[1] - self.string_buffer = '[{} {}]'.format(date_str, self.name) + search_deque.append(data) + if search_deque == match_deque: + status = 0 + if (not mute) and (data != '\n'): + self.string_buffer += '\n' + + if data == '\n' or (status >= 0): + # the datas are written line by line for display optimisation in GUI mode + if not mute: + self.string_buffer = self.string_buffer.replace('\r\n', '\n') + self.string_buffer = self.string_buffer.replace('\r', '') + self.stream.write(self.string_buffer) + + date_str = str(datetime.now()).split('.')[0].split(' ')[1] + self.string_buffer = '[{} {}]'.format(date_str, self.name) + finally: + timer.cancel() if return_data: return status, read_data diff --git a/src/testium/interpreter/test_items/test_item_console.py b/src/testium/interpreter/test_items/test_item_console.py index e264707..f924d97 100644 --- a/src/testium/interpreter/test_items/test_item_console.py +++ b/src/testium/interpreter/test_items/test_item_console.py @@ -307,11 +307,17 @@ class TestItemConsoleReadUntil(TestItemConsoleAction): try: status, data = cons.read_until( - ru, timeout=read_timeout, return_data=True, mute=mute + ru, timeout=read_timeout, return_data=True, mute=mute, + should_stop=self.isStopped, ) if status == 0: self.result.set(TestValue.SUCCESS) self.result.value = data + elif self.isStopped(): + self.result.set( + result=TestValue.FAILURE, + message="Console read aborted on stop request", + ) else: self.result.set(result=TestValue.FAILURE, message="No matching text") if mute: diff --git a/src/testium/interpreter/test_items/test_item_json_rpc/__init__.py b/src/testium/interpreter/test_items/test_item_json_rpc/__init__.py index d4631dc..38281d5 100644 --- a/src/testium/interpreter/test_items/test_item_json_rpc/__init__.py +++ b/src/testium/interpreter/test_items/test_item_json_rpc/__init__.py @@ -105,6 +105,7 @@ class TestItemJSRPCActionQuery(TestItemAction): jrpc_id = randint(1, (2**32) - 1) send_only = self._prms.expanse(self._send_only) timeout = self._prms.expanse(self._timeout) + self.token.set_should_stop(self.isStopped) try: success, result = self.token.query( meth, obj, jrpc_id, send_only, timeout=timeout @@ -146,6 +147,7 @@ class TestItemJSRPCActionReceive(TestItemAction): def execute(self): timeout = self._prms.expanse(self._timeout) jrpc_id = self._prms.expanse(self._jrpc_id) + self.token.set_should_stop(self.isStopped) try: success, result = self.token.receive(jrpc_id, timeout) diff --git a/src/testium/interpreter/test_items/test_item_json_rpc/jsonrpc_adapters.py b/src/testium/interpreter/test_items/test_item_json_rpc/jsonrpc_adapters.py index e27f168..2ba738a 100644 --- a/src/testium/interpreter/test_items/test_item_json_rpc/jsonrpc_adapters.py +++ b/src/testium/interpreter/test_items/test_item_json_rpc/jsonrpc_adapters.py @@ -2,10 +2,11 @@ import json import socket import re import struct +import time from runtime.tum_except import ETUMRuntimeError import api.testium as tm -from api.console import Console +from api.console import Console, STOP_POLL_INTERVAL def is_ip_address(address): @@ -45,9 +46,16 @@ class JrpcAdapter: self._jrpc_version = version self._mute = mute self._timeout = timeout + # Optional callable polled by _receive() implementations to abort + # waits early when the test is being stopped. Set by the test item + # action before each query/receive call. + self._should_stop = None if not (version == "1.0" or version == "2.0"): raise ETUMRuntimeError("Invalid JSONRPC version passed.") + def set_should_stop(self, cb): + self._should_stop = cb + @property def timeout(self): return self._timeout @@ -249,32 +257,38 @@ class JrpcUdpAdapter(JrpcAdapter): print(f" | sent to @{self._server}:{self._snd_port}") def _receive(self, timeout: float) -> str: + # Poll in short chunks so a stop request is honored within + # STOP_POLL_INTERVAL. + self.sock.settimeout(STOP_POLL_INTERVAL) + deadline = time.monotonic() + float(timeout) + data = None + addr = None + while True: + if self._should_stop is not None and self._should_stop(): + raise ETUMRuntimeError("JSONRPC udp receive aborted on stop request.") + try: + data, addr = self.sock.recvfrom(self._bufsize) + break + except socket.timeout: + if time.monotonic() >= deadline: + raise ETUMRuntimeError( + "JSONRPC udp answer took too long. Try to increase the timeout." + ) - # configures the reception timeout - self.sock.settimeout(timeout) - - # Receives the answer from the server - try: - data, addr = self.sock.recvfrom(self._bufsize) - - # In case of buffer overload we chose to complain - if len(data) >= self._bufsize: - raise ETUMRuntimeError( - "JSONRPC udp answer size overflow. Try to increase the bufsize" - ) - - # Converts binary to string - res = data.decode() - - # Don't log if mute - if not self._mute: - print(f" | UDP answer: '{res}'") - print(f" | received from @{addr[0]}:{addr[1]}") - - except socket.timeout: + # In case of buffer overload we chose to complain + if len(data) >= self._bufsize: raise ETUMRuntimeError( - "JSONRPC udp answer took too long. Try to increase the timeout." + "JSONRPC udp answer size overflow. Try to increase the bufsize" ) + + # Converts binary to string + res = data.decode() + + # Don't log if mute + if not self._mute: + print(f" | UDP answer: '{res}'") + print(f" | received from @{addr[0]}:{addr[1]}") + return res def _build_query(self, method: str, obj, jrpc_id: int): @@ -339,11 +353,16 @@ class JrpcConsoleAdapter(JrpcAdapter): def _receive(self, timeout: float) -> str: status, data = self._cons.read_until( - self._endswith, timeout, return_data=True, mute=self._mute + self._endswith, timeout, return_data=True, mute=self._mute, + should_stop=self._should_stop, ) # if we did not receive anything, we complain if not status == 0: + if self._should_stop is not None and self._should_stop(): + raise ETUMRuntimeError( + f"JSONRPC console receive aborted on stop request." + ) raise ETUMRuntimeError( f"The '{self._cons.name}' console did not answer in the requested time." ) diff --git a/src/testium/interpreter/test_items/test_item_lua_func.py b/src/testium/interpreter/test_items/test_item_lua_func.py index 1b9ba54..b6d227e 100644 --- a/src/testium/interpreter/test_items/test_item_lua_func.py +++ b/src/testium/interpreter/test_items/test_item_lua_func.py @@ -45,6 +45,18 @@ class TestItemLuaFunc(TestItem): tm.setgd(_LUA_FUNC_CONTEXTS_KEY, contexts) return contexts[ctx_id], True + def stop(self): + super().stop() + # Tear down the worker so any in-flight func_call returns promptly. + # join() clears _rpc/_process so a subsequent item reusing the same + # context_id can restart the engine cleanly. + try: + engine, _ = self._get_engine() + engine.stop() + engine.join() + except Exception: + pass + @test_run def execute(self): self.result.set( @@ -96,9 +108,15 @@ Is the lua environnment well defined in the "LUA_PATH" and "LUA_CPATH" variables return + except ConnectionAbortedError: + self.result.set(TestValue.FAILURE, "lua_func aborted on stop request") + print("lua_func aborted on stop request.") except: traceback.print_exception(*sys.exc_info()) - self.result.set( - TestValue.FAILURE, - 'Unrecoverable "lua_func" item error from {}'.format(self.func_name), - ) + if self.isStopped(): + self.result.set(TestValue.FAILURE, "lua_func aborted on stop request") + else: + self.result.set( + TestValue.FAILURE, + 'Unrecoverable "lua_func" item error from {}'.format(self.func_name), + ) diff --git a/src/testium/interpreter/test_items/test_item_py_func.py b/src/testium/interpreter/test_items/test_item_py_func.py index 59af448..7394439 100644 --- a/src/testium/interpreter/test_items/test_item_py_func.py +++ b/src/testium/interpreter/test_items/test_item_py_func.py @@ -45,6 +45,18 @@ class TestItemPyFunc(TestItem): tm.setgd(_PY_FUNC_CONTEXTS_KEY, contexts) return contexts[ctx_id], True + def stop(self): + super().stop() + # Tear down the worker so any in-flight func_call returns promptly. + # join() clears _rpc/_process so a subsequent item reusing the same + # context_id can restart the engine cleanly. + try: + engine, _ = self._get_engine() + engine.stop() + engine.join() + except Exception: + pass + @test_run def execute(self): self.result.set( @@ -94,9 +106,15 @@ python_bin = {tm.gd("python_bin", "no python path defined")}""" return + except ConnectionAbortedError: + self.result.set(TestValue.FAILURE, "py_func aborted on stop request") + print("py_func aborted on stop request.") except: traceback.print_exception(*sys.exc_info()) - self.result.set( - TestValue.FAILURE, - 'Unrecoverable "py_func" item error from {}'.format(self.func_name), - ) + if self.isStopped(): + self.result.set(TestValue.FAILURE, "py_func aborted on stop request") + else: + self.result.set( + TestValue.FAILURE, + 'Unrecoverable "py_func" item error from {}'.format(self.func_name), + ) diff --git a/src/testium/interpreter/test_items/test_item_sleep.py b/src/testium/interpreter/test_items/test_item_sleep.py index 4aabef9..090f9b2 100644 --- a/src/testium/interpreter/test_items/test_item_sleep.py +++ b/src/testium/interpreter/test_items/test_item_sleep.py @@ -80,4 +80,7 @@ class TestItemSleep(TestItem): end_time = _time.time() + float(timeout) while _time.time() < end_time and not self._is_stopped: sleep(min(0.05, end_time - _time.time())) - self.result.set(TestValue.SUCCESS, 'Sleep %s sec' % (str(timeout))) + if self._is_stopped: + self.result.set(TestValue.FAILURE, 'Sleep aborted on stop request') + else: + self.result.set(TestValue.SUCCESS, 'Sleep %s sec' % (str(timeout))) diff --git a/src/testium/interpreter/utils/lua_process.py b/src/testium/interpreter/utils/lua_process.py index 3898779..5eff8f1 100644 --- a/src/testium/interpreter/utils/lua_process.py +++ b/src/testium/interpreter/utils/lua_process.py @@ -146,4 +146,12 @@ class LuaProcessBase: """ if self._rpc is not None: self._rpc.stop() + # Force-kill the worker if it's still running. Needed when user code + # in the worker is stuck and won't notice the parent closing the RPC + # socket on its own. + if self._process is not None and self._process.poll() is None: + try: + self._process.terminate() + except Exception: + pass diff --git a/src/testium/interpreter/utils/py_process.py b/src/testium/interpreter/utils/py_process.py index 8563879..9d7367f 100644 --- a/src/testium/interpreter/utils/py_process.py +++ b/src/testium/interpreter/utils/py_process.py @@ -123,3 +123,11 @@ class PyProcessBase: def stop(self): if self._rpc is not None: self._rpc.stop() + # Force-kill the worker if it's still running. Needed when user code + # in the worker is stuck (e.g. sleep, blocking I/O) and won't notice + # the parent closing the RPC socket on its own. + if self._process is not None and self._process.poll() is None: + try: + self._process.terminate() + except Exception: + pass diff --git a/src/testium/runtime/jrpc.py b/src/testium/runtime/jrpc.py index 1fd2580..7c5878b 100644 --- a/src/testium/runtime/jrpc.py +++ b/src/testium/runtime/jrpc.py @@ -200,6 +200,7 @@ class JsonRpcConnection: Raises: TimeoutError: If no response is received within `timeout`. + ConnectionAbortedError: If stop() was called while waiting. """ req_id = next(self.id_gen) @@ -214,7 +215,12 @@ class JsonRpcConnection: self.pending.pop(req_id, None) raise TimeoutError("Timeout JSON-RPC") - return self.pending.pop(req_id)["response"] + entry = self.pending.pop(req_id) + if entry["response"] is None: + # Woken by stop() (or by a malformed dispatch) rather than by a + # real response — abort the call so callers don't block further. + raise ConnectionAbortedError("JSON-RPC client stopped") + return entry["response"] def print_info(self, msg): if self.dbg_out is not None: @@ -223,6 +229,10 @@ class JsonRpcConnection: def stop(self): if self.running: self.running = False + # Wake any in-flight call() so it doesn't sit on its (default 1h) + # timeout. The response stays None and call() raises ConnectionAbortedError. + for entry in list(self.pending.values()): + entry["event"].set() def join(self): self.recv_thread.join()