Compare commits

...

34 Commits

Author SHA1 Message Date
wenchun 456d12ce56 Merge branch 'chiyu' into wenchun 11 months ago
Chiyu Chen a25372979b 整合後項目修正
1. mavlinkObject.py 統合 msg_id 參數名稱
2. mavlinkDevice.py 移除在預期操作下 仍會大量跳錯誤訊息的警示
3. devRun.py 重新驗證 merge 項目與版本調整
12 months ago
Chiyu Chen 4d82b92a11 Merge branch 'ken' into chiyu 12 months ago
Chiyu Chen f9080a5015 1. 修改 mavlinkObject.py 無法順利識別多台 vehicle 的重大bug
2. 重新編寫 devRun.py 讓其更好閱讀
12 months ago
Chiyu Chen 84119b788e 測試功能整理 有bug修正中
講解程式先 commit 一版
12 months ago
ken910606 4d694f28b4 Upload files to 'src/unitdev01/unitdev01' 1 year ago
Chiyu Chen f79aaf86fa mavlink_object 架構改善 新增功能 轉接不同 socket 的數據流
有時候不穩定 會有些小問題 之後再改善
另外 devRun 也更新了幾個範例
20號項 可以獨立測試 node 功能
12號項 展示最簡單的轉接到 GCS 功能
1 year ago
ken910606 945d79a6fa Upload files to 'src/unitdev01/unitdev01' 1 year ago
ken910606 72a59b1a3a Upload files to 'src/fc_network_adapter/fc_network_adapter' 1 year ago
ken910606 30239b87f8 Upload files to 'src/fc_network_adapter/fc_network_adapter' 1 year ago
ken910606 d8ef37fdc0 PyQt6 GUI 1 year ago
Chiyu Chen 0af71f8c8e 1. 重要參數命名變更
2. mavlink_object 建構式變更
3. 把開發代碼獨立到額外檔案 devRun.py
1 year ago
ken910606 53c062a218 Upload files to 'src/fc_network_adapter/fc_network_adapter' 1 year ago
ken910606 396da09100 Upload files to 'src/fc_network_adapter/fc_network_adapter' 1 year ago
ken910606 4dcff44bd3 Upload files to 'src/fc_network_adapter/fc_network_adapter' 1 year ago
ken910606 c38f277db4 Upload files to 'src/fc_network_adapter/fc_network_adapter' 1 year ago
ken910606 84901cecee 二進制封包 1 year ago
ken910606 de694fe301 ros2 1 year ago
ken910606 5535ab1dab Upload files to 'src/unitdev01/unitdev01' 1 year ago
ken910606 f6afd43e75 Upload files to 'src/unitdev01/unitdev01' 1 year ago
ken910606 f7f725a2ab Upload files to 'src/unitdev01/unitdev01' 1 year ago
Chiyu Chen 5c08a961cf code 沒有動 只是調整版面 與 新增一些註解 增加可讀性 1 year ago
Chiyu Chen bdd6cceb96 完成固定串流分析器的基礎結構並連結到 node topic
更嚴格的模組化 分析器 & socket adapter(mavlink object) & device 之間
程式可測試進入點為 mavlinkObject.py  依開發進度提供三個測試項目
新增 logger 功能 記錄執行狀況
1 year ago
Chiyu Chen b6c730f74a MAVLink simple message receiver 接通了 再來要實裝 node topic 1 year ago
Chiyu Chen d21bc6b069 Merge branch 'wenchun'
march 11 alignment
1 year ago
Chiyu Chen c9802e3eb1 Merge branch 'lunu'
march 11 alignment
1 year ago
Chiyu Chen 4d8ffc4a95 Merge branch 'ken'
march 11 alignment
1 year ago
Chiyu Chen ac272a6c3d Enhance mavlinkObject with multiplexing capabilities and add error handling; update README with team member assignments 1 year ago
lenting89 ff5c7718be 轉二進位封包 1 year ago
picard cd9d055409 temp commit for share code 1 year ago
ken910606 ee4cf1d07c Update 'src/unitdev01/unitdev01/mavlink.py' 1 year ago
ken910606 b8965c0bd6 Update 'src/unitdev01/unitdev01/interface.py' 1 year ago
ken910606 a8b36c391f Update 'src/unitdev01/unitdev01/interface.py' 1 year ago
picard 5c5cadc9da mavlink to gazebo devtest 02 1 year ago

3
.gitignore vendored

@ -8,7 +8,8 @@
# 編譯的資料夾
**/build/
**/install/
log/
**/log/
**/logs/
# CMAKE 的衍生檔
CMakeCache.txt

@ -1,22 +1,44 @@
這是天雷系統的專案
===
專案核心環境
專案核心框架
1. ROS2 Humble
2. Python
3. Ardupilot
2. Python 3.8.10
===
必要相依套件
Python
1. pymavlink
2.
ROS2
1. source ~/ros2_humble/install/setup.bash
2.
===
建議的開發測試
開發用輔助專案
1. Gazebo Garden
2. Ardupilot
===
Package 簡述
<<<<<<< HEAD
===
簡述0328
主要是透過test_transform這隻程式執行fdm_switch_example_one_with_remote_forwarding這個副函式透過UDP將封包轉接到Matlab並在Matlab進行演算法迭代後打包JSON檔並再次透過port口傳回封包傳回來的封包會在close_loop這隻程式被解析並提取其中有關velocity的資料儲存成變數以mavlink的方式打入Ardupilot以實現封閉迴路的構思。
=======
1. unitdev 為各自協作者做開發時的測試區
01 -> 晉凱(ken)
02 -> 其宇(chiyu)
03 -> 文鈞
04 -> 倫渝
2. fc_network_adapter
建立、維護與飛控韌體的連接
構築 mavlink 封包
同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息
處理無線模組的通訊格式 (XBee)
>>>>>>> chiyu

