Compare commits

...

4 Commits

Author SHA1 Message Date
Chiyu Chen 5134fa8466 commit to merge
serial port 功能已驗證
5 months ago
Chiyu Chen f31cc8742a (Tested) 1. mainOrchestrator.py 修正呼叫錯誤 5 months ago
Chiyu Chen a8aefe7853 1. serialManager 功能完成
2. mainOrchestrator 把serial端口整合到介面中了
3. 新增獲取系統 serial 資源的 util 工具
5 months ago
Chiyu Chen bcccdec927 modify: mainOrchestrator.py 改善顯示介面
add: serialManager.py 添加Serial 連結功能 (未完成)
5 months ago

@ -14,6 +14,7 @@ Python
2. conda-forge 中的 pyserial-asyncio 2. conda-forge 中的 pyserial-asyncio
3. importlib_metadata -> Version: 8.5.0 3. importlib_metadata -> Version: 8.5.0
4. setuptools -> Version: 58.2.0 (版本太新不行) 4. setuptools -> Version: 58.2.0 (版本太新不行)
5. pyserial-asyncio
ROS2 ROS2

@ -20,7 +20,11 @@ from pymavlink import mavutil
# 自定義的 import # 自定義的 import
from . import mavlinkObject as mo from . import mavlinkObject as mo
from . import serialManager as sm
from .utils import RingBuffer, setup_logger from .utils import RingBuffer, setup_logger
from .utils import acquireSerial, acquirePort
from .utils.acquirePort import find_available_port
@ -32,13 +36,17 @@ class PanelState:
termination_start_time = None termination_start_time = None
self.mavlink_bridge_state = "Stopped" self.mavlink_bridge_state = "Stopped"
self.object_manager_state = "Stopped" self.object_manager_state = "Stopped"
self.socket_object_list = [] self.serial_manager_state = "Stopped"
self.socket_object_list = [] # 已有的 mavlink object
self.linked_serial_dict = {} # 已連線的 serial 端口
self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息 self.panel_info_msg_list = [] # 顯示在面板上的資訊訊息
# 這邊是儲存關於 socket object 的資料 # 這邊是儲存關於 socket object 的資料
self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊 self.udp_info_temp = {"IP": "127.0.0.1", "Port": "", "Direction": ""} # 暫存 UDP 設定資訊
self.serial_info_temp = {"Port": "", "Baud": 115200, "CommunicationType": "", "Go2Middleware": True} # 暫存 Serial 設定資訊
self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "", self.socket_info_single = {"socket_type": "", "socket_state": "", "bridge_msg_types": "", "return_msg_types": "",
"target_sockets": "", "primary_socket_id": "", "InfoReady": False} # 暫存單一 socket 的資訊 "target_sockets": "", "primary_socket_id": "", "socket_connection_string": "",
"InfoReady": False} # 暫存單一 socket 的資訊
def intoSTART(self): def intoSTART(self):
self.panel_status = "Running" self.panel_status = "Running"
@ -121,7 +129,9 @@ class ControlPanel:
return user_input return user_input
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=5): # ================ 關於 mavlink object 的部份 ===================
def create_object_list_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 mavlink_object 列表選單(支持分頁)""" """動態創建 mavlink_object 列表選單(支持分頁)"""
children = [] children = []
@ -139,9 +149,9 @@ class ControlPanel:
obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[ obj_menu = MenuNode(f"Socket #{socket_id}", f"連結口 {socket_id}", None, children=[
MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"), MenuNode("Info", "查看詳細資訊", "INSPECT_MAV_OBJECT"),
MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"), MenuNode("Make Link", "建立轉發連結", "MAVOBJ_MAKE_LINK"),
MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"), MenuNode("Cancel Link", "取消轉發連結", "MAVOBJ_CANCEL_LINK"),
MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"), MenuNode("Add Target", "添加轉發目標(工程)", "MAVOBJ_ADD_TARGET"),
MenuNode("Remove Target", "移除轉發目標(工程)", "MAVOBJ_REMOVE_TARGET"), MenuNode("Remove", "移除此連結口", "REMOVE_MAV_OBJECT"),
MenuNode("返回", "回到列表", "BACK"), MenuNode("返回", "回到列表", "BACK"),
]) ])
# 將 socket_id 附加到每個子選單項目上 # 將 socket_id 附加到每個子選單項目上
@ -151,13 +161,13 @@ class ControlPanel:
# 添加分頁控制 # 添加分頁控制
if total_pages > 1: if total_pages > 1:
children.append(MenuNode("---", "---", None)) children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
if page > 0: if page > 0:
prev_node = MenuNode("上一頁", f"{page}/{total_pages} ", "PREV_PAGE") prev_node = MenuNode("Prev", "", "PREV_PAGE")
prev_node.page = page - 1 prev_node.page = page - 1
children.append(prev_node) children.append(prev_node)
if page < total_pages - 1: if page < total_pages - 1:
next_node = MenuNode("下一頁 ▶", f"{page + 2}/{total_pages} ", "NEXT_PAGE") next_node = MenuNode("Next ▶", "", "NEXT_PAGE")
next_node.page = page + 1 next_node.page = page + 1
children.append(next_node) children.append(next_node)
@ -183,9 +193,10 @@ class ControlPanel:
# 這裡顯示基本資訊 # 這裡顯示基本資訊
dialog_win.addstr(2, 2, f"Socket ID : {socket_id}") dialog_win.addstr(2, 2, f"Socket ID : {socket_id}")
dialog_win.addstr(3, 2, f"Socket status : 運行中") # dialog_win.addstr(3, 2, f"Socket status : 運行中")
# show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', ''))) # show_str = ", ".join(map(str, state.socket_info_single.get('socket_type', '')))
dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}") dialog_win.addstr(4, 2, f"Socket Type : {state.socket_info_single.get('socket_type', '')}")
dialog_win.addstr(4, 30, f"{state.socket_info_single.get('socket_connection_string', '')}")
show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', ''))) show_str = ",".join(map(str, state.socket_info_single.get('bridge_msg_types', '')))
dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}") dialog_win.addstr(5, 2, f"Bridge Pack : {show_str if show_str else 'N/A'}")
show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', ''))) show_str = ",".join(map(str, state.socket_info_single.get('return_msg_types', '')))
@ -263,10 +274,110 @@ class ControlPanel:
stdscr.refresh() stdscr.refresh()
return None return None
# ================ 關於 serial link 的部份 ===================
def create_serial_port_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 serial port 列表選單(支持分頁)"""
children = []
# 獲取可用的 Serial 連接埠列表
# serial_ports = acquireSerial.get_serial_ports() # debug 全部抓一抓
serial_ports = acquireSerial.get_serial_ports_with_filter('/dev/ttyUSB*')
if not serial_ports:
children.append(MenuNode("(空)", "目前沒有串口設備", None))
else:
total_items = len(serial_ports)
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的串口
for port in serial_ports[start_idx:end_idx]:
port_menu = MenuNode(f"{port}", children=[
MenuNode("Set Comm Type", "設定通訊形態", "SET_SERIAL_COMM", children=[
MenuNode("XBee(API-AT)", "XBee 模式", "SET_SERIAL_COMM_XBEE"),
# MenuNode("Telemetry", "數傳模式", "SET_SERIAL_COMM_TELEMETRY"),
]),
MenuNode("Set Baud", "設定 Baud", "TEXT_BAUD_SERIAL"),
MenuNode("Create", "建立此串口", "CREATE_SERIAL_PORT"),
MenuNode("返回", "回到列表", "BACK"),
])
# 將 port 附加到每個子選單項目上
for child in port_menu.children:
child.port = port
children.append(port_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
if page > 0:
prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE")
prev_node.page = page - 1
children.append(prev_node)
if page < total_pages - 1:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
menu = MenuNode("Serial Port List", f"串口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
def create_linked_serial_menu(self, state: PanelState, page=0, items_per_page=8):
"""動態創建 已連線的 serial port 列表選單(支持分頁)並包含後續管理功能"""
children = []
if not state.linked_serial_dict:
children.append(MenuNode("(空)", "目前沒有連結口", None))
else:
total_items = len(state.linked_serial_dict)
total_pages = (total_items + items_per_page - 1) // items_per_page
start_idx = page * items_per_page
end_idx = min(start_idx + items_per_page, total_items)
# 顯示當前頁的物件
linked_serial_id_list = list(state.linked_serial_dict.keys())
for serial_id in linked_serial_id_list[start_idx:end_idx]:
# 為每個 socket 創建子選單
obj_menu = MenuNode(f"Serial #{serial_id}", f"連結口 {serial_id}", None, children=[
MenuNode("Info", "查看詳細資訊", "INSPECT_LINKED_SERIAL"),
MenuNode("Remove", "移除此連結口", "REMOVE_LINKED_SERIAL"),
# MenuNode("Change UDP Target", "變更目標 UDP (工程)", "CHANGE_LINKED_SERIAL_TARGET"),
MenuNode("返回", "回到列表", "BACK"),
])
# 將 serial_id 附加到每個子選單項目上
for child in obj_menu.children:
child.serial_id = serial_id
children.append(obj_menu)
# 添加分頁控制
if total_pages > 1:
children.append(MenuNode("---", f"{page+1}/{total_pages}", None))
if page > 0:
prev_node = MenuNode("◀ Prev", "上頁", "PREV_PAGE")
prev_node.page = page - 1
children.append(prev_node)
if page < total_pages - 1:
next_node = MenuNode("Next ▶", "下頁", "NEXT_PAGE")
next_node.page = page + 1
children.append(next_node)
children.append(MenuNode("返回", "回到上層選單", "BACK"))
menu = MenuNode("Linked Serial List", f"連結口列表 (第 {page + 1} 頁)", children=children)
menu.current_page = page
return menu
def show_linked_serial_info(self):
pass
# ================ 關於 主要選單 的部份 ===================
def menu_tree(self): def menu_tree(self):
"""建立多層選單結構""" """建立多層選單結構"""
return MenuNode("Main Menu", children=[ return MenuNode("Main Menu", children=[
MenuNode("MavLink Object", "控制 MavLink 物件", children=[ MenuNode("MavLink Object", "MavLink 通道選項", children=[
MenuNode("New+", children=[ MenuNode("New+", children=[
MenuNode("UDP InBound", children=[ MenuNode("UDP InBound", children=[
MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"), MenuNode("IP(Listen)", "設定監聽的 IP 位址", "TEXT_UDP_IP"),
@ -281,9 +392,14 @@ class ControlPanel:
]), ]),
MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"), MenuNode("ListAll", "顯示並管理所有連結口", "LIST_MAV_OBJECT"),
]), ]),
MenuNode("Serial Manager", "Serial 連接埠選項", children=[
MenuNode("New+", "新增 Serial 連接埠", action = "LIST_SERIAL_RES"),
MenuNode("ListAll", "顯示已連線的 Serial", action = "LIST_SERIAL_LINKS"),
]),
MenuNode("Engineer Mode", "工程模式", children=[ MenuNode("Engineer Mode", "工程模式", children=[
MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"), #TODO: 尚未實作 MenuNode("Stop Manager", "停止 Mavlink 物件管理", "STOP_MANAGER"),
MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"), #TODO: 尚未實作 MenuNode("Stop Bridge", "停止 Mavlink-ROS 橋接", "STOP_BRIDGE"),
MenuNode("Stop Serial M.", "停止 Serial 端口轉接", "STOP_SERIAL_MANAGER"),
]), ]),
MenuNode("Shutdown", "關閉整個系統", children=[ MenuNode("Shutdown", "關閉整個系統", children=[
MenuNode("Return", "繼續運行", "BACK"), MenuNode("Return", "繼續運行", "BACK"),
@ -302,11 +418,11 @@ class ControlPanel:
curses.echo() curses.echo()
curses.endwin() curses.endwin()
def panel_shutdown(): def pre_panel_shutdown():
# 先關閉所有模組 再關閉面板 # 先關閉所有模組 再關閉面板
cmd_q.put("SHUTDOWN_BRIDGE") cmd_q.put("SHUTDOWN_BRIDGE")
cmd_q.put("SHUTDOWN_MANAGER") cmd_q.put("SHUTDOWN_MANAGER")
cmd_q.put("SHUTDOWN_SERIAL_MANAGER")
def draw_menu(screen): def draw_menu(screen):
nonlocal stdscr nonlocal stdscr
@ -323,9 +439,6 @@ class ControlPanel:
state.intoSTART() # 設定狀態為運行中 state.intoSTART() # 設定狀態為運行中
while not stop_evt.is_set(): while not stop_evt.is_set():
# 檢查是否需要退出
if stop_evt.is_set():
break
current_menu = menu_stack[-1] current_menu = menu_stack[-1]
current_idx = idx_stack[-1] current_idx = idx_stack[-1]
@ -333,23 +446,26 @@ class ControlPanel:
# 獲取終端機尺寸 # 獲取終端機尺寸
height, width = stdscr.getmaxyx() height, width = stdscr.getmaxyx()
# 簡單暴力的限制視窗的大小 # 簡單暴力的限制視窗的大小
if height < 20 or width < 60: MIN_HEIGHT = (
2 + # 邊界
6 + # 狀態列 操作說明列 一個空白
11+ # 最大選單 與 空白區
5 # 訊息區域
)
if height < MIN_HEIGHT or width < 60:
logger.error("Terminal size too small for Control Panel.") logger.error("Terminal size too small for Control Panel.")
break break
stdscr.clear() stdscr.clear()
stdscr.border() stdscr.border()
# 更新模組狀態顯示
stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD) stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD)
stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}") stdscr.addstr(1, 2, f" Panel Status : {state.panel_status}")
stdscr.addstr(2, 2, f"mavlink Bridge State : {state.mavlink_bridge_state}") stdscr.addstr(2, 2, f"Object Manager State : {state.object_manager_state}")
stdscr.addstr(3, 2, f"Object Manager State : {state.object_manager_state}") stdscr.addstr(3, 2, f"Mavlink Bridge State : {state.mavlink_bridge_state}")
stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}") stdscr.addstr(4, 2, f"Socket Object number : {len(state.socket_object_list)}")
stdscr.addstr(2, 36, f"Serial Manager State : {state.serial_manager_state}")
# # 更新模組狀態顯示
# stdscr.addstr(2, 25, f"{state.mavlink_bridge_state}")
# stdscr.addstr(3, 25, f"{state.object_manager_state}")
# stdscr.addstr(4, 25, f"{len(state.socket_object_list)} ")
# 顯示當前選單項目 # 顯示當前選單項目
start_line = 6 start_line = 6
@ -361,6 +477,10 @@ class ControlPanel:
desc = f"{child.desc} [{state.udp_info_temp['IP']}]" desc = f"{child.desc} [{state.udp_info_temp['IP']}]"
elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]: elif child.action == "TEXT_UDP_PORT" and state.udp_info_temp["Port"]:
desc = f"{child.desc} [{state.udp_info_temp['Port']}]" desc = f"{child.desc} [{state.udp_info_temp['Port']}]"
elif child.action == "SET_SERIAL_COMM" and state.serial_info_temp["CommunicationType"]:
desc = f"{child.desc} [{state.serial_info_temp['CommunicationType']}]"
elif child.action == "TEXT_BAUD_SERIAL" and state.serial_info_temp["Baud"]:
desc = f"{child.desc} [{state.serial_info_temp['Baud']}]"
line = f"{marker}{child.name:15s} {desc}" line = f"{marker}{child.name:15s} {desc}"
attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL
@ -397,30 +517,34 @@ class ControlPanel:
# 操作說明 # 操作說明
# help_line = start_line + len(current_menu.children) + 2 # help_line = start_line + len(current_menu.children) + 2
help_line = height - 2 help_line = height - 2
stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層 q退出", curses.A_DIM) stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 →進入下層", curses.A_DIM)
stdscr.refresh() stdscr.refresh()
# 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作 # 若進入 TERMINATION 狀態,畫面可以刷新 但是不能操作
# 驗證 bridge 跟 manager 狀態 兩者都停止後 就進入 STOPPED 狀態並跳出迴圈 # 驗證 其他附屬模組的狀態都停止後 就進入 STOPPED 狀態並跳出迴圈
# 超過幾秒沒有反應就強制關閉 # 超過幾秒沒有反應就強制關閉
if state.panel_status == "Terminating": if state.panel_status == "Terminating":
if time.time() - state.termination_start_time > 3: if time.time() - state.termination_start_time > 7: # 其他組件設定5秒 這邊給多一點
logger.warning("Control Panel forced shutdown after timeout.") logger.warning("Control Panel forced shutdown after timeout.")
state.intoSTOPPED() state.intoSTOPPED()
stop_evt.set() # stop_evt.set()
continue # continue
break
time.sleep(0.1) time.sleep(0.1)
if state.mavlink_bridge_state == "Stopped" and state.object_manager_state == "Stopped": if (state.mavlink_bridge_state == "Stopped" and
state.object_manager_state == "Stopped" and
state.serial_manager_state == "Stopped"):
state.intoSTOPPED() state.intoSTOPPED()
stop_evt.set() # stop_evt.set()
break
continue continue
# 設定短暫的 timeout讓執行緒能夠響應 stop_evt # 設定短暫的 timeout讓執行緒能夠響應 stop_evt
stdscr.timeout(100) # 100ms timeout stdscr.timeout(100)
ch = stdscr.getch() ch = stdscr.getch()
if ch == -1: # timeout繼續檢查 stop_evt if ch == -1: # 沒有操作
continue continue
# 處理按鍵 # 處理按鍵
@ -431,9 +555,13 @@ class ControlPanel:
idx_stack[-1] = (current_idx + 1) % len(current_menu.children) idx_stack[-1] = (current_idx + 1) % len(current_menu.children)
elif ch == (ord('O')): elif ch == (ord('O')):
# 直接進入工程模式 # 進入工程模式
state.intoENGINEER() state.intoENGINEER()
elif ch == (ord('o')):
# 離開工程模式
state.intoSTART()
elif ch == curses.KEY_LEFT: elif ch == curses.KEY_LEFT:
# 返回上層 # 返回上層
if len(menu_stack) > 1: if len(menu_stack) > 1:
@ -448,8 +576,9 @@ class ControlPanel:
idx_stack.append(0) idx_stack.append(0)
elif ch in (ord('q'), 27): elif ch in (ord('q'), 27):
if state.panel_status == "Engineer":
state.intoTERMINATION() state.intoTERMINATION()
panel_shutdown() pre_panel_shutdown()
elif ch in (curses.KEY_ENTER, 10, 13): elif ch in (curses.KEY_ENTER, 10, 13):
selected = current_menu.children[current_idx] selected = current_menu.children[current_idx]
@ -466,7 +595,7 @@ class ControlPanel:
elif selected.action == "QUIT": elif selected.action == "QUIT":
state.intoTERMINATION() state.intoTERMINATION()
panel_shutdown() pre_panel_shutdown()
elif selected.action == "TEXT_UDP_IP": elif selected.action == "TEXT_UDP_IP":
result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ") result = ControlPanel.input_dialog(stdscr, "請輸入監聽的 IP 位址: ")
@ -484,40 +613,82 @@ class ControlPanel:
if len(menu_stack) > 1: if len(menu_stack) > 1:
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
menu_stack.pop() # menu_stack.pop()
idx_stack.pop() # idx_stack.pop()
elif selected.action == "CREATE_UDP_OUTBOUND": elif selected.action == "CREATE_UDP_OUTBOUND":
cmd_q.put("CREATE_UDP_OUTBOUND") cmd_q.put("CREATE_UDP_OUTBOUND")
# 確認後回到上兩層 # 確認後回到上兩層
if len(menu_stack) > 1:
menu_stack.pop()
idx_stack.pop()
# menu_stack.pop()
# idx_stack.pop()
elif selected.action == "TEXT_BAUD_SERIAL":
result = ControlPanel.input_dialog(stdscr, "請輸入 Baud Rate (e.g., 9600, 115200): ")
if result is not None:
try:
baud_rate = int(result)
except ValueError:
state.panel_info_msg_list.append(("Invalid Baud Rate input.", time.time()))
state.serial_info_temp["Baud"] = baud_rate
elif selected.action == "SET_SERIAL_COMM_XBEE":
state.serial_info_temp["CommunicationType"] = "XBee(API-AT)"
menu_stack.pop()
idx_stack.pop()
elif selected.action == "SET_SERIAL_COMM_TELEMETRY":
state.serial_info_temp["CommunicationType"] = "XBee(AT-AT)"
menu_stack.pop()
idx_stack.pop()
elif selected.action == "CREATE_SERIAL_PORT":
state.serial_info_temp["Port"] = menu_stack[-1].name # 從選單取得 Port 名稱
cmd_q.put("CREATE_SERIAL_PORT")
# 確認後回到上兩層
if len(menu_stack) > 1: if len(menu_stack) > 1:
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
elif selected.action == "LIST_SERIAL_RES":
created_list_menu = self.create_serial_port_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "LIST_SERIAL_LINKS":
created_list_menu = self.create_linked_serial_menu(state, page=0)
menu_stack.append(created_list_menu)
idx_stack.append(0)
elif selected.action == "LIST_MAV_OBJECT": elif selected.action == "LIST_MAV_OBJECT":
# 動態生成 mavlink_object 列表選單 # 動態生成 mavlink_object 列表選單
object_list_menu = self.create_object_list_menu(state, page=0) created_list_menu = self.create_object_list_menu(state, page=0)
menu_stack.append(object_list_menu) menu_stack.append(created_list_menu)
idx_stack.append(0) idx_stack.append(0)
elif selected.action == "PREV_PAGE": elif selected.action in ("PREV_PAGE", "NEXT_PAGE"):
# 上一頁
if hasattr(selected, 'page'): if hasattr(selected, 'page'):
current_list_menu = menu_stack[-1]
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu) # 依據選單種類 重新建立分頁
if current_list_menu.name == "Serial Port List":
created_list_menu = self.create_serial_port_menu(state, page=selected.page)
elif current_list_menu.name == "Object List":
created_list_menu = self.create_object_list_menu(state, page=selected.page)
elif current_list_menu.name == "Linked Serial List":
created_list_menu = self.create_linked_serial_menu(state, page=selected.page)
else:
# 不支援的選單類型,回到原本的選單
menu_stack.append(current_list_menu)
idx_stack.append(0) idx_stack.append(0)
continue
elif selected.action == "NEXT_PAGE": menu_stack.append(created_list_menu)
# 下一頁
if hasattr(selected, 'page'):
menu_stack.pop()
idx_stack.pop()
object_list_menu = self.create_object_list_menu(state, page=selected.page)
menu_stack.append(object_list_menu)
idx_stack.append(0) idx_stack.append(0)
elif selected.action == "INSPECT_MAV_OBJECT": elif selected.action == "INSPECT_MAV_OBJECT":
@ -537,22 +708,23 @@ class ControlPanel:
# 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好 # 反正刷新列表會出錯 乾脆再退一層 在下一次進入列表時刷新就好
menu_stack.pop() menu_stack.pop()
idx_stack.pop() idx_stack.pop()
# # 刷新列表頁面
# if len(menu_stack) > 1:
# current_page = menu_stack[-1].current_page if hasattr(menu_stack[-1], 'current_page') else 0
# menu_stack.pop()
# idx_stack.pop()
# time.sleep(0.1) # 等待物件被移除
# object_list_menu = self.create_object_list_menu(state, page=current_page)
# menu_stack.append(object_list_menu)
# idx_stack.append(0)
elif selected.action == "MAVOBJ_MAKE_LINK": elif selected.action == "MAVOBJ_MAKE_LINK":
# 建立轉發連結 # 建立轉發連結
if hasattr(selected, 'socket_id'): if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state) target_id = self.select_target_socket(stdscr, selected.socket_id, state)
if target_id is not None: if target_id is not None:
cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id)) # cmd_q.put(("MAVOBJ_MAKE_LINK", selected.socket_id, target_id))
cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id))
cmd_q.put(("MAVOBJ_ADD_TARGET", target_id, selected.socket_id)) # 雙向連結
elif selected.action == "MAVOBJ_CANCEL_LINK":
# 取消轉發連結
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True)
if target_id is not None:
cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id))
cmd_q.put(("MAVOBJ_REMOVE_TARGET", target_id, selected.socket_id)) # 雙向取消連結
elif selected.action == "MAVOBJ_ADD_TARGET": elif selected.action == "MAVOBJ_ADD_TARGET":
# 添加目標端口 # 添加目標端口
@ -564,16 +736,6 @@ class ControlPanel:
if target_id is not None: if target_id is not None:
cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id)) cmd_q.put(("MAVOBJ_ADD_TARGET", selected.socket_id, target_id))
elif selected.action == "MAVOBJ_REMOVE_TARGET":
# 移除目標端口
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
if hasattr(selected, 'socket_id'):
target_id = self.select_target_socket(stdscr, selected.socket_id, state, remove_mode=True)
if target_id is not None:
cmd_q.put(("MAVOBJ_REMOVE_TARGET", selected.socket_id, target_id))
elif selected.action == "STOP_MANAGER": elif selected.action == "STOP_MANAGER":
if state.panel_status != "Engineer": if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
@ -585,6 +747,13 @@ class ControlPanel:
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time())) state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作 continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_BRIDGE") cmd_q.put("SHUTDOWN_BRIDGE")
elif selected.action == "STOP_SERIAL_MANAGER":
if state.panel_status != "Engineer":
state.panel_info_msg_list.append(("Not in Engineer Mode.", time.time()))
continue # 只有在工程模式下才能操作
cmd_q.put("SHUTDOWN_SERIAL_MANAGER")
elif callable(selected.action): elif callable(selected.action):
# 執行函式 # 執行函式
cmd_q.put(selected.action) cmd_q.put(selected.action)
@ -594,14 +763,13 @@ class ControlPanel:
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:
stop_evt.set()
cleanup() cleanup()
class Orchestrator: class Orchestrator:
def __init__(self, stop_sig): def __init__(self, stop_sig):
self.stop_evt = stop_sig self.stop_evt = stop_sig # 外部操作去中斷 "面板" 執行緒的訊號 (內部自己停止的話不需要用這個)
# === 1) 面板部分的準備 === # === 1) 面板部分的準備 ===
self.cmd_q = queue.Queue() self.cmd_q = queue.Queue()
@ -615,6 +783,9 @@ class Orchestrator:
self.manager = mo.async_io_manager() self.manager = mo.async_io_manager()
self.bridge = mo.mavlink_bridge() self.bridge = mo.mavlink_bridge()
# === 3) serial_manager 部分的準備 ===
self.plumber = sm.serial_manager()
def engageWholeSystem(self): def engageWholeSystem(self):
"""啟動整個系統""" """啟動整個系統"""
# === 1) 面板部分的啟動 === # === 1) 面板部分的啟動 ===
@ -625,10 +796,14 @@ class Orchestrator:
self.manager.start() self.manager.start()
self.bridge.start() self.bridge.start()
# === 3) serial_manager 部分的啟動 ===
self.plumber.start()
def mainLoop(self): def mainLoop(self):
logger.info("Main orchestrator started <-") logger.info("Main orchestrator started <-")
try: try:
while not self.stop_evt.is_set(): # while not self.stop_evt.is_set():
while self.panel_thread.is_alive():
# A. 更新模組狀態 # A. 更新模組狀態
if self.manager.running: if self.manager.running:
@ -644,6 +819,14 @@ class Orchestrator:
else: else:
self.panelState.mavlink_bridge_state = 'Stopped' self.panelState.mavlink_bridge_state = 'Stopped'
if self.plumber.running:
self.panelState.serial_manager_state = 'Running'
else:
self.panelState.serial_manager_state = 'Stopped'
linked_serial_dict = self.plumber.get_serial_link()
self.panelState.linked_serial_dict = linked_serial_dict
# 取出面板丟過來的「動作」 # 取出面板丟過來的「動作」
try: try:
cmd = self.cmd_q.get_nowait() cmd = self.cmd_q.get_nowait()
@ -661,10 +844,6 @@ class Orchestrator:
self.remove_target_from_object(s_id, socket_id) self.remove_target_from_object(s_id, socket_id)
# 再移除該物件 # 再移除該物件
self.delete_mavlink_object(socket_id) self.delete_mavlink_object(socket_id)
elif action == "MAVOBJ_MAKE_LINK":
source_id, target_id = cmd[1], cmd[2]
self.add_target_to_object(source_id, target_id)
self.add_target_to_object(target_id, source_id) # 雙向連結
elif action == "MAVOBJ_ADD_TARGET": elif action == "MAVOBJ_ADD_TARGET":
source_id, target_id = cmd[1], cmd[2] source_id, target_id = cmd[1], cmd[2]
self.add_target_to_object(source_id, target_id) self.add_target_to_object(source_id, target_id)
@ -681,6 +860,9 @@ class Orchestrator:
self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types self.panelState.socket_info_single["return_msg_types"] = mav_obj.return_msg_types
self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id self.panelState.socket_info_single["primary_socket_id"] = mav_obj.primary_socket_id
self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets self.panelState.socket_info_single["target_sockets"] = mav_obj.target_sockets
ip_info = mav_obj.mavlink_socket.port.getsockname()
self.panelState.socket_info_single["socket_connection_string"] = f"{ip_info[0]}:{ip_info[1]}"
# getattr(mav_obj.mavlink_socket, "connection_string", "")
self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好 self.panelState.socket_info_single["InfoReady"] = True # 標記資訊已準備好
elif cmd == "CREATE_UDP_INBOUND": elif cmd == "CREATE_UDP_INBOUND":
@ -689,10 +871,14 @@ class Orchestrator:
elif cmd == "CREATE_UDP_OUTBOUND": elif cmd == "CREATE_UDP_OUTBOUND":
self.panelState.udp_info_temp["direction"] = "outbound" self.panelState.udp_info_temp["direction"] = "outbound"
self.create_udp_object() self.create_udp_object()
elif cmd == "CREATE_SERIAL_PORT":
self.create_serial_port_object()
elif cmd == "SHUTDOWN_BRIDGE": elif cmd == "SHUTDOWN_BRIDGE":
self.bridge.stop() self.bridge.stop()
elif cmd == "SHUTDOWN_MANAGER": elif cmd == "SHUTDOWN_MANAGER":
self.manager.shutdown() self.manager.shutdown()
elif cmd == "SHUTDOWN_SERIAL_MANAGER":
self.plumber.shutdown()
except queue.Empty: except queue.Empty:
pass pass
except Exception as e: except Exception as e:
@ -706,18 +892,32 @@ class Orchestrator:
except Exception as e: except Exception as e:
logger.error(f"Unexpected error in main loop: {e}") logger.error(f"Unexpected error in main loop: {e}")
finally: finally:
logger.info("Main orchestrator END!")
# 關閉 mavlink_bridge (裡面有一個執行緒) # 驗證並確保所有模組都被下達關閉訊號
# 若是由面板操作結束系統 這些關閉行為將於 ControlPanel.pre_panel_shutdown() 觸發
if self.bridge.thread.is_alive():
if self.bridge.running:
self.bridge.stop() self.bridge.stop()
self.bridge.thread.join(timeout=2)
# 關閉 async_io_manager (裡面有一個執行緒) if self.manager.thread.is_alive():
if self.manager.running:
self.manager.shutdown() self.manager.shutdown()
self.manager.thread.join(timeout=2)
if self.plumber.thread.is_alive():
if self.plumber.running:
self.plumber.shutdown()
self.plumber.thread.join(timeout=2)
# 關閉面板執行緒 # 關閉面板執行緒
if self.panel_thread.is_alive(): if self.panel_thread.is_alive():
self.panel_thread.join(timeout=2) self.panel_thread.join(timeout=2)
logger.info("Main orchestrator END!")
# =============== 面板動作 - Mavlink Object ===============
def create_udp_object(self): def create_udp_object(self):
if self.panelState.udp_info_temp["direction"] == "inbound": if self.panelState.udp_info_temp["direction"] == "inbound":
connection_string = f"udp:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}" connection_string = f"udp:{self.panelState.udp_info_temp['IP']}:{self.panelState.udp_info_temp['Port']}"
@ -764,6 +964,46 @@ class Orchestrator:
else: else:
self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time())) self.panelState.panel_info_msg_list.append((f"Fail Removing target {target_id} from socket {source_id}", time.time()))
# =============== 面板動作 - Serial Manager ===============
def create_serial_port_object(self):
# 獲取可用的 udp port
udp_port_tmp = find_available_port(19000, 20000)
# 定義通訊類型映射表
COMM_TYPE_MAP = {
"XBee(API-AT)": sm.SerialReceiverType.XBEEAPI2AT,
# "XBee(AT-AT)": sm.SerialReceiverType.TELEMETRY, # TODO: 之後再弄
# 新增區
}
# 驗證輸入
comm_type = self.panelState.serial_info_temp['CommunicationType']
if not comm_type:
self.panelState.panel_info_msg_list.append(
("Please select Communication Type first.", time.time())
)
return
# 查找對應的通訊類型
comm_type_tmp = COMM_TYPE_MAP.get(comm_type)
if comm_type_tmp is None:
self.panelState.panel_info_msg_list.append(
(f"Communication type '{comm_type}' not supported yet.", time.time())
)
return
ret = self.plumber.create_serial_link(
serial_port=self.panelState.serial_info_temp['Port'],
baudrate=self.panelState.serial_info_temp['Baud'],
target_port=udp_port_tmp,
receiver_type=comm_type_tmp,
)
if not ret:
self.panelState.panel_info_msg_list.append((f"Failed to create Serial Port object at {self.panelState.serial_info_temp['Port']}.", time.time()))
return
def main(): def main():
stop_evt = threading.Event() stop_evt = threading.Event()

