Merge branch 'ken' into chiyu

chiyu
Chiyu Chen 12 months ago
commit 4d82b92a11

@ -22,6 +22,27 @@ from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device, mavlink_systems
from mavlinkPublish import mavlink_publisher
@ -131,8 +152,20 @@ class mavlink_bridge(Node, mavlink_publisher):
# print("get mode :", mavutil.mode_string_v10(msg)) # debug
# print("record mode :", this_component.emitParams['flightMode_mode']) # debug
elif msg.get_msgId() == 147: # BATTERY_STATUS 處理
this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()]
elif msg_id == 30: # ATTITUDE 處理
this_component.emitParams['attitude'] = msg
elif msg_id == 32: # LOCAL_POSITION_NED 處理
this_component.emitParams['local_position'] = msg
elif msg_id == 33: # GLOBAL_POSITION_INT 處理
this_component.emitParams['global_position'] = msg
elif msg_id == 74: # VFR_HUD 處理
this_component.emitParams['vfr_hud'] = msg
elif msg_id == 147: # BATTERY_STATUS 處理
this_component.emitParams['battery'] = msg
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↑↑↑↑↑↑↑↑↑↑↑↑
@ -273,6 +306,7 @@ class mavlink_object():
self.running = False
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
@ -282,6 +316,7 @@ class mavlink_object():
try:
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)

