Merge branch 'chiyu' into wenchun

chiyu
wenchun 11 months ago
commit cd3c0f5bee

@ -11,7 +11,8 @@
Python
1. pymavlink
2.
2. conda-forge 中的 pyserial-asyncio
3.
ROS2
1. source ~/ros2_humble/install/setup.bash

@ -1,531 +0,0 @@
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
import mavlinkObject as mo
import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 51
running_time = 30
print('test_item : ', test_item)
if test_item == 1:
# 測試 mavlink_object 中 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
mavlink_object_none = mo.mavlink_object(None)
print('=====================')
print('正常範例')
mavlink_object_none.multiplexingToAnalysis = []
mavlink_object_none.multiplexingToReturn = []
# mavlink_object_none.multiplexingToSwap = [[]]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 長度不一致')
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('=====================')
print('正常範例')
mo.swap_queues.append(queue.Queue())
mo.swap_queues.append(queue.Queue())
mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5]
mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10]
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 multiplexingToAnalysis 不可以有 -1')
mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('<=== End of Program')
elif test_item == 2:
# 測試 mavlink_object 創建時 socket_id 是否正確
# 說實在 這個測試項 似乎因為 python 的 GC 機制 會導致難以測試
print('===> Start of Program .Test ', test_item)
mavlink_object_none1 = mo.mavlink_object(None)
mavlink_object_none2 = mo.mavlink_object(None)
mavlink_object_none3 = mo.mavlink_object(None)
del mavlink_object_none2
print(len(mo.swap_queues))
mavlink_object_none2 = mo.mavlink_object(None)
print(len(mo.swap_queues))
print(mavlink_object_none1._multiplexingList)
print('<=== End of Program')
elif test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('===> Start of Program .Test ', test_item)
# 創建一個空的通道 這個通道的 socket_id 是 0
mavlink_object_none = mo.mavlink_object(None)
# 創建另一個通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket)
# 設定通道轉發的參數
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD
# print(mavlink_object1.multiplexingToSwap)
# print(mo.swap_queues)
# 啟動通道
mavlink_object1.run()
# 確認轉拋的設定有沒有錯
print("_multiplexingList for mavlink object :", mavlink_object1._multiplexingList)
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < running_time:
while not mo.fixed_stream_bridge_queue.empty():
print('fixed_stream_bridge_queue:')
t = mo.fixed_stream_bridge_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not mo.return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = mo.return_packet_processor_queue.get()
print('from {} : {}'.format(t[0],t[2]))
# print(t[2].get_msgbuf())
# print(t[2].type)
for n in range(len(mo.swap_queues)):
q = mo.swap_queues[n]
while not q.empty():
print('swap_queues:')
t = q.get()
print('{} gets : {}'.format(n,t))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能
print('===> Start of Program .Test ', test_item)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 創建通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
# mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
# mavlink_object2.multiplexingToReturn = []
# 啟動連線的模組
mavlink_object2.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(2)
print("=== connection established! ===")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
compid = 1
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 初始化輸入通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
# 初始化輸出通道
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 做一個空的通道驗證 可以拿來 debug
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(3)
print("=== connection established! ===")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# print(type(mavlink_socket_out))
# print(type(mavlink_socket_out.mav))
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_in.close()
mavlink_socket_out.close()
analyzer.stop()
print('<=== End of Program')
elif test_item == 20:
# 這邊測試 node 生成 topic 的功能
# 利用 空的通道 發出假的 heartbeat 封包
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
from pymavlink.dialects.v20 import common as mavlink2
sysid = 1
compid = 1
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
mav = mavlink2.MAVLink(None)
mav.srcSystem = sysid
mav.srcComponent = compid
# 這是一個 heartbeat 封包
fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode=3,
custom_mode=0,
system_status=0,
mavlink_version=3
)
mavlink_bytes = fake_heartbeat.pack(mav)
fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
print(analyzer.mavlink_systems)
mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
time.sleep(0.1)
print(analyzer.mavlink_systems)
# ROS2 初始化
analyzer._init_node()
# 創建 ROS2 topic
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
print("topic created")
time.sleep(5)
# 丟出 topic
analyzer.emit_info()
# 結束程式
analyzer.running = False
analyzer.destroy_node()
rclpy.shutdown()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
# 啟動通道
mavlink_object3.run()
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 22:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 初始化兩個通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = []
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
show_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
show_time2 = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time2) >= 1:
analyzer.emit_info() # 這邊是測試 node 的運行
# sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
sss = mavutil.mode_string_v10(fmsg)
# print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
print("目前的飛行模式 : {}".format(sss))
show_time2 = time.time()
# if (time.time() - show_time) >= 2:
# show_time = time.time()
# for sysid in analyzer.mavlink_systems:
# for compid in analyzer.mavlink_systems[sysid].components:
# print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# print("===================")
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_socket_in.close()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_out.close()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 51:
# 晉凱的測試項目
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket3 = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# 設定通道流動
mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
mavlink_object3.multiplexingToReturn = [] #
# mavlink_object3.multiplexingToSwap = [] #
# 啟動通道
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create all topic: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket3.close()
print('<=== End of Program')

@ -1,6 +1,6 @@
# 自定義的 import
from theLogger import setup_logger
from .theLogger import setup_logger
# ====================== 分割線 =====================
@ -31,6 +31,11 @@ class mavlink_device():
# p_str += '=====================\n'
return p_str
'''
寫了半天 這個功能應該是 pymalink 本來就有的
去找 pymavlink_util.py mavfile(object)
算了 先擺著吧 之後再看看怎麼整合
'''
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:

