Compare commits

...

5 Commits

@ -1,7 +1,5 @@
這是天雷系統的專案 這是天雷系統的專案
=== ===
## 功能簡介 ## 功能簡介
1. mavlink 多對多支援平台 1. mavlink 多對多支援平台

@ -29,7 +29,7 @@ from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port from .utils.acquirePort import find_available_port
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
PROJECT_VER = "v0.61" PROJECT_VER = "v0.70"
class PanelState: class PanelState:
def __init__(self): def __init__(self):
@ -45,7 +45,7 @@ class PanelState:
# 關於創建通道時的暫存資訊 # 關於創建通道時的暫存資訊
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊 self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": False} # 暫存 Serial 設定資訊 self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": True} # 暫存 Serial 設定資訊
# 關於顯示通道資訊 # 關於顯示通道資訊
self.socket_info_single = { self.socket_info_single = {
@ -426,7 +426,7 @@ class ControlPanel:
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
elif selected.action == "SET_SERIAL_COMM_TELEMETRY": elif selected.action == "SET_SERIAL_COMM_TELEMETRY":
state.serial_info_temp["CommunicationType"] = "XBee(AT-AT)" state.serial_info_temp["CommunicationType"] = "TELEMETRY"
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
@ -767,7 +767,7 @@ class ControlPanel:
port_menu = MenuNode(f"{port}", children=[ port_menu = MenuNode(f"{port}", children=[
MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[ MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[
MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"), MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"),
# MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"), MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
]), ]),
MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"), MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"),
MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[ MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[
@ -1058,11 +1058,15 @@ class ControlPanel:
# MAVLink 訊息名稱對應(縮寫版本) # MAVLink 訊息名稱對應(縮寫版本)
MSG_NAMES = { MSG_NAMES = {
0: "HB", 1: "SYS_ST", 24: "GPS_RAW", 27: "RAW_IMU", 0: "HB", 1: "S_STAT", 2: "S_TIME", 24: "GPS_RAW", 27: "RAW_IMU",
30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL", 30: "ATT", 32: "LOC_POS", 33: "GLB_POS", 62: "NAV_CTL",
74: "VFR_HUD", 147: "BATT_ST" 74: "VFR_HUD", 147: "BATT_ST", 136: "TERRAIN", 241: "VIBRAT",
125: "POW_STA",
} }
# ardupilot mega
# 178: "AHRS2", 163: "AHRS", 193: "EKF_STAT",
# 顯示前 12 個最常見的訊息類型(兩列各 6 個) # 顯示前 12 個最常見的訊息類型(兩列各 6 個)
msg_items = list(msg_counts.items())[:12] msg_items = list(msg_counts.items())[:12]
line = 13 line = 13
@ -1134,6 +1138,7 @@ class Orchestrator:
self.bridge = mo.mavlink_bridge() self.bridge = mo.mavlink_bridge()
# === 3) serial_manager 部分的準備 === # === 3) serial_manager 部分的準備 ===
sm.rx_module_ack.clear()
self.plumber = sm.serial_manager() self.plumber = sm.serial_manager()
# === 4) ros 部分的準備 === # === 4) ros 部分的準備 ===
@ -1243,7 +1248,7 @@ class Orchestrator:
if serial_obj: if serial_obj:
self.panelState.serial_info_single["serial_port"] = serial_obj.serial_port self.panelState.serial_info_single["serial_port"] = serial_obj.serial_port
self.panelState.serial_info_single["baudrate"] = serial_obj.baudrate self.panelState.serial_info_single["baudrate"] = serial_obj.baudrate
self.panelState.serial_info_single["receiver_type"] = serial_obj.receiver_type.name self.panelState.serial_info_single["receiver_type"] = serial_obj.serial_mode.name
self.panelState.serial_info_single["target_port"] = serial_obj.target_port self.panelState.serial_info_single["target_port"] = serial_obj.target_port
self.panelState.serial_info_single["InfoReady"] = True # 標記資訊已準備好 self.panelState.serial_info_single["InfoReady"] = True # 標記資訊已準備好
elif action == "REMOVE_LINKED_SERIAL": elif action == "REMOVE_LINKED_SERIAL":
@ -1516,8 +1521,8 @@ class Orchestrator:
# 定義通訊類型映射表 # 定義通訊類型映射表
COMM_TYPE_MAP = { COMM_TYPE_MAP = {
"XBee(API-AT)": sm.SerialReceiverType.XBEEAPI2AT, "XBee(API-AT)": sm.SerialMode.XBEEAPI2AT,
# "XBee(AT-AT)": sm.SerialReceiverType.TELEMETRY, # TODO: 之後再弄 "TELEMETRY": sm.SerialMode.STRAIGHT,
# 新增區 # 新增區
} }
@ -1541,7 +1546,7 @@ class Orchestrator:
serial_port=self.panelState.serial_info_temp['Port'], serial_port=self.panelState.serial_info_temp['Port'],
baudrate=self.panelState.serial_info_temp['Baud'], baudrate=self.panelState.serial_info_temp['Baud'],
target_port=udp_port_tmp, target_port=udp_port_tmp,
receiver_type=comm_type_tmp, serial_mode=comm_type_tmp,
) )
if not ret: if not ret:
@ -1572,7 +1577,7 @@ def main():
if mvv.MODULE_VER != "1.00": if mvv.MODULE_VER != "1.00":
print("Module Version Error! : mavlinkVehicleView") print("Module Version Error! : mavlinkVehicleView")
version_check = False version_check = False
if sm.MODULE_VER != "0.60": if sm.MODULE_VER != "0.80":
print("Module Version Error! : serialManager") print("Module Version Error! : serialManager")
version_check = False version_check = False
if version_check == False: if version_check == False:

@ -230,6 +230,7 @@ class mavlink_bridge:
'vx': msg.vx, 'vy': msg.vy, 'vz': msg.vz 'vx': msg.vx, 'vy': msg.vy, 'vz': msg.vz
} }
def _handle_global_position(self, vehicle, component, msg, timestamp): def _handle_global_position(self, vehicle, component, msg, timestamp):
"""處理 GLOBAL_POSITION_INT 訊息 (msg_id: 33)""" """處理 GLOBAL_POSITION_INT 訊息 (msg_id: 33)"""
component.status.position.latitude = msg.lat / 1e7 # 轉換為度 component.status.position.latitude = msg.lat / 1e7 # 轉換為度

