mavlink_object 架構改善 新增功能 轉接不同 socket 的數據流

有時候不穩定 會有些小問題 之後再改善
另外 devRun 也更新了幾個範例
20號項 可以獨立測試 node 功能
12號項 展示最簡單的轉接到 GCS 功能
Chiyu Chen 1 year ago
parent 0af71f8c8e
commit f79aaf86fa

@ -13,29 +13,93 @@ import mavlinkObject as mo
# ====================== 分割線 =====================
test_item = 3
test_item = 12
print('test_item : ', test_item)
if test_item == 1:
# 測試 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
mavlink_object_none = mo.mavlink_object(None)
print('=====================')
print('正常範例')
mavlink_object_none.multiplexingToAnalysis = []
mavlink_object_none.multiplexingToReturn = []
# mavlink_object_none.multiplexingToSwap = [[]]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 長度不一致')
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('=====================')
print('正常範例')
mo.swap_queues.append(queue.Queue())
mo.swap_queues.append(queue.Queue())
mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5]
mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10]
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 multiplexingToAnalysis 不可以有 -1')
mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('End of Program')
elif test_item == 2:
# 測試 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
mavlink_object_none1 = mo.mavlink_object(None)
mavlink_object_none2 = mo.mavlink_object(None)
mavlink_object_none3 = mo.mavlink_object(None)
del mavlink_object_none2
print(len(mo.swap_queues))
mavlink_object_none2 = mo.mavlink_object(None)
print(len(mo.swap_queues))
print(mavlink_object_none1._multiplexingList)
print('End of Program')
elif test_item == 10:
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('Start of Program .Test 1')
print('===> Start of Program .Test ', test_item)
mavlink_object_none = mo.mavlink_object(None)
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket)
print(mavlink_object1.multiplexingToSwap)
print(mo.swap_queues)
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD
mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD
# mavlink_object1.multiplexingToSwap[0] = [-1, ] # all
# 做一個空的 queue list 驗證 multiplexingToConvert
mo.swap_queues.append(queue.Queue())
# 啟動連線的模組
mavlink_object1.run()
print(mavlink_object1._multiplexingList)
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < 2:
while time.time() - start_time < 3:
while not mo.fixed_stream_bridge_queue.empty():
print('fixed_stream_bridge_queue:')
t = mo.fixed_stream_bridge_queue.get()
@ -43,32 +107,35 @@ if test_item == 1:
while not mo.return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = mo.return_packet_processor_queue.get()
# print('from {} : {}'.format(t[0],t[2]))
print(t[2].type)
print('from {} : {}'.format(t[0],t[2]))
# print(t[2].get_msgbuf())
# print(t[2].type)
for q in mo.swap_queues:
for n in range(len(mo.swap_queues)):
q = mo.swap_queues[n]
while not q.empty():
print('swap_queues:')
t = q.get()
print('from {} : {}'.format(t[0],t[2]))
print('{} gets : {}'.format(n,t))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('End of Program .Test 1')
print('End of Program')
elif test_item == 2:
elif test_item == 11:
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能
print('Start of Program .Test 2')
print('===> Start of Program .Test ', test_item)
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object2.multiplexingToConvert = [] #
mavlink_object2.multiplexingToReturn = []
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
@ -105,11 +172,137 @@ elif test_item == 2:
analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 2')
print('End of Program')
elif test_item == 12:
# 我這邊會測試 mavlink object 作為交換器的功能
print('===> Start of Program .Test ', test_item)
# 初始化兩個通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out, source_system=17, source_component=200)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# 做一個空的通道驗證
mavlink_object_none = mo.mavlink_object(None)
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
start_time = time.time()
show_time = time.time()
print("connection established!")
print(type(mavlink_socket_out))
print(type(mavlink_socket_out.mav))
while time.time() - start_time < 10:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_in.close()
mavlink_socket_out.close()
analyzer.stop()
print('End of Program')
elif test_item == 20:
sysid = 1
compid = 1
# 這邊測試 node 生成 topic 的功能
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# mavlink_object_none = mo.mavlink_object(None)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
from pymavlink.dialects.v20 import common as mavlink2
mav = mavlink2.MAVLink(None)
mav.srcSystem = sysid
mav.srcComponent = compid
# 這是一個 heartbeat 封包
fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode=3,
custom_mode=0,
system_status=0,
mavlink_version=3
)
mavlink_bytes = fake_heartbeat.pack(mav)
fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
print(analyzer.mavlink_systems)
mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
time.sleep(0.1)
print(analyzer.mavlink_systems)
analyzer._init_node()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
print("topic created")
time.sleep(5)
analyzer.emit_info()
# 結束程式
analyzer.running = False
analyzer.destroy_node()
rclpy.shutdown()
analyzer.stop()
analyzer.thread.join()
print('End of Program')
elif test_item == 3:
elif test_item == 21:
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('Start of Program .Test 3')
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
@ -117,7 +310,7 @@ elif test_item == 3:
mavlink_object3 = mo.mavlink_object(mavlink_socket)
mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object3.multiplexingToConvert = [] #
mavlink_object3.multiplexingToSwap = [] #
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
@ -163,4 +356,4 @@ elif test_item == 3:
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 3')
print('End of Program')

@ -6,7 +6,11 @@ from theLogger import setup_logger
logger = setup_logger("mavlinkDevice.py")
# 用來記錄每個 system 的資訊
# 資料格式 { sysid : mavlink_device object }
mavlink_systems = {}
# ====================== 分割線 =====================
# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component
class mavlink_device():

