Compare commits
2 Commits
2c4fc4583e
...
6a71e4530f
| Author | SHA1 | Date |
|---|---|---|
|
|
6a71e4530f | 5 months ago |
|
|
7af138b02a | 6 months ago |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,7 @@
|
||||
"""
|
||||
共用工具模組
|
||||
"""
|
||||
from .ringBuffer import RingBuffer
|
||||
from .theLogger import setup_logger
|
||||
|
||||
__all__ = ['RingBuffer', 'setup_logger']
|
||||
@ -0,0 +1,260 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
# 基礎功能的 import
|
||||
import queue
|
||||
import time
|
||||
|
||||
# ROS2 的 import
|
||||
import rclpy
|
||||
|
||||
# mavlink 的 import
|
||||
from pymavlink import mavutil
|
||||
|
||||
# 自定義的 import
|
||||
from ..fc_network_adapter import mavlinkObject as mo
|
||||
from ..fc_network_adapter import mavlinkVehicleView as mvv
|
||||
# from ..fc_network_adapter import mavlinkDevice as md
|
||||
|
||||
# ====================== 分割線 =====================
|
||||
|
||||
test_item = 10
|
||||
running_time = 3
|
||||
|
||||
|
||||
print('test_item : ', test_item)
|
||||
|
||||
'''
|
||||
測試項 個位數 表示分離的功能
|
||||
|
||||
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
|
||||
'''
|
||||
|
||||
if test_item == 10:
|
||||
# 需要開啟一個 ardupilot 的模擬器
|
||||
# 測試 mavlink_object 放入 ring buffer 的應用
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
time.sleep(0.5) # 等待事件循環啟動
|
||||
|
||||
# 初始化輸入通道
|
||||
connection_string="udp:127.0..1:14571"
|
||||
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
||||
|
||||
sock = mavlink_socket1.port
|
||||
print("Socket port:", sock)
|
||||
|
||||
manager.add_mavlink_object(mavlink_object1)
|
||||
|
||||
start_time = time.time()
|
||||
while (time.time() - start_time) < running_time:
|
||||
items_a = mo.stream_bridge_ring.get_all()
|
||||
items_b = mo.return_packet_ring.get_all()
|
||||
try:
|
||||
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
|
||||
except IndexError:
|
||||
print("stream_bridge_ring is empty")
|
||||
|
||||
try:
|
||||
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
|
||||
except IndexError:
|
||||
print("return_packet_ring is empty")
|
||||
time.sleep(1)
|
||||
|
||||
manager.shutdown()
|
||||
|
||||
print('<=== End of Program')
|
||||
|
||||
elif test_item == 11:
|
||||
# 需要開啟一個 ardupilot 的模擬器
|
||||
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
time.sleep(0.5) # 等待事件循環啟動
|
||||
|
||||
# 初始化輸入通道
|
||||
connection_string="udp:127.0.0.1:14571"
|
||||
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
||||
manager.add_mavlink_object(mavlink_object1)
|
||||
|
||||
# 啟動 mavlink_bridge
|
||||
bridge = mo.mavlink_bridge()
|
||||
bridge.start()
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
# 印出目前所有 mavlink_systems 的內容
|
||||
print('目前所有的系統 : ')
|
||||
all_vehicles = mvv.vehicle_registry.get_all()
|
||||
for sysid, vehicle in all_vehicles.items():
|
||||
print(f" System {sysid}: {vehicle}")
|
||||
|
||||
start_time = time.time()
|
||||
show_time = time.time()
|
||||
while time.time() - start_time < running_time:
|
||||
if (time.time() - show_time) >= 2:
|
||||
# print("mark B")
|
||||
|
||||
show_time = time.time()
|
||||
for sysid, vehicle in all_vehicles.items():
|
||||
for compid in vehicle.components:
|
||||
comp = vehicle.get_component(compid)
|
||||
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, comp.packet_stats.received_count))
|
||||
comp.reset_packet_stats()
|
||||
print("===================")
|
||||
|
||||
manager.shutdown()
|
||||
bridge.stop()
|
||||
|
||||
print('<=== End of Program')
|
||||
|
||||
elif test_item == 12:
|
||||
# 需要開啟一個 ardupilot 的模擬器 與 GCS
|
||||
# 這邊是測試 mavlink object 作為交換器功能的代碼
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
time.sleep(0.5) # 等待事件循環啟動
|
||||
|
||||
# 初始化輸入通道
|
||||
connection_string="udp:127.0.0.1:14571"
|
||||
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
|
||||
|
||||
connection_string="udp:127.0.0.1:14571"
|
||||
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
|
||||
|
||||
# 初始化輸出通道
|
||||
connection_string="udpout:127.0.0.1:14551"
|
||||
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
|
||||
|
||||
manager.add_mavlink_object(mavlink_object_out)
|
||||
manager.add_mavlink_object(mavlink_object_in1)
|
||||
manager.add_mavlink_object(mavlink_object_in2)
|
||||
|
||||
time.sleep(1) # 等待通道啟動
|
||||
|
||||
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
|
||||
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
|
||||
|
||||
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
|
||||
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
|
||||
|
||||
start_time = time.time()
|
||||
while (time.time() - start_time) < running_time:
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
manager.shutdown()
|
||||
|
||||
print('<=== End of Program')
|
||||
|
||||
|
||||
|
||||
elif test_item == 21:
|
||||
# 需要開啟一個 ardupilot 的模擬器
|
||||
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
|
||||
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
# 初始化 rclpy 才能使用 node
|
||||
rclpy.init()
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
|
||||
# 啟動 mavlink_bridge
|
||||
analyzer = mo.mavlink_bridge()
|
||||
# 關於 Node 的初始化
|
||||
show_time = time.time()
|
||||
analyzer._init_node() # 初始化 node
|
||||
print('初始化 node 完成 耗時 : ',time.time() - show_time)
|
||||
|
||||
time.sleep(0.5) # 系統 Setup 完成
|
||||
|
||||
|
||||
# 創建通道
|
||||
connection_string="udp:127.0.0.1:14560"
|
||||
mavlink_socket = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object3 = mo.mavlink_object(mavlink_socket)
|
||||
manager.add_mavlink_object(mavlink_object3)
|
||||
|
||||
print('=== waiting for mavlink data ...')
|
||||
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
|
||||
|
||||
print('目前所有的系統 : ')
|
||||
for sysid in analyzer.mavlink_systems:
|
||||
print(analyzer.mavlink_systems[sysid])
|
||||
|
||||
compid = 1
|
||||
sysid = 1
|
||||
start_time = time.time()
|
||||
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
end_time = time.time()
|
||||
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
|
||||
|
||||
print("start emit info")
|
||||
|
||||
start_time = time.time()
|
||||
show_time = time.time()
|
||||
while time.time() - start_time < running_time:
|
||||
try:
|
||||
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
|
||||
analyzer.emit_info() # 這邊是測試 node 的運行
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
# 程式結束
|
||||
analyzer.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
# 結束程式 退出所有 thread
|
||||
manager.stop()
|
||||
analyzer.stop()
|
||||
analyzer.thread.join()
|
||||
|
||||
mavlink_socket.close()
|
||||
print('<=== End of Program')
|
||||
|
||||
|
||||
elif test_item == 52:
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
|
||||
# print(manager.thread.is_alive())
|
||||
|
||||
manager.shutdown()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
print('manager stopped')
|
||||
|
||||
@ -0,0 +1,331 @@
|
||||
"""
|
||||
VehicleView 使用範例
|
||||
展示如何使用純狀態容器來管理 MAVLink 載具資訊
|
||||
"""
|
||||
|
||||
import time
|
||||
from ..fc_network_adapter.mavlinkVehicleView import (
|
||||
VehicleView,
|
||||
VehicleComponent,
|
||||
RFModule,
|
||||
vehicle_registry,
|
||||
ConnectionType,
|
||||
ComponentType,
|
||||
RFModuleType
|
||||
)
|
||||
|
||||
|
||||
def example_basic_usage():
|
||||
"""基本使用範例"""
|
||||
print("=== 基本使用範例 ===\n")
|
||||
|
||||
# 1. 建立載具視圖
|
||||
vehicle = VehicleView(sysid=1)
|
||||
vehicle.kind = "Copter"
|
||||
vehicle.vehicle_type = 2 # MAV_TYPE_QUADROTOR
|
||||
vehicle.connected_via = ConnectionType.UDP
|
||||
|
||||
print(f"建立載具: {vehicle}\n")
|
||||
|
||||
# 2. 新增 autopilot 組件
|
||||
autopilot = vehicle.add_component(
|
||||
component_id=1,
|
||||
comp_type=ComponentType.AUTOPILOT
|
||||
)
|
||||
autopilot.mav_type = 2 # MAV_TYPE_QUADROTOR
|
||||
autopilot.mav_autopilot = 3 # MAV_AUTOPILOT_ARDUPILOTMEGA
|
||||
|
||||
print(f"新增組件: {autopilot}\n")
|
||||
|
||||
# 3. 手動餵入位置資訊
|
||||
autopilot.status.position.latitude = 25.0330
|
||||
autopilot.status.position.longitude = 121.5654
|
||||
autopilot.status.position.altitude = 100.5
|
||||
autopilot.status.position.timestamp = time.time()
|
||||
|
||||
print(f"位置: 緯度={autopilot.status.position.latitude}, "
|
||||
f"經度={autopilot.status.position.longitude}, "
|
||||
f"高度={autopilot.status.position.altitude}m\n")
|
||||
|
||||
# 4. 手動餵入姿態資訊
|
||||
autopilot.status.attitude.roll = 0.05 # 弧度
|
||||
autopilot.status.attitude.pitch = -0.02
|
||||
autopilot.status.attitude.yaw = 1.57
|
||||
autopilot.status.attitude.timestamp = time.time()
|
||||
|
||||
print(f"姿態: Roll={autopilot.status.attitude.roll:.3f}, "
|
||||
f"Pitch={autopilot.status.attitude.pitch:.3f}, "
|
||||
f"Yaw={autopilot.status.attitude.yaw:.3f} rad\n")
|
||||
|
||||
# 5. 手動餵入飛行模式
|
||||
autopilot.status.mode.base_mode = 89
|
||||
autopilot.status.mode.custom_mode = 4
|
||||
autopilot.status.mode.mode_name = "GUIDED"
|
||||
autopilot.status.mode.timestamp = time.time()
|
||||
|
||||
print(f"飛行模式: {autopilot.status.mode.mode_name}\n")
|
||||
|
||||
# 6. 手動餵入電池資訊
|
||||
autopilot.status.battery.voltage = 12.6
|
||||
autopilot.status.battery.current = 15.2
|
||||
autopilot.status.battery.remaining = 75
|
||||
autopilot.status.battery.timestamp = time.time()
|
||||
|
||||
print(f"電池: 電壓={autopilot.status.battery.voltage}V, "
|
||||
f"電流={autopilot.status.battery.current}A, "
|
||||
f"剩餘={autopilot.status.battery.remaining}%\n")
|
||||
|
||||
|
||||
def example_packet_tracking():
|
||||
"""封包追蹤範例"""
|
||||
print("\n=== 封包追蹤範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=2)
|
||||
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
# 模擬接收封包
|
||||
timestamp = time.time()
|
||||
|
||||
# 接收 HEARTBEAT (msg_type=0)
|
||||
autopilot.update_packet_stats(seq=0, msg_type=0, timestamp=timestamp)
|
||||
|
||||
# 接收 ATTITUDE (msg_type=30)
|
||||
autopilot.update_packet_stats(seq=1, msg_type=30, timestamp=timestamp+0.1)
|
||||
|
||||
# 接收 GLOBAL_POSITION_INT (msg_type=33)
|
||||
autopilot.update_packet_stats(seq=2, msg_type=33, timestamp=timestamp+0.2)
|
||||
|
||||
# 模擬封包遺失 (seq 跳過 3, 4, 5)
|
||||
autopilot.update_packet_stats(seq=6, msg_type=0, timestamp=timestamp+0.3)
|
||||
|
||||
stats = autopilot.packet_stats
|
||||
print(f"封包統計:")
|
||||
print(f" 接收: {stats.received_count}")
|
||||
print(f" 遺失: {stats.lost_count}")
|
||||
print(f" 最後序號: {stats.last_seq}")
|
||||
print(f" 訊息類型計數: {stats.msg_type_count}\n")
|
||||
|
||||
|
||||
def example_parameters():
|
||||
"""參數管理範例"""
|
||||
print("\n=== 參數管理範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=3)
|
||||
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
# 手動設定參數 (不會主動下載)
|
||||
autopilot.set_parameter("ARMING_CHECK", 1)
|
||||
autopilot.set_parameter("ANGLE_MAX", 4500)
|
||||
autopilot.set_parameter("WPNAV_SPEED", 500)
|
||||
|
||||
print(f"參數數量: {len(autopilot.parameters)}")
|
||||
print(f"ARMING_CHECK = {autopilot.get_parameter('ARMING_CHECK')}")
|
||||
print(f"ANGLE_MAX = {autopilot.get_parameter('ANGLE_MAX')}")
|
||||
print(f"WPNAV_SPEED = {autopilot.get_parameter('WPNAV_SPEED')}\n")
|
||||
|
||||
|
||||
def example_rf_module():
|
||||
"""RF模組範例"""
|
||||
print("\n=== RF模組範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=4)
|
||||
vehicle.connected_via = ConnectionType.SERIAL
|
||||
|
||||
# 設定 XBee RF 模組
|
||||
rf = vehicle.set_rf_module(RFModuleType.XBEE)
|
||||
|
||||
# 更新 Socket 資訊
|
||||
rf.update_socket_info(
|
||||
ip="192.168.1.100",
|
||||
port=14550,
|
||||
local_ip="192.168.1.1",
|
||||
local_port=14551,
|
||||
connected=True
|
||||
)
|
||||
|
||||
# 更新 RSSI
|
||||
rf.update_rssi(rssi=-65, timestamp=time.time())
|
||||
|
||||
# 更新 AT 命令回應
|
||||
rf.update_at_response("OK", timestamp=time.time())
|
||||
|
||||
# 自定義狀態
|
||||
rf.status.custom_status['signal_quality'] = 'excellent'
|
||||
rf.status.custom_status['packet_error_rate'] = 0.001
|
||||
|
||||
print(f"RF模組: {rf}")
|
||||
print(f"Socket: {rf.socket_info.ip}:{rf.socket_info.port}")
|
||||
print(f"RSSI: {rf.status.rssi} dBm")
|
||||
print(f"AT回應: {rf.status.at_response}")
|
||||
print(f"自定義狀態: {rf.status.custom_status}\n")
|
||||
|
||||
|
||||
def example_multiple_components():
|
||||
"""多組件範例"""
|
||||
print("\n=== 多組件範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=5)
|
||||
vehicle.kind = "Copter with Gimbal"
|
||||
|
||||
# Autopilot 組件
|
||||
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
|
||||
autopilot.mav_type = 2
|
||||
autopilot.status.mode.mode_name = "AUTO"
|
||||
|
||||
# Gimbal 組件
|
||||
gimbal = vehicle.add_component(154, ComponentType.GIMBAL)
|
||||
gimbal.mav_type = 26 # MAV_TYPE_GIMBAL
|
||||
gimbal.status.attitude.pitch = -0.785 # 向下45度
|
||||
gimbal.status.attitude.yaw = 0.0
|
||||
|
||||
# Camera 組件
|
||||
camera = vehicle.add_component(100, ComponentType.CAMERA)
|
||||
camera.mav_type = 30 # MAV_TYPE_CAMERA
|
||||
camera.status.custom_status['recording'] = True
|
||||
camera.status.custom_status['photo_interval'] = 2.0
|
||||
|
||||
print(f"載具: {vehicle}")
|
||||
print(f"組件數量: {len(vehicle.components)}")
|
||||
for cid, comp in vehicle.components.items():
|
||||
print(f" 組件 {cid}: {comp.type.value}, MAV_TYPE={comp.mav_type}")
|
||||
print()
|
||||
|
||||
|
||||
def example_registry():
|
||||
"""註冊表使用範例"""
|
||||
print("\n=== 註冊表使用範例 ===\n")
|
||||
|
||||
# 註冊多個載具
|
||||
v1 = vehicle_registry.register(sysid=1)
|
||||
v1.kind = "Copter-1"
|
||||
v1.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
v2 = vehicle_registry.register(sysid=2)
|
||||
v2.kind = "Plane-1"
|
||||
v2.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
v3 = vehicle_registry.register(sysid=3)
|
||||
v3.kind = "Rover-1"
|
||||
v3.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
print(f"註冊表中的載具數量: {len(vehicle_registry)}")
|
||||
|
||||
# 取得所有載具
|
||||
all_vehicles = vehicle_registry.get_all()
|
||||
for sysid, vehicle in all_vehicles.items():
|
||||
print(f" System {sysid}: {vehicle.kind}")
|
||||
|
||||
# 檢查載具是否存在
|
||||
print(f"\nSystem 2 存在? {2 in vehicle_registry}")
|
||||
print(f"System 99 存在? {99 in vehicle_registry}")
|
||||
|
||||
# 取得特定載具
|
||||
vehicle = vehicle_registry.get(2)
|
||||
if vehicle:
|
||||
print(f"\n取得載具: {vehicle}")
|
||||
|
||||
# 註銷載具
|
||||
vehicle_registry.unregister(3)
|
||||
print(f"\n註銷 System 3 後,剩餘載具: {len(vehicle_registry)}\n")
|
||||
|
||||
|
||||
def example_serialization():
|
||||
"""序列化範例 (除錯/日誌用)"""
|
||||
print("\n=== 序列化範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=10)
|
||||
vehicle.kind = "Test Copter"
|
||||
vehicle.connected_via = ConnectionType.UDP
|
||||
vehicle.custom_meta['firmware'] = 'ArduCopter 4.3.0'
|
||||
vehicle.custom_meta['frame_type'] = 'X'
|
||||
|
||||
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
|
||||
autopilot.mav_type = 2
|
||||
autopilot.status.position.altitude = 50.0
|
||||
autopilot.status.battery.voltage = 12.4
|
||||
autopilot.update_packet_stats(0, 0, time.time())
|
||||
autopilot.update_packet_stats(1, 30, time.time())
|
||||
|
||||
rf = vehicle.set_rf_module(RFModuleType.UDP)
|
||||
rf.update_rssi(-70)
|
||||
rf.update_socket_info(ip="192.168.1.200", port=14550, connected=True)
|
||||
|
||||
# 轉換為字典
|
||||
data = vehicle.to_dict()
|
||||
|
||||
print("載具資料 (字典格式):")
|
||||
import json
|
||||
print(json.dumps(data, indent=2, ensure_ascii=False))
|
||||
|
||||
|
||||
def example_gps_ekf():
|
||||
"""GPS 與 EKF 範例"""
|
||||
print("\n\n=== GPS 與 EKF 範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=11)
|
||||
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
# GPS 資訊
|
||||
autopilot.status.gps.fix_type = 3 # 3D Fix
|
||||
autopilot.status.gps.satellites_visible = 12
|
||||
autopilot.status.gps.eph = 120 # HDOP = 1.2
|
||||
autopilot.status.gps.epv = 180 # VDOP = 1.8
|
||||
autopilot.status.gps.timestamp = time.time()
|
||||
|
||||
print(f"GPS:")
|
||||
print(f" Fix Type: {autopilot.status.gps.fix_type}")
|
||||
print(f" 衛星數: {autopilot.status.gps.satellites_visible}")
|
||||
print(f" HDOP: {autopilot.status.gps.eph/100}")
|
||||
|
||||
# EKF 資訊
|
||||
autopilot.status.ekf.flags = 0x1FF # 所有 flags 都 OK
|
||||
autopilot.status.ekf.velocity_variance = 0.5
|
||||
autopilot.status.ekf.pos_horiz_variance = 1.2
|
||||
autopilot.status.ekf.pos_vert_variance = 2.0
|
||||
autopilot.status.ekf.timestamp = time.time()
|
||||
|
||||
print(f"\nEKF:")
|
||||
print(f" Flags: 0x{autopilot.status.ekf.flags:X}")
|
||||
print(f" 速度變異: {autopilot.status.ekf.velocity_variance}")
|
||||
print(f" 水平位置變異: {autopilot.status.ekf.pos_horiz_variance}")
|
||||
print(f" 垂直位置變異: {autopilot.status.ekf.pos_vert_variance}\n")
|
||||
|
||||
|
||||
def example_vfr_hud():
|
||||
"""VFR HUD 範例"""
|
||||
print("\n=== VFR HUD 範例 ===\n")
|
||||
|
||||
vehicle = VehicleView(sysid=12)
|
||||
autopilot = vehicle.add_component(1, ComponentType.AUTOPILOT)
|
||||
|
||||
# VFR HUD 資訊
|
||||
autopilot.status.vfr.airspeed = 15.5 # m/s
|
||||
autopilot.status.vfr.groundspeed = 14.8 # m/s
|
||||
autopilot.status.vfr.heading = 90 # 東方
|
||||
autopilot.status.vfr.throttle = 65 # %
|
||||
autopilot.status.vfr.climb = 2.5 # m/s
|
||||
autopilot.status.vfr.timestamp = time.time()
|
||||
|
||||
print(f"VFR HUD:")
|
||||
print(f" 空速: {autopilot.status.vfr.airspeed} m/s")
|
||||
print(f" 地速: {autopilot.status.vfr.groundspeed} m/s")
|
||||
print(f" 航向: {autopilot.status.vfr.heading}°")
|
||||
print(f" 油門: {autopilot.status.vfr.throttle}%")
|
||||
print(f" 爬升率: {autopilot.status.vfr.climb} m/s\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 執行所有範例
|
||||
# example_basic_usage()
|
||||
# example_packet_tracking()
|
||||
# example_parameters()
|
||||
# example_rf_module()
|
||||
# example_multiple_components()
|
||||
# example_registry()
|
||||
# example_serialization()
|
||||
# example_gps_ekf()
|
||||
example_vfr_hud()
|
||||
|
||||
print("\n" + "="*50)
|
||||
print("所有範例執行完成!")
|
||||
print("="*50)
|
||||
@ -0,0 +1,152 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
import time
|
||||
import threading
|
||||
|
||||
from ..fc_network_adapter.utils import RingBuffer
|
||||
|
||||
|
||||
def producer(buffer, count, interval=0.01):
|
||||
"""生產者:向緩衝區添加資料"""
|
||||
print(f"Producer started (thread {threading.get_ident()})")
|
||||
for i in range(count):
|
||||
# 嘗試寫入數據,直到成功
|
||||
while not buffer.put(f"Item-{i}"):
|
||||
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
|
||||
time.sleep(0.1)
|
||||
|
||||
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
|
||||
time.sleep(interval) # 模擬生產過程
|
||||
|
||||
print(f"Producer finished (thread {threading.get_ident()})")
|
||||
|
||||
def consumer(buffer, max_items, interval=0.05):
|
||||
"""消費者:從緩衝區讀取資料"""
|
||||
print(f"Consumer started (thread {threading.get_ident()})")
|
||||
items_consumed = 0
|
||||
|
||||
while items_consumed < max_items:
|
||||
# 嘗試讀取數據
|
||||
item = buffer.get()
|
||||
if item:
|
||||
print(f"Consumed: {item}, buffer size: {buffer.size()}")
|
||||
items_consumed += 1
|
||||
else:
|
||||
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
|
||||
|
||||
time.sleep(interval) # 模擬消費過程
|
||||
|
||||
print(f"Consumer finished (thread {threading.get_ident()})")
|
||||
|
||||
def batch_consumer(buffer, interval=0.2):
|
||||
"""批量消費者:一次性讀取緩衝區中的所有資料"""
|
||||
print(f"Batch consumer started (thread {threading.get_ident()})")
|
||||
|
||||
for _ in range(5): # 執行5次批量讀取
|
||||
time.sleep(interval) # 等待緩衝區積累數據
|
||||
items = buffer.get_all()
|
||||
if items:
|
||||
print(f"Batch consumed {len(items)} items: {items}")
|
||||
else:
|
||||
print("Buffer empty for batch consumer")
|
||||
|
||||
print(f"Batch consumer finished (thread {threading.get_ident()})")
|
||||
|
||||
def demonstrate_multi_writer():
|
||||
"""示範多個寫入執行緒同時操作緩衝區"""
|
||||
print("\n=== Demonstrating Multiple Writers ===")
|
||||
buffer = RingBuffer(capacity=80)
|
||||
|
||||
# 創建多個生產者執行緒
|
||||
threads = []
|
||||
for i in range(3):
|
||||
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
# 等待所有執行緒完成
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
buffer.print_stats() # 印出統計資訊
|
||||
|
||||
# 讀出所有剩餘資料
|
||||
remaining = buffer.get_all()
|
||||
print(f"Remaining items in buffer after multiple writers: {remaining}")
|
||||
|
||||
def demonstrate_basic_usage():
|
||||
"""示範基本使用方式"""
|
||||
print("\n=== Demonstrating Basic Usage ===")
|
||||
# 創建緩衝區
|
||||
buffer = RingBuffer(capacity=20, buffer_id=7)
|
||||
|
||||
# 檢查初始狀態
|
||||
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
|
||||
|
||||
# 添加幾個項目
|
||||
for i in range(5):
|
||||
buffer.put(f"Test-{i}")
|
||||
|
||||
# 檢查狀態
|
||||
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
|
||||
|
||||
# 讀取一個項目
|
||||
item = buffer.get()
|
||||
print(f"Read item: {item}")
|
||||
print(f"After reading 1 item - Content Size: {buffer.size()}")
|
||||
|
||||
# 添加更多項目直到滿
|
||||
items_added = 0
|
||||
while not buffer.is_full():
|
||||
buffer.put(f"Fill-{items_added}")
|
||||
items_added += 1
|
||||
|
||||
print(f"Added {items_added} more items until full")
|
||||
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
|
||||
|
||||
# 嘗試添加到已滿的緩衝區
|
||||
result = buffer.put("Overflow")
|
||||
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
|
||||
|
||||
# 獲取所有項目
|
||||
all_items = buffer.get_all()
|
||||
print(f"All items in buffer: {all_items}")
|
||||
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
|
||||
|
||||
# 印出統計資訊
|
||||
buffer.print_stats()
|
||||
|
||||
def demonstrate_producer_consumer():
|
||||
"""示範生產者-消費者模式"""
|
||||
print("\n=== Demonstrating Producer-Consumer Pattern ===")
|
||||
buffer = RingBuffer(capacity=16)
|
||||
|
||||
# 創建生產者和消費者執行緒
|
||||
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
|
||||
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
|
||||
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
|
||||
|
||||
# 啟動執行緒
|
||||
producer_thread.start()
|
||||
consumer_thread.start()
|
||||
batch_thread.start()
|
||||
|
||||
# 等待執行緒完成
|
||||
producer_thread.join()
|
||||
consumer_thread.join()
|
||||
batch_thread.join()
|
||||
|
||||
# 檢查最終狀態
|
||||
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
|
||||
|
||||
buffer.print_stats()
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 展示各種使用場景
|
||||
# demonstrate_basic_usage()
|
||||
# demonstrate_producer_consumer()
|
||||
demonstrate_multi_writer()
|
||||
|
||||
print("\nAll demonstrations completed!")
|
||||
@ -1,450 +0,0 @@
|
||||
import os
|
||||
import sys
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
# 基礎功能的 import
|
||||
import queue
|
||||
import time
|
||||
|
||||
# ROS2 的 import
|
||||
import rclpy
|
||||
|
||||
# mavlink 的 import
|
||||
from pymavlink import mavutil
|
||||
|
||||
# 自定義的 import
|
||||
from fc_network_adapter import mavlinkObject as mo
|
||||
from fc_network_adapter import mavlinkDevice as md
|
||||
|
||||
# ====================== 分割線 =====================
|
||||
|
||||
test_item = 12
|
||||
running_time = 10000
|
||||
|
||||
|
||||
print('test_item : ', test_item)
|
||||
|
||||
'''
|
||||
測試項 個位數 表示分離的功能
|
||||
|
||||
測試項 1X 表示 mavlink_object 的功能 測試連線的能力
|
||||
'''
|
||||
|
||||
if test_item == 10:
|
||||
# 需要開啟一個 ardupilot 的模擬器
|
||||
# 測試 mavlink_object 放入 ring buffer 的應用
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
time.sleep(0.5) # 等待事件循環啟動
|
||||
|
||||
# 初始化輸入通道
|
||||
connection_string="udp:127.0.0.1:14560"
|
||||
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
||||
|
||||
manager.add_mavlink_object(mavlink_object1)
|
||||
|
||||
start_time = time.time()
|
||||
while (time.time() - start_time) < running_time:
|
||||
items_a = mo.stream_bridge_ring.get_all()
|
||||
items_b = mo.return_packet_ring.get_all()
|
||||
try:
|
||||
print(f"data num {len(items_a)} in return_packet_ring, first data : {items_a[0][2]}")
|
||||
except IndexError:
|
||||
print("stream_bridge_ring is empty")
|
||||
|
||||
try:
|
||||
print(f"data num {len(items_b)} in return_packet_ring, first data : {items_b[0][2]}")
|
||||
except IndexError:
|
||||
print("return_packet_ring is empty")
|
||||
time.sleep(1)
|
||||
|
||||
manager.stop()
|
||||
|
||||
print('<=== End of Program')
|
||||
|
||||
elif test_item == 11:
|
||||
# 需要開啟一個 ardupilot 的模擬器
|
||||
# 這邊是測試代碼 確認 analyzer 運行後對於 device object 的建立與封包統計狀況
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
time.sleep(0.5) # 等待事件循環啟動
|
||||
|
||||
# 初始化輸入通道
|
||||
connection_string="udp:127.0.0.1:14560"
|
||||
mavlink_socket1 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object1 = mo.mavlink_object(mavlink_socket1)
|
||||
manager.add_mavlink_object(mavlink_object1)
|
||||
|
||||
# 啟動 mavlink_bridge
|
||||
analyzer = mo.mavlink_bridge()
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
# 印出目前所有 mavlink_systems 的內容
|
||||
print('目前所有的系統 : ')
|
||||
for sysid in analyzer.mavlink_systems:
|
||||
print(analyzer.mavlink_systems[sysid])
|
||||
|
||||
start_time = time.time()
|
||||
show_time = time.time()
|
||||
while time.time() - start_time < running_time:
|
||||
if (time.time() - show_time) >= 2:
|
||||
# print("mark B")
|
||||
|
||||
show_time = time.time()
|
||||
for sysid in analyzer.mavlink_systems:
|
||||
for compid in analyzer.mavlink_systems[sysid].components:
|
||||
print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
|
||||
analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
|
||||
print("===================")
|
||||
|
||||
manager.stop()
|
||||
analyzer.stop()
|
||||
|
||||
print('<=== End of Program')
|
||||
|
||||
elif test_item == 12:
|
||||
# 需要開啟一個 ardupilot 的模擬器 與 GCS
|
||||
# 這邊是測試 mavlink object 作為交換器功能的代碼
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
time.sleep(0.5) # 等待事件循環啟動
|
||||
|
||||
# 初始化輸入通道
|
||||
connection_string="udp:127.0.0.1:14560"
|
||||
mavlink_socket_in1 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object_in1 = mo.mavlink_object(mavlink_socket_in1)
|
||||
|
||||
connection_string="udp:127.0.0.1:14561"
|
||||
mavlink_socket_in2 = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object_in2 = mo.mavlink_object(mavlink_socket_in2)
|
||||
|
||||
# 初始化輸出通道
|
||||
connection_string="udpout:127.0.0.1:14550"
|
||||
mavlink_socket_out = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
|
||||
|
||||
manager.add_mavlink_object(mavlink_object_out)
|
||||
manager.add_mavlink_object(mavlink_object_in1)
|
||||
manager.add_mavlink_object(mavlink_object_in2)
|
||||
|
||||
time.sleep(1) # 等待通道啟動
|
||||
|
||||
mavlink_object_in1.add_target_socket(mavlink_object_out.socket_id)
|
||||
mavlink_object_out.add_target_socket(mavlink_object_in1.socket_id)
|
||||
|
||||
mavlink_object_in2.add_target_socket(mavlink_object_out.socket_id)
|
||||
mavlink_object_out.add_target_socket(mavlink_object_in2.socket_id)
|
||||
|
||||
start_time = time.time()
|
||||
while (time.time() - start_time) < running_time:
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
manager.stop()
|
||||
|
||||
print('<=== End of Program')
|
||||
|
||||
|
||||
# elif test_item == 20:
|
||||
# # 這邊測試 node 生成 topic 的功能
|
||||
# # 利用 空的通道 發出假的 heartbeat 封包
|
||||
# print('===> Start of Program .Test ', test_item)
|
||||
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
|
||||
|
||||
# from pymavlink.dialects.v20 import common as mavlink2
|
||||
|
||||
# sysid = 1
|
||||
# compid = 1
|
||||
|
||||
# # 啟動 mavlink_bridge
|
||||
# analyzer = mo.mavlink_bridge()
|
||||
|
||||
# mav = mavlink2.MAVLink(None)
|
||||
# mav.srcSystem = sysid
|
||||
# mav.srcComponent = compid
|
||||
|
||||
# # 這是一個 heartbeat 封包
|
||||
# fake_heartbeat = mavlink2.MAVLink_heartbeat_message(
|
||||
# type=mavlink2.MAV_TYPE_QUADROTOR,
|
||||
# autopilot=mavlink2.MAV_AUTOPILOT_ARDUPILOTMEGA,
|
||||
# base_mode=3,
|
||||
# custom_mode=0,
|
||||
# system_status=0,
|
||||
# mavlink_version=3
|
||||
# )
|
||||
# mavlink_bytes = fake_heartbeat.pack(mav)
|
||||
# fake_heartbeat_msg = mav.parse_char(mavlink_bytes)
|
||||
|
||||
# print(analyzer.mavlink_systems)
|
||||
|
||||
# mo.fixed_stream_bridge_queue.put((0, 0, fake_heartbeat_msg)) # socket id, timestamp, data
|
||||
# time.sleep(0.1)
|
||||
# print(analyzer.mavlink_systems)
|
||||
|
||||
# # ROS2 初始化
|
||||
# analyzer._init_node()
|
||||
|
||||
# # 創建 ROS2 topic
|
||||
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid]) # sysid, compid
|
||||
# print("topic created")
|
||||
# time.sleep(5)
|
||||
|
||||
# # 丟出 topic
|
||||
# analyzer.emit_info()
|
||||
|
||||
# # 結束程式
|
||||
# analyzer.running = False
|
||||
# analyzer.destroy_node()
|
||||
|
||||
# rclpy.shutdown()
|
||||
# analyzer.stop()
|
||||
# analyzer.thread.join()
|
||||
# print('<=== End of Program')
|
||||
|
||||
elif test_item == 21:
|
||||
# 需要開啟一個 ardupilot 的模擬器
|
||||
# 這邊是測試代碼 引入 rclpy 來測試 node 的運行
|
||||
|
||||
print('===> Start of Program .Test ', test_item)
|
||||
# 初始化 rclpy 才能使用 node
|
||||
rclpy.init()
|
||||
|
||||
# 清空 ring buffer
|
||||
mo.stream_bridge_ring.clear()
|
||||
mo.return_packet_ring.clear()
|
||||
|
||||
manager = mo.async_io_manager()
|
||||
manager.start()
|
||||
|
||||
# 啟動 mavlink_bridge
|
||||
analyzer = mo.mavlink_bridge()
|
||||
# 關於 Node 的初始化
|
||||
show_time = time.time()
|
||||
analyzer._init_node() # 初始化 node
|
||||
print('初始化 node 完成 耗時 : ',time.time() - show_time)
|
||||
|
||||
time.sleep(0.5) # 系統 Setup 完成
|
||||
|
||||
|
||||
# 創建通道
|
||||
connection_string="udp:127.0.0.1:14560"
|
||||
mavlink_socket = mavutil.mavlink_connection(connection_string)
|
||||
mavlink_object3 = mo.mavlink_object(mavlink_socket)
|
||||
manager.add_mavlink_object(mavlink_object3)
|
||||
|
||||
print('=== waiting for mavlink data ...')
|
||||
time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
|
||||
|
||||
print('目前所有的系統 : ')
|
||||
for sysid in analyzer.mavlink_systems:
|
||||
print(analyzer.mavlink_systems[sysid])
|
||||
|
||||
compid = 1
|
||||
sysid = 1
|
||||
start_time = time.time()
|
||||
analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
end_time = time.time()
|
||||
print(f"Execution time for create_flightMode: {end_time - start_time} seconds")
|
||||
|
||||
print("start emit info")
|
||||
|
||||
start_time = time.time()
|
||||
show_time = time.time()
|
||||
while time.time() - start_time < running_time:
|
||||
try:
|
||||
# print(analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode'])
|
||||
analyzer.emit_info() # 這邊是測試 node 的運行
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
# 程式結束
|
||||
analyzer.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
# 結束程式 退出所有 thread
|
||||
manager.stop()
|
||||
analyzer.stop()
|
||||
analyzer.thread.join()
|
||||
|
||||
mavlink_socket.close()
|
||||
print('<=== End of Program')
|
||||
|
||||
# elif test_item == 22:
|
||||
# # 需要開啟一個 ardupilot 的模擬器 與 GCS
|
||||
# # 這邊是測試代碼 引入 rclpy 來測試 node 的運行
|
||||
# print('===> Start of Program .Test ', test_item)
|
||||
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
|
||||
|
||||
# # 啟動 mavlink_bridge
|
||||
# analyzer = mo.mavlink_bridge()
|
||||
# # 關於 Node 的初始化
|
||||
# show_time = time.time()
|
||||
# analyzer._init_node() # 初始化 node
|
||||
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
|
||||
|
||||
# # 初始化兩個通道
|
||||
# connection_string_in="udp:127.0.0.1:15551"
|
||||
# mavlink_socket_in = mavutil.mavlink_connection(connection_string_in)
|
||||
# mavlink_object_in = mo.mavlink_object(mavlink_socket_in)
|
||||
# mavlink_object_in.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
|
||||
|
||||
|
||||
# connection_string_out="udpout:127.0.0.1:14553"
|
||||
# mavlink_socket_out = mavutil.mavlink_connection(connection_string_out)
|
||||
# mavlink_object_out = mo.mavlink_object(mavlink_socket_out)
|
||||
# mavlink_object_out.multiplexingToAnalysis = []
|
||||
|
||||
|
||||
|
||||
# # 讓兩個通道互相傳輸
|
||||
# mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all
|
||||
# mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all
|
||||
|
||||
# # 啟動通道
|
||||
# mavlink_object_in.run()
|
||||
# mavlink_object_out.run()
|
||||
|
||||
# print('waiting for mavlink data ...')
|
||||
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
|
||||
|
||||
# print('目前所有的系統 : ')
|
||||
# for sysid in analyzer.mavlink_systems:
|
||||
# print(analyzer.mavlink_systems[sysid])
|
||||
|
||||
# compid = 1
|
||||
# sysid = 1
|
||||
# show_time = time.time()
|
||||
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# print(f"Execution time for create_flightMode: {time.time() - show_time} seconds")
|
||||
|
||||
# print("start emit info")
|
||||
|
||||
# start_time = time.time()
|
||||
# show_time = time.time()
|
||||
# show_time2 = time.time()
|
||||
|
||||
# while time.time() - start_time < running_time:
|
||||
# if (time.time() - show_time2) >= 1:
|
||||
# analyzer.emit_info() # 這邊是測試 node 的運行
|
||||
# # sss = analyzer.mavlink_systems[1].components[1].emitParams['flightMode_mode']
|
||||
# fmsg = analyzer.mavlink_systems[1].components[1].emitParams['flightMode']
|
||||
# sss = mavutil.mode_string_v10(fmsg)
|
||||
# # print("目前的飛行模式 : {}, Msg Seq : {}".format(sss, fmsg.get_seq()))
|
||||
# print("目前的飛行模式 : {}".format(sss))
|
||||
# show_time2 = time.time()
|
||||
|
||||
# # if (time.time() - show_time) >= 2:
|
||||
# # show_time = time.time()
|
||||
# # for sysid in analyzer.mavlink_systems:
|
||||
# # for compid in analyzer.mavlink_systems[sysid].components:
|
||||
# # print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count))
|
||||
# # analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid)
|
||||
# # print("===================")
|
||||
|
||||
|
||||
|
||||
# analyzer.destroy_node()
|
||||
# rclpy.shutdown()
|
||||
|
||||
|
||||
# # 結束程式 退出所有 thread
|
||||
# mavlink_object_in.stop()
|
||||
# mavlink_object_in.thread.join()
|
||||
# mavlink_socket_in.close()
|
||||
# mavlink_object_out.stop()
|
||||
# mavlink_object_out.thread.join()
|
||||
# mavlink_socket_out.close()
|
||||
# analyzer.stop()
|
||||
# analyzer.thread.join()
|
||||
|
||||
|
||||
# print('<=== End of Program')
|
||||
|
||||
# elif test_item == 51:
|
||||
|
||||
# # 晉凱的測試項目
|
||||
# print('===> Start of Program .Test ', test_item)
|
||||
# rclpy.init() # 注意要初始化 rclpy 才能使用 node
|
||||
|
||||
# # 啟動 mavlink_bridge
|
||||
# analyzer = mo.mavlink_bridge()
|
||||
# # 關於 Node 的初始化
|
||||
# show_time = time.time()
|
||||
# analyzer._init_node() # 初始化 node
|
||||
# print('初始化 node 完成 耗時 : ',time.time() - show_time)
|
||||
|
||||
# # 創建通道
|
||||
# connection_string="udp:127.0.0.1:15551"
|
||||
# mavlink_socket3 = mavutil.mavlink_connection(connection_string)
|
||||
# mavlink_object3 = mo.mavlink_object(mavlink_socket3)
|
||||
# # 設定通道流動
|
||||
# mavlink_object3.multiplexingToAnalysis = [0, 30, 32, 33, 74, 147]
|
||||
# mavlink_object3.multiplexingToReturn = [] #
|
||||
# # mavlink_object3.multiplexingToSwap = [] #
|
||||
# # 啟動通道
|
||||
# mavlink_object3.run()
|
||||
|
||||
|
||||
|
||||
# print('waiting for mavlink data ...')
|
||||
# time.sleep(2) # 等待 2 秒鐘 讓 device object 收到足夠的 mavlink 訊息
|
||||
|
||||
# compid = 1
|
||||
# sysid = 1
|
||||
# start_time = time.time()
|
||||
# analyzer.create_flightMode(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_attitude(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_local_position_pose(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_local_position_velocity(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_global_global(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_vfr_hud(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_battery(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# analyzer.create_global_rel(sysid, analyzer.mavlink_systems[sysid].components[compid])
|
||||
# end_time = time.time()
|
||||
# print(f"Execution time for create all topic: {end_time - start_time} seconds")
|
||||
|
||||
# print("start emit info")
|
||||
|
||||
# start_time = time.time()
|
||||
# show_time = time.time()
|
||||
# while time.time() - start_time < running_time:
|
||||
# try:
|
||||
# # rclpy.spin(analyzer)
|
||||
# analyzer.emit_info() # 這邊是測試 node 的運行
|
||||
# time.sleep(0.5)
|
||||
# except KeyboardInterrupt:
|
||||
# break
|
||||
|
||||
# analyzer.destroy_node()
|
||||
# rclpy.shutdown()
|
||||
|
||||
# # 結束程式 退出所有 thread
|
||||
# mavlink_object3.stop()
|
||||
# mavlink_object3.thread.join()
|
||||
# analyzer.stop()
|
||||
# analyzer.thread.join()
|
||||
|
||||
# mavlink_socket3.close()
|
||||
# print('<=== End of Program')
|
||||
Loading…
Reference in New Issue