rtl measure py file

main
RangeOfGlitching 3 years ago
parent 3417812d5b
commit ba5f97096f

4
.gitignore vendored

@ -1,2 +1,4 @@
__pycache__/
*pyc
*pyc
*.log
*.png

@ -0,0 +1,11 @@
#!/bin/bash
source /home/tony/anaconda3/etc/profile.d/conda.sh
conda activate testFeature
python delayClientA_SUB.py &
p1=$!
python delayClientB.py &
p2=$!
python delayClientA_PUB.py &
p3=$!
wait $p3
killall python

@ -0,0 +1,94 @@
#!/usr/bin/env python3
#coding:utf-8
import paho.mqtt.client as mqtt
import os
import sys
import time
import utils
import proto.flyformatioln_pb2 as flyformatioln_pb2
import logging
from utils.protoJson_delayClientA_PUB import Json_msg
from utils.protoJson_delayClientA_PUB import Proto_msg
def init_dataFormat(cfg:utils.Read_delayA_PUB_Config):
global pubFun
if cfg.msg_format == "Proto":
Proto_msg.client = client
Proto_msg.qos = cfg.qos
Proto_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB
cfg.logFileName = "Proto" + cfg.logFileName
pubFun = Proto_msg.callBack_gps
elif cfg.msg_format == "Json":
Json_msg.client = client
Json_msg.qos = cfg.qos
Json_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB
cfg.logFileName = "Json" + cfg.logFileName
pubFun = Json_msg.callBack_gps
else:
logging.debug("msg_format not found")
def on_connect(self, userdata, flags, rc):
logger.debug("Connected with result code " + str(rc))
def on_publish(self, userdata, mid):
# logger.debug("pub success")
pass
if __name__ == '__main__':
time.sleep(0.25)
# Read Config
FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_delayA_PUB.yml")
cfg = utils.Read_delayA_PUB_Config(FilePath)
# Mqtt
client = utils.MQTTClient(cfg.MQTTClientNamePub)
pubFun = None
init_dataFormat(cfg)
# set log
log_format = "%(asctime)s - %(levelname)s - %(message)s"
file_format = "%(message)s"
file_formatter = logging.Formatter(file_format)
formatter = logging.Formatter(log_format)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(cfg.logFileName, mode="w")
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.INFO)
logger = logging.getLogger("__PUB__")
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.debug(cfg)
# Mqtt
client.on_connect = on_connect
client.on_publish = on_publish
client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive)
client.loop_start()
for i in range(cfg.msgAmount):
try:
pubFun()
time.sleep(cfg.msgInterval)
except KeyboardInterrupt as e:
client.disconnect()
logger.debug("Not elegeant of program")
sys.exit()
client.disconnect()
logger.debug(cfg)
logger.debug("End of program")
sys.exit()

@ -0,0 +1,88 @@
#!/usr/bin/env python3
#coding:utf-8
import paho.mqtt.client as mqtt
import os
import sys
import time
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
from std_msgs.msg import String
import logging
import utils
from utils.protoJson_delayClientA_SUB import Json_msg
from utils.protoJson_delayClientA_SUB import Proto_msg
def init_dataFormat(cfg:utils.Read_delayB_SUB_Config):
if cfg.msg_format == "Proto":
Proto_msg.flight_information_msg = flight_information_pb2.flight_information_message()
client.on_message = Proto_msg.on_message
cfg.logFileName = "Proto" + cfg.logFileName
elif cfg.msg_format == "Json":
client.on_message = Json_msg.on_message
cfg.logFileName = "Json" + cfg.logFileName
else:
logger.debug("msg_format not found")
def on_connect(self, userdata, flags, rc):
logger.debug("Connected with result code " + str(rc))
client.subscribe(cfg.Respond_topicToMqtt_SUB, qos=cfg.qos)
def on_disconnect(client, userdata, rc):
logger.debug("disconnecting reason " +str(rc))
client.connected_flag=False
client.disconnect_flag=True
def on_publish(self, userdata, mid):
pass
if __name__ == '__main__':
# Read Config
FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_delayA_SUB.yml")
cfg = utils.Read_delayA_SUB_Config(FilePath)
client = utils.MQTTClient(cfg.MQTTClientNameSub)
init_dataFormat(cfg)
# set log
log_format = "%(asctime)s - %(levelname)s - %(message)s"
file_foramt = "%(message)s"
formatter = logging.Formatter(log_format)
file_formatter = logging.Formatter(file_foramt)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(cfg.logFileName, mode="w")
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.INFO)
logger = logging.getLogger("__delayA_SUB__")
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.debug(cfg)
# Mqtt
client.on_connect = on_connect
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive)
try:
client.loop_forever()
except KeyboardInterrupt as e:
client.loop_stop()
client.disconnect()
logger.debug("Not elegeant end of program")
sys.exit()
client.loop_stop()
client.disconnect()
logger.debug("End of program")
sys.exit()