@ -210,7 +210,9 @@ class VehicleStatusPublisher(Node):
if pos.latitude is None or pos.longitude is None: if pos.latitude is None or pos.longitude is None:
return return
publisher = self._get_or_create_publisher(sysid, 'position', sensor_msgs.msg.NavSatFix) publisher = self._get_or_create_publisher(
sysid, 'position', sensor_msgs.msg.NavSatFix
)
# 檢查是否有訂閱者 # 檢查是否有訂閱者
if publisher.get_subscription_count() == 0: if publisher.get_subscription_count() == 0:
@ -243,7 +245,7 @@ class VehicleStatusPublisher(Node):
publisher = self._get_or_create_publisher( publisher = self._get_or_create_publisher(
sysid, 'position_ned', nav_msgs.msg.Odometry sysid, 'position_ned', nav_msgs.msg.Odometry
) )
if publisher.get_subscription_count() == 0: if publisher.get_subscription_count() == 0:
return return

@ -16,19 +16,22 @@ import threading
import struct import struct
from enum import Enum, auto from enum import Enum, auto
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass
# # XBee 模組 # # XBee 模組
# from xbee.frame import APIFrame # from xbee.frame import APIFrame
# 自定義的 import # 自定義的 import
from .utils import setup_logger from .utils import RingBuffer, setup_logger
# ====================== 分割線 ===================== # ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.60" MODULE_VER = "0.80"
# ====================== 分割線 ===================== rx_module_ack = RingBuffer(capacity=64, buffer_id=253)
# ====================== State DEFINITION =====================
# 定義 serial 連線的模式 # 定義 serial 連線的模式
class SerialMode(Enum): class SerialMode(Enum):
@ -38,7 +41,29 @@ class SerialMode(Enum):
NOT_USE = auto() # 不使用 NOT_USE = auto() # 不使用
# ====================== Frame Processor 基類與實現 ===================== # ====================== AT Frame Data Classes =====================
@dataclass
class ATRequest:
"""送往 dongle 的 AT 指令"""
command: bytes # 例如 b'DB'
parameter: bytes # 寫入用指令的參數,讀取型通常為空
frame_id: int # XBee frame id0x00 表示不要求回應
@dataclass
class ATResponse:
"""dongle 回傳的 AT Response (0x88)"""
frame_id: int
command: bytes
status: int
data: bytes
@property
def is_ok(self) -> bool:
return self.status == 0x00
# ====================== Frame Processor Base and Implementation =====================
class FrameProcessor(ABC): class FrameProcessor(ABC):
"""協議處理器基類""" """協議處理器基類"""
@ -47,10 +72,10 @@ class FrameProcessor(ABC):
self.buffer = bytearray() self.buffer = bytearray()
@abstractmethod @abstractmethod
def process_incoming(self, data: bytes): def process_incoming(self, data: bytes) -> bytes:
""" """
處理接收到的數據 處理接收到的數據
返回已完整解析的 payload 列表 返回已完整解析的 payload 列表 後續要丟到 UDP
""" """
pass pass
@ -66,7 +91,7 @@ class FrameProcessor(ABC):
class RawFrameProcessor(FrameProcessor): class RawFrameProcessor(FrameProcessor):
"""原始數據直通處理器""" """原始數據直通處理器"""
def process_incoming(self, data: bytes): def process_incoming(self, data: bytes) -> bytes:
"""直接返回原始數據,不進行緩衝""" """直接返回原始數據,不進行緩衝"""
return [data] if data else [] return [data] if data else []
@ -76,110 +101,108 @@ class RawFrameProcessor(FrameProcessor):
class XBeeFrameProcessor(FrameProcessor): class XBeeFrameProcessor(FrameProcessor):
"""XBee API 協議處理器""" """
XBee API 協議處理器
def process_incoming(self, data: bytes):
職責
- XBee API frame 的拆幀 / 組幀
- 0x90 (RX Packet) -> 解出 payload 回傳
- 0x88 (AT Response) -> 轉交 at_handler 處理若有注入
- 0x8B (TX Status) -> 目前忽略
- 其他 frame type -> warning 忽略
若未來要做變化型 XBee例如 API2 escape mode不同 addressing
繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可
"""
# XBee API frame type
FRAME_TYPE_TX_REQUEST = 0x10
FRAME_TYPE_AT_RESPONSE = 0x88
FRAME_TYPE_TX_STATUS = 0x8B
FRAME_TYPE_RX_PACKET = 0x90
def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__()
self.at_handler = at_handler
# ---- 對外契約 ----
def process_incoming(self, data: bytes) -> bytes:
"""處理 XBee API 幀並提取 payload""" """處理 XBee API 幀並提取 payload"""
self.buffer.extend(data) self.buffer.extend(data)
payloads = [] payloads = []
while True:
frame = self._try_extract_frame()
if frame is None:
break
payload = self._dispatch_frame(frame)
if payload:
payloads.append(payload)
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return self._encapsulate(data)
# ---- 內部:拆幀與分派 ----
def _try_extract_frame(self) -> bytes:
"""
buffer 嘗試抓一個完整 frame
- 對齊幀頭 0x7E
- 長度不足時等待下次進 buffer
- 成功則從 buffer 切掉並回傳整個 frame bytes
"""
while len(self.buffer) >= 3: while len(self.buffer) >= 3:
# 尋找幀頭
if self.buffer[0] != 0x7E: if self.buffer[0] != 0x7E:
self.buffer.pop(0) self.buffer.pop(0)
continue continue
# 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2] length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1) full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1)
# 等待完整幀
if len(self.buffer) < full_length: if len(self.buffer) < full_length:
break return None
# 提取完整 frame 並從緩衝區移除 # TODO: 驗證 checksum(sum(frame[3:]) & 0xFF) 應為 0xFF不通過則丟棄此 frame 並從 buffer pop 1 byte 再重新對齊
frame = bytes(self.buffer[:full_length]) frame = bytes(self.buffer[:full_length])
del self.buffer[:full_length] del self.buffer[:full_length]
return frame
# 判斷 frame 類型並處理
frame_type = frame[3]
if frame_type == 0x90: # RX Packet
payload = XBeeFrameHandler.decapsulate_data(frame)
if payload:
payloads.append(payload)
elif frame_type == 0x88: # AT Response
# 可以在這裡處理 AT 指令回應
# response = XBeeFrameHandler.parse_at_command_response(frame)
# 目前忽略
pass
elif frame_type == 0x8B: # Transmit Status
# 傳輸狀態,目前忽略
pass
else:
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return XBeeFrameProcessor.encapsulate_data(data)
return None
# ====================== XBee Frame Handler ===================== def _dispatch_frame(self, frame: bytes) -> bytes:
"""根據 frame type 分派;若是 RX payload 回傳 bytes其餘回傳 None"""
frame_type = frame[3]
class XBeeFrameHandler: if frame_type == self.FRAME_TYPE_RX_PACKET:
"""XBee API Frame 處理器""" return self._decapsulate(frame)
@staticmethod if frame_type == self.FRAME_TYPE_AT_RESPONSE:
def parse_at_command_response(frame: bytes) -> dict: if self.at_handler is not None:
"""解析 AT Command Response (0x88)""" self.at_handler.handle_frame(frame)
if len(frame) < 8:
return None return None
frame_type = frame[3] if frame_type == self.FRAME_TYPE_TX_STATUS:
if frame_type != 0x88:
return None return None
frame_id = frame[4] logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00
}
@staticmethod
def parse_receive_packet(frame: bytes) -> dict:
"""解析 RX Packet (0x90) - 未來擴展用"""
# if len(frame) < 15 or frame[3] != 0x90:
# return None
# return {
# 'source_addr': frame[4:12],
# 'reserved': frame[12:14],
# 'options': frame[14],
# 'data': frame[15:-1]
# }
pass
return None return None
# ---- 內部:編解碼 ----
@staticmethod @staticmethod
def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes: def _encapsulate(
data: bytes,
dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF',
frame_id: int = 0x01,
) -> bytes:
""" """
將數據封裝為 XBee API 傳輸幀 payload 包成 XBee TX Request (0x10)
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
- 使用廣播地址 - 使用廣播地址
- 添加適當的頭部和校驗和 - 添加適當的頭部和校驗和
""" """
frame_type = 0x10 frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST
dest_addr16 = b'\xFF\xFE' dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00 broadcast_radius = 0x00
options = 0x00 options = 0x00
@ -191,89 +214,173 @@ class XBeeFrameHandler:
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
@staticmethod @staticmethod
def decapsulate_data(data: bytes): def _decapsulate(frame: bytes) -> bytes:
"""從 RX Packet (0x90) 取出 payload"""
# 獲取數據長度 (不包括校驗和) length = (frame[1] << 8) | frame[2]
length = (data[1] << 8) | data[2]
rf_data_start = 3 + 12 rf_data_start = 3 + 12
return data[rf_data_start:3 + length] return frame[rf_data_start:3 + length]
# ====================== Dongle Command Handler =====================
class ATCommandHandler: class ATCommandHandler:
"""AT 指令回應處理器""" """
XBee AT 指令處理器
職責
- AT Command Request frame (0x08) 並透過注入的 writer 送出
- 解析 AT Response frame (0x88)
- AT 指令分派到對應的 handler
writer SerialHandler 在連線建立後 (connection_made) 注入
型別為 Callable[[bytes], None]通常就是 serial transport.write
"""
FRAME_TYPE_AT_COMMAND = 0x08
def __init__(self, serial_port: str): def __init__(self, serial_port: str):
self.serial_port = serial_port self.serial_port = serial_port
self.writer = None
self.handlers = { self.handlers = {
b'DB': self._handle_rssi, b'DB': self._handle_rssi,
b'SH': self._handle_serial_high, b'SH': self._handle_serial_high,
b'SL': self._handle_serial_low, b'SL': self._handle_serial_low,
# 可擴展其他 AT 指令 # 可擴展其他 AT 指令
} }
def handle_response(self, response: dict): # ---- 發送端 ----
def set_writer(self, writer):
"""由 SerialHandler 注入實際寫入 serial 的 callable"""
self.writer = writer
def send_command(self, request: ATRequest):
"""發送一筆 AT 指令給 dongle"""
if self.writer is None:
logger.warning(
f"[{self.serial_port}] AT writer 尚未就緒,指令 {request.command!r} 丟棄"
)
return
self.writer(self._build_at_request(request))
# logger.debug(
# f"[{self.serial_port}] send AT {request.command.decode(errors='replace')} "
# f"(frame_id=0x{request.frame_id:02X})"
# ) # dev
@staticmethod
def _build_at_request(request: ATRequest) -> bytes:
"""將 ATRequest 組成 XBee API AT Command Request frame (0x08) bytes"""
frame_type = ATCommandHandler.FRAME_TYPE_AT_COMMAND
frame = struct.pack(">B", frame_type) + struct.pack(">B", request.frame_id)
frame += request.command + request.parameter
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# ---- 接收端 ----
def handle_frame(self, frame: bytes) -> None:
"""
接收一整個 AT Response frame
1. 解析成 ATResponse
2. 推進 rx_module_ack 供其他模組消費
3. 本地 dispatch 給對應的 _handle_xxx
"""
parsed = self._parse(frame)
if parsed is None:
return
if not rx_module_ack.put(parsed):
logger.warning(
f"[{self.serial_port}] rx_module_ack overflow, drop {parsed.command!r}"
)
self._dispatch(parsed)
@staticmethod
def _parse(frame: bytes) -> ATResponse:
"""解析 AT Command Response (0x88);不符格式回傳 None"""
if len(frame) < 8:
return None
if frame[3] != 0x88:
return None
return ATResponse(
frame_id=frame[4],
command=frame[5:7],
status=frame[7],
data=frame[8:] if len(frame) > 8 else b'',
)
def _dispatch(self, response: ATResponse) -> None:
"""根據 AT 指令類型分派處理""" """根據 AT 指令類型分派處理"""
if not response or not response['is_ok']: # print(f"[{self.serial_port}] AT Response: {response}") # dev
if response:
logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") if not response.is_ok:
logger.warning(
f"[{self.serial_port}] AT {response.command.decode()} "
f"失敗,狀態碼: {response.status}"
)
return return
command = response['command'] handler = self.handlers.get(response.command)
handler = self.handlers.get(command)
if handler: if handler:
handler(response['data']) handler(response.data)
else: else:
logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") logger.debug(
f"[{self.serial_port}] 未處理的 AT 指令: "
f"{response.command.decode()}"
)
def _handle_rssi(self, data: bytes): def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應""" """處理 DB (RSSI) 回應:單 byte 無號值,單位 dBm"""
# 未來可實現 RSSI 處理邏輯
pass pass
# if data:
# print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
# logger.debug(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
def _handle_serial_high(self, data: bytes): def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High)""" """處理 SH (Serial Number High)"""
pass pass
def _handle_serial_low(self, data: bytes): def _handle_serial_low(self, data: bytes):
"""處理 SL (Serial Number Low)""" """處理 SL (Serial Number Low)"""
pass pass
# ====================== Serial Handler ===================== # ================ Serial UDP Socket Object ==============
class SerialHandler(asyncio.Protocol): class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發""" """asyncio.Protocol 用於處理 Serial 收發"""
def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode): def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode):
self.udp_handler = udp_handler # UDP 的傳輸物件 self.udp_handler = udp_handler # UDP 的傳輸物件
self.serial_port_str = serial_port_str self.serial_port_str = serial_port_str
self.serial_mode = serial_mode self.serial_mode = serial_mode
self.transport = None # Serial 自己的傳輸物件 self.transport = None # Serial 自己的傳輸物件
# 根據模式創建對應的 processor
self.processor = self._create_processor(serial_mode)
# AT 指令處理器(僅 XBee 模式使用)
if serial_mode == SerialMode.XBEEAPI2AT:
self.at_handler = ATCommandHandler(serial_port_str)
else:
self.at_handler = None
def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor: # 根據模式建立 processor需要 AT handler 時一併在工廠內建好注入)
"""工廠方法:根據模式創建處理器""" self.processor = self._create_processor()
if serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor() def _create_processor(self) -> FrameProcessor:
elif serial_mode == SerialMode.XBEEAPI2AT: """工廠方法:根據 self.serial_mode 建立對應的 processor必要時一併注入 AT handler"""
return XBeeFrameProcessor() if self.serial_mode == SerialMode.STRAIGHT:
else:
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor() return RawFrameProcessor()
if self.serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(self.serial_port_str)
return XBeeFrameProcessor(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw")
return RawFrameProcessor()
def connection_made(self, transport): def connection_made(self, transport):
"""連接建立時的回調""" """連接建立時的回調"""
self.transport = transport self.transport = transport
# XBee 模式下,將 transport.write 注入給 AT handler讓它能送指令
if self.serial_mode == SerialMode.XBEEAPI2AT:
self.processor.at_handler.set_writer(self.transport.write)
if hasattr(self.udp_handler, 'set_serial_handler'): if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self) self.udp_handler.set_serial_handler(self)
logger.debug(f"Serial port {self.serial_port_str} connected") logger.debug(f"Serial port {self.serial_port_str} connected")
@ -291,8 +398,6 @@ class SerialHandler(asyncio.Protocol):
) )
# ====================== UDP Handler =====================
class UDPHandler(asyncio.DatagramProtocol): class UDPHandler(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol 用於處理 UDP 收發""" """asyncio.DatagramProtocol 用於處理 UDP 收發"""
@ -561,6 +666,33 @@ class serial_manager:
ret[key] = obj.serial_port ret[key] = obj.serial_port
return ret return ret
def send_at_command(self, serial_id, request: ATRequest) -> bool:
"""
對指定 serial_id XBee dongle 發送一筆 AT 指令thread-safe
- serial_id: create_serial_link 取得的編號
- request: ATRequest 物件攜帶 command / parameter / frame_id
回傳是否成功排進事件圈
"""
if not self.running or not self.loop:
logger.error("Event loop not running, cannot send AT command")
return False
if serial_id not in self.serial_objects:
logger.error(f"Serial object {serial_id} not found")
return False
serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode != SerialMode.XBEEAPI2AT:
logger.error(
f"Serial {serial_id} mode is {serial_obj.serial_mode.name}, "
f"AT command only supported in XBEEAPI2AT mode"
)
return False
at_handler = serial_obj.serial_handler.processor.at_handler
self.loop.call_soon_threadsafe(at_handler.send_command, request)
return True
@staticmethod @staticmethod
def check_serial_port(serial_port, baudrate): def check_serial_port(serial_port, baudrate):
"""檢查串口是否存在與可用""" """檢查串口是否存在與可用"""
@ -598,20 +730,42 @@ if __name__ == '__main__':
sm = serial_manager() sm = serial_manager()
sm.start() sm.start()
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定 # SERIAL_PORT = '/dev/ttyACM0' # 手動指定
# SERIAL_BAUDRATE = 115200 # SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14571 # UDP_REMOTE_PORT = 14571
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT) # sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
linked_serial = sm.get_serial_link() linked_serial = sm.get_serial_link()
print(linked_serial) print(linked_serial)
time.sleep(60)
# 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試
time.sleep(5)
rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52)
for i in range(60):
sm.send_at_command(1, rssi_request)
time.sleep(1)
sm.remove_serial_link(1) sm.remove_serial_link(1)
time.sleep(3) time.sleep(3)
sm.shutdown() sm.shutdown()
'''
================= 改版記錄 ============================
2026.4.20
1. XBeeFrameHandler 結構移除
2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler )
3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包
4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler
5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response
6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX
7. ATCommandHandler 新增各項 _handle_XXX (未實作)
'''

