重大更新 :

1. 修正不同 socket 轉傳間效能低落問題
於 mavlinkObject.py
重構 mavlink_object
增加 async_io_manager
(版本目標功能調整 寫於程式碼註解中)
2. 調整各個程式碼的檔案位置 讓測試用程式與專案程式分離
chiyu
Chiyu Chen 11 months ago
parent 7f7753d0b4
commit 6fab72ac97

@ -1,290 +0,0 @@
import asyncio
import socket
import threading
import time
import logging
from typing import Dict, Any, Optional, List, Callable, Tuple
from pymavlink import mavutil
from dataclasses import dataclass, field
from .ringBuffer import RingBuffer
# 設置日誌
logger = logging.getLogger("async_io_manager")
@dataclass
class Connection:
"""表示一個 MAVLink 連接"""
socket_id: int
socket: Any # pymavlink socket 或相容物件
buffer: RingBuffer = field(default_factory=lambda: RingBuffer(1024))
last_activity: float = field(default_factory=time.time)
is_closed: bool = False
stats: Dict[str, int] = field(default_factory=lambda: {"rx_count": 0, "rx_bytes": 0, "errors": 0})
class AsyncIOManager:
"""
異步 I/O 管理器
負責高效處理所有 MAVLink socket I/O 操作
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super(AsyncIOManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self.connections: Dict[int, Connection] = {}
self.next_socket_id = 0
self._running = False
self._event_loop = None
self._thread = None
self.check_interval = 0.01 # 檢查間隔,單位:秒
def start(self):
"""啟動異步 I/O 管理器"""
if self._running:
logger.warning("AsyncIOManager already running")
return
self._running = True
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._thread.start()
logger.info("AsyncIOManager started")
def _run_event_loop(self):
"""在專用執行緒中執行 asyncio 事件循環"""
try:
# 創建新的事件循環
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._event_loop = loop
# 添加主要協程任務
loop.create_task(self._process_all_connections())
# 運行事件循環
loop.run_forever()
except Exception as e:
logger.error(f"Error in AsyncIOManager event loop: {e}")
finally:
if loop.is_running():
loop.stop()
logger.info("AsyncIOManager event loop stopped")
async def _process_all_connections(self):
"""處理所有連接的主循環"""
while self._running:
for socket_id, conn in list(self.connections.items()):
if conn.is_closed:
continue
try:
# 非阻塞模式讀取數據
await asyncio.get_event_loop().run_in_executor(
None, self._read_from_connection, conn
)
except Exception as e:
conn.stats["errors"] += 1
logger.error(f"Error processing connection {socket_id}: {e}")
# 短暫休息,讓出 CPU
await asyncio.sleep(self.check_interval)
def _read_from_connection(self, conn: Connection):
"""從連接讀取數據,此函數在執行器中運行"""
try:
# 使用 mavlink 的非阻塞讀取
msg = conn.socket.recv_msg()
if msg:
# 收到消息,更新統計資訊
conn.last_activity = time.time()
conn.stats["rx_count"] += 1
conn.stats["rx_bytes"] += len(msg.get_msgbuf())
# 將消息放入緩衝區
if not conn.buffer.put(msg):
# 緩衝區已滿,記錄錯誤
logger.warning(f"Buffer full for connection {conn.socket_id}, message dropped")
except Exception as e:
conn.stats["errors"] += 1
if "Connection refused" not in str(e): # 忽略常見的連接拒絕錯誤
logger.error(f"Error reading from socket {conn.socket_id}: {e}")
def stop(self):
"""停止異步 I/O 管理器"""
if not self._running:
return
self._running = False
# 停止事件循環
if self._event_loop:
asyncio.run_coroutine_threadsafe(
self._shutdown_loop(), self._event_loop
)
# 等待執行緒結束
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2.0)
if self._thread.is_alive():
logger.warning("AsyncIOManager thread did not terminate cleanly")
logger.info("AsyncIOManager stopped")
async def _shutdown_loop(self):
"""關閉事件循環"""
# 關閉所有連接
for socket_id in list(self.connections.keys()):
self.remove_connection(socket_id)
# 停止事件循環
loop = asyncio.get_event_loop()
loop.stop()
def register_connection(self, socket: Any) -> int:
"""
註冊新的連接
Args:
socket: pymavlink socket 或相容物件
Returns:
int: 分配的 socket_id
"""
socket_id = self.next_socket_id
self.next_socket_id += 1
# 創建新的連接
connection = Connection(socket_id=socket_id, socket=socket)
self.connections[socket_id] = connection
logger.info(f"Registered new connection with ID {socket_id}")
return socket_id
def remove_connection(self, socket_id: int) -> bool:
"""
移除連接
Args:
socket_id: 連接 ID
Returns:
bool: 成功移除返回 True
"""
if socket_id in self.connections:
conn = self.connections[socket_id]
conn.is_closed = True
try:
# 關閉原始 socket
if hasattr(conn.socket, 'close'):
conn.socket.close()
except Exception as e:
logger.error(f"Error closing socket {socket_id}: {e}")
# 從字典中移除
self.connections.pop(socket_id)
logger.info(f"Removed connection with ID {socket_id}")
return True
return False
def get_buffer(self, socket_id: int) -> Optional[RingBuffer]:
"""
獲取指定連接的緩衝區
Args:
socket_id: 連接 ID
Returns:
RingBuffer None如果連接不存在
"""
if socket_id in self.connections:
return self.connections[socket_id].buffer
return None
def get_stats(self, socket_id: int = None) -> Dict:
"""
獲取連接統計資訊
Args:
socket_id: 指定連接 IDNone 表示所有連接
Returns:
Dict: 統計資訊
"""
if socket_id is not None and socket_id in self.connections:
return self.connections[socket_id].stats
# 彙總所有連接的統計資訊
total_stats = {"rx_count": 0, "rx_bytes": 0, "errors": 0, "connection_count": len(self.connections)}
for conn in self.connections.values():
for key, value in conn.stats.items():
if key in total_stats:
total_stats[key] += value
return total_stats
def send_message(self, socket_id: int, message: Any) -> bool:
"""
發送 MAVLink 訊息
Args:
socket_id: 連接 ID
message: 要發送的訊息
Returns:
bool: 成功發送返回 True
"""
if socket_id not in self.connections:
return False
conn = self.connections[socket_id]
if conn.is_closed:
return False
try:
# 使用 asyncio.run_coroutine_threadsafe 在事件循環中執行 IO 操作
future = asyncio.run_coroutine_threadsafe(
self._send_message_async(conn, message),
self._event_loop
)
# 等待操作完成(可選)
return future.result(timeout=1.0)
except Exception as e:
logger.error(f"Error sending message on socket {socket_id}: {e}")
return False
async def _send_message_async(self, conn: Connection, message: Any) -> bool:
"""在事件循環中異步發送訊息"""
try:
# 調用執行器進行實際發送操作
return await asyncio.get_event_loop().run_in_executor(
None, self._send_message_sync, conn, message
)
except Exception as e:
conn.stats["errors"] += 1
logger.error(f"Async error sending message: {e}")
return False
def _send_message_sync(self, conn: Connection, message: Any) -> bool:
"""同步發送訊息函數,在執行器中運行"""
try:
if hasattr(message, 'get_msgbuf'):
# 如果是 MAVLink 訊息對象
conn.socket.write(message.get_msgbuf())
else:
# 假設是已編碼的字節
conn.socket.write(message)
conn.last_activity = time.time()
return True
except Exception as e:
conn.stats["errors"] += 1
logger.error(f"Error in sync message send: {e}")
return False

@ -1,624 +0,0 @@
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
import mavlinkObject as mo
import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 12
running_time = 3000
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 1:
# 測試 mavlink_object 中 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
mavlink_object_none = mo.mavlink_object(None)
print('=====================')
print('正常範例')
mavlink_object_none.multiplexingToAnalysis = []
mavlink_object_none.multiplexingToReturn = []
# mavlink_object_none.multiplexingToSwap = [[]]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 長度不一致')
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('=====================')
print('正常範例')
mo.swap_queues.append(queue.Queue())
mo.swap_queues.append(queue.Queue())
mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5]
mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10]
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 multiplexingToAnalysis 不可以有 -1')
mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('<=== End of Program')
elif test_item == 2:
# 測試 mavlink_object 創建時 socket_id 是否正確
# 說實在 這個測試項 似乎因為 python 的 GC 機制 會導致難以測試
print('===> Start of Program .Test ', test_item)
mavlink_object_none1 = mo.mavlink_object(None)
mavlink_object_none2 = mo.mavlink_object(None)
mavlink_object_none3 = mo.mavlink_object(None)
del mavlink_object_none2
print(len(mo.swap_queues))
mavlink_object_none2 = mo.mavlink_object(None)
print(len(mo.swap_queues))
print(mavlink_object_none1._multiplexingList)
print('<=== End of Program')
elif test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('===> Start of Program .Test ', test_item)
# 創建一個空的通道 這個通道的 socket_id 是 0
mavlink_object_none = mo.mavlink_object(None)
# 創建另一個通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket)
# 設定通道轉發的參數
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD
# print(mavlink_object1.multiplexingToSwap)
# print(mo.swap_queues)
# 啟動通道
mavlink_object1.run()
# 確認轉拋的設定有沒有錯
print("_multiplexingList for mavlink object :", mavlink_object1._multiplexingList)
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < running_time:
while not mo.fixed_stream_bridge_queue.empty():
print('fixed_stream_bridge_queue:')
t = mo.fixed_stream_bridge_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not mo.return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = mo.return_packet_processor_queue.get()
print('from {} : {}'.format(t[0],t[2]))
# print(t[2].get_msgbuf())
# print(t[2].type)
for n in range(len(mo.swap_queues)):
q = mo.swap_queues[n]
while not q.empty():
print('swap_queues:')
t = q.get()
print('{} gets : {}'.format(n,t))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能
print('===> Start of Program .Test ', test_item)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
# mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
# mavlink_object2.multiplexingToReturn = []
# 啟動連線的模組
mavlink_object2.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(2)
print("=== connection established! ===")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
compid = 1
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print(mavlink_socket.last_seq)
print("===================")
# 結束程式 退出所有 thread
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 初始化輸入通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
# 初始化輸出通道
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 做一個空的通道驗證 可以拿來 debug
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(3)
print("=== connection established! ===")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# print(type(mavlink_socket_out))
# print(type(mavlink_socket_out.mav))
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_in.close()
mavlink_socket_out.close()
analyzer.stop()
print('<=== End of Program')
elif test_item == 13:
# 這邊測試看看能否狸貓換太子的方式
# 把原來 pymavlink 的 mavtcpin 換成 unix socket
print('===> Start of Program .Test ', test_item)
import os
import socket
# 建立一個 mavtcpin 實例
import mavunixin
# ===== 以下就是按照原本測試 12 的流程 =====
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 初始化輸入通道
mavlink_socket_unix = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1)
mavlink_object_in = mo.mavlink_object(mavlink_socket_unix)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
# 停留五秒 我要啟動另一邊
time.sleep(5)
# 初始化輸出通道
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 做一個空的通道驗證 可以拿來 debug
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(3)
print("=== connection established! ===")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# print(type(mavlink_socket_out))
# print(type(mavlink_socket_out.mav))
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_unix.close()
mavlink_socket_out.close()
analyzer.stop()
print('<=== End of Program')
elif test_item == 20:
# 這邊測試 node 生成 topic 的功能
# 利用 空的通道 發出假的 heartbeat 封包
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
from pymavlink.dialects.v20 import common as mavlink2
sysid = 1
compid = 1
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
mav = mavlink2.MAVLink(None)
mav.srcSystem = sysid
mav.srcComponent = compid
# 這是一個 heartbeat 封包
fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode=3,
custom_mode=0,
system_status=0,
mavlink_version=3
)
mavlink_bytes = fake_heartbeat.pack(mav)
fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
print(analyzer.mavlink_systems)
mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
time.sleep(0.1)
print(analyzer.mavlink_systems)
# ROS2 初始化
analyzer._init_node()
# 創建 ROS2 topic
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
print("topic created")
time.sleep(5)
# 丟出 topic
analyzer.emit_info()
# 結束程式
analyzer.running = False
analyzer.destroy_node()
rclpy.shutdown()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
# 啟動通道
mavlink_object3.run()
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 22:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 初始化兩個通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = []
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
show_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
show_time2 = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time2) >= 1:
analyzer.emit_info() # 這邊是測試 node 的運行
# sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
sss = mavutil.mode_string_v10(fmsg)
# print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
print("目前的飛行模式 : {}".format(sss))
show_time2 = time.time()
# if (time.time() - show_time) >= 2:
# show_time = time.time()
# for sysid in analyzer.mavlink_systems:
# for compid in analyzer.mavlink_systems[sysid].components:
# print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# print("===================")
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_socket_in.close()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_out.close()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 51:
# 晉凱的測試項目
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket3 = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# 設定通道流動
mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
mavlink_object3.multiplexingToReturn = [] #
# mavlink_object3.multiplexingToSwap = [] #
# 啟動通道
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create all topic: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket3.close()
print('<=== End of Program')

@ -1,6 +1,6 @@
# 自定義的 import
from theLogger import setup_logger
from .theLogger import setup_logger
# ====================== 分割線 =====================

@ -1,8 +1,7 @@
'''
# 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到橋接器
# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流
# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue
# 匯流排接到訊息後透過 ring buffer 傳送到橋接器
# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器
# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排
@ -13,8 +12,11 @@
# 基礎功能的 import
import threading
import queue
# import queue
import time
import asyncio
from enum import Enum, auto
# from typing import Dict, List, Optional, Set, Any, Tuple
# mavlink 的 import
from pymavlink import mavutil
@ -23,38 +25,17 @@ from pymavlink import mavutil
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device, mavlink_systems
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
from .mavlinkDevice import mavlink_device, mavlink_systems
from .mavlinkPublish import mavlink_publisher
from .theLogger import setup_logger
from .ringBuffer import RingBuffer
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
fixed_stream_bridge_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer
stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255)
return_packet_ring = RingBuffer(capacity=256, buffer_id=254)
# ====================== 分割線 =====================
@ -75,6 +56,8 @@ class mavlink_bridge(Node, mavlink_publisher):
node 區塊則是處理 ros2 publisher subscriber 訂閱相關
藉由控制 ros2 的機制再把 device object 的資訊發送出去
fixed_stream_bridge_queue 置換成 stream_bridge_ring
ps. 我限制了這個 class 只能有一個 instance
'''
_instance = None
@ -106,13 +89,25 @@ class mavlink_bridge(Node, mavlink_publisher):
# === Thread 區塊 ===
def _run_thread(self):
# start_time = time.time() # debug
start_time = time.time() # debug
run_loop_count = 0 # debug
logger.info('Start of mavlink_bridge._run_thread')
# 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
# 從 Queue stream_bridge_ring 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_bridge_queue.empty():
# # 這個迴圈每秒鐘被執行了幾輪? # 這整段都是 debug 用的
# current_time = time.time()
# if (current_time - start_time) >= 1.0:
# logger.info(f'mavlink_bridge._run_thread loop count: {run_loop_count}')
# run_loop_count = 0
# start_time = current_time
# else:
# run_loop_count += 1
# # =========================
if stream_bridge_ring.is_empty():
continue
msg_pack = fixed_stream_bridge_queue.get()
msg_pack = stream_bridge_ring.get()
msg = msg_pack[2]
sysid = msg.get_srcSystem()
@ -150,7 +145,7 @@ class mavlink_bridge(Node, mavlink_publisher):
this_component.emitParams['flightMode'] = msg # debug
# print("mav_type : ", msg.type) # debug
# print("get mode :", mavutil.mode_string_v10(msg)) # debug
print("get mode :", mavutil.mode_string_v10(msg)) # debug
# print("record mode :", this_component.emitParams['flightMode_mode']) # debug
elif msg_id == 30: # ATTITUDE 處理
@ -201,24 +196,45 @@ class mavlink_bridge(Node, mavlink_publisher):
# ====================== 分割線 =====================
class mavlink_object():
'''
本次改版的暫時記錄註解於此
1. mavlink_object 由於 Queue 的效能太差 會完全移除
其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現
multiplexingToSwap 會完全被移除代替方式下一條描述
2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流
(註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object)
因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移
並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出
3. mavlink_object 需要加上 state 去管理其狀態
4. mavlink_object 需要加上 target port 去管理寫出的目標
5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道
6. 基於第1,2 updateMultiplexingList 會被完全移除
7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object
8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring return_packet_ring 來取代 另外 swap_queues 會被完全移除
'''
# 定義 mavlink_object 的狀態
class MavlinkObjectState(Enum):
INIT = auto() # 初始化狀態
RUNNING = auto() # 運行中狀態
ERROR = auto() # 錯誤狀態
CLOSED = auto() # 已關閉狀態
class mavlink_object:
'''
每個 mavlink bus 都會有一個 mavlink_object
其中主要是 thread 做統計封包與分流
分流表的控制在三個 list 分別為
multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的
multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的
multiplexingToSwap : 這個 list 是用來分流到轉拋串流的
透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為
使用 asyncio 處理資料流 RingBuffer 來分配訊息
直接透過 socket 寫出
'''
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
# 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號
# socket_id 從 0 開始
instance = super().__new__(cls)
socket_id = 0
for k in cls.mavlinkObjects.keys():
@ -232,180 +248,389 @@ class mavlink_object():
return instance
def __init__(self, socket):
# 登入所需的 socket
self.mavlink_socket = socket
# 這邊變數是執行的時候被使用的 不要直接寫入它
self._multiplexingList = []
# 存放要發送的訊息的 queue 或稱 buffer
self.output_buffer = queue.Queue()
if self.socket_id >= len(swap_queues):
swap_queues.append(self.output_buffer)
else:
swap_queues[self.socket_id] = self.output_buffer
# 存放目前是否分流到固定串流橋接器和回傳封包處理器的標誌 # 都是要做的 把這個判斷拿掉
# self.send_to_bridge = True
# self.send_to_return = False
# 用於主線程發送消息的緩衝區
self.outgoing_msg_lock = threading.Lock()
self.outgoing_msgs = []
# 記錄訊息過濾類型 (可選)
self.bridge_msg_types = [0] # 默認只處理 HEARTBEAT
self.return_msg_types = []
# 目標端口
self.target_sockets = []
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 這三個 list 用來分配不同的訊息到不同的 queue
self.multiplexingToAnalysis = [
0, # HEARTBEAT # 挺必要的項目
# 24, # GPS_RAW_INT
# 30, # ATTITUDE
# 33, # GLOBAL_POSITION_INT
# 74, # VFR_HUD
# 147, # BATTERY_STATUS
]
self.multiplexingToReturn = []
self.multiplexingToSwap = [
[] for _ in range(len(swap_queues))
]
# 刷新其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id):
object.multiplexingToSwap.append([])
object.updateMultiplexingList()
logger.info('mavlink_object instance {} created'.format(self.socket_id))
# 物件變數
self.task = None # Task reference
self.dirtyDataCount = 0 # 髒資料計數器
self.dirtyDataMax = 10 # 髒資料容許閾值
self.state = MavlinkObjectState.INIT
logger.info(f'mavlink_object instance {self.socket_id} created')
def __del__(self):
# 停下自己的 thread
if self.mavlink_socket != None:
self.mavlink_socket.close()
# 停止 asyncio task
self.stop()
# 移除其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id):
object.multiplexingToSwap[self.socket_id] = []
object.updateMultiplexingList()
# 移除自己的 swap_queues
swap_queues[self.socket_id] = None
# 關閉 socket
if hasattr(self, 'mavlink_socket') and self.mavlink_socket:
try:
self.mavlink_socket.close()
except:
pass
# 處理 class 的 instance 記錄
self.socket_num -= 1
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
if hasattr(self.__class__, 'socket_num'):
self.__class__.socket_num -= 1
# logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替
# print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
if hasattr(self.__class__, 'mavlinkObjects') and hasattr(self, 'socket_id'):
if self.socket_id in self.__class__.mavlinkObjects:
self.__class__.mavlinkObjects.pop(self.socket_id)
# 這段不知道怎麼了 反正會一直讓 logger ERROR 我先關掉
# try:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# logger.info(f'mavlink_object instance {self.socket_id} deleted')
# except Exception as e:
# print(f"Error logging in __del__: {e}")
# if 'logger' in globals() and logger:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
def run(self):
# TODO 檢查 socket 是否有連線
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
self.thread.start()
def start(self):
"""啟動 mavlink_object 處理循環"""
if self.state == MavlinkObjectState.RUNNING:
logger.warning(f"mavlink_object {self.socket_id} is already running")
return
self.state = MavlinkObjectState.RUNNING
# 實際的啟動會由 async_io_manager 處理
def stop(self):
self.running = False
"""停止 mavlink_object 處理循環"""
if self.state in (MavlinkObjectState.CLOSED, MavlinkObjectState.ERROR):
return
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
self.state = MavlinkObjectState.CLOSED
if self.task:
if not self.task.done():
self.task.cancel()
# 處理接收到的封包
async def process_data(self):
"""處理 mavlink 數據的主要 asyncio 協程"""
logger.info(f'Start of mavlink_object.process_data id: {self.socket_id}')
while self.state == MavlinkObjectState.RUNNING:
timestamp = time.time()
try:
# 處理接收到的封包
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)
self.running = False
break
logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}")
self.dirtyDataCount += 1
if self.dirtyDataCount >= self.dirtyDataMax:
logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state")
self.state = MavlinkObjectState.ERROR
# 不直接退出,嘗試容忍髒數據
await asyncio.sleep(0.01) # 短暫暫停
continue
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...'
try:
if mavlinkMsg:
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# break # Debug function can put here you can see the input data from mavlink
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
self.mavlink_systems[sysid].updateComponentPacketCount(
compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) #TODO 這邊怪怪的 好像要再檢查
# 將訊息依照 multiplexing list 分發到不同的 queue
for i in range(len(self._multiplexingList)):
if (self._multiplexingList[i] == []):
continue
elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1):
if i == 0:
fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1:
return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else:
_queue = swap_queues[i-2]
_queue.put(mavlinkMsg)
# 分發訊息到 RingBuffer
msg_id = mavlinkMsg.get_msgId()
# if self.send_to_bridge and (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types):
if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types):
stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg))
# if self.send_to_return and (msg_id in self.return_msg_types or -1 in self.return_msg_types):
if (msg_id in self.return_msg_types or -1 in self.return_msg_types):
return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg))
# 處理要送出的封包
# 如果 有資料在 output_buffer 中則將其取出並發送
# 沒有就略過發送
# 將接收到的訊息轉發給所有目標端口
for target_port in self.target_sockets:
if target_port != self.socket_id and target_port in self.mavlinkObjects:
target_obj = self.mavlinkObjects[target_port]
if target_obj.state == MavlinkObjectState.RUNNING:
try:
mavlinkMsg_send = self.output_buffer.get(block=False)
except queue.Empty:
mavlinkMsg_send = None
# except Exception as e:
# logger.error(f"Error getting data from output_buffer: {e}")
# mavlinkMsg_send = None
target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf())
except Exception as e:
logger.error(f"Error forwarding message to port {target_port}: {e}")
if mavlinkMsg_send:
# self.mavlink_socket.mav.send(mavlinkMsg_send)
self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中
with self.outgoing_msg_lock:
if self.outgoing_messages and (send_msg := self.outgoing_messages.pop(0)):
try:
self.mavlink_socket.write(send_msg)
except Exception as e:
logger.error(f"mavlink_object {self.socket_id} failed to send message: {e}")
# thread 結束
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
# 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權
await asyncio.sleep(0.001)
def updateMultiplexingList(self):
'''
更新 multiplexing list 並做簡單的檢查
'''
# 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值
check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn)
if check:
logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id))
except asyncio.CancelledError:
logger.info(f'mavlink_object.process_data for id {self.socket_id} was cancelled')
break
except Exception as e:
await asyncio.sleep(0.01) # 短暫暫停避免CPU過載
logger.info(f'End of mavlink_object.process_data id: {self.socket_id}')
self.state = MavlinkObjectState.CLOSED
def add_target_socket(self, target_socket_id):
"""添加一個目標端口用於轉發"""
if target_socket_id not in self.target_sockets and target_socket_id != self.socket_id:
self.target_sockets.append(target_socket_id)
logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}")
return True
return False
# 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0
check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0
if check:
logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id))
def remove_target_socket(self, target_socket_id):
"""移除目標端口"""
if target_socket_id in self.target_sockets:
self.target_sockets.remove(target_socket_id)
logger.info(f"mavlink_object Removed target port {target_socket_id} from mavlink_object {self.socket_id}")
return True
return False
# 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋
try:
self.multiplexingToSwap[self.socket_id] = []
except IndexError:
logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id))
def set_bridge_message_types(self, msg_types):
"""設置需要分流到橋接器的訊息類型"""
if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types):
self.bridge_msg_types = msg_types
return True
logger.error(f"Invalid bridge message types: {msg_types}")
return False
# 組合 multiplexing list
multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap
def set_return_message_types(self, msg_types):
"""設置需要分流到回傳處理器的訊息類型"""
if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types):
self.return_msg_types = msg_types
return True
logger.error(f"Invalid return message types: {msg_types}")
return False
# 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp)
if not check:
logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id))
return False # 若有錯誤則回傳 False
def send_message(self, message_bytes):
"""
從主線程向此 mavlink_object socket 發送數據
將數據添加到簡單的列表中 asyncio 任務處理
# 更新 multiplexing list
self._multiplexingList = multiL_tmp
Args:
message_bytes: 要發送的 mavlink 消息的字節數據
Returns:
bool: 是否成功添加消息到列表
"""
if self.state != MavlinkObjectState.RUNNING:
logger.warning(f"Cannot send message: mavlink_object {self.socket_id} is not running")
return False
try:
# 使用鎖保護共享資源訪問
with self.outgoing_messages_lock:
self.outgoing_messages.append(message_bytes)
return True
except Exception as e:
logger.error(f"Error queueing message for mavlink_object {self.socket_id}: {e}")
return False
# ====================== 分割線 =====================
# def enable_bridge(self, enable=True):
# """啟用或禁用橋接器分流"""
# self.send_to_bridge = enable
# def enable_return(self, enable=True):
# """啟用或禁用回傳處理器分流"""
# self.send_to_return = enable
class async_io_manager:
"""
管理所有 mavlink_object 實例的 asyncio 任務
提供單一線程來處理所有 mavlink 通道的數據
"""
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
with cls._lock:
if cls._instance is None:
cls._instance = super(async_io_manager, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, 'initialized'):
self.initialized = True
self.loop = None
self.main_task = None
self.running = False
self.managed_objects = {} # socket_id: task
def start(self):
"""啟動 async_io_manager 和其管理的所有 mavlink_object"""
if self.running:
logger.warning("async_io_manager already running")
return
self.running = True
self.thread = threading.Thread(target=self._run_event_loop)
self.thread.start()
logger.info("async_io_manager started")
def stop(self):
"""停止 async_io_manager 和其管理的所有 mavlink_object"""
if not self.running:
return
self.running = False
if self.loop:
# 取消所有任務
for socket_id in list(self.managed_objects.keys()):
self.remove_mavlink_object(socket_id)
# 停止事件循環
if self.main_task and not self.main_task.done():
asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop)
# 等待線程結束
if hasattr(self, 'thread') and self.thread.is_alive():
self.thread.join(timeout=5.0)
logger.info("async_io_manager stopped")
async def _shutdown(self):
"""正確關閉事件循環的協程"""
tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
def _run_event_loop(self):
"""在獨立線程中運行事件循環"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self.main_task = self.loop.create_task(self._main_task())
self.loop.run_forever()
except Exception as e:
logger.error(f"Error in async_io_manager event loop: {e}")
finally:
self.loop.close()
self.loop = None
self.running = False
logger.info("async_io_manager event loop ended")
async def _main_task(self):
"""主任務協程,用於監視和管理其他任務"""
logger.info("async_io_manager main task started")
# 整合到 ros2 之後的程式進入點
def main_node():
try:
while self.running:
# 這邊就是一個無窮迴圈 確保 async_io_manager 物件續存
# 每秒鐘檢查並移除已完成或已取消的任務
await asyncio.sleep(1.0)
for socket_id in list(self.managed_objects.keys()):
task = self.managed_objects[socket_id]
if task.done():
try:
exc = task.exception()
if exc:
logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}")
except (asyncio.CancelledError, asyncio.InvalidStateError):
pass
if socket_id in mavlink_object.mavlinkObjects:
print(f"this is manager, i make socket_id {socket_id} closed, he is now in {mavlink_object.mavlinkObjects[socket_id].state} state.") # debug
mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED
del self.managed_objects[socket_id]
except asyncio.CancelledError:
logger.info("async_io_manager main task was cancelled")
except Exception as e:
logger.error(f"Error in async_io_manager main task: {e}")
logger.info("async_io_manager main task ended")
def add_mavlink_object(self, mavlink_obj):
"""添加一個 mavlink_object 實例到 async_io_manager 並啟動其處理任務"""
if not self.running:
logger.error("Cannot add mavlink_object: async_io_manager is not running")
return False
if not isinstance(mavlink_obj, mavlink_object):
logger.error(f"Invalid mavlink_object: {mavlink_obj}")
return False
socket_id = mavlink_obj.socket_id
if socket_id in self.managed_objects:
logger.warning(f"mavlink_object {socket_id} already managed")
return False
# 創建並啟動任務
if self.loop:
# task = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop).result()
future = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop)
# print(f"Task created for mavlink_object {socket_id}: {future1}") # debug
# task = future1.result()
self.managed_objects[socket_id] = future
mavlink_obj.task = future
mavlink_obj.state = MavlinkObjectState.RUNNING
logger.info(f"Added mavlink_object {socket_id} to async_io_manager")
return True
return False
def remove_mavlink_object(self, socket_id):
"""從 async_io_manager 中移除一個 mavlink_object 實例並取消其處理任務"""
if socket_id not in self.managed_objects:
logger.warning(f"mavlink_object {socket_id} not managed by async_io_manager")
return False
# 取消任務
task = self.managed_objects[socket_id]
if not task.done():
task.cancel()
# 從管理列表中移除
del self.managed_objects[socket_id]
# 更新 mavlink_object 狀態
if socket_id in mavlink_object.mavlinkObjects:
mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED
logger.info(f"Removed mavlink_object {socket_id} from async_io_manager")
return True
def get_managed_objects(self):
"""獲取所有被管理的對象列表"""
return list(self.managed_objects.keys())
# ====================== 分割線 =====================
if __name__ == '__main__':
pass

