diff --git a/README.md b/README.md index 8e81de0..490d84b 100644 --- a/README.md +++ b/README.md @@ -44,4 +44,5 @@ N. logs 是執行時期的記錄檔 例如 在 ~/AirTrapMine/src/ 資料夾下 > python -m fc_network_adapter.fc_network_adapter.mainOrchestrator > python -m fc_network_adapter.tests.test_ringBuffer +> python -m fc_network_adapter.tests.demo_integration diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py index 805f1eb..f15bad6 100644 --- a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -7,7 +7,6 @@ ''' - import os import time import sys @@ -17,13 +16,45 @@ import threading import queue import signal +from pymavlink import mavutil + # 自定義的 import from . import mavlinkObject as mo -from . import mavlinkDevice as md from .utils import RingBuffer, setup_logger + + logger = setup_logger(os.path.basename(__file__)) +class PanelState: + def __init__(self): + self.panel_status = "Idle" + termination_start_time = None + self.mavlink_bridge_state = "Stopped" + self.object_manager_state = "Stopped" + self.socket_object_list = [] + self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息 + + # 這邊是儲存關於 socket object 的資料 + self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊 + self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "", + "target_sockets": "", "primary_socket_id": "", "InfoReady": False} # 暫存單一 socket 的資訊 + + def intoSTART(self): + self.panel_status = "Running" + + def intoTERMINATION(self): + self.termination_start_time = time.time() + self.panel_status = "Terminating" + + def intoENGINEER(self): + self.panel_status = "Engineer" + + def intoSTOPPED(self): + self.panel_status = "Stopped" + + def set_user_input(self, text): + self.user_input = text class MenuNode: def __init__(self, name, desc="", action=None, children=None): @@ -36,33 +67,233 @@ class ControlPanel: def __init__(self): pass + def input_dialog(stdscr, prompt="請輸入文字: "): + """顯示輸入對話框""" + height, width = stdscr.getmaxyx() + + # 建立輸入視窗 + dialog_height = 5 + dialog_width = min(60, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + # 建立視窗邊框 + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.border() + dialog_win.addstr(1, 2, prompt) + dialog_win.addstr(3, 2, "按 Enter 確認, ESC 取消") + dialog_win.refresh() + + # 輸入區域 + input_win = curses.newwin(1, dialog_width - 6, start_y + 2, start_x + 2) + input_win.keypad(True) + + curses.echo() + curses.curs_set(1) + + user_input = "" + + while True: + input_win.clear() + input_win.addstr(0, 0, user_input[-dialog_width+8:]) # 顯示輸入內容(滾動) + input_win.refresh() + + ch = input_win.getch() + + if ch == 27: # ESC + user_input = None + break + elif ch in (curses.KEY_ENTER, 10, 13): # Enter + break + elif ch in (curses.KEY_BACKSPACE, 127, 8): # Backspace + user_input = user_input[:-1] + elif 32 <= ch <= 126: # 可打印字符 + user_input += chr(ch) + + curses.noecho() + curses.curs_set(0) + + # 清理視窗 + del input_win + del dialog_win + stdscr.clear() + stdscr.refresh() + + return user_input + + def create_object_list_menu(self, state: PanelState, page=0, items_per_page=5): + """動態創建 mavlink_object 列表選單(支持分頁)""" + children = [] + + if not state.socket_object_list: + children.append(MenuNode("(空)", "目前沒有連結口", None)) + else: + total_items = len(state.socket_object_list) + total_pages = (total_items + items_per_page - 1) // items_per_page + start_idx = page * items_per_page + end_idx = min(start_idx + items_per_page, total_items) + + # 顯示當前頁的物件 + for socket_id in state.socket_object_list[start_idx:end_idx]: + # 為每個 socket 創建子選單 + obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[ + MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"), + MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"), + MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), + MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"), + MenuNode("Remove Target", "移除轉發目標(工程)", "MAVOBJ_REMOVE_TARGET"), + MenuNode("返回", "回到列表", "BACK"), + ]) + # 將 socket_id 附加到每個子選單項目上 + for child in obj_menu.children: + child.socket_id = socket_id + children.append(obj_menu) + + # 添加分頁控制 + if total_pages > 1: + children.append(MenuNode("---", "---", None)) + if page > 0: + prev_node = MenuNode("◀ 上一頁", f"第 {page}/{total_pages} 頁", "PREV_PAGE") + prev_node.page = page - 1 + children.append(prev_node) + if page < total_pages - 1: + next_node = MenuNode("下一頁 ▶", f"第 {page + 2}/{total_pages} 頁", "NEXT_PAGE") + next_node.page = page + 1 + children.append(next_node) + + children.append(MenuNode("返回", "回到上層選單", "BACK")) + menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children) + menu.current_page = page + return menu + + def show_object_info(self, stdscr, socket_id, state: PanelState): + """顯示物件詳細資訊的對話框""" + height, width = stdscr.getmaxyx() + dialog_height = 15 + dialog_width = min(70, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.border() + dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD) + + while not state.socket_info_single.get('InfoReady', False): + time.sleep(0.05) # 等待資訊準備好 + + # 這裡顯示基本資訊 + dialog_win.addstr(2, 2, f"Socket ID : {socket_id}") + dialog_win.addstr(3, 2, f"Socket status : 運行中") + # show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', ''))) + dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}") + show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', ''))) + dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}") + show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', ''))) + dialog_win.addstr(6, 2, f"Return Pack : {show_str if show_str else 'N/A'}") + dialog_win.addstr(7, 2, f"Primary Socket ID: {state.socket_info_single.get('primary_socket_id', 'It Self')}") + show_str = ",".join(map(str, state.socket_info_single.get('target_sockets', ''))) + dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}") + + state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用 + + dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...") + dialog_win.refresh() + + dialog_win.getch() + del dialog_win + stdscr.clear() + stdscr.refresh() + + def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False): + """選擇目標 socket 的對話框""" + height, width = stdscr.getmaxyx() + dialog_height = min(15, len(state.socket_object_list) + 5) + dialog_width = min(50, width - 4) + start_y = (height - dialog_height) // 2 + start_x = (width - dialog_width) // 2 + + dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x) + dialog_win.keypad(True) + + title = "選擇要移除的目標" if remove_mode else "選擇轉發目標" + available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id] + + if not available_sockets: + dialog_win.border() + dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD) + dialog_win.addstr(2, 2, "沒有可用的目標") + dialog_win.addstr(4, 2, "按任意鍵返回...") + dialog_win.refresh() + dialog_win.getch() + del dialog_win + stdscr.clear() + stdscr.refresh() + return None + + selected_idx = 0 + + while True: + dialog_win.clear() + dialog_win.border() + dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD) + + for i, socket_id in enumerate(available_sockets): + marker = "➤" if i == selected_idx else " " + attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL + dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr) + + dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消") + dialog_win.refresh() + + ch = dialog_win.getch() + + if ch in (curses.KEY_UP, ord('k')): + selected_idx = (selected_idx - 1) % len(available_sockets) + elif ch in (curses.KEY_DOWN, ord('j')): + selected_idx = (selected_idx + 1) % len(available_sockets) + elif ch in (curses.KEY_ENTER, 10, 13): + result = available_sockets[selected_idx] + del dialog_win + stdscr.clear() + stdscr.refresh() + return result + elif ch == 27: # ESC + del dialog_win + stdscr.clear() + stdscr.refresh() + return None + def menu_tree(self): """建立多層選單結構""" return MenuNode("Main Menu", children=[ - MenuNode("MavLink Object Control", children=[ - MenuNode("New+", "新增一個連結口", "ADD_MAV_OBJECT"), - MenuNode("Remove", "移除某個連結口", "REMOVE_MAV_OBJECT"), - MenuNode("ListAll", "顯示連結口列表", "LIST_MAV_OBJECT"), - MenuNode("Inspection", "顯示連結口資訊", "INSPECT_MAV_OBJECT"), - ]), - MenuNode("參數設定", children=[ - MenuNode("Speed", children=[ - MenuNode("Increase Speed", "增加速度", "ADJUST_SPEED"), - MenuNode("Decrease Speed", "減少速度", "ADJUST_SPEED"), + MenuNode("MavLink Object", "控制 MavLink 物件", children=[ + MenuNode("New+", children=[ + MenuNode("UDP InBound", children=[ + MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"), + MenuNode("Port(Listen)", "設定監聽的 Port", "TEXT_UDP_PORT"), + MenuNode("Create", "建立 UDP InBound 連結口", "CREATE_UDP_INBOUND"), + ]), + MenuNode("UDP OutBound", children=[ + MenuNode("IP(Target)", "設定目標的 IP 位址", "TEXT_UDP_IP"), + MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"), + MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"), + ]), + ]), + MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"), + ]), + MenuNode("Engineer Mode", "工程模式", children=[ + MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), #TODO: 尚未實作 + MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), #TODO: 尚未實作 + ]), + MenuNode("Shutdown", "關閉整個系統", children=[ + MenuNode("Return", "繼續運行", "BACK"), + MenuNode("Confirm", "關閉系統", "QUIT"), ]), - MenuNode("輸入文字", "鍵盤輸入模式", "INPUT_TEXT"), - ]), - MenuNode("資訊查看", children=[ - MenuNode("顯示狀態", "查看當前狀態", "SHOW_STATUS"), - MenuNode("顯示輸入", "查看用戶輸入", "SHOW_INPUT"), - ]), - MenuNode("返回上層", "回到上一層選單", "BACK"), - MenuNode("關閉面板", "關閉控制面板", "QUIT"), ]) - def panel_thread(self, cmd_q: queue.Queue, stop_evt: threading.Event): + def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event): stdscr = None - + def cleanup(): """清理 curses 狀態""" if stdscr: @@ -70,7 +301,13 @@ class ControlPanel: curses.nocbreak() curses.echo() curses.endwin() - + + def panel_shutdown(): + # 先關閉所有模組 再關閉面板 + cmd_q.put("SHUTDOWN_BRIDGE") + cmd_q.put("SHUTDOWN_MANAGER") + + def draw_menu(screen): nonlocal stdscr stdscr = screen @@ -82,6 +319,8 @@ class ControlPanel: # 選單導航狀態 menu_stack = [self.menu_tree()] # 選單堆疊 idx_stack = [0] # 索引堆疊 + + state.intoSTART() # 設定狀態為運行中 while not stop_evt.is_set(): # 檢查是否需要退出 @@ -91,35 +330,92 @@ class ControlPanel: current_menu = menu_stack[-1] current_idx = idx_stack[-1] + # 獲取終端機尺寸 + height, width = stdscr.getmaxyx() + # 簡單暴力的限制視窗的大小 + if height < 20 or width < 60: + logger.error("Terminal size too small for Control Panel.") + break + stdscr.clear() stdscr.border() stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD) - stdscr.addstr(1, 2, f"mavlink bridge state : ") - stdscr.addstr(2, 2, f"object manager state : ") - stdscr.addstr(3, 2, f"Node state : ") + stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}") + stdscr.addstr(2, 2, f"mavlink Bridge State : {state.mavlink_bridge_state}") + stdscr.addstr(3, 2, f"Object Manager State : {state.object_manager_state}") + stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}") - # # Header - 顯示選單路徑 - # path = " → ".join([menu.name for menu in menu_stack]) - # stdscr.addstr(0, 2, f"控制面板: {path}", curses.A_BOLD) - # stdscr.addstr(1, 2, f"狀態: {state.status} | 速度: {state.speed} | 計數: {state.counter}") - # if state.user_input: - # stdscr.addstr(2, 2, f"輸入: {state.user_input[:50]}...") + # # 更新模組狀態顯示 + # stdscr.addstr(2, 25, f"{state.mavlink_bridge_state}") + # stdscr.addstr(3, 25, f"{state.object_manager_state}") + # stdscr.addstr(4, 25, f"{len(state.socket_object_list)} ") # 顯示當前選單項目 - start_line = 5 + start_line = 6 for i, child in enumerate(current_menu.children): marker = "➤ " if i == current_idx else " " - line = f"{marker}{child.name:15s} – {child.desc}" + # 動態顯示已輸入的值 + desc = child.desc + if child.action == "TEXT_UDP_IP" and state.udp_info_temp["IP"]: + desc = f"{child.desc} [{state.udp_info_temp['IP']}]" + elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]: + desc = f"{child.desc} [{state.udp_info_temp['Port']}]" + + line = f"{marker}{child.name:15s} – {desc}" attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL stdscr.addstr(start_line + i, 4, line, attr) + # 顯示訊息區域 + # info_start_line = start_line + len(current_menu.children) + 1 + info_start_line = height - 8 + max_msg_lines = 5 # 最多顯示 5 行訊息 + current_time = time.time() + + # 清理過時的訊息 + state.panel_info_msg_list = [ + (msg, timestamp) for msg, timestamp in state.panel_info_msg_list + if current_time - timestamp < 2.0 #秒數 + ] + + # 只顯示最新的 max_msg_lines 條訊息 + display_msgs = state.panel_info_msg_list[-max_msg_lines:] + + for i, msg_data in enumerate(display_msgs): + if info_start_line + i >= help_line - 1: # 避免超出邊界 + break + msg = msg_data[0] if isinstance(msg_data, tuple) else msg_data + # 截斷過長的訊息 + max_msg_width = width - 6 + if len(msg) > max_msg_width: + msg = msg[:max_msg_width-3] + "..." + + stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD) + + + # 操作說明 - help_line = start_line + len(current_menu.children) + 2 - stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 ←→調參數 q退出", curses.A_DIM) + # help_line = start_line + len(current_menu.children) + 2 + help_line = height - 2 + stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層 q退出", curses.A_DIM) stdscr.refresh() + # 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作 + # 驗證 bridge 跟 manager 狀態 兩者都停止後 就進入 STOPPED 狀態並跳出迴圈 + # 超過幾秒沒有反應就強制關閉 + if state.panel_status == "Terminating": + if time.time() - state.termination_start_time > 3: + logger.warning("Control Panel forced shutdown after timeout.") + state.intoSTOPPED() + stop_evt.set() + continue + time.sleep(0.1) + if state.mavlink_bridge_state == "Stopped" and state.object_manager_state == "Stopped": + state.intoSTOPPED() + stop_evt.set() + continue + # 設定短暫的 timeout,讓執行緒能夠響應 stop_evt stdscr.timeout(100) # 100ms timeout ch = stdscr.getch() @@ -133,24 +429,28 @@ class ControlPanel: elif ch in (curses.KEY_DOWN, ord('j')): idx_stack[-1] = (current_idx + 1) % len(current_menu.children) + + elif ch == (ord('O')): + # 直接進入工程模式 + state.intoENGINEER() elif ch == curses.KEY_LEFT: - # 返回上層或調整參數 + # 返回上層 if len(menu_stack) > 1: menu_stack.pop() idx_stack.pop() - else: - # 在根選單,檢查是否為調整參數 - selected = current_menu.children[current_idx] - if selected.action == "ADJUST_SPEED": - state.speed = max(1, state.speed - 1) - + elif ch == curses.KEY_RIGHT: - # 調整參數 + # 進入下層 (但不執行動作) selected = current_menu.children[current_idx] - if selected.action == "ADJUST_SPEED": - state.speed = min(10, state.speed + 1) - + if selected.children: # 有子選單 + menu_stack.append(selected) + idx_stack.append(0) + + elif ch in (ord('q'), 27): + state.intoTERMINATION() + panel_shutdown() + elif ch in (curses.KEY_ENTER, 10, 13): selected = current_menu.children[current_idx] @@ -165,96 +465,320 @@ class ControlPanel: idx_stack.pop() elif selected.action == "QUIT": - break - - elif selected.action == "INPUT_TEXT": - # 進入輸入模式 - result = input_dialog(stdscr, "請輸入文字: ") + state.intoTERMINATION() + panel_shutdown() + + elif selected.action == "TEXT_UDP_IP": + result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ") if result is not None: - cmd_q.put(lambda: state.set_user_input(result)) - - elif selected.action == "SHOW_STATUS": - # 顯示狀態訊息 - stdscr.clear() - stdscr.addstr(5, 2, f"當前狀態: {state.status}") - stdscr.addstr(6, 2, f"速度設定: {state.speed}") - stdscr.addstr(7, 2, f"計數器: {state.counter}") - stdscr.addstr(9, 2, "按任意鍵返回...") - stdscr.refresh() - stdscr.getch() - - elif selected.action == "SHOW_INPUT": - # 顯示用戶輸入 - stdscr.clear() - stdscr.addstr(5, 2, f"用戶輸入內容:") - stdscr.addstr(6, 2, f"{state.user_input}") - stdscr.addstr(8, 2, "按任意鍵返回...") - stdscr.refresh() - stdscr.getch() - + state.udp_info_temp["IP"] = result + + elif selected.action == "TEXT_UDP_PORT": + result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 Port: ") + if result is not None: + state.udp_info_temp["Port"] = result + + elif selected.action == "CREATE_UDP_INBOUND": + cmd_q.put("CREATE_UDP_INBOUND") + # 確認後回到上兩層 + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + menu_stack.pop() + idx_stack.pop() + + elif selected.action == "CREATE_UDP_OUTBOUND": + cmd_q.put("CREATE_UDP_OUTBOUND") + # 確認後回到上兩層 + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + menu_stack.pop() + idx_stack.pop() + + elif selected.action == "LIST_MAV_OBJECT": + # 動態生成 mavlink_object 列表選單 + object_list_menu = self.create_object_list_menu(state, page=0) + menu_stack.append(object_list_menu) + idx_stack.append(0) + + elif selected.action == "PREV_PAGE": + # 上一頁 + if hasattr(selected, 'page'): + menu_stack.pop() + idx_stack.pop() + object_list_menu = self.create_object_list_menu(state, page=selected.page) + menu_stack.append(object_list_menu) + idx_stack.append(0) + + elif selected.action == "NEXT_PAGE": + # 下一頁 + if hasattr(selected, 'page'): + menu_stack.pop() + idx_stack.pop() + object_list_menu = self.create_object_list_menu(state, page=selected.page) + menu_stack.append(object_list_menu) + idx_stack.append(0) + + elif selected.action == "INSPECT_MAV_OBJECT": + # 顯示物件詳細資訊 + if hasattr(selected, 'socket_id'): + cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id)) + self.show_object_info(stdscr, selected.socket_id, state) + + elif selected.action == "REMOVE_MAV_OBJECT": + # 移除物件 + if hasattr(selected, 'socket_id'): + cmd_q.put(("REMOVE_OBJECT", selected.socket_id)) + # 返回上層(回到列表) + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + # 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好 + menu_stack.pop() + idx_stack.pop() + # # 刷新列表頁面 + # if len(menu_stack) > 1: + # current_page = menu_stack[-1].current_page if hasattr(menu_stack[-1], 'current_page') else 0 + # menu_stack.pop() + # idx_stack.pop() + # time.sleep(0.1) # 等待物件被移除 + # object_list_menu = self.create_object_list_menu(state, page=current_page) + # menu_stack.append(object_list_menu) + # idx_stack.append(0) + + elif selected.action == "MAVOBJ_MAKE_LINK": + # 建立轉發連結 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state) + if target_id is not None: + cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id)) + + elif selected.action == "MAVOBJ_ADD_TARGET": + # 添加目標端口 + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state) + if target_id is not None: + cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) + + elif selected.action == "MAVOBJ_REMOVE_TARGET": + # 移除目標端口 + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + if hasattr(selected, 'socket_id'): + target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True) + if target_id is not None: + cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id)) + + elif selected.action == "STOP_MANAGER": + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + cmd_q.put("SHUTDOWN_MANAGER") + + elif selected.action == "STOP_BRIDGE": + if state.panel_status != "Engineer": + state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) + continue # 只有在工程模式下才能操作 + cmd_q.put("SHUTDOWN_BRIDGE") elif callable(selected.action): # 執行函式 cmd_q.put(selected.action) - elif ch in (ord('q'), 27): - break - try: curses.wrapper(draw_menu) except KeyboardInterrupt: pass finally: + stop_evt.set() cleanup() + + +class Orchestrator: + def __init__(self, stop_sig): + self.stop_evt = stop_sig + + # === 1) 面板部分的準備 === + self.cmd_q = queue.Queue() + self.panelState = PanelState() # 面板的狀態儲存 + self.cPanel = ControlPanel() # 面板的功能 + self.panel_thread = None + + # === 2) async_io_manager & mavlink_bridge 部分的準備 === + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + self.manager = mo.async_io_manager() + self.bridge = mo.mavlink_bridge() + + def engageWholeSystem(self): + """啟動整個系統""" + # === 1) 面板部分的啟動 === + self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt)) + self.panel_thread.start() + + # === 2) async_io_manager & mavlink_bridge 部分的啟動 === + self.manager.start() + self.bridge.start() + + def mainLoop(self): + logger.info("Main orchestrator started <-") + try: + while not self.stop_evt.is_set(): + + # A. 更新模組狀態 + if self.manager.running: + self.panelState.object_manager_state = 'Running' + else: + self.panelState.object_manager_state = 'Stopped' + + socketid_list = self.manager.get_managed_objects() + self.panelState.socket_object_list = socketid_list + + if self.bridge.running: + self.panelState.mavlink_bridge_state = 'Running' + else: + self.panelState.mavlink_bridge_state = 'Stopped' + + # 取出面板丟過來的「動作」 + try: + cmd = self.cmd_q.get_nowait() + if callable(cmd): + cmd() # 執行對應動作 + elif isinstance(cmd, tuple): + # 處理多參數命令 + action = cmd[0] + if action == "REMOVE_OBJECT": + socket_id = cmd[1] + # 先移除所有跟他關聯的 target sockets + for s_id in mo.mavlink_object.mavlinkObjects: + s_obj = mo.mavlink_object.mavlinkObjects[s_id] + if socket_id in s_obj.target_sockets: + self.remove_target_from_object(s_id, socket_id) + # 再移除該物件 + self.delete_mavlink_object(socket_id) + elif action == "MAVOBJ_MAKE_LINK": + source_id, target_id = cmd[1], cmd[2] + self.add_target_to_object(source_id, target_id) + self.add_target_to_object(target_id, source_id) # 雙向連結 + elif action == "MAVOBJ_ADD_TARGET": + source_id, target_id = cmd[1], cmd[2] + self.add_target_to_object(source_id, target_id) + elif action == "MAVOBJ_REMOVE_TARGET": + source_id, target_id = cmd[1], cmd[2] + self.remove_target_from_object(source_id, target_id) + elif action == "INSPECT_MAV_OBJECT": + socket_id = cmd[1] + mav_obj = self.manager.managed_objects.get(socket_id, None) + if mav_obj: + self.panelState.socket_info_single["socket_type"] = mav_obj.socket_type + # self.panelState.socket_info_single["socket_state"] = "Running" if mav_obj.running + self.panelState.socket_info_single["bridge_msg_types"] = mav_obj.bridge_msg_types + self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types + self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id + self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets + self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好 + + elif cmd == "CREATE_UDP_INBOUND": + self.panelState.udp_info_temp["direction"] = "inbound" + self.create_udp_object() + elif cmd == "CREATE_UDP_OUTBOUND": + self.panelState.udp_info_temp["direction"] = "outbound" + self.create_udp_object() + elif cmd == "SHUTDOWN_BRIDGE": + self.bridge.stop() + elif cmd == "SHUTDOWN_MANAGER": + self.manager.shutdown() + except queue.Empty: + pass + except Exception as e: + logger.error(f"Error processing command: {e}") + + + time.sleep(0.1) + + except KeyboardInterrupt: + pass + except Exception as e: + logger.error(f"Unexpected error in main loop: {e}") + finally: + logger.info("Main orchestrator END!") + + # 關閉 mavlink_bridge (裡面有一個執行緒) + self.bridge.stop() + + # 關閉 async_io_manager (裡面有一個執行緒) + self.manager.shutdown() + + # 關閉面板執行緒 + if self.panel_thread.is_alive(): + self.panel_thread.join(timeout=2) + + def create_udp_object(self): + if self.panelState.udp_info_temp["direction"] == "inbound": + connection_string = f"udp:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}" + elif self.panelState.udp_info_temp["direction"] == "outbound": + connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}" + + try: + mavlink_socket = mavutil.mavlink_connection(connection_string) + except Exception as e: + self.panelState.panel_info_msg_list.append((f"Failed to create UDP {self.panelState.udp_info_temp['direction']} object: {e}", time.time()-1)) + return + mavlink_object = mo.mavlink_object(mavlink_socket) + mavlink_object.socket_type = "UDP " + self.panelState.udp_info_temp['direction'].capitalize() + self.manager.add_mavlink_object(mavlink_object) + self.panelState.panel_info_msg_list.append((f"Created UDP {self.panelState.udp_info_temp['direction']} object: {connection_string}", time.time())) + + def delete_mavlink_object(self, socket_id): + """移除指定的 mavlink_object""" + self.manager.remove_mavlink_object(socket_id) + + def add_target_to_object(self, source_id, target_id): + """為 mavlink_object 添加轉發目標""" + if source_id in mo.mavlink_object.mavlinkObjects: + source_obj = mo.mavlink_object.mavlinkObjects[source_id] + else: + self.panelState.panel_info_msg_list.append((f"Source object {source_id} not found", time.time())) + return + + if source_obj.add_target_socket(target_id): + self.panelState.panel_info_msg_list.append((f"Added target {target_id} to socket {source_id}", time.time())) + else: + self.panelState.panel_info_msg_list.append((f"Fail Adding target {target_id} to socket {source_id}", time.time())) + + def remove_target_from_object(self, source_id, target_id): + """從 mavlink_object 移除轉發目標""" + if source_id in mo.mavlink_object.mavlinkObjects: + source_obj = mo.mavlink_object.mavlinkObjects[source_id] + else: + self.panelState.panel_info_msg_list.append((f"Source object {source_id} not found", time.time())) + return + + if source_obj.remove_target_socket(target_id): + self.panelState.panel_info_msg_list.append((f"Removed target {target_id} from socket {source_id}", time.time())) + else: + self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time())) + def main(): - logger.warning(f"Hello this is mainOrchestrator.py") - pp = ControlPanel() - cmd_q = queue.Queue() stop_evt = threading.Event() - panel_thread_obj = None def signal_handler(signum, frame): - """處理 Ctrl+C 信號""" - print("\n收到中斷信號,正在安全退出...") + """處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組""" stop_evt.set() - if panel_thread_obj and panel_thread_obj.is_alive(): - panel_thread_obj.join(timeout=2) - sys.exit(0) # 註冊信號處理器 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - # 啟動控制面板(改為非 daemon) - panel_thread_obj = threading.Thread(target=pp.panel_thread, args=(cmd_q, stop_evt)) - panel_thread_obj.start() - - print("多層選單控制面板啟動。Ctrl+C 結束程式。") - - try: - while not stop_evt.is_set(): - # 取出面板丟過來的「動作」 - try: - fn = cmd_q.get_nowait() - fn() # 執行對應動作 - except queue.Empty: - pass - - # # 模擬你的長跑邏輯 - # if state.status == "running": - # # 依 speed 前進 - # state.counter += state.speed - - time.sleep(0.1) - except KeyboardInterrupt: - print("\n收到 Ctrl+C,準備結束...") - finally: - stop_evt.set() - if panel_thread_obj.is_alive(): - panel_thread_obj.join(timeout=2) - pass - + orchestrator = Orchestrator(stop_evt) + orchestrator.engageWholeSystem() + orchestrator.mainLoop() # 程式會 block 在這邊 直到收到中斷信號 if __name__ == "__main__": main() diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index a132e97..b4d747f 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -8,26 +8,57 @@ # pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面 # 這邊的 main 會是用來初始化 mavlink_object 並啟動他的 run function +================= 改版記錄 ============================ + +2025年 6月 20日 +1. mavlink_object 中 由於 Queue 的效能太差 會完全移除 + 其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 + 而 multiplexingToSwap 會完全被移除代替方式下一條描述 +2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流 + (註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) + 因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 + 並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 +3. mavlink_object 需要加上 state 去管理其狀態 +4. mavlink_object 需要加上 target port 去管理寫出的目標 +5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 +6. 基於第1,2項 updateMultiplexingList 會被完全移除 +7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object +8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring 與 return_packet_ring 來取代 另外 swap_queues 會被完全移除 + + +2025年 11月 15日 +1. mavlink_bridge 類別新增為 singleton 模式 確保全系統只有一個實例在運行 +2. mavlink_bridge 處理封包改為映射表 (需要在 _init_message_handlers 中新增處理器函式) +3. mavlink_bridge 的主要迴圈 增加 send_message 功能 可指定目標 sysid 或 socket_id 發送 mavlink 封包 +4. async_io_manager 循環邏輯大改動 優化 mavlink_object 加入與移除的邏輯 並使得 task 與 evenlt loop 分層更清楚 +5. mavlink_object 移除不必要的 start 與 stop 方法 由 async_io_manager 統一管理其生命週期 +6. mavlink_object 優化 send_message 方法 避免無效判斷 與 增加一些防呆檢驗 並與 mavlink_bridge 連動工作 +7. 移除迴圈內的 try except 堆疊 增加效能 +8. 移除對於 mavlinkDevice 的依賴 改用 vehicle_registry 來管理所有的載具 + ''' # 基礎功能的 import -import threading -import os -# import queue +import os +import signal import time +import threading import asyncio from enum import Enum, auto +from collections import deque # from typing import Dict, List, Optional, Set, Any, Tuple # mavlink 的 import from pymavlink import mavutil -# ROS2 的 import -from rclpy.node import Node - # 自定義的 import -from .mavlinkDevice import mavlink_device, mavlink_systems -from .mavlinkPublish import mavlink_publisher +from .mavlinkVehicleView import ( + vehicle_registry, + VehicleView, + VehicleComponent, + ComponentType, + ConnectionType +) from .utils import RingBuffer, setup_logger @@ -40,26 +71,24 @@ return_packet_ring = RingBuffer(capacity=256, buffer_id=254) # ====================== 分割線 ===================== -class mavlink_bridge(Node, mavlink_publisher): +# 使用 vehicle_registry 來管理所有的載具視圖 +# vehicle_registry 是從 mavlinkVehicleView 導入的全域實例 + +class mavlink_bridge: ''' 這個 class 就是 固定串流橋接器 - 是用來接收 mavlink 訊息 並進行橋接 - 這個地方是針對 fixed_stream_bridge_queue 的資料做處理的 + 是用來接收 mavlink 訊息 並進行橋接處理 + 這個地方是針對 stream_bridge_ring 的資料做處理的 記錄有 mavlink bus 上有那些 system id 和 component id 為了每個 system id 都有一個對應的 device object - 並且看是否有重複 system id - - 整段代碼包含兩大區塊 thread 和 node - - thread 區塊內會對 fixed_stream_bridge_queue 進行監聽 並且將收到的訊息進行處理 - 其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object - - node 區塊則是處理 ros2 的 publisher 和 subscriber 訂閱相關 - 藉由控制 ros2 的機制再把 device object 的資訊發送出去 - - fixed_stream_bridge_queue 置換成 stream_bridge_ring - - ps. 我限制了這個 class 只能有一個 instance + + 此類別負責: + 1. 從 stream_bridge_ring 接收訊息 + 2. 管理 mavlink_systems(device 和 component objects) + 3. 處理接收到的訊息並更新對應的 component 狀態 + 4. 提供發送訊息的介面,將訊息路由到正確的 mavlink_object + + ps. 此 class 為 singleton,只能有一個 instance ''' _instance = None _lock = threading.Lock() # 確保多線程安全 @@ -73,154 +102,244 @@ class mavlink_bridge(Node, mavlink_publisher): def __init__(self): if not hasattr(self, "initialized"): # 防止重複初始化 self.initialized = True + self.thread = None + self.running = False - # 關聯到全域變數 - global mavlink_systems - self.mavlink_systems = mavlink_systems + # 初始化訊息處理器字典 (msg_id -> handler_function) + self._init_message_handlers() + else: + logger.error('mavlink_bridge instance already exists. Do not create another one.') - # 當 object 建立時會直接運行 thread 直到消滅 + def _init_message_handlers(self): + """初始化訊息處理器映射表,提高處理效率""" + self.message_handlers = { + 0: self._handle_heartbeat, # HEARTBEAT + 30: self._handle_attitude, # ATTITUDE + 32: self._handle_local_position, # LOCAL_POSITION_NED + 33: self._handle_global_position, # GLOBAL_POSITION_INT + 74: self._handle_vfr_hud, # VFR_HUD + 147: self._handle_battery_status, # BATTERY_STATUS + } + + def start(self): + """啟動 mavlink_bridge 的運作""" + if not self.running: self.running = True - self.thread = threading.Thread(target=self._run_thread) + self.thread = threading.Thread(target=self._run_thread, name="MavlinkBridge") self.thread.start() else: - logger.error('mavlink_bridge instance already exists. Do not create another one.') + logger.warning("mavlink_bridge is already running") def stop(self): + """停止 mavlink_bridge 的運作""" self.running = False + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=3.0) # === Thread 區塊 === def _run_thread(self): - start_time = time.time() # debug - run_loop_count = 0 # debug - logger.info('Start of mavlink_bridge._run_thread') - # 從 Queue stream_bridge_ring 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 + """主執行緒:從 stream_bridge_ring 接收並處理 mavlink 訊息""" + logger.info('mavlink_bridge started <-') + while self.running: - # # 這個迴圈每秒鐘被執行了幾輪? # 這整段都是 debug 用的 - # current_time = time.time() - # if (current_time - start_time) >= 1.0: - # logger.info(f'mavlink_bridge._run_thread loop count: {run_loop_count}') - # run_loop_count = 0 - # start_time = current_time - # else: - # run_loop_count += 1 - # # ========================= - - + # 檢查是否有訊息 if stream_bridge_ring.is_empty(): + time.sleep(0.001) # 避免忙碌等待 continue + + # 取出訊息包:(socket_id, timestamp, mavlink_msg) msg_pack = stream_bridge_ring.get() - - msg = msg_pack[2] + socket_id, timestamp, msg = msg_pack[0], msg_pack[1], msg_pack[2] + + # 解析訊息基本資訊 sysid = msg.get_srcSystem() compid = msg.get_srcComponent() msg_id = msg.get_msgId() - - # 若這個 system id 還不存在 則建立 device object - if not sysid in self.mavlink_systems: - this_device = mavlink_device() # 創建一個新的 device object - self.mavlink_systems[sysid] = this_device - this_device.socket_id = msg_pack[0] - this_device.sysid = sysid - else: - this_device = self.mavlink_systems[sysid] - - # 若該 component id 存在 則直接使用該 component object - # 若該 component id 不存在 則利用 heartbeat 創建一個新的 component object - # 若該 component id 不存在 又不是 heartbeat 則不處理 - if compid in self.mavlink_systems[sysid].components: - this_component = self.mavlink_systems[sysid].components[compid] - elif msg_id == 0: - # 只有透過 heartbeat 可以創建一個新的 component object - this_component = this_device.mavlink_component() - this_device.components[msg.get_srcComponent()] = this_component - this_component.mav_type = msg.type - this_component.mav_autopilot = msg.autopilot + + # 確保 VehicleView 存在 + vehicle = vehicle_registry.get(sysid) + if vehicle is None: + vehicle = vehicle_registry.register(sysid) + # 存儲 socket_id 到自定義 meta 中 + vehicle.custom_meta['socket_id'] = socket_id + + # 確保 VehicleComponent 存在 + component = vehicle.get_component(compid) + if component is None: + if msg_id == 0: # 只有透過 HEARTBEAT 才能創建新的 component + # 根據 mav_type 判斷 component 類型 + comp_type = self._determine_component_type(msg.type) + component = vehicle.add_component(compid, comp_type) + component.mav_type = msg.type + component.mav_autopilot = msg.autopilot + else: + # component 不存在且非 HEARTBEAT,忽略此訊息 + continue + + # 使用處理器字典分發訊息處理 + if msg_id in self.message_handlers: + try: + self.message_handlers[msg_id](vehicle, component, msg, timestamp) + except Exception as e: + logger.error(f'Error handling message type {msg_id}: {e}') else: - continue - - # ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↓↓↓↓↓↓↓↓↓↓↓↓ - - if msg_id == 0: # HEARTBEAT 處理 - this_component.emitParams['base_mode'] = msg.base_mode - this_component.emitParams['flightMode_mode'] = mavutil.mode_string_v10(msg) - this_component.emitParams['flightMode'] = msg # debug - - # print("mav_type : ", msg.type) # debug - print("get mode :", mavutil.mode_string_v10(msg)) # debug - # print("record mode :", this_component.emitParams['flightMode_mode']) # debug - - elif msg_id == 30: # ATTITUDE 處理 - this_component.emitParams['attitude'] = msg - - elif msg_id == 32: # LOCAL_POSITION_NED 處理 - this_component.emitParams['local_position'] = msg - - elif msg_id == 33: # GLOBAL_POSITION_INT 處理 - this_component.emitParams['global_position'] = msg - - elif msg_id == 74: # VFR_HUD 處理 - this_component.emitParams['vfr_hud'] = msg + logger.debug(f'Unhandled message type: {msg_id} / {msg.get_type()}') + + logger.info('mavlink_bridge END!') + + def _determine_component_type(self, mav_type: int) -> ComponentType: + """根據 MAV_TYPE 判斷組件類型""" + # MAV_TYPE 定義: + # 0=通用, 1=固定翼, 2=四旋翼, 3=直升機, 4=天線追蹤器, 5=GCS, 6=飛船, + # 26=雲台, 27=ADSB, 28=降落傘等 + if mav_type == 6: # MAV_TYPE_GCS + return ComponentType.GCS + elif mav_type == 26: # MAV_TYPE_GIMBAL + return ComponentType.GIMBAL + elif mav_type == 30: # MAV_TYPE_CAMERA + return ComponentType.CAMERA + elif mav_type in [1, 2, 3, 4, 13, 14, 15, 19, 20, 21, 22]: # 各種飛行器類型 + return ComponentType.AUTOPILOT + else: + return ComponentType.OTHER + + # === 訊息處理器 === + def _handle_heartbeat(self, vehicle, component, msg, timestamp): + """處理 HEARTBEAT 訊息 (msg_id: 0)""" + component.status.mode.base_mode = msg.base_mode + component.status.mode.custom_mode = msg.custom_mode + component.status.mode.mode_name = mavutil.mode_string_v10(msg) + component.status.mode.timestamp = timestamp + component.status.system_status = msg.system_status + component.status.armed = (msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) != 0 + # print("get mode:", mavutil.mode_string_v10(msg)) # debug + + def _handle_attitude(self, vehicle, component, msg, timestamp): + """處理 ATTITUDE 訊息 (msg_id: 30)""" + component.status.attitude.roll = msg.roll + component.status.attitude.pitch = msg.pitch + component.status.attitude.yaw = msg.yaw + component.status.attitude.rollspeed = msg.rollspeed + component.status.attitude.pitchspeed = msg.pitchspeed + component.status.attitude.yawspeed = msg.yawspeed + component.status.attitude.timestamp = timestamp + + def _handle_local_position(self, vehicle, component, msg, timestamp): + """處理 LOCAL_POSITION_NED 訊息 (msg_id: 32)""" + # LOCAL_POSITION_NED 提供相對位置資訊 + component.status.position.relative_altitude = -msg.z # NED 座標系,z 為負表示高度 + component.status.position.timestamp = timestamp + # 也可以存儲到 custom_status 中保留原始資料 + component.status.custom_status['local_position'] = { + 'x': msg.x, 'y': msg.y, 'z': msg.z, + 'vx': msg.vx, 'vy': msg.vy, 'vz': msg.vz + } + + def _handle_global_position(self, vehicle, component, msg, timestamp): + """處理 GLOBAL_POSITION_INT 訊息 (msg_id: 33)""" + component.status.position.latitude = msg.lat / 1e7 # 轉換為度 + component.status.position.longitude = msg.lon / 1e7 # 轉換為度 + component.status.position.altitude = msg.alt / 1000.0 # 轉換為公尺 + component.status.position.relative_altitude = msg.relative_alt / 1000.0 # 轉換為公尺 + component.status.position.timestamp = timestamp + + def _handle_vfr_hud(self, vehicle, component, msg, timestamp): + """處理 VFR_HUD 訊息 (msg_id: 74)""" + component.status.vfr.airspeed = msg.airspeed + component.status.vfr.groundspeed = msg.groundspeed + component.status.vfr.heading = msg.heading + component.status.vfr.throttle = msg.throttle + component.status.vfr.climb = msg.climb + component.status.vfr.timestamp = timestamp + + def _handle_battery_status(self, vehicle, component, msg, timestamp): + """處理 BATTERY_STATUS 訊息 (msg_id: 147)""" + # 計算電池總電壓(mV 轉 V) + if hasattr(msg, 'voltages') and msg.voltages[0] != 65535: + # voltages 是一個陣列,包含各個電池單元的電壓 + total_voltage = sum(v for v in msg.voltages if v != 65535) / 1000.0 + component.status.battery.voltage = total_voltage + + component.status.battery.current = msg.current_battery / 100.0 if msg.current_battery != -1 else None # cA 轉 A + component.status.battery.remaining = msg.battery_remaining if msg.battery_remaining != -1 else None # 百分比 + component.status.battery.temperature = msg.temperature / 100.0 if hasattr(msg, 'temperature') and msg.temperature != 32767 else None # 轉換為攝氏 + component.status.battery.timestamp = timestamp - elif msg_id == 147: # BATTERY_STATUS 處理 - this_component.emitParams['battery'] = msg + # === 訊息發送功能 === + def send_message(self, message_bytes, target_sysid=None, target_socket_id=None, broadcast=False): + """ + 發送訊息到指定的 mavlink_object + + Args: + message_bytes: 準備好的 mavlink 封包 (bytes) + target_sysid: 目標系統 ID (可選,用於自動查找對應的 socket) + target_socket_id: 目標 socket ID (可選,直接指定) + + Returns: + bool: 是否成功發送 + + 使用方式: + 1. broadcast: 廣播到所有活動的 mavlink_object + 2. 指定 target_socket_id:直接發送到該 socket + 3. 指定 target_sysid:自動查找該系統對應的 socket 並發送 + """ + if not isinstance(message_bytes, (bytes, bytearray)): + logger.error(f"Invalid message type: {type(message_bytes)}") + return False - # ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↑↑↑↑↑↑↑↑↑↑↑↑ + # 情況 1: 廣播到所有活動的 mavlink_object + if broadcast: + success_count = 0 + for socket_id, mav_obj in mavlink_object.mavlinkObjects.items(): + if self._send_to_socket(message_bytes, socket_id): + success_count += 1 + + return success_count > 0 - # 若未定義的訊息類型則不處理 並跳出訊息 + # 情況 2: 直接指定 socket_id + if target_socket_id is not None: + return self._send_to_socket(message_bytes, target_socket_id) + + # 情況 3: 透過 sysid 查找對應的 socket + if target_sysid is not None: + vehicle = vehicle_registry.get(target_sysid) + if vehicle and 'socket_id' in vehicle.custom_meta: + socket_id = vehicle.custom_meta['socket_id'] + return self._send_to_socket(message_bytes, socket_id) else: - logger.warning('This Message Type Did not define process method : {} / {}'.format(msg.get_msgId(), msg.get_type())) - continue + logger.warning(f"System ID {target_sysid} not found or no socket_id") + return False - logger.info('End of mavlink_bridge._run_thread') - - # === Node 區塊 === - def _init_node(self): - logger.info('Start of mavlink_bridge._init_node') - super().__init__('mavlink_bridge') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒 - - def emit_info(self): - # 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過 - # 把 emitParams 的參數發送出去 - for sysid, device in self.mavlink_systems.items(): - for compid, component in device.components.items(): - for topic_name in component.publishers.keys(): - publisher = component.publishers[topic_name][0] - packEmit_func = component.publishers[topic_name][1] - packEmit_func(component.emitParams, publisher) - - def _del_node(self): - # TODO 這邊要刪除 node 的時候要做的事情 - # 先註銷所有 mavlink_systems 中 component 的 publisher - # 再註銷所有 mavlink_systems 中的 device object - # 再註銷 node - pass + logger.warning("No target specified for sending message. WTF ARE YOU DOING?") + return False # 若無指定任何目標,則返回失敗 + + def _send_to_socket(self, message_bytes, socket_id): + """ + 將訊息發送到指定的 mavlink_object + + Args: + message_bytes: mavlink 封包 + socket_id: 目標 socket ID + + Returns: + bool: 是否成功 + """ + if socket_id not in mavlink_object.mavlinkObjects: + logger.warning(f"mavlink_object {socket_id} not found") + return False + + mav_obj = mavlink_object.mavlinkObjects[socket_id] + return mav_obj.send_message(message_bytes) # ====================== 分割線 ===================== -''' -本次改版的暫時記錄註解於此 -1. mavlink_object 中 由於 Queue 的效能太差 會完全移除 - 其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 - 而 multiplexingToSwap 會完全被移除代替方式下一條描述 -2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流 - (註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) - 因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 - 並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 -3. mavlink_object 需要加上 state 去管理其狀態 -4. mavlink_object 需要加上 target port 去管理寫出的目標 -5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 -6. 基於第1,2項 updateMultiplexingList 會被完全移除 -7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object -8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring 與 return_packet_ring 來取代 另外 swap_queues 會被完全移除 - -''' - # 定義 mavlink_object 的狀態 class MavlinkObjectState(Enum): - INIT = auto() # 初始化狀態 - RUNNING = auto() # 運行中狀態 - ERROR = auto() # 錯誤狀態 - CLOSED = auto() # 已關閉狀態 + INIT = auto() # 初始化狀態 + RUNNING = auto() # 運行中狀態 + SHUTTINGDOWN = auto() # 關閉中狀態 + ERROR = auto() # 錯誤狀態 + CLOSED = auto() # 已關閉狀態 class mavlink_object: ''' @@ -251,25 +370,16 @@ class mavlink_object: def __init__(self, socket): # 登入所需的 socket self.mavlink_socket = socket - - # 存放目前是否分流到固定串流橋接器和回傳封包處理器的標誌 # 都是要做的 把這個判斷拿掉 - # self.send_to_bridge = True - # self.send_to_return = False # 用於主線程發送消息的緩衝區 - self.outgoing_msg_lock = threading.Lock() - self.outgoing_msgs = [] + self.outgoing_msgs = deque() # 記錄訊息過濾類型 (可選) - self.bridge_msg_types = [0] # 默認只處理 HEARTBEAT - self.return_msg_types = [] + self.bridge_msg_types = set([0, 30]) # 0 HEARTBEAT, 30 ATTITUDE, ... + self.return_msg_types = set() - # 目標端口 - self.target_sockets = [] - - # 關聯到全域變數 - global mavlink_systems - self.mavlink_systems = mavlink_systems + # 轉發到別的 mavlink object 作為目標端口 的列表 + self.target_sockets = set() # 物件變數 self.task = None # Task reference @@ -277,12 +387,20 @@ class mavlink_object: self.dirtyDataMax = 10 # 髒資料容許閾值 self.state = MavlinkObjectState.INIT - logger.info(f'mavlink_object instance {self.socket_id} created') + # logger.info(f'mavlink_object instance {self.socket_id} created') # 先註解掉避免太多 log 但是為了 debug 保留 + + # 附加參數 (並非 mavlink_object 運行本體必要 但是要給上層結構運用的) + # 若這個 socket 是 另一個"主要 socket"的備用連接 則設定為"主要 socket id" + self.primary_socket_id = None # None 表示不是備用連接 + # socket type + self.socket_type = 'UNDEFINED' # 可選 'UDP_INBOUND', 'SERIAL_XBEE'...etc def __del__(self): - # 停止 asyncio task - self.stop() - + # # 先移除其他 socket 指向這個 socket 的目標 # 還是不要在這邊做了 畢竟本來就有判斷 object 不活躍就不轉拋 + # for mavlink_obj in self.mavlinkObjects.values(): + # if self.socket_id in mavlink_obj.target_sockets: + # mavlink_obj.remove_target_socket(self.socket_id) + # 關閉 socket if hasattr(self, 'mavlink_socket') and self.mavlink_socket: try: @@ -304,110 +422,68 @@ class mavlink_object: # except Exception as e: # print(f"Error logging in __del__: {e}") - def start(self): - """啟動 mavlink_object 處理循環""" - if self.state == MavlinkObjectState.RUNNING: - logger.warning(f"mavlink_object {self.socket_id} is already running") - return - - self.state = MavlinkObjectState.RUNNING - # 實際的啟動會由 async_io_manager 處理 - - def stop(self): - """停止 mavlink_object 處理循環""" - if self.state in (MavlinkObjectState.CLOSED, MavlinkObjectState.ERROR): - return - - self.state = MavlinkObjectState.CLOSED - if self.task: - if not self.task.done(): - self.task.cancel() - async def process_data(self): """處理 mavlink 數據的主要 asyncio 協程""" - logger.info(f'Start of mavlink_object.process_data id: {self.socket_id}') + # logger.info(f'Start of mavlink_object id: {self.socket_id}') # 先註解掉避免太多 log 但是為了 debug 保留 while self.state == MavlinkObjectState.RUNNING: timestamp = time.time() - try: #TODO 這邊的錯誤處理要再想想看怎麼做比較好 - # 處理接收到的封包 - mavlinkMsg = self.mavlink_socket.recv_msg() - # except Exception as e: - # logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}") - # self.dirtyDataCount += 1 - - # if self.dirtyDataCount >= self.dirtyDataMax: - # logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state") - # self.state = MavlinkObjectState.ERROR - # # 不直接退出,嘗試容忍髒數據 - # await asyncio.sleep(0.01) # 短暫暫停 - # continue - # try: - if mavlinkMsg: - # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 - sysid = mavlinkMsg.get_srcSystem() - compid = mavlinkMsg.get_srcComponent() - - if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 - self.mavlink_systems[sysid].updateComponentPacketCount( - compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) #TODO 這邊怪怪的 好像要再檢查 - - # 分發訊息到 RingBuffer - msg_id = mavlinkMsg.get_msgId() - - # if self.send_to_bridge and (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): - if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): - stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg)) - - # if self.send_to_return and (msg_id in self.return_msg_types or -1 in self.return_msg_types): - if (msg_id in self.return_msg_types or -1 in self.return_msg_types): - return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg)) - - # 將接收到的訊息轉發給所有目標端口 - for target_port in self.target_sockets: - if target_port != self.socket_id and target_port in self.mavlinkObjects: - target_obj = self.mavlinkObjects[target_port] - if target_obj.state == MavlinkObjectState.RUNNING: - try: # TODO 藉由 if 的檢測 確定目標端口是開啟狀態後 再進行寫出 之後刪掉 try except - target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf()) - except Exception as e: - logger.error(f"Error forwarding message to port {target_port}: {e}") - - with self.outgoing_msg_lock: - if self.outgoing_msgs and (send_msg := self.outgoing_msgs.pop(0)): - try: - self.mavlink_socket.write(send_msg) - except Exception as e: - logger.error(f"mavlink_object {self.socket_id} failed to send message: {e}") + # 處理接收到的封包 + mavlinkMsg = self.mavlink_socket.recv_msg() + + if mavlinkMsg: + # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 + sysid = mavlinkMsg.get_srcSystem() + compid = mavlinkMsg.get_srcComponent() - # 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權 - await asyncio.sleep(0.001) + # 更新封包統計資訊 + vehicle = vehicle_registry.get(sysid) + if vehicle: # 只有當這個 system id 已經被註冊過才會記錄統計 + component = vehicle.get_component(compid) + if component: + component.update_packet_stats( + mavlinkMsg.get_seq(), + mavlinkMsg.get_msgId(), + timestamp + ) - except asyncio.CancelledError: - logger.info(f'mavlink_object.process_data for id {self.socket_id} was cancelled') - break - except Exception as e: - logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}") - self.dirtyDataCount += 1 + # 分發訊息到 RingBuffer + msg_id = mavlinkMsg.get_msgId() - if self.dirtyDataCount >= self.dirtyDataMax: - logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state") - self.state = MavlinkObjectState.ERROR - # 不直接退出,嘗試容忍髒數據 - await asyncio.sleep(0.01) # 短暫暫停避免CPU過載 + if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): + stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg)) + + if (msg_id in self.return_msg_types or -1 in self.return_msg_types): + return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg)) - - logger.info(f'End of mavlink_object.process_data id: {self.socket_id}') + # 將全部接收到的訊息轉發給目標列表中的 mavlink_object + for target_socket in self.target_sockets: + if target_socket in self.mavlinkObjects: + target_obj = self.mavlinkObjects[target_socket] + if target_obj.state == MavlinkObjectState.RUNNING: + target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf()) + + if self.outgoing_msgs: + send_msg = self.outgoing_msgs.popleft() + self.mavlink_socket.write(send_msg) + + + # 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權 + await asyncio.sleep(0.001) + + logger.info(f'End of mavlink_object id: {self.socket_id}') self.state = MavlinkObjectState.CLOSED def add_target_socket(self, target_socket_id): """添加一個目標端口用於轉發""" - if target_socket_id not in self.target_sockets and target_socket_id != self.socket_id: - self.target_sockets.append(target_socket_id) - logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}") - return True - return False + if (target_socket_id != self.socket_id) and (target_socket_id != self.primary_socket_id): + if target_socket_id not in self.target_sockets: + self.target_sockets.add(target_socket_id) + logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}") + return True + return False # 已存在 + return False # 不能添加自己 也不能添加主要 socket def remove_target_socket(self, target_socket_id): """移除目標端口""" @@ -415,12 +491,12 @@ class mavlink_object: self.target_sockets.remove(target_socket_id) logger.info(f"mavlink_object Removed target port {target_socket_id} from mavlink_object {self.socket_id}") return True - return False + return False # 不存在 def set_bridge_message_types(self, msg_types): """設置需要分流到橋接器的訊息類型""" if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types): - self.bridge_msg_types = msg_types + self.bridge_msg_types = set(msg_types) return True logger.error(f"Invalid bridge message types: {msg_types}") return False @@ -428,7 +504,7 @@ class mavlink_object: def set_return_message_types(self, msg_types): """設置需要分流到回傳處理器的訊息類型""" if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types): - self.return_msg_types = msg_types + self.return_msg_types = set(msg_types) return True logger.error(f"Invalid return message types: {msg_types}") return False @@ -444,31 +520,45 @@ class mavlink_object: Returns: bool: 是否成功添加消息到列表 """ + # 狀態檢查 if self.state != MavlinkObjectState.RUNNING: logger.warning(f"Cannot send message: mavlink_object {self.socket_id} is not running") return False - try: - # 使用鎖保護共享資源訪問 - with self.outgoing_msg_lock: - self.outgoing_msgs.append(message_bytes) - return True - except Exception as e: - logger.error(f"Error queueing message for mavlink_object {self.socket_id}: {e}") - return False + # 基本數據類型檢查(輕量級) + if not isinstance(message_bytes, (bytes, bytearray)): + logger.error(f"Invalid message type: expected bytes/bytearray, got {type(message_bytes)}") + return False + + # 基本長度檢查(MAVLink v1.0 最小8字節,v2.0最小12字節) + if len(message_bytes) < 8: + logger.error(f"Message too short: {len(message_bytes)} bytes (minimum 8)") + return False + + # MAVLink 起始標記檢查(輕量級) + if len(message_bytes) > 0 and message_bytes[0] not in (0xFE, 0xFD): # v1.0: 0xFE, v2.0: 0xFD + logger.error(f"Invalid MAVLink start marker: 0x{message_bytes[0]:02X}") + return False + + # 緩衝區大小保護(防止記憶體耗盡) + if len(self.outgoing_msgs) > 1000: # 可調整的閾值 + logger.warning(f"Outgoing message buffer full for mavlink_object {self.socket_id}") + return False - # def enable_bridge(self, enable=True): - # """啟用或禁用橋接器分流""" - # self.send_to_bridge = enable - - # def enable_return(self, enable=True): - # """啟用或禁用回傳處理器分流""" - # self.send_to_return = enable + self.outgoing_msgs.append(message_bytes) + return True class async_io_manager: """ 管理所有 mavlink_object 實例的 asyncio 任務 提供單一線程來處理所有 mavlink 通道的數據 + + 首先 async_io_manager 是 singleton 的 所以只能有一個實例 + + 這個 async_io_manager 是藉由 start 方法來啟動的 + + start 方法 會先做一個新的執行緒 然後讓新的執行緒 透過 _run_event_loop 方法來建立一個空的事件循環 self.loop + 然後在 _run_event_loop 方法中 會建立一個異步任務 _main_task 來監控和管理所有的 mavlink_object 任務 """ _instance = None _lock = threading.Lock() @@ -483,157 +573,214 @@ class async_io_manager: if not hasattr(self, 'initialized'): self.initialized = True self.loop = None - self.main_task = None self.running = False - self.managed_objects = {} # socket_id: task - + # self.main_task = None + self.managed_objects = {} # socket_id: mavlink_object + self.thread = None + self._stop_event = threading.Event() + + def __del__(self): + self.loop = None + self.thread = None + def start(self): - """啟動 async_io_manager 和其管理的所有 mavlink_object""" + """ + 啟動 async_io_manager + + """ if self.running: logger.warning("async_io_manager already running") return self.running = True - self.thread = threading.Thread(target=self._run_event_loop) + self._stop_event.clear() + + # 啟動獨立線程 命名為 AsyncIOManager + self.thread = threading.Thread( + target=self._run_event_loop, + name="AsyncIOManager" + ) + self.thread.daemon = False # 不設為 daemon,確保正確關閉 self.thread.start() - logger.info("async_io_manager started") + + # 等待 _run_event_loop 建立事件循環的物件 self.loop + start_timeout = 2.0 + start_time = time.time() + while not self.loop and time.time() - start_time < start_timeout: + time.sleep(0.1) + + # 檢查另一個執行緒有沒有成功建立事件循環物件 self.loop + if self.loop: + logger.info("async_io_manager thread started <-") + return True + else: + logger.error("async_io_manager failed to start") + return False - def stop(self): + def shutdown(self): """停止 async_io_manager 和其管理的所有 mavlink_object""" + + # 自己在 running 狀態下才執行停止程序 if not self.running: return - + + # 停止所有被管理的 mavlink_object 所屬的 task + for socket_id in list(self.managed_objects.keys()): + self.remove_mavlink_object(socket_id) + + # 停止自己的 task self.running = False + self._stop_event.set() + + # 解開事件循環的阻塞 + self.loop.call_soon_threadsafe(self.loop.stop) + + # print("mark A", len(asyncio.all_tasks(self.loop))) # debug - if self.loop: - # 取消所有任務 - for socket_id in list(self.managed_objects.keys()): - self.remove_mavlink_object(socket_id) - - # 停止事件循環 - if self.main_task and not self.main_task.done(): - asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop) - - # 等待線程結束 - if hasattr(self, 'thread') and self.thread.is_alive(): - self.thread.join(timeout=5.0) - - logger.info("async_io_manager stopped") - - async def _shutdown(self): - """正確關閉事件循環的協程""" - tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task()] + # 等待線程結束 + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=10.0) + if self.thread.is_alive(): + logger.warning("async_io_manager thread did not stop gracefully") + os.kill(os.getpid(), signal.SIGTERM) # 強制終止程序 - for task in tasks: - task.cancel() + - await asyncio.gather(*tasks, return_exceptions=True) - self.loop.stop() + logger.info("async_io_manager thread END!") def _run_event_loop(self): """在獨立線程中運行事件循環""" - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) - try: - self.main_task = self.loop.create_task(self._main_task()) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + logger.info("async_io_manager event loop started <-") + + # 創建主任務 + # main_task = self.loop.create_task(self._main_task()) + + # 運行事件循環 self.loop.run_forever() + except Exception as e: logger.error(f"Error in async_io_manager event loop: {e}") finally: - self.loop.close() + # 清理 + if self.loop: + self.loop.close() self.loop = None self.running = False - logger.info("async_io_manager event loop ended") + logger.info("async_io_manager event loop END!") - async def _main_task(self): - """主任務協程,用於監視和管理其他任務""" + async def _main_task(self): # 當初想說可能要一個額外的 task 來管理 但是目前好像用不掉 先放著不管 + """主任務協程 讓 async_io_manager 在執行緒中持續運作""" logger.info("async_io_manager main task started") - try: - while self.running: - # 這邊就是一個無窮迴圈 確保 async_io_manager 物件續存 + while self.running and not self._stop_event.is_set(): + await asyncio.sleep(0.1) - # 每秒鐘檢查並移除已完成或已取消的任務 - await asyncio.sleep(1.0) - for socket_id in list(self.managed_objects.keys()): - task = self.managed_objects[socket_id] - - if task.done(): - try: # TODO 這邊的錯誤處理要再想想看怎麼做比較好 - exc = task.exception() - if exc: - logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}") - except (asyncio.CancelledError, asyncio.InvalidStateError): - pass - - if socket_id in mavlink_object.mavlinkObjects: - print(f"this is manager, i make socket_id {socket_id} closed, he is now in {mavlink_object.mavlinkObjects[socket_id].state} state.") # debug - mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED - - del self.managed_objects[socket_id] - - - - except asyncio.CancelledError: - logger.info("async_io_manager main task was cancelled") - except Exception as e: - logger.error(f"Error in async_io_manager main task: {e}") - logger.info("async_io_manager main task ended") - + def add_mavlink_object(self, mavlink_obj): - """添加一個 mavlink_object 實例到 async_io_manager 並啟動其處理任務""" - if not self.running: + """添加 mavlink_object""" + # 一個防呆 確保有 event loop 與 _main_task 正在運作 + if not self.running or not self.loop: logger.error("Cannot add mavlink_object: async_io_manager is not running") return False - - if not isinstance(mavlink_obj, mavlink_object): - logger.error(f"Invalid mavlink_object: {mavlink_obj}") - return False - + socket_id = mavlink_obj.socket_id if socket_id in self.managed_objects: logger.warning(f"mavlink_object {socket_id} already managed") return False - - # 創建並啟動任務 - if self.loop: - # task = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop).result() - future = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop) - # print(f"Task created for mavlink_object {socket_id}: {future1}") # debug - # task = future1.result() - self.managed_objects[socket_id] = future - mavlink_obj.task = future + # 使用 run_coroutine_threadsafe 執行協程並獲取結果 + future = asyncio.run_coroutine_threadsafe( + self._async_add_mavlink_object(mavlink_obj), + self.loop + ) + + try: + # 等待結果,設定合理的超時時間 + result = future.result(timeout=3.0) + return result + except asyncio.TimeoutError: + logger.error(f"Timeout adding mavlink_object {socket_id}") + return False + except Exception as e: + logger.error(f"Failed to add mavlink_object {socket_id}: {e}") + return False + + async def _async_add_mavlink_object(self, mavlink_obj): + """在事件循環線程中同步執行""" + socket_id = mavlink_obj.socket_id + + try: + task = asyncio.create_task(mavlink_obj.process_data()) + self.managed_objects[socket_id] = mavlink_obj + mavlink_obj.task = task mavlink_obj.state = MavlinkObjectState.RUNNING - logger.info(f"Added mavlink_object {socket_id} to async_io_manager") + mavlink_obj.outgoing_msgs.clear() + logger.info(f"Added mavlink_object {socket_id} into manager.") return True - - return False + except Exception as e: + logger.error(f"Failed to create task for mavlink_object {socket_id}: {e}") + return False def remove_mavlink_object(self, socket_id): - """從 async_io_manager 中移除一個 mavlink_object 實例並取消其處理任務""" - if socket_id not in self.managed_objects: - logger.warning(f"mavlink_object {socket_id} not managed by async_io_manager") + """移除 mavlink_object""" + + # 一個防呆 確保有 event loop 正在運作 + if not self.loop: return False - # 取消任務 - task = self.managed_objects[socket_id] - if not task.done(): - task.cancel() + # 同樣使用 run_coroutine_threadsafe + future = asyncio.run_coroutine_threadsafe( + self._async_remove_mavlink_object(socket_id), + self.loop + ) - # 從管理列表中移除 - del self.managed_objects[socket_id] + try: + result = future.result(timeout=3.0) + return result + except asyncio.TimeoutError: + logger.error(f"Timeout removing mavlink_object {socket_id}") + return False + except Exception as e: + logger.error(f"Failed to remove mavlink_object {socket_id}: {e}") + return False + + async def _async_remove_mavlink_object(self, socket_id): + """在事件循環線程中同步執行""" + if socket_id not in self.managed_objects: + logger.warning(f"mavlink_object {socket_id} not managed") + return - # 更新 mavlink_object 狀態 - if socket_id in mavlink_object.mavlinkObjects: - mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED + mavlink_obj = self.managed_objects[socket_id] + mavlink_obj.state = MavlinkObjectState.SHUTTINGDOWN + + if not mavlink_obj.task.done(): + mavlink_obj.task.cancel() - logger.info(f"Removed mavlink_object {socket_id} from async_io_manager") - return True - + # 等待一秒或者 task完全結束 + timeout = 1.0 + start_time = asyncio.get_event_loop().time() + while not mavlink_obj.task.done(): + if asyncio.get_event_loop().time() - start_time > timeout: + break + await asyncio.sleep(0.1) + + # 如果正常結束 則移除 + if mavlink_obj.task.done(): + del self.managed_objects[socket_id] + mavlink_obj.state = MavlinkObjectState.CLOSED + logger.info(f"Removed mavlink_object {socket_id} from manager.") + return True + else: + mavlink_obj.state = MavlinkObjectState.ERROR + logger.warning(f"mavlink_object {socket_id} task did not terminate in time") + return False + def get_managed_objects(self): """獲取所有被管理的對象列表""" return list(self.managed_objects.keys()) diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkVehicleView.py b/src/fc_network_adapter/fc_network_adapter/mavlinkVehicleView.py new file mode 100644 index 0000000..86c0478 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkVehicleView.py @@ -0,0 +1,435 @@ +""" +VehicleView - Pure State Container +純粹的狀態容器,不主動通訊、不背景下載參數、不搶 RF/MAVLink 流量 +只提供狀態存取介面,由外部手動餵資料(push state) +""" + +import os +from typing import Dict, Optional, Any +from dataclasses import dataclass, field +from enum import Enum + +from .utils import setup_logger + +logger = setup_logger(os.path.basename(__file__)) + + +# ====================== Enums ===================== + +class ConnectionType(Enum): + """連接類型""" + SERIAL = "serial" + UDP = "udp" + TCP = "tcp" + OTHER = "other" + + +class ComponentType(Enum): + """組件類型""" + AUTOPILOT = "autopilot" + GCS = "gcs" + CAMERA = "camera" + GIMBAL = "gimbal" + OTHER = "other" + + +class RFModuleType(Enum): + """RF模組類型""" + XBEE = "xbee" + UDP = "udp" + TCP = "tcp" + OTHER = "other" + + +# ====================== Data Classes ===================== + +@dataclass +class Position: + """位置資訊""" + latitude: Optional[float] = None # 緯度 (度) + longitude: Optional[float] = None # 經度 (度) + altitude: Optional[float] = None # 海拔高度 (公尺) + relative_altitude: Optional[float] = None # 相對高度 (公尺) + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class Attitude: + """姿態資訊""" + roll: Optional[float] = None # 橫滾角 (弧度) + pitch: Optional[float] = None # 俯仰角 (弧度) + yaw: Optional[float] = None # 偏航角 (弧度) + rollspeed: Optional[float] = None # 橫滾速度 (弧度/秒) + pitchspeed: Optional[float] = None # 俯仰速度 (弧度/秒) + yawspeed: Optional[float] = None # 偏航速度 (弧度/秒) + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class FlightMode: + """飛行模式資訊""" + base_mode: Optional[int] = None # MAVLink base mode + custom_mode: Optional[int] = None # 自定義模式 + mode_name: Optional[str] = None # 模式名稱 (例如: "GUIDED", "AUTO") + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class Battery: + """電池資訊""" + voltage: Optional[float] = None # 電壓 (V) + current: Optional[float] = None # 電流 (A) + remaining: Optional[int] = None # 剩餘電量 (%) + temperature: Optional[float] = None # 溫度 (攝氏) + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class EKF: + """EKF狀態資訊""" + flags: Optional[int] = None # EKF 旗標 + velocity_variance: Optional[float] = None # 速度變異 + pos_horiz_variance: Optional[float] = None # 水平位置變異 + pos_vert_variance: Optional[float] = None # 垂直位置變異 + compass_variance: Optional[float] = None # 羅盤變異 + terrain_alt_variance: Optional[float] = None # 地形高度變異 + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class GPS: + """GPS資訊""" + fix_type: Optional[int] = None # GPS fix 類型 (0=無, 1=無fix, 2=2D, 3=3D, 4=DGPS, 5=RTK) + satellites_visible: Optional[int] = None # 可見衛星數 + eph: Optional[int] = None # GPS HDOP 水平精度因子 + epv: Optional[int] = None # GPS VDOP 垂直精度因子 + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class VFR: + """VFR HUD資訊""" + airspeed: Optional[float] = None # 空速 (m/s) + groundspeed: Optional[float] = None # 地速 (m/s) + heading: Optional[int] = None # 航向 (度) + throttle: Optional[int] = None # 油門 (%) + climb: Optional[float] = None # 爬升率 (m/s) + timestamp: Optional[float] = None # 時間戳記 + + +@dataclass +class ComponentStatus: + """組件狀態容器""" + position: Position = field(default_factory=Position) + attitude: Attitude = field(default_factory=Attitude) + mode: FlightMode = field(default_factory=FlightMode) + battery: Battery = field(default_factory=Battery) + ekf: EKF = field(default_factory=EKF) + gps: GPS = field(default_factory=GPS) + vfr: VFR = field(default_factory=VFR) + + # 系統狀態 + system_status: Optional[int] = None # MAV_STATE + armed: Optional[bool] = None # 解鎖狀態 + + # 其他自定義狀態 + custom_status: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class PacketStats: + """封包統計資訊""" + received_count: int = 0 # 接收封包數 + lost_count: int = 0 # 遺失封包數 + last_seq: Optional[int] = None # 最後序號 + last_msg_time: Optional[float] = None # 最後訊息時間 + msg_type_count: Dict[int, int] = field(default_factory=dict) # 各類訊息計數 {msg_type: count} + + +@dataclass +class RFStatus: + """RF模組狀態""" + rssi: Optional[int] = None # 信號強度 + noise: Optional[int] = None # 噪音水平 + at_response: Optional[str] = None # AT 命令回應 + link_quality: Optional[int] = None # 連接品質 + timestamp: Optional[float] = None # 時間戳記 + custom_status: Dict[str, Any] = field(default_factory=dict) # 其他自定義狀態 + + +@dataclass +class SocketInfo: + """Socket連接資訊""" + ip: Optional[str] = None # IP位址 + port: Optional[int] = None # 埠號 + local_ip: Optional[str] = None # 本地IP + local_port: Optional[int] = None # 本地埠號 + connected: bool = False # 連接狀態 + + +# ====================== Component Class ===================== + +class VehicleComponent: + """載具組件 - 代表一個 MAVLink component""" + + def __init__(self, component_id: int, comp_type: ComponentType = ComponentType.OTHER): + self.component_id = component_id + self.type = comp_type + + # MAVLink 組件資訊 + self.mav_type: Optional[int] = None # MAV_TYPE + self.mav_autopilot: Optional[int] = None # MAV_AUTOPILOT + + # 狀態容器 + self.status = ComponentStatus() + + # 參數容器 (只存放,不主動下載) + self.parameters: Dict[str, Any] = {} + + # 封包統計 + self.packet_stats = PacketStats() + + def update_packet_stats(self, seq: int, msg_type: int, timestamp: float) -> None: + """ + 更新封包統計 + + Args: + seq: 訊息序號 + msg_type: 訊息類型 + timestamp: 時間戳記 + """ + stats = self.packet_stats + + # 檢查遺失封包 + if stats.last_seq is not None: + expected_seq = (stats.last_seq + 1) % 256 + diff = seq - expected_seq + if diff < 0: + diff += 256 + stats.lost_count += diff + + # 更新統計資訊 + stats.received_count += 1 + stats.last_seq = seq + stats.last_msg_time = timestamp + + # 更新訊息類型計數 + if msg_type in stats.msg_type_count: + stats.msg_type_count[msg_type] += 1 + else: + stats.msg_type_count[msg_type] = 1 + + def reset_packet_stats(self) -> None: + """重置封包統計""" + self.packet_stats = PacketStats() + + def set_parameter(self, param_name: str, param_value: Any) -> None: + """設定參數 (手動餵入)""" + self.parameters[param_name] = param_value + + def get_parameter(self, param_name: str, default: Any = None) -> Any: + """取得參數""" + return self.parameters.get(param_name, default) + + def __str__(self) -> str: + return (f"Component(id={self.component_id}, type={self.type.value}, " + f"mav_type={self.mav_type}, received={self.packet_stats.received_count}, " + f"lost={self.packet_stats.lost_count})") + + +# ====================== RF Module Class ===================== + +class RFModule: + """RF模組資訊容器""" + + def __init__(self, rf_type: RFModuleType = RFModuleType.OTHER): + self.type = rf_type + self.status = RFStatus() + self.socket_info = SocketInfo() + + def update_rssi(self, rssi: int, timestamp: Optional[float] = None) -> None: + """更新RSSI""" + self.status.rssi = rssi + if timestamp: + self.status.timestamp = timestamp + + def update_at_response(self, response: str, timestamp: Optional[float] = None) -> None: + """更新AT命令回應""" + self.status.at_response = response + if timestamp: + self.status.timestamp = timestamp + + def update_socket_info(self, ip: str = None, port: int = None, + local_ip: str = None, local_port: int = None, + connected: bool = None) -> None: + """更新Socket資訊""" + if ip is not None: + self.socket_info.ip = ip + if port is not None: + self.socket_info.port = port + if local_ip is not None: + self.socket_info.local_ip = local_ip + if local_port is not None: + self.socket_info.local_port = local_port + if connected is not None: + self.socket_info.connected = connected + + def __str__(self) -> str: + return (f"RFModule(type={self.type.value}, rssi={self.status.rssi}, " + f"connected={self.socket_info.connected})") + + +# ====================== Main VehicleView Class ===================== + +class VehicleView: + """ + 載具視圖 - 純狀態容器 + + 特點: + 1. 只有狀態容器,沒有任何主動通訊功能 + 2. 不主動通訊、不背景下載參數、不搶 RF/MAVLink 流量 + 3. 可以手動餵資料 (push state) + 4. 可擴充 (支援 RF 模組狀態) + """ + + def __init__(self, sysid: int): + # Meta 資訊 + self.sysid = sysid + self.kind: Optional[str] = None # 載具種類描述 (例如: "Copter", "Plane") + self.vehicle_type: Optional[int] = None # MAV_TYPE + self.connected_via: ConnectionType = ConnectionType.OTHER + + # 組件容器 {component_id: VehicleComponent} + self.components: Dict[int, VehicleComponent] = {} + + # RF模組 + self.rf_module: Optional[RFModule] = None + + # 其他自定義meta資訊 + self.custom_meta: Dict[str, Any] = {} + + def add_component(self, component_id: int, comp_type: ComponentType = ComponentType.OTHER) -> VehicleComponent: + """ + 新增組件 + + Args: + component_id: 組件ID + comp_type: 組件類型 + + Returns: + VehicleComponent: 新增的組件 + """ + if component_id not in self.components: + self.components[component_id] = VehicleComponent(component_id, comp_type) + logger.info(f"Added component {component_id} to system {self.sysid}") + return self.components[component_id] + + def get_component(self, component_id: int) -> Optional[VehicleComponent]: + """取得組件""" + return self.components.get(component_id) + + def remove_component(self, component_id: int) -> bool: + """移除組件""" + if component_id in self.components: + del self.components[component_id] + logger.info(f"Removed component {component_id} from system {self.sysid}") + return True + return False + + def set_rf_module(self, rf_type: RFModuleType) -> RFModule: + """設定RF模組""" + self.rf_module = RFModule(rf_type) + return self.rf_module + + def get_rf_module(self) -> Optional[RFModule]: + """取得RF模組""" + return self.rf_module + + def __str__(self) -> str: + comp_list = ", ".join([str(cid) for cid in self.components.keys()]) + return (f"VehicleView(sysid={self.sysid}, kind={self.kind}, " + f"connected_via={self.connected_via.value}, " + f"components=[{comp_list}], " + f"rf_module={self.rf_module is not None})") + + def to_dict(self) -> Dict[str, Any]: + """轉換為字典格式 (方便序列化/除錯)""" + return { + 'meta': { + 'sysid': self.sysid, + 'kind': self.kind, + 'vehicle_type': self.vehicle_type, + 'connected_via': self.connected_via.value, + 'custom_meta': self.custom_meta + }, + 'components': { + cid: { + 'component_id': comp.component_id, + 'type': comp.type.value, + 'mav_type': comp.mav_type, + 'mav_autopilot': comp.mav_autopilot, + 'packet_stats': { + 'received': comp.packet_stats.received_count, + 'lost': comp.packet_stats.lost_count, + 'last_seq': comp.packet_stats.last_seq, + 'last_msg_time': comp.packet_stats.last_msg_time + }, + 'parameters_count': len(comp.parameters) + } + for cid, comp in self.components.items() + }, + 'rf_module': { + 'type': self.rf_module.type.value, + 'rssi': self.rf_module.status.rssi, + 'socket_connected': self.rf_module.socket_info.connected + } if self.rf_module else None + } + + +# ====================== Registry ===================== + +class VehicleViewRegistry: + """載具視圖註冊表 - 管理所有的 VehicleView""" + + def __init__(self): + self._vehicles: Dict[int, VehicleView] = {} + + def register(self, sysid: int) -> VehicleView: + """註冊新的載具視圖""" + if sysid not in self._vehicles: + self._vehicles[sysid] = VehicleView(sysid) + logger.info(f"Registered new VehicleView for system {sysid}") + return self._vehicles[sysid] + + def get(self, sysid: int) -> Optional[VehicleView]: + """取得載具視圖""" + return self._vehicles.get(sysid) + + def unregister(self, sysid: int) -> bool: + """註銷載具視圖""" + if sysid in self._vehicles: + del self._vehicles[sysid] + logger.info(f"Unregistered VehicleView for system {sysid}") + return True + return False + + def get_all(self) -> Dict[int, VehicleView]: + """取得所有載具視圖""" + return self._vehicles.copy() + + def clear(self) -> None: + """清空所有載具視圖""" + self._vehicles.clear() + logger.info("Cleared all VehicleViews") + + def __len__(self) -> int: + return len(self._vehicles) + + def __contains__(self, sysid: int) -> bool: + return sysid in self._vehicles + + +# ====================== Global Instance ===================== + +# 全域註冊表實例 +vehicle_registry = VehicleViewRegistry() diff --git a/src/fc_network_adapter/tests/demo_integration.py b/src/fc_network_adapter/tests/demo_integration.py index b621e73..7546f65 100644 --- a/src/fc_network_adapter/tests/demo_integration.py +++ b/src/fc_network_adapter/tests/demo_integration.py @@ -14,12 +14,13 @@ from pymavlink import mavutil # 自定義的 import from ..fc_network_adapter import mavlinkObject as mo -from ..fc_network_adapter import mavlinkDevice as md +from ..fc_network_adapter import mavlinkVehicleView as mvv +# from ..fc_network_adapter import mavlinkDevice as md # ====================== 分割線 ===================== test_item = 10 -running_time = 10000 +running_time = 3 print('test_item : ', test_item) @@ -44,10 +45,13 @@ if test_item == 10: time.sleep(0.5) # 等待事件循環啟動 # 初始化輸入通道 - connection_string="udp:127.0.0.1:14560" + connection_string="udp:127.0..1:14571" mavlink_socket1 = mavutil.mavlink_connection(connection_string) mavlink_object1 = mo.mavlink_object(mavlink_socket1) + sock = mavlink_socket1.port + print("Socket port:", sock) + manager.add_mavlink_object(mavlink_object1) start_time = time.time() @@ -65,7 +69,7 @@ if test_item == 10: print("return_packet_ring is empty") time.sleep(1) - manager.stop() + manager.shutdown() print('<=== End of Program') @@ -83,20 +87,22 @@ elif test_item == 11: time.sleep(0.5) # 等待事件循環啟動 # 初始化輸入通道 - connection_string="udp:127.0.0.1:14560" + connection_string="udp:127.0.0.1:14571" mavlink_socket1 = mavutil.mavlink_connection(connection_string) mavlink_object1 = mo.mavlink_object(mavlink_socket1) manager.add_mavlink_object(mavlink_object1) # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() + bridge = mo.mavlink_bridge() + bridge.start() time.sleep(3) # 印出目前所有 mavlink_systems 的內容 print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) + all_vehicles = mvv.vehicle_registry.get_all() + for sysid, vehicle in all_vehicles.items(): + print(f" System {sysid}: {vehicle}") start_time = time.time() show_time = time.time() @@ -105,14 +111,15 @@ elif test_item == 11: # print("mark B") show_time = time.time() - for sysid in analyzer.mavlink_systems: - for compid in analyzer.mavlink_systems[sysid].components: - print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + for sysid, vehicle in all_vehicles.items(): + for compid in vehicle.components: + comp = vehicle.get_component(compid) + print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, comp.packet_stats.received_count)) + comp.reset_packet_stats() print("===================") - manager.stop() - analyzer.stop() + manager.shutdown() + bridge.stop() print('<=== End of Program') @@ -130,16 +137,16 @@ elif test_item == 12: time.sleep(0.5) # 等待事件循環啟動 # 初始化輸入通道 - connection_string="udp:127.0.0.1:14560" + connection_string="udp:127.0.0.1:14571" mavlink_socket_in1 = mavutil.mavlink_connection(connection_string) mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1) - connection_string="udp:127.0.0.1:14561" + connection_string="udp:127.0.0.1:14571" mavlink_socket_in2 = mavutil.mavlink_connection(connection_string) mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2) # 初始化輸出通道 - connection_string="udpout:127.0.0.1:14550" + connection_string="udpout:127.0.0.1:14551" mavlink_socket_out = mavutil.mavlink_connection(connection_string) mavlink_object_out = mo.mavlink_object(mavlink_socket_out) @@ -160,66 +167,11 @@ elif test_item == 12: time.sleep(1) - manager.stop() + manager.shutdown() print('<=== End of Program') -# elif test_item == 20: -# # 這邊測試 node 生成 topic 的功能 -# # 利用 空的通道 發出假的 heartbeat 封包 -# print('===> Start of Program .Test ', test_item) -# rclpy.init() # 注意要初始化 rclpy 才能使用 node - -# from pymavlink.dialects.v20 import common as mavlink2 - -# sysid = 1 -# compid = 1 - -# # 啟動 mavlink_bridge -# analyzer = mo.mavlink_bridge() - -# mav = mavlink2.MAVLink(None) -# mav.srcSystem = sysid -# mav.srcComponent = compid - -# # 這是一個 heartbeat 封包 -# fake_heartbeat = mavlink2.MAVLink_heartbeat_message( -# type=mavlink2.MAV_TYPE_QUADROTOR, -# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, -# base_mode=3, -# custom_mode=0, -# system_status=0, -# mavlink_version=3 -# ) -# mavlink_bytes = fake_heartbeat.pack(mav) -# fake_heartbeat_msg = mav.parse_char(mavlink_bytes) - -# print(analyzer.mavlink_systems) - -# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data -# time.sleep(0.1) -# print(analyzer.mavlink_systems) - -# # ROS2 初始化 -# analyzer._init_node() - -# # 創建 ROS2 topic -# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid -# print("topic created") -# time.sleep(5) - -# # 丟出 topic -# analyzer.emit_info() - -# # 結束程式 -# analyzer.running = False -# analyzer.destroy_node() - -# rclpy.shutdown() -# analyzer.stop() -# analyzer.thread.join() -# print('<=== End of Program') elif test_item == 21: # 需要開啟一個 ardupilot 的模擬器 @@ -291,160 +243,18 @@ elif test_item == 21: mavlink_socket.close() print('<=== End of Program') -# elif test_item == 22: -# # 需要開啟一個 ardupilot 的模擬器 與 GCS -# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 -# print('===> Start of Program .Test ', test_item) -# rclpy.init() # 注意要初始化 rclpy 才能使用 node - -# # 啟動 mavlink_bridge -# analyzer = mo.mavlink_bridge() -# # 關於 Node 的初始化 -# show_time = time.time() -# analyzer._init_node() # 初始化 node -# print('初始化 node 完成 耗時 : ',time.time() - show_time) - -# # 初始化兩個通道 -# connection_string_in="udp:127.0.0.1:15551" -# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) -# mavlink_object_in = mo.mavlink_object(mavlink_socket_in) -# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] - - -# connection_string_out="udpout:127.0.0.1:14553" -# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) -# mavlink_object_out = mo.mavlink_object(mavlink_socket_out) -# mavlink_object_out.multiplexingToAnalysis = [] +elif test_item == 52: + print('===> Start of Program .Test ', test_item) + manager = mo.async_io_manager() + manager.start() -# # 讓兩個通道互相傳輸 -# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all -# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all + # print(manager.thread.is_alive()) -# # 啟動通道 -# mavlink_object_in.run() -# mavlink_object_out.run() + manager.shutdown() -# print('waiting for mavlink data ...') -# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - -# print('目前所有的系統 : ') -# for sysid in analyzer.mavlink_systems: -# print(analyzer.mavlink_systems[sysid]) - -# compid = 1 -# sysid = 1 -# show_time = time.time() -# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds") - -# print("start emit info") - -# start_time = time.time() -# show_time = time.time() -# show_time2 = time.time() - -# while time.time() - start_time < running_time: -# if (time.time() - show_time2) >= 1: -# analyzer.emit_info() # 這邊是測試 node 的運行 -# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode'] -# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode'] -# sss = mavutil.mode_string_v10(fmsg) -# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq())) -# print("目前的飛行模式 : {}".format(sss)) -# show_time2 = time.time() - -# # if (time.time() - show_time) >= 2: -# # show_time = time.time() -# # for sysid in analyzer.mavlink_systems: -# # for compid in analyzer.mavlink_systems[sysid].components: -# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) -# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) -# # print("===================") - - + time.sleep(1) -# analyzer.destroy_node() -# rclpy.shutdown() + print('manager stopped') - -# # 結束程式 退出所有 thread -# mavlink_object_in.stop() -# mavlink_object_in.thread.join() -# mavlink_socket_in.close() -# mavlink_object_out.stop() -# mavlink_object_out.thread.join() -# mavlink_socket_out.close() -# analyzer.stop() -# analyzer.thread.join() - - -# print('<=== End of Program') - -# elif test_item == 51: - -# # 晉凱的測試項目 -# print('===> Start of Program .Test ', test_item) -# rclpy.init() # 注意要初始化 rclpy 才能使用 node - -# # 啟動 mavlink_bridge -# analyzer = mo.mavlink_bridge() -# # 關於 Node 的初始化 -# show_time = time.time() -# analyzer._init_node() # 初始化 node -# print('初始化 node 完成 耗時 : ',time.time() - show_time) - -# # 創建通道 -# connection_string="udp:127.0.0.1:15551" -# mavlink_socket3 = mavutil.mavlink_connection(connection_string) -# mavlink_object3 = mo.mavlink_object(mavlink_socket3) -# # 設定通道流動 -# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] -# mavlink_object3.multiplexingToReturn = [] # -# # mavlink_object3.multiplexingToSwap = [] # -# # 啟動通道 -# mavlink_object3.run() - - - -# print('waiting for mavlink data ...') -# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - -# compid = 1 -# sysid = 1 -# start_time = time.time() -# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) -# end_time = time.time() -# print(f"Execution time for create all topic: {end_time - start_time} seconds") - -# print("start emit info") - -# start_time = time.time() -# show_time = time.time() -# while time.time() - start_time < running_time: -# try: -# # rclpy.spin(analyzer) -# analyzer.emit_info() # 這邊是測試 node 的運行 -# time.sleep(0.5) -# except KeyboardInterrupt: -# break - -# analyzer.destroy_node() -# rclpy.shutdown() - -# # 結束程式 退出所有 thread -# mavlink_object3.stop() -# mavlink_object3.thread.join() -# analyzer.stop() -# analyzer.thread.join() - -# mavlink_socket3.close() -# print('<=== End of Program') \ No newline at end of file diff --git a/src/fc_network_adapter/tests/demo_mavlinkVehicleView.py b/src/fc_network_adapter/tests/demo_mavlinkVehicleView.py new file mode 100644 index 0000000..d6954d3 --- /dev/null +++ b/src/fc_network_adapter/tests/demo_mavlinkVehicleView.py @@ -0,0 +1,331 @@ +""" +VehicleView 使用範例 +展示如何使用純狀態容器來管理 MAVLink 載具資訊 +""" + +import time +from ..fc_network_adapter.mavlinkVehicleView import ( + VehicleView, + VehicleComponent, + RFModule, + vehicle_registry, + ConnectionType, + ComponentType, + RFModuleType +) + + +def example_basic_usage(): + """基本使用範例""" + print("=== 基本使用範例 ===\n") + + # 1. 建立載具視圖 + vehicle = VehicleView(sysid=1) + vehicle.kind = "Copter" + vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR + vehicle.connected_via = ConnectionType.UDP + + print(f"建立載具: {vehicle}\n") + + # 2. 新增 autopilot 組件 + autopilot = vehicle.add_component( + component_id=1, + comp_type=ComponentType.AUTOPILOT + ) + autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR + autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA + + print(f"新增組件: {autopilot}\n") + + # 3. 手動餵入位置資訊 + autopilot.status.position.latitude = 25.0330 + autopilot.status.position.longitude = 121.5654 + autopilot.status.position.altitude = 100.5 + autopilot.status.position.timestamp = time.time() + + print(f"位置: 緯度={autopilot.status.position.latitude}, " + f"經度={autopilot.status.position.longitude}, " + f"高度={autopilot.status.position.altitude}m\n") + + # 4. 手動餵入姿態資訊 + autopilot.status.attitude.roll = 0.05 # 弧度 + autopilot.status.attitude.pitch = -0.02 + autopilot.status.attitude.yaw = 1.57 + autopilot.status.attitude.timestamp = time.time() + + print(f"姿態: Roll={autopilot.status.attitude.roll:.3f}, " + f"Pitch={autopilot.status.attitude.pitch:.3f}, " + f"Yaw={autopilot.status.attitude.yaw:.3f} rad\n") + + # 5. 手動餵入飛行模式 + autopilot.status.mode.base_mode = 89 + autopilot.status.mode.custom_mode = 4 + autopilot.status.mode.mode_name = "GUIDED" + autopilot.status.mode.timestamp = time.time() + + print(f"飛行模式: {autopilot.status.mode.mode_name}\n") + + # 6. 手動餵入電池資訊 + autopilot.status.battery.voltage = 12.6 + autopilot.status.battery.current = 15.2 + autopilot.status.battery.remaining = 75 + autopilot.status.battery.timestamp = time.time() + + print(f"電池: 電壓={autopilot.status.battery.voltage}V, " + f"電流={autopilot.status.battery.current}A, " + f"剩餘={autopilot.status.battery.remaining}%\n") + + +def example_packet_tracking(): + """封包追蹤範例""" + print("\n=== 封包追蹤範例 ===\n") + + vehicle = VehicleView(sysid=2) + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + + # 模擬接收封包 + timestamp = time.time() + + # 接收 HEARTBEAT (msg_type=0) + autopilot.update_packet_stats(seq=0, msg_type=0, timestamp=timestamp) + + # 接收 ATTITUDE (msg_type=30) + autopilot.update_packet_stats(seq=1, msg_type=30, timestamp=timestamp+0.1) + + # 接收 GLOBAL_POSITION_INT (msg_type=33) + autopilot.update_packet_stats(seq=2, msg_type=33, timestamp=timestamp+0.2) + + # 模擬封包遺失 (seq 跳過 3, 4, 5) + autopilot.update_packet_stats(seq=6, msg_type=0, timestamp=timestamp+0.3) + + stats = autopilot.packet_stats + print(f"封包統計:") + print(f" 接收: {stats.received_count}") + print(f" 遺失: {stats.lost_count}") + print(f" 最後序號: {stats.last_seq}") + print(f" 訊息類型計數: {stats.msg_type_count}\n") + + +def example_parameters(): + """參數管理範例""" + print("\n=== 參數管理範例 ===\n") + + vehicle = VehicleView(sysid=3) + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + + # 手動設定參數 (不會主動下載) + autopilot.set_parameter("ARMING_CHECK", 1) + autopilot.set_parameter("ANGLE_MAX", 4500) + autopilot.set_parameter("WPNAV_SPEED", 500) + + print(f"參數數量: {len(autopilot.parameters)}") + print(f"ARMING_CHECK = {autopilot.get_parameter('ARMING_CHECK')}") + print(f"ANGLE_MAX = {autopilot.get_parameter('ANGLE_MAX')}") + print(f"WPNAV_SPEED = {autopilot.get_parameter('WPNAV_SPEED')}\n") + + +def example_rf_module(): + """RF模組範例""" + print("\n=== RF模組範例 ===\n") + + vehicle = VehicleView(sysid=4) + vehicle.connected_via = ConnectionType.SERIAL + + # 設定 XBee RF 模組 + rf = vehicle.set_rf_module(RFModuleType.XBEE) + + # 更新 Socket 資訊 + rf.update_socket_info( + ip="192.168.1.100", + port=14550, + local_ip="192.168.1.1", + local_port=14551, + connected=True + ) + + # 更新 RSSI + rf.update_rssi(rssi=-65, timestamp=time.time()) + + # 更新 AT 命令回應 + rf.update_at_response("OK", timestamp=time.time()) + + # 自定義狀態 + rf.status.custom_status['signal_quality'] = 'excellent' + rf.status.custom_status['packet_error_rate'] = 0.001 + + print(f"RF模組: {rf}") + print(f"Socket: {rf.socket_info.ip}:{rf.socket_info.port}") + print(f"RSSI: {rf.status.rssi} dBm") + print(f"AT回應: {rf.status.at_response}") + print(f"自定義狀態: {rf.status.custom_status}\n") + + +def example_multiple_components(): + """多組件範例""" + print("\n=== 多組件範例 ===\n") + + vehicle = VehicleView(sysid=5) + vehicle.kind = "Copter with Gimbal" + + # Autopilot 組件 + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + autopilot.mav_type = 2 + autopilot.status.mode.mode_name = "AUTO" + + # Gimbal 組件 + gimbal = vehicle.add_component(154, ComponentType.GIMBAL) + gimbal.mav_type = 26 # MAV_TYPE_GIMBAL + gimbal.status.attitude.pitch = -0.785 # 向下45度 + gimbal.status.attitude.yaw = 0.0 + + # Camera 組件 + camera = vehicle.add_component(100, ComponentType.CAMERA) + camera.mav_type = 30 # MAV_TYPE_CAMERA + camera.status.custom_status['recording'] = True + camera.status.custom_status['photo_interval'] = 2.0 + + print(f"載具: {vehicle}") + print(f"組件數量: {len(vehicle.components)}") + for cid, comp in vehicle.components.items(): + print(f" 組件 {cid}: {comp.type.value}, MAV_TYPE={comp.mav_type}") + print() + + +def example_registry(): + """註冊表使用範例""" + print("\n=== 註冊表使用範例 ===\n") + + # 註冊多個載具 + v1 = vehicle_registry.register(sysid=1) + v1.kind = "Copter-1" + v1.add_component(1, ComponentType.AUTOPILOT) + + v2 = vehicle_registry.register(sysid=2) + v2.kind = "Plane-1" + v2.add_component(1, ComponentType.AUTOPILOT) + + v3 = vehicle_registry.register(sysid=3) + v3.kind = "Rover-1" + v3.add_component(1, ComponentType.AUTOPILOT) + + print(f"註冊表中的載具數量: {len(vehicle_registry)}") + + # 取得所有載具 + all_vehicles = vehicle_registry.get_all() + for sysid, vehicle in all_vehicles.items(): + print(f" System {sysid}: {vehicle.kind}") + + # 檢查載具是否存在 + print(f"\nSystem 2 存在? {2 in vehicle_registry}") + print(f"System 99 存在? {99 in vehicle_registry}") + + # 取得特定載具 + vehicle = vehicle_registry.get(2) + if vehicle: + print(f"\n取得載具: {vehicle}") + + # 註銷載具 + vehicle_registry.unregister(3) + print(f"\n註銷 System 3 後,剩餘載具: {len(vehicle_registry)}\n") + + +def example_serialization(): + """序列化範例 (除錯/日誌用)""" + print("\n=== 序列化範例 ===\n") + + vehicle = VehicleView(sysid=10) + vehicle.kind = "Test Copter" + vehicle.connected_via = ConnectionType.UDP + vehicle.custom_meta['firmware'] = 'ArduCopter 4.3.0' + vehicle.custom_meta['frame_type'] = 'X' + + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + autopilot.mav_type = 2 + autopilot.status.position.altitude = 50.0 + autopilot.status.battery.voltage = 12.4 + autopilot.update_packet_stats(0, 0, time.time()) + autopilot.update_packet_stats(1, 30, time.time()) + + rf = vehicle.set_rf_module(RFModuleType.UDP) + rf.update_rssi(-70) + rf.update_socket_info(ip="192.168.1.200", port=14550, connected=True) + + # 轉換為字典 + data = vehicle.to_dict() + + print("載具資料 (字典格式):") + import json + print(json.dumps(data, indent=2, ensure_ascii=False)) + + +def example_gps_ekf(): + """GPS 與 EKF 範例""" + print("\n\n=== GPS 與 EKF 範例 ===\n") + + vehicle = VehicleView(sysid=11) + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + + # GPS 資訊 + autopilot.status.gps.fix_type = 3 # 3D Fix + autopilot.status.gps.satellites_visible = 12 + autopilot.status.gps.eph = 120 # HDOP = 1.2 + autopilot.status.gps.epv = 180 # VDOP = 1.8 + autopilot.status.gps.timestamp = time.time() + + print(f"GPS:") + print(f" Fix Type: {autopilot.status.gps.fix_type}") + print(f" 衛星數: {autopilot.status.gps.satellites_visible}") + print(f" HDOP: {autopilot.status.gps.eph/100}") + + # EKF 資訊 + autopilot.status.ekf.flags = 0x1FF # 所有 flags 都 OK + autopilot.status.ekf.velocity_variance = 0.5 + autopilot.status.ekf.pos_horiz_variance = 1.2 + autopilot.status.ekf.pos_vert_variance = 2.0 + autopilot.status.ekf.timestamp = time.time() + + print(f"\nEKF:") + print(f" Flags: 0x{autopilot.status.ekf.flags:X}") + print(f" 速度變異: {autopilot.status.ekf.velocity_variance}") + print(f" 水平位置變異: {autopilot.status.ekf.pos_horiz_variance}") + print(f" 垂直位置變異: {autopilot.status.ekf.pos_vert_variance}\n") + + +def example_vfr_hud(): + """VFR HUD 範例""" + print("\n=== VFR HUD 範例 ===\n") + + vehicle = VehicleView(sysid=12) + autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT) + + # VFR HUD 資訊 + autopilot.status.vfr.airspeed = 15.5 # m/s + autopilot.status.vfr.groundspeed = 14.8 # m/s + autopilot.status.vfr.heading = 90 # 東方 + autopilot.status.vfr.throttle = 65 # % + autopilot.status.vfr.climb = 2.5 # m/s + autopilot.status.vfr.timestamp = time.time() + + print(f"VFR HUD:") + print(f" 空速: {autopilot.status.vfr.airspeed} m/s") + print(f" 地速: {autopilot.status.vfr.groundspeed} m/s") + print(f" 航向: {autopilot.status.vfr.heading}°") + print(f" 油門: {autopilot.status.vfr.throttle}%") + print(f" 爬升率: {autopilot.status.vfr.climb} m/s\n") + + +if __name__ == "__main__": + # 執行所有範例 + # example_basic_usage() + # example_packet_tracking() + # example_parameters() + # example_rf_module() + # example_multiple_components() + # example_registry() + # example_serialization() + # example_gps_ekf() + example_vfr_hud() + + print("\n" + "="*50) + print("所有範例執行完成!") + print("="*50) diff --git a/src/fc_network_adapter/tests/test_mavlinkObject.py b/src/fc_network_adapter/tests/test_mavlinkObject.py index 29744fc..9c3ca78 100644 --- a/src/fc_network_adapter/tests/test_mavlinkObject.py +++ b/src/fc_network_adapter/tests/test_mavlinkObject.py @@ -13,7 +13,6 @@ import time import threading import socket import asyncio -from unittest.mock import MagicMock, patch # 導入要測試的模組 from ..fc_network_adapter.mavlinkObject import ( @@ -24,62 +23,100 @@ from ..fc_network_adapter.mavlinkObject import ( return_packet_ring ) +# 預先定義好的真實 MAVLink heartbeat 封包 (MAVLink 1.0 格式) +# Format: STX(0xFE) + LEN + SEQ + SYS + COMP + MSG_ID + PAYLOAD(9 bytes for heartbeat) + CRC(2 bytes) +HEARTBEAT_PACKET_1 = bytes([ + 0xFE, # STX (MAVLink 1.0) + 0x09, # payload length (9 bytes) + 0x00, # sequence + 0x01, # system ID = 1 + 0x01, # component ID = 1 + 0x00, # message ID (HEARTBEAT = 0) + # Payload (9 bytes): custom_mode(4), type(1), autopilot(1), base_mode(1), system_status(1), mavlink_version(1) + 0x00, 0x00, 0x00, 0x00, # custom_mode = 0 + 0x02, # type = MAV_TYPE_QUADROTOR (2) + 0x03, # autopilot = MAV_AUTOPILOT_ARDUPILOTMEGA (3) + 0x40, # base_mode = MAV_MODE_FLAG_CUSTOM_MODE_ENABLED (64) + 0x03, # system_status = MAV_STATE_STANDBY (3) + 0x03, # mavlink_version = 3 + 0x62, 0x8E # CRC (simplified placeholder) +]) + +HEARTBEAT_PACKET_2 = bytes([ + 0xFE, # STX + 0x09, # payload length + 0x01, # sequence (增加) + 0x01, # system ID = 1 + 0x01, # component ID = 1 + 0x00, # message ID (HEARTBEAT = 0) + 0x00, 0x00, 0x00, 0x00, + 0x02, 0x03, 0x41, 0x03, 0x03, + 0x33, 0xEC +]) + +HEARTBEAT_PACKET_3 = bytes([ + 0xFE, # STX + 0x09, # payload length + 0x02, # sequence + 0x02, # system ID = 2 + 0x01, # component ID = 1 + 0x00, # message ID (HEARTBEAT = 0) + 0x00, 0x00, 0x00, 0x00, + 0x02, 0x03, 0x42, 0x03, 0x03, + 0x37, 0x44 +]) + class MockMavlinkSocket: - """模擬 Mavlink Socket 的類別,用於測試""" + """模擬 Mavlink Socket 的類別,用於測試 + 使用真實的 MAVLink 封包,而不是模擬的訊息對象 + """ - def __init__(self, test_data=None): + def __init__(self, test_packets=None): + """ + Args: + test_packets: list of bytes,每個元素都是完整的 MAVLink 封包 + """ self.closed = False - self.test_data = test_data or [] - self.data_index = 0 + self.test_packets = test_packets or [] + self.packet_index = 0 self.written_data = [] + # 使用 pymavlink 來解析封包 + from pymavlink import mavutil + self.mav_parser = mavutil.mavlink.MAVLink(self) + def recv_msg(self): - if not self.test_data or self.data_index >= len(self.test_data): + """返回解析後的 MAVLink 訊息對象""" + if not self.test_packets or self.packet_index >= len(self.test_packets): + return None + + packet = self.test_packets[self.packet_index] + self.packet_index += 1 + + # 使用 pymavlink 解析封包 + try: + for byte in packet: + msg = self.mav_parser.parse_char(bytes([byte])) + if msg: + return msg + except Exception as e: + print(f"Error parsing packet: {e}") return None - data = self.test_data[self.data_index] - self.data_index += 1 - return data + return None def write(self, data): + """寫入數據(用於檢查轉發)""" self.written_data.append(data) def close(self): + """關閉 socket""" self.closed = True - - -class MockMavlinkMessage: - """模擬 Mavlink 訊息的類別,用於測試""" - - def __init__(self, msg_id, sysid, compid, seq=0): - self.msg_id = msg_id - self.sysid = sysid - self.compid = compid - self.seq = seq - self.msg_buf = bytes([msg_id, sysid, compid, seq]) - - def get_msgId(self): - return self.msg_id - - def get_srcSystem(self): - return self.sysid - - def get_srcComponent(self): - return self.compid - - def get_seq(self): - return self.seq - - def get_msgbuf(self): - return self.msg_buf - - def get_type(self): - return f"MSG_ID_{self.msg_id}" class TestMavlinkObject(unittest.TestCase): - """測試 mavlink_object 類別""" + """測試 mavlink_object 類別的獨立功能""" def setUp(self): """在每個測試方法執行前準備環境""" @@ -91,35 +128,27 @@ class TestMavlinkObject(unittest.TestCase): stream_bridge_ring.clear() return_packet_ring.clear() - # 創建測試訊息 - self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1) - self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1) - self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1) - - # 創建模擬的 socket # 假的 Mavlink Socket - self.mock_socket = MockMavlinkSocket([ - self.heartbeat_msg, - self.attitude_msg, - self.position_msg - ]) + # 創建模擬的 socket,使用真實封包 + self.mock_socket = MockMavlinkSocket([HEARTBEAT_PACKET_1]) # 創建測試對象 self.mavlink_obj = mavlink_object(self.mock_socket) def test_initialization(self): """測試 mavlink_object 初始化是否正確""" - # print("Testing mavlink_object initialization") self.assertEqual(self.mavlink_obj.socket_id, 0) self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT) self.assertEqual(len(self.mavlink_obj.target_sockets), 0) self.assertEqual(self.mavlink_obj.bridge_msg_types, [0]) - + self.assertEqual(self.mavlink_obj.return_msg_types, []) + def test_add_remove_target_socket(self): """測試添加和移除目標端口功能""" # 添加目標端口 self.assertTrue(self.mavlink_obj.add_target_socket(1)) self.assertEqual(len(self.mavlink_obj.target_sockets), 1) self.assertEqual(self.mavlink_obj.target_sockets[0], 1) + self.assertTrue(self.mavlink_obj.add_target_socket(2)) self.assertEqual(len(self.mavlink_obj.target_sockets), 2) self.assertIn(2, self.mavlink_obj.target_sockets) @@ -153,78 +182,31 @@ class TestMavlinkObject(unittest.TestCase): self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid")) self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"])) - def test_send_message(self): - """測試 send_message 功能""" - # 創建一個新的 mavlink_object 實例 - mock_socket = MockMavlinkSocket() - mavlink_obj = mavlink_object(mock_socket) - - # 準備測試數據 - test_message1 = b"test_message_1" - test_message2 = b"test_message_2" - - # 測試初始狀態 - self.assertEqual(len(mock_socket.written_data), 0) - + def test_send_message_validation(self): + """測試 send_message 的數據驗證功能(不需要啟動 manager)""" # 測試非運行狀態下發送消息 - self.assertFalse(mavlink_obj.send_message(test_message1)) - self.assertEqual(len(mock_socket.written_data), 0) - - # 啟動 manager - manager = async_io_manager() - manager.start() - time.sleep(0.5) # 等待事件循環啟動 - - # 添加對象到 manager - manager.add_mavlink_object(mavlink_obj) - time.sleep(0.1) # 等待對象啟動 - - # 確認對象狀態 - self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING) - - # 測試發送消息 - self.assertTrue(mavlink_obj.send_message(test_message1)) - time.sleep(0.2) # 等待消息處理 + self.assertFalse(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1)) - # 確認消息已發送 - self.assertEqual(len(mock_socket.written_data), 1) - self.assertEqual(mock_socket.written_data[0], test_message1) - - # 測試連續發送多條消息 - self.assertTrue(mavlink_obj.send_message(test_message2)) - time.sleep(0.2) # 等待消息處理 - - # 確認兩條消息都已發送 - self.assertEqual(len(mock_socket.written_data), 2) - self.assertEqual(mock_socket.written_data[1], test_message2) - - # 模擬發送出錯的情況 - class ErrorWriteSocket(MockMavlinkSocket): - def write(self, data): - raise Exception("Write error") - - error_socket = ErrorWriteSocket() - error_obj = mavlink_object(error_socket) - manager.add_mavlink_object(error_obj) - time.sleep(0.1) # 等待對象啟動 + # 測試無效的數據類型 + self.mavlink_obj.state = MavlinkObjectState.RUNNING # 臨時設置狀態 + self.assertFalse(self.mavlink_obj.send_message("invalid")) + self.assertFalse(self.mavlink_obj.send_message(123)) - # 發送消息到錯誤的 socket - self.assertTrue(error_obj.send_message(test_message1)) - time.sleep(0.2) # 等待消息處理 + # 測試太短的封包 + self.assertFalse(self.mavlink_obj.send_message(bytes([0xFE, 0x00]))) - # 即使寫入失敗,send_message 應該也返回 True - # 因為消息已成功加入到佇列中,只是後續的實際發送失敗 + # 測試無效的起始標記 + invalid_packet = bytes([0xFF, 0x09, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00]) + self.assertFalse(self.mavlink_obj.send_message(invalid_packet)) - # 停止 manager - manager.stop() - time.sleep(0.5) # 等待 manager 停止 + # 測試有效的封包可以加入佇列 + self.assertTrue(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1)) + self.assertEqual(len(self.mavlink_obj.outgoing_msgs), 1) - # 測試對象已關閉後發送消息 - self.assertFalse(mavlink_obj.send_message(test_message1)) - self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加 + self.mavlink_obj.state = MavlinkObjectState.INIT # 恢復狀態 class TestAsyncIOManager(unittest.TestCase): - """測試 async_io_manager 類別""" + """測試 async_io_manager 類別的獨立功能""" def setUp(self): """在每個測試方法執行前準備環境""" @@ -239,14 +221,9 @@ class TestAsyncIOManager(unittest.TestCase): # 創建 async_io_manager 實例 self.manager = async_io_manager() - # 模擬 mavlink 對象 - self.mock_socket1 = MockMavlinkSocket([ - MockMavlinkMessage(msg_id=0, sysid=1, compid=1), - MockMavlinkMessage(msg_id=30, sysid=1, compid=1) - ]) - self.mock_socket2 = MockMavlinkSocket([ - MockMavlinkMessage(msg_id=0, sysid=2, compid=1) - ]) + # 創建模擬 mavlink 對象,使用真實封包 + self.mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2]) + self.mock_socket2 = MockMavlinkSocket([HEARTBEAT_PACKET_3]) self.mavlink_obj1 = mavlink_object(self.mock_socket1) self.mavlink_obj2 = mavlink_object(self.mock_socket2) @@ -254,10 +231,10 @@ class TestAsyncIOManager(unittest.TestCase): def tearDown(self): """在每個測試方法執行後清理環境""" if self.manager.running: - self.manager.stop() + self.manager.shutdown() def test_singleton_pattern(self): - """測試 async_io_manager 的單例模式 就是不會產生兩個 magager""" + """測試 async_io_manager 的單例模式""" manager1 = async_io_manager() manager2 = async_io_manager() self.assertIs(manager1, manager2) @@ -275,7 +252,7 @@ class TestAsyncIOManager(unittest.TestCase): self.assertIs(self.manager.thread, old_thread) # 停止管理器 - self.manager.stop() + self.manager.shutdown() self.assertFalse(self.manager.running) # 最多等待 5 秒讓線程結束 @@ -313,157 +290,179 @@ class TestAsyncIOManager(unittest.TestCase): self.assertFalse(self.manager.remove_mavlink_object(999)) # 停止管理器 - self.manager.stop() + self.manager.shutdown() + +class TestIntegration(unittest.TestCase): + """整合測試,測試多個 mavlink_object 之間的互動與資料流""" + + def setUp(self): + """在每個測試方法執行前準備環境""" + # 清空全局變數 + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + + # 清空 ring buffer + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建 async_io_manager 實例 + self.manager = async_io_manager() + + def tearDown(self): + """在每個測試方法執行後清理環境""" + if self.manager.running: + self.manager.shutdown() + + def test_send_message_with_manager(self): + """測試透過 async_io_manager 發送訊息的完整流程""" + # 創建一個新的 mavlink_object 實例 + mock_socket = MockMavlinkSocket() + mavlink_obj = mavlink_object(mock_socket) - def test_data_processing(self): - """測試數據處理""" + # 測試初始狀態 + self.assertEqual(len(mock_socket.written_data), 0) + + # 測試非運行狀態下發送消息 + self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1)) + self.assertEqual(len(mock_socket.written_data), 0) + + # 啟動 manager + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加對象到 manager + self.manager.add_mavlink_object(mavlink_obj) + time.sleep(0.1) # 等待對象啟動 + + # 確認對象狀態 + self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING) + + # 測試發送消息 + self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_1)) + time.sleep(0.2) # 等待消息處理 + + # 確認消息已發送 + self.assertEqual(len(mock_socket.written_data), 1) + self.assertEqual(mock_socket.written_data[0], HEARTBEAT_PACKET_1) + + # 測試連續發送多條消息 + self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_2)) + time.sleep(0.2) # 等待消息處理 + + # 確認兩條消息都已發送 + self.assertEqual(len(mock_socket.written_data), 2) + self.assertEqual(mock_socket.written_data[1], HEARTBEAT_PACKET_2) + + # 停止 manager + self.manager.shutdown() + time.sleep(0.5) # 等待 manager 停止 + + # 測試對象已關閉後發送消息 + self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1)) + self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加 + + def test_data_processing_and_forwarding(self): + """測試數據處理與轉發流程""" # 創建用於轉發的 mavlink_objects + mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2,]) mock_socket3 = MockMavlinkSocket() - mavlink_obj3 = mavlink_object(mock_socket3) - - # 設置轉發 - self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3 + mavlink_obj1 = mavlink_object(mock_socket1) + mavlink_obj3 = mavlink_object(mock_socket3) + # 設置訊息類型 - self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE - # self.mavlink_obj1.enable_return(True) - self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE + mavlink_obj1.set_bridge_message_types([0]) # 只處理 HEARTBEAT + + # 設置轉發: obj1 -> obj3 + mavlink_obj1.add_target_socket(mavlink_obj3.socket_id) # socket1 轉發到 socket3 (socket_id=1) # 啟動管理器並添加對象 self.manager.start() time.sleep(0.5) # 等待事件循環啟動 - self.manager.add_mavlink_object(self.mavlink_obj1) + """ + 這邊出現很奇怪的狀況 應該說 設計時沒有考量 但是實測會發現 + mavlink_obj3 是接收端 必需要被優先加入 manager 才能正確接收來自 mavlink_obj1 的轉發封包 + 若先把 mavlink_ojb1 加入 manger 則可能會導致前面幾個封包丟失 + """ self.manager.add_mavlink_object(mavlink_obj3) + self.manager.add_mavlink_object(mavlink_obj1) # 等待處理完成 - time.sleep(1.0) + time.sleep(0.5) - # print("testing Mark A") - # 檢查 Ring buffer 是否有正確的數據 - self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE - self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE + self.assertGreaterEqual(stream_bridge_ring.size(), 2) # 至少 2 個 HEARTBEAT - a = stream_bridge_ring.get() - print(f"stream_bridge_ring: {a}") - # 檢查是否正確轉發 - self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE - - # print("testing Mark B") - - # 停止管理器 - self.manager.stop() - - def test_error_handling(self): - """測試錯誤處理情況""" - print("=== mark A ===") - # 創建一個會引發異常的 socket - class ErrorSocket(MockMavlinkSocket): - def recv_msg(self): - raise Exception("Test exception") + self.assertGreaterEqual(len(mock_socket3.written_data), 2) # 至少 2 個 HEARTBEAT - error_socket = ErrorSocket() - mavlink_obj_err = mavlink_object(error_socket) + # 停止管理器 + self.manager.shutdown() - # 啟動管理器並添加對象 + def test_bidirectional_forwarding(self): + """測試雙向轉發""" + # 清空全局變數和 ring buffer + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建三個 mavlink 對象,模擬三個通道 + socket1 = MockMavlinkSocket() + socket2 = MockMavlinkSocket() + socket3 = MockMavlinkSocket() + + obj1 = mavlink_object(socket1) + obj2 = mavlink_object(socket2) + obj3 = mavlink_object(socket3) + + # 設置雙向轉發 + # obj1 <-> obj2 <-> obj3 + obj1.add_target_socket(1) # obj1 -> obj2 + obj2.add_target_socket(0) # obj2 -> obj1 + obj2.add_target_socket(2) # obj2 -> obj3 + obj3.add_target_socket(1) # obj3 -> obj2 + + # 啟動 async_io_manager self.manager.start() time.sleep(0.5) # 等待事件循環啟動 + + # 添加所有 mavlink_object + self.manager.add_mavlink_object(obj1) + self.manager.add_mavlink_object(obj2) + self.manager.add_mavlink_object(obj3) - self.manager.add_mavlink_object(mavlink_obj_err) - - print("=== mark B ===") - - # 等待錯誤處理 + # 對三個對象添加數據 + socket1.test_packets.append(HEARTBEAT_PACKET_1) + socket2.test_packets.append(HEARTBEAT_PACKET_2) + socket3.test_packets.append(HEARTBEAT_PACKET_3) + + # 等待處理所有訊息 time.sleep(1.0) - - # 對象應該進入錯誤狀態,但不會崩潰 - # self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR) - - print("=== mark C ===") - - # 管理器應該仍在運行 - self.assertTrue(self.manager.running) - - - # 故意等一段時間 確認 socket 有被 manager 處理掉 - time.sleep(3) - + + # 檢查轉發結果 + # socket1 應該收到 socket2 的訊息 + self.assertGreaterEqual(len(socket1.written_data), 1) + + # socket2 應該收到 socket1 和 socket3 的訊息 + self.assertGreaterEqual(len(socket2.written_data), 2) + + # socket3 應該收到 socket2 的訊息 + self.assertGreaterEqual(len(socket3.written_data), 1) + + # 檢查 ring buffer 的數據 + # 所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0] + self.assertGreaterEqual(stream_bridge_ring.size(), 3) # 至少 3 個 HEARTBEAT + # 停止管理器 - self.manager.stop() - - -class TestIntegration(unittest.TestCase): - """整合測試,測試多個 mavlink_object 之間的互動""" - - def test_bidirectional_forwarding(self): - """測試雙向轉發""" - # 清空全局變數和 ring buffer - mavlink_object.mavlinkObjects = {} - mavlink_object.socket_num = 0 - stream_bridge_ring.clear() - return_packet_ring.clear() - - # 創建三個 mavlink 對象,模擬三個通道 - socket1 = MockMavlinkSocket([ - MockMavlinkMessage(msg_id=0, sysid=1, compid=1), - MockMavlinkMessage(msg_id=30, sysid=1, compid=1) - ]) - socket2 = MockMavlinkSocket([ - MockMavlinkMessage(msg_id=0, sysid=2, compid=1), - MockMavlinkMessage(msg_id=32, sysid=2, compid=1) - ]) - socket3 = MockMavlinkSocket([ - MockMavlinkMessage(msg_id=0, sysid=3, compid=1), - MockMavlinkMessage(msg_id=33, sysid=3, compid=1) - ]) - - obj1 = mavlink_object(socket1) - obj2 = mavlink_object(socket2) - obj3 = mavlink_object(socket3) - - # 設置雙向轉發 - # obj1 <-> obj2 <-> obj3 - obj1.add_target_socket(1) # obj1 -> obj2 - obj2.add_target_socket(0) # obj2 -> obj1 - obj2.add_target_socket(2) # obj2 -> obj3 - obj3.add_target_socket(1) # obj3 -> obj2 - - # 啟動 async_io_manager - manager = async_io_manager() - manager.start() - time.sleep(0.5) # 等待事件循環啟動 - - # 添加所有 mavlink_object - manager.add_mavlink_object(obj1) - manager.add_mavlink_object(obj2) - manager.add_mavlink_object(obj3) - - # 等待處理所有訊息 - time.sleep(1.5) - - # 檢查轉發結果 - # socket1 應該收到 socket2 和 socket3 的訊息 - self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2 - - # socket2 應該收到 socket1 和 socket3 的訊息 - self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3 - - # socket3 應該收到 socket1 和 socket2 的訊息 - self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2 - - # 檢查 ring buffer 的數據 - # 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0] - # 應該有 3 個 HEARTBEAT 訊息 - self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT - - # 停止管理器 - manager.stop() - - + self.manager.shutdown() if __name__ == "__main__": - unittest.main(defaultTest="TestMavlinkObject.test_send_message") - # unittest.main(defaultTest="TestAsyncIOManager") + # 可以指定要運行的測試 + # unittest.main(defaultTest="TestMavlinkObject.test_send_message_validation") + # unittest.main(defaultTest="TestAsyncIOManager.test_add_remove_objects") + unittest.main(defaultTest="TestIntegration.test_bidirectional_forwarding") + unittest.main() +