@ -0,0 +1,100 @@
#!/usr/bin/env python3
#coding:utf-8
import paho.mqtt.client as mqtt
import os
import sys
import time
import utils
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
import rospy
from std_msgs.msg import String
import logging
from utils.protoJson_delayClientB_SUB import Json_msg
from utils.protoJson_delayClientB_SUB import Proto_msg
def init_dataFormat(cfg:utils.Read_delayB_SUB_Config):
if cfg.msg_format == "Proto":
Proto_msg.qos = cfg.qos
Proto_msg.flight_information_msg = flight_information_pb2.flight_information_message()
Proto_msg.fly_formation_msg = flyformatioln_pb2.fly_formation_message()
Proto_msg.client = client
client.on_message = Proto_msg.on_message
cfg.logFileName = "Proto" + cfg.logFileName
Proto_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB
elif cfg.msg_format == "Json":
client.on_message = Json_msg.on_message
Json_msg.qos = cfg.qos
Json_msg.client = client
Json_msg.Delay_topicToMqtt_PUB = cfg.Delay_topicToMqtt_PUB
cfg.logFileName = "Json" + cfg.logFileName
else:
logger.debug("msg_format not found")
def on_connect(self, userdata, flags, rc):
logger.debug("Connected with result code " + str(rc))
client.subscribe(cfg.Delay_topicToMqtt_SUB, qos=cfg.qos)
def on_disconnect(client, userdata, rc):
# logger.info("disconnecting reason " +str(rc))
client.connected_flag=False
client.disconnect_flag=True
def on_publish(self, userdata, mid):
pass
if __name__ == '__main__':
# Read Config
FilePath = os.path.join(os.path.dirname(__file__),"utils","mqttConfig_delayB_SUB.yml")
cfg = utils.Read_delayB_SUB_Config(FilePath)
client = utils.MQTTClient(cfg.MQTTClientNameSub)
# Mqtt
client = utils.MQTTClient(cfg.MQTTClientNameSub)
client.on_connect = on_connect
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.connect(host=cfg.host, port=cfg.port, keepalive=cfg.keepalive)
# initialize
init_dataFormat(cfg)
# set log
log_format = "%(asctime)s - %(levelname)s - %(message)s"
file_foramt = "%(message)s"
formatter = logging.Formatter(log_format)
file_formatter = logging.Formatter(file_foramt)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(cfg.logFileName, mode="w")
file_handler.setFormatter(file_formatter)
file_handler.setLevel(logging.INFO)
logger = logging.getLogger("__SUB__")
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.debug(cfg)
try:
client.loop_forever()
# rospy.spin()
except KeyboardInterrupt as e:
client.loop_stop()
client.disconnect()
logger.debug("Not elegeant end of program")
sys.exit()
client.loop_stop()
client.disconnect()
logger.debug("End of program")
sys.exit()

