Update GUI 2.6.0: trajectory, color, window, serial JSON

ken910606 3 weeks ago
parent 0ee7dd1fcf
commit 5231cffcb2

@ -247,7 +247,7 @@ class CommPanel(QWidget):
self.serial_baudrate_combo = QComboBox() self.serial_baudrate_combo = QComboBox()
self.serial_baudrate_combo.addItems(['9600', '19200', '38400', '57600', '115200']) self.serial_baudrate_combo.addItems(['9600', '19200', '38400', '57600', '115200'])
self.serial_baudrate_combo.setCurrentText('57600') self.serial_baudrate_combo.setCurrentText('115200')
self.serial_baudrate_combo.setFixedWidth(100) self.serial_baudrate_combo.setFixedWidth(100)
_set_scaled_stylesheet(self.serial_baudrate_combo, """ _set_scaled_stylesheet(self.serial_baudrate_combo, """
QComboBox { QComboBox {

@ -83,7 +83,10 @@ class JsonTelemetryProcessor:
"position": {"lat": 24.0, "lon": 120.0}, "position": {"lat": 24.0, "lon": 120.0},
"heading": 90 "heading": 90
} }
Serial JSON uses this same shape; only the transport/framing differs. Serial JSON also accepts the compact UAV.py shape:
{"s": 1, "m": "GUIDED", "a": 1, "b": 85, "h": 10.0,
"v": 4.2, "p": [24.0, 120.0], "ypr": [90.0, 0.0, 0.0],
"g": 3, "d": [0.8, 1.2]}
""" """
def _emit_json_connection_type(self, drone_id): def _emit_json_connection_type(self, drone_id):
@ -103,19 +106,21 @@ class JsonTelemetryProcessor:
if not isinstance(data, dict): if not isinstance(data, dict):
return return
system_id = data.get('system_id', data.get('sysid')) system_id = data.get('system_id', data.get('sysid', data.get('s')))
if system_id is None: if system_id is None:
return return
drone_id = f"s{self.socket_id}_{system_id}" drone_id = f"s{self.socket_id}_{system_id}"
self._emit_json_connection_type(drone_id) self._emit_json_connection_type(drone_id)
mode = data.get('mode', data.get('mode_name')) mode = data.get('mode', data.get('mode_name', data.get('m')))
state = {} state = {}
if mode is not None: if mode is not None:
state['mode'] = mode state['mode'] = mode
if 'armed' in data: if 'armed' in data:
state['armed'] = data.get('armed') state['armed'] = data.get('armed')
elif 'a' in data:
state['armed'] = bool(data.get('a'))
if state: if state:
self.signals.update_signal.emit('state', drone_id, state) self.signals.update_signal.emit('state', drone_id, state)
@ -144,6 +149,10 @@ class JsonTelemetryProcessor:
if 'battery_voltage' in data: if 'battery_voltage' in data:
battery_data['voltage'] = data.get('battery_voltage') battery_data['voltage'] = data.get('battery_voltage')
self.signals.update_signal.emit('battery', drone_id, battery_data) self.signals.update_signal.emit('battery', drone_id, battery_data)
elif 'b' in data:
self.signals.update_signal.emit('battery', drone_id, {
'percentage': data.get('b')
})
pos = data.get('position') pos = data.get('position')
if isinstance(pos, dict): if isinstance(pos, dict):
@ -159,6 +168,22 @@ class JsonTelemetryProcessor:
'lon': data.get('lon', data.get('longitude', 0)), 'lon': data.get('lon', data.get('longitude', 0)),
'alt': data.get('alt', data.get('altitude', 0)) 'alt': data.get('alt', data.get('altitude', 0))
}) })
elif isinstance(data.get('p'), (list, tuple)) and len(data.get('p')) >= 2:
dop = data.get('d') if isinstance(data.get('d'), (list, tuple)) else []
gps_data = {
'lat': data['p'][0],
'lon': data['p'][1],
'alt': data.get('h', 0)
}
if 'g' in data:
gps_data['fix_type'] = data.get('g')
if len(dop) >= 1:
gps_data['eph'] = dop[0]
gps_data['hdop'] = dop[0]
if len(dop) >= 2:
gps_data['epv'] = dop[1]
gps_data['vdop'] = dop[1]
self.signals.update_signal.emit('gps', drone_id, gps_data)
local = data.get('local_position', data.get('local_pose', data.get('local'))) local = data.get('local_position', data.get('local_pose', data.get('local')))
if isinstance(local, dict): if isinstance(local, dict):
@ -183,6 +208,29 @@ class JsonTelemetryProcessor:
self.signals.update_signal.emit('altitude', drone_id, { self.signals.update_signal.emit('altitude', drone_id, {
'altitude': alt 'altitude': alt
}) })
elif 'h' in data:
height = data.get('h', 0.0)
self.signals.update_signal.emit('local_pose', drone_id, {
'x': 0.0,
'y': 0.0,
'z': height
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': height
})
elif (
isinstance(data.get('p'), (list, tuple))
or 'lat' in data
or 'latitude' in data
):
self.signals.update_signal.emit('local_pose', drone_id, {
'x': 0.0,
'y': 0.0,
'z': 0.0
})
self.signals.update_signal.emit('altitude', drone_id, {
'altitude': 0.0
})
velocity = data.get('velocity') velocity = data.get('velocity')
if isinstance(velocity, dict): if isinstance(velocity, dict):
@ -191,6 +239,12 @@ class JsonTelemetryProcessor:
'vy': velocity.get('vy', velocity.get('y', 0.0)), 'vy': velocity.get('vy', velocity.get('y', 0.0)),
'vz': velocity.get('vz', velocity.get('z', 0.0)) 'vz': velocity.get('vz', velocity.get('z', 0.0))
}) })
elif 'v' in data:
self.signals.update_signal.emit('velocity', drone_id, {
'vx': data.get('v', 0.0),
'vy': 0.0,
'vz': 0.0
})
attitude = data.get('attitude') attitude = data.get('attitude')
if isinstance(attitude, dict): if isinstance(attitude, dict):
@ -200,12 +254,28 @@ class JsonTelemetryProcessor:
'yaw': attitude.get('yaw', 0.0), 'yaw': attitude.get('yaw', 0.0),
'rates': attitude.get('rates', (0.0, 0.0, 0.0)) 'rates': attitude.get('rates', (0.0, 0.0, 0.0))
}) })
elif isinstance(data.get('ypr'), (list, tuple)) and len(data.get('ypr')) >= 3:
yaw, pitch, roll = data['ypr'][0], data['ypr'][1], data['ypr'][2]
self.signals.update_signal.emit('attitude', drone_id, {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'rates': (0.0, 0.0, 0.0)
})
hud = data.get('hud', {}) hud = data.get('hud', {})
if not isinstance(hud, dict): if not isinstance(hud, dict):
hud = {} hud = {}
if 'heading' in data: if 'heading' in data:
hud['heading'] = data.get('heading') hud['heading'] = data.get('heading')
elif 'y' in data:
hud['heading'] = data.get('y')
elif isinstance(data.get('ypr'), (list, tuple)) and len(data.get('ypr')) >= 1:
hud['heading'] = data['ypr'][0]
if 'v' in data and 'groundspeed' not in hud:
hud['groundspeed'] = data.get('v')
if 'h' in data and 'alt' not in hud and 'altitude' not in hud:
hud['alt'] = data.get('h')
if hud: if hud:
self.signals.update_signal.emit('hud', drone_id, { self.signals.update_signal.emit('hud', drone_id, {
'heading': hud.get('heading', 0.0), 'heading': hud.get('heading', 0.0),
@ -229,6 +299,7 @@ class UDPMavlinkReceiver(threading.Thread):
self.connection_name = connection_name self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用 self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0 self.socket_id = monitor.get_next_socket_id() if monitor else 0
self._socket_id_released = False
self.running = False self.running = False
self.sock = None self.sock = None
@ -258,6 +329,7 @@ class UDPMavlinkReceiver(threading.Thread):
finally: finally:
if self.sock: if self.sock:
self.sock.close() self.sock.close()
self._release_socket_id()
def process_mavlink_message(self, msg): def process_mavlink_message(self, msg):
"""處理 MAVLink 訊息""" """處理 MAVLink 訊息"""
@ -343,6 +415,12 @@ class UDPMavlinkReceiver(threading.Thread):
def stop(self): def stop(self):
"""停止接收器""" """停止接收器"""
self.running = False self.running = False
self._release_socket_id()
def _release_socket_id(self):
if self.monitor and not self._socket_id_released:
self.monitor.release_socket_id(self.socket_id)
self._socket_id_released = True
class SerialMavlinkReceiver(threading.Thread, JsonTelemetryProcessor): class SerialMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
"""串口遙測接收器,可自動處理 MAVLink 或 WebSocket 格式 JSON。""" """串口遙測接收器,可自動處理 MAVLink 或 WebSocket 格式 JSON。"""
@ -354,6 +432,7 @@ class SerialMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
self.connection_name = connection_name self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用 self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0 self.socket_id = monitor.get_next_socket_id() if monitor else 0
self._socket_id_released = False
self.source_type = 'Serial' self.source_type = 'Serial'
self.running = False self.running = False
self.serial_conn = None self.serial_conn = None
@ -402,6 +481,7 @@ class SerialMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
self.serial_conn.close() self.serial_conn.close()
except Exception: except Exception:
pass pass
self._release_socket_id()
def _process_mavlink_byte(self, byte): def _process_mavlink_byte(self, byte):
try: try:
@ -562,6 +642,12 @@ class SerialMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
self.serial_conn.close() self.serial_conn.close()
except Exception: except Exception:
pass pass
self._release_socket_id()
def _release_socket_id(self):
if self.monitor and not self._socket_id_released:
self.monitor.release_socket_id(self.socket_id)
self._socket_id_released = True
class WebSocketMavlinkReceiver(threading.Thread, JsonTelemetryProcessor): class WebSocketMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
"""WebSocket MAVLink 接收器""" """WebSocket MAVLink 接收器"""
@ -572,6 +658,7 @@ class WebSocketMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
self.connection_name = connection_name self.connection_name = connection_name
self.monitor = monitor # 保存 monitor 引用 self.monitor = monitor # 保存 monitor 引用
self.socket_id = monitor.get_next_socket_id() if monitor else 0 # 一次性分配 socket_id self.socket_id = monitor.get_next_socket_id() if monitor else 0 # 一次性分配 socket_id
self._socket_id_released = False
self.source_type = 'WS' self.source_type = 'WS'
self.running = False self.running = False
self.max_retries = 5 self.max_retries = 5
@ -629,6 +716,7 @@ class WebSocketMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
break break
print(f"WebSocket client {self.connection_name} stopped") print(f"WebSocket client {self.connection_name} stopped")
self._release_socket_id()
def process_websocket_message(self, data): def process_websocket_message(self, data):
"""處理 WebSocket 訊息""" """處理 WebSocket 訊息"""
@ -637,6 +725,12 @@ class WebSocketMavlinkReceiver(threading.Thread, JsonTelemetryProcessor):
def stop(self): def stop(self):
"""停止接收器""" """停止接收器"""
self.running = False self.running = False
self._release_socket_id()
def _release_socket_id(self):
if self.monitor and not self._socket_id_released:
self.monitor.release_socket_id(self.socket_id)
self._socket_id_released = True
class DroneMonitor(Node): class DroneMonitor(Node):
# Subscribe to drone ROS2 topics # Subscribe to drone ROS2 topics
@ -675,7 +769,7 @@ class DroneMonitor(Node):
# 【新增】Socket ID 重新分配機制 (從 0 開始) # 【新增】Socket ID 重新分配機制 (從 0 開始)
# ================================================================================ # ================================================================================
self.socket_id_mapping = {} # {原始socket_id: 重新分配的socket_id} self.socket_id_mapping = {} # {原始socket_id: 重新分配的socket_id}
self.socket_id_counter = 0 # 當前分配到的最大socket_id self.active_socket_ids = set() # 目前通訊連線使用中的 socket_id
self.socket_id_lock = Lock() # 線程安全鎖 self.socket_id_lock = Lock() # 線程安全鎖
# ================================================================================ # ================================================================================
@ -708,23 +802,55 @@ class DroneMonitor(Node):
self.create_timer(1.0, self.scan_topics) self.create_timer(1.0, self.scan_topics)
def get_next_socket_id(self): def get_next_socket_id(self):
"""获取下一个可用的 socket_id从 0 开始连续分配)""" """取得目前最小的未使用 socket_id從 0 開始)。"""
with self.socket_id_lock: with self.socket_id_lock:
current_id = self.socket_id_counter socket_id = 0
self.socket_id_counter += 1 while socket_id in self.active_socket_ids:
return current_id socket_id += 1
self.active_socket_ids.add(socket_id)
return socket_id
def release_socket_id(self, socket_id):
"""釋放通訊連線使用的 socket_id讓後續連線可重用最小空缺。"""
try:
socket_id = int(socket_id)
except (TypeError, ValueError):
return
with self.socket_id_lock:
if socket_id in getattr(self, 'socket_id_mapping', {}).values():
return
if socket_id in getattr(self, 'sys_to_socket_id', {}).values():
return
self.active_socket_ids.discard(socket_id)
def get_or_assign_socket_id(self, original_socket_id): def get_or_assign_socket_id(self, original_socket_id):
"""根據原始 socket_id 分配或獲取對應的 socket_id從 0 開始連續分配) """ROS2 socket_id 映射。
同一個原始 socket_id 會得到同一個分配的 ID
有原始 socket_id=N 時優先使用 N N 已被其他通訊占用
才改用目前最小未使用 ID同一個原始 socket_id 會得到同一個映射
""" """
original_socket_id = str(original_socket_id) original_socket_id = str(original_socket_id)
with self.socket_id_lock: with self.socket_id_lock:
if original_socket_id not in self.socket_id_mapping: if original_socket_id not in self.socket_id_mapping:
# 分配新的 socket_id try:
self.socket_id_mapping[original_socket_id] = self.socket_id_counter preferred_socket_id = int(original_socket_id)
self.socket_id_counter += 1 except (TypeError, ValueError):
preferred_socket_id = None
if (
preferred_socket_id is not None
and preferred_socket_id >= 0
and preferred_socket_id not in self.active_socket_ids
):
socket_id = preferred_socket_id
else:
socket_id = 0
while socket_id in self.active_socket_ids:
socket_id += 1
self.active_socket_ids.add(socket_id)
self.socket_id_mapping[original_socket_id] = socket_id
return self.socket_id_mapping[original_socket_id] return self.socket_id_mapping[original_socket_id]
@ -796,8 +922,7 @@ class DroneMonitor(Node):
# 为每个 sys_id 分配 socket_id如果还没有分配 # 为每个 sys_id 分配 socket_id如果还没有分配
# 注意:如果后续 summary 提供了 socket_id会使用 summary 的映射覆盖 # 注意:如果后续 summary 提供了 socket_id会使用 summary 的映射覆盖
if sys_id not in self.sys_to_socket_id: if sys_id not in self.sys_to_socket_id:
# 暂时所有 ROS2 topic 共享同一个 socket_id = 0 self.sys_to_socket_id[sys_id] = self.get_next_socket_id()
self.sys_to_socket_id[sys_id] = 0
subs_attr = f'drone_{sys_id}_subs' subs_attr = f'drone_{sys_id}_subs'
if not hasattr(self, subs_attr): if not hasattr(self, subs_attr):
@ -1194,8 +1319,12 @@ class DroneMonitor(Node):
# 從 summary 獲取原始 socket_id並映射到分配的 socket_id # 從 summary 獲取原始 socket_id並映射到分配的 socket_id
original_socket_id = data.get('socket_id') original_socket_id = data.get('socket_id')
if original_socket_id is not None: if original_socket_id is not None:
fallback_socket_id = self.sys_to_socket_id.pop(sys_id, None)
if fallback_socket_id is not None:
self.release_socket_id(fallback_socket_id)
# 使用原始 socket_id 獲取或分配統一的 socket_id # 使用原始 socket_id 獲取或分配統一的 socket_id
assigned_socket_id = self.get_or_assign_socket_id(original_socket_id) assigned_socket_id = self.get_or_assign_socket_id(original_socket_id)
self.sys_to_socket_id[sys_id] = assigned_socket_id
else: else:
# 如果沒有 socket_id使用 sys_to_socket_id 映射 # 如果沒有 socket_id使用 sys_to_socket_id 映射
assigned_socket_id = self.sys_to_socket_id.get(sys_id, 0) assigned_socket_id = self.sys_to_socket_id.get(sys_id, 0)
@ -1361,7 +1490,7 @@ class DroneMonitor(Node):
'ping': msg.data 'ping': msg.data
} }
def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=57600): def start_serial_connection(self, port='/dev/ttyUSB0', baudrate=115200):
"""啟動串口遙測連接(自動辨識 MAVLink / JSON""" """啟動串口遙測連接(自動辨識 MAVLink / JSON"""
connection_name = f"Serial_{port.replace('/', '_')}" connection_name = f"Serial_{port.replace('/', '_')}"
receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name, self) receiver = SerialMavlinkReceiver(port, baudrate, self.signals, connection_name, self)

