update files

lunu
lenting89 3 weeks ago
parent 75d9d260e1
commit 0e7070b412

@ -1,151 +0,0 @@
from machine import UART
import time
import struct
import gc
# ================= 設定區 =================
FC_BAUDRATE = 115200
XB_BAUDRATE = 115200
DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8'
XBEE_MAX_PAYLOAD = 100
MY_SYSID = 15 # 第二台就改成 10第三台 15
POLL_MAGIC = b'POLL'
tx_buf = bytearray()
MAX_BUF_SIZE = 6144 # 保留安全餘裕的緩衝區大小
# =========================================
uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096)
uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096)
rx_buf = bytearray()
def get_checksum(data):
return 0xFF - (sum(data) & 0xFF)
def send_to_xbee_chunked(payload):
total_len = len(payload)
sent_len = 0
while sent_len < total_len:
end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len)
chunk = payload[sent_len : end_len]
sent_len = end_len
frame_content = bytearray()
frame_content.append(0x10)
frame_content.append(0x00) # 0x00 代表不要求 XBee ACK極大加快發送速度
frame_content.extend(DEST_64)
frame_content.extend(b'\xFF\xFE')
frame_content.append(0x00)
frame_content.append(0x00)
frame_content.extend(chunk)
length = len(frame_content)
checksum = get_checksum(frame_content)
packet = bytearray()
packet.append(0x7E)
packet.append((length >> 8) & 0xFF)
packet.append(length & 0xFF)
packet.extend(frame_content)
packet.append(checksum)
uart_xb.write(packet)
# 僅需極短暫延遲,讓硬體發送緩衝區消化即可
time.sleep_ms(1)
def process_xbee_buffer():
global rx_buf, tx_buf
while True:
start_pos = rx_buf.find(b'\x7E')
if start_pos == -1:
rx_buf = bytearray() # 避免用 clear()
return
if start_pos > 0:
rx_buf = rx_buf[start_pos:]
if len(rx_buf) < 3: return
pkt_len = (rx_buf[1] << 8) | rx_buf[2]
total_len = pkt_len + 4
if pkt_len > 300:
rx_buf = rx_buf[1:]
continue
if len(rx_buf) < total_len: return
packet = rx_buf[3 : 3+pkt_len]
checksum_recv = rx_buf[3+pkt_len]
if get_checksum(packet) == checksum_recv:
if packet[0] == 0x90:
real_data = packet[12:]
if len(real_data) == 5 and real_data.startswith(POLL_MAGIC):
if real_data[4] == MY_SYSID:
flush_tx_buffer()
else:
uart_fc.write(real_data)
rx_buf = rx_buf[total_len:]
else:
rx_buf = rx_buf[1:]
def flush_tx_buffer():
global tx_buf
if len(tx_buf) == 0:
return
# 單次最大發送量設為 1200 bytes避免在空中飛行超過時槽導致碰撞
send_limit = min(len(tx_buf), 1200)
cut_idx = send_limit
if send_limit < len(tx_buf):
last_fd_pos = tx_buf[:send_limit].rfind(b'\xFD')
if last_fd_pos > 0:
cut_idx = last_fd_pos
data_to_send = tx_buf[:cut_idx]
tx_buf = tx_buf[cut_idx:]
send_to_xbee_chunked(data_to_send)
# 初始清理
gc.collect()
while True:
try:
# FC -> 緩衝區
if uart_fc.any():
data = uart_fc.read(250)
if data:
tx_buf.extend(data)
# 聰明的溫和防爆機制 (砍掉舊的 2/3保留最新 1/3)
if len(tx_buf) > MAX_BUF_SIZE:
safe_cut = tx_buf.find(b'\xFD', (MAX_BUF_SIZE // 3) * 2)
if safe_cut != -1:
tx_buf = tx_buf[safe_cut:]
else:
tx_buf = tx_buf[(MAX_BUF_SIZE // 2):]
# XBee -> FC
if uart_xb.any():
chunk = uart_xb.read()
if chunk:
rx_buf.extend(chunk)
process_xbee_buffer()
time.sleep_ms(1)
except MemoryError:
# 發生 OOM 時的「斷尾求生」法,強制捨棄舊變數以釋放空間,防止死機
tx_buf = bytearray()
rx_buf = bytearray()
gc.collect()
time.sleep_ms(10)
except Exception as e:
time.sleep_ms(2)

@ -0,0 +1,368 @@
from machine import UART
import time
import struct
import gc
# =========================================================
# ESP32 / MicroPythonUAV 端 XBee <-> Flight Controller Bridge
# Packet-size TDMA + DONE 標籤版
#
# 核心行為:
# 1. FC -> ESP32持續把 MAVLink bytes 存進 tx_buf。
# 2. GCS -> ESP32收到 POLL + SYSID + quota_bytes 才允許上行傳送。
# 3. ESP32 -> GCS每次最多送 quota_bytes但只送「完整 MAVLink frame」。
# 4. ESP32 -> GCS送完後獨立送 DONE reportGCS 收到後可立即換下一台 UAV。
#
# 注意DONE 不是接在 MAVLink bytes 後面,而是獨立 RF payload避免污染 MAVLink stream。
# =========================================================
# ================= 設定區 =================
FC_BAUDRATE = 115200
XB_BAUDRATE = 115200
# 這裡要改成你的地面站 / coordinator XBee 64-bit address
DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8'
# XBee API Transmit Request 內每次 RF payload 最大切片。
# 900HP / DigiMesh 實測常用 80~100 bytes 較穩。
XBEE_MAX_PAYLOAD = 100
# 每台 UAV 都要設定成自己的 SYSID
MY_SYSID = 15 # 第二台可改 10第三台可改 15依你的 MAVLink SYSID 設定
# GCS -> UAV poll 格式:
# 舊版b'POLL' + sysid(1)
# 新版b'POLL' + sysid(1) + quota_bytes(2, big-endian)
POLL_MAGIC = b'POLL'
DEFAULT_GRANT_BYTES = 600
# UAV -> GCS done 格式:
# b'DONE' + sysid(1) + sent_len(2) + remain_len(2)
DONE_MAGIC = b'DONE'
# ESP32 RAM 有限,不要讓 FC 資料無限累積
MAX_BUF_SIZE = 6144
READ_FC_BYTES = 250
# =========================================
uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096)
uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096)
tx_buf = bytearray() # FC -> GCS 等待 TDMA poll 的 MAVLink stream
rx_buf = bytearray() # XBee API frame parser buffer
def get_checksum(data):
return 0xFF - (sum(data) & 0xFF)
def build_api_tx_frame(payload):
"""建立 XBee API frame type 0x10把 payload 送到 DEST_64。"""
frame_content = bytearray()
frame_content.append(0x10) # Transmit Request
frame_content.append(0x00) # Frame ID = 0不要求 XBee ACK降低延遲
frame_content.extend(DEST_64)
frame_content.extend(b'\xFF\xFE')
frame_content.append(0x00) # broadcast radius
frame_content.append(0x00) # options
frame_content.extend(payload)
length = len(frame_content)
packet = bytearray()
packet.append(0x7E)
packet.append((length >> 8) & 0xFF)
packet.append(length & 0xFF)
packet.extend(frame_content)
packet.append(get_checksum(frame_content))
return packet
def send_to_xbee_chunked(payload):
"""把 payload 依 XBEE_MAX_PAYLOAD 切成多個 XBee API TX frame 送出。"""
total_len = len(payload)
sent_len = 0
while sent_len < total_len:
end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len)
chunk = payload[sent_len:end_len]
sent_len = end_len
uart_xb.write(build_api_tx_frame(chunk))
# 很短的讓步,避免 ESP32 UART/XBee 本地 buffer 瞬間塞爆
time.sleep_ms(1)
def send_done_report(sent_len, remain_len):
"""送出 TDMA DONE 標籤。此封包不會轉給飛控,只給 GCS scheduler 使用。"""
if sent_len > 65535:
sent_len = 65535
if remain_len > 65535:
remain_len = 65535
payload = DONE_MAGIC + struct.pack('>BHH', MY_SYSID, sent_len, remain_len)
uart_xb.write(build_api_tx_frame(payload))
time.sleep_ms(1)
def find_first_mavlink_magic(buf):
"""找 MAVLink v1/v2 magic0xFE 或 0xFD。"""
pos_fe = buf.find(b'\xFE')
pos_fd = buf.find(b'\xFD')
if pos_fe == -1:
return pos_fd
if pos_fd == -1:
return pos_fe
return pos_fe if pos_fe < pos_fd else pos_fd
def mavlink_frame_length(buf, start_idx):
"""
回傳從 start_idx 開始的一包 MAVLink frame 長度
header 還不完整或整包還沒收完回傳 None
MAVLink v1:
magic 0xFE, payload_len at byte 1, total = payload_len + 8
MAVLink v2:
magic 0xFD, payload_len at byte 1, incompat_flags at byte 2,
total = payload_len + 12 signed +13
"""
if start_idx >= len(buf):
return None
magic = buf[start_idx]
if magic == 0xFE:
# v1 至少需要 magic + len
if len(buf) - start_idx < 2:
return None
payload_len = buf[start_idx + 1]
total_len = payload_len + 8
if len(buf) - start_idx < total_len:
return None
return total_len
if magic == 0xFD:
# v2 至少需要 magic + len + incompat_flags
if len(buf) - start_idx < 3:
return None
payload_len = buf[start_idx + 1]
incompat_flags = buf[start_idx + 2]
signed = (incompat_flags & 0x01) != 0
total_len = payload_len + 12 + (13 if signed else 0)
if len(buf) - start_idx < total_len:
return None
return total_len
return None
def pop_mavlink_frames_by_quota(quota_bytes):
"""
tx_buf 取出不超過 quota_bytes 的完整 MAVLink frames
不會使用 rfind(0xFD) 亂切避免 payload 中剛好有 0xFD 造成誤判
若第一個完整 MAVLink frame quota 還大仍會送出第一包避免永遠卡住
"""
global tx_buf
if quota_bytes <= 0 or len(tx_buf) == 0:
return b''
# 丟掉 MAVLink magic 前面的雜訊
start = find_first_mavlink_magic(tx_buf)
if start == -1:
tx_buf = bytearray()
return b''
if start > 0:
tx_buf = tx_buf[start:]
out = bytearray()
while len(tx_buf) > 0:
# 再次對齊 magic避免中間出現雜訊
if tx_buf[0] not in (0xFE, 0xFD):
start = find_first_mavlink_magic(tx_buf)
if start == -1:
tx_buf = bytearray()
break
tx_buf = tx_buf[start:]
frame_len = mavlink_frame_length(tx_buf, 0)
if frame_len is None:
# 第一包尚未完整,留在 buffer 等下一輪 FC bytes
break
# 正常情況:加了這包會超過 quota就先停止
if len(out) > 0 and (len(out) + frame_len) > quota_bytes:
break
# 第一包就比 quota 大:仍送出,避免 quota 設太小導致永遠送不出去
if len(out) == 0 and frame_len > quota_bytes:
out.extend(tx_buf[:frame_len])
tx_buf = tx_buf[frame_len:]
break
out.extend(tx_buf[:frame_len])
tx_buf = tx_buf[frame_len:]
if len(out) >= quota_bytes:
break
return bytes(out)
def trim_tx_buffer_if_needed():
"""tx_buf 過大時,丟掉較舊的完整 MAVLink frames保留較新的資料。"""
global tx_buf
if len(tx_buf) <= MAX_BUF_SIZE:
return
target_size = MAX_BUF_SIZE // 2
while len(tx_buf) > target_size:
start = find_first_mavlink_magic(tx_buf)
if start == -1:
tx_buf = bytearray()
return
if start > 0:
tx_buf = tx_buf[start:]
continue
frame_len = mavlink_frame_length(tx_buf, 0)
if frame_len is None:
# 若剩下的是超大半包資料,直接保留最後 target_size bytes
if len(tx_buf) > target_size:
tx_buf = tx_buf[-target_size:]
return
# 丟掉最舊的一包完整 MAVLink frame
tx_buf = tx_buf[frame_len:]
def flush_tx_buffer(grant_bytes):
"""
收到自己 SYSID POLL 後執行
1. 根據 grant_bytes 取出完整 MAVLink frames
2. XBee API frame 送出去
3. 獨立送 DONE report GCS 立即進下一台
"""
global tx_buf
if grant_bytes <= 0:
send_done_report(0, len(tx_buf))
return
data_to_send = pop_mavlink_frames_by_quota(grant_bytes)
sent_len = len(data_to_send)
if sent_len > 0:
send_to_xbee_chunked(data_to_send)
send_done_report(sent_len, len(tx_buf))
def parse_poll_payload(real_data):
"""
支援兩種 poll
舊版POLL + sysid 5 bytes
新版POLL + sysid + quota 7 bytes
回傳 (target_sysid, grant_bytes)不是 poll 則回傳 (None, None)
"""
if not real_data.startswith(POLL_MAGIC):
return None, None
if len(real_data) == 5:
return real_data[4], DEFAULT_GRANT_BYTES
if len(real_data) == 7:
target_sysid = real_data[4]
grant_bytes = (real_data[5] << 8) | real_data[6]
return target_sysid, grant_bytes
return None, None
def process_xbee_buffer():
global rx_buf
while True:
start_pos = rx_buf.find(b'\x7E')
if start_pos == -1:
rx_buf = bytearray()
return
if start_pos > 0:
rx_buf = rx_buf[start_pos:]
if len(rx_buf) < 3:
return
pkt_len = (rx_buf[1] << 8) | rx_buf[2]
total_len = pkt_len + 4
# 避免亂資料讓 buffer 爆掉XBee RX frame 通常不會太大
if pkt_len > 300:
rx_buf = rx_buf[1:]
continue
if len(rx_buf) < total_len:
return
packet = rx_buf[3:3 + pkt_len]
checksum_recv = rx_buf[3 + pkt_len]
if get_checksum(packet) == checksum_recv:
if packet[0] == 0x90:
# XBee Receive Packet 0x90RF data 從 packet[12:] 開始
real_data = packet[12:]
target_sysid, grant_bytes = parse_poll_payload(real_data)
if target_sysid is not None:
if target_sysid == MY_SYSID:
flush_tx_buffer(grant_bytes)
else:
# 一般 GCS -> FC MAVLink 下行資料
uart_fc.write(real_data)
rx_buf = rx_buf[total_len:]
else:
rx_buf = rx_buf[1:]
# 初始清理
gc_ran = False
try:
gc.collect()
gc_ran = True
except Exception:
pass
while True:
try:
# FC -> ESP32 tx buffer
if uart_fc.any():
data = uart_fc.read(READ_FC_BYTES)
if data:
tx_buf.extend(data)
trim_tx_buffer_if_needed()
# XBee -> ESP32
if uart_xb.any():
chunk = uart_xb.read()
if chunk:
rx_buf.extend(chunk)
process_xbee_buffer()
time.sleep_ms(1)
except MemoryError:
# OOM 時丟掉暫存資料,避免整顆 ESP32 死掉
tx_buf = bytearray()
rx_buf = bytearray()
gc.collect()
time.sleep_ms(10)
except Exception:
# 現場飛行時避免因單次解析錯誤中止主迴圈
time.sleep_ms(2)

@ -1,500 +0,0 @@
import asyncio
import serial_asyncio
import struct
import serial
import time
import threading
import tkinter as tk
from tkinter import ttk
from collections import deque, defaultdict
from pymavlink import mavutil
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# === 多組設備設定 ===
CONFIGS = [
{"serial_port": "/dev/ttyUSB0", "udp_port": 14551},
{"serial_port": "COM15", "udp_port": 14570},
{"serial_port": "/dev/ttyUSB2", "udp_port": 14553},
{"serial_port": "/dev/ttyUSB3", "udp_port": 14554},
]
SERIAL_BAUDRATE = 115200
UDP_REMOTE_IP = '127.0.0.1'
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF'
ACTIVE_SYSIDS = [3, 10, 15]
POLL_MAGIC = b'POLL'
# === 全域變數與狀態追蹤 ===
current_base_slot_ms = 250 # 預設 TDMA 時槽
LOSS_TIME_WINDOW_SEC = 5.0 # 丟包率計算的時間視窗 (5秒)
uav_states = {
sysid: {
"mode": "NORMAL",
} for sysid in ACTIVE_SYSIDS
}
rssi_history = defaultdict(lambda: deque(maxlen=5000))
time_history = defaultdict(lambda: deque(maxlen=5000))
packet_loss_history = defaultdict(lambda: deque(maxlen=1000))
packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000))
mavlink_sequence_tracker = defaultdict(dict)
packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0})
serial_to_sysid = {}
serial_last_mavlink_time = {}
last_db_query_time = {}
# === 核心邏輯區 ===
def calculate_packet_loss(sysid, compid, current_seq):
global mavlink_sequence_tracker, packet_loss_stats
tracker = mavlink_sequence_tracker[sysid]
now = time.time()
if compid not in tracker:
tracker[compid] = {
'last_seq': current_seq,
'history': deque()
}
return 0.0
comp_tracker = tracker[compid]
last_seq = comp_tracker['last_seq']
if current_seq > last_seq:
expected = current_seq - last_seq
elif current_seq < last_seq:
expected = (255 - last_seq) + current_seq + 1
else:
return packet_loss_history[sysid][-1] if packet_loss_history[sysid] else 0.0
lost = max(0, expected - 1)
comp_tracker['history'].append((now, expected, lost))
comp_tracker['last_seq'] = current_seq
total_expected_all = 0
total_lost_all = 0
for c_id, c_data in tracker.items():
if 'history' in c_data:
while c_data['history'] and (now - c_data['history'][0][0]) > LOSS_TIME_WINDOW_SEC:
c_data['history'].popleft()
total_expected_all += sum(item[1] for item in c_data['history'])
total_lost_all += sum(item[2] for item in c_data['history'])
overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0
packet_loss_stats[sysid] = {
'loss_rate': overall_loss_rate,
'total_received': total_expected_all - total_lost_all,
'total_lost': total_lost_all
}
packet_loss_history[sysid].append(overall_loss_rate)
packet_loss_time_history[sysid].append(now)
return overall_loss_rate
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes:
frame = b'\x10' + struct.pack(">B", frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", 0xFF - (sum(frame) & 0xFF))
class SerialToUDP(asyncio.Protocol):
def send_poll(self, target_sysid):
poll_payload = POLL_MAGIC + struct.pack("B", target_sysid)
api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00)
self.transport.write(api_frame)
def __init__(self, udp_protocol, serial_port):
self.udp_protocol = udp_protocol
self.serial_port = serial_port
self.buffer = bytearray()
self.gcs_tx_queue = bytearray()
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"[{self.serial_port}] Serial connection established.")
def data_received(self, data):
self.buffer.extend(data)
while True:
try:
start_idx = self.buffer.index(0x7E)
if start_idx > 0:
del self.buffer[:start_idx]
except ValueError:
self.buffer.clear()
return
if len(self.buffer) < 3: return
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
if length > 300:
self.buffer.pop(0)
continue
if len(self.buffer) < full_length: return
frame = self.buffer[:full_length]
checksum = 0xFF - (sum(frame[3:-1]) & 0xFF)
if checksum != frame[-1]:
self.buffer.pop(0)
continue
del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame))
if frame[3] == 0x88 and frame[5:7] == b'DB':
status = frame[7]
if status == 0x00 and len(frame) > 8:
rssi_value = frame[8]
now = time.time()
last_time = serial_last_mavlink_time.get(self.serial_port, 0)
if now - last_time <= 0.5:
sysid = serial_to_sysid.get(self.serial_port, None)
if sysid is not None:
rssi_history[sysid].append(-rssi_value)
time_history[sysid].append(now)
def write_to_serial(self, data):
self.gcs_tx_queue.extend(data)
def flush_gcs_queue(self):
if not self.gcs_tx_queue: return
send_limit = min(len(self.gcs_tx_queue), 150)
data_to_send = self.gcs_tx_queue[:send_limit]
self.gcs_tx_queue = self.gcs_tx_queue[send_limit:]
asyncio.create_task(self._async_send_chunks(data_to_send))
async def _async_send_chunks(self, data):
try:
MAX_PAYLOAD = 80
sent_len = 0
while sent_len < len(data):
end_len = min(sent_len + MAX_PAYLOAD, len(data))
chunk = data[sent_len:end_len]
sent_len = end_len
api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00)
self.transport.write(api_frame)
await asyncio.sleep(0.01)
except Exception:
pass
def send_at_command_db(self):
try:
frame_type = 0x08
frame_id = 0x52
at_command = b'DB'
parameter = b''
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter
checksum = 0xFF - (sum(frame) & 0xFF)
api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
self.transport.write(api_frame)
except Exception:
pass
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self, udp_port):
self.udp_port = udp_port
self.serial_transport = None
self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport):
self.transport = transport
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
def datagram_received(self, data, addr):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def decapsulate_data(self, data):
try:
if not data or data[0] != 0x7E:
return None
length = (data[1] << 8) | data[2]
if len(data) < length + 4:
return None
frame_type = data[3]
if frame_type == 0x90:
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
except Exception:
return None
def send_udp(self, data):
decoded_data = self.decapsulate_data(data)
if decoded_data is None: return
try:
for byte in decoded_data:
msg = self.mav_decoder.parse_char(bytes([byte]))
if msg:
sysid = msg.get_srcSystem()
compid = msg.get_srcComponent()
seq = msg.get_seq()
if sysid == 0: continue
if self.serial_transport:
port_name = self.serial_transport.serial_port
serial_to_sysid[port_name] = sysid
serial_last_mavlink_time[port_name] = time.time()
now = time.time()
last_query = last_db_query_time.get(port_name, 0)
if now - last_query >= 0.5:
self.serial_transport.send_at_command_db()
last_db_query_time[port_name] = now
calculate_packet_loss(sysid, compid, seq)
except Exception:
pass
if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port))
async def setup_bridge(config):
port, udp = config["serial_port"], config["udp_port"]
try:
ser = serial.Serial(port, SERIAL_BAUDRATE)
ser.close()
except: return None
loop = asyncio.get_running_loop()
udp_handler = UDPHandler(udp)
await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0))
serial_proto = SerialToUDP(udp_handler, port)
await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE)
return serial_proto
async def tdma_scheduler(serial_protocols):
print(f"TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口")
while True:
# GCS 下行時槽
for sp in serial_protocols:
if hasattr(sp, 'flush_gcs_queue'):
sp.flush_gcs_queue()
await asyncio.sleep(0.1)
# UAV 上行時槽
for sysid in ACTIVE_SYSIDS:
state = uav_states[sysid]
if state["mode"] == "INITIALIZING":
# VIP 模式0.8秒內連發4次點名暴力獲取參數
for _ in range(4):
for sp in serial_protocols:
if hasattr(sp, 'send_poll'):
sp.send_poll(sysid)
await asyncio.sleep(0.2)
else:
# NORMAL 模式:聽從滑桿設定的時間
slot_time = current_base_slot_ms / 1000.0
for sp in serial_protocols:
if hasattr(sp, 'send_poll'):
sp.send_poll(sysid)
await asyncio.sleep(slot_time)
async def async_main():
protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS))
valid_protocols = [p for p in protocols if p is not None]
if valid_protocols:
asyncio.create_task(tdma_scheduler(valid_protocols))
await asyncio.Future()
# === GUI 介面區 ===
def start_gui():
root = tk.Tk()
root.title("UAV TDMA Control Station")
root.geometry("1200x800")
# --- 左側控制面板 ---
control_frame = tk.Frame(root, width=300, bg="#f0f0f0", padx=20, pady=20)
control_frame.pack(side=tk.LEFT, fill=tk.Y)
tk.Label(control_frame, text="TDMA 基礎時槽控制", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=10)
slider_val = tk.IntVar(value=current_base_slot_ms)
def on_slider_change(val):
global current_base_slot_ms
current_base_slot_ms = int(float(val))
val_label.config(text=f"當前設定: {current_base_slot_ms} ms")
slider = ttk.Scale(control_frame, from_=50, to_=1000, orient='horizontal', variable=slider_val, command=on_slider_change)
slider.pack(fill=tk.X, pady=10)
val_label = tk.Label(control_frame, text=f"當前設定: {current_base_slot_ms} ms", bg="#f0f0f0")
val_label.pack()
tk.Label(control_frame, text="群機初始化控制 (INIT)", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=(30, 10))
# === 1. 改為單選按鈕 (Radiobutton) ===
init_var = tk.IntVar(value=0) # 0 代表全體 NORMAL
radios = {}
status_labels = {}
def on_radio_change():
selected = init_var.get()
for sysid in ACTIVE_SYSIDS:
if sysid == selected:
uav_states[sysid]["mode"] = "INITIALIZING"
else:
uav_states[sysid]["mode"] = "NORMAL"
# 新增一個「取消特權」的選項
# tk.Radiobutton(control_frame, text="全體 NORMAL (無特權)", variable=init_var, value=0, command=on_radio_change, bg="#f0f0f0", font=("Arial", 11, "bold"), fg="blue").pack(anchor=tk.W, pady=(0, 10))
for sysid in ACTIVE_SYSIDS:
frame = tk.Frame(control_frame, bg="#f0f0f0")
frame.pack(fill=tk.X, pady=5)
rb = tk.Radiobutton(frame, text=f"SYSID {sysid} 專屬載入", variable=init_var, value=sysid,
command=on_radio_change, bg="#f0f0f0", font=("Arial", 11))
rb.pack(side=tk.LEFT)
radios[sysid] = rb
lbl = tk.Label(frame, text="NORMAL", font=("Arial", 10, "bold"), fg="green", bg="#f0f0f0")
lbl.pack(side=tk.RIGHT, padx=5)
status_labels[sysid] = lbl
# === 2. 改為開關式的鎖定按鈕 (Toggle) ===
is_locked = False
def toggle_tdma_mode():
nonlocal is_locked
is_locked = not is_locked
if is_locked:
# 鎖定:強制切回 0 (全體 NORMAL),並禁用單選按鈕
init_var.set(0)
on_radio_change()
for rb in radios.values():
rb.config(state=tk.DISABLED)
# 改變按鈕外觀為「解鎖」狀態
lock_btn.config(text="點擊解鎖 (允許重新下載參數)", bg="orange")
print("已鎖定進入純 TDMA 模式")
else:
# 解鎖:恢復單選按鈕功能
for rb in radios.values():
rb.config(state=tk.NORMAL)
# 改變按鈕外觀為「鎖定」狀態
lock_btn.config(text="參數載入完畢,鎖定進入 TDMA", bg="#4CAF50")
print("已解除鎖定,可重新分配特權")
lock_btn = tk.Button(control_frame, text="參數載入完畢,鎖定進入 TDMA",
font=("Arial", 12, "bold"), bg="#4CAF50", fg="white",
command=toggle_tdma_mode)
lock_btn.pack(pady=30, fill=tk.X)
# 動態更新狀態標籤
def update_status_gui():
for sysid, lbl in status_labels.items():
mode = uav_states[sysid]["mode"]
if mode == "INITIALIZING":
lbl.config(text="INIT (特權下載中)", fg="orange")
else:
lbl.config(text="NORMAL", fg="green")
root.after(500, update_status_gui)
update_status_gui()
# --- 右側圖表區 ---
plot_frame = tk.Frame(root)
plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9, 8), dpi=100)
canvas = FigureCanvasTkAgg(fig, master=plot_frame)
canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def update_plot(frame):
ax1.clear()
ax2.clear()
ax1.set_title("RSSI", fontsize=12); ax1.set_xlim(10, 0); ax1.set_ylim(-100, -10); ax1.grid(True, alpha=0.3)
ax2.set_title("Packet Loss Rate (5s Window)", fontsize=12); ax2.set_xlim(10, 0); ax2.set_ylim(0, 100); ax2.grid(True, alpha=0.3)
now = time.time()
colors = ['blue', 'red', 'green', 'orange', 'purple']
try: sysids = sorted(list(rssi_history.keys()))
except RuntimeError: return
loss_labels = []
for i, sysid in enumerate(sysids):
color = colors[i % len(colors)]
try:
t_hist, r_hist = list(time_history[sysid]), list(rssi_history[sysid])
lt_hist, l_hist = list(packet_loss_time_history.get(sysid, [])), list(packet_loss_history.get(sysid, []))
except RuntimeError: continue
rssi_recent = [idx for idx, ts in enumerate(t_hist) if now - ts <= 10]
if rssi_recent:
ax1.plot([now - t_hist[idx] for idx in rssi_recent], [r_hist[idx] for idx in rssi_recent], label=f"SYSID:{sysid}", color=color)
loss_recent = [idx for idx, ts in enumerate(lt_hist) if now - ts <= 10]
if loss_recent:
loss_t = [now - lt_hist[idx] for idx in loss_recent]
loss_r = [l_hist[idx] for idx in loss_recent]
ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, marker='o', markersize=3)
if loss_r:
loss_labels.append({
'sysid': sysid, 'y_real': loss_r[-1], 'x_real': loss_t[-1], 'color': color
})
if loss_labels:
loss_labels = sorted(loss_labels, key=lambda k: k['y_real'])
min_gap = 12.0
y_positions = [lbl['y_real'] for lbl in loss_labels]
for j in range(1, len(y_positions)):
if y_positions[j] - y_positions[j-1] < min_gap:
y_positions[j] = y_positions[j-1] + min_gap
if y_positions[-1] > 90:
shift = y_positions[-1] - 90
y_positions = [y - shift for y in y_positions]
for j, lbl in enumerate(loss_labels):
sysid = lbl['sysid']
color = lbl['color']
real_y = lbl['y_real']
text_y = y_positions[j]
ax2.text(0.5, text_y, f'ID:{sysid} ({real_y:.1f}%)',
bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.8),
fontsize=10, fontweight='bold', color='white',
horizontalalignment='right', verticalalignment='center')
if abs(real_y - text_y) > 1.0:
ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6)
ax1.legend(loc="upper left")
ax2.legend(loc="upper left")
ani = animation.FuncAnimation(fig, update_plot, interval=1000)
def on_closing():
root.quit()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == '__main__':
threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start()
start_gui()

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save