@ -1,45 +0,0 @@
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
# import mavros_msgs.srv
class TalkerNode(Node):
def __init__(self):
start_time = time.time()
super().__init__('talker_node')
end_time = time.time()
print(f"Node initialization took {end_time - start_time:.2f} seconds")
self.publisher_ = self.create_publisher(String, 'hahatest/_1', 10)
self.timer = self.create_timer(1.0, self.timer_callback) # 每秒執行一次
self.get_logger().info('TalkerNode has been started.')
def timer_callback(self):
msg = String()
msg.data = 'Hello, ROS 2!'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = TalkerNode()
print("Before sleep")
time.sleep(5) # 等待 5 秒鐘
print("After sleep")
try:
start_time = time.time()
while time.time() - start_time < 10: # 持續 10 秒鐘
rclpy.spin_once(node)
time.sleep(1) # 每秒執行一次
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,188 @@
import time
from pymavlink import mavutil
# # 強制 source_system=255
# master = mavutil.mavlink_connection('udpout:localhost:14561', source_system=255)
# print("Starting Force-Ping Test...")
# while True:
# # 1. 發送 Heartbeat (證明連線活著)
# master.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS,
# mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0)
# # 2. 強制發送 PING (target 1, 1)
# # 我們加一個計數器,方便在 tcpdump 裡辨識
# ping_seq = int(time.time()) % 100
# master.mav.ping_send(int(time.time() * 1e6), ping_seq, 1, 1)
# print(f"Sent Heartbeat and PING (seq {ping_seq})")
# # 3. 非阻塞讀取,避免卡死
# msg = master.recv_match(blocking=False)
# if msg:
# print(f"--> Received something: {msg.get_type()}")
# time.sleep(1) # 每秒噴一次
# -------------------------------------------------------------------
# # 改成 udpin這會讓你的程式守在 14561 等模擬器送資料進來
# # 這樣你就能收到模擬器的 Heartbeat建立連線後再發 PING
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# print("Waiting for simulator heartbeat...")
# # 這一行很重要:先收到 Heartbeatpymavlink 才會設定好 target 地址
# master.wait_heartbeat()
# print(f"Heartbeat from system {master.target_system} component {master.target_component}")
# while True:
# # 既然收到了,現在發 PING 就會順著原路回去
# master.mav.ping_send(int(time.time() * 1e6), 1, master.target_system, master.target_component)
# print("PING sent!")
# msg = master.recv_match(type='PING', blocking=True, timeout=2)
# if msg:
# print(f"Got PING response: {msg}")
# time.sleep(1)
# -------------------------------------------------------------------
# import time
# from pymavlink import mavutil
# # 核心修改:因為 MAVProxy 正在往 14561 噴資料,我們必須「接住」它
# # 使用 udpin 會讓你的程式在 14561 監聽
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=254)
# print("正在等待 MAVProxy 轉發的 Heartbeat...")
# # 只要這行跑過,代表你跟 MAVProxy 握手成功了
# master.wait_heartbeat()
# print(f"成功連線到系統 {master.target_system}")
# while True:
# # 1. 務必發送 Heartbeat讓 MAVProxy 知道要把回應送回這個 Port
# master.mav.heartbeat_send(
# mavutil.mavlink.MAV_TYPE_GCS,
# mavutil.mavlink.MAV_AUTOPILOT_INVALID,
# 0, 0, 0
# )
# # 2. 修改 target_component 為 1 (飛控核心)
# # 注意:這裡刻意將 seq 設為動態,方便你在 watch PING 裡觀察
# p_seq = int(time.time()) % 255
# master.mav.ping_send(
# int(time.time() * 1e6),
# p_seq,
# 1,
# 1 # <--- 改成 1 試試看!
# )
# print(f"Sent PING seq {p_seq} to 1/1")
# # 3. 讀取所有訊息,看看有沒有 PING 回來
# # 有時候回應的 target_system 可能是 255 (你的 ID)
# msg = master.recv_match(type='PING', blocking=True, timeout=1.0)
# if msg:
# print(f"🎉 成功!收到回應: {msg}")
# time.sleep(1)
# -------------------------------------------------------------------
# import time
# from pymavlink import mavutil
# # 建議一樣用 udpin 監聽 MAVProxy 的輸出
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# def get_time_ns():
# return int(time.time() * 1e9) # 轉為奈秒
# print("開始測試 TIMESYNC 響應...")
# while True:
# now_ns = get_time_ns()
# # 發送 TIMESYNC 請求
# # tc1=0 代表這是請求ts1=當前時間
# master.mav.timesync_send(0, now_ns)
# # 等待回應
# msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0)
# if msg and msg.tc1 != 0:
# # 收到回應msg.tc1 應該等於我們剛才送出的 now_ns
# rtt_ms = (get_time_ns() - msg.tc1) / 1e6
# print(f"🕒 TIMESYNC RTT: {rtt_ms:.3f} ms")
# break # 測試一次就好
# else:
# print("等待 TIMESYNC 回應中...")
# time.sleep(1)
# -------------------------------------------------------------------
import time
import statistics
from pymavlink import mavutil
# 連接到 MAVProxy 轉發的端口
master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# 測試設定
TARGET_SYS = 2 # 你想測試的載具 ID
SAMPLES = 20 # 取 20 次樣本來算平均和抖動
rtt_history = []
print(f"開始測試對 System {TARGET_SYS} 的通道品質...")
def get_time_ns():
return int(time.time() * 1e9)
try:
while len(rtt_history) < SAMPLES:
ts1_send = get_time_ns()
# 發送 TIMESYNC (V1/V2 格式相容)
# 針對特定系統發送
master.mav.timesync_send(0, ts1_send)
# 等待回應 (過濾目標系統的訊息)
msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0)
if msg and msg.get_srcSystem() == TARGET_SYS and msg.tc1 != 0:
ts1_recv = get_time_ns()
rtt_ns = ts1_recv - msg.tc1
rtt_ms = rtt_ns / 1e6
rtt_history.append(rtt_ms)
print(f"Sample {len(rtt_history)}: RTT = {rtt_ms:.3f} ms")
else:
print("Request timeout or mismatched ID...")
time.sleep(0.1) # 快速採樣
# 4. 計算統計數據
avg_latency = statistics.mean(rtt_history)
std_dev = statistics.stdev(rtt_history) # 這就是 Jitter 的一種表現
max_rtt = max(rtt_history)
min_rtt = min(rtt_history)
print("\n--- 物理通道品質分析報告 ---")
print(f"目標載具 : System {TARGET_SYS}")
print(f"平均延遲 (Avg) : {avg_latency:.3f} ms")
print(f"最小延遲 (Min) : {min_rtt:.3f} ms")
print(f"最大延遲 (Max) : {max_rtt:.3f} ms")
print(f"抖動幅度 (Jitter): {std_dev:.3f} ms (標準差)")
if std_dev > 10:
print("警告:網路抖動過高,可能影響控制穩定性!")
else:
print("通道品質:良好")
except KeyboardInterrupt:
print("測試終止。")

