Compare commits

...

10 Commits

@ -1,7 +1,5 @@
這是天雷系統的專案
===
## 功能簡介
1. mavlink 多對多支援平台

@ -4,10 +4,14 @@ import math
import re
import threading
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
import asyncio
import websockets
import json
import socket
import sys
import os
import traceback
from pymavlink import mavutil
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
@ -15,6 +19,23 @@ from std_msgs.msg import Float64, String
from mavros_msgs.msg import State, VfrHud
from mavros_msgs.srv import CommandBool, CommandTOL
# 確保 src 目錄在 Python 路徑中(用於 fc_network_apps 導入)
_src_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _src_path not in sys.path:
sys.path.insert(0, _src_path)
# 導入 fc_network_apps 的 longCommand統一的 MAV_CMD_* API
try:
from fc_network_apps.longCommand import CommandLongClient
except ImportError as e:
import traceback
print(f"⚠️ 警告: 無法導入 CommandLongClient")
print(f" 错误: {e}")
print(f" 这通常表示 ROS2 的 fc_interfaces 包未被编译或未正确安装")
print(f" 完整堆栈跟踪:")
traceback.print_exc()
CommandLongClient = None
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
@ -536,44 +557,163 @@ class DroneMonitor(Node):
setattr(self, f'drone_{sys_id}_subs', subs)
async def arm_drone(self, drone_id, arm):
if drone_id not in self.arm_clients:
return False
client = self.arm_clients[drone_id]
if not client.wait_for_service(timeout_sec=1.0):
return False
request = CommandBool.Request()
request.value = arm
future = client.call_async(request)
# ================================================================================
# 【新增】模式名稱到 custom_mode 值的映射(基於 Copter 模式)
# ================================================================================
MODE_MAPPING = {
"STABILIZE": 0,
"ACRO": 1,
"ALT_HOLD": 2,
"AUTO": 3,
"GUIDED": 4,
"LOITER": 5,
"RTL": 6,
"CIRCLE": 7,
"POSITION": 8,
"LAND": 9,
"OF_LOITER": 10,
"DRIFT": 11,
"SPORT": 13,
"FLIP": 14,
"AUTOTUNE": 15,
"POSHOLD": 16,
"BRAKE": 17,
"THROW": 18,
"AVOID_ADSB": 19,
"GUIDED_NOGPS": 20,
"SMART_RTL": 21,
}
# ================================================================================
async def set_mode(self, drone_id, mode_name):
"""使用 CommandLongClient 切換無人機飛行模式"""
try:
response = await future
return response.success
# 解析 drone_id 提取 sysid
parts = drone_id.split('_')
if len(parts) < 2:
print(f"❌ [SET_MODE] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
# 獲取模式對應的 custom_mode 值
custom_mode = self.MODE_MAPPING.get(mode_name)
if custom_mode is None:
print(f"❌ [SET_MODE] 未知模式: {mode_name}")
return False
print(f"\n📢 [SET_MODE] {drone_id}{mode_name} (custom_mode={custom_mode})")
# 在線程池中調用 CommandLongClient
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_set_mode():
client = CommandLongClient() if CommandLongClient else None
if not client:
return False
result = client.change_mode(
target_sysid=sysid,
custom_mode=float(custom_mode),
target_compid=0,
base_mode=1.0,
timeout_sec=2.0,
)
client.destroy_node()
return result.success if result else False
result = await loop.run_in_executor(executor, _call_set_mode)
if result:
print(f"✅ [SET_MODE] 模式切換成功")
return True
else:
print(f"❌ [SET_MODE] 模式切換失敗")
return False
except Exception as e:
self.get_logger().error(f'Arm service call failed: {e}')
print(f"❌ [SET_MODE] 例外錯誤: {e}")
traceback.print_exc()
return False
async def takeoff_drone(self, drone_id, altitude=10.0):
if drone_id not in self.takeoff_clients:
return False
async def arm_drone(self, drone_id, arm):
"""使用 CommandLongClient 執行 ARM/DISARM"""
try:
# 解析 drone_id 提取 sysid
parts = drone_id.split('_')
if len(parts) < 2:
print(f"❌ [ARM] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
client = self.takeoff_clients[drone_id]
if not client.wait_for_service(timeout_sec=1.0):
return False
action_name = "解鎖" if arm else "上鎖"
print(f"\n📢 [ARM] {drone_id}{action_name}")
request = CommandTOL.Request()
request.altitude = altitude
request.min_pitch = 0.0
request.yaw = 0.0
future = client.call_async(request)
# 在線程池中調用 CommandLongClient
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_arm_disarm():
client = CommandLongClient() if CommandLongClient else None
if not client:
return False
result = client.arm_disarm(
target_sysid=sysid,
arm=arm,
target_compid=0,
timeout_sec=2.0,
)
client.destroy_node()
return result.success if result else False
result = await loop.run_in_executor(executor, _call_arm_disarm)
if result:
print(f"✅ [ARM] {action_name}成功")
return True
else:
print(f"❌ [ARM] {action_name}失敗")
return False
except Exception as e:
print(f"❌ [ARM] 例外錯誤: {e}")
traceback.print_exc()
return False
async def takeoff_drone(self, drone_id, altitude):
"""使用 CommandLongClient 執行無人機起飛"""
try:
response = await future
return response.success
# 解析 drone_id 提取 sysid
parts = drone_id.split('_')
if len(parts) < 2:
print(f"❌ [TAKEOFF] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
print(f"\n📢 [TAKEOFF] {drone_id} → 起飛 (高度={altitude}m)")
# 在線程池中調用 CommandLongClient
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_takeoff():
client = CommandLongClient() if CommandLongClient else None
if not client:
return False
result = client.takeoff(
target_sysid=sysid,
altitude_m=float(altitude),
target_compid=0,
timeout_sec=2.0,
)
client.destroy_node()
return result.success if result else False
result = await loop.run_in_executor(executor, _call_takeoff)
if result:
print(f"✅ [TAKEOFF] 起飛成功")
return True
else:
print(f"❌ [TAKEOFF] 起飛失敗")
return False
except Exception as e:
self.get_logger().error(f'Takeoff service call failed: {e}')
print(f"❌ [TAKEOFF] 例外錯誤: {e}")
traceback.print_exc()
return False
def send_setpoint(self, drone_id, x, y, z):

@ -12,6 +12,7 @@ import asyncio
import json
import subprocess
import time
import traceback
# 導入分離的類別
from communication import DroneMonitor, UDPMavlinkReceiver, WebSocketMavlinkReceiver
@ -32,15 +33,18 @@ from mission_group import (
# ================================================================================
class ControlStationUI(QMainWindow):
VERSION = '1.0.1'
VERSION = '2.0.3'
def __init__(self):
super().__init__()
self.setWindowTitle(f'GCS v{self.VERSION}')
self.resize(1400, 900)
# 初始化ROS2
rclpy.init()
# 初始化消息隊列(用於線程安全的 GUI 更新)
import queue
self.message_queue = queue.Queue()
# 初始化ROS2 MonitorROS2 本身在 main() 中已初始化)
self.monitor = DroneMonitor()
self.monitor.signals.update_signal.connect(self.update_ui)
@ -53,11 +57,22 @@ class ControlStationUI(QMainWindow):
self.timer.timeout.connect(self.spin_ros)
self.timer.start(10)
# 驅動 asyncio 事件循環的定時器(新增 - 關鍵!)
# 這允許異步任務(如 arm_drone能夠執行
self.asyncio_timer = QTimer()
self.asyncio_timer.timeout.connect(self._spin_asyncio)
self.asyncio_timer.start(10) # 每 10ms 驅動一次 asyncio
# 初始化 panel 和 map 更新10Hz
self.panel_map_timer = QTimer()
self.panel_map_timer.timeout.connect(self._update_panel_and_map)
self.panel_map_timer.start(100) # 10Hz
# 消息隊列處理定時器(來自線程的 GUI 更新)
self.msg_queue_timer = QTimer()
self.msg_queue_timer.timeout.connect(self._process_message_queue)
self.msg_queue_timer.start(50) # 每 50ms 檢查一次
# 快取消息數據,以便在沒有新消息時使用上一次的值
self._message_cache = {}
@ -164,20 +179,29 @@ class ControlStationUI(QMainWindow):
# ========== 任務群組 Tab ==========
group_header = QHBoxLayout()
# 標題 + 收起/展開按鈕
title_layout = QHBoxLayout()
group_title = QLabel("任務群組")
group_title.setStyleSheet(
"color: #DDD; font-size: 14px; font-weight: bold; padding: 2px;")
group_header.addWidget(group_title)
group_header.addStretch()
add_group_btn = QPushButton("+ 新增群組")
add_group_btn.setStyleSheet("""
QPushButton { background-color: #4A9EFF; color: white; border: none;
padding: 5px 12px; border-radius: 4px; font-size: 12px; font-weight: bold; }
QPushButton:hover { background-color: #3A8EEF; }
title_layout.addWidget(group_title)
# 收起/展開按鈕
self.toggle_group_btn = QPushButton("")
self.toggle_group_btn.setStyleSheet("""
QPushButton { background-color: #555; color: white; border: none;
padding: 3px 8px; border-radius: 3px; font-size: 12px; font-weight: bold;
min-width: 30px; max-width: 30px; }
QPushButton:hover { background-color: #666; }
""")
add_group_btn.clicked.connect(self._add_mission_group)
group_header.addWidget(add_group_btn)
self.toggle_group_btn.setToolTip("收起/展開任務群組")
self.toggle_group_btn.clicked.connect(self._toggle_group_panel)
title_layout.addWidget(self.toggle_group_btn)
title_layout.addStretch()
group_header.addLayout(title_layout)
group_header.addStretch()
clear_traj_btn = QPushButton("清除軌跡")
clear_traj_btn.setStyleSheet("""
@ -204,6 +228,9 @@ class ControlStationUI(QMainWindow):
self.group_tab_widget.setFixedHeight(150)
self.group_tab_widget.currentChanged.connect(self._on_group_tab_changed)
right_layout.addWidget(self.group_tab_widget)
# 🌟 新增:保存群組面板的展開狀態
self.group_panel_expanded = True
# 預設建立 Group A
self._add_mission_group()
@ -353,6 +380,8 @@ class ControlStationUI(QMainWindow):
status_label.setToolTip("運行中")
self.statusBar().showMessage(f"已啟動 Serial 連接: {conn['port']}", 3000)
except Exception as e:
print(f"❌ Serial 連接啟動失敗: {str(e)}")
traceback.print_exc()
self.statusBar().showMessage(f"啟動 Serial 連接失敗: {str(e)}", 5000)
def remove_serial_connection(self, conn, panel):
@ -377,22 +406,39 @@ class ControlStationUI(QMainWindow):
# ================================================================================
def handle_mode_change(self, drone_id):
print(f"\n📢 [GUI] handle_mode_change 被调用")
print(f" drone_id: {drone_id}")
# 從 active group 的 mode_combo 讀取模式
group = self._get_active_group()
if group:
panel = self.group_panels.get(group.group_id)
mode = panel.mode_combo.currentText() if panel else "GUIDED"
print(f" 从群组读取模式: {mode}")
else:
mode = "GUIDED"
print(f" 没有活跃群组,使用默认模式: {mode}")
loop = asyncio.get_event_loop()
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"切換模式 {mode} {drone_id}"))
def handle_arm(self, drone_id):
print(f"\n📢 [GUI] handle_arm 被調用")
print(f" drone_id: {drone_id}")
loop = asyncio.get_event_loop()
print(f" 獲得事件循環: {loop}")
arm_state = not self.monitor.get_arm_state(drone_id)
future = self.monitor.arm_drone(drone_id, arm_state)
loop.create_task(self.handle_service_response(future, f"{'解鎖' if arm_state else '上鎖'} {drone_id}"))
print(f" 當前鎖定狀態: {not arm_state}, 目標狀態: {arm_state}")
print(f" 準備調用 arm_drone(drone_id={drone_id}, arm={arm_state})")
coro = self.monitor.arm_drone(drone_id, arm_state)
print(f" arm_drone() 返回類型: {type(coro)}")
print(f" coroutine 對象: {coro}")
action_text = f"{'解鎖' if arm_state else '上鎖'} {drone_id}"
print(f" 準備提交到事件循環: {action_text}")
task = loop.create_task(self.handle_service_response(coro, action_text))
print(f" task 已創建: {task}")
print(f" 已提交到事件循環\n")
def handle_takeoff(self, drone_id):
loop = asyncio.get_event_loop()
@ -425,20 +471,52 @@ class ControlStationUI(QMainWindow):
self.statusBar().showMessage("座標格式錯誤", 3000)
async def handle_service_response(self, future, action):
print(f"\n📥 [GUI] handle_service_response 開始處理: {action}")
print(f" Future/Coroutine 類型: {type(future)}")
print(f" Future/Coroutine 對象: {future}")
try:
print(f" ├─ 等待 future/coroutine 完成...")
print(f" └─ (這是一個阻塞等待,直到服務返回)\n")
result = await future
print(f"\n ✓ Future/Coroutine 完成,接收到返回值!")
print(f" ├─ 返回值類型: {type(result)}")
print(f" ├─ 返回值內容: {result}")
print(f" ├─ 返回值 == True: {result == True}")
print(f" ├─ 返回值 is True: {result is True}")
print(f" └─ bool(返回值): {bool(result)}")
# 詳細檢查 result 結構(用於調試 fc_network 回傳值)
if hasattr(result, '__dict__'):
print(f" └─ 返回值屬性: {result.__dict__}")
if result:
print(f"\n{action} 成功 (result={result})")
self.statusBar().showMessage(f"{action} 成功", 3000)
else:
print(f"\n{action} 失敗 (result={result})")
self.statusBar().showMessage(f"{action} 失敗", 3000)
except asyncio.CancelledError:
print(f"⚠️ {action} 被取消")
except Exception as e:
print(f"{action} 錯誤: {str(e)}")
traceback.print_exc()
self.statusBar().showMessage(f"{action} 錯誤: {str(e)}", 3000)
def handle_arm_selected(self):
print(f"\n📢 [GUI] handle_arm_selected 被調用")
selected = list(self.monitor.selected_drones)
print(f" 已選擇的無人機: {selected}")
loop = asyncio.get_event_loop()
for drone_id in self.monitor.selected_drones:
future = self.monitor.arm_drone(drone_id, True)
loop.create_task(self.handle_service_response(future, f"批次解鎖 {drone_id}"))
for drone_id in selected:
print(f" 準備批次解鎖: {drone_id}")
coro = self.monitor.arm_drone(drone_id, True)
print(f" arm_drone 返回: {coro}")
# 使用 run_coroutine_threadsafe() 正確地在事件循環中運行
asyncio.run_coroutine_threadsafe(
self.handle_service_response(coro, f"批次解鎖 {drone_id}"),
loop
)
print(f" handle_arm_selected 完成\n")
def handle_takeoff_selected(self):
loop = asyncio.get_event_loop()
@ -456,6 +534,23 @@ class ControlStationUI(QMainWindow):
self._group_counter += 1
return gid
def _toggle_group_panel(self):
"""🌟 收起/展開任務群組面板"""
if self.group_panel_expanded:
# 收起
self.group_tab_widget.setFixedHeight(0)
self.group_tab_widget.hide()
self.toggle_group_btn.setText("")
self.toggle_group_btn.setToolTip("展開任務群組")
self.group_panel_expanded = False
else:
# 展開
self.group_tab_widget.setFixedHeight(150)
self.group_tab_widget.show()
self.toggle_group_btn.setText("")
self.toggle_group_btn.setToolTip("收起任務群組")
self.group_panel_expanded = True
def _add_mission_group(self):
"""新增一個任務群組"""
gid = self._next_group_id()
@ -475,6 +570,8 @@ class ControlStationUI(QMainWindow):
panel.box_select_requested.connect(self._handle_box_select)
panel.select_all_requested.connect(self._handle_select_all_for_group)
panel.clear_group_requested.connect(self._handle_clear_group)
panel.add_group_requested.connect(self._add_mission_group)
panel.delete_group_requested.connect(self._handle_delete_group)
self.group_panels[gid] = panel
# 用帶顏色的 tab 標題
@ -538,6 +635,19 @@ class ControlStationUI(QMainWindow):
if panel:
panel.update_drone_list()
panel.update_status()
# 同步更新左側面板的 checkbox 狀態
self.monitor.selected_drones = group.drone_ids.copy()
for drone_id in all_ids:
if drone_id in self.drones:
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(drone_id in group.drone_ids)
checkbox.blockSignals(False)
# 更新 socket 群組的 checkbox 狀態
self.update_group_checkbox_state(self.get_socket_id(drone_id))
self.statusBar().showMessage(
f"Group {group_id}: 已分配 {len(group.drone_ids)} 台無人機", 3000)
@ -611,23 +721,86 @@ class ControlStationUI(QMainWindow):
def _handle_group_mode_change(self, group_id, mode):
"""切換群組內所有無人機的飛行模式"""
print(f"\n📢 [GUI] _handle_group_mode_change 被调用", flush=True)
print(f" group_id: {group_id}, mode: {mode}", flush=True)
group = self.mission_groups.get(group_id)
if not group:
print(f"❌ 找不到群組: {group_id}", flush=True)
return
if not group.drone_ids:
print(f"⚠️ 群組中沒有無人機", flush=True)
self.statusBar().showMessage(f"群組 {group_id} 中沒有無人機", 3000)
return
print(f" 準備為 {len(group.drone_ids)} 台無人機切換模式...", flush=True)
self.statusBar().showMessage(f"正在切換模式...", 1000)
# 使用 asyncio 執行(通過事件循環)
async def do_mode_changes_async():
print(f"\n 【異步任務】開始執行模式切換", flush=True)
for drone_id in group.drone_ids:
print(f"\n 【切換】{drone_id}{mode}", flush=True)
try:
result = await self.monitor.set_mode(drone_id, mode)
if result:
msg = f"{drone_id} 切換成功"
print(f" {msg}", flush=True)
self.message_queue.put((msg, 2000))
else:
msg = f"{drone_id} 切換失敗"
print(f" {msg}", flush=True)
self.message_queue.put((msg, 2000))
except Exception as e:
msg = f"{drone_id} 錯誤: {str(e)}"
print(f" {msg}", flush=True)
traceback.print_exc()
self.message_queue.put((msg, 2000))
print(f"\n 【完成】模式切換任務結束\n", flush=True)
# 通過事件循環提交異步任務
print(f" 【排隊】將任務提交至事件循環", flush=True)
loop = asyncio.get_event_loop()
for drone_id in group.drone_ids:
future = self.monitor.set_mode(drone_id, mode)
loop.create_task(self.handle_service_response(future, f"{drone_id} 切換 {mode}"))
asyncio.run_coroutine_threadsafe(
do_mode_changes_async(),
loop
)
def _handle_group_arm(self, group_id):
"""解鎖群組內所有無人機"""
print(f"\n📢 [GUI] _handle_group_arm 被調用")
print(f" 群組 ID: {group_id}")
group = self.mission_groups.get(group_id)
print(f" 群組存在: {group is not None}")
if not group:
print(f" ⚠️ 群組不存在,返回\n")
return
print(f" 群組內無人機: {group.drone_ids}")
loop = asyncio.get_event_loop()
print(f" 事件循環: {loop}")
for drone_id in group.drone_ids:
future = self.monitor.arm_drone(drone_id, True)
loop.create_task(self.handle_service_response(future, f"解鎖 {drone_id}"))
print(f"\n ┌─ 處理無人機: {drone_id}")
print(f" ├─ 準備調用 arm_drone(drone_id={drone_id}, arm=True)")
coro = self.monitor.arm_drone(drone_id, True)
print(f" ├─ arm_drone 返回類型: {type(coro)}")
action_text = f"解鎖 {drone_id}"
print(f" ├─ 準備提交到事件循環: {action_text}")
# 使用 run_coroutine_threadsafe() 正確地在事件循環中運行 coroutine
# 這是 Qt + asyncio 整合的正確方式
future = asyncio.run_coroutine_threadsafe(
self.handle_service_response(coro, action_text),
loop
)
print(f" ├─ Future 已創建: {future}")
print(f" └─ Future 將在事件循環中執行")
print(f"\n _handle_group_arm 完成coroutine 已提交至事件循環)\n")
def _handle_group_takeoff(self, group_id, altitude):
"""群組內所有無人機起飛"""
@ -663,23 +836,59 @@ class ControlStationUI(QMainWindow):
if panel:
panel.update_drone_list()
panel.update_status()
# 同步更新左側面板的 checkbox 狀態
self.monitor.selected_drones = group.drone_ids.copy()
for drone_id in self.drones.keys():
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(drone_id in group.drone_ids)
checkbox.blockSignals(False)
self.update_group_checkbox_state(self.get_socket_id(drone_id))
self.statusBar().showMessage(
f"Group {group_id}: 框選分配 {len(valid)} 台無人機", 3000)
def _handle_select_all_for_group(self, group_id):
"""全選所有可用無人機,直接分配到該群組"""
"""全選/取消全選 - Toggle 邏輯"""
group = self.mission_groups.get(group_id)
if not group:
return
other = self._get_other_assigned(group_id)
available = {did for did in self.drones.keys() if did not in other}
group.drone_ids = available
# Toggle 邏輯:如果已全選,則清空;否則全選
if group.drone_ids == available:
# 已全選 → 清空
group.drone_ids = set()
self.monitor.selected_drones.clear()
msg_status = "已取消全選"
else:
# 未全選 → 全選
group.drone_ids = available
self.monitor.selected_drones = group.drone_ids.copy()
msg_status = f"全選分配 {len(available)} 台無人機"
panel = self.group_panels.get(group_id)
if panel:
panel.update_drone_list()
panel.update_status()
# 更新按鈕文本
panel.set_all_select_state(group.drone_ids == available)
# 同步更新左側面板的 checkbox 狀態
for drone_id in self.drones.keys():
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(drone_id in group.drone_ids)
checkbox.blockSignals(False)
self.update_group_checkbox_state(self.get_socket_id(drone_id))
self.statusBar().showMessage(
f"Group {group_id}: 全選分配 {len(available)} 台無人機", 3000)
f"Group {group_id}: {msg_status}", 3000)
def _handle_clear_group(self, group_id):
"""清除群組的無人機分配"""
@ -696,9 +905,59 @@ class ControlStationUI(QMainWindow):
panel.update_drone_list()
panel.update_status()
panel.clear_mission_info()
# 同步更新左側面板的 checkbox 狀態(全部取消勾選)
self.monitor.selected_drones.clear()
for drone_id in self.drones.keys():
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
checkbox.blockSignals(True)
checkbox.setChecked(False)
checkbox.blockSignals(False)
self.update_group_checkbox_state(self.get_socket_id(drone_id))
self.statusBar().showMessage(
f"Group {group_id}: 已清除分組", 3000)
def _handle_delete_group(self, group_id):
"""刪除一個任務群組"""
if group_id not in self.mission_groups:
self.statusBar().showMessage(f"Group {group_id} 不存在", 3000)
return
# 停止群組的執行(如果有)
group = self.mission_groups[group_id]
if group.executor:
group.executor.stop()
# 移除地圖上的任務計畫
self.drone_map.clear_mission_plan_for_group(group_id)
# 移除 tab
for i in range(self.group_tab_widget.count()):
if f"Group {group_id}" in self.group_tab_widget.tabText(i):
self.group_tab_widget.removeTab(i)
break
# 移除資料
del self.mission_groups[group_id]
if group_id in self.group_panels:
del self.group_panels[group_id]
# 更新 active group
if self.active_group_id == group_id:
if self.group_tab_widget.count() > 0:
self.group_tab_widget.setCurrentIndex(0)
# 更新 active_group_id 為當前 tab 的群組
for gid, panel in self.group_panels.items():
if panel == self.group_tab_widget.currentWidget().widget():
self.active_group_id = gid
break
else:
self.active_group_id = None
self.statusBar().showMessage(f"已刪除 Group {group_id}", 3000)
def _on_group_mission_completed(self, group_id):
"""群組任務完成回呼"""
panel = self.group_panels.get(group_id)
@ -739,23 +998,111 @@ class ControlStationUI(QMainWindow):
# ================================================================================
def handle_group_selection(self, socket_id, state):
"""處理 socket 群組 checkbox 的勾選/取消勾選
這個方法在用戶點擊 socket 群組的 checkbox 時被調用
需要同時更新
1. socket 下所有無人機的 checkbox
2. self.monitor.selected_drones用於控制面板同步
3. 右侧活躍群組的無人機分配新增
參數
socket_id: socket ID (str)
state: Qt.CheckState 的整數值 (0=Unchecked, 1=PartiallyChecked, 2=Checked)
"""
print(f"\n📢 [GUI] handle_group_selection 被調用", flush=True)
print(f" socket_id: {socket_id}, state: {state}", flush=True)
print(f" state 類型: {type(state)}", flush=True)
# 獲取該 socket 下所有無人機
group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
is_checked = state == Qt.CheckState.Checked.value
print(f" 該 socket 下的無人機: {group_drones}", flush=True)
# 判斷是否勾選(只有 state == 2 時才是 Checked
is_checked = (state == 2) # Qt.CheckState.Checked.value == 2
print(f" is_checked: {is_checked}", flush=True)
# 更新該 socket 下所有無人機的 checkbox 狀態
for drone_id in group_drones:
checkbox = self.drones[drone_id].get_checkbox()
if checkbox:
print(f" └─ 更新 {drone_id}: setChecked({is_checked})", flush=True)
checkbox.blockSignals(True)
checkbox.setChecked(is_checked)
checkbox.blockSignals(False)
if is_checked: self.monitor.selected_drones.add(drone_id)
else: self.monitor.selected_drones.discard(drone_id)
# 同時更新 monitor.selected_drones 以同步控制面板
if is_checked:
self.monitor.selected_drones.add(drone_id)
else:
self.monitor.selected_drones.discard(drone_id)
# 👇 新增:同步更新右侧活躍群組的無人機分配
if self.active_group_id:
group = self.mission_groups.get(self.active_group_id)
panel = self.group_panels.get(self.active_group_id)
if group and panel:
print(f" ├─ 同步右侧群組 {self.active_group_id}", flush=True)
if is_checked:
# 勾選時:將該 socket 下的無人機添加到活躍群組
for drone_id in group_drones:
group.drone_ids.add(drone_id)
print(f" │ └─ 添加到群組: {group_drones}", flush=True)
else:
# 取消勾選時:從活躍群組移除該 socket 下的無人機
for drone_id in group_drones:
group.drone_ids.discard(drone_id)
print(f" │ └─ 從群組移除: {group_drones}", flush=True)
# 更新右側群組面板的顯示
panel.update_drone_list()
panel.update_status()
print(f" │ └─ 已更新右侧群組面板", flush=True)
print(f" 最終 selected_drones: {self.monitor.selected_drones}", flush=True)
print(f"✓ handle_group_selection 完成\n", flush=True)
def handle_drone_selection(self, drone_id, state):
if state == Qt.CheckState.Checked.value:
is_checked = state == Qt.CheckState.Checked.value
if is_checked:
self.monitor.selected_drones.add(drone_id)
else:
self.monitor.selected_drones.discard(drone_id)
self.update_group_checkbox_state(self.get_socket_id(drone_id))
# 同步更新任務群組的無人機分配狀態
# 遍歷所有任務群組,更新已分配的無人機列表顯示
if not is_checked:
# 取消勾選時:從所有包含該無人機的群組中移除
for group_id, group in self.mission_groups.items():
if drone_id in group.drone_ids:
group.drone_ids.discard(drone_id)
panel = self.group_panels.get(group_id)
if panel:
panel.update_drone_list()
panel.update_status()
# 更新全選按鈕狀態
other = self._get_other_assigned(group_id)
available = {did for did in self.drones.keys() if did not in other}
panel.set_all_select_state(group.drone_ids == available)
else:
# 勾選時:檢查該無人機是否已分配給其他群組,若未分配則添加到當前活躍群組
is_already_assigned = any(
drone_id in group.drone_ids
for group in self.mission_groups.values()
)
if not is_already_assigned and self.active_group_id:
# 無人機未被分配給任何群組,可以添加到當前活躍群組
group = self.mission_groups.get(self.active_group_id)
panel = self.group_panels.get(self.active_group_id)
if group and panel:
group.drone_ids.add(drone_id)
panel.update_drone_list()
panel.update_status()
# 更新全選按鈕狀態
other = self._get_other_assigned(self.active_group_id)
available = {did for did in self.drones.keys() if did not in other}
panel.set_all_select_state(group.drone_ids == available)
def update_group_checkbox_state(self, socket_id):
group_drones = [did for did in self.drones.keys() if self.get_socket_id(did) == socket_id]
@ -890,7 +1237,6 @@ class ControlStationUI(QMainWindow):
)
except Exception as e:
self.statusBar().showMessage(f"❌ Group {group.group_id}: 規劃失敗: {str(e)}", 5000)
import traceback
traceback.print_exc()
# ================================================================================
@ -962,7 +1308,6 @@ class ControlStationUI(QMainWindow):
)
except Exception as e:
self.statusBar().showMessage(f"❌ Group {group.group_id}: Grid Sweep 規劃失敗: {str(e)}", 5000)
import traceback
traceback.print_exc()
# ================================================================================
@ -1042,7 +1387,6 @@ class ControlStationUI(QMainWindow):
)
except Exception as e:
self.statusBar().showMessage(f"❌ Group {group.group_id}: 跟隨模式規劃失敗: {str(e)}", 5000)
import traceback
traceback.print_exc()
# ================================================================================
@ -1180,7 +1524,6 @@ class ControlStationUI(QMainWindow):
self._map_update_count += 1
now = time.time()
if now - self._map_update_time >= 1.0:
print(f"[Panel/Map Update] {self._map_update_count} Hz")
self._map_update_time = now
self._map_update_count = 0
@ -1314,32 +1657,126 @@ class ControlStationUI(QMainWindow):
def _process_message_queue(self):
"""處理來自後台線程的消息隊列(更新 GUI 狀態欄)"""
try:
while True:
try:
message, duration = self.message_queue.get_nowait()
self.statusBar().showMessage(message, duration)
except:
break
except Exception as e:
print(f"❌ 消息隊列處理錯誤: {e}", flush=True)
traceback.print_exc()
def _spin_asyncio(self):
"""驅動 asyncio 事件循環,允許異步任務執行
關鍵修復asyncio 事件循環不會自動運行
這個定時器會定期執行事件循環 run_coroutine_threadsafe() 提交的任務能夠執行
"""
try:
loop = asyncio.get_event_loop()
if loop and not loop.is_closed() and not loop.is_running():
# 執行事件循環直到沒有掛起的任務或超時
# 使用 run_until_complete() 配合一個快速返回的 coroutine
loop.run_until_complete(asyncio.sleep(0))
except Exception as e:
# 靜默忽略任何錯誤,防止 Qt 定時器出現異常
# 但仍然打印詳細的堆棧跟踪以便調試
traceback.print_exc()
def spin_ros(self):
try:
self.executor.spin_once(timeout_sec=0.01)
for (drone_id, msg_type), data in self.monitor.latest_data.items():
self.monitor.signals.update_signal.emit(msg_type, drone_id, data)
self.monitor.latest_data.clear()
# 仅在 ROS2 正常工作时才尝试 spin
if rclpy.ok():
self.executor.spin_once(timeout_sec=0.01)
for (drone_id, msg_type), data in self.monitor.latest_data.items():
self.monitor.signals.update_signal.emit(msg_type, drone_id, data)
self.monitor.latest_data.clear()
except RuntimeError as e:
# ROS2 context 被破坏或不可用
if "Context" in str(e) or "context" in str(e).lower():
print(f"⚠️ ROS2 context 錯誤(忽略): {e}", flush=True)
else:
print(f"ROS spin error: {e}", flush=True)
traceback.print_exc()
except Exception as e:
print(f"ROS spin error: {e}")
print(f"ROS spin error: {e}", flush=True)
traceback.print_exc()
def closeEvent(self, event):
for group in self.mission_groups.values():
if group.executor:
group.executor.stop()
self.command_sender.close()
for receiver in self.udp_receivers:
receiver.stop()
for receiver in self.monitor.ws_receivers:
receiver.stop()
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
try:
for group in self.mission_groups.values():
if group.executor:
group.executor.stop()
self.command_sender.close()
for receiver in self.udp_receivers:
receiver.stop()
for receiver in self.monitor.ws_receivers:
receiver.stop()
self.monitor.destroy_node()
self.executor.shutdown()
except Exception as e:
print(f"⚠️ 清理資源時出錯: {e}", flush=True)
traceback.print_exc()
# 安全地 shutdown ROS2
try:
if rclpy.ok():
rclpy.shutdown()
except RuntimeError as e:
print(f"⚠️ ROS2 shutdown 錯誤: {e}", flush=True)
traceback.print_exc()
event.accept()
def main():
"""
GUI 應用程式的主入口點
標準 ROS2 + Qt 架構
1. 在最外層/最前面只做一次 rclpy.init()
2. 啟動 Qt 應用程式
3. finally 中做 rclpy.shutdown()
這樣可以確保所有 ROS2 相關操作都共享同一個初始化狀態
"""
# 第一步:在最外層只初始化一次 ROS2終極防護
# 添加 rclpy.ok() 檢查,防止重複初始化導致 "Context.init() must only be called once" 錯誤
print("🚀 [GUI 主程式] 檢查 ROS2 初始化狀態...", flush=True)
if not rclpy.ok():
print("🚀 [GUI 主程式] ROS2 未初始化,開始初始化...", flush=True)
rclpy.init()
print("✅ [GUI 主程式] ROS2 已全局初始化(由 GUI 主程式霸佔)", flush=True)
else:
print(" [GUI 主程式] ROS2 已初始化,跳過重複初始化", flush=True)
try:
# 第二步:啟動 Qt 應用程式
print("🚀 [GUI 主程式] 啟動 Qt 應用程式...", flush=True)
app = QApplication(sys.argv)
station = ControlStationUI()
station.show()
print("✓ [GUI 主程式] 應用程式已啟動", flush=True)
# 第三步:進入 Qt 事件循環(阻塞直到用戶關閉應用)
print("🎯 [GUI 主程式] 進入主事件循環,等待用戶操作...", flush=True)
sys.exit(app.exec())
finally:
# 第四步:只有當 GUI 視窗被關閉時,才做 ROS2 cleanup終極防護
# 這裡確保 ROS2 被正確且安全地關閉
print("\n🛑 [GUI 主程式] 應用程式關閉,正在清理 ROS2 資源...", flush=True)
if rclpy.ok():
rclpy.shutdown()
print("✓ [GUI 主程式] ROS2 已安全關閉", flush=True)
else:
print(" [GUI 主程式] ROS2 已關閉或不可用,無需重複 shutdown", flush=True)
print("✓ [GUI 主程式] 應用程式完全退出", flush=True)
if __name__ == '__main__':
app = QApplication(sys.argv)
station = ControlStationUI()
station.show()
app.exec()
main()

