From e4b658d578fe24f62c0df0c0d98270039d734187 Mon Sep 17 00:00:00 2001 From: ken910606 Date: Thu, 8 Jan 2026 17:14:12 +0800 Subject: [PATCH] Delete 'src/unitdev01/unitdev01/xbee.py' --- src/unitdev01/unitdev01/xbee.py | 619 -------------------------------- 1 file changed, 619 deletions(-) delete mode 100644 src/unitdev01/unitdev01/xbee.py diff --git a/src/unitdev01/unitdev01/xbee.py b/src/unitdev01/unitdev01/xbee.py deleted file mode 100644 index b550e1e..0000000 --- a/src/unitdev01/unitdev01/xbee.py +++ /dev/null @@ -1,619 +0,0 @@ -import sys -import time -import math -import serial -import struct -from PyQt5 import QtWidgets -from PyQt5.QtCore import QThread, pyqtSignal -from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox -from pymavlink import mavutil -from pymavlink.dialects.v20 import ardupilotmega as mavlink2 - -class PacketCapture: - def __init__(self): - self.data = bytearray() - - def write(self, b): - self.data.extend(b) - return len(b) - -class MAVLinkWorker(QThread): - state_signal = pyqtSignal(str, str) - battery_signal = pyqtSignal(str, float) - gps_signal = pyqtSignal(str, float, float) - gps_status_signal = pyqtSignal(str, int, int) - local_position_signal = pyqtSignal(str, float, float, float) - imu_signal = pyqtSignal(str, float) - hdg_signal = pyqtSignal(str, float) - groundspeed_signal = pyqtSignal(str, float) - ping_signal = pyqtSignal(str, float) - loss_signal = pyqtSignal(str, float) - frequency_signal = pyqtSignal(str, float) - param_signal = pyqtSignal(str, int) - kbps_signal = pyqtSignal(str, float) - - def __init__(self): - super().__init__() - self.namespaces = ['UAV1', 'UAV2', 'UAV3'] - self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600) - self.connection.wait_heartbeat() - for sysid in self.namespaces: - sysid = int(sysid.replace('UAV', '')) - self.set_sr_params(sysid) - self.running = True - - # 用於計算頻率丟包 - self.message_count = {} - self.frequency = {} - self.start_time = {} - self.seq_numbers = {} - self.packet_loss_count = {} - self.total_packet_count = {} - self.loss_percentage = {} - self.total_bytes_received = {} - self.throughput_KBps = {} - - for namespace in self.namespaces: - self.request_param(namespace, "SR1_EXTRA1") - self.connection.mav.timesync_send( - 0, #tc1 - int(time.time() * 1e9) #ts1 - ) - - def run(self): - while self.running: - try: - msg = self.connection.recv_msg() - current_time = time.time() - if not msg: - continue - sysid = msg.get_srcSystem() - if sysid == 0: - continue - namespace = f"UAV{sysid}" - msg_type = msg.get_type() - if msg_type =="BAD_DATA": - continue - - if sysid not in self.total_bytes_received: - self.total_bytes_received[sysid] = 0 - self.throughput_KBps[sysid] = 0 - - # 計算訊息大小 - msg_bytes = msg.get_msgbuf() # 取得封包的 bytes - if msg_bytes: - self.total_bytes_received[sysid] += len(msg_bytes) - - # Packet loss calculation - if sysid not in self.seq_numbers: - self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking - self.packet_loss_count[sysid] = 0 - self.total_packet_count[sysid] = 1 - else: - current_seq = msg.get_seq() - expected_seq = (self.seq_numbers[sysid] + 1) % 256 - self.total_packet_count[sysid] += 1 - - if current_seq != expected_seq: # Packet(s) lost - lost_packets = (current_seq - expected_seq) % 256 - self.packet_loss_count[sysid] += lost_packets - self.total_packet_count[sysid] += lost_packets - - self.seq_numbers[sysid] = current_seq - self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100 - self.loss_signal.emit(namespace, self.loss_percentage[sysid]) - - # Frequency calculation - if sysid not in self.message_count: - self.message_count[sysid] = 0 - self.start_time[sysid] = current_time - - self.message_count[sysid] += 1 - - # 每隔一段時間更新 - elapsed_time = current_time - self.start_time[sysid] - if elapsed_time >= 1: - # 每秒頻率 - self.frequency[sysid] = self.message_count[sysid] / elapsed_time - self.frequency_signal.emit(namespace, self.frequency[sysid]) - - # 發送 timesync request - self.connection.mav.timesync_send( - 0, #tc1 - int(current_time * 1e9) #ts1 - ) - #吞吐量 - self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time - self.kbps_signal.emit(namespace, self.throughput_KBps[sysid]) - - self.start_time[sysid] = current_time - self.message_count[sysid] = 0 - self.total_bytes_received[sysid] = 0 - - if msg_type == "HEARTBEAT": - mode = mavutil.mode_string_v10(msg) - self.state_signal.emit(namespace, mode) - - elif msg_type == "BATTERY_STATUS": - voltage = msg.voltages[0] / 1000 - self.battery_signal.emit(namespace, voltage) - - elif msg_type == "GLOBAL_POSITION_INT": - latitude = msg.lat / 1e7 - longitude = msg.lon / 1e7 - self.gps_signal.emit(namespace, latitude, longitude) - - elif msg_type == "GPS_RAW_INT": - fix_type = msg.fix_type - satellites_visible = msg.satellites_visible - self.gps_status_signal.emit(namespace, fix_type, satellites_visible) - - elif msg_type == "LOCAL_POSITION_NED": - x = msg.y - y = msg.x - z = -msg.z - self.local_position_signal.emit(namespace, x, y, z) - - elif msg_type == "ATTITUDE": - pitch = math.degrees(msg.pitch) - self.imu_signal.emit(namespace, pitch) - - elif msg_type == "VFR_HUD": - groundspeed = msg.groundspeed - heading = msg.heading - self.hdg_signal.emit(namespace, heading) - self.groundspeed_signal.emit(namespace, groundspeed) - - elif msg_type == "TIMESYNC": - round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6 - if(round_trip_time<1e6): - self.ping_signal.emit(namespace, round_trip_time) - - elif msg_type == 'PARAM_VALUE': - param_name = "SR1_EXTRA1" - if msg.param_id == param_name: - self.param_signal.emit(namespace, msg.param_value) - - except Exception as e: - print(f"Error reading message: {e}") - - def stop(self): - self.running = False - self.connection.close() - - def build_api_tx_frame(self, data: bytes, frame_id=0x01): - frame_type = 0x10 - dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast - dest_addr16 = b'\xFF\xFE' - broadcast_radius = 0x00 - options = 0x00 - - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) - frame += dest_addr64 + dest_addr16 - frame += struct.pack(">BB", broadcast_radius, options) + data - checksum = 0xFF - (sum(frame) & 0xFF) - return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) - - def send_command(self, msg): - # Create the packet and send via serial port - PORT = "/dev/ttyUSB0" - BAUD = 57600 - ser = serial.Serial(PORT, BAUD) - capture = PacketCapture() - mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1) - mav.version = 2 - mav.send(msg) - api_frame = self.build_api_tx_frame(capture.data) - ser.write(api_frame) - print("RAW HEX:", capture.data.hex()) - - def set_mode(self, namespace, mode): - sysid = int(namespace.replace('UAV', '')) - if mode == 'STABILIZE': - mode = 0 - elif mode == 'AUTO': - mode = 3 - elif mode == 'GUIDED': - mode = 4 - elif mode == 'LOITER': - mode = 5 - - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_DO_SET_MODE, - confirmation=0, - param1=1, # Custom mode enabled - param2=mode, - param3=0, param4=0, param5=0, param6=0, param7=0 - ) - self.send_command(msg) - - def arm(self, namespace, arm): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, - confirmation=0, - param1=1 if arm else 0, # 1 to arm, 0 to disarm - param2=0, param3=0, param4=0, param5=0, param6=0, param7=0 - ) - self.send_command(msg) - - def takeoff(self, namespace, altitude): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, - confirmation=0, - param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, - param7=altitude - ) - self.send_command(msg) - - def land(self, namespace): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.command_long_encode( - target_system=sysid, - target_component=1, - command=mavutil.mavlink.MAV_CMD_NAV_LAND, - confirmation=0, - param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, - param7=0 - ) - self.send_command(msg) - - def set_local_position(self, namespace, x, y, z): - sysid = int(namespace.replace('UAV', '')) - msg = self.connection.mav.set_position_target_local_ned_encode( - 0, sysid, 1, # time_boot_ms, sysid, compid - mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame - 0b110111111000, # Mask - y, x, -z, # Position - 0, 0, 0, # Velocity - 0, 0, 0, # Acceleration - 0, 0 # Yaw, yaw_rate - ) - self.send_command(msg) - - def reboot_drone(self, namespace): - sysid = int(namespace.replace('UAV', '')) - self.connection.mav.command_long_send( - sysid, - 1, # target_component (一般為1) - mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令 - 0, # confirmation - 1, # 第一個參數 (1 = reboot,0 = 無操作,2 = 關機) - 0, 0, 0, 0, 0, 0 # 其餘參數保留 - ) - - def set_param(self, namespace, param_name, value): - sysid = int(namespace.replace('UAV', '')) - try: - self.connection.mav.param_set_send( - sysid, - 1, - param_name.encode('utf-8'), - float(value), - mavutil.mavlink.MAV_PARAM_TYPE_REAL32 - ) - except Exception as e: - print(f"Failed to set parameter {param_name}: {e}") - - def request_param(self, namespace, param_name): - sysid = int(namespace.replace('UAV', '')) - try: - self.connection.mav.param_request_read_send( - sysid, # 系統 ID - 1, # 組件 ID - param_name.encode('utf-8'), # 參數名稱 - -1 # 參數索引,-1 表示根據名稱請求 - ) - except Exception as e: - print(f"Failed to set parameter {param_name}: {e}") - - ''' - def set_sr_params(self, sysid): - """ 直接設置 MAVLink 訊息頻率 """ - freqs = [0, 2, 4] - params = { - "SR1_ADSB": freqs[0], - "SR1_EXT_STAT": freqs[1], - "SR1_EXTRA1": freqs[2], - "SR1_EXTRA2": freqs[2], - "SR1_EXTRA3": freqs[1], - "SR1_POSITION": freqs[1], - "SR1_RAW_SENS": freqs[1], - "SR1_RC_CHAN": freqs[1] - } - for param, value in params.items(): - self.set_param(f"UAV{sysid}", param, value) - ''' - def set_sr_params(self, sysid): - """ 直接設置 MAVLink 訊息頻率 """ - freqs = [0, 1, 1] - params = { - "SR1_ADSB": freqs[0], - "SR1_EXT_STAT": freqs[0], - "SR1_EXTRA1": freqs[2], - "SR1_EXTRA2": freqs[2], - "SR1_EXTRA3": freqs[0], - "SR1_POSITION": freqs[1], - "SR1_RAW_SENS": freqs[1], - "SR1_RC_CHAN": freqs[0] - } - for param, value in params.items(): - self.set_param(f"UAV{sysid}", param, value) - -class DroneControlApp(QMainWindow): - def __init__(self): - super().__init__() - self.workers = MAVLinkWorker() - self.namespaces = self.workers.namespaces - self.initUI() - - # Connect signals to update the UI - self.workers.state_signal.connect(self.update_state) - self.workers.battery_signal.connect(self.update_battery) - self.workers.gps_signal.connect(self.update_gps) - self.workers.gps_status_signal.connect(self.update_gps_status) - self.workers.local_position_signal.connect(self.update_local_position) - self.workers.imu_signal.connect(self.update_imu) - self.workers.hdg_signal.connect(self.update_hdg) - self.workers.groundspeed_signal.connect(self.update_groundspeed) - self.workers.ping_signal.connect(self.update_ping) - self.workers.loss_signal.connect(self.update_loss) - self.workers.frequency_signal.connect(self.update_frequency) - self.workers.param_signal.connect(self.update_param) - self.workers.kbps_signal.connect(self.update_kbps) - self.workers.start() - - def initUI(self): - self.setWindowTitle("多無人機控制介面") - self.setGeometry(100, 100, 800, 600) - - # 主佈局 - main_layout = QVBoxLayout() - - # 無人機選擇區域 - uav_layout = QHBoxLayout() - self.uav_checkboxes = {} - for namespace in self.namespaces: - checkbox = QCheckBox(namespace) - checkbox.setChecked(True) # 預設勾選 - self.uav_checkboxes[namespace] = checkbox - uav_layout.addWidget(checkbox) - main_layout.addLayout(uav_layout) - - # 顯示所有無人機資訊,從左到右顯示 - uav_layout = QHBoxLayout() - - # 逐個顯示 UAV 的資訊 - self.uav_labels = {} - for namespace in self.namespaces: - uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列 - - # 狀態顯示 - self.uav_labels[namespace] = { - "status": QLabel("狀態:等待數據..."), - "battery": QLabel("電壓:等待數據..."), - "local_position": QLabel("Local Position:等待數據..."), - "gps": QLabel("GPS位置:等待數據...\n\n"), - "fix": QLabel("Fix Type:等待數據..."), - "satellites": QLabel("衛星數量:等待數據..."), - "groundspeed": QLabel("地面速度:等待數據..."), - "pitch": QLabel("Pitch:等待數據..."), - "heading": QLabel("Heading:等待數據..."), - "ping": QLabel("Ping:等待數據..."), - "loss": QLabel("丟包:等待數據..."), - "frequency": QLabel("頻率:等待數據..."), - "kbps": QLabel("吞吐量:等待數據..."), - "param": QLabel("SR1_EXTRA1:等待數據...") - } - - # 把每個資訊添加到該 UAV 的垂直佈局中 - for label in self.uav_labels[namespace].values(): - uav_info_layout.addWidget(label) - - # 將該 UAV 的資訊佈局加到主佈局中(從左到右排列) - uav_layout.addLayout(uav_info_layout) - - main_layout.addLayout(uav_layout) - - # SR1_EXTRA1參數設置 - param_layout = QHBoxLayout() - self.paramInput = QLineEdit() - self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值") - self.setParamButton = QPushButton("設置SR1_EXTRA1") - self.setParamButton.clicked.connect(self.set_SR1_EXTRA1) - param_layout.addWidget(self.paramInput) - param_layout.addWidget(self.setParamButton) - main_layout.addLayout(param_layout) - - # 模式切換區域 - mode_layout = QHBoxLayout() - self.modeComboBox = QComboBox() - self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"]) - self.modeButton = QPushButton("切換模式") - self.modeButton.clicked.connect(self.change_mode) - mode_layout.addWidget(QLabel("選擇模式:")) - mode_layout.addWidget(self.modeComboBox) - mode_layout.addWidget(self.modeButton) - main_layout.addLayout(mode_layout) - - # 解鎖按鈕 - fly_layout = QHBoxLayout() - self.armButton = QPushButton("解鎖") - self.armButton.clicked.connect(self.arm_drone) - - # 起飛按鈕 - self.takeoffButton = QPushButton("起飛") - self.takeoffButton.clicked.connect(self.takeoff_drone) - - # 降落按鈕 - self.landButton = QPushButton("降落") - self.landButton.clicked.connect(self.land_drone) - fly_layout.addWidget(self.armButton) - fly_layout.addWidget(self.takeoffButton) - fly_layout.addWidget(self.landButton) - main_layout.addLayout(fly_layout) - - # XYZ 設置欄位 - xyz_layout = QHBoxLayout() - self.positionX = QLineEdit() - self.positionX.setPlaceholderText("X") - self.positionY = QLineEdit() - self.positionY.setPlaceholderText("Y") - self.positionZ = QLineEdit() - self.positionZ.setPlaceholderText("Z") - self.setPositionButton = QPushButton("設置位置") - self.setPositionButton.clicked.connect(self.set_local_position) - xyz_layout.addWidget(QLabel("輸入位置:")) - xyz_layout.addWidget(self.positionX) - xyz_layout.addWidget(self.positionY) - xyz_layout.addWidget(self.positionZ) - xyz_layout.addWidget(self.setPositionButton) - main_layout.addLayout(xyz_layout) - - # 設置 XYZ 的按鈕 - - #設置重啟按鈕 - self.rebootButton = QPushButton("重啟") - self.rebootButton.clicked.connect(self.reboot_drone) - main_layout.addWidget(self.rebootButton) - - # 設置主佈局 - central_widget = QWidget(self) - central_widget.setLayout(main_layout) - self.setCentralWidget(central_widget) - self.show() - - def closeEvent(self, event): - try: - self.workers.stop() - finally: - event.accept() - - def update_state(self, namespace, mode): - self.uav_labels[namespace]["status"].setText(f"狀態:{mode}") - - def update_battery(self, namespace, voltage): - self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V") - - def update_gps(self, namespace, latitude, longitude): - self.uav_labels[namespace]["gps"].setText(f"GPS位置: \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°") - - def update_gps_status(self, namespace, fix_type, satellites_visible): - fix_type_str = { - 0: "No Fix", - 1: "No Fix", - 2: "2D Fix", - 3: "3D Fix", - 4: "RTK Float", - 5: "RTK Fix" - }.get(fix_type, "Unknown") - self.uav_labels[namespace]["fix"].setText(f"Fix Type:{fix_type_str}") - self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}") - - def update_local_position(self, namespace, x, y, z): - self.uav_labels[namespace]["local_position"].setText(f"Local:({x:.2f}, {y:.2f}, {z:.2f})") - - def update_imu(self, namespace, pitch): - self.uav_labels[namespace]["pitch"].setText(f"Pitch:{pitch:.2f}°") - - def update_hdg(self, namespace, heading): - self.uav_labels[namespace]["heading"].setText(f"Heading:{heading:.2f}°") - - def update_groundspeed(self, namespace, groundspeed): - self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s") - - def update_ping(self, namespace, ping): - self.uav_labels[namespace]["ping"].setText(f"Ping:{ping:.2f} ms") - - def update_loss(self, namespace, loss): - self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%") - - def update_frequency(self, namespace, frequency): - self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz") - - def update_kbps(self, namespace, kbps): - self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s") - - def update_param(self, namespace, value): - self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}") - - def change_mode(self): - try: - selected_mode = self.modeComboBox.currentText() - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_mode(namespace, selected_mode) - except Exception as e: - QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") - - def arm_drone(self): - try: - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.arm(namespace, True) - except Exception as e: - QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}") - - def takeoff_drone(self): - try: - z_text = self.positionZ.text().strip() - z = float(z_text) if z_text else 5.0 - h = 2 - for i, namespace in enumerate(self.namespaces): - if self.uav_checkboxes[namespace].isChecked(): - self.workers.takeoff(namespace, z + h * i) - - except ValueError: - QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") - - def land_drone(self): - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.land(namespace) - - def set_local_position(self): - try: - x = float(self.positionX.text().strip()) - y = float(self.positionY.text().strip()) - z = float(self.positionZ.text().strip()) - h = 2 - for i, namespace in enumerate(self.namespaces): - if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_local_position(namespace, x, y, z + h * i) - - except ValueError: - QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") - - def reboot_drone(self): - try: - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.reboot_drone(namespace) - except Exception as e: - QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}") - - def set_SR1_EXTRA1(self): - param_value = self.paramInput.text().strip() - try: - for namespace in self.namespaces: - if self.uav_checkboxes[namespace].isChecked(): - self.workers.set_param(namespace, "SR1_EXTRA1", param_value) - self.workers.request_param(namespace, "SR1_EXTRA1") - except ValueError: - QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") - -def main(): - app = QtWidgets.QApplication(sys.argv) - window = DroneControlApp() - window.show() - sys.exit(app.exec_()) - -if __name__ == '__main__': - main() \ No newline at end of file