Compare commits

...

2 Commits

Author SHA1 Message Date
Chiyu Chen 6a71e4530f 1. 大量添增終端機介面功能
2. 優化 mavlink_object 與 manager 的流程
5 months ago
Chiyu Chen 7af138b02a 1. 調整檔案結構 變動 import 的路徑與方法
2. 新增 mainOrchestrator.py 作為接下來開發介面化統合工具的主要檔案
6 months ago

@ -33,6 +33,16 @@ Package 簡述
2. fc_network_adapter
建立、維護與飛控韌體的連接
構築 mavlink 封包
同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息
處理無線模組的通訊格式 (XBee)
--同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息 (移除)--
N. logs 是執行時期的記錄檔
===
請一律在 ~/AirTrapMine/src/ 資料夾下 以模組化啟動程式
例如 在 ~/AirTrapMine/src/ 資料夾下
> python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
> python -m fc_network_adapter.tests.test_ringBuffer
> python -m fc_network_adapter.tests.demo_integration

@ -0,0 +1,786 @@
'''
主要調配流程的程式
這個檔案包含 Terminal Utility Layer (TUL) 作為人機互動介面並調用 mavlinkDevice mavlinkObject 來處理 MAVLink 通訊和物件管理
'''
import os
import time
import sys
import curses
import threading
import queue
import signal
from pymavlink import mavutil
# 自定義的 import
from . import mavlinkObject as mo
from .utils import RingBuffer, setup_logger
logger = setup_logger(os.path.basename(__file__))
class PanelState:
def __init__(self):
self.panel_status = "Idle"
termination_start_time = None
self.mavlink_bridge_state = "Stopped"
self.object_manager_state = "Stopped"
self.socket_object_list = []
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
# 這邊是儲存關於 socket object 的資料
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "",
"target_sockets": "", "primary_socket_id": "", "InfoReady": False} # 暫存單一 socket 的資訊
def intoSTART(self):
self.panel_status = "Running"
def intoTERMINATION(self):
self.termination_start_time = time.time()
self.panel_status = "Terminating"
def intoENGINEER(self):
self.panel_status = "Engineer"
def intoSTOPPED(self):
self.panel_status = "Stopped"
def set_user_input(self, text):
self.user_input = text
class MenuNode:
def __init__(self, name, desc="", action=None, children=None):
self.name = name
self.desc = desc
self.action = action # 可以是函式或特殊字串
self.children = children or [] # 子選單列表
class ControlPanel:
def __init__(self):
pass
def input_dialog(stdscr, prompt="請輸入文字: "):
"""顯示輸入對話框"""
height, width = stdscr.getmaxyx()
# 建立輸入視窗
dialog_height = 5
dialog_width = min(60, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
# 建立視窗邊框
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(1, 2, prompt)
dialog_win.addstr(3, 2, "按 Enter 確認, ESC 取消")
dialog_win.refresh()
# 輸入區域
input_win = curses.newwin(1, dialog_width - 6, start_y + 2, start_x + 2)
input_win.keypad(True)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
input_win.clear()
input_win.addstr(0, 0, user_input[-dialog_width+8:]) # 顯示輸入內容(滾動)
input_win.refresh()
ch = input_win.getch()
if ch == 27: # ESC
user_input = None
break
elif ch in (curses.KEY_ENTER, 10, 13): # Enter
break
elif ch in (curses.KEY_BACKSPACE, 127, 8): # Backspace
user_input = user_input[:-1]
elif 32 <= ch <= 126: # 可打印字符
user_input += chr(ch)
curses.noecho()
curses.curs_set(0)
# 清理視窗
del input_win
del dialog_win
stdscr.clear()
stdscr.refresh()
return user_input
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=5):
"""動態創建 mavlink_object 列表選單(支持分頁)"""
children = []
if not state.socket_object_list:
children.append(MenuNode("(空)", "目前沒有連結口", None))
else:
total_items = len(state.socket_object_list)
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
for socket_id in state.socket_object_list[start_idx:end_idx]:
# 為每個 socket 創建子選單
obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[
MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"),
MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"),
MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"),
MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"),
MenuNode("Remove Target", "移除轉發目標(工程)", "MAVOBJ_REMOVE_TARGET"),
MenuNode("返回", "回到列表", "BACK"),
])
# 將 socket_id 附加到每個子選單項目上
for child in obj_menu.children:
child.socket_id = socket_id
children.append(obj_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", "---", None))
if page > 0:
prev_node = MenuNode("◀ 上一頁", f"{page}/{total_pages}", "PREV_PAGE")
prev_node.page = page - 1
children.append(prev_node)
if page < total_pages - 1:
next_node = MenuNode("下一頁 ▶", f"{page + 2}/{total_pages}", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
def show_object_info(self, stdscr, socket_id, state: PanelState):
"""顯示物件詳細資訊的對話框"""
height, width = stdscr.getmaxyx()
dialog_height = 15
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD)
while not state.socket_info_single.get('InfoReady', False):
time.sleep(0.05) # 等待資訊準備好
# 這裡顯示基本資訊
dialog_win.addstr(2, 2, f"Socket ID : {socket_id}")
dialog_win.addstr(3, 2, f"Socket status : 運行中")
# show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', '')))
dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}")
show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', '')))
dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}")
show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', '')))
dialog_win.addstr(6, 2, f"Return Pack : {show_str if show_str else 'N/A'}")
dialog_win.addstr(7, 2, f"Primary Socket ID: {state.socket_info_single.get('primary_socket_id', 'It Self')}")
show_str = ",".join(map(str, state.socket_info_single.get('target_sockets', '')))
dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}")
state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用
dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
stdscr.refresh()
def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False):
"""選擇目標 socket 的對話框"""
height, width = stdscr.getmaxyx()
dialog_height = min(15, len(state.socket_object_list) + 5)
dialog_width = min(50, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.keypad(True)
title = "選擇要移除的目標" if remove_mode else "選擇轉發目標"
available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id]
if not available_sockets:
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
dialog_win.addstr(2, 2, "沒有可用的目標")
dialog_win.addstr(4, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
stdscr.refresh()
return None
selected_idx = 0
while True:
dialog_win.clear()
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
for i, socket_id in enumerate(available_sockets):
marker = "" if i == selected_idx else " "
attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL
dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr)
dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消")
dialog_win.refresh()
ch = dialog_win.getch()
if ch in (curses.KEY_UP, ord('k')):
selected_idx = (selected_idx - 1) % len(available_sockets)
elif ch in (curses.KEY_DOWN, ord('j')):
selected_idx = (selected_idx + 1) % len(available_sockets)
elif ch in (curses.KEY_ENTER, 10, 13):
result = available_sockets[selected_idx]
del dialog_win
stdscr.clear()
stdscr.refresh()
return result
elif ch == 27: # ESC
del dialog_win
stdscr.clear()
stdscr.refresh()
return None
def menu_tree(self):
"""建立多層選單結構"""
return MenuNode("Main Menu", children=[
MenuNode("MavLink Object", "控制 MavLink 物件", children=[
MenuNode("New+", children=[
MenuNode("UDP InBound", children=[
MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"),
MenuNode("Port(Listen)", "設定監聽的 Port", "TEXT_UDP_PORT"),
MenuNode("Create", "建立 UDP InBound 連結口", "CREATE_UDP_INBOUND"),
]),
MenuNode("UDP OutBound", children=[
MenuNode("IP(Target)", "設定目標的 IP 位址", "TEXT_UDP_IP"),
MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"),
MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"),
]),
]),
MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"),
]),
MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), #TODO: 尚未實作
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), #TODO: 尚未實作
]),
MenuNode("Shutdown", "關閉整個系統", children=[
MenuNode("Return", "繼續運行", "BACK"),
MenuNode("Confirm", "關閉系統", "QUIT"),
]),
])
def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event):
stdscr = None
def cleanup():
"""清理 curses 狀態"""
if stdscr:
stdscr.keypad(False)
curses.nocbreak()
curses.echo()
curses.endwin()
def panel_shutdown():
# 先關閉所有模組 再關閉面板
cmd_q.put("SHUTDOWN_BRIDGE")
cmd_q.put("SHUTDOWN_MANAGER")
def draw_menu(screen):
nonlocal stdscr
stdscr = screen
curses.curs_set(0)
stdscr.nodelay(False) # 阻塞讀鍵
stdscr.keypad(True)
# 選單導航狀態
menu_stack = [self.menu_tree()] # 選單堆疊
idx_stack = [0] # 索引堆疊
state.intoSTART() # 設定狀態為運行中
while not stop_evt.is_set():
# 檢查是否需要退出
if stop_evt.is_set():
break
current_menu = menu_stack[-1]
current_idx = idx_stack[-1]
# 獲取終端機尺寸
height, width = stdscr.getmaxyx()
# 簡單暴力的限制視窗的大小
if height < 20 or width < 60:
logger.error("Terminal size too small for Control Panel.")
break
stdscr.clear()
stdscr.border()
stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD)
stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}")
stdscr.addstr(2, 2, f"mavlink Bridge State : {state.mavlink_bridge_state}")
stdscr.addstr(3, 2, f"Object Manager State : {state.object_manager_state}")
stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}")
# # 更新模組狀態顯示
# stdscr.addstr(2, 25, f"{state.mavlink_bridge_state}")
# stdscr.addstr(3, 25, f"{state.object_manager_state}")
# stdscr.addstr(4, 25, f"{len(state.socket_object_list)} ")
# 顯示當前選單項目
start_line = 6
for i, child in enumerate(current_menu.children):
marker = "" if i == current_idx else " "
# 動態顯示已輸入的值
desc = child.desc
if child.action == "TEXT_UDP_IP" and state.udp_info_temp["IP"]:
desc = f"{child.desc} [{state.udp_info_temp['IP']}]"
elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]:
desc = f"{child.desc} [{state.udp_info_temp['Port']}]"
line = f"{marker}{child.name:15s} {desc}"
attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL
stdscr.addstr(start_line + i, 4, line, attr)
# 顯示訊息區域
# info_start_line = start_line + len(current_menu.children) + 1
info_start_line = height - 8
max_msg_lines = 5 # 最多顯示 5 行訊息
current_time = time.time()
# 清理過時的訊息
state.panel_info_msg_list = [
(msg, timestamp) for msg, timestamp in state.panel_info_msg_list
if current_time - timestamp < 2.0 #秒數
]
# 只顯示最新的 max_msg_lines 條訊息
display_msgs = state.panel_info_msg_list[-max_msg_lines:]
for i, msg_data in enumerate(display_msgs):
if info_start_line + i >= help_line - 1: # 避免超出邊界
break
msg = msg_data[0] if isinstance(msg_data, tuple) else msg_data
# 截斷過長的訊息
max_msg_width = width - 6
if len(msg) > max_msg_width:
msg = msg[:max_msg_width-3] + "..."
stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD)
# 操作說明
# help_line = start_line + len(current_menu.children) + 2
help_line = height - 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層 q退出", curses.A_DIM)
stdscr.refresh()
# 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作
# 驗證 bridge 跟 manager 狀態 兩者都停止後 就進入 STOPPED 狀態並跳出迴圈
# 超過幾秒沒有反應就強制關閉
if state.panel_status == "Terminating":
if time.time() - state.termination_start_time > 3:
logger.warning("Control Panel forced shutdown after timeout.")
state.intoSTOPPED()
stop_evt.set()
continue
time.sleep(0.1)
if state.mavlink_bridge_state == "Stopped" and state.object_manager_state == "Stopped":
state.intoSTOPPED()
stop_evt.set()
continue
# 設定短暫的 timeout讓執行緒能夠響應 stop_evt
stdscr.timeout(100) # 100ms timeout
ch = stdscr.getch()
if ch == -1: # timeout繼續檢查 stop_evt
continue
# 處理按鍵
if ch in (curses.KEY_UP, ord('k')):
idx_stack[-1] = (current_idx - 1) % len(current_menu.children)
elif ch in (curses.KEY_DOWN, ord('j')):
idx_stack[-1] = (current_idx + 1) % len(current_menu.children)
elif ch == (ord('O')):
# 直接進入工程模式
state.intoENGINEER()
elif ch == curses.KEY_LEFT:
# 返回上層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
elif ch == curses.KEY_RIGHT:
# 進入下層 (但不執行動作)
selected = current_menu.children[current_idx]
if selected.children: # 有子選單
menu_stack.append(selected)
idx_stack.append(0)
elif ch in (ord('q'), 27):
state.intoTERMINATION()
panel_shutdown()
elif ch in (curses.KEY_ENTER, 10, 13):
selected = current_menu.children[current_idx]
# 處理不同類型的動作
if selected.children: # 有子選單
menu_stack.append(selected)
idx_stack.append(0)
elif selected.action == "BACK":
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
elif selected.action == "QUIT":
state.intoTERMINATION()
panel_shutdown()
elif selected.action == "TEXT_UDP_IP":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ")
if result is not None:
state.udp_info_temp["IP"] = result
elif selected.action == "TEXT_UDP_PORT":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 Port: ")
if result is not None:
state.udp_info_temp["Port"] = result
elif selected.action == "CREATE_UDP_INBOUND":
cmd_q.put("CREATE_UDP_INBOUND")
# 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
menu_stack.pop()
idx_stack.pop()
elif selected.action == "CREATE_UDP_OUTBOUND":
cmd_q.put("CREATE_UDP_OUTBOUND")
# 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
menu_stack.pop()
idx_stack.pop()
elif selected.action == "LIST_MAV_OBJECT":
# 動態生成 mavlink_object 列表選單
object_list_menu = self.create_object_list_menu(state, page=0)
menu_stack.append(object_list_menu)
idx_stack.append(0)
elif selected.action == "PREV_PAGE":
# 上一頁
if hasattr(selected, 'page'):
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
idx_stack.append(0)
elif selected.action == "NEXT_PAGE":
# 下一頁
if hasattr(selected, 'page'):
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_MAV_OBJECT":
# 顯示物件詳細資訊
if hasattr(selected, 'socket_id'):
cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id))
self.show_object_info(stdscr, selected.socket_id, state)
elif selected.action == "REMOVE_MAV_OBJECT":
# 移除物件
if hasattr(selected, 'socket_id'):
cmd_q.put(("REMOVE_OBJECT", selected.socket_id))
# 返回上層(回到列表)
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
# 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好
menu_stack.pop()
idx_stack.pop()
# # 刷新列表頁面
# if len(menu_stack) > 1:
# current_page = menu_stack[-1].current_page if hasattr(menu_stack[-1], 'current_page') else 0
# menu_stack.pop()
# idx_stack.pop()
# time.sleep(0.1) # 等待物件被移除
# object_list_menu = self.create_object_list_menu(state, page=current_page)
# menu_stack.append(object_list_menu)
# idx_stack.append(0)
elif selected.action == "MAVOBJ_MAKE_LINK":
# 建立轉發連結
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state)
if target_id is not None:
cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id))
elif selected.action == "MAVOBJ_ADD_TARGET":
# 添加目標端口
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state)
if target_id is not None:
cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id))
elif selected.action == "MAVOBJ_REMOVE_TARGET":
# 移除目標端口
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True)
if target_id is not None:
cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id))
elif selected.action == "STOP_MANAGER":
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_MANAGER")
elif selected.action == "STOP_BRIDGE":
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_BRIDGE")
elif callable(selected.action):
# 執行函式
cmd_q.put(selected.action)
try:
curses.wrapper(draw_menu)
except KeyboardInterrupt:
pass
finally:
stop_evt.set()
cleanup()
class Orchestrator:
def __init__(self, stop_sig):
self.stop_evt = stop_sig
# === 1) 面板部分的準備 ===
self.cmd_q = queue.Queue()
self.panelState = PanelState() # 面板的狀態儲存
self.cPanel = ControlPanel() # 面板的功能
self.panel_thread = None
# === 2) async_io_manager & mavlink_bridge 部分的準備 ===
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
self.manager = mo.async_io_manager()
self.bridge = mo.mavlink_bridge()
def engageWholeSystem(self):
"""啟動整個系統"""
# === 1) 面板部分的啟動 ===
self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt))
self.panel_thread.start()
# === 2) async_io_manager & mavlink_bridge 部分的啟動 ===
self.manager.start()
self.bridge.start()
def mainLoop(self):
logger.info("Main orchestrator started <-")
try:
while not self.stop_evt.is_set():
# A. 更新模組狀態
if self.manager.running:
self.panelState.object_manager_state = 'Running'
else:
self.panelState.object_manager_state = 'Stopped'
socketid_list = self.manager.get_managed_objects()
self.panelState.socket_object_list = socketid_list
if self.bridge.running:
self.panelState.mavlink_bridge_state = 'Running'
else:
self.panelState.mavlink_bridge_state = 'Stopped'
# 取出面板丟過來的「動作」
try:
cmd = self.cmd_q.get_nowait()
if callable(cmd):
cmd() # 執行對應動作
elif isinstance(cmd, tuple):
# 處理多參數命令
action = cmd[0]
if action == "REMOVE_OBJECT":
socket_id = cmd[1]
# 先移除所有跟他關聯的 target sockets
for s_id in mo.mavlink_object.mavlinkObjects:
s_obj = mo.mavlink_object.mavlinkObjects[s_id]
if socket_id in s_obj.target_sockets:
self.remove_target_from_object(s_id, socket_id)
# 再移除該物件
self.delete_mavlink_object(socket_id)
elif action == "MAVOBJ_MAKE_LINK":
source_id, target_id = cmd[1], cmd[2]
self.add_target_to_object(source_id, target_id)
self.add_target_to_object(target_id, source_id) # 雙向連結
elif action == "MAVOBJ_ADD_TARGET":
source_id, target_id = cmd[1], cmd[2]
self.add_target_to_object(source_id, target_id)
elif action == "MAVOBJ_REMOVE_TARGET":
source_id, target_id = cmd[1], cmd[2]
self.remove_target_from_object(source_id, target_id)
elif action == "INSPECT_MAV_OBJECT":
socket_id = cmd[1]
mav_obj = self.manager.managed_objects.get(socket_id, None)
if mav_obj:
self.panelState.socket_info_single["socket_type"] = mav_obj.socket_type
# self.panelState.socket_info_single["socket_state"] = "Running" if mav_obj.running
self.panelState.socket_info_single["bridge_msg_types"] = mav_obj.bridge_msg_types
self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types
self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id
self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets
self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好
elif cmd == "CREATE_UDP_INBOUND":
self.panelState.udp_info_temp["direction"] = "inbound"
self.create_udp_object()
elif cmd == "CREATE_UDP_OUTBOUND":
self.panelState.udp_info_temp["direction"] = "outbound"
self.create_udp_object()
elif cmd == "SHUTDOWN_BRIDGE":
self.bridge.stop()
elif cmd == "SHUTDOWN_MANAGER":
self.manager.shutdown()
except queue.Empty:
pass
except Exception as e:
logger.error(f"Error processing command: {e}")
time.sleep(0.1)
except KeyboardInterrupt:
pass
except Exception as e:
logger.error(f"Unexpected error in main loop: {e}")
finally:
logger.info("Main orchestrator END!")
# 關閉 mavlink_bridge (裡面有一個執行緒)
self.bridge.stop()
# 關閉 async_io_manager (裡面有一個執行緒)
self.manager.shutdown()
# 關閉面板執行緒
if self.panel_thread.is_alive():
self.panel_thread.join(timeout=2)
def create_udp_object(self):
if self.panelState.udp_info_temp["direction"] == "inbound":
connection_string = f"udp:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
elif self.panelState.udp_info_temp["direction"] == "outbound":
connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
try:
mavlink_socket = mavutil.mavlink_connection(connection_string)
except Exception as e:
self.panelState.panel_info_msg_list.append((f"Failed to create UDP {self.panelState.udp_info_temp['direction']} object: {e}", time.time()-1))
return
mavlink_object = mo.mavlink_object(mavlink_socket)
mavlink_object.socket_type = "UDP " + self.panelState.udp_info_temp['direction'].capitalize()
self.manager.add_mavlink_object(mavlink_object)
self.panelState.panel_info_msg_list.append((f"Created UDP {self.panelState.udp_info_temp['direction']} object: {connection_string}", time.time()))
def delete_mavlink_object(self, socket_id):
"""移除指定的 mavlink_object"""
self.manager.remove_mavlink_object(socket_id)
def add_target_to_object(self, source_id, target_id):
"""為 mavlink_object 添加轉發目標"""
if source_id in mo.mavlink_object.mavlinkObjects:
source_obj = mo.mavlink_object.mavlinkObjects[source_id]
else:
self.panelState.panel_info_msg_list.append((f"Source object {source_id} not found", time.time()))
return
if source_obj.add_target_socket(target_id):
self.panelState.panel_info_msg_list.append((f"Added target {target_id} to socket {source_id}", time.time()))
else:
self.panelState.panel_info_msg_list.append((f"Fail Adding target {target_id} to socket {source_id}", time.time()))
def remove_target_from_object(self, source_id, target_id):
"""從 mavlink_object 移除轉發目標"""
if source_id in mo.mavlink_object.mavlinkObjects:
source_obj = mo.mavlink_object.mavlinkObjects[source_id]
else:
self.panelState.panel_info_msg_list.append((f"Source object {source_id} not found", time.time()))
return
if source_obj.remove_target_socket(target_id):
self.panelState.panel_info_msg_list.append((f"Removed target {target_id} from socket {source_id}", time.time()))
else:
self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time()))
def main():
stop_evt = threading.Event()
def signal_handler(signum, frame):
"""處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組"""
stop_evt.set()
# 註冊信號處理器
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
orchestrator = Orchestrator(stop_evt)
orchestrator.engageWholeSystem()
orchestrator.mainLoop() # 程式會 block 在這邊 直到收到中斷信號
if __name__ == "__main__":
main()

