From 7f7753d0b4bb3d1a9d43953d6043866c69701a0c Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Tue, 3 Jun 2025 02:04:51 +0800 Subject: [PATCH] =?UTF-8?q?1.=20(=E4=B8=AD=E7=B9=BC=E7=9A=84=E5=AD=98?= =?UTF-8?q?=E6=AA=94)=20=E6=BA=96=E5=82=99=E5=B0=8D=20mavlinkObject.py=20?= =?UTF-8?q?=E7=9A=84=20mavlink=5Fobject=20=E9=87=8D=E6=A7=8B=202.=20?= =?UTF-8?q?=E6=96=B0=E5=A2=9E=20ringBuffer.py=20=E4=BD=9C=E7=82=BA=20queue?= =?UTF-8?q?=20=E5=8F=96=E4=BB=A3=203.=20=E6=96=B0=E5=A2=9E=20tests=20?= =?UTF-8?q?=E8=B3=87=E6=96=99=E5=A4=BE=E5=88=86=E9=9B=A2=E7=A8=8B=E5=BC=8F?= =?UTF-8?q?=E8=88=87=E6=B8=AC=E8=A9=A6=E6=AA=94=E6=A1=88=204.=20asyncioMan?= =?UTF-8?q?ager.py=20=E6=98=AF=E5=8F=83=E8=80=83=E7=94=A8=E7=9A=84?= =?UTF-8?q?=E7=AF=84=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fc_network_adapter/asyncioManager.py | 290 ++++++++++++++++++ .../fc_network_adapter/devRun.py | 99 +++++- .../fc_network_adapter/mavlinkObject.py | 2 - .../fc_network_adapter/ringBuffer.py | 231 ++++++++++++++ src/fc_network_adapter/tests/__init__.py | 0 .../tests/test_asyncioManager.py | 246 +++++++++++++++ .../tests/test_ringBuffer.py | 152 +++++++++ 7 files changed, 1015 insertions(+), 5 deletions(-) create mode 100644 src/fc_network_adapter/fc_network_adapter/asyncioManager.py create mode 100644 src/fc_network_adapter/fc_network_adapter/ringBuffer.py create mode 100644 src/fc_network_adapter/tests/__init__.py create mode 100644 src/fc_network_adapter/tests/test_asyncioManager.py create mode 100644 src/fc_network_adapter/tests/test_ringBuffer.py diff --git a/src/fc_network_adapter/fc_network_adapter/asyncioManager.py b/src/fc_network_adapter/fc_network_adapter/asyncioManager.py new file mode 100644 index 0000000..9a9ba65 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/asyncioManager.py @@ -0,0 +1,290 @@ +import asyncio +import socket +import threading +import time +import logging +from typing import Dict, Any, Optional, List, Callable, Tuple +from pymavlink import mavutil +from dataclasses import dataclass, field +from .ringBuffer import RingBuffer + +# 設置日誌 +logger = logging.getLogger("async_io_manager") + +@dataclass +class Connection: + """表示一個 MAVLink 連接""" + socket_id: int + socket: Any # pymavlink socket 或相容物件 + buffer: RingBuffer = field(default_factory=lambda: RingBuffer(1024)) + last_activity: float = field(default_factory=time.time) + is_closed: bool = False + stats: Dict[str, int] = field(default_factory=lambda: {"rx_count": 0, "rx_bytes": 0, "errors": 0}) + +class AsyncIOManager: + """ + 異步 I/O 管理器 + 負責高效處理所有 MAVLink socket 的 I/O 操作 + """ + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + with cls._lock: + if cls._instance is None: + cls._instance = super(AsyncIOManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._initialized = True + self.connections: Dict[int, Connection] = {} + self.next_socket_id = 0 + self._running = False + self._event_loop = None + self._thread = None + self.check_interval = 0.01 # 檢查間隔,單位:秒 + + def start(self): + """啟動異步 I/O 管理器""" + if self._running: + logger.warning("AsyncIOManager already running") + return + + self._running = True + self._thread = threading.Thread(target=self._run_event_loop, daemon=True) + self._thread.start() + logger.info("AsyncIOManager started") + + def _run_event_loop(self): + """在專用執行緒中執行 asyncio 事件循環""" + try: + # 創建新的事件循環 + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + self._event_loop = loop + + # 添加主要協程任務 + loop.create_task(self._process_all_connections()) + + # 運行事件循環 + loop.run_forever() + except Exception as e: + logger.error(f"Error in AsyncIOManager event loop: {e}") + finally: + if loop.is_running(): + loop.stop() + logger.info("AsyncIOManager event loop stopped") + + async def _process_all_connections(self): + """處理所有連接的主循環""" + while self._running: + for socket_id, conn in list(self.connections.items()): + if conn.is_closed: + continue + + try: + # 非阻塞模式讀取數據 + await asyncio.get_event_loop().run_in_executor( + None, self._read_from_connection, conn + ) + except Exception as e: + conn.stats["errors"] += 1 + logger.error(f"Error processing connection {socket_id}: {e}") + + # 短暫休息,讓出 CPU + await asyncio.sleep(self.check_interval) + + def _read_from_connection(self, conn: Connection): + """從連接讀取數據,此函數在執行器中運行""" + try: + # 使用 mavlink 的非阻塞讀取 + msg = conn.socket.recv_msg() + if msg: + # 收到消息,更新統計資訊 + conn.last_activity = time.time() + conn.stats["rx_count"] += 1 + conn.stats["rx_bytes"] += len(msg.get_msgbuf()) + + # 將消息放入緩衝區 + if not conn.buffer.put(msg): + # 緩衝區已滿,記錄錯誤 + logger.warning(f"Buffer full for connection {conn.socket_id}, message dropped") + except Exception as e: + conn.stats["errors"] += 1 + if "Connection refused" not in str(e): # 忽略常見的連接拒絕錯誤 + logger.error(f"Error reading from socket {conn.socket_id}: {e}") + + def stop(self): + """停止異步 I/O 管理器""" + if not self._running: + return + + self._running = False + + # 停止事件循環 + if self._event_loop: + asyncio.run_coroutine_threadsafe( + self._shutdown_loop(), self._event_loop + ) + + # 等待執行緒結束 + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=2.0) + if self._thread.is_alive(): + logger.warning("AsyncIOManager thread did not terminate cleanly") + + logger.info("AsyncIOManager stopped") + + async def _shutdown_loop(self): + """關閉事件循環""" + # 關閉所有連接 + for socket_id in list(self.connections.keys()): + self.remove_connection(socket_id) + + # 停止事件循環 + loop = asyncio.get_event_loop() + loop.stop() + + def register_connection(self, socket: Any) -> int: + """ + 註冊新的連接 + + Args: + socket: pymavlink socket 或相容物件 + + Returns: + int: 分配的 socket_id + """ + socket_id = self.next_socket_id + self.next_socket_id += 1 + + # 創建新的連接 + connection = Connection(socket_id=socket_id, socket=socket) + self.connections[socket_id] = connection + + logger.info(f"Registered new connection with ID {socket_id}") + return socket_id + + def remove_connection(self, socket_id: int) -> bool: + """ + 移除連接 + + Args: + socket_id: 連接 ID + + Returns: + bool: 成功移除返回 True + """ + if socket_id in self.connections: + conn = self.connections[socket_id] + conn.is_closed = True + + try: + # 關閉原始 socket + if hasattr(conn.socket, 'close'): + conn.socket.close() + except Exception as e: + logger.error(f"Error closing socket {socket_id}: {e}") + + # 從字典中移除 + self.connections.pop(socket_id) + logger.info(f"Removed connection with ID {socket_id}") + return True + return False + + def get_buffer(self, socket_id: int) -> Optional[RingBuffer]: + """ + 獲取指定連接的緩衝區 + + Args: + socket_id: 連接 ID + + Returns: + RingBuffer 或 None(如果連接不存在) + """ + if socket_id in self.connections: + return self.connections[socket_id].buffer + return None + + def get_stats(self, socket_id: int = None) -> Dict: + """ + 獲取連接統計資訊 + + Args: + socket_id: 指定連接 ID,None 表示所有連接 + + Returns: + Dict: 統計資訊 + """ + if socket_id is not None and socket_id in self.connections: + return self.connections[socket_id].stats + + # 彙總所有連接的統計資訊 + total_stats = {"rx_count": 0, "rx_bytes": 0, "errors": 0, "connection_count": len(self.connections)} + for conn in self.connections.values(): + for key, value in conn.stats.items(): + if key in total_stats: + total_stats[key] += value + return total_stats + + def send_message(self, socket_id: int, message: Any) -> bool: + """ + 發送 MAVLink 訊息 + + Args: + socket_id: 連接 ID + message: 要發送的訊息 + + Returns: + bool: 成功發送返回 True + """ + if socket_id not in self.connections: + return False + + conn = self.connections[socket_id] + if conn.is_closed: + return False + + try: + # 使用 asyncio.run_coroutine_threadsafe 在事件循環中執行 IO 操作 + future = asyncio.run_coroutine_threadsafe( + self._send_message_async(conn, message), + self._event_loop + ) + # 等待操作完成(可選) + return future.result(timeout=1.0) + except Exception as e: + logger.error(f"Error sending message on socket {socket_id}: {e}") + return False + + async def _send_message_async(self, conn: Connection, message: Any) -> bool: + """在事件循環中異步發送訊息""" + try: + # 調用執行器進行實際發送操作 + return await asyncio.get_event_loop().run_in_executor( + None, self._send_message_sync, conn, message + ) + except Exception as e: + conn.stats["errors"] += 1 + logger.error(f"Async error sending message: {e}") + return False + + def _send_message_sync(self, conn: Connection, message: Any) -> bool: + """同步發送訊息函數,在執行器中運行""" + try: + if hasattr(message, 'get_msgbuf'): + # 如果是 MAVLink 訊息對象 + conn.socket.write(message.get_msgbuf()) + else: + # 假設是已編碼的字節 + conn.socket.write(message) + conn.last_activity = time.time() + return True + except Exception as e: + conn.stats["errors"] += 1 + logger.error(f"Error in sync message send: {e}") + return False \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/devRun.py b/src/fc_network_adapter/fc_network_adapter/devRun.py index 110fb61..c638659 100644 --- a/src/fc_network_adapter/fc_network_adapter/devRun.py +++ b/src/fc_network_adapter/fc_network_adapter/devRun.py @@ -2,6 +2,7 @@ import queue import time + # ROS2 的 import import rclpy @@ -14,10 +15,16 @@ import mavlinkDevice as md # ====================== 分割線 ===================== -test_item = 51 -running_time = 30 +test_item = 12 +running_time = 3000 print('test_item : ', test_item) +''' +測試項 個位數 表示分離的功能 + +測試項 1X 表示 mavlink_object 的功能 測試連線的能力 +''' + if test_item == 1: # 測試 mavlink_object 中 updateMultiplexingList 的輸出 print('===> Start of Program .Test ', test_item) @@ -140,7 +147,7 @@ elif test_item == 11: analyzer = mo.mavlink_bridge() # 創建通道 - connection_string="udp:127.0.0.1:14550" + connection_string="udp:127.0.0.1:15551" mavlink_socket = mavutil.mavlink_connection(connection_string) mavlink_object2 = mo.mavlink_object(mavlink_socket) # mavlink_object2.multiplexingToAnalysis = [0] # only HEARTBEAT @@ -169,6 +176,7 @@ elif test_item == 11: print("目前遺失的訊息數量 : ",analyzer.mavlink_systems[sysid].components[compid].lost_packet_count) print("現在飛機的模式 : ",analyzer.mavlink_systems[sysid].components[compid].emitParams['flightMode_mode']) analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print(mavlink_socket.last_seq) print("===================") # 結束程式 退出所有 thread @@ -254,6 +262,91 @@ elif test_item == 12: analyzer.stop() print('<=== End of Program') +elif test_item == 13: + # 這邊測試看看能否狸貓換太子的方式 + # 把原來 pymavlink 的 mavtcpin 換成 unix socket + print('===> Start of Program .Test ', test_item) + + import os + import socket + + # 建立一個 mavtcpin 實例 + import mavunixin + + + + + # ===== 以下就是按照原本測試 12 的流程 ===== + # 啟動連線的模組 + analyzer = mo.mavlink_bridge() + + # 初始化輸入通道 + mavlink_socket_unix = mavunixin.mavunixin("unix:/tmp/unix_socket_mavlink.sock", source_system=1, source_component=1) + mavlink_object_in = mo.mavlink_object(mavlink_socket_unix) + mavlink_object_in.multiplexingToAnalysis = [0] # only HEARTBEAT + + # 停留五秒 我要啟動另一邊 + time.sleep(5) + + # 初始化輸出通道 + connection_string_out="udpout:127.0.0.1:14553" + mavlink_socket_out = mavutil.mavlink_connection(connection_string_out) + mavlink_object_out = mo.mavlink_object(mavlink_socket_out) + mavlink_object_out.multiplexingToAnalysis = [0] + + # 做一個空的通道驗證 可以拿來 debug + mavlink_object_none = mo.mavlink_object(None) + + # 讓兩個通道互相傳輸 + mavlink_object_in.multiplexingToSwap[mavlink_object_out.socket_id] = [-1, ] # all + mavlink_object_out.multiplexingToSwap[mavlink_object_in.socket_id] = [-1, ] # all + # mavlink_object_out.multiplexingToSwap[mavlink_object_none.socket_id] = [-1, ] # all + + # 啟動通道 + mavlink_object_in.run() + mavlink_object_out.run() + + # 做點延遲 讓 heartbeat 先吃進來 + time.sleep(3) + print("=== connection established! ===") + + print('目前所有的系統 : ') + for sysid in analyzer.mavlink_systems: + print(analyzer.mavlink_systems[sysid]) + + # print(type(mavlink_socket_out)) + # print(type(mavlink_socket_out.mav)) + + start_time = time.time() + show_time = time.time() + while time.time() - start_time < running_time: + try: + test = mo.swap_queues[mavlink_object_none.socket_id].get(block=False) + print('none object : ', test) + except queue.Empty: + pass + + if (time.time() - show_time) >= 2: + show_time = time.time() + for sysid in analyzer.mavlink_systems: + for compid in analyzer.mavlink_systems[sysid].components: + print("Sysid : {} ,目前收到的訊息數量 : {}".format(sysid, analyzer.mavlink_systems[sysid].components[compid].msg_count)) + analyzer.mavlink_systems[sysid].resetComponentPacketCount(compid) + print("===================") + + + + # 結束程式 退出所有 thread + mavlink_object_in.stop() + mavlink_object_in.thread.join() + mavlink_object_out.stop() + mavlink_object_out.thread.join() + mavlink_socket_unix.close() + mavlink_socket_out.close() + + analyzer.stop() + print('<=== End of Program') + elif test_item == 20: # 這邊測試 node 生成 topic 的功能 # 利用 空的通道 發出假的 heartbeat 封包 diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index 1589150..b55107c 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -307,7 +307,6 @@ class mavlink_object(): self.running = False def _run_thread(self): - logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) logger.info('Start of mavlink_object._run_thread id : {}'.format(self.socket_id)) start_time = time.time() while self.running: @@ -317,7 +316,6 @@ class mavlink_object(): try: mavlinkMsg = self.mavlink_socket.recv_msg() except Exception as e: - logger.critical(f"Receiving data not mavlink format. Object Delete.") logger.critical(f"Receiving data not mavlink format. Object Delete.") print(f"[mavlinkObject.py] Error receiving mavlink data: {e}") print(mavlinkMsg) diff --git a/src/fc_network_adapter/fc_network_adapter/ringBuffer.py b/src/fc_network_adapter/fc_network_adapter/ringBuffer.py new file mode 100644 index 0000000..6728426 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/ringBuffer.py @@ -0,0 +1,231 @@ +# import array +import threading +import ctypes +from typing import Any, Optional, Tuple + +class RingBuffer: + """ + 高效能無鎖環形緩衝區,使用原子操作避免鎖定 + 用於在不同執行緒間高效傳遞資料 + """ + # 緩衝區計數器,用於自動分配 buffer_id + _buffer_counter = 0 + _counter_lock = threading.Lock() + + def __init__(self, capacity: int = 256, buffer_id: int = None): + """ + 初始化環形緩衝區 + + Args: + capacity: 緩衝區容量 (必須是 2 的次方) + buffer_id: 緩衝區 ID,如果為 None 則自動分配 + """ + # 確保容量是 2 的次方,便於使用位運算取模 + if capacity & (capacity - 1) != 0: + # 找到大於等於 capacity 的最小 2 的次方 + capacity = 1 << (capacity - 1).bit_length() + + # 分配緩衝區 ID + if buffer_id is None: + with RingBuffer._counter_lock: + buffer_id = RingBuffer._buffer_counter + RingBuffer._buffer_counter += 1 + + self.buffer_id = buffer_id + self.capacity = capacity + self.mask = capacity - 1 # 用於快速取模 + self.buffer = [None] * capacity + + # 原子計數器作為讀寫指標 + self.write_index = ctypes.c_uint64(0) + self.read_index = ctypes.c_uint64(0) + + # 用於檢測上次操作的執行緒 ID + self._last_read_thread = None + self._last_write_thread = None + + # 添加同時讀寫統計 + self.concurrent_write_count = ctypes.c_uint64(0) # 同時寫入計數 + self.concurrent_read_count = ctypes.c_uint64(0) # 同時讀取計數 + self.total_write_count = ctypes.c_uint64(0) # 總寫入操作計數 + self.total_read_count = ctypes.c_uint64(0) # 總讀取操作計數 + self.overflow_count = ctypes.c_uint64(0) # 緩衝區溢出次數 + + # 記錄各執行緒的操作次數 + self.thread_write_counts = {} # {thread_id: count} + self.thread_read_counts = {} # {thread_id: count} + + # 用於保護統計數據的鎖(僅用於統計,不影響主要讀寫操作) + self._stats_lock = threading.Lock() + + def put(self, item: Any) -> bool: + """ + 將項目存入緩衝區 + + Args: + item: 要存入的項目 + + Returns: + bool: 成功寫入返回 True,緩衝區已滿返回 False + """ + # 更新寫入統計 + self.total_write_count.value += 1 + + # 檢測上次寫入的執行緒 + current_thread = threading.get_ident() + + # 記錄當前執行緒寫入次數 + with self._stats_lock: + self.thread_write_counts[current_thread] = self.thread_write_counts.get(current_thread, 0) + 1 + + # 檢測是否為不同執行緒同時寫入 + if self._last_write_thread is not None and current_thread != self._last_write_thread: + self.concurrent_write_count.value += 1 + + self._last_write_thread = current_thread + + # 原子獲取當前寫入位置 + current = self.write_index.value + next_pos = (current + 1) & self.mask + + # 檢查緩衝區是否已滿 + if next_pos == self.read_index.value: + self.overflow_count.value += 1 + return False + + # 寫入資料並原子更新寫入位置 + self.buffer[current] = item + self.write_index.value = next_pos + return True + + def get(self) -> Optional[Any]: + """ + 從緩衝區讀取項目 + + Returns: + 項目或 None(如果緩衝區為空) + """ + # 更新讀取統計 + self.total_read_count.value += 1 + + # 檢測上次讀取的執行緒 + current_thread = threading.get_ident() + + # 記錄當前執行緒讀取次數 + with self._stats_lock: + self.thread_read_counts[current_thread] = self.thread_read_counts.get(current_thread, 0) + 1 + + # 檢測是否為不同執行緒同時讀取 + if self._last_read_thread is not None and current_thread != self._last_read_thread: + self.concurrent_read_count.value += 1 + + self._last_read_thread = current_thread + + # 檢查緩衝區是否為空 + if self.read_index.value == self.write_index.value: + return None + + # 原子獲取當前讀取位置並讀取資料 + current = self.read_index.value + item = self.buffer[current] + + # 原子更新讀取位置 + self.read_index.value = (current + 1) & self.mask + return item + + def get_all(self) -> list: + """ + 獲取緩衝區中的所有項目 + + Returns: + list: 所有可用項目的列表 + """ + items = [] + while True: + item = self.get() + if item is None: + break + items.append(item) + return items + + def size(self) -> int: + # 返回目前緩衝區內項目數量 + return (self.write_index.value - self.read_index.value) & self.mask + + def is_empty(self) -> bool: + # 檢查緩衝區是否為空 + return self.read_index.value == self.write_index.value + + def is_full(self) -> bool: + # 檢查緩衝區是否已滿 + return ((self.write_index.value + 1) & self.mask) == self.read_index.value + + def clear(self) -> None: + """清空緩衝區""" + self.read_index.value = self.write_index.value + + def get_stats(self) -> dict: + """ + 獲取緩衝區統計資訊 + + Returns: + dict: 包含各種統計數據的字典 + """ + with self._stats_lock: + stats = { + "buffer_id": self.buffer_id, + "capacity": self.capacity, + "current_size": self.size(), + "is_full": self.is_full(), + "is_empty": self.is_empty(), + "total_writes": self.total_write_count.value, + "total_reads": self.total_read_count.value, + "concurrent_writes": self.concurrent_write_count.value, + "concurrent_reads": self.concurrent_read_count.value, + "overflow_count": self.overflow_count.value, + "write_threads": len(self.thread_write_counts), + "read_threads": len(self.thread_read_counts), + "concurrent_write_ratio": self.concurrent_write_count.value / max(1, self.total_write_count.value), + "concurrent_read_ratio": self.concurrent_read_count.value / max(1, self.total_read_count.value), + "top_writer_threads": sorted(self.thread_write_counts.items(), key=lambda x: x[1], reverse=True)[:3], + "top_reader_threads": sorted(self.thread_read_counts.items(), key=lambda x: x[1], reverse=True)[:3], + } + return stats + + def print_stats(self) -> None: + """輸出當前緩衝區統計資訊""" + stats = self.get_stats() + + print(f"\n=== RingBuffer #{stats['buffer_id']} Statistics ===") + print(f"Capacity: {stats['capacity']}, Current Size: {stats['current_size']}") + print(f"Total Write Operations: {stats['total_writes']}") + print(f"Total Read Operations: {stats['total_reads']}") + print(f"Concurrent Write Events: {stats['concurrent_writes']} ({stats['concurrent_write_ratio']:.2%})") + print(f"Concurrent Read Events: {stats['concurrent_reads']} ({stats['concurrent_read_ratio']:.2%})") + print(f"Buffer Overflow Count: {stats['overflow_count']}") + print(f"Writing Threads: {stats['write_threads']}") + print(f"Reading Threads: {stats['read_threads']}") + + print("Top Writer Threads:") + for thread_id, count in stats['top_writer_threads']: + print(f" Thread {thread_id}: {count} writes") + + print("Top Reader Threads:") + for thread_id, count in stats['top_reader_threads']: + print(f" Thread {thread_id}: {count} reads") + print("=============================\n") + + def reset_stats(self) -> None: + """重置所有統計數據""" + with self._stats_lock: + self.concurrent_write_count.value = 0 + self.concurrent_read_count.value = 0 + self.total_write_count.value = 0 + self.total_read_count.value = 0 + self.overflow_count.value = 0 + self.thread_write_counts.clear() + self.thread_read_counts.clear() + + def __str__(self) -> str: + """返回緩衝區的字符串表示""" + return f"RingBuffer(id={self.buffer_id}, capacity={self.capacity}, size={self.size()})" \ No newline at end of file diff --git a/src/fc_network_adapter/tests/__init__.py b/src/fc_network_adapter/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/fc_network_adapter/tests/test_asyncioManager.py b/src/fc_network_adapter/tests/test_asyncioManager.py new file mode 100644 index 0000000..27a790a --- /dev/null +++ b/src/fc_network_adapter/tests/test_asyncioManager.py @@ -0,0 +1,246 @@ +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import time +import threading +import socket +import random +from pymavlink import mavutil +from pymavlink.dialects.v20 import common as mavlink2 + +from fc_network_adapter.asyncioManager import AsyncIOManager, Connection + +class MockMAVLinkSocket: + """模擬 pymavlink socket 行為的類,用於測試""" + + def __init__(self, name="mock", simulate_errors=False): + self.name = name + self.simulate_errors = simulate_errors + self.is_closed = False + self.received_msgs = [] + self.seq = 0 + + def recv_msg(self): + """模擬接收 MAVLink 消息""" + if self.is_closed: + return None + + # 隨機模擬網絡延遲 + time.sleep(random.uniform(0.01, 0.05)) + + # 偶爾模擬錯誤 + if self.simulate_errors and random.random() < 0.1: + raise Exception("Simulated network error") + + # 70% 的機率返回消息,30% 返回 None (模擬無數據可讀) + if random.random() > 0.3: + msg = self._generate_mavlink_message() + self.received_msgs.append(msg) + return msg + + return None + + def _generate_mavlink_message(self): + """生成模擬 MAVLink 消息""" + # 創建 MAVLink 實例 + mav = mavlink2.MAVLink(self) + + # 設置來源系統和組件 + mav.srcSystem = random.randint(1, 5) + mav.srcComponent = random.randint(1, 5) + + # 創建 HEARTBEAT 消息 + msg = mavlink2.MAVLink_heartbeat_message( + type=mavlink2.MAV_TYPE_QUADROTOR, + autopilot=mavlink2.MAV_AUTOPILOT_GENERIC, + base_mode=0, + custom_mode=0, + system_status=0, + mavlink_version=3 + ) + + # 打包消息 + msg.pack(mav) + self.seq += 1 + return msg + + def write(self, data): + """模擬發送數據""" + if self.is_closed: + raise Exception("Socket is closed") + + # 模擬寫入操作 + time.sleep(random.uniform(0.001, 0.01)) + + # 偶爾模擬錯誤 + if self.simulate_errors and random.random() < 0.1: + raise Exception("Simulated write error") + + return len(data) + + def close(self): + """關閉連接""" + self.is_closed = True + +def create_mock_mavlink_sockets(count=3): + """創建多個模擬 MAVLink socket""" + sockets = [] + for i in range(count): + # 每三個 socket 中有一個模擬錯誤 + simulate_errors = (i % 3 == 0) + socket = MockMAVLinkSocket(f"mock_{i}", simulate_errors) + sockets.append(socket) + return sockets + +def monitor_thread(io_manager): + """監視 AsyncIOManager 狀態的執行緒""" + print("Starting monitoring thread...") + + try: + while True: + # 獲取總體統計資訊 + stats = io_manager.get_stats() + print(f"\n=== AsyncIOManager Stats ===") + print(f"Total connections: {stats.get('connection_count', 0)}") + print(f"Total messages received: {stats.get('rx_count', 0)}") + print(f"Total bytes received: {stats.get('rx_bytes', 0)}") + print(f"Total errors: {stats.get('errors', 0)}") + + # 獲取每個連接的緩衝區狀態 + for socket_id in io_manager.connections: + buffer = io_manager.get_buffer(socket_id) + conn_stats = io_manager.get_stats(socket_id) + + if buffer and not io_manager.connections[socket_id].is_closed: + print(f"Connection {socket_id}: Buffer size={buffer.size()}, " + + f"Messages={conn_stats.get('rx_count', 0)}, " + + f"Last activity={time.time() - io_manager.connections[socket_id].last_activity:.1f}s ago") + + # 每秒更新一次 + time.sleep(1.0) + except KeyboardInterrupt: + print("Monitoring thread stopped.") + +def consumer_thread(io_manager, socket_id): + """從指定連接的緩衝區消費消息""" + print(f"Starting consumer for socket {socket_id}...") + + try: + while True: + # 獲取緩衝區 + buffer = io_manager.get_buffer(socket_id) + if not buffer: + print(f"Consumer {socket_id}: Buffer not found or connection closed") + break + + # 獲取所有消息 + messages = buffer.get_all() + if messages: + print(f"Consumer {socket_id}: Received {len(messages)} messages") + for msg in messages: + # 處理每條消息 (這裡只是示例,實際應用中可能有更複雜的處理) + print(f" - Message from SysID={msg.get_srcSystem()}, CompID={msg.get_srcComponent()}, Type={msg.get_type()}") + + # 短暫休眠,避免 CPU 空轉 + time.sleep(0.2) + except Exception as e: + print(f"Consumer {socket_id} stopped: {e}") + +def send_command_thread(io_manager, socket_ids): + """定期向所有連接發送命令""" + print("Starting command sender thread...") + + # 創建一個 MAVLink 實例用於生成消息 + mav = mavlink2.MAVLink(None) + mav.srcSystem = 255 # GCS system ID + mav.srcComponent = 0 + + try: + count = 0 + while True: + # 等待 2 秒 + time.sleep(2.0) + count += 1 + + # 創建一個命令消息 + msg = mavlink2.MAVLink_command_long_message( + target_system=1, + target_component=1, + command=mavlink2.MAV_CMD_REQUEST_PROTOCOL_VERSION, + confirmation=0, + param1=0, param2=0, param3=0, + param4=0, param5=0, param6=0, param7=0 + ) + + # 向所有連接發送消息 + for socket_id in socket_ids: + if socket_id in io_manager.connections and not io_manager.connections[socket_id].is_closed: + try: + # 打包並發送消息 + result = io_manager.send_message(socket_id, msg) + print(f"Sent command to socket {socket_id}: {'Success' if result else 'Failed'}") + except Exception as e: + print(f"Error sending to socket {socket_id}: {e}") + + # 每 10 次迭代關閉一個連接 (用於演示關閉功能) + if count % 10 == 0 and socket_ids: + socket_id = socket_ids.pop(0) + print(f"Closing connection {socket_id}...") + io_manager.remove_connection(socket_id) + except Exception as e: + print(f"Command sender thread stopped: {e}") + +def main(): + print("AsyncIOManager Example") + print("======================") + + # 創建 AsyncIOManager 實例 + io_manager = AsyncIOManager() + + # 啟動 I/O 管理器 + io_manager.start() + print("AsyncIOManager started") + + # 創建模擬 MAVLink socket + mock_sockets = create_mock_mavlink_sockets(5) + socket_ids = [] + + # 註冊連接 + for i, socket in enumerate(mock_sockets): + socket_id = io_manager.register_connection(socket) + socket_ids.append(socket_id) + print(f"Registered socket {i} with ID {socket_id}") + + # 啟動監視執行緒 + monitor = threading.Thread(target=monitor_thread, args=(io_manager,)) + monitor.daemon = True + monitor.start() + + # 為每個連接啟動消費者執行緒 + consumers = [] + for socket_id in socket_ids: + consumer = threading.Thread(target=consumer_thread, args=(io_manager, socket_id)) + consumer.daemon = True + consumer.start() + consumers.append(consumer) + + # 啟動命令發送執行緒 + sender = threading.Thread(target=send_command_thread, args=(io_manager, socket_ids.copy())) + sender.daemon = True + sender.start() + + print("\nPress Ctrl+C to exit...") + try: + # 運行一段時間然後退出 + time.sleep(30) + except KeyboardInterrupt: + pass + finally: + # 清理資源 + print("\nShutting down...") + io_manager.stop() + print("AsyncIOManager stopped") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/src/fc_network_adapter/tests/test_ringBuffer.py b/src/fc_network_adapter/tests/test_ringBuffer.py new file mode 100644 index 0000000..563087b --- /dev/null +++ b/src/fc_network_adapter/tests/test_ringBuffer.py @@ -0,0 +1,152 @@ +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import time +import threading + +from fc_network_adapter.ringBuffer import RingBuffer + + +def producer(buffer, count, interval=0.01): + """生產者:向緩衝區添加資料""" + print(f"Producer started (thread {threading.get_ident()})") + for i in range(count): + # 嘗試寫入數據,直到成功 + while not buffer.put(f"Item-{i}"): + print(f"Buffer full, producer waiting... (thread {threading.get_ident()})") + time.sleep(0.1) + + print(f"Produced: Item-{i}, buffer size: {buffer.size()}") + time.sleep(interval) # 模擬生產過程 + + print(f"Producer finished (thread {threading.get_ident()})") + +def consumer(buffer, max_items, interval=0.05): + """消費者:從緩衝區讀取資料""" + print(f"Consumer started (thread {threading.get_ident()})") + items_consumed = 0 + + while items_consumed < max_items: + # 嘗試讀取數據 + item = buffer.get() + if item: + print(f"Consumed: {item}, buffer size: {buffer.size()}") + items_consumed += 1 + else: + print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})") + + time.sleep(interval) # 模擬消費過程 + + print(f"Consumer finished (thread {threading.get_ident()})") + +def batch_consumer(buffer, interval=0.2): + """批量消費者:一次性讀取緩衝區中的所有資料""" + print(f"Batch consumer started (thread {threading.get_ident()})") + + for _ in range(5): # 執行5次批量讀取 + time.sleep(interval) # 等待緩衝區積累數據 + items = buffer.get_all() + if items: + print(f"Batch consumed {len(items)} items: {items}") + else: + print("Buffer empty for batch consumer") + + print(f"Batch consumer finished (thread {threading.get_ident()})") + +def demonstrate_multi_writer(): + """示範多個寫入執行緒同時操作緩衝區""" + print("\n=== Demonstrating Multiple Writers ===") + buffer = RingBuffer(capacity=80) + + # 創建多個生產者執行緒 + threads = [] + for i in range(3): + thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1))) + threads.append(thread) + thread.start() + + # 等待所有執行緒完成 + for thread in threads: + thread.join() + + buffer.print_stats() # 印出統計資訊 + + # 讀出所有剩餘資料 + remaining = buffer.get_all() + print(f"Remaining items in buffer after multiple writers: {remaining}") + +def demonstrate_basic_usage(): + """示範基本使用方式""" + print("\n=== Demonstrating Basic Usage ===") + # 創建緩衝區 + buffer = RingBuffer(capacity=20, buffer_id=7) + + # 檢查初始狀態 + print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 添加幾個項目 + for i in range(5): + buffer.put(f"Test-{i}") + + # 檢查狀態 + print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 讀取一個項目 + item = buffer.get() + print(f"Read item: {item}") + print(f"After reading 1 item - Content Size: {buffer.size()}") + + # 添加更多項目直到滿 + items_added = 0 + while not buffer.is_full(): + buffer.put(f"Fill-{items_added}") + items_added += 1 + + print(f"Added {items_added} more items until full") + print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 嘗試添加到已滿的緩衝區 + result = buffer.put("Overflow") + print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}") + + # 獲取所有項目 + all_items = buffer.get_all() + print(f"All items in buffer: {all_items}") + print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}") + + # 印出統計資訊 + buffer.print_stats() + +def demonstrate_producer_consumer(): + """示範生產者-消費者模式""" + print("\n=== Demonstrating Producer-Consumer Pattern ===") + buffer = RingBuffer(capacity=16) + + # 創建生產者和消費者執行緒 + producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1)) + consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2)) + batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5)) + + # 啟動執行緒 + producer_thread.start() + consumer_thread.start() + batch_thread.start() + + # 等待執行緒完成 + producer_thread.join() + consumer_thread.join() + batch_thread.join() + + # 檢查最終狀態 + print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}") + + buffer.print_stats() + +if __name__ == "__main__": + # 展示各種使用場景 + # demonstrate_basic_usage() + # demonstrate_producer_consumer() + demonstrate_multi_writer() + + print("\nAll demonstrations completed!") \ No newline at end of file