Update 'src/unitdev01/unitdev01/interface.py'

ken910606 1 year ago
parent a8b36c391f
commit b8965c0bd6

@ -1,289 +1,141 @@
import sys import sys
import time import numpy as np
import math
import rclpy import rclpy
from rclpy.executors import SingleThreadedExecutor
from PyQt5 import QtWidgets from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
from pymavlink import mavutil from std_msgs.msg import Float64
from mavros_msgs.srv import SetMode, CommandBool, CommandTOL
class MAVLinkWorker(QThread): from mavros_msgs.msg import State, GPSRAW, VfrHud
state_signal = pyqtSignal(str, str) from sensor_msgs.msg import NavSatFix, BatteryState, Imu
battery_signal = pyqtSignal(str, float) from geometry_msgs.msg import PoseStamped
gps_signal = pyqtSignal(str, float, float) from mavros.base import SENSOR_QOS
class ROS2Worker(QThread):
state_signal = pyqtSignal(str, str) # 新增 namespace 參數
battery_signal = pyqtSignal(str, float) # 新增 namespace 參數
gps_signal = pyqtSignal(str, float, float) # 新增 namespace 參數
gps_status_signal = pyqtSignal(str, int, int) gps_status_signal = pyqtSignal(str, int, int)
local_position_signal = pyqtSignal(str, float, float, float) local_position_signal = pyqtSignal(str, float, float, float) # 新增 namespace 參數
imu_signal = pyqtSignal(str, float) imu_signal = pyqtSignal(str, float, float, float, float)
hdg_signal = pyqtSignal(str, float) hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float) groundspeed_signal = pyqtSignal(str, float) # 新增 namespace 與 groundspeed 值
ping_signal = pyqtSignal(str, float)
loss_signal = pyqtSignal(str, float)
frequency_signal = pyqtSignal(str, float)
param_signal = pyqtSignal(str, int)
def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'): def __init__(self, namespaces):
super().__init__() super().__init__()
self.namespaces = ['UAV1', 'UAV4', 'UAV5'] namespaces = ['uav1', 'uav2', 'uav3']
self.connection = mavutil.mavlink_connection(usb, baud=57600) self.node = rclpy.create_node('multi_uav_control_app')
self.connection.wait_heartbeat() self.executor = SingleThreadedExecutor() # 創建單執行器
for sysid in self.namespaces: self.executor.add_node(self.node) # 添加節點到執行器
sysid = int(sysid.replace('UAV', ''))
self.set_sr_params(sysid)
self.running = True self.running = True
# 設置需要監控的訊息類型 for namespace in namespaces:
self.messages_to_monitor = ["HEARTBEAT", "BATTERY_STATUS", "GLOBAL_POSITION_INT", "GPS_RAW_INT", "LOCAL_POSITION_NED", "ATTITUDE", "VFR_HUD", "TIMESYNC"] self.node.create_subscription(State, f'/{namespace}/state',
self.message_count = {} lambda msg, ns=namespace: self.state_callback(msg, ns), qos_profile=SENSOR_QOS)
self.frequency = {} self.node.create_subscription(BatteryState, f'/{namespace}/battery',
self.start_time = {} lambda msg, ns=namespace: self.battery_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(NavSatFix, f'/{namespace}/global_position/global',
# 用於計算丟包 lambda msg, ns=namespace: self.gps_callback(msg, ns), qos_profile=SENSOR_QOS)
self.seq_numbers = {} self.node.create_subscription(GPSRAW, f'/{namespace}/gpsstatus/gps1/raw',
self.packet_loss_count = {} lambda msg, ns=namespace: self.gps_status_callback(msg, ns), qos_profile=SENSOR_QOS)
self.total_packet_count = {} self.node.create_subscription(PoseStamped, f'/{namespace}/local_position/pose',
self.loss_percentage = {} lambda msg, ns=namespace: self.local_position_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(Imu, f'/{namespace}/imu/data',
for namespace in self.namespaces: lambda msg, ns=namespace: self.imu_callback(msg, ns), qos_profile=SENSOR_QOS)
self.request_param(namespace, "SR1_EXTRA1") self.node.create_subscription(Float64, f'/{namespace}/global_position/compass_hdg',
lambda msg, ns=namespace: self.heading_callback(msg, ns), qos_profile=SENSOR_QOS)
self.node.create_subscription(VfrHud, f'/{namespace}/vfr_hud',
lambda msg, ns=namespace: self.vfr_hud_callback(msg, ns), qos_profile=SENSOR_QOS)
self.clients = {namespace: {
"set_mode": self.node.create_client(SetMode, f'/{namespace}/set_mode'),
"arming": self.node.create_client(CommandBool, f'/{namespace}/cmd/arming'),
"takeoff": self.node.create_client(CommandTOL, f'/{namespace}/cmd/takeoff'),
"land": self.node.create_client(CommandTOL, f'/{namespace}/cmd/land'),
"local_position": self.node.create_publisher(PoseStamped, f'/{namespace}/setpoint_position/local', 10)
} for namespace in namespaces}
def run(self): def run(self):
while self.running: while self.running and rclpy.ok():
try: self.executor.spin_once(timeout_sec=0.1)
current_time = time.time()
msg = self.connection.recv_msg()
if not msg:
continue
sysid = msg.get_srcSystem()
if sysid == 0:
continue
namespace = f"UAV{sysid}"
print(sysid)
msg_type = msg.get_type()
if msg_type =="BAD_DATA":
continue
print(msg_type)
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256 # MAVLink sequence numbers are modulo 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq: # Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.total_packet_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
self.loss_signal.emit(namespace, self.loss_percentage[sysid])
# Frequency calculation
if sysid not in self.message_count:
self.message_count[sysid] = 0
self.start_time[sysid] = current_time
self.message_count[sysid] += 1
# 每隔一段時間更新
if current_time - self.start_time[sysid] >= 1:
# 每秒頻率
self.frequency[sysid] = self.message_count[sysid] / (current_time - self.start_time[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.frequency_signal.emit(namespace, self.frequency[sysid])
# 發送 timesync request
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
)
#self.send_heartbeat
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
percentage = msg.battery_remaining/100
self.battery_signal.emit(namespace, percentage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.gps_signal.emit(namespace, latitude, longitude)
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
satellites_visible = msg.satellites_visible
self.gps_status_signal.emit(namespace, fix_type, satellites_visible)
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
round_trip_time = (int(time.time() * 1e9) - msg.ts1) / 1e6
if(round_trip_time<1e6):
self.ping_signal.emit(namespace, round_trip_time)
elif msg.get_type() == 'PARAM_VALUE':
param_name = "SR1_EXTRA1"
if msg.param_id == param_name:
self.param_signal.emit(namespace, msg.param_value)
except Exception as e:
print(f"Error reading message: {e}")
def stop(self): def stop(self):
self.running = False self.running = False
self.connection.close() self.executor.shutdown()
self.node.destroy_node()
def state_callback(self, msg, namespace):
self.state_signal.emit(namespace, msg.mode)
def battery_callback(self, msg, namespace):
self.battery_signal.emit(namespace, msg.percentage)
def gps_callback(self, msg, namespace):
self.gps_signal.emit(namespace, msg.latitude, msg.longitude)
def gps_status_callback(self, msg, namespace):
self.gps_status_signal.emit(namespace, msg.fix_type, msg.satellites_visible)
def local_position_callback(self, msg, namespace):
self.local_position_signal.emit(namespace, msg.pose.position.x, msg.pose.position.y, msg.pose.position.z)
def imu_callback(self, msg, namespace):
self.imu_signal.emit(namespace, msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w)
def heading_callback(self, msg, namespace):
self.hdg_signal.emit(namespace, msg.data)
def vfr_hud_callback(self, msg, namespace):
self.groundspeed_signal.emit(namespace, msg.groundspeed)
def set_mode(self, namespace, mode): def set_mode(self, namespace, mode):
sysid = int(namespace.replace('UAV', '')) req = SetMode.Request()
if mode == 'STABILIZE': req.custom_mode = mode
mode = 0 self.clients[namespace]["set_mode"].call_async(req)
elif mode == 'AUTO': self.node.get_logger().info(f'{namespace}: 模式切換: {mode}')
mode = 3
elif mode == 'GUIDED': def arm(self, namespace, arm: bool):
mode = 4 req = CommandBool.Request()
elif mode == 'LOITER': req.value = arm
mode = 5 self.clients[namespace]["arming"].call_async(req)
self.connection.mav.set_mode_send( #SET_MODE (11) — [DEP] self.node.get_logger().info(f'{namespace}: {"解鎖" if arm else "上鎖"}')
sysid,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode
)
def arm(self, namespace, arm):
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, #MAV_CMD_COMPONENT_ARM_DISARM (400)
0, # Confirmation
1 if arm else 0,
0, 0, 0, 0, 0, 0
)
def takeoff(self, namespace, altitude): def takeoff(self, namespace, altitude):
sysid = int(namespace.replace('UAV', '')) req = CommandTOL.Request()
self.connection.mav.command_long_send( req.altitude = altitude
sysid, req.latitude = 0.0
1, # Component ID req.longitude = 0.0
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, #MAV_CMD_NAV_TAKEOFF (22) req.yaw = 0.0
0, # Confirmation (0 = first transmission) req.min_pitch = 0.0
0, 0, 0, 0, # Parameters 1-4 (Unused in takeoff command) self.clients[namespace]["takeoff"].call_async(req)
0, 0, altitude # Latitude, Longitude, Altitude (target_altitude in meters) self.node.get_logger().info(f'{namespace}: 起飛到高度 {altitude}')
)
def land(self, namespace): def land(self, namespace):
sysid = int(namespace.replace('UAV', '')) req = CommandTOL.Request()
self.connection.mav.command_long_send( req.altitude = 0.0
sysid, req.latitude = 0.0
1, # Component ID req.longitude = 0.0
mavutil.mavlink.MAV_CMD_NAV_LAND, #MAV_CMD_NAV_LAND (21) req.yaw = 0.0
0, req.min_pitch = 0.0
0, 0, 0, 0, self.clients[namespace]["land"].call_async(req)
0, 0, 0 self.node.get_logger().info(f'{namespace}: 降落')
)
def set_local_position(self, namespace, x, y, z): def set_local_position(self, namespace, x, y, z):
sysid = int(namespace.replace('UAV', '')) pose_msg = PoseStamped()
self.connection.mav.set_position_target_local_ned_send( pose_msg.pose.position.x = x
0, sysid, 1, # time_boot_ms, sysid, compid pose_msg.pose.position.y = y
mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame pose_msg.pose.position.z = z
0b110111111000, # Mask self.clients[namespace]["local_position"].publish(pose_msg)
y, x, -z, # Position self.node.get_logger().info(f'{namespace}: Published local position: X={x}, Y={y}, Z={z}')
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
def reboot_drone(self, namespace):
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # target_component (一般為1)
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令
0, # confirmation
1, # 第一個參數 (1 = reboot0 = 無操作2 = 關機)
0, 0, 0, 0, 0, 0 # 其餘參數保留
)
def set_param(self, namespace, param_name, value):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def request_param(self, namespace, param_name):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_request_read_send(
sysid, # 系統 ID
1, # 組件 ID
param_name.encode('utf-8'), # 參數名稱
-1 # 參數索引,-1 表示根據名稱請求
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def send_heartbeat(self):
self.connection.mav.heartbeat_send(
mavutil.mavlink.MAV_TYPE_GCS,
mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0, 0, 0
)
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 2, 4]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[1],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[1],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[1]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
class DroneControlApp(QMainWindow): class DroneControlApp(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.workers = MAVLinkWorker() self.namespaces = ['uav1', 'uav2', 'uav3']
self.namespaces = self.workers.namespaces self.workers = ROS2Worker(self.namespaces)
self.initUI() self.initUI()
# 訂閱所有數據更新
# Connect signals to update the UI
self.workers.state_signal.connect(self.update_state) self.workers.state_signal.connect(self.update_state)
self.workers.battery_signal.connect(self.update_battery) self.workers.battery_signal.connect(self.update_battery)
self.workers.gps_signal.connect(self.update_gps) self.workers.gps_signal.connect(self.update_gps)
@ -292,10 +144,7 @@ class DroneControlApp(QMainWindow):
self.workers.imu_signal.connect(self.update_imu) self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg) self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed) self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.ping_signal.connect(self.update_ping)
self.workers.loss_signal.connect(self.update_loss)
self.workers.frequency_signal.connect(self.update_frequency)
self.workers.param_signal.connect(self.update_param)
self.workers.start() self.workers.start()
def initUI(self): def initUI(self):
@ -325,19 +174,15 @@ class DroneControlApp(QMainWindow):
# 狀態顯示 # 狀態顯示
self.uav_labels[namespace] = { self.uav_labels[namespace] = {
"status": QLabel("狀態:等待數據..."), "status": QLabel(f"{namespace} 狀態:等待數據..."),
"battery": QLabel("電量:等待數據..."), "battery": QLabel(f"{namespace} 電量:等待數據..."),
"gps": QLabel("GPS位置等待數據...\n\n"), "gps": QLabel(f"{namespace} GPS位置等待數據...\n\n"),
"fix": QLabel("Fix Type等待數據..."), "fix": QLabel(f"{namespace} Fix Type等待數據..."),
"satellites": QLabel("衛星數量:等待數據..."), "satellites": QLabel(f"{namespace} 衛星數量:等待數據..."),
"local_position": QLabel("Local Position等待數據..."), "local_position": QLabel(f"{namespace} Local Position等待數據..."),
"groundspeed": QLabel("地面速度:等待數據..."), "groundspeed": QLabel(f"{namespace} 地面速度:等待數據..."),
"pitch": QLabel("Pitch等待數據..."), "pitch": QLabel(f"{namespace} Pitch等待數據..."),
"heading": QLabel("Heading等待數據..."), "heading": QLabel(f"{namespace} Heading等待數據..."),
"ping": QLabel("Ping等待數據..."),
"loss": QLabel("丟包:等待數據..."),
"frequency": QLabel("頻率:等待數據..."),
"param": QLabel("SR1_EXTRA1等待數據...")
} }
# 把每個資訊添加到該 UAV 的垂直佈局中 # 把每個資訊添加到該 UAV 的垂直佈局中
@ -349,16 +194,6 @@ class DroneControlApp(QMainWindow):
main_layout.addLayout(uav_layout) main_layout.addLayout(uav_layout)
# SR1_EXTRA1參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值")
self.setParamButton = QPushButton("設置SR1_EXTRA1")
self.setParamButton.clicked.connect(self.set_SR1_EXTRA1)
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域 # 模式切換區域
mode_layout = QHBoxLayout() mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox() self.modeComboBox = QComboBox()
@ -390,26 +225,19 @@ class DroneControlApp(QMainWindow):
main_layout.addWidget(self.setPositionButton) main_layout.addWidget(self.setPositionButton)
# 解鎖按鈕 # 解鎖按鈕
fly_layout = QHBoxLayout()
self.armButton = QPushButton("解鎖") self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone) self.armButton.clicked.connect(self.arm_drone)
main_layout.addWidget(self.armButton)
# 起飛按鈕 # 起飛按鈕
self.takeoffButton = QPushButton("起飛") self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone) self.takeoffButton.clicked.connect(self.takeoff_drone)
main_layout.addWidget(self.takeoffButton)
# 降落按鈕 # 降落按鈕
self.landButton = QPushButton("降落") self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone) self.landButton.clicked.connect(self.land_drone)
fly_layout.addWidget(self.armButton) main_layout.addWidget(self.landButton)
fly_layout.addWidget(self.takeoffButton)
fly_layout.addWidget(self.landButton)
main_layout.addLayout(fly_layout)
#設置重啟按鈕
self.rebootButton = QPushButton("重啟")
self.rebootButton.clicked.connect(self.reboot_drone)
main_layout.addWidget(self.rebootButton)
# 設置主佈局 # 設置主佈局
central_widget = QWidget(self) central_widget = QWidget(self)
@ -424,13 +252,13 @@ class DroneControlApp(QMainWindow):
event.accept() event.accept()
def update_state(self, namespace, mode): def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"狀態:{mode}") self.uav_labels[namespace]["status"].setText(f"{namespace} 狀態:{mode}")
def update_battery(self, namespace, percentage): def update_battery(self, namespace, percentage):
self.uav_labels[namespace]["battery"].setText(f"電量:{percentage * 100:.2f}%") self.uav_labels[namespace]["battery"].setText(f"{namespace} 電量:{percentage * 100:.2f}%")
def update_gps(self, namespace, latitude, longitude): def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"GPS位置 \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°") self.uav_labels[namespace]["gps"].setText(f"{namespace} GPS位置 \nLat:{latitude:.6f}° \nLon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible): def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = { fix_type_str = {
@ -441,32 +269,22 @@ class DroneControlApp(QMainWindow):
4: "RTK Float", 4: "RTK Float",
5: "RTK Fix" 5: "RTK Fix"
}.get(fix_type, "Unknown") }.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"Fix Type{fix_type_str}") self.uav_labels[namespace]["fix"].setText(f"{namespace} Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}") self.uav_labels[namespace]["satellites"].setText(f"{namespace} 衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z): def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"Local({x:.2f}, {y:.2f}, {z:.2f})") self.uav_labels[namespace]["local_position"].setText(f"{namespace} Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_imu(self, namespace, pitch):
self.uav_labels[namespace]["pitch"].setText(f"Pitch{pitch:.2f}°")
def update_hdg(self, namespace, heading):
self.uav_labels[namespace]["heading"].setText(f"Heading{heading:.2f}°")
def update_groundspeed(self, namespace, groundspeed): def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s") self.uav_labels[namespace]["groundspeed"].setText(f"{namespace} 地面速度:{groundspeed:.2f} m/s")
def update_ping(self, namespace, ping):
self.uav_labels[namespace]["ping"].setText(f"Ping{ping:.2f} ms")
def update_loss(self, namespace, loss): def update_imu(self, namespace, x, y, z, w):
self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%") q = np.array([x, y, z, w])
pitch = self.quaternion_to_euler(q)
self.uav_labels[namespace]["pitch"].setText(f"{namespace} Pitch{pitch:.2f}°")
def update_frequency(self, namespace, frequency): def update_hdg(self, namespace, data):
self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz") self.uav_labels[namespace]["heading"].setText(f"{namespace} Heading{data:.2f}°")
def update_param(self, namespace, value):
self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}")
def change_mode(self): def change_mode(self):
try: try:
@ -477,6 +295,17 @@ class DroneControlApp(QMainWindow):
except Exception as e: except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}")
def set_local_position(self):
try:
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def arm_drone(self): def arm_drone(self):
try: try:
for namespace in self.namespaces: for namespace in self.namespaces:
@ -500,35 +329,15 @@ class DroneControlApp(QMainWindow):
if self.uav_checkboxes[namespace].isChecked(): if self.uav_checkboxes[namespace].isChecked():
self.workers.land(namespace) self.workers.land(namespace)
def set_local_position(self): def quaternion_to_euler(self, q):
try: # Convert quaternion to Euler angles (roll, pitch, yaw)
x = float(self.positionX.text().strip()) x, y, z, w = q
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def reboot_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.reboot_drone(namespace)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}")
def set_SR1_EXTRA1(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_param(namespace, "SR1_EXTRA1", param_value)
self.workers.request_param(namespace, "SR1_EXTRA1")
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
# Pitch (y-axis rotation)
pitch_y = np.arcsin(2.0 * (w * y - z * x))
return pitch_y
def main(args=None): def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
app = QtWidgets.QApplication(sys.argv) app = QtWidgets.QApplication(sys.argv)

Loading…
Cancel
Save