py_func an lua_func processes processes faster.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import time
|
||||
import pprint
|
||||
import textwrap
|
||||
|
||||
@@ -32,7 +32,7 @@ class TestItemPyFunc(TestItem):
|
||||
f"The '{self.cmd()}' test item named '{self.name()}' (child of '{self.parent.name()}') has a missing or wrong parameter",
|
||||
self.seqFilename(),
|
||||
)
|
||||
self._proc = PyFuncExecEngine(tm.gd("python_bin", ""), api_request, 10)
|
||||
self._py_func_proc = PyFuncExecEngine(tm.gd("python_bin", ""), api_request, 10)
|
||||
|
||||
@test_run
|
||||
def execute(self):
|
||||
@@ -48,21 +48,26 @@ class TestItemPyFunc(TestItem):
|
||||
tm.print_debug("Parameters list:")
|
||||
tm.print_debug(textwrap.indent(pprint.pformat(pl), " |"))
|
||||
|
||||
# start the process for executing external python
|
||||
self._proc.start()
|
||||
if not self._proc.wait_ready(10):
|
||||
raise ETUMRuntimeError(
|
||||
f"""Impossible to start the external python execution process.
|
||||
# start the process for executing external python
|
||||
t0 = time.monotonic()
|
||||
self._py_func_proc.start()
|
||||
t1 = time.monotonic()
|
||||
if not self._py_func_proc.wait_ready():
|
||||
raise ETUMRuntimeError(
|
||||
f"""Impossible to start the external python execution process.
|
||||
Is the python path correct ?
|
||||
python_bin = {tm.gd("python_bin", "no python path defined")}"""
|
||||
)
|
||||
|
||||
)
|
||||
t2 = time.monotonic()
|
||||
tm.print_info(f"t1 = {(t1-t0):0.2f}. t2 = {(t2-t1):0.2f}")
|
||||
try:
|
||||
success, ret = self._proc.func_call(self.file_name, self.func_name, pl)
|
||||
success, ret = self._py_func_proc.func_call(self.file_name, self.func_name, pl)
|
||||
finally:
|
||||
# Stops python function execution process
|
||||
self._proc.stop()
|
||||
self._proc.join()
|
||||
self._py_func_proc.stop()
|
||||
t3 = time.monotonic()
|
||||
self._py_func_proc.join()
|
||||
tm.print_info(f"t3 = {(t3-t2):0.3f}. t4 = {(time.monotonic()-t3):0.3f}")
|
||||
|
||||
if success == TestValue.SUCCESS:
|
||||
self.result.set(TestValue.SUCCESS)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import socket
|
||||
import json
|
||||
from time import monotonic
|
||||
import threading
|
||||
import itertools
|
||||
from time import sleep
|
||||
@@ -58,7 +59,6 @@ class JsonRpcConnection:
|
||||
name,
|
||||
conn: socket.socket,
|
||||
req_handler: Callable[..., Any],
|
||||
timeout=0.2,
|
||||
dbg_out=None,
|
||||
):
|
||||
self.name = name
|
||||
@@ -74,8 +74,9 @@ class JsonRpcConnection:
|
||||
self.id_gen = itertools.count(1)
|
||||
self.running = True
|
||||
self._dbg_out = dbg_out
|
||||
self._event_ready = threading.Event()
|
||||
|
||||
self.conn.settimeout(timeout)
|
||||
self.conn.settimeout(0.1)
|
||||
|
||||
self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
|
||||
self.recv_thread.start()
|
||||
@@ -93,6 +94,7 @@ class JsonRpcConnection:
|
||||
buffer = b""
|
||||
|
||||
try:
|
||||
self._event_ready.set()
|
||||
while self.running:
|
||||
try:
|
||||
data = self.conn.recv(4096)
|
||||
@@ -222,6 +224,12 @@ class JsonRpcConnection:
|
||||
def join(self):
|
||||
self.recv_thread.join()
|
||||
|
||||
def is_alive(self):
|
||||
self.recv_thread.is_alive()
|
||||
|
||||
def wait_ready(self, timeout=None):
|
||||
return self._event_ready.wait(timeout)
|
||||
|
||||
|
||||
class JsonRpcBase(threading.Thread):
|
||||
"""Threaded base class for simple JSON-RPC server/client helpers.
|
||||
@@ -292,6 +300,7 @@ class JsonRpcBase(threading.Thread):
|
||||
self._rpc = JsonRpcConnection(
|
||||
self.name, sock, self.handle_request, dbg_out=self.dbg_out
|
||||
)
|
||||
self._rpc.wait_ready()
|
||||
self._event_ready.set()
|
||||
|
||||
def wait_ready(self, timeout=None):
|
||||
@@ -342,16 +351,12 @@ class JsonRpcSrv(JsonRpcBase):
|
||||
self.print_info(f"listening on {self._host}:{self._port}")
|
||||
|
||||
self.print_info(f"awaiting connection for {self._timeout} secs")
|
||||
tslice = 0.2
|
||||
sock.settimeout(tslice)
|
||||
t = self._timeout
|
||||
sock.settimeout(self._timeout)
|
||||
while True:
|
||||
try:
|
||||
conn, addr = sock.accept()
|
||||
break
|
||||
except socket.timeout:
|
||||
t -= tslice
|
||||
if t < 0:
|
||||
raise ETUMRuntimeError(f"{self.name}: Timeout")
|
||||
|
||||
self.print_info("Client connected")
|
||||
@@ -439,22 +444,20 @@ class JsonRpcClient(JsonRpcBase):
|
||||
# TCP/IP socket creation
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
tslice = 0.5
|
||||
t = self._timeout
|
||||
sock.settimeout(tslice)
|
||||
tslice = 0.1
|
||||
t0 = monotonic()
|
||||
sock.settimeout(0)
|
||||
# Link of the socket at the configured port
|
||||
while True:
|
||||
try:
|
||||
sleep(tslice)
|
||||
sock.connect((self._host, self._port))
|
||||
break
|
||||
except Exception as e:
|
||||
t -= tslice
|
||||
if t < 0:
|
||||
if (monotonic() - t0) > self._timeout:
|
||||
raise ETUMRuntimeError(
|
||||
f"{self.name}: failed to connect : {e}"
|
||||
)
|
||||
else:
|
||||
sleep(tslice)
|
||||
|
||||
self.print_info("Connected to server")
|
||||
self.connect(sock)
|
||||
|
||||
@@ -84,25 +84,28 @@ class PyProcessBase:
|
||||
}
|
||||
|
||||
def __init__(self, python_bin="", request_handler=None, timeout=10, python_path=""):
|
||||
if (python_bin is not None) and (python_bin != ""):
|
||||
self._pbin = python_bin
|
||||
if (self._pbin is not None) and (self._pbin != ""):
|
||||
|
||||
if shutil.which(python_bin) is None:
|
||||
if shutil.which(self._pbin) is None:
|
||||
raise ETUMRuntimeError(
|
||||
f"The passed python path is not pointing to an executable: '{python_bin}'"
|
||||
f"The passed python path is not pointing to an executable: '{self._pbin}'"
|
||||
)
|
||||
|
||||
if not _is_python_interpreter(python_bin):
|
||||
if not _is_python_interpreter(self._pbin):
|
||||
raise ETUMRuntimeError(
|
||||
f"The passed executable is not a python interpreter: '{python_bin}'"
|
||||
f"The passed executable is not a python interpreter: '{self._pbin}'"
|
||||
)
|
||||
|
||||
else:
|
||||
python_bin = _sys_python_bin()
|
||||
if python_bin == "":
|
||||
raise ETUMRuntimeError(f"No valid python interpreter found")
|
||||
tm.setgd("python_bin", python_bin)
|
||||
self._pbin = tm.gd("_cached_python_bin", "")
|
||||
if self._pbin == "":
|
||||
self._pbin = _sys_python_bin()
|
||||
tm.setgd("_cached_python_bin", self._pbin)
|
||||
|
||||
if self._pbin == "":
|
||||
raise ETUMRuntimeError(f"No valid python interpreter found")
|
||||
|
||||
self._pbin = python_bin
|
||||
self._ppath = python_path
|
||||
self._req_handler = request_handler
|
||||
self._process = None
|
||||
@@ -110,6 +113,7 @@ class PyProcessBase:
|
||||
self._timeout = timeout
|
||||
self._rpc = None
|
||||
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
run the subprocess to execute the python functions of the test.
|
||||
@@ -132,6 +136,9 @@ class PyProcessBase:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.bind(("localhost", 0))
|
||||
self._port = sock.getsockname()[1]
|
||||
# Port was reserved until the sub-process is started. Now released.
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
|
||||
# Add the path of the subprocess (root sources of testium)
|
||||
func_proc_path = testium_path()
|
||||
@@ -152,10 +159,6 @@ class PyProcessBase:
|
||||
|
||||
self._process = subprocess.Popen(params, env=env, cwd=func_proc_path)
|
||||
|
||||
# Port was reserved until the sub-process is started. Now released.
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
|
||||
self._rpc = JsonRpcClient(
|
||||
"localhost", self._port, req_handler=self._req_handler
|
||||
)
|
||||
@@ -163,6 +166,11 @@ class PyProcessBase:
|
||||
self._rpc.dbg_out = sys.stdout
|
||||
self._rpc.start()
|
||||
|
||||
@property
|
||||
def python_bin(self):
|
||||
return self._pbin
|
||||
|
||||
|
||||
def join(self):
|
||||
if self._rpc is not None:
|
||||
self._rpc.join()
|
||||
|
||||
Reference in New Issue
Block a user