diff --git a/src/unitdev03/0328/pymavlink_02.py b/src/unitdev03/0328/pymavlink_02.py new file mode 100644 index 0000000..5dba35b --- /dev/null +++ b/src/unitdev03/0328/pymavlink_02.py @@ -0,0 +1,497 @@ +#!/usr/bin/env python3 +from pymavlink import mavutil +import time +import math +import threading +import sys + +class DroneController: + def __init__(self, connection_string="udp:127.0.0.1:14550", drone_id=0): + self.master = mavutil.mavlink_connection(connection_string) + self.drone_id = drone_id + self.connection_string = connection_string + self.master.wait_heartbeat() + print(f"無人機 #{self.drone_id} 已建立連接!") + + def set_mode(self, mode): + if mode == 'STABILIZE': + mode_id = 0 + elif mode == 'AUTO': + mode_id = 3 + elif mode == 'GUIDED': + mode_id = 4 + elif mode == 'LOITER': + mode_id = 5 + elif mode == 'RTL': + mode_id = 6 + elif mode == 'LAND': + mode_id = 9 + else: + print(f"不支援的模式: {mode}") + return False + + self.master.mav.set_mode_send( + self.master.target_system, + mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, + mode_id + ) + + for _ in range(5): + msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1) + if msg and msg.custom_mode == mode_id: + return True + return False + + def arm(self, arm_state=True): + self.master.mav.command_long_send( + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, + 0, + 1 if arm_state else 0, 0, 0, 0, 0, 0, 0 + ) + + for _ in range(5): + msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1) + if msg: + armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED + if (arm_state and armed) or (not arm_state and not armed): + return True + return False + + def takeoff(self, altitude): + self.master.mav.command_long_send( + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, + 0, + 0, 0, 0, 0, 0, 0, + altitude + ) + + timeout = time.time() + 30 + while time.time() < timeout: + msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1) + if msg: + current_alt = msg.relative_alt / 1000.0 + if abs(current_alt - altitude) < 0.5: + return True + time.sleep(0.1) + return False + + def goto_position_local(self, x, y, z, vx=0, vy=0, vz=0, yaw=0, yaw_rate=0): + z = -z + + type_mask = ( + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE | + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE + ) + + print(f"無人機 #{self.drone_id} 發送位置指令: x={x}, y={y}, z={-z}, vx={vx}, vy={vy}, vz={vz}") + + self.master.mav.set_position_target_local_ned_send( + 0, + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_FRAME_LOCAL_NED, + type_mask, + x, y, z, + vx, vy, vz, + 0, 0, 0, + yaw, yaw_rate + ) + + print(f"無人機 #{self.drone_id} 開始監控位置變化...") + start_time = time.time() + last_progress_time = start_time + min_dist = float('inf') + + while time.time() - start_time < 30: + msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1) + if msg: + dist = math.sqrt((msg.x - x)**2 + (msg.y - y)**2 + (msg.z - z)**2) + min_dist = min(min_dist, dist) + + current_time = time.time() + if current_time - last_progress_time >= 5: + if dist > min_dist + 0.1: + print(f"無人機 #{self.drone_id} 偏離目標,重新調整") + self.master.mav.set_position_target_local_ned_send( + 0, self.master.target_system, self.master.target_component, + mavutil.mavlink.MAV_FRAME_LOCAL_NED, + type_mask, + x, y, z, vx, vy, vz, 0, 0, 0, yaw, yaw_rate + ) + last_progress_time = current_time + + print(f"無人機 #{self.drone_id} 當前位置: x={msg.x:.2f}, y={msg.y:.2f}, z={-msg.z:.2f}, 距離目標: {dist:.2f}m") + + if dist < 0.5: + print(f"無人機 #{self.drone_id} 到達目標位置") + return True + + vel_msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=False) + if vel_msg: + vel_total = math.sqrt(vel_msg.vx**2 + vel_msg.vy**2 + vel_msg.vz**2) + if dist < 1.0 and vel_total < 0.1: + print(f"無人機 #{self.drone_id} 已穩定懸停在目標位置附近") + return True + + time.sleep(0.1) + + print(f"無人機 #{self.drone_id} 移動超時") + return False + + def get_current_state(self): + state = {} + + # Mode and armed status + hb_msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=0.5) + if hb_msg: + mode_id = hb_msg.custom_mode + mode_name = { + 0: "STABILIZE", + 3: "AUTO", + 4: "GUIDED", + 5: "LOITER", + 6: "RTL", + 9: "LAND" + }.get(mode_id, f"Unknown({mode_id})") + + state['mode'] = mode_name + state['armed'] = bool(hb_msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) + + # Position + pos_msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1.0) + if pos_msg: + state['position'] = { + 'x': round(pos_msg.x, 2), + 'y': round(pos_msg.y, 2), + 'z': round(-pos_msg.z, 2) + } + + # GPS + gps_msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1.0) + if gps_msg: + state['gps_position'] = { + 'lat': gps_msg.lat / 1e7, + 'lon': gps_msg.lon / 1e7, + 'alt': gps_msg.alt / 1000, + 'relative_alt': gps_msg.relative_alt / 1000 + } + + return state + +class MultiDroneController: + def __init__(self, num_drones=5): + self.drones = [] + self.num_drones = num_drones + + for i in range(num_drones): + port = 14550 + i + connection_string = f"udp:127.0.0.1:{port}" + drone = DroneController(connection_string, i) + self.drones.append(drone) + print(f"已初始化無人機 #{i} 在端口 {port}") + + def get_drone(self, drone_id): + if 0 <= drone_id < len(self.drones): + return self.drones[drone_id] + return None + + def all_drones_command(self, command, *args): + results = [] + threads = [] + + for drone in self.drones: + thread = threading.Thread( + target=lambda d, cmd, args, results: results.append((d.drone_id, getattr(d, cmd)(*args))), + args=(drone, command, args, results) + ) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + for drone_id, success in results: + print(f"無人機 #{drone_id} 執行 {command} 結果: {'成功' if success else '失敗'}") + + return all(success for _, success in results) + +def print_menu(): + menu = [ + "\n=== 多機無人機控制界面 ===", + "1. 選擇操作的無人機", + "2. 對所有無人機執行命令", + "3. 查看所有無人機狀態", + "4. 退出", + "0. 顯示此清單", + "==========================" + ] + for item in menu: + print(item) + +def print_drone_menu(): + menu = [ + "\n=== 單機無人機控制界面 ===", + "1. 切換至 GUIDED 模式", + "2. 切換至 LAND 模式", + "3. 解鎖", + "4. 上鎖", + "5. 起飛", + "6. 飛到指定位置", + "7. 切換至 RTL 模式(返航)", + "8. 查看當前狀態", + "9. 返回主菜單", + "0. 顯示此清單", + "==========================" + ] + for item in menu: + print(item) + +def print_all_drones_menu(): + menu = [ + "\n=== 所有無人機控制界面 ===", + "1. 全部切換至 GUIDED 模式", + "2. 全部切換至 LAND 模式", + "3. 全部解鎖", + "4. 全部上鎖", + "5. 全部起飛", + "6. 全部返航", + "7. 返回主菜單", + "0. 顯示此清單", + "==========================" + ] + for item in menu: + print(item) + +def control_single_drone(drone): + print_drone_menu() + current_drone_id = drone.drone_id + + while True: + try: + choice = input(f"\n無人機 #{current_drone_id} - 請輸入指令編號: ") + + if choice == '0': + print_drone_menu() + + elif choice == '1': + if drone.set_mode("GUIDED"): + print(f"無人機 #{current_drone_id} 已切換至 GUIDED 模式") + else: + print(f"無人機 #{current_drone_id} 切換模式失敗") + + elif choice == '2': + if drone.set_mode("LAND"): + print(f"無人機 #{current_drone_id} 已切換至 LAND 模式") + else: + print(f"無人機 #{current_drone_id} 切換模式失敗") + + elif choice == '3': + if drone.arm(True): + print(f"無人機 #{current_drone_id} 已解鎖") + else: + print(f"無人機 #{current_drone_id} 解鎖失敗") + + elif choice == '4': + if drone.arm(False): + print(f"無人機 #{current_drone_id} 已上鎖") + else: + print(f"無人機 #{current_drone_id} 上鎖失敗") + + elif choice == '5': + altitude = float(input("請輸入起飛高度(公尺): ")) + if drone.takeoff(altitude): + print(f"無人機 #{current_drone_id} 已到達目標高度: {altitude}公尺") + else: + print(f"無人機 #{current_drone_id} 起飛失敗") + + elif choice == '6': + x = float(input("請輸入X座標(公尺): ")) + y = float(input("請輸入Y座標(公尺): ")) + z = float(input("請輸入Z座標(公尺,高度為正值): ")) + vx = float(input("請輸入X方向速度(公尺/秒,可選): ") or "0") + vy = float(input("請輸入Y方向速度(公尺/秒,可選): ") or "0") + if drone.goto_position_local(x, y, z, vx, vy): + print(f"無人機 #{current_drone_id} 已到達目標位置") + else: + print(f"無人機 #{current_drone_id} 移動失敗") + + elif choice == '7': + if drone.set_mode("RTL"): + print(f"無人機 #{current_drone_id} 已切換至 RTL 模式,正在返航") + else: + print(f"無人機 #{current_drone_id} 切換至 RTL 模式失敗") + + elif choice == '8': + state = drone.get_current_state() + if state: + print(f"\n=== 無人機 #{current_drone_id} 狀態 ===") + print(f"模式: {state.get('mode', '未知')}") + print(f"解鎖狀態: {'已解鎖' if state.get('armed', False) else '已上鎖'}") + + if 'gps_position' in state: + gps = state['gps_position'] + print(f"GPS位置: 緯度={gps['lat']:.6f}, 經度={gps['lon']:.6f}, 高度={gps['alt']:.2f}m") + + if 'position' in state: + pos = state['position'] + print(f"本地位置: X={pos['x']:.2f}m, Y={pos['y']:.2f}m, Z={pos['z']:.2f}m") + print("=============================") + else: + print(f"無法獲取無人機 #{current_drone_id} 狀態") + + elif choice == '9': + print("返回主菜單") + break + + else: + print("無效的指令,請重新輸入") + + except ValueError as e: + print(f"輸入錯誤: {e}") + except Exception as e: + print(f"發生錯誤: {e}") + +def control_all_drones(controller): + print_all_drones_menu() + + while True: + try: + choice = input("\n請輸入指令編號: ") + + if choice == '0': + print_all_drones_menu() + + elif choice == '1': + if controller.all_drones_command("set_mode", "GUIDED"): + print("所有無人機已切換至 GUIDED 模式") + else: + print("部分無人機切換模式失敗") + + elif choice == '2': + if controller.all_drones_command("set_mode", "LAND"): + print("所有無人機已切換至 LAND 模式") + else: + print("部分無人機切換模式失敗") + + elif choice == '3': + if controller.all_drones_command("arm", True): + print("所有無人機已解鎖") + else: + print("部分無人機解鎖失敗") + + elif choice == '4': + if controller.all_drones_command("arm", False): + print("所有無人機已上鎖") + else: + print("部分無人機上鎖失敗") + + elif choice == '5': + altitude = float(input("請輸入所有無人機起飛高度(公尺): ")) + if controller.all_drones_command("takeoff", altitude): + print(f"所有無人機已到達目標高度: {altitude}公尺") + else: + print("部分無人機起飛失敗") + + elif choice == '6': + if controller.all_drones_command("set_mode", "RTL"): + print("所有無人機已切換至 RTL 模式,正在返航") + else: + print("部分無人機切換至 RTL 模式失敗") + + elif choice == '7': + print("返回主菜單") + break + + else: + print("無效的指令,請重新輸入") + + except ValueError as e: + print(f"輸入錯誤: {e}") + except Exception as e: + print(f"發生錯誤: {e}") + +def display_all_drone_states(controller): + """顯示所有無人機狀態""" + print("\n=== 所有無人機狀態 ===") + + for drone in controller.drones: + max_attempts = 5 + state = None + + for attempt in range(max_attempts): + state = drone.get_current_state() + if state and 'position' in state: + break + time.sleep(0.2) + + print(f"\n--- 無人機 #{drone.drone_id} ---") + print(f"連接: {drone.connection_string}") + + if not state: + print("狀態: 無法獲取完整數據") + continue + + print(f"飛行模式: {state.get('mode', '未知')}") + print(f"解鎖狀態: {'已解鎖' if state.get('armed', False) else '已上鎖'}") + + if 'gps_position' in state: + gps = state['gps_position'] + print(f"GPS位置: 緯度={gps['lat']:.6f}, 經度={gps['lon']:.6f}, 高度={gps['alt']:.2f}m") + print(f"相對高度: {gps['relative_alt']:.2f}m") + + if 'position' in state: + pos = state['position'] + print(f"本地位置: X={pos['x']:.2f}m, Y={pos['y']:.2f}m, Z={pos['z']:.2f}m") + + print("-----------------------") + + print("=========================") + +def main(): + controller = MultiDroneController(5) + print_menu() + + while True: + try: + choice = input("\n請輸入指令編號: ") + + if choice == '0': + print_menu() + + elif choice == '1': + drone_id = int(input("請輸入要操作的無人機ID (0-4): ")) + drone = controller.get_drone(drone_id) + if drone: + control_single_drone(drone) + else: + print(f"無效的無人機ID: {drone_id}") + + elif choice == '2': + control_all_drones(controller) + + elif choice == '3': + display_all_drone_states(controller) + + elif choice == '4': + print("程式結束") + break + + else: + print("無效的指令,請重新輸入") + + except ValueError as e: + print(f"輸入錯誤: {e}") + except Exception as e: + print(f"發生錯誤: {e}") + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n程式被使用者中斷") + sys.exit(0) \ No newline at end of file