@ -290,6 +290,7 @@ class DronePanel(QWidget):
# 分隔符 # 分隔符
separator1 = QLabel(" - ") separator1 = QLabel(" - ")
separator1.setObjectName(f"{self.drone_id}_battery_sep1")
_set_scaled_stylesheet(separator1, "color: #DDD;") _set_scaled_stylesheet(separator1, "color: #DDD;")
# 顯示電壓 # 顯示電壓
@ -299,6 +300,7 @@ class DronePanel(QWidget):
# 分隔符 # 分隔符
separator2 = QLabel(" - ") separator2 = QLabel(" - ")
separator2.setObjectName(f"{self.drone_id}_battery_sep2")
_set_scaled_stylesheet(separator2, "color: #DDD;") _set_scaled_stylesheet(separator2, "color: #DDD;")
# 顯示電池節數 (S count) # 顯示電池節數 (S count)

@ -148,7 +148,7 @@ class ToggleSwitch(QWidget):
class ControlStationUI(QMainWindow): class ControlStationUI(QMainWindow):
planning_finished = pyqtSignal(object) planning_finished = pyqtSignal(object)
VERSION = '2.5.0' VERSION = '2.6.0'
FONT_SCALE_MIN = 70 FONT_SCALE_MIN = 70
FONT_SCALE_MAX = 180 FONT_SCALE_MAX = 180
FONT_SCALE_DEFAULT = 100 FONT_SCALE_DEFAULT = 100
@ -220,23 +220,24 @@ class ControlStationUI(QMainWindow):
self.socket_types = {} self.socket_types = {}
self.socket_colors = { self.socket_colors = {
'0': '#00BFFF', # 天藍色 (DeepSkyBlue) '0': '#00ACC1', # Cyan
'1': '#FFD700', # 金色 (Gold) '1': "#FDD835", # Yellow
'2': '#FF6969', # 淺紅色 (Light Red) '2': '#F8BBD0', # Pink
'3': '#FF69B4', # 熱粉紅 (HotPink) '3': '#90EE90', # Green
'4': '#00FA9A', # 中春綠 (MediumSpringGreen) '4': '#FB8C00', # Orange
'5': '#9370DB', # 中紫色 (MediumPurple) - 串口 '5': '#8E24AA', # Purple
'6': '#FFA500', # 橙色 (Orange) '6': '#1E88E5', # Blue
'7': '#20B2AA', # 淺海綠 (LightSeaGreen) '7': '#E53935', # Red
'8': '#7CFC00', # 草綠色 (LawnGreen) '8': '#C0CA33', # Lime
'9': '#FF8C00', # 深橙色 (DarkOrange) '9': '#6D4C41', # Brown
'default': '#AAAAAA' # 灰色 'default': '#E0E0E0' # Light gray
} }
self.drone_positions = {} self.drone_positions = {}
self.drone_headings = {} self.drone_headings = {}
# 初始化地圖 # 初始化地圖
self.drone_map = DroneMap() self.drone_map = DroneMap()
self.drone_map.set_socket_colors(self.socket_colors)
# 地圖更新獨立節流10Hz避免 WebEngine / JS map 拖慢 panel 更新 # 地圖更新獨立節流10Hz避免 WebEngine / JS map 拖慢 panel 更新
self.map_timer = QTimer() self.map_timer = QTimer()
@ -379,7 +380,7 @@ class ControlStationUI(QMainWindow):
verify_label = QLabel("模擬驗證") verify_label = QLabel("模擬驗證")
verify_label.setStyleSheet("color: #CCC; font-size: 12px;") verify_label.setStyleSheet("color: #CCC; font-size: 12px;")
self.verify_toggle = ToggleSwitch(checked=True) self.verify_toggle = ToggleSwitch(checked=False)
group_header.addWidget(verify_label) group_header.addWidget(verify_label)
group_header.addWidget(self.verify_toggle) group_header.addWidget(self.verify_toggle)
@ -1183,6 +1184,16 @@ class ControlStationUI(QMainWindow):
panel.update_drone_list() panel.update_drone_list()
panel.update_status() panel.update_status()
self._sync_map_group_colors()
def _sync_map_group_colors(self):
"""同步每台 drone 的 group 顏色到地圖箭頭外框。"""
drone_group_colors = {}
for group in self.mission_groups.values():
for drone_id in group.selected_drone_ids:
drone_group_colors[drone_id] = group.color
self.drone_map.set_drone_group_colors(drone_group_colors)
def _get_drone_checkbox_style(self, blocked=False): def _get_drone_checkbox_style(self, blocked=False):
"""回傳 drone checkbox 樣式;被其他 group 佔用時顯示紅色填滿。""" """回傳 drone checkbox 樣式;被其他 group 佔用時顯示紅色填滿。"""
blocked_style = """ blocked_style = """
@ -1272,6 +1283,8 @@ class ControlStationUI(QMainWindow):
# 只有當操作目標組是 active 組時,才更新 UI # 只有當操作目標組是 active 組時,才更新 UI
if group_id == self.active_group_id: if group_id == self.active_group_id:
self.refresh_selection_ui() self.refresh_selection_ui()
else:
self._sync_map_group_colors()
panel = self.group_panels.get(group_id) panel = self.group_panels.get(group_id)
if panel: if panel:
@ -1492,6 +1505,8 @@ class ControlStationUI(QMainWindow):
if group_id == self.active_group_id: if group_id == self.active_group_id:
self.refresh_selection_ui() self.refresh_selection_ui()
else:
self._sync_map_group_colors()
panel = self.group_panels.get(group_id) panel = self.group_panels.get(group_id)
if panel: if panel:
@ -1528,6 +1543,8 @@ class ControlStationUI(QMainWindow):
# 只有當操作目標組是 active 組時,才更新 UI # 只有當操作目標組是 active 組時,才更新 UI
if group_id == self.active_group_id: if group_id == self.active_group_id:
self.refresh_selection_ui() self.refresh_selection_ui()
else:
self._sync_map_group_colors()
panel = self.group_panels.get(group_id) panel = self.group_panels.get(group_id)
if panel: if panel:
@ -1552,6 +1569,8 @@ class ControlStationUI(QMainWindow):
# 只有當操作目標組是 active 組時,才更新 UI # 只有當操作目標組是 active 組時,才更新 UI
if group_id == self.active_group_id: if group_id == self.active_group_id:
self.refresh_selection_ui() self.refresh_selection_ui()
else:
self._sync_map_group_colors()
panel = self.group_panels.get(group_id) panel = self.group_panels.get(group_id)
if panel: if panel:
@ -1598,6 +1617,7 @@ class ControlStationUI(QMainWindow):
if self.group_tab_widget.count() > 0: if self.group_tab_widget.count() > 0:
self.group_tab_widget.setCurrentIndex(0) self.group_tab_widget.setCurrentIndex(0)
# _on_group_tab_changed 會自動設定新的 active group # _on_group_tab_changed 會自動設定新的 active group
self._sync_map_group_colors()
self.statusBar().showMessage(f"已刪除 Group {group_id}", 3000) self.statusBar().showMessage(f"已刪除 Group {group_id}", 3000)
@ -2314,17 +2334,30 @@ class ControlStationUI(QMainWindow):
self.queue_overview_update(drone_id, 'armed', arm_text) self.queue_overview_update(drone_id, 'armed', arm_text)
elif msg_type == 'battery': elif msg_type == 'battery':
voltage = data.get('voltage', 16) voltage = data.get('voltage')
cells = round(voltage / 3.95) cells = round(voltage / 3.95) if voltage is not None else None
percentage = (voltage / cells - 3.7) / 0.5 * 100 if cells > 0 else 0 percentage = (
(voltage / cells - 3.7) / 0.5 * 100
if voltage is not None and cells and cells > 0
else data.get('percentage', 0)
)
if percentage < 20: voltage_color = '#FF6464' if percentage < 20: voltage_color = '#FF6464'
elif percentage < 50: voltage_color = '#FFA500' elif percentage < 50: voltage_color = '#FFA500'
else: voltage_color = '#FFFFFF' else: voltage_color = '#FFFFFF'
percentage = data.get('percentage', percentage) percentage = data.get('percentage', percentage)
self.update_field(panel, drone_id, 'battery_pct', f"{percentage:.0f}%", voltage_color) self.update_field(panel, drone_id, 'battery_pct', f"{percentage:.0f}%", voltage_color)
self.update_field(panel, drone_id, 'battery_vol', f"{voltage:.2f}V") if voltage is not None:
self.update_field(panel, drone_id, 'battery_cells', f"{cells}S") self.update_field(panel, drone_id, 'battery_sep1', " - ")
self.queue_overview_update(drone_id, 'battery', f"{voltage:.2f}V") self.update_field(panel, drone_id, 'battery_vol', f"{voltage:.2f}V")
self.update_field(panel, drone_id, 'battery_sep2', " - ")
self.update_field(panel, drone_id, 'battery_cells', f"{cells}S")
self.queue_overview_update(drone_id, 'battery', f"{voltage:.2f}V")
else:
self.update_field(panel, drone_id, 'battery_sep1', "")
self.update_field(panel, drone_id, 'battery_vol', "")
self.update_field(panel, drone_id, 'battery_sep2', "")
self.update_field(panel, drone_id, 'battery_cells', "")
self.queue_overview_update(drone_id, 'battery', f"{percentage:.0f}%")
elif msg_type == 'altitude': elif msg_type == 'altitude':
altitude = data.get('altitude', 0) altitude = data.get('altitude', 0)