@ -146,6 +146,8 @@ class GroupPanel(QWidget):
box_select_requested = pyqtSignal(str) # group_id — 框選直接分配
select_all_requested = pyqtSignal(str) # group_id — 全選直接分配
clear_group_requested = pyqtSignal(str) # group_id — 清除分組
add_group_requested = pyqtSignal() # 新增群組
delete_group_requested = pyqtSignal(str) # group_id — 刪除群組
BUTTON_STYLE = """
QPushButton {{ background-color: {bg}; color: {fg}; border: none;
@ -157,6 +159,8 @@ class GroupPanel(QWidget):
def __init__(self, group: MissionGroup, parent=None):
super().__init__(parent)
self.group = group
self._is_all_selected = False # 追蹤全選狀態
self.all_btn_ref = None # 保存全選按鈕的參考
self._build_ui()
def _make_sep(self):
@ -296,7 +300,7 @@ class GroupPanel(QWidget):
mid.addLayout(mid_body)
# ============================
# 選取與分組(2x2 按鈕)
# 選取與分組(3x2 按鈕)
# ============================
right = QVBoxLayout()
right.setSpacing(3)
@ -310,9 +314,28 @@ class GroupPanel(QWidget):
self.drone_list_label.setWordWrap(True)
right.addWidget(self.drone_list_label)
# 2x2 按鈕網格
# 3x2 按鈕網格:第一行 框選 全選 新增群組
grid_r1 = QHBoxLayout()
grid_r1.setSpacing(3)
box_btn = QPushButton("框選")
box_btn.setStyleSheet(BTN.format(bg='#64B5F6', fg='white', hover='#42A5F5'))
box_btn.clicked.connect(
lambda: self.box_select_requested.emit(self.group.group_id))
all_btn = QPushButton("全選/取消")
all_btn.setStyleSheet(BTN.format(bg='#64B5F6', fg='white', hover='#42A5F5'))
all_btn.clicked.connect(self._on_all_select_clicked)
self.all_btn_ref = all_btn # 保存按鈕參考(備用)
add_group_btn = QPushButton("+ 新增群組")
add_group_btn.setStyleSheet(BTN.format(bg='#4A9EFF', fg='white', hover='#3A8EEF'))
add_group_btn.clicked.connect(lambda: self.add_group_requested.emit())
grid_r1.addWidget(box_btn)
grid_r1.addWidget(all_btn)
grid_r1.addWidget(add_group_btn)
right.addLayout(grid_r1)
# 第二行 編輯分配 清除分組 刪除群組
grid_r2 = QHBoxLayout()
grid_r2.setSpacing(3)
assign_btn = QPushButton("編輯分配")
assign_btn.setStyleSheet(BTN.format(bg='#555', fg='#DDD', hover='#666'))
assign_btn.clicked.connect(
@ -321,22 +344,13 @@ class GroupPanel(QWidget):
clear_btn.setStyleSheet(BTN.format(bg='#777', fg='white', hover='#888'))
clear_btn.clicked.connect(
lambda: self.clear_group_requested.emit(self.group.group_id))
grid_r1.addWidget(assign_btn)
grid_r1.addWidget(clear_btn)
right.addLayout(grid_r1)
grid_r2 = QHBoxLayout()
grid_r2.setSpacing(3)
box_btn = QPushButton("框選")
box_btn.setStyleSheet(BTN.format(bg='#64B5F6', fg='white', hover='#42A5F5'))
box_btn.clicked.connect(
lambda: self.box_select_requested.emit(self.group.group_id))
all_btn = QPushButton("全選")
all_btn.setStyleSheet(BTN.format(bg='#64B5F6', fg='white', hover='#42A5F5'))
all_btn.clicked.connect(
lambda: self.select_all_requested.emit(self.group.group_id))
grid_r2.addWidget(box_btn)
grid_r2.addWidget(all_btn)
delete_group_btn = QPushButton("刪除群組")
delete_group_btn.setStyleSheet(BTN.format(bg='#EF5350', fg='white', hover='#E53935'))
delete_group_btn.clicked.connect(
lambda: self.delete_group_requested.emit(self.group.group_id))
grid_r2.addWidget(assign_btn)
grid_r2.addWidget(clear_btn)
grid_r2.addWidget(delete_group_btn)
right.addLayout(grid_r2)
right.addStretch()
@ -410,13 +424,15 @@ class GroupPanel(QWidget):
self.type_combo.currentTextChanged.connect(self._update_param_visibility)
# ── 組裝四欄:控制指令 > 任務規劃 > 任務參數 > 選取與分組 ──
cols.addLayout(left, 1)
# 使用伸展因子 0 讓列根據內容自動調整寬度,而不是均等分配
cols.addLayout(left, 0)
cols.addWidget(self._make_sep())
cols.addLayout(mid, 1)
cols.addLayout(mid, 0)
cols.addWidget(self._make_sep())
cols.addLayout(param_col, 1)
cols.addLayout(param_col, 0)
cols.addWidget(self._make_sep())
cols.addLayout(right, 1)
cols.addLayout(right, 0)
cols.addStretch() # 填充剩餘空間,使四列置左
layout.addLayout(cols)
@ -451,6 +467,14 @@ class GroupPanel(QWidget):
self.status_label.setStyleSheet(
f"color: {self.group.color}; font-size: 11px; font-weight: bold;")
def _on_all_select_clicked(self):
"""全選按鈕點擊 - 發送信號給 gui.py 處理 toggle 邏輯"""
self.select_all_requested.emit(self.group.group_id)
def set_all_select_state(self, is_selected):
"""外部設置全選狀態(按鈕文本保持「全選/取消」)"""
self._is_all_selected = is_selected
def _update_param_visibility(self, _=None):
"""根據當前任務類型,顯示/隱藏對應的參數列"""
mission_type = self.type_combo.currentText()

@ -38,7 +38,7 @@ class SerialMode(Enum):
NOT_USE = auto() # 不使用
# ====================== Frame Processor 基類與實現 =====================
# ====================== Frame Processor Base and Implementation =====================
class FrameProcessor(ABC):
"""協議處理器基類"""
@ -47,10 +47,10 @@ class FrameProcessor(ABC):
self.buffer = bytearray()
@abstractmethod
def process_incoming(self, data: bytes):
def process_incoming(self, data: bytes) -> bytes:
"""
處理接收到的數據
返回已完整解析的 payload 列表
返回已完整解析的 payload 列表 後續要丟到 UDP
"""
pass
@ -66,7 +66,7 @@ class FrameProcessor(ABC):
class RawFrameProcessor(FrameProcessor):
"""原始數據直通處理器"""
def process_incoming(self, data: bytes):
def process_incoming(self, data: bytes) -> bytes:
"""直接返回原始數據,不進行緩衝"""
return [data] if data else []
@ -76,110 +76,106 @@ class RawFrameProcessor(FrameProcessor):
class XBeeFrameProcessor(FrameProcessor):
"""XBee API 協議處理器"""
def process_incoming(self, data: bytes):
"""
XBee API 協議處理器
職責
- XBee API frame 的拆幀 / 組幀
- 0x90 (RX Packet) -> 解出 payload 回傳
- 0x88 (AT Response) -> 轉交 at_handler 處理若有注入
- 0x8B (TX Status) -> 目前忽略
- 其他 frame type -> warning 忽略
若未來要做變化型 XBee例如 API2 escape mode不同 addressing
繼承此類並覆寫 _encapsulate / _decapsulate / _try_extract_frame 即可
"""
# XBee API frame type
FRAME_TYPE_TX_REQUEST = 0x10
FRAME_TYPE_AT_RESPONSE = 0x88
FRAME_TYPE_TX_STATUS = 0x8B
FRAME_TYPE_RX_PACKET = 0x90
def __init__(self, at_handler: "ATCommandHandler" = None):
super().__init__()
self.at_handler = at_handler
# ---- 對外契約 ----
def process_incoming(self, data: bytes) -> bytes:
"""處理 XBee API 幀並提取 payload"""
self.buffer.extend(data)
payloads = []
while True:
frame = self._try_extract_frame()
if frame is None:
break
payload = self._dispatch_frame(frame)
if payload:
payloads.append(payload)
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return self._encapsulate(data)
# ---- 內部:拆幀與分派 ----
def _try_extract_frame(self) -> bytes:
"""
buffer 嘗試抓一個完整 frame
- 對齊幀頭 0x7E
- 長度不足時等待下次進 buffer
- 成功則從 buffer 切掉並回傳整個 frame bytes
"""
while len(self.buffer) >= 3:
# 尋找幀頭
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
# 讀取 payload 長度
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1 # 起始符(1) + 長度(2) + payload + 校驗和(1)
# 等待完整幀
if len(self.buffer) < full_length:
break
# 提取完整 frame 並從緩衝區移除
return None
frame = bytes(self.buffer[:full_length])
del self.buffer[:full_length]
# 判斷 frame 類型並處理
frame_type = frame[3]
if frame_type == 0x90: # RX Packet
payload = XBeeFrameHandler.decapsulate_data(frame)
if payload:
payloads.append(payload)
elif frame_type == 0x88: # AT Response
# 可以在這裡處理 AT 指令回應
# response = XBeeFrameHandler.parse_at_command_response(frame)
# 目前忽略
pass
elif frame_type == 0x8B: # Transmit Status
# 傳輸狀態,目前忽略
pass
else:
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return payloads
def process_outgoing(self, data: bytes) -> bytes:
"""將數據封裝為 XBee API 傳輸幀"""
return XBeeFrameProcessor.encapsulate_data(data)
return frame
return None
# ====================== XBee Frame Handler =====================
def _dispatch_frame(self, frame: bytes) -> bytes:
"""根據 frame type 分派;若是 RX payload 回傳 bytes其餘回傳 None"""
frame_type = frame[3]
class XBeeFrameHandler:
"""XBee API Frame 處理器"""
@staticmethod
def parse_at_command_response(frame: bytes) -> dict:
"""解析 AT Command Response (0x88)"""
if len(frame) < 8:
if frame_type == self.FRAME_TYPE_RX_PACKET:
return self._decapsulate(frame)
if frame_type == self.FRAME_TYPE_AT_RESPONSE:
if self.at_handler is not None:
self.at_handler.handle_frame(frame)
return None
frame_type = frame[3]
if frame_type != 0x88:
if frame_type == self.FRAME_TYPE_TX_STATUS:
return None
frame_id = frame[4]
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00
}
@staticmethod
def parse_receive_packet(frame: bytes) -> dict:
"""解析 RX Packet (0x90) - 未來擴展用"""
# if len(frame) < 15 or frame[3] != 0x90:
# return None
# return {
# 'source_addr': frame[4:12],
# 'reserved': frame[12:14],
# 'options': frame[14],
# 'data': frame[15:-1]
# }
pass
logger.warning(f"Unknown XBee frame type: 0x{frame_type:02X}")
return None
# ---- 內部:編解碼 ----
@staticmethod
def encapsulate_data(data: bytes, dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF', frame_id = 0x01) -> bytes:
def _encapsulate(
data: bytes,
dest_addr64: bytes = b'\x00\x00\x00\x00\x00\x00\xFF\xFF',
frame_id: int = 0x01,
) -> bytes:
"""
將數據封裝為 XBee API 傳輸幀
使用 XBee API 格式封裝數據:
- 傳輸請求幀 (0x10)
payload 包成 XBee TX Request (0x10)
- 使用廣播地址
- 添加適當的頭部和校驗和
"""
frame_type = 0x10
frame_type = XBeeFrameProcessor.FRAME_TYPE_TX_REQUEST
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
@ -191,18 +187,24 @@ class XBeeFrameHandler:
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
@staticmethod
def decapsulate_data(data: bytes):
# 獲取數據長度 (不包括校驗和)
length = (data[1] << 8) | data[2]
def _decapsulate(frame: bytes) -> bytes:
"""從 RX Packet (0x90) 取出 payload"""
length = (frame[1] << 8) | frame[2]
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
return frame[rf_data_start:3 + length]
# ====================== Dongle Command Handler =====================
class ATCommandHandler:
"""AT 指令回應處理器"""
"""
XBee AT 指令回應處理器
職責
- 解析 AT Response frame (0x88)
- AT 指令分派到對應的 handler
"""
def __init__(self, serial_port: str):
self.serial_port = serial_port
self.handlers = {
@ -211,65 +213,93 @@ class ATCommandHandler:
b'SL': self._handle_serial_low,
# 可擴展其他 AT 指令
}
def handle_response(self, response: dict):
def handle_frame(self, frame: bytes) -> None:
"""接收一整個 AT Response frame內部完成 parse + dispatch"""
parsed = self._parse(frame)
if parsed is None:
return
self._dispatch(parsed)
@staticmethod
def _parse(frame: bytes) -> dict:
"""解析 AT Command Response (0x88);不符格式回傳 None"""
if len(frame) < 8:
return None
if frame[3] != 0x88:
return None
frame_id = frame[4]
at_command = frame[5:7]
status = frame[7]
data = frame[8:] if len(frame) > 8 else b''
return {
'frame_id': frame_id,
'command': at_command,
'status': status,
'data': data,
'is_ok': status == 0x00,
}
def _dispatch(self, response: dict) -> None:
"""根據 AT 指令類型分派處理"""
if not response or not response['is_ok']:
if response:
logger.warning(f"[{self.serial_port}] AT {response['command'].decode()} 失敗,狀態碼: {response['status']}")
if not response['is_ok']:
logger.warning(
f"[{self.serial_port}] AT {response['command'].decode()} "
f"失敗,狀態碼: {response['status']}"
)
return
command = response['command']
handler = self.handlers.get(command)
handler = self.handlers.get(response['command'])
if handler:
handler(response['data'])
else:
logger.debug(f"[{self.serial_port}] 未處理的 AT 指令: {command.decode()}")
logger.debug(
f"[{self.serial_port}] 未處理的 AT 指令: "
f"{response['command'].decode()}"
)
def _handle_rssi(self, data: bytes):
"""處理 DB (RSSI) 回應"""
# 未來可實現 RSSI 處理邏輯
pass
def _handle_serial_high(self, data: bytes):
"""處理 SH (Serial Number High)"""
pass
def _handle_serial_low(self, data: bytes):
"""處理 SL (Serial Number Low)"""
pass
# ====================== Serial Handler =====================
# ================ Serial UDP Socket Object ==============
class SerialHandler(asyncio.Protocol):
"""asyncio.Protocol 用於處理 Serial 收發"""
def __init__(self, udp_handler, serial_port_str, serial_mode: SerialMode):
self.udp_handler = udp_handler # UDP 的傳輸物件
self.serial_port_str = serial_port_str
self.serial_mode = serial_mode
self.transport = None # Serial 自己的傳輸物件
# 根據模式創建對應的 processor
self.processor = self._create_processor(serial_mode)
# AT 指令處理器(僅 XBee 模式使用)
if serial_mode == SerialMode.XBEEAPI2AT:
self.at_handler = ATCommandHandler(serial_port_str)
else:
self.at_handler = None
def _create_processor(self, serial_mode: SerialMode) -> FrameProcessor:
"""工廠方法:根據模式創建處理器"""
# 根據模式建立 processor需要 AT handler 時一併在工廠內建好注入)
self.processor = self._create_processor(serial_mode, serial_port_str)
@staticmethod
def _create_processor(serial_mode: SerialMode, serial_port_str: str) -> FrameProcessor:
"""工廠方法:根據模式建立對應的 processor必要時一併注入 AT handler"""
if serial_mode == SerialMode.STRAIGHT:
return RawFrameProcessor()
elif serial_mode == SerialMode.XBEEAPI2AT:
return XBeeFrameProcessor()
else:
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor()
if serial_mode == SerialMode.XBEEAPI2AT:
at_handler = ATCommandHandler(serial_port_str)
return XBeeFrameProcessor(at_handler=at_handler)
logger.warning(f"Unknown serial mode: {serial_mode}, using Raw")
return RawFrameProcessor()
def connection_made(self, transport):
"""連接建立時的回調"""
@ -291,8 +321,6 @@ class SerialHandler(asyncio.Protocol):
)
# ====================== UDP Handler =====================
class UDPHandler(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol 用於處理 UDP 收發"""
@ -600,7 +628,7 @@ if __name__ == '__main__':
SERIAL_PORT = '/dev/ttyUSB0' # 手動指定
SERIAL_BAUDRATE = 115200
UDP_REMOTE_PORT = 14571
UDP_REMOTE_PORT = 14561
sm.create_serial_link(SERIAL_PORT, SERIAL_BAUDRATE, UDP_REMOTE_PORT, SerialMode.XBEEAPI2AT)
# SERIAL_PORT = '/dev/ttyACM0' # 手動指定
@ -615,3 +643,19 @@ if __name__ == '__main__':
sm.remove_serial_link(1)
time.sleep(3)
sm.shutdown()
'''
================= 改版記錄 ============================
2026.4.20
1. XBeeFrameHandler 結構移除
2. XBeeFrameProcessor 新增 _encapsulate, _decapsulate 編碼解碼 xbee 封包的功能 (原來在 XBeeFrameHandler )
3. XBeeFrameProcessor 新增 _try_extract_frame 處理被可能截斷的 UART 封包
4. XBeeFrameProcessor 新增 _dispatch_frame 分配封包到 UDP 或者 Dongle Command Handler
5. ATCommandHandler 新增 _parse 去拆解 0x88 AT Command Response
6. ATCommandHandler 新增 _dispatch 把拆解的結果 分配到 _handle_XXX
7. ATCommandHandler 新增各項 _handle_XXX (未實作)
'''

