You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
194 lines
6.0 KiB
Python
194 lines
6.0 KiB
Python
import asyncio
|
|
import serial_asyncio
|
|
import struct
|
|
import os
|
|
import sys
|
|
import serial
|
|
import signal
|
|
import traceback
|
|
from pymavlink import mavutil
|
|
|
|
# === 設定區 ===
|
|
SERIAL_PORT = 'COM15' # 手動指定
|
|
SERIAL_BAUDRATE = 57600
|
|
UDP_REMOTE_IP = '127.0.0.1'
|
|
UDP_REMOTE_PORT = 14550
|
|
DEBUG_MODE = False
|
|
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
|
|
|
|
# === 工具函數 ===
|
|
def check_serial_port():
|
|
try:
|
|
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
|
|
ser.close()
|
|
return True
|
|
except serial.SerialException as e:
|
|
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
|
|
return False
|
|
|
|
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
|
|
frame_type = 0x10
|
|
dest_addr16 = b'\xFF\xFE'
|
|
broadcast_radius = 0x00
|
|
options = 0x00
|
|
|
|
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
|
|
frame += dest_addr64 + dest_addr16
|
|
frame += struct.pack(">BB", broadcast_radius, options) + data
|
|
checksum = 0xFF - (sum(frame) & 0xFF)
|
|
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
|
|
|
|
# === Serial Protocol 實作 ===
|
|
class SerialToUDP(asyncio.Protocol):
|
|
def __init__(self, udp_protocol):
|
|
self.udp_protocol = udp_protocol
|
|
self.buffer = bytearray()
|
|
|
|
def connection_made(self, transport):
|
|
self.transport = transport
|
|
if hasattr(self.udp_protocol, 'set_serial_transport'):
|
|
self.udp_protocol.set_serial_transport(self)
|
|
print(f"Serial connection established on {SERIAL_PORT}")
|
|
|
|
def data_received(self, data):
|
|
self.buffer.extend(data)
|
|
while True:
|
|
if len(self.buffer) < 3:
|
|
return
|
|
if self.buffer[0] != 0x7E:
|
|
self.buffer.pop(0)
|
|
continue
|
|
length = (self.buffer[1] << 8) | self.buffer[2]
|
|
full_length = 3 + length + 1
|
|
if len(self.buffer) < full_length:
|
|
return
|
|
frame = self.buffer[:full_length]
|
|
del self.buffer[:full_length]
|
|
if hasattr(self.udp_protocol, 'send_udp'):
|
|
self.udp_protocol.send_udp(bytes(frame))
|
|
|
|
def write_to_serial(self, data):
|
|
try:
|
|
api_frame = build_api_tx_frame(data, TARGET_ADDR64)
|
|
pass
|
|
self.transport.write(api_frame)
|
|
except Exception as e:
|
|
print(f"[TX Error] 無法封裝或傳送資料: {e}")
|
|
|
|
# === UDP Protocol 實作 ===
|
|
class UDPHandler(asyncio.DatagramProtocol):
|
|
def __init__(self):
|
|
self.serial_transport = None
|
|
self.transport = None
|
|
self.mav_decoder = mavutil.mavlink.MAVLink(None)
|
|
|
|
def connection_made(self, transport):
|
|
self.transport = transport
|
|
print("UDP transport ready.")
|
|
|
|
def set_serial_transport(self, serial_transport):
|
|
self.serial_transport = serial_transport
|
|
|
|
def datagram_received(self, data, addr):
|
|
if self.serial_transport:
|
|
self.serial_transport.write_to_serial(data)
|
|
|
|
def send_udp(self, data):
|
|
decoded_data = self.decapsulate_data(data)
|
|
if decoded_data is None:
|
|
pass
|
|
return
|
|
self.decode_mavlink_data(decoded_data)
|
|
if self.transport:
|
|
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
|
|
|
|
def decapsulate_data(self, data):
|
|
try:
|
|
if not data or data[0] != 0x7E:
|
|
return None
|
|
length = (data[1] << 8) | data[2]
|
|
if len(data) < length + 4:
|
|
return None
|
|
frame_type = data[3]
|
|
if frame_type == 0x90:
|
|
rf_data_start = 3 + 12
|
|
return data[rf_data_start:3 + length]
|
|
else:
|
|
return None
|
|
except Exception as e:
|
|
print(f"[XBee 解封錯誤] {e}")
|
|
return None
|
|
|
|
def decode_mavlink_data(self, data):
|
|
try:
|
|
msg = self.mav_decoder.parse_char(data)
|
|
if msg:
|
|
if msg.get_type() == "HEARTBEAT":
|
|
pass # 不輸出任何訊息
|
|
else:
|
|
pass
|
|
except Exception as e:
|
|
print(f"[MAVLink Decode Error] {e}")
|
|
|
|
# === 主流程 ===
|
|
async def main():
|
|
if not check_serial_port():
|
|
print("程式終止:串口檢查失敗")
|
|
return
|
|
|
|
loop = asyncio.get_running_loop()
|
|
if os.name != 'nt': # Windows 不支援 add_signal_handler
|
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
|
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
|
|
|
|
udp_handler = UDPHandler()
|
|
|
|
try:
|
|
udp_transport, _ = await loop.create_datagram_endpoint(
|
|
lambda: udp_handler,
|
|
local_addr=('0.0.0.0', 0)
|
|
)
|
|
except Exception as e:
|
|
print(f"無法創建 UDP 端點:{str(e)}")
|
|
return
|
|
|
|
sock = udp_transport.get_extra_info('socket')
|
|
print(f"UDP listening on {sock.getsockname()}")
|
|
|
|
try:
|
|
serial_proto = SerialToUDP(udp_handler)
|
|
await serial_asyncio.create_serial_connection(
|
|
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
|
|
)
|
|
except Exception as e:
|
|
print(f"無法建立串口連接:{str(e)}")
|
|
traceback.print_exc()
|
|
udp_transport.close()
|
|
return
|
|
|
|
print("等待串口資料...")
|
|
try:
|
|
await asyncio.Future()
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
async def shutdown(loop):
|
|
print("Shutting down...")
|
|
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
|
|
for task in tasks:
|
|
task.cancel()
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
loop.stop()
|
|
|
|
if __name__ == '__main__':
|
|
try:
|
|
asyncio.run(main())
|
|
except KeyboardInterrupt:
|
|
print("程式被使用者中斷")
|
|
except Exception as e:
|
|
print("程式執行錯誤:")
|
|
traceback.print_exc()
|