add mqttCmd

main
RangeOfGlitching 3 years ago
parent 831199bf93
commit 5399e5cf7f

@ -0,0 +1,79 @@
#!/usr/bin/env python3
#coding:utf-8
import paho.mqtt.client as mqtt
import os
import sys
import time
from utils.readConfig import Read_CMD_Config
from utils.basicMqtt import MQTTClient
from utils.protoJson_mqtt_cmd_data_to_ros import Json_msg_to_ros
import logging
from std_msgs.msg import String
import rospy
def init_dataFormat(cfg:Read_CMD_Config):
if cfg.msg_format == "Json":
# Json_msg_to_ros.rate = rospy.Rate(10)
# Json_msg_to_ros.publisher_Flight_Information = rospy.Publisher(cfg.ROStopicName_Cmd_Broadcast_Receiver,String,queue_size=10)
client.on_message = Json_msg_to_ros.on_message
Json_msg_to_ros.Cmd_Broadcast_topicToMqtt = cfg.Cmd_Broadcast_topicToMqtt
elif cfg.msg_format == "Proto":
pass
else:
logging.debug("msg_format not found")
def on_connect(self, userdata, flags, rc):
logger.info("Connected with result code " + str(rc))
client.subscribe(cfg.Cmd_Broadcast_topicToMqtt)
def on_disconnect(client, userdata, rc):
logger.info("disconnecting reason " +str(rc))
client.connected_flag=False
client.disconnect_flag=True
if __name__ == '__main__':
# Read Config
FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_CMD.yml")
cfg = Read_CMD_Config(FilePath)
client = MQTTClient(cfg.MQTTClientNameCmd)
# set log
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(cfg.logFileName)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)
logger = logging.getLogger("__CMD__")
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.info(cfg)
# Mqtt
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive)
client.loop_start()
# Ros
# rospy.init_node(cfg.ROSClientNameCmd)
# init data format
init_dataFormat(cfg)
try:
# rospy.spin()
while True:
time.sleep(0.25)
except BaseException as error:
client.disconnect()
logger.info("End of program")
sys.exit()

@ -99,7 +99,7 @@ if __name__ == '__main__':
file_handler.setLevel(logging.INFO)
logger = logging.getLogger("__PUB__")
logger.setLevel(logging.INFO)
logger.setLevel(logging.CRITICAL)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)

@ -39,12 +39,12 @@ def init_dataFormat(cfg:utils.Read_SUB_Config):
# utils.Json_msg_to_ros.ros_pub(msg)
def on_connect(self, userdata, flags, rc):
logging.info("Connected with result code " + str(rc))
logger.info("Connected with result code " + str(rc))
client.subscribe(cfg.Flight_Information_topicToMqtt)
client.subscribe(cfg.Fly_Formation_topicToMqtt)
def on_disconnect(client, userdata, rc):
# logging.info("disconnecting reason " +str(rc))
# logger.info("disconnecting reason " +str(rc))
client.connected_flag=False
client.disconnect_flag=True

@ -1 +1 @@
{"serialize":{"json":0.3499185360000183,"proto":1.0996155560000034},"deserializ":{"json":0.4229183979999789,"proto":0.5481516849999934}}
{"serialize":{"json":0.34842266000123345,"proto":1.0934251180005958},"deserializ":{"json":0.42545599799996126,"proto":0.48490119200141635}}

@ -0,0 +1,32 @@
#!/usr/bin/env python3
#coding:utf-8
import paho.mqtt.client as mqtt
import os
import sys
import time
import utils
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
import logging
def init_dataFormat(cfg:utils.Read_PUB_Config):
# ros_namespace="/drone1"
if cfg.msg_format == "Proto":
utils.Proto_msg_from_ros.client = client
utils.Proto_msg_from_ros.flight_information_msg = flight_information_pb2.flight_information_message()
utils.Proto_msg_from_ros.fly_formation_msg = flyformatioln_pb2.fly_formation_message()
utils.Proto_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt
utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt
utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS
# rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Proto_msg_from_ros.callBack_gps)
# rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Proto_msg_from_ros.callBack_compass_hdg)
elif cfg.msg_format == "Json":
utils.Json_msg_from_ros.client = client
utils.Json_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt
utils.Json_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt
utils.Json_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS
# rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Json_msg_from_ros.callBack_gps)
# rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Json_msg_from_ros.callBack_compass_hdg)
else:
logging.debug("msg_format not found")

