From 7b7e02a9e7373f76c223bd08c8aae0f7c97a7870 Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Mon, 27 Apr 2026 15:12:38 +0800 Subject: [PATCH] =?UTF-8?q?(update)=20serialManager:=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=20AT=20=E6=8C=87=E4=BB=A4=E8=99=95=E7=90=86=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E8=88=87=E4=B8=B2=E5=8F=A3=E5=AF=AB=E5=85=A5=E6=94=AF=E6=8C=81?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E7=89=88=E6=9C=AC=E8=99=9F=E8=87=B3?= =?UTF-8?q?=200.80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fc_network_adapter/serialManager.py | 135 +++++++++++++++--- 1 file changed, 113 insertions(+), 22 deletions(-) diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index 19f78df..260102f 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -21,14 +21,16 @@ from abc import ABC, abstractmethod # from xbee.frame import APIFrame # 自定義的 import -from .utils import setup_logger +from .utils import RingBuffer, setup_logger # ====================== 分割線 ===================== logger = setup_logger(os.path.basename(__file__)) -MODULE_VER = "0.60" +MODULE_VER = "0.80" -# ====================== 分割線 ===================== +rx_module_ack = RingBuffer(capacity=256, buffer_id=253) + +# ====================== State DEFINITION ===================== # 定義 serial 連線的模式 class SerialMode(Enum): @@ -37,7 +39,6 @@ class SerialMode(Enum): XBEEAPI2AT = auto() # XBee API 模式 NOT_USE = auto() # 不使用 - # ====================== Frame Processor Base and Implementation ===================== class FrameProcessor(ABC): @@ -139,6 +140,8 @@ class XBeeFrameProcessor(FrameProcessor): if len(self.buffer) < full_length: return None + # TODO: 驗證 checksum:(sum(frame[3:]) & 0xFF) 應為 0xFF;不通過則丟棄此 frame 並從 buffer pop 1 byte 再重新對齊 + frame = bytes(self.buffer[:full_length]) del self.buffer[:full_length] return frame @@ -198,15 +201,22 @@ class XBeeFrameProcessor(FrameProcessor): class ATCommandHandler: """ - XBee AT 指令回應處理器 + XBee AT 指令處理器 職責: + - 組 AT Command Request frame (0x08) 並透過注入的 writer 送出 - 解析 AT Response frame (0x88) - 依 AT 指令分派到對應的 handler + + writer 由 SerialHandler 在連線建立後 (connection_made) 注入, + 型別為 Callable[[bytes], None](通常就是 serial transport.write)。 """ + FRAME_TYPE_AT_COMMAND = 0x08 + def __init__(self, serial_port: str): self.serial_port = serial_port + self.writer = None self.handlers = { b'DB': self._handle_rssi, b'SH': self._handle_serial_high, @@ -214,6 +224,45 @@ class ATCommandHandler: # 可擴展其他 AT 指令 } + # ---- 發送端 ---- + def set_writer(self, writer): + """由 SerialHandler 注入實際寫入 serial 的 callable""" + self.writer = writer + + def send_command(self, command: bytes, parameter: bytes, frame_id: int): + """ + 發送一筆 AT 指令給 dongle + - command: 2 bytes AT 指令名稱,例如 b'DB' + - parameter: 指令參數 bytes(讀取用指令通常為空) + - frame_id: XBee frame id,用來配對回應(0x00 表示不要求回應) + """ + if self.writer is None: + logger.warning( + f"[{self.serial_port}] AT writer 尚未就緒,指令 {command!r} 丟棄" + ) + return + + frame_bytes = self._build_at_request(command, parameter, frame_id) + self.writer(frame_bytes) + # logger.debug( + # f"[{self.serial_port}] send AT {command.decode(errors='replace')} " + # f"(frame_id=0x{frame_id:02X})" + # ) # dev + + @staticmethod + def _build_at_request(command: bytes, parameter: bytes, frame_id: int) -> bytes: + """ + 將 AT 指令組成 XBee API AT Command Request frame (0x08) + callers 必須明確指定 frame_id + """ + frame_type = ATCommandHandler.FRAME_TYPE_AT_COMMAND + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += command + parameter + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + + # ---- 接收端 ---- def handle_frame(self, frame: bytes) -> None: """接收一整個 AT Response frame,內部完成 parse + dispatch""" parsed = self._parse(frame) @@ -245,6 +294,8 @@ class ATCommandHandler: def _dispatch(self, response: dict) -> None: """根據 AT 指令類型分派處理""" + # print(f"[{self.serial_port}] AT Response: {response}") # dev + if not response['is_ok']: logger.warning( f"[{self.serial_port}] AT {response['command'].decode()} " @@ -262,9 +313,9 @@ class ATCommandHandler: ) def _handle_rssi(self, data: bytes): - """處理 DB (RSSI) 回應""" - # 未來可實現 RSSI 處理邏輯 - pass + """處理 DB (RSSI) 回應:單 byte 無號值,單位 dBm""" + if data: + print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev def _handle_serial_high(self, data: bytes): """處理 SH (Serial Number High)""" @@ -286,24 +337,28 @@ class SerialHandler(asyncio.Protocol): self.transport = None # Serial 自己的傳輸物件 # 根據模式建立 processor(需要 AT handler 時一併在工廠內建好注入) - self.processor = self._create_processor(serial_mode, serial_port_str) + self.processor = self._create_processor() - @staticmethod - def _create_processor(serial_mode: SerialMode, serial_port_str: str) -> FrameProcessor: - """工廠方法:根據模式建立對應的 processor,必要時一併注入 AT handler""" - if serial_mode == SerialMode.STRAIGHT: + def _create_processor(self) -> FrameProcessor: + """工廠方法:根據 self.serial_mode 建立對應的 processor,必要時一併注入 AT handler""" + if self.serial_mode == SerialMode.STRAIGHT: return RawFrameProcessor() - if serial_mode == SerialMode.XBEEAPI2AT: - at_handler = ATCommandHandler(serial_port_str) + if self.serial_mode == SerialMode.XBEEAPI2AT: + at_handler = ATCommandHandler(self.serial_port_str) return XBeeFrameProcessor(at_handler=at_handler) - logger.warning(f"Unknown serial mode: {serial_mode}, using Raw") + logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw") return RawFrameProcessor() def connection_made(self, transport): """連接建立時的回調""" self.transport = transport + + # XBee 模式下,將 transport.write 注入給 AT handler,讓它能送指令 + if self.serial_mode == SerialMode.XBEEAPI2AT: + self.processor.at_handler.set_writer(self.transport.write) + if hasattr(self.udp_handler, 'set_serial_handler'): self.udp_handler.set_serial_handler(self) logger.debug(f"Serial port {self.serial_port_str} connected") @@ -589,6 +644,37 @@ class serial_manager: ret[key] = obj.serial_port return ret + def send_at_command(self, serial_id, command: bytes, parameter: bytes = b'', frame_id: int = 0x52) -> bool: + """ + 對指定 serial_id 的 XBee dongle 發送一筆 AT 指令(thread-safe) + - serial_id: create_serial_link 取得的編號 + - command: 例如 b'DB' + - parameter: 指令參數,讀取型指令通常為空 + - frame_id: XBee frame id,0x00 代表不要求回應 + 回傳是否成功排進事件圈 + """ + if not self.running or not self.loop: + logger.error("Event loop not running, cannot send AT command") + return False + + if serial_id not in self.serial_objects: + logger.error(f"Serial object {serial_id} not found") + return False + + serial_obj = self.serial_objects[serial_id] + if serial_obj.serial_mode != SerialMode.XBEEAPI2AT: + logger.error( + f"Serial {serial_id} mode is {serial_obj.serial_mode.name}, " + f"AT command only supported in XBEEAPI2AT mode" + ) + return False + + at_handler = serial_obj.serial_handler.processor.at_handler + self.loop.call_soon_threadsafe( + at_handler.send_command, command, parameter, frame_id + ) + return True + @staticmethod def check_serial_port(serial_port, baudrate): """檢查串口是否存在與可用""" @@ -626,19 +712,24 @@ if __name__ == '__main__': sm = serial_manager() sm.start() - SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 - SERIAL_BAUDRATE = 115200 - UDP_REMOTE_PORT = 14561 - sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) - # SERIAL_PORT = '/dev/ttyACM0' # 手動指定 # SERIAL_BAUDRATE = 115200 # UDP_REMOTE_PORT = 14571 # sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT) + SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 + SERIAL_BAUDRATE = 115200 + UDP_REMOTE_PORT = 14561 + sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) + linked_serial = sm.get_serial_link() print(linked_serial) - time.sleep(60) + + # 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試 + time.sleep(5) + for i in range(60): + sm.send_at_command(1, b'DB', frame_id=0x52) + time.sleep(1) sm.remove_serial_link(1) time.sleep(3)