diff --git a/src/unitdev01/unitdev01/gui.py b/src/unitdev01/unitdev01/gui.py new file mode 100644 index 0000000..623a0d2 --- /dev/null +++ b/src/unitdev01/unitdev01/gui.py @@ -0,0 +1,392 @@ +#!/usr/bin/env python3 +import rclpy +from rclpy.node import Node +from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, + QWidget, QLabel, QSplitter, QScrollArea, + QSizePolicy) +from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal +from PyQt6.QtGui import QColor +import math +import re +from threading import Lock +from geometry_msgs.msg import Point, Vector3 +from sensor_msgs.msg import BatteryState, NavSatFix, Imu +from std_msgs.msg import String, Float64 +from mavros_msgs.msg import VfrHud + +class DroneSignals(QObject): + update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data) + +class DroneMonitor(Node): + def __init__(self): + super().__init__('drone_monitor') + self.signals = DroneSignals() + self.drone_topics = {} + self.lock = Lock() + + # 主题检测定时器 + self.create_timer(1.0, self.scan_topics) + + def scan_topics(self): + topics = self.get_topic_names_and_types() + drone_pattern = re.compile(r'/MavLinkBus/(s\d+)/(\w+)') + + found_drones = set() + for topic_name, _ in topics: + if match := drone_pattern.match(topic_name): + drone_id, topic_type = match.groups() + found_drones.add(drone_id) + with self.lock: + self.drone_topics.setdefault(drone_id, set()).add(topic_type) + + for drone_id in found_drones: + if not hasattr(self, f'drone_{drone_id}_subs'): + self.setup_drone_subs(drone_id) + + def setup_drone_subs(self, drone_id): + base_topic = f'/MavLinkBus/{drone_id}' + + subs = { + 'attitude': self.create_subscription( + Imu, + f'{base_topic}/attitude', + lambda msg, did=drone_id: self.attitude_callback(did, msg), + 10 + ), + 'battery': self.create_subscription( + BatteryState, + f'{base_topic}/battery', + lambda msg, did=drone_id: self.battery_callback(did, msg), + 10 + ), + 'flightMode': self.create_subscription( + String, + f'{base_topic}/flightMode', + lambda msg, did=drone_id: self.mode_callback(did, msg), + 10 + ), + 'global': self.create_subscription( + NavSatFix, + f'{base_topic}/global_position/global', + lambda msg, did=drone_id: self.gps_callback(did, msg), + 10 + ), + 'rel_alt': self.create_subscription( + Float64, + f'{base_topic}/global_position/rel_alt', + lambda msg, did=drone_id: self.altitude_callback(did, msg), + 10 + ), + 'local_pose': self.create_subscription( + Point, + f'{base_topic}/local_position/pose', + lambda msg, did=drone_id: self.local_pose_callback(did, msg), + 10 + ), + 'local_vel': self.create_subscription( + Vector3, + f'{base_topic}/local_position/velocity', + lambda msg, did=drone_id: self.local_vel_callback(did, msg), + 10 + ), + 'vfr_hud': self.create_subscription( + VfrHud, + f'{base_topic}/vfr_hud', + lambda msg, did=drone_id: self.hud_callback(did, msg), + 10 + ) + } + + setattr(self, f'drone_{drone_id}_subs', subs) + self.signals.update_signal.emit('new_drone', drone_id, None) + + def quaternion_to_euler(self, q): + sinr_cosp = 2 * (q.w * q.x + q.y * q.z) + cosr_cosp = 1 - 2 * (q.x**2 + q.y**2) + roll = math.atan2(sinr_cosp, cosr_cosp) + + sinp = 2 * (q.w * q.y - q.z * q.x) + pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp) + + siny_cosp = 2 * (q.w * q.z + q.x * q.y) + cosy_cosp = 1 - 2 * (q.y**2 + q.z**2) + yaw = math.atan2(siny_cosp, cosy_cosp) + + return math.degrees(roll), math.degrees(pitch), math.degrees(yaw) + + # 回调函数 + def attitude_callback(self, drone_id, msg): + if hasattr(msg, 'orientation'): + roll, pitch, yaw = self.quaternion_to_euler(msg.orientation) + self.signals.update_signal.emit('attitude', drone_id, { + 'roll': roll, + 'pitch': pitch, + 'yaw': yaw, + 'rates': (msg.angular_velocity.x, + msg.angular_velocity.y, + msg.angular_velocity.z) + }) + + def battery_callback(self, drone_id, msg): + self.signals.update_signal.emit('battery', drone_id, { + 'voltage': msg.voltage + }) + + def mode_callback(self, drone_id, msg): + self.signals.update_signal.emit('mode', drone_id, msg.data) + + def gps_callback(self, drone_id, msg): + self.signals.update_signal.emit('gps', drone_id, { + 'lat': msg.latitude, + 'lon': msg.longitude, + 'alt': msg.altitude + }) + + def local_vel_callback(self, drone_id, msg): + self.signals.update_signal.emit('velocity', drone_id, { + 'vx': msg.x, + 'vy': msg.y, + 'vz': msg.z + }) + + def altitude_callback(self, drone_id, msg): + self.signals.update_signal.emit('altitude', drone_id, msg.data) + + def local_pose_callback(self, drone_id, msg): + self.signals.update_signal.emit('local_pose', drone_id, { + 'x': msg.x, + 'y': msg.y, + 'z': msg.z + }) + + def hud_callback(self, drone_id, msg): + self.signals.update_signal.emit('hud', drone_id, { + 'airspeed': msg.airspeed, + 'groundspeed': msg.groundspeed, + 'heading': msg.heading, + 'throttle': msg.throttle, + 'alt': msg.altitude, + 'climb': msg.climb + }) + +class ControlStationUI(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle('GCS') + self.resize(1400, 900) + self.drones = {} + + # 初始化ROS2 + rclpy.init() + self.monitor = DroneMonitor() + self.monitor.signals.update_signal.connect(self.update_ui) + + # ROS执行器配置 + self.executor = rclpy.executors.SingleThreadedExecutor() + self.executor.add_node(self.monitor) + + # 初始化UI + self.init_ui() + + # 定时处理ROS事件 + self.timer = QTimer() + self.timer.timeout.connect(self.spin_ros) + self.timer.start(50) + + def init_ui(self): + main_splitter = QSplitter(Qt.Orientation.Horizontal) + + # 左侧信息面板 + left_panel = QWidget() + left_layout = QVBoxLayout(left_panel) + left_layout.setContentsMargins(5, 5, 5, 5) + + # 信息滚动区域 + scroll_area = QScrollArea() + scroll_area.setWidgetResizable(True) + self.info_container = QWidget() + self.info_layout = QVBoxLayout(self.info_container) + self.info_layout.setAlignment(Qt.AlignmentFlag.AlignTop) + scroll_area.setWidget(self.info_container) + + left_layout.addWidget(scroll_area) + + # 右侧地图区域 + self.map_widget = QLabel() + self.map_widget.setAlignment(Qt.AlignmentFlag.AlignCenter) + self.map_widget.setStyleSheet(""" + QLabel { + background-color: #1A1A1A; + color: #AAAAAA; + font-size: 18px; + border: 2px solid #333; + border-radius: 10px; + padding: 20px; + } + """) + self.update_map_display("等待GPS數據...") + + main_splitter.addWidget(left_panel) + main_splitter.addWidget(self.map_widget) + main_splitter.setSizes([400, 1000]) + + self.setCentralWidget(main_splitter) + + def create_drone_panel(self, drone_id): + panel = QWidget() + panel.setObjectName(f"panel_{drone_id}") + panel.setStyleSheet(""" + QWidget#panel_%s { + background-color: #2A2A2A; + border-radius: 8px; + margin: 6px; + padding: 10px; + border: 1px solid #444; + } + QLabel { + color: #DDD; + font-size: 12px; + padding: 2px; + } + """ % drone_id) + + layout = QVBoxLayout(panel) + layout.setContentsMargins(8, 8, 8, 8) + layout.setSpacing(4) + + # 顶部标题栏 + header = QWidget() + header_layout = QHBoxLayout(header) + header_layout.setContentsMargins(0, 0, 0, 0) + + # ID显示 + id_label = QLabel(drone_id) + id_label.setStyleSheet(""" + font-weight: bold; + font-size: 14px; + color: #7FFFD4; + min-width: 80px; + """) + + # 状态指示灯 + status_light = QLabel("●") + status_light.setStyleSheet("color: #00FF00; font-size: 16px;") + + header_layout.addWidget(id_label) + header_layout.addWidget(status_light) + header_layout.addStretch() + + layout.addWidget(header) + + # 数据字段(带标签) + self.create_data_row(layout, "模式:", f"{drone_id}_mode", "--") + self.create_data_row(layout, "電壓:", f"{drone_id}_battery", "-- V") + self.create_data_row(layout, "GPS:", f"{drone_id}_gps", "等待定位...") + self.create_data_row(layout, "Local:", f"{drone_id}_local", "X: -- Y: -- Z: --") + self.create_data_row(layout, "速度:", f"{drone_id}_velocity", "VX: -- VY: -- VZ: --") + self.create_data_row(layout, "姿態:", f"{drone_id}_attitude", "Roll: -- Pitch: -- Yaw: --") + + return panel + + def create_data_row(self, layout, title, object_name, default): + row = QWidget() + hbox = QHBoxLayout(row) + hbox.setContentsMargins(0, 0, 0, 0) + + # 标题标签 + title_label = QLabel(title) + title_label.setStyleSheet("color: #888; min-width: 80px;") + title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred) + + # 数据标签 + value_label = QLabel(default) + value_label.setObjectName(object_name) + value_label.setWordWrap(True) + value_label.setStyleSheet("margin-left: 10px;") + + hbox.addWidget(title_label) + hbox.addWidget(value_label) + layout.addWidget(row) + + def update_ui(self, msg_type, drone_id, data): + if msg_type == 'new_drone': + self.add_drone(drone_id) + return + + if not (panel := self.drones.get(drone_id)): + return + + if msg_type == 'mode': + self.update_field(panel, drone_id, 'mode', f"模式: {data}", + '#FF5555' if '返航' in data else '#55FF55') + + elif msg_type == 'battery': + voltage = data.get('voltage', 0) + self.update_field(panel, drone_id, 'battery', + f"{voltage:.1f} V", + '#FF6464' if voltage < 12 else '#FFFFFF') + + elif msg_type == 'gps': + text = (f"緯度: {data['lat']:.6f}\n" + f"經度: {data['lon']:.6f}\n" + f"高度: {data['alt']:.1f} m") + self.update_field(panel, drone_id, 'gps', text) + self.update_map_display(f"Drone {drone_id}\n{text}") + + elif msg_type == 'local_pose': + text = (f"X: {data['x']:.1f} m\n" + f"Y: {data['y']:.1f} m\n" + f"Z: {data['z']:.1f} m") + self.update_field(panel, drone_id, 'local', text) + + elif msg_type == 'velocity': + text = (f"VX: {data['vx']:.1f} m/s\n" + f"VY: {data['vy']:.1f} m/s\n" + f"VZ: {data['vz']:.1f} m/s") + self.update_field(panel, drone_id, 'velocity', text) + + elif msg_type == 'attitude': + text = (f"Roll: {data['roll']:.1f}°\n" + f"Pitch: {data['pitch']:.1f}°\n" + f"Yaw: {data['yaw']:.1f}°") + self.update_field(panel, drone_id, 'attitude', text) + + elif msg_type == 'hud': + text = (f"空速: {data['airspeed']:.1f} m/s\n" + f"地速: {data['groundspeed']:.1f} m/s\n" + f"航向: {data['heading']:.1f}°") + self.update_field(panel, drone_id, 'hud', text) + + def update_field(self, panel, drone_id, field, text, color=None): + if label := panel.findChild(QLabel, f"{drone_id}_{field}"): + label.setText(text) + if color: + label.setStyleSheet(f"color: {color};") + + def add_drone(self, drone_id): + if drone_id not in self.drones: + panel = self.create_drone_panel(drone_id) + self.info_layout.addWidget(panel) + self.drones[drone_id] = panel + + def update_map_display(self, text): + self.map_widget.setText(text) + self.map_widget.repaint() + + def spin_ros(self): + try: + self.executor.spin_once(timeout_sec=0) + except Exception as e: + print(f"ROS spin error: {e}") + + def closeEvent(self, event): + self.monitor.destroy_node() + self.executor.shutdown() + rclpy.shutdown() + event.accept() + +if __name__ == '__main__': + app = QApplication([]) + station = ControlStationUI() + station.show() + app.exec() \ No newline at end of file