@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from . utils . acquirePort import find_available_port
logger = setup_logger ( os . path . basename ( __file__ ) )
VERSION_NO = " v0.60 "
PROJECT_VER = " v0.61 "
class PanelState :
def __init__ ( self ) :
@ -39,9 +39,9 @@ class PanelState:
self . object_manager_state = " Stopped "
self . serial_manager_state = " Stopped "
self . ros2_manager_state = " Stopped "
self . socket_object_list = [ ] # 已有的 mavlink object
self . socket_object_list = [ ] # 已有的 mavlink object
self . linked_serial_dict = { } # 已連線的 serial 端口 serial id num : serial_port string
self . panel_info_msg_list = [ ] # 顯示在面板上的資訊訊息
self . panel_info_msg_list = [ ] # 顯示在面板上的資訊訊息
# 關於創建通道時的暫存資訊
self . udp_info_temp = { " IP " : " 127.0.0.1 " , " Port " : " " , " Direction " : " " } # 暫存 UDP 設定資訊
@ -102,36 +102,36 @@ class ControlPanel:
def input_dialog ( stdscr , prompt = " 請輸入文字: " ) :
""" 顯示輸入對話框 """
height , width = stdscr . getmaxyx ( )
# 建立輸入視窗
dialog_height = 5
dialog_width = min ( 60 , width - 4 )
start_y = ( height - dialog_height ) / / 2
start_x = ( width - dialog_width ) / / 2
# 建立視窗邊框
dialog_win = curses . newwin ( dialog_height , dialog_width , start_y , start_x )
dialog_win . border ( )
dialog_win . addstr ( 1 , 2 , prompt )
dialog_win . addstr ( 3 , 2 , " 按 Enter 確認, ESC 取消 " )
dialog_win . refresh ( )
# 輸入區域
input_win = curses . newwin ( 1 , dialog_width - 6 , start_y + 2 , start_x + 2 )
input_win . keypad ( True )
curses . echo ( )
curses . curs_set ( 1 )
user_input = " "
while True :
input_win . clear ( )
input_win . addstr ( 0 , 0 , user_input [ - dialog_width + 8 : ] ) # 顯示輸入內容(滾動)
input_win . refresh ( )
ch = input_win . getch ( )
if ch == 27 : # ESC
user_input = None
break
@ -141,16 +141,16 @@ class ControlPanel:
user_input = user_input [ : - 1 ]
elif 32 < = ch < = 126 : # 可打印字符
user_input + = chr ( ch )
curses . noecho ( )
curses . curs_set ( 0 )
# 清理視窗
del input_win
del dialog_win
stdscr . clear ( )
stdscr . refresh ( )
return user_input
# ================ 關於 主要選單 的部份 ===================
@ -179,9 +179,9 @@ class ControlPanel:
] ) ,
MenuNode ( " Vehicles Insp. " , " 檢視已連線的遠端載具 " , action = " INSPECT_VEHICLES " ) ,
MenuNode ( " Engineer Mode " , " 工程模式 " , children = [
MenuNode ( " Stop Manager " , " 停止 Mavlink 物件管理 " , " STOP_MANAGER " ) ,
MenuNode ( " Stop Bridge " , " 停止 Mavlink-ROS 橋接 " , " STOP_BRIDGE " ) ,
MenuNode ( " Stop Serial M. " , " 停止 Serial 端口轉接 " , " STOP_SERIAL_MANAGER " ) ,
MenuNode ( " Stop Manager " , " 停止 Mavlink 物件管理 " , " STOP_MANAGER " ) ,
MenuNode ( " Stop Bridge " , " 停止 Mavlink-ROS 橋接 " , " STOP_BRIDGE " ) ,
MenuNode ( " Stop Serial M. " , " 停止 Serial 端口轉接 " , " STOP_SERIAL_MANAGER " ) ,
] ) ,
MenuNode ( " Shutdown " , " 關閉整個系統 " , children = [
MenuNode ( " Return " , " 繼續運行 " , " BACK " ) ,
@ -191,7 +191,7 @@ class ControlPanel:
def panel_thread ( self , cmd_q : queue . Queue , state : PanelState , stop_evt : threading . Event ) :
stdscr = None
def cleanup ( ) :
""" 清理 curses 狀態 """
if stdscr :
@ -209,7 +209,7 @@ class ControlPanel:
def draw_menu ( screen ) :
nonlocal stdscr
stdscr = screen
curses . curs_set ( 0 )
stdscr . nodelay ( False ) # 阻塞讀鍵
stdscr . keypad ( True )
@ -219,12 +219,12 @@ class ControlPanel:
idx_stack = [ 0 ] # 索引堆疊
state . intoSTART ( ) # 設定狀態為運行中
while not stop_evt . is_set ( ) :
current_menu = menu_stack [ - 1 ]
current_idx = idx_stack [ - 1 ]
# 獲取終端機尺寸
height , width = stdscr . getmaxyx ( )
# 簡單暴力的限制視窗的大小
@ -237,7 +237,7 @@ class ControlPanel:
if height < MIN_HEIGHT or width < 60 :
logger . error ( " Terminal size too small for Control Panel. " )
break
stdscr . clear ( )
stdscr . border ( )
@ -267,26 +267,26 @@ class ControlPanel:
elif child . action == " LINK_SERIAL_TO_MIDDLEWARE_UDP " :
link_status = " Yes " if state . serial_info_temp [ " Go2Middleware " ] else " No "
desc = f " { child . desc } [ { link_status } ] "
line = f " { marker } { child . name : 15s } – { desc } "
attr = curses . A_REVERSE if i == current_idx else curses . A_NORMAL
stdscr . addstr ( start_line + i , 4 , line , attr )
# 顯示訊息區域
# 顯示訊息區域
# info_start_line = start_line + len(current_menu.children) + 1
info_start_line = height - 8
max_msg_lines = 5 # 最多顯示 5 行訊息
current_time = time . time ( )
# 清理過時的訊息
state . panel_info_msg_list = [
( msg , timestamp ) for msg , timestamp in state . panel_info_msg_list
if current_time - timestamp < 2.0 #秒數
]
# 只顯示最新的 max_msg_lines 條訊息
display_msgs = state . panel_info_msg_list [ - max_msg_lines : ]
for i , msg_data in enumerate ( display_msgs ) :
if info_start_line + i > = help_line - 1 : # 避免超出邊界
break
@ -298,13 +298,13 @@ class ControlPanel:
stdscr . addstr ( info_start_line + i , 2 , f " 💬 { msg } " , curses . A_BOLD )
# 操作說明
# help_line = start_line + len(current_menu.children) + 2
help_line = height - 2
stdscr . addstr ( help_line , 2 , " 操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層 " , curses . A_DIM )
stdscr . addstr ( height - 1 , width - 12 , f " { VERSION_NO } " , curses . A_DIM )
stdscr . addstr ( height - 1 , width - 12 , f " { PROJECT_ VER} " , curses . A_DIM )
stdscr . refresh ( )
@ -319,8 +319,8 @@ class ControlPanel:
# continue
break
time . sleep ( 0.1 )
if ( state . mavlink_bridge_state == " Stopped " and
state . object_manager_state == " Stopped " and
if ( state . mavlink_bridge_state == " Stopped " and
state . object_manager_state == " Stopped " and
state . serial_manager_state == " Stopped " ) :
state . intoSTOPPED ( )
# stop_evt.set()
@ -330,14 +330,14 @@ class ControlPanel:
# 設定短暫的 timeout, 讓執行緒能夠響應 stop_evt
stdscr . timeout ( 100 )
ch = stdscr . getch ( )
if ch == - 1 : # 沒有操作
continue
# 處理按鍵
if ch in ( curses . KEY_UP , ord ( ' k ' ) ) :
idx_stack [ - 1 ] = ( current_idx - 1 ) % len ( current_menu . children )
elif ch in ( curses . KEY_DOWN , ord ( ' j ' ) ) :
idx_stack [ - 1 ] = ( current_idx + 1 ) % len ( current_menu . children )
@ -347,14 +347,14 @@ class ControlPanel:
elif ch == ( ord ( ' o ' ) ) :
# 離開工程模式
state . intoSTART ( )
state . intoSTART ( )
elif ch == curses . KEY_LEFT :
# 返回上層
if len ( menu_stack ) > 1 :
menu_stack . pop ( )
idx_stack . pop ( )
elif ch == curses . KEY_RIGHT :
# 進入下層 (但不執行動作)
selected = current_menu . children [ current_idx ]
@ -369,26 +369,26 @@ class ControlPanel:
elif ch in ( curses . KEY_ENTER , 10 , 13 ) :
selected = current_menu . children [ current_idx ]
# 處理不同類型的動作
if selected . children : # 有子選單
menu_stack . append ( selected )
idx_stack . append ( 0 )
elif selected . action == " BACK " :
if len ( menu_stack ) > 1 :
menu_stack . pop ( )
idx_stack . pop ( )
elif selected . action == " QUIT " :
state . intoTERMINATION ( )
pre_panel_shutdown ( )
elif selected . action == " TEXT_UDP_IP " :
result = ControlPanel . input_dialog ( stdscr , " 請輸入監聽的 IP 位址: " )
if result is not None :
state . udp_info_temp [ " IP " ] = result
elif selected . action == " TEXT_UDP_PORT " :
result = ControlPanel . input_dialog ( stdscr , " 請輸入監聽的 Port: " )
if result is not None :
@ -402,7 +402,7 @@ class ControlPanel:
idx_stack . pop ( )
# menu_stack.pop()
# idx_stack.pop()
elif selected . action == " CREATE_UDP_OUTBOUND " :
cmd_q . put ( " CREATE_UDP_OUTBOUND " )
# 確認後回到上兩層
@ -457,8 +457,8 @@ class ControlPanel:
elif selected . action == " LIST_SERIAL_LINKS " :
created_list_menu = self . create_linked_serial_menu ( state , page = 0 )
menu_stack . append ( created_list_menu )
idx_stack . append ( 0 )
idx_stack . append ( 0 )
elif selected . action == " INSPECT_LINKED_SERIAL " :
# 顯示 Serial 連結詳細資訊
if hasattr ( selected , ' serial_id ' ) :
@ -488,7 +488,7 @@ class ControlPanel:
current_list_menu = menu_stack [ - 1 ]
menu_stack . pop ( )
idx_stack . pop ( )
# 依據選單種類 重新建立分頁
if current_list_menu . name == " Serial Port List " :
created_list_menu = self . create_serial_port_menu ( state , page = selected . page )
@ -503,16 +503,16 @@ class ControlPanel:
menu_stack . append ( current_list_menu )
idx_stack . append ( 0 )
continue
menu_stack . append ( created_list_menu )
idx_stack . append ( 0 )
elif selected . action == " INSPECT_MAV_OBJECT " :
# 顯示物件詳細資訊
if hasattr ( selected , ' socket_id ' ) :
cmd_q . put ( ( " INSPECT_MAV_OBJECT " , selected . socket_id ) )
self . show_object_info ( stdscr , selected . socket_id , state )
elif selected . action == " REMOVE_MAV_OBJECT " :
# 移除物件
if hasattr ( selected , ' socket_id ' ) :
@ -575,7 +575,7 @@ class ControlPanel:
created_list_menu = self . create_vehicles_list_menu ( state , page = 0 )
menu_stack . append ( created_list_menu )
idx_stack . append ( 0 )
elif selected . action == " INSPECT_VEHICLE " :
# 顯示載具詳細資訊
if hasattr ( selected , ' sysid ' ) and hasattr ( selected , ' compid ' ) :
@ -585,7 +585,7 @@ class ControlPanel:
elif callable ( selected . action ) :
# 執行函式
cmd_q . put ( selected . action )
try :
curses . wrapper ( draw_menu )
except KeyboardInterrupt :
@ -593,12 +593,12 @@ class ControlPanel:
finally :
cleanup ( )
# ================ 關於 mavlink object 的部份 ===================
# ================ 關於 mavlink object 的部份 ===================
def create_object_list_menu ( self , state : PanelState , page = 0 , items_per_page = 8 ) :
""" 動態創建 mavlink_object 列表選單(支持分頁) """
children = [ ]
if not state . socket_object_list :
children . append ( MenuNode ( " (Empty) " , " 目前沒有連結口 " , None ) )
else :
@ -606,7 +606,7 @@ class ControlPanel:
total_pages = ( total_items + items_per_page - 1 ) / / items_per_page
start_idx = page * items_per_page
end_idx = min ( start_idx + items_per_page , total_items )
# 顯示當前頁的物件
for socket_id in state . socket_object_list [ start_idx : end_idx ] :
# 為每個 socket 創建子選單
@ -656,11 +656,11 @@ class ControlPanel:
dialog_width = min ( 70 , width - 4 )
start_y = ( height - dialog_height ) / / 2
start_x = ( width - dialog_width ) / / 2
dialog_win = curses . newwin ( dialog_height , dialog_width , start_y , start_x )
dialog_win . border ( )
dialog_win . addstr ( 0 , 2 , f " Socket # { socket_id } 詳細資訊 " , curses . A_BOLD )
# 這裡顯示基本資訊
dialog_win . addstr ( 2 , 2 , f " Socket ID : { socket_id } " )
dialog_win . addstr ( 3 , 2 , f " Socket status : { state . socket_info_single . get ( ' socket_state ' , ' N/A ' ) } " )
@ -676,15 +676,15 @@ class ControlPanel:
dialog_win . addstr ( 8 , 2 , f " Switching Targets: { show_str if show_str else ' N/A ' } " )
state . socket_info_single [ ' InfoReady ' ] = False # 重置狀態以便下次使用
dialog_win . addstr ( dialog_height - 2 , 2 , " 按任意鍵返回... " )
dialog_win . refresh ( )
dialog_win . getch ( )
del dialog_win
stdscr . clear ( )
stdscr . refresh ( )
def select_target_socket ( self , stdscr , source_socket_id , state : PanelState , remove_mode = False ) :
""" 選擇目標 socket 的對話框 """
height , width = stdscr . getmaxyx ( )
@ -692,13 +692,13 @@ class ControlPanel:
dialog_width = min ( 50 , width - 4 )
start_y = ( height - dialog_height ) / / 2
start_x = ( width - dialog_width ) / / 2
dialog_win = curses . newwin ( dialog_height , dialog_width , start_y , start_x )
dialog_win . keypad ( True )
title = " 選擇要移除的目標 " if remove_mode else " 選擇轉發目標 "
available_sockets = [ sid for sid in state . socket_object_list if sid != source_socket_id ]
if not available_sockets :
dialog_win . border ( )
dialog_win . addstr ( 0 , 2 , f " { title } " , curses . A_BOLD )
@ -710,24 +710,24 @@ class ControlPanel:
stdscr . clear ( )
stdscr . refresh ( )
return None
selected_idx = 0
while True :
dialog_win . clear ( )
dialog_win . border ( )
dialog_win . addstr ( 0 , 2 , f " { title } " , curses . A_BOLD )
for i , socket_id in enumerate ( available_sockets ) :
marker = " ➤ " if i == selected_idx else " "
attr = curses . A_REVERSE if i == selected_idx else curses . A_NORMAL
dialog_win . addstr ( 2 + i , 2 , f " { marker } Socket # { socket_id } " , attr )
dialog_win . addstr ( dialog_height - 2 , 2 , " Enter確認 ESC取消 " )
dialog_win . refresh ( )
ch = dialog_win . getch ( )
if ch in ( curses . KEY_UP , ord ( ' k ' ) ) :
selected_idx = ( selected_idx - 1 ) % len ( available_sockets )
elif ch in ( curses . KEY_DOWN , ord ( ' j ' ) ) :
@ -749,11 +749,11 @@ class ControlPanel:
def create_serial_port_menu ( self , state : PanelState , page = 0 , items_per_page = 8 ) :
""" 動態創建 serial port 列表選單(支持分頁) """
children = [ ]
# 獲取可用的 Serial 連接埠列表
# serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓
serial_ports = acquireSerial . get_serial_ports_with_filter ( [ ' /dev/ttyUSB* ' , ' /dev/ttyACM* ' ] )
if not serial_ports :
children . append ( MenuNode ( " (Empty) " , " 目前沒有串口設備 " , None ) )
else :
@ -761,7 +761,7 @@ class ControlPanel:
total_pages = ( total_items + items_per_page - 1 ) / / items_per_page
start_idx = page * items_per_page
end_idx = min ( start_idx + items_per_page , total_items )
# 顯示當前頁的串口
for port in serial_ports [ start_idx : end_idx ] :
port_menu = MenuNode ( f " { port } " , children = [
@ -781,7 +781,7 @@ class ControlPanel:
for child in port_menu . children :
child . port = port
children . append ( port_menu )
# 添加分頁控制
if total_pages > 1 :
children . append ( MenuNode ( " --- " , f " 第 { page + 1 } / { total_pages } 頁 " , None ) )
@ -793,7 +793,7 @@ class ControlPanel:
next_node = MenuNode ( " Next ▶ " , " 下頁 " , " NEXT_PAGE " )
next_node . page = page + 1
children . append ( next_node )
children . append ( MenuNode ( " GoUp " , " 回到上層選單 " , " BACK " ) )
menu = MenuNode ( " Serial Port List " , f " 串口列表 (第 { page + 1 } 頁) " , children = children )
menu . current_page = page
@ -802,7 +802,7 @@ class ControlPanel:
def create_linked_serial_menu ( self , state : PanelState , page = 0 , items_per_page = 8 ) :
""" 動態創建 已連線的 serial port 列表選單(支持分頁)並包含後續管理功能 """
children = [ ]
if not state . linked_serial_dict :
children . append ( MenuNode ( " (Empty) " , " 目前沒有連結口 " , None ) )
else :
@ -810,7 +810,7 @@ class ControlPanel:
total_pages = ( total_items + items_per_page - 1 ) / / items_per_page
start_idx = page * items_per_page
end_idx = min ( start_idx + items_per_page , total_items )
# 顯示當前頁的物件
linked_serial_id_list = list ( state . linked_serial_dict . keys ( ) )
for serial_id in linked_serial_id_list [ start_idx : end_idx ] :
@ -825,7 +825,7 @@ class ControlPanel:
for child in obj_menu . children :
child . serial_id = serial_id
children . append ( obj_menu )
# 添加分頁控制
if total_pages > 1 :
children . append ( MenuNode ( " --- " , f " 第 { page + 1 } / { total_pages } 頁 " , None ) )
@ -837,7 +837,7 @@ class ControlPanel:
next_node = MenuNode ( " Next ▶ " , " 下頁 " , " NEXT_PAGE " )
next_node . page = page + 1
children . append ( next_node )
children . append ( MenuNode ( " GoUp " , " 回到上層選單 " , " BACK " ) )
menu = MenuNode ( " Linked Serial List " , f " 連結口列表 (第 { page + 1 } 頁) " , children = children )
menu . current_page = page
@ -859,14 +859,14 @@ class ControlPanel:
dialog_width = min ( 70 , width - 4 )
start_y = ( height - dialog_height ) / / 2
start_x = ( width - dialog_width ) / / 2
dialog_win = curses . newwin ( dialog_height , dialog_width , start_y , start_x )
dialog_win . border ( )
dialog_win . addstr ( 0 , 2 , f " Serial Link # { serial_id } 詳細資訊 " , curses . A_BOLD )
# 從 linked_serial_dict 獲取資訊
serial_info = state . linked_serial_dict . get ( serial_id , { } )
if not serial_info :
dialog_win . addstr ( 2 , 2 , f " 無法取得 Serial # { serial_id } 的資訊 " )
else :
@ -877,17 +877,17 @@ class ControlPanel:
dialog_win . addstr ( 5 , 2 , f " Communication : { state . serial_info_single . get ( ' receiver_type ' , ' N/A ' ) } " )
dialog_win . addstr ( 6 , 2 , f " Target UDP Port : { state . serial_info_single . get ( ' target_port ' , ' N/A ' ) } " )
dialog_win . addstr ( 7 , 2 , f " Status : { state . serial_info_single . get ( ' status ' , ' Running ' ) } " )
# 如果有額外資訊可以繼續添加
if ' thread_alive ' in serial_info :
thread_status = " Alive " if serial_info [ ' thread_alive ' ] else " Stopped "
dialog_win . addstr ( 8 , 2 , f " Thread Status : { thread_status } " )
state . serial_info_single [ ' InfoReady ' ] = False # 重置狀態以便下次使用
dialog_win . addstr ( dialog_height - 2 , 2 , " 按任意鍵返回... " )
dialog_win . refresh ( )
dialog_win . getch ( )
del dialog_win
stdscr . clear ( )
@ -900,7 +900,7 @@ class ControlPanel:
每個 vehicle - component 組合都是獨立的選單項目
"""
children = [ ]
if not state . connected_vehicles_dict :
children . append ( MenuNode ( " (Empty) " , " 目前沒有已連線的載具 " , None ) )
else :
@ -908,22 +908,22 @@ class ControlPanel:
total_pages = ( total_items + items_per_page - 1 ) / / items_per_page
start_idx = page * items_per_page
end_idx = min ( start_idx + items_per_page , total_items )
# vehicle_id_list 現在是 (sysid, compid) 的元組列表
vehicle_comp_list = list ( state . connected_vehicles_dict . keys ( ) )
# 顯示當前頁的物件
for sysid , compid in vehicle_comp_list [ start_idx : end_idx ] :
# 建立顯示名稱
display_name = f " Vehicle # { sysid } - Comp # { compid } "
desc = f " 載具 { sysid } 組件 { compid } "
vehicle_menu = MenuNode ( display_name , desc , " INSPECT_VEHICLE " )
# 將 sysid 和 compid 附加到選單項目上
vehicle_menu . sysid = sysid
vehicle_menu . compid = compid
children . append ( vehicle_menu )
# 添加分頁控制
if total_pages > 1 :
children . append ( MenuNode ( " --- " , f " 第 { page + 1 } / { total_pages } 頁 " , None ) )
@ -935,7 +935,7 @@ class ControlPanel:
next_node = MenuNode ( " Next ▶ " , " 下頁 " , " NEXT_PAGE " )
next_node . page = page + 1
children . append ( next_node )
children . append ( MenuNode ( " GoUp " , " 回到上層選單 " , " BACK " ) )
menu = MenuNode ( " Connected Vehicles " , f " 已連線載具列表 (第 { page + 1 } 頁) " , children = children )
menu . current_page = page
@ -943,7 +943,7 @@ class ControlPanel:
def show_vehicle_info ( self , stdscr , sysid , compid , cmd_q : queue . Queue , state : PanelState ) :
""" 顯示載具組件詳細資訊(動態更新,顯示變化率) """
# 等待資訊準備
start = time . time ( )
while not state . vehicle_info_single . get ( ' InfoReady ' , False ) :
@ -951,30 +951,30 @@ class ControlPanel:
state . panel_info_msg_list . append ( ( " Fail! Vehicle Info NOT Acquired! " , time . time ( ) ) )
return
time . sleep ( 0.05 )
info = state . vehicle_info_single
height , width = stdscr . getmaxyx ( )
dialog_height = min ( 22 , height - 4 )
dialog_width = min ( 70 , width - 4 )
start_y = ( height - dialog_height ) / / 2
start_x = ( width - dialog_width ) / / 2
dialog_win = curses . newwin ( dialog_height , dialog_width , start_y , start_x )
dialog_win . nodelay ( True ) # 非阻塞模式,允許動態更新
dialog_win . keypad ( True )
# MAV_TYPE 名稱對應
MAV_TYPE_NAMES = {
0 : " Generic " , 1 : " Fixed Wing " , 2 : " Quadrotor " , 3 : " Helicopter " ,
4 : " Antenna Tracker " , 5 : " GCS " , 6 : " Airship " , 10 : " Ground Rover " ,
12 : " Boat " , 13 : " Submarine " , 26 : " Gimbal " , 30 : " Camera "
}
# 動態更新迴圈
last_update = time . time ( )
while True :
current_time = time . time ( )
# 每 1 秒重新請求資料
if current_time - last_update > = 1.0 :
# 觸發資料更新(透過 Orchestrator)
@ -989,58 +989,58 @@ class ControlPanel:
# 更新 info 參照
info = state . vehicle_info_single
last_update = current_time
dialog_win . clear ( )
dialog_win . border ( )
dialog_win . addstr ( 0 , 2 , f " Vehicle # { info [ ' sysid ' ] } - Component # { info [ ' compid ' ] } " , curses . A_BOLD )
# === 基礎資訊 ===
dialog_win . addstr ( 2 , 2 , " [Identity] " , curses . A_UNDERLINE )
dialog_win . addstr ( 2 , 32 , " [Connection] " , curses . A_UNDERLINE )
# # MAV Type # 這個用不到
# mav_type = info.get('vehicle_type', 'N/A')
# mav_type_str = f"{mav_type} ({MAV_TYPE_NAMES.get(mav_type, 'Unknown')})" if isinstance(mav_type, int) else str(mav_type)
# dialog_win.addstr(3, 2, f"MAV Type : {mav_type_str}")
# Component Type
dialog_win . addstr ( 3 , 2 , f " Component Type : { info . get ( ' component_type ' , ' N/A ' ) } " )
# Autopilot Type
if info . get ( ' mav_autopilot ' ) is not None :
dialog_win . addstr ( 4 , 2 , f " Autopilot : { info . get ( ' mav_autopilot ' , ' N/A ' ) } " )
# Connection Info
dialog_win . addstr ( 3 , 32 , f " Connection : { info . get ( ' connection_type ' , ' N/A ' ) } " )
dialog_win . addstr ( 4 , 32 , f " Socket ID : # { info . get ( ' socket_id ' , ' N/A ' ) } " )
# === 封包統計 ===
stats = info . get ( ' packet_stats ' , { } )
dialog_win . addstr ( 7 , 2 , " [Packet Statistics] " , curses . A_UNDERLINE )
received = stats . get ( ' received ' , 0 )
lost = stats . get ( ' lost ' , 0 )
loss_rate = stats . get ( ' loss_rate ' , 0.0 )
last_seq = stats . get ( ' last_seq ' , ' N/A ' )
# 計算變化
received_delta = stats . get ( ' received_delta ' , 0 )
lost_delta = stats . get ( ' lost_delta ' , 0 )
# 顯示變化率
recv_str = f " { received : 6d } "
if received_delta > 0 :
recv_str + = f " (+ { received_delta } ↑) "
lost_str = f " { lost : 4d } "
if lost_delta > 0 :
lost_str + = f " (+ { lost_delta } ↑) "
dialog_win . addstr ( 8 , 2 , f " Received : { recv_str } " )
dialog_win . addstr ( 8 , 32 , f " Lost : { lost_str } " )
dialog_win . addstr ( 9 , 2 , f " Loss Rate : { loss_rate : .2f } % " )
dialog_win . addstr ( 9 , 32 , f " Last Seq : { last_seq } " )
# 最後接收時間
last_msg_time = stats . get ( ' last_msg_time ' )
if last_msg_time :
@ -1050,50 +1050,50 @@ class ControlPanel:
dialog_win . addstr ( 10 , 32 , f " Elapsed : { elapsed : .1f } s " )
else :
dialog_win . addstr ( 10 , 2 , " Last Time : N/A " )
# === 訊息類型分佈 ===
dialog_win . addstr ( 12 , 2 , " [Message Types] (Top 12) " , curses . A_UNDERLINE )
msg_counts = info . get ( ' msg_type_counts ' , { } )
# MAVLink 訊息名稱對應(縮寫版本)
MSG_NAMES = {
0 : " HB " , 1 : " SYS_ST " , 24 : " GPS_RAW " , 27 : " RAW_IMU " ,
30 : " ATT " , 32 : " LOC_POS " , 33 : " GLB_POS " , 62 : " NAV_CTL " ,
74 : " VFR_HUD " , 147 : " BATT_ST "
}
# 顯示前 12 個最常見的訊息類型(兩列各 6 個)
msg_items = list ( msg_counts . items ( ) ) [ : 12 ]
line = 13
for i , ( msg_id , count ) in enumerate ( msg_items ) :
msg_name = MSG_NAMES . get ( msg_id , " ??? " )
delta = stats . get ( f ' msg_delta_ { msg_id } ' , 0 )
# 格式化數據
if delta > 0 :
data_str = f " { count } (+ { delta } ↑) "
else :
data_str = f " { count } "
# 格式化顯示:[ID]NAME DATA (ID固定3字符寬度, 右對齊)
display_str = f " [ { msg_id : 3d } ] { msg_name : 8s } { data_str } "
# 左列(偶數索引)或右列(奇數索引)
col = 2 if i % 2 == 0 else 36
row = line + ( i / / 2 )
if row > = dialog_height - 3 : # 避免超出邊界
break
dialog_win . addstr ( row , col , display_str )
# 確保跳過顯示區域
line = line + 6
dialog_win . addstr ( dialog_height - 2 , 2 , " Press R: Reset Stats, C: Unregister, other key to return... " )
dialog_win . refresh ( )
# 檢查是否有按鍵(非阻塞)
ch = dialog_win . getch ( )
if ch in ( ord ( ' R ' ) , ord ( ' r ' ) ) :
@ -1103,10 +1103,10 @@ class ControlPanel:
break
elif ch != - 1 : # 有按鍵則退出
break
# 短暫延遲
time . sleep ( 0.1 )
state . vehicle_info_single [ ' InfoReady ' ] = False
del dialog_win
stdscr . clear ( )
@ -1140,19 +1140,20 @@ class Orchestrator:
self . fc_ros_manager = mros . ros2_manager
self . fc_ros_manager . initialize ( )
self . fc_ros_manager . status_publisher . rate_controller . topic_intervals = {
' position ' : 1.0 ,
' attitude ' : 0.0 ,
' velocity ' : 0.0 ,
' battery ' : 1.0 ,
' vfr_hud ' : 1.0 ,
' mode ' : 0.0 ,
' summary ' : 1.0 ,
' position ' : 1.0 ,
' position_ned ' : 1.0 ,
' attitude ' : 1.0 ,
' velocity ' : 0.0 ,
' battery ' : 1.0 ,
' vfr_hud ' : 1.0 ,
' mode ' : 0.0 ,
' summary ' : 1.0 ,
}
def engageWholeSystem ( self ) :
""" 啟動整個系統 """
# === 1) 面板部分的啟動 ===
self . panel_thread = threading . Thread ( target = self . cPanel . panel_thread , args = ( self . cmd_q , self . panelState , self . stop_evt ) )
self . panel_thread = threading . Thread ( target = self . cPanel . panel_thread , args = ( self . cmd_q , self . panelState , self . stop_evt ) )
self . panel_thread . start ( )
# === 2) async_io_manager & mavlink_bridge 部分的啟動 ===
@ -1215,7 +1216,7 @@ class Orchestrator:
for s_id in mo . mavlink_object . mavlinkObjects :
s_obj = mo . mavlink_object . mavlinkObjects [ s_id ]
if socket_id in s_obj . target_sockets :
self . remove_target_from_object ( s_id , socket_id )
self . remove_target_from_object ( s_id , socket_id )
# 再移除該物件
self . delete_udp_object ( socket_id )
elif action == " MAVOBJ_ADD_TARGET " :
@ -1262,7 +1263,7 @@ class Orchestrator:
elif action == " UNREGISTER_VEHICLE " :
sysid = cmd [ 1 ]
self . vehicle_registry . unregister ( sysid )
elif cmd == " CREATE_UDP_INBOUND " :
self . panelState . udp_info_temp [ " direction " ] = " inbound "
self . create_udp_object ( )
@ -1291,8 +1292,8 @@ class Orchestrator:
logger . error ( f " Unexpected error in main loop: { e } " )
finally :
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
if self . bridge . thread . is_alive ( ) :
if self . bridge . running :
self . bridge . stop ( )
@ -1318,7 +1319,7 @@ class Orchestrator:
self . panel_thread . join ( timeout = 2 )
time . sleep ( 0.5 ) # 等待各模組穩定關閉
logger . info ( " Main orchestrator END! " )
# =============== 面板動作 - Mavlink Object ===============
@ -1339,10 +1340,11 @@ class Orchestrator:
self . occupied_ip_ports [ ip ] . append ( port )
else :
self . occupied_ip_ports [ ip ] = [ port ]
# 外放資訊部分
elif self . panelState . udp_info_temp [ " direction " ] == " outbound " :
connection_string = f " udpout: { self . panelState . udp_info_temp [ ' IP ' ] } : { self . panelState . udp_info_temp [ ' Port ' ] } "
try :
# 檢測這個 connection_string 是否能成功建立 mavlink 連結
mavlink_socket = mavutil . mavlink_connection ( connection_string )
@ -1362,7 +1364,7 @@ class Orchestrator:
mavlink_object . socket_type = socket_type
self . panelState . panel_info_msg_list . append ( ( f " Created UDP { self . panelState . udp_info_temp [ ' direction ' ] } object: { connection_string } " , time . time ( ) ) )
def delete_udp_object ( self , socket_id ) :
""" 移除指定的 mavlink_object """
@ -1406,10 +1408,10 @@ class Orchestrator:
def _update_vehicles_list ( self ) :
""" 更新已連線載具列表(從 vehicle_registry 獲取) """
vehicles_dict = { }
# 從 vehicle_registry 獲取所有載具
all_vehicles = self . vehicle_registry . get_all ( )
for sysid , vehicle in all_vehicles . items ( ) :
# 遍歷每個載具的所有組件
for compid , component in vehicle . components . items ( ) :
@ -1422,9 +1424,9 @@ class Orchestrator:
' connection_via ' : vehicle . connected_via . value ,
' socket_id ' : vehicle . custom_meta . get ( ' socket_id ' , ' N/A ' )
}
self . panelState . connected_vehicles_dict = vehicles_dict
def _prepare_vehicle_info ( self , sysid , compid ) :
""" 準備載具組件的詳細資訊(包含變化率計算) """
vehicle = self . vehicle_registry . get ( sysid )
@ -1433,37 +1435,37 @@ class Orchestrator:
return
socket_id = vehicle . custom_meta . get ( ' socket_id ' , ' N/A ' )
component = vehicle . get_component ( compid )
if not component :
logger . warning ( f " Component { compid } not found in vehicle { sysid } " )
return
stats = component . packet_stats
# 取得之前的統計資料(用於計算變化)
prev_stats = self . panelState . vehicle_info_single . get ( ' prev_stats ' , { } )
prev_received = prev_stats . get ( ' received ' , 0 )
prev_lost = prev_stats . get ( ' lost ' , 0 )
prev_msg_counts = prev_stats . get ( ' msg_counts ' , { } )
# 計算變化率
received_delta = stats . received_count - prev_received
lost_delta = stats . lost_count - prev_lost
# 準備訊息類型計數(排序後取前幾個)
sorted_msg_counts = dict ( sorted (
stats . msg_type_count . items ( ) ,
key = lambda x : x [ 1 ] ,
reverse = True
) [ : 12 ] ) # 取前 12 個最常見的
# 計算每種訊息類型的變化
msg_deltas = { }
for msg_id , count in sorted_msg_counts . items ( ) :
prev_count = prev_msg_counts . get ( msg_id , 0 )
msg_deltas [ f ' msg_delta_ { msg_id } ' ] = count - prev_count
# 更新 vehicle_info_single
socket_type = " N/A "
socket_obj = mo . mavlink_object . mavlinkObjects . get ( socket_id , None )
@ -1481,7 +1483,7 @@ class Orchestrator:
" packet_stats " : {
" received " : stats . received_count ,
" lost " : stats . lost_count ,
" loss_rate " : ( stats . lost_count / stats . received_count * 100
" loss_rate " : ( stats . lost_count / stats . received_count * 100
if stats . received_count > 0 else 0 ) ,
" last_seq " : stats . last_seq ,
" last_msg_time " : stats . last_msg_time ,
@ -1508,8 +1510,8 @@ class Orchestrator:
( f " Fail! Serial Port { self . panelState . serial_info_temp [ ' Port ' ] } already linked. " , time . time ( ) )
)
return
# 獲取可用的 udp port
# 獲取可用的 udp port
udp_port_tmp = find_available_port ( 19000 , 20000 )
# 定義通訊類型映射表
@ -1526,7 +1528,7 @@ class Orchestrator:
( " Please select Communication Type first. " , time . time ( ) )
)
return
# 查找對應的通訊類型
comm_type_tmp = COMM_TYPE_MAP . get ( comm_type )
if comm_type_tmp is None :
@ -1554,12 +1556,31 @@ class Orchestrator:
self . panelState . udp_info_temp [ ' Port ' ] = str ( udp_port_tmp )
self . panelState . udp_info_temp [ ' direction ' ] = " inbound "
self . create_udp_object ( " SERIAL_XBEE " )
def main ( ) :
stop_evt = threading . Event ( )
# =========== 各項模組的版本先驗 ===========
# 除非你有在做這幾項模組的改版 不然動到這邊的版本號 代表執行環境有很大的問題!!!!!!
version_check = True
if mo . MODULE_VER != " 1.50 " :
print ( " Module Version Error! : mavlinkObkect " )
version_check = False
if mros . MODULE_VER != " 1.50 " :
print ( " Module Version Error! : mavlinkROS2Nodes " )
version_check = False
if mvv . MODULE_VER != " 1.00 " :
print ( " Module Version Error! : mavlinkVehicleView " )
version_check = False
if sm . MODULE_VER != " 0.60 " :
print ( " Module Version Error! : serialManager " )
version_check = False
if version_check == False :
print ( " Environment Obstacle! Check YOUR Execution System Path First!! " )
return
# ========================================
stop_evt = threading . Event ( )
def signal_handler ( signum , frame ) :
""" 處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組 """
stop_evt . set ( )
@ -1593,4 +1614,7 @@ if __name__ == "__main__":
4. 註解掉無效代碼 action == " UPDATE_VEHICLES_LIST " 區塊
5. 系統納入 mavlink ROS2 Manager 模組
2025.04 .13
1. 加入各項模組的版本先驗 檢驗失敗就直接離開
'''