#!/usr/bin/env python3 """ 任務執行模組 管理多架無人機的 GUIDED 模式飛行控制迴圈 設計: - 每架無人機持有一個航點序列,逐點推進 - 各自到達就各自切換到下一個航點 - 事件驅動發送:航點切換時送一次,收到 send_result 才決定重送/前進 - 失敗策略 (b):單機重試 MAX_RETRY 次仍失敗 → 該機 fallback LOITER,其他機繼續 - Rendezvous barrier:在指定 wp index 等所有活著的機到齊才一起推進,有 timeout 保護 - 用 QTimer 驅動到達判定,在 Qt 主線程執行 - 暫停 = 停止到達判定 + 停止新指令 """ import asyncio import math import time from enum import Enum from PyQt6.QtCore import QObject, QTimer, pyqtSignal class MissionState(Enum): """整體任務狀態""" IDLE = "idle" RUNNING = "running" PAUSED = "paused" class TaskStatus(Enum): """單機任務狀態""" NORMAL = "normal" RETRYING = "retrying" WAITING_AT_BARRIER = "waiting_at_barrier" FALLBACK_LOITER = "fallback_loiter" class DroneTask: """單架無人機的任務資料""" __slots__ = ( 'drone_id', 'sysid', 'waypoints', 'wp_index', 'done', 'sent_current_wp', 'fail_count', 'status', 'waiting_since', ) def __init__(self, drone_id, sysid, waypoints): self.drone_id = drone_id self.sysid = sysid self.waypoints = waypoints self.wp_index = 0 self.done = len(waypoints) == 0 self.sent_current_wp = False self.fail_count = 0 self.status = TaskStatus.NORMAL self.waiting_since = 0.0 # monotonic time 進入 WAITING_AT_BARRIER 的瞬間 @property def current_target(self): if self.done or self.wp_index >= len(self.waypoints): return None return self.waypoints[self.wp_index] @property def total_waypoints(self): return len(self.waypoints) class MissionExecutor(QObject): """ 任務執行器 (事件驅動版 + rendezvous barrier) planned_waypoints 格式: { 'drone_ids': ['s0_1', 's0_2', ...], 'waypoints': [ [(lat,lon,alt), ...], # drone 0 [(lat,lon,alt), ...], # drone 1 ], 'rendezvous_indices': [3, 7, ...], # 選填;缺省 = 全程不同步(各自跑) } """ MAX_RETRY = 3 drone_waypoint_reached = pyqtSignal(str, int, int) # (drone_id, wp_index, total) task_status_changed = pyqtSignal(str, str, str) # (drone_id, status, message) mission_completed = pyqtSignal() def __init__(self, sender, drone_gps, monitor=None, arrival_radius=2.0, tick_rate_hz=2.0, barrier_timeout_sec=20.0): super().__init__() self.sender = sender self.drone_gps = drone_gps self.monitor = monitor # 用於失敗 fallback 到 LOITER self.arrival_radius = arrival_radius self.barrier_timeout_sec = barrier_timeout_sec self.state = MissionState.IDLE self.tasks = {} self.rendezvous_indices = set() self._interval_ms = int(1000 / tick_rate_hz) self._timer = QTimer() self._timer.timeout.connect(self._tick) # 只有 Ros2CommandSender 有 send_result signal;MavlinkSender 沒有 if hasattr(sender, 'send_result'): sender.send_result.connect(self._on_send_result) # ------------------------------------------------------------------ 公開方法 def start(self, planned_waypoints): if self.state == MissionState.RUNNING: print("任務已在執行中") return self.tasks.clear() drone_ids = planned_waypoints['drone_ids'] waypoints_list = planned_waypoints['waypoints'] self.rendezvous_indices = set( planned_waypoints.get('rendezvous_indices', []) or [] ) for i, drone_id in enumerate(drone_ids): sysid = int(drone_id.split('_')[1]) self.tasks[drone_id] = DroneTask(drone_id, sysid, waypoints_list[i]) self.state = MissionState.RUNNING self._timer.start(self._interval_ms) total_wps = sum(t.total_waypoints for t in self.tasks.values()) rv_info = ( f"rendezvous WP={sorted(self.rendezvous_indices)}" if self.rendezvous_indices else "無 rendezvous (各自跑)" ) print(f"任務啟動: {len(self.tasks)} 架無人機, " f"共 {total_wps} 個航點, " f"到達半徑={self.arrival_radius}m, " f"tick 週期={self._interval_ms}ms, " f"barrier timeout={self.barrier_timeout_sec}s, " f"{rv_info}") def pause(self): if self.state == MissionState.RUNNING: self._timer.stop() self.state = MissionState.PAUSED print("任務暫停") def resume(self): if self.state == MissionState.PAUSED: self._timer.start(self._interval_ms) self.state = MissionState.RUNNING print("任務恢復") def stop(self): self._timer.stop() self.tasks.clear() self.rendezvous_indices = set() self.state = MissionState.IDLE print("任務停止") # ------------------------------------------------------------------ 控制迴圈 def _tick(self): """ 控制 tick: Phase 1: 每台機做到達判定;非 barrier 點直接推進,barrier 點改成 WAITING Phase 2: 檢查 barrier 能不能釋放(全員到齊或 timeout) Phase 3: 每台未送過當前航點的機送一次 Phase 4: 全部完成檢查 """ now = time.monotonic() # ---- Phase 1: 到達判定 ---- for task in self.tasks.values(): if task.done or task.status == TaskStatus.FALLBACK_LOITER: continue # 已在 barrier 等待的不再跑到達判定 if task.status == TaskStatus.WAITING_AT_BARRIER: continue target = task.current_target if target is None: continue gps = self.drone_gps.get(task.drone_id) if gps is None: continue tgt_lat, tgt_lon, _ = target distance = _haversine(gps['lat'], gps['lon'], tgt_lat, tgt_lon) if distance >= self.arrival_radius: continue # 到達當前 wp_index if (task.wp_index in self.rendezvous_indices and task.wp_index < task.total_waypoints - 1): # rendezvous 點 → 不推進,進入 barrier 等待 task.status = TaskStatus.WAITING_AT_BARRIER task.waiting_since = now print(f" {task.drone_id} → 抵達 barrier WP {task.wp_index},等待同伴") self.task_status_changed.emit( task.drone_id, task.status.value, f"waiting at WP {task.wp_index}" ) continue # 一般點 → 直接推進 self._advance_waypoint(task, distance) # ---- Phase 2: barrier 釋放檢查 ---- self._check_barriers(now) # ---- Phase 3: 發送未送過的目標 ---- for task in self.tasks.values(): if task.done or task.status in ( TaskStatus.FALLBACK_LOITER, TaskStatus.WAITING_AT_BARRIER ): continue if task.sent_current_wp: continue target = task.current_target if target is None: continue tgt_lat, tgt_lon, tgt_alt = target task.sent_current_wp = True self.sender.send_position_global( task.drone_id, task.sysid, tgt_lat, tgt_lon, tgt_alt ) # ---- Phase 4: 完成檢查 ---- all_done = all( t.done or t.status == TaskStatus.FALLBACK_LOITER for t in self.tasks.values() ) if all_done and self.tasks: self._timer.stop() self.state = MissionState.IDLE self.mission_completed.emit() print("===== 任務全部完成 =====") def _advance_waypoint(self, task, arrived_distance): """把 task 推進一個航點,重置發送旗標。不處理 barrier 邏輯。""" task.wp_index += 1 task.sent_current_wp = False task.fail_count = 0 if task.wp_index >= task.total_waypoints: task.done = True self.drone_waypoint_reached.emit( task.drone_id, task.wp_index, task.total_waypoints ) print(f" {task.drone_id} → DONE " f"({task.total_waypoints}/{task.total_waypoints})") return self.drone_waypoint_reached.emit( task.drone_id, task.wp_index, task.total_waypoints ) print(f" {task.drone_id} → WP {task.wp_index}/{task.total_waypoints} " f"(到達距離: {arrived_distance:.1f}m)") def _check_barriers(self, now): """檢查每個有人在等的 barrier 能不能釋放。""" # 蒐集目前有哪些 barrier 有人卡著 waiting_groups = {} # barrier_idx -> [task, ...] for task in self.tasks.values(): if task.status == TaskStatus.WAITING_AT_BARRIER: waiting_groups.setdefault(task.wp_index, []).append(task) if not waiting_groups: return for barrier_idx, waiting_tasks in waiting_groups.items(): # 判斷是否「全員到齊」:所有仍活著的機都要麼在此 barrier 等,要麼早已越過此 idx everyone_arrived = True for task in self.tasks.values(): if task.done or task.status == TaskStatus.FALLBACK_LOITER: continue # 失敗機/完成機不擋 if (task.status == TaskStatus.WAITING_AT_BARRIER and task.wp_index == barrier_idx): continue # 在這裡等 if task.wp_index > barrier_idx: continue # 早已越過(barrier 先前已對他釋放) # 還在路上 everyone_arrived = False break force_reason = None if not everyone_arrived: # 檢查 timeout:以該 barrier 上最早進入等待的時間為準 oldest_wait = min(t.waiting_since for t in waiting_tasks) if now - oldest_wait >= self.barrier_timeout_sec: force_reason = f"timeout {self.barrier_timeout_sec:.0f}s" print(f"⚠️ barrier WP {barrier_idx} {force_reason},強制放行") else: continue # 還沒到齊、也還沒 timeout → 繼續等 # 釋放:把所有在此 barrier 等待的機一起推進 tag = "全員到齊" if force_reason is None else force_reason print(f"✅ barrier WP {barrier_idx} 釋放 ({tag})," f"推進 {len(waiting_tasks)} 架") for task in waiting_tasks: task.status = TaskStatus.NORMAL msg = f"barrier WP {barrier_idx} released ({tag})" self.task_status_changed.emit( task.drone_id, task.status.value, msg ) self._advance_waypoint(task, self.arrival_radius) # ------------------------------------------------------------------ 結果回呼 def _on_send_result(self, drone_id, sysid, success, message): """Ros2CommandSender.send_result 的 slot""" task = self.tasks.get(drone_id) if task is None or task.done: return # 若已在 barrier 等待,舊指令的遲到回應不要再觸發重試/fallback if task.status == TaskStatus.WAITING_AT_BARRIER: return if success: if task.fail_count > 0 or task.status == TaskStatus.RETRYING: task.status = TaskStatus.NORMAL self.task_status_changed.emit(drone_id, task.status.value, "recovered") task.fail_count = 0 return task.fail_count += 1 print(f"⚠️ {drone_id} 發送失敗 {task.fail_count}/{self.MAX_RETRY}: {message}") if task.fail_count < self.MAX_RETRY: task.status = TaskStatus.RETRYING task.sent_current_wp = False # 下個 tick 會重送 self.task_status_changed.emit( drone_id, task.status.value, f"retry {task.fail_count}/{self.MAX_RETRY}: {message}" ) else: task.status = TaskStatus.FALLBACK_LOITER self.task_status_changed.emit( drone_id, task.status.value, f"fallback LOITER after {self.MAX_RETRY} fails: {message}" ) print(f"❌ {drone_id} 連續失敗 {self.MAX_RETRY} 次 → 切換 LOITER") self._fallback_to_loiter(drone_id) def _fallback_to_loiter(self, drone_id): """用 monitor.set_mode 切 LOITER。set_mode 是 coroutine,透過 event loop 派送。""" if self.monitor is None: print(f"⚠️ 無 monitor,無法將 {drone_id} 切到 LOITER") return try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.create_task(self.monitor.set_mode(drone_id, "LOITER")) # ------------------------------------------------------------------ 工具函式 def _haversine(lat1, lon1, lat2, lon2): """計算兩個 GPS 座標間的水平距離 (公尺)""" R = 6371000.0 lat1_r = math.radians(lat1) lat2_r = math.radians(lat2) dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = (math.sin(dlat / 2) ** 2 + math.cos(lat1_r) * math.cos(lat2_r) * math.sin(dlon / 2) ** 2) return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))