@ -1,8 +1,7 @@
'''
# 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到橋接器
# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流
# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue
# 匯流排接到訊息後透過 ring buffer 傳送到橋接器
# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器
# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排
@ -13,8 +12,11 @@
# 基礎功能的 import
import threading
import queue
# import queue
import time
import asyncio
from enum import Enum, auto
# from typing import Dict, List, Optional, Set, Any, Tuple
# mavlink 的 import
from pymavlink import mavutil
@ -23,38 +25,17 @@ from pymavlink import mavutil
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device, mavlink_systems
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
from .mavlinkDevice import mavlink_device, mavlink_systems
from .mavlinkPublish import mavlink_publisher
from .theLogger import setup_logger
from .ringBuffer import RingBuffer
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
fixed_stream_bridge_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer
stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255)
return_packet_ring = RingBuffer(capacity=256, buffer_id=254)
# ====================== 分割線 =====================
@ -75,6 +56,8 @@ class mavlink_bridge(Node, mavlink_publisher):
node 區塊則是處理 ros2 publisher subscriber 訂閱相關
藉由控制 ros2 的機制再把 device object 的資訊發送出去
fixed_stream_bridge_queue 置換成 stream_bridge_ring
ps. 我限制了這個 class 只能有一個 instance
'''
_instance = None
@ -106,13 +89,25 @@ class mavlink_bridge(Node, mavlink_publisher):
# === Thread 區塊 ===
def _run_thread(self):
# start_time = time.time() # debug
start_time = time.time() # debug
run_loop_count = 0 # debug
logger.info('Start of mavlink_bridge._run_thread')
# 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
# 從 Queue stream_bridge_ring 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_bridge_queue.empty():
# # 這個迴圈每秒鐘被執行了幾輪? # 這整段都是 debug 用的
# current_time = time.time()
# if (current_time - start_time) >= 1.0:
# logger.info(f'mavlink_bridge._run_thread loop count: {run_loop_count}')
# run_loop_count = 0
# start_time = current_time
# else:
# run_loop_count += 1
# # =========================
if stream_bridge_ring.is_empty():
continue
msg_pack = fixed_stream_bridge_queue.get()
msg_pack = stream_bridge_ring.get()
msg = msg_pack[2]
sysid = msg.get_srcSystem()
@ -150,7 +145,7 @@ class mavlink_bridge(Node, mavlink_publisher):
this_component.emitParams['flightMode'] = msg # debug
# print("mav_type : ", msg.type) # debug
# print("get mode :", mavutil.mode_string_v10(msg)) # debug
print("get mode :", mavutil.mode_string_v10(msg)) # debug
# print("record mode :", this_component.emitParams['flightMode_mode']) # debug
elif msg_id == 30: # ATTITUDE 處理
@ -201,24 +196,45 @@ class mavlink_bridge(Node, mavlink_publisher):
# ====================== 分割線 =====================
class mavlink_object():
'''
本次改版的暫時記錄註解於此
1. mavlink_object 由於 Queue 的效能太差 會完全移除
其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現
multiplexingToSwap 會完全被移除代替方式下一條描述
2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流
(註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object)
因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移
並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出
3. mavlink_object 需要加上 state 去管理其狀態
4. mavlink_object 需要加上 target port 去管理寫出的目標
5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道
6. 基於第1,2 updateMultiplexingList 會被完全移除
7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object
8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring return_packet_ring 來取代 另外 swap_queues 會被完全移除
'''
# 定義 mavlink_object 的狀態
class MavlinkObjectState(Enum):
INIT = auto() # 初始化狀態
RUNNING = auto() # 運行中狀態
ERROR = auto() # 錯誤狀態
CLOSED = auto() # 已關閉狀態
class mavlink_object:
'''
每個 mavlink bus 都會有一個 mavlink_object
其中主要是 thread 做統計封包與分流
分流表的控制在三個 list 分別為
multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的
multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的
multiplexingToSwap : 這個 list 是用來分流到轉拋串流的
透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為
使用 asyncio 處理資料流 RingBuffer 來分配訊息
直接透過 socket 寫出
'''
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
# 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號
# socket_id 從 0 開始
instance = super().__new__(cls)
socket_id = 0
for k in cls.mavlinkObjects.keys():
@ -232,183 +248,389 @@ class mavlink_object():
return instance
def __init__(self, socket):
# 登入所需的 socket
self.mavlink_socket = socket
# 這邊變數是執行的時候被使用的 不要直接寫入它
self._multiplexingList = []
# 存放要發送的訊息的 queue 或稱 buffer
self.output_buffer = queue.Queue()
if self.socket_id >= len(swap_queues):
swap_queues.append(self.output_buffer)
else:
swap_queues[self.socket_id] = self.output_buffer
# 存放目前是否分流到固定串流橋接器和回傳封包處理器的標誌 # 都是要做的 把這個判斷拿掉
# self.send_to_bridge = True
# self.send_to_return = False
# 用於主線程發送消息的緩衝區
self.outgoing_msg_lock = threading.Lock()
self.outgoing_msgs = []
# 記錄訊息過濾類型 (可選)
self.bridge_msg_types = [0] # 默認只處理 HEARTBEAT
self.return_msg_types = []
# 目標端口
self.target_sockets = []
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 這三個 list 用來分配不同的訊息到不同的 queue
self.multiplexingToAnalysis = [
0, # HEARTBEAT # 挺必要的項目
# 24, # GPS_RAW_INT
# 30, # ATTITUDE
# 33, # GLOBAL_POSITION_INT
# 74, # VFR_HUD
# 147, # BATTERY_STATUS
]
self.multiplexingToReturn = []
self.multiplexingToSwap = [
[] for _ in range(len(swap_queues))
]
# 刷新其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id):
object.multiplexingToSwap.append([])
object.updateMultiplexingList()
logger.info('mavlink_object instance {} created'.format(self.socket_id))
# 物件變數
self.task = None # Task reference
self.dirtyDataCount = 0 # 髒資料計數器
self.dirtyDataMax = 10 # 髒資料容許閾值
self.state = MavlinkObjectState.INIT
logger.info(f'mavlink_object instance {self.socket_id} created')
def __del__(self):
# 停下自己的 thread
if self.mavlink_socket != None:
self.mavlink_socket.close()
# 停止 asyncio task
self.stop()
# 移除其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id):
object.multiplexingToSwap[self.socket_id] = []
object.updateMultiplexingList()
# 移除自己的 swap_queues
swap_queues[self.socket_id] = None
# 關閉 socket
if hasattr(self, 'mavlink_socket') and self.mavlink_socket:
try:
self.mavlink_socket.close()
except:
pass
# 處理 class 的 instance 記錄
self.socket_num -= 1
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
if hasattr(self.__class__, 'socket_num'):
self.__class__.socket_num -= 1
# logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替
# print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
if hasattr(self.__class__, 'mavlinkObjects') and hasattr(self, 'socket_id'):
if self.socket_id in self.__class__.mavlinkObjects:
self.__class__.mavlinkObjects.pop(self.socket_id)
# 這段不知道怎麼了 反正會一直讓 logger ERROR 我先關掉
# try:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# logger.info(f'mavlink_object instance {self.socket_id} deleted')
# except Exception as e:
# print(f"Error logging in __del__: {e}")
# if 'logger' in globals() and logger:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
def run(self):
# TODO 檢查 socket 是否有連線
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
self.thread.start()
def start(self):
"""啟動 mavlink_object 處理循環"""
if self.state == MavlinkObjectState.RUNNING:
logger.warning(f"mavlink_object {self.socket_id} is already running")
return
self.state = MavlinkObjectState.RUNNING
# 實際的啟動會由 async_io_manager 處理
def stop(self):
self.running = False
"""停止 mavlink_object 處理循環"""
if self.state in (MavlinkObjectState.CLOSED, MavlinkObjectState.ERROR):
return
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
self.state = MavlinkObjectState.CLOSED
if self.task:
if not self.task.done():
self.task.cancel()
# 處理接收到的封包
async def process_data(self):
"""處理 mavlink 數據的主要 asyncio 協程"""
logger.info(f'Start of mavlink_object.process_data id: {self.socket_id}')
while self.state == MavlinkObjectState.RUNNING:
timestamp = time.time()
try:
# 處理接收到的封包
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)
self.running = False
break
logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}")
self.dirtyDataCount += 1
if self.dirtyDataCount >= self.dirtyDataMax:
logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state")
self.state = MavlinkObjectState.ERROR
# 不直接退出,嘗試容忍髒數據
await asyncio.sleep(0.01) # 短暫暫停
continue
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...'
try:
if mavlinkMsg:
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# break # Debug function can put here you can see the input data from mavlink
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
self.mavlink_systems[sysid].updateComponentPacketCount(
compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) #TODO 這邊怪怪的 好像要再檢查
# 將訊息依照 multiplexing list 分發到不同的 queue
for i in range(len(self._multiplexingList)):
if (self._multiplexingList[i] == []):
continue
elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1):
if i == 0:
fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1:
return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else:
_queue = swap_queues[i-2]
# _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包
_queue.put(mavlinkMsg)
# 分發訊息到 RingBuffer
msg_id = mavlinkMsg.get_msgId()
# if self.send_to_bridge and (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types):
if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types):
stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg))
# if self.send_to_return and (msg_id in self.return_msg_types or -1 in self.return_msg_types):
if (msg_id in self.return_msg_types or -1 in self.return_msg_types):
return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg))
# 處理要送出的封包
# 如果 有資料在 output_buffer 中則將其取出並發送
# 沒有就略過發送
# 將接收到的訊息轉發給所有目標端口
for target_port in self.target_sockets:
if target_port != self.socket_id and target_port in self.mavlinkObjects:
target_obj = self.mavlinkObjects[target_port]
if target_obj.state == MavlinkObjectState.RUNNING:
try:
mavlinkMsg_send = self.output_buffer.get(block=False)
except queue.Empty:
mavlinkMsg_send = None
# except Exception as e:
# logger.error(f"Error getting data from output_buffer: {e}")
# mavlinkMsg_send = None
target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf())
except Exception as e:
logger.error(f"Error forwarding message to port {target_port}: {e}")
if mavlinkMsg_send:
# self.mavlink_socket.mav.send(mavlinkMsg_send)
self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中
with self.outgoing_msg_lock:
if self.outgoing_messages and (send_msg := self.outgoing_messages.pop(0)):
try:
self.mavlink_socket.write(send_msg)
except Exception as e:
logger.error(f"mavlink_object {self.socket_id} failed to send message: {e}")
# thread 結束
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
# 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權
await asyncio.sleep(0.001)
def updateMultiplexingList(self):
'''
更新 multiplexing list 並做簡單的檢查
'''
# 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值
check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn)
if check:
logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id))
except asyncio.CancelledError:
logger.info(f'mavlink_object.process_data for id {self.socket_id} was cancelled')
break
except Exception as e:
await asyncio.sleep(0.01) # 短暫暫停避免CPU過載
logger.info(f'End of mavlink_object.process_data id: {self.socket_id}')
self.state = MavlinkObjectState.CLOSED
def add_target_socket(self, target_socket_id):
"""添加一個目標端口用於轉發"""
if target_socket_id not in self.target_sockets and target_socket_id != self.socket_id:
self.target_sockets.append(target_socket_id)
logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}")
return True
return False
# 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0
check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0
if check:
logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id))
def remove_target_socket(self, target_socket_id):
"""移除目標端口"""
if target_socket_id in self.target_sockets:
self.target_sockets.remove(target_socket_id)
logger.info(f"mavlink_object Removed target port {target_socket_id} from mavlink_object {self.socket_id}")
return True
return False
# 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋
try:
self.multiplexingToSwap[self.socket_id] = []
except IndexError:
logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id))
def set_bridge_message_types(self, msg_types):
"""設置需要分流到橋接器的訊息類型"""
if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types):
self.bridge_msg_types = msg_types
return True
logger.error(f"Invalid bridge message types: {msg_types}")
return False
# 組合 multiplexing list
multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap
def set_return_message_types(self, msg_types):
"""設置需要分流到回傳處理器的訊息類型"""
if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types):
self.return_msg_types = msg_types
return True
logger.error(f"Invalid return message types: {msg_types}")
return False
# 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp)
if not check:
logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id))
return False # 若有錯誤則回傳 False
def send_message(self, message_bytes):
"""
從主線程向此 mavlink_object socket 發送數據
將數據添加到簡單的列表中 asyncio 任務處理
# 更新 multiplexing list
self._multiplexingList = multiL_tmp
Args:
message_bytes: 要發送的 mavlink 消息的字節數據
Returns:
bool: 是否成功添加消息到列表
"""
if self.state != MavlinkObjectState.RUNNING:
logger.warning(f"Cannot send message: mavlink_object {self.socket_id} is not running")
return False
try:
# 使用鎖保護共享資源訪問
with self.outgoing_messages_lock:
self.outgoing_messages.append(message_bytes)
return True
except Exception as e:
logger.error(f"Error queueing message for mavlink_object {self.socket_id}: {e}")
return False
# ====================== 分割線 =====================
# def enable_bridge(self, enable=True):
# """啟用或禁用橋接器分流"""
# self.send_to_bridge = enable
# def enable_return(self, enable=True):
# """啟用或禁用回傳處理器分流"""
# self.send_to_return = enable
class async_io_manager:
"""
管理所有 mavlink_object 實例的 asyncio 任務
提供單一線程來處理所有 mavlink 通道的數據
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super(async_io_manager, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.initialized = True
self.loop = None
self.main_task = None
self.running = False
self.managed_objects = {} # socket_id: task
def start(self):
"""啟動 async_io_manager 和其管理的所有 mavlink_object"""
if self.running:
logger.warning("async_io_manager already running")
return
self.running = True
self.thread = threading.Thread(target=self._run_event_loop)
self.thread.start()
logger.info("async_io_manager started")
def stop(self):
"""停止 async_io_manager 和其管理的所有 mavlink_object"""
if not self.running:
return
self.running = False
if self.loop:
# 取消所有任務
for socket_id in list(self.managed_objects.keys()):
self.remove_mavlink_object(socket_id)
# 停止事件循環
if self.main_task and not self.main_task.done():
asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop)
# 等待線程結束
if hasattr(self, 'thread') and self.thread.is_alive():
self.thread.join(timeout=5.0)
logger.info("async_io_manager stopped")
async def _shutdown(self):
"""正確關閉事件循環的協程"""
tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
def _run_event_loop(self):
"""在獨立線程中運行事件循環"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self.main_task = self.loop.create_task(self._main_task())
self.loop.run_forever()
except Exception as e:
logger.error(f"Error in async_io_manager event loop: {e}")
finally:
self.loop.close()
self.loop = None
self.running = False
logger.info("async_io_manager event loop ended")
async def _main_task(self):
"""主任務協程,用於監視和管理其他任務"""
logger.info("async_io_manager main task started")
# 整合到 ros2 之後的程式進入點
def main_node():
try:
while self.running:
# 這邊就是一個無窮迴圈 確保 async_io_manager 物件續存
# 每秒鐘檢查並移除已完成或已取消的任務
await asyncio.sleep(1.0)
for socket_id in list(self.managed_objects.keys()):
task = self.managed_objects[socket_id]
if task.done():
try:
exc = task.exception()
if exc:
logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}")
except (asyncio.CancelledError, asyncio.InvalidStateError):
pass
if socket_id in mavlink_object.mavlinkObjects:
print(f"this is manager, i make socket_id {socket_id} closed, he is now in {mavlink_object.mavlinkObjects[socket_id].state} state.") # debug
mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED
del self.managed_objects[socket_id]
except asyncio.CancelledError:
logger.info("async_io_manager main task was cancelled")
except Exception as e:
logger.error(f"Error in async_io_manager main task: {e}")
logger.info("async_io_manager main task ended")
def add_mavlink_object(self, mavlink_obj):
"""添加一個 mavlink_object 實例到 async_io_manager 並啟動其處理任務"""
if not self.running:
logger.error("Cannot add mavlink_object: async_io_manager is not running")
return False
if not isinstance(mavlink_obj, mavlink_object):
logger.error(f"Invalid mavlink_object: {mavlink_obj}")
return False
socket_id = mavlink_obj.socket_id
if socket_id in self.managed_objects:
logger.warning(f"mavlink_object {socket_id} already managed")
return False
# 創建並啟動任務
if self.loop:
# task = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop).result()
future = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop)
# print(f"Task created for mavlink_object {socket_id}: {future1}") # debug
# task = future1.result()
self.managed_objects[socket_id] = future
mavlink_obj.task = future
mavlink_obj.state = MavlinkObjectState.RUNNING
logger.info(f"Added mavlink_object {socket_id} to async_io_manager")
return True
return False
def remove_mavlink_object(self, socket_id):
"""從 async_io_manager 中移除一個 mavlink_object 實例並取消其處理任務"""
if socket_id not in self.managed_objects:
logger.warning(f"mavlink_object {socket_id} not managed by async_io_manager")
return False
# 取消任務
task = self.managed_objects[socket_id]
if not task.done():
task.cancel()
# 從管理列表中移除
del self.managed_objects[socket_id]
# 更新 mavlink_object 狀態
if socket_id in mavlink_object.mavlinkObjects:
mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED
logger.info(f"Removed mavlink_object {socket_id} from async_io_manager")
return True
def get_managed_objects(self):
"""獲取所有被管理的對象列表"""
return list(self.managed_objects.keys())
# ====================== 分割線 =====================
if __name__ == '__main__':
pass

