You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
278 lines
7.9 KiB
Python
278 lines
7.9 KiB
Python
import os
|
|
import sys
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
# 基礎功能的 import
|
|
import queue
|
|
import time
|
|
|
|
# ROS2 的 import
|
|
import rclpy
|
|
|
|
# mavlink 的 import
|
|
from pymavlink import mavutil
|
|
|
|
# 自定義的 import
|
|
from ..fc_network_adapter import mavlinkObject as mo
|
|
from ..fc_network_adapter import mavlinkVehicleView as mvv
|
|
# from ..fc_network_adapter import mavlinkDevice as md
|
|
|
|
# ====================== 分割線 =====================
|
|
|
|
test_item = 1
|
|
running_time = 3
|
|
|
|
|
|
print('test_item : ', test_item)
|
|
|
|
'''
|
|
測試項 個位數 表示分離的功能
|
|
|
|
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
|
|
'''
|
|
|
|
if test_item == 1:
|
|
print('===> Start of Program .Test ', test_item)
|
|
|
|
connection_string="udp:127.0.0.1:14591"
|
|
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
|
# mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
|
|
|
time.sleep(1)
|
|
|
|
print("mark A")
|
|
|
|
# print("Socket IP:", mavlink_socket1.target_system)
|
|
print("Socket port:", mavlink_socket1.port.getsockname())
|
|
|
|
# print("=== ", dir(mavlink_socket1.port))
|
|
|
|
|
|
elif test_item == 10:
|
|
# 需要開啟一個 ardupilot 的模擬器
|
|
# 測試 mavlink_object 放入 ring buffer 的應用
|
|
print('===> Start of Program .Test ', test_item)
|
|
|
|
# 清空 ring buffer
|
|
mo.stream_bridge_ring.clear()
|
|
mo.return_packet_ring.clear()
|
|
|
|
manager = mo.async_io_manager()
|
|
manager.start()
|
|
time.sleep(0.5) # 等待事件循環啟動
|
|
|
|
# 初始化輸入通道
|
|
connection_string="udp:127.0..1:14571"
|
|
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
|
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
|
|
|
sock = mavlink_socket1.port
|
|
print("Socket port:", sock)
|
|
|
|
manager.add_mavlink_object(mavlink_object1)
|
|
|
|
start_time = time.time()
|
|
while (time.time() - start_time) < running_time:
|
|
items_a = mo.stream_bridge_ring.get_all()
|
|
items_b = mo.return_packet_ring.get_all()
|
|
try:
|
|
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
|
|
except IndexError:
|
|
print("stream_bridge_ring is empty")
|
|
|
|
try:
|
|
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
|
|
except IndexError:
|
|
print("return_packet_ring is empty")
|
|
time.sleep(1)
|
|
|
|
manager.shutdown()
|
|
|
|
print('<=== End of Program')
|
|
|
|
elif test_item == 11:
|
|
# 需要開啟一個 ardupilot 的模擬器
|
|
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
|
|
print('===> Start of Program .Test ', test_item)
|
|
|
|
# 清空 ring buffer
|
|
mo.stream_bridge_ring.clear()
|
|
mo.return_packet_ring.clear()
|
|
|
|
manager = mo.async_io_manager()
|
|
manager.start()
|
|
time.sleep(0.5) # 等待事件循環啟動
|
|
|
|
# 初始化輸入通道
|
|
connection_string="udp:127.0.0.1:14571"
|
|
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
|
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
|
manager.add_mavlink_object(mavlink_object1)
|
|
|
|
# 啟動 mavlink_bridge
|
|
bridge = mo.mavlink_bridge()
|
|
bridge.start()
|
|
|
|
time.sleep(3)
|
|
|
|
# 印出目前所有 mavlink_systems 的內容
|
|
print('目前所有的系統 : ')
|
|
all_vehicles = mvv.vehicle_registry.get_all()
|
|
for sysid, vehicle in all_vehicles.items():
|
|
print(f" System {sysid}: {vehicle}")
|
|
|
|
start_time = time.time()
|
|
show_time = time.time()
|
|
while time.time() - start_time < running_time:
|
|
if (time.time() - show_time) >= 2:
|
|
# print("mark B")
|
|
|
|
show_time = time.time()
|
|
for sysid, vehicle in all_vehicles.items():
|
|
for compid in vehicle.components:
|
|
comp = vehicle.get_component(compid)
|
|
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, comp.packet_stats.received_count))
|
|
comp.reset_packet_stats()
|
|
print("===================")
|
|
|
|
manager.shutdown()
|
|
bridge.stop()
|
|
|
|
print('<=== End of Program')
|
|
|
|
elif test_item == 12:
|
|
# 需要開啟一個 ardupilot 的模擬器 與 GCS
|
|
# 這邊是測試 mavlink object 作為交換器功能的代碼
|
|
print('===> Start of Program .Test ', test_item)
|
|
|
|
# 清空 ring buffer
|
|
mo.stream_bridge_ring.clear()
|
|
mo.return_packet_ring.clear()
|
|
|
|
manager = mo.async_io_manager()
|
|
manager.start()
|
|
time.sleep(0.5) # 等待事件循環啟動
|
|
|
|
# 初始化輸入通道
|
|
connection_string="udp:127.0.0.1:14571"
|
|
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
|
|
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
|
|
|
|
connection_string="udp:127.0.0.1:14571"
|
|
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
|
|
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
|
|
|
|
# 初始化輸出通道
|
|
connection_string="udpout:127.0.0.1:14551"
|
|
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
|
|
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
|
|
|
|
manager.add_mavlink_object(mavlink_object_out)
|
|
manager.add_mavlink_object(mavlink_object_in1)
|
|
manager.add_mavlink_object(mavlink_object_in2)
|
|
|
|
time.sleep(1) # 等待通道啟動
|
|
|
|
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
|
|
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
|
|
|
|
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
|
|
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
|
|
|
|
start_time = time.time()
|
|
while (time.time() - start_time) < running_time:
|
|
|
|
time.sleep(1)
|
|
|
|
manager.shutdown()
|
|
|
|
print('<=== End of Program')
|
|
|
|
|
|
|
|
elif test_item == 21:
|
|
# 需要開啟一個 ardupilot 的模擬器
|
|
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
|
|
|
|
print('===> Start of Program .Test ', test_item)
|
|
# 初始化 rclpy 才能使用 node
|
|
rclpy.init()
|
|
|
|
# 清空 ring buffer
|
|
mo.stream_bridge_ring.clear()
|
|
mo.return_packet_ring.clear()
|
|
|
|
manager = mo.async_io_manager()
|
|
manager.start()
|
|
|
|
# 啟動 mavlink_bridge
|
|
analyzer = mo.mavlink_bridge()
|
|
# 關於 Node 的初始化
|
|
show_time = time.time()
|
|
analyzer._init_node() # 初始化 node
|
|
print('初始化 node 完成 耗時 : ',time.time() - show_time)
|
|
|
|
time.sleep(0.5) # 系統 Setup 完成
|
|
|
|
|
|
# 創建通道
|
|
connection_string="udp:127.0.0.1:14560"
|
|
mavlink_socket = mavutil.mavlink_connection(connection_string)
|
|
mavlink_object3 = mo.mavlink_object(mavlink_socket)
|
|
manager.add_mavlink_object(mavlink_object3)
|
|
|
|
print('=== waiting for mavlink data ...')
|
|
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
|
|
|
|
print('目前所有的系統 : ')
|
|
for sysid in analyzer.mavlink_systems:
|
|
print(analyzer.mavlink_systems[sysid])
|
|
|
|
compid = 1
|
|
sysid = 1
|
|
start_time = time.time()
|
|
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
|
end_time = time.time()
|
|
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
|
|
|
|
print("start emit info")
|
|
|
|
start_time = time.time()
|
|
show_time = time.time()
|
|
while time.time() - start_time < running_time:
|
|
try:
|
|
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
|
|
analyzer.emit_info() # 這邊是測試 node 的運行
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
break
|
|
|
|
|
|
# 程式結束
|
|
analyzer.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
# 結束程式 退出所有 thread
|
|
manager.stop()
|
|
analyzer.stop()
|
|
analyzer.thread.join()
|
|
|
|
mavlink_socket.close()
|
|
print('<=== End of Program')
|
|
|
|
|
|
elif test_item == 52:
|
|
print('===> Start of Program .Test ', test_item)
|
|
|
|
manager = mo.async_io_manager()
|
|
manager.start()
|
|
|
|
# print(manager.thread.is_alive())
|
|
|
|
manager.shutdown()
|
|
|
|
time.sleep(1)
|
|
|
|
print('manager stopped')
|
|
|