新增 GUI 資料夾與相關功能

chiyu
wenchun 3 months ago
parent 303dfaf04a
commit 317483a5c0

@ -0,0 +1,398 @@
#!/usr/bin/env python3
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QLineEdit)
from PyQt6.QtCore import pyqtSignal
class CommPanel(QWidget):
"""通讯设置面板类"""
# 定义信号
udp_connection_added = pyqtSignal(str, int) # ip, port
udp_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label
udp_connection_removed = pyqtSignal(dict, QWidget) # conn, panel
ws_connection_added = pyqtSignal(str) # url
ws_connection_toggled = pyqtSignal(dict, QPushButton, QLabel) # conn, btn, status_label
ws_connection_removed = pyqtSignal(dict, QWidget) # conn, panel
status_message = pyqtSignal(str, int) # message, timeout
def __init__(self, parent=None):
super().__init__(parent)
self.udp_connections = []
self.ws_connections = []
self._init_ui()
def _init_ui(self):
"""初始化UI"""
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(10)
# ========== UDP MAVLink 區域 ==========
udp_title = QLabel("UDP")
udp_title.setStyleSheet("""
color: #DDD;
font-size: 14px;
font-weight: bold;
padding: 5px;
background-color: #333;
border-radius: 4px;
""")
layout.addWidget(udp_title)
# UDP 連接列表容器
self.udp_list_widget = QWidget()
self.udp_list_layout = QVBoxLayout(self.udp_list_widget)
self.udp_list_layout.setContentsMargins(0, 0, 0, 0)
self.udp_list_layout.setSpacing(5)
layout.addWidget(self.udp_list_widget)
# UDP 添加新連接區域
add_udp_widget = QWidget()
add_udp_layout = QHBoxLayout(add_udp_widget)
add_udp_layout.setContentsMargins(0, 0, 0, 0)
self.udp_ip_input = QLineEdit()
self.udp_ip_input.setText("127.0.0.1")
self.udp_ip_input.setPlaceholderText("IP")
self.udp_ip_input.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
""")
self.udp_port_input = QLineEdit()
self.udp_port_input.setText("14540")
self.udp_port_input.setPlaceholderText("Port")
self.udp_port_input.setFixedWidth(80)
self.udp_port_input.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
""")
add_udp_btn = QPushButton("添加")
add_udp_btn.clicked.connect(self._handle_add_udp)
add_udp_btn.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 6px 12px;
border-radius: 4px;
min-width: 30px;
}
QPushButton:hover { background-color: #45a049; }
""")
add_udp_layout.addWidget(QLabel("IP:", styleSheet="color: #DDD;"))
add_udp_layout.addWidget(self.udp_ip_input)
add_udp_layout.addWidget(QLabel("Port:", styleSheet="color: #DDD;"))
add_udp_layout.addWidget(self.udp_port_input)
add_udp_layout.addWidget(add_udp_btn)
layout.addWidget(add_udp_widget)
# 分隔線
separator = QWidget()
separator.setFixedHeight(20)
layout.addWidget(separator)
# ========== WebSocket 區域 ==========
ws_title = QLabel("WebSocket")
ws_title.setStyleSheet("""
color: #DDD;
font-size: 14px;
font-weight: bold;
padding: 5px;
background-color: #333;
border-radius: 4px;
""")
layout.addWidget(ws_title)
# WebSocket 連接列表容器
self.ws_list_widget = QWidget()
self.ws_list_layout = QVBoxLayout(self.ws_list_widget)
self.ws_list_layout.setContentsMargins(0, 0, 0, 0)
self.ws_list_layout.setSpacing(5)
layout.addWidget(self.ws_list_widget)
# WebSocket 添加新連接區域
add_ws_widget = QWidget()
add_ws_layout = QHBoxLayout(add_ws_widget)
add_ws_layout.setContentsMargins(0, 0, 0, 0)
self.ws_url_input = QLineEdit()
self.ws_url_input.setPlaceholderText("host")
self.ws_url_input.setStyleSheet("""
QLineEdit {
background-color: #333;
color: #DDD;
border: 1px solid #555;
border-radius: 4px;
padding: 5px;
}
""")
add_ws_btn = QPushButton("添加")
add_ws_btn.clicked.connect(self._handle_add_ws)
add_ws_btn.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border: none;
padding: 6px 12px;
border-radius: 4px;
min-width: 30px;
}
QPushButton:hover { background-color: #45a049; }
""")
add_ws_layout.addWidget(QLabel("URL:", styleSheet="color: #DDD;"))
add_ws_layout.addWidget(self.ws_url_input)
add_ws_layout.addWidget(add_ws_btn)
layout.addWidget(add_ws_widget)
layout.addStretch()
def _handle_add_udp(self):
"""處理添加 UDP 連接"""
ip = self.udp_ip_input.text().strip()
port_text = self.udp_port_input.text().strip()
if not ip or not port_text:
self.status_message.emit("請輸入 IP 和 Port", 3000)
return
try:
port = int(port_text)
if port < 1 or port > 65535:
raise ValueError("Port 超出範圍")
except ValueError:
self.status_message.emit("Port 必須是 1-65535 的數字", 3000)
return
# 檢查是否已存在相同連接
for conn in self.udp_connections:
if conn['ip'] == ip and conn['port'] == port:
self.status_message.emit("連接已存在", 3000)
return
# 發送信號通知主窗口
self.udp_connection_added.emit(ip, port)
# 清空輸入框
self.udp_ip_input.clear()
self.udp_port_input.clear()
def _handle_add_ws(self):
"""處理添加 WebSocket 連接"""
input_url = self.ws_url_input.text().strip()
if not input_url:
self.status_message.emit("請輸入 WebSocket URL", 3000)
return
# 自動添加 ws:// 前綴
if not input_url.startswith('ws://') and not input_url.startswith('wss://'):
url = f'ws://{input_url}'
else:
url = input_url
# 基本 URL 格式驗證
try:
if '://' in url:
parts = url.split('://', 1)
if len(parts) == 2 and ':' not in parts[1]:
self.status_message.emit("URL 格式錯誤,需要包含端口號 (例如: 127.0.0.1:8756)", 3000)
return
except:
self.status_message.emit("URL 格式錯誤", 3000)
return
# 檢查是否已存在相同連接
for conn in self.ws_connections:
if conn['url'] == url:
self.status_message.emit("連接已存在", 3000)
return
# 發送信號通知主窗口
self.ws_connection_added.emit(url)
# 清空輸入框
self.ws_url_input.clear()
def create_udp_connection_panel(self, conn):
"""創建 UDP 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
padding: 8px;
border: 1px solid #444;
}
""")
layout = QHBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['ip']}:{conn['port']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.udp_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #555; }
""")
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.udp_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
QPushButton {
background-color: #d32f2f;
color: white;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #b71c1c; }
""")
layout.addWidget(status_label)
layout.addWidget(info_label)
layout.addStretch()
layout.addWidget(toggle_btn)
layout.addWidget(remove_btn)
# 儲存引用
panel.connection = conn
panel.toggle_btn = toggle_btn
panel.status_label = status_label
return panel
def create_ws_connection_panel(self, conn):
"""創建 WebSocket 連接面板"""
panel = QWidget()
panel.setStyleSheet("""
QWidget {
background-color: #2A2A2A;
border-radius: 6px;
padding: 8px;
border: 1px solid #444;
}
""")
layout = QHBoxLayout(panel)
layout.setContentsMargins(8, 8, 8, 8)
# 連接資訊
info_label = QLabel(f"{conn['name']} - {conn['url']}")
info_label.setStyleSheet("color: #DDD; font-size: 12px;")
# 狀態指示器
status_label = QLabel("")
if conn.get('enabled', False):
status_label.setStyleSheet("color: #4CAF50; font-size: 16px;")
status_label.setToolTip("運行中")
else:
status_label.setStyleSheet("color: #888; font-size: 16px;")
status_label.setToolTip("已停止")
# 控制按鈕
toggle_btn = QPushButton("停止" if conn.get('enabled', False) else "啟動")
toggle_btn.setFixedWidth(60)
toggle_btn.clicked.connect(lambda: self.ws_connection_toggled.emit(conn, toggle_btn, status_label))
toggle_btn.setStyleSheet("""
QPushButton {
background-color: #444;
color: #DDD;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #555; }
""")
remove_btn = QPushButton("移除")
remove_btn.setFixedWidth(60)
remove_btn.clicked.connect(lambda: self.ws_connection_removed.emit(conn, panel))
remove_btn.setStyleSheet("""
QPushButton {
background-color: #d32f2f;
color: white;
border: none;
padding: 4px 8px;
border-radius: 3px;
font-size: 11px;
}
QPushButton:hover { background-color: #b71c1c; }
""")
layout.addWidget(status_label)
layout.addWidget(info_label)
layout.addStretch()
layout.addWidget(toggle_btn)
layout.addWidget(remove_btn)
# 儲存引用
panel.connection = conn
panel.toggle_btn = toggle_btn
panel.status_label = status_label
return panel
def add_udp_panel(self, conn):
"""添加 UDP 連接面板到列表"""
panel = self.create_udp_connection_panel(conn)
self.udp_list_layout.addWidget(panel)
self.udp_connections.append(conn)
return panel
def add_ws_panel(self, conn):
"""添加 WebSocket 連接面板到列表"""
panel = self.create_ws_connection_panel(conn)
self.ws_list_layout.addWidget(panel)
self.ws_connections.append(conn)
return panel
def remove_udp_connection_from_list(self, conn):
"""從列表中移除 UDP 連接"""
if conn in self.udp_connections:
self.udp_connections.remove(conn)
def remove_ws_connection_from_list(self, conn):
"""從列表中移除 WebSocket 連接"""
if conn in self.ws_connections:
self.ws_connections.remove(conn)