@ -9,14 +9,16 @@
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
# ROS2 的 import
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
from theLogger import setup_logger
import math
# 自定義的 import
from .theLogger import setup_logger
logger = setup_logger("mavlinkPublish.py")

@ -0,0 +1,231 @@
# import array
import threading
import ctypes
from typing import Any, Optional, Tuple
class RingBuffer:
"""
高效能無鎖環形緩衝區使用原子操作避免鎖定
用於在不同執行緒間高效傳遞資料
"""
# 緩衝區計數器,用於自動分配 buffer_id
_buffer_counter = 0
_counter_lock = threading.Lock()
def __init__(self, capacity: int = 256, buffer_id: int = None):
"""
初始化環形緩衝區
Args:
capacity: 緩衝區容量 (必須是 2 的次方)
buffer_id: 緩衝區 ID如果為 None 則自動分配
"""
# 確保容量是 2 的次方,便於使用位運算取模
if capacity & (capacity - 1) != 0:
# 找到大於等於 capacity 的最小 2 的次方
capacity = 1 << (capacity - 1).bit_length()
# 分配緩衝區 ID
if buffer_id is None:
with RingBuffer._counter_lock:
buffer_id = RingBuffer._buffer_counter
RingBuffer._buffer_counter += 1
self.buffer_id = buffer_id
self.capacity = capacity
self.mask = capacity - 1 # 用於快速取模
self.buffer = [None] * capacity
# 原子計數器作為讀寫指標
self.write_index = ctypes.c_uint64(0)
self.read_index = ctypes.c_uint64(0)
# 用於檢測上次操作的執行緒 ID
self._last_read_thread = None
self._last_write_thread = None
# 添加同時讀寫統計
self.concurrent_write_count = ctypes.c_uint64(0) # 同時寫入計數
self.concurrent_read_count = ctypes.c_uint64(0) # 同時讀取計數
self.total_write_count = ctypes.c_uint64(0) # 總寫入操作計數
self.total_read_count = ctypes.c_uint64(0) # 總讀取操作計數
self.overflow_count = ctypes.c_uint64(0) # 緩衝區溢出次數
# 記錄各執行緒的操作次數
self.thread_write_counts = {} # {thread_id: count}
self.thread_read_counts = {} # {thread_id: count}
# 用於保護統計數據的鎖(僅用於統計,不影響主要讀寫操作)
self._stats_lock = threading.Lock()
def put(self, item: Any) -> bool:
"""
將項目存入緩衝區
Args:
item: 要存入的項目
Returns:
bool: 成功寫入返回 True緩衝區已滿返回 False
"""
# 更新寫入統計
self.total_write_count.value += 1
# 檢測上次寫入的執行緒
current_thread = threading.get_ident()
# 記錄當前執行緒寫入次數
with self._stats_lock:
self.thread_write_counts[current_thread] = self.thread_write_counts.get(current_thread, 0) + 1
# 檢測是否為不同執行緒同時寫入
if self._last_write_thread is not None and current_thread != self._last_write_thread:
self.concurrent_write_count.value += 1
self._last_write_thread = current_thread
# 原子獲取當前寫入位置
current = self.write_index.value
next_pos = (current + 1) & self.mask
# 檢查緩衝區是否已滿
if next_pos == self.read_index.value:
self.overflow_count.value += 1
return False
# 寫入資料並原子更新寫入位置
self.buffer[current] = item
self.write_index.value = next_pos
return True
def get(self) -> Optional[Any]:
"""
從緩衝區讀取項目
Returns:
項目或 None如果緩衝區為空
"""
# 更新讀取統計
self.total_read_count.value += 1
# 檢測上次讀取的執行緒
current_thread = threading.get_ident()
# 記錄當前執行緒讀取次數
with self._stats_lock:
self.thread_read_counts[current_thread] = self.thread_read_counts.get(current_thread, 0) + 1
# 檢測是否為不同執行緒同時讀取
if self._last_read_thread is not None and current_thread != self._last_read_thread:
self.concurrent_read_count.value += 1
self._last_read_thread = current_thread
# 檢查緩衝區是否為空
if self.read_index.value == self.write_index.value:
return None
# 原子獲取當前讀取位置並讀取資料
current = self.read_index.value
item = self.buffer[current]
# 原子更新讀取位置
self.read_index.value = (current + 1) & self.mask
return item
def get_all(self) -> list:
"""
獲取緩衝區中的所有項目
Returns:
list: 所有可用項目的列表
"""
items = []
while True:
item = self.get()
if item is None:
break
items.append(item)
return items
def size(self) -> int:
# 返回目前緩衝區內項目數量
return (self.write_index.value - self.read_index.value) & self.mask
def is_empty(self) -> bool:
# 檢查緩衝區是否為空
return self.read_index.value == self.write_index.value
def is_full(self) -> bool:
# 檢查緩衝區是否已滿
return ((self.write_index.value + 1) & self.mask) == self.read_index.value
def clear(self) -> None:
"""清空緩衝區"""
self.read_index.value = self.write_index.value
def get_stats(self) -> dict:
"""
獲取緩衝區統計資訊
Returns:
dict: 包含各種統計數據的字典
"""
with self._stats_lock:
stats = {
"buffer_id": self.buffer_id,
"capacity": self.capacity,
"current_size": self.size(),
"is_full": self.is_full(),
"is_empty": self.is_empty(),
"total_writes": self.total_write_count.value,
"total_reads": self.total_read_count.value,
"concurrent_writes": self.concurrent_write_count.value,
"concurrent_reads": self.concurrent_read_count.value,
"overflow_count": self.overflow_count.value,
"write_threads": len(self.thread_write_counts),
"read_threads": len(self.thread_read_counts),
"concurrent_write_ratio": self.concurrent_write_count.value / max(1, self.total_write_count.value),
"concurrent_read_ratio": self.concurrent_read_count.value / max(1, self.total_read_count.value),
"top_writer_threads": sorted(self.thread_write_counts.items(), key=lambda x: x[1], reverse=True)[:3],
"top_reader_threads": sorted(self.thread_read_counts.items(), key=lambda x: x[1], reverse=True)[:3],
}
return stats
def print_stats(self) -> None:
"""輸出當前緩衝區統計資訊"""
stats = self.get_stats()
print(f"\n=== RingBuffer #{stats['buffer_id']} Statistics ===")
print(f"Capacity: {stats['capacity']}, Current Size: {stats['current_size']}")
print(f"Total Write Operations: {stats['total_writes']}")
print(f"Total Read Operations: {stats['total_reads']}")
print(f"Concurrent Write Events: {stats['concurrent_writes']} ({stats['concurrent_write_ratio']:.2%})")
print(f"Concurrent Read Events: {stats['concurrent_reads']} ({stats['concurrent_read_ratio']:.2%})")
print(f"Buffer Overflow Count: {stats['overflow_count']}")
print(f"Writing Threads: {stats['write_threads']}")
print(f"Reading Threads: {stats['read_threads']}")
print("Top Writer Threads:")
for thread_id, count in stats['top_writer_threads']:
print(f" Thread {thread_id}: {count} writes")
print("Top Reader Threads:")
for thread_id, count in stats['top_reader_threads']:
print(f" Thread {thread_id}: {count} reads")
print("=============================\n")
def reset_stats(self) -> None:
"""重置所有統計數據"""
with self._stats_lock:
self.concurrent_write_count.value = 0
self.concurrent_read_count.value = 0
self.total_write_count.value = 0
self.total_read_count.value = 0
self.overflow_count.value = 0
self.thread_write_counts.clear()
self.thread_read_counts.clear()
def __str__(self) -> str:
"""返回緩衝區的字符串表示"""
return f"RingBuffer(id={self.buffer_id}, capacity={self.capacity}, size={self.size()})"

