From 75d9d260e107a7f00b8287d45ac384f48b3f2d08 Mon Sep 17 00:00:00 2001 From: lenting89 Date: Tue, 28 Apr 2026 16:30:58 +0800 Subject: [PATCH] add esp32 and udptest8-2 to unitdev04 --- src/unitdev04/esp32-final.py | 151 +++++++++++ src/unitdev04/udptest8-2.py | 500 +++++++++++++++++++++++++++++++++++ 2 files changed, 651 insertions(+) create mode 100644 src/unitdev04/esp32-final.py create mode 100644 src/unitdev04/udptest8-2.py diff --git a/src/unitdev04/esp32-final.py b/src/unitdev04/esp32-final.py new file mode 100644 index 0000000..ab00cef --- /dev/null +++ b/src/unitdev04/esp32-final.py @@ -0,0 +1,151 @@ +from machine import UART +import time +import struct +import gc + +# ================= 設定區 ================= +FC_BAUDRATE = 115200 +XB_BAUDRATE = 115200 +DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8' +XBEE_MAX_PAYLOAD = 100 +MY_SYSID = 15 # 第二台就改成 10,第三台 15 +POLL_MAGIC = b'POLL' + +tx_buf = bytearray() +MAX_BUF_SIZE = 6144 # 保留安全餘裕的緩衝區大小 +# ========================================= + +uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096) +uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096) + +rx_buf = bytearray() + +def get_checksum(data): + return 0xFF - (sum(data) & 0xFF) + +def send_to_xbee_chunked(payload): + total_len = len(payload) + sent_len = 0 + + while sent_len < total_len: + end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len) + chunk = payload[sent_len : end_len] + sent_len = end_len + + frame_content = bytearray() + frame_content.append(0x10) + frame_content.append(0x00) # 0x00 代表不要求 XBee ACK,極大加快發送速度 + frame_content.extend(DEST_64) + frame_content.extend(b'\xFF\xFE') + frame_content.append(0x00) + frame_content.append(0x00) + frame_content.extend(chunk) + + length = len(frame_content) + checksum = get_checksum(frame_content) + + packet = bytearray() + packet.append(0x7E) + packet.append((length >> 8) & 0xFF) + packet.append(length & 0xFF) + packet.extend(frame_content) + packet.append(checksum) + + uart_xb.write(packet) + # 僅需極短暫延遲,讓硬體發送緩衝區消化即可 + time.sleep_ms(1) + +def process_xbee_buffer(): + global rx_buf, tx_buf + while True: + start_pos = rx_buf.find(b'\x7E') + if start_pos == -1: + rx_buf = bytearray() # 避免用 clear() + return + + if start_pos > 0: + rx_buf = rx_buf[start_pos:] + + if len(rx_buf) < 3: return + + pkt_len = (rx_buf[1] << 8) | rx_buf[2] + total_len = pkt_len + 4 + + if pkt_len > 300: + rx_buf = rx_buf[1:] + continue + + if len(rx_buf) < total_len: return + + packet = rx_buf[3 : 3+pkt_len] + checksum_recv = rx_buf[3+pkt_len] + + if get_checksum(packet) == checksum_recv: + if packet[0] == 0x90: + real_data = packet[12:] + if len(real_data) == 5 and real_data.startswith(POLL_MAGIC): + if real_data[4] == MY_SYSID: + flush_tx_buffer() + else: + uart_fc.write(real_data) + + rx_buf = rx_buf[total_len:] + else: + rx_buf = rx_buf[1:] + +def flush_tx_buffer(): + global tx_buf + if len(tx_buf) == 0: + return + + # 單次最大發送量設為 1200 bytes,避免在空中飛行超過時槽導致碰撞 + send_limit = min(len(tx_buf), 1200) + + cut_idx = send_limit + if send_limit < len(tx_buf): + last_fd_pos = tx_buf[:send_limit].rfind(b'\xFD') + if last_fd_pos > 0: + cut_idx = last_fd_pos + + data_to_send = tx_buf[:cut_idx] + tx_buf = tx_buf[cut_idx:] + + send_to_xbee_chunked(data_to_send) + +# 初始清理 +gc.collect() + +while True: + try: + # FC -> 緩衝區 + if uart_fc.any(): + data = uart_fc.read(250) + if data: + tx_buf.extend(data) + + # 聰明的溫和防爆機制 (砍掉舊的 2/3,保留最新 1/3) + if len(tx_buf) > MAX_BUF_SIZE: + safe_cut = tx_buf.find(b'\xFD', (MAX_BUF_SIZE // 3) * 2) + if safe_cut != -1: + tx_buf = tx_buf[safe_cut:] + else: + tx_buf = tx_buf[(MAX_BUF_SIZE // 2):] + + # XBee -> FC + if uart_xb.any(): + chunk = uart_xb.read() + if chunk: + rx_buf.extend(chunk) + process_xbee_buffer() + + time.sleep_ms(1) + + except MemoryError: + # 發生 OOM 時的「斷尾求生」法,強制捨棄舊變數以釋放空間,防止死機 + tx_buf = bytearray() + rx_buf = bytearray() + gc.collect() + time.sleep_ms(10) + + except Exception as e: + time.sleep_ms(2) diff --git a/src/unitdev04/udptest8-2.py b/src/unitdev04/udptest8-2.py new file mode 100644 index 0000000..1b4e98a --- /dev/null +++ b/src/unitdev04/udptest8-2.py @@ -0,0 +1,500 @@ +import asyncio +import serial_asyncio +import struct +import serial +import time +import threading +import tkinter as tk +from tkinter import ttk +from collections import deque, defaultdict +from pymavlink import mavutil +import matplotlib.pyplot as plt +import matplotlib.animation as animation +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg + +# === 多組設備設定 === +CONFIGS = [ + {"serial_port": "/dev/ttyUSB0", "udp_port": 14551}, + {"serial_port": "COM15", "udp_port": 14570}, + {"serial_port": "/dev/ttyUSB2", "udp_port": 14553}, + {"serial_port": "/dev/ttyUSB3", "udp_port": 14554}, +] + +SERIAL_BAUDRATE = 115200 +UDP_REMOTE_IP = '127.0.0.1' +TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' +ACTIVE_SYSIDS = [3, 10, 15] +POLL_MAGIC = b'POLL' + +# === 全域變數與狀態追蹤 === +current_base_slot_ms = 250 # 預設 TDMA 時槽 +LOSS_TIME_WINDOW_SEC = 5.0 # 丟包率計算的時間視窗 (5秒) + +uav_states = { + sysid: { + "mode": "NORMAL", + } for sysid in ACTIVE_SYSIDS +} + +rssi_history = defaultdict(lambda: deque(maxlen=5000)) +time_history = defaultdict(lambda: deque(maxlen=5000)) +packet_loss_history = defaultdict(lambda: deque(maxlen=1000)) +packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000)) +mavlink_sequence_tracker = defaultdict(dict) +packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0}) +serial_to_sysid = {} +serial_last_mavlink_time = {} +last_db_query_time = {} + +# === 核心邏輯區 === + +def calculate_packet_loss(sysid, compid, current_seq): + global mavlink_sequence_tracker, packet_loss_stats + tracker = mavlink_sequence_tracker[sysid] + now = time.time() + + if compid not in tracker: + tracker[compid] = { + 'last_seq': current_seq, + 'history': deque() + } + return 0.0 + + comp_tracker = tracker[compid] + last_seq = comp_tracker['last_seq'] + + if current_seq > last_seq: + expected = current_seq - last_seq + elif current_seq < last_seq: + expected = (255 - last_seq) + current_seq + 1 + else: + return packet_loss_history[sysid][-1] if packet_loss_history[sysid] else 0.0 + + lost = max(0, expected - 1) + + comp_tracker['history'].append((now, expected, lost)) + comp_tracker['last_seq'] = current_seq + + total_expected_all = 0 + total_lost_all = 0 + + for c_id, c_data in tracker.items(): + if 'history' in c_data: + while c_data['history'] and (now - c_data['history'][0][0]) > LOSS_TIME_WINDOW_SEC: + c_data['history'].popleft() + + total_expected_all += sum(item[1] for item in c_data['history']) + total_lost_all += sum(item[2] for item in c_data['history']) + + overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0 + + packet_loss_stats[sysid] = { + 'loss_rate': overall_loss_rate, + 'total_received': total_expected_all - total_lost_all, + 'total_lost': total_lost_all + } + + packet_loss_history[sysid].append(overall_loss_rate) + packet_loss_time_history[sysid].append(now) + return overall_loss_rate + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes: + frame = b'\x10' + struct.pack(">B", frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", 0xFF - (sum(frame) & 0xFF)) + +class SerialToUDP(asyncio.Protocol): + def send_poll(self, target_sysid): + poll_payload = POLL_MAGIC + struct.pack("B", target_sysid) + api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00) + self.transport.write(api_frame) + + def __init__(self, udp_protocol, serial_port): + self.udp_protocol = udp_protocol + self.serial_port = serial_port + self.buffer = bytearray() + self.gcs_tx_queue = bytearray() + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"[{self.serial_port}] Serial connection established.") + + def data_received(self, data): + self.buffer.extend(data) + while True: + try: + start_idx = self.buffer.index(0x7E) + if start_idx > 0: + del self.buffer[:start_idx] + except ValueError: + self.buffer.clear() + return + + if len(self.buffer) < 3: return + + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + + if length > 300: + self.buffer.pop(0) + continue + + if len(self.buffer) < full_length: return + + frame = self.buffer[:full_length] + checksum = 0xFF - (sum(frame[3:-1]) & 0xFF) + + if checksum != frame[-1]: + self.buffer.pop(0) + continue + + del self.buffer[:full_length] + + if hasattr(self.udp_protocol, 'send_udp'): + self.udp_protocol.send_udp(bytes(frame)) + + if frame[3] == 0x88 and frame[5:7] == b'DB': + status = frame[7] + if status == 0x00 and len(frame) > 8: + rssi_value = frame[8] + now = time.time() + last_time = serial_last_mavlink_time.get(self.serial_port, 0) + if now - last_time <= 0.5: + sysid = serial_to_sysid.get(self.serial_port, None) + if sysid is not None: + rssi_history[sysid].append(-rssi_value) + time_history[sysid].append(now) + + def write_to_serial(self, data): + self.gcs_tx_queue.extend(data) + + def flush_gcs_queue(self): + if not self.gcs_tx_queue: return + send_limit = min(len(self.gcs_tx_queue), 150) + data_to_send = self.gcs_tx_queue[:send_limit] + self.gcs_tx_queue = self.gcs_tx_queue[send_limit:] + asyncio.create_task(self._async_send_chunks(data_to_send)) + + async def _async_send_chunks(self, data): + try: + MAX_PAYLOAD = 80 + sent_len = 0 + while sent_len < len(data): + end_len = min(sent_len + MAX_PAYLOAD, len(data)) + chunk = data[sent_len:end_len] + sent_len = end_len + + api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00) + self.transport.write(api_frame) + await asyncio.sleep(0.01) + except Exception: + pass + + def send_at_command_db(self): + try: + frame_type = 0x08 + frame_id = 0x52 + at_command = b'DB' + parameter = b'' + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter + checksum = 0xFF - (sum(frame) & 0xFF) + api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + self.transport.write(api_frame) + except Exception: + pass + +class UDPHandler(asyncio.DatagramProtocol): + def __init__(self, udp_port): + self.udp_port = udp_port + self.serial_transport = None + self.transport = None + self.mav_decoder = mavutil.mavlink.MAVLink(None) + + def connection_made(self, transport): + self.transport = transport + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + def datagram_received(self, data, addr): + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + def decapsulate_data(self, data): + try: + if not data or data[0] != 0x7E: + return None + length = (data[1] << 8) | data[2] + if len(data) < length + 4: + return None + frame_type = data[3] + if frame_type == 0x90: + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + except Exception: + return None + + def send_udp(self, data): + decoded_data = self.decapsulate_data(data) + if decoded_data is None: return + + try: + for byte in decoded_data: + msg = self.mav_decoder.parse_char(bytes([byte])) + if msg: + sysid = msg.get_srcSystem() + compid = msg.get_srcComponent() + seq = msg.get_seq() + if sysid == 0: continue + + if self.serial_transport: + port_name = self.serial_transport.serial_port + serial_to_sysid[port_name] = sysid + serial_last_mavlink_time[port_name] = time.time() + + now = time.time() + last_query = last_db_query_time.get(port_name, 0) + if now - last_query >= 0.5: + self.serial_transport.send_at_command_db() + last_db_query_time[port_name] = now + + calculate_packet_loss(sysid, compid, seq) + except Exception: + pass + + if self.transport: + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port)) + +async def setup_bridge(config): + port, udp = config["serial_port"], config["udp_port"] + try: + ser = serial.Serial(port, SERIAL_BAUDRATE) + ser.close() + except: return None + + loop = asyncio.get_running_loop() + udp_handler = UDPHandler(udp) + await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0)) + + serial_proto = SerialToUDP(udp_handler, port) + await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE) + return serial_proto + +async def tdma_scheduler(serial_protocols): + print(f"TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口") + while True: + # GCS 下行時槽 + for sp in serial_protocols: + if hasattr(sp, 'flush_gcs_queue'): + sp.flush_gcs_queue() + await asyncio.sleep(0.1) + + # UAV 上行時槽 + for sysid in ACTIVE_SYSIDS: + state = uav_states[sysid] + + if state["mode"] == "INITIALIZING": + # VIP 模式:0.8秒內連發4次點名,暴力獲取參數 + for _ in range(4): + for sp in serial_protocols: + if hasattr(sp, 'send_poll'): + sp.send_poll(sysid) + await asyncio.sleep(0.2) + else: + # NORMAL 模式:聽從滑桿設定的時間 + slot_time = current_base_slot_ms / 1000.0 + for sp in serial_protocols: + if hasattr(sp, 'send_poll'): + sp.send_poll(sysid) + await asyncio.sleep(slot_time) + +async def async_main(): + protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS)) + valid_protocols = [p for p in protocols if p is not None] + if valid_protocols: + asyncio.create_task(tdma_scheduler(valid_protocols)) + await asyncio.Future() + +# === GUI 介面區 === + +def start_gui(): + root = tk.Tk() + root.title("UAV TDMA Control Station") + root.geometry("1200x800") + + # --- 左側控制面板 --- + control_frame = tk.Frame(root, width=300, bg="#f0f0f0", padx=20, pady=20) + control_frame.pack(side=tk.LEFT, fill=tk.Y) + + tk.Label(control_frame, text="TDMA 基礎時槽控制", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=10) + + slider_val = tk.IntVar(value=current_base_slot_ms) + def on_slider_change(val): + global current_base_slot_ms + current_base_slot_ms = int(float(val)) + val_label.config(text=f"當前設定: {current_base_slot_ms} ms") + + slider = ttk.Scale(control_frame, from_=50, to_=1000, orient='horizontal', variable=slider_val, command=on_slider_change) + slider.pack(fill=tk.X, pady=10) + val_label = tk.Label(control_frame, text=f"當前設定: {current_base_slot_ms} ms", bg="#f0f0f0") + val_label.pack() + + tk.Label(control_frame, text="群機初始化控制 (INIT)", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=(30, 10)) + + # === 1. 改為單選按鈕 (Radiobutton) === + init_var = tk.IntVar(value=0) # 0 代表全體 NORMAL + radios = {} + status_labels = {} + + def on_radio_change(): + selected = init_var.get() + for sysid in ACTIVE_SYSIDS: + if sysid == selected: + uav_states[sysid]["mode"] = "INITIALIZING" + else: + uav_states[sysid]["mode"] = "NORMAL" + + # 新增一個「取消特權」的選項 + # tk.Radiobutton(control_frame, text="全體 NORMAL (無特權)", variable=init_var, value=0, command=on_radio_change, bg="#f0f0f0", font=("Arial", 11, "bold"), fg="blue").pack(anchor=tk.W, pady=(0, 10)) + + for sysid in ACTIVE_SYSIDS: + frame = tk.Frame(control_frame, bg="#f0f0f0") + frame.pack(fill=tk.X, pady=5) + + rb = tk.Radiobutton(frame, text=f"SYSID {sysid} 專屬載入", variable=init_var, value=sysid, + command=on_radio_change, bg="#f0f0f0", font=("Arial", 11)) + rb.pack(side=tk.LEFT) + radios[sysid] = rb + + lbl = tk.Label(frame, text="NORMAL", font=("Arial", 10, "bold"), fg="green", bg="#f0f0f0") + lbl.pack(side=tk.RIGHT, padx=5) + status_labels[sysid] = lbl + + # === 2. 改為開關式的鎖定按鈕 (Toggle) === + is_locked = False + def toggle_tdma_mode(): + nonlocal is_locked + is_locked = not is_locked + + if is_locked: + # 鎖定:強制切回 0 (全體 NORMAL),並禁用單選按鈕 + init_var.set(0) + on_radio_change() + for rb in radios.values(): + rb.config(state=tk.DISABLED) + # 改變按鈕外觀為「解鎖」狀態 + lock_btn.config(text="點擊解鎖 (允許重新下載參數)", bg="orange") + print("已鎖定進入純 TDMA 模式") + else: + # 解鎖:恢復單選按鈕功能 + for rb in radios.values(): + rb.config(state=tk.NORMAL) + # 改變按鈕外觀為「鎖定」狀態 + lock_btn.config(text="參數載入完畢,鎖定進入 TDMA", bg="#4CAF50") + print("已解除鎖定,可重新分配特權") + + lock_btn = tk.Button(control_frame, text="參數載入完畢,鎖定進入 TDMA", + font=("Arial", 12, "bold"), bg="#4CAF50", fg="white", + command=toggle_tdma_mode) + lock_btn.pack(pady=30, fill=tk.X) + + # 動態更新狀態標籤 + def update_status_gui(): + for sysid, lbl in status_labels.items(): + mode = uav_states[sysid]["mode"] + if mode == "INITIALIZING": + lbl.config(text="INIT (特權下載中)", fg="orange") + else: + lbl.config(text="NORMAL", fg="green") + root.after(500, update_status_gui) + + update_status_gui() + + # --- 右側圖表區 --- + plot_frame = tk.Frame(root) + plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9, 8), dpi=100) + canvas = FigureCanvasTkAgg(fig, master=plot_frame) + canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) + + def update_plot(frame): + ax1.clear() + ax2.clear() + ax1.set_title("RSSI", fontsize=12); ax1.set_xlim(10, 0); ax1.set_ylim(-100, -10); ax1.grid(True, alpha=0.3) + ax2.set_title("Packet Loss Rate (5s Window)", fontsize=12); ax2.set_xlim(10, 0); ax2.set_ylim(0, 100); ax2.grid(True, alpha=0.3) + + now = time.time() + colors = ['blue', 'red', 'green', 'orange', 'purple'] + + try: sysids = sorted(list(rssi_history.keys())) + except RuntimeError: return + + loss_labels = [] + + for i, sysid in enumerate(sysids): + color = colors[i % len(colors)] + try: + t_hist, r_hist = list(time_history[sysid]), list(rssi_history[sysid]) + lt_hist, l_hist = list(packet_loss_time_history.get(sysid, [])), list(packet_loss_history.get(sysid, [])) + except RuntimeError: continue + + rssi_recent = [idx for idx, ts in enumerate(t_hist) if now - ts <= 10] + if rssi_recent: + ax1.plot([now - t_hist[idx] for idx in rssi_recent], [r_hist[idx] for idx in rssi_recent], label=f"SYSID:{sysid}", color=color) + + loss_recent = [idx for idx, ts in enumerate(lt_hist) if now - ts <= 10] + if loss_recent: + loss_t = [now - lt_hist[idx] for idx in loss_recent] + loss_r = [l_hist[idx] for idx in loss_recent] + ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, marker='o', markersize=3) + + if loss_r: + loss_labels.append({ + 'sysid': sysid, 'y_real': loss_r[-1], 'x_real': loss_t[-1], 'color': color + }) + + if loss_labels: + loss_labels = sorted(loss_labels, key=lambda k: k['y_real']) + min_gap = 12.0 + y_positions = [lbl['y_real'] for lbl in loss_labels] + + for j in range(1, len(y_positions)): + if y_positions[j] - y_positions[j-1] < min_gap: + y_positions[j] = y_positions[j-1] + min_gap + + if y_positions[-1] > 90: + shift = y_positions[-1] - 90 + y_positions = [y - shift for y in y_positions] + + for j, lbl in enumerate(loss_labels): + sysid = lbl['sysid'] + color = lbl['color'] + real_y = lbl['y_real'] + text_y = y_positions[j] + + ax2.text(0.5, text_y, f'ID:{sysid} ({real_y:.1f}%)', + bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.8), + fontsize=10, fontweight='bold', color='white', + horizontalalignment='right', verticalalignment='center') + + if abs(real_y - text_y) > 1.0: + ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6) + + ax1.legend(loc="upper left") + ax2.legend(loc="upper left") + + ani = animation.FuncAnimation(fig, update_plot, interval=1000) + + def on_closing(): + root.quit() + root.destroy() + root.protocol("WM_DELETE_WINDOW", on_closing) + root.mainloop() + +if __name__ == '__main__': + threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start() + start_gui() \ No newline at end of file