Compare commits

...

18 Commits

Author SHA1 Message Date
wenchun 303dfaf04a 新增 devRun.py 測試檔案 11 months ago
wenchun cd3c0f5bee Merge branch 'chiyu' into wenchun 11 months ago
Chiyu Chen 6fab72ac97 重大更新 :
1. 修正不同 socket 轉傳間效能低落問題
於 mavlinkObject.py
重構 mavlink_object
增加 async_io_manager
(版本目標功能調整 寫於程式碼註解中)
2. 調整各個程式碼的檔案位置 讓測試用程式與專案程式分離
11 months ago
Chiyu Chen 7f7753d0b4 1. (中繼的存檔) 準備對 mavlinkObject.py 的 mavlink_object 重構
2. 新增 ringBuffer.py 作為 queue 取代
3. 新增 tests 資料夾分離程式與測試檔案
4. asyncioManager.py 是參考用的範例
11 months ago
Chiyu Chen 417d9e8f57 1. 新增 serial_udp_bitrans.py 該程式連結 serial 的封包 經過轉換後 再向 udp 口丟出 (反之亦然)
2. 其餘檔案只是 名稱與註解修正
12 months ago
Chiyu Chen 5769f9ab3b XMerge branch 'lunu' into chiyu 12 months ago
lenting1027 b1109ad102 XBee解讀heartbeat資訊 1 year ago
lenting1027 7073eaf3e6 AT XBee傳送並接收字串訊息 1 year ago
lenting1027 f465b44b79 API XBee傳送並接收字串訊息 1 year ago
lenting1027 5f73324a34 API XBee傳送字串訊息 1 year ago
lenting1027 d312f644c8 XBee解讀heartbeat資訊 1 year ago
lenting1027 e4919cfe78 XBee封包轉換(change mode) 1 year ago
lenting1027 cdf139f4e8 更新「src/unitdev04/change_mode.py」 1 year ago
lenting1027 6c1441d0c6 Mavlink and XBee packet transfer 1 year ago
lenting1027 df2a459070 更新「src/unitdev04/change_mode.py」 1 year ago
lenting1027 716b3726c2 Mavlink and XBee packet transfer 1 year ago
lenting1027 21dce163ed 刪除「src/unitdev04/change_mode.py」 1 year ago
lenting1027 a222514ca5 上傳檔案到「src/unitdev04」
XBee
1 year ago

@ -11,7 +11,8 @@
Python
1. pymavlink
2.
2. conda-forge 中的 pyserial-asyncio
3.
ROS2
1. source ~/ros2_humble/install/setup.bash

@ -1,531 +0,0 @@
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
import mavlinkObject as mo
import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 51
running_time = 30
print('test_item : ', test_item)
if test_item == 1:
# 測試 mavlink_object 中 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
mavlink_object_none = mo.mavlink_object(None)
print('=====================')
print('正常範例')
mavlink_object_none.multiplexingToAnalysis = []
mavlink_object_none.multiplexingToReturn = []
# mavlink_object_none.multiplexingToSwap = [[]]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 長度不一致')
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('=====================')
print('正常範例')
mo.swap_queues.append(queue.Queue())
mo.swap_queues.append(queue.Queue())
mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5]
mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10]
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 multiplexingToAnalysis 不可以有 -1')
mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('<=== End of Program')
elif test_item == 2:
# 測試 mavlink_object 創建時 socket_id 是否正確
# 說實在 這個測試項 似乎因為 python 的 GC 機制 會導致難以測試
print('===> Start of Program .Test ', test_item)
mavlink_object_none1 = mo.mavlink_object(None)
mavlink_object_none2 = mo.mavlink_object(None)
mavlink_object_none3 = mo.mavlink_object(None)
del mavlink_object_none2
print(len(mo.swap_queues))
mavlink_object_none2 = mo.mavlink_object(None)
print(len(mo.swap_queues))
print(mavlink_object_none1._multiplexingList)
print('<=== End of Program')
elif test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('===> Start of Program .Test ', test_item)
# 創建一個空的通道 這個通道的 socket_id 是 0
mavlink_object_none = mo.mavlink_object(None)
# 創建另一個通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket)
# 設定通道轉發的參數
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD
# print(mavlink_object1.multiplexingToSwap)
# print(mo.swap_queues)
# 啟動通道
mavlink_object1.run()
# 確認轉拋的設定有沒有錯
print("_multiplexingList for mavlink object :", mavlink_object1._multiplexingList)
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < running_time:
while not mo.fixed_stream_bridge_queue.empty():
print('fixed_stream_bridge_queue:')
t = mo.fixed_stream_bridge_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not mo.return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = mo.return_packet_processor_queue.get()
print('from {} : {}'.format(t[0],t[2]))
# print(t[2].get_msgbuf())
# print(t[2].type)
for n in range(len(mo.swap_queues)):
q = mo.swap_queues[n]
while not q.empty():
print('swap_queues:')
t = q.get()
print('{} gets : {}'.format(n,t))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能
print('===> Start of Program .Test ', test_item)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 創建通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
# mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
# mavlink_object2.multiplexingToReturn = []
# 啟動連線的模組
mavlink_object2.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(2)
print("=== connection established! ===")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
compid = 1
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 初始化輸入通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
# 初始化輸出通道
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 做一個空的通道驗證 可以拿來 debug
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(3)
print("=== connection established! ===")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# print(type(mavlink_socket_out))
# print(type(mavlink_socket_out.mav))
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_in.close()
mavlink_socket_out.close()
analyzer.stop()
print('<=== End of Program')
elif test_item == 20:
# 這邊測試 node 生成 topic 的功能
# 利用 空的通道 發出假的 heartbeat 封包
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
from pymavlink.dialects.v20 import common as mavlink2
sysid = 1
compid = 1
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
mav = mavlink2.MAVLink(None)
mav.srcSystem = sysid
mav.srcComponent = compid
# 這是一個 heartbeat 封包
fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode=3,
custom_mode=0,
system_status=0,
mavlink_version=3
)
mavlink_bytes = fake_heartbeat.pack(mav)
fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
print(analyzer.mavlink_systems)
mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
time.sleep(0.1)
print(analyzer.mavlink_systems)
# ROS2 初始化
analyzer._init_node()
# 創建 ROS2 topic
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
print("topic created")
time.sleep(5)
# 丟出 topic
analyzer.emit_info()
# 結束程式
analyzer.running = False
analyzer.destroy_node()
rclpy.shutdown()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
# 啟動通道
mavlink_object3.run()
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 22:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 初始化兩個通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = []
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
show_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
show_time2 = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time2) >= 1:
analyzer.emit_info() # 這邊是測試 node 的運行
# sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
sss = mavutil.mode_string_v10(fmsg)
# print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
print("目前的飛行模式 : {}".format(sss))
show_time2 = time.time()
# if (time.time() - show_time) >= 2:
# show_time = time.time()
# for sysid in analyzer.mavlink_systems:
# for compid in analyzer.mavlink_systems[sysid].components:
# print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# print("===================")
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_socket_in.close()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_out.close()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 51:
# 晉凱的測試項目
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket3 = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# 設定通道流動
mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
mavlink_object3.multiplexingToReturn = [] #
# mavlink_object3.multiplexingToSwap = [] #
# 啟動通道
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create all topic: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket3.close()
print('<=== End of Program')

@ -1,6 +1,6 @@
# 自定義的 import
from theLogger import setup_logger
from .theLogger import setup_logger
# ====================== 分割線 =====================
@ -31,6 +31,11 @@ class mavlink_device():
# p_str += '=====================\n'
return p_str
'''
寫了半天 這個功能應該是 pymalink 本來就有的
去找 pymavlink_util.py mavfile(object)
算了 先擺著吧 之後再看看怎麼整合
'''
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:

@ -1,8 +1,7 @@
'''
# 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到橋接器
# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流
# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue
# 匯流排接到訊息後透過 ring buffer 傳送到橋接器
# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器
# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排
@ -13,8 +12,11 @@
# 基礎功能的 import
import threading
import queue
# import queue
import time
import asyncio
from enum import Enum, auto
# from typing import Dict, List, Optional, Set, Any, Tuple
# mavlink 的 import
from pymavlink import mavutil
@ -23,38 +25,17 @@ from pymavlink import mavutil
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
from .mavlinkDevice import mavlink_device, mavlink_systems
from .mavlinkPublish import mavlink_publisher
from .theLogger import setup_logger
from .ringBuffer import RingBuffer
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device, mavlink_systems
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
fixed_stream_bridge_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer
stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255)
return_packet_ring = RingBuffer(capacity=256, buffer_id=254)
# ====================== 分割線 =====================
@ -75,6 +56,8 @@ class mavlink_bridge(Node, mavlink_publisher):
node 區塊則是處理 ros2 publisher subscriber 訂閱相關
藉由控制 ros2 的機制再把 device object 的資訊發送出去
fixed_stream_bridge_queue 置換成 stream_bridge_ring
ps. 我限制了這個 class 只能有一個 instance
'''
_instance = None
@ -106,13 +89,25 @@ class mavlink_bridge(Node, mavlink_publisher):
# === Thread 區塊 ===
def _run_thread(self):
# start_time = time.time() # debug
start_time = time.time() # debug
run_loop_count = 0 # debug
logger.info('Start of mavlink_bridge._run_thread')
# 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
# 從 Queue stream_bridge_ring 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_bridge_queue.empty():
# # 這個迴圈每秒鐘被執行了幾輪? # 這整段都是 debug 用的
# current_time = time.time()
# if (current_time - start_time) >= 1.0:
# logger.info(f'mavlink_bridge._run_thread loop count: {run_loop_count}')
# run_loop_count = 0
# start_time = current_time
# else:
# run_loop_count += 1
# # =========================
if stream_bridge_ring.is_empty():
continue
msg_pack = fixed_stream_bridge_queue.get()
msg_pack = stream_bridge_ring.get()
msg = msg_pack[2]
sysid = msg.get_srcSystem()
@ -150,7 +145,7 @@ class mavlink_bridge(Node, mavlink_publisher):
this_component.emitParams['flightMode'] = msg # debug
# print("mav_type : ", msg.type) # debug
# print("get mode :", mavutil.mode_string_v10(msg)) # debug
print("get mode :", mavutil.mode_string_v10(msg)) # debug
# print("record mode :", this_component.emitParams['flightMode_mode']) # debug
elif msg_id == 30: # ATTITUDE 處理
@ -201,24 +196,45 @@ class mavlink_bridge(Node, mavlink_publisher):
# ====================== 分割線 =====================
class mavlink_object():
'''
本次改版的暫時記錄註解於此
1. mavlink_object 由於 Queue 的效能太差 會完全移除
其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現
multiplexingToSwap 會完全被移除代替方式下一條描述
2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流
(註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object)
因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移
並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出
3. mavlink_object 需要加上 state 去管理其狀態
4. mavlink_object 需要加上 target port 去管理寫出的目標
5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道
6. 基於第1,2 updateMultiplexingList 會被完全移除
7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object
8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring return_packet_ring 來取代 另外 swap_queues 會被完全移除
'''
# 定義 mavlink_object 的狀態
class MavlinkObjectState(Enum):
INIT = auto() # 初始化狀態
RUNNING = auto() # 運行中狀態
ERROR = auto() # 錯誤狀態
CLOSED = auto() # 已關閉狀態
class mavlink_object:
'''
每個 mavlink bus 都會有一個 mavlink_object
其中主要是 thread 做統計封包與分流
分流表的控制在三個 list 分別為
multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的
multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的
multiplexingToSwap : 這個 list 是用來分流到轉拋串流的
透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為
使用 asyncio 處理資料流 RingBuffer 來分配訊息
直接透過 socket 寫出
'''
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
# 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號
# socket_id 從 0 開始
instance = super().__new__(cls)
socket_id = 0
for k in cls.mavlinkObjects.keys():
@ -232,183 +248,389 @@ class mavlink_object():
return instance
def __init__(self, socket):
# 登入所需的 socket
self.mavlink_socket = socket
# 這邊變數是執行的時候被使用的 不要直接寫入它
self._multiplexingList = []
# 存放要發送的訊息的 queue 或稱 buffer
self.output_buffer = queue.Queue()
if self.socket_id >= len(swap_queues):
swap_queues.append(self.output_buffer)
else:
swap_queues[self.socket_id] = self.output_buffer
# 存放目前是否分流到固定串流橋接器和回傳封包處理器的標誌 # 都是要做的 把這個判斷拿掉
# self.send_to_bridge = True
# self.send_to_return = False
# 用於主線程發送消息的緩衝區
self.outgoing_msg_lock = threading.Lock()
self.outgoing_msgs = []
# 記錄訊息過濾類型 (可選)
self.bridge_msg_types = [0] # 默認只處理 HEARTBEAT
self.return_msg_types = []
# 目標端口
self.target_sockets = []
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 這三個 list 用來分配不同的訊息到不同的 queue
self.multiplexingToAnalysis = [
0, # HEARTBEAT # 挺必要的項目
# 24, # GPS_RAW_INT
# 30, # ATTITUDE
# 33, # GLOBAL_POSITION_INT
# 74, # VFR_HUD
# 147, # BATTERY_STATUS
]
self.multiplexingToReturn = []
self.multiplexingToSwap = [
[] for _ in range(len(swap_queues))
]
# 刷新其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id):
object.multiplexingToSwap.append([])
object.updateMultiplexingList()
logger.info('mavlink_object instance {} created'.format(self.socket_id))
# 物件變數
self.task = None # Task reference
self.dirtyDataCount = 0 # 髒資料計數器
self.dirtyDataMax = 10 # 髒資料容許閾值
self.state = MavlinkObjectState.INIT
logger.info(f'mavlink_object instance {self.socket_id} created')
def __del__(self):
# 停下自己的 thread
if self.mavlink_socket != None:
self.mavlink_socket.close()
self.stop()
# 移除其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id):
object.multiplexingToSwap[self.socket_id] = []
object.updateMultiplexingList()
# 移除自己的 swap_queues
swap_queues[self.socket_id] = None
# 停止 asyncio task
self.stop()
# 關閉 socket
if hasattr(self, 'mavlink_socket') and self.mavlink_socket:
try:
self.mavlink_socket.close()
except:
pass
# 處理 class 的 instance 記錄
self.socket_num -= 1
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
# logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替
# print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
if hasattr(self.__class__, 'socket_num'):
self.__class__.socket_num -= 1
if hasattr(self.__class__, 'mavlinkObjects') and hasattr(self, 'socket_id'):
if self.socket_id in self.__class__.mavlinkObjects:
self.__class__.mavlinkObjects.pop(self.socket_id)
# 這段不知道怎麼了 反正會一直讓 logger ERROR 我先關掉
# try:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# logger.info(f'mavlink_object instance {self.socket_id} deleted')
# except Exception as e:
# print(f"Error logging in __del__: {e}")
# if 'logger' in globals() and logger:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
def run(self):
# TODO 檢查 socket 是否有連線
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
self.thread.start()
def start(self):
"""啟動 mavlink_object 處理循環"""
if self.state == MavlinkObjectState.RUNNING:
logger.warning(f"mavlink_object {self.socket_id} is already running")
return
self.state = MavlinkObjectState.RUNNING
# 實際的啟動會由 async_io_manager 處理
def stop(self):
self.running = False
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
# 處理接收到的封包
"""停止 mavlink_object 處理循環"""
if self.state in (MavlinkObjectState.CLOSED, MavlinkObjectState.ERROR):
return
self.state = MavlinkObjectState.CLOSED
if self.task:
if not self.task.done():
self.task.cancel()
async def process_data(self):
"""處理 mavlink 數據的主要 asyncio 協程"""
logger.info(f'Start of mavlink_object.process_data id: {self.socket_id}')
while self.state == MavlinkObjectState.RUNNING:
timestamp = time.time()
try:
# 處理接收到的封包
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)
self.running = False
break
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...'
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# break # Debug function can put here you can see the input data from mavlink
# 將訊息依照 multiplexing list 分發到不同的 queue
for i in range(len(self._multiplexingList)):
if (self._multiplexingList[i] == []):
continue
elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1):
if i == 0:
fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1:
return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else:
_queue = swap_queues[i-2]
# _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包
_queue.put(mavlinkMsg)
# 處理要送出的封包
# 如果 有資料在 output_buffer 中則將其取出並發送
# 沒有就略過發送
logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}")
self.dirtyDataCount += 1
if self.dirtyDataCount >= self.dirtyDataMax:
logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state")
self.state = MavlinkObjectState.ERROR
# 不直接退出,嘗試容忍髒數據
await asyncio.sleep(0.01) # 短暫暫停
continue
try:
mavlinkMsg_send = self.output_buffer.get(block=False)
except queue.Empty:
mavlinkMsg_send = None
# except Exception as e:
# logger.error(f"Error getting data from output_buffer: {e}")
# mavlinkMsg_send = None
if mavlinkMsg_send:
# self.mavlink_socket.mav.send(mavlinkMsg_send)
self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中
# thread 結束
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
def updateMultiplexingList(self):
'''
更新 multiplexing list 並做簡單的檢查
'''
# 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值
check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn)
if check:
logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id))
return False
# 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0
check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0
if check:
logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id))
if mavlinkMsg:
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
self.mavlink_systems[sysid].updateComponentPacketCount(
compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) #TODO 這邊怪怪的 好像要再檢查
# 分發訊息到 RingBuffer
msg_id = mavlinkMsg.get_msgId()
# if self.send_to_bridge and (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types):
if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types):
stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg))
# if self.send_to_return and (msg_id in self.return_msg_types or -1 in self.return_msg_types):
if (msg_id in self.return_msg_types or -1 in self.return_msg_types):
return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg))
# 將接收到的訊息轉發給所有目標端口
for target_port in self.target_sockets:
if target_port != self.socket_id and target_port in self.mavlinkObjects:
target_obj = self.mavlinkObjects[target_port]
if target_obj.state == MavlinkObjectState.RUNNING:
try:
target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf())
except Exception as e:
logger.error(f"Error forwarding message to port {target_port}: {e}")
with self.outgoing_msg_lock:
if self.outgoing_messages and (send_msg := self.outgoing_messages.pop(0)):
try:
self.mavlink_socket.write(send_msg)
except Exception as e:
logger.error(f"mavlink_object {self.socket_id} failed to send message: {e}")
# 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權
await asyncio.sleep(0.001)
except asyncio.CancelledError:
logger.info(f'mavlink_object.process_data for id {self.socket_id} was cancelled')
break
except Exception as e:
await asyncio.sleep(0.01) # 短暫暫停避免CPU過載
logger.info(f'End of mavlink_object.process_data id: {self.socket_id}')
self.state = MavlinkObjectState.CLOSED
def add_target_socket(self, target_socket_id):
"""添加一個目標端口用於轉發"""
if target_socket_id not in self.target_sockets and target_socket_id != self.socket_id:
self.target_sockets.append(target_socket_id)
logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}")
return True
return False
def remove_target_socket(self, target_socket_id):
"""移除目標端口"""
if target_socket_id in self.target_sockets:
self.target_sockets.remove(target_socket_id)
logger.info(f"mavlink_object Removed target port {target_socket_id} from mavlink_object {self.socket_id}")
return True
return False
def set_bridge_message_types(self, msg_types):
"""設置需要分流到橋接器的訊息類型"""
if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types):
self.bridge_msg_types = msg_types
return True
logger.error(f"Invalid bridge message types: {msg_types}")
return False
def set_return_message_types(self, msg_types):
"""設置需要分流到回傳處理器的訊息類型"""
if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types):
self.return_msg_types = msg_types
return True
logger.error(f"Invalid return message types: {msg_types}")
return False
def send_message(self, message_bytes):
"""
從主線程向此 mavlink_object socket 發送數據
將數據添加到簡單的列表中 asyncio 任務處理
Args:
message_bytes: 要發送的 mavlink 消息的字節數據
Returns:
bool: 是否成功添加消息到列表
"""
if self.state != MavlinkObjectState.RUNNING:
logger.warning(f"Cannot send message: mavlink_object {self.socket_id} is not running")
return False
# 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋
try:
self.multiplexingToSwap[self.socket_id] = []
except IndexError:
logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id))
return False
# 組合 multiplexing list
multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap
# 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp)
if not check:
logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id))
return False # 若有錯誤則回傳 False
# 更新 multiplexing list
self._multiplexingList = multiL_tmp
# 使用鎖保護共享資源訪問
with self.outgoing_messages_lock:
self.outgoing_messages.append(message_bytes)
return True
except Exception as e:
logger.error(f"Error queueing message for mavlink_object {self.socket_id}: {e}")
return False
# def enable_bridge(self, enable=True):
# """啟用或禁用橋接器分流"""
# self.send_to_bridge = enable
# def enable_return(self, enable=True):
# """啟用或禁用回傳處理器分流"""
# self.send_to_return = enable
class async_io_manager:
"""
管理所有 mavlink_object 實例的 asyncio 任務
提供單一線程來處理所有 mavlink 通道的數據
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super(async_io_manager, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.initialized = True
self.loop = None
self.main_task = None
self.running = False
self.managed_objects = {} # socket_id: task
def start(self):
"""啟動 async_io_manager 和其管理的所有 mavlink_object"""
if self.running:
logger.warning("async_io_manager already running")
return
self.running = True
self.thread = threading.Thread(target=self._run_event_loop)
self.thread.start()
logger.info("async_io_manager started")
def stop(self):
"""停止 async_io_manager 和其管理的所有 mavlink_object"""
if not self.running:
return
self.running = False
if self.loop:
# 取消所有任務
for socket_id in list(self.managed_objects.keys()):
self.remove_mavlink_object(socket_id)
# 停止事件循環
if self.main_task and not self.main_task.done():
asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop)
# 等待線程結束
if hasattr(self, 'thread') and self.thread.is_alive():
self.thread.join(timeout=5.0)
logger.info("async_io_manager stopped")
async def _shutdown(self):
"""正確關閉事件循環的協程"""
tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
def _run_event_loop(self):
"""在獨立線程中運行事件循環"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self.main_task = self.loop.create_task(self._main_task())
self.loop.run_forever()
except Exception as e:
logger.error(f"Error in async_io_manager event loop: {e}")
finally:
self.loop.close()
self.loop = None
self.running = False
logger.info("async_io_manager event loop ended")
async def _main_task(self):
"""主任務協程,用於監視和管理其他任務"""
logger.info("async_io_manager main task started")
try:
while self.running:
# 這邊就是一個無窮迴圈 確保 async_io_manager 物件續存
# 每秒鐘檢查並移除已完成或已取消的任務
await asyncio.sleep(1.0)
for socket_id in list(self.managed_objects.keys()):
task = self.managed_objects[socket_id]
if task.done():
try:
exc = task.exception()
if exc:
logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}")
except (asyncio.CancelledError, asyncio.InvalidStateError):
pass
if socket_id in mavlink_object.mavlinkObjects:
print(f"this is manager, i make socket_id {socket_id} closed, he is now in {mavlink_object.mavlinkObjects[socket_id].state} state.") # debug
mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED
del self.managed_objects[socket_id]
except asyncio.CancelledError:
logger.info("async_io_manager main task was cancelled")
except Exception as e:
logger.error(f"Error in async_io_manager main task: {e}")
logger.info("async_io_manager main task ended")
def add_mavlink_object(self, mavlink_obj):
"""添加一個 mavlink_object 實例到 async_io_manager 並啟動其處理任務"""
if not self.running:
logger.error("Cannot add mavlink_object: async_io_manager is not running")
return False
if not isinstance(mavlink_obj, mavlink_object):
logger.error(f"Invalid mavlink_object: {mavlink_obj}")
return False
socket_id = mavlink_obj.socket_id
if socket_id in self.managed_objects:
logger.warning(f"mavlink_object {socket_id} already managed")
return False
# 創建並啟動任務
if self.loop:
# task = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop).result()
future = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop)
# print(f"Task created for mavlink_object {socket_id}: {future1}") # debug
# task = future1.result()
self.managed_objects[socket_id] = future
mavlink_obj.task = future
mavlink_obj.state = MavlinkObjectState.RUNNING
logger.info(f"Added mavlink_object {socket_id} to async_io_manager")
return True
return False
def remove_mavlink_object(self, socket_id):
"""從 async_io_manager 中移除一個 mavlink_object 實例並取消其處理任務"""
if socket_id not in self.managed_objects:
logger.warning(f"mavlink_object {socket_id} not managed by async_io_manager")
return False
# 取消任務
task = self.managed_objects[socket_id]
if not task.done():
task.cancel()
# 從管理列表中移除
del self.managed_objects[socket_id]
# 更新 mavlink_object 狀態
if socket_id in mavlink_object.mavlinkObjects:
mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED
logger.info(f"Removed mavlink_object {socket_id} from async_io_manager")
return True
def get_managed_objects(self):
"""獲取所有被管理的對象列表"""
return list(self.managed_objects.keys())
# ====================== 分割線 =====================
# 整合到 ros2 之後的程式進入點
def main_node():
pass
if __name__ == '__main__':
pass