@ -1,4 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json
from PyQt6.QtWebEngineWidgets import QWebEngineView from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtCore import QTimer, pyqtSignal, QObject, pyqtSlot from PyQt6.QtCore import QTimer, pyqtSignal, QObject, pyqtSlot
from PyQt6.QtWebChannel import QWebChannel from PyQt6.QtWebChannel import QWebChannel
@ -17,6 +19,8 @@ class DroneMap:
self.map_loaded = False self.map_loaded = False
self.pending_map_updates = {} self.pending_map_updates = {}
self.font_scale = 1.0 self.font_scale = 1.0
self.socket_colors = {}
self.drone_group_colors = {}
# 創建橋接對象 # 創建橋接對象
self.bridge = MapBridge() self.bridge = MapBridge()
@ -281,38 +285,37 @@ class DroneMap:
}, 100); }, 100);
}); });
function createArrowIcon(color) { function createArrowIcon(color, groupColor) {
const outlineColor = groupColor || 'transparent';
return L.divIcon({ return L.divIcon({
className: 'drone-arrow', className: 'drone-arrow',
html: ` html: `
<div style=" <div style="
width: 30px; height: 30px; width: 34px; height: 34px;
display: flex; display: flex;
align-items: center; align-items: center;
justify-content: center; justify-content: center;
"> ">
<svg width="30" height="30" viewBox="0 0 30 30"> <svg width="34" height="34" viewBox="0 0 34 34">
<polygon points="15,0 30,30 15,25 0,30" fill="${color || '#FF0000'}" /> <polygon
points="17,3 31,31 17,26 3,31"
fill="${color || '#FF0000'}"
stroke="${outlineColor}"
stroke-width="3"
stroke-linejoin="round"
/>
</svg> </svg>
</div> </div>
`, `,
iconSize: [30, 30], iconSize: [34, 34],
iconAnchor: [15, 15] iconAnchor: [17, 17]
}); });
} }
function getColorBySocketId(id) { function getColorBySocketId(id) {
if (id.startsWith("s0_")) return "#00BFFF"; const match = /^s([^_]+)_/.exec(id || '');
if (id.startsWith("s1_")) return "#FFD700"; const socketId = match ? match[1] : 'default';
if (id.startsWith("s2_")) return "#FF6969"; return socketColors[socketId] || socketColors.default || "#E0E0E0";
if (id.startsWith("s3_")) return "#FF69B4";
if (id.startsWith("s4_")) return "#00FA9A";
if (id.startsWith("s5_")) return "#9370DB";
if (id.startsWith("s6_")) return "#FFA500";
if (id.startsWith("s7_")) return "#20B2AA";
if (id.startsWith("s8_")) return "#7CFC00";
if (id.startsWith("s9_")) return "#FF8C00";
return "#AAAAAA";
} }
function createIdIcon(id) { function createIdIcon(id) {
@ -340,8 +343,11 @@ class DroneMap:
var markers = {}; var markers = {};
var idLabels = {}; var idLabels = {};
var trajectories = {}; var trajectories = {};
const maxTrajectoryPoints = 300; const maxTrajectoryPoints = 2000;
const trajectoryMinDistanceMeters = 0.05;
var trajectoryLines = {}; var trajectoryLines = {};
var socketColors = {};
var droneGroupColors = {};
var focusedId = null; var focusedId = null;
var initialized = false; var initialized = false;
var trajectoryGroup = L.layerGroup().addTo(map); var trajectoryGroup = L.layerGroup().addTo(map);
@ -703,7 +709,7 @@ class DroneMap:
color: color, color: color,
weight: 3, weight: 3,
opacity: 0.7, opacity: 0.7,
smoothFactor: 1 smoothFactor: 0.1
}).addTo(trajectoryGroup); }).addTo(trajectoryGroup);
} }
} }
@ -746,7 +752,7 @@ class DroneMap:
if (markers[id]) { if (markers[id]) {
const lastPos = markers[id].getLatLng(); const lastPos = markers[id].getLatLng();
const distance = lastPos.distanceTo([lat, lon]); const distance = lastPos.distanceTo([lat, lon]);
if (distance > 1) { if (distance >= trajectoryMinDistanceMeters) {
addTrajectoryPoint(id, lat, lon); addTrajectoryPoint(id, lat, lon);
} }
@ -760,7 +766,7 @@ class DroneMap:
const color = getColorBySocketId(id); const color = getColorBySocketId(id);
markers[id] = L.marker([lat, lon], { markers[id] = L.marker([lat, lon], {
icon: createArrowIcon(color), icon: createArrowIcon(color, droneGroupColors[id]),
rotationAngle: heading, rotationAngle: heading,
rotationOrigin: 'center' rotationOrigin: 'center'
}) })
@ -798,6 +804,33 @@ class DroneMap:
} }
} }
function setDroneGroupColors(groupColors) {
droneGroupColors = groupColors || {};
Object.keys(markers).forEach(id => {
markers[id].setIcon(
createArrowIcon(getColorBySocketId(id), droneGroupColors[id])
);
if (typeof markers[id].setRotationAngle === 'function') {
markers[id].setRotationAngle(markers[id].options.rotationAngle || 0);
}
});
}
function setSocketColors(colors) {
socketColors = colors || {};
Object.keys(markers).forEach(id => {
markers[id].setIcon(
createArrowIcon(getColorBySocketId(id), droneGroupColors[id])
);
if (typeof markers[id].setRotationAngle === 'function') {
markers[id].setRotationAngle(markers[id].options.rotationAngle || 0);
}
});
Object.keys(trajectoryLines).forEach(id => {
trajectoryLines[id].setStyle({ color: getColorBySocketId(id) });
});
}
function clearAllTrajectories() { function clearAllTrajectories() {
trajectories = {}; trajectories = {};
Object.values(trajectoryLines).forEach(line => { Object.values(trajectoryLines).forEach(line => {
@ -904,6 +937,8 @@ class DroneMap:
if ok: if ok:
self.map_loaded = True self.map_loaded = True
self.set_font_scale(self.font_scale) self.set_font_scale(self.font_scale)
self.set_socket_colors(self.socket_colors)
self.set_drone_group_colors(self.drone_group_colors)
else: else:
_log("ERROR", "地圖載入失敗") _log("ERROR", "地圖載入失敗")
@ -918,7 +953,7 @@ class DroneMap:
js_commands = [] js_commands = []
for drone_id, (lat, lon, heading) in self.pending_map_updates.items(): for drone_id, (lat, lon, heading) in self.pending_map_updates.items():
js_commands.append(f"updateDrone({lat:.6f}, {lon:.6f}, '{drone_id}', {heading:.1f});") js_commands.append(f"updateDrone({lat:.8f}, {lon:.8f}, '{drone_id}', {heading:.1f});")
if js_commands: if js_commands:
combined_js = "\n".join(js_commands) combined_js = "\n".join(js_commands)
@ -931,6 +966,20 @@ class DroneMap:
if self.map_loaded: if self.map_loaded:
self.map_view.page().runJavaScript("clearAllTrajectories();") self.map_view.page().runJavaScript("clearAllTrajectories();")
def set_drone_group_colors(self, drone_group_colors):
"""設定每台無人機的 group 外框顏色;未分組者不傳入即為透明。"""
self.drone_group_colors = dict(drone_group_colors or {})
if self.map_loaded:
payload = json.dumps(self.drone_group_colors)
self.map_view.page().runJavaScript(f"setDroneGroupColors({payload});")
def set_socket_colors(self, socket_colors):
"""設定 socket 顏色,供地圖箭頭本體與軌跡線使用。"""
self.socket_colors = dict(socket_colors or {})
if self.map_loaded:
payload = json.dumps(self.socket_colors)
self.map_view.page().runJavaScript(f"setSocketColors({payload});")
def focus_on_drone(self, drone_id): def focus_on_drone(self, drone_id):
"""聚焦到指定無人機""" """聚焦到指定無人機"""
if self.map_loaded: if self.map_loaded:

@ -13,14 +13,13 @@ from mission_executor import MissionExecutor, MissionState
# 群組顏色(循環使用) # 群組顏色(循環使用)
GROUP_COLORS = [ GROUP_COLORS = [
'#4A9EFF', # 藍 "#E53935", # Deep Red
'#FF8C42', # 橘 "#FB8C00", # Deep Orange
'#56C87A', # 綠 "#FDD835", # Deep Yellow
'#E85D75', # 紅 "#43A047", # Deep Green
'#B07CED', # 紫 "#1E88E5", # Deep Blue
'#F5C542', # 黃 "#8E24AA", # Deep Purple
'#42C9C9', # 青 "#D81B60", # Deep Pink
'#FF6B9D', # 粉
] ]
DEFAULT_MISSION_PARAM_VALUES = { DEFAULT_MISSION_PARAM_VALUES = {

Loading…
Cancel
Save