1. 大量添增終端機介面功能

2. 優化 mavlink_object 與 manager 的流程
chiyu
Chiyu Chen 5 months ago
parent 7af138b02a
commit 6a71e4530f

@ -44,4 +44,5 @@ N. logs 是執行時期的記錄檔
例如 在 ~/AirTrapMine/src/ 資料夾下
> python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
> python -m fc_network_adapter.tests.test_ringBuffer
> python -m fc_network_adapter.tests.demo_integration

@ -7,7 +7,6 @@
'''
import os
import time
import sys
@ -17,13 +16,45 @@ import threading
import queue
import signal
from pymavlink import mavutil
# 自定義的 import
from . import mavlinkObject as mo
from . import mavlinkDevice as md
from .utils import RingBuffer, setup_logger
logger = setup_logger(os.path.basename(__file__))
class PanelState:
def __init__(self):
self.panel_status = "Idle"
termination_start_time = None
self.mavlink_bridge_state = "Stopped"
self.object_manager_state = "Stopped"
self.socket_object_list = []
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
# 這邊是儲存關於 socket object 的資料
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "",
"target_sockets": "", "primary_socket_id": "", "InfoReady": False} # 暫存單一 socket 的資訊
def intoSTART(self):
self.panel_status = "Running"
def intoTERMINATION(self):
self.termination_start_time = time.time()
self.panel_status = "Terminating"
def intoENGINEER(self):
self.panel_status = "Engineer"
def intoSTOPPED(self):
self.panel_status = "Stopped"
def set_user_input(self, text):
self.user_input = text
class MenuNode:
def __init__(self, name, desc="", action=None, children=None):
@ -36,31 +67,231 @@ class ControlPanel:
def __init__(self):
pass
def input_dialog(stdscr, prompt="請輸入文字: "):
"""顯示輸入對話框"""
height, width = stdscr.getmaxyx()
# 建立輸入視窗
dialog_height = 5
dialog_width = min(60, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
# 建立視窗邊框
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(1, 2, prompt)
dialog_win.addstr(3, 2, "按 Enter 確認, ESC 取消")
dialog_win.refresh()
# 輸入區域
input_win = curses.newwin(1, dialog_width - 6, start_y + 2, start_x + 2)
input_win.keypad(True)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
input_win.clear()
input_win.addstr(0, 0, user_input[-dialog_width+8:]) # 顯示輸入內容(滾動)
input_win.refresh()
ch = input_win.getch()
if ch == 27: # ESC
user_input = None
break
elif ch in (curses.KEY_ENTER, 10, 13): # Enter
break
elif ch in (curses.KEY_BACKSPACE, 127, 8): # Backspace
user_input = user_input[:-1]
elif 32 <= ch <= 126: # 可打印字符
user_input += chr(ch)
curses.noecho()
curses.curs_set(0)
# 清理視窗
del input_win
del dialog_win
stdscr.clear()
stdscr.refresh()
return user_input
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=5):
"""動態創建 mavlink_object 列表選單(支持分頁)"""
children = []
if not state.socket_object_list:
children.append(MenuNode("(空)", "目前沒有連結口", None))
else:
total_items = len(state.socket_object_list)
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
for socket_id in state.socket_object_list[start_idx:end_idx]:
# 為每個 socket 創建子選單
obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[
MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"),
MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"),
MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"),
MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"),
MenuNode("Remove Target", "移除轉發目標(工程)", "MAVOBJ_REMOVE_TARGET"),
MenuNode("返回", "回到列表", "BACK"),
])
# 將 socket_id 附加到每個子選單項目上
for child in obj_menu.children:
child.socket_id = socket_id
children.append(obj_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", "---", None))
if page > 0:
prev_node = MenuNode("◀ 上一頁", f"{page}/{total_pages}", "PREV_PAGE")
prev_node.page = page - 1
children.append(prev_node)
if page < total_pages - 1:
next_node = MenuNode("下一頁 ▶", f"{page + 2}/{total_pages}", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
def show_object_info(self, stdscr, socket_id, state: PanelState):
"""顯示物件詳細資訊的對話框"""
height, width = stdscr.getmaxyx()
dialog_height = 15
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD)
while not state.socket_info_single.get('InfoReady', False):
time.sleep(0.05) # 等待資訊準備好
# 這裡顯示基本資訊
dialog_win.addstr(2, 2, f"Socket ID : {socket_id}")
dialog_win.addstr(3, 2, f"Socket status : 運行中")
# show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', '')))
dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}")
show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', '')))
dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}")
show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', '')))
dialog_win.addstr(6, 2, f"Return Pack : {show_str if show_str else 'N/A'}")
dialog_win.addstr(7, 2, f"Primary Socket ID: {state.socket_info_single.get('primary_socket_id', 'It Self')}")
show_str = ",".join(map(str, state.socket_info_single.get('target_sockets', '')))
dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}")
state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用
dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
stdscr.refresh()
def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False):
"""選擇目標 socket 的對話框"""
height, width = stdscr.getmaxyx()
dialog_height = min(15, len(state.socket_object_list) + 5)
dialog_width = min(50, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.keypad(True)
title = "選擇要移除的目標" if remove_mode else "選擇轉發目標"
available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id]
if not available_sockets:
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
dialog_win.addstr(2, 2, "沒有可用的目標")
dialog_win.addstr(4, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
stdscr.refresh()
return None
selected_idx = 0
while True:
dialog_win.clear()
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
for i, socket_id in enumerate(available_sockets):
marker = "" if i == selected_idx else " "
attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL
dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr)
dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消")
dialog_win.refresh()
ch = dialog_win.getch()
if ch in (curses.KEY_UP, ord('k')):
selected_idx = (selected_idx - 1) % len(available_sockets)
elif ch in (curses.KEY_DOWN, ord('j')):
selected_idx = (selected_idx + 1) % len(available_sockets)
elif ch in (curses.KEY_ENTER, 10, 13):
result = available_sockets[selected_idx]
del dialog_win
stdscr.clear()
stdscr.refresh()
return result
elif ch == 27: # ESC
del dialog_win
stdscr.clear()
stdscr.refresh()
return None
def menu_tree(self):
"""建立多層選單結構"""
return MenuNode("Main Menu", children=[
MenuNode("MavLink Object Control", children=[
MenuNode("New+", "新增一個連結口", "ADD_MAV_OBJECT"),
MenuNode("Remove", "移除某個連結口", "REMOVE_MAV_OBJECT"),
MenuNode("ListAll", "顯示連結口列表", "LIST_MAV_OBJECT"),
MenuNode("Inspection", "顯示連結口資訊", "INSPECT_MAV_OBJECT"),
MenuNode("MavLink Object", "控制 MavLink 物件", children=[
MenuNode("New+", children=[
MenuNode("UDP InBound", children=[
MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"),
MenuNode("Port(Listen)", "設定監聽的 Port", "TEXT_UDP_PORT"),
MenuNode("Create", "建立 UDP InBound 連結口", "CREATE_UDP_INBOUND"),
]),
MenuNode("UDP OutBound", children=[
MenuNode("IP(Target)", "設定目標的 IP 位址", "TEXT_UDP_IP"),
MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"),
MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"),
]),
]),
MenuNode("參數設定", children=[
MenuNode("Speed", children=[
MenuNode("Increase Speed", "增加速度", "ADJUST_SPEED"),
MenuNode("Decrease Speed", "減少速度", "ADJUST_SPEED"),
MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"),
]),
MenuNode("輸入文字", "鍵盤輸入模式", "INPUT_TEXT"),
MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), #TODO: 尚未實作
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), #TODO: 尚未實作
]),
MenuNode("資訊查看", children=[
MenuNode("顯示狀態", "查看當前狀態", "SHOW_STATUS"),
MenuNode("顯示輸入", "查看用戶輸入", "SHOW_INPUT"),
MenuNode("Shutdown", "關閉整個系統", children=[
MenuNode("Return", "繼續運行", "BACK"),
MenuNode("Confirm", "關閉系統", "QUIT"),
]),
MenuNode("返回上層", "回到上一層選單", "BACK"),
MenuNode("關閉面板", "關閉控制面板", "QUIT"),
])
def panel_thread(self, cmd_q: queue.Queue, stop_evt: threading.Event):
def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event):
stdscr = None
def cleanup():
@ -71,6 +302,12 @@ class ControlPanel:
curses.echo()
curses.endwin()
def panel_shutdown():
# 先關閉所有模組 再關閉面板
cmd_q.put("SHUTDOWN_BRIDGE")
cmd_q.put("SHUTDOWN_MANAGER")
def draw_menu(screen):
nonlocal stdscr
stdscr = screen
@ -83,6 +320,8 @@ class ControlPanel:
menu_stack = [self.menu_tree()] # 選單堆疊
idx_stack = [0] # 索引堆疊
state.intoSTART() # 設定狀態為運行中
while not stop_evt.is_set():
# 檢查是否需要退出
if stop_evt.is_set():
@ -91,35 +330,92 @@ class ControlPanel:
current_menu = menu_stack[-1]
current_idx = idx_stack[-1]
# 獲取終端機尺寸
height, width = stdscr.getmaxyx()
# 簡單暴力的限制視窗的大小
if height < 20 or width < 60:
logger.error("Terminal size too small for Control Panel.")
break
stdscr.clear()
stdscr.border()
stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD)
stdscr.addstr(1, 2, f"mavlink bridge state : ")
stdscr.addstr(2, 2, f"object manager state : ")
stdscr.addstr(3, 2, f"Node state : ")
stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}")
stdscr.addstr(2, 2, f"mavlink Bridge State : {state.mavlink_bridge_state}")
stdscr.addstr(3, 2, f"Object Manager State : {state.object_manager_state}")
stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}")
# # Header - 顯示選單路徑
# path = " → ".join([menu.name for menu in menu_stack])
# stdscr.addstr(0, 2, f"控制面板: {path}", curses.A_BOLD)
# stdscr.addstr(1, 2, f"狀態: {state.status} | 速度: {state.speed} | 計數: {state.counter}")
# if state.user_input:
# stdscr.addstr(2, 2, f"輸入: {state.user_input[:50]}...")
# # 更新模組狀態顯示
# stdscr.addstr(2, 25, f"{state.mavlink_bridge_state}")
# stdscr.addstr(3, 25, f"{state.object_manager_state}")
# stdscr.addstr(4, 25, f"{len(state.socket_object_list)} ")
# 顯示當前選單項目
start_line = 5
start_line = 6
for i, child in enumerate(current_menu.children):
marker = "" if i == current_idx else " "
line = f"{marker}{child.name:15s} {child.desc}"
# 動態顯示已輸入的值
desc = child.desc
if child.action == "TEXT_UDP_IP" and state.udp_info_temp["IP"]:
desc = f"{child.desc} [{state.udp_info_temp['IP']}]"
elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]:
desc = f"{child.desc} [{state.udp_info_temp['Port']}]"
line = f"{marker}{child.name:15s} {desc}"
attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL
stdscr.addstr(start_line + i, 4, line, attr)
# 顯示訊息區域
# info_start_line = start_line + len(current_menu.children) + 1
info_start_line = height - 8
max_msg_lines = 5 # 最多顯示 5 行訊息
current_time = time.time()
# 清理過時的訊息
state.panel_info_msg_list = [
(msg, timestamp) for msg, timestamp in state.panel_info_msg_list
if current_time - timestamp < 2.0 #秒數
]
# 只顯示最新的 max_msg_lines 條訊息
display_msgs = state.panel_info_msg_list[-max_msg_lines:]
for i, msg_data in enumerate(display_msgs):
if info_start_line + i >= help_line - 1: # 避免超出邊界
break
msg = msg_data[0] if isinstance(msg_data, tuple) else msg_data
# 截斷過長的訊息
max_msg_width = width - 6
if len(msg) > max_msg_width:
msg = msg[:max_msg_width-3] + "..."
stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD)
# 操作說明
help_line = start_line + len(current_menu.children) + 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 ←→調參數 q退出", curses.A_DIM)
# help_line = start_line + len(current_menu.children) + 2
help_line = height - 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層 q退出", curses.A_DIM)
stdscr.refresh()
# 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作
# 驗證 bridge 跟 manager 狀態 兩者都停止後 就進入 STOPPED 狀態並跳出迴圈
# 超過幾秒沒有反應就強制關閉
if state.panel_status == "Terminating":
if time.time() - state.termination_start_time > 3:
logger.warning("Control Panel forced shutdown after timeout.")
state.intoSTOPPED()
stop_evt.set()
continue
time.sleep(0.1)
if state.mavlink_bridge_state == "Stopped" and state.object_manager_state == "Stopped":
state.intoSTOPPED()
stop_evt.set()
continue
# 設定短暫的 timeout讓執行緒能夠響應 stop_evt
stdscr.timeout(100) # 100ms timeout
ch = stdscr.getch()
@ -134,22 +430,26 @@ class ControlPanel:
elif ch in (curses.KEY_DOWN, ord('j')):
idx_stack[-1] = (current_idx + 1) % len(current_menu.children)
elif ch == (ord('O')):
# 直接進入工程模式
state.intoENGINEER()
elif ch == curses.KEY_LEFT:
# 返回上層或調整參數
# 返回上層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
else:
# 在根選單,檢查是否為調整參數
selected = current_menu.children[current_idx]
if selected.action == "ADJUST_SPEED":
state.speed = max(1, state.speed - 1)
elif ch == curses.KEY_RIGHT:
# 調整參數
# 進入下層 (但不執行動作)
selected = current_menu.children[current_idx]
if selected.action == "ADJUST_SPEED":
state.speed = min(10, state.speed + 1)
if selected.children: # 有子選單
menu_stack.append(selected)
idx_stack.append(0)
elif ch in (ord('q'), 27):
state.intoTERMINATION()
panel_shutdown()
elif ch in (curses.KEY_ENTER, 10, 13):
selected = current_menu.children[current_idx]
@ -165,96 +465,320 @@ class ControlPanel:
idx_stack.pop()
elif selected.action == "QUIT":
break
state.intoTERMINATION()
panel_shutdown()
elif selected.action == "INPUT_TEXT":
# 進入輸入模式
result = input_dialog(stdscr, "請輸入文字: ")
elif selected.action == "TEXT_UDP_IP":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ")
if result is not None:
cmd_q.put(lambda: state.set_user_input(result))
state.udp_info_temp["IP"] = result
elif selected.action == "SHOW_STATUS":
# 顯示狀態訊息
stdscr.clear()
stdscr.addstr(5, 2, f"當前狀態: {state.status}")
stdscr.addstr(6, 2, f"速度設定: {state.speed}")
stdscr.addstr(7, 2, f"計數器: {state.counter}")
stdscr.addstr(9, 2, "按任意鍵返回...")
stdscr.refresh()
stdscr.getch()
elif selected.action == "TEXT_UDP_PORT":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 Port: ")
if result is not None:
state.udp_info_temp["Port"] = result
elif selected.action == "SHOW_INPUT":
# 顯示用戶輸入
stdscr.clear()
stdscr.addstr(5, 2, f"用戶輸入內容:")
stdscr.addstr(6, 2, f"{state.user_input}")
stdscr.addstr(8, 2, "按任意鍵返回...")
stdscr.refresh()
stdscr.getch()
elif selected.action == "CREATE_UDP_INBOUND":
cmd_q.put("CREATE_UDP_INBOUND")
# 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
menu_stack.pop()
idx_stack.pop()
elif selected.action == "CREATE_UDP_OUTBOUND":
cmd_q.put("CREATE_UDP_OUTBOUND")
# 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
menu_stack.pop()
idx_stack.pop()
elif selected.action == "LIST_MAV_OBJECT":
# 動態生成 mavlink_object 列表選單
object_list_menu = self.create_object_list_menu(state, page=0)
menu_stack.append(object_list_menu)
idx_stack.append(0)
elif selected.action == "PREV_PAGE":
# 上一頁
if hasattr(selected, 'page'):
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
idx_stack.append(0)
elif selected.action == "NEXT_PAGE":
# 下一頁
if hasattr(selected, 'page'):
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_MAV_OBJECT":
# 顯示物件詳細資訊
if hasattr(selected, 'socket_id'):
cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id))
self.show_object_info(stdscr, selected.socket_id, state)
elif selected.action == "REMOVE_MAV_OBJECT":
# 移除物件
if hasattr(selected, 'socket_id'):
cmd_q.put(("REMOVE_OBJECT", selected.socket_id))
# 返回上層(回到列表)
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
# 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好
menu_stack.pop()
idx_stack.pop()
# # 刷新列表頁面
# if len(menu_stack) > 1:
# current_page = menu_stack[-1].current_page if hasattr(menu_stack[-1], 'current_page') else 0
# menu_stack.pop()
# idx_stack.pop()
# time.sleep(0.1) # 等待物件被移除
# object_list_menu = self.create_object_list_menu(state, page=current_page)
# menu_stack.append(object_list_menu)
# idx_stack.append(0)
elif selected.action == "MAVOBJ_MAKE_LINK":
# 建立轉發連結
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state)
if target_id is not None:
cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id))
elif selected.action == "MAVOBJ_ADD_TARGET":
# 添加目標端口
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state)
if target_id is not None:
cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id))
elif selected.action == "MAVOBJ_REMOVE_TARGET":
# 移除目標端口
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True)
if target_id is not None:
cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id))
elif selected.action == "STOP_MANAGER":
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_MANAGER")
elif selected.action == "STOP_BRIDGE":
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_BRIDGE")
elif callable(selected.action):
# 執行函式
cmd_q.put(selected.action)
elif ch in (ord('q'), 27):
break
try:
curses.wrapper(draw_menu)
except KeyboardInterrupt:
pass
finally:
stop_evt.set()
cleanup()
def main():
logger.warning(f"Hello this is mainOrchestrator.py")
pp = ControlPanel()
cmd_q = queue.Queue()
stop_evt = threading.Event()
panel_thread_obj = None
def signal_handler(signum, frame):
"""處理 Ctrl+C 信號"""
print("\n收到中斷信號,正在安全退出...")
stop_evt.set()
if panel_thread_obj and panel_thread_obj.is_alive():
panel_thread_obj.join(timeout=2)
sys.exit(0)
class Orchestrator:
def __init__(self, stop_sig):
self.stop_evt = stop_sig
# 註冊信號處理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# === 1) 面板部分的準備 ===
self.cmd_q = queue.Queue()
self.panelState = PanelState() # 面板的狀態儲存
self.cPanel = ControlPanel() # 面板的功能
self.panel_thread = None
# 啟動控制面板(改為非 daemon
panel_thread_obj = threading.Thread(target=pp.panel_thread, args=(cmd_q, stop_evt))
panel_thread_obj.start()
# === 2) async_io_manager & mavlink_bridge 部分的準備 ===
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
self.manager = mo.async_io_manager()
self.bridge = mo.mavlink_bridge()
print("多層選單控制面板啟動。Ctrl+C 結束程式。")
def engageWholeSystem(self):
"""啟動整個系統"""
# === 1) 面板部分的啟動 ===
self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt))
self.panel_thread.start()
# === 2) async_io_manager & mavlink_bridge 部分的啟動 ===
self.manager.start()
self.bridge.start()
def mainLoop(self):
logger.info("Main orchestrator started <-")
try:
while not stop_evt.is_set():
while not self.stop_evt.is_set():
# A. 更新模組狀態
if self.manager.running:
self.panelState.object_manager_state = 'Running'
else:
self.panelState.object_manager_state = 'Stopped'
socketid_list = self.manager.get_managed_objects()
self.panelState.socket_object_list = socketid_list
if self.bridge.running:
self.panelState.mavlink_bridge_state = 'Running'
else:
self.panelState.mavlink_bridge_state = 'Stopped'
# 取出面板丟過來的「動作」
try:
fn = cmd_q.get_nowait()
fn() # 執行對應動作
cmd = self.cmd_q.get_nowait()
if callable(cmd):
cmd() # 執行對應動作
elif isinstance(cmd, tuple):
# 處理多參數命令
action = cmd[0]
if action == "REMOVE_OBJECT":
socket_id = cmd[1]
# 先移除所有跟他關聯的 target sockets
for s_id in mo.mavlink_object.mavlinkObjects:
s_obj = mo.mavlink_object.mavlinkObjects[s_id]
if socket_id in s_obj.target_sockets:
self.remove_target_from_object(s_id, socket_id)
# 再移除該物件
self.delete_mavlink_object(socket_id)
elif action == "MAVOBJ_MAKE_LINK":
source_id, target_id = cmd[1], cmd[2]
self.add_target_to_object(source_id, target_id)
self.add_target_to_object(target_id, source_id) # 雙向連結
elif action == "MAVOBJ_ADD_TARGET":
source_id, target_id = cmd[1], cmd[2]
self.add_target_to_object(source_id, target_id)
elif action == "MAVOBJ_REMOVE_TARGET":
source_id, target_id = cmd[1], cmd[2]
self.remove_target_from_object(source_id, target_id)
elif action == "INSPECT_MAV_OBJECT":
socket_id = cmd[1]
mav_obj = self.manager.managed_objects.get(socket_id, None)
if mav_obj:
self.panelState.socket_info_single["socket_type"] = mav_obj.socket_type
# self.panelState.socket_info_single["socket_state"] = "Running" if mav_obj.running
self.panelState.socket_info_single["bridge_msg_types"] = mav_obj.bridge_msg_types
self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types
self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id
self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets
self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好
elif cmd == "CREATE_UDP_INBOUND":
self.panelState.udp_info_temp["direction"] = "inbound"
self.create_udp_object()
elif cmd == "CREATE_UDP_OUTBOUND":
self.panelState.udp_info_temp["direction"] = "outbound"
self.create_udp_object()
elif cmd == "SHUTDOWN_BRIDGE":
self.bridge.stop()
elif cmd == "SHUTDOWN_MANAGER":
self.manager.shutdown()
except queue.Empty:
pass
except Exception as e:
logger.error(f"Error processing command: {e}")
# # 模擬你的長跑邏輯
# if state.status == "running":
# # 依 speed 前進
# state.counter += state.speed
time.sleep(0.1)
except KeyboardInterrupt:
print("\n收到 Ctrl+C準備結束...")
pass
except Exception as e:
logger.error(f"Unexpected error in main loop: {e}")
finally:
logger.info("Main orchestrator END!")
# 關閉 mavlink_bridge (裡面有一個執行緒)
self.bridge.stop()
# 關閉 async_io_manager (裡面有一個執行緒)
self.manager.shutdown()
# 關閉面板執行緒
if self.panel_thread.is_alive():
self.panel_thread.join(timeout=2)
def create_udp_object(self):
if self.panelState.udp_info_temp["direction"] == "inbound":
connection_string = f"udp:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
elif self.panelState.udp_info_temp["direction"] == "outbound":
connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
try:
mavlink_socket = mavutil.mavlink_connection(connection_string)
except Exception as e:
self.panelState.panel_info_msg_list.append((f"Failed to create UDP {self.panelState.udp_info_temp['direction']} object: {e}", time.time()-1))
return
mavlink_object = mo.mavlink_object(mavlink_socket)
mavlink_object.socket_type = "UDP " + self.panelState.udp_info_temp['direction'].capitalize()
self.manager.add_mavlink_object(mavlink_object)
self.panelState.panel_info_msg_list.append((f"Created UDP {self.panelState.udp_info_temp['direction']} object: {connection_string}", time.time()))
def delete_mavlink_object(self, socket_id):
"""移除指定的 mavlink_object"""
self.manager.remove_mavlink_object(socket_id)
def add_target_to_object(self, source_id, target_id):
"""為 mavlink_object 添加轉發目標"""
if source_id in mo.mavlink_object.mavlinkObjects:
source_obj = mo.mavlink_object.mavlinkObjects[source_id]
else:
self.panelState.panel_info_msg_list.append((f"Source object {source_id} not found", time.time()))
return
if source_obj.add_target_socket(target_id):
self.panelState.panel_info_msg_list.append((f"Added target {target_id} to socket {source_id}", time.time()))
else:
self.panelState.panel_info_msg_list.append((f"Fail Adding target {target_id} to socket {source_id}", time.time()))
def remove_target_from_object(self, source_id, target_id):
"""從 mavlink_object 移除轉發目標"""
if source_id in mo.mavlink_object.mavlinkObjects:
source_obj = mo.mavlink_object.mavlinkObjects[source_id]
else:
self.panelState.panel_info_msg_list.append((f"Source object {source_id} not found", time.time()))
return
if source_obj.remove_target_socket(target_id):
self.panelState.panel_info_msg_list.append((f"Removed target {target_id} from socket {source_id}", time.time()))
else:
self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time()))
def main():
stop_evt = threading.Event()
def signal_handler(signum, frame):
"""處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組"""
stop_evt.set()
if panel_thread_obj.is_alive():
panel_thread_obj.join(timeout=2)
pass
# 註冊信號處理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
orchestrator = Orchestrator(stop_evt)
orchestrator.engageWholeSystem()
orchestrator.mainLoop() # 程式會 block 在這邊 直到收到中斷信號
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

@ -0,0 +1,435 @@
"""
VehicleView - Pure State Container
純粹的狀態容器不主動通訊不背景下載參數不搶 RF/MAVLink 流量
只提供狀態存取介面由外部手動餵資料push state
"""
import os
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
# ====================== Enums =====================
class ConnectionType(Enum):
"""連接類型"""
SERIAL = "serial"
UDP = "udp"
TCP = "tcp"
OTHER = "other"
class ComponentType(Enum):
"""組件類型"""
AUTOPILOT = "autopilot"
GCS = "gcs"
CAMERA = "camera"
GIMBAL = "gimbal"
OTHER = "other"
class RFModuleType(Enum):
"""RF模組類型"""
XBEE = "xbee"
UDP = "udp"
TCP = "tcp"
OTHER = "other"
# ====================== Data Classes =====================
@dataclass
class Position:
"""位置資訊"""
latitude: Optional[float] = None # 緯度 (度)
longitude: Optional[float] = None # 經度 (度)
altitude: Optional[float] = None # 海拔高度 (公尺)
relative_altitude: Optional[float] = None # 相對高度 (公尺)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class Attitude:
"""姿態資訊"""
roll: Optional[float] = None # 橫滾角 (弧度)
pitch: Optional[float] = None # 俯仰角 (弧度)
yaw: Optional[float] = None # 偏航角 (弧度)
rollspeed: Optional[float] = None # 橫滾速度 (弧度/秒)
pitchspeed: Optional[float] = None # 俯仰速度 (弧度/秒)
yawspeed: Optional[float] = None # 偏航速度 (弧度/秒)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class FlightMode:
"""飛行模式資訊"""
base_mode: Optional[int] = None # MAVLink base mode
custom_mode: Optional[int] = None # 自定義模式
mode_name: Optional[str] = None # 模式名稱 (例如: "GUIDED", "AUTO")
timestamp: Optional[float] = None # 時間戳記
@dataclass
class Battery:
"""電池資訊"""
voltage: Optional[float] = None # 電壓 (V)
current: Optional[float] = None # 電流 (A)
remaining: Optional[int] = None # 剩餘電量 (%)
temperature: Optional[float] = None # 溫度 (攝氏)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class EKF:
"""EKF狀態資訊"""
flags: Optional[int] = None # EKF 旗標
velocity_variance: Optional[float] = None # 速度變異
pos_horiz_variance: Optional[float] = None # 水平位置變異
pos_vert_variance: Optional[float] = None # 垂直位置變異
compass_variance: Optional[float] = None # 羅盤變異
terrain_alt_variance: Optional[float] = None # 地形高度變異
timestamp: Optional[float] = None # 時間戳記
@dataclass
class GPS:
"""GPS資訊"""
fix_type: Optional[int] = None # GPS fix 類型 (0=無, 1=無fix, 2=2D, 3=3D, 4=DGPS, 5=RTK)
satellites_visible: Optional[int] = None # 可見衛星數
eph: Optional[int] = None # GPS HDOP 水平精度因子
epv: Optional[int] = None # GPS VDOP 垂直精度因子
timestamp: Optional[float] = None # 時間戳記
@dataclass
class VFR:
"""VFR HUD資訊"""
airspeed: Optional[float] = None # 空速 (m/s)
groundspeed: Optional[float] = None # 地速 (m/s)
heading: Optional[int] = None # 航向 (度)
throttle: Optional[int] = None # 油門 (%)
climb: Optional[float] = None # 爬升率 (m/s)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class ComponentStatus:
"""組件狀態容器"""
position: Position = field(default_factory=Position)
attitude: Attitude = field(default_factory=Attitude)
mode: FlightMode = field(default_factory=FlightMode)
battery: Battery = field(default_factory=Battery)
ekf: EKF = field(default_factory=EKF)
gps: GPS = field(default_factory=GPS)
vfr: VFR = field(default_factory=VFR)
# 系統狀態
system_status: Optional[int] = None # MAV_STATE
armed: Optional[bool] = None # 解鎖狀態
# 其他自定義狀態
custom_status: Dict[str, Any] = field(default_factory=dict)
@dataclass
class PacketStats:
"""封包統計資訊"""
received_count: int = 0 # 接收封包數
lost_count: int = 0 # 遺失封包數
last_seq: Optional[int] = None # 最後序號
last_msg_time: Optional[float] = None # 最後訊息時間
msg_type_count: Dict[int, int] = field(default_factory=dict) # 各類訊息計數 {msg_type: count}
@dataclass
class RFStatus:
"""RF模組狀態"""
rssi: Optional[int] = None # 信號強度
noise: Optional[int] = None # 噪音水平
at_response: Optional[str] = None # AT 命令回應
link_quality: Optional[int] = None # 連接品質
timestamp: Optional[float] = None # 時間戳記
custom_status: Dict[str, Any] = field(default_factory=dict) # 其他自定義狀態
@dataclass
class SocketInfo:
"""Socket連接資訊"""
ip: Optional[str] = None # IP位址
port: Optional[int] = None # 埠號
local_ip: Optional[str] = None # 本地IP
local_port: Optional[int] = None # 本地埠號
connected: bool = False # 連接狀態
# ====================== Component Class =====================
class VehicleComponent:
"""載具組件 - 代表一個 MAVLink component"""
def __init__(self, component_id: int, comp_type: ComponentType = ComponentType.OTHER):
self.component_id = component_id
self.type = comp_type
# MAVLink 組件資訊
self.mav_type: Optional[int] = None # MAV_TYPE
self.mav_autopilot: Optional[int] = None # MAV_AUTOPILOT
# 狀態容器
self.status = ComponentStatus()
# 參數容器 (只存放,不主動下載)
self.parameters: Dict[str, Any] = {}
# 封包統計
self.packet_stats = PacketStats()
def update_packet_stats(self, seq: int, msg_type: int, timestamp: float) -> None:
"""
更新封包統計
Args:
seq: 訊息序號
msg_type: 訊息類型
timestamp: 時間戳記
"""
stats = self.packet_stats
# 檢查遺失封包
if stats.last_seq is not None:
expected_seq = (stats.last_seq + 1) % 256
diff = seq - expected_seq
if diff < 0:
diff += 256
stats.lost_count += diff
# 更新統計資訊
stats.received_count += 1
stats.last_seq = seq
stats.last_msg_time = timestamp
# 更新訊息類型計數
if msg_type in stats.msg_type_count:
stats.msg_type_count[msg_type] += 1
else:
stats.msg_type_count[msg_type] = 1
def reset_packet_stats(self) -> None:
"""重置封包統計"""
self.packet_stats = PacketStats()
def set_parameter(self, param_name: str, param_value: Any) -> None:
"""設定參數 (手動餵入)"""
self.parameters[param_name] = param_value
def get_parameter(self, param_name: str, default: Any = None) -> Any:
"""取得參數"""
return self.parameters.get(param_name, default)
def __str__(self) -> str:
return (f"Component(id={self.component_id}, type={self.type.value}, "
f"mav_type={self.mav_type}, received={self.packet_stats.received_count}, "
f"lost={self.packet_stats.lost_count})")
# ====================== RF Module Class =====================
class RFModule:
"""RF模組資訊容器"""
def __init__(self, rf_type: RFModuleType = RFModuleType.OTHER):
self.type = rf_type
self.status = RFStatus()
self.socket_info = SocketInfo()
def update_rssi(self, rssi: int, timestamp: Optional[float] = None) -> None:
"""更新RSSI"""
self.status.rssi = rssi
if timestamp:
self.status.timestamp = timestamp
def update_at_response(self, response: str, timestamp: Optional[float] = None) -> None:
"""更新AT命令回應"""
self.status.at_response = response
if timestamp:
self.status.timestamp = timestamp
def update_socket_info(self, ip: str = None, port: int = None,
local_ip: str = None, local_port: int = None,
connected: bool = None) -> None:
"""更新Socket資訊"""
if ip is not None:
self.socket_info.ip = ip
if port is not None:
self.socket_info.port = port
if local_ip is not None:
self.socket_info.local_ip = local_ip
if local_port is not None:
self.socket_info.local_port = local_port
if connected is not None:
self.socket_info.connected = connected
def __str__(self) -> str:
return (f"RFModule(type={self.type.value}, rssi={self.status.rssi}, "
f"connected={self.socket_info.connected})")
# ====================== Main VehicleView Class =====================
class VehicleView:
"""
載具視圖 - 純狀態容器
特點:
1. 只有狀態容器沒有任何主動通訊功能
2. 不主動通訊不背景下載參數不搶 RF/MAVLink 流量
3. 可以手動餵資料 (push state)
4. 可擴充 (支援 RF 模組狀態)
"""
def __init__(self, sysid: int):
# Meta 資訊
self.sysid = sysid
self.kind: Optional[str] = None # 載具種類描述 (例如: "Copter", "Plane")
self.vehicle_type: Optional[int] = None # MAV_TYPE
self.connected_via: ConnectionType = ConnectionType.OTHER
# 組件容器 {component_id: VehicleComponent}
self.components: Dict[int, VehicleComponent] = {}
# RF模組
self.rf_module: Optional[RFModule] = None
# 其他自定義meta資訊
self.custom_meta: Dict[str, Any] = {}
def add_component(self, component_id: int, comp_type: ComponentType = ComponentType.OTHER) -> VehicleComponent:
"""
新增組件
Args:
component_id: 組件ID
comp_type: 組件類型
Returns:
VehicleComponent: 新增的組件
"""
if component_id not in self.components:
self.components[component_id] = VehicleComponent(component_id, comp_type)
logger.info(f"Added component {component_id} to system {self.sysid}")
return self.components[component_id]
def get_component(self, component_id: int) -> Optional[VehicleComponent]:
"""取得組件"""
return self.components.get(component_id)
def remove_component(self, component_id: int) -> bool:
"""移除組件"""
if component_id in self.components:
del self.components[component_id]
logger.info(f"Removed component {component_id} from system {self.sysid}")
return True
return False
def set_rf_module(self, rf_type: RFModuleType) -> RFModule:
"""設定RF模組"""
self.rf_module = RFModule(rf_type)
return self.rf_module
def get_rf_module(self) -> Optional[RFModule]:
"""取得RF模組"""
return self.rf_module
def __str__(self) -> str:
comp_list = ", ".join([str(cid) for cid in self.components.keys()])
return (f"VehicleView(sysid={self.sysid}, kind={self.kind}, "
f"connected_via={self.connected_via.value}, "
f"components=[{comp_list}], "
f"rf_module={self.rf_module is not None})")
def to_dict(self) -> Dict[str, Any]:
"""轉換為字典格式 (方便序列化/除錯)"""
return {
'meta': {
'sysid': self.sysid,
'kind': self.kind,
'vehicle_type': self.vehicle_type,
'connected_via': self.connected_via.value,
'custom_meta': self.custom_meta
},
'components': {
cid: {
'component_id': comp.component_id,
'type': comp.type.value,
'mav_type': comp.mav_type,
'mav_autopilot': comp.mav_autopilot,
'packet_stats': {
'received': comp.packet_stats.received_count,
'lost': comp.packet_stats.lost_count,
'last_seq': comp.packet_stats.last_seq,
'last_msg_time': comp.packet_stats.last_msg_time
},
'parameters_count': len(comp.parameters)
}
for cid, comp in self.components.items()
},
'rf_module': {
'type': self.rf_module.type.value,
'rssi': self.rf_module.status.rssi,
'socket_connected': self.rf_module.socket_info.connected
} if self.rf_module else None
}
# ====================== Registry =====================
class VehicleViewRegistry:
"""載具視圖註冊表 - 管理所有的 VehicleView"""
def __init__(self):
self._vehicles: Dict[int, VehicleView] = {}
def register(self, sysid: int) -> VehicleView:
"""註冊新的載具視圖"""
if sysid not in self._vehicles:
self._vehicles[sysid] = VehicleView(sysid)
logger.info(f"Registered new VehicleView for system {sysid}")
return self._vehicles[sysid]
def get(self, sysid: int) -> Optional[VehicleView]:
"""取得載具視圖"""
return self._vehicles.get(sysid)
def unregister(self, sysid: int) -> bool:
"""註銷載具視圖"""
if sysid in self._vehicles:
del self._vehicles[sysid]
logger.info(f"Unregistered VehicleView for system {sysid}")
return True
return False
def get_all(self) -> Dict[int, VehicleView]:
"""取得所有載具視圖"""
return self._vehicles.copy()
def clear(self) -> None:
"""清空所有載具視圖"""
self._vehicles.clear()
logger.info("Cleared all VehicleViews")
def __len__(self) -> int:
return len(self._vehicles)
def __contains__(self, sysid: int) -> bool:
return sysid in self._vehicles
# ====================== Global Instance =====================
# 全域註冊表實例
vehicle_registry = VehicleViewRegistry()

@ -14,12 +14,13 @@ from pymavlink import mavutil
# 自定義的 import
from ..fc_network_adapter import mavlinkObject as mo
from ..fc_network_adapter import mavlinkDevice as md
from ..fc_network_adapter import mavlinkVehicleView as mvv
# from ..fc_network_adapter import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 10
running_time = 10000
running_time = 3
print('test_item : ', test_item)
@ -44,10 +45,13 @@ if test_item == 10:
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
connection_string="udp:127.0..1:14571"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
sock = mavlink_socket1.port
print("Socket port:", sock)
manager.add_mavlink_object(mavlink_object1)
start_time = time.time()
@ -65,7 +69,7 @@ if test_item == 10:
print("return_packet_ring is empty")
time.sleep(1)
manager.stop()
manager.shutdown()
print('<=== End of Program')
@ -83,20 +87,22 @@ elif test_item == 11:
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
connection_string="udp:127.0.0.1:14571"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
bridge = mo.mavlink_bridge()
bridge.start()
time.sleep(3)
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
all_vehicles = mvv.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
print(f" System {sysid}: {vehicle}")
start_time = time.time()
show_time = time.time()
@ -105,14 +111,15 @@ elif test_item == 11:
# print("mark B")
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
for sysid, vehicle in all_vehicles.items():
for compid in vehicle.components:
comp = vehicle.get_component(compid)
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, comp.packet_stats.received_count))
comp.reset_packet_stats()
print("===================")
manager.stop()
analyzer.stop()
manager.shutdown()
bridge.stop()
print('<=== End of Program')
@ -130,16 +137,16 @@ elif test_item == 12:
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
connection_string="udp:127.0.0.1:14571"
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
connection_string="udp:127.0.0.1:14561"
connection_string="udp:127.0.0.1:14571"
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
# 初始化輸出通道
connection_string="udpout:127.0.0.1:14550"
connection_string="udpout:127.0.0.1:14551"
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
@ -160,66 +167,11 @@ elif test_item == 12:
time.sleep(1)
manager.stop()
manager.shutdown()
print('<=== End of Program')
# elif test_item == 20:
# # 這邊測試 node 生成 topic 的功能
# # 利用 空的通道 發出假的 heartbeat 封包
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# from pymavlink.dialects.v20 import common as mavlink2
# sysid = 1
# compid = 1
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# mav = mavlink2.MAVLink(None)
# mav.srcSystem = sysid
# mav.srcComponent = compid
# # 這是一個 heartbeat 封包
# fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
# type=mavlink2.MAV_TYPE_QUADROTOR,
# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
# base_mode=3,
# custom_mode=0,
# system_status=0,
# mavlink_version=3
# )
# mavlink_bytes = fake_heartbeat.pack(mav)
# fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
# print(analyzer.mavlink_systems)
# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
# time.sleep(0.1)
# print(analyzer.mavlink_systems)
# # ROS2 初始化
# analyzer._init_node()
# # 創建 ROS2 topic
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
# print("topic created")
# time.sleep(5)
# # 丟出 topic
# analyzer.emit_info()
# # 結束程式
# analyzer.running = False
# analyzer.destroy_node()
# rclpy.shutdown()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
@ -291,160 +243,18 @@ elif test_item == 21:
mavlink_socket.close()
print('<=== End of Program')
# elif test_item == 22:
# # 需要開啟一個 ardupilot 的模擬器 與 GCS
# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 初始化兩個通道
# connection_string_in="udp:127.0.0.1:15551"
# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
# mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
elif test_item == 52:
print('===> Start of Program .Test ', test_item)
manager = mo.async_io_manager()
manager.start()
# connection_string_out="udpout:127.0.0.1:14553"
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
# mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
# mavlink_object_out.multiplexingToAnalysis = []
# print(manager.thread.is_alive())
manager.shutdown()
time.sleep(1)
print('manager stopped')
# # 讓兩個通道互相傳輸
# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# # 啟動通道
# mavlink_object_in.run()
# mavlink_object_out.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# print('目前所有的系統 : ')
# for sysid in analyzer.mavlink_systems:
# print(analyzer.mavlink_systems[sysid])
# compid = 1
# sysid = 1
# show_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# show_time2 = time.time()
# while time.time() - start_time < running_time:
# if (time.time() - show_time2) >= 1:
# analyzer.emit_info() # 這邊是測試 node 的運行
# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
# sss = mavutil.mode_string_v10(fmsg)
# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
# print("目前的飛行模式 : {}".format(sss))
# show_time2 = time.time()
# # if (time.time() - show_time) >= 2:
# # show_time = time.time()
# # for sysid in analyzer.mavlink_systems:
# # for compid in analyzer.mavlink_systems[sysid].components:
# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# # print("===================")
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object_in.stop()
# mavlink_object_in.thread.join()
# mavlink_socket_in.close()
# mavlink_object_out.stop()
# mavlink_object_out.thread.join()
# mavlink_socket_out.close()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
# elif test_item == 51:
# # 晉凱的測試項目
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 創建通道
# connection_string="udp:127.0.0.1:15551"
# mavlink_socket3 = mavutil.mavlink_connection(connection_string)
# mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# # 設定通道流動
# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# mavlink_object3.multiplexingToReturn = [] #
# # mavlink_object3.multiplexingToSwap = [] #
# # 啟動通道
# mavlink_object3.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# compid = 1
# sysid = 1
# start_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
# end_time = time.time()
# print(f"Execution time for create all topic: {end_time - start_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# while time.time() - start_time < running_time:
# try:
# # rclpy.spin(analyzer)
# analyzer.emit_info() # 這邊是測試 node 的運行
# time.sleep(0.5)
# except KeyboardInterrupt:
# break
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object3.stop()
# mavlink_object3.thread.join()
# analyzer.stop()
# analyzer.thread.join()
# mavlink_socket3.close()
# print('<=== End of Program')

@ -0,0 +1,331 @@
"""
VehicleView 使用範例
展示如何使用純狀態容器來管理 MAVLink 載具資訊
"""
import time
from ..fc_network_adapter.mavlinkVehicleView import (
VehicleView,
VehicleComponent,
RFModule,
vehicle_registry,
ConnectionType,
ComponentType,
RFModuleType
)
def example_basic_usage():
"""基本使用範例"""
print("=== 基本使用範例 ===\n")
# 1. 建立載具視圖
vehicle = VehicleView(sysid=1)
vehicle.kind = "Copter"
vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR
vehicle.connected_via = ConnectionType.UDP
print(f"建立載具: {vehicle}\n")
# 2. 新增 autopilot 組件
autopilot = vehicle.add_component(
component_id=1,
comp_type=ComponentType.AUTOPILOT
)
autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR
autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA
print(f"新增組件: {autopilot}\n")
# 3. 手動餵入位置資訊
autopilot.status.position.latitude = 25.0330
autopilot.status.position.longitude = 121.5654
autopilot.status.position.altitude = 100.5
autopilot.status.position.timestamp = time.time()
print(f"位置: 緯度={autopilot.status.position.latitude}, "
f"經度={autopilot.status.position.longitude}, "
f"高度={autopilot.status.position.altitude}m\n")
# 4. 手動餵入姿態資訊
autopilot.status.attitude.roll = 0.05 # 弧度
autopilot.status.attitude.pitch = -0.02
autopilot.status.attitude.yaw = 1.57
autopilot.status.attitude.timestamp = time.time()
print(f"姿態: Roll={autopilot.status.attitude.roll:.3f}, "
f"Pitch={autopilot.status.attitude.pitch:.3f}, "
f"Yaw={autopilot.status.attitude.yaw:.3f} rad\n")
# 5. 手動餵入飛行模式
autopilot.status.mode.base_mode = 89
autopilot.status.mode.custom_mode = 4
autopilot.status.mode.mode_name = "GUIDED"
autopilot.status.mode.timestamp = time.time()
print(f"飛行模式: {autopilot.status.mode.mode_name}\n")
# 6. 手動餵入電池資訊
autopilot.status.battery.voltage = 12.6
autopilot.status.battery.current = 15.2
autopilot.status.battery.remaining = 75
autopilot.status.battery.timestamp = time.time()
print(f"電池: 電壓={autopilot.status.battery.voltage}V, "
f"電流={autopilot.status.battery.current}A, "
f"剩餘={autopilot.status.battery.remaining}%\n")
def example_packet_tracking():
"""封包追蹤範例"""
print("\n=== 封包追蹤範例 ===\n")
vehicle = VehicleView(sysid=2)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# 模擬接收封包
timestamp = time.time()
# 接收 HEARTBEAT (msg_type=0)
autopilot.update_packet_stats(seq=0, msg_type=0, timestamp=timestamp)
# 接收 ATTITUDE (msg_type=30)
autopilot.update_packet_stats(seq=1, msg_type=30, timestamp=timestamp+0.1)
# 接收 GLOBAL_POSITION_INT (msg_type=33)
autopilot.update_packet_stats(seq=2, msg_type=33, timestamp=timestamp+0.2)
# 模擬封包遺失 (seq 跳過 3, 4, 5)
autopilot.update_packet_stats(seq=6, msg_type=0, timestamp=timestamp+0.3)
stats = autopilot.packet_stats
print(f"封包統計:")
print(f" 接收: {stats.received_count}")
print(f" 遺失: {stats.lost_count}")
print(f" 最後序號: {stats.last_seq}")
print(f" 訊息類型計數: {stats.msg_type_count}\n")
def example_parameters():
"""參數管理範例"""
print("\n=== 參數管理範例 ===\n")
vehicle = VehicleView(sysid=3)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# 手動設定參數 (不會主動下載)
autopilot.set_parameter("ARMING_CHECK", 1)
autopilot.set_parameter("ANGLE_MAX", 4500)
autopilot.set_parameter("WPNAV_SPEED", 500)
print(f"參數數量: {len(autopilot.parameters)}")
print(f"ARMING_CHECK = {autopilot.get_parameter('ARMING_CHECK')}")
print(f"ANGLE_MAX = {autopilot.get_parameter('ANGLE_MAX')}")
print(f"WPNAV_SPEED = {autopilot.get_parameter('WPNAV_SPEED')}\n")
def example_rf_module():
"""RF模組範例"""
print("\n=== RF模組範例 ===\n")
vehicle = VehicleView(sysid=4)
vehicle.connected_via = ConnectionType.SERIAL
# 設定 XBee RF 模組
rf = vehicle.set_rf_module(RFModuleType.XBEE)
# 更新 Socket 資訊
rf.update_socket_info(
ip="192.168.1.100",
port=14550,
local_ip="192.168.1.1",
local_port=14551,
connected=True
)
# 更新 RSSI
rf.update_rssi(rssi=-65, timestamp=time.time())
# 更新 AT 命令回應
rf.update_at_response("OK", timestamp=time.time())
# 自定義狀態
rf.status.custom_status['signal_quality'] = 'excellent'
rf.status.custom_status['packet_error_rate'] = 0.001
print(f"RF模組: {rf}")
print(f"Socket: {rf.socket_info.ip}:{rf.socket_info.port}")
print(f"RSSI: {rf.status.rssi} dBm")
print(f"AT回應: {rf.status.at_response}")
print(f"自定義狀態: {rf.status.custom_status}\n")
def example_multiple_components():
"""多組件範例"""
print("\n=== 多組件範例 ===\n")
vehicle = VehicleView(sysid=5)
vehicle.kind = "Copter with Gimbal"
# Autopilot 組件
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
autopilot.mav_type = 2
autopilot.status.mode.mode_name = "AUTO"
# Gimbal 組件
gimbal = vehicle.add_component(154, ComponentType.GIMBAL)
gimbal.mav_type = 26 # MAV_TYPE_GIMBAL
gimbal.status.attitude.pitch = -0.785 # 向下45度
gimbal.status.attitude.yaw = 0.0
# Camera 組件
camera = vehicle.add_component(100, ComponentType.CAMERA)
camera.mav_type = 30 # MAV_TYPE_CAMERA
camera.status.custom_status['recording'] = True
camera.status.custom_status['photo_interval'] = 2.0
print(f"載具: {vehicle}")
print(f"組件數量: {len(vehicle.components)}")
for cid, comp in vehicle.components.items():
print(f" 組件 {cid}: {comp.type.value}, MAV_TYPE={comp.mav_type}")
print()
def example_registry():
"""註冊表使用範例"""
print("\n=== 註冊表使用範例 ===\n")
# 註冊多個載具
v1 = vehicle_registry.register(sysid=1)
v1.kind = "Copter-1"
v1.add_component(1, ComponentType.AUTOPILOT)
v2 = vehicle_registry.register(sysid=2)
v2.kind = "Plane-1"
v2.add_component(1, ComponentType.AUTOPILOT)
v3 = vehicle_registry.register(sysid=3)
v3.kind = "Rover-1"
v3.add_component(1, ComponentType.AUTOPILOT)
print(f"註冊表中的載具數量: {len(vehicle_registry)}")
# 取得所有載具
all_vehicles = vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
print(f" System {sysid}: {vehicle.kind}")
# 檢查載具是否存在
print(f"\nSystem 2 存在? {2 in vehicle_registry}")
print(f"System 99 存在? {99 in vehicle_registry}")
# 取得特定載具
vehicle = vehicle_registry.get(2)
if vehicle:
print(f"\n取得載具: {vehicle}")
# 註銷載具
vehicle_registry.unregister(3)
print(f"\n註銷 System 3 後,剩餘載具: {len(vehicle_registry)}\n")
def example_serialization():
"""序列化範例 (除錯/日誌用)"""
print("\n=== 序列化範例 ===\n")
vehicle = VehicleView(sysid=10)
vehicle.kind = "Test Copter"
vehicle.connected_via = ConnectionType.UDP
vehicle.custom_meta['firmware'] = 'ArduCopter 4.3.0'
vehicle.custom_meta['frame_type'] = 'X'
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
autopilot.mav_type = 2
autopilot.status.position.altitude = 50.0
autopilot.status.battery.voltage = 12.4
autopilot.update_packet_stats(0, 0, time.time())
autopilot.update_packet_stats(1, 30, time.time())
rf = vehicle.set_rf_module(RFModuleType.UDP)
rf.update_rssi(-70)
rf.update_socket_info(ip="192.168.1.200", port=14550, connected=True)
# 轉換為字典
data = vehicle.to_dict()
print("載具資料 (字典格式):")
import json
print(json.dumps(data, indent=2, ensure_ascii=False))
def example_gps_ekf():
"""GPS 與 EKF 範例"""
print("\n\n=== GPS 與 EKF 範例 ===\n")
vehicle = VehicleView(sysid=11)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# GPS 資訊
autopilot.status.gps.fix_type = 3 # 3D Fix
autopilot.status.gps.satellites_visible = 12
autopilot.status.gps.eph = 120 # HDOP = 1.2
autopilot.status.gps.epv = 180 # VDOP = 1.8
autopilot.status.gps.timestamp = time.time()
print(f"GPS:")
print(f" Fix Type: {autopilot.status.gps.fix_type}")
print(f" 衛星數: {autopilot.status.gps.satellites_visible}")
print(f" HDOP: {autopilot.status.gps.eph/100}")
# EKF 資訊
autopilot.status.ekf.flags = 0x1FF # 所有 flags 都 OK
autopilot.status.ekf.velocity_variance = 0.5
autopilot.status.ekf.pos_horiz_variance = 1.2
autopilot.status.ekf.pos_vert_variance = 2.0
autopilot.status.ekf.timestamp = time.time()
print(f"\nEKF:")
print(f" Flags: 0x{autopilot.status.ekf.flags:X}")
print(f" 速度變異: {autopilot.status.ekf.velocity_variance}")
print(f" 水平位置變異: {autopilot.status.ekf.pos_horiz_variance}")
print(f" 垂直位置變異: {autopilot.status.ekf.pos_vert_variance}\n")
def example_vfr_hud():
"""VFR HUD 範例"""
print("\n=== VFR HUD 範例 ===\n")
vehicle = VehicleView(sysid=12)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# VFR HUD 資訊
autopilot.status.vfr.airspeed = 15.5 # m/s
autopilot.status.vfr.groundspeed = 14.8 # m/s
autopilot.status.vfr.heading = 90 # 東方
autopilot.status.vfr.throttle = 65 # %
autopilot.status.vfr.climb = 2.5 # m/s
autopilot.status.vfr.timestamp = time.time()
print(f"VFR HUD:")
print(f" 空速: {autopilot.status.vfr.airspeed} m/s")
print(f" 地速: {autopilot.status.vfr.groundspeed} m/s")
print(f" 航向: {autopilot.status.vfr.heading}°")
print(f" 油門: {autopilot.status.vfr.throttle}%")
print(f" 爬升率: {autopilot.status.vfr.climb} m/s\n")
if __name__ == "__main__":
# 執行所有範例
# example_basic_usage()
# example_packet_tracking()
# example_parameters()
# example_rf_module()
# example_multiple_components()
# example_registry()
# example_serialization()
# example_gps_ekf()
example_vfr_hud()
print("\n" + "="*50)
print("所有範例執行完成!")
print("="*50)

@ -13,7 +13,6 @@ import time
import threading
import socket
import asyncio
from unittest.mock import MagicMock, patch
# 導入要測試的模組
from ..fc_network_adapter.mavlinkObject import (
@ -24,62 +23,100 @@ from ..fc_network_adapter.mavlinkObject import (
return_packet_ring
)
# 預先定義好的真實 MAVLink heartbeat 封包 (MAVLink 1.0 格式)
# Format: STX(0xFE) + LEN + SEQ + SYS + COMP + MSG_ID + PAYLOAD(9 bytes for heartbeat) + CRC(2 bytes)
HEARTBEAT_PACKET_1 = bytes([
0xFE, # STX (MAVLink 1.0)
0x09, # payload length (9 bytes)
0x00, # sequence
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
# Payload (9 bytes): custom_mode(4), type(1), autopilot(1), base_mode(1), system_status(1), mavlink_version(1)
0x00, 0x00, 0x00, 0x00, # custom_mode = 0
0x02, # type = MAV_TYPE_QUADROTOR (2)
0x03, # autopilot = MAV_AUTOPILOT_ARDUPILOTMEGA (3)
0x40, # base_mode = MAV_MODE_FLAG_CUSTOM_MODE_ENABLED (64)
0x03, # system_status = MAV_STATE_STANDBY (3)
0x03, # mavlink_version = 3
0x62, 0x8E # CRC (simplified placeholder)
])
HEARTBEAT_PACKET_2 = bytes([
0xFE, # STX
0x09, # payload length
0x01, # sequence (增加)
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x41, 0x03, 0x03,
0x33, 0xEC
])
HEARTBEAT_PACKET_3 = bytes([
0xFE, # STX
0x09, # payload length
0x02, # sequence
0x02, # system ID = 2
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x42, 0x03, 0x03,
0x37, 0x44
])
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試"""
"""模擬 Mavlink Socket 的類別,用於測試
使用真實的 MAVLink 封包而不是模擬的訊息對象
"""
def __init__(self, test_data=None):
def __init__(self, test_packets=None):
"""
Args:
test_packets: list of bytes每個元素都是完整的 MAVLink 封包
"""
self.closed = False
self.test_data = test_data or []
self.data_index = 0
self.test_packets = test_packets or []
self.packet_index = 0
self.written_data = []
# 使用 pymavlink 來解析封包
from pymavlink import mavutil
self.mav_parser = mavutil.mavlink.MAVLink(self)
def recv_msg(self):
if not self.test_data or self.data_index >= len(self.test_data):
"""返回解析後的 MAVLink 訊息對象"""
if not self.test_packets or self.packet_index >= len(self.test_packets):
return None
data = self.test_data[self.data_index]
self.data_index += 1
return data
packet = self.test_packets[self.packet_index]
self.packet_index += 1
# 使用 pymavlink 解析封包
try:
for byte in packet:
msg = self.mav_parser.parse_char(bytes([byte]))
if msg:
return msg
except Exception as e:
print(f"Error parsing packet: {e}")
return None
return None
def write(self, data):
"""寫入數據(用於檢查轉發)"""
self.written_data.append(data)
def close(self):
"""關閉 socket"""
self.closed = True
class MockMavlinkMessage:
"""模擬 Mavlink 訊息的類別,用於測試"""
def __init__(self, msg_id, sysid, compid, seq=0):
self.msg_id = msg_id
self.sysid = sysid
self.compid = compid
self.seq = seq
self.msg_buf = bytes([msg_id, sysid, compid, seq])
def get_msgId(self):
return self.msg_id
def get_srcSystem(self):
return self.sysid
def get_srcComponent(self):
return self.compid
def get_seq(self):
return self.seq
def get_msgbuf(self):
return self.msg_buf
def get_type(self):
return f"MSG_ID_{self.msg_id}"
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別"""
"""測試 mavlink_object 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
@ -91,28 +128,19 @@ class TestMavlinkObject(unittest.TestCase):
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建測試訊息
self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1)
self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1)
# 創建模擬的 socket # 假的 Mavlink Socket
self.mock_socket = MockMavlinkSocket([
self.heartbeat_msg,
self.attitude_msg,
self.position_msg
])
# 創建模擬的 socket使用真實封包
self.mock_socket = MockMavlinkSocket([HEARTBEAT_PACKET_1])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
# print("Testing mavlink_object initialization")
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
self.assertEqual(self.mavlink_obj.return_msg_types, [])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
@ -120,6 +148,7 @@ class TestMavlinkObject(unittest.TestCase):
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
@ -153,78 +182,31 @@ class TestMavlinkObject(unittest.TestCase):
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message(self):
"""測試 send_message 功能"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 準備測試數據
test_message1 = b"test_message_1"
test_message2 = b"test_message_2"
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
def test_send_message_validation(self):
"""測試 send_message 的數據驗證功能(不需要啟動 manager"""
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 0)
self.assertFalse(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
# 啟動 manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 測試無效的數據類型
self.mavlink_obj.state = MavlinkObjectState.RUNNING # 臨時設置狀態
self.assertFalse(self.mavlink_obj.send_message("invalid"))
self.assertFalse(self.mavlink_obj.send_message(123))
# 添加對象到 manager
manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 測試太短的封包
self.assertFalse(self.mavlink_obj.send_message(bytes([0xFE, 0x00])))
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 測試無效的起始標記
invalid_packet = bytes([0xFF, 0x09, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00])
self.assertFalse(self.mavlink_obj.send_message(invalid_packet))
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], test_message1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(test_message2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], test_message2)
# 模擬發送出錯的情況
class ErrorWriteSocket(MockMavlinkSocket):
def write(self, data):
raise Exception("Write error")
error_socket = ErrorWriteSocket()
error_obj = mavlink_object(error_socket)
manager.add_mavlink_object(error_obj)
time.sleep(0.1) # 等待對象啟動
# 發送消息到錯誤的 socket
self.assertTrue(error_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 測試有效的封包可以加入佇列
self.assertTrue(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(self.mavlink_obj.outgoing_msgs), 1)
# 即使寫入失敗send_message 應該也返回 True
# 因為消息已成功加入到佇列中,只是後續的實際發送失敗
# 停止 manager
manager.stop()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
self.mavlink_obj.state = MavlinkObjectState.INIT # 恢復狀態
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別"""
"""測試 async_io_manager 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
@ -239,14 +221,9 @@ class TestAsyncIOManager(unittest.TestCase):
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 模擬 mavlink 對象
self.mock_socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
self.mock_socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1)
])
# 創建模擬 mavlink 對象,使用真實封包
self.mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2])
self.mock_socket2 = MockMavlinkSocket([HEARTBEAT_PACKET_3])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
@ -254,10 +231,10 @@ class TestAsyncIOManager(unittest.TestCase):
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.stop()
self.manager.shutdown()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式 就是不會產生兩個 magager"""
"""測試 async_io_manager 的單例模式"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
@ -275,7 +252,7 @@ class TestAsyncIOManager(unittest.TestCase):
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.stop()
self.manager.shutdown()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
@ -313,89 +290,115 @@ class TestAsyncIOManager(unittest.TestCase):
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.stop()
self.manager.shutdown()
def test_data_processing(self):
"""測試數據處理"""
# 創建用於轉發的 mavlink_objects
mock_socket3 = MockMavlinkSocket()
mavlink_obj3 = mavlink_object(mock_socket3)
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動與資料流"""
# 設置轉發
self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 設置訊息類型
self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE
# self.mavlink_obj1.enable_return(True)
self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 創建 async_io_manager 實例
self.manager = async_io_manager()
self.manager.add_mavlink_object(self.mavlink_obj1)
self.manager.add_mavlink_object(mavlink_obj3)
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.shutdown()
# 等待處理完成
time.sleep(1.0)
def test_send_message_with_manager(self):
"""測試透過 async_io_manager 發送訊息的完整流程"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# print("testing Mark A")
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 檢查 Ring buffer 是否有正確的數據
self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE
self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 0)
a = stream_bridge_ring.get()
print(f"stream_bridge_ring: {a}")
# 啟動 manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 檢查是否正確轉發
self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE
# 添加對象到 manager
self.manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# print("testing Mark B")
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 停止管理器
self.manager.stop()
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
time.sleep(0.2) # 等待消息處理
def test_error_handling(self):
"""測試錯誤處理情況"""
print("=== mark A ===")
# 創建一個會引發異常的 socket
class ErrorSocket(MockMavlinkSocket):
def recv_msg(self):
raise Exception("Test exception")
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], HEARTBEAT_PACKET_1)
error_socket = ErrorSocket()
mavlink_obj_err = mavlink_object(error_socket)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_2))
time.sleep(0.2) # 等待消息處理
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], HEARTBEAT_PACKET_2)
self.manager.add_mavlink_object(mavlink_obj_err)
# 停止 manager
self.manager.shutdown()
time.sleep(0.5) # 等待 manager 停止
print("=== mark B ===")
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
# 等待錯誤處理
time.sleep(1.0)
def test_data_processing_and_forwarding(self):
"""測試數據處理與轉發流程"""
# 創建用於轉發的 mavlink_objects
mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2,])
mock_socket3 = MockMavlinkSocket()
# 對象應該進入錯誤狀態,但不會崩潰
# self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR)
mavlink_obj1 = mavlink_object(mock_socket1)
mavlink_obj3 = mavlink_object(mock_socket3)
print("=== mark C ===")
# 設置訊息類型
mavlink_obj1.set_bridge_message_types([0]) # 只處理 HEARTBEAT
# 管理器應該仍在運行
self.assertTrue(self.manager.running)
# 設置轉發: obj1 -> obj3
mavlink_obj1.add_target_socket(mavlink_obj3.socket_id) # socket1 轉發到 socket3 (socket_id=1)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 故意等一段時間 確認 socket 有被 manager 處理掉
time.sleep(3)
"""
這邊出現很奇怪的狀況 應該說 設計時沒有考量 但是實測會發現
mavlink_obj3 是接收端 必需要被優先加入 manager 才能正確接收來自 mavlink_obj1 的轉發封包
若先把 mavlink_ojb1 加入 manger 則可能會導致前面幾個封包丟失
"""
self.manager.add_mavlink_object(mavlink_obj3)
self.manager.add_mavlink_object(mavlink_obj1)
# 停止管理器
self.manager.stop()
# 等待處理完成
time.sleep(0.5)
# 檢查 Ring buffer 是否有正確的數據
self.assertGreaterEqual(stream_bridge_ring.size(), 2) # 至少 2 個 HEARTBEAT
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動"""
# 檢查是否正確轉發
self.assertGreaterEqual(len(mock_socket3.written_data), 2) # 至少 2 個 HEARTBEAT
# 停止管理器
self.manager.shutdown()
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
@ -406,18 +409,9 @@ class TestIntegration(unittest.TestCase):
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1),
MockMavlinkMessage(msg_id=32, sysid=2, compid=1)
])
socket3 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=3, compid=1),
MockMavlinkMessage(msg_id=33, sysid=3, compid=1)
])
socket1 = MockMavlinkSocket()
socket2 = MockMavlinkSocket()
socket3 = MockMavlinkSocket()
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
@ -431,39 +425,44 @@ class TestIntegration(unittest.TestCase):
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
manager = async_io_manager()
manager.start()
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
manager.add_mavlink_object(obj1)
manager.add_mavlink_object(obj2)
manager.add_mavlink_object(obj3)
self.manager.add_mavlink_object(obj1)
self.manager.add_mavlink_object(obj2)
self.manager.add_mavlink_object(obj3)
# 對三個對象添加數據
socket1.test_packets.append(HEARTBEAT_PACKET_1)
socket2.test_packets.append(HEARTBEAT_PACKET_2)
socket3.test_packets.append(HEARTBEAT_PACKET_3)
# 等待處理所有訊息
time.sleep(1.5)
time.sleep(1.0)
# 檢查轉發結果
# socket1 應該收到 socket2 和 socket3 的訊息
self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2
# socket1 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket1.written_data), 1)
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3
self.assertGreaterEqual(len(socket2.written_data), 2)
# socket3 應該收到 socket1 和 socket2 的訊息
self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2
# socket3 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket3.written_data), 1)
# 檢查 ring buffer 的數據
# 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
# 應該有 3 個 HEARTBEAT 訊息
self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT
# 所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
self.assertGreaterEqual(stream_bridge_ring.size(), 3) # 至少 3 個 HEARTBEAT
# 停止管理器
manager.stop()
self.manager.shutdown()
if __name__ == "__main__":
unittest.main(defaultTest="TestMavlinkObject.test_send_message")
# unittest.main(defaultTest="TestAsyncIOManager")
# 可以指定要運行的測試
# unittest.main(defaultTest="TestMavlinkObject.test_send_message_validation")
# unittest.main(defaultTest="TestAsyncIOManager.test_add_remove_objects")
unittest.main(defaultTest="TestIntegration.test_bidirectional_forwarding")
unittest.main()

Loading…
Cancel
Save