From 417d9e8f573f865de4ff4122be9e56b51abe981a Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Mon, 26 May 2025 22:43:57 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E6=96=B0=E5=A2=9E=20serial=5Fudp=5Fbitran?= =?UTF-8?q?s.py=20=E8=A9=B2=E7=A8=8B=E5=BC=8F=E9=80=A3=E7=B5=90=20serial?= =?UTF-8?q?=20=E7=9A=84=E5=B0=81=E5=8C=85=20=E7=B6=93=E9=81=8E=E8=BD=89?= =?UTF-8?q?=E6=8F=9B=E5=BE=8C=20=E5=86=8D=E5=90=91=20udp=20=E5=8F=A3?= =?UTF-8?q?=E4=B8=9F=E5=87=BA=20(=E5=8F=8D=E4=B9=8B=E4=BA=A6=E7=84=B6)=202?= =?UTF-8?q?.=20=E5=85=B6=E9=A4=98=E6=AA=94=E6=A1=88=E5=8F=AA=E6=98=AF=20?= =?UTF-8?q?=E5=90=8D=E7=A8=B1=E8=88=87=E8=A8=BB=E8=A7=A3=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 +- .../fc_network_adapter/mavlinkDevice.py | 5 + .../fc_network_adapter/mavlinkObject.py | 1 - .../fc_network_adapter/serial_udp_bitrans.py | 340 ++++++++++++++++++ src/unitdev02/unitdev02/server.py | 33 -- .../unitdev02/{client.py => unix_client.py} | 0 src/unitdev02/unitdev02/unix_server.py | 94 +++++ 7 files changed, 441 insertions(+), 35 deletions(-) create mode 100644 src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py delete mode 100644 src/unitdev02/unitdev02/server.py rename src/unitdev02/unitdev02/{client.py => unix_client.py} (100%) create mode 100644 src/unitdev02/unitdev02/unix_server.py diff --git a/README.md b/README.md index 9f6379f..7fd8868 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ Python 1. pymavlink -2. +2. conda-forge 中的 pyserial-asyncio +3. ROS2 1. source ~/ros2_humble/install/setup.bash diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py index 4b2b07e..4ae4c98 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py @@ -31,6 +31,11 @@ class mavlink_device(): # p_str += '=====================\n' return p_str + ''' + 寫了半天 這個功能應該是 pymalink 本來就有的 + 去找 pymavlink_util.py 的 mavfile(object) + 算了 先擺著吧 之後再看看怎麼整合 + ''' def updateComponentPacketCount(self, compid, current_seq, current_type, current_time): # 這段檢查遺失封包 try: diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 072d861..1589150 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -345,7 +345,6 @@ class mavlink_object(): return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg)) else: _queue = swap_queues[i-2] - # _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包 _queue.put(mavlinkMsg) # 處理要送出的封包 diff --git a/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py b/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py new file mode 100644 index 0000000..cb4373c --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py @@ -0,0 +1,340 @@ +import asyncio +import serial_asyncio + +# 附加驗證功能 +import os +import sys +import serial +import signal + +# XBee 模組 +from xbee.frame import APIFrame + +# ========================= + +SERIAL_PORT = '/dev/ttyACM0' # serial port +SERIAL_BAUDRATE = 57600 # serial baudrate + +UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP +UDP_REMOTE_PORT = 14550 # UDP Target port + +# 測試用 只會吃一次資料 +DEBUG_MODE = False + +# ========================= + +def check_serial_port(): + """檢查串口是否存在與可用""" + # 檢查設備是否存在 + if not os.path.exists(SERIAL_PORT): + print(f"錯誤:串口設備 {SERIAL_PORT} 不存在") + return False + + # 檢查是否有權限訪問設備 + try: + os.access(SERIAL_PORT, os.R_OK | os.W_OK) + except Exception as e: + print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}:{str(e)}") + return False + + # 檢查是否被占用 + try: + # 嘗試打開串口 + ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) + ser.close() # 打開成功後立即關閉 + return True + except serial.SerialException as e: + print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") + return False + except Exception as e: + print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") + return False + +# ========================= + +class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 + def __init__(self, udp_protocol): + self.udp_protocol = udp_protocol + self.first_data = True # 標記是否為第一次收到資料 + self.has_processed = False # 測試模式用 處理數據旗標 # debug + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"Serial connection established on {SERIAL_PORT}") + + ## ===================================== + + # Serial 收到資料,轉發到 UDP + def data_received(self, data): + # 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug + if DEBUG_MODE and self.has_processed: + return + + # 標記首次收到資料 + if hasattr(self.udp_protocol, 'send_udp'): + if self.first_data: + print(f"First data received from serial, forwarding to UDP: {data[:20]}...") + self.first_data = False + + # 轉發數據 + self.udp_protocol.send_udp(data) + + if DEBUG_MODE: # 測試模式用 # debug + self.has_processed = True + print("[DEBUG] SerialToUDP Mark") + + def write_to_serial(self, data): + # 在資料透過 Serial 發送之前進行處理 + processed_data = self.encapsulate_data(data) + + # 處理失敗就不發送 + if processed_data not None: + self.transport.write(processed_data) + + def encapsulate_data(self, data): + + """ + 將數據封裝為 XBee API 傳輸幀 + + 使用 XBee API 格式封裝數據: + - 傳輸請求幀 (0x10) + - 使用廣播地址 + - 添加適當的頭部和校驗和 + """ + + ## 方法一 + # 設置幀參數 + frame_id = 0x01 # 幀識別碼,用於確認 + frame_type = 0x10 # 幀類型:傳輸請求 + dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播) + dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播) + broadcast_radius = 0x00 # 廣播半徑 (0 = 最大) + options = 0x00 # 傳輸選項 + + # 構建幀數據部分 + frame_data = bytearray() + frame_data.append(frame_type) # 添加幀類型 + frame_data.append(frame_id) # 添加幀 ID + frame_data.extend(dest_addr64) # 添加 64 位目標地址 + frame_data.extend(dest_addr16) # 添加 16 位目標地址 + frame_data.append(broadcast_radius) # 添加廣播半徑 + frame_data.append(options) # 添加選項 + frame_data.extend(data) # 添加實際數據內容 + + # 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位) + checksum = 0xFF - (sum(frame_data) & 0xFF) + + # 構建完整的 API 幀 + # 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和 + complete_frame = bytearray() + complete_frame.append(0x7E) # 添加起始分隔符 + complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先) + complete_frame.extend(frame_data) # 添加幀數據 + complete_frame.append(checksum) # 添加校驗和 + + return bytes(complete_frame) + + ## 方法二 + # frame_id=0x01 + # frame_type = 0x10 + # dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + # dest_addr16 = b'\xFF\xFE' + # broadcast_radius = 0x00 + # options = 0x00 + + # frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + # frame += dest_addr64 + dest_addr16 + # frame += struct.pack(">BB", broadcast_radius, options) + data + # checksum = 0xFF - (sum(frame) & 0xFF) + # return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + + +class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發 + def __init__(self): + self.serial_transport = None + self.transport = None + self.remote_addr = None # 儲存動態獲取的遠程地址 + self.has_processed = False # 測試模式用 處理數據旗標 # debug + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready. Waiting for serial data before sending...") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + ## ===================================== + + # UDP 收到資料 + def datagram_received(self, data, addr): + # 儲存對方的地址(這樣就能向同一個來源回傳數據) + # self.remote_addr = addr + # print(f"Received UDP data from {addr}, setting as remote address") + + # 在 DEBUG 模式下,如果已經處理過數據,則直接返回 + if DEBUG_MODE and self.has_processed: + return + + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + if DEBUG_MODE: # 測試模式用 + self.has_processed = True + print("[DEBUG] UDPHandler Mark") + + def send_udp(self, data): + # 發送資料到 UDP + + # if self.transport: + # # 如果有已知的回應地址,使用該地址 + # if self.remote_addr: + # self.transport.sendto(data, self.remote_addr) + # # print(f"Sending to dynamic address: {self.remote_addr}") + # else: + # # 否則使用預設地址 + # self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + # print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}") + + if self.transport: + # 只有第一次或地址改變時才顯示 + # if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT): + # print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}") + # self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT) + + # 在透過 UDP 發送數據之前進行解封裝 + decoded_data = self.decapsulate_data(data) + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def decapsulate_data(self, data): + # 這裡可以根據需要進行數據解封裝 + + ## 方法一 + try: + # 創建一個 API 幀處理器 + api_frame = APIFrame(escaped=True) + + # 嘗試解析數據 + api_frame.fill(data) + + # 如果數據不完整,直接返回原始數據 + if not api_frame.is_complete(): + return data + + # 解析幀內容 + parsed_data = api_frame.parse() + + # 提取有用數據 + if parsed_data: + frame_data = parsed_data.get('rf_data', None) + if frame_data: + return frame_data + + return data + + ## 方法二 - 手動解析 + # try: + # # XBee API 幀格式: + # # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節) + + # # 檢查幀起始符 (0x7E) + # if not data or len(data) < 5 or data[0] != 0x7E: + # return data + + # # 獲取數據長度 (不包括校驗和) + # length = (data[1] << 8) + data[2] + + # # 檢查幀完整性 + # if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 + # return data + + # # 提取API標識符和數據 + # frame_type = data[3] + # frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中 + + # # 根據不同的幀類型進行處理 + # if frame_type == 0x90: # 例如,這是"接收數據包"類型 + # # 提取實際有效載荷 + # # 對於接收數據包,格式通常是: + # # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據 + # if len(frame_data) >= 12: # 確保數據長度足夠 + # payload = frame_data[11:] # 前11字節是地址和選項 + # return payload + # return data + + # 如果無法提取 則回傳 None + except Exception as e: + print(f"手動解析 XBee 數據錯誤: {e}") + return None + +async def main(): + # 先檢查串口是否可用 + if not check_serial_port(): + print("程式終止:串口檢查失敗") + return + + loop = asyncio.get_running_loop() + + # 設置終止處理 + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, + lambda: asyncio.create_task(shutdown(loop)) + ) + + # 建立單一 UDP 端點處理收發 + udp_handler = UDPHandler() + + # 建立 UDP 傳輸,不指定接收端口,讓系統自動分配 + try: + udp_transport, protocol = await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 + ) + except Exception as e: + print(f"無法創建 UDP 端點:{str(e)}") + return + + # 獲取系統分配的本地端口 + sock = udp_transport.get_extra_info('socket') + local_addr = sock.getsockname() + print(f"UDP listening on {local_addr[0]}:{local_addr[1]}") + + # 建立 Serial 傳輸,將 UDP 處理器傳給它 + try: + serial_proto = SerialToUDP(udp_handler) + _, serial_transport = await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"無法建立串口連接:{str(e)}") + udp_transport.close() + return + + print(f"Waiting for data from serial port to initiate UDP communication...") + + # 保持運行 + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +async def shutdown(loop): + """關閉程序""" + print("Shutting down...") + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程式被使用者中斷") + except Exception as e: + print(f"程式執行錯誤:{str(e)}") \ No newline at end of file diff --git a/src/unitdev02/unitdev02/server.py b/src/unitdev02/unitdev02/server.py deleted file mode 100644 index 148edc5..0000000 --- a/src/unitdev02/unitdev02/server.py +++ /dev/null @@ -1,33 +0,0 @@ -import socket -import os - -# socket file path -SOCKET_PATH = "/tmp/unix_socket_example" - -# 若檔案存在就先刪除 -if os.path.exists(SOCKET_PATH): - os.remove(SOCKET_PATH) - -# 建立 socket -server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - -# 綁定 socket 到檔案 -server.bind(SOCKET_PATH) -server.listen(1) - -print("🔌 Server waiting for connection...") - -# 等待 client 連線 -conn, _ = server.accept() -print("✅ Client connected.") - -while True: - data = conn.recv(1024) - if not data: - break - print("📥 Received:", data.decode()) - conn.sendall(b"Echo: " + data) - -conn.close() -server.close() -os.remove(SOCKET_PATH) diff --git a/src/unitdev02/unitdev02/client.py b/src/unitdev02/unitdev02/unix_client.py similarity index 100% rename from src/unitdev02/unitdev02/client.py rename to src/unitdev02/unitdev02/unix_client.py diff --git a/src/unitdev02/unitdev02/unix_server.py b/src/unitdev02/unitdev02/unix_server.py new file mode 100644 index 0000000..30ef524 --- /dev/null +++ b/src/unitdev02/unitdev02/unix_server.py @@ -0,0 +1,94 @@ +import socket +import os + +# # socket file path +# SOCKET_PATH = "/tmp/unix_socket_example" + +# # 若檔案存在就先刪除 +# if os.path.exists(SOCKET_PATH): +# os.remove(SOCKET_PATH) + +# # 建立 socket +# server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + +# # 綁定 socket 到檔案 +# server.bind(SOCKET_PATH) +# server.listen(1) + +# print("🔌 Server waiting for connection...") + +# # 等待 client 連線 +# conn, _ = server.accept() +# print("✅ Client connected.") + +# while True: +# data = conn.recv(1024) +# if not data: +# break +# print("📥 Received:", data.decode()) +# conn.sendall(b"Echo: " + data) + +# conn.close() +# server.close() +# os.remove(SOCKET_PATH) + + +# ===================== + +# from pymavlink import mavutil + + +# def create_unix_socket_connection(): +# # 建立一個 mavtcpin 實例 +# mav_conn = mavutil.mavtcpin("127.0.0.250:9999", source_system=1, source_component=1) + +# # 替換底層 socket +# # 關閉原有的 socket +# mav_conn.listen.close() + +# # 創建 Unix socket 並替換 +# unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +# unix_socket_path = "/tmp/unix_socket_mavlink.sock" + +# # 確保先前的 unix socket 已被移除 +# if os.path.exists(unix_socket_path): +# os.remove(unix_socket_path) + +# # 綁定並設定 Unix socket +# unix_socket.bind(unix_socket_path) +# unix_socket.listen(1) +# unix_socket.setblocking(0) +# mavutil.set_close_on_exec(unix_socket.fileno()) + +# # 替換 listen socket +# mav_conn.listen = unix_socket +# mav_conn.fd = unix_socket.fileno() + +# # mav_conn.port = unix_socket + +# return mav_conn + + +# a = create_unix_socket_connection() + +# # print(a.port) + + + + +# =============================== + +import mavunixin +mav_conn = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1) + +import time +print("🔌 Server waiting for connection...") +while True: + time.sleep(1) + mavlinkMsg = mav_conn.recv_msg() + if mavlinkMsg is not None: + print("📥 Received:", mavlinkMsg) + # a.send(mavlinkMsg) + # print("📤 Server replied:", data.decode()) + else: + print("No message received.") \ No newline at end of file