pyinstaller now working
This commit is contained in:
477
src/lib/jrpc.py
Normal file
477
src/lib/jrpc.py
Normal file
@@ -0,0 +1,477 @@
|
||||
import socket
|
||||
import json
|
||||
from time import monotonic
|
||||
import threading
|
||||
import itertools
|
||||
from time import sleep
|
||||
from typing import Callable, Any
|
||||
try:
|
||||
import libs.testium as tm
|
||||
except:
|
||||
import py_func.tm as tm
|
||||
|
||||
from lib.tum_except import ETUMRuntimeError
|
||||
|
||||
"""Lightweight JSON-RPC 2.0 helpers over TCP sockets.
|
||||
|
||||
This module implements a minimal JSON-RPC 2.0 messaging layer using
|
||||
newline-delimited JSON over TCP sockets. It is intended for simple
|
||||
request/response exchanges useful during development and testing. The
|
||||
implementation favors clarity and small surface area rather than full
|
||||
JSON-RPC compliance.
|
||||
|
||||
Main public classes
|
||||
- `JsonRpcConnection` -- Wraps a connected TCP socket and manages
|
||||
sending requests and dispatching incoming requests and responses. It
|
||||
runs a background receiver thread and pairs outgoing requests with
|
||||
responses using per-request identifiers and `threading.Event` objects.
|
||||
|
||||
- `JsonRpcBase` -- Threaded base class for simple server/client helpers.
|
||||
Provides a `call()` method to send requests to the connected peer and
|
||||
a `handle_request()` hook that can be overridden or supplied at
|
||||
construction.
|
||||
|
||||
- `JsonRpcSrv` / `JsonRpcClient` -- Convenience single-connection server
|
||||
and client classes that accept/connect to a peer and create a
|
||||
`JsonRpcConnection` to handle message I/O.
|
||||
|
||||
Usage example (server):
|
||||
|
||||
srv = JsonRpcSrv(port, req_handler=my_handler)
|
||||
srv.start() # runs in background thread
|
||||
|
||||
Usage example (client):
|
||||
|
||||
clt = JsonRpcClient(host, port)
|
||||
clt.start()
|
||||
result = clt.call('method_name', {'foo': 'bar'})
|
||||
|
||||
Notes:
|
||||
- Messages must be valid JSON objects and are expected to be
|
||||
single-line (newline-delimited).
|
||||
- This helper is intended for local/testing use; it does not provide
|
||||
authentication, encryption, or advanced JSON-RPC features (notifications,
|
||||
batch requests, error objects beyond simple dicts).
|
||||
"""
|
||||
|
||||
|
||||
class JsonRpcConnection:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
conn: socket.socket,
|
||||
req_handler: Callable[..., Any],
|
||||
dbg_out=None,
|
||||
):
|
||||
self.name = name
|
||||
self.conn = conn
|
||||
if not callable(req_handler):
|
||||
raise TypeError("req_handler must be a callable (function)")
|
||||
|
||||
# User-provided function called to handle incoming requests.
|
||||
# It may accept either the full request dict or (method, params).
|
||||
self.req_handler = req_handler
|
||||
self.send_lock = threading.Lock()
|
||||
self.pending = {} # id -> Event + response
|
||||
self.id_gen = itertools.count(1)
|
||||
self.running = True
|
||||
self._dbg_out = dbg_out
|
||||
self._event_ready = threading.Event()
|
||||
|
||||
self.conn.settimeout(0.1)
|
||||
|
||||
self.recv_thread = threading.Thread(target=self._recv_loop, daemon=True)
|
||||
self.recv_thread.start()
|
||||
|
||||
@property
|
||||
def dbg_out(self):
|
||||
return self._dbg_out
|
||||
|
||||
@dbg_out.setter
|
||||
def dbg_out(self, dbg_out):
|
||||
self._dbg_out = dbg_out
|
||||
|
||||
# ---------- Reception ----------
|
||||
def _recv_loop(self):
|
||||
buffer = b""
|
||||
|
||||
try:
|
||||
self._event_ready.set()
|
||||
while self.running:
|
||||
try:
|
||||
data = self.conn.recv(4096)
|
||||
if not data:
|
||||
self.print_info("Connection closed")
|
||||
break
|
||||
except socket.timeout:
|
||||
continue
|
||||
else:
|
||||
buffer += data
|
||||
|
||||
while b"\n" in buffer:
|
||||
line, buffer = buffer.split(b"\n", 1)
|
||||
try:
|
||||
msg = json.loads(line.decode())
|
||||
except Exception as e:
|
||||
self.print_info(str(e))
|
||||
else:
|
||||
if isinstance(msg, dict):
|
||||
self._dispatch(msg)
|
||||
else:
|
||||
self.print_info(f"msg not dict ! = '{msg}'")
|
||||
|
||||
except (ConnectionResetError, OSError):
|
||||
self.print_info("Connection lost")
|
||||
|
||||
finally:
|
||||
self.running = False
|
||||
|
||||
# ---------- Dispatch ----------
|
||||
def _dispatch(self, msg):
|
||||
if "method" in msg:
|
||||
# request to be sent
|
||||
meth = msg["method"]
|
||||
params = msg.get("params", None)
|
||||
rid = msg.get("id", None)
|
||||
|
||||
threading.Thread(
|
||||
target=self._handle_request, args=(meth, params, rid), daemon=True
|
||||
).start()
|
||||
|
||||
elif "id" in msg:
|
||||
# we just received an answer to a previously sent request
|
||||
if msg["id"] in self.pending:
|
||||
self.pending[msg["id"]]["response"] = msg
|
||||
self.pending[msg["id"]]["event"].set()
|
||||
else:
|
||||
self.print_info(f"msg id '{msg["id"]}' inconsistency")
|
||||
|
||||
# ---------- Handler ----------
|
||||
def _handle_request(self, meth, params, rid=None):
|
||||
"""Basic request handler.
|
||||
|
||||
In this implementation
|
||||
a `req_handler` callable provided at construction is invoked to handle
|
||||
the request and produce a response value.
|
||||
"""
|
||||
# print(f"Request received: m:'{meth}', p:'{params}'")
|
||||
|
||||
# Delegate handling to the user-provided function. Accept both
|
||||
# `handler(req_dict)` and `handler(method, params)` signatures; if
|
||||
# the handler raises, capture the exception message as the result.
|
||||
try:
|
||||
result = self.req_handler(meth, params)
|
||||
except Exception as exc:
|
||||
result = {"error": str(exc)}
|
||||
|
||||
self.print_info(f"result: {result}")
|
||||
|
||||
# If the request contains an `id`, send a JSON-RPC response.
|
||||
if rid is not None:
|
||||
msg = {"jsonrpc": "2.0", **result, "id": rid}
|
||||
self._send(msg)
|
||||
|
||||
# ---------- Send ----------
|
||||
def _send(self, obj):
|
||||
"""Send a JSON-serializable object terminated by newline.
|
||||
|
||||
The send operation is protected by a lock to avoid interleaving when
|
||||
multiple threads attempt to write to the underlying socket.
|
||||
"""
|
||||
msg = json.dumps(obj) + "\n"
|
||||
data = (msg).encode()
|
||||
self.print_info(f"sending : " + msg)
|
||||
with self.send_lock:
|
||||
self.conn.sendall(data)
|
||||
|
||||
# ---------- Outgoing request ----------
|
||||
def call(self, method, params=None, timeout=3600.0):
|
||||
"""Send a request and wait for its response.
|
||||
|
||||
Args:
|
||||
method: The RPC method name.
|
||||
params: Parameters for the method (any JSON-serializable object).
|
||||
timeout: Seconds to wait for a response before raising
|
||||
`TimeoutError`.
|
||||
|
||||
Returns:
|
||||
The response message (dict) received from the peer.
|
||||
|
||||
Raises:
|
||||
TimeoutError: If no response is received within `timeout`.
|
||||
"""
|
||||
|
||||
req_id = next(self.id_gen)
|
||||
event = threading.Event()
|
||||
|
||||
self.pending[req_id] = {"event": event, "response": None}
|
||||
|
||||
self._send({"jsonrpc": "2.0", "method": method, "params": params, "id": req_id})
|
||||
|
||||
if not event.wait(timeout):
|
||||
# Timeout: remove pending entry and raise
|
||||
self.pending.pop(req_id, None)
|
||||
raise TimeoutError("Timeout JSON-RPC")
|
||||
|
||||
return self.pending.pop(req_id)["response"]
|
||||
|
||||
def print_info(self, msg):
|
||||
if self.dbg_out is not None:
|
||||
print(f"{self.name}: " + str(msg), file=self.dbg_out)
|
||||
|
||||
def stop(self):
|
||||
if self.running:
|
||||
self.running = False
|
||||
|
||||
def join(self):
|
||||
self.recv_thread.join()
|
||||
|
||||
def is_alive(self):
|
||||
self.recv_thread.is_alive()
|
||||
|
||||
def wait_ready(self, timeout=None):
|
||||
return self._event_ready.wait(timeout)
|
||||
|
||||
|
||||
class JsonRpcBase(threading.Thread):
|
||||
"""Threaded base class for simple JSON-RPC server/client helpers.
|
||||
|
||||
Subclasses implement `run()` to accept or establish a single TCP
|
||||
connection and create a `JsonRpcConnection` instance assigned to
|
||||
`self._rpc`. The base class provides a `call()` helper that forwards
|
||||
to the active connection, and a `handle_request(method, params)` hook
|
||||
which may be overridden or supplied via the `req_handler` constructor
|
||||
argument.
|
||||
|
||||
Constructor:
|
||||
- `port` (int): TCP port to bind/connect to.
|
||||
- `req_handler` (callable|None): optional request handler.
|
||||
- `timeout` (int|float): operation timeout in seconds.
|
||||
|
||||
Behavior:
|
||||
- `call()` raises `ETUMRuntimeError` if no active connection exists.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host,
|
||||
port,
|
||||
req_handler: Callable[[dict], Any] = None,
|
||||
timeout=10,
|
||||
dbg_out=None,
|
||||
):
|
||||
super().__init__()
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._timeout = timeout
|
||||
self._rpc = None
|
||||
self._req_handler = req_handler
|
||||
self._dbg_out = dbg_out
|
||||
self._event_ready = threading.Event()
|
||||
|
||||
def handle_request(self, method, params):
|
||||
"""Override to implement server-side request handling.
|
||||
|
||||
The default implementation delegates to the `req_handler` provided
|
||||
at construction (if any). Override this method to customize
|
||||
behaviour.
|
||||
"""
|
||||
if self._req_handler is not None:
|
||||
return self._req_handler(method, params)
|
||||
|
||||
self.print_info("No handler defined for the calls")
|
||||
|
||||
def call(self, method, params):
|
||||
if (self._rpc is not None) and self._rpc.running:
|
||||
return self._rpc.call(method, params)
|
||||
else:
|
||||
raise ETUMRuntimeError(f"'{self.name}' JRPC Server not started.")
|
||||
|
||||
def print_info(self, msg):
|
||||
if self.dbg_out is not None:
|
||||
print(f"{self.name}: " + str(msg), file=self.dbg_out)
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
if self._rpc is not None:
|
||||
self._rpc.stop()
|
||||
|
||||
def connect(self, sock):
|
||||
self._rpc = JsonRpcConnection(
|
||||
self.name, sock, self.handle_request, dbg_out=self.dbg_out
|
||||
)
|
||||
self._rpc.wait_ready()
|
||||
self._event_ready.set()
|
||||
|
||||
def wait_ready(self, timeout=None):
|
||||
return self._event_ready.wait(timeout)
|
||||
|
||||
@property
|
||||
def dbg_out(self):
|
||||
return self._dbg_out
|
||||
|
||||
@dbg_out.setter
|
||||
def dbg_out(self, dbg_out):
|
||||
self._dbg_out = dbg_out
|
||||
if self._rpc is not None:
|
||||
self._rpc.dbg_out = dbg_out
|
||||
|
||||
|
||||
class JsonRpcSrv(JsonRpcBase):
|
||||
"""Single-connection JSON-RPC server.
|
||||
|
||||
`JsonRpcSrv` binds to `localhost` on the provided port and waits for a
|
||||
single client connection. When a client connects it creates a
|
||||
`JsonRpcConnection` and runs until the connection closes or is stopped.
|
||||
|
||||
Typical usage::
|
||||
|
||||
srv = JsonRpcSrv(port, req_handler=my_handler)
|
||||
srv.start() # runs in background thread
|
||||
|
||||
The server will raise `ETUMRuntimeError` on accept/connect timeout.
|
||||
"""
|
||||
|
||||
def __init__(self, host, port, req_handler=None, timeout=10):
|
||||
super().__init__(host, port, req_handler, timeout)
|
||||
self.name = f"JsonRpcSvr_{port}"
|
||||
|
||||
def run(self):
|
||||
# TCP/IP socket creation
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
|
||||
# Link of the socket at the configured port
|
||||
sock.bind((self._host, self._port))
|
||||
|
||||
# Listens incoming connections
|
||||
sock.listen(1)
|
||||
self.print_info(f"listening on {self._host}:{self._port}")
|
||||
|
||||
self.print_info(f"awaiting connection for {self._timeout} secs")
|
||||
sock.settimeout(self._timeout)
|
||||
while True:
|
||||
try:
|
||||
conn, addr = sock.accept()
|
||||
break
|
||||
except socket.timeout:
|
||||
raise ETUMRuntimeError(f"{self.name}: Timeout")
|
||||
|
||||
self.print_info("Client connected")
|
||||
with conn:
|
||||
self.connect(conn)
|
||||
|
||||
while self._rpc.running:
|
||||
# Sleep a short time to avoid a busy loop and allow
|
||||
# the receiver thread to process messages.
|
||||
sleep(0.1)
|
||||
|
||||
finally:
|
||||
if self._rpc is not None:
|
||||
self._rpc.stop()
|
||||
self._rpc.join()
|
||||
self.print_info("stopped")
|
||||
|
||||
|
||||
class JsonRpcClient(JsonRpcBase):
|
||||
"""Simple JSON-RPC client that connects to a server on localhost.
|
||||
|
||||
`JsonRpcClient` will attempt to connect to the given port until the
|
||||
configured timeout elapses. On successful connection it creates a
|
||||
`JsonRpcConnection` and serves requests/responses until closed.
|
||||
|
||||
Typical usage::
|
||||
|
||||
clt = JsonRpcClient(host, port)
|
||||
clt.start()
|
||||
resp = clt.call('method', {'a': 1})
|
||||
"""
|
||||
|
||||
def __init__(self, host, port, req_handler=None, timeout=10):
|
||||
super().__init__(host, port, req_handler, timeout)
|
||||
self.name = f"JsonRpcClt_{port}"
|
||||
|
||||
def run(self):
|
||||
if tm.OS() == "Windows":
|
||||
self.run_win()
|
||||
else:
|
||||
self.run_lin()
|
||||
|
||||
def run_win(self):
|
||||
# TCP/IP socket creation
|
||||
tslice = 1
|
||||
t = self._timeout
|
||||
sock = None
|
||||
try:
|
||||
while t >= 0:
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(tslice)
|
||||
# Link of the socket at the configured port
|
||||
try:
|
||||
sock.connect((self._host, self._port))
|
||||
break
|
||||
except socket.timeout:
|
||||
sock.close()
|
||||
t -= tslice
|
||||
if t < 0:
|
||||
raise ETUMRuntimeError(
|
||||
f"{self.name}: failed to connect : timeout"
|
||||
)
|
||||
else:
|
||||
sleep(tslice)
|
||||
except socket.error as e:
|
||||
raise ETUMRuntimeError(f"{self.name}: failed to connect : {e}")
|
||||
|
||||
self.print_info("Connected to server")
|
||||
self.connect(sock)
|
||||
|
||||
while self._rpc.running:
|
||||
# Sleep a short time to avoid a busy loop and allow
|
||||
# the receiver thread to process messages.
|
||||
sleep(0.1)
|
||||
|
||||
finally:
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
if self._rpc is not None:
|
||||
self._rpc.stop()
|
||||
self._rpc.join()
|
||||
self.print_info("closed")
|
||||
|
||||
def run_lin(self):
|
||||
# TCP/IP socket creation
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
tslice = 0.1
|
||||
t0 = monotonic()
|
||||
sock.settimeout(0)
|
||||
# Link of the socket at the configured port
|
||||
while True:
|
||||
try:
|
||||
sleep(tslice)
|
||||
sock.connect((self._host, self._port))
|
||||
break
|
||||
except Exception as e:
|
||||
if (monotonic() - t0) > self._timeout:
|
||||
raise ETUMRuntimeError(
|
||||
f"{self.name}: failed to connect : {e}"
|
||||
)
|
||||
|
||||
self.print_info("Connected to server")
|
||||
self.connect(sock)
|
||||
|
||||
while self._rpc.running:
|
||||
# Sleep a short time to avoid a busy loop and allow
|
||||
# the receiver thread to process messages.
|
||||
sleep(0.1)
|
||||
|
||||
finally:
|
||||
if self._rpc is not None:
|
||||
self._rpc.stop()
|
||||
self._rpc.join()
|
||||
self.print_info("closed")
|
||||
Reference in New Issue
Block a user