commit b5ff857ef828a9e731157ae96646c5aa9ff2ad6b Author: picard Date: Tue Jan 21 15:35:51 2025 +0800 初始化專案 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..34df110 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# 二進制 +*.o +*.so +*.out +*.a +*.exe + +# 編譯的資料夾 +**/build/ +**/install/ +log/ + +# CMAKE 的衍生檔 +CMakeCache.txt +CMakeFiles/ +Makefile + + +# IDE files +**/.vscode/ +**/.idea/ +**/__pycache__/ +**/*.iml +**/*.class +**/*.pyc +**/*.pyo diff --git a/README.md b/README.md new file mode 100644 index 0000000..5aaf7c4 --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +這是天雷系統的專案 + +=== +專案核心環境 +1. ROS2 Humble +2. Python + +=== +必要相依套件 + + +=== +Package 簡述 + + diff --git a/src/_unitDev01/drone_controller/__init__.py b/src/_unitDev01/drone_controller/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/_unitDev01/drone_controller/interface.py b/src/_unitDev01/drone_controller/interface.py new file mode 100644 index 0000000..5699e46 --- /dev/null +++ b/src/_unitDev01/drone_controller/interface.py @@ -0,0 +1,349 @@ +import sys +import numpy as np +import rclpy +from rclpy.executors import SingleThreadedExecutor +from PyQt5 import QtWidgets +from PyQt5.QtCore import QThread, pyqtSignal +from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox +from std_msgs.msg import Float64 +from mavros_msgs.srv import SetMode, CommandBool, CommandTOL +from mavros_msgs.msg import State, GPSRAW, VfrHud +from sensor_msgs.msg import NavSatFix, BatteryState, Imu +from geometry_msgs.msg import PoseStamped +from mavros.base import SENSOR_QOS + +class ROS2Worker(QThread): + state_signal = pyqtSignal(str, str) # 新增 namespace 參數 + battery_signal = pyqtSignal(str, float) # 新增 namespace 參數 + gps_signal = pyqtSignal(str, float, float) # 新增 namespace 參數 + gps_status_signal = pyqtSignal(str, int, int) + local_position_signal = pyqtSignal(str, float, float, float) # 新增 namespace 參數 + imu_signal = pyqtSignal(str, float, float, float, float) + hdg_signal = pyqtSignal(str, float) + groundspeed_signal = pyqtSignal(str, float) # 新增 namespace 與 groundspeed 值 + + def __init__(self, namespaces): + super().__init__() + namespaces = ['uav1', 'uav2', 'uav3'] + self.node = rclpy.create_node('multi_uav_control_app') + self.executor = SingleThreadedExecutor() # 創建單執行器 + self.executor.add_node(self.node) # 添加節點到執行器 + self.running = True + + for namespace in namespaces: + self.node.create_subscription(State, f'/{namespace}/state', + lambda msg, ns=namespace: self.state_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(BatteryState, f'/{namespace}/battery', + lambda msg, ns=namespace: self.battery_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(NavSatFix, f'/{namespace}/global_position/global', + lambda msg, ns=namespace: self.gps_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(GPSRAW, f'/{namespace}/gpsstatus/gps1/raw', + lambda msg, ns=namespace: self.gps_status_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(PoseStamped, f'/{namespace}/local_position/pose', + lambda msg, ns=namespace: self.local_position_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(Imu, f'/{namespace}/imu/data', + lambda msg, ns=namespace: self.imu_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(Float64, f'/{namespace}/global_position/compass_hdg', + lambda msg, ns=namespace: self.heading_callback(msg, ns), qos_profile=SENSOR_QOS) + self.node.create_subscription(VfrHud, f'/{namespace}/vfr_hud', + lambda msg, ns=namespace: self.vfr_hud_callback(msg, ns), qos_profile=SENSOR_QOS) + + self.clients = {namespace: { + "set_mode": self.node.create_client(SetMode, f'/{namespace}/set_mode'), + "arming": self.node.create_client(CommandBool, f'/{namespace}/cmd/arming'), + "takeoff": self.node.create_client(CommandTOL, f'/{namespace}/cmd/takeoff'), + "land": self.node.create_client(CommandTOL, f'/{namespace}/cmd/land'), + "local_position": self.node.create_publisher(PoseStamped, f'/{namespace}/setpoint_position/local', 10) + } for namespace in namespaces} + + def run(self): + while self.running and rclpy.ok(): + self.executor.spin_once(timeout_sec=0.1) + + def stop(self): + self.running = False + self.executor.shutdown() + self.node.destroy_node() + + def state_callback(self, msg, namespace): + self.state_signal.emit(namespace, msg.mode) + + def battery_callback(self, msg, namespace): + self.battery_signal.emit(namespace, msg.percentage) + + def gps_callback(self, msg, namespace): + self.gps_signal.emit(namespace, msg.latitude, msg.longitude) + + def gps_status_callback(self, msg, namespace): + self.gps_status_signal.emit(namespace, msg.fix_type, msg.satellites_visible) + + def local_position_callback(self, msg, namespace): + self.local_position_signal.emit(namespace, msg.pose.position.x, msg.pose.position.y, msg.pose.position.z) + + def imu_callback(self, msg, namespace): + self.imu_signal.emit(namespace, msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w) + + def heading_callback(self, msg, namespace): + self.hdg_signal.emit(namespace, msg.data) + + def vfr_hud_callback(self, msg, namespace): + self.groundspeed_signal.emit(namespace, msg.groundspeed) + + def set_mode(self, namespace, mode): + req = SetMode.Request() + req.custom_mode = mode + self.clients[namespace]["set_mode"].call_async(req) + self.node.get_logger().info(f'{namespace}: 模式切換: {mode}') + + def arm(self, namespace, arm: bool): + req = CommandBool.Request() + req.value = arm + self.clients[namespace]["arming"].call_async(req) + self.node.get_logger().info(f'{namespace}: {"解鎖" if arm else "上鎖"}') + + def takeoff(self, namespace, altitude): + req = CommandTOL.Request() + req.altitude = altitude + req.latitude = 0.0 + req.longitude = 0.0 + req.yaw = 0.0 + req.min_pitch = 0.0 + self.clients[namespace]["takeoff"].call_async(req) + self.node.get_logger().info(f'{namespace}: 起飛到高度 {altitude}') + + def land(self, namespace): + req = CommandTOL.Request() + req.altitude = 0.0 + req.latitude = 0.0 + req.longitude = 0.0 + req.yaw = 0.0 + req.min_pitch = 0.0 + self.clients[namespace]["land"].call_async(req) + self.node.get_logger().info(f'{namespace}: 降落') + + def set_local_position(self, namespace, x, y, z): + pose_msg = PoseStamped() + pose_msg.pose.position.x = x + pose_msg.pose.position.y = y + pose_msg.pose.position.z = z + self.clients[namespace]["local_position"].publish(pose_msg) + self.node.get_logger().info(f'{namespace}: Published local position: X={x}, Y={y}, Z={z}') + +class DroneControlApp(QMainWindow): + def __init__(self): + super().__init__() + self.namespaces = ['uav1', 'uav2', 'uav3'] + self.workers = ROS2Worker(self.namespaces) + self.initUI() + # 訂閱所有數據更新 + self.workers.state_signal.connect(self.update_state) + self.workers.battery_signal.connect(self.update_battery) + self.workers.gps_signal.connect(self.update_gps) + self.workers.gps_status_signal.connect(self.update_gps_status) + self.workers.local_position_signal.connect(self.update_local_position) + self.workers.imu_signal.connect(self.update_imu) + self.workers.hdg_signal.connect(self.update_hdg) + self.workers.groundspeed_signal.connect(self.update_groundspeed) + + self.workers.start() + + def initUI(self): + self.setWindowTitle("多無人機控制介面") + self.setGeometry(100, 100, 800, 600) + + # 主佈局 + main_layout = QVBoxLayout() + + # 無人機選擇區域 + uav_layout = QHBoxLayout() + self.uav_checkboxes = {} + for namespace in self.namespaces: + checkbox = QCheckBox(namespace) + checkbox.setChecked(True) # 預設勾選 + self.uav_checkboxes[namespace] = checkbox + uav_layout.addWidget(checkbox) + main_layout.addLayout(uav_layout) + + # 顯示所有無人機資訊,從左到右顯示 + uav_layout = QHBoxLayout() + + # 逐個顯示 UAV 的資訊 + self.uav_labels = {} + for namespace in self.namespaces: + uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列 + + # 狀態顯示 + self.uav_labels[namespace] = { + "status": QLabel(f"{namespace} 狀態:等待數據..."), + "battery": QLabel(f"{namespace} 電量:等待數據..."), + "gps": QLabel(f"{namespace} GPS位置:等待數據...\n\n"), + "fix": QLabel(f"{namespace} Fix Type:等待數據..."), + "satellites": QLabel(f"{namespace} 衛星數量:等待數據..."), + "local_position": QLabel(f"{namespace} Local Position:等待數據..."), + "groundspeed": QLabel(f"{namespace} 地面速度:等待數據..."), + "pitch": QLabel(f"{namespace} Pitch:等待數據..."), + "heading": QLabel(f"{namespace} Heading:等待數據..."), + } + + # 把每個資訊添加到該 UAV 的垂直佈局中 + for label in self.uav_labels[namespace].values(): + uav_info_layout.addWidget(label) + + # 將該 UAV 的資訊佈局加到主佈局中(從左到右排列) + uav_layout.addLayout(uav_info_layout) + + main_layout.addLayout(uav_layout) + + # 模式切換區域 + mode_layout = QHBoxLayout() + self.modeComboBox = QComboBox() + self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"]) + self.modeButton = QPushButton("切換模式") + self.modeButton.clicked.connect(self.change_mode) + mode_layout.addWidget(QLabel("選擇模式:")) + mode_layout.addWidget(self.modeComboBox) + mode_layout.addWidget(self.modeButton) + main_layout.addLayout(mode_layout) + + # XYZ 設置欄位 + xyz_layout = QHBoxLayout() + self.positionX = QLineEdit() + self.positionX.setPlaceholderText("X") + self.positionY = QLineEdit() + self.positionY.setPlaceholderText("Y") + self.positionZ = QLineEdit() + self.positionZ.setPlaceholderText("Z") + xyz_layout.addWidget(QLabel("輸入位置:")) + xyz_layout.addWidget(self.positionX) + xyz_layout.addWidget(self.positionY) + xyz_layout.addWidget(self.positionZ) + main_layout.addLayout(xyz_layout) + + # 設置 XYZ 的按鈕 + self.setPositionButton = QPushButton("設置位置") + self.setPositionButton.clicked.connect(self.set_local_position) + main_layout.addWidget(self.setPositionButton) + + # 解鎖按鈕 + self.armButton = QPushButton("解鎖") + self.armButton.clicked.connect(self.arm_drone) + main_layout.addWidget(self.armButton) + + # 起飛按鈕 + self.takeoffButton = QPushButton("起飛") + self.takeoffButton.clicked.connect(self.takeoff_drone) + main_layout.addWidget(self.takeoffButton) + + # 降落按鈕 + self.landButton = QPushButton("降落") + self.landButton.clicked.connect(self.land_drone) + main_layout.addWidget(self.landButton) + + # 設置主佈局 + central_widget = QWidget(self) + central_widget.setLayout(main_layout) + self.setCentralWidget(central_widget) + self.show() + + def closeEvent(self, event): + try: + self.workers.stop() + finally: + event.accept() + + def update_state(self, namespace, mode): + self.uav_labels[namespace]["status"].setText(f"{namespace} 狀態:{mode}") + + def update_battery(self, namespace, percentage): + self.uav_labels[namespace]["battery"].setText(f"{namespace} 電量:{percentage * 100:.2f}%") + + def update_gps(self, namespace, latitude, longitude): + self.uav_labels[namespace]["gps"].setText(f"{namespace} GPS位置: \nLat:{latitude:.6f}° \nLon:{longitude:.6f}°") + + def update_gps_status(self, namespace, fix_type, satellites_visible): + fix_type_str = { + 0: "No Fix", + 1: "No Fix", + 2: "2D Fix", + 3: "3D Fix", + 4: "RTK Float", + 5: "RTK Fix" + }.get(fix_type, "Unknown") + self.uav_labels[namespace]["fix"].setText(f"{namespace} Fix Type:{fix_type_str}") + self.uav_labels[namespace]["satellites"].setText(f"{namespace} 衛星數量:{satellites_visible}") + + def update_local_position(self, namespace, x, y, z): + self.uav_labels[namespace]["local_position"].setText(f"{namespace} Local:({x:.2f}, {y:.2f}, {z:.2f})") + + def update_groundspeed(self, namespace, groundspeed): + self.uav_labels[namespace]["groundspeed"].setText(f"{namespace} 地面速度:{groundspeed:.2f} m/s") + + def update_imu(self, namespace, x, y, z, w): + q = np.array([x, y, z, w]) + pitch = self.quaternion_to_euler(q) + self.uav_labels[namespace]["pitch"].setText(f"{namespace} Pitch:{pitch:.2f}°") + + def update_hdg(self, namespace, data): + self.uav_labels[namespace]["heading"].setText(f"{namespace} Heading:{data:.2f}°") + + def change_mode(self): + try: + selected_mode = self.modeComboBox.currentText() + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_mode(namespace, selected_mode) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") + + def set_local_position(self): + try: + x = float(self.positionX.text().strip()) + y = float(self.positionY.text().strip()) + z = float(self.positionZ.text().strip()) + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_local_position(namespace, x, y, z) + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def arm_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.arm(namespace, True) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}") + + def takeoff_drone(self): + try: + z_text = self.positionZ.text().strip() + altitude = float(z_text) if z_text else 5.0 + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.takeoff(namespace, altitude) + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def land_drone(self): + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.land(namespace) + + def quaternion_to_euler(self, q): + # Convert quaternion to Euler angles (roll, pitch, yaw) + x, y, z, w = q + + # Pitch (y-axis rotation) + pitch_y = np.arcsin(2.0 * (w * y - z * x)) + + return pitch_y + +def main(args=None): + rclpy.init(args=args) + app = QtWidgets.QApplication(sys.argv) + window = DroneControlApp() + window.show() + sys.exit(app.exec_()) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/_unitDev01/drone_controller/mavlink.py b/src/_unitDev01/drone_controller/mavlink.py new file mode 100644 index 0000000..0371d32 --- /dev/null +++ b/src/_unitDev01/drone_controller/mavlink.py @@ -0,0 +1,479 @@ +import sys +import time +import math +import rclpy +from collections import deque +from PyQt5 import QtWidgets +from PyQt5.QtCore import QThread, pyqtSignal +from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox +from pymavlink import mavutil + +class MAVLinkWorker(QThread): + state_signal = pyqtSignal(str, str) + battery_signal = pyqtSignal(str, float) + gps_signal = pyqtSignal(str, float, float) + gps_status_signal = pyqtSignal(str, int, int) + local_position_signal = pyqtSignal(str, float, float, float) + imu_signal = pyqtSignal(str, float) + hdg_signal = pyqtSignal(str, float) + groundspeed_signal = pyqtSignal(str, float) + + def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'): + super().__init__() + self.connection = mavutil.mavlink_connection(connection_string, baud=57600) + self.connection.wait_heartbeat() + self.running = True + self.uav_data = {} + + # 設置需要監控的訊息類型 + self.messages_to_monitor = ["HEARTBEAT", "BATTERY_STATUS", "GLOBAL_POSITION_INT", "GPS_RAW_INT", "LOCAL_POSITION_NED", "ATTITUDE", "VFR_HUD"] + # 記錄訊息的時間戳和丟包數 + self.last_received_time = time.time() + self.message_count = 0 + self.missed_messages = 0 + # 用於計算頻率的時間窗口(每隔 1 秒計算一次頻率) + self.start_time = time.time() + # 記錄處理過的消息類型及其對應時間 + self.message_times = {} + self.delay = deque(maxlen=20) + + self.seq_numbers = {} + self.packet_loss_count = {} + self.total_packet_count = {} + self.loss_percentage = {} + + self.seq = 0 + self.ping_time = 0 + + def run(self): + while self.running: + try: + current_time = time.time() + self.connection.mav.timesync_send(int(current_time*1e6), 0) + ''' + if self.ping_time - current_time > 1e3: + self.connection.mav.ping_send( + time_usec = current_time, + seq = self.seq, + target_system = 0, + target_component = 0 + ) + print(f"Ping {self.seq} sent at {current_time} ms") + ''' + msg = self.connection.recv_match(blocking=True) + if not msg: + continue + + msg_type = msg.get_type() + sysid = msg.get_srcSystem() + namespace = f"uav{sysid}" + + if sysid not in self.uav_data: + self.uav_data[sysid] = {"sysid": sysid} + + # Packet loss calculation + if sysid not in self.seq_numbers: + self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking + self.packet_loss_count[sysid] = 0 + self.total_packet_count[sysid] = 1 + else: + current_seq = msg.get_seq() + expected_seq = (self.seq_numbers[sysid] + 1) % 256 # MAVLink sequence numbers are modulo 256 + self.total_packet_count[sysid] += 1 + + if current_seq != expected_seq: + # Packet(s) lost + lost_packets = (current_seq - expected_seq) % 256 + self.packet_loss_count[sysid] += lost_packets + + self.seq_numbers[sysid] = current_seq + + ''' + if self.total_packet_count[sysid] > 0: + self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100 + print(f"UAV {sysid} Packet Loss: {self.packet_loss_count[sysid]} / {self.total_packet_count[sysid]} " + f"({self.loss_percentage[sysid]:.2f}%)") + ''' + + if msg_type == "HEARTBEAT": + mode = mavutil.mode_string_v10(msg) + self.state_signal.emit(namespace, mode) + + elif msg_type == "BATTERY_STATUS": + percentage = msg.battery_remaining/100 + self.battery_signal.emit(namespace, percentage) + + elif msg_type == "GLOBAL_POSITION_INT": + latitude = msg.lat / 1e7 + longitude = msg.lon / 1e7 + self.gps_signal.emit(namespace, latitude, longitude) + + elif msg_type == "GPS_RAW_INT": + fix_type = msg.fix_type + satellites_visible = msg.satellites_visible + self.gps_status_signal.emit(namespace, fix_type, satellites_visible) + + elif msg_type == "LOCAL_POSITION_NED": + x = msg.y + y = msg.x + z = -msg.z + self.local_position_signal.emit(namespace, x, y, z) + + elif msg_type == "ATTITUDE": + pitch = math.degrees(msg.pitch) + self.imu_signal.emit(namespace, pitch) + + elif msg_type == "VFR_HUD": + groundspeed = msg.groundspeed + heading = msg.heading + self.hdg_signal.emit(namespace, heading) + self.groundspeed_signal.emit(namespace, groundspeed) + + elif msg_type == "TIMESYNC": + # Calculate the round-trip time in milliseconds + round_trip_time = (int(time.time() * 1e6) - msg.ts1) / 1000.0 + print(f"Round-trip time (ping): {round_trip_time:.3f} ms") + + self.seq += 1 + + if msg_type in self.messages_to_monitor: + # 更新消息接收時間 + if msg_type not in self.message_times: + self.message_times[msg_type] = current_time + else: + # 計算每條消息的頻率 + self.time_diff = current_time - self.message_times[msg_type] + self.message_times[msg_type] = current_time + frequency = 1 / self.time_diff if self.time_diff > 0 else 0 + + timestamp = msg._timestamp + round_trip_delay = (timestamp - current_time)*1000 + self.delay.append(round_trip_delay) + + # 計算總的頻率 + self.message_count += 1 + + self.last_received_time = current_time + + # 每隔一段時間更新總頻率 + if current_time - self.start_time >= 1: + # 更新每秒頻率 + frequency = self.message_count / (current_time - self.start_time) + print(f"Overall frequency: {frequency:.2f} Hz") + if self.delay: + average = sum(self.delay) / len(self.delay) + print(f"The average delay is: {average:.4f} ms") + self.start_time = current_time + self.message_count = 0 + + + + except Exception as e: + print(f"Error reading message: {e}") + + def stop(self): + self.running = False + self.connection.close() + + def set_mode(self, namespace, mode): + sysid = int(namespace[-1]) + if mode == 'STABILIZE': + mode = 0 + elif mode == 'AUTO': + mode = 3 + elif mode == 'GUIDED': + mode = 4 + elif mode == 'LOITER': + mode = 5 + self.connection.mav.set_mode_send( #SET_MODE (11) — [DEP] + sysid, + mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, + mode + ) + + def arm(self, namespace, arm): + sysid = int(namespace[-1]) + self.connection.mav.command_long_send( + sysid, + 1, # Component ID + mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, #MAV_CMD_COMPONENT_ARM_DISARM (400) + 0, # Confirmation + 1 if arm else 0, + 0, 0, 0, 0, 0, 0 + ) + + def takeoff(self, namespace, altitude): + sysid = int(namespace[-1]) + self.connection.mav.command_long_send( + sysid, + 1, # Component ID + mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, #MAV_CMD_NAV_TAKEOFF (22) + 0, # Confirmation (0 = first transmission) + 0, 0, 0, 0, # Parameters 1-4 (Unused in takeoff command) + 0, 0, altitude # Latitude, Longitude, Altitude (target_altitude in meters) + ) + + def land(self, namespace): + sysid = int(namespace[-1]) + self.connection.mav.command_long_send( + sysid, + 1, # Component ID + mavutil.mavlink.MAV_CMD_NAV_LAND, #MAV_CMD_NAV_LAND (21) + 0, + 0, 0, 0, 0, + 0, 0, 0 + ) + + def set_local_position(self, namespace, x, y, z): + sysid = int(namespace[-1]) + self.connection.mav.set_position_target_local_ned_send( + 0, sysid, 1, # time_boot_ms, sysid, compid + mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame + 0b110111111000, # Mask + y, x, -z, # Position + 0, 0, 0, # Velocity + 0, 0, 0, # Acceleration + 0, 0 # Yaw, yaw_rate + ) + + def set_parameter(self, namespace, param_name, value): + sysid = int(namespace[-1]) + try: + self.connection.mav.param_set_send( + sysid, + 1, + param_name.encode('utf-8'), + float(value), + mavutil.mavlink.MAV_PARAM_TYPE_REAL32 + ) + print(f"Parameter {param_name} set to {value}") + except Exception as e: + print(f"Failed to set parameter {param_name}: {e}") + +class DroneControlApp(QMainWindow): + def __init__(self): + super().__init__() + self.namespaces = ['uav1', 'uav4', 'uav100'] + self.workers = MAVLinkWorker() + self.initUI() + + # Connect signals to update the UI + self.workers.state_signal.connect(self.update_state) + self.workers.battery_signal.connect(self.update_battery) + self.workers.gps_signal.connect(self.update_gps) + self.workers.gps_status_signal.connect(self.update_gps_status) + self.workers.local_position_signal.connect(self.update_local_position) + self.workers.imu_signal.connect(self.update_imu) + self.workers.hdg_signal.connect(self.update_hdg) + self.workers.groundspeed_signal.connect(self.update_groundspeed) + self.workers.start() + + def initUI(self): + self.setWindowTitle("多無人機控制介面") + self.setGeometry(100, 100, 800, 600) + + # 主佈局 + main_layout = QVBoxLayout() + + # 無人機選擇區域 + uav_layout = QHBoxLayout() + self.uav_checkboxes = {} + for namespace in self.namespaces: + checkbox = QCheckBox(namespace) + checkbox.setChecked(True) # 預設勾選 + self.uav_checkboxes[namespace] = checkbox + uav_layout.addWidget(checkbox) + main_layout.addLayout(uav_layout) + + # 顯示所有無人機資訊,從左到右顯示 + uav_layout = QHBoxLayout() + + # 逐個顯示 UAV 的資訊 + self.uav_labels = {} + for namespace in self.namespaces: + uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列 + + # 狀態顯示 + self.uav_labels[namespace] = { + "status": QLabel(f"{namespace} 狀態:等待數據..."), + "battery": QLabel(f"{namespace} 電量:等待數據..."), + "gps": QLabel(f"{namespace} GPS位置:等待數據...\n\n"), + "fix": QLabel(f"{namespace} Fix Type:等待數據..."), + "satellites": QLabel(f"{namespace} 衛星數量:等待數據..."), + "local_position": QLabel(f"{namespace} Local Position:等待數據..."), + "groundspeed": QLabel(f"{namespace} 地面速度:等待數據..."), + "pitch": QLabel(f"{namespace} Pitch:等待數據..."), + "heading": QLabel(f"{namespace} Heading:等待數據..."), + } + + # 把每個資訊添加到該 UAV 的垂直佈局中 + for label in self.uav_labels[namespace].values(): + uav_info_layout.addWidget(label) + + # 將該 UAV 的資訊佈局加到主佈局中(從左到右排列) + uav_layout.addLayout(uav_info_layout) + + main_layout.addLayout(uav_layout) + + # EK3_SRC2_POSXY參數設置 + param_layout = QHBoxLayout() + self.paramInput = QLineEdit() + self.paramInput.setPlaceholderText("輸入EK3_SRC2_POSXY的新值") + self.setParamButton = QPushButton("設置EK3_SRC2_POSXY") + self.setParamButton.clicked.connect(self.set_ek3_src2_posxy) + param_layout.addWidget(QLabel("EK3_SRC2_POSXY:")) + param_layout.addWidget(self.paramInput) + param_layout.addWidget(self.setParamButton) + main_layout.addLayout(param_layout) + + # 模式切換區域 + mode_layout = QHBoxLayout() + self.modeComboBox = QComboBox() + self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"]) + self.modeButton = QPushButton("切換模式") + self.modeButton.clicked.connect(self.change_mode) + mode_layout.addWidget(QLabel("選擇模式:")) + mode_layout.addWidget(self.modeComboBox) + mode_layout.addWidget(self.modeButton) + main_layout.addLayout(mode_layout) + + # XYZ 設置欄位 + xyz_layout = QHBoxLayout() + self.positionX = QLineEdit() + self.positionX.setPlaceholderText("X") + self.positionY = QLineEdit() + self.positionY.setPlaceholderText("Y") + self.positionZ = QLineEdit() + self.positionZ.setPlaceholderText("Z") + xyz_layout.addWidget(QLabel("輸入位置:")) + xyz_layout.addWidget(self.positionX) + xyz_layout.addWidget(self.positionY) + xyz_layout.addWidget(self.positionZ) + main_layout.addLayout(xyz_layout) + + # 設置 XYZ 的按鈕 + self.setPositionButton = QPushButton("設置位置") + self.setPositionButton.clicked.connect(self.set_local_position) + main_layout.addWidget(self.setPositionButton) + + # 解鎖按鈕 + self.armButton = QPushButton("解鎖") + self.armButton.clicked.connect(self.arm_drone) + main_layout.addWidget(self.armButton) + + # 起飛按鈕 + self.takeoffButton = QPushButton("起飛") + self.takeoffButton.clicked.connect(self.takeoff_drone) + main_layout.addWidget(self.takeoffButton) + + # 降落按鈕 + self.landButton = QPushButton("降落") + self.landButton.clicked.connect(self.land_drone) + main_layout.addWidget(self.landButton) + + # 設置主佈局 + central_widget = QWidget(self) + central_widget.setLayout(main_layout) + self.setCentralWidget(central_widget) + self.show() + + def closeEvent(self, event): + try: + self.workers.stop() + finally: + event.accept() + + def update_state(self, namespace, mode): + self.uav_labels[namespace]["status"].setText(f"{namespace} 狀態:{mode}") + + def update_battery(self, namespace, percentage): + self.uav_labels[namespace]["battery"].setText(f"{namespace} 電量:{percentage * 100:.2f}%") + + def update_gps(self, namespace, latitude, longitude): + self.uav_labels[namespace]["gps"].setText(f"{namespace} GPS位置: \nLat:{latitude:.6f}° \nLon:{longitude:.6f}°") + + def update_gps_status(self, namespace, fix_type, satellites_visible): + fix_type_str = { + 0: "No Fix", + 1: "No Fix", + 2: "2D Fix", + 3: "3D Fix", + 4: "RTK Float", + 5: "RTK Fix" + }.get(fix_type, "Unknown") + self.uav_labels[namespace]["fix"].setText(f"{namespace} Fix Type:{fix_type_str}") + self.uav_labels[namespace]["satellites"].setText(f"{namespace} 衛星數量:{satellites_visible}") + + def update_local_position(self, namespace, x, y, z): + self.uav_labels[namespace]["local_position"].setText(f"{namespace} Local:({x:.2f}, {y:.2f}, {z:.2f})") + + def update_imu(self, namespace, pitch): + self.uav_labels[namespace]["pitch"].setText(f"{namespace} Pitch:{pitch:.2f}°") + + def update_hdg(self, namespace, heading): + self.uav_labels[namespace]["heading"].setText(f"{namespace} Heading:{heading:.2f}°") + + def update_groundspeed(self, namespace, groundspeed): + self.uav_labels[namespace]["groundspeed"].setText(f"{namespace} 地面速度:{groundspeed:.2f} m/s") + + def change_mode(self): + try: + selected_mode = self.modeComboBox.currentText() + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_mode(namespace, selected_mode) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") + + def arm_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.arm(namespace, True) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}") + + def takeoff_drone(self): + try: + z_text = self.positionZ.text().strip() + altitude = float(z_text) if z_text else 5.0 + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.takeoff(namespace, altitude) + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def land_drone(self): + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.land(namespace) + + def set_local_position(self): + try: + x = float(self.positionX.text().strip()) + y = float(self.positionY.text().strip()) + z = float(self.positionZ.text().strip()) + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_local_position(namespace, x, y, z) + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def set_ek3_src2_posxy(self): + param_value = self.paramInput.text().strip() + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_parameter(namespace, "EK3_SRC2_POSXY", param_value) + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + +def main(args=None): + rclpy.init(args=args) + app = QtWidgets.QApplication(sys.argv) + window = DroneControlApp() + window.show() + sys.exit(app.exec_()) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/_unitDev01/package.xml b/src/_unitDev01/package.xml new file mode 100644 index 0000000..a824539 --- /dev/null +++ b/src/_unitDev01/package.xml @@ -0,0 +1,18 @@ + + + + drone_controller + 0.0.0 + TODO: Package description + dodo + TODO: License declaration + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/src/_unitDev01/setup.cfg b/src/_unitDev01/setup.cfg new file mode 100644 index 0000000..a7caef3 --- /dev/null +++ b/src/_unitDev01/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/drone_controller +[install] +install_scripts=$base/lib/drone_controller diff --git a/src/_unitDev01/setup.py b/src/_unitDev01/setup.py new file mode 100644 index 0000000..213eb51 --- /dev/null +++ b/src/_unitDev01/setup.py @@ -0,0 +1,26 @@ +from setuptools import find_packages, setup + +package_name = 'drone_controller' + +setup( + version='0.0.0', + packages=[package_name], + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='dodo', + maintainer_email='dodo@todo.todo', + description='TODO: Package description', + license='TODO: License declaration', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'interface = drone_controller.interface:main', + 'mavlink = drone_controller.mavlink:main' + ], + }, +)