@ -1,45 +0,0 @@
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
# import mavros_msgs.srv
class TalkerNode(Node):
def __init__(self):
start_time = time.time()
super().__init__('talker_node')
end_time = time.time()
print(f"Node initialization took {end_time - start_time:.2f} seconds")
self.publisher_ = self.create_publisher(String, 'hahatest/_1', 10)
self.timer = self.create_timer(1.0, self.timer_callback) # 每秒執行一次
self.get_logger().info('TalkerNode has been started.')
def timer_callback(self):
msg = String()
msg.data = 'Hello, ROS 2!'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = TalkerNode()
print("Before sleep")
time.sleep(5) # 等待 5 秒鐘
print("After sleep")
try:
start_time = time.time()
while time.time() - start_time < 10: # 持續 10 秒鐘
rclpy.spin_once(node)
time.sleep(1) # 每秒執行一次
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,188 @@
import time
from pymavlink import mavutil
# # 強制 source_system=255
# master = mavutil.mavlink_connection('udpout:localhost:14561', source_system=255)
# print("Starting Force-Ping Test...")
# while True:
# # 1. 發送 Heartbeat (證明連線活著)
# master.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_GCS,
# mavutil.mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0)
# # 2. 強制發送 PING (target 1, 1)
# # 我們加一個計數器,方便在 tcpdump 裡辨識
# ping_seq = int(time.time()) % 100
# master.mav.ping_send(int(time.time() * 1e6), ping_seq, 1, 1)
# print(f"Sent Heartbeat and PING (seq {ping_seq})")
# # 3. 非阻塞讀取,避免卡死
# msg = master.recv_match(blocking=False)
# if msg:
# print(f"--> Received something: {msg.get_type()}")
# time.sleep(1) # 每秒噴一次
# -------------------------------------------------------------------
# # 改成 udpin這會讓你的程式守在 14561 等模擬器送資料進來
# # 這樣你就能收到模擬器的 Heartbeat建立連線後再發 PING
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# print("Waiting for simulator heartbeat...")
# # 這一行很重要:先收到 Heartbeatpymavlink 才會設定好 target 地址
# master.wait_heartbeat()
# print(f"Heartbeat from system {master.target_system} component {master.target_component}")
# while True:
# # 既然收到了,現在發 PING 就會順著原路回去
# master.mav.ping_send(int(time.time() * 1e6), 1, master.target_system, master.target_component)
# print("PING sent!")
# msg = master.recv_match(type='PING', blocking=True, timeout=2)
# if msg:
# print(f"Got PING response: {msg}")
# time.sleep(1)
# -------------------------------------------------------------------
# import time
# from pymavlink import mavutil
# # 核心修改:因為 MAVProxy 正在往 14561 噴資料,我們必須「接住」它
# # 使用 udpin 會讓你的程式在 14561 監聽
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=254)
# print("正在等待 MAVProxy 轉發的 Heartbeat...")
# # 只要這行跑過,代表你跟 MAVProxy 握手成功了
# master.wait_heartbeat()
# print(f"成功連線到系統 {master.target_system}")
# while True:
# # 1. 務必發送 Heartbeat讓 MAVProxy 知道要把回應送回這個 Port
# master.mav.heartbeat_send(
# mavutil.mavlink.MAV_TYPE_GCS,
# mavutil.mavlink.MAV_AUTOPILOT_INVALID,
# 0, 0, 0
# )
# # 2. 修改 target_component 為 1 (飛控核心)
# # 注意:這裡刻意將 seq 設為動態,方便你在 watch PING 裡觀察
# p_seq = int(time.time()) % 255
# master.mav.ping_send(
# int(time.time() * 1e6),
# p_seq,
# 1,
# 1 # <--- 改成 1 試試看!
# )
# print(f"Sent PING seq {p_seq} to 1/1")
# # 3. 讀取所有訊息,看看有沒有 PING 回來
# # 有時候回應的 target_system 可能是 255 (你的 ID)
# msg = master.recv_match(type='PING', blocking=True, timeout=1.0)
# if msg:
# print(f"🎉 成功!收到回應: {msg}")
# time.sleep(1)
# -------------------------------------------------------------------
# import time
# from pymavlink import mavutil
# # 建議一樣用 udpin 監聽 MAVProxy 的輸出
# master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# def get_time_ns():
# return int(time.time() * 1e9) # 轉為奈秒
# print("開始測試 TIMESYNC 響應...")
# while True:
# now_ns = get_time_ns()
# # 發送 TIMESYNC 請求
# # tc1=0 代表這是請求ts1=當前時間
# master.mav.timesync_send(0, now_ns)
# # 等待回應
# msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0)
# if msg and msg.tc1 != 0:
# # 收到回應msg.tc1 應該等於我們剛才送出的 now_ns
# rtt_ms = (get_time_ns() - msg.tc1) / 1e6
# print(f"🕒 TIMESYNC RTT: {rtt_ms:.3f} ms")
# break # 測試一次就好
# else:
# print("等待 TIMESYNC 回應中...")
# time.sleep(1)
# -------------------------------------------------------------------
import time
import statistics
from pymavlink import mavutil
# 連接到 MAVProxy 轉發的端口
master = mavutil.mavlink_connection('udpin:localhost:14561', source_system=255)
# 測試設定
TARGET_SYS = 2 # 你想測試的載具 ID
SAMPLES = 20 # 取 20 次樣本來算平均和抖動
rtt_history = []
print(f"開始測試對 System {TARGET_SYS} 的通道品質...")
def get_time_ns():
return int(time.time() * 1e9)
try:
while len(rtt_history) < SAMPLES:
ts1_send = get_time_ns()
# 發送 TIMESYNC (V1/V2 格式相容)
# 針對特定系統發送
master.mav.timesync_send(0, ts1_send)
# 等待回應 (過濾目標系統的訊息)
msg = master.recv_match(type='TIMESYNC', blocking=True, timeout=1.0)
if msg and msg.get_srcSystem() == TARGET_SYS and msg.tc1 != 0:
ts1_recv = get_time_ns()
rtt_ns = ts1_recv - msg.tc1
rtt_ms = rtt_ns / 1e6
rtt_history.append(rtt_ms)
print(f"Sample {len(rtt_history)}: RTT = {rtt_ms:.3f} ms")
else:
print("Request timeout or mismatched ID...")
time.sleep(0.1) # 快速採樣
# 4. 計算統計數據
avg_latency = statistics.mean(rtt_history)
std_dev = statistics.stdev(rtt_history) # 這就是 Jitter 的一種表現
max_rtt = max(rtt_history)
min_rtt = min(rtt_history)
print("\n--- 物理通道品質分析報告 ---")
print(f"目標載具 : System {TARGET_SYS}")
print(f"平均延遲 (Avg) : {avg_latency:.3f} ms")
print(f"最小延遲 (Min) : {min_rtt:.3f} ms")
print(f"最大延遲 (Max) : {max_rtt:.3f} ms")
print(f"抖動幅度 (Jitter): {std_dev:.3f} ms (標準差)")
if std_dev > 10:
print("警告:網路抖動過高,可能影響控制穩定性!")
else:
print("通道品質:良好")
except KeyboardInterrupt:
print("測試終止。")

