1. serialManager 功能完成

2. mainOrchestrator 把serial端口整合到介面中了
3. 新增獲取系統 serial 資源的 util 工具
chiyu
Chiyu Chen 5 months ago
parent bcccdec927
commit a8aefe7853

@ -20,7 +20,11 @@ from pymavlink import mavutil
# 自定義的 import
from . import mavlinkObject as mo
from . import serialManager as sm
from .utils import RingBuffer, setup_logger
from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
@ -32,11 +36,13 @@ class PanelState:
termination_start_time = None
self.mavlink_bridge_state = "Stopped"
self.object_manager_state = "Stopped"
self.serial_manager_state = "Stopped"
self.socket_object_list = []
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
# 這邊是儲存關於 socket object 的資料
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": True} # 暫存 Serial 設定資訊
self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "",
"target_sockets": "", "primary_socket_id": "", "socket_connection_string": "",
"InfoReady": False} # 暫存單一 socket 的資訊
@ -122,7 +128,7 @@ class ControlPanel:
return user_input
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=5):
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 mavlink_object 列表選單(支持分頁)"""
children = []
@ -167,6 +173,55 @@ class ControlPanel:
menu.current_page = page
return menu
def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 serial port 列表選單(支持分頁)"""
children = []
# 獲取可用的 Serial 連接埠列表
# serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓
serial_ports = acquireSerial.get_serial_ports_with_filter('/dev/ttyUSB*')
if not serial_ports:
children.append(MenuNode("(空)", "目前沒有串口設備", None))
else:
total_items = len(serial_ports)
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的串口
for port in serial_ports[start_idx:end_idx]:
port_menu = MenuNode(f"{port}", children=[
MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[
MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"),
MenuNode("Xbee(AT-AT)", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
]),
MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"),
MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"),
MenuNode("返回", "回到列表", "BACK"),
])
# 將 port 附加到每個子選單項目上
for child in port_menu.children:
child.port = port
children.append(port_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
if page > 0:
prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE")
prev_node.page = page - 1
children.append(prev_node)
if page < total_pages - 1:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
def show_object_info(self, stdscr, socket_id, state: PanelState):
"""顯示物件詳細資訊的對話框"""
height, width = stdscr.getmaxyx()
@ -280,12 +335,14 @@ class ControlPanel:
MenuNode("Port(Target)", "設定目標的 Port", "TEXT_UDP_PORT"),
MenuNode("Create", "建立 UDP OutBound 連結口", "CREATE_UDP_OUTBOUND"),
]),
MenuNode("Serial InBound", action = "LIST_SERIAL_RES"),
]),
MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"),
]),
MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), #TODO: 尚未實作
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), #TODO: 尚未實作
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"),
]),
MenuNode("Shutdown", "關閉整個系統", children=[
MenuNode("Return", "繼續運行", "BACK"),
@ -304,11 +361,11 @@ class ControlPanel:
curses.echo()
curses.endwin()
def panel_shutdown():
def pre_panel_shutdown():
# 先關閉所有模組 再關閉面板
cmd_q.put("SHUTDOWN_BRIDGE")
cmd_q.put("SHUTDOWN_MANAGER")
cmd_q.put("SHUTDOWN_SERIAL_MANAGER")
def draw_menu(screen):
nonlocal stdscr
@ -325,9 +382,6 @@ class ControlPanel:
state.intoSTART() # 設定狀態為運行中
while not stop_evt.is_set():
# 檢查是否需要退出
if stop_evt.is_set():
break
current_menu = menu_stack[-1]
current_idx = idx_stack[-1]
@ -335,20 +389,27 @@ class ControlPanel:
# 獲取終端機尺寸
height, width = stdscr.getmaxyx()
# 簡單暴力的限制視窗的大小
if height < 20 or width < 60:
MIN_HEIGHT = (
2 + # 邊界
6 + # 狀態列 操作說明列 一個空白
11+ # 最大選單 與 空白區
5 # 訊息區域
)
if height < MIN_HEIGHT or width < 60:
logger.error("Terminal size too small for Control Panel.")
break
stdscr.clear()
stdscr.border()
# 更新模組狀態顯示
stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD)
stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}")
stdscr.addstr(2, 2, f"mavlink Bridge State : {state.mavlink_bridge_state}")
stdscr.addstr(3, 2, f"Object Manager State : {state.object_manager_state}")
stdscr.addstr(2, 2, f"Object Manager State : {state.object_manager_state}")
stdscr.addstr(3, 2, f"Mavlink Bridge State : {state.mavlink_bridge_state}")
stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}")
stdscr.addstr(2, 36, f"Serial Manager State : {state.serial_manager_state}")
# # 更新模組狀態顯示
# stdscr.addstr(2, 25, f"{state.mavlink_bridge_state}")
# stdscr.addstr(3, 25, f"{state.object_manager_state}")
# stdscr.addstr(4, 25, f"{len(state.socket_object_list)} ")
@ -363,6 +424,10 @@ class ControlPanel:
desc = f"{child.desc} [{state.udp_info_temp['IP']}]"
elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]:
desc = f"{child.desc} [{state.udp_info_temp['Port']}]"
elif child.action == "SET_SERIAL_COMM" and state.serial_info_temp["CommunicationType"]:
desc = f"{child.desc} [{state.serial_info_temp['CommunicationType']}]"
elif child.action == "TEXT_BAUD_SERIAL" and state.serial_info_temp["Baud"]:
desc = f"{child.desc} [{state.serial_info_temp['Baud']}]"
line = f"{marker}{child.name:15s} {desc}"
attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL
@ -404,25 +469,29 @@ class ControlPanel:
stdscr.refresh()
# 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作
# 驗證 bridge 跟 manager 狀態 兩者都停止後 就進入 STOPPED 狀態並跳出迴圈
# 驗證 其他附屬模組的狀態都停止後 就進入 STOPPED 狀態並跳出迴圈
# 超過幾秒沒有反應就強制關閉
if state.panel_status == "Terminating":
if time.time() - state.termination_start_time > 3:
if time.time() - state.termination_start_time > 7: # 其他組件設定5秒 這邊給多一點
logger.warning("Control Panel forced shutdown after timeout.")
state.intoSTOPPED()
stop_evt.set()
continue
# stop_evt.set()
# continue
break
time.sleep(0.1)
if state.mavlink_bridge_state == "Stopped" and state.object_manager_state == "Stopped":
if (state.mavlink_bridge_state == "Stopped" and
state.object_manager_state == "Stopped" and
state.serial_manager_state == "Stopped"):
state.intoSTOPPED()
stop_evt.set()
# stop_evt.set()
break
continue
# 設定短暫的 timeout讓執行緒能夠響應 stop_evt
stdscr.timeout(100) # 100ms timeout
stdscr.timeout(100)
ch = stdscr.getch()
if ch == -1: # timeout繼續檢查 stop_evt
if ch == -1: # 沒有操作
continue
# 處理按鍵
@ -456,7 +525,7 @@ class ControlPanel:
elif ch in (ord('q'), 27):
if state.panel_status == "Engineer":
state.intoTERMINATION()
panel_shutdown()
pre_panel_shutdown()
elif ch in (curses.KEY_ENTER, 10, 13):
selected = current_menu.children[current_idx]
@ -473,7 +542,7 @@ class ControlPanel:
elif selected.action == "QUIT":
state.intoTERMINATION()
panel_shutdown()
pre_panel_shutdown()
elif selected.action == "TEXT_UDP_IP":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ")
@ -491,40 +560,98 @@ class ControlPanel:
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
menu_stack.pop()
idx_stack.pop()
# menu_stack.pop()
# idx_stack.pop()
elif selected.action == "CREATE_UDP_OUTBOUND":
cmd_q.put("CREATE_UDP_OUTBOUND")
# 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
# menu_stack.pop()
# idx_stack.pop()
elif selected.action == "TEXT_BAUD_SERIAL":
result = ControlPanel.input_dialog(stdscr, "請輸入 Baud Rate (e.g., 9600, 115200): ")
if result is not None:
try:
baud_rate = int(result)
except ValueError:
state.panel_info_msg_list.append(("Invalid Baud Rate input.", time.time()))
state.serial_info_temp["Baud"] = baud_rate
elif selected.action == "SET_SERIAL_COMM_XBEE":
state.serial_info_temp["CommunicationType"] = "XBee(API-AT)"
menu_stack.pop()
idx_stack.pop()
elif selected.action == "SET_SERIAL_COMM_TELEMETRY":
state.serial_info_temp["CommunicationType"] = "XBee(AT-AT)"
menu_stack.pop()
idx_stack.pop()
elif selected.action == "CREATE_SERIAL_PORT":
state.serial_info_temp["Port"] = menu_stack[-1].name # 從選單取得 Port 名稱
cmd_q.put("CREATE_SERIAL_PORT")
# 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
menu_stack.pop()
idx_stack.pop()
elif selected.action == "LIST_SERIAL_RES":
created_list_menu = self.create_serial_port_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "LIST_MAV_OBJECT":
# 動態生成 mavlink_object 列表選單
object_list_menu = self.create_object_list_menu(state, page=0)
menu_stack.append(object_list_menu)
created_list_menu = self.create_object_list_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "PREV_PAGE":
# 上一頁
# elif selected.action == "PREV_PAGE":
# # 上一頁
# if hasattr(selected, 'page'):
# menu_stack.pop()
# idx_stack.pop()
# if menu_stack[-1].name == "Serial Port List":
# created_list_menu = self.create_serial_port_menu(state, page=selected.page)
# elif menu_stack[-1].name == "Object List":
# created_list_menu = self.create_object_list_menu(state, page=selected.page)
# menu_stack.append(created_list_menu)
# idx_stack.append(0)
# elif selected.action == "NEXT_PAGE":
# # 下一頁
# if hasattr(selected, 'page'):
# menu_stack.pop()
# idx_stack.pop()
# if menu_stack[-1].name == "Serial Port List":
# created_list_menu = self.create_serial_port_menu(state, page=selected.page)
# elif menu_stack[-1].name == "Object List":
# created_list_menu = self.create_object_list_menu(state, page=selected.page)
# menu_stack.append(created_list_menu)
# idx_stack.append(0)
elif selected.action in ("PREV_PAGE", "NEXT_PAGE"):
if hasattr(selected, 'page'):
current_list_menu = menu_stack[-1]
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
# 依據選單種類 重新建立分頁
if current_list_menu.name == "Serial Port List":
created_list_menu = self.create_serial_port_menu(state, page=selected.page)
elif current_list_menu.name == "Object List":
created_list_menu = self.create_object_list_menu(state, page=selected.page)
else:
# 不支援的選單類型,回到原本的選單
menu_stack.append(current_list_menu)
idx_stack.append(0)
continue
elif selected.action == "NEXT_PAGE":
# 下一頁
if hasattr(selected, 'page'):
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "INSPECT_MAV_OBJECT":
@ -583,6 +710,13 @@ class ControlPanel:
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_BRIDGE")
elif selected.action == "STOP_SERIAL_MANAGER":
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_SERIAL_MANAGER")
elif callable(selected.action):
# 執行函式
cmd_q.put(selected.action)
@ -592,14 +726,13 @@ class ControlPanel:
except KeyboardInterrupt:
pass
finally:
stop_evt.set()
cleanup()
class Orchestrator:
def __init__(self, stop_sig):
self.stop_evt = stop_sig
self.stop_evt = stop_sig # 外部操作去中斷 "面板" 執行緒的訊號 (內部自己停止的話不需要用這個)
# === 1) 面板部分的準備 ===
self.cmd_q = queue.Queue()
@ -613,6 +746,9 @@ class Orchestrator:
self.manager = mo.async_io_manager()
self.bridge = mo.mavlink_bridge()
# === 3) serial_manager 部分的準備 ===
self.plumber = sm.serial_manager()
def engageWholeSystem(self):
"""啟動整個系統"""
# === 1) 面板部分的啟動 ===
@ -623,10 +759,14 @@ class Orchestrator:
self.manager.start()
self.bridge.start()
# === 3) serial_manager 部分的啟動 ===
self.plumber.start()
def mainLoop(self):
logger.info("Main orchestrator started <-")
try:
while not self.stop_evt.is_set():
# while not self.stop_evt.is_set():
while self.panel_thread.is_alive():
# A. 更新模組狀態
if self.manager.running:
@ -642,6 +782,11 @@ class Orchestrator:
else:
self.panelState.mavlink_bridge_state = 'Stopped'
if self.plumber.running:
self.panelState.serial_manager_state = 'Running'
else:
self.panelState.serial_manager_state = 'Stopped'
# 取出面板丟過來的「動作」
try:
cmd = self.cmd_q.get_nowait()
@ -686,10 +831,14 @@ class Orchestrator:
elif cmd == "CREATE_UDP_OUTBOUND":
self.panelState.udp_info_temp["direction"] = "outbound"
self.create_udp_object()
elif cmd == "CREATE_SERIAL_PORT":
self.create_serial_port_object()
elif cmd == "SHUTDOWN_BRIDGE":
self.bridge.stop()
elif cmd == "SHUTDOWN_MANAGER":
self.manager.shutdown()
elif cmd == "SHUTDOWN_SERIAL_MANAGER":
self.plumber.shutdown()
except queue.Empty:
pass
except Exception as e:
@ -703,18 +852,32 @@ class Orchestrator:
except Exception as e:
logger.error(f"Unexpected error in main loop: {e}")
finally:
logger.info("Main orchestrator END!")
# 關閉 mavlink_bridge (裡面有一個執行緒)
# 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
if self.bridge.thread.is_alive():
if self.bridge.running:
self.bridge.stop()
self.bridge.thread.join(timeout=2)
# 關閉 async_io_manager (裡面有一個執行緒)
if self.manager.thread.is_alive():
if self.manager.running:
self.manager.shutdown()
self.manager.thread.join(timeout=2)
if self.plumber.thread.is_alive():
if self.plumber.running:
self.plumber.shutdown()
self.plumber.thread.join(timeout=2)
# 關閉面板執行緒
if self.panel_thread.is_alive():
self.panel_thread.join(timeout=2)
logger.info("Main orchestrator END!")
# =============== 面板動作 - Mavlink Object ===============
def create_udp_object(self):
if self.panelState.udp_info_temp["direction"] == "inbound":
connection_string = f"udp:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
@ -761,6 +924,46 @@ class Orchestrator:
else:
self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time()))
# =============== 面板動作 - Serial Manager ===============
def create_serial_port_object(self):
# 獲取可用的 udp port
udp_port_tmp = find_available_port(19000, 20000)
# 定義通訊類型映射表
COMM_TYPE_MAP = {
"XBee(API-AT)": sm.CommunicationType.XBee_API_AT,
# "XBee(AT-AT)": sm.CommunicationType.XBee_AT_AT, # TODO: 之後再弄
# 新增區
}
# 驗證輸入
comm_type = self.panelState.serial_info_temp['CommunicationType']
if not comm_type:
self.panelState.panel_info_msg_list.append(
("Please select Communication Type first.", time.time())
)
return
# 查找對應的通訊類型
comm_type_tmp = COMM_TYPE_MAP.get(comm_type)
if comm_type_tmp is None:
self.panelState.panel_info_msg_list.append(
(f"Communication type '{comm_type}' not supported yet.", time.time())
)
return
ret = self.plumber.create_serial_port(
port=self.panelState.serial_info_temp['Port'],
baudrate=self.panelState.serial_info_temp['Baud'],
target_port=udp_port_tmp,
communication_type=comm_type_tmp,
)
if not ret:
self.panelState.panel_info_msg_list.append((f"Failed to create Serial Port object at {self.panelState.serial_info_temp['Port']}.", time.time()))
return
def main():
stop_evt = threading.Event()