@ -0,0 +1,662 @@
from rclpy.node import Node
from PyQt6.QtCore import QObject, pyqtSignal
import math
import re
import threading
from threading import Lock
import asyncio
import websockets
import json
import socket
from pymavlink import mavutil
from geometry_msgs.msg import Point, Vector3
from sensor_msgs.msg import BatteryState, NavSatFix, Imu
from std_msgs.msg import Float64
from mavros_msgs.msg import State, VfrHud
from mavros_msgs.srv import CommandBool, CommandTOL
class DroneSignals(QObject):
update_signal = pyqtSignal(str, str, object) # (msg_type, drone_id, data)
class UDPMavlinkReceiver(threading.Thread):
"""UDP MAVLink 接收器"""
def __init__(self, ip, port, signals, connection_name):
super().__init__(daemon=True)
self.ip = ip
self.port = port
self.signals = signals
self.connection_name = connection_name
self.running = False
self.sock = None
def run(self):
"""執行 UDP 接收循環"""
self.running = True
try:
print(f"UDP MAVLink receiver started on {self.ip}:{self.port}")
# 創建 MAVLink 連接
mav = mavutil.mavlink_connection(f'udpin:{self.ip}:{self.port}')
while self.running:
try:
msg = mav.recv_match(blocking=True, timeout=1.0)
if msg is None:
continue
self.process_mavlink_message(msg)
except socket.timeout:
continue
except Exception as e:
print(f"Error receiving MAVLink message: {e}")
except Exception as e:
print(f"UDP receiver error: {e}")
finally:
if self.sock:
self.sock.close()
def process_mavlink_message(self, msg):
"""處理 MAVLink 訊息"""
try:
msg_type = msg.get_type()
system_id = msg.get_srcSystem()
drone_id = f"s8_{system_id}" # 使用 s8_ 前綴表示 UDP 來源
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
armed = bool(msg.base_mode & 128)
self.signals.update_signal.emit('state', drone_id, {
'mode': mode,
'armed': armed
})
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.signals.update_signal.emit('battery', drone_id, {
'voltage': voltage
})
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.signals.update_signal.emit('gps', drone_id, {
'lat': latitude,
'lon': longitude
})
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.signals.update_signal.emit('local_pose', drone_id, {
'x': x,
'y': y,
'z': z
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': z
})
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.vx,
'vy': msg.vy,
'vz': msg.vz
})
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.signals.update_signal.emit('attitude', drone_id, {
'pitch': pitch,
'roll': 0,
'yaw': 0,
'rates': (0, 0, 0)
})
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.signals.update_signal.emit('hud', drone_id, {
'heading': heading,
'groundspeed': groundspeed
})
except Exception as e:
print(f"Error processing MAVLink message: {e}")
def stop(self):
"""停止接收器"""
self.running = False
class SerialMavlinkReceiver(threading.Thread):
"""串口 MAVLink 接收器"""
def __init__(self, port, baudrate, signals, connection_name):
super().__init__(daemon=True)
self.port = port
self.baudrate = baudrate
self.signals = signals
self.connection_name = connection_name
self.running = False
self.mav = None
def run(self):
"""執行串口接收循環"""
self.running = True
try:
print(f"Serial MAVLink receiver started on {self.port} at {self.baudrate} baud")
# 創建 MAVLink 串口連接
self.mav = mavutil.mavlink_connection(
self.port,
baud=self.baudrate,
source_system=255
)
print(f"Waiting for heartbeat from {self.port}...")
self.mav.wait_heartbeat()
print(f"Heartbeat received from system {self.mav.target_system}, component {self.mav.target_component}")
while self.running:
try:
msg = self.mav.recv_match(blocking=True, timeout=1.0)
if msg is None:
continue
self.process_mavlink_message(msg)
except Exception as e:
if self.running:
print(f"Error receiving MAVLink message from serial: {e}")
except Exception as e:
print(f"Serial receiver error: {e}")
finally:
if self.mav:
try:
self.mav.close()
except:
pass
def process_mavlink_message(self, msg):
"""處理 MAVLink 訊息"""
try:
msg_type = msg.get_type()
system_id = msg.get_srcSystem()
drone_id = f"s5_{system_id}" # 使用 serial_ 前綴表示串口來源
if msg_type == "HEARTBEAT":
mode = mavutil.mode_string_v10(msg)
armed = bool(msg.base_mode & 128)
self.signals.update_signal.emit('state', drone_id, {
'mode': mode,
'armed': armed
})
elif msg_type == "BATTERY_STATUS":
voltage = msg.voltages[0] / 1000
self.signals.update_signal.emit('battery', drone_id, {
'voltage': voltage
})
elif msg_type == "GLOBAL_POSITION_INT":
latitude = msg.lat / 1e7
longitude = msg.lon / 1e7
self.signals.update_signal.emit('gps', drone_id, {
'lat': latitude,
'lon': longitude
})
elif msg_type == "GPS_RAW_INT":
fix_type = msg.fix_type
elif msg_type == "LOCAL_POSITION_NED":
x = msg.y
y = msg.x
z = -msg.z
self.signals.update_signal.emit('local_pose', drone_id, {
'x': x,
'y': y,
'z': z
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': z
})
self.signals.update_signal.emit('velocity', drone_id, {
'vx': msg.vx,
'vy': msg.vy,
'vz': msg.vz
})
elif msg_type == "ATTITUDE":
pitch = math.degrees(msg.pitch)
self.signals.update_signal.emit('attitude', drone_id, {
'pitch': pitch,
'roll': 0,
'yaw': 0,
'rates': (0, 0, 0)
})
elif msg_type == "VFR_HUD":
groundspeed = msg.groundspeed
heading = msg.heading
self.signals.update_signal.emit('hud', drone_id, {
'heading': heading,
'groundspeed': groundspeed
})
except Exception as e:
print(f"Error processing MAVLink message from serial: {e}")
def stop(self):
"""停止接收器"""
self.running = False
class WebSocketMavlinkReceiver(threading.Thread):
"""WebSocket MAVLink 接收器"""
def __init__(self, url, signals, connection_name):
super().__init__(daemon=True)
self.url = url
self.signals = signals
self.connection_name = connection_name
self.running = False
self.max_retries = 5
self.base_delay = 1.0
def run(self):
"""執行 WebSocket 接收循環"""
self.running = True
asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.get_event_loop().run_until_complete(self.ws_client_loop())
async def ws_client_loop(self):
"""WebSocket 連接的主循環"""
retry_count = 0
print(f"Starting WebSocket client for {self.connection_name} at {self.url}")
while self.running and retry_count < self.max_retries:
try:
async with websockets.connect(self.url) as websocket:
print(f"WebSocket {self.connection_name} connected to {self.url}")
retry_count = 0 # 重置重試計數
async for message in websocket:
if not self.running:
break
try:
data = json.loads(message)
data['_connection_source'] = self.connection_name
self.process_websocket_message(data)
except json.JSONDecodeError as e:
print(f"WebSocket {self.connection_name} JSON decode error: {e}")
except Exception as e:
print(f"WebSocket {self.connection_name} message processing error: {e}")
except websockets.exceptions.ConnectionClosedError:
print(f"WebSocket {self.connection_name} connection closed")
if self.running:
retry_count += 1
if retry_count < self.max_retries:
delay = self.base_delay * (2 ** min(retry_count, 4))
print(f"Reconnecting in {delay}s...")
await asyncio.sleep(delay)
else:
break
except Exception as e:
retry_count += 1
if retry_count < self.max_retries and self.running:
delay = self.base_delay * (2 ** min(retry_count, 4))
print(f"WebSocket {self.connection_name} connection error: {e}, retrying in {delay}s (attempt {retry_count}/{self.max_retries})")
await asyncio.sleep(delay)
else:
break
print(f"WebSocket client {self.connection_name} stopped")
def process_websocket_message(self, data):
"""處理 WebSocket 訊息"""
try:
drone_id = data.get('system_id')
drone_id = f"s9_{drone_id}" if drone_id else None
if not drone_id:
return
# 模式
if 'mode' in data:
self.signals.update_signal.emit('state', drone_id, {
'mode': data['mode'],
})
# 電池
if 'battery' in data:
self.signals.update_signal.emit('battery', drone_id, {
'percentage': data['battery']
})
# 位置
if 'position' in data:
pos = data['position']
self.signals.update_signal.emit('gps', drone_id, {
'lat': pos.get('lat', 0),
'lon': pos.get('lon', 0)
})
# Local position - 設定 x, y 為 0.0
self.signals.update_signal.emit('local_pose', drone_id, {
'x': 0.0,
'y': 0.0,
'z': 0.0
})
# Altitude - 設定為 0.0
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': 0.0
})
# 航向
if 'heading' in data:
self.signals.update_signal.emit('hud', drone_id, {
'heading': data['heading'],
'groundspeed': 0.0
})
except Exception as e:
print(f"WebSocket message processing error: {e}")
def stop(self):
"""停止接收器"""
self.running = False
class DroneMonitor(Node):
def __init__(self):
super().__init__('drone_monitor')
self.signals = DroneSignals()
self.drone_topics = {}
self.lock = Lock()
self.arm_clients = {}
self.takeoff_clients = {}
self.setpoint_pubs = {}
self.selected_drones = set()
self.latest_data = {}
# 定義需要過濾的模式
self.filtered_modes = ['Mode(0x000000c0)']
# WebSocket 接收器列表
self.ws_receivers = []
# 串口接收器列表
# ================================================================================
# 【新增】儲存 GPS 資料的字典
# ================================================================================
self.drone_gps = {} # {drone_id: {'lat': ..., 'lon': ..., 'alt': ...}}
# ================================================================================
self.serial_receivers = []
# 主题检测定时器
self.create_timer(1.0, self.scan_topics)
def scan_topics(self):
topics = self.get_topic_names_and_types()
drone_pattern = re.compile(r'/MavLinkBus/(s\d+_\d+)/(\w+)')
found_drones = set()
for topic_name, _ in topics:
if match := drone_pattern.match(topic_name):
drone_id, topic_type = match.groups()
found_drones.add(drone_id)
with self.lock:
self.drone_topics.setdefault(drone_id, set()).add(topic_type)
for drone_id in found_drones:
if not hasattr(self, f'drone_{drone_id}_subs'):
self.setup_drone(drone_id)
def setup_drone(self, drone_id):
base_topic = f'/MavLinkBus/{drone_id}'
# Add service clients
self.arm_clients[drone_id] = self.create_client(
CommandBool,
f'{base_topic}/cmd/arming'
)
self.takeoff_clients[drone_id] = self.create_client(
CommandTOL,
f'{base_topic}/cmd/takeoff'
)
# Add setpoint publisher
self.setpoint_pubs[drone_id] = self.create_publisher(
Point,
f'{base_topic}/setpoint_position/local',
10
)
subs = {
'attitude': self.create_subscription(
Imu,
f'{base_topic}/attitude',
lambda msg, did=drone_id: self.attitude_callback(did, msg),
10
),
'battery': self.create_subscription(
BatteryState,
f'{base_topic}/battery',
lambda msg, did=drone_id: self.battery_callback(did, msg),
10
),
'global': self.create_subscription(
NavSatFix,
f'{base_topic}/global_position/global',
lambda msg, did=drone_id: self.gps_callback(did, msg),
10
),
'rel_alt': self.create_subscription(
Float64,
f'{base_topic}/global_position/rel_alt',
lambda msg, did=drone_id: self.altitude_callback(did, msg),
10
),
'local_pose': self.create_subscription(
Point,
f'{base_topic}/local_position/pose',
lambda msg, did=drone_id: self.local_pose_callback(did, msg),
10
),
'local_vel': self.create_subscription(
Vector3,
f'{base_topic}/local_position/velocity',
lambda msg, did=drone_id: self.local_vel_callback(did, msg),
10
),
'loss_rate': self.create_subscription(
Float64,
f'{base_topic}/packet_loss_rate',
lambda msg, did=drone_id: self.loss_rate_callback(did, msg),
10
),
'state': self.create_subscription(
State,
f'{base_topic}/state',
lambda msg, did=drone_id: self.state_callback(did, msg),
10
),
'ping': self.create_subscription(
Float64,
f'{base_topic}/ping',
lambda msg, did=drone_id: self.ping_callback(did, msg),
10
),
'vfr_hud': self.create_subscription(
VfrHud,
f'{base_topic}/vfr_hud',
lambda msg, did=drone_id: self.hud_callback(did, msg),
10
)
}
setattr(self, f'drone_{drone_id}_subs', subs)
async def arm_drone(self, drone_id, arm):
if drone_id not in self.arm_clients:
return False
client = self.arm_clients[drone_id]
if not client.wait_for_service(timeout_sec=1.0):
return False
request = CommandBool.Request()
request.value = arm
future = client.call_async(request)
try:
response = await future
return response.success
except Exception as e:
self.get_logger().error(f'Arm service call failed: {e}')
return False
async def takeoff_drone(self, drone_id, altitude=10.0):
if drone_id not in self.takeoff_clients:
return False
client = self.takeoff_clients[drone_id]
if not client.wait_for_service(timeout_sec=1.0):
return False
request = CommandTOL.Request()
request.altitude = altitude
request.min_pitch = 0.0
request.yaw = 0.0
future = client.call_async(request)
try:
response = await future
return response.success
except Exception as e:
self.get_logger().error(f'Takeoff service call failed: {e}')
return False
def send_setpoint(self, drone_id, x, y, z):
"""Send setpoint position command"""
if drone_id not in self.setpoint_pubs:
return False
msg = Point()
msg.x = float(x)
msg.y = float(y)
msg.z = float(z)
self.setpoint_pubs[drone_id].publish(msg)
return True
def quaternion_to_euler(self, q):
sinr_cosp = 2 * (q.w * q.x + q.y * q.z)
cosr_cosp = 1 - 2 * (q.x**2 + q.y**2)
roll = math.atan2(sinr_cosp, cosr_cosp)
sinp = 2 * (q.w * q.y - q.z * q.x)
pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi/2, sinp)
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1 - 2 * (q.y**2 + q.z**2)
yaw = math.atan2(siny_cosp, cosy_cosp)
return math.degrees(roll), math.degrees(pitch), math.degrees(yaw)
# callbacks
def attitude_callback(self, drone_id, msg):
if hasattr(msg, 'orientation'):
roll, pitch, yaw = self.quaternion_to_euler(msg.orientation)
self.latest_data[(drone_id, 'attitude')] = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (msg.angular_velocity.x,
msg.angular_velocity.y,
msg.angular_velocity.z)
}
def battery_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'battery')] = {
'voltage': msg.voltage
}
def state_callback(self, drone_id, msg):
mode = msg.mode
if mode in self.filtered_modes:
return
self.latest_data[(drone_id, 'state')] = {
'mode': msg.mode,
'armed': msg.armed
}
def gps_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'gps')] = {
'lat': msg.latitude,
'lon': msg.longitude,
'alt': msg.altitude
}
# ================================================================================
# 【新增】儲存 GPS 資料到 drone_gps 字典
# ================================================================================
self.drone_gps[drone_id] = {
'lat': msg.latitude,
'lon': msg.longitude,
'alt': msg.altitude
}
# ================================================================================
def local_vel_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'velocity')] = {
'vx': msg.x,
'vy': msg.y,
'vz': msg.z
}
def altitude_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'altitude')] = {
'altitude': msg.data
}
def local_pose_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'local_pose')] = {
'x': msg.x,
'y': msg.y,
'z': msg.z
}
def hud_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'hud')] = {
'airspeed': msg.airspeed,
'groundspeed': msg.groundspeed,
'heading': msg.heading,
'throttle': msg.throttle,
'alt': msg.altitude,
'climb': msg.climb
}
def loss_rate_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'loss_rate')] = {
'loss_rate': msg.data
}
def ping_callback(self, drone_id, msg):
self.latest_data[(drone_id, 'ping')] = {
'ping': msg.data
}
def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=57600):
"""啟動串口 MAVLink 連接"""
connection_name = f"Serial_{port.replace('/', '_')}"
receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name)
receiver.start()
self.serial_receivers.append(receiver)
print(f"Started serial connection on {port} at {baudrate} baud")
return receiver

