From 4dcff44bd3caa6849dfc0987fea1f5b0cc4b9499 Mon Sep 17 00:00:00 2001 From: ken910606 Date: Wed, 16 Apr 2025 22:58:01 +0800 Subject: [PATCH] Upload files to 'src/fc_network_adapter/fc_network_adapter' --- .../fc_network_adapter/mavlinkPublish.py | 249 ++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py new file mode 100644 index 0000000..0c2efb7 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py @@ -0,0 +1,249 @@ + +''' +這個檔案只有一個 class +是作為 mavlinkObject.py 中 mavlink_analyzer 類別的功能衍生 + +主要概念是將 "離散的" mavlink 參數轉換成 ROS topic +包含了創建 publisher 和 以及包裝並丟到 ros2 topic 的 packEmit + +publisher topic name 命名規則為 <前綴詞>/s/<具體 topic> +''' + +import std_msgs.msg +import sensor_msgs.msg +import geometry_msgs.msg +import mavros_msgs.msg + +from theLogger import setup_logger +import math + + +logger = setup_logger("mavlinkPublish.py") + +class mavlink_publisher(): + + prefix_path = 'MavLinkBus' + + def create_flightMode(self, sysid, component_obj): + # target topic name # 請跟這個 method 的名稱保持一致 + target_topic = 'flightMode' + + # 這邊要檢查 flight_mode 是否存在 + try: + _ = component_obj.emitParams['flightMode_mode'] + except KeyError: + # 這個 component id 還不存在 + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + + # 若存在則 建立 publisher object 並回傳 true + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(std_msgs.msg.String, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_flightMode] + + return True + + def packEmit_flightMode(cls, emitParams, publisher): + msg_str = emitParams['flightMode_mode'] + msg = std_msgs.msg.String() + msg.data = msg_str + publisher.publish(msg) + pass + + # ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同 ros2 topic 訊息 請放在這裡 ↓↓↓↓↓↓↓↓↓↓↓↓ + def euler_to_quaternion(cls,roll, pitch, yaw): + qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2) + qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) + qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2) + return [qx, qy, qz, qw] + + def create_attitude(self, sysid, component_obj): + # target topic name # 請跟這個 method 的名稱保持一致 + target_topic = 'attitude' + + # 這邊要檢查 attitude 是否存在 + try: + _ = component_obj.emitParams['attitude'] + except KeyError: + # 這個 component id 還不存在 + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + + # 若存在則 建立 publisher object 並回傳 true + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(sensor_msgs.msg.Imu, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_attitude] + + def packEmit_attitude(self, emitParams, publisher): + mav_msg = emitParams['attitude'] + msg = sensor_msgs.msg.Imu() + x, y, z, w = self.euler_to_quaternion(mav_msg.roll, mav_msg.pitch, mav_msg.yaw) + msg.orientation.x = x + msg.orientation.y = y + msg.orientation.z = z + msg.orientation.w = w + msg.angular_velocity.x = mav_msg.rollspeed + msg.angular_velocity.y = mav_msg.pitchspeed + msg.angular_velocity.z = mav_msg.yawspeed + publisher.publish(msg) + pass + + def create_local_position_pose(self, sysid, component_obj): + target_topic = 'local_position/pose' + try: + _ = component_obj.emitParams['local_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(geometry_msgs.msg.Point, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_pose] + + def packEmit_local_pose(cls, emitParams, publisher): + mav_msg = emitParams['local_position'] + msg = geometry_msgs.msg.Point() + msg.x = mav_msg.x + msg.y = mav_msg.y + msg.z = mav_msg.z + publisher.publish(msg) + pass + + def create_local_position_velocity(self, sysid, component_obj): + target_topic = 'local_position/velocity' + try: + _ = component_obj.emitParams['local_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_vel] + + def packEmit_local_vel(cls, emitParams, publisher): + mav_msg = emitParams['local_position'] + msg = geometry_msgs.msg.Vector3() + msg.x = mav_msg.vx + msg.y = mav_msg.vy + msg.z = mav_msg.vz + publisher.publish(msg) + pass + + def create_global_global(self, sysid, component_obj): + target_topic = 'global_position/global' + try: + _ = component_obj.emitParams['global_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(sensor_msgs.msg.NavSatFix, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_global] + + def packEmit_global_global(cls, emitParams, publisher): + mav_msg = emitParams['global_position'] + msg = sensor_msgs.msg.NavSatFix() + msg.latitude = mav_msg.lat/1e7 + msg.longitude = mav_msg.lon/1e7 + msg.altitude = mav_msg.alt/1e3 + publisher.publish(msg) + pass + + def create_global_rel(self, sysid, component_obj): + target_topic = 'global_position/rel_alt' + try: + _ = component_obj.emitParams['global_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_rel] + + def packEmit_global_rel(cls, emitParams, publisher): + mav_msg = emitParams['global_position'] + msg = std_msgs.msg.Float64() + msg.data = mav_msg.relative_alt/1e3 + publisher.publish(msg) + pass + + def create_global_vel(self, sysid, component_obj): + target_topic = 'global_position/gp_vel' + try: + _ = component_obj.emitParams['global_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_vel] + + def packEmit_global_vel(cls, emitParams, publisher): + mav_msg = emitParams['global_position'] + msg = geometry_msgs.msg.Vector3() + msg.x = mav_msg.vx/1e2 + msg.y = mav_msg.vy/1e2 + msg.z = mav_msg.vz/1e2 + publisher.publish(msg) + pass + + def create_global_hdg(self, sysid, component_obj): + target_topic = 'global_position/compass_hdg' + try: + _ = component_obj.emitParams['global_position'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_hdg] + + def packEmit_global_hdg(cls, emitParams, publisher): + mav_msg = emitParams['global_position'] + msg = std_msgs.msg.Float64() + msg.data = mav_msg.hdg/1e2 + publisher.publish(msg) + pass + + def create_vfr_hud(self, sysid, component_obj): + target_topic = 'vfr_hud' + try: + _ = component_obj.emitParams['vfr_hud'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(mavros_msgs.msg.VfrHud, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_vfr_hud] + + def packEmit_vfr_hud(cls, emitParams, publisher): + mav_msg = emitParams['vfr_hud'] + msg = mavros_msgs.msg.VfrHud() + msg.airspeed = mav_msg.airspeed + msg.groundspeed = mav_msg.groundspeed + msg.heading = mav_msg.heading + msg.throttle = float(mav_msg.throttle) + msg.altitude = mav_msg.alt + msg.climb = mav_msg.climb + publisher.publish(msg) + pass + + def create_battery(self, sysid, component_obj): + target_topic = 'battery' + try: + _ = component_obj.emitParams['battery'] + except KeyError: + logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid'])) + return False + topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic) + publisher_ = self.create_publisher(sensor_msgs.msg.BatteryState, topic_name, 1) + component_obj.publishers[target_topic] = [publisher_, self.packEmit_battery] + + def packEmit_battery(cls, emitParams, publisher): + mav_msg = emitParams['battery'] + msg = sensor_msgs.msg.BatteryState() + msg.voltage = mav_msg.voltages[0]/1e3 + publisher.publish(msg) + pass + + # ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同 ros2 topic 訊息 請放在這裡 ↑↑↑↑↑↑↑↑↑↑↑↑ \ No newline at end of file