(add) 1. 增加 ros2 topic publish

chiyu
Chiyu Chen 4 months ago
parent fa0a2d0831
commit e0165c9aab

@ -0,0 +1,611 @@
"""
MAVLink ROS2 Nodes
包含兩個獨立的 ROS2 Node
1. VehicleStatusPublisher - 發布載具狀態到 ROS2 topics
2. MavlinkCommandService - 提供 MAVLink 指令 service 介面
vehicle_registry 讀取狀態數據頻率控制模組化設計
"""
import os
import time
import math
import threading
from typing import Dict, Optional
# ROS2 imports
import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
# ROS2 Message imports
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
# 自定義 imports
from . import mavlinkVehicleView as mvv
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
# ============================================================================
# 頻率控制器
# ============================================================================
class PublishRateController:
"""發布頻率控制器 - 按時間間隔控制發布頻率"""
def __init__(self):
# 各 topic 的發布間隔(秒)
self.topic_intervals = {
'position': 0.5, # GPS位置 2Hz
'attitude': 0.5, # 姿態 2Hz
'velocity': 0.5, # 速度 2Hz
'battery': 1.0, # 電池 1Hz
'vfr_hud': 0.5, # VFR HUD 2Hz
'mode': 1.0, # 飛行模式 1Hz
'summary': 1.0, # 載具摘要 1Hz
}
# 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp}
self.last_publish_time: Dict[tuple, float] = {}
def should_publish(self, sysid: int, topic: str) -> bool:
"""
檢查是否應該發布此 topic
Args:
sysid: 系統ID
topic: topic 名稱
Returns:
bool: True 表示應該發布
"""
key = (sysid, topic)
now = time.time()
# 首次發布
if key not in self.last_publish_time:
self.last_publish_time[key] = now
return True
# 檢查時間間隔
interval = self.topic_intervals.get(topic, 1.0)
if now - self.last_publish_time[key] >= interval:
self.last_publish_time[key] = now
return True
return False
def reset(self):
"""重置所有計時器"""
self.last_publish_time.clear()
# ============================================================================
# VehicleStatusPublisher Node
# ============================================================================
class VehicleStatusPublisher(Node):
"""
載具狀態發布者 - vehicle_registry 讀取數據並發布到 ROS2 topics
職責:
- 定期從 vehicle_registry 讀取載具狀態
- 頻率控制位置/姿態 2Hz電池/摘要 1Hz
- 發布標準 ROS2 消息類型
- 檢測訂閱者按需發布
"""
def __init__(self):
super().__init__('vehicle_status_publisher')
# 頻率控制器
self.rate_controller = PublishRateController()
# fc_publishers 字典 {(sysid, topic_name): publisher}
self.fc_publishers: Dict[tuple, any] = {}
# 定時器:以較高頻率檢查 vehicle_registry 並發布
# 10Hz 檢查頻率,但通過 rate_controller 控制實際發布頻率
self.timer_period = 0.1 # 100ms
self.timer = self.create_timer(self.timer_period, self.timer_callback)
# 狀態標誌
self.running = True
# logger.info("VehicleStatusPublisher initialized")
def timer_callback(self):
"""定時器回調 - 檢查所有載具並發布狀態"""
if not self.running:
return
# 從 vehicle_registry 獲取所有載具
all_vehicles = mvv.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
self._publish_vehicle_status(vehicle)
def _publish_vehicle_status(self, vehicle: mvv.VehicleView):
"""
發布單個載具的所有狀態
Args:
vehicle: VehicleView 實例
"""
sysid = vehicle.sysid
# 假設只有一個 autopilot component (component_id=1)
component = vehicle.get_component(1)
if not component:
return
status = component.status
# 發布各種狀態(通過頻率控制器判斷是否發布)
self._publish_position(sysid, status)
self._publish_attitude(sysid, status)
self._publish_velocity(sysid, status)
self._publish_battery(sysid, status)
self._publish_vfr_hud(sysid, status)
self._publish_mode(sysid, status)
self._publish_summary(vehicle)
def _get_or_create_publisher(self, sysid: int, topic: str, msg_type, qos: int = 1):
"""
獲取或創建 publisher
Args:
sysid: 系統ID
topic: topic 名稱
msg_type: ROS2 消息類型
qos: QoS 設置
Returns:
publisher 對象
"""
key = (sysid, topic)
if key not in self.fc_publishers:
topic_name = f'/vehicle_status/sys{sysid}/{topic}'
publisher = self.create_publisher(msg_type, topic_name, qos)
self.fc_publishers[key] = publisher
logger.info(f"Created publisher: {topic_name}")
return self.fc_publishers[key]
def _publish_position(self, sysid: int, status: mvv.ComponentStatus):
"""發布 GPS 位置"""
if not self.rate_controller.should_publish(sysid, 'position'):
return
pos = status.position
if pos.latitude is None or pos.longitude is None:
return
publisher = self._get_or_create_publisher(sysid, 'position', sensor_msgs.msg.NavSatFix)
# 檢查是否有訂閱者
if publisher.get_subscription_count() == 0:
return
msg = sensor_msgs.msg.NavSatFix()
msg.latitude = pos.latitude
msg.longitude = pos.longitude
msg.altitude = pos.altitude if pos.altitude is not None else 0.0
# GPS 狀態資訊
gps = status.gps
if gps.fix_type is not None:
msg.status.status = gps.fix_type - 1 # MAVLink fix_type 轉 NavSatStatus
publisher.publish(msg)
def _publish_attitude(self, sysid: int, status: mvv.ComponentStatus):
"""發布姿態IMU"""
if not self.rate_controller.should_publish(sysid, 'attitude'):
return
att = status.attitude
if att.roll is None:
return
publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu)
if publisher.get_subscription_count() == 0:
return
msg = sensor_msgs.msg.Imu()
# 歐拉角轉四元數
qx, qy, qz, qw = self._euler_to_quaternion(
att.roll, att.pitch, att.yaw
)
msg.orientation.x = qx
msg.orientation.y = qy
msg.orientation.z = qz
msg.orientation.w = qw
# 角速度
if att.rollspeed is not None:
msg.angular_velocity.x = att.rollspeed
msg.angular_velocity.y = att.pitchspeed
msg.angular_velocity.z = att.yawspeed
publisher.publish(msg)
def _publish_velocity(self, sysid: int, status: mvv.ComponentStatus):
"""發布速度"""
if not self.rate_controller.should_publish(sysid, 'velocity'):
return
vfr = status.vfr
if vfr.groundspeed is None:
return
publisher = self._get_or_create_publisher(sysid, 'velocity', geometry_msgs.msg.TwistStamped)
if publisher.get_subscription_count() == 0:
return
msg = geometry_msgs.msg.TwistStamped()
msg.header.stamp = self.get_clock().now().to_msg()
# 使用 VFR HUD 的地速和航向計算速度分量
if vfr.heading is not None:
heading_rad = math.radians(vfr.heading)
msg.twist.linear.x = vfr.groundspeed * math.cos(heading_rad)
msg.twist.linear.y = vfr.groundspeed * math.sin(heading_rad)
# 爬升率作為 z 軸速度
if vfr.climb is not None:
msg.twist.linear.z = vfr.climb
publisher.publish(msg)
def _publish_battery(self, sysid: int, status: mvv.ComponentStatus):
"""發布電池狀態"""
if not self.rate_controller.should_publish(sysid, 'battery'):
return
bat = status.battery
if bat.voltage is None:
return
publisher = self._get_or_create_publisher(sysid, 'battery', sensor_msgs.msg.BatteryState)
if publisher.get_subscription_count() == 0:
return
msg = sensor_msgs.msg.BatteryState()
msg.voltage = bat.voltage
if bat.current is not None:
msg.current = bat.current
if bat.remaining is not None:
msg.percentage = bat.remaining / 100.0
if bat.temperature is not None:
msg.temperature = bat.temperature
publisher.publish(msg)
def _publish_vfr_hud(self, sysid: int, status: mvv.ComponentStatus):
"""發布 VFR HUD"""
if not self.rate_controller.should_publish(sysid, 'vfr_hud'):
return
vfr = status.vfr
if vfr.airspeed is None:
return
publisher = self._get_or_create_publisher(sysid, 'vfr_hud', mavros_msgs.msg.VfrHud)
if publisher.get_subscription_count() == 0:
return
msg = mavros_msgs.msg.VfrHud()
msg.airspeed = vfr.airspeed if vfr.airspeed is not None else 0.0
msg.groundspeed = vfr.groundspeed if vfr.groundspeed is not None else 0.0
msg.heading = vfr.heading if vfr.heading is not None else 0
msg.throttle = float(vfr.throttle) if vfr.throttle is not None else 0.0
msg.climb = vfr.climb if vfr.climb is not None else 0.0
# altitude 需要從 position 獲取
if status.position.altitude is not None:
msg.altitude = status.position.altitude
publisher.publish(msg)
def _publish_mode(self, sysid: int, status: mvv.ComponentStatus):
"""發布飛行模式"""
if not self.rate_controller.should_publish(sysid, 'mode'):
return
mode = status.mode
if mode.mode_name is None:
return
publisher = self._get_or_create_publisher(sysid, 'mode', std_msgs.msg.String)
if publisher.get_subscription_count() == 0:
return
msg = std_msgs.msg.String()
msg.data = mode.mode_name
publisher.publish(msg)
def _publish_summary(self, vehicle: mvv.VehicleView):
"""
發布載具摘要資訊自定義格式使用 String 暫時代替
TODO: 未來可以定義專門的 VehicleSummary.msg
"""
sysid = vehicle.sysid
if not self.rate_controller.should_publish(sysid, 'summary'):
return
publisher = self._get_or_create_publisher(sysid, 'summary', std_msgs.msg.String)
if publisher.get_subscription_count() == 0:
return
# 獲取 autopilot component
component = vehicle.get_component(1)
if not component:
return
status = component.status
# 構建摘要資訊JSON 格式字串)
import json
summary = {
'sysid': sysid,
'vehicle_type': vehicle.vehicle_type if vehicle.vehicle_type else 0,
'autopilot': component.mav_autopilot if component.mav_autopilot else 0,
'socket_id': vehicle.custom_meta.get('socket_id', -1), # 重要!
'armed': status.armed if status.armed is not None else False,
'mode_custom': status.mode.custom_mode if status.mode.custom_mode else 0,
'mode_name': status.mode.mode_name if status.mode.mode_name else "UNKNOWN",
'latitude': status.position.latitude if status.position.latitude else 0.0,
'longitude': status.position.longitude if status.position.longitude else 0.0,
'altitude': status.position.altitude if status.position.altitude else 0.0,
'battery_percent': status.battery.remaining if status.battery.remaining else 0,
'gps_fix': status.gps.fix_type if status.gps.fix_type else 0,
'connection_type': vehicle.connected_via.value,
'last_update': component.packet_stats.last_msg_time if component.packet_stats.last_msg_time else 0.0,
}
msg = std_msgs.msg.String()
msg.data = json.dumps(summary)
publisher.publish(msg)
@staticmethod
def _euler_to_quaternion(roll, pitch, yaw):
"""
歐拉角轉四元數
Args:
roll: 橫滾角 (弧度)
pitch: 俯仰角 (弧度)
yaw: 偏航角 (弧度)
Returns:
tuple: (qx, qy, qz, qw)
"""
qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - \
math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + \
math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2)
qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - \
math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2)
qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + \
math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
return (qx, qy, qz, qw)
def stop(self):
"""停止發布"""
self.running = False
# logger.info("VehicleStatusPublisher stopped")
# ============================================================================
# MavlinkCommandService Node
# ============================================================================
class MavlinkCommandService(Node):
"""
MAVLink 指令服務節點 - 提供 ROS2 service 介面來發送 MAVLink 指令
職責:
- 作為 service server等待 client 請求
- 接收請求組裝 MAVLink 封包
- 調用 mavlinkObject 發送封包
- 處理 ACK 等待和超時未來實現
TODO: 稍後實現
"""
def __init__(self):
super().__init__('mavlink_command_service')
logger.info("MavlinkCommandService initialized (not implemented yet)")
def stop(self):
"""停止服務"""
# logger.info("MavlinkCommandService stopped")
# ============================================================================
# ROS2 節點管理器
# ============================================================================
class fc_ros_manager:
"""
MAVLink ROS2 節點管理器
管理兩個獨立的 ROS2 Node
- VehicleStatusPublisher
- MavlinkCommandService
提供統一的啟動/停止介面給 mainOrchestrator
"""
def __init__(self):
self.initialized = False
self.running = False
# 两个 node 实例
self.status_publisher: Optional[VehicleStatusPublisher] = None
self.command_service: Optional[MavlinkCommandService] = None
# Executor & Thread
self.executor: Optional[MultiThreadedExecutor] = None
self.spin_thread: Optional[threading.Thread] = None
def initialize(self):
"""初始化 ROS2 环境和 nodes"""
if self.initialized:
logger.warning("fc_ros_manager already initialized")
return False
try:
# 初始化 ROS2
rclpy.init()
# 創建節點 node
self.status_publisher = VehicleStatusPublisher()
self.command_service = MavlinkCommandService()
# 創建執行者 MultiThreadedExecutor
self.executor = MultiThreadedExecutor()
self.executor.add_node(self.status_publisher)
self.executor.add_node(self.command_service)
self.initialized = True
# logger.info("fc_ros_manager initialized")
return True
except Exception as e:
logger.error(f"Failed to initialize fc_ros_manager: {e}")
return False
def start(self):
"""啟動 ROS2 nodes (在獨立執行緒中運行 executor) """
if not self.initialized:
logger.error("fc_ros_manager initialize failed or not called")
return False
if self.running:
logger.warning("fc_ros_manager already running")
return False
try:
self.running = True
self.spin_thread = threading.Thread(
target=self._spin_executor,
daemon=True,
name="ROS2ExecutorThread"
)
self.spin_thread.start()
logger.info("fc_ros_manager started <-")
return True
except Exception as e:
logger.error(f"Failed to start fc_ros_manager: {e}")
self.running = False
return False
def _spin_executor(self):
"""在 thread 中運行的 executor"""
try:
# logger.info("ROS2 executor spinning...")
while self.running:
self.executor.spin_once(timeout_sec=0.1)
except Exception as e:
logger.error(f"fc_ros_manager error in spinning executor: {e}")
def stop(self):
"""停止 ROS2 nodes"""
if not self.running:
logger.warning("fc_ros_manager not running")
return False
try:
logger.info("Stopping fc_ros_manager...")
# 標記停止
self.running = False
# 停止各個 node
if self.status_publisher:
self.status_publisher.stop()
if self.command_service:
self.command_service.stop()
# 等待 spin 執行緒結束
if self.spin_thread and self.spin_thread.is_alive():
self.spin_thread.join(timeout=2.0)
logger.info("fc_ros_manager thread END!")
return True
except Exception as e:
logger.error(f"Error stopping fc_ros_manager: {e}")
return False
def shutdown(self):
"""完全關閉並清理資源"""
if self.running:
self.stop()
if self.initialized:
try:
# 銷毀 nodes
if self.status_publisher:
self.status_publisher.destroy_node()
if self.command_service:
self.command_service.destroy_node()
# 關閉 ROS2
if rclpy.ok():
rclpy.shutdown()
self.initialized = False
logger.info("fc_ros_manager Node END!")
except Exception as e:
logger.error(f"Error during shutdown: {e}")
def get_status(self) -> dict:
return {
'initialized': self.initialized,
'running': self.running,
'status_publisher_active': self.status_publisher is not None and self.status_publisher.running,
'command_service_active': self.command_service is not None,
}
# ============================================================================
# 全域實例
# ============================================================================
# 全域管理器實例(供 mainOrchestrator 使用)
ros2_manager = fc_ros_manager()
'''
================= 改版記錄 ============================
2026.01.20
1. 重構自 mavlinkPublish.py
2. 實現 VehicleStatusPublisher - vehicle_registry 讀取並發布狀態
3. 添加頻率控制器 - 按需發布2Hz 位置/姿態1Hz 電池/摘要
4. 預留 MavlinkCommandService 結構稍後實現
5. 提供 fc_ros_manager 統一管理介面
'''

