first importation

This commit is contained in:
2023-04-25 08:04:15 +02:00
parent e5bd8edd54
commit 0d49269357
2 changed files with 457 additions and 0 deletions

325
src/commands/__init__.py Normal file
View File

@@ -0,0 +1,325 @@
import inspect
from threading import Thread
from unicodedata import numeric
def is_number(s):
try:
float(s)
return True
except ValueError:
pass
return False
class Commands(Thread):
defmod = None
def __init__(self, config, log):
super().__init__()
self.threaded = False
self.cmods = {}
self.config = config
self.log = log
self.stopped = False
self.nickname = None
if not (self.config is None):
self.nickname = self.config.get('nickname')
def val_to_print(self, val, dig=1):
if isinstance(val, str):
return val
if isinstance(val, float):
f = '{:.' + str(int(dig)) + 'f}'
return f.format(val)
else:
return str(val)
def info(self, msg):
if self.log:
self.log.info(msg)
else:
print('info: '+msg)
def debug(self, msg):
if self.log:
self.log.debug(msg)
else:
print('debug: '+msg)
def warning(self, msg):
if self.log:
self.log.warning(msg)
else:
print('warning: '+msg)import inspect
from threading import Thread
from unicodedata import numeric
def is_number(s):
try:
float(s)
return True
except ValueError:
pass
return False
class Commands(Thread):
defmod = None
def __init__(self, config, log):
super().__init__()
self.threaded = False
self.cmods = {}
self.config = config
self.log = log
self.stopped = False
self.nickname = None
if not (self.config is None):
self.nickname = self.config.get('nickname')
def val_to_print(self, val, dig=1):
if isinstance(val, str):
return val
if isinstance(val, float):
f = '{:.' + str(int(dig)) + 'f}'
return f.format(val)
else:
return str(val)
def info(self, msg):
if self.log:
self.log.info(msg)
else:
print('info: '+msg)
def debug(self, msg):
if self.log:
self.log.debug(msg)
else:
print('debug: '+msg)
def warning(self, msg):
if self.log:
self.log.warning(msg)
else:
print('warning: '+msg)
def error(self, msg):
if self.log:
self.log.error(msg)
else:
print('error: '+msg)
def stop(self):
self.stopped = True
def list_modules(self):
success = True
ret = ''
for module in self.cmods.keys():
ret = ret + module + '\n'
return success, ret.strip()
def _execute_command(self, cmd):
c = cmd.split(',', maxsplit=1)
key = "cmd_" + c[0].strip().lower().replace(' ', '_')
params = []
#parameters
if len(c) > 1:
for p in c[1].split(','):
p = p.strip()
if is_number(p):
params.append(float(p))
else:
params.append(p)
params = tuple(params)
success = False
ret = "function not found"
if hasattr(self, key) and inspect.ismethod(getattr(self, key)):
self.lock.acquire()
try:
f = getattr(self, key)
ret = f(*params)
success = True
self.log.info('function "{}" of module "{}" was called'.format(key, self.name))
except TypeError:
self.log.error('function "{}" of module "{}" wrong parameters'.format(key, self.name))
ret = 'param error'
except:
self.log.error('function "{}" of module "{}" crashed'.format(key, self.name))
ret = 'function crash'
self.lock.release()
if ret is None:
ret = ''
return success, ret.strip()
def execute_command(self, cmd):
# isolate the module called
success = False
c = cmd.split('.', maxsplit=1)
if ((len(c) == 1) and (self.defmod is None)) or (len(c) < 1):
ret = 'Wrong call syntax. It should be: "<module>.<function>=param,param2..."'
self.log.error(ret)
return success, ret
if (len(c) == 1) and (c[0].lower() == 'help'):
return self.list_modules()
elif len(c) == 1:
module = self.defmod
command = c[0]
else:
module = c[0].lower()
command = c[1]
if not (module in self.cmods.keys()):
m = module.replace('_','-')
if not (m in self.cmods.keys()):
ret = 'module "{}" not found'.format(module)
self.log.error(ret)
return success, ret
return self.cmods[module]._execute_command(command)
def cmd_help(self,*args):
'''Help of module commands.
Params:
- if No param: list of commands
- otherwise: help of the first arg'''
ret = ''
if len(args) == 0:
cmds = inspect.getmembers(self, predicate=inspect.ismethod)
cmds = [x for x in cmds if x[0].startswith('cmd_')]
for m in cmds:
ret = ret + m[0][len('cmd_'):] + '\n'
else:
if isinstance(args[0], str):
cmd = 'cmd_'+args[0]
c = getattr(self, cmd, None)
if c:
r = inspect.getdoc(c)
if r:
ret = r
else:
ret = 'No documentation for "' + args[0] + '"'
if ret == '':
ret = 'No command with this name...'
return ret
def error(self, msg):
if self.log:
self.log.error(msg)
else:
print('error: '+msg)
def stop(self):
self.stopped = True
def list_modules(self):
success = True
ret = ''
for module in self.cmods.keys():
ret = ret + module + '\n'
return success, ret.strip()
def _execute_command(self, cmd):
c = cmd.split(',', maxsplit=1)
key = "cmd_" + c[0].strip().lower().replace(' ', '_')
params = []
#parameters
if len(c) > 1:
for p in c[1].split(','):
p = p.strip()
if is_number(p):
params.append(float(p))
else:
params.append(p)
params = tuple(params)
success = False
ret = "function not found"
if hasattr(self, key) and inspect.ismethod(getattr(self, key)):
self.lock.acquire()
try:
f = getattr(self, key)
ret = f(*params)
success = True
self.log.info('function "{}" of module "{}" was called'.format(key, self.name))
except TypeError:
self.log.error('function "{}" of module "{}" wrong parameters'.format(key, self.name))
ret = 'param error'
except:
self.log.error('function "{}" of module "{}" crashed'.format(key, self.name))
ret = 'function crash'
self.lock.release()
if ret is None:
ret = ''
return success, ret.strip()
def execute_command(self, cmd):
# isolate the module called
success = False
c = cmd.split('.', maxsplit=1)
if ((len(c) == 1) and (self.defmod is None)) or (len(c) < 1):
ret = 'Wrong call syntax. It should be: "<module>.<function>=param,param2..."'
self.log.error(ret)
return success, ret
if (len(c) == 1) and (c[0].lower() == 'help'):
return self.list_modules()
elif len(c) == 1:
module = self.defmod
command = c[0]
else:
module = c[0].lower()
command = c[1]
if not (module in self.cmods.keys()):
m = module.replace('_','-')
if not (m in self.cmods.keys()):
ret = 'module "{}" not found'.format(module)
self.log.error(ret)
return success, ret
return self.cmods[module]._execute_command(command)
def cmd_help(self,*args):
'''Help of module commands.
Params:
- if No param: list of commands
- otherwise: help of the first arg'''
ret = ''
if len(args) == 0:
cmds = inspect.getmembers(self, predicate=inspect.ismethod)
cmds = [x for x in cmds if x[0].startswith('cmd_')]
for m in cmds:
ret = ret + m[0][len('cmd_'):] + '\n'
else:
if isinstance(args[0], str):
cmd = 'cmd_'+args[0]
c = getattr(self, cmd, None)
if c:
r = inspect.getdoc(c)
if r:
ret = r
else:
ret = 'No documentation for "' + args[0] + '"'
if ret == '':
ret = 'No command with this name...'
return ret

