Add context_id to py_func and lua_func for shared persistent subprocess

- py_func and lua_func items accept a context_id parameter; items sharing
  the same id reuse the same subprocess for the duration of the test run
- Subprocess-side tm.setgd/tm.gd use a local fallback dict for non-JSON-
  serializable values (py_func only); serializable values reach the main
  process global dict and are accessible from any test item or subprocess
- Shared subprocess engines are cleaned up in process.py finally block
- LuaProcessBase gains is_alive() (was missing, broke all lua_func items)
- Validation tests cover serializable sharing across different context ids,
  non-serializable sharing within the same context_id, and cross-item access
- RST documentation updated for both py_func and lua_func items

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-18 16:02:36 +02:00
parent 49354b8664
commit d92f518e1e
12 changed files with 390 additions and 52 deletions

View File

@@ -1,19 +1,26 @@
import json
import sys
from py_func.handle import FuncHandler
from lib.tum_except import ETUMRuntimeError
from lib.api import SUPPORTED_API
thismodule = sys.modules[__name__]
# Shared FuncHandler instance used to forward API calls. Remains None
# until `_init_api` is invoked.
_func_call_thread = None
# Local storage for non-JSON-serializable values
_local_dict = {}
def _is_json_serializable(value):
try:
json.dumps(value)
return True
except (TypeError, ValueError):
return False
###############################################################################
# Dynamically create module-level functions for each supported API name.
# Each generated function shares the implementation of `api_call` but
# has a distinct name used as the remote action identifier.
def _make_api(name):
def _wrapper(*params):
if _func_call_thread is not None:
@@ -31,21 +38,62 @@ def _make_api(name):
return _wrapper
for k in SUPPORTED_API:
setattr(thismodule, k, _make_api(k))
if k not in ('gd', 'setgd', 'delgd'):
setattr(thismodule, k, _make_api(k))
###############################################################################
# gd/setgd/delgd with local-dict fallback for non-serializable values
def gd(*params):
key = params[0] if params else None
if key is not None and key in _local_dict:
return _local_dict[key]
if _func_call_thread is not None:
res = _func_call_thread.call("gd", params)
if "result" in res:
return res["result"]
elif "error" in res:
raise ETUMRuntimeError(f"api call to 'tm.gd' failed with error '{res['error']}'")
else:
raise ETUMRuntimeError("api call failure in jrpc client to be reported to testium support team.")
raise ETUMRuntimeError("api not initialized")
def setgd(*params):
key = params[0] if params else None
value = params[1] if len(params) > 1 else None
if key is not None and not _is_json_serializable(value):
_local_dict[key] = value
return None
if _func_call_thread is not None:
res = _func_call_thread.call("setgd", params)
if "result" in res:
return res["result"]
elif "error" in res:
raise ETUMRuntimeError(f"api call to 'tm.setgd' failed with error '{res['error']}'")
else:
raise ETUMRuntimeError("api call failure in jrpc client to be reported to testium support team.")
raise ETUMRuntimeError("api not initialized")
def delgd(*params):
key = params[0] if params else None
if key is not None and key in _local_dict:
del _local_dict[key]
return None
if _func_call_thread is not None:
res = _func_call_thread.call("delgd", params)
if "result" in res:
return res["result"]
elif "error" in res:
raise ETUMRuntimeError(f"api call to 'tm.delgd' failed with error '{res['error']}'")
else:
raise ETUMRuntimeError("api call failure in jrpc client to be reported to testium support team.")
raise ETUMRuntimeError("api not initialized")
def _init_api(host, port, timeout):
"""Start and initialize the remote function handler.
Starts a ``FuncHandler`` bound to ``port``, runs it and blocks until
it signals readiness.
Args:
port: port number or identifier passed to ``FuncHandler``.
Returns:
The initialized ``FuncHandler`` instance assigned to
``_func_call_thread``.
"""
global _func_call_thread
_func_call_thread = FuncHandler(host, port, timeout=timeout)
return _func_call_thread
@@ -53,17 +101,10 @@ def _init_api(host, port, timeout):
###############################################################################
def _remote_print(*values):
"""Forward print-like output to the remote handler.
If a ``_func_call_thread`` is available, this function calls the
handler with action name ``"print"`` and the provided values. Errors
during forwarding are ignored because printing is best-effort.
"""
if _func_call_thread is not None:
try:
_func_call_thread.call("print", values)
except:
# Best-effort: ignore forwarding failures
pass

