You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
AirTrapMine/src/fc_network_adapter/tests/demo_ringBuffer.py

152 lines
5.0 KiB
Python

import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import time
import threading
from ..fc_network_adapter.utils import RingBuffer
def producer(buffer, count, interval=0.01):
"""生產者:向緩衝區添加資料"""
print(f"Producer started (thread {threading.get_ident()})")
for i in range(count):
# 嘗試寫入數據,直到成功
while not buffer.put(f"Item-{i}"):
print(f"Buffer full, producer waiting... (thread {threading.get_ident()})")
time.sleep(0.1)
print(f"Produced: Item-{i}, buffer size: {buffer.size()}")
time.sleep(interval) # 模擬生產過程
print(f"Producer finished (thread {threading.get_ident()})")
def consumer(buffer, max_items, interval=0.05):
"""消費者:從緩衝區讀取資料"""
print(f"Consumer started (thread {threading.get_ident()})")
items_consumed = 0
while items_consumed < max_items:
# 嘗試讀取數據
item = buffer.get()
if item:
print(f"Consumed: {item}, buffer size: {buffer.size()}")
items_consumed += 1
else:
print(f"Buffer empty, consumer waiting... (thread {threading.get_ident()})")
time.sleep(interval) # 模擬消費過程
print(f"Consumer finished (thread {threading.get_ident()})")
def batch_consumer(buffer, interval=0.2):
"""批量消費者:一次性讀取緩衝區中的所有資料"""
print(f"Batch consumer started (thread {threading.get_ident()})")
for _ in range(5): # 執行5次批量讀取
time.sleep(interval) # 等待緩衝區積累數據
items = buffer.get_all()
if items:
print(f"Batch consumed {len(items)} items: {items}")
else:
print("Buffer empty for batch consumer")
print(f"Batch consumer finished (thread {threading.get_ident()})")
def demonstrate_multi_writer():
"""示範多個寫入執行緒同時操作緩衝區"""
print("\n=== Demonstrating Multiple Writers ===")
buffer = RingBuffer(capacity=80)
# 創建多個生產者執行緒
threads = []
for i in range(3):
thread = threading.Thread(target=producer, args=(buffer, 5, 0.1 * (i+1)))
threads.append(thread)
thread.start()
# 等待所有執行緒完成
for thread in threads:
thread.join()
buffer.print_stats() # 印出統計資訊
# 讀出所有剩餘資料
remaining = buffer.get_all()
print(f"Remaining items in buffer after multiple writers: {remaining}")
def demonstrate_basic_usage():
"""示範基本使用方式"""
print("\n=== Demonstrating Basic Usage ===")
# 創建緩衝區
buffer = RingBuffer(capacity=20, buffer_id=7)
# 檢查初始狀態
print(f"Initial buffer state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 添加幾個項目
for i in range(5):
buffer.put(f"Test-{i}")
# 檢查狀態
print(f"After adding 5 items - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 讀取一個項目
item = buffer.get()
print(f"Read item: {item}")
print(f"After reading 1 item - Content Size: {buffer.size()}")
# 添加更多項目直到滿
items_added = 0
while not buffer.is_full():
buffer.put(f"Fill-{items_added}")
items_added += 1
print(f"Added {items_added} more items until full")
print(f"Buffer full state - Empty: {buffer.is_empty()}, Full: {buffer.is_full()}, Content Size: {buffer.size()}")
# 嘗試添加到已滿的緩衝區
result = buffer.put("Overflow")
print(f"Attempt to add to full buffer: {'Succeeded' if result else 'Failed'}")
# 獲取所有項目
all_items = buffer.get_all()
print(f"All items in buffer: {all_items}")
print(f"Buffer after get_all() - Empty: {buffer.is_empty()}, Content Size: {buffer.size()}")
# 印出統計資訊
buffer.print_stats()
def demonstrate_producer_consumer():
"""示範生產者-消費者模式"""
print("\n=== Demonstrating Producer-Consumer Pattern ===")
buffer = RingBuffer(capacity=16)
# 創建生產者和消費者執行緒
producer_thread = threading.Thread(target=producer, args=(buffer, 20, 0.1))
consumer_thread = threading.Thread(target=consumer, args=(buffer, 3, 0.2))
batch_thread = threading.Thread(target=batch_consumer, args=(buffer, 0.5))
# 啟動執行緒
producer_thread.start()
consumer_thread.start()
batch_thread.start()
# 等待執行緒完成
producer_thread.join()
consumer_thread.join()
batch_thread.join()
# 檢查最終狀態
print(f"Final buffer state - Empty: {buffer.is_empty()}, Size: {buffer.size()}")
buffer.print_stats()
if __name__ == "__main__":
# 展示各種使用場景
# demonstrate_basic_usage()
# demonstrate_producer_consumer()
demonstrate_multi_writer()
print("\nAll demonstrations completed!")