diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index 2d1c715..19f78df 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -47,10 +47,10 @@ class FrameProcessor(ABC): self.buffer = bytearray() @abstractmethod - def process_incoming(self, data: bytes): + def process_incoming(self, data: bytes) -> bytes: """ 處理接收到的數據 - 返回:已完整解析的 payload 列表 + 返回:已完整解析的 payload 列表 後續要丟到 UDP 去 """ pass @@ -66,7 +66,7 @@ class FrameProcessor(ABC): class RawFrameProcessor(FrameProcessor): """原始數據直通處理器""" - def process_incoming(self, data: bytes): + def process_incoming(self, data: bytes) -> bytes: """直接返回原始數據,不進行緩衝""" return [data] if data else [] @@ -76,108 +76,106 @@ class RawFrameProcessor(FrameProcessor): class XBeeFrameProcessor(FrameProcessor): - """XBee API 協議處理器""" - - def process_incoming(self, data: bytes): + """ + XBee API 協議處理器 + + 職責: + - XBee API frame 的拆幀 / 組幀 + - 0x90 (RX Packet) -> 解出 payload 回傳 + - 0x88 (AT Response) -> 轉交 at_handler 處理(若有注入) + - 0x8B (TX Status) -> 目前忽略 + - 其他 frame type -> 記 warning 忽略 + + 若未來要做變化型 XBee(例如 API2 escape mode、不同 addressing), + 繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可。 + """ + + # XBee API frame type + FRAME_TYPE_TX_REQUEST = 0x10 + FRAME_TYPE_AT_RESPONSE = 0x88 + FRAME_TYPE_TX_STATUS = 0x8B + FRAME_TYPE_RX_PACKET = 0x90 + + def __init__(self, at_handler: "ATCommandHandler" = None): + super().__init__() + self.at_handler = at_handler + + # ---- 對外契約 ---- + def process_incoming(self, data: bytes) -> bytes: """處理 XBee API 幀並提取 payload""" self.buffer.extend(data) payloads = [] - + + while True: + frame = self._try_extract_frame() + if frame is None: + break + payload = self._dispatch_frame(frame) + if payload: + payloads.append(payload) + + return payloads + + def process_outgoing(self, data: bytes) -> bytes: + """將數據封裝為 XBee API 傳輸幀""" + return self._encapsulate(data) + + # ---- 內部:拆幀與分派 ---- + def _try_extract_frame(self) -> bytes: + """ + 從 buffer 嘗試抓一個完整 frame + - 對齊幀頭 0x7E + - 長度不足時等待下次進 buffer + - 成功則從 buffer 切掉並回傳整個 frame bytes + """ while len(self.buffer) >= 3: - # 尋找幀頭 if self.buffer[0] != 0x7E: self.buffer.pop(0) continue - - # 讀取 payload 長度 + length = (self.buffer[1] << 8) | self.buffer[2] full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1) - - # 等待完整幀 + if len(self.buffer) < full_length: - break - - # 提取完整 frame 並從緩衝區移除 + return None + frame = bytes(self.buffer[:full_length]) del self.buffer[:full_length] - - # 判斷 frame 類型並處理 - frame_type = frame[3] - - if frame_type == 0x90: # RX Packet - payload = XBeeFrameHandler.decapsulate_data(frame) - if payload: - payloads.append(payload) - elif frame_type == 0x88: # AT Response - # 可以在這裡處理 AT 指令回應 - # response = XBeeFrameHandler.parse_at_command_response(frame) - # 目前忽略 - pass - elif frame_type == 0x8B: # Transmit Status - # 傳輸狀態,目前忽略 - pass - else: - logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}") - - return payloads - - def process_outgoing(self, data: bytes) -> bytes: - """將數據封裝為 XBee API 傳輸幀""" - return XBeeFrameHandler.encapsulate_data(data) + return frame + return None -class XBeeFrameHandler: - """XBee API Frame 處理器""" - - @staticmethod - def parse_at_command_response(frame: bytes) -> dict: - """解析 AT Command Response (0x88)""" - if len(frame) < 8: - return None - + def _dispatch_frame(self, frame: bytes) -> bytes: + """根據 frame type 分派;若是 RX payload 回傳 bytes,其餘回傳 None""" frame_type = frame[3] - if frame_type != 0x88: + + if frame_type == self.FRAME_TYPE_RX_PACKET: + return self._decapsulate(frame) + + if frame_type == self.FRAME_TYPE_AT_RESPONSE: + if self.at_handler is not None: + self.at_handler.handle_frame(frame) return None - - frame_id = frame[4] - at_command = frame[5:7] - status = frame[7] - data = frame[8:] if len(frame) > 8 else b'' - - return { - 'frame_id': frame_id, - 'command': at_command, - 'status': status, - 'data': data, - 'is_ok': status == 0x00 - } - - @staticmethod - def parse_receive_packet(frame: bytes) -> dict: - """解析 RX Packet (0x90) - 未來擴展用""" - # if len(frame) < 15 or frame[3] != 0x90: - # return None - - # return { - # 'source_addr': frame[4:12], - # 'reserved': frame[12:14], - # 'options': frame[14], - # 'data': frame[15:-1] - # } - pass + + if frame_type == self.FRAME_TYPE_TX_STATUS: + return None + + logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}") return None + # ---- 內部:編解碼 ---- @staticmethod - def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes: + def _encapsulate( + data: bytes, + dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', + frame_id: int = 0x01, + ) -> bytes: """ - 將數據封裝為 XBee API 傳輸幀 - - 使用 XBee API 格式封裝數據: - - 傳輸請求幀 (0x10) + 將 payload 包成 XBee TX Request (0x10) - 使用廣播地址 - 添加適當的頭部和校驗和 """ - frame_type = 0x10 + frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST dest_addr16 = b'\xFF\xFE' broadcast_radius = 0x00 options = 0x00 @@ -189,22 +187,24 @@ class XBeeFrameHandler: return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) @staticmethod - def decapsulate_data(data: bytes): - - # 獲取數據長度 (不包括校驗和) - length = (data[1] << 8) | data[2] - + def _decapsulate(frame: bytes) -> bytes: + """從 RX Packet (0x90) 取出 payload""" + length = (frame[1] << 8) | frame[2] rf_data_start = 3 + 12 - return data[rf_data_start:3 + length] - - + return frame[rf_data_start:3 + length] # ====================== Dongle Command Handler ===================== class ATCommandHandler: - """AT 指令回應處理器""" - + """ + XBee AT 指令回應處理器 + + 職責: + - 解析 AT Response frame (0x88) + - 依 AT 指令分派到對應的 handler + """ + def __init__(self, serial_port: str): self.serial_port = serial_port self.handlers = { @@ -213,63 +213,93 @@ class ATCommandHandler: b'SL': self._handle_serial_low, # 可擴展其他 AT 指令 } - - def handle_response(self, response: dict): + + def handle_frame(self, frame: bytes) -> None: + """接收一整個 AT Response frame,內部完成 parse + dispatch""" + parsed = self._parse(frame) + if parsed is None: + return + self._dispatch(parsed) + + @staticmethod + def _parse(frame: bytes) -> dict: + """解析 AT Command Response (0x88);不符格式回傳 None""" + if len(frame) < 8: + return None + + if frame[3] != 0x88: + return None + + frame_id = frame[4] + at_command = frame[5:7] + status = frame[7] + data = frame[8:] if len(frame) > 8 else b'' + + return { + 'frame_id': frame_id, + 'command': at_command, + 'status': status, + 'data': data, + 'is_ok': status == 0x00, + } + + def _dispatch(self, response: dict) -> None: """根據 AT 指令類型分派處理""" - if not response or not response['is_ok']: - if response: - logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") + if not response['is_ok']: + logger.warning( + f"[{self.serial_port}] AT {response['command'].decode()} " + f"失敗,狀態碼: {response['status']}" + ) return - - command = response['command'] - handler = self.handlers.get(command) - + + handler = self.handlers.get(response['command']) if handler: handler(response['data']) else: - logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") - + logger.debug( + f"[{self.serial_port}] 未處理的 AT 指令: " + f"{response['command'].decode()}" + ) + def _handle_rssi(self, data: bytes): """處理 DB (RSSI) 回應""" # 未來可實現 RSSI 處理邏輯 pass - + def _handle_serial_high(self, data: bytes): """處理 SH (Serial Number High)""" pass - + def _handle_serial_low(self, data: bytes): """處理 SL (Serial Number Low)""" pass + # ================ Serial UDP Socket Object ============== class SerialHandler(asyncio.Protocol): """asyncio.Protocol 用於處理 Serial 收發""" - + def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode): self.udp_handler = udp_handler # UDP 的傳輸物件 self.serial_port_str = serial_port_str self.serial_mode = serial_mode self.transport = None # Serial 自己的傳輸物件 - - # 根據模式創建對應的 processor - self.processor = self._create_processor(serial_mode) - - # AT 指令處理器(僅 XBee 模式使用) - if serial_mode == SerialMode.XBEEAPI2AT: - self.at_handler = ATCommandHandler(serial_port_str) - else: - self.at_handler = None - def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor: - """工廠方法:根據模式創建處理器""" + # 根據模式建立 processor(需要 AT handler 時一併在工廠內建好注入) + self.processor = self._create_processor(serial_mode, serial_port_str) + + @staticmethod + def _create_processor(serial_mode: SerialMode, serial_port_str: str) -> FrameProcessor: + """工廠方法:根據模式建立對應的 processor,必要時一併注入 AT handler""" if serial_mode == SerialMode.STRAIGHT: return RawFrameProcessor() - elif serial_mode == SerialMode.XBEEAPI2AT: - return XBeeFrameProcessor() - else: - logger.warning(f"Unknown serial mode: {serial_mode}, using Raw") - return RawFrameProcessor() + + if serial_mode == SerialMode.XBEEAPI2AT: + at_handler = ATCommandHandler(serial_port_str) + return XBeeFrameProcessor(at_handler=at_handler) + + logger.warning(f"Unknown serial mode: {serial_mode}, using Raw") + return RawFrameProcessor() def connection_made(self, transport): """連接建立時的回調""" @@ -613,3 +643,19 @@ if __name__ == '__main__': sm.remove_serial_link(1) time.sleep(3) sm.shutdown() + + + +''' +================= 改版記錄 ============================ +2026.4.20 +1. XBeeFrameHandler 結構移除 +2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler 中) +3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包 +4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler +5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response +6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX +7. ATCommandHandler 新增各項 _handle_XXX (未實作) + +''' +