diff --git a/README.md b/README.md index 1cbbf37..ce0703a 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Python 2. conda-forge 中的 pyserial-asyncio 3. importlib_metadata -> Version: 8.5.0 4. setuptools -> Version: 58.2.0 (版本太新不行) +5. pyserial-asyncio ROS2 diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py index f15bad6..257f571 100644 --- a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -38,7 +38,8 @@ class PanelState: # 這邊是儲存關於 socket object 的資料 self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊 self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "", - "target_sockets": "", "primary_socket_id": "", "InfoReady": False} # 暫存單一 socket 的資訊 + "target_sockets": "", "primary_socket_id": "", "socket_connection_string": "", + "InfoReady": False} # 暫存單一 socket 的資訊 def intoSTART(self): self.panel_status = "Running" @@ -139,9 +140,9 @@ class ControlPanel: obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[ MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"), MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"), - MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), + MenuNode("Cancel Link", "取消轉發連結", "MAVOBJ_CANCEL_LINK"), MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"), - MenuNode("Remove Target", "移除轉發目標(工程)", "MAVOBJ_REMOVE_TARGET"), + MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), MenuNode("返回", "回到列表", "BACK"), ]) # 將 socket_id 附加到每個子選單項目上 @@ -183,9 +184,10 @@ class ControlPanel: # 這裡顯示基本資訊 dialog_win.addstr(2, 2, f"Socket ID : {socket_id}") - dialog_win.addstr(3, 2, f"Socket status : 運行中") + # dialog_win.addstr(3, 2, f"Socket status : 運行中") # show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', ''))) dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}") + dialog_win.addstr(4, 30, f"{state.socket_info_single.get('socket_connection_string', '')}") show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', ''))) dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}") show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', ''))) @@ -397,7 +399,7 @@ class ControlPanel: # 操作說明 # help_line = start_line + len(current_menu.children) + 2 help_line = height - 2 - stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層 q退出", curses.A_DIM) + stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM) stdscr.refresh() @@ -431,8 +433,12 @@ class ControlPanel: idx_stack[-1] = (current_idx + 1) % len(current_menu.children) elif ch == (ord('O')): - # 直接進入工程模式 + # 進入工程模式 state.intoENGINEER() + + elif ch == (ord('o')): + # 離開工程模式 + state.intoSTART() elif ch == curses.KEY_LEFT: # 返回上層 @@ -448,8 +454,9 @@ class ControlPanel: idx_stack.append(0) elif ch in (ord('q'), 27): - state.intoTERMINATION() - panel_shutdown() + if state.panel_status == "Engineer": + state.intoTERMINATION() + panel_shutdown() elif ch in (curses.KEY_ENTER, 10, 13): selected = current_menu.children[current_idx] @@ -537,22 +544,23 @@ class ControlPanel: # 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好 menu_stack.pop() idx_stack.pop() - # # 刷新列表頁面 - # if len(menu_stack) > 1: - # current_page = menu_stack[-1].current_page if hasattr(menu_stack[-1], 'current_page') else 0 - # menu_stack.pop() - # idx_stack.pop() - # time.sleep(0.1) # 等待物件被移除 - # object_list_menu = self.create_object_list_menu(state, page=current_page) - # menu_stack.append(object_list_menu) - # idx_stack.append(0) elif selected.action == "MAVOBJ_MAKE_LINK": # 建立轉發連結 if hasattr(selected, 'socket_id'): target_id = self.select_target_socket(stdscr, selected.socket_id, state) if target_id is not None: - cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id)) + # cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id)) + cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) + cmd_q.put(("MAVOBJ_ADD_TARGET", target_id, selected.socket_id)) # 雙向連結 + + elif selected.action == "MAVOBJ_CANCEL_LINK": + # 取消轉發連結 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True) + if target_id is not None: + cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id)) + cmd_q.put(("MAVOBJ_REMOVE_TARGET", target_id, selected.socket_id)) # 雙向取消連結 elif selected.action == "MAVOBJ_ADD_TARGET": # 添加目標端口 @@ -563,16 +571,6 @@ class ControlPanel: target_id = self.select_target_socket(stdscr, selected.socket_id, state) if target_id is not None: cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) - - elif selected.action == "MAVOBJ_REMOVE_TARGET": - # 移除目標端口 - if state.panel_status != "Engineer": - state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) - continue # 只有在工程模式下才能操作 - if hasattr(selected, 'socket_id'): - target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True) - if target_id is not None: - cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id)) elif selected.action == "STOP_MANAGER": if state.panel_status != "Engineer": @@ -661,10 +659,6 @@ class Orchestrator: self.remove_target_from_object(s_id, socket_id) # 再移除該物件 self.delete_mavlink_object(socket_id) - elif action == "MAVOBJ_MAKE_LINK": - source_id, target_id = cmd[1], cmd[2] - self.add_target_to_object(source_id, target_id) - self.add_target_to_object(target_id, source_id) # 雙向連結 elif action == "MAVOBJ_ADD_TARGET": source_id, target_id = cmd[1], cmd[2] self.add_target_to_object(source_id, target_id) @@ -681,6 +675,9 @@ class Orchestrator: self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets + ip_info = mav_obj.mavlink_socket.port.getsockname() + self.panelState.socket_info_single["socket_connection_string"] = f"{ip_info[0]}:{ip_info[1]}" + # getattr(mav_obj.mavlink_socket, "connection_string", "") self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好 elif cmd == "CREATE_UDP_INBOUND": diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index b4d747f..635f113 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -1,40 +1,35 @@ ''' -# 不同的匯流排只有單一種通訊協定 -# 匯流排接到訊息後透過 ring buffer 傳送到橋接器 -# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 -# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去 +這個檔案是對 udp 進來的 mavlink 訊息做 "分流" "轉拋" 的地方 (並不會做 "分析") +概念上 把每個 udp 接口 視為一個獨立的 mavlink bus 針對 bus 來統籌管理 -# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排 -# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面 -# 這邊的 main 會是用來初始化 mavlink_object 並啟動他的 run function +主要的重點部分: +1. stream_bridge_ring & return_packet_ring +2. mavlink_object & async_io_manager +3. mavlink_bridge -================= 改版記錄 ============================ -2025年 6月 20日 -1. mavlink_object 中 由於 Queue 的效能太差 會完全移除 - 其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 - 而 multiplexingToSwap 會完全被移除代替方式下一條描述 -2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流 - (註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) - 因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 - 並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 -3. mavlink_object 需要加上 state 去管理其狀態 -4. mavlink_object 需要加上 target port 去管理寫出的目標 -5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 -6. 基於第1,2項 updateMultiplexingList 會被完全移除 -7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object -8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring 與 return_packet_ring 來取代 另外 swap_queues 會被完全移除 +stream_bridge_ring & return_packet_ring: + 這兩個 ring buffer 是用來做 mavlink 訊息的分流 + stream_bridge_ring 這邊的資訊比較是給會固定更新的資訊 (例如 HEARTBEAT, ATTITUDE 之類的) + return_packet_ring 比較是處理指令後的回應封包 (例如 PARAM_VALUE, MISSION_ITEM 等等) + +mavlink_object: + 每個 mavlink bus 都會有一個 mavlink_object + 使用 asyncio 處理資料流 用 RingBuffer 來分配訊息 + 內容中沒有獨立的執行緒 只有一個個 asyncio function 會被放到 async_io_manager 裡面執行 + + 關於分流與轉拋的具體實現是在 process_data 這個 asyncio function 裡面 + +async_io_manager: + 首先它紀律並管理所有 mavlink_object 實例 + 有自己一個獨立的執行緒 執行 asyncio loop (mavlink_object 裡面的 asyncio function 都會被放到這個 loop 裡面執行) + +mavlink_bridge: + 專門處理 stream_bridge_ring 裡面的訊息流 + 會把訊息流解開後 存放到 mavlinkVehicleView.py 定義的載具結構視圖 + -2025年 11月 15日 -1. mavlink_bridge 類別新增為 singleton 模式 確保全系統只有一個實例在運行 -2. mavlink_bridge 處理封包改為映射表 (需要在 _init_message_handlers 中新增處理器函式) -3. mavlink_bridge 的主要迴圈 增加 send_message 功能 可指定目標 sysid 或 socket_id 發送 mavlink 封包 -4. async_io_manager 循環邏輯大改動 優化 mavlink_object 加入與移除的邏輯 並使得 task 與 evenlt loop 分層更清楚 -5. mavlink_object 移除不必要的 start 與 stop 方法 由 async_io_manager 統一管理其生命週期 -6. mavlink_object 優化 send_message 方法 避免無效判斷 與 增加一些防呆檢驗 並與 mavlink_bridge 連動工作 -7. 移除迴圈內的 try except 堆疊 增加效能 -8. 移除對於 mavlinkDevice 的依賴 改用 vehicle_registry 來管理所有的載具 ''' @@ -61,7 +56,6 @@ from .mavlinkVehicleView import ( ) from .utils import RingBuffer, setup_logger - # ====================== 分割線 ===================== logger = setup_logger(os.path.basename(__file__)) @@ -331,8 +325,6 @@ class mavlink_bridge: mav_obj = mavlink_object.mavlinkObjects[socket_id] return mav_obj.send_message(message_bytes) -# ====================== 分割線 ===================== - # 定義 mavlink_object 的狀態 class MavlinkObjectState(Enum): INIT = auto() # 初始化狀態 @@ -560,6 +552,7 @@ class async_io_manager: start 方法 會先做一個新的執行緒 然後讓新的執行緒 透過 _run_event_loop 方法來建立一個空的事件循環 self.loop 然後在 _run_event_loop 方法中 會建立一個異步任務 _main_task 來監控和管理所有的 mavlink_object 任務 """ + _instance = None _lock = threading.Lock() @@ -714,7 +707,7 @@ class async_io_manager: async def _async_add_mavlink_object(self, mavlink_obj): """在事件循環線程中同步執行""" socket_id = mavlink_obj.socket_id - + try: task = asyncio.create_task(mavlink_obj.process_data()) self.managed_objects[socket_id] = mavlink_obj @@ -789,3 +782,36 @@ class async_io_manager: if __name__ == '__main__': pass + + +''' +================= 改版記錄 ============================ + +2025年 6月 20日 +1. mavlink_object 中 由於 Queue 的效能太差 會完全移除 + 其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 + 而 multiplexingToSwap 會完全被移除代替方式下一條描述 +2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流 + (註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) + 因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 + 並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 +3. mavlink_object 需要加上 state 去管理其狀態 +4. mavlink_object 需要加上 target port 去管理寫出的目標 +5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 +6. 基於第1,2項 updateMultiplexingList 會被完全移除 +7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object +8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring 與 return_packet_ring 來取代 另外 swap_queues 會被完全移除 + + +2025年 11月 15日 +1. mavlink_bridge 類別新增為 singleton 模式 確保全系統只有一個實例在運行 +2. mavlink_bridge 處理封包改為映射表 (需要在 _init_message_handlers 中新增處理器函式) +3. mavlink_bridge 的主要迴圈 增加 send_message 功能 可指定目標 sysid 或 socket_id 發送 mavlink 封包 +4. async_io_manager 循環邏輯大改動 優化 mavlink_object 加入與移除的邏輯 並使得 task 與 evenlt loop 分層更清楚 +5. mavlink_object 移除不必要的 start 與 stop 方法 由 async_io_manager 統一管理其生命週期 +6. mavlink_object 優化 send_message 方法 避免無效判斷 與 增加一些防呆檢驗 並與 mavlink_bridge 連動工作 +7. 移除迴圈內的 try except 堆疊 增加效能 +8. 移除對於 mavlinkDevice 的依賴 改用 vehicle_registry 來管理所有的載具 + +''' + diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py new file mode 100644 index 0000000..7de45bb --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -0,0 +1,499 @@ +''' + + +''' + +# 基礎功能的 import +import asyncio +import serial_asyncio + +import os +import sys +import serial +import signal +from enum import Enum, auto + +# # XBee 模組 +# from xbee.frame import APIFrame + +# 自定義的 import +from .utils import setup_logger + +# ====================== 分割線 ===================== + +logger = setup_logger(os.path.basename(__file__)) + +# ====================== 分割線 ===================== +class XBeeFrameHandler: + """XBee API Frame 處理器""" + + @staticmethod + def parse_at_command_response(frame: bytes) -> dict: + """解析 AT Command Response (0x88)""" + if len(frame) < 8: + return None + + frame_type = frame[3] + if frame_type != 0x88: + return None + + frame_id = frame[4] + at_command = frame[5:7] + status = frame[7] + data = frame[8:] if len(frame) > 8 else b'' + + return { + 'frame_id': frame_id, + 'command': at_command, + 'status': status, + 'data': data, + 'is_ok': status == 0x00 + } + + @staticmethod + def parse_receive_packet(frame: bytes) -> dict: + # """解析 RX Packet (0x90) - 未來擴展用""" + # if len(frame) < 15 or frame[3] != 0x90: + # return None + + # return { + # 'source_addr': frame[4:12], + # 'reserved': frame[12:14], + # 'options': frame[14], + # 'data': frame[15:-1] + # } + pass + + @staticmethod + def encapsulate_data(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + """ + 將數據封裝為 XBee API 傳輸幀 + + 使用 XBee API 格式封裝數據: + - 傳輸請求幀 (0x10) + - 使用廣播地址 + - 添加適當的頭部和校驗和 + """ + frame_type = 0x10 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + + @staticmethod + def decapsulate_data(data: bytes): + # 這裡可以根據需要進行數據解封裝 + + # XBee API 幀格式: + # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節) + + # 檢查幀起始符 (0x7E) + if not data or len(data) < 5 or data[0] != 0x7E: + return data + + # 獲取數據長度 (不包括校驗和) + # length = (data[1] << 8) + data[2] + length = (data[1] << 8) | data[2] + + # 檢查幀完整性 + if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 + return data + + # 提取API標識符和數據 + frame_type = data[3] + # frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中 + + # 根據不同的幀類型進行處理 + if frame_type == 0x90: # 例如,這是"接收數據包"類型 + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + return data + + + +class ATCommandHandler: + """AT 指令回應處理器""" + + def __init__(self, serial_port: str): + self.serial_port = serial_port + self.handlers = { + b'DB': self._handle_rssi, + b'SH': self._handle_serial_high, + b'SL': self._handle_serial_low, + # 可擴展其他 AT 指令 + } + + def handle_response(self, response: dict): + """根據 AT 指令類型分派處理""" + if not response or not response['is_ok']: + if response: + print(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") + return + + command = response['command'] + handler = self.handlers.get(command) + + if handler: + handler(response['data']) + else: + print(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") + + def _handle_rssi(self, data: bytes): + """處理 DB (RSSI) 回應""" + if not data: + return + + rssi_value = data[0] + now = time.time() + + # 檢查是否最近有收到 MAVLink + last_mavlink_time = serial_last_mavlink_time.get(self.serial_port, 0) + if now - last_mavlink_time > 0.5: + print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLink,RSSI = -{rssi_value} dBm 已忽略") + return + + # 取得對應的 sysid + sysid = serial_to_sysid.get(self.serial_port) + if sysid is None: + print(f"[{self.serial_port}] 找不到 sysid 對應,RSSI = -{rssi_value} dBm,已忽略") + return + + # 記錄 RSSI + rssi_history[sysid].append(-rssi_value) + time_history[sysid].append(now) + # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") + + def _handle_serial_high(self, data: bytes): + """處理 SH (Serial Number High) - 範例""" + if len(data) >= 4: + serial_high = int.from_bytes(data[:4], 'big') + print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}") + + def _handle_serial_low(self, data: bytes): + """處理 SL (Serial Number Low) - 範例""" + if len(data) >= 4: + serial_low = int.from_bytes(data[:4], 'big') + print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}") + + + +class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 + def __init__(self, udp_handler, serial_port_str): + self.udp_handler = udp_handler # UDP 的傳輸把手 + self.serial_port_str = serial_port_str + self.at_handler = ATCommandHandler(serial_port) + + self.buffer = bytearray() # 用於緩存接收到的資料 + self.transport = None # Serial 自己的傳輸物件 + # self.first_data = True # 標記是否為第一次收到資料 + # self.has_processed = False # 測試模式用 處理數據旗標 # debug + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_handler, 'set_serial_handler'): + self.udp_handler.set_serial_handler(self) + logger.info(f"Serial port {self.serial_port_str} connected.") + + # Serial 收到資料的處理過程 + def data_received(self, data): + # 1. 把收到的資料加入緩衝區 + self.buffer.extend(data) + + # 2. 需要完整的 header 才能解析 + while len(self.buffer) >= 3: + # 3. 瞄準 XBee API Frame (0x7E 開頭的封包) + if self.buffer[0] != 0x7E: + self.buffer.pop(0) # 如果不是就丟掉 + continue + + # 4. 讀取 payload 長度 + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + + # 5. 等待完整封包 + if len(self.buffer) < full_length: + break + + # 6. 提取完整 frame 並從緩衝區移除 + frame_payload = self.buffer[:full_length] + del self.buffer[:full_length] + + # 7. 判斷 frame 類型 + frame_type = frame[3] + + if frame_type == 0x88: + # 處理 AT Command 回應 + # response = XBeeFrameHandler.parse_at_command_response(frame) + # self.at_handler.handle_response(response) + pass # debug + + elif frame_type == 0x90: + # Receive Packet (RX) payload 先解碼 + processed_data = XBeeFrameHandler.decapsulate_data(bytes(frame_payload)) + # 轉換失敗就捨棄了 + if processed_data is None: + break + # 再透過 UDP 送出 + self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port)) + + else: + # 其他類型的 frame 未來可擴展處理 現在忽略 + logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}") + + # # RSSI + # if frame[3] == 0x88 and frame[5:7] == b'DB': # frame[3] == 0x88 AT -> API 封包 + # # frame[5:7] == b'DB' -> API 封包的DB參數 + # status = frame[7] # + # if status == 0x00 and len(frame) > 8: # status == 0x00 -> 這個封包是有效封包 + # rssi_value = frame[8] + # now = time.time() + + # # === 優化 1:僅信任最近 0.5 秒內有接收 MAVLink 的 port + # last_time = serial_last_mavlink_time.get(self.serial_port, 0) + # if now - last_time <= 0.5: + # sysid = serial_to_sysid.get(self.serial_port, None) + # if sysid is not None: + # rssi_history[sysid].append(-rssi_value) + # time_history[sysid].append(now) + # # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") + # else: + # print(f"[{self.serial_port}] 找不到 sysid 對應,RSSI = -{rssi_value} dBm,已忽略") + # else: + # print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLink,RSSI = -{rssi_value} dBm 已忽略") + # else: + # print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}") + + + # def write_to_serial(self, data): + # # 在資料透過 Serial 發送之前進行處理 + # processed_data = self.encapsulate_data(data) + + # # 處理失敗就不發送 + # if processed_data not None: + # self.transport.write(processed_data) + + + +class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發 + + LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端IP + + def __init__(self, target_port): + self.target_port = target_port # 目標 UDP 端口 + + self.serial_handler = None # Serial 的傳輸物件 + self.transport = None # UDP 自己的傳輸物件 + self.remote_addr = None # 儲存動態獲取的遠程地址 # debug + # self.has_processed = False # 測試模式用 處理數據旗標 # debug + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready. Waiting for serial data before sending...") + + def set_serial_handler(self, serial_handler): + self.serial_handler = serial_handler + + # UDP 收到資料的處理過程 + def datagram_received(self, data, addr): + # 儲存對方的地址(這樣就能向同一個來源回傳數據) + # self.remote_addr = addr + # print(f"Received UDP data from {addr}, setting as remote address") + + processed_data = XBeeFrameHandler.encapsulate_data(data) + + if self.serial_handler: + self.serial_handler.transport.write(processed_data) + + + # def send_udp(self, data): + # # 藉由 UDP 發送資料出去 + + # # 在透過 UDP 發送數據之前進行解封裝 + # decoded_data = self.decapsulate_data(data) + # if decoded_data is None: + # return + + # if self.transport: + # self.transport.sendto(decoded_data, (self.LOCAL_HOST_IP, self.target_port)) + +#================================================================== + +class SerialReceiverType(Enum): + """連接類型""" + TELEMETRY = auto() + XBEEAPI = auto() + OTHER = auto() + + +class serial_manager: + + class serial_object: + def __init__(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): + self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc + self.baudrate = baudrate + self.receiver_type = receiver_type + self.target_port = target_port # 指向的 UPD 端口 + + self.transport = None + self.protocol = None + self.udp_handler = None + self.serial_handler = None + + def __init__(self): + self.thread = None + self.loop = None + self.running = False + self.serial_count = 0 + self.serial_objects = {} # serial id num : serial object + + def __del__(self): + self.loop = None + self.thread = None + + def start(self): + + if self.running: + logger.warning("serial_manager already running") + return + + self.running = True + + # 啟動獨立線程 命名為 SerialManager + self.thread = threading.Thread( + target=self._run_event_loop, + name="SerialManager" + ) + self.thread.daemon = False # 不設為 daemon,確保正確關閉 + self.thread.start() + + # 等待 _run_event_loop 建立事件循環的物件 self.loop + start_timeout = 2.0 + start_time = time.time() + while not self.loop and time.time() - start_time < start_timeout: + time.sleep(0.1) + + # 檢查另一個執行緒有沒有成功建立事件循環物件 self.loop + if self.loop: + logger.info("serial_manager thread started <-") + return True + else: + logger.error("serial_manager failed to start") + return False + + def shutdown(self): + pass + + def _run_event_loop(self): + """在獨立線程中運行 asyncio 事件循環""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + # # 為每個 serial_object 建立連接 + # for serial_obj in self.serial_objects: + # coro = serial_asyncio.create_serial_connection( + # self.loop, + # lambda: SerialProtocol(serial_obj.receiver_type), + # serial_obj.serial_port, + # baudrate=serial_obj.baudrate + # ) + # transport, protocol = self.loop.run_until_complete(coro) + # serial_obj.transport = transport + # serial_obj.protocol = protocol + + try: + self.loop.run_forever() + finally: + self.loop.close() + + def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): + + if self.loop is None: + logger.error("Event loop not running, cannot create serial link") + return False + + # 檢查 serial port 有效 + self.check_serial_port(serial_port) + + serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type) + + # 建立 UDP 處理器 並指定目標端口位置 + serial_obj.udp_handler = UDPHandler(target_port) + # 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配 + try: + serial_obj.transport, serial_obj.protocol = await self.loop.create_datagram_endpoint( + lambda: serial_obj.udp_handler, + local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 + ) + except Exception as e: + logger.error(f"Cannot Create UDP Endpoint: {str(e)}") + return False + + # 建立 Serial 傳輸,將 UDP 處理器傳給它 + try: + serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler) + + _, serial_transport = await serial_asyncio.create_serial_connection( + self.loop, lambda: serial_obj.serial_handler, serial_port, baudrate=baudrate + ) + except Exception as e: + logger.error(f"Cannot Create Serial Connection: {str(e)}") + serial_obj.transport.close() + return + + # self.serial_objects.append(serial_obj) + self.serial_objects[serial_count+1] = serial_obj + serial_count += 1 + + async def _async_create_serial_link(self, serial_port, baudrate, target_port): + pass + + def remove_serial_link(serial_id): + pass + + async def _async_remove_serial_link(self, serial_id): + pass + + def check_serial_port(serial_port): + """檢查串口是否存在與可用""" + # 檢查設備是否存在 + if not os.path.exists(serial_port): + logger.error(f"Serial Device {serial_port} Not Found") + return False + + # 檢查是否有權限訪問設備 + try: + os.access(serial_port, os.R_OK | os.W_OK) + except Exception as e: + logger.error(f"Cannot Access Serial Device {serial_port}: {str(e)}") + return False + + # 檢查是否被占用 + try: + # 嘗試打開串口 + ser = serial.Serial(serial_port, SERIAL_BAUDRATE) + ser.close() # 打開成功後立即關閉 + return True + except serial.SerialException as e: + logger.error(f"Serial Device {serial_port} is Occupied or Inaccessible: {str(e)}") + return False + except Exception as e: + logger.error(f"Unknown Error: {str(e)}") + return False + + +if __main__ == '__main__': + sm = serial_manager() + sm.start() + + SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 + SERIAL_BAUDRATE = 115200 + UDP_REMOTE_IP = '127.0.0.1' + UDP_REMOTE_PORT = 14560 + sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_IP, SerialReceiverType.XBEEAPI) \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/socketManager.py b/src/fc_network_adapter/fc_network_adapter/socketManager.py deleted file mode 100644 index 52e8681..0000000 --- a/src/fc_network_adapter/fc_network_adapter/socketManager.py +++ /dev/null @@ -1,14 +0,0 @@ - -''' - -透過某個地方 得到 udp 或 uart 接口 -對於每個接口 視為一個獨立的物件 - -物件對於不同的接口 是為不同類型的物件 - -每個類型的物件 創建一個獨立的執行緒 來處理資料 -關於執行緒的實作 是寫在另一個模組 - -物件之間 也可以做資料轉換/轉拋 - -''' \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/utils/acquirePort.py b/src/fc_network_adapter/fc_network_adapter/utils/acquirePort.py new file mode 100644 index 0000000..0c6a873 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/utils/acquirePort.py @@ -0,0 +1,129 @@ +import socket +import random +import os + + +def get_used_ports(): + """ + 從 /proc/net/tcp 和 /proc/net/udp 讀取系統已占用的 port + 直接讀取 Linux 系統資訊,避免暴力嘗試 + + Returns: + set: 已被占用的 port 號集合 + """ + used_ports = set() + + # 讀取 TCP 占用的 port (包含 IPv4 和 IPv6) + for filepath in ['/proc/net/tcp', '/proc/net/tcp6']: + if os.path.exists(filepath): + try: + with open(filepath, 'r') as f: + lines = f.readlines()[1:] # 跳過標題行 + for line in lines: + parts = line.split() + if len(parts) > 1: + # local_address 格式: "0100007F:1F90" (hex) + local_addr = parts[1] + port_hex = local_addr.split(':')[1] + port = int(port_hex, 16) + used_ports.add(port) + except (IOError, PermissionError): + pass + + # 讀取 UDP 占用的 port (包含 IPv4 和 IPv6) + for filepath in ['/proc/net/udp', '/proc/net/udp6']: + if os.path.exists(filepath): + try: + with open(filepath, 'r') as f: + lines = f.readlines()[1:] # 跳過標題行 + for line in lines: + parts = line.split() + if len(parts) > 1: + local_addr = parts[1] + port_hex = local_addr.split(':')[1] + port = int(port_hex, 16) + used_ports.add(port) + except (IOError, PermissionError): + pass + + return used_ports + + +def is_port_available(port): + """ + 測試指定 port 是否可用 (TCP 和 UDP 都測試) + + Args: + port (int): 要測試的 port 號 + + Returns: + bool: True 表示可用,False 表示被占用 + """ + # 測試 TCP + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', port)) + except OSError: + return False + + # 測試 UDP + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(('', port)) + except OSError: + return False + + return True + + +def find_available_port(start_port=1024, end_port=65535): + """ + 在指定的 port 區間內隨機找出一個未被占用的 port + 使用 Linux /proc/net 系統資訊來過濾已占用的 port,避免暴力嘗試 + 確保 TCP 和 UDP 都可用 + + Args: + start_port (int): 起始 port 號 (預設 1024) + end_port (int): 結束 port 號 (預設 65535) + + Returns: + int: 可用的 port 號,如果找不到則返回 None + """ + if start_port < 1 or end_port > 65535 or start_port >= end_port: + raise ValueError("Port 範圍必須在 1-65535 之間,且起始 port 必須小於結束 port") + + # 從系統讀取已占用的 port + used_ports = get_used_ports() + + # 計算可用的 port 列表 (排除已占用的) + available_ports = [p for p in range(start_port, end_port + 1) if p not in used_ports] + + if not available_ports: + return None + + # 隨機打亂順序 + random.shuffle(available_ports) + + # 從可用列表中挑選,再用 socket 雙重確認 TCP 和 UDP 都可用 + for port in available_ports: + if is_port_available(port): + return port + + # 如果都不可用 + return None + + +if __name__ == "__main__": + # 使用範例 + port = find_available_port(8000, 9000) + if port: + print(f"找到可用的 port: {port}") + else: + print("找不到可用的 port") + + # 自訂範圍範例 + port = find_available_port(10000, 20000) + if port: + print(f"在 10000-20000 範圍找到可用的 port: {port}") diff --git a/src/fc_network_adapter/tests/demo_integration.py b/src/fc_network_adapter/tests/demo_integration.py index 7546f65..11ac1d7 100644 --- a/src/fc_network_adapter/tests/demo_integration.py +++ b/src/fc_network_adapter/tests/demo_integration.py @@ -19,7 +19,7 @@ from ..fc_network_adapter import mavlinkVehicleView as mvv # ====================== 分割線 ===================== -test_item = 10 +test_item = 1 running_time = 3 @@ -31,7 +31,24 @@ print('test_item : ', test_item) 測試項 1X 表示 mavlink_object 的功能 測試連線的能力 ''' -if test_item == 10: +if test_item == 1: + print('===> Start of Program .Test ', test_item) + + connection_string="udp:127.0.0.1:14591" + mavlink_socket1 = mavutil.mavlink_connection(connection_string) + # mavlink_object1 = mo.mavlink_object(mavlink_socket1) + + time.sleep(1) + + print("mark A") + + # print("Socket IP:", mavlink_socket1.target_system) + print("Socket port:", mavlink_socket1.port.getsockname()) + + # print("=== ", dir(mavlink_socket1.port)) + + +elif test_item == 10: # 需要開啟一個 ardupilot 的模擬器 # 測試 mavlink_object 放入 ring buffer 的應用 print('===> Start of Program .Test ', test_item)