diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py index 257f571..0010a1b 100644 --- a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -20,7 +20,11 @@ from pymavlink import mavutil # 自定義的 import from . import mavlinkObject as mo +from . import serialManager as sm + from .utils import RingBuffer, setup_logger +from .utils import acquireSerial, acquirePort +from .utils.acquirePort import find_available_port @@ -32,11 +36,13 @@ class PanelState: termination_start_time = None self.mavlink_bridge_state = "Stopped" self.object_manager_state = "Stopped" + self.serial_manager_state = "Stopped" self.socket_object_list = [] self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息 # 這邊是儲存關於 socket object 的資料 self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊 + self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": True} # 暫存 Serial 設定資訊 self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "", "target_sockets": "", "primary_socket_id": "", "socket_connection_string": "", "InfoReady": False} # 暫存單一 socket 的資訊 @@ -122,7 +128,7 @@ class ControlPanel: return user_input - def create_object_list_menu(self, state: PanelState, page=0, items_per_page=5): + def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8): """動態創建 mavlink_object 列表選單(支持分頁)""" children = [] @@ -166,7 +172,56 @@ class ControlPanel: menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children) menu.current_page = page return menu - + + def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8): + """動態創建 serial port 列表選單(支持分頁)""" + children = [] + + # 獲取可用的 Serial 連接埠列表 + # serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓 + serial_ports = acquireSerial.get_serial_ports_with_filter('/dev/ttyUSB*') + + if not serial_ports: + children.append(MenuNode("(空)", "目前沒有串口設備", None)) + else: + total_items = len(serial_ports) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + # 顯示當前頁的串口 + for port in serial_ports[start_idx:end_idx]: + port_menu = MenuNode(f"{port}", children=[ + MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[ + MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"), + MenuNode("Xbee(AT-AT)", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"), + ]), + MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"), + MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"), + MenuNode("返回", "回到列表", "BACK"), + ]) + # 將 port 附加到每個子選單項目上 + for child in port_menu.children: + child.port = port + children.append(port_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) + if page > 0: + prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("返回", "回到上層選單", "BACK")) + menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu + def show_object_info(self, stdscr, socket_id, state: PanelState): """顯示物件詳細資訊的對話框""" height, width = stdscr.getmaxyx() @@ -280,12 +335,14 @@ class ControlPanel: MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"), MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"), ]), + MenuNode("Serial InBound", action = "LIST_SERIAL_RES"), ]), MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"), ]), MenuNode("Engineer Mode", "工程模式", children=[ - MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), #TODO: 尚未實作 - MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), #TODO: 尚未實作 + MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), + MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), + MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"), ]), MenuNode("Shutdown", "關閉整個系統", children=[ MenuNode("Return", "繼續運行", "BACK"), @@ -304,11 +361,11 @@ class ControlPanel: curses.echo() curses.endwin() - def panel_shutdown(): + def pre_panel_shutdown(): # 先關閉所有模組 再關閉面板 cmd_q.put("SHUTDOWN_BRIDGE") cmd_q.put("SHUTDOWN_MANAGER") - + cmd_q.put("SHUTDOWN_SERIAL_MANAGER") def draw_menu(screen): nonlocal stdscr @@ -325,9 +382,6 @@ class ControlPanel: state.intoSTART() # 設定狀態為運行中 while not stop_evt.is_set(): - # 檢查是否需要退出 - if stop_evt.is_set(): - break current_menu = menu_stack[-1] current_idx = idx_stack[-1] @@ -335,20 +389,27 @@ class ControlPanel: # 獲取終端機尺寸 height, width = stdscr.getmaxyx() # 簡單暴力的限制視窗的大小 - if height < 20 or width < 60: + MIN_HEIGHT = ( + 2 + # 邊界 + 6 + # 狀態列 操作說明列 一個空白 + 11+ # 最大選單 與 空白區 + 5 # 訊息區域 + ) + if height < MIN_HEIGHT or width < 60: logger.error("Terminal size too small for Control Panel.") break stdscr.clear() - stdscr.border() + + # 更新模組狀態顯示 stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD) stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}") - stdscr.addstr(2, 2, f"mavlink Bridge State : {state.mavlink_bridge_state}") - stdscr.addstr(3, 2, f"Object Manager State : {state.object_manager_state}") + stdscr.addstr(2, 2, f"Object Manager State : {state.object_manager_state}") + stdscr.addstr(3, 2, f"Mavlink Bridge State : {state.mavlink_bridge_state}") stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}") + stdscr.addstr(2, 36, f"Serial Manager State : {state.serial_manager_state}") - # # 更新模組狀態顯示 # stdscr.addstr(2, 25, f"{state.mavlink_bridge_state}") # stdscr.addstr(3, 25, f"{state.object_manager_state}") # stdscr.addstr(4, 25, f"{len(state.socket_object_list)} ") @@ -363,6 +424,10 @@ class ControlPanel: desc = f"{child.desc} [{state.udp_info_temp['IP']}]" elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]: desc = f"{child.desc} [{state.udp_info_temp['Port']}]" + elif child.action == "SET_SERIAL_COMM" and state.serial_info_temp["CommunicationType"]: + desc = f"{child.desc} [{state.serial_info_temp['CommunicationType']}]" + elif child.action == "TEXT_BAUD_SERIAL" and state.serial_info_temp["Baud"]: + desc = f"{child.desc} [{state.serial_info_temp['Baud']}]" line = f"{marker}{child.name:15s} – {desc}" attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL @@ -404,25 +469,29 @@ class ControlPanel: stdscr.refresh() # 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作 - # 驗證 bridge 跟 manager 狀態 兩者都停止後 就進入 STOPPED 狀態並跳出迴圈 + # 驗證 其他附屬模組的狀態都停止後 就進入 STOPPED 狀態並跳出迴圈 # 超過幾秒沒有反應就強制關閉 if state.panel_status == "Terminating": - if time.time() - state.termination_start_time > 3: + if time.time() - state.termination_start_time > 7: # 其他組件設定5秒 這邊給多一點 logger.warning("Control Panel forced shutdown after timeout.") state.intoSTOPPED() - stop_evt.set() - continue + # stop_evt.set() + # continue + break time.sleep(0.1) - if state.mavlink_bridge_state == "Stopped" and state.object_manager_state == "Stopped": + if (state.mavlink_bridge_state == "Stopped" and + state.object_manager_state == "Stopped" and + state.serial_manager_state == "Stopped"): state.intoSTOPPED() - stop_evt.set() + # stop_evt.set() + break continue # 設定短暫的 timeout,讓執行緒能夠響應 stop_evt - stdscr.timeout(100) # 100ms timeout + stdscr.timeout(100) ch = stdscr.getch() - if ch == -1: # timeout,繼續檢查 stop_evt + if ch == -1: # 沒有操作 continue # 處理按鍵 @@ -456,7 +525,7 @@ class ControlPanel: elif ch in (ord('q'), 27): if state.panel_status == "Engineer": state.intoTERMINATION() - panel_shutdown() + pre_panel_shutdown() elif ch in (curses.KEY_ENTER, 10, 13): selected = current_menu.children[current_idx] @@ -473,7 +542,7 @@ class ControlPanel: elif selected.action == "QUIT": state.intoTERMINATION() - panel_shutdown() + pre_panel_shutdown() elif selected.action == "TEXT_UDP_IP": result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ") @@ -484,47 +553,105 @@ class ControlPanel: result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 Port: ") if result is not None: state.udp_info_temp["Port"] = result - + elif selected.action == "CREATE_UDP_INBOUND": cmd_q.put("CREATE_UDP_INBOUND") # 確認後回到上兩層 if len(menu_stack) > 1: menu_stack.pop() idx_stack.pop() - menu_stack.pop() - idx_stack.pop() + # menu_stack.pop() + # idx_stack.pop() elif selected.action == "CREATE_UDP_OUTBOUND": cmd_q.put("CREATE_UDP_OUTBOUND") # 確認後回到上兩層 + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + # menu_stack.pop() + # idx_stack.pop() + + elif selected.action == "TEXT_BAUD_SERIAL": + result = ControlPanel.input_dialog(stdscr, "請輸入 Baud Rate (e.g., 9600, 115200): ") + if result is not None: + try: + baud_rate = int(result) + except ValueError: + state.panel_info_msg_list.append(("Invalid Baud Rate input.", time.time())) + state.serial_info_temp["Baud"] = baud_rate + + elif selected.action == "SET_SERIAL_COMM_XBEE": + state.serial_info_temp["CommunicationType"] = "XBee(API-AT)" + menu_stack.pop() + idx_stack.pop() + elif selected.action == "SET_SERIAL_COMM_TELEMETRY": + state.serial_info_temp["CommunicationType"] = "XBee(AT-AT)" + menu_stack.pop() + idx_stack.pop() + + elif selected.action == "CREATE_SERIAL_PORT": + state.serial_info_temp["Port"] = menu_stack[-1].name # 從選單取得 Port 名稱 + cmd_q.put("CREATE_SERIAL_PORT") + # 確認後回到上兩層 if len(menu_stack) > 1: menu_stack.pop() idx_stack.pop() menu_stack.pop() idx_stack.pop() + + elif selected.action == "LIST_SERIAL_RES": + created_list_menu = self.create_serial_port_menu(state, page=0) + menu_stack.append(created_list_menu) + idx_stack.append(0) elif selected.action == "LIST_MAV_OBJECT": # 動態生成 mavlink_object 列表選單 - object_list_menu = self.create_object_list_menu(state, page=0) - menu_stack.append(object_list_menu) + created_list_menu = self.create_object_list_menu(state, page=0) + menu_stack.append(created_list_menu) idx_stack.append(0) - elif selected.action == "PREV_PAGE": - # 上一頁 - if hasattr(selected, 'page'): - menu_stack.pop() - idx_stack.pop() - object_list_menu = self.create_object_list_menu(state, page=selected.page) - menu_stack.append(object_list_menu) - idx_stack.append(0) + # elif selected.action == "PREV_PAGE": + # # 上一頁 + # if hasattr(selected, 'page'): + # menu_stack.pop() + # idx_stack.pop() + # if menu_stack[-1].name == "Serial Port List": + # created_list_menu = self.create_serial_port_menu(state, page=selected.page) + # elif menu_stack[-1].name == "Object List": + # created_list_menu = self.create_object_list_menu(state, page=selected.page) + # menu_stack.append(created_list_menu) + # idx_stack.append(0) - elif selected.action == "NEXT_PAGE": - # 下一頁 + # elif selected.action == "NEXT_PAGE": + # # 下一頁 + # if hasattr(selected, 'page'): + # menu_stack.pop() + # idx_stack.pop() + # if menu_stack[-1].name == "Serial Port List": + # created_list_menu = self.create_serial_port_menu(state, page=selected.page) + # elif menu_stack[-1].name == "Object List": + # created_list_menu = self.create_object_list_menu(state, page=selected.page) + # menu_stack.append(created_list_menu) + # idx_stack.append(0) + elif selected.action in ("PREV_PAGE", "NEXT_PAGE"): if hasattr(selected, 'page'): + current_list_menu = menu_stack[-1] menu_stack.pop() idx_stack.pop() - object_list_menu = self.create_object_list_menu(state, page=selected.page) - menu_stack.append(object_list_menu) + + # 依據選單種類 重新建立分頁 + if current_list_menu.name == "Serial Port List": + created_list_menu = self.create_serial_port_menu(state, page=selected.page) + elif current_list_menu.name == "Object List": + created_list_menu = self.create_object_list_menu(state, page=selected.page) + else: + # 不支援的選單類型,回到原本的選單 + menu_stack.append(current_list_menu) + idx_stack.append(0) + continue + + menu_stack.append(created_list_menu) idx_stack.append(0) elif selected.action == "INSPECT_MAV_OBJECT": @@ -583,6 +710,13 @@ class ControlPanel: state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) continue # 只有在工程模式下才能操作 cmd_q.put("SHUTDOWN_BRIDGE") + + elif selected.action == "STOP_SERIAL_MANAGER": + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + cmd_q.put("SHUTDOWN_SERIAL_MANAGER") + elif callable(selected.action): # 執行函式 cmd_q.put(selected.action) @@ -592,14 +726,13 @@ class ControlPanel: except KeyboardInterrupt: pass finally: - stop_evt.set() cleanup() class Orchestrator: def __init__(self, stop_sig): - self.stop_evt = stop_sig + self.stop_evt = stop_sig # 外部操作去中斷 "面板" 執行緒的訊號 (內部自己停止的話不需要用這個) # === 1) 面板部分的準備 === self.cmd_q = queue.Queue() @@ -613,6 +746,9 @@ class Orchestrator: self.manager = mo.async_io_manager() self.bridge = mo.mavlink_bridge() + # === 3) serial_manager 部分的準備 === + self.plumber = sm.serial_manager() + def engageWholeSystem(self): """啟動整個系統""" # === 1) 面板部分的啟動 === @@ -623,10 +759,14 @@ class Orchestrator: self.manager.start() self.bridge.start() + # === 3) serial_manager 部分的啟動 === + self.plumber.start() + def mainLoop(self): logger.info("Main orchestrator started <-") try: - while not self.stop_evt.is_set(): + # while not self.stop_evt.is_set(): + while self.panel_thread.is_alive(): # A. 更新模組狀態 if self.manager.running: @@ -642,6 +782,11 @@ class Orchestrator: else: self.panelState.mavlink_bridge_state = 'Stopped' + if self.plumber.running: + self.panelState.serial_manager_state = 'Running' + else: + self.panelState.serial_manager_state = 'Stopped' + # 取出面板丟過來的「動作」 try: cmd = self.cmd_q.get_nowait() @@ -686,10 +831,14 @@ class Orchestrator: elif cmd == "CREATE_UDP_OUTBOUND": self.panelState.udp_info_temp["direction"] = "outbound" self.create_udp_object() + elif cmd == "CREATE_SERIAL_PORT": + self.create_serial_port_object() elif cmd == "SHUTDOWN_BRIDGE": self.bridge.stop() elif cmd == "SHUTDOWN_MANAGER": self.manager.shutdown() + elif cmd == "SHUTDOWN_SERIAL_MANAGER": + self.plumber.shutdown() except queue.Empty: pass except Exception as e: @@ -703,17 +852,31 @@ class Orchestrator: except Exception as e: logger.error(f"Unexpected error in main loop: {e}") finally: - logger.info("Main orchestrator END!") - # 關閉 mavlink_bridge (裡面有一個執行緒) - self.bridge.stop() + # 驗證並確保所有模組都被下達關閉訊號 + # 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發 + if self.bridge.thread.is_alive(): + if self.bridge.running: + self.bridge.stop() + self.bridge.thread.join(timeout=2) + + if self.manager.thread.is_alive(): + if self.manager.running: + self.manager.shutdown() + self.manager.thread.join(timeout=2) - # 關閉 async_io_manager (裡面有一個執行緒) - self.manager.shutdown() + if self.plumber.thread.is_alive(): + if self.plumber.running: + self.plumber.shutdown() + self.plumber.thread.join(timeout=2) # 關閉面板執行緒 if self.panel_thread.is_alive(): self.panel_thread.join(timeout=2) + + logger.info("Main orchestrator END!") + + # =============== 面板動作 - Mavlink Object =============== def create_udp_object(self): if self.panelState.udp_info_temp["direction"] == "inbound": @@ -761,6 +924,46 @@ class Orchestrator: else: self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time())) + # =============== 面板動作 - Serial Manager =============== + + def create_serial_port_object(self): + # 獲取可用的 udp port + udp_port_tmp = find_available_port(19000, 20000) + + # 定義通訊類型映射表 + COMM_TYPE_MAP = { + "XBee(API-AT)": sm.CommunicationType.XBee_API_AT, + # "XBee(AT-AT)": sm.CommunicationType.XBee_AT_AT, # TODO: 之後再弄 + # 新增區 + } + + # 驗證輸入 + comm_type = self.panelState.serial_info_temp['CommunicationType'] + if not comm_type: + self.panelState.panel_info_msg_list.append( + ("Please select Communication Type first.", time.time()) + ) + return + + # 查找對應的通訊類型 + comm_type_tmp = COMM_TYPE_MAP.get(comm_type) + if comm_type_tmp is None: + self.panelState.panel_info_msg_list.append( + (f"Communication type '{comm_type}' not supported yet.", time.time()) + ) + return + + ret = self.plumber.create_serial_port( + port=self.panelState.serial_info_temp['Port'], + baudrate=self.panelState.serial_info_temp['Baud'], + target_port=udp_port_tmp, + communication_type=comm_type_tmp, + ) + + if not ret: + self.panelState.panel_info_msg_list.append((f"Failed to create Serial Port object at {self.panelState.serial_info_temp['Port']}.", time.time())) + return + def main(): stop_evt = threading.Event() diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 635f113..71760a0 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -128,7 +128,7 @@ class mavlink_bridge: """停止 mavlink_bridge 的運作""" self.running = False if self.thread and self.thread.is_alive(): - self.thread.join(timeout=3.0) + self.thread.join(timeout=5.0) # === Thread 區塊 === def _run_thread(self): @@ -632,12 +632,10 @@ class async_io_manager: # 等待線程結束 if self.thread and self.thread.is_alive(): - self.thread.join(timeout=10.0) + self.thread.join(timeout=5.0) if self.thread.is_alive(): logger.warning("async_io_manager thread did not stop gracefully") os.kill(os.getpid(), signal.SIGTERM) # 強制終止程序 - - logger.info("async_io_manager thread END!") diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index 7de45bb..1cf79e0 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -11,6 +11,9 @@ import os import sys import serial import signal +import time +import threading +import struct from enum import Enum, auto # # XBee 模組 @@ -63,9 +66,10 @@ class XBeeFrameHandler: # 'data': frame[15:-1] # } pass + return None @staticmethod - def encapsulate_data(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes: """ 將數據封裝為 XBee API 傳輸幀 @@ -117,7 +121,6 @@ class XBeeFrameHandler: return data - class ATCommandHandler: """AT 指令回應處理器""" @@ -171,16 +174,18 @@ class ATCommandHandler: # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") def _handle_serial_high(self, data: bytes): - """處理 SH (Serial Number High) - 範例""" - if len(data) >= 4: - serial_high = int.from_bytes(data[:4], 'big') - print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}") + # """處理 SH (Serial Number High) - 範例""" + # if len(data) >= 4: + # serial_high = int.from_bytes(data[:4], 'big') + # print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}") + pass def _handle_serial_low(self, data: bytes): - """處理 SL (Serial Number Low) - 範例""" - if len(data) >= 4: - serial_low = int.from_bytes(data[:4], 'big') - print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}") + # """處理 SL (Serial Number Low) - 範例""" + # if len(data) >= 4: + # serial_low = int.from_bytes(data[:4], 'big') + # print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}") + pass @@ -188,7 +193,7 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial def __init__(self, udp_handler, serial_port_str): self.udp_handler = udp_handler # UDP 的傳輸把手 self.serial_port_str = serial_port_str - self.at_handler = ATCommandHandler(serial_port) + self.at_handler = ATCommandHandler(serial_port_str) self.buffer = bytearray() # 用於緩存接收到的資料 self.transport = None # Serial 自己的傳輸物件 @@ -199,7 +204,7 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial self.transport = transport if hasattr(self.udp_handler, 'set_serial_handler'): self.udp_handler.set_serial_handler(self) - logger.info(f"Serial port {self.serial_port_str} connected.") + # logger.info(f"Serial port {self.serial_port_str} connected.") # debug # Serial 收到資料的處理過程 def data_received(self, data): @@ -222,27 +227,30 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial break # 6. 提取完整 frame 並從緩衝區移除 - frame_payload = self.buffer[:full_length] + an_frame = self.buffer[:full_length] del self.buffer[:full_length] # 7. 判斷 frame 類型 - frame_type = frame[3] + frame_type = an_frame[3] if frame_type == 0x88: # 處理 AT Command 回應 - # response = XBeeFrameHandler.parse_at_command_response(frame) + # response = XBeeFrameHandler.parse_at_command_response(an_frame) # self.at_handler.handle_response(response) pass # debug elif frame_type == 0x90: # Receive Packet (RX) payload 先解碼 - processed_data = XBeeFrameHandler.decapsulate_data(bytes(frame_payload)) + processed_data = XBeeFrameHandler.decapsulate_data(bytes(an_frame)) # 轉換失敗就捨棄了 if processed_data is None: - break + continue # 再透過 UDP 送出 self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port)) + elif frame_type == 0x8B: + pass + else: # 其他類型的 frame 未來可擴展處理 現在忽略 logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}") @@ -295,7 +303,7 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處 def connection_made(self, transport): self.transport = transport - print("UDP transport ready. Waiting for serial data before sending...") + # logger.info(f"UDP transport ready. Waiting for serial data before sending.") # debug def set_serial_handler(self, serial_handler): self.serial_handler = serial_handler @@ -328,7 +336,7 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處 class SerialReceiverType(Enum): """連接類型""" TELEMETRY = auto() - XBEEAPI = auto() + XBEEAPI2AT = auto() OTHER = auto() @@ -388,7 +396,30 @@ class serial_manager: return False def shutdown(self): - pass + """停止 serial_manager 和其管理的所有 serial_object""" + # 自己在 running 狀態下才執行停止程序 + if not self.running: + logger.warning("serial_manager is not running") + return + + # 停止所有被管理的 serial_object + for serial_id in list(self.serial_objects.keys()): + self.remove_serial_link(serial_id) + + # 停止自己 + self.running = False + + # 解開事件循環的阻塞 + if self.loop: + self.loop.call_soon_threadsafe(self.loop.stop) + + # 等待線程結束 + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=5.0) + if self.thread.is_alive(): + logger.warning("serial_manager thread did not stop gracefully") + + logger.info("serial_manager thread END!") def _run_event_loop(self): """在獨立線程中運行 asyncio 事件循環""" @@ -414,53 +445,142 @@ class serial_manager: def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): - if self.loop is None: + if not self.running or not self.loop: logger.error("Event loop not running, cannot create serial link") return False # 檢查 serial port 有效 - self.check_serial_port(serial_port) + if not self.check_serial_port(serial_port, baudrate): + logger.error(f"Serial port {serial_port} validation failed") + return False - serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type) + # 使用 run_coroutine_threadsafe 執行協程並獲取結果 + future = asyncio.run_coroutine_threadsafe( + self._async_create_serial_link(serial_port, baudrate, target_port, receiver_type), + self.loop + ) - # 建立 UDP 處理器 並指定目標端口位置 - serial_obj.udp_handler = UDPHandler(target_port) - # 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配 try: - serial_obj.transport, serial_obj.protocol = await self.loop.create_datagram_endpoint( - lambda: serial_obj.udp_handler, - local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 - ) + # 等待結果,設定合理的超時時間 + result = future.result(timeout=5.0) + if result: + logger.info(f"Create Serial Link: {serial_port} -> UDP {target_port}") + return True + except asyncio.TimeoutError: + logger.error(f"Timeout creating serial link for {serial_port}") + return False except Exception as e: - logger.error(f"Cannot Create UDP Endpoint: {str(e)}") + logger.error(f"Failed to create serial link for {serial_port}: {e}") return False - # 建立 Serial 傳輸,將 UDP 處理器傳給它 + + async def _async_create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): + """在事件循環線程中執行實際的連接創建""" try: - serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler) + # 創建 serial_object 實例 + serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type) - _, serial_transport = await serial_asyncio.create_serial_connection( - self.loop, lambda: serial_obj.serial_handler, serial_port, baudrate=baudrate + # 建立 UDP 處理器並指定目標端口位置 + serial_obj.udp_handler = UDPHandler(target_port) + + # 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配 + udp_transport, udp_protocol = await self.loop.create_datagram_endpoint( + lambda: serial_obj.udp_handler, + local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 ) - except Exception as e: - logger.error(f"Cannot Create Serial Connection: {str(e)}") - serial_obj.transport.close() - return + serial_obj.transport = udp_transport + serial_obj.protocol = udp_protocol + + # logger.info(f"UDP endpoint created for {serial_port}") # debug - # self.serial_objects.append(serial_obj) - self.serial_objects[serial_count+1] = serial_obj - serial_count += 1 + # 建立 Serial 處理器,將 UDP 處理器傳給它 + serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port) - async def _async_create_serial_link(self, serial_port, baudrate, target_port): - pass + # 建立 Serial 連接 + serial_transport, _ = await serial_asyncio.create_serial_connection( + self.loop, + lambda: serial_obj.serial_handler, + serial_port, + baudrate=baudrate + ) + + # logger.info(f"Serial connection created for {serial_port}") # debug - def remove_serial_link(serial_id): - pass + # 將 serial_object 加入管理列表 + serial_id = self.serial_count + 1 + self.serial_objects[serial_id] = serial_obj + self.serial_count += 1 + + # logger.info(f"Serial object {serial_id} added to manager") # debug + return True + + except Exception as e: + logger.error(f"Exception in _async_create_serial_link for {serial_port}: {str(e)}") + # 清理已創建的資源 + if 'serial_obj' in locals(): + if hasattr(serial_obj, 'transport') and serial_obj.transport: + serial_obj.transport.close() + return False + + def remove_serial_link(self, serial_id): + """移除串口連接(線程安全方式)""" + # 確保事件循環正在運行 + if not self.loop: + logger.error("Event loop not running") + return False + + # 檢查 serial_id 是否存在 + if serial_id not in self.serial_objects: + logger.warning(f"Serial object {serial_id} not found") + return False + + # 使用 run_coroutine_threadsafe 執行協程 + future = asyncio.run_coroutine_threadsafe( + self._async_remove_serial_link(serial_id), + self.loop + ) + + try: + result = future.result(timeout=3.0) + if result: + logger.info(f"Remove Serial Link {serial_id}") + return result + except asyncio.TimeoutError: + logger.error(f"Timeout removing serial link {serial_id}") + return False + except Exception as e: + logger.error(f"Failed to remove serial link {serial_id}: {e}") + return False async def _async_remove_serial_link(self, serial_id): - pass + """在事件循環線程中執行實際的連接移除""" + if serial_id not in self.serial_objects: + logger.warning(f"Serial object {serial_id} not in managed list") + return False + + try: + serial_obj = self.serial_objects[serial_id] + + # 關閉 UDP transport + if hasattr(serial_obj, 'transport') and serial_obj.transport: + serial_obj.transport.close() + + # 關閉 Serial transport + if hasattr(serial_obj, 'serial_handler') and serial_obj.serial_handler: + if hasattr(serial_obj.serial_handler, 'transport') and serial_obj.serial_handler.transport: + serial_obj.serial_handler.transport.close() + + # 從管理列表中移除 + del self.serial_objects[serial_id] + # logger.info(f"Serial object {serial_id} removed from manager") # debug + return True + + except Exception as e: + logger.error(f"Exception in _async_remove_serial_link for {serial_id}: {str(e)}") + return False - def check_serial_port(serial_port): + @staticmethod + def check_serial_port(serial_port, baudrate): """檢查串口是否存在與可用""" # 檢查設備是否存在 if not os.path.exists(serial_port): @@ -469,7 +589,9 @@ class serial_manager: # 檢查是否有權限訪問設備 try: - os.access(serial_port, os.R_OK | os.W_OK) + if not os.access(serial_port, os.R_OK | os.W_OK): + logger.error(f"No permission to access {serial_port}") + return False except Exception as e: logger.error(f"Cannot Access Serial Device {serial_port}: {str(e)}") return False @@ -477,7 +599,7 @@ class serial_manager: # 檢查是否被占用 try: # 嘗試打開串口 - ser = serial.Serial(serial_port, SERIAL_BAUDRATE) + ser = serial.Serial(serial_port, baudrate) ser.close() # 打開成功後立即關閉 return True except serial.SerialException as e: @@ -488,12 +610,14 @@ class serial_manager: return False -if __main__ == '__main__': +if __name__ == '__main__': sm = serial_manager() sm.start() SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 SERIAL_BAUDRATE = 115200 - UDP_REMOTE_IP = '127.0.0.1' - UDP_REMOTE_PORT = 14560 - sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_IP, SerialReceiverType.XBEEAPI) \ No newline at end of file + UDP_REMOTE_PORT = 14571 + sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialReceiverType.XBEEAPI2AT) + + time.sleep(10) + sm.shutdown() \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py b/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py new file mode 100644 index 0000000..703ae36 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py @@ -0,0 +1,96 @@ +""" +Serial Port Discovery Utility + +This module provides functions to discover available serial ports on the system. +It uses glob pattern matching to find serial device files in /dev/. +""" + +import glob +from typing import List + + +def get_serial_ports() -> List[str]: + """ + 獲取系統中所有可用的串口設備列表 + + 在 Linux 系統中,會搜尋以下模式的設備: + - /dev/ttyUSB* (USB 串口設備) + - /dev/ttyACM* (USB CDC ACM 設備) + - /dev/ttyS* (標準串口) + + Returns: + List[str]: 包含所有找到的串口設備路徑的列表 + + Example: + >>> ports = get_serial_ports() + >>> print(ports) + ['/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/ttyS0'] + """ + serial_ports = [] + + # 搜尋不同類型的串口設備 + patterns = [ + '/dev/ttyUSB*', # USB 串口轉換器 + '/dev/ttyACM*', # USB CDC ACM 設備(如 Arduino) + '/dev/ttyS*', # 標準串口 + ] + + for pattern in patterns: + serial_ports.extend(glob.glob(pattern)) + + # 排序以便於閱讀 + serial_ports.sort() + + return serial_ports + + +def get_serial_ports_with_filter(filter_pattern: str = None) -> List[str]: + """ + 獲取串口設備列表,可選擇性地使用自訂篩選模式 + + Args: + filter_pattern (str, optional): 自訂的 glob 模式,例如 '/dev/ttyUSB*' + 如果為 None,則使用預設模式搜尋所有串口 + + Returns: + List[str]: 符合條件的串口設備路徑列表 + + Example: + >>> # 只搜尋 USB 串口 + >>> usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*') + >>> print(usb_ports) + ['/dev/ttyUSB0', '/dev/ttyUSB1'] + """ + if filter_pattern: + serial_ports = glob.glob(filter_pattern) + serial_ports.sort() + return serial_ports + else: + return get_serial_ports() + + +if __name__ == "__main__": + # 使用範例 + print("=== Serial Port Discovery ===\n") + + # 1. 獲取所有串口設備 + all_ports = get_serial_ports() + print(f"找到 {len(all_ports)} 個串口設備:") + for port in all_ports: + print(f" - {port}") + + print("\n" + "="*30 + "\n") + + # 2. 只搜尋 USB 串口 + usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*') + print(f"找到 {len(usb_ports)} 個 USB 串口設備:") + for port in usb_ports: + print(f" - {port}") + + print("\n" + "="*30 + "\n") + + # 3. 只搜尋 ACM 設備 + acm_ports = get_serial_ports_with_filter('/dev/ttyACM*') + print(f"找到 {len(acm_ports)} 個 ACM 設備:") + for port in acm_ports: + print(f" - {port}")