diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 9130a15..60ec91b 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -22,6 +22,27 @@ from pymavlink import mavutil # ROS2 的 import from rclpy.node import Node +# 自定義的 import +from mavlinkDevice import mavlink_device +from mavlinkPublish import mavlink_publisher +from theLogger import setup_logger + + +# ====================== 分割線 ===================== + +logger = setup_logger("mavlinkObject.py") + +# 基礎功能的 import +import threading +import queue +import time + +# mavlink 的 import +from pymavlink import mavutil + +# ROS2 的 import +from rclpy.node import Node + # 自定義的 import from mavlinkDevice import mavlink_device, mavlink_systems from mavlinkPublish import mavlink_publisher @@ -131,8 +152,20 @@ class mavlink_bridge(Node, mavlink_publisher): # print("get mode :", mavutil.mode_string_v10(msg)) # debug # print("record mode :", this_component.emitParams['flightMode_mode']) # debug - elif msg.get_msgId() == 147: # BATTERY_STATUS 處理 - this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()] + elif msg_id == 30: # ATTITUDE 處理 + this_component.emitParams['attitude'] = msg + + elif msg_id == 32: # LOCAL_POSITION_NED 處理 + this_component.emitParams['local_position'] = msg + + elif msg_id == 33: # GLOBAL_POSITION_INT 處理 + this_component.emitParams['global_position'] = msg + + elif msg_id == 74: # VFR_HUD 處理 + this_component.emitParams['vfr_hud'] = msg + + elif msg_id == 147: # BATTERY_STATUS 處理 + this_component.emitParams['battery'] = msg # ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↑↑↑↑↑↑↑↑↑↑↑↑ @@ -273,6 +306,7 @@ class mavlink_object(): self.running = False def _run_thread(self): + logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) start_time = time.time() while self.running: @@ -282,6 +316,7 @@ class mavlink_object(): try: mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: + logger.critical(f"Receiving data not mavlink format. Object Delete.") logger.critical(f"Receiving data not mavlink format. Object Delete.") print(f"[mavlinkObject.py] Error receiving mavlink data: {e}") print(mavlinkMsg) diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py index b47416e..028e3ee 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py @@ -10,8 +10,13 @@ publisher topic name 命名規則為 <前綴詞>/s/<具體 topic> ''' import std_msgs.msg +import sensor_msgs.msg +import geometry_msgs.msg +import mavros_msgs.msg from theLogger import setup_logger +import math + logger = setup_logger("mavlinkPublish.py") @@ -46,7 +51,158 @@ class mavlink_publisher(): pass # ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同 ros2 topic 訊息 請放在這裡 ↓↓↓↓↓↓↓↓↓↓↓↓ + def euler_to_quaternion(cls,roll, pitch, yaw): + qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2) + qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) + qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2) + return [qx, qy, qz, qw] + + def create_attitude(self, sysid, component_obj): + target_topic = 'attitude' + + try: + _ = component_obj.emitParams['attitude'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(sensor_msgs.msg.Imu, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_attitude] + + def packEmit_attitude(self, emitParams, publisher): + mav_msg = emitParams['attitude'] + msg = sensor_msgs.msg.Imu() + x, y, z, w = self.euler_to_quaternion(mav_msg.roll, mav_msg.pitch, mav_msg.yaw) + msg.orientation.x = x + msg.orientation.y = y + msg.orientation.z = z + msg.orientation.w = w + msg.angular_velocity.x = mav_msg.rollspeed + msg.angular_velocity.y = mav_msg.pitchspeed + msg.angular_velocity.z = mav_msg.yawspeed + publisher.publish(msg) + pass + + def create_local_position_pose(self, sysid, component_obj): + target_topic = 'local_position/pose' + try: + _ = component_obj.emitParams['local_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(geometry_msgs.msg.Point, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_pose] + + def packEmit_local_pose(cls, emitParams, publisher): + mav_msg = emitParams['local_position'] + msg = geometry_msgs.msg.Point() + msg.x = mav_msg.x + msg.y = mav_msg.y + msg.z = mav_msg.z + publisher.publish(msg) + pass + + def create_local_position_velocity(self, sysid, component_obj): + target_topic = 'local_position/velocity' + try: + _ = component_obj.emitParams['local_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_vel] + + def packEmit_local_vel(cls, emitParams, publisher): + mav_msg = emitParams['local_position'] + msg = geometry_msgs.msg.Vector3() + msg.x = mav_msg.vx + msg.y = mav_msg.vy + msg.z = mav_msg.vz + publisher.publish(msg) + pass + + def create_global_global(self, sysid, component_obj): + target_topic = 'global_position/global' + try: + _ = component_obj.emitParams['global_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(sensor_msgs.msg.NavSatFix, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_global] + + def packEmit_global_global(cls, emitParams, publisher): + mav_msg = emitParams['global_position'] + msg = sensor_msgs.msg.NavSatFix() + msg.latitude = mav_msg.lat/1e7 + msg.longitude = mav_msg.lon/1e7 + msg.altitude = mav_msg.alt/1e3 + publisher.publish(msg) + pass + + def create_global_rel(self, sysid, component_obj): + target_topic = 'global_position/rel_alt' + try: + _ = component_obj.emitParams['global_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_rel] + def packEmit_global_rel(cls, emitParams, publisher): + mav_msg = emitParams['global_position'] + msg = std_msgs.msg.Float64() + msg.data = float(mav_msg.relative_alt/1e3) + publisher.publish(msg) + pass + def create_vfr_hud(self, sysid, component_obj): + target_topic = 'vfr_hud' + try: + _ = component_obj.emitParams['vfr_hud'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(mavros_msgs.msg.VfrHud, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_vfr_hud] + + def packEmit_vfr_hud(cls, emitParams, publisher): + mav_msg = emitParams['vfr_hud'] + msg = mavros_msgs.msg.VfrHud() + msg.airspeed = mav_msg.airspeed + msg.groundspeed = mav_msg.groundspeed + msg.heading = mav_msg.heading + msg.throttle = float(mav_msg.throttle) + msg.altitude = mav_msg.alt + msg.climb = mav_msg.climb + publisher.publish(msg) + pass + + def create_battery(self, sysid, component_obj): + target_topic = 'battery' + try: + _ = component_obj.emitParams['battery'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(sensor_msgs.msg.BatteryState, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_battery] + def packEmit_battery(cls, emitParams, publisher): + mav_msg = emitParams['battery'] + msg = sensor_msgs.msg.BatteryState() + msg.voltage = mav_msg.voltages[0]/1e3 + publisher.publish(msg) + pass + + # ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同 ros2 topic 訊息 請放在這裡 ↑↑↑↑↑↑↑↑↑↑↑↑ \ No newline at end of file diff --git a/src/unitdev01/unitdev01/gui.py b/src/unitdev01/unitdev01/gui.py new file mode 100644 index 0000000..a81dbb2 --- /dev/null +++ b/src/unitdev01/unitdev01/gui.py @@ -0,0 +1,444 @@ +#!/usr/bin/env python3 +import rclpy +from rclpy.node import Node +from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, + QWidget, QLabel, QSplitter, QScrollArea, + QSizePolicy, QApplication) +from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal, QUrl +from PyQt6.QtGui import QColor +from PyQt6.QtWebEngineWidgets import QWebEngineView +import math +import re +import os +import sys +from threading import Lock +from geometry_msgs.msg import Point, Vector3 +from sensor_msgs.msg import BatteryState, NavSatFix, Imu +from std_msgs.msg import String, Float64 +from mavros_msgs.msg import VfrHud + +class DroneSignals(QObject): + update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data) + +class DroneMonitor(Node): + def __init__(self): + super().__init__('drone_monitor') + self.signals = DroneSignals() + self.drone_topics = {} + self.lock = Lock() + + # 主题检测定时器 + self.create_timer(1.0, self.scan_topics) + + def scan_topics(self): + topics = self.get_topic_names_and_types() + drone_pattern = re.compile(r'/MavLinkBus/(s\d+)/(\w+)') + + found_drones = set() + for topic_name, _ in topics: + if match := drone_pattern.match(topic_name): + drone_id, topic_type = match.groups() + found_drones.add(drone_id) + with self.lock: + self.drone_topics.setdefault(drone_id, set()).add(topic_type) + + for drone_id in found_drones: + if not hasattr(self, f'drone_{drone_id}_subs'): + self.setup_drone_subs(drone_id) + + def setup_drone_subs(self, drone_id): + base_topic = f'/MavLinkBus/{drone_id}' + + subs = { + 'attitude': self.create_subscription( + Imu, + f'{base_topic}/attitude', + lambda msg, did=drone_id: self.attitude_callback(did, msg), + 10 + ), + 'battery': self.create_subscription( + BatteryState, + f'{base_topic}/battery', + lambda msg, did=drone_id: self.battery_callback(did, msg), + 10 + ), + 'flightMode': self.create_subscription( + String, + f'{base_topic}/flightMode', + lambda msg, did=drone_id: self.mode_callback(did, msg), + 10 + ), + 'global': self.create_subscription( + NavSatFix, + f'{base_topic}/global_position/global', + lambda msg, did=drone_id: self.gps_callback(did, msg), + 10 + ), + 'rel_alt': self.create_subscription( + Float64, + f'{base_topic}/global_position/rel_alt', + lambda msg, did=drone_id: self.altitude_callback(did, msg), + 10 + ), + 'local_pose': self.create_subscription( + Point, + f'{base_topic}/local_position/pose', + lambda msg, did=drone_id: self.local_pose_callback(did, msg), + 10 + ), + 'local_vel': self.create_subscription( + Vector3, + f'{base_topic}/local_position/velocity', + lambda msg, did=drone_id: self.local_vel_callback(did, msg), + 10 + ), + 'vfr_hud': self.create_subscription( + VfrHud, + f'{base_topic}/vfr_hud', + lambda msg, did=drone_id: self.hud_callback(did, msg), + 10 + ) + } + + setattr(self, f'drone_{drone_id}_subs', subs) + self.signals.update_signal.emit('new_drone', drone_id, None) + + def quaternion_to_euler(self, q): + sinr_cosp = 2 * (q.w * q.x + q.y * q.z) + cosr_cosp = 1 - 2 * (q.x**2 + q.y**2) + roll = math.atan2(sinr_cosp, cosr_cosp) + + sinp = 2 * (q.w * q.y - q.z * q.x) + pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp) + + siny_cosp = 2 * (q.w * q.z + q.x * q.y) + cosy_cosp = 1 - 2 * (q.y**2 + q.z**2) + yaw = math.atan2(siny_cosp, cosy_cosp) + + return math.degrees(roll), math.degrees(pitch), math.degrees(yaw) + + # 回调函数 + def attitude_callback(self, drone_id, msg): + if hasattr(msg, 'orientation'): + roll, pitch, yaw = self.quaternion_to_euler(msg.orientation) + self.signals.update_signal.emit('attitude', drone_id, { + 'roll': roll, + 'pitch': pitch, + 'yaw': yaw, + 'rates': (msg.angular_velocity.x, + msg.angular_velocity.y, + msg.angular_velocity.z) + }) + + def battery_callback(self, drone_id, msg): + self.signals.update_signal.emit('battery', drone_id, { + 'voltage': msg.voltage + }) + + def mode_callback(self, drone_id, msg): + self.signals.update_signal.emit('mode', drone_id, msg.data) + + def gps_callback(self, drone_id, msg): + self.signals.update_signal.emit('gps', drone_id, { + 'lat': msg.latitude, + 'lon': msg.longitude + ,'alt': msg.altitude + }) + + def local_vel_callback(self, drone_id, msg): + self.signals.update_signal.emit('velocity', drone_id, { + 'vx': msg.x, + 'vy': msg.y, + 'vz': msg.z + }) + + def altitude_callback(self, drone_id, msg): + self.signals.update_signal.emit('altitude', drone_id, { + 'altitude': msg.data + }) + + def local_pose_callback(self, drone_id, msg): + self.signals.update_signal.emit('local_pose', drone_id, { + 'x': msg.x, + 'y': msg.y, + 'z': msg.z + }) + + def hud_callback(self, drone_id, msg): + self.signals.update_signal.emit('hud', drone_id, { + 'airspeed': msg.airspeed, + 'groundspeed': msg.groundspeed, + 'heading': msg.heading, + 'throttle': msg.throttle, + 'alt': msg.altitude, + 'climb': msg.climb + }) + +class ControlStationUI(QMainWindow): + def __init__(self): + super().__init__() + self.setWindowTitle('GCS') + self.resize(1400, 900) + self.drones = {} + + # 初始化ROS2 + rclpy.init() + self.monitor = DroneMonitor() + self.monitor.signals.update_signal.connect(self.update_ui) + + # ROS执行器配置 + self.executor = rclpy.executors.SingleThreadedExecutor() + self.executor.add_node(self.monitor) + + # 初始化UI + self.drone_positions = {} + self.map_loaded = False + self.init_ui() + + # 定时处理ROS事件 + self.timer = QTimer() + self.timer.timeout.connect(self.spin_ros) + self.timer.start(50) + + def init_ui(self): + main_splitter = QSplitter(Qt.Orientation.Horizontal) + + # 左侧信息面板 + left_panel = QWidget() + left_layout = QVBoxLayout(left_panel) + left_layout.setContentsMargins(5, 5, 5, 5) + + # 信息滚动区域 + scroll_area = QScrollArea() + scroll_area.setWidgetResizable(True) + self.info_container = QWidget() + self.info_layout = QVBoxLayout(self.info_container) + self.info_layout.setAlignment(Qt.AlignmentFlag.AlignTop) + scroll_area.setWidget(self.info_container) + + left_layout.addWidget(scroll_area) + + # 右侧地图区域 + self.map_view = QWebEngineView() + + inline_html = ''' + + + + + + + + + + +
+ + + + ''' + self.map_view.setHtml(inline_html) + self.map_view.loadFinished.connect(self.on_map_loaded) + + main_splitter.addWidget(left_panel) + main_splitter.addWidget(self.map_view) + main_splitter.setSizes([400, 1000]) + + self.setCentralWidget(main_splitter) + + def on_map_loaded(self, ok: bool): + if ok: + self.map_loaded = True + else: + print("⚠️ 地图页加载失败") + + def create_drone_panel(self, drone_id): + panel = QWidget() + panel.setObjectName(f"panel_{drone_id}") + panel.setStyleSheet(""" + QWidget#panel_%s { + background-color: #2A2A2A; + border-radius: 8px; + margin: 6px; + padding: 10px; + border: 1px solid #444; + } + QLabel { + color: #DDD; + font-size: 12px; + padding: 2px; + } + """ % drone_id) + + layout = QVBoxLayout(panel) + layout.setContentsMargins(8, 8, 8, 8) + layout.setSpacing(4) + + # 顶部标题栏 + header = QWidget() + header_layout = QHBoxLayout(header) + header_layout.setContentsMargins(0, 0, 0, 0) + + # ID显示 + id_label = QLabel(drone_id) + id_label.setStyleSheet(""" + font-weight: bold; + font-size: 14px; + color: #7FFFD4; + min-width: 80px; + """) + + # 状态指示灯 + status_light = QLabel("●") + status_light.setStyleSheet("color: #00FF00; font-size: 16px;") + + header_layout.addWidget(id_label) + header_layout.addWidget(status_light) + header_layout.addStretch() + + layout.addWidget(header) + + # 数据字段(带标签) + self.create_data_row(layout, "模式:", f"{drone_id}_mode", "--") + self.create_data_row(layout, "電壓:", f"{drone_id}_battery", "--") + self.create_data_row(layout, "GPS:", f"{drone_id}_gps", "--") + self.create_data_row(layout, "高度:", f"{drone_id}_altitude", "--") + self.create_data_row(layout, "Local:", f"{drone_id}_local", "--") + self.create_data_row(layout, "HUD:", f"{drone_id}_hud", "--") + + return panel + + def create_data_row(self, layout, title, object_name, default): + row = QWidget() + hbox = QHBoxLayout(row) + hbox.setContentsMargins(0, 0, 0, 0) + + # 标题标签 + title_label = QLabel(title) + title_label.setStyleSheet("color: #888; min-width: 80px;") + title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred) + + # 数据标签 + value_label = QLabel(default) + value_label.setObjectName(object_name) + value_label.setWordWrap(True) + value_label.setStyleSheet("margin-left: 10px;") + + hbox.addWidget(title_label) + hbox.addWidget(value_label) + layout.addWidget(row) + + def update_ui(self, msg_type, drone_id, data): + if msg_type == 'new_drone': + self.add_drone(drone_id) + return + + if not (panel := self.drones.get(drone_id)): + return + + if msg_type == 'mode': + self.update_field(panel, drone_id, 'mode', f"{data}", + '#FF5555' if '返航' in data else '#55FF55') + + elif msg_type == 'battery': + voltage = data.get('voltage', 0) + self.update_field(panel, drone_id, 'battery', + f"{voltage:.1f} V", + '#FF6464' if voltage < 12 else '#FFFFFF') + + elif msg_type == 'gps': + lat, lon = data['lat'], data['lon'] + self.drone_positions[drone_id] = (lat, lon) + text = (f"緯度: {lat:.6f}°\n" + f"經度: {lon:.6f}°") + self.update_field(panel, drone_id, 'gps', text) + + elif msg_type == 'altitude': + text = (f"{data['altitude']:.1f} m") + self.update_field(panel, drone_id, 'altitude', text) + + elif msg_type == 'local_pose': + text = (f"({data['x']:.1f}, {data['y']:.1f}, {data['z']:.1f})") + self.update_field(panel, drone_id, 'local', text) + + elif msg_type == 'hud': + heading = data['heading'] + text = (f"地速: {data['groundspeed']:.1f} m/s\n" + f"航向: {heading:.1f}°") + self.update_field(panel, drone_id, 'hud', text) + + if self.map_loaded and drone_id in self.drone_positions: + lat, lon = self.drone_positions[drone_id] + js = f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});" + self.map_view.page().runJavaScript(js) + ''' + elif msg_type == 'velocity': + text = (f"VX: {data['vx']:.1f} m/s\n" + f"VY: {data['vy']:.1f} m/s\n" + f"VZ: {data['vz']:.1f} m/s") + self.update_field(panel, drone_id, 'velocity', text) + + elif msg_type == 'attitude': + text = (f"Roll: {data['roll']:.1f}°\n" + f"Pitch: {data['pitch']:.1f}°\n" + f"Yaw: {data['yaw']:.1f}°") + self.update_field(panel, drone_id, 'attitude', text) + ''' + def update_field(self, panel, drone_id, field, text, color=None): + if label := panel.findChild(QLabel, f"{drone_id}_{field}"): + label.setText(text) + if color: + label.setStyleSheet(f"color: {color};") + + def add_drone(self, drone_id): + if drone_id not in self.drones: + panel = self.create_drone_panel(drone_id) + self.info_layout.addWidget(panel) + self.drones[drone_id] = panel + + def spin_ros(self): + try: + self.executor.spin_once(timeout_sec=0) + except Exception as e: + print(f"ROS spin error: {e}") + + def closeEvent(self, event): + self.monitor.destroy_node() + self.executor.shutdown() + rclpy.shutdown() + event.accept() + +if __name__ == '__main__': + app = QApplication(sys.argv) + station = ControlStationUI() + station.show() + app.exec(app.exec()) \ No newline at end of file diff --git a/src/unitdev01/unitdev01/interface.py b/src/unitdev01/unitdev01/interface.py index 5699e46..07b2773 100644 --- a/src/unitdev01/unitdev01/interface.py +++ b/src/unitdev01/unitdev01/interface.py @@ -144,7 +144,6 @@ class DroneControlApp(QMainWindow): self.workers.imu_signal.connect(self.update_imu) self.workers.hdg_signal.connect(self.update_hdg) self.workers.groundspeed_signal.connect(self.update_groundspeed) - self.workers.start() def initUI(self): diff --git a/src/unitdev01/unitdev01/mavlink.py b/src/unitdev01/unitdev01/mavlink.py index 9078880..b047a0e 100644 --- a/src/unitdev01/unitdev01/mavlink.py +++ b/src/unitdev01/unitdev01/mavlink.py @@ -1,7 +1,6 @@ import sys import time import math -import rclpy from PyQt5 import QtWidgets from PyQt5.QtCore import QThread, pyqtSignal from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox @@ -20,10 +19,11 @@ class MAVLinkWorker(QThread): loss_signal = pyqtSignal(str, float) frequency_signal = pyqtSignal(str, float) param_signal = pyqtSignal(str, int) + kbps_signal = pyqtSignal(str, float) def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'): super().__init__() - self.namespaces = ['UAV1', 'UAV4', 'UAV5'] + self.namespaces = ['UAV1', 'UAV2', 'UAV3'] self.connection = mavutil.mavlink_connection(usb, baud=57600) self.connection.wait_heartbeat() for sysid in self.namespaces: @@ -31,38 +31,48 @@ class MAVLinkWorker(QThread): self.set_sr_params(sysid) self.running = True - # 設置需要監控的訊息類型 - self.messages_to_monitor = ["HEARTBEAT", "BATTERY_STATUS", "GLOBAL_POSITION_INT", "GPS_RAW_INT", "LOCAL_POSITION_NED", "ATTITUDE", "VFR_HUD", "TIMESYNC"] + # 用於計算頻率丟包 self.message_count = {} self.frequency = {} self.start_time = {} - - # 用於計算丟包 self.seq_numbers = {} self.packet_loss_count = {} self.total_packet_count = {} self.loss_percentage = {} + self.total_bytes_received = {} + self.throughput_KBps = {} for namespace in self.namespaces: self.request_param(namespace, "SR1_EXTRA1") + self.connection.mav.timesync_send( + 0, #tc1 + int(time.time() * 1e9) #ts1 + ) def run(self): while self.running: try: - current_time = time.time() msg = self.connection.recv_msg() + current_time = time.time() if not msg: continue sysid = msg.get_srcSystem() if sysid == 0: continue namespace = f"UAV{sysid}" - print(sysid) msg_type = msg.get_type() if msg_type =="BAD_DATA": continue + - print(msg_type) + if sysid not in self.total_bytes_received: + self.total_bytes_received[sysid] = 0 + self.throughput_KBps[sysid] = 0 + + # 計算訊息大小 + msg_bytes = msg.get_msgbuf() # 取得封包的 bytes + if msg_bytes: + self.total_bytes_received[sysid] += len(msg_bytes) # Packet loss calculation if sysid not in self.seq_numbers: @@ -71,7 +81,7 @@ class MAVLinkWorker(QThread): self.total_packet_count[sysid] = 1 else: current_seq = msg.get_seq() - expected_seq = (self.seq_numbers[sysid] + 1) % 256 # MAVLink sequence numbers are modulo 256 + expected_seq = (self.seq_numbers[sysid] + 1) % 256 self.total_packet_count[sysid] += 1 if current_seq != expected_seq: # Packet(s) lost @@ -91,27 +101,32 @@ class MAVLinkWorker(QThread): self.message_count[sysid] += 1 # 每隔一段時間更新 - if current_time - self.start_time[sysid] >= 1: + elapsed_time = current_time - self.start_time[sysid] + if elapsed_time >= 1: # 每秒頻率 - self.frequency[sysid] = self.message_count[sysid] / (current_time - self.start_time[sysid]) - self.start_time[sysid] = current_time - self.message_count[sysid] = 0 + self.frequency[sysid] = self.message_count[sysid] / elapsed_time self.frequency_signal.emit(namespace, self.frequency[sysid]) # 發送 timesync request self.connection.mav.timesync_send( 0, #tc1 - int(time.time() * 1e9) #ts1 + int(current_time * 1e9) #ts1 ) - #self.send_heartbeat + #吞吐量 + self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time + self.kbps_signal.emit(namespace, self.throughput_KBps[sysid]) + + self.start_time[sysid] = current_time + self.message_count[sysid] = 0 + self.total_bytes_received[sysid] = 0 if msg_type == "HEARTBEAT": mode = mavutil.mode_string_v10(msg) self.state_signal.emit(namespace, mode) elif msg_type == "BATTERY_STATUS": - percentage = msg.battery_remaining/100 - self.battery_signal.emit(namespace, percentage) + voltage = msg.voltages[0] / 1000 + self.battery_signal.emit(namespace, voltage) elif msg_type == "GLOBAL_POSITION_INT": latitude = msg.lat / 1e7 @@ -140,11 +155,11 @@ class MAVLinkWorker(QThread): self.groundspeed_signal.emit(namespace, groundspeed) elif msg_type == "TIMESYNC": - round_trip_time = (int(time.time() * 1e9) - msg.ts1) / 1e6 + round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6 if(round_trip_time<1e6): self.ping_signal.emit(namespace, round_trip_time) - elif msg.get_type() == 'PARAM_VALUE': + elif msg_type == 'PARAM_VALUE': param_name = "SR1_EXTRA1" if msg.param_id == param_name: self.param_signal.emit(namespace, msg.param_value) @@ -253,13 +268,7 @@ class MAVLinkWorker(QThread): except Exception as e: print(f"Failed to set parameter {param_name}: {e}") - def send_heartbeat(self): - self.connection.mav.heartbeat_send( - mavutil.mavlink.MAV_TYPE_GCS, - mavutil.mavlink.MAV_AUTOPILOT_INVALID, - 0, 0, 0 - ) - + ''' def set_sr_params(self, sysid): """ 直接設置 MAVLink 訊息頻率 """ freqs = [0, 2, 4] @@ -275,6 +284,22 @@ class MAVLinkWorker(QThread): } for param, value in params.items(): self.set_param(f"UAV{sysid}", param, value) + ''' + def set_sr_params(self, sysid): + """ 直接設置 MAVLink 訊息頻率 """ + freqs = [0, 1, 1] + params = { + "SR1_ADSB": freqs[0], + "SR1_EXT_STAT": freqs[0], + "SR1_EXTRA1": freqs[2], + "SR1_EXTRA2": freqs[2], + "SR1_EXTRA3": freqs[0], + "SR1_POSITION": freqs[1], + "SR1_RAW_SENS": freqs[1], + "SR1_RC_CHAN": freqs[0] + } + for param, value in params.items(): + self.set_param(f"UAV{sysid}", param, value) class DroneControlApp(QMainWindow): def __init__(self): @@ -296,6 +321,7 @@ class DroneControlApp(QMainWindow): self.workers.loss_signal.connect(self.update_loss) self.workers.frequency_signal.connect(self.update_frequency) self.workers.param_signal.connect(self.update_param) + self.workers.kbps_signal.connect(self.update_kbps) self.workers.start() def initUI(self): @@ -326,17 +352,18 @@ class DroneControlApp(QMainWindow): # 狀態顯示 self.uav_labels[namespace] = { "status": QLabel("狀態:等待數據..."), - "battery": QLabel("電量:等待數據..."), + "battery": QLabel("電壓:等待數據..."), + "local_position": QLabel("Local Position:等待數據..."), "gps": QLabel("GPS位置:等待數據...\n\n"), "fix": QLabel("Fix Type:等待數據..."), "satellites": QLabel("衛星數量:等待數據..."), - "local_position": QLabel("Local Position:等待數據..."), "groundspeed": QLabel("地面速度:等待數據..."), "pitch": QLabel("Pitch:等待數據..."), "heading": QLabel("Heading:等待數據..."), "ping": QLabel("Ping:等待數據..."), "loss": QLabel("丟包:等待數據..."), "frequency": QLabel("頻率:等待數據..."), + "kbps": QLabel("吞吐量:等待數據..."), "param": QLabel("SR1_EXTRA1:等待數據...") } @@ -370,25 +397,6 @@ class DroneControlApp(QMainWindow): mode_layout.addWidget(self.modeButton) main_layout.addLayout(mode_layout) - # XYZ 設置欄位 - xyz_layout = QHBoxLayout() - self.positionX = QLineEdit() - self.positionX.setPlaceholderText("X") - self.positionY = QLineEdit() - self.positionY.setPlaceholderText("Y") - self.positionZ = QLineEdit() - self.positionZ.setPlaceholderText("Z") - xyz_layout.addWidget(QLabel("輸入位置:")) - xyz_layout.addWidget(self.positionX) - xyz_layout.addWidget(self.positionY) - xyz_layout.addWidget(self.positionZ) - main_layout.addLayout(xyz_layout) - - # 設置 XYZ 的按鈕 - self.setPositionButton = QPushButton("設置位置") - self.setPositionButton.clicked.connect(self.set_local_position) - main_layout.addWidget(self.setPositionButton) - # 解鎖按鈕 fly_layout = QHBoxLayout() self.armButton = QPushButton("解鎖") @@ -406,6 +414,25 @@ class DroneControlApp(QMainWindow): fly_layout.addWidget(self.landButton) main_layout.addLayout(fly_layout) + # XYZ 設置欄位 + xyz_layout = QHBoxLayout() + self.positionX = QLineEdit() + self.positionX.setPlaceholderText("X") + self.positionY = QLineEdit() + self.positionY.setPlaceholderText("Y") + self.positionZ = QLineEdit() + self.positionZ.setPlaceholderText("Z") + self.setPositionButton = QPushButton("設置位置") + self.setPositionButton.clicked.connect(self.set_local_position) + xyz_layout.addWidget(QLabel("輸入位置:")) + xyz_layout.addWidget(self.positionX) + xyz_layout.addWidget(self.positionY) + xyz_layout.addWidget(self.positionZ) + xyz_layout.addWidget(self.setPositionButton) + main_layout.addLayout(xyz_layout) + + # 設置 XYZ 的按鈕 + #設置重啟按鈕 self.rebootButton = QPushButton("重啟") self.rebootButton.clicked.connect(self.reboot_drone) @@ -426,8 +453,8 @@ class DroneControlApp(QMainWindow): def update_state(self, namespace, mode): self.uav_labels[namespace]["status"].setText(f"狀態:{mode}") - def update_battery(self, namespace, percentage): - self.uav_labels[namespace]["battery"].setText(f"電量:{percentage * 100:.2f}%") + def update_battery(self, namespace, voltage): + self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V") def update_gps(self, namespace, latitude, longitude): self.uav_labels[namespace]["gps"].setText(f"GPS位置: \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°") @@ -465,6 +492,9 @@ class DroneControlApp(QMainWindow): def update_frequency(self, namespace, frequency): self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz") + def update_kbps(self, namespace, kbps): + self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s") + def update_param(self, namespace, value): self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}") @@ -488,10 +518,12 @@ class DroneControlApp(QMainWindow): def takeoff_drone(self): try: z_text = self.positionZ.text().strip() - altitude = float(z_text) if z_text else 5.0 - for namespace in self.namespaces: + z = float(z_text) if z_text else 5.0 + h = 2 + for i, namespace in enumerate(self.namespaces): if self.uav_checkboxes[namespace].isChecked(): - self.workers.takeoff(namespace, altitude) + self.workers.takeoff(namespace, z + h * i) + except ValueError: QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") @@ -505,9 +537,11 @@ class DroneControlApp(QMainWindow): x = float(self.positionX.text().strip()) y = float(self.positionY.text().strip()) z = float(self.positionZ.text().strip()) - for namespace in self.namespaces: + h = 2 + for i, namespace in enumerate(self.namespaces): if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_local_position(namespace, x, y, z) + self.workers.set_local_position(namespace, x, y, z + h * i) + except ValueError: QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") @@ -529,8 +563,7 @@ class DroneControlApp(QMainWindow): except ValueError: QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") -def main(args=None): - rclpy.init(args=args) +def main(): app = QtWidgets.QApplication(sys.argv) window = DroneControlApp() window.show() diff --git a/src/unitdev01/unitdev01/xbee.py b/src/unitdev01/unitdev01/xbee.py new file mode 100644 index 0000000..b550e1e --- /dev/null +++ b/src/unitdev01/unitdev01/xbee.py @@ -0,0 +1,619 @@ +import sys +import time +import math +import serial +import struct +from PyQt5 import QtWidgets +from PyQt5.QtCore import QThread, pyqtSignal +from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox +from pymavlink import mavutil +from pymavlink.dialects.v20 import ardupilotmega as mavlink2 + +class PacketCapture: + def __init__(self): + self.data = bytearray() + + def write(self, b): + self.data.extend(b) + return len(b) + +class MAVLinkWorker(QThread): + state_signal = pyqtSignal(str, str) + battery_signal = pyqtSignal(str, float) + gps_signal = pyqtSignal(str, float, float) + gps_status_signal = pyqtSignal(str, int, int) + local_position_signal = pyqtSignal(str, float, float, float) + imu_signal = pyqtSignal(str, float) + hdg_signal = pyqtSignal(str, float) + groundspeed_signal = pyqtSignal(str, float) + ping_signal = pyqtSignal(str, float) + loss_signal = pyqtSignal(str, float) + frequency_signal = pyqtSignal(str, float) + param_signal = pyqtSignal(str, int) + kbps_signal = pyqtSignal(str, float) + + def __init__(self): + super().__init__() + self.namespaces = ['UAV1', 'UAV2', 'UAV3'] + self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600) + self.connection.wait_heartbeat() + for sysid in self.namespaces: + sysid = int(sysid.replace('UAV', '')) + self.set_sr_params(sysid) + self.running = True + + # 用於計算頻率丟包 + self.message_count = {} + self.frequency = {} + self.start_time = {} + self.seq_numbers = {} + self.packet_loss_count = {} + self.total_packet_count = {} + self.loss_percentage = {} + self.total_bytes_received = {} + self.throughput_KBps = {} + + for namespace in self.namespaces: + self.request_param(namespace, "SR1_EXTRA1") + self.connection.mav.timesync_send( + 0, #tc1 + int(time.time() * 1e9) #ts1 + ) + + def run(self): + while self.running: + try: + msg = self.connection.recv_msg() + current_time = time.time() + if not msg: + continue + sysid = msg.get_srcSystem() + if sysid == 0: + continue + namespace = f"UAV{sysid}" + msg_type = msg.get_type() + if msg_type =="BAD_DATA": + continue + + if sysid not in self.total_bytes_received: + self.total_bytes_received[sysid] = 0 + self.throughput_KBps[sysid] = 0 + + # 計算訊息大小 + msg_bytes = msg.get_msgbuf() # 取得封包的 bytes + if msg_bytes: + self.total_bytes_received[sysid] += len(msg_bytes) + + # Packet loss calculation + if sysid not in self.seq_numbers: + self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking + self.packet_loss_count[sysid] = 0 + self.total_packet_count[sysid] = 1 + else: + current_seq = msg.get_seq() + expected_seq = (self.seq_numbers[sysid] + 1) % 256 + self.total_packet_count[sysid] += 1 + + if current_seq != expected_seq: # Packet(s) lost + lost_packets = (current_seq - expected_seq) % 256 + self.packet_loss_count[sysid] += lost_packets + self.total_packet_count[sysid] += lost_packets + + self.seq_numbers[sysid] = current_seq + self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100 + self.loss_signal.emit(namespace, self.loss_percentage[sysid]) + + # Frequency calculation + if sysid not in self.message_count: + self.message_count[sysid] = 0 + self.start_time[sysid] = current_time + + self.message_count[sysid] += 1 + + # 每隔一段時間更新 + elapsed_time = current_time - self.start_time[sysid] + if elapsed_time >= 1: + # 每秒頻率 + self.frequency[sysid] = self.message_count[sysid] / elapsed_time + self.frequency_signal.emit(namespace, self.frequency[sysid]) + + # 發送 timesync request + self.connection.mav.timesync_send( + 0, #tc1 + int(current_time * 1e9) #ts1 + ) + #吞吐量 + self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time + self.kbps_signal.emit(namespace, self.throughput_KBps[sysid]) + + self.start_time[sysid] = current_time + self.message_count[sysid] = 0 + self.total_bytes_received[sysid] = 0 + + if msg_type == "HEARTBEAT": + mode = mavutil.mode_string_v10(msg) + self.state_signal.emit(namespace, mode) + + elif msg_type == "BATTERY_STATUS": + voltage = msg.voltages[0] / 1000 + self.battery_signal.emit(namespace, voltage) + + elif msg_type == "GLOBAL_POSITION_INT": + latitude = msg.lat / 1e7 + longitude = msg.lon / 1e7 + self.gps_signal.emit(namespace, latitude, longitude) + + elif msg_type == "GPS_RAW_INT": + fix_type = msg.fix_type + satellites_visible = msg.satellites_visible + self.gps_status_signal.emit(namespace, fix_type, satellites_visible) + + elif msg_type == "LOCAL_POSITION_NED": + x = msg.y + y = msg.x + z = -msg.z + self.local_position_signal.emit(namespace, x, y, z) + + elif msg_type == "ATTITUDE": + pitch = math.degrees(msg.pitch) + self.imu_signal.emit(namespace, pitch) + + elif msg_type == "VFR_HUD": + groundspeed = msg.groundspeed + heading = msg.heading + self.hdg_signal.emit(namespace, heading) + self.groundspeed_signal.emit(namespace, groundspeed) + + elif msg_type == "TIMESYNC": + round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6 + if(round_trip_time<1e6): + self.ping_signal.emit(namespace, round_trip_time) + + elif msg_type == 'PARAM_VALUE': + param_name = "SR1_EXTRA1" + if msg.param_id == param_name: + self.param_signal.emit(namespace, msg.param_value) + + except Exception as e: + print(f"Error reading message: {e}") + + def stop(self): + self.running = False + self.connection.close() + + def build_api_tx_frame(self, data: bytes, frame_id=0x01): + frame_type = 0x10 + dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + + def send_command(self, msg): + # Create the packet and send via serial port + PORT = "/dev/ttyUSB0" + BAUD = 57600 + ser = serial.Serial(PORT, BAUD) + capture = PacketCapture() + mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1) + mav.version = 2 + mav.send(msg) + api_frame = self.build_api_tx_frame(capture.data) + ser.write(api_frame) + print("RAW HEX:", capture.data.hex()) + + def set_mode(self, namespace, mode): + sysid = int(namespace.replace('UAV', '')) + if mode == 'STABILIZE': + mode = 0 + elif mode == 'AUTO': + mode = 3 + elif mode == 'GUIDED': + mode = 4 + elif mode == 'LOITER': + mode = 5 + + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_DO_SET_MODE, + confirmation=0, + param1=1, # Custom mode enabled + param2=mode, + param3=0, param4=0, param5=0, param6=0, param7=0 + ) + self.send_command(msg) + + def arm(self, namespace, arm): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, + confirmation=0, + param1=1 if arm else 0, # 1 to arm, 0 to disarm + param2=0, param3=0, param4=0, param5=0, param6=0, param7=0 + ) + self.send_command(msg) + + def takeoff(self, namespace, altitude): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, + confirmation=0, + param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, + param7=altitude + ) + self.send_command(msg) + + def land(self, namespace): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_NAV_LAND, + confirmation=0, + param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, + param7=0 + ) + self.send_command(msg) + + def set_local_position(self, namespace, x, y, z): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.set_position_target_local_ned_encode( + 0, sysid, 1, # time_boot_ms, sysid, compid + mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame + 0b110111111000, # Mask + y, x, -z, # Position + 0, 0, 0, # Velocity + 0, 0, 0, # Acceleration + 0, 0 # Yaw, yaw_rate + ) + self.send_command(msg) + + def reboot_drone(self, namespace): + sysid = int(namespace.replace('UAV', '')) + self.connection.mav.command_long_send( + sysid, + 1, # target_component (一般為1) + mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令 + 0, # confirmation + 1, # 第一個參數 (1 = reboot,0 = 無操作,2 = 關機) + 0, 0, 0, 0, 0, 0 # 其餘參數保留 + ) + + def set_param(self, namespace, param_name, value): + sysid = int(namespace.replace('UAV', '')) + try: + self.connection.mav.param_set_send( + sysid, + 1, + param_name.encode('utf-8'), + float(value), + mavutil.mavlink.MAV_PARAM_TYPE_REAL32 + ) + except Exception as e: + print(f"Failed to set parameter {param_name}: {e}") + + def request_param(self, namespace, param_name): + sysid = int(namespace.replace('UAV', '')) + try: + self.connection.mav.param_request_read_send( + sysid, # 系統 ID + 1, # 組件 ID + param_name.encode('utf-8'), # 參數名稱 + -1 # 參數索引,-1 表示根據名稱請求 + ) + except Exception as e: + print(f"Failed to set parameter {param_name}: {e}") + + ''' + def set_sr_params(self, sysid): + """ 直接設置 MAVLink 訊息頻率 """ + freqs = [0, 2, 4] + params = { + "SR1_ADSB": freqs[0], + "SR1_EXT_STAT": freqs[1], + "SR1_EXTRA1": freqs[2], + "SR1_EXTRA2": freqs[2], + "SR1_EXTRA3": freqs[1], + "SR1_POSITION": freqs[1], + "SR1_RAW_SENS": freqs[1], + "SR1_RC_CHAN": freqs[1] + } + for param, value in params.items(): + self.set_param(f"UAV{sysid}", param, value) + ''' + def set_sr_params(self, sysid): + """ 直接設置 MAVLink 訊息頻率 """ + freqs = [0, 1, 1] + params = { + "SR1_ADSB": freqs[0], + "SR1_EXT_STAT": freqs[0], + "SR1_EXTRA1": freqs[2], + "SR1_EXTRA2": freqs[2], + "SR1_EXTRA3": freqs[0], + "SR1_POSITION": freqs[1], + "SR1_RAW_SENS": freqs[1], + "SR1_RC_CHAN": freqs[0] + } + for param, value in params.items(): + self.set_param(f"UAV{sysid}", param, value) + +class DroneControlApp(QMainWindow): + def __init__(self): + super().__init__() + self.workers = MAVLinkWorker() + self.namespaces = self.workers.namespaces + self.initUI() + + # Connect signals to update the UI + self.workers.state_signal.connect(self.update_state) + self.workers.battery_signal.connect(self.update_battery) + self.workers.gps_signal.connect(self.update_gps) + self.workers.gps_status_signal.connect(self.update_gps_status) + self.workers.local_position_signal.connect(self.update_local_position) + self.workers.imu_signal.connect(self.update_imu) + self.workers.hdg_signal.connect(self.update_hdg) + self.workers.groundspeed_signal.connect(self.update_groundspeed) + self.workers.ping_signal.connect(self.update_ping) + self.workers.loss_signal.connect(self.update_loss) + self.workers.frequency_signal.connect(self.update_frequency) + self.workers.param_signal.connect(self.update_param) + self.workers.kbps_signal.connect(self.update_kbps) + self.workers.start() + + def initUI(self): + self.setWindowTitle("多無人機控制介面") + self.setGeometry(100, 100, 800, 600) + + # 主佈局 + main_layout = QVBoxLayout() + + # 無人機選擇區域 + uav_layout = QHBoxLayout() + self.uav_checkboxes = {} + for namespace in self.namespaces: + checkbox = QCheckBox(namespace) + checkbox.setChecked(True) # 預設勾選 + self.uav_checkboxes[namespace] = checkbox + uav_layout.addWidget(checkbox) + main_layout.addLayout(uav_layout) + + # 顯示所有無人機資訊,從左到右顯示 + uav_layout = QHBoxLayout() + + # 逐個顯示 UAV 的資訊 + self.uav_labels = {} + for namespace in self.namespaces: + uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列 + + # 狀態顯示 + self.uav_labels[namespace] = { + "status": QLabel("狀態:等待數據..."), + "battery": QLabel("電壓:等待數據..."), + "local_position": QLabel("Local Position:等待數據..."), + "gps": QLabel("GPS位置:等待數據...\n\n"), + "fix": QLabel("Fix Type:等待數據..."), + "satellites": QLabel("衛星數量:等待數據..."), + "groundspeed": QLabel("地面速度:等待數據..."), + "pitch": QLabel("Pitch:等待數據..."), + "heading": QLabel("Heading:等待數據..."), + "ping": QLabel("Ping:等待數據..."), + "loss": QLabel("丟包:等待數據..."), + "frequency": QLabel("頻率:等待數據..."), + "kbps": QLabel("吞吐量:等待數據..."), + "param": QLabel("SR1_EXTRA1:等待數據...") + } + + # 把每個資訊添加到該 UAV 的垂直佈局中 + for label in self.uav_labels[namespace].values(): + uav_info_layout.addWidget(label) + + # 將該 UAV 的資訊佈局加到主佈局中(從左到右排列) + uav_layout.addLayout(uav_info_layout) + + main_layout.addLayout(uav_layout) + + # SR1_EXTRA1參數設置 + param_layout = QHBoxLayout() + self.paramInput = QLineEdit() + self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值") + self.setParamButton = QPushButton("設置SR1_EXTRA1") + self.setParamButton.clicked.connect(self.set_SR1_EXTRA1) + param_layout.addWidget(self.paramInput) + param_layout.addWidget(self.setParamButton) + main_layout.addLayout(param_layout) + + # 模式切換區域 + mode_layout = QHBoxLayout() + self.modeComboBox = QComboBox() + self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"]) + self.modeButton = QPushButton("切換模式") + self.modeButton.clicked.connect(self.change_mode) + mode_layout.addWidget(QLabel("選擇模式:")) + mode_layout.addWidget(self.modeComboBox) + mode_layout.addWidget(self.modeButton) + main_layout.addLayout(mode_layout) + + # 解鎖按鈕 + fly_layout = QHBoxLayout() + self.armButton = QPushButton("解鎖") + self.armButton.clicked.connect(self.arm_drone) + + # 起飛按鈕 + self.takeoffButton = QPushButton("起飛") + self.takeoffButton.clicked.connect(self.takeoff_drone) + + # 降落按鈕 + self.landButton = QPushButton("降落") + self.landButton.clicked.connect(self.land_drone) + fly_layout.addWidget(self.armButton) + fly_layout.addWidget(self.takeoffButton) + fly_layout.addWidget(self.landButton) + main_layout.addLayout(fly_layout) + + # XYZ 設置欄位 + xyz_layout = QHBoxLayout() + self.positionX = QLineEdit() + self.positionX.setPlaceholderText("X") + self.positionY = QLineEdit() + self.positionY.setPlaceholderText("Y") + self.positionZ = QLineEdit() + self.positionZ.setPlaceholderText("Z") + self.setPositionButton = QPushButton("設置位置") + self.setPositionButton.clicked.connect(self.set_local_position) + xyz_layout.addWidget(QLabel("輸入位置:")) + xyz_layout.addWidget(self.positionX) + xyz_layout.addWidget(self.positionY) + xyz_layout.addWidget(self.positionZ) + xyz_layout.addWidget(self.setPositionButton) + main_layout.addLayout(xyz_layout) + + # 設置 XYZ 的按鈕 + + #設置重啟按鈕 + self.rebootButton = QPushButton("重啟") + self.rebootButton.clicked.connect(self.reboot_drone) + main_layout.addWidget(self.rebootButton) + + # 設置主佈局 + central_widget = QWidget(self) + central_widget.setLayout(main_layout) + self.setCentralWidget(central_widget) + self.show() + + def closeEvent(self, event): + try: + self.workers.stop() + finally: + event.accept() + + def update_state(self, namespace, mode): + self.uav_labels[namespace]["status"].setText(f"狀態:{mode}") + + def update_battery(self, namespace, voltage): + self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V") + + def update_gps(self, namespace, latitude, longitude): + self.uav_labels[namespace]["gps"].setText(f"GPS位置: \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°") + + def update_gps_status(self, namespace, fix_type, satellites_visible): + fix_type_str = { + 0: "No Fix", + 1: "No Fix", + 2: "2D Fix", + 3: "3D Fix", + 4: "RTK Float", + 5: "RTK Fix" + }.get(fix_type, "Unknown") + self.uav_labels[namespace]["fix"].setText(f"Fix Type:{fix_type_str}") + self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}") + + def update_local_position(self, namespace, x, y, z): + self.uav_labels[namespace]["local_position"].setText(f"Local:({x:.2f}, {y:.2f}, {z:.2f})") + + def update_imu(self, namespace, pitch): + self.uav_labels[namespace]["pitch"].setText(f"Pitch:{pitch:.2f}°") + + def update_hdg(self, namespace, heading): + self.uav_labels[namespace]["heading"].setText(f"Heading:{heading:.2f}°") + + def update_groundspeed(self, namespace, groundspeed): + self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s") + + def update_ping(self, namespace, ping): + self.uav_labels[namespace]["ping"].setText(f"Ping:{ping:.2f} ms") + + def update_loss(self, namespace, loss): + self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%") + + def update_frequency(self, namespace, frequency): + self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz") + + def update_kbps(self, namespace, kbps): + self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s") + + def update_param(self, namespace, value): + self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}") + + def change_mode(self): + try: + selected_mode = self.modeComboBox.currentText() + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_mode(namespace, selected_mode) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") + + def arm_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.arm(namespace, True) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}") + + def takeoff_drone(self): + try: + z_text = self.positionZ.text().strip() + z = float(z_text) if z_text else 5.0 + h = 2 + for i, namespace in enumerate(self.namespaces): + if self.uav_checkboxes[namespace].isChecked(): + self.workers.takeoff(namespace, z + h * i) + + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def land_drone(self): + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.land(namespace) + + def set_local_position(self): + try: + x = float(self.positionX.text().strip()) + y = float(self.positionY.text().strip()) + z = float(self.positionZ.text().strip()) + h = 2 + for i, namespace in enumerate(self.namespaces): + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_local_position(namespace, x, y, z + h * i) + + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def reboot_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.reboot_drone(namespace) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}") + + def set_SR1_EXTRA1(self): + param_value = self.paramInput.text().strip() + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_param(namespace, "SR1_EXTRA1", param_value) + self.workers.request_param(namespace, "SR1_EXTRA1") + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + +def main(): + app = QtWidgets.QApplication(sys.argv) + window = DroneControlApp() + window.show() + sys.exit(app.exec_()) + +if __name__ == '__main__': + main() \ No newline at end of file