From c12959d964d99a474c52a1dd8eed572a0cbc0676 Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Thu, 8 Jan 2026 13:00:00 +0800 Subject: [PATCH] =?UTF-8?q?(Sort=20Out)=201.=20=E6=96=B0=E5=A2=9E=20fc=5Fn?= =?UTF-8?q?etwork=5Fadapter.md=20=E6=95=B4=E7=90=86=E4=B8=A6=E8=A8=98?= =?UTF-8?q?=E9=8C=84=E5=B0=88=E6=A1=88=E7=B5=90=E6=A7=8B=20=E7=A8=8B?= =?UTF-8?q?=E5=BC=8F=E5=8A=9F=E8=83=BD=E7=84=A1=E6=94=B9=E5=8B=95=20?= =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=8E=92=E7=89=88=E8=88=87=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fc_network_adapter/fc_network_adapter.md | 166 ++ .../fc_network_adapter/mainOrchestrator.py | 1673 ++++++++--------- .../fc_network_adapter/mavlinkObject.py | 16 +- .../fc_network_adapter/serialManager.py | 8 +- 4 files changed, 1013 insertions(+), 850 deletions(-) create mode 100644 src/fc_network_adapter/fc_network_adapter/fc_network_adapter.md diff --git a/src/fc_network_adapter/fc_network_adapter/fc_network_adapter.md b/src/fc_network_adapter/fc_network_adapter/fc_network_adapter.md new file mode 100644 index 0000000..98dba3a --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/fc_network_adapter.md @@ -0,0 +1,166 @@ + +這個檔案整理 此專案下 程式代碼的流程與思路 +只會挑出重要的變數與方法描述 +以利後續開發使用 + + + +# 檔案結構 + +特別注意: +1. 有標註 [async method] 都是不該被直接呼叫的內部方法 + +- *valuable* 這個是變數 **沒有括號** +- *method (parameters...)* 這個是方法 **有括號** + +## mainOrchestrator.py : 程式進入點 + +### **[Class]** Orchestrator + 最上層的發配資源與啟動終端機面板的調配者 +- *self.manager* 存放 async_io_manager 實例 +- *self.bridge* 存放 mavlink_bridge 實例 +- *self.plumber* 存放 serial_manager 實例 +- *self.vehicle_registry* 存放 vehicle_registry 實例 + +- *self.panel_thread* 面板的執行緒 +- *self.panelState* 暫存面板與調配者互動的資料流動區 + - 面板運行狀態 + - 面板操作結果 + - 其他模組的運行狀態 +--- +- *mainLoop()* 核心方法 + - 更新個模組狀態到 *self.panelState* + - 對應面板來的操作指令 +--- + 對於 async_io_manager 控制實現 +- *create_udp_object()* +- *delete_udp_object()* +- *add_target_to_object()* +- *remove_target_from_object()* +--- + 關於載具管理與檢視 +- *_update_vehicles_list()* +- *_prepare_vehicle_info* +--- + 關於 serial_manager 控制實現 +- *create_serial_port_object()* + + +### **[Class]** ControlPanel + 面板的核心運行物件 + 把自己的變數 獨立出來都放到 PanelState 去 +- *panel_thread()* 核心方法 + - 主選單的引入 + - 主選單下所有的按鍵操作 + - 定義所有人為操作後續面板執行緒行為 +- *menu_tree()* 基礎選單的定義檔 +--- + 關於 udp object 的操作 +- *create_object_list_menu()* object 選單的定義檔 +- *show_object_info()* 顯示 object 資訊 +- *select_target_socket()* object 對於轉拋功能的操作 +--- + 關於 serial 的操作 +- *create_serial_port_menu()* +- *create_linked_serial_menu()* +- *show_linked_serial_info()* +--- + 關於載具檢視與操作 +- *create_vehicles_list_menu()* +- *show_vehicle_info()* + +### **[Class]** PanelState + 作為面板執行緒(ControlPanel)與調配者(Orchestrator)溝通的管道 + 不包含具體實作方法 是 ControlPanel 的延伸 +- *self.panel_info_msg_list* 顯示在面板上的資訊訊息 + +## mavlinkObject.py + +### 全域變數 +- *stream_bridge_ring* +- *return_packet_ring* + +### **[Class]** mavlink_bridge + 唯一實例 + 實際去解析 mavlink 封包的地方 + 接收 stream_bridge_ring 與 return_packet_ring 的資料 +- *self.thread* 自己的執行緒 +--- +- *_run_thread()* 核心方法 +- *_handle_XXXXX()* 每一種單項 mavlink 封包的解析 + +### **[Class]** async_io_manager + 唯一實例 + 異步 event loop + 管理 mavlink_object 的地方 + 沒有核心方法 +- *self.thread* 自己的執行緒 +- *self.managed_objects* 資料結構 socket_id: mavlink_object +--- +- *add_mavlink_object(mavlink_object)* [call method] 把一個 mavlink_object 物件加入管理 +- *_async_add_mavlink_object(mavlink_object)* [async method] 對應上面的內部方法 不該直接使用 +- *remove_mavlink_object(socket_id)* [call method] 從管理區把指定 mavlink_object 移除 + +### **[Class]** mavlink_object + 儲存 mavlink socket + 處理 mavlink 封包分流的地方 +- *cls.mavlinkObjects* 資料結構 { socket_id(序號) : mavlink_object(物件實例) } +- *self.mavlink_socket* 從 pymavlink 繼承的socket物件 +- *self.state* 描述這個 socket 物件的狀態 +--- +- *process_data()* [async method] 核心方法 +- *remove_target_socket()* *add_target_socket()* +- *send_message()* + +## serialManager.py + 看這個檔案的重點再於要搞清楚 端口物件 還是 傳輸物件 + +### **[Class]** serial_manager + 異步 event loop + 管理 mavlink_object 的地方 +- *self.thread* 自己的執行緒 +- *self.loop* 自己的事件迴圈 +- *self.serial_objects* 存放管理的物件 serial id num : serial_object +--- +- *create_serial_link()* [call method] 把 serial 端口跟 UDP 端口打通 +- *_async_create_serial_link()* [async method] 把兩種端口接起來的重點程序 +- *remove_serial_link()* [call method] 關閉指定的 serial 端口 +- *_async_remove_serial_link()* [async method] + +### **[Class]** serial_object + 被塞在 serial_manager 裡面 + 只是一個變數物件 + 用來被儲存 serial 的資訊 +- *self.transport* +- *self.protocol* +- *self.udp_handler* UDP 端口物件 +- *self.serial_handler* Serial 端口物件 + +### **[Class]** UDPHandler + 處理 UDP 收發的端口 作為一個端口物件 + 作為 UDP OutBound 使用 所以不會佔用系統監聽資源 +- *self.transport* 自己的傳輸物件 +--- +- *datagram_received()* 先加碼成 Xbee 再呼叫 Serial 端口物件送出 + +### **[Class]** SerialHandler + 處理 Serial 收發的端口 作為一個端口物件 +- *self.transport* 自己的傳輸物件 +--- +- *data_received()* 先組合 Serial 封包 再解碼 再呼叫 UDP 端口物件送出 + +## mavlinkVehicleView.py + 這個檔案是作為載具的資訊暫存庫使用 會搭配 ROS2 的功能 再做利用 + +# 開發記錄 + +## 已實現功能 +1. mavlink 分流解析 +2. mavlink socket 建立 +3. mavlink socket 轉拋 +4. 連結 Serial 轉 UDP +5. 各單元模組化 +6. 終端機介面控制 +7. 基礎載具流量觀測 + + diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py index ed78fbd..f77d1ad 100644 --- a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -27,9 +27,8 @@ from .utils import RingBuffer, setup_logger from .utils import acquireSerial, acquirePort from .utils.acquirePort import find_available_port - - logger = setup_logger(os.path.basename(__file__)) +VERSION_NO = "v0.56" class PanelState: def __init__(self): @@ -84,8 +83,8 @@ class PanelState: def intoSTOPPED(self): self.panel_status = "Stopped" - def set_user_input(self, text): - self.user_input = text + # def set_user_input(self, text): + # self.user_input = text class MenuNode: def __init__(self, name, desc="", action=None, children=None): @@ -152,726 +151,213 @@ class ControlPanel: return user_input -# ================ 關於 mavlink object 的部份 =================== - - def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8): - """動態創建 mavlink_object 列表選單(支持分頁)""" - children = [] - - if not state.socket_object_list: - children.append(MenuNode("(Empty)", "目前沒有連結口", None)) - else: - total_items = len(state.socket_object_list) - total_pages = (total_items + items_per_page - 1) // items_per_page - start_idx = page * items_per_page - end_idx = min(start_idx + items_per_page, total_items) - - # 顯示當前頁的物件 - for socket_id in state.socket_object_list[start_idx:end_idx]: - # 為每個 socket 創建子選單 - obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[ - MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"), - MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"), - MenuNode("Cancel Link", "取消轉發連結", "MAVOBJ_CANCEL_LINK"), - MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"), - MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), - MenuNode("GoUp", "回到列表", "BACK"), - ]) - # 將 socket_id 附加到每個子選單項目上 - for child in obj_menu.children: - child.socket_id = socket_id - children.append(obj_menu) + # ================ 關於 主要選單 的部份 =================== - # 添加分頁控制 - if total_pages > 1: - children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) - if page > 0: - prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") - prev_node.page = page - 1 - children.append(prev_node) - if page < total_pages - 1: - next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") - next_node.page = page + 1 - children.append(next_node) + def menu_tree(self): + """建立多層選單結構""" + return MenuNode("Main Menu", children=[ + MenuNode("MavLink Object", "UDP MavLink 通道選項", children=[ + MenuNode("New+", children=[ + MenuNode("UDP InBound", children=[ + MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"), + MenuNode("Port(Listen)", "設定監聽的 Port", "TEXT_UDP_PORT"), + MenuNode("Create", "建立 UDP InBound 連結口", "CREATE_UDP_INBOUND"), + ]), + MenuNode("UDP OutBound", children=[ + MenuNode("IP(Target)", "設定目標的 IP 位址", "TEXT_UDP_IP"), + MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"), + MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"), + ]), + ]), + MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"), + ]), + MenuNode("Serial Manager", "Serial 連接埠選項", children=[ + MenuNode("New+", "新增 Serial 連接埠", action = "LIST_SERIAL_RES"), + MenuNode("ListAll", "顯示並管理已連線的 Serial", action = "LIST_SERIAL_LINKS"), + ]), + MenuNode("Vehicles Insp.", "檢視已連線的遠端載具", action = "INSPECT_VEHICLES"), + MenuNode("Engineer Mode", "工程模式", children=[ + MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), + MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), + MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"), + ]), + MenuNode("Shutdown", "關閉整個系統", children=[ + MenuNode("Return", "繼續運行", "BACK"), + MenuNode("Confirm", "關閉系統", "QUIT"), + ]), + ]) - children.append(MenuNode("GoUp", "回到上層選單", "BACK")) - menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children) - menu.current_page = page - return menu + def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event): + stdscr = None + + def cleanup(): + """清理 curses 狀態""" + if stdscr: + stdscr.keypad(False) + curses.nocbreak() + curses.echo() + curses.endwin() - def show_object_info(self, stdscr, socket_id, state: PanelState): - """顯示物件詳細資訊的對話框""" + def pre_panel_shutdown(): + # 先關閉所有模組 再關閉面板 + cmd_q.put("SHUTDOWN_BRIDGE") + cmd_q.put("SHUTDOWN_MANAGER") + cmd_q.put("SHUTDOWN_SERIAL_MANAGER") - start = time.time() - while not state.socket_info_single.get('InfoReady', False): - # 太久沒有回應 - if time.time() - start > 2: - state.panel_info_msg_list.append(("Fail! Socket Info NOT Aquire!", time.time())) - return - time.sleep(0.05) # 等待資訊準備好 + def draw_menu(screen): + nonlocal stdscr + stdscr = screen + + curses.curs_set(0) + stdscr.nodelay(False) # 阻塞讀鍵 + stdscr.keypad(True) - height, width = stdscr.getmaxyx() - dialog_height = 15 - dialog_width = min(70, width - 4) - start_y = (height - dialog_height) // 2 - start_x = (width - dialog_width) // 2 - - dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) - dialog_win.border() - dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD) - - # 這裡顯示基本資訊 - dialog_win.addstr(2, 2, f"Socket ID : {socket_id}") - dialog_win.addstr(3, 2, f"Socket status : {state.socket_info_single.get('socket_state', 'N/A')}") - # show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', ''))) - dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}") - dialog_win.addstr(4, 30, f"{state.socket_info_single.get('socket_connection_string', '')}") - show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', ''))) - dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}") - show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', ''))) - dialog_win.addstr(6, 2, f"Return Pack : {show_str if show_str else 'N/A'}") - dialog_win.addstr(7, 2, f"Primary Socket ID: {state.socket_info_single.get('primary_socket_id', 'It Self')}") - show_str = ",".join(map(str, state.socket_info_single.get('target_sockets', ''))) - dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}") + # 選單導航狀態 + menu_stack = [self.menu_tree()] # 選單堆疊 + idx_stack = [0] # 索引堆疊 - state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用 - - dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...") - dialog_win.refresh() - - dialog_win.getch() - del dialog_win - stdscr.clear() - stdscr.refresh() - - def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False): - """選擇目標 socket 的對話框""" - height, width = stdscr.getmaxyx() - dialog_height = min(15, len(state.socket_object_list) + 5) - dialog_width = min(50, width - 4) - start_y = (height - dialog_height) // 2 - start_x = (width - dialog_width) // 2 - - dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) - dialog_win.keypad(True) - - title = "選擇要移除的目標" if remove_mode else "選擇轉發目標" - available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id] - - if not available_sockets: - dialog_win.border() - dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD) - dialog_win.addstr(2, 2, "沒有可用的目標") - dialog_win.addstr(4, 2, "按任意鍵返回...") - dialog_win.refresh() - dialog_win.getch() - del dialog_win - stdscr.clear() - stdscr.refresh() - return None - - selected_idx = 0 - - while True: - dialog_win.clear() - dialog_win.border() - dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD) - - for i, socket_id in enumerate(available_sockets): - marker = "➤" if i == selected_idx else " " - attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL - dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr) - - dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消") - dialog_win.refresh() - - ch = dialog_win.getch() + state.intoSTART() # 設定狀態為運行中 - if ch in (curses.KEY_UP, ord('k')): - selected_idx = (selected_idx - 1) % len(available_sockets) - elif ch in (curses.KEY_DOWN, ord('j')): - selected_idx = (selected_idx + 1) % len(available_sockets) - elif ch in (curses.KEY_ENTER, 10, 13): - result = available_sockets[selected_idx] - del dialog_win - stdscr.clear() - stdscr.refresh() - return result - elif ch == 27: # ESC - del dialog_win + while not stop_evt.is_set(): + + current_menu = menu_stack[-1] + current_idx = idx_stack[-1] + + # 獲取終端機尺寸 + height, width = stdscr.getmaxyx() + # 簡單暴力的限制視窗的大小 + MIN_HEIGHT = ( + 2 + # 邊界 + 6 + # 狀態列 操作說明列 一個空白 + 11+ # 最大選單 與 空白區 + 5 # 訊息區域 + ) + if height < MIN_HEIGHT or width < 60: + logger.error("Terminal size too small for Control Panel.") + break + stdscr.clear() - stdscr.refresh() - return None + stdscr.border() -# ================ 關於 serial link 的部份 =================== + # 更新模組狀態顯示 + stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD) + stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}") + stdscr.addstr(2, 2, f"Object Manager State : {state.object_manager_state}") + stdscr.addstr(3, 2, f"Mavlink Bridge State : {state.mavlink_bridge_state}") + stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}") + stdscr.addstr(2, 36, f"Serial Manager State : {state.serial_manager_state}") - def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8): - """動態創建 serial port 列表選單(支持分頁)""" - children = [] - - # 獲取可用的 Serial 連接埠列表 - # serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓 - serial_ports = acquireSerial.get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*']) - - if not serial_ports: - children.append(MenuNode("(Empty)", "目前沒有串口設備", None)) - else: - total_items = len(serial_ports) - total_pages = (total_items + items_per_page - 1) // items_per_page - start_idx = page * items_per_page - end_idx = min(start_idx + items_per_page, total_items) - - # 顯示當前頁的串口 - for port in serial_ports[start_idx:end_idx]: - port_menu = MenuNode(f"{port}", children=[ - MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[ - MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"), - # MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"), - ]), - MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"), - MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[ - MenuNode("Yes", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_YES"), - MenuNode("No", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_NO"), - ]), - MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"), - MenuNode("GoUp", "回到列表", "BACK"), - ]) - # 將 port 附加到每個子選單項目上 - for child in port_menu.children: - child.port = port - children.append(port_menu) - - # 添加分頁控制 - if total_pages > 1: - children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) - if page > 0: - prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") - prev_node.page = page - 1 - children.append(prev_node) - if page < total_pages - 1: - next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") - next_node.page = page + 1 - children.append(next_node) - - children.append(MenuNode("GoUp", "回到上層選單", "BACK")) - menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children) - menu.current_page = page - return menu + # 顯示當前選單項目 + start_line = 6 + for i, child in enumerate(current_menu.children): + marker = "➤ " if i == current_idx else " " + # 動態顯示已輸入的值 + desc = child.desc + if child.action == "TEXT_UDP_IP" and state.udp_info_temp["IP"]: + desc = f"{child.desc} [{state.udp_info_temp['IP']}]" + elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]: + desc = f"{child.desc} [{state.udp_info_temp['Port']}]" + elif child.action == "SET_SERIAL_COMM" and state.serial_info_temp["CommunicationType"]: + desc = f"{child.desc} [{state.serial_info_temp['CommunicationType']}]" + elif child.action == "TEXT_BAUD_SERIAL" and state.serial_info_temp["Baud"]: + desc = f"{child.desc} [{state.serial_info_temp['Baud']}]" + elif child.action == "LINK_SERIAL_TO_MIDDLEWARE_UDP": + link_status = "Yes" if state.serial_info_temp["Go2Middleware"] else "No" + desc = f"{child.desc} [{link_status}]" + + line = f"{marker}{child.name:15s} – {desc}" + attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL + stdscr.addstr(start_line + i, 4, line, attr) - def create_linked_serial_menu(self, state: PanelState, page=0, items_per_page=8): - """動態創建 已連線的 serial port 列表選單(支持分頁)並包含後續管理功能""" - children = [] - - if not state.linked_serial_dict: - children.append(MenuNode("(Empty)", "目前沒有連結口", None)) - else: - total_items = len(state.linked_serial_dict) - total_pages = (total_items + items_per_page - 1) // items_per_page - start_idx = page * items_per_page - end_idx = min(start_idx + items_per_page, total_items) - - # 顯示當前頁的物件 - linked_serial_id_list = list(state.linked_serial_dict.keys()) - for serial_id in linked_serial_id_list[start_idx:end_idx]: - # 為每個 socket 創建子選單 - obj_menu = MenuNode(f"Serial #{serial_id}", f"連結口 {serial_id}", None, children=[ - MenuNode("Info", "查看詳細資訊", "INSPECT_LINKED_SERIAL"), - MenuNode("Remove", "移除此連結口", "REMOVE_LINKED_SERIAL"), - # MenuNode("Change UDP Target", "變更目標 UDP (工程)", "CHANGE_LINKED_SERIAL_TARGET"), - MenuNode("GoUp", "回到列表", "BACK"), - ]) - # 將 serial_id 附加到每個子選單項目上 - for child in obj_menu.children: - child.serial_id = serial_id - children.append(obj_menu) - - # 添加分頁控制 - if total_pages > 1: - children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) - if page > 0: - prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") - prev_node.page = page - 1 - children.append(prev_node) - if page < total_pages - 1: - next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") - next_node.page = page + 1 - children.append(next_node) - - children.append(MenuNode("GoUp", "回到上層選單", "BACK")) - menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children) - menu.current_page = page - return menu + # 顯示訊息區域 + # info_start_line = start_line + len(current_menu.children) + 1 + info_start_line = height - 8 + max_msg_lines = 5 # 最多顯示 5 行訊息 + current_time = time.time() + + # 清理過時的訊息 + state.panel_info_msg_list = [ + (msg, timestamp) for msg, timestamp in state.panel_info_msg_list + if current_time - timestamp < 2.0 #秒數 + ] + + # 只顯示最新的 max_msg_lines 條訊息 + display_msgs = state.panel_info_msg_list[-max_msg_lines:] + + for i, msg_data in enumerate(display_msgs): + if info_start_line + i >= help_line - 1: # 避免超出邊界 + break + msg = msg_data[0] if isinstance(msg_data, tuple) else msg_data + # 截斷過長的訊息 + max_msg_width = width - 6 + if len(msg) > max_msg_width: + msg = msg[:max_msg_width-3] + "..." - def show_linked_serial_info(self, stdscr, serial_id, state: PanelState): - """顯示 Serial 連結詳細資訊的對話框""" + stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD) - start = time.time() - while not state.serial_info_single.get('InfoReady', False): - # 太久沒有回應 - if time.time() - start > 2: - state.panel_info_msg_list.append(("Fail! Serial Info NOT Aquire!", time.time())) - return - time.sleep(0.05) # 等待資訊準備好 + + + # 操作說明 + # help_line = start_line + len(current_menu.children) + 2 + help_line = height - 2 + stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM) + stdscr.addstr(height-1 , width-12, f" {VERSION_NO} ", curses.A_DIM) - height, width = stdscr.getmaxyx() - dialog_height = 15 - dialog_width = min(70, width - 4) - start_y = (height - dialog_height) // 2 - start_x = (width - dialog_width) // 2 - - dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) - dialog_win.border() - dialog_win.addstr(0, 2, f" Serial Link #{serial_id} 詳細資訊 ", curses.A_BOLD) + stdscr.refresh() - # 從 linked_serial_dict 獲取資訊 - serial_info = state.linked_serial_dict.get(serial_id, {}) - - if not serial_info: - dialog_win.addstr(2, 2, f"無法取得 Serial #{serial_id} 的資訊") - else: - # 顯示基本資訊 - dialog_win.addstr(2, 2, f"Serial ID : {serial_id}") - dialog_win.addstr(3, 2, f"Serial Port : {state.serial_info_single.get('serial_port', 'N/A')}") - dialog_win.addstr(4, 2, f"Baudrate : {state.serial_info_single.get('baudrate', 'N/A')}") - dialog_win.addstr(5, 2, f"Communication : {state.serial_info_single.get('receiver_type', 'N/A')}") - dialog_win.addstr(6, 2, f"Target UDP Port : {state.serial_info_single.get('target_port', 'N/A')}") - dialog_win.addstr(7, 2, f"Status : {state.serial_info_single.get('status', 'Running')}") - - # 如果有額外資訊可以繼續添加 - if 'thread_alive' in serial_info: - thread_status = "Alive" if serial_info['thread_alive'] else "Stopped" - dialog_win.addstr(8, 2, f"Thread Status : {thread_status}") - - state.serial_info_single['InfoReady'] = False # 重置狀態以便下次使用 + # 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作 + # 驗證 其他附屬模組的狀態都停止後 就進入 STOPPED 狀態並跳出迴圈 + # 超過幾秒沒有反應就強制關閉 + if state.panel_status == "Terminating": + if time.time() - state.termination_start_time > 7: # 其他組件設定5秒 這邊給多一點 + logger.warning("Control Panel forced shutdown after timeout.") + state.intoSTOPPED() + # stop_evt.set() + # continue + break + time.sleep(0.1) + if (state.mavlink_bridge_state == "Stopped" and + state.object_manager_state == "Stopped" and + state.serial_manager_state == "Stopped"): + state.intoSTOPPED() + # stop_evt.set() + break + continue - dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...") - dialog_win.refresh() - - dialog_win.getch() - del dialog_win - stdscr.clear() - stdscr.refresh() + # 設定短暫的 timeout,讓執行緒能夠響應 stop_evt + stdscr.timeout(100) + ch = stdscr.getch() + + if ch == -1: # 沒有操作 + continue + + # 處理按鍵 + if ch in (curses.KEY_UP, ord('k')): + idx_stack[-1] = (current_idx - 1) % len(current_menu.children) + + elif ch in (curses.KEY_DOWN, ord('j')): + idx_stack[-1] = (current_idx + 1) % len(current_menu.children) -# ================ 關於載具檢視的部份 =================== + elif ch == (ord('O')): + # 進入工程模式 + state.intoENGINEER() - def create_vehicles_list_menu(self, state: PanelState, page=0, items_per_page=8): - """動態創建 已連線載具 列表選單(支持分頁) - 每個 vehicle-component 組合都是獨立的選單項目 - """ - children = [] - - if not state.connected_vehicles_dict: - children.append(MenuNode("(Empty)", "目前沒有已連線的載具", None)) - else: - total_items = len(state.connected_vehicles_dict) - total_pages = (total_items + items_per_page - 1) // items_per_page - start_idx = page * items_per_page - end_idx = min(start_idx + items_per_page, total_items) - - # vehicle_id_list 現在是 (sysid, compid) 的元組列表 - vehicle_comp_list = list(state.connected_vehicles_dict.keys()) - - # 顯示當前頁的物件 - for sysid, compid in vehicle_comp_list[start_idx:end_idx]: - # 建立顯示名稱 - display_name = f"Vehicle #{sysid} - Comp #{compid}" - desc = f"載具 {sysid} 組件 {compid}" + elif ch == (ord('o')): + # 離開工程模式 + state.intoSTART() + + elif ch == curses.KEY_LEFT: + # 返回上層 + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() - vehicle_menu = MenuNode(display_name, desc, "INSPECT_VEHICLE") - # 將 sysid 和 compid 附加到選單項目上 - vehicle_menu.sysid = sysid - vehicle_menu.compid = compid - children.append(vehicle_menu) - - # 添加分頁控制 - if total_pages > 1: - children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) - if page > 0: - prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") - prev_node.page = page - 1 - children.append(prev_node) - if page < total_pages - 1: - next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") - next_node.page = page + 1 - children.append(next_node) - - children.append(MenuNode("GoUp", "回到上層選單", "BACK")) - menu = MenuNode("Connected Vehicles", f"已連線載具列表 (第 {page + 1} 頁)", children=children) - menu.current_page = page - return menu - - def show_vehicle_info(self, stdscr, sysid, compid, cmd_q: queue.Queue, state: PanelState): - """顯示載具組件詳細資訊(動態更新,顯示變化率)""" - - # 等待資訊準備 - start = time.time() - while not state.vehicle_info_single.get('InfoReady', False): - if time.time() - start > 2: - state.panel_info_msg_list.append(("Fail! Vehicle Info NOT Acquired!", time.time())) - return - time.sleep(0.05) - - info = state.vehicle_info_single - height, width = stdscr.getmaxyx() - dialog_height = min(22, height - 4) - dialog_width = min(70, width - 4) - start_y = (height - dialog_height) // 2 - start_x = (width - dialog_width) // 2 - - dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) - dialog_win.nodelay(True) # 非阻塞模式,允許動態更新 - dialog_win.keypad(True) - - # MAV_TYPE 名稱對應 - MAV_TYPE_NAMES = { - 0: "Generic", 1: "Fixed Wing", 2: "Quadrotor", 3: "Helicopter", - 4: "Antenna Tracker", 5: "GCS", 6: "Airship", 10: "Ground Rover", - 12: "Boat", 13: "Submarine", 26: "Gimbal", 30: "Camera" - } - - # 動態更新迴圈 - last_update = time.time() - while True: - current_time = time.time() - - # 每 1 秒重新請求資料 - if current_time - last_update >= 1.0: - # 觸發資料更新(透過 Orchestrator) - cmd_q.put(("INSPECT_VEHICLE", sysid, compid)) - # 等待新資料準備好 - wait_start = time.time() - state.vehicle_info_single['InfoReady'] = False - while not state.vehicle_info_single.get('InfoReady', False): - if time.time() - wait_start > 0.5: # 最多等 0.5 秒 - break - time.sleep(0.01) - # 更新 info 參照 - info = state.vehicle_info_single - last_update = current_time - - dialog_win.clear() - dialog_win.border() - dialog_win.addstr(0, 2, f" Vehicle #{info['sysid']} - Component #{info['compid']} ", curses.A_BOLD) - - # === 基礎資訊 === - dialog_win.addstr(2, 2, "[Identity]", curses.A_UNDERLINE) - dialog_win.addstr(2, 32, "[Connection]", curses.A_UNDERLINE) - - # # MAV Type # 這個用不到 - # mav_type = info.get('vehicle_type', 'N/A') - # mav_type_str = f"{mav_type} ({MAV_TYPE_NAMES.get(mav_type, 'Unknown')})" if isinstance(mav_type, int) else str(mav_type) - # dialog_win.addstr(3, 2, f"MAV Type : {mav_type_str}") - - # Component Type - dialog_win.addstr(3, 2, f"Component Type : {info.get('component_type', 'N/A')}") - - # Autopilot Type - if info.get('mav_autopilot') is not None: - dialog_win.addstr(4, 2, f"Autopilot : {info.get('mav_autopilot', 'N/A')}") - - # Connection Info - dialog_win.addstr(3, 32, f"Connection : {info.get('connection_type', 'N/A')}") - dialog_win.addstr(4, 32, f"Socket ID : #{info.get('socket_id', 'N/A')}") - - # === 封包統計 === - stats = info.get('packet_stats', {}) - dialog_win.addstr(7, 2, "[Packet Statistics]", curses.A_UNDERLINE) - - received = stats.get('received', 0) - lost = stats.get('lost', 0) - loss_rate = stats.get('loss_rate', 0.0) - last_seq = stats.get('last_seq', 'N/A') - - # 計算變化 - received_delta = stats.get('received_delta', 0) - lost_delta = stats.get('lost_delta', 0) - - # 顯示變化率 - recv_str = f"{received:6d}" - if received_delta > 0: - recv_str += f" (+{received_delta}↑)" - - lost_str = f"{lost:4d}" - if lost_delta > 0: - lost_str += f" (+{lost_delta}↑)" - - dialog_win.addstr(8, 2, f"Received : {recv_str}") - dialog_win.addstr(8, 32, f"Lost : {lost_str}") - dialog_win.addstr(9, 2, f"Loss Rate : {loss_rate:.2f}%") - dialog_win.addstr(9, 32, f"Last Seq : {last_seq}") - - # 最後接收時間 - last_msg_time = stats.get('last_msg_time') - if last_msg_time: - time_str = time.strftime("%H:%M:%S", time.localtime(last_msg_time)) - elapsed = current_time - last_msg_time - dialog_win.addstr(10, 2, f"Last Time : {time_str}") - dialog_win.addstr(10, 32, f"Elapsed : {elapsed:.1f}s") - else: - dialog_win.addstr(10, 2, "Last Time : N/A") - - # === 訊息類型分佈 === - dialog_win.addstr(12, 2, "[Message Types] (Top 12)", curses.A_UNDERLINE) - - msg_counts = info.get('msg_type_counts', {}) - - # MAVLink 訊息名稱對應(縮寫版本) - MSG_NAMES = { - 0: "HB", 1: "SYS_ST", 24: "GPS_RAW", 27: "RAW_IMU", - 30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL", - 74: "VFR_HUD", 147: "BATT_ST" - } - - # 顯示前 12 個最常見的訊息類型(兩列各 6 個) - msg_items = list(msg_counts.items())[:12] - line = 13 - for i, (msg_id, count) in enumerate(msg_items): - msg_name = MSG_NAMES.get(msg_id, "???") - delta = stats.get(f'msg_delta_{msg_id}', 0) - - # 格式化數據 - if delta > 0: - data_str = f"{count}(+{delta}↑)" - else: - data_str = f"{count}" - - # 格式化顯示:[ID]NAME DATA (ID固定3字符寬度,右對齊) - display_str = f"[{msg_id:3d}]{msg_name:8s} {data_str}" - - # 左列(偶數索引)或右列(奇數索引) - col = 2 if i % 2 == 0 else 36 - row = line + (i // 2) - - if row >= dialog_height - 3: # 避免超出邊界 - break - - dialog_win.addstr(row, col, display_str) - - # 確保跳過顯示區域 - line = line + 6 - - dialog_win.addstr(dialog_height - 2, 2, "Press any key to return... | Auto-refresh: 1.0s") - dialog_win.refresh() - - # 檢查是否有按鍵(非阻塞) - ch = dialog_win.getch() - if ch != -1: # 有按鍵則退出 - break - - # 短暫延遲 - time.sleep(0.1) - - state.vehicle_info_single['InfoReady'] = False - del dialog_win - stdscr.clear() - stdscr.refresh() - -# ================ 關於 主要選單 的部份 =================== - - def menu_tree(self): - """建立多層選單結構""" - return MenuNode("Main Menu", children=[ - MenuNode("MavLink Object", "MavLink 通道選項", children=[ - MenuNode("New+", children=[ - MenuNode("UDP InBound", children=[ - MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"), - MenuNode("Port(Listen)", "設定監聽的 Port", "TEXT_UDP_PORT"), - MenuNode("Create", "建立 UDP InBound 連結口", "CREATE_UDP_INBOUND"), - ]), - MenuNode("UDP OutBound", children=[ - MenuNode("IP(Target)", "設定目標的 IP 位址", "TEXT_UDP_IP"), - MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"), - MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"), - ]), - ]), - MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"), - ]), - MenuNode("Serial Manager", "Serial 連接埠選項", children=[ - MenuNode("New+", "新增 Serial 連接埠", action = "LIST_SERIAL_RES"), - MenuNode("ListAll", "顯示並管理已連線的 Serial", action = "LIST_SERIAL_LINKS"), - ]), - MenuNode("Vehicles Insp.", "檢視已連線的遠端載具", action = "INSPECT_VEHICLES"), - MenuNode("Engineer Mode", "工程模式", children=[ - MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), - MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), - MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"), - ]), - MenuNode("Shutdown", "關閉整個系統", children=[ - MenuNode("Return", "繼續運行", "BACK"), - MenuNode("Confirm", "關閉系統", "QUIT"), - ]), - ]) - - def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event): - stdscr = None - - def cleanup(): - """清理 curses 狀態""" - if stdscr: - stdscr.keypad(False) - curses.nocbreak() - curses.echo() - curses.endwin() - - def pre_panel_shutdown(): - # 先關閉所有模組 再關閉面板 - cmd_q.put("SHUTDOWN_BRIDGE") - cmd_q.put("SHUTDOWN_MANAGER") - cmd_q.put("SHUTDOWN_SERIAL_MANAGER") - - def draw_menu(screen): - nonlocal stdscr - stdscr = screen - - curses.curs_set(0) - stdscr.nodelay(False) # 阻塞讀鍵 - stdscr.keypad(True) - - # 選單導航狀態 - menu_stack = [self.menu_tree()] # 選單堆疊 - idx_stack = [0] # 索引堆疊 - - state.intoSTART() # 設定狀態為運行中 - - while not stop_evt.is_set(): - - current_menu = menu_stack[-1] - current_idx = idx_stack[-1] - - # 獲取終端機尺寸 - height, width = stdscr.getmaxyx() - # 簡單暴力的限制視窗的大小 - MIN_HEIGHT = ( - 2 + # 邊界 - 6 + # 狀態列 操作說明列 一個空白 - 11+ # 最大選單 與 空白區 - 5 # 訊息區域 - ) - if height < MIN_HEIGHT or width < 60: - logger.error("Terminal size too small for Control Panel.") - break - - stdscr.clear() - stdscr.border() - - # 更新模組狀態顯示 - stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD) - stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}") - stdscr.addstr(2, 2, f"Object Manager State : {state.object_manager_state}") - stdscr.addstr(3, 2, f"Mavlink Bridge State : {state.mavlink_bridge_state}") - stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}") - stdscr.addstr(2, 36, f"Serial Manager State : {state.serial_manager_state}") - - # 顯示當前選單項目 - start_line = 6 - for i, child in enumerate(current_menu.children): - marker = "➤ " if i == current_idx else " " - # 動態顯示已輸入的值 - desc = child.desc - if child.action == "TEXT_UDP_IP" and state.udp_info_temp["IP"]: - desc = f"{child.desc} [{state.udp_info_temp['IP']}]" - elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]: - desc = f"{child.desc} [{state.udp_info_temp['Port']}]" - elif child.action == "SET_SERIAL_COMM" and state.serial_info_temp["CommunicationType"]: - desc = f"{child.desc} [{state.serial_info_temp['CommunicationType']}]" - elif child.action == "TEXT_BAUD_SERIAL" and state.serial_info_temp["Baud"]: - desc = f"{child.desc} [{state.serial_info_temp['Baud']}]" - elif child.action == "LINK_SERIAL_TO_MIDDLEWARE_UDP": - link_status = "Yes" if state.serial_info_temp["Go2Middleware"] else "No" - desc = f"{child.desc} [{link_status}]" - - line = f"{marker}{child.name:15s} – {desc}" - attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL - stdscr.addstr(start_line + i, 4, line, attr) - - # 顯示訊息區域 - # info_start_line = start_line + len(current_menu.children) + 1 - info_start_line = height - 8 - max_msg_lines = 5 # 最多顯示 5 行訊息 - current_time = time.time() - - # 清理過時的訊息 - state.panel_info_msg_list = [ - (msg, timestamp) for msg, timestamp in state.panel_info_msg_list - if current_time - timestamp < 2.0 #秒數 - ] - - # 只顯示最新的 max_msg_lines 條訊息 - display_msgs = state.panel_info_msg_list[-max_msg_lines:] - - for i, msg_data in enumerate(display_msgs): - if info_start_line + i >= help_line - 1: # 避免超出邊界 - break - msg = msg_data[0] if isinstance(msg_data, tuple) else msg_data - # 截斷過長的訊息 - max_msg_width = width - 6 - if len(msg) > max_msg_width: - msg = msg[:max_msg_width-3] + "..." - - stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD) - - - - # 操作說明 - # help_line = start_line + len(current_menu.children) + 2 - help_line = height - 2 - stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM) - - stdscr.refresh() - - # 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作 - # 驗證 其他附屬模組的狀態都停止後 就進入 STOPPED 狀態並跳出迴圈 - # 超過幾秒沒有反應就強制關閉 - if state.panel_status == "Terminating": - if time.time() - state.termination_start_time > 7: # 其他組件設定5秒 這邊給多一點 - logger.warning("Control Panel forced shutdown after timeout.") - state.intoSTOPPED() - # stop_evt.set() - # continue - break - time.sleep(0.1) - if (state.mavlink_bridge_state == "Stopped" and - state.object_manager_state == "Stopped" and - state.serial_manager_state == "Stopped"): - state.intoSTOPPED() - # stop_evt.set() - break - continue - - # 設定短暫的 timeout,讓執行緒能夠響應 stop_evt - stdscr.timeout(100) - ch = stdscr.getch() - - if ch == -1: # 沒有操作 - continue - - # 處理按鍵 - if ch in (curses.KEY_UP, ord('k')): - idx_stack[-1] = (current_idx - 1) % len(current_menu.children) - - elif ch in (curses.KEY_DOWN, ord('j')): - idx_stack[-1] = (current_idx + 1) % len(current_menu.children) - - elif ch == (ord('O')): - # 進入工程模式 - state.intoENGINEER() - - elif ch == (ord('o')): - # 離開工程模式 - state.intoSTART() - - elif ch == curses.KEY_LEFT: - # 返回上層 - if len(menu_stack) > 1: - menu_stack.pop() - idx_stack.pop() - - elif ch == curses.KEY_RIGHT: - # 進入下層 (但不執行動作) - selected = current_menu.children[current_idx] - if selected.children: # 有子選單 - menu_stack.append(selected) - idx_stack.append(0) + elif ch == curses.KEY_RIGHT: + # 進入下層 (但不執行動作) + selected = current_menu.children[current_idx] + if selected.children: # 有子選單 + menu_stack.append(selected) + idx_stack.append(0) elif ch in (ord('q'), 27): if state.panel_status == "Engineer": @@ -942,7 +428,6 @@ class ControlPanel: idx_stack.pop() elif selected.action == "LINK_SERIAL_TO_MIDDLEWARE_UDP_YES": - logger.info("mark A") state.serial_info_temp["Go2Middleware"] = True menu_stack.pop() idx_stack.pop() @@ -966,144 +451,658 @@ class ControlPanel: menu_stack.append(created_list_menu) idx_stack.append(0) - elif selected.action == "LIST_SERIAL_LINKS": - created_list_menu = self.create_linked_serial_menu(state, page=0) - menu_stack.append(created_list_menu) - idx_stack.append(0) - - elif selected.action == "INSPECT_LINKED_SERIAL": - # 顯示 Serial 連結詳細資訊 - if hasattr(selected, 'serial_id'): - cmd_q.put(("INSPECT_LINKED_SERIAL", selected.serial_id)) - self.show_linked_serial_info(stdscr, selected.serial_id, state) + elif selected.action == "LIST_SERIAL_LINKS": + created_list_menu = self.create_linked_serial_menu(state, page=0) + menu_stack.append(created_list_menu) + idx_stack.append(0) + + elif selected.action == "INSPECT_LINKED_SERIAL": + # 顯示 Serial 連結詳細資訊 + if hasattr(selected, 'serial_id'): + cmd_q.put(("INSPECT_LINKED_SERIAL", selected.serial_id)) + self.show_linked_serial_info(stdscr, selected.serial_id, state) + + elif selected.action == "REMOVE_LINKED_SERIAL": + # 移除 Serial 連結 + if hasattr(selected, 'serial_id'): + cmd_q.put(("REMOVE_LINKED_SERIAL", selected.serial_id)) + # 返回上層(回到列表) + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + # 一樣退兩層 + menu_stack.pop() + idx_stack.pop() + + elif selected.action == "LIST_MAV_OBJECT": + # 動態生成 mavlink_object 列表選單 + created_list_menu = self.create_object_list_menu(state, page=0) + menu_stack.append(created_list_menu) + idx_stack.append(0) + + elif selected.action in ("PREV_PAGE", "NEXT_PAGE"): + if hasattr(selected, 'page'): + current_list_menu = menu_stack[-1] + menu_stack.pop() + idx_stack.pop() + + # 依據選單種類 重新建立分頁 + if current_list_menu.name == "Serial Port List": + created_list_menu = self.create_serial_port_menu(state, page=selected.page) + elif current_list_menu.name == "Object List": + created_list_menu = self.create_object_list_menu(state, page=selected.page) + elif current_list_menu.name == "Linked Serial List": + created_list_menu = self.create_linked_serial_menu(state, page=selected.page) + elif current_list_menu.name == "Connected Vehicles": + created_list_menu = self.create_vehicles_list_menu(state, page=selected.page) + else: + # 不支援的選單類型,回到原本的選單 + menu_stack.append(current_list_menu) + idx_stack.append(0) + continue + + menu_stack.append(created_list_menu) + idx_stack.append(0) + + elif selected.action == "INSPECT_MAV_OBJECT": + # 顯示物件詳細資訊 + if hasattr(selected, 'socket_id'): + cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id)) + self.show_object_info(stdscr, selected.socket_id, state) + + elif selected.action == "REMOVE_MAV_OBJECT": + # 移除物件 + if hasattr(selected, 'socket_id'): + cmd_q.put(("REMOVE_OBJECT", selected.socket_id)) + # 返回上層(回到列表) + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + # 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好 + menu_stack.pop() + idx_stack.pop() + + elif selected.action == "MAVOBJ_MAKE_LINK": + # 建立轉發連結 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state) + if target_id is not None: + cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) + cmd_q.put(("MAVOBJ_ADD_TARGET", target_id, selected.socket_id)) # 雙向連結 + + elif selected.action == "MAVOBJ_CANCEL_LINK": + # 取消轉發連結 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True) + if target_id is not None: + cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id)) + cmd_q.put(("MAVOBJ_REMOVE_TARGET", target_id, selected.socket_id)) # 雙向取消連結 + + elif selected.action == "MAVOBJ_ADD_TARGET": + # 添加目標端口 + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state) + if target_id is not None: + cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) + + elif selected.action == "STOP_MANAGER": + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + cmd_q.put("SHUTDOWN_MANAGER") + + elif selected.action == "STOP_BRIDGE": + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + cmd_q.put("SHUTDOWN_BRIDGE") + + elif selected.action == "STOP_SERIAL_MANAGER": + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + cmd_q.put("SHUTDOWN_SERIAL_MANAGER") + + elif selected.action == "INSPECT_VEHICLES": + # 進入載具檢視選單 + cmd_q.put("UPDATE_VEHICLES_LIST") + created_list_menu = self.create_vehicles_list_menu(state, page=0) + menu_stack.append(created_list_menu) + idx_stack.append(0) + + elif selected.action == "INSPECT_VEHICLE": + # 顯示載具詳細資訊 + if hasattr(selected, 'sysid') and hasattr(selected, 'compid'): + cmd_q.put(("INSPECT_VEHICLE", selected.sysid, selected.compid)) + self.show_vehicle_info(stdscr, selected.sysid, selected.compid, cmd_q, state) + + elif callable(selected.action): + # 執行函式 + cmd_q.put(selected.action) + + try: + curses.wrapper(draw_menu) + except KeyboardInterrupt: + pass + finally: + cleanup() + + # ================ 關於 mavlink object 的部份 =================== + + def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8): + """動態創建 mavlink_object 列表選單(支持分頁)""" + children = [] + + if not state.socket_object_list: + children.append(MenuNode("(Empty)", "目前沒有連結口", None)) + else: + total_items = len(state.socket_object_list) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + # 顯示當前頁的物件 + for socket_id in state.socket_object_list[start_idx:end_idx]: + # 為每個 socket 創建子選單 + obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[ + MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"), + MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"), + MenuNode("Cancel Link", "取消轉發連結", "MAVOBJ_CANCEL_LINK"), + MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"), + MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), + MenuNode("GoUp", "回到列表", "BACK"), + ]) + # 將 socket_id 附加到每個子選單項目上 + for child in obj_menu.children: + child.socket_id = socket_id + children.append(obj_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) + if page > 0: + prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) + menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu + + def show_object_info(self, stdscr, socket_id, state: PanelState): + """顯示物件詳細資訊的對話框""" + + start = time.time() + while not state.socket_info_single.get('InfoReady', False): + # 太久沒有回應 + if time.time() - start > 2: + state.panel_info_msg_list.append(("Fail! Socket Info NOT Aquire!", time.time())) + return + time.sleep(0.05) # 等待資訊準備好 + + height, width = stdscr.getmaxyx() + dialog_height = 15 + dialog_width = min(70, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.border() + dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD) + + # 這裡顯示基本資訊 + dialog_win.addstr(2, 2, f"Socket ID : {socket_id}") + dialog_win.addstr(3, 2, f"Socket status : {state.socket_info_single.get('socket_state', 'N/A')}") + # show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', ''))) + dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}") + dialog_win.addstr(4, 30, f"{state.socket_info_single.get('socket_connection_string', '')}") + show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', ''))) + dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}") + show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', ''))) + dialog_win.addstr(6, 2, f"Return Pack : {show_str if show_str else 'N/A'}") + dialog_win.addstr(7, 2, f"Primary Socket ID: {state.socket_info_single.get('primary_socket_id', 'It Self')}") + show_str = ",".join(map(str, state.socket_info_single.get('target_sockets', ''))) + dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}") + + state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用 + + dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...") + dialog_win.refresh() + + dialog_win.getch() + del dialog_win + stdscr.clear() + stdscr.refresh() + + def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False): + """選擇目標 socket 的對話框""" + height, width = stdscr.getmaxyx() + dialog_height = min(15, len(state.socket_object_list) + 5) + dialog_width = min(50, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.keypad(True) + + title = "選擇要移除的目標" if remove_mode else "選擇轉發目標" + available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id] + + if not available_sockets: + dialog_win.border() + dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD) + dialog_win.addstr(2, 2, "沒有可用的目標") + dialog_win.addstr(4, 2, "按任意鍵返回...") + dialog_win.refresh() + dialog_win.getch() + del dialog_win + stdscr.clear() + stdscr.refresh() + return None + + selected_idx = 0 + + while True: + dialog_win.clear() + dialog_win.border() + dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD) + + for i, socket_id in enumerate(available_sockets): + marker = "➤" if i == selected_idx else " " + attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL + dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr) + + dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消") + dialog_win.refresh() + + ch = dialog_win.getch() + + if ch in (curses.KEY_UP, ord('k')): + selected_idx = (selected_idx - 1) % len(available_sockets) + elif ch in (curses.KEY_DOWN, ord('j')): + selected_idx = (selected_idx + 1) % len(available_sockets) + elif ch in (curses.KEY_ENTER, 10, 13): + result = available_sockets[selected_idx] + del dialog_win + stdscr.clear() + stdscr.refresh() + return result + elif ch == 27: # ESC + del dialog_win + stdscr.clear() + stdscr.refresh() + return None + + # ================ 關於 serial link 的部份 =================== - elif selected.action == "REMOVE_LINKED_SERIAL": - # 移除 Serial 連結 - if hasattr(selected, 'serial_id'): - cmd_q.put(("REMOVE_LINKED_SERIAL", selected.serial_id)) - # 返回上層(回到列表) - if len(menu_stack) > 1: - menu_stack.pop() - idx_stack.pop() - # 一樣退兩層 - menu_stack.pop() - idx_stack.pop() + def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8): + """動態創建 serial port 列表選單(支持分頁)""" + children = [] + + # 獲取可用的 Serial 連接埠列表 + # serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓 + serial_ports = acquireSerial.get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*']) + + if not serial_ports: + children.append(MenuNode("(Empty)", "目前沒有串口設備", None)) + else: + total_items = len(serial_ports) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + # 顯示當前頁的串口 + for port in serial_ports[start_idx:end_idx]: + port_menu = MenuNode(f"{port}", children=[ + MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[ + MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"), + # MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"), + ]), + MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"), + MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[ + MenuNode("Yes", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_YES"), + MenuNode("No", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_NO"), + ]), + MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"), + MenuNode("GoUp", "回到列表", "BACK"), + ]) + # 將 port 附加到每個子選單項目上 + for child in port_menu.children: + child.port = port + children.append(port_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) + if page > 0: + prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) + menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu - elif selected.action == "LIST_MAV_OBJECT": - # 動態生成 mavlink_object 列表選單 - created_list_menu = self.create_object_list_menu(state, page=0) - menu_stack.append(created_list_menu) - idx_stack.append(0) + def create_linked_serial_menu(self, state: PanelState, page=0, items_per_page=8): + """動態創建 已連線的 serial port 列表選單(支持分頁)並包含後續管理功能""" + children = [] + + if not state.linked_serial_dict: + children.append(MenuNode("(Empty)", "目前沒有連結口", None)) + else: + total_items = len(state.linked_serial_dict) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + # 顯示當前頁的物件 + linked_serial_id_list = list(state.linked_serial_dict.keys()) + for serial_id in linked_serial_id_list[start_idx:end_idx]: + # 為每個 socket 創建子選單 + obj_menu = MenuNode(f"Serial #{serial_id}", f"連結口 {serial_id}", None, children=[ + MenuNode("Info", "查看詳細資訊", "INSPECT_LINKED_SERIAL"), + MenuNode("Remove", "移除此連結口", "REMOVE_LINKED_SERIAL"), + # MenuNode("Change UDP Target", "變更目標 UDP (工程)", "CHANGE_LINKED_SERIAL_TARGET"), + MenuNode("GoUp", "回到列表", "BACK"), + ]) + # 將 serial_id 附加到每個子選單項目上 + for child in obj_menu.children: + child.serial_id = serial_id + children.append(obj_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) + if page > 0: + prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) + menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu - elif selected.action in ("PREV_PAGE", "NEXT_PAGE"): - if hasattr(selected, 'page'): - current_list_menu = menu_stack[-1] - menu_stack.pop() - idx_stack.pop() - - # 依據選單種類 重新建立分頁 - if current_list_menu.name == "Serial Port List": - created_list_menu = self.create_serial_port_menu(state, page=selected.page) - elif current_list_menu.name == "Object List": - created_list_menu = self.create_object_list_menu(state, page=selected.page) - elif current_list_menu.name == "Linked Serial List": - created_list_menu = self.create_linked_serial_menu(state, page=selected.page) - elif current_list_menu.name == "Connected Vehicles": - created_list_menu = self.create_vehicles_list_menu(state, page=selected.page) - else: - # 不支援的選單類型,回到原本的選單 - menu_stack.append(current_list_menu) - idx_stack.append(0) - continue - - menu_stack.append(created_list_menu) - idx_stack.append(0) - - elif selected.action == "INSPECT_MAV_OBJECT": - # 顯示物件詳細資訊 - if hasattr(selected, 'socket_id'): - cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id)) - self.show_object_info(stdscr, selected.socket_id, state) - - elif selected.action == "REMOVE_MAV_OBJECT": - # 移除物件 - if hasattr(selected, 'socket_id'): - cmd_q.put(("REMOVE_OBJECT", selected.socket_id)) - # 返回上層(回到列表) - if len(menu_stack) > 1: - menu_stack.pop() - idx_stack.pop() - # 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好 - menu_stack.pop() - idx_stack.pop() + def show_linked_serial_info(self, stdscr, serial_id, state: PanelState): + """顯示 Serial 連結詳細資訊的對話框""" - elif selected.action == "MAVOBJ_MAKE_LINK": - # 建立轉發連結 - if hasattr(selected, 'socket_id'): - target_id = self.select_target_socket(stdscr, selected.socket_id, state) - if target_id is not None: - cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) - cmd_q.put(("MAVOBJ_ADD_TARGET", target_id, selected.socket_id)) # 雙向連結 + start = time.time() + while not state.serial_info_single.get('InfoReady', False): + # 太久沒有回應 + if time.time() - start > 2: + state.panel_info_msg_list.append(("Fail! Serial Info NOT Aquire!", time.time())) + return + time.sleep(0.05) # 等待資訊準備好 - elif selected.action == "MAVOBJ_CANCEL_LINK": - # 取消轉發連結 - if hasattr(selected, 'socket_id'): - target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True) - if target_id is not None: - cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id)) - cmd_q.put(("MAVOBJ_REMOVE_TARGET", target_id, selected.socket_id)) # 雙向取消連結 + height, width = stdscr.getmaxyx() + dialog_height = 15 + dialog_width = min(70, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.border() + dialog_win.addstr(0, 2, f" Serial Link #{serial_id} 詳細資訊 ", curses.A_BOLD) - elif selected.action == "MAVOBJ_ADD_TARGET": - # 添加目標端口 - if state.panel_status != "Engineer": - state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) - continue # 只有在工程模式下才能操作 - if hasattr(selected, 'socket_id'): - target_id = self.select_target_socket(stdscr, selected.socket_id, state) - if target_id is not None: - cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) + # 從 linked_serial_dict 獲取資訊 + serial_info = state.linked_serial_dict.get(serial_id, {}) + + if not serial_info: + dialog_win.addstr(2, 2, f"無法取得 Serial #{serial_id} 的資訊") + else: + # 顯示基本資訊 + dialog_win.addstr(2, 2, f"Serial ID : {serial_id}") + dialog_win.addstr(3, 2, f"Serial Port : {state.serial_info_single.get('serial_port', 'N/A')}") + dialog_win.addstr(4, 2, f"Baudrate : {state.serial_info_single.get('baudrate', 'N/A')}") + dialog_win.addstr(5, 2, f"Communication : {state.serial_info_single.get('receiver_type', 'N/A')}") + dialog_win.addstr(6, 2, f"Target UDP Port : {state.serial_info_single.get('target_port', 'N/A')}") + dialog_win.addstr(7, 2, f"Status : {state.serial_info_single.get('status', 'Running')}") + + # 如果有額外資訊可以繼續添加 + if 'thread_alive' in serial_info: + thread_status = "Alive" if serial_info['thread_alive'] else "Stopped" + dialog_win.addstr(8, 2, f"Thread Status : {thread_status}") + + state.serial_info_single['InfoReady'] = False # 重置狀態以便下次使用 - elif selected.action == "STOP_MANAGER": - if state.panel_status != "Engineer": - state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) - continue # 只有在工程模式下才能操作 - cmd_q.put("SHUTDOWN_MANAGER") + dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...") + dialog_win.refresh() + + dialog_win.getch() + del dialog_win + stdscr.clear() + stdscr.refresh() - elif selected.action == "STOP_BRIDGE": - if state.panel_status != "Engineer": - state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) - continue # 只有在工程模式下才能操作 - cmd_q.put("SHUTDOWN_BRIDGE") + # ================ 關於載具檢視的部份 =================== - elif selected.action == "STOP_SERIAL_MANAGER": - if state.panel_status != "Engineer": - state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) - continue # 只有在工程模式下才能操作 - cmd_q.put("SHUTDOWN_SERIAL_MANAGER") + def create_vehicles_list_menu(self, state: PanelState, page=0, items_per_page=8): + """動態創建 已連線載具 列表選單(支持分頁) + 每個 vehicle-component 組合都是獨立的選單項目 + """ + children = [] + + if not state.connected_vehicles_dict: + children.append(MenuNode("(Empty)", "目前沒有已連線的載具", None)) + else: + total_items = len(state.connected_vehicles_dict) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + # vehicle_id_list 現在是 (sysid, compid) 的元組列表 + vehicle_comp_list = list(state.connected_vehicles_dict.keys()) + + # 顯示當前頁的物件 + for sysid, compid in vehicle_comp_list[start_idx:end_idx]: + # 建立顯示名稱 + display_name = f"Vehicle #{sysid} - Comp #{compid}" + desc = f"載具 {sysid} 組件 {compid}" + + vehicle_menu = MenuNode(display_name, desc, "INSPECT_VEHICLE") + # 將 sysid 和 compid 附加到選單項目上 + vehicle_menu.sysid = sysid + vehicle_menu.compid = compid + children.append(vehicle_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", f"第 {page+1}/{total_pages} 頁", None)) + if page > 0: + prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("GoUp", "回到上層選單", "BACK")) + menu = MenuNode("Connected Vehicles", f"已連線載具列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu - elif selected.action == "INSPECT_VEHICLES": - # 進入載具檢視選單 - cmd_q.put("UPDATE_VEHICLES_LIST") - created_list_menu = self.create_vehicles_list_menu(state, page=0) - menu_stack.append(created_list_menu) - idx_stack.append(0) + def show_vehicle_info(self, stdscr, sysid, compid, cmd_q: queue.Queue, state: PanelState): + """顯示載具組件詳細資訊(動態更新,顯示變化率)""" + + # 等待資訊準備 + start = time.time() + while not state.vehicle_info_single.get('InfoReady', False): + if time.time() - start > 2: + state.panel_info_msg_list.append(("Fail! Vehicle Info NOT Acquired!", time.time())) + return + time.sleep(0.05) + + info = state.vehicle_info_single + height, width = stdscr.getmaxyx() + dialog_height = min(22, height - 4) + dialog_width = min(70, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.nodelay(True) # 非阻塞模式,允許動態更新 + dialog_win.keypad(True) + + # MAV_TYPE 名稱對應 + MAV_TYPE_NAMES = { + 0: "Generic", 1: "Fixed Wing", 2: "Quadrotor", 3: "Helicopter", + 4: "Antenna Tracker", 5: "GCS", 6: "Airship", 10: "Ground Rover", + 12: "Boat", 13: "Submarine", 26: "Gimbal", 30: "Camera" + } + + # 動態更新迴圈 + last_update = time.time() + while True: + current_time = time.time() + + # 每 1 秒重新請求資料 + if current_time - last_update >= 1.0: + # 觸發資料更新(透過 Orchestrator) + cmd_q.put(("INSPECT_VEHICLE", sysid, compid)) + # 等待新資料準備好 + wait_start = time.time() + state.vehicle_info_single['InfoReady'] = False + while not state.vehicle_info_single.get('InfoReady', False): + if time.time() - wait_start > 0.5: # 最多等 0.5 秒 + break + time.sleep(0.01) + # 更新 info 參照 + info = state.vehicle_info_single + last_update = current_time + + dialog_win.clear() + dialog_win.border() + dialog_win.addstr(0, 2, f" Vehicle #{info['sysid']} - Component #{info['compid']} ", curses.A_BOLD) + + # === 基礎資訊 === + dialog_win.addstr(2, 2, "[Identity]", curses.A_UNDERLINE) + dialog_win.addstr(2, 32, "[Connection]", curses.A_UNDERLINE) + + # # MAV Type # 這個用不到 + # mav_type = info.get('vehicle_type', 'N/A') + # mav_type_str = f"{mav_type} ({MAV_TYPE_NAMES.get(mav_type, 'Unknown')})" if isinstance(mav_type, int) else str(mav_type) + # dialog_win.addstr(3, 2, f"MAV Type : {mav_type_str}") + + # Component Type + dialog_win.addstr(3, 2, f"Component Type : {info.get('component_type', 'N/A')}") + + # Autopilot Type + if info.get('mav_autopilot') is not None: + dialog_win.addstr(4, 2, f"Autopilot : {info.get('mav_autopilot', 'N/A')}") + + # Connection Info + dialog_win.addstr(3, 32, f"Connection : {info.get('connection_type', 'N/A')}") + dialog_win.addstr(4, 32, f"Socket ID : #{info.get('socket_id', 'N/A')}") + + # === 封包統計 === + stats = info.get('packet_stats', {}) + dialog_win.addstr(7, 2, "[Packet Statistics]", curses.A_UNDERLINE) + + received = stats.get('received', 0) + lost = stats.get('lost', 0) + loss_rate = stats.get('loss_rate', 0.0) + last_seq = stats.get('last_seq', 'N/A') + + # 計算變化 + received_delta = stats.get('received_delta', 0) + lost_delta = stats.get('lost_delta', 0) + + # 顯示變化率 + recv_str = f"{received:6d}" + if received_delta > 0: + recv_str += f" (+{received_delta}↑)" + + lost_str = f"{lost:4d}" + if lost_delta > 0: + lost_str += f" (+{lost_delta}↑)" + + dialog_win.addstr(8, 2, f"Received : {recv_str}") + dialog_win.addstr(8, 32, f"Lost : {lost_str}") + dialog_win.addstr(9, 2, f"Loss Rate : {loss_rate:.2f}%") + dialog_win.addstr(9, 32, f"Last Seq : {last_seq}") + + # 最後接收時間 + last_msg_time = stats.get('last_msg_time') + if last_msg_time: + time_str = time.strftime("%H:%M:%S", time.localtime(last_msg_time)) + elapsed = current_time - last_msg_time + dialog_win.addstr(10, 2, f"Last Time : {time_str}") + dialog_win.addstr(10, 32, f"Elapsed : {elapsed:.1f}s") + else: + dialog_win.addstr(10, 2, "Last Time : N/A") + + # === 訊息類型分佈 === + dialog_win.addstr(12, 2, "[Message Types] (Top 12)", curses.A_UNDERLINE) + + msg_counts = info.get('msg_type_counts', {}) + + # MAVLink 訊息名稱對應(縮寫版本) + MSG_NAMES = { + 0: "HB", 1: "SYS_ST", 24: "GPS_RAW", 27: "RAW_IMU", + 30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL", + 74: "VFR_HUD", 147: "BATT_ST" + } + + # 顯示前 12 個最常見的訊息類型(兩列各 6 個) + msg_items = list(msg_counts.items())[:12] + line = 13 + for i, (msg_id, count) in enumerate(msg_items): + msg_name = MSG_NAMES.get(msg_id, "???") + delta = stats.get(f'msg_delta_{msg_id}', 0) + + # 格式化數據 + if delta > 0: + data_str = f"{count}(+{delta}↑)" + else: + data_str = f"{count}" + + # 格式化顯示:[ID]NAME DATA (ID固定3字符寬度,右對齊) + display_str = f"[{msg_id:3d}]{msg_name:8s} {data_str}" + + # 左列(偶數索引)或右列(奇數索引) + col = 2 if i % 2 == 0 else 36 + row = line + (i // 2) + + if row >= dialog_height - 3: # 避免超出邊界 + break - elif selected.action == "INSPECT_VEHICLE": - # 顯示載具詳細資訊 - if hasattr(selected, 'sysid') and hasattr(selected, 'compid'): - cmd_q.put(("INSPECT_VEHICLE", selected.sysid, selected.compid)) - self.show_vehicle_info(stdscr, selected.sysid, selected.compid, cmd_q, state) - - elif callable(selected.action): - # 執行函式 - cmd_q.put(selected.action) - - try: - curses.wrapper(draw_menu) - except KeyboardInterrupt: - pass - finally: - cleanup() + dialog_win.addstr(row, col, display_str) + + # 確保跳過顯示區域 + line = line + 6 + + dialog_win.addstr(dialog_height - 2, 2, "Press any key to return... | Auto-refresh: 1.0s") + dialog_win.refresh() + + # 檢查是否有按鍵(非阻塞) + ch = dialog_win.getch() + if ch != -1: # 有按鍵則退出 + break + + # 短暫延遲 + time.sleep(0.1) + + state.vehicle_info_single['InfoReady'] = False + del dialog_win + stdscr.clear() + stdscr.refresh() @@ -1189,7 +1188,7 @@ class Orchestrator: if socket_id in s_obj.target_sockets: self.remove_target_from_object(s_id, socket_id) # 再移除該物件 - self.delete_mavlink_object(socket_id) + self.delete_udp_object(socket_id) elif action == "MAVOBJ_ADD_TARGET": source_id, target_id = cmd[1], cmd[2] self.add_target_to_object(source_id, target_id) @@ -1206,9 +1205,7 @@ class Orchestrator: self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets - ip_info = mav_obj.mavlink_socket.port.getsockname() - self.panelState.socket_info_single["socket_connection_string"] = f"{ip_info[0]}:{ip_info[1]}" - # getattr(mav_obj.mavlink_socket, "connection_string", "") + self.panelState.socket_info_single["socket_connection_string"] = mav_obj.mavlink_socket.address self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好 elif action == "INSPECT_LINKED_SERIAL": serial_id = cmd[1] @@ -1322,7 +1319,7 @@ class Orchestrator: self.panelState.panel_info_msg_list.append((f"Created UDP {self.panelState.udp_info_temp['direction']} object: {connection_string}", time.time())) - def delete_mavlink_object(self, socket_id): + def delete_udp_object(self, socket_id): """移除指定的 mavlink_object""" self.manager.remove_mavlink_object(socket_id) diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 71760a0..7b321f1 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -663,16 +663,16 @@ class async_io_manager: self.running = False logger.info("async_io_manager event loop END!") - async def _main_task(self): # 當初想說可能要一個額外的 task 來管理 但是目前好像用不掉 先放著不管 - """主任務協程 讓 async_io_manager 在執行緒中持續運作""" - logger.info("async_io_manager main task started") + # async def _main_task(self): # 當初想說可能要一個額外的 task 來管理 但是目前好像用不掉 先放著不管 + # """主任務協程 讓 async_io_manager 在執行緒中持續運作""" + # logger.info("async_io_manager main task started") - while self.running and not self._stop_event.is_set(): - await asyncio.sleep(0.1) + # while self.running and not self._stop_event.is_set(): + # await asyncio.sleep(0.1) - logger.info("async_io_manager main task ended") + # logger.info("async_io_manager main task ended") - def add_mavlink_object(self, mavlink_obj): + def add_mavlink_object(self, mavlink_obj: mavlink_object): """添加 mavlink_object""" # 一個防呆 確保有 event loop 與 _main_task 正在運作 if not self.running or not self.loop: @@ -718,7 +718,7 @@ class async_io_manager: logger.error(f"Failed to create task for mavlink_object {socket_id}: {e}") return False - def remove_mavlink_object(self, socket_id): + def remove_mavlink_object(self, socket_id: int): """移除 mavlink_object""" # 一個防呆 確保有 event loop 正在運作 diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index 02588e1..731a950 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -187,11 +187,11 @@ class ATCommandHandler: # print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}") pass - +# ====================== 分割線 ===================== class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 def __init__(self, udp_handler, serial_port_str): - self.udp_handler = udp_handler # UDP 的傳輸把手 + self.udp_handler = udp_handler # UDP 的傳輸物件 self.serial_port_str = serial_port_str self.at_handler = ATCommandHandler(serial_port_str) @@ -327,8 +327,8 @@ class serial_manager: self.receiver_type = receiver_type self.target_port = target_port # 指向的 UPD 端口 - self.transport = None - self.protocol = None + self.transport = None # TODO 這個變數可能沒有作用 + self.protocol = None # TODO 這個變數可能沒有作用 self.udp_handler = None self.serial_handler = None