You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/fc_network_apps/longCommand.py

382 lines
12 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""ROS2 client for fc_network MavCommandLong service (COMMAND_LONG 系列指令)."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import asyncio
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_DO_SET_MODE = 176
COMMAND_NAV_LAND = 21
COMMAND_NAV_TAKEOFF = 22
COMMAND_COMPONENT_ARM_DISARM = 400
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class CommandLongResult:
success: bool
message: str
ack_result: int
class CommandLongClient(Node):
"""ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。"""
def __init__(self, service_name: str = DEFAULT_SERVICE_NAME, node_name: str = None) -> None:
if not rclpy.ok():
rclpy.init(args=None)
# 使用提供的 node_name或使用帶時間戳的預設名稱以避免名稱衝突
if node_name is None:
import time
node_name = f"fc_command_long_client_{int(time.time() * 1000) % 100000}"
super().__init__(node_name)
self._client = self.create_client(MavCommandLong, service_name)
def wait_for_service(self, timeout_sec: float = 3.0) -> bool:
return self._client.wait_for_service(timeout_sec=timeout_sec)
def _send_command_long(
self,
*,
target_sysid: int,
target_compid: int,
command: int,
confirmation: int,
param1: float,
param2: float,
param3: float,
param4: float,
param5: float,
param6: float,
param7: float,
timeout_sec: float,
) -> CommandLongResult:
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = command
req.confirmation = confirmation
req.param1 = param1
req.param2 = param2
req.param3 = param3
req.param4 = param4
req.param5 = param5
req.param6 = param6
req.param7 = param7
req.timeout_sec = float(timeout_sec)
future = self._client.call_async(req)
rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return CommandLongResult(
success=False,
message="Service timeout.",
ack_result=-1,
)
response = future.result()
return CommandLongResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
def change_mode(
self,
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
return self._send_command_long(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_DO_SET_MODE,
confirmation=confirmation,
param1=float(base_mode),
param2=float(custom_mode),
param3=0.0,
param4=0.0,
param5=0.0,
param6=0.0,
param7=0.0,
timeout_sec=timeout_sec,
)
def takeoff(
self,
*,
target_sysid: int,
altitude_m: float,
target_compid: int = 0,
min_pitch_deg: float = 0.0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
return self._send_command_long(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_NAV_TAKEOFF,
confirmation=0,
param1=float(min_pitch_deg),
param2=0.0,
param3=0.0,
param4=float(yaw_deg),
param5=float(latitude) if latitude is not None else 0.0,
param6=float(longitude) if longitude is not None else 0.0,
param7=float(altitude_m),
timeout_sec=timeout_sec,
)
def land(
self,
*,
target_sysid: int,
target_compid: int = 0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude_m: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
return self._send_command_long(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_NAV_LAND,
confirmation=0,
param1=0.0,
param2=0.0,
param3=0.0,
param4=float(yaw_deg),
param5=float(latitude) if latitude is not None else 0.0,
param6=float(longitude) if longitude is not None else 0.0,
param7=float(altitude_m),
timeout_sec=timeout_sec,
)
def arm_disarm(
self,
*,
target_sysid: int,
arm: bool,
target_compid: int = 0,
confirmation: int = 0,
param2: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
return self._send_command_long(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_COMPONENT_ARM_DISARM,
confirmation=confirmation,
param1=1.0 if arm else 0.0,
param2=float(param2),
param3=0.0,
param4=0.0,
param5=0.0,
param6=0.0,
param7=0.0,
timeout_sec=timeout_sec,
)
# ============================================================================
# 【新增】非阻塞 async 包裝方法(用於 GUI 的非阻塞調用)
# 這些方法在 ThreadPoolExecutor 中運行同步版本,以避免阻塞事件循環
# ============================================================================
# ============================================================================
# 【重新實現】正確的非阻塞 async 方法(不使用 ThreadPoolExecutor
# 使用 asyncio 的 polling 機制,避免在線程中調用 spin_until_future_complete
# ============================================================================
async def _send_command_long_async(
self,
*,
target_sysid: int,
target_compid: int,
command: int,
confirmation: int,
param1: float,
param2: float,
param3: float,
param4: float,
param5: float,
param6: float,
param7: float,
timeout_sec: float,
) -> CommandLongResult:
"""非阻塞的 async 版本 - 使用 asyncio polling 而非 spin"""
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = command
req.confirmation = confirmation
req.param1 = param1
req.param2 = param2
req.param3 = param3
req.param4 = param4
req.param5 = param5
req.param6 = param6
req.param7 = param7
req.timeout_sec = float(timeout_sec)
# 發送異步調用
future = self._client.call_async(req)
# 使用 asyncio.sleep 進行 polling而非 spin_until_future_complete
# 使用 10ms 輪詢間隔以匹配 GUI 執行器的 spin_once() 頻率
timeout = timeout_sec + 1.0
elapsed = 0.0
poll_interval = 0.01 # 10ms - 與 GUI timer 頻率一致
while elapsed < timeout:
if future.done():
try:
response = future.result()
if response is None:
return CommandLongResult(
success=False,
message="Service returned None.",
ack_result=-1,
)
return CommandLongResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
except Exception as e:
return CommandLongResult(
success=False,
message=f"Error getting result: {e}",
ack_result=-1,
)
# 讓出控制權給事件循環,允許 GUI executor 執行
await asyncio.sleep(poll_interval)
elapsed += poll_interval
return CommandLongResult(
success=False,
message=f"Service timeout after {timeout}s.",
ack_result=-1,
)
async def change_mode_async(
self,
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
"""非阻塞 async 版本的 change_mode"""
return await self._send_command_long_async(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_DO_SET_MODE,
confirmation=confirmation,
param1=float(base_mode),
param2=float(custom_mode),
param3=0.0,
param4=0.0,
param5=0.0,
param6=0.0,
param7=0.0,
timeout_sec=timeout_sec,
)
async def arm_disarm_async(
self,
*,
target_sysid: int,
arm: bool,
target_compid: int = 0,
confirmation: int = 0,
param2: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
"""非阻塞 async 版本的 arm_disarm"""
return await self._send_command_long_async(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_COMPONENT_ARM_DISARM,
confirmation=confirmation,
param1=1.0 if arm else 0.0,
param2=float(param2),
param3=0.0,
param4=0.0,
param5=0.0,
param6=0.0,
param7=0.0,
timeout_sec=timeout_sec,
)
async def takeoff_async(
self,
*,
target_sysid: int,
altitude_m: float,
target_compid: int = 0,
min_pitch_deg: float = 0.0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
"""非阻塞 async 版本的 takeoff"""
return await self._send_command_long_async(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_NAV_TAKEOFF,
confirmation=0,
param1=float(min_pitch_deg),
param2=0.0,
param3=0.0,
param4=float(yaw_deg),
param5=float(latitude) if latitude is not None else 0.0,
param6=float(longitude) if longitude is not None else 0.0,
param7=float(altitude_m),
timeout_sec=timeout_sec,
)
async def land_async(
self,
*,
target_sysid: int,
target_compid: int = 0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude_m: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> CommandLongResult:
"""非阻塞 async 版本的 land"""
return await self._send_command_long_async(
target_sysid=target_sysid,
target_compid=target_compid,
command=COMMAND_NAV_LAND,
confirmation=0,
param1=0.0,
param2=0.0,
param3=0.0,
param4=float(yaw_deg),
param5=float(latitude) if latitude is not None else 0.0,
param6=float(longitude) if longitude is not None else 0.0,
param7=float(altitude_m),
timeout_sec=timeout_sec,
)