import asyncio import serial_asyncio import struct import serial import time import threading import tkinter as tk from tkinter import ttk from collections import deque, defaultdict from pymavlink import mavutil import matplotlib.pyplot as plt import matplotlib.animation as animation from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg # === 多組設備設定 === CONFIGS = [ {"serial_port": "/dev/ttyUSB0", "udp_port": 14551}, {"serial_port": "COM15", "udp_port": 14570}, {"serial_port": "/dev/ttyUSB2", "udp_port": 14553}, {"serial_port": "/dev/ttyUSB3", "udp_port": 14554}, ] SERIAL_BAUDRATE = 115200 UDP_REMOTE_IP = '127.0.0.1' TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' ACTIVE_SYSIDS = [3, 10, 15] POLL_MAGIC = b'POLL' # === 全域變數與狀態追蹤 === current_base_slot_ms = 250 # 預設 TDMA 時槽 LOSS_TIME_WINDOW_SEC = 5.0 # 丟包率計算的時間視窗 (5秒) uav_states = { sysid: { "mode": "NORMAL", } for sysid in ACTIVE_SYSIDS } rssi_history = defaultdict(lambda: deque(maxlen=5000)) time_history = defaultdict(lambda: deque(maxlen=5000)) packet_loss_history = defaultdict(lambda: deque(maxlen=1000)) packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000)) mavlink_sequence_tracker = defaultdict(dict) packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0}) serial_to_sysid = {} serial_last_mavlink_time = {} last_db_query_time = {} # === 核心邏輯區 === def calculate_packet_loss(sysid, compid, current_seq): global mavlink_sequence_tracker, packet_loss_stats tracker = mavlink_sequence_tracker[sysid] now = time.time() if compid not in tracker: tracker[compid] = { 'last_seq': current_seq, 'history': deque() } return 0.0 comp_tracker = tracker[compid] last_seq = comp_tracker['last_seq'] if current_seq > last_seq: expected = current_seq - last_seq elif current_seq < last_seq: expected = (255 - last_seq) + current_seq + 1 else: return packet_loss_history[sysid][-1] if packet_loss_history[sysid] else 0.0 lost = max(0, expected - 1) comp_tracker['history'].append((now, expected, lost)) comp_tracker['last_seq'] = current_seq total_expected_all = 0 total_lost_all = 0 for c_id, c_data in tracker.items(): if 'history' in c_data: while c_data['history'] and (now - c_data['history'][0][0]) > LOSS_TIME_WINDOW_SEC: c_data['history'].popleft() total_expected_all += sum(item[1] for item in c_data['history']) total_lost_all += sum(item[2] for item in c_data['history']) overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0 packet_loss_stats[sysid] = { 'loss_rate': overall_loss_rate, 'total_received': total_expected_all - total_lost_all, 'total_lost': total_lost_all } packet_loss_history[sysid].append(overall_loss_rate) packet_loss_time_history[sysid].append(now) return overall_loss_rate def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes: frame = b'\x10' + struct.pack(">B", frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", 0xFF - (sum(frame) & 0xFF)) class SerialToUDP(asyncio.Protocol): def send_poll(self, target_sysid): poll_payload = POLL_MAGIC + struct.pack("B", target_sysid) api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00) self.transport.write(api_frame) def __init__(self, udp_protocol, serial_port): self.udp_protocol = udp_protocol self.serial_port = serial_port self.buffer = bytearray() self.gcs_tx_queue = bytearray() def connection_made(self, transport): self.transport = transport if hasattr(self.udp_protocol, 'set_serial_transport'): self.udp_protocol.set_serial_transport(self) print(f"[{self.serial_port}] Serial connection established.") def data_received(self, data): self.buffer.extend(data) while True: try: start_idx = self.buffer.index(0x7E) if start_idx > 0: del self.buffer[:start_idx] except ValueError: self.buffer.clear() return if len(self.buffer) < 3: return length = (self.buffer[1] << 8) | self.buffer[2] full_length = 3 + length + 1 if length > 300: self.buffer.pop(0) continue if len(self.buffer) < full_length: return frame = self.buffer[:full_length] checksum = 0xFF - (sum(frame[3:-1]) & 0xFF) if checksum != frame[-1]: self.buffer.pop(0) continue del self.buffer[:full_length] if hasattr(self.udp_protocol, 'send_udp'): self.udp_protocol.send_udp(bytes(frame)) if frame[3] == 0x88 and frame[5:7] == b'DB': status = frame[7] if status == 0x00 and len(frame) > 8: rssi_value = frame[8] now = time.time() last_time = serial_last_mavlink_time.get(self.serial_port, 0) if now - last_time <= 0.5: sysid = serial_to_sysid.get(self.serial_port, None) if sysid is not None: rssi_history[sysid].append(-rssi_value) time_history[sysid].append(now) def write_to_serial(self, data): self.gcs_tx_queue.extend(data) def flush_gcs_queue(self): if not self.gcs_tx_queue: return send_limit = min(len(self.gcs_tx_queue), 150) data_to_send = self.gcs_tx_queue[:send_limit] self.gcs_tx_queue = self.gcs_tx_queue[send_limit:] asyncio.create_task(self._async_send_chunks(data_to_send)) async def _async_send_chunks(self, data): try: MAX_PAYLOAD = 80 sent_len = 0 while sent_len < len(data): end_len = min(sent_len + MAX_PAYLOAD, len(data)) chunk = data[sent_len:end_len] sent_len = end_len api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00) self.transport.write(api_frame) await asyncio.sleep(0.01) except Exception: pass def send_at_command_db(self): try: frame_type = 0x08 frame_id = 0x52 at_command = b'DB' parameter = b'' frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter checksum = 0xFF - (sum(frame) & 0xFF) api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) self.transport.write(api_frame) except Exception: pass class UDPHandler(asyncio.DatagramProtocol): def __init__(self, udp_port): self.udp_port = udp_port self.serial_transport = None self.transport = None self.mav_decoder = mavutil.mavlink.MAVLink(None) def connection_made(self, transport): self.transport = transport def set_serial_transport(self, serial_transport): self.serial_transport = serial_transport def datagram_received(self, data, addr): if self.serial_transport: self.serial_transport.write_to_serial(data) def decapsulate_data(self, data): try: if not data or data[0] != 0x7E: return None length = (data[1] << 8) | data[2] if len(data) < length + 4: return None frame_type = data[3] if frame_type == 0x90: rf_data_start = 3 + 12 return data[rf_data_start:3 + length] else: return None except Exception: return None def send_udp(self, data): decoded_data = self.decapsulate_data(data) if decoded_data is None: return try: for byte in decoded_data: msg = self.mav_decoder.parse_char(bytes([byte])) if msg: sysid = msg.get_srcSystem() compid = msg.get_srcComponent() seq = msg.get_seq() if sysid == 0: continue if self.serial_transport: port_name = self.serial_transport.serial_port serial_to_sysid[port_name] = sysid serial_last_mavlink_time[port_name] = time.time() now = time.time() last_query = last_db_query_time.get(port_name, 0) if now - last_query >= 0.5: self.serial_transport.send_at_command_db() last_db_query_time[port_name] = now calculate_packet_loss(sysid, compid, seq) except Exception: pass if self.transport: self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port)) async def setup_bridge(config): port, udp = config["serial_port"], config["udp_port"] try: ser = serial.Serial(port, SERIAL_BAUDRATE) ser.close() except: return None loop = asyncio.get_running_loop() udp_handler = UDPHandler(udp) await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0)) serial_proto = SerialToUDP(udp_handler, port) await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE) return serial_proto async def tdma_scheduler(serial_protocols): print(f"TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口") while True: # GCS 下行時槽 for sp in serial_protocols: if hasattr(sp, 'flush_gcs_queue'): sp.flush_gcs_queue() await asyncio.sleep(0.1) # UAV 上行時槽 for sysid in ACTIVE_SYSIDS: state = uav_states[sysid] if state["mode"] == "INITIALIZING": # VIP 模式:0.8秒內連發4次點名,暴力獲取參數 for _ in range(4): for sp in serial_protocols: if hasattr(sp, 'send_poll'): sp.send_poll(sysid) await asyncio.sleep(0.2) else: # NORMAL 模式:聽從滑桿設定的時間 slot_time = current_base_slot_ms / 1000.0 for sp in serial_protocols: if hasattr(sp, 'send_poll'): sp.send_poll(sysid) await asyncio.sleep(slot_time) async def async_main(): protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS)) valid_protocols = [p for p in protocols if p is not None] if valid_protocols: asyncio.create_task(tdma_scheduler(valid_protocols)) await asyncio.Future() # === GUI 介面區 === def start_gui(): root = tk.Tk() root.title("UAV TDMA Control Station") root.geometry("1200x800") # --- 左側控制面板 --- control_frame = tk.Frame(root, width=300, bg="#f0f0f0", padx=20, pady=20) control_frame.pack(side=tk.LEFT, fill=tk.Y) tk.Label(control_frame, text="TDMA 基礎時槽控制", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=10) slider_val = tk.IntVar(value=current_base_slot_ms) def on_slider_change(val): global current_base_slot_ms current_base_slot_ms = int(float(val)) val_label.config(text=f"當前設定: {current_base_slot_ms} ms") slider = ttk.Scale(control_frame, from_=50, to_=1000, orient='horizontal', variable=slider_val, command=on_slider_change) slider.pack(fill=tk.X, pady=10) val_label = tk.Label(control_frame, text=f"當前設定: {current_base_slot_ms} ms", bg="#f0f0f0") val_label.pack() tk.Label(control_frame, text="群機初始化控制 (INIT)", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=(30, 10)) # === 1. 改為單選按鈕 (Radiobutton) === init_var = tk.IntVar(value=0) # 0 代表全體 NORMAL radios = {} status_labels = {} def on_radio_change(): selected = init_var.get() for sysid in ACTIVE_SYSIDS: if sysid == selected: uav_states[sysid]["mode"] = "INITIALIZING" else: uav_states[sysid]["mode"] = "NORMAL" # 新增一個「取消特權」的選項 # tk.Radiobutton(control_frame, text="全體 NORMAL (無特權)", variable=init_var, value=0, command=on_radio_change, bg="#f0f0f0", font=("Arial", 11, "bold"), fg="blue").pack(anchor=tk.W, pady=(0, 10)) for sysid in ACTIVE_SYSIDS: frame = tk.Frame(control_frame, bg="#f0f0f0") frame.pack(fill=tk.X, pady=5) rb = tk.Radiobutton(frame, text=f"SYSID {sysid} 專屬載入", variable=init_var, value=sysid, command=on_radio_change, bg="#f0f0f0", font=("Arial", 11)) rb.pack(side=tk.LEFT) radios[sysid] = rb lbl = tk.Label(frame, text="NORMAL", font=("Arial", 10, "bold"), fg="green", bg="#f0f0f0") lbl.pack(side=tk.RIGHT, padx=5) status_labels[sysid] = lbl # === 2. 改為開關式的鎖定按鈕 (Toggle) === is_locked = False def toggle_tdma_mode(): nonlocal is_locked is_locked = not is_locked if is_locked: # 鎖定:強制切回 0 (全體 NORMAL),並禁用單選按鈕 init_var.set(0) on_radio_change() for rb in radios.values(): rb.config(state=tk.DISABLED) # 改變按鈕外觀為「解鎖」狀態 lock_btn.config(text="點擊解鎖 (允許重新下載參數)", bg="orange") print("已鎖定進入純 TDMA 模式") else: # 解鎖:恢復單選按鈕功能 for rb in radios.values(): rb.config(state=tk.NORMAL) # 改變按鈕外觀為「鎖定」狀態 lock_btn.config(text="參數載入完畢,鎖定進入 TDMA", bg="#4CAF50") print("已解除鎖定,可重新分配特權") lock_btn = tk.Button(control_frame, text="參數載入完畢,鎖定進入 TDMA", font=("Arial", 12, "bold"), bg="#4CAF50", fg="white", command=toggle_tdma_mode) lock_btn.pack(pady=30, fill=tk.X) # 動態更新狀態標籤 def update_status_gui(): for sysid, lbl in status_labels.items(): mode = uav_states[sysid]["mode"] if mode == "INITIALIZING": lbl.config(text="INIT (特權下載中)", fg="orange") else: lbl.config(text="NORMAL", fg="green") root.after(500, update_status_gui) update_status_gui() # --- 右側圖表區 --- plot_frame = tk.Frame(root) plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9, 8), dpi=100) canvas = FigureCanvasTkAgg(fig, master=plot_frame) canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) def update_plot(frame): ax1.clear() ax2.clear() ax1.set_title("RSSI", fontsize=12); ax1.set_xlim(10, 0); ax1.set_ylim(-100, -10); ax1.grid(True, alpha=0.3) ax2.set_title("Packet Loss Rate (5s Window)", fontsize=12); ax2.set_xlim(10, 0); ax2.set_ylim(0, 100); ax2.grid(True, alpha=0.3) now = time.time() colors = ['blue', 'red', 'green', 'orange', 'purple'] try: sysids = sorted(list(rssi_history.keys())) except RuntimeError: return loss_labels = [] for i, sysid in enumerate(sysids): color = colors[i % len(colors)] try: t_hist, r_hist = list(time_history[sysid]), list(rssi_history[sysid]) lt_hist, l_hist = list(packet_loss_time_history.get(sysid, [])), list(packet_loss_history.get(sysid, [])) except RuntimeError: continue rssi_recent = [idx for idx, ts in enumerate(t_hist) if now - ts <= 10] if rssi_recent: ax1.plot([now - t_hist[idx] for idx in rssi_recent], [r_hist[idx] for idx in rssi_recent], label=f"SYSID:{sysid}", color=color) loss_recent = [idx for idx, ts in enumerate(lt_hist) if now - ts <= 10] if loss_recent: loss_t = [now - lt_hist[idx] for idx in loss_recent] loss_r = [l_hist[idx] for idx in loss_recent] ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, marker='o', markersize=3) if loss_r: loss_labels.append({ 'sysid': sysid, 'y_real': loss_r[-1], 'x_real': loss_t[-1], 'color': color }) if loss_labels: loss_labels = sorted(loss_labels, key=lambda k: k['y_real']) min_gap = 12.0 y_positions = [lbl['y_real'] for lbl in loss_labels] for j in range(1, len(y_positions)): if y_positions[j] - y_positions[j-1] < min_gap: y_positions[j] = y_positions[j-1] + min_gap if y_positions[-1] > 90: shift = y_positions[-1] - 90 y_positions = [y - shift for y in y_positions] for j, lbl in enumerate(loss_labels): sysid = lbl['sysid'] color = lbl['color'] real_y = lbl['y_real'] text_y = y_positions[j] ax2.text(0.5, text_y, f'ID:{sysid} ({real_y:.1f}%)', bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.8), fontsize=10, fontweight='bold', color='white', horizontalalignment='right', verticalalignment='center') if abs(real_y - text_y) > 1.0: ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6) ax1.legend(loc="upper left") ax2.legend(loc="upper left") ani = animation.FuncAnimation(fig, update_plot, interval=1000) def on_closing(): root.quit() root.destroy() root.protocol("WM_DELETE_WINDOW", on_closing) root.mainloop() if __name__ == '__main__': threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start() start_gui()