You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/GUI/mission_executor.py

199 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""
任務執行模組
管理多架無人機的 GUIDED 模式飛行控制迴圈
設計:
- 每架無人機持有一個航點序列逐點推進
- 各自到達就各自切換到下一個航點
- QTimer 驅動 Qt 主線程執行
- 暫停 = 停止發送 setpoint飛控自動懸停
- 相容舊的 2 階段任務與新的多航點任務 (Grid Sweep)
"""
import math
from enum import Enum
from PyQt6.QtCore import QObject, QTimer, pyqtSignal
class MissionState(Enum):
"""整體任務狀態"""
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
class DroneTask:
"""單架無人機的任務資料"""
__slots__ = ('drone_id', 'sysid', 'waypoints', 'wp_index', 'done')
def __init__(self, drone_id, sysid, waypoints):
"""
Args:
drone_id: GUI 用的 ID ( 's0_11')
sysid: MAVLink system ID ( 11)
waypoints: 航點序列 [(lat, lon, alt), ...]
"""
self.drone_id = drone_id
self.sysid = sysid
self.waypoints = waypoints
self.wp_index = 0
self.done = len(waypoints) == 0
@property
def current_target(self):
if self.done or self.wp_index >= len(self.waypoints):
return None
return self.waypoints[self.wp_index]
@property
def total_waypoints(self):
return len(self.waypoints)
class MissionExecutor(QObject):
"""
任務執行器
planned_waypoints 格式:
{
'drone_ids': ['s0_1', 's0_2', ...],
'waypoints': [
[(lat,lon,alt), ...], # drone 0
[(lat,lon,alt), ...], # drone 1
]
}
"""
# 信號
drone_waypoint_reached = pyqtSignal(str, int, int) # (drone_id, wp_index, total)
mission_completed = pyqtSignal()
def __init__(self, sender, drone_gps,
arrival_radius=2.0, send_rate_hz=2.0):
super().__init__()
self.sender = sender
self.drone_gps = drone_gps
self.arrival_radius = arrival_radius
self.state = MissionState.IDLE
self.tasks = {}
self._interval_ms = int(1000 / send_rate_hz)
self._timer = QTimer()
self._timer.timeout.connect(self._tick)
# ------------------------------------------------------------------ 公開方法
def start(self, planned_waypoints):
"""啟動任務"""
if self.state == MissionState.RUNNING:
print("任務已在執行中")
return
self.tasks.clear()
drone_ids = planned_waypoints['drone_ids']
waypoints_list = planned_waypoints['waypoints']
for i, drone_id in enumerate(drone_ids):
sysid = int(drone_id.split('_')[1])
self.tasks[drone_id] = DroneTask(drone_id, sysid, waypoints_list[i])
self.state = MissionState.RUNNING
self._timer.start(self._interval_ms)
total_wps = sum(t.total_waypoints for t in self.tasks.values())
print(f"任務啟動: {len(self.tasks)} 架無人機, "
f"{total_wps} 個航點, "
f"到達半徑={self.arrival_radius}m, "
f"發送週期={self._interval_ms}ms")
def pause(self):
"""暫停任務"""
if self.state == MissionState.RUNNING:
self._timer.stop()
self.state = MissionState.PAUSED
print("任務暫停")
def resume(self):
"""恢復任務"""
if self.state == MissionState.PAUSED:
self._timer.start(self._interval_ms)
self.state = MissionState.RUNNING
print("任務恢復")
def stop(self):
"""停止任務"""
self._timer.stop()
self.tasks.clear()
self.state = MissionState.IDLE
print("任務停止")
# ------------------------------------------------------------------ 控制迴圈
def _tick(self):
"""控制迴圈"""
all_done = True
for task in self.tasks.values():
if task.done:
continue
all_done = False
target = task.current_target
if target is None:
continue
# 讀取當前 GPS
gps = self.drone_gps.get(task.drone_id)
if gps is None:
continue
tgt_lat, tgt_lon, tgt_alt = target
distance = _haversine(gps['lat'], gps['lon'], tgt_lat, tgt_lon)
# 到達判定
if distance < self.arrival_radius:
task.wp_index += 1
if task.wp_index >= task.total_waypoints:
task.done = True
self.drone_waypoint_reached.emit(
task.drone_id, task.wp_index, task.total_waypoints
)
print(f" {task.drone_id} → DONE "
f"({task.total_waypoints}/{task.total_waypoints})")
continue
else:
self.drone_waypoint_reached.emit(
task.drone_id, task.wp_index, task.total_waypoints
)
print(f" {task.drone_id} → WP {task.wp_index}/{task.total_waypoints} "
f"(距離: {distance:.1f}m)")
# 更新目標
tgt_lat, tgt_lon, tgt_alt = task.current_target
# 發送 setpoint
self.sender.send_position_global(
task.sysid, tgt_lat, tgt_lon, tgt_alt
)
# 全部完成檢查
if all_done and self.tasks:
self._timer.stop()
self.state = MissionState.IDLE
self.mission_completed.emit()
print("===== 任務全部完成 =====")
# ------------------------------------------------------------------ 工具函式
def _haversine(lat1, lon1, lat2, lon2):
"""計算兩個 GPS 座標間的水平距離 (公尺)"""
R = 6371000.0
lat1_r = math.radians(lat1)
lat2_r = math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat / 2) ** 2 +
math.cos(lat1_r) * math.cos(lat2_r) * math.sin(dlon / 2) ** 2)
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))