uavlink submsg no check

protobuf
RangeOfGlitching 3 years ago
parent a3a9c8a17a
commit 20ce2a6cec

@ -2,7 +2,6 @@
#coding:utf-8
import serial
import time
import random
import sys
import os
import proto.flight_information_pb2 as flight_information_pb2

@ -0,0 +1,70 @@
#!/usr/bin/env python3
#coding:utf-8
import serial
import time
import sys
import os
import proto.flight_information_pb2 as flight_information_pb2
import logging
from utils.readConfig import Read_SUB_Config
from utils.proto_uavlink_pub_data_to_ros import Proto_msg_to_ros
import rospy
from std_msgs.msg import String
from std_msgs.msg import Float64
from std_msgs.msg import Header
from mavros_msgs.srv import ParamGet
from sensor_msgs.msg import NavSatFix
from sensor_msgs.msg import Imu
from sensor_msgs.msg import Range
from geometry_msgs.msg import Vector3
def init_dataFormat(cfg:Read_SUB_Config):
ros_namespace="/drone1"
f1data = b'\xf1drone550..............\r\n'
sel.write(f1data)
readTenByte = sel.read_until(size=5)
print(readTenByte)
Proto_msg_to_ros.flight_information_msg = flight_information_pb2.flight_information_message()
Proto_msg_to_ros.rate = rospy.Rate(10)
Proto_msg_to_ros.publisher_Flight_Information = rospy.Publisher(cfg.ROStopicName_Flight_Information,String,queue_size=10)
if __name__ == '__main__':
FilePath = os.path.join(os.path.dirname(__file__),"utils","uavlinkConfig_SUB.yml")
cfg = Read_SUB_Config(FilePath)
print(cfg)
sel = serial.Serial(cfg.ttyport, cfg.baudrate, serial.EIGHTBITS, serial.PARITY_NONE, serial.STOPBITS_ONE)
# set log
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
stream_handler.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(cfg.logFileName)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.info(cfg)
while True:
try:
# test json
readTenByte = sel.read_until(expected= b'\x01\x01', size=25)
except KeyboardInterrupt as e:
print("End of program")

@ -0,0 +1,32 @@
import time
import proto.flight_information_pb2 as flight_information_pb2
import google.protobuf.json_format as json_format
import logging
# TODO: use native ros type instead of json or str
logger = logging.getLogger("__SUB__")
class Proto_msg_to_ros:
#Protobuf
flight_information_msg = None
sel = None
#Ros publisher
rate = None
publisher_Flight_Information = None
#Proto
@classmethod
def on_message_Flight_Information(cls, msg):
proto = msg[1:23]
proto_msg = cls.flight_information_msg.FromString(proto)
protoTOJson_msg = json_format.MessageToJson(proto_msg, indent=None, preserving_proto_field_name=True)
logger.info(protoTOJson_msg)
cls.publisher_Flight_Information.publish(protoTOJson_msg)
cls.rate.sleep()

@ -106,7 +106,12 @@ class Read_SUB_Config(Config):
"Dron_ROStopicName_Flight_Information": (str,False),
"ROStopicName_Fly_Formation": (str,False)},
self.sectionNames[2]:{
"logFileName":(str,False)}}
"logFileName":(str,False)},
self.sectionNames[3]:{
"uavlink_msg_format": (str,False),
"uav_id": (str,False),
"baudrate": (int,False),
"ttyport": (str,False)}}
self.setAttribute()
def __str__(self):

@ -0,0 +1,12 @@
UAVLINK:
uavlink_msg_format: Proto
uav_id: \x01\x01
baudrate: 250000
ttyport: /dev/ttyUSB2
MQTT: "None"
ROS:
ROSClientNameSub: Drone550UAVLINKSub
ROStopicName_Flight_Information: Flight_Information_reciver
ROStopicName_Fly_Formation: Fly_Formation_reciver
LOG:
logFileName: UAVLINKsub.log
Loading…
Cancel
Save