#!/usr/bin/env python3 import rclpy from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QSplitter, QScrollArea, QSizePolicy, QTabWidget, QTableWidget, QTableWidgetItem, QHeaderView, QPushButton, QCheckBox, QLineEdit, QComboBox, QDialog, QPlainTextEdit, QSlider) from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal, QEvent, QPropertyAnimation, pyqtProperty, QEasingCurve from PyQt6.QtGui import QColor, QFont, QPainter, QPen import sys import asyncio import json import subprocess import time import traceback import re import threading from concurrent.futures import ThreadPoolExecutor def _log(level, message): print(f"[{level}] {message}", flush=True) # 導入分離的類別 from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkReceiver from map_layout import DroneMap from drone_panel import DronePanel, SocketGroupPanel from comm_panel import CommPanel from overview_table import OverviewTable # ================================================================================ # 導入任務規劃器、執行器、發送器、群組 # ================================================================================ from mission_planner import FormationPlanner, MissionType from command_sender import MavlinkSender, Ros2CommandSender from mission_executor import MissionExecutor, MissionState from mission_group import ( MissionGroup, GroupPanel, DroneAssignDialog, GROUP_COLORS, DEFAULT_MISSION_PARAM_VALUES ) # ================================================================================ class StreamRedirector(QObject): """將 stdout/stderr 同步轉發到 Qt UI。""" text_written = pyqtSignal(str) def __init__(self, original_stream=None, parent=None): super().__init__(parent) self.original_stream = original_stream self._buffer = "" def write(self, text): if self.original_stream: self.original_stream.write(text) if not text: return self._buffer += text while "\n" in self._buffer: line, self._buffer = self._buffer.split("\n", 1) line = line.strip() if line: self.text_written.emit(line) def flush(self): if self.original_stream: self.original_stream.flush() line = self._buffer.strip() if line: self.text_written.emit(line) self._buffer = "" class ToggleSwitch(QWidget): """自訂滑動開關 — ON 綠色 / OFF 紅色""" toggled = pyqtSignal(bool) def __init__(self, checked=True, parent=None): super().__init__(parent) self._checked = checked self._knob_x = 1.0 if checked else 0.0 self.setFixedSize(44, 24) self.setCursor(Qt.CursorShape.PointingHandCursor) self._animation = QPropertyAnimation(self, b"knob_position", self) self._animation.setDuration(150) self._animation.setEasingCurve(QEasingCurve.Type.InOutCubic) def _get_knob_position(self): return self._knob_x def _set_knob_position(self, val): self._knob_x = val self.update() knob_position = pyqtProperty(float, _get_knob_position, _set_knob_position) def isChecked(self): return self._checked def setChecked(self, checked): if self._checked != checked: self._checked = checked self._animate(checked) self.toggled.emit(checked) def mousePressEvent(self, event): self.setChecked(not self._checked) def _animate(self, checked): self._animation.stop() self._animation.setStartValue(self._knob_x) self._animation.setEndValue(1.0 if checked else 0.0) self._animation.start() def paintEvent(self, event): p = QPainter(self) p.setRenderHint(QPainter.RenderHint.Antialiasing) w, h = self.width(), self.height() r = h / 2 # 背景軌道:依 knob_x 插值 灰→綠 red = QColor(0x88, 0x88, 0x88) green = QColor(0x4C, 0xAF, 0x50) t = self._knob_x bg = QColor( int(red.red() + (green.red() - red.red()) * t), int(red.green() + (green.green() - red.green()) * t), int(red.blue() + (green.blue() - red.blue()) * t), ) p.setPen(Qt.PenStyle.NoPen) p.setBrush(bg) p.drawRoundedRect(0, 0, w, h, r, r) # 滑塊圓形 knob_d = h - 4 knob_range = w - knob_d - 4 knob_cx = 2 + self._knob_x * knob_range + knob_d / 2 p.setBrush(QColor(255, 255, 255)) p.drawEllipse(int(knob_cx - knob_d / 2), 2, int(knob_d), int(knob_d)) p.end() class ControlStationUI(QMainWindow): planning_finished = pyqtSignal(object) VERSION = '2.3.0' FONT_SCALE_MIN = 70 FONT_SCALE_MAX = 180 FONT_SCALE_DEFAULT = 100 def __init__(self): super().__init__() self.setWindowTitle(f'GCS v{self.VERSION}') self.resize(1400, 900) self._closing = False # 初始化消息隊列(用於線程安全的 GUI 更新) import queue self.message_queue = queue.Queue() # 初始化ROS2 Monitor(ROS2 本身在 main() 中已初始化) self.monitor = DroneMonitor() self.monitor.signals.update_signal.connect(self.update_ui) # ROS执行器配置 self.executor = rclpy.executors.SingleThreadedExecutor() self.executor.add_node(self.monitor) # 將執行器註冊到 DroneMonitor,以便動態創建的 CommandLongClient 能被添加 self.monitor.executor = self.executor # 在背景執行緒處理 ROS2 spin,避免佔用 Qt 主執行緒時間 self.ros_thread_running = True self.ros_thread = threading.Thread(target=self._ros_spin_thread, daemon=True) self.ros_thread.start() # 任務規劃移到 worker thread,避免大量航點計算卡住 Qt 主執行緒 self.planner_pool = ThreadPoolExecutor(max_workers=2) self._plan_request_counter = 0 self._latest_plan_request_by_group = {} self.planning_finished.connect(self._on_plan_done) # 驅動 asyncio 事件循環的定時器(新增 - 關鍵!) # 這允許異步任務(如 arm_drone)能夠執行 self.asyncio_timer = QTimer() self.asyncio_timer.timeout.connect(self._spin_asyncio) self.asyncio_timer.start(10) # 每 10ms 驅動一次 asyncio # 初始化 panel 更新(10Hz) self.panel_map_timer = QTimer() self.panel_map_timer.timeout.connect(self._update_panel_and_map) self.panel_map_timer.start(100) # 10Hz # 消息隊列處理定時器(來自線程的 GUI 更新) self.msg_queue_timer = QTimer() self.msg_queue_timer.timeout.connect(self._process_message_queue) self.msg_queue_timer.start(50) # 每 50ms 檢查一次 # 快取消息數據,以便在沒有新消息時使用上一次的值 self._message_cache = {} self._overview_cache = {} self.message_history = [] self.max_message_history = 500 # 初始化UI self.drones = {} self.socket_groups = {} self.socket_types = {} self.socket_colors = { '0': '#00BFFF', # 天藍色 (DeepSkyBlue) '1': '#FFD700', # 金色 (Gold) '2': '#FF6969', # 淺紅色 (Light Red) '3': '#FF69B4', # 熱粉紅 (HotPink) '4': '#00FA9A', # 中春綠 (MediumSpringGreen) '5': '#9370DB', # 中紫色 (MediumPurple) - 串口 '6': '#FFA500', # 橙色 (Orange) '7': '#20B2AA', # 淺海綠 (LightSeaGreen) '8': '#7CFC00', # 草綠色 (LawnGreen) '9': '#FF8C00', # 深橙色 (DarkOrange) 'default': '#AAAAAA' # 灰色 } self.drone_positions = {} self.drone_headings = {} # 初始化地圖 self.drone_map = DroneMap() # 地圖更新獨立降頻(5Hz),避免 WebEngine / JS map 拖慢 panel 更新 self.map_timer = QTimer() self.map_timer.timeout.connect(self._update_map_only) self.map_timer.start(200) # Overview table 獨立批次更新(5Hz),避免每筆資料都重繪 Qt table self.overview_timer = QTimer() self.overview_timer.timeout.connect(self._flush_overview_table) self.overview_timer.start(200) # 初始化連接列表 self.udp_receivers = [] self.udp_connections = [] self.ws_connections = [] self.serial_receivers = [] self.serial_connections = [] # ================================================================================ # 初始化任務規劃器 # ================================================================================ self.mission_planner = FormationPlanner( spacing=5.0, # 5 公尺間距 base_altitude=10.0, # 基準高度 10 公尺 altitude_diff=2.0 # 高低差 2 公尺 ) # ================================================================================ # ================================================================================ # 初始化指令發送器(所有群組共用) # ================================================================================ # 走 ROS2 /fc_network/vehicle/pos_global_int service;per-drone client + asyncio polling self.command_sender = Ros2CommandSender(monitor=self.monitor) # ================================================================================ # ================================================================================ # 多任務群組管理 # ================================================================================ self.mission_groups = {} # group_id → MissionGroup self.group_panels = {} # group_id → GroupPanel self.active_group_id = None # 當前 active 的 group self._group_counter = 0 # 用來產生 group_id self._pending_box_assign = None # 框選後直接分配到的 group_id # ================================================================================ self.global_mission_defaults = dict(DEFAULT_MISSION_PARAM_VALUES) self.font_scale = self.FONT_SCALE_DEFAULT / 100.0 self.pending_font_scale = self.font_scale self._font_scale_applying = False self._base_app_font = QFont(QApplication.instance().font()) self.init_ui() self._setup_stream_redirector() app = QApplication.instance() app.setProperty("font_scale", self.font_scale) app.setProperty("base_app_font", self._base_app_font) app.installEventFilter(self) def init_ui(self): main_splitter = QSplitter(Qt.Orientation.Horizontal) self.main_splitter = main_splitter # 左側 TabWidget self.left_tab = QTabWidget() # — 分頁 1:Drone Panel self.drone_panel_container = QWidget() self.drone_panel_layout = QVBoxLayout(self.drone_panel_container) self.drone_panel_layout.setAlignment(Qt.AlignmentFlag.AlignTop) self.drone_panel_layout.setSpacing(0) self.drone_panel_layout.setContentsMargins(10, 10, 10, 10) scroll = QScrollArea() scroll.setWidget(self.drone_panel_container) scroll.setWidgetResizable(True) self.left_tab.addTab(scroll, "無人載具") # — 分頁 2:Overview Table self.overview_table = OverviewTable() self.left_tab.addTab(self.overview_table, "總覽") # — 分頁 3:訊息歷史 self.message_history_tab = self._create_message_history_tab() self.left_tab.addTab(self.message_history_tab, "紀錄") # — 分頁 4:通訊設定 self.comm_panel = CommPanel() self.comm_panel.udp_connection_added.connect(self.handle_udp_connection_added) self.comm_panel.ws_connection_added.connect(self.handle_ws_connection_added) self.comm_panel.serial_connection_added.connect(self.handle_serial_connection_added) self.comm_panel.udp_connection_toggled.connect(self.toggle_udp_connection) self.comm_panel.ws_connection_toggled.connect(self.toggle_ws_connection) self.comm_panel.serial_connection_toggled.connect(self.toggle_serial_connection) self.comm_panel.udp_connection_removed.connect(self.remove_udp_connection) self.comm_panel.ws_connection_removed.connect(self.remove_ws_connection) self.comm_panel.serial_connection_removed.connect(self.remove_serial_connection) self.comm_panel.status_message.connect(lambda msg, timeout: self.statusBar().showMessage(msg, timeout)) self.left_tab.addTab(self.comm_panel, "通訊") # — 分頁 5:全域設定 self.settings_tab = self._create_settings_tab() self.left_tab.addTab(self.settings_tab, "設定") # 右侧容器 right_container = QWidget() right_layout = QVBoxLayout(right_container) right_layout.setContentsMargins(10, 10, 10, 10) right_layout.setSpacing(10) # ========== 任務群組 Tab ========== self.group_container = QWidget() group_container_layout = QVBoxLayout(self.group_container) group_container_layout.setContentsMargins(0, 0, 0, 0) group_container_layout.setSpacing(6) group_header = QHBoxLayout() # 標題 + 收起/展開按鈕 title_layout = QHBoxLayout() group_title = QLabel("任務群組") group_title.setStyleSheet( "color: #DDD; font-size: 14px; font-weight: bold; padding: 2px;") title_layout.addWidget(group_title) # 收起/展開按鈕 self.toggle_group_btn = QPushButton("▼") self.toggle_group_btn.setStyleSheet(""" QPushButton { background-color: #555; color: white; border: none; padding: 3px 8px; border-radius: 3px; font-size: 12px; font-weight: bold; min-width: 30px; max-width: 30px; } QPushButton:hover { background-color: #666; } """) self.toggle_group_btn.setToolTip("收起/展開任務群組") self.toggle_group_btn.clicked.connect(self._toggle_group_panel) title_layout.addWidget(self.toggle_group_btn) title_layout.addStretch() group_header.addLayout(title_layout) group_header.addStretch() verify_label = QLabel("模擬驗證") verify_label.setStyleSheet("color: #CCC; font-size: 12px;") self.verify_toggle = ToggleSwitch(checked=True) group_header.addWidget(verify_label) group_header.addWidget(self.verify_toggle) clear_traj_btn = QPushButton("清除軌跡") clear_traj_btn.setStyleSheet(""" QPushButton { background-color: #EF5350; color: white; border: none; padding: 5px 12px; border-radius: 4px; font-size: 12px; font-weight: bold; } QPushButton:hover { background-color: #E53935; } """) clear_traj_btn.clicked.connect(self.drone_map.clear_trajectories) group_header.addWidget(clear_traj_btn) group_container_layout.addLayout(group_header) self.group_tab_widget = QTabWidget() self.group_tab_widget.setStyleSheet(""" QTabWidget::pane { border: 1px solid #444; background-color: #2B2B2B; } QTabBar::tab { background-color: #333; color: #AAA; border: 1px solid #444; padding: 5px 12px; margin-right: 2px; border-top-left-radius: 4px; border-top-right-radius: 4px; font-size: 12px; } QTabBar::tab:selected { background-color: #2B2B2B; color: #FFF; border-bottom-color: #2B2B2B; } QTabBar::tab:hover { background-color: #3A3A3A; } """) self.group_tab_widget.setMinimumHeight(80) self.group_tab_widget.currentChanged.connect(self._on_group_tab_changed) group_container_layout.addWidget(self.group_tab_widget) # 🌟 新增:保存群組面板的展開狀態 self.group_panel_expanded = True # 預設建立 Group A self._add_mission_group() # 任務群組與地圖之間使用垂直 splitter,可上下拖曳調整高度 self.right_vertical_splitter = QSplitter(Qt.Orientation.Vertical) self.right_vertical_splitter.addWidget(self.group_container) self.right_vertical_splitter.addWidget(self.drone_map.get_widget()) self.right_vertical_splitter.setChildrenCollapsible(False) self.right_vertical_splitter.setStretchFactor(0, 0) self.right_vertical_splitter.setStretchFactor(1, 1) self.right_vertical_splitter.setSizes([170, 700]) right_layout.addWidget(self.right_vertical_splitter) # 添加地圖 self.drone_map.get_gps_signal().connect(self.handle_map_click) self.drone_map.get_drone_clicked_signal().connect(self.handle_drone_clicked) self.drone_map.get_clear_all_drone_selection_signal().connect(self.handle_clear_all_drone_selection) self.drone_map.get_toggle_select_all_drones_signal().connect(self.handle_toggle_select_all_drones) # ================================================================================ # 連接地圖信號(任務模式改由 Group Tab 控制,不再從地圖下拉選單) # ================================================================================ self.drone_map.get_rectangle_selected_signal().connect(self.handle_rectangle_selected) self.drone_map.get_route_confirmed_signal().connect(self.handle_route_confirmed) self.drone_map.get_drone_box_selected_signal().connect(self._handle_drone_box_selected) # ================================================================================ main_splitter.addWidget(self.left_tab) main_splitter.addWidget(right_container) main_splitter.setSizes([400, 1000]) self.setCentralWidget(main_splitter) self.statusBar().messageChanged.connect(self._on_status_bar_message_changed) def _create_message_history_tab(self): """建立左側訊息歷史分頁。""" widget = QWidget() layout = QVBoxLayout(widget) layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(8) header_layout = QHBoxLayout() title = QLabel("操作與訊息歷史") title.setStyleSheet("color: #DDD; font-size: 14px; font-weight: bold;") header_layout.addWidget(title) header_layout.addStretch() clear_btn = QPushButton("清空") clear_btn.setStyleSheet(""" QPushButton { background-color: #555; color: white; border: none; padding: 5px 10px; border-radius: 4px; font-size: 12px; } QPushButton:hover { background-color: #666; } """) clear_btn.clicked.connect(self._clear_message_history) header_layout.addWidget(clear_btn) layout.addLayout(header_layout) self.message_history_view = QPlainTextEdit() self.message_history_view.setReadOnly(True) self.message_history_view.setStyleSheet(""" QPlainTextEdit { background-color: #1E1E1E; color: #DDD; border: 1px solid #444; border-radius: 6px; padding: 6px; font-family: monospace; font-size: 12px; } """) self.message_history_view.setPlaceholderText("左下角狀態訊息會顯示在這裡...") layout.addWidget(self.message_history_view) return widget def _create_settings_tab(self): """建立字體設定分頁。""" widget = QWidget() layout = QVBoxLayout(widget) layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(8) scale_panel = QWidget() scale_panel.setStyleSheet(""" QWidget { background-color: #2A2A2A; border: 1px solid #444; border-radius: 6px; } """) scale_layout = QVBoxLayout(scale_panel) scale_layout.setContentsMargins(12, 12, 12, 12) scale_layout.setSpacing(8) self.font_scale_label = QLabel() self.font_scale_label.setStyleSheet("color: #DDD; font-size: 13px; font-weight: bold;") scale_layout.addWidget(self.font_scale_label) self.font_scale_slider = QSlider(Qt.Orientation.Horizontal) self.font_scale_slider.setRange(self.FONT_SCALE_MIN, self.FONT_SCALE_MAX) self.font_scale_slider.setValue(self.FONT_SCALE_DEFAULT) self.font_scale_slider.setTickInterval(10) self.font_scale_slider.setTickPosition(QSlider.TickPosition.TicksBelow) self.font_scale_slider.setStyleSheet(""" QSlider::groove:horizontal { border: 1px solid #444; height: 8px; background: #333; border-radius: 4px; } QSlider::handle:horizontal { background: #4A9EFF; border: 1px solid #3A8EEF; width: 18px; margin: -6px 0; border-radius: 9px; } QSlider::sub-page:horizontal { background: #4A9EFF; border-radius: 4px; } QSlider::add-page:horizontal { background: #2A2A2A; border-radius: 4px; } """) self.font_scale_slider.valueChanged.connect(self._handle_font_scale_slider_changed) scale_layout.addWidget(self.font_scale_slider) layout.addWidget(scale_panel) layout.addStretch() settings_button_row = QHBoxLayout() settings_button_row.addStretch() apply_btn = QPushButton("套用") apply_btn.setStyleSheet(""" QPushButton { background-color: #4A9EFF; color: white; border: none; padding: 6px 12px; border-radius: 4px; font-size: 12px; font-weight: bold; } QPushButton:hover { background-color: #3A8EEF; } """) apply_btn.clicked.connect(self._apply_pending_font_scale) settings_button_row.addWidget(apply_btn) layout.addLayout(settings_button_row) self._update_font_scale_label() return widget def _update_font_scale_label(self): percent = int(round(self.pending_font_scale * 100)) if hasattr(self, 'font_scale_label'): self.font_scale_label.setText(f"字體倍率:{percent}%") def _handle_font_scale_slider_changed(self, value): self.pending_font_scale = value / 100.0 self._update_font_scale_label() def _apply_pending_font_scale(self): self._apply_font_scale(self.pending_font_scale) def _scale_stylesheet_font_sizes(self, stylesheet, scale): if not stylesheet or 'font-size' not in stylesheet: return stylesheet def repl(match): size = float(match.group(1)) unit = match.group(2) scaled = max(1.0, size * scale) if unit == 'px': text = f"{scaled:.2f}".rstrip('0').rstrip('.') else: text = f"{scaled:.3f}".rstrip('0').rstrip('.') return f"font-size: {text}{unit}" return re.sub(r'font-size\s*:\s*([0-9]+(?:\.[0-9]+)?)\s*(px|pt)', repl, stylesheet) def _apply_font_scale_to_widget(self, widget): if hasattr(widget, 'apply_font_scale'): widget.apply_font_scale() return if hasattr(widget, '_base_stylesheet') and hasattr(widget, '_applied_stylesheet'): current_stylesheet = widget.styleSheet() base_stylesheet = widget._base_stylesheet if current_stylesheet != widget._applied_stylesheet: base_stylesheet = current_stylesheet widget._base_stylesheet = base_stylesheet scaled_stylesheet = self._scale_stylesheet_font_sizes(base_stylesheet, self.font_scale) widget._applied_stylesheet = scaled_stylesheet if current_stylesheet != scaled_stylesheet: widget.setStyleSheet(scaled_stylesheet) return current_stylesheet = widget.styleSheet() source_stylesheet = getattr(widget, '_font_scale_source_stylesheet', current_stylesheet) applied_stylesheet = getattr(widget, '_font_scale_applied_stylesheet', None) if current_stylesheet != applied_stylesheet: source_stylesheet = current_stylesheet scaled_stylesheet = self._scale_stylesheet_font_sizes(source_stylesheet, self.font_scale) widget._font_scale_source_stylesheet = source_stylesheet widget._font_scale_applied_stylesheet = scaled_stylesheet if current_stylesheet != scaled_stylesheet: widget.setStyleSheet(scaled_stylesheet) def _apply_font_scale_to_widget_tree(self, root): if root is None: return if isinstance(root, QWidget): self._apply_font_scale_to_widget(root) for child in root.findChildren(QWidget): self._apply_font_scale_to_widget(child) def _apply_font_scale(self, scale): app = QApplication.instance() splitter_sizes = None left_width = None old_left_min = None old_left_max = None old_window_size = self.size() old_window_min = self.minimumSize() old_window_max = self.maximumSize() lock_window_size = not self.isMaximized() and not self.isFullScreen() if hasattr(self, 'main_splitter') and self.main_splitter: splitter_sizes = self.main_splitter.sizes() if hasattr(self, 'left_tab') and self.left_tab: left_width = self.left_tab.width() old_left_min = self.left_tab.minimumWidth() old_left_max = self.left_tab.maximumWidth() if left_width > 0: self.left_tab.setMinimumWidth(left_width) self.left_tab.setMaximumWidth(left_width) self._font_scale_applying = True try: self.font_scale = scale app.setProperty("font_scale", scale) if hasattr(self, '_base_app_font'): scaled_font = QFont(self._base_app_font) if self._base_app_font.pointSizeF() > 0: scaled_font.setPointSizeF(max(1.0, self._base_app_font.pointSizeF() * scale)) elif self._base_app_font.pixelSize() > 0: scaled_font.setPixelSize(max(1, int(round(self._base_app_font.pixelSize() * scale)))) app.setFont(scaled_font) if lock_window_size: self.setMinimumSize(old_window_size) self.setMaximumSize(old_window_size) self._update_font_scale_label() self._apply_font_scale_to_widget_tree(self) if hasattr(self, 'comm_panel') and self.comm_panel: self.comm_panel.apply_font_scale() for panel in getattr(self, 'drones', {}).values(): if hasattr(panel, 'apply_font_scale'): panel.apply_font_scale() for panel in getattr(self, 'socket_groups', {}).values(): if hasattr(panel, 'apply_font_scale'): panel.apply_font_scale() for panel in getattr(self, 'group_panels', {}).values(): if hasattr(panel, 'apply_font_scale'): panel.apply_font_scale() if hasattr(self, 'drone_map') and self.drone_map: self.drone_map.set_font_scale(scale) if splitter_sizes: self.main_splitter.setSizes(splitter_sizes) if lock_window_size: self.resize(old_window_size) self.left_tab.updateGeometry() self.update() finally: self._font_scale_applying = False def _restore_locked_sizes(): if lock_window_size: self.setMinimumSize(old_window_min) self.setMaximumSize(old_window_max) self.resize(old_window_size) if self.left_tab and left_width and left_width > 0: self.left_tab.setMinimumWidth(left_width) self.left_tab.setMaximumWidth(left_width) if splitter_sizes and self.main_splitter: self.main_splitter.setSizes(splitter_sizes) def _release_size_locks(): if self.left_tab and old_left_min is not None and old_left_max is not None: self.left_tab.setMinimumWidth(old_left_min) self.left_tab.setMaximumWidth(old_left_max) QTimer.singleShot(0, _restore_locked_sizes) QTimer.singleShot(30, _restore_locked_sizes) QTimer.singleShot(100, _restore_locked_sizes) QTimer.singleShot(150, _release_size_locks) def eventFilter(self, obj, event): if self._font_scale_applying: return super().eventFilter(obj, event) if isinstance(obj, QWidget) and event.type() in { QEvent.Type.StyleChange, QEvent.Type.Polish, QEvent.Type.Show, }: self._font_scale_applying = True try: self._apply_font_scale_to_widget(obj) finally: self._font_scale_applying = False return super().eventFilter(obj, event) def _clear_message_history(self): """清空訊息歷史。""" self.message_history.clear() if hasattr(self, 'message_history_view'): self.message_history_view.clear() def _on_status_bar_message_changed(self, message): """同步狀態列訊息到歷史紀錄。""" if not message: return timestamp = time.strftime("%H:%M:%S") entry = f"[{timestamp}] {message}" self.message_history.append(entry) if len(self.message_history) > self.max_message_history: self.message_history = self.message_history[-self.max_message_history:] if hasattr(self, 'message_history_view'): self.message_history_view.appendPlainText(entry) scrollbar = self.message_history_view.verticalScrollBar() scrollbar.setValue(scrollbar.maximum()) def _setup_stream_redirector(self): """將 stdout/stderr 同步到左下角狀態列與訊息紀錄。""" self._original_stdout = sys.stdout self._original_stderr = sys.stderr self.stdout_redirector = StreamRedirector(self._original_stdout, self) self.stderr_redirector = StreamRedirector(self._original_stderr, self) self.stdout_redirector.text_written.connect(self.show_in_bottom_left) self.stderr_redirector.text_written.connect(self.show_in_bottom_left) sys.stdout = self.stdout_redirector sys.stderr = self.stderr_redirector def _restore_stream_redirector(self): """還原 stdout/stderr,避免關閉視窗後仍寫入 UI。""" if getattr(self, '_original_stdout', None) is not None: sys.stdout = self._original_stdout if getattr(self, '_original_stderr', None) is not None: sys.stderr = self._original_stderr def show_in_bottom_left(self, text): """將重導向的輸出顯示在左下角狀態列。""" if not text: return self.statusBar().showMessage(text, 5000) # ================================================================================ # 連線管理 # ================================================================================ def handle_udp_connection_added(self, ip, port): new_conn = {'name': f'UDP {len(self.udp_connections) + 1}', 'ip': ip, 'port': port, 'enabled': True} receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name'], self.monitor) receiver.start() self.udp_receivers.append(receiver) new_conn['receiver'] = receiver self.udp_connections.append(new_conn) self.comm_panel.add_udp_panel(new_conn) self.statusBar().showMessage(f"已添加 UDP 連接: {ip}:{port}", 3000) def handle_ws_connection_added(self, url): new_conn = {'name': f'WS {len(self.ws_connections) + 1}', 'url': url, 'enabled': True} receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name'], self.monitor) receiver.start() self.monitor.ws_receivers.append(receiver) new_conn['receiver'] = receiver self.ws_connections.append(new_conn) self.comm_panel.add_ws_panel(new_conn) self.statusBar().showMessage(f"已添加 WebSocket 連接: {url}", 3000) def create_drone_panel(self, drone_id): panel = DronePanel(drone_id) panel.mode_change_requested.connect(self.handle_mode_change) panel.arm_requested.connect(self.handle_arm) panel.takeoff_requested.connect(self.handle_takeoff) panel.setpoint_requested.connect(self.handle_single_setpoint) panel.selection_changed.connect(self.handle_drone_selection) return panel def toggle_ws_connection(self, conn, btn, status_label): if conn.get('enabled', False): if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() conn['enabled'] = False btn.setText("啟動") status_label.setStyleSheet("color: #888; font-size: 16px;") status_label.setToolTip("已停止") self.statusBar().showMessage(f"已停止 WebSocket 連接: {conn['url']}", 3000) else: receiver = WebSocketMavlinkReceiver(conn['url'], self.monitor.signals, conn['name'], self.monitor) receiver.start() self.monitor.ws_receivers.append(receiver) conn['receiver'] = receiver conn['enabled'] = True btn.setText("停止") status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") status_label.setToolTip("運行中") self.statusBar().showMessage(f"已啟動 WebSocket 連接: {conn['url']}", 3000) def remove_ws_connection(self, conn, panel): if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() if conn['receiver'] in self.monitor.ws_receivers: self.monitor.ws_receivers.remove(conn['receiver']) if conn in self.ws_connections: self.ws_connections.remove(conn) self.comm_panel.remove_ws_connection_from_list(conn) panel.setParent(None) panel.deleteLater() self.statusBar().showMessage(f"已移除 WebSocket 連接: {conn['url']}", 3000) def toggle_udp_connection(self, conn, btn, status_label): if conn.get('enabled', False): if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() conn['enabled'] = False btn.setText("啟動") status_label.setStyleSheet("color: #888; font-size: 16px;") status_label.setToolTip("已停止") self.statusBar().showMessage(f"已停止 UDP 連接: {conn['ip']}:{conn['port']}", 3000) else: receiver = UDPMavlinkReceiver(conn['ip'], conn['port'], self.monitor.signals, conn['name'], self.monitor) receiver.start() self.udp_receivers.append(receiver) conn['receiver'] = receiver conn['enabled'] = True btn.setText("停止") status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") status_label.setToolTip("運行中") self.statusBar().showMessage(f"已啟動 UDP 連接: {conn['ip']}:{conn['port']}", 3000) def remove_udp_connection(self, conn, panel): if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() if conn['receiver'] in self.udp_receivers: self.udp_receivers.remove(conn['receiver']) if conn in self.udp_connections: self.udp_connections.remove(conn) self.comm_panel.remove_udp_connection_from_list(conn) panel.setParent(None) panel.deleteLater() self.statusBar().showMessage(f"已移除 UDP 連接: {conn['ip']}:{conn['port']}", 3000) def handle_serial_connection_added(self, port, baudrate): conn = {'name': 'Serial', 'port': port, 'baudrate': baudrate, 'enabled': False, 'receiver': None} self.serial_connections.append(conn) self.comm_panel.add_serial_panel(conn) self.statusBar().showMessage(f"已添加 Serial 連接: {port} @ {baudrate}", 3000) def toggle_serial_connection(self, conn, btn, status_label): if conn.get('enabled', False): if conn.get('receiver'): conn['receiver'].stop() conn['receiver'] = None conn['enabled'] = False btn.setText("啟動") status_label.setStyleSheet("color: #888; font-size: 16px;") status_label.setToolTip("已停止") self.statusBar().showMessage(f"已停止 Serial 連接: {conn['port']}", 3000) else: try: receiver = self.monitor.start_serial_connection(conn['port'], conn['baudrate']) conn['receiver'] = receiver conn['enabled'] = True btn.setText("停止") status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") status_label.setToolTip("運行中") self.statusBar().showMessage(f"已啟動 Serial 連接: {conn['port']}", 3000) except Exception as e: _log("ERROR", f"Serial 連接啟動失敗: {str(e)}") traceback.print_exc() self.statusBar().showMessage(f"啟動 Serial 連接失敗: {str(e)}", 5000) def remove_serial_connection(self, conn, panel): if conn.get('enabled', False) and conn.get('receiver'): conn['receiver'].stop() if conn in self.serial_connections: self.serial_connections.remove(conn) self.comm_panel.remove_serial_connection_from_list(conn) panel.setParent(None) panel.deleteLater() self.statusBar().showMessage(f"已移除 Serial 連接: {conn['port']}", 3000) def create_socket_group_panel(self, socket_id): color = self.socket_colors.get(socket_id, self.socket_colors['default']) socket_type = self.socket_types.get(socket_id, None) panel = SocketGroupPanel(socket_id, color, socket_type) panel.group_selection_changed.connect(self.handle_group_selection) return panel # ================================================================================ # 無人機操作 # ================================================================================ def handle_mode_change(self, drone_id): # 從 active group 的 mode_combo 讀取模式 group = self._get_active_group() if group: panel = self.group_panels.get(group.group_id) mode = panel.mode_combo.currentText() if panel else "GUIDED" else: mode = "GUIDED" _log("INFO", f"切換模式請求: drone={drone_id}, mode={mode}") loop = asyncio.get_event_loop() future = self.monitor.set_mode(drone_id, mode) loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}")) def handle_arm(self, drone_id): loop = asyncio.get_event_loop() arm_state = not self.monitor.get_arm_state(drone_id) coro = self.monitor.arm_drone(drone_id, arm_state) action_text = f"{'解鎖' if arm_state else '上鎖'} {drone_id}" _log("INFO", f"{action_text} 請求已送出") loop.create_task(self.handle_service_response(coro, action_text)) def handle_takeoff(self, drone_id): loop = asyncio.get_event_loop() future = self.monitor.takeoff_drone(drone_id, 10.0) loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}")) def handle_setpoint_selected(self): """發送位置命令到 active group 的所有選中無人機""" group = self._get_active_group() if not group: self.statusBar().showMessage("請先建立任務群組", 3000) return try: x = float(self.x_input.text() or '0') y = float(self.y_input.text() or '0') z = float(self.z_input.text() or '0') for drone_id in group.selected_drone_ids: if self.monitor.send_setpoint(drone_id, x, y, z): self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) else: self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) except ValueError: self.statusBar().showMessage("座標格式錯誤", 3000) def handle_single_setpoint(self, drone_id): try: x = float(self.x_input.text() or '0') y = float(self.y_input.text() or '0') z = float(self.z_input.text() or '0') if self.monitor.send_setpoint(drone_id, x, y, z): self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) else: self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) except ValueError: self.statusBar().showMessage("座標格式錯誤", 3000) async def handle_service_response(self, future, action): try: result = await future if result: _log("INFO", f"{action} 成功") self.statusBar().showMessage(f"{action} 成功", 3000) else: _log("WARN", f"{action} 失敗") self.statusBar().showMessage(f"{action} 失敗", 3000) except asyncio.CancelledError: _log("WARN", f"{action} 已取消") except Exception as e: _log("ERROR", f"{action} 錯誤: {str(e)}") traceback.print_exc() self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000) def handle_arm_selected(self): """解鎖 active group 的所有選中無人機""" group = self._get_active_group() if not group: self.statusBar().showMessage("請先建立任務群組", 3000) return selected = list(group.selected_drone_ids) if not selected: self.statusBar().showMessage("尚未選擇任何無人機", 3000) return _log("INFO", f"批次解鎖請求: {', '.join(selected)}") loop = asyncio.get_event_loop() for drone_id in selected: coro = self.monitor.arm_drone(drone_id, True) # 使用 run_coroutine_threadsafe() 正確地在事件循環中運行 asyncio.run_coroutine_threadsafe( self.handle_service_response(coro, f"批次解鎖 {drone_id}"), loop ) def handle_takeoff_selected(self): """起飛 active group 的所有選中無人機""" group = self._get_active_group() if not group: self.statusBar().showMessage("請先建立任務群組", 3000) return selected = list(group.selected_drone_ids) if not selected: self.statusBar().showMessage("尚未選擇任何無人機", 3000) return loop = asyncio.get_event_loop() for drone_id in selected: future = self.monitor.takeoff_drone(drone_id, 10.0) loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}")) # ================================================================================ # 任務群組管理 # ================================================================================ def _next_group_id(self): """產生下一個群組 ID (A, B, C, ...)""" gid = chr(ord('A') + self._group_counter) self._group_counter += 1 return gid def _toggle_group_panel(self): """🌟 收起/展開任務群組面板""" if self.group_panel_expanded: # 收起 if hasattr(self, 'right_vertical_splitter'): sizes = self.right_vertical_splitter.sizes() if sizes and sizes[0] > 0: self._last_group_panel_height = sizes[0] self.group_tab_widget.hide() header_height = self.group_container.sizeHint().height() self.group_container.setMaximumHeight(header_height) if hasattr(self, 'right_vertical_splitter'): total = sum(self.right_vertical_splitter.sizes()) or self.height() self.right_vertical_splitter.setSizes([header_height, max(1, total - header_height)]) self.toggle_group_btn.setText("▶") self.toggle_group_btn.setToolTip("展開任務群組") self.group_panel_expanded = False else: # 展開 self.group_container.setMaximumHeight(16777215) self.group_tab_widget.show() if hasattr(self, 'right_vertical_splitter'): total = sum(self.right_vertical_splitter.sizes()) or self.height() group_height = getattr(self, '_last_group_panel_height', 170) group_height = min(max(group_height, 120), max(120, total - 120)) self.right_vertical_splitter.setSizes([group_height, max(1, total - group_height)]) self.toggle_group_btn.setText("▼") self.toggle_group_btn.setToolTip("收起任務群組") self.group_panel_expanded = True def _add_mission_group(self): """新增一個任務群組""" gid = self._next_group_id() color = GROUP_COLORS[(self._group_counter - 1) % len(GROUP_COLORS)] group = MissionGroup(gid, color) self.mission_groups[gid] = group panel = GroupPanel(group, default_params=self.global_mission_defaults) panel.assign_drones_requested.connect(self._handle_assign_drones) panel.mission_type_changed.connect(self._handle_mission_type_changed) panel.start_requested.connect(self._handle_group_start) panel.pause_requested.connect(self._handle_group_pause) panel.stop_requested.connect(self._handle_group_stop) panel.mode_change_requested.connect(self._handle_group_mode_change) panel.arm_requested.connect(self._handle_group_arm) panel.takeoff_requested.connect(self._handle_group_takeoff) panel.box_select_requested.connect(self._handle_box_select) panel.select_all_requested.connect(self._handle_select_all_for_group) panel.clear_group_requested.connect(self._handle_clear_group) panel.add_group_requested.connect(self._add_mission_group) panel.delete_group_requested.connect(self._handle_delete_group) self.group_panels[gid] = panel # 用帶顏色的 tab 標題 scroll = QScrollArea() scroll.setWidget(panel) scroll.setWidgetResizable(True) idx = self.group_tab_widget.addTab(scroll, f"Group {gid}") self.group_tab_widget.tabBar().setTabTextColor(idx, QColor(color)) # 切換到新 group 的 tab self.group_tab_widget.setCurrentIndex(idx) self.statusBar().showMessage(f"已新增 Group {gid}", 2000) # 更新刪除按鈕的啟用/禁用狀態 self._update_delete_buttons_state() def _on_group_tab_changed(self, index): """切換 group tab — 只需切換 active group 並刷新 UI""" if index < 0: self.active_group_id = None return tab_text = self.group_tab_widget.tabText(index) gid = tab_text.replace("Group ", "") if gid not in self.mission_groups: return self.active_group_id = gid group = self.mission_groups[gid] # 同步地圖的任務模式 self.drone_map.set_mission_mode(group.mission_type) # 統一刷新所有 UI self.refresh_selection_ui() def _get_active_group(self): """取得當前 active 的 MissionGroup""" if self.active_group_id and self.active_group_id in self.mission_groups: return self.mission_groups[self.active_group_id] return None def refresh_selection_ui(self): """統一刷新所有 UI 元素:左側 drone 勾選、左側 socket 勾選、右側 group panel""" group = self._get_active_group() selected = group.selected_drone_ids if group else set() current_group_id = group.group_id if group else None # 左側 drone checkbox for drone_id, panel in self.drones.items(): checkbox = panel.get_checkbox() if checkbox: blocked = ( current_group_id is not None and self._is_drone_selected_by_other_group(drone_id, current_group_id) ) checkbox.blockSignals(True) checkbox.setStyleSheet(self._get_drone_checkbox_style(blocked)) checkbox.setChecked(drone_id in selected) checkbox.setEnabled(not blocked or drone_id in selected) checkbox.blockSignals(False) # 左側 socket checkbox for socket_id in self.socket_groups.keys(): self.refresh_socket_checkbox(socket_id, selected, current_group_id) # 右側 group panel if group: panel = self.group_panels.get(group.group_id) if panel: panel.update_drone_list() panel.update_status() def _get_drone_checkbox_style(self, blocked=False): """回傳 drone checkbox 樣式;被其他 group 佔用時顯示紅色填滿。""" blocked_style = """ QCheckBox::indicator:disabled { background-color: #D32F2F; border: 2px solid #A32020; } """ if blocked else "" return f""" QCheckBox {{ color: #DDD; }} QCheckBox::indicator {{ width: 16px; height: 16px; border: 2px solid #888888; border-radius: 3px; background: transparent; }} QCheckBox::indicator:checked {{ background-color: #7FFFD4; border: 2px solid #888888; }} {blocked_style} """ def refresh_socket_checkbox(self, socket_id, selected_ids, current_group_id=None): """根據 selected_ids 推導並更新 socket 的勾選狀態""" drone_ids = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id] if not drone_ids: return socket_widget = self.socket_groups.get(socket_id) if not socket_widget: return checkbox = socket_widget.findChild(QCheckBox, f"socket_{socket_id}_checkbox") if not checkbox: return effective_drone_ids = [ did for did in drone_ids if current_group_id is None or did in selected_ids or not self._is_drone_selected_by_other_group(did, current_group_id) ] checked_count = sum(1 for did in effective_drone_ids if did in selected_ids) checkbox.blockSignals(True) if checked_count == 0: checkbox.setCheckState(Qt.CheckState.Unchecked) elif checked_count == len(effective_drone_ids): checkbox.setCheckState(Qt.CheckState.Checked) else: checkbox.setCheckState(Qt.CheckState.PartiallyChecked) checkbox.setEnabled(len(effective_drone_ids) > 0) checkbox.blockSignals(False) # 【已遷移至 refresh_socket_checkbox()】舊的 update_group_checkbox_state() 已廢棄 def _handle_assign_drones(self, group_id): """開啟無人機分配對話框 — 過濾被占用的drone""" group = self.mission_groups.get(group_id) if not group: return all_ids = list(self.drones.keys()) # 預先選中:目前 group 的 selected_drone_ids pre_selected = set(group.selected_drone_ids) dialog = DroneAssignDialog(self, all_ids, pre_selected, {}) if dialog.exec() == QDialog.DialogCode.Accepted: requested = dialog.get_selected() allowed = set() blocked = set() for did in requested: if self._is_drone_selected_by_other_group(did, group_id): blocked.add(did) else: allowed.add(did) group.selected_drone_ids = allowed # 只有當操作目標組是 active 組時,才更新 UI if group_id == self.active_group_id: self.refresh_selection_ui() panel = self.group_panels.get(group_id) if panel: panel.update_drone_list() panel.update_status() if blocked: self.statusBar().showMessage( f"Group {group_id}: {len(blocked)} 台已被其他 group 勾選,未加入", 4000 ) else: self.statusBar().showMessage( f"Group {group_id}: 已分配 {len(group.selected_drone_ids)} 台無人機", 3000 ) def _handle_mission_type_changed(self, group_id, mission_type): """群組任務類型切換""" group = self.mission_groups.get(group_id) if group: group.mission_type = mission_type # 如果是 active group,同步更新地圖的選擇模式 if group_id == self.active_group_id: self.drone_map.set_mission_mode(mission_type) def _create_executor_for_group(self, group): """為群組建立 MissionExecutor""" executor = MissionExecutor( sender=self.command_sender, drone_gps=self.monitor.drone_gps, monitor=self.monitor, arrival_radius=2.0, tick_rate_hz=2.0, ) executor.drone_waypoint_reached.connect(self.on_drone_waypoint_reached) executor.task_status_changed.connect(self._on_task_status_changed) executor.mission_completed.connect( lambda gid=group.group_id: self._on_group_mission_completed(gid)) group.executor = executor def _handle_group_start(self, group_id): """啟動群組任務""" group = self.mission_groups.get(group_id) if not group: return if group.planned_waypoints is None: self.statusBar().showMessage(f"Group {group_id}: 請先規劃任務", 3000) return if group.executor is None: self._create_executor_for_group(group) group.executor.start(group.planned_waypoints) panel = self.group_panels.get(group_id) if panel: panel.update_status() self.statusBar().showMessage(f"Group {group_id}: 任務已啟動", 3000) def _handle_group_pause(self, group_id): """暫停/恢復群組任務""" group = self.mission_groups.get(group_id) if not group or not group.executor: return if group.executor.state == MissionState.RUNNING: group.executor.pause() self.statusBar().showMessage(f"Group {group_id}: 任務已暫停", 3000) elif group.executor.state == MissionState.PAUSED: group.executor.resume() self.statusBar().showMessage(f"Group {group_id}: 任務已恢復", 3000) panel = self.group_panels.get(group_id) if panel: panel.update_status() def _handle_group_stop(self, group_id): """停止群組任務""" group = self.mission_groups.get(group_id) if not group: return if group.executor: group.executor.stop() group.planned_waypoints = None self.drone_map.clear_mission_plan_for_group(group_id) panel = self.group_panels.get(group_id) if panel: panel.update_status() panel.clear_mission_info() self.statusBar().showMessage(f"Group {group_id}: 任務已停止", 3000) def _handle_group_mode_change(self, group_id, mode): """切換群組內所有無人機的飛行模式""" group = self.mission_groups.get(group_id) if not group: _log("ERROR", f"找不到群組: {group_id}") return if not group.selected_drone_ids: _log("WARN", f"Group {group_id} 中沒有無人機") self.statusBar().showMessage(f"群組 {group_id} 中沒有無人機", 3000) return _log("INFO", f"Group {group_id} 準備切換模式為 {mode}") self.statusBar().showMessage("正在切換模式", 1000) # 使用 asyncio 執行(通過事件循環) async def do_mode_changes_async(): for drone_id in group.selected_drone_ids: try: result = await self.monitor.set_mode(drone_id, mode) if result: msg = f"{drone_id} 切換成功" _log("INFO", msg) self.message_queue.put((msg, 2000)) else: msg = f"{drone_id} 切換失敗" _log("WARN", msg) self.message_queue.put((msg, 2000)) except Exception as e: msg = f"{drone_id} 錯誤: {str(e)}" _log("ERROR", msg) traceback.print_exc() self.message_queue.put((msg, 2000)) # 通過事件循環提交異步任務 loop = asyncio.get_event_loop() asyncio.run_coroutine_threadsafe( do_mode_changes_async(), loop ) def _handle_group_arm(self, group_id): """解鎖群組內所有無人機""" group = self.mission_groups.get(group_id) if not group: _log("WARN", f"找不到群組: {group_id}") return loop = asyncio.get_event_loop() _log("INFO", f"Group {group_id} 批次解鎖 {len(group.selected_drone_ids)} 台無人機") for drone_id in group.selected_drone_ids: coro = self.monitor.arm_drone(drone_id, True) action_text = f"解鎖 {drone_id}" # 使用 run_coroutine_threadsafe() 正確地在事件循環中運行 coroutine # 這是 Qt + asyncio 整合的正確方式 asyncio.run_coroutine_threadsafe( self.handle_service_response(coro, action_text), loop ) def _handle_group_takeoff(self, group_id, altitude): """群組內所有無人機起飛""" group = self.mission_groups.get(group_id) if not group: return loop = asyncio.get_event_loop() for drone_id in group.selected_drone_ids: future = self.monitor.takeoff_drone(drone_id, altitude) loop.create_task(self.handle_service_response(future, f"起飛 {drone_id} ({altitude}m)")) def _handle_box_select(self, group_id): """觸發地圖框選 → 框選完成後直接分配到該群組""" self._pending_box_assign = group_id self.drone_map.toggle_drone_box_select() self.statusBar().showMessage( f"請在地圖上框選要分配到 Group {group_id} 的無人機", 5000) def _handle_drone_box_selected(self, drone_ids_json): group_id = self._pending_box_assign self._pending_box_assign = None if not group_id: return group = self.mission_groups.get(group_id) if not group: return drone_ids = json.loads(drone_ids_json) allowed = set() blocked = set() for did in drone_ids: if self._is_drone_selected_by_other_group(did, group_id): blocked.add(did) else: allowed.add(did) group.selected_drone_ids = allowed if group_id == self.active_group_id: self.refresh_selection_ui() panel = self.group_panels.get(group_id) if panel: panel.update_drone_list() panel.update_status() if blocked: self.statusBar().showMessage( f"Group {group_id}: 部分 drone 已被其他 group 勾選,僅加入 {len(allowed)} 台", 4000 ) else: self.statusBar().showMessage( f"Group {group_id}: 框選分配 {len(allowed)} 台無人機", 3000 ) def _handle_select_all_for_group(self, group_id): """全選/取消全選 — Toggle 邏輯""" group = self.mission_groups.get(group_id) if not group: return selectable_ids = self._get_selectable_drones_for_group(group_id) | set(group.selected_drone_ids) # Toggle 邏輯:如果已全選,則清空;否則全選 if group.selected_drone_ids == selectable_ids: # 已全選 → 清空 group.selected_drone_ids.clear() msg_status = "已取消全選" else: # 未全選 → 全選 group.selected_drone_ids = set(selectable_ids) msg_status = f"全選可用無人機 {len(selectable_ids)} 台" # 只有當操作目標組是 active 組時,才更新 UI if group_id == self.active_group_id: self.refresh_selection_ui() panel = self.group_panels.get(group_id) if panel: panel.update_drone_list() panel.update_status() self.statusBar().showMessage( f"Group {group_id}: {msg_status}", 3000) def _handle_clear_group(self, group_id): """清除群組的無人機選擇""" group = self.mission_groups.get(group_id) if not group: return group.selected_drone_ids.clear() group.planned_waypoints = None if group.executor: group.executor.stop() self.drone_map.clear_mission_plan_for_group(group_id) # 只有當操作目標組是 active 組時,才更新 UI if group_id == self.active_group_id: self.refresh_selection_ui() panel = self.group_panels.get(group_id) if panel: panel.update_drone_list() panel.update_status() panel.clear_mission_info() self.statusBar().showMessage( f"Group {group_id}: 已清除分組", 3000) def _handle_delete_group(self, group_id): """刪除一個任務群組""" # 檢查是否只有一個群組,如果是就禁止刪除 if len(self.mission_groups) <= 1: self.statusBar().showMessage("至少需要保留一個群組", 3000) return if group_id not in self.mission_groups: self.statusBar().showMessage(f"Group {group_id} 不存在", 3000) return # 停止群組的執行(如果有) group = self.mission_groups[group_id] if group.executor: group.executor.stop() # 移除地圖上的任務計畫 self.drone_map.clear_mission_plan_for_group(group_id) # 移除 tab for i in range(self.group_tab_widget.count()): if f"Group {group_id}" in self.group_tab_widget.tabText(i): self.group_tab_widget.removeTab(i) break # 移除資料 del self.mission_groups[group_id] if group_id in self.group_panels: del self.group_panels[group_id] # 更新 active group — 讓 currentChanged signal 自動處理 if self.active_group_id == group_id: self.active_group_id = None if self.group_tab_widget.count() > 0: self.group_tab_widget.setCurrentIndex(0) # _on_group_tab_changed 會自動設定新的 active group self.statusBar().showMessage(f"已刪除 Group {group_id}", 3000) # 更新刪除按鈕的啟用/禁用狀態 self._update_delete_buttons_state() def _update_delete_buttons_state(self): """根據群組數量,更新所有群組的刪除按鈕啟用/禁用狀態""" # 如果只有一個群組,禁用該群組的刪除按鈕 # 如果有多個群組,啟用所有刪除按鈕 should_enable = len(self.mission_groups) > 1 for gid, panel in self.group_panels.items(): panel.set_delete_enabled(should_enable) def _on_group_mission_completed(self, group_id): """群組任務完成回呼""" panel = self.group_panels.get(group_id) if panel: panel.update_status() self.statusBar().showMessage(f"Group {group_id}: 所有無人機已完成任務", 5000) # ================================================================================ # UI 更新 # ================================================================================ def update_ui(self, msg_type, drone_id, data): """只做數據快取,不在這裡更新 UI""" if msg_type == 'connection_type': conn_type = data.get('type', 'Unknown') parts = drone_id.split('_') if len(parts) >= 2 and parts[0].startswith('s'): socket_id = parts[0][1:] if socket_id not in self.socket_types: self.socket_types[socket_id] = conn_type if socket_id in self.socket_groups: self.socket_groups[socket_id].set_socket_type(conn_type) return if drone_id not in self.drones: self.add_drone(drone_id) return # 只做資料快取,不更新 UI - 所有 UI 更新都在 _update_panel_and_map 中進行 if drone_id not in self._message_cache: self._message_cache[drone_id] = {} self._message_cache[drone_id][msg_type] = data # ================================================================================ # 勾選管理 # ================================================================================ def handle_group_selection(self, socket_id, state): """Socket 群組勾選 — 更新該 socket 下所有 drone 的選擇狀態""" group = self._get_active_group() if not group: return drone_ids = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id] selectable = [] blocked = [] for did in drone_ids: if self._is_drone_selected_by_other_group(did, group.group_id): blocked.append(did) else: selectable.append(did) if not selectable: if blocked: self.statusBar().showMessage( f"Socket {socket_id} 的 drone 已被其他 group 勾選,無法操作", 4000 ) self.refresh_selection_ui() return all_selectable_selected = all(did in group.selected_drone_ids for did in selectable) if all_selectable_selected: group.selected_drone_ids.difference_update(selectable) else: group.selected_drone_ids.update(selectable) if blocked: self.statusBar().showMessage( f"以下 drone 已被其他 group 勾選,略過: {', '.join(blocked)}", 4000 ) self.refresh_selection_ui() def _is_drone_selected_by_other_group(self, drone_id, current_group_id): """ 檢查該 drone 是否已被其他 group 勾選 Args: drone_id: 無人機 ID current_group_id: 當前 group 的 ID Returns: True 如果 drone 已被其他 group 選擇,否則 False """ for gid, group in self.mission_groups.items(): # 跳過當前 group if gid == current_group_id: continue # 檢查該 drone 是否在其他 group 中被勾選 if drone_id in group.selected_drone_ids: return True return False def _get_selectable_drones_for_group(self, group_id): """ 取得指定 group 可以選擇的 drone 清單(不被其他 group 佔用的) Args: group_id: group 的 ID Returns: set: 可選擇的 drone ID 集合 """ selectable = set() for did in self.drones.keys(): if not self._is_drone_selected_by_other_group(did, group_id): selectable.add(did) return selectable def handle_drone_selection(self, drone_id, state): """單台 drone 勾選 — 只修改 active group 的 selected_drone_ids""" group = self._get_active_group() if not group: return is_checked = (state == Qt.CheckState.Checked.value) if is_checked: if self._is_drone_selected_by_other_group(drone_id, group.group_id): self.statusBar().showMessage( f"{drone_id} 已被其他 group 勾選,無法加入 Group {group.group_id}", 3000 ) self.refresh_selection_ui() return group.selected_drone_ids.add(drone_id) else: group.selected_drone_ids.discard(drone_id) self.refresh_selection_ui() def handle_drone_clicked(self, drone_id): """地圖上點擊 drone — 切換其選擇狀態""" group = self._get_active_group() if not group: return if drone_id in group.selected_drone_ids: group.selected_drone_ids.remove(drone_id) else: # 嘗試勾選前先檢查是否被其他 group 使用 if self._is_drone_selected_by_other_group(drone_id, group.group_id): self.statusBar().showMessage( f"{drone_id} 已被其他 group 勾選,無法加入 Group {group.group_id}", 3000 ) self.refresh_selection_ui() return group.selected_drone_ids.add(drone_id) self.refresh_selection_ui() def handle_toggle_select_all_drones(self): """全選 / 清空 — 切換 active group 的所有 drone""" group = self._get_active_group() if not group: return selectable_ids = self._get_selectable_drones_for_group(group.group_id) | set(group.selected_drone_ids) if group.selected_drone_ids == selectable_ids: # 已全選 → 清空 group.selected_drone_ids.clear() else: # 未全選 → 全選 group.selected_drone_ids = set(selectable_ids) self.refresh_selection_ui() def handle_clear_all_drone_selection(self): """清除 active group 的所有無人機選擇""" group = self._get_active_group() if not group: return group.selected_drone_ids.clear() self.refresh_selection_ui() self.statusBar().showMessage("已清除所有選擇", 2000) # ================================================================================ # 任務規劃 — 點擊地圖(路由到 active group) # ================================================================================ def _get_group_drones(self, group): """取得群組的無人機 ID 列表(排序後)""" return sorted(group.selected_drone_ids, key=lambda x: (x.split('_')[0], int(x.split('_')[1]))) @staticmethod def _run_plan_formation_mission(planner_config, drone_gps_positions, target_gps, mission_type, params): planner = FormationPlanner(**planner_config) return planner.plan_formation_mission( drone_gps_positions, target_gps, mission_type, params=params ) def _submit_mission_plan(self, group, selected_drones, drone_gps_positions, target_gps, mission_type, params, context): self._plan_request_counter += 1 request_id = self._plan_request_counter self._latest_plan_request_by_group[group.group_id] = request_id planner_config = { 'spacing': self.mission_planner.spacing, 'base_altitude': self.mission_planner.base_altitude, 'altitude_diff': self.mission_planner.altitude_diff, } params = dict(params) future = self.planner_pool.submit( self._run_plan_formation_mission, planner_config, list(drone_gps_positions), tuple(target_gps), mission_type, params, ) future.add_done_callback( lambda f: self._emit_plan_result( f, request_id, group.group_id, list(selected_drones), list(drone_gps_positions), tuple(target_gps), context ) ) def _emit_plan_result(self, future, request_id, group_id, selected_drones, drone_gps_positions, target_gps, context): if getattr(self, '_closing', False): return try: waypoints_per_drone, center_origin, rendezvous_indices = future.result() payload = { 'ok': True, 'request_id': request_id, 'group_id': group_id, 'selected_drones': selected_drones, 'drone_gps_positions': drone_gps_positions, 'target_gps': target_gps, 'waypoints_per_drone': waypoints_per_drone, 'center_origin': center_origin, 'rendezvous_indices': rendezvous_indices, 'context': context, } except Exception as e: payload = { 'ok': False, 'request_id': request_id, 'group_id': group_id, 'error': str(e), 'traceback': traceback.format_exc(), 'context': context, } self.planning_finished.emit(payload) def _on_plan_done(self, payload): if getattr(self, '_closing', False): return group_id = payload['group_id'] context = payload.get('context', {}) if self._latest_plan_request_by_group.get(group_id) != payload['request_id']: return group = self.mission_groups.get(group_id) if not group: return if not payload['ok']: self.statusBar().showMessage( f"Group {group_id}: {context.get('failure_label', '規劃')}失敗: {payload['error']}", 5000 ) _log("ERROR", payload.get('traceback', '')) return selected_drones = payload['selected_drones'] drone_gps_positions = payload['drone_gps_positions'] target_gps = payload['target_gps'] waypoints_per_drone = payload['waypoints_per_drone'] center_origin = payload['center_origin'] rendezvous_indices = payload['rendezvous_indices'] expected_mission_type = context.get('expected_mission_type') if set(group.selected_drone_ids) != set(selected_drones): self.statusBar().showMessage( f"Group {group_id}: 無人機分配已變更,忽略舊規劃結果", 3000 ) return if expected_mission_type and group.mission_type != expected_mission_type: self.statusBar().showMessage( f"Group {group_id}: 任務模式已變更,忽略舊規劃結果", 3000 ) return group.planned_waypoints = { 'drone_ids': selected_drones, 'waypoints': waypoints_per_drone, 'rendezvous_indices': rendezvous_indices, } group.center_origin = center_origin self.show_planned_waypoints(group) center_lat, center_lon, _ = center_origin target_lat, target_lon = context['draw_target'] self.drone_map.draw_mission_plan_for_group( group_id, group.color, center_lat, center_lon, target_lat, target_lon ) self._launch_verification( context['verification_type'], drone_gps_positions, selected_drones, waypoints_per_drone, center_origin, target_gps=target_gps if context.get('use_target_gps') else None, rect_corners=context.get('rect_corners'), route_waypoints=context.get('route_waypoints'), ) panel = self.group_panels.get(group_id) if panel: panel.update_status() panel.update_mission_info(center_lat, center_lon, target_lat, target_lon) total_wps = sum(len(wps) for wps in waypoints_per_drone) self.statusBar().showMessage( f"Group {group_id}: {context['success_label']},{len(selected_drones)} 台,共 {total_wps} 個航點", 5000 ) def handle_map_click(self, lat, lon): """處理地圖點擊事件 — 根據 active group 的任務類型規劃""" group = self._get_active_group() if not group: self.statusBar().showMessage("請先建立任務群組", 3000) return mode_map = { 'M_FORMATION': MissionType.M_FORMATION, 'CIRCLE_FORMATION': MissionType.CIRCLE_FORMATION, } mission_type = mode_map.get(group.mission_type) if mission_type is None: return # Grid Sweep / Leader-Follower 由各自的觸發方式處理 selected_drones = self._get_group_drones(group) if len(selected_drones) == 0: self.statusBar().showMessage(f"Group {group.group_id}: 請先分配無人機", 3000) return _log("INFO", f"地圖點擊: {lat:.6f}, {lon:.6f} -> Group {group.group_id} ({group.mission_type})") panel = self.group_panels.get(group.group_id) params = panel.get_mission_params() if panel else {} base_alt = params.get('base_altitude', params.get('altitude', 10.0)) target_gps = (lat, lon, base_alt) self.statusBar().showMessage( f"Group {group.group_id}: 正在規劃 {group.mission_type} ({len(selected_drones)} 台)", 2000) drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt) if drone_gps_positions is None: return context = { 'success_label': f"{group.mission_type} 規劃完成", 'failure_label': '規劃', 'verification_type': group.mission_type, 'draw_target': (lat, lon), 'use_target_gps': True, 'expected_mission_type': group.mission_type, } self._submit_mission_plan( group, selected_drones, drone_gps_positions, target_gps, mission_type, params, context ) # ================================================================================ # 任務規劃 — 矩形選取 (Grid Sweep) # ================================================================================ def handle_rectangle_selected(self, points_json): group = self._get_active_group() if not group: self.statusBar().showMessage("請先建立任務群組", 3000) return if group.mission_type != 'GRID_SWEEP': return # 非 Grid Sweep 模式不處理矩形選取 selected_drones = self._get_group_drones(group) if len(selected_drones) == 0: self.statusBar().showMessage(f"Group {group.group_id}: 請先分配無人機", 3000) return _log("INFO", f"矩形選取 -> Group {group.group_id}: {points_json}") try: rect_corners = [(p[0], p[1]) for p in json.loads(points_json)] except (json.JSONDecodeError, IndexError): self.statusBar().showMessage("矩形座標解析失敗", 3000) return panel = self.group_panels.get(group.group_id) params = panel.get_mission_params() if panel else {} base_alt = params.get('altitude', 10.0) self.statusBar().showMessage( f"Group {group.group_id}: 正在規劃 Grid Sweep ({len(selected_drones)} 台)", 2000) drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt) if drone_gps_positions is None: return target_lat = sum(c[0] for c in rect_corners) / 4 target_lon = sum(c[1] for c in rect_corners) / 4 target_gps = (target_lat, target_lon, base_alt) params['rect_corners'] = rect_corners context = { 'success_label': 'Grid Sweep 規劃完成', 'failure_label': 'Grid Sweep 規劃', 'verification_type': 'grid_sweep', 'draw_target': (target_lat, target_lon), 'rect_corners': rect_corners, 'expected_mission_type': group.mission_type, } self._submit_mission_plan( group, selected_drones, drone_gps_positions, target_gps, MissionType.GRID_SWEEP, params, context ) # ================================================================================ # 任務規劃 — 路徑確認 (Leader-Follower) # ================================================================================ def handle_route_confirmed(self, points_json): group = self._get_active_group() if not group: self.statusBar().showMessage("請先建立任務群組", 3000) return if group.mission_type != 'LEADER_FOLLOWER': return selected_drones = self._get_group_drones(group) if len(selected_drones) == 0: self.statusBar().showMessage(f"Group {group.group_id}: 請先分配無人機", 3000) return # LEADER_FOLLOWER:把指定的領隊放到 rank 0 leader = getattr(group, 'leader_drone_id', None) if leader and leader in selected_drones: selected_drones = [leader] + [d for d in selected_drones if d != leader] _log("INFO", f"路徑確認 -> Group {group.group_id}: {points_json}") try: route_points = json.loads(points_json) route_waypoints = [(p[0], p[1]) for p in route_points] except (json.JSONDecodeError, IndexError): self.statusBar().showMessage("路徑座標解析失敗", 3000) return if len(route_waypoints) < 2: self.statusBar().showMessage("至少需要 2 個路徑點", 3000) return panel = self.group_panels.get(group.group_id) params = panel.get_mission_params() if panel else {} base_alt = params.get('altitude', 10.0) self.statusBar().showMessage( f"Group {group.group_id}: 正在規劃跟隨模式 ({len(selected_drones)} 台)", 2000) drone_gps_positions = self._collect_drone_gps(selected_drones, base_alt) if drone_gps_positions is None: return target_lat = sum(p[0] for p in route_waypoints) / len(route_waypoints) target_lon = sum(p[1] for p in route_waypoints) / len(route_waypoints) target_gps = (target_lat, target_lon, base_alt) params['route_waypoints'] = route_waypoints context = { 'success_label': '跟隨模式規劃完成', 'failure_label': '跟隨模式規劃', 'verification_type': 'LEADER_FOLLOWER', 'draw_target': (target_lat, target_lon), 'use_target_gps': True, 'route_waypoints': route_waypoints, 'expected_mission_type': group.mission_type, } self._submit_mission_plan( group, selected_drones, drone_gps_positions, target_gps, MissionType.LEADER_FOLLOWER, params, context ) # ================================================================================ # 任務執行回呼 # ================================================================================ def on_drone_waypoint_reached(self, drone_id, wp_index, total): if wp_index >= total: self.statusBar().showMessage(f"{drone_id} 已完成所有航點", 3000) else: self.statusBar().showMessage(f"{drone_id} 到達 WP {wp_index}/{total}", 2000) def _on_task_status_changed(self, drone_id, status, message): """MissionExecutor.task_status_changed slot:把 goto 失敗/重試/fallback/barrier 丟到 status bar""" if status == "retrying": self.statusBar().showMessage(f"{drone_id} {message}", 4000) elif status == "fallback_loiter": self.statusBar().showMessage(f"{drone_id} {message}", 8000) elif status == "waiting_at_barrier": self.statusBar().showMessage(f"{drone_id} {message}", 3000) elif status == "normal": self.statusBar().showMessage(f"{drone_id} {message or '已恢復'}", 2000) # ================================================================================ # 輔助方法 # ================================================================================ def _collect_drone_gps(self, selected_drones, base_alt): drone_gps_positions = [] for drone_id in selected_drones: if hasattr(self.monitor, 'drone_gps') and drone_id in self.monitor.drone_gps: gps_data = self.monitor.drone_gps[drone_id] drone_gps_positions.append((gps_data['lat'], gps_data['lon'], gps_data.get('alt', base_alt))) elif drone_id in self.drone_positions: pos = self.drone_positions[drone_id] lat_drone = 24.0 + pos[1] / 111000 lon_drone = 120.0 + pos[0] / (111000 * 0.9) alt_drone = pos[2] if len(pos) > 2 else base_alt drone_gps_positions.append((lat_drone, lon_drone, alt_drone)) else: self.statusBar().showMessage(f"找不到 {drone_id} 的位置資料", 3000) return None return drone_gps_positions def _launch_verification(self, mission_type, drone_gps_positions, selected_drones, waypoints_per_drone, origin, target_gps=None, rect_corners=None, route_waypoints=None): """存 JSON + 啟動 matplotlib 視覺化驗證 (獨立 process)""" if not self.verify_toggle.isChecked(): _log("INFO", "模擬驗證已關閉,跳過驗證視窗") return import os data = { 'mission_type': mission_type, 'drone_ids': selected_drones, 'drones_gps': drone_gps_positions, 'waypoints': waypoints_per_drone, 'origin': list(origin), } if target_gps: data['target'] = list(target_gps) if rect_corners: data['rect_corners'] = rect_corners if route_waypoints: data['route_waypoints'] = route_waypoints json_path = '/tmp/mission_plan.json' with open(json_path, 'w') as f: json.dump(data, f, indent=2) script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'validation', 'verify_waypoints.py') subprocess.Popen([sys.executable, script_path, '--file', json_path]) _log("INFO", f"驗證視窗已啟動: {json_path}") def show_planned_waypoints(self, group=None): pw = group.planned_waypoints if group else None if not pw: return gid = group.group_id if group else "?" print(f"\n{'=' * 60}") print(f"任務規劃結果 — Group {gid}") print(f"{'=' * 60}") drone_ids = pw['drone_ids'] waypoints = pw['waypoints'] print(f"\n共 {len(drone_ids)} 台無人機") for i, drone_id in enumerate(drone_ids): wps = waypoints[i] print(f"\n【{drone_id}】({len(wps)} 個航點)") for j, wp in enumerate(wps): print(f" WP{j}: ({wp[0]:.6f}°, {wp[1]:.6f}°, {wp[2]:.1f}m)") print(f"\n{'=' * 60}") def update_field(self, panel, drone_id, field, text, color=None): if isinstance(panel, DronePanel): panel.update_field(field, text, color) def update_overview_table(self, drone_id=None, field=None, value=None): if not hasattr(self, 'overview_table') or self.overview_table is None: return self.overview_table.set_drones(self.drones) self.overview_table.update_table(drone_id, field, value) def queue_overview_update(self, drone_id, field, value): if not hasattr(self, '_overview_cache'): self._overview_cache = {} self._overview_cache.setdefault(drone_id, {})[field] = value def _flush_overview_table(self): if not hasattr(self, 'overview_table') or self.overview_table is None: return if not getattr(self, '_overview_cache', None): return pending_updates = self._overview_cache self._overview_cache = {} self.overview_table.set_drones(self.drones) self.overview_table.setUpdatesEnabled(False) try: for drone_id, fields in pending_updates.items(): for field, value in fields.items(): self.overview_table.update_table(drone_id, field, value) finally: self.overview_table.setUpdatesEnabled(True) self.overview_table.viewport().update() def get_socket_id(self, drone_id): import re match = re.match(r's(\d+)_(\d+)', drone_id) return match.group(1) if match else 'unknown' def add_drone(self, drone_id): if drone_id in self.drones: return socket_id = self.get_socket_id(drone_id) panel = self.create_drone_panel(drone_id) self.drones[drone_id] = panel if socket_id not in self.socket_groups: self.socket_groups[socket_id] = self.create_socket_group_panel(socket_id) self.socket_groups[socket_id].drones_layout.addWidget(panel) self.reorganize_socket_groups() self.update_overview_table() # 同步新 drone 到 UI self.refresh_selection_ui() def reorganize_socket_groups(self): while self.drone_panel_layout.count(): w = self.drone_panel_layout.takeAt(0).widget() if w: w.setParent(None) for socket_id, group_panel in self.socket_groups.items(): group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id] while group_panel.drones_layout.count(): w = group_panel.drones_layout.takeAt(0).widget() if w: w.setParent(None) def sort_key(x): parts = x[1:].split('_') return (int(parts[0]), int(parts[1])) for did in sorted(group_drones, key=sort_key): group_panel.drones_layout.addWidget(self.drones[did]) for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)): self.drone_panel_layout.addWidget(self.socket_groups[socket_id]) def _update_panel_and_map(self): """定時更新 panel / table,批量更新 UI 以避免過度重繪。""" if not hasattr(self, '_message_cache') or not self._message_cache: return # 頻率監控 if not hasattr(self, '_map_update_time'): self._map_update_time = time.time() self._map_update_count = 0 self._map_update_count += 1 now = time.time() if now - self._map_update_time >= 1.0: self._map_update_time = now self._map_update_count = 0 try: start_time = time.time() # ✅ 步驟 2: 遍歷快取中最新的資料來更新 UI for drone_id in list(self._message_cache.keys()): if drone_id not in self.drones: continue panel = self.drones[drone_id] cached_data = self._message_cache[drone_id] # 處理所有快取的消息類型 for msg_type, data in cached_data.items(): if msg_type == 'state': mode = data.get('mode', 'UNKNOWN') armed = data.get('armed', None) mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55' if armed is True: arm_text, arm_color = "ARMED", '#55FF55' elif armed is False: arm_text, arm_color = "DISARMED", '#FF5555' else: arm_text, arm_color = "--", '#AAAAAA' self.update_field(panel, drone_id, 'mode', mode, mode_color) self.update_field(panel, drone_id, 'armed', arm_text, arm_color) self.queue_overview_update(drone_id, 'mode', mode) self.queue_overview_update(drone_id, 'armed', arm_text) elif msg_type == 'battery': voltage = data.get('voltage', 16) cells = round(voltage / 3.95) percentage = (voltage / cells - 3.7) / 0.5 * 100 if cells > 0 else 0 if percentage < 20: voltage_color = '#FF6464' elif percentage < 50: voltage_color = '#FFA500' else: voltage_color = '#FFFFFF' percentage = data.get('percentage', percentage) self.update_field(panel, drone_id, 'battery_pct', f"{percentage:.0f}%", voltage_color) self.update_field(panel, drone_id, 'battery_vol', f"{voltage:.2f}V") self.update_field(panel, drone_id, 'battery_cells', f"{cells}S") self.queue_overview_update(drone_id, 'battery', f"{voltage:.2f}V") elif msg_type == 'altitude': altitude = data.get('altitude', 0) text = f"{altitude:.1f} m" self.update_field(panel, drone_id, 'altitude', text) self.queue_overview_update(drone_id, 'altitude', text) elif msg_type == 'local_pose': x, y = data.get('x', 0), data.get('y', 0) if not hasattr(self.monitor, 'drone_local'): self.monitor.drone_local = {} self.monitor.drone_local[drone_id] = {'x': x, 'y': y} self.queue_overview_update(drone_id, 'local', f"{x:.1f}, {y:.1f}") elif msg_type == 'loss_rate': text = f"{data.get('loss_rate', 0):.1f}%" self.update_field(panel, drone_id, 'loss_rate', text) self.queue_overview_update(drone_id, 'loss_rate', text) elif msg_type == 'ping': text = f"{data.get('ping', 0):.1f} ms" self.update_field(panel, drone_id, 'ping', text) self.queue_overview_update(drone_id, 'ping', text) elif msg_type == 'velocity': self.queue_overview_update(drone_id, 'velocity', f"{data['vx']:.1f}, {data['vy']:.1f}") elif msg_type == 'attitude': roll, pitch, yaw = data.get('roll', 0), data.get('pitch', 0), data.get('yaw', 0) self.queue_overview_update(drone_id, 'roll', f"{roll:.1f}°") self.queue_overview_update(drone_id, 'pitch', f"{pitch:.1f}°") self.queue_overview_update(drone_id, 'yaw', f"{yaw:.1f}°") panel._last_roll = roll panel._last_pitch = pitch if hasattr(panel, 'update_attitude'): heading = self.drone_headings.get(drone_id, yaw) panel.update_attitude(heading, roll, pitch) elif msg_type == 'gps': gps_data = data lat, lon = gps_data.get('lat', 0), gps_data.get('lon', 0) self.drone_positions[drone_id] = (lat, lon) alt = gps_data.get('alt', 0) if not hasattr(self.monitor, 'drone_gps'): self.monitor.drone_gps = {} self.monitor.drone_gps[drone_id] = {'lat': lat, 'lon': lon, 'alt': alt} self.queue_overview_update(drone_id, 'latitude', f"{lat:.6f}°") self.queue_overview_update(drone_id, 'longitude', f"{lon:.6f}°") elif msg_type == 'hud': hud_data = data heading = hud_data.get('heading', 0) self.drone_headings[drone_id] = heading groundspeed = hud_data.get('groundspeed', 0) airspeed = hud_data.get('airspeed', 0) throttle = hud_data.get('throttle', 0) hud_alt = hud_data.get('alt', 0) climb = hud_data.get('climb', 0) self.queue_overview_update(drone_id, 'heading', f"{heading:.1f}°") self.queue_overview_update(drone_id, 'groundspeed', f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--") self.queue_overview_update(drone_id, 'airspeed', f"{airspeed:.1f} m/s" if isinstance(airspeed, (int, float)) else "--") self.queue_overview_update(drone_id, 'throttle', f"{throttle:.0f}%" if isinstance(throttle, (int, float)) else "--") self.queue_overview_update(drone_id, 'hud_alt', f"{hud_alt:.1f} m" if isinstance(hud_alt, (int, float)) else "--") self.queue_overview_update(drone_id, 'climb', f"{climb:.1f} m/s" if isinstance(climb, (int, float)) else "--") self.update_field(panel, drone_id, 'heading', f"{heading:.1f}°") self.update_field(panel, drone_id, 'groundspeed', f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--") self.update_field(panel, drone_id, 'speed', f"{groundspeed:.1f} m/s" if isinstance(groundspeed, (int, float)) else "--") elapsed = (time.time() - start_time) * 1000 if elapsed > 33: _log("WARN", f"UI 更新耗時 {elapsed:.1f}ms (目標 <33ms)") except Exception as e: _log("ERROR", f"Panel 更新錯誤: {e}") traceback.print_exc() def _update_map_only(self): """獨立降頻更新地圖,避免地圖 JS 呼叫拖住 panel / table。""" for drone_id, pos in list(self.drone_positions.items()): heading = self.drone_headings.get(drone_id, 0) lat, lon = pos self.drone_map.update_drone_position(drone_id, lat, lon, heading) def _process_message_queue(self): """處理來自後台線程的消息隊列(更新 GUI 狀態欄)""" try: while True: try: message, duration = self.message_queue.get_nowait() self.statusBar().showMessage(message, duration) except: break except Exception as e: _log("ERROR", f"訊息佇列處理錯誤: {e}") traceback.print_exc() def _spin_asyncio(self): """驅動 asyncio 事件循環,允許異步任務執行 關鍵修復:asyncio 事件循環不會自動運行。 這個定時器會定期執行事件循環,讓 run_coroutine_threadsafe() 提交的任務能夠執行。 """ try: loop = asyncio.get_event_loop() if loop and not loop.is_closed() and not loop.is_running(): # 執行事件循環直到沒有掛起的任務或超時 # 使用 run_until_complete() 配合一個快速返回的 coroutine loop.run_until_complete(asyncio.sleep(0)) except Exception as e: # 靜默忽略任何錯誤,防止 Qt 定時器出現異常 # 但仍然打印詳細的堆棧跟踪以便調試 traceback.print_exc() def _ros_spin_thread(self): while self.ros_thread_running and rclpy.ok(): try: self.executor.spin_once(timeout_sec=0.01) latest_items = list(self.monitor.latest_data.items()) self.monitor.latest_data.clear() for (drone_id, msg_type), data in latest_items: self.monitor.signals.update_signal.emit(msg_type, drone_id, data) except RuntimeError as e: if "Context" in str(e) or "context" in str(e).lower(): _log("WARN", f"ROS2 context 錯誤(已忽略): {e}") break _log("ERROR", f"ROS spin thread error: {e}") traceback.print_exc() except Exception as e: _log("ERROR", f"ROS spin thread error: {e}") traceback.print_exc() def _stop_ros_thread(self): self.ros_thread_running = False ros_thread = getattr(self, 'ros_thread', None) if ros_thread and ros_thread.is_alive(): ros_thread.join(timeout=1.0) def closeEvent(self, event): self._closing = True self._restore_stream_redirector() self._stop_ros_thread() try: try: self.planner_pool.shutdown(wait=False, cancel_futures=True) except TypeError: self.planner_pool.shutdown(wait=False) for group in self.mission_groups.values(): if group.executor: group.executor.stop() self.command_sender.close() for receiver in self.udp_receivers: receiver.stop() for receiver in self.monitor.ws_receivers: receiver.stop() # Clean up serial receivers for receiver in self.monitor.serial_receivers: receiver.stop() # Clean up all CommandLongClient instances for drone_id, client in self.monitor.command_long_clients.items(): try: client.destroy_node() except: pass # Clean up all PositionTargetGlobalIntClient instances for drone_id, client in getattr(self.monitor, 'position_target_clients', {}).items(): try: client.destroy_node() except: pass self.monitor.destroy_node() self.executor.shutdown() except Exception as e: _log("WARN", f"清理資源時發生錯誤: {e}") traceback.print_exc() # 安全地 shutdown ROS2 try: if rclpy.ok(): rclpy.shutdown() except RuntimeError as e: _log("WARN", f"ROS2 shutdown 錯誤: {e}") traceback.print_exc() event.accept() def main(): """ GUI 應用程式的主入口點 標準 ROS2 + Qt 架構: 1. 在最外層/最前面只做一次 rclpy.init() 2. 啟動 Qt 應用程式 3. 在 finally 中做 rclpy.shutdown() 這樣可以確保所有 ROS2 相關操作都共享同一個初始化狀態 """ # 第一步:在最外層只初始化一次 ROS2(終極防護) # 添加 rclpy.ok() 檢查,防止重複初始化導致 "Context.init() must only be called once" 錯誤 _log("INFO", "GUI 主程式檢查 ROS2 初始化狀態") if not rclpy.ok(): _log("INFO", "ROS2 尚未初始化,開始初始化") rclpy.init() _log("INFO", "ROS2 已完成全域初始化") else: _log("INFO", "ROS2 已初始化,略過重複初始化") try: # 第二步:啟動 Qt 應用程式 _log("INFO", "啟動 Qt 應用程式") app = QApplication(sys.argv) station = ControlStationUI() station.show() _log("INFO", "應用程式已啟動") # 第三步:進入 Qt 事件循環(阻塞直到用戶關閉應用) _log("INFO", "進入主事件循環") sys.exit(app.exec()) finally: # 第四步:只有當 GUI 視窗被關閉時,才做 ROS2 cleanup(終極防護) # 這裡確保 ROS2 被正確且安全地關閉 _log("INFO", "應用程式關閉,開始清理 ROS2 資源") if rclpy.ok(): rclpy.shutdown() _log("INFO", "ROS2 已安全關閉") else: _log("INFO", "ROS2 已關閉或不可用,無需重複 shutdown") _log("INFO", "應用程式已完全退出") if __name__ == '__main__': main()