@ -1,40 +1,35 @@
''' '''
# 不同的匯流排只有單一種通訊協定 這個檔案是對 udp 進來的 mavlink 訊息做 "分流" "轉拋" 的地方 (並不會做 "分析")
# 匯流排接到訊息後透過 ring buffer 傳送到橋接器 概念上 把每個 udp 接口 視為一個獨立的 mavlink bus 針對 bus 來統籌管理
# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器
# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排 主要的重點部分:
# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面 1. stream_bridge_ring & return_packet_ring
# 這邊的 main 會是用來初始化 mavlink_object 並啟動他的 run function 2. mavlink_object & async_io_manager
3. mavlink_bridge
================= 改版記錄 ============================
2025 6 20 stream_bridge_ring & return_packet_ring:
1. mavlink_object 由於 Queue 的效能太差 會完全移除 這兩個 ring buffer 是用來做 mavlink 訊息的分流
其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 stream_bridge_ring 這邊的資訊比較是給會固定更新的資訊 (例如 HEARTBEAT, ATTITUDE 之類的)
multiplexingToSwap 會完全被移除代替方式下一條描述 return_packet_ring 比較是處理指令後的回應封包 (例如 PARAM_VALUE, MISSION_ITEM 等等)
2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流
(註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) mavlink_object:
因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 每個 mavlink bus 都會有一個 mavlink_object
並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 使用 asyncio 處理資料流 RingBuffer 來分配訊息
3. mavlink_object 需要加上 state 去管理其狀態 內容中沒有獨立的執行緒 只有一個個 asyncio function 會被放到 async_io_manager 裡面執行
4. mavlink_object 需要加上 target port 去管理寫出的目標
5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 關於分流與轉拋的具體實現是在 process_data 這個 asyncio function 裡面
6. 基於第1,2 updateMultiplexingList 會被完全移除
7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object async_io_manager:
8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring return_packet_ring 來取代 另外 swap_queues 會被完全移除 首先它紀律並管理所有 mavlink_object 實例
有自己一個獨立的執行緒 執行 asyncio loop (mavlink_object 裡面的 asyncio function 都會被放到這個 loop 裡面執行)
mavlink_bridge:
專門處理 stream_bridge_ring 裡面的訊息流
會把訊息流解開後 存放到 mavlinkVehicleView.py 定義的載具結構視圖
2025 11 15
1. mavlink_bridge 類別新增為 singleton 模式 確保全系統只有一個實例在運行
2. mavlink_bridge 處理封包改為映射表 (需要在 _init_message_handlers 中新增處理器函式)
3. mavlink_bridge 的主要迴圈 增加 send_message 功能 可指定目標 sysid socket_id 發送 mavlink 封包
4. async_io_manager 循環邏輯大改動 優化 mavlink_object 加入與移除的邏輯 並使得 task evenlt loop 分層更清楚
5. mavlink_object 移除不必要的 start stop 方法 async_io_manager 統一管理其生命週期
6. mavlink_object 優化 send_message 方法 避免無效判斷 增加一些防呆檢驗 並與 mavlink_bridge 連動工作
7. 移除迴圈內的 try except 堆疊 增加效能
8. 移除對於 mavlinkDevice 的依賴 改用 vehicle_registry 來管理所有的載具
''' '''
@ -61,7 +56,6 @@ from .mavlinkVehicleView import (
) )
from .utils import RingBuffer, setup_logger from .utils import RingBuffer, setup_logger
# ====================== 分割線 ===================== # ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__)) logger = setup_logger(os.path.basename(__file__))
@ -134,7 +128,7 @@ class mavlink_bridge:
"""停止 mavlink_bridge 的運作""" """停止 mavlink_bridge 的運作"""
self.running = False self.running = False
if self.thread and self.thread.is_alive(): if self.thread and self.thread.is_alive():
self.thread.join(timeout=3.0) self.thread.join(timeout=5.0)
# === Thread 區塊 === # === Thread 區塊 ===
def _run_thread(self): def _run_thread(self):
@ -331,8 +325,6 @@ class mavlink_bridge:
mav_obj = mavlink_object.mavlinkObjects[socket_id] mav_obj = mavlink_object.mavlinkObjects[socket_id]
return mav_obj.send_message(message_bytes) return mav_obj.send_message(message_bytes)
# ====================== 分割線 =====================
# 定義 mavlink_object 的狀態 # 定義 mavlink_object 的狀態
class MavlinkObjectState(Enum): class MavlinkObjectState(Enum):
INIT = auto() # 初始化狀態 INIT = auto() # 初始化狀態
@ -560,6 +552,7 @@ class async_io_manager:
start 方法 會先做一個新的執行緒 然後讓新的執行緒 透過 _run_event_loop 方法來建立一個空的事件循環 self.loop start 方法 會先做一個新的執行緒 然後讓新的執行緒 透過 _run_event_loop 方法來建立一個空的事件循環 self.loop
然後在 _run_event_loop 方法中 會建立一個異步任務 _main_task 來監控和管理所有的 mavlink_object 任務 然後在 _run_event_loop 方法中 會建立一個異步任務 _main_task 來監控和管理所有的 mavlink_object 任務
""" """
_instance = None _instance = None
_lock = threading.Lock() _lock = threading.Lock()
@ -639,13 +632,11 @@ class async_io_manager:
# 等待線程結束 # 等待線程結束
if self.thread and self.thread.is_alive(): if self.thread and self.thread.is_alive():
self.thread.join(timeout=10.0) self.thread.join(timeout=5.0)
if self.thread.is_alive(): if self.thread.is_alive():
logger.warning("async_io_manager thread did not stop gracefully") logger.warning("async_io_manager thread did not stop gracefully")
os.kill(os.getpid(), signal.SIGTERM) # 強制終止程序 os.kill(os.getpid(), signal.SIGTERM) # 強制終止程序
logger.info("async_io_manager thread END!") logger.info("async_io_manager thread END!")
def _run_event_loop(self): def _run_event_loop(self):
@ -789,3 +780,36 @@ class async_io_manager:
if __name__ == '__main__': if __name__ == '__main__':
pass pass
'''
================= 改版記錄 ============================
2025 6 20
1. mavlink_object 由於 Queue 的效能太差 會完全移除
其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現
multiplexingToSwap 會完全被移除代替方式下一條描述
2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流
(註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object)
因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移
並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出
3. mavlink_object 需要加上 state 去管理其狀態
4. mavlink_object 需要加上 target port 去管理寫出的目標
5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道
6. 基於第1,2 updateMultiplexingList 會被完全移除
7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object
8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring return_packet_ring 來取代 另外 swap_queues 會被完全移除
2025 11 15
1. mavlink_bridge 類別新增為 singleton 模式 確保全系統只有一個實例在運行
2. mavlink_bridge 處理封包改為映射表 (需要在 _init_message_handlers 中新增處理器函式)
3. mavlink_bridge 的主要迴圈 增加 send_message 功能 可指定目標 sysid socket_id 發送 mavlink 封包
4. async_io_manager 循環邏輯大改動 優化 mavlink_object 加入與移除的邏輯 並使得 task evenlt loop 分層更清楚
5. mavlink_object 移除不必要的 start stop 方法 async_io_manager 統一管理其生命週期
6. mavlink_object 優化 send_message 方法 避免無效判斷 增加一些防呆檢驗 並與 mavlink_bridge 連動工作
7. 移除迴圈內的 try except 堆疊 增加效能
8. 移除對於 mavlinkDevice 的依賴 改用 vehicle_registry 來管理所有的載具
'''

@ -0,0 +1,631 @@
'''
'''
# 基礎功能的 import
import asyncio
import serial_asyncio
import os
import sys
import serial
import signal
import time
import threading
import struct
from enum import Enum, auto
# # XBee 模組
# from xbee.frame import APIFrame
# 自定義的 import
from .utils import setup_logger
# ====================== 分割線 =====================
logger = setup_logger(os.path.basename(__file__))
# ====================== 分割線 =====================
class XBeeFrameHandler:
"""XBee API Frame 處理器"""
@staticmethod
def parse_at_command_response(frame: bytes) -> dict:
"""解析 AT Command Response (0x88)"""
if len(frame) < 8:
return None
frame_type = frame[3]
if frame_type != 0x88:
return None
frame_id = frame[4]
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00
}
@staticmethod
def parse_receive_packet(frame: bytes) -> dict:
# """解析 RX Packet (0x90) - 未來擴展用"""
# if len(frame) < 15 or frame[3] != 0x90:
# return None
# return {
# 'source_addr': frame[4:12],
# 'reserved': frame[12:14],
# 'options': frame[14],
# 'data': frame[15:-1]
# }
pass
return None
@staticmethod
def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes:
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
frame_type = 0x10
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
@staticmethod
def decapsulate_data(data: bytes):
# 這裡可以根據需要進行數據解封裝
# XBee API 幀格式:
# 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節)
# 檢查幀起始符 (0x7E)
if not data or len(data) < 5 or data[0] != 0x7E:
return data
# 獲取數據長度 (不包括校驗和)
# length = (data[1] << 8) + data[2]
length = (data[1] << 8) | data[2]
# 檢查幀完整性
if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和
return data
# 提取API標識符和數據
frame_type = data[3]
# frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中
# 根據不同的幀類型進行處理
if frame_type == 0x90: # 例如,這是"接收數據包"類型
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
return data
class ATCommandHandler:
"""AT 指令回應處理器"""
def __init__(self, serial_port: str):
self.serial_port = serial_port
self.handlers = {
b'DB': self._handle_rssi,
b'SH': self._handle_serial_high,
b'SL': self._handle_serial_low,
# 可擴展其他 AT 指令
}
def handle_response(self, response: dict):
"""根據 AT 指令類型分派處理"""
if not response or not response['is_ok']:
if response:
print(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}")
return
command = response['command']
handler = self.handlers.get(command)
if handler:
handler(response['data'])
else:
print(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}")
def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應"""
if not data:
return
rssi_value = data[0]
now = time.time()
# 檢查是否最近有收到 MAVLink
last_mavlink_time = serial_last_mavlink_time.get(self.serial_port, 0)
if now - last_mavlink_time > 0.5:
print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
return
# 取得對應的 sysid
sysid = serial_to_sysid.get(self.serial_port)
if sysid is None:
print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
return
# 記錄 RSSI
rssi_history[sysid].append(-rssi_value)
time_history[sysid].append(now)
# print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
def _handle_serial_high(self, data: bytes):
# """處理 SH (Serial Number High) - 範例"""
# if len(data) >= 4:
# serial_high = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial High: 0x{serial_high:08X}")
pass
def _handle_serial_low(self, data: bytes):
# """處理 SL (Serial Number Low) - 範例"""
# if len(data) >= 4:
# serial_low = int.from_bytes(data[:4], 'big')
# print(f"[{self.serial_port}] Serial Low: 0x{serial_low:08X}")
pass
class SerialHandler(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發
def __init__(self, udp_handler, serial_port_str):
self.udp_handler = udp_handler # UDP 的傳輸把手
self.serial_port_str = serial_port_str
self.at_handler = ATCommandHandler(serial_port_str)
self.buffer = bytearray() # 用於緩存接收到的資料
self.transport = None # Serial 自己的傳輸物件
# self.first_data = True # 標記是否為第一次收到資料
# self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_handler, 'set_serial_handler'):
self.udp_handler.set_serial_handler(self)
# logger.info(f"Serial port {self.serial_port_str} connected.") # debug
# Serial 收到資料的處理過程
def data_received(self, data):
# 1. 把收到的資料加入緩衝區
self.buffer.extend(data)
# 2. 需要完整的 header 才能解析
while len(self.buffer) >= 3:
# 3. 瞄準 XBee API Frame (0x7E 開頭的封包)
if self.buffer[0] != 0x7E:
self.buffer.pop(0) # 如果不是就丟掉
continue
# 4. 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
# 5. 等待完整封包
if len(self.buffer) < full_length:
break
# 6. 提取完整 frame 並從緩衝區移除
an_frame = self.buffer[:full_length]
del self.buffer[:full_length]
# 7. 判斷 frame 類型
frame_type = an_frame[3]
if frame_type == 0x88:
# 處理 AT Command 回應
# response = XBeeFrameHandler.parse_at_command_response(an_frame)
# self.at_handler.handle_response(response)
pass # debug
elif frame_type == 0x90:
# Receive Packet (RX) payload 先解碼
processed_data = XBeeFrameHandler.decapsulate_data(bytes(an_frame))
# 轉換失敗就捨棄了
if processed_data is None:
continue
# 再透過 UDP 送出
self.udp_handler.transport.sendto(processed_data, (self.udp_handler.LOCAL_HOST_IP, self.udp_handler.target_port))
elif frame_type == 0x8B:
pass
else:
# 其他類型的 frame 未來可擴展處理 現在忽略
logger.warning(f"[{self.serial_port_str}] Undefined frame type: 0x{frame_type:02X}")
# # RSSI
# if frame[3] == 0x88 and frame[5:7] == b'DB': # frame[3] == 0x88 AT -> API 封包
# # frame[5:7] == b'DB' -> API 封包的DB參數
# status = frame[7] #
# if status == 0x00 and len(frame) > 8: # status == 0x00 -> 這個封包是有效封包
# rssi_value = frame[8]
# now = time.time()
# # === 優化 1僅信任最近 0.5 秒內有接收 MAVLink 的 port
# last_time = serial_last_mavlink_time.get(self.serial_port, 0)
# if now - last_time <= 0.5:
# sysid = serial_to_sysid.get(self.serial_port, None)
# if sysid is not None:
# rssi_history[sysid].append(-rssi_value)
# time_history[sysid].append(now)
# # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
# else:
# print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
# else:
# print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
# else:
# print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}")
# def write_to_serial(self, data):
# # 在資料透過 Serial 發送之前進行處理
# processed_data = self.encapsulate_data(data)
# # 處理失敗就不發送
# if processed_data not None:
# self.transport.write(processed_data)
class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發
LOCAL_HOST_IP = '127.0.0.1' # 只送給本地端IP
def __init__(self, target_port):
self.target_port = target_port # 目標 UDP 端口
self.serial_handler = None # Serial 的傳輸物件
self.transport = None # UDP 自己的傳輸物件
self.remote_addr = None # 儲存動態獲取的遠程地址 # debug
# self.has_processed = False # 測試模式用 處理數據旗標 # debug
def connection_made(self, transport):
self.transport = transport
# logger.info(f"UDP transport ready. Waiting for serial data before sending.") # debug
def set_serial_handler(self, serial_handler):
self.serial_handler = serial_handler
# UDP 收到資料的處理過程
def datagram_received(self, data, addr):
# 儲存對方的地址(這樣就能向同一個來源回傳數據)
# self.remote_addr = addr
# print(f"Received UDP data from {addr}, setting as remote address")
processed_data = XBeeFrameHandler.encapsulate_data(data)
if self.serial_handler:
self.serial_handler.transport.write(processed_data)
# def send_udp(self, data):
# # 藉由 UDP 發送資料出去
# # 在透過 UDP 發送數據之前進行解封裝
# decoded_data = self.decapsulate_data(data)
# if decoded_data is None:
# return
# if self.transport:
# self.transport.sendto(decoded_data, (self.LOCAL_HOST_IP, self.target_port))
#==================================================================
class SerialReceiverType(Enum):
"""連接類型"""
TELEMETRY = auto()
XBEEAPI2AT = auto()
OTHER = auto()
class serial_manager:
class serial_object:
def __init__(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType):
self.serial_port = serial_port # /dev/ttyUSB or COM3 ...etc
self.baudrate = baudrate
self.receiver_type = receiver_type
self.target_port = target_port # 指向的 UPD 端口
self.transport = None
self.protocol = None
self.udp_handler = None
self.serial_handler = None
def __init__(self):
self.thread = None
self.loop = None
self.running = False
self.serial_count = 0
self.serial_objects = {} # serial id num : serial object
def __del__(self):
self.loop = None
self.thread = None
def start(self):
if self.running:
logger.warning("serial_manager already running")
return
self.running = True
# 啟動獨立線程 命名為 SerialManager
self.thread = threading.Thread(
target=self._run_event_loop,
name="SerialManager"
)
self.thread.daemon = False # 不設為 daemon確保正確關閉
self.thread.start()
# 等待 _run_event_loop 建立事件循環的物件 self.loop
start_timeout = 2.0
start_time = time.time()
while not self.loop and time.time() - start_time < start_timeout:
time.sleep(0.1)
# 檢查另一個執行緒有沒有成功建立事件循環物件 self.loop
if self.loop:
logger.info("serial_manager thread started <-")
return True
else:
logger.error("serial_manager failed to start")
return False
def shutdown(self):
"""停止 serial_manager 和其管理的所有 serial_object"""
# 自己在 running 狀態下才執行停止程序
if not self.running:
logger.warning("serial_manager is not running")
return
# 停止所有被管理的 serial_object
for serial_id in list(self.serial_objects.keys()):
self.remove_serial_link(serial_id)
# 停止自己
self.running = False
# 解開事件循環的阻塞
if self.loop:
self.loop.call_soon_threadsafe(self.loop.stop)
# 等待線程結束
if self.thread and self.thread.is_alive():
self.thread.join(timeout=5.0)
if self.thread.is_alive():
logger.warning("serial_manager thread did not stop gracefully")
logger.info("serial_manager thread END!")
def _run_event_loop(self):
"""在獨立線程中運行 asyncio 事件循環"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
# # 為每個 serial_object 建立連接
# for serial_obj in self.serial_objects:
# coro = serial_asyncio.create_serial_connection(
# self.loop,
# lambda: SerialProtocol(serial_obj.receiver_type),
# serial_obj.serial_port,
# baudrate=serial_obj.baudrate
# )
# transport, protocol = self.loop.run_until_complete(coro)
# serial_obj.transport = transport
# serial_obj.protocol = protocol
try:
self.loop.run_forever()
finally:
self.loop.close()
def create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType):
if not self.running or not self.loop:
logger.error("Event loop not running, cannot create serial link")
return False
# 檢查 serial port 有效
if not self.check_serial_port(serial_port, baudrate):
logger.error(f"Serial port {serial_port} validation failed")
return False
# 使用 run_coroutine_threadsafe 執行協程並獲取結果
future = asyncio.run_coroutine_threadsafe(
self._async_create_serial_link(serial_port, baudrate, target_port, receiver_type),
self.loop
)
try:
# 等待結果,設定合理的超時時間
result = future.result(timeout=5.0)
if result:
logger.info(f"Create Serial Link: {serial_port} -> UDP {target_port}")
return True
except asyncio.TimeoutError:
logger.error(f"Timeout creating serial link for {serial_port}")
return False
except Exception as e:
logger.error(f"Failed to create serial link for {serial_port}: {e}")
return False
async def _async_create_serial_link(self, serial_port, baudrate, target_port, receiver_type: SerialReceiverType):
"""在事件循環線程中執行實際的連接創建"""
try:
# 創建 serial_object 實例
serial_obj = self.serial_object(serial_port, baudrate, target_port, receiver_type)
# 建立 UDP 處理器並指定目標端口位置
serial_obj.udp_handler = UDPHandler(target_port)
# 建立 UDP 傳輸,不指定接收端口(自己),讓系統自動分配
udp_transport, udp_protocol = await self.loop.create_datagram_endpoint(
lambda: serial_obj.udp_handler,
local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口
)
serial_obj.transport = udp_transport
serial_obj.protocol = udp_protocol
# logger.info(f"UDP endpoint created for {serial_port}") # debug
# 建立 Serial 處理器,將 UDP 處理器傳給它
serial_obj.serial_handler = SerialHandler(serial_obj.udp_handler, serial_port)
# 建立 Serial 連接
serial_transport, _ = await serial_asyncio.create_serial_connection(
self.loop,
lambda: serial_obj.serial_handler,
serial_port,
baudrate=baudrate
)
# logger.info(f"Serial connection created for {serial_port}") # debug
# 將 serial_object 加入管理列表
serial_id = self.serial_count + 1
self.serial_objects[serial_id] = serial_obj
self.serial_count += 1
# logger.info(f"Serial object {serial_id} added to manager") # debug
return True
except Exception as e:
logger.error(f"Exception in _async_create_serial_link for {serial_port}: {str(e)}")
# 清理已創建的資源
if 'serial_obj' in locals():
if hasattr(serial_obj, 'transport') and serial_obj.transport:
serial_obj.transport.close()
return False
def remove_serial_link(self, serial_id):
"""移除串口連接(線程安全方式)"""
# 確保事件循環正在運行
if not self.loop:
logger.error("Event loop not running")
return False
# 檢查 serial_id 是否存在
if serial_id not in self.serial_objects:
logger.warning(f"Serial object {serial_id} not found")
return False
# 使用 run_coroutine_threadsafe 執行協程
future = asyncio.run_coroutine_threadsafe(
self._async_remove_serial_link(serial_id),
self.loop
)
try:
result = future.result(timeout=3.0)
if result:
logger.info(f"Remove Serial Link {serial_id}")
return result
except asyncio.TimeoutError:
logger.error(f"Timeout removing serial link {serial_id}")
return False
except Exception as e:
logger.error(f"Failed to remove serial link {serial_id}: {e}")
return False
async def _async_remove_serial_link(self, serial_id):
"""在事件循環線程中執行實際的連接移除"""
if serial_id not in self.serial_objects:
logger.warning(f"Serial object {serial_id} not in managed list")
return False
try:
serial_obj = self.serial_objects[serial_id]
# 關閉 UDP transport
if hasattr(serial_obj, 'transport') and serial_obj.transport:
serial_obj.transport.close()
# 關閉 Serial transport
if hasattr(serial_obj, 'serial_handler') and serial_obj.serial_handler:
if hasattr(serial_obj.serial_handler, 'transport') and serial_obj.serial_handler.transport:
serial_obj.serial_handler.transport.close()
# 從管理列表中移除
del self.serial_objects[serial_id]
# logger.info(f"Serial object {serial_id} removed from manager") # debug
return True
except Exception as e:
logger.error(f"Exception in _async_remove_serial_link for {serial_id}: {str(e)}")
return False
def get_serial_link(self):
ret = {} # serial id num : serial_port string
for key, obj in self.serial_objects.items():
ret[key] = obj.serial_port
return ret
@staticmethod
def check_serial_port(serial_port, baudrate):
"""檢查串口是否存在與可用"""
# 檢查設備是否存在
if not os.path.exists(serial_port):
logger.error(f"Serial Device {serial_port} Not Found")
return False
# 檢查是否有權限訪問設備
try:
if not os.access(serial_port, os.R_OK | os.W_OK):
logger.error(f"No permission to access {serial_port}")
return False
except Exception as e:
logger.error(f"Cannot Access Serial Device {serial_port}: {str(e)}")
return False
# 檢查是否被占用
try:
# 嘗試打開串口
ser = serial.Serial(serial_port, baudrate)
ser.close() # 打開成功後立即關閉
return True
except serial.SerialException as e:
logger.error(f"Serial Device {serial_port} is Occupied or Inaccessible: {str(e)}")
return False
except Exception as e:
logger.error(f"Unknown Error: {str(e)}")
return False
if __name__ == '__main__':
sm = serial_manager()
sm.start()
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialReceiverType.XBEEAPI2AT)
linked_serial = sm.get_serial_link()
print(linked_serial)
time.sleep(10)
sm.shutdown()

