You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

469 lines
17 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
"""
測試腳本,用於測試 mavlinkObject.py 中的 mavlink_object 和 async_io_manager 類別
"""
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import unittest
import time
import threading
import socket
import asyncio
# 導入要測試的模組
from ..fc_network_adapter.mavlinkObject import (
mavlink_object,
async_io_manager,
MavlinkObjectState,
stream_bridge_ring,
return_packet_ring
)
# 預先定義好的真實 MAVLink heartbeat 封包 (MAVLink 1.0 格式)
# Format: STX(0xFE) + LEN + SEQ + SYS + COMP + MSG_ID + PAYLOAD(9 bytes for heartbeat) + CRC(2 bytes)
HEARTBEAT_PACKET_1 = bytes([
0xFE, # STX (MAVLink 1.0)
0x09, # payload length (9 bytes)
0x00, # sequence
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
# Payload (9 bytes): custom_mode(4), type(1), autopilot(1), base_mode(1), system_status(1), mavlink_version(1)
0x00, 0x00, 0x00, 0x00, # custom_mode = 0
0x02, # type = MAV_TYPE_QUADROTOR (2)
0x03, # autopilot = MAV_AUTOPILOT_ARDUPILOTMEGA (3)
0x40, # base_mode = MAV_MODE_FLAG_CUSTOM_MODE_ENABLED (64)
0x03, # system_status = MAV_STATE_STANDBY (3)
0x03, # mavlink_version = 3
0x62, 0x8E # CRC (simplified placeholder)
])
HEARTBEAT_PACKET_2 = bytes([
0xFE, # STX
0x09, # payload length
0x01, # sequence (增加)
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x41, 0x03, 0x03,
0x33, 0xEC
])
HEARTBEAT_PACKET_3 = bytes([
0xFE, # STX
0x09, # payload length
0x02, # sequence
0x02, # system ID = 2
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x42, 0x03, 0x03,
0x37, 0x44
])
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試
使用真實的 MAVLink 封包,而不是模擬的訊息對象
"""
def __init__(self, test_packets=None):
"""
Args:
test_packets: list of bytes每個元素都是完整的 MAVLink 封包
"""
self.closed = False
self.test_packets = test_packets or []
self.packet_index = 0
self.written_data = []
# 使用 pymavlink 來解析封包
from pymavlink import mavutil
self.mav_parser = mavutil.mavlink.MAVLink(self)
def recv_msg(self):
"""返回解析後的 MAVLink 訊息對象"""
if not self.test_packets or self.packet_index >= len(self.test_packets):
return None
packet = self.test_packets[self.packet_index]
self.packet_index += 1
# 使用 pymavlink 解析封包
try:
for byte in packet:
msg = self.mav_parser.parse_char(bytes([byte]))
if msg:
return msg
except Exception as e:
print(f"Error parsing packet: {e}")
return None
return None
def write(self, data):
"""寫入數據(用於檢查轉發)"""
self.written_data.append(data)
def close(self):
"""關閉 socket"""
self.closed = True
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建模擬的 socket使用真實封包
self.mock_socket = MockMavlinkSocket([HEARTBEAT_PACKET_1])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
self.assertEqual(self.mavlink_obj.return_msg_types, [])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
# 添加目標端口
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
# 嘗試添加已存在的端口
self.assertFalse(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 嘗試添加自己的端口
self.assertFalse(self.mavlink_obj.add_target_socket(0))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
# 移除端口
self.assertTrue(self.mavlink_obj.remove_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
# 嘗試移除不存在的端口
self.assertFalse(self.mavlink_obj.remove_target_socket(2))
def test_set_message_types(self):
"""測試設置訊息類型功能"""
# 設置橋接器訊息類型
self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30]))
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30])
# 設置回傳處理器訊息類型
self.assertTrue(self.mavlink_obj.set_return_message_types([32]))
self.assertEqual(self.mavlink_obj.return_msg_types, [32])
# 測試無效的訊息類型
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message_validation(self):
"""測試 send_message 的數據驗證功能(不需要啟動 manager"""
# 測試非運行狀態下發送消息
self.assertFalse(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
# 測試無效的數據類型
self.mavlink_obj.state = MavlinkObjectState.RUNNING # 臨時設置狀態
self.assertFalse(self.mavlink_obj.send_message("invalid"))
self.assertFalse(self.mavlink_obj.send_message(123))
# 測試太短的封包
self.assertFalse(self.mavlink_obj.send_message(bytes([0xFE, 0x00])))
# 測試無效的起始標記
invalid_packet = bytes([0xFF, 0x09, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00])
self.assertFalse(self.mavlink_obj.send_message(invalid_packet))
# 測試有效的封包可以加入佇列
self.assertTrue(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(self.mavlink_obj.outgoing_msgs), 1)
self.mavlink_obj.state = MavlinkObjectState.INIT # 恢復狀態
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 創建模擬 mavlink 對象,使用真實封包
self.mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2])
self.mock_socket2 = MockMavlinkSocket([HEARTBEAT_PACKET_3])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.shutdown()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
def test_start_stop(self):
"""測試 async_io_manager 的啟動和停止功能"""
# 啟動管理器
self.manager.start()
self.assertTrue(self.manager.running)
self.assertIsNotNone(self.manager.thread)
# 再次啟動應該沒有效果
old_thread = self.manager.thread
self.manager.start()
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.shutdown()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
start_time = time.time()
while self.manager.thread.is_alive() and time.time() - start_time < 5:
time.sleep(0.1)
self.assertFalse(self.manager.thread.is_alive())
def test_add_remove_objects(self):
"""測試添加和移除 mavlink_object"""
# 啟動管理器
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1))
self.assertEqual(len(self.manager.managed_objects), 1)
self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING)
# 添加另一個對象
self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2))
self.assertEqual(len(self.manager.managed_objects), 2)
# 檢查受管理對象列表
managed_objects = self.manager.get_managed_objects()
self.assertEqual(len(managed_objects), 2)
self.assertIn(0, managed_objects)
self.assertIn(1, managed_objects)
# 移除對象
self.assertTrue(self.manager.remove_mavlink_object(0))
self.assertEqual(len(self.manager.managed_objects), 1)
# 嘗試移除不存在的對象
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.shutdown()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動與資料流"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.shutdown()
def test_send_message_with_manager(self):
"""測試透過 async_io_manager 發送訊息的完整流程"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
self.manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
time.sleep(0.2) # 等待消息處理
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], HEARTBEAT_PACKET_1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], HEARTBEAT_PACKET_2)
# 停止 manager
self.manager.shutdown()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
def test_data_processing_and_forwarding(self):
"""測試數據處理與轉發流程"""
# 創建用於轉發的 mavlink_objects
mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2,])
mock_socket3 = MockMavlinkSocket()
mavlink_obj1 = mavlink_object(mock_socket1)
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置訊息類型
mavlink_obj1.set_bridge_message_types([0]) # 只處理 HEARTBEAT
# 設置轉發: obj1 -> obj3
mavlink_obj1.add_target_socket(mavlink_obj3.socket_id) # socket1 轉發到 socket3 (socket_id=1)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
"""
這邊出現很奇怪的狀況 應該說 設計時沒有考量 但是實測會發現
mavlink_obj3 是接收端 必需要被優先加入 manager 才能正確接收來自 mavlink_obj1 的轉發封包
若先把 mavlink_ojb1 加入 manger 則可能會導致前面幾個封包丟失
"""
self.manager.add_mavlink_object(mavlink_obj3)
self.manager.add_mavlink_object(mavlink_obj1)
# 等待處理完成
time.sleep(0.5)
# 檢查 Ring buffer 是否有正確的數據
self.assertGreaterEqual(stream_bridge_ring.size(), 2) # 至少 2 個 HEARTBEAT
# 檢查是否正確轉發
self.assertGreaterEqual(len(mock_socket3.written_data), 2) # 至少 2 個 HEARTBEAT
# 停止管理器
self.manager.shutdown()
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket()
socket2 = MockMavlinkSocket()
socket3 = MockMavlinkSocket()
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
self.manager.add_mavlink_object(obj1)
self.manager.add_mavlink_object(obj2)
self.manager.add_mavlink_object(obj3)
# 對三個對象添加數據
socket1.test_packets.append(HEARTBEAT_PACKET_1)
socket2.test_packets.append(HEARTBEAT_PACKET_2)
socket3.test_packets.append(HEARTBEAT_PACKET_3)
# 等待處理所有訊息
time.sleep(1.0)
# 檢查轉發結果
# socket1 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket1.written_data), 1)
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertGreaterEqual(len(socket2.written_data), 2)
# socket3 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket3.written_data), 1)
# 檢查 ring buffer 的數據
# 所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
self.assertGreaterEqual(stream_bridge_ring.size(), 3) # 至少 3 個 HEARTBEAT
# 停止管理器
self.manager.shutdown()
if __name__ == "__main__":
# 可以指定要運行的測試
# unittest.main(defaultTest="TestMavlinkObject.test_send_message_validation")
# unittest.main(defaultTest="TestAsyncIOManager.test_add_remove_objects")
unittest.main(defaultTest="TestIntegration.test_bidirectional_forwarding")
unittest.main()