(modify) serialManager 架構調整 詳見改版記錄

chiyu
Chiyu Chen 3 weeks ago
parent 7ce094d211
commit 06463d71bc

@ -47,10 +47,10 @@ class FrameProcessor(ABC):
self.buffer = bytearray()
@abstractmethod
def process_incoming(self, data: bytes):
def process_incoming(self, data: bytes) -> bytes:
"""
處理接收到的數據
返回已完整解析的 payload 列表
返回已完整解析的 payload 列表 後續要丟到 UDP
"""
pass
@ -66,7 +66,7 @@ class FrameProcessor(ABC):
class RawFrameProcessor(FrameProcessor):
"""原始數據直通處理器"""
def process_incoming(self, data: bytes):
def process_incoming(self, data: bytes) -> bytes:
"""直接返回原始數據,不進行緩衝"""
return [data] if data else []
@ -76,108 +76,106 @@ class RawFrameProcessor(FrameProcessor):
class XBeeFrameProcessor(FrameProcessor):
"""XBee API 協議處理器"""
def process_incoming(self, data: bytes):
"""
XBee API 協議處理器
職責
- XBee API frame 的拆幀 / 組幀
- 0x90 (RX Packet) -> 解出 payload 回傳
- 0x88 (AT Response) -> 轉交 at_handler 處理若有注入
- 0x8B (TX Status) -> 目前忽略
- 其他 frame type -> warning 忽略
若未來要做變化型 XBee例如 API2 escape mode不同 addressing
繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可
"""
# XBee API frame type
FRAME_TYPE_TX_REQUEST = 0x10
FRAME_TYPE_AT_RESPONSE = 0x88
FRAME_TYPE_TX_STATUS = 0x8B
FRAME_TYPE_RX_PACKET = 0x90
def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__()
self.at_handler = at_handler
# ---- 對外契約 ----
def process_incoming(self, data: bytes) -> bytes:
"""處理 XBee API 幀並提取 payload"""
self.buffer.extend(data)
payloads = []
while True:
frame = self._try_extract_frame()
if frame is None:
break
payload = self._dispatch_frame(frame)
if payload:
payloads.append(payload)
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return self._encapsulate(data)
# ---- 內部:拆幀與分派 ----
def _try_extract_frame(self) -> bytes:
"""
buffer 嘗試抓一個完整 frame
- 對齊幀頭 0x7E
- 長度不足時等待下次進 buffer
- 成功則從 buffer 切掉並回傳整個 frame bytes
"""
while len(self.buffer) >= 3:
# 尋找幀頭
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
# 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1)
# 等待完整幀
if len(self.buffer) < full_length:
break
# 提取完整 frame 並從緩衝區移除
return None
frame = bytes(self.buffer[:full_length])
del self.buffer[:full_length]
# 判斷 frame 類型並處理
frame_type = frame[3]
if frame_type == 0x90: # RX Packet
payload = XBeeFrameHandler.decapsulate_data(frame)
if payload:
payloads.append(payload)
elif frame_type == 0x88: # AT Response
# 可以在這裡處理 AT 指令回應
# response = XBeeFrameHandler.parse_at_command_response(frame)
# 目前忽略
pass
elif frame_type == 0x8B: # Transmit Status
# 傳輸狀態,目前忽略
pass
else:
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return XBeeFrameHandler.encapsulate_data(data)
return frame
return None
class XBeeFrameHandler:
"""XBee API Frame 處理器"""
@staticmethod
def parse_at_command_response(frame: bytes) -> dict:
"""解析 AT Command Response (0x88)"""
if len(frame) < 8:
return None
def _dispatch_frame(self, frame: bytes) -> bytes:
"""根據 frame type 分派;若是 RX payload 回傳 bytes其餘回傳 None"""
frame_type = frame[3]
if frame_type != 0x88:
if frame_type == self.FRAME_TYPE_RX_PACKET:
return self._decapsulate(frame)
if frame_type == self.FRAME_TYPE_AT_RESPONSE:
if self.at_handler is not None:
self.at_handler.handle_frame(frame)
return None
frame_id = frame[4]
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00
}
@staticmethod
def parse_receive_packet(frame: bytes) -> dict:
"""解析 RX Packet (0x90) - 未來擴展用"""
# if len(frame) < 15 or frame[3] != 0x90:
# return None
# return {
# 'source_addr': frame[4:12],
# 'reserved': frame[12:14],
# 'options': frame[14],
# 'data': frame[15:-1]
# }
pass
if frame_type == self.FRAME_TYPE_TX_STATUS:
return None
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return None
# ---- 內部:編解碼 ----
@staticmethod
def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes:
def _encapsulate(
data: bytes,
dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF',
frame_id: int = 0x01,
) -> bytes:
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
payload 包成 XBee TX Request (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
frame_type = 0x10
frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
@ -189,22 +187,24 @@ class XBeeFrameHandler:
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
@staticmethod
def decapsulate_data(data: bytes):
# 獲取數據長度 (不包括校驗和)
length = (data[1] << 8) | data[2]
def _decapsulate(frame: bytes) -> bytes:
"""從 RX Packet (0x90) 取出 payload"""
length = (frame[1] << 8) | frame[2]
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
return frame[rf_data_start:3 + length]
# ====================== Dongle Command Handler =====================
class ATCommandHandler:
"""AT 指令回應處理器"""
"""
XBee AT 指令回應處理器
職責
- 解析 AT Response frame (0x88)
- AT 指令分派到對應的 handler
"""
def __init__(self, serial_port: str):
self.serial_port = serial_port
self.handlers = {
@ -213,63 +213,93 @@ class ATCommandHandler:
b'SL': self._handle_serial_low,
# 可擴展其他 AT 指令
}
def handle_response(self, response: dict):
def handle_frame(self, frame: bytes) -> None:
"""接收一整個 AT Response frame內部完成 parse + dispatch"""
parsed = self._parse(frame)
if parsed is None:
return
self._dispatch(parsed)
@staticmethod
def _parse(frame: bytes) -> dict:
"""解析 AT Command Response (0x88);不符格式回傳 None"""
if len(frame) < 8:
return None
if frame[3] != 0x88:
return None
frame_id = frame[4]
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00,
}
def _dispatch(self, response: dict) -> None:
"""根據 AT 指令類型分派處理"""
if not response or not response['is_ok']:
if response:
logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}")
if not response['is_ok']:
logger.warning(
f"[{self.serial_port}] AT {response['command'].decode()} "
f"失敗,狀態碼: {response['status']}"
)
return
command = response['command']
handler = self.handlers.get(command)
handler = self.handlers.get(response['command'])
if handler:
handler(response['data'])
else:
logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}")
logger.debug(
f"[{self.serial_port}] 未處理的 AT 指令: "
f"{response['command'].decode()}"
)
def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應"""
# 未來可實現 RSSI 處理邏輯
pass
def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High)"""
pass
def _handle_serial_low(self, data: bytes):
"""處理 SL (Serial Number Low)"""
pass
# ================ Serial UDP Socket Object ==============
class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發"""
def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode):
self.udp_handler = udp_handler # UDP 的傳輸物件
self.serial_port_str = serial_port_str
self.serial_mode = serial_mode
self.transport = None # Serial 自己的傳輸物件
# 根據模式創建對應的 processor
self.processor = self._create_processor(serial_mode)
# AT 指令處理器(僅 XBee 模式使用)
if serial_mode == SerialMode.XBEEAPI2AT:
self.at_handler = ATCommandHandler(serial_port_str)
else:
self.at_handler = None
def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor:
"""工廠方法:根據模式創建處理器"""
# 根據模式建立 processor需要 AT handler 時一併在工廠內建好注入)
self.processor = self._create_processor(serial_mode, serial_port_str)
@staticmethod
def _create_processor(serial_mode: SerialMode, serial_port_str: str) -> FrameProcessor:
"""工廠方法:根據模式建立對應的 processor必要時一併注入 AT handler"""
if serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor()
elif serial_mode == SerialMode.XBEEAPI2AT:
return XBeeFrameProcessor()
else:
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor()
if serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(serial_port_str)
return XBeeFrameProcessor(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor()
def connection_made(self, transport):
"""連接建立時的回調"""
@ -613,3 +643,19 @@ if __name__ == '__main__':
sm.remove_serial_link(1)
time.sleep(3)
sm.shutdown()
'''
================= 改版記錄 ============================
2026.4.20
1. XBeeFrameHandler 結構移除
2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler )
3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包
4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler
5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response
6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX
7. ATCommandHandler 新增各項 _handle_XXX (未實作)
'''

Loading…
Cancel
Save