dev test for some mavlink command

master
picard 1 year ago
parent b5ff857ef8
commit eecdab8985

@ -4,11 +4,16 @@
專案核心環境
1. ROS2 Humble
2. Python
3. Ardupilot
===
必要相依套件
===
建議的開發測試
1. Gazebo Garden
===
Package 簡述

@ -1,4 +0,0 @@
[develop]
script_dir=$base/lib/drone_controller
[install]
install_scripts=$base/lib/drone_controller

@ -1,7 +1,7 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>drone_controller</name>
<name>unitdev01</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="dodo@todo.todo">dodo</maintainer>

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/unitdev01
[install]
install_scripts=$base/lib/unitdev01

@ -1,10 +1,11 @@
from setuptools import find_packages, setup
package_name = 'drone_controller'
package_name = 'unitdev01'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
@ -19,8 +20,8 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'interface = drone_controller.interface:main',
'mavlink = drone_controller.mavlink:main'
'interface = unitdev01.interface:main',
'mavlink = unitdev01.mavlink:main'
],
},
)

@ -1,8 +1,5 @@
import sys
import time
import math
import rclpy
from collections import deque
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
@ -18,48 +15,15 @@ class MAVLinkWorker(QThread):
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float)
def __init__(self, connection_string="udp:127.0.0.1:14550", usb='/dev/ttyUSB0'):
def __init__(self, connection_string="udp:127.0.0.1:14550"):
super().__init__()
self.connection = mavutil.mavlink_connection(connection_string, baud=57600)
self.connection.wait_heartbeat()
self.connection = mavutil.mavlink_connection(connection_string)
self.running = True
self.uav_data = {}
# 設置需要監控的訊息類型
self.messages_to_monitor = ["HEARTBEAT", "BATTERY_STATUS", "GLOBAL_POSITION_INT", "GPS_RAW_INT", "LOCAL_POSITION_NED", "ATTITUDE", "VFR_HUD"]
# 記錄訊息的時間戳和丟包數
self.last_received_time = time.time()
self.message_count = 0
self.missed_messages = 0
# 用於計算頻率的時間窗口(每隔 1 秒計算一次頻率)
self.start_time = time.time()
# 記錄處理過的消息類型及其對應時間
self.message_times = {}
self.delay = deque(maxlen=20)
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.seq = 0
self.ping_time = 0
def run(self):
while self.running:
try:
current_time = time.time()
self.connection.mav.timesync_send(int(current_time*1e6), 0)
'''
if self.ping_time - current_time > 1e3:
self.connection.mav.ping_send(
time_usec = current_time,
seq = self.seq,
target_system = 0,
target_component = 0
)
print(f"Ping {self.seq} sent at {current_time} ms")
'''
msg = self.connection.recv_match(blocking=True)
if not msg:
continue
@ -71,30 +35,6 @@ class MAVLinkWorker(QThread):
if sysid not in self.uav_data:
self.uav_data[sysid] = {"sysid": sysid}
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256 # MAVLink sequence numbers are modulo 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq:
# Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
'''
if self.total_packet_count[sysid] > 0:
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
print(f"UAV {sysid} Packet Loss: {self.packet_loss_count[sysid]} / {self.total_packet_count[sysid]} "
f"({self.loss_percentage[sysid]:.2f}%)")
'''
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
@ -120,7 +60,7 @@ class MAVLinkWorker(QThread):
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
pitch = msg.pitch
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
@ -129,45 +69,6 @@ class MAVLinkWorker(QThread):
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
# Calculate the round-trip time in milliseconds
round_trip_time = (int(time.time() * 1e6) - msg.ts1) / 1000.0
print(f"Round-trip time (ping): {round_trip_time:.3f} ms")
self.seq += 1
if msg_type in self.messages_to_monitor:
# 更新消息接收時間
if msg_type not in self.message_times:
self.message_times[msg_type] = current_time
else:
# 計算每條消息的頻率
self.time_diff = current_time - self.message_times[msg_type]
self.message_times[msg_type] = current_time
frequency = 1 / self.time_diff if self.time_diff > 0 else 0
timestamp = msg._timestamp
round_trip_delay = (timestamp - current_time)*1000
self.delay.append(round_trip_delay)
# 計算總的頻率
self.message_count += 1
self.last_received_time = current_time
# 每隔一段時間更新總頻率
if current_time - self.start_time >= 1:
# 更新每秒頻率
frequency = self.message_count / (current_time - self.start_time)
print(f"Overall frequency: {frequency:.2f} Hz")
if self.delay:
average = sum(self.delay) / len(self.delay)
print(f"The average delay is: {average:.4f} ms")
self.start_time = current_time
self.message_count = 0
except Exception as e:
print(f"Error reading message: {e}")
@ -185,7 +86,7 @@ class MAVLinkWorker(QThread):
mode = 4
elif mode == 'LOITER':
mode = 5
self.connection.mav.set_mode_send( #SET_MODE (11) — [DEP]
self.connection.mav.set_mode_send(
sysid,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode
@ -196,18 +97,27 @@ class MAVLinkWorker(QThread):
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, #MAV_CMD_COMPONENT_ARM_DISARM (400)
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0, # Confirmation
1 if arm else 0,
0, 0, 0, 0, 0, 0
)
# self.connection.mav.command_long_send(
# sysid,
# 1, # Component ID
# mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
# 0,
# 33,
# -1,
# 0, 0, 0, 0, 0
# )
def takeoff(self, namespace, altitude):
sysid = int(namespace[-1])
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, #MAV_CMD_NAV_TAKEOFF (22)
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, # Command to execute
0, # Confirmation (0 = first transmission)
0, 0, 0, 0, # Parameters 1-4 (Unused in takeoff command)
0, 0, altitude # Latitude, Longitude, Altitude (target_altitude in meters)
@ -218,7 +128,7 @@ class MAVLinkWorker(QThread):
self.connection.mav.command_long_send(
sysid,
1, # Component ID
mavutil.mavlink.MAV_CMD_NAV_LAND, #MAV_CMD_NAV_LAND (21)
mavutil.mavlink.MAV_CMD_NAV_LAND,
0,
0, 0, 0, 0,
0, 0, 0
@ -236,24 +146,11 @@ class MAVLinkWorker(QThread):
0, 0 # Yaw, yaw_rate
)
def set_parameter(self, namespace, param_name, value):
sysid = int(namespace[-1])
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
print(f"Parameter {param_name} set to {value}")
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.namespaces = ['uav1', 'uav4', 'uav100']
self.namespaces = ['uav1', 'uav2', 'uav3']
self.workers = MAVLinkWorker()
self.initUI()
@ -266,6 +163,7 @@ class DroneControlApp(QMainWindow):
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.start()
def initUI(self):
@ -315,17 +213,6 @@ class DroneControlApp(QMainWindow):
main_layout.addLayout(uav_layout)
# EK3_SRC2_POSXY參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入EK3_SRC2_POSXY的新值")
self.setParamButton = QPushButton("設置EK3_SRC2_POSXY")
self.setParamButton.clicked.connect(self.set_ek3_src2_posxy)
param_layout.addWidget(QLabel("EK3_SRC2_POSXY:"))
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
@ -459,15 +346,6 @@ class DroneControlApp(QMainWindow):
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def set_ek3_src2_posxy(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_parameter(namespace, "EK3_SRC2_POSXY", param_value)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main(args=None):
rclpy.init(args=args)
app = QtWidgets.QApplication(sys.argv)

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>unitdev02</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="chiyu1468@hotmail.com">picars</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/unitdev02
[install]
install_scripts=$base/lib/unitdev02

@ -0,0 +1,27 @@
from setuptools import find_packages, setup
package_name = 'unitdev02'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='picars',
maintainer_email='chiyu1468@hotmail.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'showmavsimple = unitdev02.test01:mavlink_analyzer_simple',
'fdm_switch_one = unitdev02.test01:fdm_swicth_example_one',
],
},
)

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

