XMerge branch 'lunu' into chiyu

chiyu
Chiyu Chen 12 months ago
commit 5769f9ab3b

@ -0,0 +1,109 @@
import curses
import serial
import struct
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
PORT = "COM5" # or "/dev/ttyUSB0"
BAUD = 57600
target_system = 5
target_component = 1
MAV_CMD_DO_SET_MODE = 176
mode_list = [
("STABILIZE", 0),
("AUTO", 3),
("GUIDED", 4),
("LOITER", 5)
]
ser = serial.Serial(PORT, BAUD)
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
def build_api_tx_frame(data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def curses_main(stdscr):
curses.curs_set(0)
selected = 0
while True:
stdscr.clear()
h, w = stdscr.getmaxyx()
stdscr.addstr(1, 2, "模式選單(使用:箭頭選擇Enter:發送q:離開)")
for i, (name, _) in enumerate(mode_list):
if i == selected:
stdscr.attron(curses.A_REVERSE)
stdscr.addstr(i + 3, 4, f"> {name}")
stdscr.attroff(curses.A_REVERSE)
else:
stdscr.addstr(i + 3, 4, f" {name}")
key = stdscr.getch()
if key == curses.KEY_UP:
selected = (selected - 1) % len(mode_list)
elif key == curses.KEY_DOWN:
selected = (selected + 1) % len(mode_list)
elif key in [10, 13]: # Enter
name, custom_mode = mode_list[selected]
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
msg = mav.command_long_encode(
target_system=target_system,
target_component=target_component,
command=MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1,
param2=custom_mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
print("msg =", msg) # 確認封包物件生成
mav.send(msg) # 改為 send() 會寫入 capture
print("RAW HEX:", capture.data.hex())
api_frame = build_api_tx_frame(capture.data)
ser.write(api_frame)
# 顯示封包資訊
msg_line = min(h - 4, len(mode_list) + 5)
stdscr.addstr(msg_line, 2, f"發送模式切換:{name} ({custom_mode})")
stdscr.addstr(msg_line + 1, 2, f"MAVLink HEX: {' '.join(f'{b:02x}' for b in capture.data)[:w-4]}")
stdscr.addstr(msg_line + 2, 2, f"XBee API HEX: {' '.join(f'{b:02x}' for b in api_frame)[:w-4]}")
stdscr.refresh()
curses.napms(1500)
elif key in [ord('q'), ord('Q')]:
break
stdscr.refresh()
try:
curses.wrapper(lambda stdscr: curses_main(stdscr)) # 使用 lambda 函數來傳遞 ser
except Exception as e:
print(f"發生錯誤: {e}")
finally:
ser.close()
print("程式結束,串口已關閉")

@ -0,0 +1,60 @@
from xbee import XBee
import serial
from pymavlink import mavutil
import time
# 配置 XBee 串口連接
ser = serial.Serial('COM5', 57600) # XBee 串口
# 無人機系統 ID假設為 5和目標系統 ID假設為 1
system_id = 5
target_system_id = 1 # 設定目標系統 ID這裡假設目標系統 ID 是 1
# 目標設備的 64 位地址(請替換為實際的無人機地址)
target_address = b'\x00\x13\xa2\x00\x40\x5f\x88\x56' # 例如00 13 A2 00 40 5F 88 56
def decode_mavlink_data(data,master):
"""解碼 MAVLink 的原始數據並處理 HEARTBEAT 訊息"""
try:
msg = master.parse_char(data)
if msg:
if msg.get_type() == "HEARTBEAT":
print(f"Raw MAVLink Data (Hex): {data.hex()}")
print(f"Received MAVLink message: {msg}")
print(f"System status: {msg.system_status}")
print(f"Flight mode: {mavutil.mode_string_v10(msg)}")
print(f"System ID: {msg.get_srcSystem()}") # 使用 get_srcSystem() 獲取 system_id
except Exception as e:
print(f"Failed to decode MAVLink data: {e}")
def receive_packets(ser):
xbee = XBee(ser)
# 創建 MAVLink 解析器並與 XBee 串口連接
master = mavutil.mavlink.MAVLink(ser)
while True:
try:
# 從 XBee 接收數據
xbee_data = xbee.wait_read_frame()
# 讀取 `rf_data` 而非 `payload`
raw_data = xbee_data.get('rf_data', None)
if raw_data is None:
print("Warning: No 'rf_data' found in received XBee data!")
continue # 跳過此次循環
# 解碼 MAVLink 訊息
decode_mavlink_data(raw_data,master)
# 根據需要觸發模式切換,例如根據用戶輸入更改模式
except KeyboardInterrupt:
print("Exiting...")
break
except Exception as e:
print(f"Error: {e}")
time.sleep(0.1)
Loading…
Cancel
Save