You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/GUI/gui.py

1345 lines
62 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
import rclpy
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QSplitter, QScrollArea,
QSizePolicy, QTabWidget, QTableWidget, QTableWidgetItem,
QHeaderView, QPushButton, QCheckBox, QLineEdit,
QComboBox, QDialog)
from PyQt6.QtCore import Qt, QTimer
from PyQt6.QtGui import QColor
import sys
import asyncio
import json
import subprocess
import time
# 導入分離的類別
from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkReceiver
from map_layout import DroneMap
from drone_panel import DronePanel, SocketGroupPanel
from comm_panel import CommPanel
from overview_table import OverviewTable
# ================================================================================
# 導入任務規劃器、執行器、發送器、群組
# ================================================================================
from mission_planner import FormationPlanner, MissionType
from command_sender import MavlinkSender
from mission_executor import MissionExecutor, MissionState
from mission_group import (
MissionGroup, GroupPanel, DroneAssignDialog, GROUP_COLORS
)
# ================================================================================
class ControlStationUI(QMainWindow):
VERSION = '1.0.1'
def __init__(self):
super().__init__()
self.setWindowTitle(f'GCS v{self.VERSION}')
self.resize(1400, 900)
# 初始化ROS2
rclpy.init()
self.monitor = DroneMonitor()
self.monitor.signals.update_signal.connect(self.update_ui)
# ROS执行器配置
self.executor = rclpy.executors.SingleThreadedExecutor()
self.executor.add_node(self.monitor)
# 定时处理ROS事件
self.timer = QTimer()
self.timer.timeout.connect(self.spin_ros)
self.timer.start(10)
# 初始化 panel 和 map 更新10Hz
self.panel_map_timer = QTimer()
self.panel_map_timer.timeout.connect(self._update_panel_and_map)
self.panel_map_timer.start(100) # 10Hz
# 快取消息數據,以便在沒有新消息時使用上一次的值
self._message_cache = {}
# 初始化UI
self.drones = {}
self.socket_groups = {}
self.socket_types = {}
self.socket_colors = {
'0': '#00BFFF', # 天藍色 (DeepSkyBlue)
'1': '#FFD700', # 金色 (Gold)
'2': '#FF6969', # 淺紅色 (Light Red)
'3': '#FF69B4', # 熱粉紅 (HotPink)
'4': '#00FA9A', # 中春綠 (MediumSpringGreen)
'5': '#9370DB', # 中紫色 (MediumPurple) - 串口
'6': '#FFA500', # 橙色 (Orange)
'7': '#20B2AA', # 淺海綠 (LightSeaGreen)
'8': '#7CFC00', # 草綠色 (LawnGreen)
'9': '#FF8C00', # 深橙色 (DarkOrange)
'default': '#AAAAAA' # 灰色
}
self.drone_positions = {}
self.drone_headings = {}
# 初始化地圖
self.drone_map = DroneMap()
# 初始化連接列表
self.udp_receivers = []
self.udp_connections = []
self.ws_connections = []
self.serial_receivers = []
self.serial_connections = []
# ================================================================================
# 初始化任務規劃器
# ================================================================================
self.mission_planner = FormationPlanner(
spacing=5.0, # 5 公尺間距
base_altitude=10.0, # 基準高度 10 公尺
altitude_diff=2.0 # 高低差 2 公尺
)
# ================================================================================
# ================================================================================
# 初始化指令發送器(所有群組共用)
# ================================================================================
self.command_sender = MavlinkSender("udpout:127.0.0.1:14550") # 驗證階段寫死
# ================================================================================
# ================================================================================
# 多任務群組管理
# ================================================================================
self.mission_groups = {} # group_id → MissionGroup
self.group_panels = {} # group_id → GroupPanel
self.active_group_id = None # 當前 active 的 group
self._group_counter = 0 # 用來產生 group_id
self._pending_box_assign = None # 框選後直接分配到的 group_id
# ================================================================================
self.init_ui()
def init_ui(self):
main_splitter = QSplitter(Qt.Orientation.Horizontal)
# 左側 TabWidget
self.left_tab = QTabWidget()
# — 分頁 1Drone Panel
self.drone_panel_container = QWidget()
self.drone_panel_layout = QVBoxLayout(self.drone_panel_container)
self.drone_panel_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
self.drone_panel_layout.setSpacing(0)
self.drone_panel_layout.setContentsMargins(10, 10, 10, 10)
scroll = QScrollArea()
scroll.setWidget(self.drone_panel_container)
scroll.setWidgetResizable(True)
self.left_tab.addTab(scroll, "無人載具")
# — 分頁 2Overview Table
self.overview_table = OverviewTable()
self.left_tab.addTab(self.overview_table, "總覽")
# — 分頁 3通訊設定
self.comm_panel = CommPanel()
self.comm_panel.udp_connection_added.connect(self.handle_udp_connection_added)
self.comm_panel.ws_connection_added.connect(self.handle_ws_connection_added)
self.comm_panel.serial_connection_added.connect(self.handle_serial_connection_added)
self.comm_panel.udp_connection_toggled.connect(self.toggle_udp_connection)
self.comm_panel.ws_connection_toggled.connect(self.toggle_ws_connection)
self.comm_panel.serial_connection_toggled.connect(self.toggle_serial_connection)
self.comm_panel.udp_connection_removed.connect(self.remove_udp_connection)
self.comm_panel.ws_connection_removed.connect(self.remove_ws_connection)
self.comm_panel.serial_connection_removed.connect(self.remove_serial_connection)
self.comm_panel.status_message.connect(lambda msg, timeout: self.statusBar().showMessage(msg, timeout))
self.left_tab.addTab(self.comm_panel, "通訊")
# 右侧容器
right_container = QWidget()
right_layout = QVBoxLayout(right_container)
right_layout.setContentsMargins(10, 10, 10, 10)
right_layout.setSpacing(10)
# ========== 任務群組 Tab ==========
group_header = QHBoxLayout()
group_title = QLabel("任務群組")
group_title.setStyleSheet(
"color: #DDD; font-size: 14px; font-weight: bold; padding: 2px;")
group_header.addWidget(group_title)
group_header.addStretch()
add_group_btn = QPushButton("+ 新增群組")
add_group_btn.setStyleSheet("""
QPushButton { background-color: #4A9EFF; color: white; border: none;
padding: 5px 12px; border-radius: 4px; font-size: 12px; font-weight: bold; }
QPushButton:hover { background-color: #3A8EEF; }
""")
add_group_btn.clicked.connect(self._add_mission_group)
group_header.addWidget(add_group_btn)
clear_traj_btn = QPushButton("清除軌跡")
clear_traj_btn.setStyleSheet("""
QPushButton { background-color: #EF5350; color: white; border: none;
padding: 5px 12px; border-radius: 4px; font-size: 12px; font-weight: bold; }
QPushButton:hover { background-color: #E53935; }
""")
clear_traj_btn.clicked.connect(self.drone_map.clear_trajectories)
group_header.addWidget(clear_traj_btn)
right_layout.addLayout(group_header)
self.group_tab_widget = QTabWidget()
self.group_tab_widget.setStyleSheet("""
QTabWidget::pane { border: 1px solid #444; background-color: #2B2B2B; }
QTabBar::tab {
background-color: #333; color: #AAA; border: 1px solid #444;
padding: 5px 12px; margin-right: 2px; border-top-left-radius: 4px;
border-top-right-radius: 4px; font-size: 12px;
}
QTabBar::tab:selected { background-color: #2B2B2B; color: #FFF; border-bottom-color: #2B2B2B; }
QTabBar::tab:hover { background-color: #3A3A3A; }
""")
self.group_tab_widget.setFixedHeight(150)
self.group_tab_widget.currentChanged.connect(self._on_group_tab_changed)
right_layout.addWidget(self.group_tab_widget)
# 預設建立 Group A
self._add_mission_group()
# 添加地圖
right_layout.addWidget(self.drone_map.get_widget())
self.drone_map.get_gps_signal().connect(self.handle_map_click)
self.drone_map.get_drone_clicked_signal().connect(self.handle_drone_clicked)
self.drone_map.get_clear_all_drone_selection_signal().connect(self.handle_clear_all_drone_selection)
self.drone_map.get_toggle_select_all_drones_signal().connect(self.handle_toggle_select_all_drones)
# ================================================================================
# 連接地圖信號(任務模式改由 Group Tab 控制,不再從地圖下拉選單)
# ================================================================================
self.drone_map.get_rectangle_selected_signal().connect(self.handle_rectangle_selected)
self.drone_map.get_route_confirmed_signal().connect(self.handle_route_confirmed)
self.drone_map.get_drone_box_selected_signal().connect(self._handle_drone_box_selected)
# ================================================================================
main_splitter.addWidget(self.left_tab)
main_splitter.addWidget(right_container)
main_splitter.setSizes([400, 1000])
self.setCentralWidget(main_splitter)
# ================================================================================
# 連線管理
# ================================================================================
def handle_udp_connection_added(self, ip, port):
new_conn = {'name': f'UDP {len(self.udp_connections) + 1}', 'ip': ip, 'port': port, 'enabled': True}
receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name'], self.monitor)
receiver.start()
self.udp_receivers.append(receiver)
new_conn['receiver'] = receiver
self.udp_connections.append(new_conn)
self.comm_panel.add_udp_panel(new_conn)
self.statusBar().showMessage(f"已添加 UDP 連接: {ip}:{port}", 3000)
def handle_ws_connection_added(self, url):
new_conn = {'name': f'WS {len(self.ws_connections) + 1}', 'url': url, 'enabled': True}
receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name'], self.monitor)
receiver.start()
self.monitor.ws_receivers.append(receiver)
new_conn['receiver'] = receiver
self.ws_connections.append(new_conn)
self.comm_panel.add_ws_panel(new_conn)
self.statusBar().showMessage(f"已添加 WebSocket 連接: {url}", 3000)
def create_drone_panel(self, drone_id):
panel = DronePanel(drone_id)
panel.mode_change_requested.connect(self.handle_mode_change)
panel.arm_requested.connect(self.handle_arm)
panel.takeoff_requested.connect(self.handle_takeoff)
panel.setpoint_requested.connect(self.handle_single_setpoint)
panel.selection_changed.connect(self.handle_drone_selection)
return panel
def toggle_ws_connection(self, conn, btn, status_label):
if conn.get('enabled', False):
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
conn['enabled'] = False
btn.setText("啟動")
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
self.statusBar().showMessage(f"已停止 WebSocket 連接: {conn['url']}", 3000)
else:
receiver = WebSocketMavlinkReceiver(conn['url'], self.monitor.signals, conn['name'])
receiver.start()
self.monitor.ws_receivers.append(receiver)
conn['receiver'] = receiver
conn['enabled'] = True
btn.setText("停止")
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
self.statusBar().showMessage(f"已啟動 WebSocket 連接: {conn['url']}", 3000)
def remove_ws_connection(self, conn, panel):
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
if conn['receiver'] in self.monitor.ws_receivers:
self.monitor.ws_receivers.remove(conn['receiver'])
if conn in self.ws_connections:
self.ws_connections.remove(conn)
self.comm_panel.remove_ws_connection_from_list(conn)
panel.setParent(None)
panel.deleteLater()
self.statusBar().showMessage(f"已移除 WebSocket 連接: {conn['url']}", 3000)
def toggle_udp_connection(self, conn, btn, status_label):
if conn.get('enabled', False):
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
conn['enabled'] = False
btn.setText("啟動")
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
self.statusBar().showMessage(f"已停止 UDP 連接: {conn['ip']}:{conn['port']}", 3000)
else:
receiver = UDPMavlinkReceiver(conn['ip'], conn['port'], self.monitor.signals, conn['name'])
receiver.start()
self.udp_receivers.append(receiver)
conn['receiver'] = receiver
conn['enabled'] = True
btn.setText("停止")
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
self.statusBar().showMessage(f"已啟動 UDP 連接: {conn['ip']}:{conn['port']}", 3000)
def remove_udp_connection(self, conn, panel):
if 'receiver' in conn and conn['receiver']:
conn['receiver'].stop()
if conn['receiver'] in self.udp_receivers:
self.udp_receivers.remove(conn['receiver'])
if conn in self.udp_connections:
self.udp_connections.remove(conn)
self.comm_panel.remove_udp_connection_from_list(conn)
panel.setParent(None)
panel.deleteLater()
self.statusBar().showMessage(f"已移除 UDP 連接: {conn['ip']}:{conn['port']}", 3000)
def handle_serial_connection_added(self, port, baudrate):
conn = {'name': 'Serial', 'port': port, 'baudrate': baudrate, 'enabled': False, 'receiver': None}
self.serial_connections.append(conn)
self.comm_panel.add_serial_panel(conn)
self.statusBar().showMessage(f"已添加 Serial 連接: {port} @ {baudrate}", 3000)
def toggle_serial_connection(self, conn, btn, status_label):
if conn.get('enabled', False):
if conn.get('receiver'):
conn['receiver'].stop()
conn['receiver'] = None
conn['enabled'] = False
btn.setText("啟動")
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
self.statusBar().showMessage(f"已停止 Serial 連接: {conn['port']}", 3000)
else:
try:
receiver = self.monitor.start_serial_connection(conn['port'], conn['baudrate'])
conn['receiver'] = receiver
conn['enabled'] = True
btn.setText("停止")
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
self.statusBar().showMessage(f"已啟動 Serial 連接: {conn['port']}", 3000)
except Exception as e:
self.statusBar().showMessage(f"啟動 Serial 連接失敗: {str(e)}", 5000)
def remove_serial_connection(self, conn, panel):
if conn.get('enabled', False) and conn.get('receiver'):
conn['receiver'].stop()
if conn in self.serial_connections:
self.serial_connections.remove(conn)
self.comm_panel.remove_serial_connection_from_list(conn)
panel.setParent(None)
panel.deleteLater()
self.statusBar().showMessage(f"已移除 Serial 連接: {conn['port']}", 3000)
def create_socket_group_panel(self, socket_id):
color = self.socket_colors.get(socket_id, self.socket_colors['default'])
socket_type = self.socket_types.get(socket_id, None)
panel = SocketGroupPanel(socket_id, color, socket_type)
panel.group_selection_changed.connect(self.handle_group_selection)
return panel
# ================================================================================
# 無人機操作
# ================================================================================
def handle_mode_change(self, drone_id):
# 從 active group 的 mode_combo 讀取模式
group = self._get_active_group()
if group:
panel = self.group_panels.get(group.group_id)
mode = panel.mode_combo.currentText() if panel else "GUIDED"
else:
mode = "GUIDED"
loop = asyncio.get_event_loop()
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}"))
def handle_arm(self, drone_id):
loop = asyncio.get_event_loop()
arm_state = not self.monitor.get_arm_state(drone_id)
future = self.monitor.arm_drone(drone_id, arm_state)
loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}"))
def handle_takeoff(self, drone_id):
loop = asyncio.get_event_loop()
future = self.monitor.takeoff_drone(drone_id, 10.0)
loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}"))
def handle_setpoint_selected(self):
try:
x = float(self.x_input.text() or '0')
y = float(self.y_input.text() or '0')
z = float(self.z_input.text() or '0')
for drone_id in self.monitor.selected_drones:
if self.monitor.send_setpoint(drone_id, x, y, z):
self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000)
else:
self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000)
except ValueError:
self.statusBar().showMessage("座標格式錯誤", 3000)
def handle_single_setpoint(self, drone_id):
try:
x = float(self.x_input.text() or '0')
y = float(self.y_input.text() or '0')
z = float(self.z_input.text() or '0')
if self.monitor.send_setpoint(drone_id, x, y, z):
self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000)
else:
self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000)
except ValueError:
self.statusBar().showMessage("座標格式錯誤", 3000)
async def handle_service_response(self, future, action):
try:
result = await future
if result:
self.statusBar().showMessage(f"{action} 成功", 3000)
else:
self.statusBar().showMessage(f"{action} 失敗", 3000)
except Exception as e:
self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000)
def handle_arm_selected(self):
loop = asyncio.get_event_loop()
for drone_id in self.monitor.selected_drones:
future = self.monitor.arm_drone(drone_id, True)
loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}"))
def handle_takeoff_selected(self):
loop = asyncio.get_event_loop()
for drone_id in self.monitor.selected_drones:
future = self.monitor.takeoff_drone(drone_id, 10.0)
loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}"))
# ================================================================================
# 任務群組管理
# ================================================================================
def _next_group_id(self):
"""產生下一個群組 ID (A, B, C, ...)"""
gid = chr(ord('A') + self._group_counter)
self._group_counter += 1
return gid
def _add_mission_group(self):
"""新增一個任務群組"""
gid = self._next_group_id()
color = GROUP_COLORS[(self._group_counter - 1) % len(GROUP_COLORS)]
group = MissionGroup(gid, color)
self.mission_groups[gid] = group
panel = GroupPanel(group)
panel.assign_drones_requested.connect(self._handle_assign_drones)
panel.mission_type_changed.connect(self._handle_mission_type_changed)
panel.start_requested.connect(self._handle_group_start)
panel.pause_requested.connect(self._handle_group_pause)
panel.stop_requested.connect(self._handle_group_stop)
panel.mode_change_requested.connect(self._handle_group_mode_change)
panel.arm_requested.connect(self._handle_group_arm)
panel.takeoff_requested.connect(self._handle_group_takeoff)
panel.box_select_requested.connect(self._handle_box_select)
panel.select_all_requested.connect(self._handle_select_all_for_group)
panel.clear_group_requested.connect(self._handle_clear_group)
self.group_panels[gid] = panel
# 用帶顏色的 tab 標題
scroll = QScrollArea()
scroll.setWidget(panel)
scroll.setWidgetResizable(True)
idx = self.group_tab_widget.addTab(scroll, f"Group {gid}")
self.group_tab_widget.tabBar().setTabTextColor(idx, QColor(color))
self.group_tab_widget.setCurrentIndex(idx)
self.active_group_id = gid
self.statusBar().showMessage(f"已新增 Group {gid}", 2000)
def _on_group_tab_changed(self, index):
"""Tab 切換時更新 active group 並同步地圖模式"""
if index < 0:
self.active_group_id = None
return
# tab 標題是 "Group X"
tab_text = self.group_tab_widget.tabText(index)
gid = tab_text.replace("Group ", "")
if gid in self.mission_groups:
self.active_group_id = gid
group = self.mission_groups[gid]
self.drone_map.set_mission_mode(group.mission_type)
def _get_active_group(self):
"""取得當前 active 的 MissionGroup"""
if self.active_group_id and self.active_group_id in self.mission_groups:
return self.mission_groups[self.active_group_id]
return None
def _get_other_assigned(self, exclude_gid):
"""取得其他群組已佔用的無人機 {drone_id: group_id}"""
assigned = {}
for gid, group in self.mission_groups.items():
if gid == exclude_gid:
continue
for did in group.drone_ids:
assigned[did] = gid
return assigned
def _handle_assign_drones(self, group_id):
"""開啟無人機分配對話框(已勾選的 checkbox 會預先帶入)"""
group = self.mission_groups.get(group_id)
if not group:
return
all_ids = list(self.drones.keys())
other_assigned = self._get_other_assigned(group_id)
# 將目前 checkbox 已勾選的無人機(且未被其他群組佔用)合併進 pre-selected
currently_checked = self.get_selected_drones()
pre_selected = set(group.drone_ids)
for did in currently_checked:
if did not in other_assigned:
pre_selected.add(did)
dialog = DroneAssignDialog(self, all_ids, pre_selected, other_assigned)
if dialog.exec() == QDialog.DialogCode.Accepted:
group.drone_ids = dialog.get_selected()
panel = self.group_panels.get(group_id)
if panel:
panel.update_drone_list()
panel.update_status()
self.statusBar().showMessage(
f"Group {group_id}: 已分配 {len(group.drone_ids)} 台無人機", 3000)
def _handle_mission_type_changed(self, group_id, mission_type):
"""群組任務類型切換"""
group = self.mission_groups.get(group_id)
if group:
group.mission_type = mission_type
# 如果是 active group同步更新地圖的選擇模式
if group_id == self.active_group_id:
self.drone_map.set_mission_mode(mission_type)
def _create_executor_for_group(self, group):
"""為群組建立 MissionExecutor"""
executor = MissionExecutor(
sender=self.command_sender,
drone_gps=self.monitor.drone_gps,
arrival_radius=2.0,
send_rate_hz=2.0
)
executor.drone_waypoint_reached.connect(self.on_drone_waypoint_reached)
executor.mission_completed.connect(
lambda gid=group.group_id: self._on_group_mission_completed(gid))
group.executor = executor
def _handle_group_start(self, group_id):
"""啟動群組任務"""
group = self.mission_groups.get(group_id)
if not group:
return
if group.planned_waypoints is None:
self.statusBar().showMessage(f"⚠ Group {group_id}: 請先規劃任務", 3000)
return
if group.executor is None:
self._create_executor_for_group(group)
group.executor.start(group.planned_waypoints)
panel = self.group_panels.get(group_id)
if panel:
panel.update_status()
self.statusBar().showMessage(f"🚀 Group {group_id}: 任務已啟動", 3000)
def _handle_group_pause(self, group_id):
"""暫停/恢復群組任務"""
group = self.mission_groups.get(group_id)
if not group or not group.executor:
return
if group.executor.state == MissionState.RUNNING:
group.executor.pause()
self.statusBar().showMessage(f"⏸ Group {group_id}: 任務已暫停", 3000)
elif group.executor.state == MissionState.PAUSED:
group.executor.resume()
self.statusBar().showMessage(f"▶ Group {group_id}: 任務已恢復", 3000)
panel = self.group_panels.get(group_id)
if panel:
panel.update_status()
def _handle_group_stop(self, group_id):
"""停止群組任務"""
group = self.mission_groups.get(group_id)
if not group:
return
if group.executor:
group.executor.stop()
group.planned_waypoints = None
self.drone_map.clear_mission_plan_for_group(group_id)
panel = self.group_panels.get(group_id)
if panel:
panel.update_status()
panel.clear_mission_info()
self.statusBar().showMessage(f"■ Group {group_id}: 任務已停止", 3000)
def _handle_group_mode_change(self, group_id, mode):
"""切換群組內所有無人機的飛行模式"""
group = self.mission_groups.get(group_id)
if not group:
return
loop = asyncio.get_event_loop()
for drone_id in group.drone_ids:
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"{drone_id} 切換 {mode}"))
def _handle_group_arm(self, group_id):
"""解鎖群組內所有無人機"""
group = self.mission_groups.get(group_id)
if not group:
return
loop = asyncio.get_event_loop()
for drone_id in group.drone_ids:
future = self.monitor.arm_drone(drone_id, True)
loop.create_task(self.handle_service_response(future, f"解鎖 {drone_id}"))
def _handle_group_takeoff(self, group_id, altitude):
"""群組內所有無人機起飛"""
group = self.mission_groups.get(group_id)
if not group:
return
loop = asyncio.get_event_loop()
for drone_id in group.drone_ids:
future = self.monitor.takeoff_drone(drone_id, altitude)
loop.create_task(self.handle_service_response(future, f"起飛 {drone_id} ({altitude}m)"))
def _handle_box_select(self, group_id):
"""觸發地圖框選 → 框選完成後直接分配到該群組"""
self._pending_box_assign = group_id
self.drone_map.toggle_drone_box_select()
self.statusBar().showMessage(
f"請在地圖上框選要分配到 Group {group_id} 的無人機", 5000)
def _handle_drone_box_selected(self, drone_ids_json):
"""地圖框選完成 — 直接分配到指定群組"""
group_id = self._pending_box_assign
self._pending_box_assign = None
if not group_id:
return
group = self.mission_groups.get(group_id)
if not group:
return
drone_ids = json.loads(drone_ids_json)
other = self._get_other_assigned(group_id)
valid = {did for did in drone_ids if did not in other}
group.drone_ids = valid
panel = self.group_panels.get(group_id)
if panel:
panel.update_drone_list()
panel.update_status()
self.statusBar().showMessage(
f"Group {group_id}: 框選分配 {len(valid)} 台無人機", 3000)
def _handle_select_all_for_group(self, group_id):
"""全選所有可用無人機,直接分配到該群組"""
group = self.mission_groups.get(group_id)
if not group:
return
other = self._get_other_assigned(group_id)
available = {did for did in self.drones.keys() if did not in other}
group.drone_ids = available
panel = self.group_panels.get(group_id)
if panel:
panel.update_drone_list()
panel.update_status()
self.statusBar().showMessage(
f"Group {group_id}: 全選分配 {len(available)} 台無人機", 3000)
def _handle_clear_group(self, group_id):
"""清除群組的無人機分配"""
group = self.mission_groups.get(group_id)
if not group:
return
group.drone_ids = set()
group.planned_waypoints = None
if group.executor:
group.executor.stop()
self.drone_map.clear_mission_plan_for_group(group_id)
panel = self.group_panels.get(group_id)
if panel:
panel.update_drone_list()
panel.update_status()
panel.clear_mission_info()
self.statusBar().showMessage(
f"Group {group_id}: 已清除分組", 3000)
def _on_group_mission_completed(self, group_id):
"""群組任務完成回呼"""
panel = self.group_panels.get(group_id)
if panel:
panel.update_status()
self.statusBar().showMessage(f"✅ Group {group_id}: 所有無人機已完成任務", 5000)
# ================================================================================
# UI 更新
# ================================================================================
def update_ui(self, msg_type, drone_id, data):
"""只做數據快取,不在這裡更新 UI"""
if msg_type == 'connection_type':
conn_type = data.get('type', 'Unknown')
parts = drone_id.split('_')
if len(parts) >= 2 and parts[0].startswith('s'):
socket_id = parts[0][1:]
if socket_id not in self.socket_types:
self.socket_types[socket_id] = conn_type
if socket_id in self.socket_groups:
self.socket_groups[socket_id].set_socket_type(conn_type)
return
if drone_id not in self.drones:
self.add_drone(drone_id)
return
# 只做資料快取,不更新 UI - 所有 UI 更新都在 _update_panel_and_map 中進行
if drone_id not in self._message_cache:
self._message_cache[drone_id] = {}
self._message_cache[drone_id][msg_type] = data
# ================================================================================
# 勾選管理
# ================================================================================
def handle_group_selection(self, socket_id, state):
group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
is_checked = state == Qt.CheckState.Checked.value
for drone_id in group_drones:
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(is_checked)
checkbox.blockSignals(False)
if is_checked: self.monitor.selected_drones.add(drone_id)
else: self.monitor.selected_drones.discard(drone_id)
def handle_drone_selection(self, drone_id, state):
if state == Qt.CheckState.Checked.value:
self.monitor.selected_drones.add(drone_id)
else:
self.monitor.selected_drones.discard(drone_id)
self.update_group_checkbox_state(self.get_socket_id(drone_id))
def update_group_checkbox_state(self, socket_id):
group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
if not group_drones: return
checked_count = sum(1 for did in group_drones if self.drones[did].is_checked())
if socket_id in self.socket_groups:
group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox")
if group_checkbox:
group_checkbox.blockSignals(True)
if checked_count == 0: group_checkbox.setCheckState(Qt.CheckState.Unchecked)
elif checked_count == len(group_drones): group_checkbox.setCheckState(Qt.CheckState.Checked)
else: group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked)
group_checkbox.blockSignals(False)
def handle_select_all(self):
all_selected = all(self.drones[did].is_checked() for did in self.drones)
new_state = not all_selected
for drone_id in self.drones:
self.drones[drone_id].set_checked(new_state)
for socket_id in self.socket_groups:
group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox")
if group_checkbox:
group_checkbox.blockSignals(True)
group_checkbox.setChecked(new_state)
group_checkbox.blockSignals(False)
def handle_drone_clicked(self, drone_id):
if drone_id in self.drones:
checkbox = self.drones[drone_id].get_checkbox()
checkbox.setChecked(not checkbox.isChecked())
def handle_clear_all_drone_selection(self):
for drone_id in self.drones.keys():
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(False)
checkbox.blockSignals(False)
self.monitor.selected_drones.clear()
for socket_id in self.socket_groups.keys():
self.update_group_checkbox_state(socket_id)
def handle_toggle_select_all_drones(self):
all_selected = all(self.drones[did].get_checkbox().isChecked() for did in self.drones.keys())
if all_selected:
for drone_id in self.drones.keys():
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(False)
checkbox.blockSignals(False)
self.monitor.selected_drones.clear()
else:
for drone_id in self.drones.keys():
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(True)
checkbox.blockSignals(False)
self.monitor.selected_drones.add(drone_id)
for socket_id in self.socket_groups.keys():
self.update_group_checkbox_state(socket_id)
# ================================================================================
# 任務規劃 — 點擊地圖(路由到 active group
# ================================================================================
def _get_group_drones(self, group):
"""取得群組的無人機 ID 列表(排序後)"""
return sorted(group.drone_ids, key=lambda x: (x.split('_')[0], int(x.split('_')[1])))
def handle_map_click(self, lat, lon):
"""處理地圖點擊事件 — 根據 active group 的任務類型規劃"""
group = self._get_active_group()
if not group:
self.statusBar().showMessage("⚠ 請先建立任務群組", 3000)
return
mode_map = {
'M_FORMATION': MissionType.M_FORMATION,
'CIRCLE_FORMATION': MissionType.CIRCLE_FORMATION,
}
mission_type = mode_map.get(group.mission_type)
if mission_type is None:
return # Grid Sweep / Leader-Follower 由各自的觸發方式處理
selected_drones = self._get_group_drones(group)
if len(selected_drones) == 0:
self.statusBar().showMessage(f"⚠ Group {group.group_id}: 請先分配無人機", 3000)
return
print(f"地圖點擊: {lat:.6f}, {lon:.6f} → Group {group.group_id} ({group.mission_type})")
panel = self.group_panels.get(group.group_id)
params = panel.get_mission_params() if panel else {}
base_alt = params.get('base_altitude', params.get('altitude', 10.0))
target_gps = (lat, lon, base_alt)
self.statusBar().showMessage(
f"⏳ Group {group.group_id}: 正在規劃 {group.mission_type} ({len(selected_drones)} 台) ...", 2000)
try:
drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt)
if drone_gps_positions is None:
return
waypoints_per_drone, center_origin = self.mission_planner.plan_formation_mission(
drone_gps_positions, target_gps, mission_type, params=params
)
group.planned_waypoints = {'drone_ids': selected_drones, 'waypoints': waypoints_per_drone}
group.center_origin = center_origin
self.show_planned_waypoints(group)
center_lat, center_lon, _ = center_origin
self.drone_map.draw_mission_plan_for_group(
group.group_id, group.color,
center_lat, center_lon, lat, lon
)
self._launch_verification(
group.mission_type, drone_gps_positions, selected_drones,
waypoints_per_drone, center_origin, target_gps=target_gps
)
panel = self.group_panels.get(group.group_id)
if panel:
panel.update_status()
panel.update_mission_info(center_lat, center_lon, lat, lon)
total_wps = sum(len(wps) for wps in waypoints_per_drone)
self.statusBar().showMessage(
f"✓ Group {group.group_id}: {group.mission_type} 規劃完成!{len(selected_drones)} 台,共 {total_wps} 個航點", 5000
)
except Exception as e:
self.statusBar().showMessage(f"❌ Group {group.group_id}: 規劃失敗: {str(e)}", 5000)
import traceback
traceback.print_exc()
# ================================================================================
# 任務規劃 — 矩形選取 (Grid Sweep)
# ================================================================================
def handle_rectangle_selected(self, points_json):
group = self._get_active_group()
if not group:
self.statusBar().showMessage("⚠ 請先建立任務群組", 3000)
return
if group.mission_type != 'GRID_SWEEP':
return # 非 Grid Sweep 模式不處理矩形選取
selected_drones = self._get_group_drones(group)
if len(selected_drones) == 0:
self.statusBar().showMessage(f"⚠ Group {group.group_id}: 請先分配無人機", 3000)
return
print(f"矩形選取 → Group {group.group_id}: {points_json}")
try:
rect_corners = [(p[0], p[1]) for p in json.loads(points_json)]
except (json.JSONDecodeError, IndexError):
self.statusBar().showMessage("❌ 矩形座標解析失敗", 3000)
return
panel = self.group_panels.get(group.group_id)
params = panel.get_mission_params() if panel else {}
base_alt = params.get('altitude', 10.0)
self.statusBar().showMessage(
f"⏳ Group {group.group_id}: 正在規劃 Grid Sweep ({len(selected_drones)} 台) ...", 2000)
try:
drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt)
if drone_gps_positions is None:
return
target_lat = sum(c[0] for c in rect_corners) / 4
target_lon = sum(c[1] for c in rect_corners) / 4
target_gps = (target_lat, target_lon, base_alt)
params['rect_corners'] = rect_corners
waypoints_per_drone, center_origin = self.mission_planner.plan_formation_mission(
drone_gps_positions, target_gps, MissionType.GRID_SWEEP,
params=params
)
group.planned_waypoints = {'drone_ids': selected_drones, 'waypoints': waypoints_per_drone}
group.center_origin = center_origin
self.show_planned_waypoints(group)
center_lat, center_lon, _ = center_origin
self.drone_map.draw_mission_plan_for_group(
group.group_id, group.color,
center_lat, center_lon, target_lat, target_lon
)
self._launch_verification(
'grid_sweep', drone_gps_positions, selected_drones,
waypoints_per_drone, center_origin, rect_corners=rect_corners
)
panel = self.group_panels.get(group.group_id)
if panel:
panel.update_status()
panel.update_mission_info(center_lat, center_lon, target_lat, target_lon)
total_wps = sum(len(wps) for wps in waypoints_per_drone)
self.statusBar().showMessage(
f"✓ Group {group.group_id}: Grid Sweep 規劃完成!{len(selected_drones)} 台,共 {total_wps} 個航點", 5000
)
except Exception as e:
self.statusBar().showMessage(f"❌ Group {group.group_id}: Grid Sweep 規劃失敗: {str(e)}", 5000)
import traceback
traceback.print_exc()
# ================================================================================
# 任務規劃 — 路徑確認 (Leader-Follower)
# ================================================================================
def handle_route_confirmed(self, points_json):
group = self._get_active_group()
if not group:
self.statusBar().showMessage("⚠ 請先建立任務群組", 3000)
return
if group.mission_type != 'LEADER_FOLLOWER':
return
selected_drones = self._get_group_drones(group)
if len(selected_drones) == 0:
self.statusBar().showMessage(f"⚠ Group {group.group_id}: 請先分配無人機", 3000)
return
print(f"路徑確認 → Group {group.group_id}: {points_json}")
try:
route_points = json.loads(points_json)
route_waypoints = [(p[0], p[1]) for p in route_points]
except (json.JSONDecodeError, IndexError):
self.statusBar().showMessage("❌ 路徑座標解析失敗", 3000)
return
if len(route_waypoints) < 2:
self.statusBar().showMessage("⚠ 至少需要 2 個路徑點", 3000)
return
panel = self.group_panels.get(group.group_id)
params = panel.get_mission_params() if panel else {}
base_alt = params.get('altitude', 10.0)
self.statusBar().showMessage(
f"⏳ Group {group.group_id}: 正在規劃跟隨模式 ({len(selected_drones)} 台) ...", 2000)
try:
drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt)
if drone_gps_positions is None:
return
target_lat = sum(p[0] for p in route_waypoints) / len(route_waypoints)
target_lon = sum(p[1] for p in route_waypoints) / len(route_waypoints)
target_gps = (target_lat, target_lon, base_alt)
params['route_waypoints'] = route_waypoints
waypoints_per_drone, center_origin = self.mission_planner.plan_formation_mission(
drone_gps_positions, target_gps, MissionType.LEADER_FOLLOWER,
params=params
)
group.planned_waypoints = {'drone_ids': selected_drones, 'waypoints': waypoints_per_drone}
group.center_origin = center_origin
self.show_planned_waypoints(group)
center_lat, center_lon, _ = center_origin
self.drone_map.draw_mission_plan_for_group(
group.group_id, group.color,
center_lat, center_lon, target_lat, target_lon
)
self._launch_verification(
'LEADER_FOLLOWER', drone_gps_positions, selected_drones,
waypoints_per_drone, center_origin,
target_gps=target_gps, route_waypoints=route_waypoints
)
panel = self.group_panels.get(group.group_id)
if panel:
panel.update_status()
panel.update_mission_info(center_lat, center_lon, target_lat, target_lon)
total_wps = sum(len(wps) for wps in waypoints_per_drone)
self.statusBar().showMessage(
f"✓ Group {group.group_id}: 跟隨模式規劃完成!{len(selected_drones)} 台,共 {total_wps} 個航點", 5000
)
except Exception as e:
self.statusBar().showMessage(f"❌ Group {group.group_id}: 跟隨模式規劃失敗: {str(e)}", 5000)
import traceback
traceback.print_exc()
# ================================================================================
# 任務執行回呼
# ================================================================================
def on_drone_waypoint_reached(self, drone_id, wp_index, total):
if wp_index >= total:
self.statusBar().showMessage(f"📍 {drone_id} 完成所有航點", 3000)
else:
self.statusBar().showMessage(f"📍 {drone_id} → WP {wp_index}/{total}", 2000)
# ================================================================================
# 輔助方法
# ================================================================================
def _collect_drone_gps(self, selected_drones, base_alt):
drone_gps_positions = []
for drone_id in selected_drones:
if hasattr(self.monitor, 'drone_gps') and drone_id in self.monitor.drone_gps:
gps_data = self.monitor.drone_gps[drone_id]
drone_gps_positions.append((gps_data['lat'], gps_data['lon'], gps_data.get('alt', base_alt)))
elif drone_id in self.drone_positions:
pos = self.drone_positions[drone_id]
lat_drone = 24.0 + pos[1] / 111000
lon_drone = 120.0 + pos[0] / (111000 * 0.9)
alt_drone = pos[2] if len(pos) > 2 else base_alt
drone_gps_positions.append((lat_drone, lon_drone, alt_drone))
else:
self.statusBar().showMessage(f"⚠ 找不到 {drone_id} 的位置資料", 3000)
return None
return drone_gps_positions
def _launch_verification(self, mission_type, drone_gps_positions,
selected_drones, waypoints_per_drone, origin,
target_gps=None, rect_corners=None, route_waypoints=None):
"""存 JSON + 啟動 matplotlib 視覺化驗證 (獨立 process)"""
import os
data = {
'mission_type': mission_type,
'drone_ids': selected_drones,
'drones_gps': drone_gps_positions,
'waypoints': waypoints_per_drone,
'origin': list(origin),
}
if target_gps:
data['target'] = list(target_gps)
if rect_corners:
data['rect_corners'] = rect_corners
if route_waypoints:
data['route_waypoints'] = route_waypoints
json_path = '/tmp/mission_plan.json'
with open(json_path, 'w') as f:
json.dump(data, f, indent=2)
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'validation', 'verify_waypoints.py')
subprocess.Popen([sys.executable, script_path, '--file', json_path])
print(f"驗證視窗已啟動: {json_path}")
def show_planned_waypoints(self, group=None):
pw = group.planned_waypoints if group else None
if not pw:
return
gid = group.group_id if group else "?"
print(f"\n{'=' * 60}")
print(f"任務規劃結果 — Group {gid}")
print(f"{'=' * 60}")
drone_ids = pw['drone_ids']
waypoints = pw['waypoints']
print(f"\n{len(drone_ids)} 台無人機")
for i, drone_id in enumerate(drone_ids):
wps = waypoints[i]
print(f"\n{drone_id}】({len(wps)} 個航點)")
for j, wp in enumerate(wps):
print(f" WP{j}: ({wp[0]:.6f}°, {wp[1]:.6f}°, {wp[2]:.1f}m)")
print(f"\n{'=' * 60}")
def get_selected_drones(self):
return [did for did, panel in self.drones.items() if hasattr(panel, 'checkbox') and panel.checkbox.isChecked()]
def update_field(self, panel, drone_id, field, text, color=None):
if isinstance(panel, DronePanel):
panel.update_field(field, text, color)
def update_overview_table(self, drone_id=None, field=None, value=None):
if not hasattr(self, 'overview_table') or self.overview_table is None: return
self.overview_table.set_drones(self.drones)
self.overview_table.update_table(drone_id, field, value)
def get_socket_id(self, drone_id):
import re
match = re.match(r's(\d+)_(\d+)', drone_id)
return match.group(1) if match else 'unknown'
def add_drone(self, drone_id):
if drone_id in self.drones: return
socket_id = self.get_socket_id(drone_id)
panel = self.create_drone_panel(drone_id)
self.drones[drone_id] = panel
if socket_id not in self.socket_groups:
self.socket_groups[socket_id] = self.create_socket_group_panel(socket_id)
self.socket_groups[socket_id].drones_layout.addWidget(panel)
self.reorganize_socket_groups()
self.update_group_checkbox_state(socket_id)
self.update_overview_table()
def reorganize_socket_groups(self):
while self.drone_panel_layout.count():
w = self.drone_panel_layout.takeAt(0).widget()
if w: w.setParent(None)
for socket_id, group_panel in self.socket_groups.items():
group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
while group_panel.drones_layout.count():
w = group_panel.drones_layout.takeAt(0).widget()
if w: w.setParent(None)
def sort_key(x):
parts = x[1:].split('_')
return (int(parts[0]), int(parts[1]))
for did in sorted(group_drones, key=sort_key):
group_panel.drones_layout.addWidget(self.drones[did])
for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)):
self.drone_panel_layout.addWidget(self.socket_groups[socket_id])
def _update_panel_and_map(self):
"""30Hz 定時更新 panel 和 map批量更新 UI 以避免過度重繪"""
if not hasattr(self, '_message_cache') or not self._message_cache:
return
# 頻率監控
if not hasattr(self, '_map_update_time'):
self._map_update_time = time.time()
self._map_update_count = 0
self._map_update_count += 1
now = time.time()
if now - self._map_update_time >= 1.0:
print(f"[Panel/Map Update] {self._map_update_count} Hz")
self._map_update_time = now
self._map_update_count = 0
# ✅ 步驟 1: 暫停表格的即時重繪
if hasattr(self, 'overview_table') and self.overview_table:
self.overview_table.setUpdatesEnabled(False)
try:
start_time = time.time()
# ✅ 步驟 2: 遍歷快取中最新的資料來更新 UI
for drone_id in list(self._message_cache.keys()):
if drone_id not in self.drones:
continue
panel = self.drones[drone_id]
cached_data = self._message_cache[drone_id]
# 處理所有快取的消息類型
for msg_type, data in cached_data.items():
if msg_type == 'state':
mode = data.get('mode', 'UNKNOWN')
armed = data.get('armed', None)
mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55'
if armed is True:
arm_text, arm_color = "ARMED", '#55FF55'
elif armed is False:
arm_text, arm_color = "DISARMED", '#FF5555'
else:
arm_text, arm_color = "--", '#AAAAAA'
self.update_field(panel, drone_id, 'mode', mode, mode_color)
self.update_field(panel, drone_id, 'armed', arm_text, arm_color)
self.update_overview_table(drone_id, 'mode', mode)
self.update_overview_table(drone_id, 'armed', arm_text)
elif msg_type == 'battery':
voltage = data.get('voltage', 16)
cells = round(voltage / 3.95)
percentage = (voltage / cells - 3.7) / 0.5 * 100 if cells > 0 else 0
if percentage < 20: voltage_color = '#FF6464'
elif percentage < 50: voltage_color = '#FFA500'
else: voltage_color = '#FFFFFF'
percentage = data.get('percentage', percentage)
self.update_field(panel, drone_id, 'battery_pct', f"{percentage:.0f}%", voltage_color)
self.update_field(panel, drone_id, 'battery_vol', f"{voltage:.2f}V")
self.update_field(panel, drone_id, 'battery_cells', f"{cells}S")
self.update_overview_table(drone_id, 'battery', f"{voltage:.2f}V")
elif msg_type == 'altitude':
altitude = data.get('altitude', 0)
text = f"{altitude:.1f} m"
self.update_field(panel, drone_id, 'altitude', text)
self.update_overview_table(drone_id, 'altitude', text)
elif msg_type == 'local_pose':
x, y = data.get('x', 0), data.get('y', 0)
if not hasattr(self.monitor, 'drone_local'):
self.monitor.drone_local = {}
self.monitor.drone_local[drone_id] = {'x': x, 'y': y}
self.update_overview_table(drone_id, 'local', f"{x:.1f}, {y:.1f}")
elif msg_type == 'loss_rate':
text = f"{data.get('loss_rate', 0):.1f}%"
self.update_field(panel, drone_id, 'loss_rate', text)
self.update_overview_table(drone_id, 'loss_rate', text)
elif msg_type == 'ping':
text = f"{data.get('ping', 0):.1f} ms"
self.update_field(panel, drone_id, 'ping', text)
self.update_overview_table(drone_id, 'ping', text)
elif msg_type == 'velocity':
self.update_overview_table(drone_id, 'velocity', f"{data['vx']:.1f}, {data['vy']:.1f}")
elif msg_type == 'attitude':
roll, pitch, yaw = data.get('roll', 0), data.get('pitch', 0), data.get('yaw', 0)
self.update_overview_table(drone_id, 'roll', f"{roll:.1f}°")
self.update_overview_table(drone_id, 'pitch', f"{pitch:.1f}°")
self.update_overview_table(drone_id, 'yaw', f"{yaw:.1f}°")
panel._last_roll = roll
panel._last_pitch = pitch
if hasattr(panel, 'update_attitude'):
heading = self.drone_headings.get(drone_id, 0)
panel.update_attitude(heading, roll, pitch)
elif msg_type == 'gps':
gps_data = data
lat, lon = gps_data.get('lat', 0), gps_data.get('lon', 0)
self.drone_positions[drone_id] = (lat, lon)
alt = gps_data.get('alt', 0)
if not hasattr(self.monitor, 'drone_gps'):
self.monitor.drone_gps = {}
self.monitor.drone_gps[drone_id] = {'lat': lat, 'lon': lon, 'alt': alt}
self.update_overview_table(drone_id, 'latitude', f"{lat:.6f}°")
self.update_overview_table(drone_id, 'longitude', f"{lon:.6f}°")
elif msg_type == 'hud':
hud_data = data
heading = hud_data.get('heading', 0)
self.drone_headings[drone_id] = heading
groundspeed = hud_data.get('groundspeed', 0)
airspeed = hud_data.get('airspeed', 0)
throttle = hud_data.get('throttle', 0)
hud_alt = hud_data.get('alt', 0)
climb = hud_data.get('climb', 0)
self.update_overview_table(drone_id, 'heading', f"{heading:.1f}°")
self.update_overview_table(drone_id, 'groundspeed', f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--")
self.update_overview_table(drone_id, 'airspeed', f"{airspeed:.1f} m/s" if isinstance(airspeed, (int, float)) else "--")
self.update_overview_table(drone_id, 'throttle', f"{throttle:.0f}%" if isinstance(throttle, (int, float)) else "--")
self.update_overview_table(drone_id, 'hud_alt', f"{hud_alt:.1f} m" if isinstance(hud_alt, (int, float)) else "--")
self.update_overview_table(drone_id, 'climb', f"{climb:.1f} m/s" if isinstance(climb, (int, float)) else "--")
self.update_field(panel, drone_id, 'heading', f"{heading:.1f}°")
self.update_field(panel, drone_id, 'groundspeed', f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--")
self.update_field(panel, drone_id, 'speed', f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--")
if drone_id in self.drone_positions:
lat, lon = self.drone_positions[drone_id]
self.drone_map.update_drone_position(drone_id, lat, lon, heading)
elapsed = (time.time() - start_time) * 1000
if elapsed > 33:
print(f"[WARNING] UI update took {elapsed:.1f}ms (target: <33ms)")
finally:
# ✅ 步驟 3: 恢復表格重繪(所有資料已填好,一次性重繪)
if hasattr(self, 'overview_table') and self.overview_table:
self.overview_table.setUpdatesEnabled(True)
self.overview_table.viewport().update()
def spin_ros(self):
try:
self.executor.spin_once(timeout_sec=0.01)
for (drone_id, msg_type), data in self.monitor.latest_data.items():
self.monitor.signals.update_signal.emit(msg_type, drone_id, data)
self.monitor.latest_data.clear()
except Exception as e:
print(f"ROS spin error: {e}")
def closeEvent(self, event):
for group in self.mission_groups.values():
if group.executor:
group.executor.stop()
self.command_sender.close()
for receiver in self.udp_receivers:
receiver.stop()
for receiver in self.monitor.ws_receivers:
receiver.stop()
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
station = ControlStationUI()
station.show()
app.exec()