"""ROS2 client for fc_network MavCommandLong service (COMMAND_LONG 系列指令).""" from __future__ import annotations from dataclasses import dataclass from typing import Optional import asyncio import rclpy from rclpy.node import Node from fc_interfaces.srv import MavCommandLong COMMAND_DO_SET_MODE = 176 COMMAND_NAV_LAND = 21 COMMAND_NAV_TAKEOFF = 22 COMMAND_COMPONENT_ARM_DISARM = 400 DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long" DEFAULT_TIMEOUT_SEC = 2.0 @dataclass class CommandLongResult: success: bool message: str ack_result: int class CommandLongClient(Node): """ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。""" def __init__(self, service_name: str = DEFAULT_SERVICE_NAME) -> None: if not rclpy.ok(): rclpy.init(args=None) super().__init__("fc_command_long_client") self._client = self.create_client(MavCommandLong, service_name) def wait_for_service(self, timeout_sec: float = 3.0) -> bool: return self._client.wait_for_service(timeout_sec=timeout_sec) def _send_command_long( self, *, target_sysid: int, target_compid: int, command: int, confirmation: int, param1: float, param2: float, param3: float, param4: float, param5: float, param6: float, param7: float, timeout_sec: float, ) -> CommandLongResult: req = MavCommandLong.Request() req.target_sysid = target_sysid req.target_compid = target_compid req.command = command req.confirmation = confirmation req.param1 = param1 req.param2 = param2 req.param3 = param3 req.param4 = param4 req.param5 = param5 req.param6 = param6 req.param7 = param7 req.timeout_sec = float(timeout_sec) future = self._client.call_async(req) rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec + 1.0) if not future.done() or future.result() is None: return CommandLongResult( success=False, message="Service timeout.", ack_result=-1, ) response = future.result() return CommandLongResult( success=response.success, message=response.message, ack_result=response.ack_result, ) def change_mode( self, *, target_sysid: int, custom_mode: float, target_compid: int = 0, base_mode: float = 1.0, confirmation: int = 0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_DO_SET_MODE, confirmation=confirmation, param1=float(base_mode), param2=float(custom_mode), param3=0.0, param4=0.0, param5=0.0, param6=0.0, param7=0.0, timeout_sec=timeout_sec, ) def takeoff( self, *, target_sysid: int, altitude_m: float, target_compid: int = 0, min_pitch_deg: float = 0.0, yaw_deg: float = 0.0, latitude: Optional[float] = None, longitude: Optional[float] = None, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_NAV_TAKEOFF, confirmation=0, param1=float(min_pitch_deg), param2=0.0, param3=0.0, param4=float(yaw_deg), param5=float(latitude) if latitude is not None else 0.0, param6=float(longitude) if longitude is not None else 0.0, param7=float(altitude_m), timeout_sec=timeout_sec, ) def land( self, *, target_sysid: int, target_compid: int = 0, yaw_deg: float = 0.0, latitude: Optional[float] = None, longitude: Optional[float] = None, altitude_m: float = 0.0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_NAV_LAND, confirmation=0, param1=0.0, param2=0.0, param3=0.0, param4=float(yaw_deg), param5=float(latitude) if latitude is not None else 0.0, param6=float(longitude) if longitude is not None else 0.0, param7=float(altitude_m), timeout_sec=timeout_sec, ) def arm_disarm( self, *, target_sysid: int, arm: bool, target_compid: int = 0, confirmation: int = 0, param2: float = 0.0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_COMPONENT_ARM_DISARM, confirmation=confirmation, param1=1.0 if arm else 0.0, param2=float(param2), param3=0.0, param4=0.0, param5=0.0, param6=0.0, param7=0.0, timeout_sec=timeout_sec, ) # ============================================================================ # 【新增】非阻塞 async 包裝方法(用於 GUI 的非阻塞調用) # 這些方法在 ThreadPoolExecutor 中運行同步版本,以避免阻塞事件循環 # ============================================================================ async def change_mode_async( self, *, target_sysid: int, custom_mode: float, target_compid: int = 0, base_mode: float = 1.0, confirmation: int = 0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: """非阻塞 async 版本的 change_mode(在 ThreadPoolExecutor 中運行)""" from concurrent.futures import ThreadPoolExecutor loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) return await loop.run_in_executor( executor, lambda: self.change_mode( target_sysid=target_sysid, custom_mode=custom_mode, target_compid=target_compid, base_mode=base_mode, confirmation=confirmation, timeout_sec=timeout_sec, ) ) async def arm_disarm_async( self, *, target_sysid: int, arm: bool, target_compid: int = 0, confirmation: int = 0, param2: float = 0.0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: """非阻塞 async 版本的 arm_disarm(在 ThreadPoolExecutor 中運行)""" from concurrent.futures import ThreadPoolExecutor loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) return await loop.run_in_executor( executor, lambda: self.arm_disarm( target_sysid=target_sysid, arm=arm, target_compid=target_compid, confirmation=confirmation, param2=param2, timeout_sec=timeout_sec, ) ) async def takeoff_async( self, *, target_sysid: int, altitude_m: float, target_compid: int = 0, min_pitch_deg: float = 0.0, yaw_deg: float = 0.0, latitude: Optional[float] = None, longitude: Optional[float] = None, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: """非阻塞 async 版本的 takeoff(在 ThreadPoolExecutor 中運行)""" from concurrent.futures import ThreadPoolExecutor loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) return await loop.run_in_executor( executor, lambda: self.takeoff( target_sysid=target_sysid, altitude_m=altitude_m, target_compid=target_compid, min_pitch_deg=min_pitch_deg, yaw_deg=yaw_deg, latitude=latitude, longitude=longitude, timeout_sec=timeout_sec, ) ) async def land_async( self, *, target_sysid: int, target_compid: int = 0, yaw_deg: float = 0.0, latitude: Optional[float] = None, longitude: Optional[float] = None, altitude_m: float = 0.0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: """非阻塞 async 版本的 land(在 ThreadPoolExecutor 中運行)""" from concurrent.futures import ThreadPoolExecutor loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) return await loop.run_in_executor( executor, lambda: self.land( target_sysid=target_sysid, target_compid=target_compid, yaw_deg=yaw_deg, latitude=latitude, longitude=longitude, altitude_m=altitude_m, timeout_sec=timeout_sec, ) )