初始化專案

master
picard 1 year ago
commit b5ff857ef8

26
.gitignore vendored

@ -0,0 +1,26 @@
# 二進制
*.o
*.so
*.out
*.a
*.exe
# 編譯的資料夾
**/build/
**/install/
log/
# CMAKE 的衍生檔
CMakeCache.txt
CMakeFiles/
Makefile
# IDE files
**/.vscode/
**/.idea/
**/__pycache__/
**/*.iml
**/*.class
**/*.pyc
**/*.pyo

@ -0,0 +1,15 @@
這是天雷系統的專案
===
專案核心環境
1. ROS2 Humble
2. Python
===
必要相依套件
===
Package 簡述

@ -0,0 +1,349 @@
import sys
import numpy as np
import rclpy
from rclpy.executors import SingleThreadedExecutor
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
from std_msgs.msg import Float64
from mavros_msgs.srv import SetMode, CommandBool, CommandTOL
from mavros_msgs.msg import State, GPSRAW, VfrHud
from sensor_msgs.msg import NavSatFix, BatteryState, Imu
from geometry_msgs.msg import PoseStamped
from mavros.base import SENSOR_QOS
class ROS2Worker(QThread):
state_signal = pyqtSignal(str, str) # 新增 namespace 參數
battery_signal = pyqtSignal(str, float) # 新增 namespace 參數
gps_signal = pyqtSignal(str, float, float) # 新增 namespace 參數
gps_status_signal = pyqtSignal(str, int, int)
local_position_signal = pyqtSignal(str, float, float, float) # 新增 namespace 參數
imu_signal = pyqtSignal(str, float, float, float, float)
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float) # 新增 namespace 與 groundspeed 值
def __init__(self, namespaces):
super().__init__()
namespaces = ['uav1', 'uav2', 'uav3']
self.node = rclpy.create_node('multi_uav_control_app')
self.executor = SingleThreadedExecutor() # 創建單執行器
self.executor.add_node(self.node) # 添加節點到執行器
self.running = True
for namespace in namespaces:
self.node.create_subscription(State, f'/{namespace}/state',
lambda msg, ns=namespace: self.state_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(BatteryState, f'/{namespace}/battery',
lambda msg, ns=namespace: self.battery_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(NavSatFix, f'/{namespace}/global_position/global',
lambda msg, ns=namespace: self.gps_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(GPSRAW, f'/{namespace}/gpsstatus/gps1/raw',
lambda msg, ns=namespace: self.gps_status_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(PoseStamped, f'/{namespace}/local_position/pose',
lambda msg, ns=namespace: self.local_position_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(Imu, f'/{namespace}/imu/data',
lambda msg, ns=namespace: self.imu_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(Float64, f'/{namespace}/global_position/compass_hdg',
lambda msg, ns=namespace: self.heading_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(VfrHud, f'/{namespace}/vfr_hud',
lambda msg, ns=namespace: self.vfr_hud_callback(msg, ns), qos_profile=SENSOR_QOS)
self.clients = {namespace: {
"set_mode": self.node.create_client(SetMode, f'/{namespace}/set_mode'),
"arming": self.node.create_client(CommandBool, f'/{namespace}/cmd/arming'),
"takeoff": self.node.create_client(CommandTOL, f'/{namespace}/cmd/takeoff'),
"land": self.node.create_client(CommandTOL, f'/{namespace}/cmd/land'),
"local_position": self.node.create_publisher(PoseStamped, f'/{namespace}/setpoint_position/local', 10)
} for namespace in namespaces}
def run(self):
while self.running and rclpy.ok():
self.executor.spin_once(timeout_sec=0.1)
def stop(self):
self.running = False
self.executor.shutdown()
self.node.destroy_node()
def state_callback(self, msg, namespace):
self.state_signal.emit(namespace, msg.mode)
def battery_callback(self, msg, namespace):
self.battery_signal.emit(namespace, msg.percentage)
def gps_callback(self, msg, namespace):
self.gps_signal.emit(namespace, msg.latitude, msg.longitude)
def gps_status_callback(self, msg, namespace):
self.gps_status_signal.emit(namespace, msg.fix_type, msg.satellites_visible)
def local_position_callback(self, msg, namespace):
self.local_position_signal.emit(namespace, msg.pose.position.x, msg.pose.position.y, msg.pose.position.z)
def imu_callback(self, msg, namespace):
self.imu_signal.emit(namespace, msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w)
def heading_callback(self, msg, namespace):
self.hdg_signal.emit(namespace, msg.data)
def vfr_hud_callback(self, msg, namespace):
self.groundspeed_signal.emit(namespace, msg.groundspeed)
def set_mode(self, namespace, mode):
req = SetMode.Request()
req.custom_mode = mode
self.clients[namespace]["set_mode"].call_async(req)
self.node.get_logger().info(f'{namespace}: 模式切換: {mode}')
def arm(self, namespace, arm: bool):
req = CommandBool.Request()
req.value = arm
self.clients[namespace]["arming"].call_async(req)
self.node.get_logger().info(f'{namespace}: {"解鎖" if arm else "上鎖"}')
def takeoff(self, namespace, altitude):
req = CommandTOL.Request()
req.altitude = altitude
req.latitude = 0.0
req.longitude = 0.0
req.yaw = 0.0
req.min_pitch = 0.0
self.clients[namespace]["takeoff"].call_async(req)
self.node.get_logger().info(f'{namespace}: 起飛到高度 {altitude}')
def land(self, namespace):
req = CommandTOL.Request()
req.altitude = 0.0
req.latitude = 0.0
req.longitude = 0.0
req.yaw = 0.0
req.min_pitch = 0.0
self.clients[namespace]["land"].call_async(req)
self.node.get_logger().info(f'{namespace}: 降落')
def set_local_position(self, namespace, x, y, z):
pose_msg = PoseStamped()
pose_msg.pose.position.x = x
pose_msg.pose.position.y = y
pose_msg.pose.position.z = z
self.clients[namespace]["local_position"].publish(pose_msg)
self.node.get_logger().info(f'{namespace}: Published local position: X={x}, Y={y}, Z={z}')
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.namespaces = ['uav1', 'uav2', 'uav3']
self.workers = ROS2Worker(self.namespaces)
self.initUI()
# 訂閱所有數據更新
self.workers.state_signal.connect(self.update_state)
self.workers.battery_signal.connect(self.update_battery)
self.workers.gps_signal.connect(self.update_gps)
self.workers.gps_status_signal.connect(self.update_gps_status)
self.workers.local_position_signal.connect(self.update_local_position)
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.start()
def initUI(self):
self.setWindowTitle("多無人機控制介面")
self.setGeometry(100, 100, 800, 600)
# 主佈局
main_layout = QVBoxLayout()
# 無人機選擇區域
uav_layout = QHBoxLayout()
self.uav_checkboxes = {}
for namespace in self.namespaces:
checkbox = QCheckBox(namespace)
checkbox.setChecked(True) # 預設勾選
self.uav_checkboxes[namespace] = checkbox
uav_layout.addWidget(checkbox)
main_layout.addLayout(uav_layout)
# 顯示所有無人機資訊,從左到右顯示
uav_layout = QHBoxLayout()
# 逐個顯示 UAV 的資訊
self.uav_labels = {}
for namespace in self.namespaces:
uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel(f"{namespace} 狀態:等待數據..."),
"battery": QLabel(f"{namespace} 電量:等待數據..."),
"gps": QLabel(f"{namespace} GPS位置等待數據...\n\n"),
"fix": QLabel(f"{namespace} Fix Type等待數據..."),
"satellites": QLabel(f"{namespace} 衛星數量:等待數據..."),
"local_position": QLabel(f"{namespace} Local Position等待數據..."),
"groundspeed": QLabel(f"{namespace} 地面速度:等待數據..."),
"pitch": QLabel(f"{namespace} Pitch等待數據..."),
"heading": QLabel(f"{namespace} Heading等待數據..."),
}
# 把每個資訊添加到該 UAV 的垂直佈局中
for label in self.uav_labels[namespace].values():
uav_info_layout.addWidget(label)
# 將該 UAV 的資訊佈局加到主佈局中(從左到右排列)
uav_layout.addLayout(uav_info_layout)
main_layout.addLayout(uav_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"])
self.modeButton = QPushButton("切換模式")
self.modeButton.clicked.connect(self.change_mode)
mode_layout.addWidget(QLabel("選擇模式:"))
mode_layout.addWidget(self.modeComboBox)
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
main_layout.addWidget(self.setPositionButton)
# 解鎖按鈕
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
main_layout.addWidget(self.armButton)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
main_layout.addWidget(self.takeoffButton)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
main_layout.addWidget(self.landButton)
# 設置主佈局
central_widget = QWidget(self)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
self.show()
def closeEvent(self, event):
try:
self.workers.stop()
finally:
event.accept()
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"{namespace} 狀態:{mode}")
def update_battery(self, namespace, percentage):
self.uav_labels[namespace]["battery"].setText(f"{namespace} 電量:{percentage * 100:.2f}%")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"{namespace} GPS位置 \nLat:{latitude:.6f}° \nLon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = {
0: "No Fix",
1: "No Fix",
2: "2D Fix",
3: "3D Fix",
4: "RTK Float",
5: "RTK Fix"
}.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"{namespace} Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"{namespace} 衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"{namespace} Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"{namespace} 地面速度:{groundspeed:.2f} m/s")
def update_imu(self, namespace, x, y, z, w):
q = np.array([x, y, z, w])
pitch = self.quaternion_to_euler(q)
self.uav_labels[namespace]["pitch"].setText(f"{namespace} Pitch{pitch:.2f}°")
def update_hdg(self, namespace, data):
self.uav_labels[namespace]["heading"].setText(f"{namespace} Heading{data:.2f}°")
def change_mode(self):
try:
selected_mode = self.modeComboBox.currentText()
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_mode(namespace, selected_mode)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}")
def set_local_position(self):
try:
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def arm_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.arm(namespace, True)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}")
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
altitude = float(z_text) if z_text else 5.0
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, altitude)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def land_drone(self):
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.land(namespace)
def quaternion_to_euler(self, q):
# Convert quaternion to Euler angles (roll, pitch, yaw)
x, y, z, w = q
# Pitch (y-axis rotation)
pitch_y = np.arcsin(2.0 * (w * y - z * x))
return pitch_y
def main(args=None):
rclpy.init(args=args)
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()

@ -0,0 +1,479 @@
import sys
import time
import math
import rclpy
from collections import deque
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
from pymavlink import mavutil
class MAVLinkWorker(QThread):
state_signal = pyqtSignal(str, str)
battery_signal = pyqtSignal(str, float)
gps_signal = pyqtSignal(str, float, float)
gps_status_signal = pyqtSignal(str, int, int)
local_position_signal = pyqtSignal(str, float, float, float)
imu_signal = pyqtSignal(str, float)
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float)
def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'):
super().__init__()
self.connection = mavutil.mavlink_connection(connection_string, baud=57600)
self.connection.wait_heartbeat()
self.running = True
self.uav_data = {}
# 設置需要監控的訊息類型
self.messages_to_monitor = ["HEARTBEAT", "BATTERY_STATUS", "GLOBAL_POSITION_INT", "GPS_RAW_INT", "LOCAL_POSITION_NED", "ATTITUDE", "VFR_HUD"]
# 記錄訊息的時間戳和丟包數
self.last_received_time = time.time()
self.message_count = 0
self.missed_messages = 0
# 用於計算頻率的時間窗口(每隔 1 秒計算一次頻率)
self.start_time = time.time()
# 記錄處理過的消息類型及其對應時間
self.message_times = {}
self.delay = deque(maxlen=20)
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.seq = 0
self.ping_time = 0
def run(self):
while self.running:
try:
current_time = time.time()
self.connection.mav.timesync_send(int(current_time*1e6), 0)
'''
if self.ping_time - current_time > 1e3:
self.connection.mav.ping_send(
time_usec = current_time,
seq = self.seq,
target_system = 0,
target_component = 0
)
print(f"Ping {self.seq} sent at {current_time} ms")
'''
msg = self.connection.recv_match(blocking=True)
if not msg:
continue
msg_type = msg.get_type()
sysid = msg.get_srcSystem()
namespace = f"uav{sysid}"
if sysid not in self.uav_data:
self.uav_data[sysid] = {"sysid": sysid}
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256 # MAVLink sequence numbers are modulo 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq:
# Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
'''
if self.total_packet_count[sysid] > 0:
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
print(f"UAV {sysid} Packet Loss: {self.packet_loss_count[sysid]} / {self.total_packet_count[sysid]} "
f"({self.loss_percentage[sysid]:.2f}%)")
'''
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
percentage = msg.battery_remaining/100
self.battery_signal.emit(namespace, percentage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.gps_signal.emit(namespace, latitude, longitude)
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
satellites_visible = msg.satellites_visible
self.gps_status_signal.emit(namespace, fix_type, satellites_visible)
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
# Calculate the round-trip time in milliseconds
round_trip_time = (int(time.time() * 1e6) - msg.ts1) / 1000.0
print(f"Round-trip time (ping): {round_trip_time:.3f} ms")
self.seq += 1
if msg_type in self.messages_to_monitor:
# 更新消息接收時間
if msg_type not in self.message_times:
self.message_times[msg_type] = current_time
else:
# 計算每條消息的頻率
self.time_diff = current_time - self.message_times[msg_type]
self.message_times[msg_type] = current_time
frequency = 1 / self.time_diff if self.time_diff > 0 else 0
timestamp = msg._timestamp
round_trip_delay = (timestamp - current_time)*1000
self.delay.append(round_trip_delay)
# 計算總的頻率
self.message_count += 1
self.last_received_time = current_time
# 每隔一段時間更新總頻率
if current_time - self.start_time >= 1:
# 更新每秒頻率
frequency = self.message_count / (current_time - self.start_time)
print(f"Overall frequency: {frequency:.2f} Hz")
if self.delay:
average = sum(self.delay) / len(self.delay)
print(f"The average delay is: {average:.4f} ms")
self.start_time = current_time
self.message_count = 0
except Exception as e:
print(f"Error reading message: {e}")
def stop(self):
self.running = False
self.connection.close()
def set_mode(self, namespace, mode):
sysid = int(namespace[-1])
if mode == 'STABILIZE':
mode = 0
elif mode == 'AUTO':
mode = 3
elif mode == 'GUIDED':
mode = 4
elif mode == 'LOITER':
mode = 5
self.connection.mav.set_mode_send( #SET_MODE (11) — [DEP]
sysid,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode
)
def arm(self, namespace, arm):
sysid = int(namespace[-1])
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, #MAV_CMD_COMPONENT_ARM_DISARM (400)
0, # Confirmation
1 if arm else 0,
0, 0, 0, 0, 0, 0
)
def takeoff(self, namespace, altitude):
sysid = int(namespace[-1])
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, #MAV_CMD_NAV_TAKEOFF (22)
0, # Confirmation (0 = first transmission)
0, 0, 0, 0, # Parameters 1-4 (Unused in takeoff command)
0, 0, altitude # Latitude, Longitude, Altitude (target_altitude in meters)
)
def land(self, namespace):
sysid = int(namespace[-1])
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_NAV_LAND, #MAV_CMD_NAV_LAND (21)
0,
0, 0, 0, 0,
0, 0, 0
)
def set_local_position(self, namespace, x, y, z):
sysid = int(namespace[-1])
self.connection.mav.set_position_target_local_ned_send(
0, sysid, 1, # time_boot_ms, sysid, compid
mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame
0b110111111000, # Mask
y, x, -z, # Position
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
def set_parameter(self, namespace, param_name, value):
sysid = int(namespace[-1])
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
print(f"Parameter {param_name} set to {value}")
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.namespaces = ['uav1', 'uav4', 'uav100']
self.workers = MAVLinkWorker()
self.initUI()
# Connect signals to update the UI
self.workers.state_signal.connect(self.update_state)
self.workers.battery_signal.connect(self.update_battery)
self.workers.gps_signal.connect(self.update_gps)
self.workers.gps_status_signal.connect(self.update_gps_status)
self.workers.local_position_signal.connect(self.update_local_position)
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.start()
def initUI(self):
self.setWindowTitle("多無人機控制介面")
self.setGeometry(100, 100, 800, 600)
# 主佈局
main_layout = QVBoxLayout()
# 無人機選擇區域
uav_layout = QHBoxLayout()
self.uav_checkboxes = {}
for namespace in self.namespaces:
checkbox = QCheckBox(namespace)
checkbox.setChecked(True) # 預設勾選
self.uav_checkboxes[namespace] = checkbox
uav_layout.addWidget(checkbox)
main_layout.addLayout(uav_layout)
# 顯示所有無人機資訊,從左到右顯示
uav_layout = QHBoxLayout()
# 逐個顯示 UAV 的資訊
self.uav_labels = {}
for namespace in self.namespaces:
uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel(f"{namespace} 狀態:等待數據..."),
"battery": QLabel(f"{namespace} 電量:等待數據..."),
"gps": QLabel(f"{namespace} GPS位置等待數據...\n\n"),
"fix": QLabel(f"{namespace} Fix Type等待數據..."),
"satellites": QLabel(f"{namespace} 衛星數量:等待數據..."),
"local_position": QLabel(f"{namespace} Local Position等待數據..."),
"groundspeed": QLabel(f"{namespace} 地面速度:等待數據..."),
"pitch": QLabel(f"{namespace} Pitch等待數據..."),
"heading": QLabel(f"{namespace} Heading等待數據..."),
}
# 把每個資訊添加到該 UAV 的垂直佈局中
for label in self.uav_labels[namespace].values():
uav_info_layout.addWidget(label)
# 將該 UAV 的資訊佈局加到主佈局中(從左到右排列)
uav_layout.addLayout(uav_info_layout)
main_layout.addLayout(uav_layout)
# EK3_SRC2_POSXY參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入EK3_SRC2_POSXY的新值")
self.setParamButton = QPushButton("設置EK3_SRC2_POSXY")
self.setParamButton.clicked.connect(self.set_ek3_src2_posxy)
param_layout.addWidget(QLabel("EK3_SRC2_POSXY:"))
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"])
self.modeButton = QPushButton("切換模式")
self.modeButton.clicked.connect(self.change_mode)
mode_layout.addWidget(QLabel("選擇模式:"))
mode_layout.addWidget(self.modeComboBox)
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
main_layout.addWidget(self.setPositionButton)
# 解鎖按鈕
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
main_layout.addWidget(self.armButton)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
main_layout.addWidget(self.takeoffButton)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
main_layout.addWidget(self.landButton)
# 設置主佈局
central_widget = QWidget(self)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
self.show()
def closeEvent(self, event):
try:
self.workers.stop()
finally:
event.accept()
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"{namespace} 狀態:{mode}")
def update_battery(self, namespace, percentage):
self.uav_labels[namespace]["battery"].setText(f"{namespace} 電量:{percentage * 100:.2f}%")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"{namespace} GPS位置 \nLat:{latitude:.6f}° \nLon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = {
0: "No Fix",
1: "No Fix",
2: "2D Fix",
3: "3D Fix",
4: "RTK Float",
5: "RTK Fix"
}.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"{namespace} Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"{namespace} 衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"{namespace} Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_imu(self, namespace, pitch):
self.uav_labels[namespace]["pitch"].setText(f"{namespace} Pitch{pitch:.2f}°")
def update_hdg(self, namespace, heading):
self.uav_labels[namespace]["heading"].setText(f"{namespace} Heading{heading:.2f}°")
def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"{namespace} 地面速度:{groundspeed:.2f} m/s")
def change_mode(self):
try:
selected_mode = self.modeComboBox.currentText()
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_mode(namespace, selected_mode)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}")
def arm_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.arm(namespace, True)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}")
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
altitude = float(z_text) if z_text else 5.0
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, altitude)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def land_drone(self):
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.land(namespace)
def set_local_position(self):
try:
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def set_ek3_src2_posxy(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_parameter(namespace, "EK3_SRC2_POSXY", param_value)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main(args=None):
rclpy.init(args=args)
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>drone_controller</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="dodo@todo.todo">dodo</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/drone_controller
[install]
install_scripts=$base/lib/drone_controller

@ -0,0 +1,26 @@
from setuptools import find_packages, setup
package_name = 'drone_controller'
setup(
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='dodo',
maintainer_email='dodo@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'interface = drone_controller.interface:main',
'mavlink = drone_controller.mavlink:main'
],
},
)
Loading…
Cancel
Save