Compare commits
3 Commits
116e528a7d
...
50d183d191
| Author | SHA1 | Date | |
|---|---|---|---|
| 50d183d191 | |||
| 2177715641 | |||
| a728f561be |
@@ -20,6 +20,22 @@ main:
|
|||||||
param:
|
param:
|
||||||
- 123
|
- 123
|
||||||
|
|
||||||
|
- py_func:
|
||||||
|
name: python long wait
|
||||||
|
doc: The purpose of this step is to try the tasks "stop" interruption
|
||||||
|
file: utils.py
|
||||||
|
func_name: long_wait
|
||||||
|
param:
|
||||||
|
- 10
|
||||||
|
|
||||||
|
- lua_func:
|
||||||
|
name: lua long wait
|
||||||
|
doc: The purpose of this step is to try the tasks "stop" interruption
|
||||||
|
file: lua_func.lua
|
||||||
|
func_name: long_wait
|
||||||
|
param:
|
||||||
|
- 10
|
||||||
|
|
||||||
- sleep:
|
- sleep:
|
||||||
name: sleep item
|
name: sleep item
|
||||||
dialog: true
|
dialog: true
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
tm = require("tm")
|
tm = require("tm")
|
||||||
|
socket = require("socket")
|
||||||
|
|
||||||
local module = {}
|
local module = {}
|
||||||
|
|
||||||
@@ -7,4 +8,8 @@ function module.func_to_be_executed(param)
|
|||||||
return param
|
return param
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function module.long_wait(sec)
|
||||||
|
socket.sleep(sec)
|
||||||
|
end
|
||||||
|
|
||||||
return module
|
return module
|
||||||
@@ -17,18 +17,3 @@ plot_log_path: /tmp/testium_plot/$(testrun_date)/$(testrun_time)/
|
|||||||
python_path_Windows: C:\Users\François\Applications\Python313\python.exe
|
python_path_Windows: C:\Users\François\Applications\Python313\python.exe
|
||||||
python_path_Linux: $(home)/tmp/tum_venv/bin/python3
|
python_path_Linux: $(home)/tmp/tum_venv/bin/python3
|
||||||
|
|
||||||
# lua_bin_Windows: C:\Lua\5.1
|
|
||||||
# lua_bin_Linux: /usr/bin/lua
|
|
||||||
|
|
||||||
LUA_PATH_Linux: /usr/share/lua/5.4/?.lua;/usr/local/share/lua/5.4/?.lua;/usr/local/share/lua/5.4/?/init.lua;/usr/share/lua/5.4/?/init.lua;/usr/local/lib/lua/5.4/?.lua;/usr/local/lib/lua/5.4/?/init.lua;/usr/lib/lua/5.4/?.lua;/usr/lib/lua/5.4/?/init.lua;./?.lua;./?/init.lua;/home/francois/.luarocks/share/lua/5.4/?.lua;/home/francois/.luarocks/share/lua/5.4/?/init.lua
|
|
||||||
LUA_CPATH_Linux: /usr/local/lib/lua/5.4/?.so;/usr/lib/lua/5.4/?.so;/usr/local/lib/lua/5.4/loadall.so;/usr/lib/lua/5.4/loadall.so;./?.so;/home/francois/.luarocks/lib/lua/5.4/?.so
|
|
||||||
PATH_Linux:
|
|
||||||
|
|
||||||
LUA_PATH_Windows: ;.\?.lua;C:\Lua\5.1\lua\?.lua;C:\Lua\5.1\lua\?\init.lua;C:\Lua\5.1\?.lua;C:\Lua\5.1\?\init.lua;C:\Lua\5.1\lua\?.luac
|
|
||||||
LUA_CPATH_Windows: .\?.dll;C:\Lua\5.1\?.dll;C:\Lua\5.1\loadall.dll;C:\Lua\5.1\clibs\?.dll;C:\Lua\5.1\clibs\loadall.dll;.\?51.dll;C:\Lua\5.1\?51.dll;C:\Lua\5.1\clibs\?51.dll
|
|
||||||
PATH_Windows: ""
|
|
||||||
|
|
||||||
lua_env:
|
|
||||||
PATH: $(PATH_$(os))
|
|
||||||
LUA_PATH: $(LUA_PATH_$(os))
|
|
||||||
LUA_CPATH: $(LUA_CPATH_$(os))
|
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
from time import sleep
|
||||||
|
|
||||||
def dummy_exit(useless1, useless2):
|
def dummy_exit(useless1, useless2):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -11,3 +13,6 @@ def funcToBeExecuted (bla):
|
|||||||
def funcToBeExecuted2 (bla):
|
def funcToBeExecuted2 (bla):
|
||||||
print(bla)
|
print(bla)
|
||||||
return blo
|
return blo
|
||||||
|
|
||||||
|
def long_wait (sec):
|
||||||
|
sleep(sec)
|
||||||
@@ -11,6 +11,7 @@ import threading
|
|||||||
from telnetlib3 import Telnet, DO, WILL, WONT, TTYPE, IAC, SB, SE, theNULL
|
from telnetlib3 import Telnet, DO, WILL, WONT, TTYPE, IAC, SB, SE, theNULL
|
||||||
|
|
||||||
TIMEOUT_NULL = 0.000001
|
TIMEOUT_NULL = 0.000001
|
||||||
|
STOP_POLL_INTERVAL = 0.2
|
||||||
|
|
||||||
|
|
||||||
class BytesStore(object):
|
class BytesStore(object):
|
||||||
@@ -123,12 +124,14 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
|||||||
# c = ''
|
# c = ''
|
||||||
return c
|
return c
|
||||||
|
|
||||||
def read_until(self, match, timeout=None, return_data=False, mute=False):
|
def read_until(self, match, timeout=None, return_data=False, mute=False, should_stop=None):
|
||||||
"""
|
"""
|
||||||
read until the string 'match is found
|
read until the string 'match is found
|
||||||
If timeout is not set (None), this function runs indefinitely
|
If timeout is not set (None), this function runs indefinitely
|
||||||
If timeout is set to zero, this function returns immediately
|
If timeout is set to zero, this function returns immediately
|
||||||
If mute is set to True the characters read from the console will not be displayed
|
If mute is set to True the characters read from the console will not be displayed
|
||||||
|
If should_stop is a callable, it is polled between reads (every STOP_POLL_INTERVAL
|
||||||
|
at most) and the loop exits early — like a timeout — when it returns True.
|
||||||
|
|
||||||
If function fails (because of a timeout) it will return a 'status' integer set to -1
|
If function fails (because of a timeout) it will return a 'status' integer set to -1
|
||||||
otherwise it will return 0.
|
otherwise it will return 0.
|
||||||
@@ -139,13 +142,6 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
|||||||
status = -1
|
status = -1
|
||||||
if not match:
|
if not match:
|
||||||
raise ValueError('match parameter can not be empty')
|
raise ValueError('match parameter can not be empty')
|
||||||
# replace all '\r' by '\n' as any '\r' read will undergo the same replacement
|
|
||||||
# match = match.replace('\r\n', '\n')
|
|
||||||
# match = match.replace('\r', '')
|
|
||||||
|
|
||||||
# update the console timeout in conformity with what is required.
|
|
||||||
|
|
||||||
self.set_read_timeout(timeout)
|
|
||||||
|
|
||||||
if timeout is None:
|
if timeout is None:
|
||||||
timeout = 1000000
|
timeout = 1000000
|
||||||
@@ -159,6 +155,7 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
|||||||
# buffer is empty
|
# buffer is empty
|
||||||
# Otherwise we are waiting for the timeout to rise
|
# Otherwise we are waiting for the timeout to rise
|
||||||
if timeout < TIMEOUT_NULL:
|
if timeout < TIMEOUT_NULL:
|
||||||
|
self.set_read_timeout(0)
|
||||||
data = self.readchar(0)
|
data = self.readchar(0)
|
||||||
|
|
||||||
while (status < 0) and ((data is not None) and (data != b'')):
|
while (status < 0) and ((data is not None) and (data != b'')):
|
||||||
@@ -191,39 +188,45 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
|
|||||||
|
|
||||||
# Timeout different than zero
|
# Timeout different than zero
|
||||||
else:
|
else:
|
||||||
|
# Poll in short chunks so a stop request is honored within
|
||||||
|
# STOP_POLL_INTERVAL, regardless of the per-protocol blocking
|
||||||
|
# behavior of readchar().
|
||||||
|
self.set_read_timeout(STOP_POLL_INTERVAL)
|
||||||
|
|
||||||
time_is_out = threading.Event()
|
time_is_out = threading.Event()
|
||||||
timer = threading.Timer(timeout, lambda: time_is_out.set())
|
timer = threading.Timer(timeout, lambda: time_is_out.set())
|
||||||
timer.start()
|
timer.start()
|
||||||
|
|
||||||
# We are waiting for the timeout to rise
|
try:
|
||||||
|
while (status < 0) and (not time_is_out.is_set()):
|
||||||
|
if should_stop is not None and should_stop():
|
||||||
|
break
|
||||||
|
|
||||||
while (status < 0) and (not time_is_out.isSet()):
|
data = self.readchar(STOP_POLL_INTERVAL)
|
||||||
|
if data is not None:
|
||||||
data = self.readchar(timeout)
|
data = self._compute_char(data)
|
||||||
if data is not None:
|
if data != '':
|
||||||
data = self._compute_char(data)
|
|
||||||
if data != '':
|
|
||||||
if not mute:
|
|
||||||
self.string_buffer += data
|
|
||||||
read_data += data
|
|
||||||
|
|
||||||
search_deque.append(data)
|
|
||||||
if search_deque == match_deque:
|
|
||||||
timer.cancel()
|
|
||||||
status = 0
|
|
||||||
if (not mute) and (data != '\n'):
|
|
||||||
self.string_buffer += '\n'
|
|
||||||
|
|
||||||
if data == '\n' or (status >= 0):
|
|
||||||
# the datas are written line by line for display optimisation in GUI mode
|
|
||||||
if not mute:
|
if not mute:
|
||||||
self.string_buffer = self.string_buffer.replace('\r\n', '\n')
|
self.string_buffer += data
|
||||||
self.string_buffer = self.string_buffer.replace('\r', '')
|
read_data += data
|
||||||
self.stream.write(self.string_buffer)
|
|
||||||
|
|
||||||
date_str = str(datetime.now()).split('.')[0].split(' ')[1]
|
search_deque.append(data)
|
||||||
self.string_buffer = '[{} {}]'.format(date_str, self.name)
|
if search_deque == match_deque:
|
||||||
|
status = 0
|
||||||
|
if (not mute) and (data != '\n'):
|
||||||
|
self.string_buffer += '\n'
|
||||||
|
|
||||||
|
if data == '\n' or (status >= 0):
|
||||||
|
# the datas are written line by line for display optimisation in GUI mode
|
||||||
|
if not mute:
|
||||||
|
self.string_buffer = self.string_buffer.replace('\r\n', '\n')
|
||||||
|
self.string_buffer = self.string_buffer.replace('\r', '')
|
||||||
|
self.stream.write(self.string_buffer)
|
||||||
|
|
||||||
|
date_str = str(datetime.now()).split('.')[0].split(' ')[1]
|
||||||
|
self.string_buffer = '[{} {}]'.format(date_str, self.name)
|
||||||
|
finally:
|
||||||
|
timer.cancel()
|
||||||
|
|
||||||
if return_data:
|
if return_data:
|
||||||
return status, read_data
|
return status, read_data
|
||||||
|
|||||||
@@ -307,11 +307,17 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
status, data = cons.read_until(
|
status, data = cons.read_until(
|
||||||
ru, timeout=read_timeout, return_data=True, mute=mute
|
ru, timeout=read_timeout, return_data=True, mute=mute,
|
||||||
|
should_stop=self.isStopped,
|
||||||
)
|
)
|
||||||
if status == 0:
|
if status == 0:
|
||||||
self.result.set(TestValue.SUCCESS)
|
self.result.set(TestValue.SUCCESS)
|
||||||
self.result.value = data
|
self.result.value = data
|
||||||
|
elif self.isStopped():
|
||||||
|
self.result.set(
|
||||||
|
result=TestValue.FAILURE,
|
||||||
|
message="Console read aborted on stop request",
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
self.result.set(result=TestValue.FAILURE, message="No matching text")
|
self.result.set(result=TestValue.FAILURE, message="No matching text")
|
||||||
if mute:
|
if mute:
|
||||||
|
|||||||
@@ -105,6 +105,7 @@ class TestItemJSRPCActionQuery(TestItemAction):
|
|||||||
jrpc_id = randint(1, (2**32) - 1)
|
jrpc_id = randint(1, (2**32) - 1)
|
||||||
send_only = self._prms.expanse(self._send_only)
|
send_only = self._prms.expanse(self._send_only)
|
||||||
timeout = self._prms.expanse(self._timeout)
|
timeout = self._prms.expanse(self._timeout)
|
||||||
|
self.token.set_should_stop(self.isStopped)
|
||||||
try:
|
try:
|
||||||
success, result = self.token.query(
|
success, result = self.token.query(
|
||||||
meth, obj, jrpc_id, send_only, timeout=timeout
|
meth, obj, jrpc_id, send_only, timeout=timeout
|
||||||
@@ -146,6 +147,7 @@ class TestItemJSRPCActionReceive(TestItemAction):
|
|||||||
def execute(self):
|
def execute(self):
|
||||||
timeout = self._prms.expanse(self._timeout)
|
timeout = self._prms.expanse(self._timeout)
|
||||||
jrpc_id = self._prms.expanse(self._jrpc_id)
|
jrpc_id = self._prms.expanse(self._jrpc_id)
|
||||||
|
self.token.set_should_stop(self.isStopped)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
success, result = self.token.receive(jrpc_id, timeout)
|
success, result = self.token.receive(jrpc_id, timeout)
|
||||||
|
|||||||
@@ -2,10 +2,11 @@ import json
|
|||||||
import socket
|
import socket
|
||||||
import re
|
import re
|
||||||
import struct
|
import struct
|
||||||
|
import time
|
||||||
|
|
||||||
from runtime.tum_except import ETUMRuntimeError
|
from runtime.tum_except import ETUMRuntimeError
|
||||||
import api.testium as tm
|
import api.testium as tm
|
||||||
from api.console import Console
|
from api.console import Console, STOP_POLL_INTERVAL
|
||||||
|
|
||||||
|
|
||||||
def is_ip_address(address):
|
def is_ip_address(address):
|
||||||
@@ -45,9 +46,16 @@ class JrpcAdapter:
|
|||||||
self._jrpc_version = version
|
self._jrpc_version = version
|
||||||
self._mute = mute
|
self._mute = mute
|
||||||
self._timeout = timeout
|
self._timeout = timeout
|
||||||
|
# Optional callable polled by _receive() implementations to abort
|
||||||
|
# waits early when the test is being stopped. Set by the test item
|
||||||
|
# action before each query/receive call.
|
||||||
|
self._should_stop = None
|
||||||
if not (version == "1.0" or version == "2.0"):
|
if not (version == "1.0" or version == "2.0"):
|
||||||
raise ETUMRuntimeError("Invalid JSONRPC version passed.")
|
raise ETUMRuntimeError("Invalid JSONRPC version passed.")
|
||||||
|
|
||||||
|
def set_should_stop(self, cb):
|
||||||
|
self._should_stop = cb
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def timeout(self):
|
def timeout(self):
|
||||||
return self._timeout
|
return self._timeout
|
||||||
@@ -249,32 +257,38 @@ class JrpcUdpAdapter(JrpcAdapter):
|
|||||||
print(f" | sent to @{self._server}:{self._snd_port}")
|
print(f" | sent to @{self._server}:{self._snd_port}")
|
||||||
|
|
||||||
def _receive(self, timeout: float) -> str:
|
def _receive(self, timeout: float) -> str:
|
||||||
|
# Poll in short chunks so a stop request is honored within
|
||||||
|
# STOP_POLL_INTERVAL.
|
||||||
|
self.sock.settimeout(STOP_POLL_INTERVAL)
|
||||||
|
deadline = time.monotonic() + float(timeout)
|
||||||
|
data = None
|
||||||
|
addr = None
|
||||||
|
while True:
|
||||||
|
if self._should_stop is not None and self._should_stop():
|
||||||
|
raise ETUMRuntimeError("JSONRPC udp receive aborted on stop request.")
|
||||||
|
try:
|
||||||
|
data, addr = self.sock.recvfrom(self._bufsize)
|
||||||
|
break
|
||||||
|
except socket.timeout:
|
||||||
|
if time.monotonic() >= deadline:
|
||||||
|
raise ETUMRuntimeError(
|
||||||
|
"JSONRPC udp answer took too long. Try to increase the timeout."
|
||||||
|
)
|
||||||
|
|
||||||
# configures the reception timeout
|
# In case of buffer overload we chose to complain
|
||||||
self.sock.settimeout(timeout)
|
if len(data) >= self._bufsize:
|
||||||
|
|
||||||
# Receives the answer from the server
|
|
||||||
try:
|
|
||||||
data, addr = self.sock.recvfrom(self._bufsize)
|
|
||||||
|
|
||||||
# In case of buffer overload we chose to complain
|
|
||||||
if len(data) >= self._bufsize:
|
|
||||||
raise ETUMRuntimeError(
|
|
||||||
"JSONRPC udp answer size overflow. Try to increase the bufsize"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Converts binary to string
|
|
||||||
res = data.decode()
|
|
||||||
|
|
||||||
# Don't log if mute
|
|
||||||
if not self._mute:
|
|
||||||
print(f" | UDP answer: '{res}'")
|
|
||||||
print(f" | received from @{addr[0]}:{addr[1]}")
|
|
||||||
|
|
||||||
except socket.timeout:
|
|
||||||
raise ETUMRuntimeError(
|
raise ETUMRuntimeError(
|
||||||
"JSONRPC udp answer took too long. Try to increase the timeout."
|
"JSONRPC udp answer size overflow. Try to increase the bufsize"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Converts binary to string
|
||||||
|
res = data.decode()
|
||||||
|
|
||||||
|
# Don't log if mute
|
||||||
|
if not self._mute:
|
||||||
|
print(f" | UDP answer: '{res}'")
|
||||||
|
print(f" | received from @{addr[0]}:{addr[1]}")
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
def _build_query(self, method: str, obj, jrpc_id: int):
|
def _build_query(self, method: str, obj, jrpc_id: int):
|
||||||
@@ -339,11 +353,16 @@ class JrpcConsoleAdapter(JrpcAdapter):
|
|||||||
|
|
||||||
def _receive(self, timeout: float) -> str:
|
def _receive(self, timeout: float) -> str:
|
||||||
status, data = self._cons.read_until(
|
status, data = self._cons.read_until(
|
||||||
self._endswith, timeout, return_data=True, mute=self._mute
|
self._endswith, timeout, return_data=True, mute=self._mute,
|
||||||
|
should_stop=self._should_stop,
|
||||||
)
|
)
|
||||||
|
|
||||||
# if we did not receive anything, we complain
|
# if we did not receive anything, we complain
|
||||||
if not status == 0:
|
if not status == 0:
|
||||||
|
if self._should_stop is not None and self._should_stop():
|
||||||
|
raise ETUMRuntimeError(
|
||||||
|
f"JSONRPC console receive aborted on stop request."
|
||||||
|
)
|
||||||
raise ETUMRuntimeError(
|
raise ETUMRuntimeError(
|
||||||
f"The '{self._cons.name}' console did not answer in the requested time."
|
f"The '{self._cons.name}' console did not answer in the requested time."
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -45,6 +45,18 @@ class TestItemLuaFunc(TestItem):
|
|||||||
tm.setgd(_LUA_FUNC_CONTEXTS_KEY, contexts)
|
tm.setgd(_LUA_FUNC_CONTEXTS_KEY, contexts)
|
||||||
return contexts[ctx_id], True
|
return contexts[ctx_id], True
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
|
# Tear down the worker so any in-flight func_call returns promptly.
|
||||||
|
# join() clears _rpc/_process so a subsequent item reusing the same
|
||||||
|
# context_id can restart the engine cleanly.
|
||||||
|
try:
|
||||||
|
engine, _ = self._get_engine()
|
||||||
|
engine.stop()
|
||||||
|
engine.join()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
@test_run
|
@test_run
|
||||||
def execute(self):
|
def execute(self):
|
||||||
self.result.set(
|
self.result.set(
|
||||||
@@ -96,9 +108,15 @@ Is the lua environnment well defined in the "LUA_PATH" and "LUA_CPATH" variables
|
|||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
except ConnectionAbortedError:
|
||||||
|
self.result.set(TestValue.FAILURE, "lua_func aborted on stop request")
|
||||||
|
print("lua_func aborted on stop request.")
|
||||||
except:
|
except:
|
||||||
traceback.print_exception(*sys.exc_info())
|
traceback.print_exception(*sys.exc_info())
|
||||||
self.result.set(
|
if self.isStopped():
|
||||||
TestValue.FAILURE,
|
self.result.set(TestValue.FAILURE, "lua_func aborted on stop request")
|
||||||
'Unrecoverable "lua_func" item error from {}'.format(self.func_name),
|
else:
|
||||||
)
|
self.result.set(
|
||||||
|
TestValue.FAILURE,
|
||||||
|
'Unrecoverable "lua_func" item error from {}'.format(self.func_name),
|
||||||
|
)
|
||||||
|
|||||||
@@ -45,6 +45,18 @@ class TestItemPyFunc(TestItem):
|
|||||||
tm.setgd(_PY_FUNC_CONTEXTS_KEY, contexts)
|
tm.setgd(_PY_FUNC_CONTEXTS_KEY, contexts)
|
||||||
return contexts[ctx_id], True
|
return contexts[ctx_id], True
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
|
# Tear down the worker so any in-flight func_call returns promptly.
|
||||||
|
# join() clears _rpc/_process so a subsequent item reusing the same
|
||||||
|
# context_id can restart the engine cleanly.
|
||||||
|
try:
|
||||||
|
engine, _ = self._get_engine()
|
||||||
|
engine.stop()
|
||||||
|
engine.join()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
@test_run
|
@test_run
|
||||||
def execute(self):
|
def execute(self):
|
||||||
self.result.set(
|
self.result.set(
|
||||||
@@ -94,9 +106,15 @@ python_bin = {tm.gd("python_bin", "no python path defined")}"""
|
|||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
|
except ConnectionAbortedError:
|
||||||
|
self.result.set(TestValue.FAILURE, "py_func aborted on stop request")
|
||||||
|
print("py_func aborted on stop request.")
|
||||||
except:
|
except:
|
||||||
traceback.print_exception(*sys.exc_info())
|
traceback.print_exception(*sys.exc_info())
|
||||||
self.result.set(
|
if self.isStopped():
|
||||||
TestValue.FAILURE,
|
self.result.set(TestValue.FAILURE, "py_func aborted on stop request")
|
||||||
'Unrecoverable "py_func" item error from {}'.format(self.func_name),
|
else:
|
||||||
)
|
self.result.set(
|
||||||
|
TestValue.FAILURE,
|
||||||
|
'Unrecoverable "py_func" item error from {}'.format(self.func_name),
|
||||||
|
)
|
||||||
|
|||||||
@@ -80,4 +80,7 @@ class TestItemSleep(TestItem):
|
|||||||
end_time = _time.time() + float(timeout)
|
end_time = _time.time() + float(timeout)
|
||||||
while _time.time() < end_time and not self._is_stopped:
|
while _time.time() < end_time and not self._is_stopped:
|
||||||
sleep(min(0.05, end_time - _time.time()))
|
sleep(min(0.05, end_time - _time.time()))
|
||||||
self.result.set(TestValue.SUCCESS, 'Sleep %s sec' % (str(timeout)))
|
if self._is_stopped:
|
||||||
|
self.result.set(TestValue.FAILURE, 'Sleep aborted on stop request')
|
||||||
|
else:
|
||||||
|
self.result.set(TestValue.SUCCESS, 'Sleep %s sec' % (str(timeout)))
|
||||||
|
|||||||
@@ -146,4 +146,12 @@ class LuaProcessBase:
|
|||||||
"""
|
"""
|
||||||
if self._rpc is not None:
|
if self._rpc is not None:
|
||||||
self._rpc.stop()
|
self._rpc.stop()
|
||||||
|
# Force-kill the worker if it's still running. Needed when user code
|
||||||
|
# in the worker is stuck and won't notice the parent closing the RPC
|
||||||
|
# socket on its own.
|
||||||
|
if self._process is not None and self._process.poll() is None:
|
||||||
|
try:
|
||||||
|
self._process.terminate()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|||||||
@@ -123,3 +123,11 @@ class PyProcessBase:
|
|||||||
def stop(self):
|
def stop(self):
|
||||||
if self._rpc is not None:
|
if self._rpc is not None:
|
||||||
self._rpc.stop()
|
self._rpc.stop()
|
||||||
|
# Force-kill the worker if it's still running. Needed when user code
|
||||||
|
# in the worker is stuck (e.g. sleep, blocking I/O) and won't notice
|
||||||
|
# the parent closing the RPC socket on its own.
|
||||||
|
if self._process is not None and self._process.poll() is None:
|
||||||
|
try:
|
||||||
|
self._process.terminate()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|||||||
@@ -200,6 +200,7 @@ class JsonRpcConnection:
|
|||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
TimeoutError: If no response is received within `timeout`.
|
TimeoutError: If no response is received within `timeout`.
|
||||||
|
ConnectionAbortedError: If stop() was called while waiting.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
req_id = next(self.id_gen)
|
req_id = next(self.id_gen)
|
||||||
@@ -214,7 +215,12 @@ class JsonRpcConnection:
|
|||||||
self.pending.pop(req_id, None)
|
self.pending.pop(req_id, None)
|
||||||
raise TimeoutError("Timeout JSON-RPC")
|
raise TimeoutError("Timeout JSON-RPC")
|
||||||
|
|
||||||
return self.pending.pop(req_id)["response"]
|
entry = self.pending.pop(req_id)
|
||||||
|
if entry["response"] is None:
|
||||||
|
# Woken by stop() (or by a malformed dispatch) rather than by a
|
||||||
|
# real response — abort the call so callers don't block further.
|
||||||
|
raise ConnectionAbortedError("JSON-RPC client stopped")
|
||||||
|
return entry["response"]
|
||||||
|
|
||||||
def print_info(self, msg):
|
def print_info(self, msg):
|
||||||
if self.dbg_out is not None:
|
if self.dbg_out is not None:
|
||||||
@@ -223,6 +229,10 @@ class JsonRpcConnection:
|
|||||||
def stop(self):
|
def stop(self):
|
||||||
if self.running:
|
if self.running:
|
||||||
self.running = False
|
self.running = False
|
||||||
|
# Wake any in-flight call() so it doesn't sit on its (default 1h)
|
||||||
|
# timeout. The response stays None and call() raises ConnectionAbortedError.
|
||||||
|
for entry in list(self.pending.values()):
|
||||||
|
entry["event"].set()
|
||||||
|
|
||||||
def join(self):
|
def join(self):
|
||||||
self.recv_thread.join()
|
self.recv_thread.join()
|
||||||
|
|||||||
Reference in New Issue
Block a user