esp32 不寫死sysid

lunu
lenting89 2 weeks ago
parent 0e7070b412
commit ea7774eb9e

@ -5,30 +5,72 @@ import gc
# =========================================================
# ESP32 / MicroPythonUAV 端 XBee <-> Flight Controller Bridge
# Packet-size TDMA + DONE 標籤
# Packet-size TDMA + DONE + SYSID / GCS address 自動學習
#
# 核心行為:
# 1. FC -> ESP32持續把 MAVLink bytes 存進 tx_buf。
# 2. GCS -> ESP32收到 POLL + SYSID + quota_bytes 才允許上行傳送。
# 3. ESP32 -> GCS每次最多送 quota_bytes但只送「完整 MAVLink frame」。
# 4. ESP32 -> GCS送完後獨立送 DONE reportGCS 收到後可立即換下一台 UAV。
# 對應新版 GCS discovery 設計:
# 1. GCS 開機前 30 秒自動送 DISC。
# 2. GCS 之後停止自動 DISC。
# 3. 若後續新增 UAV可在 GCS GUI 按 Scan UAV送 DSCF 強制掃描。
#
# 注意DONE 不是接在 MAVLink bytes 後面,而是獨立 RF payload避免污染 MAVLink stream。
# ESP32 端核心行為:
# 1. FC -> ESP32持續讀 MAVLink bytes存進 tx_buf。
# 2. ESP32 從 FC MAVLink frame header 自動學自己的 MY_SYSID。
# 3. ESP32 收到 GCS 的 DISC / DSCF / POLL 時,從 XBee 0x90 src64 自動學 DEST_64。
# 4. 收到 DISC
# 同一個 GCS 只回一次 HELO避免 HELO 一直洗版。
# 5. 收到 DSCF
# 強制回 HELO給 GCS 手動重新掃描用。
# 6. 收到 POLL + SYSID + quota_bytes
# 若 target_sysid == MY_SYSID才根據 quota_bytes 傳完整 MAVLink frames。
# 7. 傳完本輪資料後,獨立送 DONE report。
#
# 注意:
# - DONE / HELO 都是獨立 RF payload不會接在 MAVLink stream 後面。
# - MY_SYSID 不寫死,由飛控 MAVLink 自動學。
# - DEST_64 不寫死,由 GCS 封包的 XBee 0x90 src64 自動學。
# =========================================================
# ================= 設定區 =================
FC_BAUDRATE = 115200
XB_BAUDRATE = 115200
# 這裡要改成你的地面站 / coordinator XBee 64-bit address
DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8'
# 不寫死地面站 XBee address。
# 收到 GCS 的 DISC / DSCF / POLL / 下行 MAVLink 時,
# 從 XBee 0x90 frame 的 src64 自動學到。
DEST_64 = None
# 不寫死每台 UAV 的 SYSID。
# 會從 FC -> ESP32 的 MAVLink frame header 自動學到。
MY_SYSID = None
# 為了避免雜訊誤判 SYSID要求連續確認幾次相同 SYSID 才正式採用。
SYSID_LEARN_CONFIRM_COUNT = 3
_sysid_candidate = None
_sysid_candidate_count = 0
# XBee API Transmit Request 內每次 RF payload 最大切片。
# 900HP / DigiMesh 實測常用 80~100 bytes 較穩。
XBEE_MAX_PAYLOAD = 100
# 每台 UAV 都要設定成自己的 SYSID
MY_SYSID = 15 # 第二台可改 10第三台可改 15依你的 MAVLink SYSID 設定
# GCS -> UAV discovery
# DISC一般 discovery同一個 GCS 只回一次 HELO。
# DSCFforce discovery收到後強制回 HELO給 GUI Scan UAV 按鍵用。
DISC_MAGIC = b'DISC'
DISC_FORCE_MAGIC = b'DSCF'
# UAV -> GCS hello
# HELO + sysid(1)
HELLO_MAGIC = b'HELO'
# 一般 HELO 最小間隔,避免短時間內重複回太多。
HELLO_MIN_INTERVAL_MS = 1000
_last_hello_ms = 0
# 對同一個 GCS 是否已經送過 HELO。
# 收到 DISC 時,如果 HELLO_SENT=True就不再回 HELO。
# 收到 DSCF 時,不管 HELLO_SENT都強制回 HELO。
HELLO_SENT = False
# GCS -> UAV poll 格式:
# 舊版b'POLL' + sysid(1)
@ -43,21 +85,86 @@ DONE_MAGIC = b'DONE'
# ESP32 RAM 有限,不要讓 FC 資料無限累積
MAX_BUF_SIZE = 6144
READ_FC_BYTES = 250
# UART 腳位。若你的接線不同,只要改這裡。
FC_UART_ID = 1
FC_TX_PIN = 32
FC_RX_PIN = 33
XB_UART_ID = 2
XB_TX_PIN = 25
XB_RX_PIN = 26
# =========================================
uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096)
uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096)
uart_fc = UART(FC_UART_ID, baudrate=FC_BAUDRATE, tx=FC_TX_PIN, rx=FC_RX_PIN, rxbuf=4096)
uart_xb = UART(XB_UART_ID, baudrate=XB_BAUDRATE, tx=XB_TX_PIN, rx=XB_RX_PIN, rxbuf=4096)
tx_buf = bytearray() # FC -> GCS 等待 TDMA poll 的 MAVLink stream
rx_buf = bytearray() # XBee API frame parser buffer
# =========================================================
# 基本工具函式
# =========================================================
def get_checksum(data):
return 0xFF - (sum(data) & 0xFF)
def is_valid_addr64(addr):
if addr is None:
return False
if len(addr) != 8:
return False
if addr == b'\x00\x00\x00\x00\x00\x00\x00\x00':
return False
return True
def learn_dest64_from_src64(src64):
"""
UAV 收到 GCS XBee 0x90 frame
0x90 裡面的 src64 就是 GCS / coordinator XBee 64-bit address
若第一次學到 GCS address
DEST_64 = src64
HELLO_SENT = False
若後來收到不同 src64
視為地面站 XBee 改變更新 DEST_64並允許重新回 HELO
"""
global DEST_64
global HELLO_SENT
if not is_valid_addr64(src64):
return False
src64 = bytes(src64)
if DEST_64 is None:
DEST_64 = src64
HELLO_SENT = False
return True
if DEST_64 != src64:
DEST_64 = src64
HELLO_SENT = False
return True
return False
def build_api_tx_frame(payload):
"""建立 XBee API frame type 0x10把 payload 送到 DEST_64。"""
"""
建立 XBee API frame type 0x10 payload 送到 DEST_64
DEST_64 尚未學到回傳 None
"""
global DEST_64
if DEST_64 is None:
return None
frame_content = bytearray()
frame_content.append(0x10) # Transmit Request
frame_content.append(0x00) # Frame ID = 0不要求 XBee ACK降低延遲
@ -68,17 +175,25 @@ def build_api_tx_frame(payload):
frame_content.extend(payload)
length = len(frame_content)
packet = bytearray()
packet.append(0x7E)
packet.append((length >> 8) & 0xFF)
packet.append(length & 0xFF)
packet.extend(frame_content)
packet.append(get_checksum(frame_content))
return packet
def send_to_xbee_chunked(payload):
"""把 payload 依 XBEE_MAX_PAYLOAD 切成多個 XBee API TX frame 送出。"""
"""
payload XBEE_MAX_PAYLOAD 切成多個 XBee API TX frame 送出
DEST_64 尚未學到直接不送回傳 False
"""
if DEST_64 is None:
return False
total_len = len(payload)
sent_len = 0
@ -86,22 +201,95 @@ def send_to_xbee_chunked(payload):
end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len)
chunk = payload[sent_len:end_len]
sent_len = end_len
uart_xb.write(build_api_tx_frame(chunk))
pkt = build_api_tx_frame(chunk)
if pkt is None:
return False
uart_xb.write(pkt)
# 很短的讓步,避免 ESP32 UART/XBee 本地 buffer 瞬間塞爆
time.sleep_ms(1)
return True
# =========================================================
# HELO / DONE 回報
# =========================================================
def send_hello_report(force=False):
"""
UAV 回報自己存在
HELO + MY_SYSID
force=False
HELLO_SENT=True代表同一個 GCS 已經註冊過不再回 HELO
force=True
不管 HELLO_SENT都重新回 HELO
用於 GCS GUI Scan UAV DSCF
"""
global _last_hello_ms
global HELLO_SENT
if MY_SYSID is None:
return False
if DEST_64 is None:
return False
if HELLO_SENT and not force:
return False
now = time.ticks_ms()
if not force:
if time.ticks_diff(now, _last_hello_ms) < HELLO_MIN_INTERVAL_MS:
return False
payload = HELLO_MAGIC + struct.pack('>B', MY_SYSID)
pkt = build_api_tx_frame(payload)
if pkt is None:
return False
uart_xb.write(pkt)
_last_hello_ms = now
HELLO_SENT = True
time.sleep_ms(1)
return True
def send_done_report(sent_len, remain_len):
"""送出 TDMA DONE 標籤。此封包不會轉給飛控,只給 GCS scheduler 使用。"""
"""
送出 TDMA DONE 標籤
此封包不會轉給飛控只給 GCS scheduler 使用
DONE payload:
b'DONE' + sysid(1) + sent_len(2) + remain_len(2)
"""
if MY_SYSID is None:
return False
if DEST_64 is None:
return False
if sent_len > 65535:
sent_len = 65535
if remain_len > 65535:
remain_len = 65535
payload = DONE_MAGIC + struct.pack('>BHH', MY_SYSID, sent_len, remain_len)
uart_xb.write(build_api_tx_frame(payload))
pkt = build_api_tx_frame(payload)
if pkt is None:
return False
uart_xb.write(pkt)
time.sleep_ms(1)
return True
# =========================================================
# MAVLink frame 解析與 MY_SYSID 自動學習
# =========================================================
def find_first_mavlink_magic(buf):
"""找 MAVLink v1/v2 magic0xFE 或 0xFD。"""
@ -122,6 +310,7 @@ def mavlink_frame_length(buf, start_idx):
MAVLink v1:
magic 0xFE, payload_len at byte 1, total = payload_len + 8
MAVLink v2:
magic 0xFD, payload_len at byte 1, incompat_flags at byte 2,
total = payload_len + 12 signed +13
@ -132,7 +321,6 @@ def mavlink_frame_length(buf, start_idx):
magic = buf[start_idx]
if magic == 0xFE:
# v1 至少需要 magic + len
if len(buf) - start_idx < 2:
return None
payload_len = buf[start_idx + 1]
@ -142,7 +330,6 @@ def mavlink_frame_length(buf, start_idx):
return total_len
if magic == 0xFD:
# v2 至少需要 magic + len + incompat_flags
if len(buf) - start_idx < 3:
return None
payload_len = buf[start_idx + 1]
@ -156,6 +343,90 @@ def mavlink_frame_length(buf, start_idx):
return None
def get_mavlink_sysid(buf, start_idx):
"""
MAVLink frame header 讀出 SYSID
不做 CRC 驗證但會搭配多次確認降低誤判
"""
if start_idx >= len(buf):
return None
magic = buf[start_idx]
# MAVLink v1:
# 0xFE | len | seq | sysid | compid | msgid | payload | checksum
if magic == 0xFE:
if len(buf) - start_idx >= 6:
return buf[start_idx + 3]
# MAVLink v2:
# 0xFD | len | incompat | compat | seq | sysid | compid | msgid(3) | payload | checksum
if magic == 0xFD:
if len(buf) - start_idx >= 10:
return buf[start_idx + 5]
return None
def learn_my_sysid_from_tx_buf():
"""
FC -> ESP32 tx_buf 中找完整 MAVLink frame
並自動學習飛控的 MAVLink SYSID
為避免雜訊造成誤判要求連續多次看到相同 SYSID 才正式設定 MY_SYSID
"""
global MY_SYSID
global _sysid_candidate
global _sysid_candidate_count
global HELLO_SENT
global tx_buf
if len(tx_buf) == 0:
return
idx = 0
checked = 0
while idx < len(tx_buf) and checked < 8:
sub = tx_buf[idx:]
rel_start = find_first_mavlink_magic(sub)
if rel_start == -1:
return
start = idx + rel_start
frame_len = mavlink_frame_length(tx_buf, start)
if frame_len is None:
return
sysid = get_mavlink_sysid(tx_buf, start)
if sysid is not None and sysid > 0:
if MY_SYSID is not None:
# 正常情況不任意改變已學到的 SYSID。
# 如果你真的會在飛行中改 FC SYSID再另行設計重學條件。
return
if _sysid_candidate == sysid:
_sysid_candidate_count += 1
else:
_sysid_candidate = sysid
_sysid_candidate_count = 1
if _sysid_candidate_count >= SYSID_LEARN_CONFIRM_COUNT:
MY_SYSID = _sysid_candidate
# 新學到 SYSID 後,允許對已知 GCS 回一次 HELO
HELLO_SENT = False
return
idx = start + frame_len
checked += 1
# =========================================================
# tx_buf MAVLink frame 取出與裁切
# =========================================================
def pop_mavlink_frames_by_quota(quota_bytes):
"""
tx_buf 取出不超過 quota_bytes 的完整 MAVLink frames
@ -173,6 +444,7 @@ def pop_mavlink_frames_by_quota(quota_bytes):
if start == -1:
tx_buf = bytearray()
return b''
if start > 0:
tx_buf = tx_buf[start:]
@ -212,7 +484,9 @@ def pop_mavlink_frames_by_quota(quota_bytes):
def trim_tx_buffer_if_needed():
"""tx_buf 過大時,丟掉較舊的完整 MAVLink frames保留較新的資料。"""
"""
tx_buf 過大時丟掉較舊的完整 MAVLink frames保留較新的資料
"""
global tx_buf
if len(tx_buf) <= MAX_BUF_SIZE:
@ -225,6 +499,7 @@ def trim_tx_buffer_if_needed():
if start == -1:
tx_buf = bytearray()
return
if start > 0:
tx_buf = tx_buf[start:]
continue
@ -249,6 +524,13 @@ def flush_tx_buffer(grant_bytes):
"""
global tx_buf
# 尚未學到 SYSID 或地面站地址,不進行上行傳送。
if MY_SYSID is None:
return
if DEST_64 is None:
return
if grant_bytes <= 0:
send_done_report(0, len(tx_buf))
return
@ -262,11 +544,16 @@ def flush_tx_buffer(grant_bytes):
send_done_report(sent_len, len(tx_buf))
# =========================================================
# GCS control payload 解析
# =========================================================
def parse_poll_payload(real_data):
"""
支援兩種 poll
舊版POLL + sysid 5 bytes
新版POLL + sysid + quota 7 bytes
回傳 (target_sysid, grant_bytes)不是 poll 則回傳 (None, None)
"""
if not real_data.startswith(POLL_MAGIC):
@ -283,6 +570,27 @@ def parse_poll_payload(real_data):
return None, None
def is_discovery_payload(real_data):
"""
一般 discovery
b'DISC'
"""
return real_data.startswith(DISC_MAGIC)
def is_force_discovery_payload(real_data):
"""
強制 discovery
b'DSCF'
用於 GCS GUI Scan UAV 按鍵
"""
return real_data.startswith(DISC_FORCE_MAGIC)
# =========================================================
# XBee API RX parser
# =========================================================
def process_xbee_buffer():
global rx_buf
@ -313,45 +621,115 @@ def process_xbee_buffer():
checksum_recv = rx_buf[3 + pkt_len]
if get_checksum(packet) == checksum_recv:
if packet[0] == 0x90:
# XBee Receive Packet 0x90RF data 從 packet[12:] 開始
if len(packet) > 0 and packet[0] == 0x90:
# XBee Receive Packet 0x90:
# packet = 90 | src64(8) | src16(2) | options(1) | RF data
if len(packet) >= 12:
src64 = bytes(packet[1:9])
real_data = packet[12:]
# -------------------------------------------------
# GCS 一般 DISC
# 同一個 GCS 只回一次 HELO避免一直 HELO。
# -------------------------------------------------
if is_discovery_payload(real_data):
learn_dest64_from_src64(src64)
learn_my_sysid_from_tx_buf()
if MY_SYSID is not None and DEST_64 is not None:
# 加一點由 SYSID / 時間產生的延遲,降低多機同時 HELO 撞包機率。
# 若 HELLO_SENT=Truesend_hello_report() 會自己擋掉。
delay_ms = 20 + (((MY_SYSID * 37) + (time.ticks_ms() & 0xFF)) % 180)
time.sleep_ms(delay_ms)
send_hello_report(force=False)
# -------------------------------------------------
# GCS 強制 DSCF
# 不管 HELLO_SENT重新回 HELO。
# 用於 GCS GUI Scan UAV 按鍵。
# -------------------------------------------------
elif is_force_discovery_payload(real_data):
learn_dest64_from_src64(src64)
learn_my_sysid_from_tx_buf()
if MY_SYSID is not None and DEST_64 is not None:
delay_ms = 20 + (((MY_SYSID * 37) + (time.ticks_ms() & 0xFF)) % 180)
time.sleep_ms(delay_ms)
send_hello_report(force=True)
else:
# -------------------------------------------------
# 判斷是不是 POLL
# -------------------------------------------------
target_sysid, grant_bytes = parse_poll_payload(real_data)
if target_sysid is not None:
if target_sysid == MY_SYSID:
# 收到 GCS 的 POLL也可以從 src64 學地面站地址。
learn_dest64_from_src64(src64)
# 嘗試從 FC buffer 學 MY_SYSID。
learn_my_sysid_from_tx_buf()
# 只有 target_sysid 等於自己飛控的 SYSID才回傳。
if MY_SYSID is not None and target_sysid == MY_SYSID:
flush_tx_buffer(grant_bytes)
else:
# -------------------------------------------------
# 一般 GCS -> FC MAVLink 下行資料
# 只接受來自已知 GCS address 的資料,避免其他節點污染飛控。
# -------------------------------------------------
if DEST_64 is None:
learn_dest64_from_src64(src64)
if DEST_64 is not None and src64 == DEST_64:
uart_fc.write(real_data)
rx_buf = rx_buf[total_len:]
else:
# checksum 錯誤,丟掉第一個 byte重新對齊
rx_buf = rx_buf[1:]
# =========================================================
# 初始清理
gc_ran = False
# =========================================================
try:
gc.collect()
gc_ran = True
except Exception:
pass
# =========================================================
# 主迴圈
# =========================================================
while True:
try:
# -------------------------------------------------
# FC -> ESP32 tx buffer
# -------------------------------------------------
if uart_fc.any():
data = uart_fc.read(READ_FC_BYTES)
if data:
tx_buf.extend(data)
# 從飛控 MAVLink stream 自動學 MY_SYSID
learn_my_sysid_from_tx_buf()
# 避免 tx_buf 無限長大
trim_tx_buffer_if_needed()
# -------------------------------------------------
# XBee -> ESP32
# -------------------------------------------------
if uart_xb.any():
chunk = uart_xb.read()
if chunk:
rx_buf.extend(chunk)
process_xbee_buffer()
time.sleep_ms(1)
@ -360,7 +738,12 @@ while True:
# OOM 時丟掉暫存資料,避免整顆 ESP32 死掉
tx_buf = bytearray()
rx_buf = bytearray()
try:
gc.collect()
except Exception:
pass
time.sleep_ms(10)
except Exception:

