Merge branch 'wenchun'

march 11 alignment
chiyu
Chiyu Chen 1 year ago
commit d21bc6b069

@ -0,0 +1,174 @@
#!/usr/bin/env python3
import subprocess
import argparse
import time
import psutil
import h5py
import numpy as np
class GazeboModelManager:
def __init__(self):
self.base_cmd = "gz service -s /world/map"
self.timeout = 5000
def add_iris(self, number, x_pos, y_pos, yaw_angle):
cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "<?xml version=\\"1.0\\" ?><sdf version=\\"1.9\\"><model name=\\"iris{number}\\"><include><uri>model://iris{number}</uri><pose degrees=\\"true\\">{x_pos} {y_pos} 0.195 0 0 {yaw_angle}</pose></include></model></sdf>"' '''
try:
print(f"正在新增 iris{number}...")
print(f"位置: x={x_pos}, y={y_pos}, yaw={yaw_angle}")
subprocess.run(cmd, shell=True, check=True)
print(f"成功新增 iris{number}")
sitl_instance = number - 1
port = 14550 + sitl_instance
sitl_cmd = f"sim_vehicle.py -v ArduCopter -f gazebo-iris{sitl_instance} -L NCHU --model JSON --map --console -I{sitl_instance} --out=127.0.0.1:{port}"
terminal_cmd = f'gnome-terminal -- bash -c "{sitl_cmd}; exec bash"'
print(f"正在啟動 SITL...")
subprocess.Popen(terminal_cmd, shell=True)
return True
except subprocess.CalledProcessError as e:
print(f"新增失敗: {e}")
return False
def cleanup_sitl(self, number):
instance_number = number - 1
print(f"正在清理 SITL 進程 {number}...")
cleanup_commands = [
f"pkill -f 'sim_vehicle.py.*-I{instance_number}'",
f"pkill -f 'arducopter.*-I{instance_number}'",
"pkill -f 'mavproxy.py.*map'",
"pkill -f 'mavproxy.py.*console'",
f"pkill -f 'mavproxy.*{instance_number}'",
"pkill -f 'python.*map.py'",
"pkill -f 'MAVProxy.py.*map'"
]
for cmd in cleanup_commands:
try:
subprocess.run(cmd, shell=True)
except subprocess.CalledProcessError:
pass
time.sleep(2)
print(f"SITL 進程清理完成")
def remove_iris(self, number):
cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "iris{number}"\''
try:
print(f"正在移除 iris{number}...")
subprocess.run(cmd, shell=True, check=True)
print(f"成功移除 iris{number}")
return True
except subprocess.CalledProcessError as e:
print(f"移除失敗: {e}")
return False
def add_obstacle(self, number, x_pos, y_pos, radius=0.5, height=2.0):
cmd = f'''gz service -s /world/map/create --reqtype gz.msgs.EntityFactory --reptype gz.msgs.Boolean --timeout 5000 --req 'sdf: "<?xml version=\\"1.0\\" ?><sdf version=\\"1.9\\"><model name=\\"obstacle_{number}\\"><static>true</static><pose>{x_pos} {y_pos} {height/2} 0 0 0</pose><link name=\\"link\\"><collision name=\\"collision\\"><geometry><cylinder><radius>{radius}</radius><length>{height}</length></cylinder></geometry></collision><visual name=\\"visual\\"><geometry><cylinder><radius>{radius}</radius><length>{height}</length></cylinder></geometry><material><ambient>0.3 0.3 0.3 1</ambient><diffuse>0.7 0.7 0.7 1</diffuse></material></visual></link></model></sdf>"' '''
try:
print(f"正在新增障礙物 {number}...")
print(f"位置: x={x_pos}, y={y_pos}, radius={radius}, height={height}")
subprocess.run(cmd, shell=True, check=True)
return True
except subprocess.CalledProcessError as e:
print(f"新增失敗: {e}")
return False
def remove_obstacle(self, number):
cmd = f'gz service -s /world/map/remove --reqtype gz.msgs.Entity --reptype gz.msgs.Boolean --timeout 1000 --req \'name: "obstacle_{number}"\''
try:
print(f"正在移除障礙物 {number}...")
subprocess.run(cmd, shell=True, check=True)
print(f"成功移除障礙物 {number}")
return True
except subprocess.CalledProcessError as e:
print(f"移除失敗: {e}")
return False
def add_obstacles_from_mat(self, mat_file):
"""從 .mat 檔案逐一生成障礙物"""
try:
with h5py.File(mat_file, 'r') as f:
x_data = f['#refs#/d'][()]
y_data = f['#refs#/e'][()]
z_data = f['#refs#/f'][()]
radius_data = f['#refs#/g'][()]
height_data = f['#refs#/h'][()]
data_matrix = np.vstack((x_data, y_data, z_data, radius_data, height_data)).T
for i, obstacle in enumerate(data_matrix):
x, y, _, radius, height = obstacle
success = self.add_obstacle(i+1, float(x), float(y), float(radius), float(height))
if not success:
print(f"警告:障礙物 {i+1} 新增失敗")
time.sleep(0.1)
print("所有障礙物新增完成")
return True
except Exception as e:
print(f"錯誤:讀取或處理 .mat 檔案時發生問題: {e}")
return False
def main():
parser = argparse.ArgumentParser(description='Gazebo Iris and Obstacle Manager')
parser.add_argument('action', choices=['add', 'remove', 'cleanup', 'add_from_mat'],
help='Action to perform (add, remove, cleanup, or add_from_mat)')
parser.add_argument('type', choices=['iris', 'obstacle'],
help='Type of object to manage (iris or obstacle)')
parser.add_argument('number', type=int, nargs='?',
help='Object number (e.g., 4 for iris4 or obstacle_4)')
parser.add_argument('--x-pos', type=float, default=15.0,
help='X position for new object')
parser.add_argument('--y-pos', type=float, default=0.0,
help='Y position for new object')
parser.add_argument('--yaw', type=float, default=90.0,
help='Yaw angle in degrees for iris')
parser.add_argument('--radius', type=float, default=0.5,
help='Radius for obstacle cylinder')
parser.add_argument('--height', type=float, default=2.0,
help='Height for obstacle cylinder')
parser.add_argument('--mat-file', type=str,
help='Path to .mat file containing obstacles data')
args = parser.parse_args()
manager = GazeboModelManager()
if args.action == 'add_from_mat':
if args.type == 'obstacle' and args.mat_file:
manager.add_obstacles_from_mat(args.mat_file)
else:
print("錯誤:使用 add_from_mat 時需指定 --mat-file 參數")
elif args.action == 'add':
if args.number is None:
print("錯誤:使用 add 時需指定 number 參數")
return
if args.type == 'iris':
manager.add_iris(args.number, args.x_pos, args.y_pos, args.yaw)
else: # obstacle
manager.add_obstacle(args.number, args.x_pos, args.y_pos, args.radius, args.height)
elif args.action == 'cleanup':
if args.number is None:
print("錯誤:使用 cleanup 時需指定 number 參數")
return
if args.type == 'iris':
manager.cleanup_sitl(args.number)
else: # remove
if args.number is None:
print("錯誤:使用 remove 時需指定 number 參數")
return
if args.type == 'iris':
manager.remove_iris(args.number)
else:
manager.remove_obstacle(args.number)
if __name__ == "__main__":
main()