@ -1,10 +1,12 @@
import os
# 自定義的 import
from .theLogger import setup_logger
from .utils import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkDevice.py")
logger = setup_logger(os.path.basename(__file__))
# 用來記錄每個 system 的資訊
# 資料格式 { sysid : mavlink_device object }

File diff suppressed because it is too large Load Diff

@ -9,6 +9,8 @@
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
import os
# ROS2 的 import
import std_msgs.msg
import sensor_msgs.msg
@ -18,9 +20,9 @@ import mavros_msgs.msg
import math
# 自定義的 import
from .theLogger import setup_logger
from .utils import setup_logger
logger = setup_logger("mavlinkPublish.py")
logger = setup_logger(os.path.basename(__file__))
class mavlink_publisher():

@ -0,0 +1,435 @@
"""
VehicleView - Pure State Container
純粹的狀態容器不主動通訊不背景下載參數不搶 RF/MAVLink 流量
只提供狀態存取介面由外部手動餵資料push state
"""
import os
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
# ====================== Enums =====================
class ConnectionType(Enum):
"""連接類型"""
SERIAL = "serial"
UDP = "udp"
TCP = "tcp"
OTHER = "other"
class ComponentType(Enum):
"""組件類型"""
AUTOPILOT = "autopilot"
GCS = "gcs"
CAMERA = "camera"
GIMBAL = "gimbal"
OTHER = "other"
class RFModuleType(Enum):
"""RF模組類型"""
XBEE = "xbee"
UDP = "udp"
TCP = "tcp"
OTHER = "other"
# ====================== Data Classes =====================
@dataclass
class Position:
"""位置資訊"""
latitude: Optional[float] = None # 緯度 (度)
longitude: Optional[float] = None # 經度 (度)
altitude: Optional[float] = None # 海拔高度 (公尺)
relative_altitude: Optional[float] = None # 相對高度 (公尺)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class Attitude:
"""姿態資訊"""
roll: Optional[float] = None # 橫滾角 (弧度)
pitch: Optional[float] = None # 俯仰角 (弧度)
yaw: Optional[float] = None # 偏航角 (弧度)
rollspeed: Optional[float] = None # 橫滾速度 (弧度/秒)
pitchspeed: Optional[float] = None # 俯仰速度 (弧度/秒)
yawspeed: Optional[float] = None # 偏航速度 (弧度/秒)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class FlightMode:
"""飛行模式資訊"""
base_mode: Optional[int] = None # MAVLink base mode
custom_mode: Optional[int] = None # 自定義模式
mode_name: Optional[str] = None # 模式名稱 (例如: "GUIDED", "AUTO")
timestamp: Optional[float] = None # 時間戳記
@dataclass
class Battery:
"""電池資訊"""
voltage: Optional[float] = None # 電壓 (V)
current: Optional[float] = None # 電流 (A)
remaining: Optional[int] = None # 剩餘電量 (%)
temperature: Optional[float] = None # 溫度 (攝氏)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class EKF:
"""EKF狀態資訊"""
flags: Optional[int] = None # EKF 旗標
velocity_variance: Optional[float] = None # 速度變異
pos_horiz_variance: Optional[float] = None # 水平位置變異
pos_vert_variance: Optional[float] = None # 垂直位置變異
compass_variance: Optional[float] = None # 羅盤變異
terrain_alt_variance: Optional[float] = None # 地形高度變異
timestamp: Optional[float] = None # 時間戳記
@dataclass
class GPS:
"""GPS資訊"""
fix_type: Optional[int] = None # GPS fix 類型 (0=無, 1=無fix, 2=2D, 3=3D, 4=DGPS, 5=RTK)
satellites_visible: Optional[int] = None # 可見衛星數
eph: Optional[int] = None # GPS HDOP 水平精度因子
epv: Optional[int] = None # GPS VDOP 垂直精度因子
timestamp: Optional[float] = None # 時間戳記
@dataclass
class VFR:
"""VFR HUD資訊"""
airspeed: Optional[float] = None # 空速 (m/s)
groundspeed: Optional[float] = None # 地速 (m/s)
heading: Optional[int] = None # 航向 (度)
throttle: Optional[int] = None # 油門 (%)
climb: Optional[float] = None # 爬升率 (m/s)
timestamp: Optional[float] = None # 時間戳記
@dataclass
class ComponentStatus:
"""組件狀態容器"""
position: Position = field(default_factory=Position)
attitude: Attitude = field(default_factory=Attitude)
mode: FlightMode = field(default_factory=FlightMode)
battery: Battery = field(default_factory=Battery)
ekf: EKF = field(default_factory=EKF)
gps: GPS = field(default_factory=GPS)
vfr: VFR = field(default_factory=VFR)
# 系統狀態
system_status: Optional[int] = None # MAV_STATE
armed: Optional[bool] = None # 解鎖狀態
# 其他自定義狀態
custom_status: Dict[str, Any] = field(default_factory=dict)
@dataclass
class PacketStats:
"""封包統計資訊"""
received_count: int = 0 # 接收封包數
lost_count: int = 0 # 遺失封包數
last_seq: Optional[int] = None # 最後序號
last_msg_time: Optional[float] = None # 最後訊息時間
msg_type_count: Dict[int, int] = field(default_factory=dict) # 各類訊息計數 {msg_type: count}
@dataclass
class RFStatus:
"""RF模組狀態"""
rssi: Optional[int] = None # 信號強度
noise: Optional[int] = None # 噪音水平
at_response: Optional[str] = None # AT 命令回應
link_quality: Optional[int] = None # 連接品質
timestamp: Optional[float] = None # 時間戳記
custom_status: Dict[str, Any] = field(default_factory=dict) # 其他自定義狀態
@dataclass
class SocketInfo:
"""Socket連接資訊"""
ip: Optional[str] = None # IP位址
port: Optional[int] = None # 埠號
local_ip: Optional[str] = None # 本地IP
local_port: Optional[int] = None # 本地埠號
connected: bool = False # 連接狀態
# ====================== Component Class =====================
class VehicleComponent:
"""載具組件 - 代表一個 MAVLink component"""
def __init__(self, component_id: int, comp_type: ComponentType = ComponentType.OTHER):
self.component_id = component_id
self.type = comp_type
# MAVLink 組件資訊
self.mav_type: Optional[int] = None # MAV_TYPE
self.mav_autopilot: Optional[int] = None # MAV_AUTOPILOT
# 狀態容器
self.status = ComponentStatus()
# 參數容器 (只存放,不主動下載)
self.parameters: Dict[str, Any] = {}
# 封包統計
self.packet_stats = PacketStats()
def update_packet_stats(self, seq: int, msg_type: int, timestamp: float) -> None:
"""
更新封包統計
Args:
seq: 訊息序號
msg_type: 訊息類型
timestamp: 時間戳記
"""
stats = self.packet_stats
# 檢查遺失封包
if stats.last_seq is not None:
expected_seq = (stats.last_seq + 1) % 256
diff = seq - expected_seq
if diff < 0:
diff += 256
stats.lost_count += diff
# 更新統計資訊
stats.received_count += 1
stats.last_seq = seq
stats.last_msg_time = timestamp
# 更新訊息類型計數
if msg_type in stats.msg_type_count:
stats.msg_type_count[msg_type] += 1
else:
stats.msg_type_count[msg_type] = 1
def reset_packet_stats(self) -> None:
"""重置封包統計"""
self.packet_stats = PacketStats()
def set_parameter(self, param_name: str, param_value: Any) -> None:
"""設定參數 (手動餵入)"""
self.parameters[param_name] = param_value
def get_parameter(self, param_name: str, default: Any = None) -> Any:
"""取得參數"""
return self.parameters.get(param_name, default)
def __str__(self) -> str:
return (f"Component(id={self.component_id}, type={self.type.value}, "
f"mav_type={self.mav_type}, received={self.packet_stats.received_count}, "
f"lost={self.packet_stats.lost_count})")
# ====================== RF Module Class =====================
class RFModule:
"""RF模組資訊容器"""
def __init__(self, rf_type: RFModuleType = RFModuleType.OTHER):
self.type = rf_type
self.status = RFStatus()
self.socket_info = SocketInfo()
def update_rssi(self, rssi: int, timestamp: Optional[float] = None) -> None:
"""更新RSSI"""
self.status.rssi = rssi
if timestamp:
self.status.timestamp = timestamp
def update_at_response(self, response: str, timestamp: Optional[float] = None) -> None:
"""更新AT命令回應"""
self.status.at_response = response
if timestamp:
self.status.timestamp = timestamp
def update_socket_info(self, ip: str = None, port: int = None,
local_ip: str = None, local_port: int = None,
connected: bool = None) -> None:
"""更新Socket資訊"""
if ip is not None:
self.socket_info.ip = ip
if port is not None:
self.socket_info.port = port
if local_ip is not None:
self.socket_info.local_ip = local_ip
if local_port is not None:
self.socket_info.local_port = local_port
if connected is not None:
self.socket_info.connected = connected
def __str__(self) -> str:
return (f"RFModule(type={self.type.value}, rssi={self.status.rssi}, "
f"connected={self.socket_info.connected})")
# ====================== Main VehicleView Class =====================
class VehicleView:
"""
載具視圖 - 純狀態容器
特點:
1. 只有狀態容器沒有任何主動通訊功能
2. 不主動通訊不背景下載參數不搶 RF/MAVLink 流量
3. 可以手動餵資料 (push state)
4. 可擴充 (支援 RF 模組狀態)
"""
def __init__(self, sysid: int):
# Meta 資訊
self.sysid = sysid
self.kind: Optional[str] = None # 載具種類描述 (例如: "Copter", "Plane")
self.vehicle_type: Optional[int] = None # MAV_TYPE
self.connected_via: ConnectionType = ConnectionType.OTHER
# 組件容器 {component_id: VehicleComponent}
self.components: Dict[int, VehicleComponent] = {}
# RF模組
self.rf_module: Optional[RFModule] = None
# 其他自定義meta資訊
self.custom_meta: Dict[str, Any] = {}
def add_component(self, component_id: int, comp_type: ComponentType = ComponentType.OTHER) -> VehicleComponent:
"""
新增組件
Args:
component_id: 組件ID
comp_type: 組件類型
Returns:
VehicleComponent: 新增的組件
"""
if component_id not in self.components:
self.components[component_id] = VehicleComponent(component_id, comp_type)
logger.info(f"Added component {component_id} to system {self.sysid}")
return self.components[component_id]
def get_component(self, component_id: int) -> Optional[VehicleComponent]:
"""取得組件"""
return self.components.get(component_id)
def remove_component(self, component_id: int) -> bool:
"""移除組件"""
if component_id in self.components:
del self.components[component_id]
logger.info(f"Removed component {component_id} from system {self.sysid}")
return True
return False
def set_rf_module(self, rf_type: RFModuleType) -> RFModule:
"""設定RF模組"""
self.rf_module = RFModule(rf_type)
return self.rf_module
def get_rf_module(self) -> Optional[RFModule]:
"""取得RF模組"""
return self.rf_module
def __str__(self) -> str:
comp_list = ", ".join([str(cid) for cid in self.components.keys()])
return (f"VehicleView(sysid={self.sysid}, kind={self.kind}, "
f"connected_via={self.connected_via.value}, "
f"components=[{comp_list}], "
f"rf_module={self.rf_module is not None})")
def to_dict(self) -> Dict[str, Any]:
"""轉換為字典格式 (方便序列化/除錯)"""
return {
'meta': {
'sysid': self.sysid,
'kind': self.kind,
'vehicle_type': self.vehicle_type,
'connected_via': self.connected_via.value,
'custom_meta': self.custom_meta
},
'components': {
cid: {
'component_id': comp.component_id,
'type': comp.type.value,
'mav_type': comp.mav_type,
'mav_autopilot': comp.mav_autopilot,
'packet_stats': {
'received': comp.packet_stats.received_count,
'lost': comp.packet_stats.lost_count,
'last_seq': comp.packet_stats.last_seq,
'last_msg_time': comp.packet_stats.last_msg_time
},
'parameters_count': len(comp.parameters)
}
for cid, comp in self.components.items()
},
'rf_module': {
'type': self.rf_module.type.value,
'rssi': self.rf_module.status.rssi,
'socket_connected': self.rf_module.socket_info.connected
} if self.rf_module else None
}
# ====================== Registry =====================
class VehicleViewRegistry:
"""載具視圖註冊表 - 管理所有的 VehicleView"""
def __init__(self):
self._vehicles: Dict[int, VehicleView] = {}
def register(self, sysid: int) -> VehicleView:
"""註冊新的載具視圖"""
if sysid not in self._vehicles:
self._vehicles[sysid] = VehicleView(sysid)
logger.info(f"Registered new VehicleView for system {sysid}")
return self._vehicles[sysid]
def get(self, sysid: int) -> Optional[VehicleView]:
"""取得載具視圖"""
return self._vehicles.get(sysid)
def unregister(self, sysid: int) -> bool:
"""註銷載具視圖"""
if sysid in self._vehicles:
del self._vehicles[sysid]
logger.info(f"Unregistered VehicleView for system {sysid}")
return True
return False
def get_all(self) -> Dict[int, VehicleView]:
"""取得所有載具視圖"""
return self._vehicles.copy()
def clear(self) -> None:
"""清空所有載具視圖"""
self._vehicles.clear()
logger.info("Cleared all VehicleViews")
def __len__(self) -> int:
return len(self._vehicles)
def __contains__(self, sysid: int) -> bool:
return sysid in self._vehicles
# ====================== Global Instance =====================
# 全域註冊表實例
vehicle_registry = VehicleViewRegistry()

