Merge branch 'main' into feat/pytest-item

# Conflicts:
#	DESIGN.md
This commit is contained in:
2026-06-14 19:42:02 +02:00
12 changed files with 233 additions and 51 deletions

View File

@@ -1 +1 @@
0.2.3
0.2.4

View File

@@ -2,6 +2,7 @@ from datetime import datetime
import sys
import os
import re
import errno
from queue import Queue, Empty
from time import sleep
import collections
@@ -10,6 +11,8 @@ import threading
from telnetlib3 import Telnet, DO, WILL, WONT, TTYPE, IAC, SB, SE, theNULL
from runtime.tum_except import ETUMRuntimeError
TIMEOUT_NULL = 0.000001
STOP_POLL_INTERVAL = 0.2
@@ -124,7 +127,29 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
# c = ''
return c
def read_until(self, match, timeout=None, return_data=False, mute=False, should_stop=None):
# Max chars of the buffer tail scanned in regex mode (bounds cost/memory).
REGEX_WINDOW = 65536
def _feed_match(self, data, search_deques, match_deques, matches):
"""Append *data* to each window; return the first matched pattern or None."""
matched = None
for sd, md, m in zip(search_deques, match_deques, matches):
sd.append(data)
if matched is None and sd == md:
matched = m
return matched
def _search_regex(self, read_data, compiled):
"""Search the buffer tail with each regex; return the first hit's text or None."""
tail = read_data[-self.REGEX_WINDOW:]
for p in compiled:
m = p.search(tail)
if m is not None:
return m.group(0)
return None
def read_until(self, match, timeout=None, return_data=False, mute=False,
should_stop=None, regex=False):
"""
read until the string 'match is found
If timeout is not set (None), this function runs indefinitely
@@ -141,15 +166,35 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
read_data = ''
status = -1
if not match:
raise ValueError('match parameter can not be empty')
raise ETUMRuntimeError("'expected' pattern can not be empty")
# match: a string or list of strings; succeed as soon as any is seen.
if isinstance(match, (list, tuple)):
matches = [str(m) for m in match]
else:
matches = [str(match)]
if (not matches) or any(len(m) == 0 for m in matches):
raise ETUMRuntimeError("'expected' pattern can not be empty")
if timeout is None:
timeout = 1000000
# Fixed-length queue that will contain the readout characters
search_deque = collections.deque(maxlen=len(match))
# convert match string into a deque for faster comparisons
match_deque = collections.deque(match)
compiled = None
search_deques = match_deques = None
if regex:
# 'matches' are regular expressions; succeed on the first hit.
compiled = []
for m in matches:
try:
compiled.append(re.compile(m))
except re.error as e:
raise ETUMRuntimeError(
"Invalid regular expression {!r}: {}".format(m, e)) from None
else:
# One fixed-length rolling window per literal pattern.
search_deques = [collections.deque(maxlen=len(m)) for m in matches]
match_deques = [collections.deque(m) for m in matches]
self._matched = None
# In case of a timeout equal to zero, it must be looped until the
# buffer is empty
@@ -167,9 +212,13 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
self.string_buffer += data
read_data += data
search_deque.append(data)
if search_deque == match_deque:
if regex:
matched = self._search_regex(read_data, compiled)
else:
matched = self._feed_match(data, search_deques, match_deques, matches)
if matched is not None:
status = 0
self._matched = matched
if (not mute) and (data != '\n'):
self.string_buffer += '\n'
@@ -210,9 +259,13 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
self.string_buffer += data
read_data += data
search_deque.append(data)
if search_deque == match_deque:
if regex:
matched = self._search_regex(read_data, compiled)
else:
matched = self._feed_match(data, search_deques, match_deques, matches)
if matched is not None:
status = 0
self._matched = matched
if (not mute) and (data != '\n'):
self.string_buffer += '\n'
@@ -407,20 +460,35 @@ class SerialConsole(Console):
self.stop = threading.Event()
self.port = None
self.port_id = port
self._thd = None
def open(self):
self.port = serial.Serial(port=self.port_id,
baudrate=self.baudrate,
stopbits=self.stopbits,
parity=self.parity,
xonxoff=self.xonxoff,
timeout=None)
try:
self.port = serial.Serial(port=self.port_id,
baudrate=self.baudrate,
stopbits=self.stopbits,
parity=self.parity,
xonxoff=self.xonxoff,
timeout=None)
except (serial.SerialException, OSError) as e:
raise ETUMRuntimeError(self._open_error_message(e)) from None
self.isOpened = True
if self.bufferize:
self.port.timeout = 2
self._thd = threading.Thread(target=self.read_thread)
self._thd.start()
def _open_error_message(self, exc):
"""Build a short, direct message for a failed serial open."""
errno_ = getattr(exc, "errno", None)
if errno_ == errno.ENOENT:
return "Serial device '{}' does not exist.".format(self.port_id)
if errno_ == errno.EACCES:
return ("Permission denied opening serial device '{}' "
"(is your user allowed to access it, e.g. 'dialout' group?)."
.format(self.port_id))
return "Could not open serial device '{}': {}".format(self.port_id, exc)
def read_thread(self):
while not self.stop.is_set():
c = self.port.read(1)
@@ -428,7 +496,7 @@ class SerialConsole(Console):
self.rx_queue.put(c)
def close(self):
if self.bufferize:
if self.bufferize and self._thd is not None:
self.stop.set()
self._thd.join()
if self.port is not None:
@@ -440,10 +508,12 @@ class SerialConsole(Console):
self.port.timeout = timeout
def readchar(self, timeout):
if not self.isOpened:
raise ETUMRuntimeError("Serial console '{}' is not open".format(self.name))
if self.bufferize:
if not self._thd.is_alive() and not self.stop.isSet():
raise RuntimeError(
"Impossible to read the serial console, it may be already openned")
raise ETUMRuntimeError(
"Impossible to read the serial console, it may be already opened")
if timeout < TIMEOUT_NULL:
return self.rx_queue.get(block=False)
else:
@@ -455,10 +525,12 @@ class SerialConsole(Console):
self.port.flush()
def read_nowait(self, mute=False):
if not self.isOpened:
raise ETUMRuntimeError("Serial console '{}' is not open".format(self.name))
if self.bufferize:
if not self._thd.is_alive() and not self.stop.isSet():
raise RuntimeError(
"Impossible to read the serial console, it may be already openned")
raise ETUMRuntimeError(
"Impossible to read the serial console, it may be already opened")
st = self.rx_queue.getAll().decode(self.encoding, errors='replace')
if not mute:
date_str = str(datetime.now()).split('.')[0].split(' ')[1]

View File

@@ -145,7 +145,7 @@ class TestItem:
self._report_key = None
self._reported = None
self.status_queue = status_queue
self._execute_on_stop = False
self._execute_on_stop_raw = False
self._post_eval = None
self._store_result = None
self._expected_result = None
@@ -154,7 +154,7 @@ class TestItem:
self._is_running = False
self._is_breakpoint = False
self._is_paused = False
self._stop_on_failure = False
self._stop_on_failure_raw = False
self._doc = ""
self._name = ""
self.report = None
@@ -197,13 +197,14 @@ class TestItem:
self.skipped = False
self._report_key = self._prms.getParam("key", default=None)
self._stop_on_failure = self._prms.getParam(
"stop_on_failure", default=False, processed=True
# Kept raw: expanded at run time by the matching properties.
self._stop_on_failure_raw = self._prms.getParam(
"stop_on_failure", default=False
)
self._doc = self._prms.getParam("doc", default="", processed=True)
#
self._execute_on_stop = self._prms.getParam(
"execute_on_stop", default=False, processed=True
self._execute_on_stop_raw = self._prms.getParam(
"execute_on_stop", default=False
)
if "process_result" in dict_item:
@@ -570,6 +571,20 @@ class TestItem:
def setEnabled(self):
self.enabled = True
def _eval_flag(self, raw):
"""Run-time flag: bool as-is, otherwise expanded and coerced to bool."""
if isinstance(raw, bool):
return raw
return eval_to_boolean(self._prms.expanse(raw))
@property
def _stop_on_failure(self):
return self._eval_flag(self._stop_on_failure_raw)
@property
def _execute_on_stop(self):
return self._eval_flag(self._execute_on_stop_raw)
def executedOnStop(self):
return self._execute_on_stop

View File

@@ -4,7 +4,7 @@ import importlib
import traceback
import api.testium as tm
from runtime.tum_except import ETUMSyntaxError
from runtime.tum_except import ETUMSyntaxError, ETUMRuntimeError
from runtime.stdout_redirect import stdio_redir
from interpreter.test_items.test_item import test_run
from interpreter.test_items.item_actions import TestItemActions
@@ -88,7 +88,7 @@ class TestItemConsoleOpen(TestItemConsoleAction):
telnet_host = self._prms.getParam(
"telnet_host", required=True, processed=True
)
telnet_port = self._prms.getParam("telnet_port", default=69)
telnet_port = self._prms.getParam("telnet_port", default=69, processed=True)
elif self._protocol == "ssh":
if tm.OS() == "Windows":
@@ -225,12 +225,16 @@ class TestItemConsoleOpen(TestItemConsoleAction):
tm.add_console(cons)
cons.open()
self.result.set(TestValue.SUCCESS)
except ETUMRuntimeError as e:
# Expected console error (device missing, no permission…): one line.
msg = "Impossible to open the console '{}': {}".format(cname, e._message)
self.result.set(result=TestValue.FAILURE, message=msg)
print(msg)
except Exception as e:
# Unexpected error: keep the full traceback for diagnosis.
self.result.set(
result=TestValue.FAILURE,
message="Impossible to open the console ({}) (exception: {})".format(
cname, e
),
message="Impossible to open the console '{}': {}".format(cname, e),
)
traceback.print_exception(*sys.exc_info())
@@ -319,12 +323,17 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
PARAMS = ParamSet(
Param("expected", required=True,
doc="Regex matched against incoming console output until found "
"or until timeout."),
doc="Literal string — or a list of strings — matched against the "
"incoming console output. The read succeeds as soon as one of "
"them is seen, or fails on timeout."),
Param("timeout", default=-1,
doc="Seconds before giving up. Negative means infinite."),
Param("mute", default=False,
doc="If true, don't echo received bytes to testium's stdout/log."),
Param("regex", default=False,
doc="If true, each 'expected' entry is treated as a Python "
"regular expression (searched, not anchored) instead of a "
"literal string."),
)
def __init__(
@@ -343,16 +352,21 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
@test_run
def execute(self):
cons = self.get_console()
ru = self._prms.expanse(self._read_until)
# 'expected' may be a single value or a list of values (match any).
if isinstance(self._read_until, (list, tuple)):
ru = [self._prms.expanse(m) for m in self._read_until]
else:
ru = self._prms.expanse(self._read_until)
read_timeout = int(self._prms.getParam("timeout", default=-1, processed=True))
mute = self._prms.getParam("mute", default=False, processed=True)
use_regex = self._prms.getParam("regex", default=False, processed=True)
if read_timeout < 0:
read_timeout = None
try:
status, data = cons.read_until(
ru, timeout=read_timeout, return_data=True, mute=mute,
should_stop=self.isStopped,
should_stop=self.isStopped, regex=bool(use_regex),
)
if status == 0:
self.result.set(TestValue.SUCCESS)
@@ -364,14 +378,21 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
)
else:
self.result.set(result=TestValue.FAILURE, message="No matching text")
if mute:
self.result.reported = {"data": ""}
else:
self.result.reported = {"data": data}
reported = {"data": "" if mute else data}
# When several patterns were given, expose which one matched.
if status == 0 and isinstance(ru, (list, tuple)):
reported["matched"] = getattr(cons, "_matched", None)
self.result.reported = reported
# The result is put in global dir
tm.setgd("cn_" + self.parent()._name, data)
except:
except ETUMRuntimeError as e:
# Expected console error (e.g. console not open): clear one-liner.
msg = f"Console '{self.token['console_name']}': impossible to read ({e._message})"
self.result.set(result=TestValue.FAILURE, message=msg)
print(msg)
except Exception:
# Unexpected error: keep the full traceback for diagnosis.
print(traceback.format_exc())
self.result.set(
result=TestValue.FAILURE,

View File

@@ -51,11 +51,8 @@ class TestItemCycle(TestItem):
self._niter = None
if "iterator" in dict_cycle:
# Kept raw: expanded at run time in execute().
self._iter = dict_cycle["iterator"]
if isinstance(self._iter, str):
self._iter = self._prms.expanse(self._iter)
else:
self._iter = None

View File

@@ -21,7 +21,8 @@ class TestItemGit(TestItem):
super().__init__(dict_item, parent, status_queue, filename=filename)
self._type = cst.TYPE_GIT
self.is_container = False
self.repo = self._prms.getParamAll('repo', processed=True, required=True)
# Kept raw: each repo entry is expanded at run time in execute().
self.repo = self._prms.getParamAll('repo', required=True)
@test_run
def execute(self):

View File

@@ -26,13 +26,14 @@ class TestItemTestedRefsDialog(TestItemDialogBase):
self.is_container = False
with item_load_context(self.cmd(), self.name(), self.seqFilename()):
self._question = self._prms.getParam('question', required=True)
self._init_values = self._prms.getParamAll('reference', required=False, processed=True)
# Kept raw: expanded at run time in execute().
self._init_values = self._prms.getParamAll('reference', required=False)
self._auto_result = self._prms.getParam('auto_result', required=False, default=None)
@test_run
def execute(self):
q = self._prms.expanse(self._question)
init_values = ','.join(self._init_values)
init_values = ','.join(self._prms.expanse(v) for v in self._init_values)
if _is_text_mode():
print(f"References: {q}")
rows = init_values.split(',') if init_values else ['']