@ -10,8 +10,13 @@ publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
from theLogger import setup_logger
import math
logger = setup_logger("mavlinkPublish.py")
@ -46,7 +51,158 @@ class mavlink_publisher():
pass
# ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同 ros2 topic 訊息 請放在這裡 ↓↓↓↓↓↓↓↓↓↓↓↓
def euler_to_quaternion(cls,roll, pitch, yaw):
qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2)
qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2)
qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
return [qx, qy, qz, qw]
def create_attitude(self, sysid, component_obj):
target_topic = 'attitude'
try:
_ = component_obj.emitParams['attitude']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(sensor_msgs.msg.Imu, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_attitude]
def packEmit_attitude(self, emitParams, publisher):
mav_msg = emitParams['attitude']
msg = sensor_msgs.msg.Imu()
x, y, z, w = self.euler_to_quaternion(mav_msg.roll, mav_msg.pitch, mav_msg.yaw)
msg.orientation.x = x
msg.orientation.y = y
msg.orientation.z = z
msg.orientation.w = w
msg.angular_velocity.x = mav_msg.rollspeed
msg.angular_velocity.y = mav_msg.pitchspeed
msg.angular_velocity.z = mav_msg.yawspeed
publisher.publish(msg)
pass
def create_local_position_pose(self, sysid, component_obj):
target_topic = 'local_position/pose'
try:
_ = component_obj.emitParams['local_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(geometry_msgs.msg.Point, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_pose]
def packEmit_local_pose(cls, emitParams, publisher):
mav_msg = emitParams['local_position']
msg = geometry_msgs.msg.Point()
msg.x = mav_msg.x
msg.y = mav_msg.y
msg.z = mav_msg.z
publisher.publish(msg)
pass
def create_local_position_velocity(self, sysid, component_obj):
target_topic = 'local_position/velocity'
try:
_ = component_obj.emitParams['local_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_vel]
def packEmit_local_vel(cls, emitParams, publisher):
mav_msg = emitParams['local_position']
msg = geometry_msgs.msg.Vector3()
msg.x = mav_msg.vx
msg.y = mav_msg.vy
msg.z = mav_msg.vz
publisher.publish(msg)
pass
def create_global_global(self, sysid, component_obj):
target_topic = 'global_position/global'
try:
_ = component_obj.emitParams['global_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(sensor_msgs.msg.NavSatFix, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_global]
def packEmit_global_global(cls, emitParams, publisher):
mav_msg = emitParams['global_position']
msg = sensor_msgs.msg.NavSatFix()
msg.latitude = mav_msg.lat/1e7
msg.longitude = mav_msg.lon/1e7
msg.altitude = mav_msg.alt/1e3
publisher.publish(msg)
pass
def create_global_rel(self, sysid, component_obj):
target_topic = 'global_position/rel_alt'
try:
_ = component_obj.emitParams['global_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_rel]
def packEmit_global_rel(cls, emitParams, publisher):
mav_msg = emitParams['global_position']
msg = std_msgs.msg.Float64()
msg.data = float(mav_msg.relative_alt/1e3)
publisher.publish(msg)
pass
def create_vfr_hud(self, sysid, component_obj):
target_topic = 'vfr_hud'
try:
_ = component_obj.emitParams['vfr_hud']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(mavros_msgs.msg.VfrHud, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_vfr_hud]
def packEmit_vfr_hud(cls, emitParams, publisher):
mav_msg = emitParams['vfr_hud']
msg = mavros_msgs.msg.VfrHud()
msg.airspeed = mav_msg.airspeed
msg.groundspeed = mav_msg.groundspeed
msg.heading = mav_msg.heading
msg.throttle = float(mav_msg.throttle)
msg.altitude = mav_msg.alt
msg.climb = mav_msg.climb
publisher.publish(msg)
pass
def create_battery(self, sysid, component_obj):
target_topic = 'battery'
try:
_ = component_obj.emitParams['battery']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(sensor_msgs.msg.BatteryState, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_battery]
def packEmit_battery(cls, emitParams, publisher):
mav_msg = emitParams['battery']
msg = sensor_msgs.msg.BatteryState()
msg.voltage = mav_msg.voltages[0]/1e3
publisher.publish(msg)
pass
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同 ros2 topic 訊息 請放在這裡 ↑↑↑↑↑↑↑↑↑↑↑↑

@ -0,0 +1,444 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QSplitter, QScrollArea,
QSizePolicy, QApplication)
from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal, QUrl
from PyQt6.QtGui import QColor
from PyQt6.QtWebEngineWidgets import QWebEngineView
import math
import re
import os
import sys
from threading import Lock
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
from std_msgs.msg import String, Float64
from mavros_msgs.msg import VfrHud
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class DroneMonitor(Node):
def __init__(self):
super().__init__('drone_monitor')
self.signals = DroneSignals()
self.drone_topics = {}
self.lock = Lock()
# 主题检测定时器
self.create_timer(1.0, self.scan_topics)
def scan_topics(self):
topics = self.get_topic_names_and_types()
drone_pattern = re.compile(r'/MavLinkBus/(s\d+)/(\w+)')
found_drones = set()
for topic_name, _ in topics:
if match := drone_pattern.match(topic_name):
drone_id, topic_type = match.groups()
found_drones.add(drone_id)
with self.lock:
self.drone_topics.setdefault(drone_id, set()).add(topic_type)
for drone_id in found_drones:
if not hasattr(self, f'drone_{drone_id}_subs'):
self.setup_drone_subs(drone_id)
def setup_drone_subs(self, drone_id):
base_topic = f'/MavLinkBus/{drone_id}'
subs = {
'attitude': self.create_subscription(
Imu,
f'{base_topic}/attitude',
lambda msg, did=drone_id: self.attitude_callback(did, msg),
10
),
'battery': self.create_subscription(
BatteryState,
f'{base_topic}/battery',
lambda msg, did=drone_id: self.battery_callback(did, msg),
10
),
'flightMode': self.create_subscription(
String,
f'{base_topic}/flightMode',
lambda msg, did=drone_id: self.mode_callback(did, msg),
10
),
'global': self.create_subscription(
NavSatFix,
f'{base_topic}/global_position/global',
lambda msg, did=drone_id: self.gps_callback(did, msg),
10
),
'rel_alt': self.create_subscription(
Float64,
f'{base_topic}/global_position/rel_alt',
lambda msg, did=drone_id: self.altitude_callback(did, msg),
10
),
'local_pose': self.create_subscription(
Point,
f'{base_topic}/local_position/pose',
lambda msg, did=drone_id: self.local_pose_callback(did, msg),
10
),
'local_vel': self.create_subscription(
Vector3,
f'{base_topic}/local_position/velocity',
lambda msg, did=drone_id: self.local_vel_callback(did, msg),
10
),
'vfr_hud': self.create_subscription(
VfrHud,
f'{base_topic}/vfr_hud',
lambda msg, did=drone_id: self.hud_callback(did, msg),
10
)
}
setattr(self, f'drone_{drone_id}_subs', subs)
self.signals.update_signal.emit('new_drone', drone_id, None)
def quaternion_to_euler(self, q):
sinr_cosp = 2 * (q.w * q.x + q.y * q.z)
cosr_cosp = 1 - 2 * (q.x**2 + q.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (q.w * q.y - q.z * q.x)
pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp)
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y**2 + q.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# 回调函数
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.signals.update_signal.emit('attitude', drone_id, {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
})
def battery_callback(self, drone_id, msg):
self.signals.update_signal.emit('battery', drone_id, {
'voltage': msg.voltage
})
def mode_callback(self, drone_id, msg):
self.signals.update_signal.emit('mode', drone_id, msg.data)
def gps_callback(self, drone_id, msg):
self.signals.update_signal.emit('gps', drone_id, {
'lat': msg.latitude,
'lon': msg.longitude
,'alt': msg.altitude
})
def local_vel_callback(self, drone_id, msg):
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.x,
'vy': msg.y,
'vz': msg.z
})
def altitude_callback(self, drone_id, msg):
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': msg.data
})
def local_pose_callback(self, drone_id, msg):
self.signals.update_signal.emit('local_pose', drone_id, {
'x': msg.x,
'y': msg.y,
'z': msg.z
})
def hud_callback(self, drone_id, msg):
self.signals.update_signal.emit('hud', drone_id, {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.altitude,
'climb': msg.climb
})
class ControlStationUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('GCS')
self.resize(1400, 900)
self.drones = {}
# 初始化ROS2
rclpy.init()
self.monitor = DroneMonitor()
self.monitor.signals.update_signal.connect(self.update_ui)
# ROS执行器配置
self.executor = rclpy.executors.SingleThreadedExecutor()
self.executor.add_node(self.monitor)
# 初始化UI
self.drone_positions = {}
self.map_loaded = False
self.init_ui()
# 定时处理ROS事件
self.timer = QTimer()
self.timer.timeout.connect(self.spin_ros)
self.timer.start(50)
def init_ui(self):
main_splitter = QSplitter(Qt.Orientation.Horizontal)
# 左侧信息面板
left_panel = QWidget()
left_layout = QVBoxLayout(left_panel)
left_layout.setContentsMargins(5, 5, 5, 5)
# 信息滚动区域
scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True)
self.info_container = QWidget()
self.info_layout = QVBoxLayout(self.info_container)
self.info_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
scroll_area.setWidget(self.info_container)
left_layout.addWidget(scroll_area)
# 右侧地图区域
self.map_view = QWebEngineView()
inline_html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<link rel="stylesheet" href="https://unpkg.com/leaflet/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet/dist/leaflet.js"></script>
<script src="https://unpkg.com/leaflet-rotatedmarker/leaflet.rotatedMarker.js"></script>
<style>
html, body, #map { height: 100%; margin: 0; }
</style>
</head>
<body>
<div id="map"></div>
<script>
var map = L.map('map').setView([0, 0], 2);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
maxZoom: 19
}).addTo(map);
var arrowIcon = L.icon({
iconUrl: 'https://cdn-icons-png.flaticon.com/512/399/399308.png', // 朝上的箭頭
iconSize: [40, 40],
iconAnchor: [20, 20]
});
var markers = {};
function updateDrone(lat, lon, id, heading) {
if (markers[id]) {
markers[id].setLatLng([lat, lon]);
markers[id].setRotationAngle(heading);
} else {
markers[id] = L.marker([lat, lon], {
icon: arrowIcon,
rotationAngle: heading,
rotationOrigin: 'center'
}).addTo(map).bindPopup(id);
}
map.setView([lat, lon], 16);
}
</script>
</body>
</html>
'''
self.map_view.setHtml(inline_html)
self.map_view.loadFinished.connect(self.on_map_loaded)
main_splitter.addWidget(left_panel)
main_splitter.addWidget(self.map_view)
main_splitter.setSizes([400, 1000])
self.setCentralWidget(main_splitter)
def on_map_loaded(self, ok: bool):
if ok:
self.map_loaded = True
else:
print("⚠️ 地图页加载失败")
def create_drone_panel(self, drone_id):
panel = QWidget()
panel.setObjectName(f"panel_{drone_id}")
panel.setStyleSheet("""
QWidget#panel_%s {
background-color: #2A2A2A;
border-radius: 8px;
margin: 6px;
padding: 10px;
border: 1px solid #444;
}
QLabel {
color: #DDD;
font-size: 12px;
padding: 2px;
}
""" % drone_id)
layout = QVBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
layout.setSpacing(4)
# 顶部标题栏
header = QWidget()
header_layout = QHBoxLayout(header)
header_layout.setContentsMargins(0, 0, 0, 0)
# ID显示
id_label = QLabel(drone_id)
id_label.setStyleSheet("""
font-weight: bold;
font-size: 14px;
color: #7FFFD4;
min-width: 80px;
""")
# 状态指示灯
status_light = QLabel("")
status_light.setStyleSheet("color: #00FF00; font-size: 16px;")
header_layout.addWidget(id_label)
header_layout.addWidget(status_light)
header_layout.addStretch()
layout.addWidget(header)
# 数据字段(带标签)
self.create_data_row(layout, "模式:", f"{drone_id}_mode", "--")
self.create_data_row(layout, "電壓:", f"{drone_id}_battery", "--")
self.create_data_row(layout, "GPS:", f"{drone_id}_gps", "--")
self.create_data_row(layout, "高度:", f"{drone_id}_altitude", "--")
self.create_data_row(layout, "Local:", f"{drone_id}_local", "--")
self.create_data_row(layout, "HUD:", f"{drone_id}_hud", "--")
return panel
def create_data_row(self, layout, title, object_name, default):
row = QWidget()
hbox = QHBoxLayout(row)
hbox.setContentsMargins(0, 0, 0, 0)
# 标题标签
title_label = QLabel(title)
title_label.setStyleSheet("color: #888; min-width: 80px;")
title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred)
# 数据标签
value_label = QLabel(default)
value_label.setObjectName(object_name)
value_label.setWordWrap(True)
value_label.setStyleSheet("margin-left: 10px;")
hbox.addWidget(title_label)
hbox.addWidget(value_label)
layout.addWidget(row)
def update_ui(self, msg_type, drone_id, data):
if msg_type == 'new_drone':
self.add_drone(drone_id)
return
if not (panel := self.drones.get(drone_id)):
return
if msg_type == 'mode':
self.update_field(panel, drone_id, 'mode', f"{data}",
'#FF5555' if '返航' in data else '#55FF55')
elif msg_type == 'battery':
voltage = data.get('voltage', 0)
self.update_field(panel, drone_id, 'battery',
f"{voltage:.1f} V",
'#FF6464' if voltage < 12 else '#FFFFFF')
elif msg_type == 'gps':
lat, lon = data['lat'], data['lon']
self.drone_positions[drone_id] = (lat, lon)
text = (f"緯度: {lat:.6f}°\n"
f"經度: {lon:.6f}°")
self.update_field(panel, drone_id, 'gps', text)
elif msg_type == 'altitude':
text = (f"{data['altitude']:.1f} m")
self.update_field(panel, drone_id, 'altitude', text)
elif msg_type == 'local_pose':
text = (f"({data['x']:.1f}, {data['y']:.1f}, {data['z']:.1f})")
self.update_field(panel, drone_id, 'local', text)
elif msg_type == 'hud':
heading = data['heading']
text = (f"地速: {data['groundspeed']:.1f} m/s\n"
f"航向: {heading:.1f}°")
self.update_field(panel, drone_id, 'hud', text)
if self.map_loaded and drone_id in self.drone_positions:
lat, lon = self.drone_positions[drone_id]
js = f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});"
self.map_view.page().runJavaScript(js)
'''
elif msg_type == 'velocity':
text = (f"VX: {data['vx']:.1f} m/s\n"
f"VY: {data['vy']:.1f} m/s\n"
f"VZ: {data['vz']:.1f} m/s")
self.update_field(panel, drone_id, 'velocity', text)
elif msg_type == 'attitude':
text = (f"Roll: {data['roll']:.1f}°\n"
f"Pitch: {data['pitch']:.1f}°\n"
f"Yaw: {data['yaw']:.1f}°")
self.update_field(panel, drone_id, 'attitude', text)
'''
def update_field(self, panel, drone_id, field, text, color=None):
if label := panel.findChild(QLabel, f"{drone_id}_{field}"):
label.setText(text)
if color:
label.setStyleSheet(f"color: {color};")
def add_drone(self, drone_id):
if drone_id not in self.drones:
panel = self.create_drone_panel(drone_id)
self.info_layout.addWidget(panel)
self.drones[drone_id] = panel
def spin_ros(self):
try:
self.executor.spin_once(timeout_sec=0)
except Exception as e:
print(f"ROS spin error: {e}")
def closeEvent(self, event):
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
station = ControlStationUI()
station.show()
app.exec(app.exec())

@ -144,7 +144,6 @@ class DroneControlApp(QMainWindow):
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.start()
def initUI(self):

@ -1,7 +1,6 @@
import sys
import time
import math
import rclpy
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
@ -20,10 +19,11 @@ class MAVLinkWorker(QThread):
loss_signal = pyqtSignal(str, float)
frequency_signal = pyqtSignal(str, float)
param_signal = pyqtSignal(str, int)
kbps_signal = pyqtSignal(str, float)
def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'):
super().__init__()
self.namespaces = ['UAV1', 'UAV4', 'UAV5']
self.namespaces = ['UAV1', 'UAV2', 'UAV3']
self.connection = mavutil.mavlink_connection(usb, baud=57600)
self.connection.wait_heartbeat()
for sysid in self.namespaces:
@ -31,38 +31,48 @@ class MAVLinkWorker(QThread):
self.set_sr_params(sysid)
self.running = True
# 設置需要監控的訊息類型
self.messages_to_monitor = ["HEARTBEAT", "BATTERY_STATUS", "GLOBAL_POSITION_INT", "GPS_RAW_INT", "LOCAL_POSITION_NED", "ATTITUDE", "VFR_HUD", "TIMESYNC"]
# 用於計算頻率丟包
self.message_count = {}
self.frequency = {}
self.start_time = {}
# 用於計算丟包
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.total_bytes_received = {}
self.throughput_KBps = {}
for namespace in self.namespaces:
self.request_param(namespace, "SR1_EXTRA1")
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
)
def run(self):
while self.running:
try:
current_time = time.time()
msg = self.connection.recv_msg()
current_time = time.time()
if not msg:
continue
sysid = msg.get_srcSystem()
if sysid == 0:
continue
namespace = f"UAV{sysid}"
print(sysid)
msg_type = msg.get_type()
if msg_type =="BAD_DATA":
continue
print(msg_type)
if sysid not in self.total_bytes_received:
self.total_bytes_received[sysid] = 0
self.throughput_KBps[sysid] = 0
# 計算訊息大小
msg_bytes = msg.get_msgbuf() # 取得封包的 bytes
if msg_bytes:
self.total_bytes_received[sysid] += len(msg_bytes)
# Packet loss calculation
if sysid not in self.seq_numbers:
@ -71,7 +81,7 @@ class MAVLinkWorker(QThread):
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256 # MAVLink sequence numbers are modulo 256
expected_seq = (self.seq_numbers[sysid] + 1) % 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq: # Packet(s) lost
@ -91,27 +101,32 @@ class MAVLinkWorker(QThread):
self.message_count[sysid] += 1
# 每隔一段時間更新
if current_time - self.start_time[sysid] >= 1:
elapsed_time = current_time - self.start_time[sysid]
if elapsed_time >= 1:
# 每秒頻率
self.frequency[sysid] = self.message_count[sysid] / (current_time - self.start_time[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.frequency[sysid] = self.message_count[sysid] / elapsed_time
self.frequency_signal.emit(namespace, self.frequency[sysid])
# 發送 timesync request
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
int(current_time * 1e9) #ts1
)
#self.send_heartbeat
#吞吐量
self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time
self.kbps_signal.emit(namespace, self.throughput_KBps[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.total_bytes_received[sysid] = 0
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
percentage = msg.battery_remaining/100
self.battery_signal.emit(namespace, percentage)
voltage = msg.voltages[0] / 1000
self.battery_signal.emit(namespace, voltage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
@ -140,11 +155,11 @@ class MAVLinkWorker(QThread):
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
round_trip_time = (int(time.time() * 1e9) - msg.ts1) / 1e6
round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6
if(round_trip_time<1e6):
self.ping_signal.emit(namespace, round_trip_time)
elif msg.get_type() == 'PARAM_VALUE':
elif msg_type == 'PARAM_VALUE':
param_name = "SR1_EXTRA1"
if msg.param_id == param_name:
self.param_signal.emit(namespace, msg.param_value)
@ -253,13 +268,7 @@ class MAVLinkWorker(QThread):
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def send_heartbeat(self):
self.connection.mav.heartbeat_send(
mavutil.mavlink.MAV_TYPE_GCS,
mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0, 0, 0
)
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 2, 4]
@ -275,6 +284,22 @@ class MAVLinkWorker(QThread):
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 1, 1]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[0],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[0],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[0]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
class DroneControlApp(QMainWindow):
def __init__(self):
@ -296,6 +321,7 @@ class DroneControlApp(QMainWindow):
self.workers.loss_signal.connect(self.update_loss)
self.workers.frequency_signal.connect(self.update_frequency)
self.workers.param_signal.connect(self.update_param)
self.workers.kbps_signal.connect(self.update_kbps)
self.workers.start()
def initUI(self):
@ -326,17 +352,18 @@ class DroneControlApp(QMainWindow):
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel("狀態:等待數據..."),
"battery": QLabel("電量:等待數據..."),
"battery": QLabel("電壓:等待數據..."),
"local_position": QLabel("Local Position等待數據..."),
"gps": QLabel("GPS位置等待數據...\n\n"),
"fix": QLabel("Fix Type等待數據..."),
"satellites": QLabel("衛星數量:等待數據..."),
"local_position": QLabel("Local Position等待數據..."),
"groundspeed": QLabel("地面速度:等待數據..."),
"pitch": QLabel("Pitch等待數據..."),
"heading": QLabel("Heading等待數據..."),
"ping": QLabel("Ping等待數據..."),
"loss": QLabel("丟包:等待數據..."),
"frequency": QLabel("頻率:等待數據..."),
"kbps": QLabel("吞吐量:等待數據..."),
"param": QLabel("SR1_EXTRA1等待數據...")
}
@ -370,25 +397,6 @@ class DroneControlApp(QMainWindow):
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
main_layout.addWidget(self.setPositionButton)
# 解鎖按鈕
fly_layout = QHBoxLayout()
self.armButton = QPushButton("解鎖")
@ -406,6 +414,25 @@ class DroneControlApp(QMainWindow):
fly_layout.addWidget(self.landButton)
main_layout.addLayout(fly_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
xyz_layout.addWidget(self.setPositionButton)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
#設置重啟按鈕
self.rebootButton = QPushButton("重啟")
self.rebootButton.clicked.connect(self.reboot_drone)
@ -426,8 +453,8 @@ class DroneControlApp(QMainWindow):
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"狀態:{mode}")
def update_battery(self, namespace, percentage):
self.uav_labels[namespace]["battery"].setText(f"量:{percentage * 100:.2f}%")
def update_battery(self, namespace, voltage):
self.uav_labels[namespace]["battery"].setText(f"壓:{voltage:.2f} V")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"GPS位置 \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°")
@ -465,6 +492,9 @@ class DroneControlApp(QMainWindow):
def update_frequency(self, namespace, frequency):
self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz")
def update_kbps(self, namespace, kbps):
self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s")
def update_param(self, namespace, value):
self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}")
@ -488,10 +518,12 @@ class DroneControlApp(QMainWindow):
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
altitude = float(z_text) if z_text else 5.0
for namespace in self.namespaces:
z = float(z_text) if z_text else 5.0
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, altitude)
self.workers.takeoff(namespace, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
@ -505,9 +537,11 @@ class DroneControlApp(QMainWindow):
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
for namespace in self.namespaces:
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z)
self.workers.set_local_position(namespace, x, y, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
@ -529,8 +563,7 @@ class DroneControlApp(QMainWindow):
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main(args=None):
rclpy.init(args=args)
def main():
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()

@ -0,0 +1,619 @@
import sys
import time
import math
import serial
import struct
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
from pymavlink import mavutil
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
class MAVLinkWorker(QThread):
state_signal = pyqtSignal(str, str)
battery_signal = pyqtSignal(str, float)
gps_signal = pyqtSignal(str, float, float)
gps_status_signal = pyqtSignal(str, int, int)
local_position_signal = pyqtSignal(str, float, float, float)
imu_signal = pyqtSignal(str, float)
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float)
ping_signal = pyqtSignal(str, float)
loss_signal = pyqtSignal(str, float)
frequency_signal = pyqtSignal(str, float)
param_signal = pyqtSignal(str, int)
kbps_signal = pyqtSignal(str, float)
def __init__(self):
super().__init__()
self.namespaces = ['UAV1', 'UAV2', 'UAV3']
self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600)
self.connection.wait_heartbeat()
for sysid in self.namespaces:
sysid = int(sysid.replace('UAV', ''))
self.set_sr_params(sysid)
self.running = True
# 用於計算頻率丟包
self.message_count = {}
self.frequency = {}
self.start_time = {}
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.total_bytes_received = {}
self.throughput_KBps = {}
for namespace in self.namespaces:
self.request_param(namespace, "SR1_EXTRA1")
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
)
def run(self):
while self.running:
try:
msg = self.connection.recv_msg()
current_time = time.time()
if not msg:
continue
sysid = msg.get_srcSystem()
if sysid == 0:
continue
namespace = f"UAV{sysid}"
msg_type = msg.get_type()
if msg_type =="BAD_DATA":
continue
if sysid not in self.total_bytes_received:
self.total_bytes_received[sysid] = 0
self.throughput_KBps[sysid] = 0
# 計算訊息大小
msg_bytes = msg.get_msgbuf() # 取得封包的 bytes
if msg_bytes:
self.total_bytes_received[sysid] += len(msg_bytes)
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq: # Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.total_packet_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
self.loss_signal.emit(namespace, self.loss_percentage[sysid])
# Frequency calculation
if sysid not in self.message_count:
self.message_count[sysid] = 0
self.start_time[sysid] = current_time
self.message_count[sysid] += 1
# 每隔一段時間更新
elapsed_time = current_time - self.start_time[sysid]
if elapsed_time >= 1:
# 每秒頻率
self.frequency[sysid] = self.message_count[sysid] / elapsed_time
self.frequency_signal.emit(namespace, self.frequency[sysid])
# 發送 timesync request
self.connection.mav.timesync_send(
0, #tc1
int(current_time * 1e9) #ts1
)
#吞吐量
self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time
self.kbps_signal.emit(namespace, self.throughput_KBps[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.total_bytes_received[sysid] = 0
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.battery_signal.emit(namespace, voltage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.gps_signal.emit(namespace, latitude, longitude)
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
satellites_visible = msg.satellites_visible
self.gps_status_signal.emit(namespace, fix_type, satellites_visible)
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6
if(round_trip_time<1e6):
self.ping_signal.emit(namespace, round_trip_time)
elif msg_type == 'PARAM_VALUE':
param_name = "SR1_EXTRA1"
if msg.param_id == param_name:
self.param_signal.emit(namespace, msg.param_value)
except Exception as e:
print(f"Error reading message: {e}")
def stop(self):
self.running = False
self.connection.close()
def build_api_tx_frame(self, data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def send_command(self, msg):
# Create the packet and send via serial port
PORT = "/dev/ttyUSB0"
BAUD = 57600
ser = serial.Serial(PORT, BAUD)
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
mav.send(msg)
api_frame = self.build_api_tx_frame(capture.data)
ser.write(api_frame)
print("RAW HEX:", capture.data.hex())
def set_mode(self, namespace, mode):
sysid = int(namespace.replace('UAV', ''))
if mode == 'STABILIZE':
mode = 0
elif mode == 'AUTO':
mode = 3
elif mode == 'GUIDED':
mode = 4
elif mode == 'LOITER':
mode = 5
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1, # Custom mode enabled
param2=mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
self.send_command(msg)
def arm(self, namespace, arm):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
confirmation=0,
param1=1 if arm else 0, # 1 to arm, 0 to disarm
param2=0, param3=0, param4=0, param5=0, param6=0, param7=0
)
self.send_command(msg)
def takeoff(self, namespace, altitude):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
confirmation=0,
param1=0, param2=0, param3=0, param4=0, param5=0, param6=0,
param7=altitude
)
self.send_command(msg)
def land(self, namespace):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_NAV_LAND,
confirmation=0,
param1=0, param2=0, param3=0, param4=0, param5=0, param6=0,
param7=0
)
self.send_command(msg)
def set_local_position(self, namespace, x, y, z):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.set_position_target_local_ned_encode(
0, sysid, 1, # time_boot_ms, sysid, compid
mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame
0b110111111000, # Mask
y, x, -z, # Position
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
self.send_command(msg)
def reboot_drone(self, namespace):
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # target_component (一般為1)
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令
0, # confirmation
1, # 第一個參數 (1 = reboot0 = 無操作2 = 關機)
0, 0, 0, 0, 0, 0 # 其餘參數保留
)
def set_param(self, namespace, param_name, value):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def request_param(self, namespace, param_name):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_request_read_send(
sysid, # 系統 ID
1, # 組件 ID
param_name.encode('utf-8'), # 參數名稱
-1 # 參數索引,-1 表示根據名稱請求
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 2, 4]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[1],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[1],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[1]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 1, 1]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[0],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[0],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[0]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.workers = MAVLinkWorker()
self.namespaces = self.workers.namespaces
self.initUI()
# Connect signals to update the UI
self.workers.state_signal.connect(self.update_state)
self.workers.battery_signal.connect(self.update_battery)
self.workers.gps_signal.connect(self.update_gps)
self.workers.gps_status_signal.connect(self.update_gps_status)
self.workers.local_position_signal.connect(self.update_local_position)
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.ping_signal.connect(self.update_ping)
self.workers.loss_signal.connect(self.update_loss)
self.workers.frequency_signal.connect(self.update_frequency)
self.workers.param_signal.connect(self.update_param)
self.workers.kbps_signal.connect(self.update_kbps)
self.workers.start()
def initUI(self):
self.setWindowTitle("多無人機控制介面")
self.setGeometry(100, 100, 800, 600)
# 主佈局
main_layout = QVBoxLayout()
# 無人機選擇區域
uav_layout = QHBoxLayout()
self.uav_checkboxes = {}
for namespace in self.namespaces:
checkbox = QCheckBox(namespace)
checkbox.setChecked(True) # 預設勾選
self.uav_checkboxes[namespace] = checkbox
uav_layout.addWidget(checkbox)
main_layout.addLayout(uav_layout)
# 顯示所有無人機資訊,從左到右顯示
uav_layout = QHBoxLayout()
# 逐個顯示 UAV 的資訊
self.uav_labels = {}
for namespace in self.namespaces:
uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel("狀態:等待數據..."),
"battery": QLabel("電壓:等待數據..."),
"local_position": QLabel("Local Position等待數據..."),
"gps": QLabel("GPS位置等待數據...\n\n"),
"fix": QLabel("Fix Type等待數據..."),
"satellites": QLabel("衛星數量:等待數據..."),
"groundspeed": QLabel("地面速度:等待數據..."),
"pitch": QLabel("Pitch等待數據..."),
"heading": QLabel("Heading等待數據..."),
"ping": QLabel("Ping等待數據..."),
"loss": QLabel("丟包:等待數據..."),
"frequency": QLabel("頻率:等待數據..."),
"kbps": QLabel("吞吐量:等待數據..."),
"param": QLabel("SR1_EXTRA1等待數據...")
}
# 把每個資訊添加到該 UAV 的垂直佈局中
for label in self.uav_labels[namespace].values():
uav_info_layout.addWidget(label)
# 將該 UAV 的資訊佈局加到主佈局中(從左到右排列)
uav_layout.addLayout(uav_info_layout)
main_layout.addLayout(uav_layout)
# SR1_EXTRA1參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值")
self.setParamButton = QPushButton("設置SR1_EXTRA1")
self.setParamButton.clicked.connect(self.set_SR1_EXTRA1)
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"])
self.modeButton = QPushButton("切換模式")
self.modeButton.clicked.connect(self.change_mode)
mode_layout.addWidget(QLabel("選擇模式:"))
mode_layout.addWidget(self.modeComboBox)
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# 解鎖按鈕
fly_layout = QHBoxLayout()
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
fly_layout.addWidget(self.armButton)
fly_layout.addWidget(self.takeoffButton)
fly_layout.addWidget(self.landButton)
main_layout.addLayout(fly_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
xyz_layout.addWidget(self.setPositionButton)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
#設置重啟按鈕
self.rebootButton = QPushButton("重啟")
self.rebootButton.clicked.connect(self.reboot_drone)
main_layout.addWidget(self.rebootButton)
# 設置主佈局
central_widget = QWidget(self)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
self.show()
def closeEvent(self, event):
try:
self.workers.stop()
finally:
event.accept()
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"狀態:{mode}")
def update_battery(self, namespace, voltage):
self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"GPS位置 \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = {
0: "No Fix",
1: "No Fix",
2: "2D Fix",
3: "3D Fix",
4: "RTK Float",
5: "RTK Fix"
}.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_imu(self, namespace, pitch):
self.uav_labels[namespace]["pitch"].setText(f"Pitch{pitch:.2f}°")
def update_hdg(self, namespace, heading):
self.uav_labels[namespace]["heading"].setText(f"Heading{heading:.2f}°")
def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s")
def update_ping(self, namespace, ping):
self.uav_labels[namespace]["ping"].setText(f"Ping{ping:.2f} ms")
def update_loss(self, namespace, loss):
self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%")
def update_frequency(self, namespace, frequency):
self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz")
def update_kbps(self, namespace, kbps):
self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s")
def update_param(self, namespace, value):
self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}")
def change_mode(self):
try:
selected_mode = self.modeComboBox.currentText()
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_mode(namespace, selected_mode)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}")
def arm_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.arm(namespace, True)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}")
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
z = float(z_text) if z_text else 5.0
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def land_drone(self):
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.land(namespace)
def set_local_position(self):
try:
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def reboot_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.reboot_drone(namespace)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}")
def set_SR1_EXTRA1(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_param(namespace, "SR1_EXTRA1", param_value)
self.workers.request_param(namespace, "SR1_EXTRA1")
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main():
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Loading…
Cancel
Save