@ -0,0 +1,46 @@
from pymavlink.dialects.v20 import common as mavlink
class NullBuffer:
def write(self, data): pass
def seek(self, pos): pass
def tell(self): return 0
class CapturingBuffer:
def __init__(self):
self.last_data = b''
def write(self, data):
self.last_data = bytes(data)
def seek(self, pos): pass
def tell(self): return 0
# 1. 初始化(僅作為編碼器)
# file 參數可以是任何擁有 write() 方法的物件,這裡用 BytesIO 模擬
# 初始化方法一:使用 BytesIO
# import io
# out_buf = io.BytesIO()
# mav = mavlink.MAVLink(out_buf, srcSystem=1, srcComponent=191)
# 初始化方法二:使用自定義的 NullBuffer
mav = mavlink.MAVLink(NullBuffer(), srcSystem=1, srcComponent=191)
mav.seq = 254
# 2. 建立心跳包並取得二進制數據
# 連續產出「不同」的訊息物件
for i in range(3):
msg = mav.heartbeat_encode(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0)
data = msg.pack(mav)
mav.seq = (mav.seq + 1) & 0xFF
# MAVLink 2 的序列號在第 3 個 Byte (Index 2)
print(f"{i} 次發送, Seq: {data[4]}, Raw: {data.hex()}")
print("分隔線")
buf = CapturingBuffer()
mav_buf2 = mavlink.MAVLink(buf, srcSystem=1, srcComponent=191)
for i in range(3):
mav_buf2.heartbeat_send(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0)
data = buf.last_data
print(f"{i} 次發送, Seq: {data[4]}, Raw: {data.hex()}")

