Mavlink and XBee packet transfer

chiyu
lenting1027 1 year ago
parent 21dce163ed
commit 716b3726c2

@ -0,0 +1,109 @@
import curses
import serial
import struct
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
PORT = "COM5" # or "/dev/ttyUSB0"
BAUD = 57600
target_system = 5
target_component = 1
MAV_CMD_DO_SET_MODE = 176
mode_list = [
("STABILIZE", 0),
("AUTO", 3),
("GUIDED", 4),
("LOITER", 5)
]
ser = serial.Serial(PORT, BAUD)
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
def build_api_tx_frame(data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def curses_main(stdscr):
curses.curs_set(0)
selected = 0
while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
stdscr.addstr(1, 2, "🛫 模式選單(使用 ↑↓ 選擇Enter 發送q 離開)")
for i, (name, _) in enumerate(mode_list):
if i == selected:
stdscr.attron(curses.A_REVERSE)
stdscr.addstr(i + 3, 4, f"> {name}")
stdscr.attroff(curses.A_REVERSE)
else:
stdscr.addstr(i + 3, 4, f" {name}")
key = stdscr.getch()
if key == curses.KEY_UP:
selected = (selected - 1) % len(mode_list)
elif key == curses.KEY_DOWN:
selected = (selected + 1) % len(mode_list)
elif key in [10, 13]: # Enter
name, custom_mode = mode_list[selected]
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
msg = mav.command_long_encode(
target_system=target_system,
target_component=target_component,
command=MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1,
param2=custom_mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
print("🧪 msg =", msg) # 確認封包物件生成
mav.send(msg) # ✅ 改為 send() 會寫入 capture
print("📦 RAW HEX:", capture.data.hex())
api_frame = build_api_tx_frame(capture.data)
ser.write(api_frame)
# 顯示封包資訊
msg_line = min(h - 4, len(mode_list) + 5)
stdscr.addstr(msg_line, 2, f"✅ 發送模式切換:{name} ({custom_mode})")
stdscr.addstr(msg_line + 1, 2, f"📦 MAVLink HEX: {' '.join(f'{b:02x}' for b in capture.data)[:w-4]}")
stdscr.addstr(msg_line + 2, 2, f"📡 XBee API HEX: {' '.join(f'{b:02x}' for b in api_frame)[:w-4]}")
stdscr.refresh()
curses.napms(1500)
elif key in [ord('q'), ord('Q')]:
break
stdscr.refresh()
try:
curses.wrapper(lambda stdscr: curses_main(stdscr)) # 使用 lambda 函數來傳遞 ser
except Exception as e:
print(f"❌ 發生錯誤: {e}")
finally:
ser.close()
print("👋 程式結束,串口已關閉")
Loading…
Cancel
Save