diff --git a/src/fc_network_adapter/fc_network_adapter/devRun.py b/src/fc_network_adapter/fc_network_adapter/devRun.py new file mode 100644 index 0000000..538938e --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/devRun.py @@ -0,0 +1,166 @@ +# 基礎功能的 import +import queue +import time + +# ROS2 的 import +import rclpy + +# mavlink 的 import +from pymavlink import mavutil + +# 自定義的 import +import mavlinkObject as mo + +# ====================== 分割線 ===================== + +test_item = 3 +print('test_item : ', test_item) + +if test_item == 1: + # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 + # 只啟用了 mavlink_object 的功能 + print('Start of Program .Test 1') + connection_string="udp:127.0.0.1:14550" + mavlink_socket = mavutil.mavlink_connection(connection_string) + + mavlink_object1 = mo.mavlink_object(mavlink_socket) + mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE + mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT + mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD + + # 做一個空的 queue list 驗證 multiplexingToConvert + mo.swap_queues.append(queue.Queue()) + # 啟動連線的模組 + mavlink_object1.run() + + # 運行幾秒並印出 queue 的資料 + start_time = time.time() + while time.time() - start_time < 2: + while not mo.fixed_stream_bridge_queue.empty(): + print('fixed_stream_bridge_queue:') + t = mo.fixed_stream_bridge_queue.get() + print('from {} : {}'.format(t[0],t[2])) + while not mo.return_packet_processor_queue.empty(): + print('return_packet_processor_queue:') + t = mo.return_packet_processor_queue.get() + # print('from {} : {}'.format(t[0],t[2])) + print(t[2].type) + + for q in mo.swap_queues: + while not q.empty(): + print('swap_queues:') + t = q.get() + print('from {} : {}'.format(t[0],t[2])) + + # 結束程式 退出所有 thread + mavlink_object1.running = False + mavlink_object1.thread.join() + mavlink_socket.close() + print('End of Program .Test 1') + +elif test_item == 2: + # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 + # 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能 + print('Start of Program .Test 2') + connection_string="udp:127.0.0.1:14550" + mavlink_socket = mavutil.mavlink_connection(connection_string) + + mavlink_object2 = mo.mavlink_object(mavlink_socket) + mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT + mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT + mavlink_object2.multiplexingToConvert = [] # + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + + # 啟動連線的模組 + mavlink_object2.run() + + start_time = time.time() + show_time = time.time() + compid = 1 + while time.time() - start_time < 5: + if (time.time() - show_time) >= 1: + show_time = time.time() + for sysid in analyzer.mavlink_systems: + print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count) + print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) + print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print("===================") + + # 印出目前所有 mavlink_systems 的內容 + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + + + + # 結束程式 退出所有 thread + mavlink_object2.stop() + mavlink_object2.thread.join() + analyzer.stop() + + analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance + analyzer.thread.join() + mavlink_socket.close() + print('End of Program .Test 2') + +elif test_item == 3: + # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 + print('Start of Program .Test 3') + rclpy.init() # 注意要初始化 rclpy 才能使用 node + connection_string="udp:127.0.0.1:14550" + mavlink_socket = mavutil.mavlink_connection(connection_string) + + mavlink_object3 = mo.mavlink_object(mavlink_socket) + mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT + mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT + mavlink_object3.multiplexingToConvert = [] # + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + + # 關於 Node 的初始化 + show_time = time.time() + analyzer._init_node() # 初始化 node + print('初始化 node 完成 耗時 : ',time.time() - show_time) + + # 啟動連線的模組 + mavlink_object3.run() + + print('waiting for mavlink data ...') + time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + + compid = 1 + sysid = 1 + start_time = time.time() + analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) + end_time = time.time() + print(f"Execution time for create_flightMode: {end_time - start_time} seconds") + + print("start emit info") + + start_time = time.time() + show_time = time.time() + while time.time() - start_time < 10: + try: + # rclpy.spin(analyzer) + analyzer.emit_info() # 這邊是測試 node 的運行 + time.sleep(0.5) + except KeyboardInterrupt: + break + + analyzer.destroy_node() + rclpy.shutdown() + + + # 結束程式 退出所有 thread + mavlink_object3.stop() + mavlink_object3.thread.join() + analyzer.stop() + analyzer.thread.join() + + mavlink_socket.close() + print('End of Program .Test 3') diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py index ba12465..cdb6768 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py @@ -1,9 +1,13 @@ - +# 自定義的 import from theLogger import setup_logger +# ====================== 分割線 ===================== + logger = setup_logger("mavlinkDevice.py") + + # 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component class mavlink_device(): def __init__(self): diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 185bea2..741170f 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -1,9 +1,9 @@ ''' # 不同的匯流排只有單一種通訊協定 -# 匯流排接到訊息後透過 queue stack 傳送到分析器 -# 會有三種 Queue 分別作為 1. 固定串流分析器 2. 回傳封包處理器 3. 轉格式(例如 mavlink to FDM)的轉換器 +# 匯流排接到訊息後透過 queue stack 傳送到橋接器 +# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流 # 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue -# 分析器會將解析得到的結論 再透過 ros2 的 publisher 發送出去 +# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去 # 關於 mavlink 採用 pymavlink 這個套件 製作匯流排 # pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面 @@ -21,21 +21,19 @@ from pymavlink import mavutil # ROS2 的 import from rclpy.node import Node -import std_msgs.msg # 自定義的 import from mavlinkDevice import mavlink_device from mavlinkPublish import mavlink_publisher from theLogger import setup_logger - # ====================== 分割線 ===================== logger = setup_logger("mavlinkObject.py") -fixed_stream_analyzer_queue = queue.Queue() +fixed_stream_bridge_queue = queue.Queue() return_packet_processor_queue = queue.Queue() -converte_format_queues = [] +swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer # 用來記錄每個 system 的資訊 # 資料格式 { sysid : mavlink_device object } @@ -43,18 +41,18 @@ mavlink_systems = {} # ====================== 分割線 ===================== -class mavlink_analyzer(Node, mavlink_publisher): +class mavlink_bridge(Node, mavlink_publisher): ''' - 這個 class 就是 固定串流分析器 - 是用來接收 mavlink 訊息 並進行分析 - 這個地方是針對 fixed_stream_analyzer_queue 的資料做處理的 + 這個 class 就是 固定串流橋接器 + 是用來接收 mavlink 訊息 並進行橋接 + 這個地方是針對 fixed_stream_bridge_queue 的資料做處理的 記錄有 mavlink bus 上有那些 system id 和 component id 為了每個 system id 都有一個對應的 device object 並且看是否有重複 system id 整段代碼包含兩大區塊 thread 和 node - thread 區塊內會對 fixed_stream_analyzer_queue 進行監聽 並且將收到的訊息進行處理 + thread 區塊內會對 fixed_stream_bridge_queue 進行監聽 並且將收到的訊息進行處理 其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object node 區塊則是處理 ros2 的 publisher 和 subscriber 訂閱相關 @@ -69,7 +67,7 @@ class mavlink_analyzer(Node, mavlink_publisher): def __new__(cls, *args, **kwargs): with cls._lock: # 確保多線程環境下只有一個實例被創建 if cls._instance is None: - cls._instance = super(mavlink_analyzer, cls).__new__(cls) + cls._instance = super(mavlink_bridge, cls).__new__(cls) return cls._instance def __init__(self): @@ -85,7 +83,7 @@ class mavlink_analyzer(Node, mavlink_publisher): self.thread = threading.Thread(target=self._run_thread) self.thread.start() else: - logger.error('mavlink_analyzer instance already exists. Do not create another one.') + logger.error('mavlink_bridge instance already exists. Do not create another one.') def stop(self): self.running = False @@ -93,12 +91,12 @@ class mavlink_analyzer(Node, mavlink_publisher): # === Thread 區塊 === def _run_thread(self): # start_time = time.time() # debug - logger.info('Start of mavlink_analyzer._run_thread') - # 從 Queue fixed_stream_analyzer_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 + logger.info('Start of mavlink_bridge._run_thread') + # 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 while self.running: - if fixed_stream_analyzer_queue.empty(): + if fixed_stream_bridge_queue.empty(): continue - msg_pack = fixed_stream_analyzer_queue.get() + msg_pack = fixed_stream_bridge_queue.get() msg = msg_pack[2] sysid = msg.get_srcSystem() @@ -132,12 +130,12 @@ class mavlink_analyzer(Node, mavlink_publisher): logger.warning('This Message Type Did not define process method : {} / {}'.format(msg.get_msgId(), msg.get_type())) continue - logger.info('End of mavlink_analyzer._run_thread') + logger.info('End of mavlink_bridge._run_thread') # === Node 區塊 === def _init_node(self): - logger.info('Start of mavlink_analyzer._init_node') - super().__init__('mavlink_analyzer') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒 + logger.info('Start of mavlink_bridge._init_node') + super().__init__('mavlink_bridge') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒 def emit_info(self): # 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過 @@ -163,14 +161,31 @@ class mavlink_object(): 每個 mavlink bus 都會有一個 mavlink_object 其中主要是 thread 做統計封包與分流 分流表的控制在三個 list 分別為 - multiplexingToAnalysis : 這個 list 是用來分流到固定串流分析器的 + multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的 multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的 - multiplexingToConvert : 這個 list 是用來分流到轉格式的 + multiplexingToConvert : 這個 list 是用來分流到轉拋串流的 透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為 ''' - def __init__(self, mavlink_socket, socket_id = None): - self.socket_id = socket_id - self.mavlink_socket = mavlink_socket + mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } + def __new__(cls, *args, **kwargs): + # 每創建一個實例 就將其添加到 mavlinkObjects 列表中 + # 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號 + # 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號 + # socket_id 從 1 開始 + + instance = super().__new__(cls) + socket_id = 1 + for k in cls.mavlinkObjects.keys(): + if k == socket_id: + socket_id += 1 + else: + break + instance.socket_id = socket_id + cls.mavlinkObjects[socket_id] = instance + return instance + + def __init__(self, socket): + self.mavlink_socket = socket self.msg_count = {} self.multiplexingList = [] @@ -193,15 +208,14 @@ class mavlink_object(): ] self.multiplexingToConvert = [ ] + logger.info('mavlink_object instance {} created'.format(self.socket_id)) def __del__(self): self.mavlink_socket.close() self.stop() + self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance def run(self): - if not self.socket_id: - logger.error('Please set socket id before running') - return self.thread = threading.Thread(target=self._run_thread) self.running = self.updateMultiplexingList() self.thread.start() @@ -214,6 +228,8 @@ class mavlink_object(): start_time = time.time() while self.running: timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差 + + # 處理接收到的封包 try: mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: @@ -223,7 +239,7 @@ class mavlink_object(): self.running = False break - if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega.MAVLink_attitude_message' + if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...' # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 sysid = mavlinkMsg.get_srcSystem() compid = mavlinkMsg.get_srcComponent() @@ -231,19 +247,25 @@ class mavlink_object(): # mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型 mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) - # print(mavlinkMsg) # debug + # print(type(mavlinkMsg.pack)) # debug + # print("mark A") + # # print(dir(mavlinkMsg)) # debug + # print(mavlinkMsg.pack()) # debug # break # Debug # 將訊息依照 multiplexing list 分發到不同的 queue for i in range(len(self.multiplexingList)): if mavlinkMsg.get_msgId() in self.multiplexingList[i]: if i == 0: - fixed_stream_analyzer_queue.put((self.socket_id,timestamp,mavlinkMsg)) + fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg)) elif i == 1: return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg)) else: - convert_queue = converte_format_queues[i-2] - convert_queue.put((self.socket_id,timestamp,mavlinkMsg)) + _queue = swap_queues[i-2] + # _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包 + _queue.put(mavlinkMsg) + + # 處理要送出的封包 # thread 結束 logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id)) @@ -262,7 +284,7 @@ class mavlink_object(): logger.error('Multiplexing List Format Error, Please check the list') return False # 若有錯誤則回傳 False - check = len(self.multiplexingToConvert) != len(converte_format_queues) + check = len(self.multiplexingToConvert) != len(swap_queues) if check: logger.error('Multiplexing Queue not fit List , Please check the list') return False @@ -274,163 +296,6 @@ class mavlink_object(): def main_node(): pass -# ====================== 分割線 ===================== -# 測試時的程式進入點 -def main_develop(args=None): - - test_item = 2 - print('test_item : ', test_item) - - if test_item == 1: - # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 - # 只啟用了 mavlink_object 的功能 - print('Start of Program .Test 1') - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) - - mavlink_object1 = mavlink_object(mavlink_socket, 1) - mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE - mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT - mavlink_object1.multiplexingToConvert = [[74,]] # only VFR_HUD - - # 做一個空的 queue list 驗證 multiplexingToConvert - converte_format_queues.append(queue.Queue()) - # 啟動連線的模組 - mavlink_object1.run() - - # 運行幾秒並印出 queue 的資料 - start_time = time.time() - while time.time() - start_time < 2: - - while not fixed_stream_analyzer_queue.empty(): - print('fixed_stream_analyzer_queue:') - t = fixed_stream_analyzer_queue.get() - print('from {} : {}'.format(t[0],t[2])) - while not return_packet_processor_queue.empty(): - print('return_packet_processor_queue:') - t = return_packet_processor_queue.get() - # print('from {} : {}'.format(t[0],t[2])) - print(t[2].type) - - for q in converte_format_queues: - while not q.empty(): - print('converte_format_queues:') - t = q.get() - print('from {} : {}'.format(t[0],t[2])) - - # 結束程式 退出所有 thread - mavlink_object1.running = False - mavlink_object1.thread.join() - mavlink_socket.close() - print('End of Program .Test 1') - - elif test_item == 2: - # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 - # 啟用 mavlink_object 與 mavlink_analyzer的thread區塊 的功能 - print('Start of Program .Test 2') - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) - - mavlink_object2 = mavlink_object(mavlink_socket, 1) - mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT - mavlink_object2.multiplexingToReturn = [] # only HEARTBEAT - mavlink_object2.multiplexingToConvert = [] # - - # 啟動 mavlink_analyzer - analyzer = mavlink_analyzer() - - # 啟動連線的模組 - mavlink_object2.run() - - start_time = time.time() - show_time = time.time() - compid = 1 - while time.time() - start_time < 5: - if (time.time() - show_time) >= 1: - show_time = time.time() - for sysid in analyzer.mavlink_systems: - print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count) - print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) - print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - print("===================") - - # 印出目前所有 mavlink_systems 的內容 - print('目前所有的系統 : ') - for sysid in mavlink_systems: - print(mavlink_systems[sysid]) - - - - - # 結束程式 退出所有 thread - mavlink_object2.stop() - mavlink_object2.thread.join() - analyzer.stop() - - analyzer = mavlink_analyzer() # 這邊是測試是否只有一個 instance - analyzer.thread.join() - mavlink_socket.close() - print('End of Program .Test 2') - - elif test_item == 3: - # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 - print('Start of Program .Test 3') - rclpy.init() # 注意要初始化 rclpy 才能使用 node - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) - - mavlink_object3 = mavlink_object(mavlink_socket, 1) - mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT - mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT - mavlink_object3.multiplexingToConvert = [] # - - # 啟動 mavlink_analyzer - analyzer = mavlink_analyzer() - - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 啟動連線的模組 - mavlink_object3.run() - - print('waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - compid = 1 - sysid = 1 - start_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - end_time = time.time() - print(f"Execution time for create_flightMode: {end_time - start_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < 20: - try: - # rclpy.spin(analyzer) - analyzer.emit_info() # 這邊是測試 node 的運行 - time.sleep(0.5) - except KeyboardInterrupt: - break - - analyzer.destroy_node() - rclpy.shutdown() - - - # 結束程式 退出所有 thread - mavlink_object3.stop() - mavlink_object3.thread.join() - analyzer.stop() - analyzer.thread.join() - - mavlink_socket.close() - print('End of Program .Test 3') if __name__ == '__main__': - import rclpy # 這邊是為了測試而加的 - main_develop() + pass