@ -0,0 +1,340 @@
import asyncio
import serial_asyncio
# 附加驗證功能
import os
import sys
import serial
import signal
# XBee 模組
from xbee.frame import APIFrame
# =========================
SERIAL_PORT = '/dev/ttyACM0' # serial port
SERIAL_BAUDRATE = 57600 # serial baudrate
UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP
UDP_REMOTE_PORT = 14550 # UDP Target port
# 測試用 只會吃一次資料
DEBUG_MODE = False
# =========================
def check_serial_port():
"""檢查串口是否存在與可用"""
# 檢查設備是否存在
if not os.path.exists(SERIAL_PORT):
print(f"錯誤:串口設備 {SERIAL_PORT} 不存在")
return False
# 檢查是否有權限訪問設備
try:
os.access(SERIAL_PORT, os.R_OK | os.W_OK)
except Exception as e:
print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}{str(e)}")
return False
# 檢查是否被占用
try:
# 嘗試打開串口
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close() # 打開成功後立即關閉
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
# =========================
class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.first_data = True # 標記是否為第一次收到資料
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
## =====================================
# Serial 收到資料,轉發到 UDP
def data_received(self, data):
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug
if DEBUG_MODE and self.has_processed:
return
# 標記首次收到資料
if hasattr(self.udp_protocol, 'send_udp'):
if self.first_data:
print(f"First data received from serial, forwarding to UDP: {data[:20]}...")
self.first_data = False
# 轉發數據
self.udp_protocol.send_udp(data)
if DEBUG_MODE: # 測試模式用 # debug
self.has_processed = True
print("[DEBUG] SerialToUDP Mark")
def write_to_serial(self, data):
# 在資料透過 Serial 發送之前進行處理
processed_data = self.encapsulate_data(data)
# 處理失敗就不發送
if processed_data not None:
self.transport.write(processed_data)
def encapsulate_data(self, data):
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
## 方法一
# 設置幀參數
frame_id = 0x01 # 幀識別碼,用於確認
frame_type = 0x10 # 幀類型:傳輸請求
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播)
dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播)
broadcast_radius = 0x00 # 廣播半徑 (0 = 最大)
options = 0x00 # 傳輸選項
# 構建幀數據部分
frame_data = bytearray()
frame_data.append(frame_type) # 添加幀類型
frame_data.append(frame_id) # 添加幀 ID
frame_data.extend(dest_addr64) # 添加 64 位目標地址
frame_data.extend(dest_addr16) # 添加 16 位目標地址
frame_data.append(broadcast_radius) # 添加廣播半徑
frame_data.append(options) # 添加選項
frame_data.extend(data) # 添加實際數據內容
# 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位)
checksum = 0xFF - (sum(frame_data) & 0xFF)
# 構建完整的 API 幀
# 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和
complete_frame = bytearray()
complete_frame.append(0x7E) # 添加起始分隔符
complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先)
complete_frame.extend(frame_data) # 添加幀數據
complete_frame.append(checksum) # 添加校驗和
return bytes(complete_frame)
## 方法二
# frame_id=0x01
# frame_type = 0x10
# dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# dest_addr16 = b'\xFF\xFE'
# broadcast_radius = 0x00
# options = 0x00
# frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
# frame += dest_addr64 + dest_addr16
# frame += struct.pack(">BB", broadcast_radius, options) + data
# checksum = 0xFF - (sum(frame) & 0xFF)
# return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
def __init__(self):
self.serial_transport = None
self.transport = None
self.remote_addr = None # 儲存動態獲取的遠程地址
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready. Waiting for serial data before sending...")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
## =====================================
# UDP 收到資料
def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據)
# self.remote_addr = addr
# print(f"Received UDP data from {addr}, setting as remote address")
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回
if DEBUG_MODE and self.has_processed:
return
if self.serial_transport:
self.serial_transport.write_to_serial(data)
if DEBUG_MODE: # 測試模式用
self.has_processed = True
print("[DEBUG] UDPHandler Mark")
def send_udp(self, data):
# 發送資料到 UDP
# if self.transport:
# # 如果有已知的回應地址,使用該地址
# if self.remote_addr:
# self.transport.sendto(data, self.remote_addr)
# # print(f"Sending to dynamic address: {self.remote_addr}")
# else:
# # 否則使用預設地址
# self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
# print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
if self.transport:
# 只有第一次或地址改變時才顯示
# if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT):
# print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
# self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT)
# 在透過 UDP 發送數據之前進行解封裝
decoded_data = self.decapsulate_data(data)
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
# 這裡可以根據需要進行數據解封裝
## 方法一
try:
# 創建一個 API 幀處理器
api_frame = APIFrame(escaped=True)
# 嘗試解析數據
api_frame.fill(data)
# 如果數據不完整,直接返回原始數據
if not api_frame.is_complete():
return data
# 解析幀內容
parsed_data = api_frame.parse()
# 提取有用數據
if parsed_data:
frame_data = parsed_data.get('rf_data', None)
if frame_data:
return frame_data
return data
## 方法二 - 手動解析
# try:
# # XBee API 幀格式:
# # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# # 檢查幀起始符 (0x7E)
# if not data or len(data) < 5 or data[0] != 0x7E:
# return data
# # 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2]
# # 檢查幀完整性
# if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
# return data
# # 提取API標識符和數據
# frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# # 根據不同的幀類型進行處理
# if frame_type == 0x90: # 例如,這是"接收數據包"類型
# # 提取實際有效載荷
# # 對於接收數據包,格式通常是:
# # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據
# if len(frame_data) >= 12: # 確保數據長度足夠
# payload = frame_data[11:] # 前11字節是地址和選項
# return payload
# return data
# 如果無法提取 則回傳 None
except Exception as e:
print(f"手動解析 XBee 數據錯誤: {e}")
return None
async def main():
# 先檢查串口是否可用
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
# 設置終止處理
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown(loop))
)
# 建立單一 UDP 端點處理收發
udp_handler = UDPHandler()
# 建立 UDP 傳輸,不指定接收端口,讓系統自動分配
try:
udp_transport, protocol = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
# 獲取系統分配的本地端口
sock = udp_transport.get_extra_info('socket')
local_addr = sock.getsockname()
print(f"UDP listening on {local_addr[0]}:{local_addr[1]}")
# 建立 Serial 傳輸,將 UDP 處理器傳給它
try:
serial_proto = SerialToUDP(udp_handler)
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
udp_transport.close()
return
print(f"Waiting for data from serial port to initiate UDP communication...")
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
"""關閉程序"""
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print(f"程式執行錯誤:{str(e)}")