File diff suppressed because it is too large Load Diff

@ -0,0 +1,71 @@
import asyncio
import serial_asyncio
SERIAL_PORT = '/dev/ttyUSB0' # 修改為你的 serial port
SERIAL_BAUDRATE = 115200 # 修改為你的 baudrate
UDP_REMOTE_IP = '192.168.1.100' # 修改為目標 IP
UDP_REMOTE_PORT = 5005 # 修改為目標 port
UDP_LOCAL_PORT = 5006 # 本地 UDP 監聽 port
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_transport):
self.udp_transport = udp_transport
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
# Serial 收到資料,轉發到 UDP
self.udp_transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def write_to_serial(self, data):
self.transport.write(data)
class UDPToSerial(asyncio.DatagramProtocol):
def __init__(self, serial_proto):
self.serial_proto = serial_proto
def datagram_received(self, data, addr):
# UDP 收到資料,轉發到 Serial
self.serial_proto.write_to_serial(data)
async def main():
loop = asyncio.get_running_loop()
# 定義協議工廠函數
def create_empty_protocol():
return asyncio.DatagramProtocol()
# 建立 UDP1 傳輸
udp_transport, _ = await loop.create_datagram_endpoint(
create_empty_protocol,
local_addr=('0.0.0.0', UDP_LOCAL_PORT)
)
# 建立 Serial 傳輸
serial_proto = SerialToUDP(udp_transport)
def get_serial_protocol():
return serial_proto
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, get_serial_protocol, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
# 建立 UDP2 監聽
udp_proto = UDPToSerial(serial_proto)
def get_udp_protocol():
return udp_proto
udp_listen_transport, _ = await loop.create_datagram_endpoint(
get_udp_protocol,
local_addr=('0.0.0.0', UDP_LOCAL_PORT)
)
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(main())

