serial_udp_update

lunu
lenting1027 11 months ago
parent 29269811c4
commit 02ee5d8378

@ -1,193 +1,193 @@
import asyncio import asyncio
import serial_asyncio import serial_asyncio
import struct import struct
import os import os
import sys import sys
import serial import serial
import signal import signal
import traceback import traceback
from pymavlink import mavutil from pymavlink import mavutil
# === 設定區 === # === 設定區 ===
SERIAL_PORT = 'COM15' # 手動指定 SERIAL_PORT = 'COM15' # 手動指定
SERIAL_BAUDRATE = 57600 SERIAL_BAUDRATE = 57600
UDP_REMOTE_IP = '127.0.0.1' UDP_REMOTE_IP = '127.0.0.1'
UDP_REMOTE_PORT = 14550 UDP_REMOTE_PORT = 14550
DEBUG_MODE = False DEBUG_MODE = False
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# === 工具函數 === # === 工具函數 ===
def check_serial_port(): def check_serial_port():
try: try:
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close() ser.close()
return True return True
except serial.SerialException as e: except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False return False
except Exception as e: except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False return False
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
frame_type = 0x10 frame_type = 0x10
dest_addr16 = b'\xFF\xFE' dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00 broadcast_radius = 0x00
options = 0x00 options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16 frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF) checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# === Serial Protocol 實作 === # === Serial Protocol 實作 ===
class SerialToUDP(asyncio.Protocol): class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_protocol): def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol self.udp_protocol = udp_protocol
self.buffer = bytearray() self.buffer = bytearray()
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'): if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self) self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}") print(f"Serial connection established on {SERIAL_PORT}")
def data_received(self, data): def data_received(self, data):
self.buffer.extend(data) self.buffer.extend(data)
while True: while True:
if len(self.buffer) < 3: if len(self.buffer) < 3:
return return
if self.buffer[0] != 0x7E: if self.buffer[0] != 0x7E:
self.buffer.pop(0) self.buffer.pop(0)
continue continue
length = (self.buffer[1] << 8) | self.buffer[2] length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1 full_length = 3 + length + 1
if len(self.buffer) < full_length: if len(self.buffer) < full_length:
return return
frame = self.buffer[:full_length] frame = self.buffer[:full_length]
del self.buffer[:full_length] del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'): if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame)) self.udp_protocol.send_udp(bytes(frame))
def write_to_serial(self, data): def write_to_serial(self, data):
try: try:
api_frame = build_api_tx_frame(data, TARGET_ADDR64) api_frame = build_api_tx_frame(data, TARGET_ADDR64)
pass pass
self.transport.write(api_frame) self.transport.write(api_frame)
except Exception as e: except Exception as e:
print(f"[TX Error] 無法封裝或傳送資料: {e}") print(f"[TX Error] 無法封裝或傳送資料: {e}")
# === UDP Protocol 實作 === # === UDP Protocol 實作 ===
class UDPHandler(asyncio.DatagramProtocol): class UDPHandler(asyncio.DatagramProtocol):
def __init__(self): def __init__(self):
self.serial_transport = None self.serial_transport = None
self.transport = None self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None) self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
print("UDP transport ready.") print("UDP transport ready.")
def set_serial_transport(self, serial_transport): def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport self.serial_transport = serial_transport
def datagram_received(self, data, addr): def datagram_received(self, data, addr):
if self.serial_transport: if self.serial_transport:
self.serial_transport.write_to_serial(data) self.serial_transport.write_to_serial(data)
def send_udp(self, data): def send_udp(self, data):
decoded_data = self.decapsulate_data(data) decoded_data = self.decapsulate_data(data)
if decoded_data is None: if decoded_data is None:
pass pass
return return
self.decode_mavlink_data(decoded_data) self.decode_mavlink_data(decoded_data)
if self.transport: if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data): def decapsulate_data(self, data):
try: try:
if not data or data[0] != 0x7E: if not data or data[0] != 0x7E:
return None return None
length = (data[1] << 8) | data[2] length = (data[1] << 8) | data[2]
if len(data) < length + 4: if len(data) < length + 4:
return None return None
frame_type = data[3] frame_type = data[3]
if frame_type == 0x90: if frame_type == 0x90:
rf_data_start = 3 + 12 rf_data_start = 3 + 12
return data[rf_data_start:3 + length] return data[rf_data_start:3 + length]
else: else:
return None return None
except Exception as e: except Exception as e:
print(f"[XBee 解封錯誤] {e}") print(f"[XBee 解封錯誤] {e}")
return None return None
def decode_mavlink_data(self, data): def decode_mavlink_data(self, data):
try: try:
msg = self.mav_decoder.parse_char(data) msg = self.mav_decoder.parse_char(data)
if msg: if msg:
if msg.get_type() == "HEARTBEAT": if msg.get_type() == "HEARTBEAT":
pass # 不輸出任何訊息 pass # 不輸出任何訊息
else: else:
pass pass
except Exception as e: except Exception as e:
print(f"[MAVLink Decode Error] {e}") print(f"[MAVLink Decode Error] {e}")
# === 主流程 === # === 主流程 ===
async def main(): async def main():
if not check_serial_port(): if not check_serial_port():
print("程式終止:串口檢查失敗") print("程式終止:串口檢查失敗")
return return
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
if os.name != 'nt': # Windows 不支援 add_signal_handler if os.name != 'nt': # Windows 不支援 add_signal_handler
for sig in (signal.SIGINT, signal.SIGTERM): for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
udp_handler = UDPHandler() udp_handler = UDPHandler()
try: try:
udp_transport, _ = await loop.create_datagram_endpoint( udp_transport, _ = await loop.create_datagram_endpoint(
lambda: udp_handler, lambda: udp_handler,
local_addr=('0.0.0.0', 0) local_addr=('0.0.0.0', 0)
) )
except Exception as e: except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}") print(f"無法創建 UDP 端點:{str(e)}")
return return
sock = udp_transport.get_extra_info('socket') sock = udp_transport.get_extra_info('socket')
print(f"UDP listening on {sock.getsockname()}") print(f"UDP listening on {sock.getsockname()}")
try: try:
serial_proto = SerialToUDP(udp_handler) serial_proto = SerialToUDP(udp_handler)
await serial_asyncio.create_serial_connection( await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
) )
except Exception as e: except Exception as e:
print(f"無法建立串口連接:{str(e)}") print(f"無法建立串口連接:{str(e)}")
traceback.print_exc() traceback.print_exc()
udp_transport.close() udp_transport.close()
return return
print("等待串口資料...") print("等待串口資料...")
try: try:
await asyncio.Future() await asyncio.Future()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
async def shutdown(loop): async def shutdown(loop):
print("Shutting down...") print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks: for task in tasks:
task.cancel() task.cancel()
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
loop.stop() loop.stop()
if __name__ == '__main__': if __name__ == '__main__':
try: try:
asyncio.run(main()) asyncio.run(main())
except KeyboardInterrupt: except KeyboardInterrupt:
print("程式被使用者中斷") print("程式被使用者中斷")
except Exception as e: except Exception as e:
print("程式執行錯誤:") print("程式執行錯誤:")
traceback.print_exc() traceback.print_exc()
Loading…
Cancel
Save