(update) serialManager: 增加 AT 指令處理功能與串口寫入支持,更新版本號至 0.80

Chiyu Chen 2 weeks ago
parent 06463d71bc
commit 7b7e02a9e7

@ -21,14 +21,16 @@ from abc import ABC, abstractmethod
# from xbee.frame import APIFrame
# 自定義的 import
from .utils import setup_logger
from .utils import RingBuffer, setup_logger
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.60"
MODULE_VER = "0.80"
# ====================== 分割線 =====================
rx_module_ack = RingBuffer(capacity=256, buffer_id=253)
# ====================== State DEFINITION =====================
# 定義 serial 連線的模式
class SerialMode(Enum):
@ -37,7 +39,6 @@ class SerialMode(Enum):
XBEEAPI2AT = auto() # XBee API 模式
NOT_USE = auto() # 不使用
# ====================== Frame Processor Base and Implementation =====================
class FrameProcessor(ABC):
@ -139,6 +140,8 @@ class XBeeFrameProcessor(FrameProcessor):
if len(self.buffer) < full_length:
return None
# TODO: 驗證 checksum(sum(frame[3:]) & 0xFF) 應為 0xFF不通過則丟棄此 frame 並從 buffer pop 1 byte 再重新對齊
frame = bytes(self.buffer[:full_length])
del self.buffer[:full_length]
return frame
@ -198,15 +201,22 @@ class XBeeFrameProcessor(FrameProcessor):
class ATCommandHandler:
"""
XBee AT 指令回應處理器
XBee AT 指令處理器
職責
- AT Command Request frame (0x08) 並透過注入的 writer 送出
- 解析 AT Response frame (0x88)
- AT 指令分派到對應的 handler
writer SerialHandler 在連線建立後 (connection_made) 注入
型別為 Callable[[bytes], None]通常就是 serial transport.write
"""
FRAME_TYPE_AT_COMMAND = 0x08
def __init__(self, serial_port: str):
self.serial_port = serial_port
self.writer = None
self.handlers = {
b'DB': self._handle_rssi,
b'SH': self._handle_serial_high,
@ -214,6 +224,45 @@ class ATCommandHandler:
# 可擴展其他 AT 指令
}
# ---- 發送端 ----
def set_writer(self, writer):
"""由 SerialHandler 注入實際寫入 serial 的 callable"""
self.writer = writer
def send_command(self, command: bytes, parameter: bytes, frame_id: int):
"""
發送一筆 AT 指令給 dongle
- command: 2 bytes AT 指令名稱例如 b'DB'
- parameter: 指令參數 bytes讀取用指令通常為空
- frame_id: XBee frame id用來配對回應0x00 表示不要求回應
"""
if self.writer is None:
logger.warning(
f"[{self.serial_port}] AT writer 尚未就緒,指令 {command!r} 丟棄"
)
return
frame_bytes = self._build_at_request(command, parameter, frame_id)
self.writer(frame_bytes)
# logger.debug(
# f"[{self.serial_port}] send AT {command.decode(errors='replace')} "
# f"(frame_id=0x{frame_id:02X})"
# ) # dev
@staticmethod
def _build_at_request(command: bytes, parameter: bytes, frame_id: int) -> bytes:
"""
AT 指令組成 XBee API AT Command Request frame (0x08)
callers 必須明確指定 frame_id
"""
frame_type = ATCommandHandler.FRAME_TYPE_AT_COMMAND
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += command + parameter
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# ---- 接收端 ----
def handle_frame(self, frame: bytes) -> None:
"""接收一整個 AT Response frame內部完成 parse + dispatch"""
parsed = self._parse(frame)
@ -245,6 +294,8 @@ class ATCommandHandler:
def _dispatch(self, response: dict) -> None:
"""根據 AT 指令類型分派處理"""
# print(f"[{self.serial_port}] AT Response: {response}") # dev
if not response['is_ok']:
logger.warning(
f"[{self.serial_port}] AT {response['command'].decode()} "
@ -262,9 +313,9 @@ class ATCommandHandler:
)
def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應"""
# 未來可實現 RSSI 處理邏輯
pass
"""處理 DB (RSSI) 回應:單 byte 無號值,單位 dBm"""
if data:
print(f"[{self.serial_port}] RSSI = -{data[0]} dBm") # dev
def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High)"""
@ -286,24 +337,28 @@ class SerialHandler(asyncio.Protocol):
self.transport = None # Serial 自己的傳輸物件
# 根據模式建立 processor需要 AT handler 時一併在工廠內建好注入)
self.processor = self._create_processor(serial_mode, serial_port_str)
self.processor = self._create_processor()
@staticmethod
def _create_processor(serial_mode: SerialMode, serial_port_str: str) -> FrameProcessor:
"""工廠方法:根據模式建立對應的 processor必要時一併注入 AT handler"""
if serial_mode == SerialMode.STRAIGHT:
def _create_processor(self) -> FrameProcessor:
"""工廠方法:根據 self.serial_mode 建立對應的 processor必要時一併注入 AT handler"""
if self.serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor()
if serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(serial_port_str)
if self.serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(self.serial_port_str)
return XBeeFrameProcessor(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw")
return RawFrameProcessor()
def connection_made(self, transport):
"""連接建立時的回調"""
self.transport = transport
# XBee 模式下,將 transport.write 注入給 AT handler讓它能送指令
if self.serial_mode == SerialMode.XBEEAPI2AT:
self.processor.at_handler.set_writer(self.transport.write)
if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self)
logger.debug(f"Serial port {self.serial_port_str} connected")
@ -589,6 +644,37 @@ class serial_manager:
ret[key] = obj.serial_port
return ret
def send_at_command(self, serial_id, command: bytes, parameter: bytes = b'', frame_id: int = 0x52) -> bool:
"""
對指定 serial_id XBee dongle 發送一筆 AT 指令thread-safe
- serial_id: create_serial_link 取得的編號
- command: 例如 b'DB'
- parameter: 指令參數讀取型指令通常為空
- frame_id: XBee frame id0x00 代表不要求回應
回傳是否成功排進事件圈
"""
if not self.running or not self.loop:
logger.error("Event loop not running, cannot send AT command")
return False
if serial_id not in self.serial_objects:
logger.error(f"Serial object {serial_id} not found")
return False
serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode != SerialMode.XBEEAPI2AT:
logger.error(
f"Serial {serial_id} mode is {serial_obj.serial_mode.name}, "
f"AT command only supported in XBEEAPI2AT mode"
)
return False
at_handler = serial_obj.serial_handler.processor.at_handler
self.loop.call_soon_threadsafe(
at_handler.send_command, command, parameter, frame_id
)
return True
@staticmethod
def check_serial_port(serial_port, baudrate):
"""檢查串口是否存在與可用"""
@ -626,19 +712,24 @@ if __name__ == '__main__':
sm = serial_manager()
sm.start()
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定
# SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14571
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
linked_serial = sm.get_serial_link()
print(linked_serial)
time.sleep(60)
# 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試
time.sleep(5)
for i in range(60):
sm.send_at_command(1, b'DB', frame_id=0x52)
time.sleep(1)
sm.remove_serial_link(1)
time.sleep(3)

Loading…
Cancel
Save