diff --git a/.gitignore b/.gitignore index fcabc20..194c992 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ __pycache__/ -*pyc \ No newline at end of file +*pyc +*.log +*.png \ No newline at end of file diff --git a/Stream/measure_rtl/allinone.bash b/Stream/measure_rtl/allinone.bash new file mode 100755 index 0000000..97ffb9b --- /dev/null +++ b/Stream/measure_rtl/allinone.bash @@ -0,0 +1,11 @@ +#!/bin/bash +source /home/tony/anaconda3/etc/profile.d/conda.sh +conda activate testFeature +python delayClientA_SUB.py & +p1=$! +python delayClientB.py & +p2=$! +python delayClientA_PUB.py & +p3=$! +wait $p3 +killall python diff --git a/Stream/measure_rtl/delayClientA_PUB.py b/Stream/measure_rtl/delayClientA_PUB.py new file mode 100644 index 0000000..26e4351 --- /dev/null +++ b/Stream/measure_rtl/delayClientA_PUB.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +#coding:utf-8 +import paho.mqtt.client as mqtt +import os +import sys +import time +import utils + +import proto.flyformatioln_pb2 as flyformatioln_pb2 +import logging +from utils.protoJson_delayClientA_PUB import Json_msg +from utils.protoJson_delayClientA_PUB import Proto_msg + + + +def init_dataFormat(cfg:utils.Read_delayA_PUB_Config): + global pubFun + if cfg.msg_format == "Proto": + Proto_msg.client = client + Proto_msg.qos = cfg.qos + Proto_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB + cfg.logFileName = "Proto" + cfg.logFileName + pubFun = Proto_msg.callBack_gps + elif cfg.msg_format == "Json": + Json_msg.client = client + Json_msg.qos = cfg.qos + Json_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB + cfg.logFileName = "Json" + cfg.logFileName + pubFun = Json_msg.callBack_gps + else: + logging.debug("msg_format not found") + + +def on_connect(self, userdata, flags, rc): + logger.debug("Connected with result code " + str(rc)) + +def on_publish(self, userdata, mid): + # logger.debug("pub success") + pass + + +if __name__ == '__main__': + time.sleep(0.25) + # Read Config + FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_delayA_PUB.yml") + cfg = utils.Read_delayA_PUB_Config(FilePath) + # Mqtt + client = utils.MQTTClient(cfg.MQTTClientNamePub) + pubFun = None + + + init_dataFormat(cfg) + + # set log + log_format = "%(asctime)s - %(levelname)s - %(message)s" + file_format = "%(message)s" + file_formatter = logging.Formatter(file_format) + formatter = logging.Formatter(log_format) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + stream_handler.setLevel(logging.DEBUG) + + + file_handler = logging.FileHandler(cfg.logFileName, mode="w") + file_handler.setFormatter(file_formatter) + file_handler.setLevel(logging.INFO) + + logger = logging.getLogger("__PUB__") + logger.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + logger.debug(cfg) + # Mqtt + client.on_connect = on_connect + client.on_publish = on_publish + + client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive) + client.loop_start() + + for i in range(cfg.msgAmount): + try: + pubFun() + time.sleep(cfg.msgInterval) + except KeyboardInterrupt as e: + client.disconnect() + logger.debug("Not elegeant of program") + sys.exit() + + client.disconnect() + logger.debug(cfg) + logger.debug("End of program") + sys.exit() \ No newline at end of file diff --git a/Stream/measure_rtl/delayClientA_SUB.py b/Stream/measure_rtl/delayClientA_SUB.py new file mode 100644 index 0000000..6df1c62 --- /dev/null +++ b/Stream/measure_rtl/delayClientA_SUB.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +#coding:utf-8 +import paho.mqtt.client as mqtt +import os +import sys +import time +import proto.flight_information_pb2 as flight_information_pb2 +import proto.flyformatioln_pb2 as flyformatioln_pb2 +from std_msgs.msg import String +import logging +import utils +from utils.protoJson_delayClientA_SUB import Json_msg +from utils.protoJson_delayClientA_SUB import Proto_msg + + +def init_dataFormat(cfg:utils.Read_delayB_SUB_Config): + if cfg.msg_format == "Proto": + Proto_msg.flight_information_msg = flight_information_pb2.flight_information_message() + client.on_message = Proto_msg.on_message + cfg.logFileName = "Proto" + cfg.logFileName + elif cfg.msg_format == "Json": + client.on_message = Json_msg.on_message + cfg.logFileName = "Json" + cfg.logFileName + else: + logger.debug("msg_format not found") + +def on_connect(self, userdata, flags, rc): + logger.debug("Connected with result code " + str(rc)) + client.subscribe(cfg.Respond_topicToMqtt_SUB, qos=cfg.qos) + +def on_disconnect(client, userdata, rc): + logger.debug("disconnecting reason " +str(rc)) + client.connected_flag=False + client.disconnect_flag=True + +def on_publish(self, userdata, mid): + pass + +if __name__ == '__main__': + # Read Config + FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_delayA_SUB.yml") + cfg = utils.Read_delayA_SUB_Config(FilePath) + client = utils.MQTTClient(cfg.MQTTClientNameSub) + + + init_dataFormat(cfg) + # set log + log_format = "%(asctime)s - %(levelname)s - %(message)s" + file_foramt = "%(message)s" + + formatter = logging.Formatter(log_format) + file_formatter = logging.Formatter(file_foramt) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + stream_handler.setLevel(logging.DEBUG) + + + file_handler = logging.FileHandler(cfg.logFileName, mode="w") + file_handler.setFormatter(file_formatter) + file_handler.setLevel(logging.INFO) + + logger = logging.getLogger("__delayA_SUB__") + logger.setLevel(logging.INFO) + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + logger.debug(cfg) + + # Mqtt + client.on_connect = on_connect + client.on_publish = on_publish + client.on_disconnect = on_disconnect + client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive) + + + try: + client.loop_forever() + except KeyboardInterrupt as e: + client.loop_stop() + client.disconnect() + logger.debug("Not elegeant end of program") + sys.exit() + client.loop_stop() + client.disconnect() + logger.debug("End of program") + sys.exit() + diff --git a/Stream/measure_rtl/delayClientB.py b/Stream/measure_rtl/delayClientB.py new file mode 100644 index 0000000..f95afdb --- /dev/null +++ b/Stream/measure_rtl/delayClientB.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +#coding:utf-8 +import paho.mqtt.client as mqtt +import os +import sys +import time +import utils +import proto.flight_information_pb2 as flight_information_pb2 +import proto.flyformatioln_pb2 as flyformatioln_pb2 +import rospy +from std_msgs.msg import String +import logging +from utils.protoJson_delayClientB_SUB import Json_msg +from utils.protoJson_delayClientB_SUB import Proto_msg + +def init_dataFormat(cfg:utils.Read_delayB_SUB_Config): + if cfg.msg_format == "Proto": + Proto_msg.qos = cfg.qos + Proto_msg.flight_information_msg = flight_information_pb2.flight_information_message() + Proto_msg.fly_formation_msg = flyformatioln_pb2.fly_formation_message() + Proto_msg.client = client + client.on_message = Proto_msg.on_message + cfg.logFileName = "Proto" + cfg.logFileName + + Proto_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB + elif cfg.msg_format == "Json": + client.on_message = Json_msg.on_message + Json_msg.qos = cfg.qos + Json_msg.client = client + Json_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB + cfg.logFileName = "Json" + cfg.logFileName + else: + logger.debug("msg_format not found") + + +def on_connect(self, userdata, flags, rc): + logger.debug("Connected with result code " + str(rc)) + client.subscribe(cfg.Delay_topicToMqtt_SUB, qos=cfg.qos) + +def on_disconnect(client, userdata, rc): + # logger.info("disconnecting reason " +str(rc)) + client.connected_flag=False + client.disconnect_flag=True + +def on_publish(self, userdata, mid): + pass + + +if __name__ == '__main__': + # Read Config + FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_delayB_SUB.yml") + cfg = utils.Read_delayB_SUB_Config(FilePath) + client = utils.MQTTClient(cfg.MQTTClientNameSub) + + # Mqtt + client = utils.MQTTClient(cfg.MQTTClientNameSub) + client.on_connect = on_connect + client.on_publish = on_publish + client.on_disconnect = on_disconnect + client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive) + + + # initialize + init_dataFormat(cfg) + # set log + log_format = "%(asctime)s - %(levelname)s - %(message)s" + file_foramt = "%(message)s" + + formatter = logging.Formatter(log_format) + file_formatter = logging.Formatter(file_foramt) + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + stream_handler.setLevel(logging.DEBUG) + + + file_handler = logging.FileHandler(cfg.logFileName, mode="w") + file_handler.setFormatter(file_formatter) + file_handler.setLevel(logging.INFO) + + logger = logging.getLogger("__SUB__") + logger.setLevel(logging.INFO) + logger.addHandler(file_handler) + logger.addHandler(stream_handler) + + logger.debug(cfg) + + try: + client.loop_forever() + # rospy.spin() + except KeyboardInterrupt as e: + client.loop_stop() + client.disconnect() + logger.debug("Not elegeant end of program") + sys.exit() + client.loop_stop() + client.disconnect() + logger.debug("End of program") + sys.exit() + diff --git a/Stream/measure_rtl/proto/duration_pb2.py b/Stream/measure_rtl/proto/duration_pb2.py new file mode 100644 index 0000000..1a30c51 --- /dev/null +++ b/Stream/measure_rtl/proto/duration_pb2.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: duration.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='duration.proto', + package='google.protobuf', + syntax='proto3', + serialized_options=b'\n\023com.google.protobufB\rDurationProtoP\001Z1google.golang.org/protobuf/types/known/durationpb\370\001\001\242\002\003GPB\252\002\036Google.Protobuf.WellKnownTypes', + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x0e\x64uration.proto\x12\x0fgoogle.protobuf\"*\n\x08\x44uration\x12\x0f\n\x07seconds\x18\x01 \x01(\x03\x12\r\n\x05nanos\x18\x02 \x01(\x05\x42\x83\x01\n\x13\x63om.google.protobufB\rDurationProtoP\x01Z1google.golang.org/protobuf/types/known/durationpb\xf8\x01\x01\xa2\x02\x03GPB\xaa\x02\x1eGoogle.Protobuf.WellKnownTypesb\x06proto3' +) + + + + +_DURATION = _descriptor.Descriptor( + name='Duration', + full_name='google.protobuf.Duration', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='seconds', full_name='google.protobuf.Duration.seconds', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='nanos', full_name='google.protobuf.Duration.nanos', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=35, + serialized_end=77, +) + +DESCRIPTOR.message_types_by_name['Duration'] = _DURATION +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Duration = _reflection.GeneratedProtocolMessageType('Duration', (_message.Message,), { + 'DESCRIPTOR' : _DURATION, + '__module__' : 'duration_pb2' + # @@protoc_insertion_point(class_scope:google.protobuf.Duration) + }) +_sym_db.RegisterMessage(Duration) + + +DESCRIPTOR._options = None +# @@protoc_insertion_point(module_scope) diff --git a/Stream/measure_rtl/proto/flight_information.proto b/Stream/measure_rtl/proto/flight_information.proto new file mode 100644 index 0000000..620a8c8 --- /dev/null +++ b/Stream/measure_rtl/proto/flight_information.proto @@ -0,0 +1,14 @@ +syntax = 'proto3'; +package flight_information_proto; +// GPS + compass + +message GPS { + optional float LAT = 1; + optional float LON = 2; + optional float ALT = 3; +} + +message flight_information_message { + GPS gps = 1; + optional float heading = 2; +} \ No newline at end of file diff --git a/Stream/measure_rtl/proto/flight_information_pb2.py b/Stream/measure_rtl/proto/flight_information_pb2.py new file mode 100644 index 0000000..d35aec8 --- /dev/null +++ b/Stream/measure_rtl/proto/flight_information_pb2.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: flight_information.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66light_information.proto\x12\x18\x66light_information_proto\"S\n\x03GPS\x12\x10\n\x03LAT\x18\x01 \x01(\x02H\x00\x88\x01\x01\x12\x10\n\x03LON\x18\x02 \x01(\x02H\x01\x88\x01\x01\x12\x10\n\x03\x41LT\x18\x03 \x01(\x02H\x02\x88\x01\x01\x42\x06\n\x04_LATB\x06\n\x04_LONB\x06\n\x04_ALT\"j\n\x1a\x66light_information_message\x12*\n\x03gps\x18\x01 \x01(\x0b\x32\x1d.flight_information_proto.GPS\x12\x14\n\x07heading\x18\x02 \x01(\x02H\x00\x88\x01\x01\x42\n\n\x08_headingb\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flight_information_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _GPS._serialized_start=54 + _GPS._serialized_end=137 + _FLIGHT_INFORMATION_MESSAGE._serialized_start=139 + _FLIGHT_INFORMATION_MESSAGE._serialized_end=245 +# @@protoc_insertion_point(module_scope) diff --git a/Stream/measure_rtl/proto/flyformatioln.proto b/Stream/measure_rtl/proto/flyformatioln.proto new file mode 100644 index 0000000..56a6d58 --- /dev/null +++ b/Stream/measure_rtl/proto/flyformatioln.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; +package fly_formation_proto; +// leader only + +enum FLY_FORMATION{ + FLY_FORMATION_UNSPECIFIED = 0; + FLY_FORMATION_v = 1; + FLY_FORMATION_X = 2; + FLY_FORMATION_O = 3; + FLY_FORMATION_LINE = 4; + FLY_FORMATION_ROW = 5; + FLY_FORMATION_HEX = 6; + } + message fly_formation_message{ + optional float velocity = 1; + FLY_FORMATION fly_formation= 2; + } \ No newline at end of file diff --git a/Stream/measure_rtl/proto/flyformatioln_pb2.py b/Stream/measure_rtl/proto/flyformatioln_pb2.py new file mode 100644 index 0000000..34014f4 --- /dev/null +++ b/Stream/measure_rtl/proto/flyformatioln_pb2.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: flyformatioln.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x66lyformatioln.proto\x12\x13\x66ly_formation_proto\"v\n\x15\x66ly_formation_message\x12\x15\n\x08velocity\x18\x01 \x01(\x02H\x00\x88\x01\x01\x12\x39\n\rfly_formation\x18\x02 \x01(\x0e\x32\".fly_formation_proto.FLY_FORMATIONB\x0b\n\t_velocity*\xb3\x01\n\rFLY_FORMATION\x12\x1d\n\x19\x46LY_FORMATION_UNSPECIFIED\x10\x00\x12\x13\n\x0f\x46LY_FORMATION_v\x10\x01\x12\x13\n\x0f\x46LY_FORMATION_X\x10\x02\x12\x13\n\x0f\x46LY_FORMATION_O\x10\x03\x12\x16\n\x12\x46LY_FORMATION_LINE\x10\x04\x12\x15\n\x11\x46LY_FORMATION_ROW\x10\x05\x12\x15\n\x11\x46LY_FORMATION_HEX\x10\x06\x62\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flyformatioln_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _FLY_FORMATION._serialized_start=165 + _FLY_FORMATION._serialized_end=344 + _FLY_FORMATION_MESSAGE._serialized_start=44 + _FLY_FORMATION_MESSAGE._serialized_end=162 +# @@protoc_insertion_point(module_scope) diff --git a/Stream/measure_rtl/utils/__init__.py b/Stream/measure_rtl/utils/__init__.py new file mode 100644 index 0000000..259b419 --- /dev/null +++ b/Stream/measure_rtl/utils/__init__.py @@ -0,0 +1,6 @@ +from utils.readConfig import Read_delayA_PUB_Config +from utils.readConfig import Read_delayB_SUB_Config +from utils.readConfig import Read_delayA_SUB_Config +from utils.basicMqtt import MQTTClient + + diff --git a/Stream/measure_rtl/utils/basicMqtt.py b/Stream/measure_rtl/utils/basicMqtt.py new file mode 100644 index 0000000..572e84e --- /dev/null +++ b/Stream/measure_rtl/utils/basicMqtt.py @@ -0,0 +1,17 @@ +import paho.mqtt.client as mqtt +import time +class MQTTClient(mqtt.Client): + + def __init__(self,cname,**kwargs): + super().__init__(cname,**kwargs) + self.last_pub_time=time.time() + self.topic_ack=[] + self.run_flag=True + self.subscribe_flag=False + self.bad_connection_flag=False + self.connected_flag=True + self.disconnect_flag=False + self.disconnect_time=0.0 + self.pub_msg_count=0 + self.devices=[] + diff --git a/Stream/measure_rtl/utils/mqttConfig_delayA_PUB.yml b/Stream/measure_rtl/utils/mqttConfig_delayA_PUB.yml new file mode 100644 index 0000000..19586f5 --- /dev/null +++ b/Stream/measure_rtl/utils/mqttConfig_delayA_PUB.yml @@ -0,0 +1,15 @@ +MQTT: + msg_format: Proto + # msg_format: Json + qos: 1 + MQTTClientNamePub: delayA_PUB + host: 192.168.50.117 + port: 1883 + keepalive: 60 + msgAmount: 1000 + msgInterval: 0.01 + # Mqtt topic + Delay_topicToMqtt_PUB: delayA_PUB/delay + #ROS +LOG: + logFileName: delayA_PUB.log diff --git a/Stream/measure_rtl/utils/mqttConfig_delayA_SUB.yml b/Stream/measure_rtl/utils/mqttConfig_delayA_SUB.yml new file mode 100644 index 0000000..4872643 --- /dev/null +++ b/Stream/measure_rtl/utils/mqttConfig_delayA_SUB.yml @@ -0,0 +1,12 @@ +MQTT: + msg_format: Proto + # msg_format: Json + qos: 1 + MQTTClientNameSub: delayA_SUB + host: 192.168.50.117 + port: 1883 + keepalive: 60 + # Mqtt topic + Respond_topicToMqtt_SUB: delayB/respond +LOG: + logFileName: delayA_SUB.log diff --git a/Stream/measure_rtl/utils/mqttConfig_delayB_SUB.yml b/Stream/measure_rtl/utils/mqttConfig_delayB_SUB.yml new file mode 100644 index 0000000..bb00d06 --- /dev/null +++ b/Stream/measure_rtl/utils/mqttConfig_delayB_SUB.yml @@ -0,0 +1,13 @@ +MQTT: + msg_format: Proto + # msg_format: Json + qos: 1 + MQTTClientNameSub: delayB_SUB + host: 192.168.50.117 + port: 1883 + keepalive: 60 + # Mqtt topic + Delay_topicToMqtt_SUB: delayA_PUB/delay + Delay_topicToMqtt_PUB: delayB/respond +LOG: + logFileName: delayB_SUB.log diff --git a/Stream/measure_rtl/utils/protoJson_delayClientA_PUB.py b/Stream/measure_rtl/utils/protoJson_delayClientA_PUB.py new file mode 100644 index 0000000..68ae9dd --- /dev/null +++ b/Stream/measure_rtl/utils/protoJson_delayClientA_PUB.py @@ -0,0 +1,121 @@ +import orjson +import time +import logging +import proto.flight_information_pb2 as flight_information_pb2 +logger = logging.getLogger("__PUB__") + +class Proto_msg: + #Protobuf + flight_information_msg = flight_information_pb2.flight_information_message() + flight_information_msg.gps.LAT = 100000000 + flight_information_msg.gps.LON = 200000000 + flight_information_msg.gps.ALT = 300000000 + timePub = 0 + #Mqtt + client = None + Delay_topicToMqtt_PUB = None + count = 0 + qos = None + + @classmethod + def callBack_gps(cls): + cls.flight_information_msg.heading = cls.count + flightInformationMsg = cls.flight_information_msg.SerializeToString() + cls.mqtt_Pub(message=flightInformationMsg, topics=cls.Delay_topicToMqtt_PUB) + + + @classmethod + def callBack_compass_hdg(cls, Compass): + cls.flight_information_msg.heading = Compass.data + + flightInformationMsg = cls.flight_information_msg.SerializeToString() + cls.mqtt_Pub(message=flightInformationMsg, topics=cls.Delay_topicToMqtt_PUB) + + + # TODO: Formation.velocity check + @classmethod + def callBack_fly_formation(cls, Formation): + cls.fly_formation_msg.velocity = Formation.velocity + # cls.fly_formation_msg.fly_formation = flyformatioln_pb2.FLY_FORMATION_v + cls.fly_formation_msg.fly_formation = Formation.type + flyFormationMsg = cls.fly_formation_msg.SerializeToString() + cls.mqtt_Pub(message=flyFormationMsg, topics=cls.Delay_topicToMqtt_PUB) + + + + @classmethod + # publish a message + def mqtt_Pub(cls, message:bytes, topics:str, waitForAck:bool=False)->None: + cls.timePub = time.perf_counter_ns() + mid = cls.client.publish(topics, message, cls.qos)[1] + logger.info("{} {}".format(cls.count,cls.timePub)) + cls.count += 1 + # print(f"just published {message} to topic") + if waitForAck: + while mid not in cls.client.topic_ack: + # TODO: logging + print("wait for ack") + time.sleep(0.25) + cls.client.topic_ack.remove(mid) + + + + +class Json_msg: + GPS_Data = {"lat": 100000000, "lon": 200000000, "alt": 300000000, "count":0} + count = 0 + #Mqtt + client = None + Delay_topicToMqtt_PUB = None + timePub = 0 + qos = None + + + @classmethod + def callBack_imu(cls, IMU): + gyro_x=str(IMU.linear_acceleration.x) + gyro_y=str(IMU.linear_acceleration.y) + gyro_z=str(IMU.linear_acceleration.z) + + accel_x=str(IMU.angular_velocity.x) + accel_y=str(IMU.angular_velocity.y) + accel_z=str(IMU.angular_velocity.z) + + dataImuUpdate = {"gyro_x": gyro_x, "gyro_y": gyro_y,"gyro_z":gyro_z, "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z} + cls.data.update(dataImuUpdate) + # print ('gyro_x:'+gyro_x+'\n'+'gyro_y:'+gyro_y+'\n'+'gyro_z:'+gyro_z +'\n') + + + @classmethod + def callBack_gps(cls): + cls.GPS_Data["count"] = cls.count + dataJsonFormate = orjson.dumps(cls.GPS_Data) + cls.mqtt_Pub(message=dataJsonFormate, topics=cls.Delay_topicToMqtt_PUB) + # print ('lat:'+lat+'\n'+'lon:'+lon+'\n') + + + @classmethod + def callBack_compass_hdg(cls, Compass): + heading = int(Compass.data*100) + dataGpsUpdate = {"heading": heading} + cls.GPS_Data.update(dataGpsUpdate) + dataJsonFormate = orjson.dumps(cls.GPS_Data) + cls.mqtt_Pub(message=dataJsonFormate, topics=cls.Flight_Information_topicToMqtt) + + @classmethod + def callBack_fly_formation(cls, Formation): + Formation_data = {"velocity": Formation.velocity, "type": Formation.type} + flyFormationMsg = orjson.dumps(Formation_data) + cls.mqtt_Pub(message=flyFormationMsg, topics=cls.Fly_Formation_topicToMqtt) + + @classmethod + def mqtt_Pub(cls, message:str, topics:str, waitForAck:bool=False): + cls.timePub = time.perf_counter_ns() + mid = cls.client.publish(topics, message, cls.qos)[1] + logger.info("{} {}".format(cls.count,cls.timePub)) + cls.count += 1 + # if waitForAck: + # while mid not in cls.client.topic_ack: + # print("wait for ack") + # time.sleep(0.25) + # cls.client.topic_ack.remove(mid) \ No newline at end of file diff --git a/Stream/measure_rtl/utils/protoJson_delayClientA_SUB.py b/Stream/measure_rtl/utils/protoJson_delayClientA_SUB.py new file mode 100644 index 0000000..310c97c --- /dev/null +++ b/Stream/measure_rtl/utils/protoJson_delayClientA_SUB.py @@ -0,0 +1,36 @@ +import orjson +import time + +import proto.flight_information_pb2 as flight_information_pb2 +import proto.flyformatioln_pb2 as flyformatioln_pb2 +import google.protobuf.json_format as json_format +import logging + + +logger = logging.getLogger("__delayA_SUB__") + +class Proto_msg: + #Protobuf + flight_information_msg = None + + timeRcv = 0 + msgCounter = None + + @classmethod + def on_message(cls, client, userdata, msg): + cls.timeRcv = time.perf_counter_ns() + cls.msgCounter = int(cls.flight_information_msg.FromString(msg.payload).heading) + logger.info("{} {}".format(cls.msgCounter, cls.timeRcv)) + + + +class Json_msg: + timeRcv = 0 + msgCounter = None + + @classmethod + def on_message(cls, client, userdata, msg): + cls.timeRcv = time.perf_counter_ns() + cls.msgCounter = orjson.loads(msg.payload)["count"] + logger.info("{} {}".format(cls.msgCounter, cls.timeRcv)) + diff --git a/Stream/measure_rtl/utils/protoJson_delayClientB_SUB.py b/Stream/measure_rtl/utils/protoJson_delayClientB_SUB.py new file mode 100644 index 0000000..eafe9e5 --- /dev/null +++ b/Stream/measure_rtl/utils/protoJson_delayClientB_SUB.py @@ -0,0 +1,63 @@ +import orjson +import time + +import proto.flight_information_pb2 as flight_information_pb2 +import proto.flyformatioln_pb2 as flyformatioln_pb2 +import google.protobuf.json_format as json_format +import logging + +# TODO: use native ros type instead of json or str +logger = logging.getLogger("__SUB__") + +class Proto_msg: + #Protobuf + flight_information_msg = None + fly_formation_msg = None # delcare in function + + #Mqtt topic: check data from which topic + Delay_topicToMqtt_PUB = None + qos = None + + client = None + timeRcv = 0 + timeRsp = 0 + + msgCounter = None + + @classmethod + def mqtt_Pub(cls, msg, waitForAck:bool=False): + cls.timeRsp = time.perf_counter_ns() + mid = cls.client.publish(cls.Delay_topicToMqtt_PUB, msg, cls.qos)[1] + timeDiff = cls.timeRsp - cls.timeRcv + logger.info("{} {}".format(cls.msgCounter, timeDiff)) + + @classmethod + def on_message(cls, client, userdata, msg): + cls.timeRcv = time.perf_counter_ns() + cls.msgCounter = int(cls.flight_information_msg.FromString(msg.payload).heading) + cls.mqtt_Pub(msg=msg.payload) + +class Json_msg: + #Mqtt topic: check data from which topic + Delay_topicToMqtt_PUB = None + + qos = None + client = None + timeRcv = 0 + timeRsp = 0 + + msgCounter = None + + @classmethod + def mqtt_Pub(cls, msg, waitForAck:bool=False): + cls.timeRsp = time.perf_counter_ns() + mid = cls.client.publish(cls.Delay_topicToMqtt_PUB, msg, cls.qos)[1] + timeDiff = cls.timeRsp - cls.timeRcv + logger.info("{} {}".format(cls.msgCounter, timeDiff)) + + + @classmethod + def on_message(cls, client, userdata, msg): + cls.timeRcv = time.perf_counter_ns() + cls.msgCounter = orjson.loads(msg.payload)["count"] + cls.mqtt_Pub(msg=msg.payload) \ No newline at end of file diff --git a/Stream/measure_rtl/utils/readConfig.py b/Stream/measure_rtl/utils/readConfig.py new file mode 100644 index 0000000..87494d8 --- /dev/null +++ b/Stream/measure_rtl/utils/readConfig.py @@ -0,0 +1,109 @@ +import yaml + +class Config: + def __init__(self, inFileName): + self.sectionNames = ["MQTT", "LOG"] + self.options = {} + self.inFileName = inFileName + + def setAttribute(self): + with open(self.inFileName,"r") as f: + self.ymlcfg=yaml.safe_load(f) + ecfgs = [self.ymlcfg.get(name) for name in self.sectionNames] + if None in ecfgs: + nameIndex = ecfgs.index(None) + raise Exception("Missing {} section in cfg file".format(self.sectionNames[nameIndex])) + #iterate over options + for opts, ecfg in zip(self.options, ecfgs): + for opt in self.options[opts]: + if opt in ecfg: + optval=ecfg[opt] + #verify parameter type + if type(optval) != self.options[opts][opt][0]: + raise Exception("Parameter {} has wrong type".format(opt)) + + #create attributes on the fly + setattr(self,opt,optval) + else: + if self.options[opts][opt][1]: + raise Exception("Missing mandatory parameter {}".format(opt)) + else: + setattr(self,opt,None) + + def __str__(self): + return str(yaml.dump(self.ymlcfg, default_flow_style=False)) + + +class Read_delayA_PUB_Config(Config): + def setAttribute(self): + super().setAttribute() + + def __init__(self, inFileName): + super().__init__(inFileName) + self.options = { + self.sectionNames[0]:{ + "msg_format": (str,True), + "MQTTClientNamePub": (str,True), + "host": (str,True), + "port": (int,True), + "keepalive": (int,True), + "Delay_topicToMqtt_PUB": (str,False), + "msgInterval": (float,True), + "msgAmount": (int,True), + "qos": (int,True)}, + self.sectionNames[1]:{ + "logFileName":(str,False)}} + self.setAttribute() + + def __str__(self): + return super().__str__() + +class Read_delayB_SUB_Config(Config): + def setAttribute(self): + super().setAttribute() + + def __init__(self, inFileName): + super().__init__(inFileName) + self.options = { + self.sectionNames[0]:{ + "msg_format": (str,True), + "MQTTClientNameSub": (str,True), + "host": (str,True), + "port": (int,True), + "keepalive": (int,True), + "Delay_topicToMqtt_SUB":(str,True), + "Delay_topicToMqtt_PUB":(str,True), + "qos": (int,True)}, + self.sectionNames[1]:{ + "logFileName":(str,False)}} + self.setAttribute() + +class Read_delayA_SUB_Config(Config): + def setAttribute(self): + super().setAttribute() + + def __init__(self, inFileName): + super().__init__(inFileName) + self.options = { + self.sectionNames[0]:{ + "msg_format": (str,True), + "MQTTClientNameSub": (str,True), + "host": (str,True), + "port": (int,True), + "keepalive": (int,True), + "Respond_topicToMqtt_SUB":(str,True), + "qos": (int,True)}, + self.sectionNames[1]:{ + "logFileName":(str,False)}} + self.setAttribute() + + def __str__(self): + return super().__str__() + + + +if __name__ == "__main__": + cfg=Read_delayA_SUB_Config("mqttConfig_CMD.yml") + print(cfg) + + diff --git a/Stream/uav_proto_msg/mqttCmdMain.py b/Stream/uav_proto_msg/mqttCmdMain.py index 22ce926..3164eba 100644 --- a/Stream/uav_proto_msg/mqttCmdMain.py +++ b/Stream/uav_proto_msg/mqttCmdMain.py @@ -15,13 +15,13 @@ import rospy def init_dataFormat(cfg:Read_CMD_Config): if cfg.msg_format == "Json": # Json_msg_to_ros.rate = rospy.Rate(10) - # Json_msg_to_ros.publisher_Flight_Information = rospy.Publisher(cfg.ROStopicName_Cmd_Broadcast_Receiver,String,queue_size=10) + # Json_msg_to_ros.publisher_Cmd_Broadcast = rospy.Publisher(cfg.ROStopicName_Cmd_Broadcast_Receiver,String,queue_size=10) client.on_message = Json_msg_to_ros.on_message Json_msg_to_ros.Cmd_Broadcast_topicToMqtt = cfg.Cmd_Broadcast_topicToMqtt elif cfg.msg_format == "Proto": pass else: - logging.debug("msg_format not found") + logger.debug("msg_format not found") def on_connect(self, userdata, flags, rc): logger.info("Connected with result code " + str(rc)) diff --git a/Stream/uav_proto_msg/protomsg/ComparePerformance.py b/Stream/uav_proto_msg/protomsg/ComparePerformance.py deleted file mode 100644 index 9de519a..0000000 --- a/Stream/uav_proto_msg/protomsg/ComparePerformance.py +++ /dev/null @@ -1,69 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -import orjson - - -def readData(fileName:str) -> dict: - with open(fileName, "r") as f: - data_dict = orjson.loads(f.read()) - return data_dict - - - - - -def plotSizeHist(size_data_dict:dict): - postion = 421 - for OutterKey in size_data_dict: - plt.subplot(postion) - if postion == 428: - plt.ylim(top=440) - else: - plt.ylim(top=140) - postion += 1 - size = list(size_data_dict[OutterKey].values()) - width=0.3 - x = [''] - x1=np.arange(len(x)) - y1=(size[0]) - x2=[p + width for p in x1] - y2=(size[1]) - plt.bar(x1, y1, label='Json', width=0.1) - plt.bar(x2, y2, label='protobuf', width=0.1) - plt.xticks([p + width/2 for p in x1], x) - plt.legend() - plt.title(OutterKey) - plt.ylabel('MB') - figure = plt.gcf() # get current figure - figure.set_size_inches(16, 14) - plt.savefig("performancePlot/size.png") - plt.show() - - - -def plotTimeHist(time_data_dict:dict): - postion = 211 - for OutterKey in time_data_dict: - plt.subplot(postion) - postion += 1 - size = list(time_data_dict[OutterKey].values()) - width=0.3 - x = [''] - x1=np.arange(len(x)) - y1=(size[0]) - x2=[p + width for p in x1] - y2=(size[1]) - plt.bar(x1, y1, label='Json', width=0.1) - plt.bar(x2, y2, label='protobuf', width=0.1) - plt.xticks([p + width/2 for p in x1], x) - plt.legend() - plt.title(OutterKey) - plt.ylabel('second') - - plt.savefig("performancePlot/time.png") - plt.show() - -if __name__ == "__main__": - fileName_size, fileName_time = "Data/sizeData", "Data/timeData" - plotSizeHist(readData(fileName_size)) - plotTimeHist(readData(fileName_time)) \ No newline at end of file diff --git a/Stream/uav_proto_msg/protomsg/rtl_test.py b/Stream/uav_proto_msg/protomsg/rtl_test.py deleted file mode 100644 index d53b170..0000000 --- a/Stream/uav_proto_msg/protomsg/rtl_test.py +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env python3 -#coding:utf-8 -import paho.mqtt.client as mqtt -import os -import sys -import time -import utils -import proto.flight_information_pb2 as flight_information_pb2 -import proto.flyformatioln_pb2 as flyformatioln_pb2 -import logging - - -def init_dataFormat(cfg:utils.Read_PUB_Config): - # ros_namespace="/drone1" - if cfg.msg_format == "Proto": - utils.Proto_msg_from_ros.client = client - utils.Proto_msg_from_ros.flight_information_msg = flight_information_pb2.flight_information_message() - utils.Proto_msg_from_ros.fly_formation_msg = flyformatioln_pb2.fly_formation_message() - utils.Proto_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt - utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt - utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS - # rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Proto_msg_from_ros.callBack_gps) - # rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Proto_msg_from_ros.callBack_compass_hdg) - elif cfg.msg_format == "Json": - utils.Json_msg_from_ros.client = client - utils.Json_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt - utils.Json_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt - utils.Json_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS - # rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Json_msg_from_ros.callBack_gps) - # rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Json_msg_from_ros.callBack_compass_hdg) - else: - logging.debug("msg_format not found") \ No newline at end of file diff --git a/Stream/uav_proto_msg/protomsg/timeit_test.py b/Stream/uav_proto_msg/protomsg/timeit_test.py index 6231f9f..440ad97 100644 --- a/Stream/uav_proto_msg/protomsg/timeit_test.py +++ b/Stream/uav_proto_msg/protomsg/timeit_test.py @@ -3,7 +3,7 @@ import proto.flight_information_pb2 as flight_information_pb2 import google.protobuf.json_format as json_format import timeit import orjson -import json + a = '{"gps": {"LAT": 34123.125, "LON": 23423.123, "ALT": 12123.123}, "heading": 155.12215}' data ={} diff --git a/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py b/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py index 9088fe5..13b7731 100644 --- a/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py +++ b/Stream/uav_proto_msg/utils/protoJson_mqtt_cmd_data_to_ros.py @@ -10,9 +10,9 @@ class Proto_msg_to_ros: pass class Json_msg_to_ros: - rate = None + # rate = None # Ros publisher - publisher_Cmd_Broadcast = None + # publisher_Cmd_Broadcast = None Cmd_Broadcast_topicToMqtt = None