@ -0,0 +1,378 @@
#!/usr/bin/env python3
from PyQt6.QtWidgets import (QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QPushButton, QSizePolicy, QCheckBox)
from PyQt6.QtCore import pyqtSignal
class DronePanel(QWidget):
"""單個無人機面板類別"""
# 定義信號
mode_change_requested = pyqtSignal(str) # drone_id
arm_requested = pyqtSignal(str) # drone_id
takeoff_requested = pyqtSignal(str) # drone_id
setpoint_requested = pyqtSignal(str) # drone_id
selection_changed = pyqtSignal(str, int) # drone_id, state
def __init__(self, drone_id, parent=None):
super().__init__(parent)
self.drone_id = drone_id
self.display_id = 's' + drone_id.split('_')[1]
self._init_ui()
def _init_ui(self):
"""初始化UI"""
self.setObjectName(f"panel_{self.drone_id}")
self.setFixedHeight(140)
self.setStyleSheet("""
background-color: #2A2A2A;
border-radius: 8px;
""")
# 主佈局
main_layout = QHBoxLayout(self)
main_layout.setContentsMargins(8, 8, 8, 8)
main_layout.setSpacing(0)
# 創建內容容器(包含 info 和 control
content_widget = QWidget()
content_widget.setStyleSheet("background-color: #333; border-radius: 6px;")
content_layout = QHBoxLayout(content_widget)
content_layout.setContentsMargins(8, 8, 8, 8)
content_layout.setSpacing(8)
# 左側資訊區域
info_widget = self._create_info_section()
# 右側控制按鈕區域
control_widget = self._create_control_section()
# 將 info 和 control 加入內容容器
content_layout.addWidget(info_widget)
content_layout.addWidget(control_widget)
# 將內容容器加入主佈局
main_layout.addWidget(content_widget)
def _create_info_section(self):
"""創建資訊顯示區域"""
info_widget = QWidget()
info_layout = QVBoxLayout(info_widget)
info_layout.setContentsMargins(0, 0, 0, 0)
info_layout.setSpacing(4)
# 頂部標題欄
header = QWidget()
header_layout = QHBoxLayout(header)
header_layout.setContentsMargins(0, 0, 0, 0)
# 勾選框
self.checkbox = QCheckBox()
self.checkbox.setObjectName(f"{self.drone_id}_checkbox")
self.checkbox.setStyleSheet("QCheckBox { color: #DDD; }")
self.checkbox.stateChanged.connect(
lambda state: self.selection_changed.emit(self.drone_id, state)
)
# ID 顯示
id_label = QLabel(self.display_id)
id_label.setStyleSheet("""
font-weight: bold;
font-size: 14px;
color: #7FFFD4;
min-width: 80px;
""")
header_layout.addWidget(self.checkbox)
header_layout.addWidget(id_label)
header_layout.addStretch()
info_layout.addWidget(header)
# 第一行:狀態 (模式 + ARM狀態)
status_row = self._create_status_row()
info_layout.addWidget(status_row)
# 第二行:電池
battery_row = self._create_battery_row()
info_layout.addWidget(battery_row)
# 第三行:位置 + 高度
position_row = self._create_position_row()
info_layout.addWidget(position_row)
# 第四行:航向 + 速度
nav_row = self._create_nav_row()
info_layout.addWidget(nav_row)
return info_widget
def _create_status_row(self):
"""創建狀態行"""
status_row = QWidget()
status_layout = QHBoxLayout(status_row)
status_layout.setContentsMargins(0, 0, 0, 0)
status_title = QLabel("狀態:")
status_title.setStyleSheet("color: #888; min-width: 50px;")
self.mode_label = QLabel("--")
self.mode_label.setObjectName(f"{self.drone_id}_mode")
self.mode_label.setStyleSheet("color: #DDD;")
self.armed_label = QLabel("--")
self.armed_label.setObjectName(f"{self.drone_id}_armed")
self.armed_label.setStyleSheet("color: #DDD;")
status_layout.addWidget(status_title)
status_layout.addWidget(self.mode_label)
status_layout.addWidget(self.armed_label)
status_layout.addStretch()
return status_row
def _create_battery_row(self):
"""創建電池行"""
battery_row = QWidget()
battery_layout = QHBoxLayout(battery_row)
battery_layout.setContentsMargins(0, 0, 0, 0)
battery_title = QLabel("電池:")
battery_title.setStyleSheet("color: #888; min-width: 50px;")
self.battery_label = QLabel("--")
self.battery_label.setObjectName(f"{self.drone_id}_battery")
self.battery_label.setStyleSheet("color: #DDD;")
battery_layout.addWidget(battery_title)
battery_layout.addWidget(self.battery_label)
battery_layout.addStretch()
return battery_row
def _create_position_row(self):
"""創建位置行"""
position_row = QWidget()
position_layout = QHBoxLayout(position_row)
position_layout.setContentsMargins(0, 0, 0, 0)
position_title = QLabel("位置:")
position_title.setStyleSheet("color: #888; min-width: 50px;")
self.local_label = QLabel("--")
self.local_label.setObjectName(f"{self.drone_id}_local")
self.local_label.setStyleSheet("color: #DDD;")
altitude_title = QLabel("高度:")
altitude_title.setStyleSheet("color: #888; margin-left: 10px;")
self.altitude_label = QLabel("--")
self.altitude_label.setObjectName(f"{self.drone_id}_altitude")
self.altitude_label.setStyleSheet("color: #DDD;")
position_layout.addWidget(position_title)
position_layout.addWidget(self.local_label)
position_layout.addWidget(altitude_title)
position_layout.addWidget(self.altitude_label)
position_layout.addStretch()
return position_row
def _create_nav_row(self):
"""創建導航行"""
nav_row = QWidget()
nav_layout = QHBoxLayout(nav_row)
nav_layout.setContentsMargins(0, 0, 0, 0)
heading_title = QLabel("航向:")
heading_title.setStyleSheet("color: #888; min-width: 50px;")
self.heading_label = QLabel("--")
self.heading_label.setObjectName(f"{self.drone_id}_heading")
self.heading_label.setStyleSheet("color: #DDD;")
speed_title = QLabel("速度:")
speed_title.setStyleSheet("color: #888; margin-left: 10px;")
self.groundspeed_label = QLabel("--")
self.groundspeed_label.setObjectName(f"{self.drone_id}_groundspeed")
self.groundspeed_label.setStyleSheet("color: #DDD;")
nav_layout.addWidget(heading_title)
nav_layout.addWidget(self.heading_label)
nav_layout.addWidget(speed_title)
nav_layout.addWidget(self.groundspeed_label)
nav_layout.addStretch()
return nav_row
def _create_control_section(self):
"""創建控制按鈕區域"""
control_widget = QWidget()
control_layout = QVBoxLayout(control_widget)
control_layout.setContentsMargins(0, 0, 0, 0)
control_layout.setSpacing(6)
control_widget.setFixedWidth(80)
btn_style = """
QPushButton {
background-color: #444;
color: #DDD;
border: none;
border-radius: 4px;
font-size: 11px;
}
QPushButton:hover {
background-color: #555;
}
"""
# 模式切換按鈕
mode_btn = QPushButton("切換模式")
mode_btn.setStyleSheet(btn_style)
mode_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding)
mode_btn.clicked.connect(lambda: self.mode_change_requested.emit(self.drone_id))
# 解鎖按鈕
arm_btn = QPushButton("解鎖")
arm_btn.setStyleSheet(btn_style)
arm_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding)
arm_btn.clicked.connect(lambda: self.arm_requested.emit(self.drone_id))
# 起飛按鈕
takeoff_btn = QPushButton("起飛")
takeoff_btn.setStyleSheet(btn_style)
takeoff_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding)
takeoff_btn.clicked.connect(lambda: self.takeoff_requested.emit(self.drone_id))
# Setpoint 按鈕
setpoint_btn = QPushButton("Setpoint")
setpoint_btn.setStyleSheet(btn_style)
setpoint_btn.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Expanding)
setpoint_btn.clicked.connect(lambda: self.setpoint_requested.emit(self.drone_id))
control_layout.addWidget(mode_btn)
control_layout.addWidget(arm_btn)
control_layout.addWidget(takeoff_btn)
control_layout.addWidget(setpoint_btn)
return control_widget
def update_field(self, field, text, color=None):
"""更新指定欄位的值"""
label = self.findChild(QLabel, f"{self.drone_id}_{field}")
if label and label.text() != text:
label.setText(text)
if color:
label.setStyleSheet(f"color: {color};")
def get_checkbox(self):
"""獲取勾選框"""
return self.checkbox
def set_checked(self, checked):
"""設置勾選狀態"""
self.checkbox.setChecked(checked)
def is_checked(self):
"""獲取勾選狀態"""
return self.checkbox.isChecked()
class SocketGroupPanel(QWidget):
# 定義信號
group_selection_changed = pyqtSignal(str, int) # socket_id, state
def __init__(self, socket_id, color='#AAAAAA', parent=None):
super().__init__(parent)
self.socket_id = socket_id
self.color = color
self._init_ui()
def _init_ui(self):
"""初始化UI"""
self.setObjectName(f"socket_group_{self.socket_id}")
self.setStyleSheet("""
background-color: #1E1E1E;
border-radius: 12px;
""")
layout = QVBoxLayout(self)
layout.setContentsMargins(10, 10, 10, 10)
layout.setSpacing(6)
# Socket 分組標題行 - 包含勾選框
title_row = QWidget()
title_layout = QHBoxLayout(title_row)
title_layout.setContentsMargins(0, 0, 0, 0)
# 分組勾選框
self.group_checkbox = QCheckBox()
self.group_checkbox.setObjectName(f"socket_{self.socket_id}_checkbox")
self.group_checkbox.setStyleSheet(f"""
QCheckBox {{ color: #DDD; }}
QCheckBox::indicator {{
width: 14px;
height: 14px;
border: 2px solid #888888;
border-radius: 3px;
background: transparent;
}}
QCheckBox::indicator:checked {{
background-color: {self.color};
border: 2px solid #888888;
}}
QCheckBox::indicator:indeterminate {{
background-color: #666;
border: 2px solid #888888;
}}
""")
self.group_checkbox.stateChanged.connect(
lambda state: self.group_selection_changed.emit(self.socket_id, state)
)
# Socket 分組標題
title_label = QLabel(f"Socket {self.socket_id}")
title_label.setStyleSheet(f"""
font-weight: bold;
font-size: 16px;
color: {self.color};
margin-bottom: 8px;
padding: 4px 8px;
border-radius: 6px;
""")
title_layout.addWidget(self.group_checkbox)
title_layout.addWidget(title_label)
title_layout.addStretch()
layout.addWidget(title_row)
# 創建子容器用於放置該 socket 下的所有無人機面板
self.drones_container = QWidget()
self.drones_layout = QVBoxLayout(self.drones_container)
self.drones_layout.setContentsMargins(0, 0, 0, 0)
self.drones_layout.setSpacing(4)
layout.addWidget(self.drones_container)
def add_drone_panel(self, panel):
"""添加無人機面板到分組"""
self.drones_layout.addWidget(panel)
def clear_drones(self):
"""清空所有無人機面板"""
while self.drones_layout.count():
item = self.drones_layout.takeAt(0)
if item.widget():
item.widget().setParent(None)
def get_checkbox(self):
"""獲取分組勾選框"""
return self.group_checkbox
def set_checked(self, checked):
"""設置分組勾選狀態"""
self.group_checkbox.setChecked(checked)
def set_check_state(self, state):
"""設置分組勾選狀態(支持半選)"""
self.group_checkbox.setCheckState(state)

