import time import numpy as np import matplotlib.pyplot as plt import seaborn as sns from itertools import permutations import openjij as oj # ==================== 可調整參數區 ==================== # 問題規模設定 CITIES = 50 # 初始核心問題規模 (測試建議先用 12-15) CITIES_BOUND = 10 # 問題規模的上下範圍 (單一數字控制:以 CITIES 為中心,例如 15±5,即 10~20) RANDOM = False RANDOM_SEED = 42 COORD_RANGE = (0.0, 10.0) # 【核心創新】:環境變異數控制與複雜度壓力測試參數 N_LIST = list(range(CITIES - CITIES_BOUND, CITIES + CITIES_BOUND + 1, 4)) # 自動產生測試的問題規模範圍 COORD_STD = 15.0 # 空間分佈變異數 (越大代表城市分佈越不均勻、越崎嶇) RISK_STD = 10.0 # 擾動風險變異數 (越大代表某些路段特別危險,H-infinity 衝突極大) # 演算法執行設定 NUM_RUNS = 20 # 正式比較的執行次數 BETA = 1 # 🌟 稍微調高 (原為 10.0),讓低溫結冰得更紮實 SWEEPS_MAIN_TEST = 500 # 主要比較測試的退火步數 SWEEPS_COMPLEXITY_TEST = 200 # 複雜度擴展測試的退火步數 SWEEPS_HEATMAP_TEST = 200 # 熱力圖測試的退火步數 PAUSE_SWEEPS_DEFAULT = 150 # 主測試中 Pause SQA 的預設暫停步數 HEATMAP_RUNS = 5 # 熱力圖測試每種情況的平均次數 # QUBO 參數 PENALTY = 100.0 # 🌟 從 3000 大幅降回 500 (解開高爾夫球場效應) BIG_PENALTY = 2000.0 # 🌟 起點約束 # TSP 求解參數 EXACT_LIMIT = 8 # 魯棒優化參數 (H-infinity Robust QUBO) USE_ROBUST = True # 強制開啟,這是本論文的核心 GAMMA = 0.5 SIGMA = 1.0 ALPHA = 10.0 # 執行區塊控制 RUN_MAIN_TEST = True # 是否執行主要演算法(SA vs SQA)比較測試 INCLUDE_PAUSE_SQA = False # 主測試中是否包含 Pause SQA 進行比較 COMPARE_H_INFINITY = True # 是否比較有無 H-infinity 避障算法的效果 (將輸出 2x3 或 3x2 圖表) RUN_COMPLEXITY_TEST = False # 是否執行複雜度擴展(N_LIST)測試 SHOW_TERRAIN_PLOTS = False # 是否顯示能量地形圖與驗證報告 RUN_HEATMAP_TEST = False # 是否繪製 s 參數與 Pause Duration 成本熱力圖 ENABLE_QUANTUM_MINEFIELD = True # 是否產生極端能量障礙與欺騙陷阱 (若為 False,則產生一般隨機地圖) # ===================================================== def generate_quantum_minefield(N, random_seed=42): """ 生成具有「極大能量障礙」與「欺騙陷阱」的極端測試矩陣。 目標:讓 SA 卡死在陷阱裡,而 SQA 能夠穿隧找到黃金路線。 """ np.random.seed(random_seed) # 1. 築起能量高牆 (The Wall) # 預設所有的路徑都是極度危險且遙遠的 (形成巨大的能量障礙) D = np.random.uniform(30.0, 40.0, (N, N)) F = np.random.uniform(20.0, 30.0, (N, N)) np.fill_diagonal(D, 0) np.fill_diagonal(F, 0) # 2. 鑿出「黃金隧道 (Global Minimum)」 # 隱藏一條完美的最佳路徑 (0 -> 1 -> 2 -> ... -> N-1 -> 0) # 這條路徑成本合理且無風險,但被周圍的高牆死死包圍 golden_path = list(range(N)) for i in range(N): u = golden_path[i] v = golden_path[(i + 1) % N] D[u, v] = 2.0 # 距離適中 F[u, v] = 0.0 # 風險為 0 (完美的避險路線) # 3. 佈置「致命陷阱 (Deceptive Traps)」 # 創造極度誘人的第一步,引誘演算法走錯路。 # SA 通常是貪婪的,會優先選眼前成本最低的路徑。 for i in range(N): # 設定一個陷阱節點,遠離正確的下一步 trap_node = (i + int(N/2)) % N # 避免陷阱剛好蓋掉黃金路線 if trap_node != golden_path[(i + 1) % N] and trap_node != i: # 致命誘惑:眼前的距離和風險趨近於 0 D[i, trap_node] = 0.1 F[i, trap_node] = 0.1 # 物理意義:SA 會毫不猶豫地跳進這個 0.1 的陷阱, # 但一旦跳進去,trap_node 接下來通往其他城市的路線全是 30.0 以上的絕望高牆! # 只有具備量子穿隧效應的 SQA,才能看破眼前的 0.1,選擇走 2.0 的黃金隧道。 print(f"⚠️ [極限測試] 已成功生成 N={N} 的量子雷區地貌!") return D, F def verify_minefield_stats(D, F): """ 數值驗證:計算「黃金捷徑」與「一般隨機路徑」的成本差異, 證明地圖中確實存在極端的高牆與深谷。 """ N = len(D) # 1. 計算黃金捷徑 (0 -> 1 -> 2 -> ... -> N-1 -> 0) 的總成本 golden_path = list(range(N)) golden_cost = 0 for i in range(N): u = golden_path[i] v = golden_path[(i + 1) % N] golden_cost += (D[u, v] + F[u, v]) # 2. 計算一條「不小心踩到高牆」的隨機路徑成本 np.random.seed(99) random_path = np.random.permutation(N).tolist() random_cost = 0 for i in range(N): u = random_path[i] v = random_path[(i + 1) % N] random_cost += (D[u, v] + F[u, v]) print("\n🔍 [雷區地貌驗證報告]") print(f"✅ 黃金隧道總成本: {golden_cost:.2f} (演算法的終極目標)") print(f"❌ 隨機踩雷總成本: {random_cost:.2f} (一般 SA 容易落入的下場)") print(f"⚖️ 難度倍率: 隨機路徑的成本是黃金捷徑的 {random_cost/golden_cost:.1f} 倍!\n") def plot_energy_landscape_heatmap(D, F): """ 視覺化驗證:繪製 D+F 總成本矩陣的熱力圖。 """ Total_Cost_Matrix = D + F plt.figure(figsize=(8, 6)) # 使用 'hot' 顏色地圖,顏色越亮(黃/白)代表成本越高(高牆),越暗(黑/紅)代表成本越低(深谷) plt.imshow(Total_Cost_Matrix, cmap='hot', interpolation='nearest') plt.colorbar(label='Total Energy Cost (D + F)') plt.title('Quantum Minefield Energy Landscape') plt.xlabel('To City') plt.ylabel('From City') # 標示出黃金路線 (次對角線) plt.tight_layout() # plt.show() 將在最後統一呼叫 def plot_energy_landscape_3d(D, F): """ 視覺化驗證二:繪製 D+F 總成本矩陣的 3D 能量地形圖。 """ Total_Cost_Matrix = D + F N = len(D) X, Y = np.meshgrid(range(N), range(N)) fig = plt.figure(figsize=(10, 8)) ax = fig.add_subplot(111, projection='3d') # 繪製 3D 表面 surf = ax.plot_surface(X, Y, Total_Cost_Matrix, cmap='hot', edgecolor='none', alpha=0.9) fig.colorbar(surf, label='Total Energy Cost (D + F)', shrink=0.5, aspect=5) ax.set_title('3D Quantum Minefield Energy Landscape') ax.set_xlabel('To City') ax.set_ylabel('From City') ax.set_zlabel('Cost') plt.tight_layout() # plt.show() 將在最後統一呼叫 # ----------------- 基礎輔助函數 ----------------- def generate_distance_matrix(num_cities, random=True, seed=None, coord_range=None): if seed is None: seed = RANDOM_SEED if coord_range is None: coord_range = COORD_RANGE if not random: np.random.seed(seed) else: np.random.seed(None) low, high = coord_range coords = np.random.uniform(low, high, size=(num_cities, 2)) D = np.zeros((num_cities, num_cities), dtype=float) for i in range(num_cities): for j in range(i + 1, num_cities): dist = np.linalg.norm(coords[i] - coords[j]) D[i, j] = dist D[j, i] = dist return np.round(D, 2), coords def generate_disturbance_matrix(num_cities, seed=None): if seed is None: seed = RANDOM_SEED np.random.seed(seed) F = np.random.rand(num_cities, num_cities) F = (F + F.T) / 2 np.fill_diagonal(F, 0.0) return F def idx_mtsp(k, i, p, N): return k * (N * N) + i * N + p def build_robust_qubo(D, F, gamma=GAMMA, sigma=SIGMA, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY): N = D.shape[0] Q = {} def addQ(u, v, w): if w == 0: return if u > v: u, v = v, u Q[(u, v)] = Q.get((u, v), 0.0) + w for k in range(2): for p in range(N): q = (p + 1) % N for i in range(N): for j in range(N): dij, fij = D[i, j], F[i, j] if dij == 0: continue # 這裡就是 H-infinity 的核心風險項!完全保留著! risk_term = (sigma / (gamma**2)) * (fij**2) total_weight = dij + (alpha * risk_term) u, v = idx_mtsp(k, i, p, N), idx_mtsp(k, j, q, N) addQ(u, v, total_weight) for k in range(2): for p in range(N): vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)] for u in vars_pos: addQ(u, u, -penalty) for a in range(N): for b in range(a+1, N): addQ(vars_pos[a], vars_pos[b], 2*penalty) for i in range(1, N): vars_city = [idx_mtsp(k, i, p, N) for k in range(2) for p in range(N)] for u in vars_city: addQ(u, u, -penalty) L = len(vars_city) for a in range(L): for b in range(a+1, L): addQ(vars_city[a], vars_city[b], 2*penalty) for k in range(2): for i in range(1, N): addQ(idx_mtsp(k, i, 0, N), idx_mtsp(k, i, 0, N), big_penalty) addQ(idx_mtsp(k, 0, 0, N), idx_mtsp(k, 0, 0, N), -big_penalty) return Q, N # --------------------------------------------------------- # 🌟 [新增] 產生平滑的 SQA 與 Pause SQA Schedule # --------------------------------------------------------- def get_smooth_sqa_schedule(beta, total_sweeps, num_steps=100): """產生標準 SQA 的平滑退火排程 (s 從 0.0 遞增到 1.0)""" schedule = [] sweeps_per_step = max(1, int(total_sweeps / num_steps)) for s in np.linspace(0.0, 1.0, num_steps): schedule.append([float(s), float(beta), int(sweeps_per_step)]) return schedule def get_pause_sqa_schedule(s_star, beta, total_anneal_sweeps, pause_sweeps, num_steps=100): """產生帶有 Pause 策略的平滑退火排程""" schedule = [] # 階段 1:從 s=0.0 平滑退火到 s_star steps_p1 = max(1, int(num_steps * s_star)) sweeps_p1 = max(1, int((total_anneal_sweeps / 2) / steps_p1)) for s in np.linspace(0.0, s_star, steps_p1, endpoint=False): schedule.append([float(s), float(beta), int(sweeps_p1)]) # 階段 2:在相變點 s_star 暫停 (Pause),讓量子系統充分熱化 schedule.append([float(s_star), float(beta), int(pause_sweeps)]) # 階段 3:從 s_star 平滑退火到 s=1.0 steps_p3 = max(1, num_steps - steps_p1) sweeps_p3 = max(1, int((total_anneal_sweeps / 2) / steps_p3)) for s in np.linspace(s_star, 1.0, steps_p3): schedule.append([float(s), float(beta), int(sweeps_p3)]) return schedule def decode_slots(sample, N): slots = [] for k in range(2): row = [] for p in range(N): chosen = 0 for i in range(N): if sample.get(idx_mtsp(k, i, p, N), 0) == 1: chosen = i break row.append(chosen) slots.append(row) return slots[0], slots[1] def repair_routes_from_slots(u1_slots, u2_slots, N): count1, count2 = [0]*N, [0]*N for c in u1_slots: if 0 <= c < N: count1[c] += 1 for c in u2_slots: if 0 <= c < N: count2[c] += 1 assign1, assign2 = [], [] for city in range(1, N): if count1[city] > count2[city]: assign1.append(city) elif count2[city] > count1[city]: assign2.append(city) else: if len(assign1) <= len(assign2): assign1.append(city) else: assign2.append(city) return sorted(assign1), sorted(assign2) def uav_cost(path, D): if not path or len(path) == 0: return 0.0 cost = D[0, path[0]] for i in range(len(path)-1): cost += D[path[i], path[i+1]] cost += D[path[-1], 0] return float(cost) def uav_disturbance_energy(path, F, gamma=GAMMA, sigma=SIGMA): if not path or len(path) == 0: return 0.0 risk = (sigma / (gamma**2)) * (F[0, path[0]]**2) for i in range(len(path)-1): risk += (sigma / (gamma**2)) * (F[path[i], path[i+1]]**2) risk += (sigma / (gamma**2)) * (F[path[-1], 0]**2) return float(risk) def best_order_for_cities(cities, D, exact_limit=EXACT_LIMIT): cities = list(cities) if len(cities) <= 1: return cities, uav_cost(cities, D) if len(cities) <= exact_limit: best_perm, best_cost = None, float('inf') for perm in permutations(cities): c = uav_cost(list(perm), D) if c < best_cost: best_cost, best_perm = c, list(perm) return best_perm, best_cost else: # 簡單的 nearest neighbor heuristic 當作 fallback rem = set(cities) curr = list(rem)[0] rem.remove(curr) route = [curr] while rem: nxt = min(rem, key=lambda x: D[curr, x]) route.append(nxt) rem.remove(nxt) curr = nxt return route, uav_cost(route, D) def get_makespan_and_risk(sample, N, D, F): u1, u2 = decode_slots(sample, N) a1, a2 = repair_routes_from_slots(u1, u2, N) p1, c1 = best_order_for_cities(a1, D) p2, c2 = best_order_for_cities(a2, D) makespan = max(c1, c2) dist_energy = uav_disturbance_energy(p1, F) + uav_disturbance_energy(p2, F) # 確保回傳路徑,以便印出 return makespan, dist_energy, p1, p2 # ============================ # 核心創新功能區 (Pause Strategy) # ============================ def pilot_search_phase_transition(Q, N, D, F): print("\n🔍 [階段一] 前導偵查 (Pilot Search): 尋找最佳暫停點 s*") test_s_list = np.arange(0.2, 1.0, 0.1) variances = [] makespans = [] sampler = oj.SQASampler() for s in test_s_list: sched_var = [[s, BETA, 30]] res_var = sampler.sample_qubo(Q, schedule=sched_var, num_reads=10) energies = [d.energy for d in res_var.data()] variances.append(np.var(energies)) sched_pause = [[s, BETA, 20], [s, BETA, 40], [1.0, BETA, 20]] res_pause = sampler.sample_qubo(Q, schedule=sched_pause, num_reads=5) mk, _, p1, p2 = get_makespan_and_risk(res_pause.first.sample, N, D, F) makespans.append(mk) print(f" 測試 s={s:.1f} | 變異數: {variances[-1]:.0f} | Makespan: {mk:.2f}") best_s = test_s_list[np.argmin(makespans)] print(f"⭐ 鎖定最佳暫停點 s* = {best_s:.1f}") return test_s_list, variances, makespans, best_s def test_pause_durations(Q, N, D, F, s_star): print(f"\n⏳ [階段二] 暫停時間測試 (在 s*={s_star:.1f} 進行熱化)") durations = [0, 20, 50, 100, 150] duration_makespans = [] sampler = oj.SQASampler() for d in durations: if d == 0: sched = get_smooth_sqa_schedule(BETA, total_sweeps=SWEEPS_MAIN_TEST) else: sched = get_pause_sqa_schedule(s_star, BETA, total_anneal_sweeps=SWEEPS_MAIN_TEST, pause_sweeps=d) res = sampler.sample_qubo(Q, schedule=sched, num_reads=10) mk, _, p1, p2 = get_makespan_and_risk(res.first.sample, N, D, F) duration_makespans.append(mk) print(f" 暫停 {d} 步 -> Makespan: {mk:.2f}") return durations, duration_makespans def run_comparative_evaluations(Q, N, D, F, s_star): print(f"\n⚔️ [階段三] 正式對決 (SA vs SQA vs Pause-SQA, {NUM_RUNS} runs)") results = { 'sa': {'mk': [], 'risk': [], 'energy': []}, 'sqa': {'mk': [], 'risk': [], 'energy': []}, 'pause': {'mk': [], 'risk': [], 'energy': []} } sqa_sampler = oj.SQASampler() sa_sampler = oj.SASampler() # 🌟 核心修改:建立 Sweeps 的退火排程 TOTAL_SWEEPS = SWEEPS_MAIN_TEST PAUSE_SWEEPS = PAUSE_SWEEPS_DEFAULT # 讓 Pause-SQA 在相變點停留熱化 smooth_sched = get_smooth_sqa_schedule(beta=BETA, total_sweeps=TOTAL_SWEEPS, num_steps=100) pause_sched = get_pause_sqa_schedule(s_star=s_star, beta=BETA, total_anneal_sweeps=TOTAL_SWEEPS, pause_sweeps=PAUSE_SWEEPS, num_steps=100) # (選配) 在 pilot_search 和 test_pause 中也等比例放大 sweeps # 例如 pilot_search 改成 [[s, BETA, 300]] for r in range(NUM_RUNS): # SA res_sa = sa_sampler.sample_qubo(Q, num_reads=10) mk, risk, p1, p2 = get_makespan_and_risk(res_sa.first.sample, N, D, F) results['sa']['mk'].append(mk) results['sa']['risk'].append(risk) results['sa']['energy'].append(res_sa.first.energy) print(f" [SA 執行 {r+1}/{NUM_RUNS}] 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f}") # Standard SQA t0 = time.time() res_std = sqa_sampler.sample_qubo(Q, schedule=smooth_sched, num_reads=10) mk, risk, p1, p2 = get_makespan_and_risk(res_std.first.sample, N, D, F) results['sqa']['mk'].append(mk) results['sqa']['risk'].append(risk) results['sqa']['energy'].append(res_std.first.energy) print(f" [Std-SQA 執行 {r+1}/{NUM_RUNS}] 耗時: {time.time()-t0:.2f}s | 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f}") # Pause SQA if INCLUDE_PAUSE_SQA: t0 = time.time() res_pause = sqa_sampler.sample_qubo(Q, schedule=pause_sched, num_reads=10) mk, risk, p1, p2 = get_makespan_and_risk(res_pause.first.sample, N, D, F) results['pause']['mk'].append(mk) results['pause']['risk'].append(risk) results['pause']['energy'].append(res_pause.first.energy) print(f" [Pause-SQA 執行 {r+1}/{NUM_RUNS}] 耗時: {time.time()-t0:.2f}s | 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f}") return results def create_basic_distribution_charts(results): if COMPARE_H_INFINITY: fig, axes = plt.subplots(2, 3, figsize=(16, 10)) fig.suptitle(r"Performance Distribution: Without vs With $H_\infty$ Obstacle Avoidance", fontsize=16, fontweight='bold', y=1.05) row_labels = ['Without Obstacle Avoidance', r'With $H_\infty$ Obstacle Avoidance'] res_keys = ['baseline', 'robust'] for row, r_key in enumerate(res_keys): res = results[r_key] if INCLUDE_PAUSE_SQA: labels = ['SA', 'Standard-SQA', 'Pause-SQA'] colors = ['gray', 'steelblue', 'firebrick'] mk_data = [res['sa']['mk'], res['sqa']['mk'], res['pause']['mk']] risk_data = [res['sa']['risk'], res['sqa']['risk'], res['pause']['risk']] energy_data = [res['sa']['energy'], res['sqa']['energy'], res['pause']['energy']] else: labels = ['SA', 'Standard-SQA'] colors = ['gray', 'steelblue'] mk_data = [res['sa']['mk'], res['sqa']['mk']] risk_data = [res['sa']['risk'], res['sqa']['risk']] energy_data = [res['sa']['energy'], res['sqa']['energy']] parts1 = axes[row, 0].violinplot(mk_data, showmeans=True) axes[row, 0].set_title(f"[{row_labels[row]}] Makespan Distribution") axes[row, 0].set_ylabel("Distance Cost") parts2 = axes[row, 1].violinplot(risk_data, showmeans=True) axes[row, 1].set_title(f"[{row_labels[row]}] Risk Distribution") axes[row, 1].set_ylabel("Risk Penalty") parts3 = axes[row, 2].violinplot(energy_data, showmeans=True) axes[row, 2].set_title(f"[{row_labels[row]}] Raw Energy") axes[row, 2].set_ylabel("System Energy") for col, parts in enumerate([parts1, parts2, parts3]): ax = axes[row, col] ax.set_xticks(np.arange(1, len(labels) + 1)) ax.set_xticklabels(labels, fontsize=11) ax.grid(alpha=0.3, axis='y') for pc, color in zip(parts['bodies'], colors): pc.set_facecolor(color) pc.set_alpha(0.7) else: fig, axes = plt.subplots(1, 3, figsize=(16, 5)) fig.suptitle("Performance Distribution Comparison", fontsize=16, fontweight='bold', y=1.05) if INCLUDE_PAUSE_SQA: labels = ['SA', 'Standard-SQA', 'Pause-SQA'] colors = ['gray', 'steelblue', 'firebrick'] mk_data = [results['sa']['mk'], results['sqa']['mk'], results['pause']['mk']] risk_data = [results['sa']['risk'], results['sqa']['risk'], results['pause']['risk']] energy_data = [results['sa']['energy'], results['sqa']['energy'], results['pause']['energy']] else: labels = ['SA', 'Standard-SQA'] colors = ['gray', 'steelblue'] mk_data = [results['sa']['mk'], results['sqa']['mk']] risk_data = [results['sa']['risk'], results['sqa']['risk']] energy_data = [results['sa']['energy'], results['sqa']['energy']] # 1. Makespan 分布 parts1 = axes[0].violinplot(mk_data, showmeans=True) axes[0].set_title("Makespan (Distance Cost) Distribution") axes[0].set_ylabel("Distance Cost") # 2. Risk 分布 parts2 = axes[1].violinplot(risk_data, showmeans=True) axes[1].set_title(r"$H_\infty$ Disturbance Risk Distribution") axes[1].set_ylabel("Risk Penalty") # 3. Energy 分布 parts3 = axes[2].violinplot(energy_data, showmeans=True) axes[2].set_title("QUBO System Raw Energy") axes[2].set_ylabel("System Energy") # 設定提琴圖外觀 for i, ax in enumerate(axes): ax.set_xticks(np.arange(1, len(labels) + 1)) ax.set_xticklabels(labels, fontsize=11) ax.grid(alpha=0.3, axis='y') parts = [parts1, parts2, parts3][i] for pc, color in zip(parts['bodies'], colors): pc.set_facecolor(color) pc.set_alpha(0.7) plt.tight_layout(rect=[0, 0.03, 1, 0.95]) plt.savefig("distribution_comparison.png", dpi=300) print("\n🎨 圖表已繪製: distribution_comparison.png") # plt.show() (Moved to the end) def create_academic_charts(s_list, variances, makespans, best_s, durations, dur_makespans, results): """原有的圖表二:Pause Strategy 深度分析圖""" if not INCLUDE_PAUSE_SQA: print("⚠️ INCLUDE_PAUSE_SQA 為 False,跳過 Pause SQA 深度分析圖表的繪製。") return fig = plt.figure(figsize=(16, 10)) fig.suptitle(f"Quantum Pause Strategy in H-infinity Robust mTSP (N={CITIES})", fontsize=16, fontweight='bold') # Chart 1: Phase Transition & U-Valley ax1 = plt.subplot(2, 2, 1) color1, color2 = 'tab:blue', 'tab:red' ax1.set_title("Chart 1: Phase Transition & U-Valley", fontsize=12) ax1.set_xlabel("Annealing Parameter $s$") ax1.set_ylabel("Trotter Energy Variance", color=color1) ax1.plot(s_list, variances, marker='o', color=color1, linewidth=2) ax1.tick_params(axis='y', labelcolor=color1) ax1_twin = ax1.twinx() ax1_twin.set_ylabel("Makespan", color=color2) ax1_twin.plot(s_list, makespans, marker='s', color=color2, linestyle='--', linewidth=2) ax1_twin.tick_params(axis='y', labelcolor=color2) ax1.axvline(x=best_s, color='green', linestyle=':', linewidth=2) ax1.grid(alpha=0.3) # Subplot 2 (was Chart 4): Effect of Thermalization Time ax2 = plt.subplot(2, 2, 2) ax2.set_title(f"Chart 2: Effect of Thermalization Time at $s^*={best_s:.1f}$", fontsize=12) ax2.plot(durations, dur_makespans, marker='D', color='darkorange', linewidth=2.5, markersize=8) ax2.set_xlabel("Pause Duration (Sweeps)") ax2.set_ylabel("Optimized Makespan") ax2.grid(alpha=0.3, linestyle='--') # Subplot 3 (was Chart 2): Pareto Front ax3 = plt.subplot(2, 2, 3) ax3.set_title("Chart 3: Robustness Pareto Front", fontsize=12) ax3.scatter(results['sqa']['mk'], results['sqa']['risk'], c='steelblue', alpha=0.7, s=80, label='Standard-SQA') if INCLUDE_PAUSE_SQA: ax3.scatter(results['pause']['mk'], results['pause']['risk'], c='firebrick', alpha=0.9, s=120, marker='*', label='Pause-SQA') ax3.set_xlabel("Makespan") ax3.set_ylabel("Disturbance Risk") ax3.legend() ax3.grid(alpha=0.3, linestyle='--') # Subplot 4 (was Chart 3): Reliability Violin Plot ax4 = plt.subplot(2, 2, 4) ax4.set_title("Chart 4: Final Makespan Comparison", fontsize=12) if INCLUDE_PAUSE_SQA: parts = ax4.violinplot([results['sqa']['mk'], results['pause']['mk']], showmeans=True) parts['bodies'][0].set_facecolor('steelblue') parts['bodies'][1].set_facecolor('firebrick') for body in parts['bodies']: body.set_alpha(0.7) ax4.set_xticks([1, 2]) ax4.set_xticklabels(['Standard-SQA', 'Pause-SQA']) else: parts = ax4.violinplot([results['sqa']['mk']], showmeans=True) parts['bodies'][0].set_facecolor('steelblue') for body in parts['bodies']: body.set_alpha(0.7) ax4.set_xticks([1]) ax4.set_xticklabels(['Standard-SQA']) if results['sa']['mk']: sa_mean = np.mean(results['sa']['mk']) ax4.axhline(y=sa_mean, color='gray', linestyle=':', linewidth=2, label=f'SA Mean: {sa_mean:.1f}') ax4.legend() ax4.grid(alpha=0.3, axis='y') plt.tight_layout(rect=[0, 0.03, 1, 0.95]) plt.savefig("academic_pause_strategy.png", dpi=300) print("🎨 原有圖表已繪製: academic_pause_strategy.png") # plt.show() (Moved to the end) # ============================ # 複雜度與環境控制功能 (新增) # ============================ def inject_hidden_shortcut(D, F): """ 注入隱藏的黃金捷徑 (Hidden Shortcut): 創造一條極度狹窄且成本極低的路線 (例如 4 -> 5 -> 6 -> 7)。 如果沒照順序走,成本會極高;一旦走對,總成本會大幅下降。 """ N = D.shape[0] if N < 8: return D, F # 城市太少無法做長捷徑 # 定義黃金路線的節點 (假設是 4, 5, 6, 7) golden_nodes = [4, 5, 6, 7] # 1. 築起高牆:先把這幾個城市之間的所有連線,都變成極高成本 (阻止 SA 亂湊) for i in golden_nodes: for j in golden_nodes: if i != j: D[i, j] = 8.0 F[i, j] = 0.8 # 高風險 # 2. 挖出深谷:只開通 4->5, 5->6, 6->7 這條唯一且完美的捷徑 for i in range(len(golden_nodes) - 1): n1 = golden_nodes[i] n2 = golden_nodes[i + 1] # 距離極短,風險為 0 D[n1, n2] = 0.05 D[n2, n1] = 0.05 F[n1, n2] = 0.0 F[n2, n1] = 0.0 print("\n✨ [隱藏捷徑已佈署] 演算法將挑戰尋找極狹窄的黃金路線 (4->5->6->7)!") return D, F def inject_deceptive_trap(D, F, alpha=10.0, gamma=0.5, sigma=1.0): """ 注入欺騙性陷阱 (Deceptive Trap): 改造矩陣,創造一條「看似完美的捷徑」,測試演算法是否會陷入局部陷阱。 """ N = D.shape[0] if N < 4: return D, F # 城市太少無法做陷阱 # 定義陷阱節點 trap_start = 1 trap_end = 2 safe_detour = 3 # ========================================== # 陷阱 1:致命捷徑 (距離極度誘人,但風險爆表) # ========================================== D[trap_start, trap_end] = 0.1 D[trap_end, trap_start] = 0.1 F[trap_start, trap_end] = 0.99 F[trap_end, trap_start] = 0.99 # ========================================== # 陷阱 2:安全繞路 (距離較遠,但完全無風險) # ========================================== # 路線: trap_start -> safe_detour -> trap_end D[trap_start, safe_detour] = 4.0 D[safe_detour, trap_start] = 4.0 D[safe_detour, trap_end] = 4.0 D[trap_end, safe_detour] = 4.0 F[trap_start, safe_detour] = 0.01 F[safe_detour, trap_start] = 0.01 F[safe_detour, trap_end] = 0.01 F[trap_end, safe_detour] = 0.01 # 計算並印出真實成本,讓您確認陷阱是否成立 # 真實成本 = 距離 + alpha * (sigma / gamma^2) * F^2 risk_multiplier = alpha * (sigma / (gamma**2)) shortcut_cost = 0.1 + risk_multiplier * (0.99**2) detour_cost = (4.0 + risk_multiplier * (0.01**2)) + (4.0 + risk_multiplier * (0.01**2)) print(f" ➤ 致命捷徑 (1->2) 表觀距離: 0.1 | 隱藏真實成本: {shortcut_cost:.2f}") print(f" ➤ 安全繞路 (1->3->2) 表觀距離: 8.0 | 隱藏真實成本: {detour_cost:.2f}") return D, F def generate_controlled_matrices(num_cities, coord_std, risk_std, seed=42): """根據指定的標準差(變異數)生成 D 與 F 矩陣,精準控制問題複雜度""" np.random.seed(seed) # 1. 生成距離矩陣 D (使用常態分佈控制空間聚集度) coords = np.random.normal(loc=0.0, scale=coord_std, size=(num_cities, 2)) D = np.zeros((num_cities, num_cities), dtype=float) for i in range(num_cities): for j in range(i + 1, num_cities): dist = np.linalg.norm(coords[i] - coords[j]) D[i, j] = D[j, i] = dist # 2. 生成擾動矩陣 F (使用常態分佈控制風險極端值) F = np.abs(np.random.normal(loc=1.0, scale=risk_std, size=(num_cities, num_cities))) F = (F + F.T) / 2 # 確保對稱 np.fill_diagonal(F, 0.0) return np.round(D, 2), np.round(F, 2) def decode_and_eval(sample, N, D): # 快速解碼與貪婪/暴力求成本 u1, u2 = [], [] for p in range(N): for i in range(N): if sample.get(idx_mtsp(0, i, p, N), 0) == 1: u1.append(i) if sample.get(idx_mtsp(1, i, p, N), 0) == 1: u2.append(i) # 簡單修復 (去除重複,補齊缺失) c1, c2 = set(u1), set(u2) a1, a2 = [], [] for city in range(1, N): if city in c1 and city not in c2: a1.append(city) elif city in c2 and city not in c1: a2.append(city) else: if len(a1) <= len(a2): a1.append(city) else: a2.append(city) # 評估成本 def eval_cost(cities): if not cities: return 0 if len(cities) <= EXACT_LIMIT: bc = float('inf') for p in permutations(cities): c = D[0, p[0]] + sum(D[p[i], p[i+1]] for i in range(len(p)-1)) + D[p[-1], 0] bc = min(bc, c) return bc # 貪婪 rem, path, curr = set(cities), [], 0 while rem: nxt = min(rem, key=lambda x: D[curr, x]) path.append(nxt) rem.remove(nxt) curr = nxt return D[0, path[0]] + sum(D[path[i], path[i+1]] for i in range(len(path)-1)) + D[path[-1], 0] return max(eval_cost(a1), eval_cost(a2)) def run_complexity_scaling(): print(f"\n🚀 開始複雜度壓力測試...") print(f"⚙️ 空間變異數(COORD_STD)={COORD_STD}, 風險變異數(RISK_STD)={RISK_STD}") std_means, pause_means = [], [] std_errs, pause_errs = [], [] sampler = oj.SQASampler() for N in N_LIST: print(f"\n📊 測試規模 N={N} ...") D, F = generate_controlled_matrices(N, COORD_STD, RISK_STD) # 動態計算懲罰值 max_d = np.max(D) max_f = np.max(F) if F is not None else 0 max_possible_cost = max_d + max_f dyn_penalty = max_possible_cost * 12.0 dyn_big_penalty = dyn_penalty * 5.0 Q, _ = build_robust_qubo(D, F, penalty=dyn_penalty, big_penalty=dyn_big_penalty) std_results, pause_results = [], [] # 為了節省時間,在壓力測試中我們使用固定的經驗暫停點 s=0.7 # 總步數統一為 SWEEPS_COMPLEXITY_TEST 步 std_sched = get_smooth_sqa_schedule(BETA, total_sweeps=SWEEPS_COMPLEXITY_TEST, num_steps=20) pause_sched = get_pause_sqa_schedule(0.7, BETA, total_anneal_sweeps=SWEEPS_COMPLEXITY_TEST, pause_sweeps=100, num_steps=30) # 這裡設定每次 N 跑的次數,與原本 NUM_RUNS 獨立,建議 10 次 runs_for_scale = 10 for r in range(runs_for_scale): # Standard SQA res_std = sampler.sample_qubo(Q, schedule=std_sched, num_reads=5) std_results.append(decode_and_eval(res_std.first.sample, N, D)) # Pause SQA res_pause = sampler.sample_qubo(Q, schedule=pause_sched, num_reads=5) pause_results.append(decode_and_eval(res_pause.first.sample, N, D)) mean_std, err_std = np.mean(std_results), np.std(std_results) mean_pause, err_pause = np.mean(pause_results), np.std(pause_results) std_means.append(mean_std) std_errs.append(err_std) pause_means.append(mean_pause) pause_errs.append(err_pause) print(f" Standard-SQA 平均: {mean_std:.2f} ± {err_std:.2f}") print(f" Pause-SQA 平均: {mean_pause:.2f} ± {err_pause:.2f}") return std_means, std_errs, pause_means, pause_errs def plot_crossover(std_means, std_errs, pause_means, pause_errs): plt.figure(figsize=(10, 6)) plt.title(f"Complexity Scaling: Algorithm Performance vs Problem Size (N)\n(Coord Std={COORD_STD}, Risk Std={RISK_STD})", fontsize=14, fontweight='bold') # 畫線與誤差棒 plt.errorbar(N_LIST, std_means, yerr=std_errs, fmt='-o', color='steelblue', linewidth=2.5, capsize=5, markersize=8, label='Standard-SQA') plt.errorbar(N_LIST, pause_means, yerr=pause_errs, fmt='-s', color='firebrick', linewidth=2.5, capsize=5, markersize=8, label='Pause-SQA') plt.xlabel("Problem Scale (Number of Cities $N$)", fontsize=12) plt.ylabel("Optimized Makespan (Distance Cost)", fontsize=12) plt.xticks(N_LIST) plt.grid(alpha=0.4, linestyle='--') plt.legend(fontsize=11) plt.tight_layout() plt.savefig("complexity_crossover.png", dpi=300) print("\n🎨 交叉點分析圖已儲存為: complexity_crossover.png") # plt.show() (Moved to the end) def generate_pause_heatmap(Q, N, D, num_reads=10): """ 掃描 s 參數與 Pause Duration,並繪製成本熱力圖 """ print("\n🚀 開始繪製量子暫停策略熱力圖...") # 1. 定義掃描範圍 # s 值從 0.1 到 0.9,間隔 0.1 (共 9 個點) s_values = np.round(np.linspace(0.1, 0.9, 9), 2) # 暫停步數從 20 到 200,間隔 20 (共 10 個點) pause_durations = np.arange(20, 220, 20) # 建立一個空矩陣來儲存結果 # row 對應 Pause Duration, col 對應 s heatmap_data = np.zeros((len(pause_durations), len(s_values))) # 2. 執行網格掃描 (Grid Search) total_iters = len(s_values) * len(pause_durations) current_iter = 0 sampler = oj.SQASampler() for i, duration in enumerate(pause_durations): for j, s_star in enumerate(s_values): current_iter += 1 print(f"進度: {current_iter}/{total_iters} | 測試 s*={s_star}, 暫停={duration}步") pause_sched = get_pause_sqa_schedule(s_star, BETA, total_anneal_sweeps=SWEEPS_HEATMAP_TEST, pause_sweeps=duration, num_steps=30) costs = [] for _ in range(HEATMAP_RUNS): res = sampler.sample_qubo(Q, schedule=pause_sched, num_reads=num_reads) costs.append(decode_and_eval(res.first.sample, N, D)) heatmap_data[i, j] = np.mean(costs) # 3. 繪製熱力圖 plt.figure(figsize=(12, 8)) ax = sns.heatmap( heatmap_data, xticklabels=s_values, yticklabels=pause_durations, cmap="YlGnBu", # 黃-綠-藍 配色,數值大偏藍,數值小偏黃(或反過來) annot=True, # 在格子上顯示數值 fmt=".0f", # 數值不留小數點 cbar_kws={'label': 'Optimized Makespan (Cost)'} ) # 反轉 Y 軸,讓 20 步在最下面,200 步在最上面 (符合物理直覺) ax.invert_yaxis() plt.title("Quantum Pause Strategy: Performance Heatmap\n(Finding the Optimal Phase Transition Phase)", fontsize=16, pad=15) plt.xlabel("Annealing Parameter $s^*$ (Pause Location)", fontsize=14) plt.ylabel("Pause Duration (Sweeps)", fontsize=14) plt.tight_layout() plt.savefig("quantum_pause_heatmap.png", dpi=300) # ============================ # 主程式執行入口 # ============================ if __name__ == "__main__": print("==================================================") if ENABLE_QUANTUM_MINEFIELD: print("🚀 生成極限量子雷區測試...") D, F = generate_quantum_minefield(CITIES, random_seed=RANDOM_SEED) # 進行數值驗證與畫圖 if SHOW_TERRAIN_PLOTS: verify_minefield_stats(D, F) plot_energy_landscape_heatmap(D, F) plot_energy_landscape_3d(D, F) else: print("🚀 生成一般隨機任務環境...") D, city_coords = generate_distance_matrix(CITIES, random=RANDOM) F = generate_disturbance_matrix(CITIES) # 您可選擇是否在此呼叫原有的干擾產生器 # D, F = inject_deceptive_trap(D, F, alpha=ALPHA, gamma=GAMMA, sigma=SIGMA) # D, F = inject_hidden_shortcut(D, F) # 原本的其他環境干擾暫時維持 (也可以選擇註解掉,因為 generate_quantum_minefield 已經很極端) # 為了保持之前的介面一致,這裡先保留動態懲罰值計算 # ======== 🌟 新增:動態自適應懲罰值 ======== # 找出矩陣中最遙遠的距離與最大的擾動 max_d = np.max(D) max_f = np.max(F) if F is not None else 0 max_possible_cost = max_d + max_f # 根據最大可能成本,動態設定懲罰值 (通常設為 10 倍 ~ 15 倍最穩定) PENALTY = max_possible_cost * 12.0 BIG_PENALTY = PENALTY * 5.0 Q = None if RUN_MAIN_TEST or RUN_HEATMAP_TEST: Q, _ = build_robust_qubo(D, F, penalty=PENALTY, big_penalty=BIG_PENALTY) if RUN_MAIN_TEST: print(f"🔧 [自動調校] 偵測到單趟最大成本: {max_possible_cost:.2f}") print(f"🔧 [自動調校] 動態 PENALTY 設為: {PENALTY:.2f}, BIG_PENALTY 設為: {BIG_PENALTY:.2f}") if COMPARE_H_INFINITY: print("==================================================") print("🚀 [開始 H-infinity 避障算法對比測試]") print("==================================================") # --- 基準測試 (無避障策略,不考慮障礙物風險) --- print("\n--- 基準測試 (無避障策略 - Algorithm Ignores Obstacles) ---") Q_baseline, _ = build_robust_qubo(D, F, alpha=0.0, penalty=PENALTY, big_penalty=BIG_PENALTY) s_list_base, var_base, pilot_mks_base, best_s_base = [], [], [], 0.5 if INCLUDE_PAUSE_SQA: s_list_base, var_base, pilot_mks_base, best_s_base = pilot_search_phase_transition(Q_baseline, CITIES, D, F) results_base = run_comparative_evaluations(Q_baseline, CITIES, D, F, best_s_base) # --- 穩健測試 (有避障策略,考慮障礙物風險) --- print("\n--- 穩健測試 (有避障策略 - Algorithm Avoids Obstacles) ---") Q_robust, _ = build_robust_qubo(D, F, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY) s_list_rob, var_rob, pilot_mks_rob, best_s_rob = [], [], [], 0.5 durations_rob, dur_mks_rob = [], [] if INCLUDE_PAUSE_SQA: s_list_rob, var_rob, pilot_mks_rob, best_s_rob = pilot_search_phase_transition(Q_robust, CITIES, D, F) durations_rob, dur_mks_rob = test_pause_durations(Q_robust, CITIES, D, F, best_s_rob) results_rob = run_comparative_evaluations(Q_robust, CITIES, D, F, best_s_rob) # Pack results for plotting results = { 'baseline': results_base, 'robust': results_rob } # Note: For academic charts, we will pass robust's data since it's the focus s_list, variances, pilot_mks, best_s = s_list_rob, var_rob, pilot_mks_rob, best_s_rob durations, dur_mks = durations_rob, dur_mks_rob results_for_academic = results_rob else: print("==================================================") print(f"🚀 Quantum Pause Strategy Optimization (mTSP N={CITIES})") print("==================================================") print("\n[初始化] 距離矩陣 D:") print(D) print("\n[初始化] 擾動矩陣 F:") print(F) Q, _ = build_robust_qubo(D, F, penalty=PENALTY, big_penalty=BIG_PENALTY) s_list, variances, pilot_mks, best_s = [], [], [], 0.5 durations, dur_mks = [], [] if INCLUDE_PAUSE_SQA: # 階段 1:找相變點 s_list, variances, pilot_mks, best_s = pilot_search_phase_transition(Q, CITIES, D, F) # 階段 2:測試暫停時間 durations, dur_mks = test_pause_durations(Q, CITIES, D, F, best_s) # 階段 3:正式評估 (一次抓齊所有數據) results = run_comparative_evaluations(Q, CITIES, D, F, best_s) results_for_academic = results s_m, s_e, p_m, p_e = None, None, None, None if RUN_COMPLEXITY_TEST: # 階段 4:複雜度壓力測試與交叉點分析 s_m, s_e, p_m, p_e = run_complexity_scaling() print("\n🏁 所有運算完成!開始繪製圖表...") if RUN_MAIN_TEST: # 繪製圖表一:三大基礎分布對比 create_basic_distribution_charts(results) # 繪製圖表二:Pause SQA 深度分析 create_academic_charts(s_list, variances, pilot_mks, best_s, durations, dur_mks, results_for_academic) if RUN_COMPLEXITY_TEST and s_m is not None: # 繪製圖表三:複雜度壓力測試與交叉點分析 plot_crossover(s_m, s_e, p_m, p_e) if RUN_HEATMAP_TEST: # 繪製圖表四:s 與 Pause Duration 熱力圖 generate_pause_heatmap(Q, CITIES, D, num_reads=5) # 為節省時間使用 5 reads print("🏁 所有實驗與圖表繪製完成!") plt.show()