diff --git a/src/GUI/comm_panel.py b/src/GUI/comm_panel.py index 6259c8d..bd12794 100644 --- a/src/GUI/comm_panel.py +++ b/src/GUI/comm_panel.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, - QPushButton, QLineEdit) + QPushButton, QLineEdit, QComboBox) from PyQt6.QtCore import pyqtSignal +import glob +import os class CommPanel(QWidget): """通讯设置面板类""" @@ -13,12 +15,16 @@ class CommPanel(QWidget): ws_connection_added = pyqtSignal(str) # url ws_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label ws_connection_removed = pyqtSignal(dict, QWidget) # conn, panel + serial_connection_added = pyqtSignal(str, int) # port, baudrate + serial_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label + serial_connection_removed = pyqtSignal(dict, QWidget) # conn, panel status_message = pyqtSignal(str, int) # message, timeout def __init__(self, parent=None): super().__init__(parent) self.udp_connections = [] self.ws_connections = [] + self.serial_connections = [] self._init_ui() def _init_ui(self): @@ -65,7 +71,7 @@ class CommPanel(QWidget): """) self.udp_port_input = QLineEdit() - self.udp_port_input.setText("14540") + self.udp_port_input.setText("14550") self.udp_port_input.setPlaceholderText("Port") self.udp_port_input.setFixedWidth(80) self.udp_port_input.setStyleSheet(""" @@ -105,6 +111,118 @@ class CommPanel(QWidget): separator.setFixedHeight(20) layout.addWidget(separator) + # ========== Serial 區域 ========== + serial_title = QLabel("Serial") + serial_title.setStyleSheet(""" + color: #DDD; + font-size: 14px; + font-weight: bold; + padding: 5px; + background-color: #333; + border-radius: 4px; + """) + layout.addWidget(serial_title) + + # Serial 連接列表容器 + self.serial_list_widget = QWidget() + self.serial_list_layout = QVBoxLayout(self.serial_list_widget) + self.serial_list_layout.setContentsMargins(0, 0, 0, 0) + self.serial_list_layout.setSpacing(5) + layout.addWidget(self.serial_list_widget) + + # Serial 添加新連接區域 + add_serial_widget = QWidget() + add_serial_layout = QHBoxLayout(add_serial_widget) + add_serial_layout.setContentsMargins(0, 0, 0, 0) + + self.serial_port_combo = QComboBox() + self.serial_port_combo.setStyleSheet(""" + QComboBox { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 5px; + } + QComboBox::drop-down { + border: none; + } + QComboBox::down-arrow { + image: none; + border-left: 5px solid transparent; + border-right: 5px solid transparent; + border-top: 5px solid #DDD; + } + """) + self._refresh_serial_ports() + + refresh_ports_btn = QPushButton("↻") + refresh_ports_btn.setFixedWidth(35) + refresh_ports_btn.clicked.connect(self._refresh_serial_ports) + refresh_ports_btn.setToolTip("重新掃描串口") + refresh_ports_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 6px; + border-radius: 4px; + font-size: 16px; + } + QPushButton:hover { background-color: #555; } + """) + + self.serial_baudrate_combo = QComboBox() + self.serial_baudrate_combo.addItems(['9600', '19200', '38400', '57600', '115200']) + self.serial_baudrate_combo.setCurrentText('57600') + self.serial_baudrate_combo.setFixedWidth(100) + self.serial_baudrate_combo.setStyleSheet(""" + QComboBox { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 5px; + } + QComboBox::drop-down { + border: none; + } + QComboBox::down-arrow { + image: none; + border-left: 5px solid transparent; + border-right: 5px solid transparent; + border-top: 5px solid #DDD; + } + """) + + add_serial_btn = QPushButton("添加") + add_serial_btn.clicked.connect(self._handle_add_serial) + add_serial_btn.setStyleSheet(""" + QPushButton { + background-color: #4CAF50; + color: white; + border: none; + padding: 6px 12px; + border-radius: 4px; + min-width: 30px; + } + QPushButton:hover { background-color: #45a049; } + """) + + add_serial_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;")) + add_serial_layout.addWidget(self.serial_port_combo) + add_serial_layout.addWidget(refresh_ports_btn) + add_serial_layout.addWidget(QLabel("Baud:", styleSheet="color: #DDD;")) + add_serial_layout.addWidget(self.serial_baudrate_combo) + add_serial_layout.addWidget(add_serial_btn) + + layout.addWidget(add_serial_widget) + + # 分隔線 + separator = QWidget() + separator.setFixedHeight(20) + layout.addWidget(separator) + # ========== WebSocket 區域 ========== ws_title = QLabel("WebSocket") ws_title.setStyleSheet(""" @@ -229,6 +347,94 @@ class CommPanel(QWidget): # 清空輸入框 self.ws_url_input.clear() + def _refresh_serial_ports(self): + """重新掃描可用的串口""" + self.serial_port_combo.clear() + + # 掃描 Linux 下的串口設備 + ports = [] + + # 掃描 USB 串口 + usb_ports = glob.glob('/dev/ttyUSB*') + ports.extend(usb_ports) + + # 掃描 ACM 串口 + acm_ports = glob.glob('/dev/ttyACM*') + ports.extend(acm_ports) + + # 排序 + ports.sort() + + if ports: + self.serial_port_combo.addItems(ports) + else: + self.serial_port_combo.addItem("沒有找到串口") + self.serial_port_combo.setEnabled(False) + return + + self.serial_port_combo.setEnabled(True) + + def _handle_add_serial(self): + """處理添加 Serial 連接""" + port = self.serial_port_combo.currentText() + baudrate_text = self.serial_baudrate_combo.currentText() + + if not port or port == "沒有找到串口": + self.status_message.emit("請選擇有效的串口", 3000) + return + + try: + baudrate = int(baudrate_text) + except ValueError: + self.status_message.emit("波特率格式錯誤", 3000) + return + + # 檢查是否已存在相同連接 + for conn in self.serial_connections: + if conn['port'] == port: + self.status_message.emit("連接已存在", 3000) + return + + # 發送信號通知主窗口 + self.serial_connection_added.emit(port, baudrate) + + def _handle_add_ws(self): + """處理添加 WebSocket 連接""" + input_url = self.ws_url_input.text().strip() + + if not input_url: + self.status_message.emit("請輸入 WebSocket URL", 3000) + return + + # 自動添加 ws:// 前綴 + if not input_url.startswith('ws://') and not input_url.startswith('wss://'): + url = f'ws://{input_url}' + else: + url = input_url + + # 基本 URL 格式驗證 + try: + if '://' in url: + parts = url.split('://', 1) + if len(parts) == 2 and ':' not in parts[1]: + self.status_message.emit("URL 格式錯誤,需要包含端口號 (例如: 127.0.0.1:8756)", 3000) + return + except: + self.status_message.emit("URL 格式錯誤", 3000) + return + + # 檢查是否已存在相同連接 + for conn in self.ws_connections: + if conn['url'] == url: + self.status_message.emit("連接已存在", 3000) + return + + # 發送信號通知主窗口 + self.ws_connection_added.emit(url) + + # 清空輸入框 + self.ws_url_input.clear() + def create_udp_connection_panel(self, conn): """創建 UDP 連接面板""" panel = QWidget() @@ -395,4 +601,88 @@ class CommPanel(QWidget): def remove_ws_connection_from_list(self, conn): """從列表中移除 WebSocket 連接""" if conn in self.ws_connections: - self.ws_connections.remove(conn) \ No newline at end of file + self.ws_connections.remove(conn) + + def create_serial_connection_panel(self, conn): + """創建 Serial 連接面板""" + panel = QWidget() + panel.setStyleSheet(""" + QWidget { + background-color: #2A2A2A; + border-radius: 6px; + padding: 8px; + border: 1px solid #444; + } + """) + + layout = QHBoxLayout(panel) + layout.setContentsMargins(8, 8, 8, 8) + + # 連接資訊 + info_label = QLabel(f"{conn['name']} - {conn['port']} @ {conn['baudrate']}") + info_label.setStyleSheet("color: #DDD; font-size: 12px;") + + # 狀態指示器 + status_label = QLabel("●") + if conn.get('enabled', False): + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + else: + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + + # 控制按鈕 + toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動") + toggle_btn.setFixedWidth(60) + toggle_btn.clicked.connect(lambda: self.serial_connection_toggled.emit(conn, toggle_btn, status_label)) + toggle_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 4px 8px; + border-radius: 3px; + font-size: 11px; + } + QPushButton:hover { background-color: #555; } + """) + + remove_btn = QPushButton("移除") + remove_btn.setFixedWidth(60) + remove_btn.clicked.connect(lambda: self.serial_connection_removed.emit(conn, panel)) + remove_btn.setStyleSheet(""" + QPushButton { + background-color: #d32f2f; + color: white; + border: none; + padding: 4px 8px; + border-radius: 3px; + font-size: 11px; + } + QPushButton:hover { background-color: #b71c1c; } + """) + + layout.addWidget(status_label) + layout.addWidget(info_label) + layout.addStretch() + layout.addWidget(toggle_btn) + layout.addWidget(remove_btn) + + # 儲存引用 + panel.connection = conn + panel.toggle_btn = toggle_btn + panel.status_label = status_label + + return panel + + def add_serial_panel(self, conn): + """添加 Serial 連接面板到列表""" + panel = self.create_serial_connection_panel(conn) + self.serial_list_layout.addWidget(panel) + self.serial_connections.append(conn) + return panel + + def remove_serial_connection_from_list(self, conn): + """從列表中移除 Serial 連接""" + if conn in self.serial_connections: + self.serial_connections.remove(conn) \ No newline at end of file diff --git a/src/GUI/communication.py b/src/GUI/communication.py index 45c960c..b97828b 100644 --- a/src/GUI/communication.py +++ b/src/GUI/communication.py @@ -11,7 +11,7 @@ import socket from pymavlink import mavutil from geometry_msgs.msg import Point, Vector3 from sensor_msgs.msg import BatteryState, NavSatFix, Imu -from std_msgs.msg import Float64 +from std_msgs.msg import Float64, String from mavros_msgs.msg import State, VfrHud from mavros_msgs.srv import CommandBool, CommandTOL @@ -20,12 +20,15 @@ class DroneSignals(QObject): class UDPMavlinkReceiver(threading.Thread): """UDP MAVLink 接收器""" - def __init__(self, ip, port, signals, connection_name): + def __init__(self, ip, port, signals, connection_name, monitor=None): super().__init__(daemon=True) self.ip = ip self.port = port self.signals = signals self.connection_name = connection_name + self.monitor = monitor # 保存 monitor 引用 + # UDP 使用原始 socket_id = 8 + self.socket_id = monitor.get_or_assign_socket_id('8') if monitor else 0 self.running = False self.sock = None @@ -61,7 +64,7 @@ class UDPMavlinkReceiver(threading.Thread): try: msg_type = msg.get_type() system_id = msg.get_srcSystem() - drone_id = f"s8_{system_id}" # 使用 s8_ 前綴表示 UDP 來源 + drone_id = f"s{self.socket_id}_{system_id}" if msg_type == "HEARTBEAT": mode = mavutil.mode_string_v10(msg) @@ -116,11 +119,13 @@ class UDPMavlinkReceiver(threading.Thread): }) elif msg_type == "VFR_HUD": - groundspeed = msg.groundspeed - heading = msg.heading self.signals.update_signal.emit('hud', drone_id, { - 'heading': heading, - 'groundspeed': groundspeed + 'airspeed': msg.airspeed, + 'groundspeed': msg.groundspeed, + 'heading': msg.heading, + 'throttle': msg.throttle, + 'alt': msg.alt, + 'climb': msg.climb }) except Exception as e: @@ -132,12 +137,15 @@ class UDPMavlinkReceiver(threading.Thread): class SerialMavlinkReceiver(threading.Thread): """串口 MAVLink 接收器""" - def __init__(self, port, baudrate, signals, connection_name): + def __init__(self, port, baudrate, signals, connection_name, monitor=None): super().__init__(daemon=True) self.port = port self.baudrate = baudrate self.signals = signals self.connection_name = connection_name + self.monitor = monitor # 保存 monitor 引用 + # Serial 使用原始 socket_id = 5 + self.socket_id = monitor.get_or_assign_socket_id('5') if monitor else 0 self.running = False self.mav = None @@ -184,7 +192,7 @@ class SerialMavlinkReceiver(threading.Thread): try: msg_type = msg.get_type() system_id = msg.get_srcSystem() - drone_id = f"s5_{system_id}" # 使用 serial_ 前綴表示串口來源 + drone_id = f"s{self.socket_id}_{system_id}" if msg_type == "HEARTBEAT": mode = mavutil.mode_string_v10(msg) @@ -239,11 +247,13 @@ class SerialMavlinkReceiver(threading.Thread): }) elif msg_type == "VFR_HUD": - groundspeed = msg.groundspeed - heading = msg.heading self.signals.update_signal.emit('hud', drone_id, { - 'heading': heading, - 'groundspeed': groundspeed + 'airspeed': msg.airspeed, + 'groundspeed': msg.groundspeed, + 'heading': msg.heading, + 'throttle': msg.throttle, + 'alt': msg.alt, + 'climb': msg.climb }) except Exception as e: @@ -255,12 +265,12 @@ class SerialMavlinkReceiver(threading.Thread): class WebSocketMavlinkReceiver(threading.Thread): """WebSocket MAVLink 接收器""" - def __init__(self, url, signals, connection_name): + def __init__(self, url, signals, connection_name, monitor=None): super().__init__(daemon=True) self.url = url self.signals = signals self.connection_name = connection_name - self.running = False + self.monitor = monitor # 保存 monitor 引用 self.socket_id = monitor.get_next_socket_id() if monitor else 0 # 一次性分配 socket_id self.running = False self.max_retries = 5 self.base_delay = 1.0 @@ -319,10 +329,10 @@ class WebSocketMavlinkReceiver(threading.Thread): def process_websocket_message(self, data): """處理 WebSocket 訊息""" try: - drone_id = data.get('system_id') - drone_id = f"s9_{drone_id}" if drone_id else None - if not drone_id: + system_id = data.get('system_id') + if not system_id: return + drone_id = f"s{self.socket_id}_{system_id}" # 模式 if 'mode' in data: @@ -371,6 +381,7 @@ class WebSocketMavlinkReceiver(threading.Thread): self.running = False class DroneMonitor(Node): + # Subscribe to drone ROS2 topics def __init__(self): super().__init__('drone_monitor') self.signals = DroneSignals() @@ -396,111 +407,112 @@ class DroneMonitor(Node): # ================================================================================ self.drone_gps = {} # {drone_id: {'lat': ..., 'lon': ..., 'alt': ...}} # ================================================================================ + + # ================================================================================ + # 【新增】Socket ID 重新分配機制 (從 0 開始) + # ================================================================================ + self.socket_id_mapping = {} # {原始socket_id: 重新分配的socket_id} + self.socket_id_counter = 0 # 當前分配到的最大socket_id + self.socket_id_lock = Lock() # 線程安全鎖 + # ================================================================================ + + # ================================================================================ + # 【新增】儲存 sys_id 到 actual_drone_id 的映射 (從 summary 獲取) + # ================================================================================ + self.sys_to_actual_id = {} # {sys_id: actual_drone_id} e.g. {'sys11': 's0_11'} + self.sys_to_socket_id = {} # {sys_id: assigned_socket_id} e.g. {'sys11': 0} + # ================================================================================ self.serial_receivers = [] # 主题检测定时器 self.create_timer(1.0, self.scan_topics) + + def get_or_assign_socket_id(self, original_socket_id): + """根據原始 socket_id 分配或獲取對應的 socket_id(從 0 開始連續分配) + 同一個原始 socket_id 會得到同一個分配的 ID + """ + original_socket_id = str(original_socket_id) + + with self.socket_id_lock: + if original_socket_id not in self.socket_id_mapping: + # 分配新的 socket_id + self.socket_id_mapping[original_socket_id] = self.socket_id_counter + print(f"🆕 Socket ID 映射: 原始 {original_socket_id} -> 分配 {self.socket_id_counter}") + self.socket_id_counter += 1 + + return self.socket_id_mapping[original_socket_id] def scan_topics(self): topics = self.get_topic_names_and_types() - drone_pattern = re.compile(r'/MavLinkBus/(s\d+_\d+)/(\w+)') + drone_pattern = re.compile(r'/fc_network/vehicle/(sys\d+)/(\w+)') found_drones = set() for topic_name, _ in topics: if match := drone_pattern.match(topic_name): - drone_id, topic_type = match.groups() - found_drones.add(drone_id) + sys_id, topic_type = match.groups() + found_drones.add(sys_id) with self.lock: - self.drone_topics.setdefault(drone_id, set()).add(topic_type) + self.drone_topics.setdefault(sys_id, set()).add(topic_type) - for drone_id in found_drones: - if not hasattr(self, f'drone_{drone_id}_subs'): - self.setup_drone(drone_id) + for sys_id in found_drones: + # 為每個 sys_id 分配 socket_id(如果還沒有分配) + # 注意:如果後續 summary 提供了 socket_id,會使用 summary 的映射覆蓋 + if sys_id not in self.sys_to_socket_id: + # 暫時所有 ROS2 topic 共享同一個原始 socket_id,等 summary 提供實際的 socket_id + self.sys_to_socket_id[sys_id] = self.get_or_assign_socket_id('0') + + if not hasattr(self, f'drone_{sys_id}_subs'): + self.setup_drone(sys_id) - def setup_drone(self, drone_id): - base_topic = f'/MavLinkBus/{drone_id}' + def setup_drone(self, sys_id): + # sys_id 格式: sys11, sys12, ... + base_topic = f'/fc_network/vehicle/{sys_id}' - # Add service clients - self.arm_clients[drone_id] = self.create_client( + # Add service clients (保留但可能無法使用,因為新 topic 可能沒有這些服務) + self.arm_clients[sys_id] = self.create_client( CommandBool, f'{base_topic}/cmd/arming' ) - self.takeoff_clients[drone_id] = self.create_client( + self.takeoff_clients[sys_id] = self.create_client( CommandTOL, f'{base_topic}/cmd/takeoff' ) # Add setpoint publisher - self.setpoint_pubs[drone_id] = self.create_publisher( + self.setpoint_pubs[sys_id] = self.create_publisher( Point, f'{base_topic}/setpoint_position/local', 10 ) subs = { - 'attitude': self.create_subscription( - Imu, - f'{base_topic}/attitude', - lambda msg, did=drone_id: self.attitude_callback(did, msg), - 10 - ), 'battery': self.create_subscription( BatteryState, f'{base_topic}/battery', - lambda msg, did=drone_id: self.battery_callback(did, msg), + lambda msg, sid=sys_id: self.battery_callback(sid, msg), 10 ), - 'global': self.create_subscription( + 'position': self.create_subscription( NavSatFix, - f'{base_topic}/global_position/global', - lambda msg, did=drone_id: self.gps_callback(did, msg), - 10 - ), - 'rel_alt': self.create_subscription( - Float64, - f'{base_topic}/global_position/rel_alt', - lambda msg, did=drone_id: self.altitude_callback(did, msg), - 10 - ), - 'local_pose': self.create_subscription( - Point, - f'{base_topic}/local_position/pose', - lambda msg, did=drone_id: self.local_pose_callback(did, msg), - 10 - ), - 'local_vel': self.create_subscription( - Vector3, - f'{base_topic}/local_position/velocity', - lambda msg, did=drone_id: self.local_vel_callback(did, msg), - 10 - ), - 'loss_rate': self.create_subscription( - Float64, - f'{base_topic}/packet_loss_rate', - lambda msg, did=drone_id: self.loss_rate_callback(did, msg), - 10 - ), - 'state': self.create_subscription( - State, - f'{base_topic}/state', - lambda msg, did=drone_id: self.state_callback(did, msg), + f'{base_topic}/position', + lambda msg, sid=sys_id: self.gps_callback(sid, msg), 10 ), - 'ping': self.create_subscription( - Float64, - f'{base_topic}/ping', - lambda msg, did=drone_id: self.ping_callback(did, msg), + 'summary': self.create_subscription( + String, + f'{base_topic}/summary', + lambda msg, sid=sys_id: self.summary_callback(sid, msg), 10 ), 'vfr_hud': self.create_subscription( VfrHud, f'{base_topic}/vfr_hud', - lambda msg, did=drone_id: self.hud_callback(did, msg), + lambda msg, sid=sys_id: self.hud_callback(sid, msg), 10 ) } - setattr(self, f'drone_{drone_id}_subs', subs) + setattr(self, f'drone_{sys_id}_subs', subs) async def arm_drone(self, drone_id, arm): if drone_id not in self.arm_clients: @@ -582,8 +594,14 @@ class DroneMonitor(Node): msg.angular_velocity.z) } - def battery_callback(self, drone_id, msg): - self.latest_data[(drone_id, 'battery')] = { + def battery_callback(self, sys_id, msg): + # 使用映射獲取實際的 drone_id + actual_drone_id = self.sys_to_actual_id.get(sys_id, None) + # 如果還沒有從 summary 獲取到映射,則不處理 + if actual_drone_id is None: + return + + self.latest_data[(actual_drone_id, 'battery')] = { 'voltage': msg.voltage } @@ -596,8 +614,62 @@ class DroneMonitor(Node): 'armed': msg.armed } - def gps_callback(self, drone_id, msg): - self.latest_data[(drone_id, 'gps')] = { + def summary_callback(self, sys_id, msg): + """處理 summary topic (JSON 格式)""" + try: + data = json.loads(msg.data) + mode = data.get('mode_name', 'UNKNOWN') + if mode in self.filtered_modes: + return + + # 從 summary 獲取原始 socket_id,並映射到分配的 socket_id + original_socket_id = data.get('socket_id') + if original_socket_id is not None: + # 使用原始 socket_id 獲取或分配統一的 socket_id + assigned_socket_id = self.get_or_assign_socket_id(original_socket_id) + else: + # 如果沒有 socket_id,使用 sys_to_socket_id 映射 + assigned_socket_id = self.sys_to_socket_id.get(sys_id, 0) + + sysid = data.get('sysid') + if sysid is not None: + actual_drone_id = f's{assigned_socket_id}_{sysid}' + + # ================================================================================ + # 【關鍵】保存 sys_id 到 actual_drone_id 的映射 + # ================================================================================ + self.sys_to_actual_id[sys_id] = actual_drone_id + # ================================================================================ + else: + # 如果沒有 sysid,使用 sys_id 中的數字 + sys_num = sys_id.replace('sys', '') + actual_drone_id = f's{assigned_socket_id}_{sys_num}' + self.sys_to_actual_id[sys_id] = actual_drone_id + + self.latest_data[(actual_drone_id, 'state')] = { + 'mode': mode, + 'armed': data.get('armed', False), + 'socket_id': original_socket_id, + 'sysid': sysid, + 'vehicle_type': data.get('vehicle_type'), + 'autopilot': data.get('autopilot'), + 'gps_fix': data.get('gps_fix'), + 'gps_fix_type': data.get('gps_fix'), + 'connected': data.get('connected') + } + except json.JSONDecodeError as e: + print(f"Error parsing summary JSON for {sys_id}: {e}") + except Exception as e: + print(f"Error in summary_callback for {sys_id}: {e}") + + def gps_callback(self, sys_id, msg): + # 使用映射獲取實際的 drone_id + actual_drone_id = self.sys_to_actual_id.get(sys_id, None) + # 如果還沒有從 summary 獲取到映射,則不處理 + if actual_drone_id is None: + return + + self.latest_data[(actual_drone_id, 'gps')] = { 'lat': msg.latitude, 'lon': msg.longitude, 'alt': msg.altitude @@ -606,7 +678,7 @@ class DroneMonitor(Node): # ================================================================================ # 【新增】儲存 GPS 資料到 drone_gps 字典 # ================================================================================ - self.drone_gps[drone_id] = { + self.drone_gps[actual_drone_id] = { 'lat': msg.latitude, 'lon': msg.longitude, 'alt': msg.altitude @@ -632,8 +704,14 @@ class DroneMonitor(Node): 'z': msg.z } - def hud_callback(self, drone_id, msg): - self.latest_data[(drone_id, 'hud')] = { + def hud_callback(self, sys_id, msg): + # 使用映射獲取實際的 drone_id + actual_drone_id = self.sys_to_actual_id.get(sys_id, None) + # 如果還沒有從 summary 獲取到映射,則不處理 + if actual_drone_id is None: + return + + self.latest_data[(actual_drone_id, 'hud')] = { 'airspeed': msg.airspeed, 'groundspeed': msg.groundspeed, 'heading': msg.heading, @@ -655,7 +733,7 @@ class DroneMonitor(Node): def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=57600): """啟動串口 MAVLink 連接""" connection_name = f"Serial_{port.replace('/', '_')}" - receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name) + receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name, self) receiver.start() self.serial_receivers.append(receiver) print(f"Started serial connection on {port} at {baudrate} baud") diff --git a/src/GUI/gui.py b/src/GUI/gui.py index 94c6889..d73b08d 100644 --- a/src/GUI/gui.py +++ b/src/GUI/gui.py @@ -13,6 +13,7 @@ from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkRece from map_layout import DroneMap from drone_panel import DronePanel, SocketGroupPanel from comm_panel import CommPanel +from overview_table import OverviewTable # ================================================================================ # 【新增】導入任務規劃器 @@ -43,23 +44,6 @@ class ControlStationUI(QMainWindow): # 初始化UI self.drones = {} self.socket_groups = {} - self.info_types = ["模式", "ARM", "電壓", "高度", "Local", "Velocity", "地速", "航向", - "Roll", "Pitch", "Yaw", "丟包", "延遲"] - self.info_type_map = { - "mode": 0, - "armed": 1, - "battery": 2, - "altitude": 3, - "local": 4, - "velocity": 5, - "groundspeed": 6, - "heading": 7, - "roll": 8, - "pitch": 9, - "yaw": 10, - "loss_rate": 11, - "ping": 12 - } self.socket_colors = { '0': '#00BFFF', # 天藍色 (DeepSkyBlue) @@ -83,7 +67,8 @@ class ControlStationUI(QMainWindow): self.udp_receivers = [] self.udp_connections = [] self.ws_connections = [] - self.monitor.start_serial_connection('/dev/ttyUSB0', 57600) + self.serial_receivers = [] + self.serial_connections = [] # ================================================================================ # 【新增】初始化任務規劃器 @@ -119,29 +104,20 @@ class ControlStationUI(QMainWindow): self.left_tab.addTab(scroll, "無人載具") # — 分頁 2:Overview Table - self.overview_table = QTableWidget() - self.overview_table.setColumnCount(1) - self.overview_table.setRowCount(len(self.info_types)) - self.overview_table.setHorizontalHeaderLabels(["資訊"]) - header = self.overview_table.horizontalHeader() - header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents) - self.overview_table.verticalHeader().setVisible(False) - for i, txt in enumerate(self.info_types): - item = QTableWidgetItem(txt) - item.setFlags(Qt.ItemFlag.ItemIsEnabled) - item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) - self.overview_table.setItem(i, 0, item) - + self.overview_table = OverviewTable() self.left_tab.addTab(self.overview_table, "總覽") # — 分頁 3:通訊設定 self.comm_panel = CommPanel() self.comm_panel.udp_connection_added.connect(self.handle_udp_connection_added) self.comm_panel.ws_connection_added.connect(self.handle_ws_connection_added) + self.comm_panel.serial_connection_added.connect(self.handle_serial_connection_added) self.comm_panel.udp_connection_toggled.connect(self.toggle_udp_connection) self.comm_panel.ws_connection_toggled.connect(self.toggle_ws_connection) + self.comm_panel.serial_connection_toggled.connect(self.toggle_serial_connection) self.comm_panel.udp_connection_removed.connect(self.remove_udp_connection) self.comm_panel.ws_connection_removed.connect(self.remove_ws_connection) + self.comm_panel.serial_connection_removed.connect(self.remove_serial_connection) self.comm_panel.status_message.connect(lambda msg, timeout: self.statusBar().showMessage(msg, timeout)) self.left_tab.addTab(self.comm_panel, "通訊") @@ -278,6 +254,7 @@ class ControlStationUI(QMainWindow): # 添加地圖 right_layout.addWidget(self.drone_map.get_widget()) self.drone_map.get_gps_signal().connect(self.handle_map_click) + self.drone_map.get_drone_clicked_signal().connect(self.handle_drone_clicked) # Add widgets to splitter @@ -299,7 +276,7 @@ class ControlStationUI(QMainWindow): } # 启动接收器 - receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name']) + receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name'], self.monitor) receiver.start() self.udp_receivers.append(receiver) new_conn['receiver'] = receiver @@ -322,7 +299,7 @@ class ControlStationUI(QMainWindow): } # 启动接收器 - receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name']) + receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name'], self.monitor) receiver.start() self.monitor.ws_receivers.append(receiver) new_conn['receiver'] = receiver @@ -436,6 +413,69 @@ class ControlStationUI(QMainWindow): self.statusBar().showMessage(f"已移除 UDP 连接: {conn['ip']}:{conn['port']}", 3000) + def handle_serial_connection_added(self, port, baudrate): + """處理添加 Serial 連接""" + conn = { + 'name': f'Serial', + 'port': port, + 'baudrate': baudrate, + 'enabled': False, + 'receiver': None + } + + # 添加到列表 + self.serial_connections.append(conn) + + # 在 UI 中顯示 + panel = self.comm_panel.add_serial_panel(conn) + + self.statusBar().showMessage(f"已添加 Serial 连接: {port} @ {baudrate}", 3000) + + def toggle_serial_connection(self, conn, btn, status_label): + """切換 Serial 連接狀態""" + if conn.get('enabled', False): + # 停止連接 + if conn.get('receiver'): + conn['receiver'].stop() + conn['receiver'] = None + + conn['enabled'] = False + btn.setText("啟動") + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + self.statusBar().showMessage(f"已停止 Serial 連接: {conn['port']}", 3000) + else: + # 啟動連接 + try: + receiver = self.monitor.start_serial_connection(conn['port'], conn['baudrate']) + conn['receiver'] = receiver + conn['enabled'] = True + btn.setText("停止") + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + self.statusBar().showMessage(f"已啟動 Serial 連接: {conn['port']}", 3000) + except Exception as e: + self.statusBar().showMessage(f"啟動 Serial 連接失敗: {str(e)}", 5000) + + def remove_serial_connection(self, conn, panel): + """移除 Serial 連接""" + # 停止連接 + if conn.get('enabled', False) and conn.get('receiver'): + conn['receiver'].stop() + + # 从列表移除 + if conn in self.serial_connections: + self.serial_connections.remove(conn) + + # 从 comm_panel 列表移除 + self.comm_panel.remove_serial_connection_from_list(conn) + + # 从 UI 移除 + panel.setParent(None) + panel.deleteLater() + + self.statusBar().showMessage(f"已移除 Serial 连接: {conn['port']}", 3000) + def create_socket_group_panel(self, socket_id): """創建 socket 分組面板""" color = self.socket_colors.get(socket_id, self.socket_colors['default']) @@ -540,7 +580,10 @@ class ControlStationUI(QMainWindow): cells = round(voltage / 3.95) # 計算電量百分比 - percentage = (voltage / cells - 3.7) / 0.5 * 100 + if cells == 0: + percentage = 0 + else: + percentage = (voltage / cells - 3.7) / 0.5 * 100 # 根據百分比設置顏色 if percentage < 20: @@ -562,9 +605,12 @@ class ControlStationUI(QMainWindow): elif msg_type == 'gps': lat, lon = data.get('lat', 0), data.get('lon', 0) self.drone_positions[drone_id] = (lat, lon) - text = f"{lat:.6f}°, {lon:.6f}°" - self.update_field(panel, drone_id, 'gps', text) - self.update_overview_table(drone_id, 'gps', text) + lon_text = f"{lon:.6f}°" + lat_text = f"{lat:.6f}°" + self.update_field(panel, drone_id, 'longitude', lon_text) + self.update_field(panel, drone_id, 'latitude', lat_text) + self.update_overview_table(drone_id, 'longitude', lon_text) + self.update_overview_table(drone_id, 'latitude', lat_text) # ================================================================================ # 【新增】同時儲存到 monitor.drone_gps(處理 UDP/WebSocket 的 GPS 資料) @@ -578,6 +624,10 @@ class ControlStationUI(QMainWindow): 'alt': alt } # ================================================================================ + + # 更新地圖上的無人機位置 + heading = self.drone_headings.get(drone_id, 0) # 如果沒有航向,使用 0 + self.drone_map.update_drone_position(drone_id, lat, lon, heading) elif msg_type == 'altitude': altitude = data.get('altitude', 0) @@ -594,15 +644,50 @@ class ControlStationUI(QMainWindow): heading = data.get('heading') self.drone_headings[drone_id] = heading groundspeed = data.get('groundspeed') + airspeed = data.get('airspeed') + throttle = data.get('throttle') + hud_alt = data.get('alt') + climb = data.get('climb') + heading_text = f"{heading:.1f}°" if isinstance(groundspeed, (int, float)): groundspeed_text = f"{groundspeed:.1f} m/s" else: groundspeed_text = "--" + + if isinstance(airspeed, (int, float)): + airspeed_text = f"{airspeed:.1f} m/s" + else: + airspeed_text = "--" + + if isinstance(throttle, (int, float)): + throttle_text = f"{throttle:.0f}%" + else: + throttle_text = "--" + + if isinstance(hud_alt, (int, float)): + hud_alt_text = f"{hud_alt:.1f} m" + else: + hud_alt_text = "--" + + if isinstance(climb, (int, float)): + climb_text = f"{climb:.1f} m/s" + else: + climb_text = "--" + self.update_field(panel, drone_id, 'heading', heading_text) self.update_field(panel, drone_id, 'groundspeed', groundspeed_text) self.update_overview_table(drone_id, 'heading', heading_text) self.update_overview_table(drone_id, 'groundspeed', groundspeed_text) + self.update_overview_table(drone_id, 'airspeed', airspeed_text) + self.update_overview_table(drone_id, 'throttle', throttle_text) + self.update_overview_table(drone_id, 'hud_alt', hud_alt_text) + self.update_overview_table(drone_id, 'climb', climb_text) + + # 如果有位置資訊,也更新地圖上的航向 + if drone_id in self.drone_positions: + lat, lon = self.drone_positions[drone_id] + self.drone_map.update_drone_position(drone_id, lat, lon, heading) elif msg_type == 'loss_rate': loss_rate = data.get('loss_rate', 0) @@ -631,12 +716,6 @@ class ControlStationUI(QMainWindow): self.update_overview_table(drone_id, 'pitch', pitch_text) self.update_overview_table(drone_id, 'yaw', yaw_text) - # 更新地圖上的無人機位置 - if drone_id in self.drone_positions and drone_id in self.drone_headings: - lat, lon = self.drone_positions[drone_id] - heading = self.drone_headings[drone_id] - self.drone_map.update_drone_position(drone_id, lat, lon, heading) - # 新增處理分組勾選的方法 def handle_group_selection(self, socket_id, state): """處理 socket 分組勾選狀態變化""" @@ -756,47 +835,23 @@ class ControlStationUI(QMainWindow): panel.update_field(field, text, color) def update_overview_table(self, drone_id=None, field=None, value=None): - # Ensure the widget is available + """更新總覽表格""" if not hasattr(self, 'overview_table') or self.overview_table is None: return - - # Update a specific cell in the overview table - if drone_id and field and value: - col = 1 + list(self.drones.keys()).index(drone_id) - row = self.info_type_map.get(field, -1) # Get row from field mapping - - if row == -1: - return # Invalid field, exit - - item = self.overview_table.item(row, col) - if not item: - item = QTableWidgetItem() - self.overview_table.setItem(row, col, item) - - item.setText(value) # Update the cell's text - item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) # Center the text - - # If no specific update, refresh entire table - if drone_id is None: - cols = 1 + len(self.drones) - self.overview_table.setColumnCount(cols) - headers = ["資訊"] + list(self.drones.keys()) - self.overview_table.setHorizontalHeaderLabels(headers) - - for col, did in enumerate(self.drones, start=1): - panel = self.drones[did] - for field, row in self.info_type_map.items(): - lbl = panel.findChild(QLabel, f"{did}_{field}") - val = lbl.text() if lbl else "--" - val_item = QTableWidgetItem(val) - val_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) - self.overview_table.setItem(row, col, val_item) + + # 更新無人機引用 + self.overview_table.set_drones(self.drones) + # 委託給 OverviewTable 處理 + self.overview_table.update_table(drone_id, field, value) def get_socket_id(self, drone_id): - """從 drone_id 提取 socket_id (例如 's9_1' -> '9')""" + """從 drone_id 提取 socket_id (例如 's0_1' -> '0')""" import re - match = re.match(r's(\d+)_\d+', drone_id) - return match.group(1) if match else 'unknown' + match = re.match(r's(\d+)_(\d+)', drone_id) + if not match: + return 'unknown' + + return match.group(1) def add_drone(self, drone_id): if drone_id in self.drones: @@ -978,6 +1033,21 @@ class ControlStationUI(QMainWindow): import traceback traceback.print_exc() + def handle_drone_clicked(self, drone_id): + """ + 處理地圖上無人機被點擊事件,切換該無人機的選取狀態 + + Args: + drone_id: 無人機 ID + """ + print(f"地圖上點擊無人機: {drone_id}") + + if drone_id in self.drones: + panel = self.drones[drone_id] + checkbox = panel.get_checkbox() + # 切換勾選狀態 + checkbox.setChecked(not checkbox.isChecked()) + def show_planned_waypoints(self): """ 顯示規劃的航點(在終端輸出) diff --git a/src/GUI/map_layout.py b/src/GUI/map_layout.py index cfb3ba0..c77c1fb 100644 --- a/src/GUI/map_layout.py +++ b/src/GUI/map_layout.py @@ -32,24 +32,120 @@ class DroneMap: @@ -58,6 +154,25 @@ class DroneMap:
+
+ +
+
+
+ + + +
+
+ 中心點: + 未設定 +
+
+ 目標點: + 未設定 +
+ +
- - - - -
- - - - ''' - self.map_view.setHtml(inline_html) - self.map_view.loadFinished.connect(self.on_map_loaded) - - main_splitter.addWidget(left_panel) - main_splitter.addWidget(self.map_view) - main_splitter.setSizes([400, 1000]) - - self.setCentralWidget(main_splitter) + QPushButton:hover { background-color: #555; } + """) + upper_buttons.addWidget(btn) + upper_buttons.addStretch() - def on_map_loaded(self, ok: bool): - if ok: - self.map_loaded = True - else: - print("⚠️ 地图页加载失败") + # 模式切換區域 + mode_layout = QHBoxLayout() + mode_label = QLabel("模式:") + mode_label.setStyleSheet("color: #DDD; min-width: 40px;") - def create_drone_panel(self, drone_id): - panel = QWidget() - panel.setObjectName(f"panel_{drone_id}") - panel.setStyleSheet(""" - QWidget#panel_%s { - background-color: #2A2A2A; - border-radius: 8px; - margin: 6px; - padding: 10px; - border: 1px solid #444; + from PyQt6.QtWidgets import QComboBox + self.mode_combo = QComboBox() + self.mode_combo.addItems(["AUTO", "GUIDED", "LOITER", "LAND"]) + self.mode_combo.setCurrentIndex(1) + self.mode_combo.setStyleSheet(""" + QComboBox { background-color: #333; color: #DDD; border-radius: 3px; padding: 2px 10px;} + """) + + batch_mode_btn = QPushButton("切換") + batch_mode_btn.clicked.connect(self.handle_batch_mode_change) + batch_mode_btn.setStyleSheet(""" + QPushButton { + background-color: #444; + color: #DDD; + border: none; + padding: 8px 12px; + border-radius: 4px; + min-width: 80px; } - QLabel { + QPushButton:hover { background-color: #555; } + """) + mode_layout.addWidget(mode_label) + mode_layout.addWidget(self.mode_combo) + mode_layout.addWidget(batch_mode_btn) + mode_layout.addStretch() + + # 座標輸入區域 + coord_widget = QWidget() + coord_layout = QHBoxLayout(coord_widget) + + self.x_input = QLineEdit() + self.y_input = QLineEdit() + self.z_input = QLineEdit() + + for input_field in [self.x_input, self.y_input, self.z_input]: + input_field.setFixedWidth(60) + input_field.setStyleSheet(""" + QLineEdit { + background-color: #333; + color: #DDD; + border: 1px solid #555; + border-radius: 4px; + padding: 3px; + } + """) + + coord_layout.addWidget(QLabel("X:", styleSheet="color: #DDD;")) + coord_layout.addWidget(self.x_input) + coord_layout.addWidget(QLabel("Y:", styleSheet="color: #DDD;")) + coord_layout.addWidget(self.y_input) + coord_layout.addWidget(QLabel("Z:", styleSheet="color: #DDD;")) + coord_layout.addWidget(self.z_input) + + setpoint_btn = QPushButton("Setpoint") + setpoint_btn.clicked.connect(self.handle_setpoint_selected) + setpoint_btn.setStyleSheet(""" + QPushButton { + background-color: #444; color: #DDD; - font-size: 12px; - padding: 2px; + border: none; + padding: 8px 12px; + border-radius: 4px; + min-width: 80px; } - """ % drone_id) - - layout = QVBoxLayout(panel) - layout.setContentsMargins(8, 8, 8, 8) - layout.setSpacing(4) - - # 顶部标题栏 - header = QWidget() - header_layout = QHBoxLayout(header) - header_layout.setContentsMargins(0, 0, 0, 0) - - # ID显示 - id_label = QLabel(drone_id) - id_label.setStyleSheet(""" - font-weight: bold; - font-size: 14px; - color: #7FFFD4; - min-width: 80px; + QPushButton:hover { background-color: #555; } """) + + lower_control = QHBoxLayout() + lower_control.addWidget(coord_widget) + lower_control.addWidget(setpoint_btn) + lower_control.addStretch() + + # 組裝批次控制 layout + batch_control_layout.addLayout(upper_buttons) + batch_control_layout.addLayout(mode_layout) + batch_control_layout.addLayout(lower_control) + + # 將批次控制 layout 添加到右側容器 + right_layout.addLayout(batch_control_layout) - # 状态指示灯 - status_light = QLabel("●") - status_light.setStyleSheet("color: #00FF00; font-size: 16px;") + # 添加地圖 + right_layout.addWidget(self.drone_map.get_widget()) + self.drone_map.get_gps_signal().connect(self.handle_map_click) + + + # Add widgets to splitter + main_splitter.addWidget(self.left_tab) + main_splitter.addWidget(right_container) + main_splitter.setSizes([400, 1000]) - header_layout.addWidget(id_label) - header_layout.addWidget(status_light) - header_layout.addStretch() + self.setCentralWidget(main_splitter) + + def handle_map_click(self, lat, lon): + print(f"地圖點擊位置: {lat:.6f}, {lon:.6f}") + + def handle_udp_connection_added(self, ip, port): + """处理 UDP 连接添加信号""" + # 创建新连接 + new_conn = { + 'name': f'UDP {len(self.udp_connections) + 1}', + 'ip': ip, + 'port': port, + 'enabled': True + } - layout.addWidget(header) + # 启动接收器 + receiver = UDPMavlinkReceiver(ip, port, self.monitor.signals, new_conn['name']) + receiver.start() + self.udp_receivers.append(receiver) + new_conn['receiver'] = receiver - # 数据字段(带标签) - self.create_data_row(layout, "模式:", f"{drone_id}_mode", "--") - self.create_data_row(layout, "電壓:", f"{drone_id}_battery", "--") - self.create_data_row(layout, "GPS:", f"{drone_id}_gps", "--") - self.create_data_row(layout, "高度:", f"{drone_id}_altitude", "--") - self.create_data_row(layout, "Local:", f"{drone_id}_local", "--") - self.create_data_row(layout, "HUD:", f"{drone_id}_hud", "--") + self.udp_connections.append(new_conn) - return panel + # 添加到 UI + self.comm_panel.add_udp_panel(new_conn) + + self.statusBar().showMessage(f"已添加 UDP 连接: {ip}:{port}", 3000) + print(f"UDP MAVLink receiver added and started on {ip}:{port}") - def create_data_row(self, layout, title, object_name, default): - row = QWidget() - hbox = QHBoxLayout(row) - hbox.setContentsMargins(0, 0, 0, 0) + def handle_ws_connection_added(self, url): + """处理 WebSocket 连接添加信号""" + # 创建新连接 + new_conn = { + 'name': f'WS {len(self.ws_connections) + 1}', + 'url': url, + 'enabled': True + } - # 标题标签 - title_label = QLabel(title) - title_label.setStyleSheet("color: #888; min-width: 80px;") - title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred) + # 启动接收器 + receiver = WebSocketMavlinkReceiver(url, self.monitor.signals, new_conn['name']) + receiver.start() + self.monitor.ws_receivers.append(receiver) + new_conn['receiver'] = receiver + + self.ws_connections.append(new_conn) + + # 添加到 UI + self.comm_panel.add_ws_panel(new_conn) + + self.statusBar().showMessage(f"已添加 WebSocket 连接: {url}", 3000) + print(f"WebSocket receiver added and started: {url}") + + def create_drone_panel(self, drone_id): + """創建無人機面板""" + panel = DronePanel(drone_id) - # 数据标签 - value_label = QLabel(default) - value_label.setObjectName(object_name) - value_label.setWordWrap(True) - value_label.setStyleSheet("margin-left: 10px;") + # 連接信號 + panel.mode_change_requested.connect(self.handle_mode_change) + panel.arm_requested.connect(self.handle_arm) + panel.takeoff_requested.connect(self.handle_takeoff) + panel.setpoint_requested.connect(self.handle_single_setpoint) + panel.selection_changed.connect(self.handle_drone_selection) - hbox.addWidget(title_label) - hbox.addWidget(value_label) - layout.addWidget(row) + return panel + + def toggle_ws_connection(self, conn, btn, status_label): + """切換 WebSocket 連接狀態""" + if conn.get('enabled', False): + # 停止連接 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + conn['enabled'] = False + btn.setText("啟動") + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + self.statusBar().showMessage(f"已停止 WebSocket 連接: {conn['url']}", 3000) + else: + # 啟動連接 + receiver = WebSocketMavlinkReceiver(conn['url'], self.monitor.signals, conn['name']) + receiver.start() + self.monitor.ws_receivers.append(receiver) + conn['receiver'] = receiver + conn['enabled'] = True + btn.setText("停止") + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + self.statusBar().showMessage(f"已啟動 WebSocket 連接: {conn['url']}", 3000) + + def remove_ws_connection(self, conn, panel): + """移除 WebSocket 连接""" + # 停止接收器 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + if conn['receiver'] in self.monitor.ws_receivers: + self.monitor.ws_receivers.remove(conn['receiver']) + + # 从列表移除 + if conn in self.ws_connections: + self.ws_connections.remove(conn) + + # 从 comm_panel 列表移除 + self.comm_panel.remove_ws_connection_from_list(conn) + + # 从 UI 移除 + panel.setParent(None) + panel.deleteLater() + + self.statusBar().showMessage(f"已移除 WebSocket 连接: {conn['url']}", 3000) + + def toggle_udp_connection(self, conn, btn, status_label): + """切換 UDP 連接狀態""" + if conn.get('enabled', False): + # 停止連接 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + conn['enabled'] = False + btn.setText("啟動") + status_label.setStyleSheet("color: #888; font-size: 16px;") + status_label.setToolTip("已停止") + self.statusBar().showMessage(f"已停止 UDP 連接: {conn['ip']}:{conn['port']}", 3000) + else: + # 啟動連接 + receiver = UDPMavlinkReceiver(conn['ip'], conn['port'], self.monitor.signals, conn['name']) + receiver.start() + self.udp_receivers.append(receiver) + conn['receiver'] = receiver + conn['enabled'] = True + btn.setText("停止") + status_label.setStyleSheet("color: #4CAF50; font-size: 16px;") + status_label.setToolTip("運行中") + self.statusBar().showMessage(f"已啟動 UDP 連接: {conn['ip']}:{conn['port']}", 3000) + + def remove_udp_connection(self, conn, panel): + """移除 UDP 连接""" + # 停止接收器 + if 'receiver' in conn and conn['receiver']: + conn['receiver'].stop() + if conn['receiver'] in self.udp_receivers: + self.udp_receivers.remove(conn['receiver']) + + # 从列表移除 + if conn in self.udp_connections: + self.udp_connections.remove(conn) + + # 从 comm_panel 列表移除 + self.comm_panel.remove_udp_connection_from_list(conn) + + # 从 UI 移除 + panel.setParent(None) + panel.deleteLater() + + self.statusBar().showMessage(f"已移除 UDP 连接: {conn['ip']}:{conn['port']}", 3000) + + def create_socket_group_panel(self, socket_id): + """創建 socket 分組面板""" + color = self.socket_colors.get(socket_id, self.socket_colors['default']) + panel = SocketGroupPanel(socket_id, color) + panel.group_selection_changed.connect(self.handle_group_selection) + return panel + + def handle_mode_change(self, drone_id): + """處理單個無人機的模式切換""" + mode = self.mode_combo.currentText() + loop = asyncio.get_event_loop() + future = self.monitor.set_mode(drone_id, mode) + loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}")) + + def handle_arm(self, drone_id): + loop = asyncio.get_event_loop() + arm_state = not self.monitor.get_arm_state(drone_id) # Toggle arm state + future = self.monitor.arm_drone(drone_id, arm_state) + loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}")) + + def handle_takeoff(self, drone_id): + loop = asyncio.get_event_loop() + future = self.monitor.takeoff_drone(drone_id, 10.0) # 10 meters default + loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}")) + + def handle_setpoint_selected(self): + """處理發送 setpoint 命令""" + try: + x = float(self.x_input.text() or '0') + y = float(self.y_input.text() or '0') + z = float(self.z_input.text() or '0') + + for drone_id in self.monitor.selected_drones: + if self.monitor.send_setpoint(drone_id, x, y, z): + self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) + else: + self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) + except ValueError: + self.statusBar().showMessage("座標格式錯誤", 3000) + + def handle_single_setpoint(self, drone_id): + """處理單個無人機的 setpoint 命令""" + try: + x = float(self.x_input.text() or '0') + y = float(self.y_input.text() or '0') + z = float(self.z_input.text() or '0') + + if self.monitor.send_setpoint(drone_id, x, y, z): + self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000) + else: + self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000) + except ValueError: + self.statusBar().showMessage("座標格式錯誤", 3000) + + async def handle_service_response(self, future, action): + try: + result = await future + if result: + self.statusBar().showMessage(f"{action} 成功", 3000) + else: + self.statusBar().showMessage(f"{action} 失敗", 3000) + except Exception as e: + self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000) def update_ui(self, msg_type, drone_id, data): - if msg_type == 'new_drone': + # 檢查是否為新無人機,若是則加入無人機面板 + if drone_id not in self.drones: self.add_drone(drone_id) return - + + # 確認無人機面板存在 if not (panel := self.drones.get(drone_id)): return - - if msg_type == 'mode': - self.update_field(panel, drone_id, 'mode', f"{data}", - '#FF5555' if '返航' in data else '#55FF55') - + + # 更新特定欄位並在總覽頁面更新 + if msg_type == 'state': + mode = data.get('mode', 'UNKNOWN') + armed = data.get('armed', None) + mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55' + if armed is True: + arm_text = "ARMED" + arm_color = '#55FF55' + elif armed is False: + arm_text = "DISARMED" + arm_color = '#FF5555' + else: + arm_text = "--" + arm_color = '#AAAAAA' # 未知狀態 + + # 更新無人機面板欄位 + self.update_field(panel, drone_id, 'mode', mode, mode_color) + self.update_field(panel, drone_id, 'armed', arm_text, arm_color) + + # 更新總覽頁面欄位 + self.update_overview_table(drone_id, 'mode', mode) + self.update_overview_table(drone_id, 'armed', arm_text) + elif msg_type == 'battery': - voltage = data.get('voltage', 0) - self.update_field(panel, drone_id, 'battery', - f"{voltage:.1f} V", - '#FF6464' if voltage < 12 else '#FFFFFF') - + voltage = data.get('voltage', 16) + + # 判斷電池節數 + cells = round(voltage / 3.95) + + # 計算電量百分比 + percentage = (voltage / cells - 3.7) / 0.5 * 100 + + # 根據百分比設置顏色 + if percentage < 20: + voltage_color = '#FF6464' # 紅色 (低電量) + elif percentage < 50: + voltage_color = '#FFA500' # 橘色 (中低電量) + else: + voltage_color = '#FFFFFF' # 白色 (正常電量) + + percentage = data.get('percentage', percentage) + + # 顯示總電壓、電池節數、單節電壓和百分比 + text = f"{percentage:.0f}%" + vol = f"{voltage:.2f}V" + + self.update_field(panel, drone_id, 'battery', text, voltage_color) + self.update_overview_table(drone_id, 'battery', vol) + elif msg_type == 'gps': - lat, lon = data['lat'], data['lon'] + lat, lon = data.get('lat', 0), data.get('lon', 0) self.drone_positions[drone_id] = (lat, lon) - text = (f"緯度: {lat:.6f}°\n" - f"經度: {lon:.6f}°") + text = f"{lat:.6f}°, {lon:.6f}°" self.update_field(panel, drone_id, 'gps', text) + self.update_overview_table(drone_id, 'gps', text) elif msg_type == 'altitude': - text = (f"{data['altitude']:.1f} m") + altitude = data.get('altitude', 0) + text = f"{altitude:.1f} m" self.update_field(panel, drone_id, 'altitude', text) - + self.update_overview_table(drone_id, 'altitude', text) + elif msg_type == 'local_pose': - text = (f"({data['x']:.1f}, {data['y']:.1f}, {data['z']:.1f})") + text = f"{data['x']:.1f}, {data['y']:.1f}" self.update_field(panel, drone_id, 'local', text) - + self.update_overview_table(drone_id, 'local', text) + elif msg_type == 'hud': - heading = data['heading'] - text = (f"地速: {data['groundspeed']:.1f} m/s\n" - f"航向: {heading:.1f}°") - self.update_field(panel, drone_id, 'hud', text) - - if self.map_loaded and drone_id in self.drone_positions: - lat, lon = self.drone_positions[drone_id] - js = f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});" - self.map_view.page().runJavaScript(js) - ''' + heading = data.get('heading') + self.drone_headings[drone_id] = heading + groundspeed = data.get('groundspeed') + heading_text = f"{heading:.1f}°" + if isinstance(groundspeed, (int, float)): + groundspeed_text = f"{groundspeed:.1f} m/s" + else: + groundspeed_text = "--" + self.update_field(panel, drone_id, 'heading', heading_text) + self.update_field(panel, drone_id, 'groundspeed', groundspeed_text) + self.update_overview_table(drone_id, 'heading', heading_text) + self.update_overview_table(drone_id, 'groundspeed', groundspeed_text) + + elif msg_type == 'loss_rate': + loss_rate = data.get('loss_rate', 0) + text = f"{loss_rate:.1f}%" + self.update_field(panel, drone_id, 'loss_rate', text) + self.update_overview_table(drone_id, 'loss_rate', text) + + elif msg_type == 'ping': + ping = data.get('ping', 0) + text = f"{ping:.1f} ms" + self.update_field(panel, drone_id, 'ping', text) + self.update_overview_table(drone_id, 'ping', text) + elif msg_type == 'velocity': - text = (f"VX: {data['vx']:.1f} m/s\n" - f"VY: {data['vy']:.1f} m/s\n" - f"VZ: {data['vz']:.1f} m/s") - self.update_field(panel, drone_id, 'velocity', text) - + text = f"{data['vx']:.1f}, {data['vy']:.1f}" + self.update_overview_table(drone_id, 'velocity', text) + elif msg_type == 'attitude': - text = (f"Roll: {data['roll']:.1f}°\n" - f"Pitch: {data['pitch']:.1f}°\n" - f"Yaw: {data['yaw']:.1f}°") - self.update_field(panel, drone_id, 'attitude', text) - ''' + roll = data.get('roll', 0) + pitch = data.get('pitch', 0) + yaw = data.get('yaw', 0) + roll_text = f"{roll:.1f}°" + pitch_text = f"{pitch:.1f}°" + yaw_text = f"{yaw:.1f}°" + self.update_overview_table(drone_id, 'roll', roll_text) + self.update_overview_table(drone_id, 'pitch', pitch_text) + self.update_overview_table(drone_id, 'yaw', yaw_text) + + # 更新地圖上的無人機位置 + if drone_id in self.drone_positions and drone_id in self.drone_headings: + lat, lon = self.drone_positions[drone_id] + heading = self.drone_headings[drone_id] + self.drone_map.update_drone_position(drone_id, lat, lon, heading) + + # 新增處理分組勾選的方法 + def handle_group_selection(self, socket_id, state): + """處理 socket 分組勾選狀態變化""" + # 獲取該分組下的所有無人機 + group_drones = [did for did in self.drones.keys() + if self.get_socket_id(did) == socket_id] + + # 根據分組勾選狀態更新所有該分組的無人機勾選狀態 + is_checked = state == Qt.CheckState.Checked.value + + for drone_id in group_drones: + checkbox = self.drones[drone_id].get_checkbox() + if checkbox: + # 暫時斷開信號連接,避免遞迴觸發 + checkbox.blockSignals(True) + checkbox.setChecked(is_checked) + checkbox.blockSignals(False) + + # 手動更新選中集合 + if is_checked: + self.monitor.selected_drones.add(drone_id) + else: + self.monitor.selected_drones.discard(drone_id) + + def handle_drone_selection(self, drone_id, state): + """處理個別無人機勾選狀態變化""" + if state == Qt.CheckState.Checked.value: + self.monitor.selected_drones.add(drone_id) + else: + self.monitor.selected_drones.discard(drone_id) + + # 更新對應 socket 分組的勾選狀態 + socket_id = self.get_socket_id(drone_id) + self.update_group_checkbox_state(socket_id) + + # 新增更新分組勾選框狀態的方法 + def update_group_checkbox_state(self, socket_id): + """更新指定 socket 分組的勾選框狀態""" + # 獲取該分組下的所有無人機 + group_drones = [did for did in self.drones.keys() + if self.get_socket_id(did) == socket_id] + + if not group_drones: + return + + # 檢查該分組內有多少無人機被勾選 + checked_count = sum(1 for did in group_drones + if self.drones[did].is_checked()) + + # 獲取分組勾選框 + if socket_id in self.socket_groups: + group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox") + if group_checkbox: + # 暫時斷開信號連接 + group_checkbox.blockSignals(True) + + if checked_count == 0: + # 沒有無人機被勾選 + group_checkbox.setCheckState(Qt.CheckState.Unchecked) + elif checked_count == len(group_drones): + # 所有無人機都被勾選 + group_checkbox.setCheckState(Qt.CheckState.Checked) + else: + # 部分無人機被勾選,顯示半勾選狀態 + group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked) + + # 重新連接信號 + group_checkbox.blockSignals(False) + + def handle_select_all(self): + """處理全選按鈕 - 支援分組結構""" + # 檢查是否所有無人機都已被選中 + all_selected = all( + self.drones[drone_id].is_checked() + for drone_id in self.drones + ) + + # 如果全部已選中,則取消全選;否則全選 + new_state = not all_selected + + # 更新所有勾選框狀態(無人機和分組) + for drone_id in self.drones: + self.drones[drone_id].set_checked(new_state) + + # 更新所有分組勾選框狀態 + for socket_id in self.socket_groups: + group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox") + if group_checkbox: + group_checkbox.blockSignals(True) + group_checkbox.setChecked(new_state) + group_checkbox.blockSignals(False) + + def handle_arm_selected(self): + """處理批次解鎖""" + loop = asyncio.get_event_loop() + for drone_id in self.monitor.selected_drones: + future = self.monitor.arm_drone(drone_id, True) + loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}")) + + def handle_takeoff_selected(self): + """處理批次起飛""" + loop = asyncio.get_event_loop() + for drone_id in self.monitor.selected_drones: + future = self.monitor.takeoff_drone(drone_id, 10.0) + loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}")) + + def handle_batch_mode_change(self): + mode = self.mode_combo.currentText() + loop = asyncio.get_event_loop() + for drone_id in self.monitor.selected_drones: + future = self.monitor.set_mode(drone_id, mode) + loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}")) + def update_field(self, panel, drone_id, field, text, color=None): - if label := panel.findChild(QLabel, f"{drone_id}_{field}"): - label.setText(text) - if color: - label.setStyleSheet(f"color: {color};") + """更新面板中的指定欄位""" + if isinstance(panel, DronePanel): + panel.update_field(field, text, color) + + def update_overview_table(self, drone_id=None, field=None, value=None): + # Ensure the widget is available + if not hasattr(self, 'overview_table') or self.overview_table is None: + return + + # Update a specific cell in the overview table + if drone_id and field and value: + col = 1 + list(self.drones.keys()).index(drone_id) + row = self.info_type_map.get(field, -1) # Get row from field mapping + + if row == -1: + return # Invalid field, exit + + item = self.overview_table.item(row, col) + if not item: + item = QTableWidgetItem() + self.overview_table.setItem(row, col, item) + + item.setText(value) # Update the cell's text + item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) # Center the text + + # If no specific update, refresh entire table + if drone_id is None: + cols = 1 + len(self.drones) + self.overview_table.setColumnCount(cols) + headers = ["資訊"] + list(self.drones.keys()) + self.overview_table.setHorizontalHeaderLabels(headers) + + for col, did in enumerate(self.drones, start=1): + panel = self.drones[did] + for field, row in self.info_type_map.items(): + lbl = panel.findChild(QLabel, f"{did}_{field}") + val = lbl.text() if lbl else "--" + val_item = QTableWidgetItem(val) + val_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) + self.overview_table.setItem(row, col, val_item) + + def get_socket_id(self, drone_id): + """從 drone_id 提取 socket_id (例如 's9_1' -> '9')""" + import re + match = re.match(r's(\d+)_\d+', drone_id) + return match.group(1) if match else 'unknown' def add_drone(self, drone_id): - if drone_id not in self.drones: - panel = self.create_drone_panel(drone_id) - self.info_layout.addWidget(panel) - self.drones[drone_id] = panel + if drone_id in self.drones: + return + + # 獲取 socket_id + socket_id = self.get_socket_id(drone_id) + + # 創建無人機面板 + panel = self.create_drone_panel(drone_id) + self.drones[drone_id] = panel + + # 如果該 socket 分組不存在,創建它 + if socket_id not in self.socket_groups: + group_panel = self.create_socket_group_panel(socket_id) + self.socket_groups[socket_id] = group_panel + + # 將無人機面板添加到對應的 socket 分組中 + group_panel = self.socket_groups[socket_id] + group_panel.drones_layout.addWidget(panel) + + # 重新排序並顯示所有 socket 分組 + self.reorganize_socket_groups() + + # 更新分組勾選框狀態 + self.update_group_checkbox_state(socket_id) + + # 更新總覽表 + self.update_overview_table() + + def reorganize_socket_groups(self): + """重新組織和排序 socket 分組""" + # 先清空主佈局 + while self.drone_panel_layout.count(): + w = self.drone_panel_layout.takeAt(0).widget() + if w: + w.setParent(None) + + # 對每個 socket 分組內的無人機進行排序 + for socket_id, group_panel in self.socket_groups.items(): + # 獲取該分組內的所有無人機 + group_drones = [did for did in self.drones.keys() + if self.get_socket_id(did) == socket_id] + + # 清空分組佈局 + while group_panel.drones_layout.count(): + w = group_panel.drones_layout.takeAt(0).widget() + if w: + w.setParent(None) + + # 對該分組內的無人機按數字排序 + def sort_key(x): + parts = x[1:].split('_') + return (int(parts[0]), int(parts[1])) + + # 重新添加排序後的無人機面板 + for did in sorted(group_drones, key=sort_key): + group_panel.drones_layout.addWidget(self.drones[did]) + + # 按 socket_id 數字順序添加分組到主佈局 + for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)): + self.drone_panel_layout.addWidget(self.socket_groups[socket_id]) def spin_ros(self): try: - self.executor.spin_once(timeout_sec=0) + self.executor.spin_once(timeout_sec=0.01) + for (drone_id, msg_type), data in self.monitor.latest_data.items(): + self.monitor.signals.update_signal.emit(msg_type, drone_id, data) + self.monitor.latest_data.clear() except Exception as e: print(f"ROS spin error: {e}") def closeEvent(self, event): + # 停止 UDP 接收器 + for receiver in self.udp_receivers: + receiver.stop() + + # 停止 WebSocket 接收器 + for receiver in self.monitor.ws_receivers: + receiver.stop() + self.monitor.destroy_node() self.executor.shutdown() rclpy.shutdown() @@ -441,4 +859,4 @@ if __name__ == '__main__': app = QApplication(sys.argv) station = ControlStationUI() station.show() - app.exec(app.exec()) \ No newline at end of file + app.exec() \ No newline at end of file diff --git a/src/unitdev01/unitdev01/map_layout.py b/src/unitdev01/unitdev01/map_layout.py new file mode 100644 index 0000000..11f7171 --- /dev/null +++ b/src/unitdev01/unitdev01/map_layout.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python3 +from PyQt6.QtWebEngineWidgets import QWebEngineView +from PyQt6.QtCore import QTimer, pyqtSignal, QObject, pyqtSlot +from PyQt6.QtWebChannel import QWebChannel + +class DroneMap: + """無人機地圖類別 - 負責管理 Leaflet 地圖顯示""" + + def __init__(self): + """初始化地圖""" + self.map_view = QWebEngineView() + self.map_loaded = False + self.pending_map_updates = {} + + # 創建橋接對象 + self.bridge = MapBridge() + + # 設置 QWebChannel + self.channel = QWebChannel() + self.channel.registerObject('bridge', self.bridge) + self.map_view.page().setWebChannel(self.channel) + + # 設置地圖 HTML + inline_html = ''' + + + + + + + + + + + +
+
+ +
+ + + + + ''' + + self.map_view.setHtml(inline_html) + self.map_view.loadFinished.connect(self._on_map_loaded) + + # 設置地圖更新計時器 + self.map_update_timer = QTimer() + self.map_update_timer.timeout.connect(self.update_map_positions) + self.map_update_timer.start(200) # 每 200ms 更新一次 + + def _on_map_loaded(self, ok: bool): + """地圖加載完成回調""" + if ok: + self.map_loaded = True + else: + print("⚠️ 地圖加載失敗") + + def update_drone_position(self, drone_id, lat, lon, heading): + """更新無人機位置(加入待處理隊列)""" + self.pending_map_updates[drone_id] = (lat, lon, heading) + + def update_map_positions(self): + """批量更新地圖上的無人機位置""" + if not self.map_loaded or not self.pending_map_updates: + return + + # 批量執行所有待更新的位置 + js_commands = [] + for drone_id, (lat, lon, heading) in self.pending_map_updates.items(): + js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});") + + if js_commands: + combined_js = "\n".join(js_commands) + self.map_view.page().runJavaScript(combined_js) + + # 清空待更新緩存 + self.pending_map_updates.clear() + + def clear_trajectories(self): + """清除所有軌跡""" + if self.map_loaded: + self.map_view.page().runJavaScript("clearAllTrajectories();") + + def focus_on_drone(self, drone_id): + """聚焦到指定無人機""" + if self.map_loaded: + self.map_view.page().runJavaScript(f"focusOn('{drone_id}');") + + def get_widget(self): + """獲取地圖 widget""" + return self.map_view + + def get_gps_signal(self): + """獲取 GPS 信號""" + return self.bridge.gps_signal + +class MapBridge(QObject): + """JavaScript 和 Python 之間的橋接類""" + gps_signal = pyqtSignal(float, float) # lat, lon + + def __init__(self): + super().__init__() + + @pyqtSlot(float, float) + def emitGpsSignal(self, lat, lon): + """供 JavaScript 調用的方法""" + self.gps_signal.emit(lat, lon) \ No newline at end of file diff --git a/src/unitdev01/unitdev01/mavlink.py b/src/unitdev01/unitdev01/mavlink.py index b047a0e..9bb6329 100644 --- a/src/unitdev01/unitdev01/mavlink.py +++ b/src/unitdev01/unitdev01/mavlink.py @@ -21,13 +21,15 @@ class MAVLinkWorker(QThread): param_signal = pyqtSignal(str, int) kbps_signal = pyqtSignal(str, float) - def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'): + def __init__(self, connection_string="udp:127.0.0.1:14560 ", usb='/dev/ttyUSB0', acm ='/dev/ttyACM0'): super().__init__() - self.namespaces = ['UAV1', 'UAV2', 'UAV3'] - self.connection = mavutil.mavlink_connection(usb, baud=57600) + self.sysids = [1, 5, 10] + self.namespaces = {} + self.namespaces = [f"UAV{sysid}" for sysid in self.sysids] + + self.connection = mavutil.mavlink_connection(connection_string, baud=115200) self.connection.wait_heartbeat() - for sysid in self.namespaces: - sysid = int(sysid.replace('UAV', '')) + for sysid in self.sysids: self.set_sr_params(sysid) self.running = True @@ -42,8 +44,8 @@ class MAVLinkWorker(QThread): self.total_bytes_received = {} self.throughput_KBps = {} - for namespace in self.namespaces: - self.request_param(namespace, "SR1_EXTRA1") + for sysid in self.sysids: + self.request_param(sysid, "SR1_EXTRA1") self.connection.mav.timesync_send( 0, #tc1 int(time.time() * 1e9) #ts1 @@ -52,7 +54,7 @@ class MAVLinkWorker(QThread): def run(self): while self.running: try: - msg = self.connection.recv_msg() + msg = self.connection.recv_msg() # Debugging line to see received messages current_time = time.time() if not msg: continue @@ -64,7 +66,6 @@ class MAVLinkWorker(QThread): if msg_type =="BAD_DATA": continue - if sysid not in self.total_bytes_received: self.total_bytes_received[sysid] = 0 self.throughput_KBps[sysid] = 0 @@ -162,7 +163,7 @@ class MAVLinkWorker(QThread): elif msg_type == 'PARAM_VALUE': param_name = "SR1_EXTRA1" if msg.param_id == param_name: - self.param_signal.emit(namespace, msg.param_value) + self.param_signal.emit(namespace, int(msg.param_value)) except Exception as e: print(f"Error reading message: {e}") @@ -171,8 +172,11 @@ class MAVLinkWorker(QThread): self.running = False self.connection.close() + def get_sysid(self, namespace): + return int(namespace.replace('UAV', '')) + def set_mode(self, namespace, mode): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) if mode == 'STABILIZE': mode = 0 elif mode == 'AUTO': @@ -188,7 +192,7 @@ class MAVLinkWorker(QThread): ) def arm(self, namespace, arm): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) self.connection.mav.command_long_send( sysid, 1, # Component ID @@ -199,7 +203,7 @@ class MAVLinkWorker(QThread): ) def takeoff(self, namespace, altitude): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) self.connection.mav.command_long_send( sysid, 1, # Component ID @@ -210,7 +214,7 @@ class MAVLinkWorker(QThread): ) def land(self, namespace): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) self.connection.mav.command_long_send( sysid, 1, # Component ID @@ -221,7 +225,7 @@ class MAVLinkWorker(QThread): ) def set_local_position(self, namespace, x, y, z): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) self.connection.mav.set_position_target_local_ned_send( 0, sysid, 1, # time_boot_ms, sysid, compid mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame @@ -233,7 +237,7 @@ class MAVLinkWorker(QThread): ) def reboot_drone(self, namespace): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) self.connection.mav.command_long_send( sysid, 1, # target_component (一般為1) @@ -244,7 +248,7 @@ class MAVLinkWorker(QThread): ) def set_param(self, namespace, param_name, value): - sysid = int(namespace.replace('UAV', '')) + sysid = self.get_sysid(namespace) try: self.connection.mav.param_set_send( sysid, @@ -256,8 +260,7 @@ class MAVLinkWorker(QThread): except Exception as e: print(f"Failed to set parameter {param_name}: {e}") - def request_param(self, namespace, param_name): - sysid = int(namespace.replace('UAV', '')) + def request_param(self, sysid, param_name): try: self.connection.mav.param_request_read_send( sysid, # 系統 ID @@ -268,43 +271,57 @@ class MAVLinkWorker(QThread): except Exception as e: print(f"Failed to set parameter {param_name}: {e}") - ''' + def set_sr_params(self, sysid): """ 直接設置 MAVLink 訊息頻率 """ - freqs = [0, 2, 4] + freqs = [0, 1, 4] params = { "SR1_ADSB": freqs[0], "SR1_EXT_STAT": freqs[1], "SR1_EXTRA1": freqs[2], - "SR1_EXTRA2": freqs[2], + "SR1_EXTRA2": freqs[1], "SR1_EXTRA3": freqs[1], - "SR1_POSITION": freqs[1], + "SR1_POSITION": freqs[2], "SR1_RAW_SENS": freqs[1], "SR1_RC_CHAN": freqs[1] } for param, value in params.items(): self.set_param(f"UAV{sysid}", param, value) - ''' + + ''' + def set_sr_params(self, sysid): + """ 直接設置 MAVLink 訊息頻率 """ + params = { + "SERIAL1_BAUD": 38 + } + for param, value in params.items(): + self.set_param(f"UAV{sysid}", param, value) + def set_sr_params(self, sysid): """ 直接設置 MAVLink 訊息頻率 """ - freqs = [0, 1, 1] + freqs = [0, 3, 3] params = { "SR1_ADSB": freqs[0], - "SR1_EXT_STAT": freqs[0], + "SR1_EXT_STAT": freqs[1], "SR1_EXTRA1": freqs[2], "SR1_EXTRA2": freqs[2], - "SR1_EXTRA3": freqs[0], + "SR1_EXTRA3": freqs[1], "SR1_POSITION": freqs[1], "SR1_RAW_SENS": freqs[1], - "SR1_RC_CHAN": freqs[0] + "SR1_RC_CHAN": freqs[1] } for param, value in params.items(): self.set_param(f"UAV{sysid}", param, value) + ''' + def init_drone(self, namespace): + sysid = self.get_sysid(namespace) + self.set_local_position(sysid, 0, 0, 0) class DroneControlApp(QMainWindow): def __init__(self): super().__init__() self.workers = MAVLinkWorker() + self.sysids = self.workers.sysids self.namespaces = self.workers.namespaces self.initUI() @@ -438,6 +455,15 @@ class DroneControlApp(QMainWindow): self.rebootButton.clicked.connect(self.reboot_drone) main_layout.addWidget(self.rebootButton) + mission_layout = QHBoxLayout() + self.initButton = QPushButton("重設位置") + self.initButton.clicked.connect(self.init_drone) + self.startButton = QPushButton("開始任務") + self.startButton.clicked.connect(self.start_mission) + mission_layout.addWidget(self.initButton) + mission_layout.addWidget(self.startButton) + main_layout.addLayout(mission_layout) + # 設置主佈局 central_widget = QWidget(self) central_widget.setLayout(main_layout) @@ -563,6 +589,24 @@ class DroneControlApp(QMainWindow): except ValueError: QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + def init_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.init_drone(namespace) + print(f"重設位置:{namespace}") + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"重設位置失敗:{e}") + + def start_mission(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.start_mission(namespace) + print(f"開始任務:{namespace}") + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"開始任務失敗:{e}") + def main(): app = QtWidgets.QApplication(sys.argv) window = DroneControlApp() diff --git a/src/unitdev01/unitdev01/xbee.py b/src/unitdev01/unitdev01/xbee.py deleted file mode 100644 index b550e1e..0000000 --- a/src/unitdev01/unitdev01/xbee.py +++ /dev/null @@ -1,619 +0,0 @@ -import sys -import time -import math -import serial -import struct -from PyQt5 import QtWidgets -from PyQt5.QtCore import QThread, pyqtSignal -from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox -from pymavlink import mavutil -from pymavlink.dialects.v20 import ardupilotmega as mavlink2 - -class PacketCapture: - def __init__(self): - self.data = bytearray() - - def write(self, b): - self.data.extend(b) - return len(b) - -class MAVLinkWorker(QThread): - state_signal = pyqtSignal(str, str) - battery_signal = pyqtSignal(str, float) - gps_signal = pyqtSignal(str, float, float) - gps_status_signal = pyqtSignal(str, int, int) - local_position_signal = pyqtSignal(str, float, float, float) - imu_signal = pyqtSignal(str, float) - hdg_signal = pyqtSignal(str, float) - groundspeed_signal = pyqtSignal(str, float) - ping_signal = pyqtSignal(str, float) - loss_signal = pyqtSignal(str, float) - frequency_signal = pyqtSignal(str, float) - param_signal = pyqtSignal(str, int) - kbps_signal = pyqtSignal(str, float) - - def __init__(self): - super().__init__() - self.namespaces = ['UAV1', 'UAV2', 'UAV3'] - self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600) - self.connection.wait_heartbeat() - for sysid in self.namespaces: - sysid = int(sysid.replace('UAV', '')) - self.set_sr_params(sysid) - self.running = True - - # 用於計算頻率丟包 - self.message_count = {} - self.frequency = {} - self.start_time = {} - self.seq_numbers = {} - self.packet_loss_count = {} - self.total_packet_count = {} - self.loss_percentage = {} - self.total_bytes_received = {} - self.throughput_KBps = {} - - for namespace in self.namespaces: - self.request_param(namespace, "SR1_EXTRA1") - self.connection.mav.timesync_send( - 0, #tc1 - int(time.time() * 1e9) #ts1 - ) - - def run(self): - while self.running: - try: - msg = self.connection.recv_msg() - current_time = time.time() - if not msg: - continue - sysid = msg.get_srcSystem() - if sysid == 0: - continue - namespace = f"UAV{sysid}" - msg_type = msg.get_type() - if msg_type =="BAD_DATA": - continue - - if sysid not in self.total_bytes_received: - self.total_bytes_received[sysid] = 0 - self.throughput_KBps[sysid] = 0 - - # 計算訊息大小 - msg_bytes = msg.get_msgbuf() # 取得封包的 bytes - if msg_bytes: - self.total_bytes_received[sysid] += len(msg_bytes) - - # Packet loss calculation - if sysid not in self.seq_numbers: - self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking - self.packet_loss_count[sysid] = 0 - self.total_packet_count[sysid] = 1 - else: - current_seq = msg.get_seq() - expected_seq = (self.seq_numbers[sysid] + 1) % 256 - self.total_packet_count[sysid] += 1 - - if current_seq != expected_seq: # Packet(s) lost - lost_packets = (current_seq - expected_seq) % 256 - self.packet_loss_count[sysid] += lost_packets - self.total_packet_count[sysid] += lost_packets - - self.seq_numbers[sysid] = current_seq - self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100 - self.loss_signal.emit(namespace, self.loss_percentage[sysid]) - - # Frequency calculation - if sysid not in self.message_count: - self.message_count[sysid] = 0 - self.start_time[sysid] = current_time - - self.message_count[sysid] += 1 - - # 每隔一段時間更新 - elapsed_time = current_time - self.start_time[sysid] - if elapsed_time >= 1: - # 每秒頻率 - self.frequency[sysid] = self.message_count[sysid] / elapsed_time - self.frequency_signal.emit(namespace, self.frequency[sysid]) - - # 發送 timesync request - self.connection.mav.timesync_send( - 0, #tc1 - int(current_time * 1e9) #ts1 - ) - #吞吐量 - self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time - self.kbps_signal.emit(namespace, self.throughput_KBps[sysid]) - - self.start_time[sysid] = current_time - self.message_count[sysid] = 0 - self.total_bytes_received[sysid] = 0 - - if msg_type == "HEARTBEAT": - mode = mavutil.mode_string_v10(msg) - self.state_signal.emit(namespace, mode) - - elif msg_type == "BATTERY_STATUS": - voltage = msg.voltages[0] / 1000 - self.battery_signal.emit(namespace, voltage) - - elif msg_type == "GLOBAL_POSITION_INT": - latitude = msg.lat / 1e7 - longitude = msg.lon / 1e7 - self.gps_signal.emit(namespace, latitude, longitude) - - elif msg_type == "GPS_RAW_INT": - fix_type = msg.fix_type - satellites_visible = msg.satellites_visible - self.gps_status_signal.emit(namespace, fix_type, satellites_visible) - - elif msg_type == "LOCAL_POSITION_NED": - x = msg.y - y = msg.x - z = -msg.z - self.local_position_signal.emit(namespace, x, y, z) - - elif msg_type == "ATTITUDE": - pitch = math.degrees(msg.pitch) - self.imu_signal.emit(namespace, pitch) - - elif msg_type == "VFR_HUD": - groundspeed = msg.groundspeed - heading = msg.heading - self.hdg_signal.emit(namespace, heading) - self.groundspeed_signal.emit(namespace, groundspeed) - - elif msg_type == "TIMESYNC": - round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6 - if(round_trip_time<1e6): - self.ping_signal.emit(namespace, round_trip_time) - - elif msg_type == 'PARAM_VALUE': - param_name = "SR1_EXTRA1" - if msg.param_id == param_name: - self.param_signal.emit(namespace, msg.param_value) - - except Exception as e: - print(f"Error reading message: {e}") - - def stop(self): - self.running = False - self.connection.close() - - def build_api_tx_frame(self, data: bytes, frame_id=0x01): - frame_type = 0x10 - dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast - dest_addr16 = b'\xFF\xFE' - broadcast_radius = 0x00 - options = 0x00 - - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) - frame += dest_addr64 + dest_addr16 - frame += struct.pack(">BB", broadcast_radius, options) + data - checksum = 0xFF - (sum(frame) & 0xFF) - return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) - - def send_command(self, msg): - # Create the packet and send via serial port - PORT = "/dev/ttyUSB0" - BAUD = 57600 - ser = serial.Serial(PORT, BAUD) - capture = PacketCapture() - mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1) - mav.version = 2 - mav.send(msg) - api_frame = self.build_api_tx_frame(capture.data) - ser.write(api_frame) - print("RAW HEX:", capture.data.hex()) - - def set_mode(self, namespace, mode): - sysid = int(namespace.replace('UAV', '')) - if mode == 'STABILIZE': - mode = 0 - elif mode == 'AUTO': - mode = 3 - elif mode == 'GUIDED': - mode = 4 - elif mode == 'LOITER': - mode = 5 - - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_DO_SET_MODE, - confirmation=0, - param1=1, # Custom mode enabled - param2=mode, - param3=0, param4=0, param5=0, param6=0, param7=0 - ) - self.send_command(msg) - - def arm(self, namespace, arm): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, - confirmation=0, - param1=1 if arm else 0, # 1 to arm, 0 to disarm - param2=0, param3=0, param4=0, param5=0, param6=0, param7=0 - ) - self.send_command(msg) - - def takeoff(self, namespace, altitude): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, - confirmation=0, - param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, - param7=altitude - ) - self.send_command(msg) - - def land(self, namespace): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_NAV_LAND, - confirmation=0, - param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, - param7=0 - ) - self.send_command(msg) - - def set_local_position(self, namespace, x, y, z): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.set_position_target_local_ned_encode( - 0, sysid, 1, # time_boot_ms, sysid, compid - mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame - 0b110111111000, # Mask - y, x, -z, # Position - 0, 0, 0, # Velocity - 0, 0, 0, # Acceleration - 0, 0 # Yaw, yaw_rate - ) - self.send_command(msg) - - def reboot_drone(self, namespace): - sysid = int(namespace.replace('UAV', '')) - self.connection.mav.command_long_send( - sysid, - 1, # target_component (一般為1) - mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令 - 0, # confirmation - 1, # 第一個參數 (1 = reboot,0 = 無操作,2 = 關機) - 0, 0, 0, 0, 0, 0 # 其餘參數保留 - ) - - def set_param(self, namespace, param_name, value): - sysid = int(namespace.replace('UAV', '')) - try: - self.connection.mav.param_set_send( - sysid, - 1, - param_name.encode('utf-8'), - float(value), - mavutil.mavlink.MAV_PARAM_TYPE_REAL32 - ) - except Exception as e: - print(f"Failed to set parameter {param_name}: {e}") - - def request_param(self, namespace, param_name): - sysid = int(namespace.replace('UAV', '')) - try: - self.connection.mav.param_request_read_send( - sysid, # 系統 ID - 1, # 組件 ID - param_name.encode('utf-8'), # 參數名稱 - -1 # 參數索引,-1 表示根據名稱請求 - ) - except Exception as e: - print(f"Failed to set parameter {param_name}: {e}") - - ''' - def set_sr_params(self, sysid): - """ 直接設置 MAVLink 訊息頻率 """ - freqs = [0, 2, 4] - params = { - "SR1_ADSB": freqs[0], - "SR1_EXT_STAT": freqs[1], - "SR1_EXTRA1": freqs[2], - "SR1_EXTRA2": freqs[2], - "SR1_EXTRA3": freqs[1], - "SR1_POSITION": freqs[1], - "SR1_RAW_SENS": freqs[1], - "SR1_RC_CHAN": freqs[1] - } - for param, value in params.items(): - self.set_param(f"UAV{sysid}", param, value) - ''' - def set_sr_params(self, sysid): - """ 直接設置 MAVLink 訊息頻率 """ - freqs = [0, 1, 1] - params = { - "SR1_ADSB": freqs[0], - "SR1_EXT_STAT": freqs[0], - "SR1_EXTRA1": freqs[2], - "SR1_EXTRA2": freqs[2], - "SR1_EXTRA3": freqs[0], - "SR1_POSITION": freqs[1], - "SR1_RAW_SENS": freqs[1], - "SR1_RC_CHAN": freqs[0] - } - for param, value in params.items(): - self.set_param(f"UAV{sysid}", param, value) - -class DroneControlApp(QMainWindow): - def __init__(self): - super().__init__() - self.workers = MAVLinkWorker() - self.namespaces = self.workers.namespaces - self.initUI() - - # Connect signals to update the UI - self.workers.state_signal.connect(self.update_state) - self.workers.battery_signal.connect(self.update_battery) - self.workers.gps_signal.connect(self.update_gps) - self.workers.gps_status_signal.connect(self.update_gps_status) - self.workers.local_position_signal.connect(self.update_local_position) - self.workers.imu_signal.connect(self.update_imu) - self.workers.hdg_signal.connect(self.update_hdg) - self.workers.groundspeed_signal.connect(self.update_groundspeed) - self.workers.ping_signal.connect(self.update_ping) - self.workers.loss_signal.connect(self.update_loss) - self.workers.frequency_signal.connect(self.update_frequency) - self.workers.param_signal.connect(self.update_param) - self.workers.kbps_signal.connect(self.update_kbps) - self.workers.start() - - def initUI(self): - self.setWindowTitle("多無人機控制介面") - self.setGeometry(100, 100, 800, 600) - - # 主佈局 - main_layout = QVBoxLayout() - - # 無人機選擇區域 - uav_layout = QHBoxLayout() - self.uav_checkboxes = {} - for namespace in self.namespaces: - checkbox = QCheckBox(namespace) - checkbox.setChecked(True) # 預設勾選 - self.uav_checkboxes[namespace] = checkbox - uav_layout.addWidget(checkbox) - main_layout.addLayout(uav_layout) - - # 顯示所有無人機資訊,從左到右顯示 - uav_layout = QHBoxLayout() - - # 逐個顯示 UAV 的資訊 - self.uav_labels = {} - for namespace in self.namespaces: - uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列 - - # 狀態顯示 - self.uav_labels[namespace] = { - "status": QLabel("狀態:等待數據..."), - "battery": QLabel("電壓:等待數據..."), - "local_position": QLabel("Local Position:等待數據..."), - "gps": QLabel("GPS位置:等待數據...\n\n"), - "fix": QLabel("Fix Type:等待數據..."), - "satellites": QLabel("衛星數量:等待數據..."), - "groundspeed": QLabel("地面速度:等待數據..."), - "pitch": QLabel("Pitch:等待數據..."), - "heading": QLabel("Heading:等待數據..."), - "ping": QLabel("Ping:等待數據..."), - "loss": QLabel("丟包:等待數據..."), - "frequency": QLabel("頻率:等待數據..."), - "kbps": QLabel("吞吐量:等待數據..."), - "param": QLabel("SR1_EXTRA1:等待數據...") - } - - # 把每個資訊添加到該 UAV 的垂直佈局中 - for label in self.uav_labels[namespace].values(): - uav_info_layout.addWidget(label) - - # 將該 UAV 的資訊佈局加到主佈局中(從左到右排列) - uav_layout.addLayout(uav_info_layout) - - main_layout.addLayout(uav_layout) - - # SR1_EXTRA1參數設置 - param_layout = QHBoxLayout() - self.paramInput = QLineEdit() - self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值") - self.setParamButton = QPushButton("設置SR1_EXTRA1") - self.setParamButton.clicked.connect(self.set_SR1_EXTRA1) - param_layout.addWidget(self.paramInput) - param_layout.addWidget(self.setParamButton) - main_layout.addLayout(param_layout) - - # 模式切換區域 - mode_layout = QHBoxLayout() - self.modeComboBox = QComboBox() - self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"]) - self.modeButton = QPushButton("切換模式") - self.modeButton.clicked.connect(self.change_mode) - mode_layout.addWidget(QLabel("選擇模式:")) - mode_layout.addWidget(self.modeComboBox) - mode_layout.addWidget(self.modeButton) - main_layout.addLayout(mode_layout) - - # 解鎖按鈕 - fly_layout = QHBoxLayout() - self.armButton = QPushButton("解鎖") - self.armButton.clicked.connect(self.arm_drone) - - # 起飛按鈕 - self.takeoffButton = QPushButton("起飛") - self.takeoffButton.clicked.connect(self.takeoff_drone) - - # 降落按鈕 - self.landButton = QPushButton("降落") - self.landButton.clicked.connect(self.land_drone) - fly_layout.addWidget(self.armButton) - fly_layout.addWidget(self.takeoffButton) - fly_layout.addWidget(self.landButton) - main_layout.addLayout(fly_layout) - - # XYZ 設置欄位 - xyz_layout = QHBoxLayout() - self.positionX = QLineEdit() - self.positionX.setPlaceholderText("X") - self.positionY = QLineEdit() - self.positionY.setPlaceholderText("Y") - self.positionZ = QLineEdit() - self.positionZ.setPlaceholderText("Z") - self.setPositionButton = QPushButton("設置位置") - self.setPositionButton.clicked.connect(self.set_local_position) - xyz_layout.addWidget(QLabel("輸入位置:")) - xyz_layout.addWidget(self.positionX) - xyz_layout.addWidget(self.positionY) - xyz_layout.addWidget(self.positionZ) - xyz_layout.addWidget(self.setPositionButton) - main_layout.addLayout(xyz_layout) - - # 設置 XYZ 的按鈕 - - #設置重啟按鈕 - self.rebootButton = QPushButton("重啟") - self.rebootButton.clicked.connect(self.reboot_drone) - main_layout.addWidget(self.rebootButton) - - # 設置主佈局 - central_widget = QWidget(self) - central_widget.setLayout(main_layout) - self.setCentralWidget(central_widget) - self.show() - - def closeEvent(self, event): - try: - self.workers.stop() - finally: - event.accept() - - def update_state(self, namespace, mode): - self.uav_labels[namespace]["status"].setText(f"狀態:{mode}") - - def update_battery(self, namespace, voltage): - self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V") - - def update_gps(self, namespace, latitude, longitude): - self.uav_labels[namespace]["gps"].setText(f"GPS位置: \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°") - - def update_gps_status(self, namespace, fix_type, satellites_visible): - fix_type_str = { - 0: "No Fix", - 1: "No Fix", - 2: "2D Fix", - 3: "3D Fix", - 4: "RTK Float", - 5: "RTK Fix" - }.get(fix_type, "Unknown") - self.uav_labels[namespace]["fix"].setText(f"Fix Type:{fix_type_str}") - self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}") - - def update_local_position(self, namespace, x, y, z): - self.uav_labels[namespace]["local_position"].setText(f"Local:({x:.2f}, {y:.2f}, {z:.2f})") - - def update_imu(self, namespace, pitch): - self.uav_labels[namespace]["pitch"].setText(f"Pitch:{pitch:.2f}°") - - def update_hdg(self, namespace, heading): - self.uav_labels[namespace]["heading"].setText(f"Heading:{heading:.2f}°") - - def update_groundspeed(self, namespace, groundspeed): - self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s") - - def update_ping(self, namespace, ping): - self.uav_labels[namespace]["ping"].setText(f"Ping:{ping:.2f} ms") - - def update_loss(self, namespace, loss): - self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%") - - def update_frequency(self, namespace, frequency): - self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz") - - def update_kbps(self, namespace, kbps): - self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s") - - def update_param(self, namespace, value): - self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}") - - def change_mode(self): - try: - selected_mode = self.modeComboBox.currentText() - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_mode(namespace, selected_mode) - except Exception as e: - QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") - - def arm_drone(self): - try: - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.arm(namespace, True) - except Exception as e: - QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}") - - def takeoff_drone(self): - try: - z_text = self.positionZ.text().strip() - z = float(z_text) if z_text else 5.0 - h = 2 - for i, namespace in enumerate(self.namespaces): - if self.uav_checkboxes[namespace].isChecked(): - self.workers.takeoff(namespace, z + h * i) - - except ValueError: - QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") - - def land_drone(self): - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.land(namespace) - - def set_local_position(self): - try: - x = float(self.positionX.text().strip()) - y = float(self.positionY.text().strip()) - z = float(self.positionZ.text().strip()) - h = 2 - for i, namespace in enumerate(self.namespaces): - if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_local_position(namespace, x, y, z + h * i) - - except ValueError: - QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") - - def reboot_drone(self): - try: - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.reboot_drone(namespace) - except Exception as e: - QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}") - - def set_SR1_EXTRA1(self): - param_value = self.paramInput.text().strip() - try: - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_param(namespace, "SR1_EXTRA1", param_value) - self.workers.request_param(namespace, "SR1_EXTRA1") - except ValueError: - QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") - -def main(): - app = QtWidgets.QApplication(sys.argv) - window = DroneControlApp() - window.show() - sys.exit(app.exec_()) - -if __name__ == '__main__': - main() \ No newline at end of file