@ -0,0 +1,448 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
from fc_network_adapter import mavlinkObject as mo
from fc_network_adapter import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 21
running_time = 30
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 測試 mavlink_object 放入 ring buffer 的應用
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
start_time = time.time()
while (time.time() - start_time) < running_time:
items_a = mo.stream_bridge_ring.get_all()
items_b = mo.return_packet_ring.get_all()
try:
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
except IndexError:
print("stream_bridge_ring is empty")
try:
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
except IndexError:
print("return_packet_ring is empty")
time.sleep(1)
manager.stop()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
time.sleep(3)
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 2:
# print("mark B")
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
manager.stop()
analyzer.stop()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
connection_string="udp:127.0.0.1:14561"
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
# 初始化輸出通道
connection_string="udpout:127.0.0.1:14550"
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
manager.add_mavlink_object(mavlink_object_out)
manager.add_mavlink_object(mavlink_object_in1)
manager.add_mavlink_object(mavlink_object_in2)
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
start_time = time.time()
while (time.time() - start_time) < running_time:
time.sleep(1)
manager.stop()
print('<=== End of Program')
# elif test_item == 20:
# # 這邊測試 node 生成 topic 的功能
# # 利用 空的通道 發出假的 heartbeat 封包
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# from pymavlink.dialects.v20 import common as mavlink2
# sysid = 1
# compid = 1
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# mav = mavlink2.MAVLink(None)
# mav.srcSystem = sysid
# mav.srcComponent = compid
# # 這是一個 heartbeat 封包
# fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
# type=mavlink2.MAV_TYPE_QUADROTOR,
# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
# base_mode=3,
# custom_mode=0,
# system_status=0,
# mavlink_version=3
# )
# mavlink_bytes = fake_heartbeat.pack(mav)
# fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
# print(analyzer.mavlink_systems)
# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
# time.sleep(0.1)
# print(analyzer.mavlink_systems)
# # ROS2 初始化
# analyzer._init_node()
# # 創建 ROS2 topic
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
# print("topic created")
# time.sleep(5)
# # 丟出 topic
# analyzer.emit_info()
# # 結束程式
# analyzer.running = False
# analyzer.destroy_node()
# rclpy.shutdown()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
# 初始化 rclpy 才能使用 node
rclpy.init()
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
time.sleep(0.5) # 系統 Setup 完成
# 創建通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
manager.add_mavlink_object(mavlink_object3)
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
manager.stop()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
# elif test_item == 22:
# # 需要開啟一個 ardupilot 的模擬器 與 GCS
# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 初始化兩個通道
# connection_string_in="udp:127.0.0.1:15551"
# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
# mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# connection_string_out="udpout:127.0.0.1:14553"
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
# mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
# mavlink_object_out.multiplexingToAnalysis = []
# # 讓兩個通道互相傳輸
# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# # 啟動通道
# mavlink_object_in.run()
# mavlink_object_out.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# print('目前所有的系統 : ')
# for sysid in analyzer.mavlink_systems:
# print(analyzer.mavlink_systems[sysid])
# compid = 1
# sysid = 1
# show_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# show_time2 = time.time()
# while time.time() - start_time < running_time:
# if (time.time() - show_time2) >= 1:
# analyzer.emit_info() # 這邊是測試 node 的運行
# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
# sss = mavutil.mode_string_v10(fmsg)
# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
# print("目前的飛行模式 : {}".format(sss))
# show_time2 = time.time()
# # if (time.time() - show_time) >= 2:
# # show_time = time.time()
# # for sysid in analyzer.mavlink_systems:
# # for compid in analyzer.mavlink_systems[sysid].components:
# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# # print("===================")
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object_in.stop()
# mavlink_object_in.thread.join()
# mavlink_socket_in.close()
# mavlink_object_out.stop()
# mavlink_object_out.thread.join()
# mavlink_socket_out.close()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
# elif test_item == 51:
# # 晉凱的測試項目
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 創建通道
# connection_string="udp:127.0.0.1:15551"
# mavlink_socket3 = mavutil.mavlink_connection(connection_string)
# mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# # 設定通道流動
# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# mavlink_object3.multiplexingToReturn = [] #
# # mavlink_object3.multiplexingToSwap = [] #
# # 啟動通道
# mavlink_object3.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# compid = 1
# sysid = 1
# start_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
# end_time = time.time()
# print(f"Execution time for create all topic: {end_time - start_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# while time.time() - start_time < running_time:
# try:
# # rclpy.spin(analyzer)
# analyzer.emit_info() # 這邊是測試 node 的運行
# time.sleep(0.5)
# except KeyboardInterrupt:
# break
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object3.stop()
# mavlink_object3.thread.join()
# analyzer.stop()
# analyzer.thread.join()
# mavlink_socket3.close()
# print('<=== End of Program')