@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: duration.proto
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='duration.proto',
package='google.protobuf',
syntax='proto3',
serialized_options=b'\n\023com.google.protobufB\rDurationProtoP\001Z1google.golang.org/protobuf/types/known/durationpb\370\001\001\242\002\003GPB\252\002\036Google.Protobuf.WellKnownTypes',
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n\x0e\x64uration.proto\x12\x0fgoogle.protobuf\"*\n\x08\x44uration\x12\x0f\n\x07seconds\x18\x01 \x01(\x03\x12\r\n\x05nanos\x18\x02 \x01(\x05\x42\x83\x01\n\x13\x63om.google.protobufB\rDurationProtoP\x01Z1google.golang.org/protobuf/types/known/durationpb\xf8\x01\x01\xa2\x02\x03GPB\xaa\x02\x1eGoogle.Protobuf.WellKnownTypesb\x06proto3'
)
_DURATION = _descriptor.Descriptor(
name='Duration',
full_name='google.protobuf.Duration',
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name='seconds', full_name='google.protobuf.Duration.seconds', index=0,
number=1, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
_descriptor.FieldDescriptor(
name='nanos', full_name='google.protobuf.Duration.nanos', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=35,
serialized_end=77,
)
DESCRIPTOR.message_types_by_name['Duration'] = _DURATION
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Duration = _reflection.GeneratedProtocolMessageType('Duration', (_message.Message,), {
'DESCRIPTOR' : _DURATION,
'__module__' : 'duration_pb2'
# @@protoc_insertion_point(class_scope:google.protobuf.Duration)
})
_sym_db.RegisterMessage(Duration)
DESCRIPTOR._options = None
# @@protoc_insertion_point(module_scope)

@ -0,0 +1,14 @@
syntax = 'proto3';
package flight_information_proto;
// GPS + compass
message GPS {
optional float LAT = 1;
optional float LON = 2;
optional float ALT = 3;
}
message flight_information_message {
GPS gps = 1;
optional float heading = 2;
}

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: flight_information.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18\x66light_information.proto\x12\x18\x66light_information_proto\"S\n\x03GPS\x12\x10\n\x03LAT\x18\x01 \x01(\x02H\x00\x88\x01\x01\x12\x10\n\x03LON\x18\x02 \x01(\x02H\x01\x88\x01\x01\x12\x10\n\x03\x41LT\x18\x03 \x01(\x02H\x02\x88\x01\x01\x42\x06\n\x04_LATB\x06\n\x04_LONB\x06\n\x04_ALT\"j\n\x1a\x66light_information_message\x12*\n\x03gps\x18\x01 \x01(\x0b\x32\x1d.flight_information_proto.GPS\x12\x14\n\x07heading\x18\x02 \x01(\x02H\x00\x88\x01\x01\x42\n\n\x08_headingb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flight_information_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_GPS._serialized_start=54
_GPS._serialized_end=137
_FLIGHT_INFORMATION_MESSAGE._serialized_start=139
_FLIGHT_INFORMATION_MESSAGE._serialized_end=245
# @@protoc_insertion_point(module_scope)

@ -0,0 +1,17 @@
syntax = "proto3";
package fly_formation_proto;
// leader only
enum FLY_FORMATION{
FLY_FORMATION_UNSPECIFIED = 0;
FLY_FORMATION_v = 1;
FLY_FORMATION_X = 2;
FLY_FORMATION_O = 3;
FLY_FORMATION_LINE = 4;
FLY_FORMATION_ROW = 5;
FLY_FORMATION_HEX = 6;
}
message fly_formation_message{
optional float velocity = 1;
FLY_FORMATION fly_formation= 2;
}

@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: flyformatioln.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x66lyformatioln.proto\x12\x13\x66ly_formation_proto\"v\n\x15\x66ly_formation_message\x12\x15\n\x08velocity\x18\x01 \x01(\x02H\x00\x88\x01\x01\x12\x39\n\rfly_formation\x18\x02 \x01(\x0e\x32\".fly_formation_proto.FLY_FORMATIONB\x0b\n\t_velocity*\xb3\x01\n\rFLY_FORMATION\x12\x1d\n\x19\x46LY_FORMATION_UNSPECIFIED\x10\x00\x12\x13\n\x0f\x46LY_FORMATION_v\x10\x01\x12\x13\n\x0f\x46LY_FORMATION_X\x10\x02\x12\x13\n\x0f\x46LY_FORMATION_O\x10\x03\x12\x16\n\x12\x46LY_FORMATION_LINE\x10\x04\x12\x15\n\x11\x46LY_FORMATION_ROW\x10\x05\x12\x15\n\x11\x46LY_FORMATION_HEX\x10\x06\x62\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flyformatioln_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_FLY_FORMATION._serialized_start=165
_FLY_FORMATION._serialized_end=344
_FLY_FORMATION_MESSAGE._serialized_start=44
_FLY_FORMATION_MESSAGE._serialized_end=162
# @@protoc_insertion_point(module_scope)

@ -0,0 +1,6 @@
from utils.readConfig import Read_delayA_PUB_Config
from utils.readConfig import Read_delayB_SUB_Config
from utils.readConfig import Read_delayA_SUB_Config
from utils.basicMqtt import MQTTClient

@ -0,0 +1,17 @@
import paho.mqtt.client as mqtt
import time
class MQTTClient(mqtt.Client):
def __init__(self,cname,**kwargs):
super().__init__(cname,**kwargs)
self.last_pub_time=time.time()
self.topic_ack=[]
self.run_flag=True
self.subscribe_flag=False
self.bad_connection_flag=False
self.connected_flag=True
self.disconnect_flag=False
self.disconnect_time=0.0
self.pub_msg_count=0
self.devices=[]

@ -0,0 +1,15 @@
MQTT:
msg_format: Proto
# msg_format: Json
qos: 1
MQTTClientNamePub: delayA_PUB
host: 192.168.50.117
port: 1883
keepalive: 60
msgAmount: 1000
msgInterval: 0.01
# Mqtt topic
Delay_topicToMqtt_PUB: delayA_PUB/delay
#ROS
LOG:
logFileName: delayA_PUB.log

@ -0,0 +1,12 @@
MQTT:
msg_format: Proto
# msg_format: Json
qos: 1
MQTTClientNameSub: delayA_SUB
host: 192.168.50.117
port: 1883
keepalive: 60
# Mqtt topic
Respond_topicToMqtt_SUB: delayB/respond
LOG:
logFileName: delayA_SUB.log

@ -0,0 +1,13 @@
MQTT:
msg_format: Proto
# msg_format: Json
qos: 1
MQTTClientNameSub: delayB_SUB
host: 192.168.50.117
port: 1883
keepalive: 60
# Mqtt topic
Delay_topicToMqtt_SUB: delayA_PUB/delay
Delay_topicToMqtt_PUB: delayB/respond
LOG:
logFileName: delayB_SUB.log

@ -0,0 +1,121 @@
import orjson
import time
import logging
import proto.flight_information_pb2 as flight_information_pb2
logger = logging.getLogger("__PUB__")
class Proto_msg:
#Protobuf
flight_information_msg = flight_information_pb2.flight_information_message()
flight_information_msg.gps.LAT = 100000000
flight_information_msg.gps.LON = 200000000
flight_information_msg.gps.ALT = 300000000
timePub = 0
#Mqtt
client = None
Delay_topicToMqtt_PUB = None
count = 0
qos = None
@classmethod
def callBack_gps(cls):
cls.flight_information_msg.heading = cls.count
flightInformationMsg = cls.flight_information_msg.SerializeToString()
cls.mqtt_Pub(message=flightInformationMsg, topics=cls.Delay_topicToMqtt_PUB)
@classmethod
def callBack_compass_hdg(cls, Compass):
cls.flight_information_msg.heading = Compass.data
flightInformationMsg = cls.flight_information_msg.SerializeToString()
cls.mqtt_Pub(message=flightInformationMsg, topics=cls.Delay_topicToMqtt_PUB)
# TODO: Formation.velocity check
@classmethod
def callBack_fly_formation(cls, Formation):
cls.fly_formation_msg.velocity = Formation.velocity
# cls.fly_formation_msg.fly_formation = flyformatioln_pb2.FLY_FORMATION_v
cls.fly_formation_msg.fly_formation = Formation.type
flyFormationMsg = cls.fly_formation_msg.SerializeToString()
cls.mqtt_Pub(message=flyFormationMsg, topics=cls.Delay_topicToMqtt_PUB)
@classmethod
# publish a message
def mqtt_Pub(cls, message:bytes, topics:str, waitForAck:bool=False)->None:
cls.timePub = time.perf_counter_ns()
mid = cls.client.publish(topics, message, cls.qos)[1]
logger.info("{} {}".format(cls.count,cls.timePub))
cls.count += 1
# print(f"just published {message} to topic")
if waitForAck:
while mid not in cls.client.topic_ack:
# TODO: logging
print("wait for ack")
time.sleep(0.25)
cls.client.topic_ack.remove(mid)
class Json_msg:
GPS_Data = {"lat": 100000000, "lon": 200000000, "alt": 300000000, "count":0}
count = 0
#Mqtt
client = None
Delay_topicToMqtt_PUB = None
timePub = 0
qos = None
@classmethod
def callBack_imu(cls, IMU):
gyro_x=str(IMU.linear_acceleration.x)
gyro_y=str(IMU.linear_acceleration.y)
gyro_z=str(IMU.linear_acceleration.z)
accel_x=str(IMU.angular_velocity.x)
accel_y=str(IMU.angular_velocity.y)
accel_z=str(IMU.angular_velocity.z)
dataImuUpdate = {"gyro_x": gyro_x, "gyro_y": gyro_y,"gyro_z":gyro_z, "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z}
cls.data.update(dataImuUpdate)
# print ('gyro_x:'+gyro_x+'\n'+'gyro_y:'+gyro_y+'\n'+'gyro_z:'+gyro_z +'\n')
@classmethod
def callBack_gps(cls):
cls.GPS_Data["count"] = cls.count
dataJsonFormate = orjson.dumps(cls.GPS_Data)
cls.mqtt_Pub(message=dataJsonFormate, topics=cls.Delay_topicToMqtt_PUB)
# print ('lat:'+lat+'\n'+'lon:'+lon+'\n')
@classmethod
def callBack_compass_hdg(cls, Compass):
heading = int(Compass.data*100)
dataGpsUpdate = {"heading": heading}
cls.GPS_Data.update(dataGpsUpdate)
dataJsonFormate = orjson.dumps(cls.GPS_Data)
cls.mqtt_Pub(message=dataJsonFormate, topics=cls.Flight_Information_topicToMqtt)
@classmethod
def callBack_fly_formation(cls, Formation):
Formation_data = {"velocity": Formation.velocity, "type": Formation.type}
flyFormationMsg = orjson.dumps(Formation_data)
cls.mqtt_Pub(message=flyFormationMsg, topics=cls.Fly_Formation_topicToMqtt)
@classmethod
def mqtt_Pub(cls, message:str, topics:str, waitForAck:bool=False):
cls.timePub = time.perf_counter_ns()
mid = cls.client.publish(topics, message, cls.qos)[1]
logger.info("{} {}".format(cls.count,cls.timePub))
cls.count += 1
# if waitForAck:
# while mid not in cls.client.topic_ack:
# print("wait for ack")
# time.sleep(0.25)
# cls.client.topic_ack.remove(mid)

@ -0,0 +1,36 @@
import orjson
import time
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
import google.protobuf.json_format as json_format
import logging
logger = logging.getLogger("__delayA_SUB__")
class Proto_msg:
#Protobuf
flight_information_msg = None
timeRcv = 0
msgCounter = None
@classmethod
def on_message(cls, client, userdata, msg):
cls.timeRcv = time.perf_counter_ns()
cls.msgCounter = int(cls.flight_information_msg.FromString(msg.payload).heading)
logger.info("{} {}".format(cls.msgCounter, cls.timeRcv))
class Json_msg:
timeRcv = 0
msgCounter = None
@classmethod
def on_message(cls, client, userdata, msg):
cls.timeRcv = time.perf_counter_ns()
cls.msgCounter = orjson.loads(msg.payload)["count"]
logger.info("{} {}".format(cls.msgCounter, cls.timeRcv))

@ -0,0 +1,63 @@
import orjson
import time
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
import google.protobuf.json_format as json_format
import logging
# TODO: use native ros type instead of json or str
logger = logging.getLogger("__SUB__")
class Proto_msg:
#Protobuf
flight_information_msg = None
fly_formation_msg = None # delcare in function
#Mqtt topic: check data from which topic
Delay_topicToMqtt_PUB = None
qos = None
client = None
timeRcv = 0
timeRsp = 0
msgCounter = None
@classmethod
def mqtt_Pub(cls, msg, waitForAck:bool=False):
cls.timeRsp = time.perf_counter_ns()
mid = cls.client.publish(cls.Delay_topicToMqtt_PUB, msg, cls.qos)[1]
timeDiff = cls.timeRsp - cls.timeRcv
logger.info("{} {}".format(cls.msgCounter, timeDiff))
@classmethod
def on_message(cls, client, userdata, msg):
cls.timeRcv = time.perf_counter_ns()
cls.msgCounter = int(cls.flight_information_msg.FromString(msg.payload).heading)
cls.mqtt_Pub(msg=msg.payload)
class Json_msg:
#Mqtt topic: check data from which topic
Delay_topicToMqtt_PUB = None
qos = None
client = None
timeRcv = 0
timeRsp = 0
msgCounter = None
@classmethod
def mqtt_Pub(cls, msg, waitForAck:bool=False):
cls.timeRsp = time.perf_counter_ns()
mid = cls.client.publish(cls.Delay_topicToMqtt_PUB, msg, cls.qos)[1]
timeDiff = cls.timeRsp - cls.timeRcv
logger.info("{} {}".format(cls.msgCounter, timeDiff))
@classmethod
def on_message(cls, client, userdata, msg):
cls.timeRcv = time.perf_counter_ns()
cls.msgCounter = orjson.loads(msg.payload)["count"]
cls.mqtt_Pub(msg=msg.payload)

@ -0,0 +1,109 @@
import yaml
class Config:
def __init__(self, inFileName):
self.sectionNames = ["MQTT", "LOG"]
self.options = {}
self.inFileName = inFileName
def setAttribute(self):
with open(self.inFileName,"r") as f:
self.ymlcfg=yaml.safe_load(f)
ecfgs = [self.ymlcfg.get(name) for name in self.sectionNames]
if None in ecfgs:
nameIndex = ecfgs.index(None)
raise Exception("Missing {} section in cfg file".format(self.sectionNames[nameIndex]))
#iterate over options
for opts, ecfg in zip(self.options, ecfgs):
for opt in self.options[opts]:
if opt in ecfg:
optval=ecfg[opt]
#verify parameter type
if type(optval) != self.options[opts][opt][0]:
raise Exception("Parameter {} has wrong type".format(opt))
#create attributes on the fly
setattr(self,opt,optval)
else:
if self.options[opts][opt][1]:
raise Exception("Missing mandatory parameter {}".format(opt))
else:
setattr(self,opt,None)
def __str__(self):
return str(yaml.dump(self.ymlcfg, default_flow_style=False))
class Read_delayA_PUB_Config(Config):
def setAttribute(self):
super().setAttribute()
def __init__(self, inFileName):
super().__init__(inFileName)
self.options = {
self.sectionNames[0]:{
"msg_format": (str,True),
"MQTTClientNamePub": (str,True),
"host": (str,True),
"port": (int,True),
"keepalive": (int,True),
"Delay_topicToMqtt_PUB": (str,False),
"msgInterval": (float,True),
"msgAmount": (int,True),
"qos": (int,True)},
self.sectionNames[1]:{
"logFileName":(str,False)}}
self.setAttribute()
def __str__(self):
return super().__str__()
class Read_delayB_SUB_Config(Config):
def setAttribute(self):
super().setAttribute()
def __init__(self, inFileName):
super().__init__(inFileName)
self.options = {
self.sectionNames[0]:{
"msg_format": (str,True),
"MQTTClientNameSub": (str,True),
"host": (str,True),
"port": (int,True),
"keepalive": (int,True),
"Delay_topicToMqtt_SUB":(str,True),
"Delay_topicToMqtt_PUB":(str,True),
"qos": (int,True)},
self.sectionNames[1]:{
"logFileName":(str,False)}}
self.setAttribute()
class Read_delayA_SUB_Config(Config):
def setAttribute(self):
super().setAttribute()
def __init__(self, inFileName):
super().__init__(inFileName)
self.options = {
self.sectionNames[0]:{
"msg_format": (str,True),
"MQTTClientNameSub": (str,True),
"host": (str,True),
"port": (int,True),
"keepalive": (int,True),
"Respond_topicToMqtt_SUB":(str,True),
"qos": (int,True)},
self.sectionNames[1]:{
"logFileName":(str,False)}}
self.setAttribute()
def __str__(self):
return super().__str__()
if __name__ == "__main__":
cfg=Read_delayA_SUB_Config("mqttConfig_CMD.yml")
print(cfg)

@ -15,13 +15,13 @@ import rospy
def init_dataFormat(cfg:Read_CMD_Config):
if cfg.msg_format == "Json":
# Json_msg_to_ros.rate = rospy.Rate(10)
# Json_msg_to_ros.publisher_Flight_Information = rospy.Publisher(cfg.ROStopicName_Cmd_Broadcast_Receiver,String,queue_size=10)
# Json_msg_to_ros.publisher_Cmd_Broadcast = rospy.Publisher(cfg.ROStopicName_Cmd_Broadcast_Receiver,String,queue_size=10)
client.on_message = Json_msg_to_ros.on_message
Json_msg_to_ros.Cmd_Broadcast_topicToMqtt = cfg.Cmd_Broadcast_topicToMqtt
elif cfg.msg_format == "Proto":
pass
else:
logging.debug("msg_format not found")
logger.debug("msg_format not found")
def on_connect(self, userdata, flags, rc):
logger.info("Connected with result code " + str(rc))

@ -1,69 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
import orjson
def readData(fileName:str) -> dict:
with open(fileName, "r") as f:
data_dict = orjson.loads(f.read())
return data_dict
def plotSizeHist(size_data_dict:dict):
postion = 421
for OutterKey in size_data_dict:
plt.subplot(postion)
if postion == 428:
plt.ylim(top=440)
else:
plt.ylim(top=140)
postion += 1
size = list(size_data_dict[OutterKey].values())
width=0.3
x = ['']
x1=np.arange(len(x))
y1=(size[0])
x2=[p + width for p in x1]
y2=(size[1])
plt.bar(x1, y1, label='Json', width=0.1)
plt.bar(x2, y2, label='protobuf', width=0.1)
plt.xticks([p + width/2 for p in x1], x)
plt.legend()
plt.title(OutterKey)
plt.ylabel('MB')
figure = plt.gcf() # get current figure
figure.set_size_inches(16, 14)
plt.savefig("performancePlot/size.png")
plt.show()
def plotTimeHist(time_data_dict:dict):
postion = 211
for OutterKey in time_data_dict:
plt.subplot(postion)
postion += 1
size = list(time_data_dict[OutterKey].values())
width=0.3
x = ['']
x1=np.arange(len(x))
y1=(size[0])
x2=[p + width for p in x1]
y2=(size[1])
plt.bar(x1, y1, label='Json', width=0.1)
plt.bar(x2, y2, label='protobuf', width=0.1)
plt.xticks([p + width/2 for p in x1], x)
plt.legend()
plt.title(OutterKey)
plt.ylabel('second')
plt.savefig("performancePlot/time.png")
plt.show()
if __name__ == "__main__":
fileName_size, fileName_time = "Data/sizeData", "Data/timeData"
plotSizeHist(readData(fileName_size))
plotTimeHist(readData(fileName_time))

@ -1,32 +0,0 @@
#!/usr/bin/env python3
#coding:utf-8
import paho.mqtt.client as mqtt
import os
import sys
import time
import utils
import proto.flight_information_pb2 as flight_information_pb2
import proto.flyformatioln_pb2 as flyformatioln_pb2
import logging
def init_dataFormat(cfg:utils.Read_PUB_Config):
# ros_namespace="/drone1"
if cfg.msg_format == "Proto":
utils.Proto_msg_from_ros.client = client
utils.Proto_msg_from_ros.flight_information_msg = flight_information_pb2.flight_information_message()
utils.Proto_msg_from_ros.fly_formation_msg = flyformatioln_pb2.fly_formation_message()
utils.Proto_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt
utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt
utils.Proto_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS
# rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Proto_msg_from_ros.callBack_gps)
# rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Proto_msg_from_ros.callBack_compass_hdg)
elif cfg.msg_format == "Json":
utils.Json_msg_from_ros.client = client
utils.Json_msg_from_ros.Flight_Information_topicToMqtt = cfg.Flight_Information_topicToMqtt
utils.Json_msg_from_ros.Fly_Formation_topicToMqtt = cfg.Fly_Formation_topicToMqtt
utils.Json_msg_from_ros.Fly_Formation_topicToMqtt_QOS = cfg.Fly_Formation_topicToMqtt_QOS
# rospy.Subscriber(ros_namespace+'/mavros/global_position/global', NavSatFix, utils.Json_msg_from_ros.callBack_gps)
# rospy.Subscriber(ros_namespace+'/mavros/global_position/compass_hdg', Float64, utils.Json_msg_from_ros.callBack_compass_hdg)
else:
logging.debug("msg_format not found")

@ -3,7 +3,7 @@ import proto.flight_information_pb2 as flight_information_pb2
import google.protobuf.json_format as json_format
import timeit
import orjson
import json
a = '{"gps": {"LAT": 34123.125, "LON": 23423.123, "ALT": 12123.123}, "heading": 155.12215}'
data ={}

@ -10,9 +10,9 @@ class Proto_msg_to_ros:
pass
class Json_msg_to_ros:
rate = None
# rate = None
# Ros publisher
publisher_Cmd_Broadcast = None
# publisher_Cmd_Broadcast = None
Cmd_Broadcast_topicToMqtt = None

Loading…
Cancel
Save