@ -128,7 +128,7 @@ class mavlink_bridge:
"""停止 mavlink_bridge 的運作"""
self.running = False
if self.thread and self.thread.is_alive():
self.thread.join(timeout=3.0)
self.thread.join(timeout=5.0)
# === Thread 區塊 ===
def _run_thread(self):
@ -632,13 +632,11 @@ class async_io_manager:
# 等待線程結束
if self.thread and self.thread.is_alive():
self.thread.join(timeout=10.0)
self.thread.join(timeout=5.0)
if self.thread.is_alive():
logger.warning("async_io_manager thread did not stop gracefully")
os.kill(os.getpid(), signal.SIGTERM) # 強制終止程序
logger.info("async_io_manager thread END!")
def _run_event_loop(self):

@ -11,6 +11,9 @@ import os
import sys
import serial
import signal
import time
import threading
import struct
from enum import Enum, auto
# # XBee 模組
@ -63,9 +66,10 @@ class XBeeFrameHandler:
# 'data': frame[15:-1]
# }
pass
return None
@staticmethod
def encapsulate_data(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes:
"""
將數據封裝為 XBee API 傳輸幀
@ -117,7 +121,6 @@ class XBeeFrameHandler:
return data
class ATCommandHandler:
"""AT 指令回應處理器"""
@ -171,16 +174,18 @@ class ATCommandHandler:
# print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High) - 範例"""
if len(data) >= 4:
serial_high = int.from_bytes(data[:4], 'big')
print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}")
# """處理 SH (Serial Number High) - 範例"""
# if len(data) >= 4:
# serial_high = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}")
pass
def _handle_serial_low(self, data: bytes):
"""處理 SL (Serial Number Low) - 範例"""
if len(data) >= 4:
serial_low = int.from_bytes(data[:4], 'big')
print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}")
# """處理 SL (Serial Number Low) - 範例"""
# if len(data) >= 4:
# serial_low = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}")
pass
@ -188,7 +193,7 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial
def __init__(self, udp_handler, serial_port_str):
self.udp_handler = udp_handler # UDP 的傳輸把手
self.serial_port_str = serial_port_str
self.at_handler = ATCommandHandler(serial_port)
self.at_handler = ATCommandHandler(serial_port_str)
self.buffer = bytearray() # 用於緩存接收到的資料
self.transport = None # Serial 自己的傳輸物件
@ -199,7 +204,7 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial
self.transport = transport
if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self)
logger.info(f"Serial port {self.serial_port_str} connected.")
# logger.info(f"Serial port {self.serial_port_str} connected.") # debug
# Serial 收到資料的處理過程
def data_received(self, data):
@ -222,27 +227,30 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial
break
# 6. 提取完整 frame 並從緩衝區移除
frame_payload = self.buffer[:full_length]
an_frame = self.buffer[:full_length]
del self.buffer[:full_length]
# 7. 判斷 frame 類型
frame_type = frame[3]
frame_type = an_frame[3]
if frame_type == 0x88:
# 處理 AT Command 回應
# response = XBeeFrameHandler.parse_at_command_response(frame)
# response = XBeeFrameHandler.parse_at_command_response(an_frame)
# self.at_handler.handle_response(response)
pass # debug
elif frame_type == 0x90:
# Receive Packet (RX) payload 先解碼
processed_data = XBeeFrameHandler.decapsulate_data(bytes(frame_payload))
processed_data = XBeeFrameHandler.decapsulate_data(bytes(an_frame))
# 轉換失敗就捨棄了
if processed_data is None:
break
continue
# 再透過 UDP 送出
self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port))
elif frame_type == 0x8B:
pass
else:
# 其他類型的 frame 未來可擴展處理 現在忽略
logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}")
@ -295,7 +303,7 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready. Waiting for serial data before sending...")
# logger.info(f"UDP transport ready. Waiting for serial data before sending.") # debug
def set_serial_handler(self, serial_handler):
self.serial_handler = serial_handler
@ -328,7 +336,7 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處
class SerialReceiverType(Enum):
"""連接類型"""
TELEMETRY = auto()
XBEEAPI = auto()
XBEEAPI2AT = auto()
OTHER = auto()
@ -388,7 +396,30 @@ class serial_manager:
return False
def shutdown(self):
pass
"""停止 serial_manager 和其管理的所有 serial_object"""
# 自己在 running 狀態下才執行停止程序
if not self.running:
logger.warning("serial_manager is not running")
return
# 停止所有被管理的 serial_object
for serial_id in list(self.serial_objects.keys()):
self.remove_serial_link(serial_id)
# 停止自己
self.running = False
# 解開事件循環的阻塞
if self.loop:
self.loop.call_soon_threadsafe(self.loop.stop)
# 等待線程結束
if self.thread and self.thread.is_alive():
self.thread.join(timeout=5.0)
if self.thread.is_alive():
logger.warning("serial_manager thread did not stop gracefully")
logger.info("serial_manager thread END!")
def _run_event_loop(self):
"""在獨立線程中運行 asyncio 事件循環"""
@ -414,53 +445,142 @@ class serial_manager:
def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType):
if self.loop is None:
if not self.running or not self.loop:
logger.error("Event loop not running, cannot create serial link")
return False
# 檢查 serial port 有效
self.check_serial_port(serial_port)
if not self.check_serial_port(serial_port, baudrate):
logger.error(f"Serial port {serial_port} validation failed")
return False
# 使用 run_coroutine_threadsafe 執行協程並獲取結果
future = asyncio.run_coroutine_threadsafe(
self._async_create_serial_link(serial_port, baudrate, target_port, receiver_type),
self.loop
)
try:
# 等待結果,設定合理的超時時間
result = future.result(timeout=5.0)
if result:
logger.info(f"Create Serial Link: {serial_port} -> UDP {target_port}")
return True
except asyncio.TimeoutError:
logger.error(f"Timeout creating serial link for {serial_port}")
return False
except Exception as e:
logger.error(f"Failed to create serial link for {serial_port}: {e}")
return False
async def _async_create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType):
"""在事件循環線程中執行實際的連接創建"""
try:
# 創建 serial_object 實例
serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type)
# 建立 UDP 處理器並指定目標端口位置
serial_obj.udp_handler = UDPHandler(target_port)
# 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配
try:
serial_obj.transport, serial_obj.protocol = await self.loop.create_datagram_endpoint(
udp_transport, udp_protocol = await self.loop.create_datagram_endpoint(
lambda: serial_obj.udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
except Exception as e:
logger.error(f"Cannot Create UDP Endpoint: {str(e)}")
return False
serial_obj.transport = udp_transport
serial_obj.protocol = udp_protocol
# 建立 Serial 傳輸,將 UDP 處理器傳給它
try:
serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler)
# logger.info(f"UDP endpoint created for {serial_port}") # debug
_, serial_transport = await serial_asyncio.create_serial_connection(
self.loop, lambda: serial_obj.serial_handler, serial_port, baudrate=baudrate
# 建立 Serial 處理器,將 UDP 處理器傳給它
serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port)
# 建立 Serial 連接
serial_transport, _ = await serial_asyncio.create_serial_connection(
self.loop,
lambda: serial_obj.serial_handler,
serial_port,
baudrate=baudrate
)
# logger.info(f"Serial connection created for {serial_port}") # debug
# 將 serial_object 加入管理列表
serial_id = self.serial_count + 1
self.serial_objects[serial_id] = serial_obj
self.serial_count += 1
# logger.info(f"Serial object {serial_id} added to manager") # debug
return True
except Exception as e:
logger.error(f"Cannot Create Serial Connection: {str(e)}")
logger.error(f"Exception in _async_create_serial_link for {serial_port}: {str(e)}")
# 清理已創建的資源
if 'serial_obj' in locals():
if hasattr(serial_obj, 'transport') and serial_obj.transport:
serial_obj.transport.close()
return
return False
# self.serial_objects.append(serial_obj)
self.serial_objects[serial_count+1] = serial_obj
serial_count += 1
def remove_serial_link(self, serial_id):
"""移除串口連接(線程安全方式)"""
# 確保事件循環正在運行
if not self.loop:
logger.error("Event loop not running")
return False
async def _async_create_serial_link(self, serial_port, baudrate, target_port):
pass
# 檢查 serial_id 是否存在
if serial_id not in self.serial_objects:
logger.warning(f"Serial object {serial_id} not found")
return False
def remove_serial_link(serial_id):
pass
# 使用 run_coroutine_threadsafe 執行協程
future = asyncio.run_coroutine_threadsafe(
self._async_remove_serial_link(serial_id),
self.loop
)
try:
result = future.result(timeout=3.0)
if result:
logger.info(f"Remove Serial Link {serial_id}")
return result
except asyncio.TimeoutError:
logger.error(f"Timeout removing serial link {serial_id}")
return False
except Exception as e:
logger.error(f"Failed to remove serial link {serial_id}: {e}")
return False
async def _async_remove_serial_link(self, serial_id):
pass
"""在事件循環線程中執行實際的連接移除"""
if serial_id not in self.serial_objects:
logger.warning(f"Serial object {serial_id} not in managed list")
return False
try:
serial_obj = self.serial_objects[serial_id]
# 關閉 UDP transport
if hasattr(serial_obj, 'transport') and serial_obj.transport:
serial_obj.transport.close()
# 關閉 Serial transport
if hasattr(serial_obj, 'serial_handler') and serial_obj.serial_handler:
if hasattr(serial_obj.serial_handler, 'transport') and serial_obj.serial_handler.transport:
serial_obj.serial_handler.transport.close()
def check_serial_port(serial_port):
# 從管理列表中移除
del self.serial_objects[serial_id]
# logger.info(f"Serial object {serial_id} removed from manager") # debug
return True
except Exception as e:
logger.error(f"Exception in _async_remove_serial_link for {serial_id}: {str(e)}")
return False
@staticmethod
def check_serial_port(serial_port, baudrate):
"""檢查串口是否存在與可用"""
# 檢查設備是否存在
if not os.path.exists(serial_port):
@ -469,7 +589,9 @@ class serial_manager:
# 檢查是否有權限訪問設備
try:
os.access(serial_port, os.R_OK | os.W_OK)
if not os.access(serial_port, os.R_OK | os.W_OK):
logger.error(f"No permission to access {serial_port}")
return False
except Exception as e:
logger.error(f"Cannot Access Serial Device {serial_port}: {str(e)}")
return False
@ -477,7 +599,7 @@ class serial_manager:
# 檢查是否被占用
try:
# 嘗試打開串口
ser = serial.Serial(serial_port, SERIAL_BAUDRATE)
ser = serial.Serial(serial_port, baudrate)
ser.close() # 打開成功後立即關閉
return True
except serial.SerialException as e:
@ -488,12 +610,14 @@ class serial_manager:
return False
if __main__ == '__main__':
if __name__ == '__main__':
sm = serial_manager()
sm.start()
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_IP = '127.0.0.1'
UDP_REMOTE_PORT = 14560
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_IP, SerialReceiverType.XBEEAPI)
UDP_REMOTE_PORT = 14571
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialReceiverType.XBEEAPI2AT)
time.sleep(10)
sm.shutdown()

