#!/usr/bin/env python3 """ 指令發送模組 負責將目標位置轉成具體的通訊指令發送到飛控 目前使用 Ros2CommandSender,經 fc_network_adapter 的 /fc_network/vehicle/pos_global_int service 發送。 MavlinkSender 保留作為直連 SITL 的 fallback 工具,但已不是預設路徑。 """ import asyncio import traceback from abc import ABC, abstractmethod from PyQt6.QtCore import QObject, pyqtSignal from pymavlink import mavutil def _log(level, message): print(f"[{level}] {message}") class CommandSender(ABC): """指令發送抽象介面""" @abstractmethod def send_position_global(self, drone_id, sysid, lat, lon, alt): """ 發送全球座標位置指令 Args: drone_id: GUI 用的 drone 識別字串 (如 's0_11') sysid: 目標無人機的 MAVLink system ID lat: 緯度 (度) lon: 經度 (度) alt: 高度 (公尺) """ pass @abstractmethod def close(self): """關閉連線""" pass class MavlinkSender(CommandSender): """ MAVLink 直接發送 (驗證階段用) 透過 SET_POSITION_TARGET_GLOBAL_INT 發送目標位置 使用方式: sender = MavlinkSender("udpout:127.0.0.1:14550") sender.send_position_global(sysid=1, lat=24.123, lon=120.567, alt=10.0) """ # type_mask: 只使用位置 (lat, lon, alt) # 忽略速度 (bit 3,4,5)、加速度 (bit 6,7,8)、yaw (bit 10)、yaw_rate (bit 11) TYPE_MASK_POSITION_ONLY = ( 0b0000_1101_1111_1000 # = 0x0DF8 ) def __init__(self, connection_string="udpout:127.0.0.1:14550"): """ Args: connection_string: MAVLink 連線字串 SITL 範例: "udpout:127.0.0.1:14550" """ self.connection_string = connection_string self.mav = mavutil.mavlink_connection(connection_string) _log("INFO", f"MavlinkSender 已建立連線: {connection_string}") def send_position_global(self, drone_id, sysid, lat, lon, alt): """發送 SET_POSITION_TARGET_GLOBAL_INT。drone_id 未使用,保留為介面相容。""" self.mav.mav.set_position_target_global_int_send( 0, # time_boot_ms (不使用) sysid, # target_system 1, # target_component (autopilot) mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT_INT, self.TYPE_MASK_POSITION_ONLY, int(lat * 1e7), # lat_int int(lon * 1e7), # lon_int float(alt), # alt 0, 0, 0, # vx, vy, vz (忽略) 0, 0, 0, # afx, afy, afz (忽略) 0, 0 # yaw, yaw_rate (忽略) ) def close(self): """關閉 MAVLink 連線""" if self.mav: self.mav.close() self.mav = None _log("INFO", "MavlinkSender 已關閉") class Ros2CommandSender(QObject): """ 透過 fc_network_apps.PositionTargetGlobalIntClient 送 goto 指令。 設計重點: - send_position_global 是同步介面,立刻回傳。結果透過 send_result signal 回報 - 每台 drone 維護自己的 asyncio.Task pipeline,新的目標會 cancel 掉舊的 in-flight - client 由 DroneMonitor.get_or_create_position_client 管理,per-drone 獨立節點 """ # (drone_id, sysid, success, message) send_result = pyqtSignal(str, int, bool, str) DEFAULT_TIMEOUT_SEC = 2.0 def __init__(self, monitor, timeout_sec: float = DEFAULT_TIMEOUT_SEC): super().__init__() self._monitor = monitor self._timeout_sec = timeout_sec self._pending = {} # drone_id -> asyncio.Task def send_position_global(self, drone_id, sysid, lat, lon, alt): """同步介面。立刻回,結果透過 send_result signal 回報。""" # 取消該機上一個未完成的 task,避免舊指令覆蓋新指令 old = self._pending.get(drone_id) if old is not None and not old.done(): old.cancel() try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) task = loop.create_task( self._do_send(drone_id, int(sysid), float(lat), float(lon), float(alt)) ) self._pending[drone_id] = task async def _do_send(self, drone_id, sysid, lat, lon, alt): try: client = self._monitor.get_or_create_position_client(drone_id) if client is None: self.send_result.emit( drone_id, sysid, False, "PositionTargetGlobalIntClient 無法建立" ) return result = await client.goto_position_only_async( target_sysid=sysid, target_compid=0, latitude_deg=lat, longitude_deg=lon, alt=alt, timeout_sec=self._timeout_sec, ) self.send_result.emit( drone_id, sysid, bool(result.success), str(result.message or "") ) except asyncio.CancelledError: # 被新的目標取代,正常狀況,不用回報 pass except Exception as e: traceback.print_exc() self.send_result.emit(drone_id, sysid, False, f"exception: {e}") def close(self): """取消所有未完成的 task。PositionTargetGlobalIntClient 由 DroneMonitor 負責 destroy。""" for task in self._pending.values(): if not task.done(): task.cancel() self._pending.clear() _log("INFO", "Ros2CommandSender 已關閉")