timesync 初步做好

Chiyu Chen 1 month ago
parent 1d49ca12e9
commit 5374e2b9d9

1
.gitignore vendored

@ -25,3 +25,4 @@ Makefile
**/*.class
**/*.pyc
**/*.pyo
**/.cursor/

@ -21,6 +21,12 @@ ROS2
1. source ~/ros2_humble/install/setup.bash
2.
===
功能簡介
1. mavlink 多對多支援平台
2. 不允許進到 ros 系統有相同 sysid
3. 假設所有 component 共用同一 socket
===
開發用輔助專案
1. Gazebo Garden
@ -41,6 +47,8 @@ git submodule update
# 2. build 需要的 package
colcon build --packages-select angles geographic_msgs
colcon build --packages-select mavros_msgs # 這個依賴前面的
colcon build --packages-select fc_interfaces # 自己定義的
```

@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.8)
project(fc_interfaces)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/MavPing.srv"
"srv/MavCommandLong.srv"
)
ament_package()

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>fc_interfaces</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="chiyu1468@hotmail.com">picars</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<exec_depend>rosidl_default_runtime</exec_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

@ -0,0 +1,18 @@
# Request
uint8 target_sysid
uint8 target_compid
uint16 command
uint8 confirmation
float32 param1
float32 param2
float32 param3
float32 param4
float32 param5
float32 param6
float32 param7
float32 timeout_sec
---
# Response
bool success
string message
uint8 ack_result

@ -0,0 +1,9 @@
# Request
uint8 target_sysid
uint8 target_compid
uint8 ping_seq
---
# Response
bool success
string message
float32 rtt_ms

@ -1138,7 +1138,6 @@ class Orchestrator:
# === 4) ros 部分的準備 ===
self.fc_ros_manager = mros.ros2_manager
if not self.fc_ros_manager.initialized:
self.fc_ros_manager.initialize()
self.fc_ros_manager.status_publisher.rate_controller.topic_intervals = {
'position': 1.0,
@ -1311,7 +1310,7 @@ class Orchestrator:
if self.fc_ros_manager.spin_thread.is_alive():
if self.fc_ros_manager.running:
self.fc_ros_manager.stop()
self.fc_ros_manager.shutdown()
self.fc_ros_manager.spin_thread.join(timeout=2)
# 關閉面板執行緒

@ -45,6 +45,8 @@ from collections import deque
# mavlink 的 import
from pymavlink import mavutil
from pymavlink.dialects.v20 import common as mav_common
from pymavlink.dialects.v20 import ardupilotmega as mav_ardupilot
# 自定義的 import
from .mavlinkVehicleView import (
@ -342,6 +344,19 @@ class mavlink_object:
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
# 用來銜接 pymavlink 的 MavLink 實例的管道
class CapturingPipeline:
def __init__(self, target_deque):
self.target_deque = target_deque
self.last_data = b''
def write(self, data):
self.last_data = bytes(data)
logger.debug(f'CapturingPipeline write data: {data.hex()}') # developer debug
# 把收到的資料 bytes 放到 target_deque
self.target_deque.append(data)
def seek(self, pos): pass
def tell(self): return 0
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
@ -359,16 +374,20 @@ class mavlink_object:
cls.mavlinkObjects[socket_id] = instance
return instance
def __init__(self, socket):
def __init__(self, socket, socket_dialect = 'common'):
# 登入所需的 socket
self.mavlink_socket = socket
# 用於主線程發送消息的緩衝區
self.outgoing_msgs = deque()
self.mavlinkPipeline = self.CapturingPipeline(self.outgoing_msgs)
if socket_dialect == 'common':
self.MAVLink = mav_common.MAVLink(self.mavlinkPipeline, srcSystem=254, srcComponent=191)
# 記錄訊息過濾類型 (可選)
self.bridge_msg_types = set([0, 30, 33, 74, 147]) # 0 HEARTBEAT, 30 ATTITUDE, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS
self.return_msg_types = set()
self.return_msg_types = set([])
# 轉發到別的 mavlink object 作為目標端口 的列表
self.target_sockets = set()

@ -24,8 +24,12 @@ import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
# ROS2 Service imports
import fc_interfaces.srv as fcsrv
# 自定義 imports
from . import mavlinkVehicleView as mvv
from . import mavlinkObject as mo
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
@ -450,25 +454,53 @@ class VehicleStatusPublisher(Node):
# MavlinkCommandService Node
# ============================================================================
class PendingEntry():
def __init__(self, event, match_fn):
# self.match_msgid = match_msgid # 只會在初始時設定
self.match_fn = match_fn
self.event = event # 只會在外迴圈 set()
self.result_mav_msg = None #只會在外迴圈 被寫入
self.error = ""
# def match_fn(self, compare_msg):
# # 驗證是不是期待的封包
# if self.match_msgid == compare_msg.get_msgId():
# return True
# else:
# return False
class MavlinkCommandService(Node):
"""
MAVLink 指令服務節點 - 提供 ROS2 service 介面來發送 MAVLink 指令
職責:
- 作為 service server等待 client 請求
- 作為 service server 等待 client 請求
- 接收請求組裝 MAVLink 封包
- 調用 mavlinkObject 發送封包
- 處理 ACK 等待和超時未來實現
設計理念回歸 MAVLink 純粹結構
- 只負責將 ROS2 請求轉換為 MAVLink 封包
- 不預設功能 ARM/DISARM保持模組化
- 不預設功能 ARM/DISARM) 保持模組化
- 高層應用可透過此 service 實現各種功能
"""
serviceString_prefix = '/fc_network/vehicle'
def __init__(self):
super().__init__('mavlink_command_service')
# 狀態標記
self.running = True
# mavlinkObject 的引用(將由外部設置)
self.mavlink_analyzer = None
# pending 旗標物件的儲存庫
self._pending_by_sysid = {} # sysid(int) : PendingEntry
# ═══════════════════════════════════════════════════════════════════
# ROS2 Service 架構說明:
#
@ -495,150 +527,336 @@ class MavlinkCommandService(Node):
# # 或 response = client.call(request) # 同步調用
# ═══════════════════════════════════════════════════════════════════
# 由於 ROS2 自定義 service 需要 .srv 檔案編譯
# 這裡先使用標準 String service 作為簡化實現
# TODO: 未來可創建專門的 .srv 檔案
from std_srvs.srv import Trigger
from example_interfaces.srv import SetBool
# ═══════════════════════════════════════════════════════════════════
# Service 1: 發送 MAVLink Message通用介面
# 使用 Trigger 作為臨時實現,未來應使用自定義 service
# Service : ADD_TWO 測試用,未來刪除
# ═══════════════════════════════════════════════════════════════════
# TODO: 創建 SendMavlinkMessage.srv
# Request:
# uint8 target_sysid
# uint8 target_compid
# uint16 message_id
# string fields_json # JSON 格式的字段數據
# bool wait_response
# uint16 response_msgid
# float32 timeout
# Response:
# bool success
# string response_json
# string error_message
# 暫時使用簡化版本(僅示範架構)
self.srv_send_message = self.create_service(
Trigger,
'/mavlink/send_message',
self.handle_send_message
from example_interfaces.srv import AddTwoInts
self.srv_test = self.create_service(
AddTwoInts,
'mavlink/add_two_ints',
self.handle_add_two_ints
)
# ═══════════════════════════════════════════════════════════════════
# Service 2: 發送 COMMAND_LONG
# Service : TIMESYNC 可以作為模板
# ═══════════════════════════════════════════════════════════════════
self.srv_command_long = self.create_service(
Trigger,
'/mavlink/send_command_long',
self.handle_command_long
self.srv_mav_ping = self.create_service(
fcsrv.MavPing,
self.serviceString_prefix + '/mav_ping',
self.handle_mav_timesync_ping
)
# ═══════════════════════════════════════════════════════════════════
# Service 3: 參數請求
# Service : 發送 COMMAND_LONG
# ═══════════════════════════════════════════════════════════════════
self.srv_param_request = self.create_service(
Trigger,
'/mavlink/param_request',
self.handle_param_request
)
# 狀態標記
self.running = True
# mavlinkObject 的引用(將由外部設置)
self.mavlink_analyzer = None
# self.srv_command_long = self.create_service(
# fcsrv.MavCommandLong,
# self.serviceString_prefix + '/send_command_long',
# self.handle_command_long
# )
logger.info("MavlinkCommandService initialized")
def set_mavlink_analyzer(self, mavlink_analyzer):
"""
設置 mavlink_analyzer 引用
def _index(self, target_sysid, target_compid):
# 找到對應的 vehicle
vehicle = mvv.vehicle_registry.get(target_sysid)
if not vehicle:
return None
socket_id = vehicle.custom_meta.get("socket_id")
if socket_id is None:
return None
# 提取主要的 socket_id
mav_obj = mo.mavlink_object.mavlinkObjects.get(socket_id)
if mav_obj is None:
return None
return mav_obj
def return_router(self):
'''
這邊是給外部迴圈呼叫的 消耗 return_packet_ring 裡接收到的 mavlink 封包
分送到各自的 pending
藉由 event.set() 解開 service 中的 block
'''
return_tuple = mo.return_packet_ring.get()
# 確認 return_packet_ring 有資料
if return_tuple == None: return
socketid, timestamp, msg = return_tuple
sysid = msg.get_srcSystem()
_pending = self._pending_by_sysid.get(sysid)
# 確認是否有 service 在等待回應 若無直接 return 此封包也會被忽略
if _pending == None:
return
Args:
mavlink_analyzer: mavlinkObject.mavlink_analyzer 實例
"""
self.mavlink_analyzer = mavlink_analyzer
logger.info("MavlinkCommandService: mavlink_analyzer set")
if _pending.match_fn(msg):
_pending.result_mav_msg = msg
_pending.event.set()
# ═══════════════════════════════════════════════════════════════════════
# Service Handler: 發送 MAVLink Message
# Service Handler: 發送 TIMESYNC 可以作為模板
# ═══════════════════════════════════════════════════════════════════════
def handle_send_message(self, request, response):
"""
處理發送 MAVLink 訊息的請求
def handle_mav_timesync_ping(self, request, response):
'''
timesync 封包驗證來回的時間
'''
fail_skip = False
ROS2 Service Callback 說明
- 此函數會在 client 調用 service 時被執行
- request: 包含 client 傳入的參數
- response: 需要填充結果並返回給 client
- 必須 return response
# 設定失效回應
response.success = False
response.message = "Unknown error"
Args:
request: Trigger.Request (暫時使用未來改為自定義)
response: Trigger.Response
response.rtt_ms = 0.0
timeout_sec = 2.0
expect_recieve_msg_id = 111 # TIMESYNC
Returns:
response: 填充後的回應
"""
logger.info("Received send_message request")
# 1) 確認是否已經有相同 sysid 的其他需求正在 pending
if request.target_sysid in self._pending_by_sysid:
response.message = f"sysid {request.target_sysid} already has pending request"
return response
# 檢查 mavlink_analyzer 是否已設置
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
logger.error(response.message)
# 2) 找到 socket 標地
socketObject = self._index(request.target_sysid, request.target_compid)
if socketObject is None:
response.message = "This system id not found."
return response
# TODO: 實際實現
# 1. 從 request 解析參數target_sysid, message_id, fields 等)
# 2. 使用 pymavlink 組裝 MAVLink 封包
# 3. 調用 mavlink_analyzer.send_message() 發送
# 4. 如果 wait_response=True則等待 return_packet_ring 中的回應
# 3) 接收封包系統 的設定
# 在 socket 那邊先把要的封包種類導流進來
socketObject.set_return_message_types(list(socketObject.return_msg_types) + [expect_recieve_msg_id])
evt = threading.Event()
# 設定封包檢驗
# 這邊是設定 tc1 不為 0
def match_fn(compare_msg):
if compare_msg.get_msgId() != expect_recieve_msg_id :
return False
# logger.debug(f"mark A : {compare_msg.get_srcSystem()} ,{compare_msg.get_msgId()}, {compare_msg.tc1}, {compare_msg.ts1}")
if compare_msg.tc1 == 0 :
return False
return True
# 暫時返回成功(示範用)
response.success = True
response.message = "Message sent (placeholder implementation)"
_pending = PendingEntry(event = evt, match_fn = match_fn)
self._pending_by_sysid[request.target_sysid] = _pending
# 4) 送出封包
now_us = int(time.monotonic() * 1e6)
socketObject.MAVLink.timesync_send(0, now_us)
# 5) 等待回應封包
if not evt.wait(timeout_sec):
response.message = "waiting Timeout - TIMESYNC"
msg_types = list(socketObject.return_msg_types)
if expect_recieve_msg_id in msg_types:
msg_types.remove(expect_recieve_msg_id)
socketObject.set_return_message_types(msg_types)
del evt
self._pending_by_sysid.pop(request.target_sysid, None)
return response
# 6) 處理回應封包
ack_msg = _pending.result_mav_msg
current_time = int(time.monotonic() * 1e6)
response.rtt_ms = (current_time - now_us) / 1e3
response.message = ""
# 7) 接收封包系統 的重置
msg_types = list(socketObject.return_msg_types)
if expect_recieve_msg_id in msg_types:
msg_types.remove(expect_recieve_msg_id)
socketObject.set_return_message_types(msg_types)
del evt
self._pending_by_sysid.pop(request.target_sysid, None)
return response
def handle_add_two_ints(self, request, response):
"""測試用 Service Handler 未來刪除"""
logger.info(f"Received add_two_ints request: {request.a} + {request.b}")
response.sum = request.a + request.b
logger.info(
f"[add_two_ints] thread_ident={threading.get_ident()} time={time.time()}"
)
time.sleep(1)
return response
# # 感覺用不到
# def set_mavlink_analyzer(self, mavlink_analyzer):
# """
# 設置 mavlink_analyzer 引用
# Args:
# mavlink_analyzer: mavlinkObject.mavlink_analyzer 實例
# """
# self.mavlink_analyzer = mavlink_analyzer
# logger.info("MavlinkCommandService: mavlink_analyzer set")
# ═══════════════════════════════════════════════════════════════════════
# Service Handler: 發送 COMMAND_LONG
# ═══════════════════════════════════════════════════════════════════════
def handle_command_long(self, request, response):
"""
處理發送 COMMAND_LONG 的請求
# 設定失效回應
response.success = False
response.ack_result = 255
# 等待回應 timeout
timeout_sec = request.timeout_sec
COMMAND_LONG (MAVLink message ID=76):
- 用於發送簡單命令給載具
- 常用於 ARM/DISARM, 模式切換, TAKEOFF, LAND
# 1) 確認是否已經有相同 sysid 的其他需求正在 pending
if request.target_sysid in self._pending_by_sysid:
response.message = f"sysid {request.target_sysid} already has pending request"
return response
Args:
request: Trigger.Request
response: Trigger.Response
# 2) 找到 socket 標地
socketObject = self._index(request.target_sysid, request.target_compid)
if socketObject is None:
response.message = "This system id not found."
return response
Returns:
response: 填充後的回應
"""
logger.info("Received command_long request")
# 3) 接收封包系統 的設定
expect_recieve_msg_id = mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK # 77
socketObject.set_return_message_types(list(socketObject.return_msg_types) + [expect_recieve_msg_id])
evt = threading.Event()
if self.mavlink_analyzer is None:
response.success = False
response.message = "Error: mavlink_analyzer not set"
def match_fn(compare_msg):
if compare_msg.get_msgId() != expect_recieve_msg_id :
return False
return True
_pending = PendingEntry(event = evt, match_fn = match_fn)
self._pending_by_sysid[request.target_sysid] = _pending
# 4) 送出封包
socketObject.MAVLink.command_long_send(
request.target_sysid,
request.target_compid,
request.command,
request.confirmation,
request.param1, request.param2, request.param3, request.param4,
request.param5, request.param6, request.param7
)
# 5) 等待回應封包
if not evt.wait(timeout_sec):
response.message = "waiting Timeout - CommLONG"
del evt
del self._pending_by_sysid[request.target_sysid]
return response
# TODO: 實際實現
# 1. 從 request 解析 COMMAND_LONG 參數
# - target_sysid, target_compid
# - command (MAV_CMD_xxx)
# - param1~param7
# 2. 組裝 COMMAND_LONG 封包
# 3. 發送並等待 COMMAND_ACK (message ID=77)
# 4. 解析 ACK 結果ACCEPTED/FAILED 等)
# 6) 處理回應封包
ack_msg = _pending.result_mav_msg
response.success = True
response.message = "Command sent (placeholder implementation)"
# 7) 接收封包系統 的重置
msg_types = list(socketObject.return_msg_types)
if expect_recieve_msg_id in msg_types:
msg_types.remove(expect_recieve_msg_id)
socketObject.set_return_message_types(msg_types)
del evt
del self._pending_by_sysid[request.target_sysid]
return response
# target_sysid = request.target_sysid
# target_compid = request.target_compid
# timeout_sec = request.timeout_sec if request.timeout_sec > 0 else 1.0
# # 1) 找 vehicle/socket/mav_obj
# vehicle = mvv.vehicle_registry.get(target_sysid)
# if not vehicle:
# response.success = False
# response.ack_result = 255
# response.message = f"Vehicle {target_sysid} not found"
# return response
# socket_id = vehicle.custom_meta.get("socket_id")
# mav_obj = mo.mavlink_object.mavlinkObjects.get(socket_id)
# if mav_obj is None:
# response.success = False
# response.ack_result = 255
# response.message = "mavlink_object not found"
# return response
# # 2) 設定回應型別(至少含 COMMAND_ACK=77
# mav_obj.set_return_message_types([mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK])
# # 3) 每機最多一筆 pendingbusy 就直接回錯
# evt = threading.Event()
# pending = PendingEntry(
# event=evt,
# deadline_monotonic=time.monotonic() + timeout_sec,
# result_msg=None,
# error=""
# )
# def _match_ack(msg):
# return (
# msg.get_msgId() == mavutil.mavlink.MAVLINK_MSG_ID_COMMAND_ACK and
# msg.command == request.command
# )
# pending.match_fn = _match_ack
# with self._pending_lock:
# if target_sysid in self._pending_by_sysid:
# response.success = False
# response.ack_result = 255
# response.message = f"sysid {target_sysid} already has pending request"
# return response
# self._pending_by_sysid[target_sysid] = pending
# try:
# # 4) 組封包
# mav_obj.MAVLink.command_long_send(
# target_sysid,
# target_compid,
# request.command,
# request.confirmation,
# request.param1, request.param2, request.param3, request.param4,
# request.param5, request.param6, request.param7
# )
# if not mav_obj.outgoing_msgs:
# raise RuntimeError("No encoded command_long bytes")
# message_bytes = mav_obj.outgoing_msgs.popleft()
# # 5) 發送
# ok = mo.mavlink_bridge().send_message(message_bytes, target_sysid=target_sysid)
# if not ok:
# raise RuntimeError("send_message failed")
# # 6) 等 Router 通知(不是掃 ring
# if not evt.wait(timeout_sec):
# response.success = False
# response.ack_result = 255
# response.message = "Timeout waiting COMMAND_ACK"
# return response
# if pending.result_msg is None:
# response.success = False
# response.ack_result = 255
# response.message = pending.error or "No ACK message"
# return response
# ack = pending.result_msg
# response.ack_result = ack.result
# response.success = (ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED)
# response.message = f"COMMAND_ACK result={ack.result}"
# return response
# except Exception as e:
# response.success = False
# response.ack_result = 255
# response.message = str(e)
# return response
# finally:
# with self._pending_lock:
# # 防止異常路徑殘留 pending
# self._pending_by_sysid.pop(target_sysid, None)
# ═══════════════════════════════════════════════════════════════════════
# Service Handler: 參數請求
@ -719,12 +937,6 @@ class MavlinkCommandService(Node):
response.message = "Param request sent (placeholder implementation)"
return response
# ═══════════════════════════════════════════════════════════════════════
# 【新增 Service 位置 4/4】
# 若要新增 service請在此處添加新的 handler 方法
# 並在 __init__ 中創建對應的 service server
# ═══════════════════════════════════════════════════════════════════════
def stop(self):
"""停止服務"""
self.running = False
@ -739,6 +951,11 @@ class fc_ros_manager:
"""
MAVLink ROS2 節點管理器
因為物件創建在 mavlinkROS2Nodes.py 所以會分為 __init__ initialize 兩個階段
stop 是停下止 ROS2 nodes 的運行但不銷毀節點實例允許後續再次 start
shutdown 是完全關閉 ROS2 並銷毀節點實例
管理兩個獨立的 ROS2 Node
- VehicleStatusPublisher
- MavlinkCommandService
@ -750,7 +967,7 @@ class fc_ros_manager:
self.initialized = False
self.running = False
# 两个 node
# 两个 node
self.status_publisher: Optional[VehicleStatusPublisher] = None
self.command_service: Optional[MavlinkCommandService] = None
@ -772,7 +989,7 @@ class fc_ros_manager:
self.status_publisher = VehicleStatusPublisher()
self.command_service = MavlinkCommandService()
# 創建執行者 MultiThreadedExecutor
# 創建執行者 MultiThreadedExecutor 並把 node 加入其中
self.executor = MultiThreadedExecutor()
self.executor.add_node(self.status_publisher)
self.executor.add_node(self.command_service)
@ -813,12 +1030,14 @@ class fc_ros_manager:
self.running = False
return False
# 循環執行的地方
def _spin_executor(self):
"""在 thread 中運行的 executor"""
try:
# logger.info("ROS2 executor spinning...")
while self.running:
self.executor.spin_once(timeout_sec=0.1)
self.command_service.return_router()
except Exception as e:
logger.error(f"fc_ros_manager error in spinning executor: {e}")
@ -900,3 +1119,4 @@ ros2_manager = fc_ros_manager()
5. 預留 MavlinkCommandService 結構稍後實現
'''

@ -191,31 +191,12 @@ class XBeeFrameHandler:
@staticmethod
def decapsulate_data(data: bytes):
"""解封裝 XBee API 幀,提取 RF 數據"""
# XBee API 幀格式:
# 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# 檢查幀起始符 (0x7E)
if not data or len(data) < 5 or data[0] != 0x7E:
return None
# 獲取數據長度 (不包括校驗和)
length = (self.buffer[1] << 8) | data[2]
# 檢查幀完整性
if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
return None
# 提取 API 標識符
frame_type = data[3]
length = (data[1] << 8) | data[2]
# 根據不同的幀類型進行處理
if frame_type == 0x90: # RX Packet
# 0x90 幀結構: [0x7E][長度H][長度L][0x90][64位地址(8)][16位地址(2)][選項(1)][RF數據...]
rf_data_start = 3 + 12 # 起始符(1) + 長度(2) + 類型(1) + 地址等(11)
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
class ATCommandHandler:
@ -482,6 +463,7 @@ class serial_manager:
lambda: serial_obj.udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
serial_obj.transport = udp_transport
serial_obj.protocol = udp_protocol
@ -619,11 +601,15 @@ if __name__ == '__main__':
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定
# SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14571
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
linked_serial = sm.get_serial_link()
print(linked_serial)
time.sleep(30)
time.sleep(60)
sm.remove_serial_link(1)
time.sleep(3)

@ -22,6 +22,7 @@
自己的常用指令
python -m fc_network_adapter.tests.test_vehicleStatusPublisher
python -m fc_network_adapter.tests.test_ringBuffer
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
ros2 topic list
ros2 topic echo
@ -31,5 +32,11 @@ ros2 topic echo
mavproxy.py --master=tcp:127.0.0.1:5790 --out=udp:127.0.0.1:14560
ros2 service call /fc_network/vehicle/mav_ping fc_interfaces/srv/MavPing "{target_sysid: 3, target_compid: 0, ping_seq: 1}"
ros2 service call /mavlink/add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 8}"
sudo tcpdump -i lo 'udp dst port 14561' -X
sudo tcpdump -i lo 'udp dst port 14550' -X -vv
sudo tcpdump -i lo -X udp port 14550
Loading…
Cancel
Save