@ -0,0 +1,531 @@
# 基礎功能的 import
import queue
import time
# ROS2 的 import
import rclpy
# mavlink 的 import
from pymavlink import mavutil
# 自定義的 import
import mavlinkObject as mo
import mavlinkDevice as md
# ====================== 分割線 =====================
test_item = 51
running_time = 30
print('test_item : ', test_item)
if test_item == 1:
# 測試 mavlink_object 中 updateMultiplexingList 的輸出
print('===> Start of Program .Test ', test_item)
mavlink_object_none = mo.mavlink_object(None)
print('=====================')
print('正常範例')
mavlink_object_none.multiplexingToAnalysis = []
mavlink_object_none.multiplexingToReturn = []
# mavlink_object_none.multiplexingToSwap = [[]]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 長度不一致')
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('=====================')
print('正常範例')
mo.swap_queues.append(queue.Queue())
mo.swap_queues.append(queue.Queue())
mavlink_object_none.multiplexingToAnalysis = [0, 1, 2, 3, 4, 5]
mavlink_object_none.multiplexingToReturn = [6, 7, 8, 9, 10]
mavlink_object_none.multiplexingToSwap = [[-1], [11,12,13], [14,15,16],]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('multiplexingList : ', mavlink_object_none._multiplexingList)
print('=====================')
print('錯誤範例 multiplexingToAnalysis 不可以有 -1')
mavlink_object_none.multiplexingToAnalysis = [-1, 1, 2, 3, 4, 5]
ret = mavlink_object_none.updateMultiplexingList()
print('execute result : ', ret)
print('<=== End of Program')
elif test_item == 2:
# 測試 mavlink_object 創建時 socket_id 是否正確
# 說實在 這個測試項 似乎因為 python 的 GC 機制 會導致難以測試
print('===> Start of Program .Test ', test_item)
mavlink_object_none1 = mo.mavlink_object(None)
mavlink_object_none2 = mo.mavlink_object(None)
mavlink_object_none3 = mo.mavlink_object(None)
del mavlink_object_none2
print(len(mo.swap_queues))
mavlink_object_none2 = mo.mavlink_object(None)
print(len(mo.swap_queues))
print(mavlink_object_none1._multiplexingList)
print('<=== End of Program')
elif test_item == 10:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 運行10秒 過程中把三個 queue 的資料印出來
# 只啟用了 mavlink_object 的功能
print('===> Start of Program .Test ', test_item)
# 創建一個空的通道 這個通道的 socket_id 是 0
mavlink_object_none = mo.mavlink_object(None)
# 創建另一個通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object1 = mo.mavlink_object(mavlink_socket)
# 設定通道轉發的參數
mavlink_object1.multiplexingToAnalysis = [30] # only ATTITUDE
mavlink_object1.multiplexingToReturn = [0] # only HEARTBEAT
mavlink_object1.multiplexingToSwap[0] = [74, ] # only VFR_HUD
# print(mavlink_object1.multiplexingToSwap)
# print(mo.swap_queues)
# 啟動通道
mavlink_object1.run()
# 確認轉拋的設定有沒有錯
print("_multiplexingList for mavlink object :", mavlink_object1._multiplexingList)
# 運行幾秒並印出 queue 的資料
start_time = time.time()
while time.time() - start_time < running_time:
while not mo.fixed_stream_bridge_queue.empty():
print('fixed_stream_bridge_queue:')
t = mo.fixed_stream_bridge_queue.get()
print('from {} : {}'.format(t[0],t[2]))
while not mo.return_packet_processor_queue.empty():
print('return_packet_processor_queue:')
t = mo.return_packet_processor_queue.get()
print('from {} : {}'.format(t[0],t[2]))
# print(t[2].get_msgbuf())
# print(t[2].type)
for n in range(len(mo.swap_queues)):
q = mo.swap_queues[n]
while not q.empty():
print('swap_queues:')
t = q.get()
print('{} gets : {}'.format(n,t))
# 結束程式 退出所有 thread
mavlink_object1.running = False
mavlink_object1.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 11:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
# 啟用 mavlink_object 與 mavlink_bridge的thread區塊 的功能
print('===> Start of Program .Test ', test_item)
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 創建通道
connection_string="udp:127.0.0.1:14550"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object2 = mo.mavlink_object(mavlink_socket)
# mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT
# mavlink_object2.multiplexingToReturn = []
# 啟動連線的模組
mavlink_object2.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(2)
print("=== connection established! ===")
# 印出目前所有 mavlink_systems 的內容
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
start_time = time.time()
show_time = time.time()
compid = 1
while time.time() - start_time < running_time:
if (time.time() - show_time) >= 1:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
print("目前收到的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].msg_count)
print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count)
print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object2.stop()
mavlink_object2.thread.join()
analyzer.stop()
analyzer = mo.mavlink_bridge() # 這邊是測試是否只有一個 instance
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 12:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試 mavlink object 作為交換器功能的代碼
print('===> Start of Program .Test ', test_item)
# 啟動連線的模組
analyzer = mo.mavlink_bridge()
# 初始化輸入通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT
# 初始化輸出通道
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = [0]
# 做一個空的通道驗證 可以拿來 debug
mavlink_object_none = mo.mavlink_object(None)
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
# 做點延遲 讓 heartbeat 先吃進來
time.sleep(3)
print("=== connection established! ===")
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
# print(type(mavlink_socket_out))
# print(type(mavlink_socket_out.mav))
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False)
print('none object : ', test)
except queue.Empty:
pass
if (time.time() - show_time) >= 2:
show_time = time.time()
for sysid in analyzer.mavlink_systems:
for compid in analyzer.mavlink_systems[sysid].components:
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
print("===================")
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_in.close()
mavlink_socket_out.close()
analyzer.stop()
print('<=== End of Program')
elif test_item == 20:
# 這邊測試 node 生成 topic 的功能
# 利用 空的通道 發出假的 heartbeat 封包
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
from pymavlink.dialects.v20 import common as mavlink2
sysid = 1
compid = 1
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
mav = mavlink2.MAVLink(None)
mav.srcSystem = sysid
mav.srcComponent = compid
# 這是一個 heartbeat 封包
fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
type=mavlink2.MAV_TYPE_QUADROTOR,
autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
base_mode=3,
custom_mode=0,
system_status=0,
mavlink_version=3
)
mavlink_bytes = fake_heartbeat.pack(mav)
fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
print(analyzer.mavlink_systems)
mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
time.sleep(0.1)
print(analyzer.mavlink_systems)
# ROS2 初始化
analyzer._init_node()
# 創建 ROS2 topic
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
print("topic created")
time.sleep(5)
# 丟出 topic
analyzer.emit_info()
# 結束程式
analyzer.running = False
analyzer.destroy_node()
rclpy.shutdown()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 21:
# 需要開啟一個 ardupilot 的模擬器
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket)
# 啟動通道
mavlink_object3.run()
print('=== waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(1)
except KeyboardInterrupt:
break
# 程式結束
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket.close()
print('<=== End of Program')
elif test_item == 22:
# 需要開啟一個 ardupilot 的模擬器 與 GCS
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 初始化兩個通道
connection_string_in="udp:127.0.0.1:15551"
mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
connection_string_out="udpout:127.0.0.1:14553"
mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
mavlink_object_out.multiplexingToAnalysis = []
# 讓兩個通道互相傳輸
mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
# 啟動通道
mavlink_object_in.run()
mavlink_object_out.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
print('目前所有的系統 : ')
for sysid in analyzer.mavlink_systems:
print(analyzer.mavlink_systems[sysid])
compid = 1
sysid = 1
show_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
show_time2 = time.time()
while time.time() - start_time < running_time:
if (time.time() - show_time2) >= 1:
analyzer.emit_info() # 這邊是測試 node 的運行
# sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
sss = mavutil.mode_string_v10(fmsg)
# print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
print("目前的飛行模式 : {}".format(sss))
show_time2 = time.time()
# if (time.time() - show_time) >= 2:
# show_time = time.time()
# for sysid in analyzer.mavlink_systems:
# for compid in analyzer.mavlink_systems[sysid].components:
# print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
# analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
# print("===================")
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object_in.stop()
mavlink_object_in.thread.join()
mavlink_socket_in.close()
mavlink_object_out.stop()
mavlink_object_out.thread.join()
mavlink_socket_out.close()
analyzer.stop()
analyzer.thread.join()
print('<=== End of Program')
elif test_item == 51:
# 晉凱的測試項目
print('===> Start of Program .Test ', test_item)
rclpy.init() # 注意要初始化 rclpy 才能使用 node
# 啟動 mavlink_bridge
analyzer = mo.mavlink_bridge()
# 關於 Node 的初始化
show_time = time.time()
analyzer._init_node() # 初始化 node
print('初始化 node 完成 耗時 : ',time.time() - show_time)
# 創建通道
connection_string="udp:127.0.0.1:15551"
mavlink_socket3 = mavutil.mavlink_connection(connection_string)
mavlink_object3 = mo.mavlink_object(mavlink_socket3)
# 設定通道流動
mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
mavlink_object3.multiplexingToReturn = [] #
# mavlink_object3.multiplexingToSwap = [] #
# 啟動通道
mavlink_object3.run()
print('waiting for mavlink data ...')
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
compid = 1
sysid = 1
start_time = time.time()
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
end_time = time.time()
print(f"Execution time for create all topic: {end_time - start_time} seconds")
print("start emit info")
start_time = time.time()
show_time = time.time()
while time.time() - start_time < running_time:
try:
# rclpy.spin(analyzer)
analyzer.emit_info() # 這邊是測試 node 的運行
time.sleep(0.5)
except KeyboardInterrupt:
break
analyzer.destroy_node()
rclpy.shutdown()
# 結束程式 退出所有 thread
mavlink_object3.stop()
mavlink_object3.thread.join()
analyzer.stop()
analyzer.thread.join()
mavlink_socket3.close()
print('<=== End of Program')

@ -0,0 +1,91 @@
# 自定義的 import
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkDevice.py")
# 用來記錄每個 system 的資訊
# 資料格式 { sysid : mavlink_device object }
mavlink_systems = {}
# ====================== 分割線 =====================
# 這個 class 是用來記錄每個 system 的資訊 每個 system 可能會有多個 component
class mavlink_device():
def __init__(self):
self.socket_id = None # 記錄來自於哪個 socket
self.sysid = None
self.components = {} # 用來記錄每個 component 的資訊 key 是 compid, value 是 component object
def __str__(self):
p_str = '=====================\n'
p_str += f'object id : {id(self)}\n' # debug
p_str += f'socket_id : {self.socket_id}\n'
p_str += f'sysid : {self.sysid}\n'
p_str += 'has components : \n'
for compid in self.components:
p_str += f'compid : {compid}\n'
p_str += f'mav_type : {self.components[compid].mav_type}\n'
# p_str += '=====================\n'
return p_str
def updateComponentPacketCount(self, compid, current_seq, current_type, current_time):
# 這段檢查遺失封包
try:
last_seq = self.components[compid].msg_seq
except KeyError:
# 這個 component id 還不存在
# logger.error('System ID : {} This Component ID : {} Did not init yet'.format(self.sysid, compid)) # 因為初始化的之前 會有大量非 heartbeat 的訊息進來 這是正常現象 TODO 之後要幫這個類別加上初始化狀態 再進行這個判斷
return
if last_seq != None:
expected_seq = (last_seq + 1) % 256
diff = current_seq - expected_seq
# print("current last exp diff : ",current_seq, last_seq, expected_seq, diff) # debug
if diff < 0:
diff += 256
self.components[compid].lost_packet_count += diff
# 這段更新封包的基本資訊
self.components[compid].msg_seq = current_seq
self.components[compid].last_msg_time = current_time
if current_type in self.components[compid].msg_count:
self.components[compid].msg_count[current_type] += 1
else:
self.components[compid].msg_count[current_type] = 1
def resetComponentPacketCount(self, compid):
self.components[compid].msg_count = {}
self.components[compid].msg_seq = None
self.components[compid].lost_packet_count = 0
class mavlink_component():
# 程式用不到的參數 但是做個記錄
# paraEmitList = ['base_mode', 'flightMode_mode',
# 'battery_voltage', # from BATTERY_STATUS (147)
# 'gps_fix_type', # from GPS_RAW_INT (24)
# 'roll', 'pitch', 'yaw', # from ATTITUDE (30)
# 'position_latitude', 'position_longitude', 'position_altitude', # from GLOBAL_POSITION_INT (33)
# 'heading', # for VFR_HUD (74)
# ]
def __init__(self):
self.mav_type = None # 表示 Vehicle 或 component type (例如: 旋翼機是2, 雲台是26, GCS是6)
self.mav_autopilot = None # 表示 autopilot type (例如: ardupilot是3, px4是12)
self.mav_system_status = None # 表示 system status (例如: active是3, standby是4)
# 紀錄從這個 component 收到最後的訊息序號和時間
self.msg_count = {}
self.msg_seq = None
self.lost_packet_count = 0
self.last_msg_time = 0
# 存放該 emit component 參數的區域
# 內容格式為 {param_name(字串) : param_value}
# param_name 請見上面 paraEmitList
self.emitParams = {}
# 用來存放每個 topic 的 publisher
# 內容格式 為 {topic_name(字串) : [publisher(物件), method(函式)]} (?
self.publishers = {}

@ -0,0 +1,414 @@
'''
# 不同的匯流排只有單一種通訊協定
# 匯流排接到訊息後透過 queue stack 傳送到橋接器
# 會有三種 Queue 分別作為 1. 固定串流橋接器 2. 回傳封包處理器 3. 轉拋串流
# 第三種比較麻煩 會需要用 list 管理多個轉換器的 queue
# 橋接器會將解析得到的結論 再透過 ros2 的 publisher 發送出去
# 關於 mavlink 採用 pymavlink 這個套件 製作匯流排
# pymavlink 的 socket 會由其他地方製作(例如 main) 再放到 mavlink_object 裡面
# 這邊的 main 會是用來初始化 mavlink_object 並啟動他的 run function
'''
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
# 基礎功能的 import
import threading
import queue
import time
# mavlink 的 import
from pymavlink import mavutil
# ROS2 的 import
from rclpy.node import Node
# 自定義的 import
from mavlinkDevice import mavlink_device, mavlink_systems
from mavlinkPublish import mavlink_publisher
from theLogger import setup_logger
# ====================== 分割線 =====================
logger = setup_logger("mavlinkObject.py")
fixed_stream_bridge_queue = queue.Queue()
return_packet_processor_queue = queue.Queue()
swap_queues = [] # 這個 list 用來存放轉拋串流的 queue # 這些 queue 同時也是各自 mavlink_object 的 output buffer
# ====================== 分割線 =====================
class mavlink_bridge(Node, mavlink_publisher):
'''
這個 class 就是 固定串流橋接器
是用來接收 mavlink 訊息 並進行橋接
這個地方是針對 fixed_stream_bridge_queue 的資料做處理的
記錄有 mavlink bus 上有那些 system id component id
為了每個 system id 都有一個對應的 device object
並且看是否有重複 system id
整段代碼包含兩大區塊 thread node
thread 區塊內會對 fixed_stream_bridge_queue 進行監聽 並且將收到的訊息進行處理
其中 HEARTBEAT 是一個特殊類別 用來初始化整個 device object
node 區塊則是處理 ros2 publisher subscriber 訂閱相關
藉由控制 ros2 的機制再把 device object 的資訊發送出去
ps. 我限制了這個 class 只能有一個 instance
'''
_instance = None
_lock = threading.Lock() # 確保多線程安全
def __new__(cls, *args, **kwargs):
with cls._lock: # 確保多線程環境下只有一個實例被創建
if cls._instance is None:
cls._instance = super(mavlink_bridge, cls).__new__(cls)
return cls._instance
def __init__(self):
if not hasattr(self, "initialized"): # 防止重複初始化
self.initialized = True
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 當 object 建立時會直接運行 thread 直到消滅
self.running = True
self.thread = threading.Thread(target=self._run_thread)
self.thread.start()
else:
logger.error('mavlink_bridge instance already exists. Do not create another one.')
def stop(self):
self.running = False
# === Thread 區塊 ===
def _run_thread(self):
# start_time = time.time() # debug
logger.info('Start of mavlink_bridge._run_thread')
# 從 Queue fixed_stream_bridge_queue 中取出 mavlink 資料 並呼叫相對應的 function 進行後處理
while self.running:
if fixed_stream_bridge_queue.empty():
continue
msg_pack = fixed_stream_bridge_queue.get()
msg = msg_pack[2]
sysid = msg.get_srcSystem()
compid = msg.get_srcComponent()
msg_id = msg.get_msgId()
# 若這個 system id 還不存在 則建立 device object
if not sysid in self.mavlink_systems:
this_device = mavlink_device() # 創建一個新的 device object
self.mavlink_systems[sysid] = this_device
this_device.socket_id = msg_pack[0]
this_device.sysid = sysid
else:
this_device = self.mavlink_systems[sysid]
# 若該 component id 存在 則直接使用該 component object
# 若該 component id 不存在 則利用 heartbeat 創建一個新的 component object
# 若該 component id 不存在 又不是 heartbeat 則不處理
if compid in self.mavlink_systems[sysid].components:
this_component = self.mavlink_systems[sysid].components[compid]
elif msg_id == 0:
# 只有透過 heartbeat 可以創建一個新的 component object
this_component = this_device.mavlink_component()
this_device.components[msg.get_srcComponent()] = this_component
this_component.mav_type = msg.type
this_component.mav_autopilot = msg.autopilot
else:
continue
# ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↓↓↓↓↓↓↓↓↓↓↓↓
if msg_id == 0: # HEARTBEAT 處理
this_component.emitParams['base_mode'] = msg.base_mode
this_component.emitParams['flightMode_mode'] = mavutil.mode_string_v10(msg)
this_component.emitParams['flightMode'] = msg # debug
# print("mav_type : ", msg.type) # debug
# print("get mode :", mavutil.mode_string_v10(msg)) # debug
# print("record mode :", this_component.emitParams['flightMode_mode']) # debug
elif msg_id == 30: # ATTITUDE 處理
this_component.emitParams['attitude'] = msg
elif msg_id == 32: # LOCAL_POSITION_NED 處理
this_component.emitParams['local_position'] = msg
elif msg_id == 33: # GLOBAL_POSITION_INT 處理
this_component.emitParams['global_position'] = msg
elif msg_id == 74: # VFR_HUD 處理
this_component.emitParams['vfr_hud'] = msg
elif msg_id == 147: # BATTERY_STATUS 處理
this_component.emitParams['battery'] = msg
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同訊息類型的功能寫在這裡 請加在這個 elif 之內 ↑↑↑↑↑↑↑↑↑↑↑↑
# 若未定義的訊息類型則不處理 並跳出訊息
else:
logger.warning('This Message Type Did not define process method : {} / {}'.format(msg.get_msgId(), msg.get_type()))
continue
logger.info('End of mavlink_bridge._run_thread')
# === Node 區塊 ===
def _init_node(self):
logger.info('Start of mavlink_bridge._init_node')
super().__init__('mavlink_bridge') # TODO 不知道為何 這句耗時超長 可以到 5~10 秒
def emit_info(self):
# 這邊將 mavlink_systems 內所有的 device object 內所有的 component 輪循過
# 把 emitParams 的參數發送出去
for sysid, device in self.mavlink_systems.items():
for compid, component in device.components.items():
for topic_name in component.publishers.keys():
publisher = component.publishers[topic_name][0]
packEmit_func = component.publishers[topic_name][1]
packEmit_func(component.emitParams, publisher)
def _del_node(self):
# TODO 這邊要刪除 node 的時候要做的事情
# 先註銷所有 mavlink_systems 中 component 的 publisher
# 再註銷所有 mavlink_systems 中的 device object
# 再註銷 node
pass
# ====================== 分割線 =====================
class mavlink_object():
'''
每個 mavlink bus 都會有一個 mavlink_object
其中主要是 thread 做統計封包與分流
分流表的控制在三個 list 分別為
multiplexingToAnalysis : 這個 list 是用來分流到固定串流橋接器的
multiplexingToReturn : 這個 list 是用來分流到回傳封包處理器的
multiplexingToSwap : 這個 list 是用來分流到轉拋串流的
透過先更新上述三個 list 後再執行 updateMultiplexingList 可以變更分流行為
'''
mavlinkObjects = {} # 用來記錄所有的 mavlink_object instance 資料格式 { socket_id(序號) : mavlink_object(物件實例) }
socket_num = 0 # 用來記錄目前的 socket 數量
def __new__(cls, *args, **kwargs):
# 每創建一個實例 就將其添加到 mavlinkObjects 列表中
# 創建時 會檢查 mavlinkObjects 列表中空缺的 socket_id 序號
# 若序號無中斷 則 socket_id 往後加一 若序號有中斷 則填補最小的序號
# socket_id 從 0 開始
instance = super().__new__(cls)
socket_id = 0
for k in cls.mavlinkObjects.keys():
if k == socket_id:
socket_id += 1
else:
break
instance.socket_id = socket_id
cls.socket_num += 1
cls.mavlinkObjects[socket_id] = instance
return instance
def __init__(self, socket):
self.mavlink_socket = socket
# 這邊變數是執行的時候被使用的 不要直接寫入它
self._multiplexingList = []
# 存放要發送的訊息的 queue 或稱 buffer
self.output_buffer = queue.Queue()
if self.socket_id >= len(swap_queues):
swap_queues.append(self.output_buffer)
else:
swap_queues[self.socket_id] = self.output_buffer
# 關聯到全域變數
global mavlink_systems
self.mavlink_systems = mavlink_systems
# 這三個 list 用來分配不同的訊息到不同的 queue
self.multiplexingToAnalysis = [
0, # HEARTBEAT # 挺必要的項目
# 24, # GPS_RAW_INT
# 30, # ATTITUDE
# 33, # GLOBAL_POSITION_INT
# 74, # VFR_HUD
# 147, # BATTERY_STATUS
]
self.multiplexingToReturn = []
self.multiplexingToSwap = [
[] for _ in range(len(swap_queues))
]
# 刷新其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) <= self.socket_id):
object.multiplexingToSwap.append([])
object.updateMultiplexingList()
logger.info('mavlink_object instance {} created'.format(self.socket_id))
def __del__(self):
# 停下自己的 thread
if self.mavlink_socket != None:
self.mavlink_socket.close()
self.stop()
# 移除其他 mavlink_object 的 multiplexingToSwap
for k, object in self.mavlinkObjects.items():
if (k != self.socket_id) and (len(object.multiplexingToSwap) > self.socket_id):
object.multiplexingToSwap[self.socket_id] = []
object.updateMultiplexingList()
# 移除自己的 swap_queues
swap_queues[self.socket_id] = None
# 處理 class 的 instance 記錄
self.socket_num -= 1
self.mavlinkObjects.pop(self.socket_id) # 刪除這個 instance
# logger 模組在這邊會一直掛掉 找不到問題 先用 print 代替
# print('mavlink_object instance {} deleted'.format(self.socket_id)) # debug
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# try:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
# except Exception as e:
# print(f"Error logging in __del__: {e}")
# if 'logger' in globals() and logger:
# logger.info('mavlink_object instance {} deleted'.format(self.socket_id))
def run(self):
# TODO 檢查 socket 是否有連線
self.thread = threading.Thread(target=self._run_thread)
self.running = self.updateMultiplexingList()
self.thread.start()
def stop(self):
self.running = False
def _run_thread(self):
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id))
start_time = time.time()
while self.running:
timestamp = time.time() # 記錄接受到訊息的時間 # 這邊若是在大量訊息造成壅塞時 可能會有些微偏差
# 處理接收到的封包
try:
mavlinkMsg = self.mavlink_socket.recv_msg()
except Exception as e:
logger.critical(f"Receiving data not mavlink format. Object Delete.")
logger.critical(f"Receiving data not mavlink format. Object Delete.")
print(f"[mavlinkObject.py] Error receiving mavlink data: {e}")
print(mavlinkMsg)
self.running = False
break
if mavlinkMsg: # data type should be 'pymavlink.dialects.v20.ardupilotmega. etc...'
# 統計收到的訊息 & 透過 mavlink 封包的序列號來檢查是否有遺失的封包 & 記錄最後收到的封包時間
sysid = mavlinkMsg.get_srcSystem()
compid = mavlinkMsg.get_srcComponent()
if sysid in self.mavlink_systems: # 只有當這個 system id 已經透過 HEARTBEAT 訊號被初始化過 才會記錄相關訊息
# mavlinkMsg.get_type() # mavlinkMsg.get_msgId() # 前者是字串 後者是 int 都是代表 mavlink 類型
mavlink_systems[sysid].updateComponentPacketCount(compid, mavlinkMsg.get_seq(), mavlinkMsg.get_msgId(), timestamp)
# break # Debug function can put here you can see the input data from mavlink
# 將訊息依照 multiplexing list 分發到不同的 queue
for i in range(len(self._multiplexingList)):
if (self._multiplexingList[i] == []):
continue
elif (mavlinkMsg.get_msgId() in self._multiplexingList[i]) or (self._multiplexingList[i][0] == -1):
if i == 0:
fixed_stream_bridge_queue.put((self.socket_id,timestamp,mavlinkMsg))
elif i == 1:
return_packet_processor_queue.put((self.socket_id,timestamp,mavlinkMsg))
else:
_queue = swap_queues[i-2]
# _queue.put((self.socket_id,timestamp,mavlinkMsg)) # 測試看看 也許不需要別的資訊 只需要封包
_queue.put(mavlinkMsg)
# 處理要送出的封包
# 如果 有資料在 output_buffer 中則將其取出並發送
# 沒有就略過發送
try:
mavlinkMsg_send = self.output_buffer.get(block=False)
except queue.Empty:
mavlinkMsg_send = None
# except Exception as e:
# logger.error(f"Error getting data from output_buffer: {e}")
# mavlinkMsg_send = None
if mavlinkMsg_send:
# self.mavlink_socket.mav.send(mavlinkMsg_send)
self.mavlink_socket.write(mavlinkMsg_send.get_msgbuf()) # 這邊會將封包寫入 socket 中
# thread 結束
logger.info('End of mavlink_object._run_thread id : {}'.format(self.socket_id))
def updateMultiplexingList(self):
'''
更新 multiplexing list 並做簡單的檢查
'''
# 檢查 multiplexingToAnalysis 與 multiplexingToReturn 是否有 -1 值
check = (-1 in self.multiplexingToAnalysis) or (-1 in self.multiplexingToReturn)
if check:
logger.error('MultiplexingToAnalysis or MultiplexingToReturn NOT all type spilt. socket id : {}'.format(self.socket_id))
return False
# 檢查 multiplexingToSwap 與 swap_queues 的長度是否一致 而且 swap_queues 的長度不能為 0
check = len(self.multiplexingToSwap) != len(swap_queues) or len(swap_queues) == 0
if check:
logger.error('Multiplexing Queue not fit List , Please check the list. socket id : {}'.format(self.socket_id))
return False
# 對應到自己的 multiplexingToSwap 必需為空 避免對自己迴圈轉拋
try:
self.multiplexingToSwap[self.socket_id] = []
except IndexError:
logger.error('Multiplexing List of self socket id should be void. socket id : {}'.format(self.socket_id))
return False
# 組合 multiplexing list
multiL_tmp = [self.multiplexingToAnalysis, self.multiplexingToReturn] + self.multiplexingToSwap
# 檢查 multiplexing list 格式是否有錯誤 # 全部都要是 list 每個 list 裡面都要是 int
check = all(isinstance(i, list) and all(isinstance(j, int) for j in i) for i in multiL_tmp)
if not check:
logger.error('Multiplexing List Format Error, Please check the list. socket id : {}'.format(self.socket_id))
return False # 若有錯誤則回傳 False
# 更新 multiplexing list
self._multiplexingList = multiL_tmp
return True
# ====================== 分割線 =====================
# 整合到 ros2 之後的程式進入點
def main_node():
pass
if __name__ == '__main__':
pass

