Gazebo物件生成

chiyu
wenchun 1 year ago
parent a8968e25b3
commit 4f9e6fa51e

@ -0,0 +1,174 @@
#!/usr/bin/env python3
import subprocess
import argparse
import time
import psutil
import h5py
import numpy as np
class GazeboModelManager:
def __init__(self):
self.base_cmd = "gz service -s /world/map"
self.timeout = 5000
def add_iris(self, number, x_pos, y_pos, yaw_angle):
cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "<?xml version=\\"1.0\\" ?><sdf version=\\"1.9\\"><model name=\\"iris{number}\\"><include><uri>model://iris{number}</uri><pose degrees=\\"true\\">{x_pos} {y_pos} 0.195 0 0 {yaw_angle}</pose></include></model></sdf>"' '''
try:
print(f"正在新增 iris{number}...")
print(f"位置: x={x_pos}, y={y_pos}, yaw={yaw_angle}")
subprocess.run(cmd, shell=True, check=True)
print(f"成功新增 iris{number}")
sitl_instance = number - 1
port = 14550 + sitl_instance
sitl_cmd = f"sim_vehicle.py -v ArduCopter -f gazebo-iris{sitl_instance} -L NCHU --model JSON --map --console -I{sitl_instance} --out=127.0.0.1:{port}"
terminal_cmd = f'gnome-terminal -- bash -c "{sitl_cmd}; exec bash"'
print(f"正在啟動 SITL...")
subprocess.Popen(terminal_cmd, shell=True)
return True
except subprocess.CalledProcessError as e:
print(f"新增失敗: {e}")
return False
def cleanup_sitl(self, number):
instance_number = number - 1
print(f"正在清理 SITL 進程 {number}...")
cleanup_commands = [
f"pkill -f 'sim_vehicle.py.*-I{instance_number}'",
f"pkill -f 'arducopter.*-I{instance_number}'",
"pkill -f 'mavproxy.py.*map'",
"pkill -f 'mavproxy.py.*console'",
f"pkill -f 'mavproxy.*{instance_number}'",
"pkill -f 'python.*map.py'",
"pkill -f 'MAVProxy.py.*map'"
]
for cmd in cleanup_commands:
try:
subprocess.run(cmd, shell=True)
except subprocess.CalledProcessError:
pass
time.sleep(2)
print(f"SITL 進程清理完成")
def remove_iris(self, number):
cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "iris{number}"\''
try:
print(f"正在移除 iris{number}...")
subprocess.run(cmd, shell=True, check=True)
print(f"成功移除 iris{number}")
return True
except subprocess.CalledProcessError as e:
print(f"移除失敗: {e}")
return False
def add_obstacle(self, number, x_pos, y_pos, radius=0.5, height=2.0):
cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "<?xml version=\\"1.0\\" ?><sdf version=\\"1.9\\"><model name=\\"obstacle_{number}\\"><static>true</static><pose>{x_pos} {y_pos} {height/2} 0 0 0</pose><link name=\\"link\\"><collision name=\\"collision\\"><geometry><cylinder><radius>{radius}</radius><length>{height}</length></cylinder></geometry></collision><visual name=\\"visual\\"><geometry><cylinder><radius>{radius}</radius><length>{height}</length></cylinder></geometry><material><ambient>0.3 0.3 0.3 1</ambient><diffuse>0.7 0.7 0.7 1</diffuse></material></visual></link></model></sdf>"' '''
try:
print(f"正在新增障礙物 {number}...")
print(f"位置: x={x_pos}, y={y_pos}, radius={radius}, height={height}")
subprocess.run(cmd, shell=True, check=True)
return True
except subprocess.CalledProcessError as e:
print(f"新增失敗: {e}")
return False
def remove_obstacle(self, number):
cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "obstacle_{number}"\''
try:
print(f"正在移除障礙物 {number}...")
subprocess.run(cmd, shell=True, check=True)
print(f"成功移除障礙物 {number}")
return True
except subprocess.CalledProcessError as e:
print(f"移除失敗: {e}")
return False
def add_obstacles_from_mat(self, mat_file):
"""從 .mat 檔案逐一生成障礙物"""
try:
with h5py.File(mat_file, 'r') as f:
x_data = f['#refs#/d'][()]
y_data = f['#refs#/e'][()]
z_data = f['#refs#/f'][()]
radius_data = f['#refs#/g'][()]
height_data = f['#refs#/h'][()]
data_matrix = np.vstack((x_data, y_data, z_data, radius_data, height_data)).T
for i, obstacle in enumerate(data_matrix):
x, y, _, radius, height = obstacle
success = self.add_obstacle(i+1, float(x), float(y), float(radius), float(height))
if not success:
print(f"警告:障礙物 {i+1} 新增失敗")
time.sleep(0.1)
print("所有障礙物新增完成")
return True
except Exception as e:
print(f"錯誤:讀取或處理 .mat 檔案時發生問題: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Gazebo Iris and Obstacle Manager')
parser.add_argument('action', choices=['add', 'remove', 'cleanup', 'add_from_mat'],
help='Action to perform (add, remove, cleanup, or add_from_mat)')
parser.add_argument('type', choices=['iris', 'obstacle'],
help='Type of object to manage (iris or obstacle)')
parser.add_argument('number', type=int, nargs='?',
help='Object number (e.g., 4 for iris4 or obstacle_4)')
parser.add_argument('--x-pos', type=float, default=15.0,
help='X position for new object')
parser.add_argument('--y-pos', type=float, default=0.0,
help='Y position for new object')
parser.add_argument('--yaw', type=float, default=90.0,
help='Yaw angle in degrees for iris')
parser.add_argument('--radius', type=float, default=0.5,
help='Radius for obstacle cylinder')
parser.add_argument('--height', type=float, default=2.0,
help='Height for obstacle cylinder')
parser.add_argument('--mat-file', type=str,
help='Path to .mat file containing obstacles data')
args = parser.parse_args()
manager = GazeboModelManager()
if args.action == 'add_from_mat':
if args.type == 'obstacle' and args.mat_file:
manager.add_obstacles_from_mat(args.mat_file)
else:
print("錯誤:使用 add_from_mat 時需指定 --mat-file 參數")
elif args.action == 'add':
if args.number is None:
print("錯誤:使用 add 時需指定 number 參數")
return
if args.type == 'iris':
manager.add_iris(args.number, args.x_pos, args.y_pos, args.yaw)
else: # obstacle
manager.add_obstacle(args.number, args.x_pos, args.y_pos, args.radius, args.height)
elif args.action == 'cleanup':
if args.number is None:
print("錯誤:使用 cleanup 時需指定 number 參數")
return
if args.type == 'iris':
manager.cleanup_sitl(args.number)
else: # remove
if args.number is None:
print("錯誤:使用 remove 時需指定 number 參數")
return
if args.type == 'iris':
manager.remove_iris(args.number)
else:
manager.remove_obstacle(args.number)
if __name__ == "__main__":
main()
Loading…
Cancel
Save