You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
299 lines
9.5 KiB
Python
299 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
獨立測試腳本 — 驗證 MissionExecutor + MavlinkSender 在 SITL 環境下的運作
|
|
|
|
使用方式:
|
|
1. 啟動 SITL
|
|
2. 修改下方 CONFIG 區塊
|
|
3. python3 test_mission.py
|
|
"""
|
|
import sys, os
|
|
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
|
|
|
|
import time
|
|
from pymavlink import mavutil
|
|
from PyQt6.QtWidgets import QApplication
|
|
from PyQt6.QtCore import QTimer
|
|
|
|
from mission_planner import FormationPlanner, MissionType
|
|
from command_sender import MavlinkSender
|
|
from mission_executor import MissionExecutor, MissionState
|
|
|
|
|
|
# ================================================================================
|
|
# CONFIG
|
|
# ================================================================================
|
|
|
|
# 接收用連線 (讀取無人機狀態)
|
|
RECV_CONNECTION = "udp:127.0.0.1:14550"
|
|
|
|
# 發送用連線 (發送 setpoint 指令)
|
|
SEND_CONNECTION = "udpout:127.0.0.1:14550"
|
|
|
|
# 要控制的無人機 sysid 列表
|
|
DRONE_SYSIDS = [1]
|
|
|
|
# 起飛高度 (公尺)
|
|
TAKEOFF_ALT = 10.0
|
|
|
|
# 任務規劃參數
|
|
FORMATION_SPACING = 5.0
|
|
BASE_ALTITUDE = 10.0
|
|
ALTITUDE_DIFF = 2.0
|
|
ARRIVAL_RADIUS = 2.0
|
|
|
|
# 測試模式: "formation" 或 "grid_sweep"
|
|
TEST_MODE = "formation"
|
|
|
|
# Grid Sweep 專用設定
|
|
GRID_LINE_SPACING = 5.0
|
|
|
|
# ================================================================================
|
|
|
|
|
|
class SITLDroneManager:
|
|
"""管理 SITL 無人機的連線、起飛前置作業"""
|
|
|
|
def __init__(self, connection_string, sysids):
|
|
self.connection_string = connection_string
|
|
self.sysids = sysids
|
|
self.mav = None
|
|
self.drone_gps = {}
|
|
|
|
def connect(self):
|
|
"""建立 MAVLink 連線並等待心跳"""
|
|
print(f"連線到 {self.connection_string} ...")
|
|
self.mav = mavutil.mavlink_connection(self.connection_string)
|
|
self.mav.wait_heartbeat()
|
|
print(f"已收到心跳: sysid={self.mav.target_system}, compid={self.mav.target_component}")
|
|
|
|
def set_guided_and_arm(self, sysid):
|
|
"""切換到 GUIDED 模式並解鎖"""
|
|
print(f"\n--- sysid={sysid}: 切換 GUIDED + 解鎖 ---")
|
|
|
|
# 切換 GUIDED 模式
|
|
self.mav.mav.command_long_send(
|
|
sysid, 1,
|
|
mavutil.mavlink.MAV_CMD_DO_SET_MODE,
|
|
0,
|
|
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
|
|
4, # GUIDED = 4
|
|
0, 0, 0, 0, 0
|
|
)
|
|
time.sleep(1)
|
|
|
|
# 解鎖
|
|
self.mav.mav.command_long_send(
|
|
sysid, 1,
|
|
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
|
|
0, 1, 0, 0, 0, 0, 0, 0
|
|
)
|
|
time.sleep(1)
|
|
print(f" sysid={sysid}: GUIDED + ARMED")
|
|
|
|
def takeoff(self, sysid, altitude):
|
|
"""起飛到指定高度"""
|
|
print(f" sysid={sysid}: 起飛到 {altitude}m ...")
|
|
self.mav.mav.command_long_send(
|
|
sysid, 1,
|
|
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
|
|
0, 0, 0, 0, 0, 0, 0, altitude
|
|
)
|
|
|
|
def wait_for_altitude(self, sysid, target_alt, timeout=30):
|
|
"""等待無人機到達指定高度"""
|
|
print(f" sysid={sysid}: 等待到達 {target_alt}m ...")
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
|
|
if msg and msg.get_srcSystem() == sysid:
|
|
alt = msg.relative_alt / 1000.0
|
|
if alt >= target_alt * 0.9:
|
|
print(f" sysid={sysid}: 已到達 {alt:.1f}m")
|
|
return True
|
|
print(f" sysid={sysid}: 等待超時!")
|
|
return False
|
|
|
|
def update_gps_once(self):
|
|
"""讀取一輪 GPS 資料更新 drone_gps"""
|
|
deadline = time.time() + 3
|
|
received = set()
|
|
while time.time() < deadline and len(received) < len(self.sysids):
|
|
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
|
|
if msg is None:
|
|
continue
|
|
sid = msg.get_srcSystem()
|
|
if sid in self.sysids:
|
|
drone_id = f"s0_{sid}"
|
|
self.drone_gps[drone_id] = {
|
|
'lat': msg.lat / 1e7,
|
|
'lon': msg.lon / 1e7,
|
|
'alt': msg.relative_alt / 1000.0
|
|
}
|
|
received.add(sid)
|
|
|
|
for sid in self.sysids:
|
|
drone_id = f"s0_{sid}"
|
|
if drone_id in self.drone_gps:
|
|
gps = self.drone_gps[drone_id]
|
|
print(f" {drone_id}: ({gps['lat']:.6f}, {gps['lon']:.6f}, {gps['alt']:.1f}m)")
|
|
else:
|
|
print(f" {drone_id}: 尚未收到 GPS")
|
|
|
|
def start_gps_polling(self, interval_ms=200):
|
|
"""啟動定時 GPS 輪詢 (用 QTimer)"""
|
|
self._gps_timer = QTimer()
|
|
self._gps_timer.timeout.connect(self._poll_gps)
|
|
self._gps_timer.start(interval_ms)
|
|
|
|
def _poll_gps(self):
|
|
"""非阻塞方式讀取最新 GPS"""
|
|
while True:
|
|
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=False)
|
|
if msg is None:
|
|
break
|
|
sid = msg.get_srcSystem()
|
|
if sid in self.sysids:
|
|
drone_id = f"s0_{sid}"
|
|
self.drone_gps[drone_id] = {
|
|
'lat': msg.lat / 1e7,
|
|
'lon': msg.lon / 1e7,
|
|
'alt': msg.relative_alt / 1000.0
|
|
}
|
|
|
|
|
|
def main():
|
|
# 建立 Qt 應用 (MissionExecutor 需要 QTimer)
|
|
app = QApplication(sys.argv)
|
|
|
|
# 連線 + 起飛前置作業
|
|
manager = SITLDroneManager(RECV_CONNECTION, DRONE_SYSIDS)
|
|
manager.connect()
|
|
|
|
for sysid in DRONE_SYSIDS:
|
|
manager.set_guided_and_arm(sysid)
|
|
manager.takeoff(sysid, TAKEOFF_ALT)
|
|
|
|
# 等待所有無人機到達起飛高度
|
|
for sysid in DRONE_SYSIDS:
|
|
manager.wait_for_altitude(sysid, TAKEOFF_ALT)
|
|
|
|
time.sleep(2)
|
|
|
|
# 讀取當前 GPS 位置
|
|
print("\n讀取當前 GPS 位置 ...")
|
|
manager.update_gps_once()
|
|
|
|
drone_ids = [f"s0_{sid}" for sid in DRONE_SYSIDS]
|
|
drone_gps_positions = []
|
|
for drone_id in drone_ids:
|
|
gps = manager.drone_gps.get(drone_id)
|
|
if gps is None:
|
|
print(f"錯誤: 讀不到 {drone_id} 的 GPS")
|
|
return
|
|
drone_gps_positions.append((gps['lat'], gps['lon'], gps['alt']))
|
|
|
|
# 規劃任務
|
|
print(f"\n規劃任務 (模式: {TEST_MODE}) ...")
|
|
planner = FormationPlanner(
|
|
spacing=FORMATION_SPACING,
|
|
base_altitude=BASE_ALTITUDE,
|
|
altitude_diff=ALTITUDE_DIFF
|
|
)
|
|
|
|
center_lat = drone_gps_positions[0][0]
|
|
center_lon = drone_gps_positions[0][1]
|
|
|
|
if TEST_MODE == "formation":
|
|
target_lat = center_lat + 30.0 / 111000.0
|
|
target_lon = center_lon
|
|
target_gps = (target_lat, target_lon, BASE_ALTITUDE)
|
|
print(f" 目標點: ({target_lat:.6f}, {target_lon:.6f})")
|
|
|
|
waypoints_per_drone, origin, _ = planner.plan_formation_mission(
|
|
drone_gps_positions, target_gps, MissionType.M_FORMATION
|
|
)
|
|
|
|
elif TEST_MODE == "grid_sweep":
|
|
# 在無人機前方 30m 處建立 40m x 30m 的矩形
|
|
offset_lat = 30.0 / 111000.0
|
|
half_w = 20.0 / (111000.0 * 0.9)
|
|
half_h = 15.0 / 111000.0
|
|
|
|
rect_center_lat = center_lat + offset_lat
|
|
rect_center_lon = center_lon
|
|
|
|
rect_corners = [
|
|
(rect_center_lat - half_h, rect_center_lon - half_w),
|
|
(rect_center_lat - half_h, rect_center_lon + half_w),
|
|
(rect_center_lat + half_h, rect_center_lon + half_w),
|
|
(rect_center_lat + half_h, rect_center_lon - half_w),
|
|
]
|
|
target_gps = (rect_center_lat, rect_center_lon, BASE_ALTITUDE)
|
|
print(f" 矩形中心: ({rect_center_lat:.6f}, {rect_center_lon:.6f})")
|
|
|
|
waypoints_per_drone, origin, _ = planner.plan_formation_mission(
|
|
drone_gps_positions, target_gps, MissionType.GRID_SWEEP,
|
|
params={
|
|
'rect_corners': rect_corners,
|
|
'line_spacing': GRID_LINE_SPACING,
|
|
'altitude': BASE_ALTITUDE
|
|
}
|
|
)
|
|
else:
|
|
print(f"未知測試模式: {TEST_MODE}")
|
|
return
|
|
|
|
planned_waypoints = {
|
|
'drone_ids': drone_ids,
|
|
'waypoints': waypoints_per_drone
|
|
}
|
|
|
|
# 印出規劃結果
|
|
for i, did in enumerate(drone_ids):
|
|
wps = waypoints_per_drone[i]
|
|
print(f" {did}: {len(wps)} 個航點")
|
|
for j, wp in enumerate(wps):
|
|
print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f}m)")
|
|
|
|
# 啟動任務
|
|
print("\n啟動任務 ...")
|
|
manager.start_gps_polling(interval_ms=200)
|
|
|
|
sender = MavlinkSender(SEND_CONNECTION)
|
|
executor = MissionExecutor(
|
|
sender=sender,
|
|
drone_gps=manager.drone_gps,
|
|
arrival_radius=ARRIVAL_RADIUS,
|
|
send_rate_hz=2.0
|
|
)
|
|
|
|
executor.drone_waypoint_reached.connect(
|
|
lambda did, idx, total: print(f"\n >> {did} 到達 WP {idx}/{total}")
|
|
)
|
|
executor.mission_completed.connect(
|
|
lambda: (print("\n===== 任務全部完成 ====="), app.quit())
|
|
)
|
|
|
|
# 設定超時自動退出
|
|
timeout_timer = QTimer()
|
|
timeout_timer.setSingleShot(True)
|
|
timeout_timer.timeout.connect(lambda: (
|
|
print("\n⚠ 測試超時,強制退出"),
|
|
executor.stop(),
|
|
app.quit()
|
|
))
|
|
timeout_timer.start(180_000) # 180 秒超時
|
|
|
|
executor.start(planned_waypoints)
|
|
|
|
print("進入事件迴圈 (等待任務完成或 180 秒超時) ...\n")
|
|
app.exec()
|
|
|
|
executor.stop()
|
|
sender.close()
|
|
print("測試結束")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |