1. (modify) acquireSerial.py 撈取多個端口字串功能

2. (modify) 調整面板的顯示字串
3. (remove) serial_udp_bitrans.py 用不到了
Chiyu Chen 4 months ago
parent 62356cc056
commit e00880c6de

@ -21,6 +21,7 @@ from pymavlink import mavutil
# 自定義的 import
from . import mavlinkObject as mo
from . import serialManager as sm
from . import mavlinkVehicleView as mvv
from .utils import RingBuffer, setup_logger
from .utils import acquireSerial, acquirePort
@ -142,7 +143,7 @@ class ControlPanel:
children = []
if not state.socket_object_list:
children.append(MenuNode("()", "目前沒有連結口", None))
children.append(MenuNode("(Empty)", "目前沒有連結口", None))
else:
total_items = len(state.socket_object_list)
total_pages = (total_items + items_per_page - 1) // items_per_page
@ -158,7 +159,7 @@ class ControlPanel:
MenuNode("Cancel Link", "取消轉發連結", "MAVOBJ_CANCEL_LINK"),
MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"),
MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"),
MenuNode("返回", "回到列表", "BACK"),
MenuNode("GoUp", "回到列表", "BACK"),
])
# 將 socket_id 附加到每個子選單項目上
for child in obj_menu.children:
@ -177,7 +178,7 @@ class ControlPanel:
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Object List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
@ -294,10 +295,10 @@ class ControlPanel:
# 獲取可用的 Serial 連接埠列表
# serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓
serial_ports = acquireSerial.get_serial_ports_with_filter('/dev/ttyUSB*')
serial_ports = acquireSerial.get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*'])
if not serial_ports:
children.append(MenuNode("()", "目前沒有串口設備", None))
children.append(MenuNode("(Empty)", "目前沒有串口設備", None))
else:
total_items = len(serial_ports)
total_pages = (total_items + items_per_page - 1) // items_per_page
@ -312,12 +313,12 @@ class ControlPanel:
# MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
]),
MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"),
MenuNode("Link to Middleware", "方便功能 可以直接建立 UDP object", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[
MenuNode("Link to MW", "直接建立 Middleware UDP", "LINK_SERIAL_TO_MIDDLEWARE_UDP", children=[
MenuNode("Yes", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_YES"),
MenuNode("No", action = "LINK_SERIAL_TO_MIDDLEWARE_UDP_NO"),
]),
MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"),
MenuNode("返回", "回到列表", "BACK"),
MenuNode("GoUp", "回到列表", "BACK"),
])
# 將 port 附加到每個子選單項目上
for child in port_menu.children:
@ -336,7 +337,7 @@ class ControlPanel:
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
@ -346,7 +347,7 @@ class ControlPanel:
children = []
if not state.linked_serial_dict:
children.append(MenuNode("()", "目前沒有連結口", None))
children.append(MenuNode("(Empty)", "目前沒有連結口", None))
else:
total_items = len(state.linked_serial_dict)
total_pages = (total_items + items_per_page - 1) // items_per_page
@ -361,7 +362,7 @@ class ControlPanel:
MenuNode("Info", "查看詳細資訊", "INSPECT_LINKED_SERIAL"),
MenuNode("Remove", "移除此連結口", "REMOVE_LINKED_SERIAL"),
# MenuNode("Change UDP Target", "變更目標 UDP (工程)", "CHANGE_LINKED_SERIAL_TARGET"),
MenuNode("返回", "回到列表", "BACK"),
MenuNode("GoUp", "回到列表", "BACK"),
])
# 將 serial_id 附加到每個子選單項目上
for child in obj_menu.children:
@ -380,7 +381,7 @@ class ControlPanel:
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
@ -435,6 +436,49 @@ class ControlPanel:
stdscr.clear()
stdscr.refresh()
# ================ 關於載具檢視的部份 ===================
def create_vehicles_list_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 已連線載具 列表選單(支持分頁)"""
children = []
if not state.connected_vehicles_dict:
children.append(MenuNode("(Empty)", "目前沒有已連線的載具", None))
else:
total_items = len(state.connected_vehicles_dict)
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
vehicle_id_list = list(state.connected_vehicles_dict.keys())
# 顯示當前頁的物件
for vehicle_id in vehicle_id_list[start_idx:end_idx]:
vehicle_menu = MenuNode(f"Vehicle #{vehicle_id}", f"載具 {vehicle_id}", None, children=[
MenuNode("Info", "查看詳細資訊", "INSPECT_VEHICLE"),
MenuNode("GoUp", "回到列表", "BACK"),
])
# 將 vehicle_id 附加到每個子選單項目上
for child in vehicle_menu.children:
child.vehicle_id = vehicle_id
children.append(vehicle_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
if page > 0:
prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE")
prev_node.page = page - 1
children.append(prev_node)
if page < total_pages - 1:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("GoUp", "回到上層選單", "BACK"))
menu = MenuNode("Connected Vehicles", f"已連線載具列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
# ================ 關於 主要選單 的部份 ===================
def menu_tree(self):
@ -457,8 +501,9 @@ class ControlPanel:
]),
MenuNode("Serial Manager", "Serial 連接埠選項", children=[
MenuNode("New+", "新增 Serial 連接埠", action = "LIST_SERIAL_RES"),
MenuNode("ListAll", "顯示已連線的 Serial", action = "LIST_SERIAL_LINKS"),
MenuNode("ListAll", "顯示並管理已連線的 Serial", action = "LIST_SERIAL_LINKS"),
]),
MenuNode("Vehicles Insp.", "檢視已連線的遠端載具", action = "INSPECT_VEHICLES"),
MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
@ -865,6 +910,8 @@ class Orchestrator:
self.stop_evt = stop_sig # 外部操作去中斷 "面板" 執行緒的訊號 (內部自己停止的話不需要用這個)
self.occupied_ip_ports = {} # 紀錄已被佔用的 ip:port 組合 "ip str" : [port int, port int, ...]
self.vehicle_registry = mvv.vehicle_registry
# === 1) 面板部分的準備 ===
self.cmd_q = queue.Queue()
self.panelState = PanelState() # 面板的狀態儲存

@ -237,7 +237,7 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial
# 處理 AT Command 回應
# response = XBeeFrameHandler.parse_at_command_response(an_frame)
# self.at_handler.handle_response(response)
pass # debug
pass
elif frame_type == 0x90:
# Receive Packet (RX) payload 先解碼
@ -279,16 +279,6 @@ class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial
# print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}")
# def write_to_serial(self, data):
# # 在資料透過 Serial 發送之前進行處理
# processed_data = self.encapsulate_data(data)
# # 處理失敗就不發送
# if processed_data not None:
# self.transport.write(processed_data)
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端IP
@ -311,7 +301,7 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處
# UDP 收到資料的處理過程
def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據)
# self.remote_addr = addr
# self.remote_addr = addr # debug
# print(f"Received UDP data from {addr}, setting as remote address")
processed_data = XBeeFrameHandler.encapsulate_data(data)
@ -319,18 +309,6 @@ class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處
if self.serial_handler:
self.serial_handler.transport.write(processed_data)
# def send_udp(self, data):
# # 藉由 UDP 發送資料出去
# # 在透過 UDP 發送數據之前進行解封裝
# decoded_data = self.decapsulate_data(data)
# if decoded_data is None:
# return
# if self.transport:
# self.transport.sendto(decoded_data, (self.LOCAL_HOST_IP, self.target_port))
#==================================================================
class SerialReceiverType(Enum):

@ -1,340 +0,0 @@
import asyncio
import serial_asyncio
# 附加驗證功能
import os
import sys
import serial
import signal
# XBee 模組
from xbee.frame import APIFrame
# =========================
SERIAL_PORT = '/dev/ttyACM0' # serial port
SERIAL_BAUDRATE = 57600 # serial baudrate
UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP
UDP_REMOTE_PORT = 14550 # UDP Target port
# 測試用 只會吃一次資料
DEBUG_MODE = False
# =========================
def check_serial_port():
"""檢查串口是否存在與可用"""
# 檢查設備是否存在
if not os.path.exists(SERIAL_PORT):
print(f"錯誤:串口設備 {SERIAL_PORT} 不存在")
return False
# 檢查是否有權限訪問設備
try:
os.access(SERIAL_PORT, os.R_OK | os.W_OK)
except Exception as e:
print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}{str(e)}")
return False
# 檢查是否被占用
try:
# 嘗試打開串口
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close() # 打開成功後立即關閉
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
# =========================
class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.first_data = True # 標記是否為第一次收到資料
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
## =====================================
# Serial 收到資料,轉發到 UDP
def data_received(self, data):
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug
if DEBUG_MODE and self.has_processed:
return
# 標記首次收到資料
if hasattr(self.udp_protocol, 'send_udp'):
if self.first_data:
print(f"First data received from serial, forwarding to UDP: {data[:20]}...")
self.first_data = False
# 轉發數據
self.udp_protocol.send_udp(data)
if DEBUG_MODE: # 測試模式用 # debug
self.has_processed = True
print("[DEBUG] SerialToUDP Mark")
def write_to_serial(self, data):
# 在資料透過 Serial 發送之前進行處理
processed_data = self.encapsulate_data(data)
# 處理失敗就不發送
if processed_data not None:
self.transport.write(processed_data)
def encapsulate_data(self, data):
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
## 方法一
# 設置幀參數
frame_id = 0x01 # 幀識別碼,用於確認
frame_type = 0x10 # 幀類型:傳輸請求
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播)
dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播)
broadcast_radius = 0x00 # 廣播半徑 (0 = 最大)
options = 0x00 # 傳輸選項
# 構建幀數據部分
frame_data = bytearray()
frame_data.append(frame_type) # 添加幀類型
frame_data.append(frame_id) # 添加幀 ID
frame_data.extend(dest_addr64) # 添加 64 位目標地址
frame_data.extend(dest_addr16) # 添加 16 位目標地址
frame_data.append(broadcast_radius) # 添加廣播半徑
frame_data.append(options) # 添加選項
frame_data.extend(data) # 添加實際數據內容
# 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位)
checksum = 0xFF - (sum(frame_data) & 0xFF)
# 構建完整的 API 幀
# 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和
complete_frame = bytearray()
complete_frame.append(0x7E) # 添加起始分隔符
complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先)
complete_frame.extend(frame_data) # 添加幀數據
complete_frame.append(checksum) # 添加校驗和
return bytes(complete_frame)
## 方法二
# frame_id=0x01
# frame_type = 0x10
# dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# dest_addr16 = b'\xFF\xFE'
# broadcast_radius = 0x00
# options = 0x00
# frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
# frame += dest_addr64 + dest_addr16
# frame += struct.pack(">BB", broadcast_radius, options) + data
# checksum = 0xFF - (sum(frame) & 0xFF)
# return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
def __init__(self):
self.serial_transport = None
self.transport = None
self.remote_addr = None # 儲存動態獲取的遠程地址
self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready. Waiting for serial data before sending...")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
## =====================================
# UDP 收到資料
def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據)
# self.remote_addr = addr
# print(f"Received UDP data from {addr}, setting as remote address")
# 在 DEBUG 模式下,如果已經處理過數據,則直接返回
if DEBUG_MODE and self.has_processed:
return
if self.serial_transport:
self.serial_transport.write_to_serial(data)
if DEBUG_MODE: # 測試模式用
self.has_processed = True
print("[DEBUG] UDPHandler Mark")
def send_udp(self, data):
# 發送資料到 UDP
# if self.transport:
# # 如果有已知的回應地址,使用該地址
# if self.remote_addr:
# self.transport.sendto(data, self.remote_addr)
# # print(f"Sending to dynamic address: {self.remote_addr}")
# else:
# # 否則使用預設地址
# self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
# print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
if self.transport:
# 只有第一次或地址改變時才顯示
# if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT):
# print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}")
# self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT)
# 在透過 UDP 發送數據之前進行解封裝
decoded_data = self.decapsulate_data(data)
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
# 這裡可以根據需要進行數據解封裝
## 方法一
try:
# 創建一個 API 幀處理器
api_frame = APIFrame(escaped=True)
# 嘗試解析數據
api_frame.fill(data)
# 如果數據不完整,直接返回原始數據
if not api_frame.is_complete():
return data
# 解析幀內容
parsed_data = api_frame.parse()
# 提取有用數據
if parsed_data:
frame_data = parsed_data.get('rf_data', None)
if frame_data:
return frame_data
return data
## 方法二 - 手動解析
# try:
# # XBee API 幀格式:
# # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# # 檢查幀起始符 (0x7E)
# if not data or len(data) < 5 or data[0] != 0x7E:
# return data
# # 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2]
# # 檢查幀完整性
# if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
# return data
# # 提取API標識符和數據
# frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# # 根據不同的幀類型進行處理
# if frame_type == 0x90: # 例如,這是"接收數據包"類型
# # 提取實際有效載荷
# # 對於接收數據包,格式通常是:
# # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據
# if len(frame_data) >= 12: # 確保數據長度足夠
# payload = frame_data[11:] # 前11字節是地址和選項
# return payload
# return data
# 如果無法提取 則回傳 None
except Exception as e:
print(f"手動解析 XBee 數據錯誤: {e}")
return None
async def main():
# 先檢查串口是否可用
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
# 設置終止處理
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(
sig,
lambda: asyncio.create_task(shutdown(loop))
)
# 建立單一 UDP 端點處理收發
udp_handler = UDPHandler()
# 建立 UDP 傳輸,不指定接收端口,讓系統自動分配
try:
udp_transport, protocol = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
# 獲取系統分配的本地端口
sock = udp_transport.get_extra_info('socket')
local_addr = sock.getsockname()
print(f"UDP listening on {local_addr[0]}:{local_addr[1]}")
# 建立 Serial 傳輸,將 UDP 處理器傳給它
try:
serial_proto = SerialToUDP(udp_handler)
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
udp_transport.close()
return
print(f"Waiting for data from serial port to initiate UDP communication...")
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
"""關閉程序"""
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print(f"程式執行錯誤:{str(e)}")

@ -6,7 +6,7 @@ It uses glob pattern matching to find serial device files in /dev/.
"""
import glob
from typing import List
from typing import List, Union
def get_serial_ports() -> List[str]:
@ -44,29 +44,44 @@ def get_serial_ports() -> List[str]:
return serial_ports
def get_serial_ports_with_filter(filter_pattern: str = None) -> List[str]:
def get_serial_ports_with_filter(filter_patterns: Union[str, List[str]] = None) -> List[str]:
"""
獲取串口設備列表可選擇性地使用自訂篩選模式
Args:
filter_pattern (str, optional): 自訂的 glob 模式例如 '/dev/ttyUSB*'
filter_patterns (Union[str, List[str]], optional):
單一或多個 glob 模式
- 字串: '/dev/ttyUSB*'
- 列表: ['/dev/ttyUSB*', '/dev/ttyACM*']
如果為 None則使用預設模式搜尋所有串口
Returns:
List[str]: 符合條件的串口設備路徑列表
Example:
>>> # 只搜尋 USB 串口
>>> usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*')
>>> print(usb_ports)
>>> # 單一 pattern
>>> ports = get_serial_ports_with_filter('/dev/ttyUSB*')
>>> print(ports)
['/dev/ttyUSB0', '/dev/ttyUSB1']
>>> # 多個 patterns
>>> ports = get_serial_ports_with_filter(['/dev/ttyUSB*', '/dev/ttyACM*'])
>>> print(ports)
['/dev/ttyACM0', '/dev/ttyUSB0', '/dev/ttyUSB1']
"""
if filter_pattern:
serial_ports = glob.glob(filter_pattern)
if filter_patterns is None:
return get_serial_ports()
# 統一轉成 list 處理
if isinstance(filter_patterns, str):
filter_patterns = [filter_patterns]
serial_ports = []
for pattern in filter_patterns:
serial_ports.extend(glob.glob(pattern))
serial_ports.sort()
return serial_ports
else:
return get_serial_ports()
if __name__ == "__main__":

Loading…
Cancel
Save