From 29269811c4ef663f0fa3ba8d31dfda2020b373f4 Mon Sep 17 00:00:00 2001 From: lenting1027 Date: Wed, 11 Jun 2025 14:50:35 +0800 Subject: [PATCH] udp_serial_update --- src/unitdev04/udptest4.py | 193 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 src/unitdev04/udptest4.py diff --git a/src/unitdev04/udptest4.py b/src/unitdev04/udptest4.py new file mode 100644 index 0000000..d9dbcb6 --- /dev/null +++ b/src/unitdev04/udptest4.py @@ -0,0 +1,193 @@ +import asyncio +import serial_asyncio +import struct +import os +import sys +import serial +import signal +import traceback +from pymavlink import mavutil + +# === 設定區 === +SERIAL_PORT = 'COM15' # 手動指定 +SERIAL_BAUDRATE = 57600 +UDP_REMOTE_IP = '127.0.0.1' +UDP_REMOTE_PORT = 14550 +DEBUG_MODE = False +TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + +# === 工具函數 === +def check_serial_port(): + try: + ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) + ser.close() + return True + except serial.SerialException as e: + print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") + return False + except Exception as e: + print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") + return False + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + frame_type = 0x10 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +# === Serial Protocol 實作 === +class SerialToUDP(asyncio.Protocol): + def __init__(self, udp_protocol): + self.udp_protocol = udp_protocol + self.buffer = bytearray() + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"Serial connection established on {SERIAL_PORT}") + + def data_received(self, data): + self.buffer.extend(data) + while True: + if len(self.buffer) < 3: + return + if self.buffer[0] != 0x7E: + self.buffer.pop(0) + continue + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + if len(self.buffer) < full_length: + return + frame = self.buffer[:full_length] + del self.buffer[:full_length] + if hasattr(self.udp_protocol, 'send_udp'): + self.udp_protocol.send_udp(bytes(frame)) + + def write_to_serial(self, data): + try: + api_frame = build_api_tx_frame(data, TARGET_ADDR64) + pass + self.transport.write(api_frame) + except Exception as e: + print(f"[TX Error] 無法封裝或傳送資料: {e}") + +# === UDP Protocol 實作 === +class UDPHandler(asyncio.DatagramProtocol): + def __init__(self): + self.serial_transport = None + self.transport = None + self.mav_decoder = mavutil.mavlink.MAVLink(None) + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready.") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + def datagram_received(self, data, addr): + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + def send_udp(self, data): + decoded_data = self.decapsulate_data(data) + if decoded_data is None: + pass + return + self.decode_mavlink_data(decoded_data) + if self.transport: + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def decapsulate_data(self, data): + try: + if not data or data[0] != 0x7E: + return None + length = (data[1] << 8) | data[2] + if len(data) < length + 4: + return None + frame_type = data[3] + if frame_type == 0x90: + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + except Exception as e: + print(f"[XBee 解封錯誤] {e}") + return None + + def decode_mavlink_data(self, data): + try: + msg = self.mav_decoder.parse_char(data) + if msg: + if msg.get_type() == "HEARTBEAT": + pass # 不輸出任何訊息 + else: + pass + except Exception as e: + print(f"[MAVLink Decode Error] {e}") + +# === 主流程 === +async def main(): + if not check_serial_port(): + print("程式終止:串口檢查失敗") + return + + loop = asyncio.get_running_loop() + if os.name != 'nt': # Windows 不支援 add_signal_handler + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) + + udp_handler = UDPHandler() + + try: + udp_transport, _ = await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) + ) + except Exception as e: + print(f"無法創建 UDP 端點:{str(e)}") + return + + sock = udp_transport.get_extra_info('socket') + print(f"UDP listening on {sock.getsockname()}") + + try: + serial_proto = SerialToUDP(udp_handler) + await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"無法建立串口連接:{str(e)}") + traceback.print_exc() + udp_transport.close() + return + + print("等待串口資料...") + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +async def shutdown(loop): + print("Shutting down...") + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程式被使用者中斷") + except Exception as e: + print("程式執行錯誤:") + traceback.print_exc()