You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1605 lines
59 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QSplitter, QScrollArea,
QSizePolicy, QTabWidget, QTableWidget, QTableWidgetItem,
QHeaderView, QPushButton, QCheckBox, QLineEdit)
from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal
from PyQt6.QtWebEngineWidgets import QWebEngineView
import math
import re
import sys
from threading import Lock
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
from std_msgs.msg import Float64
from mavros_msgs.msg import State, VfrHud
from mavros_msgs.srv import CommandBool, CommandTOL, SetMode
import asyncio
from functools import partial
import threading
import websockets
import json
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class DroneMonitor(Node):
def __init__(self):
super().__init__('drone_monitor')
self.signals = DroneSignals()
self.drone_topics = {}
self.lock = Lock()
self.arm_clients = {}
self.takeoff_clients = {}
self.setpoint_pubs = {}
self.selected_drones = set()
self.latest_data = {}
# 定義需要過濾的模式
self.filtered_modes = ['Mode(0x000000c0)']
# 定義多個 WebSocket 連接配置
self.websocket_connections = [
{
'name': 'local_1',
'url': 'ws://192.168.137.1:5163',
'enabled': True
},
{
'name': 'local_2',
'url': 'ws://0.0.0.0:8756',
'enabled': True # 新增的端口
},
{
'name': 'remote_8756',
'url': 'ws://192.168.50.48:8756',
'enabled': False # 可選擇啟用
}
]
# 啟動多個 WebSocket client 執行緒
for connection in self.websocket_connections:
if connection['enabled']:
threading.Thread(
target=self.start_ws_client,
args=(connection,),
daemon=True,
name=f"WebSocket-{connection['name']}"
).start()
# 主题检测定时器
self.create_timer(1.0, self.scan_topics)
def start_ws_client(self, connection_config):
"""啟動單個 WebSocket 客戶端"""
asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.get_event_loop().run_until_complete(
self.ws_client_loop(connection_config)
)
async def ws_client_loop(self, connection_config):
"""單個 WebSocket 連接的主循環"""
retry_count = 0
max_retries = 5
base_delay = 1.0
connection_name = connection_config['name']
url = connection_config['url']
print(f"Starting WebSocket client for {connection_name} at {url}")
while retry_count < max_retries:
try:
async with websockets.connect(url) as websocket:
print(f"WebSocket {connection_name} connected to {url}")
retry_count = 0 # 重置重試計數
async for message in websocket:
try:
data = json.loads(message)
# 添加連接來源信息
data['_connection_source'] = connection_name
self.process_websocket_message(data)
except json.JSONDecodeError as e:
print(f"WebSocket {connection_name} JSON decode error: {e}")
except Exception as e:
print(f"WebSocket {connection_name} message processing error: {e}")
except websockets.exceptions.ConnectionClosedError:
print(f"WebSocket {connection_name} connection closed")
break
except Exception as e:
retry_count += 1
delay = base_delay * (2 ** min(retry_count, 4)) # 指數退避
print(f"WebSocket {connection_name} connection error: {e}, retrying in {delay}s (attempt {retry_count}/{max_retries})")
await asyncio.sleep(delay)
print(f"WebSocket client {connection_name} stopped after maximum retries")
def process_websocket_message(self, data):
try:
drone_id = data.get('system_id')
drone_id = f"s9_{drone_id}" if drone_id else None
if not drone_id:
return
# 模式
if 'mode' in data:
self.signals.update_signal.emit('state', drone_id, {
'mode': data['mode'],
})
# 電池
if 'battery' in data:
# 這裡假設 battery 是百分比
self.signals.update_signal.emit('battery', drone_id, {
'percentage': data['battery']
})
# 位置
if 'position' in data:
pos = data['position']
self.signals.update_signal.emit('gps', drone_id, {
'lat': pos.get('lat', 0),
'lon': pos.get('lon', 0)
})
self.signals.update_signal.emit('local_pose', drone_id, {
'x': round(pos.get('lat', 0), 6),
'y': round(pos.get('lon', 0), 6),
})
# 航向
if 'heading' in data:
self.signals.update_signal.emit('hud', drone_id, {
'heading': data['heading'],
})
except Exception as e:
print(f"WebSocket message processing error: {e}")
def scan_topics(self):
topics = self.get_topic_names_and_types()
drone_pattern = re.compile(r'/MavLinkBus/(s\d+_\d+)/(\w+)')
found_drones = set()
for topic_name, _ in topics:
if match := drone_pattern.match(topic_name):
drone_id, topic_type = match.groups()
found_drones.add(drone_id)
with self.lock:
self.drone_topics.setdefault(drone_id, set()).add(topic_type)
for drone_id in found_drones:
if not hasattr(self, f'drone_{drone_id}_subs'):
self.setup_drone(drone_id)
def setup_drone(self, drone_id):
base_topic = f'/MavLinkBus/{drone_id}'
# Add service clients
self.arm_clients[drone_id] = self.create_client(
CommandBool,
f'{base_topic}/cmd/arming'
)
self.takeoff_clients[drone_id] = self.create_client(
CommandTOL,
f'{base_topic}/cmd/takeoff'
)
# Add setpoint publisher
self.setpoint_pubs[drone_id] = self.create_publisher(
Point,
f'{base_topic}/setpoint_position/local',
10
)
subs = {
'attitude': self.create_subscription(
Imu,
f'{base_topic}/attitude',
lambda msg, did=drone_id: self.attitude_callback(did, msg),
10
),
'battery': self.create_subscription(
BatteryState,
f'{base_topic}/battery',
lambda msg, did=drone_id: self.battery_callback(did, msg),
10
),
'global': self.create_subscription(
NavSatFix,
f'{base_topic}/global_position/global',
lambda msg, did=drone_id: self.gps_callback(did, msg),
10
),
'rel_alt': self.create_subscription(
Float64,
f'{base_topic}/global_position/rel_alt',
lambda msg, did=drone_id: self.altitude_callback(did, msg),
10
),
'local_pose': self.create_subscription(
Point,
f'{base_topic}/local_position/pose',
lambda msg, did=drone_id: self.local_pose_callback(did, msg),
10
),
'local_vel': self.create_subscription(
Vector3,
f'{base_topic}/local_position/velocity',
lambda msg, did=drone_id: self.local_vel_callback(did, msg),
10
),
'loss_rate': self.create_subscription(
Float64,
f'{base_topic}/packet_loss_rate',
lambda msg, did=drone_id: self.loss_rate_callback(did, msg),
10
),
'state': self.create_subscription(
State,
f'{base_topic}/state',
lambda msg, did=drone_id: self.state_callback(did, msg),
10
),
'ping': self.create_subscription(
Float64,
f'{base_topic}/ping',
lambda msg, did=drone_id: self.ping_callback(did, msg),
10
),
'vfr_hud': self.create_subscription(
VfrHud,
f'{base_topic}/vfr_hud',
lambda msg, did=drone_id: self.hud_callback(did, msg),
10
)
}
setattr(self, f'drone_{drone_id}_subs', subs)
async def arm_drone(self, drone_id, arm):
if drone_id not in self.arm_clients:
return False
client = self.arm_clients[drone_id]
if not client.wait_for_service(timeout_sec=1.0):
return False
request = CommandBool.Request()
request.value = arm
future = client.call_async(request)
try:
response = await future
return response.success
except Exception as e:
self.get_logger().error(f'Arm service call failed: {e}')
return False
async def takeoff_drone(self, drone_id, altitude=10.0):
if drone_id not in self.takeoff_clients:
return False
client = self.takeoff_clients[drone_id]
if not client.wait_for_service(timeout_sec=1.0):
return False
request = CommandTOL.Request()
request.altitude = altitude
request.min_pitch = 0.0
request.yaw = 0.0
future = client.call_async(request)
try:
response = await future
return response.success
except Exception as e:
self.get_logger().error(f'Takeoff service call failed: {e}')
return False
def send_setpoint(self, drone_id, x, y, z):
"""Send setpoint position command"""
if drone_id not in self.setpoint_pubs:
return False
msg = Point()
msg.x = float(x)
msg.y = float(y)
msg.z = float(z)
self.setpoint_pubs[drone_id].publish(msg)
return True
def quaternion_to_euler(self, q):
sinr_cosp = 2 * (q.w * q.x + q.y * q.z)
cosr_cosp = 1 - 2 * (q.x**2 + q.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (q.w * q.y - q.z * q.x)
pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp)
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y**2 + q.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# callbacks
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.latest_data[(drone_id, 'attitude')] = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
}
def battery_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'battery')] = {
'voltage': msg.voltage
}
def state_callback(self, drone_id, msg):
mode = msg.mode
if mode in self.filtered_modes:
return
self.latest_data[(drone_id, 'state')] = {
'mode': msg.mode,
'armed': msg.armed
}
def gps_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'gps')] = {
'lat': msg.latitude,
'lon': msg.longitude,
'alt': msg.altitude
}
def local_vel_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'velocity')] = {
'vx': msg.x,
'vy': msg.y,
'vz': msg.z
}
def altitude_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'altitude')] = {
'altitude': msg.data
}
def local_pose_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'local_pose')] = {
'x': msg.x,
'y': msg.y,
'z': msg.z
}
def hud_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'hud')] = {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.altitude,
'climb': msg.climb
}
def loss_rate_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'loss_rate')] = {
'loss_rate': msg.data
}
def ping_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'ping')] = {
'ping': msg.data
}
class ControlStationUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('GCS')
self.resize(1400, 900)
# 初始化ROS2
rclpy.init()
self.monitor = DroneMonitor()
self.monitor.signals.update_signal.connect(self.update_ui)
# ROS执行器配置
self.executor = rclpy.executors.SingleThreadedExecutor()
self.executor.add_node(self.monitor)
# 定时处理ROS事件
self.timer = QTimer()
self.timer.timeout.connect(self.spin_ros)
self.timer.start(10)
# 初始化UI
self.drones = {}
self.socket_groups = {} # 新增:用於儲存 socket 分組
self.info_types = ["模式", "ARM", "電壓", "高度", "Local", "Velocity", "地速", "航向",
"Roll", "Pitch", "Yaw", "丟包", "延遲"]
self.info_type_map = {
"mode": 0,
"armed": 1,
"battery": 2,
"altitude": 3,
"local": 4,
"velocity": 5,
"groundspeed": 6,
"heading": 7,
"roll": 8,
"pitch": 9,
"yaw": 10,
"loss_rate": 11,
"ping": 12
}
self.socket_colors = {
'0': '#00BFFF', # DeepSkyBlue
'1': '#FFD700', # Gold
'2': '#FF69B4', # HotPink
'9': '#7CFC00', # LawnGreen
'default': '#AAAAAA'
}
self.drone_positions = {}
self.drone_headings = {}
self.map_loaded = False
# 添加地圖更新節流定時器
self.map_update_timer = QTimer()
self.map_update_timer.timeout.connect(self.update_map_positions)
self.map_update_timer.start(200) # 每 200ms 更新一次地圖
# 添加待更新的無人機位置緩存
self.pending_map_updates = {}
self.init_ui()
def init_ui(self):
# 只呼叫一次
main_splitter = QSplitter(Qt.Orientation.Horizontal)
# 左側 TabWidget
self.left_tab = QTabWidget()
# — 分頁 1Drone Panel
self.drone_panel_container = QWidget()
self.drone_panel_layout = QVBoxLayout(self.drone_panel_container)
self.drone_panel_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
self.drone_panel_layout.setSpacing(0)
self.drone_panel_layout.setContentsMargins(10, 10, 10, 10)
# Wrap drone panel in scroll area
scroll = QScrollArea()
scroll.setWidget(self.drone_panel_container)
scroll.setWidgetResizable(True)
self.left_tab.addTab(scroll, "無人載具")
# 底部控制按鈕區域
bottom_control = QWidget()
bottom_layout = QVBoxLayout(bottom_control)
# 上方按鈕區域
upper_buttons = QHBoxLayout()
select_all_btn = QPushButton("全選")
select_all_btn.clicked.connect(self.handle_select_all)
arm_all_btn = QPushButton("批次解鎖")
arm_all_btn.clicked.connect(self.handle_arm_selected)
takeoff_all_btn = QPushButton("批次起飛")
takeoff_all_btn.clicked.connect(self.handle_takeoff_selected)
for btn in [select_all_btn, arm_all_btn, takeoff_all_btn]:
btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 5px 10px;
border-radius: 4px;
min-width: 80px;
}
QPushButton:hover { background-color: #555; }
""")
upper_buttons.addWidget(btn)
upper_buttons.addStretch()
# --- 模式切換區域 ---
mode_layout = QHBoxLayout()
mode_layout.setContentsMargins(0, 0, 0, 0)
mode_layout.setSpacing(8)
mode_label = QLabel("模式:")
mode_label.setStyleSheet("color: #DDD; min-width: 40px;")
from PyQt6.QtWidgets import QComboBox
self.mode_combo = QComboBox()
self.mode_combo.addItems(["AUTO", "GUIDED", "LOITER", "LAND"])
self.mode_combo.setCurrentIndex(1) # 預設 GUIDED
self.mode_combo.setStyleSheet("""
QComboBox { background-color: #333; color: #DDD; border-radius: 3px; padding: 2px 10px;}
""")
batch_mode_btn = QPushButton("批次切換模式")
batch_mode_btn.clicked.connect(self.handle_batch_mode_change)
batch_mode_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 5px 10px;
border-radius: 4px;
min-width: 80px;
}
QPushButton:hover { background-color: #555; }
""")
mode_layout.addWidget(mode_label)
mode_layout.addWidget(self.mode_combo)
mode_layout.addWidget(batch_mode_btn)
mode_layout.addStretch()
# Add coordinate inputs
coord_widget = QWidget()
coord_layout = QHBoxLayout(coord_widget)
coord_layout.setContentsMargins(0, 0, 0, 0)
self.x_input = QLineEdit()
self.y_input = QLineEdit()
self.z_input = QLineEdit()
for input_field in [self.x_input, self.y_input, self.z_input]:
input_field.setFixedWidth(60)
input_field.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 3px;
}
""")
coord_layout.addWidget(QLabel("X:"))
coord_layout.addWidget(self.x_input)
coord_layout.addWidget(QLabel("Y:"))
coord_layout.addWidget(self.y_input)
coord_layout.addWidget(QLabel("Z:"))
coord_layout.addWidget(self.z_input)
# Add buttons to control layout
setpoint_btn = QPushButton("Setpoint")
setpoint_btn.clicked.connect(self.handle_setpoint_selected)
setpoint_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 5px 10px;
border-radius: 4px;
min-width: 80px;
}
QPushButton:hover { background-color: #555; }
""")
# 下方座標輸入區域
lower_control = QHBoxLayout()
lower_control.addWidget(coord_widget)
lower_control.addWidget(setpoint_btn)
lower_control.addStretch()
# 加入底部控制區
bottom_layout.addLayout(upper_buttons)
bottom_layout.addLayout(mode_layout)
bottom_layout.addLayout(lower_control)
# — 分頁 2Overview Table
self.overview_table = QTableWidget()
# 一開始只有一欄
self.overview_table.setColumnCount(1)
self.overview_table.setRowCount(len(self.info_types))
self.overview_table.setHorizontalHeaderLabels(["資訊"])
header = self.overview_table.horizontalHeader()
# 只有第一欄自動收縮到內容寬度,其它欄不動
header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
self.overview_table.verticalHeader().setVisible(False)
for i, txt in enumerate(self.info_types):
item = QTableWidgetItem(txt)
item.setFlags(Qt.ItemFlag.ItemIsEnabled)
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
self.overview_table.setItem(i, 0, item)
self.left_tab.addTab(self.overview_table, "總覽")
# 右侧地图区域
self.map_view = QWebEngineView()
inline_html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<link rel="stylesheet" href="https://unpkg.com/leaflet/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet/dist/leaflet.js"></script>
<script src="https://unpkg.com/leaflet-rotatedmarker/leaflet.rotatedMarker.js"></script>
<style>
html, body, #map { height: 100%; margin: 0; }
.map-controls {
position: absolute;
top: 10px;
right: 10px;
z-index: 1000;
}
.control-button {
padding: 8px 12px;
background-color: #f44336;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 12px;
box-shadow: 0 2px 5px rgba(0,0,0,0.2);
}
.control-button:hover {
background-color: #d32f2f;
}
</style>
</head>
<body>
<div id="map"></div>
<div class="map-controls">
<button class="control-button" onclick="clearAllTrajectories()">清除軌跡</button>
</div>
<script>
var map = L.map('map').setView([0, 0], 19);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
maxZoom: 19,
attribution: '© OpenStreetMap contributors'
}).addTo(map);
var arrowIcon = L.icon({
iconUrl: 'https://cdn-icons-png.flaticon.com/512/399/399308.png',
iconSize: [40, 40],
iconAnchor: [20, 20]
});
function getColorBySocketId(id) {
if (id.startsWith("s0_")) return "#00BFFF";
if (id.startsWith("s1_")) return "#FFD700";
if (id.startsWith("s2_")) return "#FF69B4";
if (id.startsWith("s9_")) return "#7CFC00";
return "#AAAAAA";
}
function createIdIcon(id) {
const color = getColorBySocketId(id);
const sysid = id.split('_')[1];
return L.divIcon({
className: 'drone-id',
html: `<div style="
position: relative;
left: 2px;
background-color: ${color};
color: black;
width: 16px;
height: 16px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-weight: bold;
font-size: 10px;
">${sysid}</div>`,
iconSize: [20, 20],
iconAnchor: [10, 10]
});
}
var markers = {};
var idLabels = {};
var trajectories = {};
var trajectoryLines = {};
var focusedId = null;
var initialized = false;
var trajectoryGroup = L.layerGroup().addTo(map);
function initTrajectory(id) {
if (!trajectories[id]) {
trajectories[id] = [];
const color = getColorBySocketId(id);
trajectoryLines[id] = L.polyline([], {
color: color,
weight: 3,
opacity: 0.7,
smoothFactor: 1
}).addTo(trajectoryGroup);
}
}
function addTrajectoryPoint(id, lat, lon) {
initTrajectory(id);
const point = [lat, lon];
trajectories[id].push(point);
if (trajectories[id].length > 1000) {
trajectories[id].shift();
}
trajectoryLines[id].setLatLngs([...trajectories[id]]);
}
function focusOn(id) {
if (!markers[id]) return;
focusedId = id;
var latlng = markers[id].getLatLng();
map.flyTo(latlng, map.getZoom());
}
setInterval(() => {
if (focusedId && markers[focusedId]) {
var latlng = markers[focusedId].getLatLng();
map.panTo(latlng);
}
}, 1000);
function updateDrone(lat, lon, id, heading) {
if (markers[id]) {
const lastPos = markers[id].getLatLng();
const distance = lastPos.distanceTo([lat, lon]);
if (distance > 1) {
addTrajectoryPoint(id, lat, lon);
}
markers[id]
.setLatLng([lat, lon])
.setRotationAngle(heading);
idLabels[id].setLatLng([lat, lon]);
} else {
initTrajectory(id);
addTrajectoryPoint(id, lat, lon);
markers[id] = L.marker([lat, lon], {
icon: arrowIcon,
rotationAngle: heading,
rotationOrigin: 'center'
})
.on('click', function () {
focusOn(id);
})
.addTo(map);
idLabels[id] = L.marker([lat, lon], {
icon: createIdIcon(id),
zIndexOffset: 1000
})
.on('click', function() {
focusOn(id);
})
.addTo(map);
if (!initialized || id < focusedId) {
focusOn(id);
initialized = true;
}
}
}
function clearAllTrajectories() {
trajectories = {};
Object.values(trajectoryLines).forEach(line => {
trajectoryGroup.removeLayer(line);
});
trajectoryLines = {};
console.log('所有軌跡已清除');
}
</script>
</body>
</html>
'''
self.map_view.setHtml(inline_html)
self.map_view.loadFinished.connect(self.on_map_loaded)
# Create left container and its layout
left_container = QWidget()
left_layout = QVBoxLayout(left_container)
left_layout.setContentsMargins(0, 0, 0, 0)
# Add tab widget and bottom control to left container
left_layout.addWidget(self.left_tab)
left_layout.addWidget(bottom_control)
# Add widgets to splitter
main_splitter.addWidget(left_container)
main_splitter.addWidget(self.map_view)
main_splitter.setSizes([400, 1000])
self.setCentralWidget(main_splitter)
def on_map_loaded(self, ok: bool):
if ok:
self.map_loaded = True
else:
print("⚠️ 地图页加载失败")
# 修改2: 添加地圖更新節流方法
def update_map_positions(self):
"""批量更新地圖上的無人機位置"""
if not self.map_loaded or not self.pending_map_updates:
return
# 批量執行所有待更新的位置
js_commands = []
for drone_id, (lat, lon, heading) in self.pending_map_updates.items():
js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});")
if js_commands:
combined_js = "\n".join(js_commands)
self.map_view.page().runJavaScript(combined_js)
# 清空待更新緩存
self.pending_map_updates.clear()
def create_drone_panel(self, drone_id):
panel = QWidget()
panel.setObjectName(f"panel_{drone_id}")
panel.setFixedHeight(140) # 根據需要調整高度
panel.setStyleSheet("""
QWidget#panel_%s {
background-color: #2A2A2A;
border-radius: 8px;
margin: 6px;
padding: 10px;
border: 1px solid #444;
}
QLabel {
color: #DDD;
font-size: 12px;
padding: 2px;
}
""" % drone_id)
# 主佈局改為水平佈局
main_layout = QHBoxLayout(panel)
main_layout.setContentsMargins(8, 8, 8, 8)
main_layout.setSpacing(8)
# 左側資訊區域
info_widget = QWidget()
info_layout = QVBoxLayout(info_widget)
info_layout.setContentsMargins(0, 0, 0, 0)
info_layout.setSpacing(4)
# 顶部标题栏
header = QWidget()
header_layout = QHBoxLayout(header)
header_layout.setContentsMargins(0, 0, 0, 0)
# 在標題列加入勾選框
checkbox = QCheckBox()
checkbox.setObjectName(f"{drone_id}_checkbox")
checkbox.setStyleSheet("QCheckBox { color: #DDD; }")
checkbox.stateChanged.connect(lambda state: self.handle_drone_selection(drone_id, state))
header_layout.insertWidget(0, checkbox) # 插入到最前面
# ID显示 - 修改這裡:將 s0_4 格式改為 s4 格式
display_id = 's' + drone_id.split('_')[1]
id_label = QLabel(display_id)
id_label.setStyleSheet("""
font-weight: bold;
font-size: 14px;
color: #7FFFD4;
min-width: 80px;
""")
header_layout.addWidget(id_label)
header_layout.addStretch()
info_layout.addWidget(header)
# 第一行:狀態 (模式 + ARM狀態)
status_row = QWidget()
status_layout = QHBoxLayout(status_row)
status_layout.setContentsMargins(0, 0, 0, 0)
status_title = QLabel("狀態:")
status_title.setStyleSheet("color: #888; min-width: 50px;")
mode_label = QLabel("--")
mode_label.setObjectName(f"{drone_id}_mode")
armed_label = QLabel("--")
armed_label.setObjectName(f"{drone_id}_armed")
status_layout.addWidget(status_title)
status_layout.addWidget(mode_label)
status_layout.addWidget(armed_label)
status_layout.addStretch()
info_layout.addWidget(status_row)
# 第二行:電池
battery_row = QWidget()
battery_layout = QHBoxLayout(battery_row)
battery_layout.setContentsMargins(0, 0, 0, 0)
battery_title = QLabel("電池:")
battery_title.setStyleSheet("color: #888; min-width: 50px;")
battery_label = QLabel("--")
battery_label.setObjectName(f"{drone_id}_battery")
battery_layout.addWidget(battery_title)
battery_layout.addWidget(battery_label)
battery_layout.addStretch()
info_layout.addWidget(battery_row)
# 第三行:位置 + 高度
position_row = QWidget()
position_layout = QHBoxLayout(position_row)
position_layout.setContentsMargins(0, 0, 0, 0)
position_title = QLabel("位置:")
position_title.setStyleSheet("color: #888; min-width: 50px;")
local_label = QLabel("--")
local_label.setObjectName(f"{drone_id}_local")
altitude_title = QLabel("高度:")
altitude_title.setStyleSheet("color: #888; margin-left: 10px;")
altitude_label = QLabel("--")
altitude_label.setObjectName(f"{drone_id}_altitude")
position_layout.addWidget(position_title)
position_layout.addWidget(local_label)
position_layout.addWidget(altitude_title)
position_layout.addWidget(altitude_label)
position_layout.addStretch()
info_layout.addWidget(position_row)
# 第四行:航向 + 速度
nav_row = QWidget()
nav_layout = QHBoxLayout(nav_row)
nav_layout.setContentsMargins(0, 0, 0, 0)
heading_title = QLabel("航向:")
heading_title.setStyleSheet("color: #888; min-width: 50px;")
heading_label = QLabel("--")
heading_label.setObjectName(f"{drone_id}_heading")
speed_title = QLabel("速度:")
speed_title.setStyleSheet("color: #888; margin-left: 10px;")
groundspeed_label = QLabel("--")
groundspeed_label.setObjectName(f"{drone_id}_groundspeed")
nav_layout.addWidget(heading_title)
nav_layout.addWidget(heading_label)
nav_layout.addWidget(speed_title)
nav_layout.addWidget(groundspeed_label)
nav_layout.addStretch()
info_layout.addWidget(nav_row)
# 右側控制按鈕區域
control_widget = QWidget()
control_layout = QVBoxLayout(control_widget)
control_layout.setContentsMargins(0, 0, 0, 0)
control_layout.setSpacing(6)
# 設置控制區域的固定寬度
control_widget.setFixedWidth(80)
mode_btn = QPushButton("切換模式")
mode_btn.setObjectName(f"{drone_id}_mode_btn")
mode_btn.clicked.connect(lambda: self.handle_mode_change(drone_id))
mode_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 8px 6px;
border-radius: 4px;
font-size: 11px;
}
QPushButton:hover {
background-color: #555;
}
""")
arm_btn = QPushButton("解鎖")
arm_btn.setObjectName(f"{drone_id}_arm_btn")
arm_btn.clicked.connect(lambda: self.handle_arm(drone_id))
arm_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 8px 6px;
border-radius: 4px;
font-size: 11px;
}
QPushButton:hover {
background-color: #555;
}
""")
takeoff_btn = QPushButton("起飛")
takeoff_btn.setObjectName(f"{drone_id}_takeoff_btn")
takeoff_btn.clicked.connect(lambda: self.handle_takeoff(drone_id))
takeoff_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 8px 6px;
border-radius: 4px;
font-size: 11px;
}
QPushButton:hover {
background-color: #555;
}
""")
setpoint_btn = QPushButton("Setpoint")
setpoint_btn.setObjectName(f"{drone_id}_setpoint_btn")
setpoint_btn.clicked.connect(lambda: self.handle_single_setpoint(drone_id))
setpoint_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 8px 6px;
border-radius: 4px;
font-size: 11px;
}
QPushButton:hover {
background-color: #555;
}
""")
# 按鈕由上而下排列
control_layout.addWidget(mode_btn)
control_layout.addWidget(arm_btn)
control_layout.addWidget(takeoff_btn)
control_layout.addWidget(setpoint_btn)
control_layout.addStretch() # 將按鈕推向頂部
# 將左側資訊區域和右側控制區域加入主佈局
main_layout.addWidget(info_widget)
main_layout.addWidget(control_widget)
return panel
def create_socket_group_panel(self, socket_id):
"""創建 socket 分組面板"""
group_panel = QWidget()
group_panel.setObjectName(f"socket_group_{socket_id}")
group_panel.setStyleSheet(f"""
QWidget#socket_group_{socket_id} {{
background-color: #1E1E1E;
border: 2px solid #555;
border-radius: 12px;
margin: 8px;
padding: 12px;
}}
QLabel {{
color: #DDD;
font-size: 12px;
padding: 2px;
}}
""")
layout = QVBoxLayout(group_panel)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(6)
# Socket 分組標題行 - 包含勾選框
title_row = QWidget()
title_layout = QHBoxLayout(title_row)
title_layout.setContentsMargins(0, 0, 0, 0)
# 分組勾選框
group_checkbox = QCheckBox()
group_checkbox.setObjectName(f"socket_{socket_id}_checkbox")
group_checkbox.setStyleSheet("QCheckBox { color: #DDD; }")
group_checkbox.stateChanged.connect(lambda state: self.handle_group_selection(socket_id, state))
# Socket 分組標題
color = self.socket_colors.get(socket_id, self.socket_colors['default'])
title_label = QLabel(f"Socket {socket_id}")
title_label.setStyleSheet(f"""
font-weight: bold;
font-size: 16px;
color: {color};
margin-bottom: 8px;
padding: 4px 8px;
background-color: #333;
border-radius: 6px;
""")
title_layout.addWidget(group_checkbox)
title_layout.addWidget(title_label)
title_layout.addStretch()
layout.addWidget(title_row)
# 創建子容器用於放置該 socket 下的所有無人機面板
drones_container = QWidget()
drones_layout = QVBoxLayout(drones_container)
drones_layout.setContentsMargins(0, 0, 0, 0)
drones_layout.setSpacing(4)
layout.addWidget(drones_container)
# 儲存容器的引用以便後續添加無人機
group_panel.drones_container = drones_container
group_panel.drones_layout = drones_layout
return group_panel
def create_data_row(self, layout, title, object_name, default):
row = QWidget()
hbox = QHBoxLayout(row)
hbox.setContentsMargins(0, 0, 0, 0)
# 标题标签
title_label = QLabel(title)
title_label.setStyleSheet("color: #888; min-width: 80px;")
title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred)
# 数据标签
value_label = QLabel(default)
value_label.setObjectName(object_name)
value_label.setWordWrap(True)
value_label.setStyleSheet("margin-left: 10px;")
hbox.addWidget(title_label)
hbox.addWidget(value_label)
layout.addWidget(row)
def handle_mode_change(self, drone_id):
"""處理單個無人機的模式切換"""
mode = self.mode_combo.currentText()
loop = asyncio.get_event_loop()
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}"))
def handle_arm(self, drone_id):
loop = asyncio.get_event_loop()
arm_state = not self.monitor.get_arm_state(drone_id) # Toggle arm state
future = self.monitor.arm_drone(drone_id, arm_state)
loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}"))
def handle_takeoff(self, drone_id):
loop = asyncio.get_event_loop()
future = self.monitor.takeoff_drone(drone_id, 10.0) # 10 meters default
loop.create_task(self.handle_service_response(future, f"起飛 {drone_id}"))
def handle_setpoint_selected(self):
"""處理發送 setpoint 命令"""
try:
x = float(self.x_input.text() or '0')
y = float(self.y_input.text() or '0')
z = float(self.z_input.text() or '0')
for drone_id in self.monitor.selected_drones:
if self.monitor.send_setpoint(drone_id, x, y, z):
self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000)
else:
self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000)
except ValueError:
self.statusBar().showMessage("座標格式錯誤", 3000)
def handle_single_setpoint(self, drone_id):
"""處理單個無人機的 setpoint 命令"""
try:
x = float(self.x_input.text() or '0')
y = float(self.y_input.text() or '0')
z = float(self.z_input.text() or '0')
if self.monitor.send_setpoint(drone_id, x, y, z):
self.statusBar().showMessage(f"發送位置命令到 {drone_id}: ({x}, {y}, {z})", 3000)
else:
self.statusBar().showMessage(f"發送位置命令失敗: {drone_id}", 3000)
except ValueError:
self.statusBar().showMessage("座標格式錯誤", 3000)
async def handle_service_response(self, future, action):
try:
result = await future
if result:
self.statusBar().showMessage(f"{action} 成功", 3000)
else:
self.statusBar().showMessage(f"{action} 失敗", 3000)
except Exception as e:
self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000)
def update_ui(self, msg_type, drone_id, data):
# 檢查是否為新無人機,若是則加入無人機面板
if drone_id not in self.drones:
self.add_drone(drone_id)
return
# 確認無人機面板存在
if not (panel := self.drones.get(drone_id)):
return
# 更新特定欄位並在總覽頁面更新
if msg_type == 'state':
mode = data.get('mode', 'UNKNOWN')
armed = data.get('armed', None)
mode_color = '#FF5555' if any(x in mode.upper() for x in ['RTL', '返航', 'EMERGENCY']) else '#55FF55'
if armed is True:
arm_text = "ARMED"
arm_color = '#55FF55'
elif armed is False:
arm_text = "DISARMED"
arm_color = '#FF5555'
else:
arm_text = "--"
arm_color = '#AAAAAA' # 未知狀態
# 更新無人機面板欄位
self.update_field(panel, drone_id, 'mode', mode, mode_color)
self.update_field(panel, drone_id, 'armed', arm_text, arm_color)
# 更新總覽頁面欄位
self.update_overview_table(drone_id, 'mode', mode)
self.update_overview_table(drone_id, 'armed', arm_text)
elif msg_type == 'battery':
voltage = data.get('voltage', 16)
# 判斷電池節數
cells = round(voltage / 3.95)
# 計算電量百分比
percentage = (voltage / cells - 3.7) / 0.5 * 100
# 根據百分比設置顏色
if percentage < 20:
voltage_color = '#FF6464' # 紅色 (低電量)
elif percentage < 50:
voltage_color = '#FFA500' # 橘色 (中低電量)
else:
voltage_color = '#FFFFFF' # 白色 (正常電量)
percentage = data.get('percentage', percentage)
# 顯示總電壓、電池節數、單節電壓和百分比
text = f"{percentage:.0f}%"
vol = f"{voltage:.2f}V"
self.update_field(panel, drone_id, 'battery', text, voltage_color)
self.update_overview_table(drone_id, 'battery', vol)
elif msg_type == 'gps':
lat, lon = data.get('lat', 0), data.get('lon', 0)
self.drone_positions[drone_id] = (lat, lon)
text = f"{lat:.6f}°, {lon:.6f}°"
self.update_field(panel, drone_id, 'gps', text)
self.update_overview_table(drone_id, 'gps', text)
elif msg_type == 'altitude':
altitude = data.get('altitude', 0)
text = f"{altitude:.1f} m"
self.update_field(panel, drone_id, 'altitude', text)
self.update_overview_table(drone_id, 'altitude', text)
elif msg_type == 'local_pose':
text = f"{data['x']:.1f}, {data['y']:.1f}"
self.update_field(panel, drone_id, 'local', text)
self.update_overview_table(drone_id, 'local', text)
elif msg_type == 'hud':
heading = data.get('heading')
self.drone_headings[drone_id] = heading
groundspeed = data.get('groundspeed')
heading_text = f"{heading:.1f}°"
if isinstance(groundspeed, (int, float)):
groundspeed_text = f"{groundspeed:.1f} m/s"
else:
groundspeed_text = "--"
self.update_field(panel, drone_id, 'heading', heading_text)
self.update_field(panel, drone_id, 'groundspeed', groundspeed_text)
self.update_overview_table(drone_id, 'heading', heading_text)
self.update_overview_table(drone_id, 'groundspeed', groundspeed_text)
elif msg_type == 'loss_rate':
loss_rate = data.get('loss_rate', 0)
text = f"{loss_rate:.1f}%"
self.update_field(panel, drone_id, 'loss_rate', text)
self.update_overview_table(drone_id, 'loss_rate', text)
elif msg_type == 'ping':
ping = data.get('ping', 0)
text = f"{ping:.1f} ms"
self.update_field(panel, drone_id, 'ping', text)
self.update_overview_table(drone_id, 'ping', text)
elif msg_type == 'velocity':
text = f"{data['vx']:.1f}, {data['vy']:.1f}"
self.update_overview_table(drone_id, 'velocity', text)
elif msg_type == 'attitude':
roll = data.get('roll', 0)
pitch = data.get('pitch', 0)
yaw = data.get('yaw', 0)
roll_text = f"{roll:.1f}°"
pitch_text = f"{pitch:.1f}°"
yaw_text = f"{yaw:.1f}°"
self.update_overview_table(drone_id, 'roll', roll_text)
self.update_overview_table(drone_id, 'pitch', pitch_text)
self.update_overview_table(drone_id, 'yaw', yaw_text)
# 新的代碼 - 將地圖更新放入緩存,等待批量處理:
if drone_id in self.drone_positions and drone_id in self.drone_headings:
lat, lon = self.drone_positions[drone_id]
heading = self.drone_headings[drone_id]
self.pending_map_updates[drone_id] = (lat, lon, heading)
# 新增處理分組勾選的方法
def handle_group_selection(self, socket_id, state):
"""處理 socket 分組勾選狀態變化"""
# 獲取該分組下的所有無人機
group_drones = [did for did in self.drones.keys()
if self.get_socket_id(did) == socket_id]
# 根據分組勾選狀態更新所有該分組的無人機勾選狀態
is_checked = state == Qt.CheckState.Checked.value
for drone_id in group_drones:
checkbox = self.drones[drone_id].findChild(QCheckBox, f"{drone_id}_checkbox")
if checkbox:
# 暫時斷開信號連接,避免遞迴觸發
checkbox.blockSignals(True)
checkbox.setChecked(is_checked)
checkbox.blockSignals(False)
# 手動更新選中集合
if is_checked:
self.monitor.selected_drones.add(drone_id)
else:
self.monitor.selected_drones.discard(drone_id)
def handle_drone_selection(self, drone_id, state):
"""處理個別無人機勾選狀態變化"""
if state == Qt.CheckState.Checked.value:
self.monitor.selected_drones.add(drone_id)
else:
self.monitor.selected_drones.discard(drone_id)
# 更新對應 socket 分組的勾選狀態
socket_id = self.get_socket_id(drone_id)
self.update_group_checkbox_state(socket_id)
# 新增更新分組勾選框狀態的方法
def update_group_checkbox_state(self, socket_id):
"""更新指定 socket 分組的勾選框狀態"""
# 獲取該分組下的所有無人機
group_drones = [did for did in self.drones.keys()
if self.get_socket_id(did) == socket_id]
if not group_drones:
return
# 檢查該分組內有多少無人機被勾選
checked_count = sum(1 for did in group_drones
if self.drones[did].findChild(QCheckBox, f"{did}_checkbox").isChecked())
# 獲取分組勾選框
if socket_id in self.socket_groups:
group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox")
if group_checkbox:
# 暫時斷開信號連接
group_checkbox.blockSignals(True)
if checked_count == 0:
# 沒有無人機被勾選
group_checkbox.setCheckState(Qt.CheckState.Unchecked)
elif checked_count == len(group_drones):
# 所有無人機都被勾選
group_checkbox.setCheckState(Qt.CheckState.Checked)
else:
# 部分無人機被勾選,顯示半勾選狀態
group_checkbox.setCheckState(Qt.CheckState.PartiallyChecked)
# 重新連接信號
group_checkbox.blockSignals(False)
def handle_select_all(self):
"""處理全選按鈕 - 支援分組結構"""
# 檢查是否所有無人機都已被選中
all_selected = all(
self.drones[drone_id].findChild(QCheckBox, f"{drone_id}_checkbox").isChecked()
for drone_id in self.drones
)
# 如果全部已選中,則取消全選;否則全選
new_state = not all_selected
# 更新所有勾選框狀態(無人機和分組)
for drone_id in self.drones:
checkbox = self.drones[drone_id].findChild(QCheckBox, f"{drone_id}_checkbox")
if checkbox:
checkbox.setChecked(new_state)
# 更新所有分組勾選框狀態
for socket_id in self.socket_groups:
group_checkbox = self.socket_groups[socket_id].findChild(QCheckBox, f"socket_{socket_id}_checkbox")
if group_checkbox:
group_checkbox.blockSignals(True)
group_checkbox.setChecked(new_state)
group_checkbox.blockSignals(False)
def handle_arm_selected(self):
"""處理批次解鎖"""
loop = asyncio.get_event_loop()
for drone_id in self.monitor.selected_drones:
future = self.monitor.arm_drone(drone_id, True)
loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}"))
def handle_takeoff_selected(self):
"""處理批次起飛"""
loop = asyncio.get_event_loop()
for drone_id in self.monitor.selected_drones:
future = self.monitor.takeoff_drone(drone_id, 10.0)
loop.create_task(self.handle_service_response(future, f"批次起飛 {drone_id}"))
def handle_batch_mode_change(self):
mode = self.mode_combo.currentText()
loop = asyncio.get_event_loop()
for drone_id in self.monitor.selected_drones:
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}"))
def handle_single_mode_change(self, drone_id):
mode = self.mode_combo.currentText()
loop = asyncio.get_event_loop()
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"{drone_id} 切換模式 {mode}"))
def update_field(self, panel, drone_id, field, text, color=None):
"""Update a specific field in the panel - 添加變更檢查"""
if label := panel.findChild(QLabel, f"{drone_id}_{field}"):
# 只有在文字真正改變時才更新
if label.text() != text:
label.setText(text)
if color:
label.setStyleSheet(f"color: {color};")
def update_overview_table(self, drone_id=None, field=None, value=None):
# Ensure the widget is available
if not hasattr(self, 'overview_table') or self.overview_table is None:
return
# Update a specific cell in the overview table
if drone_id and field and value:
col = 1 + list(self.drones.keys()).index(drone_id)
row = self.info_type_map.get(field, -1) # Get row from field mapping
if row == -1:
return # Invalid field, exit
item = self.overview_table.item(row, col)
if not item:
item = QTableWidgetItem()
self.overview_table.setItem(row, col, item)
item.setText(value) # Update the cell's text
item.setTextAlignment(Qt.AlignmentFlag.AlignCenter) # Center the text
# If no specific update, refresh entire table
if drone_id is None:
cols = 1 + len(self.drones)
self.overview_table.setColumnCount(cols)
headers = ["資訊"] + list(self.drones.keys())
self.overview_table.setHorizontalHeaderLabels(headers)
for col, did in enumerate(self.drones, start=1):
panel = self.drones[did]
for field, row in self.info_type_map.items():
lbl = panel.findChild(QLabel, f"{did}_{field}")
val = lbl.text() if lbl else "--"
val_item = QTableWidgetItem(val)
val_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
self.overview_table.setItem(row, col, val_item)
def get_socket_id(self, drone_id):
"""從 drone_id 提取 socket_id (例如 's9_1' -> '9')"""
import re
match = re.match(r's(\d+)_\d+', drone_id)
return match.group(1) if match else 'unknown'
def add_drone(self, drone_id):
if drone_id in self.drones:
return
# 獲取 socket_id
socket_id = self.get_socket_id(drone_id)
# 創建無人機面板
panel = self.create_drone_panel(drone_id)
self.drones[drone_id] = panel
# 如果該 socket 分組不存在,創建它
if socket_id not in self.socket_groups:
group_panel = self.create_socket_group_panel(socket_id)
self.socket_groups[socket_id] = group_panel
# 將無人機面板添加到對應的 socket 分組中
group_panel = self.socket_groups[socket_id]
group_panel.drones_layout.addWidget(panel)
# 重新排序並顯示所有 socket 分組
self.reorganize_socket_groups()
# 更新分組勾選框狀態
self.update_group_checkbox_state(socket_id)
# 更新總覽表
self.update_overview_table()
def reorganize_socket_groups(self):
"""重新組織和排序 socket 分組"""
# 先清空主佈局
while self.drone_panel_layout.count():
w = self.drone_panel_layout.takeAt(0).widget()
if w:
w.setParent(None)
# 對每個 socket 分組內的無人機進行排序
for socket_id, group_panel in self.socket_groups.items():
# 獲取該分組內的所有無人機
group_drones = [did for did in self.drones.keys()
if self.get_socket_id(did) == socket_id]
# 清空分組佈局
while group_panel.drones_layout.count():
w = group_panel.drones_layout.takeAt(0).widget()
if w:
w.setParent(None)
# 對該分組內的無人機按數字排序
def sort_key(x):
parts = x[1:].split('_')
return (int(parts[0]), int(parts[1]))
# 重新添加排序後的無人機面板
for did in sorted(group_drones, key=sort_key):
group_panel.drones_layout.addWidget(self.drones[did])
# 按 socket_id 數字順序添加分組到主佈局
for socket_id in sorted(self.socket_groups.keys(), key=lambda x: int(x)):
self.drone_panel_layout.addWidget(self.socket_groups[socket_id])
def spin_ros(self):
try:
self.executor.spin_once(timeout_sec=0.01)
for (drone_id, msg_type), data in self.monitor.latest_data.items():
self.monitor.signals.update_signal.emit(msg_type, drone_id, data)
self.monitor.latest_data.clear()
except Exception as e:
print(f"ROS spin error: {e}")
def closeEvent(self, event):
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
station = ControlStationUI()
station.show()
app.exec()