You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/GUI/communication.py

1104 lines
44 KiB
Python

from rclpy.node import Node
from PyQt6.QtCore import QObject, pyqtSignal
import math
import re
import threading
from threading import Lock
import asyncio
import websockets
import json
import socket
import sys
import os
from pymavlink import mavutil
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
from std_msgs.msg import Float64, String
from mavros_msgs.msg import State, VfrHud
from mavros_msgs.srv import CommandBool, CommandTOL
# 確保 src 目錄在 Python 路徑中(用於 fc_network_apps 導入)
_src_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _src_path not in sys.path:
sys.path.insert(0, _src_path)
# 導入 fc_network_apps 的函數
try:
from fc_network_apps import change_mode, takeoff
except ImportError as e:
import traceback
print(f"⚠️ 警告: 無法導入 fc_network_apps")
print(f" 错误: {e}")
print(f" 这通常表示 ROS2 的 fc_interfaces 包未被编译或未正确安装")
print(f" 完整堆栈跟踪:")
traceback.print_exc()
change_mode = None
takeoff = None
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class UDPMavlinkReceiver(threading.Thread):
"""UDP MAVLink 接收器"""
def __init__(self, ip, port, signals, connection_name, monitor=None):
super().__init__(daemon=True)
self.ip = ip
self.port = port
self.signals = signals
self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0
self.running = False
self.sock = None
def run(self):
"""執行 UDP 接收循環"""
self.running = True
try:
print(f"UDP MAVLink receiver started on {self.ip}:{self.port}")
# 創建 MAVLink 連接
mav = mavutil.mavlink_connection(f'udpin:{self.ip}:{self.port}')
while self.running:
try:
msg = mav.recv_match(blocking=True, timeout=1.0)
if msg is None:
continue
self.process_mavlink_message(msg)
except socket.timeout:
continue
except Exception as e:
print(f"Error receiving MAVLink message: {e}")
except Exception as e:
print(f"UDP receiver error: {e}")
finally:
if self.sock:
self.sock.close()
def process_mavlink_message(self, msg):
"""處理 MAVLink 訊息"""
try:
msg_type = msg.get_type()
system_id = msg.get_srcSystem()
drone_id = f"s{self.socket_id}_{system_id}"
if msg_type == "HEARTBEAT":
# 先發送連接類型資訊
self.signals.update_signal.emit('connection_type', drone_id, {
'type': 'UDP'
})
mode = mavutil.mode_string_v10(msg)
armed = bool(msg.base_mode & 128)
self.signals.update_signal.emit('state', drone_id, {
'mode': mode,
'armed': armed
})
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.signals.update_signal.emit('battery', drone_id, {
'voltage': voltage
})
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.signals.update_signal.emit('gps', drone_id, {
'lat': latitude,
'lon': longitude
})
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.signals.update_signal.emit('local_pose', drone_id, {
'x': x,
'y': y,
'z': z
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': z
})
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.vx,
'vy': msg.vy,
'vz': msg.vz
})
elif msg_type == "ATTITUDE":
# 從 MAVLink 訊息中提取並轉為角度
pitch = math.degrees(msg.pitch)
roll = math.degrees(msg.roll)
yaw = math.degrees(msg.yaw)
self.signals.update_signal.emit('attitude', drone_id, {
'pitch': pitch,
'roll': roll,
'yaw': yaw,
'rates': (msg.rollspeed, msg.pitchspeed, msg.yawspeed)
})
elif msg_type == "VFR_HUD":
self.signals.update_signal.emit('hud', drone_id, {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.alt,
'climb': msg.climb
})
except Exception as e:
print(f"Error processing MAVLink message: {e}")
def stop(self):
"""停止接收器"""
self.running = False
class SerialMavlinkReceiver(threading.Thread):
"""串口 MAVLink 接收器"""
def __init__(self, port, baudrate, signals, connection_name, monitor=None):
super().__init__(daemon=True)
self.port = port
self.baudrate = baudrate
self.signals = signals
self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0
self.running = False
self.mav = None
def run(self):
"""執行串口接收循環"""
self.running = True
try:
print(f"Serial MAVLink receiver started on {self.port} at {self.baudrate} baud")
# 創建 MAVLink 串口連接
self.mav = mavutil.mavlink_connection(
self.port,
baud=self.baudrate,
source_system=255
)
print(f"Waiting for heartbeat from {self.port}...")
self.mav.wait_heartbeat()
print(f"Heartbeat received from system {self.mav.target_system}, component {self.mav.target_component}")
while self.running:
try:
msg = self.mav.recv_match(blocking=True, timeout=1.0)
if msg is None:
continue
self.process_mavlink_message(msg)
except Exception as e:
if self.running:
print(f"Error receiving MAVLink message from serial: {e}")
except Exception as e:
print(f"Serial receiver error: {e}")
finally:
if self.mav:
try:
self.mav.close()
except:
pass
def process_mavlink_message(self, msg):
"""處理 MAVLink 訊息"""
try:
msg_type = msg.get_type()
system_id = msg.get_srcSystem()
drone_id = f"s{self.socket_id}_{system_id}"
if msg_type == "HEARTBEAT":
# 先發送連接類型資訊
self.signals.update_signal.emit('connection_type', drone_id, {
'type': 'Serial'
})
mode = mavutil.mode_string_v10(msg)
armed = bool(msg.base_mode & 128)
self.signals.update_signal.emit('state', drone_id, {
'mode': mode,
'armed': armed
})
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.signals.update_signal.emit('battery', drone_id, {
'voltage': voltage
})
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.signals.update_signal.emit('gps', drone_id, {
'lat': latitude,
'lon': longitude
})
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.signals.update_signal.emit('local_pose', drone_id, {
'x': x,
'y': y,
'z': z
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': z
})
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.vx,
'vy': msg.vy,
'vz': msg.vz
})
elif msg_type == "ATTITUDE":
# 從 MAVLink 訊息中提取並轉為角度
pitch = math.degrees(msg.pitch)
roll = math.degrees(msg.roll)
yaw = math.degrees(msg.yaw)
self.signals.update_signal.emit('attitude', drone_id, {
'pitch': pitch,
'roll': roll,
'yaw': yaw,
'rates': (msg.rollspeed, msg.pitchspeed, msg.yawspeed)
})
elif msg_type == "VFR_HUD":
self.signals.update_signal.emit('hud', drone_id, {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.alt,
'climb': msg.climb
})
except Exception as e:
print(f"Error processing MAVLink message from serial: {e}")
def stop(self):
"""停止接收器"""
self.running = False
class WebSocketMavlinkReceiver(threading.Thread):
"""WebSocket MAVLink 接收器"""
def __init__(self, url, signals, connection_name, monitor=None):
super().__init__(daemon=True)
self.url = url
self.signals = signals
self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用 self.socket_id = monitor.get_next_socket_id() if monitor else 0 # 一次性分配 socket_id self.running = False
self.max_retries = 5
self.base_delay = 1.0
def run(self):
"""執行 WebSocket 接收循環"""
self.running = True
asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.get_event_loop().run_until_complete(self.ws_client_loop())
async def ws_client_loop(self):
"""WebSocket 連接的主循環"""
retry_count = 0
print(f"Starting WebSocket client for {self.connection_name} at {self.url}")
while self.running and retry_count < self.max_retries:
try:
async with websockets.connect(self.url) as websocket:
print(f"WebSocket {self.connection_name} connected to {self.url}")
retry_count = 0 # 重置重試計數
async for message in websocket:
if not self.running:
break
try:
data = json.loads(message)
data['_connection_source'] = self.connection_name
self.process_websocket_message(data)
except json.JSONDecodeError as e:
print(f"WebSocket {self.connection_name} JSON decode error: {e}")
except Exception as e:
print(f"WebSocket {self.connection_name} message processing error: {e}")
except websockets.exceptions.ConnectionClosedError:
print(f"WebSocket {self.connection_name} connection closed")
if self.running:
retry_count += 1
if retry_count < self.max_retries:
delay = self.base_delay * (2 ** min(retry_count, 4))
print(f"Reconnecting in {delay}s...")
await asyncio.sleep(delay)
else:
break
except Exception as e:
retry_count += 1
if retry_count < self.max_retries and self.running:
delay = self.base_delay * (2 ** min(retry_count, 4))
print(f"WebSocket {self.connection_name} connection error: {e}, retrying in {delay}s (attempt {retry_count}/{self.max_retries})")
await asyncio.sleep(delay)
else:
break
print(f"WebSocket client {self.connection_name} stopped")
def process_websocket_message(self, data):
"""處理 WebSocket 訊息"""
try:
system_id = data.get('system_id')
if not system_id:
return
drone_id = f"s{self.socket_id}_{system_id}"
# 模式
if 'mode' in data:
# 先發送連接類型資訊
self.signals.update_signal.emit('connection_type', drone_id, {
'type': 'WS'
})
self.signals.update_signal.emit('state', drone_id, {
'mode': data['mode'],
})
# 電池
if 'battery' in data:
self.signals.update_signal.emit('battery', drone_id, {
'percentage': data['battery']
})
# 位置
if 'position' in data:
pos = data['position']
self.signals.update_signal.emit('gps', drone_id, {
'lat': pos.get('lat', 0),
'lon': pos.get('lon', 0)
})
# Local position - 設定 x, y 為 0.0
self.signals.update_signal.emit('local_pose', drone_id, {
'x': 0.0,
'y': 0.0,
'z': 0.0
})
# Altitude - 設定為 0.0
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': 0.0
})
# 航向
if 'heading' in data:
self.signals.update_signal.emit('hud', drone_id, {
'heading': data['heading'],
'groundspeed': 0.0
})
except Exception as e:
print(f"WebSocket message processing error: {e}")
def stop(self):
"""停止接收器"""
self.running = False
class DroneMonitor(Node):
# Subscribe to drone ROS2 topics
def __init__(self):
super().__init__('drone_monitor')
self.signals = DroneSignals()
self.drone_topics = {}
self.lock = Lock()
self.arm_clients = {}
self.takeoff_clients = {}
self.setpoint_pubs = {}
self.selected_drones = set()
self.latest_data = {}
# 定義需要過濾的模式
self.filtered_modes = ['Mode(0x000000c0)']
# WebSocket 接收器列表
self.ws_receivers = []
# 串口接收器列表
# ================================================================================
# 【新增】儲存 GPS 資料的字典
# ================================================================================
self.drone_gps = {} # {drone_id: {'lat': ..., 'lon': ..., 'alt': ...}}
# ================================================================================
# ================================================================================
# 【新增】Socket ID 重新分配機制 (從 0 開始)
# ================================================================================
self.socket_id_mapping = {} # {原始socket_id: 重新分配的socket_id}
self.socket_id_counter = 0 # 當前分配到的最大socket_id
self.socket_id_lock = Lock() # 線程安全鎖
# ================================================================================
# ================================================================================
# 【新增】儲存 sys_id 到 actual_drone_id 的映射 (從 summary 獲取)
# ================================================================================
self.sys_to_actual_id = {} # {sys_id: actual_drone_id} e.g. {'sys11': 's0_11'}
self.sys_to_socket_id = {} # {sys_id: assigned_socket_id} e.g. {'sys11': 0}
# ================================================================================
self.serial_receivers = []
# 主题检测定时器
self.create_timer(1.0, self.scan_topics)
def get_next_socket_id(self):
"""获取下一个可用的 socket_id从 0 开始连续分配)"""
with self.socket_id_lock:
current_id = self.socket_id_counter
self.socket_id_counter += 1
return current_id
def get_or_assign_socket_id(self, original_socket_id):
"""根據原始 socket_id 分配或獲取對應的 socket_id從 0 開始連續分配)
同一個原始 socket_id 會得到同一個分配的 ID
"""
original_socket_id = str(original_socket_id)
with self.socket_id_lock:
if original_socket_id not in self.socket_id_mapping:
# 分配新的 socket_id
self.socket_id_mapping[original_socket_id] = self.socket_id_counter
self.socket_id_counter += 1
return self.socket_id_mapping[original_socket_id]
def scan_topics(self):
topics = self.get_topic_names_and_types()
drone_pattern = re.compile(r'/fc_network/vehicle/(sys\d+)/(\w+)')
found_drones = set()
for topic_name, _ in topics:
if match := drone_pattern.match(topic_name):
sys_id, topic_type = match.groups()
found_drones.add(sys_id)
with self.lock:
self.drone_topics.setdefault(sys_id, set()).add(topic_type)
for sys_id in found_drones:
# 为每个 sys_id 分配 socket_id如果还没有分配
# 注意:如果后续 summary 提供了 socket_id会使用 summary 的映射覆盖
if sys_id not in self.sys_to_socket_id:
# 暂时所有 ROS2 topic 共享同一个 socket_id = 0
self.sys_to_socket_id[sys_id] = 0
if not hasattr(self, f'drone_{sys_id}_subs'):
self.setup_drone(sys_id)
def setup_drone(self, sys_id):
# sys_id 格式: sys11, sys12, ...
base_topic = f'/fc_network/vehicle/{sys_id}'
# Add service clients (保留但可能無法使用,因為新 topic 可能沒有這些服務)
self.arm_clients[sys_id] = self.create_client(
CommandBool,
f'{base_topic}/cmd/arming'
)
self.takeoff_clients[sys_id] = self.create_client(
CommandTOL,
f'{base_topic}/cmd/takeoff'
)
# Add setpoint publisher
self.setpoint_pubs[sys_id] = self.create_publisher(
Point,
f'{base_topic}/setpoint_position/local',
10
)
subs = {
'battery': self.create_subscription(
BatteryState,
f'{base_topic}/battery',
lambda msg, sid=sys_id: self.battery_callback(sid, msg),
10
),
'position': self.create_subscription(
NavSatFix,
f'{base_topic}/position',
lambda msg, sid=sys_id: self.gps_callback(sid, msg),
10
),
'summary': self.create_subscription(
String,
f'{base_topic}/summary',
lambda msg, sid=sys_id: self.summary_callback(sid, msg),
10
),
'vfr_hud': self.create_subscription(
VfrHud,
f'{base_topic}/vfr_hud',
lambda msg, sid=sys_id: self.hud_callback(sid, msg),
10
)
}
setattr(self, f'drone_{sys_id}_subs', subs)
# ================================================================================
# 【新增】模式名稱到 custom_mode 值的映射(基於 Copter 模式)
# ================================================================================
MODE_MAPPING = {
"STABILIZE": 0,
"ACRO": 1,
"ALT_HOLD": 2,
"AUTO": 3,
"GUIDED": 4,
"LOITER": 5,
"RTL": 6,
"CIRCLE": 7,
"POSITION": 8,
"LAND": 9,
"OF_LOITER": 10,
"DRIFT": 11,
"SPORT": 13,
"FLIP": 14,
"AUTOTUNE": 15,
"POSHOLD": 16,
"BRAKE": 17,
"THROW": 18,
"AVOID_ADSB": 19,
"GUIDED_NOGPS": 20,
"SMART_RTL": 21,
}
# ================================================================================
async def set_mode(self, drone_id, mode_name):
"""
使用 fc_network_apps change_mode 函數切換無人機飛行模式
參數:
drone_id: 無人機ID (格式: "s{socket_id}_{sysid}")
mode_name: 模式名稱 (例如: "GUIDED", "AUTO", "LOITER")
返回:
bool: 模式切換是否成功
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
print(f"\n🔵 [SET_MODE] set_mode() 異步函數被調用 (drone_id={drone_id}, mode={mode_name})", flush=True)
print(f" 事件循环: {asyncio.get_event_loop()}", flush=True)
print(f" 当前任务: {asyncio.current_task()}\n", flush=True)
# 解析 drone_id 以提取 sysid
# drone_id 格式: "s{socket_id}_{sysid}" (例如: "s0_1", "s0_11")
try:
parts = drone_id.split('_')
if len(parts) < 2:
self.get_logger().error(f"Invalid drone_id format: {drone_id}")
print(f"❌ [SET_MODE] 無效的 drone_id 格式: {drone_id}")
print(f" 返回: False")
return False
sysid = int(parts[-1])
print(f"✓ [SET_MODE] 解析 drone_id: {drone_id} → sysid={sysid}")
except (ValueError, IndexError) as e:
self.get_logger().error(f"Failed to parse drone_id {drone_id}: {e}")
print(f"❌ [SET_MODE] 無法解析 drone_id {drone_id}: {e}")
print(f" 返回: False")
return False
# 獲取模式對應的 custom_mode 值
custom_mode = self.MODE_MAPPING.get(mode_name)
if custom_mode is None:
self.get_logger().error(f"Unknown mode: {mode_name}. Available modes: {list(self.MODE_MAPPING.keys())}")
print(f"❌ [SET_MODE] 未知模式: {mode_name}")
print(f" 支持的模式: {list(self.MODE_MAPPING.keys())}")
return False
print(f"✓ [SET_MODE] 模式對應: {mode_name} → custom_mode={custom_mode}")
# 檢查 fc_network_apps 的 change_mode 函數是否可用
if change_mode is None:
self.get_logger().error("fc_network_apps is not available. Cannot change mode.")
print(f"❌ [SET_MODE] fc_network_apps 不可用")
return False
# 使用 fc_network_apps 的 change_mode 函數
try:
msg = f"ROS2 服務呼叫: target_sysid={sysid}, custom_mode={custom_mode}, base_mode=1.0"
self.get_logger().info(f"Changing mode for drone {drone_id} to {mode_name} (custom_mode={custom_mode})")
print(f"\n📢 [SET_MODE] 開始切換模式")
print(f" Drone ID: {drone_id}")
print(f" 模式: {mode_name}")
print(f" {msg}")
# 在線程池中運行同步的 change_mode 函數
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_change_mode():
print(f"\n 🔄 [_call_change_mode] 在線程池中調用 change_mode...")
print(f" ├─ 線程開始時間: {__import__('time').time()}")
print(f" ├─ 目標: sysid={sysid}, mode={custom_mode}\n")
result = change_mode(
target_sysid=sysid,
custom_mode=float(custom_mode),
target_compid=0,
base_mode=1.0,
confirmation=0,
timeout_sec=2.0,
)
print(f"\n ├─ change_mode() 返回結果對象: {result}")
print(f" └─ 線程任務完成")
return result
print(f" 📢 [SET_MODE] 提交 change_mode 到線程池...")
result = await loop.run_in_executor(executor, _call_change_mode)
print(f"\n ✓ [SET_MODE] 從線程池接收到返回值")
print(f"\n📥 [SET_MODE] 從 change_mode() 接收服務響應:")
print(f" ├─ result 对象类型: {type(result)}")
print(f" ├─ result.success: {result.success} (类型: {type(result.success)})")
print(f" ├─ result.message: '{result.message}' (类型: {type(result.message)})")
print(f" └─ result.ack_result: {result.ack_result} (类型: {type(result.ack_result)})")
print(f"\n【返回值確認】")
print(f" success == True: {result.success == True}")
print(f" success is True: {result.success is True}")
print(f" bool(success): {bool(result.success)}")
print(f"\n【FC_NETWORK SERVICE 回传值确认】")
print(f" ├─ result.success: {result.success}")
print(f" ├─ result.message: '{result.message}'")
print(f" └─ result.ack_result: {result.ack_result}")
if result.success:
self.get_logger().info(f"Mode change successful for {drone_id}: {result.message}")
print(f"\n✅ [SET_MODE] 模式切換成功!")
print(f" ├─ fc_network 確認: success=True")
print(f" ├─ 訊息: {result.message}")
print(f" ├─ ACK代碼: {result.ack_result}")
print(f" └─ 返回到 GUI: True")
return True
else:
self.get_logger().warning(f"Mode change failed for {drone_id}: {result.message} (ack={result.ack_result})")
print(f"\n❌ [SET_MODE] 模式切換失敗!")
print(f" ├─ fc_network 確認: success=False")
print(f" ├─ 原因: {result.message}")
print(f" ├─ ACK代碼: {result.ack_result}")
print(f" └─ 返回到 GUI: False")
return False
except Exception as e:
self.get_logger().error(f"Exception during mode change for {drone_id}: {e}")
print(f"\n❌ [SET_MODE] 例外錯誤: {e}")
import traceback
traceback.print_exc()
print(f" 返回: False (异常)")
return False
async def arm_drone(self, drone_id, arm):
"""
使用 fc_network_apps arm_disarm 函數上鎖/解鎖無人機
參數:
drone_id: 無人機ID (格式: "s{socket_id}_{sysid}")
arm: True 為上鎖, False 為解鎖
返回:
bool: 操作是否成功
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
action_name = "上鎖" if arm else "解鎖"
print(f"\n🔵 [ARM_DISARM] arm_drone() 異步函數被調用 (drone_id={drone_id}, arm={arm}, 動作={action_name})", flush=True)
# 解析 drone_id 以提取 sysid
try:
parts = drone_id.split('_')
if len(parts) < 2:
self.get_logger().error(f"Invalid drone_id format: {drone_id}")
print(f"❌ [ARM_DISARM] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
print(f"✓ [ARM_DISARM] 解析 drone_id: {drone_id} → sysid={sysid}")
except (ValueError, IndexError) as e:
self.get_logger().error(f"Failed to parse drone_id {drone_id}: {e}")
print(f"❌ [ARM_DISARM] 無法解析 drone_id {drone_id}: {e}")
return False
try:
msg = f"ROS2 服務呼叫: target_sysid={sysid}, arm={arm}"
self.get_logger().info(f"Changing arm state for drone {drone_id} to {action_name}")
print(f"\n📢 [ARM_DISARM] 開始{action_name}無人機")
print(f" Drone ID: {drone_id}")
print(f" 動作: {action_name}")
print(f" {msg}")
# 在線程池中直接調用 ROS2 服務(避免 arm_disarm() 導致的初始化衝突)
from fc_interfaces.srv import MavCommandLong
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_ros2_arm_service():
"""直接調用 ROS2 服務"""
import time
print(f"\n 🔄 [_call_ros2_arm_service] 在線程池中調用 ROS2 服務...")
print(f" ├─ 時間: {time.time()}")
print(f" ├─ 目標: sysid={sysid}, arm={arm}")
print(f" └─ 直接調用ROS2服務避免rclpy.init()衝突)\n")
try:
# 建立 ROS2 客戶端(使用現有 context
client = self.create_client(MavCommandLong, "/fc_network/vehicle/send_command_long")
# 等待服務
if not client.wait_for_service(timeout_sec=2.0):
print(f" ❌ 服務不可用")
return {'success': False, 'message': 'Service not available', 'ack_result': -1}
print(f" ✓ 服務已連接")
# 準備請求
req = MavCommandLong.Request()
req.target_sysid = sysid
req.target_compid = 0
req.command = 400 # MAV_CMD_COMPONENT_ARM_DISARM
req.confirmation = 0
req.param1 = 1.0 if arm else 0.0
req.param2 = 0.0
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = 2.0
# 調用服務
future = client.call_async(req)
# 簡單等待
timeout = time.time() + 3.0
while not future.done() and time.time() < timeout:
time.sleep(0.01)
if future.done() and future.result():
response = future.result()
return {
'success': response.success,
'message': response.message,
'ack_result': response.ack_result,
}
else:
return {'success': False, 'message': 'Service call timeout', 'ack_result': -1}
except Exception as e:
print(f" ❌ 異常: {e}")
return {'success': False, 'message': str(e), 'ack_result': -1}
print(f" 📢 [ARM_DISARM] 提交 ROS2 服務呼叫到線程池...")
result_dict = await loop.run_in_executor(executor, _call_ros2_arm_service)
if result_dict['success']:
self.get_logger().info(f"Arm state change successful for {drone_id}")
print(f"\n✅ [ARM_DISARM] 無人機{action_name}成功!")
return True
else:
self.get_logger().warning(f"Arm state change failed for {drone_id}")
print(f"\n❌ [ARM_DISARM] 無人機{action_name}失敗!")
return False
except Exception as e:
self.get_logger().error(f"Exception during arm state change for {drone_id}: {e}")
print(f"\n❌ [ARM_DISARM] 例外錯誤: {e}")
return False
async def takeoff_drone(self, drone_id, altitude):
"""
使用 fc_network_apps takeoff 函數執行無人機起飛
參數:
drone_id: 無人機ID (格式: "s{socket_id}_{sysid}")
altitude: 目標高度 ()
返回:
bool: 起飛是否成功
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
print(f"\n🔵 [TAKEOFF] takeoff_drone() 異步函數被調用 (drone_id={drone_id}, altitude={altitude})", flush=True)
# 解析 drone_id 以提取 sysid
try:
parts = drone_id.split('_')
if len(parts) < 2:
self.get_logger().error(f"Invalid drone_id format: {drone_id}")
print(f"❌ [TAKEOFF] 無效的 drone_id 格式: {drone_id}")
return False
sysid = int(parts[-1])
print(f"✓ [TAKEOFF] 解析 drone_id: {drone_id} → sysid={sysid}")
except (ValueError, IndexError) as e:
self.get_logger().error(f"Failed to parse drone_id {drone_id}: {e}")
print(f"❌ [TAKEOFF] 無法解析 drone_id {drone_id}: {e}")
return False
# 檢查 fc_network_apps 的 takeoff 函數是否可用
if takeoff is None:
self.get_logger().error("fc_network_apps takeoff is not available. Cannot takeoff drone.")
print(f"❌ [TAKEOFF] fc_network_apps takeoff 不可用")
return False
try:
print(f"\n📢 [TAKEOFF] 開始起飛無人機")
print(f" Drone ID: {drone_id}")
print(f" ROS2 服務呼叫: target_sysid={sysid}, altitude_m={altitude}")
# 在線程池中運行同步的 takeoff 函數
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
def _call_takeoff():
print(f"\n 🔄 [_call_takeoff] 在線程池中調用 takeoff(altitude={altitude})...")
result = takeoff(
target_sysid=sysid,
altitude_m=float(altitude),
target_compid=0,
min_pitch_deg=0.0,
yaw_deg=0.0,
timeout_sec=2.0,
)
print(f"\n └─ takeoff() 返回結果")
return result
print(f" 📢 [TAKEOFF] 提交 takeoff 到線程池...")
result = await loop.run_in_executor(executor, _call_takeoff)
print(f"\n📥 [TAKEOFF] 從 takeoff() 接收服務響應:")
print(f" ├─ result.success: {result.success}")
print(f" ├─ result.message: '{result.message}'")
print(f" └─ result.ack_result: {result.ack_result}")
if result.success:
self.get_logger().info(f"Drone {drone_id} takeoff successfully: {result.message}")
print(f"\n✅ [TAKEOFF] 無人機起飛成功!")
print(f" ├─ fc_network 確認: success=True")
print(f" ├─ 訊息: {result.message}")
print(f" └─ ACK代碼: {result.ack_result}")
return True
else:
self.get_logger().warning(f"Failed to takeoff drone {drone_id}: {result.message} (ack={result.ack_result})")
print(f"\n❌ [TAKEOFF] 無人機起飛失敗!")
print(f" ├─ fc_network 確認: success=False")
print(f" ├─ 原因: {result.message}")
print(f" └─ ACK代碼: {result.ack_result}")
return False
except Exception as e:
self.get_logger().error(f"Exception during takeoff for {drone_id}: {e}")
print(f"\n❌ [TAKEOFF] 例外錯誤: {e}")
import traceback
traceback.print_exc()
return False
def send_setpoint(self, drone_id, x, y, z):
"""Send setpoint position command"""
if drone_id not in self.setpoint_pubs:
return False
msg = Point()
msg.x = float(x)
msg.y = float(y)
msg.z = float(z)
self.setpoint_pubs[drone_id].publish(msg)
return True
def quaternion_to_euler(self, q):
sinr_cosp = 2 * (q.w * q.x + q.y * q.z)
cosr_cosp = 1 - 2 * (q.x**2 + q.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (q.w * q.y - q.z * q.x)
pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp)
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y**2 + q.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# callbacks
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.latest_data[(drone_id, 'attitude')] = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
}
def battery_callback(self, sys_id, msg):
# 使用映射獲取實際的 drone_id
actual_drone_id = self.sys_to_actual_id.get(sys_id, None)
# 如果還沒有從 summary 獲取到映射,則不處理
if actual_drone_id is None:
return
self.latest_data[(actual_drone_id, 'battery')] = {
'voltage': msg.voltage
}
def state_callback(self, drone_id, msg):
mode = msg.mode
if mode in self.filtered_modes:
return
self.latest_data[(drone_id, 'state')] = {
'mode': msg.mode,
'armed': msg.armed
}
def summary_callback(self, sys_id, msg):
"""處理 summary topic (JSON 格式)"""
try:
data = json.loads(msg.data)
mode = data.get('mode_name', 'UNKNOWN')
if mode in self.filtered_modes:
return
# 從 summary 獲取原始 socket_id並映射到分配的 socket_id
original_socket_id = data.get('socket_id')
if original_socket_id is not None:
# 使用原始 socket_id 獲取或分配統一的 socket_id
assigned_socket_id = self.get_or_assign_socket_id(original_socket_id)
else:
# 如果沒有 socket_id使用 sys_to_socket_id 映射
assigned_socket_id = self.sys_to_socket_id.get(sys_id, 0)
sysid = data.get('sysid')
if sysid is not None:
actual_drone_id = f's{assigned_socket_id}_{sysid}'
# ================================================================================
# 【關鍵】保存 sys_id 到 actual_drone_id 的映射
# ================================================================================
self.sys_to_actual_id[sys_id] = actual_drone_id
# ================================================================================
else:
# 如果沒有 sysid使用 sys_id 中的數字
sys_num = sys_id.replace('sys', '')
actual_drone_id = f's{assigned_socket_id}_{sys_num}'
self.sys_to_actual_id[sys_id] = actual_drone_id
# 先發送連接類型資訊
self.signals.update_signal.emit('connection_type', actual_drone_id, {
'type': 'ROS2'
})
self.latest_data[(actual_drone_id, 'state')] = {
'mode': mode,
'armed': data.get('armed', False),
'socket_id': original_socket_id,
'sysid': sysid,
'vehicle_type': data.get('vehicle_type'),
'autopilot': data.get('autopilot'),
'gps_fix': data.get('gps_fix'),
'gps_fix_type': data.get('gps_fix'),
'connected': data.get('connected')
}
except json.JSONDecodeError as e:
print(f"Error parsing summary JSON for {sys_id}: {e}")
except Exception as e:
print(f"Error in summary_callback for {sys_id}: {e}")
def gps_callback(self, sys_id, msg):
# 使用映射獲取實際的 drone_id
actual_drone_id = self.sys_to_actual_id.get(sys_id, None)
# 如果還沒有從 summary 獲取到映射,則不處理
if actual_drone_id is None:
return
self.latest_data[(actual_drone_id, 'gps')] = {
'lat': msg.latitude,
'lon': msg.longitude,
'alt': msg.altitude
}
# ================================================================================
# 【新增】儲存 GPS 資料到 drone_gps 字典
# ================================================================================
self.drone_gps[actual_drone_id] = {
'lat': msg.latitude,
'lon': msg.longitude,
'alt': msg.altitude
}
# ================================================================================
def local_vel_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'velocity')] = {
'vx': msg.x,
'vy': msg.y,
'vz': msg.z
}
def altitude_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'altitude')] = {
'altitude': msg.data
}
def local_pose_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'local_pose')] = {
'x': msg.x,
'y': msg.y,
'z': msg.z
}
def hud_callback(self, sys_id, msg):
# 使用映射獲取實際的 drone_id
actual_drone_id = self.sys_to_actual_id.get(sys_id, None)
# 如果還沒有從 summary 獲取到映射,則不處理
if actual_drone_id is None:
return
self.latest_data[(actual_drone_id, 'hud')] = {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.altitude,
'climb': msg.climb
}
def loss_rate_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'loss_rate')] = {
'loss_rate': msg.data
}
def ping_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'ping')] = {
'ping': msg.data
}
def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=57600):
"""啟動串口 MAVLink 連接"""
connection_name = f"Serial_{port.replace('/', '_')}"
receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name, self)
receiver.start()
self.serial_receivers.append(receiver)
print(f"Started serial connection on {port} at {baudrate} baud")
return receiver