Upload files to ''

main
ken910606 1 month ago
parent 97dda65da9
commit c85d2f8801

File diff suppressed because it is too large Load Diff

@ -0,0 +1,996 @@
import time
import numpy as np
import matplotlib.pyplot as plt
from itertools import permutations
import math
import openjij as oj
# ==================== 可調整參數區 ====================
# 問題規模設定
CITIES = 30 # 城市總數(含 depot=0建議範圍: 8-30
RANDOM = False # True: 每次隨機矩陣False: 固定 seed每次執行都一樣
RANDOM_SEED = 42 # 當 RANDOM=False 時使用的固定 seed
COORD_RANGE = (0.0, 10.0) # 城市座標範圍
# 演算法執行設定
CLASSIC = False # 是否執行經典暴力解全域最佳【警告CITIES>10 會非常慢】
NUM_RUNS = 10 # SA / SQA 執行次數(建議 10-100
# QUBO 參數
PENALTY = 20.0 # 約束懲罰權重(一般約束)
BIG_PENALTY = 9999.0 # 硬約束懲罰權重(起點約束)
NUM_READS = 30 # 每次 QUBO 取樣的讀取次數(建議 10-100
# TSP 求解參數
EXACT_LIMIT = 8 # 暴力求解的城市數上限(超過則用貪婪法)
# 魯棒優化參數 (H-infinity Robust QUBO)
USE_ROBUST = False # True or False 是否使用魯棒QUBO考慮干擾/故障風險)
GAMMA = 0.5 # H-infinity 控制器參數(越小越保守,建議 0.3-1.0
SIGMA = 1.0 # 干擾強度參數
ALPHA = 10.0 # 風險權重係數(越大越重視安全性)
# =====================================================
def generate_distance_matrix(num_cities, random=True, seed=None,
coord_range=None):
"""
產生 num_cities x num_cities 的對稱距離矩陣
預設 city 0 depot其餘 1...(num_cities-1) 為任務點
random=False 時使用固定 seed確保每次結果相同
"""
if seed is None:
seed = RANDOM_SEED
if coord_range is None:
coord_range = COORD_RANGE
if not random:
np.random.seed(seed)
else:
# 如果你希望每次執行完全獨立,可以在這裡不要設 seed
# 或用 np.random.seed(None)
np.random.seed(None)
low, high = coord_range
# 這裡把所有城市都放在同一個 2D 平面上隨機產生座標
# 也可以改成固定 depot=(0,0),其餘隨機,看你喜歡:
coords = np.random.uniform(low, high, size=(num_cities, 2))
# 計算歐氏距離矩陣
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = dist
D[j, i] = dist
# 將距離四捨五入到小數點後兩位,易讀一點
D = np.round(D, 2)
return D, coords
def generate_disturbance_matrix(num_cities, seed=None):
"""產生對稱的干擾/故障機率矩陣 F (0~1)"""
if seed is None:
seed = RANDOM_SEED
np.random.seed(seed)
F = np.random.rand(num_cities, num_cities)
# 對稱化 (假設 A到B 和 B到A 的風險一樣)
F = (F + F.T) / 2
# 將對角線設為 0 (自己到自己沒有風險)
np.fill_diagonal(F, 0.0)
return F
# ============================
# 1. KDE 函數
# ============================
def kde_1d(samples, num_points=200):
if len(samples) == 0:
return np.array([]), np.array([])
xs = np.linspace(min(samples), max(samples), num_points)
n = len(samples)
if n < 2:
return xs, np.zeros_like(xs)
std = np.std(samples)
if std == 0:
std = 1.0
h = 1.06 * std * (n ** (-1/5))
ys = []
inv_sqrt_2pi = 1.0 / math.sqrt(2 * math.pi)
for x in xs:
s = 0.0
for xi in samples:
z = (x - xi) / h
s += math.exp(-0.5 * z * z) * inv_sqrt_2pi
ys.append(s / (n * h))
return xs, np.array(ys)
# ============================
# 2. route diversity matrix支援可變長度路徑
# ============================
def route_diversity_matrix(routes, N):
"""
routes: list of routes每條 route 是城市序列例如 [0, 2, 5, 1, 9]
我們只用前 N 個位置來統計超過 N 的部分忽略
"""
freq = np.zeros((N, N), dtype=float)
valid_routes = 0
for route in routes:
if len(route) == 0:
continue
valid_routes += 1
for t in range(min(len(route), N)):
city = route[t]
if 0 <= city < N:
freq[city, t] += 1
if valid_routes > 0:
freq /= valid_routes
return freq
# ============================
# 3. MTSP 變數編碼x[k,i,p] → index
# ============================
def idx_mtsp(k, i, p, N):
""" x[k,i,p] → 1D index """
return k * (N * N) + i * N + p
# ============================
# 4. 建立 2-UAV M-TSP QUBO城市數不固定
# ============================
def build_mtsp_qubo_variable(D, penalty=None, big_penalty=None):
"""
- 城市: 0..N-1其中 0 depot
- 兩台 UAV: k = 0,1
- 每台 UAV N slot: p = 0..N-1
- 變數: x_{k,i,p}
QUBO包含
距離成本 + 位置約束 + 城市約束 + 起點約束
"""
if penalty is None:
penalty = PENALTY
if big_penalty is None:
big_penalty = BIG_PENALTY
D = np.asarray(D)
N = D.shape[0]
Q = {}
def addQ(u, v, w):
if w == 0:
return
if u > v:
u, v = v, u
Q[(u, v)] = Q.get((u, v), 0.0) + w
# ---- 距離成本 ----
for k in range(2):
for p in range(N):
q = (p + 1) % N
for i in range(N):
for j in range(N):
dij = D[i, j]
if dij == 0:
continue
u = idx_mtsp(k, i, p, N)
v = idx_mtsp(k, j, q, N)
addQ(u, v, dij)
# ---- 位置約束:每 slot 只能一個城市 ----
for k in range(2):
for p in range(N):
vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)]
for u in vars_pos:
addQ(u, u, -penalty)
for a in range(N):
for b in range(a+1, N):
addQ(vars_pos[a], vars_pos[b], 2*penalty)
# ---- 城市約束1..N-1 每城恰好一次 ----
for i in range(1, N):
vars_city = []
for k in range(2):
for p in range(N):
vars_city.append(idx_mtsp(k, i, p, N))
for u in vars_city:
addQ(u, u, -penalty)
L = len(vars_city)
for a in range(L):
for b in range(a+1, L):
addQ(vars_city[a], vars_city[b], 2*penalty)
# ---- 起點約束:每台 UAV 的 p=0 必須是 city 0 ----
for k in range(2):
for i in range(1, N):
addQ(idx_mtsp(k, i, 0, N), idx_mtsp(k, i, 0, N), big_penalty)
addQ(idx_mtsp(k, 0, 0, N), idx_mtsp(k, 0, 0, N), -big_penalty)
return Q, N
# ============================
# 4b. 建立包含 H_infinity 魯棒項的 QUBO
# ============================
def build_robust_qubo(D, F, gamma=None, sigma=None, alpha=None, penalty=None, big_penalty=None):
"""
建立包含 H_infinity 魯棒項的 QUBO
D: 距離矩陣
F: 干擾/故障機率矩陣 (0~1)
gamma: H_infinity 控制器參數越小越保守
sigma: 干擾強度參數
alpha: 用來平衡 Distance Risk 的權重係數
penalty: 約束懲罰權重
big_penalty: 硬約束懲罰權重
"""
if gamma is None:
gamma = GAMMA
if sigma is None:
sigma = SIGMA
if alpha is None:
alpha = ALPHA
if penalty is None:
penalty = PENALTY
if big_penalty is None:
big_penalty = BIG_PENALTY
N = D.shape[0]
Q = {}
def addQ(u, v, w):
if w == 0:
return
if u > v:
u, v = v, u
Q[(u, v)] = Q.get((u, v), 0.0) + w
# ==========================
# 1. 目標函數 (距離 + H_infinity 風險)
# ==========================
for k in range(2): # 2台 UAV
for p in range(N):
q = (p + 1) % N
for i in range(N):
for j in range(N):
dij = D[i, j]
fij = F[i, j]
if dij == 0:
continue
# --- H_infinity Robust Cost ---
# Risk = (sigma / gamma^2) * F^2
# 當 F 接近 1 且 gamma 小時,這項會很大
risk_term = (sigma / (gamma**2)) * (fij**2)
# 總權重 = 距離 + alpha * 風險
# alpha 越大,越重視安全性
total_weight = dij + (alpha * risk_term)
u = idx_mtsp(k, i, p, N)
v = idx_mtsp(k, j, q, N)
addQ(u, v, total_weight)
# ==========================
# 2. 約束條件 (與原本 MTSP 相同)
# ==========================
# (A) 位置約束:每台 UAV 在每個時間點 p 只能在一個城市
for k in range(2):
for p in range(N):
vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)]
for u in vars_pos:
addQ(u, u, -penalty)
for a in range(N):
for b in range(a+1, N):
addQ(vars_pos[a], vars_pos[b], 2*penalty)
# (B) 城市約束:每個任務城市 (1..N-1) 必須被拜訪恰好一次
for i in range(1, N):
vars_city = []
for k in range(2):
for p in range(N):
vars_city.append(idx_mtsp(k, i, p, N))
for u in vars_city:
addQ(u, u, -penalty)
L = len(vars_city)
for a in range(L):
for b in range(a+1, L):
addQ(vars_city[a], vars_city[b], 2*penalty)
# (C) 起點約束:所有 UAV 時間 0 都在 depot (city 0)
for k in range(2):
# p=0 必須是 city 0
for i in range(1, N):
u = idx_mtsp(k, i, 0, N)
addQ(u, u, big_penalty) # 懲罰出現在 p=0 的非 depot 點
# 鼓勵 city 0 在 p=0
u_depot = idx_mtsp(k, 0, 0, N)
addQ(u_depot, u_depot, -big_penalty)
return Q, N
# ============================
# 5. 解碼 slot 序列(不修剪)
# ============================
def decode_slots(sample, N):
"""
回傳
u1_slots, u2_slots
各自長度 N裡面是城市 index可能重複也可能很多 0
"""
slots = []
for k in range(2):
row = []
for p in range(N):
chosen = 0 # 預設當作待在 depot
for i in range(N):
if sample.get(idx_mtsp(k, i, p, N), 0) == 1:
chosen = i
break
row.append(chosen)
slots.append(row)
return slots[0], slots[1]
# ============================
# 6. 根據 slots 做 repair → 產生合法 uav1 / uav2 path
# ============================
def repair_routes_from_slots(u1_slots, u2_slots, N):
"""
依據 slots 傾向資訊把城市分配給兩台 UAV
並用暴力 TSP 在各自子集合裡找最短路徑確保
- 每個城市 1..N-1 被恰好一台 UAV 拜訪
- 兩台 UAV 路徑皆為 0 -> ... -> 0成本計算時處理
"""
# 統計每個城市在兩台 UAV slots 中出現次數
count1 = [0]*N
count2 = [0]*N
for c in u1_slots:
if 0 <= c < N:
count1[c] += 1
for c in u2_slots:
if 0 <= c < N:
count2[c] += 1
# 分配城市:看誰出現次數多,平手則讓城市較少的一邊拿
assign1 = []
assign2 = []
for city in range(1, N): # city 0 = depot 不分配
if count1[city] > count2[city]:
assign1.append(city)
elif count2[city] > count1[city]:
assign2.append(city)
else:
# 平手 → 給目前負載較少的 UAV
if len(assign1) <= len(assign2):
assign1.append(city)
else:
assign2.append(city)
return sorted(assign1), sorted(assign2)
# ============================
# 7. 單 UAV 成本 & 子集合最佳排列
# ============================
def uav_cost(path, D):
if not path:
return 0.0
path = [c for c in path if 0 <= c < len(D)]
if len(path) == 0:
return 0.0
cost = D[0, path[0]]
for i in range(len(path)-1):
cost += D[path[i], path[i+1]]
cost += D[path[-1], 0]
return float(cost)
def uav_disturbance_energy(path, F, gamma=None, sigma=None):
"""
計算單一 UAV 路徑的擾動能量
path: UAV 的路徑 (list of city indices)
F: 干擾矩陣
公式: Risk = (sigma / gamma^2) * sum(F[i,j]^2 for all edges)
"""
if gamma is None:
gamma = GAMMA
if sigma is None:
sigma = SIGMA
if not path or len(path) == 0:
return 0.0
path = [c for c in path if 0 <= c < len(F)]
if len(path) == 0:
return 0.0
risk = 0.0
# depot (0) -> first city
risk += (sigma / (gamma**2)) * (F[0, path[0]]**2)
# city to city
for i in range(len(path)-1):
risk += (sigma / (gamma**2)) * (F[path[i], path[i+1]]**2)
# last city -> depot (0)
risk += (sigma / (gamma**2)) * (F[path[-1], 0]**2)
return float(risk)
def best_order_for_cities(cities, D, exact_limit=None):
"""
cities: list of city index (1..N-1)
exact_limit: 小於等於這個長度才用暴力全排列太多就改用近鄰貪婪
"""
if exact_limit is None:
exact_limit = EXACT_LIMIT
cities = list(cities)
if len(cities) <= 1:
return cities, uav_cost(cities, D)
# 小問題可以繼續用暴力(例如 <= 8
if len(cities) <= exact_limit:
best_perm = None
best_cost = float('inf')
for perm in permutations(cities):
c = uav_cost(list(perm), D)
if c < best_cost:
best_cost = c
best_perm = list(perm)
return best_perm, best_cost
# 太多城市 → 改用最近鄰貪婪法
remaining = set(cities)
path = []
current = 0 # 從 depot 出發
while remaining:
# 找離 current 最近的城市
next_city = min(remaining, key=lambda c: D[current, c])
path.append(next_city)
remaining.remove(next_city)
current = next_city
return path, uav_cost(path, D)
# ============================
# 8. 利用 QUBO 取樣 + repair 得到合法解
# ============================
def sample_and_repair(sampler, Q, N, D, F=None, run_num=None, method_name=""):
"""
一次取樣 + repair即可保證得到合法 2-UAV M-TSP
目標minimize max(UAV1_cost, UAV2_cost)minMax / makespan
F: 干擾矩陣可選
"""
result = sampler.sample_qubo(Q, num_reads=NUM_READS)
sample = result.first.sample
# 1) 先解出 slots
u1_slots, u2_slots = decode_slots(sample, N)
# 2) 依 slots 傾向分配城市給兩台 UAV
assign1, assign2 = repair_routes_from_slots(u1_slots, u2_slots, N)
# 3) 各自子集合內找最短路徑(仍是對各自子集合做最短路徑)
u1_path, c1 = best_order_for_cities(assign1, D)
u2_path, c2 = best_order_for_cities(assign2, D)
# 🔁 這裡把原本的「總成本 = c1 + c2」改成「max(c1, c2)」
makespan = max(c1, c2)
# 計算擾動能量(如果有干擾矩陣)
disturbance_energy = 0.0
if F is not None:
risk1 = uav_disturbance_energy(u1_path, F)
risk2 = uav_disturbance_energy(u2_path, F)
disturbance_energy = risk1 + risk2 # 總擾動能量
return u1_path, u2_path, c1, c2, makespan, disturbance_energy
# ============================
# 9. Classic 2-UAV MTSP 暴力搜尋(全域最佳)
# ============================
def classic_mtsp_two_uav_bruteforce(D):
"""
2-UAV M-TSP 的經典暴力解
目標改為 minimize max(cost(UAV1), cost(UAV2)) = 最短完成時間 (minMax)
"""
N = D.shape[0]
cities = list(range(1, N))
best_makespan = float('inf')
best_sum_cost = float('inf')
best_u1, best_u2 = None, None
best_route = None
def cost_of(path):
return uav_cost(path, D)
# 計算總排列數
from math import factorial
total_perms = factorial(len(cities))
total_combos = total_perms * len(cities) # 每個排列有 len+1 種切分
print(f"📊 暴力搜尋: {total_perms} 種排列 × {len(cities)+1} 種切分 = {total_combos:,} 種組合")
count = 0
last_print_time = time.time()
print_interval = 0.5 # 每 0.5 秒更新一次進度
for perm in permutations(cities):
for cut in range(len(perm) + 1):
count += 1
# 定期顯示進度
current_time = time.time()
if current_time - last_print_time >= print_interval:
progress = (count / total_combos) * 100
print(f"\r 進度: {count:,}/{total_combos:,} ({progress:.1f}%)", end="", flush=True)
last_print_time = current_time
u1 = list(perm[:cut])
u2 = list(perm[cut:])
c1 = cost_of(u1)
c2 = cost_of(u2)
makespan = max(c1, c2)
total_sum = c1 + c2
if (makespan < best_makespan) or \
(math.isclose(makespan, best_makespan) and total_sum < best_sum_cost):
best_makespan = makespan
best_sum_cost = total_sum
best_u1 = u1
best_u2 = u2
best_route = [0] + list(perm)
print(f"\r 進度: {total_combos:,}/{total_combos:,} (100.0%) ✓")
return best_u1, best_u2, best_makespan, best_route
# ============================
# 10. 共用:計算兩 UAV 總成本
# ============================
def compute_two_uav_cost(uav1_path, uav2_path, D):
c1 = uav_cost(uav1_path, D)
c2 = uav_cost(uav2_path, D)
return c1 + c2, c1, c2
# ============================
# 11. SA & SQA每 run 使用 sample_and_repair
# ============================
def run_mtsp_sa_sqa(Q, N, D, F=None, NUM_RUNS=10):
"""
使用 SA / SQA QUBO每次取一組解並經過 repair
評估指標使用 makespan = max(UAV1_cost, UAV2_cost)minMax
F: 干擾矩陣可選
"""
sa_makespans = []
sa_routes = []
sa_u1_list = []
sa_u2_list = []
sa_uav_costs = []
sa_disturbances = [] # 擾動能量
sqa_makespans = []
sqa_routes = []
sqa_u1_list = []
sqa_u2_list = []
sqa_uav_costs = []
sqa_disturbances = [] # 擾動能量
sa = oj.SASampler()
sqa = oj.SQASampler()
# ========== SA 計時開始 ==========
print("\n🔥 Running SA (with repair, always feasible, minMax objective)...")
print(f"📊 總共 {NUM_RUNS} 次運行")
sa_start_time = time.time()
for r in range(NUM_RUNS):
u1, u2, c1, c2, makespan, dist_energy = sample_and_repair(sa, Q, N, D, F, run_num=r+1, method_name="SA")
full_route = [0] + u1 + u2
if F is not None:
print(f" 結果: UAV1={u1}, UAV2={u2}, "
f"UAV1_cost={c1:.2f}, UAV2_cost={c2:.2f}, "
f"Makespan={makespan:.2f}, Disturbance={dist_energy:.4f}")
else:
print(f" 結果: UAV1={u1}, UAV2={u2}, "
f"UAV1_cost={c1:.2f}, UAV2_cost={c2:.2f}, "
f"Makespan={makespan:.2f}")
sa_makespans.append(makespan)
sa_routes.append(full_route)
sa_u1_list.append(u1)
sa_u2_list.append(u2)
sa_uav_costs.append((c1, c2))
sa_disturbances.append(dist_energy)
sa_elapsed = time.time() - sa_start_time
print(f"⏱️ SA Total Time: {sa_elapsed:.3f} seconds ({sa_elapsed/NUM_RUNS:.3f} sec/run)")
# ========== SQA 計時開始 ==========
print("\n🔥 Running SQA (with repair, always feasible, minMax objective)...")
print(f"📊 總共 {NUM_RUNS} 次運行")
sqa_start_time = time.time()
for r in range(NUM_RUNS):
u1, u2, c1, c2, makespan, dist_energy = sample_and_repair(sqa, Q, N, D, F, run_num=r+1, method_name="SQA")
full_route = [0] + u1 + u2
if F is not None:
print(f" 結果: UAV1={u1}, UAV2={u2}, "
f"UAV1_cost={c1:.2f}, UAV2_cost={c2:.2f}, "
f"Makespan={makespan:.2f}, Disturbance={dist_energy:.4f}")
else:
print(f" 結果: UAV1={u1}, UAV2={u2}, "
f"UAV1_cost={c1:.2f}, UAV2_cost={c2:.2f}, "
f"Makespan={makespan:.2f}")
sqa_makespans.append(makespan)
sqa_routes.append(full_route)
sqa_u1_list.append(u1)
sqa_u2_list.append(u2)
sqa_uav_costs.append((c1, c2))
sqa_disturbances.append(dist_energy)
sqa_elapsed = time.time() - sqa_start_time
print(f"⏱️ SQA Total Time: {sqa_elapsed:.3f} seconds ({sqa_elapsed/NUM_RUNS:.3f} sec/run)")
sa_results = (sa_makespans, sa_routes, sa_u1_list, sa_u2_list, sa_uav_costs, sa_disturbances)
sqa_results = (sqa_makespans, sqa_routes, sqa_u1_list, sqa_u2_list, sqa_uav_costs, sqa_disturbances)
return sa_results, sqa_results, sa_elapsed, sqa_elapsed
# ============================
# 12. 綜合分析圖表(你指定的版本)
# ============================
def create_comprehensive_charts(sa_results, sqa_results, classic_results, D, N):
"""創建綜合分析圖表目標minMax makespan"""
fig = plt.figure(figsize=(24, 12))
# 解包結果(第一個 list 現在代表 makespan最後一個是擾動能量
sa_costs, sa_routes, sa_uav1, sa_uav2, sa_uav_costs, sa_disturbances = sa_results
sqa_costs, sqa_routes, sqa_uav1, sqa_uav2, sqa_uav_costs, sqa_disturbances = sqa_results
classic_uav1, classic_uav2, classic_cost, classic_route = classic_results
# ---------------------------------------------------------
# 1. MTSP 完成時間 (makespan) 分布比較 (左上)
# ---------------------------------------------------------
ax1 = plt.subplot(2, 4, 1)
bins = range(int(min(sa_costs + sqa_costs)) - 1,
int(max(sa_costs + sqa_costs)) + 2)
ax1.hist(sa_costs, bins=bins, alpha=0.6, label="SA Makespan", density=True, color='lightblue')
ax1.hist(sqa_costs, bins=bins, alpha=0.6, label="SQA Makespan", density=True, color='lightcoral')
# 添加KDE
if len(sa_costs) > 1:
xs_sa, ys_sa = kde_1d(sa_costs)
ax1.plot(xs_sa, ys_sa, label="SA KDE", color='blue', linewidth=2)
if len(sqa_costs) > 1:
xs_sqa, ys_sqa = kde_1d(sqa_costs)
ax1.plot(xs_sqa, ys_sqa, label="SQA KDE", color='red', linewidth=2)
ax1.axvline(x=classic_cost, color='green', linestyle='--',
label=f'Classic minMax: {classic_cost:.2f}', linewidth=2)
ax1.set_title("System Makespan Distribution (max route length)")
ax1.set_xlabel("Makespan (max cost of UAV1, UAV2)")
ax1.set_ylabel("Density")
ax1.legend()
ax1.grid(True, alpha=0.3)
# ---------------------------------------------------------
# 2. 單一 UAV 成本分布比較 (右上)
# ---------------------------------------------------------
ax2 = plt.subplot(2, 4, 2)
sa_single_costs = [c for costs in sa_uav_costs for c in costs]
sqa_single_costs = [c for costs in sqa_uav_costs for c in costs]
c1_classic = uav_cost(classic_uav1, D)
c2_classic = uav_cost(classic_uav2, D)
all_single = sa_single_costs + sqa_single_costs
uav_bins = range(int(min(all_single)) - 1, int(max(all_single)) + 2)
ax2.hist(sa_single_costs, bins=uav_bins, alpha=0.6, label="SA Single UAV", density=True, color='skyblue')
ax2.hist(sqa_single_costs, bins=uav_bins, alpha=0.6, label="SQA Single UAV", density=True, color='salmon')
ax2.axvline(x=c1_classic, color='darkgreen', linestyle=':', label='Classic UAV1')
ax2.axvline(x=c2_classic, color='limegreen', linestyle=':', label='Classic UAV2')
ax2.set_title("Route Length Distribution per UAV")
ax2.set_xlabel("Cost per UAV")
ax2.set_ylabel("Density")
ax2.legend()
ax2.grid(True, alpha=0.3)
# ---------------------------------------------------------
# 3. 箱型圖比較 (中上):系統 makespan vs 單機成本
# ---------------------------------------------------------
ax3 = plt.subplot(2, 3, 3)
box_data = [sa_costs, sqa_costs, sa_single_costs, sqa_single_costs]
bp = ax3.boxplot(
box_data,
labels=["SA\nMakespan", "SQA\nMakespan", "SA\nSingle", "SQA\nSingle"],
patch_artist=True,
showmeans=True
)
colors = ['lightblue', 'lightcoral', 'skyblue', 'salmon']
for patch, color in zip(bp['boxes'], colors):
patch.set_facecolor(color)
ax3.set_title("Makespan vs Individual UAV Cost")
ax3.set_ylabel("Cost")
ax3.grid(True, alpha=0.3)
# ---------------------------------------------------------
# 4. 總能量分布比較 (Total Energy - Distance Cost)
# ---------------------------------------------------------
ax4 = plt.subplot(2, 4, 4)
# 繪製總能量直方圖
all_energies = sa_costs + sqa_costs
bins_energy = range(int(min(all_energies)) - 1, int(max(all_energies)) + 2)
ax4.hist(sa_costs, bins=bins_energy, alpha=0.6, label="SA Total Energy", density=True, color='skyblue')
ax4.hist(sqa_costs, bins=bins_energy, alpha=0.6, label="SQA Total Energy", density=True, color='salmon')
# 添加 KDE
if len(sa_costs) > 1:
xs_sa, ys_sa = kde_1d(sa_costs)
ax4.plot(xs_sa, ys_sa, color='blue', linewidth=2, alpha=0.8)
if len(sqa_costs) > 1:
xs_sqa, ys_sqa = kde_1d(sqa_costs)
ax4.plot(xs_sqa, ys_sqa, color='red', linewidth=2, alpha=0.8)
ax4.set_xlabel("Total Energy (Distance Cost)")
ax4.set_ylabel("Density")
ax4.set_title(f"Total Energy Distribution\nSA: {np.mean(sa_costs):.2f}±{np.std(sa_costs):.2f} | SQA: {np.mean(sqa_costs):.2f}±{np.std(sqa_costs):.2f}")
ax4.legend()
ax4.grid(alpha=0.3)
# ---------------------------------------------------------
# 5. SA 路徑多樣性分析
# ---------------------------------------------------------
ax5 = plt.subplot(2, 4, 5)
sa_mat = route_diversity_matrix(sa_routes, N)
im1 = ax5.imshow(sa_mat, aspect='auto', origin='lower', cmap='viridis')
ax5.set_title("SA Route Diversity")
ax5.set_xlabel("Position")
ax5.set_ylabel("City")
plt.colorbar(im1, ax=ax5, shrink=0.8)
# ---------------------------------------------------------
# 6. SQA 路徑多樣性分析
# ---------------------------------------------------------
ax6 = plt.subplot(2, 4, 6)
sqa_mat = route_diversity_matrix(sqa_routes, N)
im2 = ax6.imshow(sqa_mat, aspect='auto', origin='lower', cmap='viridis')
ax6.set_title("SQA Route Diversity")
ax6.set_xlabel("Position")
ax6.set_ylabel("City")
plt.colorbar(im2, ax=ax6, shrink=0.8)
# ---------------------------------------------------------
# 7. 擾動能量分布比較 (Disturbance Energy Distribution)
# ---------------------------------------------------------
ax7 = plt.subplot(2, 4, 7)
# 繪製擾動能量直方圖
all_disturbances = sa_disturbances + sqa_disturbances
if len(all_disturbances) > 0 and max(all_disturbances) > 0:
bins_disturb = np.linspace(min(all_disturbances), max(all_disturbances), 20)
ax7.hist(sa_disturbances, bins=bins_disturb, alpha=0.6, label="SA Disturbance",
density=True, color='lightgreen')
ax7.hist(sqa_disturbances, bins=bins_disturb, alpha=0.6, label="SQA Disturbance",
density=True, color='lightpink')
# 添加 KDE
if len(sa_disturbances) > 1 and np.std(sa_disturbances) > 0:
xs_sa, ys_sa = kde_1d(sa_disturbances)
ax7.plot(xs_sa, ys_sa, color='green', linewidth=2, alpha=0.8)
if len(sqa_disturbances) > 1 and np.std(sqa_disturbances) > 0:
xs_sqa, ys_sqa = kde_1d(sqa_disturbances)
ax7.plot(xs_sqa, ys_sqa, color='purple', linewidth=2, alpha=0.8)
ax7.set_title(f"Disturbance Energy Distribution\\nSA: {np.mean(sa_disturbances):.4f}±{np.std(sa_disturbances):.4f} | SQA: {np.mean(sqa_disturbances):.4f}±{np.std(sqa_disturbances):.4f}")
else:
ax7.text(0.5, 0.5, "No disturbance data\\n(USE_ROBUST=False)",
ha='center', va='center', transform=ax7.transAxes, fontsize=12)
ax7.set_title("Disturbance Energy Distribution")
ax7.set_xlabel("Disturbance Energy")
ax7.set_ylabel("Density")
ax7.legend()
ax7.grid(alpha=0.3)
# ---------------------------------------------------------
# 8. 最佳路徑可視化 (Best Route Visualization)
# ---------------------------------------------------------
ax8 = plt.subplot(2, 4, 8)
best_sa_idx = sa_costs.index(min(sa_costs))
best_sqa_idx = sqa_costs.index(min(sqa_costs))
best_sa_u1 = sa_uav1[best_sa_idx]
best_sa_u2 = sa_uav2[best_sa_idx]
best_sqa_u1 = sqa_uav1[best_sqa_idx]
best_sqa_u2 = sqa_uav2[best_sqa_idx]
if min(sa_costs) <= min(sqa_costs):
u1 = best_sa_u1
u2 = best_sa_u2
route_name = "SA"
route_cost = min(sa_costs)
else:
u1 = best_sqa_u1
u2 = best_sqa_u2
route_name = "SQA"
route_cost = min(sqa_costs)
angles = np.linspace(0, 2*np.pi, N, endpoint=False)
x_pos = np.cos(angles)
y_pos = np.sin(angles)
ax8.scatter(x_pos, y_pos, s=200, c='red', zorder=5)
for i, (x, y) in enumerate(zip(x_pos, y_pos)):
ax8.annotate(str(i), (x, y), ha='center', va='center',
fontsize=10, fontweight='bold', color='white', zorder=6)
if len(u1) > 0:
pts = [0] + u1 + [0]
xs = [x_pos[c] for c in pts]
ys = [y_pos[c] for c in pts]
ax8.plot(xs, ys, '-o', color='blue', linewidth=2, alpha=0.8,
label=f'UAV1 ({len(u1)})')
if len(u2) > 0:
pts = [0] + u2 + [0]
xs = [x_pos[c] for c in pts]
ys = [y_pos[c] for c in pts]
ax8.plot(xs, ys, '-o', color='orange', linewidth=2, alpha=0.8,
label=f'UAV2 ({len(u2)})')
ax8.set_title(f"Best {route_name} MTSP Solution (minMax) Makespan: {route_cost:.2f}")
ax8.set_xlim(-1.3, 1.3)
ax8.set_ylim(-1.3, 1.3)
ax8.set_aspect('equal')
ax8.axis('off')
ax8.legend(loc='upper right')
plt.tight_layout()
plt.savefig("mtsp_two_uav_comprehensive_analysis.png", dpi=300, bbox_inches='tight')
print(f"\n📊 綜合分析圖表已保存: mtsp_two_uav_comprehensive_analysis.png")
plt.show()
# ============================
# 13. main整合執行
# ============================
if __name__ == "__main__":
# 距離矩陣10 城市
# 讓原本程式沿用 N、D 命名
N = CITIES
D, city_coords = generate_distance_matrix(CITIES, random=RANDOM)
print(f"產生 {CITIES} 個城市的距離矩陣RANDOM={RANDOM}")
print("城市座標 (含 depot=0)")
for idx, (x, y) in enumerate(city_coords):
print(f"City {idx}: ({x:.2f}, {y:.2f})")
'''
D = np.array([
[0.00, 3.16, 4.47, 7.81, 8.54, 7.28, 6.08, 8.60, 5.83, 9.85],
[3.16, 0.00, 3.16, 5.00, 7.00, 4.12, 5.39, 6.32, 2.83, 8.06],
[4.47, 3.16, 0.00, 4.12, 4.12, 5.39, 2.24, 4.24, 3.16, 5.39],
[7.81, 5.00, 4.12, 0.00, 4.24, 3.16, 5.10, 2.24, 2.24, 4.47],
[8.54, 7.00, 4.12, 4.24, 0.00, 7.21, 2.83, 2.24, 5.39, 1.41],
[7.28, 4.12, 5.39, 3.16, 7.21, 0.00, 7.21, 5.39, 2.24, 7.62],
[6.08, 5.39, 2.24, 5.10, 2.83, 7.21, 0.00, 4.12, 5.00, 4.24],
[8.60, 6.32, 4.24, 2.24, 2.24, 5.39, 4.12, 0.00, 4.00, 2.24],
[5.83, 2.83, 3.16, 2.24, 5.39, 2.24, 5.00, 4.00, 0.00, 6.08],
[9.85, 8.06, 5.39, 4.47, 1.41, 7.62, 4.24, 2.24, 6.08, 0.00],
])
D = np.array([
[0. , 7.56, 2.09, 6.49, 7.75, 4.1, 1.24, 7.15, 2.81, 4.79],
[7.56, 0., 9.02, 5.63, 2.44, 6.15, 8.37, 2.37, 8.94, 7.51],
[2.09, 9.02, 0., 8.57, 8.71, 6.19, 0.85, 8.97, 4.27, 6.78],
[6.49, 5.63, 8.57, 0., 7.7, 2.48, 7.72, 3.39, 5.82, 2.9 ],
[7.75, 2.44, 8.71, 7.7, 0., 7.6, 8.23, 4.71, 9.76, 9.05],
[4.1, 6.15, 6.19, 2.48, 7.6, 0., 5.34, 4.52, 3.45, 1.45],
[1.24, 8.37, 0.85, 7.72, 8.23, 5.34, 0., 8.2, 3.64, 5.98],
[7.15, 2.37, 8.97, 3.39, 4.71, 4.52, 8.2, 0., 7.78, 5.7 ],
[2.81, 8.94, 4.27, 5.82, 9.76, 3.45, 3.64, 7.78, 0., 3.17],
[4.79, 7.51, 6.78, 2.9, 9.05, 1.45, 5.98, 5.7, 3.17, 0. ]
])
'''
N = D.shape[0]
print("🚀 2-UAV M-TSP (Quantum + Repair vs Classic) 開始運行...\n")
print(f"Distance Matrix ({N}x{N}):")
print(D)
# 建立 QUBO (根據 USE_ROBUST 決定使用哪種)
F = None # 初始化干擾矩陣
if USE_ROBUST:
print("\n🛡️ 使用魯棒 QUBO (H-infinity Robust)")
print(f" Gamma={GAMMA}, Sigma={SIGMA}, Alpha={ALPHA}")
F = generate_disturbance_matrix(N)
print(f"\n干擾矩陣 F ({N}x{N}):")
print(F)
Q, _ = build_robust_qubo(D, F)
else:
print("\n📊 使用標準 QUBO (不考慮風險)")
F = generate_disturbance_matrix(N) # 即使不用魯棒QUBO也生成F用於統計
Q, _ = build_mtsp_qubo_variable(D)
# SA / SQA每 run 一定是合法解
sa_results, sqa_results, sa_time, sqa_time = run_mtsp_sa_sqa(Q, N, D, F, NUM_RUNS)
# ========== Classic 計時 ==========
if CLASSIC == True:
print("\n🎯 Running Classic 2-UAV MTSP (Bruteforce ALL solutions)...")
classic_start_time = time.time()
classic_uav1, classic_uav2, classic_total_cost, classic_route = classic_mtsp_two_uav_bruteforce(D)
classic_elapsed = time.time() - classic_start_time
print(f"Classic UAV1={classic_uav1}, UAV2={classic_uav2}, Cost={classic_total_cost}")
print(f"Classic merged route (0 + perm): {classic_route}")
print(f"⏱️ Classic Total Time: {classic_elapsed:.3f} seconds")
else:
classic_total_cost = float('inf')
classic_elapsed = 0.0
classic_uav1, classic_uav2, classic_route = [], [], []
# Summary
sa_costs, _, _, _, _, sa_disturbances = sa_results
sqa_costs, _, _, _, _, sqa_disturbances = sqa_results
print("\n📊 Result Summary:")
print(f"SA best minMax (makespan) = {min(sa_costs):.2f}, mean = {np.mean(sa_costs):.2f}")
print(f"SQA best minMax (makespan) = {min(sqa_costs):.2f}, mean = {np.mean(sqa_costs):.2f}")
print(f"Classic global minMax optimum = {classic_total_cost:.2f}")
# 擾動能量統計
if F is not None:
print("\n⚡ Disturbance Energy Summary:")
print(f"SA min = {min(sa_disturbances):.4f}, max = {max(sa_disturbances):.4f}, mean = {np.mean(sa_disturbances):.4f}")
print(f"SQA min = {min(sqa_disturbances):.4f}, max = {max(sqa_disturbances):.4f}, mean = {np.mean(sqa_disturbances):.4f}")
# ========== 時間比較摘要 ==========
print("\n⏱️ Execution Time Comparison:")
print(f" SA : {sa_time:.3f} sec (avg {sa_time/NUM_RUNS:.3f} sec/run)")
print(f" SQA : {sqa_time:.3f} sec (avg {sqa_time/NUM_RUNS:.3f} sec/run)")
print(f" Classic: {classic_elapsed:.3f} sec")
print(f" SA speedup vs Classic: {classic_elapsed/sa_time:.2f}x")
print(f" SQA speedup vs Classic: {classic_elapsed/sqa_time:.2f}x")
# 畫圖
classic_results = (classic_uav1, classic_uav2, classic_total_cost, classic_route)
print("\n🎨 Creating comprehensive analysis charts...")
create_comprehensive_charts(sa_results, sqa_results, classic_results, D, N)
print("\n🏁 Done.")