@ -0,0 +1,7 @@
"""
共用工具模組
"""
from .ringBuffer import RingBuffer
from .theLogger import setup_logger
__all__ = ['RingBuffer', 'setup_logger']

@ -5,7 +5,7 @@ from logging.handlers import TimedRotatingFileHandler
# 全域 Logger 實例
_global_logger = None
def setup_logger(name: str, log_dir: str = "log", level=logging.DEBUG) -> logging.Logger:
def setup_logger(name: str, log_dir: str = "logs", level=logging.DEBUG) -> logging.Logger:
global _global_logger
if _global_logger is None:

@ -20,6 +20,7 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'mavlink_orchestrator = fc_network_adapter.mainOrchestrator:main',
],
},
)

@ -0,0 +1,260 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
from ..fc_network_adapter import mavlinkObject as mo
from ..fc_network_adapter import mavlinkVehicleView as mvv
# from ..fc_network_adapter import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 10
running_time = 3
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 測試 mavlink_object 放入 ring buffer 的應用
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0..1:14571"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
sock = mavlink_socket1.port
print("Socket port:", sock)
manager.add_mavlink_object(mavlink_object1)
start_time = time.time()
while (time.time() - start_time) < running_time:
items_a = mo.stream_bridge_ring.get_all()
items_b = mo.return_packet_ring.get_all()
try:
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
except IndexError:
print("stream_bridge_ring is empty")
try:
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
except IndexError:
print("return_packet_ring is empty")
time.sleep(1)
manager.shutdown()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14571"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
# 啟動 mavlink_bridge
bridge = mo.mavlink_bridge()
bridge.start()
time.sleep(3)
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
all_vehicles = mvv.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
print(f" System {sysid}: {vehicle}")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 2:
# print("mark B")
show_time = time.time()
for sysid, vehicle in all_vehicles.items():
for compid in vehicle.components:
comp = vehicle.get_component(compid)
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, comp.packet_stats.received_count))
comp.reset_packet_stats()
print("===================")
manager.shutdown()
bridge.stop()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14571"
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
connection_string="udp:127.0.0.1:14571"
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
# 初始化輸出通道
connection_string="udpout:127.0.0.1:14551"
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
manager.add_mavlink_object(mavlink_object_out)
manager.add_mavlink_object(mavlink_object_in1)
manager.add_mavlink_object(mavlink_object_in2)
time.sleep(1) # 等待通道啟動
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
start_time = time.time()
while (time.time() - start_time) < running_time:
time.sleep(1)
manager.shutdown()
print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
# 初始化 rclpy 才能使用 node
rclpy.init()
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
time.sleep(0.5) # 系統 Setup 完成
# 創建通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
manager.add_mavlink_object(mavlink_object3)
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
manager.stop()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 52:
print('===> Start of Program .Test ', test_item)
manager = mo.async_io_manager()
manager.start()
# print(manager.thread.is_alive())
manager.shutdown()
time.sleep(1)
print('manager stopped')