@ -0,0 +1,468 @@
#!/usr/bin/env python
"""
測試腳本用於測試 mavlinkObject.py 中的 mavlink_object async_io_manager 類別
"""
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import unittest
import time
import threading
import socket
import asyncio
from unittest.mock import MagicMock, patch
# 導入要測試的模組
from fc_network_adapter.mavlinkObject import (
mavlink_object,
async_io_manager,
MavlinkObjectState,
stream_bridge_ring,
return_packet_ring
)
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試"""
def __init__(self, test_data=None):
self.closed = False
self.test_data = test_data or []
self.data_index = 0
self.written_data = []
def recv_msg(self):
if not self.test_data or self.data_index >= len(self.test_data):
return None
data = self.test_data[self.data_index]
self.data_index += 1
return data
def write(self, data):
self.written_data.append(data)
def close(self):
self.closed = True
class MockMavlinkMessage:
"""模擬 Mavlink 訊息的類別,用於測試"""
def __init__(self, msg_id, sysid, compid, seq=0):
self.msg_id = msg_id
self.sysid = sysid
self.compid = compid
self.seq = seq
self.msg_buf = bytes([msg_id, sysid, compid, seq])
def get_msgId(self):
return self.msg_id
def get_srcSystem(self):
return self.sysid
def get_srcComponent(self):
return self.compid
def get_seq(self):
return self.seq
def get_msgbuf(self):
return self.msg_buf
def get_type(self):
return f"MSG_ID_{self.msg_id}"
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建測試訊息
self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1)
self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1)
# 創建模擬的 socket # 假的 Mavlink Socket
self.mock_socket = MockMavlinkSocket([
self.heartbeat_msg,
self.attitude_msg,
self.position_msg
])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
# print("Testing mavlink_object initialization")
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
# 添加目標端口
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
# 嘗試添加已存在的端口
self.assertFalse(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 嘗試添加自己的端口
self.assertFalse(self.mavlink_obj.add_target_socket(0))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 移除端口
self.assertTrue(self.mavlink_obj.remove_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
# 嘗試移除不存在的端口
self.assertFalse(self.mavlink_obj.remove_target_socket(2))
def test_set_message_types(self):
"""測試設置訊息類型功能"""
# 設置橋接器訊息類型
self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30]))
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30])
# 設置回傳處理器訊息類型
self.assertTrue(self.mavlink_obj.set_return_message_types([32]))
self.assertEqual(self.mavlink_obj.return_msg_types, [32])
# 測試無效的訊息類型
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message(self):
"""測試 send_message 功能"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 準備測試數據
test_message1 = b"test_message_1"
test_message2 = b"test_message_2"
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], test_message1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(test_message2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], test_message2)
# 模擬發送出錯的情況
class ErrorWriteSocket(MockMavlinkSocket):
def write(self, data):
raise Exception("Write error")
error_socket = ErrorWriteSocket()
error_obj = mavlink_object(error_socket)
manager.add_mavlink_object(error_obj)
time.sleep(0.1) # 等待對象啟動
# 發送消息到錯誤的 socket
self.assertTrue(error_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 即使寫入失敗send_message 應該也返回 True
# 因為消息已成功加入到佇列中,只是後續的實際發送失敗
# 停止 manager
manager.stop()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 模擬 mavlink 對象
self.mock_socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
self.mock_socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1)
])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.stop()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式 就是不會產生兩個 magager"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
def test_start_stop(self):
"""測試 async_io_manager 的啟動和停止功能"""
# 啟動管理器
self.manager.start()
self.assertTrue(self.manager.running)
self.assertIsNotNone(self.manager.thread)
# 再次啟動應該沒有效果
old_thread = self.manager.thread
self.manager.start()
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.stop()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
start_time = time.time()
while self.manager.thread.is_alive() and time.time() - start_time < 5:
time.sleep(0.1)
self.assertFalse(self.manager.thread.is_alive())
def test_add_remove_objects(self):
"""測試添加和移除 mavlink_object"""
# 啟動管理器
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1))
self.assertEqual(len(self.manager.managed_objects), 1)
self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING)
# 添加另一個對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2))
self.assertEqual(len(self.manager.managed_objects), 2)
# 檢查受管理對象列表
managed_objects = self.manager.get_managed_objects()
self.assertEqual(len(managed_objects), 2)
self.assertIn(0, managed_objects)
self.assertIn(1, managed_objects)
# 移除對象
self.assertTrue(self.manager.remove_mavlink_object(0))
self.assertEqual(len(self.manager.managed_objects), 1)
# 嘗試移除不存在的對象
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.stop()
def test_data_processing(self):
"""測試數據處理"""
# 創建用於轉發的 mavlink_objects
mock_socket3 = MockMavlinkSocket()
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置轉發
self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3
# 設置訊息類型
self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE
# self.mavlink_obj1.enable_return(True)
self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(self.mavlink_obj1)
self.manager.add_mavlink_object(mavlink_obj3)
# 等待處理完成
time.sleep(1.0)
# print("testing Mark A")
# 檢查 Ring buffer 是否有正確的數據
self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE
self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE
a = stream_bridge_ring.get()
print(f"stream_bridge_ring: {a}")
# 檢查是否正確轉發
self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE
# print("testing Mark B")
# 停止管理器
self.manager.stop()
def test_error_handling(self):
"""測試錯誤處理情況"""
print("=== mark A ===")
# 創建一個會引發異常的 socket
class ErrorSocket(MockMavlinkSocket):
def recv_msg(self):
raise Exception("Test exception")
error_socket = ErrorSocket()
mavlink_obj_err = mavlink_object(error_socket)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(mavlink_obj_err)
print("=== mark B ===")
# 等待錯誤處理
time.sleep(1.0)
# 對象應該進入錯誤狀態,但不會崩潰
# self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR)
print("=== mark C ===")
# 管理器應該仍在運行
self.assertTrue(self.manager.running)
# 故意等一段時間 確認 socket 有被 manager 處理掉
time.sleep(3)
# 停止管理器
self.manager.stop()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動"""
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1),
MockMavlinkMessage(msg_id=32, sysid=2, compid=1)
])
socket3 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=3, compid=1),
MockMavlinkMessage(msg_id=33, sysid=3, compid=1)
])
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
manager.add_mavlink_object(obj1)
manager.add_mavlink_object(obj2)
manager.add_mavlink_object(obj3)
# 等待處理所有訊息
time.sleep(1.5)
# 檢查轉發結果
# socket1 應該收到 socket2 和 socket3 的訊息
self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3
# socket3 應該收到 socket1 和 socket2 的訊息
self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2
# 檢查 ring buffer 的數據
# 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
# 應該有 3 個 HEARTBEAT 訊息
self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT
# 停止管理器
manager.stop()
if __name__ == "__main__":
unittest.main(defaultTest="TestMavlinkObject.test_send_message")

@ -0,0 +1,152 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
from fc_network_adapter.ringBuffer import RingBuffer
def producer(buffer, count, interval=0.01):
"""生產者:向緩衝區添加資料"""
print(f"Producer started (thread {threading.get_ident()})")
for i in range(count):
# 嘗試寫入數據,直到成功
while not buffer.put(f"Item-{i}"):
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
time.sleep(0.1)
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
time.sleep(interval) # 模擬生產過程
print(f"Producer finished (thread {threading.get_ident()})")
def consumer(buffer, max_items, interval=0.05):
"""消費者:從緩衝區讀取資料"""
print(f"Consumer started (thread {threading.get_ident()})")
items_consumed = 0
while items_consumed < max_items:
# 嘗試讀取數據
item = buffer.get()
if item:
print(f"Consumed: {item}, buffer size: {buffer.size()}")
items_consumed += 1
else:
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
time.sleep(interval) # 模擬消費過程
print(f"Consumer finished (thread {threading.get_ident()})")
def batch_consumer(buffer, interval=0.2):
"""批量消費者:一次性讀取緩衝區中的所有資料"""
print(f"Batch consumer started (thread {threading.get_ident()})")
for _ in range(5): # 執行5次批量讀取
time.sleep(interval) # 等待緩衝區積累數據
items = buffer.get_all()
if items:
print(f"Batch consumed {len(items)} items: {items}")
else:
print("Buffer empty for batch consumer")
print(f"Batch consumer finished (thread {threading.get_ident()})")
def demonstrate_multi_writer():
"""示範多個寫入執行緒同時操作緩衝區"""
print("\n=== Demonstrating Multiple Writers ===")
buffer = RingBuffer(capacity=80)
# 創建多個生產者執行緒
threads = []
for i in range(3):
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
buffer.print_stats() # 印出統計資訊
# 讀出所有剩餘資料
remaining = buffer.get_all()
print(f"Remaining items in buffer after multiple writers: {remaining}")
def demonstrate_basic_usage():
"""示範基本使用方式"""
print("\n=== Demonstrating Basic Usage ===")
# 創建緩衝區
buffer = RingBuffer(capacity=20, buffer_id=7)
# 檢查初始狀態
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 添加幾個項目
for i in range(5):
buffer.put(f"Test-{i}")
# 檢查狀態
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 讀取一個項目
item = buffer.get()
print(f"Read item: {item}")
print(f"After reading 1 item - Content Size: {buffer.size()}")
# 添加更多項目直到滿
items_added = 0
while not buffer.is_full():
buffer.put(f"Fill-{items_added}")
items_added += 1
print(f"Added {items_added} more items until full")
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 嘗試添加到已滿的緩衝區
result = buffer.put("Overflow")
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
# 獲取所有項目
all_items = buffer.get_all()
print(f"All items in buffer: {all_items}")
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
# 印出統計資訊
buffer.print_stats()
def demonstrate_producer_consumer():
"""示範生產者-消費者模式"""
print("\n=== Demonstrating Producer-Consumer Pattern ===")
buffer = RingBuffer(capacity=16)
# 創建生產者和消費者執行緒
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
# 啟動執行緒
producer_thread.start()
consumer_thread.start()
batch_thread.start()
# 等待執行緒完成
producer_thread.join()
consumer_thread.join()
batch_thread.join()
# 檢查最終狀態
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
buffer.print_stats()
if __name__ == "__main__":
# 展示各種使用場景
# demonstrate_basic_usage()
# demonstrate_producer_consumer()
demonstrate_multi_writer()
print("\nAll demonstrations completed!")

@ -1,33 +0,0 @@
import socket
import os
# socket file path
SOCKET_PATH = "/tmp/unix_socket_example"
# 若檔案存在就先刪除
if os.path.exists(SOCKET_PATH):
os.remove(SOCKET_PATH)
# 建立 socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# 綁定 socket 到檔案
server.bind(SOCKET_PATH)
server.listen(1)
print("🔌 Server waiting for connection...")
# 等待 client 連線
conn, _ = server.accept()
print("✅ Client connected.")
while True:
data = conn.recv(1024)
if not data:
break
print("📥 Received:", data.decode())
conn.sendall(b"Echo: " + data)
conn.close()
server.close()
os.remove(SOCKET_PATH)

@ -0,0 +1,94 @@
import socket
import os
# # socket file path
# SOCKET_PATH = "/tmp/unix_socket_example"
# # 若檔案存在就先刪除
# if os.path.exists(SOCKET_PATH):
# os.remove(SOCKET_PATH)
# # 建立 socket
# server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# # 綁定 socket 到檔案
# server.bind(SOCKET_PATH)
# server.listen(1)
# print("🔌 Server waiting for connection...")
# # 等待 client 連線
# conn, _ = server.accept()
# print("✅ Client connected.")
# while True:
# data = conn.recv(1024)
# if not data:
# break
# print("📥 Received:", data.decode())
# conn.sendall(b"Echo: " + data)
# conn.close()
# server.close()
# os.remove(SOCKET_PATH)
# =====================
# from pymavlink import mavutil
# def create_unix_socket_connection():
# # 建立一個 mavtcpin 實例
# mav_conn = mavutil.mavtcpin("127.0.0.250:9999", source_system=1, source_component=1)
# # 替換底層 socket
# # 關閉原有的 socket
# mav_conn.listen.close()
# # 創建 Unix socket 並替換
# unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# unix_socket_path = "/tmp/unix_socket_mavlink.sock"
# # 確保先前的 unix socket 已被移除
# if os.path.exists(unix_socket_path):
# os.remove(unix_socket_path)
# # 綁定並設定 Unix socket
# unix_socket.bind(unix_socket_path)
# unix_socket.listen(1)
# unix_socket.setblocking(0)
# mavutil.set_close_on_exec(unix_socket.fileno())
# # 替換 listen socket
# mav_conn.listen = unix_socket
# mav_conn.fd = unix_socket.fileno()
# # mav_conn.port = unix_socket
# return mav_conn
# a = create_unix_socket_connection()
# # print(a.port)
# ===============================
import mavunixin
mav_conn = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1)
import time
print("🔌 Server waiting for connection...")
while True:
time.sleep(1)
mavlinkMsg = mav_conn.recv_msg()
if mavlinkMsg is not None:
print("📥 Received:", mavlinkMsg)
# a.send(mavlinkMsg)
# print("📤 Server replied:", data.decode())
else:
print("No message received.")

@ -0,0 +1,109 @@
import curses
import serial
import struct
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
PORT = "COM5" # or "/dev/ttyUSB0"
BAUD = 57600
target_system = 5
target_component = 1
MAV_CMD_DO_SET_MODE = 176
mode_list = [
("STABILIZE", 0),
("AUTO", 3),
("GUIDED", 4),
("LOITER", 5)
]
ser = serial.Serial(PORT, BAUD)
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
def build_api_tx_frame(data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def curses_main(stdscr):
curses.curs_set(0)
selected = 0
while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
stdscr.addstr(1, 2, "模式選單(使用:箭頭選擇Enter:發送q:離開)")
for i, (name, _) in enumerate(mode_list):
if i == selected:
stdscr.attron(curses.A_REVERSE)
stdscr.addstr(i + 3, 4, f"> {name}")
stdscr.attroff(curses.A_REVERSE)
else:
stdscr.addstr(i + 3, 4, f" {name}")
key = stdscr.getch()
if key == curses.KEY_UP:
selected = (selected - 1) % len(mode_list)
elif key == curses.KEY_DOWN:
selected = (selected + 1) % len(mode_list)
elif key in [10, 13]: # Enter
name, custom_mode = mode_list[selected]
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
msg = mav.command_long_encode(
target_system=target_system,
target_component=target_component,
command=MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1,
param2=custom_mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
print("msg =", msg) # 確認封包物件生成
mav.send(msg) # 改為 send() 會寫入 capture
print("RAW HEX:", capture.data.hex())
api_frame = build_api_tx_frame(capture.data)
ser.write(api_frame)
# 顯示封包資訊
msg_line = min(h - 4, len(mode_list) + 5)
stdscr.addstr(msg_line, 2, f"發送模式切換:{name} ({custom_mode})")
stdscr.addstr(msg_line + 1, 2, f"MAVLink HEX: {' '.join(f'{b:02x}' for b in capture.data)[:w-4]}")
stdscr.addstr(msg_line + 2, 2, f"XBee API HEX: {' '.join(f'{b:02x}' for b in api_frame)[:w-4]}")
stdscr.refresh()
curses.napms(1500)
elif key in [ord('q'), ord('Q')]:
break
stdscr.refresh()
try:
curses.wrapper(lambda stdscr: curses_main(stdscr)) # 使用 lambda 函數來傳遞 ser
except Exception as e:
print(f"發生錯誤: {e}")
finally:
ser.close()
print("程式結束,串口已關閉")

@ -0,0 +1,60 @@
from xbee import XBee
import serial
from pymavlink import mavutil
import time
# 配置 XBee 串口連接
ser = serial.Serial('COM5', 57600) # XBee 串口
# 無人機系統 ID假設為 5和目標系統 ID假設為 1
system_id = 5
target_system_id = 1 # 設定目標系統 ID這裡假設目標系統 ID 是 1
# 目標設備的 64 位地址(請替換為實際的無人機地址)
target_address = b'\x00\x13\xa2\x00\x40\x5f\x88\x56' # 例如00 13 A2 00 40 5F 88 56
def decode_mavlink_data(data,master):
"""解碼 MAVLink 的原始數據並處理 HEARTBEAT 訊息"""
try:
msg = master.parse_char(data)
if msg:
if msg.get_type() == "HEARTBEAT":
print(f"Raw MAVLink Data (Hex): {data.hex()}")
print(f"Received MAVLink message: {msg}")
print(f"System status: {msg.system_status}")
print(f"Flight mode: {mavutil.mode_string_v10(msg)}")
print(f"System ID: {msg.get_srcSystem()}") # 使用 get_srcSystem() 獲取 system_id
except Exception as e:
print(f"Failed to decode MAVLink data: {e}")
def receive_packets(ser):
xbee = XBee(ser)
# 創建 MAVLink 解析器並與 XBee 串口連接
master = mavutil.mavlink.MAVLink(ser)
while True:
try:
# 從 XBee 接收數據
xbee_data = xbee.wait_read_frame()
# 讀取 `rf_data` 而非 `payload`
raw_data = xbee_data.get('rf_data', None)
if raw_data is None:
print("Warning: No 'rf_data' found in received XBee data!")
continue # 跳過此次循環
# 解碼 MAVLink 訊息
decode_mavlink_data(raw_data,master)
# 根據需要觸發模式切換,例如根據用戶輸入更改模式
except KeyboardInterrupt:
print("Exiting...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(0.1)
Loading…
Cancel
Save