Compare commits

...

4 Commits

1
.gitignore vendored

@ -25,3 +25,4 @@ Makefile
**/*.class **/*.class
**/*.pyc **/*.pyc
**/*.pyo **/*.pyo
**/.cursor/

@ -21,6 +21,12 @@ ROS2
1. source ~/ros2_humble/install/setup.bash 1. source ~/ros2_humble/install/setup.bash
2. 2.
===
功能簡介
1. mavlink 多對多支援平台
2. 不允許進到 ros 系統有相同 sysid
3. 假設所有 component 共用同一 socket
=== ===
開發用輔助專案 開發用輔助專案
1. Gazebo Garden 1. Gazebo Garden
@ -41,6 +47,8 @@ git submodule update
# 2. build 需要的 package # 2. build 需要的 package
colcon build --packages-select angles geographic_msgs colcon build --packages-select angles geographic_msgs
colcon build --packages-select mavros_msgs # 這個依賴前面的 colcon build --packages-select mavros_msgs # 這個依賴前面的
colcon build --packages-select fc_interfaces # 自己定義的
``` ```

@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.8)
project(fc_interfaces)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/MavPing.srv"
"srv/MavCommandLong.srv"
)
ament_package()

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>fc_interfaces</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="chiyu1468@hotmail.com">picars</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<exec_depend>rosidl_default_runtime</exec_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

@ -0,0 +1,18 @@
# Request
uint8 target_sysid
uint8 target_compid
uint16 command
uint8 confirmation
float32 param1
float32 param2
float32 param3
float32 param4
float32 param5
float32 param6
float32 param7
float32 timeout_sec
---
# Response
bool success
string message
uint8 ack_result

@ -0,0 +1,9 @@
# Request
uint8 target_sysid
uint8 target_compid
uint8 ping_seq
---
# Response
bool success
string message
float32 rtt_ms

