diff --git a/src/GUI/comm_panel.py b/src/GUI/comm_panel.py new file mode 100644 index 0000000..82b539e --- /dev/null +++ b/src/GUI/comm_panel.py @@ -0,0 +1,398 @@ +#!/usr/bin/env python3 +from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QLineEdit) +from PyQt6.QtCore import pyqtSignal + +class CommPanel(QWidget): + """通讯设置面板类""" + + # 定义信号 + udp_connection_added = pyqtSignal(str, int) # ip, port + udp_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label + udp_connection_removed = pyqtSignal(dict, QWidget) # conn, panel + ws_connection_added = pyqtSignal(str) # url + ws_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label + ws_connection_removed = pyqtSignal(dict, QWidget) # conn, panel + status_message = pyqtSignal(str, int) # message, timeout + + def __init__(self, parent=None): + super().__init__(parent) + self.udp_connections = [] + self.ws_connections = [] + self._init_ui() + + def _init_ui(self): + """初始化UI""" + layout = QVBoxLayout(self) + layout.setContentsMargins(10, 10, 10, 10) + layout.setSpacing(10) + + # ========== UDP MAVLink 區域 ========== + udp_title = QLabel("UDP") + udp_title.setStyleSheet(""" + color: #DDD; + font-size: 14px; + font-weight: bold; + padding: 5px; + background-color: #333; + border-radius: 4px; + """) + layout.addWidget(udp_title) + + # UDP 連接列表容器 + self.udp_list_widget = QWidget() + self.udp_list_layout = QVBoxLayout(self.udp_list_widget) + self.udp_list_layout.setContentsMargins(0, 0, 0, 0) + self.udp_list_layout.setSpacing(5) + layout.addWidget(self.udp_list_widget) + + # UDP 添加新連接區域 + add_udp_widget = QWidget() + add_udp_layout = QHBoxLayout(add_udp_widget) + add_udp_layout.setContentsMargins(0, 0, 0, 0) + + self.udp_ip_input = QLineEdit() + self.udp_ip_input.setText("127.0.0.1") + self.udp_ip_input.setPlaceholderText("IP") + self.udp_ip_input.setStyleSheet(""" + QLineEdit { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 5px; + } + """) + + self.udp_port_input = QLineEdit() + self.udp_port_input.setText("14550") + self.udp_port_input.setPlaceholderText("Port") + self.udp_port_input.setFixedWidth(80) + self.udp_port_input.setStyleSheet(""" + QLineEdit { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 5px; + } + """) + + add_udp_btn = QPushButton("添加") + add_udp_btn.clicked.connect(self._handle_add_udp) + add_udp_btn.setStyleSheet(""" + QPushButton { + background-color: #4CAF50; + color: white; + border: none; + padding: 6px 12px; + border-radius: 4px; + min-width: 30px; + } + QPushButton:hover { background-color: #45a049; } + """) + + add_udp_layout.addWidget(QLabel("IP:", styleSheet="color: #DDD;")) + add_udp_layout.addWidget(self.udp_ip_input) + add_udp_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;")) + add_udp_layout.addWidget(self.udp_port_input) + add_udp_layout.addWidget(add_udp_btn) + + layout.addWidget(add_udp_widget) + + # 分隔線 + separator = QWidget() + separator.setFixedHeight(20) + layout.addWidget(separator) + + # ========== WebSocket 區域 ========== + ws_title = QLabel("WebSocket") + ws_title.setStyleSheet(""" + color: #DDD; + font-size: 14px; + font-weight: bold; + padding: 5px; + background-color: #333; + border-radius: 4px; + """) + layout.addWidget(ws_title) + + # WebSocket 連接列表容器 + self.ws_list_widget = QWidget() + self.ws_list_layout = QVBoxLayout(self.ws_list_widget) + self.ws_list_layout.setContentsMargins(0, 0, 0, 0) + self.ws_list_layout.setSpacing(5) + layout.addWidget(self.ws_list_widget) + + # WebSocket 添加新連接區域 + add_ws_widget = QWidget() + add_ws_layout = QHBoxLayout(add_ws_widget) + add_ws_layout.setContentsMargins(0, 0, 0, 0) + + self.ws_url_input = QLineEdit() + self.ws_url_input.setPlaceholderText("host") + self.ws_url_input.setStyleSheet(""" + QLineEdit { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 5px; + } + """) + + add_ws_btn = QPushButton("添加") + add_ws_btn.clicked.connect(self._handle_add_ws) + add_ws_btn.setStyleSheet(""" + QPushButton { + background-color: #4CAF50; + color: white; + border: none; + padding: 6px 12px; + border-radius: 4px; + min-width: 30px; + } + QPushButton:hover { background-color: #45a049; } + """) + + add_ws_layout.addWidget(QLabel("URL:", styleSheet="color: #DDD;")) + add_ws_layout.addWidget(self.ws_url_input) + add_ws_layout.addWidget(add_ws_btn) + + layout.addWidget(add_ws_widget) + layout.addStretch() + + def _handle_add_udp(self): + """處理添加 UDP 連接""" + ip = self.udp_ip_input.text().strip() + port_text = self.udp_port_input.text().strip() + + if not ip or not port_text: + self.status_message.emit("請輸入 IP 和 Port", 3000) + return + + try: + port = int(port_text) + if port < 1 or port > 65535: + raise ValueError("Port 超出範圍") + except ValueError: + self.status_message.emit("Port 必須是 1-65535 的數字", 3000) + return + + # 檢查是否已存在相同連接 + for conn in self.udp_connections: + if conn['ip'] == ip and conn['port'] == port: + self.status_message.emit("連接已存在", 3000) + return + + # 發送信號通知主窗口 + self.udp_connection_added.emit(ip, port) + + # 清空輸入框 + self.udp_ip_input.clear() + self.udp_port_input.clear() + + def _handle_add_ws(self): + """處理添加 WebSocket 連接""" + input_url = self.ws_url_input.text().strip() + + if not input_url: + self.status_message.emit("請輸入 WebSocket URL", 3000) + return + + # 自動添加 ws:// 前綴 + if not input_url.startswith('ws://') and not input_url.startswith('wss://'): + url = f'ws://{input_url}' + else: + url = input_url + + # 基本 URL 格式驗證 + try: + if '://' in url: + parts = url.split('://', 1) + if len(parts) == 2 and ':' not in parts[1]: + self.status_message.emit("URL 格式錯誤,需要包含端口號 (例如: 127.0.0.1:8756)", 3000) + return + except: + self.status_message.emit("URL 格式錯誤", 3000) + return + + # 檢查是否已存在相同連接 + for conn in self.ws_connections: + if conn['url'] == url: + self.status_message.emit("連接已存在", 3000) + return + + # 發送信號通知主窗口 + self.ws_connection_added.emit(url) + + # 清空輸入框 + self.ws_url_input.clear() + + def create_udp_connection_panel(self, conn): + """創建 UDP 連接面板""" + panel = QWidget() + panel.setStyleSheet(""" + QWidget { + background-color: #2A2A2A; + border-radius: 6px; + padding: 8px; + border: 1px solid #444; + } + """) + + layout = QHBoxLayout(panel) + layout.setContentsMargins(8, 8, 8, 8) + + # 連接資訊 + info_label = QLabel(f"{conn['name']} - {conn['ip']}:{conn['port']}") + info_label.setStyleSheet("color: #DDD; font-size: 12px;") + + # 狀態指示器 + status_label = QLabel("●") + if conn.get('enabled', False): + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + else: + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + + # 控制按鈕 + toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動") + toggle_btn.setFixedWidth(60) + toggle_btn.clicked.connect(lambda: self.udp_connection_toggled.emit(conn, toggle_btn, status_label)) + toggle_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 4px 8px; + border-radius: 3px; + font-size: 11px; + } + QPushButton:hover { background-color: #555; } + """) + + remove_btn = QPushButton("移除") + remove_btn.setFixedWidth(60) + remove_btn.clicked.connect(lambda: self.udp_connection_removed.emit(conn, panel)) + remove_btn.setStyleSheet(""" + QPushButton { + background-color: #d32f2f; + color: white; + border: none; + padding: 4px 8px; + border-radius: 3px; + font-size: 11px; + } + QPushButton:hover { background-color: #b71c1c; } + """) + + layout.addWidget(status_label) + layout.addWidget(info_label) + layout.addStretch() + layout.addWidget(toggle_btn) + layout.addWidget(remove_btn) + + # 儲存引用 + panel.connection = conn + panel.toggle_btn = toggle_btn + panel.status_label = status_label + + return panel + + def create_ws_connection_panel(self, conn): + """創建 WebSocket 連接面板""" + panel = QWidget() + panel.setStyleSheet(""" + QWidget { + background-color: #2A2A2A; + border-radius: 6px; + padding: 8px; + border: 1px solid #444; + } + """) + + layout = QHBoxLayout(panel) + layout.setContentsMargins(8, 8, 8, 8) + + # 連接資訊 + info_label = QLabel(f"{conn['name']} - {conn['url']}") + info_label.setStyleSheet("color: #DDD; font-size: 12px;") + + # 狀態指示器 + status_label = QLabel("●") + if conn.get('enabled', False): + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + else: + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + + # 控制按鈕 + toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動") + toggle_btn.setFixedWidth(60) + toggle_btn.clicked.connect(lambda: self.ws_connection_toggled.emit(conn, toggle_btn, status_label)) + toggle_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 4px 8px; + border-radius: 3px; + font-size: 11px; + } + QPushButton:hover { background-color: #555; } + """) + + remove_btn = QPushButton("移除") + remove_btn.setFixedWidth(60) + remove_btn.clicked.connect(lambda: self.ws_connection_removed.emit(conn, panel)) + remove_btn.setStyleSheet(""" + QPushButton { + background-color: #d32f2f; + color: white; + border: none; + padding: 4px 8px; + border-radius: 3px; + font-size: 11px; + } + QPushButton:hover { background-color: #b71c1c; } + """) + + layout.addWidget(status_label) + layout.addWidget(info_label) + layout.addStretch() + layout.addWidget(toggle_btn) + layout.addWidget(remove_btn) + + # 儲存引用 + panel.connection = conn + panel.toggle_btn = toggle_btn + panel.status_label = status_label + + return panel + + def add_udp_panel(self, conn): + """添加 UDP 連接面板到列表""" + panel = self.create_udp_connection_panel(conn) + self.udp_list_layout.addWidget(panel) + self.udp_connections.append(conn) + return panel + + def add_ws_panel(self, conn): + """添加 WebSocket 連接面板到列表""" + panel = self.create_ws_connection_panel(conn) + self.ws_list_layout.addWidget(panel) + self.ws_connections.append(conn) + return panel + + def remove_udp_connection_from_list(self, conn): + """從列表中移除 UDP 連接""" + if conn in self.udp_connections: + self.udp_connections.remove(conn) + + def remove_ws_connection_from_list(self, conn): + """從列表中移除 WebSocket 連接""" + if conn in self.ws_connections: + self.ws_connections.remove(conn) \ No newline at end of file diff --git a/src/GUI/communication.py b/src/GUI/communication.py new file mode 100644 index 0000000..45c960c --- /dev/null +++ b/src/GUI/communication.py @@ -0,0 +1,662 @@ +from rclpy.node import Node +from PyQt6.QtCore import QObject, pyqtSignal +import math +import re +import threading +from threading import Lock +import asyncio +import websockets +import json +import socket +from pymavlink import mavutil +from geometry_msgs.msg import Point, Vector3 +from sensor_msgs.msg import BatteryState, NavSatFix, Imu +from std_msgs.msg import Float64 +from mavros_msgs.msg import State, VfrHud +from mavros_msgs.srv import CommandBool, CommandTOL + +class DroneSignals(QObject): + update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data) + +class UDPMavlinkReceiver(threading.Thread): + """UDP MAVLink 接收器""" + def __init__(self, ip, port, signals, connection_name): + super().__init__(daemon=True) + self.ip = ip + self.port = port + self.signals = signals + self.connection_name = connection_name + self.running = False + self.sock = None + + def run(self): + """執行 UDP 接收循環""" + self.running = True + try: + print(f"UDP MAVLink receiver started on {self.ip}:{self.port}") + + # 創建 MAVLink 連接 + mav = mavutil.mavlink_connection(f'udpin:{self.ip}:{self.port}') + while self.running: + try: + msg = mav.recv_match(blocking=True, timeout=1.0) + if msg is None: + continue + + self.process_mavlink_message(msg) + + except socket.timeout: + continue + except Exception as e: + print(f"Error receiving MAVLink message: {e}") + + except Exception as e: + print(f"UDP receiver error: {e}") + finally: + if self.sock: + self.sock.close() + + def process_mavlink_message(self, msg): + """處理 MAVLink 訊息""" + try: + msg_type = msg.get_type() + system_id = msg.get_srcSystem() + drone_id = f"s8_{system_id}" # 使用 s8_ 前綴表示 UDP 來源 + + if msg_type == "HEARTBEAT": + mode = mavutil.mode_string_v10(msg) + armed = bool(msg.base_mode & 128) + self.signals.update_signal.emit('state', drone_id, { + 'mode': mode, + 'armed': armed + }) + + elif msg_type == "BATTERY_STATUS": + voltage = msg.voltages[0] / 1000 + self.signals.update_signal.emit('battery', drone_id, { + 'voltage': voltage + }) + + elif msg_type == "GLOBAL_POSITION_INT": + latitude = msg.lat / 1e7 + longitude = msg.lon / 1e7 + self.signals.update_signal.emit('gps', drone_id, { + 'lat': latitude, + 'lon': longitude + }) + + elif msg_type == "GPS_RAW_INT": + fix_type = msg.fix_type + + elif msg_type == "LOCAL_POSITION_NED": + x = msg.y + y = msg.x + z = -msg.z + self.signals.update_signal.emit('local_pose', drone_id, { + 'x': x, + 'y': y, + 'z': z + }) + self.signals.update_signal.emit('altitude', drone_id, { + 'altitude': z + }) + self.signals.update_signal.emit('velocity', drone_id, { + 'vx': msg.vx, + 'vy': msg.vy, + 'vz': msg.vz + }) + + elif msg_type == "ATTITUDE": + pitch = math.degrees(msg.pitch) + self.signals.update_signal.emit('attitude', drone_id, { + 'pitch': pitch, + 'roll': 0, + 'yaw': 0, + 'rates': (0, 0, 0) + }) + + elif msg_type == "VFR_HUD": + groundspeed = msg.groundspeed + heading = msg.heading + self.signals.update_signal.emit('hud', drone_id, { + 'heading': heading, + 'groundspeed': groundspeed + }) + + except Exception as e: + print(f"Error processing MAVLink message: {e}") + + def stop(self): + """停止接收器""" + self.running = False + +class SerialMavlinkReceiver(threading.Thread): + """串口 MAVLink 接收器""" + def __init__(self, port, baudrate, signals, connection_name): + super().__init__(daemon=True) + self.port = port + self.baudrate = baudrate + self.signals = signals + self.connection_name = connection_name + self.running = False + self.mav = None + + def run(self): + """執行串口接收循環""" + self.running = True + try: + print(f"Serial MAVLink receiver started on {self.port} at {self.baudrate} baud") + + # 創建 MAVLink 串口連接 + self.mav = mavutil.mavlink_connection( + self.port, + baud=self.baudrate, + source_system=255 + ) + + print(f"Waiting for heartbeat from {self.port}...") + self.mav.wait_heartbeat() + print(f"Heartbeat received from system {self.mav.target_system}, component {self.mav.target_component}") + + while self.running: + try: + msg = self.mav.recv_match(blocking=True, timeout=1.0) + if msg is None: + continue + + self.process_mavlink_message(msg) + + except Exception as e: + if self.running: + print(f"Error receiving MAVLink message from serial: {e}") + + except Exception as e: + print(f"Serial receiver error: {e}") + finally: + if self.mav: + try: + self.mav.close() + except: + pass + + def process_mavlink_message(self, msg): + """處理 MAVLink 訊息""" + try: + msg_type = msg.get_type() + system_id = msg.get_srcSystem() + drone_id = f"s5_{system_id}" # 使用 serial_ 前綴表示串口來源 + + if msg_type == "HEARTBEAT": + mode = mavutil.mode_string_v10(msg) + armed = bool(msg.base_mode & 128) + self.signals.update_signal.emit('state', drone_id, { + 'mode': mode, + 'armed': armed + }) + + elif msg_type == "BATTERY_STATUS": + voltage = msg.voltages[0] / 1000 + self.signals.update_signal.emit('battery', drone_id, { + 'voltage': voltage + }) + + elif msg_type == "GLOBAL_POSITION_INT": + latitude = msg.lat / 1e7 + longitude = msg.lon / 1e7 + self.signals.update_signal.emit('gps', drone_id, { + 'lat': latitude, + 'lon': longitude + }) + + elif msg_type == "GPS_RAW_INT": + fix_type = msg.fix_type + + elif msg_type == "LOCAL_POSITION_NED": + x = msg.y + y = msg.x + z = -msg.z + self.signals.update_signal.emit('local_pose', drone_id, { + 'x': x, + 'y': y, + 'z': z + }) + self.signals.update_signal.emit('altitude', drone_id, { + 'altitude': z + }) + self.signals.update_signal.emit('velocity', drone_id, { + 'vx': msg.vx, + 'vy': msg.vy, + 'vz': msg.vz + }) + + elif msg_type == "ATTITUDE": + pitch = math.degrees(msg.pitch) + self.signals.update_signal.emit('attitude', drone_id, { + 'pitch': pitch, + 'roll': 0, + 'yaw': 0, + 'rates': (0, 0, 0) + }) + + elif msg_type == "VFR_HUD": + groundspeed = msg.groundspeed + heading = msg.heading + self.signals.update_signal.emit('hud', drone_id, { + 'heading': heading, + 'groundspeed': groundspeed + }) + + except Exception as e: + print(f"Error processing MAVLink message from serial: {e}") + + def stop(self): + """停止接收器""" + self.running = False + +class WebSocketMavlinkReceiver(threading.Thread): + """WebSocket MAVLink 接收器""" + def __init__(self, url, signals, connection_name): + super().__init__(daemon=True) + self.url = url + self.signals = signals + self.connection_name = connection_name + self.running = False + self.max_retries = 5 + self.base_delay = 1.0 + + def run(self): + """執行 WebSocket 接收循環""" + self.running = True + asyncio.set_event_loop(asyncio.new_event_loop()) + asyncio.get_event_loop().run_until_complete(self.ws_client_loop()) + + async def ws_client_loop(self): + """WebSocket 連接的主循環""" + retry_count = 0 + + print(f"Starting WebSocket client for {self.connection_name} at {self.url}") + + while self.running and retry_count < self.max_retries: + try: + async with websockets.connect(self.url) as websocket: + print(f"WebSocket {self.connection_name} connected to {self.url}") + retry_count = 0 # 重置重試計數 + + async for message in websocket: + if not self.running: + break + + try: + data = json.loads(message) + data['_connection_source'] = self.connection_name + self.process_websocket_message(data) + except json.JSONDecodeError as e: + print(f"WebSocket {self.connection_name} JSON decode error: {e}") + except Exception as e: + print(f"WebSocket {self.connection_name} message processing error: {e}") + + except websockets.exceptions.ConnectionClosedError: + print(f"WebSocket {self.connection_name} connection closed") + if self.running: + retry_count += 1 + if retry_count < self.max_retries: + delay = self.base_delay * (2 ** min(retry_count, 4)) + print(f"Reconnecting in {delay}s...") + await asyncio.sleep(delay) + else: + break + except Exception as e: + retry_count += 1 + if retry_count < self.max_retries and self.running: + delay = self.base_delay * (2 ** min(retry_count, 4)) + print(f"WebSocket {self.connection_name} connection error: {e}, retrying in {delay}s (attempt {retry_count}/{self.max_retries})") + await asyncio.sleep(delay) + else: + break + + print(f"WebSocket client {self.connection_name} stopped") + + def process_websocket_message(self, data): + """處理 WebSocket 訊息""" + try: + drone_id = data.get('system_id') + drone_id = f"s9_{drone_id}" if drone_id else None + if not drone_id: + return + + # 模式 + if 'mode' in data: + self.signals.update_signal.emit('state', drone_id, { + 'mode': data['mode'], + }) + + # 電池 + if 'battery' in data: + self.signals.update_signal.emit('battery', drone_id, { + 'percentage': data['battery'] + }) + + # 位置 + if 'position' in data: + pos = data['position'] + self.signals.update_signal.emit('gps', drone_id, { + 'lat': pos.get('lat', 0), + 'lon': pos.get('lon', 0) + }) + + # Local position - 設定 x, y 為 0.0 + self.signals.update_signal.emit('local_pose', drone_id, { + 'x': 0.0, + 'y': 0.0, + 'z': 0.0 + }) + + # Altitude - 設定為 0.0 + self.signals.update_signal.emit('altitude', drone_id, { + 'altitude': 0.0 + }) + + # 航向 + if 'heading' in data: + self.signals.update_signal.emit('hud', drone_id, { + 'heading': data['heading'], + 'groundspeed': 0.0 + }) + + except Exception as e: + print(f"WebSocket message processing error: {e}") + + def stop(self): + """停止接收器""" + self.running = False + +class DroneMonitor(Node): + def __init__(self): + super().__init__('drone_monitor') + self.signals = DroneSignals() + self.drone_topics = {} + self.lock = Lock() + + self.arm_clients = {} + self.takeoff_clients = {} + self.setpoint_pubs = {} + self.selected_drones = set() + self.latest_data = {} + + # 定義需要過濾的模式 + self.filtered_modes = ['Mode(0x000000c0)'] + + # WebSocket 接收器列表 + self.ws_receivers = [] + + # 串口接收器列表 + + # ================================================================================ + # 【新增】儲存 GPS 資料的字典 + # ================================================================================ + self.drone_gps = {} # {drone_id: {'lat': ..., 'lon': ..., 'alt': ...}} + # ================================================================================ + self.serial_receivers = [] + + # 主题检测定时器 + self.create_timer(1.0, self.scan_topics) + + def scan_topics(self): + topics = self.get_topic_names_and_types() + drone_pattern = re.compile(r'/MavLinkBus/(s\d+_\d+)/(\w+)') + + found_drones = set() + for topic_name, _ in topics: + if match := drone_pattern.match(topic_name): + drone_id, topic_type = match.groups() + found_drones.add(drone_id) + with self.lock: + self.drone_topics.setdefault(drone_id, set()).add(topic_type) + + for drone_id in found_drones: + if not hasattr(self, f'drone_{drone_id}_subs'): + self.setup_drone(drone_id) + + def setup_drone(self, drone_id): + base_topic = f'/MavLinkBus/{drone_id}' + + # Add service clients + self.arm_clients[drone_id] = self.create_client( + CommandBool, + f'{base_topic}/cmd/arming' + ) + self.takeoff_clients[drone_id] = self.create_client( + CommandTOL, + f'{base_topic}/cmd/takeoff' + ) + + # Add setpoint publisher + self.setpoint_pubs[drone_id] = self.create_publisher( + Point, + f'{base_topic}/setpoint_position/local', + 10 + ) + + subs = { + 'attitude': self.create_subscription( + Imu, + f'{base_topic}/attitude', + lambda msg, did=drone_id: self.attitude_callback(did, msg), + 10 + ), + 'battery': self.create_subscription( + BatteryState, + f'{base_topic}/battery', + lambda msg, did=drone_id: self.battery_callback(did, msg), + 10 + ), + 'global': self.create_subscription( + NavSatFix, + f'{base_topic}/global_position/global', + lambda msg, did=drone_id: self.gps_callback(did, msg), + 10 + ), + 'rel_alt': self.create_subscription( + Float64, + f'{base_topic}/global_position/rel_alt', + lambda msg, did=drone_id: self.altitude_callback(did, msg), + 10 + ), + 'local_pose': self.create_subscription( + Point, + f'{base_topic}/local_position/pose', + lambda msg, did=drone_id: self.local_pose_callback(did, msg), + 10 + ), + 'local_vel': self.create_subscription( + Vector3, + f'{base_topic}/local_position/velocity', + lambda msg, did=drone_id: self.local_vel_callback(did, msg), + 10 + ), + 'loss_rate': self.create_subscription( + Float64, + f'{base_topic}/packet_loss_rate', + lambda msg, did=drone_id: self.loss_rate_callback(did, msg), + 10 + ), + 'state': self.create_subscription( + State, + f'{base_topic}/state', + lambda msg, did=drone_id: self.state_callback(did, msg), + 10 + ), + 'ping': self.create_subscription( + Float64, + f'{base_topic}/ping', + lambda msg, did=drone_id: self.ping_callback(did, msg), + 10 + ), + 'vfr_hud': self.create_subscription( + VfrHud, + f'{base_topic}/vfr_hud', + lambda msg, did=drone_id: self.hud_callback(did, msg), + 10 + ) + } + + setattr(self, f'drone_{drone_id}_subs', subs) + + async def arm_drone(self, drone_id, arm): + if drone_id not in self.arm_clients: + return False + + client = self.arm_clients[drone_id] + if not client.wait_for_service(timeout_sec=1.0): + return False + + request = CommandBool.Request() + request.value = arm + + future = client.call_async(request) + try: + response = await future + return response.success + except Exception as e: + self.get_logger().error(f'Arm service call failed: {e}') + return False + + async def takeoff_drone(self, drone_id, altitude=10.0): + if drone_id not in self.takeoff_clients: + return False + + client = self.takeoff_clients[drone_id] + if not client.wait_for_service(timeout_sec=1.0): + return False + + request = CommandTOL.Request() + request.altitude = altitude + request.min_pitch = 0.0 + request.yaw = 0.0 + + future = client.call_async(request) + try: + response = await future + return response.success + except Exception as e: + self.get_logger().error(f'Takeoff service call failed: {e}') + return False + + def send_setpoint(self, drone_id, x, y, z): + """Send setpoint position command""" + if drone_id not in self.setpoint_pubs: + return False + + msg = Point() + msg.x = float(x) + msg.y = float(y) + msg.z = float(z) + + self.setpoint_pubs[drone_id].publish(msg) + return True + + def quaternion_to_euler(self, q): + sinr_cosp = 2 * (q.w * q.x + q.y * q.z) + cosr_cosp = 1 - 2 * (q.x**2 + q.y**2) + roll = math.atan2(sinr_cosp, cosr_cosp) + + sinp = 2 * (q.w * q.y - q.z * q.x) + pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp) + + siny_cosp = 2 * (q.w * q.z + q.x * q.y) + cosy_cosp = 1 - 2 * (q.y**2 + q.z**2) + yaw = math.atan2(siny_cosp, cosy_cosp) + + return math.degrees(roll), math.degrees(pitch), math.degrees(yaw) + + # callbacks + def attitude_callback(self, drone_id, msg): + if hasattr(msg, 'orientation'): + roll, pitch, yaw = self.quaternion_to_euler(msg.orientation) + self.latest_data[(drone_id, 'attitude')] = { + 'roll': roll, + 'pitch': pitch, + 'yaw': yaw, + 'rates': (msg.angular_velocity.x, + msg.angular_velocity.y, + msg.angular_velocity.z) + } + + def battery_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'battery')] = { + 'voltage': msg.voltage + } + + def state_callback(self, drone_id, msg): + mode = msg.mode + if mode in self.filtered_modes: + return + self.latest_data[(drone_id, 'state')] = { + 'mode': msg.mode, + 'armed': msg.armed + } + + def gps_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'gps')] = { + 'lat': msg.latitude, + 'lon': msg.longitude, + 'alt': msg.altitude + } + + # ================================================================================ + # 【新增】儲存 GPS 資料到 drone_gps 字典 + # ================================================================================ + self.drone_gps[drone_id] = { + 'lat': msg.latitude, + 'lon': msg.longitude, + 'alt': msg.altitude + } + # ================================================================================ + + def local_vel_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'velocity')] = { + 'vx': msg.x, + 'vy': msg.y, + 'vz': msg.z + } + + def altitude_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'altitude')] = { + 'altitude': msg.data + } + + def local_pose_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'local_pose')] = { + 'x': msg.x, + 'y': msg.y, + 'z': msg.z + } + + def hud_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'hud')] = { + 'airspeed': msg.airspeed, + 'groundspeed': msg.groundspeed, + 'heading': msg.heading, + 'throttle': msg.throttle, + 'alt': msg.altitude, + 'climb': msg.climb + } + + def loss_rate_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'loss_rate')] = { + 'loss_rate': msg.data + } + + def ping_callback(self, drone_id, msg): + self.latest_data[(drone_id, 'ping')] = { + 'ping': msg.data + } + + def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=57600): + """啟動串口 MAVLink 連接""" + connection_name = f"Serial_{port.replace('/', '_')}" + receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name) + receiver.start() + self.serial_receivers.append(receiver) + print(f"Started serial connection on {port} at {baudrate} baud") + return receiver \ No newline at end of file diff --git a/src/GUI/drone_panel.py b/src/GUI/drone_panel.py new file mode 100644 index 0000000..a5a7bf5 --- /dev/null +++ b/src/GUI/drone_panel.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python3 +from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QSizePolicy, QCheckBox) +from PyQt6.QtCore import pyqtSignal + +class DronePanel(QWidget): + """單個無人機面板類別""" + + # 定義信號 + mode_change_requested = pyqtSignal(str) # drone_id + arm_requested = pyqtSignal(str) # drone_id + takeoff_requested = pyqtSignal(str) # drone_id + setpoint_requested = pyqtSignal(str) # drone_id + selection_changed = pyqtSignal(str, int) # drone_id, state + + def __init__(self, drone_id, parent=None): + super().__init__(parent) + self.drone_id = drone_id + self.display_id = 's' + drone_id.split('_')[1] + self._init_ui() + + def _init_ui(self): + """初始化UI""" + self.setObjectName(f"panel_{self.drone_id}") + self.setFixedHeight(140) + self.setStyleSheet(""" + background-color: #2A2A2A; + border-radius: 8px; + """) + + # 主佈局 + main_layout = QHBoxLayout(self) + main_layout.setContentsMargins(8, 8, 8, 8) + main_layout.setSpacing(0) + + # 創建內容容器(包含 info 和 control) + content_widget = QWidget() + content_widget.setStyleSheet("background-color: #333; border-radius: 6px;") + content_layout = QHBoxLayout(content_widget) + content_layout.setContentsMargins(8, 8, 8, 8) + content_layout.setSpacing(8) + + # 左側資訊區域 + info_widget = self._create_info_section() + + # 右側控制按鈕區域 + control_widget = self._create_control_section() + + # 將 info 和 control 加入內容容器 + content_layout.addWidget(info_widget) + content_layout.addWidget(control_widget) + + # 將內容容器加入主佈局 + main_layout.addWidget(content_widget) + + def _create_info_section(self): + """創建資訊顯示區域""" + info_widget = QWidget() + info_layout = QVBoxLayout(info_widget) + info_layout.setContentsMargins(0, 0, 0, 0) + info_layout.setSpacing(4) + + # 頂部標題欄 + header = QWidget() + header_layout = QHBoxLayout(header) + header_layout.setContentsMargins(0, 0, 0, 0) + + # 勾選框 + self.checkbox = QCheckBox() + self.checkbox.setObjectName(f"{self.drone_id}_checkbox") + self.checkbox.setStyleSheet("QCheckBox { color: #DDD; }") + self.checkbox.stateChanged.connect( + lambda state: self.selection_changed.emit(self.drone_id, state) + ) + + # ID 顯示 + id_label = QLabel(self.display_id) + id_label.setStyleSheet(""" + font-weight: bold; + font-size: 14px; + color: #7FFFD4; + min-width: 80px; + """) + + header_layout.addWidget(self.checkbox) + header_layout.addWidget(id_label) + header_layout.addStretch() + + info_layout.addWidget(header) + + # 第一行:狀態 (模式 + ARM狀態) + status_row = self._create_status_row() + info_layout.addWidget(status_row) + + # 第二行:電池 + battery_row = self._create_battery_row() + info_layout.addWidget(battery_row) + + # 第三行:位置 + 高度 + position_row = self._create_position_row() + info_layout.addWidget(position_row) + + # 第四行:航向 + 速度 + nav_row = self._create_nav_row() + info_layout.addWidget(nav_row) + + return info_widget + + def _create_status_row(self): + """創建狀態行""" + status_row = QWidget() + status_layout = QHBoxLayout(status_row) + status_layout.setContentsMargins(0, 0, 0, 0) + + status_title = QLabel("狀態:") + status_title.setStyleSheet("color: #888; min-width: 50px;") + + self.mode_label = QLabel("--") + self.mode_label.setObjectName(f"{self.drone_id}_mode") + self.mode_label.setStyleSheet("color: #DDD;") + + self.armed_label = QLabel("--") + self.armed_label.setObjectName(f"{self.drone_id}_armed") + self.armed_label.setStyleSheet("color: #DDD;") + + status_layout.addWidget(status_title) + status_layout.addWidget(self.mode_label) + status_layout.addWidget(self.armed_label) + status_layout.addStretch() + + return status_row + + def _create_battery_row(self): + """創建電池行""" + battery_row = QWidget() + battery_layout = QHBoxLayout(battery_row) + battery_layout.setContentsMargins(0, 0, 0, 0) + + battery_title = QLabel("電池:") + battery_title.setStyleSheet("color: #888; min-width: 50px;") + + self.battery_label = QLabel("--") + self.battery_label.setObjectName(f"{self.drone_id}_battery") + self.battery_label.setStyleSheet("color: #DDD;") + + battery_layout.addWidget(battery_title) + battery_layout.addWidget(self.battery_label) + battery_layout.addStretch() + + return battery_row + + def _create_position_row(self): + """創建位置行""" + position_row = QWidget() + position_layout = QHBoxLayout(position_row) + position_layout.setContentsMargins(0, 0, 0, 0) + + position_title = QLabel("位置:") + position_title.setStyleSheet("color: #888; min-width: 50px;") + + self.local_label = QLabel("--") + self.local_label.setObjectName(f"{self.drone_id}_local") + self.local_label.setStyleSheet("color: #DDD;") + + altitude_title = QLabel("高度:") + altitude_title.setStyleSheet("color: #888; margin-left: 10px;") + + self.altitude_label = QLabel("--") + self.altitude_label.setObjectName(f"{self.drone_id}_altitude") + self.altitude_label.setStyleSheet("color: #DDD;") + + position_layout.addWidget(position_title) + position_layout.addWidget(self.local_label) + position_layout.addWidget(altitude_title) + position_layout.addWidget(self.altitude_label) + position_layout.addStretch() + + return position_row + + def _create_nav_row(self): + """創建導航行""" + nav_row = QWidget() + nav_layout = QHBoxLayout(nav_row) + nav_layout.setContentsMargins(0, 0, 0, 0) + + heading_title = QLabel("航向:") + heading_title.setStyleSheet("color: #888; min-width: 50px;") + + self.heading_label = QLabel("--") + self.heading_label.setObjectName(f"{self.drone_id}_heading") + self.heading_label.setStyleSheet("color: #DDD;") + + speed_title = QLabel("速度:") + speed_title.setStyleSheet("color: #888; margin-left: 10px;") + + self.groundspeed_label = QLabel("--") + self.groundspeed_label.setObjectName(f"{self.drone_id}_groundspeed") + self.groundspeed_label.setStyleSheet("color: #DDD;") + + nav_layout.addWidget(heading_title) + nav_layout.addWidget(self.heading_label) + nav_layout.addWidget(speed_title) + nav_layout.addWidget(self.groundspeed_label) + nav_layout.addStretch() + + return nav_row + + def _create_control_section(self): + """創建控制按鈕區域""" + control_widget = QWidget() + control_layout = QVBoxLayout(control_widget) + control_layout.setContentsMargins(0, 0, 0, 0) + control_layout.setSpacing(6) + + control_widget.setFixedWidth(80) + + btn_style = """ + QPushButton { + background-color: #444; + color: #DDD; + border: none; + border-radius: 4px; + font-size: 11px; + } + QPushButton:hover { + background-color: #555; + } + """ + # 模式切換按鈕 + mode_btn = QPushButton("切換模式") + mode_btn.setStyleSheet(btn_style) + mode_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding) + mode_btn.clicked.connect(lambda: self.mode_change_requested.emit(self.drone_id)) + + # 解鎖按鈕 + arm_btn = QPushButton("解鎖") + arm_btn.setStyleSheet(btn_style) + arm_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding) + arm_btn.clicked.connect(lambda: self.arm_requested.emit(self.drone_id)) + + # 起飛按鈕 + takeoff_btn = QPushButton("起飛") + takeoff_btn.setStyleSheet(btn_style) + takeoff_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding) + takeoff_btn.clicked.connect(lambda: self.takeoff_requested.emit(self.drone_id)) + + # Setpoint 按鈕 + setpoint_btn = QPushButton("Setpoint") + setpoint_btn.setStyleSheet(btn_style) + setpoint_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding) + setpoint_btn.clicked.connect(lambda: self.setpoint_requested.emit(self.drone_id)) + + control_layout.addWidget(mode_btn) + control_layout.addWidget(arm_btn) + control_layout.addWidget(takeoff_btn) + control_layout.addWidget(setpoint_btn) + + return control_widget + + def update_field(self, field, text, color=None): + """更新指定欄位的值""" + label = self.findChild(QLabel, f"{self.drone_id}_{field}") + if label and label.text() != text: + label.setText(text) + if color: + label.setStyleSheet(f"color: {color};") + + def get_checkbox(self): + """獲取勾選框""" + return self.checkbox + + def set_checked(self, checked): + """設置勾選狀態""" + self.checkbox.setChecked(checked) + + def is_checked(self): + """獲取勾選狀態""" + return self.checkbox.isChecked() + +class SocketGroupPanel(QWidget): + # 定義信號 + group_selection_changed = pyqtSignal(str, int) # socket_id, state + + def __init__(self, socket_id, color='#AAAAAA', parent=None): + super().__init__(parent) + self.socket_id = socket_id + self.color = color + self._init_ui() + + def _init_ui(self): + """初始化UI""" + self.setObjectName(f"socket_group_{self.socket_id}") + self.setStyleSheet(""" + background-color: #1E1E1E; + border-radius: 12px; + """) + + layout = QVBoxLayout(self) + layout.setContentsMargins(10, 10, 10, 10) + layout.setSpacing(6) + + # Socket 分組標題行 - 包含勾選框 + title_row = QWidget() + title_layout = QHBoxLayout(title_row) + title_layout.setContentsMargins(0, 0, 0, 0) + + # 分組勾選框 + self.group_checkbox = QCheckBox() + self.group_checkbox.setObjectName(f"socket_{self.socket_id}_checkbox") + self.group_checkbox.setStyleSheet(f""" + QCheckBox {{ color: #DDD; }} + QCheckBox::indicator {{ + width: 14px; + height: 14px; + border: 2px solid #888888; + border-radius: 3px; + background: transparent; + }} + QCheckBox::indicator:checked {{ + background-color: {self.color}; + border: 2px solid #888888; + }} + QCheckBox::indicator:indeterminate {{ + background-color: #666; + border: 2px solid #888888; + }} + """) + self.group_checkbox.stateChanged.connect( + lambda state: self.group_selection_changed.emit(self.socket_id, state) + ) + + # Socket 分組標題 + title_label = QLabel(f"Socket {self.socket_id}") + title_label.setStyleSheet(f""" + font-weight: bold; + font-size: 16px; + color: {self.color}; + margin-bottom: 8px; + padding: 4px 8px; + border-radius: 6px; + """) + + title_layout.addWidget(self.group_checkbox) + title_layout.addWidget(title_label) + title_layout.addStretch() + + layout.addWidget(title_row) + + # 創建子容器用於放置該 socket 下的所有無人機面板 + self.drones_container = QWidget() + self.drones_layout = QVBoxLayout(self.drones_container) + self.drones_layout.setContentsMargins(0, 0, 0, 0) + self.drones_layout.setSpacing(4) + + layout.addWidget(self.drones_container) + + def add_drone_panel(self, panel): + """添加無人機面板到分組""" + self.drones_layout.addWidget(panel) + + def clear_drones(self): + """清空所有無人機面板""" + while self.drones_layout.count(): + item = self.drones_layout.takeAt(0) + if item.widget(): + item.widget().setParent(None) + + def get_checkbox(self): + """獲取分組勾選框""" + return self.group_checkbox + + def set_checked(self, checked): + """設置分組勾選狀態""" + self.group_checkbox.setChecked(checked) + + def set_check_state(self, state): + """設置分組勾選狀態(支持半選)""" + self.group_checkbox.setCheckState(state) \ No newline at end of file diff --git a/src/GUI/gui.py b/src/GUI/gui.py new file mode 100644 index 0000000..cefc9c9 --- /dev/null +++ b/src/GUI/gui.py @@ -0,0 +1,1009 @@ +#!/usr/bin/env python3 +import rclpy +from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, + QWidget, QLabel, QSplitter, QScrollArea, + QSizePolicy, QTabWidget, QTableWidget, QTableWidgetItem, + QHeaderView, QPushButton, QCheckBox, QLineEdit) +from PyQt6.QtCore import Qt, QTimer +import sys +import asyncio + +# 導入分離的類別 +from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkReceiver +from map_layout import DroneMap +from drone_panel import DronePanel, SocketGroupPanel +from comm_panel import CommPanel +from overview_table import OverviewTable + +# ================================================================================ +# 【新增】導入任務規劃器 +# ================================================================================ +from mission_planner import FormationPlanner +# ================================================================================ + +class ControlStationUI(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle('GCS') + self.resize(1400, 900) + + # 初始化ROS2 + rclpy.init() + self.monitor = DroneMonitor() + self.monitor.signals.update_signal.connect(self.update_ui) + + # ROS执行器配置 + self.executor = rclpy.executors.SingleThreadedExecutor() + self.executor.add_node(self.monitor) + + # 定时处理ROS事件 + self.timer = QTimer() + self.timer.timeout.connect(self.spin_ros) + self.timer.start(10) + + # 初始化UI + self.drones = {} + self.socket_groups = {} + self.info_types = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "Local", "Velocity", "地速", "航向", + "Roll", "Pitch", "Yaw", "丟包", "延遲"] + self.info_type_map = { + "mode": 0, + "armed": 1, + "battery": 2, + "longitude": 3, + "latitude": 4, + "altitude": 5, + "local": 6, + "velocity": 7, + "groundspeed": 8, + "heading": 9, + "roll": 10, + "pitch": 11, + "yaw": 12, + "loss_rate": 13, + "ping": 14 + } + + self.socket_colors = { + '0': '#00BFFF', # 天藍色 (DeepSkyBlue) + '1': '#FFD700', # 金色 (Gold) + '2': '#FF6969', # 淺紅色 (Light Red) + '3': '#FF69B4', # 熱粉紅 (HotPink) + '4': '#00FA9A', # 中春綠 (MediumSpringGreen) + '5': '#9370DB', # 中紫色 (MediumPurple) - 串口 + '6': '#FFA500', # 橙色 (Orange) + '7': '#20B2AA', # 淺海綠 (LightSeaGreen) + '8': '#7CFC00', # 草綠色 (LawnGreen) + '9': '#FF8C00', # 深橙色 (DarkOrange) + 'default': '#AAAAAA' # 灰色 + } + + self.drone_positions = {} + self.drone_headings = {} + # 初始化地圖 + self.drone_map = DroneMap() + # 初始化連接列表 + self.udp_receivers = [] + self.udp_connections = [] + self.ws_connections = [] + self.monitor.start_serial_connection('/dev/ttyUSB0', 57600) + + # ================================================================================ + # 【新增】初始化任務規劃器 + # ================================================================================ + self.mission_planner = FormationPlanner( + spacing=5.0, # 5 公尺間距 + base_altitude=10.0, # 基準高度 10 公尺 + altitude_diff=2.0 # 高低差 2 公尺 + ) + self.planned_waypoints = None # 儲存規劃結果 + # ================================================================================ + + self.init_ui() + + def init_ui(self): + # 只呼叫一次 + main_splitter = QSplitter(Qt.Orientation.Horizontal) + + # 左側 TabWidget + self.left_tab = QTabWidget() + + # — 分頁 1:Drone Panel + self.drone_panel_container = QWidget() + self.drone_panel_layout = QVBoxLayout(self.drone_panel_container) + self.drone_panel_layout.setAlignment(Qt.AlignmentFlag.AlignTop) + self.drone_panel_layout.setSpacing(0) + self.drone_panel_layout.setContentsMargins(10, 10, 10, 10) + + # Wrap drone panel in scroll area + scroll = QScrollArea() + scroll.setWidget(self.drone_panel_container) + scroll.setWidgetResizable(True) + self.left_tab.addTab(scroll, "無人載具") + + # — 分頁 2:Overview Table + self.overview_table = OverviewTable(self.info_types, self.info_type_map) + self.left_tab.addTab(self.overview_table, "總覽") + + # — 分頁 3:通訊設定 + self.comm_panel = CommPanel() + self.comm_panel.udp_connection_added.connect(self.handle_udp_connection_added) + self.comm_panel.ws_connection_added.connect(self.handle_ws_connection_added) + self.comm_panel.udp_connection_toggled.connect(self.toggle_udp_connection) + self.comm_panel.ws_connection_toggled.connect(self.toggle_ws_connection) + self.comm_panel.udp_connection_removed.connect(self.remove_udp_connection) + self.comm_panel.ws_connection_removed.connect(self.remove_ws_connection) + self.comm_panel.status_message.connect(lambda msg, timeout: self.statusBar().showMessage(msg, timeout)) + + self.left_tab.addTab(self.comm_panel, "通訊") + + # 右侧容器 + right_container = QWidget() + right_layout = QVBoxLayout(right_container) + right_layout.setContentsMargins(10, 10, 10, 10) + right_layout.setSpacing(10) + + # ========== 批次控制區域(直接使用 layout) ========== + batch_control_layout = QHBoxLayout() + + # 添加批次操作標題 + batch_title = QLabel("批次操作") + batch_title.setStyleSheet(""" + color: #DDD; + font-size: 16px; + font-weight: bold; + padding: 5px; + background-color: #333; + border-radius: 4px; + """) + batch_control_layout.addWidget(batch_title) + + # 上方按鈕區域 + upper_buttons = QHBoxLayout() + select_all_btn = QPushButton("全選") + select_all_btn.clicked.connect(self.handle_select_all) + arm_all_btn = QPushButton("解鎖") + arm_all_btn.clicked.connect(self.handle_arm_selected) + takeoff_all_btn = QPushButton("起飛") + takeoff_all_btn.clicked.connect(self.handle_takeoff_selected) + for btn in [select_all_btn, arm_all_btn, takeoff_all_btn]: + btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 8px 12px; + border-radius: 4px; + min-width: 80px; + } + QPushButton:hover { background-color: #555; } + """) + upper_buttons.addWidget(btn) + upper_buttons.addStretch() + + # 模式切換區域 + mode_layout = QHBoxLayout() + mode_label = QLabel("模式:") + mode_label.setStyleSheet("color: #DDD; min-width: 40px;") + + from PyQt6.QtWidgets import QComboBox + self.mode_combo = QComboBox() + self.mode_combo.addItems(["AUTO", "GUIDED", "LOITER", "LAND"]) + self.mode_combo.setCurrentIndex(1) + self.mode_combo.setStyleSheet(""" + QComboBox { background-color: #333; color: #DDD; border-radius: 3px; padding: 2px 10px;} + """) + + batch_mode_btn = QPushButton("切換") + batch_mode_btn.clicked.connect(self.handle_batch_mode_change) + batch_mode_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 8px 12px; + border-radius: 4px; + min-width: 80px; + } + QPushButton:hover { background-color: #555; } + """) + mode_layout.addWidget(mode_label) + mode_layout.addWidget(self.mode_combo) + mode_layout.addWidget(batch_mode_btn) + mode_layout.addStretch() + + # 座標輸入區域 + coord_widget = QWidget() + coord_layout = QHBoxLayout(coord_widget) + + self.x_input = QLineEdit() + self.y_input = QLineEdit() + self.z_input = QLineEdit() + + for input_field in [self.x_input, self.y_input, self.z_input]: + input_field.setFixedWidth(60) + input_field.setStyleSheet(""" + QLineEdit { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 3px; + } + """) + + coord_layout.addWidget(QLabel("X:", styleSheet="color: #DDD;")) + coord_layout.addWidget(self.x_input) + coord_layout.addWidget(QLabel("Y:", styleSheet="color: #DDD;")) + coord_layout.addWidget(self.y_input) + coord_layout.addWidget(QLabel("Z:", styleSheet="color: #DDD;")) + coord_layout.addWidget(self.z_input) + + setpoint_btn = QPushButton("Setpoint") + setpoint_btn.clicked.connect(self.handle_setpoint_selected) + setpoint_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 8px 12px; + border-radius: 4px; + min-width: 80px; + } + QPushButton:hover { background-color: #555; } + """) + + lower_control = QHBoxLayout() + lower_control.addWidget(coord_widget) + lower_control.addWidget(setpoint_btn) + lower_control.addStretch() + + # 組裝批次控制 layout + batch_control_layout.addLayout(upper_buttons) + batch_control_layout.addLayout(mode_layout) + batch_control_layout.addLayout(lower_control) + + # 將批次控制 layout 添加到右側容器 + right_layout.addLayout(batch_control_layout) + + # 添加地圖 + right_layout.addWidget(self.drone_map.get_widget()) + self.drone_map.get_gps_signal().connect(self.handle_map_click) + + + # Add widgets to splitter + main_splitter.addWidget(self.left_tab) + main_splitter.addWidget(right_container) + main_splitter.setSizes([400, 1000]) + + self.setCentralWidget(main_splitter) + + + def handle_udp_connection_added(self, ip, port): + """处理 UDP 连接添加信号""" + # 创建新连接 + new_conn = { + 'name': f'UDP {len(self.udp_connections) + 1}', + 'ip': ip, + 'port': port, + 'enabled': True + } + + # 启动接收器 + receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name']) + receiver.start() + self.udp_receivers.append(receiver) + new_conn['receiver'] = receiver + + self.udp_connections.append(new_conn) + + # 添加到 UI + self.comm_panel.add_udp_panel(new_conn) + + self.statusBar().showMessage(f"已添加 UDP 连接: {ip}:{port}", 3000) + print(f"UDP MAVLink receiver added and started on {ip}:{port}") + + def handle_ws_connection_added(self, url): + """处理 WebSocket 连接添加信号""" + # 创建新连接 + new_conn = { + 'name': f'WS {len(self.ws_connections) + 1}', + 'url': url, + 'enabled': True + } + + # 启动接收器 + receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name']) + receiver.start() + self.monitor.ws_receivers.append(receiver) + new_conn['receiver'] = receiver + + self.ws_connections.append(new_conn) + + # 添加到 UI + self.comm_panel.add_ws_panel(new_conn) + + self.statusBar().showMessage(f"已添加 WebSocket 连接: {url}", 3000) + print(f"WebSocket receiver added and started: {url}") + + def create_drone_panel(self, drone_id): + """創建無人機面板""" + panel = DronePanel(drone_id) + + # 連接信號 + panel.mode_change_requested.connect(self.handle_mode_change) + panel.arm_requested.connect(self.handle_arm) + panel.takeoff_requested.connect(self.handle_takeoff) + panel.setpoint_requested.connect(self.handle_single_setpoint) + panel.selection_changed.connect(self.handle_drone_selection) + + return panel + + def toggle_ws_connection(self, conn, btn, status_label): + """切換 WebSocket 連接狀態""" + if conn.get('enabled', False): + # 停止連接 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + conn['enabled'] = False + btn.setText("啟動") + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + self.statusBar().showMessage(f"已停止 WebSocket 連接: {conn['url']}", 3000) + else: + # 啟動連接 + receiver = WebSocketMavlinkReceiver(conn['url'], self.monitor.signals, conn['name']) + receiver.start() + self.monitor.ws_receivers.append(receiver) + conn['receiver'] = receiver + conn['enabled'] = True + btn.setText("停止") + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + self.statusBar().showMessage(f"已啟動 WebSocket 連接: {conn['url']}", 3000) + + def remove_ws_connection(self, conn, panel): + """移除 WebSocket 连接""" + # 停止接收器 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + if conn['receiver'] in self.monitor.ws_receivers: + self.monitor.ws_receivers.remove(conn['receiver']) + + # 从列表移除 + if conn in self.ws_connections: + self.ws_connections.remove(conn) + + # 从 comm_panel 列表移除 + self.comm_panel.remove_ws_connection_from_list(conn) + + # 从 UI 移除 + panel.setParent(None) + panel.deleteLater() + + self.statusBar().showMessage(f"已移除 WebSocket 连接: {conn['url']}", 3000) + + def toggle_udp_connection(self, conn, btn, status_label): + """切換 UDP 連接狀態""" + if conn.get('enabled', False): + # 停止連接 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + conn['enabled'] = False + btn.setText("啟動") + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + self.statusBar().showMessage(f"已停止 UDP 連接: {conn['ip']}:{conn['port']}", 3000) + else: + # 啟動連接 + receiver = UDPMavlinkReceiver(conn['ip'], conn['port'], self.monitor.signals, conn['name']) + receiver.start() + self.udp_receivers.append(receiver) + conn['receiver'] = receiver + conn['enabled'] = True + btn.setText("停止") + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + self.statusBar().showMessage(f"已啟動 UDP 連接: {conn['ip']}:{conn['port']}", 3000) + + def remove_udp_connection(self, conn, panel): + """移除 UDP 连接""" + # 停止接收器 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + if conn['receiver'] in self.udp_receivers: + self.udp_receivers.remove(conn['receiver']) + + # 从列表移除 + if conn in self.udp_connections: + self.udp_connections.remove(conn) + + # 从 comm_panel 列表移除 + self.comm_panel.remove_udp_connection_from_list(conn) + + # 从 UI 移除 + panel.setParent(None) + panel.deleteLater() + + self.statusBar().showMessage(f"已移除 UDP 连接: {conn['ip']}:{conn['port']}", 3000) + + def create_socket_group_panel(self, socket_id): + """創建 socket 分組面板""" + color = self.socket_colors.get(socket_id, self.socket_colors['default']) + panel = SocketGroupPanel(socket_id, color) + panel.group_selection_changed.connect(self.handle_group_selection) + return panel + + def handle_mode_change(self, drone_id): + """處理單個無人機的模式切換""" + mode = self.mode_combo.currentText() + loop = asyncio.get_event_loop() + future = self.monitor.set_mode(drone_id, mode) + loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}")) + + def handle_arm(self, drone_id): + loop = asyncio.get_event_loop() + arm_state = not self.monitor.get_arm_state(drone_id) # Toggle arm state + future = self.monitor.arm_drone(drone_id, arm_state) + loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}")) + + def handle_takeoff(self, drone_id): + loop = asyncio.get_event_loop() + future = self.monitor.takeoff_drone(drone_id, 10.0) # 10 meters default + loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}")) + + def handle_setpoint_selected(self): + """處理發送 setpoint 命令""" + try: + x = float(self.x_input.text() or '0') + y = float(self.y_input.text() or '0') + z = float(self.z_input.text() or '0') + + for drone_id in self.monitor.selected_drones: + if self.monitor.send_setpoint(drone_id, x, y, z): + self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) + else: + self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) + except ValueError: + self.statusBar().showMessage("座標格式錯誤", 3000) + + def handle_single_setpoint(self, drone_id): + """處理單個無人機的 setpoint 命令""" + try: + x = float(self.x_input.text() or '0') + y = float(self.y_input.text() or '0') + z = float(self.z_input.text() or '0') + + if self.monitor.send_setpoint(drone_id, x, y, z): + self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) + else: + self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) + except ValueError: + self.statusBar().showMessage("座標格式錯誤", 3000) + + async def handle_service_response(self, future, action): + try: + result = await future + if result: + self.statusBar().showMessage(f"{action} 成功", 3000) + else: + self.statusBar().showMessage(f"{action} 失敗", 3000) + except Exception as e: + self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000) + + def update_ui(self, msg_type, drone_id, data): + # 檢查是否為新無人機,若是則加入無人機面板 + if drone_id not in self.drones: + self.add_drone(drone_id) + return + + # 確認無人機面板存在 + if not (panel := self.drones.get(drone_id)): + return + + # 更新特定欄位並在總覽頁面更新 + if msg_type == 'state': + mode = data.get('mode', 'UNKNOWN') + armed = data.get('armed', None) + mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55' + if armed is True: + arm_text = "ARMED" + arm_color = '#55FF55' + elif armed is False: + arm_text = "DISARMED" + arm_color = '#FF5555' + else: + arm_text = "--" + arm_color = '#AAAAAA' # 未知狀態 + + # 更新無人機面板欄位 + self.update_field(panel, drone_id, 'mode', mode, mode_color) + self.update_field(panel, drone_id, 'armed', arm_text, arm_color) + + # 更新總覽頁面欄位 + self.update_overview_table(drone_id, 'mode', mode) + self.update_overview_table(drone_id, 'armed', arm_text) + + elif msg_type == 'battery': + voltage = data.get('voltage', 16) + + # 判斷電池節數 + cells = round(voltage / 3.95) + + # 計算電量百分比 + percentage = (voltage / cells - 3.7) / 0.5 * 100 + + # 根據百分比設置顏色 + if percentage < 20: + voltage_color = '#FF6464' # 紅色 (低電量) + elif percentage < 50: + voltage_color = '#FFA500' # 橘色 (中低電量) + else: + voltage_color = '#FFFFFF' # 白色 (正常電量) + + percentage = data.get('percentage', percentage) + + # 顯示總電壓、電池節數、單節電壓和百分比 + text = f"{percentage:.0f}%" + vol = f"{voltage:.2f}V" + + self.update_field(panel, drone_id, 'battery', text, voltage_color) + self.update_overview_table(drone_id, 'battery', vol) + + elif msg_type == 'gps': + lat, lon = data.get('lat', 0), data.get('lon', 0) + self.drone_positions[drone_id] = (lat, lon) + lon_text = f"{lon:.6f}°" + lat_text = f"{lat:.6f}°" + self.update_field(panel, drone_id, 'longitude', lon_text) + self.update_field(panel, drone_id, 'latitude', lat_text) + self.update_overview_table(drone_id, 'longitude', lon_text) + self.update_overview_table(drone_id, 'latitude', lat_text) + + # ================================================================================ + # 【新增】同時儲存到 monitor.drone_gps(處理 UDP/WebSocket 的 GPS 資料) + # ================================================================================ + alt = data.get('alt', 0) # UDP/WebSocket 可能沒有 alt + if not hasattr(self.monitor, 'drone_gps'): + self.monitor.drone_gps = {} + self.monitor.drone_gps[drone_id] = { + 'lat': lat, + 'lon': lon, + 'alt': alt + } + # ================================================================================ + + # 更新地圖上的無人機位置 + heading = self.drone_headings.get(drone_id, 0) # 如果沒有航向,使用 0 + self.drone_map.update_drone_position(drone_id, lat, lon, heading) + + elif msg_type == 'altitude': + altitude = data.get('altitude', 0) + text = f"{altitude:.1f} m" + self.update_field(panel, drone_id, 'altitude', text) + self.update_overview_table(drone_id, 'altitude', text) + + elif msg_type == 'local_pose': + text = f"{data['x']:.1f}, {data['y']:.1f}" + self.update_field(panel, drone_id, 'local', text) + self.update_overview_table(drone_id, 'local', text) + + elif msg_type == 'hud': + heading = data.get('heading') + self.drone_headings[drone_id] = heading + groundspeed = data.get('groundspeed') + heading_text = f"{heading:.1f}°" + if isinstance(groundspeed, (int, float)): + groundspeed_text = f"{groundspeed:.1f} m/s" + else: + groundspeed_text = "--" + self.update_field(panel, drone_id, 'heading', heading_text) + self.update_field(panel, drone_id, 'groundspeed', groundspeed_text) + self.update_overview_table(drone_id, 'heading', heading_text) + self.update_overview_table(drone_id, 'groundspeed', groundspeed_text) + + # 如果有位置資訊,也更新地圖上的航向 + if drone_id in self.drone_positions: + lat, lon = self.drone_positions[drone_id] + self.drone_map.update_drone_position(drone_id, lat, lon, heading) + + elif msg_type == 'loss_rate': + loss_rate = data.get('loss_rate', 0) + text = f"{loss_rate:.1f}%" + self.update_field(panel, drone_id, 'loss_rate', text) + self.update_overview_table(drone_id, 'loss_rate', text) + + elif msg_type == 'ping': + ping = data.get('ping', 0) + text = f"{ping:.1f} ms" + self.update_field(panel, drone_id, 'ping', text) + self.update_overview_table(drone_id, 'ping', text) + + elif msg_type == 'velocity': + text = f"{data['vx']:.1f}, {data['vy']:.1f}" + self.update_overview_table(drone_id, 'velocity', text) + + elif msg_type == 'attitude': + roll = data.get('roll', 0) + pitch = data.get('pitch', 0) + yaw = data.get('yaw', 0) + roll_text = f"{roll:.1f}°" + pitch_text = f"{pitch:.1f}°" + yaw_text = f"{yaw:.1f}°" + self.update_overview_table(drone_id, 'roll', roll_text) + self.update_overview_table(drone_id, 'pitch', pitch_text) + self.update_overview_table(drone_id, 'yaw', yaw_text) + + # 新增處理分組勾選的方法 + def handle_group_selection(self, socket_id, state): + """處理 socket 分組勾選狀態變化""" + # 獲取該分組下的所有無人機 + group_drones = [did for did in self.drones.keys() + if self.get_socket_id(did) == socket_id] + + # 根據分組勾選狀態更新所有該分組的無人機勾選狀態 + is_checked = state == Qt.CheckState.Checked.value + + for drone_id in group_drones: + checkbox = self.drones[drone_id].get_checkbox() + if checkbox: + # 暫時斷開信號連接,避免遞迴觸發 + checkbox.blockSignals(True) + checkbox.setChecked(is_checked) + checkbox.blockSignals(False) + + # 手動更新選中集合 + if is_checked: + self.monitor.selected_drones.add(drone_id) + else: + self.monitor.selected_drones.discard(drone_id) + + def handle_drone_selection(self, drone_id, state): + """處理個別無人機勾選狀態變化""" + if state == Qt.CheckState.Checked.value: + self.monitor.selected_drones.add(drone_id) + else: + self.monitor.selected_drones.discard(drone_id) + + # 更新對應 socket 分組的勾選狀態 + socket_id = self.get_socket_id(drone_id) + self.update_group_checkbox_state(socket_id) + + # 新增更新分組勾選框狀態的方法 + def update_group_checkbox_state(self, socket_id): + """更新指定 socket 分組的勾選框狀態""" + # 獲取該分組下的所有無人機 + group_drones = [did for did in self.drones.keys() + if self.get_socket_id(did) == socket_id] + + if not group_drones: + return + + # 檢查該分組內有多少無人機被勾選 + checked_count = sum(1 for did in group_drones + if self.drones[did].is_checked()) + + # 獲取分組勾選框 + if socket_id in self.socket_groups: + group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox") + if group_checkbox: + # 暫時斷開信號連接 + group_checkbox.blockSignals(True) + + if checked_count == 0: + # 沒有無人機被勾選 + group_checkbox.setCheckState(Qt.CheckState.Unchecked) + elif checked_count == len(group_drones): + # 所有無人機都被勾選 + group_checkbox.setCheckState(Qt.CheckState.Checked) + else: + # 部分無人機被勾選,顯示半勾選狀態 + group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked) + + # 重新連接信號 + group_checkbox.blockSignals(False) + + def handle_select_all(self): + """處理全選按鈕 - 支援分組結構""" + # 檢查是否所有無人機都已被選中 + all_selected = all( + self.drones[drone_id].is_checked() + for drone_id in self.drones + ) + + # 如果全部已選中,則取消全選;否則全選 + new_state = not all_selected + + # 更新所有勾選框狀態(無人機和分組) + for drone_id in self.drones: + self.drones[drone_id].set_checked(new_state) + + # 更新所有分組勾選框狀態 + for socket_id in self.socket_groups: + group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox") + if group_checkbox: + group_checkbox.blockSignals(True) + group_checkbox.setChecked(new_state) + group_checkbox.blockSignals(False) + + def handle_arm_selected(self): + """處理批次解鎖""" + loop = asyncio.get_event_loop() + for drone_id in self.monitor.selected_drones: + future = self.monitor.arm_drone(drone_id, True) + loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}")) + + def handle_takeoff_selected(self): + """處理批次起飛""" + loop = asyncio.get_event_loop() + for drone_id in self.monitor.selected_drones: + future = self.monitor.takeoff_drone(drone_id, 10.0) + loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}")) + + def handle_batch_mode_change(self): + mode = self.mode_combo.currentText() + loop = asyncio.get_event_loop() + for drone_id in self.monitor.selected_drones: + future = self.monitor.set_mode(drone_id, mode) + loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}")) + + def update_field(self, panel, drone_id, field, text, color=None): + """更新面板中的指定欄位""" + if isinstance(panel, DronePanel): + panel.update_field(field, text, color) + + def update_overview_table(self, drone_id=None, field=None, value=None): + """更新總覽表格""" + if not hasattr(self, 'overview_table') or self.overview_table is None: + return + + # 更新無人機引用 + self.overview_table.set_drones(self.drones) + # 委託給 OverviewTable 處理 + self.overview_table.update_table(drone_id, field, value) + + def get_socket_id(self, drone_id): + """從 drone_id 提取 socket_id (例如 's9_1' -> '9')""" + import re + match = re.match(r's(\d+)_\d+', drone_id) + return match.group(1) if match else 'unknown' + + def add_drone(self, drone_id): + if drone_id in self.drones: + return + + # 獲取 socket_id + socket_id = self.get_socket_id(drone_id) + + # 創建無人機面板 + panel = self.create_drone_panel(drone_id) + self.drones[drone_id] = panel + + # 如果該 socket 分組不存在,創建它 + if socket_id not in self.socket_groups: + group_panel = self.create_socket_group_panel(socket_id) + self.socket_groups[socket_id] = group_panel + + # 將無人機面板添加到對應的 socket 分組中 + group_panel = self.socket_groups[socket_id] + group_panel.drones_layout.addWidget(panel) + + # 重新排序並顯示所有 socket 分組 + self.reorganize_socket_groups() + + # 更新分組勾選框狀態 + self.update_group_checkbox_state(socket_id) + + # 更新總覽表 + self.update_overview_table() + + def reorganize_socket_groups(self): + """重新組織和排序 socket 分組""" + # 先清空主佈局 + while self.drone_panel_layout.count(): + w = self.drone_panel_layout.takeAt(0).widget() + if w: + w.setParent(None) + + # 對每個 socket 分組內的無人機進行排序 + for socket_id, group_panel in self.socket_groups.items(): + # 獲取該分組內的所有無人機 + group_drones = [did for did in self.drones.keys() + if self.get_socket_id(did) == socket_id] + + # 清空分組佈局 + while group_panel.drones_layout.count(): + w = group_panel.drones_layout.takeAt(0).widget() + if w: + w.setParent(None) + + # 對該分組內的無人機按數字排序 + def sort_key(x): + parts = x[1:].split('_') + return (int(parts[0]), int(parts[1])) + + # 重新添加排序後的無人機面板 + for did in sorted(group_drones, key=sort_key): + group_panel.drones_layout.addWidget(self.drones[did]) + + # 按 socket_id 數字順序添加分組到主佈局 + for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)): + self.drone_panel_layout.addWidget(self.socket_groups[socket_id]) + + def spin_ros(self): + try: + self.executor.spin_once(timeout_sec=0.01) + for (drone_id, msg_type), data in self.monitor.latest_data.items(): + self.monitor.signals.update_signal.emit(msg_type, drone_id, data) + self.monitor.latest_data.clear() + except Exception as e: + print(f"ROS spin error: {e}") + + def closeEvent(self, event): + # 停止 UDP 接收器 + for receiver in self.udp_receivers: + receiver.stop() + + # 停止 WebSocket 接收器 + for receiver in self.monitor.ws_receivers: + receiver.stop() + + self.monitor.destroy_node() + self.executor.shutdown() + rclpy.shutdown() + event.accept() + + # ================================================================================ + # 【新增】以下方法是任務規劃功能 + # ================================================================================ + + def handle_map_click(self, lat, lon): + """ + 處理地圖點擊事件,自動規劃隊形任務 + + Args: + lat: 緯度 + lon: 經度 + """ + print(f"地圖點擊位置: {lat:.6f}, {lon:.6f}") + + selected_drones = self.get_selected_drones() + + if len(selected_drones) == 0: + self.statusBar().showMessage("⚠ 請先選擇無人機(勾選 checkbox)", 3000) + return + + # 設定目標點 + base_alt = 10.0 # 基準高度 + target_gps = (lat, lon, base_alt) + + self.statusBar().showMessage(f"⏳ 正在規劃 {len(selected_drones)} 台無人機的任務...", 2000) + + try: + # 取得當前無人機位置(GPS 座標) + drone_gps_positions = [] + + for drone_id in selected_drones: + # 檢查是否有 GPS 資料 + if hasattr(self.monitor, 'drone_gps') and drone_id in self.monitor.drone_gps: + # 使用實際的 GPS 資料 + gps_data = self.monitor.drone_gps[drone_id] + lat_drone = gps_data['lat'] + lon_drone = gps_data['lon'] + alt_drone = gps_data.get('alt', base_alt) + drone_gps_positions.append((lat_drone, lon_drone, alt_drone)) + print(f"使用實際 GPS: {drone_id} -> ({lat_drone:.6f}, {lon_drone:.6f}, {alt_drone:.1f})") + + elif drone_id in self.drone_positions: + # 備用方案:從 Local 座標轉換 + pos = self.drone_positions[drone_id] + # 使用簡化轉換 + lat_drone = 24.0 + pos[1] / 111000 + lon_drone = 120.0 + pos[0] / (111000 * 0.9) + alt_drone = pos[2] if len(pos) > 2 else base_alt + drone_gps_positions.append((lat_drone, lon_drone, alt_drone)) + print(f"使用 Local 轉換: {drone_id} -> ({lat_drone:.6f}, {lon_drone:.6f}, {alt_drone:.1f})") + + else: + self.statusBar().showMessage(f"⚠ 找不到 {drone_id} 的位置資料", 3000) + return + + # 規劃任務 + # ================================================================================ + # 【修改】接收中心點座標 + # ================================================================================ + stage1_gps, stage2_gps, center_origin = self.mission_planner.plan_formation_mission( + drone_gps_positions, + target_gps + ) + # ================================================================================ + + # 儲存規劃結果 + self.planned_waypoints = { + 'stage1': stage1_gps, + 'stage2': stage2_gps, + 'drone_ids': selected_drones + } + + # 顯示規劃結果 + self.show_planned_waypoints() + + # ================================================================================ + # 【新增】在地圖上顯示任務規劃(中心點 + 虛線) + # ================================================================================ + center_lat, center_lon, center_alt = center_origin + self.drone_map.draw_mission_plan( + center_lat, center_lon, # 中心點座標 + lat, lon # 目標點座標 + ) + # ================================================================================ + + self.statusBar().showMessage( + f"✓ 任務規劃完成!{len(selected_drones)} 台無人機,2 階段飛行", 5000 + ) + + except Exception as e: + self.statusBar().showMessage(f"❌ 規劃失敗: {str(e)}", 5000) + print(f"Mission planning error: {e}") + import traceback + traceback.print_exc() + + def show_planned_waypoints(self): + """ + 顯示規劃的航點(在終端輸出) + """ + if not self.planned_waypoints: + return + + print("\n" + "=" * 60) + print("任務規劃結果") + print("=" * 60) + + drone_ids = self.planned_waypoints['drone_ids'] + stage1 = self.planned_waypoints['stage1'] + stage2 = self.planned_waypoints['stage2'] + + print(f"\n共 {len(drone_ids)} 台無人機") + print(f"參數:間距={self.mission_planner.spacing}m, " + f"基準高度={self.mission_planner.base_altitude}m, " + f"高低差={self.mission_planner.altitude_diff}m") + + for i, drone_id in enumerate(drone_ids): + print(f"\n【{drone_id}】") + lat1, lon1, alt1 = stage1[i] + lat2, lon2, alt2 = stage2[i] + + print(f" 階段 1(集合點):") + print(f" 緯度: {lat1:.6f}°") + print(f" 經度: {lon1:.6f}°") + print(f" 高度: {alt1:.1f} m") + + print(f" 階段 2(目標點):") + print(f" 緯度: {lat2:.6f}°") + print(f" 經度: {lon2:.6f}°") + print(f" 高度: {alt2:.1f} m") + + print("\n" + "=" * 60) + + def get_selected_drones(self): + """ + 取得被選中的無人機 ID 列表 + + Returns: + list: 被選中的無人機 ID 列表 + """ + selected = [] + for drone_id, panel in self.drones.items(): + if hasattr(panel, 'checkbox') and panel.checkbox.isChecked(): + selected.append(drone_id) + return selected + + # ================================================================================ + # 【新增結束】 + # ================================================================================ + +if __name__ == '__main__': + app = QApplication(sys.argv) + station = ControlStationUI() + station.show() + app.exec() \ No newline at end of file diff --git a/src/GUI/map_layout.py b/src/GUI/map_layout.py new file mode 100644 index 0000000..24bd2d2 --- /dev/null +++ b/src/GUI/map_layout.py @@ -0,0 +1,436 @@ +#!/usr/bin/env python3 +from PyQt6.QtWebEngineWidgets import QWebEngineView +from PyQt6.QtCore import QTimer, pyqtSignal, QObject, pyqtSlot +from PyQt6.QtWebChannel import QWebChannel + +class DroneMap: + """無人機地圖類別 - 負責管理 Leaflet 地圖顯示""" + + def __init__(self): + """初始化地圖""" + self.map_view = QWebEngineView() + self.map_loaded = False + self.pending_map_updates = {} + + # 創建橋接對象 + self.bridge = MapBridge() + + # 設置 QWebChannel + self.channel = QWebChannel() + self.channel.registerObject('bridge', self.bridge) + self.map_view.page().setWebChannel(self.channel) + + # 設置地圖 HTML + inline_html = ''' + + + + + + + + + + + +
+
+ +
+ + + + + ''' + + self.map_view.setHtml(inline_html) + self.map_view.loadFinished.connect(self._on_map_loaded) + + # 設置地圖更新計時器 + self.map_update_timer = QTimer() + self.map_update_timer.timeout.connect(self.update_map_positions) + self.map_update_timer.start(200) # 每 200ms 更新一次 + + def _on_map_loaded(self, ok: bool): + """地圖加載完成回調""" + if ok: + self.map_loaded = True + else: + print("⚠️ 地圖加載失敗") + + def update_drone_position(self, drone_id, lat, lon, heading): + """更新無人機位置(加入待處理隊列)""" + self.pending_map_updates[drone_id] = (lat, lon, heading) + + def update_map_positions(self): + """批量更新地圖上的無人機位置""" + if not self.map_loaded or not self.pending_map_updates: + return + + # 批量執行所有待更新的位置 + js_commands = [] + for drone_id, (lat, lon, heading) in self.pending_map_updates.items(): + js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});") + + if js_commands: + combined_js = "\n".join(js_commands) + self.map_view.page().runJavaScript(combined_js) + + # 清空待更新緩存 + self.pending_map_updates.clear() + + def clear_trajectories(self): + """清除所有軌跡""" + if self.map_loaded: + self.map_view.page().runJavaScript("clearAllTrajectories();") + + def focus_on_drone(self, drone_id): + """聚焦到指定無人機""" + if self.map_loaded: + self.map_view.page().runJavaScript(f"focusOn('{drone_id}');") + + # ================================================================================ + # 【新增】任務規劃視覺化方法 + # ================================================================================ + def draw_mission_plan(self, center_lat, center_lon, target_lat, target_lon): + """ + 在地圖上繪製任務規劃 + + Args: + center_lat: 中心點緯度 + center_lon: 中心點經度 + target_lat: 目標點緯度 + target_lon: 目標點經度 + """ + if self.map_loaded: + js_code = f"drawMissionPlan({center_lat:.6f}, {center_lon:.6f}, {target_lat:.6f}, {target_lon:.6f});" + self.map_view.page().runJavaScript(js_code) + print(f"📍 地圖已繪製任務規劃: C({center_lat:.6f}, {center_lon:.6f}) -> T({target_lat:.6f}, {target_lon:.6f})") + + def clear_mission_plan(self): + """清除地圖上的任務規劃標記""" + if self.map_loaded: + self.map_view.page().runJavaScript("clearMissionPlan();") + print("🗑️ 地圖已清除任務規劃") + # ================================================================================ + + def get_widget(self): + """獲取地圖 widget""" + return self.map_view + + def get_gps_signal(self): + """獲取 GPS 信號""" + return self.bridge.gps_signal + +class MapBridge(QObject): + """JavaScript 和 Python 之間的橋接類""" + gps_signal = pyqtSignal(float, float) # lat, lon + + def __init__(self): + super().__init__() + + @pyqtSlot(float, float) + def emitGpsSignal(self, lat, lon): + """供 JavaScript 調用的方法""" + self.gps_signal.emit(lat, lon) \ No newline at end of file diff --git a/src/GUI/mission_planner.py b/src/GUI/mission_planner.py new file mode 100644 index 0000000..138795d --- /dev/null +++ b/src/GUI/mission_planner.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 +""" +飛行任務規劃模組 +負責將 GPS 點擊座標轉換為隊形飛行任務 +""" +import math +from typing import List, Tuple, Optional + + +class CoordinateConverter: + """GPS 座標與 Local 座標的轉換器""" + + def __init__(self, origin_lat: float, origin_lon: float, origin_alt: float = 0): + """ + 初始化座標轉換器 + + Args: + origin_lat: 參考原點緯度 + origin_lon: 參考原點經度 + origin_alt: 參考原點高度(公尺,相對於海平面) + """ + self.origin_lat = origin_lat + self.origin_lon = origin_lon + self.origin_alt = origin_alt + self.R = 6371000.0 # 地球半徑(公尺) + + def gps_to_local(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]: + """ + GPS 座標轉換為 Local 座標(ENU 系統:East-North-Up) + + Args: + lat: 緯度 + lon: 經度 + alt: 高度(公尺,相對於海平面) + + Returns: + (x, y, z): Local 座標(公尺) + x: East(東方) + y: North(北方) + z: Up(向上) + """ + lat_rad = math.radians(lat) + lon_rad = math.radians(lon) + origin_lat_rad = math.radians(self.origin_lat) + origin_lon_rad = math.radians(self.origin_lon) + + dlat = lat_rad - origin_lat_rad + dlon = lon_rad - origin_lon_rad + + # 使用 Equirectangular projection(適用於小範圍 < 10 km) + x = self.R * dlon * math.cos((lat_rad + origin_lat_rad) / 2) # East + y = self.R * dlat # North + z = alt - self.origin_alt # Up + + return x, y, z + + def local_to_gps(self, x: float, y: float, z: float) -> Tuple[float, float, float]: + """ + Local 座標轉換為 GPS 座標 + + Args: + x: East(東方,公尺) + y: North(北方,公尺) + z: Up(向上,公尺) + + Returns: + (lat, lon, alt): GPS 座標 + """ + origin_lat_rad = math.radians(self.origin_lat) + origin_lon_rad = math.radians(self.origin_lon) + + lat_rad = origin_lat_rad + (y / self.R) + lon_rad = origin_lon_rad + (x / (self.R * math.cos((lat_rad + origin_lat_rad) / 2))) + + lat = math.degrees(lat_rad) + lon = math.degrees(lon_rad) + alt = self.origin_alt + z + + return lat, lon, alt + + +class FormationPlanner: + """隊形規劃器""" + + def __init__(self, spacing: float = 5.0, + base_altitude: float = 10.0, + altitude_diff: float = 2.0): + """ + 初始化隊形規劃器 + + Args: + spacing: 無人機間距(公尺) + base_altitude: 基準飛行高度(公尺,相對於參考原點) + altitude_diff: M 字形的高低差(公尺) + """ + self.spacing = spacing + self.base_altitude = base_altitude + self.altitude_diff = altitude_diff + self.current_origin = None + self.converter = None + + def plan_formation_mission(self, + drone_gps_positions: List[Tuple[float, float, float]], + target_gps: Tuple[float, float, float]) -> Tuple[List[Tuple[float, float, float]], + List[Tuple[float, float, float]]]: + """ + 規劃兩階段隊形任務 + + Args: + drone_gps_positions: 當前無人機 GPS 位置列表 [(lat, lon, alt), ...] + target_gps: 目標點 GPS 座標 (lat, lon, alt) + + Returns: + stage1_gps: 階段 1(集合點)的 GPS 航點列表 + stage2_gps: 階段 2(目標點)的 GPS 航點列表 + origin: 中心點(參考原點)的 GPS 座標 (lat, lon, alt) + origin: 中心點(參考原點)的 GPS 座標 (lat, lon, alt) + """ + if len(drone_gps_positions) == 0: + raise ValueError("無人機位置列表不能為空") + + # ===== 步驟 1: 計算中央點並設為參考原點 ===== + center_lat = sum(pos[0] for pos in drone_gps_positions) / len(drone_gps_positions) + center_lon = sum(pos[1] for pos in drone_gps_positions) / len(drone_gps_positions) + center_alt = sum(pos[2] for pos in drone_gps_positions) / len(drone_gps_positions) + + self.current_origin = (center_lat, center_lon, center_alt) + self.converter = CoordinateConverter(center_lat, center_lon, center_alt) + + print(f"📍 參考原點: ({center_lat:.6f}, {center_lon:.6f}, {center_alt:.1f}m)") + + # ===== 步驟 2: 轉換到 Local 座標系 ===== + drone_local_positions = [] + for lat, lon, alt in drone_gps_positions: + x, y, z = self.converter.gps_to_local(lat, lon, alt) + drone_local_positions.append((x, y, z)) + + target_local = self.converter.gps_to_local( + target_gps[0], target_gps[1], target_gps[2] + ) + + # ===== 步驟 3: 在 Local 座標系中計算隊形 ===== + stage1_local, stage2_local = self._calculate_formation_local( + drone_local_positions, + target_local + ) + + # ===== 步驟 4: 轉回 GPS 座標 ===== + stage1_gps = [] + for x, y, z in stage1_local: + lat, lon, alt = self.converter.local_to_gps(x, y, z) + stage1_gps.append((lat, lon, alt)) + + stage2_gps = [] + for x, y, z in stage2_local: + lat, lon, alt = self.converter.local_to_gps(x, y, z) + stage2_gps.append((lat, lon, alt)) + + print(f"✅ 任務規劃完成: {len(stage1_gps)} 台無人機,2 階段飛行") + + # ================================================================================ + # 【修改】回傳中心點座標供地圖視覺化使用 + # ================================================================================ + return stage1_gps, stage2_gps, self.current_origin + # ================================================================================ + + def _calculate_formation_local(self, + drone_positions: List[Tuple[float, float, float]], + target_point: Tuple[float, float, float]) -> Tuple[List[Tuple[float, float, float]], + List[Tuple[float, float, float]]]: + """ + 在 Local 座標系中計算隊形 + + Args: + drone_positions: Local 座標的無人機位置 [(x, y, z), ...] + target_point: Local 座標的目標點 (x, y, z) + + Returns: + stage1_positions: 階段 1(中央點分佈)的 Local 座標 + stage2_positions: 階段 2(目標點分佈)的 Local 座標 + """ + N = len(drone_positions) + + # ===== 步驟 1: 計算中央點(在 Local 座標系中應該接近原點)===== + C_x = sum(pos[0] for pos in drone_positions) / N + C_y = sum(pos[1] for pos in drone_positions) / N + C_z = sum(pos[2] for pos in drone_positions) / N + + print(f" 中央點 Local: ({C_x:.2f}, {C_y:.2f}, {C_z:.2f})") + + # ===== 步驟 2: 計算方向向量(中央點 → 目標點)===== + T_x, T_y, T_z = target_point + V_x = T_x - C_x + V_y = T_y - C_y + + print(f" 方向向量: ({V_x:.2f}, {V_y:.2f})") + + # ===== 步驟 3: 計算垂直向量(逆時針旋轉 90°)===== + P_x = -V_y + P_y = V_x + + # 單位化垂直向量 + length = math.sqrt(P_x**2 + P_y**2) + if length < 0.01: # 避免除以零 + # 如果目標點與中央點重合,使用默認方向(向東) + P_x_unit, P_y_unit = 1.0, 0.0 + else: + P_x_unit = P_x / length + P_y_unit = P_y / length + + print(f" 垂直單位向量: ({P_x_unit:.3f}, {P_y_unit:.3f})") + + # ===== 步驟 4: 計算無人機在垂直方向的投影並排序 ===== + projections = [] + for i, pos in enumerate(drone_positions): + relative_x = pos[0] - C_x + relative_y = pos[1] - C_y + projection = relative_x * P_x_unit + relative_y * P_y_unit + projections.append((projection, i)) + + # 按投影值排序(保持左右順序) + projections.sort() + sorted_indices = [idx for _, idx in projections] + + print(f" 排序後的無人機索引: {sorted_indices}") + + # ===== 步驟 5: 分佈無人機位置 ===== + stage1_positions = [None] * N # 預分配列表 + stage2_positions = [None] * N + + for rank, original_idx in enumerate(sorted_indices): + # 計算相對中心的水平偏移 + offset = (rank - (N - 1) / 2) * self.spacing + + # 計算 M 字形高度(交替高低) + if rank % 2 == 0: + altitude = self.base_altitude + self.altitude_diff # 高 + else: + altitude = self.base_altitude - self.altitude_diff # 低 + + # 階段 1:在中央點附近分佈 + stage1_x = C_x + P_x_unit * offset + stage1_y = C_y + P_y_unit * offset + stage1_z = altitude + + # 階段 2:在目標點附近分佈(保持相同的相對位置) + stage2_x = T_x + P_x_unit * offset + stage2_y = T_y + P_y_unit * offset + stage2_z = altitude + + # 按照原始索引存儲(保持無人機 ID 順序) + stage1_positions[original_idx] = (stage1_x, stage1_y, stage1_z) + stage2_positions[original_idx] = (stage2_x, stage2_y, stage2_z) + + return stage1_positions, stage2_positions + + def update_parameters(self, spacing: Optional[float] = None, + base_altitude: Optional[float] = None, + altitude_diff: Optional[float] = None): + """ + 更新隊形參數 + + Args: + spacing: 無人機間距(公尺) + base_altitude: 基準飛行高度(公尺) + altitude_diff: M 字形的高低差(公尺) + """ + if spacing is not None: + self.spacing = spacing + print(f" 間距更新為: {spacing} m") + + if base_altitude is not None: + self.base_altitude = base_altitude + print(f" 基準高度更新為: {base_altitude} m") + + if altitude_diff is not None: + self.altitude_diff = altitude_diff + print(f" 高低差更新為: {altitude_diff} m") + + +# ===== 測試程式碼 ===== +if __name__ == "__main__": + print("=" * 60) + print("隊形任務規劃器測試") + print("=" * 60) + + # 模擬 5 台無人機的 GPS 位置 + drone_gps = [ + (24.123450, 120.567800, 100.0), # 無人機 0 + (24.123470, 120.567820, 102.0), # 無人機 1 + (24.123440, 120.567810, 98.0), # 無人機 2 + (24.123460, 120.567830, 101.0), # 無人機 3 + (24.123445, 120.567795, 99.0), # 無人機 4 + ] + + # 目標點 + target_gps = (24.123600, 120.568000, 105.0) + + # 建立規劃器 + planner = FormationPlanner( + spacing=5.0, # 5 公尺間距 + base_altitude=10.0, # 基準高度 10 公尺 + altitude_diff=2.0 # 高低差 2 公尺 + ) + + # 規劃任務 + print("\n開始規劃任務...") + stage1, stage2 = planner.plan_formation_mission(drone_gps, target_gps) + + # 顯示結果 + print("\n" + "=" * 60) + print("階段 1:集合點位置(GPS)") + print("=" * 60) + for i, (lat, lon, alt) in enumerate(stage1): + print(f"無人機 {i}: ({lat:.6f}, {lon:.6f}, {alt:.1f}m)") + + print("\n" + "=" * 60) + print("階段 2:目標點位置(GPS)") + print("=" * 60) + for i, (lat, lon, alt) in enumerate(stage2): + print(f"無人機 {i}: ({lat:.6f}, {lon:.6f}, {alt:.1f}m)") + + print("\n✅ 測試完成!") \ No newline at end of file diff --git a/src/GUI/overview_table.py b/src/GUI/overview_table.py new file mode 100644 index 0000000..11953f0 --- /dev/null +++ b/src/GUI/overview_table.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +from PyQt6.QtWidgets import QTableWidget, QTableWidgetItem, QHeaderView, QLabel +from PyQt6.QtCore import Qt + +class OverviewTable(QTableWidget): + """總覽表格,顯示所有無人機的狀態資訊""" + + def __init__(self, info_types, info_type_map, parent=None): + super().__init__(parent) + + self.info_types = info_types + self.info_type_map = info_type_map + self.drones = {} # 存儲無人機面板的引用 + + # 初始化表格 + self.setColumnCount(1) + self.setRowCount(len(self.info_types)) + self.setHorizontalHeaderLabels(["資訊"]) + header = self.horizontalHeader() + header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents) + self.verticalHeader().setVisible(False) + + # 設置第一列的資訊類型 + for i, txt in enumerate(self.info_types): + item = QTableWidgetItem(txt) + item.setFlags(Qt.ItemFlag.ItemIsEnabled) + item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) + self.setItem(i, 0, item) + + def set_drones(self, drones): + """設置無人機面板字典的引用""" + self.drones = drones + + def update_table(self, drone_id=None, field=None, value=None): + """更新總覽表格 + + Args: + drone_id: 無人機 ID + field: 欄位名稱 (如 'mode', 'altitude' 等) + value: 要更新的值 + """ + # 更新特定儲存格 + if drone_id and field and value: + if drone_id not in self.drones: + return + + col = 1 + list(self.drones.keys()).index(drone_id) + row = self.info_type_map.get(field, -1) + + if row == -1: + return # 無效的欄位 + + item = self.item(row, col) + if not item: + item = QTableWidgetItem() + self.setItem(row, col, item) + + item.setText(value) + item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) + + # 如果沒有指定更新,刷新整個表格 + if drone_id is None: + self.refresh_all() + + def refresh_all(self): + """刷新整個表格""" + cols = 1 + len(self.drones) + self.setColumnCount(cols) + headers = ["資訊"] + list(self.drones.keys()) + self.setHorizontalHeaderLabels(headers) + + for col, did in enumerate(self.drones, start=1): + panel = self.drones[did] + for field, row in self.info_type_map.items(): + lbl = panel.findChild(QLabel, f"{did}_{field}") + val = lbl.text() if lbl else "--" + val_item = QTableWidgetItem(val) + val_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) + self.setItem(row, col, val_item) + + def add_drone_column(self, drone_id): + """當新增無人機時,添加一列""" + if drone_id in self.drones: + self.refresh_all() + + def remove_drone_column(self, drone_id): + """當移除無人機時,刷新表格""" + if drone_id in self.drones: + self.refresh_all()