diff --git a/src/GUI/command_sender.py b/src/GUI/command_sender.py
new file mode 100644
index 0000000..329a0f0
--- /dev/null
+++ b/src/GUI/command_sender.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python3
+"""
+指令發送模組
+負責將目標位置轉成具體的通訊指令發送到飛控
+
+現階段: MavlinkSender (直接 pymavlink 發送)
+未來: 替換為 ROS2Sender (發到 ROS2 topic,由 fc_network_adapter 轉發)
+"""
+from abc import ABC, abstractmethod
+from pymavlink import mavutil
+
+
+class CommandSender(ABC):
+ """指令發送抽象介面"""
+
+ @abstractmethod
+ def send_position_global(self, sysid, lat, lon, alt):
+ """
+ 發送全球座標位置指令
+
+ Args:
+ sysid: 目標無人機的 MAVLink system ID
+ lat: 緯度 (度)
+ lon: 經度 (度)
+ alt: 高度 (公尺)
+ """
+ pass
+
+ @abstractmethod
+ def close(self):
+ """關閉連線"""
+ pass
+
+
+class MavlinkSender(CommandSender):
+ """
+ MAVLink 直接發送 (驗證階段用)
+ 透過 SET_POSITION_TARGET_GLOBAL_INT 發送目標位置
+
+ 使用方式:
+ sender = MavlinkSender("udpout:127.0.0.1:14550")
+ sender.send_position_global(sysid=1, lat=24.123, lon=120.567, alt=10.0)
+ """
+
+ # type_mask: 只使用位置 (lat, lon, alt)
+ # 忽略速度 (bit 3,4,5)、加速度 (bit 6,7,8)、yaw (bit 10)、yaw_rate (bit 11)
+ TYPE_MASK_POSITION_ONLY = (
+ 0b0000_1101_1111_1000 # = 0x0DF8
+ )
+
+ def __init__(self, connection_string="udpout:127.0.0.1:14550"):
+ """
+ Args:
+ connection_string: MAVLink 連線字串
+ SITL 範例: "udpout:127.0.0.1:14550"
+ """
+ self.connection_string = connection_string
+ self.mav = mavutil.mavlink_connection(connection_string)
+ print(f"MavlinkSender 已建立連線: {connection_string}")
+
+ def send_position_global(self, sysid, lat, lon, alt):
+ """
+ 發送 SET_POSITION_TARGET_GLOBAL_INT
+
+ 注意:
+ - coordinate_frame 使用 MAV_FRAME_GLOBAL_RELATIVE_ALT_INT
+ 高度是相對起飛點的高度 (公尺)
+ - 如果 FormationPlanner 產出的 alt 是 AMSL 絕對高度,
+ 需要在外部先減去起飛點高度再傳入
+ """
+ self.mav.mav.set_position_target_global_int_send(
+ 0, # time_boot_ms (不使用)
+ sysid, # target_system
+ 1, # target_component (autopilot)
+ mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
+ self.TYPE_MASK_POSITION_ONLY,
+ int(lat * 1e7), # lat_int
+ int(lon * 1e7), # lon_int
+ float(alt), # alt
+ 0, 0, 0, # vx, vy, vz (忽略)
+ 0, 0, 0, # afx, afy, afz (忽略)
+ 0, 0 # yaw, yaw_rate (忽略)
+ )
+
+ def close(self):
+ """關閉 MAVLink 連線"""
+ if self.mav:
+ self.mav.close()
+ self.mav = None
+ print("MavlinkSender 已關閉")
\ No newline at end of file
diff --git a/src/GUI/gui.py b/src/GUI/gui.py
index 015167f..2dd8df5 100644
--- a/src/GUI/gui.py
+++ b/src/GUI/gui.py
@@ -7,6 +7,8 @@ from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout
from PyQt6.QtCore import Qt, QTimer
import sys
import asyncio
+import json
+import subprocess
# 導入分離的類別
from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkReceiver
@@ -16,9 +18,11 @@ from comm_panel import CommPanel
from overview_table import OverviewTable
# ================================================================================
-# 【新增】導入任務規劃器
+# 導入任務規劃器、執行器、發送器
# ================================================================================
-from mission_planner import FormationPlanner
+from mission_planner import FormationPlanner, MissionType
+from command_sender import MavlinkSender
+from mission_executor import MissionExecutor
# ================================================================================
class ControlStationUI(QMainWindow):
@@ -74,7 +78,7 @@ class ControlStationUI(QMainWindow):
self.serial_connections = []
# ================================================================================
- # 【新增】初始化任務規劃器
+ # 初始化任務規劃器
# ================================================================================
self.mission_planner = FormationPlanner(
spacing=5.0, # 5 公尺間距
@@ -83,11 +87,31 @@ class ControlStationUI(QMainWindow):
)
self.planned_waypoints = None # 儲存規劃結果
# ================================================================================
+
+ # ================================================================================
+ # 當前任務模式 (由地圖右上角下拉選單控制)
+ # ================================================================================
+ self.current_mission_mode = 'M_FORMATION'
+ # ================================================================================
+
+ # ================================================================================
+ # 初始化指令發送器與任務執行器
+ # ================================================================================
+ self.command_sender = MavlinkSender("udpout:127.0.0.1:14550") # 驗證階段寫死
+
+ self.mission_executor = MissionExecutor(
+ sender=self.command_sender,
+ drone_gps=self.monitor.drone_gps, # 直接傳引用,即時讀取
+ arrival_radius=2.0,
+ send_rate_hz=2.0
+ )
+ self.mission_executor.drone_waypoint_reached.connect(self.on_drone_waypoint_reached)
+ self.mission_executor.mission_completed.connect(self.on_mission_completed)
+ # ================================================================================
self.init_ui()
def init_ui(self):
- # 只呼叫一次
main_splitter = QSplitter(Qt.Orientation.Horizontal)
# 左側 TabWidget
@@ -100,7 +124,6 @@ class ControlStationUI(QMainWindow):
self.drone_panel_layout.setSpacing(0)
self.drone_panel_layout.setContentsMargins(10, 10, 10, 10)
- # Wrap drone panel in scroll area
scroll = QScrollArea()
scroll.setWidget(self.drone_panel_container)
scroll.setWidgetResizable(True)
@@ -131,40 +154,27 @@ class ControlStationUI(QMainWindow):
right_layout.setContentsMargins(10, 10, 10, 10)
right_layout.setSpacing(10)
- # ========== 批次控制區域(直接使用 layout) ==========
+ # ========== 批次控制區域 ==========
batch_control_layout = QHBoxLayout()
- # 添加批次操作標題
batch_title = QLabel("批次操作")
batch_title.setStyleSheet("""
- color: #DDD;
- font-size: 16px;
- font-weight: bold;
- padding: 5px;
- background-color: #333;
- border-radius: 4px;
+ color: #DDD; font-size: 16px; font-weight: bold;
+ padding: 5px; background-color: #333; border-radius: 4px;
""")
batch_control_layout.addWidget(batch_title)
- # 第一行:全選按鈕
first_row = QHBoxLayout()
select_all_btn = QPushButton("全選")
select_all_btn.clicked.connect(self.handle_select_all)
select_all_btn.setStyleSheet("""
- QPushButton {
- background-color: #444;
- color: #DDD;
- border: none;
- padding: 8px 12px;
- border-radius: 4px;
- min-width: 80px;
- }
+ QPushButton { background-color: #444; color: #DDD; border: none;
+ padding: 8px 12px; border-radius: 4px; min-width: 80px; }
QPushButton:hover { background-color: #555; }
""")
first_row.addWidget(select_all_btn)
first_row.addStretch()
- # 第二行:模式切換
mode_layout = QHBoxLayout()
mode_label = QLabel("模式:")
mode_label.setStyleSheet("color: #DDD; min-width: 40px;")
@@ -188,14 +198,8 @@ class ControlStationUI(QMainWindow):
batch_mode_btn = QPushButton("切換")
batch_mode_btn.clicked.connect(self.handle_batch_mode_change)
batch_mode_btn.setStyleSheet("""
- QPushButton {
- background-color: #444;
- color: #DDD;
- border: none;
- padding: 8px 12px;
- border-radius: 4px;
- min-width: 80px;
- }
+ QPushButton { background-color: #444; color: #DDD; border: none;
+ padding: 8px 12px; border-radius: 4px; min-width: 80px; }
QPushButton:hover { background-color: #555; }
""")
mode_layout.addWidget(mode_label)
@@ -203,50 +207,30 @@ class ControlStationUI(QMainWindow):
mode_layout.addWidget(batch_mode_btn)
mode_layout.addStretch()
- # 第三行:解鎖按鈕
third_row = QHBoxLayout()
arm_all_btn = QPushButton("解鎖")
arm_all_btn.clicked.connect(self.handle_arm_selected)
arm_all_btn.setStyleSheet("""
- QPushButton {
- background-color: #444;
- color: #DDD;
- border: none;
- padding: 8px 12px;
- border-radius: 4px;
- min-width: 80px;
- }
+ QPushButton { background-color: #444; color: #DDD; border: none;
+ padding: 8px 12px; border-radius: 4px; min-width: 80px; }
QPushButton:hover { background-color: #555; }
""")
third_row.addWidget(arm_all_btn)
third_row.addStretch()
- # 第四行:高度輸入和起飛按鈕
fourth_row = QHBoxLayout()
-
self.z_input = QLineEdit()
self.z_input.setFixedWidth(60)
self.z_input.setStyleSheet("""
- QLineEdit {
- background-color: #333;
- color: #DDD;
- border: 1px solid #555;
- border-radius: 4px;
- padding: 3px;
- }
+ QLineEdit { background-color: #333; color: #DDD;
+ border: 1px solid #555; border-radius: 4px; padding: 3px; }
""")
takeoff_all_btn = QPushButton("起飛")
takeoff_all_btn.clicked.connect(self.handle_takeoff_selected)
takeoff_all_btn.setStyleSheet("""
- QPushButton {
- background-color: #444;
- color: #DDD;
- border: none;
- padding: 8px 12px;
- border-radius: 4px;
- min-width: 80px;
- }
+ QPushButton { background-color: #444; color: #DDD; border: none;
+ padding: 8px 12px; border-radius: 4px; min-width: 80px; }
QPushButton:hover { background-color: #555; }
""")
@@ -255,13 +239,11 @@ class ControlStationUI(QMainWindow):
fourth_row.addWidget(takeoff_all_btn)
fourth_row.addStretch()
- # 組裝批次控制 layout
batch_control_layout.addLayout(first_row)
batch_control_layout.addLayout(mode_layout)
batch_control_layout.addLayout(third_row)
batch_control_layout.addLayout(fourth_row)
- # 將批次控制 layout 添加到右側容器
right_layout.addLayout(batch_control_layout)
# 添加地圖
@@ -271,8 +253,16 @@ class ControlStationUI(QMainWindow):
self.drone_map.get_clear_all_drone_selection_signal().connect(self.handle_clear_all_drone_selection)
self.drone_map.get_toggle_select_all_drones_signal().connect(self.handle_toggle_select_all_drones)
+ # ================================================================================
+ # 連接任務控制 + 矩形選取 + 任務模式切換 + 路徑確認信號
+ # ================================================================================
+ self.drone_map.get_start_mission_signal().connect(self.handle_start_mission)
+ self.drone_map.get_pause_mission_signal().connect(self.handle_pause_mission)
+ self.drone_map.get_rectangle_selected_signal().connect(self.handle_rectangle_selected)
+ self.drone_map.get_mission_mode_changed_signal().connect(self.on_mission_mode_changed)
+ self.drone_map.get_route_confirmed_signal().connect(self.handle_route_confirmed)
+ # ================================================================================
- # Add widgets to splitter
main_splitter.addWidget(self.left_tab)
main_splitter.addWidget(right_container)
main_splitter.setSizes([400, 1000])
@@ -280,70 +270,41 @@ class ControlStationUI(QMainWindow):
self.setCentralWidget(main_splitter)
+ # ================================================================================
+ # 連線管理
+ # ================================================================================
+
def handle_udp_connection_added(self, ip, port):
- """处理 UDP 连接添加信号"""
- # 创建新连接
- new_conn = {
- 'name': f'UDP {len(self.udp_connections) + 1}',
- 'ip': ip,
- 'port': port,
- 'enabled': True
- }
-
- # 启动接收器
+ new_conn = {'name': f'UDP {len(self.udp_connections) + 1}', 'ip': ip, 'port': port, 'enabled': True}
receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name'], self.monitor)
receiver.start()
self.udp_receivers.append(receiver)
new_conn['receiver'] = receiver
-
self.udp_connections.append(new_conn)
-
- # 添加到 UI
self.comm_panel.add_udp_panel(new_conn)
-
- self.statusBar().showMessage(f"已添加 UDP 连接: {ip}:{port}", 3000)
- print(f"UDP MAVLink receiver added and started on {ip}:{port}")
+ self.statusBar().showMessage(f"已添加 UDP 連接: {ip}:{port}", 3000)
def handle_ws_connection_added(self, url):
- """处理 WebSocket 连接添加信号"""
- # 创建新连接
- new_conn = {
- 'name': f'WS {len(self.ws_connections) + 1}',
- 'url': url,
- 'enabled': True
- }
-
- # 启动接收器
+ new_conn = {'name': f'WS {len(self.ws_connections) + 1}', 'url': url, 'enabled': True}
receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name'], self.monitor)
receiver.start()
self.monitor.ws_receivers.append(receiver)
new_conn['receiver'] = receiver
-
self.ws_connections.append(new_conn)
-
- # 添加到 UI
self.comm_panel.add_ws_panel(new_conn)
-
- self.statusBar().showMessage(f"已添加 WebSocket 连接: {url}", 3000)
- print(f"WebSocket receiver added and started: {url}")
+ self.statusBar().showMessage(f"已添加 WebSocket 連接: {url}", 3000)
def create_drone_panel(self, drone_id):
- """創建無人機面板"""
panel = DronePanel(drone_id)
-
- # 連接信號
panel.mode_change_requested.connect(self.handle_mode_change)
panel.arm_requested.connect(self.handle_arm)
panel.takeoff_requested.connect(self.handle_takeoff)
panel.setpoint_requested.connect(self.handle_single_setpoint)
panel.selection_changed.connect(self.handle_drone_selection)
-
return panel
def toggle_ws_connection(self, conn, btn, status_label):
- """切換 WebSocket 連接狀態"""
if conn.get('enabled', False):
- # 停止連接
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
conn['enabled'] = False
@@ -352,7 +313,6 @@ class ControlStationUI(QMainWindow):
status_label.setToolTip("已停止")
self.statusBar().showMessage(f"已停止 WebSocket 連接: {conn['url']}", 3000)
else:
- # 啟動連接
receiver = WebSocketMavlinkReceiver(conn['url'], self.monitor.signals, conn['name'])
receiver.start()
self.monitor.ws_receivers.append(receiver)
@@ -364,30 +324,19 @@ class ControlStationUI(QMainWindow):
self.statusBar().showMessage(f"已啟動 WebSocket 連接: {conn['url']}", 3000)
def remove_ws_connection(self, conn, panel):
- """移除 WebSocket 连接"""
- # 停止接收器
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
if conn['receiver'] in self.monitor.ws_receivers:
self.monitor.ws_receivers.remove(conn['receiver'])
-
- # 从列表移除
if conn in self.ws_connections:
self.ws_connections.remove(conn)
-
- # 从 comm_panel 列表移除
self.comm_panel.remove_ws_connection_from_list(conn)
-
- # 从 UI 移除
panel.setParent(None)
panel.deleteLater()
-
- self.statusBar().showMessage(f"已移除 WebSocket 连接: {conn['url']}", 3000)
+ self.statusBar().showMessage(f"已移除 WebSocket 連接: {conn['url']}", 3000)
def toggle_udp_connection(self, conn, btn, status_label):
- """切換 UDP 連接狀態"""
if conn.get('enabled', False):
- # 停止連接
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
conn['enabled'] = False
@@ -396,7 +345,6 @@ class ControlStationUI(QMainWindow):
status_label.setToolTip("已停止")
self.statusBar().showMessage(f"已停止 UDP 連接: {conn['ip']}:{conn['port']}", 3000)
else:
- # 啟動連接
receiver = UDPMavlinkReceiver(conn['ip'], conn['port'], self.monitor.signals, conn['name'])
receiver.start()
self.udp_receivers.append(receiver)
@@ -408,59 +356,34 @@ class ControlStationUI(QMainWindow):
self.statusBar().showMessage(f"已啟動 UDP 連接: {conn['ip']}:{conn['port']}", 3000)
def remove_udp_connection(self, conn, panel):
- """移除 UDP 连接"""
- # 停止接收器
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
if conn['receiver'] in self.udp_receivers:
self.udp_receivers.remove(conn['receiver'])
-
- # 从列表移除
if conn in self.udp_connections:
self.udp_connections.remove(conn)
-
- # 从 comm_panel 列表移除
self.comm_panel.remove_udp_connection_from_list(conn)
-
- # 从 UI 移除
panel.setParent(None)
panel.deleteLater()
-
- self.statusBar().showMessage(f"已移除 UDP 连接: {conn['ip']}:{conn['port']}", 3000)
+ self.statusBar().showMessage(f"已移除 UDP 連接: {conn['ip']}:{conn['port']}", 3000)
def handle_serial_connection_added(self, port, baudrate):
- """處理添加 Serial 連接"""
- conn = {
- 'name': f'Serial',
- 'port': port,
- 'baudrate': baudrate,
- 'enabled': False,
- 'receiver': None
- }
-
- # 添加到列表
+ conn = {'name': 'Serial', 'port': port, 'baudrate': baudrate, 'enabled': False, 'receiver': None}
self.serial_connections.append(conn)
-
- # 在 UI 中顯示
- panel = self.comm_panel.add_serial_panel(conn)
-
- self.statusBar().showMessage(f"已添加 Serial 连接: {port} @ {baudrate}", 3000)
+ self.comm_panel.add_serial_panel(conn)
+ self.statusBar().showMessage(f"已添加 Serial 連接: {port} @ {baudrate}", 3000)
def toggle_serial_connection(self, conn, btn, status_label):
- """切換 Serial 連接狀態"""
if conn.get('enabled', False):
- # 停止連接
if conn.get('receiver'):
conn['receiver'].stop()
conn['receiver'] = None
-
conn['enabled'] = False
btn.setText("啟動")
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
self.statusBar().showMessage(f"已停止 Serial 連接: {conn['port']}", 3000)
else:
- # 啟動連接
try:
receiver = self.monitor.start_serial_connection(conn['port'], conn['baudrate'])
conn['receiver'] = receiver
@@ -473,35 +396,27 @@ class ControlStationUI(QMainWindow):
self.statusBar().showMessage(f"啟動 Serial 連接失敗: {str(e)}", 5000)
def remove_serial_connection(self, conn, panel):
- """移除 Serial 連接"""
- # 停止連接
if conn.get('enabled', False) and conn.get('receiver'):
conn['receiver'].stop()
-
- # 从列表移除
if conn in self.serial_connections:
self.serial_connections.remove(conn)
-
- # 从 comm_panel 列表移除
self.comm_panel.remove_serial_connection_from_list(conn)
-
- # 从 UI 移除
panel.setParent(None)
panel.deleteLater()
-
- self.statusBar().showMessage(f"已移除 Serial 连接: {conn['port']}", 3000)
+ self.statusBar().showMessage(f"已移除 Serial 連接: {conn['port']}", 3000)
def create_socket_group_panel(self, socket_id):
- """創建 socket 分組面板"""
color = self.socket_colors.get(socket_id, self.socket_colors['default'])
- # 如果已知socket類型,傳遞給panel
socket_type = self.socket_types.get(socket_id, None)
panel = SocketGroupPanel(socket_id, color, socket_type)
panel.group_selection_changed.connect(self.handle_group_selection)
return panel
+ # ================================================================================
+ # 無人機操作
+ # ================================================================================
+
def handle_mode_change(self, drone_id):
- """處理單個無人機的模式切換"""
mode = self.mode_combo.currentText()
loop = asyncio.get_event_loop()
future = self.monitor.set_mode(drone_id, mode)
@@ -509,22 +424,20 @@ class ControlStationUI(QMainWindow):
def handle_arm(self, drone_id):
loop = asyncio.get_event_loop()
- arm_state = not self.monitor.get_arm_state(drone_id) # Toggle arm state
+ arm_state = not self.monitor.get_arm_state(drone_id)
future = self.monitor.arm_drone(drone_id, arm_state)
loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}"))
def handle_takeoff(self, drone_id):
loop = asyncio.get_event_loop()
- future = self.monitor.takeoff_drone(drone_id, 10.0) # 10 meters default
+ future = self.monitor.takeoff_drone(drone_id, 10.0)
loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}"))
def handle_setpoint_selected(self):
- """處理發送 setpoint 命令"""
try:
x = float(self.x_input.text() or '0')
y = float(self.y_input.text() or '0')
z = float(self.z_input.text() or '0')
-
for drone_id in self.monitor.selected_drones:
if self.monitor.send_setpoint(drone_id, x, y, z):
self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000)
@@ -534,12 +447,10 @@ class ControlStationUI(QMainWindow):
self.statusBar().showMessage("座標格式錯誤", 3000)
def handle_single_setpoint(self, drone_id):
- """處理單個無人機的 setpoint 命令"""
try:
x = float(self.x_input.text() or '0')
y = float(self.y_input.text() or '0')
z = float(self.z_input.text() or '0')
-
if self.monitor.send_setpoint(drone_id, x, y, z):
self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000)
else:
@@ -557,105 +468,85 @@ class ControlStationUI(QMainWindow):
except Exception as e:
self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000)
+ def handle_arm_selected(self):
+ loop = asyncio.get_event_loop()
+ for drone_id in self.monitor.selected_drones:
+ future = self.monitor.arm_drone(drone_id, True)
+ loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}"))
+
+ def handle_takeoff_selected(self):
+ loop = asyncio.get_event_loop()
+ for drone_id in self.monitor.selected_drones:
+ future = self.monitor.takeoff_drone(drone_id, 10.0)
+ loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}"))
+
+ def handle_batch_mode_change(self):
+ mode = self.mode_combo.currentText()
+ loop = asyncio.get_event_loop()
+ for drone_id in self.monitor.selected_drones:
+ future = self.monitor.set_mode(drone_id, mode)
+ loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}"))
+
+ # ================================================================================
+ # UI 更新
+ # ================================================================================
+
def update_ui(self, msg_type, drone_id, data):
- # 優先處理 connection_type,即使 drone 還不存在
if msg_type == 'connection_type':
- # 獲取連接類型和socket ID
conn_type = data.get('type', 'Unknown')
- # 從 drone_id 提取 socket_id (格式: s{socket}_{sys})
parts = drone_id.split('_')
if len(parts) >= 2 and parts[0].startswith('s'):
- socket_id = parts[0][1:] # 移除 's' 前綴
- # 只在第一次收到時更新
+ socket_id = parts[0][1:]
if socket_id not in self.socket_types:
self.socket_types[socket_id] = conn_type
- # 如果 socket group 已存在,更新標題
if socket_id in self.socket_groups:
self.socket_groups[socket_id].set_socket_type(conn_type)
return
- # 檢查是否為新無人機,若是則加入無人機面板
if drone_id not in self.drones:
self.add_drone(drone_id)
return
- # 確認無人機面板存在
if not (panel := self.drones.get(drone_id)):
return
- # 更新特定欄位並在總覽頁面更新
if msg_type == 'state':
mode = data.get('mode', 'UNKNOWN')
armed = data.get('armed', None)
mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55'
if armed is True:
- arm_text = "ARMED"
- arm_color = '#55FF55'
+ arm_text, arm_color = "ARMED", '#55FF55'
elif armed is False:
- arm_text = "DISARMED"
- arm_color = '#FF5555'
+ arm_text, arm_color = "DISARMED", '#FF5555'
else:
- arm_text = "--"
- arm_color = '#AAAAAA' # 未知狀態
-
- # 更新無人機面板欄位
+ arm_text, arm_color = "--", '#AAAAAA'
self.update_field(panel, drone_id, 'mode', mode, mode_color)
self.update_field(panel, drone_id, 'armed', arm_text, arm_color)
-
- # 更新總覽頁面欄位
self.update_overview_table(drone_id, 'mode', mode)
self.update_overview_table(drone_id, 'armed', arm_text)
elif msg_type == 'battery':
voltage = data.get('voltage', 16)
-
- # 判斷電池節數
cells = round(voltage / 3.95)
-
- # 計算電量百分比
- if cells == 0:
- percentage = 0
- else:
- percentage = (voltage / cells - 3.7) / 0.5 * 100
-
- # 根據百分比設置顏色
- if percentage < 20:
- voltage_color = '#FF6464' # 紅色 (低電量)
- elif percentage < 50:
- voltage_color = '#FFA500' # 橘色 (中低電量)
- else:
- voltage_color = '#FFFFFF' # 白色 (正常電量)
-
+ percentage = (voltage / cells - 3.7) / 0.5 * 100 if cells > 0 else 0
+ if percentage < 20: voltage_color = '#FF6464'
+ elif percentage < 50: voltage_color = '#FFA500'
+ else: voltage_color = '#FFFFFF'
percentage = data.get('percentage', percentage)
-
- # 顯示百分比與電壓(分開欄位)
- pct_text = f"{percentage:.0f}%"
- vol = f"{voltage:.2f}V"
- cells_text = f"{cells}S"
-
- self.update_field(panel, drone_id, 'battery_pct', pct_text, voltage_color)
- self.update_field(panel, drone_id, 'battery_vol', vol)
- self.update_field(panel, drone_id, 'battery_cells', cells_text)
- # 維持 overview table 的 battery 欄位為電壓
- self.update_overview_table(drone_id, 'battery', vol)
+ self.update_field(panel, drone_id, 'battery_pct', f"{percentage:.0f}%", voltage_color)
+ self.update_field(panel, drone_id, 'battery_vol', f"{voltage:.2f}V")
+ self.update_field(panel, drone_id, 'battery_cells', f"{cells}S")
+ self.update_overview_table(drone_id, 'battery', f"{voltage:.2f}V")
elif msg_type == 'gps':
- # 仍然儲存 GPS 到內部監控,但不在面板上顯示經緯度欄位
lat, lon = data.get('lat', 0), data.get('lon', 0)
self.drone_positions[drone_id] = (lat, lon)
alt = data.get('alt', 0)
if not hasattr(self.monitor, 'drone_gps'):
self.monitor.drone_gps = {}
- self.monitor.drone_gps[drone_id] = {
- 'lat': lat,
- 'lon': lon,
- 'alt': alt
- }
- # 更新 overview table 的經緯度
+ self.monitor.drone_gps[drone_id] = {'lat': lat, 'lon': lon, 'alt': alt}
self.update_overview_table(drone_id, 'latitude', f"{lat:.6f}°")
self.update_overview_table(drone_id, 'longitude', f"{lon:.6f}°")
-
- # 更新地圖上的無人機位置(地圖仍需經緯度)
heading = self.drone_headings.get(drone_id, 0)
self.drone_map.update_drone_position(drone_id, lat, lon, heading)
@@ -666,19 +557,11 @@ class ControlStationUI(QMainWindow):
self.update_overview_table(drone_id, 'altitude', text)
elif msg_type == 'local_pose':
- # 更新 local 座標並顯示在 overview table
- x = data.get('x', 0)
- y = data.get('y', 0)
- z = data.get('z', 0)
+ x, y = data.get('x', 0), data.get('y', 0)
if not hasattr(self.monitor, 'drone_local'):
self.monitor.drone_local = {}
- self.monitor.drone_local[drone_id] = {
- 'x': x,
- 'y': y
- }
- # 更新 overview table 的位置欄位 (只顯示 x, y)
- local_text = f"{x:.1f}, {y:.1f}"
- self.update_overview_table(drone_id, 'local', local_text)
+ self.monitor.drone_local[drone_id] = {'x': x, 'y': y}
+ self.update_overview_table(drone_id, 'local', f"{x:.1f}, {y:.1f}")
elif msg_type == 'hud':
heading = data.get('heading')
@@ -690,30 +573,11 @@ class ControlStationUI(QMainWindow):
climb = data.get('climb')
heading_text = f"{heading:.1f}°"
- if isinstance(groundspeed, (int, float)):
- groundspeed_text = f"{groundspeed:.1f} m/s"
- else:
- groundspeed_text = "--"
-
- if isinstance(airspeed, (int, float)):
- airspeed_text = f"{airspeed:.1f} m/s"
- else:
- airspeed_text = "--"
-
- if isinstance(throttle, (int, float)):
- throttle_text = f"{throttle:.0f}%"
- else:
- throttle_text = "--"
-
- if isinstance(hud_alt, (int, float)):
- hud_alt_text = f"{hud_alt:.1f} m"
- else:
- hud_alt_text = "--"
-
- if isinstance(climb, (int, float)):
- climb_text = f"{climb:.1f} m/s"
- else:
- climb_text = "--"
+ groundspeed_text = f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--"
+ airspeed_text = f"{airspeed:.1f} m/s" if isinstance(airspeed, (int, float)) else "--"
+ throttle_text = f"{throttle:.0f}%" if isinstance(throttle, (int, float)) else "--"
+ hud_alt_text = f"{hud_alt:.1f} m" if isinstance(hud_alt, (int, float)) else "--"
+ climb_text = f"{climb:.1f} m/s" if isinstance(climb, (int, float)) else "--"
self.update_field(panel, drone_id, 'heading', heading_text)
self.update_field(panel, drone_id, 'groundspeed', groundspeed_text)
@@ -725,141 +589,81 @@ class ControlStationUI(QMainWindow):
self.update_overview_table(drone_id, 'hud_alt', hud_alt_text)
self.update_overview_table(drone_id, 'climb', climb_text)
- # 更新態度指示器的航向(如果有roll/pitch數據,下面的attitude訊息會更新)
if panel and hasattr(panel, 'attitude_indicator') and panel.attitude_indicator:
- if not hasattr(panel, '_last_roll'):
- panel._last_roll = 0
- if not hasattr(panel, '_last_pitch'):
- panel._last_pitch = 0
+ if not hasattr(panel, '_last_roll'): panel._last_roll = 0
+ if not hasattr(panel, '_last_pitch'): panel._last_pitch = 0
panel.attitude_indicator.update_attitude(heading, panel._last_roll, panel._last_pitch)
- # 如果有位置資訊,也更新地圖上的航向
if drone_id in self.drone_positions:
lat, lon = self.drone_positions[drone_id]
self.drone_map.update_drone_position(drone_id, lat, lon, heading)
elif msg_type == 'loss_rate':
- loss_rate = data.get('loss_rate', 0)
- text = f"{loss_rate:.1f}%"
+ text = f"{data.get('loss_rate', 0):.1f}%"
self.update_field(panel, drone_id, 'loss_rate', text)
self.update_overview_table(drone_id, 'loss_rate', text)
elif msg_type == 'ping':
- ping = data.get('ping', 0)
- text = f"{ping:.1f} ms"
+ text = f"{data.get('ping', 0):.1f} ms"
self.update_field(panel, drone_id, 'ping', text)
self.update_overview_table(drone_id, 'ping', text)
elif msg_type == 'velocity':
- text = f"{data['vx']:.1f}, {data['vy']:.1f}"
- self.update_overview_table(drone_id, 'velocity', text)
+ self.update_overview_table(drone_id, 'velocity', f"{data['vx']:.1f}, {data['vy']:.1f}")
elif msg_type == 'attitude':
- roll = data.get('roll', 0)
- pitch = data.get('pitch', 0)
- yaw = data.get('yaw', 0)
- roll_text = f"{roll:.1f}°"
- pitch_text = f"{pitch:.1f}°"
- yaw_text = f"{yaw:.1f}°"
- self.update_overview_table(drone_id, 'roll', roll_text)
- self.update_overview_table(drone_id, 'pitch', pitch_text)
- self.update_overview_table(drone_id, 'yaw', yaw_text)
-
- # 儲存roll/pitch供attitude指示器使用,並使用現存的航向
+ roll, pitch, yaw = data.get('roll', 0), data.get('pitch', 0), data.get('yaw', 0)
+ self.update_overview_table(drone_id, 'roll', f"{roll:.1f}°")
+ self.update_overview_table(drone_id, 'pitch', f"{pitch:.1f}°")
+ self.update_overview_table(drone_id, 'yaw', f"{yaw:.1f}°")
if panel:
panel._last_roll = roll
panel._last_pitch = pitch
-
- # 更新態度指示器(使用現存航向或預設0)
if panel and hasattr(panel, 'update_attitude'):
heading = self.drone_headings.get(drone_id, 0)
panel.update_attitude(heading, roll, pitch)
- # 新增處理分組勾選的方法
+ # ================================================================================
+ # 勾選管理
+ # ================================================================================
+
def handle_group_selection(self, socket_id, state):
- """處理 socket 分組勾選狀態變化"""
- # 獲取該分組下的所有無人機
- group_drones = [did for did in self.drones.keys()
- if self.get_socket_id(did) == socket_id]
-
- # 根據分組勾選狀態更新所有該分組的無人機勾選狀態
+ group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
is_checked = state == Qt.CheckState.Checked.value
-
for drone_id in group_drones:
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
- # 暫時斷開信號連接,避免遞迴觸發
checkbox.blockSignals(True)
checkbox.setChecked(is_checked)
checkbox.blockSignals(False)
-
- # 手動更新選中集合
- if is_checked:
- self.monitor.selected_drones.add(drone_id)
- else:
- self.monitor.selected_drones.discard(drone_id)
+ if is_checked: self.monitor.selected_drones.add(drone_id)
+ else: self.monitor.selected_drones.discard(drone_id)
def handle_drone_selection(self, drone_id, state):
- """處理個別無人機勾選狀態變化"""
if state == Qt.CheckState.Checked.value:
self.monitor.selected_drones.add(drone_id)
else:
self.monitor.selected_drones.discard(drone_id)
+ self.update_group_checkbox_state(self.get_socket_id(drone_id))
- # 更新對應 socket 分組的勾選狀態
- socket_id = self.get_socket_id(drone_id)
- self.update_group_checkbox_state(socket_id)
-
- # 新增更新分組勾選框狀態的方法
def update_group_checkbox_state(self, socket_id):
- """更新指定 socket 分組的勾選框狀態"""
- # 獲取該分組下的所有無人機
- group_drones = [did for did in self.drones.keys()
- if self.get_socket_id(did) == socket_id]
-
- if not group_drones:
- return
-
- # 檢查該分組內有多少無人機被勾選
- checked_count = sum(1 for did in group_drones
- if self.drones[did].is_checked())
-
- # 獲取分組勾選框
+ group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
+ if not group_drones: return
+ checked_count = sum(1 for did in group_drones if self.drones[did].is_checked())
if socket_id in self.socket_groups:
group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox")
if group_checkbox:
- # 暫時斷開信號連接
group_checkbox.blockSignals(True)
-
- if checked_count == 0:
- # 沒有無人機被勾選
- group_checkbox.setCheckState(Qt.CheckState.Unchecked)
- elif checked_count == len(group_drones):
- # 所有無人機都被勾選
- group_checkbox.setCheckState(Qt.CheckState.Checked)
- else:
- # 部分無人機被勾選,顯示半勾選狀態
- group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked)
-
- # 重新連接信號
+ if checked_count == 0: group_checkbox.setCheckState(Qt.CheckState.Unchecked)
+ elif checked_count == len(group_drones): group_checkbox.setCheckState(Qt.CheckState.Checked)
+ else: group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked)
group_checkbox.blockSignals(False)
def handle_select_all(self):
- """處理全選按鈕 - 支援分組結構"""
- # 檢查是否所有無人機都已被選中
- all_selected = all(
- self.drones[drone_id].is_checked()
- for drone_id in self.drones
- )
-
- # 如果全部已選中,則取消全選;否則全選
+ all_selected = all(self.drones[did].is_checked() for did in self.drones)
new_state = not all_selected
-
- # 更新所有勾選框狀態(無人機和分組)
for drone_id in self.drones:
self.drones[drone_id].set_checked(new_state)
-
- # 更新所有分組勾選框狀態
for socket_id in self.socket_groups:
group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox")
if group_checkbox:
@@ -867,110 +671,369 @@ class ControlStationUI(QMainWindow):
group_checkbox.setChecked(new_state)
group_checkbox.blockSignals(False)
- def handle_arm_selected(self):
- """處理批次解鎖"""
- loop = asyncio.get_event_loop()
- for drone_id in self.monitor.selected_drones:
- future = self.monitor.arm_drone(drone_id, True)
- loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}"))
+ def handle_drone_clicked(self, drone_id):
+ if drone_id in self.drones:
+ checkbox = self.drones[drone_id].get_checkbox()
+ checkbox.setChecked(not checkbox.isChecked())
+
+ def handle_clear_all_drone_selection(self):
+ for drone_id in self.drones.keys():
+ checkbox = self.drones[drone_id].get_checkbox()
+ if checkbox:
+ checkbox.blockSignals(True)
+ checkbox.setChecked(False)
+ checkbox.blockSignals(False)
+ self.monitor.selected_drones.clear()
+ for socket_id in self.socket_groups.keys():
+ self.update_group_checkbox_state(socket_id)
+
+ def handle_toggle_select_all_drones(self):
+ all_selected = all(self.drones[did].get_checkbox().isChecked() for did in self.drones.keys())
+ if all_selected:
+ for drone_id in self.drones.keys():
+ checkbox = self.drones[drone_id].get_checkbox()
+ if checkbox:
+ checkbox.blockSignals(True)
+ checkbox.setChecked(False)
+ checkbox.blockSignals(False)
+ self.monitor.selected_drones.clear()
+ else:
+ for drone_id in self.drones.keys():
+ checkbox = self.drones[drone_id].get_checkbox()
+ if checkbox:
+ checkbox.blockSignals(True)
+ checkbox.setChecked(True)
+ checkbox.blockSignals(False)
+ self.monitor.selected_drones.add(drone_id)
+ for socket_id in self.socket_groups.keys():
+ self.update_group_checkbox_state(socket_id)
- def handle_takeoff_selected(self):
- """處理批次起飛"""
- loop = asyncio.get_event_loop()
- for drone_id in self.monitor.selected_drones:
- future = self.monitor.takeoff_drone(drone_id, 10.0)
- loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}"))
+ # ================================================================================
+ # 任務模式切換
+ # ================================================================================
+
+ def on_mission_mode_changed(self, mode):
+ self.current_mission_mode = mode
+ mode_names = {
+ 'M_FORMATION': '列隊飛行',
+ 'CIRCLE_FORMATION': '環狀包圍',
+ 'LEADER_FOLLOWER': '跟隨模式',
+ 'GRID_SWEEP': '柵狀偵查'
+ }
+ display_name = mode_names.get(mode, mode)
+ self.statusBar().showMessage(f"🔄 任務模式: {display_name}", 3000)
+ print(f"任務模式切換: {mode}")
+
+ # ================================================================================
+ # 任務規劃 — 點擊地圖 (M-Formation / Circle)
+ # ================================================================================
- def handle_batch_mode_change(self):
- mode = self.mode_combo.currentText()
- loop = asyncio.get_event_loop()
- for drone_id in self.monitor.selected_drones:
- future = self.monitor.set_mode(drone_id, mode)
- loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}"))
+ def handle_map_click(self, lat, lon):
+ """處理地圖點擊事件 — 根據選單模式規劃"""
+ print(f"地圖點擊位置: {lat:.6f}, {lon:.6f} (模式: {self.current_mission_mode})")
+
+ # Grid Sweep 和 Leader-Follower 由各自的觸發方式處理,點擊地圖不處理
+ mode_map = {
+ 'M_FORMATION': MissionType.M_FORMATION,
+ 'CIRCLE_FORMATION': MissionType.CIRCLE_FORMATION,
+ }
+ mission_type = mode_map.get(self.current_mission_mode)
+ if mission_type is None:
+ # Grid Sweep / Leader-Follower 模式下點擊地圖不處理
+ return
+
+ selected_drones = self.get_selected_drones()
+ if len(selected_drones) == 0:
+ self.statusBar().showMessage("⚠ 請先選擇無人機(勾選 checkbox)", 3000)
+ return
+
+ base_alt = 10.0
+ target_gps = (lat, lon, base_alt)
+ self.statusBar().showMessage(f"⏳ 正在規劃 {self.current_mission_mode} ({len(selected_drones)} 台) ...", 2000)
+
+ try:
+ drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt)
+ if drone_gps_positions is None: return
+
+ waypoints_per_drone, center_origin = self.mission_planner.plan_formation_mission(
+ drone_gps_positions, target_gps, mission_type
+ )
+
+ self.planned_waypoints = {'drone_ids': selected_drones, 'waypoints': waypoints_per_drone}
+ self.show_planned_waypoints()
+
+ center_lat, center_lon, _ = center_origin
+ self.drone_map.draw_mission_plan(center_lat, center_lon, lat, lon)
+
+ self._launch_verification(
+ self.current_mission_mode, drone_gps_positions, selected_drones,
+ waypoints_per_drone, center_origin, target_gps=target_gps
+ )
+
+ total_wps = sum(len(wps) for wps in waypoints_per_drone)
+ self.statusBar().showMessage(
+ f"✓ {self.current_mission_mode} 規劃完成!{len(selected_drones)} 台,共 {total_wps} 個航點", 5000
+ )
+ except Exception as e:
+ self.statusBar().showMessage(f"❌ 規劃失敗: {str(e)}", 5000)
+ import traceback
+ traceback.print_exc()
+
+ # ================================================================================
+ # 任務規劃 — 矩形選取 (Grid Sweep)
+ # ================================================================================
+
+ def handle_rectangle_selected(self, points_json):
+ print(f"矩形選取: {points_json}")
+ selected_drones = self.get_selected_drones()
+ if len(selected_drones) == 0:
+ self.statusBar().showMessage("⚠ 請先選擇無人機再框選區域", 3000)
+ return
+ try:
+ rect_corners = [(p[0], p[1]) for p in json.loads(points_json)]
+ except (json.JSONDecodeError, IndexError):
+ self.statusBar().showMessage("❌ 矩形座標解析失敗", 3000)
+ return
+
+ base_alt = 10.0
+ self.statusBar().showMessage(f"⏳ 正在規劃 Grid Sweep ({len(selected_drones)} 台) ...", 2000)
+ try:
+ drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt)
+ if drone_gps_positions is None: return
+
+ target_lat = sum(c[0] for c in rect_corners) / 4
+ target_lon = sum(c[1] for c in rect_corners) / 4
+ target_gps = (target_lat, target_lon, base_alt)
+
+ waypoints_per_drone, center_origin = self.mission_planner.plan_formation_mission(
+ drone_gps_positions, target_gps, MissionType.GRID_SWEEP,
+ params={'rect_corners': rect_corners, 'line_spacing': 5.0, 'altitude': base_alt}
+ )
+
+ self.planned_waypoints = {'drone_ids': selected_drones, 'waypoints': waypoints_per_drone}
+ self.show_planned_waypoints()
+
+ center_lat, center_lon, _ = center_origin
+ self.drone_map.draw_mission_plan(center_lat, center_lon, target_lat, target_lon)
+ self._launch_verification(
+ 'grid_sweep', drone_gps_positions, selected_drones,
+ waypoints_per_drone, center_origin, rect_corners=rect_corners
+ )
+
+ total_wps = sum(len(wps) for wps in waypoints_per_drone)
+ self.statusBar().showMessage(
+ f"✓ Grid Sweep 規劃完成!{len(selected_drones)} 台,共 {total_wps} 個航點", 5000
+ )
+ except Exception as e:
+ self.statusBar().showMessage(f"❌ Grid Sweep 規劃失敗: {str(e)}", 5000)
+ import traceback
+ traceback.print_exc()
+
+ # ================================================================================
+ # 任務規劃 — 路徑確認 (Leader-Follower 跟隨模式)
+ # ================================================================================
+
+ def handle_route_confirmed(self, points_json):
+ """路徑確認 → Leader-Follower 任務規劃"""
+ print(f"路徑確認: {points_json}")
+
+ selected_drones = self.get_selected_drones()
+ if len(selected_drones) == 0:
+ self.statusBar().showMessage("⚠ 請先選擇無人機再標記路徑", 3000)
+ return
+
+ try:
+ route_points = json.loads(points_json) # [[lat, lon], ...]
+ route_waypoints = [(p[0], p[1]) for p in route_points]
+ except (json.JSONDecodeError, IndexError):
+ self.statusBar().showMessage("❌ 路徑座標解析失敗", 3000)
+ return
+
+ if len(route_waypoints) < 2:
+ self.statusBar().showMessage("⚠ 至少需要 2 個路徑點", 3000)
+ return
+
+ base_alt = 10.0
+ self.statusBar().showMessage(f"⏳ 正在規劃跟隨模式 ({len(selected_drones)} 台, {len(route_waypoints)} 個路徑點) ...", 2000)
+
+ try:
+ drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt)
+ if drone_gps_positions is None:
+ return
+
+ # 目標點 = 路徑中心(供 origin 計算)
+ target_lat = sum(p[0] for p in route_waypoints) / len(route_waypoints)
+ target_lon = sum(p[1] for p in route_waypoints) / len(route_waypoints)
+ target_gps = (target_lat, target_lon, base_alt)
+
+ waypoints_per_drone, center_origin = self.mission_planner.plan_formation_mission(
+ drone_gps_positions,
+ target_gps,
+ MissionType.LEADER_FOLLOWER,
+ params={
+ 'route_waypoints': route_waypoints,
+ 'lateral_offset': 3.0,
+ 'longitudinal_spacing': 5.0,
+ 'altitude': base_alt
+ }
+ )
+
+ self.planned_waypoints = {'drone_ids': selected_drones, 'waypoints': waypoints_per_drone}
+ self.show_planned_waypoints()
+
+ center_lat, center_lon, _ = center_origin
+ self.drone_map.draw_mission_plan(center_lat, center_lon, target_lat, target_lon)
+
+ # 啟動視覺化驗證
+ self._launch_verification(
+ 'LEADER_FOLLOWER', drone_gps_positions, selected_drones,
+ waypoints_per_drone, center_origin,
+ target_gps=target_gps, route_waypoints=route_waypoints
+ )
+
+ total_wps = sum(len(wps) for wps in waypoints_per_drone)
+ self.statusBar().showMessage(
+ f"✓ 跟隨模式規劃完成!{len(selected_drones)} 台,{len(route_waypoints)} 個路徑點,共 {total_wps} 個航點", 5000
+ )
+ except Exception as e:
+ self.statusBar().showMessage(f"❌ 跟隨模式規劃失敗: {str(e)}", 5000)
+ import traceback
+ traceback.print_exc()
+
+ # ================================================================================
+ # 任務執行控制
+ # ================================================================================
+
+ def handle_start_mission(self, center_lat, center_lon, target_lat, target_lon):
+ if self.planned_waypoints is None:
+ self.statusBar().showMessage("⚠ 請先規劃任務", 3000)
+ return
+ self.mission_executor.start(self.planned_waypoints)
+ self.statusBar().showMessage("🚀 任務已啟動", 3000)
+
+ def handle_pause_mission(self):
+ if self.mission_executor.state.value == "running":
+ self.mission_executor.pause()
+ self.statusBar().showMessage("⏸ 任務已暫停", 3000)
+ elif self.mission_executor.state.value == "paused":
+ self.mission_executor.resume()
+ self.statusBar().showMessage("▶ 任務已恢復", 3000)
+
+ def on_drone_waypoint_reached(self, drone_id, wp_index, total):
+ if wp_index >= total:
+ self.statusBar().showMessage(f"📍 {drone_id} 完成所有航點", 3000)
+ else:
+ self.statusBar().showMessage(f"📍 {drone_id} → WP {wp_index}/{total}", 2000)
+
+ def on_mission_completed(self):
+ self.statusBar().showMessage("✅ 所有無人機已完成任務", 5000)
+
+ # ================================================================================
+ # 輔助方法
+ # ================================================================================
+
+ def _collect_drone_gps(self, selected_drones, base_alt):
+ drone_gps_positions = []
+ for drone_id in selected_drones:
+ if hasattr(self.monitor, 'drone_gps') and drone_id in self.monitor.drone_gps:
+ gps_data = self.monitor.drone_gps[drone_id]
+ drone_gps_positions.append((gps_data['lat'], gps_data['lon'], gps_data.get('alt', base_alt)))
+ elif drone_id in self.drone_positions:
+ pos = self.drone_positions[drone_id]
+ lat_drone = 24.0 + pos[1] / 111000
+ lon_drone = 120.0 + pos[0] / (111000 * 0.9)
+ alt_drone = pos[2] if len(pos) > 2 else base_alt
+ drone_gps_positions.append((lat_drone, lon_drone, alt_drone))
+ else:
+ self.statusBar().showMessage(f"⚠ 找不到 {drone_id} 的位置資料", 3000)
+ return None
+ return drone_gps_positions
+
+ def _launch_verification(self, mission_type, drone_gps_positions,
+ selected_drones, waypoints_per_drone, origin,
+ target_gps=None, rect_corners=None, route_waypoints=None):
+ """存 JSON + 啟動 matplotlib 視覺化驗證 (獨立 process)"""
+ import os
+ data = {
+ 'mission_type': mission_type,
+ 'drone_ids': selected_drones,
+ 'drones_gps': drone_gps_positions,
+ 'waypoints': waypoints_per_drone,
+ 'origin': list(origin),
+ }
+ if target_gps:
+ data['target'] = list(target_gps)
+ if rect_corners:
+ data['rect_corners'] = rect_corners
+ if route_waypoints:
+ data['route_waypoints'] = route_waypoints
+
+ json_path = '/tmp/mission_plan.json'
+ with open(json_path, 'w') as f:
+ json.dump(data, f, indent=2)
+
+ script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'validation', 'verify_waypoints.py')
+ subprocess.Popen([sys.executable, script_path, '--file', json_path])
+ print(f"驗證視窗已啟動: {json_path}")
+
+ def show_planned_waypoints(self):
+ if not self.planned_waypoints: return
+ print("\n" + "=" * 60)
+ print("任務規劃結果")
+ print("=" * 60)
+ drone_ids = self.planned_waypoints['drone_ids']
+ waypoints = self.planned_waypoints['waypoints']
+ print(f"\n共 {len(drone_ids)} 台無人機")
+ for i, drone_id in enumerate(drone_ids):
+ wps = waypoints[i]
+ print(f"\n【{drone_id}】({len(wps)} 個航點)")
+ for j, wp in enumerate(wps):
+ print(f" WP{j}: ({wp[0]:.6f}°, {wp[1]:.6f}°, {wp[2]:.1f}m)")
+ print("\n" + "=" * 60)
+
+ def get_selected_drones(self):
+ return [did for did, panel in self.drones.items() if hasattr(panel, 'checkbox') and panel.checkbox.isChecked()]
def update_field(self, panel, drone_id, field, text, color=None):
- """更新面板中的指定欄位"""
if isinstance(panel, DronePanel):
panel.update_field(field, text, color)
def update_overview_table(self, drone_id=None, field=None, value=None):
- """更新總覽表格"""
- if not hasattr(self, 'overview_table') or self.overview_table is None:
- return
-
- # 更新無人機引用
+ if not hasattr(self, 'overview_table') or self.overview_table is None: return
self.overview_table.set_drones(self.drones)
- # 委託給 OverviewTable 處理
self.overview_table.update_table(drone_id, field, value)
def get_socket_id(self, drone_id):
- """從 drone_id 提取 socket_id (例如 's0_1' -> '0')"""
import re
match = re.match(r's(\d+)_(\d+)', drone_id)
- if not match:
- return 'unknown'
-
- return match.group(1)
+ return match.group(1) if match else 'unknown'
def add_drone(self, drone_id):
- if drone_id in self.drones:
- return
-
- # 獲取 socket_id
+ if drone_id in self.drones: return
socket_id = self.get_socket_id(drone_id)
-
- # 創建無人機面板
panel = self.create_drone_panel(drone_id)
self.drones[drone_id] = panel
-
- # 如果該 socket 分組不存在,創建它
if socket_id not in self.socket_groups:
- group_panel = self.create_socket_group_panel(socket_id)
- self.socket_groups[socket_id] = group_panel
-
- # 將無人機面板添加到對應的 socket 分組中
- group_panel = self.socket_groups[socket_id]
- group_panel.drones_layout.addWidget(panel)
-
- # 重新排序並顯示所有 socket 分組
+ self.socket_groups[socket_id] = self.create_socket_group_panel(socket_id)
+ self.socket_groups[socket_id].drones_layout.addWidget(panel)
self.reorganize_socket_groups()
-
- # 更新分組勾選框狀態
self.update_group_checkbox_state(socket_id)
-
- # 更新總覽表
self.update_overview_table()
def reorganize_socket_groups(self):
- """重新組織和排序 socket 分組"""
- # 先清空主佈局
while self.drone_panel_layout.count():
w = self.drone_panel_layout.takeAt(0).widget()
- if w:
- w.setParent(None)
-
- # 對每個 socket 分組內的無人機進行排序
+ if w: w.setParent(None)
for socket_id, group_panel in self.socket_groups.items():
- # 獲取該分組內的所有無人機
- group_drones = [did for did in self.drones.keys()
- if self.get_socket_id(did) == socket_id]
-
- # 清空分組佈局
+ group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
while group_panel.drones_layout.count():
w = group_panel.drones_layout.takeAt(0).widget()
- if w:
- w.setParent(None)
-
- # 對該分組內的無人機按數字排序
+ if w: w.setParent(None)
def sort_key(x):
parts = x[1:].split('_')
return (int(parts[0]), int(parts[1]))
-
- # 重新添加排序後的無人機面板
for did in sorted(group_drones, key=sort_key):
group_panel.drones_layout.addWidget(self.drones[did])
-
- # 按 socket_id 數字順序添加分組到主佈局
for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)):
self.drone_panel_layout.addWidget(self.socket_groups[socket_id])
@@ -984,231 +1047,17 @@ class ControlStationUI(QMainWindow):
print(f"ROS spin error: {e}")
def closeEvent(self, event):
- # 停止 UDP 接收器
+ self.mission_executor.stop()
+ self.command_sender.close()
for receiver in self.udp_receivers:
receiver.stop()
-
- # 停止 WebSocket 接收器
for receiver in self.monitor.ws_receivers:
receiver.stop()
-
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
event.accept()
- # ================================================================================
- # 【新增】以下方法是任務規劃功能
- # ================================================================================
-
- def handle_map_click(self, lat, lon):
- """
- 處理地圖點擊事件,自動規劃隊形任務
-
- Args:
- lat: 緯度
- lon: 經度
- """
- print(f"地圖點擊位置: {lat:.6f}, {lon:.6f}")
-
- selected_drones = self.get_selected_drones()
-
- if len(selected_drones) == 0:
- self.statusBar().showMessage("⚠ 請先選擇無人機(勾選 checkbox)", 3000)
- return
-
- # 設定目標點
- base_alt = 10.0 # 基準高度
- target_gps = (lat, lon, base_alt)
-
- self.statusBar().showMessage(f"⏳ 正在規劃 {len(selected_drones)} 台無人機的任務...", 2000)
-
- try:
- # 取得當前無人機位置(GPS 座標)
- drone_gps_positions = []
-
- for drone_id in selected_drones:
- # 檢查是否有 GPS 資料
- if hasattr(self.monitor, 'drone_gps') and drone_id in self.monitor.drone_gps:
- # 使用實際的 GPS 資料
- gps_data = self.monitor.drone_gps[drone_id]
- lat_drone = gps_data['lat']
- lon_drone = gps_data['lon']
- alt_drone = gps_data.get('alt', base_alt)
- drone_gps_positions.append((lat_drone, lon_drone, alt_drone))
- print(f"使用實際 GPS: {drone_id} -> ({lat_drone:.6f}, {lon_drone:.6f}, {alt_drone:.1f})")
-
- elif drone_id in self.drone_positions:
- # 備用方案:從 Local 座標轉換
- pos = self.drone_positions[drone_id]
- # 使用簡化轉換
- lat_drone = 24.0 + pos[1] / 111000
- lon_drone = 120.0 + pos[0] / (111000 * 0.9)
- alt_drone = pos[2] if len(pos) > 2 else base_alt
- drone_gps_positions.append((lat_drone, lon_drone, alt_drone))
- print(f"使用 Local 轉換: {drone_id} -> ({lat_drone:.6f}, {lon_drone:.6f}, {alt_drone:.1f})")
-
- else:
- self.statusBar().showMessage(f"⚠ 找不到 {drone_id} 的位置資料", 3000)
- return
-
- # 規劃任務
- # ================================================================================
- # 【修改】接收中心點座標
- # ================================================================================
- stage1_gps, stage2_gps, center_origin = self.mission_planner.plan_formation_mission(
- drone_gps_positions,
- target_gps
- )
- # ================================================================================
-
- # 儲存規劃結果
- self.planned_waypoints = {
- 'stage1': stage1_gps,
- 'stage2': stage2_gps,
- 'drone_ids': selected_drones
- }
-
- # 顯示規劃結果
- self.show_planned_waypoints()
-
- # ================================================================================
- # 【新增】在地圖上顯示任務規劃(中心點 + 虛線)
- # ================================================================================
- center_lat, center_lon, center_alt = center_origin
- self.drone_map.draw_mission_plan(
- center_lat, center_lon, # 中心點座標
- lat, lon # 目標點座標
- )
- # ================================================================================
-
- self.statusBar().showMessage(
- f"✓ 任務規劃完成!{len(selected_drones)} 台無人機,2 階段飛行", 5000
- )
-
- except Exception as e:
- self.statusBar().showMessage(f"❌ 規劃失敗: {str(e)}", 5000)
- print(f"Mission planning error: {e}")
- import traceback
- traceback.print_exc()
-
- def handle_drone_clicked(self, drone_id):
- """
- 處理地圖上無人機被點擊事件,切換該無人機的選取狀態
-
- Args:
- drone_id: 無人機 ID
- """
- print(f"地圖上點擊無人機: {drone_id}")
-
- if drone_id in self.drones:
- panel = self.drones[drone_id]
- checkbox = panel.get_checkbox()
- # 切換勾選狀態
- checkbox.setChecked(not checkbox.isChecked())
-
- def handle_clear_all_drone_selection(self):
- """清除所有無人機的勾選狀態"""
- print("清除所有無人機選擇")
-
- for drone_id in self.drones.keys():
- checkbox = self.drones[drone_id].get_checkbox()
- if checkbox:
- checkbox.blockSignals(True)
- checkbox.setChecked(False)
- checkbox.blockSignals(False)
-
- # 清除選中集合
- self.monitor.selected_drones.clear()
-
- # 更新所有分組的勾選框狀態
- for socket_id in self.socket_groups.keys():
- self.update_group_checkbox_state(socket_id)
-
- def handle_toggle_select_all_drones(self):
- """切換全選/取消全選所有無人機"""
- # 檢查是否已經全選
- all_selected = all(self.drones[drone_id].get_checkbox().isChecked()
- for drone_id in self.drones.keys())
-
- if all_selected:
- # 已全選,則取消全選
- print("取消全選所有無人機")
- for drone_id in self.drones.keys():
- checkbox = self.drones[drone_id].get_checkbox()
- if checkbox:
- checkbox.blockSignals(True)
- checkbox.setChecked(False)
- checkbox.blockSignals(False)
- self.monitor.selected_drones.clear()
- else:
- # 未全選,則全選
- print("全選所有無人機")
- for drone_id in self.drones.keys():
- checkbox = self.drones[drone_id].get_checkbox()
- if checkbox:
- checkbox.blockSignals(True)
- checkbox.setChecked(True)
- checkbox.blockSignals(False)
- self.monitor.selected_drones.add(drone_id)
-
- # 更新所有分組的勾選框狀態
- for socket_id in self.socket_groups.keys():
- self.update_group_checkbox_state(socket_id)
-
- def show_planned_waypoints(self):
- """
- 顯示規劃的航點(在終端輸出)
- """
- if not self.planned_waypoints:
- return
-
- print("\n" + "=" * 60)
- print("任務規劃結果")
- print("=" * 60)
-
- drone_ids = self.planned_waypoints['drone_ids']
- stage1 = self.planned_waypoints['stage1']
- stage2 = self.planned_waypoints['stage2']
-
- print(f"\n共 {len(drone_ids)} 台無人機")
- print(f"參數:間距={self.mission_planner.spacing}m, "
- f"基準高度={self.mission_planner.base_altitude}m, "
- f"高低差={self.mission_planner.altitude_diff}m")
-
- for i, drone_id in enumerate(drone_ids):
- print(f"\n【{drone_id}】")
- lat1, lon1, alt1 = stage1[i]
- lat2, lon2, alt2 = stage2[i]
-
- print(f" 階段 1(集合點):")
- print(f" 緯度: {lat1:.6f}°")
- print(f" 經度: {lon1:.6f}°")
- print(f" 高度: {alt1:.1f} m")
-
- print(f" 階段 2(目標點):")
- print(f" 緯度: {lat2:.6f}°")
- print(f" 經度: {lon2:.6f}°")
- print(f" 高度: {alt2:.1f} m")
-
- print("\n" + "=" * 60)
-
- def get_selected_drones(self):
- """
- 取得被選中的無人機 ID 列表
-
- Returns:
- list: 被選中的無人機 ID 列表
- """
- selected = []
- for drone_id, panel in self.drones.items():
- if hasattr(panel, 'checkbox') and panel.checkbox.isChecked():
- selected.append(drone_id)
- return selected
-
- # ================================================================================
- # 【新增結束】
- # ================================================================================
if __name__ == '__main__':
app = QApplication(sys.argv)
diff --git a/src/GUI/map_layout.py b/src/GUI/map_layout.py
index bd6d5c0..0e0a252 100644
--- a/src/GUI/map_layout.py
+++ b/src/GUI/map_layout.py
@@ -122,7 +122,8 @@ class DroneMap:
margin-bottom: 10px;
padding-bottom: 10px;
border-bottom: 1px solid #ddd;
- } .selection-button-blue {
+ }
+ .selection-button-blue {
width: 100%;
padding: 8px;
background-color: #64B5F6;
@@ -139,10 +140,11 @@ class DroneMap:
}
.selection-button-blue.active {
background-color: #1976D2;
- } .selection-button-blue {
+ }
+ .selection-button {
width: 100%;
padding: 8px;
- background-color: #64B5F6;
+ background-color: #F06A61;
color: white;
border: none;
border-radius: 4px;
@@ -151,16 +153,16 @@ class DroneMap:
font-weight: bold;
transition: background-color 0.2s;
}
- .selection-button-blue:hover {
- background-color: #42A5F5;
+ .selection-button:hover {
+ background-color: #E53935;
}
- .selection-button-blue.active {
- background-color: #1976D2;
+ .selection-button.active {
+ background-color: #D32F2F;
}
- .selection-button {
+ .confirm-route-button {
width: 100%;
padding: 8px;
- background-color: #F06A61;
+ background-color: #66BB6A;
color: white;
border: none;
border-radius: 4px;
@@ -169,11 +171,8 @@ class DroneMap:
font-weight: bold;
transition: background-color 0.2s;
}
- .selection-button:hover {
- background-color: #E53935;
- }
- .selection-button.active {
- background-color: #D32F2F;
+ .confirm-route-button:hover {
+ background-color: #4CAF50;
}
@@ -181,8 +180,17 @@ class DroneMap:
-
-
+
+
+
+
+
+
@@ -229,8 +237,10 @@ class DroneMap:
// 地圖點擊事件
map.on('click', function(e) {
if (selectionMode === 'polygon') {
- // 多點選擇模式:添加點
addPolygonPoint(e.latlng.lat, e.latlng.lng);
+ } else if (selectionMode === 'route') {
+ // 跟隨模式:添加路徑點
+ addRoutePoint(e.latlng.lat, e.latlng.lng);
} else if (!selectionMode) {
// 正常模式:發送GPS信號
if (bridge) {
@@ -264,7 +274,6 @@ class DroneMap:
}
if (isDrawing && (selectionMode === 'rect' || selectionMode === 'drones') && drawStartPoint) {
- // 更新臨時矩形
if (tempRectangle) {
selectionLayer.removeLayer(tempRectangle);
}
@@ -286,7 +295,6 @@ class DroneMap:
drawStartPoint = null;
}
- // 重置拖移狀態
clickStartPos = null;
setTimeout(function() {
mapDragStarted = false;
@@ -309,22 +317,22 @@ class DroneMap:
`,
iconSize: [30, 30],
- iconAnchor: [15, 15] // ★ 箭頭往左上移,使 ID 顯示在右下
+ iconAnchor: [15, 15]
});
}
function getColorBySocketId(id) {
- if (id.startsWith("s0_")) return "#00BFFF"; // 天藍色
- if (id.startsWith("s1_")) return "#FFD700"; // 金色
- if (id.startsWith("s2_")) return "#FF6969"; // 淺紅色
- if (id.startsWith("s3_")) return "#FF69B4"; // 熱粉紅
- if (id.startsWith("s4_")) return "#00FA9A"; // 中春綠
- if (id.startsWith("s5_")) return "#9370DB"; // 中紫色 (串口)
- if (id.startsWith("s6_")) return "#FFA500"; // 橙色
- if (id.startsWith("s7_")) return "#20B2AA"; // 淺海綠
- if (id.startsWith("s8_")) return "#7CFC00"; // 草綠色
- if (id.startsWith("s9_")) return "#FF8C00"; // 深橙色
- return "#AAAAAA"; // 灰色 (預設)
+ if (id.startsWith("s0_")) return "#00BFFF";
+ if (id.startsWith("s1_")) return "#FFD700";
+ if (id.startsWith("s2_")) return "#FF6969";
+ if (id.startsWith("s3_")) return "#FF69B4";
+ if (id.startsWith("s4_")) return "#00FA9A";
+ if (id.startsWith("s5_")) return "#9370DB";
+ if (id.startsWith("s6_")) return "#FFA500";
+ if (id.startsWith("s7_")) return "#20B2AA";
+ if (id.startsWith("s8_")) return "#7CFC00";
+ if (id.startsWith("s9_")) return "#FF8C00";
+ return "#AAAAAA";
}
function createIdIcon(id) {
@@ -345,7 +353,7 @@ class DroneMap:
text-shadow: 1px 1px 2px rgba(255,255,255,0.8), -1px -1px 2px rgba(255,255,255,0.8);
">${sysid}
`,
iconSize: [16, 16],
- iconAnchor: [8, 6] // ★ icon 中心 = 經緯度點
+ iconAnchor: [8, 6]
});
}
@@ -358,25 +366,31 @@ class DroneMap:
var trajectoryGroup = L.layerGroup().addTo(map);
// ================================================================================
- // 【新增】任務規劃視覺化變數
+ // 任務規劃視覺化變數
// ================================================================================
- var missionPlanGroup = L.layerGroup().addTo(map); // 任務規劃圖層
- var centerMarker = null; // 中心點標記
- var targetMarker = null; // 目標點標記
- var missionLine = null; // 中心點到目標點的虛線
- var centerPosition = null; // 中心點經緯度
- var targetPosition = null; // 目標點經緯度
+ var missionPlanGroup = L.layerGroup().addTo(map);
+ var centerMarker = null;
+ var targetMarker = null;
+ var missionLine = null;
+ var centerPosition = null;
+ var targetPosition = null;
// 選擇工具變量
- var selectionMode = null; // 'drones', 'rect', 'polygon', null
+ var selectionMode = null; // 'drones', 'rect', 'polygon', 'route', null
var selectionLayer = L.layerGroup().addTo(map);
- var polygonPoints = []; // 多點選擇的點
- var polygonMarkers = []; // 多點選擇的標記
- var tempRectangle = null; // 臨時矩形
- var isDrawing = false; // 是否正在繪製
- var drawStartPoint = null; // 繪製起點
- var mapDragStarted = false; // 地圖是否正在拖移
- var clickStartPos = null; // 記錄點擊開始位置
+ var polygonPoints = [];
+ var polygonMarkers = [];
+ var tempRectangle = null;
+ var isDrawing = false;
+ var drawStartPoint = null;
+ var mapDragStarted = false;
+ var clickStartPos = null;
+
+ // 路徑標記變量 (跟隨模式用)
+ var routePoints = [];
+ var routeMarkers = [];
+ var routePolyline = null;
+ var routeLayer = L.layerGroup().addTo(map);
// ================================================================================
// 更新任務信息面板
@@ -397,7 +411,6 @@ class DroneMap:
targetElem.textContent = '未設定';
}
- // 只有當中心點和目標點都設定時才啟用按鈕
if (centerPosition && targetPosition) {
startBtn.disabled = false;
} else {
@@ -409,7 +422,6 @@ class DroneMap:
// 選擇工具函數
// ================================================================================
function toggleSelectAllDrones() {
- // 切換全選/取消全選無人機
if (bridge) {
bridge.toggleSelectAllDrones();
console.log('切換全選無人機');
@@ -429,34 +441,18 @@ class DroneMap:
}
}
- function toggleRectSelection() {
- clearSelectionMode();
- if (selectionMode === 'rect') {
- selectionMode = null;
- document.getElementById('select-rect-btn').classList.remove('active');
- map.dragging.enable();
- } else {
- selectionMode = 'rect';
- document.getElementById('select-rect-btn').classList.add('active');
- map.dragging.disable();
- }
- }
-
function togglePolygonSelection() {
if (selectionMode === 'polygon') {
- // 第二次點擊:完成選擇
if (polygonPoints.length >= 3) {
finishPolygonSelection();
} else {
alert('至少需要3個點來形成區域');
- // 清除並重置
clearSelectionMode();
clearPolygonPoints();
selectionMode = null;
document.getElementById('select-polygon-btn').classList.remove('active');
}
} else {
- // 第一次點擊:清除上次的點位並啟動模式
clearSelectionMode();
clearPolygonPoints();
selectionMode = 'polygon';
@@ -466,23 +462,25 @@ class DroneMap:
}
function clearSelectionMode() {
- // 清除所有按鈕的激活狀態
document.getElementById('select-drones-btn').classList.remove('active');
- document.getElementById('select-rect-btn').classList.remove('active');
document.getElementById('select-polygon-btn').classList.remove('active');
- // 清除選擇圖層
selectionLayer.clearLayers();
tempRectangle = null;
- // 重新啟用地圖拖曳
+ // 不清除 routeLayer(由 clearRoutePoints 單獨管理)
+
map.dragging.enable();
+
+ // 如果離開 route 模式,重置 selectionMode
+ if (selectionMode !== 'route') {
+ selectionMode = null;
+ }
}
function addPolygonPoint(lat, lng) {
polygonPoints.push([lat, lng]);
- // 添加邊界點標記
var marker = L.circleMarker([lat, lng], {
radius: 5,
color: '#FF6B6B',
@@ -491,7 +489,6 @@ class DroneMap:
}).addTo(selectionLayer);
polygonMarkers.push(marker);
- // 如果有多個點,繪製連線
if (polygonPoints.length > 1) {
L.polyline(polygonPoints, {
color: '#FF6B6B',
@@ -516,7 +513,6 @@ class DroneMap:
return;
}
- // 繪製最終多邊形
var polygon = L.polygon(polygonPoints, {
color: '#FF6B6B',
fillColor: '#FF6B6B',
@@ -524,14 +520,12 @@ class DroneMap:
weight: 2
}).addTo(selectionLayer);
- // 發送多邊形數據到Python
if (bridge) {
var pointsStr = JSON.stringify(polygonPoints);
bridge.polygonSelected(pointsStr);
console.log('多邊形選擇完成:', polygonPoints);
}
- // 重置狀態
selectionMode = null;
document.getElementById('select-polygon-btn').classList.remove('active');
map.dragging.enable();
@@ -540,14 +534,12 @@ class DroneMap:
function finishRectSelection(bounds) {
var selectedDrones = [];
- // 清除臨時矩形
if (tempRectangle) {
selectionLayer.removeLayer(tempRectangle);
tempRectangle = null;
}
if (selectionMode === 'drones') {
- // 框選無人機模式:先清除全選
if (bridge) {
bridge.clearAllDroneSelection();
}
@@ -556,17 +548,14 @@ class DroneMap:
var pos = markers[droneId].getLatLng();
if (bounds.contains(pos)) {
selectedDrones.push(droneId);
- // 觸發無人機點擊信號
if (bridge) {
bridge.emitDroneClicked(droneId);
}
}
});
console.log('框選無人機:', selectedDrones);
- // 不保留選擇框,直接完成
} else if (selectionMode === 'rect') {
- // 框選區域模式
var rectPoints = [
[bounds.getNorth(), bounds.getWest()],
[bounds.getNorth(), bounds.getEast()],
@@ -574,7 +563,6 @@ class DroneMap:
[bounds.getSouth(), bounds.getWest()]
];
- // 繪製最終矩形
L.rectangle(bounds, {
color: '#FF6B6B',
fillColor: '#FF6B6B',
@@ -582,7 +570,6 @@ class DroneMap:
weight: 2
}).addTo(selectionLayer);
- // 添加四個角的標記
rectPoints.forEach(point => {
L.circleMarker(point, {
radius: 5,
@@ -592,7 +579,6 @@ class DroneMap:
}).addTo(selectionLayer);
});
- // 發送矩形數據到Python
if (bridge) {
var pointsStr = JSON.stringify(rectPoints);
bridge.rectangleSelected(pointsStr);
@@ -603,8 +589,90 @@ class DroneMap:
// 重置狀態
selectionMode = null;
document.getElementById('select-drones-btn').classList.remove('active');
- document.getElementById('select-rect-btn').classList.remove('active');
map.dragging.enable();
+
+ // 如果仍在 Grid Sweep 模式,重新進入框選
+ var currentMode = document.getElementById('mission-mode-select').value;
+ if (currentMode === 'GRID_SWEEP') {
+ setTimeout(function() {
+ selectionMode = 'rect';
+ map.dragging.disable();
+ console.log('Grid Sweep: 重新進入框選模式');
+ }, 500);
+ }
+ }
+
+ // ================================================================================
+ // 路徑標記函數 (跟隨模式用)
+ // ================================================================================
+ function addRoutePoint(lat, lng) {
+ routePoints.push([lat, lng]);
+ var idx = routePoints.length;
+
+ // 添加編號標記
+ var icon = L.divIcon({
+ className: 'route-point',
+ html: '' + idx + '
',
+ iconSize: [22, 22],
+ iconAnchor: [11, 11]
+ });
+
+ var marker = L.marker([lat, lng], {
+ icon: icon,
+ zIndexOffset: 3000
+ }).addTo(routeLayer);
+ routeMarkers.push(marker);
+
+ // 更新連線
+ if (routePolyline) {
+ routeLayer.removeLayer(routePolyline);
+ }
+ if (routePoints.length > 1) {
+ routePolyline = L.polyline(routePoints, {
+ color: '#FF5722',
+ weight: 2.5,
+ opacity: 0.8,
+ dashArray: '8, 6'
+ }).addTo(routeLayer);
+ }
+
+ console.log('添加路徑點 #' + idx + ':', lat, lng);
+ }
+
+ function clearRoutePoints() {
+ routePoints = [];
+ routeMarkers = [];
+ if (routePolyline) {
+ routeLayer.removeLayer(routePolyline);
+ routePolyline = null;
+ }
+ routeLayer.clearLayers();
+ }
+
+ function confirmRoute() {
+ if (routePoints.length < 2) {
+ alert('至少需要 2 個路徑點');
+ return;
+ }
+
+ if (bridge) {
+ var pointsStr = JSON.stringify(routePoints);
+ bridge.routeConfirmed(pointsStr);
+ console.log('路徑確認:', routePoints.length, '個點');
+ }
}
// ================================================================================
@@ -631,6 +699,36 @@ class DroneMap:
console.log('暫停任務');
}
}
+
+ // ================================================================================
+ // 任務模式切換
+ // ================================================================================
+ function onMissionModeChanged(mode) {
+ // 先清除選擇狀態
+ selectionMode = null;
+ clearSelectionMode();
+ clearRoutePoints();
+
+ // 隱藏確認路徑按鈕
+ document.getElementById('confirm-route-btn').style.display = 'none';
+
+ if (mode === 'GRID_SWEEP') {
+ // 自動進入框選模式
+ selectionMode = 'rect';
+ map.dragging.disable();
+ console.log('Grid Sweep: 進入框選模式');
+ } else if (mode === 'LEADER_FOLLOWER') {
+ // 進入路徑標記模式
+ selectionMode = 'route';
+ document.getElementById('confirm-route-btn').style.display = 'block';
+ console.log('跟隨模式: 進入路徑標記模式,點擊地圖添加路徑點');
+ }
+
+ if (bridge) {
+ bridge.missionModeChanged(mode);
+ console.log('任務模式切換:', mode);
+ }
+ }
// ================================================================================
function initTrajectory(id) {
@@ -696,7 +794,7 @@ class DroneMap:
})
.on('click', function () {
if (mapDragStarted) {
- return; // 如果是拖移地圖,不處理點擊
+ return;
}
if (bridge) {
bridge.emitDroneClicked(id);
@@ -711,7 +809,7 @@ class DroneMap:
})
.on('click', function() {
if (mapDragStarted) {
- return; // 如果是拖移地圖,不處理點擊
+ return;
}
if (bridge) {
bridge.emitDroneClicked(id);
@@ -722,7 +820,7 @@ class DroneMap:
if (!initialized || id < focusedId) {
focusedId = id;
- map.setView([lat, lon], 19); // 第一台無人機:重置並放大到最大
+ map.setView([lat, lon], 19);
initialized = true;
}
}
@@ -738,18 +836,15 @@ class DroneMap:
}
// ================================================================================
- // 【新增】任務規劃視覺化函式
+ // 任務規劃視覺化函式
// ================================================================================
function drawMissionPlan(centerLat, centerLon, targetLat, targetLon) {
- // 清除舊的任務規劃標記
clearMissionPlan();
- // 保存中心點和目標點位置
centerPosition = {lat: centerLat, lng: centerLon};
targetPosition = {lat: targetLat, lng: targetLon};
updateMissionInfo();
- // 繪製中心點標記 "C"(縮小到 20px)
var centerIcon = L.divIcon({
className: 'mission-center',
html: `= len(self.waypoints):
+ return None
+ return self.waypoints[self.wp_index]
+
+ @property
+ def total_waypoints(self):
+ return len(self.waypoints)
+
+
+class MissionExecutor(QObject):
+ """
+ 任務執行器
+
+ planned_waypoints 格式:
+ {
+ 'drone_ids': ['s0_1', 's0_2', ...],
+ 'waypoints': [
+ [(lat,lon,alt), ...], # drone 0
+ [(lat,lon,alt), ...], # drone 1
+ ]
+ }
+ """
+
+ # 信號
+ drone_waypoint_reached = pyqtSignal(str, int, int) # (drone_id, wp_index, total)
+ mission_completed = pyqtSignal()
+
+ def __init__(self, sender, drone_gps,
+ arrival_radius=2.0, send_rate_hz=2.0):
+ super().__init__()
+ self.sender = sender
+ self.drone_gps = drone_gps
+ self.arrival_radius = arrival_radius
+ self.state = MissionState.IDLE
+ self.tasks = {}
+
+ self._interval_ms = int(1000 / send_rate_hz)
+ self._timer = QTimer()
+ self._timer.timeout.connect(self._tick)
+
+ # ------------------------------------------------------------------ 公開方法
+
+ def start(self, planned_waypoints):
+ """啟動任務"""
+ if self.state == MissionState.RUNNING:
+ print("任務已在執行中")
+ return
+
+ self.tasks.clear()
+
+ drone_ids = planned_waypoints['drone_ids']
+ waypoints_list = planned_waypoints['waypoints']
+
+ for i, drone_id in enumerate(drone_ids):
+ sysid = int(drone_id.split('_')[1])
+ self.tasks[drone_id] = DroneTask(drone_id, sysid, waypoints_list[i])
+
+ self.state = MissionState.RUNNING
+ self._timer.start(self._interval_ms)
+
+ total_wps = sum(t.total_waypoints for t in self.tasks.values())
+ print(f"任務啟動: {len(self.tasks)} 架無人機, "
+ f"共 {total_wps} 個航點, "
+ f"到達半徑={self.arrival_radius}m, "
+ f"發送週期={self._interval_ms}ms")
+
+ def pause(self):
+ """暫停任務"""
+ if self.state == MissionState.RUNNING:
+ self._timer.stop()
+ self.state = MissionState.PAUSED
+ print("任務暫停")
+
+ def resume(self):
+ """恢復任務"""
+ if self.state == MissionState.PAUSED:
+ self._timer.start(self._interval_ms)
+ self.state = MissionState.RUNNING
+ print("任務恢復")
+
+ def stop(self):
+ """停止任務"""
+ self._timer.stop()
+ self.tasks.clear()
+ self.state = MissionState.IDLE
+ print("任務停止")
+
+ # ------------------------------------------------------------------ 控制迴圈
+
+ def _tick(self):
+ """控制迴圈"""
+ all_done = True
+
+ for task in self.tasks.values():
+ if task.done:
+ continue
+
+ all_done = False
+ target = task.current_target
+ if target is None:
+ continue
+
+ # 讀取當前 GPS
+ gps = self.drone_gps.get(task.drone_id)
+ if gps is None:
+ continue
+
+ tgt_lat, tgt_lon, tgt_alt = target
+ distance = _haversine(gps['lat'], gps['lon'], tgt_lat, tgt_lon)
+
+ # 到達判定
+ if distance < self.arrival_radius:
+ task.wp_index += 1
+ if task.wp_index >= task.total_waypoints:
+ task.done = True
+ self.drone_waypoint_reached.emit(
+ task.drone_id, task.wp_index, task.total_waypoints
+ )
+ print(f" {task.drone_id} → DONE "
+ f"({task.total_waypoints}/{task.total_waypoints})")
+ continue
+ else:
+ self.drone_waypoint_reached.emit(
+ task.drone_id, task.wp_index, task.total_waypoints
+ )
+ print(f" {task.drone_id} → WP {task.wp_index}/{task.total_waypoints} "
+ f"(距離: {distance:.1f}m)")
+ # 更新目標
+ tgt_lat, tgt_lon, tgt_alt = task.current_target
+
+ # 發送 setpoint
+ self.sender.send_position_global(
+ task.sysid, tgt_lat, tgt_lon, tgt_alt
+ )
+
+ # 全部完成檢查
+ if all_done and self.tasks:
+ self._timer.stop()
+ self.state = MissionState.IDLE
+ self.mission_completed.emit()
+ print("===== 任務全部完成 =====")
+
+
+# ------------------------------------------------------------------ 工具函式
+
+def _haversine(lat1, lon1, lat2, lon2):
+ """計算兩個 GPS 座標間的水平距離 (公尺)"""
+ R = 6371000.0
+ lat1_r = math.radians(lat1)
+ lat2_r = math.radians(lat2)
+ dlat = math.radians(lat2 - lat1)
+ dlon = math.radians(lon2 - lon1)
+
+ a = (math.sin(dlat / 2) ** 2 +
+ math.cos(lat1_r) * math.cos(lat2_r) * math.sin(dlon / 2) ** 2)
+ return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
\ No newline at end of file
diff --git a/src/GUI/mission_planner.py b/src/GUI/mission_planner.py
index 6a04f88..239529e 100644
--- a/src/GUI/mission_planner.py
+++ b/src/GUI/mission_planner.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
飛行任務規劃模組
-新增:Circle Formation 和 Leader-Follower
+支援: M-Formation, Circle, Leader-Follower (Bezier 轉彎), Grid Sweep
"""
import math
from typing import List, Tuple, Optional, Dict, Any
@@ -13,166 +13,172 @@ class MissionType(Enum):
M_FORMATION = "m_formation"
CIRCLE_FORMATION = "circle_formation"
LEADER_FOLLOWER = "leader_follower"
+ GRID_SWEEP = "grid_sweep"
class CoordinateConverter:
"""GPS 座標與 Local 座標的轉換器"""
-
+
def __init__(self, origin_lat: float, origin_lon: float, origin_alt: float = 0):
self.origin_lat = origin_lat
self.origin_lon = origin_lon
self.origin_alt = origin_alt
self.R = 6371000.0
-
+
def gps_to_local(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]:
lat_rad = math.radians(lat)
lon_rad = math.radians(lon)
origin_lat_rad = math.radians(self.origin_lat)
origin_lon_rad = math.radians(self.origin_lon)
-
+
dlat = lat_rad - origin_lat_rad
dlon = lon_rad - origin_lon_rad
-
+
x = self.R * dlon * math.cos((lat_rad + origin_lat_rad) / 2)
y = self.R * dlat
z = alt - self.origin_alt
-
+
return x, y, z
-
+
def local_to_gps(self, x: float, y: float, z: float) -> Tuple[float, float, float]:
origin_lat_rad = math.radians(self.origin_lat)
origin_lon_rad = math.radians(self.origin_lon)
-
+
lat_rad = origin_lat_rad + (y / self.R)
lon_rad = origin_lon_rad + (x / (self.R * math.cos((lat_rad + origin_lat_rad) / 2)))
-
+
lat = math.degrees(lat_rad)
lon = math.degrees(lon_rad)
alt = self.origin_alt + z
-
+
return lat, lon, alt
class FormationPlanner:
"""隊形規劃器"""
-
- def __init__(self, spacing: float = 5.0,
- base_altitude: float = 10.0,
+
+ def __init__(self, spacing: float = 5.0,
+ base_altitude: float = 10.0,
altitude_diff: float = 2.0):
self.spacing = spacing
self.base_altitude = base_altitude
self.altitude_diff = altitude_diff
self.current_origin = None
self.converter = None
-
- def plan_formation_mission(self,
- drone_gps_positions: List[Tuple[float, float, float]],
- target_gps: Tuple[float, float, float],
- mission_type: MissionType = MissionType.M_FORMATION,
- params: Optional[Dict[str, Any]] = None) -> Tuple[
- List[Tuple[float, float, float]],
- List[Tuple[float, float, float]],
- Tuple[float, float, float]]:
- """
- 規劃兩階段隊形任務
-
- Args:
- drone_gps_positions: 當前無人機 GPS 位置列表
- target_gps: 目標點 GPS 座標
- mission_type: 任務類型
- params: 任務參數
-
- Returns:
- (stage1_gps, stage2_gps, origin)
- """
+
+ def plan_formation_mission(self,
+ drone_gps_positions: List[Tuple[float, float, float]],
+ target_gps: Tuple[float, float, float],
+ mission_type: MissionType = MissionType.M_FORMATION,
+ params: Optional[Dict[str, Any]] = None
+ ) -> Tuple[List[List[Tuple[float, float, float]]],
+ Tuple[float, float, float]]:
if len(drone_gps_positions) == 0:
raise ValueError("無人機位置列表不能為空")
-
- # 計算中央點
+
center_lat = sum(pos[0] for pos in drone_gps_positions) / len(drone_gps_positions)
center_lon = sum(pos[1] for pos in drone_gps_positions) / len(drone_gps_positions)
center_alt = sum(pos[2] for pos in drone_gps_positions) / len(drone_gps_positions)
-
+
self.current_origin = (center_lat, center_lon, center_alt)
self.converter = CoordinateConverter(center_lat, center_lon, center_alt)
-
- # 轉換到 Local 座標
+
drone_local = [self.converter.gps_to_local(*pos) for pos in drone_gps_positions]
target_local = self.converter.gps_to_local(*target_gps)
-
- # 計算隊形
+
if mission_type == MissionType.M_FORMATION:
- stage1_local, stage2_local = self._calculate_m_formation(drone_local, target_local, params)
+ s1, s2 = self._calculate_m_formation(drone_local, target_local, params)
+ waypoints_local = [[s1[i], s2[i]] for i in range(len(drone_local))]
+
elif mission_type == MissionType.CIRCLE_FORMATION:
- stage1_local, stage2_local = self._calculate_circle_formation(drone_local, target_local, params)
+ s1, s2 = self._calculate_circle_formation(drone_local, target_local, params)
+ waypoints_local = [[s1[i], s2[i]] for i in range(len(drone_local))]
+
elif mission_type == MissionType.LEADER_FOLLOWER:
- stage1_local, stage2_local = self._calculate_leader_follower(drone_local, target_local, params)
+ params = params or {}
+ route_wps_gps = params.get('route_waypoints')
+ if route_wps_gps is None or len(route_wps_gps) < 2:
+ raise ValueError("LEADER_FOLLOWER 需要至少 2 個路徑點")
+ route_wps_local = [
+ self.converter.gps_to_local(wp[0], wp[1], 0)[:2]
+ for wp in route_wps_gps
+ ]
+ waypoints_local = self._calculate_leader_follower(drone_local, route_wps_local, params)
+
+ elif mission_type == MissionType.GRID_SWEEP:
+ params = params or {}
+ rect_corners_gps = params.get('rect_corners')
+ if rect_corners_gps is None or len(rect_corners_gps) != 4:
+ raise ValueError("GRID_SWEEP 需要 4 個 GPS 角點")
+ rect_corners_local = [
+ self.converter.gps_to_local(c[0], c[1], 0)[:2]
+ for c in rect_corners_gps
+ ]
+ waypoints_local = self._calculate_grid_sweep(drone_local, rect_corners_local, params)
else:
raise ValueError(f"不支援的任務類型: {mission_type}")
-
- # 轉回 GPS
- stage1_gps = [self.converter.local_to_gps(*pos) for pos in stage1_local]
- stage2_gps = [self.converter.local_to_gps(*pos) for pos in stage2_local]
-
- return stage1_gps, stage2_gps, self.current_origin
-
+
+ waypoints_gps = []
+ for drone_wps in waypoints_local:
+ gps_wps = [self.converter.local_to_gps(*wp) for wp in drone_wps]
+ waypoints_gps.append(gps_wps)
+
+ return waypoints_gps, self.current_origin
+
+ # ------------------------------------------------------------------ M-Formation
+
def _calculate_m_formation(self, drone_positions, target_point, params):
- """M字形編隊(原本的邏輯)"""
params = params or {}
N = len(drone_positions)
spacing = params.get('spacing', self.spacing)
base_altitude = params.get('base_altitude', self.base_altitude)
altitude_diff = params.get('altitude_diff', self.altitude_diff)
-
+
C_x = sum(pos[0] for pos in drone_positions) / N
C_y = sum(pos[1] for pos in drone_positions) / N
- C_z = sum(pos[2] for pos in drone_positions) / N
-
+
T_x, T_y, T_z = target_point
V_x, V_y = T_x - C_x, T_y - C_y
P_x, P_y = -V_y, V_x
-
- length = math.sqrt(P_x**2 + P_y**2)
+
+ length = math.sqrt(P_x ** 2 + P_y ** 2)
P_x_unit, P_y_unit = (P_x / length, P_y / length) if length > 0.01 else (1.0, 0.0)
-
- projections = [((pos[0] - C_x) * P_x_unit + (pos[1] - C_y) * P_y_unit, i)
+
+ projections = [((pos[0] - C_x) * P_x_unit + (pos[1] - C_y) * P_y_unit, i)
for i, pos in enumerate(drone_positions)]
projections.sort()
-
+
stage1_positions = [None] * N
stage2_positions = [None] * N
-
+
for rank, (_, original_idx) in enumerate(projections):
offset = (rank - (N - 1) / 2) * spacing
altitude = base_altitude + (altitude_diff if rank % 2 == 0 else -altitude_diff)
-
stage1_positions[original_idx] = (C_x + P_x_unit * offset, C_y + P_y_unit * offset, altitude)
stage2_positions[original_idx] = (T_x + P_x_unit * offset, T_y + P_y_unit * offset, altitude)
-
+
return stage1_positions, stage2_positions
-
+
+ # ------------------------------------------------------------------ Circle
+
def _calculate_circle_formation(self, drone_positions, target_point, params):
- """
- 圓形編隊
- params: radius(10.0), altitude(10.0), start_angle(0.0)
- """
params = params or {}
N = len(drone_positions)
radius = params.get('radius', 10.0)
altitude = params.get('altitude', 10.0)
start_angle = params.get('start_angle', 0.0)
-
+
center_x, center_y, center_z = target_point
stage1_positions = []
stage2_positions = []
angle_step = 360.0 / N
-
+
for i in range(N):
angle_rad = math.radians(start_angle + angle_step * i)
final_x = center_x + radius * math.cos(angle_rad)
final_y = center_y + radius * math.sin(angle_rad)
final_z = altitude
-
+
current_x, current_y, current_z = drone_positions[i]
stage1_positions.append((
current_x + (final_x - current_x) * 0.5,
@@ -180,71 +186,353 @@ class FormationPlanner:
current_z + (final_z - current_z) * 0.5
))
stage2_positions.append((final_x, final_y, final_z))
-
+
return stage1_positions, stage2_positions
-
- def _calculate_leader_follower(self, drone_positions, target_point, params):
+
+ # ------------------------------------------------------------------ 路徑跟隨 (Bezier 轉彎)
+
+ def _calculate_leader_follower(self, drone_positions, route_wps_local, params):
"""
- Leader-Follower 編隊
- params: follow_distance(15.0), altitude_offset(1.0)
+ 路徑跟隨編隊 — Bezier 曲線轉彎版
+
+ 步驟:
+ 1. _build_center_path: 在轉折 WP 處用二次 Bezier 曲線平滑轉彎
+ 2. 固定排序,每架無人機沿中心路徑套用橫向+縱向偏移
+
+ 隊形 (俯視):
+ | (前進方向) |
+ | D1 | ← 左偏移, 後 0m
+ | D2 | ← 右偏移, 後 5m
+ | D3 | ← 左偏移, 後 10m
+ | D4 | ← 右偏移, 後 15m
"""
- params = params or {}
N = len(drone_positions)
- follow_distance = params.get('follow_distance', 15.0) # 預設 15m
- altitude_offset = params.get('altitude_offset', 1.0)
-
- leader_x, leader_y, leader_z = drone_positions[0]
- target_x, target_y, target_z = target_point
-
- direction_x = target_x - leader_x
- direction_y = target_y - leader_y
- direction_length = math.sqrt(direction_x**2 + direction_y**2)
-
- dir_unit_x, dir_unit_y = (direction_x / direction_length, direction_y / direction_length) \
- if direction_length > 0.01 else (0.0, 1.0)
-
- stage1_positions = []
- stage2_positions = []
-
- # Leader
- leader_stage1 = (leader_x + direction_x * 0.3, leader_y + direction_y * 0.3, target_z)
- stage1_positions.append(leader_stage1)
- stage2_positions.append(target_point)
-
- # Followers
- for i in range(1, N):
- back_offset = follow_distance * i
- height_offset = altitude_offset * (i % 2)
-
- current_x, current_y, current_z = drone_positions[i]
- follow1_x = leader_stage1[0] - dir_unit_x * back_offset
- follow1_y = leader_stage1[1] - dir_unit_y * back_offset
- follow1_z = leader_stage1[2] + height_offset
-
- stage1_positions.append((
- current_x + (follow1_x - current_x) * 0.5,
- current_y + (follow1_y - current_y) * 0.5,
- current_z + (follow1_z - current_z) * 0.5
- ))
-
- stage2_positions.append((
- target_x - dir_unit_x * back_offset,
- target_y - dir_unit_y * back_offset,
- target_z + height_offset
- ))
-
- return stage1_positions, stage2_positions
+ lateral_offset = params.get('lateral_offset', 3.0)
+ longitudinal_spacing = params.get('longitudinal_spacing', 5.0)
+ altitude = params.get('altitude', self.base_altitude)
+ turn_margin = params.get('turn_margin', 0.35) # 轉彎切入距離佔段長比例
+ curve_resolution = params.get('curve_resolution', 8) # 每個彎道的插值點數
+
+ # Step 1: 建立帶 Bezier 轉彎的中心路徑
+ center_path = self._build_center_path(
+ route_wps_local, turn_margin, curve_resolution
+ )
+
+ # Step 2: 固定排序
+ first_dir = (center_path[0][2], center_path[0][3])
+ first_perp = (-first_dir[1], first_dir[0])
+ C_x = sum(p[0] for p in drone_positions) / N
+ C_y = sum(p[1] for p in drone_positions) / N
+
+ projections = [
+ ((pos[0] - C_x) * first_perp[0] + (pos[1] - C_y) * first_perp[1], i)
+ for i, pos in enumerate(drone_positions)
+ ]
+ projections.sort()
+
+ # Step 3: 偏移
+ all_waypoints = [None] * N
+
+ for rank, (_, original_idx) in enumerate(projections):
+ lat_sign = -1 if rank % 2 == 0 else 1
+ lat = lat_sign * lateral_offset
+ lon = rank * longitudinal_spacing
+
+ waypoints = []
+ for (cx, cy, dx, dy) in center_path:
+ perp_x, perp_y = -dy, dx
+ ox = cx + lat * perp_x - lon * dx
+ oy = cy + lat * perp_y - lon * dy
+ waypoints.append((ox, oy, altitude))
+
+ all_waypoints[original_idx] = waypoints
+ return all_waypoints
+ def _build_center_path(self, waypoints, turn_margin, curve_resolution):
+ """
+ 建立帶 Bezier 曲線轉彎的中心路徑
+
+ 在每個轉折 WP 處:
+ 1. 計算 pre_turn = WP - d_in × T
+ 2. 計算 post_turn = WP + d_out × T
+ 3. 用二次 Bezier 曲線: B(t) = (1-t)²·pre + 2t(1-t)·WP + t²·post
+ 4. 方向從導數得到: B'(t) = 2(1-t)(WP-pre) + 2t(post-WP)
+
+ Args:
+ waypoints: 路徑航點 [(x, y), ...]
+ turn_margin: 轉彎切入距離佔相鄰段長的比例 (0~0.5)
+ curve_resolution: 每個彎道的 Bezier 插值點數
+
+ Returns:
+ [(x, y, dir_x, dir_y), ...] 中心路徑點 + 單位方向
+ """
+ num_wps = len(waypoints)
+
+ if num_wps < 2:
+ return [(waypoints[0][0], waypoints[0][1], 0.0, 1.0)]
+
+ # 計算每段方向和長度
+ segments = []
+ for i in range(num_wps - 1):
+ dx = waypoints[i + 1][0] - waypoints[i][0]
+ dy = waypoints[i + 1][1] - waypoints[i][1]
+ length = math.sqrt(dx ** 2 + dy ** 2)
+ if length < 0.01:
+ segments.append((0.0, 1.0, length))
+ else:
+ segments.append((dx / length, dy / length, length))
+
+ path = []
+
+ # 第一個航點
+ path.append((waypoints[0][0], waypoints[0][1],
+ segments[0][0], segments[0][1]))
+
+ # 中間航點 (轉彎處)
+ for i in range(1, num_wps - 1):
+ d_in_x, d_in_y, len_in = segments[i - 1]
+ d_out_x, d_out_y, len_out = segments[i]
+
+ # 切入距離 T: 取相鄰兩段中較短的 × turn_margin
+ T = min(len_in, len_out) * turn_margin
+
+ if T < 0.5:
+ # 段太短,不插彎,直接加一個平均方向點
+ avg_dx = d_in_x + d_out_x
+ avg_dy = d_in_y + d_out_y
+ avg_len = math.sqrt(avg_dx ** 2 + avg_dy ** 2)
+ if avg_len > 0.01:
+ avg_dx /= avg_len
+ avg_dy /= avg_len
+ else:
+ avg_dx, avg_dy = d_in_x, d_in_y
+ path.append((waypoints[i][0], waypoints[i][1], avg_dx, avg_dy))
+ continue
+
+ # P0 = pre_turn (弧線起始)
+ p0_x = waypoints[i][0] - d_in_x * T
+ p0_y = waypoints[i][1] - d_in_y * T
+
+ # P1 = WP 本身 (控制點)
+ p1_x = waypoints[i][0]
+ p1_y = waypoints[i][1]
+
+ # P2 = post_turn (弧線結束)
+ p2_x = waypoints[i][0] + d_out_x * T
+ p2_y = waypoints[i][1] + d_out_y * T
+
+ # 加入 pre_turn 點 (方向 = incoming)
+ path.append((p0_x, p0_y, d_in_x, d_in_y))
+
+ # 加入 Bezier 插值點
+ for j in range(1, curve_resolution):
+ t = j / curve_resolution
+
+ # B(t) = (1-t)²·P0 + 2t(1-t)·P1 + t²·P2
+ one_minus_t = 1.0 - t
+ bx = one_minus_t * one_minus_t * p0_x + \
+ 2 * t * one_minus_t * p1_x + \
+ t * t * p2_x
+ by = one_minus_t * one_minus_t * p0_y + \
+ 2 * t * one_minus_t * p1_y + \
+ t * t * p2_y
+
+ # B'(t) = 2(1-t)(P1-P0) + 2t(P2-P1) → 切線方向
+ tdx = 2 * one_minus_t * (p1_x - p0_x) + 2 * t * (p2_x - p1_x)
+ tdy = 2 * one_minus_t * (p1_y - p0_y) + 2 * t * (p2_y - p1_y)
+
+ # 正規化
+ t_len = math.sqrt(tdx ** 2 + tdy ** 2)
+ if t_len > 0.01:
+ tdx /= t_len
+ tdy /= t_len
+ else:
+ tdx, tdy = d_in_x, d_in_y
+
+ path.append((bx, by, tdx, tdy))
+
+ # 加入 post_turn 點 (方向 = outgoing)
+ path.append((p2_x, p2_y, d_out_x, d_out_y))
+
+ # 最後一個航點
+ path.append((waypoints[-1][0], waypoints[-1][1],
+ segments[-1][0], segments[-1][1]))
+
+ return path
+
+ # ------------------------------------------------------------------ 柵狀掃描
+
+ def _calculate_grid_sweep(self, drone_positions, rect_corners_local, params):
+ """柵狀掃描:掃描方向沿矩形長邊,切割方向沿短邊"""
+ N = len(drone_positions)
+ line_spacing = params.get('line_spacing', 5.0)
+ altitude = params.get('altitude', self.base_altitude)
+
+ c0, c1, c2, c3 = rect_corners_local
+
+ edge_a = (c1[0] - c0[0], c1[1] - c0[1])
+ len_a = math.sqrt(edge_a[0] ** 2 + edge_a[1] ** 2)
+ edge_b = (c3[0] - c0[0], c3[1] - c0[1])
+ len_b = math.sqrt(edge_b[0] ** 2 + edge_b[1] ** 2)
+
+ if len_a >= len_b:
+ sweep_vec = edge_a
+ sweep_len = len_a
+ cut_vec = edge_b
+ cut_len = len_b
+ sweep_start_mid = ((c0[0] + c3[0]) / 2, (c0[1] + c3[1]) / 2)
+ sweep_end_mid = ((c1[0] + c2[0]) / 2, (c1[1] + c2[1]) / 2)
+ cut_start_corner = c0
+ else:
+ sweep_vec = edge_b
+ sweep_len = len_b
+ cut_vec = edge_a
+ cut_len = len_a
+ sweep_start_mid = ((c0[0] + c1[0]) / 2, (c0[1] + c1[1]) / 2)
+ sweep_end_mid = ((c3[0] + c2[0]) / 2, (c3[1] + c2[1]) / 2)
+ cut_start_corner = c0
+
+ sweep_dir = (sweep_vec[0] / sweep_len, sweep_vec[1] / sweep_len)
+ cut_dir = (cut_vec[0] / cut_len, cut_vec[1] / cut_len)
+
+ C_x = sum(p[0] for p in drone_positions) / N
+ C_y = sum(p[1] for p in drone_positions) / N
+
+ dist_to_start = math.sqrt(
+ (C_x - sweep_start_mid[0]) ** 2 + (C_y - sweep_start_mid[1]) ** 2
+ )
+ dist_to_end = math.sqrt(
+ (C_x - sweep_end_mid[0]) ** 2 + (C_y - sweep_end_mid[1]) ** 2
+ )
+
+ if dist_to_start <= dist_to_end:
+ near_corner_a = cut_start_corner
+ else:
+ sweep_dir = (-sweep_dir[0], -sweep_dir[1])
+ near_corner_a = (cut_start_corner[0] + sweep_vec[0],
+ cut_start_corner[1] + sweep_vec[1])
+
+ projections = [
+ ((pos[0] - C_x) * cut_dir[0] + (pos[1] - C_y) * cut_dir[1], i)
+ for i, pos in enumerate(drone_positions)
+ ]
+ projections.sort()
+
+ strip_width = cut_len / N
+ drone_sweep_proj = C_x * sweep_dir[0] + C_y * sweep_dir[1]
+ near_sweep_proj = near_corner_a[0] * sweep_dir[0] + near_corner_a[1] * sweep_dir[1]
+ gather_sweep_proj = (drone_sweep_proj + near_sweep_proj) / 2
+
+ all_waypoints = [None] * N
+
+ for rank, (_, original_idx) in enumerate(projections):
+ strip_center_offset = (rank + 0.5) * strip_width
+ base_x = near_corner_a[0] + cut_dir[0] * strip_center_offset
+ base_y = near_corner_a[1] + cut_dir[1] * strip_center_offset
+
+ waypoints_list = []
+
+ gather_offset = gather_sweep_proj - near_sweep_proj
+ gx = base_x + sweep_dir[0] * gather_offset
+ gy = base_y + sweep_dir[1] * gather_offset
+ waypoints_list.append((gx, gy, altitude))
+
+ num_lines = max(1, int(strip_width / line_spacing))
+ actual_spacing = strip_width / num_lines
+ first_line_offset = (rank * strip_width) + actual_spacing / 2
+
+ entry_x = near_corner_a[0] + cut_dir[0] * first_line_offset
+ entry_y = near_corner_a[1] + cut_dir[1] * first_line_offset
+ waypoints_list.append((entry_x, entry_y, altitude))
+
+ current_cut_offset = first_line_offset
+
+ for line_idx in range(num_lines):
+ line_near_x = near_corner_a[0] + cut_dir[0] * current_cut_offset
+ line_near_y = near_corner_a[1] + cut_dir[1] * current_cut_offset
+ line_far_x = line_near_x + sweep_dir[0] * sweep_len
+ line_far_y = line_near_y + sweep_dir[1] * sweep_len
+
+ if line_idx % 2 == 0:
+ waypoints_list.append((line_far_x, line_far_y, altitude))
+ else:
+ waypoints_list.append((line_near_x, line_near_y, altitude))
+
+ if line_idx < num_lines - 1:
+ next_cut_offset = current_cut_offset + actual_spacing
+ next_near_x = near_corner_a[0] + cut_dir[0] * next_cut_offset
+ next_near_y = near_corner_a[1] + cut_dir[1] * next_cut_offset
+ next_far_x = next_near_x + sweep_dir[0] * sweep_len
+ next_far_y = next_near_y + sweep_dir[1] * sweep_len
+
+ if line_idx % 2 == 0:
+ waypoints_list.append((next_far_x, next_far_y, altitude))
+ else:
+ waypoints_list.append((next_near_x, next_near_y, altitude))
+ current_cut_offset = next_cut_offset
+
+ all_waypoints[original_idx] = waypoints_list
+
+ return all_waypoints
+
+
+# ================================================================================
# 測試
+# ================================================================================
if __name__ == "__main__":
planner = FormationPlanner()
-
- drones = [(24.123450, 120.567800, 100.0), (24.123470, 120.567820, 102.0), (24.123440, 120.567810, 98.0)]
- target = (24.123600, 120.568000, 105.0)
-
- # Leader-Follower (15m 間距)
- s1, s2, origin = planner.plan_formation_mission(drones, target, MissionType.LEADER_FOLLOWER, {'follow_distance': 15.0})
- print("Leader-Follower (15m):")
- for i, pos in enumerate(s2):
- print(f" {'Leader' if i == 0 else f'Follower {i}'}: {pos[0]:.6f}, {pos[1]:.6f}")
\ No newline at end of file
+
+ drones = [
+ (24.123450, 120.567800, 0.0),
+ (24.123470, 120.567820, 0.0),
+ (24.123440, 120.567810, 0.0),
+ (24.123460, 120.567830, 0.0),
+ ]
+ target = (24.12400, 120.56795, 10.0)
+
+ # M-Formation
+ wps, origin = planner.plan_formation_mission(drones, target, MissionType.M_FORMATION)
+ print("M-Formation:")
+ for i, wp_list in enumerate(wps):
+ print(f" Drone {i}: {len(wp_list)} waypoints")
+
+ # Leader-Follower (Bezier 轉彎)
+ route = [
+ (24.12345, 120.56780),
+ (24.12370, 120.56800),
+ (24.12390, 120.56820),
+ (24.12400, 120.56800),
+ (24.12420, 120.56790),
+ ]
+ wps_lf, origin_lf = planner.plan_formation_mission(
+ drones, target, MissionType.LEADER_FOLLOWER,
+ params={
+ 'route_waypoints': route,
+ 'lateral_offset': 3.0,
+ 'longitudinal_spacing': 5.0,
+ 'turn_margin': 0.35,
+ 'curve_resolution': 8,
+ 'altitude': 10.0
+ }
+ )
+ print(f"\nLeader-Follower (Bezier turns):")
+ for i, wp_list in enumerate(wps_lf):
+ print(f" Drone {i}: {len(wp_list)} waypoints")
+ for j, wp in enumerate(wp_list):
+ print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f})")
+
+ # Grid Sweep
+ rect = [
+ (24.1237, 120.5679),
+ (24.1237, 120.5683),
+ (24.1240, 120.5683),
+ (24.1240, 120.5679)
+ ]
+ wps2, origin2 = planner.plan_formation_mission(
+ drones, target, MissionType.GRID_SWEEP,
+ params={'rect_corners': rect, 'line_spacing': 5.0, 'altitude': 10.0}
+ )
+ print(f"\nGrid Sweep:")
+ for i, wp_list in enumerate(wps2):
+ print(f" Drone {i}: {len(wp_list)} waypoints")
\ No newline at end of file
diff --git a/src/GUI/validation/test_mission.py b/src/GUI/validation/test_mission.py
new file mode 100644
index 0000000..22b6e22
--- /dev/null
+++ b/src/GUI/validation/test_mission.py
@@ -0,0 +1,299 @@
+#!/usr/bin/env python3
+"""
+獨立測試腳本 — 驗證 MissionExecutor + MavlinkSender 在 SITL 環境下的運作
+
+使用方式:
+ 1. 啟動 SITL
+ 2. 修改下方 CONFIG 區塊
+ 3. python3 test_mission.py
+"""
+import sys, os
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
+
+import time
+from pymavlink import mavutil
+from PyQt6.QtWidgets import QApplication
+from PyQt6.QtCore import QTimer
+
+from mission_planner import FormationPlanner, MissionType
+from command_sender import MavlinkSender
+from mission_executor import MissionExecutor, MissionState
+
+
+# ================================================================================
+# CONFIG
+# ================================================================================
+
+# 接收用連線 (讀取無人機狀態)
+RECV_CONNECTION = "udp:127.0.0.1:14550"
+
+# 發送用連線 (發送 setpoint 指令)
+SEND_CONNECTION = "udpout:127.0.0.1:14550"
+
+# 要控制的無人機 sysid 列表
+DRONE_SYSIDS = [1]
+
+# 起飛高度 (公尺)
+TAKEOFF_ALT = 10.0
+
+# 任務規劃參數
+FORMATION_SPACING = 5.0
+BASE_ALTITUDE = 10.0
+ALTITUDE_DIFF = 2.0
+ARRIVAL_RADIUS = 2.0
+
+# 測試模式: "formation" 或 "grid_sweep"
+TEST_MODE = "formation"
+
+# Grid Sweep 專用設定
+GRID_LINE_SPACING = 5.0
+
+# ================================================================================
+
+
+class SITLDroneManager:
+ """管理 SITL 無人機的連線、起飛前置作業"""
+
+ def __init__(self, connection_string, sysids):
+ self.connection_string = connection_string
+ self.sysids = sysids
+ self.mav = None
+ self.drone_gps = {}
+
+ def connect(self):
+ """建立 MAVLink 連線並等待心跳"""
+ print(f"連線到 {self.connection_string} ...")
+ self.mav = mavutil.mavlink_connection(self.connection_string)
+ self.mav.wait_heartbeat()
+ print(f"已收到心跳: sysid={self.mav.target_system}, compid={self.mav.target_component}")
+
+ def set_guided_and_arm(self, sysid):
+ """切換到 GUIDED 模式並解鎖"""
+ print(f"\n--- sysid={sysid}: 切換 GUIDED + 解鎖 ---")
+
+ # 切換 GUIDED 模式
+ self.mav.mav.command_long_send(
+ sysid, 1,
+ mavutil.mavlink.MAV_CMD_DO_SET_MODE,
+ 0,
+ mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
+ 4, # GUIDED = 4
+ 0, 0, 0, 0, 0
+ )
+ time.sleep(1)
+
+ # 解鎖
+ self.mav.mav.command_long_send(
+ sysid, 1,
+ mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
+ 0, 1, 0, 0, 0, 0, 0, 0
+ )
+ time.sleep(1)
+ print(f" sysid={sysid}: GUIDED + ARMED")
+
+ def takeoff(self, sysid, altitude):
+ """起飛到指定高度"""
+ print(f" sysid={sysid}: 起飛到 {altitude}m ...")
+ self.mav.mav.command_long_send(
+ sysid, 1,
+ mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
+ 0, 0, 0, 0, 0, 0, 0, altitude
+ )
+
+ def wait_for_altitude(self, sysid, target_alt, timeout=30):
+ """等待無人機到達指定高度"""
+ print(f" sysid={sysid}: 等待到達 {target_alt}m ...")
+ start = time.time()
+ while time.time() - start < timeout:
+ msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
+ if msg and msg.get_srcSystem() == sysid:
+ alt = msg.relative_alt / 1000.0
+ if alt >= target_alt * 0.9:
+ print(f" sysid={sysid}: 已到達 {alt:.1f}m")
+ return True
+ print(f" sysid={sysid}: 等待超時!")
+ return False
+
+ def update_gps_once(self):
+ """讀取一輪 GPS 資料更新 drone_gps"""
+ deadline = time.time() + 3
+ received = set()
+ while time.time() < deadline and len(received) < len(self.sysids):
+ msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
+ if msg is None:
+ continue
+ sid = msg.get_srcSystem()
+ if sid in self.sysids:
+ drone_id = f"s0_{sid}"
+ self.drone_gps[drone_id] = {
+ 'lat': msg.lat / 1e7,
+ 'lon': msg.lon / 1e7,
+ 'alt': msg.relative_alt / 1000.0
+ }
+ received.add(sid)
+
+ for sid in self.sysids:
+ drone_id = f"s0_{sid}"
+ if drone_id in self.drone_gps:
+ gps = self.drone_gps[drone_id]
+ print(f" {drone_id}: ({gps['lat']:.6f}, {gps['lon']:.6f}, {gps['alt']:.1f}m)")
+ else:
+ print(f" {drone_id}: 尚未收到 GPS")
+
+ def start_gps_polling(self, interval_ms=200):
+ """啟動定時 GPS 輪詢 (用 QTimer)"""
+ self._gps_timer = QTimer()
+ self._gps_timer.timeout.connect(self._poll_gps)
+ self._gps_timer.start(interval_ms)
+
+ def _poll_gps(self):
+ """非阻塞方式讀取最新 GPS"""
+ while True:
+ msg = self.mav.recv_match(type='GLOBAL_POSITION_INT', blocking=False)
+ if msg is None:
+ break
+ sid = msg.get_srcSystem()
+ if sid in self.sysids:
+ drone_id = f"s0_{sid}"
+ self.drone_gps[drone_id] = {
+ 'lat': msg.lat / 1e7,
+ 'lon': msg.lon / 1e7,
+ 'alt': msg.relative_alt / 1000.0
+ }
+
+
+def main():
+ # 建立 Qt 應用 (MissionExecutor 需要 QTimer)
+ app = QApplication(sys.argv)
+
+ # 連線 + 起飛前置作業
+ manager = SITLDroneManager(RECV_CONNECTION, DRONE_SYSIDS)
+ manager.connect()
+
+ for sysid in DRONE_SYSIDS:
+ manager.set_guided_and_arm(sysid)
+ manager.takeoff(sysid, TAKEOFF_ALT)
+
+ # 等待所有無人機到達起飛高度
+ for sysid in DRONE_SYSIDS:
+ manager.wait_for_altitude(sysid, TAKEOFF_ALT)
+
+ time.sleep(2)
+
+ # 讀取當前 GPS 位置
+ print("\n讀取當前 GPS 位置 ...")
+ manager.update_gps_once()
+
+ drone_ids = [f"s0_{sid}" for sid in DRONE_SYSIDS]
+ drone_gps_positions = []
+ for drone_id in drone_ids:
+ gps = manager.drone_gps.get(drone_id)
+ if gps is None:
+ print(f"錯誤: 讀不到 {drone_id} 的 GPS")
+ return
+ drone_gps_positions.append((gps['lat'], gps['lon'], gps['alt']))
+
+ # 規劃任務
+ print(f"\n規劃任務 (模式: {TEST_MODE}) ...")
+ planner = FormationPlanner(
+ spacing=FORMATION_SPACING,
+ base_altitude=BASE_ALTITUDE,
+ altitude_diff=ALTITUDE_DIFF
+ )
+
+ center_lat = drone_gps_positions[0][0]
+ center_lon = drone_gps_positions[0][1]
+
+ if TEST_MODE == "formation":
+ target_lat = center_lat + 30.0 / 111000.0
+ target_lon = center_lon
+ target_gps = (target_lat, target_lon, BASE_ALTITUDE)
+ print(f" 目標點: ({target_lat:.6f}, {target_lon:.6f})")
+
+ waypoints_per_drone, origin = planner.plan_formation_mission(
+ drone_gps_positions, target_gps, MissionType.M_FORMATION
+ )
+
+ elif TEST_MODE == "grid_sweep":
+ # 在無人機前方 30m 處建立 40m x 30m 的矩形
+ offset_lat = 30.0 / 111000.0
+ half_w = 20.0 / (111000.0 * 0.9)
+ half_h = 15.0 / 111000.0
+
+ rect_center_lat = center_lat + offset_lat
+ rect_center_lon = center_lon
+
+ rect_corners = [
+ (rect_center_lat - half_h, rect_center_lon - half_w),
+ (rect_center_lat - half_h, rect_center_lon + half_w),
+ (rect_center_lat + half_h, rect_center_lon + half_w),
+ (rect_center_lat + half_h, rect_center_lon - half_w),
+ ]
+ target_gps = (rect_center_lat, rect_center_lon, BASE_ALTITUDE)
+ print(f" 矩形中心: ({rect_center_lat:.6f}, {rect_center_lon:.6f})")
+
+ waypoints_per_drone, origin = planner.plan_formation_mission(
+ drone_gps_positions, target_gps, MissionType.GRID_SWEEP,
+ params={
+ 'rect_corners': rect_corners,
+ 'line_spacing': GRID_LINE_SPACING,
+ 'altitude': BASE_ALTITUDE
+ }
+ )
+ else:
+ print(f"未知測試模式: {TEST_MODE}")
+ return
+
+ planned_waypoints = {
+ 'drone_ids': drone_ids,
+ 'waypoints': waypoints_per_drone
+ }
+
+ # 印出規劃結果
+ for i, did in enumerate(drone_ids):
+ wps = waypoints_per_drone[i]
+ print(f" {did}: {len(wps)} 個航點")
+ for j, wp in enumerate(wps):
+ print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f}m)")
+
+ # 啟動任務
+ print("\n啟動任務 ...")
+ manager.start_gps_polling(interval_ms=200)
+
+ sender = MavlinkSender(SEND_CONNECTION)
+ executor = MissionExecutor(
+ sender=sender,
+ drone_gps=manager.drone_gps,
+ arrival_radius=ARRIVAL_RADIUS,
+ send_rate_hz=2.0
+ )
+
+ executor.drone_waypoint_reached.connect(
+ lambda did, idx, total: print(f"\n >> {did} 到達 WP {idx}/{total}")
+ )
+ executor.mission_completed.connect(
+ lambda: (print("\n===== 任務全部完成 ====="), app.quit())
+ )
+
+ # 設定超時自動退出
+ timeout_timer = QTimer()
+ timeout_timer.setSingleShot(True)
+ timeout_timer.timeout.connect(lambda: (
+ print("\n⚠ 測試超時,強制退出"),
+ executor.stop(),
+ app.quit()
+ ))
+ timeout_timer.start(180_000) # 180 秒超時
+
+ executor.start(planned_waypoints)
+
+ print("進入事件迴圈 (等待任務完成或 180 秒超時) ...\n")
+ app.exec()
+
+ executor.stop()
+ sender.close()
+ print("測試結束")
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/src/GUI/validation/verify_waypoints.py b/src/GUI/validation/verify_waypoints.py
new file mode 100644
index 0000000..fe2d855
--- /dev/null
+++ b/src/GUI/validation/verify_waypoints.py
@@ -0,0 +1,482 @@
+#!/usr/bin/env python3
+"""
+任務規劃視覺化驗證工具(含動畫模擬)
+
+使用方式:
+ 1. 由 GUI 自動觸發: python3 verify_waypoints.py --file /tmp/mission_plan.json
+ 2. 獨立手動執行: python3 verify_waypoints.py
+
+操作:
+ - 啟動後先顯示靜態航點圖
+ - 點擊圖下方的按鈕控制動畫
+"""
+import sys, os
+sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
+
+import json
+import argparse
+import math
+import matplotlib
+matplotlib.use('TkAgg')
+import matplotlib.pyplot as plt
+import matplotlib.animation as animation
+from matplotlib.widgets import Button
+from mpl_toolkits.mplot3d import Axes3D
+import numpy as np
+from mission_planner import FormationPlanner, MissionType, CoordinateConverter
+
+
+# ================================================================================
+# 色彩定義
+# ================================================================================
+COLORS = ['#378ADD', '#1D9E75', '#BA7517', '#D85A30', '#7F77DD', '#D4537E',
+ '#E24B4A', '#639922', '#00BFFF', '#FF69B4']
+
+# 動畫參數
+FRAMES_PER_SEGMENT = 40 # 每段航點之間的畫面數
+TRAIL_LENGTH = 60 # 拖尾長度(畫面數)
+INTERVAL_MS = 50 # 每幀間隔 (ms)
+
+
+# ================================================================================
+# 靜態繪圖
+# ================================================================================
+
+def plot_grid_sweep(ax, data, conv):
+ """畫 Grid Sweep 俯視圖"""
+ if 'rect_corners' in data:
+ rect_local = [conv.gps_to_local(c[0], c[1], 0)[:2] for c in data['rect_corners']]
+ xs = [p[0] for p in rect_local] + [rect_local[0][0]]
+ ys = [p[1] for p in rect_local] + [rect_local[0][1]]
+ ax.plot(xs, ys, 'k--', linewidth=1.5, label='Task area')
+ ax.fill(xs, ys, alpha=0.05, color='gray')
+
+ _draw_waypoint_paths(ax, data, conv, show_sweep_labels=True)
+
+ total_wps = sum(len(wps) for wps in data['waypoints'])
+ ax.set_title(f'Grid Sweep - {len(data["drone_ids"])} drones, {total_wps} waypoints')
+ ax.set_xlabel('East (m)')
+ ax.set_ylabel('North (m)')
+ ax.set_aspect('equal')
+ ax.grid(True, alpha=0.3)
+
+
+def plot_formation(ax, data, conv):
+ """畫 M-Formation / Circle / Leader-Follower 俯視圖"""
+ _draw_waypoint_paths(ax, data, conv, show_sweep_labels=False)
+
+ if 'target' in data:
+ t = data['target']
+ tx, ty, _ = conv.gps_to_local(t[0], t[1], t[2] if len(t) > 2 else 0)
+ ax.plot(tx, ty, 'r*', markersize=18, zorder=5)
+ ax.annotate('Target', (tx, ty + 1), fontsize=9, color='red', ha='center', va='bottom')
+
+ if 'route_waypoints' in data:
+ route_local = [conv.gps_to_local(wp[0], wp[1], 0)[:2] for wp in data['route_waypoints']]
+ rxs = [p[0] for p in route_local]
+ rys = [p[1] for p in route_local]
+ ax.plot(rxs, rys, 'r--', linewidth=1.5, alpha=0.5, label='Route center')
+ for i, (rx, ry) in enumerate(route_local):
+ ax.plot(rx, ry, 'ro', markersize=6, alpha=0.5)
+ ax.annotate(f'R{i+1}', (rx, ry + 0.8), fontsize=7, color='red',
+ ha='center', va='bottom', alpha=0.7)
+
+ mission_type = data.get('mission_type', 'formation')
+ total_wps = sum(len(wps) for wps in data['waypoints'])
+ ax.set_title(f'{mission_type} - {len(data["drone_ids"])} drones, {total_wps} waypoints')
+ ax.set_xlabel('East (m)')
+ ax.set_ylabel('North (m)')
+ ax.set_aspect('equal')
+ ax.grid(True, alpha=0.3)
+
+
+def _draw_waypoint_paths(ax, data, conv, show_sweep_labels=False):
+ """共用的航點路徑繪圖"""
+ drone_ids = data['drone_ids']
+ waypoints = data['waypoints']
+ drones_gps = data.get('drones_gps', [])
+
+ for i, pos in enumerate(drones_gps):
+ x, y, _ = conv.gps_to_local(pos[0], pos[1], pos[2] if len(pos) > 2 else 0)
+ c = COLORS[i % len(COLORS)]
+ ax.plot(x, y, 'o', color=c, markersize=10, zorder=5)
+ ax.annotate(f'{drone_ids[i]}', (x, y + 1), fontsize=8, fontweight='bold',
+ ha='center', va='bottom', color=c)
+
+ for i, wps in enumerate(waypoints):
+ c = COLORS[i % len(COLORS)]
+ local_wps = [conv.gps_to_local(wp[0], wp[1], wp[2]) for wp in wps]
+ xs = [p[0] for p in local_wps]
+ ys = [p[1] for p in local_wps]
+ ax.plot(xs, ys, '-', color=c, linewidth=1.5, alpha=0.7)
+
+ for j, (x, y, z) in enumerate(local_wps):
+ if show_sweep_labels:
+ if j == 0:
+ ax.plot(x, y, 's', color=c, markersize=8, zorder=4)
+ ax.annotate('gather', (x, y), fontsize=6, ha='right', va='top')
+ elif j == 1:
+ ax.plot(x, y, '^', color=c, markersize=8, zorder=4)
+ ax.annotate('entry', (x, y), fontsize=6, ha='right', va='top')
+ else:
+ ax.plot(x, y, '.', color=c, markersize=4)
+ else:
+ marker = 's' if j == 0 else '*'
+ ax.plot(x, y, marker, color=c, markersize=10, zorder=4)
+ ax.annotate(f'WP{j}\n({z:.0f}m)', (x, y), fontsize=6, ha='center', va='bottom')
+
+ if local_wps:
+ lx, ly, _ = local_wps[-1]
+ ax.plot(lx, ly, 'X', color=c, markersize=10, markeredgewidth=2, zorder=4)
+
+
+# ================================================================================
+# 動畫模擬
+# ================================================================================
+
+class MissionAnimator:
+ """任務動畫控制器"""
+
+ def __init__(self, fig, ax, data, conv):
+ self.fig = fig
+ self.ax = ax
+ self.data = data
+ self.conv = conv
+ self.is_playing = False
+ self.anim = None
+ self.current_frame = 0
+
+ drone_ids = data['drone_ids']
+ waypoints = data['waypoints']
+ drones_gps = data.get('drones_gps', [])
+ self.num_drones = len(drone_ids)
+
+ # 建立完整航點序列:初始位置 → WP0 → WP1 → ...
+ self.all_local_wps = []
+ for i, wps in enumerate(waypoints):
+ local_wps = [conv.gps_to_local(wp[0], wp[1], wp[2]) for wp in wps]
+
+ # 把初始位置插入最前面
+ if i < len(drones_gps):
+ pos = drones_gps[i]
+ start = conv.gps_to_local(pos[0], pos[1], pos[2] if len(pos) > 2 else 0)
+ local_wps.insert(0, start)
+
+ self.all_local_wps.append(local_wps)
+
+ # 計算最大航點段數
+ self.max_segments = max(len(wps) - 1 for wps in self.all_local_wps) if self.all_local_wps else 0
+ self.total_frames = self.max_segments * FRAMES_PER_SEGMENT
+
+ # 預計算位置
+ self.positions = self._precompute_positions()
+
+ # 動畫元素
+ self.drone_dots = []
+ self.trail_lines = []
+ self.trail_data = [[] for _ in range(self.num_drones)]
+ self.status_text = None
+
+ def _precompute_positions(self):
+ """預計算所有幀的位置 — 等時間步進"""
+ positions = []
+
+ for frame in range(self.total_frames + 1):
+ seg_idx = frame // FRAMES_PER_SEGMENT
+ seg_progress = (frame % FRAMES_PER_SEGMENT) / FRAMES_PER_SEGMENT
+
+ frame_positions = []
+ for drone_idx in range(self.num_drones):
+ wps = self.all_local_wps[drone_idx]
+ num_segs = len(wps) - 1
+
+ if seg_idx >= num_segs:
+ frame_positions.append((wps[-1][0], wps[-1][1]))
+ else:
+ x0, y0, _ = wps[seg_idx]
+ x1, y1, _ = wps[seg_idx + 1]
+ x = x0 + (x1 - x0) * seg_progress
+ y = y0 + (y1 - y0) * seg_progress
+ frame_positions.append((x, y))
+
+ positions.append(frame_positions)
+
+ return positions
+
+ def setup(self):
+ """建立動畫元素和按鈕"""
+ for i in range(self.num_drones):
+ c = COLORS[i % len(COLORS)]
+ dot, = self.ax.plot([], [], 'o', color=c, markersize=12,
+ markeredgecolor='white', markeredgewidth=1.5, zorder=10)
+ self.drone_dots.append(dot)
+ trail, = self.ax.plot([], [], '-', color=c, linewidth=2.5, alpha=0.4, zorder=9)
+ self.trail_lines.append(trail)
+
+ self.status_text = self.ax.text(
+ 0.02, 0.98, 'Ready',
+ transform=self.ax.transAxes, fontsize=10,
+ verticalalignment='top',
+ bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8)
+ )
+
+ # 按鈕放在右上角圖內,避免擋到軸標籤
+ ax_play = self.fig.add_axes([0.72, 0.91, 0.08, 0.035])
+ self.btn_play = Button(ax_play, 'Play', color='#4CAF50', hovercolor='#66BB6A')
+ self.btn_play.label.set_color('white')
+ self.btn_play.label.set_fontweight('bold')
+ self.btn_play.on_clicked(self._on_play)
+
+ ax_pause = self.fig.add_axes([0.81, 0.91, 0.08, 0.035])
+ self.btn_pause = Button(ax_pause, 'Pause', color='#FF9800', hovercolor='#FFB74D')
+ self.btn_pause.label.set_color('white')
+ self.btn_pause.label.set_fontweight('bold')
+ self.btn_pause.on_clicked(self._on_pause)
+
+ ax_reset = self.fig.add_axes([0.90, 0.91, 0.08, 0.035])
+ self.btn_reset = Button(ax_reset, 'Reset', color='#F44336', hovercolor='#EF5350')
+ self.btn_reset.label.set_color('white')
+ self.btn_reset.label.set_fontweight('bold')
+ self.btn_reset.on_clicked(self._on_reset)
+
+ def _on_play(self, event):
+ if self.is_playing:
+ return
+ if self.anim is None:
+ self.anim = animation.FuncAnimation(
+ self.fig, self._update_frame,
+ frames=range(self.current_frame, self.total_frames + 1),
+ interval=INTERVAL_MS,
+ blit=False,
+ repeat=False
+ )
+ else:
+ self.anim.resume()
+ self.is_playing = True
+ self.fig.canvas.draw_idle()
+
+ def _on_pause(self, event):
+ if not self.is_playing or self.anim is None:
+ return
+ self.anim.pause()
+ self.is_playing = False
+ self.status_text.set_text(f'Paused (frame {self.current_frame}/{self.total_frames})')
+ self.fig.canvas.draw_idle()
+
+ def _on_reset(self, event):
+ if self.anim is not None:
+ self.anim.event_source.stop()
+ self.anim = None
+ self.is_playing = False
+ self.current_frame = 0
+ self.trail_data = [[] for _ in range(self.num_drones)]
+ for dot in self.drone_dots:
+ dot.set_data([], [])
+ for trail in self.trail_lines:
+ trail.set_data([], [])
+ self.status_text.set_text('Ready')
+ self.fig.canvas.draw_idle()
+
+ def _update_frame(self, frame):
+ self.current_frame = frame
+
+ if frame >= len(self.positions):
+ self.is_playing = False
+ self.status_text.set_text('Done')
+ return self.drone_dots + self.trail_lines + [self.status_text]
+
+ # seg_idx - 1 是因為第 0 段是 start→WP0
+ seg_idx = frame // FRAMES_PER_SEGMENT
+ progress = (frame % FRAMES_PER_SEGMENT) / FRAMES_PER_SEGMENT
+
+ # 顯示時把第 0 段標為 "Start -> WP0"
+ if seg_idx == 0:
+ label = f'Start -> WP0 Progress {progress:.0%}'
+ else:
+ label = f'WP{seg_idx-1} -> WP{seg_idx} Progress {progress:.0%}'
+ self.status_text.set_text(
+ f'{label} Frame {frame}/{self.total_frames}'
+ )
+
+ for i in range(self.num_drones):
+ x, y = self.positions[frame][i]
+ self.drone_dots[i].set_data([x], [y])
+
+ self.trail_data[i].append((x, y))
+ if len(self.trail_data[i]) > TRAIL_LENGTH:
+ self.trail_data[i] = self.trail_data[i][-TRAIL_LENGTH:]
+ trail_x = [p[0] for p in self.trail_data[i]]
+ trail_y = [p[1] for p in self.trail_data[i]]
+ self.trail_lines[i].set_data(trail_x, trail_y)
+
+ return self.drone_dots + self.trail_lines + [self.status_text]
+
+
+# ================================================================================
+# 主流程
+# ================================================================================
+
+def visualize_from_file(filepath):
+ """從 JSON 檔案讀取並視覺化"""
+ with open(filepath, 'r') as f:
+ data = json.load(f)
+
+ origin = data['origin']
+ conv = CoordinateConverter(origin[0], origin[1], 0)
+ mission_type = data.get('mission_type', 'formation')
+ is_sweep = mission_type == 'grid_sweep'
+
+ fig, ax = plt.subplots(1, 1, figsize=(10, 8))
+ fig.suptitle(f'Mission Verification - {mission_type}', fontsize=13, fontweight='bold')
+
+ if is_sweep:
+ plot_grid_sweep(ax, data, conv)
+ else:
+ plot_formation(ax, data, conv)
+
+ _print_summary(data)
+
+ animator = MissionAnimator(fig, ax, data, conv)
+ animator.setup()
+
+ plt.tight_layout(rect=[0, 0, 1, 0.95])
+ plt.show()
+
+
+def visualize_standalone():
+ """獨立執行:使用內建測試資料"""
+ drones = [
+ (24.123450, 120.567800, 0.0),
+ (24.123470, 120.567820, 0.0),
+ (24.123440, 120.567810, 0.0),
+ (24.123460, 120.567830, 0.0),
+ ]
+ rect_corners = [
+ (24.12380, 120.56775),
+ (24.12380, 120.56810),
+ (24.12420, 120.56810),
+ (24.12420, 120.56775),
+ ]
+ target = (24.12400, 120.56795, 10.0)
+
+ planner = FormationPlanner(spacing=5.0, base_altitude=10.0, altitude_diff=2.0)
+
+ fig = plt.figure(figsize=(16, 10))
+ fig.suptitle('Mission Planner Verification (standalone)', fontsize=14, fontweight='bold')
+
+ # M-Formation
+ wps_m, origin_m = planner.plan_formation_mission(drones, target, MissionType.M_FORMATION)
+ conv_m = CoordinateConverter(origin_m[0], origin_m[1], 0)
+ data_m = {
+ 'drone_ids': [f's0_{i + 1}' for i in range(len(drones))],
+ 'waypoints': wps_m,
+ 'drones_gps': drones,
+ 'target': target,
+ 'mission_type': 'M_FORMATION'
+ }
+ ax1 = fig.add_subplot(2, 2, 1)
+ plot_formation(ax1, data_m, conv_m)
+
+ # Grid Sweep 5m
+ target_gs = (sum(c[0] for c in rect_corners) / 4,
+ sum(c[1] for c in rect_corners) / 4, 10.0)
+ wps_g, origin_g = planner.plan_formation_mission(
+ drones, target_gs, MissionType.GRID_SWEEP,
+ params={'rect_corners': rect_corners, 'line_spacing': 5.0, 'altitude': 10.0}
+ )
+ conv_g = CoordinateConverter(origin_g[0], origin_g[1], 0)
+ data_g = {
+ 'drone_ids': [f's0_{i + 1}' for i in range(len(drones))],
+ 'waypoints': wps_g,
+ 'drones_gps': drones,
+ 'rect_corners': rect_corners,
+ 'mission_type': 'grid_sweep'
+ }
+ ax2 = fig.add_subplot(2, 2, 2)
+ plot_grid_sweep(ax2, data_g, conv_g)
+
+ # Leader-Follower
+ route = [
+ (24.12360, 120.56780),
+ (24.12380, 120.56800),
+ (24.12400, 120.56820),
+ (24.12410, 120.56800),
+ (24.12420, 120.56790),
+ ]
+ wps_lf, origin_lf = planner.plan_formation_mission(
+ drones, target, MissionType.LEADER_FOLLOWER,
+ params={'route_waypoints': route, 'lateral_offset': 3.0,
+ 'longitudinal_spacing': 5.0, 'altitude': 10.0}
+ )
+ conv_lf = CoordinateConverter(origin_lf[0], origin_lf[1], 0)
+ data_lf = {
+ 'drone_ids': [f's0_{i + 1}' for i in range(len(drones))],
+ 'waypoints': wps_lf,
+ 'drones_gps': drones,
+ 'route_waypoints': route,
+ 'target': target,
+ 'mission_type': 'LEADER_FOLLOWER'
+ }
+ ax3 = fig.add_subplot(2, 2, 3)
+ plot_formation(ax3, data_lf, conv_lf)
+
+ # 3D
+ ax4 = fig.add_subplot(2, 2, 4, projection='3d')
+ _plot_3d(ax4, data_g, conv_g)
+
+ plt.tight_layout()
+ plt.show()
+
+
+def _plot_3d(ax, data, conv):
+ """3D 視角"""
+ if 'rect_corners' in data:
+ rect_local = [conv.gps_to_local(c[0], c[1], 0)[:2] for c in data['rect_corners']]
+ xs = [p[0] for p in rect_local] + [rect_local[0][0]]
+ ys = [p[1] for p in rect_local] + [rect_local[0][1]]
+ ax.plot(xs, ys, [0] * len(xs), 'k--', linewidth=1, alpha=0.4)
+
+ for i, wps in enumerate(data['waypoints']):
+ c = COLORS[i % len(COLORS)]
+ local_wps = [conv.gps_to_local(wp[0], wp[1], wp[2]) for wp in wps]
+ xs = [p[0] for p in local_wps]
+ ys = [p[1] for p in local_wps]
+ zs = [p[2] for p in local_wps]
+ ax.plot(xs, ys, zs, '-', color=c, linewidth=1.5)
+ if local_wps:
+ ax.scatter(xs[0], ys[0], zs[0], color=c, s=50, marker='s')
+ ax.scatter(xs[-1], ys[-1], zs[-1], color=c, s=50, marker='X')
+
+ ax.set_title('3D view')
+ ax.set_xlabel('East (m)')
+ ax.set_ylabel('North (m)')
+ ax.set_zlabel('Alt (m)')
+
+
+def _print_summary(data):
+ """終端印出摘要"""
+ drone_ids = data['drone_ids']
+ waypoints = data['waypoints']
+ mission_type = data.get('mission_type', 'unknown')
+ print(f"\n{'=' * 50}")
+ print(f"Mission: {mission_type}")
+ print(f"Drones: {len(drone_ids)}")
+ print(f"{'=' * 50}")
+ for i, did in enumerate(drone_ids):
+ wps = waypoints[i]
+ print(f" {did}: {len(wps)} waypoints")
+ for j, wp in enumerate(wps):
+ print(f" WP{j}: ({wp[0]:.6f}, {wp[1]:.6f}, {wp[2]:.1f}m)")
+ print(f"{'=' * 50}\n")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description='Mission waypoint visualizer')
+ parser.add_argument('--file', '-f', type=str, default=None,
+ help='JSON file from GUI (auto-generated)')
+ args = parser.parse_args()
+
+ if args.file:
+ visualize_from_file(args.file)
+ else:
+ visualize_standalone()
\ No newline at end of file