@ -0,0 +1,331 @@
"""
VehicleView 使用範例
展示如何使用純狀態容器來管理 MAVLink 載具資訊
"""
import time
from ..fc_network_adapter.mavlinkVehicleView import (
VehicleView,
VehicleComponent,
RFModule,
vehicle_registry,
ConnectionType,
ComponentType,
RFModuleType
)
def example_basic_usage():
"""基本使用範例"""
print("=== 基本使用範例 ===\n")
# 1. 建立載具視圖
vehicle = VehicleView(sysid=1)
vehicle.kind = "Copter"
vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR
vehicle.connected_via = ConnectionType.UDP
print(f"建立載具: {vehicle}\n")
# 2. 新增 autopilot 組件
autopilot = vehicle.add_component(
component_id=1,
comp_type=ComponentType.AUTOPILOT
)
autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR
autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA
print(f"新增組件: {autopilot}\n")
# 3. 手動餵入位置資訊
autopilot.status.position.latitude = 25.0330
autopilot.status.position.longitude = 121.5654
autopilot.status.position.altitude = 100.5
autopilot.status.position.timestamp = time.time()
print(f"位置: 緯度={autopilot.status.position.latitude}, "
f"經度={autopilot.status.position.longitude}, "
f"高度={autopilot.status.position.altitude}m\n")
# 4. 手動餵入姿態資訊
autopilot.status.attitude.roll = 0.05 # 弧度
autopilot.status.attitude.pitch = -0.02
autopilot.status.attitude.yaw = 1.57
autopilot.status.attitude.timestamp = time.time()
print(f"姿態: Roll={autopilot.status.attitude.roll:.3f}, "
f"Pitch={autopilot.status.attitude.pitch:.3f}, "
f"Yaw={autopilot.status.attitude.yaw:.3f} rad\n")
# 5. 手動餵入飛行模式
autopilot.status.mode.base_mode = 89
autopilot.status.mode.custom_mode = 4
autopilot.status.mode.mode_name = "GUIDED"
autopilot.status.mode.timestamp = time.time()
print(f"飛行模式: {autopilot.status.mode.mode_name}\n")
# 6. 手動餵入電池資訊
autopilot.status.battery.voltage = 12.6
autopilot.status.battery.current = 15.2
autopilot.status.battery.remaining = 75
autopilot.status.battery.timestamp = time.time()
print(f"電池: 電壓={autopilot.status.battery.voltage}V, "
f"電流={autopilot.status.battery.current}A, "
f"剩餘={autopilot.status.battery.remaining}%\n")
def example_packet_tracking():
"""封包追蹤範例"""
print("\n=== 封包追蹤範例 ===\n")
vehicle = VehicleView(sysid=2)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# 模擬接收封包
timestamp = time.time()
# 接收 HEARTBEAT (msg_type=0)
autopilot.update_packet_stats(seq=0, msg_type=0, timestamp=timestamp)
# 接收 ATTITUDE (msg_type=30)
autopilot.update_packet_stats(seq=1, msg_type=30, timestamp=timestamp+0.1)
# 接收 GLOBAL_POSITION_INT (msg_type=33)
autopilot.update_packet_stats(seq=2, msg_type=33, timestamp=timestamp+0.2)
# 模擬封包遺失 (seq 跳過 3, 4, 5)
autopilot.update_packet_stats(seq=6, msg_type=0, timestamp=timestamp+0.3)
stats = autopilot.packet_stats
print(f"封包統計:")
print(f" 接收: {stats.received_count}")
print(f" 遺失: {stats.lost_count}")
print(f" 最後序號: {stats.last_seq}")
print(f" 訊息類型計數: {stats.msg_type_count}\n")
def example_parameters():
"""參數管理範例"""
print("\n=== 參數管理範例 ===\n")
vehicle = VehicleView(sysid=3)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# 手動設定參數 (不會主動下載)
autopilot.set_parameter("ARMING_CHECK", 1)
autopilot.set_parameter("ANGLE_MAX", 4500)
autopilot.set_parameter("WPNAV_SPEED", 500)
print(f"參數數量: {len(autopilot.parameters)}")
print(f"ARMING_CHECK = {autopilot.get_parameter('ARMING_CHECK')}")
print(f"ANGLE_MAX = {autopilot.get_parameter('ANGLE_MAX')}")
print(f"WPNAV_SPEED = {autopilot.get_parameter('WPNAV_SPEED')}\n")
def example_rf_module():
"""RF模組範例"""
print("\n=== RF模組範例 ===\n")
vehicle = VehicleView(sysid=4)
vehicle.connected_via = ConnectionType.SERIAL
# 設定 XBee RF 模組
rf = vehicle.set_rf_module(RFModuleType.XBEE)
# 更新 Socket 資訊
rf.update_socket_info(
ip="192.168.1.100",
port=14550,
local_ip="192.168.1.1",
local_port=14551,
connected=True
)
# 更新 RSSI
rf.update_rssi(rssi=-65, timestamp=time.time())
# 更新 AT 命令回應
rf.update_at_response("OK", timestamp=time.time())
# 自定義狀態
rf.status.custom_status['signal_quality'] = 'excellent'
rf.status.custom_status['packet_error_rate'] = 0.001
print(f"RF模組: {rf}")
print(f"Socket: {rf.socket_info.ip}:{rf.socket_info.port}")
print(f"RSSI: {rf.status.rssi} dBm")
print(f"AT回應: {rf.status.at_response}")
print(f"自定義狀態: {rf.status.custom_status}\n")
def example_multiple_components():
"""多組件範例"""
print("\n=== 多組件範例 ===\n")
vehicle = VehicleView(sysid=5)
vehicle.kind = "Copter with Gimbal"
# Autopilot 組件
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
autopilot.mav_type = 2
autopilot.status.mode.mode_name = "AUTO"
# Gimbal 組件
gimbal = vehicle.add_component(154, ComponentType.GIMBAL)
gimbal.mav_type = 26 # MAV_TYPE_GIMBAL
gimbal.status.attitude.pitch = -0.785 # 向下45度
gimbal.status.attitude.yaw = 0.0
# Camera 組件
camera = vehicle.add_component(100, ComponentType.CAMERA)
camera.mav_type = 30 # MAV_TYPE_CAMERA
camera.status.custom_status['recording'] = True
camera.status.custom_status['photo_interval'] = 2.0
print(f"載具: {vehicle}")
print(f"組件數量: {len(vehicle.components)}")
for cid, comp in vehicle.components.items():
print(f" 組件 {cid}: {comp.type.value}, MAV_TYPE={comp.mav_type}")
print()
def example_registry():
"""註冊表使用範例"""
print("\n=== 註冊表使用範例 ===\n")
# 註冊多個載具
v1 = vehicle_registry.register(sysid=1)
v1.kind = "Copter-1"
v1.add_component(1, ComponentType.AUTOPILOT)
v2 = vehicle_registry.register(sysid=2)
v2.kind = "Plane-1"
v2.add_component(1, ComponentType.AUTOPILOT)
v3 = vehicle_registry.register(sysid=3)
v3.kind = "Rover-1"
v3.add_component(1, ComponentType.AUTOPILOT)
print(f"註冊表中的載具數量: {len(vehicle_registry)}")
# 取得所有載具
all_vehicles = vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
print(f" System {sysid}: {vehicle.kind}")
# 檢查載具是否存在
print(f"\nSystem 2 存在? {2 in vehicle_registry}")
print(f"System 99 存在? {99 in vehicle_registry}")
# 取得特定載具
vehicle = vehicle_registry.get(2)
if vehicle:
print(f"\n取得載具: {vehicle}")
# 註銷載具
vehicle_registry.unregister(3)
print(f"\n註銷 System 3 後,剩餘載具: {len(vehicle_registry)}\n")
def example_serialization():
"""序列化範例 (除錯/日誌用)"""
print("\n=== 序列化範例 ===\n")
vehicle = VehicleView(sysid=10)
vehicle.kind = "Test Copter"
vehicle.connected_via = ConnectionType.UDP
vehicle.custom_meta['firmware'] = 'ArduCopter 4.3.0'
vehicle.custom_meta['frame_type'] = 'X'
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
autopilot.mav_type = 2
autopilot.status.position.altitude = 50.0
autopilot.status.battery.voltage = 12.4
autopilot.update_packet_stats(0, 0, time.time())
autopilot.update_packet_stats(1, 30, time.time())
rf = vehicle.set_rf_module(RFModuleType.UDP)
rf.update_rssi(-70)
rf.update_socket_info(ip="192.168.1.200", port=14550, connected=True)
# 轉換為字典
data = vehicle.to_dict()
print("載具資料 (字典格式):")
import json
print(json.dumps(data, indent=2, ensure_ascii=False))
def example_gps_ekf():
"""GPS 與 EKF 範例"""
print("\n\n=== GPS 與 EKF 範例 ===\n")
vehicle = VehicleView(sysid=11)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# GPS 資訊
autopilot.status.gps.fix_type = 3 # 3D Fix
autopilot.status.gps.satellites_visible = 12
autopilot.status.gps.eph = 120 # HDOP = 1.2
autopilot.status.gps.epv = 180 # VDOP = 1.8
autopilot.status.gps.timestamp = time.time()
print(f"GPS:")
print(f" Fix Type: {autopilot.status.gps.fix_type}")
print(f" 衛星數: {autopilot.status.gps.satellites_visible}")
print(f" HDOP: {autopilot.status.gps.eph/100}")
# EKF 資訊
autopilot.status.ekf.flags = 0x1FF # 所有 flags 都 OK
autopilot.status.ekf.velocity_variance = 0.5
autopilot.status.ekf.pos_horiz_variance = 1.2
autopilot.status.ekf.pos_vert_variance = 2.0
autopilot.status.ekf.timestamp = time.time()
print(f"\nEKF:")
print(f" Flags: 0x{autopilot.status.ekf.flags:X}")
print(f" 速度變異: {autopilot.status.ekf.velocity_variance}")
print(f" 水平位置變異: {autopilot.status.ekf.pos_horiz_variance}")
print(f" 垂直位置變異: {autopilot.status.ekf.pos_vert_variance}\n")
def example_vfr_hud():
"""VFR HUD 範例"""
print("\n=== VFR HUD 範例 ===\n")
vehicle = VehicleView(sysid=12)
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
# VFR HUD 資訊
autopilot.status.vfr.airspeed = 15.5 # m/s
autopilot.status.vfr.groundspeed = 14.8 # m/s
autopilot.status.vfr.heading = 90 # 東方
autopilot.status.vfr.throttle = 65 # %
autopilot.status.vfr.climb = 2.5 # m/s
autopilot.status.vfr.timestamp = time.time()
print(f"VFR HUD:")
print(f" 空速: {autopilot.status.vfr.airspeed} m/s")
print(f" 地速: {autopilot.status.vfr.groundspeed} m/s")
print(f" 航向: {autopilot.status.vfr.heading}°")
print(f" 油門: {autopilot.status.vfr.throttle}%")
print(f" 爬升率: {autopilot.status.vfr.climb} m/s\n")
if __name__ == "__main__":
# 執行所有範例
# example_basic_usage()
# example_packet_tracking()
# example_parameters()
# example_rf_module()
# example_multiple_components()
# example_registry()
# example_serialization()
# example_gps_ekf()
example_vfr_hud()
print("\n" + "="*50)
print("所有範例執行完成!")
print("="*50)

