You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/GUI/comm_panel.py

687 lines
24 KiB
Python

#!/usr/bin/env python3
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
3 months ago
QPushButton, QLineEdit, QComboBox)
from PyQt6.QtCore import pyqtSignal
3 months ago
import glob
import os
class CommPanel(QWidget):
"""通讯设置面板类"""
# 定义信号
udp_connection_added = pyqtSignal(str, int) # ip, port
udp_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label
udp_connection_removed = pyqtSignal(dict, QWidget) # conn, panel
ws_connection_added = pyqtSignal(str) # url
ws_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label
ws_connection_removed = pyqtSignal(dict, QWidget) # conn, panel
3 months ago
serial_connection_added = pyqtSignal(str, int) # port, baudrate
serial_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label
serial_connection_removed = pyqtSignal(dict, QWidget) # conn, panel
status_message = pyqtSignal(str, int) # message, timeout
def __init__(self, parent=None):
super().__init__(parent)
self.udp_connections = []
self.ws_connections = []
3 months ago
self.serial_connections = []
self._init_ui()
def _init_ui(self):
"""初始化UI"""
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(10)
# ========== UDP MAVLink 區域 ==========
udp_title = QLabel("UDP")
udp_title.setStyleSheet("""
color: #DDD;
font-size: 14px;
font-weight: bold;
padding: 5px;
background-color: #333;
border-radius: 4px;
""")
layout.addWidget(udp_title)
# UDP 連接列表容器
self.udp_list_widget = QWidget()
self.udp_list_layout = QVBoxLayout(self.udp_list_widget)
self.udp_list_layout.setContentsMargins(0, 0, 0, 0)
self.udp_list_layout.setSpacing(5)
layout.addWidget(self.udp_list_widget)
# UDP 添加新連接區域
add_udp_widget = QWidget()
add_udp_layout = QHBoxLayout(add_udp_widget)
add_udp_layout.setContentsMargins(0, 0, 0, 0)
self.udp_ip_input = QLineEdit()
self.udp_ip_input.setText("127.0.0.1")
self.udp_ip_input.setPlaceholderText("IP")
self.udp_ip_input.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
""")
self.udp_port_input = QLineEdit()
self.udp_port_input.setText("14550")
self.udp_port_input.setPlaceholderText("Port")
self.udp_port_input.setFixedWidth(80)
self.udp_port_input.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
""")
add_udp_btn = QPushButton("添加")
add_udp_btn.clicked.connect(self._handle_add_udp)
add_udp_btn.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 6px 12px;
border-radius: 4px;
min-width: 30px;
}
QPushButton:hover { background-color: #45a049; }
""")
add_udp_layout.addWidget(QLabel("IP:", styleSheet="color: #DDD;"))
add_udp_layout.addWidget(self.udp_ip_input)
add_udp_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;"))
add_udp_layout.addWidget(self.udp_port_input)
add_udp_layout.addWidget(add_udp_btn)
layout.addWidget(add_udp_widget)
# 分隔線
separator = QWidget()
separator.setFixedHeight(20)
layout.addWidget(separator)
3 months ago
# ========== Serial 區域 ==========
serial_title = QLabel("Serial")
serial_title.setStyleSheet("""
color: #DDD;
font-size: 14px;
font-weight: bold;
padding: 5px;
background-color: #333;
border-radius: 4px;
""")
layout.addWidget(serial_title)
# Serial 連接列表容器
self.serial_list_widget = QWidget()
self.serial_list_layout = QVBoxLayout(self.serial_list_widget)
self.serial_list_layout.setContentsMargins(0, 0, 0, 0)
self.serial_list_layout.setSpacing(5)
layout.addWidget(self.serial_list_widget)
# Serial 添加新連接區域
add_serial_widget = QWidget()
add_serial_layout = QHBoxLayout(add_serial_widget)
add_serial_layout.setContentsMargins(0, 0, 0, 0)
self.serial_port_combo = QComboBox()
self.serial_port_combo.setStyleSheet("""
QComboBox {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
QComboBox::drop-down {
border: none;
}
QComboBox::down-arrow {
image: none;
border-left: 5px solid transparent;
border-right: 5px solid transparent;
border-top: 5px solid #DDD;
}
""")
self._refresh_serial_ports()
refresh_ports_btn = QPushButton("")
refresh_ports_btn.setFixedWidth(35)
refresh_ports_btn.clicked.connect(self._refresh_serial_ports)
refresh_ports_btn.setToolTip("重新掃描串口")
refresh_ports_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 6px;
border-radius: 4px;
font-size: 16px;
}
QPushButton:hover { background-color: #555; }
""")
self.serial_baudrate_combo = QComboBox()
self.serial_baudrate_combo.addItems(['9600', '19200', '38400', '57600', '115200'])
self.serial_baudrate_combo.setCurrentText('57600')
self.serial_baudrate_combo.setFixedWidth(100)
self.serial_baudrate_combo.setStyleSheet("""
QComboBox {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
QComboBox::drop-down {
border: none;
}
QComboBox::down-arrow {
image: none;
border-left: 5px solid transparent;
border-right: 5px solid transparent;
border-top: 5px solid #DDD;
}
""")
add_serial_btn = QPushButton("添加")
add_serial_btn.clicked.connect(self._handle_add_serial)
add_serial_btn.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 6px 12px;
border-radius: 4px;
min-width: 30px;
}
QPushButton:hover { background-color: #45a049; }
""")
add_serial_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;"))
add_serial_layout.addWidget(self.serial_port_combo)
add_serial_layout.addWidget(refresh_ports_btn)
add_serial_layout.addWidget(QLabel("Baud:", styleSheet="color: #DDD;"))
add_serial_layout.addWidget(self.serial_baudrate_combo)
add_serial_layout.addWidget(add_serial_btn)
layout.addWidget(add_serial_widget)
# 分隔線
separator = QWidget()
separator.setFixedHeight(20)
layout.addWidget(separator)
# ========== WebSocket 區域 ==========
ws_title = QLabel("WebSocket")
ws_title.setStyleSheet("""
color: #DDD;
font-size: 14px;
font-weight: bold;
padding: 5px;
background-color: #333;
border-radius: 4px;
""")
layout.addWidget(ws_title)
# WebSocket 連接列表容器
self.ws_list_widget = QWidget()
self.ws_list_layout = QVBoxLayout(self.ws_list_widget)
self.ws_list_layout.setContentsMargins(0, 0, 0, 0)
self.ws_list_layout.setSpacing(5)
layout.addWidget(self.ws_list_widget)
# WebSocket 添加新連接區域
add_ws_widget = QWidget()
add_ws_layout = QHBoxLayout(add_ws_widget)
add_ws_layout.setContentsMargins(0, 0, 0, 0)
self.ws_url_input = QLineEdit()
self.ws_url_input.setPlaceholderText("host")
self.ws_url_input.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
""")
add_ws_btn = QPushButton("添加")
add_ws_btn.clicked.connect(self._handle_add_ws)
add_ws_btn.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 6px 12px;
border-radius: 4px;
min-width: 30px;
}
QPushButton:hover { background-color: #45a049; }
""")
add_ws_layout.addWidget(QLabel("URL:", styleSheet="color: #DDD;"))
add_ws_layout.addWidget(self.ws_url_input)
add_ws_layout.addWidget(add_ws_btn)
layout.addWidget(add_ws_widget)
layout.addStretch()
def _handle_add_udp(self):
"""處理添加 UDP 連接"""
ip = self.udp_ip_input.text().strip()
port_text = self.udp_port_input.text().strip()
if not ip or not port_text:
self.status_message.emit("請輸入 IP 和 Port", 3000)
return
try:
port = int(port_text)
if port < 1 or port > 65535:
raise ValueError("Port 超出範圍")
except ValueError:
self.status_message.emit("Port 必須是 1-65535 的數字", 3000)
return
# 檢查是否已存在相同連接
for conn in self.udp_connections:
if conn['ip'] == ip and conn['port'] == port:
self.status_message.emit("連接已存在", 3000)
return
# 發送信號通知主窗口
self.udp_connection_added.emit(ip, port)
# 只清空 port 輸入框,保留 IP
self.udp_port_input.clear()
def _handle_add_ws(self):
"""處理添加 WebSocket 連接"""
input_url = self.ws_url_input.text().strip()
if not input_url:
self.status_message.emit("請輸入 WebSocket URL", 3000)
return
# 自動添加 ws:// 前綴
if not input_url.startswith('ws://') and not input_url.startswith('wss://'):
url = f'ws://{input_url}'
else:
url = input_url
# 基本 URL 格式驗證
try:
if '://' in url:
parts = url.split('://', 1)
if len(parts) == 2 and ':' not in parts[1]:
self.status_message.emit("URL 格式錯誤,需要包含端口號 (例如: 127.0.0.1:8756)", 3000)
return
except:
self.status_message.emit("URL 格式錯誤", 3000)
return
# 檢查是否已存在相同連接
for conn in self.ws_connections:
if conn['url'] == url:
self.status_message.emit("連接已存在", 3000)
return
# 發送信號通知主窗口
self.ws_connection_added.emit(url)
# 清空輸入框
self.ws_url_input.clear()
3 months ago
def _refresh_serial_ports(self):
"""重新掃描可用的串口"""
self.serial_port_combo.clear()
# 掃描 Linux 下的串口設備
ports = []
# 掃描 USB 串口
usb_ports = glob.glob('/dev/ttyUSB*')
ports.extend(usb_ports)
# 掃描 ACM 串口
acm_ports = glob.glob('/dev/ttyACM*')
ports.extend(acm_ports)
# 排序
ports.sort()
if ports:
self.serial_port_combo.addItems(ports)
else:
self.serial_port_combo.addItem("沒有找到串口")
self.serial_port_combo.setEnabled(False)
return
self.serial_port_combo.setEnabled(True)
def _handle_add_serial(self):
"""處理添加 Serial 連接"""
port = self.serial_port_combo.currentText()
baudrate_text = self.serial_baudrate_combo.currentText()
if not port or port == "沒有找到串口":
self.status_message.emit("請選擇有效的串口", 3000)
return
try:
baudrate = int(baudrate_text)
except ValueError:
self.status_message.emit("波特率格式錯誤", 3000)
return
# 檢查是否已存在相同連接
for conn in self.serial_connections:
if conn['port'] == port:
self.status_message.emit("連接已存在", 3000)
return
# 發送信號通知主窗口
self.serial_connection_added.emit(port, baudrate)
def _handle_add_ws(self):
"""處理添加 WebSocket 連接"""
input_url = self.ws_url_input.text().strip()
if not input_url:
self.status_message.emit("請輸入 WebSocket URL", 3000)
return
# 自動添加 ws:// 前綴
if not input_url.startswith('ws://') and not input_url.startswith('wss://'):
url = f'ws://{input_url}'
else:
url = input_url
# 基本 URL 格式驗證
try:
if '://' in url:
parts = url.split('://', 1)
if len(parts) == 2 and ':' not in parts[1]:
self.status_message.emit("URL 格式錯誤,需要包含端口號 (例如: 127.0.0.1:8756)", 3000)
return
except:
self.status_message.emit("URL 格式錯誤", 3000)
return
# 檢查是否已存在相同連接
for conn in self.ws_connections:
if conn['url'] == url:
self.status_message.emit("連接已存在", 3000)
return
# 發送信號通知主窗口
self.ws_connection_added.emit(url)
# 清空輸入框
self.ws_url_input.clear()
def create_udp_connection_panel(self, conn):
"""創建 UDP 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
padding: 8px;
border: 1px solid #444;
}
""")
layout = QHBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['ip']}:{conn['port']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.udp_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #555; }
""")
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.udp_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
QPushButton {
background-color: #d32f2f;
color: white;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #b71c1c; }
""")
layout.addWidget(status_label)
layout.addWidget(info_label)
layout.addStretch()
layout.addWidget(toggle_btn)
layout.addWidget(remove_btn)
# 儲存引用
panel.connection = conn
panel.toggle_btn = toggle_btn
panel.status_label = status_label
return panel
def create_ws_connection_panel(self, conn):
"""創建 WebSocket 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
padding: 8px;
border: 1px solid #444;
}
""")
layout = QHBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['url']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.ws_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #555; }
""")
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.ws_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
QPushButton {
background-color: #d32f2f;
color: white;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #b71c1c; }
""")
layout.addWidget(status_label)
layout.addWidget(info_label)
layout.addStretch()
layout.addWidget(toggle_btn)
layout.addWidget(remove_btn)
# 儲存引用
panel.connection = conn
panel.toggle_btn = toggle_btn
panel.status_label = status_label
return panel
def add_udp_panel(self, conn):
"""添加 UDP 連接面板到列表"""
panel = self.create_udp_connection_panel(conn)
self.udp_list_layout.addWidget(panel)
self.udp_connections.append(conn)
return panel
def add_ws_panel(self, conn):
"""添加 WebSocket 連接面板到列表"""
panel = self.create_ws_connection_panel(conn)
self.ws_list_layout.addWidget(panel)
self.ws_connections.append(conn)
return panel
def remove_udp_connection_from_list(self, conn):
"""從列表中移除 UDP 連接"""
if conn in self.udp_connections:
self.udp_connections.remove(conn)
def remove_ws_connection_from_list(self, conn):
"""從列表中移除 WebSocket 連接"""
if conn in self.ws_connections:
3 months ago
self.ws_connections.remove(conn)
def create_serial_connection_panel(self, conn):
"""創建 Serial 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
padding: 8px;
border: 1px solid #444;
}
""")
layout = QHBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['port']} @ {conn['baudrate']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.serial_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #555; }
""")
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.serial_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
QPushButton {
background-color: #d32f2f;
color: white;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #b71c1c; }
""")
layout.addWidget(status_label)
layout.addWidget(info_label)
layout.addStretch()
layout.addWidget(toggle_btn)
layout.addWidget(remove_btn)
# 儲存引用
panel.connection = conn
panel.toggle_btn = toggle_btn
panel.status_label = status_label
return panel
def add_serial_panel(self, conn):
"""添加 Serial 連接面板到列表"""
panel = self.create_serial_connection_panel(conn)
self.serial_list_layout.addWidget(panel)
self.serial_connections.append(conn)
return panel
def remove_serial_connection_from_list(self, conn):
"""從列表中移除 Serial 連接"""
if conn in self.serial_connections:
self.serial_connections.remove(conn)