1. (adding) mainOrchestrator 增加版本先驗程序

2. (adding) mavlinkVehicleView 增加 read_all 快速讀取
3. (modify) 優化 longCommand 與 navigation 使其更好引用
4. (adding) 提供一個完整的 example_wholeMoving 作為範例
4. (remove) 移除 fc_network_app 重複功能
5. (modify) 修改 overview_table 部分顯示文字
chiyu
Chiyu Chen 4 weeks ago
parent 017dd4923d
commit cf213fc556

@ -1,43 +1,46 @@
這是天雷系統的專案
===
專案核心框架
1. ROS2 Humble
2. Python 3.8.10
===
必要相依套件 順便記錄我開發時的環境版本
Python
1. pymavlink -> Version: 2.4.42
2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
ROS2
1. source ~/ros2_humble/install/setup.bash
2.
===
功能簡介
## 功能簡介
1. mavlink 多對多支援平台
2. 不允許進到 ros 系統有相同 sysid
3. 假設一台載具上所有 component 共用同一 socket
===
開發用輔助專案
## 運行環境
### 專案核心框架
1. ROS2 Humble
2. Python 3.8.10
### 必要相依套件及版本
- Python
[Package] fc_network_adapter
1. pymavlink -> Version: 2.4.42
2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
[Package] GUI
1. testresources
2. websockets
3. PyQt6
4. PyQt6-WebEngine
- ROS2
1. source ~/ros2_humble/install/setup.bash
2. geographic_info (https://github.com/ros-geographic-info/geographic_info.git) 已經作為 submodule 放在 external
3. angles (https://github.com/ros/angles.git) 已經作為 submodule 放在 external
4. mavros_msgs (https://github.com/mavlink/mavros) 這個專案中的一個資料夾 這邊手動複製的
### 開發用輔助專案
1. Gazebo Garden
2. Ardupilot
===
依賴的 ROS 庫
1. https://github.com/ros-geographic-info/geographic_info.git 記得要搞 ros2 版本的
2. https://github.com/ros/angles.git
3. mavros_msgs 是 https://github.com/mavlink/mavros 這個專案中的一個資料夾 這邊手動複製的
## 使用說明
Clone 專案後 請先執行這些指令
```bash
# 1.同步 submodule
@ -49,17 +52,29 @@ colcon build --packages-select angles geographic_msgs
colcon build --packages-select mavros_msgs # 這個依賴前面的
colcon build --packages-select fc_interfaces # 自己定義的
```
1.
主要專案 fc_network_adapter 請一律在 ~/AirTrapMine/src/ 路徑下 以模組化啟動程式
```bash
# 記得先開啟 依賴 Package 到 overlay
. ./install/local_setup.bash
# 範例
cd ~/AirTrapMine/src/ # 這是範例!!!
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
python -m someotherpkg.src.example_wholeMoving
```
===
Package 簡述
<<<<<<< HEAD
2.
GUI 介面
在 ~/AirTrapMine/src/GUI 路徑下 直接啟動
```bash
cd ~/AirTrapMine/src/GUI
python gui.py
```
===
簡述0328
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。
=======
## 資料夾說明
1. unitdev 為各自協作者做開發時的測試區
01 -> 晉凱(ken)
02 -> 其宇(chiyu)
@ -77,19 +92,13 @@ Package 簡述
使用者的外層包裝
5. someotherpkg
如何使用 fc_network_apps 的範例檔案
6. GUI
由 PyQt6 開發的互動式介面
N. logs 是執行時期的記錄檔
===
主要專案 fc_network_adapter 請一律在 ~/AirTrapMine/src/ 資料夾下 以模組化啟動程式
例如 在 ~/AirTrapMine/src/ 資料夾下
```bash
# 記得先開啟 依賴 Package 到 overlay
. ./install/local_setup.bash
# 範例
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
```
>>>>>>> chiyu
簡述0328
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。

@ -6,7 +6,7 @@ class OverviewTable(QTableWidget):
"""總覽表格,顯示所有無人機的狀態資訊"""
# 默認的資訊類型和映射
DEFAULT_INFO_TYPES = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "位置", "速度", "地速", "航向",
DEFAULT_INFO_TYPES = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "XY位置", "XY速度", "地速", "航向",
"空速", "油門", "HUD ALT", "爬升率", "Roll", "Pitch", "Yaw", "丟包", "延遲"]
DEFAULT_INFO_TYPE_MAP = {

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__))
VERSION_NO = "v0.61"
PROJECT_VER = "v0.61"
class PanelState:
def __init__(self):
@ -39,9 +39,9 @@ class PanelState:
self.object_manager_state = "Stopped"
self.serial_manager_state = "Stopped"
self.ros2_manager_state = "Stopped"
self.socket_object_list = [] # 已有的 mavlink object
self.socket_object_list = [] # 已有的 mavlink object
self.linked_serial_dict = {} # 已連線的 serial 端口 serial id num : serial_port string
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
# 關於創建通道時的暫存資訊
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
@ -102,36 +102,36 @@ class ControlPanel:
def input_dialog(stdscr, prompt="請輸入文字: "):
"""顯示輸入對話框"""
height, width = stdscr.getmaxyx()
# 建立輸入視窗
dialog_height = 5
dialog_width = min(60, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
# 建立視窗邊框
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(1, 2, prompt)
dialog_win.addstr(3, 2, "按 Enter 確認, ESC 取消")
dialog_win.refresh()
# 輸入區域
input_win = curses.newwin(1, dialog_width - 6, start_y + 2, start_x + 2)
input_win.keypad(True)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
input_win.clear()
input_win.addstr(0, 0, user_input[-dialog_width+8:]) # 顯示輸入內容(滾動)
input_win.refresh()
ch = input_win.getch()
if ch == 27: # ESC
user_input = None
break
@ -141,16 +141,16 @@ class ControlPanel:
user_input = user_input[:-1]
elif 32 <= ch <= 126: # 可打印字符
user_input += chr(ch)
curses.noecho()
curses.curs_set(0)
# 清理視窗
del input_win
del dialog_win
stdscr.clear()
stdscr.refresh()
return user_input
# ================ 關於 主要選單 的部份 ===================
@ -179,9 +179,9 @@ class ControlPanel:
]),
MenuNode("Vehicles Insp.", "檢視已連線的遠端載具", action = "INSPECT_VEHICLES"),
MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"),
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"),
]),
MenuNode("Shutdown", "關閉整個系統", children=[
MenuNode("Return", "繼續運行", "BACK"),
@ -191,7 +191,7 @@ class ControlPanel:
def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event):
stdscr = None
def cleanup():
"""清理 curses 狀態"""
if stdscr:
@ -209,7 +209,7 @@ class ControlPanel:
def draw_menu(screen):
nonlocal stdscr
stdscr = screen
curses.curs_set(0)
stdscr.nodelay(False) # 阻塞讀鍵
stdscr.keypad(True)
@ -219,12 +219,12 @@ class ControlPanel:
idx_stack = [0] # 索引堆疊
state.intoSTART() # 設定狀態為運行中
while not stop_evt.is_set():
current_menu = menu_stack[-1]
current_idx = idx_stack[-1]
# 獲取終端機尺寸
height, width = stdscr.getmaxyx()
# 簡單暴力的限制視窗的大小
@ -237,7 +237,7 @@ class ControlPanel:
if height < MIN_HEIGHT or width < 60:
logger.error("Terminal size too small for Control Panel.")
break
stdscr.clear()
stdscr.border()
@ -267,26 +267,26 @@ class ControlPanel:
elif child.action == "LINK_SERIAL_TO_MIDDLEWARE_UDP":
link_status = "Yes" if state.serial_info_temp["Go2Middleware"] else "No"
desc = f"{child.desc} [{link_status}]"
line = f"{marker}{child.name:15s} {desc}"
attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL
stdscr.addstr(start_line + i, 4, line, attr)
# 顯示訊息區域
# 顯示訊息區域
# info_start_line = start_line + len(current_menu.children) + 1
info_start_line = height - 8
max_msg_lines = 5 # 最多顯示 5 行訊息
current_time = time.time()
# 清理過時的訊息
state.panel_info_msg_list = [
(msg, timestamp) for msg, timestamp in state.panel_info_msg_list
if current_time - timestamp < 2.0 #秒數
]
# 只顯示最新的 max_msg_lines 條訊息
display_msgs = state.panel_info_msg_list[-max_msg_lines:]
for i, msg_data in enumerate(display_msgs):
if info_start_line + i >= help_line - 1: # 避免超出邊界
break
@ -298,13 +298,13 @@ class ControlPanel:
stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD)
# 操作說明
# help_line = start_line + len(current_menu.children) + 2
help_line = height - 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM)
stdscr.addstr(height-1 , width-12, f" {VERSION_NO} ", curses.A_DIM)
stdscr.addstr(height-1 , width-12, f" {PROJECT_VER} ", curses.A_DIM)
stdscr.refresh()
@ -319,8 +319,8 @@ class ControlPanel:
# continue
break
time.sleep(0.1)
if (state.mavlink_bridge_state == "Stopped" and
state.object_manager_state == "Stopped" and
if (state.mavlink_bridge_state == "Stopped" and
state.object_manager_state == "Stopped" and
state.serial_manager_state == "Stopped"):
state.intoSTOPPED()
# stop_evt.set()
@ -330,14 +330,14 @@ class ControlPanel:
# 設定短暫的 timeout讓執行緒能夠響應 stop_evt
stdscr.timeout(100)
ch = stdscr.getch()
if ch == -1: # 沒有操作
continue
# 處理按鍵
if ch in (curses.KEY_UP, ord('k')):
idx_stack[-1] = (current_idx - 1) % len(current_menu.children)
elif ch in (curses.KEY_DOWN, ord('j')):
idx_stack[-1] = (current_idx + 1) % len(current_menu.children)
@ -347,14 +347,14 @@ class ControlPanel:
elif ch == (ord('o')):
# 離開工程模式
state.intoSTART()
state.intoSTART()
elif ch == curses.KEY_LEFT:
# 返回上層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
elif ch == curses.KEY_RIGHT:
# 進入下層 (但不執行動作)
selected = current_menu.children[current_idx]
@ -369,26 +369,26 @@ class ControlPanel:
elif ch in (curses.KEY_ENTER, 10, 13):
selected = current_menu.children[current_idx]
# 處理不同類型的動作
if selected.children: # 有子選單
menu_stack.append(selected)
idx_stack.append(0)
elif selected.action == "BACK":
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
elif selected.action == "QUIT":
state.intoTERMINATION()
pre_panel_shutdown()
elif selected.action == "TEXT_UDP_IP":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ")
if result is not None:
state.udp_info_temp["IP"] = result
elif selected.action == "TEXT_UDP_PORT":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 Port: ")
if result is not None:
@ -402,7 +402,7 @@ class ControlPanel:
idx_stack.pop()
# menu_stack.pop()
# idx_stack.pop()
elif selected.action == "CREATE_UDP_OUTBOUND":
cmd_q.put("CREATE_UDP_OUTBOUND")
# 確認後回到上兩層
@ -457,8 +457,8 @@ class ControlPanel:
elif selected.action == "LIST_SERIAL_LINKS":
created_list_menu = self.create_linked_serial_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
idx_stack.append(0)
elif selected.action == "INSPECT_LINKED_SERIAL":
# 顯示 Serial 連結詳細資訊
if hasattr(selected, 'serial_id'):
@ -488,7 +488,7 @@ class ControlPanel:
current_list_menu = menu_stack[-1]
menu_stack.pop()
idx_stack.pop()
# 依據選單種類 重新建立分頁
if current_list_menu.name == "Serial Port List":
created_list_menu = self.create_serial_port_menu(state, page=selected.page)
@ -503,16 +503,16 @@ class ControlPanel:
menu_stack.append(current_list_menu)
idx_stack.append(0)
continue
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_MAV_OBJECT":
# 顯示物件詳細資訊
if hasattr(selected, 'socket_id'):
cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id))
self.show_object_info(stdscr, selected.socket_id, state)
elif selected.action == "REMOVE_MAV_OBJECT":
# 移除物件
if hasattr(selected, 'socket_id'):
@ -575,7 +575,7 @@ class ControlPanel:
created_list_menu = self.create_vehicles_list_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_VEHICLE":
# 顯示載具詳細資訊
if hasattr(selected, 'sysid') and hasattr(selected, 'compid'):
@ -585,7 +585,7 @@ class ControlPanel:
elif callable(selected.action):
# 執行函式
cmd_q.put(selected.action)
try:
curses.wrapper(draw_menu)
except KeyboardInterrupt:
@ -593,12 +593,12 @@ class ControlPanel:
finally:
cleanup()
# ================ 關於 mavlink object 的部份 ===================
# ================ 關於 mavlink object 的部份 ===================
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 mavlink_object 列表選單(支持分頁)"""
children = []
if not state.socket_object_list:
children.append(MenuNode("(Empty)", "目前沒有連結口", None))
else:
@ -606,7 +606,7 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
for socket_id in state.socket_object_list[start_idx:end_idx]:
# 為每個 socket 創建子選單
@ -656,11 +656,11 @@ class ControlPanel:
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD)
# 這裡顯示基本資訊
dialog_win.addstr(2, 2, f"Socket ID : {socket_id}")
dialog_win.addstr(3, 2, f"Socket status : {state.socket_info_single.get('socket_state', 'N/A')}")
@ -676,15 +676,15 @@ class ControlPanel:
dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}")
state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用
dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
stdscr.refresh()
def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False):
"""選擇目標 socket 的對話框"""
height, width = stdscr.getmaxyx()
@ -692,13 +692,13 @@ class ControlPanel:
dialog_width = min(50, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.keypad(True)
title = "選擇要移除的目標" if remove_mode else "選擇轉發目標"
available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id]
if not available_sockets:
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
@ -710,24 +710,24 @@ class ControlPanel:
stdscr.clear()
stdscr.refresh()
return None
selected_idx = 0
while True:
dialog_win.clear()
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
for i, socket_id in enumerate(available_sockets):
marker = "" if i == selected_idx else " "
attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL
dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr)
dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消")
dialog_win.refresh()
ch = dialog_win.getch()
if ch in (curses.KEY_UP, ord('k')):
selected_idx = (selected_idx - 1) % len(available_sockets)
elif ch in (curses.KEY_DOWN, ord('j')):
@ -749,11 +749,11 @@ class ControlPanel:
def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 serial port 列表選單(支持分頁)"""
children = []
# 獲取可用的 Serial 連接埠列表
# serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓
serial_ports = acquireSerial.get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*'])
if not serial_ports:
children.append(MenuNode("(Empty)", "目前沒有串口設備", None))
else:
@ -761,7 +761,7 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的串口
for port in serial_ports[start_idx:end_idx]:
port_menu = MenuNode(f"{port}", children=[
@ -781,7 +781,7 @@ class ControlPanel:
for child in port_menu.children:
child.port = port
children.append(port_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
@ -793,7 +793,7 @@ class ControlPanel:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
@ -802,7 +802,7 @@ class ControlPanel:
def create_linked_serial_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 已連線的 serial port 列表選單(支持分頁)並包含後續管理功能"""
children = []
if not state.linked_serial_dict:
children.append(MenuNode("(Empty)", "目前沒有連結口", None))
else:
@ -810,7 +810,7 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
linked_serial_id_list = list(state.linked_serial_dict.keys())
for serial_id in linked_serial_id_list[start_idx:end_idx]:
@ -825,7 +825,7 @@ class ControlPanel:
for child in obj_menu.children:
child.serial_id = serial_id
children.append(obj_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
@ -837,7 +837,7 @@ class ControlPanel:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
@ -859,14 +859,14 @@ class ControlPanel:
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(0, 2, f" Serial Link #{serial_id} 詳細資訊 ", curses.A_BOLD)
# 從 linked_serial_dict 獲取資訊
serial_info = state.linked_serial_dict.get(serial_id, {})
if not serial_info:
dialog_win.addstr(2, 2, f"無法取得 Serial #{serial_id} 的資訊")
else:
@ -877,17 +877,17 @@ class ControlPanel:
dialog_win.addstr(5, 2, f"Communication : {state.serial_info_single.get('receiver_type', 'N/A')}")
dialog_win.addstr(6, 2, f"Target UDP Port : {state.serial_info_single.get('target_port', 'N/A')}")
dialog_win.addstr(7, 2, f"Status : {state.serial_info_single.get('status', 'Running')}")
# 如果有額外資訊可以繼續添加
if 'thread_alive' in serial_info:
thread_status = "Alive" if serial_info['thread_alive'] else "Stopped"
dialog_win.addstr(8, 2, f"Thread Status : {thread_status}")
state.serial_info_single['InfoReady'] = False # 重置狀態以便下次使用
dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
@ -900,7 +900,7 @@ class ControlPanel:
每個 vehicle-component 組合都是獨立的選單項目
"""
children = []
if not state.connected_vehicles_dict:
children.append(MenuNode("(Empty)", "目前沒有已連線的載具", None))
else:
@ -908,22 +908,22 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# vehicle_id_list 現在是 (sysid, compid) 的元組列表
vehicle_comp_list = list(state.connected_vehicles_dict.keys())
# 顯示當前頁的物件
for sysid, compid in vehicle_comp_list[start_idx:end_idx]:
# 建立顯示名稱
display_name = f"Vehicle #{sysid} - Comp #{compid}"
desc = f"載具 {sysid} 組件 {compid}"
vehicle_menu = MenuNode(display_name, desc, "INSPECT_VEHICLE")
# 將 sysid 和 compid 附加到選單項目上
vehicle_menu.sysid = sysid
vehicle_menu.compid = compid
children.append(vehicle_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
@ -935,7 +935,7 @@ class ControlPanel:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Connected Vehicles", f"已連線載具列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
@ -943,7 +943,7 @@ class ControlPanel:
def show_vehicle_info(self, stdscr, sysid, compid, cmd_q: queue.Queue, state: PanelState):
"""顯示載具組件詳細資訊(動態更新,顯示變化率)"""
# 等待資訊準備
start = time.time()
while not state.vehicle_info_single.get('InfoReady', False):
@ -951,30 +951,30 @@ class ControlPanel:
state.panel_info_msg_list.append(("Fail! Vehicle Info NOT Acquired!", time.time()))
return
time.sleep(0.05)
info = state.vehicle_info_single
height, width = stdscr.getmaxyx()
dialog_height = min(22, height - 4)
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.nodelay(True) # 非阻塞模式,允許動態更新
dialog_win.keypad(True)
# MAV_TYPE 名稱對應
MAV_TYPE_NAMES = {
0: "Generic", 1: "Fixed Wing", 2: "Quadrotor", 3: "Helicopter",
4: "Antenna Tracker", 5: "GCS", 6: "Airship", 10: "Ground Rover",
12: "Boat", 13: "Submarine", 26: "Gimbal", 30: "Camera"
}
# 動態更新迴圈
last_update = time.time()
while True:
current_time = time.time()
# 每 1 秒重新請求資料
if current_time - last_update >= 1.0:
# 觸發資料更新(透過 Orchestrator
@ -989,58 +989,58 @@ class ControlPanel:
# 更新 info 參照
info = state.vehicle_info_single
last_update = current_time
dialog_win.clear()
dialog_win.border()
dialog_win.addstr(0, 2, f" Vehicle #{info['sysid']} - Component #{info['compid']} ", curses.A_BOLD)
# === 基礎資訊 ===
dialog_win.addstr(2, 2, "[Identity]", curses.A_UNDERLINE)
dialog_win.addstr(2, 32, "[Connection]", curses.A_UNDERLINE)
# # MAV Type # 這個用不到
# mav_type = info.get('vehicle_type', 'N/A')
# mav_type_str = f"{mav_type} ({MAV_TYPE_NAMES.get(mav_type, 'Unknown')})" if isinstance(mav_type, int) else str(mav_type)
# dialog_win.addstr(3, 2, f"MAV Type : {mav_type_str}")
# Component Type
dialog_win.addstr(3, 2, f"Component Type : {info.get('component_type', 'N/A')}")
# Autopilot Type
if info.get('mav_autopilot') is not None:
dialog_win.addstr(4, 2, f"Autopilot : {info.get('mav_autopilot', 'N/A')}")
# Connection Info
dialog_win.addstr(3, 32, f"Connection : {info.get('connection_type', 'N/A')}")
dialog_win.addstr(4, 32, f"Socket ID : #{info.get('socket_id', 'N/A')}")
# === 封包統計 ===
stats = info.get('packet_stats', {})
dialog_win.addstr(7, 2, "[Packet Statistics]", curses.A_UNDERLINE)
received = stats.get('received', 0)
lost = stats.get('lost', 0)
loss_rate = stats.get('loss_rate', 0.0)
last_seq = stats.get('last_seq', 'N/A')
# 計算變化
received_delta = stats.get('received_delta', 0)
lost_delta = stats.get('lost_delta', 0)
# 顯示變化率
recv_str = f"{received:6d}"
if received_delta > 0:
recv_str += f" (+{received_delta}↑)"
lost_str = f"{lost:4d}"
if lost_delta > 0:
lost_str += f" (+{lost_delta}↑)"
dialog_win.addstr(8, 2, f"Received : {recv_str}")
dialog_win.addstr(8, 32, f"Lost : {lost_str}")
dialog_win.addstr(9, 2, f"Loss Rate : {loss_rate:.2f}%")
dialog_win.addstr(9, 32, f"Last Seq : {last_seq}")
# 最後接收時間
last_msg_time = stats.get('last_msg_time')
if last_msg_time:
@ -1050,50 +1050,50 @@ class ControlPanel:
dialog_win.addstr(10, 32, f"Elapsed : {elapsed:.1f}s")
else:
dialog_win.addstr(10, 2, "Last Time : N/A")
# === 訊息類型分佈 ===
dialog_win.addstr(12, 2, "[Message Types] (Top 12)", curses.A_UNDERLINE)
msg_counts = info.get('msg_type_counts', {})
# MAVLink 訊息名稱對應(縮寫版本)
MSG_NAMES = {
0: "HB", 1: "SYS_ST", 24: "GPS_RAW", 27: "RAW_IMU",
30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL",
74: "VFR_HUD", 147: "BATT_ST"
}
# 顯示前 12 個最常見的訊息類型(兩列各 6 個)
msg_items = list(msg_counts.items())[:12]
line = 13
for i, (msg_id, count) in enumerate(msg_items):
msg_name = MSG_NAMES.get(msg_id, "???")
delta = stats.get(f'msg_delta_{msg_id}', 0)
# 格式化數據
if delta > 0:
data_str = f"{count}(+{delta}↑)"
else:
data_str = f"{count}"
# 格式化顯示:[ID]NAME DATA (ID固定3字符寬度右對齊)
display_str = f"[{msg_id:3d}]{msg_name:8s} {data_str}"
# 左列(偶數索引)或右列(奇數索引)
col = 2 if i % 2 == 0 else 36
row = line + (i // 2)
if row >= dialog_height - 3: # 避免超出邊界
break
dialog_win.addstr(row, col, display_str)
# 確保跳過顯示區域
line = line + 6
dialog_win.addstr(dialog_height - 2, 2, "Press R: Reset Stats, C: Unregister, other key to return...")
dialog_win.refresh()
# 檢查是否有按鍵(非阻塞)
ch = dialog_win.getch()
if ch in (ord('R'), ord('r')):
@ -1103,10 +1103,10 @@ class ControlPanel:
break
elif ch != -1: # 有按鍵則退出
break
# 短暫延遲
time.sleep(0.1)
state.vehicle_info_single['InfoReady'] = False
del dialog_win
stdscr.clear()
@ -1140,19 +1140,19 @@ class Orchestrator:
self.fc_ros_manager = mros.ros2_manager
self.fc_ros_manager.initialize()
self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = {
'position': 1.0,
'attitude': 0.0,
'velocity': 0.0,
'battery': 1.0,
'vfr_hud': 1.0,
'mode': 0.0,
'summary': 1.0,
'position': 1.0,
'attitude': 1.0,
'velocity': 0.0,
'battery': 1.0,
'vfr_hud': 1.0,
'mode': 0.0,
'summary': 1.0,
}
def engageWholeSystem(self):
"""啟動整個系統"""
# === 1) 面板部分的啟動 ===
self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt))
self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt))
self.panel_thread.start()
# === 2) async_io_manager & mavlink_bridge 部分的啟動 ===
@ -1215,7 +1215,7 @@ class Orchestrator:
for s_id in mo.mavlink_object.mavlinkObjects:
s_obj = mo.mavlink_object.mavlinkObjects[s_id]
if socket_id in s_obj.target_sockets:
self.remove_target_from_object(s_id, socket_id)
self.remove_target_from_object(s_id, socket_id)
# 再移除該物件
self.delete_udp_object(socket_id)
elif action == "MAVOBJ_ADD_TARGET":
@ -1262,7 +1262,7 @@ class Orchestrator:
elif action == "UNREGISTER_VEHICLE":
sysid = cmd[1]
self.vehicle_registry.unregister(sysid)
elif cmd == "CREATE_UDP_INBOUND":
self.panelState.udp_info_temp["direction"] = "inbound"
self.create_udp_object()
@ -1291,8 +1291,8 @@ class Orchestrator:
logger.error(f"Unexpected error in main loop: {e}")
finally:
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
if self.bridge.thread.is_alive():
if self.bridge.running:
self.bridge.stop()
@ -1318,7 +1318,7 @@ class Orchestrator:
self.panel_thread.join(timeout=2)
time.sleep(0.5) # 等待各模組穩定關閉
logger.info("Main orchestrator END!")
# =============== 面板動作 - Mavlink Object ===============
@ -1339,10 +1339,11 @@ class Orchestrator:
self.occupied_ip_ports[ip].append(port)
else:
self.occupied_ip_ports[ip] = [port]
# 外放資訊部分
elif self.panelState.udp_info_temp["direction"] == "outbound":
connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
try:
# 檢測這個 connection_string 是否能成功建立 mavlink 連結
mavlink_socket = mavutil.mavlink_connection(connection_string)
@ -1362,7 +1363,7 @@ class Orchestrator:
mavlink_object.socket_type = socket_type
self.panelState.panel_info_msg_list.append((f"Created UDP {self.panelState.udp_info_temp['direction']} object: {connection_string}", time.time()))
def delete_udp_object(self, socket_id):
"""移除指定的 mavlink_object"""
@ -1406,10 +1407,10 @@ class Orchestrator:
def _update_vehicles_list(self):
"""更新已連線載具列表(從 vehicle_registry 獲取)"""
vehicles_dict = {}
# 從 vehicle_registry 獲取所有載具
all_vehicles = self.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
# 遍歷每個載具的所有組件
for compid, component in vehicle.components.items():
@ -1422,9 +1423,9 @@ class Orchestrator:
'connection_via': vehicle.connected_via.value,
'socket_id': vehicle.custom_meta.get('socket_id', 'N/A')
}
self.panelState.connected_vehicles_dict = vehicles_dict
def _prepare_vehicle_info(self, sysid, compid):
"""準備載具組件的詳細資訊(包含變化率計算)"""
vehicle = self.vehicle_registry.get(sysid)
@ -1433,37 +1434,37 @@ class Orchestrator:
return
socket_id = vehicle.custom_meta.get('socket_id', 'N/A')
component = vehicle.get_component(compid)
if not component:
logger.warning(f"Component {compid} not found in vehicle {sysid}")
return
stats = component.packet_stats
# 取得之前的統計資料(用於計算變化)
prev_stats = self.panelState.vehicle_info_single.get('prev_stats', {})
prev_received = prev_stats.get('received', 0)
prev_lost = prev_stats.get('lost', 0)
prev_msg_counts = prev_stats.get('msg_counts', {})
# 計算變化率
received_delta = stats.received_count - prev_received
lost_delta = stats.lost_count - prev_lost
# 準備訊息類型計數(排序後取前幾個)
sorted_msg_counts = dict(sorted(
stats.msg_type_count.items(),
key=lambda x: x[1],
reverse=True
)[:12]) # 取前 12 個最常見的
# 計算每種訊息類型的變化
msg_deltas = {}
for msg_id, count in sorted_msg_counts.items():
prev_count = prev_msg_counts.get(msg_id, 0)
msg_deltas[f'msg_delta_{msg_id}'] = count - prev_count
# 更新 vehicle_info_single
socket_type = "N/A"
socket_obj =mo.mavlink_object.mavlinkObjects.get(socket_id, None)
@ -1481,7 +1482,7 @@ class Orchestrator:
"packet_stats": {
"received": stats.received_count,
"lost": stats.lost_count,
"loss_rate": (stats.lost_count / stats.received_count * 100
"loss_rate": (stats.lost_count / stats.received_count * 100
if stats.received_count > 0 else 0),
"last_seq": stats.last_seq,
"last_msg_time": stats.last_msg_time,
@ -1508,8 +1509,8 @@ class Orchestrator:
(f"Fail! Serial Port {self.panelState.serial_info_temp['Port']} already linked.", time.time())
)
return
# 獲取可用的 udp port
# 獲取可用的 udp port
udp_port_tmp = find_available_port(19000, 20000)
# 定義通訊類型映射表
@ -1526,7 +1527,7 @@ class Orchestrator:
("Please select Communication Type first.", time.time())
)
return
# 查找對應的通訊類型
comm_type_tmp = COMM_TYPE_MAP.get(comm_type)
if comm_type_tmp is None:
@ -1554,12 +1555,31 @@ class Orchestrator:
self.panelState.udp_info_temp['Port'] = str(udp_port_tmp)
self.panelState.udp_info_temp['direction'] = "inbound"
self.create_udp_object("SERIAL_XBEE")
def main():
stop_evt = threading.Event()
# =========== 各項模組的版本先驗 ===========
# 除非你有在做這幾項模組的改版 不然動到這邊的版本號 代表執行環境有很大的問題!!!!!!
version_check = True
if mo.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkObkect")
version_check = False
if mros.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkROS2Nodes")
version_check = False
if mvv.MODULE_VER != "1.00":
print("Module Version Error! : mavlinkVehicleView")
version_check = False
if sm.MODULE_VER != "0.60":
print("Module Version Error! : serialManager")
version_check = False
if version_check == False:
print("Environment Obstacle! Check YOUR Execution System Path First!!")
return
# ========================================
stop_evt = threading.Event()
def signal_handler(signum, frame):
"""處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組"""
stop_evt.set()
@ -1593,4 +1613,7 @@ if __name__ == "__main__":
4. 註解掉無效代碼 action == "UPDATE_VEHICLES_LIST" 區塊
5. 系統納入 mavlink ROS2 Manager 模組
2025.04.13
1. 加入各項模組的版本先驗 檢驗失敗就直接離開
'''

@ -58,6 +58,8 @@ from .utils import RingBuffer, setup_logger
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.50"
stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255)
return_packet_ring = RingBuffer(capacity=256, buffer_id=254)

@ -39,6 +39,7 @@ from . import mavlinkObject as mo
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.50"
# ============================================================================
# VehicleStatusPublisher Node
@ -55,14 +56,16 @@ class PublishRateController:
# 例如:'ekf_status': 1.0, # EKF 狀態
# ═══════════════════════════════════════════════════════════════
# 各 topic 的發布間隔(秒)
# 注意 這邊是定義區 不要把參數寫在這裡 所以預設全部關閉
# 以這個專案 請看 mainOrchestrator.py 的 Orchestrator 初始化階段
self.topic_intervals = {
'position': 0.5, # GPS位置
'attitude': 0.5, # 姿態
'velocity': 0.5, # 速度
'battery': 1.0, # 電池
'vfr_hud': 0.5, # VFR HUD
'mode': 1.0, # 飛行模式
'summary': 1.0, # 載具摘要
'position': 0.0, # GPS位置
'attitude': 0.0, # 姿態
'velocity': 0.0, # 速度
'battery': 0.0, # 電池
'vfr_hud': 0.0, # VFR HUD
'mode': 0.0, # 飛行模式
'summary': 0.0, # 載具摘要
# 在這裡新增更多 topics...
}
# 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp}
@ -139,10 +142,8 @@ class VehicleStatusPublisher(Node):
if not self.running:
return
# 從 vehicle_registry 獲取所有載具
all_vehicles = mvv.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
# 高頻快路徑:讀取 registry 的 immutable 快照
for sysid, vehicle in mvv.vehicle_registry.read_all():
self._publish_vehicle_status(vehicle)
def _publish_vehicle_status(self, vehicle: mvv.VehicleView):
@ -232,7 +233,6 @@ class VehicleStatusPublisher(Node):
att = status.attitude
if att.roll is None:
return
publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu)
if publisher.get_subscription_count() == 0:
@ -257,6 +257,7 @@ class VehicleStatusPublisher(Node):
publisher.publish(msg)
# TODO 這邊要改成 XY 的位置與速度
def _publish_velocity(self, sysid: int, status: mvv.ComponentStatus):
"""發布速度"""
if not self.rate_controller.should_publish(sysid, 'velocity'):

@ -5,14 +5,14 @@ VehicleView - Pure State Container
"""
import os
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.00"
# ====================== Enums =====================
@ -402,11 +402,18 @@ class VehicleViewRegistry:
def __init__(self):
self._vehicles: Dict[int, VehicleView] = {}
# 高頻讀取用 snapshot弱一致性
self._vehicles_snapshot_items: Tuple[Tuple[int, VehicleView], ...] = tuple()
def _refresh_snapshot(self) -> None:
"""重建快照(僅在 key 集合變動時觸發)"""
self._vehicles_snapshot_items = tuple(self._vehicles.items())
def register(self, sysid: int) -> VehicleView:
"""註冊新的載具視圖"""
if sysid not in self._vehicles:
self._vehicles[sysid] = VehicleView(sysid)
self._refresh_snapshot()
logger.info(f"Registered new VehicleView for system {sysid}")
return self._vehicles[sysid]
@ -418,6 +425,7 @@ class VehicleViewRegistry:
"""註銷載具視圖"""
if sysid in self._vehicles:
del self._vehicles[sysid]
self._refresh_snapshot()
logger.info(f"Unregistered VehicleView for system {sysid}")
return True
return False
@ -425,10 +433,22 @@ class VehicleViewRegistry:
def get_all(self) -> Dict[int, VehicleView]:
"""取得所有載具視圖"""
return self._vehicles.copy()
def read_all(self) -> Tuple[Tuple[int, VehicleView], ...]:
"""
高效讀取所有載具視圖快照弱一致性
注意:
- 回傳 immutable tuple 快照適合高頻迭代
- 僅在 register/unregister/clear 後更新可能不是最新資料
- 適合高頻讀取且可接受弱一致性的情境
"""
return self._vehicles_snapshot_items
def clear(self) -> None:
"""清空所有載具視圖"""
self._vehicles.clear()
self._refresh_snapshot()
logger.info("Cleared all VehicleViews")
def __len__(self) -> int:
@ -449,5 +469,8 @@ vehicle_registry = VehicleViewRegistry()
2026.01.16
1. 新增 重置指定組件的封包統計 功能
2026.04.13
1. 新增 VehicleViewRegistry.read_all 方法 ROS2 Node 組件更有效率的讀取其中的資料
'''

