lenting89 2 weeks ago
commit 6ef06e7cb7

@ -0,0 +1,193 @@
import asyncio
import serial_asyncio
import struct
import os
import sys
import serial
import signal
import traceback
from pymavlink import mavutil
# === 設定區 ===
SERIAL_PORT = 'COM15' # 手動指定
SERIAL_BAUDRATE = 57600
UDP_REMOTE_IP = '127.0.0.1'
UDP_REMOTE_PORT = 14550
DEBUG_MODE = False
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# === 工具函數 ===
def check_serial_port():
try:
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close()
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
frame_type = 0x10
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# === Serial Protocol 實作 ===
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.buffer = bytearray()
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
def data_received(self, data):
self.buffer.extend(data)
while True:
if len(self.buffer) < 3:
return
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
if len(self.buffer) < full_length:
return
frame = self.buffer[:full_length]
del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame))
def write_to_serial(self, data):
try:
api_frame = build_api_tx_frame(data, TARGET_ADDR64)
pass
self.transport.write(api_frame)
except Exception as e:
print(f"[TX Error] 無法封裝或傳送資料: {e}")
# === UDP Protocol 實作 ===
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self):
self.serial_transport = None
self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready.")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
def datagram_received(self, data, addr):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def send_udp(self, data):
decoded_data = self.decapsulate_data(data)
if decoded_data is None:
pass
return
self.decode_mavlink_data(decoded_data)
if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
try:
if not data or data[0] != 0x7E:
return None
length = (data[1] << 8) | data[2]
if len(data) < length + 4:
return None
frame_type = data[3]
if frame_type == 0x90:
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
except Exception as e:
print(f"[XBee 解封錯誤] {e}")
return None
def decode_mavlink_data(self, data):
try:
msg = self.mav_decoder.parse_char(data)
if msg:
if msg.get_type() == "HEARTBEAT":
pass # 不輸出任何訊息
else:
pass
except Exception as e:
print(f"[MAVLink Decode Error] {e}")
# === 主流程 ===
async def main():
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
if os.name != 'nt': # Windows 不支援 add_signal_handler
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
udp_handler = UDPHandler()
try:
udp_transport, _ = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0)
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
sock = udp_transport.get_extra_info('socket')
print(f"UDP listening on {sock.getsockname()}")
try:
serial_proto = SerialToUDP(udp_handler)
await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
traceback.print_exc()
udp_transport.close()
return
print("等待串口資料...")
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print("程式執行錯誤:")
traceback.print_exc()