View File

@@ -275,6 +275,13 @@ Is the python exec path correct ?"""
# Stop python eval execution process
eval_proc.stop()
eval_proc.join()
# Stop shared func context engines (keep_context_id)
for engine in tm.gd("_py_func_contexts", {}).values():
engine.stop()
engine.join()
for engine in tm.gd("_lua_func_contexts", {}).values():
engine.stop()
engine.join()
except Exception as e:
print_exception(e)

View File

@@ -12,10 +12,13 @@ from interpreter.utils.lua_func_exec import LuaFuncExecEngine
from interpreter.utils.api_srv import api_request
from interpreter.utils.constants import TestItemType as cst
_LUA_FUNC_CONTEXTS_KEY = "_lua_func_contexts"
class TestItemLuaFunc(TestItem):
"""lua_func item usage.
func file: func_file.lua, func_name: func, param: [$(variable1), [1, 2, 3], true]
Optional: context_id: <id> — share a persistent process with other lua_func items using the same id.
"""
def __init__(self, dict_item, parent=None, status_queue=None, filename=""):
@@ -27,14 +30,26 @@ class TestItemLuaFunc(TestItem):
self.file_name = self._prms.getParam("file", required=True)
self.func_name = self._prms.getParam("func_name", required=True)
self.params = self._prms.getParamAll("param")
self._context_id = self._prms.getParam("context_id", default=None, processed=False)
except:
raise ETUMSyntaxError(
f"The '{self.cmd()}' test item named '{self.name()}' (child of '{self.parent.name()}') has a missing or wrong parameter",
self.seqFilename(),
)
# Lua functions call subprocess initialization
self._lua_func_proc = LuaFuncExecEngine(tm.gd("lua_bin", ""), api_request, 10)
def _get_engine(self):
"""Return (engine, persistent). If context_id is set, use a shared persistent engine."""
if self._context_id is None:
return self._lua_func_proc, False
ctx_id = self._prms.expanse(self._context_id)
contexts = tm.gd(_LUA_FUNC_CONTEXTS_KEY, {})
if ctx_id not in contexts:
contexts[ctx_id] = LuaFuncExecEngine(tm.gd("lua_bin", ""), api_request, 10)
tm.setgd(_LUA_FUNC_CONTEXTS_KEY, contexts)
return contexts[ctx_id], True
@test_run
def execute(self):
self.result.set(
@@ -48,22 +63,25 @@ class TestItemLuaFunc(TestItem):
print("Parameters list:")
print(textwrap.indent(pprint.pformat(pl), " |"))
self._lua_func_proc.start()
if not self._lua_func_proc.wait_ready(10):
raise ETUMRuntimeError(
f"""Impossible to start the external lua execution process.
engine, persistent = self._get_engine()
if not engine.is_alive():
engine.start()
if not engine.wait_ready(10):
raise ETUMRuntimeError(
f"""Impossible to start the external lua execution process.
Is the lua path correct ?
lua_bin = {tm.gd("lua_bin", "no lua path defined")}
Are "lua-sockets" and "lua-cjson" installed ?
Is the lua environnment well defined in the "LUA_PATH" and "LUA_CPATH" variables ?"""
)
)
try:
success, ret = self._lua_func_proc.func_call(self.file_name, self.func_name, pl)
success, ret = engine.func_call(self.file_name, self.func_name, pl)
finally:
# Stops lua function execution process
self._lua_func_proc.stop()
self._lua_func_proc.join()
if not persistent:
engine.stop()
engine.join()
if success == TestValue.SUCCESS:
self.result.set(TestValue.SUCCESS)
@@ -73,7 +91,6 @@ Is the lua environnment well defined in the "LUA_PATH" and "LUA_CPATH" variables
print("Returned value:")
print(textwrap.indent(pprint.pformat(res), " |"))
# The result of the func test item is put in global dir and result
tm.setgd("lfn_" + self._name, res)
self.result.value = res
@@ -88,5 +105,5 @@ Is the lua environnment well defined in the "LUA_PATH" and "LUA_CPATH" variables
traceback.print_exception(*sys.exc_info())
self.result.set(
TestValue.FAILURE,
'Unrecoverable "py_func" item error from {}'.format(self.func_name),
'Unrecoverable "lua_func" item error from {}'.format(self.func_name),
)

