diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkROS2Nodes.py b/src/fc_network_adapter/fc_network_adapter/mavlinkROS2Nodes.py new file mode 100644 index 0000000..496ce48 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkROS2Nodes.py @@ -0,0 +1,611 @@ +""" +MAVLink ROS2 Nodes +包含兩個獨立的 ROS2 Node: +1. VehicleStatusPublisher - 發布載具狀態到 ROS2 topics +2. MavlinkCommandService - 提供 MAVLink 指令 service 介面 + +從 vehicle_registry 讀取狀態數據,頻率控制,模組化設計 +""" + +import os +import time +import math +import threading +from typing import Dict, Optional + +# ROS2 imports +import rclpy +from rclpy.node import Node +from rclpy.executors import MultiThreadedExecutor + +# ROS2 Message imports +import std_msgs.msg +import sensor_msgs.msg +import geometry_msgs.msg +import mavros_msgs.msg + +# 自定義 imports +from . import mavlinkVehicleView as mvv +from .utils import setup_logger + +logger = setup_logger(os.path.basename(__file__)) + + +# ============================================================================ +# 頻率控制器 +# ============================================================================ + +class PublishRateController: + """發布頻率控制器 - 按時間間隔控制發布頻率""" + + def __init__(self): + # 各 topic 的發布間隔(秒) + self.topic_intervals = { + 'position': 0.5, # GPS位置 2Hz + 'attitude': 0.5, # 姿態 2Hz + 'velocity': 0.5, # 速度 2Hz + 'battery': 1.0, # 電池 1Hz + 'vfr_hud': 0.5, # VFR HUD 2Hz + 'mode': 1.0, # 飛行模式 1Hz + 'summary': 1.0, # 載具摘要 1Hz + } + # 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp} + self.last_publish_time: Dict[tuple, float] = {} + + def should_publish(self, sysid: int, topic: str) -> bool: + """ + 檢查是否應該發布此 topic + + Args: + sysid: 系統ID + topic: topic 名稱 + + Returns: + bool: True 表示應該發布 + """ + key = (sysid, topic) + now = time.time() + + # 首次發布 + if key not in self.last_publish_time: + self.last_publish_time[key] = now + return True + + # 檢查時間間隔 + interval = self.topic_intervals.get(topic, 1.0) + if now - self.last_publish_time[key] >= interval: + self.last_publish_time[key] = now + return True + + return False + + def reset(self): + """重置所有計時器""" + self.last_publish_time.clear() + + +# ============================================================================ +# VehicleStatusPublisher Node +# ============================================================================ + +class VehicleStatusPublisher(Node): + """ + 載具狀態發布者 - 從 vehicle_registry 讀取數據並發布到 ROS2 topics + + 職責: + - 定期從 vehicle_registry 讀取載具狀態 + - 頻率控制(位置/姿態 2Hz,電池/摘要 1Hz) + - 發布標準 ROS2 消息類型 + - 檢測訂閱者,按需發布 + """ + + def __init__(self): + super().__init__('vehicle_status_publisher') + + # 頻率控制器 + self.rate_controller = PublishRateController() + + # fc_publishers 字典 {(sysid, topic_name): publisher} + self.fc_publishers: Dict[tuple, any] = {} + + # 定時器:以較高頻率檢查 vehicle_registry 並發布 + # 10Hz 檢查頻率,但通過 rate_controller 控制實際發布頻率 + self.timer_period = 0.1 # 100ms + self.timer = self.create_timer(self.timer_period, self.timer_callback) + + # 狀態標誌 + self.running = True + + # logger.info("VehicleStatusPublisher initialized") + + def timer_callback(self): + """定時器回調 - 檢查所有載具並發布狀態""" + if not self.running: + return + + # 從 vehicle_registry 獲取所有載具 + all_vehicles = mvv.vehicle_registry.get_all() + + for sysid, vehicle in all_vehicles.items(): + self._publish_vehicle_status(vehicle) + + def _publish_vehicle_status(self, vehicle: mvv.VehicleView): + """ + 發布單個載具的所有狀態 + + Args: + vehicle: VehicleView 實例 + """ + sysid = vehicle.sysid + + # 假設只有一個 autopilot component (component_id=1) + component = vehicle.get_component(1) + if not component: + return + + status = component.status + + # 發布各種狀態(通過頻率控制器判斷是否發布) + self._publish_position(sysid, status) + self._publish_attitude(sysid, status) + self._publish_velocity(sysid, status) + self._publish_battery(sysid, status) + self._publish_vfr_hud(sysid, status) + self._publish_mode(sysid, status) + self._publish_summary(vehicle) + + def _get_or_create_publisher(self, sysid: int, topic: str, msg_type, qos: int = 1): + """ + 獲取或創建 publisher + + Args: + sysid: 系統ID + topic: topic 名稱 + msg_type: ROS2 消息類型 + qos: QoS 設置 + + Returns: + publisher 對象 + """ + key = (sysid, topic) + if key not in self.fc_publishers: + topic_name = f'/vehicle_status/sys{sysid}/{topic}' + publisher = self.create_publisher(msg_type, topic_name, qos) + self.fc_publishers[key] = publisher + logger.info(f"Created publisher: {topic_name}") + return self.fc_publishers[key] + + def _publish_position(self, sysid: int, status: mvv.ComponentStatus): + """發布 GPS 位置""" + if not self.rate_controller.should_publish(sysid, 'position'): + return + + pos = status.position + if pos.latitude is None or pos.longitude is None: + return + + publisher = self._get_or_create_publisher(sysid, 'position', sensor_msgs.msg.NavSatFix) + + # 檢查是否有訂閱者 + if publisher.get_subscription_count() == 0: + return + + msg = sensor_msgs.msg.NavSatFix() + msg.latitude = pos.latitude + msg.longitude = pos.longitude + msg.altitude = pos.altitude if pos.altitude is not None else 0.0 + + # GPS 狀態資訊 + gps = status.gps + if gps.fix_type is not None: + msg.status.status = gps.fix_type - 1 # MAVLink fix_type 轉 NavSatStatus + + publisher.publish(msg) + + def _publish_attitude(self, sysid: int, status: mvv.ComponentStatus): + """發布姿態(IMU)""" + if not self.rate_controller.should_publish(sysid, 'attitude'): + return + + att = status.attitude + if att.roll is None: + return + + publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu) + + if publisher.get_subscription_count() == 0: + return + + msg = sensor_msgs.msg.Imu() + + # 歐拉角轉四元數 + qx, qy, qz, qw = self._euler_to_quaternion( + att.roll, att.pitch, att.yaw + ) + msg.orientation.x = qx + msg.orientation.y = qy + msg.orientation.z = qz + msg.orientation.w = qw + + # 角速度 + if att.rollspeed is not None: + msg.angular_velocity.x = att.rollspeed + msg.angular_velocity.y = att.pitchspeed + msg.angular_velocity.z = att.yawspeed + + publisher.publish(msg) + + def _publish_velocity(self, sysid: int, status: mvv.ComponentStatus): + """發布速度""" + if not self.rate_controller.should_publish(sysid, 'velocity'): + return + + vfr = status.vfr + if vfr.groundspeed is None: + return + + publisher = self._get_or_create_publisher(sysid, 'velocity', geometry_msgs.msg.TwistStamped) + + if publisher.get_subscription_count() == 0: + return + + msg = geometry_msgs.msg.TwistStamped() + msg.header.stamp = self.get_clock().now().to_msg() + + # 使用 VFR HUD 的地速和航向計算速度分量 + if vfr.heading is not None: + heading_rad = math.radians(vfr.heading) + msg.twist.linear.x = vfr.groundspeed * math.cos(heading_rad) + msg.twist.linear.y = vfr.groundspeed * math.sin(heading_rad) + + # 爬升率作為 z 軸速度 + if vfr.climb is not None: + msg.twist.linear.z = vfr.climb + + publisher.publish(msg) + + def _publish_battery(self, sysid: int, status: mvv.ComponentStatus): + """發布電池狀態""" + if not self.rate_controller.should_publish(sysid, 'battery'): + return + + bat = status.battery + if bat.voltage is None: + return + + publisher = self._get_or_create_publisher(sysid, 'battery', sensor_msgs.msg.BatteryState) + + if publisher.get_subscription_count() == 0: + return + + msg = sensor_msgs.msg.BatteryState() + msg.voltage = bat.voltage + + if bat.current is not None: + msg.current = bat.current + + if bat.remaining is not None: + msg.percentage = bat.remaining / 100.0 + + if bat.temperature is not None: + msg.temperature = bat.temperature + + publisher.publish(msg) + + def _publish_vfr_hud(self, sysid: int, status: mvv.ComponentStatus): + """發布 VFR HUD""" + if not self.rate_controller.should_publish(sysid, 'vfr_hud'): + return + + vfr = status.vfr + if vfr.airspeed is None: + return + + publisher = self._get_or_create_publisher(sysid, 'vfr_hud', mavros_msgs.msg.VfrHud) + + if publisher.get_subscription_count() == 0: + return + + msg = mavros_msgs.msg.VfrHud() + msg.airspeed = vfr.airspeed if vfr.airspeed is not None else 0.0 + msg.groundspeed = vfr.groundspeed if vfr.groundspeed is not None else 0.0 + msg.heading = vfr.heading if vfr.heading is not None else 0 + msg.throttle = float(vfr.throttle) if vfr.throttle is not None else 0.0 + msg.climb = vfr.climb if vfr.climb is not None else 0.0 + + # altitude 需要從 position 獲取 + if status.position.altitude is not None: + msg.altitude = status.position.altitude + + publisher.publish(msg) + + def _publish_mode(self, sysid: int, status: mvv.ComponentStatus): + """發布飛行模式""" + if not self.rate_controller.should_publish(sysid, 'mode'): + return + + mode = status.mode + if mode.mode_name is None: + return + + publisher = self._get_or_create_publisher(sysid, 'mode', std_msgs.msg.String) + + if publisher.get_subscription_count() == 0: + return + + msg = std_msgs.msg.String() + msg.data = mode.mode_name + publisher.publish(msg) + + def _publish_summary(self, vehicle: mvv.VehicleView): + """ + 發布載具摘要資訊(自定義格式,使用 String 暫時代替) + TODO: 未來可以定義專門的 VehicleSummary.msg + """ + sysid = vehicle.sysid + + if not self.rate_controller.should_publish(sysid, 'summary'): + return + + publisher = self._get_or_create_publisher(sysid, 'summary', std_msgs.msg.String) + + if publisher.get_subscription_count() == 0: + return + + # 獲取 autopilot component + component = vehicle.get_component(1) + if not component: + return + + status = component.status + + # 構建摘要資訊(JSON 格式字串) + import json + summary = { + 'sysid': sysid, + 'vehicle_type': vehicle.vehicle_type if vehicle.vehicle_type else 0, + 'autopilot': component.mav_autopilot if component.mav_autopilot else 0, + 'socket_id': vehicle.custom_meta.get('socket_id', -1), # 重要! + 'armed': status.armed if status.armed is not None else False, + 'mode_custom': status.mode.custom_mode if status.mode.custom_mode else 0, + 'mode_name': status.mode.mode_name if status.mode.mode_name else "UNKNOWN", + 'latitude': status.position.latitude if status.position.latitude else 0.0, + 'longitude': status.position.longitude if status.position.longitude else 0.0, + 'altitude': status.position.altitude if status.position.altitude else 0.0, + 'battery_percent': status.battery.remaining if status.battery.remaining else 0, + 'gps_fix': status.gps.fix_type if status.gps.fix_type else 0, + 'connection_type': vehicle.connected_via.value, + 'last_update': component.packet_stats.last_msg_time if component.packet_stats.last_msg_time else 0.0, + } + + msg = std_msgs.msg.String() + msg.data = json.dumps(summary) + publisher.publish(msg) + + @staticmethod + def _euler_to_quaternion(roll, pitch, yaw): + """ + 歐拉角轉四元數 + + Args: + roll: 橫滾角 (弧度) + pitch: 俯仰角 (弧度) + yaw: 偏航角 (弧度) + + Returns: + tuple: (qx, qy, qz, qw) + """ + qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - \ + math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2) + qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + \ + math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) + qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - \ + math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + \ + math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2) + return (qx, qy, qz, qw) + + def stop(self): + """停止發布""" + self.running = False + # logger.info("VehicleStatusPublisher stopped") + + +# ============================================================================ +# MavlinkCommandService Node +# ============================================================================ + +class MavlinkCommandService(Node): + """ + MAVLink 指令服務節點 - 提供 ROS2 service 介面來發送 MAVLink 指令 + + 職責: + - 作為 service server,等待 client 請求 + - 接收請求,組裝 MAVLink 封包 + - 調用 mavlinkObject 發送封包 + - 處理 ACK 等待和超時(未來實現) + + TODO: 稍後實現 + """ + + def __init__(self): + super().__init__('mavlink_command_service') + + logger.info("MavlinkCommandService initialized (not implemented yet)") + + def stop(self): + """停止服務""" + # logger.info("MavlinkCommandService stopped") + + +# ============================================================================ +# ROS2 節點管理器 +# ============================================================================ + +class fc_ros_manager: + """ + MAVLink ROS2 節點管理器 + + 管理兩個獨立的 ROS2 Node: + - VehicleStatusPublisher + - MavlinkCommandService + + 提供統一的啟動/停止介面給 mainOrchestrator + """ + + def __init__(self): + self.initialized = False + self.running = False + + # 两个 node 实例 + self.status_publisher: Optional[VehicleStatusPublisher] = None + self.command_service: Optional[MavlinkCommandService] = None + + # Executor & Thread + self.executor: Optional[MultiThreadedExecutor] = None + self.spin_thread: Optional[threading.Thread] = None + + def initialize(self): + """初始化 ROS2 环境和 nodes""" + if self.initialized: + logger.warning("fc_ros_manager already initialized") + return False + + try: + # 初始化 ROS2 + rclpy.init() + + # 創建節點 node + self.status_publisher = VehicleStatusPublisher() + self.command_service = MavlinkCommandService() + + # 創建執行者 MultiThreadedExecutor + self.executor = MultiThreadedExecutor() + self.executor.add_node(self.status_publisher) + self.executor.add_node(self.command_service) + + self.initialized = True + # logger.info("fc_ros_manager initialized") + return True + + except Exception as e: + logger.error(f"Failed to initialize fc_ros_manager: {e}") + return False + + def start(self): + """啟動 ROS2 nodes (在獨立執行緒中運行 executor) """ + if not self.initialized: + logger.error("fc_ros_manager initialize failed or not called") + return False + + if self.running: + logger.warning("fc_ros_manager already running") + return False + + try: + self.running = True + + self.spin_thread = threading.Thread( + target=self._spin_executor, + daemon=True, + name="ROS2ExecutorThread" + ) + self.spin_thread.start() + + logger.info("fc_ros_manager started <-") + return True + + except Exception as e: + logger.error(f"Failed to start fc_ros_manager: {e}") + self.running = False + return False + + def _spin_executor(self): + """在 thread 中運行的 executor""" + try: + # logger.info("ROS2 executor spinning...") + while self.running: + self.executor.spin_once(timeout_sec=0.1) + except Exception as e: + logger.error(f"fc_ros_manager error in spinning executor: {e}") + + def stop(self): + """停止 ROS2 nodes""" + if not self.running: + logger.warning("fc_ros_manager not running") + return False + + try: + logger.info("Stopping fc_ros_manager...") + + # 標記停止 + self.running = False + + # 停止各個 node + if self.status_publisher: + self.status_publisher.stop() + if self.command_service: + self.command_service.stop() + + # 等待 spin 執行緒結束 + if self.spin_thread and self.spin_thread.is_alive(): + self.spin_thread.join(timeout=2.0) + + logger.info("fc_ros_manager thread END!") + return True + + except Exception as e: + logger.error(f"Error stopping fc_ros_manager: {e}") + return False + + def shutdown(self): + """完全關閉並清理資源""" + if self.running: + self.stop() + + if self.initialized: + try: + # 銷毀 nodes + if self.status_publisher: + self.status_publisher.destroy_node() + if self.command_service: + self.command_service.destroy_node() + + # 關閉 ROS2 + if rclpy.ok(): + rclpy.shutdown() + + self.initialized = False + logger.info("fc_ros_manager Node END!") + + except Exception as e: + logger.error(f"Error during shutdown: {e}") + + def get_status(self) -> dict: + return { + 'initialized': self.initialized, + 'running': self.running, + 'status_publisher_active': self.status_publisher is not None and self.status_publisher.running, + 'command_service_active': self.command_service is not None, + } + + +# ============================================================================ +# 全域實例 +# ============================================================================ + +# 全域管理器實例(供 mainOrchestrator 使用) +ros2_manager = fc_ros_manager() + + +''' +================= 改版記錄 ============================ + +2026.01.20 +1. 重構自 mavlinkPublish.py +2. 實現 VehicleStatusPublisher - 從 vehicle_registry 讀取並發布狀態 +3. 添加頻率控制器 - 按需發布(2Hz 位置/姿態,1Hz 電池/摘要) +4. 預留 MavlinkCommandService 結構(稍後實現) +5. 提供 fc_ros_manager 統一管理介面 + +''' diff --git a/src/fc_network_adapter/tests/test_vehicleStatusPublisher.py b/src/fc_network_adapter/tests/test_vehicleStatusPublisher.py new file mode 100644 index 0000000..5c902bd --- /dev/null +++ b/src/fc_network_adapter/tests/test_vehicleStatusPublisher.py @@ -0,0 +1,498 @@ +""" +VehicleStatusPublisher 測試程式 + +測試從 vehicle_registry 讀取資料並發布到 ROS2 topics +""" + +import time +import json +import threading + +# ROS2 imports +import rclpy +from rclpy.node import Node + +# 標準 ROS2 消息類型 +import std_msgs.msg +import sensor_msgs.msg +import geometry_msgs.msg +import mavros_msgs.msg + +# 專案 imports +from ..fc_network_adapter.mavlinkROS2Nodes import ( + VehicleStatusPublisher, + fc_ros_manager, + ros2_manager +) +from ..fc_network_adapter.mavlinkVehicleView import ( + vehicle_registry, + ConnectionType, + ComponentType, +) + + +class TestSubscriber(Node): + """測試用的訂閱者節點 - 接收並記錄收到的消息""" + + def __init__(self, sysid: int = 1): + super().__init__(f'test_subscriber_sys{sysid}') + + self.sysid = sysid + self.received_messages = { + 'position': [], + 'attitude': [], + 'velocity': [], + 'battery': [], + 'vfr_hud': [], + 'mode': [], + 'summary': [], + } + + # 建立所有訂閱者 + self._create_subscriptions() + + print(f"[TestSubscriber] 已建立,監聽 sys{sysid} 的所有 topics") + + def _create_subscriptions(self): + """建立所有 topic 的訂閱者""" + base_topic = f'/vehicle_status/sys{self.sysid}' + + # Position + self.create_subscription( + sensor_msgs.msg.NavSatFix, + f'{base_topic}/position', + lambda msg: self._on_message('position', msg), + 10 + ) + + # Attitude + self.create_subscription( + sensor_msgs.msg.Imu, + f'{base_topic}/attitude', + lambda msg: self._on_message('attitude', msg), + 10 + ) + + # Velocity + self.create_subscription( + geometry_msgs.msg.TwistStamped, + f'{base_topic}/velocity', + lambda msg: self._on_message('velocity', msg), + 10 + ) + + # Battery + self.create_subscription( + sensor_msgs.msg.BatteryState, + f'{base_topic}/battery', + lambda msg: self._on_message('battery', msg), + 10 + ) + + # VFR HUD + self.create_subscription( + mavros_msgs.msg.VfrHud, + f'{base_topic}/vfr_hud', + lambda msg: self._on_message('vfr_hud', msg), + 10 + ) + + # Mode + self.create_subscription( + std_msgs.msg.String, + f'{base_topic}/mode', + lambda msg: self._on_message('mode', msg), + 10 + ) + + # Summary + self.create_subscription( + std_msgs.msg.String, + f'{base_topic}/summary', + lambda msg: self._on_message('summary', msg), + 10 + ) + + def _on_message(self, topic_name: str, msg): + """通用消息接收回調""" + self.received_messages[topic_name].append(msg) + print(f"[TestSubscriber] 收到 {topic_name}: {self._format_msg(topic_name, msg)}") + + def _format_msg(self, topic_name: str, msg) -> str: + """格式化消息以便顯示""" + if topic_name == 'position': + return f"lat={msg.latitude:.6f}, lon={msg.longitude:.6f}, alt={msg.altitude:.2f}m" + elif topic_name == 'attitude': + return f"quat=({msg.orientation.x:.3f}, {msg.orientation.y:.3f}, {msg.orientation.z:.3f}, {msg.orientation.w:.3f})" + elif topic_name == 'velocity': + return f"linear=({msg.twist.linear.x:.2f}, {msg.twist.linear.y:.2f}, {msg.twist.linear.z:.2f})" + elif topic_name == 'battery': + return f"voltage={msg.voltage:.2f}V, percent={msg.percentage*100:.1f}%" + elif topic_name == 'vfr_hud': + return f"airspeed={msg.airspeed:.2f}, groundspeed={msg.groundspeed:.2f}, heading={msg.heading}" + elif topic_name == 'mode': + return f"mode={msg.data}" + elif topic_name == 'summary': + try: + data = json.loads(msg.data) + return f"sysid={data['sysid']}, socket_id={data['socket_id']}, mode={data['mode_name']}" + except: + return msg.data + return str(msg) + + def get_message_count(self, topic_name: str) -> int: + """獲取收到的消息數量""" + return len(self.received_messages[topic_name]) + + def clear_messages(self): + """清空已收到的消息""" + for key in self.received_messages: + self.received_messages[key].clear() + + +def setup_test_vehicle(sysid: int = 1, socket_id: int = 10): + """ + 建立測試用的載具數據 + + Args: + sysid: 系統 ID + socket_id: Socket ID + """ + print(f"\n=== 建立測試載具 (sysid={sysid}, socket_id={socket_id}) ===") + + # 註冊載具 + vehicle = vehicle_registry.register(sysid) + vehicle.kind = "Copter" + vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR + vehicle.connected_via = ConnectionType.UDP + vehicle.custom_meta['socket_id'] = socket_id + + # 新增 autopilot 組件 (component_id=1) + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR + autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA + + # 填充狀態資料 + status = autopilot.status + + # 位置 + status.position.latitude = 25.0330 + status.position.longitude = 121.5654 + status.position.altitude = 100.5 + status.position.relative_altitude = 50.0 + status.position.timestamp = time.time() + + # 姿態 + status.attitude.roll = 0.1 + status.attitude.pitch = -0.05 + status.attitude.yaw = 1.57 + status.attitude.rollspeed = 0.01 + status.attitude.pitchspeed = 0.02 + status.attitude.yawspeed = 0.03 + status.attitude.timestamp = time.time() + + # 飛行模式 + status.mode.base_mode = 89 + status.mode.custom_mode = 4 + status.mode.mode_name = "GUIDED" + status.mode.timestamp = time.time() + + # 電池 + status.battery.voltage = 12.6 + status.battery.current = 15.3 + status.battery.remaining = 75 + status.battery.temperature = 35.2 + status.battery.timestamp = time.time() + + # GPS + status.gps.fix_type = 3 # 3D fix + status.gps.satellites_visible = 12 + status.gps.eph = 100 + status.gps.epv = 150 + status.gps.timestamp = time.time() + + # VFR + status.vfr.airspeed = 5.5 + status.vfr.groundspeed = 6.0 + status.vfr.heading = 90 + status.vfr.throttle = 65 + status.vfr.climb = 1.2 + status.vfr.timestamp = time.time() + + # 系統狀態 + status.armed = True + status.system_status = 4 # MAV_STATE_ACTIVE + + # 更新封包統計 + autopilot.update_packet_stats(seq=10, msg_type=33, timestamp=time.time()) + + print(f"✓ 載具 {sysid} 已建立並填充測試數據") + return vehicle + + +def test_basic_publishing(): + """測試基本的發布功能""" + print("\n" + "="*60) + print("測試 1: 基本發布功能") + print("="*60) + + # 清空 registry + vehicle_registry.clear() + + # 建立測試載具 + vehicle = setup_test_vehicle(sysid=1, socket_id=10) + + # 初始化 ROS2 管理器 + if not ros2_manager.initialized: + ros2_manager.initialize() + + # 建立測試訂閱者 + test_sub = TestSubscriber(sysid=1) + + # 啟動 publisher + ros2_manager.start() + + print("\n--- 開始發布,等待 5 秒 ---") + + # 運行 5 秒,持續 spin + start_time = time.time() + while time.time() - start_time < 5.0: + rclpy.spin_once(test_sub, timeout_sec=0.1) + time.sleep(0.1) + + # 檢查收到的消息 + print("\n--- 消息統計 ---") + total_messages = 0 + for topic in ['position', 'attitude', 'velocity', 'battery', 'vfr_hud', 'mode', 'summary']: + count = test_sub.get_message_count(topic) + total_messages += count + print(f" {topic:15s}: {count:3d} 條消息") + + print(f"\n總計收到: {total_messages} 條消息") + + # 驗證 + if total_messages > 0: + print("\n✓ 測試通過:成功接收到消息") + else: + print("\n✗ 測試失敗:沒有接收到任何消息") + + # 停止 + ros2_manager.stop() + test_sub.destroy_node() + + +def test_frequency_control(): + """測試頻率控制功能""" + print("\n" + "="*60) + print("測試 2: 頻率控制") + print("="*60) + + # 清空 registry + vehicle_registry.clear() + + # 建立測試載具 + vehicle = setup_test_vehicle(sysid=1, socket_id=10) + + # 初始化(如果還沒初始化) + if not ros2_manager.initialized: + ros2_manager.initialize() + + # 建立測試訂閱者 + test_sub = TestSubscriber(sysid=1) + + # 啟動 publisher + ros2_manager.start() + + print("\n--- 測試頻率控制,運行 3 秒 ---") + print("預期:position/attitude 約 6 條 (2Hz),battery/mode/summary 約 3 條 (1Hz)") + + # 運行 3 秒 + start_time = time.time() + while time.time() - start_time < 3.0: + rclpy.spin_once(test_sub, timeout_sec=0.1) + time.sleep(0.1) + + # 檢查頻率 + print("\n--- 頻率分析 ---") + + # 2Hz topics (預期約 6 條) + print("2Hz Topics (預期 ~6 條):") + for topic in ['position', 'attitude', 'velocity', 'vfr_hud']: + count = test_sub.get_message_count(topic) + print(f" {topic:15s}: {count:3d} 條") + + # 1Hz topics (預期約 3 條) + print("\n1Hz Topics (預期 ~3 條):") + for topic in ['battery', 'mode', 'summary']: + count = test_sub.get_message_count(topic) + print(f" {topic:15s}: {count:3d} 條") + + print("\n✓ 頻率控制測試完成") + + # 停止 + ros2_manager.stop() + test_sub.destroy_node() + + +def test_multi_vehicle(): + """測試多載具發布""" + print("\n" + "="*60) + print("測試 3: 多載具發布") + print("="*60) + + # 清空 registry + vehicle_registry.clear() + + # 建立 3 個測試載具 + v1 = setup_test_vehicle(sysid=1, socket_id=10) + v2 = setup_test_vehicle(sysid=2, socket_id=11) + v3 = setup_test_vehicle(sysid=3, socket_id=12) + + # 修改各載具的位置以便區分 + v2.components[1].status.position.latitude = 26.0 + v3.components[1].status.position.latitude = 27.0 + + # 初始化 + if not ros2_manager.initialized: + ros2_manager.initialize() + + # 建立 3 個測試訂閱者 + test_sub1 = TestSubscriber(sysid=1) + test_sub2 = TestSubscriber(sysid=2) + test_sub3 = TestSubscriber(sysid=3) + + # 啟動 publisher + ros2_manager.start() + + print("\n--- 測試多載具,運行 3 秒 ---") + + # 運行 3 秒 + start_time = time.time() + while time.time() - start_time < 3.0: + rclpy.spin_once(test_sub1, timeout_sec=0.05) + rclpy.spin_once(test_sub2, timeout_sec=0.05) + rclpy.spin_once(test_sub3, timeout_sec=0.05) + time.sleep(0.1) + + # 檢查每個載具的消息 + print("\n--- 各載具消息統計 ---") + for sysid, test_sub in [(1, test_sub1), (2, test_sub2), (3, test_sub3)]: + total = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys()) + print(f"載具 {sysid}: {total:3d} 條消息") + + # 檢查 summary 中的 socket_id + if test_sub.get_message_count('summary') > 0: + last_summary = test_sub.received_messages['summary'][-1] + data = json.loads(last_summary.data) + print(f" └─ socket_id={data['socket_id']}, lat={data['latitude']:.1f}") + + print("\n✓ 多載具測試完成") + + # 停止 + ros2_manager.stop() + test_sub1.destroy_node() + test_sub2.destroy_node() + test_sub3.destroy_node() + + +def test_dynamic_vehicle(): + """測試動態新增/移除載具""" + print("\n" + "="*60) + print("測試 4: 動態載具管理") + print("="*60) + + # 清空 registry + vehicle_registry.clear() + + # 初始化 + if not ros2_manager.initialized: + ros2_manager.initialize() + + # 建立測試訂閱者 + test_sub = TestSubscriber(sysid=1) + + # 啟動 publisher + ros2_manager.start() + + print("\n--- 階段 1: 無載具,運行 1 秒 ---") + start_time = time.time() + while time.time() - start_time < 1.0: + rclpy.spin_once(test_sub, timeout_sec=0.1) + time.sleep(0.1) + + count_before = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys()) + print(f"收到消息: {count_before} 條") + + # 新增載具 + print("\n--- 階段 2: 新增載具,運行 2 秒 ---") + vehicle = setup_test_vehicle(sysid=1, socket_id=10) + + start_time = time.time() + while time.time() - start_time < 2.0: + rclpy.spin_once(test_sub, timeout_sec=0.1) + time.sleep(0.1) + + count_after = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys()) + print(f"收到消息: {count_after - count_before} 條") + + # 移除載具 + print("\n--- 階段 3: 移除載具,運行 1 秒 ---") + vehicle_registry.unregister(1) + + start_time = time.time() + while time.time() - start_time < 1.0: + rclpy.spin_once(test_sub, timeout_sec=0.1) + time.sleep(0.1) + + count_final = sum(test_sub.get_message_count(t) for t in test_sub.received_messages.keys()) + print(f"收到消息: {count_final - count_after} 條(應該為 0)") + + if count_final - count_after == 0: + print("\n✓ 動態載具管理測試通過") + else: + print("\n✗ 移除載具後仍收到消息") + + # 停止 + ros2_manager.stop() + test_sub.destroy_node() + + +def main(): + """主測試函數""" + print("\n" + "="*60) + print("VehicleStatusPublisher 測試程式") + print("="*60) + + try: + # 執行各項測試 + test_basic_publishing() + time.sleep(1) + + # test_frequency_control() + # time.sleep(1) + + # test_multi_vehicle() + # time.sleep(1) + + # test_dynamic_vehicle() + + print("\n" + "="*60) + print("所有測試完成!") + print("="*60) + + except KeyboardInterrupt: + print("\n\n測試被中斷") + except Exception as e: + print(f"\n\n測試出錯: {e}") + import traceback + traceback.print_exc() + finally: + # 清理 + if ros2_manager.initialized: + ros2_manager.shutdown() + vehicle_registry.clear() + print("\n清理完成") + + +if __name__ == '__main__': + main()