@ -0,0 +1,152 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
from ..fc_network_adapter.utils import RingBuffer
def producer(buffer, count, interval=0.01):
"""生產者:向緩衝區添加資料"""
print(f"Producer started (thread {threading.get_ident()})")
for i in range(count):
# 嘗試寫入數據,直到成功
while not buffer.put(f"Item-{i}"):
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
time.sleep(0.1)
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
time.sleep(interval) # 模擬生產過程
print(f"Producer finished (thread {threading.get_ident()})")
def consumer(buffer, max_items, interval=0.05):
"""消費者:從緩衝區讀取資料"""
print(f"Consumer started (thread {threading.get_ident()})")
items_consumed = 0
while items_consumed < max_items:
# 嘗試讀取數據
item = buffer.get()
if item:
print(f"Consumed: {item}, buffer size: {buffer.size()}")
items_consumed += 1
else:
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
time.sleep(interval) # 模擬消費過程
print(f"Consumer finished (thread {threading.get_ident()})")
def batch_consumer(buffer, interval=0.2):
"""批量消費者:一次性讀取緩衝區中的所有資料"""
print(f"Batch consumer started (thread {threading.get_ident()})")
for _ in range(5): # 執行5次批量讀取
time.sleep(interval) # 等待緩衝區積累數據
items = buffer.get_all()
if items:
print(f"Batch consumed {len(items)} items: {items}")
else:
print("Buffer empty for batch consumer")
print(f"Batch consumer finished (thread {threading.get_ident()})")
def demonstrate_multi_writer():
"""示範多個寫入執行緒同時操作緩衝區"""
print("\n=== Demonstrating Multiple Writers ===")
buffer = RingBuffer(capacity=80)
# 創建多個生產者執行緒
threads = []
for i in range(3):
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
buffer.print_stats() # 印出統計資訊
# 讀出所有剩餘資料
remaining = buffer.get_all()
print(f"Remaining items in buffer after multiple writers: {remaining}")
def demonstrate_basic_usage():
"""示範基本使用方式"""
print("\n=== Demonstrating Basic Usage ===")
# 創建緩衝區
buffer = RingBuffer(capacity=20, buffer_id=7)
# 檢查初始狀態
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 添加幾個項目
for i in range(5):
buffer.put(f"Test-{i}")
# 檢查狀態
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 讀取一個項目
item = buffer.get()
print(f"Read item: {item}")
print(f"After reading 1 item - Content Size: {buffer.size()}")
# 添加更多項目直到滿
items_added = 0
while not buffer.is_full():
buffer.put(f"Fill-{items_added}")
items_added += 1
print(f"Added {items_added} more items until full")
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 嘗試添加到已滿的緩衝區
result = buffer.put("Overflow")
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
# 獲取所有項目
all_items = buffer.get_all()
print(f"All items in buffer: {all_items}")
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
# 印出統計資訊
buffer.print_stats()
def demonstrate_producer_consumer():
"""示範生產者-消費者模式"""
print("\n=== Demonstrating Producer-Consumer Pattern ===")
buffer = RingBuffer(capacity=16)
# 創建生產者和消費者執行緒
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
# 啟動執行緒
producer_thread.start()
consumer_thread.start()
batch_thread.start()
# 等待執行緒完成
producer_thread.join()
consumer_thread.join()
batch_thread.join()
# 檢查最終狀態
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
buffer.print_stats()
if __name__ == "__main__":
# 展示各種使用場景
# demonstrate_basic_usage()
# demonstrate_producer_consumer()
demonstrate_multi_writer()
print("\nAll demonstrations completed!")

