"""ROS2 client for fc_network MavPositionTargetGlobalInt service (SET_POSITION_TARGET_GLOBAL_INT / Offboard).""" from __future__ import annotations import math from dataclasses import dataclass import rclpy from rclpy.node import Node from fc_interfaces.srv import MavPositionTargetGlobalInt DEFAULT_SERVICE_NAME = "/fc_network/vehicle/pos_global_int" DEFAULT_TIMEOUT_SEC = 2.0 # MAV_FRAME MAV_FRAME_GLOBAL_RELATIVE_ALT_INT = 6 # 使用 WGS84 GNSS 座標系 高度為相對於 Home 點高度 # POSITION_TARGET_TYPEMASK # 1 1 Ignore position x # 2 2 Ignore position y # 3 4 Ignore position z # 4 8 Ignore velocity x # 5 16 Ignore velocity y # 6 32 Ignore velocity z # 7 64 Ignore acceleration x # 8 128 Ignore acceleration y # 9 256 Ignore acceleration z # 10 512 Use force instead of acceleration # 11 1024 Ignore yaw # 12 2048 Ignore yaw rate # 13+ Not Used # 6543210987654321 POSITION_ONLY = 0b0000110111111000 # 忽略速度、加速度、偏航 # 回傳 echo 與送出值比對時,浮點欄位允許的絕對誤差(MAVLink float 轉傳常有小差) _FLOAT_ECHO_ABS_TOL = 1e-5 def _float_echo_equal(a: float, b: float) -> bool: return math.isclose(float(a), float(b), rel_tol=0.0, abs_tol=_FLOAT_ECHO_ABS_TOL) def _echo_position_target_matches( *, type_mask: int, lat_int: int, lon_int: int, alt: float, vx: float, vy: float, vz: float, afx: float, afy: float, afz: float, yaw: float, yaw_rate: float, time_boot_ms: int, resp, ) -> tuple[bool, str]: """ 比對本次送出的軌跡欄位與 server 填回的 r_* 是否一致。 若不符,回傳 (False, 簡短說明);相符則 (True, "")。 """ _tm_sent = int(type_mask) & 0xFFF _tm_recv = int(resp.r_type_mask) & 0xFFF if _tm_recv != _tm_sent: return False, f"type_mask echo: sent {_tm_sent:012b} != ack {_tm_recv:012b}" coordinate_tolerance = 5 * 100 if abs(resp.r_lat_int - lat_int) >= coordinate_tolerance: return False, f"lat_int echo: sent {lat_int} != ack {resp.r_lat_int}" if abs(resp.r_lon_int - lon_int) >= coordinate_tolerance: return False, f"lon_int echo: sent {lon_int} != ack {resp.r_lon_int}" # TODO 我給飛控相對高度 它回給我絕對高度 看來這個比較方法 還有很多改良點 但是之後吧 # if not _float_echo_equal(resp.r_alt, alt): # return False, f"alt echo: sent {alt} != ack {resp.r_alt}" if not _float_echo_equal(resp.r_vx, vx): return False, f"vx echo: sent {vx} != ack {resp.r_vx}" if not _float_echo_equal(resp.r_vy, vy): return False, f"vy echo: sent {vy} != ack {resp.r_vy}" if not _float_echo_equal(resp.r_vz, vz): return False, f"vz echo: sent {vz} != ack {resp.r_vz}" if not _float_echo_equal(resp.r_afx, afx): return False, f"afx echo: sent {afx} != ack {resp.r_afx}" if not _float_echo_equal(resp.r_afy, afy): return False, f"afy echo: sent {afy} != ack {resp.r_afy}" if not _float_echo_equal(resp.r_afz, afz): return False, f"afz echo: sent {afz} != ack {resp.r_afz}" if not _float_echo_equal(resp.r_yaw, yaw): return False, f"yaw echo: sent {yaw} != ack {resp.r_yaw}" if not _float_echo_equal(resp.r_yaw_rate, yaw_rate): return False, f"yaw_rate echo: sent {yaw_rate} != ack {resp.r_yaw_rate}" # if int(resp.r_time_boot_ms) != int(time_boot_ms): # return False, f"time_boot_ms echo: sent {time_boot_ms} != r {resp.r_time_boot_ms}" return True, "" @dataclass class PositionTargetGlobalIntResult: """對應 MavPositionTargetGlobalInt.srv 的 Response;success 依「送出與 r_* echo 是否一致」。""" success: bool message: str ack_result: int r_time_boot_ms: int = 0 r_type_mask: int = 0 r_lat_int: int = 0 r_lon_int: int = 0 r_alt: float = 0.0 r_vx: float = 0.0 r_vy: float = 0.0 r_vz: float = 0.0 r_afx: float = 0.0 r_afy: float = 0.0 r_afz: float = 0.0 r_yaw: float = 0.0 r_yaw_rate: float = 0.0 class PositionTargetGlobalIntClient(Node): """ ROS2 client:對 pos_global_int service 發送 SET_POSITION_TARGET_GLOBAL_INT (msg 86), 並等待 POSITION_TARGET_GLOBAL_INT (msg 87) 回應(由 server 端處理)。 """ def __init__(self, service_name: str = DEFAULT_SERVICE_NAME) -> None: if not rclpy.ok(): rclpy.init(args=None) super().__init__("fc_position_target_global_int_client") self._client = self.create_client(MavPositionTargetGlobalInt, service_name) def wait_for_service(self, timeout_sec: float = 3.0) -> bool: return self._client.wait_for_service(timeout_sec=timeout_sec) def _send_position_target_global_int( self, *, target_sysid: int, target_compid: int, time_boot_ms: int, coordinate_frame: int, type_mask: int, lat_int: int, lon_int: int, alt: float, vx: float, vy: float, vz: float, afx: float, afy: float, afz: float, yaw: float, yaw_rate: float, timeout_sec: float, ) -> PositionTargetGlobalIntResult: req = MavPositionTargetGlobalInt.Request() req.target_sysid = target_sysid req.target_compid = target_compid req.time_boot_ms = time_boot_ms req.coordinate_frame = coordinate_frame req.type_mask = type_mask req.lat_int = lat_int req.lon_int = lon_int req.alt = float(alt) req.vx = float(vx) req.vy = float(vy) req.vz = float(vz) req.afx = float(afx) req.afy = float(afy) req.afz = float(afz) req.yaw = float(yaw) req.yaw_rate = float(yaw_rate) req.timeout_sec = float(timeout_sec) future = self._client.call_async(req) rclpy.spin_until_future_complete(self, future, timeout_sec=float(timeout_sec) + 1.0) if not future.done() or future.result() is None: return PositionTargetGlobalIntResult( success=False, message="Service timeout.", ack_result=-1, ) resp = future.result() if resp.ack_result != 0: return PositionTargetGlobalIntResult( success=False, message=resp.message, ack_result=resp.ack_result, ) echo_ok, echo_detail = _echo_position_target_matches( type_mask=type_mask, lat_int=lat_int, lon_int=lon_int, alt=alt, vx=vx, vy=vy, vz=vz, afx=afx, afy=afy, afz=afz, yaw=yaw, yaw_rate=yaw_rate, time_boot_ms=time_boot_ms, resp=resp, ) # out_message = resp.message # if not echo_ok: # out_message = echo_detail if not out_message else f"{echo_detail} | server: {out_message}" return PositionTargetGlobalIntResult( success=echo_ok, message=echo_detail, ack_result=resp.ack_result, r_time_boot_ms=int(resp.r_time_boot_ms), r_type_mask=int(resp.r_type_mask), r_lat_int=int(resp.r_lat_int), r_lon_int=int(resp.r_lon_int), r_alt=float(resp.r_alt), r_vx=float(resp.r_vx), r_vy=float(resp.r_vy), r_vz=float(resp.r_vz), r_afx=float(resp.r_afx), r_afy=float(resp.r_afy), r_afz=float(resp.r_afz), r_yaw=float(resp.r_yaw), r_yaw_rate=float(resp.r_yaw_rate), ) # def set_position_target_global_int( # self, # *, # target_sysid: int, # target_compid: int = 0, # time_boot_ms: int = 0, # coordinate_frame: int = 6, # type_mask: int = 0, # lat_int: int = 0, # lon_int: int = 0, # alt: float = 0.0, # vx: float = 0.0, # vy: float = 0.0, # vz: float = 0.0, # afx: float = 0.0, # afy: float = 0.0, # afz: float = 0.0, # yaw: float = 0.0, # yaw_rate: float = 0.0, # timeout_sec: float = DEFAULT_TIMEOUT_SEC, # ) -> PositionTargetGlobalIntResult: # """ # 送出 SET_POSITION_TARGET_GLOBAL_INT;coordinate_frame 預設 6 (MAV_FRAME_GLOBAL_INT) # 可依飛控/任務覆寫。 # """ # return self._send_position_target_global_int( # target_sysid=target_sysid, # target_compid=target_compid, # time_boot_ms=time_boot_ms, # coordinate_frame=coordinate_frame, # type_mask=type_mask, # lat_int=lat_int, # lon_int=lon_int, # alt=alt, # vx=vx, # vy=vy, # vz=vz, # afx=afx, # afy=afy, # afz=afz, # yaw=yaw, # yaw_rate=yaw_rate, # timeout_sec=timeout_sec, # ) def goto_position_only( self, *, target_sysid: int, target_compid: int = 0, latitude_deg: float, longitude_deg: float, alt: float = 0.0, timeout_sec: float, ): lat_int = int(round(latitude_deg * 1e7)) lon_int = int(round(longitude_deg * 1e7)) return self._send_position_target_global_int( target_sysid=target_sysid, target_compid=target_compid, time_boot_ms= 0, coordinate_frame=MAV_FRAME_GLOBAL_RELATIVE_ALT_INT, type_mask=POSITION_ONLY, lat_int=lat_int, lon_int=lon_int, alt=alt, vx=0, vy=0, vz=0, afx=0, afy=0, afz=0, yaw=0, yaw_rate=0, timeout_sec=timeout_sec, ) # def set_position_target_global_int_deg( # self, # *, # target_sysid: int, # latitude_deg: float, # longitude_deg: float, # target_compid: int = 0, # time_boot_ms: int = 0, # coordinate_frame: int = MAV_FRAME_GLOBAL_RELATIVE_ALT_INT, # type_mask: int = 0, # alt: float = 0.0, # vx: float = 0.0, # vy: float = 0.0, # vz: float = 0.0, # afx: float = 0.0, # afy: float = 0.0, # afz: float = 0.0, # yaw: float = 0.0, # yaw_rate: float = 0.0, # timeout_sec: float = DEFAULT_TIMEOUT_SEC, # ) -> PositionTargetGlobalIntResult: # """以度為單位的緯經,轉成 MAVLink 的 lat_int / lon_int (1e7 * deg)。""" # lat_int = int(round(latitude_deg * 1e7)) # lon_int = int(round(longitude_deg * 1e7)) # return self.set_position_target_global_int( # target_sysid=target_sysid, # target_compid=target_compid, # time_boot_ms=time_boot_ms, # coordinate_frame=coordinate_frame, # type_mask=type_mask, # lat_int=lat_int, # lon_int=lon_int, # alt=alt, # vx=vx, # vy=vy, # vz=vz, # afx=afx, # afy=afy, # afz=afz, # yaw=yaw, # yaw_rate=yaw_rate, # timeout_sec=timeout_sec, # )