diff --git a/src/unitdev04/udptest4.py b/serial_udp_update.py similarity index 96% rename from src/unitdev04/udptest4.py rename to serial_udp_update.py index d9dbcb6..728adca 100644 --- a/src/unitdev04/udptest4.py +++ b/serial_udp_update.py @@ -1,193 +1,193 @@ -import asyncio -import serial_asyncio -import struct -import os -import sys -import serial -import signal -import traceback -from pymavlink import mavutil - -# === 設定區 === -SERIAL_PORT = 'COM15' # 手動指定 -SERIAL_BAUDRATE = 57600 -UDP_REMOTE_IP = '127.0.0.1' -UDP_REMOTE_PORT = 14550 -DEBUG_MODE = False -TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 - -# === 工具函數 === -def check_serial_port(): - try: - ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) - ser.close() - return True - except serial.SerialException as e: - print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") - return False - except Exception as e: - print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") - return False - -def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: - frame_type = 0x10 - dest_addr16 = b'\xFF\xFE' - broadcast_radius = 0x00 - options = 0x00 - - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) - frame += dest_addr64 + dest_addr16 - frame += struct.pack(">BB", broadcast_radius, options) + data - checksum = 0xFF - (sum(frame) & 0xFF) - return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) - -# === Serial Protocol 實作 === -class SerialToUDP(asyncio.Protocol): - def __init__(self, udp_protocol): - self.udp_protocol = udp_protocol - self.buffer = bytearray() - - def connection_made(self, transport): - self.transport = transport - if hasattr(self.udp_protocol, 'set_serial_transport'): - self.udp_protocol.set_serial_transport(self) - print(f"Serial connection established on {SERIAL_PORT}") - - def data_received(self, data): - self.buffer.extend(data) - while True: - if len(self.buffer) < 3: - return - if self.buffer[0] != 0x7E: - self.buffer.pop(0) - continue - length = (self.buffer[1] << 8) | self.buffer[2] - full_length = 3 + length + 1 - if len(self.buffer) < full_length: - return - frame = self.buffer[:full_length] - del self.buffer[:full_length] - if hasattr(self.udp_protocol, 'send_udp'): - self.udp_protocol.send_udp(bytes(frame)) - - def write_to_serial(self, data): - try: - api_frame = build_api_tx_frame(data, TARGET_ADDR64) - pass - self.transport.write(api_frame) - except Exception as e: - print(f"[TX Error] 無法封裝或傳送資料: {e}") - -# === UDP Protocol 實作 === -class UDPHandler(asyncio.DatagramProtocol): - def __init__(self): - self.serial_transport = None - self.transport = None - self.mav_decoder = mavutil.mavlink.MAVLink(None) - - def connection_made(self, transport): - self.transport = transport - print("UDP transport ready.") - - def set_serial_transport(self, serial_transport): - self.serial_transport = serial_transport - - def datagram_received(self, data, addr): - if self.serial_transport: - self.serial_transport.write_to_serial(data) - - def send_udp(self, data): - decoded_data = self.decapsulate_data(data) - if decoded_data is None: - pass - return - self.decode_mavlink_data(decoded_data) - if self.transport: - self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) - - def decapsulate_data(self, data): - try: - if not data or data[0] != 0x7E: - return None - length = (data[1] << 8) | data[2] - if len(data) < length + 4: - return None - frame_type = data[3] - if frame_type == 0x90: - rf_data_start = 3 + 12 - return data[rf_data_start:3 + length] - else: - return None - except Exception as e: - print(f"[XBee 解封錯誤] {e}") - return None - - def decode_mavlink_data(self, data): - try: - msg = self.mav_decoder.parse_char(data) - if msg: - if msg.get_type() == "HEARTBEAT": - pass # 不輸出任何訊息 - else: - pass - except Exception as e: - print(f"[MAVLink Decode Error] {e}") - -# === 主流程 === -async def main(): - if not check_serial_port(): - print("程式終止:串口檢查失敗") - return - - loop = asyncio.get_running_loop() - if os.name != 'nt': # Windows 不支援 add_signal_handler - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) - - udp_handler = UDPHandler() - - try: - udp_transport, _ = await loop.create_datagram_endpoint( - lambda: udp_handler, - local_addr=('0.0.0.0', 0) - ) - except Exception as e: - print(f"無法創建 UDP 端點:{str(e)}") - return - - sock = udp_transport.get_extra_info('socket') - print(f"UDP listening on {sock.getsockname()}") - - try: - serial_proto = SerialToUDP(udp_handler) - await serial_asyncio.create_serial_connection( - loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE - ) - except Exception as e: - print(f"無法建立串口連接:{str(e)}") - traceback.print_exc() - udp_transport.close() - return - - print("等待串口資料...") - try: - await asyncio.Future() - except asyncio.CancelledError: - pass - -async def shutdown(loop): - print("Shutting down...") - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - loop.stop() - -if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("程式被使用者中斷") - except Exception as e: - print("程式執行錯誤:") - traceback.print_exc() +import asyncio +import serial_asyncio +import struct +import os +import sys +import serial +import signal +import traceback +from pymavlink import mavutil + +# === 設定區 === +SERIAL_PORT = 'COM15' # 手動指定 +SERIAL_BAUDRATE = 57600 +UDP_REMOTE_IP = '127.0.0.1' +UDP_REMOTE_PORT = 14550 +DEBUG_MODE = False +TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + +# === 工具函數 === +def check_serial_port(): + try: + ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) + ser.close() + return True + except serial.SerialException as e: + print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") + return False + except Exception as e: + print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") + return False + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + frame_type = 0x10 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +# === Serial Protocol 實作 === +class SerialToUDP(asyncio.Protocol): + def __init__(self, udp_protocol): + self.udp_protocol = udp_protocol + self.buffer = bytearray() + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"Serial connection established on {SERIAL_PORT}") + + def data_received(self, data): + self.buffer.extend(data) + while True: + if len(self.buffer) < 3: + return + if self.buffer[0] != 0x7E: + self.buffer.pop(0) + continue + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + if len(self.buffer) < full_length: + return + frame = self.buffer[:full_length] + del self.buffer[:full_length] + if hasattr(self.udp_protocol, 'send_udp'): + self.udp_protocol.send_udp(bytes(frame)) + + def write_to_serial(self, data): + try: + api_frame = build_api_tx_frame(data, TARGET_ADDR64) + pass + self.transport.write(api_frame) + except Exception as e: + print(f"[TX Error] 無法封裝或傳送資料: {e}") + +# === UDP Protocol 實作 === +class UDPHandler(asyncio.DatagramProtocol): + def __init__(self): + self.serial_transport = None + self.transport = None + self.mav_decoder = mavutil.mavlink.MAVLink(None) + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready.") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + def datagram_received(self, data, addr): + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + def send_udp(self, data): + decoded_data = self.decapsulate_data(data) + if decoded_data is None: + pass + return + self.decode_mavlink_data(decoded_data) + if self.transport: + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def decapsulate_data(self, data): + try: + if not data or data[0] != 0x7E: + return None + length = (data[1] << 8) | data[2] + if len(data) < length + 4: + return None + frame_type = data[3] + if frame_type == 0x90: + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + except Exception as e: + print(f"[XBee 解封錯誤] {e}") + return None + + def decode_mavlink_data(self, data): + try: + msg = self.mav_decoder.parse_char(data) + if msg: + if msg.get_type() == "HEARTBEAT": + pass # 不輸出任何訊息 + else: + pass + except Exception as e: + print(f"[MAVLink Decode Error] {e}") + +# === 主流程 === +async def main(): + if not check_serial_port(): + print("程式終止:串口檢查失敗") + return + + loop = asyncio.get_running_loop() + if os.name != 'nt': # Windows 不支援 add_signal_handler + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) + + udp_handler = UDPHandler() + + try: + udp_transport, _ = await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) + ) + except Exception as e: + print(f"無法創建 UDP 端點:{str(e)}") + return + + sock = udp_transport.get_extra_info('socket') + print(f"UDP listening on {sock.getsockname()}") + + try: + serial_proto = SerialToUDP(udp_handler) + await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"無法建立串口連接:{str(e)}") + traceback.print_exc() + udp_transport.close() + return + + print("等待串口資料...") + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +async def shutdown(loop): + print("Shutting down...") + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程式被使用者中斷") + except Exception as e: + print("程式執行錯誤:") + traceback.print_exc()