You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

91 lines
2.5 KiB
Python

"""Simple wrapper for land via fc_network ROS2 service."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from fc_interfaces.srv import MavCommandLong
COMMAND_NAV_LAND = 21
DEFAULT_SERVICE_NAME = "/fc_network/vehicle/send_command_long"
DEFAULT_TIMEOUT_SEC = 2.0
@dataclass
class LandResult:
success: bool
message: str
ack_result: int
def land(
*,
target_sysid: int,
target_compid: int = 0,
yaw_deg: float = 0.0,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude_m: float = 0.0,
timeout_sec: float = DEFAULT_TIMEOUT_SEC,
service_name: str = DEFAULT_SERVICE_NAME,
) -> LandResult:
"""One-shot MAV_CMD_NAV_LAND wrapper."""
rclpy.init(args=None)
node: Optional[Node] = None
try:
node = Node("fc_land_client_once")
client = node.create_client(MavCommandLong, service_name)
if not client.wait_for_service(timeout_sec=timeout_sec):
return LandResult(
success=False,
message=f"Service not available: {service_name}",
ack_result=-1,
)
req = MavCommandLong.Request()
req.target_sysid = target_sysid
req.target_compid = target_compid
req.command = COMMAND_NAV_LAND
req.confirmation = 0
req.param1 = 0.0
req.param2 = 0.0
req.param3 = 0.0
req.param4 = float(yaw_deg)
req.param5 = float(latitude) if latitude is not None else 0.0
req.param6 = float(longitude) if longitude is not None else 0.0
req.param7 = float(altitude_m)
req.timeout_sec = float(timeout_sec)
future = client.call_async(req)
rclpy.spin_until_future_complete(node, future, timeout_sec=timeout_sec + 1.0)
if not future.done() or future.result() is None:
return LandResult(
success=False,
message="Service call timeout or no response.",
ack_result=-1,
)
response = future.result()
return LandResult(
success=response.success,
message=response.message,
ack_result=response.ack_result,
)
finally:
if node is not None:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
result = land(target_sysid=3)
print(
f"land success={result.success}, "
f"ack_result={result.ack_result}, message='{result.message}'"
)