diff --git a/src/fc_network_adapter/fc_network_adapter/serialManager.py b/src/fc_network_adapter/fc_network_adapter/serialManager.py index 731a950..02f89a4 100644 --- a/src/fc_network_adapter/fc_network_adapter/serialManager.py +++ b/src/fc_network_adapter/fc_network_adapter/serialManager.py @@ -15,6 +15,7 @@ import time import threading import struct from enum import Enum, auto +from abc import ABC, abstractmethod # # XBee 模組 # from xbee.frame import APIFrame @@ -27,6 +28,105 @@ from .utils import setup_logger logger = setup_logger(os.path.basename(__file__)) # ====================== 分割線 ===================== + +# 定義 serial 連線的模式 +class SerialMode(Enum): + """連接類型""" + STRAIGHT = auto() # 原始數據直通 + XBEEAPI2AT = auto() # XBee API 模式 + NOT_USE = auto() # 不使用 + + +# ====================== Frame Processor 基類與實現 ===================== + +class FrameProcessor(ABC): + """協議處理器基類""" + + def __init__(self): + self.buffer = bytearray() + + @abstractmethod + def process_incoming(self, data: bytes): + """ + 處理接收到的數據 + 返回:已完整解析的 payload 列表 + """ + pass + + @abstractmethod + def process_outgoing(self, data: bytes) -> bytes: + """ + 封裝要發送的數據 + 返回:封裝後的完整幀 + """ + pass + + +class RawFrameProcessor(FrameProcessor): + """原始數據直通處理器""" + + def process_incoming(self, data: bytes): + """直接返回原始數據,不進行緩衝""" + return [data] if data else [] + + def process_outgoing(self, data: bytes) -> bytes: + """直接返回原始數據,不進行封裝""" + return data + + +class XBeeFrameProcessor(FrameProcessor): + """XBee API 協議處理器""" + + def process_incoming(self, data: bytes): + """處理 XBee API 幀並提取 payload""" + self.buffer.extend(data) + payloads = [] + + while len(self.buffer) >= 3: + # 尋找幀頭 + if self.buffer[0] != 0x7E: + self.buffer.pop(0) + continue + + # 讀取 payload 長度 + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1) + + # 等待完整幀 + if len(self.buffer) < full_length: + break + + # 提取完整 frame 並從緩衝區移除 + frame = bytes(self.buffer[:full_length]) + del self.buffer[:full_length] + + # 判斷 frame 類型並處理 + frame_type = frame[3] + + if frame_type == 0x90: # RX Packet + payload = XBeeFrameHandler.decapsulate_data(frame) + if payload: + payloads.append(payload) + elif frame_type == 0x88: # AT Response + # 可以在這裡處理 AT 指令回應 + # response = XBeeFrameHandler.parse_at_command_response(frame) + # 目前忽略 + pass + elif frame_type == 0x8B: # Transmit Status + # 傳輸狀態,目前忽略 + pass + else: + logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}") + + return payloads + + def process_outgoing(self, data: bytes) -> bytes: + """將數據封裝為 XBee API 傳輸幀""" + return XBeeFrameProcessor.encapsulate_data(data) + + +# ====================== XBee Frame Handler ===================== + class XBeeFrameHandler: """XBee API Frame 處理器""" @@ -55,7 +155,7 @@ class XBeeFrameHandler: @staticmethod def parse_receive_packet(frame: bytes) -> dict: - # """解析 RX Packet (0x90) - 未來擴展用""" + """解析 RX Packet (0x90) - 未來擴展用""" # if len(frame) < 15 or frame[3] != 0x90: # return None @@ -91,34 +191,31 @@ class XBeeFrameHandler: @staticmethod def decapsulate_data(data: bytes): - # 這裡可以根據需要進行數據解封裝 - + """解封裝 XBee API 幀,提取 RF 數據""" # XBee API 幀格式: # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節) # 檢查幀起始符 (0x7E) if not data or len(data) < 5 or data[0] != 0x7E: - return data + return None # 獲取數據長度 (不包括校驗和) - # length = (data[1] << 8) + data[2] - length = (data[1] << 8) | data[2] + length = (self.buffer[1] << 8) | data[2] # 檢查幀完整性 if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 - return data + return None - # 提取API標識符和數據 + # 提取 API 標識符 frame_type = data[3] - # frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中 # 根據不同的幀類型進行處理 - if frame_type == 0x90: # 例如,這是"接收數據包"類型 - rf_data_start = 3 + 12 + if frame_type == 0x90: # RX Packet + # 0x90 幀結構: [0x7E][長度H][長度L][0x90][64位地址(8)][16位地址(2)][選項(1)][RF數據...] + rf_data_start = 3 + 12 # 起始符(1) + 長度(2) + 類型(1) + 地址等(11) return data[rf_data_start:3 + length] else: - return None - return data + return None class ATCommandHandler: @@ -137,7 +234,7 @@ class ATCommandHandler: """根據 AT 指令類型分派處理""" if not response or not response['is_ok']: if response: - print(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") + logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") return command = response['command'] @@ -146,189 +243,122 @@ class ATCommandHandler: if handler: handler(response['data']) else: - print(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") + logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") def _handle_rssi(self, data: bytes): """處理 DB (RSSI) 回應""" - if not data: - return - - rssi_value = data[0] - now = time.time() - - # 檢查是否最近有收到 MAVLink - last_mavlink_time = serial_last_mavlink_time.get(self.serial_port, 0) - if now - last_mavlink_time > 0.5: - print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLink,RSSI = -{rssi_value} dBm 已忽略") - return - - # 取得對應的 sysid - sysid = serial_to_sysid.get(self.serial_port) - if sysid is None: - print(f"[{self.serial_port}] 找不到 sysid 對應,RSSI = -{rssi_value} dBm,已忽略") - return - - # 記錄 RSSI - rssi_history[sysid].append(-rssi_value) - time_history[sysid].append(now) - # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") + # 未來可實現 RSSI 處理邏輯 + pass def _handle_serial_high(self, data: bytes): - # """處理 SH (Serial Number High) - 範例""" - # if len(data) >= 4: - # serial_high = int.from_bytes(data[:4], 'big') - # print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}") + """處理 SH (Serial Number High)""" pass def _handle_serial_low(self, data: bytes): - # """處理 SL (Serial Number Low) - 範例""" - # if len(data) >= 4: - # serial_low = int.from_bytes(data[:4], 'big') - # print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}") + """處理 SL (Serial Number Low)""" pass -# ====================== 分割線 ===================== -class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 - def __init__(self, udp_handler, serial_port_str): - self.udp_handler = udp_handler # UDP 的傳輸物件 - self.serial_port_str = serial_port_str - self.at_handler = ATCommandHandler(serial_port_str) +# ====================== Serial Handler ===================== - self.buffer = bytearray() # 用於緩存接收到的資料 - self.transport = None # Serial 自己的傳輸物件 - # self.first_data = True # 標記是否為第一次收到資料 - # self.has_processed = False # 測試模式用 處理數據旗標 # debug +class SerialHandler(asyncio.Protocol): + """asyncio.Protocol 用於處理 Serial 收發""" + + def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode): + self.udp_handler = udp_handler # UDP 的傳輸物件 + self.serial_port_str = serial_port_str + self.serial_mode = serial_mode + self.transport = None # Serial 自己的傳輸物件 + + # 根據模式創建對應的 processor + self.processor = self._create_processor(serial_mode) + + # AT 指令處理器(僅 XBee 模式使用) + if serial_mode == SerialMode.XBEEAPI2AT: + self.at_handler = ATCommandHandler(serial_port_str) + else: + self.at_handler = None + + def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor: + """工廠方法:根據模式創建處理器""" + if serial_mode == SerialMode.STRAIGHT: + return RawFrameProcessor() + elif serial_mode == SerialMode.XBEEAPI2AT: + return XBeeFrameProcessor() + else: + logger.warning(f"Unknown serial mode: {serial_mode}, using Raw") + return RawFrameProcessor() def connection_made(self, transport): + """連接建立時的回調""" self.transport = transport if hasattr(self.udp_handler, 'set_serial_handler'): self.udp_handler.set_serial_handler(self) - # logger.info(f"Serial port {self.serial_port_str} connected.") # debug + logger.debug(f"Serial port {self.serial_port_str} connected") - # Serial 收到資料的處理過程 def data_received(self, data): - # 1. 把收到的資料加入緩衝區 - self.buffer.extend(data) - - # 2. 需要完整的 header 才能解析 - while len(self.buffer) >= 3: - # 3. 瞄準 XBee API Frame (0x7E 開頭的封包) - if self.buffer[0] != 0x7E: - self.buffer.pop(0) # 如果不是就丟掉 - continue - - # 4. 讀取 payload 長度 - length = (self.buffer[1] << 8) | self.buffer[2] - full_length = 3 + length + 1 - - # 5. 等待完整封包 - if len(self.buffer) < full_length: - break - - # 6. 提取完整 frame 並從緩衝區移除 - an_frame = self.buffer[:full_length] - del self.buffer[:full_length] + """Serial 收到資料的處理過程""" + # 使用 processor 處理接收到的數據 + payloads = self.processor.process_incoming(data) + + # 將所有解析完成的 payload 轉發到 UDP + for payload in payloads: + self.udp_handler.transport.sendto( + payload, + (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port) + ) - # 7. 判斷 frame 類型 - frame_type = an_frame[3] - - if frame_type == 0x88: - # 處理 AT Command 回應 - # response = XBeeFrameHandler.parse_at_command_response(an_frame) - # self.at_handler.handle_response(response) - pass - - elif frame_type == 0x90: - # Receive Packet (RX) payload 先解碼 - processed_data = XBeeFrameHandler.decapsulate_data(bytes(an_frame)) - # 轉換失敗就捨棄了 - if processed_data is None: - continue - # 再透過 UDP 送出 - self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port)) - - elif frame_type == 0x8B: - pass - else: - # 其他類型的 frame 未來可擴展處理 現在忽略 - logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}") - - # # RSSI - # if frame[3] == 0x88 and frame[5:7] == b'DB': # frame[3] == 0x88 AT -> API 封包 - # # frame[5:7] == b'DB' -> API 封包的DB參數 - # status = frame[7] # - # if status == 0x00 and len(frame) > 8: # status == 0x00 -> 這個封包是有效封包 - # rssi_value = frame[8] - # now = time.time() - - # # === 優化 1:僅信任最近 0.5 秒內有接收 MAVLink 的 port - # last_time = serial_last_mavlink_time.get(self.serial_port, 0) - # if now - last_time <= 0.5: - # sysid = serial_to_sysid.get(self.serial_port, None) - # if sysid is not None: - # rssi_history[sysid].append(-rssi_value) - # time_history[sysid].append(now) - # # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") - # else: - # print(f"[{self.serial_port}] 找不到 sysid 對應,RSSI = -{rssi_value} dBm,已忽略") - # else: - # print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLink,RSSI = -{rssi_value} dBm 已忽略") - # else: - # print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}") - +# ====================== UDP Handler ===================== -class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發 +class UDPHandler(asyncio.DatagramProtocol): + """asyncio.DatagramProtocol 用於處理 UDP 收發""" - LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端IP - - def __init__(self, target_port): - self.target_port = target_port # 目標 UDP 端口 + LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端 IP - self.serial_handler = None # Serial 的傳輸物件 - self.transport = None # UDP 自己的傳輸物件 - self.remote_addr = None # 儲存動態獲取的遠程地址 # debug - # self.has_processed = False # 測試模式用 處理數據旗標 # debug + def __init__(self, target_port, serial_mode: SerialMode): + self.target_port = target_port # 目標 UDP 端口 + self.serial_mode = serial_mode + self.serial_handler = None # Serial 的傳輸物件 + self.transport = None # UDP 自己的傳輸物件 def connection_made(self, transport): + """連接建立時的回調""" self.transport = transport - # logger.info(f"UDP transport ready. Waiting for serial data before sending.") # debug + logger.debug(f"UDP transport ready for port {self.target_port}") def set_serial_handler(self, serial_handler): + """設置對應的 Serial Handler""" self.serial_handler = serial_handler - # UDP 收到資料的處理過程 def datagram_received(self, data, addr): - # 儲存對方的地址(這樣就能向同一個來源回傳數據) - # self.remote_addr = addr # debug - # print(f"Received UDP data from {addr}, setting as remote address") + """UDP 收到資料的處理過程""" + if not self.serial_handler: + logger.warning("Serial handler not set, dropping UDP packet") + return - processed_data = XBeeFrameHandler.encapsulate_data(data) - - if self.serial_handler: - self.serial_handler.transport.write(processed_data) - -#================================================================== + # 使用 processor 封裝數據 + processed_data = self.serial_handler.processor.process_outgoing(data) + + # 發送到串口 + self.serial_handler.transport.write(processed_data) -class SerialReceiverType(Enum): - """連接類型""" - TELEMETRY = auto() - XBEEAPI2AT = auto() - OTHER = auto() +# ====================== Serial Manager ===================== class serial_manager: + """串口管理器""" class serial_object: - def __init__(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): - self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc + """串口物件""" + def __init__(self, serial_port, baudrate, target_port, serial_mode: SerialMode): + self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc self.baudrate = baudrate - self.receiver_type = receiver_type - self.target_port = target_port # 指向的 UPD 端口 + self.serial_mode = serial_mode + self.target_port = target_port # 指向的 UDP 端口 - self.transport = None # TODO 這個變數可能沒有作用 - self.protocol = None # TODO 這個變數可能沒有作用 + self.transport = None + self.protocol = None self.udp_handler = None self.serial_handler = None @@ -337,21 +367,21 @@ class serial_manager: self.loop = None self.running = False self.serial_count = 0 - self.serial_objects = {} # serial id num : serial_object + self.serial_objects = {} # serial id num : serial_object def __del__(self): self.loop = None self.thread = None def start(self): - + """啟動 serial_manager""" if self.running: logger.warning("serial_manager already running") - return + return False self.running = True - # 啟動獨立線程 命名為 SerialManager + # 啟動獨立線程,命名為 SerialManager self.thread = threading.Thread( target=self._run_event_loop, name="SerialManager" @@ -375,7 +405,6 @@ class serial_manager: def shutdown(self): """停止 serial_manager 和其管理的所有 serial_object""" - # 自己在 running 狀態下才執行停止程序 if not self.running: logger.warning("serial_manager is not running") return @@ -404,37 +433,25 @@ class serial_manager: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - # # 為每個 serial_object 建立連接 - # for serial_obj in self.serial_objects: - # coro = serial_asyncio.create_serial_connection( - # self.loop, - # lambda: SerialProtocol(serial_obj.receiver_type), - # serial_obj.serial_port, - # baudrate=serial_obj.baudrate - # ) - # transport, protocol = self.loop.run_until_complete(coro) - # serial_obj.transport = transport - # serial_obj.protocol = protocol - try: self.loop.run_forever() finally: self.loop.close() - def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): - + def create_serial_link(self, serial_port, baudrate, target_port, serial_mode: SerialMode): + """創建串口連接""" if not self.running or not self.loop: logger.error("Event loop not running, cannot create serial link") return False - # 檢查 serial port 有效 + # 檢查 serial port 有效性 if not self.check_serial_port(serial_port, baudrate): logger.error(f"Serial port {serial_port} validation failed") return False # 使用 run_coroutine_threadsafe 執行協程並獲取結果 future = asyncio.run_coroutine_threadsafe( - self._async_create_serial_link(serial_port, baudrate, target_port, receiver_type), + self._async_create_serial_link(serial_port, baudrate, target_port, serial_mode), self.loop ) @@ -442,8 +459,8 @@ class serial_manager: # 等待結果,設定合理的超時時間 result = future.result(timeout=5.0) if result: - logger.info(f"Create Serial Link: {serial_port} -> UDP {target_port}") - return True + logger.info(f"Create Serial Link: {serial_port} ({serial_mode.name}) -> UDP {target_port}") + return result except asyncio.TimeoutError: logger.error(f"Timeout creating serial link for {serial_port}") return False @@ -451,14 +468,14 @@ class serial_manager: logger.error(f"Failed to create serial link for {serial_port}: {e}") return False - async def _async_create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): + async def _async_create_serial_link(self, serial_port, baudrate, target_port, serial_mode: SerialMode): """在事件循環線程中執行實際的連接創建""" try: # 創建 serial_object 實例 - serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type) + serial_obj = self.serial_object(serial_port, baudrate, target_port, serial_mode) # 建立 UDP 處理器並指定目標端口位置 - serial_obj.udp_handler = UDPHandler(target_port) + serial_obj.udp_handler = UDPHandler(target_port, serial_mode) # 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配 udp_transport, udp_protocol = await self.loop.create_datagram_endpoint( @@ -468,10 +485,10 @@ class serial_manager: serial_obj.transport = udp_transport serial_obj.protocol = udp_protocol - # logger.info(f"UDP endpoint created for {serial_port}") # debug + logger.debug(f"UDP endpoint created for {serial_port}") - # 建立 Serial 處理器,將 UDP 處理器傳給它 - serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port) + # 建立 Serial 處理器,將 UDP 處理器和模式傳給它 + serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port, serial_mode) # 建立 Serial 連接 serial_transport, _ = await serial_asyncio.create_serial_connection( @@ -481,14 +498,14 @@ class serial_manager: baudrate=baudrate ) - # logger.info(f"Serial connection created for {serial_port}") # debug + logger.debug(f"Serial connection created for {serial_port}") # 將 serial_object 加入管理列表 serial_id = self.serial_count + 1 self.serial_objects[serial_id] = serial_obj self.serial_count += 1 - # logger.info(f"Serial object {serial_id} added to manager") # debug + logger.debug(f"Serial object {serial_id} added to manager") return True except Exception as e: @@ -501,12 +518,10 @@ class serial_manager: def remove_serial_link(self, serial_id): """移除串口連接(線程安全方式)""" - # 確保事件循環正在運行 if not self.loop: logger.error("Event loop not running") return False - # 檢查 serial_id 是否存在 if serial_id not in self.serial_objects: logger.warning(f"Serial object {serial_id} not found") return False @@ -549,7 +564,7 @@ class serial_manager: # 從管理列表中移除 del self.serial_objects[serial_id] - # logger.info(f"Serial object {serial_id} removed from manager") # debug + logger.debug(f"Serial object {serial_id} removed from manager") return True except Exception as e: @@ -557,7 +572,8 @@ class serial_manager: return False def get_serial_link(self): - ret = {} # serial id num : serial_port string + """取得所有串口連接資訊""" + ret = {} # serial id num : serial_port string for key, obj in self.serial_objects.items(): ret[key] = obj.serial_port return ret @@ -593,6 +609,8 @@ class serial_manager: return False +# ====================== 測試代碼 ===================== + if __name__ == '__main__': sm = serial_manager() sm.start() @@ -600,12 +618,13 @@ if __name__ == '__main__': SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 SERIAL_BAUDRATE = 115200 UDP_REMOTE_PORT = 14571 - sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialReceiverType.XBEEAPI2AT) + sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT) + # sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT) linked_serial = sm.get_serial_link() print(linked_serial) - time.sleep(10) + time.sleep(30) sm.remove_serial_link(1) time.sleep(3) - sm.shutdown() \ No newline at end of file + sm.shutdown()