@ -1,27 +0,0 @@
#!/bin/bash
# 網站清單
DOMAINS=("google.com" "smarter.nchu.edu.tw")
echo "網站 SSL 憑證剩餘天數:"
echo "---------------------------"
for domain in "${DOMAINS[@]}"; do
end_date=$(echo | openssl s_client -servername "$domain" -connect "$domain:443" 2>/dev/null |
openssl x509 -noout -enddate | cut -d= -f2)
end_timestamp=$(date -d "$end_date" +%s)
now_timestamp=$(date +%s)
remaining_days=$(( (end_timestamp - now_timestamp) / 86400 ))
if [ $remaining_days -lt 0 ]; then
status="已過期 ❌"
elif [ $remaining_days -lt 15 ]; then
status="即將到期 ⚠️"
else
status="正常 ✅"
fi
printf "%-20s 到期日:%-25s 剩餘天數:%3d 天 %s\n" "$domain" "$end_date" "$remaining_days" "$status"
done

@ -0,0 +1,40 @@
import socket
import sys
import select
# 設定來源 IP 與 Port
SRC_IP = '127.0.0.1' # 監聽所有介面
SRC_PORT = 16661 # 請自行修改
# 建立 UDP 監聽 socket
src_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
src_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
src_sock.bind((SRC_IP, SRC_PORT))
print(f"Listening for UDP on {SRC_IP}:{SRC_PORT}")
# 設定目標 Unix socket 路徑
UNIX_SOCKET_PATH = '/tmp/unix_socket_mavlink.sock' # 請自行修改
# 建立 Unix socket 連線
unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
unix_sock.connect(UNIX_SOCKET_PATH)
except Exception as e:
print(f"Failed to connect to unix socket: {e}")
sys.exit(1)
while True:
# 使用 select 監聽兩個 socket
readable, _, _ = select.select([src_sock, unix_sock], [], [])
for sock in readable:
if sock is src_sock:
data, addr = src_sock.recvfrom(4096)
if data:
unix_sock.sendall(data)
# print(f"Received UDP data from {addr}: {data}") # debug
# break # debug
elif sock is unix_sock:
data = unix_sock.recv(4096)
if data:
# 回送到最近收到資料的 UDP client
src_sock.sendto(data, addr)
Loading…
Cancel
Save