Compare commits

...

4 Commits

Author SHA1 Message Date
Chiyu Chen b0f1bd56f5 Merge branch 'master' of http://140.120.108.238:49308/chiyu1468/AirTrapMine
主專案整合
4 weeks ago
Chiyu Chen 2990de2f3f 1. (update) mavlinkROS2Nodes 新增 local_position 資訊 4 weeks ago
Chiyu Chen cf213fc556 1. (adding) mainOrchestrator 增加版本先驗程序
2. (adding) mavlinkVehicleView 增加 read_all 快速讀取
3. (modify) 優化 longCommand 與 navigation 使其更好引用
4. (adding) 提供一個完整的 example_wholeMoving 作為範例
4. (remove) 移除 fc_network_app 重複功能
5. (modify) 修改 overview_table 部分顯示文字
4 weeks ago
Chiyu Chen 017dd4923d 更新說明檔 fc_network_adapter 4 weeks ago

@ -1,43 +1,46 @@
這是天雷系統的專案
===
專案核心框架
1. ROS2 Humble
2. Python 3.8.10
===
必要相依套件 順便記錄我開發時的環境版本
Python
1. pymavlink -> Version: 2.4.42
2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
ROS2
1. source ~/ros2_humble/install/setup.bash
2.
===
功能簡介
## 功能簡介
1. mavlink 多對多支援平台
2. 不允許進到 ros 系統有相同 sysid
3. 假設一台載具上所有 component 共用同一 socket
===
開發用輔助專案
## 運行環境
### 專案核心框架
1. ROS2 Humble
2. Python 3.8.10
### 必要相依套件及版本
- Python
[Package] fc_network_adapter
1. pymavlink -> Version: 2.4.42
2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
[Package] GUI
1. testresources
2. websockets
3. PyQt6
4. PyQt6-WebEngine
- ROS2
1. source ~/ros2_humble/install/setup.bash
2. geographic_info (https://github.com/ros-geographic-info/geographic_info.git) 已經作為 submodule 放在 external
3. angles (https://github.com/ros/angles.git) 已經作為 submodule 放在 external
4. mavros_msgs (https://github.com/mavlink/mavros) 這個專案中的一個資料夾 這邊手動複製的
### 開發用輔助專案
1. Gazebo Garden
2. Ardupilot
===
依賴的 ROS 庫
1. https://github.com/ros-geographic-info/geographic_info.git 記得要搞 ros2 版本的
2. https://github.com/ros/angles.git
3. mavros_msgs 是 https://github.com/mavlink/mavros 這個專案中的一個資料夾 這邊手動複製的
## 使用說明
Clone 專案後 請先執行這些指令
```bash
# 1.同步 submodule
@ -49,17 +52,29 @@ colcon build --packages-select angles geographic_msgs
colcon build --packages-select mavros_msgs # 這個依賴前面的
colcon build --packages-select fc_interfaces # 自己定義的
```
1.
主要專案 fc_network_adapter 請一律在 ~/AirTrapMine/src/ 路徑下 以模組化啟動程式
```bash
# 記得先開啟 依賴 Package 到 overlay
. ./install/local_setup.bash
# 範例
cd ~/AirTrapMine/src/ # 這是範例!!!
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
python -m someotherpkg.src.example_wholeMoving
```
===
Package 簡述
<<<<<<< HEAD
2.
GUI 介面
在 ~/AirTrapMine/src/GUI 路徑下 直接啟動
```bash
cd ~/AirTrapMine/src/GUI
python gui.py
```
===
簡述0328
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。
=======
## 資料夾說明
1. unitdev 為各自協作者做開發時的測試區
01 -> 晉凱(ken)
02 -> 其宇(chiyu)
@ -77,19 +92,13 @@ Package 簡述
使用者的外層包裝
5. someotherpkg
如何使用 fc_network_apps 的範例檔案
6. GUI
由 PyQt6 開發的互動式介面
N. logs 是執行時期的記錄檔
===
主要專案 fc_network_adapter 請一律在 ~/AirTrapMine/src/ 資料夾下 以模組化啟動程式
例如 在 ~/AirTrapMine/src/ 資料夾下
```bash
# 記得先開啟 依賴 Package 到 overlay
. ./install/local_setup.bash
# 範例
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
```
>>>>>>> chiyu
簡述0328
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。

@ -6,7 +6,7 @@ class OverviewTable(QTableWidget):
"""總覽表格,顯示所有無人機的狀態資訊"""
# 默認的資訊類型和映射
DEFAULT_INFO_TYPES = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "位置", "速度", "地速", "航向",
DEFAULT_INFO_TYPES = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "XY位置", "XY速度", "地速", "航向",
"空速", "油門", "HUD ALT", "爬升率", "Roll", "Pitch", "Yaw", "丟包", "延遲"]
DEFAULT_INFO_TYPE_MAP = {

@ -6,6 +6,7 @@
# 開發此專案的注意事項
- 預設 autopilot 的 component id = 1
- 不允許 system id 重複
- 預設同一載具僅會使用相同的 socket
- 增加一個固定數值監控然後要到 ros2 topic
- mavlinkROS2Node.py 檔案內
- PublishRateController.topic_intervals 建立
@ -126,6 +127,7 @@
- *cls.mavlinkObjects* 資料結構 { socket_id(序號) : mavlink_object(物件實例) }
- *self.mavlink_socket* 從 pymavlink 繼承的socket物件
- *self.state* 描述這個 socket 物件的狀態
- *self.MAVLink* pymavlink 的一個裝置物件 模擬自己是某一個裝置 用以對於該 mavlink bus 負責封包發送
---
- *process_data()* [async method] 核心方法
- *remove_target_socket()* *add_target_socket()*
@ -170,6 +172,45 @@
## mavlinkVehicleView.py
這個檔案是作為載具的資訊暫存庫使用 會搭配 ROS2 的功能 再做利用
## mavlinkROS2Nodes.py
這裡支持了所有 ROS2 框架的對應功能 包涵
1. 將載具的串流更新的狀態更新到對應的 topic
2. 將地面站與載具互動 mavlink microservice 包裝起來提供 ros2 service 互動介面
### **[Class]** fc_ros_manager
MAVLink ROS2 節點管理器 管理兩個獨立的 Node
會開一個執行緒 讓兩個 Node 都跑在裡面
然後用 spin_once 保持 Node 的活性
- *self.status_publisher* 物件實例
- *self.command_service* 物件實例
- *self.spin_thread* 執行緒
---
- *start()* 啟動自己
- *stop()* 停下自己
- *shutdown()* 關閉自己並清理
### **[Class]** VehicleStatusPublisher
這整個組件都是自動的 目前沒有什麼需要 runtime 設定的地方
- *self.fc_publishers* 儲存所有 publisher 的資料結構
- *self.rate_controller* 儲存頻率參數的地方 也可以關掉某個 topic 的發佈
---
- *timer_callback()* 自動的抓出現有的 vehicle 中有的資訊 然後固定的將訊息丟到負責發佈的方法中
- *_publish_vehicle_status()* 接 timer_callback 後去分配到各項發佈中
- *_get_or_create_publisher()* 實際創建 topic 的地方
- *_publish_XXX()* 各項發佈的方法
### **[Class]** MavlinkCommandService
提供 ros2 service 讓 serial_object 去丟出 mavlink 封包
然後會從回應封包中挑出期待的回應 再傳給 ros2 request
其中 PendingEntry 是很關鍵的東西
它的每個物件 代表需要等待的封包種類與必需包含的內容
- *self._pending_by_sysid* 儲存 PendingEntry 的地方
---
- *return_router()* 檢查並消耗 return_packet_ring 然後將封包分配到 pending 去
- *__init__()* 這邊登入要創建的 service 到 ros2 系統
- *handle_XXX()* 這邊是實現 service 的具體方法
# 開發記錄
## 已實現功能
@ -182,9 +223,9 @@
7. 終端機介面控制
8. 基礎載具流量觀測
9. 載具狀態收集與彙整
10. a. ros2 topic 應用開發介面
10. a. ros2 topic 應用開發介面 (基礎框架)
b. ros2 service 應用開發介面 (基礎框架)
### 待開發功能
5-1. 建立 serial 連線 並可以對接收器下達AT指令
5-2. 模組化 serial 連線機制 以利後期擴容其他模組
10. a. ros2 應用開發介面
5-2. 模組化 serial 連線機制 以利後期擴容其他模組

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__))
VERSION_NO = "v0.61"
PROJECT_VER = "v0.61"
class PanelState:
def __init__(self):
@ -39,9 +39,9 @@ class PanelState:
self.object_manager_state = "Stopped"
self.serial_manager_state = "Stopped"
self.ros2_manager_state = "Stopped"
self.socket_object_list = [] # 已有的 mavlink object
self.socket_object_list = [] # 已有的 mavlink object
self.linked_serial_dict = {} # 已連線的 serial 端口 serial id num : serial_port string
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
# 關於創建通道時的暫存資訊
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
@ -102,36 +102,36 @@ class ControlPanel:
def input_dialog(stdscr, prompt="請輸入文字: "):
"""顯示輸入對話框"""
height, width = stdscr.getmaxyx()
# 建立輸入視窗
dialog_height = 5
dialog_width = min(60, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
# 建立視窗邊框
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(1, 2, prompt)
dialog_win.addstr(3, 2, "按 Enter 確認, ESC 取消")
dialog_win.refresh()
# 輸入區域
input_win = curses.newwin(1, dialog_width - 6, start_y + 2, start_x + 2)
input_win.keypad(True)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
input_win.clear()
input_win.addstr(0, 0, user_input[-dialog_width+8:]) # 顯示輸入內容(滾動)
input_win.refresh()
ch = input_win.getch()
if ch == 27: # ESC
user_input = None
break
@ -141,16 +141,16 @@ class ControlPanel:
user_input = user_input[:-1]
elif 32 <= ch <= 126: # 可打印字符
user_input += chr(ch)
curses.noecho()
curses.curs_set(0)
# 清理視窗
del input_win
del dialog_win
stdscr.clear()
stdscr.refresh()
return user_input
# ================ 關於 主要選單 的部份 ===================
@ -179,9 +179,9 @@ class ControlPanel:
]),
MenuNode("Vehicles Insp.", "檢視已連線的遠端載具", action = "INSPECT_VEHICLES"),
MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"),
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"),
]),
MenuNode("Shutdown", "關閉整個系統", children=[
MenuNode("Return", "繼續運行", "BACK"),
@ -191,7 +191,7 @@ class ControlPanel:
def panel_thread(self, cmd_q: queue.Queue, state: PanelState, stop_evt: threading.Event):
stdscr = None
def cleanup():
"""清理 curses 狀態"""
if stdscr:
@ -209,7 +209,7 @@ class ControlPanel:
def draw_menu(screen):
nonlocal stdscr
stdscr = screen
curses.curs_set(0)
stdscr.nodelay(False) # 阻塞讀鍵
stdscr.keypad(True)
@ -219,12 +219,12 @@ class ControlPanel:
idx_stack = [0] # 索引堆疊
state.intoSTART() # 設定狀態為運行中
while not stop_evt.is_set():
current_menu = menu_stack[-1]
current_idx = idx_stack[-1]
# 獲取終端機尺寸
height, width = stdscr.getmaxyx()
# 簡單暴力的限制視窗的大小
@ -237,7 +237,7 @@ class ControlPanel:
if height < MIN_HEIGHT or width < 60:
logger.error("Terminal size too small for Control Panel.")
break
stdscr.clear()
stdscr.border()
@ -267,26 +267,26 @@ class ControlPanel:
elif child.action == "LINK_SERIAL_TO_MIDDLEWARE_UDP":
link_status = "Yes" if state.serial_info_temp["Go2Middleware"] else "No"
desc = f"{child.desc} [{link_status}]"
line = f"{marker}{child.name:15s} {desc}"
attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL
stdscr.addstr(start_line + i, 4, line, attr)
# 顯示訊息區域
# 顯示訊息區域
# info_start_line = start_line + len(current_menu.children) + 1
info_start_line = height - 8
max_msg_lines = 5 # 最多顯示 5 行訊息
current_time = time.time()
# 清理過時的訊息
state.panel_info_msg_list = [
(msg, timestamp) for msg, timestamp in state.panel_info_msg_list
if current_time - timestamp < 2.0 #秒數
]
# 只顯示最新的 max_msg_lines 條訊息
display_msgs = state.panel_info_msg_list[-max_msg_lines:]
for i, msg_data in enumerate(display_msgs):
if info_start_line + i >= help_line - 1: # 避免超出邊界
break
@ -298,13 +298,13 @@ class ControlPanel:
stdscr.addstr(info_start_line + i, 2, f"💬 {msg}", curses.A_BOLD)
# 操作說明
# help_line = start_line + len(current_menu.children) + 2
help_line = height - 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM)
stdscr.addstr(height-1 , width-12, f" {VERSION_NO} ", curses.A_DIM)
stdscr.addstr(height-1 , width-12, f" {PROJECT_VER} ", curses.A_DIM)
stdscr.refresh()
@ -319,8 +319,8 @@ class ControlPanel:
# continue
break
time.sleep(0.1)
if (state.mavlink_bridge_state == "Stopped" and
state.object_manager_state == "Stopped" and
if (state.mavlink_bridge_state == "Stopped" and
state.object_manager_state == "Stopped" and
state.serial_manager_state == "Stopped"):
state.intoSTOPPED()
# stop_evt.set()
@ -330,14 +330,14 @@ class ControlPanel:
# 設定短暫的 timeout讓執行緒能夠響應 stop_evt
stdscr.timeout(100)
ch = stdscr.getch()
if ch == -1: # 沒有操作
continue
# 處理按鍵
if ch in (curses.KEY_UP, ord('k')):
idx_stack[-1] = (current_idx - 1) % len(current_menu.children)
elif ch in (curses.KEY_DOWN, ord('j')):
idx_stack[-1] = (current_idx + 1) % len(current_menu.children)
@ -347,14 +347,14 @@ class ControlPanel:
elif ch == (ord('o')):
# 離開工程模式
state.intoSTART()
state.intoSTART()
elif ch == curses.KEY_LEFT:
# 返回上層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
elif ch == curses.KEY_RIGHT:
# 進入下層 (但不執行動作)
selected = current_menu.children[current_idx]
@ -369,26 +369,26 @@ class ControlPanel:
elif ch in (curses.KEY_ENTER, 10, 13):
selected = current_menu.children[current_idx]
# 處理不同類型的動作
if selected.children: # 有子選單
menu_stack.append(selected)
idx_stack.append(0)
elif selected.action == "BACK":
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
elif selected.action == "QUIT":
state.intoTERMINATION()
pre_panel_shutdown()
elif selected.action == "TEXT_UDP_IP":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ")
if result is not None:
state.udp_info_temp["IP"] = result
elif selected.action == "TEXT_UDP_PORT":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 Port: ")
if result is not None:
@ -402,7 +402,7 @@ class ControlPanel:
idx_stack.pop()
# menu_stack.pop()
# idx_stack.pop()
elif selected.action == "CREATE_UDP_OUTBOUND":
cmd_q.put("CREATE_UDP_OUTBOUND")
# 確認後回到上兩層
@ -457,8 +457,8 @@ class ControlPanel:
elif selected.action == "LIST_SERIAL_LINKS":
created_list_menu = self.create_linked_serial_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
idx_stack.append(0)
elif selected.action == "INSPECT_LINKED_SERIAL":
# 顯示 Serial 連結詳細資訊
if hasattr(selected, 'serial_id'):
@ -488,7 +488,7 @@ class ControlPanel:
current_list_menu = menu_stack[-1]
menu_stack.pop()
idx_stack.pop()
# 依據選單種類 重新建立分頁
if current_list_menu.name == "Serial Port List":
created_list_menu = self.create_serial_port_menu(state, page=selected.page)
@ -503,16 +503,16 @@ class ControlPanel:
menu_stack.append(current_list_menu)
idx_stack.append(0)
continue
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_MAV_OBJECT":
# 顯示物件詳細資訊
if hasattr(selected, 'socket_id'):
cmd_q.put(("INSPECT_MAV_OBJECT", selected.socket_id))
self.show_object_info(stdscr, selected.socket_id, state)
elif selected.action == "REMOVE_MAV_OBJECT":
# 移除物件
if hasattr(selected, 'socket_id'):
@ -575,7 +575,7 @@ class ControlPanel:
created_list_menu = self.create_vehicles_list_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_VEHICLE":
# 顯示載具詳細資訊
if hasattr(selected, 'sysid') and hasattr(selected, 'compid'):
@ -585,7 +585,7 @@ class ControlPanel:
elif callable(selected.action):
# 執行函式
cmd_q.put(selected.action)
try:
curses.wrapper(draw_menu)
except KeyboardInterrupt:
@ -593,12 +593,12 @@ class ControlPanel:
finally:
cleanup()
# ================ 關於 mavlink object 的部份 ===================
# ================ 關於 mavlink object 的部份 ===================
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 mavlink_object 列表選單(支持分頁)"""
children = []
if not state.socket_object_list:
children.append(MenuNode("(Empty)", "目前沒有連結口", None))
else:
@ -606,7 +606,7 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
for socket_id in state.socket_object_list[start_idx:end_idx]:
# 為每個 socket 創建子選單
@ -656,11 +656,11 @@ class ControlPanel:
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(0, 2, f" Socket #{socket_id} 詳細資訊 ", curses.A_BOLD)
# 這裡顯示基本資訊
dialog_win.addstr(2, 2, f"Socket ID : {socket_id}")
dialog_win.addstr(3, 2, f"Socket status : {state.socket_info_single.get('socket_state', 'N/A')}")
@ -676,15 +676,15 @@ class ControlPanel:
dialog_win.addstr(8, 2, f"Switching Targets: {show_str if show_str else 'N/A'}")
state.socket_info_single['InfoReady'] = False # 重置狀態以便下次使用
dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
stdscr.refresh()
def select_target_socket(self, stdscr, source_socket_id, state: PanelState, remove_mode=False):
"""選擇目標 socket 的對話框"""
height, width = stdscr.getmaxyx()
@ -692,13 +692,13 @@ class ControlPanel:
dialog_width = min(50, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.keypad(True)
title = "選擇要移除的目標" if remove_mode else "選擇轉發目標"
available_sockets = [sid for sid in state.socket_object_list if sid != source_socket_id]
if not available_sockets:
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
@ -710,24 +710,24 @@ class ControlPanel:
stdscr.clear()
stdscr.refresh()
return None
selected_idx = 0
while True:
dialog_win.clear()
dialog_win.border()
dialog_win.addstr(0, 2, f" {title} ", curses.A_BOLD)
for i, socket_id in enumerate(available_sockets):
marker = "" if i == selected_idx else " "
attr = curses.A_REVERSE if i == selected_idx else curses.A_NORMAL
dialog_win.addstr(2 + i, 2, f"{marker} Socket #{socket_id}", attr)
dialog_win.addstr(dialog_height - 2, 2, "Enter確認 ESC取消")
dialog_win.refresh()
ch = dialog_win.getch()
if ch in (curses.KEY_UP, ord('k')):
selected_idx = (selected_idx - 1) % len(available_sockets)
elif ch in (curses.KEY_DOWN, ord('j')):
@ -749,11 +749,11 @@ class ControlPanel:
def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 serial port 列表選單(支持分頁)"""
children = []
# 獲取可用的 Serial 連接埠列表
# serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓
serial_ports = acquireSerial.get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*'])
if not serial_ports:
children.append(MenuNode("(Empty)", "目前沒有串口設備", None))
else:
@ -761,7 +761,7 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的串口
for port in serial_ports[start_idx:end_idx]:
port_menu = MenuNode(f"{port}", children=[
@ -781,7 +781,7 @@ class ControlPanel:
for child in port_menu.children:
child.port = port
children.append(port_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
@ -793,7 +793,7 @@ class ControlPanel:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
@ -802,7 +802,7 @@ class ControlPanel:
def create_linked_serial_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 已連線的 serial port 列表選單(支持分頁)並包含後續管理功能"""
children = []
if not state.linked_serial_dict:
children.append(MenuNode("(Empty)", "目前沒有連結口", None))
else:
@ -810,7 +810,7 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
linked_serial_id_list = list(state.linked_serial_dict.keys())
for serial_id in linked_serial_id_list[start_idx:end_idx]:
@ -825,7 +825,7 @@ class ControlPanel:
for child in obj_menu.children:
child.serial_id = serial_id
children.append(obj_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
@ -837,7 +837,7 @@ class ControlPanel:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
@ -859,14 +859,14 @@ class ControlPanel:
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.border()
dialog_win.addstr(0, 2, f" Serial Link #{serial_id} 詳細資訊 ", curses.A_BOLD)
# 從 linked_serial_dict 獲取資訊
serial_info = state.linked_serial_dict.get(serial_id, {})
if not serial_info:
dialog_win.addstr(2, 2, f"無法取得 Serial #{serial_id} 的資訊")
else:
@ -877,17 +877,17 @@ class ControlPanel:
dialog_win.addstr(5, 2, f"Communication : {state.serial_info_single.get('receiver_type', 'N/A')}")
dialog_win.addstr(6, 2, f"Target UDP Port : {state.serial_info_single.get('target_port', 'N/A')}")
dialog_win.addstr(7, 2, f"Status : {state.serial_info_single.get('status', 'Running')}")
# 如果有額外資訊可以繼續添加
if 'thread_alive' in serial_info:
thread_status = "Alive" if serial_info['thread_alive'] else "Stopped"
dialog_win.addstr(8, 2, f"Thread Status : {thread_status}")
state.serial_info_single['InfoReady'] = False # 重置狀態以便下次使用
dialog_win.addstr(dialog_height - 2, 2, "按任意鍵返回...")
dialog_win.refresh()
dialog_win.getch()
del dialog_win
stdscr.clear()
@ -900,7 +900,7 @@ class ControlPanel:
每個 vehicle-component 組合都是獨立的選單項目
"""
children = []
if not state.connected_vehicles_dict:
children.append(MenuNode("(Empty)", "目前沒有已連線的載具", None))
else:
@ -908,22 +908,22 @@ class ControlPanel:
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# vehicle_id_list 現在是 (sysid, compid) 的元組列表
vehicle_comp_list = list(state.connected_vehicles_dict.keys())
# 顯示當前頁的物件
for sysid, compid in vehicle_comp_list[start_idx:end_idx]:
# 建立顯示名稱
display_name = f"Vehicle #{sysid} - Comp #{compid}"
desc = f"載具 {sysid} 組件 {compid}"
vehicle_menu = MenuNode(display_name, desc, "INSPECT_VEHICLE")
# 將 sysid 和 compid 附加到選單項目上
vehicle_menu.sysid = sysid
vehicle_menu.compid = compid
children.append(vehicle_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
@ -935,7 +935,7 @@ class ControlPanel:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Connected Vehicles", f"已連線載具列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
@ -943,7 +943,7 @@ class ControlPanel:
def show_vehicle_info(self, stdscr, sysid, compid, cmd_q: queue.Queue, state: PanelState):
"""顯示載具組件詳細資訊(動態更新,顯示變化率)"""
# 等待資訊準備
start = time.time()
while not state.vehicle_info_single.get('InfoReady', False):
@ -951,30 +951,30 @@ class ControlPanel:
state.panel_info_msg_list.append(("Fail! Vehicle Info NOT Acquired!", time.time()))
return
time.sleep(0.05)
info = state.vehicle_info_single
height, width = stdscr.getmaxyx()
dialog_height = min(22, height - 4)
dialog_width = min(70, width - 4)
start_y = (height - dialog_height) // 2
start_x = (width - dialog_width) // 2
dialog_win = curses.newwin(dialog_height, dialog_width, start_y, start_x)
dialog_win.nodelay(True) # 非阻塞模式,允許動態更新
dialog_win.keypad(True)
# MAV_TYPE 名稱對應
MAV_TYPE_NAMES = {
0: "Generic", 1: "Fixed Wing", 2: "Quadrotor", 3: "Helicopter",
4: "Antenna Tracker", 5: "GCS", 6: "Airship", 10: "Ground Rover",
12: "Boat", 13: "Submarine", 26: "Gimbal", 30: "Camera"
}
# 動態更新迴圈
last_update = time.time()
while True:
current_time = time.time()
# 每 1 秒重新請求資料
if current_time - last_update >= 1.0:
# 觸發資料更新(透過 Orchestrator
@ -989,58 +989,58 @@ class ControlPanel:
# 更新 info 參照
info = state.vehicle_info_single
last_update = current_time
dialog_win.clear()
dialog_win.border()
dialog_win.addstr(0, 2, f" Vehicle #{info['sysid']} - Component #{info['compid']} ", curses.A_BOLD)
# === 基礎資訊 ===
dialog_win.addstr(2, 2, "[Identity]", curses.A_UNDERLINE)
dialog_win.addstr(2, 32, "[Connection]", curses.A_UNDERLINE)
# # MAV Type # 這個用不到
# mav_type = info.get('vehicle_type', 'N/A')
# mav_type_str = f"{mav_type} ({MAV_TYPE_NAMES.get(mav_type, 'Unknown')})" if isinstance(mav_type, int) else str(mav_type)
# dialog_win.addstr(3, 2, f"MAV Type : {mav_type_str}")
# Component Type
dialog_win.addstr(3, 2, f"Component Type : {info.get('component_type', 'N/A')}")
# Autopilot Type
if info.get('mav_autopilot') is not None:
dialog_win.addstr(4, 2, f"Autopilot : {info.get('mav_autopilot', 'N/A')}")
# Connection Info
dialog_win.addstr(3, 32, f"Connection : {info.get('connection_type', 'N/A')}")
dialog_win.addstr(4, 32, f"Socket ID : #{info.get('socket_id', 'N/A')}")
# === 封包統計 ===
stats = info.get('packet_stats', {})
dialog_win.addstr(7, 2, "[Packet Statistics]", curses.A_UNDERLINE)
received = stats.get('received', 0)
lost = stats.get('lost', 0)
loss_rate = stats.get('loss_rate', 0.0)
last_seq = stats.get('last_seq', 'N/A')
# 計算變化
received_delta = stats.get('received_delta', 0)
lost_delta = stats.get('lost_delta', 0)
# 顯示變化率
recv_str = f"{received:6d}"
if received_delta > 0:
recv_str += f" (+{received_delta}↑)"
lost_str = f"{lost:4d}"
if lost_delta > 0:
lost_str += f" (+{lost_delta}↑)"
dialog_win.addstr(8, 2, f"Received : {recv_str}")
dialog_win.addstr(8, 32, f"Lost : {lost_str}")
dialog_win.addstr(9, 2, f"Loss Rate : {loss_rate:.2f}%")
dialog_win.addstr(9, 32, f"Last Seq : {last_seq}")
# 最後接收時間
last_msg_time = stats.get('last_msg_time')
if last_msg_time:
@ -1050,50 +1050,50 @@ class ControlPanel:
dialog_win.addstr(10, 32, f"Elapsed : {elapsed:.1f}s")
else:
dialog_win.addstr(10, 2, "Last Time : N/A")
# === 訊息類型分佈 ===
dialog_win.addstr(12, 2, "[Message Types] (Top 12)", curses.A_UNDERLINE)
msg_counts = info.get('msg_type_counts', {})
# MAVLink 訊息名稱對應(縮寫版本)
MSG_NAMES = {
0: "HB", 1: "SYS_ST", 24: "GPS_RAW", 27: "RAW_IMU",
30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL",
74: "VFR_HUD", 147: "BATT_ST"
}
# 顯示前 12 個最常見的訊息類型(兩列各 6 個)
msg_items = list(msg_counts.items())[:12]
line = 13
for i, (msg_id, count) in enumerate(msg_items):
msg_name = MSG_NAMES.get(msg_id, "???")
delta = stats.get(f'msg_delta_{msg_id}', 0)
# 格式化數據
if delta > 0:
data_str = f"{count}(+{delta}↑)"
else:
data_str = f"{count}"
# 格式化顯示:[ID]NAME DATA (ID固定3字符寬度右對齊)
display_str = f"[{msg_id:3d}]{msg_name:8s} {data_str}"
# 左列(偶數索引)或右列(奇數索引)
col = 2 if i % 2 == 0 else 36
row = line + (i // 2)
if row >= dialog_height - 3: # 避免超出邊界
break
dialog_win.addstr(row, col, display_str)
# 確保跳過顯示區域
line = line + 6
dialog_win.addstr(dialog_height - 2, 2, "Press R: Reset Stats, C: Unregister, other key to return...")
dialog_win.refresh()
# 檢查是否有按鍵(非阻塞)
ch = dialog_win.getch()
if ch in (ord('R'), ord('r')):
@ -1103,10 +1103,10 @@ class ControlPanel:
break
elif ch != -1: # 有按鍵則退出
break
# 短暫延遲
time.sleep(0.1)
state.vehicle_info_single['InfoReady'] = False
del dialog_win
stdscr.clear()
@ -1140,19 +1140,20 @@ class Orchestrator:
self.fc_ros_manager = mros.ros2_manager
self.fc_ros_manager.initialize()
self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = {
'position': 1.0,
'attitude': 0.0,
'velocity': 0.0,
'battery': 1.0,
'vfr_hud': 1.0,
'mode': 0.0,
'summary': 1.0,
'position': 1.0,
'position_ned': 1.0,
'attitude': 1.0,
'velocity': 0.0,
'battery': 1.0,
'vfr_hud': 1.0,
'mode': 0.0,
'summary': 1.0,
}
def engageWholeSystem(self):
"""啟動整個系統"""
# === 1) 面板部分的啟動 ===
self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt))
self.panel_thread = threading.Thread(target=self.cPanel.panel_thread, args=(self.cmd_q, self.panelState, self.stop_evt))
self.panel_thread.start()
# === 2) async_io_manager & mavlink_bridge 部分的啟動 ===
@ -1215,7 +1216,7 @@ class Orchestrator:
for s_id in mo.mavlink_object.mavlinkObjects:
s_obj = mo.mavlink_object.mavlinkObjects[s_id]
if socket_id in s_obj.target_sockets:
self.remove_target_from_object(s_id, socket_id)
self.remove_target_from_object(s_id, socket_id)
# 再移除該物件
self.delete_udp_object(socket_id)
elif action == "MAVOBJ_ADD_TARGET":
@ -1262,7 +1263,7 @@ class Orchestrator:
elif action == "UNREGISTER_VEHICLE":
sysid = cmd[1]
self.vehicle_registry.unregister(sysid)
elif cmd == "CREATE_UDP_INBOUND":
self.panelState.udp_info_temp["direction"] = "inbound"
self.create_udp_object()
@ -1291,8 +1292,8 @@ class Orchestrator:
logger.error(f"Unexpected error in main loop: {e}")
finally:
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
if self.bridge.thread.is_alive():
if self.bridge.running:
self.bridge.stop()
@ -1318,7 +1319,7 @@ class Orchestrator:
self.panel_thread.join(timeout=2)
time.sleep(0.5) # 等待各模組穩定關閉
logger.info("Main orchestrator END!")
# =============== 面板動作 - Mavlink Object ===============
@ -1339,10 +1340,11 @@ class Orchestrator:
self.occupied_ip_ports[ip].append(port)
else:
self.occupied_ip_ports[ip] = [port]
# 外放資訊部分
elif self.panelState.udp_info_temp["direction"] == "outbound":
connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
try:
# 檢測這個 connection_string 是否能成功建立 mavlink 連結
mavlink_socket = mavutil.mavlink_connection(connection_string)
@ -1362,7 +1364,7 @@ class Orchestrator:
mavlink_object.socket_type = socket_type
self.panelState.panel_info_msg_list.append((f"Created UDP {self.panelState.udp_info_temp['direction']} object: {connection_string}", time.time()))
def delete_udp_object(self, socket_id):
"""移除指定的 mavlink_object"""
@ -1406,10 +1408,10 @@ class Orchestrator:
def _update_vehicles_list(self):
"""更新已連線載具列表(從 vehicle_registry 獲取)"""
vehicles_dict = {}
# 從 vehicle_registry 獲取所有載具
all_vehicles = self.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
# 遍歷每個載具的所有組件
for compid, component in vehicle.components.items():
@ -1422,9 +1424,9 @@ class Orchestrator:
'connection_via': vehicle.connected_via.value,
'socket_id': vehicle.custom_meta.get('socket_id', 'N/A')
}
self.panelState.connected_vehicles_dict = vehicles_dict
def _prepare_vehicle_info(self, sysid, compid):
"""準備載具組件的詳細資訊(包含變化率計算)"""
vehicle = self.vehicle_registry.get(sysid)
@ -1433,37 +1435,37 @@ class Orchestrator:
return
socket_id = vehicle.custom_meta.get('socket_id', 'N/A')
component = vehicle.get_component(compid)
if not component:
logger.warning(f"Component {compid} not found in vehicle {sysid}")
return
stats = component.packet_stats
# 取得之前的統計資料(用於計算變化)
prev_stats = self.panelState.vehicle_info_single.get('prev_stats', {})
prev_received = prev_stats.get('received', 0)
prev_lost = prev_stats.get('lost', 0)
prev_msg_counts = prev_stats.get('msg_counts', {})
# 計算變化率
received_delta = stats.received_count - prev_received
lost_delta = stats.lost_count - prev_lost
# 準備訊息類型計數(排序後取前幾個)
sorted_msg_counts = dict(sorted(
stats.msg_type_count.items(),
key=lambda x: x[1],
reverse=True
)[:12]) # 取前 12 個最常見的
# 計算每種訊息類型的變化
msg_deltas = {}
for msg_id, count in sorted_msg_counts.items():
prev_count = prev_msg_counts.get(msg_id, 0)
msg_deltas[f'msg_delta_{msg_id}'] = count - prev_count
# 更新 vehicle_info_single
socket_type = "N/A"
socket_obj =mo.mavlink_object.mavlinkObjects.get(socket_id, None)
@ -1481,7 +1483,7 @@ class Orchestrator:
"packet_stats": {
"received": stats.received_count,
"lost": stats.lost_count,
"loss_rate": (stats.lost_count / stats.received_count * 100
"loss_rate": (stats.lost_count / stats.received_count * 100
if stats.received_count > 0 else 0),
"last_seq": stats.last_seq,
"last_msg_time": stats.last_msg_time,
@ -1508,8 +1510,8 @@ class Orchestrator:
(f"Fail! Serial Port {self.panelState.serial_info_temp['Port']} already linked.", time.time())
)
return
# 獲取可用的 udp port
# 獲取可用的 udp port
udp_port_tmp = find_available_port(19000, 20000)
# 定義通訊類型映射表
@ -1526,7 +1528,7 @@ class Orchestrator:
("Please select Communication Type first.", time.time())
)
return
# 查找對應的通訊類型
comm_type_tmp = COMM_TYPE_MAP.get(comm_type)
if comm_type_tmp is None:
@ -1554,12 +1556,31 @@ class Orchestrator:
self.panelState.udp_info_temp['Port'] = str(udp_port_tmp)
self.panelState.udp_info_temp['direction'] = "inbound"
self.create_udp_object("SERIAL_XBEE")
def main():
stop_evt = threading.Event()
# =========== 各項模組的版本先驗 ===========
# 除非你有在做這幾項模組的改版 不然動到這邊的版本號 代表執行環境有很大的問題!!!!!!
version_check = True
if mo.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkObkect")
version_check = False
if mros.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkROS2Nodes")
version_check = False
if mvv.MODULE_VER != "1.00":
print("Module Version Error! : mavlinkVehicleView")
version_check = False
if sm.MODULE_VER != "0.60":
print("Module Version Error! : serialManager")
version_check = False
if version_check == False:
print("Environment Obstacle! Check YOUR Execution System Path First!!")
return
# ========================================
stop_evt = threading.Event()
def signal_handler(signum, frame):
"""處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組"""
stop_evt.set()
@ -1593,4 +1614,7 @@ if __name__ == "__main__":
4. 註解掉無效代碼 action == "UPDATE_VEHICLES_LIST" 區塊
5. 系統納入 mavlink ROS2 Manager 模組
2025.04.13
1. 加入各項模組的版本先驗 檢驗失敗就直接離開
'''

@ -28,9 +28,6 @@ mavlink_bridge:
專門處理 stream_bridge_ring 裡面的訊息流
會把訊息流解開後 存放到 mavlinkVehicleView.py 定義的載具結構視圖
'''
# 基礎功能的 import
@ -61,6 +58,8 @@ from .utils import RingBuffer, setup_logger
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.50"
stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255)
return_packet_ring = RingBuffer(capacity=256, buffer_id=254)
@ -386,7 +385,7 @@ class mavlink_object:
self.MAVLink = mav_common.MAVLink(self.mavlinkPipeline, srcSystem=254, srcComponent=191)
# 記錄訊息過濾類型 (可選)
self.bridge_msg_types = set([0, 30, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS
self.bridge_msg_types = set([0, 30, 32, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS, 32 LOCAL_POSITION_NED
self.return_msg_types = set([])
# 轉發到別的 mavlink object 作為目標端口 的列表

@ -27,6 +27,7 @@ from rclpy.executors import MultiThreadedExecutor
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import nav_msgs.msg
import mavros_msgs.msg
# ROS2 Service imports
@ -39,12 +40,13 @@ from . import mavlinkObject as mo
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.50"
# ============================================================================
# 頻率控制器
# VehicleStatusPublisher Node
# ============================================================================
# 頻率控制器
class PublishRateController:
"""發布頻率控制器 - 按時間間隔控制發布頻率"""
@ -55,14 +57,17 @@ class PublishRateController:
# 例如:'ekf_status': 1.0, # EKF 狀態
# ═══════════════════════════════════════════════════════════════
# 各 topic 的發布間隔(秒)
# 注意 這邊是定義區 不要把參數寫在這裡 所以預設全部關閉
# 以這個專案 請看 mainOrchestrator.py 的 Orchestrator 初始化階段
self.topic_intervals = {
'position': 0.5, # GPS位置
'attitude': 0.5, # 姿態
'velocity': 0.5, # 速度
'battery': 1.0, # 電池
'vfr_hud': 0.5, # VFR HUD
'mode': 1.0, # 飛行模式
'summary': 1.0, # 載具摘要
'position': 0.0, # GNSS位置
'position_ned': 0.0, # LOCAL_POSITION_NED (位置+速度)
'attitude': 0.0, # 姿態 (pitch yaw row 與其加速狀態)
'velocity': 0.0, # 速度 (已經包含在 vfr_hud 未來移除)
'battery': 0.0, # 電池
'vfr_hud': 0.0, # VFR HUD (地速 空速 絕對高度 爬升率 航向 油門)
'mode': 0.0, # 飛行模式 (已經在 summary 裡 未來移除)
'summary': 0.0, # 載具摘要 (sysid 飛行模式 解鎖上鎖 gps狀態)
# 在這裡新增更多 topics...
}
# 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp}
@ -103,11 +108,6 @@ class PublishRateController:
"""重置所有計時器"""
self.last_publish_time.clear()
# ============================================================================
# VehicleStatusPublisher Node
# ============================================================================
class VehicleStatusPublisher(Node):
"""
載具狀態發布者 - vehicle_registry 讀取數據並發布到 ROS2 topics
@ -144,10 +144,8 @@ class VehicleStatusPublisher(Node):
if not self.running:
return
# 從 vehicle_registry 獲取所有載具
all_vehicles = mvv.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
# 高頻快路徑:讀取 registry 的 immutable 快照
for sysid, vehicle in mvv.vehicle_registry.read_all():
self._publish_vehicle_status(vehicle)
def _publish_vehicle_status(self, vehicle: mvv.VehicleView):
@ -173,6 +171,7 @@ class VehicleStatusPublisher(Node):
# ═══════════════════════════════════════════════════════════════
# 發布各種狀態(通過頻率控制器判斷是否發布)
self._publish_position(sysid, status)
self._publish_position_ned(sysid, status)
self._publish_attitude(sysid, status)
self._publish_velocity(sysid, status)
self._publish_battery(sysid, status)
@ -228,6 +227,42 @@ class VehicleStatusPublisher(Node):
msg.status.status = gps.fix_type - 1 # MAVLink fix_type 轉 NavSatStatus
publisher.publish(msg)
def _publish_position_ned(self, sysid: int, status: mvv.ComponentStatus):
"""發布 LOCAL_POSITION_NEDNED 座標,含位置與速度)"""
if not self.rate_controller.should_publish(sysid, 'position_ned'):
return
local_pos = status.custom_status.get('local_position')
if not local_pos:
return
required_keys = ('x', 'y', 'z', 'vx', 'vy', 'vz')
if any(local_pos.get(k) is None for k in required_keys):
return
publisher = self._get_or_create_publisher(
sysid, 'position_ned', nav_msgs.msg.Odometry
)
if publisher.get_subscription_count() == 0:
return
msg = nav_msgs.msg.Odometry()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'local_ned'
msg.child_frame_id = 'base_link_ned'
# 保持 MAVLink LOCAL_POSITION_NED 座標定義,不進行 ENU 轉換
msg.pose.pose.position.x = float(local_pos['x'])
msg.pose.pose.position.y = float(local_pos['y'])
msg.pose.pose.position.z = float(local_pos['z'])
msg.twist.twist.linear.x = float(local_pos['vx'])
msg.twist.twist.linear.y = float(local_pos['vy'])
msg.twist.twist.linear.z = float(local_pos['vz'])
publisher.publish(msg)
def _publish_attitude(self, sysid: int, status: mvv.ComponentStatus):
"""發布姿態IMU"""
@ -237,7 +272,6 @@ class VehicleStatusPublisher(Node):
att = status.attitude
if att.roll is None:
return
publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu)
if publisher.get_subscription_count() == 0:
@ -477,12 +511,17 @@ class MavlinkCommandService(Node):
- 作為 service server 等待 client 請求
- 接收請求組裝 MAVLink 封包
- 調用 mavlinkObject 發送封包
- 處理 ACK 等待和超時未來實現
- 處理 ACK 等待和超時
設計理念回歸 MAVLink 純粹結構
- 只負責將 ROS2 請求轉換為 MAVLink 封包
- 不預設功能 ARM/DISARM) 保持模組化
- 高層應用可透過此 service 實現各種功能
講白話一點就是
每次接到一個 service 請求 要整個系統丟某種指令給載具時
會做兩件事 1."丟出mavlink封包" 2."創造一個臨時信箱 Pending"
"""
serviceString_prefix = '/fc_network/vehicle'
@ -493,8 +532,8 @@ class MavlinkCommandService(Node):
# 狀態標記
self.running = True
# mavlinkObject 的引用(將由外部設置)
self.mavlink_analyzer = None
# # mavlinkObject 的引用(將由外部設置) 用不到
# self.mavlink_analyzer = None
# pending 旗標物件的儲存庫
self._pending_by_sysid = {} # sysid(int) : PendingEntry
@ -557,7 +596,8 @@ class MavlinkCommandService(Node):
def return_router(self):
'''
這邊是給外部迴圈呼叫的 消耗 return_packet_ring 裡接收到的 mavlink 封包
在節點管理器哪邊被呼叫
消耗 return_packet_ring 裡接收到的 mavlink 封包
分送到各自的 pending
藉由 event.set() 解開 service 中的 block
'''
@ -953,7 +993,8 @@ class fc_ros_manager:
try:
# 初始化 ROS2
rclpy.init()
if not rclpy.ok():
rclpy.init(args=None)
# 創建節點 node
self.status_publisher = VehicleStatusPublisher()
@ -1095,5 +1136,9 @@ ros2_manager = fc_ros_manager()
1. 完成 ros2 MavPositionTargetGlobalInt 區域
2. 優化 response.ack_result 回傳值的資訊
TODO
1. service 部分會需要跟 mavlinkobject 大量互動 也許需要考慮對方的生命週期
2. 還沒測試 如果有失效狀態讓 fc_ros_manager 中斷了 如何恢復的問題
'''

@ -5,14 +5,14 @@ VehicleView - Pure State Container
"""
import os
from typing import Dict, Optional, Any
from typing import Dict, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.00"
# ====================== Enums =====================
@ -402,11 +402,18 @@ class VehicleViewRegistry:
def __init__(self):
self._vehicles: Dict[int, VehicleView] = {}
# 高頻讀取用 snapshot弱一致性
self._vehicles_snapshot_items: Tuple[Tuple[int, VehicleView], ...] = tuple()
def _refresh_snapshot(self) -> None:
"""重建快照(僅在 key 集合變動時觸發)"""
self._vehicles_snapshot_items = tuple(self._vehicles.items())
def register(self, sysid: int) -> VehicleView:
"""註冊新的載具視圖"""
if sysid not in self._vehicles:
self._vehicles[sysid] = VehicleView(sysid)
self._refresh_snapshot()
logger.info(f"Registered new VehicleView for system {sysid}")
return self._vehicles[sysid]
@ -418,6 +425,7 @@ class VehicleViewRegistry:
"""註銷載具視圖"""
if sysid in self._vehicles:
del self._vehicles[sysid]
self._refresh_snapshot()
logger.info(f"Unregistered VehicleView for system {sysid}")
return True
return False
@ -425,10 +433,22 @@ class VehicleViewRegistry:
def get_all(self) -> Dict[int, VehicleView]:
"""取得所有載具視圖"""
return self._vehicles.copy()
def read_all(self) -> Tuple[Tuple[int, VehicleView], ...]:
"""
高效讀取所有載具視圖快照弱一致性
注意:
- 回傳 immutable tuple 快照適合高頻迭代
- 僅在 register/unregister/clear 後更新可能不是最新資料
- 適合高頻讀取且可接受弱一致性的情境
"""
return self._vehicles_snapshot_items
def clear(self) -> None:
"""清空所有載具視圖"""
self._vehicles.clear()
self._refresh_snapshot()
logger.info("Cleared all VehicleViews")
def __len__(self) -> int:
@ -449,5 +469,8 @@ vehicle_registry = VehicleViewRegistry()
2026.01.16
1. 新增 重置指定組件的封包統計 功能
2026.04.13
1. 新增 VehicleViewRegistry.read_all 方法 ROS2 Node 組件更有效率的讀取其中的資料
'''

@ -26,6 +26,7 @@ from .utils import setup_logger
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.60"
# ====================== 分割線 =====================

@ -1,14 +1,10 @@
from .longCommand import CommandLongClient, ChangeModeResult
from .changeMode import change_mode
from .arm_disarm import arm_disarm
from .takeoff import takeoff
from .land import land
from .longCommand import CommandLongClient, CommandLongResult
from .navigation import PositionTargetGlobalIntClient, PositionTargetGlobalIntResult
__all__ = [
"CommandLongClient",
"ChangeModeResult",
"change_mode",
"arm_disarm",
"takeoff",
"land",
"PositionTargetGlobalIntClient",
"CommandLongResult",
"PositionTargetGlobalIntResult",
]

@ -1,93 +0,0 @@
"""Simple wrapper for arm/disarm via fc_network ROS2 service (MAV_CMD_COMPONENT_ARM_DISARM)."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_COMPONENT_ARM_DISARM = 400
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ArmDisarmResult:
success: bool
message: str
ack_result: int
def arm_disarm(
*,
target_sysid: int,
arm: bool,
target_compid: int = 0,
confirmation: int = 0,
param2: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ArmDisarmResult:
"""One-shot MAV_CMD_COMPONENT_ARM_DISARM (400) wrapper.
param1: 1.0 to arm, 0.0 to disarm.
param2: usually 0. Some stacks use 21196 for force-arm (ArduPilot); pass via param2 if needed.
"""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_arm_disarm_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return ArmDisarmResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_COMPONENT_ARM_DISARM
req.confirmation = confirmation
req.param1 = 1.0 if arm else 0.0
req.param2 = float(param2)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ArmDisarmResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ArmDisarmResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
r = arm_disarm(target_sysid=3, arm=True)
print(
f"arm_disarm success={r.success}, "
f"ack_result={r.ack_result}, message='{r.message}'"
)

@ -1,99 +0,0 @@
"""Simple wrapper for mode change via fc_network ROS2 service.
Reference CLI command:
ros2 service call /fc_network/vehicle/send_command_long \
fc_interfaces/srv/MavCommandLong \
"{target_sysid: 3, target_compid: 0, command: 176, confirmation: 0, \
param1: 1, param2: 4, param3: 0, param4: 0, param5: 0, param6: 0, \
param7: 0, timeout_sec: 2}"
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_DO_SET_MODE = 176
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ChangeModeResult:
"""Return value for mode change requests."""
success: bool
message: str
ack_result: int
def change_mode(
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ChangeModeResult:
"""One-shot helper for collaborators who want minimal code."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_change_mode_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return ChangeModeResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_DO_SET_MODE
req.confirmation = confirmation
req.param1 = float(base_mode)
req.param2 = float(custom_mode)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ChangeModeResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ChangeModeResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
# Example values are aligned with your terminal command.
result = change_mode(target_sysid=3, custom_mode=4)
print(
f"change_mode success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -1,90 +0,0 @@
"""Simple wrapper for land via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_LAND = 21
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class LandResult:
success: bool
message: str
ack_result: int
def land(
*,
target_sysid: int,
target_compid: int = 0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude_m: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> LandResult:
"""One-shot MAV_CMD_NAV_LAND wrapper."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_land_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return LandResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_LAND
req.confirmation = 0
req.param1 = 0.0
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return LandResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return LandResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
result = land(target_sysid=3)
print(
f"land success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -25,10 +25,6 @@ class CommandLongResult:
message: str
ack_result: int
ChangeModeResult = CommandLongResult
class CommandLongClient(Node):
"""ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。"""

@ -99,7 +99,6 @@ def _echo_position_target_matches(
@dataclass
class PositionTargetGlobalIntResult:
"""對應 MavPositionTargetGlobalInt.srv 的 Responsesuccess 依「送出與 r_* echo 是否一致」。"""
success: bool
message: str
ack_result: int
@ -117,7 +116,6 @@ class PositionTargetGlobalIntResult:
r_yaw: float = 0.0
r_yaw_rate: float = 0.0
class PositionTargetGlobalIntClient(Node):
"""
ROS2 client pos_global_int service 發送 SET_POSITION_TARGET_GLOBAL_INT (msg 86)
@ -230,51 +228,6 @@ class PositionTargetGlobalIntClient(Node):
r_yaw_rate=float(resp.r_yaw_rate),
)
# def set_position_target_global_int(
# self,
# *,
# target_sysid: int,
# target_compid: int = 0,
# time_boot_ms: int = 0,
# coordinate_frame: int = 6,
# type_mask: int = 0,
# lat_int: int = 0,
# lon_int: int = 0,
# alt: float = 0.0,
# vx: float = 0.0,
# vy: float = 0.0,
# vz: float = 0.0,
# afx: float = 0.0,
# afy: float = 0.0,
# afz: float = 0.0,
# yaw: float = 0.0,
# yaw_rate: float = 0.0,
# timeout_sec: float = DEFAULT_TIMEOUT_SEC,
# ) -> PositionTargetGlobalIntResult:
# """
# 送出 SET_POSITION_TARGET_GLOBAL_INTcoordinate_frame 預設 6 (MAV_FRAME_GLOBAL_INT)
# 可依飛控/任務覆寫。
# """
# return self._send_position_target_global_int(
# target_sysid=target_sysid,
# target_compid=target_compid,
# time_boot_ms=time_boot_ms,
# coordinate_frame=coordinate_frame,
# type_mask=type_mask,
# lat_int=lat_int,
# lon_int=lon_int,
# alt=alt,
# vx=vx,
# vy=vy,
# vz=vz,
# afx=afx,
# afy=afy,
# afz=afz,
# yaw=yaw,
# yaw_rate=yaw_rate,
# timeout_sec=timeout_sec,
# )
def goto_position_only(
self,
*,
@ -307,47 +260,3 @@ class PositionTargetGlobalIntClient(Node):
yaw_rate=0,
timeout_sec=timeout_sec,
)
# def set_position_target_global_int_deg(
# self,
# *,
# target_sysid: int,
# latitude_deg: float,
# longitude_deg: float,
# target_compid: int = 0,
# time_boot_ms: int = 0,
# coordinate_frame: int = MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
# type_mask: int = 0,
# alt: float = 0.0,
# vx: float = 0.0,
# vy: float = 0.0,
# vz: float = 0.0,
# afx: float = 0.0,
# afy: float = 0.0,
# afz: float = 0.0,
# yaw: float = 0.0,
# yaw_rate: float = 0.0,
# timeout_sec: float = DEFAULT_TIMEOUT_SEC,
# ) -> PositionTargetGlobalIntResult:
# """以度為單位的緯經,轉成 MAVLink 的 lat_int / lon_int (1e7 * deg)。"""
# lat_int = int(round(latitude_deg * 1e7))
# lon_int = int(round(longitude_deg * 1e7))
# return self.set_position_target_global_int(
# target_sysid=target_sysid,
# target_compid=target_compid,
# time_boot_ms=time_boot_ms,
# coordinate_frame=coordinate_frame,
# type_mask=type_mask,
# lat_int=lat_int,
# lon_int=lon_int,
# alt=alt,
# vx=vx,
# vy=vy,
# vz=vz,
# afx=afx,
# afy=afy,
# afz=afz,
# yaw=yaw,
# yaw_rate=yaw_rate,
# timeout_sec=timeout_sec,
# )

@ -1,91 +0,0 @@
"""Simple wrapper for takeoff via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_TAKEOFF = 22
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class TakeoffResult:
success: bool
message: str
ack_result: int
def takeoff(
*,
target_sysid: int,
altitude_m: float,
target_compid: int = 0,
min_pitch_deg: float = 0.0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> TakeoffResult:
"""One-shot MAV_CMD_NAV_TAKEOFF wrapper."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_takeoff_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return TakeoffResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_TAKEOFF
req.confirmation = 0
req.param1 = float(min_pitch_deg)
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return TakeoffResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return TakeoffResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
result = takeoff(target_sysid=3, altitude_m=10.0)
print(
f"takeoff success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -1,55 +0,0 @@
from fc_network_apps import CommandLongClient
import time
def main():
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
commandAPI = CommandLongClient()
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=3.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=5.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,30 +0,0 @@
"""Usage example for arm/disarm helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_arm_disarm
"""
from fc_network_apps import arm_disarm
def main() -> None:
# MAV_CMD_COMPONENT_ARM_DISARM (command=400)
# param1: 1 = arm, 0 = disarm
result = arm_disarm(
target_sysid=3,
target_compid=0,
arm=True,
timeout_sec=2.0,
)
print("=== arm result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
# To disarm instead:
# result = arm_disarm(target_sysid=3, target_compid=0, arm=False, timeout_sec=2.0)
if __name__ == "__main__":
main()

@ -1,29 +0,0 @@
"""Usage example for collaborators.
Run after sourcing your ROS2 workspace:
source install/local_setup.bash
python src/fc_network_apps/example_change_mode.py
"""
from fc_network_apps import change_mode
def main() -> None:
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
result = change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,27 +0,0 @@
"""Usage example for land helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_land
"""
from fc_network_apps import land
def main() -> None:
# MAV_CMD_NAV_LAND (command=21)
# This example asks vehicle sysid=3 to land.
result = land(
target_sysid=3,
target_compid=0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print("=== land result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,48 +0,0 @@
from __future__ import annotations
from fc_network_apps.navigation import PositionTargetGlobalIntClient
import time
def main() -> None:
# 目標載具 system id依你的連線修改
target_sysid = 3
nav = PositionTargetGlobalIntClient()
if not nav.wait_for_service(timeout_sec=5.0):
print("Service /fc_network/vehicle/pos_global_int not available.")
return
result_deg = nav.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.376655,
longitude_deg=149.157011,
alt=200.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
result_deg = nav.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.3632621,
longitude_deg=149.1652375,
alt=100.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
if __name__ == "__main__":
main()

@ -1,28 +0,0 @@
"""Usage example for takeoff helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_takeoff
"""
from fc_network_apps import takeoff
def main() -> None:
# MAV_CMD_NAV_TAKEOFF (command=22)
# This example asks vehicle sysid=3 to take off to 10 meters.
result = takeoff(
target_sysid=3,
target_compid=0,
altitude_m=10.0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print("=== takeoff result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,64 +0,0 @@
from fc_network_apps import CommandLongClient
import time
def main():
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
commandAPI = CommandLongClient()
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=3.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(3)
result = commandAPI.arm_disarm(
target_sysid=3,
target_compid=0,
arm = True,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.takeoff(
target_sysid=3,
target_compid=0,
altitude_m = 30.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
# time.sleep(20)
# result = commandAPI.land(
# target_sysid=3,
# target_compid=0,
# )
# print("=== change mode result ===")
# print(f"success : {result.success}")
# print(f"ack_result: {result.ack_result}")
# print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -0,0 +1,111 @@
import fc_network_apps as fap
import time
def main():
commandAPI = fap.CommandLongClient()
navAPI = fap.PositionTargetGlobalIntClient()
target_sysid = 3
print("=== change mode ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.change_mode(
target_sysid=target_sysid,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=3.0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
print("=== arm vehicle ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.arm_disarm(
target_sysid=target_sysid,
arm = True,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
print("=== takeoff ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.takeoff(
target_sysid=target_sysid,
altitude_m = 100.0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
time.sleep(15)
print("=== set_position_target_global_int_deg ===")
result_deg = navAPI.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.376655,
longitude_deg=149.157011,
alt=150.0,
timeout_sec=3.0,
)
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
result_deg = navAPI.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.3632621,
longitude_deg=149.1652375,
alt=100.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
print("=== land ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.land(
target_sysid=3,
target_compid=0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
if __name__ == "__main__":
main()

@ -28,14 +28,13 @@ python -m someotherpkg.src.example_takeoff_land
python -m someotherpkg.src.example_change_mode
ros2 topic list
ros2 topic echo
ros2 topic echo /fc_network/vehicle/sys3/battery
/home/picars/ardupilot/build/sitl/bin/arducopter -S --model + --speedup 1 --slave 0 --defaults /home/picars/ardupilot/Tools/autotest/default_params/copter.parm --sim-address=127.0.0.1 -I3 --sysid 7
mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560
ros2 service call /mavlink/add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 8}"
ros2 service call /fc_network/vehicle/mav_ping fc_interfaces/srv/MavPing "{target_sysid: 3, target_compid: 0, ping_seq: 1}"
@ -49,16 +48,10 @@ sudo tcpdump -i lo 'udp dst port 14561' -X
sudo tcpdump -i lo 'udp dst port 14550' -X -vv
sudo tcpdump -i lo -X udp port 14550
colcon build --packages-select fc_interfaces
-35.360150, 149.159659
-35.376655, 149.157011
0b00 0000 00000 00000
0b00 0011 01111 11000
0b 11 01111 11000
0b 1111 1101 1111 1000
幾個要討論的
1. 專案文件中 .mat 檔案是幹嘛的?
2. GUI 的 print [Panel/Map Update] 11 Hz 希望關閉
3. readme 那串文字來源? 用途?
4. GUI介面 加一下海拔高 跟 相對高 速度跟位置 變更為 XY速度 XY位置
5. pitch yaw roll 出來了 弄一下
6.

Loading…
Cancel
Save