@ -0,0 +1,232 @@
#!/usr/bin/env python3
from pymavlink import mavutil
import time
import math
import threading
class DroneController:
def __init__(self, connection_string="udp:127.0.0.1:14550"):
# 建立 MAVLink 連接
self.master = mavutil.mavlink_connection(connection_string)
# 等待心跳包
self.master.wait_heartbeat()
print("已建立連接!")
def set_mode(self, mode):
"""設置飛行模式"""
mode_id = self.master.mode_mapping()[mode]
self.master.mav.set_mode_send(
self.master.target_system,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
mode_id
)
for _ in range(5):
msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1)
if msg and msg.custom_mode == mode_id:
return True
return False
def arm(self, arm_state=True):
"""解鎖/上鎖無人機"""
self.master.mav.command_long_send(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0,
1 if arm_state else 0, 0, 0, 0, 0, 0, 0
)
for _ in range(5):
msg = self.master.recv_match(type='HEARTBEAT', blocking=True, timeout=1)
if msg:
armed = msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED
if armed == arm_state:
return True
return False
def takeoff(self, altitude):
"""起飛到指定高度"""
self.master.mav.command_long_send(
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0,
0, 0, 0, 0, 0, 0,
altitude
)
while True:
msg = self.master.recv_match(type='GLOBAL_POSITION_INT', blocking=True)
if msg:
current_alt = msg.relative_alt / 1000.0
if abs(current_alt - altitude) < 0.5:
return True
time.sleep(0.1)
def goto_position_local(self, x, y, z, vx=2, vy=2, vz=0, yaw=0, yaw_rate=0):
"""
飛到指定的本地座標位置
x, y, z: 目標位置(公尺)
"""
# NED系統中向上飛需要給負值
z = -z # 將輸入的z值轉換為NED系統
# 重要type_mask 設為 0 表示使用該維度
# 我們只想使用位置控制 (x,y,z),所以其他都設為 ignore
type_mask = (
# mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE | # 8
# mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE | # 16
# mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE | # 32
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE | # 64
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE | # 128
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE | # 256
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE | # 1024
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE # 2048
)
# 或直接使用二進制: type_mask = 0b0000111111111000
print(f"發送位置指令: x={x}, y={y}, z={-z}")
self.master.mav.set_position_target_local_ned_send(
0, # timestamp
self.master.target_system,
self.master.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
type_mask,
x, y, z, # 位置
0, 0, 0, # 速度 (設為 0)
5, 5, 0, # 加速度 (設為 0)
0, 0 # yaw, yaw_rate (設為 0)
)
# 監控位置變化
print("開始監控位置變化...")
start_time = time.time()
while time.time() - start_time < 20: # 20秒超時
msg = self.master.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1)
if msg:
dist = math.sqrt((msg.x - x)**2 + (msg.y - y)**2 + (msg.z - z)**2)
print(f"當前位置: x={msg.x:.2f}, y={msg.y:.2f}, z={-msg.z:.2f}, 距離目標: {dist:.2f}m")
if dist < 0.5:
print("到達目標位置")
return True
time.sleep(0.1)
print("移動超時")
return False
def get_current_state(self):
"""獲取當前狀態"""
msg = self.master.recv_match(type='HEARTBEAT', blocking=True)
if msg:
mode = list(self.master.mode_mapping().keys())[list(self.master.mode_mapping().values()).index(msg.custom_mode)]
armed = bool(msg.base_mode & mavutil.mavlink.MAV_MODE_FLAG_SAFETY_ARMED)
return {
'mode': mode,
'armed': armed
}
return None
def print_menu():
"""列印功能表"""
menu = [
"\n=== 無人機控制界面 ===",
"1. 切換至 GUIDED 模式",
"2. 切換至 LAND 模式",
"3. 解鎖",
"4. 上鎖",
"5. 起飛",
"6. 飛到指定位置",
"7. 切換至 RTL 模式(返航)",
"8. 查看當前狀態",
"9. 退出",
"0. 顯示此清單",
"===================="
]
for item in menu:
print(item)
def main():
drone = DroneController()
print_menu()
while True:
try:
choice = input("\n請輸入指令編號: ")
if choice == '0':
print_menu()
elif choice == '1':
if drone.set_mode("GUIDED"):
print("已切換至 GUIDED 模式")
else:
print("切換模式失敗")
elif choice == '2':
if drone.set_mode("LAND"):
print("已切換至 LAND 模式")
else:
print("切換模式失敗")
elif choice == '3':
if drone.arm(True):
print("無人機已解鎖")
else:
print("解鎖失敗")
elif choice == '4':
if drone.arm(False):
print("無人機已上鎖")
else:
print("上鎖失敗")
elif choice == '5':
altitude = float(input("請輸入起飛高度(公尺): "))
if drone.takeoff(altitude):
print(f"已到達目標高度: {altitude}公尺")
else:
print("起飛失敗")
elif choice == '6':
x = float(input("請輸入X座標(公尺): "))
y = float(input("請輸入Y座標(公尺): "))
z = float(input("請輸入Z座標(公尺,負值代表向下): "))
if drone.goto_position_local(x, y, z):
print("已到達目標位置")
else:
print("移動失敗")
elif choice == '7':
if drone.set_mode("RTL"):
print("已切換至 RTL 模式,無人機正在返航")
else:
print("切換至 RTL 模式失敗")
elif choice == '8':
state = drone.get_current_state()
if state:
print("\n=== 無人機狀態 ===")
print(f"模式: {state['mode']}")
print(f"解鎖狀態: {'已解鎖' if state['armed'] else '已上鎖'}")
print("==================")
else:
print("無法獲取狀態")
elif choice == '9':
print("程式結束")
break
else:
print("無效的指令,請重新輸入")
except ValueError as e:
print(f"輸入錯誤: {e}")
except Exception as e:
print(f"發生錯誤: {e}")
if __name__ == "__main__":
main()
Loading…
Cancel
Save