(temp) 暫存

chiyu
Chiyu Chen 3 months ago
parent 3d48b1d9fe
commit 1d49ca12e9

@ -15,6 +15,7 @@ import time
import threading import threading
import struct import struct
from enum import Enum, auto from enum import Enum, auto
from abc import ABC, abstractmethod
# # XBee 模組 # # XBee 模組
# from xbee.frame import APIFrame # from xbee.frame import APIFrame
@ -27,6 +28,105 @@ from .utils import setup_logger
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
# ====================== 分割線 ===================== # ====================== 分割線 =====================
# 定義 serial 連線的模式
class SerialMode(Enum):
"""連接類型"""
STRAIGHT = auto() # 原始數據直通
XBEEAPI2AT = auto() # XBee API 模式
NOT_USE = auto() # 不使用
# ====================== Frame Processor 基類與實現 =====================
class FrameProcessor(ABC):
"""協議處理器基類"""
def __init__(self):
self.buffer = bytearray()
@abstractmethod
def process_incoming(self, data: bytes):
"""
處理接收到的數據
返回已完整解析的 payload 列表
"""
pass
@abstractmethod
def process_outgoing(self, data: bytes) -> bytes:
"""
封裝要發送的數據
返回封裝後的完整幀
"""
pass
class RawFrameProcessor(FrameProcessor):
"""原始數據直通處理器"""
def process_incoming(self, data: bytes):
"""直接返回原始數據,不進行緩衝"""
return [data] if data else []
def process_outgoing(self, data: bytes) -> bytes:
"""直接返回原始數據,不進行封裝"""
return data
class XBeeFrameProcessor(FrameProcessor):
"""XBee API 協議處理器"""
def process_incoming(self, data: bytes):
"""處理 XBee API 幀並提取 payload"""
self.buffer.extend(data)
payloads = []
while len(self.buffer) >= 3:
# 尋找幀頭
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
# 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1)
# 等待完整幀
if len(self.buffer) < full_length:
break
# 提取完整 frame 並從緩衝區移除
frame = bytes(self.buffer[:full_length])
del self.buffer[:full_length]
# 判斷 frame 類型並處理
frame_type = frame[3]
if frame_type == 0x90: # RX Packet
payload = XBeeFrameHandler.decapsulate_data(frame)
if payload:
payloads.append(payload)
elif frame_type == 0x88: # AT Response
# 可以在這裡處理 AT 指令回應
# response = XBeeFrameHandler.parse_at_command_response(frame)
# 目前忽略
pass
elif frame_type == 0x8B: # Transmit Status
# 傳輸狀態,目前忽略
pass
else:
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return XBeeFrameProcessor.encapsulate_data(data)
# ====================== XBee Frame Handler =====================
class XBeeFrameHandler: class XBeeFrameHandler:
"""XBee API Frame 處理器""" """XBee API Frame 處理器"""
@ -55,7 +155,7 @@ class XBeeFrameHandler:
@staticmethod @staticmethod
def parse_receive_packet(frame: bytes) -> dict: def parse_receive_packet(frame: bytes) -> dict:
# """解析 RX Packet (0x90) - 未來擴展用""" """解析 RX Packet (0x90) - 未來擴展用"""
# if len(frame) < 15 or frame[3] != 0x90: # if len(frame) < 15 or frame[3] != 0x90:
# return None # return None
@ -91,34 +191,31 @@ class XBeeFrameHandler:
@staticmethod @staticmethod
def decapsulate_data(data: bytes): def decapsulate_data(data: bytes):
# 這裡可以根據需要進行數據解封裝 """解封裝 XBee API 幀,提取 RF 數據"""
# XBee API 幀格式: # XBee API 幀格式:
# 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節) # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# 檢查幀起始符 (0x7E) # 檢查幀起始符 (0x7E)
if not data or len(data) < 5 or data[0] != 0x7E: if not data or len(data) < 5 or data[0] != 0x7E:
return data return None
# 獲取數據長度 (不包括校驗和) # 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2] length = (self.buffer[1] << 8) | data[2]
length = (data[1] << 8) | data[2]
# 檢查幀完整性 # 檢查幀完整性
if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
return data return None
# 提取API標識符和數據 # 提取 API 標識符
frame_type = data[3] frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# 根據不同的幀類型進行處理 # 根據不同的幀類型進行處理
if frame_type == 0x90: # 例如,這是"接收數據包"類型 if frame_type == 0x90: # RX Packet
rf_data_start = 3 + 12 # 0x90 幀結構: [0x7E][長度H][長度L][0x90][64位地址(8)][16位地址(2)][選項(1)][RF數據...]
rf_data_start = 3 + 12 # 起始符(1) + 長度(2) + 類型(1) + 地址等(11)
return data[rf_data_start:3 + length] return data[rf_data_start:3 + length]
else: else:
return None return None
return data
class ATCommandHandler: class ATCommandHandler:
@ -137,7 +234,7 @@ class ATCommandHandler:
"""根據 AT 指令類型分派處理""" """根據 AT 指令類型分派處理"""
if not response or not response['is_ok']: if not response or not response['is_ok']:
if response: if response:
print(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}") logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}")
return return
command = response['command'] command = response['command']
@ -146,189 +243,122 @@ class ATCommandHandler:
if handler: if handler:
handler(response['data']) handler(response['data'])
else: else:
print(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}") logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}")
def _handle_rssi(self, data: bytes): def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應""" """處理 DB (RSSI) 回應"""
if not data: # 未來可實現 RSSI 處理邏輯
return pass
rssi_value = data[0]
now = time.time()
# 檢查是否最近有收到 MAVLink
last_mavlink_time = serial_last_mavlink_time.get(self.serial_port, 0)
if now - last_mavlink_time > 0.5:
print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
return
# 取得對應的 sysid
sysid = serial_to_sysid.get(self.serial_port)
if sysid is None:
print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
return
# 記錄 RSSI
rssi_history[sysid].append(-rssi_value)
time_history[sysid].append(now)
# print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
def _handle_serial_high(self, data: bytes): def _handle_serial_high(self, data: bytes):
# """處理 SH (Serial Number High) - 範例""" """處理 SH (Serial Number High)"""
# if len(data) >= 4:
# serial_high = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}")
pass pass
def _handle_serial_low(self, data: bytes): def _handle_serial_low(self, data: bytes):
# """處理 SL (Serial Number Low) - 範例""" """處理 SL (Serial Number Low)"""
# if len(data) >= 4:
# serial_low = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}")
pass pass
# ====================== 分割線 =====================
class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 # ====================== Serial Handler =====================
def __init__(self, udp_handler, serial_port_str):
class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發"""
def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode):
self.udp_handler = udp_handler # UDP 的傳輸物件 self.udp_handler = udp_handler # UDP 的傳輸物件
self.serial_port_str = serial_port_str self.serial_port_str = serial_port_str
self.at_handler = ATCommandHandler(serial_port_str) self.serial_mode = serial_mode
self.buffer = bytearray() # 用於緩存接收到的資料
self.transport = None # Serial 自己的傳輸物件 self.transport = None # Serial 自己的傳輸物件
# self.first_data = True # 標記是否為第一次收到資料
# self.has_processed = False # 測試模式用 處理數據旗標 # debug # 根據模式創建對應的 processor
self.processor = self._create_processor(serial_mode)
# AT 指令處理器(僅 XBee 模式使用)
if serial_mode == SerialMode.XBEEAPI2AT:
self.at_handler = ATCommandHandler(serial_port_str)
else:
self.at_handler = None
def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor:
"""工廠方法:根據模式創建處理器"""
if serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor()
elif serial_mode == SerialMode.XBEEAPI2AT:
return XBeeFrameProcessor()
else:
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor()
def connection_made(self, transport): def connection_made(self, transport):
"""連接建立時的回調"""
self.transport = transport self.transport = transport
if hasattr(self.udp_handler, 'set_serial_handler'): if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self) self.udp_handler.set_serial_handler(self)
# logger.info(f"Serial port {self.serial_port_str} connected.") # debug logger.debug(f"Serial port {self.serial_port_str} connected")
# Serial 收到資料的處理過程
def data_received(self, data): def data_received(self, data):
# 1. 把收到的資料加入緩衝區 """Serial 收到資料的處理過程"""
self.buffer.extend(data) # 使用 processor 處理接收到的數據
payloads = self.processor.process_incoming(data)
# 2. 需要完整的 header 才能解析
while len(self.buffer) >= 3: # 將所有解析完成的 payload 轉發到 UDP
# 3. 瞄準 XBee API Frame (0x7E 開頭的封包) for payload in payloads:
if self.buffer[0] != 0x7E: self.udp_handler.transport.sendto(
self.buffer.pop(0) # 如果不是就丟掉 payload,
continue (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port)
)
# 4. 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
# 5. 等待完整封包
if len(self.buffer) < full_length:
break
# 6. 提取完整 frame 並從緩衝區移除
an_frame = self.buffer[:full_length]
del self.buffer[:full_length]
# 7. 判斷 frame 類型
frame_type = an_frame[3]
if frame_type == 0x88:
# 處理 AT Command 回應
# response = XBeeFrameHandler.parse_at_command_response(an_frame)
# self.at_handler.handle_response(response)
pass
elif frame_type == 0x90:
# Receive Packet (RX) payload 先解碼
processed_data = XBeeFrameHandler.decapsulate_data(bytes(an_frame))
# 轉換失敗就捨棄了
if processed_data is None:
continue
# 再透過 UDP 送出
self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port))
elif frame_type == 0x8B: # ====================== UDP Handler =====================
pass
else: class UDPHandler(asyncio.DatagramProtocol):
# 其他類型的 frame 未來可擴展處理 現在忽略 """asyncio.DatagramProtocol 用於處理 UDP 收發"""
logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}")
# # RSSI
# if frame[3] == 0x88 and frame[5:7] == b'DB': # frame[3] == 0x88 AT -> API 封包
# # frame[5:7] == b'DB' -> API 封包的DB參數
# status = frame[7] #
# if status == 0x00 and len(frame) > 8: # status == 0x00 -> 這個封包是有效封包
# rssi_value = frame[8]
# now = time.time()
# # === 優化 1僅信任最近 0.5 秒內有接收 MAVLink 的 port
# last_time = serial_last_mavlink_time.get(self.serial_port, 0)
# if now - last_time <= 0.5:
# sysid = serial_to_sysid.get(self.serial_port, None)
# if sysid is not None:
# rssi_history[sysid].append(-rssi_value)
# time_history[sysid].append(now)
# # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
# else:
# print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
# else:
# print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
# else:
# print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}")
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端 IP LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端 IP
def __init__(self, target_port): def __init__(self, target_port, serial_mode: SerialMode):
self.target_port = target_port # 目標 UDP 端口 self.target_port = target_port # 目標 UDP 端口
self.serial_mode = serial_mode
self.serial_handler = None # Serial 的傳輸物件 self.serial_handler = None # Serial 的傳輸物件
self.transport = None # UDP 自己的傳輸物件 self.transport = None # UDP 自己的傳輸物件
self.remote_addr = None # 儲存動態獲取的遠程地址 # debug
# self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport): def connection_made(self, transport):
"""連接建立時的回調"""
self.transport = transport self.transport = transport
# logger.info(f"UDP transport ready. Waiting for serial data before sending.") # debug logger.debug(f"UDP transport ready for port {self.target_port}")
def set_serial_handler(self, serial_handler): def set_serial_handler(self, serial_handler):
"""設置對應的 Serial Handler"""
self.serial_handler = serial_handler self.serial_handler = serial_handler
# UDP 收到資料的處理過程
def datagram_received(self, data, addr): def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據) """UDP 收到資料的處理過程"""
# self.remote_addr = addr # debug if not self.serial_handler:
# print(f"Received UDP data from {addr}, setting as remote address") logger.warning("Serial handler not set, dropping UDP packet")
return
processed_data = XBeeFrameHandler.encapsulate_data(data) # 使用 processor 封裝數據
processed_data = self.serial_handler.processor.process_outgoing(data)
if self.serial_handler: # 發送到串口
self.serial_handler.transport.write(processed_data) self.serial_handler.transport.write(processed_data)
#==================================================================
class SerialReceiverType(Enum):
"""連接類型"""
TELEMETRY = auto()
XBEEAPI2AT = auto()
OTHER = auto()
# ====================== Serial Manager =====================
class serial_manager: class serial_manager:
"""串口管理器"""
class serial_object: class serial_object:
def __init__(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): """串口物件"""
def __init__(self, serial_port, baudrate, target_port, serial_mode: SerialMode):
self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc
self.baudrate = baudrate self.baudrate = baudrate
self.receiver_type = receiver_type self.serial_mode = serial_mode
self.target_port = target_port # 指向的 UPD 端口 self.target_port = target_port # 指向的 UDP 端口
self.transport = None # TODO 這個變數可能沒有作用 self.transport = None
self.protocol = None # TODO 這個變數可能沒有作用 self.protocol = None
self.udp_handler = None self.udp_handler = None
self.serial_handler = None self.serial_handler = None
@ -344,14 +374,14 @@ class serial_manager:
self.thread = None self.thread = None
def start(self): def start(self):
"""啟動 serial_manager"""
if self.running: if self.running:
logger.warning("serial_manager already running") logger.warning("serial_manager already running")
return return False
self.running = True self.running = True
# 啟動獨立線程 命名為 SerialManager # 啟動獨立線程命名為 SerialManager
self.thread = threading.Thread( self.thread = threading.Thread(
target=self._run_event_loop, target=self._run_event_loop,
name="SerialManager" name="SerialManager"
@ -375,7 +405,6 @@ class serial_manager:
def shutdown(self): def shutdown(self):
"""停止 serial_manager 和其管理的所有 serial_object""" """停止 serial_manager 和其管理的所有 serial_object"""
# 自己在 running 狀態下才執行停止程序
if not self.running: if not self.running:
logger.warning("serial_manager is not running") logger.warning("serial_manager is not running")
return return
@ -404,37 +433,25 @@ class serial_manager:
self.loop = asyncio.new_event_loop() self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) asyncio.set_event_loop(self.loop)
# # 為每個 serial_object 建立連接
# for serial_obj in self.serial_objects:
# coro = serial_asyncio.create_serial_connection(
# self.loop,
# lambda: SerialProtocol(serial_obj.receiver_type),
# serial_obj.serial_port,
# baudrate=serial_obj.baudrate
# )
# transport, protocol = self.loop.run_until_complete(coro)
# serial_obj.transport = transport
# serial_obj.protocol = protocol
try: try:
self.loop.run_forever() self.loop.run_forever()
finally: finally:
self.loop.close() self.loop.close()
def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): def create_serial_link(self, serial_port, baudrate, target_port, serial_mode: SerialMode):
"""創建串口連接"""
if not self.running or not self.loop: if not self.running or not self.loop:
logger.error("Event loop not running, cannot create serial link") logger.error("Event loop not running, cannot create serial link")
return False return False
# 檢查 serial port 有效 # 檢查 serial port 有效性
if not self.check_serial_port(serial_port, baudrate): if not self.check_serial_port(serial_port, baudrate):
logger.error(f"Serial port {serial_port} validation failed") logger.error(f"Serial port {serial_port} validation failed")
return False return False
# 使用 run_coroutine_threadsafe 執行協程並獲取結果 # 使用 run_coroutine_threadsafe 執行協程並獲取結果
future = asyncio.run_coroutine_threadsafe( future = asyncio.run_coroutine_threadsafe(
self._async_create_serial_link(serial_port, baudrate, target_port, receiver_type), self._async_create_serial_link(serial_port, baudrate, target_port, serial_mode),
self.loop self.loop
) )
@ -442,8 +459,8 @@ class serial_manager:
# 等待結果,設定合理的超時時間 # 等待結果,設定合理的超時時間
result = future.result(timeout=5.0) result = future.result(timeout=5.0)
if result: if result:
logger.info(f"Create Serial Link: {serial_port} -> UDP {target_port}") logger.info(f"Create Serial Link: {serial_port} ({serial_mode.name}) -> UDP {target_port}")
return True return result
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error(f"Timeout creating serial link for {serial_port}") logger.error(f"Timeout creating serial link for {serial_port}")
return False return False
@ -451,14 +468,14 @@ class serial_manager:
logger.error(f"Failed to create serial link for {serial_port}: {e}") logger.error(f"Failed to create serial link for {serial_port}: {e}")
return False return False
async def _async_create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType): async def _async_create_serial_link(self, serial_port, baudrate, target_port, serial_mode: SerialMode):
"""在事件循環線程中執行實際的連接創建""" """在事件循環線程中執行實際的連接創建"""
try: try:
# 創建 serial_object 實例 # 創建 serial_object 實例
serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type) serial_obj = self.serial_object(serial_port, baudrate, target_port, serial_mode)
# 建立 UDP 處理器並指定目標端口位置 # 建立 UDP 處理器並指定目標端口位置
serial_obj.udp_handler = UDPHandler(target_port) serial_obj.udp_handler = UDPHandler(target_port, serial_mode)
# 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配 # 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配
udp_transport, udp_protocol = await self.loop.create_datagram_endpoint( udp_transport, udp_protocol = await self.loop.create_datagram_endpoint(
@ -468,10 +485,10 @@ class serial_manager:
serial_obj.transport = udp_transport serial_obj.transport = udp_transport
serial_obj.protocol = udp_protocol serial_obj.protocol = udp_protocol
# logger.info(f"UDP endpoint created for {serial_port}") # debug logger.debug(f"UDP endpoint created for {serial_port}")
# 建立 Serial 處理器,將 UDP 處理器傳給它 # 建立 Serial 處理器,將 UDP 處理器和模式傳給它
serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port) serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port, serial_mode)
# 建立 Serial 連接 # 建立 Serial 連接
serial_transport, _ = await serial_asyncio.create_serial_connection( serial_transport, _ = await serial_asyncio.create_serial_connection(
@ -481,14 +498,14 @@ class serial_manager:
baudrate=baudrate baudrate=baudrate
) )
# logger.info(f"Serial connection created for {serial_port}") # debug logger.debug(f"Serial connection created for {serial_port}")
# 將 serial_object 加入管理列表 # 將 serial_object 加入管理列表
serial_id = self.serial_count + 1 serial_id = self.serial_count + 1
self.serial_objects[serial_id] = serial_obj self.serial_objects[serial_id] = serial_obj
self.serial_count += 1 self.serial_count += 1
# logger.info(f"Serial object {serial_id} added to manager") # debug logger.debug(f"Serial object {serial_id} added to manager")
return True return True
except Exception as e: except Exception as e:
@ -501,12 +518,10 @@ class serial_manager:
def remove_serial_link(self, serial_id): def remove_serial_link(self, serial_id):
"""移除串口連接(線程安全方式)""" """移除串口連接(線程安全方式)"""
# 確保事件循環正在運行
if not self.loop: if not self.loop:
logger.error("Event loop not running") logger.error("Event loop not running")
return False return False
# 檢查 serial_id 是否存在
if serial_id not in self.serial_objects: if serial_id not in self.serial_objects:
logger.warning(f"Serial object {serial_id} not found") logger.warning(f"Serial object {serial_id} not found")
return False return False
@ -549,7 +564,7 @@ class serial_manager:
# 從管理列表中移除 # 從管理列表中移除
del self.serial_objects[serial_id] del self.serial_objects[serial_id]
# logger.info(f"Serial object {serial_id} removed from manager") # debug logger.debug(f"Serial object {serial_id} removed from manager")
return True return True
except Exception as e: except Exception as e:
@ -557,6 +572,7 @@ class serial_manager:
return False return False
def get_serial_link(self): def get_serial_link(self):
"""取得所有串口連接資訊"""
ret = {} # serial id num : serial_port string ret = {} # serial id num : serial_port string
for key, obj in self.serial_objects.items(): for key, obj in self.serial_objects.items():
ret[key] = obj.serial_port ret[key] = obj.serial_port
@ -593,6 +609,8 @@ class serial_manager:
return False return False
# ====================== 測試代碼 =====================
if __name__ == '__main__': if __name__ == '__main__':
sm = serial_manager() sm = serial_manager()
sm.start() sm.start()
@ -600,11 +618,12 @@ if __name__ == '__main__':
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定 SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200 SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571 UDP_REMOTE_PORT = 14571
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialReceiverType.XBEEAPI2AT) sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.STRAIGHT)
linked_serial = sm.get_serial_link() linked_serial = sm.get_serial_link()
print(linked_serial) print(linked_serial)
time.sleep(10) time.sleep(30)
sm.remove_serial_link(1) sm.remove_serial_link(1)
time.sleep(3) time.sleep(3)

Loading…
Cancel
Save