View File

@@ -12,10 +12,13 @@ from interpreter.utils.py_func_exec import PyFuncExecEngine
from interpreter.utils.api_srv import api_request
from interpreter.utils.constants import TestItemType as cst
_PY_FUNC_CONTEXTS_KEY = "_py_func_contexts"
class TestItemPyFunc(TestItem):
"""py_func item usage.
func file: func_file.py, func_name: func, param: [$(variable1), [1, 2, 3], true]
Optional: context_id: <id> — share a persistent process with other py_func items using the same id.
"""
def __init__(self, dict_item, parent=None, status_queue=None, filename=""):
@@ -27,6 +30,7 @@ class TestItemPyFunc(TestItem):
self.file_name = self._prms.getParam("file", required=True)
self.func_name = self._prms.getParam("func_name", required=True)
self.params = self._prms.getParamAll("param")
self._context_id = self._prms.getParam("context_id", default=None, processed=False)
except:
raise ETUMSyntaxError(
f"The '{self.cmd()}' test item named '{self.name()}' (child of '{self.parent.name()}') has a missing or wrong parameter",
@@ -34,6 +38,18 @@ class TestItemPyFunc(TestItem):
)
self._py_func_proc = PyFuncExecEngine(tm.gd("python_bin", ""), api_request, 10)
def _get_engine(self):
"""Return (engine, persistent). If context_id is set, use a shared persistent engine."""
if self._context_id is None:
return self._py_func_proc, False
ctx_id = self._prms.expanse(self._context_id)
contexts = tm.gd(_PY_FUNC_CONTEXTS_KEY, {})
if ctx_id not in contexts:
contexts[ctx_id] = PyFuncExecEngine(tm.gd("python_bin", ""), api_request, 10)
tm.setgd(_PY_FUNC_CONTEXTS_KEY, contexts)
return contexts[ctx_id], True
@test_run
def execute(self):
self.result.set(
@@ -47,20 +63,23 @@ class TestItemPyFunc(TestItem):
print("Parameters list:")
print(textwrap.indent(pprint.pformat(pl), " |"))
# start the process for executing external python
self._py_func_proc.start()
if not self._py_func_proc.wait_ready():
raise ETUMRuntimeError(
f"""Impossible to start the external python execution process.
engine, persistent = self._get_engine()
if not engine.is_alive():
engine.start()
if not engine.wait_ready():
raise ETUMRuntimeError(
f"""Impossible to start the external python execution process.
Is the python path correct ?
python_bin = {tm.gd("python_bin", "no python path defined")}"""
)
)
try:
success, ret = self._py_func_proc.func_call(self.file_name, self.func_name, pl)
success, ret = engine.func_call(self.file_name, self.func_name, pl)
finally:
# Stops python function execution process
self._py_func_proc.stop()
self._py_func_proc.join()
if not persistent:
engine.stop()
engine.join()
if success == TestValue.SUCCESS:
self.result.set(TestValue.SUCCESS)
@@ -70,7 +89,6 @@ python_bin = {tm.gd("python_bin", "no python path defined")}"""
print("Returned value:")
print(textwrap.indent(pprint.pformat(res), " |"))
# The result of the func test item is put in global dir and result
tm.setgd("pfn_" + self._name, res)
self.result.value = res

View File

@@ -221,6 +221,11 @@ class LuaProcessBase:
return self._rpc.wait_ready(timeout)
return False
def is_alive(self):
if self._rpc is not None:
return self._rpc.is_alive()
return False
def stop(self):
"""
Stops the RPC client.