diff --git a/serial_udp_update.py b/serial_udp_update.py new file mode 100644 index 0000000..728adca --- /dev/null +++ b/serial_udp_update.py @@ -0,0 +1,193 @@ +import asyncio +import serial_asyncio +import struct +import os +import sys +import serial +import signal +import traceback +from pymavlink import mavutil + +# === 設定區 === +SERIAL_PORT = 'COM15' # 手動指定 +SERIAL_BAUDRATE = 57600 +UDP_REMOTE_IP = '127.0.0.1' +UDP_REMOTE_PORT = 14550 +DEBUG_MODE = False +TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + +# === 工具函數 === +def check_serial_port(): + try: + ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) + ser.close() + return True + except serial.SerialException as e: + print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") + return False + except Exception as e: + print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") + return False + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + frame_type = 0x10 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +# === Serial Protocol 實作 === +class SerialToUDP(asyncio.Protocol): + def __init__(self, udp_protocol): + self.udp_protocol = udp_protocol + self.buffer = bytearray() + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"Serial connection established on {SERIAL_PORT}") + + def data_received(self, data): + self.buffer.extend(data) + while True: + if len(self.buffer) < 3: + return + if self.buffer[0] != 0x7E: + self.buffer.pop(0) + continue + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + if len(self.buffer) < full_length: + return + frame = self.buffer[:full_length] + del self.buffer[:full_length] + if hasattr(self.udp_protocol, 'send_udp'): + self.udp_protocol.send_udp(bytes(frame)) + + def write_to_serial(self, data): + try: + api_frame = build_api_tx_frame(data, TARGET_ADDR64) + pass + self.transport.write(api_frame) + except Exception as e: + print(f"[TX Error] 無法封裝或傳送資料: {e}") + +# === UDP Protocol 實作 === +class UDPHandler(asyncio.DatagramProtocol): + def __init__(self): + self.serial_transport = None + self.transport = None + self.mav_decoder = mavutil.mavlink.MAVLink(None) + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready.") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + def datagram_received(self, data, addr): + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + def send_udp(self, data): + decoded_data = self.decapsulate_data(data) + if decoded_data is None: + pass + return + self.decode_mavlink_data(decoded_data) + if self.transport: + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def decapsulate_data(self, data): + try: + if not data or data[0] != 0x7E: + return None + length = (data[1] << 8) | data[2] + if len(data) < length + 4: + return None + frame_type = data[3] + if frame_type == 0x90: + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + except Exception as e: + print(f"[XBee 解封錯誤] {e}") + return None + + def decode_mavlink_data(self, data): + try: + msg = self.mav_decoder.parse_char(data) + if msg: + if msg.get_type() == "HEARTBEAT": + pass # 不輸出任何訊息 + else: + pass + except Exception as e: + print(f"[MAVLink Decode Error] {e}") + +# === 主流程 === +async def main(): + if not check_serial_port(): + print("程式終止:串口檢查失敗") + return + + loop = asyncio.get_running_loop() + if os.name != 'nt': # Windows 不支援 add_signal_handler + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) + + udp_handler = UDPHandler() + + try: + udp_transport, _ = await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) + ) + except Exception as e: + print(f"無法創建 UDP 端點:{str(e)}") + return + + sock = udp_transport.get_extra_info('socket') + print(f"UDP listening on {sock.getsockname()}") + + try: + serial_proto = SerialToUDP(udp_handler) + await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"無法建立串口連接:{str(e)}") + traceback.print_exc() + udp_transport.close() + return + + print("等待串口資料...") + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +async def shutdown(loop): + print("Shutting down...") + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程式被使用者中斷") + except Exception as e: + print("程式執行錯誤:") + traceback.print_exc() diff --git a/src/unitdev04/udptest4.py b/src/unitdev04/udptest4.py new file mode 100644 index 0000000..d9dbcb6 --- /dev/null +++ b/src/unitdev04/udptest4.py @@ -0,0 +1,193 @@ +import asyncio +import serial_asyncio +import struct +import os +import sys +import serial +import signal +import traceback +from pymavlink import mavutil + +# === 設定區 === +SERIAL_PORT = 'COM15' # 手動指定 +SERIAL_BAUDRATE = 57600 +UDP_REMOTE_IP = '127.0.0.1' +UDP_REMOTE_PORT = 14550 +DEBUG_MODE = False +TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' # 廣播 + +# === 工具函數 === +def check_serial_port(): + try: + ser = serial.Serial(SERIAL_PORT, SERIAL_BAUDRATE) + ser.close() + return True + except serial.SerialException as e: + print(f"錯誤:串口設備 {SERIAL_PORT} 被占用或無法訪問:{str(e)}") + return False + except Exception as e: + print(f"錯誤:檢查串口時發生未知錯誤:{str(e)}") + return False + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + frame_type = 0x10 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +# === Serial Protocol 實作 === +class SerialToUDP(asyncio.Protocol): + def __init__(self, udp_protocol): + self.udp_protocol = udp_protocol + self.buffer = bytearray() + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"Serial connection established on {SERIAL_PORT}") + + def data_received(self, data): + self.buffer.extend(data) + while True: + if len(self.buffer) < 3: + return + if self.buffer[0] != 0x7E: + self.buffer.pop(0) + continue + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + if len(self.buffer) < full_length: + return + frame = self.buffer[:full_length] + del self.buffer[:full_length] + if hasattr(self.udp_protocol, 'send_udp'): + self.udp_protocol.send_udp(bytes(frame)) + + def write_to_serial(self, data): + try: + api_frame = build_api_tx_frame(data, TARGET_ADDR64) + pass + self.transport.write(api_frame) + except Exception as e: + print(f"[TX Error] 無法封裝或傳送資料: {e}") + +# === UDP Protocol 實作 === +class UDPHandler(asyncio.DatagramProtocol): + def __init__(self): + self.serial_transport = None + self.transport = None + self.mav_decoder = mavutil.mavlink.MAVLink(None) + + def connection_made(self, transport): + self.transport = transport + print("UDP transport ready.") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + def datagram_received(self, data, addr): + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + def send_udp(self, data): + decoded_data = self.decapsulate_data(data) + if decoded_data is None: + pass + return + self.decode_mavlink_data(decoded_data) + if self.transport: + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, UDP_REMOTE_PORT)) + + def decapsulate_data(self, data): + try: + if not data or data[0] != 0x7E: + return None + length = (data[1] << 8) | data[2] + if len(data) < length + 4: + return None + frame_type = data[3] + if frame_type == 0x90: + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + except Exception as e: + print(f"[XBee 解封錯誤] {e}") + return None + + def decode_mavlink_data(self, data): + try: + msg = self.mav_decoder.parse_char(data) + if msg: + if msg.get_type() == "HEARTBEAT": + pass # 不輸出任何訊息 + else: + pass + except Exception as e: + print(f"[MAVLink Decode Error] {e}") + +# === 主流程 === +async def main(): + if not check_serial_port(): + print("程式終止:串口檢查失敗") + return + + loop = asyncio.get_running_loop() + if os.name != 'nt': # Windows 不支援 add_signal_handler + for sig in (signal.SIGINT, signal.SIGTERM): + loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown(loop))) + + udp_handler = UDPHandler() + + try: + udp_transport, _ = await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) + ) + except Exception as e: + print(f"無法創建 UDP 端點:{str(e)}") + return + + sock = udp_transport.get_extra_info('socket') + print(f"UDP listening on {sock.getsockname()}") + + try: + serial_proto = SerialToUDP(udp_handler) + await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, SERIAL_PORT, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"無法建立串口連接:{str(e)}") + traceback.print_exc() + udp_transport.close() + return + + print("等待串口資料...") + try: + await asyncio.Future() + except asyncio.CancelledError: + pass + +async def shutdown(loop): + print("Shutting down...") + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + loop.stop() + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("程式被使用者中斷") + except Exception as e: + print("程式執行錯誤:") + traceback.print_exc() diff --git a/src/unitdev04/udptest8.py b/src/unitdev04/udptest8.py new file mode 100644 index 0000000..acf92bd --- /dev/null +++ b/src/unitdev04/udptest8.py @@ -0,0 +1,384 @@ +# === import 保持不變 === +import asyncio +import serial_asyncio +import struct +import serial +import traceback +import time +from collections import deque, defaultdict +from pymavlink import mavutil +import matplotlib.pyplot as plt +import matplotlib.animation as animation +import threading + +# === 多組設備設定 === +CONFIGS = [ + {"serial_port": "/dev/ttyUSB2", "udp_port": 14551}, + {"serial_port": "COM43", "udp_port": 14552}, + {"serial_port": "COM15", "udp_port": 14553}, +] + +SERIAL_BAUDRATE = 115200 +UDP_REMOTE_IP = '127.0.0.1' +TARGET_ADDR64 = b'\x00\x00\x00\x00\x00\x00\xFF\xFF' + +# === 數據存儲 === +rssi_history = defaultdict(lambda: deque(maxlen=5000)) +time_history = defaultdict(lambda: deque(maxlen=5000)) + +# === 新增:丟包率追蹤 === +packet_loss_history = defaultdict(lambda: deque(maxlen=1000)) +packet_loss_time_history = defaultdict(lambda: deque(maxlen=1000)) +mavlink_sequence_tracker = defaultdict(dict) # {sysid: {compid: {'last_seq': x, 'total_packets': y, 'lost_packets': z}}} +packet_loss_stats = defaultdict(lambda: {'loss_rate': 0.0, 'total_received': 0, 'total_lost': 0}) + +serial_to_sysid = {} +serial_last_mavlink_time = {} # 優化 1:追蹤各 serial port 最近的 MAVLink 時間 + +def calculate_packet_loss(sysid, compid, current_seq): + """計算丟包率""" + global mavlink_sequence_tracker, packet_loss_stats + + tracker = mavlink_sequence_tracker[sysid] + if compid not in tracker: + # 第一次收到這個component的消息 + tracker[compid] = { + 'last_seq': current_seq, + 'total_packets': 1, + 'lost_packets': 0, + 'last_update': time.time() + } + return 0.0 + + comp_tracker = tracker[compid] + last_seq = comp_tracker['last_seq'] + + # 計算序列號差異(處理255溢出) + if current_seq > last_seq: + expected_packets = current_seq - last_seq + elif current_seq < last_seq: + # 序列號溢出(0-255循環) + expected_packets = (255 - last_seq) + current_seq + 1 + else: + # 重複的序列號,忽略 + return comp_tracker.get('loss_rate', 0.0) + + # 更新統計 + comp_tracker['total_packets'] += expected_packets + lost_packets = expected_packets - 1 # 實際收到1個,應該收到expected_packets個 + comp_tracker['lost_packets'] += max(0, lost_packets) + comp_tracker['last_seq'] = current_seq + comp_tracker['last_update'] = time.time() + + # 計算丟包率 + total_expected = comp_tracker['total_packets'] + total_lost = comp_tracker['lost_packets'] + loss_rate = (total_lost / total_expected) * 100.0 if total_expected > 0 else 0.0 + comp_tracker['loss_rate'] = loss_rate + + # 更新全局統計(按sysid匯總所有component) + total_received = 0 + total_lost_all = 0 + total_expected_all = 0 + + for comp_id, comp_data in tracker.items(): + total_expected_all += comp_data['total_packets'] + total_lost_all += comp_data['lost_packets'] + total_received += comp_data['total_packets'] - comp_data['lost_packets'] + + overall_loss_rate = (total_lost_all / total_expected_all) * 100.0 if total_expected_all > 0 else 0.0 + + packet_loss_stats[sysid] = { + 'loss_rate': overall_loss_rate, + 'total_received': total_received, + 'total_lost': total_lost_all + } + + # 記錄到歷史數據 + now = time.time() + packet_loss_history[sysid].append(overall_loss_rate) + packet_loss_time_history[sysid].append(now) + + return overall_loss_rate + +def build_api_tx_frame(data: bytes, dest_addr64: bytes, frame_id=0x01) -> bytes: + frame_type = 0x10 + dest_addr16 = b'\xFF\xFE' + broadcast_radius = 0x00 + options = 0x00 + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + frame += dest_addr64 + dest_addr16 + frame += struct.pack(">BB", broadcast_radius, options) + data + checksum = 0xFF - (sum(frame) & 0xFF) + return b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + +class SerialToUDP(asyncio.Protocol): + def __init__(self, udp_protocol, serial_port): + self.udp_protocol = udp_protocol + self.serial_port = serial_port + self.buffer = bytearray() + + def connection_made(self, transport): + self.transport = transport + if hasattr(self.udp_protocol, 'set_serial_transport'): + self.udp_protocol.set_serial_transport(self) + print(f"[{self.serial_port}] Serial connection established.") + + def data_received(self, data): + self.buffer.extend(data) + while True: + if len(self.buffer) < 3: + return + if self.buffer[0] != 0x7E: + self.buffer.pop(0) + continue + length = (self.buffer[1] << 8) | self.buffer[2] + full_length = 3 + length + 1 + if len(self.buffer) < full_length: + return + frame = self.buffer[:full_length] + del self.buffer[:full_length] + + if hasattr(self.udp_protocol, 'send_udp'): + self.udp_protocol.send_udp(bytes(frame)) + + if frame[3] == 0x88 and frame[5:7] == b'DB': + status = frame[7] + if status == 0x00 and len(frame) > 8: + rssi_value = frame[8] + now = time.time() + + # === 優化 1:僅信任最近 0.5 秒內有接收 MAVLink 的 port + last_time = serial_last_mavlink_time.get(self.serial_port, 0) + if now - last_time <= 0.5: + sysid = serial_to_sysid.get(self.serial_port, None) + if sysid is not None: + rssi_history[sysid].append(-rssi_value) + time_history[sysid].append(now) + # print(f"[SYSID:{sysid}] RSSI = -{rssi_value} dBm") + else: + print(f"[{self.serial_port}] 找不到 sysid 對應,RSSI = -{rssi_value} dBm,已忽略") + else: + print(f"[{self.serial_port}] 超過 0.5 秒未接收 MAVLink,RSSI = -{rssi_value} dBm 已忽略") + else: + print(f"[{self.serial_port}] DB 指令失敗,狀態碼: {status}") + + def write_to_serial(self, data): + try: + api_frame = build_api_tx_frame(data, TARGET_ADDR64) + self.transport.write(api_frame) + except Exception as e: + print(f"[{self.serial_port} TX Error] 無法封裝或傳送資料: {e}") + + def send_at_command_db(self): + try: + frame_type = 0x08 + frame_id = 0x52 + at_command = b'DB' + parameter = b'' + frame = struct.pack(">B", frame_type) + struct.pack(">B", frame_id) + at_command + parameter + checksum = 0xFF - (sum(frame) & 0xFF) + api_frame = b'\x7E' + struct.pack(">H", len(frame)) + frame + struct.pack("B", checksum) + self.transport.write(api_frame) + except Exception as e: + print(f"[{self.serial_port}] 發送 DB 指令失敗: {e}") + +class UDPHandler(asyncio.DatagramProtocol): + def __init__(self, udp_port): + self.udp_port = udp_port + self.serial_transport = None + self.transport = None + self.mav_decoder = mavutil.mavlink.MAVLink(None) + + def connection_made(self, transport): + self.transport = transport + print(f"[UDP:{self.udp_port}] transport ready.") + + def set_serial_transport(self, serial_transport): + self.serial_transport = serial_transport + + def datagram_received(self, data, addr): + if self.serial_transport: + self.serial_transport.write_to_serial(data) + + def send_udp(self, data): + decoded_data = self.decapsulate_data(data) + if decoded_data is None: + return + self.decode_mavlink_data(decoded_data) + if self.transport: + self.transport.sendto(decoded_data, (UDP_REMOTE_IP, self.udp_port)) + + def decapsulate_data(self, data): + try: + if not data or data[0] != 0x7E: + return None + length = (data[1] << 8) | data[2] + if len(data) < length + 4: + return None + frame_type = data[3] + if frame_type == 0x90: + rf_data_start = 3 + 12 + return data[rf_data_start:3 + length] + else: + return None + except Exception as e: + print(f"[UDP:{self.udp_port} 解封錯誤] {e}") + return None + + def decode_mavlink_data(self, data): + try: + # 逐字節解析MAVLink + for byte in data: + msg = self.mav_decoder.parse_char(bytes([byte])) + if msg: + sysid = msg.get_srcSystem() + compid = msg.get_srcComponent() + seq = msg.get_seq() + + if sysid == 0: + continue + + if self.serial_transport: + serial_to_sysid[self.serial_transport.serial_port] = sysid + serial_last_mavlink_time[self.serial_transport.serial_port] = time.time() + self.serial_transport.send_at_command_db() + + # === 新增:計算丟包率 === + try: + loss_rate = calculate_packet_loss(sysid, compid, seq) + # print(f"[SYSID:{sysid}] 丟包率: {loss_rate:.1f}%, SEQ: {seq}") + except Exception as e: + print(f"[UDP:{self.udp_port}] 丟包率計算錯誤: {e}") + + except Exception as e: + print(f"[UDP:{self.udp_port} MAVLink Decode Error] {e}") + +def start_plotting(): + # 創建子圖:上方RSSI,下方丟包率 + fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8)) + + def update(frame): + ax1.clear() + ax2.clear() + + # === RSSI 圖 === + ax1.set_title("RSSI", fontsize=14) + ax1.set_xlabel("Time (s ago)") + ax1.set_ylabel("RSSI (dBm)") + ax1.set_xlim(30, 0) # 顯示最近10秒 + ax1.set_ylim(-100, -10) + ax1.grid(True, alpha=0.3) + + # === 丟包率圖 === + ax2.set_title("Packet Loss Rate", fontsize=14) + ax2.set_xlabel("Time (s ago)") + ax2.set_ylabel("Loss Rate (%)") + ax2.set_xlim(10, 0) + ax2.set_ylim(0, 100) + ax2.grid(True, alpha=0.3) + + now = time.time() + + # 顏色列表 + colors = ['blue', 'red', 'green', 'orange', 'purple', 'brown', 'pink', 'gray'] + + for i, sysid in enumerate(sorted(rssi_history.keys())): + color = colors[i % len(colors)] + + # === 繪製 RSSI === + rssi_recent_indices = [ + idx for idx, ts in enumerate(time_history[sysid]) if now - ts <= 30 + ] + if rssi_recent_indices: + rssi_t = [now - time_history[sysid][idx] for idx in rssi_recent_indices] + rssi_r = [rssi_history[sysid][idx] for idx in rssi_recent_indices] + if rssi_t and rssi_r: + ax1.plot(rssi_t, rssi_r, label=f"SYSID:{sysid}", color=color, linewidth=2) + + # 在RSSI圖上顯示當前數值 + if rssi_r: + latest_rssi = rssi_r[-1] + ax1.text(2, latest_rssi, f'{latest_rssi:.0f}dBm', + bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7), + fontsize=10, fontweight='bold', color='white') + + # === 繪製丟包率 === + if sysid in packet_loss_history: + loss_recent_indices = [ + idx for idx, ts in enumerate(packet_loss_time_history[sysid]) if now - ts <= 10 + ] + if loss_recent_indices: + loss_t = [now - packet_loss_time_history[sysid][idx] for idx in loss_recent_indices] + loss_r = [packet_loss_history[sysid][idx] for idx in loss_recent_indices] + if loss_t and loss_r: + ax2.plot(loss_t, loss_r, label=f"SYSID:{sysid}", color=color, linewidth=2, marker='o', markersize=3) + + # 在丟包率圖上顯示當前數值和統計 + if loss_r: + latest_loss = loss_r[-1] + stats = packet_loss_stats[sysid] + + # 顯示丟包率百分比 + ax2.text(2, latest_loss + (i * 8), f'{latest_loss:.1f}%', + bbox=dict(boxstyle="round,pad=0.3", facecolor=color, alpha=0.7), + fontsize=10, fontweight='bold', color='white') + + # 在圖表左側顯示詳細統計 + stats_text = f"SYSID {sysid}:\nRecieved: {stats['total_received']}\nLoss: {stats['total_lost']}\nLoss Rate: {latest_loss:.1f}%" + ax2.text(28, 85 - (i * 20), stats_text, + bbox=dict(boxstyle="round,pad=0.5", facecolor=color, alpha=0.8), + fontsize=9, color='white', verticalalignment='top') + + # 設置圖例 + ax1.legend(loc="upper right", fontsize=10) + ax2.legend(loc="upper right", fontsize=10) + + ani = animation.FuncAnimation(fig, update, interval=1000) + plt.tight_layout() + plt.show() + +async def setup_bridge(config): + serial_port = config["serial_port"] + udp_port = config["udp_port"] + try: + ser = serial.Serial(serial_port, SERIAL_BAUDRATE) + ser.close() + except Exception as e: + print(f"[{serial_port}] 串口打開失敗: {e}") + return + loop = asyncio.get_running_loop() + udp_handler = UDPHandler(udp_port) + try: + await loop.create_datagram_endpoint( + lambda: udp_handler, + local_addr=('0.0.0.0', 0) + ) + except Exception as e: + print(f"[{serial_port}] 無法創建 UDP: {e}") + return + try: + serial_proto = SerialToUDP(udp_handler, serial_port) + await serial_asyncio.create_serial_connection( + loop, lambda: serial_proto, serial_port, baudrate=SERIAL_BAUDRATE + ) + except Exception as e: + print(f"[{serial_port}] 無法建立串口連接:{e}") + traceback.print_exc() + return + print(f"[{serial_port}] Serial <--> UDP:{udp_port} bridge ready.") + +async def main(): + tasks = [setup_bridge(cfg) for cfg in CONFIGS] + await asyncio.gather(*tasks) + await asyncio.Future() + +if __name__ == '__main__': + try: + threading.Thread(target=lambda: asyncio.run(main()), daemon=True).start() + start_plotting() + except KeyboardInterrupt: + print("使用者中斷程式") + except Exception as e: + print("程式執行錯誤:") + traceback.print_exc() \ No newline at end of file