@ -9,14 +9,16 @@
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
# ROS2 的 import
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
from theLogger import setup_logger
import math
# 自定義的 import
from .theLogger import setup_logger
logger = setup_logger("mavlinkPublish.py")

@ -0,0 +1,231 @@
# import array
import threading
import ctypes
from typing import Any, Optional, Tuple
class RingBuffer:
"""
高效能無鎖環形緩衝區使用原子操作避免鎖定
用於在不同執行緒間高效傳遞資料
"""
# 緩衝區計數器,用於自動分配 buffer_id
_buffer_counter = 0
_counter_lock = threading.Lock()
def __init__(self, capacity: int = 256, buffer_id: int = None):
"""
初始化環形緩衝區
Args:
capacity: 緩衝區容量 (必須是 2 的次方)
buffer_id: 緩衝區 ID如果為 None 則自動分配
"""
# 確保容量是 2 的次方,便於使用位運算取模
if capacity & (capacity - 1) != 0:
# 找到大於等於 capacity 的最小 2 的次方
capacity = 1 << (capacity - 1).bit_length()
# 分配緩衝區 ID
if buffer_id is None:
with RingBuffer._counter_lock:
buffer_id = RingBuffer._buffer_counter
RingBuffer._buffer_counter += 1
self.buffer_id = buffer_id
self.capacity = capacity
self.mask = capacity - 1 # 用於快速取模
self.buffer = [None] * capacity
# 原子計數器作為讀寫指標
self.write_index = ctypes.c_uint64(0)
self.read_index = ctypes.c_uint64(0)
# 用於檢測上次操作的執行緒 ID
self._last_read_thread = None
self._last_write_thread = None
# 添加同時讀寫統計
self.concurrent_write_count = ctypes.c_uint64(0) # 同時寫入計數
self.concurrent_read_count = ctypes.c_uint64(0) # 同時讀取計數
self.total_write_count = ctypes.c_uint64(0) # 總寫入操作計數
self.total_read_count = ctypes.c_uint64(0) # 總讀取操作計數
self.overflow_count = ctypes.c_uint64(0) # 緩衝區溢出次數
# 記錄各執行緒的操作次數
self.thread_write_counts = {} # {thread_id: count}
self.thread_read_counts = {} # {thread_id: count}
# 用於保護統計數據的鎖(僅用於統計,不影響主要讀寫操作)
self._stats_lock = threading.Lock()
def put(self, item: Any) -> bool:
"""
將項目存入緩衝區
Args:
item: 要存入的項目
Returns:
bool: 成功寫入返回 True緩衝區已滿返回 False
"""
# 更新寫入統計
self.total_write_count.value += 1
# 檢測上次寫入的執行緒
current_thread = threading.get_ident()
# 記錄當前執行緒寫入次數
with self._stats_lock:
self.thread_write_counts[current_thread] = self.thread_write_counts.get(current_thread, 0) + 1
# 檢測是否為不同執行緒同時寫入
if self._last_write_thread is not None and current_thread != self._last_write_thread:
self.concurrent_write_count.value += 1
self._last_write_thread = current_thread
# 原子獲取當前寫入位置
current = self.write_index.value
next_pos = (current + 1) & self.mask
# 檢查緩衝區是否已滿
if next_pos == self.read_index.value:
self.overflow_count.value += 1
return False
# 寫入資料並原子更新寫入位置
self.buffer[current] = item
self.write_index.value = next_pos
return True
def get(self) -> Optional[Any]:
"""
從緩衝區讀取項目
Returns:
項目或 None如果緩衝區為空
"""
# 更新讀取統計
self.total_read_count.value += 1
# 檢測上次讀取的執行緒
current_thread = threading.get_ident()
# 記錄當前執行緒讀取次數
with self._stats_lock:
self.thread_read_counts[current_thread] = self.thread_read_counts.get(current_thread, 0) + 1
# 檢測是否為不同執行緒同時讀取
if self._last_read_thread is not None and current_thread != self._last_read_thread:
self.concurrent_read_count.value += 1
self._last_read_thread = current_thread
# 檢查緩衝區是否為空
if self.read_index.value == self.write_index.value:
return None
# 原子獲取當前讀取位置並讀取資料
current = self.read_index.value
item = self.buffer[current]
# 原子更新讀取位置
self.read_index.value = (current + 1) & self.mask
return item
def get_all(self) -> list:
"""
獲取緩衝區中的所有項目
Returns:
list: 所有可用項目的列表
"""
items = []
while True:
item = self.get()
if item is None:
break
items.append(item)
return items
def size(self) -> int:
# 返回目前緩衝區內項目數量
return (self.write_index.value - self.read_index.value) & self.mask
def is_empty(self) -> bool:
# 檢查緩衝區是否為空
return self.read_index.value == self.write_index.value
def is_full(self) -> bool:
# 檢查緩衝區是否已滿
return ((self.write_index.value + 1) & self.mask) == self.read_index.value
def clear(self) -> None:
"""清空緩衝區"""
self.read_index.value = self.write_index.value
def get_stats(self) -> dict:
"""
獲取緩衝區統計資訊
Returns:
dict: 包含各種統計數據的字典
"""
with self._stats_lock:
stats = {
"buffer_id": self.buffer_id,
"capacity": self.capacity,
"current_size": self.size(),
"is_full": self.is_full(),
"is_empty": self.is_empty(),
"total_writes": self.total_write_count.value,
"total_reads": self.total_read_count.value,
"concurrent_writes": self.concurrent_write_count.value,
"concurrent_reads": self.concurrent_read_count.value,
"overflow_count": self.overflow_count.value,
"write_threads": len(self.thread_write_counts),
"read_threads": len(self.thread_read_counts),
"concurrent_write_ratio": self.concurrent_write_count.value / max(1, self.total_write_count.value),
"concurrent_read_ratio": self.concurrent_read_count.value / max(1, self.total_read_count.value),
"top_writer_threads": sorted(self.thread_write_counts.items(), key=lambda x: x[1], reverse=True)[:3],
"top_reader_threads": sorted(self.thread_read_counts.items(), key=lambda x: x[1], reverse=True)[:3],
}
return stats
def print_stats(self) -> None:
"""輸出當前緩衝區統計資訊"""
stats = self.get_stats()
print(f"\n=== RingBuffer #{stats['buffer_id']} Statistics ===")
print(f"Capacity: {stats['capacity']}, Current Size: {stats['current_size']}")
print(f"Total Write Operations: {stats['total_writes']}")
print(f"Total Read Operations: {stats['total_reads']}")
print(f"Concurrent Write Events: {stats['concurrent_writes']} ({stats['concurrent_write_ratio']:.2%})")
print(f"Concurrent Read Events: {stats['concurrent_reads']} ({stats['concurrent_read_ratio']:.2%})")
print(f"Buffer Overflow Count: {stats['overflow_count']}")
print(f"Writing Threads: {stats['write_threads']}")
print(f"Reading Threads: {stats['read_threads']}")
print("Top Writer Threads:")
for thread_id, count in stats['top_writer_threads']:
print(f" Thread {thread_id}: {count} writes")
print("Top Reader Threads:")
for thread_id, count in stats['top_reader_threads']:
print(f" Thread {thread_id}: {count} reads")
print("=============================\n")
def reset_stats(self) -> None:
"""重置所有統計數據"""
with self._stats_lock:
self.concurrent_write_count.value = 0
self.concurrent_read_count.value = 0
self.total_write_count.value = 0
self.total_read_count.value = 0
self.overflow_count.value = 0
self.thread_write_counts.clear()
self.thread_read_counts.clear()
def __str__(self) -> str:
"""返回緩衝區的字符串表示"""
return f"RingBuffer(id={self.buffer_id}, capacity={self.capacity}, size={self.size()})"