@ -23,7 +23,7 @@ from pymavlink import mavutil
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkDevice import mavlink_device, mavlink_systems
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
@ -35,10 +35,6 @@ fixed_stream_bridge_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer
# 用來記錄每個 system 的資訊
# 資料格式 { sysid : mavlink_device object }
mavlink_systems = {}
# ====================== 分割線 =====================
class mavlink_bridge(Node, mavlink_publisher):
@ -163,31 +159,39 @@ class mavlink_object():
分流表的控制在三個 list 分別為
multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的
multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的
multiplexingToConvert : 這個 list 是用來分流到轉拋串流的
multiplexingToSwap : 這個 list 是用來分流到轉拋串流的
透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為
'''
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
# 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號
# socket_id 從 1 開始
# socket_id 從 0 開始
instance = super().__new__(cls)
socket_id = 1
socket_id = 0
for k in cls.mavlinkObjects.keys():
if k == socket_id:
socket_id += 1
else:
break
instance.socket_id = socket_id
cls.socket_num += 1
cls.mavlinkObjects[socket_id] = instance
return instance
def __init__(self, socket):
self.mavlink_socket = socket
self.msg_count = {}
self.multiplexingList = []
# 這邊變數是執行的時候被使用的 不要直接寫入它
self._multiplexingList = []
# 存放要發送的訊息的 queue 或稱 buffer
self.output_buffer = queue.Queue()
if self.socket_id >= len(swap_queues):
swap_queues.append(self.output_buffer)
else:
swap_queues[self.socket_id] = self.output_buffer
# 關聯到全域變數
global mavlink_systems
@ -195,27 +199,57 @@ class mavlink_object():
# 這三個 list 用來分配不同的訊息到不同的 queue
self.multiplexingToAnalysis = [
0, # HEARTBEAT
24, # GPS_RAW_INT
30, # ATTITUDE
33, # GLOBAL_POSITION_INT
74, # VFR_HUD
147, # BATTERY_STATUS
]
self.multiplexingToReturn = [
0, # HEARTBEAT
# 30, # ATTITUDE
0, # HEARTBEAT # 挺必要的項目
# 24, # GPS_RAW_INT
# 30, # ATTITUDE
# 33, # GLOBAL_POSITION_INT
# 74, # VFR_HUD
# 147, # BATTERY_STATUS
]
self.multiplexingToConvert = [
self.multiplexingToReturn = []
self.multiplexingToSwap = [
[] for _ in range(len(swap_queues))
]
# 刷新其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id):
object.multiplexingToSwap.append([])
object.updateMultiplexingList()
logger.info('mavlink_object instance {} created'.format(self.socket_id))
def __del__(self):
self.mavlink_socket.close()
self.stop()
# 停下自己的 thread
if self.mavlink_socket != None:
self.mavlink_socket.close()
self.stop()
# 移除其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id):
object.multiplexingToSwap[self.socket_id] = []
object.updateMultiplexingList()
# 移除自己的 swap_queues
swap_queues[self.socket_id] = None
# 處理 class 的 instance 記錄
self.socket_num -= 1
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
# logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替
print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# try:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# except Exception as e:
# print(f"Error logging in __del__: {e}")
# if 'logger' in globals() and logger:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
def run(self):
# TODO 檢查 socket 是否有連線
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
self.thread.start()
@ -247,15 +281,13 @@ class mavlink_object():
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# print(type(mavlinkMsg.pack)) # debug
# print("mark A")
# # print(dir(mavlinkMsg)) # debug
# print(mavlinkMsg.pack()) # debug
# break # Debug
# break # Debug function can put here you can see the input data from mavlink
# 將訊息依照 multiplexing list 分發到不同的 queue
for i in range(len(self.multiplexingList)):
if mavlinkMsg.get_msgId() in self.multiplexingList[i]:
for i in range(len(self._multiplexingList)):
if (self._multiplexingList[i] == []):
continue
elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1):
if i == 0:
fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1:
@ -266,6 +298,20 @@ class mavlink_object():
_queue.put(mavlinkMsg)
# 處理要送出的封包
# 如果 有資料在 output_buffer 中則將其取出並發送
# 沒有就略過發送
try:
mavlinkMsg_send = self.output_buffer.get(block=False)
except queue.Empty:
mavlinkMsg_send = None
# except Exception as e:
# logger.error(f"Error getting data from output_buffer: {e}")
# mavlinkMsg_send = None
if mavlinkMsg_send:
# self.mavlink_socket.mav.send(mavlinkMsg_send)
self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中
# thread 結束
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
@ -274,20 +320,37 @@ class mavlink_object():
'''
更新 multiplexing list 並做簡單的檢查
'''
# 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值
check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn)
if check:
logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id))
return False
# 更新 multiplexing list
self.multiplexingList = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToConvert
# 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0
check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0
if check:
logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id))
return False
# 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋
try:
self.multiplexingToSwap[self.socket_id] = []
except IndexError:
logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id))
return False
# 檢查 multiplexing list 格式是否有錯誤
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList)
# 組合 multiplexing list
multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap
# 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp)
if not check:
logger.error('Multiplexing List Format Error, Please check the list')
logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id))
return False # 若有錯誤則回傳 False
check = len(self.multiplexingToConvert) != len(swap_queues)
if check:
logger.error('Multiplexing Queue not fit List , Please check the list')
return False
# 更新 multiplexing list
self._multiplexingList = multiL_tmp
return True
# ====================== 分割線 =====================

Loading…
Cancel
Save