You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/fc_network_adapter/tests/test_vehicleStatusPublisher.py

509 lines
15 KiB
Python

"""
VehicleStatusPublisher 測試程式
測試從 vehicle_registry 讀取資料並發布到 ROS2 topics
"""
import time
import json
import threading
# ROS2 imports
import rclpy
from rclpy.node import Node
# 標準 ROS2 消息類型
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
import fc_interfaces.msg
# 專案 imports
from ..fc_network_adapter.mavlinkROS2Nodes import (
VehicleStatusPublisher,
fc_ros_manager,
ros2_manager
)
from ..fc_network_adapter.mavlinkVehicleView import (
vehicle_registry,
ConnectionType,
ComponentType,
)
class TestSubscriber(Node):
"""測試用的訂閱者節點 - 接收並記錄收到的消息"""
def __init__(self, sysid: int = 1):
super().__init__(f'test_subscriber_sys{sysid}')
self.sysid = sysid
self.received_messages = {
'position_gnss': [],
'attitude': [],
'velocity': [],
'battery': [],
'vfr_hud': [],
'mode': [],
'summary': [],
}
# 建立所有訂閱者
self._create_subscriptions()
print(f"[TestSubscriber] 已建立,監聽 sys{sysid} 的所有 topics")
def _create_subscriptions(self):
"""建立所有 topic 的訂閱者"""
base_topic = f'{VehicleStatusPublisher.topicString_prefix}/sys{self.sysid}'
# Position
self.create_subscription(
fc_interfaces.msg.GnssRaw,
f'{base_topic}/position_gnss',
lambda msg: self._on_message('position_gnss', msg),
10
)
# Attitude
self.create_subscription(
sensor_msgs.msg.Imu,
f'{base_topic}/attitude',
lambda msg: self._on_message('attitude', msg),
10
)
# Velocity
self.create_subscription(
geometry_msgs.msg.TwistStamped,
f'{base_topic}/velocity',
lambda msg: self._on_message('velocity', msg),
10
)
# Battery
self.create_subscription(
sensor_msgs.msg.BatteryState,
f'{base_topic}/battery',
lambda msg: self._on_message('battery', msg),
10
)
# VFR HUD
self.create_subscription(
mavros_msgs.msg.VfrHud,
f'{base_topic}/vfr_hud',
lambda msg: self._on_message('vfr_hud', msg),
10
)
# Mode
self.create_subscription(
std_msgs.msg.String,
f'{base_topic}/mode',
lambda msg: self._on_message('mode', msg),
10
)
# Summary
self.create_subscription(
std_msgs.msg.String,
f'{base_topic}/summary',
lambda msg: self._on_message('summary', msg),
10
)
def _on_message(self, topic_name: str, msg):
"""通用消息接收回調"""
self.received_messages[topic_name].append(msg)
print(f"[TestSubscriber] 收到 {topic_name}: {self._format_msg(topic_name, msg)}")
def _format_msg(self, topic_name: str, msg) -> str:
"""格式化消息以便顯示"""
if topic_name == 'position_gnss':
return f"lat={msg.latitude:.6f}, lon={msg.longitude:.6f}, alt={msg.altitude:.2f}m"
elif topic_name == 'attitude':
return f"quat=({msg.orientation.x:.3f}, {msg.orientation.y:.3f}, {msg.orientation.z:.3f}, {msg.orientation.w:.3f})"
elif topic_name == 'velocity':
return f"linear=({msg.twist.linear.x:.2f}, {msg.twist.linear.y:.2f}, {msg.twist.linear.z:.2f})"
elif topic_name == 'battery':
return f"voltage={msg.voltage:.2f}V, percent={msg.percentage*100:.1f}%"
elif topic_name == 'vfr_hud':
return f"airspeed={msg.airspeed:.2f}, groundspeed={msg.groundspeed:.2f}, heading={msg.heading}"
elif topic_name == 'mode':
return f"mode={msg.data}"
elif topic_name == 'summary':
try:
data = json.loads(msg.data)
return f"sysid={data['sysid']}, socket_id={data['socket_id']}, mode={data['mode_name']}"
except:
return msg.data
return str(msg)
def get_message_count(self, topic_name: str) -> int:
"""獲取收到的消息數量"""
return len(self.received_messages[topic_name])
def clear_messages(self):
"""清空已收到的消息"""
for key in self.received_messages:
self.received_messages[key].clear()
def setup_test_vehicle(sysid: int = 1, socket_id: int = 10):
"""
建立測試用的載具數據
Args:
sysid: 系統 ID
socket_id: Socket ID
"""
print(f"\n=== 建立測試載具 (sysid={sysid}, socket_id={socket_id}) ===")
# 註冊載具
vehicle = vehicle_registry.register(sysid)
vehicle.kind = "Copter"
vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR
vehicle.connected_via = ConnectionType.UDP
vehicle.custom_meta['socket_id'] = socket_id
# 新增 autopilot 組件 (component_id=1)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR
autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA
# 填充狀態資料
status = autopilot.status
# 位置
status.position.latitude = 25.0330
status.position.longitude = 121.5654
status.position.altitude = 100.5
status.position.relative_altitude = 50.0
status.position.timestamp = time.time()
# 姿態
status.attitude.roll = 0.1
status.attitude.pitch = -0.05
status.attitude.yaw = 1.57
status.attitude.rollspeed = 0.01
status.attitude.pitchspeed = 0.02
status.attitude.yawspeed = 0.03
status.attitude.timestamp = time.time()
# 飛行模式
status.mode.base_mode = 89
status.mode.custom_mode = 4
status.mode.mode_name = "GUIDED"
status.mode.timestamp = time.time()
# 電池
status.battery.voltage = 12.6
status.battery.current = 15.3
status.battery.remaining = 75
status.battery.temperature = 35.2
status.battery.timestamp = time.time()
# GPS
status.gps.fix_type = 3 # 3D fix
status.gps.satellites_visible = 12
status.gps.eph = 100
status.gps.epv = 150
status.gps.timestamp = time.time()
# VFR
status.vfr.airspeed = 5.5
status.vfr.groundspeed = 6.0
status.vfr.heading = 90
status.vfr.throttle = 65
status.vfr.climb = 1.2
status.vfr.timestamp = time.time()
# 系統狀態
status.armed = True
status.system_status = 4 # MAV_STATE_ACTIVE
# 更新封包統計
autopilot.update_packet_stats(seq=10, msg_type=33, timestamp=time.time())
print(f"✓ 載具 {sysid} 已建立並填充測試數據")
return vehicle
def test_basic_publishing():
"""測試基本的發布功能"""
print("\n" + "="*60)
print("測試 1: 基本發布功能")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 建立測試載具
vehicle = setup_test_vehicle(sysid=1, socket_id=10)
# 初始化 ROS2 管理器
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立測試訂閱者
test_sub = TestSubscriber(sysid=1)
# 啟動 publisher
ros2_manager.start()
print("\n--- 開始發布,等待 5 秒 ---")
# 運行 5 秒,持續 spin
start_time = time.time()
while time.time() - start_time < 5.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
# 檢查收到的消息
print("\n--- 消息統計 ---")
total_messages = 0
for topic in ['position_gnss', 'attitude', 'velocity', 'battery', 'vfr_hud', 'mode', 'summary']:
count = test_sub.get_message_count(topic)
total_messages += count
print(f" {topic:15s}: {count:3d} 條消息")
print(f"\n總計收到: {total_messages} 條消息")
# 驗證
if total_messages > 0:
print("\n✓ 測試通過:成功接收到消息")
else:
print("\n✗ 測試失敗:沒有接收到任何消息")
# 停止
ros2_manager.stop()
test_sub.destroy_node()
def test_frequency_control():
"""測試頻率控制功能"""
print("\n" + "="*60)
print("測試 2: 頻率控制")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 建立測試載具
vehicle = setup_test_vehicle(sysid=1, socket_id=10)
# 初始化(如果還沒初始化)
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立測試訂閱者
test_sub = TestSubscriber(sysid=1)
# 修改頻率設定
publisher_node = ros2_manager.status_publisher
publisher_node.rate_controller.topic_intervals = {
'position_gnss': 1.5,
'attitude': 1.0,
'velocity': 1.0,
'battery': 1.0,
'vfr_hud': 0.5,
'mode': 0.0,
'summary': 0.0,
}
# 啟動 publisher
ros2_manager.start()
print("\n--- 測試頻率控制,運行 3 秒 ---")
# 運行 3 秒
start_time = time.time()
while time.time() - start_time < 3.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
# 檢查頻率
print("\n--- 頻率分析 ---")
print("預期position 約 2 條 (0.67Hz)attitude/battery/velocity 約 3 條 (1Hz)vfr_hud 約 6 條 (2Hz) mode/summary 不發布")
print("2Hz Topics (預期 ~6 條):")
for topic in ['position_gnss', 'attitude', 'velocity', 'vfr_hud']:
count = test_sub.get_message_count(topic)
print(f" {topic:15s}: {count:3d}")
for topic in ['battery', 'mode', 'summary']:
count = test_sub.get_message_count(topic)
print(f" {topic:15s}: {count:3d}")
print("\n✓ 頻率控制測試完成")
# 停止
ros2_manager.stop()
test_sub.destroy_node()
def test_multi_vehicle():
"""測試多載具發布"""
print("\n" + "="*60)
print("測試 3: 多載具發布")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 建立 3 個測試載具
v1 = setup_test_vehicle(sysid=1, socket_id=10)
v2 = setup_test_vehicle(sysid=2, socket_id=11)
v3 = setup_test_vehicle(sysid=3, socket_id=12)
# 修改各載具的位置以便區分
v2.components[1].status.position.latitude = 26.0
v3.components[1].status.position.latitude = 27.0
# 初始化
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立 3 個測試訂閱者
test_sub1 = TestSubscriber(sysid=1)
test_sub2 = TestSubscriber(sysid=2)
test_sub3 = TestSubscriber(sysid=3)
# 啟動 publisher
ros2_manager.start()
print("\n--- 測試多載具,運行 3 秒 ---")
# 運行 3 秒
start_time = time.time()
while time.time() - start_time < 3.0:
rclpy.spin_once(test_sub1, timeout_sec=0.05)
rclpy.spin_once(test_sub2, timeout_sec=0.05)
rclpy.spin_once(test_sub3, timeout_sec=0.05)
time.sleep(0.1)
# 檢查每個載具的消息
print("\n--- 各載具消息統計 ---")
for sysid, test_sub in [(1, test_sub1), (2, test_sub2), (3, test_sub3)]:
total = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"載具 {sysid}: {total:3d} 條消息")
# 檢查 summary 中的 socket_id
if test_sub.get_message_count('summary') > 0:
last_summary = test_sub.received_messages['summary'][-1]
data = json.loads(last_summary.data)
print(f" └─ socket_id={data['socket_id']}, lat={data['latitude']:.1f}")
print("\n✓ 多載具測試完成")
# 停止
ros2_manager.stop()
test_sub1.destroy_node()
test_sub2.destroy_node()
test_sub3.destroy_node()
def test_dynamic_vehicle():
"""測試動態新增/移除載具"""
print("\n" + "="*60)
print("測試 4: 動態載具管理")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 初始化
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立測試訂閱者
test_sub = TestSubscriber(sysid=1)
# 啟動 publisher
ros2_manager.start()
print("\n--- 階段 1: 無載具,運行 1 秒 ---")
start_time = time.time()
while time.time() - start_time < 1.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
count_before = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"收到消息: {count_before}")
# 新增載具
print("\n--- 階段 2: 新增載具,運行 2 秒 ---")
vehicle = setup_test_vehicle(sysid=1, socket_id=10)
start_time = time.time()
while time.time() - start_time < 2.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
count_after = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"收到消息: {count_after - count_before}")
# 移除載具
print("\n--- 階段 3: 移除載具,運行 1 秒 ---")
vehicle_registry.unregister(1)
start_time = time.time()
while time.time() - start_time < 1.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
count_final = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"收到消息: {count_final - count_after} 條(應該為 0")
if count_final - count_after == 0:
print("\n✓ 動態載具管理測試通過")
else:
print("\n✗ 移除載具後仍收到消息")
# 停止
ros2_manager.stop()
test_sub.destroy_node()
def main():
"""主測試函數"""
print("\n" + "="*60)
print("VehicleStatusPublisher 測試程式")
print("="*60)
try:
# 執行各項測試
test_basic_publishing()
# time.sleep(1)
# test_frequency_control()
# time.sleep(1)
# test_multi_vehicle()
# time.sleep(1)
# test_dynamic_vehicle()
print("\n" + "="*60)
print("所有測試完成!")
print("="*60)
except KeyboardInterrupt:
print("\n\n測試被中斷")
except Exception as e:
print(f"\n\n測試出錯: {e}")
import traceback
traceback.print_exc()
finally:
# 清理
if ros2_manager.initialized:
ros2_manager.shutdown()
vehicle_registry.clear()
print("\n清理完成")
if __name__ == '__main__':
main()