1. 重要參數命名變更

2. mavlink_object 建構式變更
3. 把開發代碼獨立到額外檔案 devRun.py
chiyu
Chiyu Chen 1 year ago
parent 5c08a961cf
commit 0af71f8c8e

@ -0,0 +1,166 @@
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
import mavlinkObject as mo
# ====================== 分割線 =====================
test_item = 3
print('test_item : ', test_item)
if test_item == 1:
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('Start of Program .Test 1')
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket)
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD
# 做一個空的 queue list 驗證 multiplexingToConvert
mo.swap_queues.append(queue.Queue())
# 啟動連線的模組
mavlink_object1.run()
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < 2:
while not mo.fixed_stream_bridge_queue.empty():
print('fixed_stream_bridge_queue:')
t = mo.fixed_stream_bridge_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not mo.return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = mo.return_packet_processor_queue.get()
# print('from {} : {}'.format(t[0],t[2]))
print(t[2].type)
for q in mo.swap_queues:
while not q.empty():
print('swap_queues:')
t = q.get()
print('from {} : {}'.format(t[0],t[2]))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('End of Program .Test 1')
elif test_item == 2:
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能
print('Start of Program .Test 2')
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object2.multiplexingToConvert = [] #
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 啟動連線的模組
mavlink_object2.run()
start_time = time.time()
show_time = time.time()
compid = 1
while time.time() - start_time < 5:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# 結束程式 退出所有 thread
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 2')
elif test_item == 3:
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('Start of Program .Test 3')
rclpy.init() # 注意要初始化 rclpy 才能使用 node
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object3.multiplexingToConvert = [] #
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 啟動連線的模組
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < 10:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 3')

@ -1,9 +1,13 @@
# 自定義的 import
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkDevice.py")
# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component
class mavlink_device():
def __init__(self):

