forked from chiyu1468/AirTrapMine
Upload files to 'src/fc_network_adapter/fc_network_adapter'
parent
c38f277db4
commit
4dcff44bd3
@ -0,0 +1,249 @@
|
|||||||
|
|
||||||
|
'''
|
||||||
|
這個檔案只有一個 class
|
||||||
|
是作為 mavlinkObject.py 中 mavlink_analyzer 類別的功能衍生
|
||||||
|
|
||||||
|
主要概念是將 "離散的" mavlink 參數轉換成 ROS topic
|
||||||
|
包含了創建 publisher 和 以及包裝並丟到 ros2 topic 的 packEmit
|
||||||
|
|
||||||
|
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
|
||||||
|
'''
|
||||||
|
|
||||||
|
import std_msgs.msg
|
||||||
|
import sensor_msgs.msg
|
||||||
|
import geometry_msgs.msg
|
||||||
|
import mavros_msgs.msg
|
||||||
|
|
||||||
|
from theLogger import setup_logger
|
||||||
|
import math
|
||||||
|
|
||||||
|
|
||||||
|
logger = setup_logger("mavlinkPublish.py")
|
||||||
|
|
||||||
|
class mavlink_publisher():
|
||||||
|
|
||||||
|
prefix_path = 'MavLinkBus'
|
||||||
|
|
||||||
|
def create_flightMode(self, sysid, component_obj):
|
||||||
|
# target topic name # 請跟這個 method 的名稱保持一致
|
||||||
|
target_topic = 'flightMode'
|
||||||
|
|
||||||
|
# 這邊要檢查 flight_mode 是否存在
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['flightMode_mode']
|
||||||
|
except KeyError:
|
||||||
|
# 這個 component id 還不存在
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 若存在則 建立 publisher object 並回傳 true
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(std_msgs.msg.String, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_flightMode]
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def packEmit_flightMode(cls, emitParams, publisher):
|
||||||
|
msg_str = emitParams['flightMode_mode']
|
||||||
|
msg = std_msgs.msg.String()
|
||||||
|
msg.data = msg_str
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同 ros2 topic 訊息 請放在這裡 ↓↓↓↓↓↓↓↓↓↓↓↓
|
||||||
|
def euler_to_quaternion(cls,roll, pitch, yaw):
|
||||||
|
qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
|
||||||
|
qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2)
|
||||||
|
qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2)
|
||||||
|
qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
|
||||||
|
return [qx, qy, qz, qw]
|
||||||
|
|
||||||
|
def create_attitude(self, sysid, component_obj):
|
||||||
|
# target topic name # 請跟這個 method 的名稱保持一致
|
||||||
|
target_topic = 'attitude'
|
||||||
|
|
||||||
|
# 這邊要檢查 attitude 是否存在
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['attitude']
|
||||||
|
except KeyError:
|
||||||
|
# 這個 component id 還不存在
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 若存在則 建立 publisher object 並回傳 true
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(sensor_msgs.msg.Imu, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_attitude]
|
||||||
|
|
||||||
|
def packEmit_attitude(self, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['attitude']
|
||||||
|
msg = sensor_msgs.msg.Imu()
|
||||||
|
x, y, z, w = self.euler_to_quaternion(mav_msg.roll, mav_msg.pitch, mav_msg.yaw)
|
||||||
|
msg.orientation.x = x
|
||||||
|
msg.orientation.y = y
|
||||||
|
msg.orientation.z = z
|
||||||
|
msg.orientation.w = w
|
||||||
|
msg.angular_velocity.x = mav_msg.rollspeed
|
||||||
|
msg.angular_velocity.y = mav_msg.pitchspeed
|
||||||
|
msg.angular_velocity.z = mav_msg.yawspeed
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_local_position_pose(self, sysid, component_obj):
|
||||||
|
target_topic = 'local_position/pose'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['local_position']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(geometry_msgs.msg.Point, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_pose]
|
||||||
|
|
||||||
|
def packEmit_local_pose(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['local_position']
|
||||||
|
msg = geometry_msgs.msg.Point()
|
||||||
|
msg.x = mav_msg.x
|
||||||
|
msg.y = mav_msg.y
|
||||||
|
msg.z = mav_msg.z
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_local_position_velocity(self, sysid, component_obj):
|
||||||
|
target_topic = 'local_position/velocity'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['local_position']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_vel]
|
||||||
|
|
||||||
|
def packEmit_local_vel(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['local_position']
|
||||||
|
msg = geometry_msgs.msg.Vector3()
|
||||||
|
msg.x = mav_msg.vx
|
||||||
|
msg.y = mav_msg.vy
|
||||||
|
msg.z = mav_msg.vz
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_global_global(self, sysid, component_obj):
|
||||||
|
target_topic = 'global_position/global'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['global_position']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(sensor_msgs.msg.NavSatFix, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_global]
|
||||||
|
|
||||||
|
def packEmit_global_global(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['global_position']
|
||||||
|
msg = sensor_msgs.msg.NavSatFix()
|
||||||
|
msg.latitude = mav_msg.lat/1e7
|
||||||
|
msg.longitude = mav_msg.lon/1e7
|
||||||
|
msg.altitude = mav_msg.alt/1e3
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_global_rel(self, sysid, component_obj):
|
||||||
|
target_topic = 'global_position/rel_alt'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['global_position']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_rel]
|
||||||
|
|
||||||
|
def packEmit_global_rel(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['global_position']
|
||||||
|
msg = std_msgs.msg.Float64()
|
||||||
|
msg.data = mav_msg.relative_alt/1e3
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_global_vel(self, sysid, component_obj):
|
||||||
|
target_topic = 'global_position/gp_vel'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['global_position']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_vel]
|
||||||
|
|
||||||
|
def packEmit_global_vel(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['global_position']
|
||||||
|
msg = geometry_msgs.msg.Vector3()
|
||||||
|
msg.x = mav_msg.vx/1e2
|
||||||
|
msg.y = mav_msg.vy/1e2
|
||||||
|
msg.z = mav_msg.vz/1e2
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_global_hdg(self, sysid, component_obj):
|
||||||
|
target_topic = 'global_position/compass_hdg'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['global_position']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_hdg]
|
||||||
|
|
||||||
|
def packEmit_global_hdg(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['global_position']
|
||||||
|
msg = std_msgs.msg.Float64()
|
||||||
|
msg.data = mav_msg.hdg/1e2
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_vfr_hud(self, sysid, component_obj):
|
||||||
|
target_topic = 'vfr_hud'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['vfr_hud']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(mavros_msgs.msg.VfrHud, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_vfr_hud]
|
||||||
|
|
||||||
|
def packEmit_vfr_hud(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['vfr_hud']
|
||||||
|
msg = mavros_msgs.msg.VfrHud()
|
||||||
|
msg.airspeed = mav_msg.airspeed
|
||||||
|
msg.groundspeed = mav_msg.groundspeed
|
||||||
|
msg.heading = mav_msg.heading
|
||||||
|
msg.throttle = float(mav_msg.throttle)
|
||||||
|
msg.altitude = mav_msg.alt
|
||||||
|
msg.climb = mav_msg.climb
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def create_battery(self, sysid, component_obj):
|
||||||
|
target_topic = 'battery'
|
||||||
|
try:
|
||||||
|
_ = component_obj.emitParams['battery']
|
||||||
|
except KeyError:
|
||||||
|
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
|
||||||
|
return False
|
||||||
|
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
|
||||||
|
publisher_ = self.create_publisher(sensor_msgs.msg.BatteryState, topic_name, 1)
|
||||||
|
component_obj.publishers[target_topic] = [publisher_, self.packEmit_battery]
|
||||||
|
|
||||||
|
def packEmit_battery(cls, emitParams, publisher):
|
||||||
|
mav_msg = emitParams['battery']
|
||||||
|
msg = sensor_msgs.msg.BatteryState()
|
||||||
|
msg.voltage = mav_msg.voltages[0]/1e3
|
||||||
|
publisher.publish(msg)
|
||||||
|
pass
|
||||||
|
|
||||||
|
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同 ros2 topic 訊息 請放在這裡 ↑↑↑↑↑↑↑↑↑↑↑↑
|
||||||
Loading…
Reference in New Issue