log and rewrite readconfig
parent
f6c2f80841
commit
831199bf93
@ -1,65 +1,107 @@
|
||||
import yaml
|
||||
|
||||
|
||||
class MQTT_ROS_Config:
|
||||
"""
|
||||
MQTT_ROS_Config configuration class
|
||||
"""
|
||||
# class variables
|
||||
sectionName='MQTT_ROS'
|
||||
options={
|
||||
'msg_format': (str,True),
|
||||
'MQTTClientNamePub': (str,True),
|
||||
'MQTTClientNameSub': (str,True),
|
||||
'host': (str,True),
|
||||
'port': (int,True),
|
||||
'keepalive': (int,True),
|
||||
'willTopic':(str,True),
|
||||
'lwt':(str, True),
|
||||
'willRetain':(bool,False),
|
||||
'willTopicQOS':(int,True),
|
||||
'Flight_Information_topicToMqtt': (str,False),
|
||||
'Fly_Formation_topicToMqtt': (str,False),
|
||||
'Fly_Formation_topicToMqtt_QOS':(int,False),
|
||||
'ROSClientNamePub': (str,False),
|
||||
'ROSClientNameSub': (str,False),
|
||||
'ROStopicName_Flight_Information': (str,False),
|
||||
'ROStopicName_Fly_Formation': (str,False)}
|
||||
|
||||
#constructor
|
||||
class Config:
|
||||
def __init__(self, inFileName):
|
||||
#read YAML config and get EV3 section
|
||||
infile=open(inFileName,'r')
|
||||
ymlcfg=yaml.safe_load(infile)
|
||||
infile.close()
|
||||
eccfg=ymlcfg.get(self.sectionName,None)
|
||||
if eccfg is None: raise Exception('Missing {} section in cfg file'.format(self.sectionName))
|
||||
|
||||
self.sectionNames = ["MQTT","ROS", "LOG"]
|
||||
self.options = {}
|
||||
self.inFileName = inFileName
|
||||
|
||||
def setAttribute(self):
|
||||
with open(self.inFileName,"r") as f:
|
||||
self.ymlcfg=yaml.safe_load(f)
|
||||
|
||||
ecfgs = [self.ymlcfg.get(name) for name in self.sectionNames]
|
||||
if None in ecfgs:
|
||||
nameIndex = ecfgs.index(None)
|
||||
raise Exception("Missing {} section in cfg file".format(self.sectionNames[nameIndex]))
|
||||
#iterate over options
|
||||
for opt in self.options:
|
||||
if opt in eccfg:
|
||||
optval=eccfg[opt]
|
||||
|
||||
#verify parameter type
|
||||
if type(optval) != self.options[opt][0]:
|
||||
raise Exception('Parameter "{}" has wrong type'.format(opt))
|
||||
|
||||
#create attributes on the fly
|
||||
setattr(self,opt,optval)
|
||||
else:
|
||||
if self.options[opt][1]:
|
||||
raise Exception('Missing mandatory parameter "{}"'.format(opt))
|
||||
for opts, ecfg in zip(self.options, ecfgs):
|
||||
for opt in self.options[opts]:
|
||||
if opt in ecfg:
|
||||
optval=ecfg[opt]
|
||||
#verify parameter type
|
||||
if type(optval) != self.options[opts][opt][0]:
|
||||
raise Exception("Parameter {} has wrong type".format(self.opt))
|
||||
|
||||
#create attributes on the fly
|
||||
setattr(self,opt,optval)
|
||||
else:
|
||||
setattr(self,opt,None)
|
||||
|
||||
#string representation for class data
|
||||
if self.options[opts][opt][1]:
|
||||
raise Exception("Missing mandatory parameter {}".format(self.opt))
|
||||
else:
|
||||
setattr(self,opt,None)
|
||||
|
||||
def __str__(self):
|
||||
return str(yaml.dump(self.ymlcfg, default_flow_style=False))
|
||||
|
||||
|
||||
class Read_PUB_Config(Config):
|
||||
def setAttribute(self):
|
||||
super().setAttribute()
|
||||
|
||||
def __init__(self, inFileName):
|
||||
super().__init__(inFileName)
|
||||
self.options = {
|
||||
self.sectionNames[0]:{
|
||||
"msg_format": (str,True),
|
||||
"MQTTClientNamePub": (str,True),
|
||||
"host": (str,True),
|
||||
"port": (int,True),
|
||||
"keepalive": (int,True),
|
||||
"willTopic":(str,True),
|
||||
"lwt":(str, True),
|
||||
"willRetain":(bool,False),
|
||||
"willTopicQOS":(int,True),
|
||||
"Flight_Information_topicToMqtt": (str,False),
|
||||
"Fly_Formation_topicToMqtt": (str,False),
|
||||
"Fly_Formation_topicToMqtt_QOS":(int,False)},
|
||||
self.sectionNames[1]:{
|
||||
"ROSClientNamePub": (str,False),
|
||||
"ROStopicName_Flight_Information": (str,False),
|
||||
"ROStopicName_Fly_Formation": (str,False)},
|
||||
self.sectionNames[2]:{
|
||||
"logFileName":(str,False)}}
|
||||
self.setAttribute()
|
||||
|
||||
def __str__(self):
|
||||
return str(yaml.dump(self.__dict__,default_flow_style=False))
|
||||
return super().__str__()
|
||||
|
||||
class Read_SUB_Config(Config):
|
||||
def setAttribute(self):
|
||||
super().setAttribute()
|
||||
|
||||
def __init__(self, inFileName):
|
||||
super().__init__(inFileName)
|
||||
self.options = {
|
||||
self.sectionNames[0]:{
|
||||
"msg_format": (str,True),
|
||||
"MQTTClientNameSub": (str,True),
|
||||
"host": (str,True),
|
||||
"port": (int,True),
|
||||
"keepalive": (int,True),
|
||||
"willTopic":(str,True),
|
||||
"lwt":(str, True),
|
||||
"willRetain":(bool,False),
|
||||
"willTopicQOS":(int,True),
|
||||
"Flight_Information_topicToMqtt": (str,False),
|
||||
"Fly_Formation_topicToMqtt": (str,False),
|
||||
"Fly_Formation_topicToMqtt_QOS":(int,False)},
|
||||
self.sectionNames[1]:{
|
||||
"ROSClientNameSub": (str,False),
|
||||
"ROStopicName_Flight_Information": (str,False),
|
||||
"ROStopicName_Fly_Formation": (str,False)},
|
||||
self.sectionNames[2]:{
|
||||
"logFileName":(str,False)}}
|
||||
self.setAttribute()
|
||||
|
||||
def __str__(self):
|
||||
return super().__str__()
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
cfg=MQTT_ROS_Config("mqttConfig.yml")
|
||||
print(cfg)
|
||||
if __name__ == "__main__":
|
||||
cfg=SUB_Config("mqttConfig_PUB.yml")
|
||||
cfg.setAttribute()
|
||||
print(cfg.msg_format)
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue