From c38f277db46a19d918f85fe68d24c2db05c7d3e8 Mon Sep 17 00:00:00 2001 From: ken910606 Date: Wed, 16 Apr 2025 22:57:49 +0800 Subject: [PATCH] Upload files to 'src/fc_network_adapter/fc_network_adapter' --- .../fc_network_adapter/mavlinkObject.py | 469 ++++++++++++++---- 1 file changed, 361 insertions(+), 108 deletions(-) diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index f84ce60..deaf82f 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -1,16 +1,4 @@ -import threading -from pymavlink import mavutil -import queue - -import rclpy -from rclpy.node import Node -from std_msgs.msg import String - -import time - - ''' - # 不同的匯流排只有單一種通訊協定 # 匯流排接到訊息後透過 queue stack 傳送到分析器 # 會有三種 Queue 分別作為 1. 固定串流分析器 2. 回傳封包處理器 3. 轉格式(例如 mavlink to FDM)的轉換器 @@ -18,43 +6,211 @@ import time # 分析器會將解析得到的結論 再透過 ros2 的 publisher 發送出去 # 關於 mavlink 採用 pymavlink 這個套件 製作匯流排 -# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlinkObject 裡面 -# 這邊的 main 會是用來初始化 mavlinkObject 並啟動他的 run function - +# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面 +# 這邊的 main 會是用來初始化 mavlink_object 並啟動他的 run function ''' +# 基礎功能的 import +import threading +import queue +import time + +# mavlink 的 import +from pymavlink import mavutil + +# ROS2 的 import +from rclpy.node import Node + +# 自定義的 import +from mavlinkDevice import mavlink_device +from mavlinkPublish import mavlink_publisher +from theLogger import setup_logger + + +# ====================== 分割線 ===================== + +logger = setup_logger("mavlinkObject.py") + fixed_stream_analyzer_queue = queue.Queue() return_packet_processor_queue = queue.Queue() converte_format_queues = [] -class MavlinkObject(): +# 用來記錄每個 system 的資訊 +# 資料格式 { sysid : mavlink_device object } +mavlink_systems = {} + +# ====================== 分割線 ===================== + +class mavlink_analyzer(Node, mavlink_publisher): + ''' + 這個 class 就是 固定串流分析器 + 是用來接收 mavlink 訊息 並進行分析 + 這個地方是針對 fixed_stream_analyzer_queue 的資料做處理的 + 記錄有 mavlink bus 上有那些 system id 和 component id + 為了每個 system id 都有一個對應的 device object + 並且看是否有重複 system id + + 整段代碼包含兩大區塊 thread 和 node + + thread 區塊內會對 fixed_stream_analyzer_queue 進行監聽 並且將收到的訊息進行處理 + 其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object + + node 區塊則是處理 ros2 的 publisher 和 subscriber 訂閱相關 + 藉由控制 ros2 的機制再把 device object 的資訊發送出去 + + ps. 我限制了這個 class 只能有一個 instance + + ''' + _instance = None + _lock = threading.Lock() # 確保多線程安全 + + def __new__(cls, *args, **kwargs): + with cls._lock: # 確保多線程環境下只有一個實例被創建 + if cls._instance is None: + cls._instance = super(mavlink_analyzer, cls).__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, "initialized"): # 防止重複初始化 + self.initialized = True + + # 關聯到全域變數 + global mavlink_systems + self.mavlink_systems = mavlink_systems + + # 當 object 建立時會直接運行 thread 直到消滅 + self.running = True + self.thread = threading.Thread(target=self._run_thread) + self.thread.start() + else: + logger.error('mavlink_analyzer instance already exists. Do not create another one.') + + def stop(self): + self.running = False + + # === Thread 區塊 === + def _run_thread(self): + # start_time = time.time() # debug + logger.info('Start of mavlink_analyzer._run_thread') + # 從 Queue fixed_stream_analyzer_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 + while self.running: + if fixed_stream_analyzer_queue.empty(): + continue + msg_pack = fixed_stream_analyzer_queue.get() + + msg = msg_pack[2] + sysid = msg.get_srcSystem() + msg_id = msg.get_msgId() + msg_type = msg.get_type() + + # 若這個 system id 還不存在 執行完整建立 device object 的流程 + if not sysid in self.mavlink_systems: + device_object = mavlink_device() # 創建一個新的 device object + self.mavlink_systems[sysid] = device_object + device_object.socket_id = msg_pack[0] + device_object.sysid = sysid + + this_component = device_object.mavlink_component() # 創建一個新的 component object + device_object.components[msg.get_srcComponent()] = this_component + this_component.mav_type = msg_type + + # ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↓↓↓↓↓↓↓↓↓↓↓↓ + if msg_id == 0: # HEARTBEAT 處理 + this_component.emitParams['base_mode'] = msg.base_mode + this_component.emitParams['flightMode_mode'] = mavutil.mode_string_v10(msg) + + elif msg_id == 30: # ATTITUDE 處理 + this_component.emitParams['attitude'] = msg + + elif msg_id == 32: # LOCAL_POSITION_NED 處理 + this_component.emitParams['local_position'] = msg + + elif msg_id == 33: # GLOBAL_POSITION_INT 處理 + this_component.emitParams['global_position'] = msg + + elif msg_id == 74: # VFR_HUD 處理 + this_component.emitParams['vfr_hud'] = msg + + elif msg_id == 147: # BATTERY_STATUS 處理 + this_component.emitParams['battery'] = msg + + # ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↑↑↑↑↑↑↑↑↑↑↑↑ + + # 若未定義的訊息類型則不處理 並跳出訊息 + else: + logger.warning('This Message Type Did not define process method : {} / {}'.format(msg.get_msgId(), msg.get_type())) + continue + + logger.info('End of mavlink_analyzer._run_thread') + + # === Node 區塊 === + def _init_node(self): + logger.info('Start of mavlink_analyzer._init_node') + super().__init__('mavlink_analyzer') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒 + + def emit_info(self): + # 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過 + # 把 emitParams 的參數發送出去 + for sysid, device in self.mavlink_systems.items(): + for compid, component in device.components.items(): + for topic_name in component.publishers.keys(): + publisher = component.publishers[topic_name][0] + packEmit_func = component.publishers[topic_name][1] + packEmit_func(component.emitParams, publisher) + + def _del_node(self): + # TODO 這邊要刪除 node 的時候要做的事情 + # 先註銷所有 mavlink_systems 中 component 的 publisher + # 再註銷所有 mavlink_systems 中的 device object + # 再註銷 node + pass + +# ====================== 分割線 ===================== + +class mavlink_object(): + ''' + 每個 mavlink bus 都會有一個 mavlink_object + 其中主要是 thread 做統計封包與分流 + 分流表的控制在三個 list 分別為 + multiplexingToAnalysis : 這個 list 是用來分流到固定串流分析器的 + multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的 + multiplexingToConvert : 這個 list 是用來分流到轉格式的 + 透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為 + ''' def __init__(self, mavlink_socket, socket_id = None): self.socket_id = socket_id self.mavlink_socket = mavlink_socket self.msg_count = {} self.multiplexingList = [] + # 關聯到全域變數 + global mavlink_systems + self.mavlink_systems = mavlink_systems + + # 這三個 list 用來分配不同的訊息到不同的 queue self.multiplexingToAnalysis = [ - # 0, # HEARTBEAT + 0, # HEARTBEAT + 24, # GPS_RAW_INT 30, # ATTITUDE + 32, # LOCAL_POSITION_NED 33, # GLOBAL_POSITION_INT - # 147, # BATTERY_STATUS + 74, # VFR_HUD + 147, # BATTERY_STATUS ] self.multiplexingToReturn = [ 0, # HEARTBEAT - # 30, # ATTITUDE ] self.multiplexingToConvert = [ - [74, ] # VFR_HUD ] + def __del__(self): self.mavlink_socket.close() self.stop() def run(self): if not self.socket_id: - print('[mavlinkObject.py] Please set socket id before running') + logger.error('Please set socket id before running') return self.thread = threading.Thread(target=self._run_thread) self.running = self.updateMultiplexingList() @@ -64,134 +220,231 @@ class MavlinkObject(): self.running = False def _run_thread(self): + logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) start_time = time.time() while self.running: + timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差 try: mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: + logger.critical(f"Receiving data not mavlink format. Object Delete.") print(f"[mavlinkObject.py] Error receiving mavlink data: {e}") print(mavlinkMsg) self.running = False break - if mavlinkMsg: - self.count_mavlink_type(mavlinkMsg) # 統計收到的訊息 - # print(mavlinkMsg) # data type 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' # debug + if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' + # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 + sysid = mavlinkMsg.get_srcSystem() + compid = mavlinkMsg.get_srcComponent() + if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 + # mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型 + mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) + + # print(mavlinkMsg) # debug # break # Debug # 將訊息依照 multiplexing list 分發到不同的 queue for i in range(len(self.multiplexingList)): if mavlinkMsg.get_msgId() in self.multiplexingList[i]: if i == 0: - fixed_stream_analyzer_queue.put((self.socket_id,mavlinkMsg)) + fixed_stream_analyzer_queue.put((self.socket_id,timestamp,mavlinkMsg)) elif i == 1: - return_packet_processor_queue.put((self.socket_id,mavlinkMsg)) + return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg)) else: convert_queue = converte_format_queues[i-2] - convert_queue.put((self.socket_id,mavlinkMsg)) - - # 每秒中 統計一次 收到的訊息總量 - elapsed_time = time.time() - start_time - if elapsed_time > 1: - print('[mavlinkObject.py] Messages Type received (in about 1 sec) :') - print(self.msg_count) - start_time = time.time() - self.msg_count = {} - - # thread 結束 - print('[mavlinkObject.py] End of _run_thread') - - def count_mavlink_type(self, mavlinkMsg): - msg_type = mavlinkMsg.get_type() - if msg_type in self.msg_count: - self.msg_count[msg_type] += 1 - else: - self.msg_count[msg_type] = 1 + convert_queue.put((self.socket_id,timestamp,mavlinkMsg)) - def publish_mavlink_data(self, mavlinkMsg): - msg = String() - msg.data = str(mavlinkMsg) - self.publisher_.publish(msg) + # thread 結束 + logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id)) def updateMultiplexingList(self): + ''' + 更新 multiplexing list 並做簡單的檢查 + ''' + # 更新 multiplexing list self.multiplexingList = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToConvert + # 檢查 multiplexing list 格式是否有錯誤 check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in self.multiplexingList) - - # 若有錯誤則回傳 False if not check: - print('[mavlinkObject.py] Multiplexing List Error, Please check the list') + logger.error('Multiplexing List Format Error, Please check the list') + return False # 若有錯誤則回傳 False + + check = len(self.multiplexingToConvert) != len(converte_format_queues) + if check: + logger.error('Multiplexing Queue not fit List , Please check the list') return False return True -class mavlink_analyzer(Node): - ''' - 這個 class 是用來接收 mavlink 訊息 並進行分析 - 記錄有 mavlink bus 上有那些 system id 和 component id - 為了每個 system id 都有一個對應的 device object - 並且看是否有重複 system id +# ====================== 分割線 ===================== - 也許可以用 node 去創建 這樣就可以直接進到 ros2 執行緒? - ''' - def __init__(self): - super().__init__('mavlink_analyzer') +# 整合到 ros2 之後的程式進入點 +def main_node(): + pass - # self.publisher_ = self.create_publisher(String, 'mavlink_data', 10) - # self.mavlink_object = None - # self.create_mavlink_object() +# ====================== 分割線 ===================== +# 測試時的程式進入點 +def main_develop(args=None): - def create_mavlink_object(self): - # connection_string="udp: - pass + test_item = 3 + print('test_item : ', test_item) -class mavlink_device(): - def __init__(self): - self.socket_id = None # 記錄來自於哪個 socket - self.sysid = None - self.component_count = 1 - self.compid_list = [] - + if test_item == 1: + # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 + # 只啟用了 mavlink_object 的功能 + print('Start of Program .Test 1') + connection_string="udp:127.0.0.1:14550" + mavlink_socket = mavutil.mavlink_connection(connection_string) + mavlink_object1 = mavlink_object(mavlink_socket, 1) + mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE + mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT + mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD -def main(args=None): + # 做一個空的 queue list 驗證 multiplexingToConvert + converte_format_queues.append(queue.Queue()) + # 啟動連線的模組 + mavlink_object1.run() - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) + # 運行幾秒並印出 queue 的資料 + start_time = time.time() + while True: + while not fixed_stream_analyzer_queue.empty(): + print('fixed_stream_analyzer_queue:') + t = fixed_stream_analyzer_queue.get() + print('from {} : {}'.format(t[0],t[2])) + while not return_packet_processor_queue.empty(): + print('return_packet_processor_queue:') + t = return_packet_processor_queue.get() + # print('from {} : {}'.format(t[0],t[2])) + print(t[2].type) + + for q in converte_format_queues: + while not q.empty(): + print('converte_format_queues:') + t = q.get() + print('from {} : {}'.format(t[0],t[2])) + + # 結束程式 退出所有 thread + mavlink_object1.running = False + mavlink_object1.thread.join() + mavlink_socket.close() + print('End of Program .Test 1') + + elif test_item == 2: + # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 + # 啟用 mavlink_object 與 mavlink_analyzer的thread區塊 的功能 + print('Start of Program .Test 2') + connection_string="udp:127.0.0.1:14550" + mavlink_socket = mavutil.mavlink_connection(connection_string) + + mavlink_object2 = mavlink_object(mavlink_socket, 1) + mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT + mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT + mavlink_object2.multiplexingToConvert = [] # + + # 啟動 mavlink_analyzer + analyzer = mavlink_analyzer() + + # 啟動連線的模組 + mavlink_object2.run() - # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 - mavlink_object = MavlinkObject(mavlink_socket, 1) - mavlink_object.multiplexingToAnalysis = [30] # only ATTITUDE - mavlink_object.multiplexingToReturn = [0] # only HEARTBEAT - mavlink_object.multiplexingToConvert = [[74,]] # only VFR_HUD - - converte_format_queues.append(queue.Queue()) + start_time = time.time() + show_time = time.time() + compid = 1 + while time.time() - start_time < 5: + if (time.time() - show_time) >= 1: + show_time = time.time() + for sysid in analyzer.mavlink_systems: + print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count) + print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) + print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print("===================") + + # 印出目前所有 mavlink_systems 的內容 + print('目前所有的系統 : ') + for sysid in mavlink_systems: + print(mavlink_systems[sysid]) + # 結束程式 退出所有 thread + mavlink_object2.stop() + mavlink_object2.thread.join() + analyzer.stop() + + analyzer = mavlink_analyzer() # 這邊是測試是否只有一個 instance + analyzer.thread.join() + mavlink_socket.close() + print('End of Program .Test 2') + + elif test_item == 3: + # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 + print('Start of Program .Test 3') + rclpy.init() # 注意要初始化 rclpy 才能使用 node + connection_string="udp:127.0.0.1:14550" + mavlink_socket = mavutil.mavlink_connection(connection_string) + + mavlink_object3 = mavlink_object(mavlink_socket, 1) + mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] + mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT + mavlink_object3.multiplexingToConvert = [] # + + # 啟動 mavlink_analyzer + analyzer = mavlink_analyzer() + + # 關於 Node 的初始化 + show_time = time.time() + analyzer._init_node() # 初始化 node + print('初始化 node 完成 耗時 : ',time.time() - show_time) + + # 啟動連線的模組 + mavlink_object3.run() + + print('waiting for mavlink data ...') + time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + + compid = 1 + sysid = 1 + start_time = time.time() + analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_global_vel(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_global_hdg(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) + analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) + end_time = time.time() + print(f"Execution time for create_flightMode: {end_time - start_time} seconds") + + print("start emit info") - mavlink_object.run() - start_time = time.time() - while time.time() - start_time < 2: - - while not fixed_stream_analyzer_queue.empty(): - print('fixed_stream_analyzer_queue:') - t = fixed_stream_analyzer_queue.get() - print('from {} : {}'.format(t[0],t[1])) - while not return_packet_processor_queue.empty(): - print('return_packet_processor_queue:') - t = return_packet_processor_queue.get() - print('from {} : {}'.format(t[0],t[1])) + start_time = time.time() + show_time = time.time() + while time.time() - start_time < 20: + try: + # rclpy.spin(analyzer) + analyzer.emit_info() # 這邊是測試 node 的運行 + time.sleep(0.5) + except KeyboardInterrupt: + break - for q in converte_format_queues: - while not q.empty(): - print('converte_format_queues:') - t = q.get() - print('from {} : {}'.format(t[0],t[1])) - - - mavlink_object.running = False - mavlink_object.thread.join() - print('End of Program') + analyzer.destroy_node() + rclpy.shutdown() + + # 結束程式 退出所有 thread + mavlink_object3.stop() + mavlink_object3.thread.join() + analyzer.stop() + analyzer.thread.join() + mavlink_socket.close() + print('End of Program .Test 3') if __name__ == '__main__': - main() \ No newline at end of file + import rclpy # 這邊是為了測試而加的 + main_develop()