@ -0,0 +1,193 @@
import asyncio
import serial_asyncio
import struct
import os
import sys
import serial
import signal
import traceback
from pymavlink import mavutil
# === 設定區 ===
SERIAL_PORT = 'COM15' # 手動指定
SERIAL_BAUDRATE = 57600
UDP_REMOTE_IP = '127.0.0.1'
UDP_REMOTE_PORT = 14550
DEBUG_MODE = False
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播
# === 工具函數 ===
def check_serial_port():
try:
ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE)
ser.close()
return True
except serial.SerialException as e:
print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}")
return False
except Exception as e:
print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}")
return False
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
frame_type = 0x10
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
# === Serial Protocol 實作 ===
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_protocol):
self.udp_protocol = udp_protocol
self.buffer = bytearray()
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"Serial connection established on {SERIAL_PORT}")
def data_received(self, data):
self.buffer.extend(data)
while True:
if len(self.buffer) < 3:
return
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
if len(self.buffer) < full_length:
return
frame = self.buffer[:full_length]
del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame))
def write_to_serial(self, data):
try:
api_frame = build_api_tx_frame(data, TARGET_ADDR64)
pass
self.transport.write(api_frame)
except Exception as e:
print(f"[TX Error] 無法封裝或傳送資料: {e}")
# === UDP Protocol 實作 ===
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self):
self.serial_transport = None
self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport):
self.transport = transport
print("UDP transport ready.")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
def datagram_received(self, data, addr):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def send_udp(self, data):
decoded_data = self.decapsulate_data(data)
if decoded_data is None:
pass
return
self.decode_mavlink_data(decoded_data)
if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT))
def decapsulate_data(self, data):
try:
if not data or data[0] != 0x7E:
return None
length = (data[1] << 8) | data[2]
if len(data) < length + 4:
return None
frame_type = data[3]
if frame_type == 0x90:
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
except Exception as e:
print(f"[XBee 解封錯誤] {e}")
return None
def decode_mavlink_data(self, data):
try:
msg = self.mav_decoder.parse_char(data)
if msg:
if msg.get_type() == "HEARTBEAT":
pass # 不輸出任何訊息
else:
pass
except Exception as e:
print(f"[MAVLink Decode Error] {e}")
# === 主流程 ===
async def main():
if not check_serial_port():
print("程式終止:串口檢查失敗")
return
loop = asyncio.get_running_loop()
if os.name != 'nt': # Windows 不支援 add_signal_handler
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop)))
udp_handler = UDPHandler()
try:
udp_transport, _ = await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0)
)
except Exception as e:
print(f"無法創建 UDP 端點:{str(e)}")
return
sock = udp_transport.get_extra_info('socket')
print(f"UDP listening on {sock.getsockname()}")
try:
serial_proto = SerialToUDP(udp_handler)
await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"無法建立串口連接:{str(e)}")
traceback.print_exc()
udp_transport.close()
return
print("等待串口資料...")
try:
await asyncio.Future()
except asyncio.CancelledError:
pass
async def shutdown(loop):
print("Shutting down...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("程式被使用者中斷")
except Exception as e:
print("程式執行錯誤:")
traceback.print_exc()

@ -0,0 +1,384 @@
# === import 保持不變 ===
import asyncio
import serial_asyncio
import struct
import serial
import traceback
import time
from collections import deque, defaultdict
from pymavlink import mavutil
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import threading
# === 多組設備設定 ===
CONFIGS = [
{"serial_port": "/dev/ttyUSB2", "udp_port": 14551},
{"serial_port": "COM43", "udp_port": 14552},
{"serial_port": "COM15", "udp_port": 14553},
]
SERIAL_BAUDRATE = 115200
UDP_REMOTE_IP = '127.0.0.1'
TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF'
# === 數據存儲 ===
rssi_history = defaultdict(lambda: deque(maxlen=5000))
time_history = defaultdict(lambda: deque(maxlen=5000))
# === 新增:丟包率追蹤 ===
packet_loss_history = defaultdict(lambda: deque(maxlen=1000))
packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000))
mavlink_sequence_tracker = defaultdict(dict) # {sysid: {compid: {'last_seq': x, 'total_packets': y, 'lost_packets': z}}}
packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0})
serial_to_sysid = {}
serial_last_mavlink_time = {} # 優化 1追蹤各 serial port 最近的 MAVLink 時間
def calculate_packet_loss(sysid, compid, current_seq):
"""計算丟包率"""
global mavlink_sequence_tracker, packet_loss_stats
tracker = mavlink_sequence_tracker[sysid]
if compid not in tracker:
# 第一次收到這個component的消息
tracker[compid] = {
'last_seq': current_seq,
'total_packets': 1,
'lost_packets': 0,
'last_update': time.time()
}
return 0.0
comp_tracker = tracker[compid]
last_seq = comp_tracker['last_seq']
# 計算序列號差異處理255溢出
if current_seq > last_seq:
expected_packets = current_seq - last_seq
elif current_seq < last_seq:
# 序列號溢出0-255循環
expected_packets = (255 - last_seq) + current_seq + 1
else:
# 重複的序列號,忽略
return comp_tracker.get('loss_rate', 0.0)
# 更新統計
comp_tracker['total_packets'] += expected_packets
lost_packets = expected_packets - 1 # 實際收到1個應該收到expected_packets個
comp_tracker['lost_packets'] += max(0, lost_packets)
comp_tracker['last_seq'] = current_seq
comp_tracker['last_update'] = time.time()
# 計算丟包率
total_expected = comp_tracker['total_packets']
total_lost = comp_tracker['lost_packets']
loss_rate = (total_lost / total_expected) * 100.0 if total_expected > 0 else 0.0
comp_tracker['loss_rate'] = loss_rate
# 更新全局統計按sysid匯總所有component
total_received = 0
total_lost_all = 0
total_expected_all = 0
for comp_id, comp_data in tracker.items():
total_expected_all += comp_data['total_packets']
total_lost_all += comp_data['lost_packets']
total_received += comp_data['total_packets'] - comp_data['lost_packets']
overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0
packet_loss_stats[sysid] = {
'loss_rate': overall_loss_rate,
'total_received': total_received,
'total_lost': total_lost_all
}
# 記錄到歷史數據
now = time.time()
packet_loss_history[sysid].append(overall_loss_rate)
packet_loss_time_history[sysid].append(now)
return overall_loss_rate
def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes:
frame_type = 0x10
dest_addr16 = b'\xFF\xFE'
broadcast_radius = 0x00
options = 0x00
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id)
frame += dest_addr64 + dest_addr16
frame += struct.pack(">BB", broadcast_radius, options) + data
checksum = 0xFF - (sum(frame) & 0xFF)
return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
class SerialToUDP(asyncio.Protocol):
def __init__(self, udp_protocol, serial_port):
self.udp_protocol = udp_protocol
self.serial_port = serial_port
self.buffer = bytearray()
def connection_made(self, transport):
self.transport = transport
if hasattr(self.udp_protocol, 'set_serial_transport'):
self.udp_protocol.set_serial_transport(self)
print(f"[{self.serial_port}] Serial connection established.")
def data_received(self, data):
self.buffer.extend(data)
while True:
if len(self.buffer) < 3:
return
if self.buffer[0] != 0x7E:
self.buffer.pop(0)
continue
length = (self.buffer[1] << 8) | self.buffer[2]
full_length = 3 + length + 1
if len(self.buffer) < full_length:
return
frame = self.buffer[:full_length]
del self.buffer[:full_length]
if hasattr(self.udp_protocol, 'send_udp'):
self.udp_protocol.send_udp(bytes(frame))
if frame[3] == 0x88 and frame[5:7] == b'DB':
status = frame[7]
if status == 0x00 and len(frame) > 8:
rssi_value = frame[8]
now = time.time()
# === 優化 1僅信任最近 0.5 秒內有接收 MAVLink 的 port
last_time = serial_last_mavlink_time.get(self.serial_port, 0)
if now - last_time <= 0.5:
sysid = serial_to_sysid.get(self.serial_port, None)
if sysid is not None:
rssi_history[sysid].append(-rssi_value)
time_history[sysid].append(now)
# print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm")
else:
print(f"[{self.serial_port}] 找不到 sysid 對應RSSI = -{rssi_value} dBm已忽略")
else:
print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLinkRSSI = -{rssi_value} dBm 已忽略")
else:
print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}")
def write_to_serial(self, data):
try:
api_frame = build_api_tx_frame(data, TARGET_ADDR64)
self.transport.write(api_frame)
except Exception as e:
print(f"[{self.serial_port} TX Error] 無法封裝或傳送資料: {e}")
def send_at_command_db(self):
try:
frame_type = 0x08
frame_id = 0x52
at_command = b'DB'
parameter = b''
frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter
checksum = 0xFF - (sum(frame) & 0xFF)
api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum)
self.transport.write(api_frame)
except Exception as e:
print(f"[{self.serial_port}] 發送 DB 指令失敗: {e}")
class UDPHandler(asyncio.DatagramProtocol):
def __init__(self, udp_port):
self.udp_port = udp_port
self.serial_transport = None
self.transport = None
self.mav_decoder = mavutil.mavlink.MAVLink(None)
def connection_made(self, transport):
self.transport = transport
print(f"[UDP:{self.udp_port}] transport ready.")
def set_serial_transport(self, serial_transport):
self.serial_transport = serial_transport
def datagram_received(self, data, addr):
if self.serial_transport:
self.serial_transport.write_to_serial(data)
def send_udp(self, data):
decoded_data = self.decapsulate_data(data)
if decoded_data is None:
return
self.decode_mavlink_data(decoded_data)
if self.transport:
self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port))
def decapsulate_data(self, data):
try:
if not data or data[0] != 0x7E:
return None
length = (data[1] << 8) | data[2]
if len(data) < length + 4:
return None
frame_type = data[3]
if frame_type == 0x90:
rf_data_start = 3 + 12
return data[rf_data_start:3 + length]
else:
return None
except Exception as e:
print(f"[UDP:{self.udp_port} 解封錯誤] {e}")
return None
def decode_mavlink_data(self, data):
try:
# 逐字節解析MAVLink
for byte in data:
msg = self.mav_decoder.parse_char(bytes([byte]))
if msg:
sysid = msg.get_srcSystem()
compid = msg.get_srcComponent()
seq = msg.get_seq()
if sysid == 0:
continue
if self.serial_transport:
serial_to_sysid[self.serial_transport.serial_port] = sysid
serial_last_mavlink_time[self.serial_transport.serial_port] = time.time()
self.serial_transport.send_at_command_db()
# === 新增:計算丟包率 ===
try:
loss_rate = calculate_packet_loss(sysid, compid, seq)
# print(f"[SYSID:{sysid}] 丟包率: {loss_rate:.1f}%, SEQ: {seq}")
except Exception as e:
print(f"[UDP:{self.udp_port}] 丟包率計算錯誤: {e}")
except Exception as e:
print(f"[UDP:{self.udp_port} MAVLink Decode Error] {e}")
def start_plotting():
# 創建子圖上方RSSI下方丟包率
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
def update(frame):
ax1.clear()
ax2.clear()
# === RSSI 圖 ===
ax1.set_title("RSSI", fontsize=14)
ax1.set_xlabel("Time (s ago)")
ax1.set_ylabel("RSSI (dBm)")
ax1.set_xlim(30, 0) # 顯示最近10秒
ax1.set_ylim(-100, -10)
ax1.grid(True, alpha=0.3)
# === 丟包率圖 ===
ax2.set_title("Packet Loss Rate", fontsize=14)
ax2.set_xlabel("Time (s ago)")
ax2.set_ylabel("Loss Rate (%)")
ax2.set_xlim(10, 0)
ax2.set_ylim(0, 100)
ax2.grid(True, alpha=0.3)
now = time.time()
# 顏色列表
colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown', 'pink', 'gray']
for i, sysid in enumerate(sorted(rssi_history.keys())):
color = colors[i % len(colors)]
# === 繪製 RSSI ===
rssi_recent_indices = [
idx for idx, ts in enumerate(time_history[sysid]) if now - ts <= 30
]
if rssi_recent_indices:
rssi_t = [now - time_history[sysid][idx] for idx in rssi_recent_indices]
rssi_r = [rssi_history[sysid][idx] for idx in rssi_recent_indices]
if rssi_t and rssi_r:
ax1.plot(rssi_t, rssi_r, label=f"SYSID:{sysid}", color=color, linewidth=2)
# 在RSSI圖上顯示當前數值
if rssi_r:
latest_rssi = rssi_r[-1]
ax1.text(2, latest_rssi, f'{latest_rssi:.0f}dBm',
bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7),
fontsize=10, fontweight='bold', color='white')
# === 繪製丟包率 ===
if sysid in packet_loss_history:
loss_recent_indices = [
idx for idx, ts in enumerate(packet_loss_time_history[sysid]) if now - ts <= 10
]
if loss_recent_indices:
loss_t = [now - packet_loss_time_history[sysid][idx] for idx in loss_recent_indices]
loss_r = [packet_loss_history[sysid][idx] for idx in loss_recent_indices]
if loss_t and loss_r:
ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, linewidth=2, marker='o', markersize=3)
# 在丟包率圖上顯示當前數值和統計
if loss_r:
latest_loss = loss_r[-1]
stats = packet_loss_stats[sysid]
# 顯示丟包率百分比
ax2.text(2, latest_loss + (i * 8), f'{latest_loss:.1f}%',
bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7),
fontsize=10, fontweight='bold', color='white')
# 在圖表左側顯示詳細統計
stats_text = f"SYSID {sysid}:\nRecieved: {stats['total_received']}\nLoss: {stats['total_lost']}\nLoss Rate: {latest_loss:.1f}%"
ax2.text(28, 85 - (i * 20), stats_text,
bbox=dict(boxstyle="round,pad=0.5", facecolor=color, alpha=0.8),
fontsize=9, color='white', verticalalignment='top')
# 設置圖例
ax1.legend(loc="upper right", fontsize=10)
ax2.legend(loc="upper right", fontsize=10)
ani = animation.FuncAnimation(fig, update, interval=1000)
plt.tight_layout()
plt.show()
async def setup_bridge(config):
serial_port = config["serial_port"]
udp_port = config["udp_port"]
try:
ser = serial.Serial(serial_port, SERIAL_BAUDRATE)
ser.close()
except Exception as e:
print(f"[{serial_port}] 串口打開失敗: {e}")
return
loop = asyncio.get_running_loop()
udp_handler = UDPHandler(udp_port)
try:
await loop.create_datagram_endpoint(
lambda: udp_handler,
local_addr=('0.0.0.0', 0)
)
except Exception as e:
print(f"[{serial_port}] 無法創建 UDP: {e}")
return
try:
serial_proto = SerialToUDP(udp_handler, serial_port)
await serial_asyncio.create_serial_connection(
loop, lambda: serial_proto, serial_port, baudrate=SERIAL_BAUDRATE
)
except Exception as e:
print(f"[{serial_port}] 無法建立串口連接:{e}")
traceback.print_exc()
return
print(f"[{serial_port}] Serial <--> UDP:{udp_port} bridge ready.")
async def main():
tasks = [setup_bridge(cfg) for cfg in CONFIGS]
await asyncio.gather(*tasks)
await asyncio.Future()
if __name__ == '__main__':
try:
threading.Thread(target=lambda: asyncio.run(main()), daemon=True).start()
start_plotting()
except KeyboardInterrupt:
print("使用者中斷程式")
except Exception as e:
print("程式執行錯誤:")
traceback.print_exc()
Loading…
Cancel
Save