@ -0,0 +1,208 @@
'''
這個檔案只有一個 class
是作為 mavlinkObject.py mavlink_analyzer 類別的功能衍生
主要概念是將 "離散的" mavlink 參數轉換成 ROS topic
包含了創建 publisher 以及包裝並丟到 ros2 topic packEmit
publisher topic name 命名規則為 <前綴詞>/s<sysid>/<具體 topic>
'''
import std_msgs.msg
import sensor_msgs.msg
import geometry_msgs.msg
import mavros_msgs.msg
from theLogger import setup_logger
import math
logger = setup_logger("mavlinkPublish.py")
class mavlink_publisher():
prefix_path = 'MavLinkBus'
def create_flightMode(self, sysid, component_obj):
# target topic name # 請跟這個 method 的名稱保持一致
target_topic = 'flightMode'
# 這邊要檢查 flight_mode 是否存在
try:
_ = component_obj.emitParams['flightMode_mode']
except KeyError:
# 這個 component id 還不存在
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
# 若存在則 建立 publisher object 並回傳 true
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(std_msgs.msg.String, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_flightMode]
return True
def packEmit_flightMode(cls, emitParams, publisher):
msg_str = emitParams['flightMode_mode']
msg = std_msgs.msg.String()
msg.data = msg_str
publisher.publish(msg)
pass
# ↓↓↓↓↓↓↓↓↓↓↓↓ 處理不同 ros2 topic 訊息 請放在這裡 ↓↓↓↓↓↓↓↓↓↓↓↓
def euler_to_quaternion(cls,roll, pitch, yaw):
qx = math.sin(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) - math.cos(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
qy = math.cos(roll/2) * math.sin(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.cos(pitch/2) * math.sin(yaw/2)
qz = math.cos(roll/2) * math.cos(pitch/2) * math.sin(yaw/2) - math.sin(roll/2) * math.sin(pitch/2) * math.cos(yaw/2)
qw = math.cos(roll/2) * math.cos(pitch/2) * math.cos(yaw/2) + math.sin(roll/2) * math.sin(pitch/2) * math.sin(yaw/2)
return [qx, qy, qz, qw]
def create_attitude(self, sysid, component_obj):
target_topic = 'attitude'
try:
_ = component_obj.emitParams['attitude']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(sensor_msgs.msg.Imu, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_attitude]
def packEmit_attitude(self, emitParams, publisher):
mav_msg = emitParams['attitude']
msg = sensor_msgs.msg.Imu()
x, y, z, w = self.euler_to_quaternion(mav_msg.roll, mav_msg.pitch, mav_msg.yaw)
msg.orientation.x = x
msg.orientation.y = y
msg.orientation.z = z
msg.orientation.w = w
msg.angular_velocity.x = mav_msg.rollspeed
msg.angular_velocity.y = mav_msg.pitchspeed
msg.angular_velocity.z = mav_msg.yawspeed
publisher.publish(msg)
pass
def create_local_position_pose(self, sysid, component_obj):
target_topic = 'local_position/pose'
try:
_ = component_obj.emitParams['local_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(geometry_msgs.msg.Point, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_pose]
def packEmit_local_pose(cls, emitParams, publisher):
mav_msg = emitParams['local_position']
msg = geometry_msgs.msg.Point()
msg.x = mav_msg.x
msg.y = mav_msg.y
msg.z = mav_msg.z
publisher.publish(msg)
pass
def create_local_position_velocity(self, sysid, component_obj):
target_topic = 'local_position/velocity'
try:
_ = component_obj.emitParams['local_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(geometry_msgs.msg.Vector3, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_local_vel]
def packEmit_local_vel(cls, emitParams, publisher):
mav_msg = emitParams['local_position']
msg = geometry_msgs.msg.Vector3()
msg.x = mav_msg.vx
msg.y = mav_msg.vy
msg.z = mav_msg.vz
publisher.publish(msg)
pass
def create_global_global(self, sysid, component_obj):
target_topic = 'global_position/global'
try:
_ = component_obj.emitParams['global_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(sensor_msgs.msg.NavSatFix, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_global]
def packEmit_global_global(cls, emitParams, publisher):
mav_msg = emitParams['global_position']
msg = sensor_msgs.msg.NavSatFix()
msg.latitude = mav_msg.lat/1e7
msg.longitude = mav_msg.lon/1e7
msg.altitude = mav_msg.alt/1e3
publisher.publish(msg)
pass
def create_global_rel(self, sysid, component_obj):
target_topic = 'global_position/rel_alt'
try:
_ = component_obj.emitParams['global_position']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(std_msgs.msg.Float64, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_global_rel]
def packEmit_global_rel(cls, emitParams, publisher):
mav_msg = emitParams['global_position']
msg = std_msgs.msg.Float64()
msg.data = float(mav_msg.relative_alt/1e3)
publisher.publish(msg)
pass
def create_vfr_hud(self, sysid, component_obj):
target_topic = 'vfr_hud'
try:
_ = component_obj.emitParams['vfr_hud']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(mavros_msgs.msg.VfrHud, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_vfr_hud]
def packEmit_vfr_hud(cls, emitParams, publisher):
mav_msg = emitParams['vfr_hud']
msg = mavros_msgs.msg.VfrHud()
msg.airspeed = mav_msg.airspeed
msg.groundspeed = mav_msg.groundspeed
msg.heading = mav_msg.heading
msg.throttle = float(mav_msg.throttle)
msg.altitude = mav_msg.alt
msg.climb = mav_msg.climb
publisher.publish(msg)
pass
def create_battery(self, sysid, component_obj):
target_topic = 'battery'
try:
_ = component_obj.emitParams['battery']
except KeyError:
logger.warning('System ID : {} This Component ID : {} Did not init yet'.format(component_obj['sysid'], component_obj['compid']))
return False
topic_name = '{}/s{}/{}'.format(self.prefix_path, sysid, target_topic)
publisher_ = self.create_publisher(sensor_msgs.msg.BatteryState, topic_name, 1)
component_obj.publishers[target_topic] = [publisher_, self.packEmit_battery]
def packEmit_battery(cls, emitParams, publisher):
mav_msg = emitParams['battery']
msg = sensor_msgs.msg.BatteryState()
msg.voltage = mav_msg.voltages[0]/1e3
publisher.publish(msg)
pass
# ↑↑↑↑↑↑↑↑↑↑↑↑ 處理不同 ros2 topic 訊息 請放在這裡 ↑↑↑↑↑↑↑↑↑↑↑↑

@ -0,0 +1,14 @@
'''
透過某個地方 得到 udp uart 接口
對於每個接口 視為一個獨立的物件
物件對於不同的接口 是為不同類型的物件
每個類型的物件 創建一個獨立的執行緒 來處理資料
關於執行緒的實作 是寫在另一個模組
物件之間 也可以做資料轉換/轉拋
'''

@ -0,0 +1,43 @@
import logging
import os
from logging.handlers import TimedRotatingFileHandler
# 全域 Logger 實例
_global_logger = None
def setup_logger(name: str, log_dir: str = "log", level=logging.DEBUG) -> logging.Logger:
global _global_logger
if _global_logger is None:
# 確保 logs 資料夾存在
os.makedirs(log_dir, exist_ok=True)
# 建立全域 Logger
_global_logger = logging.getLogger("global_logger")
_global_logger.setLevel(level)
_global_logger.propagate = False # 防止重複輸出
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%m-%d %H:%M:%S"
)
# 檔案輸出(每天輪替)
file_handler = TimedRotatingFileHandler(
filename=os.path.join(log_dir, "application.log"),
when="midnight", # 每天 0 點輪替
backupCount=7, # 保留 7 天
encoding="utf-8"
)
file_handler.setFormatter(formatter)
_global_logger.addHandler(file_handler)
# 終端機輸出
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
_global_logger.addHandler(console_handler)
# 為每個模組建立子 Logger並設定名稱
module_logger = _global_logger.getChild(name)
module_logger.name = name # 修改子 Logger 的名稱,僅保留子 Logger 名稱
return module_logger

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>fc_network_adapter</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="chiyu1468@hotmail.com">picars</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/fc_network_adapter
[install]
install_scripts=$base/lib/fc_network_adapter

@ -0,0 +1,25 @@
from setuptools import find_packages, setup
package_name = 'fc_network_adapter'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='picars',
maintainer_email='chiyu1468@hotmail.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
],
},
)

@ -0,0 +1,444 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QSplitter, QScrollArea,
QSizePolicy, QApplication)
from PyQt6.QtCore import Qt, QTimer, QObject, pyqtSignal, QUrl
from PyQt6.QtGui import QColor
from PyQt6.QtWebEngineWidgets import QWebEngineView
import math
import re
import os
import sys
from threading import Lock
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
from std_msgs.msg import String, Float64
from mavros_msgs.msg import VfrHud
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class DroneMonitor(Node):
def __init__(self):
super().__init__('drone_monitor')
self.signals = DroneSignals()
self.drone_topics = {}
self.lock = Lock()
# 主题检测定时器
self.create_timer(1.0, self.scan_topics)
def scan_topics(self):
topics = self.get_topic_names_and_types()
drone_pattern = re.compile(r'/MavLinkBus/(s\d+)/(\w+)')
found_drones = set()
for topic_name, _ in topics:
if match := drone_pattern.match(topic_name):
drone_id, topic_type = match.groups()
found_drones.add(drone_id)
with self.lock:
self.drone_topics.setdefault(drone_id, set()).add(topic_type)
for drone_id in found_drones:
if not hasattr(self, f'drone_{drone_id}_subs'):
self.setup_drone_subs(drone_id)
def setup_drone_subs(self, drone_id):
base_topic = f'/MavLinkBus/{drone_id}'
subs = {
'attitude': self.create_subscription(
Imu,
f'{base_topic}/attitude',
lambda msg, did=drone_id: self.attitude_callback(did, msg),
10
),
'battery': self.create_subscription(
BatteryState,
f'{base_topic}/battery',
lambda msg, did=drone_id: self.battery_callback(did, msg),
10
),
'flightMode': self.create_subscription(
String,
f'{base_topic}/flightMode',
lambda msg, did=drone_id: self.mode_callback(did, msg),
10
),
'global': self.create_subscription(
NavSatFix,
f'{base_topic}/global_position/global',
lambda msg, did=drone_id: self.gps_callback(did, msg),
10
),
'rel_alt': self.create_subscription(
Float64,
f'{base_topic}/global_position/rel_alt',
lambda msg, did=drone_id: self.altitude_callback(did, msg),
10
),
'local_pose': self.create_subscription(
Point,
f'{base_topic}/local_position/pose',
lambda msg, did=drone_id: self.local_pose_callback(did, msg),
10
),
'local_vel': self.create_subscription(
Vector3,
f'{base_topic}/local_position/velocity',
lambda msg, did=drone_id: self.local_vel_callback(did, msg),
10
),
'vfr_hud': self.create_subscription(
VfrHud,
f'{base_topic}/vfr_hud',
lambda msg, did=drone_id: self.hud_callback(did, msg),
10
)
}
setattr(self, f'drone_{drone_id}_subs', subs)
self.signals.update_signal.emit('new_drone', drone_id, None)
def quaternion_to_euler(self, q):
sinr_cosp = 2 * (q.w * q.x + q.y * q.z)
cosr_cosp = 1 - 2 * (q.x**2 + q.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (q.w * q.y - q.z * q.x)
pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp)
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y**2 + q.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# 回调函数
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.signals.update_signal.emit('attitude', drone_id, {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
})
def battery_callback(self, drone_id, msg):
self.signals.update_signal.emit('battery', drone_id, {
'voltage': msg.voltage
})
def mode_callback(self, drone_id, msg):
self.signals.update_signal.emit('mode', drone_id, msg.data)
def gps_callback(self, drone_id, msg):
self.signals.update_signal.emit('gps', drone_id, {
'lat': msg.latitude,
'lon': msg.longitude
,'alt': msg.altitude
})
def local_vel_callback(self, drone_id, msg):
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.x,
'vy': msg.y,
'vz': msg.z
})
def altitude_callback(self, drone_id, msg):
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': msg.data
})
def local_pose_callback(self, drone_id, msg):
self.signals.update_signal.emit('local_pose', drone_id, {
'x': msg.x,
'y': msg.y,
'z': msg.z
})
def hud_callback(self, drone_id, msg):
self.signals.update_signal.emit('hud', drone_id, {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.altitude,
'climb': msg.climb
})
class ControlStationUI(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('GCS')
self.resize(1400, 900)
self.drones = {}
# 初始化ROS2
rclpy.init()
self.monitor = DroneMonitor()
self.monitor.signals.update_signal.connect(self.update_ui)
# ROS执行器配置
self.executor = rclpy.executors.SingleThreadedExecutor()
self.executor.add_node(self.monitor)
# 初始化UI
self.drone_positions = {}
self.map_loaded = False
self.init_ui()
# 定时处理ROS事件
self.timer = QTimer()
self.timer.timeout.connect(self.spin_ros)
self.timer.start(50)
def init_ui(self):
main_splitter = QSplitter(Qt.Orientation.Horizontal)
# 左侧信息面板
left_panel = QWidget()
left_layout = QVBoxLayout(left_panel)
left_layout.setContentsMargins(5, 5, 5, 5)
# 信息滚动区域
scroll_area = QScrollArea()
scroll_area.setWidgetResizable(True)
self.info_container = QWidget()
self.info_layout = QVBoxLayout(self.info_container)
self.info_layout.setAlignment(Qt.AlignmentFlag.AlignTop)
scroll_area.setWidget(self.info_container)
left_layout.addWidget(scroll_area)
# 右侧地图区域
self.map_view = QWebEngineView()
inline_html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<link rel="stylesheet" href="https://unpkg.com/leaflet/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet/dist/leaflet.js"></script>
<script src="https://unpkg.com/leaflet-rotatedmarker/leaflet.rotatedMarker.js"></script>
<style>
html, body, #map { height: 100%; margin: 0; }
</style>
</head>
<body>
<div id="map"></div>
<script>
var map = L.map('map').setView([0, 0], 2);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
maxZoom: 19
}).addTo(map);
var arrowIcon = L.icon({
iconUrl: 'https://cdn-icons-png.flaticon.com/512/399/399308.png', // 朝上的箭頭
iconSize: [40, 40],
iconAnchor: [20, 20]
});
var markers = {};
function updateDrone(lat, lon, id, heading) {
if (markers[id]) {
markers[id].setLatLng([lat, lon]);
markers[id].setRotationAngle(heading);
} else {
markers[id] = L.marker([lat, lon], {
icon: arrowIcon,
rotationAngle: heading,
rotationOrigin: 'center'
}).addTo(map).bindPopup(id);
}
map.setView([lat, lon], 16);
}
</script>
</body>
</html>
'''
self.map_view.setHtml(inline_html)
self.map_view.loadFinished.connect(self.on_map_loaded)
main_splitter.addWidget(left_panel)
main_splitter.addWidget(self.map_view)
main_splitter.setSizes([400, 1000])
self.setCentralWidget(main_splitter)
def on_map_loaded(self, ok: bool):
if ok:
self.map_loaded = True
else:
print("⚠️ 地图页加载失败")
def create_drone_panel(self, drone_id):
panel = QWidget()
panel.setObjectName(f"panel_{drone_id}")
panel.setStyleSheet("""
QWidget#panel_%s {
background-color: #2A2A2A;
border-radius: 8px;
margin: 6px;
padding: 10px;
border: 1px solid #444;
}
QLabel {
color: #DDD;
font-size: 12px;
padding: 2px;
}
""" % drone_id)
layout = QVBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
layout.setSpacing(4)
# 顶部标题栏
header = QWidget()
header_layout = QHBoxLayout(header)
header_layout.setContentsMargins(0, 0, 0, 0)
# ID显示
id_label = QLabel(drone_id)
id_label.setStyleSheet("""
font-weight: bold;
font-size: 14px;
color: #7FFFD4;
min-width: 80px;
""")
# 状态指示灯
status_light = QLabel("")
status_light.setStyleSheet("color: #00FF00; font-size: 16px;")
header_layout.addWidget(id_label)
header_layout.addWidget(status_light)
header_layout.addStretch()
layout.addWidget(header)
# 数据字段(带标签)
self.create_data_row(layout, "模式:", f"{drone_id}_mode", "--")
self.create_data_row(layout, "電壓:", f"{drone_id}_battery", "--")
self.create_data_row(layout, "GPS:", f"{drone_id}_gps", "--")
self.create_data_row(layout, "高度:", f"{drone_id}_altitude", "--")
self.create_data_row(layout, "Local:", f"{drone_id}_local", "--")
self.create_data_row(layout, "HUD:", f"{drone_id}_hud", "--")
return panel
def create_data_row(self, layout, title, object_name, default):
row = QWidget()
hbox = QHBoxLayout(row)
hbox.setContentsMargins(0, 0, 0, 0)
# 标题标签
title_label = QLabel(title)
title_label.setStyleSheet("color: #888; min-width: 80px;")
title_label.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Preferred)
# 数据标签
value_label = QLabel(default)
value_label.setObjectName(object_name)
value_label.setWordWrap(True)
value_label.setStyleSheet("margin-left: 10px;")
hbox.addWidget(title_label)
hbox.addWidget(value_label)
layout.addWidget(row)
def update_ui(self, msg_type, drone_id, data):
if msg_type == 'new_drone':
self.add_drone(drone_id)
return
if not (panel := self.drones.get(drone_id)):
return
if msg_type == 'mode':
self.update_field(panel, drone_id, 'mode', f"{data}",
'#FF5555' if '返航' in data else '#55FF55')
elif msg_type == 'battery':
voltage = data.get('voltage', 0)
self.update_field(panel, drone_id, 'battery',
f"{voltage:.1f} V",
'#FF6464' if voltage < 12 else '#FFFFFF')
elif msg_type == 'gps':
lat, lon = data['lat'], data['lon']
self.drone_positions[drone_id] = (lat, lon)
text = (f"緯度: {lat:.6f}°\n"
f"經度: {lon:.6f}°")
self.update_field(panel, drone_id, 'gps', text)
elif msg_type == 'altitude':
text = (f"{data['altitude']:.1f} m")
self.update_field(panel, drone_id, 'altitude', text)
elif msg_type == 'local_pose':
text = (f"({data['x']:.1f}, {data['y']:.1f}, {data['z']:.1f})")
self.update_field(panel, drone_id, 'local', text)
elif msg_type == 'hud':
heading = data['heading']
text = (f"地速: {data['groundspeed']:.1f} m/s\n"
f"航向: {heading:.1f}°")
self.update_field(panel, drone_id, 'hud', text)
if self.map_loaded and drone_id in self.drone_positions:
lat, lon = self.drone_positions[drone_id]
js = f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});"
self.map_view.page().runJavaScript(js)
'''
elif msg_type == 'velocity':
text = (f"VX: {data['vx']:.1f} m/s\n"
f"VY: {data['vy']:.1f} m/s\n"
f"VZ: {data['vz']:.1f} m/s")
self.update_field(panel, drone_id, 'velocity', text)
elif msg_type == 'attitude':
text = (f"Roll: {data['roll']:.1f}°\n"
f"Pitch: {data['pitch']:.1f}°\n"
f"Yaw: {data['yaw']:.1f}°")
self.update_field(panel, drone_id, 'attitude', text)
'''
def update_field(self, panel, drone_id, field, text, color=None):
if label := panel.findChild(QLabel, f"{drone_id}_{field}"):
label.setText(text)
if color:
label.setStyleSheet(f"color: {color};")
def add_drone(self, drone_id):
if drone_id not in self.drones:
panel = self.create_drone_panel(drone_id)
self.info_layout.addWidget(panel)
self.drones[drone_id] = panel
def spin_ros(self):
try:
self.executor.spin_once(timeout_sec=0)
except Exception as e:
print(f"ROS spin error: {e}")
def closeEvent(self, event):
self.monitor.destroy_node()
self.executor.shutdown()
rclpy.shutdown()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
station = ControlStationUI()
station.show()
app.exec(app.exec())

@ -144,7 +144,6 @@ class DroneControlApp(QMainWindow):
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.start()
def initUI(self):

@ -1,5 +1,6 @@
import sys
import rclpy
import time
import math
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
@ -14,34 +15,118 @@ class MAVLinkWorker(QThread):
imu_signal = pyqtSignal(str, float)
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float)
ping_signal = pyqtSignal(str, float)
loss_signal = pyqtSignal(str, float)
frequency_signal = pyqtSignal(str, float)
param_signal = pyqtSignal(str, int)
kbps_signal = pyqtSignal(str, float)
def __init__(self, connection_string="udp:127.0.0.1:14550"):
def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'):
super().__init__()
self.connection = mavutil.mavlink_connection(connection_string)
self.namespaces = ['UAV1', 'UAV2', 'UAV3']
self.connection = mavutil.mavlink_connection(usb, baud=57600)
self.connection.wait_heartbeat()
for sysid in self.namespaces:
sysid = int(sysid.replace('UAV', ''))
self.set_sr_params(sysid)
self.running = True
self.uav_data = {}
# 用於計算頻率丟包
self.message_count = {}
self.frequency = {}
self.start_time = {}
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.total_bytes_received = {}
self.throughput_KBps = {}
for namespace in self.namespaces:
self.request_param(namespace, "SR1_EXTRA1")
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
)
def run(self):
while self.running:
try:
msg = self.connection.recv_match(blocking=True)
msg = self.connection.recv_msg()
current_time = time.time()
if not msg:
continue
msg_type = msg.get_type()
sysid = msg.get_srcSystem()
namespace = f"uav{sysid}"
if sysid not in self.uav_data:
self.uav_data[sysid] = {"sysid": sysid}
if sysid == 0:
continue
namespace = f"UAV{sysid}"
msg_type = msg.get_type()
if msg_type =="BAD_DATA":
continue
if sysid not in self.total_bytes_received:
self.total_bytes_received[sysid] = 0
self.throughput_KBps[sysid] = 0
# 計算訊息大小
msg_bytes = msg.get_msgbuf() # 取得封包的 bytes
if msg_bytes:
self.total_bytes_received[sysid] += len(msg_bytes)
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq: # Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.total_packet_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
self.loss_signal.emit(namespace, self.loss_percentage[sysid])
# Frequency calculation
if sysid not in self.message_count:
self.message_count[sysid] = 0
self.start_time[sysid] = current_time
self.message_count[sysid] += 1
# 每隔一段時間更新
elapsed_time = current_time - self.start_time[sysid]
if elapsed_time >= 1:
# 每秒頻率
self.frequency[sysid] = self.message_count[sysid] / elapsed_time
self.frequency_signal.emit(namespace, self.frequency[sysid])
# 發送 timesync request
self.connection.mav.timesync_send(
0, #tc1
int(current_time * 1e9) #ts1
)
#吞吐量
self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time
self.kbps_signal.emit(namespace, self.throughput_KBps[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.total_bytes_received[sysid] = 0
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
percentage = msg.battery_remaining/100
self.battery_signal.emit(namespace, percentage)
voltage = msg.voltages[0] / 1000
self.battery_signal.emit(namespace, voltage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
@ -60,7 +145,7 @@ class MAVLinkWorker(QThread):
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = msg.pitch
pitch = math.degrees(msg.pitch)
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
@ -69,6 +154,16 @@ class MAVLinkWorker(QThread):
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6
if(round_trip_time<1e6):
self.ping_signal.emit(namespace, round_trip_time)
elif msg_type == 'PARAM_VALUE':
param_name = "SR1_EXTRA1"
if msg.param_id == param_name:
self.param_signal.emit(namespace, msg.param_value)
except Exception as e:
print(f"Error reading message: {e}")
@ -77,7 +172,7 @@ class MAVLinkWorker(QThread):
self.connection.close()
def set_mode(self, namespace, mode):
sysid = int(namespace[-1])
sysid = int(namespace.replace('UAV', ''))
if mode == 'STABILIZE':
mode = 0
elif mode == 'AUTO':
@ -86,56 +181,47 @@ class MAVLinkWorker(QThread):
mode = 4
elif mode == 'LOITER':
mode = 5
self.connection.mav.set_mode_send(
self.connection.mav.set_mode_send( #SET_MODE (11) — [DEP]
sysid,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode
)
def arm(self, namespace, arm):
sysid = int(namespace[-1])
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, #MAV_CMD_COMPONENT_ARM_DISARM (400)
0, # Confirmation
1 if arm else 0,
0, 0, 0, 0, 0, 0
)
# self.connection.mav.command_long_send(
# sysid,
# 1, # Component ID
# mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
# 0,
# 33,
# -1,
# 0, 0, 0, 0, 0
# )
def takeoff(self, namespace, altitude):
sysid = int(namespace[-1])
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, # Command to execute
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, #MAV_CMD_NAV_TAKEOFF (22)
0, # Confirmation (0 = first transmission)
0, 0, 0, 0, # Parameters 1-4 (Unused in takeoff command)
0, 0, altitude # Latitude, Longitude, Altitude (target_altitude in meters)
)
def land(self, namespace):
sysid = int(namespace[-1])
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_NAV_LAND,
mavutil.mavlink.MAV_CMD_NAV_LAND, #MAV_CMD_NAV_LAND (21)
0,
0, 0, 0, 0,
0, 0, 0
)
def set_local_position(self, namespace, x, y, z):
sysid = int(namespace[-1])
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.set_position_target_local_ned_send(
0, sysid, 1, # time_boot_ms, sysid, compid
mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame
@ -146,12 +232,80 @@ class MAVLinkWorker(QThread):
0, 0 # Yaw, yaw_rate
)
def reboot_drone(self, namespace):
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # target_component (一般為1)
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令
0, # confirmation
1, # 第一個參數 (1 = reboot0 = 無操作2 = 關機)
0, 0, 0, 0, 0, 0 # 其餘參數保留
)
def set_param(self, namespace, param_name, value):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def request_param(self, namespace, param_name):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_request_read_send(
sysid, # 系統 ID
1, # 組件 ID
param_name.encode('utf-8'), # 參數名稱
-1 # 參數索引,-1 表示根據名稱請求
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 2, 4]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[1],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[1],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[1]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 1, 1]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[0],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[0],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[0]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.namespaces = ['uav1', 'uav2', 'uav3']
self.workers = MAVLinkWorker()
self.namespaces = self.workers.namespaces
self.initUI()
# Connect signals to update the UI
@ -163,7 +317,11 @@ class DroneControlApp(QMainWindow):
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.ping_signal.connect(self.update_ping)
self.workers.loss_signal.connect(self.update_loss)
self.workers.frequency_signal.connect(self.update_frequency)
self.workers.param_signal.connect(self.update_param)
self.workers.kbps_signal.connect(self.update_kbps)
self.workers.start()
def initUI(self):
@ -193,15 +351,20 @@ class DroneControlApp(QMainWindow):
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel(f"{namespace} 狀態:等待數據..."),
"battery": QLabel(f"{namespace} 電量:等待數據..."),
"gps": QLabel(f"{namespace} GPS位置等待數據...\n\n"),
"fix": QLabel(f"{namespace} Fix Type等待數據..."),
"satellites": QLabel(f"{namespace} 衛星數量:等待數據..."),
"local_position": QLabel(f"{namespace} Local Position等待數據..."),
"groundspeed": QLabel(f"{namespace} 地面速度:等待數據..."),
"pitch": QLabel(f"{namespace} Pitch等待數據..."),
"heading": QLabel(f"{namespace} Heading等待數據..."),
"status": QLabel("狀態:等待數據..."),
"battery": QLabel("電壓:等待數據..."),
"local_position": QLabel("Local Position等待數據..."),
"gps": QLabel("GPS位置等待數據...\n\n"),
"fix": QLabel("Fix Type等待數據..."),
"satellites": QLabel("衛星數量:等待數據..."),
"groundspeed": QLabel("地面速度:等待數據..."),
"pitch": QLabel("Pitch等待數據..."),
"heading": QLabel("Heading等待數據..."),
"ping": QLabel("Ping等待數據..."),
"loss": QLabel("丟包:等待數據..."),
"frequency": QLabel("頻率:等待數據..."),
"kbps": QLabel("吞吐量:等待數據..."),
"param": QLabel("SR1_EXTRA1等待數據...")
}
# 把每個資訊添加到該 UAV 的垂直佈局中
@ -213,6 +376,16 @@ class DroneControlApp(QMainWindow):
main_layout.addLayout(uav_layout)
# SR1_EXTRA1參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值")
self.setParamButton = QPushButton("設置SR1_EXTRA1")
self.setParamButton.clicked.connect(self.set_SR1_EXTRA1)
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
@ -224,6 +397,23 @@ class DroneControlApp(QMainWindow):
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# 解鎖按鈕
fly_layout = QHBoxLayout()
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
fly_layout.addWidget(self.armButton)
fly_layout.addWidget(self.takeoffButton)
fly_layout.addWidget(self.landButton)
main_layout.addLayout(fly_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
@ -232,31 +422,21 @@ class DroneControlApp(QMainWindow):
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
xyz_layout.addWidget(self.setPositionButton)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
main_layout.addWidget(self.setPositionButton)
# 解鎖按鈕
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
main_layout.addWidget(self.armButton)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
main_layout.addWidget(self.takeoffButton)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
main_layout.addWidget(self.landButton)
#設置重啟按鈕
self.rebootButton = QPushButton("重啟")
self.rebootButton.clicked.connect(self.reboot_drone)
main_layout.addWidget(self.rebootButton)
# 設置主佈局
central_widget = QWidget(self)
@ -271,13 +451,13 @@ class DroneControlApp(QMainWindow):
event.accept()
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"{namespace} 狀態:{mode}")
self.uav_labels[namespace]["status"].setText(f"狀態:{mode}")
def update_battery(self, namespace, percentage):
self.uav_labels[namespace]["battery"].setText(f"{namespace} 電量:{percentage * 100:.2f}%")
def update_battery(self, namespace, voltage):
self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"{namespace} GPS位置 \nLat:{latitude:.6f}° \nLon:{longitude:.6f}°")
self.uav_labels[namespace]["gps"].setText(f"GPS位置 \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = {
@ -288,20 +468,35 @@ class DroneControlApp(QMainWindow):
4: "RTK Float",
5: "RTK Fix"
}.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"{namespace} Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"{namespace} 衛星數量:{satellites_visible}")
self.uav_labels[namespace]["fix"].setText(f"Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"{namespace} Local({x:.2f}, {y:.2f}, {z:.2f})")
self.uav_labels[namespace]["local_position"].setText(f"Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_imu(self, namespace, pitch):
self.uav_labels[namespace]["pitch"].setText(f"{namespace} Pitch{pitch:.2f}°")
self.uav_labels[namespace]["pitch"].setText(f"Pitch{pitch:.2f}°")
def update_hdg(self, namespace, heading):
self.uav_labels[namespace]["heading"].setText(f"{namespace} Heading{heading:.2f}°")
self.uav_labels[namespace]["heading"].setText(f"Heading{heading:.2f}°")
def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"{namespace} 地面速度:{groundspeed:.2f} m/s")
self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s")
def update_ping(self, namespace, ping):
self.uav_labels[namespace]["ping"].setText(f"Ping{ping:.2f} ms")
def update_loss(self, namespace, loss):
self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%")
def update_frequency(self, namespace, frequency):
self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz")
def update_kbps(self, namespace, kbps):
self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s")
def update_param(self, namespace, value):
self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}")
def change_mode(self):
try:
@ -323,10 +518,12 @@ class DroneControlApp(QMainWindow):
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
altitude = float(z_text) if z_text else 5.0
for namespace in self.namespaces:
z = float(z_text) if z_text else 5.0
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, altitude)
self.workers.takeoff(namespace, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
@ -340,14 +537,33 @@ class DroneControlApp(QMainWindow):
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def reboot_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.reboot_drone(namespace)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}")
def set_SR1_EXTRA1(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z)
self.workers.set_param(namespace, "SR1_EXTRA1", param_value)
self.workers.request_param(namespace, "SR1_EXTRA1")
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main(args=None):
rclpy.init(args=args)
def main():
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()

@ -0,0 +1,619 @@
import sys
import time
import math
import serial
import struct
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
from pymavlink import mavutil
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
class MAVLinkWorker(QThread):
state_signal = pyqtSignal(str, str)
battery_signal = pyqtSignal(str, float)
gps_signal = pyqtSignal(str, float, float)
gps_status_signal = pyqtSignal(str, int, int)
local_position_signal = pyqtSignal(str, float, float, float)
imu_signal = pyqtSignal(str, float)
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float)
ping_signal = pyqtSignal(str, float)
loss_signal = pyqtSignal(str, float)
frequency_signal = pyqtSignal(str, float)
param_signal = pyqtSignal(str, int)
kbps_signal = pyqtSignal(str, float)
def __init__(self):
super().__init__()
self.namespaces = ['UAV1', 'UAV2', 'UAV3']
self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600)
self.connection.wait_heartbeat()
for sysid in self.namespaces:
sysid = int(sysid.replace('UAV', ''))
self.set_sr_params(sysid)
self.running = True
# 用於計算頻率丟包
self.message_count = {}
self.frequency = {}
self.start_time = {}
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.total_bytes_received = {}
self.throughput_KBps = {}
for namespace in self.namespaces:
self.request_param(namespace, "SR1_EXTRA1")
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
)
def run(self):
while self.running:
try:
msg = self.connection.recv_msg()
current_time = time.time()
if not msg:
continue
sysid = msg.get_srcSystem()
if sysid == 0:
continue
namespace = f"UAV{sysid}"
msg_type = msg.get_type()
if msg_type =="BAD_DATA":
continue
if sysid not in self.total_bytes_received:
self.total_bytes_received[sysid] = 0
self.throughput_KBps[sysid] = 0
# 計算訊息大小
msg_bytes = msg.get_msgbuf() # 取得封包的 bytes
if msg_bytes:
self.total_bytes_received[sysid] += len(msg_bytes)
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq: # Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.total_packet_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
self.loss_signal.emit(namespace, self.loss_percentage[sysid])
# Frequency calculation
if sysid not in self.message_count:
self.message_count[sysid] = 0
self.start_time[sysid] = current_time
self.message_count[sysid] += 1
# 每隔一段時間更新
elapsed_time = current_time - self.start_time[sysid]
if elapsed_time >= 1:
# 每秒頻率
self.frequency[sysid] = self.message_count[sysid] / elapsed_time
self.frequency_signal.emit(namespace, self.frequency[sysid])
# 發送 timesync request
self.connection.mav.timesync_send(
0, #tc1
int(current_time * 1e9) #ts1
)
#吞吐量
self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time
self.kbps_signal.emit(namespace, self.throughput_KBps[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.total_bytes_received[sysid] = 0
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.battery_signal.emit(namespace, voltage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.gps_signal.emit(namespace, latitude, longitude)
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
satellites_visible = msg.satellites_visible
self.gps_status_signal.emit(namespace, fix_type, satellites_visible)
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6
if(round_trip_time<1e6):
self.ping_signal.emit(namespace, round_trip_time)
elif msg_type == 'PARAM_VALUE':
param_name = "SR1_EXTRA1"
if msg.param_id == param_name:
self.param_signal.emit(namespace, msg.param_value)
except Exception as e:
print(f"Error reading message: {e}")
def stop(self):
self.running = False
self.connection.close()
def build_api_tx_frame(self, data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def send_command(self, msg):
# Create the packet and send via serial port
PORT = "/dev/ttyUSB0"
BAUD = 57600
ser = serial.Serial(PORT, BAUD)
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
mav.send(msg)
api_frame = self.build_api_tx_frame(capture.data)
ser.write(api_frame)
print("RAW HEX:", capture.data.hex())
def set_mode(self, namespace, mode):
sysid = int(namespace.replace('UAV', ''))
if mode == 'STABILIZE':
mode = 0
elif mode == 'AUTO':
mode = 3
elif mode == 'GUIDED':
mode = 4
elif mode == 'LOITER':
mode = 5
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1, # Custom mode enabled
param2=mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
self.send_command(msg)
def arm(self, namespace, arm):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
confirmation=0,
param1=1 if arm else 0, # 1 to arm, 0 to disarm
param2=0, param3=0, param4=0, param5=0, param6=0, param7=0
)
self.send_command(msg)
def takeoff(self, namespace, altitude):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
confirmation=0,
param1=0, param2=0, param3=0, param4=0, param5=0, param6=0,
param7=altitude
)
self.send_command(msg)
def land(self, namespace):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_NAV_LAND,
confirmation=0,
param1=0, param2=0, param3=0, param4=0, param5=0, param6=0,
param7=0
)
self.send_command(msg)
def set_local_position(self, namespace, x, y, z):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.set_position_target_local_ned_encode(
0, sysid, 1, # time_boot_ms, sysid, compid
mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame
0b110111111000, # Mask
y, x, -z, # Position
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
self.send_command(msg)
def reboot_drone(self, namespace):
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # target_component (一般為1)
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令
0, # confirmation
1, # 第一個參數 (1 = reboot0 = 無操作2 = 關機)
0, 0, 0, 0, 0, 0 # 其餘參數保留
)
def set_param(self, namespace, param_name, value):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def request_param(self, namespace, param_name):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_request_read_send(
sysid, # 系統 ID
1, # 組件 ID
param_name.encode('utf-8'), # 參數名稱
-1 # 參數索引,-1 表示根據名稱請求
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 2, 4]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[1],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[1],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[1]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 1, 1]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[0],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[0],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[0]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.workers = MAVLinkWorker()
self.namespaces = self.workers.namespaces
self.initUI()
# Connect signals to update the UI
self.workers.state_signal.connect(self.update_state)
self.workers.battery_signal.connect(self.update_battery)
self.workers.gps_signal.connect(self.update_gps)
self.workers.gps_status_signal.connect(self.update_gps_status)
self.workers.local_position_signal.connect(self.update_local_position)
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.ping_signal.connect(self.update_ping)
self.workers.loss_signal.connect(self.update_loss)
self.workers.frequency_signal.connect(self.update_frequency)
self.workers.param_signal.connect(self.update_param)
self.workers.kbps_signal.connect(self.update_kbps)
self.workers.start()
def initUI(self):
self.setWindowTitle("多無人機控制介面")
self.setGeometry(100, 100, 800, 600)
# 主佈局
main_layout = QVBoxLayout()
# 無人機選擇區域
uav_layout = QHBoxLayout()
self.uav_checkboxes = {}
for namespace in self.namespaces:
checkbox = QCheckBox(namespace)
checkbox.setChecked(True) # 預設勾選
self.uav_checkboxes[namespace] = checkbox
uav_layout.addWidget(checkbox)
main_layout.addLayout(uav_layout)
# 顯示所有無人機資訊,從左到右顯示
uav_layout = QHBoxLayout()
# 逐個顯示 UAV 的資訊
self.uav_labels = {}
for namespace in self.namespaces:
uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel("狀態:等待數據..."),
"battery": QLabel("電壓:等待數據..."),
"local_position": QLabel("Local Position等待數據..."),
"gps": QLabel("GPS位置等待數據...\n\n"),
"fix": QLabel("Fix Type等待數據..."),
"satellites": QLabel("衛星數量:等待數據..."),
"groundspeed": QLabel("地面速度:等待數據..."),
"pitch": QLabel("Pitch等待數據..."),
"heading": QLabel("Heading等待數據..."),
"ping": QLabel("Ping等待數據..."),
"loss": QLabel("丟包:等待數據..."),
"frequency": QLabel("頻率:等待數據..."),
"kbps": QLabel("吞吐量:等待數據..."),
"param": QLabel("SR1_EXTRA1等待數據...")
}
# 把每個資訊添加到該 UAV 的垂直佈局中
for label in self.uav_labels[namespace].values():
uav_info_layout.addWidget(label)
# 將該 UAV 的資訊佈局加到主佈局中(從左到右排列)
uav_layout.addLayout(uav_info_layout)
main_layout.addLayout(uav_layout)
# SR1_EXTRA1參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值")
self.setParamButton = QPushButton("設置SR1_EXTRA1")
self.setParamButton.clicked.connect(self.set_SR1_EXTRA1)
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"])
self.modeButton = QPushButton("切換模式")
self.modeButton.clicked.connect(self.change_mode)
mode_layout.addWidget(QLabel("選擇模式:"))
mode_layout.addWidget(self.modeComboBox)
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# 解鎖按鈕
fly_layout = QHBoxLayout()
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
fly_layout.addWidget(self.armButton)
fly_layout.addWidget(self.takeoffButton)
fly_layout.addWidget(self.landButton)
main_layout.addLayout(fly_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
xyz_layout.addWidget(self.setPositionButton)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
#設置重啟按鈕
self.rebootButton = QPushButton("重啟")
self.rebootButton.clicked.connect(self.reboot_drone)
main_layout.addWidget(self.rebootButton)
# 設置主佈局
central_widget = QWidget(self)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
self.show()
def closeEvent(self, event):
try:
self.workers.stop()
finally:
event.accept()
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"狀態:{mode}")
def update_battery(self, namespace, voltage):
self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"GPS位置 \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = {
0: "No Fix",
1: "No Fix",
2: "2D Fix",
3: "3D Fix",
4: "RTK Float",
5: "RTK Fix"
}.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_imu(self, namespace, pitch):
self.uav_labels[namespace]["pitch"].setText(f"Pitch{pitch:.2f}°")
def update_hdg(self, namespace, heading):
self.uav_labels[namespace]["heading"].setText(f"Heading{heading:.2f}°")
def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s")
def update_ping(self, namespace, ping):
self.uav_labels[namespace]["ping"].setText(f"Ping{ping:.2f} ms")
def update_loss(self, namespace, loss):
self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%")
def update_frequency(self, namespace, frequency):
self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz")
def update_kbps(self, namespace, kbps):
self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s")
def update_param(self, namespace, value):
self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}")
def change_mode(self):
try:
selected_mode = self.modeComboBox.currentText()
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_mode(namespace, selected_mode)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}")
def arm_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.arm(namespace, True)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}")
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
z = float(z_text) if z_text else 5.0
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def land_drone(self):
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.land(namespace)
def set_local_position(self):
try:
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def reboot_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.reboot_drone(namespace)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}")
def set_SR1_EXTRA1(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_param(namespace, "SR1_EXTRA1", param_value)
self.workers.request_param(namespace, "SR1_EXTRA1")
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main():
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()

@ -21,7 +21,8 @@ setup(
entry_points={
'console_scripts': [
'showmavsimple = unitdev02.test01:mavlink_analyzer_simple',
'fdm_switch_one = unitdev02.test01:fdm_swicth_example_one',
'fdm_switch_one = unitdev02.test01:fdm_switch_example_one',
'fdm_switch_two = unitdev02.test01:fdm_switch_example_two',
],
},
)

@ -0,0 +1,14 @@
import socket
SOCKET_PATH = "/tmp/unix_socket_example"
# 建立 socket
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(SOCKET_PATH)
# 傳送資料
client.sendall(b"Hello from client!")
data = client.recv(1024)
print("📤 Server replied:", data.decode())
client.close()

@ -0,0 +1,45 @@
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
# import mavros_msgs.srv
class TalkerNode(Node):
def __init__(self):
start_time = time.time()
super().__init__('talker_node')
end_time = time.time()
print(f"Node initialization took {end_time - start_time:.2f} seconds")
self.publisher_ = self.create_publisher(String, 'hahatest/_1', 10)
self.timer = self.create_timer(1.0, self.timer_callback) # 每秒執行一次
self.get_logger().info('TalkerNode has been started.')
def timer_callback(self):
msg = String()
msg.data = 'Hello, ROS 2!'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
def main(args=None):
rclpy.init(args=args)
node = TalkerNode()
print("Before sleep")
time.sleep(5) # 等待 5 秒鐘
print("After sleep")
try:
start_time = time.time()
while time.time() - start_time < 10: # 持續 10 秒鐘
rclpy.spin_once(node)
time.sleep(1) # 每秒執行一次
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,33 @@
import socket
import os
# socket file path
SOCKET_PATH = "/tmp/unix_socket_example"
# 若檔案存在就先刪除
if os.path.exists(SOCKET_PATH):
os.remove(SOCKET_PATH)
# 建立 socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# 綁定 socket 到檔案
server.bind(SOCKET_PATH)
server.listen(1)
print("🔌 Server waiting for connection...")
# 等待 client 連線
conn, _ = server.accept()
print("✅ Client connected.")
while True:
data = conn.recv(1024)
if not data:
break
print("📥 Received:", data.decode())
conn.sendall(b"Echo: " + data)
conn.close()
server.close()
os.remove(SOCKET_PATH)

@ -46,4 +46,60 @@ RC_CHANNELS遙控器 (RC) 的通道數據,代表從操控者 (或模擬器)
'MISSION_CURRENT': 73, 'SIMSTATE': 72,
7. 任務與導航
MISSION_CURRENT目前正在執行的飛行任務 (Mission) 的編號。
SIMSTATESITL 模擬器的狀態,如飛行模式、地面速度等。
SIMSTATESITL 模擬器的狀態,如飛行模式、地面速度等。
================================
['__annotations__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__',
'_crc', '_fieldnames', '_header', '_instance_field', '_instance_offset', '_instances', '_link_id', '_msgbuf', '_pack', '_payload', '_posted', '_signed', '_timestamp', ,
, 'crc_extra', 'fieldunits_by_name', 'format_attr', 'get_crc', , 'get_header', 'get_link_id', , 'get_msgbuf', 'get_payload', 'get_seq', 'get_signed', 'instance_field', 'instance_offset', 'lengths', 'name', 'native_format', 'ordered_fieldnames', 'orders', 'pack', 'pitch', 'pitchspeed', 'roll', 'rollspeed', 'sign_packet', 'time_boot_ms', 'to_dict', 'to_json', 'unpacker', 'yaw', 'yawspeed']
mavlink 封包 種類名稱
'get_type()' '_type' 'msgname'
'get_msgId()' 'id'
'fieldenums_by_name'
發送端的 sysid get_srcComponent()
發送端的 compid get_srcComponent()
這邊是一組的 mavlink 封包 內容的
資料長度 'array_lengths'
資料名稱 'fieldnames' 'get_fieldnames()'
資料格式 'fieldtypes'
=====================================
HEARTBEAT {type : 2, autopilot : 3, base_mode : 81, custom_mode : 0, system_status : 3, mavlink_version : 3}
=====================================
?
https://github.com/ros/angles
https://github.com/ros-geographic-info/geographic_info
指令
mkdir -p ~/ros2_geographic_info/src
cd ~/ros2_geographic_info/src
git clone https://github.com/ros/angles.git -b ros2
git clone https://github.com/ros-geographic-info/geographic_info.git -b ros2
cd ..
rosdep check --from-paths src --ignore-src
colcon build --packages-select angles
. ./install/setup.bash
colcon build --packages-select geographic_info
. ./install/setup.bash
colcon build --packages-select mavros_msgs

@ -6,6 +6,10 @@ import time
import socket
import struct
# used at fdm_switch_example_two
import threading
import queue
def mavlink_analyzer_simple(count = 500):
# rclpy.init()
# node = rclpy.create_node('mavlink_analyzer_simple')
@ -32,11 +36,17 @@ def mavlink_analyzer_simple(count = 500):
if msg.get_type() == 'SERVO_OUTPUT_RAW':
print(msg)
pass
else:
print('No message yet, 1 second for data to fill')
sleep(1)
print("markA ------")
print(msg.get_type())
print(msg.ordered_fieldnames)
print("markA ------ END")
print('Messages Type received:')
print(msg_count)
@ -52,7 +62,6 @@ def fdm_parser_example(data=None):
data = bytes.fromhex('1a48e903b1340500e803e803e803e803000000000000000000000000000000000000000000000000')
# 1a48e903b1340500e803e803e803e803000000000000000000000000000000000000000000000000
parse_format = 'HHI16H'
if len(data) != struct.calcsize(parse_format):
@ -103,7 +112,8 @@ def fdm_switch_example_one():
elapsed_time = current_time - start_time
if elapsed_time >= 1.0:
print(f"Packets received in the last second: {packet_count}")
# print(f"Packets received in the last second: {packet_count}")
print(f'JSON Pack : {data_g2s}')
packet_count = 0
start_time = current_time
fdm_parser_example(data_s2g)
@ -113,34 +123,140 @@ def fdm_switch_example_two():
'''
這個範例在 SITL Gazebo 之間 做一個簡單的UDP轉發器
有轉換數據格式
time_usec
servo1_raw
servo16_raw
'''
# read info from SITL via MAVLink
connection_string="udp:127.0.0.1:14550"
connection_string="udp:127.0.0.1:14550" # uart mavlink # MATLAB
connection = mavutil.mavlink_connection(connection_string)
# set a target link pass JSON to Gazebo, as a client
sock_g2s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('127.0.0.1', 19002)
data_queue = queue.Queue()
servo_out = [0] * 16 # [0 0 0 0 0 0 0 0 0.....]
data_queue.put(servo_out)
def send_data_from_queue(sock, server_address, q, frame_rate_fdm=1, frame_count_fdm=1):
interval = 1.0 / frame_rate_fdm
while True:
start_time = time.time()
try:
data = q.get(timeout=0.1) # [0 0 0 0 0 0 0 0 0.....]
if data is None:
break
last_data = data
except queue.Empty:
data = last_data
data_fdm = struct.pack('HHI16H', 0x481a, int(frame_rate_fdm * 0.1), frame_count_fdm, *servo_out) # [ FMD ]
sock.sendto(data_fdm, server_address)
frame_count_fdm += 3
# fdm_parser_example(data_fdm) # debug
elapsed_time = time.time() - start_time
sleep_time = interval - elapsed_time
if sleep_time > 0:
sleep(sleep_time)
Running = True
thread = threading.Thread(target=send_data_from_queue, args=(sock_g2s, server_address, data_queue))
thread.start()
print('Start sending data to Gazebo')
while Running:
msgb1 = None
msg = connection.recv_msg()
while msg :
if msg.get_type() == 'SERVO_OUTPUT_RAW':
msgb1 = msg
# break # 這裡不需要break因為我要最後一個 servo_output_raw 的值
msg = connection.recv_msg()
if msgb1:
for i in range(16):
servo_out[i] = getattr(msgb1, f'servo{i+1}_raw')
data_queue.put(servo_out)
msgb1 = None
# Running = False # debug
else:
# print('No message yet, 10 ms for data to fill')
sleep(0.01)
# Example of putting data into the queue
# data_queue.put(data)
# To stop the thread, you can put None into the queue
# data_queue.put(None)
from pymavlink.dialects.v20 import common # 使用 MAVLink 2.0
def mavlink_btye_generator_test():
# 創建一個 MAVLink 對象
mav = common.MAVLink(None) # 不透過 connection直接使用 None
# 創建一個 HEARTBEAT 訊息
msg = mav.heartbeat_encode(
type=mavutil.mavlink.MAV_TYPE_QUADROTOR,
autopilot=mavutil.mavlink.MAV_AUTOPILOT_GENERIC,
base_mode=0,
custom_mode=0,
system_status=mavutil.mavlink.MAV_STATE_ACTIVE
)
msg = mav.command_long_encode(
target_system=1,
target_component=1,
command=mavutil.mavlink.MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=0, # Custom mode
param2=0, # Custom sub-mode
param3=0,
param4=0,
param5=0,
param6=0,
param7=0
)
# 取得 MAVLink 訊息的 bytes
mavlink_bytes = msg.pack(mav)
# 回傳 bytes
print(mavlink_bytes)
print(type(mavlink_bytes))
mav2 = common.MAVLink(None)
# my_msg = mav2.decode(mavlink_bytes)
print("========")
print(dir(mav2))
# print(my_msg)
def simple_getMavlink():
connection_string="udp:127.0.0.1:14550"
# connection_string='/dev/ttyUSB0'
connection = mavutil.mavlink_connection(connection_string)
while True:
msg = connection.recv_msg()
if msg:
print(msg)
else:
print('No message yet, 0.1 second for data to fill')
sleep(0.1)
print('Start')
mavlink_analyzer_simple()
# connection_string="udp:127.0.0.1:14550"
# connection = mavutil.mavlink_connection(connection_string)
# connection.mav.command_long_send(
# 1,
# 1,
# mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
# 0, 33, 1000000, 0, 0, 0, 0, 0
# )
# connection.mav.command_long_send(
# 1,
# 1, # Component ID
# mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
# 0, # Confirmation
# 1, # 0: disarm, 1: arm
# 0, 0, 0, 0, 0, 0
# )
# fdm_switch_example_two()
# fdm_parser_example()
# mavlink_analyzer_simple(8)
# mavlink_btye_generator_test()
simple_getMavlink()
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out, source_system=17, source_component=200)

@ -0,0 +1,47 @@
import threading
import time
from digi.xbee.devices import XBeeDevice
# Configure XBee connection
PORT = "/dev/ttyUSB1"
BAUD_RATE = 57600
# Initialize XBee device
device = XBeeDevice(PORT, BAUD_RATE)
device.open()
# Callback function to handle incoming messages
def data_received_callback(xbee_message):
sender = xbee_message.remote_device.get_64bit_addr()
try:
data = xbee_message.data.decode("utf-8") # Attempt UTF-8 decoding
except UnicodeDecodeError:
data = xbee_message.data.hex() # Fallback to HEX if decoding fails
print(f"\nReceived from {sender}: {data}\nEnter message: ", end="")
# Register callback for incoming data
device.add_data_received_callback(data_received_callback)
# Function to send broadcast messages (runs in a separate thread)
def send_messages():
while True:
user_input = input("\nEnter message: ") # User input
status = device.send_data_broadcast(user_input) # Broadcast message
if status:
print("Broadcast message sent successfully!")
else:
print("Failed to send broadcast message.")
# Start the message sending thread
send_thread = threading.Thread(target=send_messages, daemon=True)
send_thread.start()
print("Broadcast chat mode activated. Type a message and press Enter to send.\n")
# Keep the program running
try:
while True:
time.sleep(0.1) # Reduce CPU usage and ensure stability
except KeyboardInterrupt:
print("\nProgram terminated.")
device.close()

@ -0,0 +1,41 @@
import serial
import threading
PORT = "COM15" # AT Mode XBee COM Port
BAUD_RATE = 57600
# Initialize serial connection
ser = serial.Serial(PORT, BAUD_RATE, timeout=1)
# Function to receive messages from the Coordinator
def receive_data():
while True:
data = ser.readline().decode("utf-8", errors="ignore").strip() # Read incoming data
if data:
print(f"\nReceived from Coordinator: {data}")
print("Enter message: ", end="", flush=True) # Keep input prompt intact
# Function to send messages to the Coordinator
def send_data():
while True:
user_input = input("\nEnter message: ") # User input
ser.write(user_input.encode() + b'\r') # Send message in AT Mode
print("Message sent.")
# Start the receiving thread
receive_thread = threading.Thread(target=receive_data, daemon=True)
receive_thread.start()
# Start the sending thread
send_thread = threading.Thread(target=send_data, daemon=True)
send_thread.start()
print("AT Mode XBee communication started. Type a message and press Enter to send.\n")
# Keep the program running
try:
while True:
pass
except KeyboardInterrupt:
print("\nProgram terminated.")
ser.close()
Loading…
Cancel
Save