# Manual integration tests for the MAVLink network adapter.
#
# Set ``test_item`` below to choose which scenario runs. Most scenarios expect
# an ArduPilot simulator (and for item 12 also a GCS) to be reachable on the
# UDP ports used in each branch.
#
# NOTE: the relative imports below only resolve when this file is executed as
# part of its package (for example with ``python -m``).
import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Basic utilities
import queue
import time

# ROS2
import rclpy

# MAVLink
from pymavlink import mavutil

# Project-local modules
from ..fc_network_adapter import mavlinkObject as mo
from ..fc_network_adapter import mavlinkVehicleView as mvv
# from ..fc_network_adapter import mavlinkDevice as md

# ====================== divider =====================

test_item = 10
running_time = 3  # seconds each polling loop runs

print('test_item : ', test_item)

# Test item numbering:
#   single digit : isolated features
#   1X           : mavlink_object features (connection tests)

if test_item == 10:
    # Requires a running ArduPilot simulator.
    # Checks that a mavlink_object feeds received packets into the ring buffers.
    print('===> Start of Program .Test ', test_item)

    # Clear the ring buffers
    mo.stream_bridge_ring.clear()
    mo.return_packet_ring.clear()

    manager = mo.async_io_manager()
    manager.start()
    try:
        time.sleep(0.5)  # wait for the event loop to start

        # Set up the input channel
        connection_string = "udp:127.0.0.1:14571"
        mavlink_socket1 = mavutil.mavlink_connection(connection_string)
        mavlink_object1 = mo.mavlink_object(mavlink_socket1)
        sock = mavlink_socket1.port
        print("Socket port:", sock)

        manager.add_mavlink_object(mavlink_object1)

        start_time = time.time()
        while (time.time() - start_time) < running_time:
            items_a = mo.stream_bridge_ring.get_all()
            items_b = mo.return_packet_ring.get_all()

            # Indexing [0] raises IndexError when a ring returned no items.
            try:
                print(f"data num {len(items_a)} in stream_bridge_ring, first data : {items_a[0][2]}")
            except IndexError:
                print("stream_bridge_ring is empty")

            try:
                print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
            except IndexError:
                print("return_packet_ring is empty")

            time.sleep(1)
    finally:
        # Always stop the manager, even if the connection setup failed.
        manager.shutdown()
    print('<=== End of Program')

elif test_item == 11:
    # Requires a running ArduPilot simulator.
    # Checks that, once the bridge is running, vehicles/components are
    # registered and their packet statistics are updated.
    print('===> Start of Program .Test ', test_item)

    # Clear the ring buffers
    mo.stream_bridge_ring.clear()
    mo.return_packet_ring.clear()

    manager = mo.async_io_manager()
    manager.start()
    bridge = None
    try:
        time.sleep(0.5)  # wait for the event loop to start

        # Set up the input channel
        connection_string = "udp:127.0.0.1:14571"
        mavlink_socket1 = mavutil.mavlink_connection(connection_string)
        mavlink_object1 = mo.mavlink_object(mavlink_socket1)
        manager.add_mavlink_object(mavlink_object1)

        # Start the mavlink_bridge
        bridge = mo.mavlink_bridge()
        bridge.start()

        time.sleep(3)

        # Print every system currently known to the vehicle registry
        print('目前所有的系統 : ')
        all_vehicles = mvv.vehicle_registry.get_all()
        for sysid, vehicle in all_vehicles.items():
            print(f"  System {sysid}: {vehicle}")

        start_time = time.time()
        show_time = time.time()
        while time.time() - start_time < running_time:
            if (time.time() - show_time) >= 2:
                show_time = time.time()
                for sysid, vehicle in all_vehicles.items():
                    for compid in vehicle.components:
                        comp = vehicle.get_component(compid)
                        print(f"Sysid : {sysid} ,目前收到的訊息數量 : {comp.packet_stats.received_count}")
                        comp.reset_packet_stats()
                print("===================")
            # Yield the CPU instead of busy-spinning between reports.
            time.sleep(0.05)
    finally:
        # Same shutdown order as before: manager first, then the bridge.
        manager.shutdown()
        if bridge is not None:
            bridge.stop()
    print('<=== End of Program')