@ -0,0 +1,340 @@
import asyncio
import serial_asyncio
# 附加驗證功能
import os
import sys
import serial
import signal
# XBee 模組
from xbee.frame import APIFrame
# =========================
SERIAL_PORT = '/dev/ttyACM0' # serial port
SERIAL_BAUDRATE = 57600 # serial baudrate
UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP
UDP_REMOTE_PORT = 14550 # UDP Target port
# 測試用 只會吃一次資料
DEBUG_MODE = False
# =========================
def check_serial_port():
"""檢查串口是否存在與可用"""
# 檢查設備是否存在
if not os.path.exists(SERIAL_PORT):
print(f"錯誤:串口設備 {SERIAL_PORT} 不存在")
return False
# 檢查是否有權限訪問設備
try:
os.access(SERIAL_PORT, os.R_OK | os.W_OK)
except Exception as e:
print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}{str(e)}")
return False
# 檢查是否被占用
try:
# 嘗試打開串口
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close() # 打開成功後立即關閉
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
# =========================
class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.first_data = True # 標記是否為第一次收到資料
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
## =====================================
# Serial 收到資料,轉發到 UDP
def data_received(self, data):
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug
if DEBUG_MODE and self.has_processed:
return
# 標記首次收到資料
if hasattr(self.udp_protocol, 'send_udp'):
if self.first_data:
print(f"First data received from serial, forwarding to UDP: {data[:20]}...")
self.first_data = False
# 轉發數據
self.udp_protocol.send_udp(data)
if DEBUG_MODE: # 測試模式用 # debug
self.has_processed = True
print("[DEBUG] SerialToUDP Mark")
def write_to_serial(self, data):
# 在資料透過 Serial 發送之前進行處理
processed_data = self.encapsulate_data(data)
# 處理失敗就不發送
if processed_data not None:
self.transport.write(processed_data)
def encapsulate_data(self, data):
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
## 方法一
# 設置幀參數
frame_id = 0x01 # 幀識別碼,用於確認
frame_type = 0x10 # 幀類型:傳輸請求
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播)
dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播)
broadcast_radius = 0x00 # 廣播半徑 (0 = 最大)
options = 0x00 # 傳輸選項
# 構建幀數據部分
frame_data = bytearray()
frame_data.append(frame_type) # 添加幀類型
frame_data.append(frame_id) # 添加幀 ID
frame_data.extend(dest_addr64) # 添加 64 位目標地址
frame_data.extend(dest_addr16) # 添加 16 位目標地址
frame_data.append(broadcast_radius) # 添加廣播半徑
frame_data.append(options) # 添加選項
frame_data.extend(data) # 添加實際數據內容
# 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位)
checksum = 0xFF - (sum(frame_data) & 0xFF)
# 構建完整的 API 幀
# 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和
complete_frame = bytearray()
complete_frame.append(0x7E) # 添加起始分隔符
complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先)
complete_frame.extend(frame_data) # 添加幀數據
complete_frame.append(checksum) # 添加校驗和
return bytes(complete_frame)
## 方法二
# frame_id=0x01
# frame_type = 0x10
# dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# dest_addr16 = b'\xFF\xFE'
# broadcast_radius = 0x00
# options = 0x00
# frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
# frame += dest_addr64 + dest_addr16
# frame += struct.pack(">BB", broadcast_radius, options) + data
# checksum = 0xFF - (sum(frame) & 0xFF)
# return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
def __init__(self):
self.serial_transport = None
self.transport = None
self.remote_addr = None # 儲存動態獲取的遠程地址
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready. Waiting for serial data before sending...")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
## =====================================
# UDP 收到資料
def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據)
# self.remote_addr = addr
# print(f"Received UDP data from {addr}, setting as remote address")
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回
if DEBUG_MODE and self.has_processed:
return
if self.serial_transport:
self.serial_transport.write_to_serial(data)
if DEBUG_MODE: # 測試模式用
self.has_processed = True
print("[DEBUG] UDPHandler Mark")
def send_udp(self, data):
# 發送資料到 UDP
# if self.transport:
# # 如果有已知的回應地址,使用該地址
# if self.remote_addr:
# self.transport.sendto(data, self.remote_addr)
# # print(f"Sending to dynamic address: {self.remote_addr}")
# else:
# # 否則使用預設地址
# self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
# print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
if self.transport:
# 只有第一次或地址改變時才顯示
# if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT):
# print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
# self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT)
# 在透過 UDP 發送數據之前進行解封裝
decoded_data = self.decapsulate_data(data)
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
# 這裡可以根據需要進行數據解封裝
## 方法一
try:
# 創建一個 API 幀處理器
api_frame = APIFrame(escaped=True)
# 嘗試解析數據
api_frame.fill(data)
# 如果數據不完整,直接返回原始數據
if not api_frame.is_complete():
return data
# 解析幀內容
parsed_data = api_frame.parse()
# 提取有用數據
if parsed_data:
frame_data = parsed_data.get('rf_data', None)
if frame_data:
return frame_data
return data
## 方法二 - 手動解析
# try:
# # XBee API 幀格式:
# # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# # 檢查幀起始符 (0x7E)
# if not data or len(data) < 5 or data[0] != 0x7E:
# return data
# # 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2]
# # 檢查幀完整性
# if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
# return data
# # 提取API標識符和數據
# frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# # 根據不同的幀類型進行處理
# if frame_type == 0x90: # 例如,這是"接收數據包"類型
# # 提取實際有效載荷
# # 對於接收數據包,格式通常是:
# # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據
# if len(frame_data) >= 12: # 確保數據長度足夠
# payload = frame_data[11:] # 前11字節是地址和選項
# return payload
# return data
# 如果無法提取 則回傳 None
except Exception as e:
print(f"手動解析 XBee 數據錯誤: {e}")
return None
async def main():
# 先檢查串口是否可用
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
# 設置終止處理
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown(loop))
)
# 建立單一 UDP 端點處理收發
udp_handler = UDPHandler()
# 建立 UDP 傳輸,不指定接收端口,讓系統自動分配
try:
udp_transport, protocol = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
# 獲取系統分配的本地端口
sock = udp_transport.get_extra_info('socket')
local_addr = sock.getsockname()
print(f"UDP listening on {local_addr[0]}:{local_addr[1]}")
# 建立 Serial 傳輸,將 UDP 處理器傳給它
try:
serial_proto = SerialToUDP(udp_handler)
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
udp_transport.close()
return
print(f"Waiting for data from serial port to initiate UDP communication...")
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
"""關閉程序"""
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print(f"程式執行錯誤:{str(e)}")

