Update serialManager.py to support XBEE new mode (combine ESP32 function)

- Introduce XBeeFrameProcessor_ESPv1 class for handling send and process special MAGIC header for esp32
- Refactored request_discovery and request_poll methods for thread-safe operation.
master
Chiyu Chen 7 days ago
parent ab616ceb54
commit 4c3fe226d6

@ -1650,7 +1650,7 @@ def main():
if mvv.MODULE_VER != "1.10": if mvv.MODULE_VER != "1.10":
print("Module Version Error! : mavlinkVehicleView") print("Module Version Error! : mavlinkVehicleView")
version_check = False version_check = False
if sm.MODULE_VER != "0.80": if sm.MODULE_VER != "2.00":
print("Module Version Error! : serialManager") print("Module Version Error! : serialManager")
version_check = False version_check = False
if version_check == False: if version_check == False:

@ -14,20 +14,23 @@ import signal
import time import time
import threading import threading
import struct import struct
from collections import deque
from enum import Enum, auto from enum import Enum, auto
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable, Optional
# # XBee 模組 # # XBee 模組
# from xbee.frame import APIFrame # from xbee.frame import APIFrame
# 自定義的 import # 自定義的 import
from .utils import RingBuffer, setup_logger from .utils import RingBuffer, setup_logger
from .utils import pollStrategy
# ====================== 分割線 ===================== # ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
MODULE_VER = "0.80" MODULE_VER = "2.00"
rx_module_ack = RingBuffer(capacity=64, buffer_id=253) rx_module_ack = RingBuffer(capacity=64, buffer_id=253)
@ -37,8 +40,8 @@ rx_module_ack = RingBuffer(capacity=64, buffer_id=253)
class SerialMode(Enum): class SerialMode(Enum):
"""連接類型""" """連接類型"""
STRAIGHT = auto() # 原始數據直通 STRAIGHT = auto() # 原始數據直通
XBEEAPI2AT = auto() # XBee API 模式 XBEEAPI2AT = auto() # XBee API-AT 模式
XBEEAPI_POLL = auto() XBEEAPI_espv1 = auto() # XBee API-API 模式 esp v1
NOT_USE = auto() # 不使用 NOT_USE = auto() # 不使用
@ -101,19 +104,28 @@ class RawFrameProcessor(FrameProcessor):
return data return data
class XBeeFrameProcessor(FrameProcessor): class XBeeFrameProcessor_Base(FrameProcessor):
""" """
XBee API 協議處理器 XBee API 協議處理器
處理 XBEE API 端口 對應 -> 遠端 XBEE AT Mode
For SerialMode.XBEEAPI2AT
職責 職責
- XBee API frame 的拆幀 / 組幀 - XBee API frame 的拆幀 / 組幀
- 0x90 (RX Packet) -> 解出 payload 回傳 - 0x90 (RX Packet) -> 解出 payload 回傳
- 0x88 (AT Response) -> 轉交 at_handler 處理若有注入 - 0x88 (AT Response) -> 轉交 at_handler 處理若有注入
- 0x8B (TX Status) -> 目前忽略 - 0x8B (TX Status) -> 目前寫LOG (開發用)
- 其他 frame type -> warning 忽略 - 其他 frame type -> warning 忽略
若未來要做變化型 XBee例如 API2 escape mode不同 addressing 若未來要做變化型 XBee (例如 API2 escape mode不同 addressing)
繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可 繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可
硬體產品系列 (Product Family) : XB3-24
晶片世代: Digi XBee 3 代架構搭載 Silicon Labs EFR32 微控制器
運作頻段: 2.4 GHz RF
硬體構型: TH (Through-Hole)標準針腳插孔式引腳設計
運行協議 (Protocol / Function Set): 802.15.4
韌體版本 (Firmware Version): 2014 -> 2 (XBee 3 平台) + 0 (802.15.4 協議代碼) + 14 (次要版本號)
""" """
# XBee API frame type # XBee API frame type
@ -122,6 +134,10 @@ class XBeeFrameProcessor(FrameProcessor):
FRAME_TYPE_TX_STATUS = 0x8B FRAME_TYPE_TX_STATUS = 0x8B
FRAME_TYPE_RX_PACKET = 0x90 FRAME_TYPE_RX_PACKET = 0x90
# ADDR16 選項
DEST_ADDR16_UNICAST = b'\xFF\xFE'
DEST_ADDR16_BRAODCAST = b'\xFF\xFF'
def __init__(self, at_handler: "ATCommandHandler" = None): def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__() super().__init__()
self.at_handler = at_handler self.at_handler = at_handler
@ -174,11 +190,15 @@ class XBeeFrameProcessor(FrameProcessor):
return None return None
def _dispatch_frame(self, frame: bytes) -> bytes: def _dispatch_frame(self, frame: bytes) -> bytes:
"""根據 frame type 分派;若是 RX payload 回傳 bytes其餘回傳 None""" """
根據 frame type 分派
1. 若是 RX payload return bytes 遞交出去
2. 若是 AT command 進到相依類別 ATCommandHandler 處理
"""
frame_type = frame[3] frame_type = frame[3]
if frame_type == self.FRAME_TYPE_RX_PACKET: # mavlink if frame_type == self.FRAME_TYPE_RX_PACKET: # mavlink
return self._decapsulate(frame) return self._decapsulate(frame)[0]
if frame_type == self.FRAME_TYPE_AT_RESPONSE: # AT command if frame_type == self.FRAME_TYPE_AT_RESPONSE: # AT command
if self.at_handler is not None: if self.at_handler is not None:
@ -186,6 +206,12 @@ class XBeeFrameProcessor(FrameProcessor):
return None return None
if frame_type == self.FRAME_TYPE_TX_STATUS: if frame_type == self.FRAME_TYPE_TX_STATUS:
length = (frame[1] << 8) | frame[2] # for debug
logger.info(
f"TX Status raw={frame.hex()}, api_len={length}, "
f"fid=0x{frame[4]:02X}, dest16=0x{(frame[5]<<8)|frame[6]:04X}, "
f"retry={frame[7]}, delivery={frame[8]}, discovery={frame[9]}"
) # for debug
return None return None
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}") logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
@ -195,7 +221,8 @@ class XBeeFrameProcessor(FrameProcessor):
@staticmethod @staticmethod
def _encapsulate( def _encapsulate(
data: bytes, data: bytes,
dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00',
dest_addr16 = DEST_ADDR16_BRAODCAST,
frame_id: int = 0x01, frame_id: int = 0x01,
) -> bytes: ) -> bytes:
""" """
@ -203,8 +230,7 @@ class XBeeFrameProcessor(FrameProcessor):
- 使用廣播地址 - 使用廣播地址
- 添加適當的頭部和校驗和 - 添加適當的頭部和校驗和
""" """
frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST frame_type = XBeeFrameProcessor_Base.FRAME_TYPE_TX_REQUEST
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00 broadcast_radius = 0x00
options = 0x00 options = 0x00
@ -219,7 +245,519 @@ class XBeeFrameProcessor(FrameProcessor):
"""從 RX Packet (0x90) 取出 payload""" """從 RX Packet (0x90) 取出 payload"""
length = (frame[1] << 8) | frame[2] length = (frame[1] << 8) | frame[2]
rf_data_start = 3 + 12 rf_data_start = 3 + 12
return frame[rf_data_start:3 + length] payload = frame[rf_data_start:3 + length]
senderAddr = frame[4:12]
return payload, senderAddr
class XBeeFrameProcessor_ESPv1(XBeeFrameProcessor_Base):
# GCS -> UAV:
# DISC
# POLL + esp_sysid(1) + grant_bytes(2)
#
# UAV -> GCS:
# HELO + esp_sysid(1)
# DONE + sysid(1) + sent_len(2) + remain_len(2)
DISC_HEADER = b'DISC'
HELLO_HEADER = b'HELO'
POLL_HEADER = b'POLL'
DONE_HEADER = b'DONE'
MAX_BYTES_PER_FLUSH = 150
MAX_PAYLOAD_PER_FRAME = 80
CHUNK_SEND_INTERVAL_SEC = 0.01
class Esp32DeviceInfo:
def __init__(self, system_id, address_64, last_hello_time):
self.system_id = system_id
self.address_64 = address_64
self.last_hello_time = last_hello_time
self.remain_bytes = 0 # 剩餘 buffer 量
self.last_done_time = 0.0 # 最後送出Done的時間
self.received_len = 0 # 收到封包累計
def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__(at_handler)
self.max_discovery_window_ms = 220
self.is_discovery_phase = False
self.esp32_address_mapping = {}
self.operator_busy = False
self.operator_running = False
self.serial_writer: Optional[Callable[[bytes], None]] = None
self.event_loop: Optional[asyncio.AbstractEventLoop] = None
self.serial_baudrate = 115200
self.gcs_transmit_queue: deque[bytearray] = deque()
self.poll_scheduler_state = pollStrategy.PollSchedulerState()
self.command_pending_event: Optional[asyncio.Event] = None
self.poll_done_event: Optional[asyncio.Event] = None
self.current_poll_address_64: Optional[bytes] = None
self.last_discovery_time = 0.0
self.discovery_interval_seconds = 30.0 # 每次做 discovery 程序的間隔時間
self.device_offline_timeout = self.discovery_interval_seconds * 2 # 遠端沒有回應會被踢出 超時時限
self.operator_tick_interval_seconds = 0.03 #
self.guard_milliseconds = 50 # POLL DONE 的保底時間間隔
self.pending_manual_discovery = False
self.pending_manual_poll: Optional[tuple[int, Optional[int]]] = None
# ---- 注入與設定 ----
def set_writer(self, writer: Callable[[bytes], None]) -> None:
self.serial_writer = writer
def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None:
self.event_loop = loop
self._ensure_async_primitives()
def set_serial_baudrate(self, baudrate: int) -> None:
self.serial_baudrate = baudrate
def _ensure_async_primitives(self) -> None:
if self.command_pending_event is None:
self.command_pending_event = asyncio.Event()
if self.poll_done_event is None:
self.poll_done_event = asyncio.Event()
# 有急事 叫醒 operator 做下一個 tick
def wake_operator(self) -> None:
self._ensure_async_primitives()
self.command_pending_event.set()
# ---- 拆幀分派 ----
def _dispatch_frame(self, frame: bytes) -> Optional[bytes]:
frame_type = frame[3]
if frame_type == self.FRAME_TYPE_RX_PACKET:
payload, sender_address_64 = self._decapsulate(frame)
if payload.startswith(self.HELLO_HEADER) and len(payload) == 5:
self.handle_hello_report(payload, sender_address_64)
return None
if payload.startswith(self.DONE_HEADER) and len(payload) == 9:
self.handle_done_report(payload, sender_address_64)
return None
remote_device = self.esp32_address_mapping.get(sender_address_64)
if remote_device is not None:
remote_device.received_len += len(payload)
return payload
if frame_type == self.FRAME_TYPE_AT_RESPONSE:
if self.at_handler is not None:
self.at_handler.handle_frame(frame)
return None
if frame_type == self.FRAME_TYPE_TX_STATUS:
length = (frame[1] << 8) | frame[2]
logger.debug(
f"TX Status raw={frame.hex()}, api_len={length}, "
f"fid=0x{frame[4]:02X}, dest16=0x{(frame[5]<<8)|frame[6]:04X}, "
f"retry={frame[7]}, delivery={frame[8]}, discovery={frame[9]}"
)
return None
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return None
# ---- DISC / POLL 封裝 ----
def pack_discovery(self) -> bytes:
return self._encapsulate(self.DISC_HEADER, frame_id=0x00)
def handle_hello_report(self, payload: bytes, sender_address_64: bytes) -> None:
system_id = payload[4]
remote_device = self.esp32_address_mapping.get(sender_address_64)
if remote_device is None:
self.esp32_address_mapping[sender_address_64] = self.Esp32DeviceInfo(
system_id, sender_address_64, time.time()
)
logger.debug(
f"new HELO system_id={system_id}, address_64={sender_address_64.hex()}"
)
elif remote_device.address_64 == sender_address_64:
remote_device.last_hello_time = time.time()
else:
logger.warning(
f"SYSID duplicated system_id={system_id}, "
f"address_64={sender_address_64.hex()}"
)
def pack_poll(self, target_address_64: bytes, grant_bytes: int = 0) -> Optional[bytes]:
remote_device = self.esp32_address_mapping.get(target_address_64)
if remote_device is None:
return None
poll_payload = self.POLL_HEADER + struct.pack(
'>BH', remote_device.system_id, grant_bytes
)
remote_device.received_len = 0
return self._encapsulate(
poll_payload,
dest_addr64=remote_device.address_64,
dest_addr16=self.DEST_ADDR16_UNICAST,
frame_id=0x02,
)
def handle_done_report(self, payload: bytes, sender_address_64: bytes) -> None:
remote_device = self.esp32_address_mapping.get(sender_address_64)
if remote_device is None:
return
system_id, sent_length, remain_length = struct.unpack('>BHH', payload[4:9])
if sent_length != remote_device.received_len:
logger.info(
f"POLL may be missing packets sent={sent_length} "
f"received={remote_device.received_len} system_id={system_id}"
)
remote_device.received_len = 0
remote_device.remain_bytes = remain_length
remote_device.last_done_time = time.time()
if (
self.current_poll_address_64 is not None
and sender_address_64 == self.current_poll_address_64
and self.poll_done_event is not None
):
self.poll_done_event.set()
# ---- UDP到Serial 佇列 ----
def enqueue_gcs_transmit(self, payload: bytes) -> None:
if not payload:
return
self.gcs_transmit_queue.append(bytearray(payload))
# 只是 show 狀態
def get_gcs_queue_byte_count(self) -> int:
return sum(len(packet) for packet in self.gcs_transmit_queue)
# 只是 show 狀態
def get_gcs_queue_packet_count(self) -> int:
return len(self.gcs_transmit_queue)
# 把 gcs_transmit_queue 的 mavlink 封包依照大小打包出來
def _pop_flush_batch(self, max_bytes: int) -> list[bytes]:
if not self.gcs_transmit_queue:
return []
batch: list[bytes] = []
total_bytes = 0
while self.gcs_transmit_queue:
next_packet = bytes(self.gcs_transmit_queue[0])
packet_length = len(next_packet)
if not batch:
batch.append(self.gcs_transmit_queue.popleft())
total_bytes = packet_length
if packet_length > max_bytes:
break
continue
if total_bytes + packet_length <= max_bytes:
batch.append(self.gcs_transmit_queue.popleft())
total_bytes += packet_length
else:
break
return batch
# 把數據封裝好以後 交給 serial_writer 排程送出
async def _send_gcs_packet(self, packet: bytes) -> None:
# 重複判斷
# if self.serial_writer is None:
# return
sent_offset = 0
while sent_offset < len(packet):
chunk_end = min(
sent_offset + self.MAX_PAYLOAD_PER_FRAME,
len(packet),
)
chunk = packet[sent_offset:chunk_end]
sent_offset = chunk_end
self.serial_writer(self._encapsulate(chunk))
await asyncio.sleep(self.CHUNK_SEND_INTERVAL_SEC)
# 把上面兩個步驟打包起來 處理從 UDP 來的資訊 丟給 Serial
async def flush_gcs_transmit_queue(
self,
max_bytes: int = MAX_BYTES_PER_FLUSH,
) -> None:
if self.serial_writer is None:
logger.warning("GCS flush skipped: serial writer not ready")
return
for packet in self._pop_flush_batch(max_bytes):
await self._send_gcs_packet(packet)
# ---- POLL 排程輔助 ----
# 計算下次要 poll 的對象跟大小
def _pick_poll_target(self) -> tuple[Optional[bytes], int]:
poll_devices = [
pollStrategy.PollDevice(
address_64=address_64,
# system_id=device.system_id,
remain_bytes=device.remain_bytes,
last_done_time=device.last_done_time,
)
for address_64, device in self.esp32_address_mapping.items()
]
return pollStrategy.pick_next(
poll_devices,
self.poll_scheduler_state
)
# 移除長時間沒有 HELLO 或 DONE 的 Dongle
def _prune_stale_devices(self) -> None:
now = time.time()
stale_addresses = [
address_64
for address_64, device in self.esp32_address_mapping.items()
if (now - device.last_hello_time > self.device_offline_timeout) and \
(now - device.last_done_time > self.device_offline_timeout)
]
for address_64 in stale_addresses:
device = self.esp32_address_mapping.pop(address_64, None)
if device is not None:
logger.info(
f"Removed stale device system_id={device.system_id} "
f"address_64={address_64.hex()}"
)
# 判斷是否進入 discovery 程序
def _should_run_discovery(self) -> bool:
# 條件1. 目前沒有任何遠端ESP裝置被紀錄 或者 手動啟動
if (not self.esp32_address_mapping) or (self.pending_manual_discovery):
return True
# 條件2. 每個固定週期 會做一次
return (
time.time() - self.last_discovery_time
>= self.discovery_interval_seconds
)
# ---- 手動請求thread-safe 對外 API----
def request_discovery(self) -> bool:
"""
手動觸發 discovery 可從任意 thread 呼叫
實際排程在 serial_manager event loop 內執行
"""
if self.event_loop is None:
logger.warning("ESPv1 request_discovery: event loop not ready")
return False
if not self.operator_running:
logger.warning("ESPv1 request_discovery: operator not running")
return False
self.event_loop.call_soon_threadsafe(self._apply_request_discovery)
return True
def request_poll(
self,
target_system_id: int,
grant_bytes: Optional[int] = None,
) -> bool:
"""
手動對指定 system_id 排入一輪 POLL 可從任意 thread 呼叫
實際排程在 serial_manager event loop 內執行
"""
if self.event_loop is None:
logger.warning("ESPv1 request_poll: event loop not ready")
return False
if not self.operator_running:
logger.warning("ESPv1 request_poll: operator not running")
return False
self.event_loop.call_soon_threadsafe(
self._apply_request_poll,
target_system_id,
grant_bytes,
)
return True
def _apply_request_discovery(self) -> None:
self.pending_manual_discovery = True
self.wake_operator()
def _apply_request_poll(
self,
target_system_id: int,
grant_bytes: Optional[int],
) -> None:
self.pending_manual_poll = (target_system_id, grant_bytes)
self.wake_operator()
def get_status_snapshot(self) -> dict:
"""
唯讀狀態快照 for debug
從其他 thread 讀取時不保證與 operator 原子一致
"""
now = time.time()
devices = []
for address_64, device in self.esp32_address_mapping.items():
devices.append({
"system_id": device.system_id,
"address_64": address_64.hex(),
"remain_bytes": device.remain_bytes,
"last_hello_age_seconds": now - device.last_hello_time,
"last_done_age_seconds": (
now - device.last_done_time if device.last_done_time else None
),
})
return {
"operator_busy": self.operator_busy,
"is_discovery_phase": self.is_discovery_phase,
"gcs_queue_bytes": self.get_gcs_queue_byte_count(),
"gcs_queue_packets": self.get_gcs_queue_packet_count(),
"devices": devices,
}
# ---- operator 主循環 ----
# operator 的運行鐘 有點像是 spin_once
async def operator_loop(self) -> None:
self._ensure_async_primitives()
logger.info("ESPv1 operator loop started")
while self.operator_running:
try:
await self._operator_tick() # TODO 最好不要在 while loop 塞 try 想辦法改掉
except asyncio.CancelledError:
raise
except Exception as exc:
logger.error(f"ESPv1 operator tick error: {exc}")
await self._wait_for_next_tick()
logger.info("ESPv1 operator loop stopped")
# 安排啥時要醒來做事
async def _wait_for_next_tick(self) -> None:
self.command_pending_event.clear()
# 固定時間醒來
sleep_task = asyncio.create_task( asyncio.sleep(self.operator_tick_interval_seconds) )
# 有"急事"被叫醒
wake_task = asyncio.create_task(self.command_pending_event.wait())
done, pending = await asyncio.wait(
{sleep_task, wake_task},
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
async def _operator_tick(self) -> None:
# 1. 移除沒反應 dongle
self._prune_stale_devices()
# 2. 忙碌時 略過這次循環
if self.operator_busy:
return
# 3. (最優先) 處理從 UDP 過來的封包 並且透過 Serial 送出
if self.gcs_transmit_queue:
await self.flush_gcs_transmit_queue()
return
# 4. 檢測要不要跑 discovery 程序
if self._should_run_discovery():
await self._run_discovery()
return
# 5. POLL 程序 (若有手動先執行 若無自動則策略決策)
if self.pending_manual_poll is not None:
target_system_id, grant_bytes = self.pending_manual_poll
self.pending_manual_poll = None
target_address = self._find_address_by_system_id(target_system_id)
else:
target_address, grant_bytes = self._pick_poll_target()
if target_address is None:
return
await self._run_one_poll(target_address, grant_bytes if grant_bytes is not None else 0)
def _find_address_by_system_id(self, target_system_id: int) -> Optional[bytes]:
for address_64, device in self.esp32_address_mapping.items():
if device.system_id == target_system_id:
return address_64
return None
# discovery 程序
async def _run_discovery(self) -> None:
if self.serial_writer is None:
self.pending_manual_discovery = False
return
self.operator_busy = True
self.is_discovery_phase = True
self.serial_writer(self.pack_discovery())
self.last_discovery_time = time.time()
await asyncio.sleep(self.max_discovery_window_ms / 1000.0)
self.operator_busy = False
self.is_discovery_phase = False
self.pending_manual_discovery = False
# poll 程序
async def _run_one_poll(self, target_address_64: bytes, grant_bytes: int) -> None:
if self.serial_writer is None:
return
# 這邊把要求封包最小量放在36 是讓載具端正好可以回傳一個有簽章的 HEARTBEAT 封包的大小
# 算是 "探測封包" 這樣讓系統可以更快速的知道載具端殘餘的資料量
grant_bytes = max(36, min(int(grant_bytes), 65535))
poll_frame = self.pack_poll(target_address_64, grant_bytes)
if poll_frame is None:
return
self._ensure_async_primitives()
self.operator_busy = True
self.current_poll_address_64 = target_address_64
self.poll_done_event.clear()
self.serial_writer(poll_frame)
timeout_seconds = pollStrategy.estimate_poll_timeout(
grant_bytes,
self.serial_baudrate,
self.guard_milliseconds,
)
try:
await asyncio.wait_for(
self.poll_done_event.wait(),
timeout=timeout_seconds,
)
except asyncio.TimeoutError:
logger.warning(
f"POLL timeout address_64={target_address_64.hex()} "
f"grant_bytes={grant_bytes}"
)
finally:
self.operator_busy = False
self.current_poll_address_64 = None
await asyncio.sleep(self.guard_milliseconds / 1000.0)
def stop_operator(self) -> None:
self.operator_running = False
self.gcs_transmit_queue.clear()
self.wake_operator()
# ====================== Dongle Command Handler ===================== # ====================== Dongle Command Handler =====================
@ -349,7 +887,6 @@ class ATCommandHandler:
"""處理 SL (Serial Number Low)""" """處理 SL (Serial Number Low)"""
pass pass
# ================ Serial UDP Socket Object ============== # ================ Serial UDP Socket Object ==============
class SerialHandler(asyncio.Protocol): class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發""" """asyncio.Protocol 用於處理 Serial 收發"""
@ -368,13 +905,13 @@ class SerialHandler(asyncio.Protocol):
if self.serial_mode == SerialMode.STRAIGHT: if self.serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor() return RawFrameProcessor()
if self.serial_mode == SerialMode.XBEEAPI2AT: elif self.serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(self.serial_port_str) at_handler = ATCommandHandler(self.serial_port_str)
return XBeeFrameProcessor(at_handler=at_handler) return XBeeFrameProcessor_Base(at_handler=at_handler)
# if self.serial_mode == SerialMode.XBEEAPI_POLL: elif self.serial_mode == SerialMode.XBEEAPI_espv1:
# at_handler = ATCommandHandler_new(self.serial_port_str) at_handler = ATCommandHandler(self.serial_port_str)
# return XBeeFrameProcessor(at_handler=at_handler) return XBeeFrameProcessor_ESPv1(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw") logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw")
return RawFrameProcessor() return RawFrameProcessor()
@ -387,6 +924,13 @@ class SerialHandler(asyncio.Protocol):
if self.serial_mode == SerialMode.XBEEAPI2AT: if self.serial_mode == SerialMode.XBEEAPI2AT:
self.processor.at_handler.set_writer(self.transport.write) self.processor.at_handler.set_writer(self.transport.write)
elif self.serial_mode == SerialMode.XBEEAPI_espv1:
if isinstance(self.processor, XBeeFrameProcessor_ESPv1):
self.processor.set_writer(self.transport.write)
self.processor.set_event_loop(asyncio.get_running_loop())
if self.processor.at_handler is not None:
self.processor.at_handler.set_writer(self.transport.write)
if hasattr(self.udp_handler, 'set_serial_handler'): if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self) self.udp_handler.set_serial_handler(self)
logger.debug(f"Serial port {self.serial_port_str} connected") logger.debug(f"Serial port {self.serial_port_str} connected")
@ -430,10 +974,14 @@ class UDPHandler(asyncio.DatagramProtocol):
logger.warning("Serial handler not set, dropping UDP packet") logger.warning("Serial handler not set, dropping UDP packet")
return return
# 使用 processor 封裝數據 if self.serial_mode == SerialMode.XBEEAPI_espv1:
processed_data = self.serial_handler.processor.process_outgoing(data) processor = self.serial_handler.processor
# if isinstance(processor, XBeeFrameProcessor_ESPv1): # 預設唯一綁定 少一個 if 多一點效率
processor.enqueue_gcs_transmit(data)
processor.wake_operator()
return
# 發送到串口 processed_data = self.serial_handler.processor.process_outgoing(data)
self.serial_handler.transport.write(processed_data) self.serial_handler.transport.write(processed_data)
@ -454,6 +1002,7 @@ class serial_manager:
self.protocol = None self.protocol = None
self.udp_handler = None self.udp_handler = None
self.serial_handler = None self.serial_handler = None
self.operator_task = None
def __init__(self): def __init__(self):
self.thread = None self.thread = None
@ -594,6 +1143,16 @@ class serial_manager:
logger.debug(f"Serial connection created for {serial_port}") logger.debug(f"Serial connection created for {serial_port}")
if serial_mode == SerialMode.XBEEAPI_espv1:
processor = serial_obj.serial_handler.processor
if isinstance(processor, XBeeFrameProcessor_ESPv1):
processor.set_serial_baudrate(baudrate)
processor.operator_running = True
serial_obj.operator_task = asyncio.create_task(
processor.operator_loop()
)
logger.debug(f"ESPv1 operator task started for {serial_port}")
# 將 serial_object 加入管理列表 # 將 serial_object 加入管理列表
serial_id = self.serial_count + 1 serial_id = self.serial_count + 1
self.serial_objects[serial_id] = serial_obj self.serial_objects[serial_id] = serial_obj
@ -647,6 +1206,18 @@ class serial_manager:
try: try:
serial_obj = self.serial_objects[serial_id] serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode == SerialMode.XBEEAPI_espv1:
processor = serial_obj.serial_handler.processor
if isinstance(processor, XBeeFrameProcessor_ESPv1):
processor.stop_operator()
if serial_obj.operator_task is not None:
serial_obj.operator_task.cancel()
try:
await serial_obj.operator_task
except asyncio.CancelledError:
pass
serial_obj.operator_task = None
# 關閉 UDP transport # 關閉 UDP transport
if hasattr(serial_obj, 'transport') and serial_obj.transport: if hasattr(serial_obj, 'transport') and serial_obj.transport:
serial_obj.transport.close() serial_obj.transport.close()
@ -687,6 +1258,7 @@ class serial_manager:
logger.error(f"Serial object {serial_id} not found") logger.error(f"Serial object {serial_id} not found")
return False return False
# TODO 這邊的防呆 應該可以不用 if 有空再改
serial_obj = self.serial_objects[serial_id] serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode != SerialMode.XBEEAPI2AT: if serial_obj.serial_mode != SerialMode.XBEEAPI2AT:
logger.error( logger.error(
@ -699,6 +1271,18 @@ class serial_manager:
self.loop.call_soon_threadsafe(at_handler.send_command, request) self.loop.call_soon_threadsafe(at_handler.send_command, request)
return True return True
def get_espv1_processor(self, serial_id: int) -> Optional[XBeeFrameProcessor_ESPv1]:
"""依 serial_id 取得 ESPv1 processor 不存在或模式不符回傳 None。"""
if serial_id not in self.serial_objects:
return None
serial_obj = self.serial_objects[serial_id]
if serial_obj.serial_mode != SerialMode.XBEEAPI_espv1:
return None
processor = serial_obj.serial_handler.processor
if not isinstance(processor, XBeeFrameProcessor_ESPv1):
return None
return processor
@staticmethod @staticmethod
def check_serial_port(serial_port, baudrate): def check_serial_port(serial_port, baudrate):
"""檢查串口是否存在與可用""" """檢查串口是否存在與可用"""
@ -736,42 +1320,100 @@ if __name__ == '__main__':
sm = serial_manager() sm = serial_manager()
sm.start() sm.start()
# # 測試項一
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定 # SERIAL_PORT = '/dev/ttyACM0' # 手動指定
# SERIAL_BAUDRATE = 115200 # SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14571 # UDP_REMOTE_PORT = 14571
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT) # sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 # # 測試項二
# SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
# SERIAL_BAUDRATE = 115200
# UDP_REMOTE_PORT = 14561
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# linked_serial = sm.get_serial_link()
# print(linked_serial)
# # 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試
# time.sleep(5)
# rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52)
# for i in range(60):
# sm.send_at_command(1, rssi_request)
# time.sleep(1)
# sm.remove_serial_link(1)
# time.sleep(3)
# sm.shutdown()
# # 測試項三
SERIAL_PORT = '/dev/ttyUSB0'
SERIAL_BAUDRATE = 115200 SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14561 UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI_espv1)
linked_serial = sm.get_serial_link() time.sleep(2) # 等 serial 連線與 operator 啟動
print(linked_serial)
# 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試 serial_id = 1
time.sleep(5) processor = sm.get_espv1_processor(serial_id)
rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52) if processor is not None:
for i in range(60): processor.request_discovery()
sm.send_at_command(1, rssi_request) processor.request_poll(target_system_id=1)
time.sleep(1) processor.request_poll(target_system_id=1, grant_bytes=200)
print(processor.get_status_snapshot())
print(processor.get_gcs_queue_byte_count())
sm.remove_serial_link(1) sm.remove_serial_link(serial_id)
time.sleep(3) time.sleep(3)
sm.shutdown() sm.shutdown()
''' '''
================= 改版記錄 ============================ ================= 改版記錄 ============================
2026.4.20 2026.4.20
1. XBeeFrameHandler 結構移除 1. XBeeFrameHandler 結構移除
2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler ) 2. XBeeFrameProcessor_Base 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler )
3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包 3. XBeeFrameProcessor_Base 新增 _try_extract_frame 處理被可能截斷的 UART 封包
4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler 4. XBeeFrameProcessor_Base 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler
5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response 5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response
6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX 6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX
7. ATCommandHandler 新增各項 _handle_XXX (未實作) 7. ATCommandHandler 新增各項 _handle_XXX (未實作)
2026.06.15
1. 修改 XBeeFrameProcessor_Base _decapsulate 使其解出發送端 dongle src64 地址
2. 新增 XBeeFrameProcessor_ESPv1 這個類別繼承 XBeeFrameProcessor_Base
3. 新增 XBeeFrameProcessor_ESPv1 組合 DISC POLL 訊息 處理 HELO DONE 的能力
4. DISC 自動化與手動
5. POLL 的自動化
2026.06.16
1. 移除 DongleCommandHandler_ESPv1
2. ESPv1 對外 APIrequest_discovery / request_poll移至 XBeeFrameProcessor_ESPv1thread-safe
3. serial_manager 僅保留 get_espv1_processor(serial_id) lookup
註解1 : FRAME_TYPE_TX_STATUS 的對應解說 (我不喜歡程式塞太多為了顯示而顯示的東西 錯誤碼自己下來看)
TX_DELIVERY_STATUS
0x00: "Success",
0x01: "No ACK received",
0x02: "CCA failure",
0x15: "Invalid destination endpoint",
0x21: "Network ACK failure",
0x22: "Not joined to network",
0x23: "Self-addressed",
0x24: "Address not found",
0x25: "Route not found",
0x26: "Broadcast source relay",
0x27: "Insufficient data",
0x28: "TX buffered",
0x32: "Invalid send flag",
0x74: "Resource error",
TX_DISCOVERY_STATUS
0x00: "No discovery overhead",
0x01: "Address discovery",
0x02: "Route discovery",
0x03: "Address and route discovery",
''' '''

@ -0,0 +1,71 @@
"""
POLL 輪詢策略 XBeeFrameProcessor_ESPv1 使用
不依賴 serialManager僅接受裝置快照與排程狀態
"""
from dataclasses import dataclass
@dataclass(frozen=True)
class PollDevice:
"""從 esp32AddrMapping 抽出的唯讀快照。"""
address_64: bytes
# system_id: int
remain_bytes: int
last_done_time: float
@dataclass
class PollSchedulerState:
"""每條 serial link 一份,保存 round-robin 索引。"""
round_robin_index: int = 0
def pick_next(
devices: list[PollDevice],
scheduler_state: PollSchedulerState,
# default_grant_bytes: int = 600,
) -> tuple[bytes | None, int]:
"""
選下一個 POLL 目標
回傳 (target_address_64, grant_bytes)devices 為空時回傳 (None, 0)
"""
if not devices:
return None, 0
device_count = len(devices)
selected_index = scheduler_state.round_robin_index % device_count
selected_device = devices[selected_index]
scheduler_state.round_robin_index += 1
grant_bytes = 0
if selected_device.remain_bytes > 0:
grant_bytes = min( 65535, max(0, selected_device.remain_bytes))
return selected_device.address_64, grant_bytes
def estimate_poll_timeout(
grant_bytes: int,
baudrate: int,
guard_milliseconds: int = 20,
) -> float:
"""
估算 POLL 後等待 DONE 的超時秒數
移植自 udptest8 estimate_tdma_timeout
"""
grant_bytes = max(0, int(grant_bytes))
max_payload_per_chunk = 100
chunk_count = max(1, (grant_bytes + max_payload_per_chunk - 1) // max_payload_per_chunk)
uart_time = (grant_bytes + chunk_count * 18 + 32) * 10.0 / baudrate
timeout = (
uart_time
+ chunk_count * 0.012
+ (guard_milliseconds / 1000.0)
+ 0.35
)
return timeout
Loading…
Cancel
Save