Compare commits

..

No commits in common. 'master' and 'wenchun' have entirely different histories.

@ -67,8 +67,6 @@ cd ~/AirTrapMine/src/ # 這是範例!!!
python -m fc_network_adapter.fc_network_adapter.mainOrchestrator
python -m fc_network_adapter.tests.demo_integration
python -m someotherpkg.src.example_wholeMoving
python fc_network_apps/vehicleDiagShow.py
```
2.
@ -85,16 +83,17 @@ python gui.py
建立、維護與飛控韌體的連接
構築 mavlink 封包
處理無線模組的通訊格式 (XBee)
2. fc_interfaces (必要)
自定義的 ROS2 介面檔 沒有這個核心會運作不了
--同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息 (移除)--
2. fc_interfaces (重要)
自定義的 ROS2 介面檔 沒啥好說的 沒有這個核心會運作不了
3. fc_network_module (重要)
非核心 但是支援載具的重要附屬功能
需要該功能時 作為一個 ros2 節點打開
例如 : ntrip rtk 訊號轉接
4. fc_network_apps
是 fc_network_adapter 高階包裝API
與 fc_network_adapter 銜接做高階功能包裝的應用小程式
利於開發GUI或其他應用 使用者的外層包裝
可以不使用 或者當作一個範例程式來看
這裡的定位是 "核心功能的高階包裝" 可以完全不去用
5. someotherpkg
如何使用 fc_network_apps 的範例檔案
6. GUI

@ -0,0 +1,193 @@
import asyncio
import serial_asyncio
import struct
import os
import sys
import serial
import signal
import traceback
from pymavlink import mavutil
# === 設定區 ===
SERIAL_PORT = 'COM15' # 手動指定
SERIAL_BAUDRATE = 57600
UDP_REMOTE_IP = '127.0.0.1'
UDP_REMOTE_PORT = 14550
DEBUG_MODE = False
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# === 工具函數 ===
def check_serial_port():
try:
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close()
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
frame_type = 0x10
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# === Serial Protocol 實作 ===
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.buffer = bytearray()
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
def data_received(self, data):
self.buffer.extend(data)
while True:
if len(self.buffer) < 3:
return
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
if len(self.buffer) < full_length:
return
frame = self.buffer[:full_length]
del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame))
def write_to_serial(self, data):
try:
api_frame = build_api_tx_frame(data, TARGET_ADDR64)
pass
self.transport.write(api_frame)
except Exception as e:
print(f"[TX Error] 無法封裝或傳送資料: {e}")
# === UDP Protocol 實作 ===
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self):
self.serial_transport = None
self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready.")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
def datagram_received(self, data, addr):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def send_udp(self, data):
decoded_data = self.decapsulate_data(data)
if decoded_data is None:
pass
return
self.decode_mavlink_data(decoded_data)
if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
try:
if not data or data[0] != 0x7E:
return None
length = (data[1] << 8) | data[2]
if len(data) < length + 4:
return None
frame_type = data[3]
if frame_type == 0x90:
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
except Exception as e:
print(f"[XBee 解封錯誤] {e}")
return None
def decode_mavlink_data(self, data):
try:
msg = self.mav_decoder.parse_char(data)
if msg:
if msg.get_type() == "HEARTBEAT":
pass # 不輸出任何訊息
else:
pass
except Exception as e:
print(f"[MAVLink Decode Error] {e}")
# === 主流程 ===
async def main():
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
if os.name != 'nt': # Windows 不支援 add_signal_handler
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
udp_handler = UDPHandler()
try:
udp_transport, _ = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0)
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
sock = udp_transport.get_extra_info('socket')
print(f"UDP listening on {sock.getsockname()}")
try:
serial_proto = SerialToUDP(udp_handler)
await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
traceback.print_exc()
udp_transport.close()
return
print("等待串口資料...")
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print("程式執行錯誤:")
traceback.print_exc()

@ -14,7 +14,6 @@ find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/AttitudeRaw.msg"
"msg/GnssRaw.msg"
"msg/SystemDiagnosticsRaw.msg"
"msg/ServiceAckResult.msg"
"srv/MavPing.srv"
"srv/MavCommandLong.srv"

@ -1,11 +0,0 @@
builtin_interfaces/Time stamp
uint32 sensors_install_mask
uint32 sensors_enabled_mask
uint32 sensors_health_mask
uint16 mcu_load
uint16 bus_error_rate
uint16 bus_error_count
uint16 errors_count1
uint16 errors_count2
uint16 errors_count3
uint16 errors_count4

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__))
PROJECT_VER = "v1.20"
PROJECT_VER = "v1.10"
class PanelState:
def __init__(self):
@ -1106,7 +1106,7 @@ class ControlPanel:
0: "HB", 1: "S_STAT", 2: "S_TIME", 24: "GPS_RAW", 27: "RAW_IMU",
30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL",
74: "VFR_HUD", 147: "BATT_ST", 136: "TERRAIN", 241: "VIBRAT",
125: "POW_STA", 253: "STAT_TXT",
125: "POW_STA",
}
# ardupilot mega
@ -1198,8 +1198,6 @@ class Orchestrator:
'vfr_hud': 1.0,
'mode': 0.0,
'summary': 1.0,
'sys_diags': 1.0,
'status_text': 1.0,
}
def engageWholeSystem(self):
@ -1644,13 +1642,13 @@ def main():
if mo.MODULE_VER != "1.50":
print("Module Version Error! : mavlinkObkect")
version_check = False
if mros.MODULE_VER != "2.50":
if mros.MODULE_VER != "2.10":
print("Module Version Error! : mavlinkROS2Nodes")
version_check = False
if mvv.MODULE_VER != "1.10":
if mvv.MODULE_VER != "1.00":
print("Module Version Error! : mavlinkVehicleView")
version_check = False
if sm.MODULE_VER != "2.00":
if sm.MODULE_VER != "0.80":
print("Module Version Error! : serialManager")
version_check = False
if version_check == False:

@ -51,7 +51,7 @@ from .mavlinkVehicleView import (
VehicleView,
VehicleComponent,
ComponentType,
StatusTextEntry,
ConnectionType
)
from .utils import RingBuffer, setup_logger
@ -109,14 +109,12 @@ class mavlink_bridge:
"""初始化訊息處理器映射表,提高處理效率"""
self.message_handlers = {
0: self._handle_heartbeat, # HEARTBEAT
1: self._handle_vehicle_sys_status, # SYS_STATUS
24: self._handle_gps_raw_int, # GPS_RAW_INT
30: self._handle_attitude, # ATTITUDE
32: self._handle_local_position, # LOCAL_POSITION_NED
33: self._handle_global_position, # GLOBAL_POSITION_INT
74: self._handle_vfr_hud, # VFR_HUD
147: self._handle_battery_status, # BATTERY_STATUS
253: self._handle_status_text, # STATUSTEXT
}
def start(self):
@ -233,29 +231,6 @@ class mavlink_bridge:
'vx': msg.vx, 'vy': msg.vy, 'vz': msg.vz
}
def _handle_vehicle_sys_status(self, vehicle, component, msg, timestamp):
"""處理 SYS_STATUS 訊息 (msg_id: 1)"""
diag = component.status.sys_diag
diag.sensors_install_mask = msg.onboard_control_sensors_present
diag.sensors_enabled_mask = msg.onboard_control_sensors_enabled
diag.sensors_health_mask = msg.onboard_control_sensors_health
diag.mcuLoad = msg.load
diag.busErrorRate = msg.drop_rate_comm
diag.busErrorCount = msg.errors_comm
diag.errors_count1 = msg.errors_count1
diag.errors_count2 = msg.errors_count2
diag.errors_count3 = msg.errors_count3
diag.errors_count4 = msg.errors_count4
diag.timestamp = timestamp
def _handle_status_text(self, vehicle, component, msg, timestamp):
"""處理 STATUSTEXT 訊息 (msg_id: 253)"""
text = msg.text.rstrip('\x00') if msg.text else ''
if not text:
return
component.status.status_text_queue.append(
StatusTextEntry(text=text, severity=msg.severity, timestamp=timestamp)
)
def _handle_global_position(self, vehicle, component, msg, timestamp):
"""處理 GLOBAL_POSITION_INT 訊息 (msg_id: 33)"""
@ -420,9 +395,9 @@ class mavlink_object:
self.MAVLink = mav_common.MAVLink(self.mavlinkPipeline, srcSystem=254, srcComponent=191)
# 記錄訊息過濾類型 (可選)
# 0 HEARTBEAT, 1 SYS_STATUS, 24 GPS_RAW_INT, 30 ATTITUDE,
# 32 LOCAL_POSITION_NED, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS, 253 STATUSTEXT
self.bridge_msg_types = set([0, 1, 24, 30, 32, 33, 74, 147, 253])
# 0 HEARTBEAT, 24 GPS_RAW_INT, 30 ATTITUDE,
# 32 LOCAL_POSITION_NED, 33 GLOBAL_POSITION_INT, 74 VFR_HUD, 147 BATTERY_STATUS
self.bridge_msg_types = set([0, 24, 30, 32, 33, 74, 147])
self.return_msg_types = set([])
# 轉發到別的 mavlink object 作為目標端口 的列表
@ -859,8 +834,5 @@ if __name__ == '__main__':
1. async_io_manager.managed_objects mavlink_object.mavlinkObjects 功能重複整合 保留 mavlink_object.mavlinkObjects
2. async_io_manager _stop_event 無效變數移除
2026 06 10
1. 增加 SYS_STATUS STATUSTEXT 訊息的處理機制
'''

@ -45,7 +45,7 @@ from . import mavlinkObject as mo
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "2.50"
MODULE_VER = "2.10"
FC_ROS_DOMAIN_ID = "0"
NODE_KEYS = ("status_publisher", "command_service", "rtcm_relay")
@ -68,16 +68,14 @@ class PublishRateController:
# 注意 這邊是定義區 不要把參數寫在這裡 所以預設全部關閉
# 以這個專案 請看 mainOrchestrator.py 的 Orchestrator 初始化階段
self.topic_intervals = {
'summary': 0.0, # 載具摘要 (sysid 飛行模式 解鎖上鎖 gps狀態)
'position_gnss': 0.0, # GNSS位置 (海拔高度)
'position_ned': 0.0, # LOCAL_POSITION_NED (位置+速度+相對高度)
'position_gnss': 0.0, # GNSS位置
'position_ned': 0.0, # LOCAL_POSITION_NED (位置+速度)
'attitude': 0.0, # 姿態 (pitch yaw row 與其加速狀態)
'velocity': 0.0, # 速度 (已經包含在 vfr_hud 未來移除)
'battery': 0.0, # 電池
'vfr_hud': 0.0, # VFR HUD (地速 空速 絕對高度 爬升率 航向 油門)
'sys_diags': 0.0, # SYS_STATUS 系統診斷
'status_text': 0.0, # STATUSTEXT 飛控文字(佇列驅動,>0 僅作啟用旗標)
'mode': 0.0, # 飛行模式 (已經在 summary 裡 未來移除)
'velocity': 0.0, # 速度 (已經包含在 vfr_hud 未來移除)
'summary': 0.0, # 載具摘要 (sysid 飛行模式 解鎖上鎖 gps狀態)
# 在這裡新增更多 topics...
}
# 記錄每個 topic 的最後發布時間 {(sysid, topic): timestamp}
@ -114,10 +112,6 @@ class PublishRateController:
return False
def is_topic_enabled(self, topic: str) -> bool:
"""檢查 topic 是否啟用interval > 0"""
return self.topic_intervals.get(topic, 0) > 0
def reset(self):
"""重置所有計時器"""
self.last_publish_time.clear()
@ -128,7 +122,7 @@ class VehicleStatusPublisher(Node):
職責:
- 定期從 vehicle_registry 讀取載具狀態
- 頻率控制 (位置/姿態 2Hz, 電池/摘要 1Hz)
- 頻率控制位置/姿態 2Hz電池/摘要 1Hz
- 發布標準 ROS2 消息類型
- 檢測訂閱者按需發布
"""
@ -187,13 +181,11 @@ class VehicleStatusPublisher(Node):
self._publish_position_gnss(sysid, status)
self._publish_position_ned(sysid, status)
self._publish_attitude(sysid, status)
self._publish_velocity(sysid, status)
self._publish_battery(sysid, status)
self._publish_vfr_hud(sysid, status)
self._publish_summary(vehicle)
self._publish_system_diagnostics(sysid, status)
self._publish_status_text(sysid, status)
self._publish_velocity(sysid, status)
self._publish_mode(sysid, status)
self._publish_summary(vehicle)
# 在這裡新增更多 publish 方法調用...
def _get_or_create_publisher(self, sysid: int, topic: str, msg_type, qos: int = 1):
@ -447,7 +439,11 @@ class VehicleStatusPublisher(Node):
'armed': status.armed if status.armed is not None else False,
# 'mode_custom': status.mode.custom_mode if status.mode.custom_mode else 0,
'mode_name': status.mode.mode_name if status.mode.mode_name else "UNKNOWN",
'mav_status': status.system_status if status.system_status is not None else 0,
# 'latitude': status.position.latitude if status.position.latitude else 0.0,
# 'longitude': status.position.longitude if status.position.longitude else 0.0,
# 'altitude': status.position.altitude if status.position.altitude else 0.0,
# 'battery_percent': status.battery.remaining if status.battery.remaining else 0,
# 'gps_fix': status.gps.fix_type if status.gps.fix_type else 0,
'connection_type': vehicle.connected_via.value,
'last_update': component.packet_stats.last_msg_time if component.packet_stats.last_msg_time else 0.0,
}
@ -456,62 +452,6 @@ class VehicleStatusPublisher(Node):
msg.data = json.dumps(summary)
publisher.publish(msg)
def _publish_system_diagnostics(self, sysid: int, status: mvv.ComponentStatus):
"""發布 SYS_STATUS 系統診斷資訊"""
if not self.rate_controller.should_publish(sysid, 'sys_diags'):
return
diag = status.sys_diag
if diag.sensors_install_mask is None:
return
publisher = self._get_or_create_publisher(
sysid, 'sys_diags', fcmsg.SystemDiagnosticsRaw
)
if publisher.get_subscription_count() == 0:
return
msg = fcmsg.SystemDiagnosticsRaw()
msg.stamp = self.get_clock().now().to_msg()
msg.sensors_install_mask = int(diag.sensors_install_mask)
msg.sensors_enabled_mask = int(diag.sensors_enabled_mask or 0)
msg.sensors_health_mask = int(diag.sensors_health_mask or 0)
msg.mcu_load = int(diag.mcuLoad or 0)
msg.bus_error_rate = int(diag.busErrorRate or 0)
msg.bus_error_count = int(diag.busErrorCount or 0)
msg.errors_count1 = int(diag.errors_count1 or 0)
msg.errors_count2 = int(diag.errors_count2 or 0)
msg.errors_count3 = int(diag.errors_count3 or 0)
msg.errors_count4 = int(diag.errors_count4 or 0)
publisher.publish(msg)
def _publish_status_text(self, sysid: int, status: mvv.ComponentStatus):
"""發布 STATUSTEXT 飛控文字 (佇列 drain, 無訂閱者直接丟棄) """
# 是否啟用
if not self.rate_controller.is_topic_enabled('status_text'):
return
# 是否有資料
queue = status.status_text_queue
if not queue:
return
publisher = self._get_or_create_publisher(sysid, 'status_text', std_msgs.msg.String)
# 是否有監聽者
if publisher.get_subscription_count() == 0:
queue.clear()
return
while queue:
entry = queue.popleft()
msg = std_msgs.msg.String()
ts = entry.timestamp if entry.timestamp is not None else 0.0
sev = entry.severity if entry.severity is not None else -1
msg.data = f'[{ts:.3f}] [{sev}] {entry.text}'
publisher.publish(msg)
# ═══════════════════════════════════════════════════════════════
# 【新增 Topic 位置 3/4】
# 若要新增 topic請在此處實作對應的發布方法
@ -530,6 +470,29 @@ class VehicleStatusPublisher(Node):
# # ... 實作發布邏輯
# ═══════════════════════════════════════════════════════════════
@staticmethod
def _euler_to_quaternion(roll, pitch, yaw):
"""
歐拉角轉四元數
Args:
roll: 橫滾角 (弧度)
pitch: 俯仰角 (弧度)
yaw: 偏航角 (弧度)
Returns:
tuple: (qx, qy, qz, qw)
"""
qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - \
math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + \
math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2)
qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - \
math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2)
qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + \
math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
return (qx, qy, qz, qw)
def stop(self):
"""停止發布"""
self.running = False
@ -568,14 +531,6 @@ class MavlinkCommandService(Node):
每次接到一個 service 請求 要整個系統丟某種指令給載具時
會做兩件事 1."丟出mavlink封包" 2."創造一個臨時信箱 Pending"
然後透過每次 manager spin
會去呼叫 return_router() 方法
這個方法會監聽 return_packet_ring 跟臨時信箱的 Pending 做配對
配對到的解開 Pending
解開後 相對應的 handle_XXX 就會開始做事
"""
serviceString_prefix = '/fc_network/vehicle'
@ -1198,10 +1153,6 @@ class fc_ros_manager:
- RtcmRelay
提供統一的啟動/停止介面給 mainOrchestrator
另外 這邊用到 MultiThreadedExecutor 會開出額外的 thread 的特性
使得就算 executor 在跑一些需要等待的方法
常態的 spin_once 也不會被 block (spin_thread 是另一個支線)
"""
def __init__(self):
@ -1481,10 +1432,6 @@ ros2_manager = fc_ros_manager()
2. schedule_restart_node / _restart_node : 手動重啟單一 node (spin thread 內執行
3. orchestrator cmd: ("RESTART_ROS_NODE", node_key), node_key NODE_KEYS
2026.06.10
1. 增加了 _publish_system_diagnostics _publish_status_text 功能
TODO
1. service 部分會需要跟 mavlinkobject 大量互動 也許需要考慮對方的生命週期

@ -5,7 +5,6 @@ VehicleView - Pure State Container
"""
import os
from collections import deque
from typing import Dict, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
@ -13,7 +12,7 @@ from enum import Enum
from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "1.10"
MODULE_VER = "1.00"
# ====================== Enums =====================
@ -118,30 +117,6 @@ class VFR:
timestamp: Optional[float] = None # 時間戳記
@dataclass
class StatusTextEntry:
"""飛控狀態文字來源MAVLink STATUSTEXT"""
text: str
severity: Optional[int] = None
timestamp: Optional[float] = None
@dataclass
class SystemDiagnostics:
"""系統診斷資訊來源MAVLink SYS_STATUS不含電池欄位"""
sensors_install_mask: Optional[int] = None
sensors_enabled_mask: Optional[int] = None
sensors_health_mask: Optional[int] = None
mcuLoad: Optional[int] = None # 單位 1%
busErrorRate: Optional[int] = None # 單位 0.1%
busErrorCount: Optional[int] = None
errors_count1: Optional[int] = None
errors_count2: Optional[int] = None
errors_count3: Optional[int] = None
errors_count4: Optional[int] = None
timestamp: Optional[float] = None
@dataclass
class ComponentStatus:
"""組件狀態容器"""
@ -152,8 +127,6 @@ class ComponentStatus:
ekf: EKF = field(default_factory=EKF)
gps: GPS = field(default_factory=GPS)
vfr: VFR = field(default_factory=VFR)
sys_diag: SystemDiagnostics = field(default_factory=SystemDiagnostics)
status_text_queue: deque = field(default_factory=lambda: deque(maxlen=64))
# 系統狀態
system_status: Optional[int] = None # MAV_STATE

@ -14,23 +14,20 @@ import signal
import time
import threading
import struct
from collections import deque
from enum import Enum, auto
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable, Optional
# # XBee 模組
# from xbee.frame import APIFrame
# 自定義的 import
from .utils import RingBuffer, setup_logger
from .utils import pollStrategy
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "2.00"
MODULE_VER = "0.80"
rx_module_ack = RingBuffer(capacity=64, buffer_id=253)
@ -40,8 +37,8 @@ rx_module_ack = RingBuffer(capacity=64, buffer_id=253)
class SerialMode(Enum):
"""連接類型"""
STRAIGHT = auto() # 原始數據直通
XBEEAPI2AT = auto() # XBee API-AT 模式
XBEEAPI_espv1 = auto() # XBee API-API 模式 esp v1
XBEEAPI2AT = auto() # XBee API 模式
XBEEAPI_POLL = auto()
NOT_USE = auto() # 不使用
@ -104,28 +101,19 @@ class RawFrameProcessor(FrameProcessor):
return data
class XBeeFrameProcessor_Base(FrameProcessor):
class XBeeFrameProcessor(FrameProcessor):
"""
XBee API 協議處理器
處理 XBEE API 端口 對應 -> 遠端 XBEE AT Mode
For SerialMode.XBEEAPI2AT
職責
- XBee API frame 的拆幀 / 組幀
- 0x90 (RX Packet) -> 解出 payload 回傳
- 0x88 (AT Response) -> 轉交 at_handler 處理若有注入
- 0x8B (TX Status) -> 目前寫LOG (開發用)
- 0x8B (TX Status) -> 目前忽略
- 其他 frame type -> warning 忽略
若未來要做變化型 XBee (例如 API2 escape mode不同 addressing)
若未來要做變化型 XBee例如 API2 escape mode不同 addressing
繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可
硬體產品系列 (Product Family) : XB3-24
晶片世代: Digi XBee 3 代架構搭載 Silicon Labs EFR32 微控制器
運作頻段: 2.4 GHz RF
硬體構型: TH (Through-Hole)標準針腳插孔式引腳設計
運行協議 (Protocol / Function Set): 802.15.4
韌體版本 (Firmware Version): 2014 -> 2 (XBee 3 平台) + 0 (802.15.4 協議代碼) + 14 (次要版本號)
"""
# XBee API frame type
@ -134,10 +122,6 @@ class XBeeFrameProcessor_Base(FrameProcessor):
FRAME_TYPE_TX_STATUS = 0x8B
FRAME_TYPE_RX_PACKET = 0x90
# ADDR16 選項
DEST_ADDR16_UNICAST = b'\xFF\xFE'
DEST_ADDR16_BRAODCAST = b'\xFF\xFF'
def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__()
self.at_handler = at_handler
@ -190,15 +174,11 @@ class XBeeFrameProcessor_Base(FrameProcessor):
return None
def _dispatch_frame(self, frame: bytes) -> bytes:
"""
根據 frame type 分派
1. 若是 RX payload return bytes 遞交出去
2. 若是 AT command 進到相依類別 ATCommandHandler 處理
"""
"""根據 frame type 分派;若是 RX payload 回傳 bytes其餘回傳 None"""
frame_type = frame[3]
if frame_type == self.FRAME_TYPE_RX_PACKET: # mavlink
return self._decapsulate(frame)[0]
return self._decapsulate(frame)
if frame_type == self.FRAME_TYPE_AT_RESPONSE: # AT command
if self.at_handler is not None:
@ -206,12 +186,6 @@ class XBeeFrameProcessor_Base(FrameProcessor):
return None
if frame_type == self.FRAME_TYPE_TX_STATUS:
length = (frame[1] << 8) | frame[2] # for debug
logger.info(
f"TX Status raw={frame.hex()}, api_len={length}, "
f"fid=0x{frame[4]:02X}, dest16=0x{(frame[5]<<8)|frame[6]:04X}, "
f"retry={frame[7]}, delivery={frame[8]}, discovery={frame[9]}"
) # for debug
return None
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
@ -221,8 +195,7 @@ class XBeeFrameProcessor_Base(FrameProcessor):
@staticmethod
def _encapsulate(
data: bytes,
dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00',
dest_addr16 = DEST_ADDR16_BRAODCAST,
dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF',
frame_id: int = 0x01,
) -> bytes:
"""
@ -230,7 +203,8 @@ class XBeeFrameProcessor_Base(FrameProcessor):
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
frame_type = XBeeFrameProcessor_Base.FRAME_TYPE_TX_REQUEST
frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
@ -245,519 +219,7 @@ class XBeeFrameProcessor_Base(FrameProcessor):
"""從 RX Packet (0x90) 取出 payload"""
length = (frame[1] << 8) | frame[2]
rf_data_start = 3 + 12
payload = frame[rf_data_start:3 + length]
senderAddr = frame[4:12]
return payload, senderAddr
class XBeeFrameProcessor_ESPv1(XBeeFrameProcessor_Base):
# GCS -> UAV:
# DISC
# POLL + esp_sysid(1) + grant_bytes(2)
#
# UAV -> GCS:
# HELO + esp_sysid(1)
# DONE + sysid(1) + sent_len(2) + remain_len(2)
DISC_HEADER = b'DISC'
HELLO_HEADER = b'HELO'
POLL_HEADER = b'POLL'
DONE_HEADER = b'DONE'
MAX_BYTES_PER_FLUSH = 150
MAX_PAYLOAD_PER_FRAME = 80
CHUNK_SEND_INTERVAL_SEC = 0.01
class Esp32DeviceInfo:
def __init__(self, system_id, address_64, last_hello_time):
self.system_id = system_id
self.address_64 = address_64
self.last_hello_time = last_hello_time
self.remain_bytes = 0 # 剩餘 buffer 量
self.last_done_time = 0.0 # 最後送出Done的時間
self.received_len = 0 # 收到封包累計
def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__(at_handler)
self.max_discovery_window_ms = 220
self.is_discovery_phase = False
self.esp32_address_mapping = {}
self.operator_busy = False
self.operator_running = False
self.serial_writer: Optional[Callable[[bytes], None]] = None
self.event_loop: Optional[asyncio.AbstractEventLoop] = None
self.serial_baudrate = 115200
self.gcs_transmit_queue: deque[bytearray] = deque()
self.poll_scheduler_state = pollStrategy.PollSchedulerState()
self.command_pending_event: Optional[asyncio.Event] = None
self.poll_done_event: Optional[asyncio.Event] = None
self.current_poll_address_64: Optional[bytes] = None
self.last_discovery_time = 0.0
self.discovery_interval_seconds = 30.0 # 每次做 discovery 程序的間隔時間
self.device_offline_timeout = self.discovery_interval_seconds * 2 # 遠端沒有回應會被踢出 超時時限
self.operator_tick_interval_seconds = 0.03 #
self.guard_milliseconds = 50 # POLL DONE 的保底時間間隔
self.pending_manual_discovery = False
self.pending_manual_poll: Optional[tuple[int, Optional[int]]] = None
# ---- 注入與設定 ----
def set_writer(self, writer: Callable[[bytes], None]) -> None:
self.serial_writer = writer
def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
self.event_loop = loop
self._ensure_async_primitives()
def set_serial_baudrate(self, baudrate: int) -> None:
self.serial_baudrate = baudrate
def _ensure_async_primitives(self) -> None:
if self.command_pending_event is None:
self.command_pending_event = asyncio.Event()
if self.poll_done_event is None:
self.poll_done_event = asyncio.Event()
# 有急事 叫醒 operator 做下一個 tick
def wake_operator(self) -> None:
self._ensure_async_primitives()
self.command_pending_event.set()
# ---- 拆幀分派 ----
def _dispatch_frame(self, frame: bytes) -> Optional[bytes]:
frame_type = frame[3]
if frame_type == self.FRAME_TYPE_RX_PACKET:
payload, sender_address_64 = self._decapsulate(frame)
if payload.startswith(self.HELLO_HEADER) and len(payload) == 5:
self.handle_hello_report(payload, sender_address_64)
return None
if payload.startswith(self.DONE_HEADER) and len(payload) == 9:
self.handle_done_report(payload, sender_address_64)
return None
remote_device = self.esp32_address_mapping.get(sender_address_64)
if remote_device is not None:
remote_device.received_len += len(payload)
return payload
if frame_type == self.FRAME_TYPE_AT_RESPONSE:
if self.at_handler is not None:
self.at_handler.handle_frame(frame)
return None
if frame_type == self.FRAME_TYPE_TX_STATUS:
length = (frame[1] << 8) | frame[2]
logger.debug(
f"TX Status raw={frame.hex()}, api_len={length}, "
f"fid=0x{frame[4]:02X}, dest16=0x{(frame[5]<<8)|frame[6]:04X}, "
f"retry={frame[7]}, delivery={frame[8]}, discovery={frame[9]}"
)
return None
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return None
# ---- DISC / POLL 封裝 ----
def pack_discovery(self) -> bytes:
return self._encapsulate(self.DISC_HEADER, frame_id=0x00)
def handle_hello_report(self, payload: bytes, sender_address_64: bytes) -> None:
system_id = payload[4]
remote_device = self.esp32_address_mapping.get(sender_address_64)
if remote_device is None:
self.esp32_address_mapping[sender_address_64] = self.Esp32DeviceInfo(
system_id, sender_address_64, time.time()
)
logger.debug(
f"new HELO system_id={system_id}, address_64={sender_address_64.hex()}"
)
elif remote_device.address_64 == sender_address_64:
remote_device.last_hello_time = time.time()
else:
logger.warning(
f"SYSID duplicated system_id={system_id}, "
f"address_64={sender_address_64.hex()}"
)
def pack_poll(self, target_address_64: bytes, grant_bytes: int = 0) -> Optional[bytes]:
remote_device = self.esp32_address_mapping.get(target_address_64)
if remote_device is None:
return None
poll_payload = self.POLL_HEADER + struct.pack(
'>BH', remote_device.system_id, grant_bytes
)
remote_device.received_len = 0
return self._encapsulate(
poll_payload,
dest_addr64=remote_device.address_64,
dest_addr16=self.DEST_ADDR16_UNICAST,
frame_id=0x02,
)
def handle_done_report(self, payload: bytes, sender_address_64: bytes) -> None:
remote_device = self.esp32_address_mapping.get(sender_address_64)
if remote_device is None:
return
system_id, sent_length, remain_length = struct.unpack('>BHH', payload[4:9])
if sent_length != remote_device.received_len:
logger.info(
f"POLL may be missing packets sent={sent_length} "
f"received={remote_device.received_len} system_id={system_id}"
)
remote_device.received_len = 0
remote_device.remain_bytes = remain_length
remote_device.last_done_time = time.time()
if (
self.current_poll_address_64 is not None
and sender_address_64 == self.current_poll_address_64
and self.poll_done_event is not None
):
self.poll_done_event.set()
# ---- UDP到Serial 佇列 ----
def enqueue_gcs_transmit(self, payload: bytes) -> None:
if not payload:
return
self.gcs_transmit_queue.append(bytearray(payload))
# 只是 show 狀態
def get_gcs_queue_byte_count(self) -> int:
return sum(len(packet) for packet in self.gcs_transmit_queue)
# 只是 show 狀態
def get_gcs_queue_packet_count(self) -> int:
return len(self.gcs_transmit_queue)
# 把 gcs_transmit_queue 的 mavlink 封包依照大小打包出來
def _pop_flush_batch(self, max_bytes: int) -> list[bytes]:
if not self.gcs_transmit_queue:
return []
batch: list[bytes] = []
total_bytes = 0
while self.gcs_transmit_queue:
next_packet = bytes(self.gcs_transmit_queue[0])
packet_length = len(next_packet)
if not batch:
batch.append(self.gcs_transmit_queue.popleft())
total_bytes = packet_length
if packet_length > max_bytes:
break
continue
if total_bytes + packet_length <= max_bytes:
batch.append(self.gcs_transmit_queue.popleft())
total_bytes += packet_length
else:
break
return batch
# 把數據封裝好以後 交給 serial_writer 排程送出
async def _send_gcs_packet(self, packet: bytes) -> None:
# 重複判斷
# if self.serial_writer is None:
# return
sent_offset = 0
while sent_offset < len(packet):
chunk_end = min(
sent_offset + self.MAX_PAYLOAD_PER_FRAME,
len(packet),
)
chunk = packet[sent_offset:chunk_end]
sent_offset = chunk_end
self.serial_writer(self._encapsulate(chunk))
await asyncio.sleep(self.CHUNK_SEND_INTERVAL_SEC)
# 把上面兩個步驟打包起來 處理從 UDP 來的資訊 丟給 Serial
async def flush_gcs_transmit_queue(
self,
max_bytes: int = MAX_BYTES_PER_FLUSH,
) -> None:
if self.serial_writer is None:
logger.warning("GCS flush skipped: serial writer not ready")
return
for packet in self._pop_flush_batch(max_bytes):
await self._send_gcs_packet(packet)
# ---- POLL 排程輔助 ----
# 計算下次要 poll 的對象跟大小
def _pick_poll_target(self) -> tuple[Optional[bytes], int]:
poll_devices = [
pollStrategy.PollDevice(
address_64=address_64,
# system_id=device.system_id,
remain_bytes=device.remain_bytes,
last_done_time=device.last_done_time,
)
for address_64, device in self.esp32_address_mapping.items()
]
return pollStrategy.pick_next(
poll_devices,
self.poll_scheduler_state
)
# 移除長時間沒有 HELLO 或 DONE 的 Dongle
def _prune_stale_devices(self) -> None:
now = time.time()
stale_addresses = [
address_64
for address_64, device in self.esp32_address_mapping.items()
if (now - device.last_hello_time > self.device_offline_timeout) and \
(now - device.last_done_time > self.device_offline_timeout)
]
for address_64 in stale_addresses:
device = self.esp32_address_mapping.pop(address_64, None)
if device is not None:
logger.info(
f"Removed stale device system_id={device.system_id} "
f"address_64={address_64.hex()}"
)
# 判斷是否進入 discovery 程序
def _should_run_discovery(self) -> bool:
# 條件1. 目前沒有任何遠端ESP裝置被紀錄 或者 手動啟動
if (not self.esp32_address_mapping) or (self.pending_manual_discovery):
return True
# 條件2. 每個固定週期 會做一次
return (
time.time() - self.last_discovery_time
>= self.discovery_interval_seconds
)
# ---- 手動請求thread-safe 對外 API----
def request_discovery(self) -> bool:
"""
手動觸發 discovery 可從任意 thread 呼叫
實際排程在 serial_manager event loop 內執行
"""
if self.event_loop is None:
logger.warning("ESPv1 request_discovery: event loop not ready")
return False
if not self.operator_running:
logger.warning("ESPv1 request_discovery: operator not running")
return False
self.event_loop.call_soon_threadsafe(self._apply_request_discovery)
return True
def request_poll(
self,
target_system_id: int,
grant_bytes: Optional[int] = None,
) -> bool:
"""
手動對指定 system_id 排入一輪 POLL 可從任意 thread 呼叫
實際排程在 serial_manager event loop 內執行
"""
if self.event_loop is None:
logger.warning("ESPv1 request_poll: event loop not ready")
return False
if not self.operator_running:
logger.warning("ESPv1 request_poll: operator not running")
return False
self.event_loop.call_soon_threadsafe(
self._apply_request_poll,
target_system_id,
grant_bytes,
)
return True
def _apply_request_discovery(self) -> None:
self.pending_manual_discovery = True
self.wake_operator()
def _apply_request_poll(
self,
target_system_id: int,
grant_bytes: Optional[int],
) -> None:
self.pending_manual_poll = (target_system_id, grant_bytes)
self.wake_operator()
def get_status_snapshot(self) -> dict:
"""
唯讀狀態快照 for debug
從其他 thread 讀取時不保證與 operator 原子一致
"""
now = time.time()
devices = []
for address_64, device in self.esp32_address_mapping.items():
devices.append({
"system_id": device.system_id,
"address_64": address_64.hex(),
"remain_bytes": device.remain_bytes,
"last_hello_age_seconds": now - device.last_hello_time,
"last_done_age_seconds": (
now - device.last_done_time if device.last_done_time else None
),
})
return {
"operator_busy": self.operator_busy,
"is_discovery_phase": self.is_discovery_phase,
"gcs_queue_bytes": self.get_gcs_queue_byte_count(),
"gcs_queue_packets": self.get_gcs_queue_packet_count(),
"devices": devices,
}
# ---- operator 主循環 ----
# operator 的運行鐘 有點像是 spin_once
async def operator_loop(self) -> None:
self._ensure_async_primitives()
logger.info("ESPv1 operator loop started")
while self.operator_running:
try:
await self._operator_tick() # TODO 最好不要在 while loop 塞 try 想辦法改掉
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error(f"ESPv1 operator tick error: {exc}")
await self._wait_for_next_tick()
logger.info("ESPv1 operator loop stopped")
# 安排啥時要醒來做事
async def _wait_for_next_tick(self) -> None:
self.command_pending_event.clear()
# 固定時間醒來
sleep_task = asyncio.create_task( asyncio.sleep(self.operator_tick_interval_seconds) )
# 有"急事"被叫醒
wake_task = asyncio.create_task(self.command_pending_event.wait())
done, pending = await asyncio.wait(
{sleep_task, wake_task},
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
async def _operator_tick(self) -> None:
# 1. 移除沒反應 dongle
self._prune_stale_devices()
# 2. 忙碌時 略過這次循環
if self.operator_busy:
return
# 3. (最優先) 處理從 UDP 過來的封包 並且透過 Serial 送出
if self.gcs_transmit_queue:
await self.flush_gcs_transmit_queue()
return
# 4. 檢測要不要跑 discovery 程序
if self._should_run_discovery():
await self._run_discovery()
return
# 5. POLL 程序 (若有手動先執行 若無自動則策略決策)
if self.pending_manual_poll is not None:
target_system_id, grant_bytes = self.pending_manual_poll
self.pending_manual_poll = None
target_address = self._find_address_by_system_id(target_system_id)
else:
target_address, grant_bytes = self._pick_poll_target()
if target_address is None:
return
await self._run_one_poll(target_address, grant_bytes if grant_bytes is not None else 0)
def _find_address_by_system_id(self, target_system_id: int) -> Optional[bytes]:
for address_64, device in self.esp32_address_mapping.items():
if device.system_id == target_system_id:
return address_64
return None
# discovery 程序
async def _run_discovery(self) -> None:
if self.serial_writer is None:
self.pending_manual_discovery = False
return
self.operator_busy = True
self.is_discovery_phase = True
self.serial_writer(self.pack_discovery())
self.last_discovery_time = time.time()
await asyncio.sleep(self.max_discovery_window_ms / 1000.0)
self.operator_busy = False
self.is_discovery_phase = False
self.pending_manual_discovery = False
# poll 程序
async def _run_one_poll(self, target_address_64: bytes, grant_bytes: int) -> None:
if self.serial_writer is None:
return
# 這邊把要求封包最小量放在36 是讓載具端正好可以回傳一個有簽章的 HEARTBEAT 封包的大小
# 算是 "探測封包" 這樣讓系統可以更快速的知道載具端殘餘的資料量
grant_bytes = max(36, min(int(grant_bytes), 65535))
poll_frame = self.pack_poll(target_address_64, grant_bytes)
if poll_frame is None:
return
self._ensure_async_primitives()
self.operator_busy = True
self.current_poll_address_64 = target_address_64
self.poll_done_event.clear()
self.serial_writer(poll_frame)
timeout_seconds = pollStrategy.estimate_poll_timeout(
grant_bytes,
self.serial_baudrate,
self.guard_milliseconds,
)
try:
await asyncio.wait_for(
self.poll_done_event.wait(),
timeout=timeout_seconds,
)
except asyncio.TimeoutError:
logger.warning(
f"POLL timeout address_64={target_address_64.hex()} "
f"grant_bytes={grant_bytes}"
)
finally:
self.operator_busy = False
self.current_poll_address_64 = None
await asyncio.sleep(self.guard_milliseconds / 1000.0)
def stop_operator(self) -> None:
self.operator_running = False
self.gcs_transmit_queue.clear()
self.wake_operator()
return frame[rf_data_start:3 + length]
# ====================== Dongle Command Handler =====================
@ -887,6 +349,7 @@ class ATCommandHandler:
"""處理 SL (Serial Number Low)"""
pass
# ================ Serial UDP Socket Object ==============
class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發"""
@ -905,13 +368,13 @@ class SerialHandler(asyncio.Protocol):
if self.serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor()
elif self.serial_mode == SerialMode.XBEEAPI2AT:
if self.serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(self.serial_port_str)
return XBeeFrameProcessor_Base(at_handler=at_handler)
return XBeeFrameProcessor(at_handler=at_handler)
elif self.serial_mode == SerialMode.XBEEAPI_espv1:
at_handler = ATCommandHandler(self.serial_port_str)
return XBeeFrameProcessor_ESPv1(at_handler=at_handler)
# if self.serial_mode == SerialMode.XBEEAPI_POLL:
# at_handler = ATCommandHandler_new(self.serial_port_str)
# return XBeeFrameProcessor(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw")
return RawFrameProcessor()
@ -924,13 +387,6 @@ class SerialHandler(asyncio.Protocol):
if self.serial_mode == SerialMode.XBEEAPI2AT:
self.processor.at_handler.set_writer(self.transport.write)
elif self.serial_mode == SerialMode.XBEEAPI_espv1:
if isinstance(self.processor, XBeeFrameProcessor_ESPv1):
self.processor.set_writer(self.transport.write)
self.processor.set_event_loop(asyncio.get_running_loop())
if self.processor.at_handler is not None:
self.processor.at_handler.set_writer(self.transport.write)
if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self)
logger.debug(f"Serial port {self.serial_port_str} connected")
@ -974,14 +430,10 @@ class UDPHandler(asyncio.DatagramProtocol):
logger.warning("Serial handler not set, dropping UDP packet")
return
if self.serial_mode == SerialMode.XBEEAPI_espv1:
processor = self.serial_handler.processor
# if isinstance(processor, XBeeFrameProcessor_ESPv1): # 預設唯一綁定 少一個 if 多一點效率
processor.enqueue_gcs_transmit(data)
processor.wake_operator()
return
# 使用 processor 封裝數據
processed_data = self.serial_handler.processor.process_outgoing(data)
# 發送到串口
self.serial_handler.transport.write(processed_data)
@ -1002,7 +454,6 @@ class serial_manager:
self.protocol = None
self.udp_handler = None
self.serial_handler = None
self.operator_task = None
def __init__(self):
self.thread = None
@ -1143,16 +594,6 @@ class serial_manager:
logger.debug(f"Serial connection created for {serial_port}")
if serial_mode == SerialMode.XBEEAPI_espv1:
processor = serial_obj.serial_handler.processor
if isinstance(processor, XBeeFrameProcessor_ESPv1):
processor.set_serial_baudrate(baudrate)
processor.operator_running = True
serial_obj.operator_task = asyncio.create_task(
processor.operator_loop()
)
logger.debug(f"ESPv1 operator task started for {serial_port}")
# 將 serial_object 加入管理列表
serial_id = self.serial_count + 1
self.serial_objects[serial_id] = serial_obj
@ -1206,18 +647,6 @@ class serial_manager:
try:
serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode == SerialMode.XBEEAPI_espv1:
processor = serial_obj.serial_handler.processor
if isinstance(processor, XBeeFrameProcessor_ESPv1):
processor.stop_operator()
if serial_obj.operator_task is not None:
serial_obj.operator_task.cancel()
try:
await serial_obj.operator_task
except asyncio.CancelledError:
pass
serial_obj.operator_task = None
# 關閉 UDP transport
if hasattr(serial_obj, 'transport') and serial_obj.transport:
serial_obj.transport.close()
@ -1258,7 +687,6 @@ class serial_manager:
logger.error(f"Serial object {serial_id} not found")
return False
# TODO 這邊的防呆 應該可以不用 if 有空再改
serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode != SerialMode.XBEEAPI2AT:
logger.error(
@ -1271,18 +699,6 @@ class serial_manager:
self.loop.call_soon_threadsafe(at_handler.send_command, request)
return True
def get_espv1_processor(self, serial_id: int) -> Optional[XBeeFrameProcessor_ESPv1]:
"""依 serial_id 取得 ESPv1 processor 不存在或模式不符回傳 None。"""
if serial_id not in self.serial_objects:
return None
serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode != SerialMode.XBEEAPI_espv1:
return None
processor = serial_obj.serial_handler.processor
if not isinstance(processor, XBeeFrameProcessor_ESPv1):
return None
return processor
@staticmethod
def check_serial_port(serial_port, baudrate):
"""檢查串口是否存在與可用"""
@ -1320,100 +736,42 @@ if __name__ == '__main__':
sm = serial_manager()
sm.start()
# # 測試項一
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定
# SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14571
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
# # 測試項二
# SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
# SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14561
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# linked_serial = sm.get_serial_link()
# print(linked_serial)
# # 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試
# time.sleep(5)
# rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52)
# for i in range(60):
# sm.send_at_command(1, rssi_request)
# time.sleep(1)
# sm.remove_serial_link(1)
# time.sleep(3)
# sm.shutdown()
# # 測試項三
SERIAL_PORT = '/dev/ttyUSB0'
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI_espv1)
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
time.sleep(2) # 等 serial 連線與 operator 啟動
linked_serial = sm.get_serial_link()
print(linked_serial)
serial_id = 1
processor = sm.get_espv1_processor(serial_id)
if processor is not None:
processor.request_discovery()
processor.request_poll(target_system_id=1)
processor.request_poll(target_system_id=1, grant_bytes=200)
print(processor.get_status_snapshot())
print(processor.get_gcs_queue_byte_count())
# 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試
time.sleep(5)
rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52)
for i in range(60):
sm.send_at_command(1, rssi_request)
time.sleep(1)
sm.remove_serial_link(serial_id)
sm.remove_serial_link(1)
time.sleep(3)
sm.shutdown()
'''
================= 改版記錄 ============================
2026.4.20
1. XBeeFrameHandler 結構移除
2. XBeeFrameProcessor_Base 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler )
3. XBeeFrameProcessor_Base 新增 _try_extract_frame 處理被可能截斷的 UART 封包
4. XBeeFrameProcessor_Base 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler
2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler )
3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包
4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler
5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response
6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX
7. ATCommandHandler 新增各項 _handle_XXX (未實作)
2026.06.15
1. 修改 XBeeFrameProcessor_Base _decapsulate 使其解出發送端 dongle src64 地址
2. 新增 XBeeFrameProcessor_ESPv1 這個類別繼承 XBeeFrameProcessor_Base
3. 新增 XBeeFrameProcessor_ESPv1 組合 DISC POLL 訊息 處理 HELO DONE 的能力
4. DISC 自動化與手動
5. POLL 的自動化
2026.06.16
1. 移除 DongleCommandHandler_ESPv1
2. ESPv1 對外 APIrequest_discovery / request_poll移至 XBeeFrameProcessor_ESPv1thread-safe
3. serial_manager 僅保留 get_espv1_processor(serial_id) lookup
註解1 : FRAME_TYPE_TX_STATUS 的對應解說 (我不喜歡程式塞太多為了顯示而顯示的東西 錯誤碼自己下來看)
TX_DELIVERY_STATUS
0x00: "Success",
0x01: "No ACK received",
0x02: "CCA failure",
0x15: "Invalid destination endpoint",
0x21: "Network ACK failure",
0x22: "Not joined to network",
0x23: "Self-addressed",
0x24: "Address not found",
0x25: "Route not found",
0x26: "Broadcast source relay",
0x27: "Insufficient data",
0x28: "TX buffered",
0x32: "Invalid send flag",
0x74: "Resource error",
TX_DISCOVERY_STATUS
0x00: "No discovery overhead",
0x01: "Address discovery",
0x02: "Route discovery",
0x03: "Address and route discovery",
'''

@ -1,71 +0,0 @@
"""
POLL 輪詢策略 XBeeFrameProcessor_ESPv1 使用
不依賴 serialManager僅接受裝置快照與排程狀態
"""
from dataclasses import dataclass
@dataclass(frozen=True)
class PollDevice:
"""從 esp32AddrMapping 抽出的唯讀快照。"""
address_64: bytes
# system_id: int
remain_bytes: int
last_done_time: float
@dataclass
class PollSchedulerState:
"""每條 serial link 一份,保存 round-robin 索引。"""
round_robin_index: int = 0
def pick_next(
devices: list[PollDevice],
scheduler_state: PollSchedulerState,
# default_grant_bytes: int = 600,
) -> tuple[bytes | None, int]:
"""
選下一個 POLL 目標
回傳 (target_address_64, grant_bytes)devices 為空時回傳 (None, 0)
"""
if not devices:
return None, 0
device_count = len(devices)
selected_index = scheduler_state.round_robin_index % device_count
selected_device = devices[selected_index]
scheduler_state.round_robin_index += 1
grant_bytes = 0
if selected_device.remain_bytes > 0:
grant_bytes = min( 65535, max(0, selected_device.remain_bytes))
return selected_device.address_64, grant_bytes
def estimate_poll_timeout(
grant_bytes: int,
baudrate: int,
guard_milliseconds: int = 20,
) -> float:
"""
估算 POLL 後等待 DONE 的超時秒數
移植自 udptest8 estimate_tdma_timeout
"""
grant_bytes = max(0, int(grant_bytes))
max_payload_per_chunk = 100
chunk_count = max(1, (grant_bytes + max_payload_per_chunk - 1) // max_payload_per_chunk)
uart_time = (grant_bytes + chunk_count * 18 + 32) * 10.0 / baudrate
timeout = (
uart_time
+ chunk_count * 0.012
+ (guard_milliseconds / 1000.0)
+ 0.35
)
return timeout

@ -1,544 +0,0 @@
#!/usr/bin/env python3
"""Standalone curses TUI for fc_network vehicle diagnostics (sys_diags, status_text, summary)."""
from __future__ import annotations
import curses
import json
import re
import sys
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from fc_interfaces.msg import SystemDiagnosticsRaw
VEHICLE_TOPIC_RE = re.compile(
r'^/fc_network/vehicle/(sys\d+)/(sys_diags|status_text|summary)$'
)
STATUS_TEXT_RE = re.compile(r'^\[([\d.]+)\]\s*\[(-?\d+)\]\s*(.*)$')
STATUS_TEXT_TTL_SEC = 30.0
STALE_SEC = 3.0
RESCAN_SEC = 2.0
OFFLINE_SEC = 10.0
DRAW_HZ = 5.0
# MAV_SYS_STATUS_SENSOR bit labels (MAVLink common)
SENSOR_BITS: Tuple[Tuple[int, str], ...] = (
(268435456, 'PreArm'),
(1, 'Gyro'),
(2, 'Accel'),
(4, 'Mag'),
(8, 'Baro'),
(16, 'DiffPress'),
(32, 'GPS'),
(64, 'OptFlow'),
(128, 'Vision'),
(256, 'Laser'),
(512, 'ExtGT'),
(1024, 'AngRate'),
(2048, 'AttStab'),
(4096, 'YawPos'),
(8192, 'AltCtrl'),
(16384, 'XYCtrl'),
(32768, 'Motor'),
(65536, 'RC'),
(131072, 'Gyro2'),
(262144, 'Accel2'),
(524288, 'Mag2'),
(1048576, 'Geofence'),
(2097152, 'AHRS'),
(4194304, 'Terrain'),
(8388608, 'RevMotor'),
(16777216, 'Log'),
(33554432, 'Batt'),
(67108864, 'Prox'),
(134217728, 'Satcom'),
(536870912, 'ObsAvoid'),
(1073741824, 'Propulsion'),
)
MAV_STATE_LABELS = {
0: 'UNINIT',
1: 'BOOT',
2: 'CAL',
3: 'STANDBY',
4: 'ACTIVE',
5: 'CRITICAL',
6: 'EMERGENCY',
7: 'OFF',
8: 'TERM',
}
SEVERITY_LABELS = {
0: 'EMERG',
1: 'ALERT',
2: 'CRIT',
3: 'ERROR',
4: 'WARN',
5: 'NOTICE',
6: 'INFO',
7: 'DEBUG',
}
@dataclass
class StatusTextEntry:
received_at: float
severity: int
text: str
raw: str
sys_key: str
@dataclass
class VehicleDiagState:
sys_key: str
sysid: int
summary: Optional[dict] = None
diag: Optional[SystemDiagnosticsRaw] = None
status_entries: List[StatusTextEntry] = field(default_factory=list)
last_summary_ts: float = 0.0
last_diag_ts: float = 0.0
last_topic_seen_ts: float = 0.0
offline: bool = False
def sys_key_to_id(sys_key: str) -> int:
return int(sys_key.replace('sys', '', 1))
def purge_status_entries(entries: List[StatusTextEntry], now: float, ttl: float = STATUS_TEXT_TTL_SEC) -> None:
cutoff = now - ttl
entries[:] = [e for e in entries if e.received_at >= cutoff]
def parse_status_text(raw: str, sys_key: str, received_at: float) -> StatusTextEntry:
match = STATUS_TEXT_RE.match(raw.strip())
if match:
severity = int(match.group(2))
text = match.group(3)
else:
severity = -1
text = raw
return StatusTextEntry(
received_at=received_at,
severity=severity,
text=text,
raw=raw,
sys_key=sys_key,
)
def decode_sensors(install: int, enabled: int, health: int) -> List[Tuple[str, str]]:
"""Return list of (label, status) where status is OK / FAIL / OFF."""
items: List[Tuple[str, str]] = []
for bit, label in SENSOR_BITS:
if not (install & bit):
continue
if not (enabled & bit):
items.append((label, 'OFF'))
elif health & bit:
items.append((label, 'OK'))
else:
items.append((label, 'FAIL'))
return items
def format_age(seconds: float) -> str:
if seconds < 0:
return 'never'
if seconds < 1.0:
return f'{seconds * 1000:.0f}ms'
if seconds < 60.0:
return f'{seconds:.1f}s'
return f'{seconds / 60:.1f}m'
def truncate(text: str, width: int) -> str:
if width <= 0:
return ''
if len(text) <= width:
return text
if width <= 3:
return text[:width]
return text[: width - 3] + '...'
class VehicleDiagNode(Node):
"""ROS2 node: discover vehicles and subscribe to diagnostic topics."""
def __init__(self) -> None:
super().__init__('vehicle_diag_show')
self._lock = threading.Lock()
self._vehicles: Dict[str, VehicleDiagState] = {}
self._subs: Dict[str, Dict[str, object]] = {}
self._scan_timer = self.create_timer(RESCAN_SEC, self._rescan_topics)
self._rescan_topics()
def snapshot(self) -> Dict[str, VehicleDiagState]:
now = time.monotonic()
with self._lock:
for state in self._vehicles.values():
purge_status_entries(state.status_entries, now)
if state.last_topic_seen_ts > 0:
state.offline = (now - state.last_topic_seen_ts) > OFFLINE_SEC
return {k: state for k, state in self._vehicles.items()}
def _get_or_create_state(self, sys_key: str) -> VehicleDiagState:
if sys_key not in self._vehicles:
self._vehicles[sys_key] = VehicleDiagState(
sys_key=sys_key,
sysid=sys_key_to_id(sys_key),
)
return self._vehicles[sys_key]
def _rescan_topics(self) -> None:
now = time.monotonic()
found: set = set()
for topic_name, _ in self.get_topic_names_and_types():
match = VEHICLE_TOPIC_RE.match(topic_name)
if not match:
continue
sys_key = match.group(1)
found.add(sys_key)
with self._lock:
state = self._get_or_create_state(sys_key)
state.last_topic_seen_ts = now
state.offline = False
self._ensure_subscriptions(sys_key)
with self._lock:
for sys_key, state in self._vehicles.items():
if sys_key not in found and state.last_topic_seen_ts > 0:
state.offline = (now - state.last_topic_seen_ts) > OFFLINE_SEC
def _ensure_subscriptions(self, sys_key: str) -> None:
if sys_key in self._subs:
return
base = f'/fc_network/vehicle/{sys_key}'
subs = {
'summary': self.create_subscription(
String,
f'{base}/summary',
lambda msg, sk=sys_key: self._on_summary(sk, msg),
10,
),
'sys_diags': self.create_subscription(
SystemDiagnosticsRaw,
f'{base}/sys_diags',
lambda msg, sk=sys_key: self._on_sys_diags(sk, msg),
10,
),
'status_text': self.create_subscription(
String,
f'{base}/status_text',
lambda msg, sk=sys_key: self._on_status_text(sk, msg),
10,
),
}
with self._lock:
self._subs[sys_key] = subs
def _on_summary(self, sys_key: str, msg: String) -> None:
now = time.monotonic()
try:
summary = json.loads(msg.data)
except json.JSONDecodeError:
summary = {'raw': msg.data}
with self._lock:
state = self._get_or_create_state(sys_key)
state.summary = summary
state.last_summary_ts = now
state.last_topic_seen_ts = now
def _on_sys_diags(self, sys_key: str, msg: SystemDiagnosticsRaw) -> None:
now = time.monotonic()
with self._lock:
state = self._get_or_create_state(sys_key)
state.diag = msg
state.last_diag_ts = now
state.last_topic_seen_ts = now
def _on_status_text(self, sys_key: str, msg: String) -> None:
now = time.monotonic()
entry = parse_status_text(msg.data, sys_key, now)
with self._lock:
state = self._get_or_create_state(sys_key)
state.status_entries.append(entry)
purge_status_entries(state.status_entries, now)
state.last_topic_seen_ts = now
class DiagCursesApp:
"""Curses front-end for vehicle diagnostics."""
def __init__(self, node: VehicleDiagNode) -> None:
self._node = node
self._running = True
def run(self) -> None:
curses.wrapper(self._main)
def _init_colors(self) -> None:
curses.start_color()
curses.use_default_colors()
curses.init_pair(1, curses.COLOR_GREEN, -1) # OK / DISARMED / INFO
curses.init_pair(2, curses.COLOR_RED, -1) # FAIL / ARMED / ERROR+
curses.init_pair(3, curses.COLOR_YELLOW, -1) # WARN / OFF
curses.init_pair(4, curses.COLOR_CYAN, -1) # header / INFO
curses.init_pair(5, curses.COLOR_WHITE, -1) # dim / offline
curses.init_pair(6, curses.COLOR_MAGENTA, -1) # NOTICE
def _main(self, stdscr) -> None:
curses.curs_set(0)
stdscr.nodelay(True)
stdscr.keypad(True)
self._init_colors()
while self._running and rclpy.ok():
try:
self._draw(stdscr)
except curses.error:
pass
key = stdscr.getch()
if key in (ord('q'), ord('Q'), 27):
self._running = False
time.sleep(1.0 / DRAW_HZ)
def _draw(self, stdscr) -> None:
stdscr.erase()
height, width = stdscr.getmaxyx()
now = time.monotonic()
vehicles = self._node.snapshot()
sorted_keys = sorted(vehicles.keys(), key=sys_key_to_id)
header = f' Vehicle Diagnostics Monitor vehicles:{len(sorted_keys)} Q:quit '
stdscr.attron(curses.color_pair(4) | curses.A_BOLD)
stdscr.addstr(0, 0, truncate(header.ljust(width - 1), width - 1))
stdscr.attroff(curses.color_pair(4) | curses.A_BOLD)
row = 1
if not sorted_keys:
msg = ' Waiting for vehicles... (need /fc_network/vehicle/sysN/* topics) '
if row < height - 1:
stdscr.addstr(row, max(0, (width - len(msg)) // 2), truncate(msg, width - 1))
row += 2
vehicle_rows_end = max(row, height - 8)
for sys_key in sorted_keys:
if row >= vehicle_rows_end:
break
state = vehicles[sys_key]
row = self._draw_vehicle_block(stdscr, row, width, state, now)
if row < height - 1:
try:
stdscr.addstr(row, 0, truncate('-' * (width - 1), width - 1))
except curses.error:
pass
row += 1
log_title = ' Status Text (last 30s) '
if row < height - 1:
stdscr.attron(curses.color_pair(4))
stdscr.addstr(row, 0, truncate(log_title.ljust(width - 1), width - 1))
stdscr.attroff(curses.color_pair(4))
row += 1
all_entries: List[StatusTextEntry] = []
for sys_key in sorted_keys:
all_entries.extend(vehicles[sys_key].status_entries)
all_entries.sort(key=lambda e: e.received_at)
log_rows = height - row - 1
if log_rows <= 0:
stdscr.refresh()
return
if not all_entries:
empty = ' (no messages in last 30s) '
if row < height - 1:
stdscr.addstr(row, 0, truncate(empty, width - 1))
else:
visible = all_entries[-log_rows:]
for entry in visible:
if row >= height - 1:
break
self._draw_status_line(stdscr, row, width, entry)
row += 1
stdscr.refresh()
def _draw_vehicle_block(
self,
stdscr,
row: int,
width: int,
state: VehicleDiagState,
now: float,
) -> int:
summary = state.summary or {}
mode = summary.get('mode_name', 'UNKNOWN')
armed = summary.get('armed', False)
conn = summary.get('connection_type', '?')
socket_id = summary.get('socket_id', -1)
mav_status = summary.get('mav_status', 0)
mav_label = MAV_STATE_LABELS.get(int(mav_status), str(mav_status))
last_update = max(state.last_summary_ts, state.last_diag_ts)
age_sec = (now - last_update) if last_update > 0 else -1.0
stale = age_sec >= 0 and age_sec > STALE_SEC
armed_str = 'ARMED' if armed else 'DISARMED'
flags = []
if state.offline:
flags.append('OFFLINE')
if stale and not state.offline:
flags.append('STALE')
flag_str = f" [{' '.join(flags)}]" if flags else ''
line1 = (
f' {state.sys_key} (MAV {state.sysid}) {mode} {armed_str} '
f'{conn} socket:{socket_id} {mav_label} '
f'updated {format_age(age_sec)}{flag_str}'
)
if row < stdscr.getmaxyx()[0] - 1:
if state.offline:
stdscr.attron(curses.color_pair(5))
stdscr.addstr(row, 0, truncate(line1, width - 1))
stdscr.attroff(curses.color_pair(5))
else:
stdscr.addstr(row, 0, truncate(line1, width - 1))
armed_pos = line1.find(armed_str)
if armed_pos >= 0:
pair = curses.color_pair(2) if armed else curses.color_pair(1)
stdscr.attron(pair | curses.A_BOLD)
stdscr.addstr(row, armed_pos, armed_str)
stdscr.attroff(pair | curses.A_BOLD)
row += 1
if state.diag is not None and row < stdscr.getmaxyx()[0] - 1:
load_pct = state.diag.mcu_load / 10.0
drop_pct = state.diag.bus_error_rate / 10.0
line2 = f' MCU {load_pct:.1f}% CommDrop {drop_pct:.1f}%'
stdscr.addstr(row, 0, truncate(line2, width - 1))
row += 1
sensors = decode_sensors(
state.diag.sensors_install_mask,
state.diag.sensors_enabled_mask,
state.diag.sensors_health_mask,
)
row = self._draw_sensor_line(stdscr, row, width, sensors)
elif row < stdscr.getmaxyx()[0] - 1:
stdscr.addstr(row, 0, truncate(' (no sys_diags yet)', width - 1))
row += 1
return row
def _draw_sensor_line(
self,
stdscr,
row: int,
width: int,
sensors: List[Tuple[str, str]],
) -> int:
max_row, _ = stdscr.getmaxyx()
if not sensors:
if row < max_row - 1:
stdscr.addstr(row, 0, truncate(' Sensors: (none installed)', width - 1))
return row + 1
lines: List[List[Tuple[str, str]]] = [[]]
line_len = len(' Sensors: ')
for label, status in sensors:
token = f'{label}:{status} '
if lines[-1] and line_len + len(token) >= width - 1:
lines.append([])
line_len = 2
lines[-1].append((label, status))
line_len += len(token)
for idx, line_items in enumerate(lines):
if row >= max_row - 1:
break
prefix = ' Sensors: ' if idx == 0 else ' '
col = 0
stdscr.addstr(row, col, prefix)
col += len(prefix)
for label, status in line_items:
token = f'{label}:{status} '
if col + len(token) >= width - 1:
stdscr.addstr(row, col, truncate('...', width - col - 1))
return row + 1
pair = curses.A_NORMAL
if status == 'OK':
pair = curses.color_pair(1)
elif status == 'FAIL':
pair = curses.color_pair(2) | curses.A_BOLD
elif status == 'OFF':
pair = curses.color_pair(3)
stdscr.attron(pair)
stdscr.addstr(row, col, token)
stdscr.attroff(pair)
col += len(token)
row += 1
return row
def _draw_status_line(
self,
stdscr,
row: int,
width: int,
entry: StatusTextEntry,
) -> None:
sev_label = SEVERITY_LABELS.get(entry.severity, str(entry.severity))
prefix = f'[{entry.sys_key:5}] [{entry.severity} {sev_label:5}] '
body = truncate(entry.text, max(0, width - len(prefix) - 2))
line = prefix + body
pair = curses.A_NORMAL
if entry.severity <= 3:
pair = curses.color_pair(2)
elif entry.severity == 4:
pair = curses.color_pair(3)
elif entry.severity == 5:
pair = curses.color_pair(6)
elif entry.severity == 6:
pair = curses.color_pair(1)
elif entry.severity == 7:
pair = curses.color_pair(5)
stdscr.attron(pair)
stdscr.addstr(row, 0, truncate(line, width - 1))
stdscr.attroff(pair)
def main() -> int:
if not sys.stdout.isatty():
print('vehicleDiagShow requires an interactive terminal (TTY).', file=sys.stderr)
return 1
rclpy.init()
node = VehicleDiagNode()
spin_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True)
spin_thread.start()
try:
DiagCursesApp(node).run()
finally:
node.destroy_node()
rclpy.shutdown()
spin_thread.join(timeout=2.0)
return 0
if __name__ == '__main__':
sys.exit(main())

@ -2,72 +2,17 @@ import socket
import base64
import threading
import time
from typing import Optional, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, HistoryPolicy, ReliabilityPolicy, DurabilityPolicy
from mavros_msgs.msg import RTCM
# TODO: ROS_DOMAIN_ID 要補一下
class GGA_stream():
@classmethod
def nmea_checksum(cls, body: str) -> str:
"""body 不含 '$''*checksum',例如 'GPGGA,123519,...'"""
value = 0
for ch in body:
value ^= ord(ch)
return f"{value:02X}"
@classmethod
def decimal_to_nmea_dm(cls, deg: float, *, is_latitude: bool) -> Tuple[str, str]:
"""十進位度 → NMEA 的 (d)dmm.mmmm 與半球字元。"""
if is_latitude:
hemi = "N" if deg >= 0 else "S"
deg_width = 2
else:
hemi = "E" if deg >= 0 else "W"
deg_width = 3
deg = abs(deg)
d = int(deg)
m = (deg - d) * 60.0
return f"{d:0{deg_width}d}{m:07.4f}", hemi
@classmethod
def build_gga_sentence(cls, lat_deg: float, lon_deg: float, alt_m: float = 100.0) -> bytes:
"""
組一筆 $GPGGA 句子 checksum) 回傳 bytes 可直接 sock.sendall
lat_deg / lon_deg : 十進位經緯度(東為正)
測試時請改成 mount 服務範圍內的近似位置正式使用應來自 GNSS 真實定位
"""
utc = time.gmtime()
t_str = f"{utc.tm_hour:02d}{utc.tm_min:02d}{utc.tm_sec:02d}.00"
lat_dm, ns = cls.decimal_to_nmea_dm(lat_deg, is_latitude=True)
lon_dm, ew = cls.decimal_to_nmea_dm(lon_deg, is_latitude=False)
# quality=1GPS fix、8 顆星、HDOP=1.0 僅供測試示意
body = (
f"GPGGA,{t_str},{lat_dm},{ns},{lon_dm},{ew},"
f"1,08,1.0,{alt_m:.1f},M,0.0,M,,"
)
sentence = f"${body}*{cls.nmea_checksum(body)}\r\n"
return sentence.encode("ascii")
@classmethod
def send_gga(cls, sock: socket.socket, lat_deg: float, lon_deg: float, alt_m: float = 100.0) -> str:
"""送出 GGA回傳可讀句子供列印。"""
payload = cls.build_gga_sentence(lat_deg, lon_deg, alt_m)
sock.sendall(payload)
return payload.decode("ascii").strip()
class NtripClientNode(Node):
"""連線 NTRIP caster, 接收 RTCM3 資料流並發布至 ROS2 topic """
"""連線 NTRIP caster接收 RTCM3 資料流並發布至 ROS2 topic。"""
RECONNECT_BASE_SEC = 10.0
RECONNECT_BASE_SEC = 2.0
RECONNECT_MAX_SEC = 60.0
def __init__(self):
@ -76,15 +21,9 @@ class NtripClientNode(Node):
self.declare_parameter('host', 'rtk2go.com')
self.declare_parameter('port', 2101)
self.declare_parameter('mountpoint', '')
self.declare_parameter('username', 'template@mail.com')
self.declare_parameter('username', '')
self.declare_parameter('password', '')
self.declare_parameter('gga_on', False)
self.declare_parameter('gga_lat', None)
self.declare_parameter('gga_lon', None)
self.declare_parameter('gga_alt_m', 100)
self.declare_parameter('gga_interval_sec', 60)
rtcm_qos = QoSProfile(
history=HistoryPolicy.KEEP_LAST,
depth=1,
@ -93,9 +32,7 @@ class NtripClientNode(Node):
)
self.publisher_ = self.create_publisher(RTCM, '/fc_network/rtcm_input', rtcm_qos)
self._stop_event = threading.Event()
self._sock_lock = threading.Lock()
self._active_sock: Optional[socket.socket] = None
self._running = True
self._thread = threading.Thread(target=self._receive_loop, daemon=True)
self._thread.start()
@ -103,50 +40,31 @@ class NtripClientNode(Node):
# ── NTRIP 連線 + RTCM 接收迴圈 ──────────────────────────────
def _request_stop(self):
self._stop_event.set()
with self._sock_lock:
if self._active_sock is not None:
try:
self._active_sock.close()
except OSError:
pass
def _receive_loop(self):
backoff = self.RECONNECT_BASE_SEC
while self._running:
host = self.get_parameter('host').value
port = self.get_parameter('port').value
mountpoint = self.get_parameter('mountpoint').value
mount = self.get_parameter('mountpoint').value
user = self.get_parameter('username').value
pwd = self.get_parameter('password').value
if not mountpoint:
self.get_logger().error('mountpoint 參數未設定...')
return
while not self._stop_event.is_set():
if not mount:
self.get_logger().error('mountpoint 參數未設定,等待重試...')
time.sleep(backoff)
continue
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
with self._sock_lock:
self._active_sock = sock
self.gga_lat = self.get_parameter('gga_lat').value
self.gga_lon = self.get_parameter('gga_lon').value
self.gga_alt_m = self.get_parameter('gga_alt_m').value
self.gga_interval_sec = self.get_parameter('gga_interval_sec').value
self.gga_on = self.get_parameter('gga_on').value
self.gga_on = self.gga_lat is not None and self.gga_lon is not None and self.gga_on
try:
self.get_logger().info(f'正在連線 {host}:{port}/{mountpoint} ...')
self.get_logger().info(f'正在連線 {host}:{port}/{mount} ...')
sock.connect((host, port))
auth = base64.b64encode(f'{user}:{pwd}'.encode()).decode()
request = (
f'GET /{mountpoint} HTTP/1.0\r\n'
f'GET /{mount} HTTP/1.0\r\n'
f'User-Agent: NTRIP PythonClient\r\n'
f'Authorization: Basic {auth}\r\n'
f'Connection: close\r\n\r\n'
@ -160,54 +78,29 @@ class NtripClientNode(Node):
)
raise ConnectionError('handshake failed')
self.get_logger().info(f'已成功連接至 {mountpoint}')
self.get_logger().info(f'已連接至 {mount},開始接收 RTCM 資料流')
backoff = self.RECONNECT_BASE_SEC
self._read_stream(sock)
except Exception as e:
if self._stop_event.is_set():
break
if self._running:
self.get_logger().warn(f'連線中斷: {e}{backoff:.0f}s 後重連')
if self._stop_event.wait(timeout=backoff):
break
time.sleep(backoff)
backoff = min(backoff * 2, self.RECONNECT_MAX_SEC)
finally:
with self._sock_lock:
if self._active_sock is sock:
self._active_sock = None
try:
sock.close()
except OSError:
pass
def _read_stream(self, sock: socket.socket):
buf = b''
last_gga_time = 0.0
while not self._stop_event.is_set():
now = time.time()
if self.gga_on and (now - last_gga_time >= self.gga_interval_sec):
line = GGA_stream.send_gga(sock, self.gga_lat, self.gga_lon, self.gga_alt_m)
self.get_logger().info(f"[GGA] {line}")
last_gga_time = now
while self._running:
try:
chunk = sock.recv(4096)
except socket.timeout:
if self._stop_event.is_set():
break
continue
except OSError:
if self._stop_event.is_set():
break
raise
if not chunk:
if self._stop_event.is_set():
break
raise ConnectionError("stream ended. Empty Chunk")
buf += chunk
while len(buf) >= 6:
@ -233,11 +126,8 @@ class NtripClientNode(Node):
self.publisher_.publish(msg)
def destroy_node(self):
self._request_stop()
if self._thread.is_alive():
self._thread.join(timeout=5.0)
if self._thread.is_alive():
self.get_logger().warn('receive thread 未在時限內結束')
self._running = False
self._thread.join(timeout=3.0)
super().destroy_node()

@ -8,8 +8,7 @@
RTK
跟 RTK2go 抓取列表 Done
從特定 mount point 得到數據 Done
做一個 ros2 service 接到數據並包裝 Done
RTK 實機測試
做一個 ros2 service 接到數據並包裝
下一步
@ -35,11 +34,7 @@ python -m fc_network_module.fc_network_module.ntrip_client --ros-args -p mountpo
python -m fc_network_module.fc_network_module.ntrip_client --ros-args -p host:=210.241.63.193 -p port:=81 -p mountpoint:=2020_GNSS -p username:=uavlab6061 -p password:=iamsupersmart
python3 -m fc_network_module.fc_network_module.ntrip_client --ros-args -p host:=210.241.63.193 -p port:=81 -p mountpoint:=2020_GNSS -p username:=uavlab6061 -p password:=iamsupersmart -p gga_on:=True -p gga_lat:=24.155792000 -p gga_lon:=120.630679000 -p gga_interval_sec:=60
watch GPS_RAW_INT
export ROS_DOMAIN_ID=1
export ROS_DOMAIN_ID=13
ros2 daemon stop
ros2 daemon start

Loading…
Cancel
Save