PyQt6 GUI

chiyu
ken910606 1 year ago
parent 53c062a218
commit d8ef37fdc0

@ -0,0 +1,392 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QSplitter, QScrollArea,
QSizePolicy)
from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal
from PyQt6.QtGui import QColor
import math
import re
from threading import Lock
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
from std_msgs.msg import String, Float64
from mavros_msgs.msg import VfrHud
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class DroneMonitor(Node):
def __init__(self):
super().__init__('drone_monitor')
self.signals = DroneSignals()
self.drone_topics = {}
self.lock = Lock()
# 主题检测定时器
self.create_timer(1.0, self.scan_topics)
def scan_topics(self):
topics = self.get_topic_names_and_types()
drone_pattern = re.compile(r'/MavLinkBus/(s\d+)/(\w+)')
found_drones = set()
for topic_name, _ in topics:
if match := drone_pattern.match(topic_name):
drone_id, topic_type = match.groups()
found_drones.add(drone_id)
with self.lock:
self.drone_topics.setdefault(drone_id, set()).add(topic_type)
for drone_id in found_drones:
if not hasattr(self, f'drone_{drone_id}_subs'):
self.setup_drone_subs(drone_id)
def setup_drone_subs(self, drone_id):
base_topic = f'/MavLinkBus/{drone_id}'
subs = {
'attitude': self.create_subscription(
Imu,
f'{base_topic}/attitude',
lambda msg, did=drone_id: self.attitude_callback(did, msg),
10
),
'battery': self.create_subscription(
BatteryState,
f'{base_topic}/battery',
lambda msg, did=drone_id: self.battery_callback(did, msg),
10
),
'flightMode': self.create_subscription(
String,
f'{base_topic}/flightMode',
lambda msg, did=drone_id: self.mode_callback(did, msg),
10
),
'global': self.create_subscription(
NavSatFix,
f'{base_topic}/global_position/global',
lambda msg, did=drone_id: self.gps_callback(did, msg),
10
),
'rel_alt': self.create_subscription(
Float64,
f'{base_topic}/global_position/rel_alt',
lambda msg, did=drone_id: self.altitude_callback(did, msg),
10
),
'local_pose': self.create_subscription(
Point,
f'{base_topic}/local_position/pose',
lambda msg, did=drone_id: self.local_pose_callback(did, msg),
10
),
'local_vel': self.create_subscription(
Vector3,
f'{base_topic}/local_position/velocity',
lambda msg, did=drone_id: self.local_vel_callback(did, msg),
10
),
'vfr_hud': self.create_subscription(
VfrHud,
f'{base_topic}/vfr_hud',
lambda msg, did=drone_id: self.hud_callback(did, msg),
10
)
}
setattr(self, f'drone_{drone_id}_subs', subs)
self.signals.update_signal.emit('new_drone', drone_id, None)
def quaternion_to_euler(self, q):
sinr_cosp = 2 * (q.w * q.x + q.y * q.z)
cosr_cosp = 1 - 2 * (q.x**2 + q.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (q.w * q.y - q.z * q.x)
pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp)
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y**2 + q.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# 回调函数
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.signals.update_signal.emit('attitude', drone_id, {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
})
def battery_callback(self, drone_id, msg):
self.signals.update_signal.emit('battery', drone_id, {
'voltage': msg.voltage
})
def mode_callback(self, drone_id, msg):
self.signals.update_signal.emit('mode', drone_id, msg.data)
def gps_callback(self, drone_id, msg):
self.signals.update_signal.emit('gps', drone_id, {
'lat': msg.latitude,
'lon': msg.longitude,
'alt': msg.altitude
})
def local_vel_callback(self, drone_id, msg):
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.x,
'vy': msg.y,
'vz': msg.z
})
def altitude_callback(self, drone_id, msg):
self.signals.update_signal.emit('altitude', drone_id, msg.data)
def local_pose_callback(self, drone_id, msg):
self.signals.update_signal.emit('local_pose', drone_id, {
'x': msg.x,
'y': msg.y,
'z': msg.z
})
def hud_callback(self, drone_id, msg):
self.signals.update_signal.emit('hud', drone_id, {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.altitude,
'climb': msg.climb
})
class ControlStationUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('GCS')
self.resize(1400, 900)
self.drones = {}
# 初始化ROS2
rclpy.init()
self.monitor = DroneMonitor()
self.monitor.signals.update_signal.connect(self.update_ui)
# ROS执行器配置
self.executor = rclpy.executors.SingleThreadedExecutor()
self.executor.add_node(self.monitor)
# 初始化UI
self.init_ui()
# 定时处理ROS事件
self.timer = QTimer()
self.timer.timeout.connect(self.spin_ros)
self.timer.start(50)
def init_ui(self):
main_splitter = QSplitter(Qt.Orientation.Horizontal)
# 左侧信息面板
left_panel = QWidget()
left_layout = QVBoxLayout(left_panel)
left_layout.setContentsMargins(5, 5, 5, 5)
# 信息滚动区域
scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True)
self.info_container = QWidget()
self.info_layout = QVBoxLayout(self.info_container)
self.info_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
scroll_area.setWidget(self.info_container)
left_layout.addWidget(scroll_area)
# 右侧地图区域
self.map_widget = QLabel()
self.map_widget.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.map_widget.setStyleSheet("""
QLabel {
background-color: #1A1A1A;
color: #AAAAAA;
font-size: 18px;
border: 2px solid #333;
border-radius: 10px;
padding: 20px;
}
""")
self.update_map_display("等待GPS數據...")
main_splitter.addWidget(left_panel)
main_splitter.addWidget(self.map_widget)
main_splitter.setSizes([400, 1000])
self.setCentralWidget(main_splitter)
def create_drone_panel(self, drone_id):
panel = QWidget()
panel.setObjectName(f"panel_{drone_id}")
panel.setStyleSheet("""
QWidget#panel_%s {
background-color: #2A2A2A;
border-radius: 8px;
margin: 6px;
padding: 10px;
border: 1px solid #444;
}
QLabel {
color: #DDD;
font-size: 12px;
padding: 2px;
}
""" % drone_id)
layout = QVBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
layout.setSpacing(4)
# 顶部标题栏
header = QWidget()
header_layout = QHBoxLayout(header)
header_layout.setContentsMargins(0, 0, 0, 0)
# ID显示
id_label = QLabel(drone_id)
id_label.setStyleSheet("""
font-weight: bold;
font-size: 14px;
color: #7FFFD4;
min-width: 80px;
""")
# 状态指示灯
status_light = QLabel("")
status_light.setStyleSheet("color: #00FF00; font-size: 16px;")
header_layout.addWidget(id_label)
header_layout.addWidget(status_light)
header_layout.addStretch()
layout.addWidget(header)
# 数据字段(带标签)
self.create_data_row(layout, "模式:", f"{drone_id}_mode", "--")
self.create_data_row(layout, "電壓:", f"{drone_id}_battery", "-- V")
self.create_data_row(layout, "GPS:", f"{drone_id}_gps", "等待定位...")
self.create_data_row(layout, "Local:", f"{drone_id}_local", "X: -- Y: -- Z: --")
self.create_data_row(layout, "速度:", f"{drone_id}_velocity", "VX: -- VY: -- VZ: --")
self.create_data_row(layout, "姿態:", f"{drone_id}_attitude", "Roll: -- Pitch: -- Yaw: --")
return panel
def create_data_row(self, layout, title, object_name, default):
row = QWidget()
hbox = QHBoxLayout(row)
hbox.setContentsMargins(0, 0, 0, 0)
# 标题标签
title_label = QLabel(title)
title_label.setStyleSheet("color: #888; min-width: 80px;")
title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred)
# 数据标签
value_label = QLabel(default)
value_label.setObjectName(object_name)
value_label.setWordWrap(True)
value_label.setStyleSheet("margin-left: 10px;")
hbox.addWidget(title_label)
hbox.addWidget(value_label)
layout.addWidget(row)
def update_ui(self, msg_type, drone_id, data):
if msg_type == 'new_drone':
self.add_drone(drone_id)
return
if not (panel := self.drones.get(drone_id)):
return
if msg_type == 'mode':
self.update_field(panel, drone_id, 'mode', f"模式: {data}",
'#FF5555' if '返航' in data else '#55FF55')
elif msg_type == 'battery':
voltage = data.get('voltage', 0)
self.update_field(panel, drone_id, 'battery',
f"{voltage:.1f} V",
'#FF6464' if voltage < 12 else '#FFFFFF')
elif msg_type == 'gps':
text = (f"緯度: {data['lat']:.6f}\n"
f"經度: {data['lon']:.6f}\n"
f"高度: {data['alt']:.1f} m")
self.update_field(panel, drone_id, 'gps', text)
self.update_map_display(f"Drone {drone_id}\n{text}")
elif msg_type == 'local_pose':
text = (f"X: {data['x']:.1f} m\n"
f"Y: {data['y']:.1f} m\n"
f"Z: {data['z']:.1f} m")
self.update_field(panel, drone_id, 'local', text)
elif msg_type == 'velocity':
text = (f"VX: {data['vx']:.1f} m/s\n"
f"VY: {data['vy']:.1f} m/s\n"
f"VZ: {data['vz']:.1f} m/s")
self.update_field(panel, drone_id, 'velocity', text)
elif msg_type == 'attitude':
text = (f"Roll: {data['roll']:.1f}°\n"
f"Pitch: {data['pitch']:.1f}°\n"
f"Yaw: {data['yaw']:.1f}°")
self.update_field(panel, drone_id, 'attitude', text)
elif msg_type == 'hud':
text = (f"空速: {data['airspeed']:.1f} m/s\n"
f"地速: {data['groundspeed']:.1f} m/s\n"
f"航向: {data['heading']:.1f}°")
self.update_field(panel, drone_id, 'hud', text)
def update_field(self, panel, drone_id, field, text, color=None):
if label := panel.findChild(QLabel, f"{drone_id}_{field}"):
label.setText(text)
if color:
label.setStyleSheet(f"color: {color};")
def add_drone(self, drone_id):
if drone_id not in self.drones:
panel = self.create_drone_panel(drone_id)
self.info_layout.addWidget(panel)
self.drones[drone_id] = panel
def update_map_display(self, text):
self.map_widget.setText(text)
self.map_widget.repaint()
def spin_ros(self):
try:
self.executor.spin_once(timeout_sec=0)
except Exception as e:
print(f"ROS spin error: {e}")
def closeEvent(self, event):
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
event.accept()
if __name__ == '__main__':
app = QApplication([])
station = ControlStationUI()
station.show()
app.exec()
Loading…
Cancel
Save