@ -0,0 +1,49 @@
'ATTITUDE': 73, 'GLOBAL_POSITION_INT': 73,'NAV_CONTROLLER_OUTPUT': 73, 'AHRS': 73, 'AHRS2': 72,'LOCAL_POSITION_NED': 72,
1. 飛行狀態與姿態相關
ATTITUDE無人機的姿態資訊包括滾轉角 (Roll)、俯仰角 (Pitch)、偏航角 (Yaw) 及其角速度。
GLOBAL_POSITION_INT全球位置資訊提供經度、緯度、高度以及無人機的相對高度 (relative altitude)。
LOCAL_POSITION_NED無人機在當前本地座標系統 (NED - North, East, Down) 中的位置與速度資訊。
NAV_CONTROLLER_OUTPUT導航控制輸出包括航向 (heading)、導航誤差等資訊,用於導航系統內部運算。
AHRS輔助導航狀態如 AHRS (Attitude and Heading Reference System) 計算的誤差、修正參數。
AHRS2進一步的 AHRS 擴充資訊,例如不同座標系之間的轉換參數。
'VFR_HUD': 73, 'RAW_IMU': 73, 'SCALED_IMU2': 73, 'SCALED_IMU3': 73, 'SCALED_PRESSURE': 73, 'SCALED_PRESSURE2': 73,'VIBRATION': 72,
2. 感測器資訊
RAW_IMUIMU 原始數據,包括加速度計 (Accelerometer)、陀螺儀 (Gyroscope)、磁力計 (Magnetometer) 的測量值。
SCALED_IMU2 / SCALED_IMU3經過校正的 IMU 數據,可能來自第二組或第三組 IMU提供更精確的測量值。
SCALED_PRESSURE / SCALED_PRESSURE2經過校正的大氣壓力感測器數據用於高度計的計算。
VFR_HUDHUD (Head-Up Display) 相關資訊,如空速 (airspeed)、高度 (altitude)、油門 (throttle) 位置、飛行模式等。
VIBRATION機體振動資訊包含加速度變化率等幫助監測飛機是否有異常振動。
'SYS_STATUS': 73, 'POWER_STATUS': 73,'MEMINFO': 73, 'BATTERY_STATUS': 72, 'ESC_TELEMETRY_1_TO_4': 72,
3. 動力與系統狀態
SYS_STATUS無人機的系統狀態包括 CPU 負載、感測器狀態、儲存空間、記憶體狀態等。
POWER_STATUS無人機的電源狀態如電壓、電流消耗、備用電池狀態等。
BATTERY_STATUS詳細的電池資訊如電壓、電流、充電狀態、剩餘電量等。
ESC_TELEMETRY_1_TO_4電子調速器 (ESC) 的遙測數據,可能包括轉速、電流消耗、溫度等。
MEMINFO記憶體資訊如剩餘 RAM、堆疊使用量等用於監測飛控是否有記憶體不足問題。
'TERRAIN_REPORT': 72,'EKF_STATUS_REPORT': 72, 'GPS_RAW_INT': 73, 'WIND': 72,
4. 導航與 GPS 相關
GPS_RAW_INTGPS 原始數據,如衛星數量、經緯度、高度、水平與垂直精度等資訊。
TERRAIN_REPORT地形資訊可能包含當前無人機下方的地表高度 (Terrain Altitude) 等。
EKF_STATUS_REPORTEKF (Extended Kalman Filter) 狀態,提供導航估測誤差、狀態收斂情況等資訊。
WIND風速與風向資訊用於導航與飛控補償。
'SYSTEM_TIME': 73, 'HEARTBEAT': 18, 'TIMESYNC': 2
5. 通訊與時間同步
HEARTBEAT心跳訊號告知系統無人機仍然運作中並提供當前模式、系統狀態等基本資訊。
TIMESYNC時間同步封包確保地面站與無人機之間的時鐘同步。
SYSTEM_TIME系統時間資訊用於確保所有設備的時間基準一致。
'SERVO_OUTPUT_RAW': 73, 'RC_CHANNELS': 73,
6. 控制與遙控輸入
SERVO_OUTPUT_RAW各個伺服馬達的 PWM (Pulse Width Modulation) 輸出數據,反映控制器實際輸出的命令。
RC_CHANNELS遙控器 (RC) 的通道數據,代表從操控者 (或模擬器) 端傳遞過來的命令。
'MISSION_CURRENT': 73, 'SIMSTATE': 72,
7. 任務與導航
MISSION_CURRENT目前正在執行的飛行任務 (Mission) 的編號。
SIMSTATESITL 模擬器的狀態,如飛行模式、地面速度等。

