From 4c3fe226d6a53ebdd39531c787651bff2308482a Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Tue, 16 Jun 2026 17:14:39 +0800 Subject: [PATCH] Update serialManager.py to support XBEE new mode (combine ESP32 function) - Introduce XBeeFrameProcessor_ESPv1 class for handling send and process special MAGIC header for esp32 - Refactored request_discovery and request_poll methods for thread-safe operation. --- .../fc_network_adapter/mainOrchestrator.py | 2 +- .../fc_network_adapter/serialManager.py | 720 +++++++++++++++++- .../fc_network_adapter/utils/pollStrategy.py | 71 ++ 3 files changed, 753 insertions(+), 40 deletions(-) create mode 100644 src/fc_network_adapter/fc_network_adapter/utils/pollStrategy.py diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py index 67bc1f4..a07a330 100644 --- a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -1650,7 +1650,7 @@ def main(): if mvv.MODULE_VER != "1.10": print("Module Version Error! : mavlinkVehicleView") version_check = False - if sm.MODULE_VER != "0.80": + if sm.MODULE_VER != "2.00": print("Module Version Error! : serialManager") version_check = False if version_check == False: diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index e4051fc..8ad482c 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -14,20 +14,23 @@ import signal import time import threading import struct +from collections import deque from enum import Enum, auto from abc import ABC, abstractmethod from dataclasses import dataclass +from typing import Callable, Optional # # XBee 模組 # from xbee.frame import APIFrame # 自定義的 import from .utils import RingBuffer, setup_logger +from .utils import pollStrategy # ====================== 分割線 ===================== logger = setup_logger(os.path.basename(__file__)) -MODULE_VER = "0.80" +MODULE_VER = "2.00" rx_module_ack = RingBuffer(capacity=64, buffer_id=253) @@ -37,8 +40,8 @@ rx_module_ack = RingBuffer(capacity=64, buffer_id=253) class SerialMode(Enum): """連接類型""" STRAIGHT = auto() # 原始數據直通 - XBEEAPI2AT = auto() # XBee API 模式 - XBEEAPI_POLL = auto() + XBEEAPI2AT = auto() # XBee API-AT 模式 + XBEEAPI_espv1 = auto() # XBee API-API 模式 esp v1 NOT_USE = auto() # 不使用 @@ -101,19 +104,28 @@ class RawFrameProcessor(FrameProcessor): return data -class XBeeFrameProcessor(FrameProcessor): +class XBeeFrameProcessor_Base(FrameProcessor): """ XBee API 協議處理器 + 處理 XBEE API 端口 對應 -> 遠端 XBEE AT Mode + For SerialMode.XBEEAPI2AT 職責: - XBee API frame 的拆幀 / 組幀 - 0x90 (RX Packet) -> 解出 payload 回傳 - 0x88 (AT Response) -> 轉交 at_handler 處理(若有注入) - - 0x8B (TX Status) -> 目前忽略 + - 0x8B (TX Status) -> 目前寫LOG (開發用) - 其他 frame type -> 記 warning 忽略 - 若未來要做變化型 XBee(例如 API2 escape mode、不同 addressing), + 若未來要做變化型 XBee (例如 API2 escape mode、不同 addressing) 繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可。 + + 硬體產品系列 (Product Family) : XB3-24 + 晶片世代: Digi XBee 3 代架構(搭載 Silicon Labs EFR32 微控制器)。 + 運作頻段: 2.4 GHz RF。 + 硬體構型: TH (Through-Hole),標準針腳插孔式引腳設計。 + 運行協議 (Protocol / Function Set): 802.15.4 + 韌體版本 (Firmware Version): 2014 -> 2 (XBee 3 平台) + 0 (802.15.4 協議代碼) + 14 (次要版本號)。 """ # XBee API frame type @@ -122,6 +134,10 @@ class XBeeFrameProcessor(FrameProcessor): FRAME_TYPE_TX_STATUS = 0x8B FRAME_TYPE_RX_PACKET = 0x90 + # ADDR16 選項 + DEST_ADDR16_UNICAST = b'\xFF\xFE' + DEST_ADDR16_BRAODCAST = b'\xFF\xFF' + def __init__(self, at_handler: "ATCommandHandler" = None): super().__init__() self.at_handler = at_handler @@ -174,11 +190,15 @@ class XBeeFrameProcessor(FrameProcessor): return None def _dispatch_frame(self, frame: bytes) -> bytes: - """根據 frame type 分派;若是 RX payload 回傳 bytes,其餘回傳 None""" + """ + 根據 frame type 分派 + 1. 若是 RX payload 則 return bytes 遞交出去 + 2. 若是 AT command 則 進到相依類別 ATCommandHandler 處理 + """ frame_type = frame[3] if frame_type == self.FRAME_TYPE_RX_PACKET: # mavlink - return self._decapsulate(frame) + return self._decapsulate(frame)[0] if frame_type == self.FRAME_TYPE_AT_RESPONSE: # AT command if self.at_handler is not None: @@ -186,6 +206,12 @@ class XBeeFrameProcessor(FrameProcessor): return None if frame_type == self.FRAME_TYPE_TX_STATUS: + length = (frame[1] << 8) | frame[2] # for debug + logger.info( + f"TX Status raw={frame.hex()}, api_len={length}, " + f"fid=0x{frame[4]:02X}, dest16=0x{(frame[5]<<8)|frame[6]:04X}, " + f"retry={frame[7]}, delivery={frame[8]}, discovery={frame[9]}" + ) # for debug return None logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}") @@ -195,7 +221,8 @@ class XBeeFrameProcessor(FrameProcessor): @staticmethod def _encapsulate( data: bytes, - dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', + dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00', + dest_addr16 = DEST_ADDR16_BRAODCAST, frame_id: int = 0x01, ) -> bytes: """ @@ -203,8 +230,7 @@ class XBeeFrameProcessor(FrameProcessor): - 使用廣播地址 - 添加適當的頭部和校驗和 """ - frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST - dest_addr16 = b'\xFF\xFE' + frame_type = XBeeFrameProcessor_Base.FRAME_TYPE_TX_REQUEST broadcast_radius = 0x00 options = 0x00 @@ -219,7 +245,519 @@ class XBeeFrameProcessor(FrameProcessor): """從 RX Packet (0x90) 取出 payload""" length = (frame[1] << 8) | frame[2] rf_data_start = 3 + 12 - return frame[rf_data_start:3 + length] + payload = frame[rf_data_start:3 + length] + senderAddr = frame[4:12] + return payload, senderAddr + + +class XBeeFrameProcessor_ESPv1(XBeeFrameProcessor_Base): + + # GCS -> UAV: + # DISC + # POLL + esp_sysid(1) + grant_bytes(2) + # + # UAV -> GCS: + # HELO + esp_sysid(1) + # DONE + sysid(1) + sent_len(2) + remain_len(2) + + DISC_HEADER = b'DISC' + HELLO_HEADER = b'HELO' + POLL_HEADER = b'POLL' + DONE_HEADER = b'DONE' + + MAX_BYTES_PER_FLUSH = 150 + MAX_PAYLOAD_PER_FRAME = 80 + CHUNK_SEND_INTERVAL_SEC = 0.01 + + class Esp32DeviceInfo: + def __init__(self, system_id, address_64, last_hello_time): + self.system_id = system_id + self.address_64 = address_64 + self.last_hello_time = last_hello_time + + self.remain_bytes = 0 # 剩餘 buffer 量 + self.last_done_time = 0.0 # 最後送出Done的時間 + self.received_len = 0 # 收到封包累計 + + def __init__(self, at_handler: "ATCommandHandler" = None): + super().__init__(at_handler) + + self.max_discovery_window_ms = 220 + self.is_discovery_phase = False + self.esp32_address_mapping = {} + + self.operator_busy = False + self.operator_running = False + + self.serial_writer: Optional[Callable[[bytes], None]] = None + self.event_loop: Optional[asyncio.AbstractEventLoop] = None + self.serial_baudrate = 115200 + + self.gcs_transmit_queue: deque[bytearray] = deque() + self.poll_scheduler_state = pollStrategy.PollSchedulerState() + + self.command_pending_event: Optional[asyncio.Event] = None + self.poll_done_event: Optional[asyncio.Event] = None + self.current_poll_address_64: Optional[bytes] = None + + self.last_discovery_time = 0.0 + self.discovery_interval_seconds = 30.0 # 每次做 discovery 程序的間隔時間 + self.device_offline_timeout = self.discovery_interval_seconds * 2 # 遠端沒有回應會被踢出 超時時限 + self.operator_tick_interval_seconds = 0.03 # + self.guard_milliseconds = 50 # POLL DONE 的保底時間間隔 + + self.pending_manual_discovery = False + self.pending_manual_poll: Optional[tuple[int, Optional[int]]] = None + + # ---- 注入與設定 ---- + + def set_writer(self, writer: Callable[[bytes], None]) -> None: + self.serial_writer = writer + + def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: + self.event_loop = loop + self._ensure_async_primitives() + + def set_serial_baudrate(self, baudrate: int) -> None: + self.serial_baudrate = baudrate + + def _ensure_async_primitives(self) -> None: + if self.command_pending_event is None: + self.command_pending_event = asyncio.Event() + if self.poll_done_event is None: + self.poll_done_event = asyncio.Event() + + # 有急事 叫醒 operator 做下一個 tick + def wake_operator(self) -> None: + self._ensure_async_primitives() + self.command_pending_event.set() + + # ---- 拆幀分派 ---- + + def _dispatch_frame(self, frame: bytes) -> Optional[bytes]: + frame_type = frame[3] + + if frame_type == self.FRAME_TYPE_RX_PACKET: + payload, sender_address_64 = self._decapsulate(frame) + + if payload.startswith(self.HELLO_HEADER) and len(payload) == 5: + self.handle_hello_report(payload, sender_address_64) + return None + + if payload.startswith(self.DONE_HEADER) and len(payload) == 9: + self.handle_done_report(payload, sender_address_64) + return None + + remote_device = self.esp32_address_mapping.get(sender_address_64) + if remote_device is not None: + remote_device.received_len += len(payload) + return payload + + if frame_type == self.FRAME_TYPE_AT_RESPONSE: + if self.at_handler is not None: + self.at_handler.handle_frame(frame) + return None + + if frame_type == self.FRAME_TYPE_TX_STATUS: + length = (frame[1] << 8) | frame[2] + logger.debug( + f"TX Status raw={frame.hex()}, api_len={length}, " + f"fid=0x{frame[4]:02X}, dest16=0x{(frame[5]<<8)|frame[6]:04X}, " + f"retry={frame[7]}, delivery={frame[8]}, discovery={frame[9]}" + ) + return None + + logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}") + return None + + # ---- DISC / POLL 封裝 ---- + + def pack_discovery(self) -> bytes: + return self._encapsulate(self.DISC_HEADER, frame_id=0x00) + + def handle_hello_report(self, payload: bytes, sender_address_64: bytes) -> None: + system_id = payload[4] + remote_device = self.esp32_address_mapping.get(sender_address_64) + + if remote_device is None: + self.esp32_address_mapping[sender_address_64] = self.Esp32DeviceInfo( + system_id, sender_address_64, time.time() + ) + logger.debug( + f"new HELO system_id={system_id}, address_64={sender_address_64.hex()}" + ) + elif remote_device.address_64 == sender_address_64: + remote_device.last_hello_time = time.time() + else: + logger.warning( + f"SYSID duplicated system_id={system_id}, " + f"address_64={sender_address_64.hex()}" + ) + + def pack_poll(self, target_address_64: bytes, grant_bytes: int = 0) -> Optional[bytes]: + remote_device = self.esp32_address_mapping.get(target_address_64) + if remote_device is None: + return None + + poll_payload = self.POLL_HEADER + struct.pack( + '>BH', remote_device.system_id, grant_bytes + ) + remote_device.received_len = 0 + return self._encapsulate( + poll_payload, + dest_addr64=remote_device.address_64, + dest_addr16=self.DEST_ADDR16_UNICAST, + frame_id=0x02, + ) + + def handle_done_report(self, payload: bytes, sender_address_64: bytes) -> None: + remote_device = self.esp32_address_mapping.get(sender_address_64) + if remote_device is None: + return + + system_id, sent_length, remain_length = struct.unpack('>BHH', payload[4:9]) + if sent_length != remote_device.received_len: + logger.info( + f"POLL may be missing packets sent={sent_length} " + f"received={remote_device.received_len} system_id={system_id}" + ) + + remote_device.received_len = 0 + remote_device.remain_bytes = remain_length + remote_device.last_done_time = time.time() + + if ( + self.current_poll_address_64 is not None + and sender_address_64 == self.current_poll_address_64 + and self.poll_done_event is not None + ): + self.poll_done_event.set() + + # ---- UDP到Serial 佇列 ---- + + def enqueue_gcs_transmit(self, payload: bytes) -> None: + if not payload: + return + self.gcs_transmit_queue.append(bytearray(payload)) + + # 只是 show 狀態 + def get_gcs_queue_byte_count(self) -> int: + return sum(len(packet) for packet in self.gcs_transmit_queue) + + # 只是 show 狀態 + def get_gcs_queue_packet_count(self) -> int: + return len(self.gcs_transmit_queue) + + # 把 gcs_transmit_queue 的 mavlink 封包依照大小打包出來 + def _pop_flush_batch(self, max_bytes: int) -> list[bytes]: + if not self.gcs_transmit_queue: + return [] + + batch: list[bytes] = [] + total_bytes = 0 + + while self.gcs_transmit_queue: + next_packet = bytes(self.gcs_transmit_queue[0]) + packet_length = len(next_packet) + + if not batch: + batch.append(self.gcs_transmit_queue.popleft()) + total_bytes = packet_length + if packet_length > max_bytes: + break + continue + + if total_bytes + packet_length <= max_bytes: + batch.append(self.gcs_transmit_queue.popleft()) + total_bytes += packet_length + else: + break + + return batch + + # 把數據封裝好以後 交給 serial_writer 排程送出 + async def _send_gcs_packet(self, packet: bytes) -> None: + # 重複判斷 + # if self.serial_writer is None: + # return + + sent_offset = 0 + while sent_offset < len(packet): + chunk_end = min( + sent_offset + self.MAX_PAYLOAD_PER_FRAME, + len(packet), + ) + chunk = packet[sent_offset:chunk_end] + sent_offset = chunk_end + self.serial_writer(self._encapsulate(chunk)) + await asyncio.sleep(self.CHUNK_SEND_INTERVAL_SEC) + + # 把上面兩個步驟打包起來 處理從 UDP 來的資訊 丟給 Serial + async def flush_gcs_transmit_queue( + self, + max_bytes: int = MAX_BYTES_PER_FLUSH, + ) -> None: + if self.serial_writer is None: + logger.warning("GCS flush skipped: serial writer not ready") + return + + for packet in self._pop_flush_batch(max_bytes): + await self._send_gcs_packet(packet) + + # ---- POLL 排程輔助 ---- + + # 計算下次要 poll 的對象跟大小 + def _pick_poll_target(self) -> tuple[Optional[bytes], int]: + poll_devices = [ + pollStrategy.PollDevice( + address_64=address_64, + # system_id=device.system_id, + remain_bytes=device.remain_bytes, + last_done_time=device.last_done_time, + ) + for address_64, device in self.esp32_address_mapping.items() + ] + + return pollStrategy.pick_next( + poll_devices, + self.poll_scheduler_state + ) + + # 移除長時間沒有 HELLO 或 DONE 的 Dongle + def _prune_stale_devices(self) -> None: + now = time.time() + stale_addresses = [ + address_64 + for address_64, device in self.esp32_address_mapping.items() + if (now - device.last_hello_time > self.device_offline_timeout) and \ + (now - device.last_done_time > self.device_offline_timeout) + ] + for address_64 in stale_addresses: + device = self.esp32_address_mapping.pop(address_64, None) + if device is not None: + logger.info( + f"Removed stale device system_id={device.system_id} " + f"address_64={address_64.hex()}" + ) + + # 判斷是否進入 discovery 程序 + def _should_run_discovery(self) -> bool: + # 條件1. 目前沒有任何遠端ESP裝置被紀錄 或者 手動啟動 + if (not self.esp32_address_mapping) or (self.pending_manual_discovery): + return True + # 條件2. 每個固定週期 會做一次 + return ( + time.time() - self.last_discovery_time + >= self.discovery_interval_seconds + ) + + # ---- 手動請求(thread-safe 對外 API)---- + + def request_discovery(self) -> bool: + """ + 手動觸發 discovery 可從任意 thread 呼叫。 + 實際排程在 serial_manager 的 event loop 內執行。 + """ + if self.event_loop is None: + logger.warning("ESPv1 request_discovery: event loop not ready") + return False + if not self.operator_running: + logger.warning("ESPv1 request_discovery: operator not running") + return False + self.event_loop.call_soon_threadsafe(self._apply_request_discovery) + return True + + def request_poll( + self, + target_system_id: int, + grant_bytes: Optional[int] = None, + ) -> bool: + """ + 手動對指定 system_id 排入一輪 POLL 可從任意 thread 呼叫。 + 實際排程在 serial_manager 的 event loop 內執行。 + """ + if self.event_loop is None: + logger.warning("ESPv1 request_poll: event loop not ready") + return False + if not self.operator_running: + logger.warning("ESPv1 request_poll: operator not running") + return False + self.event_loop.call_soon_threadsafe( + self._apply_request_poll, + target_system_id, + grant_bytes, + ) + return True + + def _apply_request_discovery(self) -> None: + self.pending_manual_discovery = True + self.wake_operator() + + def _apply_request_poll( + self, + target_system_id: int, + grant_bytes: Optional[int], + ) -> None: + self.pending_manual_poll = (target_system_id, grant_bytes) + self.wake_operator() + + def get_status_snapshot(self) -> dict: + """ + 唯讀狀態快照 for debug + 從其他 thread 讀取時不保證與 operator 原子一致。 + """ + now = time.time() + devices = [] + for address_64, device in self.esp32_address_mapping.items(): + devices.append({ + "system_id": device.system_id, + "address_64": address_64.hex(), + "remain_bytes": device.remain_bytes, + "last_hello_age_seconds": now - device.last_hello_time, + "last_done_age_seconds": ( + now - device.last_done_time if device.last_done_time else None + ), + }) + return { + "operator_busy": self.operator_busy, + "is_discovery_phase": self.is_discovery_phase, + "gcs_queue_bytes": self.get_gcs_queue_byte_count(), + "gcs_queue_packets": self.get_gcs_queue_packet_count(), + "devices": devices, + } + + # ---- operator 主循環 ---- + + # operator 的運行鐘 有點像是 spin_once + async def operator_loop(self) -> None: + self._ensure_async_primitives() + logger.info("ESPv1 operator loop started") + + while self.operator_running: + try: + await self._operator_tick() # TODO 最好不要在 while loop 塞 try 想辦法改掉 + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error(f"ESPv1 operator tick error: {exc}") + + await self._wait_for_next_tick() + + logger.info("ESPv1 operator loop stopped") + + # 安排啥時要醒來做事 + async def _wait_for_next_tick(self) -> None: + self.command_pending_event.clear() + + # 固定時間醒來 + sleep_task = asyncio.create_task( asyncio.sleep(self.operator_tick_interval_seconds) ) + # 有"急事"被叫醒 + wake_task = asyncio.create_task(self.command_pending_event.wait()) + + done, pending = await asyncio.wait( + {sleep_task, wake_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + + for task in pending: + task.cancel() + + async def _operator_tick(self) -> None: + # 1. 移除沒反應 dongle + self._prune_stale_devices() + + # 2. 忙碌時 略過這次循環 + if self.operator_busy: + return + + # 3. (最優先) 處理從 UDP 過來的封包 並且透過 Serial 送出 + if self.gcs_transmit_queue: + await self.flush_gcs_transmit_queue() + return + + # 4. 檢測要不要跑 discovery 程序 + if self._should_run_discovery(): + await self._run_discovery() + return + + # 5. POLL 程序 (若有手動先執行 若無自動則策略決策) + if self.pending_manual_poll is not None: + target_system_id, grant_bytes = self.pending_manual_poll + self.pending_manual_poll = None + target_address = self._find_address_by_system_id(target_system_id) + else: + target_address, grant_bytes = self._pick_poll_target() + + if target_address is None: + return + await self._run_one_poll(target_address, grant_bytes if grant_bytes is not None else 0) + + def _find_address_by_system_id(self, target_system_id: int) -> Optional[bytes]: + for address_64, device in self.esp32_address_mapping.items(): + if device.system_id == target_system_id: + return address_64 + return None + + # discovery 程序 + async def _run_discovery(self) -> None: + if self.serial_writer is None: + self.pending_manual_discovery = False + return + + self.operator_busy = True + self.is_discovery_phase = True + + self.serial_writer(self.pack_discovery()) + self.last_discovery_time = time.time() + await asyncio.sleep(self.max_discovery_window_ms / 1000.0) + + self.operator_busy = False + self.is_discovery_phase = False + self.pending_manual_discovery = False + + # poll 程序 + async def _run_one_poll(self, target_address_64: bytes, grant_bytes: int) -> None: + if self.serial_writer is None: + return + + # 這邊把要求封包最小量放在36 是讓載具端正好可以回傳一個有簽章的 HEARTBEAT 封包的大小 + # 算是 "探測封包" 這樣讓系統可以更快速的知道載具端殘餘的資料量 + grant_bytes = max(36, min(int(grant_bytes), 65535)) + + poll_frame = self.pack_poll(target_address_64, grant_bytes) + if poll_frame is None: + return + + self._ensure_async_primitives() + self.operator_busy = True + self.current_poll_address_64 = target_address_64 + self.poll_done_event.clear() + self.serial_writer(poll_frame) + + timeout_seconds = pollStrategy.estimate_poll_timeout( + grant_bytes, + self.serial_baudrate, + self.guard_milliseconds, + ) + try: + await asyncio.wait_for( + self.poll_done_event.wait(), + timeout=timeout_seconds, + ) + except asyncio.TimeoutError: + logger.warning( + f"POLL timeout address_64={target_address_64.hex()} " + f"grant_bytes={grant_bytes}" + ) + finally: + self.operator_busy = False + self.current_poll_address_64 = None + await asyncio.sleep(self.guard_milliseconds / 1000.0) + + def stop_operator(self) -> None: + self.operator_running = False + self.gcs_transmit_queue.clear() + self.wake_operator() # ====================== Dongle Command Handler ===================== @@ -349,7 +887,6 @@ class ATCommandHandler: """處理 SL (Serial Number Low)""" pass - # ================ Serial UDP Socket Object ============== class SerialHandler(asyncio.Protocol): """asyncio.Protocol 用於處理 Serial 收發""" @@ -368,13 +905,13 @@ class SerialHandler(asyncio.Protocol): if self.serial_mode == SerialMode.STRAIGHT: return RawFrameProcessor() - if self.serial_mode == SerialMode.XBEEAPI2AT: + elif self.serial_mode == SerialMode.XBEEAPI2AT: at_handler = ATCommandHandler(self.serial_port_str) - return XBeeFrameProcessor(at_handler=at_handler) + return XBeeFrameProcessor_Base(at_handler=at_handler) - # if self.serial_mode == SerialMode.XBEEAPI_POLL: - # at_handler = ATCommandHandler_new(self.serial_port_str) - # return XBeeFrameProcessor(at_handler=at_handler) + elif self.serial_mode == SerialMode.XBEEAPI_espv1: + at_handler = ATCommandHandler(self.serial_port_str) + return XBeeFrameProcessor_ESPv1(at_handler=at_handler) logger.warning(f"Unknown serial mode: {self.serial_mode}, using Raw") return RawFrameProcessor() @@ -387,6 +924,13 @@ class SerialHandler(asyncio.Protocol): if self.serial_mode == SerialMode.XBEEAPI2AT: self.processor.at_handler.set_writer(self.transport.write) + elif self.serial_mode == SerialMode.XBEEAPI_espv1: + if isinstance(self.processor, XBeeFrameProcessor_ESPv1): + self.processor.set_writer(self.transport.write) + self.processor.set_event_loop(asyncio.get_running_loop()) + if self.processor.at_handler is not None: + self.processor.at_handler.set_writer(self.transport.write) + if hasattr(self.udp_handler, 'set_serial_handler'): self.udp_handler.set_serial_handler(self) logger.debug(f"Serial port {self.serial_port_str} connected") @@ -429,11 +973,15 @@ class UDPHandler(asyncio.DatagramProtocol): if not self.serial_handler: logger.warning("Serial handler not set, dropping UDP packet") return - - # 使用 processor 封裝數據 + + if self.serial_mode == SerialMode.XBEEAPI_espv1: + processor = self.serial_handler.processor + # if isinstance(processor, XBeeFrameProcessor_ESPv1): # 預設唯一綁定 少一個 if 多一點效率 + processor.enqueue_gcs_transmit(data) + processor.wake_operator() + return + processed_data = self.serial_handler.processor.process_outgoing(data) - - # 發送到串口 self.serial_handler.transport.write(processed_data) @@ -454,6 +1002,7 @@ class serial_manager: self.protocol = None self.udp_handler = None self.serial_handler = None + self.operator_task = None def __init__(self): self.thread = None @@ -594,6 +1143,16 @@ class serial_manager: logger.debug(f"Serial connection created for {serial_port}") + if serial_mode == SerialMode.XBEEAPI_espv1: + processor = serial_obj.serial_handler.processor + if isinstance(processor, XBeeFrameProcessor_ESPv1): + processor.set_serial_baudrate(baudrate) + processor.operator_running = True + serial_obj.operator_task = asyncio.create_task( + processor.operator_loop() + ) + logger.debug(f"ESPv1 operator task started for {serial_port}") + # 將 serial_object 加入管理列表 serial_id = self.serial_count + 1 self.serial_objects[serial_id] = serial_obj @@ -646,7 +1205,19 @@ class serial_manager: try: serial_obj = self.serial_objects[serial_id] - + + if serial_obj.serial_mode == SerialMode.XBEEAPI_espv1: + processor = serial_obj.serial_handler.processor + if isinstance(processor, XBeeFrameProcessor_ESPv1): + processor.stop_operator() + if serial_obj.operator_task is not None: + serial_obj.operator_task.cancel() + try: + await serial_obj.operator_task + except asyncio.CancelledError: + pass + serial_obj.operator_task = None + # 關閉 UDP transport if hasattr(serial_obj, 'transport') and serial_obj.transport: serial_obj.transport.close() @@ -687,6 +1258,7 @@ class serial_manager: logger.error(f"Serial object {serial_id} not found") return False + # TODO 這邊的防呆 應該可以不用 if 有空再改 serial_obj = self.serial_objects[serial_id] if serial_obj.serial_mode != SerialMode.XBEEAPI2AT: logger.error( @@ -699,6 +1271,18 @@ class serial_manager: self.loop.call_soon_threadsafe(at_handler.send_command, request) return True + def get_espv1_processor(self, serial_id: int) -> Optional[XBeeFrameProcessor_ESPv1]: + """依 serial_id 取得 ESPv1 processor 不存在或模式不符回傳 None。""" + if serial_id not in self.serial_objects: + return None + serial_obj = self.serial_objects[serial_id] + if serial_obj.serial_mode != SerialMode.XBEEAPI_espv1: + return None + processor = serial_obj.serial_handler.processor + if not isinstance(processor, XBeeFrameProcessor_ESPv1): + return None + return processor + @staticmethod def check_serial_port(serial_port, baudrate): """檢查串口是否存在與可用""" @@ -736,42 +1320,100 @@ if __name__ == '__main__': sm = serial_manager() sm.start() + # # 測試項一 # SERIAL_PORT = '/dev/ttyACM0' # 手動指定 # SERIAL_BAUDRATE = 115200 # UDP_REMOTE_PORT = 14571 # sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT) - SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 + # # 測試項二 + # SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 + # SERIAL_BAUDRATE = 115200 + # UDP_REMOTE_PORT = 14561 + # sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) + + # linked_serial = sm.get_serial_link() + # print(linked_serial) + + # # 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試 + # time.sleep(5) + # rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52) + # for i in range(60): + # sm.send_at_command(1, rssi_request) + # time.sleep(1) + + # sm.remove_serial_link(1) + # time.sleep(3) + # sm.shutdown() + + # # 測試項三 + SERIAL_PORT = '/dev/ttyUSB0' SERIAL_BAUDRATE = 115200 UDP_REMOTE_PORT = 14561 - sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) + sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI_espv1) - linked_serial = sm.get_serial_link() - print(linked_serial) + time.sleep(2) # 等 serial 連線與 operator 啟動 - # 等 connection_made 完成 writer 注入,再發一筆 AT 指令測試 - time.sleep(5) - rssi_request = ATRequest(command=b'DB', parameter=b'', frame_id=0x52) - for i in range(60): - sm.send_at_command(1, rssi_request) - time.sleep(1) + serial_id = 1 + processor = sm.get_espv1_processor(serial_id) + if processor is not None: + processor.request_discovery() + processor.request_poll(target_system_id=1) + processor.request_poll(target_system_id=1, grant_bytes=200) + print(processor.get_status_snapshot()) + print(processor.get_gcs_queue_byte_count()) - sm.remove_serial_link(1) + sm.remove_serial_link(serial_id) time.sleep(3) sm.shutdown() - - ''' ================= 改版記錄 ============================ 2026.4.20 1. XBeeFrameHandler 結構移除 -2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler 中) -3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包 -4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler +2. XBeeFrameProcessor_Base 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler 中) +3. XBeeFrameProcessor_Base 新增 _try_extract_frame 處理被可能截斷的 UART 封包 +4. XBeeFrameProcessor_Base 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler 5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response 6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX 7. ATCommandHandler 新增各項 _handle_XXX (未實作) +2026.06.15 +1. 修改 XBeeFrameProcessor_Base _decapsulate 使其解出發送端 dongle 的 src64 地址 +2. 新增 XBeeFrameProcessor_ESPv1 這個類別繼承 XBeeFrameProcessor_Base +3. 新增 XBeeFrameProcessor_ESPv1 組合 DISC 跟 POLL 訊息 處理 HELO 跟 DONE 的能力 +4. DISC 自動化與手動 +5. POLL 的自動化 + +2026.06.16 +1. 移除 DongleCommandHandler_ESPv1 +2. ESPv1 對外 API(request_discovery / request_poll)移至 XBeeFrameProcessor_ESPv1,thread-safe +3. serial_manager 僅保留 get_espv1_processor(serial_id) lookup + + +註解1 : FRAME_TYPE_TX_STATUS 的對應解說 (我不喜歡程式塞太多為了顯示而顯示的東西 錯誤碼自己下來看) + TX_DELIVERY_STATUS + 0x00: "Success", + 0x01: "No ACK received", + 0x02: "CCA failure", + 0x15: "Invalid destination endpoint", + 0x21: "Network ACK failure", + 0x22: "Not joined to network", + 0x23: "Self-addressed", + 0x24: "Address not found", + 0x25: "Route not found", + 0x26: "Broadcast source relay", + 0x27: "Insufficient data", + 0x28: "TX buffered", + 0x32: "Invalid send flag", + 0x74: "Resource error", + + TX_DISCOVERY_STATUS + 0x00: "No discovery overhead", + 0x01: "Address discovery", + 0x02: "Route discovery", + 0x03: "Address and route discovery", + + ''' diff --git a/src/fc_network_adapter/fc_network_adapter/utils/pollStrategy.py b/src/fc_network_adapter/fc_network_adapter/utils/pollStrategy.py new file mode 100644 index 0000000..92290d8 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/utils/pollStrategy.py @@ -0,0 +1,71 @@ +""" +POLL 輪詢策略,供 XBeeFrameProcessor_ESPv1 使用。 + +不依賴 serialManager,僅接受裝置快照與排程狀態。 +""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class PollDevice: + """從 esp32AddrMapping 抽出的唯讀快照。""" + address_64: bytes + # system_id: int + remain_bytes: int + last_done_time: float + + +@dataclass +class PollSchedulerState: + """每條 serial link 一份,保存 round-robin 索引。""" + round_robin_index: int = 0 + + +def pick_next( + devices: list[PollDevice], + scheduler_state: PollSchedulerState, + # default_grant_bytes: int = 600, +) -> tuple[bytes | None, int]: + """ + 選下一個 POLL 目標。 + + 回傳 (target_address_64, grant_bytes);devices 為空時回傳 (None, 0)。 + """ + if not devices: + return None, 0 + + device_count = len(devices) + selected_index = scheduler_state.round_robin_index % device_count + selected_device = devices[selected_index] + scheduler_state.round_robin_index += 1 + + grant_bytes = 0 + if selected_device.remain_bytes > 0: + grant_bytes = min( 65535, max(0, selected_device.remain_bytes)) + + return selected_device.address_64, grant_bytes + + +def estimate_poll_timeout( + grant_bytes: int, + baudrate: int, + guard_milliseconds: int = 20, +) -> float: + """ + 估算 POLL 後等待 DONE 的超時秒數。 + 移植自 udptest8 estimate_tdma_timeout。 + """ + grant_bytes = max(0, int(grant_bytes)) + max_payload_per_chunk = 100 + chunk_count = max(1, (grant_bytes + max_payload_per_chunk - 1) // max_payload_per_chunk) + + uart_time = (grant_bytes + chunk_count * 18 + 32) * 10.0 / baudrate + timeout = ( + uart_time + + chunk_count * 0.012 + + (guard_milliseconds / 1000.0) + + 0.35 + ) + return timeout +