feat: 新增任務規劃系統 (M-Formation, Circle, Leader-Follower, Grid Sweep)

- mission_planner: 四種任務模式,Leader-Follower 支援 Bezier 曲線轉彎
- mission_executor: QTimer 驅動的多航點執行器
- command_sender: MAVLink setpoint 發送抽象層
- gui: 地圖任務模式選單 + 框選/路徑標記觸發
- map_layout: 任務模式下拉選單 + 確認路徑按鈕 + route bridge 信號
- verify_waypoints: 靜態航點圖 + 動畫模擬驗證 (Play/Pause/Reset)
chiyu
wenchun 2 months ago
parent 76b3b0f40c
commit 837b06dab5

@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
指令發送模組
負責將目標位置轉成具體的通訊指令發送到飛控
現階段: MavlinkSender (直接 pymavlink 發送)
未來: 替換為 ROS2Sender (發到 ROS2 topic fc_network_adapter 轉發)
"""
from abc import ABC, abstractmethod
from pymavlink import mavutil
class CommandSender(ABC):
"""指令發送抽象介面"""
@abstractmethod
def send_position_global(self, sysid, lat, lon, alt):
"""
發送全球座標位置指令
Args:
sysid: 目標無人機的 MAVLink system ID
lat: 緯度 ()
lon: 經度 ()
alt: 高度 (公尺)
"""
pass
@abstractmethod
def close(self):
"""關閉連線"""
pass
class MavlinkSender(CommandSender):
"""
MAVLink 直接發送 (驗證階段用)
透過 SET_POSITION_TARGET_GLOBAL_INT 發送目標位置
使用方式:
sender = MavlinkSender("udpout:127.0.0.1:14550")
sender.send_position_global(sysid=1, lat=24.123, lon=120.567, alt=10.0)
"""
# type_mask: 只使用位置 (lat, lon, alt)
# 忽略速度 (bit 3,4,5)、加速度 (bit 6,7,8)、yaw (bit 10)、yaw_rate (bit 11)
TYPE_MASK_POSITION_ONLY = (
0b0000_1101_1111_1000 # = 0x0DF8
)
def __init__(self, connection_string="udpout:127.0.0.1:14550"):
"""
Args:
connection_string: MAVLink 連線字串
SITL 範例: "udpout:127.0.0.1:14550"
"""
self.connection_string = connection_string
self.mav = mavutil.mavlink_connection(connection_string)
print(f"MavlinkSender 已建立連線: {connection_string}")
def send_position_global(self, sysid, lat, lon, alt):
"""
發送 SET_POSITION_TARGET_GLOBAL_INT
注意:
- coordinate_frame 使用 MAV_FRAME_GLOBAL_RELATIVE_ALT_INT
高度是相對起飛點的高度 (公尺)
- 如果 FormationPlanner 產出的 alt AMSL 絕對高度
需要在外部先減去起飛點高度再傳入
"""
self.mav.mav.set_position_target_global_int_send(
0, # time_boot_ms (不使用)
sysid, # target_system
1, # target_component (autopilot)
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
self.TYPE_MASK_POSITION_ONLY,
int(lat * 1e7), # lat_int
int(lon * 1e7), # lon_int
float(alt), # alt
0, 0, 0, # vx, vy, vz (忽略)
0, 0, 0, # afx, afy, afz (忽略)
0, 0 # yaw, yaw_rate (忽略)
)
def close(self):
"""關閉 MAVLink 連線"""
if self.mav:
self.mav.close()
self.mav = None
print("MavlinkSender 已關閉")

File diff suppressed because it is too large Load Diff

@ -122,7 +122,8 @@ class DroneMap:
margin-bottom: 10px; margin-bottom: 10px;
padding-bottom: 10px; padding-bottom: 10px;
border-bottom: 1px solid #ddd; border-bottom: 1px solid #ddd;
} .selection-button-blue { }
.selection-button-blue {
width: 100%; width: 100%;
padding: 8px; padding: 8px;
background-color: #64B5F6; background-color: #64B5F6;
@ -139,10 +140,11 @@ class DroneMap:
} }
.selection-button-blue.active { .selection-button-blue.active {
background-color: #1976D2; background-color: #1976D2;
} .selection-button-blue { }
.selection-button {
width: 100%; width: 100%;
padding: 8px; padding: 8px;
background-color: #64B5F6; background-color: #F06A61;
color: white; color: white;
border: none; border: none;
border-radius: 4px; border-radius: 4px;
@ -151,16 +153,16 @@ class DroneMap:
font-weight: bold; font-weight: bold;
transition: background-color 0.2s; transition: background-color 0.2s;
} }
.selection-button-blue:hover { .selection-button:hover {
background-color: #42A5F5; background-color: #E53935;
} }
.selection-button-blue.active { .selection-button.active {
background-color: #1976D2; background-color: #D32F2F;
} }
.selection-button { .confirm-route-button {
width: 100%; width: 100%;
padding: 8px; padding: 8px;
background-color: #F06A61; background-color: #66BB6A;
color: white; color: white;
border: none; border: none;
border-radius: 4px; border-radius: 4px;
@ -169,11 +171,8 @@ class DroneMap:
font-weight: bold; font-weight: bold;
transition: background-color 0.2s; transition: background-color 0.2s;
} }
.selection-button:hover { .confirm-route-button:hover {
background-color: #E53935; background-color: #4CAF50;
}
.selection-button.active {
background-color: #D32F2F;
} }
</style> </style>
</head> </head>
@ -181,8 +180,17 @@ class DroneMap:
<div id="map"></div> <div id="map"></div>
<div class="map-controls"> <div class="map-controls">
<button class="selection-button" onclick="clearAllTrajectories()">清除軌跡</button> <button class="selection-button" onclick="clearAllTrajectories()">清除軌跡</button>
<button class="selection-button" id="select-rect-btn" onclick="toggleRectSelection()">框選方形區域</button> <div style="border-top: 1px solid #ddd; padding-top: 5px; margin-top: 5px;">
<button class="selection-button" id="select-polygon-btn" onclick="togglePolygonSelection()">多點選擇區域</button> <label style="font-size: 12px; color: #555; font-weight: bold;">任務模式</label>
<select id="mission-mode-select" onchange="onMissionModeChanged(this.value)" style="width: 100%; padding: 6px; border-radius: 4px; border: 1px solid #ccc; font-size: 12px; margin-top: 3px;">
<option value="M_FORMATION">列隊飛行</option>
<option value="CIRCLE_FORMATION">環狀包圍</option>
<option value="LEADER_FOLLOWER">跟隨模式</option>
<option value="GRID_SWEEP">柵狀偵查</option>
</select>
</div>
<button class="confirm-route-button" id="confirm-route-btn" onclick="confirmRoute()" style="display: none;">確認路徑</button>
<button class="selection-button" id="select-polygon-btn" onclick="togglePolygonSelection()">多點選擇區域 (開發中)</button>
</div> </div>
<div class="mission-info-panel"> <div class="mission-info-panel">
<div class="selection-buttons"> <div class="selection-buttons">
@ -229,8 +237,10 @@ class DroneMap:
// 地圖點擊事件 // 地圖點擊事件
map.on('click', function(e) { map.on('click', function(e) {
if (selectionMode === 'polygon') { if (selectionMode === 'polygon') {
// 多點選擇模式添加點
addPolygonPoint(e.latlng.lat, e.latlng.lng); addPolygonPoint(e.latlng.lat, e.latlng.lng);
} else if (selectionMode === 'route') {
// 跟隨模式添加路徑點
addRoutePoint(e.latlng.lat, e.latlng.lng);
} else if (!selectionMode) { } else if (!selectionMode) {
// 正常模式發送GPS信號 // 正常模式發送GPS信號
if (bridge) { if (bridge) {
@ -264,7 +274,6 @@ class DroneMap:
} }
if (isDrawing && (selectionMode === 'rect' || selectionMode === 'drones') && drawStartPoint) { if (isDrawing && (selectionMode === 'rect' || selectionMode === 'drones') && drawStartPoint) {
// 更新臨時矩形
if (tempRectangle) { if (tempRectangle) {
selectionLayer.removeLayer(tempRectangle); selectionLayer.removeLayer(tempRectangle);
} }
@ -286,7 +295,6 @@ class DroneMap:
drawStartPoint = null; drawStartPoint = null;
} }
// 重置拖移狀態
clickStartPos = null; clickStartPos = null;
setTimeout(function() { setTimeout(function() {
mapDragStarted = false; mapDragStarted = false;
@ -309,22 +317,22 @@ class DroneMap:
</div> </div>
`, `,
iconSize: [30, 30], iconSize: [30, 30],
iconAnchor: [15, 15] // 箭頭往左上移使 ID 顯示在右下 iconAnchor: [15, 15]
}); });
} }
function getColorBySocketId(id) { function getColorBySocketId(id) {
if (id.startsWith("s0_")) return "#00BFFF"; // 天藍色 if (id.startsWith("s0_")) return "#00BFFF";
if (id.startsWith("s1_")) return "#FFD700"; // 金色 if (id.startsWith("s1_")) return "#FFD700";
if (id.startsWith("s2_")) return "#FF6969"; // 淺紅色 if (id.startsWith("s2_")) return "#FF6969";
if (id.startsWith("s3_")) return "#FF69B4"; // 熱粉紅 if (id.startsWith("s3_")) return "#FF69B4";
if (id.startsWith("s4_")) return "#00FA9A"; // 中春綠 if (id.startsWith("s4_")) return "#00FA9A";
if (id.startsWith("s5_")) return "#9370DB"; // 中紫色 (串口) if (id.startsWith("s5_")) return "#9370DB";
if (id.startsWith("s6_")) return "#FFA500"; // 橙色 if (id.startsWith("s6_")) return "#FFA500";
if (id.startsWith("s7_")) return "#20B2AA"; // 淺海綠 if (id.startsWith("s7_")) return "#20B2AA";
if (id.startsWith("s8_")) return "#7CFC00"; // 草綠色 if (id.startsWith("s8_")) return "#7CFC00";
if (id.startsWith("s9_")) return "#FF8C00"; // 深橙色 if (id.startsWith("s9_")) return "#FF8C00";
return "#AAAAAA"; // 灰色 (預設) return "#AAAAAA";
} }
function createIdIcon(id) { function createIdIcon(id) {
@ -345,7 +353,7 @@ class DroneMap:
text-shadow: 1px 1px 2px rgba(255,255,255,0.8), -1px -1px 2px rgba(255,255,255,0.8); text-shadow: 1px 1px 2px rgba(255,255,255,0.8), -1px -1px 2px rgba(255,255,255,0.8);
">${sysid}</div>`, ">${sysid}</div>`,
iconSize: [16, 16], iconSize: [16, 16],
iconAnchor: [8, 6] // icon 中心 = 經緯度點 iconAnchor: [8, 6]
}); });
} }
@ -358,25 +366,31 @@ class DroneMap:
var trajectoryGroup = L.layerGroup().addTo(map); var trajectoryGroup = L.layerGroup().addTo(map);
// ================================================================================ // ================================================================================
// 新增任務規劃視覺化變數 // 任務規劃視覺化變數
// ================================================================================ // ================================================================================
var missionPlanGroup = L.layerGroup().addTo(map); // 任務規劃圖層 var missionPlanGroup = L.layerGroup().addTo(map);
var centerMarker = null; // 中心點標記 var centerMarker = null;
var targetMarker = null; // 目標點標記 var targetMarker = null;
var missionLine = null; // 中心點到目標點的虛線 var missionLine = null;
var centerPosition = null; // 中心點經緯度 var centerPosition = null;
var targetPosition = null; // 目標點經緯度 var targetPosition = null;
// 選擇工具變量 // 選擇工具變量
var selectionMode = null; // 'drones', 'rect', 'polygon', null var selectionMode = null; // 'drones', 'rect', 'polygon', 'route', null
var selectionLayer = L.layerGroup().addTo(map); var selectionLayer = L.layerGroup().addTo(map);
var polygonPoints = []; // 多點選擇的點 var polygonPoints = [];
var polygonMarkers = []; // 多點選擇的標記 var polygonMarkers = [];
var tempRectangle = null; // 臨時矩形 var tempRectangle = null;
var isDrawing = false; // 是否正在繪製 var isDrawing = false;
var drawStartPoint = null; // 繪製起點 var drawStartPoint = null;
var mapDragStarted = false; // 地圖是否正在拖移 var mapDragStarted = false;
var clickStartPos = null; // 記錄點擊開始位置 var clickStartPos = null;
// 路徑標記變量 (跟隨模式用)
var routePoints = [];
var routeMarkers = [];
var routePolyline = null;
var routeLayer = L.layerGroup().addTo(map);
// ================================================================================ // ================================================================================
// 更新任務信息面板 // 更新任務信息面板
@ -397,7 +411,6 @@ class DroneMap:
targetElem.textContent = '未設定'; targetElem.textContent = '未設定';
} }
// 只有當中心點和目標點都設定時才啟用按鈕
if (centerPosition && targetPosition) { if (centerPosition && targetPosition) {
startBtn.disabled = false; startBtn.disabled = false;
} else { } else {
@ -409,7 +422,6 @@ class DroneMap:
// 選擇工具函數 // 選擇工具函數
// ================================================================================ // ================================================================================
function toggleSelectAllDrones() { function toggleSelectAllDrones() {
// 切換全選/取消全選無人機
if (bridge) { if (bridge) {
bridge.toggleSelectAllDrones(); bridge.toggleSelectAllDrones();
console.log('切換全選無人機'); console.log('切換全選無人機');
@ -429,34 +441,18 @@ class DroneMap:
} }
} }
function toggleRectSelection() {
clearSelectionMode();
if (selectionMode === 'rect') {
selectionMode = null;
document.getElementById('select-rect-btn').classList.remove('active');
map.dragging.enable();
} else {
selectionMode = 'rect';
document.getElementById('select-rect-btn').classList.add('active');
map.dragging.disable();
}
}
function togglePolygonSelection() { function togglePolygonSelection() {
if (selectionMode === 'polygon') { if (selectionMode === 'polygon') {
// 第二次點擊完成選擇
if (polygonPoints.length >= 3) { if (polygonPoints.length >= 3) {
finishPolygonSelection(); finishPolygonSelection();
} else { } else {
alert('至少需要3個點來形成區域'); alert('至少需要3個點來形成區域');
// 清除並重置
clearSelectionMode(); clearSelectionMode();
clearPolygonPoints(); clearPolygonPoints();
selectionMode = null; selectionMode = null;
document.getElementById('select-polygon-btn').classList.remove('active'); document.getElementById('select-polygon-btn').classList.remove('active');
} }
} else { } else {
// 第一次點擊清除上次的點位並啟動模式
clearSelectionMode(); clearSelectionMode();
clearPolygonPoints(); clearPolygonPoints();
selectionMode = 'polygon'; selectionMode = 'polygon';
@ -466,23 +462,25 @@ class DroneMap:
} }
function clearSelectionMode() { function clearSelectionMode() {
// 清除所有按鈕的激活狀態
document.getElementById('select-drones-btn').classList.remove('active'); document.getElementById('select-drones-btn').classList.remove('active');
document.getElementById('select-rect-btn').classList.remove('active');
document.getElementById('select-polygon-btn').classList.remove('active'); document.getElementById('select-polygon-btn').classList.remove('active');
// 清除選擇圖層
selectionLayer.clearLayers(); selectionLayer.clearLayers();
tempRectangle = null; tempRectangle = null;
// 重新啟用地圖拖曳 // 不清除 routeLayer clearRoutePoints 單獨管理
map.dragging.enable(); map.dragging.enable();
// 如果離開 route 模式重置 selectionMode
if (selectionMode !== 'route') {
selectionMode = null;
}
} }
function addPolygonPoint(lat, lng) { function addPolygonPoint(lat, lng) {
polygonPoints.push([lat, lng]); polygonPoints.push([lat, lng]);
// 添加邊界點標記
var marker = L.circleMarker([lat, lng], { var marker = L.circleMarker([lat, lng], {
radius: 5, radius: 5,
color: '#FF6B6B', color: '#FF6B6B',
@ -491,7 +489,6 @@ class DroneMap:
}).addTo(selectionLayer); }).addTo(selectionLayer);
polygonMarkers.push(marker); polygonMarkers.push(marker);
// 如果有多個點繪製連線
if (polygonPoints.length > 1) { if (polygonPoints.length > 1) {
L.polyline(polygonPoints, { L.polyline(polygonPoints, {
color: '#FF6B6B', color: '#FF6B6B',
@ -516,7 +513,6 @@ class DroneMap:
return; return;
} }
// 繪製最終多邊形
var polygon = L.polygon(polygonPoints, { var polygon = L.polygon(polygonPoints, {
color: '#FF6B6B', color: '#FF6B6B',
fillColor: '#FF6B6B', fillColor: '#FF6B6B',
@ -524,14 +520,12 @@ class DroneMap:
weight: 2 weight: 2
}).addTo(selectionLayer); }).addTo(selectionLayer);
// 發送多邊形數據到Python
if (bridge) { if (bridge) {
var pointsStr = JSON.stringify(polygonPoints); var pointsStr = JSON.stringify(polygonPoints);
bridge.polygonSelected(pointsStr); bridge.polygonSelected(pointsStr);
console.log('多邊形選擇完成:', polygonPoints); console.log('多邊形選擇完成:', polygonPoints);
} }
// 重置狀態
selectionMode = null; selectionMode = null;
document.getElementById('select-polygon-btn').classList.remove('active'); document.getElementById('select-polygon-btn').classList.remove('active');
map.dragging.enable(); map.dragging.enable();
@ -540,14 +534,12 @@ class DroneMap:
function finishRectSelection(bounds) { function finishRectSelection(bounds) {
var selectedDrones = []; var selectedDrones = [];
// 清除臨時矩形
if (tempRectangle) { if (tempRectangle) {
selectionLayer.removeLayer(tempRectangle); selectionLayer.removeLayer(tempRectangle);
tempRectangle = null; tempRectangle = null;
} }
if (selectionMode === 'drones') { if (selectionMode === 'drones') {
// 框選無人機模式先清除全選
if (bridge) { if (bridge) {
bridge.clearAllDroneSelection(); bridge.clearAllDroneSelection();
} }
@ -556,17 +548,14 @@ class DroneMap:
var pos = markers[droneId].getLatLng(); var pos = markers[droneId].getLatLng();
if (bounds.contains(pos)) { if (bounds.contains(pos)) {
selectedDrones.push(droneId); selectedDrones.push(droneId);
// 觸發無人機點擊信號
if (bridge) { if (bridge) {
bridge.emitDroneClicked(droneId); bridge.emitDroneClicked(droneId);
} }
} }
}); });
console.log('框選無人機:', selectedDrones); console.log('框選無人機:', selectedDrones);
// 不保留選擇框直接完成
} else if (selectionMode === 'rect') { } else if (selectionMode === 'rect') {
// 框選區域模式
var rectPoints = [ var rectPoints = [
[bounds.getNorth(), bounds.getWest()], [bounds.getNorth(), bounds.getWest()],
[bounds.getNorth(), bounds.getEast()], [bounds.getNorth(), bounds.getEast()],
@ -574,7 +563,6 @@ class DroneMap:
[bounds.getSouth(), bounds.getWest()] [bounds.getSouth(), bounds.getWest()]
]; ];
// 繪製最終矩形
L.rectangle(bounds, { L.rectangle(bounds, {
color: '#FF6B6B', color: '#FF6B6B',
fillColor: '#FF6B6B', fillColor: '#FF6B6B',
@ -582,7 +570,6 @@ class DroneMap:
weight: 2 weight: 2
}).addTo(selectionLayer); }).addTo(selectionLayer);
// 添加四個角的標記
rectPoints.forEach(point => { rectPoints.forEach(point => {
L.circleMarker(point, { L.circleMarker(point, {
radius: 5, radius: 5,
@ -592,7 +579,6 @@ class DroneMap:
}).addTo(selectionLayer); }).addTo(selectionLayer);
}); });
// 發送矩形數據到Python
if (bridge) { if (bridge) {
var pointsStr = JSON.stringify(rectPoints); var pointsStr = JSON.stringify(rectPoints);
bridge.rectangleSelected(pointsStr); bridge.rectangleSelected(pointsStr);
@ -603,8 +589,90 @@ class DroneMap:
// 重置狀態 // 重置狀態
selectionMode = null; selectionMode = null;
document.getElementById('select-drones-btn').classList.remove('active'); document.getElementById('select-drones-btn').classList.remove('active');
document.getElementById('select-rect-btn').classList.remove('active');
map.dragging.enable(); map.dragging.enable();
// 如果仍在 Grid Sweep 模式重新進入框選
var currentMode = document.getElementById('mission-mode-select').value;
if (currentMode === 'GRID_SWEEP') {
setTimeout(function() {
selectionMode = 'rect';
map.dragging.disable();
console.log('Grid Sweep: 重新進入框選模式');
}, 500);
}
}
// ================================================================================
// 路徑標記函數 (跟隨模式用)
// ================================================================================
function addRoutePoint(lat, lng) {
routePoints.push([lat, lng]);
var idx = routePoints.length;
// 添加編號標記
var icon = L.divIcon({
className: 'route-point',
html: '<div style="' +
'background-color: #FF5722;' +
'color: white;' +
'width: 22px;' +
'height: 22px;' +
'border-radius: 50%;' +
'display: flex;' +
'align-items: center;' +
'justify-content: center;' +
'font-weight: bold;' +
'font-size: 11px;' +
'border: 2px solid white;' +
'box-shadow: 0 2px 5px rgba(0,0,0,0.3);' +
'">' + idx + '</div>',
iconSize: [22, 22],
iconAnchor: [11, 11]
});
var marker = L.marker([lat, lng], {
icon: icon,
zIndexOffset: 3000
}).addTo(routeLayer);
routeMarkers.push(marker);
// 更新連線
if (routePolyline) {
routeLayer.removeLayer(routePolyline);
}
if (routePoints.length > 1) {
routePolyline = L.polyline(routePoints, {
color: '#FF5722',
weight: 2.5,
opacity: 0.8,
dashArray: '8, 6'
}).addTo(routeLayer);
}
console.log('添加路徑點 #' + idx + ':', lat, lng);
}
function clearRoutePoints() {
routePoints = [];
routeMarkers = [];
if (routePolyline) {
routeLayer.removeLayer(routePolyline);
routePolyline = null;
}
routeLayer.clearLayers();
}
function confirmRoute() {
if (routePoints.length < 2) {
alert('至少需要 2 個路徑點');
return;
}
if (bridge) {
var pointsStr = JSON.stringify(routePoints);
bridge.routeConfirmed(pointsStr);
console.log('路徑確認:', routePoints.length, '個點');
}
} }
// ================================================================================ // ================================================================================
@ -631,6 +699,36 @@ class DroneMap:
console.log('暫停任務'); console.log('暫停任務');
} }
} }
// ================================================================================
// 任務模式切換
// ================================================================================
function onMissionModeChanged(mode) {
// 先清除選擇狀態
selectionMode = null;
clearSelectionMode();
clearRoutePoints();
// 隱藏確認路徑按鈕
document.getElementById('confirm-route-btn').style.display = 'none';
if (mode === 'GRID_SWEEP') {
// 自動進入框選模式
selectionMode = 'rect';
map.dragging.disable();
console.log('Grid Sweep: 進入框選模式');
} else if (mode === 'LEADER_FOLLOWER') {
// 進入路徑標記模式
selectionMode = 'route';
document.getElementById('confirm-route-btn').style.display = 'block';
console.log('跟隨模式: 進入路徑標記模式,點擊地圖添加路徑點');
}
if (bridge) {
bridge.missionModeChanged(mode);
console.log('任務模式切換:', mode);
}
}
// ================================================================================ // ================================================================================
function initTrajectory(id) { function initTrajectory(id) {
@ -696,7 +794,7 @@ class DroneMap:
}) })
.on('click', function () { .on('click', function () {
if (mapDragStarted) { if (mapDragStarted) {
return; // 如果是拖移地圖不處理點擊 return;
} }
if (bridge) { if (bridge) {
bridge.emitDroneClicked(id); bridge.emitDroneClicked(id);
@ -711,7 +809,7 @@ class DroneMap:
}) })
.on('click', function() { .on('click', function() {
if (mapDragStarted) { if (mapDragStarted) {
return; // 如果是拖移地圖不處理點擊 return;
} }
if (bridge) { if (bridge) {
bridge.emitDroneClicked(id); bridge.emitDroneClicked(id);
@ -722,7 +820,7 @@ class DroneMap:
if (!initialized || id < focusedId) { if (!initialized || id < focusedId) {
focusedId = id; focusedId = id;
map.setView([lat, lon], 19); // 第一台無人機重置並放大到最大 map.setView([lat, lon], 19);
initialized = true; initialized = true;
} }
} }
@ -738,18 +836,15 @@ class DroneMap:
} }
// ================================================================================ // ================================================================================
// 新增任務規劃視覺化函式 // 任務規劃視覺化函式
// ================================================================================ // ================================================================================
function drawMissionPlan(centerLat, centerLon, targetLat, targetLon) { function drawMissionPlan(centerLat, centerLon, targetLat, targetLon) {
// 清除舊的任務規劃標記
clearMissionPlan(); clearMissionPlan();
// 保存中心點和目標點位置
centerPosition = {lat: centerLat, lng: centerLon}; centerPosition = {lat: centerLat, lng: centerLon};
targetPosition = {lat: targetLat, lng: targetLon}; targetPosition = {lat: targetLat, lng: targetLon};
updateMissionInfo(); updateMissionInfo();
// 繪製中心點標記 "C"縮小到 20px
var centerIcon = L.divIcon({ var centerIcon = L.divIcon({
className: 'mission-center', className: 'mission-center',
html: `<div style=" html: `<div style="
@ -775,7 +870,6 @@ class DroneMap:
zIndexOffset: 2000 zIndexOffset: 2000
}).addTo(missionPlanGroup); }).addTo(missionPlanGroup);
// 繪製目標點標記 ""星星符號
var targetIcon = L.divIcon({ var targetIcon = L.divIcon({
className: 'mission-target', className: 'mission-target',
html: `<div style=" html: `<div style="
@ -805,19 +899,16 @@ class DroneMap:
} }
function clearMissionPlan() { function clearMissionPlan() {
// 清除中心點標記
if (centerMarker) { if (centerMarker) {
missionPlanGroup.removeLayer(centerMarker); missionPlanGroup.removeLayer(centerMarker);
centerMarker = null; centerMarker = null;
} }
// 清除目標點標記
if (targetMarker) { if (targetMarker) {
missionPlanGroup.removeLayer(targetMarker); missionPlanGroup.removeLayer(targetMarker);
targetMarker = null; targetMarker = null;
} }
// 清除位置資訊
centerPosition = null; centerPosition = null;
targetPosition = null; targetPosition = null;
updateMissionInfo(); updateMissionInfo();
@ -854,7 +945,6 @@ class DroneMap:
if not self.map_loaded or not self.pending_map_updates: if not self.map_loaded or not self.pending_map_updates:
return return
# 批量執行所有待更新的位置
js_commands = [] js_commands = []
for drone_id, (lat, lon, heading) in self.pending_map_updates.items(): for drone_id, (lat, lon, heading) in self.pending_map_updates.items():
js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});") js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});")
@ -863,7 +953,6 @@ class DroneMap:
combined_js = "\n".join(js_commands) combined_js = "\n".join(js_commands)
self.map_view.page().runJavaScript(combined_js) self.map_view.page().runJavaScript(combined_js)
# 清空待更新緩存
self.pending_map_updates.clear() self.pending_map_updates.clear()
def clear_trajectories(self): def clear_trajectories(self):
@ -877,18 +966,10 @@ class DroneMap:
self.map_view.page().runJavaScript(f"focusOn('{drone_id}');") self.map_view.page().runJavaScript(f"focusOn('{drone_id}');")
# ================================================================================ # ================================================================================
# 【新增】任務規劃視覺化方法 # 任務規劃視覺化方法
# ================================================================================ # ================================================================================
def draw_mission_plan(self, center_lat, center_lon, target_lat, target_lon): def draw_mission_plan(self, center_lat, center_lon, target_lat, target_lon):
""" """在地圖上繪製任務規劃"""
在地圖上繪製任務規劃
Args:
center_lat: 中心點緯度
center_lon: 中心點經度
target_lat: 目標點緯度
target_lon: 目標點經度
"""
if self.map_loaded: if self.map_loaded:
js_code = f"drawMissionPlan({center_lat:.6f}, {center_lon:.6f}, {target_lat:.6f}, {target_lon:.6f});" js_code = f"drawMissionPlan({center_lat:.6f}, {center_lon:.6f}, {target_lat:.6f}, {target_lon:.6f});"
self.map_view.page().runJavaScript(js_code) self.map_view.page().runJavaScript(js_code)
@ -941,16 +1022,26 @@ class DroneMap:
"""獲取多邊形選擇信號""" """獲取多邊形選擇信號"""
return self.bridge.polygon_selected return self.bridge.polygon_selected
def get_mission_mode_changed_signal(self):
"""獲取任務模式切換信號"""
return self.bridge.mission_mode_changed
def get_route_confirmed_signal(self):
"""獲取路徑確認信號"""
return self.bridge.route_confirmed
class MapBridge(QObject): class MapBridge(QObject):
"""JavaScript 和 Python 之間的橋接類""" """JavaScript 和 Python 之間的橋接類"""
gps_signal = pyqtSignal(float, float) # lat, lon gps_signal = pyqtSignal(float, float)
drone_clicked = pyqtSignal(str) # drone_id drone_clicked = pyqtSignal(str)
clear_all_drone_selection = pyqtSignal() # clear all drone selection clear_all_drone_selection = pyqtSignal()
select_all_drones = pyqtSignal() # select all drones select_all_drones = pyqtSignal()
start_mission_signal = pyqtSignal(float, float, float, float) # center_lat, center_lon, target_lat, target_lon start_mission_signal = pyqtSignal(float, float, float, float)
pause_mission_signal = pyqtSignal() # pause mission pause_mission_signal = pyqtSignal()
rectangle_selected = pyqtSignal(str) # JSON string of rectangle points rectangle_selected = pyqtSignal(str)
polygon_selected = pyqtSignal(str) # JSON string of polygon points polygon_selected = pyqtSignal(str)
mission_mode_changed = pyqtSignal(str)
route_confirmed = pyqtSignal(str) # 路徑確認 (JSON 字串)
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@ -999,4 +1090,16 @@ class MapBridge(QObject):
def polygonSelected(self, points_json): def polygonSelected(self, points_json):
"""供 JavaScript 調用的方法 - 多邊形選擇完成""" """供 JavaScript 調用的方法 - 多邊形選擇完成"""
self.polygon_selected.emit(points_json) self.polygon_selected.emit(points_json)
print(f"🔷 多邊形區域已選擇: {points_json}") print(f"🔷 多邊形區域已選擇: {points_json}")
@pyqtSlot(str)
def missionModeChanged(self, mode):
"""供 JavaScript 調用的方法 - 任務模式切換"""
self.mission_mode_changed.emit(mode)
print(f"🔄 任務模式切換: {mode}")
@pyqtSlot(str)
def routeConfirmed(self, points_json):
"""供 JavaScript 調用的方法 - 路徑確認"""
self.route_confirmed.emit(points_json)
print(f"📍 路徑已確認: {points_json}")

@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
任務執行模組
管理多架無人機的 GUIDED 模式飛行控制迴圈
設計:
- 每架無人機持有一個航點序列逐點推進
- 各自到達就各自切換到下一個航點
- QTimer 驅動 Qt 主線程執行
- 暫停 = 停止發送 setpoint飛控自動懸停
- 相容舊的 2 階段任務與新的多航點任務 (Grid Sweep)
"""
import math
from enum import Enum
from PyQt6.QtCore import QObject, QTimer, pyqtSignal
class MissionState(Enum):
"""整體任務狀態"""
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
class DroneTask:
"""單架無人機的任務資料"""
__slots__ = ('drone_id', 'sysid', 'waypoints', 'wp_index', 'done')
def __init__(self, drone_id, sysid, waypoints):
"""
Args:
drone_id: GUI 用的 ID ( 's0_11')
sysid: MAVLink system ID ( 11)
waypoints: 航點序列 [(lat, lon, alt), ...]
"""
self.drone_id = drone_id
self.sysid = sysid
self.waypoints = waypoints
self.wp_index = 0
self.done = len(waypoints) == 0
@property
def current_target(self):
if self.done or self.wp_index >= len(self.waypoints):
return None
return self.waypoints[self.wp_index]
@property
def total_waypoints(self):
return len(self.waypoints)
class MissionExecutor(QObject):
"""
任務執行器
planned_waypoints 格式:
{
'drone_ids': ['s0_1', 's0_2', ...],
'waypoints': [
[(lat,lon,alt), ...], # drone 0
[(lat,lon,alt), ...], # drone 1
]
}
"""
# 信號
drone_waypoint_reached = pyqtSignal(str, int, int) # (drone_id, wp_index, total)
mission_completed = pyqtSignal()
def __init__(self, sender, drone_gps,
arrival_radius=2.0, send_rate_hz=2.0):
super().__init__()
self.sender = sender
self.drone_gps = drone_gps
self.arrival_radius = arrival_radius
self.state = MissionState.IDLE
self.tasks = {}
self._interval_ms = int(1000 / send_rate_hz)
self._timer = QTimer()
self._timer.timeout.connect(self._tick)
# ------------------------------------------------------------------ 公開方法
def start(self, planned_waypoints):
"""啟動任務"""
if self.state == MissionState.RUNNING:
print("任務已在執行中")
return
self.tasks.clear()
drone_ids = planned_waypoints['drone_ids']
waypoints_list = planned_waypoints['waypoints']
for i, drone_id in enumerate(drone_ids):
sysid = int(drone_id.split('_')[1])
self.tasks[drone_id] = DroneTask(drone_id, sysid, waypoints_list[i])
self.state = MissionState.RUNNING
self._timer.start(self._interval_ms)
total_wps = sum(t.total_waypoints for t in self.tasks.values())
print(f"任務啟動: {len(self.tasks)} 架無人機, "
f"{total_wps} 個航點, "
f"到達半徑={self.arrival_radius}m, "
f"發送週期={self._interval_ms}ms")
def pause(self):
"""暫停任務"""
if self.state == MissionState.RUNNING:
self._timer.stop()
self.state = MissionState.PAUSED
print("任務暫停")
def resume(self):
"""恢復任務"""
if self.state == MissionState.PAUSED:
self._timer.start(self._interval_ms)
self.state = MissionState.RUNNING
print("任務恢復")
def stop(self):
"""停止任務"""
self._timer.stop()
self.tasks.clear()
self.state = MissionState.IDLE
print("任務停止")
# ------------------------------------------------------------------ 控制迴圈
def _tick(self):
"""控制迴圈"""
all_done = True
for task in self.tasks.values():
if task.done:
continue
all_done = False
target = task.current_target
if target is None:
continue
# 讀取當前 GPS
gps = self.drone_gps.get(task.drone_id)
if gps is None:
continue
tgt_lat, tgt_lon, tgt_alt = target
distance = _haversine(gps['lat'], gps['lon'], tgt_lat, tgt_lon)
# 到達判定
if distance < self.arrival_radius:
task.wp_index += 1
if task.wp_index >= task.total_waypoints:
task.done = True
self.drone_waypoint_reached.emit(
task.drone_id, task.wp_index, task.total_waypoints
)
print(f" {task.drone_id} → DONE "
f"({task.total_waypoints}/{task.total_waypoints})")
continue
else:
self.drone_waypoint_reached.emit(
task.drone_id, task.wp_index, task.total_waypoints
)
print(f" {task.drone_id} → WP {task.wp_index}/{task.total_waypoints} "
f"(距離: {distance:.1f}m)")
# 更新目標
tgt_lat, tgt_lon, tgt_alt = task.current_target
# 發送 setpoint
self.sender.send_position_global(
task.sysid, tgt_lat, tgt_lon, tgt_alt
)
# 全部完成檢查
if all_done and self.tasks:
self._timer.stop()
self.state = MissionState.IDLE
self.mission_completed.emit()
print("===== 任務全部完成 =====")
# ------------------------------------------------------------------ 工具函式
def _haversine(lat1, lon1, lat2, lon2):
"""計算兩個 GPS 座標間的水平距離 (公尺)"""
R = 6371000.0
lat1_r = math.radians(lat1)
lat2_r = math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = (math.sin(dlat / 2) ** 2 +
math.cos(lat1_r) * math.cos(lat2_r) * math.sin(dlon / 2) ** 2)
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
飛行任務規劃模組 飛行任務規劃模組
新增Circle Formation Leader-Follower 支援: M-Formation, Circle, Leader-Follower (Bezier 轉彎), Grid Sweep
""" """
import math import math
from typing import List, Tuple, Optional, Dict, Any from typing import List, Tuple, Optional, Dict, Any
@ -13,166 +13,172 @@ class MissionType(Enum):
M_FORMATION = "m_formation" M_FORMATION = "m_formation"
CIRCLE_FORMATION = "circle_formation" CIRCLE_FORMATION = "circle_formation"
LEADER_FOLLOWER = "leader_follower" LEADER_FOLLOWER = "leader_follower"
GRID_SWEEP = "grid_sweep"
class CoordinateConverter: class CoordinateConverter:
"""GPS 座標與 Local 座標的轉換器""" """GPS 座標與 Local 座標的轉換器"""
def __init__(self, origin_lat: float, origin_lon: float, origin_alt: float = 0): def __init__(self, origin_lat: float, origin_lon: float, origin_alt: float = 0):
self.origin_lat = origin_lat self.origin_lat = origin_lat
self.origin_lon = origin_lon self.origin_lon = origin_lon
self.origin_alt = origin_alt self.origin_alt = origin_alt
self.R = 6371000.0 self.R = 6371000.0
def gps_to_local(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]: def gps_to_local(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]:
lat_rad = math.radians(lat) lat_rad = math.radians(lat)
lon_rad = math.radians(lon) lon_rad = math.radians(lon)
origin_lat_rad = math.radians(self.origin_lat) origin_lat_rad = math.radians(self.origin_lat)
origin_lon_rad = math.radians(self.origin_lon) origin_lon_rad = math.radians(self.origin_lon)
dlat = lat_rad - origin_lat_rad dlat = lat_rad - origin_lat_rad
dlon = lon_rad - origin_lon_rad dlon = lon_rad - origin_lon_rad
x = self.R * dlon * math.cos((lat_rad + origin_lat_rad) / 2) x = self.R * dlon * math.cos((lat_rad + origin_lat_rad) / 2)
y = self.R * dlat y = self.R * dlat
z = alt - self.origin_alt z = alt - self.origin_alt
return x, y, z return x, y, z
def local_to_gps(self, x: float, y: float, z: float) -> Tuple[float, float, float]: def local_to_gps(self, x: float, y: float, z: float) -> Tuple[float, float, float]:
origin_lat_rad = math.radians(self.origin_lat) origin_lat_rad = math.radians(self.origin_lat)
origin_lon_rad = math.radians(self.origin_lon) origin_lon_rad = math.radians(self.origin_lon)
lat_rad = origin_lat_rad + (y / self.R) lat_rad = origin_lat_rad + (y / self.R)
lon_rad = origin_lon_rad + (x / (self.R * math.cos((lat_rad + origin_lat_rad) / 2))) lon_rad = origin_lon_rad + (x / (self.R * math.cos((lat_rad + origin_lat_rad) / 2)))
lat = math.degrees(lat_rad) lat = math.degrees(lat_rad)
lon = math.degrees(lon_rad) lon = math.degrees(lon_rad)
alt = self.origin_alt + z alt = self.origin_alt + z
return lat, lon, alt return lat, lon, alt
class FormationPlanner: class FormationPlanner:
"""隊形規劃器""" """隊形規劃器"""
def __init__(self, spacing: float = 5.0, def __init__(self, spacing: float = 5.0,
base_altitude: float = 10.0, base_altitude: float = 10.0,
altitude_diff: float = 2.0): altitude_diff: float = 2.0):
self.spacing = spacing self.spacing = spacing
self.base_altitude = base_altitude self.base_altitude = base_altitude
self.altitude_diff = altitude_diff self.altitude_diff = altitude_diff
self.current_origin = None self.current_origin = None
self.converter = None self.converter = None
def plan_formation_mission(self, def plan_formation_mission(self,
drone_gps_positions: List[Tuple[float, float, float]], drone_gps_positions: List[Tuple[float, float, float]],
target_gps: Tuple[float, float, float], target_gps: Tuple[float, float, float],
mission_type: MissionType = MissionType.M_FORMATION, mission_type: MissionType = MissionType.M_FORMATION,
params: Optional[Dict[str, Any]] = None) -> Tuple[ params: Optional[Dict[str, Any]] = None
List[Tuple[float, float, float]], ) -> Tuple[List[List[Tuple[float, float, float]]],
List[Tuple[float, float, float]], Tuple[float, float, float]]:
Tuple[float, float, float]]:
"""
規劃兩階段隊形任務
Args:
drone_gps_positions: 當前無人機 GPS 位置列表
target_gps: 目標點 GPS 座標
mission_type: 任務類型
params: 任務參數
Returns:
(stage1_gps, stage2_gps, origin)
"""
if len(drone_gps_positions) == 0: if len(drone_gps_positions) == 0:
raise ValueError("無人機位置列表不能為空") raise ValueError("無人機位置列表不能為空")
# 計算中央點
center_lat = sum(pos[0] for pos in drone_gps_positions) / len(drone_gps_positions) center_lat = sum(pos[0] for pos in drone_gps_positions) / len(drone_gps_positions)
center_lon = sum(pos[1] for pos in drone_gps_positions) / len(drone_gps_positions) center_lon = sum(pos[1] for pos in drone_gps_positions) / len(drone_gps_positions)
center_alt = sum(pos[2] for pos in drone_gps_positions) / len(drone_gps_positions) center_alt = sum(pos[2] for pos in drone_gps_positions) / len(drone_gps_positions)
self.current_origin = (center_lat, center_lon, center_alt) self.current_origin = (center_lat, center_lon, center_alt)
self.converter = CoordinateConverter(center_lat, center_lon, center_alt) self.converter = CoordinateConverter(center_lat, center_lon, center_alt)
# 轉換到 Local 座標
drone_local = [self.converter.gps_to_local(*pos) for pos in drone_gps_positions] drone_local = [self.converter.gps_to_local(*pos) for pos in drone_gps_positions]
target_local = self.converter.gps_to_local(*target_gps) target_local = self.converter.gps_to_local(*target_gps)
# 計算隊形
if mission_type == MissionType.M_FORMATION: if mission_type == MissionType.M_FORMATION:
stage1_local, stage2_local = self._calculate_m_formation(drone_local, target_local, params) s1, s2 = self._calculate_m_formation(drone_local, target_local, params)
waypoints_local = [[s1[i], s2[i]] for i in range(len(drone_local))]
elif mission_type == MissionType.CIRCLE_FORMATION: elif mission_type == MissionType.CIRCLE_FORMATION:
stage1_local, stage2_local = self._calculate_circle_formation(drone_local, target_local, params) s1, s2 = self._calculate_circle_formation(drone_local, target_local, params)
waypoints_local = [[s1[i], s2[i]] for i in range(len(drone_local))]
elif mission_type == MissionType.LEADER_FOLLOWER: elif mission_type == MissionType.LEADER_FOLLOWER:
stage1_local, stage2_local = self._calculate_leader_follower(drone_local, target_local, params) params = params or {}
route_wps_gps = params.get('route_waypoints')
if route_wps_gps is None or len(route_wps_gps) < 2:
raise ValueError("LEADER_FOLLOWER 需要至少 2 個路徑點")
route_wps_local = [
self.converter.gps_to_local(wp[0], wp[1], 0)[:2]
for wp in route_wps_gps
]
waypoints_local = self._calculate_leader_follower(drone_local, route_wps_local, params)
elif mission_type == MissionType.GRID_SWEEP:
params = params or {}
rect_corners_gps = params.get('rect_corners')
if rect_corners_gps is None or len(rect_corners_gps) != 4:
raise ValueError("GRID_SWEEP 需要 4 個 GPS 角點")
rect_corners_local = [
self.converter.gps_to_local(c[0], c[1], 0)[:2]
for c in rect_corners_gps
]
waypoints_local = self._calculate_grid_sweep(drone_local, rect_corners_local, params)
else: else:
raise ValueError(f"不支援的任務類型: {mission_type}") raise ValueError(f"不支援的任務類型: {mission_type}")
# 轉回 GPS waypoints_gps = []
stage1_gps = [self.converter.local_to_gps(*pos) for pos in stage1_local] for drone_wps in waypoints_local:
stage2_gps = [self.converter.local_to_gps(*pos) for pos in stage2_local] gps_wps = [self.converter.local_to_gps(*wp) for wp in drone_wps]
waypoints_gps.append(gps_wps)
return stage1_gps, stage2_gps, self.current_origin
return waypoints_gps, self.current_origin
# ------------------------------------------------------------------ M-Formation
def _calculate_m_formation(self, drone_positions, target_point, params): def _calculate_m_formation(self, drone_positions, target_point, params):
"""M字形編隊原本的邏輯"""
params = params or {} params = params or {}
N = len(drone_positions) N = len(drone_positions)
spacing = params.get('spacing', self.spacing) spacing = params.get('spacing', self.spacing)
base_altitude = params.get('base_altitude', self.base_altitude) base_altitude = params.get('base_altitude', self.base_altitude)
altitude_diff = params.get('altitude_diff', self.altitude_diff) altitude_diff = params.get('altitude_diff', self.altitude_diff)
C_x = sum(pos[0] for pos in drone_positions) / N C_x = sum(pos[0] for pos in drone_positions) / N
C_y = sum(pos[1] for pos in drone_positions) / N C_y = sum(pos[1] for pos in drone_positions) / N
C_z = sum(pos[2] for pos in drone_positions) / N
T_x, T_y, T_z = target_point T_x, T_y, T_z = target_point
V_x, V_y = T_x - C_x, T_y - C_y V_x, V_y = T_x - C_x, T_y - C_y
P_x, P_y = -V_y, V_x P_x, P_y = -V_y, V_x
length = math.sqrt(P_x**2 + P_y**2) length = math.sqrt(P_x ** 2 + P_y ** 2)
P_x_unit, P_y_unit = (P_x / length, P_y / length) if length > 0.01 else (1.0, 0.0) P_x_unit, P_y_unit = (P_x / length, P_y / length) if length > 0.01 else (1.0, 0.0)
projections = [((pos[0] - C_x) * P_x_unit + (pos[1] - C_y) * P_y_unit, i) projections = [((pos[0] - C_x) * P_x_unit + (pos[1] - C_y) * P_y_unit, i)
for i, pos in enumerate(drone_positions)] for i, pos in enumerate(drone_positions)]
projections.sort() projections.sort()
stage1_positions = [None] * N stage1_positions = [None] * N
stage2_positions = [None] * N stage2_positions = [None] * N
for rank, (_, original_idx) in enumerate(projections): for rank, (_, original_idx) in enumerate(projections):
offset = (rank - (N - 1) / 2) * spacing offset = (rank - (N - 1) / 2) * spacing
altitude = base_altitude + (altitude_diff if rank % 2 == 0 else -altitude_diff) altitude = base_altitude + (altitude_diff if rank % 2 == 0 else -altitude_diff)
stage1_positions[original_idx] = (C_x + P_x_unit * offset, C_y + P_y_unit * offset, altitude) stage1_positions[original_idx] = (C_x + P_x_unit * offset, C_y + P_y_unit * offset, altitude)
stage2_positions[original_idx] = (T_x + P_x_unit * offset, T_y + P_y_unit * offset, altitude) stage2_positions[original_idx] = (T_x + P_x_unit * offset, T_y + P_y_unit * offset, altitude)
return stage1_positions, stage2_positions return stage1_positions, stage2_positions
# ------------------------------------------------------------------ Circle
def _calculate_circle_formation(self, drone_positions, target_point, params): def _calculate_circle_formation(self, drone_positions, target_point, params):
"""
圓形編隊
params: radius(10.0), altitude(10.0), start_angle(0.0)
"""
params = params or {} params = params or {}
N = len(drone_positions) N = len(drone_positions)
radius = params.get('radius', 10.0) radius = params.get('radius', 10.0)
altitude = params.get('altitude', 10.0) altitude = params.get('altitude', 10.0)
start_angle = params.get('start_angle', 0.0) start_angle = params.get('start_angle', 0.0)
center_x, center_y, center_z = target_point center_x, center_y, center_z = target_point
stage1_positions = [] stage1_positions = []
stage2_positions = [] stage2_positions = []
angle_step = 360.0 / N angle_step = 360.0 / N
for i in range(N): for i in range(N):
angle_rad = math.radians(start_angle + angle_step * i) angle_rad = math.radians(start_angle + angle_step * i)
final_x = center_x + radius * math.cos(angle_rad) final_x = center_x + radius * math.cos(angle_rad)
final_y = center_y + radius * math.sin(angle_rad) final_y = center_y + radius * math.sin(angle_rad)
final_z = altitude final_z = altitude
current_x, current_y, current_z = drone_positions[i] current_x, current_y, current_z = drone_positions[i]
stage1_positions.append(( stage1_positions.append((
current_x + (final_x - current_x) * 0.5, current_x + (final_x - current_x) * 0.5,
@ -180,71 +186,353 @@ class FormationPlanner:
current_z + (final_z - current_z) * 0.5 current_z + (final_z - current_z) * 0.5
)) ))
stage2_positions.append((final_x, final_y, final_z)) stage2_positions.append((final_x, final_y, final_z))
return stage1_positions, stage2_positions return stage1_positions, stage2_positions
def _calculate_leader_follower(self, drone_positions, target_point, params): # ------------------------------------------------------------------ 路徑跟隨 (Bezier 轉彎)
def _calculate_leader_follower(self, drone_positions, route_wps_local, params):
""" """
Leader-Follower 編隊 路徑跟隨編隊 Bezier 曲線轉彎版
params: follow_distance(15.0), altitude_offset(1.0)
步驟:
1. _build_center_path: 在轉折 WP 處用二次 Bezier 曲線平滑轉彎
2. 固定排序每架無人機沿中心路徑套用橫向+縱向偏移
隊形 (俯視):
| (前進方向) |
| D1 | 左偏移, 0m
| D2 | 右偏移, 5m
| D3 | 左偏移, 10m
| D4 | 右偏移, 15m
""" """
params = params or {}
N = len(drone_positions) N = len(drone_positions)
follow_distance = params.get('follow_distance', 15.0) # 預設 15m lateral_offset = params.get('lateral_offset', 3.0)
altitude_offset = params.get('altitude_offset', 1.0) longitudinal_spacing = params.get('longitudinal_spacing', 5.0)
altitude = params.get('altitude', self.base_altitude)
leader_x, leader_y, leader_z = drone_positions[0] turn_margin = params.get('turn_margin', 0.35) # 轉彎切入距離佔段長比例
target_x, target_y, target_z = target_point curve_resolution = params.get('curve_resolution', 8) # 每個彎道的插值點數
direction_x = target_x - leader_x # Step 1: 建立帶 Bezier 轉彎的中心路徑
direction_y = target_y - leader_y center_path = self._build_center_path(
direction_length = math.sqrt(direction_x**2 + direction_y**2) route_wps_local, turn_margin, curve_resolution
)
dir_unit_x, dir_unit_y = (direction_x / direction_length, direction_y / direction_length) \
if direction_length > 0.01 else (0.0, 1.0) # Step 2: 固定排序
first_dir = (center_path[0][2], center_path[0][3])
stage1_positions = [] first_perp = (-first_dir[1], first_dir[0])
stage2_positions = [] C_x = sum(p[0] for p in drone_positions) / N
C_y = sum(p[1] for p in drone_positions) / N
# Leader
leader_stage1 = (leader_x + direction_x * 0.3, leader_y + direction_y * 0.3, target_z) projections = [
stage1_positions.append(leader_stage1) ((pos[0] - C_x) * first_perp[0] + (pos[1] - C_y) * first_perp[1], i)
stage2_positions.append(target_point) for i, pos in enumerate(drone_positions)
]
# Followers projections.sort()
for i in range(1, N):
back_offset = follow_distance * i # Step 3: 偏移
height_offset = altitude_offset * (i % 2) all_waypoints = [None] * N
current_x, current_y, current_z = drone_positions[i] for rank, (_, original_idx) in enumerate(projections):
follow1_x = leader_stage1[0] - dir_unit_x * back_offset lat_sign = -1 if rank % 2 == 0 else 1
follow1_y = leader_stage1[1] - dir_unit_y * back_offset lat = lat_sign * lateral_offset
follow1_z = leader_stage1[2] + height_offset lon = rank * longitudinal_spacing
stage1_positions.append(( waypoints = []
current_x + (follow1_x - current_x) * 0.5, for (cx, cy, dx, dy) in center_path:
current_y + (follow1_y - current_y) * 0.5, perp_x, perp_y = -dy, dx
current_z + (follow1_z - current_z) * 0.5 ox = cx + lat * perp_x - lon * dx
)) oy = cy + lat * perp_y - lon * dy
waypoints.append((ox, oy, altitude))
stage2_positions.append((
target_x - dir_unit_x * back_offset, all_waypoints[original_idx] = waypoints
target_y - dir_unit_y * back_offset,
target_z + height_offset
))
return stage1_positions, stage2_positions
return all_waypoints
def _build_center_path(self, waypoints, turn_margin, curve_resolution):
"""
建立帶 Bezier 曲線轉彎的中心路徑
在每個轉折 WP :
1. 計算 pre_turn = WP - d_in × T
2. 計算 post_turn = WP + d_out × T
3. 用二次 Bezier 曲線: B(t) = (1-t)²·pre + 2t(1-t)·WP + ·post
4. 方向從導數得到: B'(t) = 2(1-t)(WP-pre) + 2t(post-WP)
Args:
waypoints: 路徑航點 [(x, y), ...]
turn_margin: 轉彎切入距離佔相鄰段長的比例 (0~0.5)
curve_resolution: 每個彎道的 Bezier 插值點數
Returns:
[(x, y, dir_x, dir_y), ...] 中心路徑點 + 單位方向
"""
num_wps = len(waypoints)
if num_wps < 2:
return [(waypoints[0][0], waypoints[0][1], 0.0, 1.0)]
# 計算每段方向和長度
segments = []
for i in range(num_wps - 1):
dx = waypoints[i + 1][0] - waypoints[i][0]
dy = waypoints[i + 1][1] - waypoints[i][1]
length = math.sqrt(dx ** 2 + dy ** 2)
if length < 0.01:
segments.append((0.0, 1.0, length))
else:
segments.append((dx / length, dy / length, length))
path = []
# 第一個航點
path.append((waypoints[0][0], waypoints[0][1],
segments[0][0], segments[0][1]))
# 中間航點 (轉彎處)
for i in range(1, num_wps - 1):
d_in_x, d_in_y, len_in = segments[i - 1]
d_out_x, d_out_y, len_out = segments[i]
# 切入距離 T: 取相鄰兩段中較短的 × turn_margin
T = min(len_in, len_out) * turn_margin
if T < 0.5:
# 段太短,不插彎,直接加一個平均方向點
avg_dx = d_in_x + d_out_x
avg_dy = d_in_y + d_out_y
avg_len = math.sqrt(avg_dx ** 2 + avg_dy ** 2)
if avg_len > 0.01:
avg_dx /= avg_len
avg_dy /= avg_len
else:
avg_dx, avg_dy = d_in_x, d_in_y
path.append((waypoints[i][0], waypoints[i][1], avg_dx, avg_dy))
continue
# P0 = pre_turn (弧線起始)
p0_x = waypoints[i][0] - d_in_x * T
p0_y = waypoints[i][1] - d_in_y * T
# P1 = WP 本身 (控制點)
p1_x = waypoints[i][0]
p1_y = waypoints[i][1]
# P2 = post_turn (弧線結束)
p2_x = waypoints[i][0] + d_out_x * T
p2_y = waypoints[i][1] + d_out_y * T
# 加入 pre_turn 點 (方向 = incoming)
path.append((p0_x, p0_y, d_in_x, d_in_y))
# 加入 Bezier 插值點
for j in range(1, curve_resolution):
t = j / curve_resolution
# B(t) = (1-t)²·P0 + 2t(1-t)·P1 + t²·P2
one_minus_t = 1.0 - t
bx = one_minus_t * one_minus_t * p0_x + \
2 * t * one_minus_t * p1_x + \
t * t * p2_x
by = one_minus_t * one_minus_t * p0_y + \
2 * t * one_minus_t * p1_y + \
t * t * p2_y
# B'(t) = 2(1-t)(P1-P0) + 2t(P2-P1) → 切線方向
tdx = 2 * one_minus_t * (p1_x - p0_x) + 2 * t * (p2_x - p1_x)
tdy = 2 * one_minus_t * (p1_y - p0_y) + 2 * t * (p2_y - p1_y)
# 正規化
t_len = math.sqrt(tdx ** 2 + tdy ** 2)
if t_len > 0.01:
tdx /= t_len
tdy /= t_len
else:
tdx, tdy = d_in_x, d_in_y
path.append((bx, by, tdx, tdy))
# 加入 post_turn 點 (方向 = outgoing)
path.append((p2_x, p2_y, d_out_x, d_out_y))
# 最後一個航點
path.append((waypoints[-1][0], waypoints[-1][1],
segments[-1][0], segments[-1][1]))
return path
# ------------------------------------------------------------------ 柵狀掃描
def _calculate_grid_sweep(self, drone_positions, rect_corners_local, params):
"""柵狀掃描:掃描方向沿矩形長邊,切割方向沿短邊"""
N = len(drone_positions)
line_spacing = params.get('line_spacing', 5.0)
altitude = params.get('altitude', self.base_altitude)
c0, c1, c2, c3 = rect_corners_local
edge_a = (c1[0] - c0[0], c1[1] - c0[1])
len_a = math.sqrt(edge_a[0] ** 2 + edge_a[1] ** 2)
edge_b = (c3[0] - c0[0], c3[1] - c0[1])
len_b = math.sqrt(edge_b[0] ** 2 + edge_b[1] ** 2)
if len_a >= len_b:
sweep_vec = edge_a
sweep_len = len_a
cut_vec = edge_b
cut_len = len_b
sweep_start_mid = ((c0[0] + c3[0]) / 2, (c0[1] + c3[1]) / 2)
sweep_end_mid = ((c1[0] + c2[0]) / 2, (c1[1] + c2[1]) / 2)
cut_start_corner = c0
else:
sweep_vec = edge_b
sweep_len = len_b
cut_vec = edge_a
cut_len = len_a
sweep_start_mid = ((c0[0] + c1[0]) / 2, (c0[1] + c1[1]) / 2)
sweep_end_mid = ((c3[0] + c2[0]) / 2, (c3[1] + c2[1]) / 2)
cut_start_corner = c0
sweep_dir = (sweep_vec[0] / sweep_len, sweep_vec[1] / sweep_len)
cut_dir = (cut_vec[0] / cut_len, cut_vec[1] / cut_len)
C_x = sum(p[0] for p in drone_positions) / N
C_y = sum(p[1] for p in drone_positions) / N
dist_to_start = math.sqrt(
(C_x - sweep_start_mid[0]) ** 2 + (C_y - sweep_start_mid[1]) ** 2
)
dist_to_end = math.sqrt(
(C_x - sweep_end_mid[0]) ** 2 + (C_y - sweep_end_mid[1]) ** 2
)
if dist_to_start <= dist_to_end:
near_corner_a = cut_start_corner
else:
sweep_dir = (-sweep_dir[0], -sweep_dir[1])
near_corner_a = (cut_start_corner[0] + sweep_vec[0],
cut_start_corner[1] + sweep_vec[1])
projections = [
((pos[0] - C_x) * cut_dir[0] + (pos[1] - C_y) * cut_dir[1], i)
for i, pos in enumerate(drone_positions)
]
projections.sort()
strip_width = cut_len / N
drone_sweep_proj = C_x * sweep_dir[0] + C_y * sweep_dir[1]
near_sweep_proj = near_corner_a[0] * sweep_dir[0] + near_corner_a[1] * sweep_dir[1]
gather_sweep_proj = (drone_sweep_proj + near_sweep_proj) / 2
all_waypoints = [None] * N
for rank, (_, original_idx) in enumerate(projections):
strip_center_offset = (rank + 0.5) * strip_width
base_x = near_corner_a[0] + cut_dir[0] * strip_center_offset
base_y = near_corner_a[1] + cut_dir[1] * strip_center_offset
waypoints_list = []
gather_offset = gather_sweep_proj - near_sweep_proj
gx = base_x + sweep_dir[0] * gather_offset
gy = base_y + sweep_dir[1] * gather_offset
waypoints_list.append((gx, gy, altitude))
num_lines = max(1, int(strip_width / line_spacing))
actual_spacing = strip_width / num_lines
first_line_offset = (rank * strip_width) + actual_spacing / 2
entry_x = near_corner_a[0] + cut_dir[0] * first_line_offset
entry_y = near_corner_a[1] + cut_dir[1] * first_line_offset
waypoints_list.append((entry_x, entry_y, altitude))
current_cut_offset = first_line_offset
for line_idx in range(num_lines):
line_near_x = near_corner_a[0] + cut_dir[0] * current_cut_offset
line_near_y = near_corner_a[1] + cut_dir[1] * current_cut_offset
line_far_x = line_near_x + sweep_dir[0] * sweep_len
line_far_y = line_near_y + sweep_dir[1] * sweep_len
if line_idx % 2 == 0:
waypoints_list.append((line_far_x, line_far_y, altitude))
else:
waypoints_list.append((line_near_x, line_near_y, altitude))
if line_idx < num_lines - 1:
next_cut_offset = current_cut_offset + actual_spacing
next_near_x = near_corner_a[0] + cut_dir[0] * next_cut_offset
next_near_y = near_corner_a[1] + cut_dir[1] * next_cut_offset
next_far_x = next_near_x + sweep_dir[0] * sweep_len
next_far_y = next_near_y + sweep_dir[1] * sweep_len
if line_idx % 2 == 0:
waypoints_list.append((next_far_x, next_far_y, altitude))
else:
waypoints_list.append((next_near_x, next_near_y, altitude))
current_cut_offset = next_cut_offset
all_waypoints[original_idx] = waypoints_list
return all_waypoints
# ================================================================================
# 測試 # 測試
# ================================================================================
if __name__ == "__main__": if __name__ == "__main__":
planner = FormationPlanner() planner = FormationPlanner()
drones = [(24.123450, 120.567800, 100.0), (24.123470, 120.567820, 102.0), (24.123440, 120.567810, 98.0)] drones = [
target = (24.123600, 120.568000, 105.0) (24.123450, 120.567800, 0.0),
(24.123470, 120.567820, 0.0),
# Leader-Follower (15m 間距) (24.123440, 120.567810, 0.0),
s1, s2, origin = planner.plan_formation_mission(drones, target, MissionType.LEADER_FOLLOWER, {'follow_distance': 15.0}) (24.123460, 120.567830, 0.0),
print("Leader-Follower (15m):") ]
for i, pos in enumerate(s2): target = (24.12400, 120.56795, 10.0)
print(f" {'Leader' if i == 0 else f'Follower {i}'}: {pos[0]:.6f}, {pos[1]:.6f}")
# M-Formation
wps, origin = planner.plan_formation_mission(drones, target, MissionType.M_FORMATION)
print("M-Formation:")
for i, wp_list in enumerate(wps):
print(f" Drone {i}: {len(wp_list)} waypoints")
# Leader-Follower (Bezier 轉彎)
route = [
(24.12345, 120.56780),
(24.12370, 120.56800),
(24.12390, 120.56820),
(24.12400, 120.56800),
(24.12420, 120.56790),
]
wps_lf, origin_lf = planner.plan_formation_mission(
drones, target, MissionType.LEADER_FOLLOWER,
params={
'route_waypoints': route,
'lateral_offset': 3.0,
'longitudinal_spacing': 5.0,
'turn_margin': 0.35,
'curve_resolution': 8,
'altitude': 10.0
}
)
print(f"\nLeader-Follower (Bezier turns):")
for i, wp_list in enumerate(wps_lf):
print(f" Drone {i}: {len(wp_list)} waypoints")
for j, wp in enumerate(wp_list):
print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f})")
# Grid Sweep
rect = [
(24.1237, 120.5679),
(24.1237, 120.5683),
(24.1240, 120.5683),
(24.1240, 120.5679)
]
wps2, origin2 = planner.plan_formation_mission(
drones, target, MissionType.GRID_SWEEP,
params={'rect_corners': rect, 'line_spacing': 5.0, 'altitude': 10.0}
)
print(f"\nGrid Sweep:")
for i, wp_list in enumerate(wps2):
print(f" Drone {i}: {len(wp_list)} waypoints")

