diff --git a/README.md b/README.md index 880dda1..26d7903 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,8 @@ Python 1. pymavlink -2. +2. conda-forge 中的 pyserial-asyncio +3. ROS2 1. source ~/ros2_humble/install/setup.bash diff --git a/src/fc_network_adapter/fc_network_adapter/devRun.py b/src/fc_network_adapter/fc_network_adapter/devRun.py deleted file mode 100644 index 110fb61..0000000 --- a/src/fc_network_adapter/fc_network_adapter/devRun.py +++ /dev/null @@ -1,531 +0,0 @@ -# 基礎功能的 import -import queue -import time - -# ROS2 的 import -import rclpy - -# mavlink 的 import -from pymavlink import mavutil - -# 自定義的 import -import mavlinkObject as mo -import mavlinkDevice as md - -# ====================== 分割線 ===================== - -test_item = 51 -running_time = 30 -print('test_item : ', test_item) - -if test_item == 1: - # 測試 mavlink_object 中 updateMultiplexingList 的輸出 - print('===> Start of Program .Test ', test_item) - mavlink_object_none = mo.mavlink_object(None) - - print('=====================') - print('正常範例') - mavlink_object_none.multiplexingToAnalysis = [] - mavlink_object_none.multiplexingToReturn = [] - # mavlink_object_none.multiplexingToSwap = [[]] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - print('multiplexingList : ', mavlink_object_none._multiplexingList) - - print('=====================') - print('錯誤範例 長度不一致') - mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - - print('=====================') - print('正常範例') - mo.swap_queues.append(queue.Queue()) - mo.swap_queues.append(queue.Queue()) - mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5] - mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10] - mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - print('multiplexingList : ', mavlink_object_none._multiplexingList) - - print('=====================') - print('錯誤範例 multiplexingToAnalysis 不可以有 -1') - mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5] - ret = mavlink_object_none.updateMultiplexingList() - print('execute result : ', ret) - - print('<=== End of Program') - -elif test_item == 2: - # 測試 mavlink_object 創建時 socket_id 是否正確 - # 說實在 這個測試項 似乎因為 python 的 GC 機制 會導致難以測試 - print('===> Start of Program .Test ', test_item) - mavlink_object_none1 = mo.mavlink_object(None) - mavlink_object_none2 = mo.mavlink_object(None) - mavlink_object_none3 = mo.mavlink_object(None) - del mavlink_object_none2 - - print(len(mo.swap_queues)) - - mavlink_object_none2 = mo.mavlink_object(None) - print(len(mo.swap_queues)) - - print(mavlink_object_none1._multiplexingList) - - print('<=== End of Program') - -elif test_item == 10: - # 需要開啟一個 ardupilot 的模擬器 - # 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來 - # 只啟用了 mavlink_object 的功能 - print('===> Start of Program .Test ', test_item) - - # 創建一個空的通道 這個通道的 socket_id 是 0 - mavlink_object_none = mo.mavlink_object(None) - - # 創建另一個通道 - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) - mavlink_object1 = mo.mavlink_object(mavlink_socket) - # 設定通道轉發的參數 - mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE - mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT - mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD - - # print(mavlink_object1.multiplexingToSwap) - # print(mo.swap_queues) - - # 啟動通道 - mavlink_object1.run() - - # 確認轉拋的設定有沒有錯 - print("_multiplexingList for mavlink object :", mavlink_object1._multiplexingList) - - # 運行幾秒並印出 queue 的資料 - start_time = time.time() - while time.time() - start_time < running_time: - while not mo.fixed_stream_bridge_queue.empty(): - print('fixed_stream_bridge_queue:') - t = mo.fixed_stream_bridge_queue.get() - print('from {} : {}'.format(t[0],t[2])) - while not mo.return_packet_processor_queue.empty(): - print('return_packet_processor_queue:') - t = mo.return_packet_processor_queue.get() - print('from {} : {}'.format(t[0],t[2])) - # print(t[2].get_msgbuf()) - # print(t[2].type) - - for n in range(len(mo.swap_queues)): - q = mo.swap_queues[n] - while not q.empty(): - print('swap_queues:') - t = q.get() - print('{} gets : {}'.format(n,t)) - - - # 結束程式 退出所有 thread - mavlink_object1.running = False - mavlink_object1.thread.join() - mavlink_socket.close() - print('<=== End of Program') - -elif test_item == 11: - # 需要開啟一個 ardupilot 的模擬器 - # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 - # 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能 - print('===> Start of Program .Test ', test_item) - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - - # 創建通道 - connection_string="udp:127.0.0.1:14550" - mavlink_socket = mavutil.mavlink_connection(connection_string) - mavlink_object2 = mo.mavlink_object(mavlink_socket) - # mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT - # mavlink_object2.multiplexingToReturn = [] - - # 啟動連線的模組 - mavlink_object2.run() - - # 做點延遲 讓 heartbeat 先吃進來 - time.sleep(2) - print("=== connection established! ===") - - # 印出目前所有 mavlink_systems 的內容 - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - start_time = time.time() - show_time = time.time() - compid = 1 - while time.time() - start_time < running_time: - if (time.time() - show_time) >= 1: - show_time = time.time() - for sysid in analyzer.mavlink_systems: - print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count) - print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) - print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - print("===================") - - # 結束程式 退出所有 thread - mavlink_object2.stop() - mavlink_object2.thread.join() - analyzer.stop() - - analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance - analyzer.thread.join() - mavlink_socket.close() - print('<=== End of Program') - -elif test_item == 12: - # 需要開啟一個 ardupilot 的模擬器 與 GCS - # 這邊是測試 mavlink object 作為交換器功能的代碼 - print('===> Start of Program .Test ', test_item) - - # 啟動連線的模組 - analyzer = mo.mavlink_bridge() - - # 初始化輸入通道 - connection_string_in="udp:127.0.0.1:15551" - mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) - mavlink_object_in = mo.mavlink_object(mavlink_socket_in) - mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT - - # 初始化輸出通道 - connection_string_out="udpout:127.0.0.1:14553" - mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) - mavlink_object_out = mo.mavlink_object(mavlink_socket_out) - mavlink_object_out.multiplexingToAnalysis = [0] - - # 做一個空的通道驗證 可以拿來 debug - mavlink_object_none = mo.mavlink_object(None) - - # 讓兩個通道互相傳輸 - mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all - mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all - # mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all - - # 啟動通道 - mavlink_object_in.run() - mavlink_object_out.run() - - # 做點延遲 讓 heartbeat 先吃進來 - time.sleep(3) - print("=== connection established! ===") - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - # print(type(mavlink_socket_out)) - # print(type(mavlink_socket_out.mav)) - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False) - print('none object : ', test) - except queue.Empty: - pass - - if (time.time() - show_time) >= 2: - show_time = time.time() - for sysid in analyzer.mavlink_systems: - for compid in analyzer.mavlink_systems[sysid].components: - print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) - analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - print("===================") - - - - # 結束程式 退出所有 thread - mavlink_object_in.stop() - mavlink_object_in.thread.join() - mavlink_object_out.stop() - mavlink_object_out.thread.join() - mavlink_socket_in.close() - mavlink_socket_out.close() - - analyzer.stop() - print('<=== End of Program') - -elif test_item == 20: - # 這邊測試 node 生成 topic 的功能 - # 利用 空的通道 發出假的 heartbeat 封包 - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - from pymavlink.dialects.v20 import common as mavlink2 - - sysid = 1 - compid = 1 - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - - mav = mavlink2.MAVLink(None) - mav.srcSystem = sysid - mav.srcComponent = compid - - # 這是一個 heartbeat 封包 - fake_heartbeat = mavlink2.MAVLink_heartbeat_message( - type=mavlink2.MAV_TYPE_QUADROTOR, - autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, - base_mode=3, - custom_mode=0, - system_status=0, - mavlink_version=3 - ) - mavlink_bytes = fake_heartbeat.pack(mav) - fake_heartbeat_msg = mav.parse_char(mavlink_bytes) - - print(analyzer.mavlink_systems) - - mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data - time.sleep(0.1) - print(analyzer.mavlink_systems) - - # ROS2 初始化 - analyzer._init_node() - - # 創建 ROS2 topic - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid - print("topic created") - time.sleep(5) - - # 丟出 topic - analyzer.emit_info() - - # 結束程式 - analyzer.running = False - analyzer.destroy_node() - - rclpy.shutdown() - analyzer.stop() - analyzer.thread.join() - print('<=== End of Program') - -elif test_item == 21: - # 需要開啟一個 ardupilot 的模擬器 - # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 - - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 創建通道 - connection_string="udp:127.0.0.1:15551" - mavlink_socket = mavutil.mavlink_connection(connection_string) - mavlink_object3 = mo.mavlink_object(mavlink_socket) - # 啟動通道 - mavlink_object3.run() - - - print('=== waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - compid = 1 - sysid = 1 - start_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - end_time = time.time() - print(f"Execution time for create_flightMode: {end_time - start_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - # rclpy.spin(analyzer) - analyzer.emit_info() # 這邊是測試 node 的運行 - time.sleep(1) - except KeyboardInterrupt: - break - - - # 程式結束 - analyzer.destroy_node() - rclpy.shutdown() - - # 結束程式 退出所有 thread - mavlink_object3.stop() - mavlink_object3.thread.join() - analyzer.stop() - analyzer.thread.join() - - mavlink_socket.close() - print('<=== End of Program') - -elif test_item == 22: - # 需要開啟一個 ardupilot 的模擬器 與 GCS - # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 初始化兩個通道 - connection_string_in="udp:127.0.0.1:15551" - mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) - mavlink_object_in = mo.mavlink_object(mavlink_socket_in) - mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] - - - connection_string_out="udpout:127.0.0.1:14553" - mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) - mavlink_object_out = mo.mavlink_object(mavlink_socket_out) - mavlink_object_out.multiplexingToAnalysis = [] - - - - # 讓兩個通道互相傳輸 - mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all - mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all - - # 啟動通道 - mavlink_object_in.run() - mavlink_object_out.run() - - print('waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - print('目前所有的系統 : ') - for sysid in analyzer.mavlink_systems: - print(analyzer.mavlink_systems[sysid]) - - compid = 1 - sysid = 1 - show_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - print(f"Execution time for create_flightMode: {time.time() - show_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - show_time2 = time.time() - - while time.time() - start_time < running_time: - if (time.time() - show_time2) >= 1: - analyzer.emit_info() # 這邊是測試 node 的運行 - # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode'] - fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode'] - sss = mavutil.mode_string_v10(fmsg) - # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq())) - print("目前的飛行模式 : {}".format(sss)) - show_time2 = time.time() - - # if (time.time() - show_time) >= 2: - # show_time = time.time() - # for sysid in analyzer.mavlink_systems: - # for compid in analyzer.mavlink_systems[sysid].components: - # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) - # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) - # print("===================") - - - - analyzer.destroy_node() - rclpy.shutdown() - - - # 結束程式 退出所有 thread - mavlink_object_in.stop() - mavlink_object_in.thread.join() - mavlink_socket_in.close() - mavlink_object_out.stop() - mavlink_object_out.thread.join() - mavlink_socket_out.close() - analyzer.stop() - analyzer.thread.join() - - - print('<=== End of Program') - -elif test_item == 51: - - # 晉凱的測試項目 - print('===> Start of Program .Test ', test_item) - rclpy.init() # 注意要初始化 rclpy 才能使用 node - - # 啟動 mavlink_bridge - analyzer = mo.mavlink_bridge() - # 關於 Node 的初始化 - show_time = time.time() - analyzer._init_node() # 初始化 node - print('初始化 node 完成 耗時 : ',time.time() - show_time) - - # 創建通道 - connection_string="udp:127.0.0.1:15551" - mavlink_socket3 = mavutil.mavlink_connection(connection_string) - mavlink_object3 = mo.mavlink_object(mavlink_socket3) - # 設定通道流動 - mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] - mavlink_object3.multiplexingToReturn = [] # - # mavlink_object3.multiplexingToSwap = [] # - # 啟動通道 - mavlink_object3.run() - - - - print('waiting for mavlink data ...') - time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 - - compid = 1 - sysid = 1 - start_time = time.time() - analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) - analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) - end_time = time.time() - print(f"Execution time for create all topic: {end_time - start_time} seconds") - - print("start emit info") - - start_time = time.time() - show_time = time.time() - while time.time() - start_time < running_time: - try: - # rclpy.spin(analyzer) - analyzer.emit_info() # 這邊是測試 node 的運行 - time.sleep(0.5) - except KeyboardInterrupt: - break - - analyzer.destroy_node() - rclpy.shutdown() - - # 結束程式 退出所有 thread - mavlink_object3.stop() - mavlink_object3.thread.join() - analyzer.stop() - analyzer.thread.join() - - mavlink_socket3.close() - print('<=== End of Program') \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py index 4b2b07e..47a1c92 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py @@ -1,6 +1,6 @@ # 自定義的 import -from theLogger import setup_logger +from .theLogger import setup_logger # ====================== 分割線 ===================== @@ -31,6 +31,11 @@ class mavlink_device(): # p_str += '=====================\n' return p_str + ''' + 寫了半天 這個功能應該是 pymalink 本來就有的 + 去找 pymavlink_util.py 的 mavfile(object) + 算了 先擺著吧 之後再看看怎麼整合 + ''' def updateComponentPacketCount(self, compid, current_seq, current_type, current_time): # 這段檢查遺失封包 try: diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 072d861..d0231ed 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -1,8 +1,7 @@ ''' # 不同的匯流排只有單一種通訊協定 -# 匯流排接到訊息後透過 queue stack 傳送到橋接器 -# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流 -# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue +# 匯流排接到訊息後透過 ring buffer 傳送到橋接器 +# 會有兩種 RingBuffer 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 # 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去 # 關於 mavlink 採用 pymavlink 這個套件 製作匯流排 @@ -13,8 +12,11 @@ # 基礎功能的 import import threading -import queue +# import queue import time +import asyncio +from enum import Enum, auto +# from typing import Dict, List, Optional, Set, Any, Tuple # mavlink 的 import from pymavlink import mavutil @@ -23,38 +25,17 @@ from pymavlink import mavutil from rclpy.node import Node # 自定義的 import -from mavlinkDevice import mavlink_device -from mavlinkPublish import mavlink_publisher -from theLogger import setup_logger - +from .mavlinkDevice import mavlink_device, mavlink_systems +from .mavlinkPublish import mavlink_publisher +from .theLogger import setup_logger +from .ringBuffer import RingBuffer # ====================== 分割線 ===================== logger = setup_logger("mavlinkObject.py") -# 基礎功能的 import -import threading -import queue -import time - -# mavlink 的 import -from pymavlink import mavutil - -# ROS2 的 import -from rclpy.node import Node - -# 自定義的 import -from mavlinkDevice import mavlink_device, mavlink_systems -from mavlinkPublish import mavlink_publisher -from theLogger import setup_logger - -# ====================== 分割線 ===================== - -logger = setup_logger("mavlinkObject.py") - -fixed_stream_bridge_queue = queue.Queue() -return_packet_processor_queue = queue.Queue() -swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer +stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255) +return_packet_ring = RingBuffer(capacity=256, buffer_id=254) # ====================== 分割線 ===================== @@ -75,6 +56,8 @@ class mavlink_bridge(Node, mavlink_publisher): node 區塊則是處理 ros2 的 publisher 和 subscriber 訂閱相關 藉由控制 ros2 的機制再把 device object 的資訊發送出去 + fixed_stream_bridge_queue 置換成 stream_bridge_ring + ps. 我限制了這個 class 只能有一個 instance ''' _instance = None @@ -106,13 +89,25 @@ class mavlink_bridge(Node, mavlink_publisher): # === Thread 區塊 === def _run_thread(self): - # start_time = time.time() # debug + start_time = time.time() # debug + run_loop_count = 0 # debug logger.info('Start of mavlink_bridge._run_thread') - # 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 + # 從 Queue stream_bridge_ring 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理 while self.running: - if fixed_stream_bridge_queue.empty(): + # # 這個迴圈每秒鐘被執行了幾輪? # 這整段都是 debug 用的 + # current_time = time.time() + # if (current_time - start_time) >= 1.0: + # logger.info(f'mavlink_bridge._run_thread loop count: {run_loop_count}') + # run_loop_count = 0 + # start_time = current_time + # else: + # run_loop_count += 1 + # # ========================= + + + if stream_bridge_ring.is_empty(): continue - msg_pack = fixed_stream_bridge_queue.get() + msg_pack = stream_bridge_ring.get() msg = msg_pack[2] sysid = msg.get_srcSystem() @@ -150,7 +145,7 @@ class mavlink_bridge(Node, mavlink_publisher): this_component.emitParams['flightMode'] = msg # debug # print("mav_type : ", msg.type) # debug - # print("get mode :", mavutil.mode_string_v10(msg)) # debug + print("get mode :", mavutil.mode_string_v10(msg)) # debug # print("record mode :", this_component.emitParams['flightMode_mode']) # debug elif msg_id == 30: # ATTITUDE 處理 @@ -201,24 +196,45 @@ class mavlink_bridge(Node, mavlink_publisher): # ====================== 分割線 ===================== -class mavlink_object(): +''' +本次改版的暫時記錄註解於此 +1. mavlink_object 中 由於 Queue 的效能太差 會完全移除 + 其中 multiplexingToAnalysis multiplexingToReturn 的功能會改用 ring_buffer 來實現 + 而 multiplexingToSwap 會完全被移除代替方式下一條描述 +2. mavlink_object 會捨棄每個通道單獨 thread 的實現 轉而採用 asyncio 的方式 將需要資料轉換的通道 以群組方式處理其數據流 + (註解: 因為本專案的規模還不大 目前不做動態分配 asyncio thread 而是簡單的採用單一 thread 處理所有的 mavlink_object) + 因此 資料轉換直接使用通道的 socket 寫出 進而節省任何資料的複製與搬移 + 並且 完全捨棄 multiplexingToSwap 不會再對需要轉傳的資料進行過濾 而是將全部的 mavlink_msg 直接 socket 透過寫出 +3. mavlink_object 需要加上 state 去管理其狀態 +4. mavlink_object 需要加上 target port 去管理寫出的目標 +5. mavlink_object 要容忍髒資料流入 而不是直接關閉通道 +6. 基於第1,2項 updateMultiplexingList 會被完全移除 +7. 基於第2項 需要新建一個 async_io_manager 類別去管理所有的 mavlink_object +8. 基於第1項全域變數 fixed_stream_bridge_queue return_packet_processor_queue 會用 stream_bridge_ring 與 return_packet_ring 來取代 另外 swap_queues 會被完全移除 + +''' + +# 定義 mavlink_object 的狀態 +class MavlinkObjectState(Enum): + INIT = auto() # 初始化狀態 + RUNNING = auto() # 運行中狀態 + ERROR = auto() # 錯誤狀態 + CLOSED = auto() # 已關閉狀態 + +class mavlink_object: ''' 每個 mavlink bus 都會有一個 mavlink_object - 其中主要是 thread 做統計封包與分流 - 分流表的控制在三個 list 分別為 - multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的 - multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的 - multiplexingToSwap : 這個 list 是用來分流到轉拋串流的 - 透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為 + 使用 asyncio 處理資料流 用 RingBuffer 來分配訊息 + 直接透過 socket 寫出 ''' - mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } - socket_num = 0 # 用來記錄目前的 socket 數量 + mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) } + socket_num = 0 # 用來記錄目前的 socket 數量 + def __new__(cls, *args, **kwargs): # 每創建一個實例 就將其添加到 mavlinkObjects 列表中 # 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號 # 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號 # socket_id 從 0 開始 - instance = super().__new__(cls) socket_id = 0 for k in cls.mavlinkObjects.keys(): @@ -232,183 +248,389 @@ class mavlink_object(): return instance def __init__(self, socket): + # 登入所需的 socket self.mavlink_socket = socket - # 這邊變數是執行的時候被使用的 不要直接寫入它 - self._multiplexingList = [] - # 存放要發送的訊息的 queue 或稱 buffer - self.output_buffer = queue.Queue() - if self.socket_id >= len(swap_queues): - swap_queues.append(self.output_buffer) - else: - swap_queues[self.socket_id] = self.output_buffer - + + # 存放目前是否分流到固定串流橋接器和回傳封包處理器的標誌 # 都是要做的 把這個判斷拿掉 + # self.send_to_bridge = True + # self.send_to_return = False + + # 用於主線程發送消息的緩衝區 + self.outgoing_msg_lock = threading.Lock() + self.outgoing_msgs = [] + + # 記錄訊息過濾類型 (可選) + self.bridge_msg_types = [0] # 默認只處理 HEARTBEAT + self.return_msg_types = [] + + # 目標端口 + self.target_sockets = [] + # 關聯到全域變數 global mavlink_systems self.mavlink_systems = mavlink_systems - - # 這三個 list 用來分配不同的訊息到不同的 queue - self.multiplexingToAnalysis = [ - 0, # HEARTBEAT # 挺必要的項目 - # 24, # GPS_RAW_INT - # 30, # ATTITUDE - # 33, # GLOBAL_POSITION_INT - # 74, # VFR_HUD - # 147, # BATTERY_STATUS - ] - self.multiplexingToReturn = [] - self.multiplexingToSwap = [ - [] for _ in range(len(swap_queues)) - ] - - # 刷新其他 mavlink_object 的 multiplexingToSwap - for k, object in self.mavlinkObjects.items(): - if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id): - object.multiplexingToSwap.append([]) - object.updateMultiplexingList() - - logger.info('mavlink_object instance {} created'.format(self.socket_id)) + + # 物件變數 + self.task = None # Task reference + self.dirtyDataCount = 0 # 髒資料計數器 + self.dirtyDataMax = 10 # 髒資料容許閾值 + + self.state = MavlinkObjectState.INIT + logger.info(f'mavlink_object instance {self.socket_id} created') def __del__(self): - # 停下自己的 thread - if self.mavlink_socket != None: - self.mavlink_socket.close() - self.stop() - - # 移除其他 mavlink_object 的 multiplexingToSwap - for k, object in self.mavlinkObjects.items(): - if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id): - object.multiplexingToSwap[self.socket_id] = [] - object.updateMultiplexingList() - - # 移除自己的 swap_queues - swap_queues[self.socket_id] = None - + # 停止 asyncio task + self.stop() + + # 關閉 socket + if hasattr(self, 'mavlink_socket') and self.mavlink_socket: + try: + self.mavlink_socket.close() + except: + pass + # 處理 class 的 instance 記錄 - self.socket_num -= 1 - self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance - - # logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替 - # print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug - # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + if hasattr(self.__class__, 'socket_num'): + self.__class__.socket_num -= 1 + + if hasattr(self.__class__, 'mavlinkObjects') and hasattr(self, 'socket_id'): + if self.socket_id in self.__class__.mavlinkObjects: + self.__class__.mavlinkObjects.pop(self.socket_id) + + # 這段不知道怎麼了 反正會一直讓 logger ERROR 我先關掉 # try: - # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) + # logger.info(f'mavlink_object instance {self.socket_id} deleted') # except Exception as e: # print(f"Error logging in __del__: {e}") - # if 'logger' in globals() and logger: - # logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) - def run(self): - # TODO 檢查 socket 是否有連線 - self.thread = threading.Thread(target=self._run_thread) - self.running = self.updateMultiplexingList() - self.thread.start() + def start(self): + """啟動 mavlink_object 處理循環""" + if self.state == MavlinkObjectState.RUNNING: + logger.warning(f"mavlink_object {self.socket_id} is already running") + return + + self.state = MavlinkObjectState.RUNNING + # 實際的啟動會由 async_io_manager 處理 def stop(self): - self.running = False - - def _run_thread(self): - logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) - logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) - start_time = time.time() - while self.running: - timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差 - - # 處理接收到的封包 + """停止 mavlink_object 處理循環""" + if self.state in (MavlinkObjectState.CLOSED, MavlinkObjectState.ERROR): + return + + self.state = MavlinkObjectState.CLOSED + if self.task: + if not self.task.done(): + self.task.cancel() + + async def process_data(self): + """處理 mavlink 數據的主要 asyncio 協程""" + logger.info(f'Start of mavlink_object.process_data id: {self.socket_id}') + + while self.state == MavlinkObjectState.RUNNING: + timestamp = time.time() try: + # 處理接收到的封包 mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: - logger.critical(f"Receiving data not mavlink format. Object Delete.") - logger.critical(f"Receiving data not mavlink format. Object Delete.") - print(f"[mavlinkObject.py] Error receiving mavlink data: {e}") - print(mavlinkMsg) - self.running = False - break - - if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...' - # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 - sysid = mavlinkMsg.get_srcSystem() - compid = mavlinkMsg.get_srcComponent() - if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 - # mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型 - mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) - - # break # Debug function can put here you can see the input data from mavlink - - # 將訊息依照 multiplexing list 分發到不同的 queue - for i in range(len(self._multiplexingList)): - if (self._multiplexingList[i] == []): - continue - elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1): - if i == 0: - fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg)) - elif i == 1: - return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg)) - else: - _queue = swap_queues[i-2] - # _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包 - _queue.put(mavlinkMsg) - - # 處理要送出的封包 - # 如果 有資料在 output_buffer 中則將其取出並發送 - # 沒有就略過發送 + logger.warning(f"Error in mavlink_object.process_data for id {self.socket_id}: {e}") + self.dirtyDataCount += 1 + + if self.dirtyDataCount >= self.dirtyDataMax: + logger.error(f"Too many dirty data received in mavlink_object {self.socket_id}, entering ERROR state") + self.state = MavlinkObjectState.ERROR + # 不直接退出,嘗試容忍髒數據 + await asyncio.sleep(0.01) # 短暫暫停 + continue + try: - mavlinkMsg_send = self.output_buffer.get(block=False) - except queue.Empty: - mavlinkMsg_send = None - # except Exception as e: - # logger.error(f"Error getting data from output_buffer: {e}") - # mavlinkMsg_send = None - - if mavlinkMsg_send: - # self.mavlink_socket.mav.send(mavlinkMsg_send) - self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中 - - # thread 結束 - logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id)) - - def updateMultiplexingList(self): - ''' - 更新 multiplexing list 並做簡單的檢查 - ''' - # 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值 - check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn) - if check: - logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id)) - return False - - # 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0 - check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0 - if check: - logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id)) + if mavlinkMsg: + # 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間 + sysid = mavlinkMsg.get_srcSystem() + compid = mavlinkMsg.get_srcComponent() + + if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息 + self.mavlink_systems[sysid].updateComponentPacketCount( + compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp) #TODO 這邊怪怪的 好像要再檢查 + + # 分發訊息到 RingBuffer + msg_id = mavlinkMsg.get_msgId() + + # if self.send_to_bridge and (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): + if (msg_id in self.bridge_msg_types or -1 in self.bridge_msg_types): + stream_bridge_ring.put((self.socket_id, timestamp, mavlinkMsg)) + + # if self.send_to_return and (msg_id in self.return_msg_types or -1 in self.return_msg_types): + if (msg_id in self.return_msg_types or -1 in self.return_msg_types): + return_packet_ring.put((self.socket_id, timestamp, mavlinkMsg)) + + # 將接收到的訊息轉發給所有目標端口 + for target_port in self.target_sockets: + if target_port != self.socket_id and target_port in self.mavlinkObjects: + target_obj = self.mavlinkObjects[target_port] + if target_obj.state == MavlinkObjectState.RUNNING: + try: + target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf()) + except Exception as e: + logger.error(f"Error forwarding message to port {target_port}: {e}") + + with self.outgoing_msg_lock: + if self.outgoing_messages and (send_msg := self.outgoing_messages.pop(0)): + try: + self.mavlink_socket.write(send_msg) + except Exception as e: + logger.error(f"mavlink_object {self.socket_id} failed to send message: {e}") + + # 這邊的重點不是延遲 而是透過 await 讓出 event loop 的控制權 + await asyncio.sleep(0.001) + + except asyncio.CancelledError: + logger.info(f'mavlink_object.process_data for id {self.socket_id} was cancelled') + break + except Exception as e: + await asyncio.sleep(0.01) # 短暫暫停避免CPU過載 + + + logger.info(f'End of mavlink_object.process_data id: {self.socket_id}') + self.state = MavlinkObjectState.CLOSED + + def add_target_socket(self, target_socket_id): + """添加一個目標端口用於轉發""" + if target_socket_id not in self.target_sockets and target_socket_id != self.socket_id: + self.target_sockets.append(target_socket_id) + logger.info(f"mavlink_object Added target port {target_socket_id} to mavlink_object {self.socket_id}") + return True + return False + + def remove_target_socket(self, target_socket_id): + """移除目標端口""" + if target_socket_id in self.target_sockets: + self.target_sockets.remove(target_socket_id) + logger.info(f"mavlink_object Removed target port {target_socket_id} from mavlink_object {self.socket_id}") + return True + return False + + def set_bridge_message_types(self, msg_types): + """設置需要分流到橋接器的訊息類型""" + if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types): + self.bridge_msg_types = msg_types + return True + logger.error(f"Invalid bridge message types: {msg_types}") + return False + + def set_return_message_types(self, msg_types): + """設置需要分流到回傳處理器的訊息類型""" + if isinstance(msg_types, list) and all(isinstance(t, int) for t in msg_types): + self.return_msg_types = msg_types + return True + logger.error(f"Invalid return message types: {msg_types}") + return False + + def send_message(self, message_bytes): + """ + 從主線程向此 mavlink_object 的 socket 發送數據 + 將數據添加到簡單的列表中,由 asyncio 任務處理 + + Args: + message_bytes: 要發送的 mavlink 消息的字節數據 + + Returns: + bool: 是否成功添加消息到列表 + """ + if self.state != MavlinkObjectState.RUNNING: + logger.warning(f"Cannot send message: mavlink_object {self.socket_id} is not running") return False - - # 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋 + try: - self.multiplexingToSwap[self.socket_id] = [] - except IndexError: - logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id)) - return False - - # 組合 multiplexing list - multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap - - # 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int - check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp) - if not check: - logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id)) - return False # 若有錯誤則回傳 False - - # 更新 multiplexing list - self._multiplexingList = multiL_tmp + # 使用鎖保護共享資源訪問 + with self.outgoing_messages_lock: + self.outgoing_messages.append(message_bytes) + return True + except Exception as e: + logger.error(f"Error queueing message for mavlink_object {self.socket_id}: {e}") + return False + + # def enable_bridge(self, enable=True): + # """啟用或禁用橋接器分流""" + # self.send_to_bridge = enable + + # def enable_return(self, enable=True): + # """啟用或禁用回傳處理器分流""" + # self.send_to_return = enable + +class async_io_manager: + """ + 管理所有 mavlink_object 實例的 asyncio 任務 + 提供單一線程來處理所有 mavlink 通道的數據 + """ + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + with cls._lock: + if cls._instance is None: + cls._instance = super(async_io_manager, cls).__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, 'initialized'): + self.initialized = True + self.loop = None + self.main_task = None + self.running = False + self.managed_objects = {} # socket_id: task + + def start(self): + """啟動 async_io_manager 和其管理的所有 mavlink_object""" + if self.running: + logger.warning("async_io_manager already running") + return + + self.running = True + self.thread = threading.Thread(target=self._run_event_loop) + self.thread.start() + logger.info("async_io_manager started") + + def stop(self): + """停止 async_io_manager 和其管理的所有 mavlink_object""" + if not self.running: + return + + self.running = False + + if self.loop: + # 取消所有任務 + for socket_id in list(self.managed_objects.keys()): + self.remove_mavlink_object(socket_id) + + # 停止事件循環 + if self.main_task and not self.main_task.done(): + asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop) + + # 等待線程結束 + if hasattr(self, 'thread') and self.thread.is_alive(): + self.thread.join(timeout=5.0) + + logger.info("async_io_manager stopped") + + async def _shutdown(self): + """正確關閉事件循環的協程""" + tasks = [task for task in asyncio.all_tasks(self.loop) if task is not asyncio.current_task()] + + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + self.loop.stop() + + def _run_event_loop(self): + """在獨立線程中運行事件循環""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + try: + self.main_task = self.loop.create_task(self._main_task()) + self.loop.run_forever() + except Exception as e: + logger.error(f"Error in async_io_manager event loop: {e}") + finally: + self.loop.close() + self.loop = None + self.running = False + logger.info("async_io_manager event loop ended") + + async def _main_task(self): + """主任務協程,用於監視和管理其他任務""" + logger.info("async_io_manager main task started") + + try: + while self.running: + # 這邊就是一個無窮迴圈 確保 async_io_manager 物件續存 + # 每秒鐘檢查並移除已完成或已取消的任務 + await asyncio.sleep(1.0) + for socket_id in list(self.managed_objects.keys()): + task = self.managed_objects[socket_id] + + if task.done(): + try: + exc = task.exception() + if exc: + logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}") + except (asyncio.CancelledError, asyncio.InvalidStateError): + pass + + if socket_id in mavlink_object.mavlinkObjects: + print(f"this is manager, i make socket_id {socket_id} closed, he is now in {mavlink_object.mavlinkObjects[socket_id].state} state.") # debug + mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED + + del self.managed_objects[socket_id] + + + + except asyncio.CancelledError: + logger.info("async_io_manager main task was cancelled") + except Exception as e: + logger.error(f"Error in async_io_manager main task: {e}") + + logger.info("async_io_manager main task ended") + + def add_mavlink_object(self, mavlink_obj): + """添加一個 mavlink_object 實例到 async_io_manager 並啟動其處理任務""" + if not self.running: + logger.error("Cannot add mavlink_object: async_io_manager is not running") + return False + + if not isinstance(mavlink_obj, mavlink_object): + logger.error(f"Invalid mavlink_object: {mavlink_obj}") + return False + + socket_id = mavlink_obj.socket_id + + if socket_id in self.managed_objects: + logger.warning(f"mavlink_object {socket_id} already managed") + return False + + # 創建並啟動任務 + if self.loop: + # task = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop).result() + future = asyncio.run_coroutine_threadsafe(mavlink_obj.process_data(), self.loop) + + # print(f"Task created for mavlink_object {socket_id}: {future1}") # debug + # task = future1.result() + self.managed_objects[socket_id] = future + mavlink_obj.task = future + mavlink_obj.state = MavlinkObjectState.RUNNING + logger.info(f"Added mavlink_object {socket_id} to async_io_manager") + return True + + return False + + def remove_mavlink_object(self, socket_id): + """從 async_io_manager 中移除一個 mavlink_object 實例並取消其處理任務""" + if socket_id not in self.managed_objects: + logger.warning(f"mavlink_object {socket_id} not managed by async_io_manager") + return False + + # 取消任務 + task = self.managed_objects[socket_id] + if not task.done(): + task.cancel() + + # 從管理列表中移除 + del self.managed_objects[socket_id] + + # 更新 mavlink_object 狀態 + if socket_id in mavlink_object.mavlinkObjects: + mavlink_object.mavlinkObjects[socket_id].state = MavlinkObjectState.CLOSED + + logger.info(f"Removed mavlink_object {socket_id} from async_io_manager") return True + + def get_managed_objects(self): + """獲取所有被管理的對象列表""" + return list(self.managed_objects.keys()) # ====================== 分割線 ===================== -# 整合到 ros2 之後的程式進入點 -def main_node(): - pass - - if __name__ == '__main__': pass diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py index 028e3ee..727e47f 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py @@ -9,14 +9,16 @@ publisher topic name 命名規則為 <前綴詞>/s/<具體 topic> ''' +# ROS2 的 import import std_msgs.msg import sensor_msgs.msg import geometry_msgs.msg import mavros_msgs.msg -from theLogger import setup_logger import math +# 自定義的 import +from .theLogger import setup_logger logger = setup_logger("mavlinkPublish.py") diff --git a/src/fc_network_adapter/fc_network_adapter/ringBuffer.py b/src/fc_network_adapter/fc_network_adapter/ringBuffer.py new file mode 100644 index 0000000..6728426 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/ringBuffer.py @@ -0,0 +1,231 @@ +# import array +import threading +import ctypes +from typing import Any, Optional, Tuple + +class RingBuffer: + """ + 高效能無鎖環形緩衝區,使用原子操作避免鎖定 + 用於在不同執行緒間高效傳遞資料 + """ + # 緩衝區計數器,用於自動分配 buffer_id + _buffer_counter = 0 + _counter_lock = threading.Lock() + + def __init__(self, capacity: int = 256, buffer_id: int = None): + """ + 初始化環形緩衝區 + + Args: + capacity: 緩衝區容量 (必須是 2 的次方) + buffer_id: 緩衝區 ID,如果為 None 則自動分配 + """ + # 確保容量是 2 的次方,便於使用位運算取模 + if capacity & (capacity - 1) != 0: + # 找到大於等於 capacity 的最小 2 的次方 + capacity = 1 << (capacity - 1).bit_length() + + # 分配緩衝區 ID + if buffer_id is None: + with RingBuffer._counter_lock: + buffer_id = RingBuffer._buffer_counter + RingBuffer._buffer_counter += 1 + + self.buffer_id = buffer_id + self.capacity = capacity + self.mask = capacity - 1 # 用於快速取模 + self.buffer = [None] * capacity + + # 原子計數器作為讀寫指標 + self.write_index = ctypes.c_uint64(0) + self.read_index = ctypes.c_uint64(0) + + # 用於檢測上次操作的執行緒 ID + self._last_read_thread = None + self._last_write_thread = None + + # 添加同時讀寫統計 + self.concurrent_write_count = ctypes.c_uint64(0) # 同時寫入計數 + self.concurrent_read_count = ctypes.c_uint64(0) # 同時讀取計數 + self.total_write_count = ctypes.c_uint64(0) # 總寫入操作計數 + self.total_read_count = ctypes.c_uint64(0) # 總讀取操作計數 + self.overflow_count = ctypes.c_uint64(0) # 緩衝區溢出次數 + + # 記錄各執行緒的操作次數 + self.thread_write_counts = {} # {thread_id: count} + self.thread_read_counts = {} # {thread_id: count} + + # 用於保護統計數據的鎖(僅用於統計,不影響主要讀寫操作) + self._stats_lock = threading.Lock() + + def put(self, item: Any) -> bool: + """ + 將項目存入緩衝區 + + Args: + item: 要存入的項目 + + Returns: + bool: 成功寫入返回 True,緩衝區已滿返回 False + """ + # 更新寫入統計 + self.total_write_count.value += 1 + + # 檢測上次寫入的執行緒 + current_thread = threading.get_ident() + + # 記錄當前執行緒寫入次數 + with self._stats_lock: + self.thread_write_counts[current_thread] = self.thread_write_counts.get(current_thread, 0) + 1 + + # 檢測是否為不同執行緒同時寫入 + if self._last_write_thread is not None and current_thread != self._last_write_thread: + self.concurrent_write_count.value += 1 + + self._last_write_thread = current_thread + + # 原子獲取當前寫入位置 + current = self.write_index.value + next_pos = (current + 1) & self.mask + + # 檢查緩衝區是否已滿 + if next_pos == self.read_index.value: + self.overflow_count.value += 1 + return False + + # 寫入資料並原子更新寫入位置 + self.buffer[current] = item + self.write_index.value = next_pos + return True + + def get(self) -> Optional[Any]: + """ + 從緩衝區讀取項目 + + Returns: + 項目或 None(如果緩衝區為空) + """ + # 更新讀取統計 + self.total_read_count.value += 1 + + # 檢測上次讀取的執行緒 + current_thread = threading.get_ident() + + # 記錄當前執行緒讀取次數 + with self._stats_lock: + self.thread_read_counts[current_thread] = self.thread_read_counts.get(current_thread, 0) + 1 + + # 檢測是否為不同執行緒同時讀取 + if self._last_read_thread is not None and current_thread != self._last_read_thread: + self.concurrent_read_count.value += 1 + + self._last_read_thread = current_thread + + # 檢查緩衝區是否為空 + if self.read_index.value == self.write_index.value: + return None + + # 原子獲取當前讀取位置並讀取資料 + current = self.read_index.value + item = self.buffer[current] + + # 原子更新讀取位置 + self.read_index.value = (current + 1) & self.mask + return item + + def get_all(self) -> list: + """ + 獲取緩衝區中的所有項目 + + Returns: + list: 所有可用項目的列表 + """ + items = [] + while True: + item = self.get() + if item is None: + break + items.append(item) + return items + + def size(self) -> int: + # 返回目前緩衝區內項目數量 + return (self.write_index.value - self.read_index.value) & self.mask + + def is_empty(self) -> bool: + # 檢查緩衝區是否為空 + return self.read_index.value == self.write_index.value + + def is_full(self) -> bool: + # 檢查緩衝區是否已滿 + return ((self.write_index.value + 1) & self.mask) == self.read_index.value + + def clear(self) -> None: + """清空緩衝區""" + self.read_index.value = self.write_index.value + + def get_stats(self) -> dict: + """ + 獲取緩衝區統計資訊 + + Returns: + dict: 包含各種統計數據的字典 + """ + with self._stats_lock: + stats = { + "buffer_id": self.buffer_id, + "capacity": self.capacity, + "current_size": self.size(), + "is_full": self.is_full(), + "is_empty": self.is_empty(), + "total_writes": self.total_write_count.value, + "total_reads": self.total_read_count.value, + "concurrent_writes": self.concurrent_write_count.value, + "concurrent_reads": self.concurrent_read_count.value, + "overflow_count": self.overflow_count.value, + "write_threads": len(self.thread_write_counts), + "read_threads": len(self.thread_read_counts), + "concurrent_write_ratio": self.concurrent_write_count.value / max(1, self.total_write_count.value), + "concurrent_read_ratio": self.concurrent_read_count.value / max(1, self.total_read_count.value), + "top_writer_threads": sorted(self.thread_write_counts.items(), key=lambda x: x[1], reverse=True)[:3], + "top_reader_threads": sorted(self.thread_read_counts.items(), key=lambda x: x[1], reverse=True)[:3], + } + return stats + + def print_stats(self) -> None: + """輸出當前緩衝區統計資訊""" + stats = self.get_stats() + + print(f"\n=== RingBuffer #{stats['buffer_id']} Statistics ===") + print(f"Capacity: {stats['capacity']}, Current Size: {stats['current_size']}") + print(f"Total Write Operations: {stats['total_writes']}") + print(f"Total Read Operations: {stats['total_reads']}") + print(f"Concurrent Write Events: {stats['concurrent_writes']} ({stats['concurrent_write_ratio']:.2%})") + print(f"Concurrent Read Events: {stats['concurrent_reads']} ({stats['concurrent_read_ratio']:.2%})") + print(f"Buffer Overflow Count: {stats['overflow_count']}") + print(f"Writing Threads: {stats['write_threads']}") + print(f"Reading Threads: {stats['read_threads']}") + + print("Top Writer Threads:") + for thread_id, count in stats['top_writer_threads']: + print(f" Thread {thread_id}: {count} writes") + + print("Top Reader Threads:") + for thread_id, count in stats['top_reader_threads']: + print(f" Thread {thread_id}: {count} reads") + print("=============================\n") + + def reset_stats(self) -> None: + """重置所有統計數據""" + with self._stats_lock: + self.concurrent_write_count.value = 0 + self.concurrent_read_count.value = 0 + self.total_write_count.value = 0 + self.total_read_count.value = 0 + self.overflow_count.value = 0 + self.thread_write_counts.clear() + self.thread_read_counts.clear() + + def __str__(self) -> str: + """返回緩衝區的字符串表示""" + return f"RingBuffer(id={self.buffer_id}, capacity={self.capacity}, size={self.size()})" \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py b/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py new file mode 100644 index 0000000..cb4373c --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/serial_udp_bitrans.py @@ -0,0 +1,340 @@ +import asyncio +import serial_asyncio + +# 附加驗證功能 +import os +import sys +import serial +import signal + +# XBee 模組 +from xbee.frame import APIFrame + +# ========================= + +SERIAL_PORT = '/dev/ttyACM0' # serial port +SERIAL_BAUDRATE = 57600 # serial baudrate + +UDP_REMOTE_IP = '127.0.0.1' # UDP Target IP +UDP_REMOTE_PORT = 14550 # UDP Target port + +# 測試用 只會吃一次資料 +DEBUG_MODE = False + +# ========================= + +def check_serial_port(): + """檢查串口是否存在與可用""" + # 檢查設備是否存在 + if not os.path.exists(SERIAL_PORT): + print(f"錯誤:串口設備 {SERIAL_PORT} 不存在") + return False + + # 檢查是否有權限訪問設備 + try: + os.access(SERIAL_PORT, os.R_OK | os.W_OK) + except Exception as e: + print(f"錯誤:無法訪問串口設備 {SERIAL_PORT}:{str(e)}") + return False + + # 檢查是否被占用 + try: + # 嘗試打開串口 + ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) + ser.close() # 打開成功後立即關閉 + return True + except serial.SerialException as e: + print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") + return False + except Exception as e: + print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") + return False + +# ========================= + +class SerialToUDP(asyncio.Protocol): # asyncio.Protocol 用於處理 Serial 收發 + def __init__(self, udp_protocol): + self.udp_protocol = udp_protocol + self.first_data = True # 標記是否為第一次收到資料 + self.has_processed = False # 測試模式用 處理數據旗標 # debug + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"Serial connection established on {SERIAL_PORT}") + + ## ===================================== + + # Serial 收到資料,轉發到 UDP + def data_received(self, data): + # 在 DEBUG 模式下,如果已經處理過數據,則直接返回 # debug + if DEBUG_MODE and self.has_processed: + return + + # 標記首次收到資料 + if hasattr(self.udp_protocol, 'send_udp'): + if self.first_data: + print(f"First data received from serial, forwarding to UDP: {data[:20]}...") + self.first_data = False + + # 轉發數據 + self.udp_protocol.send_udp(data) + + if DEBUG_MODE: # 測試模式用 # debug + self.has_processed = True + print("[DEBUG] SerialToUDP Mark") + + def write_to_serial(self, data): + # 在資料透過 Serial 發送之前進行處理 + processed_data = self.encapsulate_data(data) + + # 處理失敗就不發送 + if processed_data not None: + self.transport.write(processed_data) + + def encapsulate_data(self, data): + + """ + 將數據封裝為 XBee API 傳輸幀 + + 使用 XBee API 格式封裝數據: + - 傳輸請求幀 (0x10) + - 使用廣播地址 + - 添加適當的頭部和校驗和 + """ + + ## 方法一 + # 設置幀參數 + frame_id = 0x01 # 幀識別碼,用於確認 + frame_type = 0x10 # 幀類型:傳輸請求 + dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 64位目標地址 (廣播) + dest_addr16 = b'\xFF\xFE' # 16位目標地址 (未知/廣播) + broadcast_radius = 0x00 # 廣播半徑 (0 = 最大) + options = 0x00 # 傳輸選項 + + # 構建幀數據部分 + frame_data = bytearray() + frame_data.append(frame_type) # 添加幀類型 + frame_data.append(frame_id) # 添加幀 ID + frame_data.extend(dest_addr64) # 添加 64 位目標地址 + frame_data.extend(dest_addr16) # 添加 16 位目標地址 + frame_data.append(broadcast_radius) # 添加廣播半徑 + frame_data.append(options) # 添加選項 + frame_data.extend(data) # 添加實際數據內容 + + # 計算校驗和 (0xFF 減去所有字節的總和的最低 8 位) + checksum = 0xFF - (sum(frame_data) & 0xFF) + + # 構建完整的 API 幀 + # 起始分隔符 + 長度 (兩字節) + 幀數據 + 校驗和 + complete_frame = bytearray() + complete_frame.append(0x7E) # 添加起始分隔符 + complete_frame.extend(struct.pack(">H", len(frame_data))) # 添加長度 (高位優先) + complete_frame.extend(frame_data) # 添加幀數據 + complete_frame.append(checksum) # 添加校驗和 + + return bytes(complete_frame) + + ## 方法二 + # frame_id=0x01 + # frame_type = 0x10 + # dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + # dest_addr16 = b'\xFF\xFE' + # broadcast_radius = 0x00 + # options = 0x00 + + # frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + # frame += dest_addr64 + dest_addr16 + # frame += struct.pack(">BB", broadcast_radius, options) + data + # checksum = 0xFF - (sum(frame) & 0xFF) + # return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + + +class UDPHandler(asyncio.DatagramProtocol): # asyncio.DatagramProtocol 用於處理 UDP 收發 + def __init__(self): + self.serial_transport = None + self.transport = None + self.remote_addr = None # 儲存動態獲取的遠程地址 + self.has_processed = False # 測試模式用 處理數據旗標 # debug + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready. Waiting for serial data before sending...") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + ## ===================================== + + # UDP 收到資料 + def datagram_received(self, data, addr): + # 儲存對方的地址(這樣就能向同一個來源回傳數據) + # self.remote_addr = addr + # print(f"Received UDP data from {addr}, setting as remote address") + + # 在 DEBUG 模式下,如果已經處理過數據,則直接返回 + if DEBUG_MODE and self.has_processed: + return + + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + if DEBUG_MODE: # 測試模式用 + self.has_processed = True + print("[DEBUG] UDPHandler Mark") + + def send_udp(self, data): + # 發送資料到 UDP + + # if self.transport: + # # 如果有已知的回應地址,使用該地址 + # if self.remote_addr: + # self.transport.sendto(data, self.remote_addr) + # # print(f"Sending to dynamic address: {self.remote_addr}") + # else: + # # 否則使用預設地址 + # self.transport.sendto(data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + # print(f"Sending first UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}") + + if self.transport: + # 只有第一次或地址改變時才顯示 + # if not hasattr(self, '_last_sent_addr') or self._last_sent_addr != (UDP_REMOTE_IP, UDP_REMOTE_PORT): + # print(f"Sending UDP packet to: {UDP_REMOTE_IP}:{UDP_REMOTE_PORT}") + # self._last_sent_addr = (UDP_REMOTE_IP, UDP_REMOTE_PORT) + + # 在透過 UDP 發送數據之前進行解封裝 + decoded_data = self.decapsulate_data(data) + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def decapsulate_data(self, data): + # 這裡可以根據需要進行數據解封裝 + + ## 方法一 + try: + # 創建一個 API 幀處理器 + api_frame = APIFrame(escaped=True) + + # 嘗試解析數據 + api_frame.fill(data) + + # 如果數據不完整,直接返回原始數據 + if not api_frame.is_complete(): + return data + + # 解析幀內容 + parsed_data = api_frame.parse() + + # 提取有用數據 + if parsed_data: + frame_data = parsed_data.get('rf_data', None) + if frame_data: + return frame_data + + return data + + ## 方法二 - 手動解析 + # try: + # # XBee API 幀格式: + # # 起始分隔符(1字節) + 長度(2字節) + API標識符(1字節) + 數據 + 校驗和(1字節) + + # # 檢查幀起始符 (0x7E) + # if not data or len(data) < 5 or data[0] != 0x7E: + # return data + + # # 獲取數據長度 (不包括校驗和) + # length = (data[1] << 8) + data[2] + + # # 檢查幀完整性 + # if len(data) < length + 4: # 起始符 + 長度(2字節) + 數據 + 校驗和 + # return data + + # # 提取API標識符和數據 + # frame_type = data[3] + # frame_data = data[4:4+length-1] # 減1是因為API標識符已經算在長度中 + + # # 根據不同的幀類型進行處理 + # if frame_type == 0x90: # 例如,這是"接收數據包"類型 + # # 提取實際有效載荷 + # # 對於接收數據包,格式通常是: + # # API標識符(1) + 64位地址(8) + 16位地址(2) + 選項(1) + 數據 + # if len(frame_data) >= 12: # 確保數據長度足夠 + # payload = frame_data[11:] # 前11字節是地址和選項 + # return payload + # return data + + # 如果無法提取 則回傳 None + except Exception as e: + print(f"手動解析 XBee 數據錯誤: {e}") + return None + +async def main(): + # 先檢查串口是否可用 + if not check_serial_port(): + print("程式終止:串口檢查失敗") + return + + loop = asyncio.get_running_loop() + + # 設置終止處理 + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler( + sig, + lambda: asyncio.create_task(shutdown(loop)) + ) + + # 建立單一 UDP 端點處理收發 + udp_handler = UDPHandler() + + # 建立 UDP 傳輸,不指定接收端口,讓系統自動分配 + try: + udp_transport, protocol = await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) # 使用端口 0 讓系統自動分配可用端口 + ) + except Exception as e: + print(f"無法創建 UDP 端點:{str(e)}") + return + + # 獲取系統分配的本地端口 + sock = udp_transport.get_extra_info('socket') + local_addr = sock.getsockname() + print(f"UDP listening on {local_addr[0]}:{local_addr[1]}") + + # 建立 Serial 傳輸,將 UDP 處理器傳給它 + try: + serial_proto = SerialToUDP(udp_handler) + _, serial_transport = await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"無法建立串口連接:{str(e)}") + udp_transport.close() + return + + print(f"Waiting for data from serial port to initiate UDP communication...") + + # 保持運行 + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +async def shutdown(loop): + """關閉程序""" + print("Shutting down...") + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + for task in tasks: + task.cancel() + + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程式被使用者中斷") + except Exception as e: + print(f"程式執行錯誤:{str(e)}") \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fc_network_adapter/tests/__init__.py b/src/fc_network_adapter/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fc_network_adapter/tests/devRun.py b/src/fc_network_adapter/tests/devRun.py new file mode 100644 index 0000000..0707b51 --- /dev/null +++ b/src/fc_network_adapter/tests/devRun.py @@ -0,0 +1,448 @@ +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# 基礎功能的 import +import queue +import time + +# ROS2 的 import +import rclpy + +# mavlink 的 import +from pymavlink import mavutil + +# 自定義的 import +from fc_network_adapter import mavlinkObject as mo +from fc_network_adapter import mavlinkDevice as md + +# ====================== 分割線 ===================== + +test_item = 21 +running_time = 30 + + +print('test_item : ', test_item) + +''' +測試項 個位數 表示分離的功能 + +測試項 1X 表示 mavlink_object 的功能 測試連線的能力 +''' + +if test_item == 10: + # 需要開啟一個 ardupilot 的模擬器 + # 測試 mavlink_object 放入 ring buffer 的應用 + print('===> Start of Program .Test ', test_item) + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 初始化輸入通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket1 = mavutil.mavlink_connection(connection_string) + mavlink_object1 = mo.mavlink_object(mavlink_socket1) + + manager.add_mavlink_object(mavlink_object1) + + start_time = time.time() + while (time.time() - start_time) < running_time: + items_a = mo.stream_bridge_ring.get_all() + items_b = mo.return_packet_ring.get_all() + try: + print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}") + except IndexError: + print("stream_bridge_ring is empty") + + try: + print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}") + except IndexError: + print("return_packet_ring is empty") + time.sleep(1) + + manager.stop() + + print('<=== End of Program') + +elif test_item == 11: + # 需要開啟一個 ardupilot 的模擬器 + # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 + print('===> Start of Program .Test ', test_item) + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 初始化輸入通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket1 = mavutil.mavlink_connection(connection_string) + mavlink_object1 = mo.mavlink_object(mavlink_socket1) + manager.add_mavlink_object(mavlink_object1) + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + + time.sleep(3) + + # 印出目前所有 mavlink_systems 的內容 + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + start_time = time.time() + show_time = time.time() + while time.time() - start_time < running_time: + if (time.time() - show_time) >= 2: + # print("mark B") + + show_time = time.time() + for sysid in analyzer.mavlink_systems: + for compid in analyzer.mavlink_systems[sysid].components: + print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print("===================") + + manager.stop() + analyzer.stop() + + print('<=== End of Program') + +elif test_item == 12: + # 需要開啟一個 ardupilot 的模擬器 與 GCS + # 這邊是測試 mavlink object 作為交換器功能的代碼 + print('===> Start of Program .Test ', test_item) + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 初始化輸入通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket_in1 = mavutil.mavlink_connection(connection_string) + mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1) + + connection_string="udp:127.0.0.1:14561" + mavlink_socket_in2 = mavutil.mavlink_connection(connection_string) + mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2) + + # 初始化輸出通道 + connection_string="udpout:127.0.0.1:14550" + mavlink_socket_out = mavutil.mavlink_connection(connection_string) + mavlink_object_out = mo.mavlink_object(mavlink_socket_out) + + manager.add_mavlink_object(mavlink_object_out) + manager.add_mavlink_object(mavlink_object_in1) + manager.add_mavlink_object(mavlink_object_in2) + + mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id) + mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id) + + mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id) + mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id) + + start_time = time.time() + while (time.time() - start_time) < running_time: + + time.sleep(1) + + manager.stop() + + print('<=== End of Program') + + +# elif test_item == 20: +# # 這邊測試 node 生成 topic 的功能 +# # 利用 空的通道 發出假的 heartbeat 封包 +# print('===> Start of Program .Test ', test_item) +# rclpy.init() # 注意要初始化 rclpy 才能使用 node + +# from pymavlink.dialects.v20 import common as mavlink2 + +# sysid = 1 +# compid = 1 + +# # 啟動 mavlink_bridge +# analyzer = mo.mavlink_bridge() + +# mav = mavlink2.MAVLink(None) +# mav.srcSystem = sysid +# mav.srcComponent = compid + +# # 這是一個 heartbeat 封包 +# fake_heartbeat = mavlink2.MAVLink_heartbeat_message( +# type=mavlink2.MAV_TYPE_QUADROTOR, +# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, +# base_mode=3, +# custom_mode=0, +# system_status=0, +# mavlink_version=3 +# ) +# mavlink_bytes = fake_heartbeat.pack(mav) +# fake_heartbeat_msg = mav.parse_char(mavlink_bytes) + +# print(analyzer.mavlink_systems) + +# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data +# time.sleep(0.1) +# print(analyzer.mavlink_systems) + +# # ROS2 初始化 +# analyzer._init_node() + +# # 創建 ROS2 topic +# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid +# print("topic created") +# time.sleep(5) + +# # 丟出 topic +# analyzer.emit_info() + +# # 結束程式 +# analyzer.running = False +# analyzer.destroy_node() + +# rclpy.shutdown() +# analyzer.stop() +# analyzer.thread.join() +# print('<=== End of Program') + +elif test_item == 21: + # 需要開啟一個 ardupilot 的模擬器 + # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 + + print('===> Start of Program .Test ', test_item) + # 初始化 rclpy 才能使用 node + rclpy.init() + + # 清空 ring buffer + mo.stream_bridge_ring.clear() + mo.return_packet_ring.clear() + + manager = mo.async_io_manager() + manager.start() + + # 啟動 mavlink_bridge + analyzer = mo.mavlink_bridge() + # 關於 Node 的初始化 + show_time = time.time() + analyzer._init_node() # 初始化 node + print('初始化 node 完成 耗時 : ',time.time() - show_time) + + time.sleep(0.5) # 系統 Setup 完成 + + + # 創建通道 + connection_string="udp:127.0.0.1:14560" + mavlink_socket = mavutil.mavlink_connection(connection_string) + mavlink_object3 = mo.mavlink_object(mavlink_socket) + manager.add_mavlink_object(mavlink_object3) + + print('=== waiting for mavlink data ...') + time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + compid = 1 + sysid = 1 + start_time = time.time() + analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) + end_time = time.time() + print(f"Execution time for create_flightMode: {end_time - start_time} seconds") + + print("start emit info") + + start_time = time.time() + show_time = time.time() + while time.time() - start_time < running_time: + try: + # print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) + analyzer.emit_info() # 這邊是測試 node 的運行 + time.sleep(1) + except KeyboardInterrupt: + break + + + # 程式結束 + analyzer.destroy_node() + rclpy.shutdown() + + # 結束程式 退出所有 thread + manager.stop() + analyzer.stop() + analyzer.thread.join() + + mavlink_socket.close() + print('<=== End of Program') + +# elif test_item == 22: +# # 需要開啟一個 ardupilot 的模擬器 與 GCS +# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 +# print('===> Start of Program .Test ', test_item) +# rclpy.init() # 注意要初始化 rclpy 才能使用 node + +# # 啟動 mavlink_bridge +# analyzer = mo.mavlink_bridge() +# # 關於 Node 的初始化 +# show_time = time.time() +# analyzer._init_node() # 初始化 node +# print('初始化 node 完成 耗時 : ',time.time() - show_time) + +# # 初始化兩個通道 +# connection_string_in="udp:127.0.0.1:15551" +# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) +# mavlink_object_in = mo.mavlink_object(mavlink_socket_in) +# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] + + +# connection_string_out="udpout:127.0.0.1:14553" +# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) +# mavlink_object_out = mo.mavlink_object(mavlink_socket_out) +# mavlink_object_out.multiplexingToAnalysis = [] + + + +# # 讓兩個通道互相傳輸 +# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all +# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all + +# # 啟動通道 +# mavlink_object_in.run() +# mavlink_object_out.run() + +# print('waiting for mavlink data ...') +# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + +# print('目前所有的系統 : ') +# for sysid in analyzer.mavlink_systems: +# print(analyzer.mavlink_systems[sysid]) + +# compid = 1 +# sysid = 1 +# show_time = time.time() +# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds") + +# print("start emit info") + +# start_time = time.time() +# show_time = time.time() +# show_time2 = time.time() + +# while time.time() - start_time < running_time: +# if (time.time() - show_time2) >= 1: +# analyzer.emit_info() # 這邊是測試 node 的運行 +# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode'] +# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode'] +# sss = mavutil.mode_string_v10(fmsg) +# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq())) +# print("目前的飛行模式 : {}".format(sss)) +# show_time2 = time.time() + +# # if (time.time() - show_time) >= 2: +# # show_time = time.time() +# # for sysid in analyzer.mavlink_systems: +# # for compid in analyzer.mavlink_systems[sysid].components: +# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) +# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) +# # print("===================") + + + +# analyzer.destroy_node() +# rclpy.shutdown() + + +# # 結束程式 退出所有 thread +# mavlink_object_in.stop() +# mavlink_object_in.thread.join() +# mavlink_socket_in.close() +# mavlink_object_out.stop() +# mavlink_object_out.thread.join() +# mavlink_socket_out.close() +# analyzer.stop() +# analyzer.thread.join() + + +# print('<=== End of Program') + +# elif test_item == 51: + +# # 晉凱的測試項目 +# print('===> Start of Program .Test ', test_item) +# rclpy.init() # 注意要初始化 rclpy 才能使用 node + +# # 啟動 mavlink_bridge +# analyzer = mo.mavlink_bridge() +# # 關於 Node 的初始化 +# show_time = time.time() +# analyzer._init_node() # 初始化 node +# print('初始化 node 完成 耗時 : ',time.time() - show_time) + +# # 創建通道 +# connection_string="udp:127.0.0.1:15551" +# mavlink_socket3 = mavutil.mavlink_connection(connection_string) +# mavlink_object3 = mo.mavlink_object(mavlink_socket3) +# # 設定通道流動 +# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] +# mavlink_object3.multiplexingToReturn = [] # +# # mavlink_object3.multiplexingToSwap = [] # +# # 啟動通道 +# mavlink_object3.run() + + + +# print('waiting for mavlink data ...') +# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 + +# compid = 1 +# sysid = 1 +# start_time = time.time() +# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) +# end_time = time.time() +# print(f"Execution time for create all topic: {end_time - start_time} seconds") + +# print("start emit info") + +# start_time = time.time() +# show_time = time.time() +# while time.time() - start_time < running_time: +# try: +# # rclpy.spin(analyzer) +# analyzer.emit_info() # 這邊是測試 node 的運行 +# time.sleep(0.5) +# except KeyboardInterrupt: +# break + +# analyzer.destroy_node() +# rclpy.shutdown() + +# # 結束程式 退出所有 thread +# mavlink_object3.stop() +# mavlink_object3.thread.join() +# analyzer.stop() +# analyzer.thread.join() + +# mavlink_socket3.close() +# print('<=== End of Program') \ No newline at end of file diff --git a/src/fc_network_adapter/tests/test_mavlinkObject.py b/src/fc_network_adapter/tests/test_mavlinkObject.py new file mode 100644 index 0000000..23907ee --- /dev/null +++ b/src/fc_network_adapter/tests/test_mavlinkObject.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python +""" +測試腳本,用於測試 mavlinkObject.py 中的 mavlink_object 和 async_io_manager 類別 +""" + +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + + +import unittest +import time +import threading +import socket +import asyncio +from unittest.mock import MagicMock, patch + +# 導入要測試的模組 +from fc_network_adapter.mavlinkObject import ( + mavlink_object, + async_io_manager, + MavlinkObjectState, + stream_bridge_ring, + return_packet_ring +) + + +class MockMavlinkSocket: + """模擬 Mavlink Socket 的類別,用於測試""" + + def __init__(self, test_data=None): + self.closed = False + self.test_data = test_data or [] + self.data_index = 0 + self.written_data = [] + + def recv_msg(self): + if not self.test_data or self.data_index >= len(self.test_data): + return None + + data = self.test_data[self.data_index] + self.data_index += 1 + return data + + def write(self, data): + self.written_data.append(data) + + def close(self): + self.closed = True + + +class MockMavlinkMessage: + """模擬 Mavlink 訊息的類別,用於測試""" + + def __init__(self, msg_id, sysid, compid, seq=0): + self.msg_id = msg_id + self.sysid = sysid + self.compid = compid + self.seq = seq + self.msg_buf = bytes([msg_id, sysid, compid, seq]) + + def get_msgId(self): + return self.msg_id + + def get_srcSystem(self): + return self.sysid + + def get_srcComponent(self): + return self.compid + + def get_seq(self): + return self.seq + + def get_msgbuf(self): + return self.msg_buf + + def get_type(self): + return f"MSG_ID_{self.msg_id}" + + +class TestMavlinkObject(unittest.TestCase): + """測試 mavlink_object 類別""" + + def setUp(self): + """在每個測試方法執行前準備環境""" + # 清空全局變數 + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + + # 清空 ring buffer + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建測試訊息 + self.heartbeat_msg = MockMavlinkMessage(msg_id=0, sysid=1, compid=1) + self.attitude_msg = MockMavlinkMessage(msg_id=30, sysid=1, compid=1) + self.position_msg = MockMavlinkMessage(msg_id=32, sysid=1, compid=1) + + # 創建模擬的 socket # 假的 Mavlink Socket + self.mock_socket = MockMavlinkSocket([ + self.heartbeat_msg, + self.attitude_msg, + self.position_msg + ]) + + # 創建測試對象 + self.mavlink_obj = mavlink_object(self.mock_socket) + + def test_initialization(self): + """測試 mavlink_object 初始化是否正確""" + # print("Testing mavlink_object initialization") + self.assertEqual(self.mavlink_obj.socket_id, 0) + self.assertEqual(self.mavlink_obj.state, MavlinkObjectState.INIT) + self.assertEqual(len(self.mavlink_obj.target_sockets), 0) + self.assertEqual(self.mavlink_obj.bridge_msg_types, [0]) + + def test_add_remove_target_socket(self): + """測試添加和移除目標端口功能""" + # 添加目標端口 + self.assertTrue(self.mavlink_obj.add_target_socket(1)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 1) + self.assertEqual(self.mavlink_obj.target_sockets[0], 1) + self.assertTrue(self.mavlink_obj.add_target_socket(2)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 2) + self.assertIn(2, self.mavlink_obj.target_sockets) + + # 嘗試添加已存在的端口 + self.assertFalse(self.mavlink_obj.add_target_socket(1)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 2) + + # 嘗試添加自己的端口 + self.assertFalse(self.mavlink_obj.add_target_socket(0)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 2) + + # 移除端口 + self.assertTrue(self.mavlink_obj.remove_target_socket(2)) + self.assertEqual(len(self.mavlink_obj.target_sockets), 1) + + # 嘗試移除不存在的端口 + self.assertFalse(self.mavlink_obj.remove_target_socket(2)) + + def test_set_message_types(self): + """測試設置訊息類型功能""" + # 設置橋接器訊息類型 + self.assertTrue(self.mavlink_obj.set_bridge_message_types([0, 30])) + self.assertEqual(self.mavlink_obj.bridge_msg_types, [0, 30]) + + # 設置回傳處理器訊息類型 + self.assertTrue(self.mavlink_obj.set_return_message_types([32])) + self.assertEqual(self.mavlink_obj.return_msg_types, [32]) + + # 測試無效的訊息類型 + self.assertFalse(self.mavlink_obj.set_bridge_message_types("invalid")) + self.assertFalse(self.mavlink_obj.set_return_message_types([0, "invalid"])) + + def test_send_message(self): + """測試 send_message 功能""" + # 創建一個新的 mavlink_object 實例 + mock_socket = MockMavlinkSocket() + mavlink_obj = mavlink_object(mock_socket) + + # 準備測試數據 + test_message1 = b"test_message_1" + test_message2 = b"test_message_2" + + # 測試初始狀態 + self.assertEqual(len(mock_socket.written_data), 0) + + # 測試非運行狀態下發送消息 + self.assertFalse(mavlink_obj.send_message(test_message1)) + self.assertEqual(len(mock_socket.written_data), 0) + + # 啟動 manager + manager = async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加對象到 manager + manager.add_mavlink_object(mavlink_obj) + time.sleep(0.1) # 等待對象啟動 + + # 確認對象狀態 + self.assertEqual(mavlink_obj.state, MavlinkObjectState.RUNNING) + + # 測試發送消息 + self.assertTrue(mavlink_obj.send_message(test_message1)) + time.sleep(0.2) # 等待消息處理 + + # 確認消息已發送 + self.assertEqual(len(mock_socket.written_data), 1) + self.assertEqual(mock_socket.written_data[0], test_message1) + + # 測試連續發送多條消息 + self.assertTrue(mavlink_obj.send_message(test_message2)) + time.sleep(0.2) # 等待消息處理 + + # 確認兩條消息都已發送 + self.assertEqual(len(mock_socket.written_data), 2) + self.assertEqual(mock_socket.written_data[1], test_message2) + + # 模擬發送出錯的情況 + class ErrorWriteSocket(MockMavlinkSocket): + def write(self, data): + raise Exception("Write error") + + error_socket = ErrorWriteSocket() + error_obj = mavlink_object(error_socket) + manager.add_mavlink_object(error_obj) + time.sleep(0.1) # 等待對象啟動 + + # 發送消息到錯誤的 socket + self.assertTrue(error_obj.send_message(test_message1)) + time.sleep(0.2) # 等待消息處理 + + # 即使寫入失敗,send_message 應該也返回 True + # 因為消息已成功加入到佇列中,只是後續的實際發送失敗 + + # 停止 manager + manager.stop() + time.sleep(0.5) # 等待 manager 停止 + + # 測試對象已關閉後發送消息 + self.assertFalse(mavlink_obj.send_message(test_message1)) + self.assertEqual(len(mock_socket.written_data), 2) # 消息數量未增加 + +class TestAsyncIOManager(unittest.TestCase): + """測試 async_io_manager 類別""" + + def setUp(self): + """在每個測試方法執行前準備環境""" + # 清空全局變數 + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + + # 清空 ring buffer + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建 async_io_manager 實例 + self.manager = async_io_manager() + + # 模擬 mavlink 對象 + self.mock_socket1 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=1, compid=1), + MockMavlinkMessage(msg_id=30, sysid=1, compid=1) + ]) + self.mock_socket2 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=2, compid=1) + ]) + + self.mavlink_obj1 = mavlink_object(self.mock_socket1) + self.mavlink_obj2 = mavlink_object(self.mock_socket2) + + def tearDown(self): + """在每個測試方法執行後清理環境""" + if self.manager.running: + self.manager.stop() + + def test_singleton_pattern(self): + """測試 async_io_manager 的單例模式 就是不會產生兩個 magager""" + manager1 = async_io_manager() + manager2 = async_io_manager() + self.assertIs(manager1, manager2) + + def test_start_stop(self): + """測試 async_io_manager 的啟動和停止功能""" + # 啟動管理器 + self.manager.start() + self.assertTrue(self.manager.running) + self.assertIsNotNone(self.manager.thread) + + # 再次啟動應該沒有效果 + old_thread = self.manager.thread + self.manager.start() + self.assertIs(self.manager.thread, old_thread) + + # 停止管理器 + self.manager.stop() + self.assertFalse(self.manager.running) + + # 最多等待 5 秒讓線程結束 + start_time = time.time() + while self.manager.thread.is_alive() and time.time() - start_time < 5: + time.sleep(0.1) + self.assertFalse(self.manager.thread.is_alive()) + + def test_add_remove_objects(self): + """測試添加和移除 mavlink_object""" + # 啟動管理器 + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加對象 + self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj1)) + self.assertEqual(len(self.manager.managed_objects), 1) + self.assertEqual(self.mavlink_obj1.state, MavlinkObjectState.RUNNING) + + # 添加另一個對象 + self.assertTrue(self.manager.add_mavlink_object(self.mavlink_obj2)) + self.assertEqual(len(self.manager.managed_objects), 2) + + # 檢查受管理對象列表 + managed_objects = self.manager.get_managed_objects() + self.assertEqual(len(managed_objects), 2) + self.assertIn(0, managed_objects) + self.assertIn(1, managed_objects) + + # 移除對象 + self.assertTrue(self.manager.remove_mavlink_object(0)) + self.assertEqual(len(self.manager.managed_objects), 1) + + # 嘗試移除不存在的對象 + self.assertFalse(self.manager.remove_mavlink_object(999)) + + # 停止管理器 + self.manager.stop() + + def test_data_processing(self): + """測試數據處理""" + # 創建用於轉發的 mavlink_objects + mock_socket3 = MockMavlinkSocket() + mavlink_obj3 = mavlink_object(mock_socket3) + + # 設置轉發 + self.mavlink_obj1.add_target_socket(2) # socket1 轉發到 socket3 + + # 設置訊息類型 + self.mavlink_obj1.set_bridge_message_types([0, 30]) # HEARTBEAT, ATTITUDE + # self.mavlink_obj1.enable_return(True) + self.mavlink_obj1.set_return_message_types([30]) # ATTITUDE + + # 啟動管理器並添加對象 + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + self.manager.add_mavlink_object(self.mavlink_obj1) + self.manager.add_mavlink_object(mavlink_obj3) + + # 等待處理完成 + time.sleep(1.0) + + # print("testing Mark A") + + # 檢查 Ring buffer 是否有正確的數據 + self.assertEqual(stream_bridge_ring.size(), 2) # HEARTBEAT + ATTITUDE + self.assertEqual(return_packet_ring.size(), 1) # ATTITUDE + + a = stream_bridge_ring.get() + print(f"stream_bridge_ring: {a}") + + # 檢查是否正確轉發 + self.assertEqual(len(mock_socket3.written_data), 2) # HEARTBEAT + ATTITUDE + + # print("testing Mark B") + + # 停止管理器 + self.manager.stop() + + def test_error_handling(self): + """測試錯誤處理情況""" + print("=== mark A ===") + # 創建一個會引發異常的 socket + class ErrorSocket(MockMavlinkSocket): + def recv_msg(self): + raise Exception("Test exception") + + error_socket = ErrorSocket() + mavlink_obj_err = mavlink_object(error_socket) + + # 啟動管理器並添加對象 + self.manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + self.manager.add_mavlink_object(mavlink_obj_err) + + print("=== mark B ===") + + # 等待錯誤處理 + time.sleep(1.0) + + # 對象應該進入錯誤狀態,但不會崩潰 + # self.assertEqual(mavlink_obj_err.state, MavlinkObjectState.ERROR) + + print("=== mark C ===") + + # 管理器應該仍在運行 + self.assertTrue(self.manager.running) + + + # 故意等一段時間 確認 socket 有被 manager 處理掉 + time.sleep(3) + + # 停止管理器 + self.manager.stop() + + +class TestIntegration(unittest.TestCase): + """整合測試,測試多個 mavlink_object 之間的互動""" + + def test_bidirectional_forwarding(self): + """測試雙向轉發""" + # 清空全局變數和 ring buffer + mavlink_object.mavlinkObjects = {} + mavlink_object.socket_num = 0 + stream_bridge_ring.clear() + return_packet_ring.clear() + + # 創建三個 mavlink 對象,模擬三個通道 + socket1 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=1, compid=1), + MockMavlinkMessage(msg_id=30, sysid=1, compid=1) + ]) + socket2 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=2, compid=1), + MockMavlinkMessage(msg_id=32, sysid=2, compid=1) + ]) + socket3 = MockMavlinkSocket([ + MockMavlinkMessage(msg_id=0, sysid=3, compid=1), + MockMavlinkMessage(msg_id=33, sysid=3, compid=1) + ]) + + obj1 = mavlink_object(socket1) + obj2 = mavlink_object(socket2) + obj3 = mavlink_object(socket3) + + # 設置雙向轉發 + # obj1 <-> obj2 <-> obj3 + obj1.add_target_socket(1) # obj1 -> obj2 + obj2.add_target_socket(0) # obj2 -> obj1 + obj2.add_target_socket(2) # obj2 -> obj3 + obj3.add_target_socket(1) # obj3 -> obj2 + + # 啟動 async_io_manager + manager = async_io_manager() + manager.start() + time.sleep(0.5) # 等待事件循環啟動 + + # 添加所有 mavlink_object + manager.add_mavlink_object(obj1) + manager.add_mavlink_object(obj2) + manager.add_mavlink_object(obj3) + + # 等待處理所有訊息 + time.sleep(1.5) + + # 檢查轉發結果 + # socket1 應該收到 socket2 和 socket3 的訊息 + self.assertEqual(len(socket1.written_data), 4) # 2 from obj2, 2 from obj3 via obj2 + + # socket2 應該收到 socket1 和 socket3 的訊息 + self.assertEqual(len(socket2.written_data), 4) # 2 from obj1, 2 from obj3 + + # socket3 應該收到 socket1 和 socket2 的訊息 + self.assertEqual(len(socket3.written_data), 4) # 2 from obj1 via obj2, 2 from obj2 + + # 檢查 ring buffer 的數據 + # 假設所有對象都啟用了橋接器,且預設的 bridge_msg_types = [0] + # 應該有 3 個 HEARTBEAT 訊息 + self.assertEqual(stream_bridge_ring.size(), 3) # 3 HEARTBEAT + + # 停止管理器 + manager.stop() + + + + +if __name__ == "__main__": + unittest.main(defaultTest="TestMavlinkObject.test_send_message") diff --git a/src/fc_network_adapter/tests/test_ringBuffer.py b/src/fc_network_adapter/tests/test_ringBuffer.py new file mode 100644 index 0000000..563087b --- /dev/null +++ b/src/fc_network_adapter/tests/test_ringBuffer.py @@ -0,0 +1,152 @@ +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import time +import threading + +from fc_network_adapter.ringBuffer import RingBuffer + + +def producer(buffer, count, interval=0.01): + """生產者:向緩衝區添加資料""" + print(f"Producer started (thread {threading.get_ident()})") + for i in range(count): + # 嘗試寫入數據,直到成功 + while not buffer.put(f"Item-{i}"): + print(f"Buffer full, producer waiting... (thread {threading.get_ident()})") + time.sleep(0.1) + + print(f"Produced: Item-{i}, buffer size: {buffer.size()}") + time.sleep(interval) # 模擬生產過程 + + print(f"Producer finished (thread {threading.get_ident()})") + +def consumer(buffer, max_items, interval=0.05): + """消費者:從緩衝區讀取資料""" + print(f"Consumer started (thread {threading.get_ident()})") + items_consumed = 0 + + while items_consumed < max_items: + # 嘗試讀取數據 + item = buffer.get() + if item: + print(f"Consumed: {item}, buffer size: {buffer.size()}") + items_consumed += 1 + else: + print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})") + + time.sleep(interval) # 模擬消費過程 + + print(f"Consumer finished (thread {threading.get_ident()})") + +def batch_consumer(buffer, interval=0.2): + """批量消費者:一次性讀取緩衝區中的所有資料""" + print(f"Batch consumer started (thread {threading.get_ident()})") + + for _ in range(5): # 執行5次批量讀取 + time.sleep(interval) # 等待緩衝區積累數據 + items = buffer.get_all() + if items: + print(f"Batch consumed {len(items)} items: {items}") + else: + print("Buffer empty for batch consumer") + + print(f"Batch consumer finished (thread {threading.get_ident()})") + +def demonstrate_multi_writer(): + """示範多個寫入執行緒同時操作緩衝區""" + print("\n=== Demonstrating Multiple Writers ===") + buffer = RingBuffer(capacity=80) + + # 創建多個生產者執行緒 + threads = [] + for i in range(3): + thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1))) + threads.append(thread) + thread.start() + + # 等待所有執行緒完成 + for thread in threads: + thread.join() + + buffer.print_stats() # 印出統計資訊 + + # 讀出所有剩餘資料 + remaining = buffer.get_all() + print(f"Remaining items in buffer after multiple writers: {remaining}") + +def demonstrate_basic_usage(): + """示範基本使用方式""" + print("\n=== Demonstrating Basic Usage ===") + # 創建緩衝區 + buffer = RingBuffer(capacity=20, buffer_id=7) + + # 檢查初始狀態 + print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 添加幾個項目 + for i in range(5): + buffer.put(f"Test-{i}") + + # 檢查狀態 + print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 讀取一個項目 + item = buffer.get() + print(f"Read item: {item}") + print(f"After reading 1 item - Content Size: {buffer.size()}") + + # 添加更多項目直到滿 + items_added = 0 + while not buffer.is_full(): + buffer.put(f"Fill-{items_added}") + items_added += 1 + + print(f"Added {items_added} more items until full") + print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 嘗試添加到已滿的緩衝區 + result = buffer.put("Overflow") + print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}") + + # 獲取所有項目 + all_items = buffer.get_all() + print(f"All items in buffer: {all_items}") + print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}") + + # 印出統計資訊 + buffer.print_stats() + +def demonstrate_producer_consumer(): + """示範生產者-消費者模式""" + print("\n=== Demonstrating Producer-Consumer Pattern ===") + buffer = RingBuffer(capacity=16) + + # 創建生產者和消費者執行緒 + producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1)) + consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2)) + batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5)) + + # 啟動執行緒 + producer_thread.start() + consumer_thread.start() + batch_thread.start() + + # 等待執行緒完成 + producer_thread.join() + consumer_thread.join() + batch_thread.join() + + # 檢查最終狀態 + print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}") + + buffer.print_stats() + +if __name__ == "__main__": + # 展示各種使用場景 + # demonstrate_basic_usage() + # demonstrate_producer_consumer() + demonstrate_multi_writer() + + print("\nAll demonstrations completed!") \ No newline at end of file diff --git a/src/unitdev02/unitdev02/server.py b/src/unitdev02/unitdev02/server.py deleted file mode 100644 index 148edc5..0000000 --- a/src/unitdev02/unitdev02/server.py +++ /dev/null @@ -1,33 +0,0 @@ -import socket -import os - -# socket file path -SOCKET_PATH = "/tmp/unix_socket_example" - -# 若檔案存在就先刪除 -if os.path.exists(SOCKET_PATH): - os.remove(SOCKET_PATH) - -# 建立 socket -server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - -# 綁定 socket 到檔案 -server.bind(SOCKET_PATH) -server.listen(1) - -print("🔌 Server waiting for connection...") - -# 等待 client 連線 -conn, _ = server.accept() -print("✅ Client connected.") - -while True: - data = conn.recv(1024) - if not data: - break - print("📥 Received:", data.decode()) - conn.sendall(b"Echo: " + data) - -conn.close() -server.close() -os.remove(SOCKET_PATH) diff --git a/src/unitdev02/unitdev02/client.py b/src/unitdev02/unitdev02/unix_client.py similarity index 100% rename from src/unitdev02/unitdev02/client.py rename to src/unitdev02/unitdev02/unix_client.py diff --git a/src/unitdev02/unitdev02/unix_server.py b/src/unitdev02/unitdev02/unix_server.py new file mode 100644 index 0000000..30ef524 --- /dev/null +++ b/src/unitdev02/unitdev02/unix_server.py @@ -0,0 +1,94 @@ +import socket +import os + +# # socket file path +# SOCKET_PATH = "/tmp/unix_socket_example" + +# # 若檔案存在就先刪除 +# if os.path.exists(SOCKET_PATH): +# os.remove(SOCKET_PATH) + +# # 建立 socket +# server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + +# # 綁定 socket 到檔案 +# server.bind(SOCKET_PATH) +# server.listen(1) + +# print("🔌 Server waiting for connection...") + +# # 等待 client 連線 +# conn, _ = server.accept() +# print("✅ Client connected.") + +# while True: +# data = conn.recv(1024) +# if not data: +# break +# print("📥 Received:", data.decode()) +# conn.sendall(b"Echo: " + data) + +# conn.close() +# server.close() +# os.remove(SOCKET_PATH) + + +# ===================== + +# from pymavlink import mavutil + + +# def create_unix_socket_connection(): +# # 建立一個 mavtcpin 實例 +# mav_conn = mavutil.mavtcpin("127.0.0.250:9999", source_system=1, source_component=1) + +# # 替換底層 socket +# # 關閉原有的 socket +# mav_conn.listen.close() + +# # 創建 Unix socket 並替換 +# unix_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +# unix_socket_path = "/tmp/unix_socket_mavlink.sock" + +# # 確保先前的 unix socket 已被移除 +# if os.path.exists(unix_socket_path): +# os.remove(unix_socket_path) + +# # 綁定並設定 Unix socket +# unix_socket.bind(unix_socket_path) +# unix_socket.listen(1) +# unix_socket.setblocking(0) +# mavutil.set_close_on_exec(unix_socket.fileno()) + +# # 替換 listen socket +# mav_conn.listen = unix_socket +# mav_conn.fd = unix_socket.fileno() + +# # mav_conn.port = unix_socket + +# return mav_conn + + +# a = create_unix_socket_connection() + +# # print(a.port) + + + + +# =============================== + +import mavunixin +mav_conn = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1) + +import time +print("🔌 Server waiting for connection...") +while True: + time.sleep(1) + mavlinkMsg = mav_conn.recv_msg() + if mavlinkMsg is not None: + print("📥 Received:", mavlinkMsg) + # a.send(mavlinkMsg) + # print("📤 Server replied:", data.decode()) + else: + print("No message received.") \ No newline at end of file diff --git a/src/unitdev04/change_mode.py b/src/unitdev04/change_mode.py new file mode 100644 index 0000000..53c96f3 --- /dev/null +++ b/src/unitdev04/change_mode.py @@ -0,0 +1,109 @@ +import curses +import serial +import struct +from pymavlink.dialects.v20 import ardupilotmega as mavlink2 + +PORT = "COM5" # or "/dev/ttyUSB0" +BAUD = 57600 +target_system = 5 +target_component = 1 +MAV_CMD_DO_SET_MODE = 176 + +mode_list = [ + ("STABILIZE", 0), + ("AUTO", 3), + ("GUIDED", 4), + ("LOITER", 5) +] + +ser = serial.Serial(PORT, BAUD) + +class PacketCapture: + def __init__(self): + self.data = bytearray() + def write(self, b): + self.data.extend(b) + return len(b) + +def build_api_tx_frame(data: bytes, frame_id=0x01): + frame_type = 0x10 + dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +def curses_main(stdscr): + curses.curs_set(0) + selected = 0 + + while True: + stdscr.clear() + h, w = stdscr.getmaxyx() + + stdscr.addstr(1, 2, "模式選單(使用:箭頭選擇,Enter:發送,q:離開)") + + for i, (name, _) in enumerate(mode_list): + if i == selected: + stdscr.attron(curses.A_REVERSE) + stdscr.addstr(i + 3, 4, f"> {name}") + stdscr.attroff(curses.A_REVERSE) + else: + stdscr.addstr(i + 3, 4, f" {name}") + + key = stdscr.getch() + + if key == curses.KEY_UP: + selected = (selected - 1) % len(mode_list) + elif key == curses.KEY_DOWN: + selected = (selected + 1) % len(mode_list) + elif key in [10, 13]: # Enter + name, custom_mode = mode_list[selected] + capture = PacketCapture() + mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1) + mav.version = 2 + + msg = mav.command_long_encode( + target_system=target_system, + target_component=target_component, + command=MAV_CMD_DO_SET_MODE, + confirmation=0, + param1=1, + param2=custom_mode, + param3=0, param4=0, param5=0, param6=0, param7=0 + ) + print("msg =", msg) # 確認封包物件生成 + + mav.send(msg) # 改為 send() 會寫入 capture + print("RAW HEX:", capture.data.hex()) + + api_frame = build_api_tx_frame(capture.data) + ser.write(api_frame) + + # 顯示封包資訊 + msg_line = min(h - 4, len(mode_list) + 5) + stdscr.addstr(msg_line, 2, f"發送模式切換:{name} ({custom_mode})") + stdscr.addstr(msg_line + 1, 2, f"MAVLink HEX: {' '.join(f'{b:02x}' for b in capture.data)[:w-4]}") + stdscr.addstr(msg_line + 2, 2, f"XBee API HEX: {' '.join(f'{b:02x}' for b in api_frame)[:w-4]}") + stdscr.refresh() + curses.napms(1500) + + elif key in [ord('q'), ord('Q')]: + break + + stdscr.refresh() + + +try: + curses.wrapper(lambda stdscr: curses_main(stdscr)) # 使用 lambda 函數來傳遞 ser + +except Exception as e: + print(f"發生錯誤: {e}") +finally: + ser.close() + print("程式結束,串口已關閉") diff --git a/src/unitdev04/endpoint_at.py b/src/unitdev04/endpoint_at.py index b6e0bfb..21c53f4 100644 --- a/src/unitdev04/endpoint_at.py +++ b/src/unitdev04/endpoint_at.py @@ -38,4 +38,4 @@ try: pass except KeyboardInterrupt: print("\nProgram terminated.") - ser.close() + ser.close() \ No newline at end of file diff --git a/src/unitdev04/receive.py b/src/unitdev04/receive.py new file mode 100644 index 0000000..eab6436 --- /dev/null +++ b/src/unitdev04/receive.py @@ -0,0 +1,60 @@ +from xbee import XBee +import serial +from pymavlink import mavutil +import time + +# 配置 XBee 串口連接 +ser = serial.Serial('COM5', 57600) # XBee 串口 + + +# 無人機系統 ID(假設為 5)和目標系統 ID(假設為 1) +system_id = 5 +target_system_id = 1 # 設定目標系統 ID,這裡假設目標系統 ID 是 1 + +# 目標設備的 64 位地址(請替換為實際的無人機地址) +target_address = b'\x00\x13\xa2\x00\x40\x5f\x88\x56' # 例如:00 13 A2 00 40 5F 88 56 + +def decode_mavlink_data(data,master): + """解碼 MAVLink 的原始數據並處理 HEARTBEAT 訊息""" + + try: + msg = master.parse_char(data) + if msg: + if msg.get_type() == "HEARTBEAT": + print(f"Raw MAVLink Data (Hex): {data.hex()}") + print(f"Received MAVLink message: {msg}") + print(f"System status: {msg.system_status}") + print(f"Flight mode: {mavutil.mode_string_v10(msg)}") + print(f"System ID: {msg.get_srcSystem()}") # 使用 get_srcSystem() 獲取 system_id + except Exception as e: + print(f"Failed to decode MAVLink data: {e}") + +def receive_packets(ser): + xbee = XBee(ser) + # 創建 MAVLink 解析器並與 XBee 串口連接 + master = mavutil.mavlink.MAVLink(ser) + + while True: + try: + # 從 XBee 接收數據 + xbee_data = xbee.wait_read_frame() + + # 讀取 `rf_data` 而非 `payload` + raw_data = xbee_data.get('rf_data', None) + if raw_data is None: + print("Warning: No 'rf_data' found in received XBee data!") + continue # 跳過此次循環 + + # 解碼 MAVLink 訊息 + decode_mavlink_data(raw_data,master) + + # 根據需要觸發模式切換,例如根據用戶輸入更改模式 + + + except KeyboardInterrupt: + print("Exiting...") + break + except Exception as e: + print(f"Error: {e}") + + time.sleep(0.1)