import os import sys sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) # 基礎功能的 import import queue import time # ROS2 的 import import rclpy # mavlink 的 import from pymavlink import mavutil # 自定義的 import from ..fc_network_adapter import mavlinkObject as mo from ..fc_network_adapter import mavlinkDevice as md # ====================== 分割線 ===================== test_item = 10 running_time = 10000 print('test_item : ', test_item) ''' 測試項 個位數 表示分離的功能 測試項 1X 表示 mavlink_object 的功能 測試連線的能力 ''' if test_item == 10: # 需要開啟一個 ardupilot 的模擬器 # 測試 mavlink_object 放入 ring buffer 的應用 print('===> Start of Program .Test ', test_item) # 清空 ring buffer mo.stream_bridge_ring.clear() mo.return_packet_ring.clear() manager = mo.async_io_manager() manager.start() time.sleep(0.5) # 等待事件循環啟動 # 初始化輸入通道 connection_string="udp:127.0.0.1:14560" mavlink_socket1 = mavutil.mavlink_connection(connection_string) mavlink_object1 = mo.mavlink_object(mavlink_socket1) manager.add_mavlink_object(mavlink_object1) start_time = time.time() while (time.time() - start_time) < running_time: items_a = mo.stream_bridge_ring.get_all() items_b = mo.return_packet_ring.get_all() try: print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}") except IndexError: print("stream_bridge_ring is empty") try: print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}") except IndexError: print("return_packet_ring is empty") time.sleep(1) manager.stop() print('<=== End of Program') elif test_item == 11: # 需要開啟一個 ardupilot 的模擬器 # 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況 print('===> Start of Program .Test ', test_item) # 清空 ring buffer mo.stream_bridge_ring.clear() mo.return_packet_ring.clear() manager = mo.async_io_manager() manager.start() time.sleep(0.5) # 等待事件循環啟動 # 初始化輸入通道 connection_string="udp:127.0.0.1:14560" mavlink_socket1 = mavutil.mavlink_connection(connection_string) mavlink_object1 = mo.mavlink_object(mavlink_socket1) manager.add_mavlink_object(mavlink_object1) # 啟動 mavlink_bridge analyzer = mo.mavlink_bridge() time.sleep(3) # 印出目前所有 mavlink_systems 的內容 print('目前所有的系統 : ') for sysid in analyzer.mavlink_systems: print(analyzer.mavlink_systems[sysid]) start_time = time.time() show_time = time.time() while time.time() - start_time < running_time: if (time.time() - show_time) >= 2: # print("mark B") show_time = time.time() for sysid in analyzer.mavlink_systems: for compid in analyzer.mavlink_systems[sysid].components: print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) print("===================") manager.stop() analyzer.stop() print('<=== End of Program') elif test_item == 12: # 需要開啟一個 ardupilot 的模擬器 與 GCS # 這邊是測試 mavlink object 作為交換器功能的代碼 print('===> Start of Program .Test ', test_item) # 清空 ring buffer mo.stream_bridge_ring.clear() mo.return_packet_ring.clear() manager = mo.async_io_manager() manager.start() time.sleep(0.5) # 等待事件循環啟動 # 初始化輸入通道 connection_string="udp:127.0.0.1:14560" mavlink_socket_in1 = mavutil.mavlink_connection(connection_string) mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1) connection_string="udp:127.0.0.1:14561" mavlink_socket_in2 = mavutil.mavlink_connection(connection_string) mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2) # 初始化輸出通道 connection_string="udpout:127.0.0.1:14550" mavlink_socket_out = mavutil.mavlink_connection(connection_string) mavlink_object_out = mo.mavlink_object(mavlink_socket_out) manager.add_mavlink_object(mavlink_object_out) manager.add_mavlink_object(mavlink_object_in1) manager.add_mavlink_object(mavlink_object_in2) time.sleep(1) # 等待通道啟動 mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id) mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id) mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id) mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id) start_time = time.time() while (time.time() - start_time) < running_time: time.sleep(1) manager.stop() print('<=== End of Program') # elif test_item == 20: # # 這邊測試 node 生成 topic 的功能 # # 利用 空的通道 發出假的 heartbeat 封包 # print('===> Start of Program .Test ', test_item) # rclpy.init() # 注意要初始化 rclpy 才能使用 node # from pymavlink.dialects.v20 import common as mavlink2 # sysid = 1 # compid = 1 # # 啟動 mavlink_bridge # analyzer = mo.mavlink_bridge() # mav = mavlink2.MAVLink(None) # mav.srcSystem = sysid # mav.srcComponent = compid # # 這是一個 heartbeat 封包 # fake_heartbeat = mavlink2.MAVLink_heartbeat_message( # type=mavlink2.MAV_TYPE_QUADROTOR, # autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA, # base_mode=3, # custom_mode=0, # system_status=0, # mavlink_version=3 # ) # mavlink_bytes = fake_heartbeat.pack(mav) # fake_heartbeat_msg = mav.parse_char(mavlink_bytes) # print(analyzer.mavlink_systems) # mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data # time.sleep(0.1) # print(analyzer.mavlink_systems) # # ROS2 初始化 # analyzer._init_node() # # 創建 ROS2 topic # analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid # print("topic created") # time.sleep(5) # # 丟出 topic # analyzer.emit_info() # # 結束程式 # analyzer.running = False # analyzer.destroy_node() # rclpy.shutdown() # analyzer.stop() # analyzer.thread.join() # print('<=== End of Program') elif test_item == 21: # 需要開啟一個 ardupilot 的模擬器 # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 print('===> Start of Program .Test ', test_item) # 初始化 rclpy 才能使用 node rclpy.init() # 清空 ring buffer mo.stream_bridge_ring.clear() mo.return_packet_ring.clear() manager = mo.async_io_manager() manager.start() # 啟動 mavlink_bridge analyzer = mo.mavlink_bridge() # 關於 Node 的初始化 show_time = time.time() analyzer._init_node() # 初始化 node print('初始化 node 完成 耗時 : ',time.time() - show_time) time.sleep(0.5) # 系統 Setup 完成 # 創建通道 connection_string="udp:127.0.0.1:14560" mavlink_socket = mavutil.mavlink_connection(connection_string) mavlink_object3 = mo.mavlink_object(mavlink_socket) manager.add_mavlink_object(mavlink_object3) print('=== waiting for mavlink data ...') time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 print('目前所有的系統 : ') for sysid in analyzer.mavlink_systems: print(analyzer.mavlink_systems[sysid]) compid = 1 sysid = 1 start_time = time.time() analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) end_time = time.time() print(f"Execution time for create_flightMode: {end_time - start_time} seconds") print("start emit info") start_time = time.time() show_time = time.time() while time.time() - start_time < running_time: try: # print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) analyzer.emit_info() # 這邊是測試 node 的運行 time.sleep(1) except KeyboardInterrupt: break # 程式結束 analyzer.destroy_node() rclpy.shutdown() # 結束程式 退出所有 thread manager.stop() analyzer.stop() analyzer.thread.join() mavlink_socket.close() print('<=== End of Program') # elif test_item == 22: # # 需要開啟一個 ardupilot 的模擬器 與 GCS # # 這邊是測試代碼 引入 rclpy 來測試 node 的運行 # print('===> Start of Program .Test ', test_item) # rclpy.init() # 注意要初始化 rclpy 才能使用 node # # 啟動 mavlink_bridge # analyzer = mo.mavlink_bridge() # # 關於 Node 的初始化 # show_time = time.time() # analyzer._init_node() # 初始化 node # print('初始化 node 完成 耗時 : ',time.time() - show_time) # # 初始化兩個通道 # connection_string_in="udp:127.0.0.1:15551" # mavlink_socket_in = mavutil.mavlink_connection(connection_string_in) # mavlink_object_in = mo.mavlink_object(mavlink_socket_in) # mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] # connection_string_out="udpout:127.0.0.1:14553" # mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) # mavlink_object_out = mo.mavlink_object(mavlink_socket_out) # mavlink_object_out.multiplexingToAnalysis = [] # # 讓兩個通道互相傳輸 # mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all # mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all # # 啟動通道 # mavlink_object_in.run() # mavlink_object_out.run() # print('waiting for mavlink data ...') # time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 # print('目前所有的系統 : ') # for sysid in analyzer.mavlink_systems: # print(analyzer.mavlink_systems[sysid]) # compid = 1 # sysid = 1 # show_time = time.time() # analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # print(f"Execution time for create_flightMode: {time.time() - show_time} seconds") # print("start emit info") # start_time = time.time() # show_time = time.time() # show_time2 = time.time() # while time.time() - start_time < running_time: # if (time.time() - show_time2) >= 1: # analyzer.emit_info() # 這邊是測試 node 的運行 # # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode'] # fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode'] # sss = mavutil.mode_string_v10(fmsg) # # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq())) # print("目前的飛行模式 : {}".format(sss)) # show_time2 = time.time() # # if (time.time() - show_time) >= 2: # # show_time = time.time() # # for sysid in analyzer.mavlink_systems: # # for compid in analyzer.mavlink_systems[sysid].components: # # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) # # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) # # print("===================") # analyzer.destroy_node() # rclpy.shutdown() # # 結束程式 退出所有 thread # mavlink_object_in.stop() # mavlink_object_in.thread.join() # mavlink_socket_in.close() # mavlink_object_out.stop() # mavlink_object_out.thread.join() # mavlink_socket_out.close() # analyzer.stop() # analyzer.thread.join() # print('<=== End of Program') # elif test_item == 51: # # 晉凱的測試項目 # print('===> Start of Program .Test ', test_item) # rclpy.init() # 注意要初始化 rclpy 才能使用 node # # 啟動 mavlink_bridge # analyzer = mo.mavlink_bridge() # # 關於 Node 的初始化 # show_time = time.time() # analyzer._init_node() # 初始化 node # print('初始化 node 完成 耗時 : ',time.time() - show_time) # # 創建通道 # connection_string="udp:127.0.0.1:15551" # mavlink_socket3 = mavutil.mavlink_connection(connection_string) # mavlink_object3 = mo.mavlink_object(mavlink_socket3) # # 設定通道流動 # mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147] # mavlink_object3.multiplexingToReturn = [] # # # mavlink_object3.multiplexingToSwap = [] # # # 啟動通道 # mavlink_object3.run() # print('waiting for mavlink data ...') # time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 # compid = 1 # sysid = 1 # start_time = time.time() # analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid]) # analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid]) # end_time = time.time() # print(f"Execution time for create all topic: {end_time - start_time} seconds") # print("start emit info") # start_time = time.time() # show_time = time.time() # while time.time() - start_time < running_time: # try: # # rclpy.spin(analyzer) # analyzer.emit_info() # 這邊是測試 node 的運行 # time.sleep(0.5) # except KeyboardInterrupt: # break # analyzer.destroy_node() # rclpy.shutdown() # # 結束程式 退出所有 thread # mavlink_object3.stop() # mavlink_object3.thread.join() # analyzer.stop() # analyzer.thread.join() # mavlink_socket3.close() # print('<=== End of Program')