實現close_loop的部份,主要是負責接收matlab回傳的封包,並以mavlink指令的方式打進去Ardupilot裡面

chiyu
wenchun 1 year ago
parent 598e80d070
commit a934ccfa3c

@ -0,0 +1,102 @@
#!/usr/bin/env python3
from pymavlink import mavutil
import socket
import re
import time
import math
def main():
# 建立MAVLink連接
print("建立MAVLink連接...")
connection_string = "udp:127.0.0.1:14550"
master = mavutil.mavlink_connection(connection_string)
master.wait_heartbeat()
print("MAVLink連接已建立")
# 切換到GUIDED模式
master.mav.set_mode_send(
master.target_system,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
4 # GUIDED模式ID
)
print("已切換到GUIDED模式")
# 解鎖無人機
master.mav.command_long_send(
master.target_system,
master.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
1, 0, 0, 0, 0, 0, 0 # 1表示解鎖
)
print("無人機已解鎖")
# 設置UDP服務器
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('0.0.0.0', 5000))
server_socket.settimeout(0.1) # 非阻塞操作
print("UDP服務器已啟動監聽端口5000")
# 主循環,持續處理數據
while True:
try:
# 接收UDP數據
try:
data, _ = server_socket.recvfrom(1024)
decoded_data = data.decode('utf-8')
# 提取velocity數據
vel_match = re.search(r'"next_velocity":\[([-\d\.\,]+)\]', decoded_data)
if vel_match:
velocity_str = vel_match.group(1)
velocity = [float(x) for x in velocity_str.split(',')]
vx, vy, vz = velocity
# 獲取當前位置
pos_msg = master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=0.5)
if pos_msg:
x, y, z = pos_msg.x, pos_msg.y, pos_msg.z
# 設置type_mask - 僅使用速度控制
type_mask = (
mavutil.mavlink.POSITION_TARGET_TYPEMASK_X_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_Y_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_Z_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE
)
# 發送位置目標命令
print(f"發送速度指令: vx={vx}, vy={vy}, vz={vz}")
master.mav.set_position_target_local_ned_send(
0,
master.target_system,
master.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
type_mask,
0, 0, 0,
vx, vy, vz, # 速度
0, 0, 0, # 加速度
0, 0 # 航向
)
print("success")
# 要再去偵錯沒有data進來的狀況
except socket.timeout:
pass
time.sleep(0.05) # 控制循環速率
except KeyboardInterrupt:
print("程序終止")
server_socket.close()
break
except Exception as e:
print(f"錯誤: {e}")
time.sleep(1)
if __name__ == "__main__":
main()
Loading…
Cancel
Save