Compare commits

..

1 Commits

Author SHA1 Message Date
ken910606 f2bb4679c2 Update master with local changes 1 month ago

@ -1,46 +1,43 @@
這是天雷系統的專案 這是天雷系統的專案
===
專案核心框架
1. ROS2 Humble
2. Python 3.8.10
=== ===
## 功能簡介 必要相依套件 順便記錄我開發時的環境版本
1. mavlink 多對多支援平台
2. 不允許進到 ros 系統有相同 sysid
3. 假設一台載具上所有 component 共用同一 socket
=== Python
## 運行環境 1. pymavlink -> Version: 2.4.42
2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
### 專案核心框架
1. ROS2 Humble
2. Python 3.8.10
### 必要相依套件及版本 ROS2
- Python
[Package] fc_network_adapter
1. pymavlink -> Version: 2.4.42
2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
[Package] GUI
1. testresources
2. websockets
3. PyQt6
4. PyQt6-WebEngine
- ROS2
1. source ~/ros2_humble/install/setup.bash 1. source ~/ros2_humble/install/setup.bash
2. geographic_info (https://github.com/ros-geographic-info/geographic_info.git) 已經作為 submodule 放在 external 2.
3. angles (https://github.com/ros/angles.git) 已經作為 submodule 放在 external
4. mavros_msgs (https://github.com/mavlink/mavros) 這個專案中的一個資料夾 這邊手動複製的
### 開發用輔助專案 ===
功能簡介
1. mavlink 多對多支援平台
2. 不允許進到 ros 系統有相同 sysid
3. 假設一台載具上所有 component 共用同一 socket
===
開發用輔助專案
1. Gazebo Garden 1. Gazebo Garden
2. Ardupilot 2. Ardupilot
=== ===
## 使用說明 依賴的 ROS 庫
1. https://github.com/ros-geographic-info/geographic_info.git 記得要搞 ros2 版本的
2. https://github.com/ros/angles.git
3. mavros_msgs 是 https://github.com/mavlink/mavros 這個專案中的一個資料夾 這邊手動複製的
Clone 專案後 請先執行這些指令 Clone 專案後 請先執行這些指令
```bash ```bash
# 1.同步 submodule # 1.同步 submodule
@ -52,29 +49,17 @@ colcon build --packages-select angles geographic_msgs
colcon build --packages-select mavros_msgs # 這個依賴前面的 colcon build --packages-select mavros_msgs # 這個依賴前面的
colcon build --packages-select fc_interfaces # 自己定義的 colcon build --packages-select fc_interfaces # 自己定義的
```
1.
主要專案 fc_network_adapter 請一律在 ~/AirTrapMine/src/ 路徑下 以模組化啟動程式
```bash
# 記得先開啟 依賴 Package 到 overlay
. ./install/local_setup.bash
# 範例
cd ~/AirTrapMine/src/ # 這是範例!!!
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
python -m someotherpkg.src.example_wholeMoving
``` ```
2.
GUI 介面
在 ~/AirTrapMine/src/GUI 路徑下 直接啟動
```bash
cd ~/AirTrapMine/src/GUI
python gui.py
```
=== ===
## 資料夾說明 Package 簡述
<<<<<<< HEAD
===
簡述0328
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。
=======
1. unitdev 為各自協作者做開發時的測試區 1. unitdev 為各自協作者做開發時的測試區
01 -> 晉凱(ken) 01 -> 晉凱(ken)
02 -> 其宇(chiyu) 02 -> 其宇(chiyu)
@ -92,13 +77,19 @@ python gui.py
使用者的外層包裝 使用者的外層包裝
5. someotherpkg 5. someotherpkg
如何使用 fc_network_apps 的範例檔案 如何使用 fc_network_apps 的範例檔案
6. GUI
由 PyQt6 開發的互動式介面
N. logs 是執行時期的記錄檔 N. logs 是執行時期的記錄檔
=== ===
主要專案 fc_network_adapter 請一律在 ~/AirTrapMine/src/ 資料夾下 以模組化啟動程式
例如 在 ~/AirTrapMine/src/ 資料夾下
```bash
# 記得先開啟 依賴 Package 到 overlay
. ./install/local_setup.bash
# 範例
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
```
簡述0328 >>>>>>> chiyu
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。

@ -4,14 +4,12 @@ import math
import re import re
import threading import threading
from threading import Lock from threading import Lock
from concurrent.futures import ThreadPoolExecutor
import asyncio import asyncio
import websockets import websockets
import json import json
import socket import socket
import sys import sys
import os import os
import traceback
from pymavlink import mavutil from pymavlink import mavutil
from geometry_msgs.msg import Point, Vector3 from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu from sensor_msgs.msg import BatteryState, NavSatFix, Imu
@ -24,17 +22,18 @@ _src_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _src_path not in sys.path: if _src_path not in sys.path:
sys.path.insert(0, _src_path) sys.path.insert(0, _src_path)
# 導入 fc_network_apps 的 longCommand統一的 MAV_CMD_* API # 導入 fc_network_apps 的函數
try: try:
from fc_network_apps.longCommand import CommandLongClient from fc_network_apps import change_mode, takeoff
except ImportError as e: except ImportError as e:
import traceback import traceback
print(f"⚠️ 警告: 無法導入 CommandLongClient") print(f"⚠️ 警告: 無法導入 fc_network_apps")
print(f" 错误: {e}") print(f" 错误: {e}")
print(f" 这通常表示 ROS2 的 fc_interfaces 包未被编译或未正确安装") print(f" 这通常表示 ROS2 的 fc_interfaces 包未被编译或未正确安装")
print(f" 完整堆栈跟踪:") print(f" 完整堆栈跟踪:")
traceback.print_exc() traceback.print_exc()
CommandLongClient = None change_mode = None
takeoff = None
class DroneSignals(QObject): class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data) update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
@ -419,13 +418,8 @@ class WebSocketMavlinkReceiver(threading.Thread):
class DroneMonitor(Node): class DroneMonitor(Node):
# Subscribe to drone ROS2 topics # Subscribe to drone ROS2 topics
_instance = None # Singleton pattern to prevent duplicate nodes
def __init__(self): def __init__(self):
# Use a unique node name with timestamp to avoid conflicts on restart super().__init__('drone_monitor')
import time
node_name = f'drone_monitor_{int(time.time() * 1000) % 100000}'
super().__init__(node_name)
self.signals = DroneSignals() self.signals = DroneSignals()
self.drone_topics = {} self.drone_topics = {}
self.lock = Lock() self.lock = Lock()
@ -466,17 +460,6 @@ class DroneMonitor(Node):
# ================================================================================ # ================================================================================
self.serial_receivers = [] self.serial_receivers = []
# ================================================================================
# 【新增】初始化 CommandLongClient持久化不會每次調用都創建/銷毀)
# ================================================================================
self.command_long_client = None
try:
self.command_long_client = CommandLongClient()
except Exception as e:
print(f"⚠️ 警告: 無法初始化 CommandLongClient: {e}")
self.command_long_client = None
# ================================================================================
# 主题检测定时器 # 主题检测定时器
self.create_timer(1.0, self.scan_topics) self.create_timer(1.0, self.scan_topics)
@ -602,115 +585,331 @@ class DroneMonitor(Node):
# ================================================================================ # ================================================================================
async def set_mode(self, drone_id, mode_name): async def set_mode(self, drone_id, mode_name):
"""使用 CommandLongClient 切換無人機飛行模式(使用非阻塞的 async 方法)""" """
使用 fc_network_apps change_mode 函數切換無人機飛行模式
參數:
drone_id: 無人機ID (格式: "s{socket_id}_{sysid}")
mode_name: 模式名稱 (例如: "GUIDED", "AUTO", "LOITER")
返回:
bool: 模式切換是否成功
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
print(f"\n🔵 [SET_MODE] set_mode() 異步函數被調用 (drone_id={drone_id}, mode={mode_name})", flush=True)
print(f" 事件循环: {asyncio.get_event_loop()}", flush=True)
print(f" 当前任务: {asyncio.current_task()}\n", flush=True)
# 解析 drone_id 以提取 sysid
# drone_id 格式: "s{socket_id}_{sysid}" (例如: "s0_1", "s0_11")
try: try:
# 解析 drone_id 提取 sysid
parts = drone_id.split('_') parts = drone_id.split('_')
if len(parts) < 2: if len(parts) < 2:
self.get_logger().error(f"Invalid drone_id format: {drone_id}")
print(f"❌ [SET_MODE] 無效的 drone_id 格式: {drone_id}") print(f"❌ [SET_MODE] 無效的 drone_id 格式: {drone_id}")
print(f" 返回: False")
return False return False
sysid = int(parts[-1]) sysid = int(parts[-1])
print(f"✓ [SET_MODE] 解析 drone_id: {drone_id} → sysid={sysid}")
except (ValueError, IndexError) as e:
self.get_logger().error(f"Failed to parse drone_id {drone_id}: {e}")
print(f"❌ [SET_MODE] 無法解析 drone_id {drone_id}: {e}")
print(f" 返回: False")
return False
# 獲取模式對應的 custom_mode 值 # 獲取模式對應的 custom_mode 值
custom_mode = self.MODE_MAPPING.get(mode_name) custom_mode = self.MODE_MAPPING.get(mode_name)
if custom_mode is None: if custom_mode is None:
print(f"❌ [SET_MODE] 未知模式: {mode_name}") self.get_logger().error(f"Unknown mode: {mode_name}. Available modes: {list(self.MODE_MAPPING.keys())}")
return False print(f"❌ [SET_MODE] 未知模式: {mode_name}")
print(f" 支持的模式: {list(self.MODE_MAPPING.keys())}")
print(f"\n📢 [SET_MODE] {drone_id}{mode_name} (custom_mode={custom_mode})") return False
if not self.command_long_client: print(f"✓ [SET_MODE] 模式對應: {mode_name} → custom_mode={custom_mode}")
print(f"❌ [SET_MODE] CommandLongClient 未初始化")
return False
# 直接調用 async 方法,無需線程池(避免嵌套執行器衝突) # 檢查 fc_network_apps 的 change_mode 函數是否可用
result = await self.command_long_client.change_mode_async( if change_mode is None:
target_sysid=sysid, self.get_logger().error("fc_network_apps is not available. Cannot change mode.")
custom_mode=float(custom_mode), print(f"❌ [SET_MODE] fc_network_apps 不可用")
target_compid=0, return False
base_mode=1.0,
timeout_sec=2.0,
)
if result and result.success: # 使用 fc_network_apps 的 change_mode 函數
print(f"✅ [SET_MODE] 模式切換成功") try:
msg = f"ROS2 服務呼叫: target_sysid={sysid}, custom_mode={custom_mode}, base_mode=1.0"
self.get_logger().info(f"Changing mode for drone {drone_id} to {mode_name} (custom_mode={custom_mode})")
print(f"\n📢 [SET_MODE] 開始切換模式")
print(f" Drone ID: {drone_id}")
print(f" 模式: {mode_name}")
print(f" {msg}")
# 在線程池中運行同步的 change_mode 函數
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_change_mode():
print(f"\n 🔄 [_call_change_mode] 在線程池中調用 change_mode...")
print(f" ├─ 線程開始時間: {__import__('time').time()}")
print(f" ├─ 目標: sysid={sysid}, mode={custom_mode}\n")
result = change_mode(
target_sysid=sysid,
custom_mode=float(custom_mode),
target_compid=0,
base_mode=1.0,
confirmation=0,
timeout_sec=2.0,
)
print(f"\n ├─ change_mode() 返回結果對象: {result}")
print(f" └─ 線程任務完成")
return result
print(f" 📢 [SET_MODE] 提交 change_mode 到線程池...")
result = await loop.run_in_executor(executor, _call_change_mode)
print(f"\n ✓ [SET_MODE] 從線程池接收到返回值")
print(f"\n📥 [SET_MODE] 從 change_mode() 接收服務響應:")
print(f" ├─ result 对象类型: {type(result)}")
print(f" ├─ result.success: {result.success} (类型: {type(result.success)})")
print(f" ├─ result.message: '{result.message}' (类型: {type(result.message)})")
print(f" └─ result.ack_result: {result.ack_result} (类型: {type(result.ack_result)})")
print(f"\n【返回值確認】")
print(f" success == True: {result.success == True}")
print(f" success is True: {result.success is True}")
print(f" bool(success): {bool(result.success)}")
print(f"\n【FC_NETWORK SERVICE 回传值确认】")
print(f" ├─ result.success: {result.success}")
print(f" ├─ result.message: '{result.message}'")
print(f" └─ result.ack_result: {result.ack_result}")
if result.success:
self.get_logger().info(f"Mode change successful for {drone_id}: {result.message}")
print(f"\n✅ [SET_MODE] 模式切換成功!")
print(f" ├─ fc_network 確認: success=True")
print(f" ├─ 訊息: {result.message}")
print(f" ├─ ACK代碼: {result.ack_result}")
print(f" └─ 返回到 GUI: True")
return True return True
else: else:
print(f"❌ [SET_MODE] 模式切換失敗 (message={result.message if result else 'None'})") self.get_logger().warning(f"Mode change failed for {drone_id}: {result.message} (ack={result.ack_result})")
print(f"\n❌ [SET_MODE] 模式切換失敗!")
print(f" ├─ fc_network 確認: success=False")
print(f" ├─ 原因: {result.message}")
print(f" ├─ ACK代碼: {result.ack_result}")
print(f" └─ 返回到 GUI: False")
return False return False
except Exception as e: except Exception as e:
print(f"❌ [SET_MODE] 例外錯誤: {e}") self.get_logger().error(f"Exception during mode change for {drone_id}: {e}")
print(f"\n❌ [SET_MODE] 例外錯誤: {e}")
import traceback
traceback.print_exc() traceback.print_exc()
print(f" 返回: False (异常)")
return False return False
async def arm_drone(self, drone_id, arm): async def arm_drone(self, drone_id, arm):
"""使用 CommandLongClient 執行 ARM/DISARM使用非阻塞的 async 方法)""" """
使用 fc_network_apps arm_disarm 函數上鎖/解鎖無人機
參數:
drone_id: 無人機ID (格式: "s{socket_id}_{sysid}")
arm: True 為上鎖, False 為解鎖
返回:
bool: 操作是否成功
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
action_name = "上鎖" if arm else "解鎖"
print(f"\n🔵 [ARM_DISARM] arm_drone() 異步函數被調用 (drone_id={drone_id}, arm={arm}, 動作={action_name})", flush=True)
# 解析 drone_id 以提取 sysid
try: try:
# 解析 drone_id 提取 sysid
parts = drone_id.split('_') parts = drone_id.split('_')
if len(parts) < 2: if len(parts) < 2:
print(f"❌ [ARM] 無效的 drone_id 格式: {drone_id}") self.get_logger().error(f"Invalid drone_id format: {drone_id}")
print(f"❌ [ARM_DISARM] 無效的 drone_id 格式: {drone_id}")
return False return False
sysid = int(parts[-1]) sysid = int(parts[-1])
print(f"✓ [ARM_DISARM] 解析 drone_id: {drone_id} → sysid={sysid}")
except (ValueError, IndexError) as e:
self.get_logger().error(f"Failed to parse drone_id {drone_id}: {e}")
print(f"❌ [ARM_DISARM] 無法解析 drone_id {drone_id}: {e}")
return False
action_name = "解鎖" if arm else "上鎖"
print(f"\n📢 [ARM] {drone_id}{action_name}")
if not self.command_long_client: try:
print(f"❌ [ARM] CommandLongClient 未初始化") msg = f"ROS2 服務呼叫: target_sysid={sysid}, arm={arm}"
return False self.get_logger().info(f"Changing arm state for drone {drone_id} to {action_name}")
print(f"\n📢 [ARM_DISARM] 開始{action_name}無人機")
print(f" Drone ID: {drone_id}")
print(f" 動作: {action_name}")
print(f" {msg}")
# 在線程池中直接調用 ROS2 服務(避免 arm_disarm() 導致的初始化衝突)
from fc_interfaces.srv import MavCommandLong
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_ros2_arm_service():
"""直接調用 ROS2 服務"""
import time
print(f"\n 🔄 [_call_ros2_arm_service] 在線程池中調用 ROS2 服務...")
print(f" ├─ 時間: {time.time()}")
print(f" ├─ 目標: sysid={sysid}, arm={arm}")
print(f" └─ 直接調用ROS2服務避免rclpy.init()衝突)\n")
# 直接調用 async 方法,無需線程池(避免嵌套執行器衝突) try:
result = await self.command_long_client.arm_disarm_async( # 建立 ROS2 客戶端(使用現有 context
target_sysid=sysid, client = self.create_client(MavCommandLong, "/fc_network/vehicle/send_command_long")
arm=arm,
target_compid=0, # 等待服務
timeout_sec=2.0, if not client.wait_for_service(timeout_sec=2.0):
) print(f" ❌ 服務不可用")
return {'success': False, 'message': 'Service not available', 'ack_result': -1}
print(f" ✓ 服務已連接")
# 準備請求
req = MavCommandLong.Request()
req.target_sysid = sysid
req.target_compid = 0
req.command = 400 # MAV_CMD_COMPONENT_ARM_DISARM
req.confirmation = 0
req.param1 = 1.0 if arm else 0.0
req.param2 = 0.0
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = 2.0
# 調用服務
future = client.call_async(req)
# 簡單等待
timeout = time.time() + 3.0
while not future.done() and time.time() < timeout:
time.sleep(0.01)
if future.done() and future.result():
response = future.result()
return {
'success': response.success,
'message': response.message,
'ack_result': response.ack_result,
}
else:
return {'success': False, 'message': 'Service call timeout', 'ack_result': -1}
if result and result.success: except Exception as e:
print(f"✅ [ARM] {action_name}成功") print(f" ❌ 異常: {e}")
return {'success': False, 'message': str(e), 'ack_result': -1}
print(f" 📢 [ARM_DISARM] 提交 ROS2 服務呼叫到線程池...")
result_dict = await loop.run_in_executor(executor, _call_ros2_arm_service)
if result_dict['success']:
self.get_logger().info(f"Arm state change successful for {drone_id}")
print(f"\n✅ [ARM_DISARM] 無人機{action_name}成功!")
return True return True
else: else:
print(f"❌ [ARM] {action_name}失敗 (message={result.message if result else 'None'})") self.get_logger().warning(f"Arm state change failed for {drone_id}")
print(f"\n❌ [ARM_DISARM] 無人機{action_name}失敗!")
return False return False
except Exception as e: except Exception as e:
print(f"❌ [ARM] 例外錯誤: {e}") self.get_logger().error(f"Exception during arm state change for {drone_id}: {e}")
traceback.print_exc() print(f"\n❌ [ARM_DISARM] 例外錯誤: {e}")
return False return False
async def takeoff_drone(self, drone_id, altitude): async def takeoff_drone(self, drone_id, altitude):
"""使用 CommandLongClient 執行無人機起飛(使用非阻塞的 async 方法)""" """
使用 fc_network_apps takeoff 函數執行無人機起飛
參數:
drone_id: 無人機ID (格式: "s{socket_id}_{sysid}")
altitude: 目標高度 ()
返回:
bool: 起飛是否成功
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
print(f"\n🔵 [TAKEOFF] takeoff_drone() 異步函數被調用 (drone_id={drone_id}, altitude={altitude})", flush=True)
# 解析 drone_id 以提取 sysid
try: try:
# 解析 drone_id 提取 sysid
parts = drone_id.split('_') parts = drone_id.split('_')
if len(parts) < 2: if len(parts) < 2:
self.get_logger().error(f"Invalid drone_id format: {drone_id}")
print(f"❌ [TAKEOFF] 無效的 drone_id 格式: {drone_id}") print(f"❌ [TAKEOFF] 無效的 drone_id 格式: {drone_id}")
return False return False
sysid = int(parts[-1]) sysid = int(parts[-1])
print(f"✓ [TAKEOFF] 解析 drone_id: {drone_id} → sysid={sysid}")
except (ValueError, IndexError) as e:
self.get_logger().error(f"Failed to parse drone_id {drone_id}: {e}")
print(f"❌ [TAKEOFF] 無法解析 drone_id {drone_id}: {e}")
return False
print(f"\n📢 [TAKEOFF] {drone_id} → 起飛 (高度={altitude}m)") # 檢查 fc_network_apps 的 takeoff 函數是否可用
if takeoff is None:
if not self.command_long_client: self.get_logger().error("fc_network_apps takeoff is not available. Cannot takeoff drone.")
print(f"❌ [TAKEOFF] CommandLongClient 未初始化") print(f"❌ [TAKEOFF] fc_network_apps takeoff 不可用")
return False return False
# 直接調用 async 方法,無需線程池(避免嵌套執行器衝突)
result = await self.command_long_client.takeoff_async(
target_sysid=sysid,
altitude_m=float(altitude),
target_compid=0,
timeout_sec=2.0,
)
if result and result.success: try:
print(f"✅ [TAKEOFF] 起飛成功") print(f"\n📢 [TAKEOFF] 開始起飛無人機")
print(f" Drone ID: {drone_id}")
print(f" ROS2 服務呼叫: target_sysid={sysid}, altitude_m={altitude}")
# 在線程池中運行同步的 takeoff 函數
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_takeoff():
print(f"\n 🔄 [_call_takeoff] 在線程池中調用 takeoff(altitude={altitude})...")
result = takeoff(
target_sysid=sysid,
altitude_m=float(altitude),
target_compid=0,
min_pitch_deg=0.0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print(f"\n └─ takeoff() 返回結果")
return result
print(f" 📢 [TAKEOFF] 提交 takeoff 到線程池...")
result = await loop.run_in_executor(executor, _call_takeoff)
print(f"\n📥 [TAKEOFF] 從 takeoff() 接收服務響應:")
print(f" ├─ result.success: {result.success}")
print(f" ├─ result.message: '{result.message}'")
print(f" └─ result.ack_result: {result.ack_result}")
if result.success:
self.get_logger().info(f"Drone {drone_id} takeoff successfully: {result.message}")
print(f"\n✅ [TAKEOFF] 無人機起飛成功!")
print(f" ├─ fc_network 確認: success=True")
print(f" ├─ 訊息: {result.message}")
print(f" └─ ACK代碼: {result.ack_result}")
return True return True
else: else:
print(f"❌ [TAKEOFF] 起飛失敗 (message={result.message if result else 'None'})") self.get_logger().warning(f"Failed to takeoff drone {drone_id}: {result.message} (ack={result.ack_result})")
print(f"\n❌ [TAKEOFF] 無人機起飛失敗!")
print(f" ├─ fc_network 確認: success=False")
print(f" ├─ 原因: {result.message}")
print(f" └─ ACK代碼: {result.ack_result}")
return False return False
except Exception as e: except Exception as e:
print(f"❌ [TAKEOFF] 例外錯誤: {e}") self.get_logger().error(f"Exception during takeoff for {drone_id}: {e}")
print(f"\n❌ [TAKEOFF] 例外錯誤: {e}")
import traceback
traceback.print_exc() traceback.print_exc()
return False return False

@ -33,7 +33,7 @@ from mission_group import (
# ================================================================================ # ================================================================================
class ControlStationUI(QMainWindow): class ControlStationUI(QMainWindow):
VERSION = '2.0.5' VERSION = '2.0.2'
def __init__(self): def __init__(self):
super().__init__() super().__init__()
@ -52,10 +52,6 @@ class ControlStationUI(QMainWindow):
self.executor = rclpy.executors.SingleThreadedExecutor() self.executor = rclpy.executors.SingleThreadedExecutor()
self.executor.add_node(self.monitor) self.executor.add_node(self.monitor)
# 添加 CommandLongClient 到執行器(這樣它的回調才能被處理)
if self.monitor.command_long_client:
self.executor.add_node(self.monitor.command_long_client)
# 定时处理ROS事件 # 定时处理ROS事件
self.timer = QTimer() self.timer = QTimer()
self.timer.timeout.connect(self.spin_ros) self.timer.timeout.connect(self.spin_ros)
@ -741,22 +737,70 @@ class ControlStationUI(QMainWindow):
print(f" 準備為 {len(group.drone_ids)} 台無人機切換模式...", flush=True) print(f" 準備為 {len(group.drone_ids)} 台無人機切換模式...", flush=True)
self.statusBar().showMessage(f"正在切換模式...", 1000) self.statusBar().showMessage(f"正在切換模式...", 1000)
# 使用 asyncio 執行(通過事件循環) # 在後台線程中執行(避免阻塞 GUI
async def do_mode_changes_async(): def do_mode_changes_threaded():
print(f"\n 【異步任務】開始執行模式切換", flush=True) print(f"\n 【後台線程】開始執行模式切換", flush=True)
import sys
import os
import time
# 确保 src 在 Python 路径中
src_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if src_path not in sys.path:
sys.path.insert(0, src_path)
print(f" 【路徑】已添加: {src_path}", flush=True)
# 模式值映射
MODE_MAPPING = {
"STABILIZE": 0, "ACRO": 1, "ALT_HOLD": 2, "AUTO": 3,
"GUIDED": 4, "LOITER": 5, "RTL": 6, "CIRCLE": 7,
"LAND": 9, "DRIFT": 11, "SPORT": 13, "AUTOTUNE": 15,
"POSHOLD": 16, "BRAKE": 17, "SMART_RTL": 21,
}
custom_mode = MODE_MAPPING.get(mode)
if custom_mode is None:
msg = f"❌ 未知模式: {mode}"
print(f" {msg}", flush=True)
self.message_queue.put((msg, 2000))
return
for drone_id in group.drone_ids: for drone_id in group.drone_ids:
print(f"\n 【切換】{drone_id}{mode}", flush=True) print(f"\n 【切換】{drone_id}{mode} (mode={custom_mode})", flush=True)
try: try:
result = await self.monitor.set_mode(drone_id, mode) # 導入模式切換函數
from fc_network_apps.changeMode import change_mode
# 解析 sysid從 drone_id 的最後一個數字)
sysid = int(drone_id.split('_')[-1])
print(f" └─ sysid={sysid}", flush=True)
# 調用 change_mode同步操作
try:
result = change_mode(
target_sysid=sysid,
custom_mode=float(custom_mode),
target_compid=0,
base_mode=1.0,
confirmation=0,
timeout_sec=2.0,
)
print(f" └─ 結果: success={result.success}", flush=True)
if result.success:
msg = f"{drone_id} 切換成功"
print(f" {msg}", flush=True)
self.message_queue.put((msg, 2000))
else:
msg = f"{drone_id} 切換失敗: {result.message}"
print(f" {msg}", flush=True)
self.message_queue.put((msg, 2000))
if result: except Exception as service_error:
msg = f"{drone_id} 切換成功" msg = f"{drone_id} 服務調用錯誤: {str(service_error)}"
print(f" {msg}", flush=True)
self.message_queue.put((msg, 2000))
else:
msg = f"{drone_id} 切換失敗"
print(f" {msg}", flush=True) print(f" {msg}", flush=True)
traceback.print_exc()
self.message_queue.put((msg, 2000)) self.message_queue.put((msg, 2000))
except Exception as e: except Exception as e:
@ -764,16 +808,15 @@ class ControlStationUI(QMainWindow):
print(f" {msg}", flush=True) print(f" {msg}", flush=True)
traceback.print_exc() traceback.print_exc()
self.message_queue.put((msg, 2000)) self.message_queue.put((msg, 2000))
self.message_queue.put((msg, 2000))
print(f"\n 【完成】模式切換任務結束\n", flush=True) print(f"\n 【完成】模式切換任務結束\n", flush=True)
# 通過事件循環提交異步任務 # 在後台線程執行
print(f" 【排隊】將任務提交至事件循環", flush=True) import threading
loop = asyncio.get_event_loop() print(f" 【排隊】將任務提交至後台線程", flush=True)
asyncio.run_coroutine_threadsafe( thread = threading.Thread(target=do_mode_changes_threaded, daemon=True)
do_mode_changes_async(), thread.start()
loop
)
def _handle_group_arm(self, group_id): def _handle_group_arm(self, group_id):
"""解鎖群組內所有無人機""" """解鎖群組內所有無人機"""
@ -1720,15 +1763,6 @@ class ControlStationUI(QMainWindow):
receiver.stop() receiver.stop()
for receiver in self.monitor.ws_receivers: for receiver in self.monitor.ws_receivers:
receiver.stop() receiver.stop()
# Clean up serial receivers
for receiver in self.monitor.serial_receivers:
receiver.stop()
# Clean up CommandLongClient
if self.monitor.command_long_client:
try:
self.monitor.command_long_client.destroy_node()
except:
pass
self.monitor.destroy_node() self.monitor.destroy_node()
self.executor.shutdown() self.executor.shutdown()
except Exception as e: except Exception as e:

@ -6,7 +6,7 @@ class OverviewTable(QTableWidget):
"""總覽表格,顯示所有無人機的狀態資訊""" """總覽表格,顯示所有無人機的狀態資訊"""
# 默認的資訊類型和映射 # 默認的資訊類型和映射
DEFAULT_INFO_TYPES = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "XY位置", "XY速度", "地速", "航向", DEFAULT_INFO_TYPES = ["模式", "ARM", "電壓", "經度", "緯度", "高度", "位置", "速度", "地速", "航向",
"空速", "油門", "HUD ALT", "爬升率", "Roll", "Pitch", "Yaw", "丟包", "延遲"] "空速", "油門", "HUD ALT", "爬升率", "Roll", "Pitch", "Yaw", "丟包", "延遲"]
DEFAULT_INFO_TYPE_MAP = { DEFAULT_INFO_TYPE_MAP = {

@ -6,7 +6,6 @@
# 開發此專案的注意事項 # 開發此專案的注意事項
- 預設 autopilot 的 component id = 1 - 預設 autopilot 的 component id = 1
- 不允許 system id 重複 - 不允許 system id 重複
- 預設同一載具僅會使用相同的 socket
- 增加一個固定數值監控然後要到 ros2 topic - 增加一個固定數值監控然後要到 ros2 topic
- mavlinkROS2Node.py 檔案內 - mavlinkROS2Node.py 檔案內
- PublishRateController.topic_intervals 建立 - PublishRateController.topic_intervals 建立
@ -127,7 +126,6 @@
- *cls.mavlinkObjects* 資料結構 { socket_id(序號) : mavlink_object(物件實例) } - *cls.mavlinkObjects* 資料結構 { socket_id(序號) : mavlink_object(物件實例) }
- *self.mavlink_socket* 從 pymavlink 繼承的socket物件 - *self.mavlink_socket* 從 pymavlink 繼承的socket物件
- *self.state* 描述這個 socket 物件的狀態 - *self.state* 描述這個 socket 物件的狀態
- *self.MAVLink* pymavlink 的一個裝置物件 模擬自己是某一個裝置 用以對於該 mavlink bus 負責封包發送
--- ---
- *process_data()* [async method] 核心方法 - *process_data()* [async method] 核心方法
- *remove_target_socket()* *add_target_socket()* - *remove_target_socket()* *add_target_socket()*
@ -172,45 +170,6 @@
## mavlinkVehicleView.py ## mavlinkVehicleView.py
這個檔案是作為載具的資訊暫存庫使用 會搭配 ROS2 的功能 再做利用 這個檔案是作為載具的資訊暫存庫使用 會搭配 ROS2 的功能 再做利用
## mavlinkROS2Nodes.py
這裡支持了所有 ROS2 框架的對應功能 包涵
1. 將載具的串流更新的狀態更新到對應的 topic
2. 將地面站與載具互動 mavlink microservice 包裝起來提供 ros2 service 互動介面
### **[Class]** fc_ros_manager
MAVLink ROS2 節點管理器 管理兩個獨立的 Node
會開一個執行緒 讓兩個 Node 都跑在裡面
然後用 spin_once 保持 Node 的活性
- *self.status_publisher* 物件實例
- *self.command_service* 物件實例
- *self.spin_thread* 執行緒
---
- *start()* 啟動自己
- *stop()* 停下自己
- *shutdown()* 關閉自己並清理
### **[Class]** VehicleStatusPublisher
這整個組件都是自動的 目前沒有什麼需要 runtime 設定的地方
- *self.fc_publishers* 儲存所有 publisher 的資料結構
- *self.rate_controller* 儲存頻率參數的地方 也可以關掉某個 topic 的發佈
---
- *timer_callback()* 自動的抓出現有的 vehicle 中有的資訊 然後固定的將訊息丟到負責發佈的方法中
- *_publish_vehicle_status()* 接 timer_callback 後去分配到各項發佈中
- *_get_or_create_publisher()* 實際創建 topic 的地方
- *_publish_XXX()* 各項發佈的方法
### **[Class]** MavlinkCommandService
提供 ros2 service 讓 serial_object 去丟出 mavlink 封包
然後會從回應封包中挑出期待的回應 再傳給 ros2 request
其中 PendingEntry 是很關鍵的東西
它的每個物件 代表需要等待的封包種類與必需包含的內容
- *self._pending_by_sysid* 儲存 PendingEntry 的地方
---
- *return_router()* 檢查並消耗 return_packet_ring 然後將封包分配到 pending 去
- *__init__()* 這邊登入要創建的 service 到 ros2 系統
- *handle_XXX()* 這邊是實現 service 的具體方法
# 開發記錄 # 開發記錄
## 已實現功能 ## 已實現功能
@ -223,9 +182,9 @@
7. 終端機介面控制 7. 終端機介面控制
8. 基礎載具流量觀測 8. 基礎載具流量觀測
9. 載具狀態收集與彙整 9. 載具狀態收集與彙整
10. a. ros2 topic 應用開發介面 (基礎框架) 10. a. ros2 topic 應用開發介面
b. ros2 service 應用開發介面 (基礎框架)
### 待開發功能 ### 待開發功能
5-1. 建立 serial 連線 並可以對接收器下達AT指令 5-1. 建立 serial 連線 並可以對接收器下達AT指令
5-2. 模組化 serial 連線機制 以利後期擴容其他模組 5-2. 模組化 serial 連線機制 以利後期擴容其他模組
10. a. ros2 應用開發介面

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
PROJECT_VER = "v0.61" VERSION_NO = "v0.61"
class PanelState: class PanelState:
def __init__(self): def __init__(self):
@ -304,7 +304,7 @@ class ControlPanel:
# help_line = start_line + len(current_menu.children) + 2 # help_line = start_line + len(current_menu.children) + 2
help_line = height - 2 help_line = height - 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM) stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM)
stdscr.addstr(height-1 , width-12, f" {PROJECT_VER} ", curses.A_DIM) stdscr.addstr(height-1 , width-12, f" {VERSION_NO} ", curses.A_DIM)
stdscr.refresh() stdscr.refresh()
@ -1141,8 +1141,7 @@ class Orchestrator:
self.fc_ros_manager.initialize() self.fc_ros_manager.initialize()
self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = { self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = {
'position': 1.0, 'position': 1.0,
'position_ned': 1.0, 'attitude': 0.0,
'attitude': 1.0,
'velocity': 0.0, 'velocity': 0.0,
'battery': 1.0, 'battery': 1.0,
'vfr_hud': 1.0, 'vfr_hud': 1.0,
@ -1340,7 +1339,6 @@ class Orchestrator:
self.occupied_ip_ports[ip].append(port) self.occupied_ip_ports[ip].append(port)
else: else:
self.occupied_ip_ports[ip] = [port] self.occupied_ip_ports[ip] = [port]
# 外放資訊部分 # 外放資訊部分
elif self.panelState.udp_info_temp["direction"] == "outbound": elif self.panelState.udp_info_temp["direction"] == "outbound":
connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}" connection_string = f"udpout:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
@ -1560,27 +1558,8 @@ class Orchestrator:
def main(): def main():
# =========== 各項模組的版本先驗 ===========
# 除非你有在做這幾項模組的改版 不然動到這邊的版本號 代表執行環境有很大的問題!!!!!!
version_check = True
if mo.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkObkect")
version_check = False
if mros.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkROS2Nodes")
version_check = False
if mvv.MODULE_VER != "1.00":
print("Module Version Error! : mavlinkVehicleView")
version_check = False
if sm.MODULE_VER != "0.60":
print("Module Version Error! : serialManager")
version_check = False
if version_check == False:
print("Environment Obstacle! Check YOUR Execution System Path First!!")
return
# ========================================
stop_evt = threading.Event() stop_evt = threading.Event()
def signal_handler(signum, frame): def signal_handler(signum, frame):
"""處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組""" """處理 Ctrl+C 信號 藉此訊號 會結束下面的 while 迴圈 並逐步關閉各模組"""
stop_evt.set() stop_evt.set()
@ -1614,7 +1593,4 @@ if __name__ == "__main__":
4. 註解掉無效代碼 action == "UPDATE_VEHICLES_LIST" 區塊 4. 註解掉無效代碼 action == "UPDATE_VEHICLES_LIST" 區塊
5. 系統納入 mavlink ROS2 Manager 模組 5. 系統納入 mavlink ROS2 Manager 模組
2025.04.13
1. 加入各項模組的版本先驗 檢驗失敗就直接離開
''' '''

@ -28,6 +28,9 @@ mavlink_bridge:
專門處理 stream_bridge_ring 裡面的訊息流 專門處理 stream_bridge_ring 裡面的訊息流
會把訊息流解開後 存放到 mavlinkVehicleView.py 定義的載具結構視圖 會把訊息流解開後 存放到 mavlinkVehicleView.py 定義的載具結構視圖
''' '''
# 基礎功能的 import # 基礎功能的 import
@ -58,8 +61,6 @@ from .utils import RingBuffer, setup_logger
# ====================== 分割線 ===================== # ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.50"
stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255) stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255)
return_packet_ring = RingBuffer(capacity=256, buffer_id=254) return_packet_ring = RingBuffer(capacity=256, buffer_id=254)
@ -385,7 +386,7 @@ class mavlink_object:
self.MAVLink = mav_common.MAVLink(self.mavlinkPipeline, srcSystem=254, srcComponent=191) self.MAVLink = mav_common.MAVLink(self.mavlinkPipeline, srcSystem=254, srcComponent=191)
# 記錄訊息過濾類型 (可選) # 記錄訊息過濾類型 (可選)
self.bridge_msg_types = set([0, 30, 32, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS, 32 LOCAL_POSITION_NED self.bridge_msg_types = set([0, 30, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS
self.return_msg_types = set([]) self.return_msg_types = set([])
# 轉發到別的 mavlink object 作為目標端口 的列表 # 轉發到別的 mavlink object 作為目標端口 的列表

@ -27,7 +27,6 @@ from rclpy.executors import MultiThreadedExecutor
import std_msgs.msg import std_msgs.msg
import sensor_msgs.msg import sensor_msgs.msg
import geometry_msgs.msg import geometry_msgs.msg
import nav_msgs.msg
import mavros_msgs.msg import mavros_msgs.msg
# ROS2 Service imports # ROS2 Service imports
@ -40,13 +39,12 @@ from . import mavlinkObject as mo
from .utils import setup_logger from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.50"
# ============================================================================ # ============================================================================
# VehicleStatusPublisher Node # 頻率控制器
# ============================================================================ # ============================================================================
# 頻率控制器
class PublishRateController: class PublishRateController:
"""發布頻率控制器 - 按時間間隔控制發布頻率""" """發布頻率控制器 - 按時間間隔控制發布頻率"""
@ -57,17 +55,14 @@ class PublishRateController:
# 例如:'ekf_status': 1.0, # EKF 狀態 # 例如:'ekf_status': 1.0, # EKF 狀態
# ═══════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════
# 各 topic 的發布間隔(秒) # 各 topic 的發布間隔(秒)
# 注意 這邊是定義區 不要把參數寫在這裡 所以預設全部關閉
# 以這個專案 請看 mainOrchestrator.py 的 Orchestrator 初始化階段
self.topic_intervals = { self.topic_intervals = {
'position': 0.0, # GNSS位置 'position': 0.5, # GPS位置
'position_ned': 0.0, # LOCAL_POSITION_NED (位置+速度) 'attitude': 0.5, # 姿態
'attitude': 0.0, # 姿態 (pitch yaw row 與其加速狀態) 'velocity': 0.5, # 速度
'velocity': 0.0, # 速度 (已經包含在 vfr_hud 未來移除) 'battery': 1.0, # 電池
'battery': 0.0, # 電池 'vfr_hud': 0.5, # VFR HUD
'vfr_hud': 0.0, # VFR HUD (地速 空速 絕對高度 爬升率 航向 油門) 'mode': 1.0, # 飛行模式
'mode': 0.0, # 飛行模式 (已經在 summary 裡 未來移除) 'summary': 1.0, # 載具摘要
'summary': 0.0, # 載具摘要 (sysid 飛行模式 解鎖上鎖 gps狀態)
# 在這裡新增更多 topics... # 在這裡新增更多 topics...
} }
# 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp} # 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp}
@ -108,6 +103,11 @@ class PublishRateController:
"""重置所有計時器""" """重置所有計時器"""
self.last_publish_time.clear() self.last_publish_time.clear()
# ============================================================================
# VehicleStatusPublisher Node
# ============================================================================
class VehicleStatusPublisher(Node): class VehicleStatusPublisher(Node):
""" """
載具狀態發布者 - vehicle_registry 讀取數據並發布到 ROS2 topics 載具狀態發布者 - vehicle_registry 讀取數據並發布到 ROS2 topics
@ -144,8 +144,10 @@ class VehicleStatusPublisher(Node):
if not self.running: if not self.running:
return return
# 高頻快路徑:讀取 registry 的 immutable 快照 # 從 vehicle_registry 獲取所有載具
for sysid, vehicle in mvv.vehicle_registry.read_all(): all_vehicles = mvv.vehicle_registry.get_all()
for sysid, vehicle in all_vehicles.items():
self._publish_vehicle_status(vehicle) self._publish_vehicle_status(vehicle)
def _publish_vehicle_status(self, vehicle: mvv.VehicleView): def _publish_vehicle_status(self, vehicle: mvv.VehicleView):
@ -171,7 +173,6 @@ class VehicleStatusPublisher(Node):
# ═══════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════
# 發布各種狀態(通過頻率控制器判斷是否發布) # 發布各種狀態(通過頻率控制器判斷是否發布)
self._publish_position(sysid, status) self._publish_position(sysid, status)
self._publish_position_ned(sysid, status)
self._publish_attitude(sysid, status) self._publish_attitude(sysid, status)
self._publish_velocity(sysid, status) self._publish_velocity(sysid, status)
self._publish_battery(sysid, status) self._publish_battery(sysid, status)
@ -228,42 +229,6 @@ class VehicleStatusPublisher(Node):
publisher.publish(msg) publisher.publish(msg)
def _publish_position_ned(self, sysid: int, status: mvv.ComponentStatus):
"""發布 LOCAL_POSITION_NEDNED 座標,含位置與速度)"""
if not self.rate_controller.should_publish(sysid, 'position_ned'):
return
local_pos = status.custom_status.get('local_position')
if not local_pos:
return
required_keys = ('x', 'y', 'z', 'vx', 'vy', 'vz')
if any(local_pos.get(k) is None for k in required_keys):
return
publisher = self._get_or_create_publisher(
sysid, 'position_ned', nav_msgs.msg.Odometry
)
if publisher.get_subscription_count() == 0:
return
msg = nav_msgs.msg.Odometry()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'local_ned'
msg.child_frame_id = 'base_link_ned'
# 保持 MAVLink LOCAL_POSITION_NED 座標定義,不進行 ENU 轉換
msg.pose.pose.position.x = float(local_pos['x'])
msg.pose.pose.position.y = float(local_pos['y'])
msg.pose.pose.position.z = float(local_pos['z'])
msg.twist.twist.linear.x = float(local_pos['vx'])
msg.twist.twist.linear.y = float(local_pos['vy'])
msg.twist.twist.linear.z = float(local_pos['vz'])
publisher.publish(msg)
def _publish_attitude(self, sysid: int, status: mvv.ComponentStatus): def _publish_attitude(self, sysid: int, status: mvv.ComponentStatus):
"""發布姿態IMU""" """發布姿態IMU"""
if not self.rate_controller.should_publish(sysid, 'attitude'): if not self.rate_controller.should_publish(sysid, 'attitude'):
@ -272,6 +237,7 @@ class VehicleStatusPublisher(Node):
att = status.attitude att = status.attitude
if att.roll is None: if att.roll is None:
return return
publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu) publisher = self._get_or_create_publisher(sysid, 'attitude', sensor_msgs.msg.Imu)
if publisher.get_subscription_count() == 0: if publisher.get_subscription_count() == 0:
@ -511,17 +477,12 @@ class MavlinkCommandService(Node):
- 作為 service server 等待 client 請求 - 作為 service server 等待 client 請求
- 接收請求組裝 MAVLink 封包 - 接收請求組裝 MAVLink 封包
- 調用 mavlinkObject 發送封包 - 調用 mavlinkObject 發送封包
- 處理 ACK 等待和超時 - 處理 ACK 等待和超時未來實現
設計理念回歸 MAVLink 純粹結構 設計理念回歸 MAVLink 純粹結構
- 只負責將 ROS2 請求轉換為 MAVLink 封包 - 只負責將 ROS2 請求轉換為 MAVLink 封包
- 不預設功能 ARM/DISARM) 保持模組化 - 不預設功能 ARM/DISARM) 保持模組化
- 高層應用可透過此 service 實現各種功能 - 高層應用可透過此 service 實現各種功能
講白話一點就是
每次接到一個 service 請求 要整個系統丟某種指令給載具時
會做兩件事 1."丟出mavlink封包" 2."創造一個臨時信箱 Pending"
""" """
serviceString_prefix = '/fc_network/vehicle' serviceString_prefix = '/fc_network/vehicle'
@ -532,8 +493,8 @@ class MavlinkCommandService(Node):
# 狀態標記 # 狀態標記
self.running = True self.running = True
# # mavlinkObject 的引用(將由外部設置) 用不到 # mavlinkObject 的引用(將由外部設置)
# self.mavlink_analyzer = None self.mavlink_analyzer = None
# pending 旗標物件的儲存庫 # pending 旗標物件的儲存庫
self._pending_by_sysid = {} # sysid(int) : PendingEntry self._pending_by_sysid = {} # sysid(int) : PendingEntry
@ -596,8 +557,7 @@ class MavlinkCommandService(Node):
def return_router(self): def return_router(self):
''' '''
在節點管理器哪邊被呼叫 這邊是給外部迴圈呼叫的 消耗 return_packet_ring 裡接收到的 mavlink 封包
消耗 return_packet_ring 裡接收到的 mavlink 封包
分送到各自的 pending 分送到各自的 pending
藉由 event.set() 解開 service 中的 block 藉由 event.set() 解開 service 中的 block
''' '''
@ -993,8 +953,7 @@ class fc_ros_manager:
try: try:
# 初始化 ROS2 # 初始化 ROS2
if not rclpy.ok(): rclpy.init()
rclpy.init(args=None)
# 創建節點 node # 創建節點 node
self.status_publisher = VehicleStatusPublisher() self.status_publisher = VehicleStatusPublisher()
@ -1136,9 +1095,5 @@ ros2_manager = fc_ros_manager()
1. 完成 ros2 MavPositionTargetGlobalInt 區域 1. 完成 ros2 MavPositionTargetGlobalInt 區域
2. 優化 response.ack_result 回傳值的資訊 2. 優化 response.ack_result 回傳值的資訊
TODO
1. service 部分會需要跟 mavlinkobject 大量互動 也許需要考慮對方的生命週期
2. 還沒測試 如果有失效狀態讓 fc_ros_manager 中斷了 如何恢復的問題
''' '''

@ -5,14 +5,14 @@ VehicleView - Pure State Container
""" """
import os import os
from typing import Dict, Optional, Any, Tuple from typing import Dict, Optional, Any
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum from enum import Enum
from .utils import setup_logger from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.00"
# ====================== Enums ===================== # ====================== Enums =====================
@ -402,18 +402,11 @@ class VehicleViewRegistry:
def __init__(self): def __init__(self):
self._vehicles: Dict[int, VehicleView] = {} self._vehicles: Dict[int, VehicleView] = {}
# 高頻讀取用 snapshot弱一致性
self._vehicles_snapshot_items: Tuple[Tuple[int, VehicleView], ...] = tuple()
def _refresh_snapshot(self) -> None:
"""重建快照(僅在 key 集合變動時觸發)"""
self._vehicles_snapshot_items = tuple(self._vehicles.items())
def register(self, sysid: int) -> VehicleView: def register(self, sysid: int) -> VehicleView:
"""註冊新的載具視圖""" """註冊新的載具視圖"""
if sysid not in self._vehicles: if sysid not in self._vehicles:
self._vehicles[sysid] = VehicleView(sysid) self._vehicles[sysid] = VehicleView(sysid)
self._refresh_snapshot()
logger.info(f"Registered new VehicleView for system {sysid}") logger.info(f"Registered new VehicleView for system {sysid}")
return self._vehicles[sysid] return self._vehicles[sysid]
@ -425,7 +418,6 @@ class VehicleViewRegistry:
"""註銷載具視圖""" """註銷載具視圖"""
if sysid in self._vehicles: if sysid in self._vehicles:
del self._vehicles[sysid] del self._vehicles[sysid]
self._refresh_snapshot()
logger.info(f"Unregistered VehicleView for system {sysid}") logger.info(f"Unregistered VehicleView for system {sysid}")
return True return True
return False return False
@ -434,21 +426,9 @@ class VehicleViewRegistry:
"""取得所有載具視圖""" """取得所有載具視圖"""
return self._vehicles.copy() return self._vehicles.copy()
def read_all(self) -> Tuple[Tuple[int, VehicleView], ...]:
"""
高效讀取所有載具視圖快照弱一致性
注意:
- 回傳 immutable tuple 快照適合高頻迭代
- 僅在 register/unregister/clear 後更新可能不是最新資料
- 適合高頻讀取且可接受弱一致性的情境
"""
return self._vehicles_snapshot_items
def clear(self) -> None: def clear(self) -> None:
"""清空所有載具視圖""" """清空所有載具視圖"""
self._vehicles.clear() self._vehicles.clear()
self._refresh_snapshot()
logger.info("Cleared all VehicleViews") logger.info("Cleared all VehicleViews")
def __len__(self) -> int: def __len__(self) -> int:
@ -469,8 +449,5 @@ vehicle_registry = VehicleViewRegistry()
2026.01.16 2026.01.16
1. 新增 重置指定組件的封包統計 功能 1. 新增 重置指定組件的封包統計 功能
2026.04.13
1. 新增 VehicleViewRegistry.read_all 方法 ROS2 Node 組件更有效率的讀取其中的資料
''' '''

@ -26,7 +26,6 @@ from .utils import setup_logger
# ====================== 分割線 ===================== # ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.60"
# ====================== 分割線 ===================== # ====================== 分割線 =====================

@ -1,10 +1,14 @@
from .longCommand import CommandLongClient, CommandLongResult from .longCommand import CommandLongClient, ChangeModeResult
from .navigation import PositionTargetGlobalIntClient, PositionTargetGlobalIntResult from .changeMode import change_mode
from .arm_disarm import arm_disarm
from .takeoff import takeoff
from .land import land
__all__ = [ __all__ = [
"CommandLongClient", "CommandLongClient",
"PositionTargetGlobalIntClient", "ChangeModeResult",
"CommandLongResult", "change_mode",
"PositionTargetGlobalIntResult", "arm_disarm",
"takeoff",
"land",
] ]

@ -0,0 +1,93 @@
"""Simple wrapper for arm/disarm via fc_network ROS2 service (MAV_CMD_COMPONENT_ARM_DISARM)."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_COMPONENT_ARM_DISARM = 400
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ArmDisarmResult:
success: bool
message: str
ack_result: int
def arm_disarm(
*,
target_sysid: int,
arm: bool,
target_compid: int = 0,
confirmation: int = 0,
param2: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ArmDisarmResult:
"""One-shot MAV_CMD_COMPONENT_ARM_DISARM (400) wrapper.
param1: 1.0 to arm, 0.0 to disarm.
param2: usually 0. Some stacks use 21196 for force-arm (ArduPilot); pass via param2 if needed.
"""
if not rclpy.ok():
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_arm_disarm_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return ArmDisarmResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_COMPONENT_ARM_DISARM
req.confirmation = confirmation
req.param1 = 1.0 if arm else 0.0
req.param2 = float(param2)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ArmDisarmResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ArmDisarmResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
if __name__ == "__main__":
r = arm_disarm(target_sysid=3, arm=True)
print(
f"arm_disarm success={r.success}, "
f"ack_result={r.ack_result}, message='{r.message}'"
)

@ -0,0 +1,99 @@
"""Simple wrapper for mode change via fc_network ROS2 service.
Reference CLI command:
ros2 service call /fc_network/vehicle/send_command_long \
fc_interfaces/srv/MavCommandLong \
"{target_sysid: 3, target_compid: 0, command: 176, confirmation: 0, \
param1: 1, param2: 4, param3: 0, param4: 0, param5: 0, param6: 0, \
param7: 0, timeout_sec: 2}"
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_DO_SET_MODE = 176
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ChangeModeResult:
"""Return value for mode change requests."""
success: bool
message: str
ack_result: int
def change_mode(
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ChangeModeResult:
"""One-shot helper for collaborators who want minimal code."""
if not rclpy.ok():
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_change_mode_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return ChangeModeResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_DO_SET_MODE
req.confirmation = confirmation
req.param1 = float(base_mode)
req.param2 = float(custom_mode)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ChangeModeResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ChangeModeResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
if __name__ == "__main__":
# Example values are aligned with your terminal command.
result = change_mode(target_sysid=3, custom_mode=4)
print(
f"change_mode success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -0,0 +1,90 @@
"""Simple wrapper for land via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_LAND = 21
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class LandResult:
success: bool
message: str
ack_result: int
def land(
*,
target_sysid: int,
target_compid: int = 0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude_m: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> LandResult:
"""One-shot MAV_CMD_NAV_LAND wrapper."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_land_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return LandResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_LAND
req.confirmation = 0
req.param1 = 0.0
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return LandResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return LandResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
result = land(target_sysid=3)
print(
f"land success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -25,6 +25,10 @@ class CommandLongResult:
message: str message: str
ack_result: int ack_result: int
ChangeModeResult = CommandLongResult
class CommandLongClient(Node): class CommandLongClient(Node):
"""ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。""" """ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。"""

@ -99,6 +99,7 @@ def _echo_position_target_matches(
@dataclass @dataclass
class PositionTargetGlobalIntResult: class PositionTargetGlobalIntResult:
"""對應 MavPositionTargetGlobalInt.srv 的 Responsesuccess 依「送出與 r_* echo 是否一致」。""" """對應 MavPositionTargetGlobalInt.srv 的 Responsesuccess 依「送出與 r_* echo 是否一致」。"""
success: bool success: bool
message: str message: str
ack_result: int ack_result: int
@ -116,6 +117,7 @@ class PositionTargetGlobalIntResult:
r_yaw: float = 0.0 r_yaw: float = 0.0
r_yaw_rate: float = 0.0 r_yaw_rate: float = 0.0
class PositionTargetGlobalIntClient(Node): class PositionTargetGlobalIntClient(Node):
""" """
ROS2 client pos_global_int service 發送 SET_POSITION_TARGET_GLOBAL_INT (msg 86) ROS2 client pos_global_int service 發送 SET_POSITION_TARGET_GLOBAL_INT (msg 86)
@ -228,6 +230,51 @@ class PositionTargetGlobalIntClient(Node):
r_yaw_rate=float(resp.r_yaw_rate), r_yaw_rate=float(resp.r_yaw_rate),
) )
# def set_position_target_global_int(
# self,
# *,
# target_sysid: int,
# target_compid: int = 0,
# time_boot_ms: int = 0,
# coordinate_frame: int = 6,
# type_mask: int = 0,
# lat_int: int = 0,
# lon_int: int = 0,
# alt: float = 0.0,
# vx: float = 0.0,
# vy: float = 0.0,
# vz: float = 0.0,
# afx: float = 0.0,
# afy: float = 0.0,
# afz: float = 0.0,
# yaw: float = 0.0,
# yaw_rate: float = 0.0,
# timeout_sec: float = DEFAULT_TIMEOUT_SEC,
# ) -> PositionTargetGlobalIntResult:
# """
# 送出 SET_POSITION_TARGET_GLOBAL_INTcoordinate_frame 預設 6 (MAV_FRAME_GLOBAL_INT)
# 可依飛控/任務覆寫。
# """
# return self._send_position_target_global_int(
# target_sysid=target_sysid,
# target_compid=target_compid,
# time_boot_ms=time_boot_ms,
# coordinate_frame=coordinate_frame,
# type_mask=type_mask,
# lat_int=lat_int,
# lon_int=lon_int,
# alt=alt,
# vx=vx,
# vy=vy,
# vz=vz,
# afx=afx,
# afy=afy,
# afz=afz,
# yaw=yaw,
# yaw_rate=yaw_rate,
# timeout_sec=timeout_sec,
# )
def goto_position_only( def goto_position_only(
self, self,
*, *,
@ -260,3 +307,47 @@ class PositionTargetGlobalIntClient(Node):
yaw_rate=0, yaw_rate=0,
timeout_sec=timeout_sec, timeout_sec=timeout_sec,
) )
# def set_position_target_global_int_deg(
# self,
# *,
# target_sysid: int,
# latitude_deg: float,
# longitude_deg: float,
# target_compid: int = 0,
# time_boot_ms: int = 0,
# coordinate_frame: int = MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
# type_mask: int = 0,
# alt: float = 0.0,
# vx: float = 0.0,
# vy: float = 0.0,
# vz: float = 0.0,
# afx: float = 0.0,
# afy: float = 0.0,
# afz: float = 0.0,
# yaw: float = 0.0,
# yaw_rate: float = 0.0,
# timeout_sec: float = DEFAULT_TIMEOUT_SEC,
# ) -> PositionTargetGlobalIntResult:
# """以度為單位的緯經,轉成 MAVLink 的 lat_int / lon_int (1e7 * deg)。"""
# lat_int = int(round(latitude_deg * 1e7))
# lon_int = int(round(longitude_deg * 1e7))
# return self.set_position_target_global_int(
# target_sysid=target_sysid,
# target_compid=target_compid,
# time_boot_ms=time_boot_ms,
# coordinate_frame=coordinate_frame,
# type_mask=type_mask,
# lat_int=lat_int,
# lon_int=lon_int,
# alt=alt,
# vx=vx,
# vy=vy,
# vz=vz,
# afx=afx,
# afy=afy,
# afz=afz,
# yaw=yaw,
# yaw_rate=yaw_rate,
# timeout_sec=timeout_sec,
# )

@ -0,0 +1,91 @@
"""Simple wrapper for takeoff via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_TAKEOFF = 22
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class TakeoffResult:
success: bool
message: str
ack_result: int
def takeoff(
*,
target_sysid: int,
altitude_m: float,
target_compid: int = 0,
min_pitch_deg: float = 0.0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> TakeoffResult:
"""One-shot MAV_CMD_NAV_TAKEOFF wrapper."""
if not rclpy.ok():
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_takeoff_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return TakeoffResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_TAKEOFF
req.confirmation = 0
req.param1 = float(min_pitch_deg)
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return TakeoffResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return TakeoffResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
if __name__ == "__main__":
result = takeoff(target_sysid=3, altitude_m=10.0)
print(
f"takeoff success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -0,0 +1,55 @@
from fc_network_apps import CommandLongClient
import time
def main():
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
commandAPI = CommandLongClient()
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=3.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=5.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -0,0 +1,30 @@
"""Usage example for arm/disarm helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_arm_disarm
"""
from fc_network_apps import arm_disarm
def main() -> None:
# MAV_CMD_COMPONENT_ARM_DISARM (command=400)
# param1: 1 = arm, 0 = disarm
result = arm_disarm(
target_sysid=3,
target_compid=0,
arm=True,
timeout_sec=2.0,
)
print("=== arm result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
# To disarm instead:
# result = arm_disarm(target_sysid=3, target_compid=0, arm=False, timeout_sec=2.0)
if __name__ == "__main__":
main()

@ -0,0 +1,29 @@
"""Usage example for collaborators.
Run after sourcing your ROS2 workspace:
source install/local_setup.bash
python src/fc_network_apps/example_change_mode.py
"""
from fc_network_apps import change_mode
def main() -> None:
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
result = change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -0,0 +1,27 @@
"""Usage example for land helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_land
"""
from fc_network_apps import land
def main() -> None:
# MAV_CMD_NAV_LAND (command=21)
# This example asks vehicle sysid=3 to land.
result = land(
target_sysid=3,
target_compid=0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print("=== land result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -0,0 +1,48 @@
from __future__ import annotations
from fc_network_apps.navigation import PositionTargetGlobalIntClient
import time
def main() -> None:
# 目標載具 system id依你的連線修改
target_sysid = 3
nav = PositionTargetGlobalIntClient()
if not nav.wait_for_service(timeout_sec=5.0):
print("Service /fc_network/vehicle/pos_global_int not available.")
return
result_deg = nav.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.376655,
longitude_deg=149.157011,
alt=200.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
result_deg = nav.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.3632621,
longitude_deg=149.1652375,
alt=100.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
if __name__ == "__main__":
main()

@ -0,0 +1,28 @@
"""Usage example for takeoff helper.
Run from repo root with module mode:
python -m someotherpkg.src.example_takeoff
"""
from fc_network_apps import takeoff
def main() -> None:
# MAV_CMD_NAV_TAKEOFF (command=22)
# This example asks vehicle sysid=3 to take off to 10 meters.
result = takeoff(
target_sysid=3,
target_compid=0,
altitude_m=10.0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print("=== takeoff result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -0,0 +1,64 @@
from fc_network_apps import CommandLongClient
import time
def main():
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
commandAPI = CommandLongClient()
result = commandAPI.change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=3.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(3)
result = commandAPI.arm_disarm(
target_sysid=3,
target_compid=0,
arm = True,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
result = commandAPI.takeoff(
target_sysid=3,
target_compid=0,
altitude_m = 30.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
# time.sleep(20)
# result = commandAPI.land(
# target_sysid=3,
# target_compid=0,
# )
# print("=== change mode result ===")
# print(f"success : {result.success}")
# print(f"ack_result: {result.ack_result}")
# print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -1,111 +0,0 @@
import fc_network_apps as fap
import time
def main():
commandAPI = fap.CommandLongClient()
navAPI = fap.PositionTargetGlobalIntClient()
target_sysid = 3
print("=== change mode ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.change_mode(
target_sysid=target_sysid,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=3.0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
print("=== arm vehicle ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.arm_disarm(
target_sysid=target_sysid,
arm = True,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
print("=== takeoff ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.takeoff(
target_sysid=target_sysid,
altitude_m = 100.0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
time.sleep(15)
print("=== set_position_target_global_int_deg ===")
result_deg = navAPI.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.376655,
longitude_deg=149.157011,
alt=150.0,
timeout_sec=3.0,
)
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
result_deg = navAPI.goto_position_only(
target_sysid=target_sysid,
latitude_deg=-35.3632621,
longitude_deg=149.1652375,
alt=100.0,
timeout_sec=3.0,
)
print("=== set_position_target_global_int_deg ===")
print(f"success : {result_deg.success}")
print(f"ack_result: {result_deg.ack_result}")
print(f"message : {result_deg.message}")
print(f"r_lat_int : {result_deg.r_lat_int} r_lon_int: {result_deg.r_lon_int}")
time.sleep(180)
print("=== land ===")
result = fap.CommandLongResult(success=False,message="",ack_result=-1)
while not result.success:
result.ack_result -= 1
result = commandAPI.land(
target_sysid=3,
target_compid=0,
)
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
time.sleep(1)
if result.ack_result < -3:
return
if __name__ == "__main__":
main()

@ -28,13 +28,14 @@ python -m someotherpkg.src.example_takeoff_land
python -m someotherpkg.src.example_change_mode python -m someotherpkg.src.example_change_mode
ros2 topic list ros2 topic list
ros2 topic echo /fc_network/vehicle/sys3/battery ros2 topic echo
/home/picars/ardupilot/build/sitl/bin/arducopter -S --model + --speedup 1 --slave 0 --defaults /home/picars/ardupilot/Tools/autotest/default_params/copter.parm --sim-address=127.0.0.1 -I3 --sysid 7 /home/picars/ardupilot/build/sitl/bin/arducopter -S --model + --speedup 1 --slave 0 --defaults /home/picars/ardupilot/Tools/autotest/default_params/copter.parm --sim-address=127.0.0.1 -I3 --sysid 7
mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560 mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560
ros2 service call /mavlink/add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 8}" ros2 service call /mavlink/add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 8}"
ros2 service call /fc_network/vehicle/mav_ping fc_interfaces/srv/MavPing "{target_sysid: 3, target_compid: 0, ping_seq: 1}" ros2 service call /fc_network/vehicle/mav_ping fc_interfaces/srv/MavPing "{target_sysid: 3, target_compid: 0, ping_seq: 1}"
@ -48,10 +49,16 @@ sudo tcpdump -i lo 'udp dst port 14561' -X
sudo tcpdump -i lo 'udp dst port 14550' -X -vv sudo tcpdump -i lo 'udp dst port 14550' -X -vv
sudo tcpdump -i lo -X udp port 14550 sudo tcpdump -i lo -X udp port 14550
幾個要討論的
1. 專案文件中 .mat 檔案是幹嘛的? colcon build --packages-select fc_interfaces
2. GUI 的 print [Panel/Map Update] 11 Hz 希望關閉
3. readme 那串文字來源? 用途?
4. GUI介面 加一下海拔高 跟 相對高 速度跟位置 變更為 XY速度 XY位置
5. pitch yaw roll 出來了 弄一下 -35.360150, 149.159659
6. -35.376655, 149.157011
0b00 0000 00000 00000
0b00 0011 01111 11000
0b 11 01111 11000
0b 1111 1101 1111 1000
Loading…
Cancel
Save