@ -1,450 +0,0 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
from fc_network_adapter import mavlinkObject as mo
from fc_network_adapter import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 12
running_time = 10000
print('test_item : ', test_item)
'''
測試項 個位數 表示分離的功能
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
'''
if test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 測試 mavlink_object 放入 ring buffer 的應用
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
start_time = time.time()
while (time.time() - start_time) < running_time:
items_a = mo.stream_bridge_ring.get_all()
items_b = mo.return_packet_ring.get_all()
try:
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
except IndexError:
print("stream_bridge_ring is empty")
try:
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
except IndexError:
print("return_packet_ring is empty")
time.sleep(1)
manager.stop()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
manager.add_mavlink_object(mavlink_object1)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
time.sleep(3)
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 2:
# print("mark B")
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
manager.stop()
analyzer.stop()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 初始化輸入通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
connection_string="udp:127.0.0.1:14561"
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
# 初始化輸出通道
connection_string="udpout:127.0.0.1:14550"
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
manager.add_mavlink_object(mavlink_object_out)
manager.add_mavlink_object(mavlink_object_in1)
manager.add_mavlink_object(mavlink_object_in2)
time.sleep(1) # 等待通道啟動
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
start_time = time.time()
while (time.time() - start_time) < running_time:
time.sleep(1)
manager.stop()
print('<=== End of Program')
# elif test_item == 20:
# # 這邊測試 node 生成 topic 的功能
# # 利用 空的通道 發出假的 heartbeat 封包
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# from pymavlink.dialects.v20 import common as mavlink2
# sysid = 1
# compid = 1
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# mav = mavlink2.MAVLink(None)
# mav.srcSystem = sysid
# mav.srcComponent = compid
# # 這是一個 heartbeat 封包
# fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
# type=mavlink2.MAV_TYPE_QUADROTOR,
# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
# base_mode=3,
# custom_mode=0,
# system_status=0,
# mavlink_version=3
# )
# mavlink_bytes = fake_heartbeat.pack(mav)
# fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
# print(analyzer.mavlink_systems)
# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
# time.sleep(0.1)
# print(analyzer.mavlink_systems)
# # ROS2 初始化
# analyzer._init_node()
# # 創建 ROS2 topic
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
# print("topic created")
# time.sleep(5)
# # 丟出 topic
# analyzer.emit_info()
# # 結束程式
# analyzer.running = False
# analyzer.destroy_node()
# rclpy.shutdown()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
# 初始化 rclpy 才能使用 node
rclpy.init()
# 清空 ring buffer
mo.stream_bridge_ring.clear()
mo.return_packet_ring.clear()
manager = mo.async_io_manager()
manager.start()
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
time.sleep(0.5) # 系統 Setup 完成
# 創建通道
connection_string="udp:127.0.0.1:14560"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
manager.add_mavlink_object(mavlink_object3)
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
manager.stop()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
# elif test_item == 22:
# # 需要開啟一個 ardupilot 的模擬器 與 GCS
# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 初始化兩個通道
# connection_string_in="udp:127.0.0.1:15551"
# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
# mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# connection_string_out="udpout:127.0.0.1:14553"
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
# mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
# mavlink_object_out.multiplexingToAnalysis = []
# # 讓兩個通道互相傳輸
# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# # 啟動通道
# mavlink_object_in.run()
# mavlink_object_out.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# print('目前所有的系統 : ')
# for sysid in analyzer.mavlink_systems:
# print(analyzer.mavlink_systems[sysid])
# compid = 1
# sysid = 1
# show_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# show_time2 = time.time()
# while time.time() - start_time < running_time:
# if (time.time() - show_time2) >= 1:
# analyzer.emit_info() # 這邊是測試 node 的運行
# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
# sss = mavutil.mode_string_v10(fmsg)
# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
# print("目前的飛行模式 : {}".format(sss))
# show_time2 = time.time()
# # if (time.time() - show_time) >= 2:
# # show_time = time.time()
# # for sysid in analyzer.mavlink_systems:
# # for compid in analyzer.mavlink_systems[sysid].components:
# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# # print("===================")
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object_in.stop()
# mavlink_object_in.thread.join()
# mavlink_socket_in.close()
# mavlink_object_out.stop()
# mavlink_object_out.thread.join()
# mavlink_socket_out.close()
# analyzer.stop()
# analyzer.thread.join()
# print('<=== End of Program')
# elif test_item == 51:
# # 晉凱的測試項目
# print('===> Start of Program .Test ', test_item)
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
# # 啟動 mavlink_bridge
# analyzer = mo.mavlink_bridge()
# # 關於 Node 的初始化
# show_time = time.time()
# analyzer._init_node() # 初始化 node
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
# # 創建通道
# connection_string="udp:127.0.0.1:15551"
# mavlink_socket3 = mavutil.mavlink_connection(connection_string)
# mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# # 設定通道流動
# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
# mavlink_object3.multiplexingToReturn = [] #
# # mavlink_object3.multiplexingToSwap = [] #
# # 啟動通道
# mavlink_object3.run()
# print('waiting for mavlink data ...')
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
# compid = 1
# sysid = 1
# start_time = time.time()
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
# end_time = time.time()
# print(f"Execution time for create all topic: {end_time - start_time} seconds")
# print("start emit info")
# start_time = time.time()
# show_time = time.time()
# while time.time() - start_time < running_time:
# try:
# # rclpy.spin(analyzer)
# analyzer.emit_info() # 這邊是測試 node 的運行
# time.sleep(0.5)
# except KeyboardInterrupt:
# break
# analyzer.destroy_node()
# rclpy.shutdown()
# # 結束程式 退出所有 thread
# mavlink_object3.stop()
# mavlink_object3.thread.join()
# analyzer.stop()
# analyzer.thread.join()
# mavlink_socket3.close()
# print('<=== End of Program')