@ -44,8 +44,7 @@ if __name__=='__main__':
json_deserialize_time = timeit.timeit(lambda: json_deserialize(orjson), number = 1000000)
time_dict = {"serialize": {"json": json_serialize_time, "proto":proto_serialize_time}, "deserializ":{"json": json_deserialize_time, "proto":proto_deserialize_time}}
print(f"proto_serialize_time: {proto_serialize_time}")
print(f"json_serialize_time: {json_serialize_time}")
print()
print(f"json_serialize_time: {json_serialize_time}\n")
print(f"proto_deserialize_time: {proto_deserialize_time}")
print(f"json_deserialize_time: {json_deserialize_time}")
fileName = "Data/timeData"

@ -1,7 +1,7 @@
from utils.protoJson_mqtt_pub_data_to_ros import Proto_msg_to_ros
from utils.protoJson_mqtt_pub_data_to_ros import Json_msg_to_ros
from utils.protoJson_mqtt_sub_data_from_ros import Proto_msg_from_ros
from utils.protoJson_mqtt_sub_data_from_ros import Json_msg_from_ros
from utils.protoJson_mqtt_sub_data_from_ros import Json_msg_from_ros
from utils.readConfig import Read_PUB_Config
from utils.readConfig import Read_SUB_Config
from utils.basicMqtt import MQTTClient

@ -0,0 +1,15 @@
MQTT:
msg_format: Json
MQTTClientNameCmd: Drone550Cmd
host: 192.168.50.118
port: 1883
keepalive: 600
# Mqtt topic
Cmd_Broadcast_topicToMqtt: cmd/broadcast
Cmd_Broadcast_topicToMqtt_QOS: 2
#ROS
ROS:
ROSClientNameCmd: Drone550_Cmd
ROStopicName_Cmd_Broadcast_Receiver: cmd_receiver
LOG:
logFileName: cmd.log

@ -0,0 +1,32 @@
import orjson
import time
import logging
# TODO: use native ros type instead of json or str
logger = logging.getLogger("__CMD__")
class Proto_msg_to_ros:
pass
class Json_msg_to_ros:
rate = None
# Ros publisher
publisher_Cmd_Broadcast = None
Cmd_Broadcast_topicToMqtt = None
@classmethod
def ros_pub(cls, dataJson):
if dataJson.topic == cls.Cmd_Broadcast_topicToMqtt:
logger.debug(dataJson.payload.decode("UTF-8"))
# cls.publisher_Cmd_Broadcast.publish(dataJson.payload.decode("UTF-8"))
# cls.rate.sleep()
else:
logger.info("topic not found")
@staticmethod
def on_message(client, userdata, msg):
Json_msg_to_ros.ros_pub(msg)

@ -4,8 +4,10 @@ import time
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
import google.protobuf.json_format as json_format
import logging
# TODO: use native ros type instead of json or str
logger = logging.getLogger("__SUB__")
class Proto_msg_to_ros:
#Protobuf
@ -28,18 +30,18 @@ class Proto_msg_to_ros:
if dataProto.topic == cls.Flight_Information_topicToMqtt:
proto_msg = cls.flight_information_msg.FromString(dataProto.payload)
protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True)
print(protoTOJson_msg)
logger.info(protoTOJson_msg)
# cls.publisher_Flight_Information.publish(protoTOJson_msg)
# cls.rate.sleep()
elif dataProto.topic == cls.Fly_Formation_topicToMqtt:
proto_msg = cls.fly_formation_msg.FromString(dataProto.payload)
protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True)
# protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True, use_integers_for_enums=True)
print(protoTOJson_msg)
logger.info(protoTOJson_msg)
# cls.publisher_Fly_Formation.publish(protoTOJson_msg)
# cls.rate.sleep()
else:
print("topic not found")
logger.info("topic not found")
@staticmethod
@ -59,15 +61,15 @@ class Json_msg_to_ros:
@classmethod
def ros_pub(cls, dataJson):
if dataJson.topic == cls.Flight_Information_topicToMqtt:
print(dataJson.payload.decode("UTF-8"))
logger.info(dataJson.payload.decode("UTF-8"))
# cls.publisher_Flight_Information.publish(dataJson.payload.decode("UTF-8"))
# cls.rate.sleep()
elif dataJson.topic == cls.Fly_Formation_topicToMqtt:
print(dataJson.payload.decode("UTF-8"))
logger.info(dataJson.payload.decode("UTF-8"))
# cls.publisher_Fly_Formation.publish(dataJson.payload.decode("UTF-8"))
# cls.rate.sleep()
else:
print("topic not found")
logger.info("topic not found")
@staticmethod

