add esp32 and udptest8-2 to unitdev04

lunu
lenting89 2 weeks ago
parent b720bb6ffd
commit 75d9d260e1

@ -0,0 +1,151 @@
from machine import UART
import time
import struct
import gc
# ================= 設定區 =================
FC_BAUDRATE = 115200
XB_BAUDRATE = 115200
DEST_64 = b'\x00\x13\xA2\x00\x42\x5B\x9D\xC8'
XBEE_MAX_PAYLOAD = 100
MY_SYSID = 15 # 第二台就改成 10第三台 15
POLL_MAGIC = b'POLL'
tx_buf = bytearray()
MAX_BUF_SIZE = 6144 # 保留安全餘裕的緩衝區大小
# =========================================
uart_fc = UART(1, baudrate=FC_BAUDRATE, tx=32, rx=33, rxbuf=4096)
uart_xb = UART(2, baudrate=XB_BAUDRATE, tx=25, rx=26, rxbuf=4096)
rx_buf = bytearray()
def get_checksum(data):
return 0xFF - (sum(data) & 0xFF)
def send_to_xbee_chunked(payload):
total_len = len(payload)
sent_len = 0
while sent_len < total_len:
end_len = min(sent_len + XBEE_MAX_PAYLOAD, total_len)
chunk = payload[sent_len : end_len]
sent_len = end_len
frame_content = bytearray()
frame_content.append(0x10)
frame_content.append(0x00) # 0x00 代表不要求 XBee ACK極大加快發送速度
frame_content.extend(DEST_64)
frame_content.extend(b'\xFF\xFE')
frame_content.append(0x00)
frame_content.append(0x00)
frame_content.extend(chunk)
length = len(frame_content)
checksum = get_checksum(frame_content)
packet = bytearray()
packet.append(0x7E)
packet.append((length >> 8) & 0xFF)
packet.append(length & 0xFF)
packet.extend(frame_content)
packet.append(checksum)
uart_xb.write(packet)
# 僅需極短暫延遲,讓硬體發送緩衝區消化即可
time.sleep_ms(1)
def process_xbee_buffer():
global rx_buf, tx_buf
while True:
start_pos = rx_buf.find(b'\x7E')
if start_pos == -1:
rx_buf = bytearray() # 避免用 clear()
return
if start_pos > 0:
rx_buf = rx_buf[start_pos:]
if len(rx_buf) < 3: return
pkt_len = (rx_buf[1] << 8) | rx_buf[2]
total_len = pkt_len + 4
if pkt_len > 300:
rx_buf = rx_buf[1:]
continue
if len(rx_buf) < total_len: return
packet = rx_buf[3 : 3+pkt_len]
checksum_recv = rx_buf[3+pkt_len]
if get_checksum(packet) == checksum_recv:
if packet[0] == 0x90:
real_data = packet[12:]
if len(real_data) == 5 and real_data.startswith(POLL_MAGIC):
if real_data[4] == MY_SYSID:
flush_tx_buffer()
else:
uart_fc.write(real_data)
rx_buf = rx_buf[total_len:]
else:
rx_buf = rx_buf[1:]
def flush_tx_buffer():
global tx_buf
if len(tx_buf) == 0:
return
# 單次最大發送量設為 1200 bytes避免在空中飛行超過時槽導致碰撞
send_limit = min(len(tx_buf), 1200)
cut_idx = send_limit
if send_limit < len(tx_buf):
last_fd_pos = tx_buf[:send_limit].rfind(b'\xFD')
if last_fd_pos > 0:
cut_idx = last_fd_pos
data_to_send = tx_buf[:cut_idx]
tx_buf = tx_buf[cut_idx:]
send_to_xbee_chunked(data_to_send)
# 初始清理
gc.collect()
while True:
try:
# FC -> 緩衝區
if uart_fc.any():
data = uart_fc.read(250)
if data:
tx_buf.extend(data)
# 聰明的溫和防爆機制 (砍掉舊的 2/3保留最新 1/3)
if len(tx_buf) > MAX_BUF_SIZE:
safe_cut = tx_buf.find(b'\xFD', (MAX_BUF_SIZE // 3) * 2)
if safe_cut != -1:
tx_buf = tx_buf[safe_cut:]
else:
tx_buf = tx_buf[(MAX_BUF_SIZE // 2):]
# XBee -> FC
if uart_xb.any():
chunk = uart_xb.read()
if chunk:
rx_buf.extend(chunk)
process_xbee_buffer()
time.sleep_ms(1)
except MemoryError:
# 發生 OOM 時的「斷尾求生」法,強制捨棄舊變數以釋放空間,防止死機
tx_buf = bytearray()
rx_buf = bytearray()
gc.collect()
time.sleep_ms(10)
except Exception as e:
time.sleep_ms(2)

@ -0,0 +1,500 @@
import asyncio
import serial_asyncio
import struct
import serial
import time
import threading
import tkinter as tk
from tkinter import ttk
from collections import deque, defaultdict
from pymavlink import mavutil
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
# === 多組設備設定 ===
CONFIGS = [
{"serial_port": "/dev/ttyUSB0", "udp_port": 14551},
{"serial_port": "COM15", "udp_port": 14570},
{"serial_port": "/dev/ttyUSB2", "udp_port": 14553},
{"serial_port": "/dev/ttyUSB3", "udp_port": 14554},
]
SERIAL_BAUDRATE = 115200
UDP_REMOTE_IP = '127.0.0.1'
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF'
ACTIVE_SYSIDS = [3, 10, 15]
POLL_MAGIC = b'POLL'
# === 全域變數與狀態追蹤 ===
current_base_slot_ms = 250 # 預設 TDMA 時槽
LOSS_TIME_WINDOW_SEC = 5.0 # 丟包率計算的時間視窗 (5秒)
uav_states = {
sysid: {
"mode": "NORMAL",
} for sysid in ACTIVE_SYSIDS
}
rssi_history = defaultdict(lambda: deque(maxlen=5000))
time_history = defaultdict(lambda: deque(maxlen=5000))
packet_loss_history = defaultdict(lambda: deque(maxlen=1000))
packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000))
mavlink_sequence_tracker = defaultdict(dict)
packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0})
serial_to_sysid = {}
serial_last_mavlink_time = {}
last_db_query_time = {}
# === 核心邏輯區 ===
def calculate_packet_loss(sysid, compid, current_seq):
global mavlink_sequence_tracker, packet_loss_stats
tracker = mavlink_sequence_tracker[sysid]
now = time.time()
if compid not in tracker:
tracker[compid] = {
'last_seq': current_seq,
'history': deque()
}
return 0.0
comp_tracker = tracker[compid]
last_seq = comp_tracker['last_seq']
if current_seq > last_seq:
expected = current_seq - last_seq
elif current_seq < last_seq:
expected = (255 - last_seq) + current_seq + 1
else:
return packet_loss_history[sysid][-1] if packet_loss_history[sysid] else 0.0
lost = max(0, expected - 1)
comp_tracker['history'].append((now, expected, lost))
comp_tracker['last_seq'] = current_seq
total_expected_all = 0
total_lost_all = 0
for c_id, c_data in tracker.items():
if 'history' in c_data:
while c_data['history'] and (now - c_data['history'][0][0]) > LOSS_TIME_WINDOW_SEC:
c_data['history'].popleft()
total_expected_all += sum(item[1] for item in c_data['history'])
total_lost_all += sum(item[2] for item in c_data['history'])
overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0
packet_loss_stats[sysid] = {
'loss_rate': overall_loss_rate,
'total_received': total_expected_all - total_lost_all,
'total_lost': total_lost_all
}
packet_loss_history[sysid].append(overall_loss_rate)
packet_loss_time_history[sysid].append(now)
return overall_loss_rate
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x00) -> bytes:
frame = b'\x10' + struct.pack(">B", frame_id) + dest_addr64 + b'\xFF\xFE\x00\x00' + data
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", 0xFF - (sum(frame) & 0xFF))
class SerialToUDP(asyncio.Protocol):
def send_poll(self, target_sysid):
poll_payload = POLL_MAGIC + struct.pack("B", target_sysid)
api_frame = build_api_tx_frame(poll_payload, TARGET_ADDR64, 0x00)
self.transport.write(api_frame)
def __init__(self, udp_protocol, serial_port):
self.udp_protocol = udp_protocol
self.serial_port = serial_port
self.buffer = bytearray()
self.gcs_tx_queue = bytearray()
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"[{self.serial_port}] Serial connection established.")
def data_received(self, data):
self.buffer.extend(data)
while True:
try:
start_idx = self.buffer.index(0x7E)
if start_idx > 0:
del self.buffer[:start_idx]
except ValueError:
self.buffer.clear()
return
if len(self.buffer) < 3: return
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
if length > 300:
self.buffer.pop(0)
continue
if len(self.buffer) < full_length: return
frame = self.buffer[:full_length]
checksum = 0xFF - (sum(frame[3:-1]) & 0xFF)
if checksum != frame[-1]:
self.buffer.pop(0)
continue
del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame))
if frame[3] == 0x88 and frame[5:7] == b'DB':
status = frame[7]
if status == 0x00 and len(frame) > 8:
rssi_value = frame[8]
now = time.time()
last_time = serial_last_mavlink_time.get(self.serial_port, 0)
if now - last_time <= 0.5:
sysid = serial_to_sysid.get(self.serial_port, None)
if sysid is not None:
rssi_history[sysid].append(-rssi_value)
time_history[sysid].append(now)
def write_to_serial(self, data):
self.gcs_tx_queue.extend(data)
def flush_gcs_queue(self):
if not self.gcs_tx_queue: return
send_limit = min(len(self.gcs_tx_queue), 150)
data_to_send = self.gcs_tx_queue[:send_limit]
self.gcs_tx_queue = self.gcs_tx_queue[send_limit:]
asyncio.create_task(self._async_send_chunks(data_to_send))
async def _async_send_chunks(self, data):
try:
MAX_PAYLOAD = 80
sent_len = 0
while sent_len < len(data):
end_len = min(sent_len + MAX_PAYLOAD, len(data))
chunk = data[sent_len:end_len]
sent_len = end_len
api_frame = build_api_tx_frame(chunk, TARGET_ADDR64, 0x00)
self.transport.write(api_frame)
await asyncio.sleep(0.01)
except Exception:
pass
def send_at_command_db(self):
try:
frame_type = 0x08
frame_id = 0x52
at_command = b'DB'
parameter = b''
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter
checksum = 0xFF - (sum(frame) & 0xFF)
api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
self.transport.write(api_frame)
except Exception:
pass
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self, udp_port):
self.udp_port = udp_port
self.serial_transport = None
self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport):
self.transport = transport
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
def datagram_received(self, data, addr):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def decapsulate_data(self, data):
try:
if not data or data[0] != 0x7E:
return None
length = (data[1] << 8) | data[2]
if len(data) < length + 4:
return None
frame_type = data[3]
if frame_type == 0x90:
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
except Exception:
return None
def send_udp(self, data):
decoded_data = self.decapsulate_data(data)
if decoded_data is None: return
try:
for byte in decoded_data:
msg = self.mav_decoder.parse_char(bytes([byte]))
if msg:
sysid = msg.get_srcSystem()
compid = msg.get_srcComponent()
seq = msg.get_seq()
if sysid == 0: continue
if self.serial_transport:
port_name = self.serial_transport.serial_port
serial_to_sysid[port_name] = sysid
serial_last_mavlink_time[port_name] = time.time()
now = time.time()
last_query = last_db_query_time.get(port_name, 0)
if now - last_query >= 0.5:
self.serial_transport.send_at_command_db()
last_db_query_time[port_name] = now
calculate_packet_loss(sysid, compid, seq)
except Exception:
pass
if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port))
async def setup_bridge(config):
port, udp = config["serial_port"], config["udp_port"]
try:
ser = serial.Serial(port, SERIAL_BAUDRATE)
ser.close()
except: return None
loop = asyncio.get_running_loop()
udp_handler = UDPHandler(udp)
await loop.create_datagram_endpoint(lambda: udp_handler, local_addr=('0.0.0.0', 0))
serial_proto = SerialToUDP(udp_handler, port)
await serial_asyncio.create_serial_connection(loop, lambda: serial_proto, port, baudrate=SERIAL_BAUDRATE)
return serial_proto
async def tdma_scheduler(serial_protocols):
print(f"TDMA Scheduler Started... 找到 {len(serial_protocols)} 個可用端口")
while True:
# GCS 下行時槽
for sp in serial_protocols:
if hasattr(sp, 'flush_gcs_queue'):
sp.flush_gcs_queue()
await asyncio.sleep(0.1)
# UAV 上行時槽
for sysid in ACTIVE_SYSIDS:
state = uav_states[sysid]
if state["mode"] == "INITIALIZING":
# VIP 模式0.8秒內連發4次點名暴力獲取參數
for _ in range(4):
for sp in serial_protocols:
if hasattr(sp, 'send_poll'):
sp.send_poll(sysid)
await asyncio.sleep(0.2)
else:
# NORMAL 模式:聽從滑桿設定的時間
slot_time = current_base_slot_ms / 1000.0
for sp in serial_protocols:
if hasattr(sp, 'send_poll'):
sp.send_poll(sysid)
await asyncio.sleep(slot_time)
async def async_main():
protocols = await asyncio.gather(*(setup_bridge(cfg) for cfg in CONFIGS))
valid_protocols = [p for p in protocols if p is not None]
if valid_protocols:
asyncio.create_task(tdma_scheduler(valid_protocols))
await asyncio.Future()
# === GUI 介面區 ===
def start_gui():
root = tk.Tk()
root.title("UAV TDMA Control Station")
root.geometry("1200x800")
# --- 左側控制面板 ---
control_frame = tk.Frame(root, width=300, bg="#f0f0f0", padx=20, pady=20)
control_frame.pack(side=tk.LEFT, fill=tk.Y)
tk.Label(control_frame, text="TDMA 基礎時槽控制", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=10)
slider_val = tk.IntVar(value=current_base_slot_ms)
def on_slider_change(val):
global current_base_slot_ms
current_base_slot_ms = int(float(val))
val_label.config(text=f"當前設定: {current_base_slot_ms} ms")
slider = ttk.Scale(control_frame, from_=50, to_=1000, orient='horizontal', variable=slider_val, command=on_slider_change)
slider.pack(fill=tk.X, pady=10)
val_label = tk.Label(control_frame, text=f"當前設定: {current_base_slot_ms} ms", bg="#f0f0f0")
val_label.pack()
tk.Label(control_frame, text="群機初始化控制 (INIT)", font=("Arial", 14, "bold"), bg="#f0f0f0").pack(pady=(30, 10))
# === 1. 改為單選按鈕 (Radiobutton) ===
init_var = tk.IntVar(value=0) # 0 代表全體 NORMAL
radios = {}
status_labels = {}
def on_radio_change():
selected = init_var.get()
for sysid in ACTIVE_SYSIDS:
if sysid == selected:
uav_states[sysid]["mode"] = "INITIALIZING"
else:
uav_states[sysid]["mode"] = "NORMAL"
# 新增一個「取消特權」的選項
# tk.Radiobutton(control_frame, text="全體 NORMAL (無特權)", variable=init_var, value=0, command=on_radio_change, bg="#f0f0f0", font=("Arial", 11, "bold"), fg="blue").pack(anchor=tk.W, pady=(0, 10))
for sysid in ACTIVE_SYSIDS:
frame = tk.Frame(control_frame, bg="#f0f0f0")
frame.pack(fill=tk.X, pady=5)
rb = tk.Radiobutton(frame, text=f"SYSID {sysid} 專屬載入", variable=init_var, value=sysid,
command=on_radio_change, bg="#f0f0f0", font=("Arial", 11))
rb.pack(side=tk.LEFT)
radios[sysid] = rb
lbl = tk.Label(frame, text="NORMAL", font=("Arial", 10, "bold"), fg="green", bg="#f0f0f0")
lbl.pack(side=tk.RIGHT, padx=5)
status_labels[sysid] = lbl
# === 2. 改為開關式的鎖定按鈕 (Toggle) ===
is_locked = False
def toggle_tdma_mode():
nonlocal is_locked
is_locked = not is_locked
if is_locked:
# 鎖定:強制切回 0 (全體 NORMAL),並禁用單選按鈕
init_var.set(0)
on_radio_change()
for rb in radios.values():
rb.config(state=tk.DISABLED)
# 改變按鈕外觀為「解鎖」狀態
lock_btn.config(text="點擊解鎖 (允許重新下載參數)", bg="orange")
print("已鎖定進入純 TDMA 模式")
else:
# 解鎖:恢復單選按鈕功能
for rb in radios.values():
rb.config(state=tk.NORMAL)
# 改變按鈕外觀為「鎖定」狀態
lock_btn.config(text="參數載入完畢,鎖定進入 TDMA", bg="#4CAF50")
print("已解除鎖定,可重新分配特權")
lock_btn = tk.Button(control_frame, text="參數載入完畢,鎖定進入 TDMA",
font=("Arial", 12, "bold"), bg="#4CAF50", fg="white",
command=toggle_tdma_mode)
lock_btn.pack(pady=30, fill=tk.X)
# 動態更新狀態標籤
def update_status_gui():
for sysid, lbl in status_labels.items():
mode = uav_states[sysid]["mode"]
if mode == "INITIALIZING":
lbl.config(text="INIT (特權下載中)", fg="orange")
else:
lbl.config(text="NORMAL", fg="green")
root.after(500, update_status_gui)
update_status_gui()
# --- 右側圖表區 ---
plot_frame = tk.Frame(root)
plot_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(9, 8), dpi=100)
canvas = FigureCanvasTkAgg(fig, master=plot_frame)
canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
def update_plot(frame):
ax1.clear()
ax2.clear()
ax1.set_title("RSSI", fontsize=12); ax1.set_xlim(10, 0); ax1.set_ylim(-100, -10); ax1.grid(True, alpha=0.3)
ax2.set_title("Packet Loss Rate (5s Window)", fontsize=12); ax2.set_xlim(10, 0); ax2.set_ylim(0, 100); ax2.grid(True, alpha=0.3)
now = time.time()
colors = ['blue', 'red', 'green', 'orange', 'purple']
try: sysids = sorted(list(rssi_history.keys()))
except RuntimeError: return
loss_labels = []
for i, sysid in enumerate(sysids):
color = colors[i % len(colors)]
try:
t_hist, r_hist = list(time_history[sysid]), list(rssi_history[sysid])
lt_hist, l_hist = list(packet_loss_time_history.get(sysid, [])), list(packet_loss_history.get(sysid, []))
except RuntimeError: continue
rssi_recent = [idx for idx, ts in enumerate(t_hist) if now - ts <= 10]
if rssi_recent:
ax1.plot([now - t_hist[idx] for idx in rssi_recent], [r_hist[idx] for idx in rssi_recent], label=f"SYSID:{sysid}", color=color)
loss_recent = [idx for idx, ts in enumerate(lt_hist) if now - ts <= 10]
if loss_recent:
loss_t = [now - lt_hist[idx] for idx in loss_recent]
loss_r = [l_hist[idx] for idx in loss_recent]
ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, marker='o', markersize=3)
if loss_r:
loss_labels.append({
'sysid': sysid, 'y_real': loss_r[-1], 'x_real': loss_t[-1], 'color': color
})
if loss_labels:
loss_labels = sorted(loss_labels, key=lambda k: k['y_real'])
min_gap = 12.0
y_positions = [lbl['y_real'] for lbl in loss_labels]
for j in range(1, len(y_positions)):
if y_positions[j] - y_positions[j-1] < min_gap:
y_positions[j] = y_positions[j-1] + min_gap
if y_positions[-1] > 90:
shift = y_positions[-1] - 90
y_positions = [y - shift for y in y_positions]
for j, lbl in enumerate(loss_labels):
sysid = lbl['sysid']
color = lbl['color']
real_y = lbl['y_real']
text_y = y_positions[j]
ax2.text(0.5, text_y, f'ID:{sysid} ({real_y:.1f}%)',
bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.8),
fontsize=10, fontweight='bold', color='white',
horizontalalignment='right', verticalalignment='center')
if abs(real_y - text_y) > 1.0:
ax2.plot([lbl['x_real'], 0.5], [real_y, text_y], color=color, linestyle=':', alpha=0.6)
ax1.legend(loc="upper left")
ax2.legend(loc="upper left")
ani = animation.FuncAnimation(fig, update_plot, interval=1000)
def on_closing():
root.quit()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == '__main__':
threading.Thread(target=lambda: asyncio.run(async_main()), daemon=True).start()
start_gui()
Loading…
Cancel
Save