#!/usr/bin/env python3 from pymavlink import mavutil import time import math import threading class DroneController: def __init__(self, connection_string="udp:127.0.0.1:14550"): # 建立 MAVLink 連接 self.master = mavutil.mavlink_connection(connection_string) # 等待心跳包 self.master.wait_heartbeat() print("已建立連接!") def set_mode(self, mode): """設置飛行模式""" mode_id = self.master.mode_mapping()[mode] self.master.mav.set_mode_send( self.master.target_system, mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, mode_id ) for _ in range(5): msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1) if msg and msg.custom_mode == mode_id: return True return False def arm(self, arm_state=True): """解鎖/上鎖無人機""" self.master.mav.command_long_send( self.master.target_system, self.master.target_component, mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, 0, 1 if arm_state else 0, 0, 0, 0, 0, 0, 0 ) for _ in range(5): msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1) if msg: armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED if armed == arm_state: return True return False def takeoff(self, altitude): """起飛到指定高度""" self.master.mav.command_long_send( self.master.target_system, self.master.target_component, mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, 0, 0, 0, 0, 0, 0, 0, altitude ) while True: msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True) if msg: current_alt = msg.relative_alt / 1000.0 if abs(current_alt - altitude) < 0.5: return True time.sleep(0.1) def goto_position_local(self, x, y, z, vx=2, vy=2, vz=0, yaw=0, yaw_rate=0): """ 飛到指定的本地座標位置 x, y, z: 目標位置(公尺) """ # NED系統中,向上飛需要給負值 z = -z # 將輸入的z值轉換為NED系統 # 重要:type_mask 設為 0 表示使用該維度 # 我們只想使用位置控制 (x,y,z),所以其他都設為 ignore type_mask = ( # mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE | # 8 # mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE | # 16 # mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE | # 32 mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE | # 64 mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE | # 128 mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE | # 256 mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE | # 1024 mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE # 2048 ) # 或直接使用二進制: type_mask = 0b0000111111111000 print(f"發送位置指令: x={x}, y={y}, z={-z}") self.master.mav.set_position_target_local_ned_send( 0, # timestamp self.master.target_system, self.master.target_component, mavutil.mavlink.MAV_FRAME_LOCAL_NED, type_mask, x, y, z, # 位置 0, 0, 0, # 速度 (設為 0) 5, 5, 0, # 加速度 (設為 0) 0, 0 # yaw, yaw_rate (設為 0) ) # 監控位置變化 print("開始監控位置變化...") start_time = time.time() while time.time() - start_time < 20: # 20秒超時 msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1) if msg: dist = math.sqrt((msg.x - x)**2 + (msg.y - y)**2 + (msg.z - z)**2) print(f"當前位置: x={msg.x:.2f}, y={msg.y:.2f}, z={-msg.z:.2f}, 距離目標: {dist:.2f}m") if dist < 0.5: print("到達目標位置") return True time.sleep(0.1) print("移動超時") return False def get_current_state(self): """獲取當前狀態""" msg = self.master.recv_match(type='HEARTBEAT', blocking=True) if msg: mode = list(self.master.mode_mapping().keys())[list(self.master.mode_mapping().values()).index(msg.custom_mode)] armed = bool(msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) return { 'mode': mode, 'armed': armed } return None def print_menu(): """列印功能表""" menu = [ "\n=== 無人機控制界面 ===", "1. 切換至 GUIDED 模式", "2. 切換至 LAND 模式", "3. 解鎖", "4. 上鎖", "5. 起飛", "6. 飛到指定位置", "7. 切換至 RTL 模式(返航)", "8. 查看當前狀態", "9. 退出", "0. 顯示此清單", "====================" ] for item in menu: print(item) def main(): drone = DroneController() print_menu() while True: try: choice = input("\n請輸入指令編號: ") if choice == '0': print_menu() elif choice == '1': if drone.set_mode("GUIDED"): print("已切換至 GUIDED 模式") else: print("切換模式失敗") elif choice == '2': if drone.set_mode("LAND"): print("已切換至 LAND 模式") else: print("切換模式失敗") elif choice == '3': if drone.arm(True): print("無人機已解鎖") else: print("解鎖失敗") elif choice == '4': if drone.arm(False): print("無人機已上鎖") else: print("上鎖失敗") elif choice == '5': altitude = float(input("請輸入起飛高度(公尺): ")) if drone.takeoff(altitude): print(f"已到達目標高度: {altitude}公尺") else: print("起飛失敗") elif choice == '6': x = float(input("請輸入X座標(公尺): ")) y = float(input("請輸入Y座標(公尺): ")) z = float(input("請輸入Z座標(公尺,負值代表向下): ")) if drone.goto_position_local(x, y, z): print("已到達目標位置") else: print("移動失敗") elif choice == '7': if drone.set_mode("RTL"): print("已切換至 RTL 模式,無人機正在返航") else: print("切換至 RTL 模式失敗") elif choice == '8': state = drone.get_current_state() if state: print("\n=== 無人機狀態 ===") print(f"模式: {state['mode']}") print(f"解鎖狀態: {'已解鎖' if state['armed'] else '已上鎖'}") print("==================") else: print("無法獲取狀態") elif choice == '9': print("程式結束") break else: print("無效的指令,請重新輸入") except ValueError as e: print(f"輸入錯誤: {e}") except Exception as e: print(f"發生錯誤: {e}") if __name__ == "__main__": main()