You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

397 lines
15 KiB
Python

import time
import numpy as np
import matplotlib.pyplot as plt
from itertools import permutations
import math
import openjij as oj
# ==================== 可調整參數區 ====================
# 問題規模設定
CITIES = 15 # 測試建議先用 12-15確認圖表沒問題再上調到 30
RANDOM = False
RANDOM_SEED = 42
COORD_RANGE = (0.0, 10.0)
# 演算法執行設定
NUM_RUNS = 20 # 正式比較的執行次數
BETA = 10.0 # SQA 的逆溫度參數
# QUBO 參數
PENALTY = 20.0
BIG_PENALTY = 9999.0
# TSP 求解參數
EXACT_LIMIT = 8
# 魯棒優化參數 (H-infinity Robust QUBO)
USE_ROBUST = True # 強制開啟,這是本論文的核心
GAMMA = 0.5
SIGMA = 1.0
ALPHA = 10.0
# =====================================================
# ----------------- 基礎輔助函數 -----------------
def generate_distance_matrix(num_cities, random=True, seed=None, coord_range=None):
if seed is None: seed = RANDOM_SEED
if coord_range is None: coord_range = COORD_RANGE
if not random: np.random.seed(seed)
else: np.random.seed(None)
low, high = coord_range
coords = np.random.uniform(low, high, size=(num_cities, 2))
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = dist
D[j, i] = dist
return np.round(D, 2), coords
def generate_disturbance_matrix(num_cities, seed=None):
if seed is None: seed = RANDOM_SEED
np.random.seed(seed)
F = np.random.rand(num_cities, num_cities)
F = (F + F.T) / 2
np.fill_diagonal(F, 0.0)
return F
def idx_mtsp(k, i, p, N):
return k * (N * N) + i * N + p
def build_robust_qubo(D, F, gamma=GAMMA, sigma=SIGMA, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY):
N = D.shape[0]
Q = {}
def addQ(u, v, w):
if w == 0: return
if u > v: u, v = v, u
Q[(u, v)] = Q.get((u, v), 0.0) + w
for k in range(2):
for p in range(N):
q = (p + 1) % N
for i in range(N):
for j in range(N):
dij, fij = D[i, j], F[i, j]
if dij == 0: continue
# 這裡就是 H-infinity 的核心風險項!完全保留著!
risk_term = (sigma / (gamma**2)) * (fij**2)
total_weight = dij + (alpha * risk_term)
u, v = idx_mtsp(k, i, p, N), idx_mtsp(k, j, q, N)
addQ(u, v, total_weight)
for k in range(2):
for p in range(N):
vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)]
for u in vars_pos: addQ(u, u, -penalty)
for a in range(N):
for b in range(a+1, N): addQ(vars_pos[a], vars_pos[b], 2*penalty)
for i in range(1, N):
vars_city = [idx_mtsp(k, i, p, N) for k in range(2) for p in range(N)]
for u in vars_city: addQ(u, u, -penalty)
L = len(vars_city)
for a in range(L):
for b in range(a+1, L): addQ(vars_city[a], vars_city[b], 2*penalty)
for k in range(2):
for i in range(1, N): addQ(idx_mtsp(k, i, 0, N), idx_mtsp(k, i, 0, N), big_penalty)
addQ(idx_mtsp(k, 0, 0, N), idx_mtsp(k, 0, 0, N), -big_penalty)
return Q, N
def decode_slots(sample, N):
slots = []
for k in range(2):
row = []
for p in range(N):
chosen = 0
for i in range(N):
if sample.get(idx_mtsp(k, i, p, N), 0) == 1:
chosen = i
break
row.append(chosen)
slots.append(row)
return slots[0], slots[1]
def repair_routes_from_slots(u1_slots, u2_slots, N):
count1, count2 = [0]*N, [0]*N
for c in u1_slots:
if 0 <= c < N: count1[c] += 1
for c in u2_slots:
if 0 <= c < N: count2[c] += 1
assign1, assign2 = [], []
for city in range(1, N):
if count1[city] > count2[city]: assign1.append(city)
elif count2[city] > count1[city]: assign2.append(city)
else:
if len(assign1) <= len(assign2): assign1.append(city)
else: assign2.append(city)
return sorted(assign1), sorted(assign2)
def uav_cost(path, D):
if not path or len(path) == 0: return 0.0
cost = D[0, path[0]]
for i in range(len(path)-1): cost += D[path[i], path[i+1]]
cost += D[path[-1], 0]
return float(cost)
def uav_disturbance_energy(path, F, gamma=GAMMA, sigma=SIGMA):
if not path or len(path) == 0: return 0.0
risk = (sigma / (gamma**2)) * (F[0, path[0]]**2)
for i in range(len(path)-1): risk += (sigma / (gamma**2)) * (F[path[i], path[i+1]]**2)
risk += (sigma / (gamma**2)) * (F[path[-1], 0]**2)
return float(risk)
def best_order_for_cities(cities, D, exact_limit=EXACT_LIMIT):
cities = list(cities)
if len(cities) <= 1: return cities, uav_cost(cities, D)
if len(cities) <= exact_limit:
best_perm, best_cost = None, float('inf')
for perm in permutations(cities):
c = uav_cost(list(perm), D)
if c < best_cost: best_cost, best_perm = c, list(perm)
return best_perm, best_cost
else: # 簡單的 nearest neighbor heuristic 當作 fallback
rem = set(cities)
curr = list(rem)[0]
rem.remove(curr)
route = [curr]
while rem:
nxt = min(rem, key=lambda x: D[curr, x])
route.append(nxt)
rem.remove(nxt)
curr = nxt
return route, uav_cost(route, D)
def get_makespan_and_risk(sample, N, D, F):
u1, u2 = decode_slots(sample, N)
a1, a2 = repair_routes_from_slots(u1, u2, N)
p1, c1 = best_order_for_cities(a1, D)
p2, c2 = best_order_for_cities(a2, D)
makespan = max(c1, c2)
dist_energy = uav_disturbance_energy(p1, F) + uav_disturbance_energy(p2, F)
return makespan, dist_energy
# ============================
# 核心創新功能區 (Pause Strategy)
# ============================
def pilot_search_phase_transition(Q, N, D, F):
print("\n🔍 [階段一] 前導偵查 (Pilot Search): 尋找最佳暫停點 s*")
test_s_list = np.arange(0.2, 1.0, 0.1)
variances = []
makespans = []
sampler = oj.SQASampler()
for s in test_s_list:
sched_var = [[s, BETA, 30]]
res_var = sampler.sample_qubo(Q, schedule=sched_var, num_reads=10)
energies = [d.energy for d in res_var.data()]
variances.append(np.var(energies))
sched_pause = [[s, BETA, 20], [s, BETA, 40], [1.0, BETA, 20]]
res_pause = sampler.sample_qubo(Q, schedule=sched_pause, num_reads=5)
mk, _ = get_makespan_and_risk(res_pause.first.sample, N, D, F)
makespans.append(mk)
print(f" 測試 s={s:.1f} | 變異數: {variances[-1]:.0f} | Makespan: {mk:.2f}")
best_s = test_s_list[np.argmin(makespans)]
print(f"⭐ 鎖定最佳暫停點 s* = {best_s:.1f}")
return test_s_list, variances, makespans, best_s
def test_pause_durations(Q, N, D, F, s_star):
print(f"\n⏳ [階段二] 暫停時間測試 (在 s*={s_star:.1f} 進行熱化)")
durations = [0, 20, 50, 100, 150]
duration_makespans = []
sampler = oj.SQASampler()
for d in durations:
if d == 0:
sched = [[1.0, BETA, 100]]
else:
sched = [[s_star, BETA, 50], [s_star, BETA, d], [1.0, BETA, 50]]
res = sampler.sample_qubo(Q, schedule=sched, num_reads=10)
mk, _ = get_makespan_and_risk(res.first.sample, N, D, F)
duration_makespans.append(mk)
print(f" 暫停 {d} 步 -> Makespan: {mk:.2f}")
return durations, duration_makespans
def run_comparative_evaluations(Q, N, D, F, s_star):
print(f"\n⚔️ [階段三] 正式對決 (SA vs SQA vs Pause-SQA, {NUM_RUNS} runs)")
# 資料容器 (包含 Makespan, Risk, Energy)
results = {
'sa': {'mk': [], 'risk': [], 'energy': []},
'sqa': {'mk': [], 'risk': [], 'energy': []},
'pause': {'mk': [], 'risk': [], 'energy': []}
}
sqa_sampler = oj.SQASampler()
sa_sampler = oj.SASampler()
std_sched = [[1.0, BETA, 200]]
pause_sched = [[s_star, BETA, 50], [s_star, BETA, 100], [1.0, BETA, 50]]
for r in range(NUM_RUNS):
# 1. SA (作為全方位 Baseline現在也跑多次)
res_sa = sa_sampler.sample_qubo(Q, num_reads=10)
mk_sa, risk_sa = get_makespan_and_risk(res_sa.first.sample, N, D, F)
results['sa']['mk'].append(mk_sa)
results['sa']['risk'].append(risk_sa)
results['sa']['energy'].append(res_sa.first.energy)
# 2. Standard SQA
res_std = sqa_sampler.sample_qubo(Q, schedule=std_sched, num_reads=10)
mk_std, risk_std = get_makespan_and_risk(res_std.first.sample, N, D, F)
results['sqa']['mk'].append(mk_std)
results['sqa']['risk'].append(risk_std)
results['sqa']['energy'].append(res_std.first.energy)
# 3. Pause SQA
res_pause = sqa_sampler.sample_qubo(Q, schedule=pause_sched, num_reads=10)
mk_pause, risk_pause = get_makespan_and_risk(res_pause.first.sample, N, D, F)
results['pause']['mk'].append(mk_pause)
results['pause']['risk'].append(risk_pause)
results['pause']['energy'].append(res_pause.first.energy)
print(f" SA 平均 Makespan: {np.mean(results['sa']['mk']):.2f}")
print(f" Standard-SQA 平均 Makespan: {np.mean(results['sqa']['mk']):.2f}")
print(f" Pause-SQA 平均 Makespan: {np.mean(results['pause']['mk']):.2f} (Proposed)")
return results
# ============================
# 學術圖表繪製 (Academic Visualization)
# ============================
def create_basic_distribution_charts(results):
"""新增的圖表一:繪製三大指標的分布對比圖 (MinMax, Energy, Disturbance)"""
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
fig.suptitle(f"Algorithm Performance Distributions (N={CITIES})", fontsize=16, fontweight='bold')
labels = ['SA', 'Standard-SQA', 'Pause-SQA']
colors = ['gray', 'steelblue', 'firebrick']
# 1. Makespan 分布
data_mk = [results['sa']['mk'], results['sqa']['mk'], results['pause']['mk']]
parts1 = axes[0].violinplot(data_mk, showmeans=True)
axes[0].set_title("Makespan (minMax) Distribution")
axes[0].set_ylabel("Distance Cost")
# 2. Energy 分布
data_energy = [results['sa']['energy'], results['sqa']['energy'], results['pause']['energy']]
parts2 = axes[1].violinplot(data_energy, showmeans=True)
axes[1].set_title("QUBO System Energy Distribution")
axes[1].set_ylabel("System Energy")
# 3. Disturbance Risk 分布
data_risk = [results['sa']['risk'], results['sqa']['risk'], results['pause']['risk']]
parts3 = axes[2].violinplot(data_risk, showmeans=True)
axes[2].set_title(r"$H_\infty$ Disturbance Risk Distribution")
axes[2].set_ylabel("Risk Penalty")
# 設定外觀
for i, ax in enumerate(axes):
ax.set_xticks([1, 2, 3])
ax.set_xticklabels(labels, fontsize=11)
ax.grid(alpha=0.3, axis='y')
# 上色
parts = [parts1, parts2, parts3][i]
for pc, color in zip(parts['bodies'], colors):
pc.set_facecolor(color)
pc.set_alpha(0.7)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig("distribution_comparison.png", dpi=300)
print("\n🎨 新增圖表已繪製: distribution_comparison.png")
plt.show()
def create_academic_charts(s_list, variances, makespans, best_s, durations, dur_makespans, results):
"""原有的圖表二Pause Strategy 深度分析圖"""
fig = plt.figure(figsize=(16, 10))
fig.suptitle(f"Quantum Pause Strategy in H-infinity Robust mTSP (N={CITIES})", fontsize=16, fontweight='bold')
# Chart 1: Phase Transition & U-Valley
ax1 = plt.subplot(2, 2, 1)
color1, color2 = 'tab:blue', 'tab:red'
ax1.set_title("Chart 1: Phase Transition & U-Valley", fontsize=12)
ax1.set_xlabel("Annealing Parameter $s$")
ax1.set_ylabel("Trotter Energy Variance", color=color1)
ax1.plot(s_list, variances, marker='o', color=color1, linewidth=2)
ax1.tick_params(axis='y', labelcolor=color1)
ax1_twin = ax1.twinx()
ax1_twin.set_ylabel("Makespan", color=color2)
ax1_twin.plot(s_list, makespans, marker='s', color=color2, linestyle='--', linewidth=2)
ax1_twin.tick_params(axis='y', labelcolor=color2)
ax1.axvline(x=best_s, color='green', linestyle=':', linewidth=2)
ax1.grid(alpha=0.3)
# Chart 2: Pareto Front
ax2 = plt.subplot(2, 2, 2)
ax2.set_title("Chart 2: Robustness Pareto Front", fontsize=12)
ax2.scatter(results['sqa']['mk'], results['sqa']['risk'], c='steelblue', alpha=0.7, s=80, label='Standard-SQA')
ax2.scatter(results['pause']['mk'], results['pause']['risk'], c='firebrick', alpha=0.9, s=120, marker='*', label='Pause-SQA')
ax2.set_xlabel("Makespan")
ax2.set_ylabel("Disturbance Risk")
ax2.legend()
ax2.grid(alpha=0.3, linestyle='--')
# Chart 3: Reliability Violin Plot
ax3 = plt.subplot(2, 2, 3)
ax3.set_title("Chart 3: Final Makespan Comparison", fontsize=12)
parts = ax3.violinplot([results['sqa']['mk'], results['pause']['mk']], showmeans=True)
parts['bodies'][0].set_facecolor('steelblue')
parts['bodies'][1].set_facecolor('firebrick')
for body in parts['bodies']: body.set_alpha(0.7)
ax3.set_xticks([1, 2])
ax3.set_xticklabels(['Standard-SQA', 'Pause-SQA'])
sa_mean = np.mean(results['sa']['mk'])
ax3.axhline(y=sa_mean, color='gray', linestyle=':', linewidth=2, label=f'SA Mean: {sa_mean:.1f}')
ax3.legend()
ax3.grid(alpha=0.3, axis='y')
# Chart 4: Effect of Thermalization Time
ax4 = plt.subplot(2, 2, 4)
ax4.set_title(f"Chart 4: Effect of Thermalization Time at $s^*={best_s:.1f}$", fontsize=12)
ax4.plot(durations, dur_makespans, marker='D', color='darkorange', linewidth=2.5, markersize=8)
ax4.set_xlabel("Pause Duration (Sweeps)")
ax4.set_ylabel("Optimized Makespan")
ax4.grid(alpha=0.3, linestyle='--')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig("academic_pause_strategy.png", dpi=300)
print("🎨 原有圖表已繪製: academic_pause_strategy.png")
plt.show()
# ============================
# 主程式執行入口
# ============================
if __name__ == "__main__":
D, city_coords = generate_distance_matrix(CITIES, random=RANDOM)
F = generate_disturbance_matrix(CITIES)
print("==================================================")
print(f"🚀 Quantum Pause Strategy Optimization (mTSP N={CITIES})")
print("==================================================")
Q, _ = build_robust_qubo(D, F)
# 階段 1找相變點
s_list, variances, pilot_mks, best_s = pilot_search_phase_transition(Q, CITIES, D, F)
# 階段 2測試暫停時間
durations, dur_mks = test_pause_durations(Q, CITIES, D, F, best_s)
# 階段 3正式評估 (一次抓齊所有數據)
results = run_comparative_evaluations(Q, CITIES, D, F, best_s)
# 繪製圖表一:三大基礎分布對比
create_basic_distribution_charts(results)
# 繪製圖表二Pause SQA 深度分析
create_academic_charts(s_list, variances, pilot_mks, best_s, durations, dur_mks, results)
print("🏁 實驗完成!")