@ -26,6 +26,7 @@ from .utils import setup_logger
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.60"
# ====================== 分割線 =====================

@ -1,14 +1,10 @@
from .longCommand import CommandLongClient, ChangeModeResult
from .changeMode import change_mode
from .arm_disarm import arm_disarm
from .takeoff import takeoff
from .land import land
from .longCommand import CommandLongClient, CommandLongResult
from .navigation import PositionTargetGlobalIntClient, PositionTargetGlobalIntResult
__all__ = [
"CommandLongClient",
"ChangeModeResult",
"change_mode",
"arm_disarm",
"takeoff",
"land",
"PositionTargetGlobalIntClient",
"CommandLongResult",
"PositionTargetGlobalIntResult",
]

@ -1,93 +0,0 @@
"""Simple wrapper for arm/disarm via fc_network ROS2 service (MAV_CMD_COMPONENT_ARM_DISARM)."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_COMPONENT_ARM_DISARM = 400
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ArmDisarmResult:
success: bool
message: str
ack_result: int
def arm_disarm(
*,
target_sysid: int,
arm: bool,
target_compid: int = 0,
confirmation: int = 0,
param2: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ArmDisarmResult:
"""One-shot MAV_CMD_COMPONENT_ARM_DISARM (400) wrapper.
param1: 1.0 to arm, 0.0 to disarm.
param2: usually 0. Some stacks use 21196 for force-arm (ArduPilot); pass via param2 if needed.
"""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_arm_disarm_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return ArmDisarmResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_COMPONENT_ARM_DISARM
req.confirmation = confirmation
req.param1 = 1.0 if arm else 0.0
req.param2 = float(param2)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ArmDisarmResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ArmDisarmResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
r = arm_disarm(target_sysid=3, arm=True)
print(
f"arm_disarm success={r.success}, "
f"ack_result={r.ack_result}, message='{r.message}'"
)

@ -1,99 +0,0 @@
"""Simple wrapper for mode change via fc_network ROS2 service.
Reference CLI command:
ros2 service call /fc_network/vehicle/send_command_long \
fc_interfaces/srv/MavCommandLong \
"{target_sysid: 3, target_compid: 0, command: 176, confirmation: 0, \
param1: 1, param2: 4, param3: 0, param4: 0, param5: 0, param6: 0, \
param7: 0, timeout_sec: 2}"
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_DO_SET_MODE = 176
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ChangeModeResult:
"""Return value for mode change requests."""
success: bool
message: str
ack_result: int
def change_mode(
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ChangeModeResult:
"""One-shot helper for collaborators who want minimal code."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_change_mode_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return ChangeModeResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_DO_SET_MODE
req.confirmation = confirmation
req.param1 = float(base_mode)
req.param2 = float(custom_mode)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ChangeModeResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ChangeModeResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
# Example values are aligned with your terminal command.
result = change_mode(target_sysid=3, custom_mode=4)
print(
f"change_mode success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -1,90 +0,0 @@
"""Simple wrapper for land via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_LAND = 21
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class LandResult:
success: bool
message: str
ack_result: int
def land(
*,
target_sysid: int,
target_compid: int = 0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude_m: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> LandResult:
"""One-shot MAV_CMD_NAV_LAND wrapper."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_land_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return LandResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_LAND
req.confirmation = 0
req.param1 = 0.0
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return LandResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return LandResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
result = land(target_sysid=3)
print(
f"land success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -25,10 +25,6 @@ class CommandLongResult:
message: str
ack_result: int
ChangeModeResult = CommandLongResult
class CommandLongClient(Node):
"""ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。"""

@ -99,7 +99,6 @@ def _echo_position_target_matches(
@dataclass
class PositionTargetGlobalIntResult:
"""對應 MavPositionTargetGlobalInt.srv 的 Responsesuccess 依「送出與 r_* echo 是否一致」。"""
success: bool
message: str
ack_result: int
@ -117,7 +116,6 @@ class PositionTargetGlobalIntResult:
r_yaw: float = 0.0
r_yaw_rate: float = 0.0
class PositionTargetGlobalIntClient(Node):
"""
ROS2 client pos_global_int service 發送 SET_POSITION_TARGET_GLOBAL_INT (msg 86)
@ -230,51 +228,6 @@ class PositionTargetGlobalIntClient(Node):
r_yaw_rate=float(resp.r_yaw_rate),
)
# def set_position_target_global_int(
# self,
# *,
# target_sysid: int,
# target_compid: int = 0,
# time_boot_ms: int = 0,
# coordinate_frame: int = 6,
# type_mask: int = 0,
# lat_int: int = 0,
# lon_int: int = 0,
# alt: float = 0.0,
# vx: float = 0.0,
# vy: float = 0.0,
# vz: float = 0.0,
# afx: float = 0.0,
# afy: float = 0.0,
# afz: float = 0.0,
# yaw: float = 0.0,
# yaw_rate: float = 0.0,
# timeout_sec: float = DEFAULT_TIMEOUT_SEC,
# ) -> PositionTargetGlobalIntResult:
# """
# 送出 SET_POSITION_TARGET_GLOBAL_INTcoordinate_frame 預設 6 (MAV_FRAME_GLOBAL_INT)
# 可依飛控/任務覆寫。
# """
# return self._send_position_target_global_int(
# target_sysid=target_sysid,
# target_compid=target_compid,
# time_boot_ms=time_boot_ms,
# coordinate_frame=coordinate_frame,
# type_mask=type_mask,
# lat_int=lat_int,
# lon_int=lon_int,
# alt=alt,
# vx=vx,
# vy=vy,
# vz=vz,
# afx=afx,
# afy=afy,
# afz=afz,
# yaw=yaw,
# yaw_rate=yaw_rate,
# timeout_sec=timeout_sec,
# )
def goto_position_only(
self,
*,
@ -307,47 +260,3 @@ class PositionTargetGlobalIntClient(Node):
yaw_rate=0,
timeout_sec=timeout_sec,
)
# def set_position_target_global_int_deg(
# self,
# *,
# target_sysid: int,
# latitude_deg: float,
# longitude_deg: float,
# target_compid: int = 0,
# time_boot_ms: int = 0,
# coordinate_frame: int = MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
# type_mask: int = 0,
# alt: float = 0.0,
# vx: float = 0.0,
# vy: float = 0.0,
# vz: float = 0.0,
# afx: float = 0.0,
# afy: float = 0.0,
# afz: float = 0.0,
# yaw: float = 0.0,
# yaw_rate: float = 0.0,
# timeout_sec: float = DEFAULT_TIMEOUT_SEC,
# ) -> PositionTargetGlobalIntResult:
# """以度為單位的緯經,轉成 MAVLink 的 lat_int / lon_int (1e7 * deg)。"""
# lat_int = int(round(latitude_deg * 1e7))
# lon_int = int(round(longitude_deg * 1e7))
# return self.set_position_target_global_int(
# target_sysid=target_sysid,
# target_compid=target_compid,
# time_boot_ms=time_boot_ms,
# coordinate_frame=coordinate_frame,
# type_mask=type_mask,
# lat_int=lat_int,
# lon_int=lon_int,
# alt=alt,
# vx=vx,
# vy=vy,
# vz=vz,
# afx=afx,
# afy=afy,
# afz=afz,
# yaw=yaw,
# yaw_rate=yaw_rate,
# timeout_sec=timeout_sec,
# )

@ -1,91 +0,0 @@
"""Simple wrapper for takeoff via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_TAKEOFF = 22
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class TakeoffResult:
success: bool
message: str
ack_result: int
def takeoff(
*,
target_sysid: int,
altitude_m: float,
target_compid: int = 0,
min_pitch_deg: float = 0.0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> TakeoffResult:
"""One-shot MAV_CMD_NAV_TAKEOFF wrapper."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_takeoff_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return TakeoffResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_TAKEOFF
req.confirmation = 0
req.param1 = float(min_pitch_deg)
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return TakeoffResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return TakeoffResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
result = takeoff(target_sysid=3, altitude_m=10.0)
print(
f"takeoff success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -1,55 +0,0 @@
from fc_network_apps import CommandLongClient
import time
def main():
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
commandAPI = CommandLongClient()
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=3.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=5.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,30 +0,0 @@
"""Usage example for arm/disarm helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_arm_disarm
"""
from fc_network_apps import arm_disarm
def main() -> None:
# MAV_CMD_COMPONENT_ARM_DISARM (command=400)
# param1: 1 = arm, 0 = disarm
result = arm_disarm(
target_sysid=3,
target_compid=0,
arm=True,
timeout_sec=2.0,
)
print("=== arm result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
# To disarm instead:
# result = arm_disarm(target_sysid=3, target_compid=0, arm=False, timeout_sec=2.0)
if __name__ == "__main__":
main()

@ -1,29 +0,0 @@
"""Usage example for collaborators.
Run after sourcing your ROS2 workspace:
source install/local_setup.bash
python src/fc_network_apps/example_change_mode.py
"""
from fc_network_apps import change_mode
def main() -> None:
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
result = change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,27 +0,0 @@
"""Usage example for land helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_land
"""
from fc_network_apps import land
def main() -> None:
# MAV_CMD_NAV_LAND (command=21)
# This example asks vehicle sysid=3 to land.
result = land(
target_sysid=3,
target_compid=0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print("=== land result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,48 +0,0 @@
from __future__ import annotations
from fc_network_apps.navigation import PositionTargetGlobalIntClient
import time
def main() -> None:
# 目標載具 system id依你的連線修改
target_sysid = 3
nav = PositionTargetGlobalIntClient()
if not nav.wait_for_service(timeout_sec=5.0):
print("Service /fc_network/vehicle/pos_global_int not available.")
return
result_deg = nav.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.376655,
longitude_deg=149.157011,
alt=200.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
result_deg = nav.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.3632621,
longitude_deg=149.1652375,
alt=100.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
if __name__ == "__main__":
main()

@ -1,28 +0,0 @@
"""Usage example for takeoff helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_takeoff
"""
from fc_network_apps import takeoff
def main() -> None:
# MAV_CMD_NAV_TAKEOFF (command=22)
# This example asks vehicle sysid=3 to take off to 10 meters.
result = takeoff(
target_sysid=3,
target_compid=0,
altitude_m=10.0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print("=== takeoff result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,64 +0,0 @@
from fc_network_apps import CommandLongClient
import time
def main():
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
commandAPI = CommandLongClient()
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=3.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(3)
result = commandAPI.arm_disarm(
target_sysid=3,
target_compid=0,
arm = True,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.takeoff(
target_sysid=3,
target_compid=0,
altitude_m = 30.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
# time.sleep(20)
# result = commandAPI.land(
# target_sysid=3,
# target_compid=0,
# )
# print("=== change mode result ===")
# print(f"success : {result.success}")
# print(f"ack_result: {result.ack_result}")
# print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -0,0 +1,111 @@
import fc_network_apps as fap
import time
def main():
commandAPI = fap.CommandLongClient()
navAPI = fap.PositionTargetGlobalIntClient()
target_sysid = 3
print("=== change mode ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.change_mode(
target_sysid=target_sysid,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=3.0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
print("=== arm vehicle ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.arm_disarm(
target_sysid=target_sysid,
arm = True,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
print("=== takeoff ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.takeoff(
target_sysid=target_sysid,
altitude_m = 100.0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
time.sleep(15)
print("=== set_position_target_global_int_deg ===")
result_deg = navAPI.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.376655,
longitude_deg=149.157011,
alt=150.0,
timeout_sec=3.0,
)
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
result_deg = navAPI.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.3632621,
longitude_deg=149.1652375,
alt=100.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
print("=== land ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.land(
target_sysid=3,
target_compid=0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
if __name__ == "__main__":
main()

@ -35,7 +35,6 @@ ros2 topic echo
mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560
ros2 service call /mavlink/add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 8}"
ros2 service call /fc_network/vehicle/mav_ping fc_interfaces/srv/MavPing "{target_sysid: 3, target_compid: 0, ping_seq: 1}"
@ -49,16 +48,7 @@ sudo tcpdump -i lo 'udp dst port 14561' -X
sudo tcpdump -i lo 'udp dst port 14550' -X -vv
sudo tcpdump -i lo -X udp port 14550
colcon build --packages-select fc_interfaces
-35.360150, 149.159659
-35.376655, 149.157011
0b00 0000 00000 00000
0b00 0011 01111 11000
0b 11 01111 11000
0b 1111 1101 1111 1000
幾個要討論的
1. 專案文件中 .mat 檔案是幹嘛的?
2. GUI 的 print [Panel/Map Update] 11 Hz 希望關閉
3. readme 那串文字來源? 用途?

Loading…
Cancel
Save