測試功能整理 有bug修正中

講解程式先 commit 一版
chiyu
Chiyu Chen 12 months ago
parent f79aaf86fa
commit 84119b788e

@ -13,7 +13,7 @@ import mavlinkObject as mo
# ====================== 分割線 ===================== # ====================== 分割線 =====================
test_item = 12 test_item = 22
print('test_item : ', test_item) print('test_item : ', test_item)
if test_item == 1: if test_item == 1:
@ -214,7 +214,7 @@ elif test_item == 12:
print(type(mavlink_socket_out)) print(type(mavlink_socket_out))
print(type(mavlink_socket_out.mav)) print(type(mavlink_socket_out.mav))
while time.time() - start_time < 10: while time.time() - start_time < 100:
try: try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False) test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test) print('none object : ', test)
@ -304,27 +304,28 @@ elif test_item == 21:
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行 # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item) print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node rclpy.init() # 注意要初始化 rclpy 才能使用 node
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
mavlink_object3.multiplexingToAnalysis = [0] # only HEARTBEAT
mavlink_object3.multiplexingToReturn = [] # only HEARTBEAT
mavlink_object3.multiplexingToSwap = [] #
# 啟動 mavlink_bridge # 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge() analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化 # 關於 Node 的初始化
show_time = time.time() show_time = time.time()
analyzer._init_node() # 初始化 node analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time) print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 啟動連線的模組 # 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
# 啟動通道
mavlink_object3.run() mavlink_object3.run()
print('waiting for mavlink data ...') print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息 time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1 compid = 1
sysid = 1 sysid = 1
@ -341,7 +342,7 @@ elif test_item == 21:
try: try:
# rclpy.spin(analyzer) # rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行 analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5) time.sleep(1)
except KeyboardInterrupt: except KeyboardInterrupt:
break break
@ -357,3 +358,91 @@ elif test_item == 21:
mavlink_socket.close() mavlink_socket.close()
print('End of Program') print('End of Program')
elif test_item == 22:
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 初始化兩個通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
mavlink_object_in.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ]
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
show_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
show_time2 = time.time()
while time.time() - start_time < 100:
if (time.time() - show_time2) >= 1:
analyzer.emit_info() # 這邊是測試 node 的運行
# sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
sss = mavutil.mode_string_v10(fmsg)
print("目前的飛行模式 : ", sss)
show_time2 = time.time()
# if (time.time() - show_time) >= 2:
# show_time = time.time()
# for sysid in analyzer.mavlink_systems:
# for compid in analyzer.mavlink_systems[sysid].components:
# print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# print("===================")
time.sleep(1)
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_socket_in.close()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_out.close()
analyzer.stop()
analyzer.thread.join()
print('End of Program')

@ -115,6 +115,9 @@ class mavlink_bridge(Node, mavlink_publisher):
this_component.emitParams['base_mode'] = msg.base_mode this_component.emitParams['base_mode'] = msg.base_mode
this_component.emitParams['flightMode_mode'] = mavutil.mode_string_v10(msg) this_component.emitParams['flightMode_mode'] = mavutil.mode_string_v10(msg)
# print("get mode :", mavutil.mode_string_v10(msg)) # debug
# print("record mode :", this_component.emitParams['flightMode_mode']) # debug
this_component.emitParams['flightMode'] = msg # debug
elif msg.get_msgId() == 147: # BATTERY_STATUS 處理 elif msg.get_msgId() == 147: # BATTERY_STATUS 處理
this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()] this_component = self.mavlink_systems[sysid].components[msg.get_srcComponent()]
@ -239,7 +242,7 @@ class mavlink_object():
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
# logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替 # logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替
print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug # print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) # logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# try: # try:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id)) # logger.info('mavlink_object instance {} deleted'.format(self.socket_id))

@ -0,0 +1,14 @@
import socket
SOCKET_PATH = "/tmp/unix_socket_example"
# 建立 socket
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(SOCKET_PATH)
# 傳送資料
client.sendall(b"Hello from client!")
data = client.recv(1024)
print("📤 Server replied:", data.decode())
client.close()

@ -0,0 +1,45 @@
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
# import mavros_msgs.srv
class TalkerNode(Node):
def __init__(self):
start_time = time.time()
super().__init__('talker_node')
end_time = time.time()
print(f"Node initialization took {end_time - start_time:.2f} seconds")
self.publisher_ = self.create_publisher(String, 'hahatest/_1', 10)
self.timer = self.create_timer(1.0, self.timer_callback) # 每秒執行一次
self.get_logger().info('TalkerNode has been started.')
def timer_callback(self):
msg = String()
msg.data = 'Hello, ROS 2!'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = TalkerNode()
print("Before sleep")
time.sleep(5) # 等待 5 秒鐘
print("After sleep")
try:
start_time = time.time()
while time.time() - start_time < 10: # 持續 10 秒鐘
rclpy.spin_once(node)
time.sleep(1) # 每秒執行一次
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,33 @@
import socket
import os
# socket file path
SOCKET_PATH = "/tmp/unix_socket_example"
# 若檔案存在就先刪除
if os.path.exists(SOCKET_PATH):
os.remove(SOCKET_PATH)
# 建立 socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# 綁定 socket 到檔案
server.bind(SOCKET_PATH)
server.listen(1)
print("🔌 Server waiting for connection...")
# 等待 client 連線
conn, _ = server.accept()
print("✅ Client connected.")
while True:
data = conn.recv(1024)
if not data:
break
print("📥 Received:", data.decode())
conn.sendall(b"Echo: " + data)
conn.close()
server.close()
os.remove(SOCKET_PATH)
Loading…
Cancel
Save