#!/usr/bin/python3 import os import sys import traceback from configparser import ConfigParser import logging import inspect from enum import Enum, auto import signal from importlib import import_module from threading import Thread, Lock import inspect from threading import Thread def is_number(s): try: float(s) return True except ValueError: pass return False class AEErrs(Enum): PARSE_ERROR = -32700 INVALID_REQUEST = -32600 METH_NOT_FOUND = -32601 INVALID_PARAMS = -32602 INTERNAL_ERROR = -32000 def __str__(self) -> str: return ERROR_MESSAGES[self.value] ERROR_MESSAGES = { AEErrs.PARSE_ERROR: """Invalid JSON was received by the server. An error occurred on the server while parsing the JSON text.""", AEErrs.INVALID_REQUEST: "The request sent is not a valid object.", AEErrs.METH_NOT_FOUND: "The method does not exist / is not available.", AEErrs.INVALID_PARAMS: "Invalid method parameter(s).", AEErrs.INTERNAL_ERROR: "Internal error.", } class AppEngineException(Exception): def __init__(self, error: AEErrs, mesg=None) -> None: if mesg is None: self.mesg = str(error) super().__init__(self.mesg) self.value = error.value class Commands(Thread): defmod = None prefcmd = "cmd_" def __init__(self, config: ConfigParser, log: logging.Handler) -> None: super().__init__() self.threaded = False self.cmods = {} self.config = config self.log = log self.stopped = False self.nickname = None if not (self.config is None): self.nickname = self.config.get("alias") def val_to_print(self, val, dig=1): if isinstance(val, str): return val if isinstance(val, float): f = "{:." + str(int(dig)) + "f}" return f.format(val) else: return str(val) def info(self, msg): if self.log: self.log.info(msg) else: print("info: " + msg) def debug(self, msg): if self.log: self.log.debug(msg) else: print("debug: " + msg) def warning(self, msg): if self.log: self.log.warning(msg) else: print("warning: " + msg) def error(self, msg): if self.log: self.log.error(msg) else: print("error: " + msg) def stop(self): self.stopped = True def free(self): """ Virtual method used to clean resources for threaded Commands when they are stopped. """ pass def list_modules(self): success = True ret = "List of modules:\n" for module in self.cmods.keys(): ret = ret + " " + module + "\n" return success, ret.strip() def help_module(self, module, *args): if module in self.cmods.keys(): if len(args) > 0: return self.cmods[module].cmd_help(args[0]) else: return self.cmods[module].cmd_help() else: return "No module with this name" def _execute_command(self, method: str, *args, **kwargs) -> tuple: success = False ret = (AEErrs.INTERNAL_ERROR.value, "function not found") if hasattr(self, self.prefcmd + method) and inspect.ismethod( getattr(self, self.prefcmd + method) ): self.lock.acquire() try: f = getattr(self, self.prefcmd + method) ret = f(*args, **kwargs) success = True self.log.info( 'function "{}" of module "{}" was called'.format(method, self.name) ) except TypeError: self.log.error( 'function "{}" of module "{}" wrong parameters'.format( method, self.name ) ) ret = ( AEErrs.INVALID_PARAMS.value, ERROR_MESSAGES[AEErrs.INVALID_PARAMS], ) except AppEngineException as e: self.log.error( 'function "{}" of module "{}" returned an error with code {} and message "{}"'.format( method, self.name, e.value, e.mesg ) ) ret = (e.value, e.mesg) except: self.log.error( 'function "{}" of module "{}" crashed without other notif'.format( method, self.name ) ) ret = (AEErrs.INTERNAL_ERROR.value, traceback.format_exc()) self.lock.release() return success, ret def execute_command(self, module: str, method: str, *args, **kwargs): # isolate the module called success = False if module == "": if method == 'help': return self.list_modules() else: module = self.defmod if not (module in self.cmods.keys()): m = module.replace("_", "-") if not (m in self.cmods.keys()): ret = 'module "{}" not found'.format(module) self.log.error(ret) return success, (AEErrs.INVALID_REQUEST.value, ret) return self.cmods[module]._execute_command(method, *args, **kwargs) def cmd_help(self, *args, **kwargs): """Help of module commands. Params: - if No param: list of commands - otherwise: help of the first arg""" ret = "" if len(args) == 0: cmds = inspect.getmembers(self, predicate=inspect.ismethod) cmds = [x for x in cmds if x[0].startswith(self.prefcmd)] for m in cmds: ret = ret + m[0][len(self.prefcmd):] + "\n" else: if isinstance(args[0], str): cmd = self.prefcmd + args[0] c = getattr(self, cmd, None) if c: r = inspect.getdoc(c) if r: ret = r else: ret = 'No documentation for "' + args[0] + '"' if ret == "": ret = "No command with this name..." return ret class CommandsLoader: def __init__(self, config: ConfigParser, log: logging.Handler, modpath: str): self.config = config self.modpath = modpath sys.path.append(os.path.join(modpath)) Commands.defmod = self.config["general"].get("default") prefcmd = self.config["general"].get("methods_prefix") if not prefcmd is None: Commands.prefcmd = prefcmd self.prefcmds = "cmds_" prefcmds = self.config["general"].get("modules_prefix") if not prefcmds is None: self.prefcmds = prefcmds self.log = log self.lock = Lock() self._load_commands() self._set_cmods() def _load_commands(self): cmds = {} with os.scandir(self.modpath) as it: for entry in it: if ( entry.name.startswith(self.prefcmds) and entry.name.endswith(".py") and entry.is_file() ): obj = self._load_module(entry.name[:-3]) if not obj is None: nmod = obj.nickname if nmod is None: nmod = (entry.name[:-3])[len(self.prefcmds):] cmds.update({nmod: obj}) self.log.info('module "{}" loaded'.format(nmod)) else: self.log.error( 'The module "{}" could not be loaded'.format( entry.name[:-3] ) ) self.cmods = cmds def _load_module(self, name: str): try: module = import_module(name) except ModuleNotFoundError: raise Exception(f"""The module "{name}" could not be loaded.""") members = inspect.getmembers(module, inspect.isclass) conf = None if name[len(self.prefcmds):] in self.config.sections(): conf = self.config[name[len(self.prefcmds):]] obj = None for n, c in members: if issubclass(c, Commands) and (n != "Commands"): obj = c(conf, self.log) obj.log = self.log obj.lock = self.lock break return obj def _set_cmods(self): for k, v in self.cmods.items(): v.cmods = self.cmods def _load_dependencies(self): for k, v in self.cmods.items(): if hasattr(v, "dependencies"): for p in v.dependencies: if p in self.cmods.keys(): setattr(v, p, self.cmods[p]) else: self.log.error( 'Dependency "{}" of module "{}" could not be satisfied'.format( p, k ) ) def start(self): self._load_dependencies() for k, v in self.cmods.items(): if v.threaded: v.start() def stop(self): for k, v in self.cmods.items(): if v.threaded: v.stop() def join(self): for k, v in self.cmods.items(): if v.threaded: v.join() def free(self): for k, v in self.cmods.items(): if v.threaded: v.free() class AppEngine: def __init__( self, app_name: str, conf_file: str = "", log_file: str = "", debug: bool = False, ) -> None: self.cl = None self.app_name = app_name self.conf = ConfigParser() if conf_file != "": self.parse_config(conf_file) self.def_log(log_file) signal.signal(signal.SIGINT, self.signal_handler) if debug: self.log.setLevel(logging.DEBUG) else: self.log.setLevel(logging.WARNING) def signal_handler(self, sig, frame): print("\nExiting.") self.stop() def parse_config(self, cf: str): if os.path.exists(cf) and os.path.isfile(cf): self.conf.read(cf) else: raise Exception("Configuration file not found") def def_log(self, log_file: str): self.log = logging.getLogger(self.app_name) if log_file != "": fname = log_file is_writeable = False if not os.path.isabs(fname): fname = os.path.join(os.getcwd(), fname) if os.access(os.path.dirname(fname), os.W_OK): is_writeable = True if is_writeable: self.log.addHandler(logging.FileHandler(fname)) else: self.log.error('No write permissions: "{}"'.format(fname)) def exec(self, modpath: str = ""): self.cl = CommandsLoader(self.conf, self.log, modpath) self.cl.start() self.cl.join() self.cl.free() def stop(self): self.cl.stop()