Compare commits

..

5 Commits
lunu ... master

Author SHA1 Message Date
ken910606 d2a5f6fad7 Update GUI 2.3.0: JSON in Serial 3 days ago
wenchun 730c3e2420 feat(GUI): 新增模擬驗證滑動開關(ToggleSwitch)
在清除軌跡按鈕左邊加入滑動開關,可控制執行任務時是否開啟 matplotlib 驗證視窗。
預設開啟,OFF 時跳過驗證。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 week ago
ken910606 1426a618f4 Update GUI 2.2.0: Settings tab 1 week ago
ken910606 edd15df3fc Update GUI 2.1.0: logs history tab, drone panel attitude, linear twist overview 1 week ago
Chiyu Chen 7f8babfa5e (update)
1. mavlinkROS2Nodes.py 更新姿態發佈的 topic
2. 新增 interface AttitudeRaw 與對應的 package.xml & CMakeList
1 week ago

@ -1,10 +1,78 @@
#!/usr/bin/env python3
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QLineEdit, QComboBox)
QPushButton, QLineEdit, QComboBox, QApplication)
from PyQt6.QtGui import QFont
from PyQt6.QtCore import pyqtSignal
import glob
import os
def _get_font_scale():
app = QApplication.instance()
if app is None:
return 1.0
scale = app.property("font_scale")
try:
return float(scale) if scale is not None else 1.0
except (TypeError, ValueError):
return 1.0
def _scale_stylesheet_font_sizes(stylesheet, scale):
if not stylesheet or 'font-size' not in stylesheet:
return stylesheet
import re
def repl(match):
size = float(match.group(1))
unit = match.group(2)
scaled = max(1.0, size * scale)
text = f"{scaled:.2f}".rstrip('0').rstrip('.')
return f"font-size: {text}{unit}"
return re.sub(r'font-size\s*:\s*([0-9]+(?:\.[0-9]+)?)\s*(px|pt)', repl, stylesheet)
def _set_scaled_stylesheet(widget, stylesheet):
widget._base_stylesheet = stylesheet
scaled = _scale_stylesheet_font_sizes(stylesheet, _get_font_scale())
widget._applied_stylesheet = scaled
widget.setStyleSheet(scaled)
def _reapply_scaled_stylesheet(widget):
current_stylesheet = widget.styleSheet()
base_stylesheet = getattr(widget, '_base_stylesheet', None)
applied_stylesheet = getattr(widget, '_applied_stylesheet', None)
if current_stylesheet != applied_stylesheet:
base_stylesheet = current_stylesheet
widget._base_stylesheet = base_stylesheet
if base_stylesheet is not None:
scaled = _scale_stylesheet_font_sizes(base_stylesheet, _get_font_scale())
widget._applied_stylesheet = scaled
if current_stylesheet != scaled:
widget.setStyleSheet(scaled)
def _apply_scaled_font(widget):
base_font = getattr(widget, '_base_font_for_scale', None)
if base_font is None:
app = QApplication.instance()
app_base_font = app.property("base_app_font") if app else None
base_font = QFont(app_base_font) if app_base_font is not None else QFont(widget.font())
widget._base_font_for_scale = QFont(base_font)
scaled_font = QFont(base_font)
scale = _get_font_scale()
if base_font.pointSizeF() > 0:
scaled_font.setPointSizeF(max(1.0, base_font.pointSizeF() * scale))
elif base_font.pointSize() > 0:
scaled_font.setPointSize(max(1, int(round(base_font.pointSize() * scale))))
widget.setFont(scaled_font)
class CommPanel(QWidget):
"""通讯设置面板类"""
@ -26,6 +94,7 @@ class CommPanel(QWidget):
self.ws_connections = []
self.serial_connections = []
self._init_ui()
self.apply_font_scale()
def _init_ui(self):
"""初始化UI"""
@ -35,7 +104,7 @@ class CommPanel(QWidget):
# ========== UDP MAVLink 區域 ==========
udp_title = QLabel("UDP")
udp_title.setStyleSheet("""
_set_scaled_stylesheet(udp_title, """
color: #DDD;
font-size: 14px;
font-weight: bold;
@ -60,7 +129,7 @@ class CommPanel(QWidget):
self.udp_ip_input = QLineEdit()
self.udp_ip_input.setText("127.0.0.1")
self.udp_ip_input.setPlaceholderText("IP")
self.udp_ip_input.setStyleSheet("""
_set_scaled_stylesheet(self.udp_ip_input, """
QLineEdit {
background-color: #333;
color: #DDD;
@ -74,7 +143,7 @@ class CommPanel(QWidget):
self.udp_port_input.setText("14550")
self.udp_port_input.setPlaceholderText("Port")
self.udp_port_input.setFixedWidth(80)
self.udp_port_input.setStyleSheet("""
_set_scaled_stylesheet(self.udp_port_input, """
QLineEdit {
background-color: #333;
color: #DDD;
@ -86,7 +155,7 @@ class CommPanel(QWidget):
add_udp_btn = QPushButton("添加")
add_udp_btn.clicked.connect(self._handle_add_udp)
add_udp_btn.setStyleSheet("""
_set_scaled_stylesheet(add_udp_btn, """
QPushButton {
background-color: #4CAF50;
color: white;
@ -98,9 +167,13 @@ class CommPanel(QWidget):
QPushButton:hover { background-color: #45a049; }
""")
add_udp_layout.addWidget(QLabel("IP:", styleSheet="color: #DDD;"))
ip_label = QLabel("IP:")
_set_scaled_stylesheet(ip_label, "color: #DDD;")
add_udp_layout.addWidget(ip_label)
add_udp_layout.addWidget(self.udp_ip_input)
add_udp_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;"))
port_label = QLabel("Port:")
_set_scaled_stylesheet(port_label, "color: #DDD;")
add_udp_layout.addWidget(port_label)
add_udp_layout.addWidget(self.udp_port_input)
add_udp_layout.addWidget(add_udp_btn)
@ -113,7 +186,7 @@ class CommPanel(QWidget):
# ========== Serial 區域 ==========
serial_title = QLabel("Serial")
serial_title.setStyleSheet("""
_set_scaled_stylesheet(serial_title, """
color: #DDD;
font-size: 14px;
font-weight: bold;
@ -136,7 +209,7 @@ class CommPanel(QWidget):
add_serial_layout.setContentsMargins(0, 0, 0, 0)
self.serial_port_combo = QComboBox()
self.serial_port_combo.setStyleSheet("""
_set_scaled_stylesheet(self.serial_port_combo, """
QComboBox {
background-color: #333;
color: #DDD;
@ -160,7 +233,7 @@ class CommPanel(QWidget):
refresh_ports_btn.setFixedWidth(35)
refresh_ports_btn.clicked.connect(self._refresh_serial_ports)
refresh_ports_btn.setToolTip("重新掃描串口")
refresh_ports_btn.setStyleSheet("""
_set_scaled_stylesheet(refresh_ports_btn, """
QPushButton {
background-color: #444;
color: #DDD;
@ -176,7 +249,7 @@ class CommPanel(QWidget):
self.serial_baudrate_combo.addItems(['9600', '19200', '38400', '57600', '115200'])
self.serial_baudrate_combo.setCurrentText('57600')
self.serial_baudrate_combo.setFixedWidth(100)
self.serial_baudrate_combo.setStyleSheet("""
_set_scaled_stylesheet(self.serial_baudrate_combo, """
QComboBox {
background-color: #333;
color: #DDD;
@ -197,7 +270,7 @@ class CommPanel(QWidget):
add_serial_btn = QPushButton("添加")
add_serial_btn.clicked.connect(self._handle_add_serial)
add_serial_btn.setStyleSheet("""
_set_scaled_stylesheet(add_serial_btn, """
QPushButton {
background-color: #4CAF50;
color: white;
@ -209,10 +282,14 @@ class CommPanel(QWidget):
QPushButton:hover { background-color: #45a049; }
""")
add_serial_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;"))
serial_port_label = QLabel("Port:")
_set_scaled_stylesheet(serial_port_label, "color: #DDD;")
add_serial_layout.addWidget(serial_port_label)
add_serial_layout.addWidget(self.serial_port_combo)
add_serial_layout.addWidget(refresh_ports_btn)
add_serial_layout.addWidget(QLabel("Baud:", styleSheet="color: #DDD;"))
baud_label = QLabel("Baud:")
_set_scaled_stylesheet(baud_label, "color: #DDD;")
add_serial_layout.addWidget(baud_label)
add_serial_layout.addWidget(self.serial_baudrate_combo)
add_serial_layout.addWidget(add_serial_btn)
@ -225,7 +302,7 @@ class CommPanel(QWidget):
# ========== WebSocket 區域 ==========
ws_title = QLabel("WebSocket")
ws_title.setStyleSheet("""
_set_scaled_stylesheet(ws_title, """
color: #DDD;
font-size: 14px;
font-weight: bold;
@ -249,7 +326,7 @@ class CommPanel(QWidget):
self.ws_url_input = QLineEdit()
self.ws_url_input.setPlaceholderText("host")
self.ws_url_input.setStyleSheet("""
_set_scaled_stylesheet(self.ws_url_input, """
QLineEdit {
background-color: #333;
color: #DDD;
@ -261,7 +338,7 @@ class CommPanel(QWidget):
add_ws_btn = QPushButton("添加")
add_ws_btn.clicked.connect(self._handle_add_ws)
add_ws_btn.setStyleSheet("""
_set_scaled_stylesheet(add_ws_btn, """
QPushButton {
background-color: #4CAF50;
color: white;
@ -273,7 +350,9 @@ class CommPanel(QWidget):
QPushButton:hover { background-color: #45a049; }
""")
add_ws_layout.addWidget(QLabel("URL:", styleSheet="color: #DDD;"))
url_label = QLabel("URL:")
_set_scaled_stylesheet(url_label, "color: #DDD;")
add_ws_layout.addWidget(url_label)
add_ws_layout.addWidget(self.ws_url_input)
add_ws_layout.addWidget(add_ws_btn)
@ -437,7 +516,7 @@ class CommPanel(QWidget):
def create_udp_connection_panel(self, conn):
"""創建 UDP 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
_set_scaled_stylesheet(panel, """
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
@ -451,22 +530,22 @@ class CommPanel(QWidget):
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['ip']}:{conn['port']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
_set_scaled_stylesheet(info_label, "color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
_set_scaled_stylesheet(status_label, "color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
_set_scaled_stylesheet(status_label, "color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.udp_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
_set_scaled_stylesheet(toggle_btn, """
QPushButton {
background-color: #444;
color: #DDD;
@ -481,7 +560,7 @@ class CommPanel(QWidget):
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.udp_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
_set_scaled_stylesheet(remove_btn, """
QPushButton {
background-color: #d32f2f;
color: white;
@ -509,7 +588,7 @@ class CommPanel(QWidget):
def create_ws_connection_panel(self, conn):
"""創建 WebSocket 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
_set_scaled_stylesheet(panel, """
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
@ -523,22 +602,22 @@ class CommPanel(QWidget):
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['url']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
_set_scaled_stylesheet(info_label, "color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
_set_scaled_stylesheet(status_label, "color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
_set_scaled_stylesheet(status_label, "color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.ws_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
_set_scaled_stylesheet(toggle_btn, """
QPushButton {
background-color: #444;
color: #DDD;
@ -553,7 +632,7 @@ class CommPanel(QWidget):
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.ws_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
_set_scaled_stylesheet(remove_btn, """
QPushButton {
background-color: #d32f2f;
color: white;
@ -605,7 +684,7 @@ class CommPanel(QWidget):
def create_serial_connection_panel(self, conn):
"""創建 Serial 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
_set_scaled_stylesheet(panel, """
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
@ -619,22 +698,22 @@ class CommPanel(QWidget):
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['port']} @ {conn['baudrate']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
_set_scaled_stylesheet(info_label, "color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
_set_scaled_stylesheet(status_label, "color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
_set_scaled_stylesheet(status_label, "color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.serial_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
_set_scaled_stylesheet(toggle_btn, """
QPushButton {
background-color: #444;
color: #DDD;
@ -649,7 +728,7 @@ class CommPanel(QWidget):
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.serial_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
_set_scaled_stylesheet(remove_btn, """
QPushButton {
background-color: #d32f2f;
color: white;
@ -685,3 +764,11 @@ class CommPanel(QWidget):
"""從列表中移除 Serial 連接"""
if conn in self.serial_connections:
self.serial_connections.remove(conn)
def apply_font_scale(self):
"""重新套用目前字體倍率到通訊面板。"""
_apply_scaled_font(self)
_reapply_scaled_stylesheet(self)
for child in self.findChildren(QWidget):
_apply_scaled_font(child)
_reapply_scaled_stylesheet(child)

@ -16,6 +16,10 @@ from PyQt6.QtCore import QObject, pyqtSignal
from pymavlink import mavutil
def _log(level, message):
print(f"[{level}] {message}")
class CommandSender(ABC):
"""指令發送抽象介面"""
@ -63,7 +67,7 @@ class MavlinkSender(CommandSender):
"""
self.connection_string = connection_string
self.mav = mavutil.mavlink_connection(connection_string)
print(f"MavlinkSender 已建立連線: {connection_string}")
_log("INFO", f"MavlinkSender 已建立連線: {connection_string}")
def send_position_global(self, drone_id, sysid, lat, lon, alt):
"""發送 SET_POSITION_TARGET_GLOBAL_INT。drone_id 未使用,保留為介面相容。"""
@ -86,7 +90,7 @@ class MavlinkSender(CommandSender):
if self.mav:
self.mav.close()
self.mav = None
print("MavlinkSender 已關閉")
_log("INFO", "MavlinkSender 已關閉")
class Ros2CommandSender(QObject):
@ -162,4 +166,4 @@ class Ros2CommandSender(QObject):
if not task.done():
task.cancel()
self._pending.clear()
print("Ros2CommandSender 已關閉")
_log("INFO", "Ros2CommandSender 已關閉")

@ -12,6 +12,10 @@ import socket
import sys
import os
import traceback
try:
import serial
except ImportError:
serial = None
from pymavlink import mavutil
from geometry_msgs.msg import Point, Vector3, Vector3Stamped, PoseWithCovarianceStamped
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
@ -20,6 +24,11 @@ from mavros_msgs.msg import State, VfrHud
from nav_msgs.msg import Odometry
from mavros_msgs.srv import CommandBool, CommandTOL
def _log(level, message):
print(f"[{level}] {message}")
# 確保 src 目錄在 Python 路徑中(用於 fc_network_apps 導入)
_src_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _src_path not in sys.path:
@ -30,10 +39,9 @@ try:
from fc_network_apps.longCommand import CommandLongClient
except ImportError as e:
import traceback
print(f"⚠️ 警告: 無法導入 CommandLongClient")
print(f" 错误: {e}")
print(f" 这通常表示 ROS2 的 fc_interfaces 包未被编译或未正确安装")
print(f" 完整堆栈跟踪:")
_log("WARN", "無法導入 CommandLongClient")
_log("ERROR", f"錯誤: {e}")
_log("WARN", "這通常表示 ROS2 的 fc_interfaces 套件尚未編譯或安裝不完整")
traceback.print_exc()
CommandLongClient = None
@ -42,14 +50,168 @@ try:
from fc_network_apps.navigation import PositionTargetGlobalIntClient
except ImportError as e:
import traceback
print(f"⚠️ 警告: 無法導入 PositionTargetGlobalIntClient")
print(f" 错误: {e}")
_log("WARN", "無法導入 PositionTargetGlobalIntClient")
_log("ERROR", f"錯誤: {e}")
traceback.print_exc()
PositionTargetGlobalIntClient = None
try:
from fc_interfaces.msg import AttitudeRaw
except ImportError as e:
_log("WARN", "無法導入 AttitudeRaw")
_log("ERROR", f"錯誤: {e}")
AttitudeRaw = None
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class JsonTelemetryProcessor:
"""共用 WebSocket JSON telemetry 轉換器。
Canonical JSON fields:
{
"system_id": 1,
"mode": "GUIDED",
"battery": 85,
"position": {"lat": 24.0, "lon": 120.0},
"heading": 90
}
Serial JSON uses this same shape; only the transport/framing differs.
"""
def _emit_json_connection_type(self, drone_id):
self.signals.update_signal.emit('connection_type', drone_id, {
'type': self.source_type
})
def process_json_telemetry_message(self, data):
"""處理 WebSocket JSON 格式的遙測資料。"""
try:
if isinstance(data, list):
for item in data:
if isinstance(item, dict):
self.process_json_telemetry_message(item)
return
if not isinstance(data, dict):
return
system_id = data.get('system_id', data.get('sysid'))
if system_id is None:
return
drone_id = f"s{self.socket_id}_{system_id}"
self._emit_json_connection_type(drone_id)
mode = data.get('mode', data.get('mode_name'))
state = {}
if mode is not None:
state['mode'] = mode
if 'armed' in data:
state['armed'] = data.get('armed')
if state:
self.signals.update_signal.emit('state', drone_id, state)
if 'battery' in data:
battery = data['battery']
if isinstance(battery, dict):
battery_data = {}
if 'percentage' in battery:
battery_data['percentage'] = battery.get('percentage')
elif 'percent' in battery:
battery_data['percentage'] = battery.get('percent')
if 'voltage' in battery:
battery_data['voltage'] = battery.get('voltage')
elif 'voltage_v' in battery:
battery_data['voltage'] = battery.get('voltage_v')
if battery_data:
self.signals.update_signal.emit('battery', drone_id, battery_data)
else:
self.signals.update_signal.emit('battery', drone_id, {
'percentage': battery
})
elif 'battery_percentage' in data or 'battery_voltage' in data:
battery_data = {}
if 'battery_percentage' in data:
battery_data['percentage'] = data.get('battery_percentage')
if 'battery_voltage' in data:
battery_data['voltage'] = data.get('battery_voltage')
self.signals.update_signal.emit('battery', drone_id, battery_data)
pos = data.get('position')
if isinstance(pos, dict):
gps_data = {
'lat': pos.get('lat', pos.get('latitude', 0)),
'lon': pos.get('lon', pos.get('longitude', 0)),
'alt': pos.get('alt', pos.get('altitude', 0))
}
self.signals.update_signal.emit('gps', drone_id, gps_data)
elif 'lat' in data or 'latitude' in data:
self.signals.update_signal.emit('gps', drone_id, {
'lat': data.get('lat', data.get('latitude', 0)),
'lon': data.get('lon', data.get('longitude', 0)),
'alt': data.get('alt', data.get('altitude', 0))
})
local = data.get('local_position', data.get('local_pose', data.get('local')))
if isinstance(local, dict):
x = local.get('x', 0.0)
y = local.get('y', 0.0)
z = local.get('z', 0.0)
self.signals.update_signal.emit('local_pose', drone_id, {
'x': x,
'y': y,
'z': z
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': z
})
elif isinstance(pos, dict):
alt = pos.get('alt', pos.get('altitude', 0.0))
self.signals.update_signal.emit('local_pose', drone_id, {
'x': 0.0,
'y': 0.0,
'z': alt
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': alt
})
velocity = data.get('velocity')
if isinstance(velocity, dict):
self.signals.update_signal.emit('velocity', drone_id, {
'vx': velocity.get('vx', velocity.get('x', 0.0)),
'vy': velocity.get('vy', velocity.get('y', 0.0)),
'vz': velocity.get('vz', velocity.get('z', 0.0))
})
attitude = data.get('attitude')
if isinstance(attitude, dict):
self.signals.update_signal.emit('attitude', drone_id, {
'roll': attitude.get('roll', 0.0),
'pitch': attitude.get('pitch', 0.0),
'yaw': attitude.get('yaw', 0.0),
'rates': attitude.get('rates', (0.0, 0.0, 0.0))
})
hud = data.get('hud', {})
if not isinstance(hud, dict):
hud = {}
if 'heading' in data:
hud['heading'] = data.get('heading')
if hud:
self.signals.update_signal.emit('hud', drone_id, {
'heading': hud.get('heading', 0.0),
'groundspeed': hud.get('groundspeed', 0.0),
'airspeed': hud.get('airspeed', 0.0),
'throttle': hud.get('throttle', 0.0),
'alt': hud.get('alt', hud.get('altitude', 0.0)),
'climb': hud.get('climb', 0.0)
})
except Exception as e:
print(f"{self.source_type} JSON telemetry processing error: {e}")
class UDPMavlinkReceiver(threading.Thread):
"""UDP MAVLink 接收器"""
def __init__(self, ip, port, signals, connection_name, monitor=None):
@ -173,8 +335,8 @@ class UDPMavlinkReceiver(threading.Thread):
"""停止接收器"""
self.running = False
class SerialMavlinkReceiver(threading.Thread):
"""串口 MAVLink 接收器"""
class SerialMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
"""串口遙測接收器,可自動處理 MAVLink 或 WebSocket 格式 JSON。"""
def __init__(self, port, baudrate, signals, connection_name, monitor=None):
super().__init__(daemon=True)
self.port = port
@ -183,47 +345,125 @@ class SerialMavlinkReceiver(threading.Thread):
self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0
self.source_type = 'Serial'
self.running = False
self.mav = None
self.serial_conn = None
self.mav_parser = mavutil.mavlink.MAVLink(None)
self.mav_parser.robust_parsing = True
self.json_buffer = []
self.json_depth = 0
self.json_in_string = False
self.json_escape = False
self._detected_protocols = set()
def run(self):
"""執行串口接收循環"""
"""執行串口接收循環"""
self.running = True
try:
print(f"Serial MAVLink receiver started on {self.port} at {self.baudrate} baud")
print(f"Serial receiver started on {self.port} at {self.baudrate} baud (MAVLink/JSON auto detect)")
if serial is None:
raise RuntimeError("pyserial 未安裝,無法啟動 Serial 連線")
# 創建 MAVLink 串口連接
self.mav = mavutil.mavlink_connection(
self.serial_conn = serial.Serial(
self.port,
baud=self.baudrate,
source_system=255
self.baudrate,
timeout=0.2
)
print(f"Waiting for heartbeat from {self.port}...")
self.mav.wait_heartbeat()
print(f"Heartbeat received from system {self.mav.target_system}, component {self.mav.target_component}")
while self.running:
try:
msg = self.mav.recv_match(blocking=True, timeout=1.0)
if msg is None:
chunk = self.serial_conn.read(256)
if not chunk:
continue
self.process_mavlink_message(msg)
for raw_byte in chunk:
byte = bytes([raw_byte])
self._process_json_byte(byte)
self._process_mavlink_byte(byte)
except Exception as e:
if self.running:
print(f"Error receiving MAVLink message from serial: {e}")
print(f"Error receiving serial telemetry: {e}")
except Exception as e:
print(f"Serial receiver error: {e}")
finally:
if self.mav:
if self.serial_conn:
try:
self.mav.close()
except:
self.serial_conn.close()
except Exception:
pass
def _process_mavlink_byte(self, byte):
try:
msg = self.mav_parser.parse_char(byte)
if msg is None:
return
if 'MAVLink' not in self._detected_protocols:
print(f"Serial {self.connection_name} detected MAVLink")
self._detected_protocols.add('MAVLink')
self.process_mavlink_message(msg)
except Exception:
# MAVLink parser is deliberately fed the whole stream, including JSON bytes.
return
def _process_json_byte(self, byte):
try:
char = byte.decode('utf-8')
except UnicodeDecodeError:
if self.json_buffer:
self._reset_json_framing()
return
if not self.json_buffer:
if char.isspace():
return
if char not in ('{', '['):
return
self.json_depth = 1
self.json_in_string = False
self.json_escape = False
self.json_buffer = [char]
return
self.json_buffer.append(char)
if self.json_in_string:
if self.json_escape:
self.json_escape = False
elif char == '\\':
self.json_escape = True
elif char == '"':
self.json_in_string = False
return
if char == '"':
self.json_in_string = True
elif char in ('{', '['):
self.json_depth += 1
elif char in ('}', ']'):
self.json_depth -= 1
if self.json_depth > 0:
return
payload = ''.join(self.json_buffer)
self._reset_json_framing()
try:
data = json.loads(payload)
if 'JSON' not in self._detected_protocols:
print(f"Serial {self.connection_name} detected JSON")
self._detected_protocols.add('JSON')
self.process_json_telemetry_message(data)
except json.JSONDecodeError as e:
print(f"Serial {self.connection_name} JSON decode error: {e}")
def _reset_json_framing(self):
self.json_buffer = []
self.json_depth = 0
self.json_in_string = False
self.json_escape = False
def process_mavlink_message(self, msg):
"""處理 MAVLink 訊息"""
try:
@ -306,15 +546,23 @@ class SerialMavlinkReceiver(threading.Thread):
def stop(self):
"""停止接收器"""
self.running = False
if self.serial_conn:
try:
self.serial_conn.close()
except Exception:
pass
class WebSocketMavlinkReceiver(threading.Thread):
class WebSocketMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
"""WebSocket MAVLink 接收器"""
def __init__(self, url, signals, connection_name, monitor=None):
super().__init__(daemon=True)
self.url = url
self.signals = signals
self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用 self.socket_id = monitor.get_next_socket_id() if monitor else 0 # 一次性分配 socket_id self.running = False
self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0 # 一次性分配 socket_id
self.source_type = 'WS'
self.running = False
self.max_retries = 5
self.base_delay = 1.0
@ -342,7 +590,8 @@ class WebSocketMavlinkReceiver(threading.Thread):
try:
data = json.loads(message)
data['_connection_source'] = self.connection_name
if isinstance(data, dict):
data['_connection_source'] = self.connection_name
self.process_websocket_message(data)
except json.JSONDecodeError as e:
print(f"WebSocket {self.connection_name} JSON decode error: {e}")
@ -372,57 +621,7 @@ class WebSocketMavlinkReceiver(threading.Thread):
def process_websocket_message(self, data):
"""處理 WebSocket 訊息"""
try:
system_id = data.get('system_id')
if not system_id:
return
drone_id = f"s{self.socket_id}_{system_id}"
# 模式
if 'mode' in data:
# 先發送連接類型資訊
self.signals.update_signal.emit('connection_type', drone_id, {
'type': 'WS'
})
self.signals.update_signal.emit('state', drone_id, {
'mode': data['mode'],
})
# 電池
if 'battery' in data:
self.signals.update_signal.emit('battery', drone_id, {
'percentage': data['battery']
})
# 位置
if 'position' in data:
pos = data['position']
self.signals.update_signal.emit('gps', drone_id, {
'lat': pos.get('lat', 0),
'lon': pos.get('lon', 0)
})
# Local position - 設定 x, y 為 0.0
self.signals.update_signal.emit('local_pose', drone_id, {
'x': 0.0,
'y': 0.0,
'z': 0.0
})
# Altitude - 設定為 0.0
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': 0.0
})
# 航向
if 'heading' in data:
self.signals.update_signal.emit('hud', drone_id, {
'heading': data['heading'],
'groundspeed': 0.0
})
except Exception as e:
print(f"WebSocket message processing error: {e}")
self.process_json_telemetry_message(data)
def stop(self):
"""停止接收器"""
@ -528,25 +727,25 @@ class DroneMonitor(Node):
unique_name = f"cmd_long_client_{drone_id}_{self.client_counter}"
client = CommandLongClient(node_name=unique_name)
self.command_long_clients[drone_id] = client
print(f" ✓ 為 {drone_id} 創建新的 CommandLongClient (node={unique_name})")
_log("INFO", f"已為 {drone_id} 建立 CommandLongClient (node={unique_name})")
# 將新 client 添加到主執行器(這樣它的回調才能被處理)
if self.executor:
self.executor.add_node(client)
print(f" ✓ 將 {drone_id} 的 client 添加到主執行器")
_log("INFO", f"已將 {drone_id} 的 CommandLongClient 加入主執行器")
except TypeError:
# 舊版 CommandLongClient 不支持 node_name 參數,使用預設
client = CommandLongClient()
self.command_long_clients[drone_id] = client
print(f" ✓ 為 {drone_id} 創建新的 CommandLongClient (使用預設名稱)")
_log("INFO", f"已為 {drone_id} 建立 CommandLongClient (使用預設名稱)")
if self.executor:
self.executor.add_node(client)
print(f" ✓ 將 {drone_id} 的 client 添加到主執行器")
_log("INFO", f"已將 {drone_id} 的 CommandLongClient 加入主執行器")
except Exception as e:
print(f"⚠️ 無法為 {drone_id} 建 CommandLongClient: {e}")
_log("WARN", f"無法為 {drone_id} CommandLongClient: {e}")
return None
return self.command_long_clients[drone_id]
@ -561,12 +760,12 @@ class DroneMonitor(Node):
unique_name = f"pos_target_client_{drone_id}_{self.pos_client_counter}"
client = PositionTargetGlobalIntClient(node_name=unique_name)
self.position_target_clients[drone_id] = client
print(f" ✓ 為 {drone_id} 創建新的 PositionTargetGlobalIntClient (node={unique_name})")
_log("INFO", f"已為 {drone_id} 建立 PositionTargetGlobalIntClient (node={unique_name})")
if self.executor:
self.executor.add_node(client)
print(f" ✓ 將 {drone_id} 的 position client 添加到主執行器")
_log("INFO", f"已將 {drone_id} 的 PositionTargetGlobalIntClient 加入主執行器")
except Exception as e:
print(f"⚠️ 無法為 {drone_id} 建 PositionTargetGlobalIntClient: {e}")
_log("WARN", f"無法為 {drone_id} PositionTargetGlobalIntClient: {e}")
return None
return self.position_target_clients[drone_id]
@ -593,7 +792,7 @@ class DroneMonitor(Node):
if not hasattr(self, subs_attr):
self.setup_drone(sys_id)
else:
# 檢查既有訂閱是否包含 position_ned,如果不包含就添加(兼容舊訂閱)
# 檢查既有訂閱是否包含 position_ned / attitude,如果不包含就添加(兼容舊訂閱)
subs = getattr(self, subs_attr, {})
if isinstance(subs, dict) and 'position_ned' not in subs:
base_topic = f'/fc_network/vehicle/{sys_id}'
@ -608,6 +807,19 @@ class DroneMonitor(Node):
setattr(self, subs_attr, subs) # 明確保存更新後的字典
except Exception as e:
pass
if isinstance(subs, dict) and 'attitude' not in subs and AttitudeRaw is not None:
base_topic = f'/fc_network/vehicle/{sys_id}'
try:
attitude_sub = self.create_subscription(
AttitudeRaw,
f'{base_topic}/attitude',
lambda msg, sid=sys_id: self.attitude_callback(sid, msg),
10
)
subs['attitude'] = attitude_sub
setattr(self, subs_attr, subs)
except Exception:
pass
def setup_drone(self, sys_id):
# sys_id 格式: sys11, sys12, ...
@ -663,6 +875,14 @@ class DroneMonitor(Node):
)
}
if AttitudeRaw is not None:
subs['attitude'] = self.create_subscription(
AttitudeRaw,
f'{base_topic}/attitude',
lambda msg, sid=sys_id: self.attitude_callback(sid, msg),
10
)
setattr(self, f'drone_{sys_id}_subs', subs)
# ================================================================================
@ -699,22 +919,22 @@ class DroneMonitor(Node):
# 解析 drone_id 提取 sysid
parts = drone_id.split('_')
if len(parts) < 2:
print(f"[SET_MODE] 無效的 drone_id 格式: {drone_id}")
_log("ERROR", f"[SET_MODE] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
# 獲取模式對應的 custom_mode 值
custom_mode = self.MODE_MAPPING.get(mode_name)
if custom_mode is None:
print(f"[SET_MODE] 未知模式: {mode_name}")
_log("ERROR", f"[SET_MODE] 未知模式: {mode_name}")
return False
print(f"\n📢 [SET_MODE] {drone_id} {mode_name} (custom_mode={custom_mode})")
_log("INFO", f"[SET_MODE] {drone_id} -> {mode_name} (custom_mode={custom_mode})")
# 獲取或創建該 drone 專用的 client避免多機並行時的競態條件
client = self.get_or_create_client(drone_id)
if not client:
print(f"[SET_MODE] CommandLongClient 無法初始化")
_log("ERROR", "[SET_MODE] CommandLongClient 無法初始化")
return False
# 直接調用 async 方法,無需線程池(避免嵌套執行器衝突)
@ -727,13 +947,13 @@ class DroneMonitor(Node):
)
if result and result.success:
print(f"✅ [SET_MODE] 模式切換成功")
_log("INFO", f"[SET_MODE] {drone_id} 模式切換成功")
return True
else:
print(f"[SET_MODE] 模式切換失敗 (message={result.message if result else 'None'})")
_log("ERROR", f"[SET_MODE] 模式切換失敗 (message={result.message if result else 'None'})")
return False
except Exception as e:
print(f"[SET_MODE] 例外錯誤: {e}")
_log("ERROR", f"[SET_MODE] 例外錯誤: {e}")
traceback.print_exc()
return False
@ -743,17 +963,17 @@ class DroneMonitor(Node):
# 解析 drone_id 提取 sysid
parts = drone_id.split('_')
if len(parts) < 2:
print(f"[ARM] 無效的 drone_id 格式: {drone_id}")
_log("ERROR", f"[ARM] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
action_name = "解鎖" if arm else "上鎖"
print(f"\n📢 [ARM] {drone_id} {action_name}")
_log("INFO", f"[ARM] {drone_id} -> {action_name}")
# 獲取或創建該 drone 專用的 client避免多機並行時的競態條件
client = self.get_or_create_client(drone_id)
if not client:
print(f"[ARM] CommandLongClient 無法初始化")
_log("ERROR", "[ARM] CommandLongClient 無法初始化")
return False
# 直接調用 async 方法,無需線程池(避免嵌套執行器衝突)
@ -765,13 +985,13 @@ class DroneMonitor(Node):
)
if result and result.success:
print(f"✅ [ARM] {action_name}成功")
_log("INFO", f"[ARM] {drone_id} {action_name}成功")
return True
else:
print(f"❌ [ARM] {action_name}失敗 (message={result.message if result else 'None'})")
_log("ERROR", f"[ARM] {drone_id} {action_name}失敗 (message={result.message if result else 'None'})")
return False
except Exception as e:
print(f"[ARM] 例外錯誤: {e}")
_log("ERROR", f"[ARM] 例外錯誤: {e}")
traceback.print_exc()
return False
@ -781,16 +1001,16 @@ class DroneMonitor(Node):
# 解析 drone_id 提取 sysid
parts = drone_id.split('_')
if len(parts) < 2:
print(f"[TAKEOFF] 無效的 drone_id 格式: {drone_id}")
_log("ERROR", f"[TAKEOFF] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
print(f"\n📢 [TAKEOFF] {drone_id} 起飛 (高度={altitude}m)")
_log("INFO", f"[TAKEOFF] {drone_id} -> 起飛 (高度={altitude}m)")
# 獲取或創建該 drone 專用的 client避免多機並行時的競態條件
client = self.get_or_create_client(drone_id)
if not client:
print(f"[TAKEOFF] CommandLongClient 無法初始化")
_log("ERROR", "[TAKEOFF] CommandLongClient 無法初始化")
return False
# 直接調用 async 方法,無需線程池(避免嵌套執行器衝突)
@ -802,13 +1022,13 @@ class DroneMonitor(Node):
)
if result and result.success:
print(f"✅ [TAKEOFF] 起飛成功")
_log("INFO", f"[TAKEOFF] {drone_id} 起飛成功")
return True
else:
print(f"[TAKEOFF] 起飛失敗 (message={result.message if result else 'None'})")
_log("ERROR", f"[TAKEOFF] 起飛失敗 (message={result.message if result else 'None'})")
return False
except Exception as e:
print(f"[TAKEOFF] 例外錯誤: {e}")
_log("ERROR", f"[TAKEOFF] 例外錯誤: {e}")
traceback.print_exc()
return False
@ -840,17 +1060,42 @@ class DroneMonitor(Node):
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# callbacks
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.latest_data[(drone_id, 'attitude')] = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
}
def attitude_callback(self, sys_id, msg):
"""處理姿態 topic支援 AttitudeRaw 與 IMU 四元數格式。"""
actual_drone_id = self.sys_to_actual_id.get(sys_id, None)
if actual_drone_id is None:
return
try:
if hasattr(msg, 'roll') and hasattr(msg, 'pitch') and hasattr(msg, 'yaw'):
data = {
'roll': math.degrees(msg.roll),
'pitch': math.degrees(msg.pitch),
'yaw': math.degrees(msg.yaw),
'rates': (
getattr(msg, 'rollspeed', 0.0),
getattr(msg, 'pitchspeed', 0.0),
getattr(msg, 'yawspeed', 0.0),
)
}
elif hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
data = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (
msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z,
)
}
else:
return
self.latest_data[(actual_drone_id, 'attitude')] = data
except Exception as e:
print(f"Error parsing attitude msg for {sys_id}: {e}")
def battery_callback(self, sys_id, msg):
# 使用映射獲取實際的 drone_id
@ -903,7 +1148,7 @@ class DroneMonitor(Node):
sys_num = sys_id.replace('sys', '')
actual_drone_id = f's{assigned_socket_id}_{sys_num}'
self.sys_to_actual_id[sys_id] = actual_drone_id
print(f"[DEBUG] summary_callback: 已創建映射 {sys_id} -> {actual_drone_id} (使用 sys_num)")
_log("INFO", f"summary_callback 已建立映射 {sys_id} -> {actual_drone_id} (使用 sys_num)")
# 先發送連接類型資訊
self.signals.update_signal.emit('connection_type', actual_drone_id, {
@ -1003,6 +1248,9 @@ class DroneMonitor(Node):
x = msg.pose.pose.position.y # NED 座標系中交換 x/y與 local_pose 對齐)
y = msg.pose.pose.position.x
z = -msg.pose.pose.position.z # 將向下的 NED z 轉換為向上的高度z 為負表示向下)
vx = msg.twist.twist.linear.y
vy = msg.twist.twist.linear.x
vz = -msg.twist.twist.linear.z
# 儲存高度信息
self.latest_data[(actual_drone_id, 'altitude')] = {
@ -1016,6 +1264,13 @@ class DroneMonitor(Node):
'z': z
}
# 儲存速度資訊供總覽頁「XY速度」欄位顯示
self.latest_data[(actual_drone_id, 'velocity')] = {
'vx': vx,
'vy': vy,
'vz': vz
}
# 發送信號給 GUI 更新高度顯示
self.signals.update_signal.emit('altitude', actual_drone_id, {
'altitude': z
@ -1034,10 +1289,10 @@ class DroneMonitor(Node):
}
def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=57600):
"""啟動串口 MAVLink 連接"""
"""啟動串口遙測連接(自動辨識 MAVLink / JSON"""
connection_name = f"Serial_{port.replace('/', '_')}"
receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name, self)
receiver.start()
self.serial_receivers.append(receiver)
print(f"Started serial connection on {port} at {baudrate} baud")
_log("INFO", f"已啟動 Serial 連線: {port} @ {baudrate} baud (MAVLink/JSON)")
return receiver

@ -1,10 +1,77 @@
#!/usr/bin/env python3
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, QCheckBox)
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel, QCheckBox, QApplication)
from PyQt6.QtCore import pyqtSignal
from PyQt6.QtGui import QPainter, QPen, QColor, QFont, QPolygonF
from PyQt6.QtCore import QPointF, Qt
import math
def _get_font_scale():
app = QApplication.instance()
if app is None:
return 1.0
scale = app.property("font_scale")
try:
return float(scale) if scale is not None else 1.0
except (TypeError, ValueError):
return 1.0
def _scale_stylesheet_font_sizes(stylesheet, scale):
if not stylesheet or 'font-size' not in stylesheet:
return stylesheet
import re
def repl(match):
size = float(match.group(1))
unit = match.group(2)
scaled = max(1.0, size * scale)
text = f"{scaled:.2f}".rstrip('0').rstrip('.')
return f"font-size: {text}{unit}"
return re.sub(r'font-size\s*:\s*([0-9]+(?:\.[0-9]+)?)\s*(px|pt)', repl, stylesheet)
def _set_scaled_stylesheet(widget, stylesheet):
widget._base_stylesheet = stylesheet
scaled = _scale_stylesheet_font_sizes(stylesheet, _get_font_scale())
widget._applied_stylesheet = scaled
widget.setStyleSheet(scaled)
def _reapply_scaled_stylesheet(widget):
current_stylesheet = widget.styleSheet()
base_stylesheet = getattr(widget, '_base_stylesheet', None)
applied_stylesheet = getattr(widget, '_applied_stylesheet', None)
if current_stylesheet != applied_stylesheet:
base_stylesheet = current_stylesheet
widget._base_stylesheet = base_stylesheet
if base_stylesheet is not None:
scaled = _scale_stylesheet_font_sizes(base_stylesheet, _get_font_scale())
widget._applied_stylesheet = scaled
if current_stylesheet != scaled:
widget.setStyleSheet(scaled)
def _apply_scaled_font(widget):
base_font = getattr(widget, '_base_font_for_scale', None)
if base_font is None:
app = QApplication.instance()
app_base_font = app.property("base_app_font") if app else None
base_font = QFont(app_base_font) if app_base_font is not None else QFont(widget.font())
widget._base_font_for_scale = QFont(base_font)
scaled_font = QFont(base_font)
scale = _get_font_scale()
if base_font.pointSizeF() > 0:
scaled_font.setPointSizeF(max(1.0, base_font.pointSizeF() * scale))
elif base_font.pointSize() > 0:
scaled_font.setPointSize(max(1, int(round(base_font.pointSize() * scale))))
widget.setFont(scaled_font)
class DronePanel(QWidget):
"""單個無人機面板類別"""
@ -32,12 +99,13 @@ class DronePanel(QWidget):
self.attitude_indicator = None
self._init_ui()
self.apply_font_scale()
def _init_ui(self):
"""初始化UI"""
self.setObjectName(f"panel_{self.drone_id}")
self.setFixedHeight(140)
self.setStyleSheet("""
_set_scaled_stylesheet(self, """
background-color: #2A2A2A;
border-radius: 8px;
""")
@ -49,7 +117,7 @@ class DronePanel(QWidget):
# 創建內容容器(包含 info 和 control
content_widget = QWidget()
content_widget.setStyleSheet("background-color: #333; border-radius: 6px;")
_set_scaled_stylesheet(content_widget, "background-color: #333; border-radius: 6px;")
content_layout = QHBoxLayout(content_widget)
content_layout.setContentsMargins(8, 8, 8, 8)
content_layout.setSpacing(8)
@ -82,7 +150,7 @@ class DronePanel(QWidget):
# 勾選框
self.checkbox = QCheckBox()
self.checkbox.setObjectName(f"{self.drone_id}_checkbox")
self.checkbox.setStyleSheet("""
_set_scaled_stylesheet(self.checkbox, """
QCheckBox {
color: #DDD;
}
@ -103,8 +171,8 @@ class DronePanel(QWidget):
)
# ID 顯示
id_label = QLabel(self.display_id)
id_label.setStyleSheet("""
self.id_label = QLabel(self.display_id)
_set_scaled_stylesheet(self.id_label, """
font-weight: bold;
font-size: 14px;
color: #7FFFD4;
@ -112,7 +180,7 @@ class DronePanel(QWidget):
""")
header_layout.addWidget(self.checkbox)
header_layout.addWidget(id_label)
header_layout.addWidget(self.id_label)
header_layout.addStretch()
info_layout.addWidget(header)
@ -148,15 +216,15 @@ class DronePanel(QWidget):
status_layout.setContentsMargins(0, 0, 0, 0)
status_title = QLabel("狀態:")
status_title.setStyleSheet("color: #888; min-width: 50px;")
_set_scaled_stylesheet(status_title, "color: #888; min-width: 50px;")
self.mode_label = QLabel("--")
self.mode_label.setObjectName(f"{self.drone_id}_mode")
self.mode_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.mode_label, "color: #DDD;")
self.armed_label = QLabel("--")
self.armed_label.setObjectName(f"{self.drone_id}_armed")
self.armed_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.armed_label, "color: #DDD;")
status_layout.addWidget(status_title)
status_layout.addWidget(self.mode_label)
@ -172,15 +240,15 @@ class DronePanel(QWidget):
connection_layout.setContentsMargins(0, 0, 0, 0)
connection_title = QLabel("Socket")
connection_title.setStyleSheet("color: #888; min-width: 50px;")
_set_scaled_stylesheet(connection_title, "color: #888; min-width: 50px;")
# 根據解析的 drone_id 資訊設定初始值
self.socket_seq_label = QLabel(self.socket_seq)
self.socket_seq_label.setObjectName(f"{self.drone_id}_socket_seq")
self.socket_seq_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.socket_seq_label, "color: #DDD;")
connection_sep = QLabel(" - ")
connection_sep.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(connection_sep, "color: #DDD;")
# 設定連接方式顯示
connection_type_map = {
@ -193,7 +261,7 @@ class DronePanel(QWidget):
self.connection_type_label = QLabel(connection_type)
self.connection_type_label.setObjectName(f"{self.drone_id}_connection_type")
self.connection_type_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.connection_type_label, "color: #DDD;")
connection_layout.addWidget(connection_title)
connection_layout.addWidget(self.socket_seq_label)
@ -210,29 +278,29 @@ class DronePanel(QWidget):
battery_layout.setContentsMargins(0, 0, 0, 0)
# 顯示百分比
battery_title = QLabel("電池:")
battery_title.setStyleSheet("color: #888; min-width: 50px;")
_set_scaled_stylesheet(battery_title, "color: #888; min-width: 50px;")
self.battery_pct_label = QLabel("--")
self.battery_pct_label.setObjectName(f"{self.drone_id}_battery_pct")
self.battery_pct_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.battery_pct_label, "color: #DDD;")
# 分隔符
separator1 = QLabel(" - ")
separator1.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(separator1, "color: #DDD;")
# 顯示電壓
self.battery_vol_label = QLabel("--")
self.battery_vol_label.setObjectName(f"{self.drone_id}_battery_vol")
self.battery_vol_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.battery_vol_label, "color: #DDD;")
# 分隔符
separator2 = QLabel(" - ")
separator2.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(separator2, "color: #DDD;")
# 顯示電池節數 (S count)
self.battery_cells_label = QLabel("--")
self.battery_cells_label.setObjectName(f"{self.drone_id}_battery_cells")
self.battery_cells_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.battery_cells_label, "color: #DDD;")
battery_layout.addWidget(battery_title)
battery_layout.addWidget(self.battery_pct_label)
@ -251,18 +319,18 @@ class DronePanel(QWidget):
altitude_layout.setContentsMargins(0, 0, 0, 0)
altitude_title = QLabel("高度:")
altitude_title.setStyleSheet("color: #888; min-width: 50px;")
_set_scaled_stylesheet(altitude_title, "color: #888; min-width: 50px;")
self.altitude_label = QLabel("--")
self.altitude_label.setObjectName(f"{self.drone_id}_altitude")
self.altitude_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.altitude_label, "color: #DDD;")
speed_title = QLabel("速度:")
speed_title.setStyleSheet("color: #888; margin-left: 10px;")
_set_scaled_stylesheet(speed_title, "color: #888; margin-left: 10px;")
self.speed_label = QLabel("--")
self.speed_label.setObjectName(f"{self.drone_id}_speed")
self.speed_label.setStyleSheet("color: #DDD;")
_set_scaled_stylesheet(self.speed_label, "color: #DDD;")
altitude_layout.addWidget(altitude_title)
altitude_layout.addWidget(self.altitude_label)
@ -321,6 +389,16 @@ class DronePanel(QWidget):
"""獲取勾選狀態"""
return self.checkbox.isChecked()
def apply_font_scale(self):
"""重新套用目前字體倍率到 panel 內所有元件。"""
_apply_scaled_font(self)
_reapply_scaled_stylesheet(self)
for child in self.findChildren(QWidget):
_apply_scaled_font(child)
_reapply_scaled_stylesheet(child)
if self.attitude_indicator:
self.attitude_indicator.update()
class SocketGroupPanel(QWidget):
# 定義信號
group_selection_changed = pyqtSignal(str, int) # socket_id, state
@ -331,11 +409,12 @@ class SocketGroupPanel(QWidget):
self.color = color
self.socket_type = socket_type
self._init_ui()
self.apply_font_scale()
def _init_ui(self):
"""初始化UI"""
self.setObjectName(f"socket_group_{self.socket_id}")
self.setStyleSheet("""
_set_scaled_stylesheet(self, """
background-color: #1E1E1E;
border-radius: 12px;
""")
@ -352,7 +431,7 @@ class SocketGroupPanel(QWidget):
# 分組勾選框
self.group_checkbox = QCheckBox()
self.group_checkbox.setObjectName(f"socket_{self.socket_id}_checkbox")
self.group_checkbox.setStyleSheet(f"""
_set_scaled_stylesheet(self.group_checkbox, f"""
QCheckBox {{ color: #DDD; }}
QCheckBox::indicator {{
width: 14px;
@ -380,7 +459,7 @@ class SocketGroupPanel(QWidget):
else:
title_text = f"Socket {self.socket_id}"
self.title_label = QLabel(title_text)
self.title_label.setStyleSheet(f"""
_set_scaled_stylesheet(self.title_label, f"""
font-weight: bold;
font-size: 16px;
color: {self.color};
@ -430,6 +509,14 @@ class SocketGroupPanel(QWidget):
"""設置分組勾選狀態(支持半選)"""
self.group_checkbox.setCheckState(state)
def apply_font_scale(self):
"""重新套用目前字體倍率到 socket 分組面板。"""
_apply_scaled_font(self)
_reapply_scaled_stylesheet(self)
for child in self.findChildren(QWidget):
_apply_scaled_font(child)
_reapply_scaled_stylesheet(child)
class AttitudeIndicator(QWidget):
"""
@ -503,7 +590,9 @@ class AttitudeIndicator(QWidget):
# pitch ladder (every 10°, ±30°)
p.setPen(QPen(QColor(255, 255, 255, 180), 1))
p.setFont(QFont("Arial", 6))
ladder_font = QFont("Arial")
ladder_font.setPointSizeF(max(1.0, 6 * _get_font_scale()))
p.setFont(ladder_font)
for deg in range(-30, 31, 10):
if deg == 0:
continue
@ -581,6 +670,8 @@ class AttitudeIndicator(QWidget):
# heading text centred (bigger)
p.setPen(QPen(QColor("#FFFFFF")))
p.setFont(QFont("Arial", 10, QFont.Weight.Bold))
heading_font = QFont("Arial", weight=QFont.Weight.Bold)
heading_font.setPointSizeF(max(1.0, 10 * _get_font_scale()))
p.setFont(heading_font)
hdg_str = f"{int(self.heading)}°"
p.drawText(0, int(strip_y), w, strip_h, Qt.AlignmentFlag.AlignCenter, hdg_str)

File diff suppressed because it is too large Load Diff

@ -3,6 +3,11 @@ from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import QTimer, pyqtSignal, QObject, pyqtSlot
from PyQt6.QtWebChannel import QWebChannel
def _log(level, message):
print(f"[{level}] {message}")
class DroneMap:
"""無人機地圖類別 - 負責管理 Leaflet 地圖顯示"""
@ -11,6 +16,7 @@ class DroneMap:
self.map_view = QWebEngineView()
self.map_loaded = False
self.pending_map_updates = {}
self.font_scale = 1.0
# 創建橋接對象
self.bridge = MapBridge()
@ -31,6 +37,7 @@ class DroneMap:
<script src="https://unpkg.com/leaflet-rotatedmarker/leaflet.rotatedMarker.js"></script>
<script src="qrc:///qtwebchannel/qwebchannel.js"></script>
<style>
:root { --ui-font-scale: 1; }
html, body, #map { height: 100%; margin: 0; }
#map {
user-select: none;
@ -65,7 +72,7 @@ class DroneMap:
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 13px;
font-size: calc(13px * var(--ui-font-scale));
font-weight: bold;
box-shadow: 0 2px 5px rgba(0,0,0,0.2);
}
@ -85,7 +92,7 @@ class DroneMap:
}
.mission-info-row {
margin-bottom: 8px;
font-size: 12px;
font-size: calc(12px * var(--ui-font-scale));
color: #333;
}
.mission-info-label {
@ -104,7 +111,7 @@ class DroneMap:
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 13px;
font-size: calc(13px * var(--ui-font-scale));
font-weight: bold;
margin-top: 8px;
}
@ -131,7 +138,7 @@ class DroneMap:
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 13px;
font-size: calc(13px * var(--ui-font-scale));
font-weight: bold;
transition: background-color 0.2s;
}
@ -149,7 +156,7 @@ class DroneMap:
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 13px;
font-size: calc(13px * var(--ui-font-scale));
font-weight: bold;
transition: background-color 0.2s;
}
@ -167,7 +174,7 @@ class DroneMap:
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 13px;
font-size: calc(13px * var(--ui-font-scale));
font-weight: bold;
transition: background-color 0.2s;
}
@ -322,7 +329,7 @@ class DroneMap:
align-items: center;
justify-content: center;
font-weight: bold;
font-size: 12px;
font-size: calc(12px * var(--ui-font-scale));
text-shadow: 1px 1px 2px rgba(255,255,255,0.8), -1px -1px 2px rgba(255,255,255,0.8);
">${sysid}</div>`,
iconSize: [16, 16],
@ -564,7 +571,7 @@ class DroneMap:
'align-items: center;' +
'justify-content: center;' +
'font-weight: bold;' +
'font-size: 11px;' +
'font-size: calc(11px * var(--ui-font-scale));' +
'border: 2px solid white;' +
'box-shadow: 0 2px 5px rgba(0,0,0,0.3);' +
'">' + idx + '</div>',
@ -618,6 +625,10 @@ class DroneMap:
}
// ================================================================================
function setFontScale(scale) {
document.documentElement.style.setProperty('--ui-font-scale', scale);
}
// 開始任務
function startMission() {
if (!centerPosition || !targetPosition) {
@ -802,7 +813,7 @@ class DroneMap:
'width: 22px; height: 22px;' +
'border-radius: 50%;' +
'display: flex; align-items: center; justify-content: center;' +
'font-weight: bold; font-size: 10px;' +
'font-weight: bold; font-size: calc(10px * var(--ui-font-scale));' +
'border: 2px solid white;' +
'box-shadow: 0 2px 5px rgba(0,0,0,0.3);' +
'">' + groupId + '</div>',
@ -822,7 +833,7 @@ class DroneMap:
'width: 22px; height: 22px;' +
'border-radius: 50%;' +
'display: flex; align-items: center; justify-content: center;' +
'font-weight: bold; font-size: 14px;' +
'font-weight: bold; font-size: calc(14px * var(--ui-font-scale));' +
'border: 2px solid white;' +
'box-shadow: 0 2px 5px rgba(0,0,0,0.3);' +
'">★</div>',
@ -873,8 +884,9 @@ class DroneMap:
"""地圖加載完成回調"""
if ok:
self.map_loaded = True
self.set_font_scale(self.font_scale)
else:
print("⚠️ 地圖加載失敗")
_log("ERROR", "地圖載入失敗")
def update_drone_position(self, drone_id, lat, lon, heading):
"""更新無人機位置(加入待處理隊列)"""
@ -905,6 +917,12 @@ class DroneMap:
if self.map_loaded:
self.map_view.page().runJavaScript(f"focusOn('{drone_id}');")
def set_font_scale(self, scale):
"""設定地圖 HTML 控制項的字體倍率。"""
self.font_scale = scale
if self.map_loaded:
self.map_view.page().runJavaScript(f"setFontScale({scale:.3f});")
# ================================================================================
# 任務規劃視覺化方法
# ================================================================================
@ -924,20 +942,24 @@ class DroneMap:
f"{target_lat:.6f}, {target_lon:.6f});"
)
self.map_view.page().runJavaScript(js_code)
print(f"📍 地圖已繪製 Group {group_id} 任務規劃: "
f"C({center_lat:.6f}, {center_lon:.6f}) -> T({target_lat:.6f}, {target_lon:.6f})")
_log(
"INFO",
f"地圖已繪製 Group {group_id} 任務規劃: "
f"C({center_lat:.6f}, {center_lon:.6f}) -> "
f"T({target_lat:.6f}, {target_lon:.6f})",
)
def clear_mission_plan(self):
"""清除地圖上所有任務規劃標記"""
if self.map_loaded:
self.map_view.page().runJavaScript("clearAllMissionPlans();")
print("🗑️ 地圖已清除所有任務規劃")
_log("INFO", "地圖已清除所有任務規劃")
def clear_mission_plan_for_group(self, group_id):
"""清除指定群組的任務規劃標記"""
if self.map_loaded:
self.map_view.page().runJavaScript(f"clearMissionPlanForGroup('{group_id}');")
print(f"🗑️ 地圖已清除 Group {group_id} 任務規劃")
_log("INFO", f"地圖已清除 Group {group_id} 任務規劃")
def set_mission_mode(self, mode):
"""從 Python 端切換地圖的任務模式(觸發框選/路徑標記等)"""
@ -1033,52 +1055,56 @@ class MapBridge(QObject):
def clearAllDroneSelection(self):
"""供 JavaScript 調用的方法 - 清除所有無人機選擇"""
self.clear_all_drone_selection.emit()
print("🗑️ 清除所有無人機選擇")
_log("INFO", "清除所有無人機選擇")
@pyqtSlot()
def toggleSelectAllDrones(self):
"""供 JavaScript 調用的方法 - 切換全選/取消全選所有無人機"""
self.select_all_drones.emit()
print("🔄 切換全選無人機")
_log("INFO", "切換全選無人機")
@pyqtSlot(float, float, float, float)
def startMissionSignal(self, center_lat, center_lon, target_lat, target_lon):
"""供 JavaScript 調用的方法 - 開始任務"""
self.start_mission_signal.emit(center_lat, center_lon, target_lat, target_lon)
print(f"🚀 開始任務信號已發出: C({center_lat}, {center_lon}) -> T({target_lat}, {target_lon})")
_log(
"INFO",
f"已發出開始任務信號: "
f"C({center_lat}, {center_lon}) -> T({target_lat}, {target_lon})",
)
@pyqtSlot()
def pauseMissionSignal(self):
"""供 JavaScript 調用的方法 - 暫停任務"""
self.pause_mission_signal.emit()
print("⏸️ 暫停任務信號已發出")
_log("INFO", "已發出暫停任務信號")
@pyqtSlot(str)
def rectangleSelected(self, points_json):
"""供 JavaScript 調用的方法 - 矩形選擇完成"""
self.rectangle_selected.emit(points_json)
print(f"📦 矩形區域已選擇: {points_json}")
_log("INFO", f"矩形區域已選擇: {points_json}")
@pyqtSlot(str)
def polygonSelected(self, points_json):
"""供 JavaScript 調用的方法 - 多邊形選擇完成"""
self.polygon_selected.emit(points_json)
print(f"🔷 多邊形區域已選擇: {points_json}")
_log("INFO", f"多邊形區域已選擇: {points_json}")
@pyqtSlot(str)
def missionModeChanged(self, mode):
"""供 JavaScript 調用的方法 - 任務模式切換"""
self.mission_mode_changed.emit(mode)
print(f"🔄 任務模式切換: {mode}")
_log("INFO", f"任務模式已切換: {mode}")
@pyqtSlot(str)
def routeConfirmed(self, points_json):
"""供 JavaScript 調用的方法 - 路徑確認"""
self.route_confirmed.emit(points_json)
print(f"📍 路徑已確認: {points_json}")
_log("INFO", f"路徑已確認: {points_json}")
@pyqtSlot(str)
def droneBoxSelected(self, drone_ids_json):
"""供 JavaScript 調用的方法 - 框選無人機完成"""
self.drone_box_selected.emit(drone_ids_json)
print(f"📦 框選無人機: {drone_ids_json}")
_log("INFO", f"框選無人機完成: {drone_ids_json}")

@ -19,6 +19,10 @@ from enum import Enum
from PyQt6.QtCore import QObject, QTimer, pyqtSignal
def _log(level, message):
print(f"[{level}] {message}")
class MissionState(Enum):
"""整體任務狀態"""
IDLE = "idle"
@ -109,7 +113,7 @@ class MissionExecutor(QObject):
def start(self, planned_waypoints):
if self.state == MissionState.RUNNING:
print("任務已在執行中")
_log("WARN", "任務已在執行中")
return
self.tasks.clear()
@ -132,31 +136,34 @@ class MissionExecutor(QObject):
f"rendezvous WP={sorted(self.rendezvous_indices)}"
if self.rendezvous_indices else "無 rendezvous (各自跑)"
)
print(f"任務啟動: {len(self.tasks)} 架無人機, "
f"{total_wps} 個航點, "
f"到達半徑={self.arrival_radius}m, "
f"tick 週期={self._interval_ms}ms, "
f"barrier timeout={self.barrier_timeout_sec}s, "
f"{rv_info}")
_log(
"INFO",
f"任務已啟動: {len(self.tasks)} 架無人機, "
f"{total_wps} 個航點, "
f"到達半徑={self.arrival_radius}m, "
f"tick 週期={self._interval_ms}ms, "
f"barrier timeout={self.barrier_timeout_sec}s, "
f"{rv_info}",
)
def pause(self):
if self.state == MissionState.RUNNING:
self._timer.stop()
self.state = MissionState.PAUSED
print("任務暫停")
_log("INFO", "任務已暫停")
def resume(self):
if self.state == MissionState.PAUSED:
self._timer.start(self._interval_ms)
self.state = MissionState.RUNNING
print("任務恢復")
_log("INFO", "任務已恢復")
def stop(self):
self._timer.stop()
self.tasks.clear()
self.rendezvous_indices = set()
self.state = MissionState.IDLE
print("任務停止")
_log("INFO", "任務已停止")
# ------------------------------------------------------------------ 控制迴圈
@ -197,7 +204,7 @@ class MissionExecutor(QObject):
# rendezvous 點 → 不推進,進入 barrier 等待
task.status = TaskStatus.WAITING_AT_BARRIER
task.waiting_since = now
print(f" {task.drone_id} 抵達 barrier WP {task.wp_index},等待同伴")
_log("INFO", f"{task.drone_id} 抵達 barrier WP {task.wp_index},等待同伴")
self.task_status_changed.emit(
task.drone_id, task.status.value,
f"waiting at WP {task.wp_index}"
@ -236,7 +243,7 @@ class MissionExecutor(QObject):
self._timer.stop()
self.state = MissionState.IDLE
self.mission_completed.emit()
print("===== 任務全部完成 =====")
_log("INFO", "任務全部完成")
def _advance_waypoint(self, task, arrived_distance):
"""把 task 推進一個航點,重置發送旗標。不處理 barrier 邏輯。"""
@ -248,14 +255,20 @@ class MissionExecutor(QObject):
self.drone_waypoint_reached.emit(
task.drone_id, task.wp_index, task.total_waypoints
)
print(f" {task.drone_id} → DONE "
f"({task.total_waypoints}/{task.total_waypoints})")
_log(
"INFO",
f"{task.drone_id} 已完成所有航點 "
f"({task.total_waypoints}/{task.total_waypoints})",
)
return
self.drone_waypoint_reached.emit(
task.drone_id, task.wp_index, task.total_waypoints
)
print(f" {task.drone_id} → WP {task.wp_index}/{task.total_waypoints} "
f"(到達距離: {arrived_distance:.1f}m)")
_log(
"INFO",
f"{task.drone_id} 前往 WP {task.wp_index}/{task.total_waypoints} "
f"(到達距離: {arrived_distance:.1f}m)",
)
def _check_barriers(self, now):
"""檢查每個有人在等的 barrier 能不能釋放。"""
@ -289,14 +302,13 @@ class MissionExecutor(QObject):
oldest_wait = min(t.waiting_since for t in waiting_tasks)
if now - oldest_wait >= self.barrier_timeout_sec:
force_reason = f"timeout {self.barrier_timeout_sec:.0f}s"
print(f"⚠️ barrier WP {barrier_idx} {force_reason},強制放行")
_log("WARN", f"barrier WP {barrier_idx} {force_reason},強制放行")
else:
continue # 還沒到齊、也還沒 timeout → 繼續等
# 釋放:把所有在此 barrier 等待的機一起推進
tag = "全員到齊" if force_reason is None else force_reason
print(f"✅ barrier WP {barrier_idx} 釋放 ({tag})"
f"推進 {len(waiting_tasks)}")
_log("INFO", f"barrier WP {barrier_idx} 已釋放 ({tag}),推進 {len(waiting_tasks)}")
for task in waiting_tasks:
task.status = TaskStatus.NORMAL
msg = f"barrier WP {barrier_idx} released ({tag})"
@ -324,7 +336,7 @@ class MissionExecutor(QObject):
return
task.fail_count += 1
print(f"⚠️ {drone_id} 發送失敗 {task.fail_count}/{self.MAX_RETRY}: {message}")
_log("WARN", f"{drone_id} 發送失敗 {task.fail_count}/{self.MAX_RETRY}: {message}")
if task.fail_count < self.MAX_RETRY:
task.status = TaskStatus.RETRYING
@ -339,13 +351,13 @@ class MissionExecutor(QObject):
drone_id, task.status.value,
f"fallback LOITER after {self.MAX_RETRY} fails: {message}"
)
print(f"{drone_id} 連續失敗 {self.MAX_RETRY} → 切換 LOITER")
_log("ERROR", f"{drone_id} 連續失敗 {self.MAX_RETRY},切換至 LOITER")
self._fallback_to_loiter(drone_id)
def _fallback_to_loiter(self, drone_id):
"""用 monitor.set_mode 切 LOITER。set_mode 是 coroutine透過 event loop 派送。"""
if self.monitor is None:
print(f"⚠️ 無 monitor無法將 {drone_id} 切到 LOITER")
_log("WARN", f"無 monitor無法將 {drone_id} 切換至 LOITER")
return
try:
loop = asyncio.get_event_loop()

@ -23,6 +23,18 @@ GROUP_COLORS = [
'#FF6B9D', # 粉
]
DEFAULT_MISSION_PARAM_VALUES = {
'spacing': '5.0',
'base_altitude': '10.0',
'altitude_diff': '2.0',
'radius': '10.0',
'altitude': '10.0',
'start_angle': '0',
'lateral_offset': '3.0',
'longitudinal_spacing': '5.0',
'line_spacing': '5.0',
}
class MissionGroup:
"""單一任務群組的資料"""
@ -157,10 +169,13 @@ class GroupPanel(QWidget):
QPushButton:disabled {{ background-color: #444; color: #666; }}
"""
def __init__(self, group: MissionGroup, parent=None):
def __init__(self, group: MissionGroup, parent=None, default_params=None):
super().__init__(parent)
self.group = group
self.all_btn_ref = None # 保存全選按鈕的參考
self.default_params = dict(DEFAULT_MISSION_PARAM_VALUES)
if default_params:
self.default_params.update(default_params)
self._build_ui()
def _make_sep(self):
@ -372,23 +387,23 @@ class GroupPanel(QWidget):
# 每種任務類型的參數定義: (key, label, default_value)
self._param_defs = {
'M_FORMATION': [
('spacing', '間距 (m)', '5.0'),
('base_altitude', '基準高度 (m)', '10.0'),
('altitude_diff', '高低差 (m)', '2.0'),
('spacing', '間距 (m)', self.default_params['spacing']),
('base_altitude', '基準高度 (m)', self.default_params['base_altitude']),
('altitude_diff', '高低差 (m)', self.default_params['altitude_diff']),
],
'CIRCLE_FORMATION': [
('radius', '半徑 (m)', '10.0'),
('altitude', '高度 (m)', '10.0'),
('start_angle', '起始角 (°)', '0'),
('radius', '半徑 (m)', self.default_params['radius']),
('altitude', '高度 (m)', self.default_params['altitude']),
('start_angle', '起始角 (°)', self.default_params['start_angle']),
],
'LEADER_FOLLOWER': [
('lateral_offset', '橫向偏移 (m)', '3.0'),
('longitudinal_spacing', '縱向間距 (m)', '5.0'),
('altitude', '高度 (m)', '10.0'),
('lateral_offset', '橫向偏移 (m)', self.default_params['lateral_offset']),
('longitudinal_spacing', '縱向間距 (m)', self.default_params['longitudinal_spacing']),
('altitude', '高度 (m)', self.default_params['altitude']),
],
'GRID_SWEEP': [
('line_spacing', '掃描線距 (m)', '5.0'),
('altitude', '高度 (m)', '10.0'),
('line_spacing', '掃描線距 (m)', self.default_params['line_spacing']),
('altitude', '高度 (m)', self.default_params['altitude']),
],
}
@ -532,6 +547,13 @@ class GroupPanel(QWidget):
params[key] = float(default)
return params
def set_param_value(self, key, value):
"""更新指定參數欄位的文字值。"""
if key not in self._param_widgets:
return
_row_w, inp = self._param_widgets[key]
inp.setText(str(value))
def update_mission_info(self, center_lat, center_lon, target_lat, target_lon):
"""更新中心點 / 目標點顯示"""
info_style = f"color: {self.group.color}; font-size: 11px; font-weight: bold;"
@ -553,4 +575,3 @@ class GroupPanel(QWidget):
except ValueError:
alt = 10.0
self.takeoff_requested.emit(self.group.group_id, alt)

@ -8,11 +8,14 @@ endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/AttitudeRaw.msg"
"srv/MavPing.srv"
"srv/MavCommandLong.srv"
"srv/MavPositionTargetGlobalInt.srv"
DEPENDENCIES std_msgs
)
ament_package()

@ -0,0 +1,9 @@
uint32 seq
builtin_interfaces/Time stamp
string frame_id
float32 roll
float32 pitch
float32 yaw
float32 rollspeed
float32 pitchspeed
float32 yawspeed

@ -15,6 +15,9 @@
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<build_depend>std_msgs</build_depend>
<exec_depend>std_msgs</exec_depend>
<export>
<build_type>ament_cmake</build_type>
</export>

@ -32,6 +32,7 @@ import mavros_msgs.msg
# ROS2 Service imports
import fc_interfaces.srv as fcsrv
import fc_interfaces.msg as fcmsg
from .ackEnum import serviceAckResult
# 自定義 imports
@ -231,7 +232,7 @@ class VehicleStatusPublisher(Node):
publisher.publish(msg)
def _publish_position_ned(self, sysid: int, status: mvv.ComponentStatus):
"""發布 LOCAL_POSITION_NEDNED 座標,含位置與速度)"""
"""發布 LOCAL_POSITION_NED (NED 座標,含位置與速度)"""
if not self.rate_controller.should_publish(sysid, 'position_ned'):
return
@ -267,34 +268,28 @@ class VehicleStatusPublisher(Node):
publisher.publish(msg)
def _publish_attitude(self, sysid: int, status: mvv.ComponentStatus):
"""發布姿態IMU"""
"""發布原始姿態roll/pitch/yaw 與角速度"""
if not self.rate_controller.should_publish(sysid, 'attitude'):
return
att = status.attitude
if att.roll is None:
if att.roll is None or att.pitch is None or att.yaw is None:
return
publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu)
publisher = self._get_or_create_publisher(
sysid, 'attitude', fcmsg.AttitudeRaw
)
if publisher.get_subscription_count() == 0:
return
msg = sensor_msgs.msg.Imu()
# 歐拉角轉四元數
qx, qy, qz, qw = self._euler_to_quaternion(
att.roll, att.pitch, att.yaw
)
msg.orientation.x = qx
msg.orientation.y = qy
msg.orientation.z = qz
msg.orientation.w = qw
# 角速度
if att.rollspeed is not None:
msg.angular_velocity.x = att.rollspeed
msg.angular_velocity.y = att.pitchspeed
msg.angular_velocity.z = att.yawspeed
msg = fcmsg.AttitudeRaw()
msg.stamp = self.get_clock().now().to_msg()
msg.roll = float(att.roll)
msg.pitch = float(att.pitch)
msg.yaw = float(att.yaw)
msg.rollspeed = float(att.rollspeed) if att.rollspeed is not None else 0.0
msg.pitchspeed = float(att.pitchspeed) if att.pitchspeed is not None else 0.0
msg.yawspeed = float(att.yawspeed) if att.yawspeed is not None else 0.0
publisher.publish(msg)
@ -1137,7 +1132,8 @@ ros2_manager = fc_ros_manager()
2026.04.07
1. 完成 ros2 MavPositionTargetGlobalInt 區域
2. 優化 response.ack_result 回傳值的資訊
3. 新增 _publish_position_ned() 發布剛體座標的訊息
4. 重製 _publish_attitude() 使其更簡單的去發布姿態訊息
TODO
1. service 部分會需要跟 mavlinkobject 大量互動 也許需要考慮對方的生命週期

@ -38,6 +38,7 @@ class SerialMode(Enum):
"""連接類型"""
STRAIGHT = auto() # 原始數據直通
XBEEAPI2AT = auto() # XBee API 模式
XBEEAPI_POLL = auto()
NOT_USE = auto() # 不使用
@ -176,10 +177,10 @@ class XBeeFrameProcessor(FrameProcessor):
"""根據 frame type 分派;若是 RX payload 回傳 bytes其餘回傳 None"""
frame_type = frame[3]
if frame_type == self.FRAME_TYPE_RX_PACKET:
if frame_type == self.FRAME_TYPE_RX_PACKET: # mavlink
return self._decapsulate(frame)
if frame_type == self.FRAME_TYPE_AT_RESPONSE:
if frame_type == self.FRAME_TYPE_AT_RESPONSE: # AT command
if self.at_handler is not None:
self.at_handler.handle_frame(frame)
return None
@ -335,9 +336,10 @@ class ATCommandHandler:
def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應:單 byte 無號值,單位 dBm"""
pass
# if data:
# print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
if data:
print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
# logger.debug(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
pass
def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High)"""
@ -370,6 +372,10 @@ class SerialHandler(asyncio.Protocol):
at_handler = ATCommandHandler(self.serial_port_str)
return XBeeFrameProcessor(at_handler=at_handler)
# if self.serial_mode == SerialMode.XBEEAPI_POLL:
# at_handler = ATCommandHandler_new(self.serial_port_str)
# return XBeeFrameProcessor(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw")
return RawFrameProcessor()

@ -7,6 +7,13 @@
<maintainer email="chiyu1468@hotmail.com">picars</maintainer>
<license>TODO: License declaration</license>
<exec_depend>fc_interfaces</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>nav_msgs</exec_depend>
<exec_depend>mavros_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>

Loading…
Cancel
Save