@ -0,0 +1,299 @@
#!/usr/bin/env python3
"""
獨立測試腳本 驗證 MissionExecutor + MavlinkSender SITL 環境下的運作
使用方式:
1. 啟動 SITL
2. 修改下方 CONFIG 區塊
3. python3 test_mission.py
"""
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
import time
from pymavlink import mavutil
from PyQt6.QtWidgets import QApplication
from PyQt6.QtCore import QTimer
from mission_planner import FormationPlanner, MissionType
from command_sender import MavlinkSender
from mission_executor import MissionExecutor, MissionState
# ================================================================================
# CONFIG
# ================================================================================
# 接收用連線 (讀取無人機狀態)
RECV_CONNECTION = "udp:127.0.0.1:14550"
# 發送用連線 (發送 setpoint 指令)
SEND_CONNECTION = "udpout:127.0.0.1:14550"
# 要控制的無人機 sysid 列表
DRONE_SYSIDS = [1]
# 起飛高度 (公尺)
TAKEOFF_ALT = 10.0
# 任務規劃參數
FORMATION_SPACING = 5.0
BASE_ALTITUDE = 10.0
ALTITUDE_DIFF = 2.0
ARRIVAL_RADIUS = 2.0
# 測試模式: "formation" 或 "grid_sweep"
TEST_MODE = "formation"
# Grid Sweep 專用設定
GRID_LINE_SPACING = 5.0
# ================================================================================
class SITLDroneManager:
"""管理 SITL 無人機的連線、起飛前置作業"""
def __init__(self, connection_string, sysids):
self.connection_string = connection_string
self.sysids = sysids
self.mav = None
self.drone_gps = {}
def connect(self):
"""建立 MAVLink 連線並等待心跳"""
print(f"連線到 {self.connection_string} ...")
self.mav = mavutil.mavlink_connection(self.connection_string)
self.mav.wait_heartbeat()
print(f"已收到心跳: sysid={self.mav.target_system}, compid={self.mav.target_component}")
def set_guided_and_arm(self, sysid):
"""切換到 GUIDED 模式並解鎖"""
print(f"\n--- sysid={sysid}: 切換 GUIDED + 解鎖 ---")
# 切換 GUIDED 模式
self.mav.mav.command_long_send(
sysid, 1,
mavutil.mavlink.MAV_CMD_DO_SET_MODE,
0,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
4, # GUIDED = 4
0, 0, 0, 0, 0
)
time.sleep(1)
# 解鎖
self.mav.mav.command_long_send(
sysid, 1,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0, 1, 0, 0, 0, 0, 0, 0
)
time.sleep(1)
print(f" sysid={sysid}: GUIDED + ARMED")
def takeoff(self, sysid, altitude):
"""起飛到指定高度"""
print(f" sysid={sysid}: 起飛到 {altitude}m ...")
self.mav.mav.command_long_send(
sysid, 1,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0, 0, 0, 0, 0, 0, 0, altitude
)
def wait_for_altitude(self, sysid, target_alt, timeout=30):
"""等待無人機到達指定高度"""
print(f" sysid={sysid}: 等待到達 {target_alt}m ...")
start = time.time()
while time.time() - start < timeout:
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if msg and msg.get_srcSystem() == sysid:
alt = msg.relative_alt / 1000.0
if alt >= target_alt * 0.9:
print(f" sysid={sysid}: 已到達 {alt:.1f}m")
return True
print(f" sysid={sysid}: 等待超時!")
return False
def update_gps_once(self):
"""讀取一輪 GPS 資料更新 drone_gps"""
deadline = time.time() + 3
received = set()
while time.time() < deadline and len(received) < len(self.sysids):
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if msg is None:
continue
sid = msg.get_srcSystem()
if sid in self.sysids:
drone_id = f"s0_{sid}"
self.drone_gps[drone_id] = {
'lat': msg.lat / 1e7,
'lon': msg.lon / 1e7,
'alt': msg.relative_alt / 1000.0
}
received.add(sid)
for sid in self.sysids:
drone_id = f"s0_{sid}"
if drone_id in self.drone_gps:
gps = self.drone_gps[drone_id]
print(f" {drone_id}: ({gps['lat']:.6f}, {gps['lon']:.6f}, {gps['alt']:.1f}m)")
else:
print(f" {drone_id}: 尚未收到 GPS")
def start_gps_polling(self, interval_ms=200):
"""啟動定時 GPS 輪詢 (用 QTimer)"""
self._gps_timer = QTimer()
self._gps_timer.timeout.connect(self._poll_gps)
self._gps_timer.start(interval_ms)
def _poll_gps(self):
"""非阻塞方式讀取最新 GPS"""
while True:
msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=False)
if msg is None:
break
sid = msg.get_srcSystem()
if sid in self.sysids:
drone_id = f"s0_{sid}"
self.drone_gps[drone_id] = {
'lat': msg.lat / 1e7,
'lon': msg.lon / 1e7,
'alt': msg.relative_alt / 1000.0
}
def main():
# 建立 Qt 應用 (MissionExecutor 需要 QTimer)
app = QApplication(sys.argv)
# 連線 + 起飛前置作業
manager = SITLDroneManager(RECV_CONNECTION, DRONE_SYSIDS)
manager.connect()
for sysid in DRONE_SYSIDS:
manager.set_guided_and_arm(sysid)
manager.takeoff(sysid, TAKEOFF_ALT)
# 等待所有無人機到達起飛高度
for sysid in DRONE_SYSIDS:
manager.wait_for_altitude(sysid, TAKEOFF_ALT)
time.sleep(2)
# 讀取當前 GPS 位置
print("\n讀取當前 GPS 位置 ...")
manager.update_gps_once()
drone_ids = [f"s0_{sid}" for sid in DRONE_SYSIDS]
drone_gps_positions = []
for drone_id in drone_ids:
gps = manager.drone_gps.get(drone_id)
if gps is None:
print(f"錯誤: 讀不到 {drone_id} 的 GPS")
return
drone_gps_positions.append((gps['lat'], gps['lon'], gps['alt']))
# 規劃任務
print(f"\n規劃任務 (模式: {TEST_MODE}) ...")
planner = FormationPlanner(
spacing=FORMATION_SPACING,
base_altitude=BASE_ALTITUDE,
altitude_diff=ALTITUDE_DIFF
)
center_lat = drone_gps_positions[0][0]
center_lon = drone_gps_positions[0][1]
if TEST_MODE == "formation":
target_lat = center_lat + 30.0 / 111000.0
target_lon = center_lon
target_gps = (target_lat, target_lon, BASE_ALTITUDE)
print(f" 目標點: ({target_lat:.6f}, {target_lon:.6f})")
waypoints_per_drone, origin = planner.plan_formation_mission(
drone_gps_positions, target_gps, MissionType.M_FORMATION
)
elif TEST_MODE == "grid_sweep":
# 在無人機前方 30m 處建立 40m x 30m 的矩形
offset_lat = 30.0 / 111000.0
half_w = 20.0 / (111000.0 * 0.9)
half_h = 15.0 / 111000.0
rect_center_lat = center_lat + offset_lat
rect_center_lon = center_lon
rect_corners = [
(rect_center_lat - half_h, rect_center_lon - half_w),
(rect_center_lat - half_h, rect_center_lon + half_w),
(rect_center_lat + half_h, rect_center_lon + half_w),
(rect_center_lat + half_h, rect_center_lon - half_w),
]
target_gps = (rect_center_lat, rect_center_lon, BASE_ALTITUDE)
print(f" 矩形中心: ({rect_center_lat:.6f}, {rect_center_lon:.6f})")
waypoints_per_drone, origin = planner.plan_formation_mission(
drone_gps_positions, target_gps, MissionType.GRID_SWEEP,
params={
'rect_corners': rect_corners,
'line_spacing': GRID_LINE_SPACING,
'altitude': BASE_ALTITUDE
}
)
else:
print(f"未知測試模式: {TEST_MODE}")
return
planned_waypoints = {
'drone_ids': drone_ids,
'waypoints': waypoints_per_drone
}
# 印出規劃結果
for i, did in enumerate(drone_ids):
wps = waypoints_per_drone[i]
print(f" {did}: {len(wps)} 個航點")
for j, wp in enumerate(wps):
print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f}m)")
# 啟動任務
print("\n啟動任務 ...")
manager.start_gps_polling(interval_ms=200)
sender = MavlinkSender(SEND_CONNECTION)
executor = MissionExecutor(
sender=sender,
drone_gps=manager.drone_gps,
arrival_radius=ARRIVAL_RADIUS,
send_rate_hz=2.0
)
executor.drone_waypoint_reached.connect(
lambda did, idx, total: print(f"\n >> {did} 到達 WP {idx}/{total}")
)
executor.mission_completed.connect(
lambda: (print("\n===== 任務全部完成 ====="), app.quit())
)
# 設定超時自動退出
timeout_timer = QTimer()
timeout_timer.setSingleShot(True)
timeout_timer.timeout.connect(lambda: (
print("\n⚠ 測試超時,強制退出"),
executor.stop(),
app.quit()
))
timeout_timer.start(180_000) # 180 秒超時
executor.start(planned_waypoints)
print("進入事件迴圈 (等待任務完成或 180 秒超時) ...\n")
app.exec()
executor.stop()
sender.close()
print("測試結束")
if __name__ == "__main__":
main()