@ -9,14 +9,16 @@
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
# ROS2 的 import
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
from theLogger import setup_logger
import math
# 自定義的 import
from .theLogger import setup_logger
logger = setup_logger("mavlinkPublish.py")

@ -0,0 +1,448 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
from fc_network_adapter import mavlinkObject as mo
from fc_network_adapter import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 21
running_time = 30
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 測試 mavlink_object 放入 ring buffer 的應用
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
start_time = time.time()
while (time.time() - start_time) < running_time:
items_a = mo.stream_bridge_ring.get_all()
items_b = mo.return_packet_ring.get_all()
try:
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
except IndexError:
print("stream_bridge_ring is empty")
try:
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
except IndexError:
print("return_packet_ring is empty")
time.sleep(1)
manager.stop()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
time.sleep(3)
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 2:
# print("mark B")
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
manager.stop()
analyzer.stop()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
connection_string="udp:127.0.0.1:14561"
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
# 初始化輸出通道
connection_string="udpout:127.0.0.1:14550"
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
manager.add_mavlink_object(mavlink_object_out)
manager.add_mavlink_object(mavlink_object_in1)
manager.add_mavlink_object(mavlink_object_in2)
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
start_time = time.time()
while (time.time() - start_time) < running_time:
time.sleep(1)
manager.stop()
print('<=== End of Program')
# elif test_item == 20:
# # 這邊測試 node 生成 topic 的功能
# # 利用 空的通道 發出假的 heartbeat 封包
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# from pymavlink.dialects.v20 import common as mavlink2
# sysid = 1
# compid = 1
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# mav = mavlink2.MAVLink(None)
# mav.srcSystem = sysid
# mav.srcComponent = compid
# # 這是一個 heartbeat 封包
# fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
# type=mavlink2.MAV_TYPE_QUADROTOR,
# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
# base_mode=3,
# custom_mode=0,
# system_status=0,
# mavlink_version=3
# )
# mavlink_bytes = fake_heartbeat.pack(mav)
# fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
# print(analyzer.mavlink_systems)
# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
# time.sleep(0.1)
# print(analyzer.mavlink_systems)
# # ROS2 初始化
# analyzer._init_node()
# # 創建 ROS2 topic
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
# print("topic created")
# time.sleep(5)
# # 丟出 topic
# analyzer.emit_info()
# # 結束程式
# analyzer.running = False
# analyzer.destroy_node()
# rclpy.shutdown()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
# 初始化 rclpy 才能使用 node
rclpy.init()
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
time.sleep(0.5) # 系統 Setup 完成
# 創建通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
manager.add_mavlink_object(mavlink_object3)
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
manager.stop()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
# elif test_item == 22:
# # 需要開啟一個 ardupilot 的模擬器 與 GCS
# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 初始化兩個通道
# connection_string_in="udp:127.0.0.1:15551"
# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
# mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# connection_string_out="udpout:127.0.0.1:14553"
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
# mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
# mavlink_object_out.multiplexingToAnalysis = []
# # 讓兩個通道互相傳輸
# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# # 啟動通道
# mavlink_object_in.run()
# mavlink_object_out.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# print('目前所有的系統 : ')
# for sysid in analyzer.mavlink_systems:
# print(analyzer.mavlink_systems[sysid])
# compid = 1
# sysid = 1
# show_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# show_time2 = time.time()
# while time.time() - start_time < running_time:
# if (time.time() - show_time2) >= 1:
# analyzer.emit_info() # 這邊是測試 node 的運行
# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
# sss = mavutil.mode_string_v10(fmsg)
# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
# print("目前的飛行模式 : {}".format(sss))
# show_time2 = time.time()
# # if (time.time() - show_time) >= 2:
# # show_time = time.time()
# # for sysid in analyzer.mavlink_systems:
# # for compid in analyzer.mavlink_systems[sysid].components:
# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# # print("===================")
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object_in.stop()
# mavlink_object_in.thread.join()
# mavlink_socket_in.close()
# mavlink_object_out.stop()
# mavlink_object_out.thread.join()
# mavlink_socket_out.close()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
# elif test_item == 51:
# # 晉凱的測試項目
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 創建通道
# connection_string="udp:127.0.0.1:15551"
# mavlink_socket3 = mavutil.mavlink_connection(connection_string)
# mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# # 設定通道流動
# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# mavlink_object3.multiplexingToReturn = [] #
# # mavlink_object3.multiplexingToSwap = [] #
# # 啟動通道
# mavlink_object3.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# compid = 1
# sysid = 1
# start_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
# end_time = time.time()
# print(f"Execution time for create all topic: {end_time - start_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# while time.time() - start_time < running_time:
# try:
# # rclpy.spin(analyzer)
# analyzer.emit_info() # 這邊是測試 node 的運行
# time.sleep(0.5)
# except KeyboardInterrupt:
# break
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object3.stop()
# mavlink_object3.thread.join()
# analyzer.stop()
# analyzer.thread.join()
# mavlink_socket3.close()
# print('<=== End of Program')

@ -1,246 +0,0 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
import socket
import random
from pymavlink import mavutil
from pymavlink.dialects.v20 import common as mavlink2
from fc_network_adapter.asyncioManager import AsyncIOManager, Connection
class MockMAVLinkSocket:
"""模擬 pymavlink socket 行為的類,用於測試"""
def __init__(self, name="mock", simulate_errors=False):
self.name = name
self.simulate_errors = simulate_errors
self.is_closed = False
self.received_msgs = []
self.seq = 0
def recv_msg(self):
"""模擬接收 MAVLink 消息"""
if self.is_closed:
return None
# 隨機模擬網絡延遲
time.sleep(random.uniform(0.01, 0.05))
# 偶爾模擬錯誤
if self.simulate_errors and random.random() < 0.1:
raise Exception("Simulated network error")
# 70% 的機率返回消息30% 返回 None (模擬無數據可讀)
if random.random() > 0.3:
msg = self._generate_mavlink_message()
self.received_msgs.append(msg)
return msg
return None
def _generate_mavlink_message(self):
"""生成模擬 MAVLink 消息"""
# 創建 MAVLink 實例
mav = mavlink2.MAVLink(self)
# 設置來源系統和組件
mav.srcSystem = random.randint(1, 5)
mav.srcComponent = random.randint(1, 5)
# 創建 HEARTBEAT 消息
msg = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_GENERIC,
base_mode=0,
custom_mode=0,
system_status=0,
mavlink_version=3
)
# 打包消息
msg.pack(mav)
self.seq += 1
return msg
def write(self, data):
"""模擬發送數據"""
if self.is_closed:
raise Exception("Socket is closed")
# 模擬寫入操作
time.sleep(random.uniform(0.001, 0.01))
# 偶爾模擬錯誤
if self.simulate_errors and random.random() < 0.1:
raise Exception("Simulated write error")
return len(data)
def close(self):
"""關閉連接"""
self.is_closed = True
def create_mock_mavlink_sockets(count=3):
"""創建多個模擬 MAVLink socket"""
sockets = []
for i in range(count):
# 每三個 socket 中有一個模擬錯誤
simulate_errors = (i % 3 == 0)
socket = MockMAVLinkSocket(f"mock_{i}", simulate_errors)
sockets.append(socket)
return sockets
def monitor_thread(io_manager):
"""監視 AsyncIOManager 狀態的執行緒"""
print("Starting monitoring thread...")
try:
while True:
# 獲取總體統計資訊
stats = io_manager.get_stats()
print(f"\n=== AsyncIOManager Stats ===")
print(f"Total connections: {stats.get('connection_count', 0)}")
print(f"Total messages received: {stats.get('rx_count', 0)}")
print(f"Total bytes received: {stats.get('rx_bytes', 0)}")
print(f"Total errors: {stats.get('errors', 0)}")
# 獲取每個連接的緩衝區狀態
for socket_id in io_manager.connections:
buffer = io_manager.get_buffer(socket_id)
conn_stats = io_manager.get_stats(socket_id)
if buffer and not io_manager.connections[socket_id].is_closed:
print(f"Connection {socket_id}: Buffer size={buffer.size()}, " +
f"Messages={conn_stats.get('rx_count', 0)}, " +
f"Last activity={time.time() - io_manager.connections[socket_id].last_activity:.1f}s ago")
# 每秒更新一次
time.sleep(1.0)
except KeyboardInterrupt:
print("Monitoring thread stopped.")
def consumer_thread(io_manager, socket_id):
"""從指定連接的緩衝區消費消息"""
print(f"Starting consumer for socket {socket_id}...")
try:
while True:
# 獲取緩衝區
buffer = io_manager.get_buffer(socket_id)
if not buffer:
print(f"Consumer {socket_id}: Buffer not found or connection closed")
break
# 獲取所有消息
messages = buffer.get_all()
if messages:
print(f"Consumer {socket_id}: Received {len(messages)} messages")
for msg in messages:
# 處理每條消息 (這裡只是示例,實際應用中可能有更複雜的處理)
print(f" - Message from SysID={msg.get_srcSystem()}, CompID={msg.get_srcComponent()}, Type={msg.get_type()}")
# 短暫休眠,避免 CPU 空轉
time.sleep(0.2)
except Exception as e:
print(f"Consumer {socket_id} stopped: {e}")
def send_command_thread(io_manager, socket_ids):
"""定期向所有連接發送命令"""
print("Starting command sender thread...")
# 創建一個 MAVLink 實例用於生成消息
mav = mavlink2.MAVLink(None)
mav.srcSystem = 255 # GCS system ID
mav.srcComponent = 0
try:
count = 0
while True:
# 等待 2 秒
time.sleep(2.0)
count += 1
# 創建一個命令消息
msg = mavlink2.MAVLink_command_long_message(
target_system=1,
target_component=1,
command=mavlink2.MAV_CMD_REQUEST_PROTOCOL_VERSION,
confirmation=0,
param1=0, param2=0, param3=0,
param4=0, param5=0, param6=0, param7=0
)
# 向所有連接發送消息
for socket_id in socket_ids:
if socket_id in io_manager.connections and not io_manager.connections[socket_id].is_closed:
try:
# 打包並發送消息
result = io_manager.send_message(socket_id, msg)
print(f"Sent command to socket {socket_id}: {'Success' if result else 'Failed'}")
except Exception as e:
print(f"Error sending to socket {socket_id}: {e}")
# 每 10 次迭代關閉一個連接 (用於演示關閉功能)
if count % 10 == 0 and socket_ids:
socket_id = socket_ids.pop(0)
print(f"Closing connection {socket_id}...")
io_manager.remove_connection(socket_id)
except Exception as e:
print(f"Command sender thread stopped: {e}")
def main():
print("AsyncIOManager Example")
print("======================")
# 創建 AsyncIOManager 實例
io_manager = AsyncIOManager()
# 啟動 I/O 管理器
io_manager.start()
print("AsyncIOManager started")
# 創建模擬 MAVLink socket
mock_sockets = create_mock_mavlink_sockets(5)
socket_ids = []
# 註冊連接
for i, socket in enumerate(mock_sockets):
socket_id = io_manager.register_connection(socket)
socket_ids.append(socket_id)
print(f"Registered socket {i} with ID {socket_id}")
# 啟動監視執行緒
monitor = threading.Thread(target=monitor_thread, args=(io_manager,))
monitor.daemon = True
monitor.start()
# 為每個連接啟動消費者執行緒
consumers = []
for socket_id in socket_ids:
consumer = threading.Thread(target=consumer_thread, args=(io_manager, socket_id))
consumer.daemon = True
consumer.start()
consumers.append(consumer)
# 啟動命令發送執行緒
sender = threading.Thread(target=send_command_thread, args=(io_manager, socket_ids.copy()))
sender.daemon = True
sender.start()
print("\nPress Ctrl+C to exit...")
try:
# 運行一段時間然後退出
time.sleep(30)
except KeyboardInterrupt:
pass
finally:
# 清理資源
print("\nShutting down...")
io_manager.stop()
print("AsyncIOManager stopped")
if __name__ == "__main__":
main()