@ -1,5 +1,8 @@
import orjson
import time
import logging
logger = logging.getLogger("__PUB__")
class Proto_msg_from_ros:
#Protobuf
@ -46,6 +49,7 @@ class Proto_msg_from_ros:
# print(f"just published {message} to topic")
if waitForAck:
while mid not in cls.client.topic_ack:
# TODO: logging
print("wait for ack")
time.sleep(0.25)
cls.client.topic_ack.remove(mid)
@ -75,7 +79,7 @@ class Json_msg_from_ros:
dataImuUpdate = {"gyro_x": gyro_x, "gyro_y": gyro_y,"gyro_z":gyro_z, "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z}
cls.data.update(dataImuUpdate)
# print ('gyro_x:'+gyro_x+'\n'+'gyro_y:'+gyro_y+'\n'+'gyro_z:'+gyro_z +'\n')
print ('accel_x:'+accel_x+'\n'+'accel_y:'+accel_y+'\n'+'accel_z:'+accel_z +'\n')
@classmethod
def callBack_gps(cls, GPS):

@ -9,7 +9,6 @@ class Config:
def setAttribute(self):
with open(self.inFileName,"r") as f:
self.ymlcfg=yaml.safe_load(f)
ecfgs = [self.ymlcfg.get(name) for name in self.sectionNames]
if None in ecfgs:
nameIndex = ecfgs.index(None)
@ -21,13 +20,13 @@ class Config:
optval=ecfg[opt]
#verify parameter type
if type(optval) != self.options[opts][opt][0]:
raise Exception("Parameter {} has wrong type".format(self.opt))
raise Exception("Parameter {} has wrong type".format(opt))
#create attributes on the fly
setattr(self,opt,optval)
else:
if self.options[opts][opt][1]:
raise Exception("Missing mandatory parameter {}".format(self.opt))
raise Exception("Missing mandatory parameter {}".format(opt))
else:
setattr(self,opt,None)
@ -94,14 +93,38 @@ class Read_SUB_Config(Config):
"logFileName":(str,False)}}
self.setAttribute()
class Read_CMD_Config(Config):
def setAttribute(self):
super().setAttribute()
def __init__(self, inFileName):
super().__init__(inFileName)
self.options = {
self.sectionNames[0]:{
"msg_format": (str,True),
"MQTTClientNameCmd": (str,True),
"host": (str,True),
"port": (int,True),
"keepalive": (int,True),
"Cmd_Broadcast_topicToMqtt": (str,True),
"Cmd_Direct_topicToMqtt": (str,False),
"Cmd_Broadcast_topicToMqtt_QOS":(int,True),
"Cmd_Direct_topicToMqtt_QOS":(int,False)},
self.sectionNames[1]:{
"ROSClientNameCmd": (str,True),
"ROStopicName_Cmd_Broadcast_Receiver": (str,True),
"ROStopicName_Cmd_Direct_Receiver": (str,False)},
self.sectionNames[2]:{
"logFileName":(str,False)}}
self.setAttribute()
def __str__(self):
return super().__str__()
if __name__ == "__main__":
cfg=SUB_Config("mqttConfig_PUB.yml")
cfg.setAttribute()
print(cfg.msg_format)
cfg=Read_CMD_Config("mqttConfig_CMD.yml")
print(cfg)

Loading…
Cancel
Save