@ -13,10 +13,9 @@ import time
import threading
import socket
import asyncio
from unittest.mock import MagicMock, patch
# 導入要測試的模組
from fc_network_adapter.mavlinkObject import (
from ..fc_network_adapter.mavlinkObject import (
mavlink_object,
async_io_manager,
MavlinkObjectState,
@ -24,62 +23,100 @@ from fc_network_adapter.mavlinkObject import (
return_packet_ring
)
# 預先定義好的真實 MAVLink heartbeat 封包 (MAVLink 1.0 格式)
# Format: STX(0xFE) + LEN + SEQ + SYS + COMP + MSG_ID + PAYLOAD(9 bytes for heartbeat) + CRC(2 bytes)
HEARTBEAT_PACKET_1 = bytes([
0xFE, # STX (MAVLink 1.0)
0x09, # payload length (9 bytes)
0x00, # sequence
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
# Payload (9 bytes): custom_mode(4), type(1), autopilot(1), base_mode(1), system_status(1), mavlink_version(1)
0x00, 0x00, 0x00, 0x00, # custom_mode = 0
0x02, # type = MAV_TYPE_QUADROTOR (2)
0x03, # autopilot = MAV_AUTOPILOT_ARDUPILOTMEGA (3)
0x40, # base_mode = MAV_MODE_FLAG_CUSTOM_MODE_ENABLED (64)
0x03, # system_status = MAV_STATE_STANDBY (3)
0x03, # mavlink_version = 3
0x62, 0x8E # CRC (simplified placeholder)
])
HEARTBEAT_PACKET_2 = bytes([
0xFE, # STX
0x09, # payload length
0x01, # sequence (增加)
0x01, # system ID = 1
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x41, 0x03, 0x03,
0x33, 0xEC
])
HEARTBEAT_PACKET_3 = bytes([
0xFE, # STX
0x09, # payload length
0x02, # sequence
0x02, # system ID = 2
0x01, # component ID = 1
0x00, # message ID (HEARTBEAT = 0)
0x00, 0x00, 0x00, 0x00,
0x02, 0x03, 0x42, 0x03, 0x03,
0x37, 0x44
])
class MockMavlinkSocket:
"""模擬 Mavlink Socket 的類別,用於測試"""
"""模擬 Mavlink Socket 的類別,用於測試
使用真實的 MAVLink 封包而不是模擬的訊息對象
"""
def __init__(self, test_data=None):
def __init__(self, test_packets=None):
"""
Args:
test_packets: list of bytes每個元素都是完整的 MAVLink 封包
"""
self.closed = False
self.test_data = test_data or []
self.data_index = 0
self.test_packets = test_packets or []
self.packet_index = 0
self.written_data = []
# 使用 pymavlink 來解析封包
from pymavlink import mavutil
self.mav_parser = mavutil.mavlink.MAVLink(self)
def recv_msg(self):
if not self.test_data or self.data_index >= len(self.test_data):
"""返回解析後的 MAVLink 訊息對象"""
if not self.test_packets or self.packet_index >= len(self.test_packets):
return None
packet = self.test_packets[self.packet_index]
self.packet_index += 1
# 使用 pymavlink 解析封包
try:
for byte in packet:
msg = self.mav_parser.parse_char(bytes([byte]))
if msg:
return msg
except Exception as e:
print(f"Error parsing packet: {e}")
return None
data = self.test_data[self.data_index]
self.data_index += 1
return data
return None
def write(self, data):
"""寫入數據(用於檢查轉發)"""
self.written_data.append(data)
def close(self):
"""關閉 socket"""
self.closed = True
class MockMavlinkMessage:
"""模擬 Mavlink 訊息的類別,用於測試"""
def __init__(self, msg_id, sysid, compid, seq=0):
self.msg_id = msg_id
self.sysid = sysid
self.compid = compid
self.seq = seq
self.msg_buf = bytes([msg_id, sysid, compid, seq])
def get_msgId(self):
return self.msg_id
def get_srcSystem(self):
return self.sysid
def get_srcComponent(self):
return self.compid
def get_seq(self):
return self.seq
def get_msgbuf(self):
return self.msg_buf
def get_type(self):
return f"MSG_ID_{self.msg_id}"
class TestMavlinkObject(unittest.TestCase):
"""測試 mavlink_object 類別"""
"""測試 mavlink_object 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
@ -91,35 +128,27 @@ class TestMavlinkObject(unittest.TestCase):
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建測試訊息
self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1)
self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1)
# 創建模擬的 socket # 假的 Mavlink Socket
self.mock_socket = MockMavlinkSocket([
self.heartbeat_msg,
self.attitude_msg,
self.position_msg
])
# 創建模擬的 socket使用真實封包
self.mock_socket = MockMavlinkSocket([HEARTBEAT_PACKET_1])
# 創建測試對象
self.mavlink_obj = mavlink_object(self.mock_socket)
def test_initialization(self):
"""測試 mavlink_object 初始化是否正確"""
# print("Testing mavlink_object initialization")
self.assertEqual(self.mavlink_obj.socket_id, 0)
self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT)
self.assertEqual(len(self.mavlink_obj.target_sockets), 0)
self.assertEqual(self.mavlink_obj.bridge_msg_types, [0])
self.assertEqual(self.mavlink_obj.return_msg_types, [])
def test_add_remove_target_socket(self):
"""測試添加和移除目標端口功能"""
# 添加目標端口
self.assertTrue(self.mavlink_obj.add_target_socket(1))
self.assertEqual(len(self.mavlink_obj.target_sockets), 1)
self.assertEqual(self.mavlink_obj.target_sockets[0], 1)
self.assertTrue(self.mavlink_obj.add_target_socket(2))
self.assertEqual(len(self.mavlink_obj.target_sockets), 2)
self.assertIn(2, self.mavlink_obj.target_sockets)
@ -153,78 +182,31 @@ class TestMavlinkObject(unittest.TestCase):
self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid"))
self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"]))
def test_send_message(self):
"""測試 send_message 功能"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
# 準備測試數據
test_message1 = b"test_message_1"
test_message2 = b"test_message_2"
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
def test_send_message_validation(self):
"""測試 send_message 的數據驗證功能(不需要啟動 manager"""
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
self.assertFalse(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], test_message1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(test_message2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], test_message2)
# 模擬發送出錯的情況
class ErrorWriteSocket(MockMavlinkSocket):
def write(self, data):
raise Exception("Write error")
error_socket = ErrorWriteSocket()
error_obj = mavlink_object(error_socket)
manager.add_mavlink_object(error_obj)
time.sleep(0.1) # 等待對象啟動
# 測試無效的數據類型
self.mavlink_obj.state = MavlinkObjectState.RUNNING # 臨時設置狀態
self.assertFalse(self.mavlink_obj.send_message("invalid"))
self.assertFalse(self.mavlink_obj.send_message(123))
# 發送消息到錯誤的 socket
self.assertTrue(error_obj.send_message(test_message1))
time.sleep(0.2) # 等待消息處理
# 測試太短的封包
self.assertFalse(self.mavlink_obj.send_message(bytes([0xFE, 0x00])))
# 即使寫入失敗send_message 應該也返回 True
# 因為消息已成功加入到佇列中,只是後續的實際發送失敗
# 測試無效的起始標記
invalid_packet = bytes([0xFF, 0x09, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00])
self.assertFalse(self.mavlink_obj.send_message(invalid_packet))
# 停止 manager
manager.stop()
time.sleep(0.5) # 等待 manager 停止
# 測試有效的封包可以加入佇列
self.assertTrue(self.mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(self.mavlink_obj.outgoing_msgs), 1)
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(test_message1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
self.mavlink_obj.state = MavlinkObjectState.INIT # 恢復狀態
class TestAsyncIOManager(unittest.TestCase):
"""測試 async_io_manager 類別"""
"""測試 async_io_manager 類別的獨立功能"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
@ -239,14 +221,9 @@ class TestAsyncIOManager(unittest.TestCase):
# 創建 async_io_manager 實例
self.manager = async_io_manager()
# 模擬 mavlink 對象
self.mock_socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
self.mock_socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1)
])
# 創建模擬 mavlink 對象,使用真實封包
self.mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2])
self.mock_socket2 = MockMavlinkSocket([HEARTBEAT_PACKET_3])
self.mavlink_obj1 = mavlink_object(self.mock_socket1)
self.mavlink_obj2 = mavlink_object(self.mock_socket2)
@ -254,10 +231,10 @@ class TestAsyncIOManager(unittest.TestCase):
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.stop()
self.manager.shutdown()
def test_singleton_pattern(self):
"""測試 async_io_manager 的單例模式 就是不會產生兩個 magager"""
"""測試 async_io_manager 的單例模式"""
manager1 = async_io_manager()
manager2 = async_io_manager()
self.assertIs(manager1, manager2)
@ -275,7 +252,7 @@ class TestAsyncIOManager(unittest.TestCase):
self.assertIs(self.manager.thread, old_thread)
# 停止管理器
self.manager.stop()
self.manager.shutdown()
self.assertFalse(self.manager.running)
# 最多等待 5 秒讓線程結束
@ -313,156 +290,179 @@ class TestAsyncIOManager(unittest.TestCase):
self.assertFalse(self.manager.remove_mavlink_object(999))
# 停止管理器
self.manager.stop()
self.manager.shutdown()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動與資料流"""
def setUp(self):
"""在每個測試方法執行前準備環境"""
# 清空全局變數
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
# 清空 ring buffer
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建 async_io_manager 實例
self.manager = async_io_manager()
def tearDown(self):
"""在每個測試方法執行後清理環境"""
if self.manager.running:
self.manager.shutdown()
def test_send_message_with_manager(self):
"""測試透過 async_io_manager 發送訊息的完整流程"""
# 創建一個新的 mavlink_object 實例
mock_socket = MockMavlinkSocket()
mavlink_obj = mavlink_object(mock_socket)
def test_data_processing(self):
"""測試數據處理"""
# 測試初始狀態
self.assertEqual(len(mock_socket.written_data), 0)
# 測試非運行狀態下發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 0)
# 啟動 manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加對象到 manager
self.manager.add_mavlink_object(mavlink_obj)
time.sleep(0.1) # 等待對象啟動
# 確認對象狀態
self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING)
# 測試發送消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
time.sleep(0.2) # 等待消息處理
# 確認消息已發送
self.assertEqual(len(mock_socket.written_data), 1)
self.assertEqual(mock_socket.written_data[0], HEARTBEAT_PACKET_1)
# 測試連續發送多條消息
self.assertTrue(mavlink_obj.send_message(HEARTBEAT_PACKET_2))
time.sleep(0.2) # 等待消息處理
# 確認兩條消息都已發送
self.assertEqual(len(mock_socket.written_data), 2)
self.assertEqual(mock_socket.written_data[1], HEARTBEAT_PACKET_2)
# 停止 manager
self.manager.shutdown()
time.sleep(0.5) # 等待 manager 停止
# 測試對象已關閉後發送消息
self.assertFalse(mavlink_obj.send_message(HEARTBEAT_PACKET_1))
self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加
def test_data_processing_and_forwarding(self):
"""測試數據處理與轉發流程"""
# 創建用於轉發的 mavlink_objects
mock_socket1 = MockMavlinkSocket([HEARTBEAT_PACKET_1, HEARTBEAT_PACKET_2,])
mock_socket3 = MockMavlinkSocket()
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置轉發
self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3
mavlink_obj1 = mavlink_object(mock_socket1)
mavlink_obj3 = mavlink_object(mock_socket3)
# 設置訊息類型
self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE
# self.mavlink_obj1.enable_return(True)
self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE
mavlink_obj1.set_bridge_message_types([0]) # 只處理 HEARTBEAT
# 設置轉發: obj1 -> obj3
mavlink_obj1.add_target_socket(mavlink_obj3.socket_id) # socket1 轉發到 socket3 (socket_id=1)
# 啟動管理器並添加對象
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
self.manager.add_mavlink_object(self.mavlink_obj1)
"""
這邊出現很奇怪的狀況 應該說 設計時沒有考量 但是實測會發現
mavlink_obj3 是接收端 必需要被優先加入 manager 才能正確接收來自 mavlink_obj1 的轉發封包
若先把 mavlink_ojb1 加入 manger 則可能會導致前面幾個封包丟失
"""
self.manager.add_mavlink_object(mavlink_obj3)
self.manager.add_mavlink_object(mavlink_obj1)
# 等待處理完成
time.sleep(1.0)
time.sleep(0.5)
# print("testing Mark A")
# 檢查 Ring buffer 是否有正確的數據
self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE
self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE
self.assertGreaterEqual(stream_bridge_ring.size(), 2) # 至少 2 個 HEARTBEAT
a = stream_bridge_ring.get()
print(f"stream_bridge_ring: {a}")
# 檢查是否正確轉發
self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE
# print("testing Mark B")
# 停止管理器
self.manager.stop()
def test_error_handling(self):
"""測試錯誤處理情況"""
print("=== mark A ===")
# 創建一個會引發異常的 socket
class ErrorSocket(MockMavlinkSocket):
def recv_msg(self):
raise Exception("Test exception")
self.assertGreaterEqual(len(mock_socket3.written_data), 2) # 至少 2 個 HEARTBEAT
error_socket = ErrorSocket()
mavlink_obj_err = mavlink_object(error_socket)
# 停止管理器
self.manager.shutdown()
# 啟動管理器並添加對象
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket()
socket2 = MockMavlinkSocket()
socket3 = MockMavlinkSocket()
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
self.manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
self.manager.add_mavlink_object(obj1)
self.manager.add_mavlink_object(obj2)
self.manager.add_mavlink_object(obj3)
self.manager.add_mavlink_object(mavlink_obj_err)
print("=== mark B ===")
# 等待錯誤處理
# 對三個對象添加數據
socket1.test_packets.append(HEARTBEAT_PACKET_1)
socket2.test_packets.append(HEARTBEAT_PACKET_2)
socket3.test_packets.append(HEARTBEAT_PACKET_3)
# 等待處理所有訊息
time.sleep(1.0)
# 對象應該進入錯誤狀態,但不會崩潰
# self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR)
print("=== mark C ===")
# 管理器應該仍在運行
self.assertTrue(self.manager.running)
# 故意等一段時間 確認 socket 有被 manager 處理掉
time.sleep(3)
# 檢查轉發結果
# socket1 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket1.written_data), 1)
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertGreaterEqual(len(socket2.written_data), 2)
# socket3 應該收到 socket2 的訊息
self.assertGreaterEqual(len(socket3.written_data), 1)
# 檢查 ring buffer 的數據
# 所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
self.assertGreaterEqual(stream_bridge_ring.size(), 3) # 至少 3 個 HEARTBEAT
# 停止管理器
self.manager.stop()
class TestIntegration(unittest.TestCase):
"""整合測試,測試多個 mavlink_object 之間的互動"""
def test_bidirectional_forwarding(self):
"""測試雙向轉發"""
# 清空全局變數和 ring buffer
mavlink_object.mavlinkObjects = {}
mavlink_object.socket_num = 0
stream_bridge_ring.clear()
return_packet_ring.clear()
# 創建三個 mavlink 對象,模擬三個通道
socket1 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=1, compid=1),
MockMavlinkMessage(msg_id=30, sysid=1, compid=1)
])
socket2 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=2, compid=1),
MockMavlinkMessage(msg_id=32, sysid=2, compid=1)
])
socket3 = MockMavlinkSocket([
MockMavlinkMessage(msg_id=0, sysid=3, compid=1),
MockMavlinkMessage(msg_id=33, sysid=3, compid=1)
])
obj1 = mavlink_object(socket1)
obj2 = mavlink_object(socket2)
obj3 = mavlink_object(socket3)
# 設置雙向轉發
# obj1 <-> obj2 <-> obj3
obj1.add_target_socket(1) # obj1 -> obj2
obj2.add_target_socket(0) # obj2 -> obj1
obj2.add_target_socket(2) # obj2 -> obj3
obj3.add_target_socket(1) # obj3 -> obj2
# 啟動 async_io_manager
manager = async_io_manager()
manager.start()
time.sleep(0.5) # 等待事件循環啟動
# 添加所有 mavlink_object
manager.add_mavlink_object(obj1)
manager.add_mavlink_object(obj2)
manager.add_mavlink_object(obj3)
# 等待處理所有訊息
time.sleep(1.5)
# 檢查轉發結果
# socket1 應該收到 socket2 和 socket3 的訊息
self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2
# socket2 應該收到 socket1 和 socket3 的訊息
self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3
# socket3 應該收到 socket1 和 socket2 的訊息
self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2
# 檢查 ring buffer 的數據
# 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0]
# 應該有 3 個 HEARTBEAT 訊息
self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT
# 停止管理器
manager.stop()
self.manager.shutdown()
if __name__ == "__main__":
unittest.main(defaultTest="TestMavlinkObject.test_send_message")
# 可以指定要運行的測試
# unittest.main(defaultTest="TestMavlinkObject.test_send_message_validation")
# unittest.main(defaultTest="TestAsyncIOManager.test_add_remove_objects")
unittest.main(defaultTest="TestIntegration.test_bidirectional_forwarding")
unittest.main()

