From 4f9e6fa51eed47ea5d3033a496a35fe8c91d7e01 Mon Sep 17 00:00:00 2001 From: wenchun Date: Wed, 26 Feb 2025 11:13:15 +0800 Subject: [PATCH] =?UTF-8?q?Gazebo=E7=89=A9=E4=BB=B6=E7=94=9F=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/unitdev03/gazebo_native_manager.py | 174 +++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 src/unitdev03/gazebo_native_manager.py diff --git a/src/unitdev03/gazebo_native_manager.py b/src/unitdev03/gazebo_native_manager.py new file mode 100644 index 0000000..8699188 --- /dev/null +++ b/src/unitdev03/gazebo_native_manager.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +import subprocess +import argparse +import time +import psutil +import h5py +import numpy as np + +class GazeboModelManager: + def __init__(self): + self.base_cmd = "gz service -s /world/map" + self.timeout = 5000 + + def add_iris(self, number, x_pos, y_pos, yaw_angle): + cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "model://iris{number}{x_pos} {y_pos} 0.195 0 0 {yaw_angle}"' ''' + + try: + print(f"正在新增 iris{number}...") + print(f"位置: x={x_pos}, y={y_pos}, yaw={yaw_angle}度") + subprocess.run(cmd, shell=True, check=True) + print(f"成功新增 iris{number}") + + sitl_instance = number - 1 + port = 14550 + sitl_instance + + sitl_cmd = f"sim_vehicle.py -v ArduCopter -f gazebo-iris{sitl_instance} -L NCHU --model JSON --map --console -I{sitl_instance} --out=127.0.0.1:{port}" + terminal_cmd = f'gnome-terminal -- bash -c "{sitl_cmd}; exec bash"' + print(f"正在啟動 SITL...") + subprocess.Popen(terminal_cmd, shell=True) + + return True + except subprocess.CalledProcessError as e: + print(f"新增失敗: {e}") + return False + + def cleanup_sitl(self, number): + instance_number = number - 1 + print(f"正在清理 SITL 進程 {number}...") + + cleanup_commands = [ + f"pkill -f 'sim_vehicle.py.*-I{instance_number}'", + f"pkill -f 'arducopter.*-I{instance_number}'", + "pkill -f 'mavproxy.py.*map'", + "pkill -f 'mavproxy.py.*console'", + f"pkill -f 'mavproxy.*{instance_number}'", + "pkill -f 'python.*map.py'", + "pkill -f 'MAVProxy.py.*map'" + ] + + for cmd in cleanup_commands: + try: + subprocess.run(cmd, shell=True) + except subprocess.CalledProcessError: + pass + + time.sleep(2) + print(f"SITL 進程清理完成") + + def remove_iris(self, number): + cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "iris{number}"\'' + + try: + print(f"正在移除 iris{number}...") + subprocess.run(cmd, shell=True, check=True) + print(f"成功移除 iris{number}") + return True + except subprocess.CalledProcessError as e: + print(f"移除失敗: {e}") + return False + + def add_obstacle(self, number, x_pos, y_pos, radius=0.5, height=2.0): + cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "true{x_pos} {y_pos} {height/2} 0 0 0{radius}{height}{radius}{height}0.3 0.3 0.3 10.7 0.7 0.7 1"' ''' + + try: + print(f"正在新增障礙物 {number}...") + print(f"位置: x={x_pos}, y={y_pos}, radius={radius}, height={height}") + subprocess.run(cmd, shell=True, check=True) + return True + except subprocess.CalledProcessError as e: + print(f"新增失敗: {e}") + return False + + def remove_obstacle(self, number): + cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "obstacle_{number}"\'' + + try: + print(f"正在移除障礙物 {number}...") + subprocess.run(cmd, shell=True, check=True) + print(f"成功移除障礙物 {number}") + return True + except subprocess.CalledProcessError as e: + print(f"移除失敗: {e}") + return False + + def add_obstacles_from_mat(self, mat_file): + """從 .mat 檔案逐一生成障礙物""" + try: + with h5py.File(mat_file, 'r') as f: + x_data = f['#refs#/d'][()] + y_data = f['#refs#/e'][()] + z_data = f['#refs#/f'][()] + radius_data = f['#refs#/g'][()] + height_data = f['#refs#/h'][()] + + data_matrix = np.vstack((x_data, y_data, z_data, radius_data, height_data)).T + + for i, obstacle in enumerate(data_matrix): + x, y, _, radius, height = obstacle + success = self.add_obstacle(i+1, float(x), float(y), float(radius), float(height)) + if not success: + print(f"警告:障礙物 {i+1} 新增失敗") + time.sleep(0.1) + + print("所有障礙物新增完成") + return True + + except Exception as e: + print(f"錯誤:讀取或處理 .mat 檔案時發生問題: {e}") + return False + +def main(): + parser = argparse.ArgumentParser(description='Gazebo Iris and Obstacle Manager') + parser.add_argument('action', choices=['add', 'remove', 'cleanup', 'add_from_mat'], + help='Action to perform (add, remove, cleanup, or add_from_mat)') + parser.add_argument('type', choices=['iris', 'obstacle'], + help='Type of object to manage (iris or obstacle)') + parser.add_argument('number', type=int, nargs='?', + help='Object number (e.g., 4 for iris4 or obstacle_4)') + parser.add_argument('--x-pos', type=float, default=15.0, + help='X position for new object') + parser.add_argument('--y-pos', type=float, default=0.0, + help='Y position for new object') + parser.add_argument('--yaw', type=float, default=90.0, + help='Yaw angle in degrees for iris') + parser.add_argument('--radius', type=float, default=0.5, + help='Radius for obstacle cylinder') + parser.add_argument('--height', type=float, default=2.0, + help='Height for obstacle cylinder') + parser.add_argument('--mat-file', type=str, + help='Path to .mat file containing obstacles data') + + args = parser.parse_args() + manager = GazeboModelManager() + + if args.action == 'add_from_mat': + if args.type == 'obstacle' and args.mat_file: + manager.add_obstacles_from_mat(args.mat_file) + else: + print("錯誤:使用 add_from_mat 時需指定 --mat-file 參數") + elif args.action == 'add': + if args.number is None: + print("錯誤:使用 add 時需指定 number 參數") + return + if args.type == 'iris': + manager.add_iris(args.number, args.x_pos, args.y_pos, args.yaw) + else: # obstacle + manager.add_obstacle(args.number, args.x_pos, args.y_pos, args.radius, args.height) + elif args.action == 'cleanup': + if args.number is None: + print("錯誤:使用 cleanup 時需指定 number 參數") + return + if args.type == 'iris': + manager.cleanup_sitl(args.number) + else: # remove + if args.number is None: + print("錯誤:使用 remove 時需指定 number 參數") + return + if args.type == 'iris': + manager.remove_iris(args.number) + else: + manager.remove_obstacle(args.number) + +if __name__ == "__main__": + main() \ No newline at end of file