@ -0,0 +1,146 @@
import rclpy
from pymavlink import mavutil
from time import sleep
import time
import socket
import struct
def mavlink_analyzer_simple(count = 500):
# rclpy.init()
# node = rclpy.create_node('mavlink_analyzer_simple')
print('Inintializing connection')
connection_string="udp:127.0.0.1:14550"
connection = mavutil.mavlink_connection(connection_string)
print('Catch messages')
msg_count = {}
start_time = time.time()
for _ in range(count):
msg = connection.recv_msg()
# msg = connection.recv_match(blocking=True)
if msg:
msg_type = msg.get_type()
if msg_type in msg_count:
msg_count[msg_type] += 1
else:
msg_count[msg_type] = 1
if msg.get_type() == 'SERVO_OUTPUT_RAW':
print(msg)
else:
print('No message yet, 1 second for data to fill')
sleep(1)
print('Messages Type received:')
print(msg_count)
end_time = time.time()
print('Time elapsed: ', end_time - start_time)
print('Closing connection')
connection.close()
def fdm_parser_example(data=None):
if data is None:
data = bytes.fromhex('1a48e903b1340500e803e803e803e803000000000000000000000000000000000000000000000000')
# 1a48e903b1340500e803e803e803e803000000000000000000000000000000000000000000000000
parse_format = 'HHI16H'
if len(data) != struct.calcsize(parse_format):
print("got packet of len %u, expected %u" % (len(data), struct.calcsize(parse_format)))
exit(1)
decoded = struct.unpack(parse_format,data)
magic = decoded[0]
frame_rate_hz = decoded[1]
frame_count = decoded[2]
pwm = decoded[3:]
print("magic: %u, frame_rate_hz: %u, frame_count: %u, pwm: %s" % (magic, frame_rate_hz, frame_count, pwm))
def fdm_switch_example_one():
'''
這個範例在 SITL Gazebo 之間 做一個簡單的UDP轉發器
沒有轉換數據格式
'''
# make a link get fdm from SITL, as a server
sock_s2g = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_s2g.bind(('', 9002))
sock_s2g.settimeout(0.1)
# set a target link pass JSON to Gazebo, as a client
sock_g2s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('127.0.0.1', 19002)
packet_count = 0
start_time = time.time()
while True:
try:
# 接到SITL的數據 並轉發給Gazebo
data_s2g, addr_s2g = sock_s2g.recvfrom(1024)
sock_g2s.sendto(data_s2g, server_address)
# 得到Gazebo的回應 並轉發給SITL
data_g2s, addr_g2s = sock_g2s.recvfrom(1024)
sock_s2g.sendto(data_g2s, addr_s2g)
packet_count += 1
except socket.timeout:
pass
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time >= 1.0:
print(f"Packets received in the last second: {packet_count}")
packet_count = 0
start_time = current_time
fdm_parser_example(data_s2g)
def fdm_switch_example_two():
'''
這個範例在 SITL Gazebo 之間 做一個簡單的UDP轉發器
有轉換數據格式
'''
# read info from SITL via MAVLink
connection_string="udp:127.0.0.1:14550"
connection = mavutil.mavlink_connection(connection_string)
# set a target link pass JSON to Gazebo, as a client
sock_g2s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_address = ('127.0.0.1', 19002)
print('Start')
mavlink_analyzer_simple()
# connection_string="udp:127.0.0.1:14550"
# connection = mavutil.mavlink_connection(connection_string)
# connection.mav.command_long_send(
# 1,
# 1,
# mavutil.mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
# 0, 33, 1000000, 0, 0, 0, 0, 0
# )
# connection.mav.command_long_send(
# 1,
# 1, # Component ID
# mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
# 0, # Confirmation
# 1, # 0: disarm, 1: arm
# 0, 0, 0, 0, 0, 0
# )
Loading…
Cancel
Save