Delete 'src/unitdev01/unitdev01/xbee.py'

chiyu
ken910606 4 months ago
parent 00eb6b512d
commit e4b658d578

@ -1,619 +0,0 @@
import sys
import time
import math
import serial
import struct
from PyQt5 import QtWidgets
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox
from pymavlink import mavutil
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
class PacketCapture:
def __init__(self):
self.data = bytearray()
def write(self, b):
self.data.extend(b)
return len(b)
class MAVLinkWorker(QThread):
state_signal = pyqtSignal(str, str)
battery_signal = pyqtSignal(str, float)
gps_signal = pyqtSignal(str, float, float)
gps_status_signal = pyqtSignal(str, int, int)
local_position_signal = pyqtSignal(str, float, float, float)
imu_signal = pyqtSignal(str, float)
hdg_signal = pyqtSignal(str, float)
groundspeed_signal = pyqtSignal(str, float)
ping_signal = pyqtSignal(str, float)
loss_signal = pyqtSignal(str, float)
frequency_signal = pyqtSignal(str, float)
param_signal = pyqtSignal(str, int)
kbps_signal = pyqtSignal(str, float)
def __init__(self):
super().__init__()
self.namespaces = ['UAV1', 'UAV2', 'UAV3']
self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600)
self.connection.wait_heartbeat()
for sysid in self.namespaces:
sysid = int(sysid.replace('UAV', ''))
self.set_sr_params(sysid)
self.running = True
# 用於計算頻率丟包
self.message_count = {}
self.frequency = {}
self.start_time = {}
self.seq_numbers = {}
self.packet_loss_count = {}
self.total_packet_count = {}
self.loss_percentage = {}
self.total_bytes_received = {}
self.throughput_KBps = {}
for namespace in self.namespaces:
self.request_param(namespace, "SR1_EXTRA1")
self.connection.mav.timesync_send(
0, #tc1
int(time.time() * 1e9) #ts1
)
def run(self):
while self.running:
try:
msg = self.connection.recv_msg()
current_time = time.time()
if not msg:
continue
sysid = msg.get_srcSystem()
if sysid == 0:
continue
namespace = f"UAV{sysid}"
msg_type = msg.get_type()
if msg_type =="BAD_DATA":
continue
if sysid not in self.total_bytes_received:
self.total_bytes_received[sysid] = 0
self.throughput_KBps[sysid] = 0
# 計算訊息大小
msg_bytes = msg.get_msgbuf() # 取得封包的 bytes
if msg_bytes:
self.total_bytes_received[sysid] += len(msg_bytes)
# Packet loss calculation
if sysid not in self.seq_numbers:
self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking
self.packet_loss_count[sysid] = 0
self.total_packet_count[sysid] = 1
else:
current_seq = msg.get_seq()
expected_seq = (self.seq_numbers[sysid] + 1) % 256
self.total_packet_count[sysid] += 1
if current_seq != expected_seq: # Packet(s) lost
lost_packets = (current_seq - expected_seq) % 256
self.packet_loss_count[sysid] += lost_packets
self.total_packet_count[sysid] += lost_packets
self.seq_numbers[sysid] = current_seq
self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100
self.loss_signal.emit(namespace, self.loss_percentage[sysid])
# Frequency calculation
if sysid not in self.message_count:
self.message_count[sysid] = 0
self.start_time[sysid] = current_time
self.message_count[sysid] += 1
# 每隔一段時間更新
elapsed_time = current_time - self.start_time[sysid]
if elapsed_time >= 1:
# 每秒頻率
self.frequency[sysid] = self.message_count[sysid] / elapsed_time
self.frequency_signal.emit(namespace, self.frequency[sysid])
# 發送 timesync request
self.connection.mav.timesync_send(
0, #tc1
int(current_time * 1e9) #ts1
)
#吞吐量
self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time
self.kbps_signal.emit(namespace, self.throughput_KBps[sysid])
self.start_time[sysid] = current_time
self.message_count[sysid] = 0
self.total_bytes_received[sysid] = 0
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
self.state_signal.emit(namespace, mode)
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.battery_signal.emit(namespace, voltage)
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.gps_signal.emit(namespace, latitude, longitude)
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
satellites_visible = msg.satellites_visible
self.gps_status_signal.emit(namespace, fix_type, satellites_visible)
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.local_position_signal.emit(namespace, x, y, z)
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.imu_signal.emit(namespace, pitch)
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.hdg_signal.emit(namespace, heading)
self.groundspeed_signal.emit(namespace, groundspeed)
elif msg_type == "TIMESYNC":
round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6
if(round_trip_time<1e6):
self.ping_signal.emit(namespace, round_trip_time)
elif msg_type == 'PARAM_VALUE':
param_name = "SR1_EXTRA1"
if msg.param_id == param_name:
self.param_signal.emit(namespace, msg.param_value)
except Exception as e:
print(f"Error reading message: {e}")
def stop(self):
self.running = False
self.connection.close()
def build_api_tx_frame(self, data: bytes, frame_id=0x01):
frame_type = 0x10
dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
def send_command(self, msg):
# Create the packet and send via serial port
PORT = "/dev/ttyUSB0"
BAUD = 57600
ser = serial.Serial(PORT, BAUD)
capture = PacketCapture()
mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1)
mav.version = 2
mav.send(msg)
api_frame = self.build_api_tx_frame(capture.data)
ser.write(api_frame)
print("RAW HEX:", capture.data.hex())
def set_mode(self, namespace, mode):
sysid = int(namespace.replace('UAV', ''))
if mode == 'STABILIZE':
mode = 0
elif mode == 'AUTO':
mode = 3
elif mode == 'GUIDED':
mode = 4
elif mode == 'LOITER':
mode = 5
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_DO_SET_MODE,
confirmation=0,
param1=1, # Custom mode enabled
param2=mode,
param3=0, param4=0, param5=0, param6=0, param7=0
)
self.send_command(msg)
def arm(self, namespace, arm):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
confirmation=0,
param1=1 if arm else 0, # 1 to arm, 0 to disarm
param2=0, param3=0, param4=0, param5=0, param6=0, param7=0
)
self.send_command(msg)
def takeoff(self, namespace, altitude):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
confirmation=0,
param1=0, param2=0, param3=0, param4=0, param5=0, param6=0,
param7=altitude
)
self.send_command(msg)
def land(self, namespace):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.command_long_encode(
target_system=sysid,
target_component=1,
command=mavutil.mavlink.MAV_CMD_NAV_LAND,
confirmation=0,
param1=0, param2=0, param3=0, param4=0, param5=0, param6=0,
param7=0
)
self.send_command(msg)
def set_local_position(self, namespace, x, y, z):
sysid = int(namespace.replace('UAV', ''))
msg = self.connection.mav.set_position_target_local_ned_encode(
0, sysid, 1, # time_boot_ms, sysid, compid
mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame
0b110111111000, # Mask
y, x, -z, # Position
0, 0, 0, # Velocity
0, 0, 0, # Acceleration
0, 0 # Yaw, yaw_rate
)
self.send_command(msg)
def reboot_drone(self, namespace):
sysid = int(namespace.replace('UAV', ''))
self.connection.mav.command_long_send(
sysid,
1, # target_component (一般為1)
mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令
0, # confirmation
1, # 第一個參數 (1 = reboot0 = 無操作2 = 關機)
0, 0, 0, 0, 0, 0 # 其餘參數保留
)
def set_param(self, namespace, param_name, value):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_set_send(
sysid,
1,
param_name.encode('utf-8'),
float(value),
mavutil.mavlink.MAV_PARAM_TYPE_REAL32
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
def request_param(self, namespace, param_name):
sysid = int(namespace.replace('UAV', ''))
try:
self.connection.mav.param_request_read_send(
sysid, # 系統 ID
1, # 組件 ID
param_name.encode('utf-8'), # 參數名稱
-1 # 參數索引,-1 表示根據名稱請求
)
except Exception as e:
print(f"Failed to set parameter {param_name}: {e}")
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 2, 4]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[1],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[1],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[1]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
'''
def set_sr_params(self, sysid):
""" 直接設置 MAVLink 訊息頻率 """
freqs = [0, 1, 1]
params = {
"SR1_ADSB": freqs[0],
"SR1_EXT_STAT": freqs[0],
"SR1_EXTRA1": freqs[2],
"SR1_EXTRA2": freqs[2],
"SR1_EXTRA3": freqs[0],
"SR1_POSITION": freqs[1],
"SR1_RAW_SENS": freqs[1],
"SR1_RC_CHAN": freqs[0]
}
for param, value in params.items():
self.set_param(f"UAV{sysid}", param, value)
class DroneControlApp(QMainWindow):
def __init__(self):
super().__init__()
self.workers = MAVLinkWorker()
self.namespaces = self.workers.namespaces
self.initUI()
# Connect signals to update the UI
self.workers.state_signal.connect(self.update_state)
self.workers.battery_signal.connect(self.update_battery)
self.workers.gps_signal.connect(self.update_gps)
self.workers.gps_status_signal.connect(self.update_gps_status)
self.workers.local_position_signal.connect(self.update_local_position)
self.workers.imu_signal.connect(self.update_imu)
self.workers.hdg_signal.connect(self.update_hdg)
self.workers.groundspeed_signal.connect(self.update_groundspeed)
self.workers.ping_signal.connect(self.update_ping)
self.workers.loss_signal.connect(self.update_loss)
self.workers.frequency_signal.connect(self.update_frequency)
self.workers.param_signal.connect(self.update_param)
self.workers.kbps_signal.connect(self.update_kbps)
self.workers.start()
def initUI(self):
self.setWindowTitle("多無人機控制介面")
self.setGeometry(100, 100, 800, 600)
# 主佈局
main_layout = QVBoxLayout()
# 無人機選擇區域
uav_layout = QHBoxLayout()
self.uav_checkboxes = {}
for namespace in self.namespaces:
checkbox = QCheckBox(namespace)
checkbox.setChecked(True) # 預設勾選
self.uav_checkboxes[namespace] = checkbox
uav_layout.addWidget(checkbox)
main_layout.addLayout(uav_layout)
# 顯示所有無人機資訊,從左到右顯示
uav_layout = QHBoxLayout()
# 逐個顯示 UAV 的資訊
self.uav_labels = {}
for namespace in self.namespaces:
uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列
# 狀態顯示
self.uav_labels[namespace] = {
"status": QLabel("狀態:等待數據..."),
"battery": QLabel("電壓:等待數據..."),
"local_position": QLabel("Local Position等待數據..."),
"gps": QLabel("GPS位置等待數據...\n\n"),
"fix": QLabel("Fix Type等待數據..."),
"satellites": QLabel("衛星數量:等待數據..."),
"groundspeed": QLabel("地面速度:等待數據..."),
"pitch": QLabel("Pitch等待數據..."),
"heading": QLabel("Heading等待數據..."),
"ping": QLabel("Ping等待數據..."),
"loss": QLabel("丟包:等待數據..."),
"frequency": QLabel("頻率:等待數據..."),
"kbps": QLabel("吞吐量:等待數據..."),
"param": QLabel("SR1_EXTRA1等待數據...")
}
# 把每個資訊添加到該 UAV 的垂直佈局中
for label in self.uav_labels[namespace].values():
uav_info_layout.addWidget(label)
# 將該 UAV 的資訊佈局加到主佈局中(從左到右排列)
uav_layout.addLayout(uav_info_layout)
main_layout.addLayout(uav_layout)
# SR1_EXTRA1參數設置
param_layout = QHBoxLayout()
self.paramInput = QLineEdit()
self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值")
self.setParamButton = QPushButton("設置SR1_EXTRA1")
self.setParamButton.clicked.connect(self.set_SR1_EXTRA1)
param_layout.addWidget(self.paramInput)
param_layout.addWidget(self.setParamButton)
main_layout.addLayout(param_layout)
# 模式切換區域
mode_layout = QHBoxLayout()
self.modeComboBox = QComboBox()
self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"])
self.modeButton = QPushButton("切換模式")
self.modeButton.clicked.connect(self.change_mode)
mode_layout.addWidget(QLabel("選擇模式:"))
mode_layout.addWidget(self.modeComboBox)
mode_layout.addWidget(self.modeButton)
main_layout.addLayout(mode_layout)
# 解鎖按鈕
fly_layout = QHBoxLayout()
self.armButton = QPushButton("解鎖")
self.armButton.clicked.connect(self.arm_drone)
# 起飛按鈕
self.takeoffButton = QPushButton("起飛")
self.takeoffButton.clicked.connect(self.takeoff_drone)
# 降落按鈕
self.landButton = QPushButton("降落")
self.landButton.clicked.connect(self.land_drone)
fly_layout.addWidget(self.armButton)
fly_layout.addWidget(self.takeoffButton)
fly_layout.addWidget(self.landButton)
main_layout.addLayout(fly_layout)
# XYZ 設置欄位
xyz_layout = QHBoxLayout()
self.positionX = QLineEdit()
self.positionX.setPlaceholderText("X")
self.positionY = QLineEdit()
self.positionY.setPlaceholderText("Y")
self.positionZ = QLineEdit()
self.positionZ.setPlaceholderText("Z")
self.setPositionButton = QPushButton("設置位置")
self.setPositionButton.clicked.connect(self.set_local_position)
xyz_layout.addWidget(QLabel("輸入位置:"))
xyz_layout.addWidget(self.positionX)
xyz_layout.addWidget(self.positionY)
xyz_layout.addWidget(self.positionZ)
xyz_layout.addWidget(self.setPositionButton)
main_layout.addLayout(xyz_layout)
# 設置 XYZ 的按鈕
#設置重啟按鈕
self.rebootButton = QPushButton("重啟")
self.rebootButton.clicked.connect(self.reboot_drone)
main_layout.addWidget(self.rebootButton)
# 設置主佈局
central_widget = QWidget(self)
central_widget.setLayout(main_layout)
self.setCentralWidget(central_widget)
self.show()
def closeEvent(self, event):
try:
self.workers.stop()
finally:
event.accept()
def update_state(self, namespace, mode):
self.uav_labels[namespace]["status"].setText(f"狀態:{mode}")
def update_battery(self, namespace, voltage):
self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V")
def update_gps(self, namespace, latitude, longitude):
self.uav_labels[namespace]["gps"].setText(f"GPS位置 \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°")
def update_gps_status(self, namespace, fix_type, satellites_visible):
fix_type_str = {
0: "No Fix",
1: "No Fix",
2: "2D Fix",
3: "3D Fix",
4: "RTK Float",
5: "RTK Fix"
}.get(fix_type, "Unknown")
self.uav_labels[namespace]["fix"].setText(f"Fix Type{fix_type_str}")
self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}")
def update_local_position(self, namespace, x, y, z):
self.uav_labels[namespace]["local_position"].setText(f"Local({x:.2f}, {y:.2f}, {z:.2f})")
def update_imu(self, namespace, pitch):
self.uav_labels[namespace]["pitch"].setText(f"Pitch{pitch:.2f}°")
def update_hdg(self, namespace, heading):
self.uav_labels[namespace]["heading"].setText(f"Heading{heading:.2f}°")
def update_groundspeed(self, namespace, groundspeed):
self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s")
def update_ping(self, namespace, ping):
self.uav_labels[namespace]["ping"].setText(f"Ping{ping:.2f} ms")
def update_loss(self, namespace, loss):
self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%")
def update_frequency(self, namespace, frequency):
self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz")
def update_kbps(self, namespace, kbps):
self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s")
def update_param(self, namespace, value):
self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}")
def change_mode(self):
try:
selected_mode = self.modeComboBox.currentText()
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_mode(namespace, selected_mode)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}")
def arm_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.arm(namespace, True)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}")
def takeoff_drone(self):
try:
z_text = self.positionZ.text().strip()
z = float(z_text) if z_text else 5.0
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.takeoff(namespace, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def land_drone(self):
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.land(namespace)
def set_local_position(self):
try:
x = float(self.positionX.text().strip())
y = float(self.positionY.text().strip())
z = float(self.positionZ.text().strip())
h = 2
for i, namespace in enumerate(self.namespaces):
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_local_position(namespace, x, y, z + h * i)
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def reboot_drone(self):
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.reboot_drone(namespace)
except Exception as e:
QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}")
def set_SR1_EXTRA1(self):
param_value = self.paramInput.text().strip()
try:
for namespace in self.namespaces:
if self.uav_checkboxes[namespace].isChecked():
self.workers.set_param(namespace, "SR1_EXTRA1", param_value)
self.workers.request_param(namespace, "SR1_EXTRA1")
except ValueError:
QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!")
def main():
app = QtWidgets.QApplication(sys.argv)
window = DroneControlApp()
window.show()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Loading…
Cancel
Save