elif test_item == 12:
    # Requires a running ArduPilot simulator and a GCS.
    # Checks that mavlink objects can forward traffic to each other (switch).
    print('===> Start of Program .Test ', test_item)

    # Clear the ring buffers
    mo.stream_bridge_ring.clear()
    mo.return_packet_ring.clear()

    manager = mo.async_io_manager()
    manager.start()
    try:
        time.sleep(0.5)  # wait for the event loop to start

        # Set up the input channels
        connection_string = "udp:127.0.0.1:14571"
        mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
        mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)

        # NOTE(review): same address/port as in1 - presumably the second input
        # was meant to listen on a different port; confirm before changing.
        connection_string = "udp:127.0.0.1:14571"
        mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
        mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)

        # Set up the output channel
        connection_string = "udpout:127.0.0.1:14551"
        mavlink_socket_out = mavutil.mavlink_connection(connection_string)
        mavlink_object_out = mo.mavlink_object(mavlink_socket_out)

        manager.add_mavlink_object(mavlink_object_out)
        manager.add_mavlink_object(mavlink_object_in1)
        manager.add_mavlink_object(mavlink_object_in2)

        time.sleep(1)  # wait for the channels to start

        # Wire each input to the output and back again.
        mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
        mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
        mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
        mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)

        start_time = time.time()
        while (time.time() - start_time) < running_time:
            time.sleep(1)
    finally:
        # Always stop the manager, even if the connection setup failed.
        manager.shutdown()
    print('<=== End of Program')

elif test_item == 21:
    # Requires a running ArduPilot simulator.
    # Brings in rclpy to exercise the ROS2 node side of the bridge.
    # NOTE(review): this branch calls manager.stop(), analyzer._init_node(),
    # analyzer.mavlink_systems and analyzer.thread, while the other branches
    # use manager.shutdown() / bridge.start() / mvv.vehicle_registry - confirm
    # these names still exist in mavlinkObject.
    print('===> Start of Program .Test ', test_item)

    # rclpy must be initialised before a node can be used
    rclpy.init()

    # Clear the ring buffers
    mo.stream_bridge_ring.clear()
    mo.return_packet_ring.clear()

    manager = mo.async_io_manager()
    manager.start()

    # Create the mavlink_bridge
    analyzer = mo.mavlink_bridge()

    # Node initialisation (timed)
    show_time = time.time()
    analyzer._init_node()
    print('初始化 node 完成 耗時 : ',time.time() - show_time)

    time.sleep(0.5)
    # System setup finished

    # Create the channel
    connection_string = "udp:127.0.0.1:14560"
    mavlink_socket = mavutil.mavlink_connection(connection_string)
    mavlink_object3 = mo.mavlink_object(mavlink_socket)
    manager.add_mavlink_object(mavlink_object3)

    print('=== waiting for mavlink data ...')
    time.sleep(2)  # give the bridge 2 s to receive enough MAVLink messages

    print('目前所有的系統 : ')
    for sysid in analyzer.mavlink_systems:
        print(analyzer.mavlink_systems[sysid])

    compid = 1
    sysid = 1

    start_time = time.time()
    analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
    end_time = time.time()
    print(f"Execution time for create_flightMode: {end_time - start_time} seconds")

    print("start emit info")
    start_time = time.time()
    while time.time() - start_time < running_time:
        try:
            # print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
            analyzer.emit_info()  # exercises the node
            time.sleep(1)
        except KeyboardInterrupt:
            break

    # End of the test
    analyzer.destroy_node()
    rclpy.shutdown()

    # Stop every worker thread before exiting
    manager.stop()
    analyzer.stop()
    analyzer.thread.join()
    mavlink_socket.close()
    print('<=== End of Program')

elif test_item == 52:
    # Starts and immediately shuts down the I/O manager.
    print('===> Start of Program .Test ', test_item)
    manager = mo.async_io_manager()
    manager.start()
    # print(manager.thread.is_alive())
    manager.shutdown()
    time.sleep(1)
    print('manager stopped')