SUB PUB json and proto
parent
f3fca9d37d
commit
a41be855c4
@ -0,0 +1,6 @@
|
|||||||
|
from utils.protoJson_mqtt_pub_data_to_ros import Proto_msg_to_ros
|
||||||
|
from utils.protoJson_mqtt_pub_data_to_ros import Json_msg_to_ros
|
||||||
|
from utils.protoJson_mqtt_sub_data_from_ros import Proto_msg_from_ros
|
||||||
|
from utils.protoJson_mqtt_sub_data_from_ros import Json_msg_from_ros
|
||||||
|
from utils.readConfig import MQTT_ROS_Config
|
||||||
|
from utils.basicMqtt import MQTTClient
|
||||||
@ -0,0 +1,17 @@
|
|||||||
|
import paho.mqtt.client as mqtt
|
||||||
|
import time
|
||||||
|
class MQTTClient(mqtt.Client):
|
||||||
|
|
||||||
|
def __init__(self,cname,**kwargs):
|
||||||
|
super().__init__(cname,**kwargs)
|
||||||
|
self.last_pub_time=time.time()
|
||||||
|
self.topic_ack=[]
|
||||||
|
self.run_flag=True
|
||||||
|
self.subscribe_flag=False
|
||||||
|
self.bad_connection_flag=False
|
||||||
|
self.connected_flag=True
|
||||||
|
self.disconnect_flag=False
|
||||||
|
self.disconnect_time=0.0
|
||||||
|
self.pub_msg_count=0
|
||||||
|
self.devices=[]
|
||||||
|
|
||||||
@ -0,0 +1,21 @@
|
|||||||
|
MQTT_ROS:
|
||||||
|
msg_format: Proto
|
||||||
|
MQTTClientNamePub: Drone650Pub
|
||||||
|
MQTTClientNameSub: Drone650Sub
|
||||||
|
host: 192.168.50.117
|
||||||
|
port: 1883
|
||||||
|
keepalive: 60
|
||||||
|
willTopic: CheckDoneConnect
|
||||||
|
willTopicQOS: 1
|
||||||
|
lwt: Drone650 Gone Offline
|
||||||
|
willRetain: False
|
||||||
|
# Mqtt topic
|
||||||
|
Flight_Information_topicToMqtt: drone/leader/Flight_Information
|
||||||
|
Fly_Formation_topicToMqtt: drone/leader/Formation
|
||||||
|
# Change formate qos
|
||||||
|
Fly_Formation_topicToMqtt_QOS: 2
|
||||||
|
#ROS
|
||||||
|
ROSClientNamePub: Drone650Pub
|
||||||
|
ROSClientNameSub: Drone650Sub
|
||||||
|
ROStopicName_Flight_Information: Flight_Information_reciver
|
||||||
|
ROStopicName_Fly_Formation: Fly_Formation_reciver
|
||||||
@ -0,0 +1,60 @@
|
|||||||
|
import orjson
|
||||||
|
import time
|
||||||
|
|
||||||
|
import proto.flight_information_pb2 as flight_information_pb2
|
||||||
|
import proto.flyformatioln_pb2 as flyformatioln_pb2
|
||||||
|
import google.protobuf.json_format as json_format
|
||||||
|
|
||||||
|
class Proto_msg_to_ros:
|
||||||
|
#Protobuf
|
||||||
|
flight_information_msg = None
|
||||||
|
fly_formation_msg = None # delcare in function
|
||||||
|
|
||||||
|
#Ros publisher
|
||||||
|
rate = None
|
||||||
|
publisher_Flight_Information = None
|
||||||
|
publisher_Fly_Formation = None
|
||||||
|
|
||||||
|
#Mqtt topic: check data from which topic
|
||||||
|
Flight_Information_topicToMqtt = None
|
||||||
|
Fly_Formation_topicToMqtt = None
|
||||||
|
|
||||||
|
#Proto
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def ros_pub(cls, dataProto):
|
||||||
|
if dataProto.topic == cls.Flight_Information_topicToMqtt:
|
||||||
|
proto_msg = cls.flight_information_msg.FromString(dataProto.payload)
|
||||||
|
protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True)
|
||||||
|
print(protoTOJson_msg)
|
||||||
|
# cls.publisher_flight_information.publish(protoTOJson_msg)
|
||||||
|
# cls.rate.sleep()
|
||||||
|
elif dataProto.topic == cls.Fly_Formation_topicToMqtt:
|
||||||
|
proto_msg = cls.fly_formation_msg.FromString(dataProto.payload)
|
||||||
|
protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True)
|
||||||
|
cls.publisher_Formation.publish(protoTOJson_msg)
|
||||||
|
cls.rate.sleep()
|
||||||
|
else:
|
||||||
|
print("topic not found")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Json_msg_to_ros:
|
||||||
|
rate = None
|
||||||
|
#Ros publisher
|
||||||
|
publisher_Flight_Information = None
|
||||||
|
publisher_Fly_Formation = None
|
||||||
|
#Mqtt topic: check data from which topic
|
||||||
|
Flight_Information_topicToMqtt = None
|
||||||
|
Fly_Formation_topicToMqtt = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def ros_pub(cls, dataJson):
|
||||||
|
if dataJson.topic == cls.Flight_Information_topicToMqtt:
|
||||||
|
cls.publisher_flight_information.publish(dataJson.payload.decode('utf-8'))
|
||||||
|
cls.rate.sleep()
|
||||||
|
elif dataJson.topic == cls.Fly_Formation_topicToMqtt:
|
||||||
|
cls.publisher_Fly_Formation.publish(dataJson.payload.decode('utf-8'))
|
||||||
|
cls.rate.sleep()
|
||||||
|
else:
|
||||||
|
print("topic not found")
|
||||||
@ -0,0 +1,113 @@
|
|||||||
|
import orjson
|
||||||
|
import time
|
||||||
|
|
||||||
|
class Proto_msg_from_ros:
|
||||||
|
#Protobuf
|
||||||
|
flight_information_msg = None
|
||||||
|
fly_formation_msg = None # delcare in function
|
||||||
|
#Mqtt
|
||||||
|
client = None
|
||||||
|
Flight_Information_topicToMqtt = None
|
||||||
|
Fly_Formation_topicToMqtt = None
|
||||||
|
Fly_Formation_topicToMqtt_QOS = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def callBack_gps(cls, GPS):
|
||||||
|
# flight_information_msg = flight_information_pb2.flight_information_message()
|
||||||
|
cls.flight_information_msg.gps.LAT = GPS.latitude
|
||||||
|
cls.flight_information_msg.gps.LON = GPS.longitude
|
||||||
|
cls.flight_information_msg.gps.ALT = GPS.altitude
|
||||||
|
# print ('lat:'+lat+'\n'+'lon:'+lon+'\n')
|
||||||
|
# print(flight_information_msg)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def callBack_compass_hdg(cls, Compass):
|
||||||
|
cls.flight_information_msg.heading = Compass.data
|
||||||
|
|
||||||
|
flightInformationMsg = cls.flight_information_msg.SerializeToString()
|
||||||
|
cls.mqtt_Pub(message=flightInformationMsg, topics=cls.Flight_Information_topicToMqtt)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Formation.velocity check
|
||||||
|
@classmethod
|
||||||
|
def callBack_fly_formation(cls, Formation):
|
||||||
|
cls.fly_formation_msg.velocity = Formation.velocity
|
||||||
|
# cls.fly_formation_msg.fly_formation = flyformatioln_pb2.FLY_FORMATION_v
|
||||||
|
cls.fly_formation_msg.fly_formation = 2
|
||||||
|
flyFormationMsg = cls.fly_formation_msg.SerializeToString()
|
||||||
|
cls.mqtt_Pub(message=flyFormationMsg, topics=cls.Fly_Formation_topicToMqtt, qos=cls.Fly_Formation_topicToMqtt_QOS)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
# publish a message
|
||||||
|
def mqtt_Pub(cls, message:bytes, topics:str, waitForAck:bool=False, qos:int=0)->None:
|
||||||
|
mid = cls.client.publish(topics, message, qos)[1]
|
||||||
|
# print(f"just published {message} to topic")
|
||||||
|
if waitForAck:
|
||||||
|
while mid not in cls.client.topic_ack:
|
||||||
|
print("wait for ack")
|
||||||
|
time.sleep(0.25)
|
||||||
|
cls.client.topic_ack.remove(mid)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Json_msg_from_ros:
|
||||||
|
data = {}
|
||||||
|
client = None
|
||||||
|
#Mqtt
|
||||||
|
Flight_Information_topicToMqtt = None
|
||||||
|
Fly_Formation_topicToMqtt = None
|
||||||
|
Fly_Formation_topicToMqtt_QOS = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def callBack_imu(cls, IMU):
|
||||||
|
gyro_x=str(IMU.linear_acceleration.x)
|
||||||
|
gyro_y=str(IMU.linear_acceleration.y)
|
||||||
|
gyro_z=str(IMU.linear_acceleration.z)
|
||||||
|
|
||||||
|
accel_x=str(IMU.angular_velocity.x)
|
||||||
|
accel_y=str(IMU.angular_velocity.y)
|
||||||
|
accel_z=str(IMU.angular_velocity.z)
|
||||||
|
|
||||||
|
dataImuUpdate = {"gyro_x": gyro_x, "gyro_y": gyro_y,"gyro_z":gyro_z, "accel_x": accel_x, "accel_y": accel_y, "accel_z": accel_z}
|
||||||
|
cls.data.update(dataImuUpdate)
|
||||||
|
# print ('gyro_x:'+gyro_x+'\n'+'gyro_y:'+gyro_y+'\n'+'gyro_z:'+gyro_z +'\n')
|
||||||
|
print ('accel_x:'+accel_x+'\n'+'accel_y:'+accel_y+'\n'+'accel_z:'+accel_z +'\n')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def callBack_gps(cls, GPS):
|
||||||
|
lat=int(GPS.latitude*10000000) #change the scale to centimeters
|
||||||
|
lon=int(GPS.longitude*10000000)
|
||||||
|
alt=int(GPS.altitude*100)
|
||||||
|
dataGpsUpdate = {"lat": lat, "lon": lon, "alt": alt}
|
||||||
|
cls.data.update(dataGpsUpdate)
|
||||||
|
# dataJsonFormate = orjson.dumps(data)
|
||||||
|
# mqtt_Pub(message=dataJsonFormate)
|
||||||
|
# print ('lat:'+lat+'\n'+'lon:'+lon+'\n')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def callBack_state(cls, state):
|
||||||
|
mode = state.mode
|
||||||
|
dataGpsUpdate = {"mode": mode}
|
||||||
|
cls.data.update(dataGpsUpdate)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def callBack_compass_hdg(cls, Compass):
|
||||||
|
heading = int(Compass.data*100)
|
||||||
|
dataGpsUpdate = {"heading": heading}
|
||||||
|
cls.data.update(dataGpsUpdate)
|
||||||
|
dataJsonFormate = orjson.dumps(cls.data).decode("UTF-8")
|
||||||
|
cls.mqtt_Pub(message=dataJsonFormate, topics=cls.Flight_Information_topicToMqtt)
|
||||||
|
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def mqtt_Pub(cls, message:str, topics:str, waitForAck:bool=False, qos:int=0):
|
||||||
|
mid = cls.client.publish(topics, message, qos)[1]
|
||||||
|
# print(f"just published {message} to topic")
|
||||||
|
if waitForAck:
|
||||||
|
while mid not in cls.client.topic_ack:
|
||||||
|
print("wait for ack")
|
||||||
|
time.sleep(0.25)
|
||||||
|
cls.client.topic_ack.remove(mid)
|
||||||
@ -0,0 +1,65 @@
|
|||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
class MQTT_ROS_Config:
|
||||||
|
"""
|
||||||
|
MQTT_ROS_Config configuration class
|
||||||
|
"""
|
||||||
|
# class variables
|
||||||
|
sectionName='MQTT_ROS'
|
||||||
|
options={
|
||||||
|
'msg_format': (str,True),
|
||||||
|
'MQTTClientNamePub': (str,True),
|
||||||
|
'MQTTClientNameSub': (str,True),
|
||||||
|
'host': (str,True),
|
||||||
|
'port': (int,True),
|
||||||
|
'keepalive': (int,True),
|
||||||
|
'willTopic':(str,True),
|
||||||
|
'lwt':(str, True),
|
||||||
|
'willRetain':(bool,False),
|
||||||
|
'willTopicQOS':(int,True),
|
||||||
|
'Flight_Information_topicToMqtt': (str,False),
|
||||||
|
'Fly_Formation_topicToMqtt': (str,False),
|
||||||
|
'Fly_Formation_topicToMqtt_QOS':(int,False),
|
||||||
|
'ROSClientNamePub': (str,False),
|
||||||
|
'ROSClientNameSub': (str,False),
|
||||||
|
'ROStopicName_Flight_Information': (str,False),
|
||||||
|
'ROStopicName_Fly_Formation': (str,False)}
|
||||||
|
|
||||||
|
#constructor
|
||||||
|
def __init__(self, inFileName):
|
||||||
|
#read YAML config and get EV3 section
|
||||||
|
infile=open(inFileName,'r')
|
||||||
|
ymlcfg=yaml.safe_load(infile)
|
||||||
|
infile.close()
|
||||||
|
eccfg=ymlcfg.get(self.sectionName,None)
|
||||||
|
if eccfg is None: raise Exception('Missing {} section in cfg file'.format(self.sectionName))
|
||||||
|
|
||||||
|
#iterate over options
|
||||||
|
for opt in self.options:
|
||||||
|
if opt in eccfg:
|
||||||
|
optval=eccfg[opt]
|
||||||
|
|
||||||
|
#verify parameter type
|
||||||
|
if type(optval) != self.options[opt][0]:
|
||||||
|
raise Exception('Parameter "{}" has wrong type'.format(opt))
|
||||||
|
|
||||||
|
#create attributes on the fly
|
||||||
|
setattr(self,opt,optval)
|
||||||
|
else:
|
||||||
|
if self.options[opt][1]:
|
||||||
|
raise Exception('Missing mandatory parameter "{}"'.format(opt))
|
||||||
|
else:
|
||||||
|
setattr(self,opt,None)
|
||||||
|
|
||||||
|
#string representation for class data
|
||||||
|
def __str__(self):
|
||||||
|
return str(yaml.dump(self.__dict__,default_flow_style=False))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
cfg=MQTT_ROS_Config("mqttConfig.yml")
|
||||||
|
print(cfg)
|
||||||
|
|
||||||
|
|
||||||
Loading…
Reference in New Issue