From 7af138b02a22198aaa02d0fd882f3dabb954b58e Mon Sep 17 00:00:00 2001 From: Chiyu Chen Date: Wed, 12 Nov 2025 20:26:49 +0800 Subject: [PATCH] =?UTF-8?q?1.=20=E8=AA=BF=E6=95=B4=E6=AA=94=E6=A1=88?= =?UTF-8?q?=E7=B5=90=E6=A7=8B=20=E8=AE=8A=E5=8B=95=20import=20=E7=9A=84?= =?UTF-8?q?=E8=B7=AF=E5=BE=91=E8=88=87=E6=96=B9=E6=B3=95=202.=20=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=20mainOrchestrator.py=20=E4=BD=9C=E7=82=BA=E6=8E=A5?= =?UTF-8?q?=E4=B8=8B=E4=BE=86=E9=96=8B=E7=99=BC=E4=BB=8B=E9=9D=A2=E5=8C=96?= =?UTF-8?q?=E7=B5=B1=E5=90=88=E5=B7=A5=E5=85=B7=E7=9A=84=E4=B8=BB=E8=A6=81?= =?UTF-8?q?=E6=AA=94=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 11 +- .../fc_network_adapter/mainOrchestrator.py | 262 +++++++++++ .../fc_network_adapter/mavlinkDevice.py | 6 +- .../fc_network_adapter/mavlinkObject.py | 15 +- .../fc_network_adapter/mavlinkPublish.py | 6 +- .../fc_network_adapter/test_mavlinkObject.py | 0 .../fc_network_adapter/utils/__init__.py | 7 + .../{ => utils}/ringBuffer.py | 0 .../{ => utils}/theLogger.py | 2 +- src/fc_network_adapter/setup.py | 1 + .../tests/{devRun.py => demo_integration.py} | 6 +- .../tests/demo_ringBuffer.py | 152 +++++++ .../tests/test_mavlinkObject.py | 3 +- .../tests/test_ringBuffer.py | 418 ++++++++++++------ 14 files changed, 735 insertions(+), 154 deletions(-) create mode 100644 src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py delete mode 100644 src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py create mode 100644 src/fc_network_adapter/fc_network_adapter/utils/__init__.py rename src/fc_network_adapter/fc_network_adapter/{ => utils}/ringBuffer.py (100%) rename src/fc_network_adapter/fc_network_adapter/{ => utils}/theLogger.py (93%) rename src/fc_network_adapter/tests/{devRun.py => demo_integration.py} (98%) create mode 100644 src/fc_network_adapter/tests/demo_ringBuffer.py diff --git a/README.md b/README.md index 7fd8868..8e81de0 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,15 @@ Package 簡述 2. fc_network_adapter 建立、維護與飛控韌體的連接 構築 mavlink 封包 - 同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息 處理無線模組的通訊格式 (XBee) + --同時處理與 Gazebo 的 ardupilot_plugin 溝通的 FDM/JSON 訊息 (移除)-- + +N. logs 是執行時期的記錄檔 + +=== +請一律在 ~/AirTrapMine/src/ 資料夾下 以模組化啟動程式 + +例如 在 ~/AirTrapMine/src/ 資料夾下 +> python -m fc_network_adapter.fc_network_adapter.mainOrchestrator +> python -m fc_network_adapter.tests.test_ringBuffer diff --git a/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py new file mode 100644 index 0000000..805f1eb --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/mainOrchestrator.py @@ -0,0 +1,262 @@ + +''' + +主要調配流程的程式 + +這個檔案包含 Terminal Utility Layer (TUL) 作為人機互動介面,並調用 mavlinkDevice 和 mavlinkObject 來處理 MAVLink 通訊和物件管理。 + +''' + + +import os +import time +import sys + +import curses +import threading +import queue +import signal + +# 自定義的 import +from . import mavlinkObject as mo +from . import mavlinkDevice as md +from .utils import RingBuffer, setup_logger + +logger = setup_logger(os.path.basename(__file__)) + + +class MenuNode: + def __init__(self, name, desc="", action=None, children=None): + self.name = name + self.desc = desc + self.action = action # 可以是函式或特殊字串 + self.children = children or [] # 子選單列表 + +class ControlPanel: + def __init__(self): + pass + + def menu_tree(self): + """建立多層選單結構""" + return MenuNode("Main Menu", children=[ + MenuNode("MavLink Object Control", children=[ + MenuNode("New+", "新增一個連結口", "ADD_MAV_OBJECT"), + MenuNode("Remove", "移除某個連結口", "REMOVE_MAV_OBJECT"), + MenuNode("ListAll", "顯示連結口列表", "LIST_MAV_OBJECT"), + MenuNode("Inspection", "顯示連結口資訊", "INSPECT_MAV_OBJECT"), + ]), + MenuNode("參數設定", children=[ + MenuNode("Speed", children=[ + MenuNode("Increase Speed", "增加速度", "ADJUST_SPEED"), + MenuNode("Decrease Speed", "減少速度", "ADJUST_SPEED"), + ]), + MenuNode("輸入文字", "鍵盤輸入模式", "INPUT_TEXT"), + ]), + MenuNode("資訊查看", children=[ + MenuNode("顯示狀態", "查看當前狀態", "SHOW_STATUS"), + MenuNode("顯示輸入", "查看用戶輸入", "SHOW_INPUT"), + ]), + MenuNode("返回上層", "回到上一層選單", "BACK"), + MenuNode("關閉面板", "關閉控制面板", "QUIT"), + ]) + + def panel_thread(self, cmd_q: queue.Queue, stop_evt: threading.Event): + stdscr = None + + def cleanup(): + """清理 curses 狀態""" + if stdscr: + stdscr.keypad(False) + curses.nocbreak() + curses.echo() + curses.endwin() + + def draw_menu(screen): + nonlocal stdscr + stdscr = screen + + curses.curs_set(0) + stdscr.nodelay(False) # 阻塞讀鍵 + stdscr.keypad(True) + + # 選單導航狀態 + menu_stack = [self.menu_tree()] # 選單堆疊 + idx_stack = [0] # 索引堆疊 + + while not stop_evt.is_set(): + # 檢查是否需要退出 + if stop_evt.is_set(): + break + + current_menu = menu_stack[-1] + current_idx = idx_stack[-1] + + stdscr.clear() + + stdscr.border() + stdscr.addstr(0, 10, " MavLink MiddleWare ", curses.A_BOLD) + stdscr.addstr(1, 2, f"mavlink bridge state : ") + stdscr.addstr(2, 2, f"object manager state : ") + stdscr.addstr(3, 2, f"Node state : ") + + # # Header - 顯示選單路徑 + # path = " → ".join([menu.name for menu in menu_stack]) + # stdscr.addstr(0, 2, f"控制面板: {path}", curses.A_BOLD) + # stdscr.addstr(1, 2, f"狀態: {state.status} | 速度: {state.speed} | 計數: {state.counter}") + # if state.user_input: + # stdscr.addstr(2, 2, f"輸入: {state.user_input[:50]}...") + + # 顯示當前選單項目 + start_line = 5 + for i, child in enumerate(current_menu.children): + marker = "➤ " if i == current_idx else " " + line = f"{marker}{child.name:15s} – {child.desc}" + attr = curses.A_REVERSE if i == current_idx else curses.A_NORMAL + stdscr.addstr(start_line + i, 4, line, attr) + + # 操作說明 + help_line = start_line + len(current_menu.children) + 2 + stdscr.addstr(help_line, 2, "操作: ↑↓選擇 Enter確認 ←返回上層 ←→調參數 q退出", curses.A_DIM) + + stdscr.refresh() + + # 設定短暫的 timeout,讓執行緒能夠響應 stop_evt + stdscr.timeout(100) # 100ms timeout + ch = stdscr.getch() + + if ch == -1: # timeout,繼續檢查 stop_evt + continue + + # 處理按鍵 + if ch in (curses.KEY_UP, ord('k')): + idx_stack[-1] = (current_idx - 1) % len(current_menu.children) + + elif ch in (curses.KEY_DOWN, ord('j')): + idx_stack[-1] = (current_idx + 1) % len(current_menu.children) + + elif ch == curses.KEY_LEFT: + # 返回上層或調整參數 + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + else: + # 在根選單,檢查是否為調整參數 + selected = current_menu.children[current_idx] + if selected.action == "ADJUST_SPEED": + state.speed = max(1, state.speed - 1) + + elif ch == curses.KEY_RIGHT: + # 調整參數 + selected = current_menu.children[current_idx] + if selected.action == "ADJUST_SPEED": + state.speed = min(10, state.speed + 1) + + elif ch in (curses.KEY_ENTER, 10, 13): + selected = current_menu.children[current_idx] + + # 處理不同類型的動作 + if selected.children: # 有子選單 + menu_stack.append(selected) + idx_stack.append(0) + + elif selected.action == "BACK": + if len(menu_stack) > 1: + menu_stack.pop() + idx_stack.pop() + + elif selected.action == "QUIT": + break + + elif selected.action == "INPUT_TEXT": + # 進入輸入模式 + result = input_dialog(stdscr, "請輸入文字: ") + if result is not None: + cmd_q.put(lambda: state.set_user_input(result)) + + elif selected.action == "SHOW_STATUS": + # 顯示狀態訊息 + stdscr.clear() + stdscr.addstr(5, 2, f"當前狀態: {state.status}") + stdscr.addstr(6, 2, f"速度設定: {state.speed}") + stdscr.addstr(7, 2, f"計數器: {state.counter}") + stdscr.addstr(9, 2, "按任意鍵返回...") + stdscr.refresh() + stdscr.getch() + + elif selected.action == "SHOW_INPUT": + # 顯示用戶輸入 + stdscr.clear() + stdscr.addstr(5, 2, f"用戶輸入內容:") + stdscr.addstr(6, 2, f"{state.user_input}") + stdscr.addstr(8, 2, "按任意鍵返回...") + stdscr.refresh() + stdscr.getch() + + elif callable(selected.action): + # 執行函式 + cmd_q.put(selected.action) + + elif ch in (ord('q'), 27): + break + + try: + curses.wrapper(draw_menu) + except KeyboardInterrupt: + pass + finally: + cleanup() + +def main(): + logger.warning(f"Hello this is mainOrchestrator.py") + pp = ControlPanel() + + cmd_q = queue.Queue() + stop_evt = threading.Event() + panel_thread_obj = None + + def signal_handler(signum, frame): + """處理 Ctrl+C 信號""" + print("\n收到中斷信號,正在安全退出...") + stop_evt.set() + if panel_thread_obj and panel_thread_obj.is_alive(): + panel_thread_obj.join(timeout=2) + sys.exit(0) + + # 註冊信號處理器 + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # 啟動控制面板(改為非 daemon) + panel_thread_obj = threading.Thread(target=pp.panel_thread, args=(cmd_q, stop_evt)) + panel_thread_obj.start() + + print("多層選單控制面板啟動。Ctrl+C 結束程式。") + + try: + while not stop_evt.is_set(): + # 取出面板丟過來的「動作」 + try: + fn = cmd_q.get_nowait() + fn() # 執行對應動作 + except queue.Empty: + pass + + # # 模擬你的長跑邏輯 + # if state.status == "running": + # # 依 speed 前進 + # state.counter += state.speed + + time.sleep(0.1) + except KeyboardInterrupt: + print("\n收到 Ctrl+C,準備結束...") + finally: + stop_evt.set() + if panel_thread_obj.is_alive(): + panel_thread_obj.join(timeout=2) + pass + + +if __name__ == "__main__": + main() + + diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py index 47a1c92..2d7ea06 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkDevice.py @@ -1,10 +1,12 @@ +import os + # 自定義的 import -from .theLogger import setup_logger +from .utils import setup_logger # ====================== 分割線 ===================== -logger = setup_logger("mavlinkDevice.py") +logger = setup_logger(os.path.basename(__file__)) # 用來記錄每個 system 的資訊 # 資料格式 { sysid : mavlink_device object } diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py index dadb58f..a132e97 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkObject.py @@ -12,6 +12,7 @@ # 基礎功能的 import import threading +import os # import queue import time import asyncio @@ -27,12 +28,12 @@ from rclpy.node import Node # 自定義的 import from .mavlinkDevice import mavlink_device, mavlink_systems from .mavlinkPublish import mavlink_publisher -from .theLogger import setup_logger -from .ringBuffer import RingBuffer +from .utils import RingBuffer, setup_logger + # ====================== 分割線 ===================== -logger = setup_logger("mavlinkObject.py") +logger = setup_logger(os.path.basename(__file__)) stream_bridge_ring = RingBuffer(capacity=1024, buffer_id=255) return_packet_ring = RingBuffer(capacity=256, buffer_id=254) @@ -328,7 +329,7 @@ class mavlink_object: while self.state == MavlinkObjectState.RUNNING: timestamp = time.time() - try: + try: #TODO 這邊的錯誤處理要再想想看怎麼做比較好 # 處理接收到的封包 mavlinkMsg = self.mavlink_socket.recv_msg() # except Exception as e: @@ -368,7 +369,7 @@ class mavlink_object: if target_port != self.socket_id and target_port in self.mavlinkObjects: target_obj = self.mavlinkObjects[target_port] if target_obj.state == MavlinkObjectState.RUNNING: - try: + try: # TODO 藉由 if 的檢測 確定目標端口是開啟狀態後 再進行寫出 之後刪掉 try except target_obj.mavlink_socket.write(mavlinkMsg.get_msgbuf()) except Exception as e: logger.error(f"Error forwarding message to port {target_port}: {e}") @@ -449,7 +450,7 @@ class mavlink_object: try: # 使用鎖保護共享資源訪問 - with self.outgoing_msgs_lock: + with self.outgoing_msg_lock: self.outgoing_msgs.append(message_bytes) return True except Exception as e: @@ -559,7 +560,7 @@ class async_io_manager: task = self.managed_objects[socket_id] if task.done(): - try: + try: # TODO 這邊的錯誤處理要再想想看怎麼做比較好 exc = task.exception() if exc: logger.error(f"Task for mavlink_object {socket_id} raised exception: {exc}") diff --git a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py index 727e47f..5dc3ca1 100644 --- a/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py +++ b/src/fc_network_adapter/fc_network_adapter/mavlinkPublish.py @@ -9,6 +9,8 @@ publisher topic name 命名規則為 <前綴詞>/s/<具體 topic> ''' +import os + # ROS2 的 import import std_msgs.msg import sensor_msgs.msg @@ -18,9 +20,9 @@ import mavros_msgs.msg import math # 自定義的 import -from .theLogger import setup_logger +from .utils import setup_logger -logger = setup_logger("mavlinkPublish.py") +logger = setup_logger(os.path.basename(__file__)) class mavlink_publisher(): diff --git a/src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py b/src/fc_network_adapter/fc_network_adapter/test_mavlinkObject.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/fc_network_adapter/fc_network_adapter/utils/__init__.py b/src/fc_network_adapter/fc_network_adapter/utils/__init__.py new file mode 100644 index 0000000..921faf4 --- /dev/null +++ b/src/fc_network_adapter/fc_network_adapter/utils/__init__.py @@ -0,0 +1,7 @@ +""" +共用工具模組 +""" +from .ringBuffer import RingBuffer +from .theLogger import setup_logger + +__all__ = ['RingBuffer', 'setup_logger'] \ No newline at end of file diff --git a/src/fc_network_adapter/fc_network_adapter/ringBuffer.py b/src/fc_network_adapter/fc_network_adapter/utils/ringBuffer.py similarity index 100% rename from src/fc_network_adapter/fc_network_adapter/ringBuffer.py rename to src/fc_network_adapter/fc_network_adapter/utils/ringBuffer.py diff --git a/src/fc_network_adapter/fc_network_adapter/theLogger.py b/src/fc_network_adapter/fc_network_adapter/utils/theLogger.py similarity index 93% rename from src/fc_network_adapter/fc_network_adapter/theLogger.py rename to src/fc_network_adapter/fc_network_adapter/utils/theLogger.py index c4dccfe..dce2ce7 100644 --- a/src/fc_network_adapter/fc_network_adapter/theLogger.py +++ b/src/fc_network_adapter/fc_network_adapter/utils/theLogger.py @@ -5,7 +5,7 @@ from logging.handlers import TimedRotatingFileHandler # 全域 Logger 實例 _global_logger = None -def setup_logger(name: str, log_dir: str = "log", level=logging.DEBUG) -> logging.Logger: +def setup_logger(name: str, log_dir: str = "logs", level=logging.DEBUG) -> logging.Logger: global _global_logger if _global_logger is None: diff --git a/src/fc_network_adapter/setup.py b/src/fc_network_adapter/setup.py index 33414cb..b28ac96 100644 --- a/src/fc_network_adapter/setup.py +++ b/src/fc_network_adapter/setup.py @@ -20,6 +20,7 @@ setup( tests_require=['pytest'], entry_points={ 'console_scripts': [ + 'mavlink_orchestrator = fc_network_adapter.mainOrchestrator:main', ], }, ) diff --git a/src/fc_network_adapter/tests/devRun.py b/src/fc_network_adapter/tests/demo_integration.py similarity index 98% rename from src/fc_network_adapter/tests/devRun.py rename to src/fc_network_adapter/tests/demo_integration.py index 48f9e2b..b621e73 100644 --- a/src/fc_network_adapter/tests/devRun.py +++ b/src/fc_network_adapter/tests/demo_integration.py @@ -13,12 +13,12 @@ import rclpy from pymavlink import mavutil # 自定義的 import -from fc_network_adapter import mavlinkObject as mo -from fc_network_adapter import mavlinkDevice as md +from ..fc_network_adapter import mavlinkObject as mo +from ..fc_network_adapter import mavlinkDevice as md # ====================== 分割線 ===================== -test_item = 12 +test_item = 10 running_time = 10000 diff --git a/src/fc_network_adapter/tests/demo_ringBuffer.py b/src/fc_network_adapter/tests/demo_ringBuffer.py new file mode 100644 index 0000000..a01ed73 --- /dev/null +++ b/src/fc_network_adapter/tests/demo_ringBuffer.py @@ -0,0 +1,152 @@ +import os +import sys +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import time +import threading + +from ..fc_network_adapter.utils import RingBuffer + + +def producer(buffer, count, interval=0.01): + """生產者:向緩衝區添加資料""" + print(f"Producer started (thread {threading.get_ident()})") + for i in range(count): + # 嘗試寫入數據,直到成功 + while not buffer.put(f"Item-{i}"): + print(f"Buffer full, producer waiting... (thread {threading.get_ident()})") + time.sleep(0.1) + + print(f"Produced: Item-{i}, buffer size: {buffer.size()}") + time.sleep(interval) # 模擬生產過程 + + print(f"Producer finished (thread {threading.get_ident()})") + +def consumer(buffer, max_items, interval=0.05): + """消費者:從緩衝區讀取資料""" + print(f"Consumer started (thread {threading.get_ident()})") + items_consumed = 0 + + while items_consumed < max_items: + # 嘗試讀取數據 + item = buffer.get() + if item: + print(f"Consumed: {item}, buffer size: {buffer.size()}") + items_consumed += 1 + else: + print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})") + + time.sleep(interval) # 模擬消費過程 + + print(f"Consumer finished (thread {threading.get_ident()})") + +def batch_consumer(buffer, interval=0.2): + """批量消費者:一次性讀取緩衝區中的所有資料""" + print(f"Batch consumer started (thread {threading.get_ident()})") + + for _ in range(5): # 執行5次批量讀取 + time.sleep(interval) # 等待緩衝區積累數據 + items = buffer.get_all() + if items: + print(f"Batch consumed {len(items)} items: {items}") + else: + print("Buffer empty for batch consumer") + + print(f"Batch consumer finished (thread {threading.get_ident()})") + +def demonstrate_multi_writer(): + """示範多個寫入執行緒同時操作緩衝區""" + print("\n=== Demonstrating Multiple Writers ===") + buffer = RingBuffer(capacity=80) + + # 創建多個生產者執行緒 + threads = [] + for i in range(3): + thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1))) + threads.append(thread) + thread.start() + + # 等待所有執行緒完成 + for thread in threads: + thread.join() + + buffer.print_stats() # 印出統計資訊 + + # 讀出所有剩餘資料 + remaining = buffer.get_all() + print(f"Remaining items in buffer after multiple writers: {remaining}") + +def demonstrate_basic_usage(): + """示範基本使用方式""" + print("\n=== Demonstrating Basic Usage ===") + # 創建緩衝區 + buffer = RingBuffer(capacity=20, buffer_id=7) + + # 檢查初始狀態 + print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 添加幾個項目 + for i in range(5): + buffer.put(f"Test-{i}") + + # 檢查狀態 + print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 讀取一個項目 + item = buffer.get() + print(f"Read item: {item}") + print(f"After reading 1 item - Content Size: {buffer.size()}") + + # 添加更多項目直到滿 + items_added = 0 + while not buffer.is_full(): + buffer.put(f"Fill-{items_added}") + items_added += 1 + + print(f"Added {items_added} more items until full") + print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") + + # 嘗試添加到已滿的緩衝區 + result = buffer.put("Overflow") + print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}") + + # 獲取所有項目 + all_items = buffer.get_all() + print(f"All items in buffer: {all_items}") + print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}") + + # 印出統計資訊 + buffer.print_stats() + +def demonstrate_producer_consumer(): + """示範生產者-消費者模式""" + print("\n=== Demonstrating Producer-Consumer Pattern ===") + buffer = RingBuffer(capacity=16) + + # 創建生產者和消費者執行緒 + producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1)) + consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2)) + batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5)) + + # 啟動執行緒 + producer_thread.start() + consumer_thread.start() + batch_thread.start() + + # 等待執行緒完成 + producer_thread.join() + consumer_thread.join() + batch_thread.join() + + # 檢查最終狀態 + print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}") + + buffer.print_stats() + +if __name__ == "__main__": + # 展示各種使用場景 + # demonstrate_basic_usage() + # demonstrate_producer_consumer() + demonstrate_multi_writer() + + print("\nAll demonstrations completed!") \ No newline at end of file diff --git a/src/fc_network_adapter/tests/test_mavlinkObject.py b/src/fc_network_adapter/tests/test_mavlinkObject.py index 23907ee..29744fc 100644 --- a/src/fc_network_adapter/tests/test_mavlinkObject.py +++ b/src/fc_network_adapter/tests/test_mavlinkObject.py @@ -16,7 +16,7 @@ import asyncio from unittest.mock import MagicMock, patch # 導入要測試的模組 -from fc_network_adapter.mavlinkObject import ( +from ..fc_network_adapter.mavlinkObject import ( mavlink_object, async_io_manager, MavlinkObjectState, @@ -466,3 +466,4 @@ class TestIntegration(unittest.TestCase): if __name__ == "__main__": unittest.main(defaultTest="TestMavlinkObject.test_send_message") + # unittest.main(defaultTest="TestAsyncIOManager") diff --git a/src/fc_network_adapter/tests/test_ringBuffer.py b/src/fc_network_adapter/tests/test_ringBuffer.py index 563087b..287a057 100644 --- a/src/fc_network_adapter/tests/test_ringBuffer.py +++ b/src/fc_network_adapter/tests/test_ringBuffer.py @@ -1,152 +1,296 @@ -import os -import sys -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +#!/usr/bin/env python +""" +測試 RingBuffer 類別的功能 +""" -import time +import unittest import threading +import time +from concurrent.futures import ThreadPoolExecutor -from fc_network_adapter.ringBuffer import RingBuffer - - -def producer(buffer, count, interval=0.01): - """生產者:向緩衝區添加資料""" - print(f"Producer started (thread {threading.get_ident()})") - for i in range(count): - # 嘗試寫入數據,直到成功 - while not buffer.put(f"Item-{i}"): - print(f"Buffer full, producer waiting... (thread {threading.get_ident()})") - time.sleep(0.1) - - print(f"Produced: Item-{i}, buffer size: {buffer.size()}") - time.sleep(interval) # 模擬生產過程 - - print(f"Producer finished (thread {threading.get_ident()})") +from ..fc_network_adapter.utils import RingBuffer -def consumer(buffer, max_items, interval=0.05): - """消費者:從緩衝區讀取資料""" - print(f"Consumer started (thread {threading.get_ident()})") - items_consumed = 0 - - while items_consumed < max_items: - # 嘗試讀取數據 - item = buffer.get() - if item: - print(f"Consumed: {item}, buffer size: {buffer.size()}") - items_consumed += 1 - else: - print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})") - - time.sleep(interval) # 模擬消費過程 - - print(f"Consumer finished (thread {threading.get_ident()})") +class TestRingBuffer(unittest.TestCase): + """測試 RingBuffer 基本功能""" + + def setUp(self): + """每個測試前的準備""" + self.buffer = RingBuffer(capacity=8) + + def test_initialization(self): + """測試初始化""" + self.assertEqual(self.buffer.capacity, 8) + self.assertTrue(self.buffer.is_empty()) + self.assertFalse(self.buffer.is_full()) + self.assertEqual(self.buffer.size(), 0) + + def test_put_get_basic(self): + """測試基本的放入和取出""" + # 測試放入 + self.assertTrue(self.buffer.put("item1")) + self.assertTrue(self.buffer.put("item2")) + self.assertEqual(self.buffer.size(), 2) + self.assertFalse(self.buffer.is_empty()) + + # 測試取出 + item1 = self.buffer.get() + self.assertEqual(item1, "item1") + self.assertEqual(self.buffer.size(), 1) + + item2 = self.buffer.get() + self.assertEqual(item2, "item2") + self.assertTrue(self.buffer.is_empty()) + + # 空緩衝區取出應返回 None + self.assertIsNone(self.buffer.get()) + + def test_capacity_overflow(self): + """測試緩衝區容量限制""" + # 填滿緩衝區 (容量-1,因為需要預留一個位置) + for i in range(7): # 8-1=7 + self.assertTrue(self.buffer.put(f"item{i}")) + + self.assertTrue(self.buffer.is_full()) + + # 嘗試再放入一個應該失敗 + self.assertFalse(self.buffer.put("overflow")) + self.assertEqual(self.buffer.overflow_count.value, 1) + + def test_get_all(self): + """測試取出所有項目""" + items = ["a", "b", "c", "d"] + for item in items: + self.buffer.put(item) + + all_items = self.buffer.get_all() + self.assertEqual(all_items, items) + self.assertTrue(self.buffer.is_empty()) + + def test_clear(self): + """測試清空緩衝區""" + for i in range(5): + self.buffer.put(f"item{i}") + + self.buffer.clear() + self.assertTrue(self.buffer.is_empty()) + self.assertEqual(self.buffer.size(), 0) -def batch_consumer(buffer, interval=0.2): - """批量消費者:一次性讀取緩衝區中的所有資料""" - print(f"Batch consumer started (thread {threading.get_ident()})") - - for _ in range(5): # 執行5次批量讀取 - time.sleep(interval) # 等待緩衝區積累數據 - items = buffer.get_all() - if items: - print(f"Batch consumed {len(items)} items: {items}") - else: - print("Buffer empty for batch consumer") - - print(f"Batch consumer finished (thread {threading.get_ident()})") -def demonstrate_multi_writer(): - """示範多個寫入執行緒同時操作緩衝區""" - print("\n=== Demonstrating Multiple Writers ===") - buffer = RingBuffer(capacity=80) - - # 創建多個生產者執行緒 - threads = [] - for i in range(3): - thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1))) - threads.append(thread) - thread.start() - - # 等待所有執行緒完成 - for thread in threads: - thread.join() +class TestRingBufferThreadSafety(unittest.TestCase): + """測試 RingBuffer 的線程安全性""" + + def setUp(self): + """每個測試前的準備""" + self.buffer = RingBuffer(capacity=256) + self.results = [] + self.write_count = 1000 + self.read_count = 1000 + + def test_concurrent_producers_consumers(self): + """測試多生產者多消費者場景""" + self.results = [] + stats = self.buffer.get_stats() + self.assertEqual(stats['total_writes'], 0) + + def producer(producer_id, count): + """生產者函數""" + for i in range(count): + item = f"producer_{producer_id}_item_{i}" + while not self.buffer.put(item): + time.sleep(0.001) # 緩衝區滿時稍微等待 + + def consumer(consumer_id, count): + """消費者函數""" + items = [] + for _ in range(count): + item = None + while item is None: + item = self.buffer.get() + if item is None: + time.sleep(0.001) # 緩衝區空時稍微等待 + items.append(item) + self.results.extend(items) + + # 創建多個生產者和消費者 + with ThreadPoolExecutor(max_workers=8) as executor: + # 2 個生產者,每個寫入 500 個項目 + producer_futures = [ + executor.submit(producer, 0, 500), + executor.submit(producer, 1, 500) + ] + + # 2 個消費者,每個讀取 500 個項目 + consumer_futures = [ + executor.submit(consumer, 0, 500), + executor.submit(consumer, 1, 500) + ] + + # 等待所有任務完成 + for future in producer_futures + consumer_futures: + future.result() + + # 驗證結果 + self.assertEqual(len(self.results), 1000) + self.assertTrue(self.buffer.is_empty()) + + # 檢查統計數據 + stats = self.buffer.get_stats() + self.assertEqual(stats['total_writes'], 1000) + self.assertGreater(stats['total_reads'], 1000) # 包含失敗的讀取嘗試 + self.assertGreater(stats['write_threads'], 1) + self.assertGreater(stats['read_threads'], 1) + + def test_high_throughput(self): + """測試高吞吐量場景""" + items_per_thread = 10000 + num_threads = 4 + + def writer(): + for i in range(items_per_thread): + while not self.buffer.put(i): + pass # 忙等待 + + def reader(): + items = [] + for _ in range(items_per_thread): + item = None + while item is None: + item = self.buffer.get() + items.append(item) + self.results.extend(items) + + start_time = time.time() + + with ThreadPoolExecutor(max_workers=num_threads * 2) as executor: + # 啟動寫入線程 + writer_futures = [executor.submit(writer) for _ in range(num_threads)] + + # 啟動讀取線程 + reader_futures = [executor.submit(reader) for _ in range(num_threads)] + + # 等待完成 + for future in writer_futures + reader_futures: + future.result() + + end_time = time.time() + + # 驗證結果 + total_items = items_per_thread * num_threads + self.assertEqual(len(self.results), total_items) + + # 性能統計 + duration = end_time - start_time + throughput = total_items / duration + + print(f"\nHigh Throughput Test Results:") + print(f"Total items: {total_items}") + print(f"Duration: {duration:.2f}s") + print(f"Throughput: {throughput:.0f} items/sec") + + # 顯示詳細統計 + self.buffer.print_stats() - buffer.print_stats() # 印出統計資訊 - # 讀出所有剩餘資料 - remaining = buffer.get_all() - print(f"Remaining items in buffer after multiple writers: {remaining}") +class TestRingBufferStatistics(unittest.TestCase): + """測試 RingBuffer 的統計功能""" + + def test_statistics_tracking(self): + """測試統計數據追蹤""" + buffer = RingBuffer(capacity=16) + + # 寫入一些數據 + for i in range(10): + buffer.put(f"item{i}") + + # 讀取一些數據 + for _ in range(5): + buffer.get() + + stats = buffer.get_stats() + + # 驗證基本統計 + self.assertEqual(stats['total_writes'], 10) + self.assertEqual(stats['total_reads'], 5) + self.assertEqual(stats['current_size'], 5) + self.assertEqual(stats['write_threads'], 1) + self.assertEqual(stats['read_threads'], 1) + + def test_reset_statistics(self): + """測試重置統計數據""" + buffer = RingBuffer(capacity=16) + + # 產生一些活動 + for i in range(5): + buffer.put(f"item{i}") + for _ in range(3): + buffer.get() + + # 重置統計 + buffer.reset_stats() + + stats = buffer.get_stats() + self.assertEqual(stats['total_writes'], 0) + self.assertEqual(stats['total_reads'], 0) + self.assertEqual(stats['concurrent_writes'], 0) + self.assertEqual(stats['concurrent_reads'], 0) + self.assertEqual(stats['overflow_count'], 0) -def demonstrate_basic_usage(): - """示範基本使用方式""" - print("\n=== Demonstrating Basic Usage ===") - # 創建緩衝區 - buffer = RingBuffer(capacity=20, buffer_id=7) - - # 檢查初始狀態 - print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") - - # 添加幾個項目 - for i in range(5): - buffer.put(f"Test-{i}") - - # 檢查狀態 - print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") - - # 讀取一個項目 - item = buffer.get() - print(f"Read item: {item}") - print(f"After reading 1 item - Content Size: {buffer.size()}") - - # 添加更多項目直到滿 - items_added = 0 - while not buffer.is_full(): - buffer.put(f"Fill-{items_added}") - items_added += 1 - - print(f"Added {items_added} more items until full") - print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}") - - # 嘗試添加到已滿的緩衝區 - result = buffer.put("Overflow") - print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}") - - # 獲取所有項目 - all_items = buffer.get_all() - print(f"All items in buffer: {all_items}") - print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}") - # 印出統計資訊 +def benchmark_ringbuffer(): + """RingBuffer 性能基準測試""" + print("\n=== RingBuffer Performance Benchmark ===") + + buffer = RingBuffer(capacity=1024) + num_operations = 100000 + + # 單線程性能測試 + start_time = time.time() + for i in range(num_operations): + buffer.put(i) + for _ in range(num_operations): + buffer.get() + end_time = time.time() + + single_thread_time = end_time - start_time + throughput = (num_operations * 2) / single_thread_time + + print(f"Single Thread: {throughput:.0f} ops/sec") + + # 多線程性能測試 + buffer = RingBuffer(capacity=1024) + + def producer(): + for i in range(num_operations // 2): + while not buffer.put(i): + pass + + def consumer(): + for _ in range(num_operations // 2): + while buffer.get() is None: + pass + + start_time = time.time() + + with ThreadPoolExecutor(max_workers=2) as executor: + future1 = executor.submit(producer) + future2 = executor.submit(consumer) + future1.result() + future2.result() + + end_time = time.time() + + multi_thread_time = end_time - start_time + throughput = num_operations / multi_thread_time + + print(f"Multi Thread: {throughput:.0f} ops/sec") + print(f"Speedup: {single_thread_time/multi_thread_time:.2f}x") + buffer.print_stats() -def demonstrate_producer_consumer(): - """示範生產者-消費者模式""" - print("\n=== Demonstrating Producer-Consumer Pattern ===") - buffer = RingBuffer(capacity=16) - - # 創建生產者和消費者執行緒 - producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1)) - consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2)) - batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5)) - - # 啟動執行緒 - producer_thread.start() - consumer_thread.start() - batch_thread.start() - - # 等待執行緒完成 - producer_thread.join() - consumer_thread.join() - batch_thread.join() - - # 檢查最終狀態 - print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}") - - buffer.print_stats() if __name__ == "__main__": - # 展示各種使用場景 - # demonstrate_basic_usage() - # demonstrate_producer_consumer() - demonstrate_multi_writer() + # 運行單元測試 + unittest.main(argv=[''], exit=False, verbosity=2) - print("\nAll demonstrations completed!") \ No newline at end of file + # 運行性能基準測試 + benchmark_ringbuffer() \ No newline at end of file