From f79aaf86fa895962df5fc8fe9e8b9c4af3a33bed Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Wed, 7 May 2025 16:19:17 +0800 Subject: [PATCH] =?UTF-8?q?mavlink=5Fobject=20=E6=9E=B6=E6=A7=8B=E6=94=B9?= =?UTF-8?q?=E5=96=84=20=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD=20=E8=BD=89?= =?UTF-8?q?=E6=8E=A5=E4=B8=8D=E5=90=8C=20socket=20=E7=9A=84=E6=95=B8?= =?UTF-8?q?=E6=93=9A=E6=B5=81=20=E6=9C=89=E6=99=82=E5=80=99=E4=B8=8D?= =?UTF-8?q?=E7=A9=A9=E5=AE=9A=20=E6=9C=83=E6=9C=89=E4=BA=9B=E5=B0=8F?= =?UTF-8?q?=E5=95=8F=E9=A1=8C=20=E4=B9=8B=E5=BE=8C=E5=86=8D=E6=94=B9?= =?UTF-8?q?=E5=96=84=20=E5=8F=A6=E5=A4=96=20devRun=20=E4=B9=9F=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E4=BA=86=E5=B9=BE=E5=80=8B=E7=AF=84=E4=BE=8B=2020?= =?UTF-8?q?=E8=99=9F=E9=A0=85=20=E5=8F=AF=E4=BB=A5=E7=8D=A8=E7=AB=8B?= =?UTF-8?q?=E6=B8=AC=E8=A9=A6=20node=20=E5=8A=9F=E8=83=BD=2012=E8=99=9F?= =?UTF-8?q?=E9=A0=85=20=E5=B1=95=E7=A4=BA=E6=9C=80=E7=B0=A1=E5=96=AE?= =?UTF-8?q?=E7=9A=84=E8=BD=89=E6=8E=A5=E5=88=B0=20GCS=20=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fc_network_adapter/devRun.py | 233 ++++++++++++++++-- .../fc_network_adapter/mavlinkDevice.py | 4 + .../fc_network_adapter/mavlinkObject.py | 141 ++++++++--- 3 files changed, 319 insertions(+), 59 deletions(-) diff --git a/src/fc_network_adapter/fc_network_adapter/devRun.py b/src/fc_network_adapter/fc_network_adapter/devRun.py index 538938e..21b68dc 100644 --- a/src/fc_network_adapter/fc_network_adapter/devRun.py +++ b/src/fc_network_adapter/fc_network_adapter/devRun.py @@ -13,29 +13,93 @@ import mavlinkObject as mo # ====================== 分割線 ===================== -test_item = 3 +test_item = 12 print('test_item : ', test_item) if test_item == 1: + # 測試 updateMultiplexingList 的輸出 + print('===> Start of Program .Test ', test_item) + mavlink_object_none = mo.mavlink_object(None) + + print('=====================') + print('正常範例') + mavlink_object_none.multiplexingToAnalysis = [] + mavlink_object_none.multiplexingToReturn = [] + # mavlink_object_none.multiplexingToSwap = [[]] + ret = mavlink_object_none.updateMultiplexingList() + print('execute result : ', ret) + print('multiplexingList : ', mavlink_object_none._multiplexingList) + + print('=====================') + print('錯誤範例 長度不一致') + mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],] + ret = mavlink_object_none.updateMultiplexingList() + print('execute result : ', ret) + + print('=====================') + print('正常範例') + mo.swap_queues.append(queue.Queue()) + mo.swap_queues.append(queue.Queue()) + mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5] + mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10] + mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],] + ret = mavlink_object_none.updateMultiplexingList() + print('execute result : ', ret) + print('multiplexingList : ', mavlink_object_none._multiplexingList) + + print('=====================') + print('錯誤範例 multiplexingToAnalysis 不可以有 -1') + mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5] + ret = mavlink_object_none.updateMultiplexingList() + print('execute result : ', ret) + + print('End of Program') + + +elif test_item == 2: + # 測試 updateMultiplexingList 的輸出 + print('===> Start of Program .Test ', test_item) + mavlink_object_none1 = mo.mavlink_object(None) + mavlink_object_none2 = mo.mavlink_object(None) + mavlink_object_none3 = mo.mavlink_object(None) + del mavlink_object_none2 + + print(len(mo.swap_queues)) + + mavlink_object_none2 = mo.mavlink_object(None) + print(len(mo.swap_queues)) + + print(mavlink_object_none1._multiplexingList) + + print('End of Program') + + +elif test_item == 10: # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 # 只啟用了 mavlink_object 的功能 - print('Start of Program .Test 1') + print('===> Start of Program .Test ', test_item) + mavlink_object_none = mo.mavlink_object(None) + connection_string="udp:127.0.0.1:14550" mavlink_socket = mavutil.mavlink_connection(connection_string) mavlink_object1 = mo.mavlink_object(mavlink_socket) + + print(mavlink_object1.multiplexingToSwap) + print(mo.swap_queues) mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT - mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD + mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD + # mavlink_object1.multiplexingToSwap[0] = [-1, ] # all - # 做一個空的 queue list 驗證 multiplexingToConvert - mo.swap_queues.append(queue.Queue()) # 啟動連線的模組 mavlink_object1.run() + print(mavlink_object1._multiplexingList) + # 運行幾秒並印出 queue 的資料 start_time = time.time() - while time.time() - start_time < 2: + while time.time() - start_time < 3: while not mo.fixed_stream_bridge_queue.empty(): print('fixed_stream_bridge_queue:') t = mo.fixed_stream_bridge_queue.get() @@ -43,32 +107,35 @@ if test_item == 1: while not mo.return_packet_processor_queue.empty(): print('return_packet_processor_queue:') t = mo.return_packet_processor_queue.get() - # print('from {} : {}'.format(t[0],t[2])) - print(t[2].type) + print('from {} : {}'.format(t[0],t[2])) + # print(t[2].get_msgbuf()) + # print(t[2].type) - for q in mo.swap_queues: + for n in range(len(mo.swap_queues)): + q = mo.swap_queues[n] while not q.empty(): print('swap_queues:') t = q.get() - print('from {} : {}'.format(t[0],t[2])) + print('{} gets : {}'.format(n,t)) + # 結束程式 退出所有 thread mavlink_object1.running = False mavlink_object1.thread.join() mavlink_socket.close() - print('End of Program .Test 1') + print('End of Program') -elif test_item == 2: + +elif test_item == 11: # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 # 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能 - print('Start of Program .Test 2') + print('===> Start of Program .Test ', test_item) connection_string="udp:127.0.0.1:14550" mavlink_socket = mavutil.mavlink_connection(connection_string) mavlink_object2 = mo.mavlink_object(mavlink_socket) mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT - mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT - mavlink_object2.multiplexingToConvert = [] # + mavlink_object2.multiplexingToReturn = [] # 啟動 mavlink_bridge analyzer = mo.mavlink_bridge() @@ -105,11 +172,137 @@ elif test_item == 2: analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance analyzer.thread.join() mavlink_socket.close() - print('End of Program .Test 2') + print('End of Program') + +elif test_item == 12: + # 我這邊會測試 mavlink object 作為交換器的功能 + print('===> Start of Program .Test ', test_item) + + # 初始化兩個通道 + connection_string_in="udp:127.0.0.1:15551" + mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) + + mavlink_object_in = mo.mavlink_object(mavlink_socket_in) + mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT + + connection_string_out="udpout:127.0.0.1:14553" + mavlink_socket_out = mavutil.mavlink_connection(connection_string_out, source_system=17, source_component=200) + + mavlink_object_out = mo.mavlink_object(mavlink_socket_out) + mavlink_object_out.multiplexingToAnalysis = [0] + + + # 讓兩個通道互相傳輸 + mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all + mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all + + # 做一個空的通道驗證 + mavlink_object_none = mo.mavlink_object(None) + # mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all + + # 啟動連線的模組 + analyzer = mo.mavlink_bridge() + + # 啟動通道 + mavlink_object_in.run() + mavlink_object_out.run() + + start_time = time.time() + show_time = time.time() + print("connection established!") + + print(type(mavlink_socket_out)) + print(type(mavlink_socket_out.mav)) + + while time.time() - start_time < 10: + try: + test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False) + print('none object : ', test) + except queue.Empty: + pass + + if (time.time() - show_time) >= 2: + show_time = time.time() + for sysid in analyzer.mavlink_systems: + for compid in analyzer.mavlink_systems[sysid].components: + print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) + + + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print("===================") + + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + + # 結束程式 退出所有 thread + mavlink_object_in.stop() + mavlink_object_in.thread.join() + mavlink_object_out.stop() + mavlink_object_out.thread.join() + mavlink_socket_in.close() + mavlink_socket_out.close() + + analyzer.stop() + print('End of Program') + +elif test_item == 20: + sysid = 1 + compid = 1 + + # 這邊測試 node 生成 topic 的功能 + rclpy.init() # 注意要初始化 rclpy 才能使用 node + # mavlink_object_none = mo.mavlink_object(None) + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + + from pymavlink.dialects.v20 import common as mavlink2 + + mav = mavlink2.MAVLink(None) + mav.srcSystem = sysid + mav.srcComponent = compid + + # 這是一個 heartbeat 封包 + fake_heartbeat = mavlink2.MAVLink_heartbeat_message( + type=mavlink2.MAV_TYPE_QUADROTOR, + autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, + base_mode=3, + custom_mode=0, + system_status=0, + mavlink_version=3 + ) + mavlink_bytes = fake_heartbeat.pack(mav) + fake_heartbeat_msg = mav.parse_char(mavlink_bytes) + + print(analyzer.mavlink_systems) + + mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data + time.sleep(0.1) + print(analyzer.mavlink_systems) + + analyzer._init_node() + + analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid + print("topic created") + time.sleep(5) + + analyzer.emit_info() + + # 結束程式 + analyzer.running = False + analyzer.destroy_node() + + rclpy.shutdown() + analyzer.stop() + analyzer.thread.join() + print('End of Program') + -elif test_item == 3: +elif test_item == 21: # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 - print('Start of Program .Test 3') + print('===> Start of Program .Test ', test_item) rclpy.init() # 注意要初始化 rclpy 才能使用 node connection_string="udp:127.0.0.1:14550" mavlink_socket = mavutil.mavlink_connection(connection_string) @@ -117,7 +310,7 @@ elif test_item == 3: mavlink_object3 = mo.mavlink_object(mavlink_socket) mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT - mavlink_object3.multiplexingToConvert = [] # + mavlink_object3.multiplexingToSwap = [] # # 啟動 mavlink_bridge analyzer = mo.mavlink_bridge() @@ -163,4 +356,4 @@ elif test_item == 3: analyzer.thread.join() mavlink_socket.close() - print('End of Program .Test 3') + print('End of Program') diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py index cdb6768..d8c6e04 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py @@ -6,7 +6,11 @@ from theLogger import setup_logger logger = setup_logger("mavlinkDevice.py") +# 用來記錄每個 system 的資訊 +# 資料格式 { sysid : mavlink_device object } +mavlink_systems = {} +# ====================== 分割線 ===================== # 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component class mavlink_device(): diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 741170f..7ecfb02 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -23,7 +23,7 @@ from pymavlink import mavutil from rclpy.node import Node # 自定義的 import -from mavlinkDevice import mavlink_device +from mavlinkDevice import mavlink_device, mavlink_systems from mavlinkPublish import mavlink_publisher from theLogger import setup_logger @@ -35,10 +35,6 @@ fixed_stream_bridge_queue = queue.Queue() return_packet_processor_queue = queue.Queue() swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer -# 用來記錄每個 system 的資訊 -# 資料格式 { sysid : mavlink_device object } -mavlink_systems = {} - # ====================== 分割線 ===================== class mavlink_bridge(Node, mavlink_publisher): @@ -163,31 +159,39 @@ class mavlink_object(): 分流表的控制在三個 list 分別為 multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的 multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的 - multiplexingToConvert : 這個 list 是用來分流到轉拋串流的 + multiplexingToSwap : 這個 list 是用來分流到轉拋串流的 透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為 ''' mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } + socket_num = 0 # 用來記錄目前的 socket 數量 def __new__(cls, *args, **kwargs): # 每創建一個實例 就將其添加到 mavlinkObjects 列表中 # 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號 # 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號 - # socket_id 從 1 開始 + # socket_id 從 0 開始 instance = super().__new__(cls) - socket_id = 1 + socket_id = 0 for k in cls.mavlinkObjects.keys(): if k == socket_id: socket_id += 1 else: break instance.socket_id = socket_id + cls.socket_num += 1 cls.mavlinkObjects[socket_id] = instance return instance def __init__(self, socket): self.mavlink_socket = socket - self.msg_count = {} - self.multiplexingList = [] + # 這邊變數是執行的時候被使用的 不要直接寫入它 + self._multiplexingList = [] + # 存放要發送的訊息的 queue 或稱 buffer + self.output_buffer = queue.Queue() + if self.socket_id >= len(swap_queues): + swap_queues.append(self.output_buffer) + else: + swap_queues[self.socket_id] = self.output_buffer # 關聯到全域變數 global mavlink_systems @@ -195,27 +199,57 @@ class mavlink_object(): # 這三個 list 用來分配不同的訊息到不同的 queue self.multiplexingToAnalysis = [ - 0, # HEARTBEAT - 24, # GPS_RAW_INT - 30, # ATTITUDE - 33, # GLOBAL_POSITION_INT - 74, # VFR_HUD - 147, # BATTERY_STATUS - ] - self.multiplexingToReturn = [ - 0, # HEARTBEAT - # 30, # ATTITUDE + 0, # HEARTBEAT # 挺必要的項目 + # 24, # GPS_RAW_INT + # 30, # ATTITUDE + # 33, # GLOBAL_POSITION_INT + # 74, # VFR_HUD + # 147, # BATTERY_STATUS ] - self.multiplexingToConvert = [ + self.multiplexingToReturn = [] + self.multiplexingToSwap = [ + [] for _ in range(len(swap_queues)) ] + + # 刷新其他 mavlink_object 的 multiplexingToSwap + for k, object in self.mavlinkObjects.items(): + if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id): + object.multiplexingToSwap.append([]) + object.updateMultiplexingList() + logger.info('mavlink_object instance {} created'.format(self.socket_id)) def __del__(self): - self.mavlink_socket.close() - self.stop() + # 停下自己的 thread + if self.mavlink_socket != None: + self.mavlink_socket.close() + self.stop() + + # 移除其他 mavlink_object 的 multiplexingToSwap + for k, object in self.mavlinkObjects.items(): + if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id): + object.multiplexingToSwap[self.socket_id] = [] + object.updateMultiplexingList() + + # 移除自己的 swap_queues + swap_queues[self.socket_id] = None + + # 處理 class 的 instance 記錄 + self.socket_num -= 1 self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance + # logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替 + print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug + # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + # try: + # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + # except Exception as e: + # print(f"Error logging in __del__: {e}") + # if 'logger' in globals() and logger: + # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + def run(self): + # TODO 檢查 socket 是否有連線 self.thread = threading.Thread(target=self._run_thread) self.running = self.updateMultiplexingList() self.thread.start() @@ -247,15 +281,13 @@ class mavlink_object(): # mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型 mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) - # print(type(mavlinkMsg.pack)) # debug - # print("mark A") - # # print(dir(mavlinkMsg)) # debug - # print(mavlinkMsg.pack()) # debug - # break # Debug + # break # Debug function can put here you can see the input data from mavlink # 將訊息依照 multiplexing list 分發到不同的 queue - for i in range(len(self.multiplexingList)): - if mavlinkMsg.get_msgId() in self.multiplexingList[i]: + for i in range(len(self._multiplexingList)): + if (self._multiplexingList[i] == []): + continue + elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1): if i == 0: fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg)) elif i == 1: @@ -266,6 +298,20 @@ class mavlink_object(): _queue.put(mavlinkMsg) # 處理要送出的封包 + # 如果 有資料在 output_buffer 中則將其取出並發送 + # 沒有就略過發送 + + try: + mavlinkMsg_send = self.output_buffer.get(block=False) + except queue.Empty: + mavlinkMsg_send = None + # except Exception as e: + # logger.error(f"Error getting data from output_buffer: {e}") + # mavlinkMsg_send = None + + if mavlinkMsg_send: + # self.mavlink_socket.mav.send(mavlinkMsg_send) + self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中 # thread 結束 logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id)) @@ -274,20 +320,37 @@ class mavlink_object(): ''' 更新 multiplexing list 並做簡單的檢查 ''' + # 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值 + check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn) + if check: + logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id)) + return False - # 更新 multiplexing list - self.multiplexingList = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToConvert + # 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0 + check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0 + if check: + logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id)) + return False + + # 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋 + try: + self.multiplexingToSwap[self.socket_id] = [] + except IndexError: + logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id)) + return False - # 檢查 multiplexing list 格式是否有錯誤 - check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList) + # 組合 multiplexing list + multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap + + # 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int + check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp) if not check: - logger.error('Multiplexing List Format Error, Please check the list') + logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id)) return False # 若有錯誤則回傳 False - check = len(self.multiplexingToConvert) != len(swap_queues) - if check: - logger.error('Multiplexing Queue not fit List , Please check the list') - return False + # 更新 multiplexing list + self._multiplexingList = multiL_tmp + return True # ====================== 分割線 =====================