File diff suppressed because it is too large Load Diff

@ -0,0 +1,396 @@
import time
import numpy as np
import matplotlib.pyplot as plt
from itertools import permutations
import math
import openjij as oj
# ==================== 可調整參數區 ====================
# 問題規模設定
CITIES = 15 # 測試建議先用 12-15確認圖表沒問題再上調到 30
RANDOM = False
RANDOM_SEED = 42
COORD_RANGE = (0.0, 10.0)
# 演算法執行設定
NUM_RUNS = 20 # 正式比較的執行次數
BETA = 10.0 # SQA 的逆溫度參數
# QUBO 參數
PENALTY = 20.0
BIG_PENALTY = 9999.0
# TSP 求解參數
EXACT_LIMIT = 8
# 魯棒優化參數 (H-infinity Robust QUBO)
USE_ROBUST = True # 強制開啟,這是本論文的核心
GAMMA = 0.5
SIGMA = 1.0
ALPHA = 10.0
# =====================================================
# ----------------- 基礎輔助函數 -----------------
def generate_distance_matrix(num_cities, random=True, seed=None, coord_range=None):
if seed is None: seed = RANDOM_SEED
if coord_range is None: coord_range = COORD_RANGE
if not random: np.random.seed(seed)
else: np.random.seed(None)
low, high = coord_range
coords = np.random.uniform(low, high, size=(num_cities, 2))
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = dist
D[j, i] = dist
return np.round(D, 2), coords
def generate_disturbance_matrix(num_cities, seed=None):
if seed is None: seed = RANDOM_SEED
np.random.seed(seed)
F = np.random.rand(num_cities, num_cities)
F = (F + F.T) / 2
np.fill_diagonal(F, 0.0)
return F
def idx_mtsp(k, i, p, N):
return k * (N * N) + i * N + p
def build_robust_qubo(D, F, gamma=GAMMA, sigma=SIGMA, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY):
N = D.shape[0]
Q = {}
def addQ(u, v, w):
if w == 0: return
if u > v: u, v = v, u
Q[(u, v)] = Q.get((u, v), 0.0) + w
for k in range(2):
for p in range(N):
q = (p + 1) % N
for i in range(N):
for j in range(N):
dij, fij = D[i, j], F[i, j]
if dij == 0: continue
# 這裡就是 H-infinity 的核心風險項!完全保留著!
risk_term = (sigma / (gamma**2)) * (fij**2)
total_weight = dij + (alpha * risk_term)
u, v = idx_mtsp(k, i, p, N), idx_mtsp(k, j, q, N)
addQ(u, v, total_weight)
for k in range(2):
for p in range(N):
vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)]
for u in vars_pos: addQ(u, u, -penalty)
for a in range(N):
for b in range(a+1, N): addQ(vars_pos[a], vars_pos[b], 2*penalty)
for i in range(1, N):
vars_city = [idx_mtsp(k, i, p, N) for k in range(2) for p in range(N)]
for u in vars_city: addQ(u, u, -penalty)
L = len(vars_city)
for a in range(L):
for b in range(a+1, L): addQ(vars_city[a], vars_city[b], 2*penalty)
for k in range(2):
for i in range(1, N): addQ(idx_mtsp(k, i, 0, N), idx_mtsp(k, i, 0, N), big_penalty)
addQ(idx_mtsp(k, 0, 0, N), idx_mtsp(k, 0, 0, N), -big_penalty)
return Q, N
def decode_slots(sample, N):
slots = []
for k in range(2):
row = []
for p in range(N):
chosen = 0
for i in range(N):
if sample.get(idx_mtsp(k, i, p, N), 0) == 1:
chosen = i
break
row.append(chosen)
slots.append(row)
return slots[0], slots[1]
def repair_routes_from_slots(u1_slots, u2_slots, N):
count1, count2 = [0]*N, [0]*N
for c in u1_slots:
if 0 <= c < N: count1[c] += 1
for c in u2_slots:
if 0 <= c < N: count2[c] += 1
assign1, assign2 = [], []
for city in range(1, N):
if count1[city] > count2[city]: assign1.append(city)
elif count2[city] > count1[city]: assign2.append(city)
else:
if len(assign1) <= len(assign2): assign1.append(city)
else: assign2.append(city)
return sorted(assign1), sorted(assign2)
def uav_cost(path, D):
if not path or len(path) == 0: return 0.0
cost = D[0, path[0]]
for i in range(len(path)-1): cost += D[path[i], path[i+1]]
cost += D[path[-1], 0]
return float(cost)
def uav_disturbance_energy(path, F, gamma=GAMMA, sigma=SIGMA):
if not path or len(path) == 0: return 0.0
risk = (sigma / (gamma**2)) * (F[0, path[0]]**2)
for i in range(len(path)-1): risk += (sigma / (gamma**2)) * (F[path[i], path[i+1]]**2)
risk += (sigma / (gamma**2)) * (F[path[-1], 0]**2)
return float(risk)
def best_order_for_cities(cities, D, exact_limit=EXACT_LIMIT):
cities = list(cities)
if len(cities) <= 1: return cities, uav_cost(cities, D)
if len(cities) <= exact_limit:
best_perm, best_cost = None, float('inf')
for perm in permutations(cities):
c = uav_cost(list(perm), D)
if c < best_cost: best_cost, best_perm = c, list(perm)
return best_perm, best_cost
else: # 簡單的 nearest neighbor heuristic 當作 fallback
rem = set(cities)
curr = list(rem)[0]
rem.remove(curr)
route = [curr]
while rem:
nxt = min(rem, key=lambda x: D[curr, x])
route.append(nxt)
rem.remove(nxt)
curr = nxt
return route, uav_cost(route, D)
def get_makespan_and_risk(sample, N, D, F):
u1, u2 = decode_slots(sample, N)
a1, a2 = repair_routes_from_slots(u1, u2, N)
p1, c1 = best_order_for_cities(a1, D)
p2, c2 = best_order_for_cities(a2, D)
makespan = max(c1, c2)
dist_energy = uav_disturbance_energy(p1, F) + uav_disturbance_energy(p2, F)
return makespan, dist_energy
# ============================
# 核心創新功能區 (Pause Strategy)
# ============================
def pilot_search_phase_transition(Q, N, D, F):
print("\n🔍 [階段一] 前導偵查 (Pilot Search): 尋找最佳暫停點 s*")
test_s_list = np.arange(0.2, 1.0, 0.1)
variances = []
makespans = []
sampler = oj.SQASampler()
for s in test_s_list:
sched_var = [[s, BETA, 30]]
res_var = sampler.sample_qubo(Q, schedule=sched_var, num_reads=10)
energies = [d.energy for d in res_var.data()]
variances.append(np.var(energies))
sched_pause = [[s, BETA, 20], [s, BETA, 40], [1.0, BETA, 20]]
res_pause = sampler.sample_qubo(Q, schedule=sched_pause, num_reads=5)
mk, _ = get_makespan_and_risk(res_pause.first.sample, N, D, F)
makespans.append(mk)
print(f" 測試 s={s:.1f} | 變異數: {variances[-1]:.0f} | Makespan: {mk:.2f}")
best_s = test_s_list[np.argmin(makespans)]
print(f"⭐ 鎖定最佳暫停點 s* = {best_s:.1f}")
return test_s_list, variances, makespans, best_s
def test_pause_durations(Q, N, D, F, s_star):
print(f"\n⏳ [階段二] 暫停時間測試 (在 s*={s_star:.1f} 進行熱化)")
durations = [0, 20, 50, 100, 150]
duration_makespans = []
sampler = oj.SQASampler()
for d in durations:
if d == 0:
sched = [[1.0, BETA, 100]]
else:
sched = [[s_star, BETA, 50], [s_star, BETA, d], [1.0, BETA, 50]]
res = sampler.sample_qubo(Q, schedule=sched, num_reads=10)
mk, _ = get_makespan_and_risk(res.first.sample, N, D, F)
duration_makespans.append(mk)
print(f" 暫停 {d} 步 -> Makespan: {mk:.2f}")
return durations, duration_makespans
def run_comparative_evaluations(Q, N, D, F, s_star):
print(f"\n⚔️ [階段三] 正式對決 (SA vs SQA vs Pause-SQA, {NUM_RUNS} runs)")
# 資料容器 (包含 Makespan, Risk, Energy)
results = {
'sa': {'mk': [], 'risk': [], 'energy': []},
'sqa': {'mk': [], 'risk': [], 'energy': []},
'pause': {'mk': [], 'risk': [], 'energy': []}
}
sqa_sampler = oj.SQASampler()
sa_sampler = oj.SASampler()
std_sched = [[1.0, BETA, 200]]
pause_sched = [[s_star, BETA, 50], [s_star, BETA, 100], [1.0, BETA, 50]]
for r in range(NUM_RUNS):
# 1. SA (作為全方位 Baseline現在也跑多次)
res_sa = sa_sampler.sample_qubo(Q, num_reads=10)
mk_sa, risk_sa = get_makespan_and_risk(res_sa.first.sample, N, D, F)
results['sa']['mk'].append(mk_sa)
results['sa']['risk'].append(risk_sa)
results['sa']['energy'].append(res_sa.first.energy)
# 2. Standard SQA
res_std = sqa_sampler.sample_qubo(Q, schedule=std_sched, num_reads=10)
mk_std, risk_std = get_makespan_and_risk(res_std.first.sample, N, D, F)
results['sqa']['mk'].append(mk_std)
results['sqa']['risk'].append(risk_std)
results['sqa']['energy'].append(res_std.first.energy)
# 3. Pause SQA
res_pause = sqa_sampler.sample_qubo(Q, schedule=pause_sched, num_reads=10)
mk_pause, risk_pause = get_makespan_and_risk(res_pause.first.sample, N, D, F)
results['pause']['mk'].append(mk_pause)
results['pause']['risk'].append(risk_pause)
results['pause']['energy'].append(res_pause.first.energy)
print(f" SA 平均 Makespan: {np.mean(results['sa']['mk']):.2f}")
print(f" Standard-SQA 平均 Makespan: {np.mean(results['sqa']['mk']):.2f}")
print(f" Pause-SQA 平均 Makespan: {np.mean(results['pause']['mk']):.2f} (Proposed)")
return results
# ============================
# 學術圖表繪製 (Academic Visualization)
# ============================
def create_basic_distribution_charts(results):
"""新增的圖表一:繪製三大指標的分布對比圖 (MinMax, Energy, Disturbance)"""
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
fig.suptitle(f"Algorithm Performance Distributions (N={CITIES})", fontsize=16, fontweight='bold')
labels = ['SA', 'Standard-SQA', 'Pause-SQA']
colors = ['gray', 'steelblue', 'firebrick']
# 1. Makespan 分布
data_mk = [results['sa']['mk'], results['sqa']['mk'], results['pause']['mk']]
parts1 = axes[0].violinplot(data_mk, showmeans=True)
axes[0].set_title("Makespan (minMax) Distribution")
axes[0].set_ylabel("Distance Cost")
# 2. Energy 分布
data_energy = [results['sa']['energy'], results['sqa']['energy'], results['pause']['energy']]
parts2 = axes[1].violinplot(data_energy, showmeans=True)
axes[1].set_title("QUBO System Energy Distribution")
axes[1].set_ylabel("System Energy")
# 3. Disturbance Risk 分布
data_risk = [results['sa']['risk'], results['sqa']['risk'], results['pause']['risk']]
parts3 = axes[2].violinplot(data_risk, showmeans=True)
axes[2].set_title(r"$H_\infty$ Disturbance Risk Distribution")
axes[2].set_ylabel("Risk Penalty")
# 設定外觀
for i, ax in enumerate(axes):
ax.set_xticks([1, 2, 3])
ax.set_xticklabels(labels, fontsize=11)
ax.grid(alpha=0.3, axis='y')
# 上色
parts = [parts1, parts2, parts3][i]
for pc, color in zip(parts['bodies'], colors):
pc.set_facecolor(color)
pc.set_alpha(0.7)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig("distribution_comparison.png", dpi=300)
print("\n🎨 新增圖表已繪製: distribution_comparison.png")
plt.show()
def create_academic_charts(s_list, variances, makespans, best_s, durations, dur_makespans, results):
"""原有的圖表二Pause Strategy 深度分析圖"""
fig = plt.figure(figsize=(16, 10))
fig.suptitle(f"Quantum Pause Strategy in H-infinity Robust mTSP (N={CITIES})", fontsize=16, fontweight='bold')
# Chart 1: Phase Transition & U-Valley
ax1 = plt.subplot(2, 2, 1)
color1, color2 = 'tab:blue', 'tab:red'
ax1.set_title("Chart 1: Phase Transition & U-Valley", fontsize=12)
ax1.set_xlabel("Annealing Parameter $s$")
ax1.set_ylabel("Trotter Energy Variance", color=color1)
ax1.plot(s_list, variances, marker='o', color=color1, linewidth=2)
ax1.tick_params(axis='y', labelcolor=color1)
ax1_twin = ax1.twinx()
ax1_twin.set_ylabel("Makespan", color=color2)
ax1_twin.plot(s_list, makespans, marker='s', color=color2, linestyle='--', linewidth=2)
ax1_twin.tick_params(axis='y', labelcolor=color2)
ax1.axvline(x=best_s, color='green', linestyle=':', linewidth=2)
ax1.grid(alpha=0.3)
# Chart 2: Pareto Front
ax2 = plt.subplot(2, 2, 2)
ax2.set_title("Chart 2: Robustness Pareto Front", fontsize=12)
ax2.scatter(results['sqa']['mk'], results['sqa']['risk'], c='steelblue', alpha=0.7, s=80, label='Standard-SQA')
ax2.scatter(results['pause']['mk'], results['pause']['risk'], c='firebrick', alpha=0.9, s=120, marker='*', label='Pause-SQA')
ax2.set_xlabel("Makespan")
ax2.set_ylabel("Disturbance Risk")
ax2.legend()
ax2.grid(alpha=0.3, linestyle='--')
# Chart 3: Reliability Violin Plot
ax3 = plt.subplot(2, 2, 3)
ax3.set_title("Chart 3: Final Makespan Comparison", fontsize=12)
parts = ax3.violinplot([results['sqa']['mk'], results['pause']['mk']], showmeans=True)
parts['bodies'][0].set_facecolor('steelblue')
parts['bodies'][1].set_facecolor('firebrick')
for body in parts['bodies']: body.set_alpha(0.7)
ax3.set_xticks([1, 2])
ax3.set_xticklabels(['Standard-SQA', 'Pause-SQA'])
sa_mean = np.mean(results['sa']['mk'])
ax3.axhline(y=sa_mean, color='gray', linestyle=':', linewidth=2, label=f'SA Mean: {sa_mean:.1f}')
ax3.legend()
ax3.grid(alpha=0.3, axis='y')
# Chart 4: Effect of Thermalization Time
ax4 = plt.subplot(2, 2, 4)
ax4.set_title(f"Chart 4: Effect of Thermalization Time at $s^*={best_s:.1f}$", fontsize=12)
ax4.plot(durations, dur_makespans, marker='D', color='darkorange', linewidth=2.5, markersize=8)
ax4.set_xlabel("Pause Duration (Sweeps)")
ax4.set_ylabel("Optimized Makespan")
ax4.grid(alpha=0.3, linestyle='--')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig("academic_pause_strategy.png", dpi=300)
print("🎨 原有圖表已繪製: academic_pause_strategy.png")
plt.show()
# ============================
# 主程式執行入口
# ============================
if __name__ == "__main__":
D, city_coords = generate_distance_matrix(CITIES, random=RANDOM)
F = generate_disturbance_matrix(CITIES)
print("==================================================")
print(f"🚀 Quantum Pause Strategy Optimization (mTSP N={CITIES})")
print("==================================================")
Q, _ = build_robust_qubo(D, F)
# 階段 1找相變點
s_list, variances, pilot_mks, best_s = pilot_search_phase_transition(Q, CITIES, D, F)
# 階段 2測試暫停時間
durations, dur_mks = test_pause_durations(Q, CITIES, D, F, best_s)
# 階段 3正式評估 (一次抓齊所有數據)
results = run_comparative_evaluations(Q, CITIES, D, F, best_s)
# 繪製圖表一:三大基礎分布對比
create_basic_distribution_charts(results)
# 繪製圖表二Pause SQA 深度分析
create_academic_charts(s_list, variances, pilot_mks, best_s, durations, dur_mks, results)
print("🏁 實驗完成!")

