#!/usr/bin/env python3 """ 獨立測試腳本 — 驗證 MissionExecutor + MavlinkSender 在 SITL 環境下的運作 使用方式: 1. 啟動 SITL 2. 修改下方 CONFIG 區塊 3. python3 test_mission.py """ import sys, os sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..')) import time from pymavlink import mavutil from PyQt6.QtWidgets import QApplication from PyQt6.QtCore import QTimer from mission_planner import FormationPlanner, MissionType from command_sender import MavlinkSender from mission_executor import MissionExecutor, MissionState # ================================================================================ # CONFIG # ================================================================================ # 接收用連線 (讀取無人機狀態) RECV_CONNECTION = "udp:127.0.0.1:14550" # 發送用連線 (發送 setpoint 指令) SEND_CONNECTION = "udpout:127.0.0.1:14550" # 要控制的無人機 sysid 列表 DRONE_SYSIDS = [1] # 起飛高度 (公尺) TAKEOFF_ALT = 10.0 # 任務規劃參數 FORMATION_SPACING = 5.0 BASE_ALTITUDE = 10.0 ALTITUDE_DIFF = 2.0 ARRIVAL_RADIUS = 2.0 # 測試模式: "formation" 或 "grid_sweep" TEST_MODE = "formation" # Grid Sweep 專用設定 GRID_LINE_SPACING = 5.0 # ================================================================================ class SITLDroneManager: """管理 SITL 無人機的連線、起飛前置作業""" def __init__(self, connection_string, sysids): self.connection_string = connection_string self.sysids = sysids self.mav = None self.drone_gps = {} def connect(self): """建立 MAVLink 連線並等待心跳""" print(f"連線到 {self.connection_string} ...") self.mav = mavutil.mavlink_connection(self.connection_string) self.mav.wait_heartbeat() print(f"已收到心跳: sysid={self.mav.target_system}, compid={self.mav.target_component}") def set_guided_and_arm(self, sysid): """切換到 GUIDED 模式並解鎖""" print(f"\n--- sysid={sysid}: 切換 GUIDED + 解鎖 ---") # 切換 GUIDED 模式 self.mav.mav.command_long_send( sysid, 1, mavutil.mavlink.MAV_CMD_DO_SET_MODE, 0, mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, 4, # GUIDED = 4 0, 0, 0, 0, 0 ) time.sleep(1) # 解鎖 self.mav.mav.command_long_send( sysid, 1, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1, 0, 0, 0, 0, 0, 0 ) time.sleep(1) print(f" sysid={sysid}: GUIDED + ARMED") def takeoff(self, sysid, altitude): """起飛到指定高度""" print(f" sysid={sysid}: 起飛到 {altitude}m ...") self.mav.mav.command_long_send( sysid, 1, mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, 0, 0, 0, 0, 0, 0, altitude ) def wait_for_altitude(self, sysid, target_alt, timeout=30): """等待無人機到達指定高度""" print(f" sysid={sysid}: 等待到達 {target_alt}m ...") start = time.time() while time.time() - start < timeout: msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) if msg and msg.get_srcSystem() == sysid: alt = msg.relative_alt / 1000.0 if alt >= target_alt * 0.9: print(f" sysid={sysid}: 已到達 {alt:.1f}m") return True print(f" sysid={sysid}: 等待超時!") return False def update_gps_once(self): """讀取一輪 GPS 資料更新 drone_gps""" deadline = time.time() + 3 received = set() while time.time() < deadline and len(received) < len(self.sysids): msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) if msg is None: continue sid = msg.get_srcSystem() if sid in self.sysids: drone_id = f"s0_{sid}" self.drone_gps[drone_id] = { 'lat': msg.lat / 1e7, 'lon': msg.lon / 1e7, 'alt': msg.relative_alt / 1000.0 } received.add(sid) for sid in self.sysids: drone_id = f"s0_{sid}" if drone_id in self.drone_gps: gps = self.drone_gps[drone_id] print(f" {drone_id}: ({gps['lat']:.6f}, {gps['lon']:.6f}, {gps['alt']:.1f}m)") else: print(f" {drone_id}: 尚未收到 GPS") def start_gps_polling(self, interval_ms=200): """啟動定時 GPS 輪詢 (用 QTimer)""" self._gps_timer = QTimer() self._gps_timer.timeout.connect(self._poll_gps) self._gps_timer.start(interval_ms) def _poll_gps(self): """非阻塞方式讀取最新 GPS""" while True: msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=False) if msg is None: break sid = msg.get_srcSystem() if sid in self.sysids: drone_id = f"s0_{sid}" self.drone_gps[drone_id] = { 'lat': msg.lat / 1e7, 'lon': msg.lon / 1e7, 'alt': msg.relative_alt / 1000.0 } def main(): # 建立 Qt 應用 (MissionExecutor 需要 QTimer) app = QApplication(sys.argv) # 連線 + 起飛前置作業 manager = SITLDroneManager(RECV_CONNECTION, DRONE_SYSIDS) manager.connect() for sysid in DRONE_SYSIDS: manager.set_guided_and_arm(sysid) manager.takeoff(sysid, TAKEOFF_ALT) # 等待所有無人機到達起飛高度 for sysid in DRONE_SYSIDS: manager.wait_for_altitude(sysid, TAKEOFF_ALT) time.sleep(2) # 讀取當前 GPS 位置 print("\n讀取當前 GPS 位置 ...") manager.update_gps_once() drone_ids = [f"s0_{sid}" for sid in DRONE_SYSIDS] drone_gps_positions = [] for drone_id in drone_ids: gps = manager.drone_gps.get(drone_id) if gps is None: print(f"錯誤: 讀不到 {drone_id} 的 GPS") return drone_gps_positions.append((gps['lat'], gps['lon'], gps['alt'])) # 規劃任務 print(f"\n規劃任務 (模式: {TEST_MODE}) ...") planner = FormationPlanner( spacing=FORMATION_SPACING, base_altitude=BASE_ALTITUDE, altitude_diff=ALTITUDE_DIFF ) center_lat = drone_gps_positions[0][0] center_lon = drone_gps_positions[0][1] if TEST_MODE == "formation": target_lat = center_lat + 30.0 / 111000.0 target_lon = center_lon target_gps = (target_lat, target_lon, BASE_ALTITUDE) print(f" 目標點: ({target_lat:.6f}, {target_lon:.6f})") waypoints_per_drone, origin = planner.plan_formation_mission( drone_gps_positions, target_gps, MissionType.M_FORMATION ) elif TEST_MODE == "grid_sweep": # 在無人機前方 30m 處建立 40m x 30m 的矩形 offset_lat = 30.0 / 111000.0 half_w = 20.0 / (111000.0 * 0.9) half_h = 15.0 / 111000.0 rect_center_lat = center_lat + offset_lat rect_center_lon = center_lon rect_corners = [ (rect_center_lat - half_h, rect_center_lon - half_w), (rect_center_lat - half_h, rect_center_lon + half_w), (rect_center_lat + half_h, rect_center_lon + half_w), (rect_center_lat + half_h, rect_center_lon - half_w), ] target_gps = (rect_center_lat, rect_center_lon, BASE_ALTITUDE) print(f" 矩形中心: ({rect_center_lat:.6f}, {rect_center_lon:.6f})") waypoints_per_drone, origin = planner.plan_formation_mission( drone_gps_positions, target_gps, MissionType.GRID_SWEEP, params={ 'rect_corners': rect_corners, 'line_spacing': GRID_LINE_SPACING, 'altitude': BASE_ALTITUDE } ) else: print(f"未知測試模式: {TEST_MODE}") return planned_waypoints = { 'drone_ids': drone_ids, 'waypoints': waypoints_per_drone } # 印出規劃結果 for i, did in enumerate(drone_ids): wps = waypoints_per_drone[i] print(f" {did}: {len(wps)} 個航點") for j, wp in enumerate(wps): print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f}m)") # 啟動任務 print("\n啟動任務 ...") manager.start_gps_polling(interval_ms=200) sender = MavlinkSender(SEND_CONNECTION) executor = MissionExecutor( sender=sender, drone_gps=manager.drone_gps, arrival_radius=ARRIVAL_RADIUS, send_rate_hz=2.0 ) executor.drone_waypoint_reached.connect( lambda did, idx, total: print(f"\n >> {did} 到達 WP {idx}/{total}") ) executor.mission_completed.connect( lambda: (print("\n===== 任務全部完成 ====="), app.quit()) ) # 設定超時自動退出 timeout_timer = QTimer() timeout_timer.setSingleShot(True) timeout_timer.timeout.connect(lambda: ( print("\n⚠ 測試超時,強制退出"), executor.stop(), app.quit() )) timeout_timer.start(180_000) # 180 秒超時 executor.start(planned_waypoints) print("進入事件迴圈 (等待任務完成或 180 秒超時) ...\n") app.exec() executor.stop() sender.close() print("測試結束") if __name__ == "__main__": main()