You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/fc_network_adapter/tests/test_mavlinkObject.py

469 lines
17 KiB
Python

#!/usr/bin/env python
"""
測試腳本用於測試 mavlinkObject.py 中的 mavlink_object async_io_manager 類別
"""
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import unittest
import time
import threading
import socket
import asyncio
# 導入要測試的模組
from ..fc_network_adapter.mavlinkObject import (
mavlink_object,
async_io_manager,
MavlinkObjectState,
stream_bridge_ring,
return_packet_ring
)
# 預先定義好的真實 MAVLink heartbeat 封包 (MAVLink 1.0 格式)
# Format: STX(0xFE) + LEN + SEQ + SYS + COMP + MSG_ID + PAYLOAD(9 bytes for heartbeat) + CRC(2 bytes)
HEARTBEAT_PACKET_1 = bytes([
0xFE, # STX (MAVLink 1.0)
0x09, # payload length (9 bytes)
0x00, # sequence
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
# Payload (9 bytes): custom_mode(4), type(1), autopilot(1), base_mode(1), system_status(1), mavlink_version(1)
0x00, 0x00, 0x00, 0x00, # custom_mode = 0
0x02, # type = MAV_TYPE_QUADROTOR (2)
0x03, # autopilot = MAV_AUTOPILOT_ARDUPILOTMEGA (3)
0x40, # base_mode = MAV_MODE_FLAG_CUSTOM_MODE_ENABLED (64)
0x03, # system_status = MAV_STATE_STANDBY (3)
0x03, # mavlink_version = 3
0x62, 0x8E # CRC (simplified placeholder)
])
HEARTBEAT_PACKET_2 = bytes([
0xFE, # STX
0x09, # payload length
0x01, # sequence (增加)
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x41, 0x03, 0x03,
0x33, 0xEC
])
HEARTBEAT_PACKET_3 = bytes([
0xFE, # STX
0x09, # payload length
0x02, # sequence
0x02, # system ID = 2
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x42, 0x03, 0x03,
0x37, 0x44
])
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試
使用真實的 MAVLink 封包而不是模擬的訊息對象
"""
def __init__(self, test_packets=None):
"""
Args:
test_packets: list of bytes每個元素都是完整的 MAVLink 封包
"""
self.closed = False
self.test_packets = test_packets or []
self.packet_index = 0
self.written_data = []
# 使用 pymavlink 來解析封包
from pymavlink import mavutil
self.mav_parser = mavutil.mavlink.MAVLink(self)
def recv_msg(self):
"""返回解析後的 MAVLink 訊息對象"""
if not self.test_packets or self.packet_index >= len(self.test_packets):
return None
packet = self.test_packets[self.packet_index]
self.packet_index += 1
# 使用 pymavlink 解析封包
try:
for byte in packet:
msg = self.mav_parser.parse_char(bytes([byte]))
if msg:
return msg
except Exception as e:
print(f"Error parsing packet: {e}")
return None
return None
def write(self, data):
"""寫入數據(用於檢查轉發)"""
self.written_data.append(data)
def close(self):
"""關閉 socket"""
self.closed = True
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建模擬的 socket使用真實封包
self.mock_socket = MockMavlinkSocket([HEARTBEAT_PACKET_1])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
self.assertEqual(self.mavlink_obj.return_msg_types, [])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
# 添加目標端口
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
# 嘗試添加已存在的端口
self.assertFalse(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 嘗試添加自己的端口
self.assertFalse(self.mavlink_obj.add_target_socket(0))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 移除端口
self.assertTrue(self.mavlink_obj.remove_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
# 嘗試移除不存在的端口
self.assertFalse(self.mavlink_obj.remove_target_socket(2))
def test_set_message_types(self):
"""測試設置訊息類型功能"""
# 設置橋接器訊息類型
self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30]))
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30])
# 設置回傳處理器訊息類型
self.assertTrue(self.mavlink_obj.set_return_message_types([32]))
self.assertEqual(self.mavlink_obj.return_msg_types, [32])
# 測試無效的訊息類型
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message_validation(self):
"""測試 send_message 的數據驗證功能(不需要啟動 manager"""
# 測試非運行狀態下發送消息
self.assertFalse(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
# 測試無效的數據類型
self.mavlink_obj.state = MavlinkObjectState.RUNNING # 臨時設置狀態
self.assertFalse(self.mavlink_obj.send_message("invalid"))
self.assertFalse(self.mavlink_obj.send_message(123))
# 測試太短的封包
self.assertFalse(self.mavlink_obj.send_message(bytes([0xFE, 0x00])))
# 測試無效的起始標記
invalid_packet = bytes([0xFF, 0x09, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00])
self.assertFalse(self.mavlink_obj.send_message(invalid_packet))
# 測試有效的封包可以加入佇列
self.assertTrue(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(self.mavlink_obj.outgoing_msgs), 1)
self.mavlink_obj.state = MavlinkObjectState.INIT # 恢復狀態
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 創建模擬 mavlink 對象,使用真實封包
self.mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2])
self.mock_socket2 = MockMavlinkSocket([HEARTBEAT_PACKET_3])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.shutdown()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
def test_start_stop(self):
"""測試 async_io_manager 的啟動和停止功能"""
# 啟動管理器
self.manager.start()
self.assertTrue(self.manager.running)
self.assertIsNotNone(self.manager.thread)
# 再次啟動應該沒有效果
old_thread = self.manager.thread
self.manager.start()
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.shutdown()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
start_time = time.time()
while self.manager.thread.is_alive() and time.time() - start_time < 5:
time.sleep(0.1)
self.assertFalse(self.manager.thread.is_alive())
def test_add_remove_objects(self):
"""測試添加和移除 mavlink_object"""
# 啟動管理器
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1))
self.assertEqual(len(self.manager.managed_objects), 1)
self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING)
# 添加另一個對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2))
self.assertEqual(len(self.manager.managed_objects), 2)
# 檢查受管理對象列表
managed_objects = self.manager.get_managed_objects()
self.assertEqual(len(managed_objects), 2)
self.assertIn(0, managed_objects)
self.assertIn(1, managed_objects)
# 移除對象
self.assertTrue(self.manager.remove_mavlink_object(0))
self.assertEqual(len(self.manager.managed_objects), 1)
# 嘗試移除不存在的對象
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.shutdown()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動與資料流"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.shutdown()
def test_send_message_with_manager(self):
"""測試透過 async_io_manager 發送訊息的完整流程"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
self.manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
time.sleep(0.2) # 等待消息處理
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], HEARTBEAT_PACKET_1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], HEARTBEAT_PACKET_2)
# 停止 manager
self.manager.shutdown()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
def test_data_processing_and_forwarding(self):
"""測試數據處理與轉發流程"""
# 創建用於轉發的 mavlink_objects
mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2,])
mock_socket3 = MockMavlinkSocket()
mavlink_obj1 = mavlink_object(mock_socket1)
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置訊息類型
mavlink_obj1.set_bridge_message_types([0]) # 只處理 HEARTBEAT
# 設置轉發: obj1 -> obj3
mavlink_obj1.add_target_socket(mavlink_obj3.socket_id) # socket1 轉發到 socket3 (socket_id=1)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
"""
這邊出現很奇怪的狀況 應該說 設計時沒有考量 但是實測會發現
mavlink_obj3 是接收端 必需要被優先加入 manager 才能正確接收來自 mavlink_obj1 的轉發封包
若先把 mavlink_ojb1 加入 manger 則可能會導致前面幾個封包丟失
"""
self.manager.add_mavlink_object(mavlink_obj3)
self.manager.add_mavlink_object(mavlink_obj1)
# 等待處理完成
time.sleep(0.5)
# 檢查 Ring buffer 是否有正確的數據
self.assertGreaterEqual(stream_bridge_ring.size(), 2) # 至少 2 個 HEARTBEAT
# 檢查是否正確轉發
self.assertGreaterEqual(len(mock_socket3.written_data), 2) # 至少 2 個 HEARTBEAT
# 停止管理器
self.manager.shutdown()
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket()
socket2 = MockMavlinkSocket()
socket3 = MockMavlinkSocket()
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
self.manager.add_mavlink_object(obj1)
self.manager.add_mavlink_object(obj2)
self.manager.add_mavlink_object(obj3)
# 對三個對象添加數據
socket1.test_packets.append(HEARTBEAT_PACKET_1)
socket2.test_packets.append(HEARTBEAT_PACKET_2)
socket3.test_packets.append(HEARTBEAT_PACKET_3)
# 等待處理所有訊息
time.sleep(1.0)
# 檢查轉發結果
# socket1 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket1.written_data), 1)
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertGreaterEqual(len(socket2.written_data), 2)
# socket3 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket3.written_data), 1)
# 檢查 ring buffer 的數據
# 所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
self.assertGreaterEqual(stream_bridge_ring.size(), 3) # 至少 3 個 HEARTBEAT
# 停止管理器
self.manager.shutdown()
if __name__ == "__main__":
# 可以指定要運行的測試
# unittest.main(defaultTest="TestMavlinkObject.test_send_message_validation")
# unittest.main(defaultTest="TestAsyncIOManager.test_add_remove_objects")
unittest.main(defaultTest="TestIntegration.test_bidirectional_forwarding")
unittest.main()