@ -0,0 +1,468 @@
#!/usr/bin/env python
"""
測試腳本用於測試 mavlinkObject.py 中的 mavlink_object async_io_manager 類別
"""
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import unittest
import time
import threading
import socket
import asyncio
from unittest.mock import MagicMock, patch
# 導入要測試的模組
from fc_network_adapter.mavlinkObject import (
mavlink_object,
async_io_manager,
MavlinkObjectState,
stream_bridge_ring,
return_packet_ring
)
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試"""
def __init__(self, test_data=None):
self.closed = False
self.test_data = test_data or []
self.data_index = 0
self.written_data = []
def recv_msg(self):
if not self.test_data or self.data_index >= len(self.test_data):
return None
data = self.test_data[self.data_index]
self.data_index += 1
return data
def write(self, data):
self.written_data.append(data)
def close(self):
self.closed = True
class MockMavlinkMessage:
"""模擬 Mavlink 訊息的類別,用於測試"""
def __init__(self, msg_id, sysid, compid, seq=0):
self.msg_id = msg_id
self.sysid = sysid
self.compid = compid
self.seq = seq
self.msg_buf = bytes([msg_id, sysid, compid, seq])
def get_msgId(self):
return self.msg_id
def get_srcSystem(self):
return self.sysid
def get_srcComponent(self):
return self.compid
def get_seq(self):
return self.seq
def get_msgbuf(self):
return self.msg_buf
def get_type(self):
return f"MSG_ID_{self.msg_id}"
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建測試訊息
self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1)
self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1)
# 創建模擬的 socket # 假的 Mavlink Socket
self.mock_socket = MockMavlinkSocket([
self.heartbeat_msg,
self.attitude_msg,
self.position_msg
])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
# print("Testing mavlink_object initialization")
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
# 添加目標端口
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
# 嘗試添加已存在的端口
self.assertFalse(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 嘗試添加自己的端口
self.assertFalse(self.mavlink_obj.add_target_socket(0))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 移除端口
self.assertTrue(self.mavlink_obj.remove_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
# 嘗試移除不存在的端口
self.assertFalse(self.mavlink_obj.remove_target_socket(2))
def test_set_message_types(self):
"""測試設置訊息類型功能"""
# 設置橋接器訊息類型
self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30]))
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30])
# 設置回傳處理器訊息類型
self.assertTrue(self.mavlink_obj.set_return_message_types([32]))
self.assertEqual(self.mavlink_obj.return_msg_types, [32])
# 測試無效的訊息類型
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message(self):
"""測試 send_message 功能"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 準備測試數據
test_message1 = b"test_message_1"
test_message2 = b"test_message_2"
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], test_message1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(test_message2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], test_message2)
# 模擬發送出錯的情況
class ErrorWriteSocket(MockMavlinkSocket):
def write(self, data):
raise Exception("Write error")
error_socket = ErrorWriteSocket()
error_obj = mavlink_object(error_socket)
manager.add_mavlink_object(error_obj)
time.sleep(0.1) # 等待對象啟動
# 發送消息到錯誤的 socket
self.assertTrue(error_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 即使寫入失敗send_message 應該也返回 True
# 因為消息已成功加入到佇列中,只是後續的實際發送失敗
# 停止 manager
manager.stop()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 模擬 mavlink 對象
self.mock_socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
self.mock_socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1)
])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.stop()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式 就是不會產生兩個 magager"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
def test_start_stop(self):
"""測試 async_io_manager 的啟動和停止功能"""
# 啟動管理器
self.manager.start()
self.assertTrue(self.manager.running)
self.assertIsNotNone(self.manager.thread)
# 再次啟動應該沒有效果
old_thread = self.manager.thread
self.manager.start()
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.stop()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
start_time = time.time()
while self.manager.thread.is_alive() and time.time() - start_time < 5:
time.sleep(0.1)
self.assertFalse(self.manager.thread.is_alive())
def test_add_remove_objects(self):
"""測試添加和移除 mavlink_object"""
# 啟動管理器
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1))
self.assertEqual(len(self.manager.managed_objects), 1)
self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING)
# 添加另一個對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2))
self.assertEqual(len(self.manager.managed_objects), 2)
# 檢查受管理對象列表
managed_objects = self.manager.get_managed_objects()
self.assertEqual(len(managed_objects), 2)
self.assertIn(0, managed_objects)
self.assertIn(1, managed_objects)
# 移除對象
self.assertTrue(self.manager.remove_mavlink_object(0))
self.assertEqual(len(self.manager.managed_objects), 1)
# 嘗試移除不存在的對象
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.stop()
def test_data_processing(self):
"""測試數據處理"""
# 創建用於轉發的 mavlink_objects
mock_socket3 = MockMavlinkSocket()
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置轉發
self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3
# 設置訊息類型
self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE
# self.mavlink_obj1.enable_return(True)
self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(self.mavlink_obj1)
self.manager.add_mavlink_object(mavlink_obj3)
# 等待處理完成
time.sleep(1.0)
# print("testing Mark A")
# 檢查 Ring buffer 是否有正確的數據
self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE
self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE
a = stream_bridge_ring.get()
print(f"stream_bridge_ring: {a}")
# 檢查是否正確轉發
self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE
# print("testing Mark B")
# 停止管理器
self.manager.stop()
def test_error_handling(self):
"""測試錯誤處理情況"""
print("=== mark A ===")
# 創建一個會引發異常的 socket
class ErrorSocket(MockMavlinkSocket):
def recv_msg(self):
raise Exception("Test exception")
error_socket = ErrorSocket()
mavlink_obj_err = mavlink_object(error_socket)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(mavlink_obj_err)
print("=== mark B ===")
# 等待錯誤處理
time.sleep(1.0)
# 對象應該進入錯誤狀態,但不會崩潰
# self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR)
print("=== mark C ===")
# 管理器應該仍在運行
self.assertTrue(self.manager.running)
# 故意等一段時間 確認 socket 有被 manager 處理掉
time.sleep(3)
# 停止管理器
self.manager.stop()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動"""
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1),
MockMavlinkMessage(msg_id=32, sysid=2, compid=1)
])
socket3 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=3, compid=1),
MockMavlinkMessage(msg_id=33, sysid=3, compid=1)
])
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
manager.add_mavlink_object(obj1)
manager.add_mavlink_object(obj2)
manager.add_mavlink_object(obj3)
# 等待處理所有訊息
time.sleep(1.5)
# 檢查轉發結果
# socket1 應該收到 socket2 和 socket3 的訊息
self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3
# socket3 應該收到 socket1 和 socket2 的訊息
self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2
# 檢查 ring buffer 的數據
# 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
# 應該有 3 個 HEARTBEAT 訊息
self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT
# 停止管理器
manager.stop()
if __name__ == "__main__":
unittest.main(defaultTest="TestMavlinkObject.test_send_message")
Loading…
Cancel
Save