@ -14,18 +14,27 @@ from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# =========================================================
# GCS 端XBee Serial <-> UDP Bridge
# Packet-size TDMA + DONE 標籤 + 精準 RSSI 歸屬 + 最近 5 次 RSSI 平均
# Packet-size TDMA + DONE + RSSI + Auto Discovery + Force Scan
#
# 對應 ESP32 端:
# - DISC一般 discovery。UAV 對同一個 GCS 只回一次 HELO。
# - DSCFforce discovery。GUI 按 Scan UAV 時送出UAV 強制回 HELO。
# - HELOUAV -> GCS回報 SYSID。
# - POLLGCS -> UAV授權某台 SYSID 傳送 quota_bytes。
# - DONEUAV -> GCS表示本輪資料已送完GCS 可立即換下一台。
#
# 核心行為:
# 1. GCS 對某個 SYSID 發 POLL + quota_bytes。
# 2. UAV 收到自己的 POLL 後,送出最多 quota_bytes 的完整 MAVLink frames。
# 3. UAV 送完後回 DONEGCS 收到 DONE 後立即 poll 下一台。
# 4. 收到 DONE 後GCS 立即發 ATDB 查詢本地 XBee 最近一包 RF packet 的 RSSI。
# 5. 因為 DONE payload 帶有 SYSID且 XBee 0x90 frame 帶有 src64RSSI 不再用 MAVLink SYSID 猜。
# 6. RSSI 顯示採用「最近 5 次 DB RSSI」平均值降低跳動。
# 1. 程式啟動前 AUTO_DISCOVERY_DURATION_SEC 秒自動 broadcast DISC。
# 2. 收到 HELO 後GCS 從 XBee 0x90 src64 學 UAV XBee address
# 從 HELO payload 學 MAVLink SYSID建立 SYSID <-> src64 對應。
# 3. TDMA scheduler 依 ACTIVE_SYSIDS 輪詢。
# 4. send_poll() 若已知 SYSID 對應 src64則 unicast未知則 fallback broadcast。
# 5. 收到 DONE 後立即查 ATDB將 RSSI 歸屬到該 SYSID。
# 6. GUI 的 Scan UAV 按鈕會送 DSCF burst讓新加入或已存在的 UAV 強制回 HELO。
# =========================================================
# === 多組設備設定 ===
# ================= 多組設備設定 =================
# Windows 測試時建議先只留你的實際 COM port例如 COM15。
CONFIGS = [
{"serial_port": "/dev/ttyUSB0", "udp_port": 14551},
@ -37,36 +46,43 @@ CONFIGS = [
SERIAL_BAUDRATE = 115200
UDP_REMOTE_IP = '127.0.0.1'
# Broadcast由 RF payload 內的 SYSID 決定哪台 ESP32 回應。
# 若你要讓外部程式固定送 UDP 到本 bridge可改成 True。
# False本程式只負責 sendto 到 udp_port本地接收 port 由系統隨機配置。
# True :本程式綁定 local UDP port = config['udp_port']。
UDP_LISTEN_FIXED_PORT = False
# XBee broadcast address
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF'
ACTIVE_SYSIDS = [3, 10, 15]
# 起始已知 SYSID。完全自動 discovery 可設空 list。
# 若你想保留舊版行為,可改成 [3, 10, 15]。
INITIAL_ACTIVE_SYSIDS = []
ACTIVE_SYSIDS = list(INITIAL_ACTIVE_SYSIDS)
# 如果你知道每台 UAV XBee 的 64-bit address可以填在這裡。
# 即使不填,本程式也會從 DONE frame 自動學習 src64 -> SYSID。
# 範例:
# XBEE_ADDR64_TO_SYSID = {
# b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8': 15,
# b'\x00\x13\xA2\x00\x42\x5B\x9D\xAA': 10,
# b'\x00\x13\xA2\x00\x42\x5B\x9D\xBB': 3,
# }
# 若你知道每台 UAV XBee 的 64-bit address可以預先填入。
# 即使不填,本程式也會從 HELO / DONE / MAVLink data 自動學習。
XBEE_ADDR64_TO_SYSID = {}
AUTO_LEARN_XBEE_ADDR = True
# GCS -> UAV poll 格式b'POLL' + sysid(1) + quota_bytes(2)
POLL_MAGIC = b'POLL'
# 控制封包 magic
DISC_MAGIC = b'DISC' # GCS -> UAV 一般 discovery
DISC_FORCE_MAGIC = b'DSCF' # GCS -> UAV 強制 discoveryGUI Scan UAV 用
HELLO_MAGIC = b'HELO' # UAV -> GCS: HELO + sysid(1)
POLL_MAGIC = b'POLL' # GCS -> UAV: POLL + sysid(1) + quota_bytes(2)
DONE_MAGIC = b'DONE' # UAV -> GCS: DONE + sysid(1) + sent_len(2) + remain_len(2)
# UAV -> GCS done 格式b'DONE' + sysid(1) + sent_len(2) + remain_len(2)
DONE_MAGIC = b'DONE'
# ================= Discovery 參數 =================
AUTO_DISCOVERY_ENABLED = True
AUTO_DISCOVERY_DURATION_SEC = 30.0
DISCOVERY_INTERVAL_SEC = 1.5
# === TDMA 參數 ===
# 每次 poll 授權該 UAV 最多送多少 MAVLink 原始資料 bytes。
current_grant_bytes = 600
# GUI 按 Scan UAV 時送出 DSCF burst
FORCE_DISCOVERY_BURST_COUNT = 5
FORCE_DISCOVERY_BURST_GAP_SEC = 0.20
# 切換下一台 UAV 前的保護時間,避免最後一包還在 XBee/UART buffer 裡。
# ================= TDMA 參數 =================
current_grant_bytes = 600
current_guard_ms = 20
# INITIALIZING 模式給較大的 quota用於參數載入。
INIT_GRANT_BYTES = 1200
# 丟包率計算時間視窗
@ -75,18 +91,17 @@ LOSS_TIME_WINDOW_SEC = 5.0
# RSSI 最近 N 次平均
RSSI_AVG_WINDOW = 5
# 是否也在資料封包時查 DB。預設 False只在 DONE 後查 DB較不干擾資料流。
# 是否也在資料封包時查 DB。預設 False只在 DONE / HELO 後查 DB較不干擾資料流。
RSSI_QUERY_ON_DATA_FRAME = False
RSSI_DATA_QUERY_MIN_INTERVAL_SEC = 0.5
uav_states = {
sysid: {
"mode": "NORMAL",
} for sysid in ACTIVE_SYSIDS
}
# === 狀態追蹤 ===
rssi_history = defaultdict(lambda: deque(maxlen=5000)) # 畫圖用RSSI avg5
# ================= 全域狀態 =================
uav_states = {}
for sid in ACTIVE_SYSIDS:
uav_states[sid] = {"mode": "NORMAL"}
rssi_history = defaultdict(lambda: deque(maxlen=5000)) # 畫圖用RSSI avgN
rssi_raw_windows = defaultdict(lambda: deque(maxlen=RSSI_AVG_WINDOW))
rssi_time_history = defaultdict(lambda: deque(maxlen=5000))
rssi_latest_stats = defaultdict(lambda: {
@ -110,7 +125,17 @@ learned_sysid_to_addr64 = {}
tdma_done_events = {}
tdma_last_reports = defaultdict(lambda: {'time': 0.0, 'sent_len': 0, 'remain_len': 0})
# === 工具函式 ===
# HELO 回報紀錄
hello_last_reports = defaultdict(lambda: {'time': 0.0, 'src64': None})
# asyncio loop / serial protocols 給 GUI button 使用
ASYNC_LOOP = None
SERIAL_PROTOCOLS = []
# =========================================================
# 工具函式
# =========================================================
def format_addr64(addr):
if not addr:
@ -118,6 +143,29 @@ def format_addr64(addr):
return ''.join(f'{b:02X}' for b in addr)
def ensure_active_sysid(sysid):
"""若發現新的 SYSID加入 ACTIVE_SYSIDS 並建立狀態。"""
if sysid is None:
return
if sysid <= 0:
return
if sysid not in ACTIVE_SYSIDS:
ACTIVE_SYSIDS.append(sysid)
ACTIVE_SYSIDS.sort()
print(f"[DISCOVERY] New SYSID discovered: {sysid}")
if sysid not in uav_states:
uav_states[sysid] = {"mode": "NORMAL"}
if sysid not in tdma_done_events:
try:
tdma_done_events[sysid] = asyncio.Event()
except RuntimeError:
# 若從 GUI thread 呼叫時沒有 running loopscheduler 之後會補建。
pass
def infer_sysid_from_addr64(src64):
if not src64:
return None
@ -129,14 +177,37 @@ def infer_sysid_from_addr64(src64):
def learn_xbee_source(sysid, src64):
"""
建立 UAV XBee src64 <-> MAVLink SYSID 對應
GCS 端收到 UAV 0x90 frame src64 UAV XBee address
"""
if not AUTO_LEARN_XBEE_ADDR:
return
if sysid is None or src64 is None:
return
if sysid <= 0:
return
src64 = bytes(src64)
learned_addr64_to_sysid[src64] = sysid
learned_sysid_to_addr64[sysid] = src64
ensure_active_sysid(sysid)
def get_poll_dest_addr64(sysid):
"""
已知 SYSID -> src64 優先 unicast
未知時 fallback broadcast
"""
if sysid in learned_sysid_to_addr64:
return learned_sysid_to_addr64[sysid]
# 使用者預填的 XBEE_ADDR64_TO_SYSID 反查
for addr, sid in XBEE_ADDR64_TO_SYSID.items():
if sid == sysid:
return addr
return TARGET_ADDR64
def record_rssi(sysid, rssi_positive_db, src64=None):
@ -147,6 +218,8 @@ def record_rssi(sysid, rssi_positive_db, src64=None):
if sysid is None:
return
ensure_active_sysid(sysid)
raw_dbm = -int(rssi_positive_db)
win = rssi_raw_windows[sysid]
win.append(raw_dbm)
@ -166,6 +239,9 @@ def record_rssi(sysid, rssi_positive_db, src64=None):
def calculate_packet_loss(sysid, compid, current_seq):
global mavlink_sequence_tracker, packet_loss_stats
ensure_active_sysid(sysid)
tracker = mavlink_sequence_tracker[sysid]
now = time.time()
@ -216,6 +292,9 @@ def calculate_packet_loss(sysid, compid, current_seq):
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes:
"""
GCS XBee API TX frame type 0x10
"""
frame = b'\x10' + struct.pack('>B', frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data
return b'\x7E' + struct.pack('>H', len(frame)) + frame + struct.pack('B', 0xFF - (sum(frame) & 0xFF))
@ -236,6 +315,8 @@ def estimate_tdma_timeout(grant_bytes):
async def grant_one_uav(serial_protocols, sysid, grant_bytes):
"""授權一台 UAV收到 DONE 立即結束,否則 timeout 後換下一台。"""
ensure_active_sysid(sysid)
ev = tdma_done_events.get(sysid)
if ev is None:
ev = asyncio.Event()
@ -254,6 +335,10 @@ async def grant_one_uav(serial_protocols, sysid, grant_bytes):
await asyncio.sleep(current_guard_ms / 1000.0)
# =========================================================
# SerialToUDPXBee Serial API parser
# =========================================================
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_protocol, serial_port):
self.udp_protocol = udp_protocol
@ -269,6 +354,24 @@ class SerialToUDP(asyncio.Protocol):
self.pending_db = {} # frame_id -> {'sysid', 'src64', 'time', 'reason'}
self.last_data_db_query_time = 0.0
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"[{self.serial_port}] Serial connection established.")
# -------------------- GCS -> UAV control --------------------
def send_discovery(self, force=False):
"""
discovery
- force=FalseDISC一般 discoveryUAV 對同一 GCS 只回一次 HELO
- force=True DSCF強制 discoveryUAV 會重新回 HELO
"""
payload = DISC_FORCE_MAGIC if force else DISC_MAGIC
api_frame = build_api_tx_frame(payload, TARGET_ADDR64, 0x00)
self.transport.write(api_frame)
def send_poll(self, target_sysid, grant_bytes=None):
if grant_bytes is None:
grant_bytes = current_grant_bytes
@ -278,14 +381,13 @@ class SerialToUDP(asyncio.Protocol):
self.current_poll_time = time.time()
poll_payload = POLL_MAGIC + struct.pack('>BH', target_sysid, grant_bytes)
api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00)
dest_addr64 = get_poll_dest_addr64(target_sysid)
api_frame = build_api_tx_frame(poll_payload, dest_addr64, 0x00)
self.transport.write(api_frame)
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"[{self.serial_port}] Serial connection established.")
# -------------------- Serial RX parser --------------------
def data_received(self, data):
self.buffer.extend(data)
@ -306,8 +408,7 @@ class SerialToUDP(asyncio.Protocol):
full_length = 3 + length + 1
# 本程式使用 XBee RF payload 約 80~100 bytes正常 length 不會很大。
# 若你把 XBEE_MAX_PAYLOAD 調大,可同步放寬此限制。
if length > 600:
if length > 700:
self.buffer.pop(0)
continue
@ -329,7 +430,7 @@ class SerialToUDP(asyncio.Protocol):
if frame_type == 0x90:
# XBee Receive Packet:
# whole API frame: 7E | len(2) | 90 | src64(8) | src16(2) | options(1) | RF data | checksum
# 7E | len(2) | 90 | src64(8) | src16(2) | options(1) | RF data | checksum
src64 = frame[4:12]
rf_data = frame[15:-1]
self.handle_rx_packet(src64, rf_data)
@ -342,7 +443,7 @@ class SerialToUDP(asyncio.Protocol):
# 其他如 0x8B Transmit Status 可視需求處理;目前忽略。
def handle_rx_packet(self, src64, rf_data):
# 先用 src64 查。若尚未學習,DONE payload 會帶 sysid可建立對應。
# 先用 src64 查。若尚未學習,HELO / DONE payload 會帶 sysid可建立對應。
sysid_hint = infer_sysid_from_addr64(src64)
# 若在最近 poll 時槽內收到資料,也可作為備援歸屬。
@ -350,12 +451,13 @@ class SerialToUDP(asyncio.Protocol):
if time.time() - self.current_poll_time <= 2.0:
sysid_hint = self.current_poll_sysid
done_sysid = self.udp_protocol.process_rf_data(rf_data, src64=src64, sysid_hint=sysid_hint)
result = self.udp_protocol.process_rf_data(rf_data, src64=src64, sysid_hint=sysid_hint)
if done_sysid is not None:
# DONE payload 明確帶 SYSID這裡可精準學習該 SYSID 對應的 XBee src64。
learn_xbee_source(done_sysid, src64)
self.send_at_command_db(done_sysid, src64=src64, reason='DONE')
if result is not None:
# result: ('DONE' or 'HELO', sysid)
reason, sysid = result
learn_xbee_source(sysid, src64)
self.send_at_command_db(sysid, src64=src64, reason=reason)
return
if RSSI_QUERY_ON_DATA_FRAME and sysid_hint is not None:
@ -364,6 +466,8 @@ class SerialToUDP(asyncio.Protocol):
self.send_at_command_db(sysid_hint, src64=src64, reason='DATA')
self.last_data_db_query_time = now
# -------------------- ATDB RSSI --------------------
def next_at_frame_id(self):
self.at_frame_id += 1
if self.at_frame_id > 0xFE:
@ -373,7 +477,7 @@ class SerialToUDP(asyncio.Protocol):
def send_at_command_db(self, sysid, src64=None, reason=''):
"""
查本地 XBee DB DB 代表本地 XBee 最近收到 RF packet RSSI
這裡在 DONE 後立即查所以 RSSI 會歸屬到剛完成 TDMA slot sysid
這裡在 DONE / HELO 後立即查所以 RSSI 會歸屬到剛剛回報sysid
"""
try:
frame_id = self.next_at_frame_id()
@ -429,6 +533,8 @@ class SerialToUDP(asyncio.Protocol):
if sysid is not None:
record_rssi(sysid, rssi_value, src64=src64)
# -------------------- UDP -> UAV downlink --------------------
def write_to_serial(self, data):
self.gcs_tx_queue.extend(data)
@ -436,6 +542,8 @@ class SerialToUDP(asyncio.Protocol):
"""GCS -> UAV 下行資料,小批量送出,避免阻塞上行 TDMA。"""
if not self.gcs_tx_queue:
return
# 若還沒任何 UAV address仍 broadcast。ESP32 端只接受已知 GCS 來源資料。
send_limit = min(len(self.gcs_tx_queue), 150)
data_to_send = self.gcs_tx_queue[:send_limit]
self.gcs_tx_queue = self.gcs_tx_queue[send_limit:]
@ -450,6 +558,8 @@ class SerialToUDP(asyncio.Protocol):
chunk = data[sent_len:end_len]
sent_len = end_len
# 下行命令目前 broadcast所有 ESP32 收到後會寫給 FC。
# 若你要精準下行到某台 UAV需在 MAVLink router 層或 UI 選定 sysid 後 unicast。
api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00)
self.transport.write(api_frame)
await asyncio.sleep(0.01)
@ -457,6 +567,10 @@ class SerialToUDP(asyncio.Protocol):
pass
# =========================================================
# UDPHandlerUDP / MAVLink / HELO / DONE
# =========================================================
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self, udp_port):
self.udp_port = udp_port
@ -474,35 +588,74 @@ class UDPHandler(asyncio.DatagramProtocol):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def handle_done_report(self, rf_data):
def handle_hello_report(self, rf_data, src64=None):
"""
HELO + sysid(1)
回傳 sysid None
"""
if len(rf_data) < 5 or not rf_data.startswith(HELLO_MAGIC):
return None
try:
sysid = rf_data[4]
hello_last_reports[sysid] = {
'time': time.time(),
'src64': src64,
}
learn_xbee_source(sysid, src64)
print(f"[HELO] SYSID {sysid}, src64={format_addr64(src64)}")
return sysid
except Exception:
return None
def handle_done_report(self, rf_data, src64=None):
"""
DONE + sysid(1) + sent_len(2) + remain_len(2)
回傳 sysid None
"""
if len(rf_data) < 9 or not rf_data.startswith(DONE_MAGIC):
return None
try:
sysid, sent_len, remain_len = struct.unpack('>BHH', rf_data[4:9])
tdma_last_reports[sysid] = {
'time': time.time(),
'sent_len': sent_len,
'remain_len': remain_len,
}
learn_xbee_source(sysid, src64)
ev = tdma_done_events.get(sysid)
if ev is not None:
ev.set()
return sysid
except Exception:
return None
def process_rf_data(self, rf_data, src64=None, sysid_hint=None):
"""
處理 XBee 0x90 RF data
回傳若是 DONE回傳 done_sysid若是 MAVLink data回傳 None
回傳
('HELO', sysid) 若是 HELO
('DONE', sysid) 若是 DONE
None 若是 MAVLink data
"""
done_sysid = self.handle_done_report(rf_data)
hello_sysid = self.handle_hello_report(rf_data, src64=src64)
if hello_sysid is not None:
return ('HELO', hello_sysid)
done_sysid = self.handle_done_report(rf_data, src64=src64)
if done_sysid is not None:
return done_sysid
return ('DONE', done_sysid)
# MAVLink data
try:
for byte in rf_data:
msg = self.mav_decoder.parse_char(bytes([byte]))
@ -513,11 +666,11 @@ class UDPHandler(asyncio.DatagramProtocol):
if sysid == 0:
continue
# MAVLink sysid 用於 packet loss 計算XBee src64 用於 RSSI 歸屬。
if src64 is not None:
learn_xbee_source(sysid, src64)
calculate_packet_loss(sysid, compid, seq)
except Exception:
pass
@ -527,8 +680,13 @@ class UDPHandler(asyncio.DatagramProtocol):
return None
# =========================================================
# Bridge setup / scheduler / discovery
# =========================================================
async def setup_bridge(config):
port, udp = config['serial_port'], config['udp_port']
try:
ser = serial.Serial(port, SERIAL_BAUDRATE)
ser.close()
@ -538,17 +696,55 @@ async def setup_bridge(config):
loop = asyncio.get_running_loop()
udp_handler = UDPHandler(udp)
await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0))
local_port = udp if UDP_LISTEN_FIXED_PORT else 0
await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', local_port))
serial_proto = SerialToUDP(udp_handler, port)
await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE)
return serial_proto
async def auto_discovery_task(serial_protocols):
"""
程式啟動前 AUTO_DISCOVERY_DURATION_SEC 秒自動送 DISC
時間到後停止自動 DISC避免 UAV 一直回 HELO
後續新增 UAV 請用 GUI Scan UAV 按鈕送 DSCF
"""
if not AUTO_DISCOVERY_ENABLED:
return
print(f"[DISCOVERY] Auto DISC started for {AUTO_DISCOVERY_DURATION_SEC:.1f} seconds")
start_t = time.time()
while time.time() - start_t < AUTO_DISCOVERY_DURATION_SEC:
for sp in serial_protocols:
if hasattr(sp, 'send_discovery'):
sp.send_discovery(force=False)
await asyncio.sleep(DISCOVERY_INTERVAL_SEC)
print("[DISCOVERY] Auto DISC stopped. Use GUI Scan UAV for new aircraft.")
async def force_discovery_burst(serial_protocols):
"""
GUI Scan UAV DSCF burst
UAV 收到 DSCF 後會強制回 HELO
"""
print("[DISCOVERY] Force scan started")
for _ in range(FORCE_DISCOVERY_BURST_COUNT):
for sp in serial_protocols:
if hasattr(sp, 'send_discovery'):
sp.send_discovery(force=True)
await asyncio.sleep(FORCE_DISCOVERY_BURST_GAP_SEC)
print("[DISCOVERY] Force scan finished")
async def tdma_scheduler(serial_protocols):
print(f"Packet-size TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口")
for sysid in ACTIVE_SYSIDS:
for sysid in list(ACTIVE_SYSIDS):
ensure_active_sysid(sysid)
tdma_done_events[sysid] = asyncio.Event()
while True:
@ -556,13 +752,21 @@ async def tdma_scheduler(serial_protocols):
for sp in serial_protocols:
if hasattr(sp, 'flush_gcs_queue'):
sp.flush_gcs_queue()
await asyncio.sleep(current_guard_ms / 1000.0)
# 若尚未發現任何 UAV避免空轉太快
if not ACTIVE_SYSIDS:
await asyncio.sleep(0.2)
continue
# UAV 上行:以 byte quota 授權,不以固定 time slot 等待
for sysid in ACTIVE_SYSIDS:
state = uav_states[sysid]
for sysid in list(ACTIVE_SYSIDS):
ensure_active_sysid(sysid)
state = uav_states.get(sysid, {"mode": "NORMAL"})
if state['mode'] == 'INITIALIZING':
if state.get('mode') == 'INITIALIZING':
for _ in range(4):
await grant_one_uav(serial_protocols, sysid, INIT_GRANT_BYTES)
else:
@ -570,28 +774,57 @@ async def tdma_scheduler(serial_protocols):
async def async_main():
global ASYNC_LOOP
global SERIAL_PROTOCOLS
ASYNC_LOOP = asyncio.get_running_loop()
protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS))
valid_protocols = [p for p in protocols if p is not None]
SERIAL_PROTOCOLS = valid_protocols
if valid_protocols:
asyncio.create_task(auto_discovery_task(valid_protocols))
asyncio.create_task(tdma_scheduler(valid_protocols))
else:
print('No valid serial ports found.')
await asyncio.Future()
# === GUI 介面區 ===
# =========================================================
# GUI
# =========================================================
def start_gui():
root = tk.Tk()
root.title('UAV Packet-size TDMA Control Station - RSSI avg5')
root.geometry('1300x880')
root.title('UAV Packet-size TDMA Control Station - Auto Discovery / Force Scan')
root.geometry('1350x900')
# --- 左側控制面板 ---
control_frame = tk.Frame(root, width=350, bg='#f0f0f0', padx=20, pady=20)
control_frame = tk.Frame(root, width=390, bg='#f0f0f0', padx=20, pady=20)
control_frame.pack(side=tk.LEFT, fill=tk.Y)
tk.Label(control_frame, text='Packet-size TDMA 控制', font=('Arial', 14, 'bold'), bg='#f0f0f0').pack(pady=10)
# --- Discovery buttons ---
def on_scan_uav():
if ASYNC_LOOP is not None and SERIAL_PROTOCOLS:
asyncio.run_coroutine_threadsafe(force_discovery_burst(SERIAL_PROTOCOLS), ASYNC_LOOP)
scan_label.config(text='已送出 DSCF 強制掃描')
else:
scan_label.config(text='尚未建立 serial / asyncio loop')
scan_btn = tk.Button(control_frame, text='Scan UAV / 發送 DSCF',
font=('Arial', 12, 'bold'), bg='#2196F3', fg='white',
command=on_scan_uav)
scan_btn.pack(pady=(5, 5), fill=tk.X)
scan_label = tk.Label(control_frame, text=f'開機前 {AUTO_DISCOVERY_DURATION_SEC:.0f}s 自動 DISC之後可手動掃描',
bg='#f0f0f0', font=('Arial', 9), wraplength=330, justify=tk.LEFT)
scan_label.pack(pady=(0, 15), anchor=tk.W)
# --- grant bytes slider ---
grant_var = tk.IntVar(value=current_grant_bytes)
def on_grant_change(val):
@ -599,11 +832,13 @@ def start_gui():
current_grant_bytes = int(float(val))
grant_label.config(text=f'每台授權: {current_grant_bytes} bytes')
grant_slider = ttk.Scale(control_frame, from_=100, to_=1800, orient='horizontal', variable=grant_var, command=on_grant_change)
grant_slider = ttk.Scale(control_frame, from_=100, to_=1800, orient='horizontal',
variable=grant_var, command=on_grant_change)
grant_slider.pack(fill=tk.X, pady=10)
grant_label = tk.Label(control_frame, text=f'每台授權: {current_grant_bytes} bytes', bg='#f0f0f0')
grant_label.pack()
# --- guard time slider ---
guard_var = tk.IntVar(value=current_guard_ms)
def on_guard_change(val):
@ -612,34 +847,46 @@ def start_gui():
guard_label.config(text=f'切換保護時間: {current_guard_ms} ms')
tk.Label(control_frame, text='Guard Time', font=('Arial', 11, 'bold'), bg='#f0f0f0').pack(pady=(20, 0))
guard_slider = ttk.Scale(control_frame, from_=0, to_=100, orient='horizontal', variable=guard_var, command=on_guard_change)
guard_slider = ttk.Scale(control_frame, from_=0, to_=100, orient='horizontal',
variable=guard_var, command=on_guard_change)
guard_slider.pack(fill=tk.X, pady=10)
guard_label = tk.Label(control_frame, text=f'切換保護時間: {current_guard_ms} ms', bg='#f0f0f0')
guard_label.pack()
# --- UAV INIT mode ---
tk.Label(control_frame, text='群機初始化控制 (INIT)', font=('Arial', 14, 'bold'), bg='#f0f0f0').pack(pady=(30, 10))
init_var = tk.IntVar(value=0)
radios = {}
status_labels = {}
radio_container = tk.Frame(control_frame, bg='#f0f0f0')
radio_container.pack(fill=tk.X)
def on_radio_change():
selected = init_var.get()
for sysid in ACTIVE_SYSIDS:
for sysid in list(ACTIVE_SYSIDS):
ensure_active_sysid(sysid)
if sysid == selected:
uav_states[sysid]['mode'] = 'INITIALIZING'
else:
uav_states[sysid]['mode'] = 'NORMAL'
tk.Radiobutton(control_frame, text='全體 NORMAL', variable=init_var, value=0,
command=on_radio_change, bg='#f0f0f0', font=('Arial', 11, 'bold'), fg='blue').pack(anchor=tk.W, pady=(0, 10))
tk.Radiobutton(radio_container, text='全體 NORMAL', variable=init_var, value=0,
command=on_radio_change, bg='#f0f0f0',
font=('Arial', 11, 'bold'), fg='blue').pack(anchor=tk.W, pady=(0, 10))
def ensure_radio_row(sysid):
if sysid in radios:
return
for sysid in ACTIVE_SYSIDS:
frame = tk.Frame(control_frame, bg='#f0f0f0')
frame.pack(fill=tk.X, pady=5)
frame = tk.Frame(radio_container, bg='#f0f0f0')
frame.pack(fill=tk.X, pady=3)
rb = tk.Radiobutton(frame, text=f'SYSID {sysid} 專屬載入', variable=init_var, value=sysid,
command=on_radio_change, bg='#f0f0f0', font=('Arial', 11))
rb = tk.Radiobutton(frame, text=f'SYSID {sysid} 專屬載入',
variable=init_var, value=sysid,
command=on_radio_change,
bg='#f0f0f0', font=('Arial', 11))
rb.pack(side=tk.LEFT)
radios[sysid] = rb
@ -647,6 +894,9 @@ def start_gui():
lbl.pack(side=tk.RIGHT, padx=5)
status_labels[sysid] = lbl
for sid in ACTIVE_SYSIDS:
ensure_radio_row(sid)
is_locked = False
def toggle_tdma_mode():
@ -669,38 +919,81 @@ def start_gui():
lock_btn = tk.Button(control_frame, text='參數載入完畢,鎖定進入 TDMA',
font=('Arial', 12, 'bold'), bg='#4CAF50', fg='white',
command=toggle_tdma_mode)
lock_btn.pack(pady=25, fill=tk.X)
lock_btn.pack(pady=20, fill=tk.X)
report_title = tk.Label(control_frame, text='最近 DONE 回報', font=('Arial', 12, 'bold'), bg='#f0f0f0')
# --- Reports ---
report_title = tk.Label(control_frame, text='最近 HELO / DONE 回報', font=('Arial', 12, 'bold'), bg='#f0f0f0')
report_title.pack(pady=(10, 5))
report_container = tk.Frame(control_frame, bg='#f0f0f0')
report_container.pack(fill=tk.X)
report_labels = {}
for sysid in ACTIVE_SYSIDS:
lbl = tk.Label(control_frame, text=f'SYSID {sysid}: sent=0, remain=0', font=('Arial', 10), bg='#f0f0f0')
lbl.pack(anchor=tk.W)
report_labels[sysid] = lbl
hello_labels = {}
def ensure_report_row(sysid):
if sysid not in hello_labels:
hl = tk.Label(report_container, text=f'SYSID {sysid}: HELO no report',
font=('Arial', 9), bg='#f0f0f0', justify=tk.LEFT)
hl.pack(anchor=tk.W)
hello_labels[sysid] = hl
if sysid not in report_labels:
dl = tk.Label(report_container, text=f'SYSID {sysid}: DONE sent=0, remain=0',
font=('Arial', 9), bg='#f0f0f0', justify=tk.LEFT)
dl.pack(anchor=tk.W)
report_labels[sysid] = dl
for sid in ACTIVE_SYSIDS:
ensure_report_row(sid)
rssi_title = tk.Label(control_frame, text=f'RSSI 最近 {RSSI_AVG_WINDOW} 次平均', font=('Arial', 12, 'bold'), bg='#f0f0f0')
rssi_title.pack(pady=(18, 5))
rssi_container = tk.Frame(control_frame, bg='#f0f0f0')
rssi_container.pack(fill=tk.X)
rssi_labels = {}
for sysid in ACTIVE_SYSIDS:
lbl = tk.Label(control_frame, text=f'SYSID {sysid}: RSSI avg -- dBm', font=('Arial', 10), bg='#f0f0f0', justify=tk.LEFT)
def ensure_rssi_row(sysid):
if sysid in rssi_labels:
return
lbl = tk.Label(rssi_container, text=f'SYSID {sysid}: RSSI avg -- dBm',
font=('Arial', 9), bg='#f0f0f0', justify=tk.LEFT)
lbl.pack(anchor=tk.W)
rssi_labels[sysid] = lbl
for sid in ACTIVE_SYSIDS:
ensure_rssi_row(sid)
# --- status updater ---
def update_status_gui():
now = time.time()
for sysid in list(ACTIVE_SYSIDS):
ensure_radio_row(sysid)
ensure_report_row(sysid)
ensure_rssi_row(sysid)
for sysid, lbl in status_labels.items():
mode = uav_states[sysid]['mode']
mode = uav_states.get(sysid, {"mode": "NORMAL"}).get("mode", "NORMAL")
if mode == 'INITIALIZING':
lbl.config(text='INIT', fg='orange')
else:
lbl.config(text='NORMAL', fg='green')
for sysid, lbl in hello_labels.items():
rep = hello_last_reports[sysid]
age = now - rep['time'] if rep['time'] else 999.0
age_text = f'{age:.1f}s ago' if age < 99 else 'no report'
addr_text = format_addr64(rep.get('src64'))
lbl.config(text=f"SYSID {sysid}: HELO ({age_text}) src64={addr_text}")
for sysid, lbl in report_labels.items():
rep = tdma_last_reports[sysid]
age = now - rep['time'] if rep['time'] else 999.0
age_text = f'{age:.1f}s ago' if age < 99 else 'no report'
lbl.config(text=f"SYSID {sysid}: sent={rep['sent_len']}, remain={rep['remain_len']} ({age_text})")
lbl.config(text=f"SYSID {sysid}: DONE sent={rep['sent_len']}, remain={rep['remain_len']} ({age_text})")
for sysid, lbl in rssi_labels.items():
st = rssi_latest_stats[sysid]
@ -722,14 +1015,15 @@ def start_gui():
plot_frame = tk.Frame(root)
plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9.5, 8.3), dpi=100)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9.5, 8.5), dpi=100)
canvas = FigureCanvasTkAgg(fig, master=plot_frame)
canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def update_plot(frame):
ax1.clear()
ax2.clear()
ax1.set_title(f'RSSI avg{RSSI_AVG_WINDOW} from XBee ATDB after DONE', fontsize=12)
ax1.set_title(f'RSSI avg{RSSI_AVG_WINDOW} from XBee ATDB after HELO / DONE', fontsize=12)
ax1.set_xlim(10, 0)
ax1.set_ylim(-100, -10)
ax1.grid(True, alpha=0.3)
@ -740,7 +1034,7 @@ def start_gui():
ax2.grid(True, alpha=0.3)
now = time.time()
colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown']
colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown', 'cyan', 'magenta']
try:
sysids = sorted(list(set(list(rssi_history.keys()) + list(packet_loss_history.keys()) + ACTIVE_SYSIDS)))
@ -806,9 +1100,13 @@ def start_gui():
if abs(real_y - text_y) > 1.0:
ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6)
if ax1.lines:
ax1.legend(loc='upper left')
if ax2.lines:
ax2.legend(loc='upper left')
canvas.draw_idle()
ani = animation.FuncAnimation(fig, update_plot, interval=1000)
def on_closing():
@ -819,6 +1117,10 @@ def start_gui():
root.mainloop()
# =========================================================
# Entry point
# =========================================================
if __name__ == '__main__':
threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start()
start_gui()

Loading…
Cancel
Save