@ -0,0 +1,482 @@
#!/usr/bin/env python3
"""
任務規劃視覺化驗證工具含動畫模擬
使用方式:
1. GUI 自動觸發: python3 verify_waypoints.py --file /tmp/mission_plan.json
2. 獨立手動執行: python3 verify_waypoints.py
操作:
- 啟動後先顯示靜態航點圖
- 點擊圖下方的按鈕控制動畫
"""
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
import json
import argparse
import math
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.widgets import Button
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
from mission_planner import FormationPlanner, MissionType, CoordinateConverter
# ================================================================================
# 色彩定義
# ================================================================================
COLORS = ['#378ADD', '#1D9E75', '#BA7517', '#D85A30', '#7F77DD', '#D4537E',
'#E24B4A', '#639922', '#00BFFF', '#FF69B4']
# 動畫參數
FRAMES_PER_SEGMENT = 40 # 每段航點之間的畫面數
TRAIL_LENGTH = 60 # 拖尾長度(畫面數)
INTERVAL_MS = 50 # 每幀間隔 (ms)
# ================================================================================
# 靜態繪圖
# ================================================================================
def plot_grid_sweep(ax, data, conv):
"""畫 Grid Sweep 俯視圖"""
if 'rect_corners' in data:
rect_local = [conv.gps_to_local(c[0], c[1], 0)[:2] for c in data['rect_corners']]
xs = [p[0] for p in rect_local] + [rect_local[0][0]]
ys = [p[1] for p in rect_local] + [rect_local[0][1]]
ax.plot(xs, ys, 'k--', linewidth=1.5, label='Task area')
ax.fill(xs, ys, alpha=0.05, color='gray')
_draw_waypoint_paths(ax, data, conv, show_sweep_labels=True)
total_wps = sum(len(wps) for wps in data['waypoints'])
ax.set_title(f'Grid Sweep - {len(data["drone_ids"])} drones, {total_wps} waypoints')
ax.set_xlabel('East (m)')
ax.set_ylabel('North (m)')
ax.set_aspect('equal')
ax.grid(True, alpha=0.3)
def plot_formation(ax, data, conv):
"""畫 M-Formation / Circle / Leader-Follower 俯視圖"""
_draw_waypoint_paths(ax, data, conv, show_sweep_labels=False)
if 'target' in data:
t = data['target']
tx, ty, _ = conv.gps_to_local(t[0], t[1], t[2] if len(t) > 2 else 0)
ax.plot(tx, ty, 'r*', markersize=18, zorder=5)
ax.annotate('Target', (tx, ty + 1), fontsize=9, color='red', ha='center', va='bottom')
if 'route_waypoints' in data:
route_local = [conv.gps_to_local(wp[0], wp[1], 0)[:2] for wp in data['route_waypoints']]
rxs = [p[0] for p in route_local]
rys = [p[1] for p in route_local]
ax.plot(rxs, rys, 'r--', linewidth=1.5, alpha=0.5, label='Route center')
for i, (rx, ry) in enumerate(route_local):
ax.plot(rx, ry, 'ro', markersize=6, alpha=0.5)
ax.annotate(f'R{i+1}', (rx, ry + 0.8), fontsize=7, color='red',
ha='center', va='bottom', alpha=0.7)
mission_type = data.get('mission_type', 'formation')
total_wps = sum(len(wps) for wps in data['waypoints'])
ax.set_title(f'{mission_type} - {len(data["drone_ids"])} drones, {total_wps} waypoints')
ax.set_xlabel('East (m)')
ax.set_ylabel('North (m)')
ax.set_aspect('equal')
ax.grid(True, alpha=0.3)
def _draw_waypoint_paths(ax, data, conv, show_sweep_labels=False):
"""共用的航點路徑繪圖"""
drone_ids = data['drone_ids']
waypoints = data['waypoints']
drones_gps = data.get('drones_gps', [])
for i, pos in enumerate(drones_gps):
x, y, _ = conv.gps_to_local(pos[0], pos[1], pos[2] if len(pos) > 2 else 0)
c = COLORS[i % len(COLORS)]
ax.plot(x, y, 'o', color=c, markersize=10, zorder=5)
ax.annotate(f'{drone_ids[i]}', (x, y + 1), fontsize=8, fontweight='bold',
ha='center', va='bottom', color=c)
for i, wps in enumerate(waypoints):
c = COLORS[i % len(COLORS)]
local_wps = [conv.gps_to_local(wp[0], wp[1], wp[2]) for wp in wps]
xs = [p[0] for p in local_wps]
ys = [p[1] for p in local_wps]
ax.plot(xs, ys, '-', color=c, linewidth=1.5, alpha=0.7)
for j, (x, y, z) in enumerate(local_wps):
if show_sweep_labels:
if j == 0:
ax.plot(x, y, 's', color=c, markersize=8, zorder=4)
ax.annotate('gather', (x, y), fontsize=6, ha='right', va='top')
elif j == 1:
ax.plot(x, y, '^', color=c, markersize=8, zorder=4)
ax.annotate('entry', (x, y), fontsize=6, ha='right', va='top')
else:
ax.plot(x, y, '.', color=c, markersize=4)
else:
marker = 's' if j == 0 else '*'
ax.plot(x, y, marker, color=c, markersize=10, zorder=4)
ax.annotate(f'WP{j}\n({z:.0f}m)', (x, y), fontsize=6, ha='center', va='bottom')
if local_wps:
lx, ly, _ = local_wps[-1]
ax.plot(lx, ly, 'X', color=c, markersize=10, markeredgewidth=2, zorder=4)
# ================================================================================
# 動畫模擬
# ================================================================================
class MissionAnimator:
"""任務動畫控制器"""
def __init__(self, fig, ax, data, conv):
self.fig = fig
self.ax = ax
self.data = data
self.conv = conv
self.is_playing = False
self.anim = None
self.current_frame = 0
drone_ids = data['drone_ids']
waypoints = data['waypoints']
drones_gps = data.get('drones_gps', [])
self.num_drones = len(drone_ids)
# 建立完整航點序列:初始位置 → WP0 → WP1 → ...
self.all_local_wps = []
for i, wps in enumerate(waypoints):
local_wps = [conv.gps_to_local(wp[0], wp[1], wp[2]) for wp in wps]
# 把初始位置插入最前面
if i < len(drones_gps):
pos = drones_gps[i]
start = conv.gps_to_local(pos[0], pos[1], pos[2] if len(pos) > 2 else 0)
local_wps.insert(0, start)
self.all_local_wps.append(local_wps)
# 計算最大航點段數
self.max_segments = max(len(wps) - 1 for wps in self.all_local_wps) if self.all_local_wps else 0
self.total_frames = self.max_segments * FRAMES_PER_SEGMENT
# 預計算位置
self.positions = self._precompute_positions()
# 動畫元素
self.drone_dots = []
self.trail_lines = []
self.trail_data = [[] for _ in range(self.num_drones)]
self.status_text = None
def _precompute_positions(self):
"""預計算所有幀的位置 — 等時間步進"""
positions = []
for frame in range(self.total_frames + 1):
seg_idx = frame // FRAMES_PER_SEGMENT
seg_progress = (frame % FRAMES_PER_SEGMENT) / FRAMES_PER_SEGMENT
frame_positions = []
for drone_idx in range(self.num_drones):
wps = self.all_local_wps[drone_idx]
num_segs = len(wps) - 1
if seg_idx >= num_segs:
frame_positions.append((wps[-1][0], wps[-1][1]))
else:
x0, y0, _ = wps[seg_idx]
x1, y1, _ = wps[seg_idx + 1]
x = x0 + (x1 - x0) * seg_progress
y = y0 + (y1 - y0) * seg_progress
frame_positions.append((x, y))
positions.append(frame_positions)
return positions
def setup(self):
"""建立動畫元素和按鈕"""
for i in range(self.num_drones):
c = COLORS[i % len(COLORS)]
dot, = self.ax.plot([], [], 'o', color=c, markersize=12,
markeredgecolor='white', markeredgewidth=1.5, zorder=10)
self.drone_dots.append(dot)
trail, = self.ax.plot([], [], '-', color=c, linewidth=2.5, alpha=0.4, zorder=9)
self.trail_lines.append(trail)
self.status_text = self.ax.text(
0.02, 0.98, 'Ready',
transform=self.ax.transAxes, fontsize=10,
verticalalignment='top',
bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8)
)
# 按鈕放在右上角圖內,避免擋到軸標籤
ax_play = self.fig.add_axes([0.72, 0.91, 0.08, 0.035])
self.btn_play = Button(ax_play, 'Play', color='#4CAF50', hovercolor='#66BB6A')
self.btn_play.label.set_color('white')
self.btn_play.label.set_fontweight('bold')
self.btn_play.on_clicked(self._on_play)
ax_pause = self.fig.add_axes([0.81, 0.91, 0.08, 0.035])
self.btn_pause = Button(ax_pause, 'Pause', color='#FF9800', hovercolor='#FFB74D')
self.btn_pause.label.set_color('white')
self.btn_pause.label.set_fontweight('bold')
self.btn_pause.on_clicked(self._on_pause)
ax_reset = self.fig.add_axes([0.90, 0.91, 0.08, 0.035])
self.btn_reset = Button(ax_reset, 'Reset', color='#F44336', hovercolor='#EF5350')
self.btn_reset.label.set_color('white')
self.btn_reset.label.set_fontweight('bold')
self.btn_reset.on_clicked(self._on_reset)
def _on_play(self, event):
if self.is_playing:
return
if self.anim is None:
self.anim = animation.FuncAnimation(
self.fig, self._update_frame,
frames=range(self.current_frame, self.total_frames + 1),
interval=INTERVAL_MS,
blit=False,
repeat=False
)
else:
self.anim.resume()
self.is_playing = True
self.fig.canvas.draw_idle()
def _on_pause(self, event):
if not self.is_playing or self.anim is None:
return
self.anim.pause()
self.is_playing = False
self.status_text.set_text(f'Paused (frame {self.current_frame}/{self.total_frames})')
self.fig.canvas.draw_idle()
def _on_reset(self, event):
if self.anim is not None:
self.anim.event_source.stop()
self.anim = None
self.is_playing = False
self.current_frame = 0
self.trail_data = [[] for _ in range(self.num_drones)]
for dot in self.drone_dots:
dot.set_data([], [])
for trail in self.trail_lines:
trail.set_data([], [])
self.status_text.set_text('Ready')
self.fig.canvas.draw_idle()
def _update_frame(self, frame):
self.current_frame = frame
if frame >= len(self.positions):
self.is_playing = False
self.status_text.set_text('Done')
return self.drone_dots + self.trail_lines + [self.status_text]
# seg_idx - 1 是因為第 0 段是 start→WP0
seg_idx = frame // FRAMES_PER_SEGMENT
progress = (frame % FRAMES_PER_SEGMENT) / FRAMES_PER_SEGMENT
# 顯示時把第 0 段標為 "Start -> WP0"
if seg_idx == 0:
label = f'Start -> WP0 Progress {progress:.0%}'
else:
label = f'WP{seg_idx-1} -> WP{seg_idx} Progress {progress:.0%}'
self.status_text.set_text(
f'{label} Frame {frame}/{self.total_frames}'
)
for i in range(self.num_drones):
x, y = self.positions[frame][i]
self.drone_dots[i].set_data([x], [y])
self.trail_data[i].append((x, y))
if len(self.trail_data[i]) > TRAIL_LENGTH:
self.trail_data[i] = self.trail_data[i][-TRAIL_LENGTH:]
trail_x = [p[0] for p in self.trail_data[i]]
trail_y = [p[1] for p in self.trail_data[i]]
self.trail_lines[i].set_data(trail_x, trail_y)
return self.drone_dots + self.trail_lines + [self.status_text]
# ================================================================================
# 主流程
# ================================================================================
def visualize_from_file(filepath):
"""從 JSON 檔案讀取並視覺化"""
with open(filepath, 'r') as f:
data = json.load(f)
origin = data['origin']
conv = CoordinateConverter(origin[0], origin[1], 0)
mission_type = data.get('mission_type', 'formation')
is_sweep = mission_type == 'grid_sweep'
fig, ax = plt.subplots(1, 1, figsize=(10, 8))
fig.suptitle(f'Mission Verification - {mission_type}', fontsize=13, fontweight='bold')
if is_sweep:
plot_grid_sweep(ax, data, conv)
else:
plot_formation(ax, data, conv)
_print_summary(data)
animator = MissionAnimator(fig, ax, data, conv)
animator.setup()
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()
def visualize_standalone():
"""獨立執行:使用內建測試資料"""
drones = [
(24.123450, 120.567800, 0.0),
(24.123470, 120.567820, 0.0),
(24.123440, 120.567810, 0.0),
(24.123460, 120.567830, 0.0),
]
rect_corners = [
(24.12380, 120.56775),
(24.12380, 120.56810),
(24.12420, 120.56810),
(24.12420, 120.56775),
]
target = (24.12400, 120.56795, 10.0)
planner = FormationPlanner(spacing=5.0, base_altitude=10.0, altitude_diff=2.0)
fig = plt.figure(figsize=(16, 10))
fig.suptitle('Mission Planner Verification (standalone)', fontsize=14, fontweight='bold')
# M-Formation
wps_m, origin_m = planner.plan_formation_mission(drones, target, MissionType.M_FORMATION)
conv_m = CoordinateConverter(origin_m[0], origin_m[1], 0)
data_m = {
'drone_ids': [f's0_{i + 1}' for i in range(len(drones))],
'waypoints': wps_m,
'drones_gps': drones,
'target': target,
'mission_type': 'M_FORMATION'
}
ax1 = fig.add_subplot(2, 2, 1)
plot_formation(ax1, data_m, conv_m)
# Grid Sweep 5m
target_gs = (sum(c[0] for c in rect_corners) / 4,
sum(c[1] for c in rect_corners) / 4, 10.0)
wps_g, origin_g = planner.plan_formation_mission(
drones, target_gs, MissionType.GRID_SWEEP,
params={'rect_corners': rect_corners, 'line_spacing': 5.0, 'altitude': 10.0}
)
conv_g = CoordinateConverter(origin_g[0], origin_g[1], 0)
data_g = {
'drone_ids': [f's0_{i + 1}' for i in range(len(drones))],
'waypoints': wps_g,
'drones_gps': drones,
'rect_corners': rect_corners,
'mission_type': 'grid_sweep'
}
ax2 = fig.add_subplot(2, 2, 2)
plot_grid_sweep(ax2, data_g, conv_g)
# Leader-Follower
route = [
(24.12360, 120.56780),
(24.12380, 120.56800),
(24.12400, 120.56820),
(24.12410, 120.56800),
(24.12420, 120.56790),
]
wps_lf, origin_lf = planner.plan_formation_mission(
drones, target, MissionType.LEADER_FOLLOWER,
params={'route_waypoints': route, 'lateral_offset': 3.0,
'longitudinal_spacing': 5.0, 'altitude': 10.0}
)
conv_lf = CoordinateConverter(origin_lf[0], origin_lf[1], 0)
data_lf = {
'drone_ids': [f's0_{i + 1}' for i in range(len(drones))],
'waypoints': wps_lf,
'drones_gps': drones,
'route_waypoints': route,
'target': target,
'mission_type': 'LEADER_FOLLOWER'
}
ax3 = fig.add_subplot(2, 2, 3)
plot_formation(ax3, data_lf, conv_lf)
# 3D
ax4 = fig.add_subplot(2, 2, 4, projection='3d')
_plot_3d(ax4, data_g, conv_g)
plt.tight_layout()
plt.show()
def _plot_3d(ax, data, conv):
"""3D 視角"""
if 'rect_corners' in data:
rect_local = [conv.gps_to_local(c[0], c[1], 0)[:2] for c in data['rect_corners']]
xs = [p[0] for p in rect_local] + [rect_local[0][0]]
ys = [p[1] for p in rect_local] + [rect_local[0][1]]
ax.plot(xs, ys, [0] * len(xs), 'k--', linewidth=1, alpha=0.4)
for i, wps in enumerate(data['waypoints']):
c = COLORS[i % len(COLORS)]
local_wps = [conv.gps_to_local(wp[0], wp[1], wp[2]) for wp in wps]
xs = [p[0] for p in local_wps]
ys = [p[1] for p in local_wps]
zs = [p[2] for p in local_wps]
ax.plot(xs, ys, zs, '-', color=c, linewidth=1.5)
if local_wps:
ax.scatter(xs[0], ys[0], zs[0], color=c, s=50, marker='s')
ax.scatter(xs[-1], ys[-1], zs[-1], color=c, s=50, marker='X')
ax.set_title('3D view')
ax.set_xlabel('East (m)')
ax.set_ylabel('North (m)')
ax.set_zlabel('Alt (m)')
def _print_summary(data):
"""終端印出摘要"""
drone_ids = data['drone_ids']
waypoints = data['waypoints']
mission_type = data.get('mission_type', 'unknown')
print(f"\n{'=' * 50}")
print(f"Mission: {mission_type}")
print(f"Drones: {len(drone_ids)}")
print(f"{'=' * 50}")
for i, did in enumerate(drone_ids):
wps = waypoints[i]
print(f" {did}: {len(wps)} waypoints")
for j, wp in enumerate(wps):
print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f}m)")
print(f"{'=' * 50}\n")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Mission waypoint visualizer')
parser.add_argument('--file', '-f', type=str, default=None,
help='JSON file from GUI (auto-generated)')
args = parser.parse_args()
if args.file:
visualize_from_file(args.file)
else:
visualize_standalone()
Loading…
Cancel
Save