File diff suppressed because it is too large Load Diff

@ -0,0 +1,423 @@
#!/usr/bin/env python3
from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import QTimer, pyqtSignal, QObject, pyqtSlot
from PyQt6.QtWebChannel import QWebChannel
class DroneMap:
"""無人機地圖類別 - 負責管理 Leaflet 地圖顯示"""
def __init__(self):
"""初始化地圖"""
self.map_view = QWebEngineView()
self.map_loaded = False
self.pending_map_updates = {}
# 創建橋接對象
self.bridge = MapBridge()
# 設置 QWebChannel
self.channel = QWebChannel()
self.channel.registerObject('bridge', self.bridge)
self.map_view.page().setWebChannel(self.channel)
# 設置地圖 HTML
inline_html = '''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<link rel="stylesheet" href="https://unpkg.com/leaflet/dist/leaflet.css" />
<script src="https://unpkg.com/leaflet/dist/leaflet.js"></script>
<script src="https://unpkg.com/leaflet-rotatedmarker/leaflet.rotatedMarker.js"></script>
<script src="qrc:///qtwebchannel/qwebchannel.js"></script>
<style>
html, body, #map { height: 100%; margin: 0; }
.map-controls {
position: absolute;
top: 10px;
right: 10px;
z-index: 1000;
}
.control-button {
padding: 8px 12px;
background-color: #f44336;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 12px;
box-shadow: 0 2px 5px rgba(0,0,0,0.2);
}
.control-button:hover {
background-color: #d32f2f;
}
</style>
</head>
<body>
<div id="map"></div>
<div class="map-controls">
<button class="control-button" onclick="clearAllTrajectories()">清除軌跡</button>
</div>
<script>
var bridge;
new QWebChannel(qt.webChannelTransport, function(channel) {
bridge = channel.objects.bridge;
});
var map = L.map('map').setView([0, 0], 19);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
maxZoom: 19,
attribution: '© OpenStreetMap contributors'
}).addTo(map);
// 地圖點擊事件
map.on('click', function(e) {
if (bridge) {
bridge.emitGpsSignal(e.latlng.lat, e.latlng.lng);
console.log('點擊位置:', e.latlng.lat, e.latlng.lng);
}
});
var arrowIcon = L.icon({
iconUrl: 'https://cdn-icons-png.flaticon.com/512/399/399308.png',
iconSize: [40, 40],
iconAnchor: [20, 20]
});
function getColorBySocketId(id) {
if (id.startsWith("s0_")) return "#00BFFF"; // 天藍色
if (id.startsWith("s1_")) return "#FFD700"; // 金色
if (id.startsWith("s2_")) return "#FF6969"; // 淺紅色
if (id.startsWith("s3_")) return "#FF69B4"; // 熱粉紅
if (id.startsWith("s4_")) return "#00FA9A"; // 中春綠
if (id.startsWith("s5_")) return "#9370DB"; // 中紫色 (串口)
if (id.startsWith("s6_")) return "#FFA500"; // 橙色
if (id.startsWith("s7_")) return "#20B2AA"; // 淺海綠
if (id.startsWith("s8_")) return "#7CFC00"; // 草綠色
if (id.startsWith("s9_")) return "#FF8C00"; // 深橙色
return "#AAAAAA"; // 灰色 (預設)
}
function createIdIcon(id) {
const color = getColorBySocketId(id);
const sysid = id.split('_')[1];
return L.divIcon({
className: 'drone-id',
html: `<div style="
position: relative;
left: 2px;
background-color: ${color};
color: black;
width: 16px;
height: 16px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-weight: bold;
font-size: 10px;
">${sysid}</div>`,
iconSize: [20, 20],
iconAnchor: [10, 10]
});
}
var markers = {};
var idLabels = {};
var trajectories = {};
var trajectoryLines = {};
var focusedId = null;
var initialized = false;
var trajectoryGroup = L.layerGroup().addTo(map);
// ================================================================================
// 新增任務規劃視覺化變數
// ================================================================================
var missionPlanGroup = L.layerGroup().addTo(map); // 任務規劃圖層
var centerMarker = null; // 中心點標記
var targetMarker = null; // 目標點標記
var missionLine = null; // 中心點到目標點的虛線
// ================================================================================
function initTrajectory(id) {
if (!trajectories[id]) {
trajectories[id] = [];
const color = getColorBySocketId(id);
trajectoryLines[id] = L.polyline([], {
color: color,
weight: 3,
opacity: 0.7,
smoothFactor: 1
}).addTo(trajectoryGroup);
}
}
function addTrajectoryPoint(id, lat, lon) {
initTrajectory(id);
const point = [lat, lon];
trajectories[id].push(point);
if (trajectories[id].length > 1000) {
trajectories[id].shift();
}
trajectoryLines[id].setLatLngs([...trajectories[id]]);
}
function focusOn(id) {
if (!markers[id]) return;
focusedId = id;
var latlng = markers[id].getLatLng();
map.flyTo(latlng, map.getZoom());
}
setInterval(() => {
if (focusedId && markers[focusedId]) {
var latlng = markers[focusedId].getLatLng();
map.panTo(latlng);
}
}, 1000);
function updateDrone(lat, lon, id, heading) {
if (markers[id]) {
const lastPos = markers[id].getLatLng();
const distance = lastPos.distanceTo([lat, lon]);
if (distance > 1) {
addTrajectoryPoint(id, lat, lon);
}
markers[id]
.setLatLng([lat, lon])
.setRotationAngle(heading);
idLabels[id].setLatLng([lat, lon]);
} else {
initTrajectory(id);
addTrajectoryPoint(id, lat, lon);
markers[id] = L.marker([lat, lon], {
icon: arrowIcon,
rotationAngle: heading,
rotationOrigin: 'center'
})
.on('click', function () {
focusOn(id);
})
.addTo(map);
idLabels[id] = L.marker([lat, lon], {
icon: createIdIcon(id),
zIndexOffset: 1000
})
.on('click', function() {
focusOn(id);
})
.addTo(map);
if (!initialized || id < focusedId) {
focusOn(id);
initialized = true;
}
}
}
function clearAllTrajectories() {
trajectories = {};
Object.values(trajectoryLines).forEach(line => {
trajectoryGroup.removeLayer(line);
});
trajectoryLines = {};
console.log('所有軌跡已清除');
}
// ================================================================================
// 新增任務規劃視覺化函式
// ================================================================================
function drawMissionPlan(centerLat, centerLon, targetLat, targetLon) {
// 清除舊的任務規劃標記
clearMissionPlan();
// 繪製中心點標記 "C"縮小到 20px
var centerIcon = L.divIcon({
className: 'mission-center',
html: `<div style="
background-color: #FF4444;
color: white;
width: 20px;
height: 20px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-weight: bold;
font-size: 12px;
border: 2px solid white;
box-shadow: 0 2px 5px rgba(0,0,0,0.3);
">C</div>`,
iconSize: [20, 20],
iconAnchor: [10, 10]
});
centerMarker = L.marker([centerLat, centerLon], {
icon: centerIcon,
zIndexOffset: 2000
}).addTo(missionPlanGroup);
// 繪製目標點標記 ""星星符號
var targetIcon = L.divIcon({
className: 'mission-target',
html: `<div style="
background-color: #FFD700;
color: #FF4444;
width: 20px;
height: 20px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-weight: bold;
font-size: 14px;
border: 2px solid white;
box-shadow: 0 2px 5px rgba(0,0,0,0.3);
">★</div>`,
iconSize: [20, 20],
iconAnchor: [10, 10]
});
targetMarker = L.marker([targetLat, targetLon], {
icon: targetIcon,
zIndexOffset: 2000
}).addTo(missionPlanGroup);
// 繪製中心點到目標點的虛線
missionLine = L.polyline(
[[centerLat, centerLon], [targetLat, targetLon]],
{
color: '#FF4444',
weight: 3,
opacity: 0.8,
dashArray: '10, 10', // 虛線樣式
smoothFactor: 1
}
).addTo(missionPlanGroup);
console.log('任務規劃已繪製: C(' + centerLat + ', ' + centerLon + ') -> T(' + targetLat + ', ' + targetLon + ')');
}
function clearMissionPlan() {
// 清除中心點標記
if (centerMarker) {
missionPlanGroup.removeLayer(centerMarker);
centerMarker = null;
}
// 清除目標點標記
if (targetMarker) {
missionPlanGroup.removeLayer(targetMarker);
targetMarker = null;
}
// 清除任務線
if (missionLine) {
missionPlanGroup.removeLayer(missionLine);
missionLine = null;
}
console.log('任務規劃已清除');
}
// ================================================================================
</script>
</body>
</html>
'''
self.map_view.setHtml(inline_html)
self.map_view.loadFinished.connect(self._on_map_loaded)
# 設置地圖更新計時器
self.map_update_timer = QTimer()
self.map_update_timer.timeout.connect(self.update_map_positions)
self.map_update_timer.start(200) # 每 200ms 更新一次
def _on_map_loaded(self, ok: bool):
"""地圖加載完成回調"""
if ok:
self.map_loaded = True
else:
print("⚠️ 地圖加載失敗")
def update_drone_position(self, drone_id, lat, lon, heading):
"""更新無人機位置(加入待處理隊列)"""
self.pending_map_updates[drone_id] = (lat, lon, heading)
def update_map_positions(self):
"""批量更新地圖上的無人機位置"""
if not self.map_loaded or not self.pending_map_updates:
return
# 批量執行所有待更新的位置
js_commands = []
for drone_id, (lat, lon, heading) in self.pending_map_updates.items():
js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});")
if js_commands:
combined_js = "\n".join(js_commands)
self.map_view.page().runJavaScript(combined_js)
# 清空待更新緩存
self.pending_map_updates.clear()
def clear_trajectories(self):
"""清除所有軌跡"""
if self.map_loaded:
self.map_view.page().runJavaScript("clearAllTrajectories();")
def focus_on_drone(self, drone_id):
"""聚焦到指定無人機"""
if self.map_loaded:
self.map_view.page().runJavaScript(f"focusOn('{drone_id}');")
# ================================================================================
# 【新增】任務規劃視覺化方法
# ================================================================================
def draw_mission_plan(self, center_lat, center_lon, target_lat, target_lon):
"""
在地圖上繪製任務規劃
Args:
center_lat: 中心點緯度
center_lon: 中心點經度
target_lat: 目標點緯度
target_lon: 目標點經度
"""
if self.map_loaded:
js_code = f"drawMissionPlan({center_lat:.6f}, {center_lon:.6f}, {target_lat:.6f}, {target_lon:.6f});"
self.map_view.page().runJavaScript(js_code)
print(f"📍 地圖已繪製任務規劃: C({center_lat:.6f}, {center_lon:.6f}) -> T({target_lat:.6f}, {target_lon:.6f})")
def clear_mission_plan(self):
"""清除地圖上的任務規劃標記"""
if self.map_loaded:
self.map_view.page().runJavaScript("clearMissionPlan();")
print("🗑️ 地圖已清除任務規劃")
# ================================================================================
def get_widget(self):
"""獲取地圖 widget"""
return self.map_view
def get_gps_signal(self):
"""獲取 GPS 信號"""
return self.bridge.gps_signal
class MapBridge(QObject):
"""JavaScript 和 Python 之間的橋接類"""
gps_signal = pyqtSignal(float, float) # lat, lon
def __init__(self):
super().__init__()
@pyqtSlot(float, float)
def emitGpsSignal(self, lat, lon):
"""供 JavaScript 調用的方法"""
self.gps_signal.emit(lat, lon)

