You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1279 lines
55 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import seaborn as sns
from itertools import permutations
import openjij as oj
import os
# ============ 輸出文件夾設置 ============
OUTPUT_DIR = "./plots_output"
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
print(f"✓ 已建立輸出文件夾: {OUTPUT_DIR}")
# =======================================
# ==================== 可調整參數區 ====================
# 問題規模設定
CITIES = 50 # 初始核心問題規模 (測試建議先用 12-15)
CITIES_BOUND = 10 # 問題規模的上下範圍 (單一數字控制:以 CITIES 為中心,例如 15±5即 10~20)
RANDOM = False
RANDOM_SEED = 42
COORD_RANGE = (0.0, 10.0)
# 【核心創新】:環境變異數控制與複雜度壓力測試參數
N_LIST = list(range(CITIES - CITIES_BOUND, CITIES + CITIES_BOUND + 1, 4)) # 自動產生測試的問題規模範圍
COORD_STD = 15.0 # 空間分佈變異數 (越大代表城市分佈越不均勻、越崎嶇)
RISK_STD = 10.0 # 擾動風險變異數 (越大代表某些路段特別危險H-infinity 衝突極大)
# 演算法執行設定
NUM_RUNS = 20 # 正式比較的執行次數
BETA = 1 # 🌟 稍微調高 (原為 10.0),讓低溫結冰得更紮實
SWEEPS_MAIN_TEST = 500 # 主要比較測試的退火步數
SWEEPS_COMPLEXITY_TEST = 200 # 複雜度擴展測試的退火步數
SWEEPS_HEATMAP_TEST = 200 # 熱力圖測試的退火步數
PAUSE_SWEEPS_DEFAULT = 150 # 主測試中 Pause SQA 的預設暫停步數
HEATMAP_RUNS = 5 # 熱力圖測試每種情況的平均次數
# QUBO 參數
PENALTY = 100.0 # 🌟 從 3000 大幅降回 500 (解開高爾夫球場效應)
BIG_PENALTY = 2000.0 # 🌟 起點約束
# TSP 求解參數
EXACT_LIMIT = 8
# 魯棒優化參數 (H-infinity Robust QUBO)
USE_ROBUST = True # 強制開啟,這是本論文的核心
GAMMA = 0.5
SIGMA = 1.0
ALPHA = 10.0
# 故障訊號參數 (Fault Signal)
ENABLE_FAULT_SIGNAL = True # 是否加入故障訊號測試
FAULT_LAMBDA = 50000.0 # 故障懲罰權重 (lambda),設高一點演算法才會怕
FAULT_PROBABILITY = 0.1 # 兩城市間發生故障/禁飛的機率 (0.1 代表 10% 的路徑斷線)
# 執行區塊控制
RUN_MAIN_TEST = True # 是否執行主要演算法(SA vs SQA)比較測試
INCLUDE_PAUSE_SQA = False # 主測試中是否包含 Pause SQA 進行比較
COMPARE_H_INFINITY = False # 是否比較有無 H-infinity 避障算法的效果 (將輸出 2x3 或 3x2 圖表)
RUN_COMPLEXITY_TEST = False # 是否執行複雜度擴展(N_LIST)測試
SHOW_TERRAIN_PLOTS = False # 是否顯示能量地形圖與驗證報告
RUN_HEATMAP_TEST = False # 是否繪製 s 參數與 Pause Duration 成本熱力圖
ENABLE_QUANTUM_MINEFIELD = True # 是否產生極端能量障礙與欺騙陷阱 (若為 False則產生一般隨機地圖)
# =====================================================
def generate_quantum_minefield(N, random_seed=42):
"""
生成具有「極大能量障礙」與「欺騙陷阱」的極端測試矩陣。
目標:讓 SA 卡死在陷阱裡,而 SQA 能夠穿隧找到黃金路線。
"""
np.random.seed(random_seed)
# 1. 築起能量高牆 (The Wall)
# 預設所有的路徑都是極度危險且遙遠的 (形成巨大的能量障礙)
D = np.random.uniform(30.0, 40.0, (N, N))
F = np.random.uniform(20.0, 30.0, (N, N))
np.fill_diagonal(D, 0)
np.fill_diagonal(F, 0)
# 2. 鑿出「黃金隧道 (Global Minimum)」
# 隱藏一條完美的最佳路徑 (0 -> 1 -> 2 -> ... -> N-1 -> 0)
# 這條路徑成本合理且無風險,但被周圍的高牆死死包圍
golden_path = list(range(N))
for i in range(N):
u = golden_path[i]
v = golden_path[(i + 1) % N]
D[u, v] = 2.0 # 距離適中
F[u, v] = 0.0 # 風險為 0 (完美的避險路線)
# 3. 佈置「致命陷阱 (Deceptive Traps)」
# 創造極度誘人的第一步,引誘演算法走錯路。
# SA 通常是貪婪的,會優先選眼前成本最低的路徑。
for i in range(N):
# 設定一個陷阱節點,遠離正確的下一步
trap_node = (i + int(N/2)) % N
# 避免陷阱剛好蓋掉黃金路線
if trap_node != golden_path[(i + 1) % N] and trap_node != i:
# 致命誘惑:眼前的距離和風險趨近於 0
D[i, trap_node] = 0.1
F[i, trap_node] = 0.1
# 物理意義SA 會毫不猶豫地跳進這個 0.1 的陷阱,
# 但一旦跳進去trap_node 接下來通往其他城市的路線全是 30.0 以上的絕望高牆!
# 只有具備量子穿隧效應的 SQA才能看破眼前的 0.1,選擇走 2.0 的黃金隧道。
print(f"⚠️ [極限測試] 已成功生成 N={N} 的量子雷區地貌!")
return D, F
def verify_minefield_stats(D, F):
"""
數值驗證:計算「黃金捷徑」與「一般隨機路徑」的成本差異,
證明地圖中確實存在極端的高牆與深谷。
"""
N = len(D)
# 1. 計算黃金捷徑 (0 -> 1 -> 2 -> ... -> N-1 -> 0) 的總成本
golden_path = list(range(N))
golden_cost = 0
for i in range(N):
u = golden_path[i]
v = golden_path[(i + 1) % N]
golden_cost += (D[u, v] + F[u, v])
# 2. 計算一條「不小心踩到高牆」的隨機路徑成本
np.random.seed(99)
random_path = np.random.permutation(N).tolist()
random_cost = 0
for i in range(N):
u = random_path[i]
v = random_path[(i + 1) % N]
random_cost += (D[u, v] + F[u, v])
print("\n🔍 [雷區地貌驗證報告]")
print(f"✅ 黃金隧道總成本: {golden_cost:.2f} (演算法的終極目標)")
print(f"❌ 隨機踩雷總成本: {random_cost:.2f} (一般 SA 容易落入的下場)")
print(f"⚖️ 難度倍率: 隨機路徑的成本是黃金捷徑的 {random_cost/golden_cost:.1f} 倍!\n")
def plot_energy_landscape_heatmap(D, F):
"""
視覺化驗證:繪製 D+F 總成本矩陣的熱力圖。
"""
Total_Cost_Matrix = D + F
plt.figure(figsize=(8, 6))
# 使用 'hot' 顏色地圖,顏色越亮(黃/白)代表成本越高(高牆),越暗(黑/紅)代表成本越低(深谷)
plt.imshow(Total_Cost_Matrix, cmap='hot', interpolation='nearest')
plt.colorbar(label='Total Energy Cost (D + F)')
plt.title('Quantum Minefield Energy Landscape')
plt.xlabel('To City')
plt.ylabel('From City')
# 標示出黃金路線 (次對角線)
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "energy_landscape_heatmap.png"), dpi=300)
print(f"🎨 能量地形圖已儲存: {os.path.join(OUTPUT_DIR, 'energy_landscape_heatmap.png')}")
# plt.show() 將在最後統一呼叫
def plot_energy_landscape_3d(D, F):
"""
視覺化驗證二:繪製 D+F 總成本矩陣的 3D 能量地形圖。
"""
Total_Cost_Matrix = D + F
N = len(D)
X, Y = np.meshgrid(range(N), range(N))
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
# 繪製 3D 表面
surf = ax.plot_surface(X, Y, Total_Cost_Matrix, cmap='hot', edgecolor='none', alpha=0.9)
fig.colorbar(surf, label='Total Energy Cost (D + F)', shrink=0.5, aspect=5)
ax.set_title('3D Quantum Minefield Energy Landscape')
ax.set_xlabel('To City')
ax.set_ylabel('From City')
ax.set_zlabel('Cost')
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "energy_landscape_3d.png"), dpi=300)
print(f"🎨 3D 能量地形圖已儲存: {os.path.join(OUTPUT_DIR, 'energy_landscape_3d.png')}")
# plt.show() 將在最後統一呼叫
# ----------------- 基礎輔助函數 -----------------
def generate_distance_matrix(num_cities, random=True, seed=None, coord_range=None):
if seed is None: seed = RANDOM_SEED
if coord_range is None: coord_range = COORD_RANGE
if not random: np.random.seed(seed)
else: np.random.seed(None)
low, high = coord_range
coords = np.random.uniform(low, high, size=(num_cities, 2))
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = dist
D[j, i] = dist
return np.round(D, 2), coords
def generate_disturbance_matrix(num_cities, seed=None):
if seed is None: seed = RANDOM_SEED
np.random.seed(seed)
F = np.random.rand(num_cities, num_cities)
F = (F + F.T) / 2
np.fill_diagonal(F, 0.0)
return F
def generate_fault_matrix(num_cities, prob=0.1, seed=None):
"""產生 0 或 1 的故障矩陣 I_fault(i,j)"""
if seed is not None:
np.random.seed(seed)
# 產生 0 或 1 的矩陣
Fault_Mat = np.random.choice([0, 1], size=(num_cities, num_cities), p=[1-prob, prob])
# 確保矩陣對稱 (i 到 j 故障j 到 i 也故障)
Fault_Mat = np.maximum(Fault_Mat, Fault_Mat.T)
np.fill_diagonal(Fault_Mat, 0.0)
return Fault_Mat
def idx_mtsp(k, i, p, N):
return k * (N * N) + i * N + p
def build_robust_qubo(D, F, Fault_Mat=None, gamma=GAMMA, sigma=SIGMA, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY, lambda_val=FAULT_LAMBDA):
N = D.shape[0]
Q = {}
def addQ(u, v, w):
if w == 0: return
if u > v: u, v = v, u
Q[(u, v)] = Q.get((u, v), 0.0) + w
for k in range(2):
for p in range(N):
q = (p + 1) % N
for i in range(N):
for j in range(N):
dij, fij = D[i, j], F[i, j]
if dij == 0: continue
# 這裡就是 H-infinity 的核心風險項
risk_term = (sigma / (gamma**2)) * (fij**2)
total_weight = dij + (alpha * risk_term)
# 🌟 新增:加入故障訊號懲罰
if ENABLE_FAULT_SIGNAL and Fault_Mat is not None:
total_weight += lambda_val * Fault_Mat[i, j]
u, v = idx_mtsp(k, i, p, N), idx_mtsp(k, j, q, N)
addQ(u, v, total_weight)
for k in range(2):
for p in range(N):
vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)]
for u in vars_pos: addQ(u, u, -penalty)
for a in range(N):
for b in range(a+1, N): addQ(vars_pos[a], vars_pos[b], 2*penalty)
for i in range(1, N):
vars_city = [idx_mtsp(k, i, p, N) for k in range(2) for p in range(N)]
for u in vars_city: addQ(u, u, -penalty)
L = len(vars_city)
for a in range(L):
for b in range(a+1, L): addQ(vars_city[a], vars_city[b], 2*penalty)
for k in range(2):
for i in range(1, N): addQ(idx_mtsp(k, i, 0, N), idx_mtsp(k, i, 0, N), big_penalty)
addQ(idx_mtsp(k, 0, 0, N), idx_mtsp(k, 0, 0, N), -big_penalty)
return Q, N
# ---------------------------------------------------------
# 🌟 [新增] 產生平滑的 SQA 與 Pause SQA Schedule
# ---------------------------------------------------------
def get_smooth_sqa_schedule(beta, total_sweeps, num_steps=100):
"""產生標準 SQA 的平滑退火排程 (s 從 0.0 遞增到 1.0)"""
schedule = []
sweeps_per_step = max(1, int(total_sweeps / num_steps))
for s in np.linspace(0.0, 1.0, num_steps):
schedule.append([float(s), float(beta), int(sweeps_per_step)])
return schedule
def get_pause_sqa_schedule(s_star, beta, total_anneal_sweeps, pause_sweeps, num_steps=100):
"""產生帶有 Pause 策略的平滑退火排程"""
schedule = []
# 階段 1從 s=0.0 平滑退火到 s_star
steps_p1 = max(1, int(num_steps * s_star))
sweeps_p1 = max(1, int((total_anneal_sweeps / 2) / steps_p1))
for s in np.linspace(0.0, s_star, steps_p1, endpoint=False):
schedule.append([float(s), float(beta), int(sweeps_p1)])
# 階段 2在相變點 s_star 暫停 (Pause),讓量子系統充分熱化
schedule.append([float(s_star), float(beta), int(pause_sweeps)])
# 階段 3從 s_star 平滑退火到 s=1.0
steps_p3 = max(1, num_steps - steps_p1)
sweeps_p3 = max(1, int((total_anneal_sweeps / 2) / steps_p3))
for s in np.linspace(s_star, 1.0, steps_p3):
schedule.append([float(s), float(beta), int(sweeps_p3)])
return schedule
def decode_slots(sample, N):
slots = []
for k in range(2):
row = []
for p in range(N):
chosen = 0
for i in range(N):
if sample.get(idx_mtsp(k, i, p, N), 0) == 1:
chosen = i
break
row.append(chosen)
slots.append(row)
return slots[0], slots[1]
def repair_routes_from_slots(u1_slots, u2_slots, N):
count1, count2 = [0]*N, [0]*N
for c in u1_slots:
if 0 <= c < N: count1[c] += 1
for c in u2_slots:
if 0 <= c < N: count2[c] += 1
assign1, assign2 = [], []
for city in range(1, N):
if count1[city] > count2[city]: assign1.append(city)
elif count2[city] > count1[city]: assign2.append(city)
else:
if len(assign1) <= len(assign2): assign1.append(city)
else: assign2.append(city)
return sorted(assign1), sorted(assign2)
def uav_cost(path, D):
if not path or len(path) == 0: return 0.0
cost = D[0, path[0]]
for i in range(len(path)-1): cost += D[path[i], path[i+1]]
cost += D[path[-1], 0]
return float(cost)
def uav_disturbance_energy(path, F, gamma=GAMMA, sigma=SIGMA):
if not path or len(path) == 0: return 0.0
risk = (sigma / (gamma**2)) * (F[0, path[0]]**2)
for i in range(len(path)-1): risk += (sigma / (gamma**2)) * (F[path[i], path[i+1]]**2)
risk += (sigma / (gamma**2)) * (F[path[-1], 0]**2)
return float(risk)
def best_order_for_cities(cities, D, exact_limit=EXACT_LIMIT):
cities = list(cities)
if len(cities) <= 1: return cities, uav_cost(cities, D)
if len(cities) <= exact_limit:
best_perm, best_cost = None, float('inf')
for perm in permutations(cities):
c = uav_cost(list(perm), D)
if c < best_cost: best_cost, best_perm = c, list(perm)
return best_perm, best_cost
else: # 簡單的 nearest neighbor heuristic 當作 fallback
rem = set(cities)
curr = list(rem)[0]
rem.remove(curr)
route = [curr]
while rem:
nxt = min(rem, key=lambda x: D[curr, x])
route.append(nxt)
rem.remove(nxt)
curr = nxt
return route, uav_cost(route, D)
def get_makespan_and_risk(sample, N, D, F):
u1, u2 = decode_slots(sample, N)
a1, a2 = repair_routes_from_slots(u1, u2, N)
p1, c1 = best_order_for_cities(a1, D)
p2, c2 = best_order_for_cities(a2, D)
makespan = max(c1, c2)
dist_energy = uav_disturbance_energy(p1, F) + uav_disturbance_energy(p2, F)
# 確保回傳路徑,以便印出
return makespan, dist_energy, p1, p2
# ============================
# 核心創新功能區 (Pause Strategy)
# ============================
def pilot_search_phase_transition(Q, N, D, F):
print("\n🔍 [階段一] 前導偵查 (Pilot Search): 尋找最佳暫停點 s*")
test_s_list = np.arange(0.2, 1.0, 0.1)
variances = []
makespans = []
sampler = oj.SQASampler()
for s in test_s_list:
sched_var = [[s, BETA, 30]]
res_var = sampler.sample_qubo(Q, schedule=sched_var, num_reads=10)
energies = [d.energy for d in res_var.data()]
variances.append(np.var(energies))
sched_pause = [[s, BETA, 20], [s, BETA, 40], [1.0, BETA, 20]]
res_pause = sampler.sample_qubo(Q, schedule=sched_pause, num_reads=5)
mk, _, p1, p2 = get_makespan_and_risk(res_pause.first.sample, N, D, F)
makespans.append(mk)
print(f" 測試 s={s:.1f} | 變異數: {variances[-1]:.0f} | Makespan: {mk:.2f}")
best_s = test_s_list[np.argmin(makespans)]
print(f"⭐ 鎖定最佳暫停點 s* = {best_s:.1f}")
return test_s_list, variances, makespans, best_s
def test_pause_durations(Q, N, D, F, s_star):
print(f"\n⏳ [階段二] 暫停時間測試 (在 s*={s_star:.1f} 進行熱化)")
durations = [0, 20, 50, 100, 150]
duration_makespans = []
sampler = oj.SQASampler()
for d in durations:
if d == 0:
sched = get_smooth_sqa_schedule(BETA, total_sweeps=SWEEPS_MAIN_TEST)
else:
sched = get_pause_sqa_schedule(s_star, BETA, total_anneal_sweeps=SWEEPS_MAIN_TEST, pause_sweeps=d)
res = sampler.sample_qubo(Q, schedule=sched, num_reads=10)
mk, _, p1, p2 = get_makespan_and_risk(res.first.sample, N, D, F)
duration_makespans.append(mk)
print(f" 暫停 {d} 步 -> Makespan: {mk:.2f}")
return durations, duration_makespans
def run_comparative_evaluations(Q, N, D, F, s_star):
print(f"\n⚔️ [階段三] 正式對決 (SA vs SQA vs Pause-SQA, {NUM_RUNS} runs)")
results = {
'sa': {'mk': [], 'risk': [], 'energy': [], 'faults': [], 'best_mk': float('inf'), 'best_p1': None, 'best_p2': None},
'sqa': {'mk': [], 'risk': [], 'energy': [], 'faults': [], 'best_mk': float('inf'), 'best_p1': None, 'best_p2': None},
'pause': {'mk': [], 'risk': [], 'energy': [], 'faults': [], 'best_mk': float('inf'), 'best_p1': None, 'best_p2': None}
}
sqa_sampler = oj.SQASampler()
sa_sampler = oj.SASampler()
# 🌟 核心修改:建立 Sweeps 的退火排程
TOTAL_SWEEPS = SWEEPS_MAIN_TEST
PAUSE_SWEEPS = PAUSE_SWEEPS_DEFAULT # 讓 Pause-SQA 在相變點停留熱化
smooth_sched = get_smooth_sqa_schedule(beta=BETA, total_sweeps=TOTAL_SWEEPS, num_steps=100)
pause_sched = get_pause_sqa_schedule(s_star=s_star, beta=BETA, total_anneal_sweeps=TOTAL_SWEEPS, pause_sweeps=PAUSE_SWEEPS, num_steps=100)
# (選配) 在 pilot_search 和 test_pause 中也等比例放大 sweeps
# 例如 pilot_search 改成 [[s, BETA, 300]]
for r in range(NUM_RUNS):
# SA
res_sa = sa_sampler.sample_qubo(Q, num_reads=10)
mk, risk, p1, p2 = get_makespan_and_risk(res_sa.first.sample, N, D, F)
fault_count = count_faults_hit(p1, Fault_Mat) + count_faults_hit(p2, Fault_Mat) if ENABLE_FAULT_SIGNAL else 0
results['sa']['mk'].append(mk)
results['sa']['risk'].append(risk)
results['sa']['energy'].append(res_sa.first.energy)
results['sa']['faults'].append(fault_count)
if mk < results['sa']['best_mk']:
results['sa']['best_mk'] = mk
results['sa']['best_p1'] = p1
results['sa']['best_p2'] = p2
print(f" [SA 執行 {r+1}/{NUM_RUNS}] 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f} | 故障數: {fault_count}")
# Standard SQA
t0 = time.time()
res_std = sqa_sampler.sample_qubo(Q, schedule=smooth_sched, num_reads=10)
mk, risk, p1, p2 = get_makespan_and_risk(res_std.first.sample, N, D, F)
fault_count = count_faults_hit(p1, Fault_Mat) + count_faults_hit(p2, Fault_Mat) if ENABLE_FAULT_SIGNAL else 0
results['sqa']['mk'].append(mk)
results['sqa']['risk'].append(risk)
results['sqa']['energy'].append(res_std.first.energy)
results['sqa']['faults'].append(fault_count)
if mk < results['sqa']['best_mk']:
results['sqa']['best_mk'] = mk
results['sqa']['best_p1'] = p1
results['sqa']['best_p2'] = p2
print(f" [Std-SQA 執行 {r+1}/{NUM_RUNS}] 耗時: {time.time()-t0:.2f}s | 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f} | 故障數: {fault_count}")
# Pause SQA
if INCLUDE_PAUSE_SQA:
t0 = time.time()
res_pause = sqa_sampler.sample_qubo(Q, schedule=pause_sched, num_reads=10)
mk, risk, p1, p2 = get_makespan_and_risk(res_pause.first.sample, N, D, F)
fault_count = count_faults_hit(p1, Fault_Mat) + count_faults_hit(p2, Fault_Mat) if ENABLE_FAULT_SIGNAL else 0
results['pause']['mk'].append(mk)
results['pause']['risk'].append(risk)
results['pause']['energy'].append(res_pause.first.energy)
results['pause']['faults'].append(fault_count)
if mk < results['pause']['best_mk']:
results['pause']['best_mk'] = mk
results['pause']['best_p1'] = p1
results['pause']['best_p2'] = p2
print(f" [Pause-SQA 執行 {r+1}/{NUM_RUNS}] 耗時: {time.time()-t0:.2f}s | 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f} | 故障數: {fault_count}")
return results
def create_basic_distribution_charts(results):
if COMPARE_H_INFINITY:
fig, axes = plt.subplots(2, 4, figsize=(20, 10))
fig.suptitle(r"Performance Distribution: Without vs With $H_\infty$ Obstacle Avoidance", fontsize=16, fontweight='bold', y=1.05)
row_labels = ['Without Obstacle Avoidance', r'With $H_\infty$ Obstacle Avoidance']
res_keys = ['baseline', 'robust']
for row, r_key in enumerate(res_keys):
res = results[r_key]
if INCLUDE_PAUSE_SQA:
labels = ['SA', 'Standard-SQA', 'Pause-SQA']
colors = ['gray', 'steelblue', 'firebrick']
mk_data = [res['sa']['mk'], res['sqa']['mk'], res['pause']['mk']]
risk_data = [res['sa']['risk'], res['sqa']['risk'], res['pause']['risk']]
energy_data = [res['sa']['energy'], res['sqa']['energy'], res['pause']['energy']]
fault_data = [res['sa']['faults'], res['sqa']['faults'], res['pause']['faults']]
else:
labels = ['SA', 'Standard-SQA']
colors = ['gray', 'steelblue']
mk_data = [res['sa']['mk'], res['sqa']['mk']]
risk_data = [res['sa']['risk'], res['sqa']['risk']]
energy_data = [res['sa']['energy'], res['sqa']['energy']]
fault_data = [res['sa']['faults'], res['sqa']['faults']]
parts1 = axes[row, 0].violinplot(mk_data, showmeans=True)
axes[row, 0].set_title(f"[{row_labels[row]}] Makespan Distribution")
axes[row, 0].set_ylabel("Distance Cost")
parts2 = axes[row, 1].violinplot(risk_data, showmeans=True)
axes[row, 1].set_title(f"[{row_labels[row]}] Risk Distribution")
axes[row, 1].set_ylabel("Risk Penalty")
parts3 = axes[row, 2].violinplot(energy_data, showmeans=True)
axes[row, 2].set_title(f"[{row_labels[row]}] Raw Energy")
axes[row, 2].set_ylabel("System Energy")
parts4 = axes[row, 3].violinplot(fault_data, showmeans=True)
axes[row, 3].set_title(f"[{row_labels[row]}] Faults Hit Distribution")
axes[row, 3].set_ylabel("Number of Faults")
for col, parts in enumerate([parts1, parts2, parts3, parts4]):
ax = axes[row, col]
ax.set_xticks(np.arange(1, len(labels) + 1))
ax.set_xticklabels(labels, fontsize=11)
ax.grid(alpha=0.3, axis='y')
for pc, color in zip(parts['bodies'], colors):
pc.set_facecolor(color)
pc.set_alpha(0.7)
else:
fig, axes = plt.subplots(1, 4, figsize=(20, 5))
fig.suptitle("Performance Distribution Comparison", fontsize=16, fontweight='bold', y=1.05)
if INCLUDE_PAUSE_SQA:
labels = ['SA', 'Standard-SQA', 'Pause-SQA']
colors = ['gray', 'steelblue', 'firebrick']
mk_data = [results['sa']['mk'], results['sqa']['mk'], results['pause']['mk']]
risk_data = [results['sa']['risk'], results['sqa']['risk'], results['pause']['risk']]
energy_data = [results['sa']['energy'], results['sqa']['energy'], results['pause']['energy']]
fault_data = [results['sa']['faults'], results['sqa']['faults'], results['pause']['faults']]
else:
labels = ['SA', 'Standard-SQA']
colors = ['gray', 'steelblue']
mk_data = [results['sa']['mk'], results['sqa']['mk']]
risk_data = [results['sa']['risk'], results['sqa']['risk']]
energy_data = [results['sa']['energy'], results['sqa']['energy']]
fault_data = [results['sa']['faults'], results['sqa']['faults']]
# 1. Makespan 分布
parts1 = axes[0].violinplot(mk_data, showmeans=True)
axes[0].set_title("Makespan (Distance Cost) Distribution")
axes[0].set_ylabel("Distance Cost")
# 2. Risk 分布
parts2 = axes[1].violinplot(risk_data, showmeans=True)
axes[1].set_title(r"$H_\infty$ Disturbance Risk Distribution")
axes[1].set_ylabel("Risk Penalty")
# 3. Energy 分布
parts3 = axes[2].violinplot(energy_data, showmeans=True)
axes[2].set_title("QUBO System Raw Energy")
axes[2].set_ylabel("System Energy")
# 4. Faults 分布
parts4 = axes[3].violinplot(fault_data, showmeans=True)
axes[3].set_title("Faults Hit Distribution")
axes[3].set_ylabel("Number of Faults")
# 設定提琴圖外觀
for i, ax in enumerate(axes):
ax.set_xticks(np.arange(1, len(labels) + 1))
ax.set_xticklabels(labels, fontsize=11)
ax.grid(alpha=0.3, axis='y')
parts = [parts1, parts2, parts3, parts4][i]
for pc, color in zip(parts['bodies'], colors):
pc.set_facecolor(color)
pc.set_alpha(0.7)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig(os.path.join(OUTPUT_DIR, "distribution_comparison.png"), dpi=300)
print(f"\n🎨 圖表已繪製: {os.path.join(OUTPUT_DIR, 'distribution_comparison.png')}")
# plt.show() (Moved to the end)
def plot_real_route_comparison(N, coords, p1_sa, p2_sa, p1_sqa, p2_sqa):
"""
視覺化 SA 與 Pause-SQA 的實體飛行路線對比 (使用真實座標)
"""
fig, axes = plt.subplots(1, 2, figsize=(14, 7))
fig.suptitle("UAV Actual Physical Trajectory", fontsize=16, fontweight='bold')
G = nx.Graph()
G.add_nodes_from(range(N))
# 🌟 關鍵修改:使用真實的 (x, y) 座標作為節點位置 🌟
pos = {i: (coords[i][0], coords[i][1]) for i in range(N)}
titles = ["Classical SA", "Pause-SQA" if INCLUDE_PAUSE_SQA else "Standard-SQA"]
routes_list = [(p1_sa, p2_sa), (p1_sqa, p2_sqa)]
for ax, title, (p1, p2) in zip(axes, titles, routes_list):
ax.set_title(title, fontsize=14)
# 畫節點 (依照真實地理位置散佈)
nx.draw_networkx_nodes(G, pos, nodelist=[0], node_color='red', node_shape='s', node_size=300, ax=ax, label='Depot')
nx.draw_networkx_nodes(G, pos, nodelist=range(1, N), node_color='skyblue', node_size=150, ax=ax)
nx.draw_networkx_labels(G, pos, font_size=8, ax=ax)
# 畫路線
def add_edges(path, color, style):
if not path: return
edges = [(0, path[0])] + [(path[i], path[i+1]) for i in range(len(path)-1)] + [(path[-1], 0)]
nx.draw_networkx_edges(G, pos, edgelist=edges, edge_color=color, style=style, width=2.0, arrows=True, ax=ax)
add_edges(p1, 'blue', 'solid')
add_edges(p2, 'darkorange', 'dashed')
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "route_trajectory_comparison.png"), dpi=300)
print(f"🎨 飛行軌跡對比圖已繪製: {os.path.join(OUTPUT_DIR, 'route_trajectory_comparison.png')}")
def create_academic_charts(s_list, variances, makespans, best_s, durations, dur_makespans, results):
"""原有的圖表二Pause Strategy 深度分析圖"""
if not INCLUDE_PAUSE_SQA:
print("⚠️ INCLUDE_PAUSE_SQA 為 False跳過 Pause SQA 深度分析圖表的繪製。")
return
fig = plt.figure(figsize=(16, 10))
fig.suptitle(f"Quantum Pause Strategy in H-infinity Robust mTSP (N={CITIES})", fontsize=16, fontweight='bold')
# Chart 1: Phase Transition & U-Valley
ax1 = plt.subplot(2, 2, 1)
color1, color2 = 'tab:blue', 'tab:red'
ax1.set_title("Chart 1: Phase Transition & U-Valley", fontsize=12)
ax1.set_xlabel("Annealing Parameter $s$")
ax1.set_ylabel("Trotter Energy Variance", color=color1)
ax1.plot(s_list, variances, marker='o', color=color1, linewidth=2)
ax1.tick_params(axis='y', labelcolor=color1)
ax1_twin = ax1.twinx()
ax1_twin.set_ylabel("Makespan", color=color2)
ax1_twin.plot(s_list, makespans, marker='s', color=color2, linestyle='--', linewidth=2)
ax1_twin.tick_params(axis='y', labelcolor=color2)
ax1.axvline(x=best_s, color='green', linestyle=':', linewidth=2)
ax1.grid(alpha=0.3)
# Subplot 2 (was Chart 4): Effect of Thermalization Time
ax2 = plt.subplot(2, 2, 2)
ax2.set_title(f"Chart 2: Effect of Thermalization Time at $s^*={best_s:.1f}$", fontsize=12)
ax2.plot(durations, dur_makespans, marker='D', color='darkorange', linewidth=2.5, markersize=8)
ax2.set_xlabel("Pause Duration (Sweeps)")
ax2.set_ylabel("Optimized Makespan")
ax2.grid(alpha=0.3, linestyle='--')
# Subplot 3 (was Chart 2): Pareto Front
ax3 = plt.subplot(2, 2, 3)
ax3.set_title("Chart 3: Robustness Pareto Front", fontsize=12)
ax3.scatter(results['sqa']['mk'], results['sqa']['risk'], c='steelblue', alpha=0.7, s=80, label='Standard-SQA')
if INCLUDE_PAUSE_SQA:
ax3.scatter(results['pause']['mk'], results['pause']['risk'], c='firebrick', alpha=0.9, s=120, marker='*', label='Pause-SQA')
ax3.set_xlabel("Makespan")
ax3.set_ylabel("Disturbance Risk")
ax3.legend()
ax3.grid(alpha=0.3, linestyle='--')
# Subplot 4 (was Chart 3): Reliability Violin Plot
ax4 = plt.subplot(2, 2, 4)
ax4.set_title("Chart 4: Final Makespan Comparison", fontsize=12)
if INCLUDE_PAUSE_SQA:
parts = ax4.violinplot([results['sqa']['mk'], results['pause']['mk']], showmeans=True)
parts['bodies'][0].set_facecolor('steelblue')
parts['bodies'][1].set_facecolor('firebrick')
for body in parts['bodies']: body.set_alpha(0.7)
ax4.set_xticks([1, 2])
ax4.set_xticklabels(['Standard-SQA', 'Pause-SQA'])
else:
parts = ax4.violinplot([results['sqa']['mk']], showmeans=True)
parts['bodies'][0].set_facecolor('steelblue')
for body in parts['bodies']: body.set_alpha(0.7)
ax4.set_xticks([1])
ax4.set_xticklabels(['Standard-SQA'])
if results['sa']['mk']:
sa_mean = np.mean(results['sa']['mk'])
ax4.axhline(y=sa_mean, color='gray', linestyle=':', linewidth=2, label=f'SA Mean: {sa_mean:.1f}')
ax4.legend()
ax4.grid(alpha=0.3, axis='y')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig(os.path.join(OUTPUT_DIR, "academic_pause_strategy.png"), dpi=300)
print(f"🎨 原有圖表已繪製: {os.path.join(OUTPUT_DIR, 'academic_pause_strategy.png')}")
# plt.show() (Moved to the end)
# ============================
# 複雜度與環境控制功能 (新增)
# ============================
def inject_hidden_shortcut(D, F):
"""
注入隱藏的黃金捷徑 (Hidden Shortcut)
創造一條極度狹窄且成本極低的路線 (例如 4 -> 5 -> 6 -> 7)。
如果沒照順序走,成本會極高;一旦走對,總成本會大幅下降。
"""
N = D.shape[0]
if N < 8:
return D, F # 城市太少無法做長捷徑
# 定義黃金路線的節點 (假設是 4, 5, 6, 7)
golden_nodes = [4, 5, 6, 7]
# 1. 築起高牆:先把這幾個城市之間的所有連線,都變成極高成本 (阻止 SA 亂湊)
for i in golden_nodes:
for j in golden_nodes:
if i != j:
D[i, j] = 8.0
F[i, j] = 0.8 # 高風險
# 2. 挖出深谷:只開通 4->5, 5->6, 6->7 這條唯一且完美的捷徑
for i in range(len(golden_nodes) - 1):
n1 = golden_nodes[i]
n2 = golden_nodes[i + 1]
# 距離極短,風險為 0
D[n1, n2] = 0.05
D[n2, n1] = 0.05
F[n1, n2] = 0.0
F[n2, n1] = 0.0
print("\n✨ [隱藏捷徑已佈署] 演算法將挑戰尋找極狹窄的黃金路線 (4->5->6->7)")
return D, F
def inject_deceptive_trap(D, F, alpha=10.0, gamma=0.5, sigma=1.0):
"""
注入欺騙性陷阱 (Deceptive Trap)
改造矩陣,創造一條「看似完美的捷徑」,測試演算法是否會陷入局部陷阱。
"""
N = D.shape[0]
if N < 4:
return D, F # 城市太少無法做陷阱
# 定義陷阱節點
trap_start = 1
trap_end = 2
safe_detour = 3
# ==========================================
# 陷阱 1致命捷徑 (距離極度誘人,但風險爆表)
# ==========================================
D[trap_start, trap_end] = 0.1
D[trap_end, trap_start] = 0.1
F[trap_start, trap_end] = 0.99
F[trap_end, trap_start] = 0.99
# ==========================================
# 陷阱 2安全繞路 (距離較遠,但完全無風險)
# ==========================================
# 路線: trap_start -> safe_detour -> trap_end
D[trap_start, safe_detour] = 4.0
D[safe_detour, trap_start] = 4.0
D[safe_detour, trap_end] = 4.0
D[trap_end, safe_detour] = 4.0
F[trap_start, safe_detour] = 0.01
F[safe_detour, trap_start] = 0.01
F[safe_detour, trap_end] = 0.01
F[trap_end, safe_detour] = 0.01
# 計算並印出真實成本,讓您確認陷阱是否成立
# 真實成本 = 距離 + alpha * (sigma / gamma^2) * F^2
risk_multiplier = alpha * (sigma / (gamma**2))
shortcut_cost = 0.1 + risk_multiplier * (0.99**2)
detour_cost = (4.0 + risk_multiplier * (0.01**2)) + (4.0 + risk_multiplier * (0.01**2))
print(f" ➤ 致命捷徑 (1->2) 表觀距離: 0.1 | 隱藏真實成本: {shortcut_cost:.2f}")
print(f" ➤ 安全繞路 (1->3->2) 表觀距離: 8.0 | 隱藏真實成本: {detour_cost:.2f}")
return D, F
def generate_controlled_matrices(num_cities, coord_std, risk_std, seed=42):
"""根據指定的標準差(變異數)生成 D 與 F 矩陣,精準控制問題複雜度"""
np.random.seed(seed)
# 1. 生成距離矩陣 D (使用常態分佈控制空間聚集度)
coords = np.random.normal(loc=0.0, scale=coord_std, size=(num_cities, 2))
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = D[j, i] = dist
# 2. 生成擾動矩陣 F (使用常態分佈控制風險極端值)
F = np.abs(np.random.normal(loc=1.0, scale=risk_std, size=(num_cities, num_cities)))
F = (F + F.T) / 2 # 確保對稱
np.fill_diagonal(F, 0.0)
return np.round(D, 2), np.round(F, 2)
def decode_and_eval(sample, N, D):
# 快速解碼與貪婪/暴力求成本
u1, u2 = [], []
for p in range(N):
for i in range(N):
if sample.get(idx_mtsp(0, i, p, N), 0) == 1: u1.append(i)
if sample.get(idx_mtsp(1, i, p, N), 0) == 1: u2.append(i)
# 簡單修復 (去除重複,補齊缺失)
c1, c2 = set(u1), set(u2)
a1, a2 = [], []
for city in range(1, N):
if city in c1 and city not in c2: a1.append(city)
elif city in c2 and city not in c1: a2.append(city)
else:
if len(a1) <= len(a2): a1.append(city)
else: a2.append(city)
# 評估成本
def eval_cost(cities):
if not cities: return 0
if len(cities) <= EXACT_LIMIT:
bc = float('inf')
for p in permutations(cities):
c = D[0, p[0]] + sum(D[p[i], p[i+1]] for i in range(len(p)-1)) + D[p[-1], 0]
bc = min(bc, c)
return bc
# 貪婪
rem, path, curr = set(cities), [], 0
while rem:
nxt = min(rem, key=lambda x: D[curr, x])
path.append(nxt)
rem.remove(nxt)
curr = nxt
return D[0, path[0]] + sum(D[path[i], path[i+1]] for i in range(len(path)-1)) + D[path[-1], 0]
return max(eval_cost(a1), eval_cost(a2))
def run_complexity_scaling():
print(f"\n🚀 開始複雜度壓力測試...")
print(f"⚙️ 空間變異數(COORD_STD)={COORD_STD}, 風險變異數(RISK_STD)={RISK_STD}")
std_means, pause_means = [], []
std_errs, pause_errs = [], []
sampler = oj.SQASampler()
for N in N_LIST:
print(f"\n📊 測試規模 N={N} ...")
D, F = generate_controlled_matrices(N, COORD_STD, RISK_STD)
# 🌟 新增:生成故障矩陣
Fault_Mat = generate_fault_matrix(N, prob=FAULT_PROBABILITY) if ENABLE_FAULT_SIGNAL else None
# 動態計算懲罰值
max_d = np.max(D)
max_f = np.max(F) if F is not None else 0
max_possible_cost = max_d + max_f
dyn_penalty = max_possible_cost * 12.0
dyn_big_penalty = dyn_penalty * 5.0
Q, _ = build_robust_qubo(D, F, Fault_Mat=Fault_Mat, penalty=dyn_penalty, big_penalty=dyn_big_penalty)
std_results, pause_results = [], []
# 為了節省時間,在壓力測試中我們使用固定的經驗暫停點 s=0.7
# 總步數統一為 SWEEPS_COMPLEXITY_TEST 步
std_sched = get_smooth_sqa_schedule(BETA, total_sweeps=SWEEPS_COMPLEXITY_TEST, num_steps=20)
pause_sched = get_pause_sqa_schedule(0.7, BETA, total_anneal_sweeps=SWEEPS_COMPLEXITY_TEST, pause_sweeps=100, num_steps=30)
# 這裡設定每次 N 跑的次數,與原本 NUM_RUNS 獨立,建議 10 次
runs_for_scale = 10
for r in range(runs_for_scale):
# Standard SQA
res_std = sampler.sample_qubo(Q, schedule=std_sched, num_reads=5)
std_results.append(decode_and_eval(res_std.first.sample, N, D))
# Pause SQA
res_pause = sampler.sample_qubo(Q, schedule=pause_sched, num_reads=5)
pause_results.append(decode_and_eval(res_pause.first.sample, N, D))
mean_std, err_std = np.mean(std_results), np.std(std_results)
mean_pause, err_pause = np.mean(pause_results), np.std(pause_results)
std_means.append(mean_std)
std_errs.append(err_std)
pause_means.append(mean_pause)
pause_errs.append(err_pause)
print(f" Standard-SQA 平均: {mean_std:.2f} ± {err_std:.2f}")
print(f" Pause-SQA 平均: {mean_pause:.2f} ± {err_pause:.2f}")
return std_means, std_errs, pause_means, pause_errs
def plot_crossover(std_means, std_errs, pause_means, pause_errs):
plt.figure(figsize=(10, 6))
plt.title(f"Complexity Scaling: Algorithm Performance vs Problem Size (N)\n(Coord Std={COORD_STD}, Risk Std={RISK_STD})", fontsize=14, fontweight='bold')
# 畫線與誤差棒
plt.errorbar(N_LIST, std_means, yerr=std_errs, fmt='-o', color='steelblue',
linewidth=2.5, capsize=5, markersize=8, label='Standard-SQA')
plt.errorbar(N_LIST, pause_means, yerr=pause_errs, fmt='-s', color='firebrick',
linewidth=2.5, capsize=5, markersize=8, label='Pause-SQA')
plt.xlabel("Problem Scale (Number of Cities $N$)", fontsize=12)
plt.ylabel("Optimized Makespan (Distance Cost)", fontsize=12)
plt.xticks(N_LIST)
plt.grid(alpha=0.4, linestyle='--')
plt.legend(fontsize=11)
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "complexity_crossover.png"), dpi=300)
print(f"🎨 交叉點分析圖已儲存為: {os.path.join(OUTPUT_DIR, 'complexity_crossover.png')}")
# plt.show() (Moved to the end)
def generate_pause_heatmap(Q, N, D, num_reads=10):
"""
掃描 s 參數與 Pause Duration並繪製成本熱力圖
"""
print("\n🚀 開始繪製量子暫停策略熱力圖...")
# 1. 定義掃描範圍
# s 值從 0.1 到 0.9,間隔 0.1 (共 9 個點)
s_values = np.round(np.linspace(0.1, 0.9, 9), 2)
# 暫停步數從 20 到 200間隔 20 (共 10 個點)
pause_durations = np.arange(20, 220, 20)
# 建立一個空矩陣來儲存結果
# row 對應 Pause Duration, col 對應 s
heatmap_data = np.zeros((len(pause_durations), len(s_values)))
# 2. 執行網格掃描 (Grid Search)
total_iters = len(s_values) * len(pause_durations)
current_iter = 0
sampler = oj.SQASampler()
for i, duration in enumerate(pause_durations):
for j, s_star in enumerate(s_values):
current_iter += 1
print(f"進度: {current_iter}/{total_iters} | 測試 s*={s_star}, 暫停={duration}")
pause_sched = get_pause_sqa_schedule(s_star, BETA, total_anneal_sweeps=SWEEPS_HEATMAP_TEST, pause_sweeps=duration, num_steps=30)
costs = []
for _ in range(HEATMAP_RUNS):
res = sampler.sample_qubo(Q, schedule=pause_sched, num_reads=num_reads)
costs.append(decode_and_eval(res.first.sample, N, D))
heatmap_data[i, j] = np.mean(costs)
# 3. 繪製熱力圖
plt.figure(figsize=(12, 8))
ax = sns.heatmap(
heatmap_data,
xticklabels=s_values,
yticklabels=pause_durations,
cmap="YlGnBu", # 黃-綠-藍 配色,數值大偏藍,數值小偏黃(或反過來)
annot=True, # 在格子上顯示數值
fmt=".0f", # 數值不留小數點
cbar_kws={'label': 'Optimized Makespan (Cost)'}
)
# 反轉 Y 軸,讓 20 步在最下面200 步在最上面 (符合物理直覺)
ax.invert_yaxis()
plt.title("Quantum Pause Strategy: Performance Heatmap\n(Finding the Optimal Phase Transition Phase)", fontsize=16, pad=15)
plt.xlabel("Annealing Parameter $s^*$ (Pause Location)", fontsize=14)
plt.ylabel("Pause Duration (Sweeps)", fontsize=14)
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "quantum_pause_heatmap.png"), dpi=300)
print(f"🎨 量子暫停策略熱力圖已儲存為: {os.path.join(OUTPUT_DIR, 'quantum_pause_heatmap.png')}")
def count_faults_hit(path, Fault_Mat):
"""計算路徑中踩到多少個故障點 (故障路段數)"""
if Fault_Mat is None or path is None:
return 0
fault_count = 0
for i in range(len(path) - 1):
city_from = path[i]
city_to = path[i + 1]
if Fault_Mat[city_from, city_to] > 0.5: # 故障點
fault_count += 1
return fault_count
def plot_disturbance_and_fault_matrices(F, Fault_Mat):
"""將擾動矩陣 (F) 與 故障矩陣 (Fault_Mat) 並排繪製熱力圖"""
if Fault_Mat is None:
return
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
fig.suptitle("Environment Disturbance vs. Fault Map", fontsize=16, fontweight='bold')
# 畫擾動矩陣 F (連續值)
sns.heatmap(F, ax=axes[0], cmap="YlOrRd", annot=False)
axes[0].set_title("Disturbance Matrix (F)", fontsize=14)
axes[0].set_xlabel("City index"); axes[0].set_ylabel("City index")
# 畫故障矩陣 Fault_Mat (0或1)
sns.heatmap(Fault_Mat, ax=axes[1], cmap="Reds", cbar=False, linewidths=0.5, linecolor='lightgray')
axes[1].set_title("Fault Matrix (0: Normal, 1: Fault)", fontsize=14)
axes[1].set_xlabel("City index"); axes[1].set_ylabel("City index")
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, "disturbance_fault_matrices.png"), dpi=300)
print(f"🎨 擾動和故障矩陣圖已儲存: {os.path.join(OUTPUT_DIR, 'disturbance_fault_matrices.png')}")
# plt.show() (Moved to the end)
def plot_comprehensive_best_comparison(sa_mks, sqa_mks, sa_p1, sa_p2, sqa_p1, sqa_p2, D, F, Fault_Mat):
"""繪製 2x2 Subplot: 綜合比較最佳解的 Makespan、故障數、純距離、純擾動"""
if Fault_Mat is None:
return
# 輔助函數:計算純物理量 (不含 lambda 等懲罰權重)
def calc_raw_metrics(p1, p2):
dist_total = 0
disturb_total = 0
fault_hits = 0
for path in [p1, p2]:
if not path: continue # 避免讀到空路徑
full = [0] + path + [0]
for i in range(len(full)-1):
u, v = full[i], full[i+1]
dist_total += D[u, v]
disturb_total += F[u, v]
fault_hits += Fault_Mat[u, v]
return dist_total, disturb_total, fault_hits
sa_dist, sa_disturb, sa_faults = calc_raw_metrics(sa_p1, sa_p2)
sqa_dist, sqa_disturb, sqa_faults = calc_raw_metrics(sqa_p1, sqa_p2)
# 繪製 2x2 圖表
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle("Comprehensive Performance: SA vs SQA (Best Run)", fontsize=18, fontweight='bold')
labels = ['Classical SA', 'SQA / Pause-SQA']
colors = ['#4C72B0', '#DD8452'] # SA 藍色, SQA 橘色
def plot_bar(ax, vals, title, ylabel, is_int=False):
bars = ax.bar(labels, vals, color=colors, edgecolor='black', linewidth=1.2)
ax.set_title(title, fontsize=14, fontweight='bold')
ax.set_ylabel(ylabel, fontsize=12)
if is_int:
ax.yaxis.set_major_locator(plt.MaxNLocator(integer=True))
# 標示數值
for bar, v in zip(bars, vals):
yval = bar.get_height()
text_str = f'{int(v)}' if is_int else f'{v:.1f}'
ax.text(bar.get_x() + bar.get_width()/2.0, yval, text_str,
ha='center', va='bottom', fontweight='bold', fontsize=12)
# 圖 1: 總評估分數 (Makespan)
plot_bar(axes[0, 0], [sa_mks, sqa_mks], "1. Best Makespan (Total Objective Cost)", "Cost Score")
# 圖 2: 安全性 (Faults Hit)
plot_bar(axes[0, 1], [sa_faults, sqa_faults], "2. Faults Hit (Safety Penalty)", "Number of Hits", is_int=True)
axes[0, 1].set_title("2. Faults Hit (Safety Penalty)", color='darkred', fontweight='bold')
# 圖 3: 飛行距離 (Raw Distance)
plot_bar(axes[1, 0], [sa_dist, sqa_dist], "3. Total Raw Distance (D)", "Distance Units")
# 圖 4: 環境風險暴露 (Raw Disturbance)
plot_bar(axes[1, 1], [sa_disturb, sqa_disturb], "4. Total Environmental Disturbance (F)", "Disturbance Level")
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig(os.path.join(OUTPUT_DIR, "comprehensive_best_comparison.png"), dpi=300)
print(f"🎨 綜合最佳對比圖已儲存: {os.path.join(OUTPUT_DIR, 'comprehensive_best_comparison.png')}")
# plt.show() (Moved to the end)
# ============================
# 主程式執行入口
# ============================
if __name__ == "__main__":
print("==================================================")
if ENABLE_QUANTUM_MINEFIELD:
print("🚀 生成極限量子雷區測試...")
D, F = generate_quantum_minefield(CITIES, random_seed=RANDOM_SEED)
# 為極限雷區建立環狀顯示用實體座標 (符合原有 layout 的精神)
city_coords = []
for i in range(CITIES):
angle = 2 * np.pi * i / CITIES
city_coords.append((50 + 40 * np.cos(angle), 50 + 40 * np.sin(angle)))
# 進行數值驗證與畫圖
if SHOW_TERRAIN_PLOTS:
verify_minefield_stats(D, F)
plot_energy_landscape_heatmap(D, F)
plot_energy_landscape_3d(D, F)
else:
print("🚀 生成一般隨機任務環境...")
D, city_coords = generate_distance_matrix(CITIES, random=RANDOM)
F = generate_disturbance_matrix(CITIES)
# 您可選擇是否在此呼叫原有的干擾產生器
# D, F = inject_deceptive_trap(D, F, alpha=ALPHA, gamma=GAMMA, sigma=SIGMA)
# D, F = inject_hidden_shortcut(D, F)
# 🌟 新增:生成故障矩陣
if ENABLE_FAULT_SIGNAL:
print("🚨 故障訊號 (Fault Signal) 已啟用!正在生成隨機禁飛路段...")
Fault_Mat = generate_fault_matrix(CITIES, prob=FAULT_PROBABILITY, seed=RANDOM_SEED)
else:
Fault_Mat = None
# 原本的其他環境干擾暫時維持 (也可以選擇註解掉,因為 generate_quantum_minefield 已經很極端)
# 為了保持之前的介面一致,這裡先保留動態懲罰值計算
# ======== 🌟 新增:動態自適應懲罰值 ========
# 找出矩陣中最遙遠的距離與最大的擾動
max_d = np.max(D)
max_f = np.max(F) if F is not None else 0
max_possible_cost = max_d + max_f
# 根據最大可能成本,動態設定懲罰值 (通常設為 10 倍 ~ 15 倍最穩定)
PENALTY = max_possible_cost * 12.0
BIG_PENALTY = PENALTY * 5.0
Q = None
if RUN_MAIN_TEST or RUN_HEATMAP_TEST:
Q, _ = build_robust_qubo(D, F, Fault_Mat=Fault_Mat, penalty=PENALTY, big_penalty=BIG_PENALTY)
if RUN_MAIN_TEST:
print(f"🔧 [自動調校] 偵測到單趟最大成本: {max_possible_cost:.2f}")
print(f"🔧 [自動調校] 動態 PENALTY 設為: {PENALTY:.2f}, BIG_PENALTY 設為: {BIG_PENALTY:.2f}")
if COMPARE_H_INFINITY:
print("==================================================")
print("🚀 [開始 H-infinity 避障算法對比測試]")
print("==================================================")
# --- 基準測試 (無避障策略,不考慮障礙物風險) ---
print("\n--- 基準測試 (無避障策略 - Algorithm Ignores Obstacles) ---")
Q_baseline, _ = build_robust_qubo(D, F, Fault_Mat=Fault_Mat, alpha=0.0, penalty=PENALTY, big_penalty=BIG_PENALTY)
s_list_base, var_base, pilot_mks_base, best_s_base = [], [], [], 0.5
if INCLUDE_PAUSE_SQA:
s_list_base, var_base, pilot_mks_base, best_s_base = pilot_search_phase_transition(Q_baseline, CITIES, D, F)
results_base = run_comparative_evaluations(Q_baseline, CITIES, D, F, best_s_base)
# --- 穩健測試 (有避障策略,考慮障礙物風險) ---
print("\n--- 穩健測試 (有避障策略 - Algorithm Avoids Obstacles) ---")
Q_robust, _ = build_robust_qubo(D, F, Fault_Mat=Fault_Mat, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY)
s_list_rob, var_rob, pilot_mks_rob, best_s_rob = [], [], [], 0.5
durations_rob, dur_mks_rob = [], []
if INCLUDE_PAUSE_SQA:
s_list_rob, var_rob, pilot_mks_rob, best_s_rob = pilot_search_phase_transition(Q_robust, CITIES, D, F)
durations_rob, dur_mks_rob = test_pause_durations(Q_robust, CITIES, D, F, best_s_rob)
results_rob = run_comparative_evaluations(Q_robust, CITIES, D, F, best_s_rob)
# Pack results for plotting
results = {
'baseline': results_base,
'robust': results_rob
}
# Note: For academic charts, we will pass robust's data since it's the focus
s_list, variances, pilot_mks, best_s = s_list_rob, var_rob, pilot_mks_rob, best_s_rob
durations, dur_mks = durations_rob, dur_mks_rob
results_for_academic = results_rob
else:
print("==================================================")
print(f"🚀 Quantum Pause Strategy Optimization (mTSP N={CITIES})")
print("==================================================")
print("\n[初始化] 距離矩陣 D:")
print(D)
print("\n[初始化] 擾動矩陣 F:")
print(F)
Q, _ = build_robust_qubo(D, F, Fault_Mat=Fault_Mat, penalty=PENALTY, big_penalty=BIG_PENALTY)
s_list, variances, pilot_mks, best_s = [], [], [], 0.5
durations, dur_mks = [], []
if INCLUDE_PAUSE_SQA:
# 階段 1找相變點
s_list, variances, pilot_mks, best_s = pilot_search_phase_transition(Q, CITIES, D, F)
# 階段 2測試暫停時間
durations, dur_mks = test_pause_durations(Q, CITIES, D, F, best_s)
# 階段 3正式評估 (一次抓齊所有數據)
results = run_comparative_evaluations(Q, CITIES, D, F, best_s)
results_for_academic = results
s_m, s_e, p_m, p_e = None, None, None, None
if RUN_COMPLEXITY_TEST:
# 階段 4複雜度壓力測試與交叉點分析
s_m, s_e, p_m, p_e = run_complexity_scaling()
print("\n🏁 所有運算完成!開始繪製圖表...")
if RUN_MAIN_TEST:
# 繪製圖表一:三大基礎分布對比
create_basic_distribution_charts(results)
# 繪製圖表二Pause SQA 深度分析
create_academic_charts(s_list, variances, pilot_mks, best_s, durations, dur_mks, results_for_academic)
# 繪製圖表:路徑對比圖
if COMPARE_H_INFINITY:
target_res = results['robust']
else:
target_res = results
# 🌟 新增1. 畫出擾動與故障矩陣對比熱力圖
if ENABLE_FAULT_SIGNAL:
plot_disturbance_and_fault_matrices(F, Fault_Mat)
best_sa_1 = target_res['sa']['best_p1']
best_sa_2 = target_res['sa']['best_p2']
if INCLUDE_PAUSE_SQA:
best_qa_1 = target_res['pause']['best_p1']
best_qa_2 = target_res['pause']['best_p2']
qa_makespan = target_res['pause']['best_mk']
else:
best_qa_1 = target_res['sqa']['best_p1']
best_qa_2 = target_res['sqa']['best_p2']
qa_makespan = target_res['sqa']['best_mk']
sa_makespan = target_res['sa']['best_mk']
# 🌟 修改:畫出 SA 與 SQA 的「四大綜合指標」對比圖
if ENABLE_FAULT_SIGNAL:
plot_comprehensive_best_comparison(
sa_mks=sa_makespan,
sqa_mks=qa_makespan,
sa_p1=best_sa_1,
sa_p2=best_sa_2,
sqa_p1=best_qa_1,
sqa_p2=best_qa_2,
D=D,
F=F,
Fault_Mat=Fault_Mat
)
plot_real_route_comparison(CITIES, city_coords, best_sa_1, best_sa_2, best_qa_1, best_qa_2)
if RUN_COMPLEXITY_TEST and s_m is not None:
# 繪製圖表三:複雜度壓力測試與交叉點分析
plot_crossover(s_m, s_e, p_m, p_e)
if RUN_HEATMAP_TEST:
# 繪製圖表四s 與 Pause Duration 熱力圖
generate_pause_heatmap(Q, CITIES, D, num_reads=5) # 為節省時間使用 5 reads
print("\n" + "="*60)
print("🏁 所有實驗與圖表繪製完成!")
print("="*60)
print(f"\n📁 所有圖表已儲存在: {os.path.abspath(OUTPUT_DIR)}")
print("\n📊 已生成的圖表文件:")
if os.path.exists(OUTPUT_DIR):
files = sorted(os.listdir(OUTPUT_DIR))
for i, fname in enumerate(files, 1):
print(f" {i}. {fname}")
print("\n💡 提示: 所有圖表窗口將同時顯示,請在各窗口完成檢視後關閉。")
print("="*60 + "\n")
# 同時顯示所有圖表窗口
plt.show()