#!/usr/bin/env python3 import rclpy from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QSplitter, QScrollArea, QSizePolicy, QTabWidget, QTableWidget, QTableWidgetItem, QHeaderView, QPushButton, QCheckBox, QLineEdit) from PyQt6.QtCore import Qt, QTimer import sys import asyncio # 導入分離的類別 from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkReceiver from map_layout import DroneMap from drone_panel import DronePanel, SocketGroupPanel from comm_panel import CommPanel from overview_table import OverviewTable # ================================================================================ # 【新增】導入任務規劃器 # ================================================================================ from mission_planner import FormationPlanner # ================================================================================ class ControlStationUI(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle('GCS') self.resize(1400, 900) # 初始化ROS2 rclpy.init() self.monitor = DroneMonitor() self.monitor.signals.update_signal.connect(self.update_ui) # ROS执行器配置 self.executor = rclpy.executors.SingleThreadedExecutor() self.executor.add_node(self.monitor) # 定时处理ROS事件 self.timer = QTimer() self.timer.timeout.connect(self.spin_ros) self.timer.start(10) # 初始化UI self.drones = {} self.socket_groups = {} self.info_types = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "Local", "Velocity", "地速", "航向", "Roll", "Pitch", "Yaw", "丟包", "延遲"] self.info_type_map = { "mode": 0, "armed": 1, "battery": 2, "longitude": 3, "latitude": 4, "altitude": 5, "local": 6, "velocity": 7, "groundspeed": 8, "heading": 9, "roll": 10, "pitch": 11, "yaw": 12, "loss_rate": 13, "ping": 14 } self.socket_colors = { '0': '#00BFFF', # 天藍色 (DeepSkyBlue) '1': '#FFD700', # 金色 (Gold) '2': '#FF6969', # 淺紅色 (Light Red) '3': '#FF69B4', # 熱粉紅 (HotPink) '4': '#00FA9A', # 中春綠 (MediumSpringGreen) '5': '#9370DB', # 中紫色 (MediumPurple) - 串口 '6': '#FFA500', # 橙色 (Orange) '7': '#20B2AA', # 淺海綠 (LightSeaGreen) '8': '#7CFC00', # 草綠色 (LawnGreen) '9': '#FF8C00', # 深橙色 (DarkOrange) 'default': '#AAAAAA' # 灰色 } self.drone_positions = {} self.drone_headings = {} # 初始化地圖 self.drone_map = DroneMap() # 初始化連接列表 self.udp_receivers = [] self.udp_connections = [] self.ws_connections = [] self.monitor.start_serial_connection('/dev/ttyUSB0', 57600) # ================================================================================ # 【新增】初始化任務規劃器 # ================================================================================ self.mission_planner = FormationPlanner( spacing=5.0, # 5 公尺間距 base_altitude=10.0, # 基準高度 10 公尺 altitude_diff=2.0 # 高低差 2 公尺 ) self.planned_waypoints = None # 儲存規劃結果 # ================================================================================ self.init_ui() def init_ui(self): # 只呼叫一次 main_splitter = QSplitter(Qt.Orientation.Horizontal) # 左側 TabWidget self.left_tab = QTabWidget() # — 分頁 1:Drone Panel self.drone_panel_container = QWidget() self.drone_panel_layout = QVBoxLayout(self.drone_panel_container) self.drone_panel_layout.setAlignment(Qt.AlignmentFlag.AlignTop) self.drone_panel_layout.setSpacing(0) self.drone_panel_layout.setContentsMargins(10, 10, 10, 10) # Wrap drone panel in scroll area scroll = QScrollArea() scroll.setWidget(self.drone_panel_container) scroll.setWidgetResizable(True) self.left_tab.addTab(scroll, "無人載具") # — 分頁 2:Overview Table self.overview_table = OverviewTable(self.info_types, self.info_type_map) self.left_tab.addTab(self.overview_table, "總覽") # — 分頁 3:通訊設定 self.comm_panel = CommPanel() self.comm_panel.udp_connection_added.connect(self.handle_udp_connection_added) self.comm_panel.ws_connection_added.connect(self.handle_ws_connection_added) self.comm_panel.udp_connection_toggled.connect(self.toggle_udp_connection) self.comm_panel.ws_connection_toggled.connect(self.toggle_ws_connection) self.comm_panel.udp_connection_removed.connect(self.remove_udp_connection) self.comm_panel.ws_connection_removed.connect(self.remove_ws_connection) self.comm_panel.status_message.connect(lambda msg, timeout: self.statusBar().showMessage(msg, timeout)) self.left_tab.addTab(self.comm_panel, "通訊") # 右侧容器 right_container = QWidget() right_layout = QVBoxLayout(right_container) right_layout.setContentsMargins(10, 10, 10, 10) right_layout.setSpacing(10) # ========== 批次控制區域(直接使用 layout) ========== batch_control_layout = QHBoxLayout() # 添加批次操作標題 batch_title = QLabel("批次操作") batch_title.setStyleSheet(""" color: #DDD; font-size: 16px; font-weight: bold; padding: 5px; background-color: #333; border-radius: 4px; """) batch_control_layout.addWidget(batch_title) # 上方按鈕區域 upper_buttons = QHBoxLayout() select_all_btn = QPushButton("全選") select_all_btn.clicked.connect(self.handle_select_all) arm_all_btn = QPushButton("解鎖") arm_all_btn.clicked.connect(self.handle_arm_selected) takeoff_all_btn = QPushButton("起飛") takeoff_all_btn.clicked.connect(self.handle_takeoff_selected) for btn in [select_all_btn, arm_all_btn, takeoff_all_btn]: btn.setStyleSheet(""" QPushButton { background-color: #444; color: #DDD; border: none; padding: 8px 12px; border-radius: 4px; min-width: 80px; } QPushButton:hover { background-color: #555; } """) upper_buttons.addWidget(btn) upper_buttons.addStretch() # 模式切換區域 mode_layout = QHBoxLayout() mode_label = QLabel("模式:") mode_label.setStyleSheet("color: #DDD; min-width: 40px;") from PyQt6.QtWidgets import QComboBox self.mode_combo = QComboBox() self.mode_combo.addItems(["AUTO", "GUIDED", "LOITER", "LAND"]) self.mode_combo.setCurrentIndex(1) self.mode_combo.setStyleSheet(""" QComboBox { background-color: #333; color: #DDD; border-radius: 3px; padding: 2px 10px;} """) batch_mode_btn = QPushButton("切換") batch_mode_btn.clicked.connect(self.handle_batch_mode_change) batch_mode_btn.setStyleSheet(""" QPushButton { background-color: #444; color: #DDD; border: none; padding: 8px 12px; border-radius: 4px; min-width: 80px; } QPushButton:hover { background-color: #555; } """) mode_layout.addWidget(mode_label) mode_layout.addWidget(self.mode_combo) mode_layout.addWidget(batch_mode_btn) mode_layout.addStretch() # 座標輸入區域 coord_widget = QWidget() coord_layout = QHBoxLayout(coord_widget) self.x_input = QLineEdit() self.y_input = QLineEdit() self.z_input = QLineEdit() for input_field in [self.x_input, self.y_input, self.z_input]: input_field.setFixedWidth(60) input_field.setStyleSheet(""" QLineEdit { background-color: #333; color: #DDD; border: 1px solid #555; border-radius: 4px; padding: 3px; } """) coord_layout.addWidget(QLabel("X:", styleSheet="color: #DDD;")) coord_layout.addWidget(self.x_input) coord_layout.addWidget(QLabel("Y:", styleSheet="color: #DDD;")) coord_layout.addWidget(self.y_input) coord_layout.addWidget(QLabel("Z:", styleSheet="color: #DDD;")) coord_layout.addWidget(self.z_input) setpoint_btn = QPushButton("Setpoint") setpoint_btn.clicked.connect(self.handle_setpoint_selected) setpoint_btn.setStyleSheet(""" QPushButton { background-color: #444; color: #DDD; border: none; padding: 8px 12px; border-radius: 4px; min-width: 80px; } QPushButton:hover { background-color: #555; } """) lower_control = QHBoxLayout() lower_control.addWidget(coord_widget) lower_control.addWidget(setpoint_btn) lower_control.addStretch() # 組裝批次控制 layout batch_control_layout.addLayout(upper_buttons) batch_control_layout.addLayout(mode_layout) batch_control_layout.addLayout(lower_control) # 將批次控制 layout 添加到右側容器 right_layout.addLayout(batch_control_layout) # 添加地圖 right_layout.addWidget(self.drone_map.get_widget()) self.drone_map.get_gps_signal().connect(self.handle_map_click) # Add widgets to splitter main_splitter.addWidget(self.left_tab) main_splitter.addWidget(right_container) main_splitter.setSizes([400, 1000]) self.setCentralWidget(main_splitter) def handle_udp_connection_added(self, ip, port): """处理 UDP 连接添加信号""" # 创建新连接 new_conn = { 'name': f'UDP {len(self.udp_connections) + 1}', 'ip': ip, 'port': port, 'enabled': True } # 启动接收器 receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name']) receiver.start() self.udp_receivers.append(receiver) new_conn['receiver'] = receiver self.udp_connections.append(new_conn) # 添加到 UI self.comm_panel.add_udp_panel(new_conn) self.statusBar().showMessage(f"已添加 UDP 连接: {ip}:{port}", 3000) print(f"UDP MAVLink receiver added and started on {ip}:{port}") def handle_ws_connection_added(self, url): """处理 WebSocket 连接添加信号""" # 创建新连接 new_conn = { 'name': f'WS {len(self.ws_connections) + 1}', 'url': url, 'enabled': True } # 启动接收器 receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name']) receiver.start() self.monitor.ws_receivers.append(receiver) new_conn['receiver'] = receiver self.ws_connections.append(new_conn) # 添加到 UI self.comm_panel.add_ws_panel(new_conn) self.statusBar().showMessage(f"已添加 WebSocket 连接: {url}", 3000) print(f"WebSocket receiver added and started: {url}") def create_drone_panel(self, drone_id): """創建無人機面板""" panel = DronePanel(drone_id) # 連接信號 panel.mode_change_requested.connect(self.handle_mode_change) panel.arm_requested.connect(self.handle_arm) panel.takeoff_requested.connect(self.handle_takeoff) panel.setpoint_requested.connect(self.handle_single_setpoint) panel.selection_changed.connect(self.handle_drone_selection) return panel def toggle_ws_connection(self, conn, btn, status_label): """切換 WebSocket 連接狀態""" if conn.get('enabled', False): # 停止連接 if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() conn['enabled'] = False btn.setText("啟動") status_label.setStyleSheet("color: #888; font-size: 16px;") status_label.setToolTip("已停止") self.statusBar().showMessage(f"已停止 WebSocket 連接: {conn['url']}", 3000) else: # 啟動連接 receiver = WebSocketMavlinkReceiver(conn['url'], self.monitor.signals, conn['name']) receiver.start() self.monitor.ws_receivers.append(receiver) conn['receiver'] = receiver conn['enabled'] = True btn.setText("停止") status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") status_label.setToolTip("運行中") self.statusBar().showMessage(f"已啟動 WebSocket 連接: {conn['url']}", 3000) def remove_ws_connection(self, conn, panel): """移除 WebSocket 连接""" # 停止接收器 if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() if conn['receiver'] in self.monitor.ws_receivers: self.monitor.ws_receivers.remove(conn['receiver']) # 从列表移除 if conn in self.ws_connections: self.ws_connections.remove(conn) # 从 comm_panel 列表移除 self.comm_panel.remove_ws_connection_from_list(conn) # 从 UI 移除 panel.setParent(None) panel.deleteLater() self.statusBar().showMessage(f"已移除 WebSocket 连接: {conn['url']}", 3000) def toggle_udp_connection(self, conn, btn, status_label): """切換 UDP 連接狀態""" if conn.get('enabled', False): # 停止連接 if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() conn['enabled'] = False btn.setText("啟動") status_label.setStyleSheet("color: #888; font-size: 16px;") status_label.setToolTip("已停止") self.statusBar().showMessage(f"已停止 UDP 連接: {conn['ip']}:{conn['port']}", 3000) else: # 啟動連接 receiver = UDPMavlinkReceiver(conn['ip'], conn['port'], self.monitor.signals, conn['name']) receiver.start() self.udp_receivers.append(receiver) conn['receiver'] = receiver conn['enabled'] = True btn.setText("停止") status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") status_label.setToolTip("運行中") self.statusBar().showMessage(f"已啟動 UDP 連接: {conn['ip']}:{conn['port']}", 3000) def remove_udp_connection(self, conn, panel): """移除 UDP 连接""" # 停止接收器 if 'receiver' in conn and conn['receiver']: conn['receiver'].stop() if conn['receiver'] in self.udp_receivers: self.udp_receivers.remove(conn['receiver']) # 从列表移除 if conn in self.udp_connections: self.udp_connections.remove(conn) # 从 comm_panel 列表移除 self.comm_panel.remove_udp_connection_from_list(conn) # 从 UI 移除 panel.setParent(None) panel.deleteLater() self.statusBar().showMessage(f"已移除 UDP 连接: {conn['ip']}:{conn['port']}", 3000) def create_socket_group_panel(self, socket_id): """創建 socket 分組面板""" color = self.socket_colors.get(socket_id, self.socket_colors['default']) panel = SocketGroupPanel(socket_id, color) panel.group_selection_changed.connect(self.handle_group_selection) return panel def handle_mode_change(self, drone_id): """處理單個無人機的模式切換""" mode = self.mode_combo.currentText() loop = asyncio.get_event_loop() future = self.monitor.set_mode(drone_id, mode) loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}")) def handle_arm(self, drone_id): loop = asyncio.get_event_loop() arm_state = not self.monitor.get_arm_state(drone_id) # Toggle arm state future = self.monitor.arm_drone(drone_id, arm_state) loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}")) def handle_takeoff(self, drone_id): loop = asyncio.get_event_loop() future = self.monitor.takeoff_drone(drone_id, 10.0) # 10 meters default loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}")) def handle_setpoint_selected(self): """處理發送 setpoint 命令""" try: x = float(self.x_input.text() or '0') y = float(self.y_input.text() or '0') z = float(self.z_input.text() or '0') for drone_id in self.monitor.selected_drones: if self.monitor.send_setpoint(drone_id, x, y, z): self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) else: self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) except ValueError: self.statusBar().showMessage("座標格式錯誤", 3000) def handle_single_setpoint(self, drone_id): """處理單個無人機的 setpoint 命令""" try: x = float(self.x_input.text() or '0') y = float(self.y_input.text() or '0') z = float(self.z_input.text() or '0') if self.monitor.send_setpoint(drone_id, x, y, z): self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) else: self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) except ValueError: self.statusBar().showMessage("座標格式錯誤", 3000) async def handle_service_response(self, future, action): try: result = await future if result: self.statusBar().showMessage(f"{action} 成功", 3000) else: self.statusBar().showMessage(f"{action} 失敗", 3000) except Exception as e: self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000) def update_ui(self, msg_type, drone_id, data): # 檢查是否為新無人機,若是則加入無人機面板 if drone_id not in self.drones: self.add_drone(drone_id) return # 確認無人機面板存在 if not (panel := self.drones.get(drone_id)): return # 更新特定欄位並在總覽頁面更新 if msg_type == 'state': mode = data.get('mode', 'UNKNOWN') armed = data.get('armed', None) mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55' if armed is True: arm_text = "ARMED" arm_color = '#55FF55' elif armed is False: arm_text = "DISARMED" arm_color = '#FF5555' else: arm_text = "--" arm_color = '#AAAAAA' # 未知狀態 # 更新無人機面板欄位 self.update_field(panel, drone_id, 'mode', mode, mode_color) self.update_field(panel, drone_id, 'armed', arm_text, arm_color) # 更新總覽頁面欄位 self.update_overview_table(drone_id, 'mode', mode) self.update_overview_table(drone_id, 'armed', arm_text) elif msg_type == 'battery': voltage = data.get('voltage', 16) # 判斷電池節數 cells = round(voltage / 3.95) # 計算電量百分比 percentage = (voltage / cells - 3.7) / 0.5 * 100 # 根據百分比設置顏色 if percentage < 20: voltage_color = '#FF6464' # 紅色 (低電量) elif percentage < 50: voltage_color = '#FFA500' # 橘色 (中低電量) else: voltage_color = '#FFFFFF' # 白色 (正常電量) percentage = data.get('percentage', percentage) # 顯示總電壓、電池節數、單節電壓和百分比 text = f"{percentage:.0f}%" vol = f"{voltage:.2f}V" self.update_field(panel, drone_id, 'battery', text, voltage_color) self.update_overview_table(drone_id, 'battery', vol) elif msg_type == 'gps': lat, lon = data.get('lat', 0), data.get('lon', 0) self.drone_positions[drone_id] = (lat, lon) lon_text = f"{lon:.6f}°" lat_text = f"{lat:.6f}°" self.update_field(panel, drone_id, 'longitude', lon_text) self.update_field(panel, drone_id, 'latitude', lat_text) self.update_overview_table(drone_id, 'longitude', lon_text) self.update_overview_table(drone_id, 'latitude', lat_text) # ================================================================================ # 【新增】同時儲存到 monitor.drone_gps(處理 UDP/WebSocket 的 GPS 資料) # ================================================================================ alt = data.get('alt', 0) # UDP/WebSocket 可能沒有 alt if not hasattr(self.monitor, 'drone_gps'): self.monitor.drone_gps = {} self.monitor.drone_gps[drone_id] = { 'lat': lat, 'lon': lon, 'alt': alt } # ================================================================================ # 更新地圖上的無人機位置 heading = self.drone_headings.get(drone_id, 0) # 如果沒有航向,使用 0 self.drone_map.update_drone_position(drone_id, lat, lon, heading) elif msg_type == 'altitude': altitude = data.get('altitude', 0) text = f"{altitude:.1f} m" self.update_field(panel, drone_id, 'altitude', text) self.update_overview_table(drone_id, 'altitude', text) elif msg_type == 'local_pose': text = f"{data['x']:.1f}, {data['y']:.1f}" self.update_field(panel, drone_id, 'local', text) self.update_overview_table(drone_id, 'local', text) elif msg_type == 'hud': heading = data.get('heading') self.drone_headings[drone_id] = heading groundspeed = data.get('groundspeed') heading_text = f"{heading:.1f}°" if isinstance(groundspeed, (int, float)): groundspeed_text = f"{groundspeed:.1f} m/s" else: groundspeed_text = "--" self.update_field(panel, drone_id, 'heading', heading_text) self.update_field(panel, drone_id, 'groundspeed', groundspeed_text) self.update_overview_table(drone_id, 'heading', heading_text) self.update_overview_table(drone_id, 'groundspeed', groundspeed_text) # 如果有位置資訊,也更新地圖上的航向 if drone_id in self.drone_positions: lat, lon = self.drone_positions[drone_id] self.drone_map.update_drone_position(drone_id, lat, lon, heading) elif msg_type == 'loss_rate': loss_rate = data.get('loss_rate', 0) text = f"{loss_rate:.1f}%" self.update_field(panel, drone_id, 'loss_rate', text) self.update_overview_table(drone_id, 'loss_rate', text) elif msg_type == 'ping': ping = data.get('ping', 0) text = f"{ping:.1f} ms" self.update_field(panel, drone_id, 'ping', text) self.update_overview_table(drone_id, 'ping', text) elif msg_type == 'velocity': text = f"{data['vx']:.1f}, {data['vy']:.1f}" self.update_overview_table(drone_id, 'velocity', text) elif msg_type == 'attitude': roll = data.get('roll', 0) pitch = data.get('pitch', 0) yaw = data.get('yaw', 0) roll_text = f"{roll:.1f}°" pitch_text = f"{pitch:.1f}°" yaw_text = f"{yaw:.1f}°" self.update_overview_table(drone_id, 'roll', roll_text) self.update_overview_table(drone_id, 'pitch', pitch_text) self.update_overview_table(drone_id, 'yaw', yaw_text) # 新增處理分組勾選的方法 def handle_group_selection(self, socket_id, state): """處理 socket 分組勾選狀態變化""" # 獲取該分組下的所有無人機 group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id] # 根據分組勾選狀態更新所有該分組的無人機勾選狀態 is_checked = state == Qt.CheckState.Checked.value for drone_id in group_drones: checkbox = self.drones[drone_id].get_checkbox() if checkbox: # 暫時斷開信號連接,避免遞迴觸發 checkbox.blockSignals(True) checkbox.setChecked(is_checked) checkbox.blockSignals(False) # 手動更新選中集合 if is_checked: self.monitor.selected_drones.add(drone_id) else: self.monitor.selected_drones.discard(drone_id) def handle_drone_selection(self, drone_id, state): """處理個別無人機勾選狀態變化""" if state == Qt.CheckState.Checked.value: self.monitor.selected_drones.add(drone_id) else: self.monitor.selected_drones.discard(drone_id) # 更新對應 socket 分組的勾選狀態 socket_id = self.get_socket_id(drone_id) self.update_group_checkbox_state(socket_id) # 新增更新分組勾選框狀態的方法 def update_group_checkbox_state(self, socket_id): """更新指定 socket 分組的勾選框狀態""" # 獲取該分組下的所有無人機 group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id] if not group_drones: return # 檢查該分組內有多少無人機被勾選 checked_count = sum(1 for did in group_drones if self.drones[did].is_checked()) # 獲取分組勾選框 if socket_id in self.socket_groups: group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox") if group_checkbox: # 暫時斷開信號連接 group_checkbox.blockSignals(True) if checked_count == 0: # 沒有無人機被勾選 group_checkbox.setCheckState(Qt.CheckState.Unchecked) elif checked_count == len(group_drones): # 所有無人機都被勾選 group_checkbox.setCheckState(Qt.CheckState.Checked) else: # 部分無人機被勾選,顯示半勾選狀態 group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked) # 重新連接信號 group_checkbox.blockSignals(False) def handle_select_all(self): """處理全選按鈕 - 支援分組結構""" # 檢查是否所有無人機都已被選中 all_selected = all( self.drones[drone_id].is_checked() for drone_id in self.drones ) # 如果全部已選中,則取消全選;否則全選 new_state = not all_selected # 更新所有勾選框狀態(無人機和分組) for drone_id in self.drones: self.drones[drone_id].set_checked(new_state) # 更新所有分組勾選框狀態 for socket_id in self.socket_groups: group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox") if group_checkbox: group_checkbox.blockSignals(True) group_checkbox.setChecked(new_state) group_checkbox.blockSignals(False) def handle_arm_selected(self): """處理批次解鎖""" loop = asyncio.get_event_loop() for drone_id in self.monitor.selected_drones: future = self.monitor.arm_drone(drone_id, True) loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}")) def handle_takeoff_selected(self): """處理批次起飛""" loop = asyncio.get_event_loop() for drone_id in self.monitor.selected_drones: future = self.monitor.takeoff_drone(drone_id, 10.0) loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}")) def handle_batch_mode_change(self): mode = self.mode_combo.currentText() loop = asyncio.get_event_loop() for drone_id in self.monitor.selected_drones: future = self.monitor.set_mode(drone_id, mode) loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}")) def update_field(self, panel, drone_id, field, text, color=None): """更新面板中的指定欄位""" if isinstance(panel, DronePanel): panel.update_field(field, text, color) def update_overview_table(self, drone_id=None, field=None, value=None): """更新總覽表格""" if not hasattr(self, 'overview_table') or self.overview_table is None: return # 更新無人機引用 self.overview_table.set_drones(self.drones) # 委託給 OverviewTable 處理 self.overview_table.update_table(drone_id, field, value) def get_socket_id(self, drone_id): """從 drone_id 提取 socket_id (例如 's9_1' -> '9')""" import re match = re.match(r's(\d+)_\d+', drone_id) return match.group(1) if match else 'unknown' def add_drone(self, drone_id): if drone_id in self.drones: return # 獲取 socket_id socket_id = self.get_socket_id(drone_id) # 創建無人機面板 panel = self.create_drone_panel(drone_id) self.drones[drone_id] = panel # 如果該 socket 分組不存在,創建它 if socket_id not in self.socket_groups: group_panel = self.create_socket_group_panel(socket_id) self.socket_groups[socket_id] = group_panel # 將無人機面板添加到對應的 socket 分組中 group_panel = self.socket_groups[socket_id] group_panel.drones_layout.addWidget(panel) # 重新排序並顯示所有 socket 分組 self.reorganize_socket_groups() # 更新分組勾選框狀態 self.update_group_checkbox_state(socket_id) # 更新總覽表 self.update_overview_table() def reorganize_socket_groups(self): """重新組織和排序 socket 分組""" # 先清空主佈局 while self.drone_panel_layout.count(): w = self.drone_panel_layout.takeAt(0).widget() if w: w.setParent(None) # 對每個 socket 分組內的無人機進行排序 for socket_id, group_panel in self.socket_groups.items(): # 獲取該分組內的所有無人機 group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id] # 清空分組佈局 while group_panel.drones_layout.count(): w = group_panel.drones_layout.takeAt(0).widget() if w: w.setParent(None) # 對該分組內的無人機按數字排序 def sort_key(x): parts = x[1:].split('_') return (int(parts[0]), int(parts[1])) # 重新添加排序後的無人機面板 for did in sorted(group_drones, key=sort_key): group_panel.drones_layout.addWidget(self.drones[did]) # 按 socket_id 數字順序添加分組到主佈局 for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)): self.drone_panel_layout.addWidget(self.socket_groups[socket_id]) def spin_ros(self): try: self.executor.spin_once(timeout_sec=0.01) for (drone_id, msg_type), data in self.monitor.latest_data.items(): self.monitor.signals.update_signal.emit(msg_type, drone_id, data) self.monitor.latest_data.clear() except Exception as e: print(f"ROS spin error: {e}") def closeEvent(self, event): # 停止 UDP 接收器 for receiver in self.udp_receivers: receiver.stop() # 停止 WebSocket 接收器 for receiver in self.monitor.ws_receivers: receiver.stop() self.monitor.destroy_node() self.executor.shutdown() rclpy.shutdown() event.accept() # ================================================================================ # 【新增】以下方法是任務規劃功能 # ================================================================================ def handle_map_click(self, lat, lon): """ 處理地圖點擊事件,自動規劃隊形任務 Args: lat: 緯度 lon: 經度 """ print(f"地圖點擊位置: {lat:.6f}, {lon:.6f}") selected_drones = self.get_selected_drones() if len(selected_drones) == 0: self.statusBar().showMessage("⚠ 請先選擇無人機(勾選 checkbox)", 3000) return # 設定目標點 base_alt = 10.0 # 基準高度 target_gps = (lat, lon, base_alt) self.statusBar().showMessage(f"⏳ 正在規劃 {len(selected_drones)} 台無人機的任務...", 2000) try: # 取得當前無人機位置(GPS 座標) drone_gps_positions = [] for drone_id in selected_drones: # 檢查是否有 GPS 資料 if hasattr(self.monitor, 'drone_gps') and drone_id in self.monitor.drone_gps: # 使用實際的 GPS 資料 gps_data = self.monitor.drone_gps[drone_id] lat_drone = gps_data['lat'] lon_drone = gps_data['lon'] alt_drone = gps_data.get('alt', base_alt) drone_gps_positions.append((lat_drone, lon_drone, alt_drone)) print(f"使用實際 GPS: {drone_id} -> ({lat_drone:.6f}, {lon_drone:.6f}, {alt_drone:.1f})") elif drone_id in self.drone_positions: # 備用方案:從 Local 座標轉換 pos = self.drone_positions[drone_id] # 使用簡化轉換 lat_drone = 24.0 + pos[1] / 111000 lon_drone = 120.0 + pos[0] / (111000 * 0.9) alt_drone = pos[2] if len(pos) > 2 else base_alt drone_gps_positions.append((lat_drone, lon_drone, alt_drone)) print(f"使用 Local 轉換: {drone_id} -> ({lat_drone:.6f}, {lon_drone:.6f}, {alt_drone:.1f})") else: self.statusBar().showMessage(f"⚠ 找不到 {drone_id} 的位置資料", 3000) return # 規劃任務 # ================================================================================ # 【修改】接收中心點座標 # ================================================================================ stage1_gps, stage2_gps, center_origin = self.mission_planner.plan_formation_mission( drone_gps_positions, target_gps ) # ================================================================================ # 儲存規劃結果 self.planned_waypoints = { 'stage1': stage1_gps, 'stage2': stage2_gps, 'drone_ids': selected_drones } # 顯示規劃結果 self.show_planned_waypoints() # ================================================================================ # 【新增】在地圖上顯示任務規劃(中心點 + 虛線) # ================================================================================ center_lat, center_lon, center_alt = center_origin self.drone_map.draw_mission_plan( center_lat, center_lon, # 中心點座標 lat, lon # 目標點座標 ) # ================================================================================ self.statusBar().showMessage( f"✓ 任務規劃完成!{len(selected_drones)} 台無人機,2 階段飛行", 5000 ) except Exception as e: self.statusBar().showMessage(f"❌ 規劃失敗: {str(e)}", 5000) print(f"Mission planning error: {e}") import traceback traceback.print_exc() def show_planned_waypoints(self): """ 顯示規劃的航點(在終端輸出) """ if not self.planned_waypoints: return print("\n" + "=" * 60) print("任務規劃結果") print("=" * 60) drone_ids = self.planned_waypoints['drone_ids'] stage1 = self.planned_waypoints['stage1'] stage2 = self.planned_waypoints['stage2'] print(f"\n共 {len(drone_ids)} 台無人機") print(f"參數:間距={self.mission_planner.spacing}m, " f"基準高度={self.mission_planner.base_altitude}m, " f"高低差={self.mission_planner.altitude_diff}m") for i, drone_id in enumerate(drone_ids): print(f"\n【{drone_id}】") lat1, lon1, alt1 = stage1[i] lat2, lon2, alt2 = stage2[i] print(f" 階段 1(集合點):") print(f" 緯度: {lat1:.6f}°") print(f" 經度: {lon1:.6f}°") print(f" 高度: {alt1:.1f} m") print(f" 階段 2(目標點):") print(f" 緯度: {lat2:.6f}°") print(f" 經度: {lon2:.6f}°") print(f" 高度: {alt2:.1f} m") print("\n" + "=" * 60) def get_selected_drones(self): """ 取得被選中的無人機 ID 列表 Returns: list: 被選中的無人機 ID 列表 """ selected = [] for drone_id, panel in self.drones.items(): if hasattr(panel, 'checkbox') and panel.checkbox.isChecked(): selected.append(drone_id) return selected # ================================================================================ # 【新增結束】 # ================================================================================ if __name__ == '__main__': app = QApplication(sys.argv) station = ControlStationUI() station.show() app.exec()