first commit
parent
af4d879dd4
commit
5f32c30239
@ -0,0 +1,200 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from mavros_msgs.srv import CommandBool, SetMode, CommandTOL
|
||||
from mavros_msgs.msg import State
|
||||
from geometry_msgs.msg import PoseStamped,Quaternion
|
||||
import time
|
||||
import math
|
||||
|
||||
class InteractiveDroneControlNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__('interactive_drone_control_node')
|
||||
self.state_sub = self.create_subscription(State, 'mavros/state', self.state_callback, 10)
|
||||
self.arm_client = self.create_client(CommandBool, 'mavros/cmd/arming')
|
||||
self.set_mode_client = self.create_client(SetMode, 'mavros/set_mode')
|
||||
self.takeoff_client = self.create_client(CommandTOL, 'mavros/cmd/takeoff')
|
||||
self.local_pos_pub = self.create_publisher(PoseStamped, 'mavros/setpoint_position/local', 10)
|
||||
self.current_state = None
|
||||
self.current_pose = PoseStamped()
|
||||
self.get_logger().info("Interactive Drone Control Node has been started")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def state_callback(self, msg):
|
||||
self.current_state = msg
|
||||
|
||||
def arm_drone(self):
|
||||
req = CommandBool.Request()
|
||||
req.value = True
|
||||
future = self.arm_client.call_async(req)
|
||||
rclpy.spin_until_future_complete(self, future)
|
||||
response = future.result()
|
||||
if response.success:
|
||||
self.get_logger().info('Drone armed successfully')
|
||||
else:
|
||||
self.get_logger().error('Failed to arm drone')
|
||||
|
||||
def set_guided_mode(self):
|
||||
req = SetMode.Request()
|
||||
req.custom_mode = "GUIDED"
|
||||
future = self.set_mode_client.call_async(req)
|
||||
rclpy.spin_until_future_complete(self, future)
|
||||
response = future.result()
|
||||
if response.mode_sent:
|
||||
self.get_logger().info('GUIDED mode set successfully')
|
||||
else:
|
||||
self.get_logger().error('Failed to set GUIDED mode')
|
||||
|
||||
def set_land_mode(self):
|
||||
req = SetMode.Request()
|
||||
req.custom_mode = "LAND"
|
||||
future = self.set_mode_client.call_async(req)
|
||||
rclpy.spin_until_future_complete(self, future)
|
||||
response = future.result()
|
||||
if response.mode_sent:
|
||||
self.get_logger().info('LAND mode set successfully')
|
||||
else:
|
||||
self.get_logger().error('Failed to set LAND mode')
|
||||
|
||||
def takeoff(self, takeoff_alt):
|
||||
req = CommandTOL.Request()
|
||||
req.altitude = takeoff_alt
|
||||
req.latitude = 0.0 # 默認值,不需要修改
|
||||
req.longitude = 0.0 # 默認值,不需要修改
|
||||
req.min_pitch = 0.0 # 默認值,不需要修改
|
||||
req.yaw = 0.0 # 默認值,不需要修改
|
||||
future = self.takeoff_client.call_async(req)
|
||||
rclpy.spin_until_future_complete(self, future)
|
||||
response = future.result()
|
||||
if response.success:
|
||||
self.get_logger().info(f'Takeoff command sent successfully: {response.success}')
|
||||
self.current_pose.pose.position.z = takeoff_alt # 設置當前高度
|
||||
time.sleep(3) # 延遲3秒以確保起飛完成
|
||||
else:
|
||||
self.get_logger().error('Failed to send takeoff command')
|
||||
|
||||
# def set_target_pose(self, x, y, z):
|
||||
# self.current_pose.pose.position.x = x
|
||||
# self.current_pose.pose.position.y = y
|
||||
# self.current_pose.pose.position.z = z
|
||||
# self.local_pos_pub.publish(self.current_pose)
|
||||
# self.get_logger().info(f'Target position set to ({x}, {y}, {z})')
|
||||
# time.sleep(2) # 延遲2秒以確保目標位置設置完成
|
||||
|
||||
def move_xy(self, x1, y1):
|
||||
new_x = self.current_pose.pose.position.x + x1
|
||||
new_y = self.current_pose.pose.position.y + y1
|
||||
|
||||
heading = math.atan2(y1, x1)
|
||||
quaternion = self.euler_to_quaternion(0, 0, heading)
|
||||
|
||||
|
||||
self.current_pose.pose.position.x = new_x
|
||||
self.current_pose.pose.position.y = new_y
|
||||
self.current_pose.pose.orientation = quaternion
|
||||
self.local_pos_pub.publish(self.current_pose)
|
||||
self.get_logger().info(f'Yaw to heading {heading} radians')
|
||||
time.sleep(2) # 延迟2秒以确保航向设置完成
|
||||
|
||||
self.local_pos_pub.publish(self.current_pose)
|
||||
self.get_logger().info(f'Moving to ({new_x}, {new_y}) while maintaining altitude {self.current_pose.pose.position.z}')
|
||||
time.sleep(2) # 延遲2秒以確保目標位置設置完成
|
||||
|
||||
def euler_to_quaternion(self, roll, pitch, yaw):
|
||||
qx = math.sin(roll / 2) * math.cos(pitch / 2) * math.cos(yaw / 2) - math.cos(roll / 2) * math.sin(pitch / 2) * math.sin(yaw / 2)
|
||||
qy = math.cos(roll / 2) * math.sin(pitch / 2) * math.cos(yaw / 2) + math.sin(roll / 2) * math.cos(pitch / 2) * math.sin(yaw / 2)
|
||||
qz = math.cos(roll / 2) * math.cos(pitch / 2) * math.sin(yaw / 2) - math.sin(roll / 2) * math.sin(pitch / 2) * math.cos(yaw / 2)
|
||||
qw = math.cos(roll / 2) * math.cos(pitch / 2) * math.cos(yaw / 2) + math.sin(roll / 2) * math.sin(pitch / 2) * math.sin(yaw / 2)
|
||||
return Quaternion(x=qx, y=qy, z=qz, w=qw)
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = InteractiveDroneControlNode()
|
||||
|
||||
def print_instructions():
|
||||
print("""
|
||||
Available commands:
|
||||
go - Set mode to GUIDED,Arm the drone,
|
||||
takeoff - Take off to specified altitude
|
||||
move_x - Move to specified x position
|
||||
back_x - Move to specified x position
|
||||
move_y - Move to specified y position
|
||||
back_y - Move to specified y position
|
||||
land - Land the drone
|
||||
quit - Quit the program
|
||||
""")
|
||||
|
||||
print_instructions()
|
||||
|
||||
try:
|
||||
while rclpy.ok():
|
||||
command = input("Enter command: ")
|
||||
if command == "go":
|
||||
node.set_guided_mode()
|
||||
node.arm_drone()
|
||||
#node.takeoff(5.0)
|
||||
elif command == "takeoff":
|
||||
node.takeoff(5.0)
|
||||
elif command == "land":
|
||||
node.set_land_mode()
|
||||
|
||||
elif command.startswith("move_x"):
|
||||
parts = command.split()
|
||||
if len(parts) == 1:
|
||||
try:
|
||||
x1 = 5.0
|
||||
y1 = 0.0
|
||||
node.move_xy(x1, y1)
|
||||
except ValueError:
|
||||
print("Invalid position values")
|
||||
else:
|
||||
print("Usage: move_x")
|
||||
|
||||
elif command.startswith("move_y"):
|
||||
parts = command.split()
|
||||
if len(parts) == 1:
|
||||
try:
|
||||
x1 = 0.0
|
||||
y1 = 5.0
|
||||
node.move_xy(x1, y1)
|
||||
except ValueError:
|
||||
print("Invalid position values")
|
||||
|
||||
elif command.startswith("back_x"):
|
||||
parts = command.split()
|
||||
if len(parts) == 1:
|
||||
try:
|
||||
x1 = -5.0
|
||||
y1 = 0.0
|
||||
node.move_xy(x1, y1)
|
||||
except ValueError:
|
||||
print("Invalid position values")
|
||||
else:
|
||||
print("Usage: back_x")
|
||||
|
||||
elif command.startswith("back_y"):
|
||||
parts = command.split()
|
||||
if len(parts) == 1:
|
||||
try:
|
||||
x1 = 0.0
|
||||
y1 = -5.0
|
||||
node.move_xy(x1, y1)
|
||||
except ValueError:
|
||||
print("Invalid position values")
|
||||
else:
|
||||
print("Usage: back_y")
|
||||
elif command == "quit":
|
||||
break
|
||||
else:
|
||||
print("Unknown command")
|
||||
print_instructions()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Loading…
Reference in New Issue