MAVLink simple message receiver 接通了 再來要實裝 node topic

chiyu
Chiyu Chen 1 year ago
parent d21bc6b069
commit b6c730f74a

@ -10,7 +10,6 @@ import time
''' '''
# 不同的匯流排只有單一種通訊協定 # 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到分析器 # 匯流排接到訊息後透過 queue stack 傳送到分析器
# 會有三種 Queue 分別作為 1. 固定串流分析器 2. 回傳封包處理器 3. 轉格式(例如 mavlink to FDM)的轉換器 # 會有三種 Queue 分別作為 1. 固定串流分析器 2. 回傳封包處理器 3. 轉格式(例如 mavlink to FDM)的轉換器
@ -21,13 +20,14 @@ import time
# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlinkObject 裡面 # pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlinkObject 裡面
# 這邊的 main 會是用來初始化 mavlinkObject 並啟動他的 run function # 這邊的 main 會是用來初始化 mavlinkObject 並啟動他的 run function
''' '''
fixed_stream_analyzer_queue = queue.Queue() fixed_stream_analyzer_queue = queue.Queue()
return_packet_processor_queue = queue.Queue() return_packet_processor_queue = queue.Queue()
converte_format_queues = [] converte_format_queues = []
mavlink_systems = {} # 用來記錄每個 system 的資訊
class MavlinkObject(): class MavlinkObject():
def __init__(self, mavlink_socket, socket_id = None): def __init__(self, mavlink_socket, socket_id = None):
self.socket_id = socket_id self.socket_id = socket_id
@ -35,18 +35,24 @@ class MavlinkObject():
self.msg_count = {} self.msg_count = {}
self.multiplexingList = [] self.multiplexingList = []
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 這三個 list 用來分配不同的訊息到不同的 queue
self.multiplexingToAnalysis = [ self.multiplexingToAnalysis = [
# 0, # HEARTBEAT 0, # HEARTBEAT
24, # GPS_RAW_INT
30, # ATTITUDE 30, # ATTITUDE
33, # GLOBAL_POSITION_INT 33, # GLOBAL_POSITION_INT
# 147, # BATTERY_STATUS 74, # VFR_HUD
147, # BATTERY_STATUS
] ]
self.multiplexingToReturn = [ self.multiplexingToReturn = [
0, # HEARTBEAT 0, # HEARTBEAT
# 30, # ATTITUDE # 30, # ATTITUDE
] ]
self.multiplexingToConvert = [ self.multiplexingToConvert = [
[74, ] # VFR_HUD
] ]
def __del__(self): def __del__(self):
self.mavlink_socket.close() self.mavlink_socket.close()
@ -66,6 +72,7 @@ class MavlinkObject():
def _run_thread(self): def _run_thread(self):
start_time = time.time() start_time = time.time()
while self.running: while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
try: try:
mavlinkMsg = self.mavlink_socket.recv_msg() mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e: except Exception as e:
@ -75,7 +82,13 @@ class MavlinkObject():
break break
if mavlinkMsg: if mavlinkMsg:
self.count_mavlink_type(mavlinkMsg) # 統計收到的訊息 # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# print(mavlinkMsg) # data type 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' # debug # print(mavlinkMsg) # data type 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' # debug
# break # Debug # break # Debug
@ -83,115 +96,313 @@ class MavlinkObject():
for i in range(len(self.multiplexingList)): for i in range(len(self.multiplexingList)):
if mavlinkMsg.get_msgId() in self.multiplexingList[i]: if mavlinkMsg.get_msgId() in self.multiplexingList[i]:
if i == 0: if i == 0:
fixed_stream_analyzer_queue.put((self.socket_id,mavlinkMsg)) fixed_stream_analyzer_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1: elif i == 1:
return_packet_processor_queue.put((self.socket_id,mavlinkMsg)) return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else: else:
convert_queue = converte_format_queues[i-2] convert_queue = converte_format_queues[i-2]
convert_queue.put((self.socket_id,mavlinkMsg)) convert_queue.put((self.socket_id,timestamp,mavlinkMsg))
# 每秒中 統計一次 收到的訊息總量
elapsed_time = time.time() - start_time
if elapsed_time > 1:
print('[mavlinkObject.py] Messages Type received (in about 1 sec) :')
print(self.msg_count)
start_time = time.time()
self.msg_count = {}
# thread 結束 # thread 結束
print('[mavlinkObject.py] End of _run_thread') print('[mavlinkObject.py] End of MavlinkObject._run_thread')
def count_mavlink_type(self, mavlinkMsg):
msg_type = mavlinkMsg.get_type()
if msg_type in self.msg_count:
self.msg_count[msg_type] += 1
else:
self.msg_count[msg_type] = 1
def publish_mavlink_data(self, mavlinkMsg):
msg = String()
msg.data = str(mavlinkMsg)
self.publisher_.publish(msg)
def updateMultiplexingList(self): def updateMultiplexingList(self):
'''
更新 multiplexing list 並做簡單的檢查
'''
# 更新 multiplexing list # 更新 multiplexing list
self.multiplexingList = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToConvert self.multiplexingList = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToConvert
# 檢查 multiplexing list 格式是否有錯誤
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList) check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList)
# 若有錯誤則回傳 False
if not check: if not check:
print('[mavlinkObject.py] Multiplexing List Error, Please check the list') print('[mavlinkObject.py] Multiplexing List Format Error, Please check the list')
return False # 若有錯誤則回傳 False
check = len(self.multiplexingToConvert) != len(converte_format_queues)
if check:
print('[mavlinkObject.py] Multiplexing Queue not fit List , Please check the list')
return False return False
return True return True
# ===================== 分割線 =====================================
class mavlink_analyzer(Node): class mavlink_analyzer(Node):
''' '''
這個 class 是用來接收 mavlink 訊息 並進行分析 這個 class 就是 固定串流分析器
是用來接收 mavlink 訊息 並進行分析
這個地方是針對 fixed_stream_analyzer_queue 的資料做處理的
記錄有 mavlink bus 上有那些 system id component id 記錄有 mavlink bus 上有那些 system id component id
為了每個 system id 都有一個對應的 device object 為了每個 system id 都有一個對應的 device object
並且看是否有重複 system id 並且看是否有重複 system id
也許可以用 node 去創建 這樣就可以直接進到 ros2 執行緒? 整段代碼包含兩大區塊 thread node
thread 區塊內會對 fixed_stream_analyzer_queue 進行監聽 並且將收到的訊息進行處理
其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object
node 區塊則是處理 ros2 publisher subscriber 訂閱相關
藉由控制 ros2 的機制再把 device object 的資訊發送出去
ps. 我限制了這個 class 只能有一個 instance
''' '''
_instance = None
_lock = threading.Lock() # 確保多線程安全
def __new__(cls, *args, **kwargs):
with cls._lock: # 確保多線程環境下只有一個實例被創建
if cls._instance is None:
cls._instance = super(mavlink_analyzer, cls).__new__(cls)
return cls._instance
def __init__(self): def __init__(self):
super().__init__('mavlink_analyzer') if not hasattr(self, "initialized"): # 防止重複初始化
self.initialized = True
rclpy.init()
super().__init__('mavlink_analyzer')
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 當 object 建立時會直接運行 thread 直到消滅
self.running = True
self.thread = threading.Thread(target=self._run_thread)
self.thread.start()
else:
print('[mavlinkObject.py] WARNING : mavlink_analyzer instance already exists')
def stop(self):
self.running = False
# === Thread 區塊 ===
def _run_thread(self):
start_time = time.time() # debug
# 從 Queue fixed_stream_analyzer_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_analyzer_queue.empty():
continue
msg_pack = fixed_stream_analyzer_queue.get()
msg = msg_pack[2]
sysid = msg.get_srcSystem()
# self.publisher_ = self.create_publisher(String, 'mavlink_data', 10) if msg.get_msgId() == 0: # HEARTBEAT 處理
# self.mavlink_object = None
# self.create_mavlink_object()
def create_mavlink_object(self): # 若這個 system id 還不存在 執行完整建立 device object 的流程
# connection_string="udp: if not sysid in self.mavlink_systems:
device_object = mavlink_device() # 創建一個新的 device object
self.mavlink_systems[sysid] = device_object
device_object.socket_id = msg_pack[0]
device_object.sysid = sysid
this_component = device_object.mavlink_component() # 創建一個新的 component object
device_object.components[msg.get_srcComponent()] = this_component
this_component.mav_type = msg.type
this_component.mav_autopilot = msg.autopilot
# 初始化 emitParams 方便之後更新與利用
this_component.emitParams.add_parameter('base_mode')
this_component.emitParams.add_parameter('flight_mode')
# TODO 這邊先不過度設計 但是也許之後要依照不同的載具類型再定義
# for BATTERY_STATUS (147)
this_component.emitParams.add_parameter('battery_voltage')
# for GPS_RAW_INT (24)
this_component.emitParams.add_parameter('gps_fix_type')
# for ATTITUDE (30)
this_component.emitParams.add_parameter('roll')
this_component.emitParams.add_parameter('pitch')
this_component.emitParams.add_parameter('yaw')
# for GLOBAL_POSITION_INT (33)
this_component.emitParams.add_parameter('latitude')
this_component.emitParams.add_parameter('longitude')
this_component.emitParams.add_parameter('altitude')
# for VFR_HUD (74)
this_component.emitParams.add_parameter('heading')
this_component.emitParams.base_mode = msg.base_mode
this_component.emitParams.flight_mode = mavutil.mode_string_v10(msg)
elif msg.get_msgId() == 147: # BATTERY_STATUS 處理
this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()]
# 若未定義的訊息類型則不處理 並跳出訊息
else:
print('[mavlinkObject.py] Warning This Message Type Did not define process method : ',msg.get_msgId(), get_type())
continue
print('[mavlinkObject.py] End of mavlink_analyzer._run_thread')
# === Node 區塊 ===
def _init_node(self):
pass pass
# self.publisher = self.create_publisher(String, 'mavlink_analyzer', 10)
# self.subscription = self.create_subscription(String, 'mavlink_analyzer', self.listener_callback, 10)
def emit_info(self):
# 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過
# 把 emitParams 的參數發送出去
for sysid in self.mavlink_systems:
for compid in self.mavlink_systems[sysid].components:
# 先抓 base_mode 驗證看看算法是否正確
msg_str = self.mavlink_systems[sysid].components[compid].base_mode
self.publisher.publish(msg_str)
# self.get_logger().info('Publishing: "%s"' % msg.data)
# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component
class mavlink_device(): class mavlink_device():
def __init__(self): def __init__(self):
self.socket_id = None # 記錄來自於哪個 socket self.socket_id = None # 記錄來自於哪個 socket
self.sysid = None self.sysid = None
self.component_count = 1 self.components = {} # 用來記錄每個 component 的資訊 key 是 compid, value 是 component object
self.compid_list = []
def __str__(self):
p_str = ''
p_str += f'socket_id : {self.socket_id}\n'
p_str += f'sysid : {self.sysid}\n'
p_str += 'has components : \n'
for compid in self.components:
p_str += f'compid : {compid}\n'
p_str += f'mav_type : {self.components[compid].mav_type}\n'
p_str += '=====================\n'
return p_str
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:
last_seq = self.components[compid].msg_seq
except KeyError:
# 這個 component id 還不存在
print('[mavlinkObject.py] Warning System ID : {} This Component ID : {} Did not init yet'.format(self.sysid, compid))
return
if last_seq != None:
expected_seq = (last_seq + 1) % 256
diff = current_seq - expected_seq
# print("current last exp diff : ",current_seq, last_seq, expected_seq, diff) # debug
if diff < 0:
diff += 256
self.components[compid].lost_packet_count += diff
# 這段更新封包的基本資訊
self.components[compid].msg_seq = current_seq
self.components[compid].last_msg_time = current_time
if current_type in self.components[compid].msg_count:
self.components[compid].msg_count[current_type] += 1
else:
self.components[compid].msg_count[current_type] = 1
def resetComponentPacketCount(self, compid):
self.components[compid].msg_count = {}
self.components[compid].msg_seq = None
self.components[compid].lost_packet_count = 0
class mavlink_component():
# 這個 class 用來存放該 component 參數,然而這些參數是固定以一個頻率要輸出給 ros2 topic
class emitParamSet():
def __init__(self):
pass
def add_parameter(self, param_name):
setattr(self, param_name, None)
def __init__(self):
self.mav_type = None # 表示 Vehicle 或 component type (例如: 旋翼機是2, 雲台是26, GCS是6)
self.mav_autopilot = None # 表示 autopilot type (例如: ardupilot是3, px4是12)
self.mav_system_status = None # 表示 system status (例如: active是3, standby是4)
# 紀錄從這個 component 收到最後的訊息序號和時間
self.msg_count = {}
self.msg_seq = None
self.lost_packet_count = 0
self.last_msg_time = 0
# 存放該 emit component 參數的區域
self.emitParams = self.emitParamSet()
def main(args=None): def main(args=None):
# print('Start of Program .Test 1')
# # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# connection_string="udp:127.0.0.1:14550"
# mavlink_socket = mavutil.mavlink_connection(connection_string)
# mavlink_object = MavlinkObject(mavlink_socket, 1)
# mavlink_object.multiplexingToAnalysis = [30] # only ATTITUDE
# mavlink_object.multiplexingToReturn = [0] # only HEARTBEAT
# mavlink_object.multiplexingToConvert = [[74,]] # only VFR_HUD
# # 做一個空的 queue list 驗證 multiplexingToConvert
# converte_format_queues.append(queue.Queue())
# # 啟動連線的模組
# mavlink_object.run()
# # 運行幾秒並印出 queue 的資料
# start_time = time.time()
# while time.time() - start_time < 2:
# while not fixed_stream_analyzer_queue.empty():
# print('fixed_stream_analyzer_queue:')
# t = fixed_stream_analyzer_queue.get()
# print('from {} : {}'.format(t[0],t[2]))
# while not return_packet_processor_queue.empty():
# print('return_packet_processor_queue:')
# t = return_packet_processor_queue.get()
# # print('from {} : {}'.format(t[0],t[2]))
# print(t[2].type)
# for q in converte_format_queues:
# while not q.empty():
# print('converte_format_queues:')
# t = q.get()
# print('from {} : {}'.format(t[0],t[2]))
# # 結束程式 退出所有 thread
# mavlink_object.running = False
# mavlink_object.thread.join()
# mavlink_socket.close()
# print('End of Program .Test 1')
# # # ========================================================================
print('Start of Program .Test 2')
connection_string="udp:127.0.0.1:14550" connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string) mavlink_socket = mavutil.mavlink_connection(connection_string)
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
mavlink_object = MavlinkObject(mavlink_socket, 1) mavlink_object = MavlinkObject(mavlink_socket, 1)
mavlink_object.multiplexingToAnalysis = [30] # only ATTITUDE mavlink_object.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object.multiplexingToReturn = [0] # only HEARTBEAT mavlink_object.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object.multiplexingToConvert = [[74,]] # only VFR_HUD mavlink_object.multiplexingToConvert = [] #
converte_format_queues.append(queue.Queue())
# 啟動連線的模組
mavlink_object.run() mavlink_object.run()
start_time = time.time() # 啟動 mavlink_analyzer
while time.time() - start_time < 2: analyzer = mavlink_analyzer()
compid = 1
while not fixed_stream_analyzer_queue.empty():
print('fixed_stream_analyzer_queue:')
t = fixed_stream_analyzer_queue.get()
print('from {} : {}'.format(t[0],t[1]))
while not return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = return_packet_processor_queue.get()
print('from {} : {}'.format(t[0],t[1]))
for q in converte_format_queues:
while not q.empty():
print('converte_format_queues:')
t = q.get()
print('from {} : {}'.format(t[0],t[1]))
# 運行幾秒
mavlink_object.running = False start_time = time.time()
show_time = time.time()
while time.time() - start_time < 10:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams.flight_mode)
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# 結束程式 退出所有 thread
mavlink_object.stop()
mavlink_object.thread.join() mavlink_object.thread.join()
print('End of Program') analyzer.stop()
analyzer = mavlink_analyzer() # 這邊是測試是否只有一個 instance
mavlink_socket.close()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -205,6 +205,21 @@ def mavlink_btye_generator_test():
custom_mode=0, custom_mode=0,
system_status=mavutil.mavlink.MAV_STATE_ACTIVE system_status=mavutil.mavlink.MAV_STATE_ACTIVE
) )
msg = mav.command_long_encode(
target_system=1,
target_component=1,
command=mavutil.mavlink.MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=0, # Custom mode
param2=0, # Custom sub-mode
param3=0,
param4=0,
param5=0,
param6=0,
param7=0
)
# 取得 MAVLink 訊息的 bytes # 取得 MAVLink 訊息的 bytes
mavlink_bytes = msg.pack(mav) mavlink_bytes = msg.pack(mav)
@ -219,6 +234,19 @@ def mavlink_btye_generator_test():
print(dir(mav2)) print(dir(mav2))
# print(my_msg) # print(my_msg)
def simple_getMavlink():
connection_string="udp:127.0.0.1:14550"
# connection_string='/dev/ttyUSB0'
connection = mavutil.mavlink_connection(connection_string)
while True:
msg = connection.recv_msg()
if msg:
print(msg)
else:
print('No message yet, 0.1 second for data to fill')
sleep(0.1)
print('Start') print('Start')
@ -226,5 +254,6 @@ print('Start')
# fdm_parser_example() # fdm_parser_example()
# mavlink_analyzer_simple(8) # mavlink_analyzer_simple(8)
# mavlink_btye_generator_test() # mavlink_btye_generator_test()
simple_getMavlink()

Loading…
Cancel
Save