diff --git a/src/unitdev01/unitdev01/xbee.py b/src/unitdev01/unitdev01/xbee.py new file mode 100644 index 0000000..db51d0f --- /dev/null +++ b/src/unitdev01/unitdev01/xbee.py @@ -0,0 +1,620 @@ +import sys +import time +import math +import serial +import struct +from PyQt5 import QtWidgets +from PyQt5.QtCore import QThread, pyqtSignal +from PyQt5.QtWidgets import QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox, QWidget, QMainWindow, QComboBox +from pymavlink import mavutil +from pymavlink.dialects.v20 import ardupilotmega as mavlink2 + +class PacketCapture: + def __init__(self): + self.data = bytearray() + + def write(self, b): + self.data.extend(b) + return len(b) + +class MAVLinkWorker(QThread): + state_signal = pyqtSignal(str, str) + battery_signal = pyqtSignal(str, float) + gps_signal = pyqtSignal(str, float, float) + gps_status_signal = pyqtSignal(str, int, int) + local_position_signal = pyqtSignal(str, float, float, float) + imu_signal = pyqtSignal(str, float) + hdg_signal = pyqtSignal(str, float) + groundspeed_signal = pyqtSignal(str, float) + ping_signal = pyqtSignal(str, float) + loss_signal = pyqtSignal(str, float) + frequency_signal = pyqtSignal(str, float) + param_signal = pyqtSignal(str, int) + kbps_signal = pyqtSignal(str, float) + + def __init__(self): + super().__init__() + self.namespaces = ['UAV1', 'UAV2', 'UAV3'] + self.connection = mavutil.mavlink_connection('/dev/ttyUSB0', baud=57600) + self.connection.wait_heartbeat() + for sysid in self.namespaces: + sysid = int(sysid.replace('UAV', '')) + self.set_sr_params(sysid) + self.running = True + + # 用於計算頻率丟包 + self.message_count = {} + self.frequency = {} + self.start_time = {} + self.seq_numbers = {} + self.packet_loss_count = {} + self.total_packet_count = {} + self.loss_percentage = {} + self.total_bytes_received = {} + self.throughput_KBps = {} + + for namespace in self.namespaces: + self.request_param(namespace, "SR1_EXTRA1") + self.connection.mav.timesync_send( + 0, #tc1 + int(time.time() * 1e9) #ts1 + ) + + def run(self): + while self.running: + try: + msg = self.connection.recv_msg() + current_time = time.time() + if not msg: + continue + sysid = msg.get_srcSystem() + if sysid == 0: + continue + namespace = f"UAV{sysid}" + msg_type = msg.get_type() + if msg_type =="BAD_DATA": + continue + + + if sysid not in self.total_bytes_received: + self.total_bytes_received[sysid] = 0 + self.throughput_KBps[sysid] = 0 + + # 計算訊息大小 + msg_bytes = msg.get_msgbuf() # 取得封包的 bytes + if msg_bytes: + self.total_bytes_received[sysid] += len(msg_bytes) + + # Packet loss calculation + if sysid not in self.seq_numbers: + self.seq_numbers[sysid] = msg.get_seq() # Initialize sequence number tracking + self.packet_loss_count[sysid] = 0 + self.total_packet_count[sysid] = 1 + else: + current_seq = msg.get_seq() + expected_seq = (self.seq_numbers[sysid] + 1) % 256 + self.total_packet_count[sysid] += 1 + + if current_seq != expected_seq: # Packet(s) lost + lost_packets = (current_seq - expected_seq) % 256 + self.packet_loss_count[sysid] += lost_packets + self.total_packet_count[sysid] += lost_packets + + self.seq_numbers[sysid] = current_seq + self.loss_percentage[sysid] = (self.packet_loss_count[sysid] / self.total_packet_count[sysid]) * 100 + self.loss_signal.emit(namespace, self.loss_percentage[sysid]) + + # Frequency calculation + if sysid not in self.message_count: + self.message_count[sysid] = 0 + self.start_time[sysid] = current_time + + self.message_count[sysid] += 1 + + # 每隔一段時間更新 + elapsed_time = current_time - self.start_time[sysid] + if elapsed_time >= 1: + # 每秒頻率 + self.frequency[sysid] = self.message_count[sysid] / elapsed_time + self.frequency_signal.emit(namespace, self.frequency[sysid]) + + # 發送 timesync request + self.connection.mav.timesync_send( + 0, #tc1 + int(current_time * 1e9) #ts1 + ) + #吞吐量 + self.throughput_KBps[sysid] = (self.total_bytes_received[sysid] / (1024)) / elapsed_time + self.kbps_signal.emit(namespace, self.throughput_KBps[sysid]) + + self.start_time[sysid] = current_time + self.message_count[sysid] = 0 + self.total_bytes_received[sysid] = 0 + + if msg_type == "HEARTBEAT": + mode = mavutil.mode_string_v10(msg) + self.state_signal.emit(namespace, mode) + + elif msg_type == "BATTERY_STATUS": + voltage = msg.voltages[0] / 1000 + self.battery_signal.emit(namespace, voltage) + + elif msg_type == "GLOBAL_POSITION_INT": + latitude = msg.lat / 1e7 + longitude = msg.lon / 1e7 + self.gps_signal.emit(namespace, latitude, longitude) + + elif msg_type == "GPS_RAW_INT": + fix_type = msg.fix_type + satellites_visible = msg.satellites_visible + self.gps_status_signal.emit(namespace, fix_type, satellites_visible) + + elif msg_type == "LOCAL_POSITION_NED": + x = msg.y + y = msg.x + z = -msg.z + self.local_position_signal.emit(namespace, x, y, z) + + elif msg_type == "ATTITUDE": + pitch = math.degrees(msg.pitch) + self.imu_signal.emit(namespace, pitch) + + elif msg_type == "VFR_HUD": + groundspeed = msg.groundspeed + heading = msg.heading + self.hdg_signal.emit(namespace, heading) + self.groundspeed_signal.emit(namespace, groundspeed) + + elif msg_type == "TIMESYNC": + round_trip_time = (int(current_time * 1e9) - msg.ts1) / 1e6 + if(round_trip_time<1e6): + self.ping_signal.emit(namespace, round_trip_time) + + elif msg_type == 'PARAM_VALUE': + param_name = "SR1_EXTRA1" + if msg.param_id == param_name: + self.param_signal.emit(namespace, msg.param_value) + + except Exception as e: + print(f"Error reading message: {e}") + + def stop(self): + self.running = False + self.connection.close() + + def build_api_tx_frame(self, data: bytes, frame_id=0x01): + frame_type = 0x10 + dest_addr64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # Broadcast + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + + def send_command(self, msg): + # Create the packet and send via serial port + PORT = "/dev/ttyUSB0" + BAUD = 57600 + ser = serial.Serial(PORT, BAUD) + capture = PacketCapture() + mav = mavlink2.MAVLink(capture, srcSystem=1, srcComponent=1) + mav.version = 2 + mav.send(msg) + api_frame = self.build_api_tx_frame(capture.data) + ser.write(api_frame) + print("RAW HEX:", capture.data.hex()) + + def set_mode(self, namespace, mode): + sysid = int(namespace.replace('UAV', '')) + if mode == 'STABILIZE': + mode = 0 + elif mode == 'AUTO': + mode = 3 + elif mode == 'GUIDED': + mode = 4 + elif mode == 'LOITER': + mode = 5 + + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_DO_SET_MODE, + confirmation=0, + param1=1, # Custom mode enabled + param2=mode, + param3=0, param4=0, param5=0, param6=0, param7=0 + ) + self.send_command(msg) + + def arm(self, namespace, arm): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, + confirmation=0, + param1=1 if arm else 0, # 1 to arm, 0 to disarm + param2=0, param3=0, param4=0, param5=0, param6=0, param7=0 + ) + self.send_command(msg) + + def takeoff(self, namespace, altitude): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, + confirmation=0, + param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, + param7=altitude + ) + self.send_command(msg) + + def land(self, namespace): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.command_long_encode( + target_system=sysid, + target_component=1, + command=mavutil.mavlink.MAV_CMD_NAV_LAND, + confirmation=0, + param1=0, param2=0, param3=0, param4=0, param5=0, param6=0, + param7=0 + ) + self.send_command(msg) + + def set_local_position(self, namespace, x, y, z): + sysid = int(namespace.replace('UAV', '')) + msg = self.connection.mav.set_position_target_local_ned_encode( + 0, sysid, 1, # time_boot_ms, sysid, compid + mavutil.mavlink.MAV_FRAME_LOCAL_NED, # Frame + 0b110111111000, # Mask + y, x, -z, # Position + 0, 0, 0, # Velocity + 0, 0, 0, # Acceleration + 0, 0 # Yaw, yaw_rate + ) + self.send_command(msg) + + def reboot_drone(self, namespace): + sysid = int(namespace.replace('UAV', '')) + self.connection.mav.command_long_send( + sysid, + 1, # target_component (一般為1) + mavutil.mavlink.MAV_CMD_PREFLIGHT_REBOOT_SHUTDOWN, # Reboot 命令 + 0, # confirmation + 1, # 第一個參數 (1 = reboot,0 = 無操作,2 = 關機) + 0, 0, 0, 0, 0, 0 # 其餘參數保留 + ) + + def set_param(self, namespace, param_name, value): + sysid = int(namespace.replace('UAV', '')) + try: + self.connection.mav.param_set_send( + sysid, + 1, + param_name.encode('utf-8'), + float(value), + mavutil.mavlink.MAV_PARAM_TYPE_REAL32 + ) + except Exception as e: + print(f"Failed to set parameter {param_name}: {e}") + + def request_param(self, namespace, param_name): + sysid = int(namespace.replace('UAV', '')) + try: + self.connection.mav.param_request_read_send( + sysid, # 系統 ID + 1, # 組件 ID + param_name.encode('utf-8'), # 參數名稱 + -1 # 參數索引,-1 表示根據名稱請求 + ) + except Exception as e: + print(f"Failed to set parameter {param_name}: {e}") + + ''' + def set_sr_params(self, sysid): + """ 直接設置 MAVLink 訊息頻率 """ + freqs = [0, 2, 4] + params = { + "SR1_ADSB": freqs[0], + "SR1_EXT_STAT": freqs[1], + "SR1_EXTRA1": freqs[2], + "SR1_EXTRA2": freqs[2], + "SR1_EXTRA3": freqs[1], + "SR1_POSITION": freqs[1], + "SR1_RAW_SENS": freqs[1], + "SR1_RC_CHAN": freqs[1] + } + for param, value in params.items(): + self.set_param(f"UAV{sysid}", param, value) + ''' + def set_sr_params(self, sysid): + """ 直接設置 MAVLink 訊息頻率 """ + freqs = [0, 1, 1] + params = { + "SR1_ADSB": freqs[0], + "SR1_EXT_STAT": freqs[0], + "SR1_EXTRA1": freqs[2], + "SR1_EXTRA2": freqs[2], + "SR1_EXTRA3": freqs[0], + "SR1_POSITION": freqs[1], + "SR1_RAW_SENS": freqs[1], + "SR1_RC_CHAN": freqs[0] + } + for param, value in params.items(): + self.set_param(f"UAV{sysid}", param, value) + +class DroneControlApp(QMainWindow): + def __init__(self): + super().__init__() + self.workers = MAVLinkWorker() + self.namespaces = self.workers.namespaces + self.initUI() + + # Connect signals to update the UI + self.workers.state_signal.connect(self.update_state) + self.workers.battery_signal.connect(self.update_battery) + self.workers.gps_signal.connect(self.update_gps) + self.workers.gps_status_signal.connect(self.update_gps_status) + self.workers.local_position_signal.connect(self.update_local_position) + self.workers.imu_signal.connect(self.update_imu) + self.workers.hdg_signal.connect(self.update_hdg) + self.workers.groundspeed_signal.connect(self.update_groundspeed) + self.workers.ping_signal.connect(self.update_ping) + self.workers.loss_signal.connect(self.update_loss) + self.workers.frequency_signal.connect(self.update_frequency) + self.workers.param_signal.connect(self.update_param) + self.workers.kbps_signal.connect(self.update_kbps) + self.workers.start() + + def initUI(self): + self.setWindowTitle("多無人機控制介面") + self.setGeometry(100, 100, 800, 600) + + # 主佈局 + main_layout = QVBoxLayout() + + # 無人機選擇區域 + uav_layout = QHBoxLayout() + self.uav_checkboxes = {} + for namespace in self.namespaces: + checkbox = QCheckBox(namespace) + checkbox.setChecked(True) # 預設勾選 + self.uav_checkboxes[namespace] = checkbox + uav_layout.addWidget(checkbox) + main_layout.addLayout(uav_layout) + + # 顯示所有無人機資訊,從左到右顯示 + uav_layout = QHBoxLayout() + + # 逐個顯示 UAV 的資訊 + self.uav_labels = {} + for namespace in self.namespaces: + uav_info_layout = QVBoxLayout() # 每台 UAV 的資訊垂直排列 + + # 狀態顯示 + self.uav_labels[namespace] = { + "status": QLabel("狀態:等待數據..."), + "battery": QLabel("電壓:等待數據..."), + "local_position": QLabel("Local Position:等待數據..."), + "gps": QLabel("GPS位置:等待數據...\n\n"), + "fix": QLabel("Fix Type:等待數據..."), + "satellites": QLabel("衛星數量:等待數據..."), + "groundspeed": QLabel("地面速度:等待數據..."), + "pitch": QLabel("Pitch:等待數據..."), + "heading": QLabel("Heading:等待數據..."), + "ping": QLabel("Ping:等待數據..."), + "loss": QLabel("丟包:等待數據..."), + "frequency": QLabel("頻率:等待數據..."), + "kbps": QLabel("吞吐量:等待數據..."), + "param": QLabel("SR1_EXTRA1:等待數據...") + } + + # 把每個資訊添加到該 UAV 的垂直佈局中 + for label in self.uav_labels[namespace].values(): + uav_info_layout.addWidget(label) + + # 將該 UAV 的資訊佈局加到主佈局中(從左到右排列) + uav_layout.addLayout(uav_info_layout) + + main_layout.addLayout(uav_layout) + + # SR1_EXTRA1參數設置 + param_layout = QHBoxLayout() + self.paramInput = QLineEdit() + self.paramInput.setPlaceholderText("輸入SR1_EXTRA1的新值") + self.setParamButton = QPushButton("設置SR1_EXTRA1") + self.setParamButton.clicked.connect(self.set_SR1_EXTRA1) + param_layout.addWidget(self.paramInput) + param_layout.addWidget(self.setParamButton) + main_layout.addLayout(param_layout) + + # 模式切換區域 + mode_layout = QHBoxLayout() + self.modeComboBox = QComboBox() + self.modeComboBox.addItems(["GUIDED", "STABILIZE", "AUTO", "LOITER"]) + self.modeButton = QPushButton("切換模式") + self.modeButton.clicked.connect(self.change_mode) + mode_layout.addWidget(QLabel("選擇模式:")) + mode_layout.addWidget(self.modeComboBox) + mode_layout.addWidget(self.modeButton) + main_layout.addLayout(mode_layout) + + # 解鎖按鈕 + fly_layout = QHBoxLayout() + self.armButton = QPushButton("解鎖") + self.armButton.clicked.connect(self.arm_drone) + + # 起飛按鈕 + self.takeoffButton = QPushButton("起飛") + self.takeoffButton.clicked.connect(self.takeoff_drone) + + # 降落按鈕 + self.landButton = QPushButton("降落") + self.landButton.clicked.connect(self.land_drone) + fly_layout.addWidget(self.armButton) + fly_layout.addWidget(self.takeoffButton) + fly_layout.addWidget(self.landButton) + main_layout.addLayout(fly_layout) + + # XYZ 設置欄位 + xyz_layout = QHBoxLayout() + self.positionX = QLineEdit() + self.positionX.setPlaceholderText("X") + self.positionY = QLineEdit() + self.positionY.setPlaceholderText("Y") + self.positionZ = QLineEdit() + self.positionZ.setPlaceholderText("Z") + self.setPositionButton = QPushButton("設置位置") + self.setPositionButton.clicked.connect(self.set_local_position) + xyz_layout.addWidget(QLabel("輸入位置:")) + xyz_layout.addWidget(self.positionX) + xyz_layout.addWidget(self.positionY) + xyz_layout.addWidget(self.positionZ) + xyz_layout.addWidget(self.setPositionButton) + main_layout.addLayout(xyz_layout) + + # 設置 XYZ 的按鈕 + + #設置重啟按鈕 + self.rebootButton = QPushButton("重啟") + self.rebootButton.clicked.connect(self.reboot_drone) + main_layout.addWidget(self.rebootButton) + + # 設置主佈局 + central_widget = QWidget(self) + central_widget.setLayout(main_layout) + self.setCentralWidget(central_widget) + self.show() + + def closeEvent(self, event): + try: + self.workers.stop() + finally: + event.accept() + + def update_state(self, namespace, mode): + self.uav_labels[namespace]["status"].setText(f"狀態:{mode}") + + def update_battery(self, namespace, voltage): + self.uav_labels[namespace]["battery"].setText(f"電壓:{voltage:.2f} V") + + def update_gps(self, namespace, latitude, longitude): + self.uav_labels[namespace]["gps"].setText(f"GPS位置: \n Lat:{latitude:.6f}° \n Lon:{longitude:.6f}°") + + def update_gps_status(self, namespace, fix_type, satellites_visible): + fix_type_str = { + 0: "No Fix", + 1: "No Fix", + 2: "2D Fix", + 3: "3D Fix", + 4: "RTK Float", + 5: "RTK Fix" + }.get(fix_type, "Unknown") + self.uav_labels[namespace]["fix"].setText(f"Fix Type:{fix_type_str}") + self.uav_labels[namespace]["satellites"].setText(f"衛星數量:{satellites_visible}") + + def update_local_position(self, namespace, x, y, z): + self.uav_labels[namespace]["local_position"].setText(f"Local:({x:.2f}, {y:.2f}, {z:.2f})") + + def update_imu(self, namespace, pitch): + self.uav_labels[namespace]["pitch"].setText(f"Pitch:{pitch:.2f}°") + + def update_hdg(self, namespace, heading): + self.uav_labels[namespace]["heading"].setText(f"Heading:{heading:.2f}°") + + def update_groundspeed(self, namespace, groundspeed): + self.uav_labels[namespace]["groundspeed"].setText(f"地面速度:{groundspeed:.2f} m/s") + + def update_ping(self, namespace, ping): + self.uav_labels[namespace]["ping"].setText(f"Ping:{ping:.2f} ms") + + def update_loss(self, namespace, loss): + self.uav_labels[namespace]["loss"].setText(f"丟包:{loss:.2f}%") + + def update_frequency(self, namespace, frequency): + self.uav_labels[namespace]["frequency"].setText(f"頻率:{frequency:.2f} Hz") + + def update_kbps(self, namespace, kbps): + self.uav_labels[namespace]["kbps"].setText(f"吞吐量:{kbps:.2f} KB/s") + + def update_param(self, namespace, value): + self.uav_labels[namespace]["param"].setText(f"SR1_EXTRA1:{value}") + + def change_mode(self): + try: + selected_mode = self.modeComboBox.currentText() + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_mode(namespace, selected_mode) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"模式切換失敗:{str(e)}") + + def arm_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.arm(namespace, True) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"解鎖失敗:{e}") + + def takeoff_drone(self): + try: + z_text = self.positionZ.text().strip() + z = float(z_text) if z_text else 5.0 + h = 2 + for i, namespace in enumerate(self.namespaces): + if self.uav_checkboxes[namespace].isChecked(): + self.workers.takeoff(namespace, z + h * i) + + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def land_drone(self): + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.land(namespace) + + def set_local_position(self): + try: + x = float(self.positionX.text().strip()) + y = float(self.positionY.text().strip()) + z = float(self.positionZ.text().strip()) + h = 2 + for i, namespace in enumerate(self.namespaces): + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_local_position(namespace, x, y, z + h * i) + + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + + def reboot_drone(self): + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.reboot_drone(namespace) + except Exception as e: + QtWidgets.QMessageBox.warning(self, "錯誤", f"重啟失敗:{e}") + + def set_SR1_EXTRA1(self): + param_value = self.paramInput.text().strip() + try: + for namespace in self.namespaces: + if self.uav_checkboxes[namespace].isChecked(): + self.workers.set_param(namespace, "SR1_EXTRA1", param_value) + self.workers.request_param(namespace, "SR1_EXTRA1") + except ValueError: + QtWidgets.QMessageBox.warning(self, "錯誤", "請輸入有效的數值!") + +def main(): + app = QtWidgets.QApplication(sys.argv) + window = DroneControlApp() + window.show() + sys.exit(app.exec_()) + +if __name__ == '__main__': + main() \ No newline at end of file