@ -14,8 +14,8 @@ import mavlinkDevice as md
# ====================== 分割線 =====================
# ====================== 分割線 =====================
test_item = 22
test_item = 51
running_time = 2 0
running_time = 3 0
print ( ' test_item : ' , test_item )
print ( ' test_item : ' , test_item )
if test_item == 1 :
if test_item == 1 :
@ -55,7 +55,7 @@ if test_item == 1:
ret = mavlink_object_none . updateMultiplexingList ( )
ret = mavlink_object_none . updateMultiplexingList ( )
print ( ' execute result : ' , ret )
print ( ' execute result : ' , ret )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 2 :
elif test_item == 2 :
# 測試 mavlink_object 創建時 socket_id 是否正確
# 測試 mavlink_object 創建時 socket_id 是否正確
@ -73,7 +73,7 @@ elif test_item == 2:
print ( mavlink_object_none1 . _multiplexingList )
print ( mavlink_object_none1 . _multiplexingList )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 10 :
elif test_item == 10 :
# 需要開啟一個 ardupilot 的模擬器
# 需要開啟一個 ardupilot 的模擬器
@ -128,7 +128,7 @@ elif test_item == 10:
mavlink_object1 . running = False
mavlink_object1 . running = False
mavlink_object1 . thread . join ( )
mavlink_object1 . thread . join ( )
mavlink_socket . close ( )
mavlink_socket . close ( )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 11 :
elif test_item == 11 :
# 需要開啟一個 ardupilot 的模擬器
# 需要開啟一個 ardupilot 的模擬器
@ -179,7 +179,7 @@ elif test_item == 11:
analyzer = mo . mavlink_bridge ( ) # 這邊是測試是否只有一個 instance
analyzer = mo . mavlink_bridge ( ) # 這邊是測試是否只有一個 instance
analyzer . thread . join ( )
analyzer . thread . join ( )
mavlink_socket . close ( )
mavlink_socket . close ( )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 12 :
elif test_item == 12 :
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 需要開啟一個 ardupilot 的模擬器 與 GCS
@ -252,7 +252,7 @@ elif test_item == 12:
mavlink_socket_out . close ( )
mavlink_socket_out . close ( )
analyzer . stop ( )
analyzer . stop ( )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 20 :
elif test_item == 20 :
# 這邊測試 node 生成 topic 的功能
# 這邊測試 node 生成 topic 的功能
@ -308,7 +308,7 @@ elif test_item == 20:
rclpy . shutdown ( )
rclpy . shutdown ( )
analyzer . stop ( )
analyzer . stop ( )
analyzer . thread . join ( )
analyzer . thread . join ( )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 21 :
elif test_item == 21 :
# 需要開啟一個 ardupilot 的模擬器
# 需要開啟一個 ardupilot 的模擬器
@ -370,7 +370,7 @@ elif test_item == 21:
analyzer . thread . join ( )
analyzer . thread . join ( )
mavlink_socket . close ( )
mavlink_socket . close ( )
print ( ' End of Program' )
print ( ' <=== End of Program' )
elif test_item == 22 :
elif test_item == 22 :
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 需要開啟一個 ardupilot 的模擬器 與 GCS
@ -389,6 +389,7 @@ elif test_item == 22:
connection_string_in = " udp:127.0.0.1:15551 "
connection_string_in = " udp:127.0.0.1:15551 "
mavlink_socket_in = mavutil . mavlink_connection ( connection_string_in )
mavlink_socket_in = mavutil . mavlink_connection ( connection_string_in )
mavlink_object_in = mo . mavlink_object ( mavlink_socket_in )
mavlink_object_in = mo . mavlink_object ( mavlink_socket_in )
mavlink_object_in . multiplexingToAnalysis = [ 0 , 30 , 32 , 33 , 74 , 147 ]
connection_string_out = " udpout:127.0.0.1:14553 "
connection_string_out = " udpout:127.0.0.1:14553 "
@ -460,4 +461,71 @@ elif test_item == 22:
analyzer . thread . join ( )
analyzer . thread . join ( )
print ( ' End of Program ' )
print ( ' <=== End of Program ' )
elif test_item == 51 :
# 晉凱的測試項目
print ( ' ===> Start of Program .Test ' , test_item )
rclpy . init ( ) # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo . mavlink_bridge ( )
# 關於 Node 的初始化
show_time = time . time ( )
analyzer . _init_node ( ) # 初始化 node
print ( ' 初始化 node 完成 耗時 : ' , time . time ( ) - show_time )
# 創建通道
connection_string = " udp:127.0.0.1:15551 "
mavlink_socket3 = mavutil . mavlink_connection ( connection_string )
mavlink_object3 = mo . mavlink_object ( mavlink_socket3 )
# 設定通道流動
mavlink_object3 . multiplexingToAnalysis = [ 0 , 30 , 32 , 33 , 74 , 147 ]
mavlink_object3 . multiplexingToReturn = [ ] #
# mavlink_object3.multiplexingToSwap = [] #
# 啟動通道
mavlink_object3 . run ( )
print ( ' waiting for mavlink data ... ' )
time . sleep ( 2 ) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time . time ( )
analyzer . create_flightMode ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_attitude ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_local_position_pose ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_local_position_velocity ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_global_global ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_vfr_hud ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_battery ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
analyzer . create_global_rel ( sysid , analyzer . mavlink_systems [ sysid ] . components [ compid ] )
end_time = time . time ( )
print ( f " Execution time for create all topic: { end_time - start_time } seconds " )
print ( " start emit info " )
start_time = time . time ( )
show_time = time . time ( )
while time . time ( ) - start_time < running_time :
try :
# rclpy.spin(analyzer)
analyzer . emit_info ( ) # 這邊是測試 node 的運行
time . sleep ( 0.5 )
except KeyboardInterrupt :
break
analyzer . destroy_node ( )
rclpy . shutdown ( )
# 結束程式 退出所有 thread
mavlink_object3 . stop ( )
mavlink_object3 . thread . join ( )
analyzer . stop ( )
analyzer . thread . join ( )
mavlink_socket3 . close ( )
print ( ' <=== End of Program ' )