@ -1,14 +0,0 @@
'''
透過某個地方 得到 udp uart 接口
對於每個接口 視為一個獨立的物件
物件對於不同的接口 是為不同類型的物件
每個類型的物件 創建一個獨立的執行緒 來處理資料
關於執行緒的實作 是寫在另一個模組
物件之間 也可以做資料轉換/轉拋
'''

@ -0,0 +1,129 @@
import socket
import random
import os
def get_used_ports():
"""
/proc/net/tcp /proc/net/udp 讀取系統已占用的 port
直接讀取 Linux 系統資訊避免暴力嘗試
Returns:
set: 已被占用的 port 號集合
"""
used_ports = set()
# 讀取 TCP 占用的 port (包含 IPv4 和 IPv6)
for filepath in ['/proc/net/tcp', '/proc/net/tcp6']:
if os.path.exists(filepath):
try:
with open(filepath, 'r') as f:
lines = f.readlines()[1:] # 跳過標題行
for line in lines:
parts = line.split()
if len(parts) > 1:
# local_address 格式: "0100007F:1F90" (hex)
local_addr = parts[1]
port_hex = local_addr.split(':')[1]
port = int(port_hex, 16)
used_ports.add(port)
except (IOError, PermissionError):
pass
# 讀取 UDP 占用的 port (包含 IPv4 和 IPv6)
for filepath in ['/proc/net/udp', '/proc/net/udp6']:
if os.path.exists(filepath):
try:
with open(filepath, 'r') as f:
lines = f.readlines()[1:] # 跳過標題行
for line in lines:
parts = line.split()
if len(parts) > 1:
local_addr = parts[1]
port_hex = local_addr.split(':')[1]
port = int(port_hex, 16)
used_ports.add(port)
except (IOError, PermissionError):
pass
return used_ports
def is_port_available(port):
"""
測試指定 port 是否可用 (TCP UDP 都測試)
Args:
port (int): 要測試的 port
Returns:
bool: True 表示可用False 表示被占用
"""
# 測試 TCP
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', port))
except OSError:
return False
# 測試 UDP
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('', port))
except OSError:
return False
return True
def find_available_port(start_port=1024, end_port=65535):
"""
在指定的 port 區間內隨機找出一個未被占用的 port
使用 Linux /proc/net 系統資訊來過濾已占用的 port避免暴力嘗試
確保 TCP UDP 都可用
Args:
start_port (int): 起始 port (預設 1024)
end_port (int): 結束 port (預設 65535)
Returns:
int: 可用的 port 如果找不到則返回 None
"""
if start_port < 1 or end_port > 65535 or start_port >= end_port:
raise ValueError("Port 範圍必須在 1-65535 之間,且起始 port 必須小於結束 port")
# 從系統讀取已占用的 port
used_ports = get_used_ports()
# 計算可用的 port 列表 (排除已占用的)
available_ports = [p for p in range(start_port, end_port + 1) if p not in used_ports]
if not available_ports:
return None
# 隨機打亂順序
random.shuffle(available_ports)
# 從可用列表中挑選,再用 socket 雙重確認 TCP 和 UDP 都可用
for port in available_ports:
if is_port_available(port):
return port
# 如果都不可用
return None
if __name__ == "__main__":
# 使用範例
port = find_available_port(8000, 9000)
if port:
print(f"找到可用的 port: {port}")
else:
print("找不到可用的 port")
# 自訂範圍範例
port = find_available_port(10000, 20000)
if port:
print(f"在 10000-20000 範圍找到可用的 port: {port}")

