import rclpy from rclpy.node import Node from mavros_msgs.srv import CommandBool, SetMode, CommandTOL from mavros_msgs.msg import State from geometry_msgs.msg import PoseStamped,Quaternion import time import math class InteractiveDroneControlNode(Node): def __init__(self): super().__init__('interactive_drone_control_node') self.state_sub = self.create_subscription(State, 'mavros/state', self.state_callback, 10) self.arm_client = self.create_client(CommandBool, 'mavros/cmd/arming') self.set_mode_client = self.create_client(SetMode, 'mavros/set_mode') self.takeoff_client = self.create_client(CommandTOL, 'mavros/cmd/takeoff') self.local_pos_pub = self.create_publisher(PoseStamped, 'mavros/setpoint_position/local', 10) self.current_state = None self.current_pose = PoseStamped() self.get_logger().info("Interactive Drone Control Node has been started") def state_callback(self, msg): self.current_state = msg def arm_drone(self): req = CommandBool.Request() req.value = True future = self.arm_client.call_async(req) rclpy.spin_until_future_complete(self, future) response = future.result() if response.success: self.get_logger().info('Drone armed successfully') else: self.get_logger().error('Failed to arm drone') def set_guided_mode(self): req = SetMode.Request() req.custom_mode = "GUIDED" future = self.set_mode_client.call_async(req) rclpy.spin_until_future_complete(self, future) response = future.result() if response.mode_sent: self.get_logger().info('GUIDED mode set successfully') else: self.get_logger().error('Failed to set GUIDED mode') def set_land_mode(self): req = SetMode.Request() req.custom_mode = "LAND" future = self.set_mode_client.call_async(req) rclpy.spin_until_future_complete(self, future) response = future.result() if response.mode_sent: self.get_logger().info('LAND mode set successfully') else: self.get_logger().error('Failed to set LAND mode') def takeoff(self, takeoff_alt): req = CommandTOL.Request() req.altitude = takeoff_alt req.latitude = 0.0 # 默認值,不需要修改 req.longitude = 0.0 # 默認值,不需要修改 req.min_pitch = 0.0 # 默認值,不需要修改 req.yaw = 0.0 # 默認值,不需要修改 future = self.takeoff_client.call_async(req) rclpy.spin_until_future_complete(self, future) response = future.result() if response.success: self.get_logger().info(f'Takeoff command sent successfully: {response.success}') self.current_pose.pose.position.z = takeoff_alt # 設置當前高度 time.sleep(3) # 延遲3秒以確保起飛完成 else: self.get_logger().error('Failed to send takeoff command') # def set_target_pose(self, x, y, z): # self.current_pose.pose.position.x = x # self.current_pose.pose.position.y = y # self.current_pose.pose.position.z = z # self.local_pos_pub.publish(self.current_pose) # self.get_logger().info(f'Target position set to ({x}, {y}, {z})') # time.sleep(2) # 延遲2秒以確保目標位置設置完成 def move_xy(self, x1, y1): new_x = self.current_pose.pose.position.x + x1 new_y = self.current_pose.pose.position.y + y1 heading = math.atan2(y1, x1) quaternion = self.euler_to_quaternion(0, 0, heading) self.current_pose.pose.position.x = new_x self.current_pose.pose.position.y = new_y self.current_pose.pose.orientation = quaternion self.local_pos_pub.publish(self.current_pose) self.get_logger().info(f'Yaw to heading {heading} radians') time.sleep(2) # 延迟2秒以确保航向设置完成 self.local_pos_pub.publish(self.current_pose) self.get_logger().info(f'Moving to ({new_x}, {new_y}) while maintaining altitude {self.current_pose.pose.position.z}') time.sleep(2) # 延遲2秒以確保目標位置設置完成 def euler_to_quaternion(self, roll, pitch, yaw): qx = math.sin(roll / 2) * math.cos(pitch / 2) * math.cos(yaw / 2) - math.cos(roll / 2) * math.sin(pitch / 2) * math.sin(yaw / 2) qy = math.cos(roll / 2) * math.sin(pitch / 2) * math.cos(yaw / 2) + math.sin(roll / 2) * math.cos(pitch / 2) * math.sin(yaw / 2) qz = math.cos(roll / 2) * math.cos(pitch / 2) * math.sin(yaw / 2) - math.sin(roll / 2) * math.sin(pitch / 2) * math.cos(yaw / 2) qw = math.cos(roll / 2) * math.cos(pitch / 2) * math.cos(yaw / 2) + math.sin(roll / 2) * math.sin(pitch / 2) * math.sin(yaw / 2) return Quaternion(x=qx, y=qy, z=qz, w=qw) def main(args=None): rclpy.init(args=args) node = InteractiveDroneControlNode() def print_instructions(): print(""" Available commands: go - Set mode to GUIDED,Arm the drone,Take off to specified altitude move_x - Move to specified x position back_x - Move to specified x position move_y - Move to specified y position back_y - Move to specified y position land - Land the drone quit - Quit the program """) print_instructions() try: while rclpy.ok(): command = input("Enter command: ") if command == "go": node.set_guided_mode() node.arm_drone() node.takeoff(5.0) elif command == "land": node.set_land_mode() elif command.startswith("move_x"): parts = command.split() if len(parts) == 1: try: x1 = 5.0 y1 = 0.0 node.move_xy(x1, y1) except ValueError: print("Invalid position values") else: print("Usage: move_x") elif command.startswith("move_y"): parts = command.split() if len(parts) == 1: try: x1 = 0.0 y1 = 5.0 node.move_xy(x1, y1) except ValueError: print("Invalid position values") elif command.startswith("back_x"): parts = command.split() if len(parts) == 1: try: x1 = -5.0 y1 = 0.0 node.move_xy(x1, y1) except ValueError: print("Invalid position values") else: print("Usage: back_x") elif command.startswith("back_y"): parts = command.split() if len(parts) == 1: try: x1 = 0.0 y1 = -5.0 node.move_xy(x1, y1) except ValueError: print("Invalid position values") else: print("Usage: back_y") elif command == "quit": break else: print("Unknown command") print_instructions() except KeyboardInterrupt: pass node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()