From e00880c6def8323ea4e366dbd0b871743596a1e7 Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Fri, 26 Dec 2025 12:17:00 +0800 Subject: [PATCH] =?UTF-8?q?1.=20(modify)=20acquireSerial.py=20=E6=92=88?= =?UTF-8?q?=E5=8F=96=E5=A4=9A=E5=80=8B=E7=AB=AF=E5=8F=A3=E5=AD=97=E4=B8=B2?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=202.=20(modify)=20=E8=AA=BF=E6=95=B4?= =?UTF-8?q?=E9=9D=A2=E6=9D=BF=E7=9A=84=E9=A1=AF=E7=A4=BA=E5=AD=97=E4=B8=B2?= =?UTF-8?q?=203.=20(remove)=20serial=5Fudp=5Fbitrans.py=20=E7=94=A8?= =?UTF-8?q?=E4=B8=8D=E5=88=B0=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fc_network_adapter/mainOrchestrator.py | 71 +++- .../fc_network_adapter/serialManager.py | 26 +- .../fc_network_adapter/serial_udp_bitrans.py | 340 ------------------ .../fc_network_adapter/utils/acquireSerial.py | 39 +- 4 files changed, 88 insertions(+), 388 deletions(-) delete mode 100644 src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py index 52655ff..4a9f16a 100644 --- a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -21,6 +21,7 @@ from pymavlink import mavutil # 自定義的 import from . import mavlinkObject as mo from . import serialManager as sm +from . import mavlinkVehicleView as mvv from .utils import RingBuffer, setup_logger from .utils import acquireSerial, acquirePort @@ -142,7 +143,7 @@ class ControlPanel: children = [] if not state.socket_object_list: - children.append(MenuNode("(空)", "目前沒有連結口", None)) + children.append(MenuNode("(Empty)", "目前沒有連結口", None)) else: total_items = len(state.socket_object_list) total_pages = (total_items + items_per_page - 1) // items_per_page @@ -158,7 +159,7 @@ class ControlPanel: MenuNode("Cancel Link", "取消轉發連結", "MAVOBJ_CANCEL_LINK"), MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"), MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), - MenuNode("返回", "回到列表", "BACK"), + MenuNode("GoUp", "回到列表", "BACK"), ]) # 將 socket_id 附加到每個子選單項目上 for child in obj_menu.children: @@ -177,7 +178,7 @@ class ControlPanel: next_node.page = page + 1 children.append(next_node) - children.append(MenuNode("返回", "回到上層選單", "BACK")) + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children) menu.current_page = page return menu @@ -294,10 +295,10 @@ class ControlPanel: # 獲取可用的 Serial 連接埠列表 # serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓 - serial_ports = acquireSerial.get_serial_ports_with_filter('/dev/ttyUSB*') + serial_ports = acquireSerial.get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*']) if not serial_ports: - children.append(MenuNode("(空)", "目前沒有串口設備", None)) + children.append(MenuNode("(Empty)", "目前沒有串口設備", None)) else: total_items = len(serial_ports) total_pages = (total_items + items_per_page - 1) // items_per_page @@ -312,12 +313,12 @@ class ControlPanel: # MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"), ]), MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"), - MenuNode("Link to Middleware", "方便功能 可以直接建立 UDP object", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[ + MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[ MenuNode("Yes", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_YES"), MenuNode("No", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_NO"), ]), MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"), - MenuNode("返回", "回到列表", "BACK"), + MenuNode("GoUp", "回到列表", "BACK"), ]) # 將 port 附加到每個子選單項目上 for child in port_menu.children: @@ -336,7 +337,7 @@ class ControlPanel: next_node.page = page + 1 children.append(next_node) - children.append(MenuNode("返回", "回到上層選單", "BACK")) + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children) menu.current_page = page return menu @@ -346,7 +347,7 @@ class ControlPanel: children = [] if not state.linked_serial_dict: - children.append(MenuNode("(空)", "目前沒有連結口", None)) + children.append(MenuNode("(Empty)", "目前沒有連結口", None)) else: total_items = len(state.linked_serial_dict) total_pages = (total_items + items_per_page - 1) // items_per_page @@ -361,7 +362,7 @@ class ControlPanel: MenuNode("Info", "查看詳細資訊", "INSPECT_LINKED_SERIAL"), MenuNode("Remove", "移除此連結口", "REMOVE_LINKED_SERIAL"), # MenuNode("Change UDP Target", "變更目標 UDP (工程)", "CHANGE_LINKED_SERIAL_TARGET"), - MenuNode("返回", "回到列表", "BACK"), + MenuNode("GoUp", "回到列表", "BACK"), ]) # 將 serial_id 附加到每個子選單項目上 for child in obj_menu.children: @@ -380,7 +381,7 @@ class ControlPanel: next_node.page = page + 1 children.append(next_node) - children.append(MenuNode("返回", "回到上層選單", "BACK")) + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children) menu.current_page = page return menu @@ -435,6 +436,49 @@ class ControlPanel: stdscr.clear() stdscr.refresh() +# ================ 關於載具檢視的部份 =================== + + def create_vehicles_list_menu(self, state: PanelState, page=0, items_per_page=8): + """動態創建 已連線載具 列表選單(支持分頁)""" + children = [] + + if not state.connected_vehicles_dict: + children.append(MenuNode("(Empty)", "目前沒有已連線的載具", None)) + else: + total_items = len(state.connected_vehicles_dict) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + vehicle_id_list = list(state.connected_vehicles_dict.keys()) + # 顯示當前頁的物件 + for vehicle_id in vehicle_id_list[start_idx:end_idx]: + vehicle_menu = MenuNode(f"Vehicle #{vehicle_id}", f"載具 {vehicle_id}", None, children=[ + MenuNode("Info", "查看詳細資訊", "INSPECT_VEHICLE"), + MenuNode("GoUp", "回到列表", "BACK"), + ]) + # 將 vehicle_id 附加到每個子選單項目上 + for child in vehicle_menu.children: + child.vehicle_id = vehicle_id + children.append(vehicle_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) + if page > 0: + prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) + menu = MenuNode("Connected Vehicles", f"已連線載具列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu + # ================ 關於 主要選單 的部份 =================== def menu_tree(self): @@ -457,8 +501,9 @@ class ControlPanel: ]), MenuNode("Serial Manager", "Serial 連接埠選項", children=[ MenuNode("New+", "新增 Serial 連接埠", action = "LIST_SERIAL_RES"), - MenuNode("ListAll", "顯示已連線的 Serial", action = "LIST_SERIAL_LINKS"), + MenuNode("ListAll", "顯示並管理已連線的 Serial", action = "LIST_SERIAL_LINKS"), ]), + MenuNode("Vehicles Insp.", "檢視已連線的遠端載具", action = "INSPECT_VEHICLES"), MenuNode("Engineer Mode", "工程模式", children=[ MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), @@ -865,6 +910,8 @@ class Orchestrator: self.stop_evt = stop_sig # 外部操作去中斷 "面板" 執行緒的訊號 (內部自己停止的話不需要用這個) self.occupied_ip_ports = {} # 紀錄已被佔用的 ip:port 組合 "ip str" : [port int, port int, ...] + self.vehicle_registry = mvv.vehicle_registry + # === 1) 面板部分的準備 === self.cmd_q = queue.Queue() self.panelState = PanelState() # 面板的狀態儲存 diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index c4e6dba..02588e1 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -237,7 +237,7 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial # 處理 AT Command 回應 # response = XBeeFrameHandler.parse_at_command_response(an_frame) # self.at_handler.handle_response(response) - pass # debug + pass elif frame_type == 0x90: # Receive Packet (RX) payload 先解碼 @@ -279,16 +279,6 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial # print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}") - # def write_to_serial(self, data): - # # 在資料透過 Serial 發送之前進行處理 - # processed_data = self.encapsulate_data(data) - - # # 處理失敗就不發送 - # if processed_data not None: - # self.transport.write(processed_data) - - - class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發 LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端IP @@ -311,25 +301,13 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處 # UDP 收到資料的處理過程 def datagram_received(self, data, addr): # 儲存對方的地址(這樣就能向同一個來源回傳數據) - # self.remote_addr = addr + # self.remote_addr = addr # debug # print(f"Received UDP data from {addr}, setting as remote address") processed_data = XBeeFrameHandler.encapsulate_data(data) if self.serial_handler: self.serial_handler.transport.write(processed_data) - - - # def send_udp(self, data): - # # 藉由 UDP 發送資料出去 - - # # 在透過 UDP 發送數據之前進行解封裝 - # decoded_data = self.decapsulate_data(data) - # if decoded_data is None: - # return - - # if self.transport: - # self.transport.sendto(decoded_data, (self.LOCAL_HOST_IP, self.target_port)) #================================================================== diff --git a/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py b/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py deleted file mode 100644 index cb4373c..0000000 --- a/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py +++ /dev/null @@ -1,340 +0,0 @@ -import asyncio -import serial_asyncio - -# 附加驗證功能 -import os -import sys -import serial -import signal - -# XBee 模組 -from xbee.frame import APIFrame - -# ========================= - -SERIAL_PORT = '/dev/ttyACM0' # serial port -SERIAL_BAUDRATE = 57600 # serial baudrate - -UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP -UDP_REMOTE_PORT = 14550 # UDP Target port - -# 測試用 只會吃一次資料 -DEBUG_MODE = False - -# ========================= - -def check_serial_port(): - """檢查串口是否存在與可用""" - # 檢查設備是否存在 - if not os.path.exists(SERIAL_PORT): - print(f"錯誤:串口設備 {SERIAL_PORT} 不存在") - return False - - # 檢查是否有權限訪問設備 - try: - os.access(SERIAL_PORT, os.R_OK | os.W_OK) - except Exception as e: - print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}:{str(e)}") - return False - - # 檢查是否被占用 - try: - # 嘗試打開串口 - ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) - ser.close() # 打開成功後立即關閉 - return True - except serial.SerialException as e: - print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") - return False - except Exception as e: - print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") - return False - -# ========================= - -class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 - def __init__(self, udp_protocol): - self.udp_protocol = udp_protocol - self.first_data = True # 標記是否為第一次收到資料 - self.has_processed = False # 測試模式用 處理數據旗標 # debug - - def connection_made(self, transport): - self.transport = transport - if hasattr(self.udp_protocol, 'set_serial_transport'): - self.udp_protocol.set_serial_transport(self) - print(f"Serial connection established on {SERIAL_PORT}") - - ## ===================================== - - # Serial 收到資料,轉發到 UDP - def data_received(self, data): - # 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug - if DEBUG_MODE and self.has_processed: - return - - # 標記首次收到資料 - if hasattr(self.udp_protocol, 'send_udp'): - if self.first_data: - print(f"First data received from serial, forwarding to UDP: {data[:20]}...") - self.first_data = False - - # 轉發數據 - self.udp_protocol.send_udp(data) - - if DEBUG_MODE: # 測試模式用 # debug - self.has_processed = True - print("[DEBUG] SerialToUDP Mark") - - def write_to_serial(self, data): - # 在資料透過 Serial 發送之前進行處理 - processed_data = self.encapsulate_data(data) - - # 處理失敗就不發送 - if processed_data not None: - self.transport.write(processed_data) - - def encapsulate_data(self, data): - - """ - 將數據封裝為 XBee API 傳輸幀 - - 使用 XBee API 格式封裝數據: - - 傳輸請求幀 (0x10) - - 使用廣播地址 - - 添加適當的頭部和校驗和 - """ - - ## 方法一 - # 設置幀參數 - frame_id = 0x01 # 幀識別碼,用於確認 - frame_type = 0x10 # 幀類型:傳輸請求 - dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播) - dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播) - broadcast_radius = 0x00 # 廣播半徑 (0 = 最大) - options = 0x00 # 傳輸選項 - - # 構建幀數據部分 - frame_data = bytearray() - frame_data.append(frame_type) # 添加幀類型 - frame_data.append(frame_id) # 添加幀 ID - frame_data.extend(dest_addr64) # 添加 64 位目標地址 - frame_data.extend(dest_addr16) # 添加 16 位目標地址 - frame_data.append(broadcast_radius) # 添加廣播半徑 - frame_data.append(options) # 添加選項 - frame_data.extend(data) # 添加實際數據內容 - - # 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位) - checksum = 0xFF - (sum(frame_data) & 0xFF) - - # 構建完整的 API 幀 - # 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和 - complete_frame = bytearray() - complete_frame.append(0x7E) # 添加起始分隔符 - complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先) - complete_frame.extend(frame_data) # 添加幀數據 - complete_frame.append(checksum) # 添加校驗和 - - return bytes(complete_frame) - - ## 方法二 - # frame_id=0x01 - # frame_type = 0x10 - # dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 - # dest_addr16 = b'\xFF\xFE' - # broadcast_radius = 0x00 - # options = 0x00 - - # frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) - # frame += dest_addr64 + dest_addr16 - # frame += struct.pack(">BB", broadcast_radius, options) + data - # checksum = 0xFF - (sum(frame) & 0xFF) - # return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) - - -class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發 - def __init__(self): - self.serial_transport = None - self.transport = None - self.remote_addr = None # 儲存動態獲取的遠程地址 - self.has_processed = False # 測試模式用 處理數據旗標 # debug - - def connection_made(self, transport): - self.transport = transport - print("UDP transport ready. Waiting for serial data before sending...") - - def set_serial_transport(self, serial_transport): - self.serial_transport = serial_transport - - ## ===================================== - - # UDP 收到資料 - def datagram_received(self, data, addr): - # 儲存對方的地址(這樣就能向同一個來源回傳數據) - # self.remote_addr = addr - # print(f"Received UDP data from {addr}, setting as remote address") - - # 在 DEBUG 模式下,如果已經處理過數據,則直接返回 - if DEBUG_MODE and self.has_processed: - return - - if self.serial_transport: - self.serial_transport.write_to_serial(data) - - if DEBUG_MODE: # 測試模式用 - self.has_processed = True - print("[DEBUG] UDPHandler Mark") - - def send_udp(self, data): - # 發送資料到 UDP - - # if self.transport: - # # 如果有已知的回應地址,使用該地址 - # if self.remote_addr: - # self.transport.sendto(data, self.remote_addr) - # # print(f"Sending to dynamic address: {self.remote_addr}") - # else: - # # 否則使用預設地址 - # self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) - # print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}") - - if self.transport: - # 只有第一次或地址改變時才顯示 - # if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT): - # print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}") - # self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT) - - # 在透過 UDP 發送數據之前進行解封裝 - decoded_data = self.decapsulate_data(data) - self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) - - def decapsulate_data(self, data): - # 這裡可以根據需要進行數據解封裝 - - ## 方法一 - try: - # 創建一個 API 幀處理器 - api_frame = APIFrame(escaped=True) - - # 嘗試解析數據 - api_frame.fill(data) - - # 如果數據不完整,直接返回原始數據 - if not api_frame.is_complete(): - return data - - # 解析幀內容 - parsed_data = api_frame.parse() - - # 提取有用數據 - if parsed_data: - frame_data = parsed_data.get('rf_data', None) - if frame_data: - return frame_data - - return data - - ## 方法二 - 手動解析 - # try: - # # XBee API 幀格式: - # # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節) - - # # 檢查幀起始符 (0x7E) - # if not data or len(data) < 5 or data[0] != 0x7E: - # return data - - # # 獲取數據長度 (不包括校驗和) - # length = (data[1] << 8) + data[2] - - # # 檢查幀完整性 - # if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 - # return data - - # # 提取API標識符和數據 - # frame_type = data[3] - # frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中 - - # # 根據不同的幀類型進行處理 - # if frame_type == 0x90: # 例如,這是"接收數據包"類型 - # # 提取實際有效載荷 - # # 對於接收數據包,格式通常是: - # # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據 - # if len(frame_data) >= 12: # 確保數據長度足夠 - # payload = frame_data[11:] # 前11字節是地址和選項 - # return payload - # return data - - # 如果無法提取 則回傳 None - except Exception as e: - print(f"手動解析 XBee 數據錯誤: {e}") - return None - -async def main(): - # 先檢查串口是否可用 - if not check_serial_port(): - print("程式終止:串口檢查失敗") - return - - loop = asyncio.get_running_loop() - - # 設置終止處理 - for sig in (signal.SIGINT, signal.SIGTERM): - loop.add_signal_handler( - sig, - lambda: asyncio.create_task(shutdown(loop)) - ) - - # 建立單一 UDP 端點處理收發 - udp_handler = UDPHandler() - - # 建立 UDP 傳輸,不指定接收端口,讓系統自動分配 - try: - udp_transport, protocol = await loop.create_datagram_endpoint( - lambda: udp_handler, - local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 - ) - except Exception as e: - print(f"無法創建 UDP 端點:{str(e)}") - return - - # 獲取系統分配的本地端口 - sock = udp_transport.get_extra_info('socket') - local_addr = sock.getsockname() - print(f"UDP listening on {local_addr[0]}:{local_addr[1]}") - - # 建立 Serial 傳輸,將 UDP 處理器傳給它 - try: - serial_proto = SerialToUDP(udp_handler) - _, serial_transport = await serial_asyncio.create_serial_connection( - loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE - ) - except Exception as e: - print(f"無法建立串口連接:{str(e)}") - udp_transport.close() - return - - print(f"Waiting for data from serial port to initiate UDP communication...") - - # 保持運行 - try: - await asyncio.Future() - except asyncio.CancelledError: - pass - -async def shutdown(loop): - """關閉程序""" - print("Shutting down...") - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - - for task in tasks: - task.cancel() - - await asyncio.gather(*tasks, return_exceptions=True) - loop.stop() - -if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("程式被使用者中斷") - except Exception as e: - print(f"程式執行錯誤:{str(e)}") \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py b/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py index 703ae36..1e1a73f 100644 --- a/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py +++ b/src/fc_network_adapter/fc_network_adapter/utils/acquireSerial.py @@ -6,7 +6,7 @@ It uses glob pattern matching to find serial device files in /dev/. """ import glob -from typing import List +from typing import List, Union def get_serial_ports() -> List[str]: @@ -44,29 +44,44 @@ def get_serial_ports() -> List[str]: return serial_ports -def get_serial_ports_with_filter(filter_pattern: str = None) -> List[str]: +def get_serial_ports_with_filter(filter_patterns: Union[str, List[str]] = None) -> List[str]: """ 獲取串口設備列表,可選擇性地使用自訂篩選模式 Args: - filter_pattern (str, optional): 自訂的 glob 模式,例如 '/dev/ttyUSB*' - 如果為 None,則使用預設模式搜尋所有串口 + filter_patterns (Union[str, List[str]], optional): + 單一或多個 glob 模式 + - 字串: '/dev/ttyUSB*' + - 列表: ['/dev/ttyUSB*', '/dev/ttyACM*'] + 如果為 None,則使用預設模式搜尋所有串口 Returns: List[str]: 符合條件的串口設備路徑列表 Example: - >>> # 只搜尋 USB 串口 - >>> usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*') - >>> print(usb_ports) + >>> # 單一 pattern + >>> ports = get_serial_ports_with_filter('/dev/ttyUSB*') + >>> print(ports) ['/dev/ttyUSB0', '/dev/ttyUSB1'] + + >>> # 多個 patterns + >>> ports = get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*']) + >>> print(ports) + ['/dev/ttyACM0', '/dev/ttyUSB0', '/dev/ttyUSB1'] """ - if filter_pattern: - serial_ports = glob.glob(filter_pattern) - serial_ports.sort() - return serial_ports - else: + if filter_patterns is None: return get_serial_ports() + + # 統一轉成 list 處理 + if isinstance(filter_patterns, str): + filter_patterns = [filter_patterns] + + serial_ports = [] + for pattern in filter_patterns: + serial_ports.extend(glob.glob(pattern)) + + serial_ports.sort() + return serial_ports if __name__ == "__main__":