@ -0,0 +1,96 @@
"""
Serial Port Discovery Utility
This module provides functions to discover available serial ports on the system.
It uses glob pattern matching to find serial device files in /dev/.
"""
import glob
from typing import List
def get_serial_ports() -> List[str]:
"""
獲取系統中所有可用的串口設備列表
Linux 系統中會搜尋以下模式的設備
- /dev/ttyUSB* (USB 串口設備)
- /dev/ttyACM* (USB CDC ACM 設備)
- /dev/ttyS* (標準串口)
Returns:
List[str]: 包含所有找到的串口設備路徑的列表
Example:
>>> ports = get_serial_ports()
>>> print(ports)
['/dev/ttyUSB0', '/dev/ttyUSB1', '/dev/ttyS0']
"""
serial_ports = []
# 搜尋不同類型的串口設備
patterns = [
'/dev/ttyUSB*', # USB 串口轉換器
'/dev/ttyACM*', # USB CDC ACM 設備(如 Arduino
'/dev/ttyS*', # 標準串口
]
for pattern in patterns:
serial_ports.extend(glob.glob(pattern))
# 排序以便於閱讀
serial_ports.sort()
return serial_ports
def get_serial_ports_with_filter(filter_pattern: str = None) -> List[str]:
"""
獲取串口設備列表可選擇性地使用自訂篩選模式
Args:
filter_pattern (str, optional): 自訂的 glob 模式例如 '/dev/ttyUSB*'
如果為 None則使用預設模式搜尋所有串口
Returns:
List[str]: 符合條件的串口設備路徑列表
Example:
>>> # 只搜尋 USB 串口
>>> usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*')
>>> print(usb_ports)
['/dev/ttyUSB0', '/dev/ttyUSB1']
"""
if filter_pattern:
serial_ports = glob.glob(filter_pattern)
serial_ports.sort()
return serial_ports
else:
return get_serial_ports()
if __name__ == "__main__":
# 使用範例
print("=== Serial Port Discovery ===\n")
# 1. 獲取所有串口設備
all_ports = get_serial_ports()
print(f"找到 {len(all_ports)} 個串口設備:")
for port in all_ports:
print(f" - {port}")
print("\n" + "="*30 + "\n")
# 2. 只搜尋 USB 串口
usb_ports = get_serial_ports_with_filter('/dev/ttyUSB*')
print(f"找到 {len(usb_ports)} 個 USB 串口設備:")
for port in usb_ports:
print(f" - {port}")
print("\n" + "="*30 + "\n")
# 3. 只搜尋 ACM 設備
acm_ports = get_serial_ports_with_filter('/dev/ttyACM*')
print(f"找到 {len(acm_ports)} 個 ACM 設備:")
for port in acm_ports:
print(f" - {port}")