@ -1,9 +1,9 @@
'''
# 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到分析
# 會有三種 Queue 分別作為 1. 固定串流分析器 2. 回傳封包處理器 3. 轉格式(例如 mavlink to FDM)的轉換器
# 匯流排接到訊息後透過 queue stack 傳送到橋接
# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流
# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue
# 分析器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排
# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面
@ -21,21 +21,19 @@ from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
import std_msgs.msg
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
fixed_stream_analyzer_queue = queue.Queue()
fixed_stream_bridge_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
converte_format_queues = []
swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer
# 用來記錄每個 system 的資訊
# 資料格式 { sysid : mavlink_device object }
@ -43,18 +41,18 @@ mavlink_systems = {}
# ====================== 分割線 =====================
class mavlink_analyzer(Node, mavlink_publisher):
class mavlink_bridge(Node, mavlink_publisher):
'''
這個 class 就是 固定串流分析
是用來接收 mavlink 訊息 並進行分析
這個地方是針對 fixed_stream_analyzer_queue 的資料做處理的
這個 class 就是 固定串流橋接
是用來接收 mavlink 訊息 並進行橋接
這個地方是針對 fixed_stream_bridge_queue 的資料做處理的
記錄有 mavlink bus 上有那些 system id component id
為了每個 system id 都有一個對應的 device object
並且看是否有重複 system id
整段代碼包含兩大區塊 thread node
thread 區塊內會對 fixed_stream_analyzer_queue 進行監聽 並且將收到的訊息進行處理
thread 區塊內會對 fixed_stream_bridge_queue 進行監聽 並且將收到的訊息進行處理
其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object
node 區塊則是處理 ros2 publisher subscriber 訂閱相關
@ -69,7 +67,7 @@ class mavlink_analyzer(Node, mavlink_publisher):
def __new__(cls, *args, **kwargs):
with cls._lock: # 確保多線程環境下只有一個實例被創建
if cls._instance is None:
cls._instance = super(mavlink_analyzer, cls).__new__(cls)
cls._instance = super(mavlink_bridge, cls).__new__(cls)
return cls._instance
def __init__(self):
@ -85,7 +83,7 @@ class mavlink_analyzer(Node, mavlink_publisher):
self.thread = threading.Thread(target=self._run_thread)
self.thread.start()
else:
logger.error('mavlink_analyzer instance already exists. Do not create another one.')
logger.error('mavlink_bridge instance already exists. Do not create another one.')
def stop(self):
self.running = False
@ -93,12 +91,12 @@ class mavlink_analyzer(Node, mavlink_publisher):
# === Thread 區塊 ===
def _run_thread(self):
# start_time = time.time() # debug
logger.info('Start of mavlink_analyzer._run_thread')
# 從 Queue fixed_stream_analyzer_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
logger.info('Start of mavlink_bridge._run_thread')
# 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_analyzer_queue.empty():
if fixed_stream_bridge_queue.empty():
continue
msg_pack = fixed_stream_analyzer_queue.get()
msg_pack = fixed_stream_bridge_queue.get()
msg = msg_pack[2]
sysid = msg.get_srcSystem()
@ -132,12 +130,12 @@ class mavlink_analyzer(Node, mavlink_publisher):
logger.warning('This Message Type Did not define process method : {} / {}'.format(msg.get_msgId(), msg.get_type()))
continue
logger.info('End of mavlink_analyzer._run_thread')
logger.info('End of mavlink_bridge._run_thread')
# === Node 區塊 ===
def _init_node(self):
logger.info('Start of mavlink_analyzer._init_node')
super().__init__('mavlink_analyzer') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒
logger.info('Start of mavlink_bridge._init_node')
super().__init__('mavlink_bridge') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒
def emit_info(self):
# 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過
@ -163,14 +161,31 @@ class mavlink_object():
每個 mavlink bus 都會有一個 mavlink_object
其中主要是 thread 做統計封包與分流
分流表的控制在三個 list 分別為
multiplexingToAnalysis : 這個 list 是用來分流到固定串流分析器的
multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的
multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的
multiplexingToConvert : 這個 list 是用來分流到轉格式
multiplexingToConvert : 這個 list 是用來分流到轉拋串流
透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為
'''
def __init__(self, mavlink_socket, socket_id = None):
self.socket_id = socket_id
self.mavlink_socket = mavlink_socket
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
# 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號
# socket_id 從 1 開始
instance = super().__new__(cls)
socket_id = 1
for k in cls.mavlinkObjects.keys():
if k == socket_id:
socket_id += 1
else:
break
instance.socket_id = socket_id
cls.mavlinkObjects[socket_id] = instance
return instance
def __init__(self, socket):
self.mavlink_socket = socket
self.msg_count = {}
self.multiplexingList = []
@ -193,15 +208,14 @@ class mavlink_object():
]
self.multiplexingToConvert = [
]
logger.info('mavlink_object instance {} created'.format(self.socket_id))
def __del__(self):
self.mavlink_socket.close()
self.stop()
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
def run(self):
if not self.socket_id:
logger.error('Please set socket id before running')
return
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
self.thread.start()
@ -214,6 +228,8 @@ class mavlink_object():
start_time = time.time()
while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
# 處理接收到的封包
try:
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
@ -223,7 +239,7 @@ class mavlink_object():
self.running = False
break
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message'
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...'
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
@ -231,19 +247,25 @@ class mavlink_object():
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# print(mavlinkMsg) # debug
# print(type(mavlinkMsg.pack)) # debug
# print("mark A")
# # print(dir(mavlinkMsg)) # debug
# print(mavlinkMsg.pack()) # debug
# break # Debug
# 將訊息依照 multiplexing list 分發到不同的 queue
for i in range(len(self.multiplexingList)):
if mavlinkMsg.get_msgId() in self.multiplexingList[i]:
if i == 0:
fixed_stream_analyzer_queue.put((self.socket_id,timestamp,mavlinkMsg))
fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1:
return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else:
convert_queue = converte_format_queues[i-2]
convert_queue.put((self.socket_id,timestamp,mavlinkMsg))
_queue = swap_queues[i-2]
# _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包
_queue.put(mavlinkMsg)
# 處理要送出的封包
# thread 結束
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
@ -262,7 +284,7 @@ class mavlink_object():
logger.error('Multiplexing List Format Error, Please check the list')
return False # 若有錯誤則回傳 False
check = len(self.multiplexingToConvert) != len(converte_format_queues)
check = len(self.multiplexingToConvert) != len(swap_queues)
if check:
logger.error('Multiplexing Queue not fit List , Please check the list')
return False
@ -274,163 +296,6 @@ class mavlink_object():
def main_node():
pass
# ====================== 分割線 =====================
# 測試時的程式進入點
def main_develop(args=None):
test_item = 2
print('test_item : ', test_item)
if test_item == 1:
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('Start of Program .Test 1')
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mavlink_object(mavlink_socket, 1)
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD
# 做一個空的 queue list 驗證 multiplexingToConvert
converte_format_queues.append(queue.Queue())
# 啟動連線的模組
mavlink_object1.run()
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < 2:
while not fixed_stream_analyzer_queue.empty():
print('fixed_stream_analyzer_queue:')
t = fixed_stream_analyzer_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = return_packet_processor_queue.get()
# print('from {} : {}'.format(t[0],t[2]))
print(t[2].type)
for q in converte_format_queues:
while not q.empty():
print('converte_format_queues:')
t = q.get()
print('from {} : {}'.format(t[0],t[2]))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('End of Program .Test 1')
elif test_item == 2:
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_analyzer的thread區塊 的功能
print('Start of Program .Test 2')
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mavlink_object(mavlink_socket, 1)
mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object2.multiplexingToConvert = [] #
# 啟動 mavlink_analyzer
analyzer = mavlink_analyzer()
# 啟動連線的模組
mavlink_object2.run()
start_time = time.time()
show_time = time.time()
compid = 1
while time.time() - start_time < 5:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in mavlink_systems:
print(mavlink_systems[sysid])
# 結束程式 退出所有 thread
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mavlink_analyzer() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 2')
elif test_item == 3:
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('Start of Program .Test 3')
rclpy.init() # 注意要初始化 rclpy 才能使用 node
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mavlink_object(mavlink_socket, 1)
mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object3.multiplexingToConvert = [] #
# 啟動 mavlink_analyzer
analyzer = mavlink_analyzer()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 啟動連線的模組
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < 20:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('End of Program .Test 3')
if __name__ == '__main__':
import rclpy # 這邊是為了測試而加的
main_develop()
pass

Loading…
Cancel
Save