@ -0,0 +1,498 @@
"""
VehicleStatusPublisher 測試程式
測試從 vehicle_registry 讀取資料並發布到 ROS2 topics
"""
import time
import json
import threading
# ROS2 imports
import rclpy
from rclpy.node import Node
# 標準 ROS2 消息類型
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
# 專案 imports
from ..fc_network_adapter.mavlinkROS2Nodes import (
VehicleStatusPublisher,
fc_ros_manager,
ros2_manager
)
from ..fc_network_adapter.mavlinkVehicleView import (
vehicle_registry,
ConnectionType,
ComponentType,
)
class TestSubscriber(Node):
"""測試用的訂閱者節點 - 接收並記錄收到的消息"""
def __init__(self, sysid: int = 1):
super().__init__(f'test_subscriber_sys{sysid}')
self.sysid = sysid
self.received_messages = {
'position': [],
'attitude': [],
'velocity': [],
'battery': [],
'vfr_hud': [],
'mode': [],
'summary': [],
}
# 建立所有訂閱者
self._create_subscriptions()
print(f"[TestSubscriber] 已建立,監聽 sys{sysid} 的所有 topics")
def _create_subscriptions(self):
"""建立所有 topic 的訂閱者"""
base_topic = f'/vehicle_status/sys{self.sysid}'
# Position
self.create_subscription(
sensor_msgs.msg.NavSatFix,
f'{base_topic}/position',
lambda msg: self._on_message('position', msg),
10
)
# Attitude
self.create_subscription(
sensor_msgs.msg.Imu,
f'{base_topic}/attitude',
lambda msg: self._on_message('attitude', msg),
10
)
# Velocity
self.create_subscription(
geometry_msgs.msg.TwistStamped,
f'{base_topic}/velocity',
lambda msg: self._on_message('velocity', msg),
10
)
# Battery
self.create_subscription(
sensor_msgs.msg.BatteryState,
f'{base_topic}/battery',
lambda msg: self._on_message('battery', msg),
10
)
# VFR HUD
self.create_subscription(
mavros_msgs.msg.VfrHud,
f'{base_topic}/vfr_hud',
lambda msg: self._on_message('vfr_hud', msg),
10
)
# Mode
self.create_subscription(
std_msgs.msg.String,
f'{base_topic}/mode',
lambda msg: self._on_message('mode', msg),
10
)
# Summary
self.create_subscription(
std_msgs.msg.String,
f'{base_topic}/summary',
lambda msg: self._on_message('summary', msg),
10
)
def _on_message(self, topic_name: str, msg):
"""通用消息接收回調"""
self.received_messages[topic_name].append(msg)
print(f"[TestSubscriber] 收到 {topic_name}: {self._format_msg(topic_name, msg)}")
def _format_msg(self, topic_name: str, msg) -> str:
"""格式化消息以便顯示"""
if topic_name == 'position':
return f"lat={msg.latitude:.6f}, lon={msg.longitude:.6f}, alt={msg.altitude:.2f}m"
elif topic_name == 'attitude':
return f"quat=({msg.orientation.x:.3f}, {msg.orientation.y:.3f}, {msg.orientation.z:.3f}, {msg.orientation.w:.3f})"
elif topic_name == 'velocity':
return f"linear=({msg.twist.linear.x:.2f}, {msg.twist.linear.y:.2f}, {msg.twist.linear.z:.2f})"
elif topic_name == 'battery':
return f"voltage={msg.voltage:.2f}V, percent={msg.percentage*100:.1f}%"
elif topic_name == 'vfr_hud':
return f"airspeed={msg.airspeed:.2f}, groundspeed={msg.groundspeed:.2f}, heading={msg.heading}"
elif topic_name == 'mode':
return f"mode={msg.data}"
elif topic_name == 'summary':
try:
data = json.loads(msg.data)
return f"sysid={data['sysid']}, socket_id={data['socket_id']}, mode={data['mode_name']}"
except:
return msg.data
return str(msg)
def get_message_count(self, topic_name: str) -> int:
"""獲取收到的消息數量"""
return len(self.received_messages[topic_name])
def clear_messages(self):
"""清空已收到的消息"""
for key in self.received_messages:
self.received_messages[key].clear()
def setup_test_vehicle(sysid: int = 1, socket_id: int = 10):
"""
建立測試用的載具數據
Args:
sysid: 系統 ID
socket_id: Socket ID
"""
print(f"\n=== 建立測試載具 (sysid={sysid}, socket_id={socket_id}) ===")
# 註冊載具
vehicle = vehicle_registry.register(sysid)
vehicle.kind = "Copter"
vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR
vehicle.connected_via = ConnectionType.UDP
vehicle.custom_meta['socket_id'] = socket_id
# 新增 autopilot 組件 (component_id=1)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR
autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA
# 填充狀態資料
status = autopilot.status
# 位置
status.position.latitude = 25.0330
status.position.longitude = 121.5654
status.position.altitude = 100.5
status.position.relative_altitude = 50.0
status.position.timestamp = time.time()
# 姿態
status.attitude.roll = 0.1
status.attitude.pitch = -0.05
status.attitude.yaw = 1.57
status.attitude.rollspeed = 0.01
status.attitude.pitchspeed = 0.02
status.attitude.yawspeed = 0.03
status.attitude.timestamp = time.time()
# 飛行模式
status.mode.base_mode = 89
status.mode.custom_mode = 4
status.mode.mode_name = "GUIDED"
status.mode.timestamp = time.time()
# 電池
status.battery.voltage = 12.6
status.battery.current = 15.3
status.battery.remaining = 75
status.battery.temperature = 35.2
status.battery.timestamp = time.time()
# GPS
status.gps.fix_type = 3 # 3D fix
status.gps.satellites_visible = 12
status.gps.eph = 100
status.gps.epv = 150
status.gps.timestamp = time.time()
# VFR
status.vfr.airspeed = 5.5
status.vfr.groundspeed = 6.0
status.vfr.heading = 90
status.vfr.throttle = 65
status.vfr.climb = 1.2
status.vfr.timestamp = time.time()
# 系統狀態
status.armed = True
status.system_status = 4 # MAV_STATE_ACTIVE
# 更新封包統計
autopilot.update_packet_stats(seq=10, msg_type=33, timestamp=time.time())
print(f"✓ 載具 {sysid} 已建立並填充測試數據")
return vehicle
def test_basic_publishing():
"""測試基本的發布功能"""
print("\n" + "="*60)
print("測試 1: 基本發布功能")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 建立測試載具
vehicle = setup_test_vehicle(sysid=1, socket_id=10)
# 初始化 ROS2 管理器
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立測試訂閱者
test_sub = TestSubscriber(sysid=1)
# 啟動 publisher
ros2_manager.start()
print("\n--- 開始發布,等待 5 秒 ---")
# 運行 5 秒,持續 spin
start_time = time.time()
while time.time() - start_time < 5.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
# 檢查收到的消息
print("\n--- 消息統計 ---")
total_messages = 0
for topic in ['position', 'attitude', 'velocity', 'battery', 'vfr_hud', 'mode', 'summary']:
count = test_sub.get_message_count(topic)
total_messages += count
print(f" {topic:15s}: {count:3d} 條消息")
print(f"\n總計收到: {total_messages} 條消息")
# 驗證
if total_messages > 0:
print("\n✓ 測試通過:成功接收到消息")
else:
print("\n✗ 測試失敗:沒有接收到任何消息")
# 停止
ros2_manager.stop()
test_sub.destroy_node()
def test_frequency_control():
"""測試頻率控制功能"""
print("\n" + "="*60)
print("測試 2: 頻率控制")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 建立測試載具
vehicle = setup_test_vehicle(sysid=1, socket_id=10)
# 初始化(如果還沒初始化)
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立測試訂閱者
test_sub = TestSubscriber(sysid=1)
# 啟動 publisher
ros2_manager.start()
print("\n--- 測試頻率控制,運行 3 秒 ---")
print("預期position/attitude 約 6 條 (2Hz)battery/mode/summary 約 3 條 (1Hz)")
# 運行 3 秒
start_time = time.time()
while time.time() - start_time < 3.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
# 檢查頻率
print("\n--- 頻率分析 ---")
# 2Hz topics (預期約 6 條)
print("2Hz Topics (預期 ~6 條):")
for topic in ['position', 'attitude', 'velocity', 'vfr_hud']:
count = test_sub.get_message_count(topic)
print(f" {topic:15s}: {count:3d}")
# 1Hz topics (預期約 3 條)
print("\n1Hz Topics (預期 ~3 條):")
for topic in ['battery', 'mode', 'summary']:
count = test_sub.get_message_count(topic)
print(f" {topic:15s}: {count:3d}")
print("\n✓ 頻率控制測試完成")
# 停止
ros2_manager.stop()
test_sub.destroy_node()
def test_multi_vehicle():
"""測試多載具發布"""
print("\n" + "="*60)
print("測試 3: 多載具發布")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 建立 3 個測試載具
v1 = setup_test_vehicle(sysid=1, socket_id=10)
v2 = setup_test_vehicle(sysid=2, socket_id=11)
v3 = setup_test_vehicle(sysid=3, socket_id=12)
# 修改各載具的位置以便區分
v2.components[1].status.position.latitude = 26.0
v3.components[1].status.position.latitude = 27.0
# 初始化
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立 3 個測試訂閱者
test_sub1 = TestSubscriber(sysid=1)
test_sub2 = TestSubscriber(sysid=2)
test_sub3 = TestSubscriber(sysid=3)
# 啟動 publisher
ros2_manager.start()
print("\n--- 測試多載具,運行 3 秒 ---")
# 運行 3 秒
start_time = time.time()
while time.time() - start_time < 3.0:
rclpy.spin_once(test_sub1, timeout_sec=0.05)
rclpy.spin_once(test_sub2, timeout_sec=0.05)
rclpy.spin_once(test_sub3, timeout_sec=0.05)
time.sleep(0.1)
# 檢查每個載具的消息
print("\n--- 各載具消息統計 ---")
for sysid, test_sub in [(1, test_sub1), (2, test_sub2), (3, test_sub3)]:
total = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"載具 {sysid}: {total:3d} 條消息")
# 檢查 summary 中的 socket_id
if test_sub.get_message_count('summary') > 0:
last_summary = test_sub.received_messages['summary'][-1]
data = json.loads(last_summary.data)
print(f" └─ socket_id={data['socket_id']}, lat={data['latitude']:.1f}")
print("\n✓ 多載具測試完成")
# 停止
ros2_manager.stop()
test_sub1.destroy_node()
test_sub2.destroy_node()
test_sub3.destroy_node()
def test_dynamic_vehicle():
"""測試動態新增/移除載具"""
print("\n" + "="*60)
print("測試 4: 動態載具管理")
print("="*60)
# 清空 registry
vehicle_registry.clear()
# 初始化
if not ros2_manager.initialized:
ros2_manager.initialize()
# 建立測試訂閱者
test_sub = TestSubscriber(sysid=1)
# 啟動 publisher
ros2_manager.start()
print("\n--- 階段 1: 無載具,運行 1 秒 ---")
start_time = time.time()
while time.time() - start_time < 1.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
count_before = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"收到消息: {count_before}")
# 新增載具
print("\n--- 階段 2: 新增載具,運行 2 秒 ---")
vehicle = setup_test_vehicle(sysid=1, socket_id=10)
start_time = time.time()
while time.time() - start_time < 2.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
count_after = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"收到消息: {count_after - count_before}")
# 移除載具
print("\n--- 階段 3: 移除載具,運行 1 秒 ---")
vehicle_registry.unregister(1)
start_time = time.time()
while time.time() - start_time < 1.0:
rclpy.spin_once(test_sub, timeout_sec=0.1)
time.sleep(0.1)
count_final = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys())
print(f"收到消息: {count_final - count_after} 條(應該為 0")
if count_final - count_after == 0:
print("\n✓ 動態載具管理測試通過")
else:
print("\n✗ 移除載具後仍收到消息")
# 停止
ros2_manager.stop()
test_sub.destroy_node()
def main():
"""主測試函數"""
print("\n" + "="*60)
print("VehicleStatusPublisher 測試程式")
print("="*60)
try:
# 執行各項測試
test_basic_publishing()
time.sleep(1)
# test_frequency_control()
# time.sleep(1)
# test_multi_vehicle()
# time.sleep(1)
# test_dynamic_vehicle()
print("\n" + "="*60)
print("所有測試完成!")
print("="*60)
except KeyboardInterrupt:
print("\n\n測試被中斷")
except Exception as e:
print(f"\n\n測試出錯: {e}")
import traceback
traceback.print_exc()
finally:
# 清理
if ros2_manager.initialized:
ros2_manager.shutdown()
vehicle_registry.clear()
print("\n清理完成")
if __name__ == '__main__':
main()
Loading…
Cancel
Save