feat(console): list/regex read_until, serial error clarity; v0.3

read_until:
- 'expected' now accepts a list of values (succeeds on any match).
- new 'regex: true' flag: each pattern is a Python regex (re.search over a
  bounded tail, Console.REGEX_WINDOW). Reports which pattern matched.

Serial console robustness & clarity:
- failed open() raises a clear ETUMRuntimeError ("Serial device '…' does not
  exist." / permission hint) instead of a raw pyserial traceback.
- a console whose open failed is safely "not open" (init _thd=None +
  isOpened guards in readchar/read_nowait/close) — no more cascading
  AttributeError: '_thd' on subsequent read steps.
- action handlers: one-liner for expected (ETUMRuntimeError) errors, full
  traceback kept for unexpected ones. All console errors use testium
  exceptions (ETUMRuntimeError).

Flatpak: grant --device=all so serial adapters (/dev/ttyUSB*, /dev/ttyACM*)
are visible in the sandbox.

Validation: new read_until list/regex (match + no-match) cases in
items/console/test.tum.

Version: 0.3.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-14 16:54:00 +02:00
parent 8c4e1b56b5
commit e4300ecf7b
6 changed files with 180 additions and 36 deletions

View File

@@ -1 +1 @@
0.2.3
0.3

View File

@@ -2,6 +2,7 @@ from datetime import datetime
import sys
import os
import re
import errno
from queue import Queue, Empty
from time import sleep
import collections
@@ -10,6 +11,8 @@ import threading
from telnetlib3 import Telnet, DO, WILL, WONT, TTYPE, IAC, SB, SE, theNULL
from runtime.tum_except import ETUMRuntimeError
TIMEOUT_NULL = 0.000001
STOP_POLL_INTERVAL = 0.2
@@ -124,7 +127,32 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
# c = ''
return c
def read_until(self, match, timeout=None, return_data=False, mute=False, should_stop=None):
# Upper bound (in characters) of the accumulated buffer tail scanned in
# regex mode, so cost/memory stay bounded on long-running streams.
REGEX_WINDOW = 65536
def _feed_match(self, data, search_deques, match_deques, matches):
"""Append *data* to every rolling window and return the first matched
pattern string, or None if none completed on this character."""
matched = None
for sd, md, m in zip(search_deques, match_deques, matches):
sd.append(data)
if matched is None and sd == md:
matched = m
return matched
def _search_regex(self, read_data, compiled):
"""Search the (bounded) tail of *read_data* with each compiled regex;
return the matched text of the first hit, or None."""
tail = read_data[-self.REGEX_WINDOW:]
for p in compiled:
m = p.search(tail)
if m is not None:
return m.group(0)
return None
def read_until(self, match, timeout=None, return_data=False, mute=False,
should_stop=None, regex=False):
"""
read until the string 'match is found
If timeout is not set (None), this function runs indefinitely
@@ -141,15 +169,37 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
read_data = ''
status = -1
if not match:
raise ValueError('match parameter can not be empty')
raise ETUMRuntimeError("'expected' pattern can not be empty")
# match may be a single string or a list/tuple of strings: the read
# succeeds as soon as ANY of them is seen in the stream.
if isinstance(match, (list, tuple)):
matches = [str(m) for m in match]
else:
matches = [str(match)]
if (not matches) or any(len(m) == 0 for m in matches):
raise ETUMRuntimeError("'expected' pattern can not be empty")
if timeout is None:
timeout = 1000000
# Fixed-length queue that will contain the readout characters
search_deque = collections.deque(maxlen=len(match))
# convert match string into a deque for faster comparisons
match_deque = collections.deque(match)
compiled = None
search_deques = match_deques = None
if regex:
# 'matches' are regular expressions; succeed on the first hit.
compiled = []
for m in matches:
try:
compiled.append(re.compile(m))
except re.error as e:
raise ETUMRuntimeError(
"Invalid regular expression {!r}: {}".format(m, e)) from None
else:
# One fixed-length rolling window per match pattern, compared
# against the corresponding pattern deque.
search_deques = [collections.deque(maxlen=len(m)) for m in matches]
match_deques = [collections.deque(m) for m in matches]
self._matched = None
# In case of a timeout equal to zero, it must be looped until the
# buffer is empty
@@ -167,9 +217,13 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
self.string_buffer += data
read_data += data
search_deque.append(data)
if search_deque == match_deque:
if regex:
matched = self._search_regex(read_data, compiled)
else:
matched = self._feed_match(data, search_deques, match_deques, matches)
if matched is not None:
status = 0
self._matched = matched
if (not mute) and (data != '\n'):
self.string_buffer += '\n'
@@ -210,9 +264,13 @@ A {classname}.close() is missing somewhere in your code !'.format(classname=type
self.string_buffer += data
read_data += data
search_deque.append(data)
if search_deque == match_deque:
if regex:
matched = self._search_regex(read_data, compiled)
else:
matched = self._feed_match(data, search_deques, match_deques, matches)
if matched is not None:
status = 0
self._matched = matched
if (not mute) and (data != '\n'):
self.string_buffer += '\n'
@@ -407,20 +465,35 @@ class SerialConsole(Console):
self.stop = threading.Event()
self.port = None
self.port_id = port
self._thd = None
def open(self):
self.port = serial.Serial(port=self.port_id,
baudrate=self.baudrate,
stopbits=self.stopbits,
parity=self.parity,
xonxoff=self.xonxoff,
timeout=None)
try:
self.port = serial.Serial(port=self.port_id,
baudrate=self.baudrate,
stopbits=self.stopbits,
parity=self.parity,
xonxoff=self.xonxoff,
timeout=None)
except (serial.SerialException, OSError) as e:
raise ETUMRuntimeError(self._open_error_message(e)) from None
self.isOpened = True
if self.bufferize:
self.port.timeout = 2
self._thd = threading.Thread(target=self.read_thread)
self._thd.start()
def _open_error_message(self, exc):
"""Build a short, direct message for a failed serial open."""
errno_ = getattr(exc, "errno", None)
if errno_ == errno.ENOENT:
return "Serial device '{}' does not exist.".format(self.port_id)
if errno_ == errno.EACCES:
return ("Permission denied opening serial device '{}' "
"(is your user allowed to access it, e.g. 'dialout' group?)."
.format(self.port_id))
return "Could not open serial device '{}': {}".format(self.port_id, exc)
def read_thread(self):
while not self.stop.is_set():
c = self.port.read(1)
@@ -428,7 +501,7 @@ class SerialConsole(Console):
self.rx_queue.put(c)
def close(self):
if self.bufferize:
if self.bufferize and self._thd is not None:
self.stop.set()
self._thd.join()
if self.port is not None:
@@ -440,10 +513,12 @@ class SerialConsole(Console):
self.port.timeout = timeout
def readchar(self, timeout):
if not self.isOpened:
raise ETUMRuntimeError("Serial console '{}' is not open".format(self.name))
if self.bufferize:
if not self._thd.is_alive() and not self.stop.isSet():
raise RuntimeError(
"Impossible to read the serial console, it may be already openned")
raise ETUMRuntimeError(
"Impossible to read the serial console, it may be already opened")
if timeout < TIMEOUT_NULL:
return self.rx_queue.get(block=False)
else:
@@ -455,10 +530,12 @@ class SerialConsole(Console):
self.port.flush()
def read_nowait(self, mute=False):
if not self.isOpened:
raise ETUMRuntimeError("Serial console '{}' is not open".format(self.name))
if self.bufferize:
if not self._thd.is_alive() and not self.stop.isSet():
raise RuntimeError(
"Impossible to read the serial console, it may be already openned")
raise ETUMRuntimeError(
"Impossible to read the serial console, it may be already opened")
st = self.rx_queue.getAll().decode(self.encoding, errors='replace')
if not mute:
date_str = str(datetime.now()).split('.')[0].split(' ')[1]

View File

@@ -4,7 +4,7 @@ import importlib
import traceback
import api.testium as tm
from runtime.tum_except import ETUMSyntaxError
from runtime.tum_except import ETUMSyntaxError, ETUMRuntimeError
from runtime.stdout_redirect import stdio_redir
from interpreter.test_items.test_item import test_run
from interpreter.test_items.item_actions import TestItemActions
@@ -225,12 +225,17 @@ class TestItemConsoleOpen(TestItemConsoleAction):
tm.add_console(cons)
cons.open()
self.result.set(TestValue.SUCCESS)
except ETUMRuntimeError as e:
# Expected, user-facing console error (device missing, no permission,
# …): report a single clear line, no traceback.
msg = "Impossible to open the console '{}': {}".format(cname, e._message)
self.result.set(result=TestValue.FAILURE, message=msg)
print(msg)
except Exception as e:
# Unexpected error: keep the full traceback for diagnosis.
self.result.set(
result=TestValue.FAILURE,
message="Impossible to open the console ({}) (exception: {})".format(
cname, e
),
message="Impossible to open the console '{}': {}".format(cname, e),
)
traceback.print_exception(*sys.exc_info())
@@ -319,12 +324,17 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
PARAMS = ParamSet(
Param("expected", required=True,
doc="Regex matched against incoming console output until found "
"or until timeout."),
doc="Literal string — or a list of strings — matched against the "
"incoming console output. The read succeeds as soon as one of "
"them is seen, or fails on timeout."),
Param("timeout", default=-1,
doc="Seconds before giving up. Negative means infinite."),
Param("mute", default=False,
doc="If true, don't echo received bytes to testium's stdout/log."),
Param("regex", default=False,
doc="If true, each 'expected' entry is treated as a Python "
"regular expression (searched, not anchored) instead of a "
"literal string."),
)
def __init__(
@@ -343,16 +353,21 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
@test_run
def execute(self):
cons = self.get_console()
ru = self._prms.expanse(self._read_until)
# 'expected' may be a single value or a list of values (match any).
if isinstance(self._read_until, (list, tuple)):
ru = [self._prms.expanse(m) for m in self._read_until]
else:
ru = self._prms.expanse(self._read_until)
read_timeout = int(self._prms.getParam("timeout", default=-1, processed=True))
mute = self._prms.getParam("mute", default=False, processed=True)
use_regex = self._prms.getParam("regex", default=False, processed=True)
if read_timeout < 0:
read_timeout = None
try:
status, data = cons.read_until(
ru, timeout=read_timeout, return_data=True, mute=mute,
should_stop=self.isStopped,
should_stop=self.isStopped, regex=bool(use_regex),
)
if status == 0:
self.result.set(TestValue.SUCCESS)
@@ -364,14 +379,21 @@ class TestItemConsoleReadUntil(TestItemConsoleAction):
)
else:
self.result.set(result=TestValue.FAILURE, message="No matching text")
if mute:
self.result.reported = {"data": ""}
else:
self.result.reported = {"data": data}
reported = {"data": "" if mute else data}
# When several patterns were given, expose which one matched.
if status == 0 and isinstance(ru, (list, tuple)):
reported["matched"] = getattr(cons, "_matched", None)
self.result.reported = reported
# The result is put in global dir
tm.setgd("cn_" + self.parent()._name, data)
except:
except ETUMRuntimeError as e:
# Expected console error (e.g. console not open): clear one-liner.
msg = f"Console '{self.token['console_name']}': impossible to read ({e._message})"
self.result.set(result=TestValue.FAILURE, message=msg)
print(msg)
except Exception:
# Unexpected error: keep the full traceback for diagnosis.
print(traceback.format_exc())
self.result.set(
result=TestValue.FAILURE,