"""ROS2 client for fc_network MavCommandLong service (COMMAND_LONG 系列指令).""" from __future__ import annotations from dataclasses import dataclass from typing import Optional import rclpy from rclpy.node import Node from fc_interfaces.srv import MavCommandLong COMMAND_DO_SET_MODE = 176 COMMAND_NAV_LAND = 21 COMMAND_NAV_TAKEOFF = 22 COMMAND_COMPONENT_ARM_DISARM = 400 DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long" DEFAULT_TIMEOUT_SEC = 2.0 @dataclass class CommandLongResult: success: bool message: str ack_result: int class CommandLongClient(Node): """ROS2 client : 對同一個 send_command_long service 發送各種 MAV_CMD_*。""" def __init__(self, service_name: str = DEFAULT_SERVICE_NAME) -> None: if not rclpy.ok(): rclpy.init(args=None) super().__init__("fc_command_long_client") self._client = self.create_client(MavCommandLong, service_name) def wait_for_service(self, timeout_sec: float = 3.0) -> bool: return self._client.wait_for_service(timeout_sec=timeout_sec) def _send_command_long( self, *, target_sysid: int, target_compid: int, command: int, confirmation: int, param1: float, param2: float, param3: float, param4: float, param5: float, param6: float, param7: float, timeout_sec: float, ) -> CommandLongResult: req = MavCommandLong.Request() req.target_sysid = target_sysid req.target_compid = target_compid req.command = command req.confirmation = confirmation req.param1 = param1 req.param2 = param2 req.param3 = param3 req.param4 = param4 req.param5 = param5 req.param6 = param6 req.param7 = param7 req.timeout_sec = float(timeout_sec) future = self._client.call_async(req) rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec + 1.0) if not future.done() or future.result() is None: return CommandLongResult( success=False, message="Service timeout.", ack_result=-1, ) response = future.result() return CommandLongResult( success=response.success, message=response.message, ack_result=response.ack_result, ) def change_mode( self, *, target_sysid: int, custom_mode: float, target_compid: int = 0, base_mode: float = 1.0, confirmation: int = 0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_DO_SET_MODE, confirmation=confirmation, param1=float(base_mode), param2=float(custom_mode), param3=0.0, param4=0.0, param5=0.0, param6=0.0, param7=0.0, timeout_sec=timeout_sec, ) def takeoff( self, *, target_sysid: int, altitude_m: float, target_compid: int = 0, min_pitch_deg: float = 0.0, yaw_deg: float = 0.0, latitude: Optional[float] = None, longitude: Optional[float] = None, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_NAV_TAKEOFF, confirmation=0, param1=float(min_pitch_deg), param2=0.0, param3=0.0, param4=float(yaw_deg), param5=float(latitude) if latitude is not None else 0.0, param6=float(longitude) if longitude is not None else 0.0, param7=float(altitude_m), timeout_sec=timeout_sec, ) def land( self, *, target_sysid: int, target_compid: int = 0, yaw_deg: float = 0.0, latitude: Optional[float] = None, longitude: Optional[float] = None, altitude_m: float = 0.0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_NAV_LAND, confirmation=0, param1=0.0, param2=0.0, param3=0.0, param4=float(yaw_deg), param5=float(latitude) if latitude is not None else 0.0, param6=float(longitude) if longitude is not None else 0.0, param7=float(altitude_m), timeout_sec=timeout_sec, ) def arm_disarm( self, *, target_sysid: int, arm: bool, target_compid: int = 0, confirmation: int = 0, param2: float = 0.0, timeout_sec: float = DEFAULT_TIMEOUT_SEC, ) -> CommandLongResult: return self._send_command_long( target_sysid=target_sysid, target_compid=target_compid, command=COMMAND_COMPONENT_ARM_DISARM, confirmation=confirmation, param1=1.0 if arm else 0.0, param2=float(param2), param3=0.0, param4=0.0, param5=0.0, param6=0.0, param7=0.0, timeout_sec=timeout_sec, )