From 0e7070b4123cdb071bfdbbaab992dfb3f872a582 Mon Sep 17 00:00:00 2001 From: lenting89 Date: Wed, 3 Jun 2026 14:12:12 +0800 Subject: [PATCH] update files --- src/unitdev04/esp32-final.py | 151 ------ src/unitdev04/esp32.py | 368 ++++++++++++++ src/unitdev04/udptest8-2.py | 500 ------------------ src/unitdev04/udptest8.py | 956 +++++++++++++++++++++++++---------- 4 files changed, 1066 insertions(+), 909 deletions(-) delete mode 100644 src/unitdev04/esp32-final.py create mode 100644 src/unitdev04/esp32.py delete mode 100644 src/unitdev04/udptest8-2.py diff --git a/src/unitdev04/esp32-final.py b/src/unitdev04/esp32-final.py deleted file mode 100644 index ab00cef..0000000 --- a/src/unitdev04/esp32-final.py +++ /dev/null @@ -1,151 +0,0 @@ -from machine import UART -import time -import struct -import gc - -# ================= 設定區 ================= -FC_BAUDRATE = 115200 -XB_BAUDRATE = 115200 -DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8' -XBEE_MAX_PAYLOAD = 100 -MY_SYSID = 15 # 第二台就改成 10,第三台 15 -POLL_MAGIC = b'POLL' - -tx_buf = bytearray() -MAX_BUF_SIZE = 6144 # 保留安全餘裕的緩衝區大小 -# ========================================= - -uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096) -uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096) - -rx_buf = bytearray() - -def get_checksum(data): - return 0xFF - (sum(data) & 0xFF) - -def send_to_xbee_chunked(payload): - total_len = len(payload) - sent_len = 0 - - while sent_len < total_len: - end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len) - chunk = payload[sent_len : end_len] - sent_len = end_len - - frame_content = bytearray() - frame_content.append(0x10) - frame_content.append(0x00) # 0x00 代表不要求 XBee ACK,極大加快發送速度 - frame_content.extend(DEST_64) - frame_content.extend(b'\xFF\xFE') - frame_content.append(0x00) - frame_content.append(0x00) - frame_content.extend(chunk) - - length = len(frame_content) - checksum = get_checksum(frame_content) - - packet = bytearray() - packet.append(0x7E) - packet.append((length >> 8) & 0xFF) - packet.append(length & 0xFF) - packet.extend(frame_content) - packet.append(checksum) - - uart_xb.write(packet) - # 僅需極短暫延遲,讓硬體發送緩衝區消化即可 - time.sleep_ms(1) - -def process_xbee_buffer(): - global rx_buf, tx_buf - while True: - start_pos = rx_buf.find(b'\x7E') - if start_pos == -1: - rx_buf = bytearray() # 避免用 clear() - return - - if start_pos > 0: - rx_buf = rx_buf[start_pos:] - - if len(rx_buf) < 3: return - - pkt_len = (rx_buf[1] << 8) | rx_buf[2] - total_len = pkt_len + 4 - - if pkt_len > 300: - rx_buf = rx_buf[1:] - continue - - if len(rx_buf) < total_len: return - - packet = rx_buf[3 : 3+pkt_len] - checksum_recv = rx_buf[3+pkt_len] - - if get_checksum(packet) == checksum_recv: - if packet[0] == 0x90: - real_data = packet[12:] - if len(real_data) == 5 and real_data.startswith(POLL_MAGIC): - if real_data[4] == MY_SYSID: - flush_tx_buffer() - else: - uart_fc.write(real_data) - - rx_buf = rx_buf[total_len:] - else: - rx_buf = rx_buf[1:] - -def flush_tx_buffer(): - global tx_buf - if len(tx_buf) == 0: - return - - # 單次最大發送量設為 1200 bytes,避免在空中飛行超過時槽導致碰撞 - send_limit = min(len(tx_buf), 1200) - - cut_idx = send_limit - if send_limit < len(tx_buf): - last_fd_pos = tx_buf[:send_limit].rfind(b'\xFD') - if last_fd_pos > 0: - cut_idx = last_fd_pos - - data_to_send = tx_buf[:cut_idx] - tx_buf = tx_buf[cut_idx:] - - send_to_xbee_chunked(data_to_send) - -# 初始清理 -gc.collect() - -while True: - try: - # FC -> 緩衝區 - if uart_fc.any(): - data = uart_fc.read(250) - if data: - tx_buf.extend(data) - - # 聰明的溫和防爆機制 (砍掉舊的 2/3,保留最新 1/3) - if len(tx_buf) > MAX_BUF_SIZE: - safe_cut = tx_buf.find(b'\xFD', (MAX_BUF_SIZE // 3) * 2) - if safe_cut != -1: - tx_buf = tx_buf[safe_cut:] - else: - tx_buf = tx_buf[(MAX_BUF_SIZE // 2):] - - # XBee -> FC - if uart_xb.any(): - chunk = uart_xb.read() - if chunk: - rx_buf.extend(chunk) - process_xbee_buffer() - - time.sleep_ms(1) - - except MemoryError: - # 發生 OOM 時的「斷尾求生」法,強制捨棄舊變數以釋放空間,防止死機 - tx_buf = bytearray() - rx_buf = bytearray() - gc.collect() - time.sleep_ms(10) - - except Exception as e: - time.sleep_ms(2) diff --git a/src/unitdev04/esp32.py b/src/unitdev04/esp32.py new file mode 100644 index 0000000..54a0a48 --- /dev/null +++ b/src/unitdev04/esp32.py @@ -0,0 +1,368 @@ +from machine import UART +import time +import struct +import gc + +# ========================================================= +# ESP32 / MicroPython:UAV 端 XBee <-> Flight Controller Bridge +# Packet-size TDMA + DONE 標籤版 +# +# 核心行為: +# 1. FC -> ESP32:持續把 MAVLink bytes 存進 tx_buf。 +# 2. GCS -> ESP32:收到 POLL + SYSID + quota_bytes 才允許上行傳送。 +# 3. ESP32 -> GCS:每次最多送 quota_bytes,但只送「完整 MAVLink frame」。 +# 4. ESP32 -> GCS:送完後獨立送 DONE report,GCS 收到後可立即換下一台 UAV。 +# +# 注意:DONE 不是接在 MAVLink bytes 後面,而是獨立 RF payload,避免污染 MAVLink stream。 +# ========================================================= + +# ================= 設定區 ================= +FC_BAUDRATE = 115200 +XB_BAUDRATE = 115200 + +# 這裡要改成你的地面站 / coordinator XBee 64-bit address +DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8' + +# XBee API Transmit Request 內每次 RF payload 最大切片。 +# 900HP / DigiMesh 實測常用 80~100 bytes 較穩。 +XBEE_MAX_PAYLOAD = 100 + +# 每台 UAV 都要設定成自己的 SYSID +MY_SYSID = 15 # 第二台可改 10,第三台可改 15,依你的 MAVLink SYSID 設定 + +# GCS -> UAV poll 格式: +# 舊版:b'POLL' + sysid(1) +# 新版:b'POLL' + sysid(1) + quota_bytes(2, big-endian) +POLL_MAGIC = b'POLL' +DEFAULT_GRANT_BYTES = 600 + +# UAV -> GCS done 格式: +# b'DONE' + sysid(1) + sent_len(2) + remain_len(2) +DONE_MAGIC = b'DONE' + +# ESP32 RAM 有限,不要讓 FC 資料無限累積 +MAX_BUF_SIZE = 6144 +READ_FC_BYTES = 250 +# ========================================= + +uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096) +uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096) + +tx_buf = bytearray() # FC -> GCS 等待 TDMA poll 的 MAVLink stream +rx_buf = bytearray() # XBee API frame parser buffer + + +def get_checksum(data): + return 0xFF - (sum(data) & 0xFF) + + +def build_api_tx_frame(payload): + """建立 XBee API frame type 0x10,把 payload 送到 DEST_64。""" + frame_content = bytearray() + frame_content.append(0x10) # Transmit Request + frame_content.append(0x00) # Frame ID = 0,不要求 XBee ACK,降低延遲 + frame_content.extend(DEST_64) + frame_content.extend(b'\xFF\xFE') + frame_content.append(0x00) # broadcast radius + frame_content.append(0x00) # options + frame_content.extend(payload) + + length = len(frame_content) + packet = bytearray() + packet.append(0x7E) + packet.append((length >> 8) & 0xFF) + packet.append(length & 0xFF) + packet.extend(frame_content) + packet.append(get_checksum(frame_content)) + return packet + + +def send_to_xbee_chunked(payload): + """把 payload 依 XBEE_MAX_PAYLOAD 切成多個 XBee API TX frame 送出。""" + total_len = len(payload) + sent_len = 0 + + while sent_len < total_len: + end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len) + chunk = payload[sent_len:end_len] + sent_len = end_len + uart_xb.write(build_api_tx_frame(chunk)) + # 很短的讓步,避免 ESP32 UART/XBee 本地 buffer 瞬間塞爆 + time.sleep_ms(1) + + +def send_done_report(sent_len, remain_len): + """送出 TDMA DONE 標籤。此封包不會轉給飛控,只給 GCS scheduler 使用。""" + if sent_len > 65535: + sent_len = 65535 + if remain_len > 65535: + remain_len = 65535 + + payload = DONE_MAGIC + struct.pack('>BHH', MY_SYSID, sent_len, remain_len) + uart_xb.write(build_api_tx_frame(payload)) + time.sleep_ms(1) + + +def find_first_mavlink_magic(buf): + """找 MAVLink v1/v2 magic:0xFE 或 0xFD。""" + pos_fe = buf.find(b'\xFE') + pos_fd = buf.find(b'\xFD') + + if pos_fe == -1: + return pos_fd + if pos_fd == -1: + return pos_fe + return pos_fe if pos_fe < pos_fd else pos_fd + + +def mavlink_frame_length(buf, start_idx): + """ + 回傳從 start_idx 開始的一包 MAVLink frame 長度。 + 若 header 還不完整或整包還沒收完,回傳 None。 + + MAVLink v1: + magic 0xFE, payload_len at byte 1, total = payload_len + 8 + MAVLink v2: + magic 0xFD, payload_len at byte 1, incompat_flags at byte 2, + total = payload_len + 12,若 signed 則 +13 + """ + if start_idx >= len(buf): + return None + + magic = buf[start_idx] + + if magic == 0xFE: + # v1 至少需要 magic + len + if len(buf) - start_idx < 2: + return None + payload_len = buf[start_idx + 1] + total_len = payload_len + 8 + if len(buf) - start_idx < total_len: + return None + return total_len + + if magic == 0xFD: + # v2 至少需要 magic + len + incompat_flags + if len(buf) - start_idx < 3: + return None + payload_len = buf[start_idx + 1] + incompat_flags = buf[start_idx + 2] + signed = (incompat_flags & 0x01) != 0 + total_len = payload_len + 12 + (13 if signed else 0) + if len(buf) - start_idx < total_len: + return None + return total_len + + return None + + +def pop_mavlink_frames_by_quota(quota_bytes): + """ + 從 tx_buf 取出不超過 quota_bytes 的完整 MAVLink frames。 + 不會使用 rfind(0xFD) 亂切,避免 payload 中剛好有 0xFD 造成誤判。 + + 若第一個完整 MAVLink frame 比 quota 還大,仍會送出第一包,避免永遠卡住。 + """ + global tx_buf + + if quota_bytes <= 0 or len(tx_buf) == 0: + return b'' + + # 丟掉 MAVLink magic 前面的雜訊 + start = find_first_mavlink_magic(tx_buf) + if start == -1: + tx_buf = bytearray() + return b'' + if start > 0: + tx_buf = tx_buf[start:] + + out = bytearray() + + while len(tx_buf) > 0: + # 再次對齊 magic,避免中間出現雜訊 + if tx_buf[0] not in (0xFE, 0xFD): + start = find_first_mavlink_magic(tx_buf) + if start == -1: + tx_buf = bytearray() + break + tx_buf = tx_buf[start:] + + frame_len = mavlink_frame_length(tx_buf, 0) + if frame_len is None: + # 第一包尚未完整,留在 buffer 等下一輪 FC bytes + break + + # 正常情況:加了這包會超過 quota,就先停止 + if len(out) > 0 and (len(out) + frame_len) > quota_bytes: + break + + # 第一包就比 quota 大:仍送出,避免 quota 設太小導致永遠送不出去 + if len(out) == 0 and frame_len > quota_bytes: + out.extend(tx_buf[:frame_len]) + tx_buf = tx_buf[frame_len:] + break + + out.extend(tx_buf[:frame_len]) + tx_buf = tx_buf[frame_len:] + + if len(out) >= quota_bytes: + break + + return bytes(out) + + +def trim_tx_buffer_if_needed(): + """tx_buf 過大時,丟掉較舊的完整 MAVLink frames,保留較新的資料。""" + global tx_buf + + if len(tx_buf) <= MAX_BUF_SIZE: + return + + target_size = MAX_BUF_SIZE // 2 + + while len(tx_buf) > target_size: + start = find_first_mavlink_magic(tx_buf) + if start == -1: + tx_buf = bytearray() + return + if start > 0: + tx_buf = tx_buf[start:] + continue + + frame_len = mavlink_frame_length(tx_buf, 0) + if frame_len is None: + # 若剩下的是超大半包資料,直接保留最後 target_size bytes + if len(tx_buf) > target_size: + tx_buf = tx_buf[-target_size:] + return + + # 丟掉最舊的一包完整 MAVLink frame + tx_buf = tx_buf[frame_len:] + + +def flush_tx_buffer(grant_bytes): + """ + 收到自己 SYSID 的 POLL 後執行: + 1. 根據 grant_bytes 取出完整 MAVLink frames。 + 2. 用 XBee API frame 送出去。 + 3. 獨立送 DONE report,讓 GCS 立即進下一台。 + """ + global tx_buf + + if grant_bytes <= 0: + send_done_report(0, len(tx_buf)) + return + + data_to_send = pop_mavlink_frames_by_quota(grant_bytes) + sent_len = len(data_to_send) + + if sent_len > 0: + send_to_xbee_chunked(data_to_send) + + send_done_report(sent_len, len(tx_buf)) + + +def parse_poll_payload(real_data): + """ + 支援兩種 poll: + 舊版:POLL + sysid,共 5 bytes + 新版:POLL + sysid + quota,共 7 bytes + 回傳 (target_sysid, grant_bytes),不是 poll 則回傳 (None, None)。 + """ + if not real_data.startswith(POLL_MAGIC): + return None, None + + if len(real_data) == 5: + return real_data[4], DEFAULT_GRANT_BYTES + + if len(real_data) == 7: + target_sysid = real_data[4] + grant_bytes = (real_data[5] << 8) | real_data[6] + return target_sysid, grant_bytes + + return None, None + + +def process_xbee_buffer(): + global rx_buf + + while True: + start_pos = rx_buf.find(b'\x7E') + if start_pos == -1: + rx_buf = bytearray() + return + + if start_pos > 0: + rx_buf = rx_buf[start_pos:] + + if len(rx_buf) < 3: + return + + pkt_len = (rx_buf[1] << 8) | rx_buf[2] + total_len = pkt_len + 4 + + # 避免亂資料讓 buffer 爆掉;XBee RX frame 通常不會太大 + if pkt_len > 300: + rx_buf = rx_buf[1:] + continue + + if len(rx_buf) < total_len: + return + + packet = rx_buf[3:3 + pkt_len] + checksum_recv = rx_buf[3 + pkt_len] + + if get_checksum(packet) == checksum_recv: + if packet[0] == 0x90: + # XBee Receive Packet 0x90:RF data 從 packet[12:] 開始 + real_data = packet[12:] + + target_sysid, grant_bytes = parse_poll_payload(real_data) + if target_sysid is not None: + if target_sysid == MY_SYSID: + flush_tx_buffer(grant_bytes) + else: + # 一般 GCS -> FC MAVLink 下行資料 + uart_fc.write(real_data) + + rx_buf = rx_buf[total_len:] + else: + rx_buf = rx_buf[1:] + + +# 初始清理 +gc_ran = False +try: + gc.collect() + gc_ran = True +except Exception: + pass + +while True: + try: + # FC -> ESP32 tx buffer + if uart_fc.any(): + data = uart_fc.read(READ_FC_BYTES) + if data: + tx_buf.extend(data) + trim_tx_buffer_if_needed() + + # XBee -> ESP32 + if uart_xb.any(): + chunk = uart_xb.read() + if chunk: + rx_buf.extend(chunk) + process_xbee_buffer() + + time.sleep_ms(1) + + except MemoryError: + # OOM 時丟掉暫存資料,避免整顆 ESP32 死掉 + tx_buf = bytearray() + rx_buf = bytearray() + gc.collect() + time.sleep_ms(10) + + except Exception: + # 現場飛行時避免因單次解析錯誤中止主迴圈 + time.sleep_ms(2) diff --git a/src/unitdev04/udptest8-2.py b/src/unitdev04/udptest8-2.py deleted file mode 100644 index 1b4e98a..0000000 --- a/src/unitdev04/udptest8-2.py +++ /dev/null @@ -1,500 +0,0 @@ -import asyncio -import serial_asyncio -import struct -import serial -import time -import threading -import tkinter as tk -from tkinter import ttk -from collections import deque, defaultdict -from pymavlink import mavutil -import matplotlib.pyplot as plt -import matplotlib.animation as animation -from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg - -# === 多組設備設定 === -CONFIGS = [ - {"serial_port": "/dev/ttyUSB0", "udp_port": 14551}, - {"serial_port": "COM15", "udp_port": 14570}, - {"serial_port": "/dev/ttyUSB2", "udp_port": 14553}, - {"serial_port": "/dev/ttyUSB3", "udp_port": 14554}, -] - -SERIAL_BAUDRATE = 115200 -UDP_REMOTE_IP = '127.0.0.1' -TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' -ACTIVE_SYSIDS = [3, 10, 15] -POLL_MAGIC = b'POLL' - -# === 全域變數與狀態追蹤 === -current_base_slot_ms = 250 # 預設 TDMA 時槽 -LOSS_TIME_WINDOW_SEC = 5.0 # 丟包率計算的時間視窗 (5秒) - -uav_states = { - sysid: { - "mode": "NORMAL", - } for sysid in ACTIVE_SYSIDS -} - -rssi_history = defaultdict(lambda: deque(maxlen=5000)) -time_history = defaultdict(lambda: deque(maxlen=5000)) -packet_loss_history = defaultdict(lambda: deque(maxlen=1000)) -packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000)) -mavlink_sequence_tracker = defaultdict(dict) -packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0}) -serial_to_sysid = {} -serial_last_mavlink_time = {} -last_db_query_time = {} - -# === 核心邏輯區 === - -def calculate_packet_loss(sysid, compid, current_seq): - global mavlink_sequence_tracker, packet_loss_stats - tracker = mavlink_sequence_tracker[sysid] - now = time.time() - - if compid not in tracker: - tracker[compid] = { - 'last_seq': current_seq, - 'history': deque() - } - return 0.0 - - comp_tracker = tracker[compid] - last_seq = comp_tracker['last_seq'] - - if current_seq > last_seq: - expected = current_seq - last_seq - elif current_seq < last_seq: - expected = (255 - last_seq) + current_seq + 1 - else: - return packet_loss_history[sysid][-1] if packet_loss_history[sysid] else 0.0 - - lost = max(0, expected - 1) - - comp_tracker['history'].append((now, expected, lost)) - comp_tracker['last_seq'] = current_seq - - total_expected_all = 0 - total_lost_all = 0 - - for c_id, c_data in tracker.items(): - if 'history' in c_data: - while c_data['history'] and (now - c_data['history'][0][0]) > LOSS_TIME_WINDOW_SEC: - c_data['history'].popleft() - - total_expected_all += sum(item[1] for item in c_data['history']) - total_lost_all += sum(item[2] for item in c_data['history']) - - overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0 - - packet_loss_stats[sysid] = { - 'loss_rate': overall_loss_rate, - 'total_received': total_expected_all - total_lost_all, - 'total_lost': total_lost_all - } - - packet_loss_history[sysid].append(overall_loss_rate) - packet_loss_time_history[sysid].append(now) - return overall_loss_rate - -def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes: - frame = b'\x10' + struct.pack(">B", frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data - return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", 0xFF - (sum(frame) & 0xFF)) - -class SerialToUDP(asyncio.Protocol): - def send_poll(self, target_sysid): - poll_payload = POLL_MAGIC + struct.pack("B", target_sysid) - api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00) - self.transport.write(api_frame) - - def __init__(self, udp_protocol, serial_port): - self.udp_protocol = udp_protocol - self.serial_port = serial_port - self.buffer = bytearray() - self.gcs_tx_queue = bytearray() - - def connection_made(self, transport): - self.transport = transport - if hasattr(self.udp_protocol, 'set_serial_transport'): - self.udp_protocol.set_serial_transport(self) - print(f"[{self.serial_port}] Serial connection established.") - - def data_received(self, data): - self.buffer.extend(data) - while True: - try: - start_idx = self.buffer.index(0x7E) - if start_idx > 0: - del self.buffer[:start_idx] - except ValueError: - self.buffer.clear() - return - - if len(self.buffer) < 3: return - - length = (self.buffer[1] << 8) | self.buffer[2] - full_length = 3 + length + 1 - - if length > 300: - self.buffer.pop(0) - continue - - if len(self.buffer) < full_length: return - - frame = self.buffer[:full_length] - checksum = 0xFF - (sum(frame[3:-1]) & 0xFF) - - if checksum != frame[-1]: - self.buffer.pop(0) - continue - - del self.buffer[:full_length] - - if hasattr(self.udp_protocol, 'send_udp'): - self.udp_protocol.send_udp(bytes(frame)) - - if frame[3] == 0x88 and frame[5:7] == b'DB': - status = frame[7] - if status == 0x00 and len(frame) > 8: - rssi_value = frame[8] - now = time.time() - last_time = serial_last_mavlink_time.get(self.serial_port, 0) - if now - last_time <= 0.5: - sysid = serial_to_sysid.get(self.serial_port, None) - if sysid is not None: - rssi_history[sysid].append(-rssi_value) - time_history[sysid].append(now) - - def write_to_serial(self, data): - self.gcs_tx_queue.extend(data) - - def flush_gcs_queue(self): - if not self.gcs_tx_queue: return - send_limit = min(len(self.gcs_tx_queue), 150) - data_to_send = self.gcs_tx_queue[:send_limit] - self.gcs_tx_queue = self.gcs_tx_queue[send_limit:] - asyncio.create_task(self._async_send_chunks(data_to_send)) - - async def _async_send_chunks(self, data): - try: - MAX_PAYLOAD = 80 - sent_len = 0 - while sent_len < len(data): - end_len = min(sent_len + MAX_PAYLOAD, len(data)) - chunk = data[sent_len:end_len] - sent_len = end_len - - api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00) - self.transport.write(api_frame) - await asyncio.sleep(0.01) - except Exception: - pass - - def send_at_command_db(self): - try: - frame_type = 0x08 - frame_id = 0x52 - at_command = b'DB' - parameter = b'' - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter - checksum = 0xFF - (sum(frame) & 0xFF) - api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) - self.transport.write(api_frame) - except Exception: - pass - -class UDPHandler(asyncio.DatagramProtocol): - def __init__(self, udp_port): - self.udp_port = udp_port - self.serial_transport = None - self.transport = None - self.mav_decoder = mavutil.mavlink.MAVLink(None) - - def connection_made(self, transport): - self.transport = transport - - def set_serial_transport(self, serial_transport): - self.serial_transport = serial_transport - - def datagram_received(self, data, addr): - if self.serial_transport: - self.serial_transport.write_to_serial(data) - - def decapsulate_data(self, data): - try: - if not data or data[0] != 0x7E: - return None - length = (data[1] << 8) | data[2] - if len(data) < length + 4: - return None - frame_type = data[3] - if frame_type == 0x90: - rf_data_start = 3 + 12 - return data[rf_data_start:3 + length] - else: - return None - except Exception: - return None - - def send_udp(self, data): - decoded_data = self.decapsulate_data(data) - if decoded_data is None: return - - try: - for byte in decoded_data: - msg = self.mav_decoder.parse_char(bytes([byte])) - if msg: - sysid = msg.get_srcSystem() - compid = msg.get_srcComponent() - seq = msg.get_seq() - if sysid == 0: continue - - if self.serial_transport: - port_name = self.serial_transport.serial_port - serial_to_sysid[port_name] = sysid - serial_last_mavlink_time[port_name] = time.time() - - now = time.time() - last_query = last_db_query_time.get(port_name, 0) - if now - last_query >= 0.5: - self.serial_transport.send_at_command_db() - last_db_query_time[port_name] = now - - calculate_packet_loss(sysid, compid, seq) - except Exception: - pass - - if self.transport: - self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port)) - -async def setup_bridge(config): - port, udp = config["serial_port"], config["udp_port"] - try: - ser = serial.Serial(port, SERIAL_BAUDRATE) - ser.close() - except: return None - - loop = asyncio.get_running_loop() - udp_handler = UDPHandler(udp) - await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0)) - - serial_proto = SerialToUDP(udp_handler, port) - await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE) - return serial_proto - -async def tdma_scheduler(serial_protocols): - print(f"TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口") - while True: - # GCS 下行時槽 - for sp in serial_protocols: - if hasattr(sp, 'flush_gcs_queue'): - sp.flush_gcs_queue() - await asyncio.sleep(0.1) - - # UAV 上行時槽 - for sysid in ACTIVE_SYSIDS: - state = uav_states[sysid] - - if state["mode"] == "INITIALIZING": - # VIP 模式:0.8秒內連發4次點名,暴力獲取參數 - for _ in range(4): - for sp in serial_protocols: - if hasattr(sp, 'send_poll'): - sp.send_poll(sysid) - await asyncio.sleep(0.2) - else: - # NORMAL 模式:聽從滑桿設定的時間 - slot_time = current_base_slot_ms / 1000.0 - for sp in serial_protocols: - if hasattr(sp, 'send_poll'): - sp.send_poll(sysid) - await asyncio.sleep(slot_time) - -async def async_main(): - protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS)) - valid_protocols = [p for p in protocols if p is not None] - if valid_protocols: - asyncio.create_task(tdma_scheduler(valid_protocols)) - await asyncio.Future() - -# === GUI 介面區 === - -def start_gui(): - root = tk.Tk() - root.title("UAV TDMA Control Station") - root.geometry("1200x800") - - # --- 左側控制面板 --- - control_frame = tk.Frame(root, width=300, bg="#f0f0f0", padx=20, pady=20) - control_frame.pack(side=tk.LEFT, fill=tk.Y) - - tk.Label(control_frame, text="TDMA 基礎時槽控制", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=10) - - slider_val = tk.IntVar(value=current_base_slot_ms) - def on_slider_change(val): - global current_base_slot_ms - current_base_slot_ms = int(float(val)) - val_label.config(text=f"當前設定: {current_base_slot_ms} ms") - - slider = ttk.Scale(control_frame, from_=50, to_=1000, orient='horizontal', variable=slider_val, command=on_slider_change) - slider.pack(fill=tk.X, pady=10) - val_label = tk.Label(control_frame, text=f"當前設定: {current_base_slot_ms} ms", bg="#f0f0f0") - val_label.pack() - - tk.Label(control_frame, text="群機初始化控制 (INIT)", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=(30, 10)) - - # === 1. 改為單選按鈕 (Radiobutton) === - init_var = tk.IntVar(value=0) # 0 代表全體 NORMAL - radios = {} - status_labels = {} - - def on_radio_change(): - selected = init_var.get() - for sysid in ACTIVE_SYSIDS: - if sysid == selected: - uav_states[sysid]["mode"] = "INITIALIZING" - else: - uav_states[sysid]["mode"] = "NORMAL" - - # 新增一個「取消特權」的選項 - # tk.Radiobutton(control_frame, text="全體 NORMAL (無特權)", variable=init_var, value=0, command=on_radio_change, bg="#f0f0f0", font=("Arial", 11, "bold"), fg="blue").pack(anchor=tk.W, pady=(0, 10)) - - for sysid in ACTIVE_SYSIDS: - frame = tk.Frame(control_frame, bg="#f0f0f0") - frame.pack(fill=tk.X, pady=5) - - rb = tk.Radiobutton(frame, text=f"SYSID {sysid} 專屬載入", variable=init_var, value=sysid, - command=on_radio_change, bg="#f0f0f0", font=("Arial", 11)) - rb.pack(side=tk.LEFT) - radios[sysid] = rb - - lbl = tk.Label(frame, text="NORMAL", font=("Arial", 10, "bold"), fg="green", bg="#f0f0f0") - lbl.pack(side=tk.RIGHT, padx=5) - status_labels[sysid] = lbl - - # === 2. 改為開關式的鎖定按鈕 (Toggle) === - is_locked = False - def toggle_tdma_mode(): - nonlocal is_locked - is_locked = not is_locked - - if is_locked: - # 鎖定:強制切回 0 (全體 NORMAL),並禁用單選按鈕 - init_var.set(0) - on_radio_change() - for rb in radios.values(): - rb.config(state=tk.DISABLED) - # 改變按鈕外觀為「解鎖」狀態 - lock_btn.config(text="點擊解鎖 (允許重新下載參數)", bg="orange") - print("已鎖定進入純 TDMA 模式") - else: - # 解鎖:恢復單選按鈕功能 - for rb in radios.values(): - rb.config(state=tk.NORMAL) - # 改變按鈕外觀為「鎖定」狀態 - lock_btn.config(text="參數載入完畢,鎖定進入 TDMA", bg="#4CAF50") - print("已解除鎖定,可重新分配特權") - - lock_btn = tk.Button(control_frame, text="參數載入完畢,鎖定進入 TDMA", - font=("Arial", 12, "bold"), bg="#4CAF50", fg="white", - command=toggle_tdma_mode) - lock_btn.pack(pady=30, fill=tk.X) - - # 動態更新狀態標籤 - def update_status_gui(): - for sysid, lbl in status_labels.items(): - mode = uav_states[sysid]["mode"] - if mode == "INITIALIZING": - lbl.config(text="INIT (特權下載中)", fg="orange") - else: - lbl.config(text="NORMAL", fg="green") - root.after(500, update_status_gui) - - update_status_gui() - - # --- 右側圖表區 --- - plot_frame = tk.Frame(root) - plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) - - fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9, 8), dpi=100) - canvas = FigureCanvasTkAgg(fig, master=plot_frame) - canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) - - def update_plot(frame): - ax1.clear() - ax2.clear() - ax1.set_title("RSSI", fontsize=12); ax1.set_xlim(10, 0); ax1.set_ylim(-100, -10); ax1.grid(True, alpha=0.3) - ax2.set_title("Packet Loss Rate (5s Window)", fontsize=12); ax2.set_xlim(10, 0); ax2.set_ylim(0, 100); ax2.grid(True, alpha=0.3) - - now = time.time() - colors = ['blue', 'red', 'green', 'orange', 'purple'] - - try: sysids = sorted(list(rssi_history.keys())) - except RuntimeError: return - - loss_labels = [] - - for i, sysid in enumerate(sysids): - color = colors[i % len(colors)] - try: - t_hist, r_hist = list(time_history[sysid]), list(rssi_history[sysid]) - lt_hist, l_hist = list(packet_loss_time_history.get(sysid, [])), list(packet_loss_history.get(sysid, [])) - except RuntimeError: continue - - rssi_recent = [idx for idx, ts in enumerate(t_hist) if now - ts <= 10] - if rssi_recent: - ax1.plot([now - t_hist[idx] for idx in rssi_recent], [r_hist[idx] for idx in rssi_recent], label=f"SYSID:{sysid}", color=color) - - loss_recent = [idx for idx, ts in enumerate(lt_hist) if now - ts <= 10] - if loss_recent: - loss_t = [now - lt_hist[idx] for idx in loss_recent] - loss_r = [l_hist[idx] for idx in loss_recent] - ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, marker='o', markersize=3) - - if loss_r: - loss_labels.append({ - 'sysid': sysid, 'y_real': loss_r[-1], 'x_real': loss_t[-1], 'color': color - }) - - if loss_labels: - loss_labels = sorted(loss_labels, key=lambda k: k['y_real']) - min_gap = 12.0 - y_positions = [lbl['y_real'] for lbl in loss_labels] - - for j in range(1, len(y_positions)): - if y_positions[j] - y_positions[j-1] < min_gap: - y_positions[j] = y_positions[j-1] + min_gap - - if y_positions[-1] > 90: - shift = y_positions[-1] - 90 - y_positions = [y - shift for y in y_positions] - - for j, lbl in enumerate(loss_labels): - sysid = lbl['sysid'] - color = lbl['color'] - real_y = lbl['y_real'] - text_y = y_positions[j] - - ax2.text(0.5, text_y, f'ID:{sysid} ({real_y:.1f}%)', - bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.8), - fontsize=10, fontweight='bold', color='white', - horizontalalignment='right', verticalalignment='center') - - if abs(real_y - text_y) > 1.0: - ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6) - - ax1.legend(loc="upper left") - ax2.legend(loc="upper left") - - ani = animation.FuncAnimation(fig, update_plot, interval=1000) - - def on_closing(): - root.quit() - root.destroy() - root.protocol("WM_DELETE_WINDOW", on_closing) - root.mainloop() - -if __name__ == '__main__': - threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start() - start_gui() \ No newline at end of file diff --git a/src/unitdev04/udptest8.py b/src/unitdev04/udptest8.py index acf92bd..4ae5ec2 100644 --- a/src/unitdev04/udptest8.py +++ b/src/unitdev04/udptest8.py @@ -1,122 +1,285 @@ -# === import 保持不變 === import asyncio import serial_asyncio import struct import serial -import traceback import time +import threading +import tkinter as tk +from tkinter import ttk from collections import deque, defaultdict from pymavlink import mavutil import matplotlib.pyplot as plt import matplotlib.animation as animation -import threading +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg + +# ========================================================= +# GCS 端:XBee Serial <-> UDP Bridge +# Packet-size TDMA + DONE 標籤 + 精準 RSSI 歸屬 + 最近 5 次 RSSI 平均 +# +# 核心行為: +# 1. GCS 對某個 SYSID 發 POLL + quota_bytes。 +# 2. UAV 收到自己的 POLL 後,送出最多 quota_bytes 的完整 MAVLink frames。 +# 3. UAV 送完後回 DONE,GCS 收到 DONE 後立即 poll 下一台。 +# 4. 收到 DONE 後,GCS 立即發 ATDB 查詢本地 XBee 最近一包 RF packet 的 RSSI。 +# 5. 因為 DONE payload 帶有 SYSID,且 XBee 0x90 frame 帶有 src64,RSSI 不再用 MAVLink SYSID 猜。 +# 6. RSSI 顯示採用「最近 5 次 DB RSSI」平均值,降低跳動。 +# ========================================================= # === 多組設備設定 === +# Windows 測試時建議先只留你的實際 COM port,例如 COM15。 CONFIGS = [ - {"serial_port": "/dev/ttyUSB2", "udp_port": 14551}, - {"serial_port": "COM43", "udp_port": 14552}, - {"serial_port": "COM15", "udp_port": 14553}, + {"serial_port": "/dev/ttyUSB0", "udp_port": 14551}, + {"serial_port": "COM15", "udp_port": 14590}, + {"serial_port": "/dev/ttyUSB2", "udp_port": 14553}, + {"serial_port": "/dev/ttyUSB3", "udp_port": 14554}, ] SERIAL_BAUDRATE = 115200 UDP_REMOTE_IP = '127.0.0.1' + +# Broadcast:由 RF payload 內的 SYSID 決定哪台 ESP32 回應。 TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' -# === 數據存儲 === -rssi_history = defaultdict(lambda: deque(maxlen=5000)) -time_history = defaultdict(lambda: deque(maxlen=5000)) +ACTIVE_SYSIDS = [3, 10, 15] + +# 如果你知道每台 UAV XBee 的 64-bit address,可以填在這裡。 +# 即使不填,本程式也會從 DONE frame 自動學習 src64 -> SYSID。 +# 範例: +# XBEE_ADDR64_TO_SYSID = { +# b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8': 15, +# b'\x00\x13\xA2\x00\x42\x5B\x9D\xAA': 10, +# b'\x00\x13\xA2\x00\x42\x5B\x9D\xBB': 3, +# } +XBEE_ADDR64_TO_SYSID = {} +AUTO_LEARN_XBEE_ADDR = True + +# GCS -> UAV poll 格式:b'POLL' + sysid(1) + quota_bytes(2) +POLL_MAGIC = b'POLL' + +# UAV -> GCS done 格式:b'DONE' + sysid(1) + sent_len(2) + remain_len(2) +DONE_MAGIC = b'DONE' + +# === TDMA 參數 === +# 每次 poll 授權該 UAV 最多送多少 MAVLink 原始資料 bytes。 +current_grant_bytes = 600 + +# 切換下一台 UAV 前的保護時間,避免最後一包還在 XBee/UART buffer 裡。 +current_guard_ms = 20 + +# INITIALIZING 模式給較大的 quota,用於參數載入。 +INIT_GRANT_BYTES = 1200 + +# 丟包率計算時間視窗 +LOSS_TIME_WINDOW_SEC = 5.0 + +# RSSI 最近 N 次平均 +RSSI_AVG_WINDOW = 5 + +# 是否也在資料封包時查 DB。預設 False:只在 DONE 後查 DB,比較不干擾資料流。 +RSSI_QUERY_ON_DATA_FRAME = False +RSSI_DATA_QUERY_MIN_INTERVAL_SEC = 0.5 + +uav_states = { + sysid: { + "mode": "NORMAL", + } for sysid in ACTIVE_SYSIDS +} + +# === 狀態追蹤 === +rssi_history = defaultdict(lambda: deque(maxlen=5000)) # 畫圖用:RSSI avg5 +rssi_raw_windows = defaultdict(lambda: deque(maxlen=RSSI_AVG_WINDOW)) +rssi_time_history = defaultdict(lambda: deque(maxlen=5000)) +rssi_latest_stats = defaultdict(lambda: { + 'raw': None, + 'avg': None, + 'count': 0, + 'time': 0.0, + 'src64': None, +}) -# === 新增:丟包率追蹤 === packet_loss_history = defaultdict(lambda: deque(maxlen=1000)) packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000)) -mavlink_sequence_tracker = defaultdict(dict) # {sysid: {compid: {'last_seq': x, 'total_packets': y, 'lost_packets': z}}} +mavlink_sequence_tracker = defaultdict(dict) packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0}) -serial_to_sysid = {} -serial_last_mavlink_time = {} # 優化 1:追蹤各 serial port 最近的 MAVLink 時間 +# 自動學習到的 XBee address 對應 +learned_addr64_to_sysid = {} +learned_sysid_to_addr64 = {} + +# 每個 SYSID 一個 asyncio.Event;收到 DONE 後 set,scheduler 立即換下一台。 +tdma_done_events = {} +tdma_last_reports = defaultdict(lambda: {'time': 0.0, 'sent_len': 0, 'remain_len': 0}) + +# === 工具函式 === + +def format_addr64(addr): + if not addr: + return 'unknown' + return ''.join(f'{b:02X}' for b in addr) + + +def infer_sysid_from_addr64(src64): + if not src64: + return None + if src64 in XBEE_ADDR64_TO_SYSID: + return XBEE_ADDR64_TO_SYSID[src64] + if src64 in learned_addr64_to_sysid: + return learned_addr64_to_sysid[src64] + return None + + +def learn_xbee_source(sysid, src64): + if not AUTO_LEARN_XBEE_ADDR: + return + if sysid is None or src64 is None: + return + if sysid <= 0: + return + learned_addr64_to_sysid[src64] = sysid + learned_sysid_to_addr64[sysid] = src64 + + +def record_rssi(sysid, rssi_positive_db, src64=None): + """ + XBee ATDB 回傳通常是正值,例如 55 表示 -55 dBm。 + 這裡統一轉成負 dBm,並做最近 RSSI_AVG_WINDOW 次平均。 + """ + if sysid is None: + return + + raw_dbm = -int(rssi_positive_db) + win = rssi_raw_windows[sysid] + win.append(raw_dbm) + avg_dbm = sum(win) / len(win) + now = time.time() + + rssi_history[sysid].append(avg_dbm) + rssi_time_history[sysid].append(now) + rssi_latest_stats[sysid] = { + 'raw': raw_dbm, + 'avg': avg_dbm, + 'count': len(win), + 'time': now, + 'src64': src64, + } + def calculate_packet_loss(sysid, compid, current_seq): - """計算丟包率""" global mavlink_sequence_tracker, packet_loss_stats - tracker = mavlink_sequence_tracker[sysid] + now = time.time() + if compid not in tracker: - # 第一次收到這個component的消息 tracker[compid] = { 'last_seq': current_seq, - 'total_packets': 1, - 'lost_packets': 0, - 'last_update': time.time() + 'history': deque() } return 0.0 - + comp_tracker = tracker[compid] last_seq = comp_tracker['last_seq'] - - # 計算序列號差異(處理255溢出) + if current_seq > last_seq: - expected_packets = current_seq - last_seq + expected = current_seq - last_seq elif current_seq < last_seq: - # 序列號溢出(0-255循環) - expected_packets = (255 - last_seq) + current_seq + 1 + expected = (255 - last_seq) + current_seq + 1 else: - # 重複的序列號,忽略 - return comp_tracker.get('loss_rate', 0.0) - - # 更新統計 - comp_tracker['total_packets'] += expected_packets - lost_packets = expected_packets - 1 # 實際收到1個,應該收到expected_packets個 - comp_tracker['lost_packets'] += max(0, lost_packets) + return packet_loss_history[sysid][-1] if packet_loss_history[sysid] else 0.0 + + lost = max(0, expected - 1) + + comp_tracker['history'].append((now, expected, lost)) comp_tracker['last_seq'] = current_seq - comp_tracker['last_update'] = time.time() - - # 計算丟包率 - total_expected = comp_tracker['total_packets'] - total_lost = comp_tracker['lost_packets'] - loss_rate = (total_lost / total_expected) * 100.0 if total_expected > 0 else 0.0 - comp_tracker['loss_rate'] = loss_rate - - # 更新全局統計(按sysid匯總所有component) - total_received = 0 - total_lost_all = 0 + total_expected_all = 0 - - for comp_id, comp_data in tracker.items(): - total_expected_all += comp_data['total_packets'] - total_lost_all += comp_data['lost_packets'] - total_received += comp_data['total_packets'] - comp_data['lost_packets'] - + total_lost_all = 0 + + for c_data in tracker.values(): + if 'history' in c_data: + while c_data['history'] and (now - c_data['history'][0][0]) > LOSS_TIME_WINDOW_SEC: + c_data['history'].popleft() + + total_expected_all += sum(item[1] for item in c_data['history']) + total_lost_all += sum(item[2] for item in c_data['history']) + overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0 - + packet_loss_stats[sysid] = { 'loss_rate': overall_loss_rate, - 'total_received': total_received, + 'total_received': total_expected_all - total_lost_all, 'total_lost': total_lost_all } - - # 記錄到歷史數據 - now = time.time() + packet_loss_history[sysid].append(overall_loss_rate) packet_loss_time_history[sysid].append(now) - return overall_loss_rate -def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: - frame_type = 0x10 - dest_addr16 = b'\xFF\xFE' - broadcast_radius = 0x00 - options = 0x00 - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) - frame += dest_addr64 + dest_addr16 - frame += struct.pack(">BB", broadcast_radius, options) + data - checksum = 0xFF - (sum(frame) & 0xFF) - return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes: + frame = b'\x10' + struct.pack('>B', frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data + return b'\x7E' + struct.pack('>H', len(frame)) + frame + struct.pack('B', 0xFF - (sum(frame) & 0xFF)) + + +def estimate_tdma_timeout(grant_bytes): + """ + DONE 正常收到時不會等到 timeout。 + timeout 只在 DONE 掉包或該台沒回應時保護 scheduler。 + """ + grant_bytes = max(0, int(grant_bytes)) + max_payload = 100 + chunks = max(1, (grant_bytes + max_payload - 1) // max_payload) + + uart_time = (grant_bytes + chunks * 18 + 32) * 10.0 / SERIAL_BAUDRATE + timeout = uart_time + chunks * 0.012 + (current_guard_ms / 1000.0) + 0.35 + return max(0.12, timeout) + + +async def grant_one_uav(serial_protocols, sysid, grant_bytes): + """授權一台 UAV;收到 DONE 立即結束,否則 timeout 後換下一台。""" + ev = tdma_done_events.get(sysid) + if ev is None: + ev = asyncio.Event() + tdma_done_events[sysid] = ev + ev.clear() + + for sp in serial_protocols: + if hasattr(sp, 'send_poll'): + sp.send_poll(sysid, grant_bytes) + + try: + await asyncio.wait_for(ev.wait(), timeout=estimate_tdma_timeout(grant_bytes)) + except asyncio.TimeoutError: + print(f"[TDMA] SYSID {sysid} timeout; move to next UAV") + + await asyncio.sleep(current_guard_ms / 1000.0) + class SerialToUDP(asyncio.Protocol): def __init__(self, udp_protocol, serial_port): self.udp_protocol = udp_protocol self.serial_port = serial_port self.buffer = bytearray() + self.gcs_tx_queue = bytearray() + self.transport = None + + self.current_poll_sysid = None + self.current_poll_time = 0.0 + + self.at_frame_id = 0x20 + self.pending_db = {} # frame_id -> {'sysid', 'src64', 'time', 'reason'} + self.last_data_db_query_time = 0.0 + + def send_poll(self, target_sysid, grant_bytes=None): + if grant_bytes is None: + grant_bytes = current_grant_bytes + grant_bytes = max(0, min(int(grant_bytes), 65535)) + + self.current_poll_sysid = target_sysid + self.current_poll_time = time.time() + + poll_payload = POLL_MAGIC + struct.pack('>BH', target_sysid, grant_bytes) + api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00) + self.transport.write(api_frame) def connection_made(self, transport): self.transport = transport @@ -126,62 +289,173 @@ class SerialToUDP(asyncio.Protocol): def data_received(self, data): self.buffer.extend(data) + while True: + try: + start_idx = self.buffer.index(0x7E) + if start_idx > 0: + del self.buffer[:start_idx] + except ValueError: + self.buffer.clear() + return + if len(self.buffer) < 3: return - if self.buffer[0] != 0x7E: - self.buffer.pop(0) - continue + length = (self.buffer[1] << 8) | self.buffer[2] full_length = 3 + length + 1 + + # 本程式使用 XBee RF payload 約 80~100 bytes,正常 length 不會很大。 + # 若你把 XBEE_MAX_PAYLOAD 調大,可同步放寬此限制。 + if length > 600: + self.buffer.pop(0) + continue + if len(self.buffer) < full_length: return - frame = self.buffer[:full_length] + + frame = bytes(self.buffer[:full_length]) + checksum = 0xFF - (sum(frame[3:-1]) & 0xFF) + + if checksum != frame[-1]: + self.buffer.pop(0) + continue + del self.buffer[:full_length] + self.handle_api_frame(frame) - if hasattr(self.udp_protocol, 'send_udp'): - self.udp_protocol.send_udp(bytes(frame)) - - if frame[3] == 0x88 and frame[5:7] == b'DB': - status = frame[7] - if status == 0x00 and len(frame) > 8: - rssi_value = frame[8] - now = time.time() - - # === 優化 1:僅信任最近 0.5 秒內有接收 MAVLink 的 port - last_time = serial_last_mavlink_time.get(self.serial_port, 0) - if now - last_time <= 0.5: - sysid = serial_to_sysid.get(self.serial_port, None) - if sysid is not None: - rssi_history[sysid].append(-rssi_value) - time_history[sysid].append(now) - # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") - else: - print(f"[{self.serial_port}] 找不到 sysid 對應,RSSI = -{rssi_value} dBm,已忽略") - else: - print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLink,RSSI = -{rssi_value} dBm 已忽略") - else: - print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}") + def handle_api_frame(self, frame): + frame_type = frame[3] - def write_to_serial(self, data): - try: - api_frame = build_api_tx_frame(data, TARGET_ADDR64) - self.transport.write(api_frame) - except Exception as e: - print(f"[{self.serial_port} TX Error] 無法封裝或傳送資料: {e}") + if frame_type == 0x90: + # XBee Receive Packet: + # whole API frame: 7E | len(2) | 90 | src64(8) | src16(2) | options(1) | RF data | checksum + src64 = frame[4:12] + rf_data = frame[15:-1] + self.handle_rx_packet(src64, rf_data) + return + + if frame_type == 0x88: + self.handle_at_response(frame) + return + + # 其他如 0x8B Transmit Status 可視需求處理;目前忽略。 + + def handle_rx_packet(self, src64, rf_data): + # 先用 src64 查。若尚未學習,DONE payload 會帶 sysid,可建立對應。 + sysid_hint = infer_sysid_from_addr64(src64) + + # 若在最近 poll 時槽內收到資料,也可作為備援歸屬。 + if sysid_hint is None and self.current_poll_sysid is not None: + if time.time() - self.current_poll_time <= 2.0: + sysid_hint = self.current_poll_sysid + + done_sysid = self.udp_protocol.process_rf_data(rf_data, src64=src64, sysid_hint=sysid_hint) + + if done_sysid is not None: + # DONE payload 明確帶 SYSID;這裡可精準學習該 SYSID 對應的 XBee src64。 + learn_xbee_source(done_sysid, src64) + self.send_at_command_db(done_sysid, src64=src64, reason='DONE') + return - def send_at_command_db(self): + if RSSI_QUERY_ON_DATA_FRAME and sysid_hint is not None: + now = time.time() + if now - self.last_data_db_query_time >= RSSI_DATA_QUERY_MIN_INTERVAL_SEC: + self.send_at_command_db(sysid_hint, src64=src64, reason='DATA') + self.last_data_db_query_time = now + + def next_at_frame_id(self): + self.at_frame_id += 1 + if self.at_frame_id > 0xFE: + self.at_frame_id = 0x20 + return self.at_frame_id + + def send_at_command_db(self, sysid, src64=None, reason=''): + """ + 查本地 XBee 的 DB 值。DB 代表本地 XBee 最近收到 RF packet 的 RSSI。 + 這裡在 DONE 後立即查,所以 RSSI 會歸屬到剛完成 TDMA slot 的 sysid。 + """ try: + frame_id = self.next_at_frame_id() frame_type = 0x08 - frame_id = 0x52 at_command = b'DB' parameter = b'' - frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter - checksum = 0xFF - (sum(frame) & 0xFF) - api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + frame_data = struct.pack('>B', frame_type) + struct.pack('>B', frame_id) + at_command + parameter + checksum = 0xFF - (sum(frame_data) & 0xFF) + api_frame = b'\x7E' + struct.pack('>H', len(frame_data)) + frame_data + struct.pack('B', checksum) + + self.pending_db[frame_id] = { + 'sysid': sysid, + 'src64': src64, + 'time': time.time(), + 'reason': reason, + } + + # 清掉太舊 pending,避免長時間累積 + now = time.time() + stale = [fid for fid, info in self.pending_db.items() if now - info['time'] > 2.0] + for fid in stale: + self.pending_db.pop(fid, None) + self.transport.write(api_frame) except Exception as e: - print(f"[{self.serial_port}] 發送 DB 指令失敗: {e}") + print(f"[{self.serial_port}] send ATDB failed: {e}") + + def handle_at_response(self, frame): + # AT Command Response: 0x88 | frame_id | AT(2) | status | value... + if len(frame) < 9: + return + + frame_id = frame[4] + at_command = frame[5:7] + status = frame[7] + + if at_command != b'DB': + return + if status != 0x00 or len(frame) <= 8: + return + + rssi_value = frame[8] + info = self.pending_db.pop(frame_id, None) + + if info is None: + # 理論上不該發生;做保底:歸屬到目前 TDMA slot。 + sysid = self.current_poll_sysid + src64 = learned_sysid_to_addr64.get(sysid) + else: + sysid = info.get('sysid') + src64 = info.get('src64') + + if sysid is not None: + record_rssi(sysid, rssi_value, src64=src64) + + def write_to_serial(self, data): + self.gcs_tx_queue.extend(data) + + def flush_gcs_queue(self): + """GCS -> UAV 下行資料,小批量送出,避免阻塞上行 TDMA。""" + if not self.gcs_tx_queue: + return + send_limit = min(len(self.gcs_tx_queue), 150) + data_to_send = self.gcs_tx_queue[:send_limit] + self.gcs_tx_queue = self.gcs_tx_queue[send_limit:] + asyncio.create_task(self._async_send_chunks(data_to_send)) + + async def _async_send_chunks(self, data): + try: + max_payload = 80 + sent_len = 0 + while sent_len < len(data): + end_len = min(sent_len + max_payload, len(data)) + chunk = data[sent_len:end_len] + sent_len = end_len + + api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00) + self.transport.write(api_frame) + await asyncio.sleep(0.01) + except Exception: + pass + class UDPHandler(asyncio.DatagramProtocol): def __init__(self, udp_port): @@ -192,7 +466,6 @@ class UDPHandler(asyncio.DatagramProtocol): def connection_made(self, transport): self.transport = transport - print(f"[UDP:{self.udp_port}] transport ready.") def set_serial_transport(self, serial_transport): self.serial_transport = serial_transport @@ -201,184 +474,351 @@ class UDPHandler(asyncio.DatagramProtocol): if self.serial_transport: self.serial_transport.write_to_serial(data) - def send_udp(self, data): - decoded_data = self.decapsulate_data(data) - if decoded_data is None: - return - self.decode_mavlink_data(decoded_data) - if self.transport: - self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port)) + def handle_done_report(self, rf_data): + if len(rf_data) < 9 or not rf_data.startswith(DONE_MAGIC): + return None - def decapsulate_data(self, data): try: - if not data or data[0] != 0x7E: - return None - length = (data[1] << 8) | data[2] - if len(data) < length + 4: - return None - frame_type = data[3] - if frame_type == 0x90: - rf_data_start = 3 + 12 - return data[rf_data_start:3 + length] - else: - return None - except Exception as e: - print(f"[UDP:{self.udp_port} 解封錯誤] {e}") + sysid, sent_len, remain_len = struct.unpack('>BHH', rf_data[4:9]) + tdma_last_reports[sysid] = { + 'time': time.time(), + 'sent_len': sent_len, + 'remain_len': remain_len, + } + + ev = tdma_done_events.get(sysid) + if ev is not None: + ev.set() + + return sysid + except Exception: return None - def decode_mavlink_data(self, data): + def process_rf_data(self, rf_data, src64=None, sysid_hint=None): + """ + 處理 XBee 0x90 的 RF data。 + 回傳:若是 DONE,回傳 done_sysid;若是 MAVLink data,回傳 None。 + """ + done_sysid = self.handle_done_report(rf_data) + if done_sysid is not None: + return done_sysid + try: - # 逐字節解析MAVLink - for byte in data: + for byte in rf_data: msg = self.mav_decoder.parse_char(bytes([byte])) if msg: sysid = msg.get_srcSystem() compid = msg.get_srcComponent() seq = msg.get_seq() - if sysid == 0: continue - - if self.serial_transport: - serial_to_sysid[self.serial_transport.serial_port] = sysid - serial_last_mavlink_time[self.serial_transport.serial_port] = time.time() - self.serial_transport.send_at_command_db() - - # === 新增:計算丟包率 === - try: - loss_rate = calculate_packet_loss(sysid, compid, seq) - # print(f"[SYSID:{sysid}] 丟包率: {loss_rate:.1f}%, SEQ: {seq}") - except Exception as e: - print(f"[UDP:{self.udp_port}] 丟包率計算錯誤: {e}") - - except Exception as e: - print(f"[UDP:{self.udp_port} MAVLink Decode Error] {e}") -def start_plotting(): - # 創建子圖:上方RSSI,下方丟包率 - fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8)) + # MAVLink sysid 用於 packet loss 計算;XBee src64 用於 RSSI 歸屬。 + if src64 is not None: + learn_xbee_source(sysid, src64) + + calculate_packet_loss(sysid, compid, seq) + except Exception: + pass + + if self.transport: + self.transport.sendto(rf_data, (UDP_REMOTE_IP, self.udp_port)) + + return None + + +async def setup_bridge(config): + port, udp = config['serial_port'], config['udp_port'] + try: + ser = serial.Serial(port, SERIAL_BAUDRATE) + ser.close() + except Exception: + print(f"[{port}] Serial open failed, skip.") + return None + + loop = asyncio.get_running_loop() + udp_handler = UDPHandler(udp) + await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0)) + + serial_proto = SerialToUDP(udp_handler, port) + await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE) + return serial_proto + + +async def tdma_scheduler(serial_protocols): + print(f"Packet-size TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口") + + for sysid in ACTIVE_SYSIDS: + tdma_done_events[sysid] = asyncio.Event() + + while True: + # GCS 下行:每輪小批量送出,不再固定空等 100 ms + for sp in serial_protocols: + if hasattr(sp, 'flush_gcs_queue'): + sp.flush_gcs_queue() + await asyncio.sleep(current_guard_ms / 1000.0) + + # UAV 上行:以 byte quota 授權,不以固定 time slot 等待 + for sysid in ACTIVE_SYSIDS: + state = uav_states[sysid] + + if state['mode'] == 'INITIALIZING': + for _ in range(4): + await grant_one_uav(serial_protocols, sysid, INIT_GRANT_BYTES) + else: + await grant_one_uav(serial_protocols, sysid, current_grant_bytes) + + +async def async_main(): + protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS)) + valid_protocols = [p for p in protocols if p is not None] + if valid_protocols: + asyncio.create_task(tdma_scheduler(valid_protocols)) + else: + print('No valid serial ports found.') + await asyncio.Future() + + +# === GUI 介面區 === + +def start_gui(): + root = tk.Tk() + root.title('UAV Packet-size TDMA Control Station - RSSI avg5') + root.geometry('1300x880') + + # --- 左側控制面板 --- + control_frame = tk.Frame(root, width=350, bg='#f0f0f0', padx=20, pady=20) + control_frame.pack(side=tk.LEFT, fill=tk.Y) + + tk.Label(control_frame, text='Packet-size TDMA 控制', font=('Arial', 14, 'bold'), bg='#f0f0f0').pack(pady=10) + + grant_var = tk.IntVar(value=current_grant_bytes) + + def on_grant_change(val): + global current_grant_bytes + current_grant_bytes = int(float(val)) + grant_label.config(text=f'每台授權: {current_grant_bytes} bytes') + + grant_slider = ttk.Scale(control_frame, from_=100, to_=1800, orient='horizontal', variable=grant_var, command=on_grant_change) + grant_slider.pack(fill=tk.X, pady=10) + grant_label = tk.Label(control_frame, text=f'每台授權: {current_grant_bytes} bytes', bg='#f0f0f0') + grant_label.pack() + + guard_var = tk.IntVar(value=current_guard_ms) + + def on_guard_change(val): + global current_guard_ms + current_guard_ms = int(float(val)) + guard_label.config(text=f'切換保護時間: {current_guard_ms} ms') + + tk.Label(control_frame, text='Guard Time', font=('Arial', 11, 'bold'), bg='#f0f0f0').pack(pady=(20, 0)) + guard_slider = ttk.Scale(control_frame, from_=0, to_=100, orient='horizontal', variable=guard_var, command=on_guard_change) + guard_slider.pack(fill=tk.X, pady=10) + guard_label = tk.Label(control_frame, text=f'切換保護時間: {current_guard_ms} ms', bg='#f0f0f0') + guard_label.pack() + + tk.Label(control_frame, text='群機初始化控制 (INIT)', font=('Arial', 14, 'bold'), bg='#f0f0f0').pack(pady=(30, 10)) - def update(frame): + init_var = tk.IntVar(value=0) + radios = {} + status_labels = {} + + def on_radio_change(): + selected = init_var.get() + for sysid in ACTIVE_SYSIDS: + if sysid == selected: + uav_states[sysid]['mode'] = 'INITIALIZING' + else: + uav_states[sysid]['mode'] = 'NORMAL' + + tk.Radiobutton(control_frame, text='全體 NORMAL', variable=init_var, value=0, + command=on_radio_change, bg='#f0f0f0', font=('Arial', 11, 'bold'), fg='blue').pack(anchor=tk.W, pady=(0, 10)) + + for sysid in ACTIVE_SYSIDS: + frame = tk.Frame(control_frame, bg='#f0f0f0') + frame.pack(fill=tk.X, pady=5) + + rb = tk.Radiobutton(frame, text=f'SYSID {sysid} 專屬載入', variable=init_var, value=sysid, + command=on_radio_change, bg='#f0f0f0', font=('Arial', 11)) + rb.pack(side=tk.LEFT) + radios[sysid] = rb + + lbl = tk.Label(frame, text='NORMAL', font=('Arial', 10, 'bold'), fg='green', bg='#f0f0f0') + lbl.pack(side=tk.RIGHT, padx=5) + status_labels[sysid] = lbl + + is_locked = False + + def toggle_tdma_mode(): + nonlocal is_locked + is_locked = not is_locked + + if is_locked: + init_var.set(0) + on_radio_change() + for rb in radios.values(): + rb.config(state=tk.DISABLED) + lock_btn.config(text='點擊解鎖 (允許重新下載參數)', bg='orange') + print('已鎖定進入純 Packet-size TDMA 模式') + else: + for rb in radios.values(): + rb.config(state=tk.NORMAL) + lock_btn.config(text='參數載入完畢,鎖定進入 TDMA', bg='#4CAF50') + print('已解除鎖定,可重新分配特權') + + lock_btn = tk.Button(control_frame, text='參數載入完畢,鎖定進入 TDMA', + font=('Arial', 12, 'bold'), bg='#4CAF50', fg='white', + command=toggle_tdma_mode) + lock_btn.pack(pady=25, fill=tk.X) + + report_title = tk.Label(control_frame, text='最近 DONE 回報', font=('Arial', 12, 'bold'), bg='#f0f0f0') + report_title.pack(pady=(10, 5)) + report_labels = {} + for sysid in ACTIVE_SYSIDS: + lbl = tk.Label(control_frame, text=f'SYSID {sysid}: sent=0, remain=0', font=('Arial', 10), bg='#f0f0f0') + lbl.pack(anchor=tk.W) + report_labels[sysid] = lbl + + rssi_title = tk.Label(control_frame, text=f'RSSI 最近 {RSSI_AVG_WINDOW} 次平均', font=('Arial', 12, 'bold'), bg='#f0f0f0') + rssi_title.pack(pady=(18, 5)) + rssi_labels = {} + for sysid in ACTIVE_SYSIDS: + lbl = tk.Label(control_frame, text=f'SYSID {sysid}: RSSI avg -- dBm', font=('Arial', 10), bg='#f0f0f0', justify=tk.LEFT) + lbl.pack(anchor=tk.W) + rssi_labels[sysid] = lbl + + def update_status_gui(): + now = time.time() + for sysid, lbl in status_labels.items(): + mode = uav_states[sysid]['mode'] + if mode == 'INITIALIZING': + lbl.config(text='INIT', fg='orange') + else: + lbl.config(text='NORMAL', fg='green') + + for sysid, lbl in report_labels.items(): + rep = tdma_last_reports[sysid] + age = now - rep['time'] if rep['time'] else 999.0 + age_text = f'{age:.1f}s ago' if age < 99 else 'no report' + lbl.config(text=f"SYSID {sysid}: sent={rep['sent_len']}, remain={rep['remain_len']} ({age_text})") + + for sysid, lbl in rssi_labels.items(): + st = rssi_latest_stats[sysid] + if st['avg'] is None: + lbl.config(text=f'SYSID {sysid}: RSSI avg -- dBm') + else: + age = now - st['time'] + addr_text = format_addr64(st['src64']) + lbl.config(text=( + f"SYSID {sysid}: avg={st['avg']:.1f} dBm, raw={st['raw']} dBm, " + f"n={st['count']} ({age:.1f}s)\n src64={addr_text}" + )) + + root.after(500, update_status_gui) + + update_status_gui() + + # --- 右側圖表區 --- + plot_frame = tk.Frame(root) + plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) + + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9.5, 8.3), dpi=100) + canvas = FigureCanvasTkAgg(fig, master=plot_frame) + canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True) + + def update_plot(frame): ax1.clear() ax2.clear() - - # === RSSI 圖 === - ax1.set_title("RSSI", fontsize=14) - ax1.set_xlabel("Time (s ago)") - ax1.set_ylabel("RSSI (dBm)") - ax1.set_xlim(30, 0) # 顯示最近10秒 + ax1.set_title(f'RSSI avg{RSSI_AVG_WINDOW} from XBee ATDB after DONE', fontsize=12) + ax1.set_xlim(10, 0) ax1.set_ylim(-100, -10) ax1.grid(True, alpha=0.3) - # === 丟包率圖 === - ax2.set_title("Packet Loss Rate", fontsize=14) - ax2.set_xlabel("Time (s ago)") - ax2.set_ylabel("Loss Rate (%)") - ax2.set_xlim(10, 0) + ax2.set_title('Packet Loss Rate (5s Window)', fontsize=12) + ax2.set_xlim(10, 0) ax2.set_ylim(0, 100) ax2.grid(True, alpha=0.3) now = time.time() - - # 顏色列表 - colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown', 'pink', 'gray'] - - for i, sysid in enumerate(sorted(rssi_history.keys())): + colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown'] + + try: + sysids = sorted(list(set(list(rssi_history.keys()) + list(packet_loss_history.keys()) + ACTIVE_SYSIDS))) + except RuntimeError: + return + + loss_labels = [] + + for i, sysid in enumerate(sysids): color = colors[i % len(colors)] - - # === 繪製 RSSI === - rssi_recent_indices = [ - idx for idx, ts in enumerate(time_history[sysid]) if now - ts <= 30 - ] - if rssi_recent_indices: - rssi_t = [now - time_history[sysid][idx] for idx in rssi_recent_indices] - rssi_r = [rssi_history[sysid][idx] for idx in rssi_recent_indices] - if rssi_t and rssi_r: - ax1.plot(rssi_t, rssi_r, label=f"SYSID:{sysid}", color=color, linewidth=2) - - # 在RSSI圖上顯示當前數值 - if rssi_r: - latest_rssi = rssi_r[-1] - ax1.text(2, latest_rssi, f'{latest_rssi:.0f}dBm', - bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7), - fontsize=10, fontweight='bold', color='white') - - # === 繪製丟包率 === - if sysid in packet_loss_history: - loss_recent_indices = [ - idx for idx, ts in enumerate(packet_loss_time_history[sysid]) if now - ts <= 10 - ] - if loss_recent_indices: - loss_t = [now - packet_loss_time_history[sysid][idx] for idx in loss_recent_indices] - loss_r = [packet_loss_history[sysid][idx] for idx in loss_recent_indices] - if loss_t and loss_r: - ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, linewidth=2, marker='o', markersize=3) - - # 在丟包率圖上顯示當前數值和統計 - if loss_r: - latest_loss = loss_r[-1] - stats = packet_loss_stats[sysid] - - # 顯示丟包率百分比 - ax2.text(2, latest_loss + (i * 8), f'{latest_loss:.1f}%', - bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7), - fontsize=10, fontweight='bold', color='white') - - # 在圖表左側顯示詳細統計 - stats_text = f"SYSID {sysid}:\nRecieved: {stats['total_received']}\nLoss: {stats['total_lost']}\nLoss Rate: {latest_loss:.1f}%" - ax2.text(28, 85 - (i * 20), stats_text, - bbox=dict(boxstyle="round,pad=0.5", facecolor=color, alpha=0.8), - fontsize=9, color='white', verticalalignment='top') - - # 設置圖例 - ax1.legend(loc="upper right", fontsize=10) - ax2.legend(loc="upper right", fontsize=10) - - ani = animation.FuncAnimation(fig, update, interval=1000) - plt.tight_layout() - plt.show() + try: + t_hist = list(rssi_time_history[sysid]) + r_hist = list(rssi_history[sysid]) + lt_hist = list(packet_loss_time_history.get(sysid, [])) + l_hist = list(packet_loss_history.get(sysid, [])) + except RuntimeError: + continue -async def setup_bridge(config): - serial_port = config["serial_port"] - udp_port = config["udp_port"] - try: - ser = serial.Serial(serial_port, SERIAL_BAUDRATE) - ser.close() - except Exception as e: - print(f"[{serial_port}] 串口打開失敗: {e}") - return - loop = asyncio.get_running_loop() - udp_handler = UDPHandler(udp_port) - try: - await loop.create_datagram_endpoint( - lambda: udp_handler, - local_addr=('0.0.0.0', 0) - ) - except Exception as e: - print(f"[{serial_port}] 無法創建 UDP: {e}") - return - try: - serial_proto = SerialToUDP(udp_handler, serial_port) - await serial_asyncio.create_serial_connection( - loop, lambda: serial_proto, serial_port, baudrate=SERIAL_BAUDRATE - ) - except Exception as e: - print(f"[{serial_port}] 無法建立串口連接:{e}") - traceback.print_exc() - return - print(f"[{serial_port}] Serial <--> UDP:{udp_port} bridge ready.") + rssi_recent = [idx for idx, ts in enumerate(t_hist) if now - ts <= 10] + if rssi_recent: + ax1.plot([now - t_hist[idx] for idx in rssi_recent], + [r_hist[idx] for idx in rssi_recent], + label=f'SYSID:{sysid}', color=color, marker='o', markersize=3) + + loss_recent = [idx for idx, ts in enumerate(lt_hist) if now - ts <= 10] + if loss_recent: + loss_t = [now - lt_hist[idx] for idx in loss_recent] + loss_r = [l_hist[idx] for idx in loss_recent] + ax2.plot(loss_t, loss_r, label=f'SYSID:{sysid}', color=color, marker='o', markersize=3) + + if loss_r: + loss_labels.append({ + 'sysid': sysid, + 'y_real': loss_r[-1], + 'x_real': loss_t[-1], + 'color': color + }) + + if loss_labels: + loss_labels = sorted(loss_labels, key=lambda k: k['y_real']) + min_gap = 12.0 + y_positions = [lbl['y_real'] for lbl in loss_labels] + + for j in range(1, len(y_positions)): + if y_positions[j] - y_positions[j - 1] < min_gap: + y_positions[j] = y_positions[j - 1] + min_gap + + if y_positions[-1] > 90: + shift = y_positions[-1] - 90 + y_positions = [y - shift for y in y_positions] + + for j, lbl in enumerate(loss_labels): + sysid = lbl['sysid'] + color = lbl['color'] + real_y = lbl['y_real'] + text_y = y_positions[j] + + ax2.text(0.5, text_y, f'ID:{sysid} ({real_y:.1f}%)', + bbox=dict(boxstyle='round,pad=0.3', facecolor=color, alpha=0.8), + fontsize=10, fontweight='bold', color='white', + horizontalalignment='right', verticalalignment='center') + + if abs(real_y - text_y) > 1.0: + ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6) + + ax1.legend(loc='upper left') + ax2.legend(loc='upper left') + + ani = animation.FuncAnimation(fig, update_plot, interval=1000) + + def on_closing(): + root.quit() + root.destroy() + + root.protocol('WM_DELETE_WINDOW', on_closing) + root.mainloop() -async def main(): - tasks = [setup_bridge(cfg) for cfg in CONFIGS] - await asyncio.gather(*tasks) - await asyncio.Future() if __name__ == '__main__': - try: - threading.Thread(target=lambda: asyncio.run(main()), daemon=True).start() - start_plotting() - except KeyboardInterrupt: - print("使用者中斷程式") - except Exception as e: - print("程式執行錯誤:") - traceback.print_exc() \ No newline at end of file + threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start() + start_gui()