You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/fc_network_apps/changeMode.py

127 lines
3.8 KiB
Python

"""Simple wrapper for mode change via fc_network ROS2 service.
Reference CLI command:
ros2 service call /fc_network/vehicle/send_command_long \
fc_interfaces/srv/MavCommandLong \
"{target_sysid: 3, target_compid: 0, command: 176, confirmation: 0, \
param1: 1, param2: 4, param3: 0, param4: 0, param5: 0, param6: 0, \
param7: 0, timeout_sec: 2}"
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_DO_SET_MODE = 176
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class ChangeModeResult:
"""Return value for mode change requests."""
success: bool
message: str
ack_result: int
class ChangeModeClient(Node):
"""Small ROS2 client dedicated to change flight mode."""
def __init__(self, service_name: str = DEFAULT_SERVICE_NAME) -> None:
super().__init__("fc_change_mode_client")
self._client = self.create_client(MavCommandLong, service_name)
def wait_for_service(self, timeout_sec: float = 3.0) -> bool:
return self._client.wait_for_service(timeout_sec=timeout_sec)
def change_mode(
self,
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
) -> ChangeModeResult:
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_DO_SET_MODE
req.confirmation = confirmation
req.param1 = float(base_mode)
req.param2 = float(custom_mode)
req.param3 = 0.0
req.param4 = 0.0
req.param5 = 0.0
req.param6 = 0.0
req.param7 = 0.0
req.timeout_sec = float(timeout_sec)
future = self._client.call_async(req)
rclpy.spin_until_future_complete(self, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return ChangeModeResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return ChangeModeResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
def change_mode(
*,
target_sysid: int,
custom_mode: float,
target_compid: int = 0,
base_mode: float = 1.0,
confirmation: int = 0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> ChangeModeResult:
"""One-shot helper for collaborators who want minimal code."""
rclpy.init(args=None)
node: Optional[ChangeModeClient] = None
try:
node = ChangeModeClient(service_name=service_name)
if not node.wait_for_service(timeout_sec=timeout_sec):
return ChangeModeResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
return node.change_mode(
target_sysid=target_sysid,
target_compid=target_compid,
custom_mode=custom_mode,
base_mode=base_mode,
confirmation=confirmation,
timeout_sec=timeout_sec,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
# Example values are aligned with your terminal command.
result = change_mode(target_sysid=3, custom_mode=4)
print(
f"change_mode success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)