from __future__ import annotations from dataclasses import dataclass from typing import Optional import rclpy from rclpy.node import Node from fc_interfaces.srv import MavCommandLong COMMAND_DO_SET_MODE = 176 DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long" DEFAULT_TIMEOUT_SEC = 2.0 @dataclass class ChangeModeResult: """Return value for mode change requests.""" success: bool message: str ack_result: int class CommandLongClient(Node): """Small ROS2 client dedicated to change flight mode.""" def __init__(self, service_name: str = DEFAULT_SERVICE_NAME) -> None: rclpy.init() super().__init__("fc_change_mode_client") self._client = self.create_client(MavCommandLong, service_name) def wait_for_service(self, timeout_sec: float = 3.0) -> bool: return self._client.wait_for_service(timeout_sec=timeout_sec) def change_mode( self, *, target_sysid: int, custom_mode: float, target_compid: int = 0, base_mode: float = 1.0, confirmation: int = 0, timeout_sec: float = DEFAULT_TIMEOUT_SEC ) -> ChangeModeResult: req = MavCommandLong.Request() req.target_sysid = target_sysid req.target_compid = target_compid req.command = COMMAND_DO_SET_MODE req.confirmation = confirmation req.param1 = float(base_mode) req.param2 = float(custom_mode) req.param3 = 0.0 req.param4 = 0.0 req.param5 = 0.0 req.param6 = 0.0 req.param7 = 0.0 req.timeout_sec = float(timeout_sec) future = self._client.call_async(req) rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec + 1.0) if not future.done() or future.result() is None: return ChangeModeResult( success=False, message="Service call timeout or no response.", ack_result=-1, ) response = future.result() return ChangeModeResult( success=response.success, message=response.message, ack_result=response.ack_result, )