@ -0,0 +1,46 @@
from pymavlink.dialects.v20 import common as mavlink
class NullBuffer:
def write(self, data): pass
def seek(self, pos): pass
def tell(self): return 0
class CapturingBuffer:
def __init__(self):
self.last_data = b''
def write(self, data):
self.last_data = bytes(data)
def seek(self, pos): pass
def tell(self): return 0
# 1. 初始化(僅作為編碼器)
# file 參數可以是任何擁有 write() 方法的物件,這裡用 BytesIO 模擬
# 初始化方法一:使用 BytesIO
# import io
# out_buf = io.BytesIO()
# mav = mavlink.MAVLink(out_buf, srcSystem=1, srcComponent=191)
# 初始化方法二:使用自定義的 NullBuffer
mav = mavlink.MAVLink(NullBuffer(), srcSystem=1, srcComponent=191)
mav.seq = 254
# 2. 建立心跳包並取得二進制數據
# 連續產出「不同」的訊息物件
for i in range(3):
msg = mav.heartbeat_encode(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0)
data = msg.pack(mav)
mav.seq = (mav.seq + 1) & 0xFF
# MAVLink 2 的序列號在第 3 個 Byte (Index 2)
print(f"{i} 次發送, Seq: {data[4]}, Raw: {data.hex()}")
print("分隔線")
buf = CapturingBuffer()
mav_buf2 = mavlink.MAVLink(buf, srcSystem=1, srcComponent=191)
for i in range(3):
mav_buf2.heartbeat_send(mavlink.MAV_TYPE_GCS, 0, 0, 0, 0)
data = buf.last_data
print(f"{i} 次發送, Seq: {data[4]}, Raw: {data.hex()}")

