之前簡易模擬的小程式

chiyu
wenchun 1 year ago
parent 30f29f678b
commit f73e067c0b

@ -0,0 +1,497 @@
#!/usr/bin/env python3
from pymavlink import mavutil
import time
import math
import threading
import sys
class DroneController:
def __init__(self, connection_string="udp:127.0.0.1:14550", drone_id=0):
self.master = mavutil.mavlink_connection(connection_string)
self.drone_id = drone_id
self.connection_string = connection_string
self.master.wait_heartbeat()
print(f"無人機 #{self.drone_id} 已建立連接!")
def set_mode(self, mode):
if mode == 'STABILIZE':
mode_id = 0
elif mode == 'AUTO':
mode_id = 3
elif mode == 'GUIDED':
mode_id = 4
elif mode == 'LOITER':
mode_id = 5
elif mode == 'RTL':
mode_id = 6
elif mode == 'LAND':
mode_id = 9
else:
print(f"不支援的模式: {mode}")
return False
self.master.mav.set_mode_send(
self.master.target_system,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode_id
)
for _ in range(5):
msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1)
if msg and msg.custom_mode == mode_id:
return True
return False
def arm(self, arm_state=True):
self.master.mav.command_long_send(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
1 if arm_state else 0, 0, 0, 0, 0, 0, 0
)
for _ in range(5):
msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1)
if msg:
armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED
if (arm_state and armed) or (not arm_state and not armed):
return True
return False
def takeoff(self, altitude):
self.master.mav.command_long_send(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0,
0, 0, 0, 0, 0, 0,
altitude
)
timeout = time.time() + 30
while time.time() < timeout:
msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1)
if msg:
current_alt = msg.relative_alt / 1000.0
if abs(current_alt - altitude) < 0.5:
return True
time.sleep(0.1)
return False
def goto_position_local(self, x, y, z, vx=0, vy=0, vz=0, yaw=0, yaw_rate=0):
z = -z
type_mask = (
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE
)
print(f"無人機 #{self.drone_id} 發送位置指令: x={x}, y={y}, z={-z}, vx={vx}, vy={vy}, vz={vz}")
self.master.mav.set_position_target_local_ned_send(
0,
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
type_mask,
x, y, z,
vx, vy, vz,
0, 0, 0,
yaw, yaw_rate
)
print(f"無人機 #{self.drone_id} 開始監控位置變化...")
start_time = time.time()
last_progress_time = start_time
min_dist = float('inf')
while time.time() - start_time < 30:
msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1)
if msg:
dist = math.sqrt((msg.x - x)**2 + (msg.y - y)**2 + (msg.z - z)**2)
min_dist = min(min_dist, dist)
current_time = time.time()
if current_time - last_progress_time >= 5:
if dist > min_dist + 0.1:
print(f"無人機 #{self.drone_id} 偏離目標,重新調整")
self.master.mav.set_position_target_local_ned_send(
0, self.master.target_system, self.master.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
type_mask,
x, y, z, vx, vy, vz, 0, 0, 0, yaw, yaw_rate
)
last_progress_time = current_time
print(f"無人機 #{self.drone_id} 當前位置: x={msg.x:.2f}, y={msg.y:.2f}, z={-msg.z:.2f}, 距離目標: {dist:.2f}m")
if dist < 0.5:
print(f"無人機 #{self.drone_id} 到達目標位置")
return True
vel_msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=False)
if vel_msg:
vel_total = math.sqrt(vel_msg.vx**2 + vel_msg.vy**2 + vel_msg.vz**2)
if dist < 1.0 and vel_total < 0.1:
print(f"無人機 #{self.drone_id} 已穩定懸停在目標位置附近")
return True
time.sleep(0.1)
print(f"無人機 #{self.drone_id} 移動超時")
return False
def get_current_state(self):
state = {}
# Mode and armed status
hb_msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=0.5)
if hb_msg:
mode_id = hb_msg.custom_mode
mode_name = {
0: "STABILIZE",
3: "AUTO",
4: "GUIDED",
5: "LOITER",
6: "RTL",
9: "LAND"
}.get(mode_id, f"Unknown({mode_id})")
state['mode'] = mode_name
state['armed'] = bool(hb_msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED)
# Position
pos_msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1.0)
if pos_msg:
state['position'] = {
'x': round(pos_msg.x, 2),
'y': round(pos_msg.y, 2),
'z': round(-pos_msg.z, 2)
}
# GPS
gps_msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True, timeout=1.0)
if gps_msg:
state['gps_position'] = {
'lat': gps_msg.lat / 1e7,
'lon': gps_msg.lon / 1e7,
'alt': gps_msg.alt / 1000,
'relative_alt': gps_msg.relative_alt / 1000
}
return state
class MultiDroneController:
def __init__(self, num_drones=5):
self.drones = []
self.num_drones = num_drones
for i in range(num_drones):
port = 14550 + i
connection_string = f"udp:127.0.0.1:{port}"
drone = DroneController(connection_string, i)
self.drones.append(drone)
print(f"已初始化無人機 #{i} 在端口 {port}")
def get_drone(self, drone_id):
if 0 <= drone_id < len(self.drones):
return self.drones[drone_id]
return None
def all_drones_command(self, command, *args):
results = []
threads = []
for drone in self.drones:
thread = threading.Thread(
target=lambda d, cmd, args, results: results.append((d.drone_id, getattr(d, cmd)(*args))),
args=(drone, command, args, results)
)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
for drone_id, success in results:
print(f"無人機 #{drone_id} 執行 {command} 結果: {'成功' if success else '失敗'}")
return all(success for _, success in results)
def print_menu():
menu = [
"\n=== 多機無人機控制界面 ===",
"1. 選擇操作的無人機",
"2. 對所有無人機執行命令",
"3. 查看所有無人機狀態",
"4. 退出",
"0. 顯示此清單",
"=========================="
]
for item in menu:
print(item)
def print_drone_menu():
menu = [
"\n=== 單機無人機控制界面 ===",
"1. 切換至 GUIDED 模式",
"2. 切換至 LAND 模式",
"3. 解鎖",
"4. 上鎖",
"5. 起飛",
"6. 飛到指定位置",
"7. 切換至 RTL 模式(返航)",
"8. 查看當前狀態",
"9. 返回主菜單",
"0. 顯示此清單",
"=========================="
]
for item in menu:
print(item)
def print_all_drones_menu():
menu = [
"\n=== 所有無人機控制界面 ===",
"1. 全部切換至 GUIDED 模式",
"2. 全部切換至 LAND 模式",
"3. 全部解鎖",
"4. 全部上鎖",
"5. 全部起飛",
"6. 全部返航",
"7. 返回主菜單",
"0. 顯示此清單",
"=========================="
]
for item in menu:
print(item)
def control_single_drone(drone):
print_drone_menu()
current_drone_id = drone.drone_id
while True:
try:
choice = input(f"\n無人機 #{current_drone_id} - 請輸入指令編號: ")
if choice == '0':
print_drone_menu()
elif choice == '1':
if drone.set_mode("GUIDED"):
print(f"無人機 #{current_drone_id} 已切換至 GUIDED 模式")
else:
print(f"無人機 #{current_drone_id} 切換模式失敗")
elif choice == '2':
if drone.set_mode("LAND"):
print(f"無人機 #{current_drone_id} 已切換至 LAND 模式")
else:
print(f"無人機 #{current_drone_id} 切換模式失敗")
elif choice == '3':
if drone.arm(True):
print(f"無人機 #{current_drone_id} 已解鎖")
else:
print(f"無人機 #{current_drone_id} 解鎖失敗")
elif choice == '4':
if drone.arm(False):
print(f"無人機 #{current_drone_id} 已上鎖")
else:
print(f"無人機 #{current_drone_id} 上鎖失敗")
elif choice == '5':
altitude = float(input("請輸入起飛高度(公尺): "))
if drone.takeoff(altitude):
print(f"無人機 #{current_drone_id} 已到達目標高度: {altitude}公尺")
else:
print(f"無人機 #{current_drone_id} 起飛失敗")
elif choice == '6':
x = float(input("請輸入X座標(公尺): "))
y = float(input("請輸入Y座標(公尺): "))
z = float(input("請輸入Z座標(公尺,高度為正值): "))
vx = float(input("請輸入X方向速度(公尺/秒,可選): ") or "0")
vy = float(input("請輸入Y方向速度(公尺/秒,可選): ") or "0")
if drone.goto_position_local(x, y, z, vx, vy):
print(f"無人機 #{current_drone_id} 已到達目標位置")
else:
print(f"無人機 #{current_drone_id} 移動失敗")
elif choice == '7':
if drone.set_mode("RTL"):
print(f"無人機 #{current_drone_id} 已切換至 RTL 模式,正在返航")
else:
print(f"無人機 #{current_drone_id} 切換至 RTL 模式失敗")
elif choice == '8':
state = drone.get_current_state()
if state:
print(f"\n=== 無人機 #{current_drone_id} 狀態 ===")
print(f"模式: {state.get('mode', '未知')}")
print(f"解鎖狀態: {'已解鎖' if state.get('armed', False) else '已上鎖'}")
if 'gps_position' in state:
gps = state['gps_position']
print(f"GPS位置: 緯度={gps['lat']:.6f}, 經度={gps['lon']:.6f}, 高度={gps['alt']:.2f}m")
if 'position' in state:
pos = state['position']
print(f"本地位置: X={pos['x']:.2f}m, Y={pos['y']:.2f}m, Z={pos['z']:.2f}m")
print("=============================")
else:
print(f"無法獲取無人機 #{current_drone_id} 狀態")
elif choice == '9':
print("返回主菜單")
break
else:
print("無效的指令,請重新輸入")
except ValueError as e:
print(f"輸入錯誤: {e}")
except Exception as e:
print(f"發生錯誤: {e}")
def control_all_drones(controller):
print_all_drones_menu()
while True:
try:
choice = input("\n請輸入指令編號: ")
if choice == '0':
print_all_drones_menu()
elif choice == '1':
if controller.all_drones_command("set_mode", "GUIDED"):
print("所有無人機已切換至 GUIDED 模式")
else:
print("部分無人機切換模式失敗")
elif choice == '2':
if controller.all_drones_command("set_mode", "LAND"):
print("所有無人機已切換至 LAND 模式")
else:
print("部分無人機切換模式失敗")
elif choice == '3':
if controller.all_drones_command("arm", True):
print("所有無人機已解鎖")
else:
print("部分無人機解鎖失敗")
elif choice == '4':
if controller.all_drones_command("arm", False):
print("所有無人機已上鎖")
else:
print("部分無人機上鎖失敗")
elif choice == '5':
altitude = float(input("請輸入所有無人機起飛高度(公尺): "))
if controller.all_drones_command("takeoff", altitude):
print(f"所有無人機已到達目標高度: {altitude}公尺")
else:
print("部分無人機起飛失敗")
elif choice == '6':
if controller.all_drones_command("set_mode", "RTL"):
print("所有無人機已切換至 RTL 模式,正在返航")
else:
print("部分無人機切換至 RTL 模式失敗")
elif choice == '7':
print("返回主菜單")
break
else:
print("無效的指令,請重新輸入")
except ValueError as e:
print(f"輸入錯誤: {e}")
except Exception as e:
print(f"發生錯誤: {e}")
def display_all_drone_states(controller):
"""顯示所有無人機狀態"""
print("\n=== 所有無人機狀態 ===")
for drone in controller.drones:
max_attempts = 5
state = None
for attempt in range(max_attempts):
state = drone.get_current_state()
if state and 'position' in state:
break
time.sleep(0.2)
print(f"\n--- 無人機 #{drone.drone_id} ---")
print(f"連接: {drone.connection_string}")
if not state:
print("狀態: 無法獲取完整數據")
continue
print(f"飛行模式: {state.get('mode', '未知')}")
print(f"解鎖狀態: {'已解鎖' if state.get('armed', False) else '已上鎖'}")
if 'gps_position' in state:
gps = state['gps_position']
print(f"GPS位置: 緯度={gps['lat']:.6f}, 經度={gps['lon']:.6f}, 高度={gps['alt']:.2f}m")
print(f"相對高度: {gps['relative_alt']:.2f}m")
if 'position' in state:
pos = state['position']
print(f"本地位置: X={pos['x']:.2f}m, Y={pos['y']:.2f}m, Z={pos['z']:.2f}m")
print("-----------------------")
print("=========================")
def main():
controller = MultiDroneController(5)
print_menu()
while True:
try:
choice = input("\n請輸入指令編號: ")
if choice == '0':
print_menu()
elif choice == '1':
drone_id = int(input("請輸入要操作的無人機ID (0-4): "))
drone = controller.get_drone(drone_id)
if drone:
control_single_drone(drone)
else:
print(f"無效的無人機ID: {drone_id}")
elif choice == '2':
control_all_drones(controller)
elif choice == '3':
display_all_drone_states(controller)
elif choice == '4':
print("程式結束")
break
else:
print("無效的指令,請重新輸入")
except ValueError as e:
print(f"輸入錯誤: {e}")
except Exception as e:
print(f"發生錯誤: {e}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n程式被使用者中斷")
sys.exit(0)
Loading…
Cancel
Save