132
src/main.py Normal file
View File

@@ -0,0 +1,132 @@
#!/usr/bin/python3
import os
import configparser
import logging
from systemd import journal
import inspect
from importlib import import_module
from commands import Commands
from threading import Lock
ourpath = os.path.dirname(os.path.abspath(__file__))
def dir_writeable(path):
return os.access(path, os.W_OK)
class CommandsLoader:
def __init__(self, config, log):
self.config = config
Commands.defmod = self.config['general'].get('default')
self.log = log
self.lock = Lock()
self._load_commands()
self._set_cmods()
self._load_dependencies()
def _load_commands(self):
cmds = {}
with os.scandir(os.path.join(ourpath, "commands")) as it:
for entry in it:
if entry.name.startswith('cmds_') and entry.name.endswith('.py') and entry.is_file():
obj = self._load_module(entry.name[:-3])
if not obj is None:
nmod = obj.nickname
if nmod is None:
nmod = (entry.name[:-3])[5:]
cmds.update({nmod: obj})
self.log.info('module "{}" loaded'.format(nmod))
else:
self.log.error(
'The module "{}" could not be loaded'.format(entry.name[:-3]))
self.cmods = cmds
def _load_module(self, name):
module = import_module("."+name, 'commands')
members = inspect.getmembers(module, inspect.isclass)
conf = None
if name[5:] in self.config.sections():
conf = self.config[name[5:]]
obj = None
for n, c in members:
if issubclass(c, Commands) and (n != 'Commands'):
obj = c(conf, self.log)
obj.log = self.log
obj.lock = self.lock
break
return obj
def _set_cmods(self):
for k, v in self.cmods.items():
v.cmods = self.cmods
def _load_dependencies(self):
for k, v in self.cmods.items():
if hasattr(v, "dependencies"):
for p in v.dependencies:
if p in self.cmods.keys():
setattr(v, p, self.cmods[p])
else:
log.error(
'Dependency "{}" of module "{}" could not be satisfied'.format(p, k))
def start(self):
for k, v in self.cmods.items():
if v.threaded:
v.start()
def join(self):
for k, v in self.cmods.items():
if v.threaded:
v.join()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"-c", "--config", help="Configuration file", default='/etc/sms-server.conf')
parser.add_argument("-l", "--log-file", help="log to a file instead of the console",
default='')
parser.add_argument("-g", "--debug", action='store_true', required=False,
help="Debug messages logged")
args = parser.parse_args()
if os.path.exists(args.config):
conf = configparser.ConfigParser()
conf.read(args.config)
else:
raise Exception('Configuration file not found')
log = logging.getLogger(os.path.basename(__name__))
if args.log_file != '':
fname = args.log_file
is_writeable = False
if not os.path.isabs(fname):
fname = os.path.join(os.getcwd(), fname)
if dir_writeable(os.path.dirname(fname)):
is_writeable = True
if is_writeable:
log.addHandler(logging.FileHandler(fname))
else:
log.addHandler(journal.JournalHandler())
log.error('No write permissions: "{}"'.format(fname))
else:
log.addHandler(journal.JournalHandler())
if args.debug:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.WARNING)
cl = CommandsLoader(conf, log)
cl.start()
cl.join()