File diff suppressed because it is too large Load Diff

@ -0,0 +1,71 @@
import asyncio
import serial_asyncio
SERIAL_PORT = '/dev/ttyUSB0' # 修改為你的 serial port
SERIAL_BAUDRATE = 115200 # 修改為你的 baudrate
UDP_REMOTE_IP = '192.168.1.100' # 修改為目標 IP
UDP_REMOTE_PORT = 5005 # 修改為目標 port
UDP_LOCAL_PORT = 5006 # 本地 UDP 監聽 port
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_transport):
self.udp_transport = udp_transport
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
# Serial 收到資料,轉發到 UDP
self.udp_transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def write_to_serial(self, data):
self.transport.write(data)
class UDPToSerial(asyncio.DatagramProtocol):
def __init__(self, serial_proto):
self.serial_proto = serial_proto
def datagram_received(self, data, addr):
# UDP 收到資料,轉發到 Serial
self.serial_proto.write_to_serial(data)
async def main():
loop = asyncio.get_running_loop()
# 定義協議工廠函數
def create_empty_protocol():
return asyncio.DatagramProtocol()
# 建立 UDP1 傳輸
udp_transport, _ = await loop.create_datagram_endpoint(
create_empty_protocol,
local_addr=('0.0.0.0', UDP_LOCAL_PORT)
)
# 建立 Serial 傳輸
serial_proto = SerialToUDP(udp_transport)
def get_serial_protocol():
return serial_proto
_, serial_transport = await serial_asyncio.create_serial_connection(
loop, get_serial_protocol, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
# 建立 UDP2 監聽
udp_proto = UDPToSerial(serial_proto)
def get_udp_protocol():
return udp_proto
udp_listen_transport, _ = await loop.create_datagram_endpoint(
get_udp_protocol,
local_addr=('0.0.0.0', UDP_LOCAL_PORT)
)
# 保持運行
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(main())

@ -1,27 +0,0 @@
#!/bin/bash
# 網站清單
DOMAINS=("google.com" "smarter.nchu.edu.tw")
echo "網站 SSL 憑證剩餘天數:"
echo "---------------------------"
for domain in "${DOMAINS[@]}"; do
end_date=$(echo | openssl s_client -servername "$domain" -connect "$domain:443" 2>/dev/null |
openssl x509 -noout -enddate | cut -d= -f2)
end_timestamp=$(date -d "$end_date" +%s)
now_timestamp=$(date +%s)
remaining_days=$(( (end_timestamp - now_timestamp) / 86400 ))
if [ $remaining_days -lt 0 ]; then
status="已過期 ❌"
elif [ $remaining_days -lt 15 ]; then
status="即將到期 ⚠️"
else
status="正常 ✅"
fi
printf "%-20s 到期日:%-25s 剩餘天數:%3d 天 %s\n" "$domain" "$end_date" "$remaining_days" "$status"
done

@ -0,0 +1,40 @@
import socket
import sys
import select
# 設定來源 IP 與 Port
SRC_IP = '127.0.0.1' # 監聽所有介面
SRC_PORT = 16661 # 請自行修改
# 建立 UDP 監聽 socket
src_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
src_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
src_sock.bind((SRC_IP, SRC_PORT))
print(f"Listening for UDP on {SRC_IP}:{SRC_PORT}")
# 設定目標 Unix socket 路徑
UNIX_SOCKET_PATH = '/tmp/unix_socket_mavlink.sock' # 請自行修改
# 建立 Unix socket 連線
unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
unix_sock.connect(UNIX_SOCKET_PATH)
except Exception as e:
print(f"Failed to connect to unix socket: {e}")
sys.exit(1)
while True:
# 使用 select 監聽兩個 socket
readable, _, _ = select.select([src_sock, unix_sock], [], [])
for sock in readable:
if sock is src_sock:
data, addr = src_sock.recvfrom(4096)
if data:
unix_sock.sendall(data)
# print(f"Received UDP data from {addr}: {data}") # debug
# break # debug
elif sock is unix_sock:
data = unix_sock.recv(4096)
if data:
# 回送到最近收到資料的 UDP client
src_sock.sendto(data, addr)
Loading…
Cancel
Save