@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
飛行任務規劃模組
負責將 GPS 點擊座標轉換為隊形飛行任務
"""
import math
from typing import List, Tuple, Optional
class CoordinateConverter:
"""GPS 座標與 Local 座標的轉換器"""
def __init__(self, origin_lat: float, origin_lon: float, origin_alt: float = 0):
"""
初始化座標轉換器
Args:
origin_lat: 參考原點緯度
origin_lon: 參考原點經度
origin_alt: 參考原點高度公尺相對於海平面
"""
self.origin_lat = origin_lat
self.origin_lon = origin_lon
self.origin_alt = origin_alt
self.R = 6371000.0 # 地球半徑(公尺)
def gps_to_local(self, lat: float, lon: float, alt: float) -> Tuple[float, float, float]:
"""
GPS 座標轉換為 Local 座標ENU 系統East-North-Up
Args:
lat: 緯度
lon: 經度
alt: 高度公尺相對於海平面
Returns:
(x, y, z): Local 座標公尺
x: East東方
y: North北方
z: Up向上
"""
lat_rad = math.radians(lat)
lon_rad = math.radians(lon)
origin_lat_rad = math.radians(self.origin_lat)
origin_lon_rad = math.radians(self.origin_lon)
dlat = lat_rad - origin_lat_rad
dlon = lon_rad - origin_lon_rad
# 使用 Equirectangular projection適用於小範圍 < 10 km
x = self.R * dlon * math.cos((lat_rad + origin_lat_rad) / 2) # East
y = self.R * dlat # North
z = alt - self.origin_alt # Up
return x, y, z
def local_to_gps(self, x: float, y: float, z: float) -> Tuple[float, float, float]:
"""
Local 座標轉換為 GPS 座標
Args:
x: East東方公尺
y: North北方公尺
z: Up向上公尺
Returns:
(lat, lon, alt): GPS 座標
"""
origin_lat_rad = math.radians(self.origin_lat)
origin_lon_rad = math.radians(self.origin_lon)
lat_rad = origin_lat_rad + (y / self.R)
lon_rad = origin_lon_rad + (x / (self.R * math.cos((lat_rad + origin_lat_rad) / 2)))
lat = math.degrees(lat_rad)
lon = math.degrees(lon_rad)
alt = self.origin_alt + z
return lat, lon, alt
class FormationPlanner:
"""隊形規劃器"""
def __init__(self, spacing: float = 5.0,
base_altitude: float = 10.0,
altitude_diff: float = 2.0):
"""
初始化隊形規劃器
Args:
spacing: 無人機間距公尺
base_altitude: 基準飛行高度公尺相對於參考原點
altitude_diff: M 字形的高低差公尺
"""
self.spacing = spacing
self.base_altitude = base_altitude
self.altitude_diff = altitude_diff
self.current_origin = None
self.converter = None
def plan_formation_mission(self,
drone_gps_positions: List[Tuple[float, float, float]],
target_gps: Tuple[float, float, float]) -> Tuple[List[Tuple[float, float, float]],
List[Tuple[float, float, float]]]:
"""
規劃兩階段隊形任務
Args:
drone_gps_positions: 當前無人機 GPS 位置列表 [(lat, lon, alt), ...]
target_gps: 目標點 GPS 座標 (lat, lon, alt)
Returns:
stage1_gps: 階段 1集合點 GPS 航點列表
stage2_gps: 階段 2目標點 GPS 航點列表
origin: 中心點參考原點 GPS 座標 (lat, lon, alt)
origin: 中心點參考原點 GPS 座標 (lat, lon, alt)
"""
if len(drone_gps_positions) == 0:
raise ValueError("無人機位置列表不能為空")
# ===== 步驟 1: 計算中央點並設為參考原點 =====
center_lat = sum(pos[0] for pos in drone_gps_positions) / len(drone_gps_positions)
center_lon = sum(pos[1] for pos in drone_gps_positions) / len(drone_gps_positions)
center_alt = sum(pos[2] for pos in drone_gps_positions) / len(drone_gps_positions)
self.current_origin = (center_lat, center_lon, center_alt)
self.converter = CoordinateConverter(center_lat, center_lon, center_alt)
print(f"📍 參考原點: ({center_lat:.6f}, {center_lon:.6f}, {center_alt:.1f}m)")
# ===== 步驟 2: 轉換到 Local 座標系 =====
drone_local_positions = []
for lat, lon, alt in drone_gps_positions:
x, y, z = self.converter.gps_to_local(lat, lon, alt)
drone_local_positions.append((x, y, z))
target_local = self.converter.gps_to_local(
target_gps[0], target_gps[1], target_gps[2]
)
# ===== 步驟 3: 在 Local 座標系中計算隊形 =====
stage1_local, stage2_local = self._calculate_formation_local(
drone_local_positions,
target_local
)
# ===== 步驟 4: 轉回 GPS 座標 =====
stage1_gps = []
for x, y, z in stage1_local:
lat, lon, alt = self.converter.local_to_gps(x, y, z)
stage1_gps.append((lat, lon, alt))
stage2_gps = []
for x, y, z in stage2_local:
lat, lon, alt = self.converter.local_to_gps(x, y, z)
stage2_gps.append((lat, lon, alt))
print(f"✅ 任務規劃完成: {len(stage1_gps)} 台無人機2 階段飛行")
# ================================================================================
# 【修改】回傳中心點座標供地圖視覺化使用
# ================================================================================
return stage1_gps, stage2_gps, self.current_origin
# ================================================================================
def _calculate_formation_local(self,
drone_positions: List[Tuple[float, float, float]],
target_point: Tuple[float, float, float]) -> Tuple[List[Tuple[float, float, float]],
List[Tuple[float, float, float]]]:
"""
Local 座標系中計算隊形
Args:
drone_positions: Local 座標的無人機位置 [(x, y, z), ...]
target_point: Local 座標的目標點 (x, y, z)
Returns:
stage1_positions: 階段 1中央點分佈 Local 座標
stage2_positions: 階段 2目標點分佈 Local 座標
"""
N = len(drone_positions)
# ===== 步驟 1: 計算中央點(在 Local 座標系中應該接近原點)=====
C_x = sum(pos[0] for pos in drone_positions) / N
C_y = sum(pos[1] for pos in drone_positions) / N
C_z = sum(pos[2] for pos in drone_positions) / N
print(f" 中央點 Local: ({C_x:.2f}, {C_y:.2f}, {C_z:.2f})")
# ===== 步驟 2: 計算方向向量(中央點 → 目標點)=====
T_x, T_y, T_z = target_point
V_x = T_x - C_x
V_y = T_y - C_y
print(f" 方向向量: ({V_x:.2f}, {V_y:.2f})")
# ===== 步驟 3: 計算垂直向量(逆時針旋轉 90°=====
P_x = -V_y
P_y = V_x
# 單位化垂直向量
length = math.sqrt(P_x**2 + P_y**2)
if length < 0.01: # 避免除以零
# 如果目標點與中央點重合,使用默認方向(向東)
P_x_unit, P_y_unit = 1.0, 0.0
else:
P_x_unit = P_x / length
P_y_unit = P_y / length
print(f" 垂直單位向量: ({P_x_unit:.3f}, {P_y_unit:.3f})")
# ===== 步驟 4: 計算無人機在垂直方向的投影並排序 =====
projections = []
for i, pos in enumerate(drone_positions):
relative_x = pos[0] - C_x
relative_y = pos[1] - C_y
projection = relative_x * P_x_unit + relative_y * P_y_unit
projections.append((projection, i))
# 按投影值排序(保持左右順序)
projections.sort()
sorted_indices = [idx for _, idx in projections]
print(f" 排序後的無人機索引: {sorted_indices}")
# ===== 步驟 5: 分佈無人機位置 =====
stage1_positions = [None] * N # 預分配列表
stage2_positions = [None] * N
for rank, original_idx in enumerate(sorted_indices):
# 計算相對中心的水平偏移
offset = (rank - (N - 1) / 2) * self.spacing
# 計算 M 字形高度(交替高低)
if rank % 2 == 0:
altitude = self.base_altitude + self.altitude_diff # 高
else:
altitude = self.base_altitude - self.altitude_diff # 低
# 階段 1在中央點附近分佈
stage1_x = C_x + P_x_unit * offset
stage1_y = C_y + P_y_unit * offset
stage1_z = altitude
# 階段 2在目標點附近分佈保持相同的相對位置
stage2_x = T_x + P_x_unit * offset
stage2_y = T_y + P_y_unit * offset
stage2_z = altitude
# 按照原始索引存儲(保持無人機 ID 順序)
stage1_positions[original_idx] = (stage1_x, stage1_y, stage1_z)
stage2_positions[original_idx] = (stage2_x, stage2_y, stage2_z)
return stage1_positions, stage2_positions
def update_parameters(self, spacing: Optional[float] = None,
base_altitude: Optional[float] = None,
altitude_diff: Optional[float] = None):
"""
更新隊形參數
Args:
spacing: 無人機間距公尺
base_altitude: 基準飛行高度公尺
altitude_diff: M 字形的高低差公尺
"""
if spacing is not None:
self.spacing = spacing
print(f" 間距更新為: {spacing} m")
if base_altitude is not None:
self.base_altitude = base_altitude
print(f" 基準高度更新為: {base_altitude} m")
if altitude_diff is not None:
self.altitude_diff = altitude_diff
print(f" 高低差更新為: {altitude_diff} m")
# ===== 測試程式碼 =====
if __name__ == "__main__":
print("=" * 60)
print("隊形任務規劃器測試")
print("=" * 60)
# 模擬 5 台無人機的 GPS 位置
drone_gps = [
(24.123450, 120.567800, 100.0), # 無人機 0
(24.123470, 120.567820, 102.0), # 無人機 1
(24.123440, 120.567810, 98.0), # 無人機 2
(24.123460, 120.567830, 101.0), # 無人機 3
(24.123445, 120.567795, 99.0), # 無人機 4
]
# 目標點
target_gps = (24.123600, 120.568000, 105.0)
# 建立規劃器
planner = FormationPlanner(
spacing=5.0, # 5 公尺間距
base_altitude=10.0, # 基準高度 10 公尺
altitude_diff=2.0 # 高低差 2 公尺
)
# 規劃任務
print("\n開始規劃任務...")
stage1, stage2 = planner.plan_formation_mission(drone_gps, target_gps)
# 顯示結果
print("\n" + "=" * 60)
print("階段 1集合點位置GPS")
print("=" * 60)
for i, (lat, lon, alt) in enumerate(stage1):
print(f"無人機 {i}: ({lat:.6f}, {lon:.6f}, {alt:.1f}m)")
print("\n" + "=" * 60)
print("階段 2目標點位置GPS")
print("=" * 60)
for i, (lat, lon, alt) in enumerate(stage2):
print(f"無人機 {i}: ({lat:.6f}, {lon:.6f}, {alt:.1f}m)")
print("\n✅ 測試完成!")

@ -0,0 +1 @@
Subproject commit a96224f9ab3ac51fe8fd981c1e1554528dc4345a

@ -0,0 +1 @@
Subproject commit 24806adc767414eb3a34a58aefeb648ee415b09a
Loading…
Cancel
Save