@ -0,0 +1,448 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
from fc_network_adapter import mavlinkObject as mo
from fc_network_adapter import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 12
running_time = 500
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 測試 mavlink_object 放入 ring buffer 的應用
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
start_time = time.time()
while (time.time() - start_time) < running_time:
items_a = mo.stream_bridge_ring.get_all()
items_b = mo.return_packet_ring.get_all()
try:
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
except IndexError:
print("stream_bridge_ring is empty")
try:
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
except IndexError:
print("return_packet_ring is empty")
time.sleep(1)
manager.stop()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
time.sleep(3)
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 2:
# print("mark B")
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
manager.stop()
analyzer.stop()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14554"
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
connection_string="udp:127.0.0.1:14551"
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
# 初始化輸出通道
connection_string="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
manager.add_mavlink_object(mavlink_object_out)
manager.add_mavlink_object(mavlink_object_in1)
manager.add_mavlink_object(mavlink_object_in2)
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
start_time = time.time()
while (time.time() - start_time) < running_time:
time.sleep(1)
manager.stop()
print('<=== End of Program')
# elif test_item == 20:
# # 這邊測試 node 生成 topic 的功能
# # 利用 空的通道 發出假的 heartbeat 封包
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# from pymavlink.dialects.v20 import common as mavlink2
# sysid = 1
# compid = 1
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# mav = mavlink2.MAVLink(None)
# mav.srcSystem = sysid
# mav.srcComponent = compid
# # 這是一個 heartbeat 封包
# fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
# type=mavlink2.MAV_TYPE_QUADROTOR,
# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
# base_mode=3,
# custom_mode=0,
# system_status=0,
# mavlink_version=3
# )
# mavlink_bytes = fake_heartbeat.pack(mav)
# fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
# print(analyzer.mavlink_systems)
# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
# time.sleep(0.1)
# print(analyzer.mavlink_systems)
# # ROS2 初始化
# analyzer._init_node()
# # 創建 ROS2 topic
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
# print("topic created")
# time.sleep(5)
# # 丟出 topic
# analyzer.emit_info()
# # 結束程式
# analyzer.running = False
# analyzer.destroy_node()
# rclpy.shutdown()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
# 初始化 rclpy 才能使用 node
rclpy.init()
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
time.sleep(0.5) # 系統 Setup 完成
# 創建通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
manager.add_mavlink_object(mavlink_object3)
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
manager.stop()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
# elif test_item == 22:
# # 需要開啟一個 ardupilot 的模擬器 與 GCS
# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 初始化兩個通道
# connection_string_in="udp:127.0.0.1:15551"
# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
# mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# connection_string_out="udpout:127.0.0.1:14553"
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
# mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
# mavlink_object_out.multiplexingToAnalysis = []
# # 讓兩個通道互相傳輸
# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# # 啟動通道
# mavlink_object_in.run()
# mavlink_object_out.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# print('目前所有的系統 : ')
# for sysid in analyzer.mavlink_systems:
# print(analyzer.mavlink_systems[sysid])
# compid = 1
# sysid = 1
# show_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# show_time2 = time.time()
# while time.time() - start_time < running_time:
# if (time.time() - show_time2) >= 1:
# analyzer.emit_info() # 這邊是測試 node 的運行
# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
# sss = mavutil.mode_string_v10(fmsg)
# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
# print("目前的飛行模式 : {}".format(sss))
# show_time2 = time.time()
# # if (time.time() - show_time) >= 2:
# # show_time = time.time()
# # for sysid in analyzer.mavlink_systems:
# # for compid in analyzer.mavlink_systems[sysid].components:
# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# # print("===================")
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object_in.stop()
# mavlink_object_in.thread.join()
# mavlink_socket_in.close()
# mavlink_object_out.stop()
# mavlink_object_out.thread.join()
# mavlink_socket_out.close()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
# elif test_item == 51:
# # 晉凱的測試項目
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 創建通道
# connection_string="udp:127.0.0.1:15551"
# mavlink_socket3 = mavutil.mavlink_connection(connection_string)
# mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# # 設定通道流動
# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# mavlink_object3.multiplexingToReturn = [] #
# # mavlink_object3.multiplexingToSwap = [] #
# # 啟動通道
# mavlink_object3.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# compid = 1
# sysid = 1
# start_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
# end_time = time.time()
# print(f"Execution time for create all topic: {end_time - start_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# while time.time() - start_time < running_time:
# try:
# # rclpy.spin(analyzer)
# analyzer.emit_info() # 這邊是測試 node 的運行
# time.sleep(0.5)
# except KeyboardInterrupt:
# break
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object3.stop()
# mavlink_object3.thread.join()
# analyzer.stop()
# analyzer.thread.join()
# mavlink_socket3.close()
# print('<=== End of Program')