@ -1,152 +1,296 @@
import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
#!/usr/bin/env python
"""
測試 RingBuffer 類別的功能
"""
import time
import unittest
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from fc_network_adapter.ringBuffer import RingBuffer
def producer(buffer, count, interval=0.01):
"""生產者:向緩衝區添加資料"""
print(f"Producer started (thread {threading.get_ident()})")
for i in range(count):
# 嘗試寫入數據,直到成功
while not buffer.put(f"Item-{i}"):
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
time.sleep(0.1)
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
time.sleep(interval) # 模擬生產過程
print(f"Producer finished (thread {threading.get_ident()})")
from ..fc_network_adapter.utils import RingBuffer
def consumer(buffer, max_items, interval=0.05):
"""消費者:從緩衝區讀取資料"""
print(f"Consumer started (thread {threading.get_ident()})")
items_consumed = 0
while items_consumed < max_items:
# 嘗試讀取數據
item = buffer.get()
if item:
print(f"Consumed: {item}, buffer size: {buffer.size()}")
items_consumed += 1
else:
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
time.sleep(interval) # 模擬消費過程
print(f"Consumer finished (thread {threading.get_ident()})")
class TestRingBuffer(unittest.TestCase):
"""測試 RingBuffer 基本功能"""
def setUp(self):
"""每個測試前的準備"""
self.buffer = RingBuffer(capacity=8)
def test_initialization(self):
"""測試初始化"""
self.assertEqual(self.buffer.capacity, 8)
self.assertTrue(self.buffer.is_empty())
self.assertFalse(self.buffer.is_full())
self.assertEqual(self.buffer.size(), 0)
def test_put_get_basic(self):
"""測試基本的放入和取出"""
# 測試放入
self.assertTrue(self.buffer.put("item1"))
self.assertTrue(self.buffer.put("item2"))
self.assertEqual(self.buffer.size(), 2)
self.assertFalse(self.buffer.is_empty())
# 測試取出
item1 = self.buffer.get()
self.assertEqual(item1, "item1")
self.assertEqual(self.buffer.size(), 1)
item2 = self.buffer.get()
self.assertEqual(item2, "item2")
self.assertTrue(self.buffer.is_empty())
# 空緩衝區取出應返回 None
self.assertIsNone(self.buffer.get())
def test_capacity_overflow(self):
"""測試緩衝區容量限制"""
# 填滿緩衝區 (容量-1因為需要預留一個位置)
for i in range(7): # 8-1=7
self.assertTrue(self.buffer.put(f"item{i}"))
self.assertTrue(self.buffer.is_full())
# 嘗試再放入一個應該失敗
self.assertFalse(self.buffer.put("overflow"))
self.assertEqual(self.buffer.overflow_count.value, 1)
def test_get_all(self):
"""測試取出所有項目"""
items = ["a", "b", "c", "d"]
for item in items:
self.buffer.put(item)
all_items = self.buffer.get_all()
self.assertEqual(all_items, items)
self.assertTrue(self.buffer.is_empty())
def test_clear(self):
"""測試清空緩衝區"""
for i in range(5):
self.buffer.put(f"item{i}")
self.buffer.clear()
self.assertTrue(self.buffer.is_empty())
self.assertEqual(self.buffer.size(), 0)
def batch_consumer(buffer, interval=0.2):
"""批量消費者:一次性讀取緩衝區中的所有資料"""
print(f"Batch consumer started (thread {threading.get_ident()})")
for _ in range(5): # 執行5次批量讀取
time.sleep(interval) # 等待緩衝區積累數據
items = buffer.get_all()
if items:
print(f"Batch consumed {len(items)} items: {items}")
else:
print("Buffer empty for batch consumer")
print(f"Batch consumer finished (thread {threading.get_ident()})")
def demonstrate_multi_writer():
"""示範多個寫入執行緒同時操作緩衝區"""
print("\n=== Demonstrating Multiple Writers ===")
buffer = RingBuffer(capacity=80)
# 創建多個生產者執行緒
threads = []
for i in range(3):
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
class TestRingBufferThreadSafety(unittest.TestCase):
"""測試 RingBuffer 的線程安全性"""
def setUp(self):
"""每個測試前的準備"""
self.buffer = RingBuffer(capacity=256)
self.results = []
self.write_count = 1000
self.read_count = 1000
def test_concurrent_producers_consumers(self):
"""測試多生產者多消費者場景"""
self.results = []
stats = self.buffer.get_stats()
self.assertEqual(stats['total_writes'], 0)
def producer(producer_id, count):
"""生產者函數"""
for i in range(count):
item = f"producer_{producer_id}_item_{i}"
while not self.buffer.put(item):
time.sleep(0.001) # 緩衝區滿時稍微等待
def consumer(consumer_id, count):
"""消費者函數"""
items = []
for _ in range(count):
item = None
while item is None:
item = self.buffer.get()
if item is None:
time.sleep(0.001) # 緩衝區空時稍微等待
items.append(item)
self.results.extend(items)
# 創建多個生產者和消費者
with ThreadPoolExecutor(max_workers=8) as executor:
# 2 個生產者,每個寫入 500 個項目
producer_futures = [
executor.submit(producer, 0, 500),
executor.submit(producer, 1, 500)
]
# 2 個消費者,每個讀取 500 個項目
consumer_futures = [
executor.submit(consumer, 0, 500),
executor.submit(consumer, 1, 500)
]
# 等待所有任務完成
for future in producer_futures + consumer_futures:
future.result()
# 驗證結果
self.assertEqual(len(self.results), 1000)
self.assertTrue(self.buffer.is_empty())
# 檢查統計數據
stats = self.buffer.get_stats()
self.assertEqual(stats['total_writes'], 1000)
self.assertGreater(stats['total_reads'], 1000) # 包含失敗的讀取嘗試
self.assertGreater(stats['write_threads'], 1)
self.assertGreater(stats['read_threads'], 1)
def test_high_throughput(self):
"""測試高吞吐量場景"""
items_per_thread = 10000
num_threads = 4
def writer():
for i in range(items_per_thread):
while not self.buffer.put(i):
pass # 忙等待
def reader():
items = []
for _ in range(items_per_thread):
item = None
while item is None:
item = self.buffer.get()
items.append(item)
self.results.extend(items)
start_time = time.time()
with ThreadPoolExecutor(max_workers=num_threads * 2) as executor:
# 啟動寫入線程
writer_futures = [executor.submit(writer) for _ in range(num_threads)]
# 啟動讀取線程
reader_futures = [executor.submit(reader) for _ in range(num_threads)]
# 等待完成
for future in writer_futures + reader_futures:
future.result()
end_time = time.time()
# 驗證結果
total_items = items_per_thread * num_threads
self.assertEqual(len(self.results), total_items)
# 性能統計
duration = end_time - start_time
throughput = total_items / duration
print(f"\nHigh Throughput Test Results:")
print(f"Total items: {total_items}")
print(f"Duration: {duration:.2f}s")
print(f"Throughput: {throughput:.0f} items/sec")
# 顯示詳細統計
self.buffer.print_stats()
buffer.print_stats() # 印出統計資訊
# 讀出所有剩餘資料
remaining = buffer.get_all()
print(f"Remaining items in buffer after multiple writers: {remaining}")
class TestRingBufferStatistics(unittest.TestCase):
"""測試 RingBuffer 的統計功能"""
def test_statistics_tracking(self):
"""測試統計數據追蹤"""
buffer = RingBuffer(capacity=16)
# 寫入一些數據
for i in range(10):
buffer.put(f"item{i}")
# 讀取一些數據
for _ in range(5):
buffer.get()
stats = buffer.get_stats()
# 驗證基本統計
self.assertEqual(stats['total_writes'], 10)
self.assertEqual(stats['total_reads'], 5)
self.assertEqual(stats['current_size'], 5)
self.assertEqual(stats['write_threads'], 1)
self.assertEqual(stats['read_threads'], 1)
def test_reset_statistics(self):
"""測試重置統計數據"""
buffer = RingBuffer(capacity=16)
# 產生一些活動
for i in range(5):
buffer.put(f"item{i}")
for _ in range(3):
buffer.get()
# 重置統計
buffer.reset_stats()
stats = buffer.get_stats()
self.assertEqual(stats['total_writes'], 0)
self.assertEqual(stats['total_reads'], 0)
self.assertEqual(stats['concurrent_writes'], 0)
self.assertEqual(stats['concurrent_reads'], 0)
self.assertEqual(stats['overflow_count'], 0)
def demonstrate_basic_usage():
"""示範基本使用方式"""
print("\n=== Demonstrating Basic Usage ===")
# 創建緩衝區
buffer = RingBuffer(capacity=20, buffer_id=7)
# 檢查初始狀態
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 添加幾個項目
for i in range(5):
buffer.put(f"Test-{i}")
# 檢查狀態
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 讀取一個項目
item = buffer.get()
print(f"Read item: {item}")
print(f"After reading 1 item - Content Size: {buffer.size()}")
# 添加更多項目直到滿
items_added = 0
while not buffer.is_full():
buffer.put(f"Fill-{items_added}")
items_added += 1
print(f"Added {items_added} more items until full")
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 嘗試添加到已滿的緩衝區
result = buffer.put("Overflow")
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
# 獲取所有項目
all_items = buffer.get_all()
print(f"All items in buffer: {all_items}")
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
# 印出統計資訊
def benchmark_ringbuffer():
"""RingBuffer 性能基準測試"""
print("\n=== RingBuffer Performance Benchmark ===")
buffer = RingBuffer(capacity=1024)
num_operations = 100000
# 單線程性能測試
start_time = time.time()
for i in range(num_operations):
buffer.put(i)
for _ in range(num_operations):
buffer.get()
end_time = time.time()
single_thread_time = end_time - start_time
throughput = (num_operations * 2) / single_thread_time
print(f"Single Thread: {throughput:.0f} ops/sec")
# 多線程性能測試
buffer = RingBuffer(capacity=1024)
def producer():
for i in range(num_operations // 2):
while not buffer.put(i):
pass
def consumer():
for _ in range(num_operations // 2):
while buffer.get() is None:
pass
start_time = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:
future1 = executor.submit(producer)
future2 = executor.submit(consumer)
future1.result()
future2.result()
end_time = time.time()
multi_thread_time = end_time - start_time
throughput = num_operations / multi_thread_time
print(f"Multi Thread: {throughput:.0f} ops/sec")
print(f"Speedup: {single_thread_time/multi_thread_time:.2f}x")
buffer.print_stats()
def demonstrate_producer_consumer():
"""示範生產者-消費者模式"""
print("\n=== Demonstrating Producer-Consumer Pattern ===")
buffer = RingBuffer(capacity=16)
# 創建生產者和消費者執行緒
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
# 啟動執行緒
producer_thread.start()
consumer_thread.start()
batch_thread.start()
# 等待執行緒完成
producer_thread.join()
consumer_thread.join()
batch_thread.join()
# 檢查最終狀態
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
buffer.print_stats()
if __name__ == "__main__":
# 展示各種使用場景
# demonstrate_basic_usage()
# demonstrate_producer_consumer()
demonstrate_multi_writer()
# 運行單元測試
unittest.main(argv=[''], exit=False, verbosity=2)
print("\nAll demonstrations completed!")
# 運行性能基準測試
benchmark_ringbuffer()
Loading…
Cancel
Save