import asyncio import serial_asyncio import struct import os import sys import serial import signal import traceback from pymavlink import mavutil # === 設定區 === SERIAL_PORT = 'COM15' # 手動指定 SERIAL_BAUDRATE = 57600 UDP_REMOTE_IP = '127.0.0.1' UDP_REMOTE_PORT = 14550 DEBUG_MODE = False TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 # === 工具函數 === def check_serial_port(): try: ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) ser.close() return True except serial.SerialException as e: print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") return False except Exception as e: print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") return False def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: frame_type = 0x10 dest_addr16 = b'\xFF\xFE' broadcast_radius = 0x00 options = 0x00 frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) frame += dest_addr64 + dest_addr16 frame += struct.pack(">BB", broadcast_radius, options) + data checksum = 0xFF - (sum(frame) & 0xFF) return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) # === Serial Protocol 實作 === class SerialToUDP(asyncio.Protocol): def __init__(self, udp_protocol): self.udp_protocol = udp_protocol self.buffer = bytearray() def connection_made(self, transport): self.transport = transport if hasattr(self.udp_protocol, 'set_serial_transport'): self.udp_protocol.set_serial_transport(self) print(f"Serial connection established on {SERIAL_PORT}") def data_received(self, data): self.buffer.extend(data) while True: if len(self.buffer) < 3: return if self.buffer[0] != 0x7E: self.buffer.pop(0) continue length = (self.buffer[1] << 8) | self.buffer[2] full_length = 3 + length + 1 if len(self.buffer) < full_length: return frame = self.buffer[:full_length] del self.buffer[:full_length] if hasattr(self.udp_protocol, 'send_udp'): self.udp_protocol.send_udp(bytes(frame)) def write_to_serial(self, data): try: api_frame = build_api_tx_frame(data, TARGET_ADDR64) pass self.transport.write(api_frame) except Exception as e: print(f"[TX Error] 無法封裝或傳送資料: {e}") # === UDP Protocol 實作 === class UDPHandler(asyncio.DatagramProtocol): def __init__(self): self.serial_transport = None self.transport = None self.mav_decoder = mavutil.mavlink.MAVLink(None) def connection_made(self, transport): self.transport = transport print("UDP transport ready.") def set_serial_transport(self, serial_transport): self.serial_transport = serial_transport def datagram_received(self, data, addr): if self.serial_transport: self.serial_transport.write_to_serial(data) def send_udp(self, data): decoded_data = self.decapsulate_data(data) if decoded_data is None: pass return self.decode_mavlink_data(decoded_data) if self.transport: self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) def decapsulate_data(self, data): try: if not data or data[0] != 0x7E: return None length = (data[1] << 8) | data[2] if len(data) < length + 4: return None frame_type = data[3] if frame_type == 0x90: rf_data_start = 3 + 12 return data[rf_data_start:3 + length] else: return None except Exception as e: print(f"[XBee 解封錯誤] {e}") return None def decode_mavlink_data(self, data): try: msg = self.mav_decoder.parse_char(data) if msg: if msg.get_type() == "HEARTBEAT": pass # 不輸出任何訊息 else: pass except Exception as e: print(f"[MAVLink Decode Error] {e}") # === 主流程 === async def main(): if not check_serial_port(): print("程式終止:串口檢查失敗") return loop = asyncio.get_running_loop() if os.name != 'nt': # Windows 不支援 add_signal_handler for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) udp_handler = UDPHandler() try: udp_transport, _ = await loop.create_datagram_endpoint( lambda: udp_handler, local_addr=('0.0.0.0', 0) ) except Exception as e: print(f"無法創建 UDP 端點:{str(e)}") return sock = udp_transport.get_extra_info('socket') print(f"UDP listening on {sock.getsockname()}") try: serial_proto = SerialToUDP(udp_handler) await serial_asyncio.create_serial_connection( loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE ) except Exception as e: print(f"無法建立串口連接:{str(e)}") traceback.print_exc() udp_transport.close() return print("等待串口資料...") try: await asyncio.Future() except asyncio.CancelledError: pass async def shutdown(loop): print("Shutting down...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) loop.stop() if __name__ == '__main__': try: asyncio.run(main()) except KeyboardInterrupt: print("程式被使用者中斷") except Exception as e: print("程式執行錯誤:") traceback.print_exc()