@ -0,0 +1,468 @@
#!/usr/bin/env python
"""
測試腳本用於測試 mavlinkObject.py 中的 mavlink_object async_io_manager 類別
"""
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import unittest
import time
import threading
import socket
import asyncio
from unittest.mock import MagicMock, patch
# 導入要測試的模組
from fc_network_adapter.mavlinkObject import (
mavlink_object,
async_io_manager,
MavlinkObjectState,
stream_bridge_ring,
return_packet_ring
)
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試"""
def __init__(self, test_data=None):
self.closed = False
self.test_data = test_data or []
self.data_index = 0
self.written_data = []
def recv_msg(self):
if not self.test_data or self.data_index >= len(self.test_data):
return None
data = self.test_data[self.data_index]
self.data_index += 1
return data
def write(self, data):
self.written_data.append(data)
def close(self):
self.closed = True
class MockMavlinkMessage:
"""模擬 Mavlink 訊息的類別,用於測試"""
def __init__(self, msg_id, sysid, compid, seq=0):
self.msg_id = msg_id
self.sysid = sysid
self.compid = compid
self.seq = seq
self.msg_buf = bytes([msg_id, sysid, compid, seq])
def get_msgId(self):
return self.msg_id
def get_srcSystem(self):
return self.sysid
def get_srcComponent(self):
return self.compid
def get_seq(self):
return self.seq
def get_msgbuf(self):
return self.msg_buf
def get_type(self):
return f"MSG_ID_{self.msg_id}"
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建測試訊息
self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1)
self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1)
# 創建模擬的 socket # 假的 Mavlink Socket
self.mock_socket = MockMavlinkSocket([
self.heartbeat_msg,
self.attitude_msg,
self.position_msg
])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
# print("Testing mavlink_object initialization")
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
# 添加目標端口
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
# 嘗試添加已存在的端口
self.assertFalse(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 嘗試添加自己的端口
self.assertFalse(self.mavlink_obj.add_target_socket(0))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 移除端口
self.assertTrue(self.mavlink_obj.remove_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
# 嘗試移除不存在的端口
self.assertFalse(self.mavlink_obj.remove_target_socket(2))
def test_set_message_types(self):
"""測試設置訊息類型功能"""
# 設置橋接器訊息類型
self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30]))
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30])
# 設置回傳處理器訊息類型
self.assertTrue(self.mavlink_obj.set_return_message_types([32]))
self.assertEqual(self.mavlink_obj.return_msg_types, [32])
# 測試無效的訊息類型
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message(self):
"""測試 send_message 功能"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 準備測試數據
test_message1 = b"test_message_1"
test_message2 = b"test_message_2"
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], test_message1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(test_message2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], test_message2)
# 模擬發送出錯的情況
class ErrorWriteSocket(MockMavlinkSocket):
def write(self, data):
raise Exception("Write error")
error_socket = ErrorWriteSocket()
error_obj = mavlink_object(error_socket)
manager.add_mavlink_object(error_obj)
time.sleep(0.1) # 等待對象啟動
# 發送消息到錯誤的 socket
self.assertTrue(error_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 即使寫入失敗send_message 應該也返回 True
# 因為消息已成功加入到佇列中,只是後續的實際發送失敗
# 停止 manager
manager.stop()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 模擬 mavlink 對象
self.mock_socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
self.mock_socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1)
])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.stop()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式 就是不會產生兩個 magager"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
def test_start_stop(self):
"""測試 async_io_manager 的啟動和停止功能"""
# 啟動管理器
self.manager.start()
self.assertTrue(self.manager.running)
self.assertIsNotNone(self.manager.thread)
# 再次啟動應該沒有效果
old_thread = self.manager.thread
self.manager.start()
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.stop()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
start_time = time.time()
while self.manager.thread.is_alive() and time.time() - start_time < 5:
time.sleep(0.1)
self.assertFalse(self.manager.thread.is_alive())
def test_add_remove_objects(self):
"""測試添加和移除 mavlink_object"""
# 啟動管理器
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1))
self.assertEqual(len(self.manager.managed_objects), 1)
self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING)
# 添加另一個對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2))
self.assertEqual(len(self.manager.managed_objects), 2)
# 檢查受管理對象列表
managed_objects = self.manager.get_managed_objects()
self.assertEqual(len(managed_objects), 2)
self.assertIn(0, managed_objects)
self.assertIn(1, managed_objects)
# 移除對象
self.assertTrue(self.manager.remove_mavlink_object(0))
self.assertEqual(len(self.manager.managed_objects), 1)
# 嘗試移除不存在的對象
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.stop()
def test_data_processing(self):
"""測試數據處理"""
# 創建用於轉發的 mavlink_objects
mock_socket3 = MockMavlinkSocket()
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置轉發
self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3
# 設置訊息類型
self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE
# self.mavlink_obj1.enable_return(True)
self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(self.mavlink_obj1)
self.manager.add_mavlink_object(mavlink_obj3)
# 等待處理完成
time.sleep(1.0)
# print("testing Mark A")
# 檢查 Ring buffer 是否有正確的數據
self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE
self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE
a = stream_bridge_ring.get()
print(f"stream_bridge_ring: {a}")
# 檢查是否正確轉發
self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE
# print("testing Mark B")
# 停止管理器
self.manager.stop()
def test_error_handling(self):
"""測試錯誤處理情況"""
print("=== mark A ===")
# 創建一個會引發異常的 socket
class ErrorSocket(MockMavlinkSocket):
def recv_msg(self):
raise Exception("Test exception")
error_socket = ErrorSocket()
mavlink_obj_err = mavlink_object(error_socket)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(mavlink_obj_err)
print("=== mark B ===")
# 等待錯誤處理
time.sleep(1.0)
# 對象應該進入錯誤狀態,但不會崩潰
# self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR)
print("=== mark C ===")
# 管理器應該仍在運行
self.assertTrue(self.manager.running)
# 故意等一段時間 確認 socket 有被 manager 處理掉
time.sleep(3)
# 停止管理器
self.manager.stop()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動"""
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1),
MockMavlinkMessage(msg_id=32, sysid=2, compid=1)
])
socket3 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=3, compid=1),
MockMavlinkMessage(msg_id=33, sysid=3, compid=1)
])
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
manager.add_mavlink_object(obj1)
manager.add_mavlink_object(obj2)
manager.add_mavlink_object(obj3)
# 等待處理所有訊息
time.sleep(1.5)
# 檢查轉發結果
# socket1 應該收到 socket2 和 socket3 的訊息
self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3
# socket3 應該收到 socket1 和 socket2 的訊息
self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2
# 檢查 ring buffer 的數據
# 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
# 應該有 3 個 HEARTBEAT 訊息
self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT
# 停止管理器
manager.stop()
if __name__ == "__main__":
unittest.main(defaultTest="TestMavlinkObject.test_send_message")

@ -0,0 +1,152 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
from fc_network_adapter.ringBuffer import RingBuffer
def producer(buffer, count, interval=0.01):
"""生產者:向緩衝區添加資料"""
print(f"Producer started (thread {threading.get_ident()})")
for i in range(count):
# 嘗試寫入數據,直到成功
while not buffer.put(f"Item-{i}"):
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
time.sleep(0.1)
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
time.sleep(interval) # 模擬生產過程
print(f"Producer finished (thread {threading.get_ident()})")
def consumer(buffer, max_items, interval=0.05):
"""消費者:從緩衝區讀取資料"""
print(f"Consumer started (thread {threading.get_ident()})")
items_consumed = 0
while items_consumed < max_items:
# 嘗試讀取數據
item = buffer.get()
if item:
print(f"Consumed: {item}, buffer size: {buffer.size()}")
items_consumed += 1
else:
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
time.sleep(interval) # 模擬消費過程
print(f"Consumer finished (thread {threading.get_ident()})")
def batch_consumer(buffer, interval=0.2):
"""批量消費者:一次性讀取緩衝區中的所有資料"""
print(f"Batch consumer started (thread {threading.get_ident()})")
for _ in range(5): # 執行5次批量讀取
time.sleep(interval) # 等待緩衝區積累數據
items = buffer.get_all()
if items:
print(f"Batch consumed {len(items)} items: {items}")
else:
print("Buffer empty for batch consumer")
print(f"Batch consumer finished (thread {threading.get_ident()})")
def demonstrate_multi_writer():
"""示範多個寫入執行緒同時操作緩衝區"""
print("\n=== Demonstrating Multiple Writers ===")
buffer = RingBuffer(capacity=80)
# 創建多個生產者執行緒
threads = []
for i in range(3):
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
buffer.print_stats() # 印出統計資訊
# 讀出所有剩餘資料
remaining = buffer.get_all()
print(f"Remaining items in buffer after multiple writers: {remaining}")
def demonstrate_basic_usage():
"""示範基本使用方式"""
print("\n=== Demonstrating Basic Usage ===")
# 創建緩衝區
buffer = RingBuffer(capacity=20, buffer_id=7)
# 檢查初始狀態
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 添加幾個項目
for i in range(5):
buffer.put(f"Test-{i}")
# 檢查狀態
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 讀取一個項目
item = buffer.get()
print(f"Read item: {item}")
print(f"After reading 1 item - Content Size: {buffer.size()}")
# 添加更多項目直到滿
items_added = 0
while not buffer.is_full():
buffer.put(f"Fill-{items_added}")
items_added += 1
print(f"Added {items_added} more items until full")
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 嘗試添加到已滿的緩衝區
result = buffer.put("Overflow")
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
# 獲取所有項目
all_items = buffer.get_all()
print(f"All items in buffer: {all_items}")
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
# 印出統計資訊
buffer.print_stats()
def demonstrate_producer_consumer():
"""示範生產者-消費者模式"""
print("\n=== Demonstrating Producer-Consumer Pattern ===")
buffer = RingBuffer(capacity=16)
# 創建生產者和消費者執行緒
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
# 啟動執行緒
producer_thread.start()
consumer_thread.start()
batch_thread.start()
# 等待執行緒完成
producer_thread.join()
consumer_thread.join()
batch_thread.join()
# 檢查最終狀態
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
buffer.print_stats()
if __name__ == "__main__":
# 展示各種使用場景
# demonstrate_basic_usage()
# demonstrate_producer_consumer()
demonstrate_multi_writer()
print("\nAll demonstrations completed!")

@ -1,33 +0,0 @@
import socket
import os
# socket file path
SOCKET_PATH = "/tmp/unix_socket_example"
# 若檔案存在就先刪除
if os.path.exists(SOCKET_PATH):
os.remove(SOCKET_PATH)
# 建立 socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# 綁定 socket 到檔案
server.bind(SOCKET_PATH)
server.listen(1)
print("🔌 Server waiting for connection...")
# 等待 client 連線
conn, _ = server.accept()
print("✅ Client connected.")
while True:
data = conn.recv(1024)
if not data:
break
print("📥 Received:", data.decode())
conn.sendall(b"Echo: " + data)
conn.close()
server.close()
os.remove(SOCKET_PATH)

@ -0,0 +1,94 @@
import socket
import os
# # socket file path
# SOCKET_PATH = "/tmp/unix_socket_example"
# # 若檔案存在就先刪除
# if os.path.exists(SOCKET_PATH):
# os.remove(SOCKET_PATH)
# # 建立 socket
# server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# # 綁定 socket 到檔案
# server.bind(SOCKET_PATH)
# server.listen(1)
# print("🔌 Server waiting for connection...")
# # 等待 client 連線
# conn, _ = server.accept()
# print("✅ Client connected.")
# while True:
# data = conn.recv(1024)
# if not data:
# break
# print("📥 Received:", data.decode())
# conn.sendall(b"Echo: " + data)
# conn.close()
# server.close()
# os.remove(SOCKET_PATH)
# =====================
# from pymavlink import mavutil
# def create_unix_socket_connection():
# # 建立一個 mavtcpin 實例
# mav_conn = mavutil.mavtcpin("127.0.0.250:9999", source_system=1, source_component=1)
# # 替換底層 socket
# # 關閉原有的 socket
# mav_conn.listen.close()
# # 創建 Unix socket 並替換
# unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# unix_socket_path = "/tmp/unix_socket_mavlink.sock"
# # 確保先前的 unix socket 已被移除
# if os.path.exists(unix_socket_path):
# os.remove(unix_socket_path)
# # 綁定並設定 Unix socket
# unix_socket.bind(unix_socket_path)
# unix_socket.listen(1)
# unix_socket.setblocking(0)
# mavutil.set_close_on_exec(unix_socket.fileno())
# # 替換 listen socket
# mav_conn.listen = unix_socket
# mav_conn.fd = unix_socket.fileno()
# # mav_conn.port = unix_socket
# return mav_conn
# a = create_unix_socket_connection()
# # print(a.port)
# ===============================
import mavunixin
mav_conn = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1)
import time
print("🔌 Server waiting for connection...")
while True:
time.sleep(1)
mavlinkMsg = mav_conn.recv_msg()
if mavlinkMsg is not None:
print("📥 Received:", mavlinkMsg)
# a.send(mavlinkMsg)
# print("📤 Server replied:", data.decode())
else:
print("No message received.")

@ -0,0 +1,109 @@
import curses
import serial
import struct
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
PORT = "COM5" # or "/dev/ttyUSB0"
BAUD = 57600
target_system = 5
target_component = 1
MAV_CMD_DO_SET_MODE = 176
mode_list = [
("STABILIZE", 0),
("AUTO", 3),
("GUIDED", 4),
("LOITER", 5)
]
ser = serial.Serial(PORT, BAUD)
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
def build_api_tx_frame(data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def curses_main(stdscr):
curses.curs_set(0)
selected = 0
while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
stdscr.addstr(1, 2, "模式選單(使用:箭頭選擇Enter:發送q:離開)")
for i, (name, _) in enumerate(mode_list):
if i == selected:
stdscr.attron(curses.A_REVERSE)
stdscr.addstr(i + 3, 4, f"> {name}")
stdscr.attroff(curses.A_REVERSE)
else:
stdscr.addstr(i + 3, 4, f" {name}")
key = stdscr.getch()
if key == curses.KEY_UP:
selected = (selected - 1) % len(mode_list)
elif key == curses.KEY_DOWN:
selected = (selected + 1) % len(mode_list)
elif key in [10, 13]: # Enter
name, custom_mode = mode_list[selected]
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
msg = mav.command_long_encode(
target_system=target_system,
target_component=target_component,
command=MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1,
param2=custom_mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
print("msg =", msg) # 確認封包物件生成
mav.send(msg) # 改為 send() 會寫入 capture
print("RAW HEX:", capture.data.hex())
api_frame = build_api_tx_frame(capture.data)
ser.write(api_frame)
# 顯示封包資訊
msg_line = min(h - 4, len(mode_list) + 5)
stdscr.addstr(msg_line, 2, f"發送模式切換:{name} ({custom_mode})")
stdscr.addstr(msg_line + 1, 2, f"MAVLink HEX: {' '.join(f'{b:02x}' for b in capture.data)[:w-4]}")
stdscr.addstr(msg_line + 2, 2, f"XBee API HEX: {' '.join(f'{b:02x}' for b in api_frame)[:w-4]}")
stdscr.refresh()
curses.napms(1500)
elif key in [ord('q'), ord('Q')]:
break
stdscr.refresh()
try:
curses.wrapper(lambda stdscr: curses_main(stdscr)) # 使用 lambda 函數來傳遞 ser
except Exception as e:
print(f"發生錯誤: {e}")
finally:
ser.close()
print("程式結束,串口已關閉")

@ -38,4 +38,4 @@ try:
pass
except KeyboardInterrupt:
print("\nProgram terminated.")
ser.close()
ser.close()

@ -0,0 +1,60 @@
from xbee import XBee
import serial
from pymavlink import mavutil
import time
# 配置 XBee 串口連接
ser = serial.Serial('COM5', 57600) # XBee 串口
# 無人機系統 ID假設為 5和目標系統 ID假設為 1
system_id = 5
target_system_id = 1 # 設定目標系統 ID這裡假設目標系統 ID 是 1
# 目標設備的 64 位地址(請替換為實際的無人機地址)
target_address = b'\x00\x13\xa2\x00\x40\x5f\x88\x56' # 例如00 13 A2 00 40 5F 88 56
def decode_mavlink_data(data,master):
"""解碼 MAVLink 的原始數據並處理 HEARTBEAT 訊息"""
try:
msg = master.parse_char(data)
if msg:
if msg.get_type() == "HEARTBEAT":
print(f"Raw MAVLink Data (Hex): {data.hex()}")
print(f"Received MAVLink message: {msg}")
print(f"System status: {msg.system_status}")
print(f"Flight mode: {mavutil.mode_string_v10(msg)}")
print(f"System ID: {msg.get_srcSystem()}") # 使用 get_srcSystem() 獲取 system_id
except Exception as e:
print(f"Failed to decode MAVLink data: {e}")
def receive_packets(ser):
xbee = XBee(ser)
# 創建 MAVLink 解析器並與 XBee 串口連接
master = mavutil.mavlink.MAVLink(ser)
while True:
try:
# 從 XBee 接收數據
xbee_data = xbee.wait_read_frame()
# 讀取 `rf_data` 而非 `payload`
raw_data = xbee_data.get('rf_data', None)
if raw_data is None:
print("Warning: No 'rf_data' found in received XBee data!")
continue # 跳過此次循環
# 解碼 MAVLink 訊息
decode_mavlink_data(raw_data,master)
# 根據需要觸發模式切換,例如根據用戶輸入更改模式
except KeyboardInterrupt:
print("Exiting...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(0.1)
Loading…
Cancel
Save