You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/unitdev03/0519/enemy_status.py

340 lines
12 KiB
Python

#!/usr/bin/env python3
from pymavlink import mavutil
import time
import sys
import json
import socket
import threading
import os
from datetime import datetime
# 固定初始位置設定 (5,5,0)
INITIAL_POSITION = [5, 5, 0]
# MAVLink 連接設定
connection_string = 'udp:127.0.0.1:14553'
# 網路傳輸設定
target_ip = '140.120.31.130'
target_port = 9053
# 建立 UDP socket 用於傳送數據
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 封包編號計數器
packet_counter = 0
# 共享變數 - 存儲最新位置
current_position = None
initial_position = INITIAL_POSITION.copy()
position_lock = threading.Lock()
# 連接到無人機
print(f"嘗試連接到 MAVLink: {connection_string}")
try:
connection = mavutil.mavlink_connection(connection_string, autoreconnect=True)
connection.wait_heartbeat()
print(f"成功連接到系統 {connection.target_system}!")
except Exception as e:
print(f"MAVLink 連接失敗: {e}")
sys.exit(1)
print(f"使用指定初始位置: 北={initial_position[0]}, 東={initial_position[1]}, 下={initial_position[2]}")
# 請求位置數據流
connection.mav.request_data_stream_send(
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_DATA_STREAM_POSITION,
10,
1
)
# 日誌記錄函數
def start_logging():
"""創建並開啟日誌文件"""
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = os.path.join(log_dir, f"drone_log_{timestamp}.csv")
file = open(log_file, "w")
file.write("packet_id,timestamp,x,y,z,vx,vy,vz\n") # CSV標頭
print(f"日誌記錄已開始: {log_file}")
return file
# 控制函數
def set_guided_mode():
"""設置為 GUIDED 模式"""
connection.mav.command_long_send(
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_CMD_DO_SET_MODE,
0,
mavutil.mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED,
4,
0, 0, 0, 0, 0
)
ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3)
return ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED
def arm_throttle():
"""解鎖油門"""
connection.mav.command_long_send(
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_CMD_COMPONENT_ARM_DISARM,
0, 1, 0, 0, 0, 0, 0, 0
)
ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3)
return ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED
def takeoff(altitude):
"""起飛到指定高度"""
connection.mav.command_long_send(
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_CMD_NAV_TAKEOFF,
0, 0, 0, 0, 0, 0, 0, altitude
)
ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3)
return ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED
def set_speed_limit(speed_type, speed):
"""
設置速度限制
Parameters:
-----------
speed_type : int
速度類型 (0=空速, 1=地速, 2=爬升速度, 3=下降速度)
speed : float
速度值 (米/秒)
"""
connection.mav.command_long_send(
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_CMD_DO_CHANGE_SPEED,
0, # 確認
speed_type, # 參數1: 速度類型
speed, # 參數2: 速度 (m/s)
-1, # 參數3: 油門 (-1 表示不變)
0, 0, 0, 0 # 參數4-7: 未使用
)
ack = connection.recv_match(type='COMMAND_ACK', blocking=True, timeout=3)
success = ack and ack.result == mavutil.mavlink.MAV_RESULT_ACCEPTED
if success:
print(f"已設置速度限制為 {speed:.2f} m/s")
else:
print("設置速度限制失敗")
return success
def goto_position(north, east, down):
"""前往指定位置 (世界座標系)"""
# 將世界座標轉換為相對座標
rel_north = north - initial_position[0]
rel_east = east - initial_position[1]
rel_down = down - initial_position[2]
connection.mav.set_position_target_local_ned_send(
0,
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
0b0000111111111000,
rel_north, rel_east, rel_down,
0, 0, 0,
0, 0, 0,
0, 0
)
# print(f"相對位置: 北={rel_north:.2f}, 東={rel_east:.2f}, 下={rel_down:.2f}")
def get_current_position():
"""獲取存儲的當前位置"""
with position_lock:
if current_position is None:
return None
return current_position.copy()
def control_interface():
"""命令行控制界面"""
print("無人機控制介面已啟動,輸入命令:")
print(" arm - 解鎖油門")
print(" takeoff [高度] - 起飛")
print(" goto [北] [東] [高度] - 前往位置 (世界座標系)")
print(" pos - 顯示當前位置 (相對座標和世界座標)")
print(" init - 顯示初始位置")
print(" exit - 退出控制")
while True:
try:
cmd = input("命令> ").strip().split()
if not cmd:
continue
if cmd[0] == "arm":
print("嘗試解鎖油門...")
if set_guided_mode() and arm_throttle():
print("成功解鎖油門!")
else:
print("解鎖失敗")
elif cmd[0] == "takeoff":
alt = float(cmd[1]) if len(cmd) > 1 else 3.0
print(f"嘗試起飛到 {alt} 米高...")
if takeoff(alt):
print("起飛命令已發送")
else:
print("起飛失敗")
elif cmd[0] == "goto":
if len(cmd) >= 4:
north = float(cmd[1])
east = float(cmd[2])
down = -float(cmd[3]) # 高度是負的下
print(f"前往世界位置: 北:{north} 東:{east} 下:{-down}")
goto_position(north, east, down)
else:
print("用法: goto [北] [東] [高度]")
elif cmd[0] == "pos":
pos = get_current_position()
if pos:
rel_x, rel_y, rel_z = pos
# 計算世界座標
world_x = initial_position[0] + rel_x
world_y = initial_position[1] + rel_y
world_z = initial_position[2] + rel_z
# NED 轉 ENU
enu_x = rel_y
enu_y = rel_x
enu_z = -rel_z
print("\n當前位置:")
#print(f" 相對座標 (NED): 北={rel_x:.2f}m, 東={rel_y:.2f}m, 下={rel_z:.2f}m")
print(f" 世界座標 : 北={world_x:.2f}m, 東={world_y:.2f}m, 下={-world_z:.2f}m")
#print(f" 相對座標 (ENU): 東={enu_x:.2f}m, 北={enu_y:.2f}m, 上={enu_z:.2f}m")
#print(f" 高度: {-rel_z:.2f}m")
else:
print("尚未獲取到位置信息")
elif cmd[0] == "speed":
if len(cmd) > 1:
speed = float(cmd[1])
set_speed_limit(1, speed) # 1 = 地速
else:
print("用法: speed [速度m/s]")
elif cmd[0] == "climbspeed":
if len(cmd) > 1:
speed = float(cmd[1])
set_speed_limit(2, speed) # 2 = 爬升速度
set_speed_limit(3, speed) # 3 = 下降速度
else:
print("用法: climbspeed [速度m/s]")
elif cmd[0] == "init":
print(f"初始位置 (NED): 北={initial_position[0]:.2f}m, 東={initial_position[1]:.2f}m, 下={initial_position[2]:.2f}m")
elif cmd[0] == "exit":
print("退出控制介面")
break
else:
print(f"未知命令: {cmd[0]}")
except Exception as e:
print(f"命令執行錯誤: {e}")
# 數據傳輸線程函數
def data_sending_thread():
global packet_counter, current_position
# 開啟日誌文件
log_file = start_logging()
try:
print(f"開始接收數據並傳送到 {target_ip}:{target_port}...")
while True:
msg = connection.recv_match(type='LOCAL_POSITION_NED', blocking=True, timeout=1)
if msg is not None:
# 更新當前位置(線程安全)
with position_lock:
current_position = [msg.x, msg.y, msg.z]
# 增加封包編號
packet_counter += 1
# 計算世界座標
world_x = initial_position[0] + msg.x
world_y = initial_position[1] + msg.y
world_z = initial_position[2] + msg.z
# NED 轉換為 ENU 並使用世界座標
enu_position = {
"x": world_x, # 東 (從 NED 的 x)
"y": world_y, # 北 (從 NED 的 y)
"z": -world_z # 上 (從 NED 的 -z)
}
enu_velocity = {
"vx": msg.vy, # 東向速度
"vy": msg.vx, # 北向速度
"vz": -msg.vz # 上向速度
}
# 創建JSON對象 (使用世界座標)
json_data = {
"packet_id": packet_counter,
"timestamp": msg.time_boot_ms,
"enemy_position": enu_position,
"enemy_velocity": enu_velocity
}
# 寫入日誌文件 (CSV格式)
try:
log_file.write(f"{packet_counter},{msg.time_boot_ms},{enu_position['x']},{enu_position['y']},{enu_position['z']},{enu_velocity['vx']},{enu_velocity['vy']},{enu_velocity['vz']}\n")
log_file.flush() # 確保數據即時寫入文件
except Exception as e:
print(f"寫入日誌失敗: {e}")
json_str = json.dumps(json_data)
try:
sock.sendto(json_str.encode('utf-8'), (target_ip, target_port))
except Exception as e:
print(f"傳送數據失敗: {e}")
time.sleep(0.1)
except Exception as e:
print(f"數據傳輸線程錯誤: {e}")
finally:
# 確保關閉日誌文件
log_file.close()
print("日誌文件已關閉")
# 創建並啟動數據發送線程
sender_thread = threading.Thread(target=data_sending_thread)
sender_thread.daemon = True
sender_thread.start()
# 啟動命令行控制界面
try:
control_interface()
except KeyboardInterrupt:
print("\n程序已被用戶中斷")
finally:
if connection:
connection.mav.request_data_stream_send(
connection.target_system,
connection.target_component,
mavutil.mavlink.MAV_DATA_STREAM_ALL,
0, 0
)
print("數據流已停止")
sock.close()
print("網絡連接已關閉")
print(f"總共傳送了 {packet_counter} 個封包")
print("日誌文件已儲存")