From 5399e5cf7f689d9b5dd2205fc1a5c1d1c46cf391 Mon Sep 17 00:00:00 2001 From: RangeOfGlitching Date: Fri, 10 Feb 2023 03:12:33 +0800 Subject: [PATCH] add mqttCmd --- Stream/uav_proto_msg/mqttCmdMain.py | 79 +++++++++++++++++++ Stream/uav_proto_msg/mqttPubMain.py | 2 +- Stream/uav_proto_msg/mqttSubMain.py | 4 +- Stream/uav_proto_msg/protomsg/Data/timeData | 2 +- Stream/uav_proto_msg/protomsg/rtl_test.py | 32 ++++++++ Stream/uav_proto_msg/protomsg/timeit_test.py | 3 +- Stream/uav_proto_msg/utils/__init__.py | 2 +- Stream/uav_proto_msg/utils/mqttConfig_CMD.yml | 15 ++++ .../utils/protoJson_mqtt_cmd_data_to_ros.py | 32 ++++++++ .../utils/protoJson_mqtt_pub_data_to_ros.py | 14 ++-- .../utils/protoJson_mqtt_sub_data_from_ros.py | 6 +- Stream/uav_proto_msg/utils/readConfig.py | 35 ++++++-- 12 files changed, 206 insertions(+), 20 deletions(-) create mode 100644 Stream/uav_proto_msg/mqttCmdMain.py create mode 100644 Stream/uav_proto_msg/protomsg/rtl_test.py create mode 100644 Stream/uav_proto_msg/utils/mqttConfig_CMD.yml create mode 100644 Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py diff --git a/Stream/uav_proto_msg/mqttCmdMain.py b/Stream/uav_proto_msg/mqttCmdMain.py new file mode 100644 index 0000000..22ce926 --- /dev/null +++ b/Stream/uav_proto_msg/mqttCmdMain.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +#coding:utf-8 +import paho.mqtt.client as mqtt +import os +import sys +import time +from utils.readConfig import Read_CMD_Config +from utils.basicMqtt import MQTTClient +from utils.protoJson_mqtt_cmd_data_to_ros import Json_msg_to_ros +import logging + +from std_msgs.msg import String +import rospy + +def init_dataFormat(cfg:Read_CMD_Config): + if cfg.msg_format == "Json": + # Json_msg_to_ros.rate = rospy.Rate(10) + # Json_msg_to_ros.publisher_Flight_Information = rospy.Publisher(cfg.ROStopicName_Cmd_Broadcast_Receiver,String,queue_size=10) + client.on_message = Json_msg_to_ros.on_message + Json_msg_to_ros.Cmd_Broadcast_topicToMqtt = cfg.Cmd_Broadcast_topicToMqtt + elif cfg.msg_format == "Proto": + pass + else: + logging.debug("msg_format not found") + +def on_connect(self, userdata, flags, rc): + logger.info("Connected with result code " + str(rc)) + client.subscribe(cfg.Cmd_Broadcast_topicToMqtt) + +def on_disconnect(client, userdata, rc): + logger.info("disconnecting reason " +str(rc)) + client.connected_flag=False + client.disconnect_flag=True + +if __name__ == '__main__': + # Read Config + FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_CMD.yml") + cfg = Read_CMD_Config(FilePath) + client = MQTTClient(cfg.MQTTClientNameCmd) + + # set log + log_format = "%(asctime)s - %(levelname)s - %(message)s" + formatter = logging.Formatter(log_format) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + stream_handler.setLevel(logging.DEBUG) + + file_handler = logging.FileHandler(cfg.logFileName) + file_handler.setFormatter(formatter) + file_handler.setLevel(logging.INFO) + + logger = logging.getLogger("__CMD__") + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + logger.info(cfg) + + # Mqtt + client.on_connect = on_connect + client.on_disconnect = on_disconnect + client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive) + client.loop_start() + + # Ros + # rospy.init_node(cfg.ROSClientNameCmd) + + # init data format + init_dataFormat(cfg) + + try: + # rospy.spin() + while True: + time.sleep(0.25) + except BaseException as error: + client.disconnect() + logger.info("End of program") + sys.exit() + \ No newline at end of file diff --git a/Stream/uav_proto_msg/mqttPubMain.py b/Stream/uav_proto_msg/mqttPubMain.py index 04a40b6..2729d37 100644 --- a/Stream/uav_proto_msg/mqttPubMain.py +++ b/Stream/uav_proto_msg/mqttPubMain.py @@ -99,7 +99,7 @@ if __name__ == '__main__': file_handler.setLevel(logging.INFO) logger = logging.getLogger("__PUB__") - logger.setLevel(logging.INFO) + logger.setLevel(logging.CRITICAL) logger.addHandler(file_handler) logger.addHandler(stream_handler) diff --git a/Stream/uav_proto_msg/mqttSubMain.py b/Stream/uav_proto_msg/mqttSubMain.py index 03b8778..d065601 100644 --- a/Stream/uav_proto_msg/mqttSubMain.py +++ b/Stream/uav_proto_msg/mqttSubMain.py @@ -39,12 +39,12 @@ def init_dataFormat(cfg:utils.Read_SUB_Config): # utils.Json_msg_to_ros.ros_pub(msg) def on_connect(self, userdata, flags, rc): - logging.info("Connected with result code " + str(rc)) + logger.info("Connected with result code " + str(rc)) client.subscribe(cfg.Flight_Information_topicToMqtt) client.subscribe(cfg.Fly_Formation_topicToMqtt) def on_disconnect(client, userdata, rc): - # logging.info("disconnecting reason " +str(rc)) + # logger.info("disconnecting reason " +str(rc)) client.connected_flag=False client.disconnect_flag=True diff --git a/Stream/uav_proto_msg/protomsg/Data/timeData b/Stream/uav_proto_msg/protomsg/Data/timeData index c1e2610..f87b524 100644 --- a/Stream/uav_proto_msg/protomsg/Data/timeData +++ b/Stream/uav_proto_msg/protomsg/Data/timeData @@ -1 +1 @@ -{"serialize":{"json":0.3499185360000183,"proto":1.0996155560000034},"deserializ":{"json":0.4229183979999789,"proto":0.5481516849999934}} \ No newline at end of file +{"serialize":{"json":0.34842266000123345,"proto":1.0934251180005958},"deserializ":{"json":0.42545599799996126,"proto":0.48490119200141635}} \ No newline at end of file diff --git a/Stream/uav_proto_msg/protomsg/rtl_test.py b/Stream/uav_proto_msg/protomsg/rtl_test.py new file mode 100644 index 0000000..d53b170 --- /dev/null +++ b/Stream/uav_proto_msg/protomsg/rtl_test.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 +#coding:utf-8 +import paho.mqtt.client as mqtt +import os +import sys +import time +import utils +import proto.flight_information_pb2 as flight_information_pb2 +import proto.flyformatioln_pb2 as flyformatioln_pb2 +import logging + + +def init_dataFormat(cfg:utils.Read_PUB_Config): + # ros_namespace="/drone1" + if cfg.msg_format == "Proto": + utils.Proto_msg_from_ros.client = client + utils.Proto_msg_from_ros.flight_information_msg = flight_information_pb2.flight_information_message() + utils.Proto_msg_from_ros.fly_formation_msg = flyformatioln_pb2.fly_formation_message() + utils.Proto_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt + utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt + utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS + # rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Proto_msg_from_ros.callBack_gps) + # rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Proto_msg_from_ros.callBack_compass_hdg) + elif cfg.msg_format == "Json": + utils.Json_msg_from_ros.client = client + utils.Json_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt + utils.Json_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt + utils.Json_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS + # rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Json_msg_from_ros.callBack_gps) + # rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Json_msg_from_ros.callBack_compass_hdg) + else: + logging.debug("msg_format not found") \ No newline at end of file diff --git a/Stream/uav_proto_msg/protomsg/timeit_test.py b/Stream/uav_proto_msg/protomsg/timeit_test.py index 6616fbd..6231f9f 100644 --- a/Stream/uav_proto_msg/protomsg/timeit_test.py +++ b/Stream/uav_proto_msg/protomsg/timeit_test.py @@ -44,8 +44,7 @@ if __name__=='__main__': json_deserialize_time = timeit.timeit(lambda: json_deserialize(orjson), number = 1000000) time_dict = {"serialize": {"json": json_serialize_time, "proto":proto_serialize_time}, "deserializ":{"json": json_deserialize_time, "proto":proto_deserialize_time}} print(f"proto_serialize_time: {proto_serialize_time}") - print(f"json_serialize_time: {json_serialize_time}") - print() + print(f"json_serialize_time: {json_serialize_time}\n") print(f"proto_deserialize_time: {proto_deserialize_time}") print(f"json_deserialize_time: {json_deserialize_time}") fileName = "Data/timeData" diff --git a/Stream/uav_proto_msg/utils/__init__.py b/Stream/uav_proto_msg/utils/__init__.py index b2ccc77..1128946 100644 --- a/Stream/uav_proto_msg/utils/__init__.py +++ b/Stream/uav_proto_msg/utils/__init__.py @@ -1,7 +1,7 @@ from utils.protoJson_mqtt_pub_data_to_ros import Proto_msg_to_ros from utils.protoJson_mqtt_pub_data_to_ros import Json_msg_to_ros from utils.protoJson_mqtt_sub_data_from_ros import Proto_msg_from_ros -from utils.protoJson_mqtt_sub_data_from_ros import Json_msg_from_ros +from utils.protoJson_mqtt_sub_data_from_ros import Json_msg_from_ros from utils.readConfig import Read_PUB_Config from utils.readConfig import Read_SUB_Config from utils.basicMqtt import MQTTClient \ No newline at end of file diff --git a/Stream/uav_proto_msg/utils/mqttConfig_CMD.yml b/Stream/uav_proto_msg/utils/mqttConfig_CMD.yml new file mode 100644 index 0000000..4bc2e12 --- /dev/null +++ b/Stream/uav_proto_msg/utils/mqttConfig_CMD.yml @@ -0,0 +1,15 @@ +MQTT: + msg_format: Json + MQTTClientNameCmd: Drone550Cmd + host: 192.168.50.118 + port: 1883 + keepalive: 600 + # Mqtt topic + Cmd_Broadcast_topicToMqtt: cmd/broadcast + Cmd_Broadcast_topicToMqtt_QOS: 2 + #ROS +ROS: + ROSClientNameCmd: Drone550_Cmd + ROStopicName_Cmd_Broadcast_Receiver: cmd_receiver +LOG: + logFileName: cmd.log diff --git a/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py b/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py new file mode 100644 index 0000000..9088fe5 --- /dev/null +++ b/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py @@ -0,0 +1,32 @@ +import orjson +import time + +import logging + +# TODO: use native ros type instead of json or str +logger = logging.getLogger("__CMD__") + +class Proto_msg_to_ros: + pass + +class Json_msg_to_ros: + rate = None + # Ros publisher + publisher_Cmd_Broadcast = None + + Cmd_Broadcast_topicToMqtt = None + + + @classmethod + def ros_pub(cls, dataJson): + if dataJson.topic == cls.Cmd_Broadcast_topicToMqtt: + logger.debug(dataJson.payload.decode("UTF-8")) + # cls.publisher_Cmd_Broadcast.publish(dataJson.payload.decode("UTF-8")) + # cls.rate.sleep() + else: + logger.info("topic not found") + + + @staticmethod + def on_message(client, userdata, msg): + Json_msg_to_ros.ros_pub(msg) \ No newline at end of file diff --git a/Stream/uav_proto_msg/utils/protoJson_mqtt_pub_data_to_ros.py b/Stream/uav_proto_msg/utils/protoJson_mqtt_pub_data_to_ros.py index 28f1046..6ed5970 100644 --- a/Stream/uav_proto_msg/utils/protoJson_mqtt_pub_data_to_ros.py +++ b/Stream/uav_proto_msg/utils/protoJson_mqtt_pub_data_to_ros.py @@ -4,8 +4,10 @@ import time import proto.flight_information_pb2 as flight_information_pb2 import proto.flyformatioln_pb2 as flyformatioln_pb2 import google.protobuf.json_format as json_format +import logging # TODO: use native ros type instead of json or str +logger = logging.getLogger("__SUB__") class Proto_msg_to_ros: #Protobuf @@ -28,18 +30,18 @@ class Proto_msg_to_ros: if dataProto.topic == cls.Flight_Information_topicToMqtt: proto_msg = cls.flight_information_msg.FromString(dataProto.payload) protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True) - print(protoTOJson_msg) + logger.info(protoTOJson_msg) # cls.publisher_Flight_Information.publish(protoTOJson_msg) # cls.rate.sleep() elif dataProto.topic == cls.Fly_Formation_topicToMqtt: proto_msg = cls.fly_formation_msg.FromString(dataProto.payload) protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True) # protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True, use_integers_for_enums=True) - print(protoTOJson_msg) + logger.info(protoTOJson_msg) # cls.publisher_Fly_Formation.publish(protoTOJson_msg) # cls.rate.sleep() else: - print("topic not found") + logger.info("topic not found") @staticmethod @@ -59,15 +61,15 @@ class Json_msg_to_ros: @classmethod def ros_pub(cls, dataJson): if dataJson.topic == cls.Flight_Information_topicToMqtt: - print(dataJson.payload.decode("UTF-8")) + logger.info(dataJson.payload.decode("UTF-8")) # cls.publisher_Flight_Information.publish(dataJson.payload.decode("UTF-8")) # cls.rate.sleep() elif dataJson.topic == cls.Fly_Formation_topicToMqtt: - print(dataJson.payload.decode("UTF-8")) + logger.info(dataJson.payload.decode("UTF-8")) # cls.publisher_Fly_Formation.publish(dataJson.payload.decode("UTF-8")) # cls.rate.sleep() else: - print("topic not found") + logger.info("topic not found") @staticmethod diff --git a/Stream/uav_proto_msg/utils/protoJson_mqtt_sub_data_from_ros.py b/Stream/uav_proto_msg/utils/protoJson_mqtt_sub_data_from_ros.py index f373034..0db73a2 100644 --- a/Stream/uav_proto_msg/utils/protoJson_mqtt_sub_data_from_ros.py +++ b/Stream/uav_proto_msg/utils/protoJson_mqtt_sub_data_from_ros.py @@ -1,5 +1,8 @@ import orjson import time +import logging + +logger = logging.getLogger("__PUB__") class Proto_msg_from_ros: #Protobuf @@ -46,6 +49,7 @@ class Proto_msg_from_ros: # print(f"just published {message} to topic") if waitForAck: while mid not in cls.client.topic_ack: + # TODO: logging print("wait for ack") time.sleep(0.25) cls.client.topic_ack.remove(mid) @@ -75,7 +79,7 @@ class Json_msg_from_ros: dataImuUpdate = {"gyro_x": gyro_x, "gyro_y": gyro_y,"gyro_z":gyro_z, "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z} cls.data.update(dataImuUpdate) # print ('gyro_x:'+gyro_x+'\n'+'gyro_y:'+gyro_y+'\n'+'gyro_z:'+gyro_z +'\n') - print ('accel_x:'+accel_x+'\n'+'accel_y:'+accel_y+'\n'+'accel_z:'+accel_z +'\n') + @classmethod def callBack_gps(cls, GPS): diff --git a/Stream/uav_proto_msg/utils/readConfig.py b/Stream/uav_proto_msg/utils/readConfig.py index 683b5e3..539b3a3 100644 --- a/Stream/uav_proto_msg/utils/readConfig.py +++ b/Stream/uav_proto_msg/utils/readConfig.py @@ -9,7 +9,6 @@ class Config: def setAttribute(self): with open(self.inFileName,"r") as f: self.ymlcfg=yaml.safe_load(f) - ecfgs = [self.ymlcfg.get(name) for name in self.sectionNames] if None in ecfgs: nameIndex = ecfgs.index(None) @@ -21,13 +20,13 @@ class Config: optval=ecfg[opt] #verify parameter type if type(optval) != self.options[opts][opt][0]: - raise Exception("Parameter {} has wrong type".format(self.opt)) + raise Exception("Parameter {} has wrong type".format(opt)) #create attributes on the fly setattr(self,opt,optval) else: if self.options[opts][opt][1]: - raise Exception("Missing mandatory parameter {}".format(self.opt)) + raise Exception("Missing mandatory parameter {}".format(opt)) else: setattr(self,opt,None) @@ -94,14 +93,38 @@ class Read_SUB_Config(Config): "logFileName":(str,False)}} self.setAttribute() +class Read_CMD_Config(Config): + def setAttribute(self): + super().setAttribute() + + def __init__(self, inFileName): + super().__init__(inFileName) + self.options = { + self.sectionNames[0]:{ + "msg_format": (str,True), + "MQTTClientNameCmd": (str,True), + "host": (str,True), + "port": (int,True), + "keepalive": (int,True), + "Cmd_Broadcast_topicToMqtt": (str,True), + "Cmd_Direct_topicToMqtt": (str,False), + "Cmd_Broadcast_topicToMqtt_QOS":(int,True), + "Cmd_Direct_topicToMqtt_QOS":(int,False)}, + self.sectionNames[1]:{ + "ROSClientNameCmd": (str,True), + "ROStopicName_Cmd_Broadcast_Receiver": (str,True), + "ROStopicName_Cmd_Direct_Receiver": (str,False)}, + self.sectionNames[2]:{ + "logFileName":(str,False)}} + self.setAttribute() + def __str__(self): return super().__str__() if __name__ == "__main__": - cfg=SUB_Config("mqttConfig_PUB.yml") - cfg.setAttribute() - print(cfg.msg_format) + cfg=Read_CMD_Config("mqttConfig_CMD.yml") + print(cfg)