@ -1138,8 +1138,7 @@ class Orchestrator:
# === 4) ros 部分的準備 === # === 4) ros 部分的準備 ===
self.fc_ros_manager = mros.ros2_manager self.fc_ros_manager = mros.ros2_manager
if not self.fc_ros_manager.initialized: self.fc_ros_manager.initialize()
self.fc_ros_manager.initialize()
self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = { self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = {
'position': 1.0, 'position': 1.0,
'attitude': 0.0, 'attitude': 0.0,
@ -1311,7 +1310,7 @@ class Orchestrator:
if self.fc_ros_manager.spin_thread.is_alive(): if self.fc_ros_manager.spin_thread.is_alive():
if self.fc_ros_manager.running: if self.fc_ros_manager.running:
self.fc_ros_manager.stop() self.fc_ros_manager.shutdown()
self.fc_ros_manager.spin_thread.join(timeout=2) self.fc_ros_manager.spin_thread.join(timeout=2)
# 關閉面板執行緒 # 關閉面板執行緒

@ -45,6 +45,8 @@ from collections import deque
# mavlink 的 import # mavlink 的 import
from pymavlink import mavutil from pymavlink import mavutil
from pymavlink.dialects.v20 import common as mav_common
from pymavlink.dialects.v20 import ardupilotmega as mav_ardupilot
# 自定義的 import # 自定義的 import
from .mavlinkVehicleView import ( from .mavlinkVehicleView import (
@ -111,7 +113,7 @@ class mavlink_bridge:
30: self._handle_attitude, # ATTITUDE 30: self._handle_attitude, # ATTITUDE
32: self._handle_local_position, # LOCAL_POSITION_NED 32: self._handle_local_position, # LOCAL_POSITION_NED
33: self._handle_global_position, # GLOBAL_POSITION_INT 33: self._handle_global_position, # GLOBAL_POSITION_INT
74: self._handle_vfr_hud, # VFR_HUD 74: self._handle_vfr_hud, # VFR_HUD
147: self._handle_battery_status, # BATTERY_STATUS 147: self._handle_battery_status, # BATTERY_STATUS
} }
@ -342,6 +344,19 @@ class mavlink_object:
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量 socket_num = 0 # 用來記錄目前的 socket 數量
# 用來銜接 pymavlink 的 MavLink 實例的管道
class CapturingPipeline:
def __init__(self, target_deque):
self.target_deque = target_deque
self.last_data = b''
def write(self, data):
self.last_data = bytes(data)
# logger.debug(f'CapturingPipeline write data: {data.hex()}') # developer debug
# 把收到的資料 bytes 放到 target_deque
self.target_deque.append(data)
def seek(self, pos): pass
def tell(self): return 0
def __new__(cls, *args, **kwargs): def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中 # 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號 # 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
@ -359,16 +374,20 @@ class mavlink_object:
cls.mavlinkObjects[socket_id] = instance cls.mavlinkObjects[socket_id] = instance
return instance return instance
def __init__(self, socket): def __init__(self, socket, socket_dialect = 'common'):
# 登入所需的 socket # 登入所需的 socket
self.mavlink_socket = socket self.mavlink_socket = socket
# 用於主線程發送消息的緩衝區 # 用於主線程發送消息的緩衝區
self.outgoing_msgs = deque() self.outgoing_msgs = deque()
self.mavlinkPipeline = self.CapturingPipeline(self.outgoing_msgs)
if socket_dialect == 'common':
self.MAVLink = mav_common.MAVLink(self.mavlinkPipeline, srcSystem=254, srcComponent=191)
# 記錄訊息過濾類型 (可選) # 記錄訊息過濾類型 (可選)
self.bridge_msg_types = set([0, 30, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS self.bridge_msg_types = set([0, 30, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS
self.return_msg_types = set() self.return_msg_types = set([])
# 轉發到別的 mavlink object 作為目標端口 的列表 # 轉發到別的 mavlink object 作為目標端口 的列表
self.target_sockets = set() self.target_sockets = set()

@ -24,8 +24,12 @@ import sensor_msgs.msg
import geometry_msgs.msg import geometry_msgs.msg
import mavros_msgs.msg import mavros_msgs.msg
# ROS2 Service imports
import fc_interfaces.srv as fcsrv
# 自定義 imports # 自定義 imports
from . import mavlinkVehicleView as mvv from . import mavlinkVehicleView as mvv
from . import mavlinkObject as mo
from .utils import setup_logger from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
@ -450,195 +454,263 @@ class VehicleStatusPublisher(Node):
# MavlinkCommandService Node # MavlinkCommandService Node
# ============================================================================ # ============================================================================
class PendingEntry():
def __init__(self, event, match_fn):
# self.match_msgid = match_msgid # 只會在初始時設定
self.match_fn = match_fn
self.event = event # 只會在外迴圈 set()
self.result_mav_msg = None #只會在外迴圈 被寫入
self.error = ""
class MavlinkCommandService(Node): class MavlinkCommandService(Node):
""" """
MAVLink 指令服務節點 - 提供 ROS2 service 介面來發送 MAVLink 指令 MAVLink 指令服務節點 - 提供 ROS2 service 介面來發送 MAVLink 指令
職責: 職責:
- 作為 service server等待 client 請求 - 作為 service server 等待 client 請求
- 接收請求組裝 MAVLink 封包 - 接收請求組裝 MAVLink 封包
- 調用 mavlinkObject 發送封包 - 調用 mavlinkObject 發送封包
- 處理 ACK 等待和超時未來實現 - 處理 ACK 等待和超時未來實現
設計理念回歸 MAVLink 純粹結構 設計理念回歸 MAVLink 純粹結構
- 只負責將 ROS2 請求轉換為 MAVLink 封包 - 只負責將 ROS2 請求轉換為 MAVLink 封包
- 不預設功能 ARM/DISARM保持模組化 - 不預設功能 ARM/DISARM) 保持模組化
- 高層應用可透過此 service 實現各種功能 - 高層應用可透過此 service 實現各種功能
""" """
serviceString_prefix = '/fc_network/vehicle'
def __init__(self): def __init__(self):
super().__init__('mavlink_command_service') super().__init__('mavlink_command_service')
# ═══════════════════════════════════════════════════════════════════ # 狀態標記
# ROS2 Service 架構說明: self.running = True
#
# 1. Service 定義:由 .srv 檔案定義Request + Response
# - Request: client 發送的請求內容
# - Response: server 回傳的結果
#
# 2. Service Server 創建:
# self.create_service(srv_type, service_name, callback_function)
# - srv_type: service 的訊息類型(需要自定義或使用標準)
# - service_name: service 的名稱client 用此名稱呼叫)
# - callback_function: 處理請求的回調函數
#
# 3. Callback 函數:
# def callback(self, request, response):
# # request: 包含 client 發送的數據
# # response: 需要填充並返回給 client
# return response
#
# 4. Service Client 使用方式(在其他程式中):
# client = node.create_client(srv_type, service_name)
# request = srv_type.Request()
# future = client.call_async(request) # 異步調用
# # 或 response = client.call(request) # 同步調用
# ═══════════════════════════════════════════════════════════════════
# 由於 ROS2 自定義 service 需要 .srv 檔案編譯
# 這裡先使用標準 String service 作為簡化實現
# TODO: 未來可創建專門的 .srv 檔案
from std_srvs.srv import Trigger
from example_interfaces.srv import SetBool
# mavlinkObject 的引用(將由外部設置)
self.mavlink_analyzer = None
# pending 旗標物件的儲存庫
self._pending_by_sysid = {} # sysid(int) : PendingEntry
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Service 1: 發送 MAVLink Message通用介面 # Service : ADD_TWO 測試用,未來刪除
# 使用 Trigger 作為臨時實現,未來應使用自定義 service
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# TODO: 創建 SendMavlinkMessage.srv from example_interfaces.srv import AddTwoInts
# Request: self.srv_test = self.create_service(
# uint8 target_sysid AddTwoInts,
# uint8 target_compid 'mavlink/add_two_ints',
# uint16 message_id self.handle_add_two_ints
# string fields_json # JSON 格式的字段數據
# bool wait_response
# uint16 response_msgid
# float32 timeout
# Response:
# bool success
# string response_json
# string error_message
# 暫時使用簡化版本(僅示範架構)
self.srv_send_message = self.create_service(
Trigger,
'/mavlink/send_message',
self.handle_send_message
) )
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Service 2: 發送 COMMAND_LONG # Service : TIMESYNC 可以作為模板
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
self.srv_command_long = self.create_service( self.srv_mav_ping = self.create_service(
Trigger, fcsrv.MavPing,
'/mavlink/send_command_long', self.serviceString_prefix + '/mav_ping',
self.handle_command_long self.handle_mav_timesync_ping
) )
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
# Service 3: 參數請求 # Service : 發送 COMMAND_LONG
# ═══════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════
self.srv_param_request = self.create_service( self.srv_command_long = self.create_service(
Trigger, fcsrv.MavCommandLong,
'/mavlink/param_request', self.serviceString_prefix + '/send_command_long',
self.handle_param_request self.handle_command_long
) )
# 狀態標記
self.running = True
# mavlinkObject 的引用(將由外部設置)
self.mavlink_analyzer = None
logger.info("MavlinkCommandService initialized") logger.info("MavlinkCommandService initialized")
def set_mavlink_analyzer(self, mavlink_analyzer): def _index(self, target_sysid, target_compid):
"""
設置 mavlink_analyzer 引用 # 找到對應的 vehicle
vehicle = mvv.vehicle_registry.get(target_sysid)
Args: if not vehicle:
mavlink_analyzer: mavlinkObject.mavlink_analyzer 實例 return None
"""
self.mavlink_analyzer = mavlink_analyzer socket_id = vehicle.custom_meta.get("socket_id")
logger.info("MavlinkCommandService: mavlink_analyzer set") if socket_id is None:
return None
# 提取主要的 socket_id
mav_obj = mo.mavlink_object.mavlinkObjects.get(socket_id)
if mav_obj is None:
return None
return mav_obj
def return_router(self):
'''
這邊是給外部迴圈呼叫的 消耗 return_packet_ring 裡接收到的 mavlink 封包
分送到各自的 pending
藉由 event.set() 解開 service 中的 block
'''
return_tuple = mo.return_packet_ring.get()
# 確認 return_packet_ring 有資料
if return_tuple == None: return
socketid, timestamp, msg = return_tuple
sysid = msg.get_srcSystem()
_pending = self._pending_by_sysid.get(sysid)
# 確認是否有 service 在等待回應 若無直接 return 此封包也會被忽略
if _pending == None:
return
if _pending.match_fn(msg):
_pending.result_mav_msg = msg
_pending.event.set()
# ═══════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════
# Service Handler: 發送 MAVLink Message # Service Handler: 發送 TIMESYNC 可以作為模板
# ═══════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════
def handle_send_message(self, request, response): def handle_mav_timesync_ping(self, request, response):
""" '''
處理發送 MAVLink 訊息的請求 timesync 封包驗證來回的時間
'''
ROS2 Service Callback 說明 # 設定失效回應
- 此函數會在 client 調用 service 時被執行 response.success = False
- request: 包含 client 傳入的參數 response.message = "Unknown error"
- response: 需要填充結果並返回給 client response.rtt_ms = 0.0
- 必須 return response timeout_sec = 2.0
Args: # 1) 確認是否已經有相同 sysid 的其他需求正在 pending
request: Trigger.Request (暫時使用未來改為自定義) if request.target_sysid in self._pending_by_sysid:
response: Trigger.Response response.message = f"sysid {request.target_sysid} already has pending request"
Returns:
response: 填充後的回應
"""
logger.info("Received send_message request")
# 檢查 mavlink_analyzer 是否已設置
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
logger.error(response.message)
return response return response
# TODO: 實際實現 # 2) 找到 socket 標地
# 1. 從 request 解析參數target_sysid, message_id, fields 等) socketObject = self._index(request.target_sysid, request.target_compid)
# 2. 使用 pymavlink 組裝 MAVLink 封包 if socketObject is None:
# 3. 調用 mavlink_analyzer.send_message() 發送 response.message = "This system id not found."
# 4. 如果 wait_response=True則等待 return_packet_ring 中的回應 return response
# 暫時返回成功(示範用) # 3) 接收封包系統 的設定
response.success = True # 在 socket 那邊先把要的封包種類導流進來
response.message = "Message sent (placeholder implementation)" expect_recieve_msg_id = 111 # TIMESYNC
socketObject.set_return_message_types(list(socketObject.return_msg_types) + [expect_recieve_msg_id])
evt = threading.Event()
# 設定封包檢驗
# 這邊是設定 tc1 不為 0
def match_fn(compare_msg):
if compare_msg.get_msgId() != expect_recieve_msg_id :
return False
# logger.debug(f"mark A : {compare_msg.get_srcSystem()} ,{compare_msg.get_msgId()}, {compare_msg.tc1}, {compare_msg.ts1}")
if compare_msg.tc1 == 0 :
return False
return True
_pending = PendingEntry(event = evt, match_fn = match_fn)
self._pending_by_sysid[request.target_sysid] = _pending
# 4) 送出封包
now_us = int(time.monotonic() * 1e6)
socketObject.MAVLink.timesync_send(0, now_us)
fail_skip = False
# 5) 等待回應封包
if not evt.wait(timeout_sec):
response.message = "waiting Timeout - TIMESYNC"
fail_skip = True
# 6) 處理回應封包
if not fail_skip:
ack_msg = _pending.result_mav_msg
current_time = int(time.monotonic() * 1e6)
response.rtt_ms = (current_time - now_us) / 1e3
response.message = ""
# 7) 接收封包系統 的重置
msg_types = list(socketObject.return_msg_types)
if expect_recieve_msg_id in msg_types:
msg_types.remove(expect_recieve_msg_id)
socketObject.set_return_message_types(msg_types)
del evt
self._pending_by_sysid.pop(request.target_sysid, None)
return response return response
def handle_add_two_ints(self, request, response):
"""測試用 Service Handler 未來刪除"""
logger.info(f"Received add_two_ints request: {request.a} + {request.b}")
response.sum = request.a + request.b
logger.info(
f"[add_two_ints] thread_ident={threading.get_ident()} time={time.time()}"
)
time.sleep(1)
return response
# ═══════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════
# Service Handler: 發送 COMMAND_LONG # Service Handler: 發送 COMMAND_LONG
# ═══════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════
def handle_command_long(self, request, response): def handle_command_long(self, request, response):
""" # 設定失效回應
處理發送 COMMAND_LONG 的請求 response.success = False
response.message = "Unknown error"
COMMAND_LONG (MAVLink message ID=76): timeout_sec = request.timeout_sec
- 用於發送簡單命令給載具
- 常用於 ARM/DISARM, 模式切換, TAKEOFF, LAND # 1) 確認是否已經有相同 sysid 的其他需求正在 pending
if request.target_sysid in self._pending_by_sysid:
Args: response.message = f"sysid {request.target_sysid} already has pending request"
request: Trigger.Request
response: Trigger.Response
Returns:
response: 填充後的回應
"""
logger.info("Received command_long request")
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
return response return response
# TODO: 實際實現 # 2) 找到 socket 標地
# 1. 從 request 解析 COMMAND_LONG 參數 socketObject = self._index(request.target_sysid, request.target_compid)
# - target_sysid, target_compid if socketObject is None:
# - command (MAV_CMD_xxx) response.message = "This system id not found."
# - param1~param7 return response
# 2. 組裝 COMMAND_LONG 封包
# 3. 發送並等待 COMMAND_ACK (message ID=77) # 3) 接收封包系統 的設定
# 4. 解析 ACK 結果ACCEPTED/FAILED 等) expect_recieve_msg_id = 77 # mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK
socketObject.set_return_message_types(list(socketObject.return_msg_types) + [expect_recieve_msg_id])
response.success = True evt = threading.Event()
response.message = "Command sent (placeholder implementation)"
def match_fn(compare_msg):
if compare_msg.get_msgId() != expect_recieve_msg_id :
return False
return True
_pending = PendingEntry(event = evt, match_fn = match_fn)
self._pending_by_sysid[request.target_sysid] = _pending
# 4) 送出封包
socketObject.MAVLink.command_long_send(
request.target_sysid,
request.target_compid,
request.command,
request.confirmation,
request.param1, request.param2, request.param3, request.param4,
request.param5, request.param6, request.param7
)
fail_skip = False
# 5) 等待回應封包
if not evt.wait(timeout_sec):
response.message = "waiting Timeout - CommLONG"
fail_skip = True
# 6) 處理回應封包
if not fail_skip:
ack_msg = _pending.result_mav_msg
response.success = (ack_msg.result == 0) # mavutil.mavlink.MAV_RESULT_ACCEPTED
response.message = "" # 沒有消息就是好消息
response.ack_result = ack_msg.result
# 7) 接收封包系統 的重置
msg_types = list(socketObject.return_msg_types)
if expect_recieve_msg_id in msg_types:
msg_types.remove(expect_recieve_msg_id)
socketObject.set_return_message_types(msg_types)
del evt
self._pending_by_sysid.pop(request.target_sysid, None)
return response return response
# ═══════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════
# Service Handler: 參數請求 # Service Handler: 參數請求
@ -719,12 +791,6 @@ class MavlinkCommandService(Node):
response.message = "Param request sent (placeholder implementation)" response.message = "Param request sent (placeholder implementation)"
return response return response
# ═══════════════════════════════════════════════════════════════════════
# 【新增 Service 位置 4/4】
# 若要新增 service請在此處添加新的 handler 方法
# 並在 __init__ 中創建對應的 service server
# ═══════════════════════════════════════════════════════════════════════
def stop(self): def stop(self):
"""停止服務""" """停止服務"""
self.running = False self.running = False
@ -739,6 +805,11 @@ class fc_ros_manager:
""" """
MAVLink ROS2 節點管理器 MAVLink ROS2 節點管理器
因為物件創建在 mavlinkROS2Nodes.py 所以會分為 __init__ initialize 兩個階段
stop 是停下止 ROS2 nodes 的運行但不銷毀節點實例允許後續再次 start
shutdown 是完全關閉 ROS2 並銷毀節點實例
管理兩個獨立的 ROS2 Node 管理兩個獨立的 ROS2 Node
- VehicleStatusPublisher - VehicleStatusPublisher
- MavlinkCommandService - MavlinkCommandService
@ -750,7 +821,7 @@ class fc_ros_manager:
self.initialized = False self.initialized = False
self.running = False self.running = False
# 两个 node # 两个 node
self.status_publisher: Optional[VehicleStatusPublisher] = None self.status_publisher: Optional[VehicleStatusPublisher] = None
self.command_service: Optional[MavlinkCommandService] = None self.command_service: Optional[MavlinkCommandService] = None
@ -772,7 +843,7 @@ class fc_ros_manager:
self.status_publisher = VehicleStatusPublisher() self.status_publisher = VehicleStatusPublisher()
self.command_service = MavlinkCommandService() self.command_service = MavlinkCommandService()
# 創建執行者 MultiThreadedExecutor # 創建執行者 MultiThreadedExecutor 並把 node 加入其中
self.executor = MultiThreadedExecutor() self.executor = MultiThreadedExecutor()
self.executor.add_node(self.status_publisher) self.executor.add_node(self.status_publisher)
self.executor.add_node(self.command_service) self.executor.add_node(self.command_service)
@ -813,12 +884,14 @@ class fc_ros_manager:
self.running = False self.running = False
return False return False
# 循環執行的地方
def _spin_executor(self): def _spin_executor(self):
"""在 thread 中運行的 executor""" """在 thread 中運行的 executor"""
try: try:
# logger.info("ROS2 executor spinning...") # logger.info("ROS2 executor spinning...")
while self.running: while self.running:
self.executor.spin_once(timeout_sec=0.1) self.executor.spin_once(timeout_sec=0.1)
self.command_service.return_router()
except Exception as e: except Exception as e:
logger.error(f"fc_ros_manager error in spinning executor: {e}") logger.error(f"fc_ros_manager error in spinning executor: {e}")
@ -899,4 +972,8 @@ ros2_manager = fc_ros_manager()
4. 添加頻率控制器 控制各 topic 發布頻率 以及是否發布 4. 添加頻率控制器 控制各 topic 發布頻率 以及是否發布
5. 預留 MavlinkCommandService 結構稍後實現 5. 預留 MavlinkCommandService 結構稍後實現
2026.03.27
1. 完成 ros2 service 結構與 timesync command_long 協議
''' '''

@ -15,6 +15,7 @@ import time
import threading import threading
import struct import struct
from enum import Enum, auto from enum import Enum, auto
from abc import ABC, abstractmethod
# # XBee 模組 # # XBee 模組
# from xbee.frame import APIFrame # from xbee.frame import APIFrame
@ -27,6 +28,105 @@ from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
# ====================== 分割線 ===================== # ====================== 分割線 =====================
# 定義 serial 連線的模式
class SerialMode(Enum):
"""連接類型"""
STRAIGHT = auto() # 原始數據直通
XBEEAPI2AT = auto() # XBee API 模式
NOT_USE = auto() # 不使用
# ====================== Frame Processor 基類與實現 =====================
class FrameProcessor(ABC):
"""協議處理器基類"""
def __init__(self):
self.buffer = bytearray()
@abstractmethod
def process_incoming(self, data: bytes):
"""
處理接收到的數據
返回已完整解析的 payload 列表
"""
pass
@abstractmethod
def process_outgoing(self, data: bytes) -> bytes:
"""
封裝要發送的數據
返回封裝後的完整幀
"""
pass
class RawFrameProcessor(FrameProcessor):
"""原始數據直通處理器"""
def process_incoming(self, data: bytes):
"""直接返回原始數據,不進行緩衝"""
return [data] if data else []
def process_outgoing(self, data: bytes) -> bytes:
"""直接返回原始數據,不進行封裝"""
return data
class XBeeFrameProcessor(FrameProcessor):
"""XBee API 協議處理器"""
def process_incoming(self, data: bytes):
"""處理 XBee API 幀並提取 payload"""
self.buffer.extend(data)
payloads = []
while len(self.buffer) >= 3:
# 尋找幀頭
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
# 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1)
# 等待完整幀
if len(self.buffer) < full_length:
break
# 提取完整 frame 並從緩衝區移除
frame = bytes(self.buffer[:full_length])
del self.buffer[:full_length]
# 判斷 frame 類型並處理
frame_type = frame[3]
if frame_type == 0x90: # RX Packet
payload = XBeeFrameHandler.decapsulate_data(frame)
if payload:
payloads.append(payload)
elif frame_type == 0x88: # AT Response
# 可以在這裡處理 AT 指令回應
# response = XBeeFrameHandler.parse_at_command_response(frame)
# 目前忽略
pass
elif frame_type == 0x8B: # Transmit Status
# 傳輸狀態,目前忽略
pass
else:
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return XBeeFrameProcessor.encapsulate_data(data)
# ====================== XBee Frame Handler =====================
class XBeeFrameHandler: class XBeeFrameHandler:
"""XBee API Frame 處理器""" """XBee API Frame 處理器"""
@ -55,7 +155,7 @@ class XBeeFrameHandler:
@staticmethod @staticmethod
def parse_receive_packet(frame: bytes) -> dict: def parse_receive_packet(frame: bytes) -> dict:
# """解析 RX Packet (0x90) - 未來擴展用""" """解析 RX Packet (0x90) - 未來擴展用"""
# if len(frame) < 15 or frame[3] != 0x90: # if len(frame) < 15 or frame[3] != 0x90:
# return None # return None
@ -91,34 +191,12 @@ class XBeeFrameHandler:
@staticmethod @staticmethod
def decapsulate_data(data: bytes): def decapsulate_data(data: bytes):
# 這裡可以根據需要進行數據解封裝
# XBee API 幀格式:
# 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# 檢查幀起始符 (0x7E)
if not data or len(data) < 5 or data[0] != 0x7E:
return data
# 獲取數據長度 (不包括校驗和) # 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2]
length = (data[1] << 8) | data[2] length = (data[1] << 8) | data[2]
# 檢查幀完整性 rf_data_start = 3 + 12
if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 return data[rf_data_start:3 + length]
return data
# 提取API標識符和數據
frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# 根據不同的幀類型進行處理
if frame_type == 0x90: # 例如,這是"接收數據包"類型
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
return data
class ATCommandHandler: class ATCommandHandler:
@ -137,7 +215,7 @@ class ATCommandHandler:
"""根據 AT 指令類型分派處理""" """根據 AT 指令類型分派處理"""
if not response or not response['is_ok']: if not response or not response['is_ok']:
if response: if response:
print(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}")
return return
command = response['command'] command = response['command']
@ -146,189 +224,122 @@ class ATCommandHandler:
if handler: if handler:
handler(response['data']) handler(response['data'])
else: else:
print(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}")
def _handle_rssi(self, data: bytes): def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應""" """處理 DB (RSSI) 回應"""
if not data: # 未來可實現 RSSI 處理邏輯
return pass
rssi_value = data[0]
now = time.time()
# 檢查是否最近有收到 MAVLink
last_mavlink_time = serial_last_mavlink_time.get(self.serial_port, 0)
if now - last_mavlink_time > 0.5:
print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
return
# 取得對應的 sysid
sysid = serial_to_sysid.get(self.serial_port)
if sysid is None:
print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
return
# 記錄 RSSI
rssi_history[sysid].append(-rssi_value)
time_history[sysid].append(now)
# print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
def _handle_serial_high(self, data: bytes): def _handle_serial_high(self, data: bytes):
# """處理 SH (Serial Number High) - 範例""" """處理 SH (Serial Number High)"""
# if len(data) >= 4:
# serial_high = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}")
pass pass
def _handle_serial_low(self, data: bytes): def _handle_serial_low(self, data: bytes):
# """處理 SL (Serial Number Low) - 範例""" """處理 SL (Serial Number Low)"""
# if len(data) >= 4:
# serial_low = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}")
pass pass
# ====================== 分割線 =====================
class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 # ====================== Serial Handler =====================
def __init__(self, udp_handler, serial_port_str):
self.udp_handler = udp_handler # UDP 的傳輸物件
self.serial_port_str = serial_port_str
self.at_handler = ATCommandHandler(serial_port_str)
self.buffer = bytearray() # 用於緩存接收到的資料 class SerialHandler(asyncio.Protocol):
self.transport = None # Serial 自己的傳輸物件 """asyncio.Protocol 用於處理 Serial 收發"""
# self.first_data = True # 標記是否為第一次收到資料
# self.has_processed = False # 測試模式用 處理數據旗標 # debug def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode):
self.udp_handler = udp_handler # UDP 的傳輸物件
self.serial_port_str = serial_port_str
self.serial_mode = serial_mode
self.transport = None # Serial 自己的傳輸物件
# 根據模式創建對應的 processor
self.processor = self._create_processor(serial_mode)
# AT 指令處理器(僅 XBee 模式使用)
if serial_mode == SerialMode.XBEEAPI2AT:
self.at_handler = ATCommandHandler(serial_port_str)
else:
self.at_handler = None
def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor:
"""工廠方法:根據模式創建處理器"""
if serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor()
elif serial_mode == SerialMode.XBEEAPI2AT:
return XBeeFrameProcessor()
else:
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor()
def connection_made(self, transport): def connection_made(self, transport):
"""連接建立時的回調"""
self.transport = transport self.transport = transport
if hasattr(self.udp_handler, 'set_serial_handler'): if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self) self.udp_handler.set_serial_handler(self)
# logger.info(f"Serial port {self.serial_port_str} connected.") # debug logger.debug(f"Serial port {self.serial_port_str} connected")
# Serial 收到資料的處理過程
def data_received(self, data): def data_received(self, data):
# 1. 把收到的資料加入緩衝區 """Serial 收到資料的處理過程"""
self.buffer.extend(data) # 使用 processor 處理接收到的數據
payloads = self.processor.process_incoming(data)
# 2. 需要完整的 header 才能解析
while len(self.buffer) >= 3: # 將所有解析完成的 payload 轉發到 UDP
# 3. 瞄準 XBee API Frame (0x7E 開頭的封包) for payload in payloads:
if self.buffer[0] != 0x7E: self.udp_handler.transport.sendto(
self.buffer.pop(0) # 如果不是就丟掉 payload,
continue (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port)
)
# 4. 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
# 5. 等待完整封包
if len(self.buffer) < full_length:
break
# 6. 提取完整 frame 並從緩衝區移除
an_frame = self.buffer[:full_length]
del self.buffer[:full_length]
# 7. 判斷 frame 類型 # ====================== UDP Handler =====================
frame_type = an_frame[3]
if frame_type == 0x88:
# 處理 AT Command 回應
# response = XBeeFrameHandler.parse_at_command_response(an_frame)
# self.at_handler.handle_response(response)
pass
elif frame_type == 0x90:
# Receive Packet (RX) payload 先解碼
processed_data = XBeeFrameHandler.decapsulate_data(bytes(an_frame))
# 轉換失敗就捨棄了
if processed_data is None:
continue
# 再透過 UDP 送出
self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port))
elif frame_type == 0x8B:
pass
else: class UDPHandler(asyncio.DatagramProtocol):
# 其他類型的 frame 未來可擴展處理 現在忽略 """asyncio.DatagramProtocol 用於處理 UDP 收發"""
logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}")
# # RSSI
# if frame[3] == 0x88 and frame[5:7] == b'DB': # frame[3] == 0x88 AT -> API 封包
# # frame[5:7] == b'DB' -> API 封包的DB參數
# status = frame[7] #
# if status == 0x00 and len(frame) > 8: # status == 0x00 -> 這個封包是有效封包
# rssi_value = frame[8]
# now = time.time()
# # === 優化 1僅信任最近 0.5 秒內有接收 MAVLink 的 port
# last_time = serial_last_mavlink_time.get(self.serial_port, 0)
# if now - last_time <= 0.5:
# sysid = serial_to_sysid.get(self.serial_port, None)
# if sysid is not None:
# rssi_history[sysid].append(-rssi_value)
# time_history[sysid].append(now)
# # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
# else:
# print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
# else:
# print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
# else:
# print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}")
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端IP LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端 IP
def __init__(self, target_port):
self.target_port = target_port # 目標 UDP 端口
self.serial_handler = None # Serial 的傳輸物件 def __init__(self, target_port, serial_mode: SerialMode):
self.transport = None # UDP 自己的傳輸物件 self.target_port = target_port # 目標 UDP 端口
self.remote_addr = None # 儲存動態獲取的遠程地址 # debug self.serial_mode = serial_mode
# self.has_processed = False # 測試模式用 處理數據旗標 # debug self.serial_handler = None # Serial 的傳輸物件
self.transport = None # UDP 自己的傳輸物件
def connection_made(self, transport): def connection_made(self, transport):
"""連接建立時的回調"""
self.transport = transport self.transport = transport
# logger.info(f"UDP transport ready. Waiting for serial data before sending.") # debug logger.debug(f"UDP transport ready for port {self.target_port}")
def set_serial_handler(self, serial_handler): def set_serial_handler(self, serial_handler):
"""設置對應的 Serial Handler"""
self.serial_handler = serial_handler self.serial_handler = serial_handler
# UDP 收到資料的處理過程
def datagram_received(self, data, addr): def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據) """UDP 收到資料的處理過程"""
# self.remote_addr = addr # debug if not self.serial_handler:
# print(f"Received UDP data from {addr}, setting as remote address") logger.warning("Serial handler not set, dropping UDP packet")
return
processed_data = XBeeFrameHandler.encapsulate_data(data) # 使用 processor 封裝數據
processed_data = self.serial_handler.processor.process_outgoing(data)
if self.serial_handler:
self.serial_handler.transport.write(processed_data) # 發送到串口
self.serial_handler.transport.write(processed_data)
#==================================================================
class SerialReceiverType(Enum):
"""連接類型"""
TELEMETRY = auto()
XBEEAPI2AT = auto()
OTHER = auto()
# ====================== Serial Manager =====================
class serial_manager: class serial_manager:
"""串口管理器"""
class serial_object: class serial_object:
def __init__(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): """串口物件"""
self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc def __init__(self, serial_port, baudrate, target_port, serial_mode: SerialMode):
self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc
self.baudrate = baudrate self.baudrate = baudrate
self.receiver_type = receiver_type self.serial_mode = serial_mode
self.target_port = target_port # 指向的 UPD 端口 self.target_port = target_port # 指向的 UDP 端口
self.transport = None # TODO 這個變數可能沒有作用 self.transport = None
self.protocol = None # TODO 這個變數可能沒有作用 self.protocol = None
self.udp_handler = None self.udp_handler = None
self.serial_handler = None self.serial_handler = None
@ -337,21 +348,21 @@ class serial_manager:
self.loop = None self.loop = None
self.running = False self.running = False
self.serial_count = 0 self.serial_count = 0
self.serial_objects = {} # serial id num : serial_object self.serial_objects = {} # serial id num : serial_object
def __del__(self): def __del__(self):
self.loop = None self.loop = None
self.thread = None self.thread = None
def start(self): def start(self):
"""啟動 serial_manager"""
if self.running: if self.running:
logger.warning("serial_manager already running") logger.warning("serial_manager already running")
return return False
self.running = True self.running = True
# 啟動獨立線程 命名為 SerialManager # 啟動獨立線程命名為 SerialManager
self.thread = threading.Thread( self.thread = threading.Thread(
target=self._run_event_loop, target=self._run_event_loop,
name="SerialManager" name="SerialManager"
@ -375,7 +386,6 @@ class serial_manager:
def shutdown(self): def shutdown(self):
"""停止 serial_manager 和其管理的所有 serial_object""" """停止 serial_manager 和其管理的所有 serial_object"""
# 自己在 running 狀態下才執行停止程序
if not self.running: if not self.running:
logger.warning("serial_manager is not running") logger.warning("serial_manager is not running")
return return
@ -404,37 +414,25 @@ class serial_manager:
self.loop = asyncio.new_event_loop() self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) asyncio.set_event_loop(self.loop)
# # 為每個 serial_object 建立連接
# for serial_obj in self.serial_objects:
# coro = serial_asyncio.create_serial_connection(
# self.loop,
# lambda: SerialProtocol(serial_obj.receiver_type),
# serial_obj.serial_port,
# baudrate=serial_obj.baudrate
# )
# transport, protocol = self.loop.run_until_complete(coro)
# serial_obj.transport = transport
# serial_obj.protocol = protocol
try: try:
self.loop.run_forever() self.loop.run_forever()
finally: finally:
self.loop.close() self.loop.close()
def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): def create_serial_link(self, serial_port, baudrate, target_port, serial_mode: SerialMode):
"""創建串口連接"""
if not self.running or not self.loop: if not self.running or not self.loop:
logger.error("Event loop not running, cannot create serial link") logger.error("Event loop not running, cannot create serial link")
return False return False
# 檢查 serial port 有效 # 檢查 serial port 有效性
if not self.check_serial_port(serial_port, baudrate): if not self.check_serial_port(serial_port, baudrate):
logger.error(f"Serial port {serial_port} validation failed") logger.error(f"Serial port {serial_port} validation failed")
return False return False
# 使用 run_coroutine_threadsafe 執行協程並獲取結果 # 使用 run_coroutine_threadsafe 執行協程並獲取結果
future = asyncio.run_coroutine_threadsafe( future = asyncio.run_coroutine_threadsafe(
self._async_create_serial_link(serial_port, baudrate, target_port, receiver_type), self._async_create_serial_link(serial_port, baudrate, target_port, serial_mode),
self.loop self.loop
) )
@ -442,8 +440,8 @@ class serial_manager:
# 等待結果,設定合理的超時時間 # 等待結果,設定合理的超時時間
result = future.result(timeout=5.0) result = future.result(timeout=5.0)
if result: if result:
logger.info(f"Create Serial Link: {serial_port} -> UDP {target_port}") logger.info(f"Create Serial Link: {serial_port} ({serial_mode.name}) -> UDP {target_port}")
return True return result
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error(f"Timeout creating serial link for {serial_port}") logger.error(f"Timeout creating serial link for {serial_port}")
return False return False
@ -451,27 +449,28 @@ class serial_manager:
logger.error(f"Failed to create serial link for {serial_port}: {e}") logger.error(f"Failed to create serial link for {serial_port}: {e}")
return False return False
async def _async_create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): async def _async_create_serial_link(self, serial_port, baudrate, target_port, serial_mode: SerialMode):
"""在事件循環線程中執行實際的連接創建""" """在事件循環線程中執行實際的連接創建"""
try: try:
# 創建 serial_object 實例 # 創建 serial_object 實例
serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type) serial_obj = self.serial_object(serial_port, baudrate, target_port, serial_mode)
# 建立 UDP 處理器並指定目標端口位置 # 建立 UDP 處理器並指定目標端口位置
serial_obj.udp_handler = UDPHandler(target_port) serial_obj.udp_handler = UDPHandler(target_port, serial_mode)
# 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配 # 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配
udp_transport, udp_protocol = await self.loop.create_datagram_endpoint( udp_transport, udp_protocol = await self.loop.create_datagram_endpoint(
lambda: serial_obj.udp_handler, lambda: serial_obj.udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
) )
serial_obj.transport = udp_transport serial_obj.transport = udp_transport
serial_obj.protocol = udp_protocol serial_obj.protocol = udp_protocol
# logger.info(f"UDP endpoint created for {serial_port}") # debug logger.debug(f"UDP endpoint created for {serial_port}")
# 建立 Serial 處理器,將 UDP 處理器傳給它 # 建立 Serial 處理器,將 UDP 處理器和模式傳給它
serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port) serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port, serial_mode)
# 建立 Serial 連接 # 建立 Serial 連接
serial_transport, _ = await serial_asyncio.create_serial_connection( serial_transport, _ = await serial_asyncio.create_serial_connection(
@ -481,14 +480,14 @@ class serial_manager:
baudrate=baudrate baudrate=baudrate
) )
# logger.info(f"Serial connection created for {serial_port}") # debug logger.debug(f"Serial connection created for {serial_port}")
# 將 serial_object 加入管理列表 # 將 serial_object 加入管理列表
serial_id = self.serial_count + 1 serial_id = self.serial_count + 1
self.serial_objects[serial_id] = serial_obj self.serial_objects[serial_id] = serial_obj
self.serial_count += 1 self.serial_count += 1
# logger.info(f"Serial object {serial_id} added to manager") # debug logger.debug(f"Serial object {serial_id} added to manager")
return True return True
except Exception as e: except Exception as e:
@ -501,12 +500,10 @@ class serial_manager:
def remove_serial_link(self, serial_id): def remove_serial_link(self, serial_id):
"""移除串口連接(線程安全方式)""" """移除串口連接(線程安全方式)"""
# 確保事件循環正在運行
if not self.loop: if not self.loop:
logger.error("Event loop not running") logger.error("Event loop not running")
return False return False
# 檢查 serial_id 是否存在
if serial_id not in self.serial_objects: if serial_id not in self.serial_objects:
logger.warning(f"Serial object {serial_id} not found") logger.warning(f"Serial object {serial_id} not found")
return False return False
@ -549,7 +546,7 @@ class serial_manager:
# 從管理列表中移除 # 從管理列表中移除
del self.serial_objects[serial_id] del self.serial_objects[serial_id]
# logger.info(f"Serial object {serial_id} removed from manager") # debug logger.debug(f"Serial object {serial_id} removed from manager")
return True return True
except Exception as e: except Exception as e:
@ -557,7 +554,8 @@ class serial_manager:
return False return False
def get_serial_link(self): def get_serial_link(self):
ret = {} # serial id num : serial_port string """取得所有串口連接資訊"""
ret = {} # serial id num : serial_port string
for key, obj in self.serial_objects.items(): for key, obj in self.serial_objects.items():
ret[key] = obj.serial_port ret[key] = obj.serial_port
return ret return ret
@ -593,6 +591,8 @@ class serial_manager:
return False return False
# ====================== 測試代碼 =====================
if __name__ == '__main__': if __name__ == '__main__':
sm = serial_manager() sm = serial_manager()
sm.start() sm.start()
@ -600,12 +600,17 @@ if __name__ == '__main__':
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200 SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571 UDP_REMOTE_PORT = 14571
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialReceiverType.XBEEAPI2AT) sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定
# SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14571
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
linked_serial = sm.get_serial_link() linked_serial = sm.get_serial_link()
print(linked_serial) print(linked_serial)
time.sleep(10) time.sleep(60)
sm.remove_serial_link(1) sm.remove_serial_link(1)
time.sleep(3) time.sleep(3)
sm.shutdown() sm.shutdown()

@ -0,0 +1,180 @@
"""
MavlinkCommandService 使用範例
展示如何從其他 ROS2 節點調用 MAVLink 指令服務
"""
import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger
import time
class MavlinkClientExample(Node):
"""
範例MAVLink Service Client
這個節點展示如何調用 MavlinkCommandService 提供的服務
"""
def __init__(self):
super().__init__('mavlink_client_example')
# 創建 service client
self.client = self.create_client(
Trigger,
'/mavlink/send_command_long'
)
# 等待服務可用
self.get_logger().info('等待 service 可用...')
self.client.wait_for_service()
self.get_logger().info('Service 已連接!')
def send_arm_command(self):
"""
範例發送 ARM 指令
實際使用時應該發送
- message_id = 76 (COMMAND_LONG)
- command = 400 (MAV_CMD_COMPONENT_ARM_DISARM)
- param1 = 1 (ARM)
"""
self.get_logger().info('發送 ARM 指令...')
request = Trigger.Request()
# TODO: 實際使用時應該是自定義的 SendCommandLong.Request
# request.target_sysid = 1
# request.target_compid = 1
# request.command = 400 # MAV_CMD_COMPONENT_ARM_DISARM
# request.param1 = 1.0 # 1 = ARM, 0 = DISARM
# request.param2 = 0.0
# ... param3-7
# request.timeout = 3.0
# 異步調用
future = self.client.call_async(request)
# 等待結果
rclpy.spin_until_future_complete(self, future)
if future.done():
response = future.result()
if response.success:
self.get_logger().info('✓ ARM 指令發送成功')
else:
self.get_logger().error(f'✗ ARM 指令失敗: {response.message}')
else:
self.get_logger().error('✗ Service 調用超時')
def main():
"""主函數"""
rclpy.init()
# 創建客戶端節點
client = MavlinkClientExample()
# 發送指令
client.send_arm_command()
# 清理
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
"""
進階使用範例高層應用程式
假設我們要創建一個 "任務控制器"通過 MavlinkCommandService 來控制載具
```python
class MissionController(Node):
def __init__(self):
super().__init__('mission_controller')
# 創建各種 service clients
self.client_arm = self.create_client(
SendCommandLong, '/mavlink/send_command_long')
self.client_mode = self.create_client(
SendCommandLong, '/mavlink/send_command_long')
self.client_takeoff = self.create_client(
SendCommandLong, '/mavlink/send_command_long')
self.client_goto = self.create_client(
SendCommandInt, '/mavlink/send_command_int')
def arm_vehicle(self, sysid=1):
\"\"\"解鎖載具\"\"\"
request = SendCommandLong.Request()
request.target_sysid = sysid
request.target_compid = 1
request.command = 400 # MAV_CMD_COMPONENT_ARM_DISARM
request.param1 = 1.0 # ARM
future = self.client_arm.call_async(request)
# 處理回應...
def set_mode(self, sysid=1, mode='GUIDED'):
\"\"\"設置飛行模式\"\"\"
# 實現模式切換邏輯...
pass
def takeoff(self, sysid=1, altitude=10.0):
\"\"\"起飛\"\"\"
request = SendCommandLong.Request()
request.target_sysid = sysid
request.target_compid = 1
request.command = 22 # MAV_CMD_NAV_TAKEOFF
request.param7 = altitude
future = self.client_takeoff.call_async(request)
# 處理回應...
def goto_position(self, sysid=1, lat=0, lon=0, alt=10):
\"\"\"前往指定位置\"\"\"
request = SendCommandInt.Request()
request.target_sysid = sysid
request.target_compid = 1
request.command = 192 # MAV_CMD_DO_REPOSITION
request.x = int(lat * 1e7)
request.y = int(lon * 1e7)
request.z = alt
future = self.client_goto.call_async(request)
# 處理回應...
def execute_mission(self):
\"\"\"執行完整任務\"\"\"
# 1. 解鎖
self.arm_vehicle()
time.sleep(1)
# 2. 切換到 GUIDED 模式
self.set_mode(mode='GUIDED')
time.sleep(1)
# 3. 起飛到 10 公尺
self.takeoff(altitude=10.0)
time.sleep(10)
# 4. 前往目標點
self.goto_position(lat=25.033, lon=121.565, alt=10)
time.sleep(30)
# 5. 返回並降落
# ...
```
這樣的設計讓高層應用可以
1. 完全不需要知道 MAVLink 協議細節
2. 通過 ROS2 service 與載具互動
3. 模組化開發不同功能
4. 易於測試和維護
"""

@ -0,0 +1,3 @@
from .changeMode import ChangeModeClient, ChangeModeResult, change_mode
__all__ = ["ChangeModeClient", "ChangeModeResult", "change_mode"]

@ -0,0 +1,127 @@
"""Simple wrapper for mode change via fc_network ROS2 service.
Reference CLI command:
ros2 service call /fc_network/vehicle/send_command_long \
fc_interfaces/srv/MavCommandLong \
"{target_sysid: 3, target_compid: 0, command: 176, confirmation: 0, \
param1: 1, param2: 4, param3: 0, param4: 0, param5: 0, param6: 0, \
param7: 0, timeout_sec: 2}"
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_DO_SET_MODE = 176
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ChangeModeResult:
"""Return value for mode change requests."""
success: bool
message: str
ack_result: int
class ChangeModeClient(Node):
"""Small ROS2 client dedicated to change flight mode."""
def __init__(self, service_name: str = DEFAULT_SERVICE_NAME) -> None:
super().__init__("fc_change_mode_client")
self._client = self.create_client(MavCommandLong, service_name)
def wait_for_service(self, timeout_sec: float = 3.0) -> bool:
return self._client.wait_for_service(timeout_sec=timeout_sec)
def change_mode(
self,
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> ChangeModeResult:
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_DO_SET_MODE
req.confirmation = confirmation
req.param1 = float(base_mode)
req.param2 = float(custom_mode)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = self._client.call_async(req)
rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ChangeModeResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ChangeModeResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
def change_mode(
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ChangeModeResult:
"""One-shot helper for collaborators who want minimal code."""
rclpy.init(args=None)
node: Optional[ChangeModeClient] = None
try:
node = ChangeModeClient(service_name=service_name)
if not node.wait_for_service(timeout_sec=timeout_sec):
return ChangeModeResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
return node.change_mode(
target_sysid=target_sysid,
target_compid=target_compid,
custom_mode=custom_mode,
base_mode=base_mode,
confirmation=confirmation,
timeout_sec=timeout_sec,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
# Example values are aligned with your terminal command.
result = change_mode(target_sysid=3, custom_mode=4)
print(
f"change_mode success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)

@ -0,0 +1,29 @@
"""Usage example for collaborators.
Run after sourcing your ROS2 workspace:
source install/local_setup.bash
python src/fc_network_apps/example_change_mode.py
"""
from fc_network_apps import change_mode
def main() -> None:
# Equivalent to:
# ros2 service call /fc_network/vehicle/send_command_long ... param1:1 param2:4
result = change_mode(
target_sysid=3,
target_compid=0,
base_mode=1.0,
custom_mode=4.0,
timeout_sec=2.0,
)
print("=== change mode result ===")
print(f"success : {result.success}")
print(f"ack_result: {result.ack_result}")
print(f"message : {result.message}")
if __name__ == "__main__":
main()

@ -22,6 +22,7 @@
自己的常用指令 自己的常用指令
python -m fc_network_adapter.tests.test_vehicleStatusPublisher python -m fc_network_adapter.tests.test_vehicleStatusPublisher
python -m fc_network_adapter.tests.test_ringBuffer python -m fc_network_adapter.tests.test_ringBuffer
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
ros2 topic list ros2 topic list
ros2 topic echo ros2 topic echo
@ -32,4 +33,14 @@ ros2 topic echo
mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560 mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560
ros2 service call /mavlink/add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 8}"
ros2 service call /fc_network/vehicle/mav_ping fc_interfaces/srv/MavPing "{target_sysid: 3, target_compid: 0, ping_seq: 1}"
ros2 service call /fc_network/vehicle/send_command_long fc_interfaces/srv/MavCommandLong "{target_sysid: 3, target_compid: 0, command: 176, confirmation: 0, param1: 1, param2: 4, param3: 0,param4: 0,param5: 0,param6: 0,param7: 0, timeout_sec: 2}"
MAV_CMD_DO_SET_MODE (176)
sudo tcpdump -i lo 'udp dst port 14561' -X
sudo tcpdump -i lo 'udp dst port 14550' -X -vv
sudo tcpdump -i lo -X udp port 14550
Loading…
Cancel
Save