diff --git a/src/unitdev03/gazebo_native_manager.py b/src/unitdev03/gazebo_native_manager.py new file mode 100644 index 0000000..8699188 --- /dev/null +++ b/src/unitdev03/gazebo_native_manager.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +import subprocess +import argparse +import time +import psutil +import h5py +import numpy as np + +class GazeboModelManager: + def __init__(self): + self.base_cmd = "gz service -s /world/map" + self.timeout = 5000 + + def add_iris(self, number, x_pos, y_pos, yaw_angle): + cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "model://iris{number}{x_pos} {y_pos} 0.195 0 0 {yaw_angle}"' ''' + + try: + print(f"正在新增 iris{number}...") + print(f"位置: x={x_pos}, y={y_pos}, yaw={yaw_angle}度") + subprocess.run(cmd, shell=True, check=True) + print(f"成功新增 iris{number}") + + sitl_instance = number - 1 + port = 14550 + sitl_instance + + sitl_cmd = f"sim_vehicle.py -v ArduCopter -f gazebo-iris{sitl_instance} -L NCHU --model JSON --map --console -I{sitl_instance} --out=127.0.0.1:{port}" + terminal_cmd = f'gnome-terminal -- bash -c "{sitl_cmd}; exec bash"' + print(f"正在啟動 SITL...") + subprocess.Popen(terminal_cmd, shell=True) + + return True + except subprocess.CalledProcessError as e: + print(f"新增失敗: {e}") + return False + + def cleanup_sitl(self, number): + instance_number = number - 1 + print(f"正在清理 SITL 進程 {number}...") + + cleanup_commands = [ + f"pkill -f 'sim_vehicle.py.*-I{instance_number}'", + f"pkill -f 'arducopter.*-I{instance_number}'", + "pkill -f 'mavproxy.py.*map'", + "pkill -f 'mavproxy.py.*console'", + f"pkill -f 'mavproxy.*{instance_number}'", + "pkill -f 'python.*map.py'", + "pkill -f 'MAVProxy.py.*map'" + ] + + for cmd in cleanup_commands: + try: + subprocess.run(cmd, shell=True) + except subprocess.CalledProcessError: + pass + + time.sleep(2) + print(f"SITL 進程清理完成") + + def remove_iris(self, number): + cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "iris{number}"\'' + + try: + print(f"正在移除 iris{number}...") + subprocess.run(cmd, shell=True, check=True) + print(f"成功移除 iris{number}") + return True + except subprocess.CalledProcessError as e: + print(f"移除失敗: {e}") + return False + + def add_obstacle(self, number, x_pos, y_pos, radius=0.5, height=2.0): + cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "true{x_pos} {y_pos} {height/2} 0 0 0{radius}{height}{radius}{height}0.3 0.3 0.3 10.7 0.7 0.7 1"' ''' + + try: + print(f"正在新增障礙物 {number}...") + print(f"位置: x={x_pos}, y={y_pos}, radius={radius}, height={height}") + subprocess.run(cmd, shell=True, check=True) + return True + except subprocess.CalledProcessError as e: + print(f"新增失敗: {e}") + return False + + def remove_obstacle(self, number): + cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "obstacle_{number}"\'' + + try: + print(f"正在移除障礙物 {number}...") + subprocess.run(cmd, shell=True, check=True) + print(f"成功移除障礙物 {number}") + return True + except subprocess.CalledProcessError as e: + print(f"移除失敗: {e}") + return False + + def add_obstacles_from_mat(self, mat_file): + """從 .mat 檔案逐一生成障礙物""" + try: + with h5py.File(mat_file, 'r') as f: + x_data = f['#refs#/d'][()] + y_data = f['#refs#/e'][()] + z_data = f['#refs#/f'][()] + radius_data = f['#refs#/g'][()] + height_data = f['#refs#/h'][()] + + data_matrix = np.vstack((x_data, y_data, z_data, radius_data, height_data)).T + + for i, obstacle in enumerate(data_matrix): + x, y, _, radius, height = obstacle + success = self.add_obstacle(i+1, float(x), float(y), float(radius), float(height)) + if not success: + print(f"警告:障礙物 {i+1} 新增失敗") + time.sleep(0.1) + + print("所有障礙物新增完成") + return True + + except Exception as e: + print(f"錯誤:讀取或處理 .mat 檔案時發生問題: {e}") + return False + +def main(): + parser = argparse.ArgumentParser(description='Gazebo Iris and Obstacle Manager') + parser.add_argument('action', choices=['add', 'remove', 'cleanup', 'add_from_mat'], + help='Action to perform (add, remove, cleanup, or add_from_mat)') + parser.add_argument('type', choices=['iris', 'obstacle'], + help='Type of object to manage (iris or obstacle)') + parser.add_argument('number', type=int, nargs='?', + help='Object number (e.g., 4 for iris4 or obstacle_4)') + parser.add_argument('--x-pos', type=float, default=15.0, + help='X position for new object') + parser.add_argument('--y-pos', type=float, default=0.0, + help='Y position for new object') + parser.add_argument('--yaw', type=float, default=90.0, + help='Yaw angle in degrees for iris') + parser.add_argument('--radius', type=float, default=0.5, + help='Radius for obstacle cylinder') + parser.add_argument('--height', type=float, default=2.0, + help='Height for obstacle cylinder') + parser.add_argument('--mat-file', type=str, + help='Path to .mat file containing obstacles data') + + args = parser.parse_args() + manager = GazeboModelManager() + + if args.action == 'add_from_mat': + if args.type == 'obstacle' and args.mat_file: + manager.add_obstacles_from_mat(args.mat_file) + else: + print("錯誤:使用 add_from_mat 時需指定 --mat-file 參數") + elif args.action == 'add': + if args.number is None: + print("錯誤:使用 add 時需指定 number 參數") + return + if args.type == 'iris': + manager.add_iris(args.number, args.x_pos, args.y_pos, args.yaw) + else: # obstacle + manager.add_obstacle(args.number, args.x_pos, args.y_pos, args.radius, args.height) + elif args.action == 'cleanup': + if args.number is None: + print("錯誤:使用 cleanup 時需指定 number 參數") + return + if args.type == 'iris': + manager.cleanup_sitl(args.number) + else: # remove + if args.number is None: + print("錯誤:使用 remove 時需指定 number 參數") + return + if args.type == 'iris': + manager.remove_iris(args.number) + else: + manager.remove_obstacle(args.number) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/unitdev03/pymavlink_01.py b/src/unitdev03/pymavlink_01.py new file mode 100644 index 0000000..dc8ebb1 --- /dev/null +++ b/src/unitdev03/pymavlink_01.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python3 +from pymavlink import mavutil +import time +import math +import threading + +class DroneController: + def __init__(self, connection_string="udp:127.0.0.1:14550"): + # 建立 MAVLink 連接 + self.master = mavutil.mavlink_connection(connection_string) + + # 等待心跳包 + self.master.wait_heartbeat() + print("已建立連接!") + + def set_mode(self, mode): + """設置飛行模式""" + mode_id = self.master.mode_mapping()[mode] + self.master.mav.set_mode_send( + self.master.target_system, + mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, + mode_id + ) + + for _ in range(5): + msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1) + if msg and msg.custom_mode == mode_id: + return True + return False + + def arm(self, arm_state=True): + """解鎖/上鎖無人機""" + self.master.mav.command_long_send( + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM, + 0, + 1 if arm_state else 0, 0, 0, 0, 0, 0, 0 + ) + + for _ in range(5): + msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1) + if msg: + armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED + if armed == arm_state: + return True + return False + + def takeoff(self, altitude): + """起飛到指定高度""" + self.master.mav.command_long_send( + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_CMD_NAV_TAKEOFF, + 0, + 0, 0, 0, 0, 0, 0, + altitude + ) + + while True: + msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True) + if msg: + current_alt = msg.relative_alt / 1000.0 + if abs(current_alt - altitude) < 0.5: + return True + time.sleep(0.1) + + def goto_position_local(self, x, y, z, vx=2, vy=2, vz=0, yaw=0, yaw_rate=0): + """ + 飛到指定的本地座標位置 + x, y, z: 目標位置(公尺) + """ + # NED系統中,向上飛需要給負值 + z = -z # 將輸入的z值轉換為NED系統 + + # 重要:type_mask 設為 0 表示使用該維度 + # 我們只想使用位置控制 (x,y,z),所以其他都設為 ignore + type_mask = ( + # mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE | # 8 + # mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE | # 16 + # mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE | # 32 + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE | # 64 + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE | # 128 + mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE | # 256 + mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE | # 1024 + mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE # 2048 + ) + + # 或直接使用二進制: type_mask = 0b0000111111111000 + + print(f"發送位置指令: x={x}, y={y}, z={-z}") + + self.master.mav.set_position_target_local_ned_send( + 0, # timestamp + self.master.target_system, + self.master.target_component, + mavutil.mavlink.MAV_FRAME_LOCAL_NED, + type_mask, + x, y, z, # 位置 + 0, 0, 0, # 速度 (設為 0) + 5, 5, 0, # 加速度 (設為 0) + 0, 0 # yaw, yaw_rate (設為 0) + ) + + # 監控位置變化 + print("開始監控位置變化...") + start_time = time.time() + while time.time() - start_time < 20: # 20秒超時 + msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1) + if msg: + dist = math.sqrt((msg.x - x)**2 + (msg.y - y)**2 + (msg.z - z)**2) + print(f"當前位置: x={msg.x:.2f}, y={msg.y:.2f}, z={-msg.z:.2f}, 距離目標: {dist:.2f}m") + if dist < 0.5: + print("到達目標位置") + return True + time.sleep(0.1) + + print("移動超時") + return False + + def get_current_state(self): + """獲取當前狀態""" + msg = self.master.recv_match(type='HEARTBEAT', blocking=True) + if msg: + mode = list(self.master.mode_mapping().keys())[list(self.master.mode_mapping().values()).index(msg.custom_mode)] + armed = bool(msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED) + return { + 'mode': mode, + 'armed': armed + } + return None + +def print_menu(): + """列印功能表""" + menu = [ + "\n=== 無人機控制界面 ===", + "1. 切換至 GUIDED 模式", + "2. 切換至 LAND 模式", + "3. 解鎖", + "4. 上鎖", + "5. 起飛", + "6. 飛到指定位置", + "7. 切換至 RTL 模式(返航)", + "8. 查看當前狀態", + "9. 退出", + "0. 顯示此清單", + "====================" + ] + for item in menu: + print(item) + +def main(): + drone = DroneController() + print_menu() + + while True: + try: + choice = input("\n請輸入指令編號: ") + + if choice == '0': + print_menu() + + elif choice == '1': + if drone.set_mode("GUIDED"): + print("已切換至 GUIDED 模式") + else: + print("切換模式失敗") + + elif choice == '2': + if drone.set_mode("LAND"): + print("已切換至 LAND 模式") + else: + print("切換模式失敗") + + elif choice == '3': + if drone.arm(True): + print("無人機已解鎖") + else: + print("解鎖失敗") + + elif choice == '4': + if drone.arm(False): + print("無人機已上鎖") + else: + print("上鎖失敗") + + elif choice == '5': + altitude = float(input("請輸入起飛高度(公尺): ")) + if drone.takeoff(altitude): + print(f"已到達目標高度: {altitude}公尺") + else: + print("起飛失敗") + + elif choice == '6': + x = float(input("請輸入X座標(公尺): ")) + y = float(input("請輸入Y座標(公尺): ")) + z = float(input("請輸入Z座標(公尺,負值代表向下): ")) + if drone.goto_position_local(x, y, z): + print("已到達目標位置") + else: + print("移動失敗") + + elif choice == '7': + if drone.set_mode("RTL"): + print("已切換至 RTL 模式,無人機正在返航") + else: + print("切換至 RTL 模式失敗") + + elif choice == '8': + state = drone.get_current_state() + if state: + print("\n=== 無人機狀態 ===") + print(f"模式: {state['mode']}") + print(f"解鎖狀態: {'已解鎖' if state['armed'] else '已上鎖'}") + print("==================") + else: + print("無法獲取狀態") + + elif choice == '9': + print("程式結束") + break + + else: + print("無效的指令,請重新輸入") + + except ValueError as e: + print(f"輸入錯誤: {e}") + except Exception as e: + print(f"發生錯誤: {e}") + +if __name__ == "__main__": + main() \ No newline at end of file