@ -19,7 +19,7 @@ from ..fc_network_adapter import mavlinkVehicleView as mvv
# ====================== 分割線 ===================== # ====================== 分割線 =====================
test_item = 10 test_item = 1
running_time = 3 running_time = 3
@ -31,7 +31,24 @@ print('test_item : ', test_item)
測試項 1X 表示 mavlink_object 的功能 測試連線的能力 測試項 1X 表示 mavlink_object 的功能 測試連線的能力
''' '''
if test_item == 10: if test_item == 1:
print('===> Start of Program .Test ', test_item)
connection_string="udp:127.0.0.1:14591"
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
# mavlink_object1 = mo.mavlink_object(mavlink_socket1)
time.sleep(1)
print("mark A")
# print("Socket IP:", mavlink_socket1.target_system)
print("Socket port:", mavlink_socket1.port.getsockname())
# print("=== ", dir(mavlink_socket1.port))
elif test_item == 10:
# 需要開啟一個 ardupilot 的模擬器 # 需要開啟一個 ardupilot 的模擬器
# 測試 mavlink_object 放入 ring buffer 的應用 # 測試 mavlink_object 放入 ring buffer 的應用
print('===> Start of Program .Test ', test_item) print('===> Start of Program .Test ', test_item)

Loading…
Cancel
Save