@ -0,0 +1,96 @@
"""
Serial Port Discovery Utility
This module provides functions to discover available serial ports on the system.
It uses glob pattern matching to find serial device files in /dev/.
"""
import glob
from typing import List
def get_serial_ports() -> List[str]:
"""
獲取系統中所有可用的串口設備列表
Linux 系統中會搜尋以下模式的設備
- /dev/ttyUSB* (USB 串口設備)
- /dev/ttyACM* (USB CDC ACM 設備)
- /dev/ttyS* (標準串口)
Returns:
List[str]: 包含所有找到的串口設備路徑的列表
Example:
>>> ports = get_serial_ports()
>>> print(ports)
['/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/ttyS0']
"""
serial_ports = []
# 搜尋不同類型的串口設備
patterns = [
'/dev/ttyUSB*', # USB 串口轉換器
'/dev/ttyACM*', # USB CDC ACM 設備(如 Arduino
'/dev/ttyS*', # 標準串口
]
for pattern in patterns:
serial_ports.extend(glob.glob(pattern))
# 排序以便於閱讀
serial_ports.sort()
return serial_ports
def get_serial_ports_with_filter(filter_pattern: str = None) -> List[str]:
"""
獲取串口設備列表可選擇性地使用自訂篩選模式
Args:
filter_pattern (str, optional): 自訂的 glob 模式例如 '/dev/ttyUSB*'
如果為 None則使用預設模式搜尋所有串口
Returns:
List[str]: 符合條件的串口設備路徑列表
Example:
>>> # 只搜尋 USB 串口
>>> usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*')
>>> print(usb_ports)
['/dev/ttyUSB0', '/dev/ttyUSB1']
"""
if filter_pattern:
serial_ports = glob.glob(filter_pattern)
serial_ports.sort()
return serial_ports
else:
return get_serial_ports()
if __name__ == "__main__":
# 使用範例
print("=== Serial Port Discovery ===\n")
# 1. 獲取所有串口設備
all_ports = get_serial_ports()
print(f"找到 {len(all_ports)} 個串口設備:")
for port in all_ports:
print(f" - {port}")
print("\n" + "="*30 + "\n")
# 2. 只搜尋 USB 串口
usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*')
print(f"找到 {len(usb_ports)} 個 USB 串口設備:")
for port in usb_ports:
print(f" - {port}")
print("\n" + "="*30 + "\n")
# 3. 只搜尋 ACM 設備
acm_ports = get_serial_ports_with_filter('/dev/ttyACM*')
print(f"找到 {len(acm_ports)} 個 ACM 設備:")
for port in acm_ports:
print(f" - {port}")
Loading…
Cancel
Save