@ -0,0 +1,689 @@
import time
import numpy as np
import matplotlib.pyplot as plt
from itertools import permutations
import math
import openjij as oj
# ==================== 可調整參數區 ====================
# 問題規模設定
CITIES = 30 # 初始核心問題規模 (測試建議先用 12-15)
CITIES_BOUND = 10 # 問題規模的上下範圍 (單一數字控制:以 CITIES 為中心,例如 15±5即 10~20)
RANDOM = False
RANDOM_SEED = 42
COORD_RANGE = (0.0, 10.0)
# 【核心創新】:環境變異數控制與複雜度壓力測試參數
N_LIST = list(range(CITIES - CITIES_BOUND, CITIES + CITIES_BOUND + 1, 4)) # 自動產生測試的問題規模範圍
COORD_STD = 15.0 # 空間分佈變異數 (越大代表城市分佈越不均勻、越崎嶇)
RISK_STD = 10.0 # 擾動風險變異數 (越大代表某些路段特別危險H-infinity 衝突極大)
# 演算法執行設定
NUM_RUNS = 10 # 正式比較的執行次數
BETA = 15.0 # 🌟 稍微調高 (原為 10.0),讓低溫結冰得更紮實
# QUBO 參數
PENALTY = 500.0 # 🌟 從 3000 大幅降回 500 (解開高爾夫球場效應!)
BIG_PENALTY = 2000.0 # 🌟 起點約束降到 2000 即可
# TSP 求解參數
EXACT_LIMIT = 8
# 魯棒優化參數 (H-infinity Robust QUBO)
USE_ROBUST = True # 強制開啟,這是本論文的核心
GAMMA = 0.5
SIGMA = 1.0
ALPHA = 10.0
# =====================================================
# ----------------- 基礎輔助函數 -----------------
def generate_distance_matrix(num_cities, random=True, seed=None, coord_range=None):
if seed is None: seed = RANDOM_SEED
if coord_range is None: coord_range = COORD_RANGE
if not random: np.random.seed(seed)
else: np.random.seed(None)
low, high = coord_range
coords = np.random.uniform(low, high, size=(num_cities, 2))
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = dist
D[j, i] = dist
return np.round(D, 2), coords
def generate_disturbance_matrix(num_cities, seed=None):
if seed is None: seed = RANDOM_SEED
np.random.seed(seed)
F = np.random.rand(num_cities, num_cities)
F = (F + F.T) / 2
np.fill_diagonal(F, 0.0)
return F
def idx_mtsp(k, i, p, N):
return k * (N * N) + i * N + p
def build_robust_qubo(D, F, gamma=GAMMA, sigma=SIGMA, alpha=ALPHA, penalty=PENALTY, big_penalty=BIG_PENALTY):
N = D.shape[0]
Q = {}
def addQ(u, v, w):
if w == 0: return
if u > v: u, v = v, u
Q[(u, v)] = Q.get((u, v), 0.0) + w
for k in range(2):
for p in range(N):
q = (p + 1) % N
for i in range(N):
for j in range(N):
dij, fij = D[i, j], F[i, j]
if dij == 0: continue
# 這裡就是 H-infinity 的核心風險項!完全保留著!
risk_term = (sigma / (gamma**2)) * (fij**2)
total_weight = dij + (alpha * risk_term)
u, v = idx_mtsp(k, i, p, N), idx_mtsp(k, j, q, N)
addQ(u, v, total_weight)
for k in range(2):
for p in range(N):
vars_pos = [idx_mtsp(k, i, p, N) for i in range(N)]
for u in vars_pos: addQ(u, u, -penalty)
for a in range(N):
for b in range(a+1, N): addQ(vars_pos[a], vars_pos[b], 2*penalty)
for i in range(1, N):
vars_city = [idx_mtsp(k, i, p, N) for k in range(2) for p in range(N)]
for u in vars_city: addQ(u, u, -penalty)
L = len(vars_city)
for a in range(L):
for b in range(a+1, L): addQ(vars_city[a], vars_city[b], 2*penalty)
for k in range(2):
for i in range(1, N): addQ(idx_mtsp(k, i, 0, N), idx_mtsp(k, i, 0, N), big_penalty)
addQ(idx_mtsp(k, 0, 0, N), idx_mtsp(k, 0, 0, N), -big_penalty)
return Q, N
# ---------------------------------------------------------
# 🌟 [新增] 產生平滑的 SQA 與 Pause SQA Schedule
# ---------------------------------------------------------
def get_smooth_sqa_schedule(beta, total_sweeps, num_steps=100):
"""產生標準 SQA 的平滑退火排程 (s 從 0.0 遞增到 1.0)"""
schedule = []
sweeps_per_step = max(1, int(total_sweeps / num_steps))
for s in np.linspace(0.0, 1.0, num_steps):
schedule.append([float(s), float(beta), int(sweeps_per_step)])
return schedule
def get_pause_sqa_schedule(s_star, beta, total_anneal_sweeps, pause_sweeps, num_steps=100):
"""產生帶有 Pause 策略的平滑退火排程"""
schedule = []
# 階段 1從 s=0.0 平滑退火到 s_star
steps_p1 = max(1, int(num_steps * s_star))
sweeps_p1 = max(1, int((total_anneal_sweeps / 2) / steps_p1))
for s in np.linspace(0.0, s_star, steps_p1, endpoint=False):
schedule.append([float(s), float(beta), int(sweeps_p1)])
# 階段 2在相變點 s_star 暫停 (Pause),讓量子系統充分熱化
schedule.append([float(s_star), float(beta), int(pause_sweeps)])
# 階段 3從 s_star 平滑退火到 s=1.0
steps_p3 = max(1, num_steps - steps_p1)
sweeps_p3 = max(1, int((total_anneal_sweeps / 2) / steps_p3))
for s in np.linspace(s_star, 1.0, steps_p3):
schedule.append([float(s), float(beta), int(sweeps_p3)])
return schedule
def decode_slots(sample, N):
slots = []
for k in range(2):
row = []
for p in range(N):
chosen = 0
for i in range(N):
if sample.get(idx_mtsp(k, i, p, N), 0) == 1:
chosen = i
break
row.append(chosen)
slots.append(row)
return slots[0], slots[1]
def repair_routes_from_slots(u1_slots, u2_slots, N):
count1, count2 = [0]*N, [0]*N
for c in u1_slots:
if 0 <= c < N: count1[c] += 1
for c in u2_slots:
if 0 <= c < N: count2[c] += 1
assign1, assign2 = [], []
for city in range(1, N):
if count1[city] > count2[city]: assign1.append(city)
elif count2[city] > count1[city]: assign2.append(city)
else:
if len(assign1) <= len(assign2): assign1.append(city)
else: assign2.append(city)
return sorted(assign1), sorted(assign2)
def uav_cost(path, D):
if not path or len(path) == 0: return 0.0
cost = D[0, path[0]]
for i in range(len(path)-1): cost += D[path[i], path[i+1]]
cost += D[path[-1], 0]
return float(cost)
def uav_disturbance_energy(path, F, gamma=GAMMA, sigma=SIGMA):
if not path or len(path) == 0: return 0.0
risk = (sigma / (gamma**2)) * (F[0, path[0]]**2)
for i in range(len(path)-1): risk += (sigma / (gamma**2)) * (F[path[i], path[i+1]]**2)
risk += (sigma / (gamma**2)) * (F[path[-1], 0]**2)
return float(risk)
def best_order_for_cities(cities, D, exact_limit=EXACT_LIMIT):
cities = list(cities)
if len(cities) <= 1: return cities, uav_cost(cities, D)
if len(cities) <= exact_limit:
best_perm, best_cost = None, float('inf')
for perm in permutations(cities):
c = uav_cost(list(perm), D)
if c < best_cost: best_cost, best_perm = c, list(perm)
return best_perm, best_cost
else: # 簡單的 nearest neighbor heuristic 當作 fallback
rem = set(cities)
curr = list(rem)[0]
rem.remove(curr)
route = [curr]
while rem:
nxt = min(rem, key=lambda x: D[curr, x])
route.append(nxt)
rem.remove(nxt)
curr = nxt
return route, uav_cost(route, D)
def get_makespan_and_risk(sample, N, D, F):
u1, u2 = decode_slots(sample, N)
a1, a2 = repair_routes_from_slots(u1, u2, N)
p1, c1 = best_order_for_cities(a1, D)
p2, c2 = best_order_for_cities(a2, D)
makespan = max(c1, c2)
dist_energy = uav_disturbance_energy(p1, F) + uav_disturbance_energy(p2, F)
# 確保回傳路徑,以便印出
return makespan, dist_energy, p1, p2
# ============================
# 核心創新功能區 (Pause Strategy)
# ============================
def pilot_search_phase_transition(Q, N, D, F):
print("\n🔍 [階段一] 前導偵查 (Pilot Search): 尋找最佳暫停點 s*")
test_s_list = np.arange(0.2, 1.0, 0.1)
variances = []
makespans = []
sampler = oj.SQASampler()
for s in test_s_list:
sched_var = [[s, BETA, 30]]
res_var = sampler.sample_qubo(Q, schedule=sched_var, num_reads=10)
energies = [d.energy for d in res_var.data()]
variances.append(np.var(energies))
sched_pause = [[s, BETA, 20], [s, BETA, 40], [1.0, BETA, 20]]
res_pause = sampler.sample_qubo(Q, schedule=sched_pause, num_reads=5)
mk, _, p1, p2 = get_makespan_and_risk(res_pause.first.sample, N, D, F)
makespans.append(mk)
print(f" 測試 s={s:.1f} | 變異數: {variances[-1]:.0f} | Makespan: {mk:.2f}")
best_s = test_s_list[np.argmin(makespans)]
print(f"⭐ 鎖定最佳暫停點 s* = {best_s:.1f}")
return test_s_list, variances, makespans, best_s
def test_pause_durations(Q, N, D, F, s_star):
print(f"\n⏳ [階段二] 暫停時間測試 (在 s*={s_star:.1f} 進行熱化)")
durations = [0, 20, 50, 100, 150]
duration_makespans = []
sampler = oj.SQASampler()
for d in durations:
if d == 0:
sched = get_smooth_sqa_schedule(BETA, total_sweeps=1000)
else:
sched = get_pause_sqa_schedule(s_star, BETA, total_anneal_sweeps=1000, pause_sweeps=d)
res = sampler.sample_qubo(Q, schedule=sched, num_reads=10)
mk, _, p1, p2 = get_makespan_and_risk(res.first.sample, N, D, F)
duration_makespans.append(mk)
print(f" 暫停 {d} 步 -> Makespan: {mk:.2f}")
return durations, duration_makespans
def run_comparative_evaluations(Q, N, D, F, s_star):
print(f"\n⚔️ [階段三] 正式對決 (SA vs SQA vs Pause-SQA, {NUM_RUNS} runs)")
results = {
'sa': {'mk': [], 'risk': [], 'energy': []},
'sqa': {'mk': [], 'risk': [], 'energy': []},
'pause': {'mk': [], 'risk': [], 'energy': []}
}
sqa_sampler = oj.SQASampler()
sa_sampler = oj.SASampler()
# 🌟 核心修改:建立 1000 Sweeps 的退火排程
TOTAL_SWEEPS = 1000
PAUSE_SWEEPS = 500 # 讓 Pause-SQA 在相變點多停留 500 步熱化
smooth_sched = get_smooth_sqa_schedule(beta=BETA, total_sweeps=TOTAL_SWEEPS, num_steps=100)
pause_sched = get_pause_sqa_schedule(s_star=s_star, beta=BETA, total_anneal_sweeps=TOTAL_SWEEPS, pause_sweeps=PAUSE_SWEEPS, num_steps=100)
# (選配) 在 pilot_search 和 test_pause 中也等比例放大 sweeps
# 例如 pilot_search 改成 [[s, BETA, 300]]
for r in range(NUM_RUNS):
# SA
res_sa = sa_sampler.sample_qubo(Q, num_reads=10)
mk, risk, p1, p2 = get_makespan_and_risk(res_sa.first.sample, N, D, F)
results['sa']['mk'].append(mk)
results['sa']['risk'].append(risk)
results['sa']['energy'].append(res_sa.first.energy)
print(f" [SA 執行 {r+1}/{NUM_RUNS}] 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f}")
# Standard SQA
t0 = time.time()
res_std = sqa_sampler.sample_qubo(Q, schedule=smooth_sched, num_reads=10)
mk, risk, p1, p2 = get_makespan_and_risk(res_std.first.sample, N, D, F)
results['sqa']['mk'].append(mk)
results['sqa']['risk'].append(risk)
results['sqa']['energy'].append(res_std.first.energy)
print(f" [Std-SQA 執行 {r+1}/{NUM_RUNS}] 耗時: {time.time()-t0:.2f}s | 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f}")
# Pause SQA
t0 = time.time()
res_pause = sqa_sampler.sample_qubo(Q, schedule=pause_sched, num_reads=10)
mk, risk, p1, p2 = get_makespan_and_risk(res_pause.first.sample, N, D, F)
results['pause']['mk'].append(mk)
results['pause']['risk'].append(risk)
results['pause']['energy'].append(res_pause.first.energy)
print(f" [Pause-SQA 執行 {r+1}/{NUM_RUNS}] 耗時: {time.time()-t0:.2f}s | 路徑 1: [0, {', '.join(map(str, p1))}, 0], 路徑 2: [0, {', '.join(map(str, p2))}, 0] | 成本: {mk:.2f}")
return results
# ============================
# 學術圖表繪製 (Academic Visualization)
# ============================
def create_basic_distribution_charts(results):
"""基本分佈分析圖"""
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
fig.suptitle(f"Algorithm Performance Distributions (N={CITIES})", fontsize=16, fontweight='bold')
labels = ['SA', 'Standard-SQA', 'Pause-SQA']
colors = ['gray', 'steelblue', 'firebrick']
# 1. Makespan 分布
parts1 = axes[0].violinplot([results['sa']['mk'], results['sqa']['mk'], results['pause']['mk']], showmeans=True)
axes[0].set_title("Makespan (Distance Cost) Distribution")
axes[0].set_ylabel("Distance Cost")
# 2. Risk 分布
parts2 = axes[1].violinplot([results['sa']['risk'], results['sqa']['risk'], results['pause']['risk']], showmeans=True)
axes[1].set_title(r"$H_\infty$ Disturbance Risk Distribution")
axes[1].set_ylabel("Risk Penalty")
# 3. Energy 分布
parts3 = axes[2].violinplot([results['sa']['energy'], results['sqa']['energy'], results['pause']['energy']], showmeans=True)
axes[2].set_title("QUBO System Raw Energy")
axes[2].set_ylabel("System Energy")
# 設定提琴圖外觀
for i, ax in enumerate(axes):
ax.set_xticks([1, 2, 3])
ax.set_xticklabels(labels, fontsize=11)
ax.grid(alpha=0.3, axis='y')
parts = [parts1, parts2, parts3][i]
for pc, color in zip(parts['bodies'], colors):
pc.set_facecolor(color)
pc.set_alpha(0.7)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig("distribution_comparison.png", dpi=300)
print("\n🎨 圖表已繪製: distribution_comparison.png")
# plt.show() (Moved to the end)
def create_academic_charts(s_list, variances, makespans, best_s, durations, dur_makespans, results):
"""原有的圖表二Pause Strategy 深度分析圖"""
fig = plt.figure(figsize=(16, 10))
fig.suptitle(f"Quantum Pause Strategy in H-infinity Robust mTSP (N={CITIES})", fontsize=16, fontweight='bold')
# Chart 1: Phase Transition & U-Valley
ax1 = plt.subplot(2, 2, 1)
color1, color2 = 'tab:blue', 'tab:red'
ax1.set_title("Chart 1: Phase Transition & U-Valley", fontsize=12)
ax1.set_xlabel("Annealing Parameter $s$")
ax1.set_ylabel("Trotter Energy Variance", color=color1)
ax1.plot(s_list, variances, marker='o', color=color1, linewidth=2)
ax1.tick_params(axis='y', labelcolor=color1)
ax1_twin = ax1.twinx()
ax1_twin.set_ylabel("Makespan", color=color2)
ax1_twin.plot(s_list, makespans, marker='s', color=color2, linestyle='--', linewidth=2)
ax1_twin.tick_params(axis='y', labelcolor=color2)
ax1.axvline(x=best_s, color='green', linestyle=':', linewidth=2)
ax1.grid(alpha=0.3)
# Subplot 2 (was Chart 4): Effect of Thermalization Time
ax2 = plt.subplot(2, 2, 2)
ax2.set_title(f"Chart 2: Effect of Thermalization Time at $s^*={best_s:.1f}$", fontsize=12)
ax2.plot(durations, dur_makespans, marker='D', color='darkorange', linewidth=2.5, markersize=8)
ax2.set_xlabel("Pause Duration (Sweeps)")
ax2.set_ylabel("Optimized Makespan")
ax2.grid(alpha=0.3, linestyle='--')
# Subplot 3 (was Chart 2): Pareto Front
ax3 = plt.subplot(2, 2, 3)
ax3.set_title("Chart 3: Robustness Pareto Front", fontsize=12)
ax3.scatter(results['sqa']['mk'], results['sqa']['risk'], c='steelblue', alpha=0.7, s=80, label='Standard-SQA')
ax3.scatter(results['pause']['mk'], results['pause']['risk'], c='firebrick', alpha=0.9, s=120, marker='*', label='Pause-SQA')
ax3.set_xlabel("Makespan")
ax3.set_ylabel("Disturbance Risk")
ax3.legend()
ax3.grid(alpha=0.3, linestyle='--')
# Subplot 4 (was Chart 3): Reliability Violin Plot
ax4 = plt.subplot(2, 2, 4)
ax4.set_title("Chart 4: Final Makespan Comparison", fontsize=12)
parts = ax4.violinplot([results['sqa']['mk'], results['pause']['mk']], showmeans=True)
parts['bodies'][0].set_facecolor('steelblue')
parts['bodies'][1].set_facecolor('firebrick')
for body in parts['bodies']: body.set_alpha(0.7)
ax4.set_xticks([1, 2])
ax4.set_xticklabels(['Standard-SQA', 'Pause-SQA'])
sa_mean = np.mean(results['sa']['mk'])
ax4.axhline(y=sa_mean, color='gray', linestyle=':', linewidth=2, label=f'SA Mean: {sa_mean:.1f}')
ax4.legend()
ax4.grid(alpha=0.3, axis='y')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig("academic_pause_strategy.png", dpi=300)
print("🎨 原有圖表已繪製: academic_pause_strategy.png")
# plt.show() (Moved to the end)
# ============================
# 複雜度與環境控制功能 (新增)
# ============================
def inject_hidden_shortcut(D, F):
"""
注入隱藏的黃金捷徑 (Hidden Shortcut)
創造一條極度狹窄且成本極低的路線 (例如 4 -> 5 -> 6 -> 7)
如果沒照順序走成本會極高一旦走對總成本會大幅下降
"""
N = D.shape[0]
if N < 8:
return D, F # 城市太少無法做長捷徑
# 定義黃金路線的節點 (假設是 4, 5, 6, 7)
golden_nodes = [4, 5, 6, 7]
# 1. 築起高牆:先把這幾個城市之間的所有連線,都變成極高成本 (阻止 SA 亂湊)
for i in golden_nodes:
for j in golden_nodes:
if i != j:
D[i, j] = 8.0
F[i, j] = 0.8 # 高風險
# 2. 挖出深谷:只開通 4->5, 5->6, 6->7 這條唯一且完美的捷徑
for i in range(len(golden_nodes) - 1):
n1 = golden_nodes[i]
n2 = golden_nodes[i + 1]
# 距離極短,風險為 0
D[n1, n2] = 0.05
D[n2, n1] = 0.05
F[n1, n2] = 0.0
F[n2, n1] = 0.0
print("\n✨ [隱藏捷徑已佈署] 演算法將挑戰尋找極狹窄的黃金路線 (4->5->6->7)")
return D, F
def inject_deceptive_trap(D, F, alpha=10.0, gamma=0.5, sigma=1.0):
"""
注入欺騙性陷阱 (Deceptive Trap)
改造矩陣創造一條看似完美的捷徑測試演算法是否會陷入局部陷阱
"""
N = D.shape[0]
if N < 4:
return D, F # 城市太少無法做陷阱
# 定義陷阱節點
trap_start = 1
trap_end = 2
safe_detour = 3
# ==========================================
# 陷阱 1致命捷徑 (距離極度誘人,但風險爆表)
# ==========================================
D[trap_start, trap_end] = 0.1
D[trap_end, trap_start] = 0.1
F[trap_start, trap_end] = 0.99
F[trap_end, trap_start] = 0.99
# ==========================================
# 陷阱 2安全繞路 (距離較遠,但完全無風險)
# ==========================================
# 路線: trap_start -> safe_detour -> trap_end
D[trap_start, safe_detour] = 4.0
D[safe_detour, trap_start] = 4.0
D[safe_detour, trap_end] = 4.0
D[trap_end, safe_detour] = 4.0
F[trap_start, safe_detour] = 0.01
F[safe_detour, trap_start] = 0.01
F[safe_detour, trap_end] = 0.01
F[trap_end, safe_detour] = 0.01
# 計算並印出真實成本,讓您確認陷阱是否成立
# 真實成本 = 距離 + alpha * (sigma / gamma^2) * F^2
risk_multiplier = alpha * (sigma / (gamma**2))
shortcut_cost = 0.1 + risk_multiplier * (0.99**2)
detour_cost = (4.0 + risk_multiplier * (0.01**2)) + (4.0 + risk_multiplier * (0.01**2))
print(f" ➤ 致命捷徑 (1->2) 表觀距離: 0.1 | 隱藏真實成本: {shortcut_cost:.2f}")
print(f" ➤ 安全繞路 (1->3->2) 表觀距離: 8.0 | 隱藏真實成本: {detour_cost:.2f}")
return D, F
def generate_controlled_matrices(num_cities, coord_std, risk_std, seed=42):
"""根據指定的標準差(變異數)生成 D 與 F 矩陣,精準控制問題複雜度"""
np.random.seed(seed)
# 1. 生成距離矩陣 D (使用常態分佈控制空間聚集度)
coords = np.random.normal(loc=0.0, scale=coord_std, size=(num_cities, 2))
D = np.zeros((num_cities, num_cities), dtype=float)
for i in range(num_cities):
for j in range(i + 1, num_cities):
dist = np.linalg.norm(coords[i] - coords[j])
D[i, j] = D[j, i] = dist
# 2. 生成擾動矩陣 F (使用常態分佈控制風險極端值)
F = np.abs(np.random.normal(loc=1.0, scale=risk_std, size=(num_cities, num_cities)))
F = (F + F.T) / 2 # 確保對稱
np.fill_diagonal(F, 0.0)
return np.round(D, 2), np.round(F, 2)
def decode_and_eval(sample, N, D):
# 快速解碼與貪婪/暴力求成本
u1, u2 = [], []
for p in range(N):
for i in range(N):
if sample.get(idx_mtsp(0, i, p, N), 0) == 1: u1.append(i)
if sample.get(idx_mtsp(1, i, p, N), 0) == 1: u2.append(i)
# 簡單修復 (去除重複,補齊缺失)
c1, c2 = set(u1), set(u2)
a1, a2 = [], []
for city in range(1, N):
if city in c1 and city not in c2: a1.append(city)
elif city in c2 and city not in c1: a2.append(city)
else:
if len(a1) <= len(a2): a1.append(city)
else: a2.append(city)
# 評估成本
def eval_cost(cities):
if not cities: return 0
if len(cities) <= EXACT_LIMIT:
bc = float('inf')
for p in permutations(cities):
c = D[0, p[0]] + sum(D[p[i], p[i+1]] for i in range(len(p)-1)) + D[p[-1], 0]
bc = min(bc, c)
return bc
# 貪婪
rem, path, curr = set(cities), [], 0
while rem:
nxt = min(rem, key=lambda x: D[curr, x])
path.append(nxt)
rem.remove(nxt)
curr = nxt
return D[0, path[0]] + sum(D[path[i], path[i+1]] for i in range(len(path)-1)) + D[path[-1], 0]
return max(eval_cost(a1), eval_cost(a2))
def run_complexity_scaling():
print(f"\n🚀 開始複雜度壓力測試...")
print(f"⚙️ 空間變異數(COORD_STD)={COORD_STD}, 風險變異數(RISK_STD)={RISK_STD}")
std_means, pause_means = [], []
std_errs, pause_errs = [], []
sampler = oj.SQASampler()
for N in N_LIST:
print(f"\n📊 測試規模 N={N} ...")
D, F = generate_controlled_matrices(N, COORD_STD, RISK_STD)
# 動態計算懲罰值
max_d = np.max(D)
max_f = np.max(F) if F is not None else 0
max_possible_cost = max_d + max_f
dyn_penalty = max_possible_cost * 12.0
dyn_big_penalty = dyn_penalty * 5.0
Q, _ = build_robust_qubo(D, F, penalty=dyn_penalty, big_penalty=dyn_big_penalty)
std_results, pause_results = [], []
# 為了節省時間,在壓力測試中我們使用固定的經驗暫停點 s=0.7
# 總步數統一為 200 步
std_sched = get_smooth_sqa_schedule(BETA, total_sweeps=200, num_steps=20)
pause_sched = get_pause_sqa_schedule(0.7, BETA, total_anneal_sweeps=200, pause_sweeps=100, num_steps=30)
# 這裡設定每次 N 跑的次數,與原本 NUM_RUNS 獨立,建議 10 次
runs_for_scale = 10
for r in range(runs_for_scale):
# Standard SQA
res_std = sampler.sample_qubo(Q, schedule=std_sched, num_reads=5)
std_results.append(decode_and_eval(res_std.first.sample, N, D))
# Pause SQA
res_pause = sampler.sample_qubo(Q, schedule=pause_sched, num_reads=5)
pause_results.append(decode_and_eval(res_pause.first.sample, N, D))
mean_std, err_std = np.mean(std_results), np.std(std_results)
mean_pause, err_pause = np.mean(pause_results), np.std(pause_results)
std_means.append(mean_std)
std_errs.append(err_std)
pause_means.append(mean_pause)
pause_errs.append(err_pause)
print(f" Standard-SQA 平均: {mean_std:.2f} ± {err_std:.2f}")
print(f" Pause-SQA 平均: {mean_pause:.2f} ± {err_pause:.2f}")
return std_means, std_errs, pause_means, pause_errs
def plot_crossover(std_means, std_errs, pause_means, pause_errs):
plt.figure(figsize=(10, 6))
plt.title(f"Complexity Scaling: Algorithm Performance vs Problem Size (N)\n(Coord Std={COORD_STD}, Risk Std={RISK_STD})", fontsize=14, fontweight='bold')
# 畫線與誤差棒
plt.errorbar(N_LIST, std_means, yerr=std_errs, fmt='-o', color='steelblue',
linewidth=2.5, capsize=5, markersize=8, label='Standard-SQA')
plt.errorbar(N_LIST, pause_means, yerr=pause_errs, fmt='-s', color='firebrick',
linewidth=2.5, capsize=5, markersize=8, label='Pause-SQA')
plt.xlabel("Problem Scale (Number of Cities $N$)", fontsize=12)
plt.ylabel("Optimized Makespan (Distance Cost)", fontsize=12)
plt.xticks(N_LIST)
plt.grid(alpha=0.4, linestyle='--')
plt.legend(fontsize=11)
plt.tight_layout()
plt.savefig("complexity_crossover.png", dpi=300)
print("\n🎨 交叉點分析圖已儲存為: complexity_crossover.png")
# plt.show() (Moved to the end)
# ============================
# 主程式執行入口
# ============================
if __name__ == "__main__":
D, city_coords = generate_distance_matrix(CITIES, random=RANDOM)
F = generate_disturbance_matrix(CITIES)
# 1. 佈署死亡陷阱 (測試演算法是否會被騙)
D, F = inject_deceptive_trap(D, F, alpha=ALPHA, gamma=GAMMA, sigma=SIGMA)
# 2. 🌟 佈署黃金捷徑 (測試演算法能否找到狹窄深谷)
D, F = inject_hidden_shortcut(D, F)
# ======== 🌟 新增:動態自適應懲罰值 ========
# 找出矩陣中最遙遠的距離與最大的擾動
max_d = np.max(D)
max_f = np.max(F) if F is not None else 0
max_possible_cost = max_d + max_f
# 根據最大可能成本,動態設定懲罰值 (通常設為 10 倍 ~ 15 倍最穩定)
PENALTY = max_possible_cost * 12.0
BIG_PENALTY = PENALTY * 5.0
print(f"🔧 [自動調校] 偵測到單趟最大成本: {max_possible_cost:.2f}")
print(f"🔧 [自動調校] 動態 PENALTY 設為: {PENALTY:.2f}, BIG_PENALTY 設為: {BIG_PENALTY:.2f}")
# ==========================================
print("==================================================")
print(f"🚀 Quantum Pause Strategy Optimization (mTSP N={CITIES})")
print("==================================================")
print("\n[初始化] 距離矩陣 D:")
print(D)
print("\n[初始化] 擾動矩陣 F:")
print(F)
Q, _ = build_robust_qubo(D, F, penalty=PENALTY, big_penalty=BIG_PENALTY)
# 階段 1找相變點
s_list, variances, pilot_mks, best_s = pilot_search_phase_transition(Q, CITIES, D, F)
# 階段 2測試暫停時間
durations, dur_mks = test_pause_durations(Q, CITIES, D, F, best_s)
# 階段 3正式評估 (一次抓齊所有數據)
results = run_comparative_evaluations(Q, CITIES, D, F, best_s)
# 階段 4複雜度壓力測試與交叉點分析
s_m, s_e, p_m, p_e = run_complexity_scaling()
print("\n🏁 所有運算完成!開始繪製圖表...")
# 繪製圖表一:三大基礎分布對比
create_basic_distribution_charts(results)
# 繪製圖表二Pause SQA 深度分析
create_academic_charts(s_list, variances, pilot_mks, best_s, durations, dur_mks, results)
# 繪製圖表三:複雜度壓力測試與交叉點分析
plot_crossover(s_m, s_e, p_m, p_e)
print("🏁 所有實驗與圖表繪製完成!")
plt.show()
Loading…
Cancel
Save